使用子进程获取实时输出[重复]

2024-12-24 08:56:00
admin
原创
84
摘要:问题描述:我正在尝试为命令行程序 (svnadmin verify) 编写一个包装器脚本,该脚本将显示操作的良好进度指示器。这要求我能够在输出时立即看到包装程序的每一行输出。我认为我只需使用 执行程序subprocess.Popen,stdout=PIPE然后读取每一行并相应地执行操作即可。但是,当我运行以下...

问题描述:

我正在尝试为命令行程序 (svnadmin verify) 编写一个包装器脚本,该脚本将显示操作的良好进度指示器。这要求我能够在输出时立即看到包装程序的每一行输出。

我认为我只需使用 执行程序subprocess.Popenstdout=PIPE然后读取每一行并相应地执行操作即可。但是,当我运行以下代码时,输​​出似乎被缓冲在某处,导致它出现在两个块中,即第 1 行到第 332 行,然后是第 333 行到第 439 行(输出的最后一行)

from subprocess import Popen, PIPE, STDOUT

p = Popen('svnadmin verify /var/svn/repos/config', stdout = PIPE, 
        stderr = STDOUT, shell = True)
for line in p.stdout:
    print line.replace('
', '')

在查看了有关子进程的文档之后,我发现了bufsize参数Popen,因此我尝试将 bufsize 设置为 1(缓冲每行)和 0(无缓冲),但这两个值似乎都没有改变行传递的方式。

这时我开始抓住救命稻草,因此我写了以下输出循环:

while True:
    try:
        print p.stdout.next().replace('
', '')
    except StopIteration:
        break

但结果是一样的。

是否可以使用子进程获取执行的程序的“实时”程序输出?Python 中还有其他向前兼容(不兼容exec*)的选项吗?


解决方案 1:

我尝试了这个,但出于某种原因,代码

for line in p.stdout:
  ...

缓冲积极,变体

while True:
  line = p.stdout.readline()
  if not line: break
  ...

不会。显然这是一个已知错误:http ://bugs.python.org/issue3907 (截至 2018 年 8 月 29 日,该问题已“关闭”)

解决方案 2:

通过将缓冲区大小设置为 1,您实际上强制进程不缓冲输出。

p = subprocess.Popen(cmd, stdout=subprocess.PIPE, bufsize=1)
for line in iter(p.stdout.readline, b''):
    print line,
p.stdout.close()
p.wait()

解决方案 3:

您可以将子进程的输出直接发送到流。简化示例:

subprocess.run(['ls'], stderr=sys.stderr, stdout=sys.stdout)

解决方案 4:

您可以尝试以下操作:

import subprocess
import sys

process = subprocess.Popen(
    cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE
)

while True:
    out = process.stdout.read(1)
    if out == '' and process.poll() != None:
        break
    if out != '':
        sys.stdout.write(out)
        sys.stdout.flush()

如果使用 readline 而不是 read,则在某些情况下不会打印输入消息。尝试使用需要内联输入的命令并亲自查看。

解决方案 5:

在 Python 3.x 中,该过程可能会挂起,因为输出是字节数组而不是字符串。请确保将其解码为字符串。

从 Python 3.6 开始,你可以使用Popen Constructorencoding中的参数来实现。完整示例:

process = subprocess.Popen(
    'my_command',
    stdout=subprocess.PIPE,
    stderr=subprocess.STDOUT,
    shell=True,
    encoding='utf-8',
    errors='replace'
)

while True:
    realtime_output = process.stdout.readline()

    if realtime_output == '' and process.poll() is not None:
        break

    if realtime_output:
        print(realtime_output.strip(), flush=True)

请注意,此代码重定向 stderrstdout并处理输出错误。

解决方案 6:

实时输出问题已解决:我在 Python 中遇到了类似的问题,当时正在捕获 C 程序的实时输出。我添加了fflush(stdout);我的 C 代码。它对我有用。这是代码。

C 程序:

#include <stdio.h>
void main()
{
    int count = 1;
    while (1)
    {
        printf(" Count  %d
", count++);
        fflush(stdout);
        sleep(1);
    }
}

Python 程序:

#!/usr/bin/python

import os, sys
import subprocess


procExe = subprocess.Popen(".//count", shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, universal_newlines=True)

while procExe.poll() is None:
    line = procExe.stdout.readline()
    print("Print:" + line)

输出:

Print: Count  1
Print: Count  2
Print: Count  3

解决方案 7:

Kevin McCarthy发表的《在 Python 中使用 asyncio 实现流式子进程 stdin 和 stdout》博客文章展示了如何使用 asyncio 来实现这一点:

import asyncio
from asyncio.subprocess import PIPE
from asyncio import create_subprocess_exec


async def _read_stream(stream, callback):
    while True:
        line = await stream.readline()
        if line:
            callback(line)
        else:
            break


async def run(command):
    process = await create_subprocess_exec(
        *command, stdout=PIPE, stderr=PIPE
    )

    await asyncio.wait(
        [
            _read_stream(
                process.stdout,
                lambda x: print(
                    "STDOUT: {}".format(x.decode("UTF8"))
                ),
            ),
            _read_stream(
                process.stderr,
                lambda x: print(
                    "STDERR: {}".format(x.decode("UTF8"))
                ),
            ),
        ]
    )

    await process.wait()


async def main():
    await run("docker build -t my-docker-image:latest .")


if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())

解决方案 8:

根据使用情况,您可能还想禁用子进程本身的缓冲。

如果子进程是 Python 进程,则可以在调用之前执行以下操作:

os.environ["PYTHONUNBUFFERED"] = "1"

或者将其作为env参数传递给Popen

否则,如果您使用的是 Linux/Unix,则可以使用该stdbuf工具。例如:

cmd = ["stdbuf", "-oL"] + cmd

另请参阅此处或stdbuf其他选项。

(另请参阅此处以获得相同的答案。)

解决方案 9:

这是我经常使用的基本框架。它可以轻松实现超时,并能够处理不可避免的挂起进程。

import subprocess
import threading
import Queue

def t_read_stdout(process, queue):
    """Read from stdout"""

    for output in iter(process.stdout.readline, b''):
        queue.put(output)

    return

process = subprocess.Popen(['dir'],
                           stdout=subprocess.PIPE,
                           stderr=subprocess.STDOUT,
                           bufsize=1,
                           cwd='C:\\',
                           shell=True)

queue = Queue.Queue()
t_stdout = threading.Thread(target=t_read_stdout, args=(process, queue))
t_stdout.daemon = True
t_stdout.start()

while process.poll() is None or not queue.empty():
    try:
        output = queue.get(timeout=.5)

    except Queue.Empty:
        continue

    if not output:
        continue

    print(output),

t_stdout.join()

解决方案 10:

在此处找到此“即插即用”功能。效果非常好!

import subprocess

def myrun(cmd):
    """from
    http://blog.kagesenshi.org/2008/02/teeing-python-subprocesspopen-output.html
    """
    p = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE,
                         stderr=subprocess.STDOUT)
    stdout = []
    while True:
        line = p.stdout.readline()
        stdout.append(line)
        print line,
        if line == '' and p.poll() != None:
            break
    return ''.join(stdout)

解决方案 11:

我使用此解决方案来获取子进程的实时输出。进程完成后,此循环将立即停止,无需 break 语句或可能的无限循环。

sub_process = subprocess.Popen(my_command, close_fds=True, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)

while sub_process.poll() is None:
    out = sub_process.stdout.read(1)
    sys.stdout.write(out)
    sys.stdout.flush()

解决方案 12:

我之前也遇到过同样的问题。我的解决方案是放弃迭代方法read,即使子进程尚未完成执行,该方法也会立即返回,等等。

解决方案 13:

完整解决方案:

import contextlib
import subprocess

# Unix, Windows and old Macintosh end-of-line
newlines = ['
', '
', '
']
def unbuffered(proc, stream='stdout'):
    stream = getattr(proc, stream)
    with contextlib.closing(stream):
        while True:
            out = []
            last = stream.read(1)
            # Don't loop forever
            if last == '' and proc.poll() is not None:
                break
            while last not in newlines:
                # Don't loop forever
                if last == '' and proc.poll() is not None:
                    break
                out.append(last)
                last = stream.read(1)
            out = ''.join(out)
            yield out

def example():
    cmd = ['ls', '-l', '/']
    proc = subprocess.Popen(
        cmd,
        stdout=subprocess.PIPE,
        stderr=subprocess.STDOUT,
        # Make all end-of-lines '
'
        universal_newlines=True,
    )
    for line in unbuffered(proc):
        print line

example()

解决方案 14:

您可以在子进程的输出上使用迭代器遍历每个字节。这允许从子进程进行内联更新(以 '\r' 结尾的行覆盖前一个输出行):

from subprocess import PIPE, Popen

command = ["my_command", "-my_arg"]

# Open pipe to subprocess
subprocess = Popen(command, stdout=PIPE, stderr=PIPE)


# read each byte of subprocess
while subprocess.poll() is None:
    for c in iter(lambda: subprocess.stdout.read(1) if subprocess.poll() is None else {}, b''):
        c = c.decode('ascii')
        sys.stdout.write(c)
sys.stdout.flush()

if subprocess.returncode != 0:
    raise Exception("The subprocess did not terminate correctly.")

解决方案 15:

如果你只是想实时将日志转发到控制台

下面的代码适用于两者

 p = subprocess.Popen(cmd,
                         shell=True,
                         cwd=work_dir,
                         bufsize=1,
                         stdin=subprocess.PIPE,
                         stderr=sys.stderr,
                         stdout=sys.stdout)

解决方案 16:

使用pexpect和非阻塞 readlines 可以解决此问题。这是因为管道是缓冲的,因此应用程序的输出会被管道缓冲,因此您无法获得该输出,直到缓冲区填满或进程终止。

解决方案 17:

答案迟了,但是以下内容适用于 Python3:

import subprocess
import sys

process = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)

while True:
    out = process.stdout.read(1)
    if process.poll() is not None:
        break
    if out != '':
        sys.stdout.buffer.write(out)
        sys.stdout.flush()

解决方案 18:

以下是对我有用的方法:

import subprocess
import sys

def run_cmd_print_output_to_console_and_log_to_file(cmd, log_file_path):
    make_file_if_not_exist(log_file_path)
    logfile = open(log_file_path, 'w')

    proc=subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, shell = True)
    for line in proc.stdout:
        sys.stdout.write(line.decode("utf-8") )
        print(line.decode("utf-8").strip(), file=logfile, flush=True)
    proc.wait()

    logfile.close()

