共享的只读数据是否复制到不同的进程以进行多处理?
- 2025-01-21 09:01:00
- admin 原创
- 90
问题描述:
我的这段代码看起来有点像这样:
glbl_array = # a 3 Gb array
def my_func( args, def_param = glbl_array):
#do stuff on args and def_param
if __name__ == '__main__':
pool = Pool(processes=4)
pool.map(my_func, range(1000))
有没有办法确保(或鼓励)不同的进程不会获得 glbl_array 的副本,而是共享它。如果没有办法停止复制,我将使用 memmapped 数组,但我的访问模式不是很规律,所以我预计 memmapped 数组会更慢。以上似乎是第一个要尝试的事情。这是在 Linux 上。我只是想从 Stackoverflow 获得一些建议,不想惹恼系统管理员。你认为如果第二个参数是一个真正的不可变对象,比如 ,这会有帮助吗glbl_array.tostring()
?
解决方案 1:
您可以相当轻松地将共享内存multiprocessing
与 Numpy 一起使用:
import multiprocessing
import ctypes
import numpy as np
shared_array_base = multiprocessing.Array(ctypes.c_double, 10*10)
shared_array = np.ctypeslib.as_array(shared_array_base.get_obj())
shared_array = shared_array.reshape(10, 10)
#-- edited 2015-05-01: the assert check below checks the wrong thing
# with recent versions of Numpy/multiprocessing. That no copy is made
# is indicated by the fact that the program prints the output shown below.
## No copy was made
##assert shared_array.base.base is shared_array_base.get_obj()
# Parallel processing
def my_func(i, def_param=shared_array):
shared_array[i,:] = i
if __name__ == '__main__':
pool = multiprocessing.Pool(processes=4)
pool.map(my_func, range(10))
print shared_array
打印
[[ 0. 0. 0. 0. 0. 0. 0. 0. 0. 0.]
[ 1. 1. 1. 1. 1. 1. 1. 1. 1. 1.]
[ 2. 2. 2. 2. 2. 2. 2. 2. 2. 2.]
[ 3. 3. 3. 3. 3. 3. 3. 3. 3. 3.]
[ 4. 4. 4. 4. 4. 4. 4. 4. 4. 4.]
[ 5. 5. 5. 5. 5. 5. 5. 5. 5. 5.]
[ 6. 6. 6. 6. 6. 6. 6. 6. 6. 6.]
[ 7. 7. 7. 7. 7. 7. 7. 7. 7. 7.]
[ 8. 8. 8. 8. 8. 8. 8. 8. 8. 8.]
[ 9. 9. 9. 9. 9. 9. 9. 9. 9. 9.]]
然而,Linux 对 具有写时复制语义fork()
,因此即使不使用multiprocessing.Array
,数据也不会被复制,除非它被写入。
解决方案 2:
以下代码适用于 Win7 和 Mac(可能适用于 Linux,但未经测试)。
import multiprocessing
import ctypes
import numpy as np
#-- edited 2015-05-01: the assert check below checks the wrong thing
# with recent versions of Numpy/multiprocessing. That no copy is made
# is indicated by the fact that the program prints the output shown below.
## No copy was made
##assert shared_array.base.base is shared_array_base.get_obj()
shared_array = None
def init(shared_array_base):
global shared_array
shared_array = np.ctypeslib.as_array(shared_array_base.get_obj())
shared_array = shared_array.reshape(10, 10)
# Parallel processing
def my_func(i):
shared_array[i, :] = i
if __name__ == '__main__':
shared_array_base = multiprocessing.Array(ctypes.c_double, 10*10)
pool = multiprocessing.Pool(processes=4, initializer=init, initargs=(shared_array_base,))
pool.map(my_func, range(10))
shared_array = np.ctypeslib.as_array(shared_array_base.get_obj())
shared_array = shared_array.reshape(10, 10)
print shared_array
解决方案 3:
对于那些坚持使用不支持的 Windows 的用户fork()
(除非使用 CygWin),pv 的答案不起作用。全局变量不适用于子进程。
Pool
相反,您必须在/的初始化过程中传递共享内存,Process
如下所示:
#! /usr/bin/python
import time
from multiprocessing import Process, Queue, Array
def f(q,a):
m = q.get()
print m
print a[0], a[1], a[2]
m = q.get()
print m
print a[0], a[1], a[2]
if __name__ == '__main__':
a = Array('B', (1, 2, 3), lock=False)
q = Queue()
p = Process(target=f, args=(q,a))
p.start()
q.put([1, 2, 3])
time.sleep(1)
a[0:3] = (4, 5, 6)
q.put([4, 5, 6])
p.join()
(它不是 numpy,也不是好的代码,但它说明了这一点 ;-)
解决方案 4:
如果您正在寻找一种能够在 Windows 上高效运行的选项,并且适用于不规则访问模式、分支和其他可能需要根据共享内存矩阵和进程本地数据的组合来分析不同矩阵的场景,那么ParallelRegression包中的 mathDict 工具包就是为处理这种确切情况而设计的。
解决方案 5:
我知道,我正在回答一个非常老的问题。但这个主题在 Windows 操作系统中不起作用。上述答案具有误导性,没有提供实质性证据。所以我尝试了以下代码。
# -*- coding: utf-8 -*-
from __future__ import annotations
import ctypes
import itertools
import multiprocessing
import os
import time
from concurrent.futures import ProcessPoolExecutor
import numpy as np
import numpy.typing as npt
shared_np_array_for_subprocess: npt.NDArray[np.double]
def init_processing(shared_raw_array_obj: ctypes.Array[ctypes.c_double]):
global shared_np_array_for_subprocess
#shared_np_array_for_subprocess = np.frombuffer(shared_raw_array_obj, dtype=np.double)
shared_np_array_for_subprocess = np.ctypeslib.as_array(shared_raw_array_obj)
def do_processing(i: int) -> int:
print("
--------------->>>>>>")
print(f"[P{i}] input is {i} in process id {os.getpid()}")
print(f"[P{i}] 0th element via np access: ", shared_np_array_for_subprocess[0])
print(f"[P{i}] 1st element via np access: ", shared_np_array_for_subprocess[1])
print(f"[P{i}] NP array's base memory is: ", shared_np_array_for_subprocess.base)
np_array_addr, _ = shared_np_array_for_subprocess.__array_interface__["data"]
print(f"[P{i}] NP array obj pointing memory address is: ", hex(np_array_addr))
print("
--------------->>>>>>")
time.sleep(3.0)
return i
if __name__ == "__main__":
shared_raw_array_obj: ctypes.Array[ctypes.c_double] = multiprocessing.RawArray(ctypes.c_double, 128) # 8B * 1MB = 8MB
# This array is malloced, 0 filled.
print("Shared Allocated Raw array: ", shared_raw_array_obj)
shared_raw_array_ptr = ctypes.addressof(shared_raw_array_obj)
print("Shared Raw Array memory address: ", hex(shared_raw_array_ptr))
# Assign data
print("Assign 0, 1 element data in Shared Raw array.")
shared_raw_array_obj[0] = 10.2346
shared_raw_array_obj[1] = 11.9876
print("0th element via ptr access: ", (ctypes.c_double).from_address(shared_raw_array_ptr).value)
print("1st element via ptr access: ", (ctypes.c_double).from_address(shared_raw_array_ptr + ctypes.sizeof(ctypes.c_double)).value)
print("Create NP array from the Shared Raw array memory")
shared_np_array: npt.NDArray[np.double] = np.frombuffer(shared_raw_array_obj, dtype=np.double)
print("0th element via np access: ", shared_np_array[0])
print("1st element via np access: ", shared_np_array[1])
print("NP array's base memory is: ", shared_np_array.base)
np_array_addr, _ = shared_np_array.__array_interface__["data"]
print("NP array obj pointing memory address is: ", hex(np_array_addr))
print("NP array , Raw array points to same memory , No copies? : ", np_array_addr == shared_raw_array_ptr)
print("Now that we have native memory based NP array , Send for multi processing.")
# results = []
with ProcessPoolExecutor(max_workers=4, initializer=init_processing, initargs=(shared_raw_array_obj,)) as process_executor:
results = process_executor.map(do_processing, range(0, 2))
print("All jobs sumitted.")
for result in results:
print(result)
print("Main process is going to shutdown.")
exit(0)
这是示例输出
Shared Allocated Raw array: <multiprocessing.sharedctypes.c_double_Array_128 object at 0x000001B8042A9E40>
Shared Raw Array memory address: 0x1b804300000
Assign 0, 1 element data in Shared Raw array.
0th element via ptr access: 10.2346
1st element via ptr access: 11.9876
Create NP array from the Shared Raw array memory
0th element via np access: 10.2346
1st element via np access: 11.9876
NP array's base memory is: <multiprocessing.sharedctypes.c_double_Array_128 object at 0x000001B8042A9E40>
NP array obj pointing memory address is: 0x1b804300000
NP array , Raw array points to same memory , No copies? : True
Now that we have native memory based NP array , Send for multi processing.
--------------->>>>>>
[P0] input is 0 in process id 21852
[P0] 0th element via np access: 10.2346
[P0] 1st element via np access: 11.9876
[P0] NP array's base memory is: <memory at 0x0000021C7ACAFF40>
[P0] NP array obj pointing memory address is: 0x21c7ad60000
--------------->>>>>>
--------------->>>>>>
[P1] input is 1 in process id 11232
[P1] 0th element via np access: 10.2346
[P1] 1st element via np access: 11.9876
[P1] NP array's base memory is: <memory at 0x0000022C7FF3FF40>
[P1] NP array obj pointing memory address is: 0x22c7fff0000
--------------->>>>>>
All jobs sumitted.
0
1
Main process is going to shutdown.
以上输出来自以下环境:
OS: Windows 10 20H2
Python: Python 3.9.9 (tags/v3.9.9:ccb0e6a, Nov 15 2021, 18:08:50) [MSC v.1929 64 bit (AMD64)]
您可以清楚地看到,numpy 指向的内存数组对于每个子进程都是不同的,这意味着进行了内存复制。因此在 Windows 操作系统中,子进程不共享底层内存。我确实认为,这是由于操作系统保护,进程不能引用内存中的任意指针地址,这将导致内存访问冲突。