如何用 Python 进行并行编程?

2025-01-22 08:45:00
admin
原创
115
摘要:问题描述:对于 C++,我们可以使用 OpenMP 进行并行编程;但是,OpenMP 不适用于 Python。如果我想并行化 Python 程序的某些部分,该怎么办?代码的结构可以视为:solve1(A) solve2(B) 其中solve1和solve2是两个独立的函数。如何并行运行这类代码而不是顺序运行以...

问题描述:

对于 C++,我们可以使用 OpenMP 进行并行编程;但是,OpenMP 不适用于 Python。如果我想并行化 Python 程序的某些部分,该怎么办?

代码的结构可以视为:

solve1(A)
solve2(B)

其中solve1solve2是两个独立的函数。如何并行运行这类代码而不是顺序运行以减少运行时间?代码为:

def solve(Q, G, n):
    i = 0
    tol = 10 ** -4

    while i < 1000:
        inneropt, partition, x = setinner(Q, G, n)
        outeropt = setouter(Q, G, n)

        if (outeropt - inneropt) / (1 + abs(outeropt) + abs(inneropt)) < tol:
            break
            
        node1 = partition[0]
        node2 = partition[1]
    
        G = updateGraph(G, node1, node2)

        if i == 999:
            print "Maximum iteration reaches"
    print inneropt

其中setinnersetouter是两个独立函数。这就是我想要并行的地方...


解决方案 1:

您可以使用多处理模块。在这种情况下,我可能会使用处理池:

from multiprocessing import Pool
pool = Pool()
result1 = pool.apply_async(solve1, [A])    # evaluate "solve1(A)" asynchronously
result2 = pool.apply_async(solve2, [B])    # evaluate "solve2(B)" asynchronously
answer1 = result1.get(timeout=10)
answer2 = result2.get(timeout=10)

这将生成可以为您执行一般工作的进程。由于我们没有通过processes,它将为您机器上的每个 CPU 核心生成一个进程。每个 CPU 核心可以同时执行一个进程。

如果您想将列表映射到单个函数,您可以这样做:

args = [A, B]
results = pool.map(solve1, args)

不要使用线程,因为GIL会锁定对 python 对象的任何操作。

解决方案 2:

使用Ray可以非常优雅地完成此操作。

为了并行化您的示例,您需要用@ray.remote装饰器定义您的函数,然后用调用它们.remote

import ray

ray.init()

# Define the functions.

@ray.remote
def solve1(a):
    return 1

@ray.remote
def solve2(b):
    return 2

# Start two tasks in the background.
x_id = solve1.remote(0)
y_id = solve2.remote(1)

# Block until the tasks are done and get the results.
x, y = ray.get([x_id, y_id])

与多处理模块相比,它有许多优点。

  1. 相同的代码可以在多核机器以及机器集群上运行。

  2. 进程通过共享内存和零拷贝序列化有效地共享数据。

  3. 错误信息被很好地传播。

  4. 这些函数调用可以组合在一起,例如,

@ray.remote
def f(x):
    return x + 1

x_id = f.remote(1)
y_id = f.remote(x_id)
z_id = f.remote(y_id)
ray.get(z_id)  # returns 4
  1. 除了远程调用函数之外,还可以将类远程实例化为参与者。

请注意,Ray是我一直在帮助开发的一个框架。

解决方案 3:

解决方案,正如其他人所说,是使用多个进程。然而,哪个框架更合适,取决于许多因素。除了已经提到的,还有charm4py和mpi4py(我是 charm4py 的开发人员)。

除了使用工作池抽象之外,还有一种更有效的方法来实现上述示例。主循环G在 1000 次迭代中反复向工作线程发送相同的参数(包括完整的图)。由于至少一个工作线程将驻留在不同的进程中,因此这涉及复制参数并将其发送到其他进程。根据对象的大小,这可能非常昂贵。相反,让工作线程存储状态并仅发送更新的信息是有意义的。

例如,在 charm4py 中可以这样做:

class Worker(Chare):

    def __init__(self, Q, G, n):
        self.G = G
        ...

    def setinner(self, node1, node2):
        self.updateGraph(node1, node2)
        ...


def solve(Q, G, n):
    # create 2 workers, each on a different process, passing the initial state
    worker_a = Chare(Worker, onPE=0, args=[Q, G, n])
    worker_b = Chare(Worker, onPE=1, args=[Q, G, n])
    while i < 1000:
        result_a = worker_a.setinner(node1, node2, ret=True)  # execute setinner on worker A
        result_b = worker_b.setouter(node1, node2, ret=True)  # execute setouter on worker B

        inneropt, partition, x = result_a.get()  # wait for result from worker A
        outeropt = result_b.get()  # wait for result from worker B
        ...

