多处理中的共享内存对象

2025-01-07 08:44:00
admin
原创
108
摘要:问题描述:假设我有一个很大的内存 numpy 数组,我有一个函数func将这个巨型数组作为输入(连同一些其他参数)。func具有不同参数的函数可以并行运行。例如:def func(arr, param): # do stuff to arr, param # build array arr poo...

问题描述:

假设我有一个很大的内存 numpy 数组,我有一个函数func将这个巨型数组作为输入(连同一些其他参数)。func具有不同参数的函数可以并行运行。例如:

def func(arr, param):
    # do stuff to arr, param

# build array arr

pool = Pool(processes = 6)
results = [pool.apply_async(func, [arr, param]) for param in all_params]
output = [res.get() for res in results]

如果我使用多处理库,那么该巨型数组将被多次复制到不同的进程中。

有没有办法让不同的进程共享同一个数组?这个数组对象是只读的,永远不会被修改。

更复杂的是,如果 arr 不是一个数组,而是一个任意的 python 对象,有没有办法共享它?

[已编辑]

我读了答案,但还是有点困惑。由于 fork() 是写时复制的,因此在 python 多处理库中生成新进程时,我们不应该调用任何额外成本。但以下代码表明存在巨大的开销:

from multiprocessing import Pool, Manager
import numpy as np; 
import time

def f(arr):
    return len(arr)

t = time.time()
arr = np.arange(10000000)
print "construct array = ", time.time() - t;


pool = Pool(processes = 6)

t = time.time()
res = pool.apply_async(f, [arr,])
res.get()
print "multiprocessing overhead = ", time.time() - t;

输出(顺便说一下,成本随着数组的大小增加而增加,所以我怀疑仍然存在与内存复制相关的开销):

construct array =  0.0178790092468
multiprocessing overhead =  0.252444982529

如果我们不复制数组,为什么会有如此巨大的开销?共享内存又能为我节省多少呢?


解决方案 1:

如果您使用的操作系统采用写时复制fork()语义(如任何常见的 unix),那么只要您不改变数据结构,它就会可供所有子进程使用,而不会占用额外内存。您无需做任何特殊的事情(除了绝对确保您不改变对象)。

解决此问题最有效的方法将数组打包成一个高效的数组结构(使用numpyarray),将其放在共享内存中,用 包装它multiprocessing.Array,然后将其传递给函数。此答案显示了如何做到这一点。

如果您想要一个可写的共享对象,那么您将需要用某种同步或锁定来包装它。multiprocessing提供了两种方法:一种使用共享内存(适用于简单值,数组或ctypes)或Manager代理,其中一个进程保存内存并且管理器仲裁其他进程对它的访问(甚至通过网络)。

Manager方法可以用于任意 Python 对象,但由于对象需要序列化/反序列化并在进程之间发送,因此速度会比使用共享内存的方法慢。

Python 中有大量的并行处理库和方法。multiprocessing是一个优秀且全面的库,但如果您有特殊需求,也许其他方法可能会更好。

解决方案 2:

这是Ray的预期用例,Ray 是一个用于并行和分布式 Python 的库。在底层,它使用Apache Arrow数据布局(零拷贝格式)序列化对象,并将它们存储在共享内存对象存储中,以便多个进程可以访问它们而无需创建副本。

代码如下所示。

import numpy as np
import ray

ray.init()

@ray.remote
def func(array, param):
    # Do stuff.
    return 1

array = np.ones(10**6)
# Store the array in the shared memory object store once
# so it is not copied multiple times.
array_id = ray.put(array)

result_ids = [func.remote(array_id, i) for i in range(4)]
output = ray.get(result_ids)

如果您不调用ray.put,那么数组仍将存储在共享内存中,但每次调用时只会执行一次func,这不是您想要的。

请注意,这不仅适用于数组,也适用于包含数组的对象,例如,将整数映射到数组的字典,如下所示。

您可以通过在 IPython 中运行以下命令来比较 Ray 和 pickle 中的序列化性能。

import numpy as np
import pickle
import ray

ray.init()

x = {i: np.ones(10**7) for i in range(20)}

# Time Ray.
%time x_id = ray.put(x)  # 2.4s
%time new_x = ray.get(x_id)  # 0.00073s

# Time pickle.
%time serialized = pickle.dumps(x)  # 2.6s
%time deserialized = pickle.loads(serialized)  # 1.9s

