如何获取传递给 multiprocessing.Process 的函数的返回值?

2024-12-16 08:35:00
admin
原创
138
摘要:问题描述:在下面的示例代码中,我想获取函数的返回值worker。我该怎么做?这个值存储在哪里?示例代码:import multiprocessing def worker(procnum): '''worker function''' print str(procnum) + ' repre...

问题描述:

在下面的示例代码中,我想获取函数的返回值worker。我该怎么做?这个值存储在哪里?

示例代码:

import multiprocessing

def worker(procnum):
    '''worker function'''
    print str(procnum) + ' represent!'
    return procnum


if __name__ == '__main__':
    jobs = []
    for i in range(5):
        p = multiprocessing.Process(target=worker, args=(i,))
        jobs.append(p)
        p.start()

    for proc in jobs:
        proc.join()
    print jobs

输出:

0 represent!
1 represent!
2 represent!
3 represent!
4 represent!
[<Process(Process-1, stopped)>, <Process(Process-2, stopped)>, <Process(Process-3, stopped)>, <Process(Process-4, stopped)>, <Process(Process-5, stopped)>]

我似乎无法在存储的对象中找到相关属性jobs


解决方案 1:

使用共享变量进行通信。例如,

示例代码:

import multiprocessing


def worker(procnum, return_dict):
    """worker function"""
    print(str(procnum) + " represent!")
    return_dict[procnum] = procnum


if __name__ == "__main__":
    manager = multiprocessing.Manager()
    return_dict = manager.dict()
    jobs = []
    for i in range(5):
        p = multiprocessing.Process(target=worker, args=(i, return_dict))
        jobs.append(p)
        p.start()

    for proc in jobs:
        proc.join()
    print(return_dict.values())

输出:

0 represent!
1 represent!
3 represent!
2 represent!
4 represent!
[0, 1, 3, 2, 4]

解决方案 2:

我认为sega_sai 建议的方法更好。但它确实需要一个代码示例,因此如下:

import multiprocessing
from os import getpid

def worker(procnum):
    print('I am number %d in process %d' % (procnum, getpid()))
    return getpid()

if __name__ == '__main__':
    pool = multiprocessing.Pool(processes = 3)
    print(pool.map(worker, range(5)))

这将打印返回值:

I am number 0 in process 19139
I am number 1 in process 19138
I am number 2 in process 19140
I am number 3 in process 19139
I am number 4 in process 19140
[19139, 19138, 19140, 19139, 19140]

如果您熟悉map(Python 2 内置),这应该不会太难。否则,请查看sega_Sai 的链接。

请注意,所需的代码非常少。(还请注意如何重复使用流程。)

解决方案 3:

对于任何正在寻求如何从Process使用中获取价值的人来说Queue

import multiprocessing

ret = {'foo': False}

def worker(queue):
    ret = queue.get()
    ret['foo'] = True
    queue.put(ret)

if __name__ == '__main__':
    queue = multiprocessing.Queue()
    queue.put(ret)
    p = multiprocessing.Process(target=worker, args=(queue,))
    p.start()
    p.join()
    print(queue.get())  # Prints {"foo": True}

请注意,在 Windows 或 Jupyter Notebook 中,multithreading您必须将其保存为文件并执行该文件。如果您在命令提示符中执行此操作,您将看到如下错误:

 AttributeError: Can't get attribute 'worker' on <module '__main__' (built-in)>

解决方案 4:

由于某种原因,我无法Queue在任何地方找到如何执行此操作的一般示例(即使 Python 的文档示例也不会产生多个进程),因此,这是我尝试 10 次后得到的结果:

from multiprocessing import Process, Queue

def add_helper(queue, arg1, arg2): # the func called in child processes
    ret = arg1 + arg2
    queue.put(ret)

def multi_add(): # spawns child processes
    q = Queue()
    processes = []
    rets = []
    for _ in range(0, 100):
        p = Process(target=add_helper, args=(q, 1, 2))
        processes.append(p)
        p.start()
    for p in processes:
        ret = q.get() # will block
        rets.append(ret)
    for p in processes:
        p.join()
    return rets

