使用 popen 和专用 TTY Python 运行交互式 Bash

2024-11-01 08:41:00
admin
原创
183
摘要:问题描述:我需要在 Python 中一个独立的进程中运行一个交互式 Bash 实例,它有自己的专用 TTY(我不能使用 pexpect)。我使用了我在类似程序中经常看到的代码片段:master, slave = pty.openpty() p = subprocess.Popen(["/b...

问题描述:

我需要在 Python 中一个独立的进程中运行一个交互式 Bash 实例,它有自己的专用 TTY(我不能使用 pexpect)。我使用了我在类似程序中经常看到的代码片段:

master, slave = pty.openpty()

p = subprocess.Popen(["/bin/bash", "-i"], stdin=slave, stdout=slave, stderr=slave)

os.close(slave)

x = os.read(master, 1026)

print x

subprocess.Popen.kill(p)
os.close(master)

但当我运行它时我得到以下输出:

$ ./pty_try.py
bash: cannot set terminal process group (10790): Inappropriate ioctl for device
bash: no job control in this shell

运行的strace显示一些错误:

...
readlink("/usr/bin/python2.7", 0x7ffc8db02510, 4096) = -1 EINVAL (Invalid argument)
...
ioctl(3, SNDCTL_TMR_TIMEBASE or SNDRV_TIMER_IOCTL_NEXT_DEVICE or TCGETS, 0x7ffc8db03590) = -1 ENOTTY (Inappropriate ioctl for device)
...
readlink("./pty_try.py", 0x7ffc8db00610, 4096) = -1 EINVAL (Invalid argument)

代码片段看起来很简单,Bash 没有得到它需要的东西吗?这里可能是什么问题?


解决方案 1:

这是在子进程中运行交互式命令的解决方案。它使用伪终端使 stdout 非阻塞(某些命令也需要 tty 设备,例如 bash)。它使用 select 来处理子进程的输入和输出。

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import os
import sys
import select
import termios
import tty
import pty
from subprocess import Popen

command = 'bash'
# command = 'docker run -it --rm centos /bin/bash'.split()

# save original tty setting then set it to raw mode
old_tty = termios.tcgetattr(sys.stdin)
tty.setraw(sys.stdin.fileno())

# open pseudo-terminal to interact with subprocess
master_fd, slave_fd = pty.openpty()


try:
    # use os.setsid() make it run in a new process group, or bash job control will not be enabled
    p = Popen(command,
              preexec_fn=os.setsid,
              stdin=slave_fd,
              stdout=slave_fd,
              stderr=slave_fd,
              universal_newlines=True)

    while p.poll() is None:
        r, w, e = select.select([sys.stdin, master_fd], [], [])
        if sys.stdin in r:
            d = os.read(sys.stdin.fileno(), 10240)
            os.write(master_fd, d)
        elif master_fd in r:
            o = os.read(master_fd, 10240)
            if o:
                os.write(sys.stdout.fileno(), o)
finally:
    # restore tty settings back
    termios.tcsetattr(sys.stdin, termios.TCSADRAIN, old_tty)

解决方案 2:

这是最终对我有用的解决方案(按照 qarma 的建议):

libc = ctypes.CDLL('libc.so.6')

master, slave = pty.openpty()
p = subprocess.Popen(["/bin/bash", "-i"], preexec_fn=libc.setsid, stdin=slave, stdout=slave, stderr=slave)
os.close(slave)

... do stuff here ...

x = os.read(master, 1026)
print x

解决方案 3:

这是一个完全面向对象的解决方案,使用线程和队列执行带有 TTY 的交互式 shell 命令,用于 stdout 和 stderr IO 处理。我花了一段时间从多个位置构建它,但它到目前为止在 Unix/Linux 系统上以及作为 Juniper 操作脚本的一部分运行良好。我想在这里发布它以节省其他人尝试构建类似东西的时间。

import pty
import re
import select
import threading
from datetime import datetime, timedelta
import os
import logging
import subprocess
import time
from queue import Queue, Empty

lib_logger = logging.getLogger("lib")

# Handler function to be run as a thread for pulling pty channels from an interactive shell
def _pty_handler(pty_master, logger, queue, stop):
    poller = select.poll()
    poller.register(pty_master, select.POLLIN)
    while True:
        # Stop handler if flagged
        if stop():
            logger.debug("Disabling pty handler for interactive shell")
            break

        fd_event = poller.poll(100)
        for descriptor, event in fd_event:
            # Read data from pipe and send to queue if there is data to read
            if event == select.POLLIN:
                data = os.read(descriptor, 1).decode("utf-8")
                if not data:
                    break
                # logger.debug("Reading in to handler queue: " + data)
                queue.put(data)
            # Exit handler if stdout is closing
            elif event == select.POLLHUP:
                logger.debug("Disabling pty handler for interactive shell")
                break


# Function for reading outputs from the given queue by draining it and returning the output
def _get_queue_output(queue: Queue) -> str:
    value = ""
    try:
        while True:
            value += queue.get_nowait()
    except Empty:
        return value


# Helper function to create the needed list for popen and print the command run to the logger
def popen_command(command, logger, *args):
    popen_list = list()
    popen_list.append(command)
    command_output = command
    for arg in args:
        popen_list.append(arg)
        command_output += " " + arg
    lib_logger.debug("Making Popen call using: " + str(popen_list))
    logger.debug("")
    logger.debug(command_output)
    logger.debug("")

    return popen_list


# Class for create an interactive shell and sending commands to it along with logging output to loggers
class InteractiveShell(object):
    def __init__(self, command, logger, *args):
        self.logger = logger
        self.command = command
        self.process = None
        self.popen_list = popen_command(command, logger, *args)
        self.master_stdout = None
        self.slave_stdout = None
        self.master_stderr = None
        self.slave_stderr = None
        self.stdout_handler = None
        self.stderr_handler = None
        self.stdout_queue = None
        self.stderr_queue = None
        self.stop_handlers = False

    # Open interactive shell and setup all threaded IO handlers
    def open(self, shell_prompt, timeout=DEVICE_TIMEOUT):
        # Create PTYs
        self.master_stdout, self.slave_stdout = pty.openpty()
        self.master_stderr, self.slave_stderr = pty.openpty()

        # Create shell subprocess
        self.process = subprocess.Popen(self.popen_list, stdin=self.slave_stdout, stdout=self.slave_stdout,
                                        stderr=self.slave_stderr, bufsize=0, start_new_session=True)

        lib_logger.debug("")
        lib_logger.debug("Started interactive shell for command " + self.command)
        lib_logger.debug("")

        # Create thread and queues for handling pty output and start them
        self.stdout_queue = Queue()
        self.stderr_queue = Queue()
        self.stdout_handler = threading.Thread(target=_pty_handler, args=(self.master_stdout,
                                                                          lib_logger,
                                                                          self.stdout_queue,
                                                                          lambda: self.stop_handlers))
        self.stderr_handler = threading.Thread(target=_pty_handler, args=(self.master_stderr,
                                                                          lib_logger,
                                                                          self.stderr_queue,
                                                                          lambda: self.stop_handlers))
        self.stdout_handler.daemon = True
        self.stderr_handler.daemon = True
        lib_logger.debug("Enabling stderr handler for interactive shell " + self.command)
        self.stderr_handler.start()
        lib_logger.debug("Enabling stdout handler for interactive shell " + self.command)
        self.stdout_handler.start()

        # Wait for shell prompt
        lib_logger.debug("Waiting for shell prompt: " + shell_prompt)
        return self.wait_for(shell_prompt, timeout)

    # Close interactive shell which should also kill all threaded IO handlers
    def close(self):
        # Wait 5 seconds before closing to let shell handle all input and outputs
        time.sleep(5)

        # Stop IO handler threads and terminate the process then wait another 5 seconds for cleanup to happen
        self.stop_handlers = True
        self.process.terminate()
        time.sleep(5)

        # Check for any additional output from the stdout handler
        output = ""
        while True:
            data = _get_queue_output(self.stdout_queue)
            if data != "":
                output += data
            else:
                break
        for line in iter(output.splitlines()):
            self.logger.debug(line)

        # Check for any additional output from the stderr handler
        output = ""
        while True:
            data = _get_queue_output(self.stderr_queue)
            if data != "":
                output += data
            else:
                break
        for line in iter(output.splitlines()):
            self.logger.error(line)

        # Cleanup PTYs
        os.close(self.master_stdout)
        os.close(self.master_stderr)
        os.close(self.slave_stdout)
        os.close(self.slave_stderr)

        lib_logger.debug("Interactive shell command " + self.command + " terminated")

    # Run series of commands given as a list of a list of commands and wait_for strings. If no wait_for is needed then
    # only provide the command. Return if all the commands completed successfully or not.
    # Ex:
    # [
    #     ["ssh jsas@" + vnf_ip, r"jsas@.*:"],
    #     ["juniper123", r"jsas@.*$"],
    #     ["sudo su", r".*jsas:"],
    #     ["juniper123", r"root@.*#"],
    #     ["usermod -p 'blah' jsas"]
    # ]
    def run_commands(self, commands_list):
        shell_status = True
        for command in commands_list:
            shell_status = self.run(command[0])
            if shell_status and len(command) == 2:
                shell_status = self.wait_for(command[1])

            # Break out of running commands if a command failed
            if not shell_status:
                break

        return shell_status

    # Run given command and return False if error occurs otherwise return True
    def run(self, command, sleep=0):
        # Check process to make sure it is still running and if not grab the stderr output
        if self.process.poll():
            self.logger.error("Interactive shell command " + self.command + " closed with return code: " +
                              self.process.returncode)
            data = _get_queue_output(self.stderr_queue)
            if data != "":
                self.logger.error("Interactive shell error messages:")
                for line in iter(data.splitlines()):
                    self.logger.error(line)
            return False

        # Write command to process and check to make sure a newline is in command otherwise add it
        if "
" not in command:
            command += "
"
        os.write(self.master_stdout, command.encode("utf-8"))
        if sleep:
            time.sleep(sleep)

        return True

    # Wait for specific regex expression in output before continuing return False if wait time expires otherwise return
    # True
    def wait_for(self, this, timeout=DEVICE_TIMEOUT):
        timeout = datetime.now() + timedelta(seconds=timeout)
        output = ""

        # Keep searching for output until timeout occurs
        while timeout > datetime.now():
            data = _get_queue_output(self.stdout_queue)
            if data != "":
                # Add to output line and check for match to regex given and if match then break and send output to
                # logger
                output += data
                lib_logger.debug("Checking for " + this + " in data: ")
                for line in iter(output.splitlines()):
                    lib_logger.debug(line)
                if re.search(r"{}s?$".format(this), output):
                    break
            time.sleep(1)

        # Send output to logger
        for line in iter(output.splitlines()):
            self.logger.debug(line)

        # If wait time expired print error message and return False
        if timeout < datetime.now():
            self.logger.error("Wait time expired when waiting for " + this)
            return False

        return True
相关推荐
  政府信创国产化的10大政策解读一、信创国产化的背景与意义信创国产化,即信息技术应用创新国产化,是当前中国信息技术领域的一个重要发展方向。其核心在于通过自主研发和创新,实现信息技术应用的自主可控,减少对外部技术的依赖,并规避潜在的技术制裁和风险。随着全球信息技术竞争的加剧,以及某些国家对中国在科技领域的打压,信创国产化显...
工程项目管理   1565  
  为什么项目管理通常仍然耗时且低效?您是否还在反复更新电子表格、淹没在便利贴中并参加每周更新会议?这确实是耗费时间和精力。借助软件工具的帮助,您可以一目了然地全面了解您的项目。如今,国内外有足够多优秀的项目管理软件可以帮助您掌控每个项目。什么是项目管理软件?项目管理软件是广泛行业用于项目规划、资源分配和调度的软件。它使项...
项目管理软件   1354  
  信创国产芯片作为信息技术创新的核心领域,对于推动国家自主可控生态建设具有至关重要的意义。在全球科技竞争日益激烈的背景下,实现信息技术的自主可控,摆脱对国外技术的依赖,已成为保障国家信息安全和产业可持续发展的关键。国产芯片作为信创产业的基石,其发展水平直接影响着整个信创生态的构建与完善。通过不断提升国产芯片的技术实力、产...
国产信创系统   21  
  信创生态建设旨在实现信息技术领域的自主创新和安全可控,涵盖了从硬件到软件的全产业链。随着数字化转型的加速,信创生态建设的重要性日益凸显,它不仅关乎国家的信息安全,更是推动产业升级和经济高质量发展的关键力量。然而,在推进信创生态建设的过程中,面临着诸多复杂且严峻的挑战,需要深入剖析并寻找切实可行的解决方案。技术创新难题技...
信创操作系统   27  
  信创产业作为国家信息技术创新发展的重要领域,对于保障国家信息安全、推动产业升级具有关键意义。而国产芯片作为信创产业的核心基石,其研发进展备受关注。在信创国产芯片的研发征程中,面临着诸多复杂且艰巨的难点,这些难点犹如一道道关卡,阻碍着国产芯片的快速发展。然而,科研人员和相关企业并未退缩,积极探索并提出了一系列切实可行的解...
国产化替代产品目录   28  
热门文章
项目管理软件有哪些?
云禅道AD
禅道项目管理软件

云端的项目管理软件

尊享禅道项目软件收费版功能

无需维护,随时随地协同办公

内置subversion和git源码管理

每天备份,随时转为私有部署

免费试用