多处理中的共享内存
- 2025-01-09 08:46:00
- admin 原创
- 17
问题描述:
我有三个大列表。第一个包含位数组(模块位数组 0.8.0),其他两个包含整数数组。
l1=[bitarray 1, bitarray 2, ... ,bitarray n]
l2=[array 1, array 2, ... , array n]
l3=[array 1, array 2, ... , array n]
这些数据结构占用了相当多的 RAM(总共约 16GB)。
如果我使用以下命令启动 12 个子进程:
multiprocessing.Process(target=someFunction, args=(l1,l2,l3))
这是否意味着每个子进程都会复制 l1、l2 和 l3,还是子进程会共享这些列表?或者更直接地说,我会使用 16GB 还是 192GB 的 RAM?
someFunction 会从这些列表中读取一些值,然后根据读取的值执行一些计算。结果将返回给父进程。列表 l1、l2 和 l3 不会被 someFunction 修改。
因此,我假设子进程不需要也不会复制这些巨大的列表,而只是与父进程共享它们。这意味着由于 Linux 下的写时复制方法,该程序将占用 16GB 的 RAM(无论我启动了多少个子进程)?我是否正确,或者我是否遗漏了导致列表被复制的某些内容?
编辑:在阅读了更多关于该主题的内容后,我仍然感到困惑。一方面,Linux 使用写时复制,这意味着不会复制任何数据。另一方面,访问对象将改变其引用计数(我仍然不确定为什么以及这意味着什么)。即便如此,整个对象都会被复制吗?
例如如果我定义 someFunction 如下:
def someFunction(list1, list2, list3):
i=random.randint(0,99999)
print list1[i], list2[i], list3[i]
使用此功能是否意味着 l1、l2 和 l3 将为每个子流程完全复制?
有没有什么办法可以检查这个?
EDIT2在阅读更多内容并监控子进程运行时系统的总内存使用情况后,似乎确实为每个子进程复制了整个对象。这似乎是因为引用计数。
在我的程序中,l1、l2 和 l3 的引用计数实际上是不必要的。这是因为 l1、l2 和 l3 将保留在内存中(不变),直到父进程退出。在此之前,无需释放这些列表使用的内存。事实上,我确信引用计数将保持在 0 以上(对于这些列表以及这些列表中的每个对象),直到程序退出。
那么现在的问题是,我如何确保对象不会被复制到每个子进程?我是否可以禁用这些列表以及这些列表中的每个对象的引用计数?
EDIT3只是补充说明。子流程不需要修改l1
、l2
和l3
或这些列表中的任何对象。子流程只需要能够引用其中一些对象,而不会导致每个子流程都复制内存。
解决方案 1:
因为这在谷歌上仍然是一个非常高的结果,并且没有其他人提到它,所以我想我会提到在 python 版本 3.8.0 中引入的“真正”共享内存的新可能性:https ://docs.python.org/3/library/multiprocessing.shared_memory.html
我在这里提供了一个使用 numpy 数组的小型示例(在 Linux 上测试),这可能是一个非常常见的用例:
# one dimension of the 2d array which is shared
dim = 5000
import numpy as np
from multiprocessing import shared_memory, Process, Lock
from multiprocessing import cpu_count, current_process
import time
lock = Lock()
def add_one(shr_name):
existing_shm = shared_memory.SharedMemory(name=shr_name)
np_array = np.ndarray((dim, dim,), dtype=np.int64, buffer=existing_shm.buf)
lock.acquire()
np_array[:] = np_array[0] + 1
lock.release()
time.sleep(10) # pause, to see the memory usage in top
print('added one')
existing_shm.close()
def create_shared_block():
a = np.ones(shape=(dim, dim), dtype=np.int64) # Start with an existing NumPy array
shm = shared_memory.SharedMemory(create=True, size=a.nbytes)
# # Now create a NumPy array backed by shared memory
np_array = np.ndarray(a.shape, dtype=np.int64, buffer=shm.buf)
np_array[:] = a[:] # Copy the original data into shared memory
return shm, np_array
if current_process().name == "MainProcess":
print("creating shared block")
shr, np_array = create_shared_block()
processes = []
for i in range(cpu_count()):
_process = Process(target=add_one, args=(shr.name,))
processes.append(_process)
_process.start()
for _process in processes:
_process.join()
print("Final array")
print(np_array[:10])
print(np_array[10:])
shr.close()
shr.unlink()
请注意,由于 64 位整数,此代码可能需要大约 1GB 的 RAM 才能运行,因此请确保使用它不会冻结系统。^_^
解决方案 2:
一般来说,共享同一份数据的方式有两种:
多线程
共享内存
Python 的多线程不适合 CPU 密集型任务(因为有 GIL),因此在这种情况下通常的解决方案是继续。但是,使用此解决方案,您需要使用和multiprocessing
明确共享数据。multiprocessing.Value
`multiprocessing.Array`
请注意,由于所有同步问题,通常在进程之间共享数据可能不是最佳选择;涉及参与者交换消息的方法通常被视为更好的选择。另请参阅Python 文档:
如上所述,进行并发编程时,最好尽可能避免使用共享状态。使用多个进程时尤其如此。
但是,如果您确实需要使用一些共享数据,那么多处理提供了几种方法。
就您而言,您需要包装l1
,l2
并l3
以某种可以理解的方式multiprocessing
(例如使用multiprocessing.Array
),然后将它们作为参数传递。
还要注意,正如您所说,您不需要写访问权限,那么您应该lock=False
在创建对象时传递,否则所有访问仍将被序列化。
解决方案 3:
对于那些有兴趣使用 Python3.8 的shared_memory模块的人来说,它仍然有一个错误(github 问题链接在这里),尚未修复,并且现在(2021-01-15)正在影响 Python3.8/3.9/3.10。该错误影响 posix 系统,并且是关于资源跟踪器在其他进程仍应具有有效访问权限时破坏共享内存段。因此,如果您在代码中使用它,请小心。
解决方案 4:
如果您想使用写时复制功能,并且您的数据是静态的(在子进程中不变),那么您应该让 python 不要弄乱数据所在的内存块。您可以使用 C 或 C++ 结构(例如 stl)作为容器轻松做到这一点,并提供您自己的 python 包装器,当创建 python 级对象(如果有)时,它将使用指向数据内存的指针(或可能复制数据内存)。所有这些都可以通过 cython 的几乎 python 简单性和语法非常轻松地完成。
# 伪 cython
cdef 类 FooContainer:
cdef 字符 * 数据
def __cinit__(self,char * foo_value):
self.data = malloc(1024,sizeof(char))
memcpy(self.data,foo_value,min(1024,len(foo_value)))
定义获取(自身):
返回自身数据
# python 部分
从 foo 导入 FooContainer
f = FooContainer(“你好,世界”)
进程标识 = fork()
如果不是 pid:
f.get() #此调用将读取相同的内存页面
# 父进程写入了 1024 个字符的 self.data
# cython 将自动创建一个新的 python 字符串
# 从中取出对象并返回给调用者
上面的伪代码写得很糟糕。不要使用它。在你的情况下,应该用 C 或 C++ 容器代替 self.data。
解决方案 5:
您可以使用 memcached 或 redis 并将每个设置为键值对 {'l1'...
解决方案 6:
我根据 @rboreal-frippery 的回答编写了自己的代码。它提供了如何使用共享内存Process
以及如何使用共享内存Pool
(pool.map) 的示例。
import multiprocessing
# one dimension of the 2d array which is shared
DIM = 5000
import numpy as np
from multiprocessing import shared_memory, Process, Lock
from multiprocessing import cpu_count, current_process
import time
def add_one_v1(shr_name, lock):
existing_shm = shared_memory.SharedMemory(name=shr_name)
np_array = np.ndarray((DIM, DIM,), dtype=np.int64, buffer=existing_shm.buf)
print('before one', np_array.sum())
lock.acquire()
np_array[:] = np_array[:] + 1
lock.release()
time.sleep(1)
print('after one', np_array.sum())
existing_shm.close()
def add_one_v2(shr_name):
existing_shm = shared_memory.SharedMemory(name=shr_name)
np_array = np.ndarray((DIM, DIM,), dtype=np.int64, buffer=existing_shm.buf)
print('before one', np_array.sum())
lock.acquire()
np_array[:] = np_array[:] + 1
lock.release()
time.sleep(1)
print('after one', np_array.sum())
existing_shm.close()
def create_shared_block(a):
shm = shared_memory.SharedMemory(create=True, size=a.nbytes)
# # Now create a NumPy array backed by shared memory
np_array = np.ndarray(a.shape, dtype=np.int64, buffer=shm.buf)
np_array[:] = a[:] # Copy the original data into shared memory
return shm, np_array
def example_with_Process():
lock = Lock()
matrix = np.ones(shape=(DIM, DIM), dtype=np.int64) # Start with an existing NumPy array
print("creating shared block")
shr, np_array = create_shared_block(matrix)
print('Num of CPUs: {}'.format(cpu_count()))
processes = []
for i in range(cpu_count()):
_process = Process(target=add_one_v1, args=(shr.name, lock))
processes.append(_process)
_process.start()
for _process in processes:
_process.join()
print("Final array (expected values: {})".format(cpu_count() + 1))
print(np_array[:10])
print(np_array[10:])
shr.close()
shr.unlink()
def init_pool_processes(the_lock):
'''Initialize each process with a global variable lock.
'''
global lock
lock = the_lock
def example_with_Pool():
lock = Lock()
matrix = np.ones(shape=(DIM, DIM), dtype=np.int64) # Start with an existing NumPy array
print("creating shared block")
shr, np_array = create_shared_block(matrix)
print('Num of CPUs: {}'.format(cpu_count()))
pool = multiprocessing.Pool(
processes=cpu_count(),
initializer=init_pool_processes,
initargs=(lock,)
)
data_array = []
for i in range(50):
data_array.append(shr.name)
pool.map(add_one_v2, data_array)
print("Final array (expected values: {})".format(50 + 1))
print(np_array[:10])
print(np_array[10:])
shr.close()
shr.unlink()
if __name__ == '__main__':
example_with_Process()
example_with_Pool()
- 2024年20款好用的项目管理软件推荐,项目管理提效的20个工具和技巧
- 2024年开源项目管理软件有哪些?推荐5款好用的项目管理工具
- 项目管理软件有哪些?推荐7款超好用的项目管理工具
- 2024年常用的项目管理软件有哪些?推荐这10款国内外好用的项目管理工具
- 项目管理软件有哪些最好用?推荐6款好用的项目管理工具
- 项目管理软件哪个最好用?盘点推荐5款好用的项目管理工具
- 项目管理软件有哪些,盘点推荐国内外超好用的7款项目管理工具
- 项目管理软件排行榜:2024年项目经理必备5款开源项目管理软件汇总
- 2024项目管理软件排行榜(10类常用的项目管理工具全推荐)
- 项目管理必备:盘点2024年13款好用的项目管理软件