请注意,对于此示例,我们实际上只需要一个工作程序。主循环可以执行其中一个函数,并让工作程序执行另一个函数。但我的代码有助于说明以下几点:

  1. 工作器 A 在进程 0 中运行(与主循环相同)。在result_a.get()阻塞等待结果期间,工作器 A 在同一进程中进行计算。

  2. 参数会自动通过引用传递给工作者 A,因为它在同一个进程中(不涉及复制)。

解决方案 4:

CPython 使用全局解释器锁,这使得并行编程比 C++ 更有趣

本主题有几个有用的示例和挑战描述:

在 Linux 上使用任务集在多核系统上解决 Python 全局解释器锁(GIL)问题?

解决方案 5:

您可以使用joblib库进行并行计算和多处理。

from joblib import Parallel, delayed

您可以简单地创建一个foo想要并行运行的函数,并基于下面的代码实现并行处理:

output = Parallel(n_jobs=num_cores)(delayed(foo)(i) for i in input)

可以从以下图书馆获取num_coresmultiprocessing

import multiprocessing

num_cores = multiprocessing.cpu_count()

如果您有一个具有多个输入参数的函数,并且您只想通过列表迭代其中一个参数,则可以使用库partial中的函数functools,如下所示:

from joblib import Parallel, delayed
import multiprocessing
from functools import partial
def foo(arg1, arg2, arg3, arg4):
    '''
    body of the function
    '''
    return output
input = [11,32,44,55,23,0,100,...] # arbitrary list
num_cores = multiprocessing.cpu_count()
foo_ = partial(foo, arg2=arg2, arg3=arg3, arg4=arg4)
# arg1 is being fetched from input list
output = Parallel(n_jobs=num_cores)(delayed(foo_)(i) for i in input)

您可以在此处找到有关 python 和 R 多处理的完整解释以及几个示例。

解决方案 6:

我总是使用“多处理”本机库来处理 Python 中的并行性。为了控制队列中的进程数,我使用共享变量作为计数器。在下面的示例中,您可以看到简单进程的并行执行是如何工作的。

我对脚本进行了更新,使其更易于使用。基本上,您唯一要做的就是process用要并行运行的函数覆盖该方法。请参阅示例,该过程非常简单。或者,您也可以删除所有执行日志事件。

当我有时间时,我会更新代码以处理返回值的进程。

要求

user@host:~$ pip install coloredlogs==15.0.1

代码

并行处理脚本(复制粘贴)

#!/usr/bin/env python
# encoding: utf-8

from multiprocessing import Manager, Pool, Value, cpu_count
from multiprocessing.pool import ThreadPool
from typing import Any, Iterator
from datetime import datetime
from logging import Logger
import coloredlogs
import logging
import time
import sys
import os


LOG_LEVEL = "DEBUG"


def get_logger(name: str = __name__, level: str = LOG_LEVEL) -> Logger:
    assert level in ("NOTSET", "DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL")

    # Setting-up the script logging:
    logging.basicConfig(
        stream=sys.stdout,
        format="%(asctime)s %(message)s",
        datefmt="%Y-%m-%d %H:%M:%S",
        level=level
    )

    logger = logging.getLogger(name)
    coloredlogs.install(level=level, logger=logger, isatty=True)

    return logger


