多处理中的共享内存

2025-01-09 08:46:00
admin
原创
17
摘要:问题描述:我有三个大列表。第一个包含位数组(模块位数组 0.8.0),其他两个包含整数数组。l1=[bitarray 1, bitarray 2, ... ,bitarray n] l2=[array 1, array 2, ... , array n] l3=[array 1, array 2, ... ,...

问题描述:

我有三个大列表。第一个包含位数组(模块位数组 0.8.0),其他两个包含整数数组。

l1=[bitarray 1, bitarray 2, ... ,bitarray n]
l2=[array 1, array 2, ... , array n]
l3=[array 1, array 2, ... , array n]

这些数据结构占用了相当多的 RAM(总共约 16GB)。

如果我使用以下命令启动 12 个子进程:

multiprocessing.Process(target=someFunction, args=(l1,l2,l3))

这是否意味着每个子进程都会复制 l1、l2 和 l3,还是子进程会共享这些列表?或者更直接地说,我会使用 16GB 还是 192GB 的 RAM?

someFunction 会从这些列表中读取一些值,然后根据读取的值执行一些计算。结果将返回给父进程。列表 l1、l2 和 l3 不会被 someFunction 修改。

因此,我假设子进程不需要也不会复制这些巨大的列表,而只是与父进程共享它们。这意味着由于 Linux 下的写时复制方法,该程序将占用 16GB 的 RAM(无论我启动了多少个子进程)?我是否正确,或者我是否遗漏了导致列表被复制的某些内容?

编辑:在阅读了更多关于该主题的内容后,我仍然感到困惑。一方面,Linux 使用写时复制,这意味着不会复制任何数据。另一方面,访问对象将改变其引用计数(我仍然不确定为什么以及这意味着什么)。即便如此,整个对象都会被复制吗?

例如如果我定义 someFunction 如下:

def someFunction(list1, list2, list3):
    i=random.randint(0,99999)
    print list1[i], list2[i], list3[i]

使用此功能是否意味着 l1、l2 和 l3 将为每个子流程完全复制?

有没有什么办法可以检查这个?

EDIT2在阅读更多内容并监控子进程运行时系统的总内存使用情况后,似乎确实为每个子进程复制了整个对象。这似乎是因为引用计数。

在我的程序中,l1、l2 和 l3 的引用计数实际上是不必要的。这是因为 l1、l2 和 l3 将保留在内存中(不变),直到父进程退出。在此之前,无需释放这些列表使用的内存。事实上,我确信引用计数将保持在 0 以上(对于这些列表以及这些列表中的每个对象),直到程序退出。

那么现在的问题是,我如何确保对象不会被复制到每个子进程?我是否可以禁用这些列表以及这些列表中的每个对象的引用计数?

EDIT3只是补充说明。子流程不需要修改l1l2l3或这些列表中的任何对象。子流程只需要能够引用其中一些对象,而不会导致每个子流程都复制内存。


解决方案 1:

因为这在谷歌上仍然是一个非常高的结果,并且没有其他人提到它,所以我想我会提到在 python 版本 3.8.0 中引入的“真正”共享内存的新可能性:https ://docs.python.org/3/library/multiprocessing.shared_memory.html

我在这里提供了一个使用 numpy 数组的小型示例(在 Linux 上测试),这可能是一个非常常见的用例:

# one dimension of the 2d array which is shared
dim = 5000

import numpy as np
from multiprocessing import shared_memory, Process, Lock
from multiprocessing import cpu_count, current_process
import time

lock = Lock()

def add_one(shr_name):

    existing_shm = shared_memory.SharedMemory(name=shr_name)
    np_array = np.ndarray((dim, dim,), dtype=np.int64, buffer=existing_shm.buf)
    lock.acquire()
    np_array[:] = np_array[0] + 1
    lock.release()
    time.sleep(10) # pause, to see the memory usage in top
    print('added one')
    existing_shm.close()

def create_shared_block():

    a = np.ones(shape=(dim, dim), dtype=np.int64)  # Start with an existing NumPy array

    shm = shared_memory.SharedMemory(create=True, size=a.nbytes)
    # # Now create a NumPy array backed by shared memory
    np_array = np.ndarray(a.shape, dtype=np.int64, buffer=shm.buf)
    np_array[:] = a[:]  # Copy the original data into shared memory
    return shm, np_array

if current_process().name == "MainProcess":
    print("creating shared block")
    shr, np_array = create_shared_block()

    processes = []
    for i in range(cpu_count()):
        _process = Process(target=add_one, args=(shr.name,))
        processes.append(_process)
        _process.start()

    for _process in processes:
        _process.join()

    print("Final array")
    print(np_array[:10])
    print(np_array[10:])

    shr.close()
    shr.unlink()

请注意,由于 64 位整数,此代码可能需要大约 1GB 的 RAM 才能运行,因此请确保使用它不会冻结系统。^_^

解决方案 2:

一般来说,共享同一份数据的方式有两种:

  • 多线程

  • 共享内存

Python 的多线程不适合 CPU 密集型任务(因为有 GIL),因此在这种情况下通常的解决方案是继续。但是,使用此解决方案,您需要使用和multiprocessing明确共享数据。multiprocessing.Value`multiprocessing.Array`

请注意,由于所有同步问题,通常在进程之间共享数据可能不是最佳选择;涉及参与者交换消息的方法通常被视为更好的选择。另请参阅Python 文档:

如上所述,进行并发编程时,最好尽可能避免使用共享状态。使用多个进程时尤其如此。

但是,如果您确实需要使用一些共享数据,那么多处理提供了几种方法。

就您而言,您需要包装l1l2l3以某种可以理解的方式multiprocessing(例如使用multiprocessing.Array),然后将它们作为参数传递。

还要注意,正如您所说,您不需要写访问权限,那么您应该lock=False在创建对象时传递,否则所有访问仍将被序列化。

解决方案 3:

对于那些有兴趣使用 Python3.8 的shared_memory模块的人来说,它仍然有一个错误(github 问题链接在这里),尚未修复,并且现在(2021-01-15)正在影响 Python3.8/3.9/3.10。该错误影响 posix 系统,并且是关于资源跟踪器在其他进程仍应具有有效访问权限时破坏共享内存段。因此,如果您在代码中使用它,请小心。

解决方案 4:

如果您想使用写时复制功能,并且您的数据是静态的(在子进程中不变),那么您应该让 python 不要弄乱数据所在的内存块。您可以使用 C 或 C++ 结构(例如 stl)作为容器轻松做到这一点,并提供您自己的 python 包装器,当创建 python 级对象(如果有)时,它将使用指向数据内存的指针(或可能复制数据内存)。所有这些都可以通过 cython 的几乎 python 简单性和语法非常轻松地完成。

# 伪 cython
cdef 类 FooContainer:
   cdef 字符 * 数据
   def __cinit__(self,char * foo_value):
       self.data = malloc(1024,sizeof(char))
       memcpy(self.data,foo_value,min(1024,len(foo_value)))
   
   定义获取(自身):
       返回自身数据

# python 部分
从 foo 导入 FooContainer

f = FooContainer(“你好,世界”)
进程标识 = fork()
如果不是 pid:
   f.get() #此调用将读取相同的内存页面
           # 父进程写入了 1024 个字符的 self.data
           # cython 将自动创建一个新的 python 字符串
           # 从中取出对象并返回给调用者

上面的伪代码写得很糟糕。不要使用它。在你的情况下,应该用 C 或 C++ 容器代替 self.data。

解决方案 5:

您可以使用 memcached 或 redis 并将每个设置为键值对 {'l1'...

解决方案 6:

我根据 @rboreal-frippery 的回答编写了自己的代码。它提供了如何使用共享内存Process以及如何使用共享内存Pool(pool.map) 的示例。

import multiprocessing

# one dimension of the 2d array which is shared
DIM = 5000

import numpy as np
from multiprocessing import shared_memory, Process, Lock
from multiprocessing import cpu_count, current_process
import time


def add_one_v1(shr_name, lock):
    existing_shm = shared_memory.SharedMemory(name=shr_name)
    np_array = np.ndarray((DIM, DIM,), dtype=np.int64, buffer=existing_shm.buf)
    print('before one', np_array.sum())
    lock.acquire()
    np_array[:] = np_array[:] + 1
    lock.release()
    time.sleep(1)
    print('after one', np_array.sum())
    existing_shm.close()


def add_one_v2(shr_name):
    existing_shm = shared_memory.SharedMemory(name=shr_name)
    np_array = np.ndarray((DIM, DIM,), dtype=np.int64, buffer=existing_shm.buf)
    print('before one', np_array.sum())
    lock.acquire()
    np_array[:] = np_array[:] + 1
    lock.release()
    time.sleep(1)
    print('after one', np_array.sum())
    existing_shm.close()


def create_shared_block(a):
    shm = shared_memory.SharedMemory(create=True, size=a.nbytes)
    # # Now create a NumPy array backed by shared memory
    np_array = np.ndarray(a.shape, dtype=np.int64, buffer=shm.buf)
    np_array[:] = a[:]  # Copy the original data into shared memory
    return shm, np_array


def example_with_Process():
    lock = Lock()
    matrix = np.ones(shape=(DIM, DIM), dtype=np.int64)  # Start with an existing NumPy array
    print("creating shared block")
    shr, np_array = create_shared_block(matrix)

    print('Num of CPUs: {}'.format(cpu_count()))
    processes = []
    for i in range(cpu_count()):
        _process = Process(target=add_one_v1, args=(shr.name, lock))
        processes.append(_process)
        _process.start()

    for _process in processes:
        _process.join()

    print("Final array (expected values: {})".format(cpu_count() + 1))
    print(np_array[:10])
    print(np_array[10:])

    shr.close()
    shr.unlink()


def init_pool_processes(the_lock):
    '''Initialize each process with a global variable lock.
    '''
    global lock
    lock = the_lock


def example_with_Pool():
    lock = Lock()
    matrix = np.ones(shape=(DIM, DIM), dtype=np.int64)  # Start with an existing NumPy array
    print("creating shared block")
    shr, np_array = create_shared_block(matrix)

    print('Num of CPUs: {}'.format(cpu_count()))
    pool = multiprocessing.Pool(
        processes=cpu_count(),
        initializer=init_pool_processes,
        initargs=(lock,)
    )
    data_array = []
    for i in range(50):
        data_array.append(shr.name)
    pool.map(add_one_v2, data_array)

    print("Final array (expected values: {})".format(50 + 1))
    print(np_array[:10])
    print(np_array[10:])

    shr.close()
    shr.unlink()


if __name__ == '__main__':
    example_with_Process()
    example_with_Pool()

相关推荐
  为什么项目管理通常仍然耗时且低效?您是否还在反复更新电子表格、淹没在便利贴中并参加每周更新会议?这确实是耗费时间和精力。借助软件工具的帮助,您可以一目了然地全面了解您的项目。如今,国内外有足够多优秀的项目管理软件可以帮助您掌控每个项目。什么是项目管理软件?项目管理软件是广泛行业用于项目规划、资源分配和调度的软件。它使项...
项目管理软件   984  
  在项目管理领域,CDCP(Certified Data Center Professional)认证评审是一个至关重要的环节,它不仅验证了项目团队的专业能力,还直接关系到项目的成功与否。在这一评审过程中,沟通技巧的运用至关重要。有效的沟通不仅能够确保信息的准确传递,还能增强团队协作,提升评审效率。本文将深入探讨CDCP...
华为IPD流程   0  
  IPD(Integrated Product Development,集成产品开发)是一种以客户需求为核心、跨部门协同的产品开发模式,旨在通过高效的资源整合和流程优化,提升产品开发的成功率和市场竞争力。在IPD培训课程中,掌握关键成功因素是确保团队能够有效实施这一模式的核心。以下将从五个关键成功因素展开讨论,帮助企业和...
IPD项目流程图   0  
  华为IPD(Integrated Product Development,集成产品开发)流程是华为公司在其全球化进程中逐步构建和完善的一套高效产品开发管理体系。这一流程不仅帮助华为在技术创新和产品交付上实现了质的飞跃,还为其在全球市场中赢得了显著的竞争优势。IPD的核心在于通过跨部门协作、阶段性评审和市场需求驱动,确保...
华为IPD   0  
  华为作为全球领先的通信技术解决方案提供商,其成功的背后离不开一套成熟的管理体系——集成产品开发(IPD)。IPD不仅是一种产品开发流程,更是一种系统化的管理思想,它通过跨职能团队的协作、阶段评审机制和市场需求驱动的开发模式,帮助华为在全球市场中脱颖而出。从最初的国内市场到如今的全球化布局,华为的IPD体系在多个领域展现...
IPD管理流程   0  
热门文章
项目管理软件有哪些?
云禅道AD
禅道项目管理软件

云端的项目管理软件

尊享禅道项目软件收费版功能

无需维护,随时随地协同办公

内置subversion和git源码管理

每天备份,随时转为私有部署

免费试用