Python: running cat subprocesses in parallel
- 2025-01-16 08:38:00
- admin (original)
Problem description:
I am running several cat | zgrep commands on a remote server and collecting their output individually for further processing:
import multiprocessing as mp
import subprocess

class MainProcessor(mp.Process):
    def __init__(self, peaks_array):
        super(MainProcessor, self).__init__()
        self.peaks_array = peaks_array

    def run(self):
        for peak_arr in self.peaks_array:
            peak_processor = PeakProcessor(peak_arr)
            peak_processor.start()

class PeakProcessor(mp.Process):
    def __init__(self, peak_arr):
        super(PeakProcessor, self).__init__()
        self.peak_arr = peak_arr

    def run(self):
        command = 'ssh remote_host cat files_to_process | zgrep --mmap "regex" '
        log_lines = subprocess.check_output(command, shell=True).split('\n')
        process_data(log_lines)
However, this results in sequential execution of the subprocess ('ssh ... cat ...') commands. The second peak waits for the first one to finish, and so on.
How can I modify this code so that the subprocess calls run in parallel, while still being able to collect the output of each subprocess individually?
Solution 1:
You don't need multiprocessing or threading to run subprocesses in parallel. For example:
#!/usr/bin/env python
from subprocess import Popen

# run commands in parallel
processes = [Popen("echo {i:d}; sleep 2; echo {i:d}".format(i=i), shell=True)
             for i in range(5)]

# collect statuses
exitcodes = [p.wait() for p in processes]
It runs 5 shell commands simultaneously. Note: neither threads nor the multiprocessing module is used here. There is no need to append an ampersand & to the shell commands: Popen doesn't wait for the command to complete. You need to call .wait() explicitly.
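As a minimal sketch, the same pattern could be applied to the question's ssh/zgrep commands. Here remote_host and "regex" are placeholders taken from the question, and the file groups are purely hypothetical:
from subprocess import Popen

# hypothetical file groups; remote_host and "regex" are placeholders from the question
commands = ['ssh remote_host cat {files} | zgrep --mmap "regex"'.format(files=files)
            for files in ('logs_a*.gz', 'logs_b*.gz')]

# all commands start immediately; no trailing & is needed
processes = [Popen(cmd, shell=True) for cmd in commands]

# .wait() must be called explicitly to collect the exit codes
exitcodes = [p.wait() for p in processes]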
Using threads to collect the output of the subprocesses is convenient, but not necessary:
#!/usr/bin/env python
from multiprocessing.dummy import Pool  # thread pool
from subprocess import Popen, PIPE, STDOUT

# run commands in parallel
processes = [Popen("echo {i:d}; sleep 2; echo {i:d}".format(i=i), shell=True,
                   stdin=PIPE, stdout=PIPE, stderr=STDOUT, close_fds=True)
             for i in range(5)]

# collect output in parallel
def get_lines(process):
    return process.communicate()[0].splitlines()

outputs = Pool(len(processes)).map(get_lines, processes)
Related: Python threading multiple bash subprocesses?
Here is a code example that gets the output of several subprocesses concurrently in the same thread (Python 3.8+):
#!/usr/bin/env python3
import asyncio
import sys
from subprocess import PIPE, STDOUT

async def get_lines(shell_command):
    p = await asyncio.create_subprocess_shell(
        shell_command, stdin=PIPE, stdout=PIPE, stderr=STDOUT
    )
    return (await p.communicate())[0].splitlines()

async def main():
    # get commands output in parallel
    coros = [
        get_lines(
            f'"{sys.executable}" -c "print({i:d}); import time; time.sleep({i:d})"'
        )
        for i in range(5)
    ]
    print(await asyncio.gather(*coros))

if __name__ == "__main__":
    asyncio.run(main())
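For illustration, here is a hedged sketch of how the question's own commands could be fed to the same get_lines coroutine; remote_host, the file names and "regex" are placeholders, not a tested setup:
#!/usr/bin/env python3
import asyncio
from subprocess import PIPE, STDOUT

async def get_lines(shell_command):
    p = await asyncio.create_subprocess_shell(
        shell_command, stdin=PIPE, stdout=PIPE, stderr=STDOUT
    )
    return (await p.communicate())[0].splitlines()

async def main():
    # placeholder commands modeled on the question; adjust host/files/regex as needed
    commands = [
        f'ssh remote_host cat {files} | zgrep --mmap "regex"'
        for files in ("logs_a.gz", "logs_b.gz")
    ]
    # each entry in outputs is the list of output lines of one command
    outputs = await asyncio.gather(*(get_lines(cmd) for cmd in commands))
    for cmd, lines in zip(commands, outputs):
        print(cmd, "->", len(lines), "lines")

if __name__ == "__main__":
    asyncio.run(main())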
Old (2014) answer (Python 3.4?):
#!/usr/bin/env python3
import asyncio
import sys
from asyncio.subprocess import PIPE, STDOUT

@asyncio.coroutine
def get_lines(shell_command):
    p = yield from asyncio.create_subprocess_shell(shell_command,
            stdin=PIPE, stdout=PIPE, stderr=STDOUT)
    return (yield from p.communicate())[0].splitlines()

if sys.platform.startswith('win'):
    loop = asyncio.ProactorEventLoop()  # for subprocess' pipes on Windows
    asyncio.set_event_loop(loop)
else:
    loop = asyncio.get_event_loop()

# get commands output in parallel
coros = [get_lines('"{e}" -c "print({i:d}); import time; time.sleep({i:d})"'
                   .format(i=i, e=sys.executable)) for i in range(5)]
print(loop.run_until_complete(asyncio.gather(*coros)))
loop.close()
Solution 2:
Another approach (rather than the other suggestion of putting the shell processes in the background) is to use multithreading.
The run method that you have would then do something like this:
thread.start_new_thread(myFuncThatDoesZGrep)
To collect results, you can do something like this:
import threading

class MyThread(threading.Thread):
    def run(self):
        self.finished = False
        # Your code to run the command here.
        blahBlah()
        # When finished....
        self.finished = True
        self.results = []
Run the thread as described in the multithreading link above. When your thread object has myThread.finished == True, you can collect the results via myThread.results.
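Below is a minimal, self-contained sketch of that pattern. The class name ZGrepThread and the echo commands are hypothetical stand-ins for the question's ssh/zgrep calls:
import subprocess
import threading

class ZGrepThread(threading.Thread):
    def __init__(self, command):
        super(ZGrepThread, self).__init__()
        self.command = command
        self.finished = False
        self.results = []

    def run(self):
        # run the command and keep its output lines on the thread object
        output = subprocess.check_output(self.command, shell=True)
        self.results = output.splitlines()
        self.finished = True

# hypothetical commands standing in for the ssh/zgrep calls
threads = [ZGrepThread("echo line{0}; sleep 1".format(i)) for i in range(3)]
for t in threads:
    t.start()
for t in threads:
    t.join()  # or poll t.finished as described above
    print(t.finished, t.results)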