class ParallelProcessing:
    """
    Parallel processing.

    References
    ----------
    [1] Class `ParallelProcessing`: https://stackoverflow.com/a/70464369/16109419

    Examples
    --------
    >>> class MyParallelProcessing(ParallelProcessing):
    >>>     def process(self, name: str) -> None:
    >>>         logger = get_logger()
    >>>         logger.info(f"Executing process: {name}...")
    >>>         time.sleep(5)
    >>>
    >>>
    >>> params_list = [("A",), ("B",), ("C",), ("D",), ("E",), ("F",)]
    >>> mpp = MyParallelProcessing()
    >>> mpp.run(args_list=params_list)
    """

    _n_jobs: int
    _waiting_time: int
    _queue: Value
    _logger: Logger

    def __init__(self, n_jobs: int = -1, waiting_time: int = 1):
        """
        Instantiates a parallel processing object to execute processes in parallel.

        Parameters
        ----------
        n_jobs: int
            Number of jobs.
        waiting_time: int
            Waiting time when jobs queue is full, e.g. `_queue.value` == `_n_jobs`.
        """
        self._n_jobs = n_jobs if n_jobs >= 0 else cpu_count()
        self._waiting_time = waiting_time if waiting_time >= 0 else 60*60
        self._logger = get_logger()

    def process(self, *args) -> None:
        """
        Abstract process that must be overridden.

        Parameters
        ----------
        *args
            Parameters of the process to be executed.
        """
        raise NotImplementedError("Process not defined ('NotImplementedError' exception).")

    def _execute(self, *args) -> None:
        """
        Run the process and remove it from the process queue by decreasing the queue process counter.

        Parameters
        ----------
        *args
            Parameters of the process to be executed.
        """
        self.process(*args)
        self._queue.value -= 1

    def _error_callback(self, result: Any) -> None:
        """
        Error callback.

        Parameters
        ----------
        result: Any
            Result from exceptions.
        """
        self._logger.error(result)
        os._exit(1)

    def run(self, args_list: Iterator[tuple], use_multithreading: bool = False) -> None:
        """
        Run processes in parallel.

        Parameters
        ----------
        args_list: Iterator[tuple]
            List of process parameters (`*args`).
        use_multithreading: bool
            Use multithreading instead multiprocessing.
        """
        manager = Manager()
        self._queue = manager.Value('i', 0)
        lock = manager.Lock()
        pool = Pool(processes=self._n_jobs) if not use_multithreading else ThreadPool(processes=self._n_jobs)

        start_time = datetime.now()

        with lock:  # Write-protecting the processes queue shared variable.
            for args in args_list:
                while True:
                    if self._queue.value < self._n_jobs:
                        self._queue.value += 1

                        # Running processes in parallel:
                        pool.apply_async(func=self._execute, args=args, error_callback=self._error_callback)

                        break
                    else:
                        self._logger.debug(f"Pool full ({self._n_jobs}): waiting {self._waiting_time} seconds...")
                        time.sleep(self._waiting_time)

        pool.close()
        pool.join()

        exec_time = datetime.now() - start_time
        self._logger.info(f"Execution time: {exec_time}")

使用示例:

class MyParallelProcessing(ParallelProcessing):
    def process(self, name: str) -> None:
        """
        Process to run in parallel (overrides abstract method).
        """
        logger = get_logger()
        logger.info(f"Executing process: {name}...")
        time.sleep(5)


def main() -> None:
    n_jobs = int(sys.argv[1])  # Number of jobs to run in parallel.
    params_list = [("A",), ("B",), ("C",), ("D",), ("E",), ("F",)]

    mpp = MyParallelProcessing(n_jobs=n_jobs)

    # Executing processes in parallel:
    mpp.run(args_list=params_list)


if __name__ == '__main__':
    main()

执行和输出