Queue是一个阻塞的线程安全队列,可用于存储子进程的返回值。因此,您必须将队列传递给每个进程。这里不太明显的是,您必须在esget()之前从队列中取出,否则队列会填满并阻塞所有内容。join`Process`

面向对象人士的更新(在 Python 3.4 中测试):

from multiprocessing import Process, Queue

class Multiprocessor():

    def __init__(self):
        self.processes = []
        self.queue = Queue()

    @staticmethod
    def _wrapper(func, queue, args, kwargs):
        ret = func(*args, **kwargs)
        queue.put(ret)

    def run(self, func, *args, **kwargs):
        args2 = [func, self.queue, args, kwargs]
        p = Process(target=self._wrapper, args=args2)
        self.processes.append(p)
        p.start()

    def wait(self):
        rets = []
        for p in self.processes:
            ret = self.queue.get()
            rets.append(ret)
        for p in self.processes:
            p.join()
        return rets

# tester
if __name__ == "__main__":
    mp = Multiprocessor()
    num_proc = 64
    for _ in range(num_proc): # queue up multiple tasks running `sum`
        mp.run(sum, [1, 2, 3, 4, 5])
    ret = mp.wait() # get all results
    print(ret)
    assert len(ret) == num_proc and all(r == 15 for r in ret)

解决方案 5:

此示例显示如何使用multiprocessing.Pipe实例列表从任意数量的进程返回字符串:

import multiprocessing

def worker(procnum, send_end):
    '''worker function'''
    result = str(procnum) + ' represent!'
    print result
    send_end.send(result)

def main():
    jobs = []
    pipe_list = []
    for i in range(5):
        recv_end, send_end = multiprocessing.Pipe(False)
        p = multiprocessing.Process(target=worker, args=(i, send_end))
        jobs.append(p)
        pipe_list.append(recv_end)
        p.start()

    for proc in jobs:
        proc.join()
    result_list = [x.recv() for x in pipe_list]
    print result_list

if __name__ == '__main__':
    main()

输出:

0 represent!
1 represent!
2 represent!
3 represent!
4 represent!
['0 represent!', '1 represent!', '2 represent!', '3 represent!', '4 represent!']

该解决方案使用的资源比multiprocessing.Queue要少,后者使用

  • 管道

  • 至少一个锁

  • 缓冲区

  • 一个线程

或者使用multiprocessing.SimpleQueue

  • 管道

  • 至少一个锁

查看每种类型的来源都非常有启发。

解决方案 6:

看来您应该使用multiprocessing.Pool类,并使用方法 .apply() .apply_async()、map()

参考:class multiprocessing.pool.AsyncResult

解决方案 7:

可以使用内置函数设置进程的退出代码,可以从进程的属性exit中获取:exitcode

import multiprocessing

def worker(procnum):
    print str(procnum) + ' represent!'
    exit(procnum)

if __name__ == '__main__':
    jobs = []
    for i in range(5):
        p = multiprocessing.Process(target=worker, args=(i,))
        jobs.append(p)
        p.start()

    result = []
    for proc in jobs:
        proc.join()
        result.append(proc.exitcode)
    print result

输出:

0 represent!
1 represent!
2 represent!
3 represent!
4 represent!
[0, 1, 2, 3, 4]

解决方案 8:

pebble包有一个很好的抽象杠杆作用,这multiprocessing.Pipe使得这变得非常简单:

from pebble import concurrent

@concurrent.process
def function(arg, kwarg=0):
    return arg + kwarg

future = function(1, kwarg=1)

print(future.result())

示例来自:https://pythonhosted.org/Pebble/#concurrent-decorators

解决方案 9:

我想简化从上面复制的最简单的示例,这些示例在 Py3.6 上对我有用。最简单的是multiprocessing.Pool

import multiprocessing
import time

def worker(x):
    time.sleep(1)
    return x

pool = multiprocessing.Pool()
print(pool.map(worker, range(10)))

