使用子进程获取实时输出[重复]
- 2024-12-24 08:56:00
- admin 原创
- 84
问题描述:
我正在尝试为命令行程序 (svnadmin verify) 编写一个包装器脚本,该脚本将显示操作的良好进度指示器。这要求我能够在输出时立即看到包装程序的每一行输出。
我认为我只需使用 执行程序subprocess.Popen
,stdout=PIPE
然后读取每一行并相应地执行操作即可。但是,当我运行以下代码时,输出似乎被缓冲在某处,导致它出现在两个块中,即第 1 行到第 332 行,然后是第 333 行到第 439 行(输出的最后一行)
from subprocess import Popen, PIPE, STDOUT
p = Popen('svnadmin verify /var/svn/repos/config', stdout = PIPE,
stderr = STDOUT, shell = True)
for line in p.stdout:
print line.replace('
', '')
在查看了有关子进程的文档之后,我发现了bufsize
参数Popen
,因此我尝试将 bufsize 设置为 1(缓冲每行)和 0(无缓冲),但这两个值似乎都没有改变行传递的方式。
这时我开始抓住救命稻草,因此我写了以下输出循环:
while True:
try:
print p.stdout.next().replace('
', '')
except StopIteration:
break
但结果是一样的。
是否可以使用子进程获取执行的程序的“实时”程序输出?Python 中还有其他向前兼容(不兼容exec*
)的选项吗?
解决方案 1:
我尝试了这个,但出于某种原因,代码
for line in p.stdout:
...
缓冲积极,变体
while True:
line = p.stdout.readline()
if not line: break
...
不会。显然这是一个已知错误:http ://bugs.python.org/issue3907 (截至 2018 年 8 月 29 日,该问题已“关闭”)
解决方案 2:
通过将缓冲区大小设置为 1,您实际上强制进程不缓冲输出。
p = subprocess.Popen(cmd, stdout=subprocess.PIPE, bufsize=1)
for line in iter(p.stdout.readline, b''):
print line,
p.stdout.close()
p.wait()
解决方案 3:
您可以将子进程的输出直接发送到流。简化示例:
subprocess.run(['ls'], stderr=sys.stderr, stdout=sys.stdout)
解决方案 4:
您可以尝试以下操作:
import subprocess
import sys
process = subprocess.Popen(
cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE
)
while True:
out = process.stdout.read(1)
if out == '' and process.poll() != None:
break
if out != '':
sys.stdout.write(out)
sys.stdout.flush()
如果使用 readline 而不是 read,则在某些情况下不会打印输入消息。尝试使用需要内联输入的命令并亲自查看。
解决方案 5:
在 Python 3.x 中,该过程可能会挂起,因为输出是字节数组而不是字符串。请确保将其解码为字符串。
从 Python 3.6 开始,你可以使用Popen Constructorencoding
中的参数来实现。完整示例:
process = subprocess.Popen(
'my_command',
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
shell=True,
encoding='utf-8',
errors='replace'
)
while True:
realtime_output = process.stdout.readline()
if realtime_output == '' and process.poll() is not None:
break
if realtime_output:
print(realtime_output.strip(), flush=True)
请注意,此代码重定向 stderr
到stdout
并处理输出错误。
解决方案 6:
实时输出问题已解决:我在 Python 中遇到了类似的问题,当时正在捕获 C 程序的实时输出。我添加了fflush(stdout);
我的 C 代码。它对我有用。这是代码。
C 程序:
#include <stdio.h>
void main()
{
int count = 1;
while (1)
{
printf(" Count %d
", count++);
fflush(stdout);
sleep(1);
}
}
Python 程序:
#!/usr/bin/python
import os, sys
import subprocess
procExe = subprocess.Popen(".//count", shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, universal_newlines=True)
while procExe.poll() is None:
line = procExe.stdout.readline()
print("Print:" + line)
输出:
Print: Count 1
Print: Count 2
Print: Count 3
解决方案 7:
Kevin McCarthy发表的《在 Python 中使用 asyncio 实现流式子进程 stdin 和 stdout》博客文章展示了如何使用 asyncio 来实现这一点:
import asyncio
from asyncio.subprocess import PIPE
from asyncio import create_subprocess_exec
async def _read_stream(stream, callback):
while True:
line = await stream.readline()
if line:
callback(line)
else:
break
async def run(command):
process = await create_subprocess_exec(
*command, stdout=PIPE, stderr=PIPE
)
await asyncio.wait(
[
_read_stream(
process.stdout,
lambda x: print(
"STDOUT: {}".format(x.decode("UTF8"))
),
),
_read_stream(
process.stderr,
lambda x: print(
"STDERR: {}".format(x.decode("UTF8"))
),
),
]
)
await process.wait()
async def main():
await run("docker build -t my-docker-image:latest .")
if __name__ == "__main__":
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
解决方案 8:
根据使用情况,您可能还想禁用子进程本身的缓冲。
如果子进程是 Python 进程,则可以在调用之前执行以下操作:
os.environ["PYTHONUNBUFFERED"] = "1"
或者将其作为env
参数传递给Popen
。
否则,如果您使用的是 Linux/Unix,则可以使用该stdbuf
工具。例如:
cmd = ["stdbuf", "-oL"] + cmd
另请参阅此处或stdbuf
其他选项。
(另请参阅此处以获得相同的答案。)
解决方案 9:
这是我经常使用的基本框架。它可以轻松实现超时,并能够处理不可避免的挂起进程。
import subprocess
import threading
import Queue
def t_read_stdout(process, queue):
"""Read from stdout"""
for output in iter(process.stdout.readline, b''):
queue.put(output)
return
process = subprocess.Popen(['dir'],
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
bufsize=1,
cwd='C:\\',
shell=True)
queue = Queue.Queue()
t_stdout = threading.Thread(target=t_read_stdout, args=(process, queue))
t_stdout.daemon = True
t_stdout.start()
while process.poll() is None or not queue.empty():
try:
output = queue.get(timeout=.5)
except Queue.Empty:
continue
if not output:
continue
print(output),
t_stdout.join()
解决方案 10:
在此处找到此“即插即用”功能。效果非常好!
import subprocess
def myrun(cmd):
"""from
http://blog.kagesenshi.org/2008/02/teeing-python-subprocesspopen-output.html
"""
p = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE,
stderr=subprocess.STDOUT)
stdout = []
while True:
line = p.stdout.readline()
stdout.append(line)
print line,
if line == '' and p.poll() != None:
break
return ''.join(stdout)
解决方案 11:
我使用此解决方案来获取子进程的实时输出。进程完成后,此循环将立即停止,无需 break 语句或可能的无限循环。
sub_process = subprocess.Popen(my_command, close_fds=True, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
while sub_process.poll() is None:
out = sub_process.stdout.read(1)
sys.stdout.write(out)
sys.stdout.flush()
解决方案 12:
我之前也遇到过同样的问题。我的解决方案是放弃迭代方法read
,即使子进程尚未完成执行,该方法也会立即返回,等等。
解决方案 13:
完整解决方案:
import contextlib
import subprocess
# Unix, Windows and old Macintosh end-of-line
newlines = ['
', '
', '
']
def unbuffered(proc, stream='stdout'):
stream = getattr(proc, stream)
with contextlib.closing(stream):
while True:
out = []
last = stream.read(1)
# Don't loop forever
if last == '' and proc.poll() is not None:
break
while last not in newlines:
# Don't loop forever
if last == '' and proc.poll() is not None:
break
out.append(last)
last = stream.read(1)
out = ''.join(out)
yield out
def example():
cmd = ['ls', '-l', '/']
proc = subprocess.Popen(
cmd,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
# Make all end-of-lines '
'
universal_newlines=True,
)
for line in unbuffered(proc):
print line
example()
解决方案 14:
您可以在子进程的输出上使用迭代器遍历每个字节。这允许从子进程进行内联更新(以 '\r' 结尾的行覆盖前一个输出行):
from subprocess import PIPE, Popen
command = ["my_command", "-my_arg"]
# Open pipe to subprocess
subprocess = Popen(command, stdout=PIPE, stderr=PIPE)
# read each byte of subprocess
while subprocess.poll() is None:
for c in iter(lambda: subprocess.stdout.read(1) if subprocess.poll() is None else {}, b''):
c = c.decode('ascii')
sys.stdout.write(c)
sys.stdout.flush()
if subprocess.returncode != 0:
raise Exception("The subprocess did not terminate correctly.")
解决方案 15:
如果你只是想实时将日志转发到控制台
下面的代码适用于两者
p = subprocess.Popen(cmd,
shell=True,
cwd=work_dir,
bufsize=1,
stdin=subprocess.PIPE,
stderr=sys.stderr,
stdout=sys.stdout)
解决方案 16:
使用pexpect和非阻塞 readlines 可以解决此问题。这是因为管道是缓冲的,因此应用程序的输出会被管道缓冲,因此您无法获得该输出,直到缓冲区填满或进程终止。
解决方案 17:
答案迟了,但是以下内容适用于 Python3:
import subprocess
import sys
process = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
while True:
out = process.stdout.read(1)
if process.poll() is not None:
break
if out != '':
sys.stdout.buffer.write(out)
sys.stdout.flush()
解决方案 18:
以下是对我有用的方法:
import subprocess
import sys
def run_cmd_print_output_to_console_and_log_to_file(cmd, log_file_path):
make_file_if_not_exist(log_file_path)
logfile = open(log_file_path, 'w')
proc=subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, shell = True)
for line in proc.stdout:
sys.stdout.write(line.decode("utf-8") )
print(line.decode("utf-8").strip(), file=logfile, flush=True)
proc.wait()
logfile.close()
解决方案 19:
还有另一个答案!我有以下要求:
运行一些命令并将输出打印到 stdout,就像用户运行它一样
向用户显示命令中的任何提示。例如,
pip uninstall numpy
将提示... Proceed (Y/n)?
(不以换行符结尾)将输出(用户看到的)捕获为字符串
这对我有用(仅在 Windows 上的 Python 3.10 中测试过):
def run(*args: list[str]) -> str:
proc = subprocess.Popen(
*args,
text=True,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
)
result = ""
while proc.poll() is None:
output = proc.stdout.read(1)
if output:
sys.stdout.write(output)
sys.stdout.flush()
result += output
return result
解决方案 20:
这些都是很好的例子,但我发现它们要么 (a) 处理部分行(例如“你确定吗(Y/n):”)但速度非常慢,要么 b) 速度很快但挂在部分行上。
我曾进行过以下工作:
为 stdout 和 stderr 提供实时输出到各自的流
由于采用流缓冲,速度极快
允许使用超时,因为它永远不会阻塞 read()
有效地独立保存 stdout 和 stderr
处理文本编码(尽管很容易适应二进制流)
适用于 Python 3.6+
import os
import subprocess
import sys
import selectors
import io
def run_command(command: str) -> (int, str):
proc = subprocess.Popen(
command, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE
)
sel = selectors.DefaultSelector()
for fobj in [ proc.stdout, proc.stderr ]:
os.set_blocking(fobj.fileno(), False)
sel.register(fobj, selectors.EVENT_READ)
out=io.StringIO()
err=io.StringIO()
# loop until all descriptors removed
while len(sel.get_map()) > 0:
events = sel.select()
if len(events) == 0:
# timeout or signal, kill to prevent wait hanging
proc.terminate()
break
for key, _ in events:
# read all available data
buf = key.fileobj.read().decode(errors='ignore')
if buf == '':
sel.unregister(key.fileobj)
elif key.fileobj == proc.stdout:
sys.stdout.write(buf)
sys.stdout.flush()
out.write(buf)
elif key.fileobj == proc.stderr:
sys.stderr.write(buf)
sys.stderr.flush()
err.write(buf)
sel.close()
proc.wait()
if proc.returncode != 0:
return (proc.returncode, err.getvalue())
return (0, out.getvalue())
我没有包含超时逻辑(因为主题是实时输出),但将它们添加到 select()/wait() 很简单,不再担心无限挂起。
我已经计时cat '25MB-file'
并与.read(1)
解决方案进行了比较,它大约快了 300 倍。
解决方案 21:
(该解决方案已使用 Python 2.7.15 测试过)
您只需要在每次读/写之后执行 sys.stdout.flush():
while proc.poll() is None:
line = proc.stdout.readline()
sys.stdout.write(line)
# or print(line.strip()), you still need to force the flush.
sys.stdout.flush()
解决方案 22:
很少有答案建议使用 python 3.x 或 pthon 2.x,下面的代码适用于两者。
p = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT,)
stdout = []
while True:
line = p.stdout.readline()
if not isinstance(line, (str)):
line = line.decode('utf-8')
stdout.append(line)
print (line)
if (line == '' and p.poll() != None):
break
解决方案 23:
def run_command(command):
process = subprocess.Popen(shlex.split(command), stdout=subprocess.PIPE)
while True:
output = process.stdout.readline()
if output == '' and process.poll() is not None:
break
if output:
print(output.strip())
rc = process.poll()
return rc
解决方案 24:
以下是我的解决方案:
process = subprocess.Popen(command, stdout=PIPE, stderr=PIPE)
error_output = ""
while True:
# The empty string is important to fulfill the exit condition (see below)
stdout_line = ""
if process.stdout:
stdout = process.stdout.readline()
if stdout:
stdout_line = stdout.decode("utf-8")
log.debug(stdout_line)
# The empty string is important to fulfill the exit condition (see below)
stderr_line = ""
if process.stderr:
stderr = process.stderr.readline()
if stderr:
stderr_line = stderr.decode("utf-8")
error_output += stderr_line
log.debug(stderr_line)
# It might be the case that the process is finished but reading the
# output is not finished. This is why we check both conditions:
# Condition for readline:
# https://docs.python.org/3.6/tutorial/inputoutput.html#methods-of-file-objects
# Condition for poll:
# https://docs.python.org/3/library/subprocess.html#subprocess.Popen.poll
if stdout_line == "" and stderr_line == "" and process.poll() != None:
break
if process.returncode != 0:
raise Exception(error_output)
- 2024年20款好用的项目管理软件推荐,项目管理提效的20个工具和技巧
- 2024年开源项目管理软件有哪些?推荐5款好用的项目管理工具
- 2024年常用的项目管理软件有哪些?推荐这10款国内外好用的项目管理工具
- 项目管理软件有哪些?推荐7款超好用的项目管理工具
- 项目管理软件有哪些最好用?推荐6款好用的项目管理工具
- 项目管理软件哪个最好用?盘点推荐5款好用的项目管理工具
- 项目管理软件排行榜:2024年项目经理必备5款开源项目管理软件汇总
- 项目管理软件有哪些,盘点推荐国内外超好用的7款项目管理工具
- 项目管理必备:盘点2024年13款好用的项目管理软件
- 2024项目管理软件排行榜(10类常用的项目管理工具全推荐)