Python 进程池是非守护进程吗?

2025-02-13 08:35:00
admin
原创
57
摘要:问题描述:是否可以创建一个非守护进程的 Python 池?我希望池能够调用内部有另一个池的函数。我想要这个是因为守护进程无法创建进程。具体来说,它会导致错误:AssertionError: daemonic processes are not allowed to have children 例如,考虑以下场...

问题描述:

是否可以创建一个非守护进程的 Python 池?我希望池能够调用内部有另一个池的函数。

我想要这个是因为守护进程无法创建进程。具体来说,它会导致错误:

AssertionError: daemonic processes are not allowed to have children

例如,考虑以下场景:function_a有一个运行的池function_b,该池有一个运行的池function_c。此函数链将失败,因为function_b正在守护进程中运行,而守护进程无法创建进程。


解决方案 1:

该类multiprocessing.pool.Pool在其方法中创建工作进程,使它们成为守护进程并启动它们,并且在启动它们之前__init__无法重新设置它们的daemon属性(之后也不允许这样做)。但您可以创建自己的子类(只是一个包装函数)并替换您自己的子类(它始终是非守护进程),以用于工作进程。False`multiprocesing.pool.Poolmultiprocessing.Poolmultiprocessing.Process`

以下是如何执行此操作的完整示例。重要的部分是顶部的两个类NoDaemonProcess和以及末尾在实例上调用的和。MyPool`pool.close()pool.join()MyPool`

#!/usr/bin/env python
# -*- coding: UTF-8 -*-

import multiprocessing
# We must import this explicitly, it is not imported by the top-level
# multiprocessing module.
import multiprocessing.pool
import time

from random import randint


class NoDaemonProcess(multiprocessing.Process):
    # make 'daemon' attribute always return False
    def _get_daemon(self):
        return False
    def _set_daemon(self, value):
        pass
    daemon = property(_get_daemon, _set_daemon)

# We sub-class multiprocessing.pool.Pool instead of multiprocessing.Pool
# because the latter is only a wrapper function, not a proper class.
class MyPool(multiprocessing.pool.Pool):
    Process = NoDaemonProcess

def sleepawhile(t):
    print("Sleeping %i seconds..." % t)
    time.sleep(t)
    return t

def work(num_procs):
    print("Creating %i (daemon) workers and jobs in child." % num_procs)
    pool = multiprocessing.Pool(num_procs)

    result = pool.map(sleepawhile,
        [randint(1, 5) for x in range(num_procs)])

    # The following is not really needed, since the (daemon) workers of the
    # child's pool are killed when the child is terminated, but it's good
    # practice to cleanup after ourselves anyway.
    pool.close()
    pool.join()
    return result

def test():
    print("Creating 5 (non-daemon) workers and jobs in main process.")
    pool = MyPool(5)

    result = pool.map(work, [randint(1, 5) for x in range(5)])

    pool.close()
    pool.join()
    print(result)

if __name__ == '__main__':
    test()

解决方案 2:

我需要在 Python 3.7 中使用非守护进程池,最终调整了已接受答案中发布的代码。下面是创建非守护进程池的代码片段:

import multiprocessing.pool

class NoDaemonProcess(multiprocessing.Process):
    @property
    def daemon(self):
        return False

    @daemon.setter
    def daemon(self, value):
        pass


class NoDaemonContext(type(multiprocessing.get_context())):
    Process = NoDaemonProcess

# We sub-class multiprocessing.pool.Pool instead of multiprocessing.Pool
# because the latter is only a wrapper function, not a proper class.
class NestablePool(multiprocessing.pool.Pool):
    def __init__(self, *args, **kwargs):
        kwargs['context'] = NoDaemonContext()
        super(NestablePool, self).__init__(*args, **kwargs)

由于的当前实现multiprocessing已被广泛重构为基于上下文,我们需要提供一个NoDaemonContext具有NoDaemonProcessas 属性的类。NestablePool然后将使用该上下文而不是默认上下文。

尽管如此,我应该警告,这种方法至少有两个注意事项:

  1. 它仍然依赖于multiprocessing包的实现细节,因此随时可能出现故障。

  2. multiprocessing造成非守护进程难以使用的原因有很多,其中许多原因都在此处进行了解释。我认为最有说服力的是:

至于允许子线程使用子进程产生自己的子线程,如果父线程或子线程在子进程完成并返回之前终止,则有创建一小队僵尸“孙子”的风险。

解决方案 3:

从 Python 3.8 开始,concurrent.futures.ProcessPoolExecutor不再有此限制。它可以毫无问题地拥有嵌套进程池:

from concurrent.futures import ProcessPoolExecutor as Pool
from itertools import repeat
from multiprocessing import current_process
import time

def pid():
    return current_process().pid

def _square(i):  # Runs in inner_pool
    square = i ** 2
    time.sleep(i / 10)
    print(f'{pid()=} {i=} {square=}')
    return square