您可以使用例如来设置池中的进程数Pool(processes=5)。但是它默认为 CPU 数量,因此对于 CPU 密集型任务,请将其留空。(I/O 密集型任务通常适合线程,因为线程大多处于等待状态,因此可以共享 CPU 核心。)Pool也应用分块优化。

(请注意,工作方法不能嵌套在方法中。我最初在调用的方法内部定义了工作方法pool.map,以使其保持独立,但随后进程无法导入它,并抛出了“AttributeError:无法腌制本地对象 outer_method..inner_method”。更多信息请见这里。它可以在类内部。)

(感谢原始问题指定打印'represent!'而不是time.sleep(),但如果没有它,我认为某些代码正在同时运行,但事实并非如此。)


Py3ProcessPoolExecutor也有两行(.map返回一个生成器,因此你需要list()):

from concurrent.futures import ProcessPoolExecutor
with ProcessPoolExecutor() as executor:
    print(list(executor.map(worker, range(10))))

使用简单的Processes:

import multiprocessing
import time

def worker(x, queue):
    time.sleep(1)
    queue.put(x)

queue = multiprocessing.SimpleQueue()
tasks = range(10)

for task in tasks:
    multiprocessing.Process(target=worker, args=(task, queue,)).start()

for _ in tasks:
    print(queue.get())

SimpleQueue如果您需要的只是put和,请使用get。第一个循环启动所有进程,然后第二个循环进行阻塞queue.get调用。我认为没有必要调用p.join()

解决方案 10:

如果您使用的是 Python 3,则可以使用concurrent.futures.ProcessPoolExecutor以下方便的抽象:

from concurrent.futures import ProcessPoolExecutor

def worker(procnum):
    '''worker function'''
    print(str(procnum) + ' represent!')
    return procnum


if __name__ == '__main__':
    with ProcessPoolExecutor() as executor:
        print(list(executor.map(worker, range(5))))

输出:

0 represent!
1 represent!
2 represent!
3 represent!
4 represent!
[0, 1, 2, 3, 4]

解决方案 11:

您可以使用ProcessPoolExecutor从函数中获取返回值,如下所示:

from concurrent.futures import ProcessPoolExecutor

def test(num1, num2):
    return num1 + num2

with ProcessPoolExecutor() as executor:
    feature = executor.submit(test, 2, 3)
    print(feature.result()) # 5

解决方案 12:

一个简单的解决方案:

import multiprocessing

output=[]
data = range(0,10)

def f(x):
    return x**2

def handler():
    p = multiprocessing.Pool(64)
    r=p.map(f, data)
    return r

if __name__ == '__main__':
    output.append(handler())

print(output[0])

输出:

[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]

解决方案 13:

我稍微修改了vartec 的答案,因为我需要从函数中获取错误代码。 (谢谢 vartec !!!这是一个很棒的技巧。)

这也可以用 来实现manager.list,但我认为最好将其放在字典中并在其中存储列表。这样,我们就可以保留函数和结果,因为我们无法确定列表的填充顺序。

from multiprocessing import Process
import time
import datetime
import multiprocessing


def func1(fn, m_list):
    print 'func1: starting'
    time.sleep(1)
    m_list[fn] = "this is the first function"
    print 'func1: finishing'
    # return "func1"  # No need for return since Multiprocess doesn't return it =(

def func2(fn, m_list):
    print 'func2: starting'
    time.sleep(3)
    m_list[fn] = "this is function 2"
    print 'func2: finishing'
    # return "func2"

def func3(fn, m_list):
    print 'func3: starting'
    time.sleep(9)
    # If fail won't join the rest because it never populate the dict
    # or do a try/except to get something in return.
    raise ValueError("failed here")
    # if we want to get the error in the manager dict we can catch the error
    try:
        raise ValueError("failed here")
        m_list[fn] = "this is third"
    except:
        m_list[fn] = "this is third and it fail horrible"
        # print 'func3: finishing'
        # return "func3"