使用 Ray 进行序列化仅比 pickle 稍快,但由于使用共享内存,反序列化速度提高了 1000 倍(这个数字当然取决于对象)。

请参阅Ray 文档。您可以阅读有关使用 Ray 和 Arrow 进行快速序列化的更多信息。注意我是 Ray 开发人员之一。

解决方案 3:

我遇到了同样的问题,并编写了一个小共享内存实用程序类来解决它。

我正在使用multiprocessing.RawArray(lockfree),并且对数组的访问根本不同步(lockfree),小心不要搬起石头砸自己的脚。

采用该解决方案,我在四核 i7 上获得了约 3 倍的速度提升。

这是代码:请随意使用和改进它,并请报告任何错误。

'''
Created on 14.05.2013

@author: martin
'''

import multiprocessing
import ctypes
import numpy as np

class SharedNumpyMemManagerError(Exception):
    pass

'''
Singleton Pattern
'''
class SharedNumpyMemManager:    

    _initSize = 1024

    _instance = None

    def __new__(cls, *args, **kwargs):
        if not cls._instance:
            cls._instance = super(SharedNumpyMemManager, cls).__new__(
                                cls, *args, **kwargs)
        return cls._instance        

    def __init__(self):
        self.lock = multiprocessing.Lock()
        self.cur = 0
        self.cnt = 0
        self.shared_arrays = [None] * SharedNumpyMemManager._initSize

    def __createArray(self, dimensions, ctype=ctypes.c_double):

        self.lock.acquire()

        # double size if necessary
        if (self.cnt >= len(self.shared_arrays)):
            self.shared_arrays = self.shared_arrays + [None] * len(self.shared_arrays)

        # next handle
        self.__getNextFreeHdl()        

        # create array in shared memory segment
        shared_array_base = multiprocessing.RawArray(ctype, np.prod(dimensions))

        # convert to numpy array vie ctypeslib
        self.shared_arrays[self.cur] = np.ctypeslib.as_array(shared_array_base)

        # do a reshape for correct dimensions            
        # Returns a masked array containing the same data, but with a new shape.
        # The result is a view on the original array
        self.shared_arrays[self.cur] = self.shared_arrays[self.cnt].reshape(dimensions)

        # update cnt
        self.cnt += 1

        self.lock.release()

        # return handle to the shared memory numpy array
        return self.cur

    def __getNextFreeHdl(self):
        orgCur = self.cur
        while self.shared_arrays[self.cur] is not None:
            self.cur = (self.cur + 1) % len(self.shared_arrays)
            if orgCur == self.cur:
                raise SharedNumpyMemManagerError('Max Number of Shared Numpy Arrays Exceeded!')

    def __freeArray(self, hdl):
        self.lock.acquire()
        # set reference to None
        if self.shared_arrays[hdl] is not None: # consider multiple calls to free
            self.shared_arrays[hdl] = None
            self.cnt -= 1
        self.lock.release()

    def __getArray(self, i):
        return self.shared_arrays[i]

    @staticmethod
    def getInstance():
        if not SharedNumpyMemManager._instance:
            SharedNumpyMemManager._instance = SharedNumpyMemManager()
        return SharedNumpyMemManager._instance

    @staticmethod
    def createArray(*args, **kwargs):
        return SharedNumpyMemManager.getInstance().__createArray(*args, **kwargs)

    @staticmethod
    def getArray(*args, **kwargs):
        return SharedNumpyMemManager.getInstance().__getArray(*args, **kwargs)

    @staticmethod    
    def freeArray(*args, **kwargs):
        return SharedNumpyMemManager.getInstance().__freeArray(*args, **kwargs)

# Init Singleton on module load
SharedNumpyMemManager.getInstance()

