共享的只读数据是否复制到不同的进程以进行多处理?

2025-01-21 09:01:00
admin
原创
90
摘要:问题描述:我的这段代码看起来有点像这样:glbl_array = # a 3 Gb array def my_func( args, def_param = glbl_array): #do stuff on args and def_param if __name__ == '__main__'...

问题描述:

我的这段代码看起来有点像这样:

glbl_array = # a 3 Gb array

def my_func( args, def_param = glbl_array):
    #do stuff on args and def_param

if __name__ == '__main__':
  pool = Pool(processes=4)
  pool.map(my_func, range(1000))

有没有办法确保(或鼓励)不同的进程不会获得 glbl_array 的副本,而是共享它。如果没有办法停止复制,我将使用 memmapped 数组,但我的访问模式不是很规律,所以我预计 memmapped 数组会更慢。以上似乎是第一个要尝试的事情。这是在 Linux 上。我只是想从 Stackoverflow 获得一些建议,不想惹恼系统管理员。你认为如果第二个参数是一个真正的不可变对象,比如 ,这会有帮助吗glbl_array.tostring()


解决方案 1:

您可以相当轻松地将共享内存multiprocessing与 Numpy 一起使用:

import multiprocessing
import ctypes
import numpy as np

shared_array_base = multiprocessing.Array(ctypes.c_double, 10*10)
shared_array = np.ctypeslib.as_array(shared_array_base.get_obj())
shared_array = shared_array.reshape(10, 10)

#-- edited 2015-05-01: the assert check below checks the wrong thing
#   with recent versions of Numpy/multiprocessing. That no copy is made
#   is indicated by the fact that the program prints the output shown below.
## No copy was made
##assert shared_array.base.base is shared_array_base.get_obj()

# Parallel processing
def my_func(i, def_param=shared_array):
    shared_array[i,:] = i

if __name__ == '__main__':
    pool = multiprocessing.Pool(processes=4)
    pool.map(my_func, range(10))

    print shared_array

打印

[[ 0.  0.  0.  0.  0.  0.  0.  0.  0.  0.]
 [ 1.  1.  1.  1.  1.  1.  1.  1.  1.  1.]
 [ 2.  2.  2.  2.  2.  2.  2.  2.  2.  2.]
 [ 3.  3.  3.  3.  3.  3.  3.  3.  3.  3.]
 [ 4.  4.  4.  4.  4.  4.  4.  4.  4.  4.]
 [ 5.  5.  5.  5.  5.  5.  5.  5.  5.  5.]
 [ 6.  6.  6.  6.  6.  6.  6.  6.  6.  6.]
 [ 7.  7.  7.  7.  7.  7.  7.  7.  7.  7.]
 [ 8.  8.  8.  8.  8.  8.  8.  8.  8.  8.]
 [ 9.  9.  9.  9.  9.  9.  9.  9.  9.  9.]]

然而,Linux 对 具有写时复制语义fork(),因此即使不使用multiprocessing.Array,数据也不会被复制,除非它被写入。

解决方案 2:

以下代码适用于 Win7 和 Mac(可能适用于 Linux,但未经测试)。

import multiprocessing
import ctypes
import numpy as np

#-- edited 2015-05-01: the assert check below checks the wrong thing
#   with recent versions of Numpy/multiprocessing. That no copy is made
#   is indicated by the fact that the program prints the output shown below.
## No copy was made
##assert shared_array.base.base is shared_array_base.get_obj()

shared_array = None

def init(shared_array_base):
    global shared_array
    shared_array = np.ctypeslib.as_array(shared_array_base.get_obj())
    shared_array = shared_array.reshape(10, 10)

# Parallel processing
def my_func(i):
    shared_array[i, :] = i

if __name__ == '__main__':
    shared_array_base = multiprocessing.Array(ctypes.c_double, 10*10)

    pool = multiprocessing.Pool(processes=4, initializer=init, initargs=(shared_array_base,))
    pool.map(my_func, range(10))

    shared_array = np.ctypeslib.as_array(shared_array_base.get_obj())
    shared_array = shared_array.reshape(10, 10)
    print shared_array

解决方案 3:

对于那些坚持使用不支持的 Windows 的用户fork()(除非使用 CygWin),pv 的答案不起作用。全局变量不适用于子进程。

Pool相反,您必须在/的初始化过程中传递共享内存,Process如下所示:

#! /usr/bin/python

import time

from multiprocessing import Process, Queue, Array

