How to do parallel programming in Python?
- 2025-01-22 08:45:00
- Original post by admin
Problem description:
For C++, we can use OpenMP to do parallel programming; however, OpenMP does not work for Python. What should I do if I want to parallelize some parts of my Python program?
The structure of the code may be considered as:
solve1(A)
solve2(B)
where solve1 and solve2 are two independent functions. How can I run this kind of code in parallel instead of sequentially, in order to reduce the running time? The code is:
def solve(Q, G, n):
    i = 0
    tol = 10 ** -4

    while i < 1000:
        inneropt, partition, x = setinner(Q, G, n)
        outeropt = setouter(Q, G, n)

        if (outeropt - inneropt) / (1 + abs(outeropt) + abs(inneropt)) < tol:
            break

        node1 = partition[0]
        node2 = partition[1]

        G = updateGraph(G, node1, node2)

        if i == 999:
            print "Maximum iteration reaches"
    print inneropt
where setinner and setouter are two independent functions. That is the part I want to parallelize...
Solution 1:
You can use the multiprocessing module. For this case, I might use a process pool:
from multiprocessing import Pool
pool = Pool()
result1 = pool.apply_async(solve1, [A]) # evaluate "solve1(A)" asynchronously
result2 = pool.apply_async(solve2, [B]) # evaluate "solve2(B)" asynchronously
answer1 = result1.get(timeout=10)
answer2 = result2.get(timeout=10)
This will spawn processes that can do generic work for you. Since we did not pass processes, it will spawn one process per CPU core on your machine, and each CPU core can execute one process simultaneously.
If you want to map a list onto a single function, you would do this:
args = [A, B]
results = pool.map(solve1, args)
Don't use threads, because the GIL locks any operations on Python objects.
Solution 2:
This can be done very elegantly with Ray.
To parallelize your example, you would need to define your functions with the @ray.remote decorator and then invoke them with .remote.
import ray
ray.init()

# Define the functions.
@ray.remote
def solve1(a):
    return 1

@ray.remote
def solve2(b):
    return 2

# Start two tasks in the background.
x_id = solve1.remote(0)
y_id = solve2.remote(1)

# Block until the tasks are done and get the results.
x, y = ray.get([x_id, y_id])
It has a number of advantages over the multiprocessing module:
- The same code runs on a multicore machine as well as on a cluster of machines.
- Processes share data efficiently through shared memory and zero-copy serialization.
- Error messages are propagated nicely.
These function calls can be composed together, e.g.,
@ray.remote
def f(x):
    return x + 1

x_id = f.remote(1)
y_id = f.remote(x_id)
z_id = f.remote(y_id)
ray.get(z_id)  # returns 4
In addition to invoking functions remotely, classes can be instantiated remotely as actors.
Note that Ray is a framework that I have been helping to develop.
Solution 3:
The solution, as others have said, is to use multiple processes. Which framework is more appropriate, however, depends on many factors. In addition to the ones already mentioned, there are also charm4py and mpi4py (I am the developer of charm4py).
There is a more efficient way to implement the above example than using the worker pool abstraction. The main loop sends the same parameters (including the complete graph G) over and over to the workers in each of the 1000 iterations. Since at least one worker will reside on a different process, this involves copying the arguments and sending them to the other process(es). Depending on the size of the objects, this can be very costly. Instead, it makes sense to have the workers store state and simply send the updated information.
For example, in charm4py this can be done like this:
class Worker(Chare):

    def __init__(self, Q, G, n):
        self.G = G
        ...

    def setinner(self, node1, node2):
        self.updateGraph(node1, node2)
        ...


def solve(Q, G, n):
    # create 2 workers, each on a different process, passing the initial state
    worker_a = Chare(Worker, onPE=0, args=[Q, G, n])
    worker_b = Chare(Worker, onPE=1, args=[Q, G, n])
    while i < 1000:
        result_a = worker_a.setinner(node1, node2, ret=True)  # execute setinner on worker A
        result_b = worker_b.setouter(node1, node2, ret=True)  # execute setouter on worker B
        inneropt, partition, x = result_a.get()  # wait for result from worker A
        outeropt = result_b.get()  # wait for result from worker B
        ...
Note that for this example we really only need one worker: the main loop could execute one of the functions itself and have the worker execute the other. But my code helps to illustrate a couple of points:
- Worker A runs in process 0, the same process as the main loop. While result_a.get() is blocked waiting on the result, worker A does the computation in that same process.
- Arguments are automatically passed to worker A by reference, because it lives in the same process (no copying is involved).
Solution 4:
CPython uses the Global Interpreter Lock, which makes parallel programming a bit more interesting than in C++.
This topic has several useful examples and descriptions of the challenge:
Python Global Interpreter Lock (GIL) workaround on multi-core systems using taskset on Linux?
Solution 5:
You can use the joblib library for parallel computation and multiprocessing.
from joblib import Parallel, delayed
You can simply create a function foo that you want to run in parallel and implement the parallel processing based on the following code:
output = Parallel(n_jobs=num_cores)(delayed(foo)(i) for i in input)
num_cores can be obtained from the multiprocessing library:
import multiprocessing
num_cores = multiprocessing.cpu_count()
If you have a function with multiple input arguments, and you only want to iterate over one of them with a list, you can use the partial function from the functools library, as follows:
from joblib import Parallel, delayed
import multiprocessing
from functools import partial

def foo(arg1, arg2, arg3, arg4):
    '''
    body of the function
    '''
    return output

input = [11,32,44,55,23,0,100,...] # arbitrary list
num_cores = multiprocessing.cpu_count()
foo_ = partial(foo, arg2=arg2, arg3=arg3, arg4=arg4)

# arg1 is being fetched from input list
output = Parallel(n_jobs=num_cores)(delayed(foo_)(i) for i in input)
You can find a complete explanation of multiprocessing in Python and R, along with several examples, here.
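The functools.partial trick is independent of joblib; here is a quick stdlib-only illustration (with a hypothetical three-argument foo):

```python
from functools import partial

def foo(arg1, arg2, arg3):
    # Hypothetical function: combine the three arguments.
    return arg1 * arg2 + arg3

foo_ = partial(foo, arg2=10, arg3=1)    # freeze arg2 and arg3
results = [foo_(i) for i in [1, 2, 3]]  # iterate over arg1 only
print(results)  # [11, 21, 31]
```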
Solution 6:
I always use the 'multiprocessing' native library to handle parallelism in Python. To control the number of processes in the queue, I use a shared variable as a counter. In the following example, you can see how the parallel execution of simple processes works.
I updated the script to make it easier to use. Basically, the only thing you have to do is override the process method with the function you want to run in parallel. See the example; the procedure is very simple. Alternatively, you can also remove all the execution-log events.
When I get some time, I will update the code to work with processes that return values.
Requirements
user@host:~$ pip install coloredlogs==15.0.1
Code
Parallel processing script (copy and paste):
#!/usr/bin/env python
# encoding: utf-8

from multiprocessing import Manager, Pool, Value, cpu_count
from multiprocessing.pool import ThreadPool
from typing import Any, Iterator
from datetime import datetime
from logging import Logger
import coloredlogs
import logging
import time
import sys
import os


LOG_LEVEL = "DEBUG"


def get_logger(name: str = __name__, level: str = LOG_LEVEL) -> Logger:
    assert level in ("NOTSET", "DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL")

    # Setting-up the script logging:
    logging.basicConfig(
        stream=sys.stdout,
        format="%(asctime)s %(message)s",
        datefmt="%Y-%m-%d %H:%M:%S",
        level=level
    )

    logger = logging.getLogger(name)
    coloredlogs.install(level=level, logger=logger, isatty=True)

    return logger


class ParallelProcessing:
    """
    Parallel processing.

    References
    ----------
    [1] Class `ParallelProcessing`: https://stackoverflow.com/a/70464369/16109419

    Examples
    --------
    >>> class MyParallelProcessing(ParallelProcessing):
    >>>     def process(self, name: str) -> None:
    >>>         logger = get_logger()
    >>>         logger.info(f"Executing process: {name}...")
    >>>         time.sleep(5)
    >>>
    >>>
    >>> params_list = [("A",), ("B",), ("C",), ("D",), ("E",), ("F",)]
    >>> mpp = MyParallelProcessing()
    >>> mpp.run(args_list=params_list)
    """
    _n_jobs: int
    _waiting_time: int
    _queue: Value
    _logger: Logger

    def __init__(self, n_jobs: int = -1, waiting_time: int = 1):
        """
        Instantiates a parallel processing object to execute processes in parallel.

        Parameters
        ----------
        n_jobs: int
            Number of jobs.
        waiting_time: int
            Waiting time when jobs queue is full, e.g. `_queue.value` == `_n_jobs`.
        """
        self._n_jobs = n_jobs if n_jobs >= 0 else cpu_count()
        self._waiting_time = waiting_time if waiting_time >= 0 else 60*60

        self._logger = get_logger()

    def process(self, *args) -> None:
        """
        Abstract process that must be overridden.

        Parameters
        ----------
        *args
            Parameters of the process to be executed.
        """
        raise NotImplementedError("Process not defined ('NotImplementedError' exception).")

    def _execute(self, *args) -> None:
        """
        Run the process and remove it from the process queue by decreasing the queue process counter.

        Parameters
        ----------
        *args
            Parameters of the process to be executed.
        """
        self.process(*args)
        self._queue.value -= 1

    def _error_callback(self, result: Any) -> None:
        """
        Error callback.

        Parameters
        ----------
        result: Any
            Result from exceptions.
        """
        self._logger.error(result)
        os._exit(1)

    def run(self, args_list: Iterator[tuple], use_multithreading: bool = False) -> None:
        """
        Run processes in parallel.

        Parameters
        ----------
        args_list: Iterator[tuple]
            List of process parameters (`*args`).
        use_multithreading: bool
            Use multithreading instead of multiprocessing.
        """
        manager = Manager()
        self._queue = manager.Value('i', 0)
        lock = manager.Lock()
        pool = Pool(processes=self._n_jobs) if not use_multithreading else ThreadPool(processes=self._n_jobs)

        start_time = datetime.now()

        with lock:  # Write-protecting the processes queue shared variable.
            for args in args_list:
                while True:
                    if self._queue.value < self._n_jobs:
                        self._queue.value += 1

                        # Running processes in parallel:
                        pool.apply_async(func=self._execute, args=args, error_callback=self._error_callback)

                        break
                    else:
                        self._logger.debug(f"Pool full ({self._n_jobs}): waiting {self._waiting_time} seconds...")
                        time.sleep(self._waiting_time)

        pool.close()
        pool.join()

        exec_time = datetime.now() - start_time
        self._logger.info(f"Execution time: {exec_time}")
Usage example:
class MyParallelProcessing(ParallelProcessing):
    def process(self, name: str) -> None:
        """
        Process to run in parallel (overrides abstract method).
        """
        logger = get_logger()
        logger.info(f"Executing process: {name}...")
        time.sleep(5)


def main() -> None:
    n_jobs = int(sys.argv[1])  # Number of jobs to run in parallel.
    params_list = [("A",), ("B",), ("C",), ("D",), ("E",), ("F",)]
    mpp = MyParallelProcessing(n_jobs=n_jobs)

    # Executing processes in parallel:
    mpp.run(args_list=params_list)


if __name__ == '__main__':
    main()
Execution and output
user@host:~$ python run.py 1
2021-12-23 12:41:51 MYMACHINE __main__[24180] DEBUG Pool full (1): waiting 1 seconds...
2021-12-23 12:41:51 MYMACHINE __mp_main__[12352] INFO Executing process: A...
2021-12-23 12:41:52 MYMACHINE __main__[24180] DEBUG Pool full (1): waiting 1 seconds...
2021-12-23 12:41:53 MYMACHINE __main__[24180] DEBUG Pool full (1): waiting 1 seconds...
2021-12-23 12:41:54 MYMACHINE __main__[24180] DEBUG Pool full (1): waiting 1 seconds...
2021-12-23 12:41:55 MYMACHINE __main__[24180] DEBUG Pool full (1): waiting 1 seconds...
2021-12-23 12:41:56 MYMACHINE __main__[24180] DEBUG Pool full (1): waiting 1 seconds...
2021-12-23 12:41:57 MYMACHINE __main__[24180] DEBUG Pool full (1): waiting 1 seconds...
2021-12-23 12:41:57 MYMACHINE __mp_main__[12352] INFO Executing process: B...
2021-12-23 12:41:58 MYMACHINE __main__[24180] DEBUG Pool full (1): waiting 1 seconds...
2021-12-23 12:41:59 MYMACHINE __main__[24180] DEBUG Pool full (1): waiting 1 seconds...
2021-12-23 12:42:00 MYMACHINE __main__[24180] DEBUG Pool full (1): waiting 1 seconds...
...
2021-12-23 12:42:10 MYMACHINE __main__[24180] DEBUG Pool full (1): waiting 1 seconds...
2021-12-23 12:42:11 MYMACHINE __main__[24180] DEBUG Pool full (1): waiting 1 seconds...
2021-12-23 12:42:12 MYMACHINE __main__[24180] DEBUG Pool full (1): waiting 1 seconds...
2021-12-23 12:42:12 MYMACHINE __mp_main__[12352] INFO Executing process: E...
2021-12-23 12:42:13 MYMACHINE __main__[24180] DEBUG Pool full (1): waiting 1 seconds...
2021-12-23 12:42:14 MYMACHINE __main__[24180] DEBUG Pool full (1): waiting 1 seconds...
2021-12-23 12:42:15 MYMACHINE __main__[24180] DEBUG Pool full (1): waiting 1 seconds...
2021-12-23 12:42:16 MYMACHINE __main__[24180] DEBUG Pool full (1): waiting 1 seconds...
2021-12-23 12:42:18 MYMACHINE __mp_main__[12352] INFO Executing process: F...
2021-12-23 12:42:23 MYMACHINE __main__[24180] INFO Execution time: 0:00:31.274478
user@host:~$ python run.py 3
2021-12-23 12:33:59 MYMACHINE __main__[7628] DEBUG Pool full (3): waiting 1 seconds...
2021-12-23 12:33:59 MYMACHINE __mp_main__[19776] INFO Executing process: A...
2021-12-23 12:33:59 MYMACHINE __mp_main__[24632] INFO Executing process: B...
2021-12-23 12:33:59 MYMACHINE __mp_main__[15852] INFO Executing process: C...
2021-12-23 12:34:00 MYMACHINE __main__[7628] DEBUG Pool full (3): waiting 1 seconds...
2021-12-23 12:34:01 MYMACHINE __main__[7628] DEBUG Pool full (3): waiting 1 seconds...
2021-12-23 12:34:02 MYMACHINE __main__[7628] DEBUG Pool full (3): waiting 1 seconds...
2021-12-23 12:34:03 MYMACHINE __main__[7628] DEBUG Pool full (3): waiting 1 seconds...
2021-12-23 12:34:04 MYMACHINE __main__[7628] DEBUG Pool full (3): waiting 1 seconds...
2021-12-23 12:34:05 MYMACHINE __mp_main__[19776] INFO Executing process: D...
2021-12-23 12:34:05 MYMACHINE __mp_main__[24632] INFO Executing process: E...
2021-12-23 12:34:05 MYMACHINE __mp_main__[15852] INFO Executing process: F...
2021-12-23 12:34:10 MYMACHINE __main__[7628] INFO Execution time: 0:00:11.087672
user@host:~$ python run.py 6
2021-12-23 12:40:48 MYMACHINE __mp_main__[26312] INFO Executing process: A...
2021-12-23 12:40:48 MYMACHINE __mp_main__[11468] INFO Executing process: B...
2021-12-23 12:40:48 MYMACHINE __mp_main__[12000] INFO Executing process: C...
2021-12-23 12:40:48 MYMACHINE __mp_main__[19864] INFO Executing process: D...
2021-12-23 12:40:48 MYMACHINE __mp_main__[25356] INFO Executing process: E...
2021-12-23 12:40:48 MYMACHINE __mp_main__[14504] INFO Executing process: F...
2021-12-23 12:40:53 MYMACHINE __main__[1180] INFO Execution time: 0:00:05.295934
Solution 7:
In some cases, it's possible to automatically parallelize loops using Numba, though it only works with a small subset of Python:
from numba import njit, prange

@njit(parallel=True)
def prange_test(A):
    s = 0
    # Without "parallel=True" in the jit-decorator
    # the prange statement is equivalent to range
    for i in prange(A.shape[0]):
        s += A[i]
    return s
Unfortunately, it seems that Numba only works with NumPy arrays, not with other Python objects. In theory, it might also be possible to compile Python to C++ and then automatically parallelize it using the Intel C++ compiler, though I haven't tried this yet.
Solution 8:
If you don't have time to learn the requirements and assumptions of the libraries or modules recommended in the other answers, the following may suit you:
- Give your script the option to run individual parts of the task.
- When the n individual parts are ready to run in parallel, start them with child = subprocess.Popen(args = [sys.argv[0], ...]), providing the part number and other details in additional options and/or parameter files, and call child.wait() for each child.
- If you want to monitor progress, start more workers as soon as one finishes, or do something else while waiting, use child.poll() instead of child.wait() and check whether child.returncode is still None.
For big tasks, the overhead of starting new processes and writing and reading files is minimal. For many small tasks, one would want to start the workers only once and communicate with them via pipes or sockets, but that is a lot more work and has to be done carefully to avoid the possibility of deadlocks. In that situation, it is probably better to learn how to use the modules recommended in the other answers.
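A minimal sketch of this approach. For brevity each "part" here is simulated with a tiny python -c child; the answer suggests re-launching sys.argv[0] with a part number instead, as shown in the comment:

```python
import subprocess
import sys
import time

def launch_part(i):
    # In the real scheme this would re-launch the script itself, e.g.:
    #   subprocess.Popen(args=[sys.executable, sys.argv[0], "--part", str(i)])
    # ("--part" is a hypothetical option name.)
    return subprocess.Popen([sys.executable, "-c", f"print('part {i} done')"])

children = [launch_part(i) for i in range(3)]
completed = 0

# Monitor with poll() so the parent could do other work in the meantime.
while children:
    for child in list(children):
        if child.poll() is not None:       # child has finished
            assert child.returncode == 0   # surface failures loudly
            children.remove(child)
            completed += 1
    time.sleep(0.05)
```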
Solution 9:
Here is a complete example that works in a Windows environment; the advantage of asynchronous processing is the time savings:
import multiprocessing
import time
from multiprocessing import Pool, freeze_support

def f1(a):
    c = 0
    for i in range(0, 99999999):
        c = c + 1
    return 1

def f2(b):
    c = 0
    for i in range(0, 99999999):
        c = c + 1
    return 1

if __name__ == '__main__':
    freeze_support()  # needed on Windows; must run before using the pool
    pool = Pool(multiprocessing.cpu_count())

    result1 = pool.apply_async(f1, [0])
    result2 = pool.apply_async(f2, [9])

    t0 = time.time()
    answer1 = result1.get(timeout=10)
    answer2 = result2.get(timeout=10)
    print(time.time() - t0)

    t0 = time.time()
    aa = f1(1)
    bb = f2(2)
    print(time.time() - t0)
Solution 10:
You can convert your DataFrame to a Dask DataFrame, which handles the parallel computation for you.
import pandas as pd
import dask.dataframe as dd

pdf = pd.DataFrame({"A": A, "B": B})
ddf = dd.from_pandas(pdf, npartitions=3)
solve(ddf)
Solution 11:
You can't use threading for parallel programming in Python. You have to use multiprocessing, or, if you do things like file or network I/O, you can use async, await, and asyncio.
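A minimal sketch of the asyncio route for I/O-bound work (my own illustration; asyncio.sleep stands in for real file or network I/O):

```python
import asyncio

async def fetch(name, delay):
    await asyncio.sleep(delay)   # stands in for real I/O (file, socket, ...)
    return f"{name} done"

async def main():
    # gather() runs both coroutines concurrently in a single thread:
    # the total time is about max(delays), not their sum.
    return await asyncio.gather(fetch("A", 0.1), fetch("B", 0.1))

results = asyncio.run(main())
print(results)  # ['A done', 'B done']
```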
If you want threads, you can try cython, but you have to install Visual Studio along with Python, as well as the developer packages.