user@host:~$ python run.py 1
2021-12-23 12:41:51 MYMACHINE __main__[24180] DEBUG Pool full (1): waiting 1 seconds...
2021-12-23 12:41:51 MYMACHINE __mp_main__[12352] INFO Executing process: A...
2021-12-23 12:41:52 MYMACHINE __main__[24180] DEBUG Pool full (1): waiting 1 seconds...
2021-12-23 12:41:53 MYMACHINE __main__[24180] DEBUG Pool full (1): waiting 1 seconds...
2021-12-23 12:41:54 MYMACHINE __main__[24180] DEBUG Pool full (1): waiting 1 seconds...
2021-12-23 12:41:55 MYMACHINE __main__[24180] DEBUG Pool full (1): waiting 1 seconds...
2021-12-23 12:41:56 MYMACHINE __main__[24180] DEBUG Pool full (1): waiting 1 seconds...
2021-12-23 12:41:57 MYMACHINE __main__[24180] DEBUG Pool full (1): waiting 1 seconds...
2021-12-23 12:41:57 MYMACHINE __mp_main__[12352] INFO Executing process: B...
2021-12-23 12:41:58 MYMACHINE __main__[24180] DEBUG Pool full (1): waiting 1 seconds...
2021-12-23 12:41:59 MYMACHINE __main__[24180] DEBUG Pool full (1): waiting 1 seconds...
2021-12-23 12:42:00 MYMACHINE __main__[24180] DEBUG Pool full (1): waiting 1 seconds...
...
2021-12-23 12:42:10 MYMACHINE __main__[24180] DEBUG Pool full (1): waiting 1 seconds...
2021-12-23 12:42:11 MYMACHINE __main__[24180] DEBUG Pool full (1): waiting 1 seconds...
2021-12-23 12:42:12 MYMACHINE __main__[24180] DEBUG Pool full (1): waiting 1 seconds...
2021-12-23 12:42:12 MYMACHINE __mp_main__[12352] INFO Executing process: E...
2021-12-23 12:42:13 MYMACHINE __main__[24180] DEBUG Pool full (1): waiting 1 seconds...
2021-12-23 12:42:14 MYMACHINE __main__[24180] DEBUG Pool full (1): waiting 1 seconds...
2021-12-23 12:42:15 MYMACHINE __main__[24180] DEBUG Pool full (1): waiting 1 seconds...
2021-12-23 12:42:16 MYMACHINE __main__[24180] DEBUG Pool full (1): waiting 1 seconds...
2021-12-23 12:42:18 MYMACHINE __mp_main__[12352] INFO Executing process: F...
2021-12-23 12:42:23 MYMACHINE __main__[24180] INFO Execution time: 0:00:31.274478
user@host:~$ python run.py 3
2021-12-23 12:33:59 MYMACHINE __main__[7628] DEBUG Pool full (3): waiting 1 seconds...
2021-12-23 12:33:59 MYMACHINE __mp_main__[19776] INFO Executing process: A...
2021-12-23 12:33:59 MYMACHINE __mp_main__[24632] INFO Executing process: B...
2021-12-23 12:33:59 MYMACHINE __mp_main__[15852] INFO Executing process: C...
2021-12-23 12:34:00 MYMACHINE __main__[7628] DEBUG Pool full (3): waiting 1 seconds...
2021-12-23 12:34:01 MYMACHINE __main__[7628] DEBUG Pool full (3): waiting 1 seconds...
2021-12-23 12:34:02 MYMACHINE __main__[7628] DEBUG Pool full (3): waiting 1 seconds...
2021-12-23 12:34:03 MYMACHINE __main__[7628] DEBUG Pool full (3): waiting 1 seconds...
2021-12-23 12:34:04 MYMACHINE __main__[7628] DEBUG Pool full (3): waiting 1 seconds...
2021-12-23 12:34:05 MYMACHINE __mp_main__[19776] INFO Executing process: D...
2021-12-23 12:34:05 MYMACHINE __mp_main__[24632] INFO Executing process: E...
2021-12-23 12:34:05 MYMACHINE __mp_main__[15852] INFO Executing process: F...
2021-12-23 12:34:10 MYMACHINE __main__[7628] INFO Execution time: 0:00:11.087672
user@host:~$ python run.py 6
2021-12-23 12:40:48 MYMACHINE __mp_main__[26312] INFO Executing process: A...
2021-12-23 12:40:48 MYMACHINE __mp_main__[11468] INFO Executing process: B...
2021-12-23 12:40:48 MYMACHINE __mp_main__[12000] INFO Executing process: C...
2021-12-23 12:40:48 MYMACHINE __mp_main__[19864] INFO Executing process: D...
2021-12-23 12:40:48 MYMACHINE __mp_main__[25356] INFO Executing process: E...
2021-12-23 12:40:48 MYMACHINE __mp_main__[14504] INFO Executing process: F...
2021-12-23 12:40:53 MYMACHINE __main__[1180] INFO Execution time: 0:00:05.295934

解决方案 7:

在某些情况下,可以使用Numba自动并行化循环,尽管它只适用于 Python 的一小部分:

from numba import njit, prange

@njit(parallel=True)
def prange_test(A):
    s = 0
    # Without "parallel=True" in the jit-decorator
    # the prange statement is equivalent to range
    for i in prange(A.shape[0]):
        s += A[i]
    return s

不幸的是,Numba 似乎只适用于 Numpy 数组,而不适用于其他 Python 对象。理论上,也可以将Python 编译为 C++,然后使用 Intel C++ 编译器自动并行化,尽管我还没有尝试过。

解决方案 8:

如果您没有时间了解其他答案中推荐的库或模块的要求和假设,以下内容可能适合您:

  1. 为脚本提供运行任务各个部分的选项。

  2. 当准备并行运行n各个部件时,使用 启动它们child = subprocess.Popen(args = [sys.argv[0], ...]),在附加选项和/或参数文件中提供部件编号和其他详细信息,然后调用child.wait()每个子部件。

