Python:并行执行 cat 子进程

2025-01-16 08:38:00
admin
原创
107
摘要:问题描述:我在cat | zgrep远程服务器上运行几个命令并单独收集它们的输出以进行进一步处理:class MainProcessor(mp.Process): def __init__(self, peaks_array): super(MainProcessor, self)._...

问题描述:

我在cat | zgrep远程服务器上运行几个命令并单独收集它们的输出以进行进一步处理:

class MainProcessor(mp.Process):
    def __init__(self, peaks_array):
        super(MainProcessor, self).__init__()
        self.peaks_array = peaks_array

    def run(self):
        for peak_arr in self.peaks_array:
            peak_processor = PeakProcessor(peak_arr)
            peak_processor.start()

class PeakProcessor(mp.Process):
    def __init__(self, peak_arr):
        super(PeakProcessor, self).__init__()
        self.peak_arr = peak_arr

    def run(self):
        command = 'ssh remote_host cat files_to_process | zgrep --mmap "regex" '
        log_lines = (subprocess.check_output(command, shell=True)).split('
')
        process_data(log_lines)

但是,这会导致子进程('ssh ... cat ...')命令的顺序执行。第二个峰值等待第一个峰值完成,依此类推。

我如何修改此代码以便子进程调用并行运行,同时仍然能够单独收集每个子进程的输出?


解决方案 1:

您不需要multiprocessingthreading并行运行子进程。例如:

#!/usr/bin/env python
from subprocess import Popen

# run commands in parallel
processes = [Popen("echo {i:d}; sleep 2; echo {i:d}".format(i=i), shell=True)
             for i in range(5)]
# collect statuses
exitcodes = [p.wait() for p in processes]

它同时运行 5 个 shell 命令。注意:这里既不使用线程也不multiprocessing使用模块。没有必要&在 shell 命令中添加 & 符号:Popen不等待命令完成。您需要.wait()明确调用。

使用线程来收集子进程的输出很方便,但不是必须的:

#!/usr/bin/env python
from multiprocessing.dummy import Pool # thread pool
from subprocess import Popen, PIPE, STDOUT

# run commands in parallel
processes = [Popen("echo {i:d}; sleep 2; echo {i:d}".format(i=i), shell=True,
                   stdin=PIPE, stdout=PIPE, stderr=STDOUT, close_fds=True)
             for i in range(5)]

# collect output in parallel
def get_lines(process):
    return process.communicate()[0].splitlines()

outputs = Pool(len(processes)).map(get_lines, processes)

相关:Python 线程多个 bash 子进程?。

以下是在同一线程中同时从多个子进程获取输出的代码示例(Python 3.8+):

#!/usr/bin/env python3
import asyncio
import sys
from subprocess import PIPE, STDOUT


async def get_lines(shell_command):
    p = await asyncio.create_subprocess_shell(
        shell_command, stdin=PIPE, stdout=PIPE, stderr=STDOUT
    )
    return (await p.communicate())[0].splitlines()


async def main():
    # get commands output in parallel
    coros = [
        get_lines(
            f'"{sys.executable}" -c "print({i:d}); import time; time.sleep({i:d})"'
        )
        for i in range(5)
    ]
    print(await asyncio.gather(*coros))


if __name__ == "__main__":
    asyncio.run(main())

旧(2014)答案(Python 3.4?):

#!/usr/bin/env python3
import asyncio
import sys
from asyncio.subprocess import PIPE, STDOUT

@asyncio.coroutine
def get_lines(shell_command):
    p = yield from asyncio.create_subprocess_shell(shell_command,
            stdin=PIPE, stdout=PIPE, stderr=STDOUT)
    return (yield from p.communicate())[0].splitlines()

if sys.platform.startswith('win'):
    loop = asyncio.ProactorEventLoop() # for subprocess' pipes on Windows
    asyncio.set_event_loop(loop)
else:
    loop = asyncio.get_event_loop()

# get commands output in parallel
coros = [get_lines('"{e}" -c "print({i:d}); import time; time.sleep({i:d})"'
                    .format(i=i, e=sys.executable)) for i in range(5)]
print(loop.run_until_complete(asyncio.gather(*coros)))
loop.close()

解决方案 2:

另一种方法(而不是将 shell 进程放在后台的其他建议)是使用多线程。

您所使用的方法run将会执行以下操作:

thread.start_new_thread ( myFuncThatDoesZGrep)

要收集结果,您可以执行以下操作:

class MyThread(threading.Thread):
   def run(self):
       self.finished = False
       # Your code to run the command here.
       blahBlah()
       # When finished....
       self.finished = True
       self.results = []

按照上面多线程链接中所述运行线程。当您的线程对象具有 myThread.finished == True 时,您可以通过 myThread.results 收集结果。

相关推荐
  政府信创国产化的10大政策解读一、信创国产化的背景与意义信创国产化,即信息技术应用创新国产化,是当前中国信息技术领域的一个重要发展方向。其核心在于通过自主研发和创新,实现信息技术应用的自主可控,减少对外部技术的依赖,并规避潜在的技术制裁和风险。随着全球信息技术竞争的加剧,以及某些国家对中国在科技领域的打压,信创国产化显...
工程项目管理   1565  
  为什么项目管理通常仍然耗时且低效?您是否还在反复更新电子表格、淹没在便利贴中并参加每周更新会议?这确实是耗费时间和精力。借助软件工具的帮助,您可以一目了然地全面了解您的项目。如今,国内外有足够多优秀的项目管理软件可以帮助您掌控每个项目。什么是项目管理软件?项目管理软件是广泛行业用于项目规划、资源分配和调度的软件。它使项...
项目管理软件   1354  
  信创国产芯片作为信息技术创新的核心领域,对于推动国家自主可控生态建设具有至关重要的意义。在全球科技竞争日益激烈的背景下,实现信息技术的自主可控,摆脱对国外技术的依赖,已成为保障国家信息安全和产业可持续发展的关键。国产芯片作为信创产业的基石,其发展水平直接影响着整个信创生态的构建与完善。通过不断提升国产芯片的技术实力、产...
国产信创系统   21  
  信创生态建设旨在实现信息技术领域的自主创新和安全可控,涵盖了从硬件到软件的全产业链。随着数字化转型的加速,信创生态建设的重要性日益凸显,它不仅关乎国家的信息安全,更是推动产业升级和经济高质量发展的关键力量。然而,在推进信创生态建设的过程中,面临着诸多复杂且严峻的挑战,需要深入剖析并寻找切实可行的解决方案。技术创新难题技...
信创操作系统   27  
  信创产业作为国家信息技术创新发展的重要领域,对于保障国家信息安全、推动产业升级具有关键意义。而国产芯片作为信创产业的核心基石,其研发进展备受关注。在信创国产芯片的研发征程中,面临着诸多复杂且艰巨的难点,这些难点犹如一道道关卡,阻碍着国产芯片的快速发展。然而,科研人员和相关企业并未退缩,积极探索并提出了一系列切实可行的解...
国产化替代产品目录   28  
热门文章
项目管理软件有哪些?
云禅道AD
禅道项目管理软件

云端的项目管理软件

尊享禅道项目软件收费版功能

无需维护,随时随地协同办公

内置subversion和git源码管理

每天备份,随时转为私有部署

免费试用