在进程运行时不断打印子进程输出
- 2024-12-19 09:23:00
- admin 原创
- 80
问题描述:
要从我的 Python 脚本启动程序,我使用以下方法:
def execute(command):
process = subprocess.Popen(command, shell=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
output = process.communicate()[0]
exitCode = process.returncode
if (exitCode == 0):
return output
else:
raise ProcessException(command, exitCode, output)
因此,当我启动一个进程时Process.execute("mvn clean install")
,我的程序会等到该进程完成,然后我才能获得程序的完整输出。如果我正在运行一个需要一段时间才能完成的进程,这会很烦人。
我可以让我的程序逐行写入进程输出,通过在循环完成之前轮询进程输出吗?
我发现这篇文章可能与此相关。
解决方案 1:
您可以使用iter在命令输出行后立即处理它们:lines = iter(fd.readline, "")
。这是一个展示典型用例的完整示例(感谢@jfs 的帮助):
from __future__ import print_function # Only Python 2.x
import subprocess
def execute(cmd):
popen = subprocess.Popen(cmd, stdout=subprocess.PIPE, universal_newlines=True)
for stdout_line in iter(popen.stdout.readline, ""):
yield stdout_line
popen.stdout.close()
return_code = popen.wait()
if return_code:
raise subprocess.CalledProcessError(return_code, cmd)
# Example
for path in execute(["locate", "a"]):
print(path, end="")
解决方案 2:
在 Python 3 中刷新子进程的标准输出缓冲区后,逐行打印子进程的输出:
from subprocess import Popen, PIPE, CalledProcessError
with Popen(cmd, stdout=PIPE, bufsize=1, universal_newlines=True) as p:
for line in p.stdout:
print(line, end='') # process line here
if p.returncode != 0:
raise CalledProcessError(p.returncode, p.args)
注意:你不需要p.poll()
——当到达 eof 时循环结束。你也不需要iter(p.stdout.readline, '')
——预读错误已在 Python 3 中修复。
另请参阅Python:从 subprocess.communicate() 读取流输入。
解决方案 3:
好的,我设法通过使用这个问题中的片段来解决这个问题,而无需线程(任何使用线程更好的建议都会受到赞赏),在子进程运行时拦截子进程的标准输出
def execute(command):
process = subprocess.Popen(command, shell=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
# Poll process for new output until finished
while True:
nextline = process.stdout.readline()
if nextline == '' and process.poll() is not None:
break
sys.stdout.write(nextline)
sys.stdout.flush()
output = process.communicate()[0]
exitCode = process.returncode
if (exitCode == 0):
return output
else:
raise ProcessException(command, exitCode, output)
解决方案 4:
当您只想要print
输出时,实际上有一个非常简单的方法可以做到这一点:
import subprocess
import sys
def execute(command):
subprocess.check_call(command, shell=True, stdout=sys.stdout, stderr=subprocess.STDOUT)
这里我们只是将子流程指向我们自己的stdout
,并使用现有的成功或异常 API。
解决方案 5:
在 Python >= 3.5 中,使用subprocess.run
对我来说有效:
import subprocess
cmd = 'echo foo; sleep 1; echo foo; sleep 2; echo foo'
subprocess.run(cmd, shell=True)
(在执行过程中获取输出也可以shell=True
)
https://docs.python.org/3/library/subprocess.html#subprocess.run
解决方案 6:
@tokland
尝试了你的代码并针对 3.4 和 windows 进行了更正 dir.cmd 是一个简单的 dir 命令,保存为 cmd 文件
import subprocess
c = "dir.cmd"
def execute(command):
popen = subprocess.Popen(command, stdout=subprocess.PIPE,bufsize=1)
lines_iterator = iter(popen.stdout.readline, b"")
while popen.poll() is None:
for line in lines_iterator:
nline = line.rstrip()
print(nline.decode("latin"), end = "
",flush =True) # yield line
execute(c)
解决方案 7:
为了回答最初的问题,在我看来,最好的方法就是将子进程stdout
直接重定向到你的程序中stdout
(也可以这样做stderr
,如下例所示)
p = Popen(cmd, stdout=sys.stdout, stderr=sys.stderr)
p.communicate()
解决方案 8:
对于任何尝试回答这个问题以从 Python 脚本中获取标准输出的人,请注意 Python 会缓冲其标准输出,因此可能需要一段时间才能看到标准输出。
可以通过在目标脚本中的每个 stdout 写入后添加以下内容来纠正此问题:
sys.stdout.flush()
解决方案 9:
如果有人想同时使用线程读取两者stdout
,stderr
这就是我想出的办法:
import threading
import subprocess
import Queue
class AsyncLineReader(threading.Thread):
def __init__(self, fd, outputQueue):
threading.Thread.__init__(self)
assert isinstance(outputQueue, Queue.Queue)
assert callable(fd.readline)
self.fd = fd
self.outputQueue = outputQueue
def run(self):
map(self.outputQueue.put, iter(self.fd.readline, ''))
def eof(self):
return not self.is_alive() and self.outputQueue.empty()
@classmethod
def getForFd(cls, fd, start=True):
queue = Queue.Queue()
reader = cls(fd, queue)
if start:
reader.start()
return reader, queue
process = subprocess.Popen(command, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
(stdoutReader, stdoutQueue) = AsyncLineReader.getForFd(process.stdout)
(stderrReader, stderrQueue) = AsyncLineReader.getForFd(process.stderr)
# Keep checking queues until there is no more output.
while not stdoutReader.eof() or not stderrReader.eof():
# Process all available lines from the stdout Queue.
while not stdoutQueue.empty():
line = stdoutQueue.get()
print 'Received stdout: ' + repr(line)
# Do stuff with stdout line.
# Process all available lines from the stderr Queue.
while not stderrQueue.empty():
line = stderrQueue.get()
print 'Received stderr: ' + repr(line)
# Do stuff with stderr line.
# Sleep for a short time to avoid excessive CPU use while waiting for data.
sleep(0.05)
print "Waiting for async readers to finish..."
stdoutReader.join()
stderrReader.join()
# Close subprocess' file descriptors.
process.stdout.close()
process.stderr.close()
print "Waiting for process to exit..."
returnCode = process.wait()
if returnCode != 0:
raise subprocess.CalledProcessError(returnCode, command)
我只是想分享一下,因为我最终遇到了这个问题并尝试做类似的事情,但没有一个答案可以解决我的问题。希望它能帮助到别人!
请注意,在我的用例中,外部进程终止了我们所需的进程Popen()
。
解决方案 10:
基于@jfs的出色回答,这里有一个完整的工作示例供您使用。需要Python 3.7或更新版本。
子目录
import time
for i in range(10):
print(i)
time.sleep(1)
主程序
from subprocess import PIPE, Popen
import sys
with Popen([sys.executable, '-u', 'sub.py'], bufsize=1, stdout=PIPE, text=True
) as sub:
for line in sub.stdout:
print(line, end='')
-u
是为了避免缓冲输出 - 或者,print(i, flush=True)
也可以。
解决方案 11:
这里的答案都不能满足我的所有需求。
没有用于 stdout 的线程(也没有队列等)
非阻塞,因为我需要检查正在发生的其他事情
使用 PIPE 因为我需要做多件事,例如流输出、写入日志文件并返回输出的字符串副本。
一点背景知识:我正在使用 ThreadPoolExecutor 来管理线程池,每个线程启动一个子进程并并发运行它们。(在 Python2.7 中,但这也应该适用于较新的 3.x)。我不想只使用线程来收集输出,因为我希望尽可能多的线程可用于其他用途(20 个进程的池将使用 40 个线程来运行;1 个用于进程线程,1 个用于 stdout...如果您想要 stderr,我想还有更多线程)
我在这里删除了很多异常等,因此这是基于在生产中有效的代码。希望我没有在复制和粘贴中毁掉它。此外,非常欢迎反馈!
import time
import fcntl
import subprocess
import time
proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
# Make stdout non-blocking when using read/readline
proc_stdout = proc.stdout
fl = fcntl.fcntl(proc_stdout, fcntl.F_GETFL)
fcntl.fcntl(proc_stdout, fcntl.F_SETFL, fl | os.O_NONBLOCK)
def handle_stdout(proc_stream, my_buffer, echo_streams=True, log_file=None):
"""A little inline function to handle the stdout business. """
# fcntl makes readline non-blocking so it raises an IOError when empty
try:
for s in iter(proc_stream.readline, ''): # replace '' with b'' for Python 3
my_buffer.append(s)
if echo_streams:
sys.stdout.write(s)
if log_file:
log_file.write(s)
except IOError:
pass
# The main loop while subprocess is running
stdout_parts = []
while proc.poll() is None:
handle_stdout(proc_stdout, stdout_parts)
# ...Check for other things here...
# For example, check a multiprocessor.Value('b') to proc.kill()
time.sleep(0.01)
# Not sure if this is needed, but run it again just to be sure we got it all?
handle_stdout(proc_stdout, stdout_parts)
stdout_str = "".join(stdout_parts) # Just to demo
我确信这里会增加开销,但就我而言这不是问题。从功能上讲,它满足了我的需求。我唯一没有解决的问题是,为什么它对日志消息完美无缺,但我发现有些print
消息稍后会同时出现。
解决方案 12:
此 PoC 不断读取进程的输出,并可在需要时访问。仅保留最后的结果,所有其他输出均被丢弃,从而防止 PIPE 内存溢出:
import subprocess
import time
import threading
import Queue
class FlushPipe(object):
def __init__(self):
self.command = ['python', './print_date.py']
self.process = None
self.process_output = Queue.LifoQueue(0)
self.capture_output = threading.Thread(target=self.output_reader)
def output_reader(self):
for line in iter(self.process.stdout.readline, b''):
self.process_output.put_nowait(line)
def start_process(self):
self.process = subprocess.Popen(self.command,
stdout=subprocess.PIPE)
self.capture_output.start()
def get_output_for_processing(self):
line = self.process_output.get()
print ">>>" + line
if __name__ == "__main__":
flush_pipe = FlushPipe()
flush_pipe.start_process()
now = time.time()
while time.time() - now < 10:
flush_pipe.get_output_for_processing()
time.sleep(2.5)
flush_pipe.capture_output.join(timeout=0.001)
flush_pipe.process.kill()
打印日期.py
#!/usr/bin/env python
import time
if __name__ == "__main__":
while True:
print str(time.time())
time.sleep(0.01)
输出:您可以清楚地看到,只有约 2.5 秒间隔的输出,中间没有任何内容。
>>>1520535158.51
>>>1520535161.01
>>>1520535163.51
>>>1520535166.01
解决方案 13:
这至少在 Python3.4 中有效
import subprocess
process = subprocess.Popen(cmd_list, stdout=subprocess.PIPE)
for line in process.stdout:
print(line.decode().strip())
解决方案 14:
如果您想在进程运行时从标准输出打印,请使用-u
Python 选项。(尽管存在风险,但这是必要的......)subprocess.Popen()
`shell=True`
解决方案 15:
import time
import sys
import subprocess
import threading
import queue
cmd='esptool.py --chip esp8266 write_flash -z 0x1000 /home/pi/zero2/fw/base/boot_40m.bin'
cmd2='esptool.py --chip esp32 -b 115200 write_flash -z 0x1000 /home/pi/zero2/fw/test.bin'
cmd3='esptool.py --chip esp32 -b 115200 erase_flash'
class ExecutorFlushSTDOUT(object):
def __init__(self,timeout=15):
self.process = None
self.process_output = queue.Queue(0)
self.capture_output = threading.Thread(target=self.output_reader)
self.timeout=timeout
self.result=False
self.validator=None
def output_reader(self):
start=time.time()
while self.process.poll() is None and (time.time() - start) < self.timeout:
try:
if not self.process_output.full():
line=self.process.stdout.readline()
if line:
line=line.decode().rstrip("
")
start=time.time()
self.process_output.put(line)
if self.validator:
if self.validator in line: print("Valid");self.result=True
except:pass
self.process.kill()
return
def start_process(self,cmd_list,callback=None,validator=None,timeout=None):
if timeout: self.timeout=timeout
self.validator=validator
self.process = subprocess.Popen(cmd_list,stdout=subprocess.PIPE,stderr=subprocess.PIPE,shell=True)
self.capture_output.start()
line=None
self.result=False
while self.process.poll() is None:
try:
if not self.process_output.empty():
line = self.process_output.get()
if line:
if callback:callback(line)
#print(line)
line=None
except:pass
error = self.process.returncode
if error:
print("Error Found",str(error))
raise RuntimeError(error)
return self.result
execute = ExecutorFlushSTDOUT()
def liveOUTPUT(line):
print("liveOUTPUT",line)
try:
if "Writing" in line:
line=''.join([n for n in line.split(' ')[3] if n.isdigit()])
print("percent={}".format(line))
except Exception as e:
pass
result=execute.start_process(cmd2,callback=liveOUTPUT,validator="Hash of data verified.")
print("Finish",result)
解决方案 16:
简单比复杂好。
os
库有内置模块system
。您应该执行代码并查看输出。
import os
os.system("python --version")
# Output
"""
Python 3.8.6
0
"""
之后的版本也将返回值打印为0
。
解决方案 17:
在 Python 3.6 中我使用了这个:
import subprocess
cmd = "command"
output = subprocess.call(cmd, shell=True)
print(process)
- 2024年20款好用的项目管理软件推荐,项目管理提效的20个工具和技巧
- 2024年开源项目管理软件有哪些?推荐5款好用的项目管理工具
- 项目管理软件有哪些?推荐7款超好用的项目管理工具
- 2024年常用的项目管理软件有哪些?推荐这10款国内外好用的项目管理工具
- 项目管理软件有哪些最好用?推荐6款好用的项目管理工具
- 项目管理软件哪个最好用?盘点推荐5款好用的项目管理工具
- 项目管理软件有哪些,盘点推荐国内外超好用的7款项目管理工具
- 项目管理软件排行榜:2024年项目经理必备5款开源项目管理软件汇总
- 2024项目管理软件排行榜(10类常用的项目管理工具全推荐)
- 项目管理必备:盘点2024年13款好用的项目管理软件