如果您想要监控进度,在工人完成后立即启动更多工人,或者在等待时做其他事情,请使用child.poll()而不是child.wait()并检查 是否child.returncode仍然None

对于大型任务,启动新进程以及写入和读取文件的开销很小。对于许多小型任务,人们可能只想启动一次工作程序,然后通过管道或套接字与它们通信,但这需要做更多的工作,并且必须小心谨慎地进行,以避免出现死锁。在这种情况下,最好学习如何使用其他答案中推荐的模块。

解决方案 9:

下面是在Windows环境下运行的完整示例;异步处理的优点是节省时间:

import multiprocessing
import time
from multiprocessing import Pool, freeze_support
from multiprocessing import Pool


def f1(a):
    c = 0
    for i in range(0, 99999999):
        c = c + 1
    return 1


def f2(b):
    c = 0
    for i in range(0, 99999999):
        c = c + 1
    return 1

if __name__ == '__main__':

    pool = Pool(multiprocessing.cpu_count())
    result1 = pool.apply_async(f1, [0])
    result2 = pool.apply_async(f2, [9])
    freeze_support()
    t0 = time.time()
    answer1 = result1.get(timeout=10)
    answer2 = result2.get(timeout=10)
    print(time.time()-t0)
    t0 = time.time()
    aa = f1(1)
    bb = f2(2)
    print(time.time()-t0)

解决方案 10:

您可以将 Dataframe 转换为Dask Dataframe,它可以为您处理并行计算。

import dask.dataframe as dd
pdf = pd.Pandas({"A" : A, "B" : B})
ddf = dd.from_pandas(pdf, npartitions=3)
solve(ddf)

解决方案 11:

您不能使用线程在 Python 中进行并行编程。您必须使用多处理,或者如果您执行文件或互联网数据包之类的操作,则可以使用asyncawaitasyncio

如果您想要线程,您可以尝试使用cython,但您必须安装带有 python 的 Visual Studio 并且也安装开发人员包。

相关推荐
  政府信创国产化的10大政策解读一、信创国产化的背景与意义信创国产化,即信息技术应用创新国产化,是当前中国信息技术领域的一个重要发展方向。其核心在于通过自主研发和创新,实现信息技术应用的自主可控,减少对外部技术的依赖,并规避潜在的技术制裁和风险。随着全球信息技术竞争的加剧,以及某些国家对中国在科技领域的打压,信创国产化显...
工程项目管理   1565  
  为什么项目管理通常仍然耗时且低效?您是否还在反复更新电子表格、淹没在便利贴中并参加每周更新会议?这确实是耗费时间和精力。借助软件工具的帮助,您可以一目了然地全面了解您的项目。如今,国内外有足够多优秀的项目管理软件可以帮助您掌控每个项目。什么是项目管理软件?项目管理软件是广泛行业用于项目规划、资源分配和调度的软件。它使项...
项目管理软件   1354  
  信创国产芯片作为信息技术创新的核心领域,对于推动国家自主可控生态建设具有至关重要的意义。在全球科技竞争日益激烈的背景下,实现信息技术的自主可控,摆脱对国外技术的依赖,已成为保障国家信息安全和产业可持续发展的关键。国产芯片作为信创产业的基石,其发展水平直接影响着整个信创生态的构建与完善。通过不断提升国产芯片的技术实力、产...
国产信创系统   21  
  信创生态建设旨在实现信息技术领域的自主创新和安全可控,涵盖了从硬件到软件的全产业链。随着数字化转型的加速,信创生态建设的重要性日益凸显,它不仅关乎国家的信息安全,更是推动产业升级和经济高质量发展的关键力量。然而,在推进信创生态建设的过程中,面临着诸多复杂且严峻的挑战,需要深入剖析并寻找切实可行的解决方案。技术创新难题技...
信创操作系统   27  
  信创产业作为国家信息技术创新发展的重要领域,对于保障国家信息安全、推动产业升级具有关键意义。而国产芯片作为信创产业的核心基石,其研发进展备受关注。在信创国产芯片的研发征程中,面临着诸多复杂且艰巨的难点,这些难点犹如一道道关卡,阻碍着国产芯片的快速发展。然而,科研人员和相关企业并未退缩,积极探索并提出了一系列切实可行的解...
国产化替代产品目录   28  
热门文章
项目管理软件有哪些?
云禅道AD
禅道项目管理软件

云端的项目管理软件

尊享禅道项目软件收费版功能

无需维护,随时随地协同办公

内置subversion和git源码管理

每天备份,随时转为私有部署

免费试用