def _sum_squares(i, j):  # Runs in outer_pool
    with Pool(max_workers=2) as inner_pool:
        squares = inner_pool.map(_square, (i, j))
    sum_squares = sum(squares)
    time.sleep(sum_squares ** .5)
    print(f'{pid()=}, {i=}, {j=} {sum_squares=}')
    return sum_squares

def main():
    with Pool(max_workers=3) as outer_pool:
        for sum_squares in outer_pool.map(_sum_squares, range(5), repeat(3)):
            print(f'{pid()=} {sum_squares=}')

if __name__ == "__main__":
    main()

以上演示代码已使用 Python 3.8 进行测试。

然而,的局限性ProcessPoolExecutor在于它没有maxtasksperchild。如果您需要这个,请考虑Massimiliano 的答案。

来源:jfs 的回答

解决方案 4:

多处理模块具有良好的接口,可以将池与进程线程一起使用。根据您当前的用例,您可以考虑将其用于multiprocessing.pool.ThreadPool外部池,这将产生线程(允许从内部生成进程)而不是进程。

它可能受到 GIL 的限制,但在我的特定情况下(我对两者进行了测试) ,这里Pool创建的外部进程的启动时间远远超过了解决方案。ThreadPool


Processes交换起来真的很容易。在此处或此处Threads了解有关如何使用ThreadPool解决方案的更多信息。

解决方案 5:

在某些 Python 版本中,将标准池替换为自定义池可能会引发错误:AssertionError: group argument must be None for now

在这里我找到了一个可以提供帮助的解决方案:

class NoDaemonProcess(multiprocessing.Process):
    # make 'daemon' attribute always return False
    @property
    def daemon(self):
        return False

    @daemon.setter
    def daemon(self, val):
        pass


class NoDaemonProcessPool(multiprocessing.pool.Pool):

    def Process(self, *args, **kwds):
        proc = super(NoDaemonProcessPool, self).Process(*args, **kwds)
        proc.__class__ = NoDaemonProcess

        return proc

解决方案 6:

我见过有人使用celery的分支multiprocessingbilliard (多处理池扩展)来处理这个问题,它允许守护进程生成子进程。解决方法是简单地将模块替换multiprocessing为:

import billiard as multiprocessing

解决方案 7:

我遇到的问题是尝试在模块之间导入全局变量,导致 ProcessPool() 行被多次评估。

全局变量.py

from processing             import Manager, Lock
from pathos.multiprocessing import ProcessPool
from pathos.threading       import ThreadPool

class SingletonMeta(type):
    def __new__(cls, name, bases, dict):
        dict['__deepcopy__'] = dict['__copy__'] = lambda self, *args: self
        return super(SingletonMeta, cls).__new__(cls, name, bases, dict)

    def __init__(cls, name, bases, dict):
        super(SingletonMeta, cls).__init__(name, bases, dict)
        cls.instance = None

    def __call__(cls,*args,**kw):
        if cls.instance is None:
            cls.instance = super(SingletonMeta, cls).__call__(*args, **kw)
        return cls.instance

    def __deepcopy__(self, item):
        return item.__class__.instance

class Globals(object):
    __metaclass__ = SingletonMeta
    """     
    This class is a workaround to the bug: AssertionError: daemonic processes are not allowed to have children
     
    The root cause is that importing this file from different modules causes this file to be reevalutated each time, 
    thus ProcessPool() gets reexecuted inside that child thread, thus causing the daemonic processes bug    
    """
    def __init__(self):
        print "%s::__init__()" % (self.__class__.__name__)
        self.shared_manager      = Manager()
        self.shared_process_pool = ProcessPool()
        self.shared_thread_pool  = ThreadPool()
        self.shared_lock         = Lock()        # BUG: Windows: global name 'lock' is not defined | doesn't affect cygwin

然后从代码中的其他地方安全导入

from globals import Globals
Globals().shared_manager      
Globals().shared_process_pool
Globals().shared_thread_pool  
Globals().shared_lock         

我在这里编写了一个更加扩展的包装类pathos.multiprocessing

附注:如果您的用例只需要异步多进程映射作为性能优化,那么 joblib 将在后台管理您的所有进程池并允许使用这个非常简单的语法:

squares = Parallel(-1)( delayed(lambda num: num**2)(x) for x in range(100) )

解决方案 8:

即使您已经处于守护进程中,也可以通过以下方法启动池。这已在 Python 3.8.5 中进行了测试

首先定义Undaemonize上下文管理器,暂时删除当前进程的守护进程状态。

class Undaemonize(object):
    '''Context Manager to resolve AssertionError: daemonic processes are not allowed to have children
    
    Tested in python 3.8.5'''
    def __init__(self):
        self.p = multiprocessing.process.current_process()
        if 'daemon' in self.p._config:
            self.daemon_status_set = True
        else:
            self.daemon_status_set = False
        self.daemon_status_value = self.p._config.get('daemon')
    def __enter__(self):
        if self.daemon_status_set:
            del self.p._config['daemon']
    def __exit__(self, type, value, traceback):
        if self.daemon_status_set:
            self.p._config['daemon'] = self.daemon_status_value