解决方案 19:

还有另一个答案!我有以下要求:

  • 运行一些命令并将输出打印到 stdout,就像用户运行它一样

  • 向用户显示命令中的任何提示。例如,pip uninstall numpy将提示... Proceed (Y/n)?(不以换行符结尾)

  • 将输出(用户看到的)捕获为字符串

这对我有用(仅在 Windows 上的 Python 3.10 中测试过):

def run(*args: list[str]) -> str:
    proc = subprocess.Popen(
        *args,
        text=True,
        stdout=subprocess.PIPE,
        stderr=subprocess.STDOUT,
    )

    result = ""

    while proc.poll() is None:
        output = proc.stdout.read(1)

        if output:
            sys.stdout.write(output)
            sys.stdout.flush()
            result += output

    return result

解决方案 20:

这些都是很好的例子,但我发现它们要么 (a) 处理部分行(例如“你确定吗(Y/n):”)但速度非常慢,要么 b) 速度很快但挂在部分行上。

我曾进行过以下工作:

  • 为 stdout 和 stderr 提供实时输出到各自的流

  • 由于采用流缓冲,速度极快

  • 允许使用超时,因为它永远不会阻塞 read()

  • 有效地独立保存 stdout 和 stderr

  • 处理文本编码(尽管很容易适应二进制流)

  • 适用于 Python 3.6+

