Python 中对 subprocess.PIPE 进行非阻塞读取

2024-11-20 08:44:00
admin
原创
7
摘要:问题描述:我正在使用subprocess 模块启动子进程并连接到其输出流(标准输出)。我希望能够在其标准输出上执行非阻塞读取。有没有办法使 .readline 非阻塞或在调用之前检查流中是否有数据.readline?我希望它是可移植的,或者至少可以在 Windows 和 Linux 下工作。以下是我目前的操作...

问题描述:

我正在使用subprocess 模块启动子进程并连接到其输出流(标准输出)。我希望能够在其标准输出上执行非阻塞读取。有没有办法使 .readline 非阻塞或在调用之前检查流中是否有数据.readline?我希望它是可移植的,或者至少可以在 Windows 和 Linux 下工作。

以下是我目前的操作方式(.readline如果没有可用数据,则会阻塞):

p = subprocess.Popen('myprogram.exe', stdout = subprocess.PIPE)
output_str = p.stdout.readline()

解决方案 1:

fcntl,,在这种情况下没有帮助selectasyncproc

无论操作系统如何,读取流而不阻塞的可靠方法是使用Queue.get_nowait()

import sys
from subprocess import PIPE, Popen
from threading  import Thread

try:
    from queue import Queue, Empty
except ImportError:
    from Queue import Queue, Empty  # python 2.x

ON_POSIX = 'posix' in sys.builtin_module_names

def enqueue_output(out, queue):
    for line in iter(out.readline, b''):
        queue.put(line)
    out.close()

p = Popen(['myprogram.exe'], stdout=PIPE, bufsize=1, close_fds=ON_POSIX)
q = Queue()
t = Thread(target=enqueue_output, args=(p.stdout, q))
t.daemon = True # thread dies with the program
t.start()

# ... do other things here

# read line without blocking
try:  line = q.get_nowait() # or q.get(timeout=.1)
except Empty:
    print('no output yet')
else: # got line
    # ... do something with line

解决方案 2:

我经常遇到类似的问题;我经常编写的 Python 程序需要能够执行一些主要功能,同时从命令行 (stdin) 接受用户输入。简单地将用户输入处理功能放在另一个线程中并不能解决问题,因为会readline()阻塞并且没有超时。如果主要功能已完成并且不再需要等待进一步的用户输入,我通常希望我的程序退出,但它不能,因为readline()它仍然阻塞在另一个线程中等待一行。我发现解决这个问题的一个方法是使用 fcntl 模块将 stdin 变成一个非阻塞文件:

import fcntl
import os
import sys

# make stdin a non-blocking file
fd = sys.stdin.fileno()
fl = fcntl.fcntl(fd, fcntl.F_GETFL)
fcntl.fcntl(fd, fcntl.F_SETFL, fl | os.O_NONBLOCK)

# user input handling thread
while mainThreadIsRunning:
      try: input = sys.stdin.readline()
      except: continue
      handleInput(input)

在我看来,这比使用选择或信号模块来解决这个问题要干净一些,但它只适用于 UNIX......

解决方案 3:

在类 Unix 系统和 Python 3.5+ 上,有os.set_blocking一个功能与它所说的完全一致。

import os
import time
import subprocess

cmd = 'python3', '-c', 'import time; [(print(i), time.sleep(1)) for i in range(5)]'
p = subprocess.Popen(cmd, stdout=subprocess.PIPE)
os.set_blocking(p.stdout.fileno(), False)
start = time.time()
while True:
    # first iteration always produces empty byte string in non-blocking mode
    for i in range(2):    
        line = p.stdout.readline()
        print(i, line)
        time.sleep(0.5)
    if time.time() > start + 5:
        break
p.terminate()

输出:

1 b''
2 b'0
'
1 b''
2 b'1
'
1 b''
2 b'2
'
1 b''
2 b'3
'
1 b''
2 b'4
'

附有os.set_blocking评论的是:

0 b'0
'
1 b'1
'
0 b'2
'
1 b'3
'
0 b'4
'
1 b''

解决方案 4:

Python 3.4为异步 IO引入了新的临时 API——asyncio模块。

该方法类似于twisted@Bryan Ward 的基于答案的方法——定义一个协议,并且在数据准备好后立即调用其方法:

#!/usr/bin/env python3
import asyncio
import os

class SubprocessProtocol(asyncio.SubprocessProtocol):
    def pipe_data_received(self, fd, data):
        if fd == 1: # got stdout data (bytes)
            print(data)

    def connection_lost(self, exc):
        loop.stop() # end loop.run_forever()

if os.name == 'nt':
    loop = asyncio.ProactorEventLoop() # for subprocess' pipes on Windows
    asyncio.set_event_loop(loop)
else:
    loop = asyncio.get_event_loop()
try:
    loop.run_until_complete(loop.subprocess_exec(SubprocessProtocol, 
        "myprogram.exe", "arg1", "arg2"))
    loop.run_forever()
finally:
    loop.close()

请参阅文档中的“子流程”。

有一个高级接口asyncio.create_subprocess_exec()返回Process对象,允许使用StreamReader.readline()协程异步读取一行
(使用async/ awaitPython 3.5+语法):

#!/usr/bin/env python3.5
import asyncio
import locale
import sys
from asyncio.subprocess import PIPE
from contextlib import closing

