使用 popen 和专用 TTY Python 运行交互式 Bash

2024-11-01 08:41:00
admin
原创
39
摘要:问题描述:我需要在 Python 中一个独立的进程中运行一个交互式 Bash 实例,它有自己的专用 TTY(我不能使用 pexpect)。我使用了我在类似程序中经常看到的代码片段:master, slave = pty.openpty() p = subprocess.Popen(["/b...

问题描述:

我需要在 Python 中一个独立的进程中运行一个交互式 Bash 实例,它有自己的专用 TTY(我不能使用 pexpect)。我使用了我在类似程序中经常看到的代码片段:

master, slave = pty.openpty()

p = subprocess.Popen(["/bin/bash", "-i"], stdin=slave, stdout=slave, stderr=slave)

os.close(slave)

x = os.read(master, 1026)

print x

subprocess.Popen.kill(p)
os.close(master)

但当我运行它时我得到以下输出:

$ ./pty_try.py
bash: cannot set terminal process group (10790): Inappropriate ioctl for device
bash: no job control in this shell

运行的strace显示一些错误:

...
readlink("/usr/bin/python2.7", 0x7ffc8db02510, 4096) = -1 EINVAL (Invalid argument)
...
ioctl(3, SNDCTL_TMR_TIMEBASE or SNDRV_TIMER_IOCTL_NEXT_DEVICE or TCGETS, 0x7ffc8db03590) = -1 ENOTTY (Inappropriate ioctl for device)
...
readlink("./pty_try.py", 0x7ffc8db00610, 4096) = -1 EINVAL (Invalid argument)

代码片段看起来很简单,Bash 没有得到它需要的东西吗?这里可能是什么问题?


解决方案 1:

这是在子进程中运行交互式命令的解决方案。它使用伪终端使 stdout 非阻塞(某些命令也需要 tty 设备,例如 bash)。它使用 select 来处理子进程的输入和输出。

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import os
import sys
import select
import termios
import tty
import pty
from subprocess import Popen

command = 'bash'
# command = 'docker run -it --rm centos /bin/bash'.split()

# save original tty setting then set it to raw mode
old_tty = termios.tcgetattr(sys.stdin)
tty.setraw(sys.stdin.fileno())

# open pseudo-terminal to interact with subprocess
master_fd, slave_fd = pty.openpty()


try:
    # use os.setsid() make it run in a new process group, or bash job control will not be enabled
    p = Popen(command,
              preexec_fn=os.setsid,
              stdin=slave_fd,
              stdout=slave_fd,
              stderr=slave_fd,
              universal_newlines=True)

    while p.poll() is None:
        r, w, e = select.select([sys.stdin, master_fd], [], [])
        if sys.stdin in r:
            d = os.read(sys.stdin.fileno(), 10240)
            os.write(master_fd, d)
        elif master_fd in r:
            o = os.read(master_fd, 10240)
            if o:
                os.write(sys.stdout.fileno(), o)
finally:
    # restore tty settings back
    termios.tcsetattr(sys.stdin, termios.TCSADRAIN, old_tty)

解决方案 2:

这是最终对我有用的解决方案(按照 qarma 的建议):

libc = ctypes.CDLL('libc.so.6')

master, slave = pty.openpty()
p = subprocess.Popen(["/bin/bash", "-i"], preexec_fn=libc.setsid, stdin=slave, stdout=slave, stderr=slave)
os.close(slave)

... do stuff here ...

x = os.read(master, 1026)
print x

解决方案 3:

这是一个完全面向对象的解决方案,使用线程和队列执行带有 TTY 的交互式 shell 命令,用于 stdout 和 stderr IO 处理。我花了一段时间从多个位置构建它,但它到目前为止在 Unix/Linux 系统上以及作为 Juniper 操作脚本的一部分运行良好。我想在这里发布它以节省其他人尝试构建类似东西的时间。

import pty
import re
import select
import threading
from datetime import datetime, timedelta
import os
import logging
import subprocess
import time
from queue import Queue, Empty

lib_logger = logging.getLogger("lib")

# Handler function to be run as a thread for pulling pty channels from an interactive shell
def _pty_handler(pty_master, logger, queue, stop):
    poller = select.poll()
    poller.register(pty_master, select.POLLIN)
    while True:
        # Stop handler if flagged
        if stop():
            logger.debug("Disabling pty handler for interactive shell")
            break

        fd_event = poller.poll(100)
        for descriptor, event in fd_event:
            # Read data from pipe and send to queue if there is data to read
            if event == select.POLLIN:
                data = os.read(descriptor, 1).decode("utf-8")
                if not data:
                    break
                # logger.debug("Reading in to handler queue: " + data)
                queue.put(data)
            # Exit handler if stdout is closing
            elif event == select.POLLHUP:
                logger.debug("Disabling pty handler for interactive shell")
                break


# Function for reading outputs from the given queue by draining it and returning the output
def _get_queue_output(queue: Queue) -> str:
    value = ""
    try:
        while True:
            value += queue.get_nowait()
    except Empty:
        return value


# Helper function to create the needed list for popen and print the command run to the logger
def popen_command(command, logger, *args):
    popen_list = list()
    popen_list.append(command)
    command_output = command
    for arg in args:
        popen_list.append(arg)
        command_output += " " + arg
    lib_logger.debug("Making Popen call using: " + str(popen_list))
    logger.debug("")
    logger.debug(command_output)
    logger.debug("")

    return popen_list


# Class for create an interactive shell and sending commands to it along with logging output to loggers
class InteractiveShell(object):
    def __init__(self, command, logger, *args):
        self.logger = logger
        self.command = command
        self.process = None
        self.popen_list = popen_command(command, logger, *args)
        self.master_stdout = None
        self.slave_stdout = None
        self.master_stderr = None
        self.slave_stderr = None
        self.stdout_handler = None
        self.stderr_handler = None
        self.stdout_queue = None
        self.stderr_queue = None
        self.stop_handlers = False

    # Open interactive shell and setup all threaded IO handlers
    def open(self, shell_prompt, timeout=DEVICE_TIMEOUT):
        # Create PTYs
        self.master_stdout, self.slave_stdout = pty.openpty()
        self.master_stderr, self.slave_stderr = pty.openpty()

        # Create shell subprocess
        self.process = subprocess.Popen(self.popen_list, stdin=self.slave_stdout, stdout=self.slave_stdout,
                                        stderr=self.slave_stderr, bufsize=0, start_new_session=True)

        lib_logger.debug("")
        lib_logger.debug("Started interactive shell for command " + self.command)
        lib_logger.debug("")

        # Create thread and queues for handling pty output and start them
        self.stdout_queue = Queue()
        self.stderr_queue = Queue()
        self.stdout_handler = threading.Thread(target=_pty_handler, args=(self.master_stdout,
                                                                          lib_logger,
                                                                          self.stdout_queue,
                                                                          lambda: self.stop_handlers))
        self.stderr_handler = threading.Thread(target=_pty_handler, args=(self.master_stderr,
                                                                          lib_logger,
                                                                          self.stderr_queue,
                                                                          lambda: self.stop_handlers))
        self.stdout_handler.daemon = True
        self.stderr_handler.daemon = True
        lib_logger.debug("Enabling stderr handler for interactive shell " + self.command)
        self.stderr_handler.start()
        lib_logger.debug("Enabling stdout handler for interactive shell " + self.command)
        self.stdout_handler.start()

        # Wait for shell prompt
        lib_logger.debug("Waiting for shell prompt: " + shell_prompt)
        return self.wait_for(shell_prompt, timeout)

    # Close interactive shell which should also kill all threaded IO handlers
    def close(self):
        # Wait 5 seconds before closing to let shell handle all input and outputs
        time.sleep(5)

        # Stop IO handler threads and terminate the process then wait another 5 seconds for cleanup to happen
        self.stop_handlers = True
        self.process.terminate()
        time.sleep(5)

        # Check for any additional output from the stdout handler
        output = ""
        while True:
            data = _get_queue_output(self.stdout_queue)
            if data != "":
                output += data
            else:
                break
        for line in iter(output.splitlines()):
            self.logger.debug(line)

        # Check for any additional output from the stderr handler
        output = ""
        while True:
            data = _get_queue_output(self.stderr_queue)
            if data != "":
                output += data
            else:
                break
        for line in iter(output.splitlines()):
            self.logger.error(line)

        # Cleanup PTYs
        os.close(self.master_stdout)
        os.close(self.master_stderr)
        os.close(self.slave_stdout)
        os.close(self.slave_stderr)

        lib_logger.debug("Interactive shell command " + self.command + " terminated")

    # Run series of commands given as a list of a list of commands and wait_for strings. If no wait_for is needed then
    # only provide the command. Return if all the commands completed successfully or not.
    # Ex:
    # [
    #     ["ssh jsas@" + vnf_ip, r"jsas@.*:"],
    #     ["juniper123", r"jsas@.*$"],
    #     ["sudo su", r".*jsas:"],
    #     ["juniper123", r"root@.*#"],
    #     ["usermod -p 'blah' jsas"]
    # ]
    def run_commands(self, commands_list):
        shell_status = True
        for command in commands_list:
            shell_status = self.run(command[0])
            if shell_status and len(command) == 2:
                shell_status = self.wait_for(command[1])

            # Break out of running commands if a command failed
            if not shell_status:
                break

        return shell_status

    # Run given command and return False if error occurs otherwise return True
    def run(self, command, sleep=0):
        # Check process to make sure it is still running and if not grab the stderr output
        if self.process.poll():
            self.logger.error("Interactive shell command " + self.command + " closed with return code: " +
                              self.process.returncode)
            data = _get_queue_output(self.stderr_queue)
            if data != "":
                self.logger.error("Interactive shell error messages:")
                for line in iter(data.splitlines()):
                    self.logger.error(line)
            return False

        # Write command to process and check to make sure a newline is in command otherwise add it
        if "
" not in command:
            command += "
"
        os.write(self.master_stdout, command.encode("utf-8"))
        if sleep:
            time.sleep(sleep)

        return True

    # Wait for specific regex expression in output before continuing return False if wait time expires otherwise return
    # True
    def wait_for(self, this, timeout=DEVICE_TIMEOUT):
        timeout = datetime.now() + timedelta(seconds=timeout)
        output = ""

        # Keep searching for output until timeout occurs
        while timeout > datetime.now():
            data = _get_queue_output(self.stdout_queue)
            if data != "":
                # Add to output line and check for match to regex given and if match then break and send output to
                # logger
                output += data
                lib_logger.debug("Checking for " + this + " in data: ")
                for line in iter(output.splitlines()):
                    lib_logger.debug(line)
                if re.search(r"{}s?$".format(this), output):
                    break
            time.sleep(1)

        # Send output to logger
        for line in iter(output.splitlines()):
            self.logger.debug(line)

        # If wait time expired print error message and return False
        if timeout < datetime.now():
            self.logger.error("Wait time expired when waiting for " + this)
            return False

        return True
相关推荐
  为什么项目管理通常仍然耗时且低效?您是否还在反复更新电子表格、淹没在便利贴中并参加每周更新会议?这确实是耗费时间和精力。借助软件工具的帮助,您可以一目了然地全面了解您的项目。如今,国内外有足够多优秀的项目管理软件可以帮助您掌控每个项目。什么是项目管理软件?项目管理软件是广泛行业用于项目规划、资源分配和调度的软件。它使项...
项目管理软件   601  
  华为IPD与传统研发模式的8大差异在快速变化的商业环境中,产品研发模式的选择直接决定了企业的市场响应速度和竞争力。华为作为全球领先的通信技术解决方案供应商,其成功在很大程度上得益于对产品研发模式的持续创新。华为引入并深度定制的集成产品开发(IPD)体系,相较于传统的研发模式,展现出了显著的差异和优势。本文将详细探讨华为...
IPD流程是谁发明的   7  
  如何通过IPD流程缩短产品上市时间?在快速变化的市场环境中,产品上市时间成为企业竞争力的关键因素之一。集成产品开发(IPD, Integrated Product Development)作为一种先进的产品研发管理方法,通过其结构化的流程设计和跨部门协作机制,显著缩短了产品上市时间,提高了市场响应速度。本文将深入探讨如...
华为IPD流程   9  
  在项目管理领域,IPD(Integrated Product Development,集成产品开发)流程图是连接创意、设计与市场成功的桥梁。它不仅是一个视觉工具,更是一种战略思维方式的体现,帮助团队高效协同,确保产品按时、按质、按量推向市场。尽管IPD流程图可能初看之下显得错综复杂,但只需掌握几个关键点,你便能轻松驾驭...
IPD开发流程管理   8  
  在项目管理领域,集成产品开发(IPD)流程被视为提升产品上市速度、增强团队协作与创新能力的重要工具。然而,尽管IPD流程拥有诸多优势,其实施过程中仍可能遭遇多种挑战,导致项目失败。本文旨在深入探讨八个常见的IPD流程失败原因,并提出相应的解决方法,以帮助项目管理者规避风险,确保项目成功。缺乏明确的项目目标与战略对齐IP...
IPD流程图   8  
热门文章
项目管理软件有哪些?
云禅道AD
禅道项目管理软件

云端的项目管理软件

尊享禅道项目软件收费版功能

无需维护,随时随地协同办公

内置subversion和git源码管理

每天备份,随时转为私有部署

免费试用