Python 多处理 PicklingError:无法 pickle <type'function'>
- 2024-12-03 08:45:00
- admin 原创
- 184
问题描述:
很抱歉,我无法用更简单的示例重现该错误,而且我的代码太复杂,无法发布。如果我在 IPython shell 而不是常规 Python 中运行该程序,则一切顺利。
我查了一下之前关于这个问题的一些笔记,都是因为用pool调用类函数中定义的函数导致的,但我的情况不是这样。
Exception in thread Thread-3:
Traceback (most recent call last):
File "/usr/lib64/python2.7/threading.py", line 552, in __bootstrap_inner
self.run()
File "/usr/lib64/python2.7/threading.py", line 505, in run
self.__target(*self.__args, **self.__kwargs)
File "/usr/lib64/python2.7/multiprocessing/pool.py", line 313, in _handle_tasks
put(task)
PicklingError: Can't pickle <type 'function'>: attribute lookup __builtin__.function failed
我将非常感激您的帮助。
更新:我 pickle 的函数是在模块的顶层定义的。虽然它调用了一个包含嵌套函数的函数。即,f()
调用具有嵌套函数的g()
调用,我正在调用。,,都是在顶层定义的。我尝试了这个模式的更简单的例子,但它确实有效。h()
`i()pool.apply_async(f)
f()g()
h()`
解决方案 1:
以下是可以 pickle 的内容列表。具体来说,只有在模块顶层定义的函数才可以被 pickle。
这段代码:
import multiprocessing as mp
class Foo():
@staticmethod
def work(self):
pass
if __name__ == '__main__':
pool = mp.Pool()
foo = Foo()
pool.apply_async(foo.work)
pool.close()
pool.join()
产生的错误与您发布的错误几乎相同:
Exception in thread Thread-2:
Traceback (most recent call last):
File "/usr/lib/python2.7/threading.py", line 552, in __bootstrap_inner
self.run()
File "/usr/lib/python2.7/threading.py", line 505, in run
self.__target(*self.__args, **self.__kwargs)
File "/usr/lib/python2.7/multiprocessing/pool.py", line 315, in _handle_tasks
put(task)
PicklingError: Can't pickle <type 'function'>: attribute lookup __builtin__.function failed
问题在于,pool
所有方法都使用mp.SimpleQueue
将任务传递给工作进程。通过 的所有内容都mp.SimpleQueue
必须是可拾取的,但foo.work
由于未在模块的顶层定义,因此不可拾取。
可以通过在顶层定义一个函数来修复此问题,该函数调用foo.work()
:
def work(foo):
foo.work()
pool.apply_async(work,args=(foo,))
请注意,foo
是可选择的,因为Foo
它是在顶层定义的,并且 foo.__dict__
是可选择的。
解决方案 2:
我会使用pathos.multiprocesssing
,而不是multiprocessing
。 是使用pathos.multiprocessing
的 fork 。可以序列化 Python 中的几乎所有内容,因此您可以并行发送更多内容。fork 还能够直接使用多个参数函数,就像类方法所需要的那样。multiprocessing
`dilldill
pathos`
>>> from pathos.multiprocessing import ProcessingPool as Pool
>>> p = Pool(4)
>>> class Test(object):
... def plus(self, x, y):
... return x+y
...
>>> t = Test()
>>> p.map(t.plus, x, y)
[4, 6, 8, 10]
>>>
>>> class Foo(object):
... @staticmethod
... def work(self, x):
... return x+1
...
>>> f = Foo()
>>> p.apipe(f.work, f, 100)
<processing.pool.ApplyResult object at 0x10504f8d0>
>>> res = _
>>> res.get()
101
获取pathos
(如果你喜欢,dill
)此处:https:
//github.com/uqfoundation
解决方案 3:
当这个问题出现时,multiprocessing
一个简单的解决方案是从 切换Pool
到ThreadPool
。这可以在不更改代码的情况下完成,除了导入之外-
from multiprocessing.pool import ThreadPool as Pool
这是因为 ThreadPool 与主线程共享内存,而不是创建新进程 —— 这意味着不需要 pickling。
这种方法的缺点是 Python 并不是处理线程最好的语言——它使用一种称为全局解释器锁的东西来保持线程安全,这可能会减慢这里的一些用例。但是,如果您主要与其他系统交互(运行 HTTP 命令、与数据库通信、写入文件系统),那么您的代码可能不受 CPU 的约束,不会受到太大的影响。事实上,我在编写 HTTP/HTTPS 基准测试时发现,这里使用的线程模型的开销和延迟更少,因为创建新进程的开销远高于创建新线程的开销,而程序只是在等待 HTTP 响应。
因此,如果您在 python 用户空间中处理大量内容,这可能不是最好的方法。
解决方案 4:
正如其他人所说,multiprocessing
只能将 Python 对象传输到可以进行 pickle 的工作进程。如果您无法按照 unutbu 所述重新组织代码,则可以使用dill
扩展的 pickling/unpickling 功能来传输数据(尤其是代码数据),如下所示。
该解决方案仅需要安装dill
,不需要其他库pathos
:
import os
from multiprocessing import Pool
import dill
def run_dill_encoded(payload):
fun, args = dill.loads(payload)
return fun(*args)
def apply_async(pool, fun, args):
payload = dill.dumps((fun, args))
return pool.apply_async(run_dill_encoded, (payload,))
if __name__ == "__main__":
pool = Pool(processes=5)
# asyn execution of lambda
jobs = []
for i in range(10):
job = apply_async(pool, lambda a, b: (a, b, a * b), (i, i + 1))
jobs.append(job)
for job in jobs:
print job.get()
print
# async execution of static method
class O(object):
@staticmethod
def calc():
return os.getpid()
jobs = []
for i in range(10):
job = apply_async(pool, O.calc, ())
jobs.append(job)
for job in jobs:
print job.get()
解决方案 5:
我发现,通过尝试使用分析器,我也可以在一段完美运行的代码上生成准确的错误输出。
请注意,这是在 Windows 上(分叉有点不太优雅)。
我正在跑步:
python -m profile -o output.pstats <script>
我发现删除分析会消除错误,而放置分析会恢复错误。这也让我抓狂,因为我知道代码曾经可以工作。我检查是否有东西更新了pool.py...然后有一种沉重的感觉,于是删除了分析,就这样了。
在此发布以供存档,以防其他人遇到它。
解决方案 6:
Can't pickle <type 'function'>: attribute lookup __builtin__.function failed
如果传递给异步作业的模型对象内部有任何内置函数,也会出现此错误。
因此,请确保检查传递的模型对象没有内置函数。(在我们的例子中,我们使用模型内部的django-model-utilsFieldTracker()
函数来跟踪某个字段)。以下是相关 GitHub 问题的链接。
解决方案 7:
此解决方案仅需要安装 dill,不需要安装其他库,因为 pathos
def apply_packed_function_for_map((dumped_function, item, args, kwargs),):
"""
Unpack dumped function as target function and call it with arguments.
:param (dumped_function, item, args, kwargs):
a tuple of dumped function and its arguments
:return:
result of target function
"""
target_function = dill.loads(dumped_function)
res = target_function(item, *args, **kwargs)
return res
def pack_function_for_map(target_function, items, *args, **kwargs):
"""
Pack function and arguments to object that can be sent from one
multiprocessing.Process to another. The main problem is:
«multiprocessing.Pool.map*» or «apply*»
cannot use class methods or closures.
It solves this problem with «dill».
It works with target function as argument, dumps it («with dill»)
and returns dumped function with arguments of target function.
For more performance we dump only target function itself
and don't dump its arguments.
How to use (pseudo-code):
~>>> import multiprocessing
~>>> images = [...]
~>>> pool = multiprocessing.Pool(100500)
~>>> features = pool.map(
~... *pack_function_for_map(
~... super(Extractor, self).extract_features,
~... images,
~... type='png'
~... **options,
~... )
~... )
~>>>
:param target_function:
function, that you want to execute like target_function(item, *args, **kwargs).
:param items:
list of items for map
:param args:
positional arguments for target_function(item, *args, **kwargs)
:param kwargs:
named arguments for target_function(item, *args, **kwargs)
:return: tuple(function_wrapper, dumped_items)
It returs a tuple with
* function wrapper, that unpack and call target function;
* list of packed target function and its' arguments.
"""
dumped_function = dill.dumps(target_function)
dumped_items = [(dumped_function, item, args, kwargs) for item in items]
return apply_packed_function_for_map, dumped_items
它也适用于 numpy 数组。
解决方案 8:
一个快速的解决方法是使函数全局化
from multiprocessing import Pool
class Test:
def __init__(self, x):
self.x = x
@staticmethod
def test(x):
return x**2
def test_apply(self, list_):
global r
def r(x):
return Test.test(x + self.x)
with Pool() as p:
l = p.map(r, list_)
return l
if __name__ == '__main__':
o = Test(2)
print(o.test_apply(range(10)))
解决方案 9:
基于@rocksportrocker 解决方案,在发送和接收结果时进行 dill 是有意义的。
import dill
import itertools
def run_dill_encoded(payload):
fun, args = dill.loads(payload)
res = fun(*args)
res = dill.dumps(res)
return res
def dill_map_async(pool, fun, args_list,
as_tuple=True,
**kw):
if as_tuple:
args_list = ((x,) for x in args_list)
it = itertools.izip(
itertools.cycle([fun]),
args_list)
it = itertools.imap(dill.dumps, it)
return pool.map_async(run_dill_encoded, it, **kw)
if __name__ == '__main__':
import multiprocessing as mp
import sys,os
p = mp.Pool(4)
res = dill_map_async(p, lambda x:[sys.stdout.write('%s
'%os.getpid()),x][-1],
[lambda x:x+1]*10,)
res = res.get(timeout=100)
res = map(dill.loads,res)
print(res)
解决方案 10:
这就是我目前使用的multiprocessing.Pool
apply
方法apply_async
:
import multiprocessing
from tqdm import tqdm
import time
def worker(symbol):
print(symbol)
return symbol
if __name__ == '__main__':
symbols = ['AAPL', 'GOOG', 'MSFT', 'AMZN', 'FB'] # List of symbols to process
symbol_list_length = len(symbols)
with tqdm(total=symbol_list_length, desc="Obtaining data...") as progress_bar:
def update_progress_callback(result):
progress_bar.update(1)
with multiprocessing.Pool(processes=4) as pool:
for symbol in symbols:
pool.apply_async(worker, args=(symbol,), callback=update_progress_callback)
pool.close()
pool.join()
返回池处理的update_progress_callback
结果,return
我猜这也是一个奖励。
解决方案 11:
正如 @penky Suresh 在这个答案中所建议的,不要使用内置关键字。
显然args
是处理多处理时的一个内置关键字
class TTS:
def __init__(self):
pass
def process_and_render_items(self):
multiprocessing_args = [{"a": "b", "c": "d"}, {"e": "f", "g": "h"}]
with ProcessPoolExecutor(max_workers=10) as executor:
# Using args here is fine.
future_processes = {
executor.submit(TTS.process_and_render_item, args)
for args in multiprocessing_args
}
for future in as_completed(future_processes):
try:
data = future.result()
except Exception as exc:
print(f"Generated an exception: {exc}")
else:
print(f"Generated data for comment process: {future}")
# Dont use 'args' here. It seems to be a built-in keyword.
# Changing 'args' to 'arg' worked for me.
def process_and_render_item(arg):
print(arg)
# This will print {"a": "b", "c": "d"} for the first process
# and {"e": "f", "g": "h"} for the second process.
PS:制表符/空格可能有点偏离。