if __name__ == '__main__':

    import timeit

    N_PROC = 8
    INNER_LOOP = 10000
    N = 1000

    def propagate(t):
        i, shm_hdl, evidence = t
        a = SharedNumpyMemManager.getArray(shm_hdl)
        for j in range(INNER_LOOP):
            a[i] = i

    class Parallel_Dummy_PF:

        def __init__(self, N):
            self.N = N
            self.arrayHdl = SharedNumpyMemManager.createArray(self.N, ctype=ctypes.c_double)            
            self.pool = multiprocessing.Pool(processes=N_PROC)

        def update_par(self, evidence):
            self.pool.map(propagate, zip(range(self.N), [self.arrayHdl] * self.N, [evidence] * self.N))

        def update_seq(self, evidence):
            for i in range(self.N):
                propagate((i, self.arrayHdl, evidence))

        def getArray(self):
            return SharedNumpyMemManager.getArray(self.arrayHdl)

    def parallelExec():
        pf = Parallel_Dummy_PF(N)
        print(pf.getArray())
        pf.update_par(5)
        print(pf.getArray())

    def sequentialExec():
        pf = Parallel_Dummy_PF(N)
        print(pf.getArray())
        pf.update_seq(5)
        print(pf.getArray())

    t1 = timeit.Timer("sequentialExec()", "from __main__ import sequentialExec")
    t2 = timeit.Timer("parallelExec()", "from __main__ import parallelExec")

    print("Sequential: ", t1.timeit(number=1))    
    print("Parallel: ", t2.timeit(number=1))

解决方案 4:

正如 Robert Nishihara 所提到的,Apache Arrow 使这变得简单,特别是通过 Plasma 内存对象存储,Ray 就是基于此构建的。

我专门为此制作了brain-plasmapickle - 快速加载和重新加载 Flask 应用中的大对象。它是 Apache Arrow 可序列化对象的共享内存对象命名空间,包括由 生成的 'd 字节串pickle.dumps(...)

Apache Ray 和 Plasma 的主要区别在于它会为您跟踪对象 ID。任何在本地运行的进程、线程或程序都可以通过从任何Brain对象调用名称来共享变量的值。

$ pip install brain-plasma
$ plasma_store -m 10000000 -s /tmp/plasma

from brain_plasma import Brain
brain = Brain(path='/tmp/plasma/')

brain['a'] = [1]*10000

brain['a']
# >>> [1,1,1,1,...]
相关推荐
  政府信创国产化的10大政策解读一、信创国产化的背景与意义信创国产化,即信息技术应用创新国产化,是当前中国信息技术领域的一个重要发展方向。其核心在于通过自主研发和创新,实现信息技术应用的自主可控,减少对外部技术的依赖,并规避潜在的技术制裁和风险。随着全球信息技术竞争的加剧,以及某些国家对中国在科技领域的打压,信创国产化显...
工程项目管理   1565  
  为什么项目管理通常仍然耗时且低效?您是否还在反复更新电子表格、淹没在便利贴中并参加每周更新会议?这确实是耗费时间和精力。借助软件工具的帮助,您可以一目了然地全面了解您的项目。如今,国内外有足够多优秀的项目管理软件可以帮助您掌控每个项目。什么是项目管理软件?项目管理软件是广泛行业用于项目规划、资源分配和调度的软件。它使项...
项目管理软件   1354  
  信创国产芯片作为信息技术创新的核心领域,对于推动国家自主可控生态建设具有至关重要的意义。在全球科技竞争日益激烈的背景下,实现信息技术的自主可控,摆脱对国外技术的依赖,已成为保障国家信息安全和产业可持续发展的关键。国产芯片作为信创产业的基石,其发展水平直接影响着整个信创生态的构建与完善。通过不断提升国产芯片的技术实力、产...
国产信创系统   21  
  信创生态建设旨在实现信息技术领域的自主创新和安全可控,涵盖了从硬件到软件的全产业链。随着数字化转型的加速,信创生态建设的重要性日益凸显,它不仅关乎国家的信息安全,更是推动产业升级和经济高质量发展的关键力量。然而,在推进信创生态建设的过程中,面临着诸多复杂且严峻的挑战,需要深入剖析并寻找切实可行的解决方案。技术创新难题技...
信创操作系统   27  
  信创产业作为国家信息技术创新发展的重要领域,对于保障国家信息安全、推动产业升级具有关键意义。而国产芯片作为信创产业的核心基石,其研发进展备受关注。在信创国产芯片的研发征程中,面临着诸多复杂且艰巨的难点,这些难点犹如一道道关卡,阻碍着国产芯片的快速发展。然而,科研人员和相关企业并未退缩,积极探索并提出了一系列切实可行的解...
国产化替代产品目录   28  
热门文章
项目管理软件有哪些?
云禅道AD
禅道项目管理软件

云端的项目管理软件

尊享禅道项目软件收费版功能

无需维护,随时随地协同办公

内置subversion和git源码管理

每天备份,随时转为私有部署

免费试用