现在您可以按如下方式启动池,甚至可以从守护进程内部启动:

with Undaemonize():
    pool = multiprocessing.Pool(1)
pool.map(... # you can do something with the pool outside of the context manager 

虽然这里的其他方法旨在首先创建非守护进程的池,但即使您已经处于守护进程中,这种方法也允许您启动池。

解决方案 9:

当错误看似是误报时,这提供了一种解决方法。正如James 所指出的,这种情况可能发生在从守护进程无意导入时。

比如如果你有下面这样的简单代码,WORKER_POOL可能会无意中从一个 worker 中导入,从而导致错误。

import multiprocessing

WORKER_POOL = multiprocessing.Pool()

一个简单但可靠的解决方法是:

import multiprocessing
import multiprocessing.pool


class MyClass:

    @property
    def worker_pool(self) -> multiprocessing.pool.Pool:
        # Ref: https://stackoverflow.com/a/63984747/
        try:
            return self._worker_pool  # type: ignore
        except AttributeError:
            # pylint: disable=protected-access
            self.__class__._worker_pool = multiprocessing.Pool()  # type: ignore
            return self.__class__._worker_pool  # type: ignore
            # pylint: enable=protected-access

在上述解决方法中,MyClass.worker_pool可以使用而不会出现错误。如果您认为此方法可以改进,请告诉我。

解决方案 10:

从 Python 3.7 版本开始,我们可以创建非守护进程 ProcessPoolExecutor

使用多处理时必须使用if __name__ == "__main__":

from concurrent.futures import ProcessPoolExecutor as Pool

num_pool = 10
    
def main_pool(num):
    print(num)
    strings_write = (f'{num}-{i}' for i in range(num))
    with Pool(num) as subp:
        subp.map(sub_pool,strings_write)
    return None


def sub_pool(x):
    print(f'{x}')
    return None


if __name__ == "__main__":
    with Pool(num_pool) as p:
        p.map(main_pool,list(range(1,num_pool+1)))

解决方案 11:

对我来说,这个答案很有帮助:https://stackoverflow.com/a/71929459/14715428

虽然问题本身有点不同,但速度保持不变,我不需要重写任何内容

相关推荐
  政府信创国产化的10大政策解读一、信创国产化的背景与意义信创国产化,即信息技术应用创新国产化,是当前中国信息技术领域的一个重要发展方向。其核心在于通过自主研发和创新,实现信息技术应用的自主可控,减少对外部技术的依赖,并规避潜在的技术制裁和风险。随着全球信息技术竞争的加剧,以及某些国家对中国在科技领域的打压,信创国产化显...
工程项目管理   1590  
  为什么项目管理通常仍然耗时且低效?您是否还在反复更新电子表格、淹没在便利贴中并参加每周更新会议?这确实是耗费时间和精力。借助软件工具的帮助,您可以一目了然地全面了解您的项目。如今,国内外有足够多优秀的项目管理软件可以帮助您掌控每个项目。什么是项目管理软件?项目管理软件是广泛行业用于项目规划、资源分配和调度的软件。它使项...
项目管理软件   1361  
  信创产品在政府采购中的占比分析随着信息技术的飞速发展以及国家对信息安全重视程度的不断提高,信创产业应运而生并迅速崛起。信创,即信息技术应用创新,旨在实现信息技术领域的自主可控,减少对国外技术的依赖,保障国家信息安全。政府采购作为推动信创产业发展的重要力量,其对信创产品的采购占比情况备受关注。这不仅关系到信创产业的发展前...
信创和国产化的区别   18  
  信创,即信息技术应用创新产业,旨在实现信息技术领域的自主可控,摆脱对国外技术的依赖。近年来,国货国用信创发展势头迅猛,在诸多领域取得了显著成果。这一发展趋势对科技创新产生了深远的推动作用,不仅提升了我国在信息技术领域的自主创新能力,还为经济社会的数字化转型提供了坚实支撑。信创推动核心技术突破信创产业的发展促使企业和科研...
信创工作   18  
  信创技术,即信息技术应用创新产业,旨在实现信息技术领域的自主可控与安全可靠。近年来,信创技术发展迅猛,对中小企业产生了深远的影响,带来了诸多不可忽视的价值。在数字化转型的浪潮中,中小企业面临着激烈的市场竞争和复杂多变的环境,信创技术的出现为它们提供了新的发展机遇和支撑。信创技术对中小企业的影响技术架构变革信创技术促使中...
信创国产化   19  
热门文章
项目管理软件有哪些?
云禅道AD
禅道项目管理软件

云端的项目管理软件

尊享禅道项目软件收费版功能

无需维护,随时随地协同办公

内置subversion和git源码管理

每天备份,随时转为私有部署

免费试用