def f(q,a):
    m = q.get()
    print m
    print a[0], a[1], a[2]
    m = q.get()
    print m
    print a[0], a[1], a[2]

if __name__ == '__main__':
    a = Array('B', (1, 2, 3), lock=False)
    q = Queue()
    p = Process(target=f, args=(q,a))
    p.start()
    q.put([1, 2, 3])
    time.sleep(1)
    a[0:3] = (4, 5, 6)
    q.put([4, 5, 6])
    p.join()

(它不是 numpy,也不是好的代码,但它说明了这一点 ;-)

解决方案 4:

如果您正在寻找一种能够在 Windows 上高效运行的选项,并且适用于不规则访问模式、分支和其他可能需要根据共享内存矩阵和进程本地数据的组合来分析不同矩阵的场景,那么ParallelRegression包中的 mathDict 工具包就是为处理这种确切情况而设计的。

解决方案 5:

我知道,我正在回答一个非常老的问题。但这个主题在 Windows 操作系统中不起作用。上述答案具有误导性,没有提供实质性证据。所以我尝试了以下代码。

# -*- coding: utf-8 -*-
from __future__ import annotations
import ctypes
import itertools
import multiprocessing
import os
import time
from concurrent.futures import ProcessPoolExecutor
import numpy as np
import numpy.typing as npt


shared_np_array_for_subprocess: npt.NDArray[np.double]


def init_processing(shared_raw_array_obj: ctypes.Array[ctypes.c_double]):
    global shared_np_array_for_subprocess
    #shared_np_array_for_subprocess = np.frombuffer(shared_raw_array_obj, dtype=np.double)
    shared_np_array_for_subprocess = np.ctypeslib.as_array(shared_raw_array_obj)