import os
import subprocess
import sys
import selectors
import io

def run_command(command: str) -> (int, str):

    proc = subprocess.Popen(
        command, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE
    )

    sel = selectors.DefaultSelector()
    for fobj in [ proc.stdout, proc.stderr ]:
        os.set_blocking(fobj.fileno(), False)
        sel.register(fobj, selectors.EVENT_READ)

    out=io.StringIO()
    err=io.StringIO()

    # loop until all descriptors removed
    while len(sel.get_map()) > 0:
        events = sel.select()
        if len(events) == 0:
            # timeout or signal, kill to prevent wait hanging
            proc.terminate()
            break
        for key, _ in events:
            # read all available data
            buf = key.fileobj.read().decode(errors='ignore')
            if buf == '':
                sel.unregister(key.fileobj)
            elif key.fileobj == proc.stdout:
                sys.stdout.write(buf)
                sys.stdout.flush()
                out.write(buf)
            elif key.fileobj == proc.stderr:
                sys.stderr.write(buf)
                sys.stderr.flush()
                err.write(buf)

    sel.close()
    proc.wait()
    if proc.returncode != 0:
        return (proc.returncode, err.getvalue())
    return (0, out.getvalue())

我没有包含超时逻辑(因为主题是实时输出),但将它们添加到 select()/wait() 很简单,不再担心无限挂起。

我已经计时cat '25MB-file'并与.read(1)解决方案进行了比较,它大约快了 300 倍。

