Python 多处理 PicklingError:无法 pickle <type'function'>

2024-12-03 08:45:00
admin
原创
182
摘要:问题描述:很抱歉,我无法用更简单的示例重现该错误,而且我的代码太复杂,无法发布。如果我在 IPython shell 而不是常规 Python 中运行该程序,则一切顺利。我查了一下之前关于这个问题的一些笔记,都是因为用pool调用类函数中定义的函数导致的,但我的情况不是这样。Exception in thre...

问题描述:

很抱歉,我无法用更简单的示例重现该错误,而且我的代码太复杂,无法发布。如果我在 IPython shell 而不是常规 Python 中运行该程序,则一切顺利。

我查了一下之前关于这个问题的一些笔记,都是因为用pool调用类函数中定义的函数导致的,但我的情况不是这样。

Exception in thread Thread-3:
Traceback (most recent call last):
  File "/usr/lib64/python2.7/threading.py", line 552, in __bootstrap_inner
    self.run()
  File "/usr/lib64/python2.7/threading.py", line 505, in run
    self.__target(*self.__args, **self.__kwargs)
  File "/usr/lib64/python2.7/multiprocessing/pool.py", line 313, in _handle_tasks
    put(task)
PicklingError: Can't pickle <type 'function'>: attribute lookup __builtin__.function failed

我将非常感激您的帮助。

更新:我 pickle 的函数是在模块的顶层定义的。虽然它调用了一个包含嵌套函数的函数。即,f()调用具有嵌套函数的g()调用,我正在调用。,,都是在顶层定义的。我尝试了这个模式的更简单的例子,但它确实有效。h()`i()pool.apply_async(f)f()g()h()`


解决方案 1:

以下是可以 pickle 的内容列表。具体来说,只有在模块顶层定义的函数才可以被 pickle。

这段代码:

import multiprocessing as mp

class Foo():
    @staticmethod
    def work(self):
        pass

if __name__ == '__main__':   
    pool = mp.Pool()
    foo = Foo()
    pool.apply_async(foo.work)
    pool.close()
    pool.join()

产生的错误与您发布的错误几乎相同:

Exception in thread Thread-2:
Traceback (most recent call last):
  File "/usr/lib/python2.7/threading.py", line 552, in __bootstrap_inner
    self.run()
  File "/usr/lib/python2.7/threading.py", line 505, in run
    self.__target(*self.__args, **self.__kwargs)
  File "/usr/lib/python2.7/multiprocessing/pool.py", line 315, in _handle_tasks
    put(task)
PicklingError: Can't pickle <type 'function'>: attribute lookup __builtin__.function failed

问题在于,pool所有方法都使用mp.SimpleQueue将任务传递给工作进程。通过 的所有内容都mp.SimpleQueue必须是可拾取的,但foo.work由于未在模块的顶层定义,因此不可拾取。

可以通过在顶层定义一个函数来修复此问题,该函数调用foo.work()

def work(foo):
    foo.work()

pool.apply_async(work,args=(foo,))

请注意,foo是可选择的,因为Foo它是在顶层定义的,并且 foo.__dict__是可选择的。

解决方案 2:

我会使用pathos.multiprocesssing,而不是multiprocessing。 是使用pathos.multiprocessing的 fork 。可以序列化 Python 中的几乎所有内容,因此您可以并行发送更多内容。fork 还能够直接使用多个参数函数,就像类方法所需要的那样。multiprocessing`dilldillpathos`

>>> from pathos.multiprocessing import ProcessingPool as Pool
>>> p = Pool(4)
>>> class Test(object):
...   def plus(self, x, y): 
...     return x+y
... 
>>> t = Test()
>>> p.map(t.plus, x, y)
[4, 6, 8, 10]
>>> 
>>> class Foo(object):
...   @staticmethod
...   def work(self, x):
...     return x+1
... 
>>> f = Foo()
>>> p.apipe(f.work, f, 100)
<processing.pool.ApplyResult object at 0x10504f8d0>
>>> res = _
>>> res.get()
101

获取pathos(如果你喜欢,dill)此处:https:
//github.com/uqfoundation

解决方案 3:

当这个问题出现时,multiprocessing一个简单的解决方案是从 切换PoolThreadPool。这可以在不更改代码的情况下完成,除了导入之外-

from multiprocessing.pool import ThreadPool as Pool

这是因为 ThreadPool 与主线程共享内存,而不是创建新进程 —— 这意味着不需要 pickling。

这种方法的缺点是 Python 并不是处理线程最好的语言——它使用一种称为全局解释器锁的东西来保持线程安全,这可能会减慢这里的一些用例。但是,如果您主要与其他系统交互(运行 HTTP 命令、与数据库通信、写入文件系统),那么您的代码可能不受 CPU 的约束,不会受到太大的影响。事实上,我在编写 HTTP/HTTPS 基准测试时发现,这里使用的线程模型的开销和延迟更少,因为创建新进程的开销远高于创建新线程的开销,而程序只是在等待 HTTP 响应。

