How do I get the return value of a function passed to multiprocessing.Process?
Problem description:
In the example code below, I want to get the return value of the function worker. How can I do that? Where is this value stored?
Example code:
import multiprocessing

def worker(procnum):
    '''worker function'''
    print(str(procnum) + ' represent!')
    return procnum

if __name__ == '__main__':
    jobs = []
    for i in range(5):
        p = multiprocessing.Process(target=worker, args=(i,))
        jobs.append(p)
        p.start()

    for proc in jobs:
        proc.join()
    print(jobs)
Output:
0 represent!
1 represent!
2 represent!
3 represent!
4 represent!
[<Process(Process-1, stopped)>, <Process(Process-2, stopped)>, <Process(Process-3, stopped)>, <Process(Process-4, stopped)>, <Process(Process-5, stopped)>]
I can't seem to find the relevant attribute in the objects stored in jobs.
Solution 1:
Use a shared variable to communicate. For example:
Example code:
import multiprocessing

def worker(procnum, return_dict):
    """worker function"""
    print(str(procnum) + " represent!")
    return_dict[procnum] = procnum

if __name__ == "__main__":
    manager = multiprocessing.Manager()
    return_dict = manager.dict()
    jobs = []
    for i in range(5):
        p = multiprocessing.Process(target=worker, args=(i, return_dict))
        jobs.append(p)
        p.start()

    for proc in jobs:
        proc.join()
    print(return_dict.values())
Output:
0 represent!
1 represent!
3 represent!
2 represent!
4 represent!
[0, 1, 3, 2, 4]
Solution 2:
I think the approach suggested by sega_sai is the better one, but it really needs a code example, so here it is:
import multiprocessing
from os import getpid

def worker(procnum):
    print('I am number %d in process %d' % (procnum, getpid()))
    return getpid()

if __name__ == '__main__':
    pool = multiprocessing.Pool(processes=3)
    print(pool.map(worker, range(5)))
This will print the return values:
I am number 0 in process 19139
I am number 1 in process 19138
I am number 2 in process 19140
I am number 3 in process 19139
I am number 4 in process 19140
[19139, 19138, 19140, 19139, 19140]
If you are familiar with map (the Python 2 built-in), this should not be too challenging. Otherwise, have a look at sega_sai's link.
Note how little code is needed. (Also note how processes are reused.)
Solution 3:
For anyone else who is looking for how to get a value from a Process by using a Queue:
import multiprocessing

ret = {'foo': False}

def worker(queue):
    ret = queue.get()
    ret['foo'] = True
    queue.put(ret)

if __name__ == '__main__':
    queue = multiprocessing.Queue()
    queue.put(ret)
    p = multiprocessing.Process(target=worker, args=(queue,))
    p.start()
    p.join()
    print(queue.get())  # Prints {"foo": True}
Note that on Windows or in Jupyter Notebook, with multiprocessing you have to save this as a file and execute the file. If you run it in a command prompt, you will see an error like this:
AttributeError: Can't get attribute 'worker' on <module '__main__' (built-in)>
Solution 4:
For some reason, I couldn't find a general example of how to do this with a Queue anywhere (even Python's doc examples don't spawn multiple processes), so here's what I got working after about 10 tries:
from multiprocessing import Process, Queue

def add_helper(queue, arg1, arg2):  # the function called in child processes
    ret = arg1 + arg2
    queue.put(ret)

def multi_add():  # spawns child processes
    q = Queue()
    processes = []
    rets = []
    for _ in range(0, 100):
        p = Process(target=add_helper, args=(q, 1, 2))
        processes.append(p)
        p.start()
    for p in processes:
        ret = q.get()  # will block
        rets.append(ret)
    for p in processes:
        p.join()
    return rets
Queue is a blocking, thread-safe queue that you can use to store the return values from the child processes. So you have to pass the queue to each process. The less obvious part here is that you have to get() from the queue before you join the Processes, or else the queue fills up and blocks everything.
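For completeness, a minimal runner for the multi_add sketch above (every child computes 1 + 2, so the expectation is 100 threes):
if __name__ == '__main__':
    rets = multi_add()
    print(len(rets), set(rets))  # 100 {3}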
Update for the object-oriented folks (tested in Python 3.4):
from multiprocessing import Process, Queue

class Multiprocessor:
    def __init__(self):
        self.processes = []
        self.queue = Queue()

    @staticmethod
    def _wrapper(func, queue, args, kwargs):
        ret = func(*args, **kwargs)
        queue.put(ret)

    def run(self, func, *args, **kwargs):
        args2 = [func, self.queue, args, kwargs]
        p = Process(target=self._wrapper, args=args2)
        self.processes.append(p)
        p.start()

    def wait(self):
        rets = []
        for p in self.processes:
            ret = self.queue.get()
            rets.append(ret)
        for p in self.processes:
            p.join()
        return rets

# tester
if __name__ == "__main__":
    mp = Multiprocessor()
    num_proc = 64
    for _ in range(num_proc):  # queue up multiple tasks running `sum`
        mp.run(sum, [1, 2, 3, 4, 5])
    ret = mp.wait()  # get all results
    print(ret)
    assert len(ret) == num_proc and all(r == 15 for r in ret)
Solution 5:
This example shows how to use a list of multiprocessing.Pipe instances to return strings from an arbitrary number of processes:
import multiprocessing

def worker(procnum, send_end):
    '''worker function'''
    result = str(procnum) + ' represent!'
    print(result)
    send_end.send(result)

def main():
    jobs = []
    pipe_list = []
    for i in range(5):
        recv_end, send_end = multiprocessing.Pipe(False)
        p = multiprocessing.Process(target=worker, args=(i, send_end))
        jobs.append(p)
        pipe_list.append(recv_end)
        p.start()

    for proc in jobs:
        proc.join()
    result_list = [x.recv() for x in pipe_list]
    print(result_list)

if __name__ == '__main__':
    main()
Output:
0 represent!
1 represent!
2 represent!
3 represent!
4 represent!
['0 represent!', '1 represent!', '2 represent!', '3 represent!', '4 represent!']
This solution uses fewer resources than a multiprocessing.Queue, which uses:
- a pipe
- at least one lock
- a buffer
- a thread
or than a multiprocessing.SimpleQueue, which uses:
- a pipe
- at least one lock
It is very instructive to look at the source of each of these types.
Solution 6:
It looks like you should use the multiprocessing.Pool class and its methods .apply(), .apply_async(), or .map().
Reference: class multiprocessing.pool.AsyncResult
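Since this answer gives no code, here is a minimal sketch of how apply_async and its AsyncResult could serve the question's scenario (the worker simply echoes its argument back):
import multiprocessing

def worker(procnum):
    return procnum

if __name__ == '__main__':
    with multiprocessing.Pool(processes=4) as pool:
        # apply_async returns an AsyncResult immediately;
        # calling .get() on it blocks until the return value is ready
        results = [pool.apply_async(worker, (i,)) for i in range(5)]
        print([r.get() for r in results])  # [0, 1, 2, 3, 4]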
Solution 7:
You can set a process's exit status with the built-in exit function and read it back from the process's exitcode attribute. (Keep in mind that an exit status can only carry a small integer, 0-255 on POSIX, so this only suits very limited return values.)
import multiprocessing

def worker(procnum):
    print(str(procnum) + ' represent!')
    exit(procnum)

if __name__ == '__main__':
    jobs = []
    for i in range(5):
        p = multiprocessing.Process(target=worker, args=(i,))
        jobs.append(p)
        p.start()

    result = []
    for proc in jobs:
        proc.join()
        result.append(proc.exitcode)
    print(result)
Output:
0 represent!
1 represent!
2 represent!
3 represent!
4 represent!
[0, 1, 2, 3, 4]
Solution 8:
The pebble package has a nice abstraction leveraging multiprocessing.Pipe which makes this quite straightforward:
from pebble import concurrent

@concurrent.process
def function(arg, kwarg=0):
    return arg + kwarg

future = function(1, kwarg=1)
print(future.result())
Example from: https://pythonhosted.org/Pebble/#concurrent-decorators
Solution 9:
I wanted to simplify the simplest examples copied from above, which worked for me on Py3.6. The simplest is multiprocessing.Pool:
import multiprocessing
import time

def worker(x):
    time.sleep(1)
    return x

pool = multiprocessing.Pool()
print(pool.map(worker, range(10)))
You can set the number of processes in the pool with, e.g., Pool(processes=5). However, it defaults to the CPU count, so leave it empty for CPU-bound tasks. (I/O-bound tasks often suit threads anyway, since the threads are mostly waiting and can share a CPU core.) Pool also applies a chunking optimization; see the sketch right after this paragraph.
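A quick sketch of the two knobs just mentioned (processes and chunksize are standard Pool arguments; the values here are arbitrary):
import multiprocessing

def worker(x):
    return x * x

if __name__ == '__main__':
    # cap the pool at 5 workers; chunksize=10 hands each worker
    # batches of 10 tasks instead of one task at a time
    with multiprocessing.Pool(processes=5) as pool:
        print(pool.map(worker, range(100), chunksize=10))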
(Note that the worker function cannot be nested inside another function. I initially defined my worker inside the function that calls pool.map, to keep it all self-contained, but then the processes could not import it and threw "AttributeError: Can't pickle local object outer_method..inner_method". More here. It can be inside a class, though; see the illustration below.)
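To make that constraint concrete, a small hypothetical illustration (top_level_worker, broken, and nested_worker are made-up names):
import multiprocessing

def top_level_worker(x):  # picklable: defined at module level
    return x + 1

def broken():
    def nested_worker(x):  # not picklable: a local object
        return x + 1
    with multiprocessing.Pool() as pool:
        # raises "Can't pickle local object 'broken.<locals>.nested_worker'"
        return pool.map(nested_worker, range(3))

if __name__ == '__main__':
    with multiprocessing.Pool() as pool:
        print(pool.map(top_level_worker, range(3)))  # [1, 2, 3]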
(Thanks to the original question for specifying printing 'represent!' rather than time.sleep(); without it, I thought some code was running concurrently when in fact it wasn't.)
Python 3's ProcessPoolExecutor is also a two-liner (.map returns a generator, so you need list()):
from concurrent.futures import ProcessPoolExecutor

with ProcessPoolExecutor() as executor:
    print(list(executor.map(worker, range(10))))
Using plain Processes:
import multiprocessing
import time

def worker(x, queue):
    time.sleep(1)
    queue.put(x)

queue = multiprocessing.SimpleQueue()
tasks = range(10)

for task in tasks:
    multiprocessing.Process(target=worker, args=(task, queue)).start()

for _ in tasks:
    print(queue.get())
Use SimpleQueue if all you need is put and get. The first loop starts all the processes; the second loop then makes blocking queue.get calls. I don't think there is any need to call p.join().
Solution 10:
If you are using Python 3, you can use the convenient concurrent.futures.ProcessPoolExecutor abstraction:
from concurrent.futures import ProcessPoolExecutor

def worker(procnum):
    '''worker function'''
    print(str(procnum) + ' represent!')
    return procnum

if __name__ == '__main__':
    with ProcessPoolExecutor() as executor:
        print(list(executor.map(worker, range(5))))
Output:
0 represent!
1 represent!
2 represent!
3 represent!
4 represent!
[0, 1, 2, 3, 4]
Solution 11:
You can get the return value of a function with a ProcessPoolExecutor, like this:
from concurrent.futures import ProcessPoolExecutor

def test(num1, num2):
    return num1 + num2

with ProcessPoolExecutor() as executor:
    future = executor.submit(test, 2, 3)
    print(future.result())  # 5
Solution 12:
A simple solution:
import multiprocessing

output = []
data = range(0, 10)

def f(x):
    return x**2

def handler():
    p = multiprocessing.Pool(64)
    r = p.map(f, data)
    return r

if __name__ == '__main__':
    output.append(handler())
    print(output[0])
Output:
[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
Solution 13:
I modified vartec's answer a bit, since I needed to get error codes from the functions. (Thanks vartec!!! It's an awesome trick.)
This can also be done with a manager.list, but I think it is better to keep it in a dict and store a list inside it. That way, we keep each function together with its result, since we cannot be sure of the order in which the list would be populated.
from multiprocessing import Process
import time
import datetime
import multiprocessing

def func1(fn, m_list):
    print('func1: starting')
    time.sleep(1)
    m_list[fn] = "this is the first function"
    print('func1: finishing')
    # return "func1"  # no need for a return, since Process doesn't propagate it =(

def func2(fn, m_list):
    print('func2: starting')
    time.sleep(3)
    m_list[fn] = "this is function 2"
    print('func2: finishing')
    # return "func2"

def func3(fn, m_list):
    print('func3: starting')
    time.sleep(9)
    # If this fails, the dict never gets populated for this function;
    # alternatively, use try/except to get something back.
    raise ValueError("failed here")
    # If we want the error recorded in the manager dict, we could
    # catch it instead (unreachable after the raise above):
    # try:
    #     raise ValueError("failed here")
    #     m_list[fn] = "this is third"
    # except ValueError:
    #     m_list[fn] = "this is third and it failed horribly"
    # print('func3: finishing')
    # return "func3"

def runInParallel(*fns):  # * accepts any number of functions
    start_time = datetime.datetime.now()
    proc = []
    manager = multiprocessing.Manager()
    m_list = manager.dict()
    for fn in fns:
        p = Process(target=fn, name=fn.__name__, args=(fn, m_list))
        p.start()
        proc.append(p)
    for p in proc:
        p.join()  # optionally pass a timeout in seconds here
    print(datetime.datetime.now() - start_time)
    return m_list, proc

if __name__ == '__main__':
    manager, proc = runInParallel(func1, func2, func3)
    # Here you can check what failed
    for i in proc:
        print(i.name, i.exitcode)  # 'name' was set in the Process constructor
    # This only shows the functions that worked and were able
    # to populate the manager dict
    for i, j in manager.items():
        print(i, j)
Solution 14:
You can also use a decorator to print the result of a function:
from multiprocessing import Process

def printer(func):
    def inner(*args, **kwargs):
        result = func(*args, **kwargs)
        print(result)
        return result
    return inner

@printer
def square(nums: list):
    # assumed counterpart to cube; the original snippet used it without defining it
    return [v ** 2 for v in nums]

@printer
def cube(nums: list):
    result = []
    for v in nums:
        result.append(v ** 3)
    return result

if __name__ == '__main__':
    nums = [2, 3, 4, 6]
    p1 = Process(target=square, args=(nums,))
    p2 = Process(target=cube, args=(nums,))
    p1.start()
    p2.start()
    p1.join()
    p2.join()