解决方案 21:

(该解决方案已使用 Python 2.7.15 测试过)

您只需要在每次读/写之后执行 sys.stdout.flush():

while proc.poll() is None:
    line = proc.stdout.readline()
    sys.stdout.write(line)
    # or print(line.strip()), you still need to force the flush.
    sys.stdout.flush()

解决方案 22:

很少有答案建议使用 python 3.x 或 pthon 2.x,下面的代码适用于两者。

 p = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT,)
    stdout = []
    while True:
        line = p.stdout.readline()
        if not isinstance(line, (str)):
            line = line.decode('utf-8')
        stdout.append(line)
        print (line)
        if (line == '' and p.poll() != None):
            break

解决方案 23:

def run_command(command):
process = subprocess.Popen(shlex.split(command), stdout=subprocess.PIPE)
while True:
    output = process.stdout.readline()
    if output == '' and process.poll() is not None:
        break
    if output:
        print(output.strip())
rc = process.poll()
return rc

解决方案 24:

以下是我的解决方案:

process = subprocess.Popen(command, stdout=PIPE, stderr=PIPE)

error_output = ""

while True:

    # The empty string is important to fulfill the exit condition (see below)
    stdout_line = ""
    if process.stdout:
        stdout = process.stdout.readline()
        if stdout:
            stdout_line = stdout.decode("utf-8")
            log.debug(stdout_line)

    # The empty string is important to fulfill the exit condition (see below)
    stderr_line = ""
    if process.stderr:
        stderr = process.stderr.readline()
        if stderr:
            stderr_line = stderr.decode("utf-8")
            error_output += stderr_line
            log.debug(stderr_line)

    # It might be the case that the process is finished but reading the
    # output is not finished. This is why we check both conditions:
    # Condition for readline:
    #   https://docs.python.org/3.6/tutorial/inputoutput.html#methods-of-file-objects
    # Condition for poll:
    #   https://docs.python.org/3/library/subprocess.html#subprocess.Popen.poll
    if stdout_line == "" and stderr_line == "" and process.poll() != None:
        break

if process.returncode != 0:
    raise Exception(error_output)
相关推荐
  为什么项目管理通常仍然耗时且低效?您是否还在反复更新电子表格、淹没在便利贴中并参加每周更新会议?这确实是耗费时间和精力。借助软件工具的帮助,您可以一目了然地全面了解您的项目。如今,国内外有足够多优秀的项目管理软件可以帮助您掌控每个项目。什么是项目管理软件?项目管理软件是广泛行业用于项目规划、资源分配和调度的软件。它使项...
项目管理软件   1120  
  IPD(Integrated Product Development,集成产品开发)流程是一种广泛应用于高科技和制造业的产品开发方法论。它通过跨职能团队的紧密协作,将产品开发周期缩短,同时提高产品质量和市场成功率。在IPD流程中,CDCP(Concept Decision Checkpoint,概念决策检查点)是一个关...
IPD培训课程   75  
  研发IPD(集成产品开发)流程作为一种系统化的产品开发方法,已经在许多行业中得到广泛应用。它不仅能够提升产品开发的效率和质量,还能够通过优化流程和资源分配,显著提高客户满意度。客户满意度是企业长期成功的关键因素之一,而IPD流程通过其独特的结构和机制,能够确保产品从概念到市场交付的每个环节都围绕客户需求展开。本文将深入...
IPD流程   66  
  IPD(Integrated Product Development,集成产品开发)流程是一种以跨职能团队协作为核心的产品开发方法,旨在通过优化资源分配、提高沟通效率以及减少返工,从而缩短项目周期并提升产品质量。随着企业对产品上市速度的要求越来越高,IPD流程的应用价值愈发凸显。通过整合产品开发过程中的各个环节,IPD...
IPD项目管理咨询   76  
  跨部门沟通是企业运营中不可或缺的一环,尤其在复杂的产品开发过程中,不同部门之间的协作效率直接影响项目的成败。集成产品开发(IPD)作为一种系统化的项目管理方法,旨在通过优化流程和增强团队协作来提升产品开发的效率和质量。然而,跨部门沟通的复杂性往往成为IPD实施中的一大挑战。部门之间的目标差异、信息不对称以及沟通渠道不畅...
IPD是什么意思   70  
热门文章
项目管理软件有哪些?
云禅道AD
禅道项目管理软件

云端的项目管理软件

尊享禅道项目软件收费版功能

无需维护,随时随地协同办公

内置subversion和git源码管理

每天备份,随时转为私有部署

免费试用