async def readline_and_kill(*args):
    # start child process
    process = await asyncio.create_subprocess_exec(*args, stdout=PIPE)

    # read line (sequence of bytes ending with b'
') asynchronously
    async for line in process.stdout:
        print("got line:", line.decode(locale.getpreferredencoding(False)))
        break
    process.kill()
    return await process.wait() # wait for the child process to exit


if sys.platform == "win32":
    loop = asyncio.ProactorEventLoop()
    asyncio.set_event_loop(loop)
else:
    loop = asyncio.get_event_loop()

with closing(loop):
    sys.exit(loop.run_until_complete(readline_and_kill(
        "myprogram.exe", "arg1", "arg2")))

readline_and_kill()执行以下任务:

  • 启动子进程,将其标准输出重定向到管道

  • 从子进程的标准输出异步读取一行

  • 终止子进程

  • 等待它退出

如果有必要,每个步骤可以通过超时秒数来限制。

解决方案 5:

尝试一下asyncproc模块。例如:

import os
from asyncproc import Process
myProc = Process("myprogram.app")

while True:
    # check to see if process has ended
    poll = myProc.wait(os.WNOHANG)
    if poll != None:
        break
    # print any new output
    out = myProc.read()
    if out != "":
        print out

该模块负责处理 S.Lott 建议的所有线程。

解决方案 6:

现代 Python 的情况好多了。

这是一个简单的子程序“hello.py”:

#!/usr/bin/env python3

while True:
    i = input()
    if i == "quit":
        break
    print(f"hello {i}")

以及与之交互的程序:

import asyncio