因此,如果您在 python 用户空间中处理大量内容,这可能不是最好的方法。

解决方案 4:

正如其他人所说,multiprocessing只能将 Python 对象传输到可以进行 pickle 的工作进程。如果您无法按照 unutbu 所述重新组织代码,则可以使用dill扩展的 pickling/unpickling 功能来传输数据(尤其是代码数据),如下所示。

该解决方案仅需要安装dill,不需要其他库pathos

import os
from multiprocessing import Pool

import dill


def run_dill_encoded(payload):
    fun, args = dill.loads(payload)
    return fun(*args)


def apply_async(pool, fun, args):
    payload = dill.dumps((fun, args))
    return pool.apply_async(run_dill_encoded, (payload,))


if __name__ == "__main__":

    pool = Pool(processes=5)

    # asyn execution of lambda
    jobs = []
    for i in range(10):
        job = apply_async(pool, lambda a, b: (a, b, a * b), (i, i + 1))
        jobs.append(job)

    for job in jobs:
        print job.get()
    print

    # async execution of static method

    class O(object):

        @staticmethod
        def calc():
            return os.getpid()

    jobs = []
    for i in range(10):
        job = apply_async(pool, O.calc, ())
        jobs.append(job)

    for job in jobs:
        print job.get()

解决方案 5:

我发现,通过尝试使用分析器,我也可以在一段完美运行的代码上生成准确的错误输出。

请注意,这是在 Windows 上(分叉有点不太优雅)。

我正在跑步:

python -m profile -o output.pstats <script> 

我发现删除分析会消除错误,而放置分析会恢复错误。这也让我抓狂,因为我知道代码曾经可以工作。我检查是否有东西更新了pool.py...然后有一种沉重的感觉,于是删除了分析,就这样了。

在此发布以供存档,以防其他人遇到它。

解决方案 6:

Can't pickle <type 'function'>: attribute lookup __builtin__.function failed

如果传递给异步作业的模型对象内部有任何内置函数,也会出现此错误。

因此,请确保检查传递的模型对象没有内置函数。(在我们的例子中,我们使用模型内部的django-model-utilsFieldTracker()函数来跟踪某个字段)。以下是相关 GitHub 问题的链接。

解决方案 7:

此解决方案仅需要安装 dill,不需要安装其他库,因为 pathos

def apply_packed_function_for_map((dumped_function, item, args, kwargs),):
    """
    Unpack dumped function as target function and call it with arguments.

    :param (dumped_function, item, args, kwargs):
        a tuple of dumped function and its arguments
    :return:
        result of target function
    """
    target_function = dill.loads(dumped_function)
    res = target_function(item, *args, **kwargs)
    return res


def pack_function_for_map(target_function, items, *args, **kwargs):
    """
    Pack function and arguments to object that can be sent from one
    multiprocessing.Process to another. The main problem is:
        «multiprocessing.Pool.map*» or «apply*»
        cannot use class methods or closures.
    It solves this problem with «dill».
    It works with target function as argument, dumps it («with dill»)
    and returns dumped function with arguments of target function.
    For more performance we dump only target function itself
    and don't dump its arguments.
    How to use (pseudo-code):

        ~>>> import multiprocessing
        ~>>> images = [...]
        ~>>> pool = multiprocessing.Pool(100500)
        ~>>> features = pool.map(
        ~...     *pack_function_for_map(
        ~...         super(Extractor, self).extract_features,
        ~...         images,
        ~...         type='png'
        ~...         **options,
        ~...     )
        ~... )
        ~>>>

    :param target_function:
        function, that you want to execute like  target_function(item, *args, **kwargs).
    :param items:
        list of items for map
    :param args:
        positional arguments for target_function(item, *args, **kwargs)
    :param kwargs:
        named arguments for target_function(item, *args, **kwargs)
    :return: tuple(function_wrapper, dumped_items)
        It returs a tuple with
            * function wrapper, that unpack and call target function;
            * list of packed target function and its' arguments.
    """
    dumped_function = dill.dumps(target_function)
    dumped_items = [(dumped_function, item, args, kwargs) for item in items]
    return apply_packed_function_for_map, dumped_items

它也适用于 numpy 数组。

解决方案 8:

一个快速的解决方法是使函数全局化

from multiprocessing import Pool


class Test:
    def __init__(self, x):
        self.x = x
    
    @staticmethod
    def test(x):
        return x**2


    def test_apply(self, list_):
        global r
        def r(x):
            return Test.test(x + self.x)

        with Pool() as p:
            l = p.map(r, list_)

        return l



if __name__ == '__main__':
    o = Test(2)
    print(o.test_apply(range(10)))

解决方案 9:

基于@rocksportrocker 解决方案,在发送和接收结果时进行 dill 是有意义的。

import dill
import itertools
def run_dill_encoded(payload):
    fun, args = dill.loads(payload)
    res = fun(*args)
    res = dill.dumps(res)
    return res