def do_processing(i: int) -> int:
    print("
--------------->>>>>>")
    print(f"[P{i}] input is {i} in process id {os.getpid()}")
    print(f"[P{i}] 0th element via np access: ", shared_np_array_for_subprocess[0])
    print(f"[P{i}] 1st element via np access: ", shared_np_array_for_subprocess[1])
    print(f"[P{i}] NP array's base memory is: ", shared_np_array_for_subprocess.base)
    np_array_addr, _ = shared_np_array_for_subprocess.__array_interface__["data"]
    print(f"[P{i}] NP array obj pointing memory address is: ", hex(np_array_addr))
    print("
--------------->>>>>>")
    time.sleep(3.0)
    return i


if __name__ == "__main__":
    shared_raw_array_obj: ctypes.Array[ctypes.c_double] = multiprocessing.RawArray(ctypes.c_double, 128)  # 8B * 1MB = 8MB
    # This array is malloced, 0 filled.
    print("Shared Allocated Raw array: ", shared_raw_array_obj)
    shared_raw_array_ptr = ctypes.addressof(shared_raw_array_obj)
    print("Shared Raw Array memory address: ", hex(shared_raw_array_ptr))

    # Assign data
    print("Assign 0, 1 element data in Shared Raw array.")
    shared_raw_array_obj[0] = 10.2346
    shared_raw_array_obj[1] = 11.9876

    print("0th element via ptr access: ", (ctypes.c_double).from_address(shared_raw_array_ptr).value)
    print("1st element via ptr access: ", (ctypes.c_double).from_address(shared_raw_array_ptr + ctypes.sizeof(ctypes.c_double)).value)

    print("Create NP array from the Shared Raw array memory")
    shared_np_array: npt.NDArray[np.double] = np.frombuffer(shared_raw_array_obj, dtype=np.double)

    print("0th element via np access: ", shared_np_array[0])
    print("1st element via np access: ", shared_np_array[1])

    print("NP array's base memory is: ", shared_np_array.base)
    np_array_addr, _ = shared_np_array.__array_interface__["data"]
    print("NP array obj pointing memory address is: ", hex(np_array_addr))

    print("NP array , Raw array points to same memory , No copies? : ", np_array_addr == shared_raw_array_ptr)

    print("Now that we have native memory based NP array , Send for multi processing.")

    # results = []
    with ProcessPoolExecutor(max_workers=4, initializer=init_processing, initargs=(shared_raw_array_obj,)) as process_executor:
        results = process_executor.map(do_processing, range(0, 2))

    print("All jobs sumitted.")
    for result in results:
        print(result)

    print("Main process is going to shutdown.")
    exit(0)

这是示例输出

Shared Allocated Raw array:  <multiprocessing.sharedctypes.c_double_Array_128 object at 0x000001B8042A9E40>
Shared Raw Array memory address:  0x1b804300000
Assign 0, 1 element data in Shared Raw array.
0th element via ptr access:  10.2346
1st element via ptr access:  11.9876
Create NP array from the Shared Raw array memory
0th element via np access:  10.2346
1st element via np access:  11.9876
NP array's base memory is:  <multiprocessing.sharedctypes.c_double_Array_128 object at 0x000001B8042A9E40>
NP array obj pointing memory address is:  0x1b804300000
NP array , Raw array points to same memory , No copies? :  True
Now that we have native memory based NP array , Send for multi processing.

--------------->>>>>>
[P0] input is 0 in process id 21852
[P0] 0th element via np access:  10.2346
[P0] 1st element via np access:  11.9876
[P0] NP array's base memory is:  <memory at 0x0000021C7ACAFF40>
[P0] NP array obj pointing memory address is:  0x21c7ad60000

--------------->>>>>>

--------------->>>>>>
[P1] input is 1 in process id 11232
[P1] 0th element via np access:  10.2346
[P1] 1st element via np access:  11.9876
[P1] NP array's base memory is:  <memory at 0x0000022C7FF3FF40>
[P1] NP array obj pointing memory address is:  0x22c7fff0000

--------------->>>>>>
All jobs sumitted.
0
1
Main process is going to shutdown.

以上输出来自以下环境:

OS: Windows 10 20H2
Python: Python 3.9.9 (tags/v3.9.9:ccb0e6a, Nov 15 2021, 18:08:50) [MSC v.1929 64 bit (AMD64)]

您可以清楚地看到,numpy 指向的内存数组对于每个子进程都是不同的,这意味着进行了内存复制。因此在 Windows 操作系统中,子进程不共享底层内存。我确实认为,这是由于操作系统保护,进程不能引用内存中的任意指针地址,这将导致内存访问冲突。

相关推荐
  政府信创国产化的10大政策解读一、信创国产化的背景与意义信创国产化,即信息技术应用创新国产化,是当前中国信息技术领域的一个重要发展方向。其核心在于通过自主研发和创新,实现信息技术应用的自主可控,减少对外部技术的依赖,并规避潜在的技术制裁和风险。随着全球信息技术竞争的加剧,以及某些国家对中国在科技领域的打压,信创国产化显...
工程项目管理   1565  
  为什么项目管理通常仍然耗时且低效?您是否还在反复更新电子表格、淹没在便利贴中并参加每周更新会议?这确实是耗费时间和精力。借助软件工具的帮助,您可以一目了然地全面了解您的项目。如今,国内外有足够多优秀的项目管理软件可以帮助您掌控每个项目。什么是项目管理软件?项目管理软件是广泛行业用于项目规划、资源分配和调度的软件。它使项...
项目管理软件   1354  
  信创国产芯片作为信息技术创新的核心领域,对于推动国家自主可控生态建设具有至关重要的意义。在全球科技竞争日益激烈的背景下,实现信息技术的自主可控,摆脱对国外技术的依赖,已成为保障国家信息安全和产业可持续发展的关键。国产芯片作为信创产业的基石,其发展水平直接影响着整个信创生态的构建与完善。通过不断提升国产芯片的技术实力、产...
国产信创系统   21  
  信创生态建设旨在实现信息技术领域的自主创新和安全可控,涵盖了从硬件到软件的全产业链。随着数字化转型的加速,信创生态建设的重要性日益凸显,它不仅关乎国家的信息安全,更是推动产业升级和经济高质量发展的关键力量。然而,在推进信创生态建设的过程中,面临着诸多复杂且严峻的挑战,需要深入剖析并寻找切实可行的解决方案。技术创新难题技...
信创操作系统   27  
  信创产业作为国家信息技术创新发展的重要领域,对于保障国家信息安全、推动产业升级具有关键意义。而国产芯片作为信创产业的核心基石,其研发进展备受关注。在信创国产芯片的研发征程中,面临着诸多复杂且艰巨的难点,这些难点犹如一道道关卡,阻碍着国产芯片的快速发展。然而,科研人员和相关企业并未退缩,积极探索并提出了一系列切实可行的解...
国产化替代产品目录   28  
热门文章
项目管理软件有哪些?
云禅道AD
禅道项目管理软件

云端的项目管理软件

尊享禅道项目软件收费版功能

无需维护,随时随地协同办公

内置subversion和git源码管理

每天备份,随时转为私有部署

免费试用