async def main():
    proc = await asyncio.subprocess.create_subprocess_exec(
        "./hello.py", stdin=asyncio.subprocess.PIPE, stdout=asyncio.subprocess.PIPE
    )
    proc.stdin.write(b"bob
")
    print(await proc.stdout.read(1024))
    proc.stdin.write(b"alice
")
    print(await proc.stdout.read(1024))
    proc.stdin.write(b"quit
")
    await proc.wait()


asyncio.run(main())

打印结果如下:

b'hello bob
'
b'hello alice
'

请注意,实际模式(几乎所有先前的答案,包括此处和相关问题)是将子进程的 stdout 文件描述符设置为非阻塞,然后在某种选择循环中对其进行轮询。当然,现在该循环由 asyncio 提供。

解决方案 7:

您可以在Twisted中轻松完成此操作。根据您现有的代码库,这可能不是那么容易使用,但如果您正在构建一个 twisted 应用程序,那么这样的事情就变得几乎微不足道了。您创建一个ProcessProtocol类,然后重写该outReceived()方法。Twisted(取决于所使用的反应器)通常只是一个select()安装了回调的大循环,用于处理来自不同文件描述符(通常是网络套接字)的数据。因此该outReceived()方法只是安装了一个回调来处理来自的数据STDOUT。演示此行为的简单示例如下:

from twisted.internet import protocol, reactor

class MyProcessProtocol(protocol.ProcessProtocol):

    def outReceived(self, data):
        print data

proc = MyProcessProtocol()
reactor.spawnProcess(proc, './myprogram', ['./myprogram', 'arg1', 'arg2', 'arg3'])
reactor.run()

Twisted 文档对此有一些很好的信息。

如果您围绕 Twisted 构建整个应用程序,那么与其他进程(本地或远程)的异步通信就会变得非常优雅。另一方面,如果您的程序不是基于 Twisted 构建的,那么这实际上不会有什么帮助。希望这对其他读者有所帮助,即使它不适用于您的特定应用程序。

解决方案 8:

使用选择&读取(1)。

import subprocess     #no new requirements
def readAllSoFar(proc, retVal=''): 
  while (select.select([proc.stdout],[],[],0)[0]!=[]):   
    retVal+=proc.stdout.read(1)
  return retVal
p = subprocess.Popen(['/bin/ls'], stdout=subprocess.PIPE)
while not p.poll():
  print (readAllSoFar(p))

对于类似 readline() 的情况:

lines = ['']
while not p.poll():
  lines = readAllSoFar(p, lines[-1]).split('
')
  for a in range(len(lines)-1):
    print a
lines = readAllSoFar(p, lines[-1]).split('
')
for a in range(len(lines)-1):
  print a

解决方案 9:

这是一个基于线程的简单解决方案:

  • 适用于 Linux 和 Windows(不依赖于select)。

  • stdout异步读取stderr

  • 不依赖于具有任意等待时间的主动轮询(CPU 友好)。

  • 不使用asyncio(可能会与其他库冲突)。

  • 运行直到子进程终止。

打印机.py

import time
import sys

sys.stdout.write("Hello
")
sys.stdout.flush()
time.sleep(1)
sys.stdout.write("World!
")
sys.stdout.flush()
time.sleep(1)
sys.stderr.write("That's an error
")
sys.stderr.flush()
time.sleep(2)
sys.stdout.write("Actually, I'm fine
")
sys.stdout.flush()
time.sleep(1)

读者

import queue
import subprocess
import sys
import threading


def enqueue_stream(stream, queue, type):
    for line in iter(stream.readline, b''):
        queue.put(str(type) + line.decode('utf-8'))
    stream.close()


def enqueue_process(process, queue):
    process.wait()
    queue.put('x')


p = subprocess.Popen('python printer.py', stdout=subprocess.PIPE, stderr=subprocess.PIPE)
q = queue.Queue()
to = threading.Thread(target=enqueue_stream, args=(p.stdout, q, 1))
te = threading.Thread(target=enqueue_stream, args=(p.stderr, q, 2))
tp = threading.Thread(target=enqueue_process, args=(p, q))
te.start()
to.start()
tp.start()

while True:
    line = q.get()
    if line[0] == 'x':
        break
    if line[0] == '2':  # stderr
        sys.stdout.write("")  # ANSI red color
    sys.stdout.write(line[1:])
    if line[0] == '2':
        sys.stdout.write("")  # reset ANSI code
    sys.stdout.flush()

tp.join()
to.join()
te.join()

解决方案 10:

这是我的代码,用于尽快捕获子进程的每个输出,包括部分行。它同时泵送,并且 stdout 和 stderr 几乎按正确的顺序进行。

经过测试,在 Python 2.7 Linux 和 Windows 上可以正确运行。

#!/usr/bin/python
#
# Runner with stdout/stderr catcher
#
from sys import argv
from subprocess import Popen, PIPE
import os, io
from threading import Thread
import Queue
def __main__():
    if (len(argv) > 1) and (argv[-1] == "-sub-"):
        import time, sys
        print "Application runned!"
        time.sleep(2)
        print "Slept 2 second"
        time.sleep(1)
        print "Slept 1 additional second",
        time.sleep(2)
        sys.stderr.write("Stderr output after 5 seconds")
        print "Eol on stdin"
        sys.stderr.write("Eol on stderr
")
        time.sleep(1)
        print "Wow, we have end of work!",
    else:
        os.environ["PYTHONUNBUFFERED"]="1"
        try:
            p = Popen( argv + ["-sub-"],
                       bufsize=0, # line-buffered
                       stdin=PIPE, stdout=PIPE, stderr=PIPE )
        except WindowsError, W:
            if W.winerror==193:
                p = Popen( argv + ["-sub-"],
                           shell=True, # Try to run via shell
                           bufsize=0, # line-buffered
                           stdin=PIPE, stdout=PIPE, stderr=PIPE )
            else:
                raise
        inp = Queue.Queue()
        sout = io.open(p.stdout.fileno(), 'rb', closefd=False)
        serr = io.open(p.stderr.fileno(), 'rb', closefd=False)
        def Pump(stream, category):
            queue = Queue.Queue()
            def rdr():
                while True:
                    buf = stream.read1(8192)
                    if len(buf)>0:
                        queue.put( buf )
                    else:
                        queue.put( None )
                        return
            def clct():
                active = True
                while active:
                    r = queue.get()
                    try:
                        while True:
                            r1 = queue.get(timeout=0.005)
                            if r1 is None:
                                active = False
                                break
                            else:
                                r += r1
                    except Queue.Empty:
                        pass
                    inp.put( (category, r) )
            for tgt in [rdr, clct]:
                th = Thread(target=tgt)
                th.setDaemon(True)
                th.start()
        Pump(sout, 'stdout')
        Pump(serr, 'stderr')

        while p.poll() is None:
            # App still working
            try:
                chan,line = inp.get(timeout = 1.0)
                if chan=='stdout':
                    print "STDOUT>>", line, "<?<"
                elif chan=='stderr':
                    print " ERROR==", line, "=?="
            except Queue.Empty:
                pass
        print "Finish"

if __name__ == '__main__':
    __main__()

解决方案 11:

一个解决方案是让另一个进程执行对该进程的读取,或者让该进程的线程具有超时限制。

这是超时函数的线程版本:

http://code.activestate.com/recipes/473878/

但是,您需要在标准输出传入时读取它吗?另一个解决方案可能是将输出转储到文件并使用p.wait()等待该过程完成。

f = open('myprogram_output.txt','w')
p = subprocess.Popen('myprogram.exe', stdout=f)
p.wait()
f.close()


str = open('myprogram_output.txt','r').read()

解决方案 12:

免责声明:这仅适用于 Torno

您可以通过将 fd 设置为非阻塞,然后使用 ioloop 注册回调来实现这一点。我已将其打包在名为tornado_subprocess的 egg 中,您可以通过 PyPI 安装它:

easy_install tornado_subprocess

现在你可以做这样的事情:

import tornado_subprocess
import tornado.ioloop

    def print_res( status, stdout, stderr ) :
    print status, stdout, stderr
    if status == 0:
        print "OK:"
        print stdout
    else:
        print "ERROR:"
        print stderr

t = tornado_subprocess.Subprocess( print_res, timeout=30, args=[ "cat", "/etc/passwd" ] )
t.start()
tornado.ioloop.IOLoop.instance().start()

你也可以将它与 RequestHandler 一起使用

class MyHandler(tornado.web.RequestHandler):
    def on_done(self, status, stdout, stderr):
        self.write( stdout )
        self.finish()

    @tornado.web.asynchronous
    def get(self):
        t = tornado_subprocess.Subprocess( self.on_done, timeout=30, args=[ "cat", "/etc/passwd" ] )
        t.start()

解决方案 13:

现有的解决方案对我来说不起作用(详情如下)。最终有效的是使用 read(1) 实现 readline(基于此答案)。后者不会阻塞:

from subprocess import Popen, PIPE
from threading import Thread
def process_output(myprocess): #output-consuming thread
    nextline = None
    buf = ''
    while True:
        #--- extract line using read(1)
        out = myprocess.stdout.read(1)
        if out == '' and myprocess.poll() != None: break
        if out != '':
            buf += out
            if out == '
':
                nextline = buf
                buf = ''
        if not nextline: continue
        line = nextline
        nextline = None

        #--- do whatever you want with line here
        print 'Line is:', line
    myprocess.stdout.close()

myprocess = Popen('myprogram.exe', stdout=PIPE) #output-producing process
p1 = Thread(target=process_output, args=(myprocess,)) #output-consuming thread
p1.daemon = True
p1.start()

#--- do whatever here and then kill process and thread if needed
if myprocess.poll() == None: #kill process; will automatically stop thread
    myprocess.kill()
    myprocess.wait()
if p1 and p1.is_alive(): #wait for thread to finish
    p1.join()

现有解决方案为何不起作用:

  1. 需要 readline 的解决方案(包括基于队列的解决方案)总是会阻塞。很难(不可能?)杀死执行 readline 的线程。只有当创建它的进程完成时,它才会被杀死,而不会在输出生成进程被杀死时被杀死。

  2. 正如 anonnn 指出的那样,将低级 fcntl 与高级 readline 调用混合可能无法正常工作。

  3. 使用 select.poll() 很简洁,但根据 python 文档,它在 Windows 上不起作用。

  4. 对于这项任务来说,使用第三方库似乎有些过度,而且会增加额外的依赖性。

解决方案 14:

我添加了这个问题来读取一些 subprocess.Popen stdout。这是我的非阻塞读取解决方案:

import fcntl

def non_block_read(output):
    fd = output.fileno()
    fl = fcntl.fcntl(fd, fcntl.F_GETFL)
    fcntl.fcntl(fd, fcntl.F_SETFL, fl | os.O_NONBLOCK)
    try:
        return output.read()
    except:
        return ""

# Use example
from subprocess import *
sb = Popen("echo test && sleep 1000", shell=True, stdout=PIPE)
sb.kill()

# sb.stdout.read() # <-- This will block
non_block_read(sb.stdout)
'test
'

解决方案 15:

此版本的非阻塞读取不需要特殊模块,并且可以在大多数 Linux 发行版上开箱即用。

import os
import sys
import time
import fcntl
import subprocess

def async_read(fd):
    # set non-blocking flag while preserving old flags
    fl = fcntl.fcntl(fd, fcntl.F_GETFL)
    fcntl.fcntl(fd, fcntl.F_SETFL, fl | os.O_NONBLOCK)
    # read char until EOF hit
    while True:
        try:
            ch = os.read(fd.fileno(), 1)
            # EOF
            if not ch: break                                                                                                                                                              
            sys.stdout.write(ch)
        except OSError:
            # waiting for data be available on fd
            pass

def shell(args, async=True):
    # merge stderr and stdout
    proc = subprocess.Popen(args, shell=False, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
    if async: async_read(proc.stdout)
    sout, serr = proc.communicate()
    return (sout, serr)

if __name__ == '__main__':
    cmd = 'ping 8.8.8.8'
    sout, serr = shell(cmd.split())

解决方案 16:

这不是第一个,也可能不是最后一个,我构建了一个使用两种不同方法执行非阻塞 stdout PIPE 读取的包,一种方法是基于 JF Sebastian (@jfs) 的回答的工作,另一种是一个简单的communication()循环,带有一个线程来检查超时。

两种 stdout 捕获方法均经过测试,可在 Linux 和 Windows 下使用,截至撰写本文时,Python 版本为 2.7 至 3.9

由于它是非阻塞的,因此即使有多个子进程和孙进程,甚至在 Python 2.7 下,它也能保证超时执行。

该包还处理字节和文本标准输出编码,当尝试捕获 EOF 时,这将是一场噩梦。

您可以在https://github.com/netinvent/command_runner找到该软件包

如果您需要一些经过充分测试的非阻塞读取实现,请尝试一下(或破解代码):

pip install command_runner

from command_runner import command_runner

exit_code, output = command_runner('ping 127.0.0.1', timeout=3)
exit_code, output = command_runner('echo hello world, shell=True)
exit_code, output = command_runner('some command', stdout='some_file')

_poll_process()您可以在或中找到核心非阻塞读取代码_monitor_process(),具体取决于所采用的捕获方法。从那里,您可以按照自己的方式破解所需的内容,或者简单地使用整个包作为子进程替换来执行命令。

解决方案 17:

我遇到了原始提问者的问题,但不想调用线程。我将 Jesse 的解决方案与read()管道直接连接以及我自己的缓冲区处理程序混合在一起以进行行读取(但是,我的子进程 - ping - 总是写入完整的行 < 系统页面大小)。我通过仅在 gobject 注册的 io 监视中读取来避免忙等待。这些天我通常在 gobject MainLoop 中运行代码以避免线程。

def set_up_ping(ip, w):
    # run the sub-process
    # watch the resultant pipe
    p = subprocess.Popen(['/bin/ping', ip], stdout=subprocess.PIPE)
    # make stdout a non-blocking file
    fl = fcntl.fcntl(p.stdout, fcntl.F_GETFL)
    fcntl.fcntl(p.stdout, fcntl.F_SETFL, fl | os.O_NONBLOCK)
    stdout_gid = gobject.io_add_watch(p.stdout, gobject.IO_IN, w)
    return stdout_gid # for shutting down

观察者是

def watch(f, *other):
    print 'reading',f.read()
    return True

主程序建立一个ping然后调用gobject邮件循环。

def main():
    set_up_ping('192.168.1.8', watch)
    # discard gid as unused here
    gobject.MainLoop().run()

任何其他工作都附加到 gobject 中的回调。

解决方案 18:

选择模块帮助您确定下一个有用的输入在哪里。

但是,使用单独的线程几乎总是更令人高兴。一个线程执行阻塞读取 stdin 的操作,另一个线程执行您不想阻塞的操作。

解决方案 19:

就我而言,我需要一个日志模块来捕获后台应用程序的输出并对其进行增强(添加时间戳、颜色等)。

我最终得到了一个执行实际 I/O 的后台线程。以下代码仅适用于 POSIX 平台。我删除了不必要的部分。

如果有人打算长期使用这款机器,请考虑管理开放描述符。就我而言,这不是什么大问题。

# -*- python -*-
import fcntl
import threading
import sys, os, errno
import subprocess

class Logger(threading.Thread):
    def __init__(self, *modules):
        threading.Thread.__init__(self)
        try:
            from select import epoll, EPOLLIN
            self.__poll = epoll()
            self.__evt = EPOLLIN
            self.__to = -1
        except:
            from select import poll, POLLIN
            print 'epoll is not available'
            self.__poll = poll()
            self.__evt = POLLIN
            self.__to = 100
        self.__fds = {}
        self.daemon = True
        self.start()

    def run(self):
        while True:
            events = self.__poll.poll(self.__to)
            for fd, ev in events:
                if (ev&self.__evt) != self.__evt:
                    continue
                try:
                    self.__fds[fd].run()
                except Exception, e:
                    print e

    def add(self, fd, log):
        assert not self.__fds.has_key(fd)
        self.__fds[fd] = log
        self.__poll.register(fd, self.__evt)

class log:
    logger = Logger()

    def __init__(self, name):
        self.__name = name
        self.__piped = False

    def fileno(self):
        if self.__piped:
            return self.write
        self.read, self.write = os.pipe()
        fl = fcntl.fcntl(self.read, fcntl.F_GETFL)
        fcntl.fcntl(self.read, fcntl.F_SETFL, fl | os.O_NONBLOCK)
        self.fdRead = os.fdopen(self.read)
        self.logger.add(self.read, self)
        self.__piped = True
        return self.write

    def __run(self, line):
        self.chat(line, nl=False)

    def run(self):
        while True:
            try: line = self.fdRead.readline()
            except IOError, exc:
                if exc.errno == errno.EAGAIN:
                    return
                raise
            self.__run(line)

    def chat(self, line, nl=True):
        if nl: nl = '
'
        else: nl = ''
        sys.stdout.write('[%s] %s%s' % (self.__name, line, nl))

def system(command, param=[], cwd=None, env=None, input=None, output=None):
    args = [command] + param
    p = subprocess.Popen(args, cwd=cwd, stdout=output, stderr=output, stdin=input, env=env, bufsize=0)
    p.wait()

ls = log('ls')
ls.chat('go')
system("ls", ['-l', '/'], output=ls)

date = log('date')
date.chat('go')
system("date", output=date)

解决方案 20:

在这里添加这个答案,因为它提供了在 Windows 和 Unix 上设置非阻塞管道的能力。

所有ctypes细节均感谢@techtonik 的回答。

有一个稍微修改过的版本可以在 Unix 和 Windows 系统上使用。

  • 兼容 Python3 (仅需进行微小改动)

  • 包括 posix 版本,并定义了用于其中任一的异常。

这样,您就可以对 Unix 和 Windows 代码使用相同的函数和异常。

# pipe_non_blocking.py (module)
"""
Example use:

    p = subprocess.Popen(
            command,
            stdout=subprocess.PIPE,
            )

    pipe_non_blocking_set(p.stdout.fileno())

    try:
        data = os.read(p.stdout.fileno(), 1)
    except PortableBlockingIOError as ex:
        if not pipe_non_blocking_is_error_blocking(ex):
            raise ex
"""


__all__ = (
    "pipe_non_blocking_set",
    "pipe_non_blocking_is_error_blocking",
    "PortableBlockingIOError",
    )

import os


if os.name == "nt":
    def pipe_non_blocking_set(fd):
        # Constant could define globally but avoid polluting the name-space
        # thanks to: https://stackoverflow.com/questions/34504970
        import msvcrt

        from ctypes import windll, byref, wintypes, WinError, POINTER
        from ctypes.wintypes import HANDLE, DWORD, BOOL

        LPDWORD = POINTER(DWORD)

        PIPE_NOWAIT = wintypes.DWORD(0x00000001)

        def pipe_no_wait(pipefd):
            SetNamedPipeHandleState = windll.kernel32.SetNamedPipeHandleState
            SetNamedPipeHandleState.argtypes = [HANDLE, LPDWORD, LPDWORD, LPDWORD]
            SetNamedPipeHandleState.restype = BOOL

            h = msvcrt.get_osfhandle(pipefd)

            res = windll.kernel32.SetNamedPipeHandleState(h, byref(PIPE_NOWAIT), None, None)
            if res == 0:
                print(WinError())
                return False
            return True

        return pipe_no_wait(fd)

    def pipe_non_blocking_is_error_blocking(ex):
        if not isinstance(ex, PortableBlockingIOError):
            return False
        from ctypes import GetLastError
        ERROR_NO_DATA = 232

        return (GetLastError() == ERROR_NO_DATA)

    PortableBlockingIOError = OSError
else:
    def pipe_non_blocking_set(fd):
        import fcntl
        fl = fcntl.fcntl(fd, fcntl.F_GETFL)
        fcntl.fcntl(fd, fcntl.F_SETFL, fl | os.O_NONBLOCK)
        return True

    def pipe_non_blocking_is_error_blocking(ex):
        if not isinstance(ex, PortableBlockingIOError):
            return False
        return True

    PortableBlockingIOError = BlockingIOError

为了避免读取不完整的数据,我最终编写了自己的 readline 生成器(它返回每行的字节字符串)。

它是一个发电机,因此您可以例如......

def non_blocking_readlines(f, chunk=1024):
    """
    Iterate over lines, yielding b'' when nothings left
    or when new data is not yet available.

    stdout_iter = iter(non_blocking_readlines(process.stdout))

    line = next(stdout_iter)  # will be a line or b''.
    """
    import os

    from .pipe_non_blocking import (
            pipe_non_blocking_set,
            pipe_non_blocking_is_error_blocking,
            PortableBlockingIOError,
            )

    fd = f.fileno()
    pipe_non_blocking_set(fd)

    blocks = []

    while True:
        try:
            data = os.read(fd, chunk)
            if not data:
                # case were reading finishes with no trailing newline
                yield b''.join(blocks)
                blocks.clear()
        except PortableBlockingIOError as ex:
            if not pipe_non_blocking_is_error_blocking(ex):
                raise ex

            yield b''
            continue

        while True:
            n = data.find(b'
')
            if n == -1:
                break

            yield b''.join(blocks) + data[:n + 1]
            data = data[n + 1:]
            blocks.clear()
        blocks.append(data)

解决方案 21:

这是在子进程中运行交互式命令的示例,stdout 使用伪终端进行交互。您可以参考: https: //stackoverflow.com/a/43012138/3555925

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import os
import sys
import select
import termios
import tty
import pty
from subprocess import Popen

command = 'bash'
# command = 'docker run -it --rm centos /bin/bash'.split()

# save original tty setting then set it to raw mode
old_tty = termios.tcgetattr(sys.stdin)
tty.setraw(sys.stdin.fileno())

# open pseudo-terminal to interact with subprocess
master_fd, slave_fd = pty.openpty()

# use os.setsid() make it run in a new process group, or bash job control will not be enabled
p = Popen(command,
          preexec_fn=os.setsid,
          stdin=slave_fd,
          stdout=slave_fd,
          stderr=slave_fd,
          universal_newlines=True)

while p.poll() is None:
    r, w, e = select.select([sys.stdin, master_fd], [], [])
    if sys.stdin in r:
        d = os.read(sys.stdin.fileno(), 10240)
        os.write(master_fd, d)
    elif master_fd in r:
        o = os.read(master_fd, 10240)
        if o:
            os.write(sys.stdout.fileno(), o)

# restore tty settings back
termios.tcsetattr(sys.stdin, termios.TCSADRAIN, old_tty)

解决方案 22:

我的问题有点不同,因为我想从正在运行的进程中收集 stdout 和 stderr,但最终是一样的,因为我想在生成的小部件中呈现输出。

我不想采用使用队列或附加线程的许多提议的解决方法,因为它们对于执行诸如运行另一个脚本并收集其输出这样的常见任务不是必要的。

在阅读了建议的解决方案和 Python 文档后,我通过以下实现解决了我的问题。是的,它仅适用于 POSIX,因为我正在使用select函数调用。

我同意这些文档令人困惑,而且对于如此常见的脚本任务,实现起来很尴尬。我相信旧版本的 Python 有不同的默认值Popen和不同的解释,因此造成了很多混乱。这似乎对 Python 2.7.12 和 3.5.2 都很好用。

关键是设置bufsize=1行缓冲,然后universal_newlines=True将其作为文本文件而不是二进制文件进行处理,这似乎成为设置时的默认设置bufsize=1

class workerThread(QThread):
   def __init__(self, cmd):
      QThread.__init__(self)
      self.cmd = cmd
      self.result = None           ## return code
      self.error = None            ## flag indicates an error
      self.errorstr = ""           ## info message about the error

   def __del__(self):
      self.wait()
      DEBUG("Thread removed")

   def run(self):
      cmd_list = self.cmd.split(" ")   
      try:
         cmd = subprocess.Popen(cmd_list, bufsize=1, stdin=None
                                        , universal_newlines=True
                                        , stderr=subprocess.PIPE
                                        , stdout=subprocess.PIPE)
      except OSError:
         self.error = 1
         self.errorstr = "Failed to execute " + self.cmd
         ERROR(self.errorstr)
      finally:
         VERBOSE("task started...")
      import select
      while True:
         try:
            r,w,x = select.select([cmd.stdout, cmd.stderr],[],[])
            if cmd.stderr in r:
               line = cmd.stderr.readline()
               if line != "":
                  line = line.strip()
                  self.emit(SIGNAL("update_error(QString)"), line)
            if cmd.stdout in r:
               line = cmd.stdout.readline()
               if line == "":
                  break
               line = line.strip()
               self.emit(SIGNAL("update_output(QString)"), line)
         except IOError:
            pass
      cmd.wait()
      self.result = cmd.returncode
      if self.result < 0:
         self.error = 1
         self.errorstr = "Task terminated by signal " + str(self.result)
         ERROR(self.errorstr)
         return
      if self.result:
         self.error = 1
         self.errorstr = "exit code " + str(self.result)
         ERROR(self.errorstr)
         return
      return

ERROR、DEBUG 和 VERBOSE 只是将输出打印到终端的宏。

我认为这个解决方案的有效性有 99.99%,因为它仍然使用阻塞readline功能,所以我们假设子进程很好并且输出完整的行。

由于我对 Python 还不熟悉,因此我欢迎反馈以改进解决方案。

解决方案 23:

尝试wexpect ,它是pexpect的 Windows 替代品。

import wexpect

p = wexpect.spawn('myprogram.exe')
p.stdout.readline('.')               // regex pattern of any character
output_str = p.after()

解决方案 24:

这个是纯 Python 的,易于理解并且运行良好:

import subprocess
import sys
import threading

# Function to read and print stdout of the subprocess
def readstdout():
    for l in iter(p.stdout.readline, b""):
        sys.stdout.write(f'{l.decode("utf-8", "backslashreplace")}
')

# Function to read and print stderr of the subprocess
def readstderr():
    for l in iter(p.stderr.readline, b""):
        sys.stderr.write(f'{l.decode("utf-8", "backslashreplace")}
')

# Function to send a command to the subprocess
def sendcommand(cmd):
    p.stdin.write(cmd.encode() + b"
")
    p.stdin.flush()

# Create a subprocess
p = subprocess.Popen(
    "adb.exe -s 127.0.0.1:5555 shell",
    stdout=subprocess.PIPE,
    stdin=subprocess.PIPE,
    stderr=subprocess.PIPE,
)

# Create two threads to read and print stdout and stderr concurrently
t1 = threading.Thread(target=readstdout)
t2 = threading.Thread(target=readstderr)

# Start the threads to capture and print the subprocess output
t1.start()
t2.start()

# Send a command to the subprocess
sendcommand("ls")

解决方案 25:

我已经基于JF Sebastian 的解决方案创建了一个库。您可以使用它。

https://github.com/cenkalti/what

解决方案 26:

根据 JF Sebastian 的回答和其他一些来源,我整理了一个简单的子进程管理器。它提供请求非阻塞读取,以及并行运行多个进程。它不使用任何特定于操作系统的调用(据我所知),因此应该可以在任何地方工作。

它可从 pypi 获得,因此只需pip install shelljob。请参阅项目页面以获取示例和完整文档。

解决方案 27:

编辑:此实现仍然阻塞。请改用 JFSebastian 的答案。

我尝试了最佳答案,但是线程代码的额外风险和维护令人担忧。

通过查看io 模块(仅限于 2.6),我找到了 BufferedReader。这是我的无线程、非阻塞解决方案。

import io
from subprocess import PIPE, Popen

p = Popen(['myprogram.exe'], stdout=PIPE)

SLEEP_DELAY = 0.001

# Create an io.BufferedReader on the file descriptor for stdout
with io.open(p.stdout.fileno(), 'rb', closefd=False) as buffer:
  while p.poll() == None:
      time.sleep(SLEEP_DELAY)
      while '
' in bufferedStdout.peek(bufferedStdout.buffer_size):
          line = buffer.readline()
          # do stuff with the line

  # Handle any remaining output after the process has ended
  while buffer.peek():
    line = buffer.readline()
    # do stuff with the line

解决方案 28:

为什么要费心线程和队列?与 readline() 不同,BufferedReader.read1() 不会阻塞等待 \r\n,如果有任何输出进入,它会尽快返回。

#!/usr/bin/python
from subprocess import Popen, PIPE, STDOUT
import io

def __main__():
    try:
        p = Popen( ["ping", "-n", "3", "127.0.0.1"], stdin=PIPE, stdout=PIPE, stderr=STDOUT )
    except: print("Popen failed"); quit()
    sout = io.open(p.stdout.fileno(), 'rb', closefd=False)
    while True:
        buf = sout.read1(1024)
        if len(buf) == 0: break
        print buf,

if __name__ == '__main__':
    __main__()

解决方案 29:

此解决方案使用select模块从 IO 流中“读取任何可用数据”。此功能最初会阻塞,直到有可用数据,但之后只会读取可用的数据,不会进一步阻塞。

鉴于它使用了select模块,它只在 Unix 上有效。

该代码完全符合 PEP8 标准。

import select


def read_available(input_stream, max_bytes=None):
    """
    Blocks until any data is available, then all available data is then read and returned.
    This function returns an empty string when end of stream is reached.

    Args:
        input_stream: The stream to read from.
        max_bytes (int|None): The maximum number of bytes to read. This function may return fewer bytes than this.

    Returns:
        str
    """
    # Prepare local variables
    input_streams = [input_stream]
    empty_list = []
    read_buffer = ""

    # Initially block for input using 'select'
    if len(select.select(input_streams, empty_list, empty_list)[0]) > 0:

        # Poll read-readiness using 'select'
        def select_func():
            return len(select.select(input_streams, empty_list, empty_list, 0)[0]) > 0

        # Create while function based on parameters
        if max_bytes is not None:
            def while_func():
                return (len(read_buffer) < max_bytes) and select_func()
        else:
            while_func = select_func

        while True:
            # Read single byte at a time
            read_data = input_stream.read(1)
            if len(read_data) == 0:
                # End of stream
                break
            # Append byte to string buffer
            read_buffer += read_data
            # Check if more data is available
            if not while_func():
                break

    # Return read buffer
    return read_buffer

解决方案 30:

我也遇到了Jesse描述的问题,并通过像Bradley、Andy和其他人一样使用“select”解决了该问题,但采用阻塞模式以避免繁忙循环。它使用虚拟管道作为假标准输入。select 阻塞并等待标准输入或管道准备就绪。按下键时,标准输入会解除对 select 的阻塞,并且可以使用 read(1) 检索键值。当不同的线程写入管道时,管道会解除对 select 的阻塞,这可以视为对标准输入的需求已经结束的迹象。以下是一些参考代码:

import sys
import os
from select import select

# -------------------------------------------------------------------------    
# Set the pipe (fake stdin) to simulate a final key stroke
# which will unblock the select statement
readEnd, writeEnd = os.pipe()
readFile = os.fdopen(readEnd)
writeFile = os.fdopen(writeEnd, "w")

# -------------------------------------------------------------------------
def getKey():

    # Wait for stdin or pipe (fake stdin) to be ready
    dr,dw,de = select([sys.__stdin__, readFile], [], [])

    # If stdin is the one ready then read it and return value
    if sys.__stdin__ in dr:
        return sys.__stdin__.read(1)   # For Windows use ----> getch() from module msvcrt

    # Must finish
    else:
        return None

# -------------------------------------------------------------------------
def breakStdinRead():
    writeFile.write(' ')
    writeFile.flush()

# -------------------------------------------------------------------------
# MAIN CODE

# Get key stroke
key = getKey()

# Keyboard input
if key:
    # ... do your stuff with the key value

# Faked keystroke
else:
    # ... use of stdin finished

# -------------------------------------------------------------------------
# OTHER THREAD CODE

breakStdinRead()
相关推荐
  为什么项目管理通常仍然耗时且低效?您是否还在反复更新电子表格、淹没在便利贴中并参加每周更新会议?这确实是耗费时间和精力。借助软件工具的帮助,您可以一目了然地全面了解您的项目。如今,国内外有足够多优秀的项目管理软件可以帮助您掌控每个项目。什么是项目管理软件?项目管理软件是广泛行业用于项目规划、资源分配和调度的软件。它使项...
项目管理软件   601  
  华为IPD与传统研发模式的8大差异在快速变化的商业环境中,产品研发模式的选择直接决定了企业的市场响应速度和竞争力。华为作为全球领先的通信技术解决方案供应商,其成功在很大程度上得益于对产品研发模式的持续创新。华为引入并深度定制的集成产品开发(IPD)体系,相较于传统的研发模式,展现出了显著的差异和优势。本文将详细探讨华为...
IPD流程是谁发明的   7  
  如何通过IPD流程缩短产品上市时间?在快速变化的市场环境中,产品上市时间成为企业竞争力的关键因素之一。集成产品开发(IPD, Integrated Product Development)作为一种先进的产品研发管理方法,通过其结构化的流程设计和跨部门协作机制,显著缩短了产品上市时间,提高了市场响应速度。本文将深入探讨如...
华为IPD流程   9  
  在项目管理领域,IPD(Integrated Product Development,集成产品开发)流程图是连接创意、设计与市场成功的桥梁。它不仅是一个视觉工具,更是一种战略思维方式的体现,帮助团队高效协同,确保产品按时、按质、按量推向市场。尽管IPD流程图可能初看之下显得错综复杂,但只需掌握几个关键点,你便能轻松驾驭...
IPD开发流程管理   8  
  在项目管理领域,集成产品开发(IPD)流程被视为提升产品上市速度、增强团队协作与创新能力的重要工具。然而,尽管IPD流程拥有诸多优势,其实施过程中仍可能遭遇多种挑战,导致项目失败。本文旨在深入探讨八个常见的IPD流程失败原因,并提出相应的解决方法,以帮助项目管理者规避风险,确保项目成功。缺乏明确的项目目标与战略对齐IP...
IPD流程图   8  
热门文章
项目管理软件有哪些?
云禅道AD
禅道项目管理软件

云端的项目管理软件

尊享禅道项目软件收费版功能

无需维护,随时随地协同办公

内置subversion和git源码管理

每天备份,随时转为私有部署

免费试用