def dill_map_async(pool, fun, args_list,
                   as_tuple=True,
                   **kw):
    if as_tuple:
        args_list = ((x,) for x in args_list)

    it = itertools.izip(
        itertools.cycle([fun]),
        args_list)
    it = itertools.imap(dill.dumps, it)
    return pool.map_async(run_dill_encoded, it, **kw)

if __name__ == '__main__':
    import multiprocessing as mp
    import sys,os
    p = mp.Pool(4)
    res = dill_map_async(p, lambda x:[sys.stdout.write('%s
'%os.getpid()),x][-1],
                  [lambda x:x+1]*10,)
    res = res.get(timeout=100)
    res = map(dill.loads,res)
    print(res)

解决方案 10:

这就是我目前使用的multiprocessing.Pool apply方法apply_async

import multiprocessing
from tqdm import tqdm
import time

def worker(symbol):
    print(symbol)
    return symbol

if __name__ == '__main__':
    symbols = ['AAPL', 'GOOG', 'MSFT', 'AMZN', 'FB']  # List of symbols to process
    symbol_list_length = len(symbols)

    with tqdm(total=symbol_list_length, desc="Obtaining data...") as progress_bar:
        def update_progress_callback(result):
            progress_bar.update(1)

        with multiprocessing.Pool(processes=4) as pool:
            for symbol in symbols:
                pool.apply_async(worker, args=(symbol,), callback=update_progress_callback)

            pool.close()
            pool.join()

返回池处理的update_progress_callback结果,return我猜这也是一个奖励。

解决方案 11:

正如 @penky Suresh 在这个答案中所建议的,不要使用内置关键字。

显然args是处理多处理时的一个内置关键字


class TTS:
    def __init__(self):
        pass

    def process_and_render_items(self):
        multiprocessing_args = [{"a": "b", "c": "d"}, {"e": "f", "g": "h"}]

        with ProcessPoolExecutor(max_workers=10) as executor:
          # Using args here is fine. 
            future_processes = {
              executor.submit(TTS.process_and_render_item, args)
                for args in multiprocessing_args
            }

            for future in as_completed(future_processes):
                try:
                    data = future.result()
                except Exception as exc:
                    print(f"Generated an exception: {exc}")
                else:
                   print(f"Generated data for comment process: {future}")
 

    # Dont use 'args' here. It seems to be a built-in keyword.
    # Changing 'args' to 'arg' worked for me.
    def process_and_render_item(arg):
        print(arg)
      # This will print {"a": "b", "c": "d"} for the first process
      # and {"e": "f", "g": "h"} for the second process.



PS:制表符/空格可能有点偏离。

相关推荐
  政府信创国产化的10大政策解读一、信创国产化的背景与意义信创国产化,即信息技术应用创新国产化,是当前中国信息技术领域的一个重要发展方向。其核心在于通过自主研发和创新,实现信息技术应用的自主可控,减少对外部技术的依赖,并规避潜在的技术制裁和风险。随着全球信息技术竞争的加剧,以及某些国家对中国在科技领域的打压,信创国产化显...
工程项目管理   1565  
  为什么项目管理通常仍然耗时且低效?您是否还在反复更新电子表格、淹没在便利贴中并参加每周更新会议?这确实是耗费时间和精力。借助软件工具的帮助,您可以一目了然地全面了解您的项目。如今,国内外有足够多优秀的项目管理软件可以帮助您掌控每个项目。什么是项目管理软件?项目管理软件是广泛行业用于项目规划、资源分配和调度的软件。它使项...
项目管理软件   1354  
  信创国产芯片作为信息技术创新的核心领域,对于推动国家自主可控生态建设具有至关重要的意义。在全球科技竞争日益激烈的背景下,实现信息技术的自主可控,摆脱对国外技术的依赖,已成为保障国家信息安全和产业可持续发展的关键。国产芯片作为信创产业的基石,其发展水平直接影响着整个信创生态的构建与完善。通过不断提升国产芯片的技术实力、产...
国产信创系统   21  
  信创生态建设旨在实现信息技术领域的自主创新和安全可控,涵盖了从硬件到软件的全产业链。随着数字化转型的加速,信创生态建设的重要性日益凸显,它不仅关乎国家的信息安全,更是推动产业升级和经济高质量发展的关键力量。然而,在推进信创生态建设的过程中,面临着诸多复杂且严峻的挑战,需要深入剖析并寻找切实可行的解决方案。技术创新难题技...
信创操作系统   27  
  信创产业作为国家信息技术创新发展的重要领域,对于保障国家信息安全、推动产业升级具有关键意义。而国产芯片作为信创产业的核心基石,其研发进展备受关注。在信创国产芯片的研发征程中,面临着诸多复杂且艰巨的难点,这些难点犹如一道道关卡,阻碍着国产芯片的快速发展。然而,科研人员和相关企业并未退缩,积极探索并提出了一系列切实可行的解...
国产化替代产品目录   28  
热门文章
项目管理软件有哪些?
云禅道AD
禅道项目管理软件

云端的项目管理软件

尊享禅道项目软件收费版功能

无需维护,随时随地协同办公

内置subversion和git源码管理

每天备份,随时转为私有部署

免费试用