def runInParallel(*fns):  # * is to accept any input in list
    start_time = datetime.datetime.now()
    proc = []
    manager = multiprocessing.Manager()
    m_list = manager.dict()
    for fn in fns:
        # print fn
        # print dir(fn)
        p = Process(target=fn, name=fn.func_name, args=(fn, m_list))
        p.start()
        proc.append(p)
    for p in proc:
        p.join()  # 5 is the time-out

    print datetime.datetime.now() - start_time
    return m_list, proc

if __name__ == '__main__':
    manager, proc = runInParallel(func1, func2, func3)
    # print dir(proc[0])
    # print proc[0]._name
    # print proc[0].name
    # print proc[0].exitcode

    # Here you can check what did fail
    for i in proc:
        print i.name, i.exitcode  # 'name' was set up in the Process line 53

    # Here will only show the function that worked and where able to populate the
    # manager dict
    for i, j in manager.items():
        print dir(i)  # Things you can do to the function
        print i, j

解决方案 14:

You can also use decorator for printing result of function

def printer(func):
    def inner(*args, **kwargs):
        result = func(*args, **kwargs)
        print(result)
        return result
    return inner


@printer
def cube(nums: list):
    result = []
    for v in nums:
        result.append(v ** 3)
    return result




if __name__ == '__main__':
    nums = [2, 3, 4, 6]
    p1 = Process(target=square, args=(nums,))
    p2 = Process(target=cube, args=(nums,))
    p1.start()
    p2.start()
    p1.join()
    p2.join()
相关推荐
  政府信创国产化的10大政策解读一、信创国产化的背景与意义信创国产化,即信息技术应用创新国产化,是当前中国信息技术领域的一个重要发展方向。其核心在于通过自主研发和创新,实现信息技术应用的自主可控,减少对外部技术的依赖,并规避潜在的技术制裁和风险。随着全球信息技术竞争的加剧,以及某些国家对中国在科技领域的打压,信创国产化显...
工程项目管理   1579  
  为什么项目管理通常仍然耗时且低效?您是否还在反复更新电子表格、淹没在便利贴中并参加每周更新会议?这确实是耗费时间和精力。借助软件工具的帮助,您可以一目了然地全面了解您的项目。如今,国内外有足够多优秀的项目管理软件可以帮助您掌控每个项目。什么是项目管理软件?项目管理软件是广泛行业用于项目规划、资源分配和调度的软件。它使项...
项目管理软件   1355  
  信创产品在政府采购中的占比分析随着信息技术的飞速发展以及国家对信息安全重视程度的不断提高,信创产业应运而生并迅速崛起。信创,即信息技术应用创新,旨在实现信息技术领域的自主可控,减少对国外技术的依赖,保障国家信息安全。政府采购作为推动信创产业发展的重要力量,其对信创产品的采购占比情况备受关注。这不仅关系到信创产业的发展前...
信创和国产化的区别   8  
  信创,即信息技术应用创新产业,旨在实现信息技术领域的自主可控,摆脱对国外技术的依赖。近年来,国货国用信创发展势头迅猛,在诸多领域取得了显著成果。这一发展趋势对科技创新产生了深远的推动作用,不仅提升了我国在信息技术领域的自主创新能力,还为经济社会的数字化转型提供了坚实支撑。信创推动核心技术突破信创产业的发展促使企业和科研...
信创工作   9  
  信创技术,即信息技术应用创新产业,旨在实现信息技术领域的自主可控与安全可靠。近年来,信创技术发展迅猛,对中小企业产生了深远的影响,带来了诸多不可忽视的价值。在数字化转型的浪潮中,中小企业面临着激烈的市场竞争和复杂多变的环境,信创技术的出现为它们提供了新的发展机遇和支撑。信创技术对中小企业的影响技术架构变革信创技术促使中...
信创国产化   8  
热门文章
项目管理软件有哪些?
云禅道AD
禅道项目管理软件

云端的项目管理软件

尊享禅道项目软件收费版功能

无需维护,随时随地协同办公

内置subversion和git源码管理

每天备份,随时转为私有部署

免费试用