如何并行化一个简单的 Python 循环?

2024-12-11 08:47:00
admin
原创
153
摘要:问题描述:这可能是一个简单的问题,但是如何在 python 中并行化以下循环?# setup output lists output1 = list() output2 = list() output3 = list() for j in range(0, 10): # calc individua...

问题描述:

这可能是一个简单的问题,但是如何在 python 中并行化以下循环?

# setup output lists
output1 = list()
output2 = list()
output3 = list()

for j in range(0, 10):
    # calc individual parameter value
    parameter = j * offset
    # call the calculation
    out1, out2, out3 = calc_stuff(parameter = parameter)

    # put results into correct output list
    output1.append(out1)
    output2.append(out2)
    output3.append(out3)

我知道如何在 Python 中启动单线程,但我不知道如何“收集”结果。

多个进程也可以 - 无论哪种方式对这种情况来说最简单。我目前使用的是 Linux,但代码也应该可以在 Windows 和 Mac 上运行。

并行化此代码的最简单方法是什么?


解决方案 1:

CPython 实现目前有一个全局解释器锁(GIL),可防止同一解释器的线程同时执行 Python 代码。这意味着 CPython 线程对于并发 I/O 密集型工作负载很有用,但通常不适用于 CPU 密集型工作负载。命名calc_stuff()表明您的工作负载是 CPU 密集型的,因此您需要在此处使用多个进程(无论 GIL 如何,这通常是 CPU 密集型工作负载的更好解决方案)。

有两种简单的方法可以将进程池创建到 Python 标准库中。第一种方法是使用multiprocessing模块,可以像这样使用:

pool = multiprocessing.Pool(4)
out1, out2, out3 = zip(*pool.map(calc_stuff, range(0, 10 * offset, offset)))

multiprocessing请注意,由于实现方式的原因,这在交互式解释器中不起作用。

创建进程池的第二种方法是concurrent.futures.ProcessPoolExecutor

with concurrent.futures.ProcessPoolExecutor() as pool:
    out1, out2, out3 = zip(*pool.map(calc_stuff, range(0, 10 * offset, offset)))

这使用了multiprocessing底层模块,因此其行为与第一个版本相同。

解决方案 2:

from joblib import Parallel, delayed
def process(i):
    return i * i
    
results = Parallel(n_jobs=2)(delayed(process)(i) for i in range(10))
print(results)  # prints [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]

上述程序在我的计算机上运行良好(Ubuntu,joblib 包已预先安装,但可以通过 安装pip install joblib)。

摘自https://blog.dominodatalab.com/simple-parallelization/


编辑于 2021 年 3 月 31 日:于joblibmultiprocessingthreading`asyncio`

  • joblib上面的代码中使用了import multiprocessing底层技术(因此使用了多个进程,这通常是跨核心运行 CPU 工作的最佳方式 - 因为有 GIL)

  • 你可以joblib使用多个线程而不是多个进程,但这(或import threading直接使用)只有在线程花费大量时间进行 I/O(例如读取/写入磁盘、发送 HTTP 请求)时才有益。对于 I/O 工作,GIL 不会阻止另一个线程的执行

  • 从 Python 3.7 开始,作为 的替代方案,您可以使用asynciothreading并行工作,但同样的建议也适用于(尽管与后者相比,只会使用 1 个线程;从好的方面来说,它有很多不错的特性,有助于异步编程)import threading`asyncio`

  • 使用多个进程会产生开销。想想看:通常,每个进程都需要初始化/加载运行计算所需的一切。您需要自己检查上述代码片段是否能缩短您的计算时间。以下是另一个,我确认它可以joblib产生更好的结果:

import time
from joblib import Parallel, delayed

def countdown(n):
    while n>0:
        n -= 1
    return n


t = time.time()
for _ in range(20):
    print(countdown(10**7), end=" ")
print(time.time() - t)  
# takes ~10.5 seconds on medium sized Macbook Pro


t = time.time()
results = Parallel(n_jobs=2)(delayed(countdown)(10**7) for _ in range(20))
print(results)
print(time.time() - t)
# takes ~6.3 seconds on medium sized Macbook Pro

解决方案 3:

这是最简单的方法!

您可以使用asyncio 。(文档可在此处找到)。它是多个 Python 异步框架的基础,这些框架提供高性能网络和 Web 服务器、数据库连接库、分布式任务队列等。此外,它还具有高级和低级 API 来应对任何类型的问题。

import asyncio

def background(f):
    def wrapped(*args, **kwargs):
        return asyncio.get_event_loop().run_in_executor(None, f, *args, **kwargs)

    return wrapped

@background
def your_function(argument):
    #code

现在,此函数将在每次调用时并行运行,而无需将主程序置于等待状态。您也可以使用它来并行化 for 循环。当为 for 循环调用时,虽然循环是连续的,但只要解释器​​到达主程序,每次迭代都会与主程序并行运行。

  1. 与主线程并行触发循环,无需等待


在此处输入图片描述

@background
def your_function(argument):
    time.sleep(5)
    print('function finished for '+str(argument))


for i in range(10):
    your_function(i)


print('loop finished')

这将产生以下输出:

loop finished
function finished for 4
function finished for 8
function finished for 0
function finished for 3
function finished for 6
function finished for 2
function finished for 5
function finished for 7
function finished for 9
function finished for 1

更新:2022 年 5 月

虽然这回答了原始问题,但我们可以通过其他方式等待循环完成,正如赞同评论所要求的那样。因此也在这里添加它们。实现的关键是:asyncio.gather()& run_until_complete()。考虑以下函数:

import asyncio
import time

def background(f):
    def wrapped(*args, **kwargs):
        return asyncio.get_event_loop().run_in_executor(None, f, *args, **kwargs)

    return wrapped

@background
def your_function(argument, other_argument): # Added another argument
    time.sleep(5)
    print(f"function finished for {argument=} and {other_argument=}")

def code_to_run_before():
    print('This runs Before Loop!')

def code_to_run_after():
    print('This runs After Loop!')
  1. 并行运行但等待完成


在此处输入图片描述

code_to_run_before()                                                         # Anything you want to run before, run here!

loop = asyncio.get_event_loop()                                              # Have a new event loop

looper = asyncio.gather(*[your_function(i, 1) for i in range(1, 5)])         # Run the loop
                               
results = loop.run_until_complete(looper)                                    # Wait until finish

code_to_run_after()                                                          # Anything you want to run after, run here!

这将产生以下输出:

This runs Before Loop!
function finished for argument=2 and other_argument=1
function finished for argument=3 and other_argument=1
function finished for argument=1 and other_argument=1
function finished for argument=4 and other_argument=1
This runs After Loop!
  1. 并行运行多个循环并等待完成


在此处输入图片描述

code_to_run_before()                                                         # Anything you want to run before, run here!   

loop = asyncio.get_event_loop()                                              # Have a new event loop

group1 = asyncio.gather(*[your_function(i, 1) for i in range(1, 2)])         # Run all the loops you want
group2 = asyncio.gather(*[your_function(i, 2) for i in range(3, 5)])         # Run all the loops you want
group3 = asyncio.gather(*[your_function(i, 3) for i in range(6, 9)])         # Run all the loops you want

all_groups = asyncio.gather(group1, group2, group3)                          # Gather them all                                    
results = loop.run_until_complete(all_groups)                                # Wait until finish

code_to_run_after()                                                          # Anything you want to run after, run here!

这将产生以下输出:

This runs Before Loop!
function finished for argument=3 and other_argument=2
function finished for argument=1 and other_argument=1
function finished for argument=6 and other_argument=3
function finished for argument=4 and other_argument=2
function finished for argument=7 and other_argument=3
function finished for argument=8 and other_argument=3
This runs After Loop!
  1. 循环按顺序运行,但每个循环的迭代彼此并行运行


在此处输入图片描述

code_to_run_before()                                                               # Anything you want to run before, run here!

for loop_number in range(3):

    loop = asyncio.get_event_loop()                                                # Have a new event loop

    looper = asyncio.gather(*[your_function(i, loop_number) for i in range(1, 5)]) # Run the loop
                             
    results = loop.run_until_complete(looper)                                      # Wait until finish

    print(f"finished for {loop_number=}")       

code_to_run_after()                                                                # Anything you want to run after, run here!

这将产生以下输出:

This runs Before Loop!
function finished for argument=3 and other_argument=0
function finished for argument=4 and other_argument=0
function finished for argument=1 and other_argument=0
function finished for argument=2 and other_argument=0
finished for loop_number=0
function finished for argument=4 and other_argument=1
function finished for argument=3 and other_argument=1
function finished for argument=2 and other_argument=1
function finished for argument=1 and other_argument=1
finished for loop_number=1
function finished for argument=1 and other_argument=2
function finished for argument=4 and other_argument=2
function finished for argument=3 and other_argument=2
function finished for argument=2 and other_argument=2
finished for loop_number=2
This runs After Loop!

更新:2022 年 6 月

目前的形式可能无法在某些版本的 jupyter notebook 上运行。原因是 jupyter notebook 使用事件循环。要使其在这些 jupyter 版本上运行,nest_asyncio(从名称就可以看出,它将嵌套事件循环)是可行的方法。只需将其导入并应用到单元格顶部即可:

import nest_asyncio
nest_asyncio.apply()

并且上面讨论的所有功能也应该可以在笔记本环境中访问。

解决方案 4:

为了并行化一个简单的 for 循环,joblib为多处理的原始使用带来了很多价值。不仅是简短的语法,还有诸如当迭代速度非常快时透明地聚集迭代(以消除开销)或捕获子进程的回溯,以便更好地报告错误等功能。

免责声明:我是 joblib 的原作者。

解决方案 5:

并行化此代码的最简单方法是什么?

使用 中的 PoolExecutor concurrent.futures。将原始代码与此代码并排比较。首先,最简洁的方法是使用executor.map

...
with ProcessPoolExecutor() as executor:
    for out1, out2, out3 in executor.map(calc_stuff, parameters):
        ...

或者通过单独提交每个调用来细分:

...
with ThreadPoolExecutor() as executor:
    futures = []
    for parameter in parameters:
        futures.append(executor.submit(calc_stuff, parameter))

    for future in futures:
        out1, out2, out3 = future.result() # this will block
        ...

离开上下文会向执行者发出释放资源的信号

您可以使用线程或进程并使用完全相同的接口。

一个工作示例

以下是有效示例代码,它将展示以下内容的价值:

将其放入文件-futuretest.py中:

from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor
from time import time
from http.client import HTTPSConnection

def processor_intensive(arg):
    def fib(n): # recursive, processor intensive calculation (avoid n > 36)
        return fib(n-1) + fib(n-2) if n > 1 else n
    start = time()
    result = fib(arg)
    return time() - start, result

def io_bound(arg):
    start = time()
    con = HTTPSConnection(arg)
    con.request('GET', '/')
    result = con.getresponse().getcode()
    return time() - start, result

def manager(PoolExecutor, calc_stuff):
    if calc_stuff is io_bound:
        inputs = ('python.org', 'stackoverflow.com', 'stackexchange.com',
                  'noaa.gov', 'parler.com', 'aaronhall.dev')
    else:
        inputs = range(25, 32)
    timings, results = list(), list()
    start = time()
    with PoolExecutor() as executor:
        for timing, result in executor.map(calc_stuff, inputs):
            # put results into correct output list:
            timings.append(timing), results.append(result)
    finish = time()
    print(f'{calc_stuff.__name__}, {PoolExecutor.__name__}')
    print(f'wall time to execute: {finish-start}')
    print(f'total of timings for each call: {sum(timings)}')
    print(f'time saved by parallelizing: {sum(timings) - (finish-start)}')
    print(dict(zip(inputs, results)), end = '

')

def main():
    for computation in (processor_intensive, io_bound):
        for pool_executor in (ProcessPoolExecutor, ThreadPoolExecutor):
            manager(pool_executor, calc_stuff=computation)

if __name__ == '__main__':
    main()

以下是一次运行的输出python -m futuretest

processor_intensive, ProcessPoolExecutor
wall time to execute: 0.7326343059539795
total of timings for each call: 1.8033506870269775
time saved by parallelizing: 1.070716381072998
{25: 75025, 26: 121393, 27: 196418, 28: 317811, 29: 514229, 30: 832040, 31: 1346269}

processor_intensive, ThreadPoolExecutor
wall time to execute: 1.190223217010498
total of timings for each call: 3.3561410903930664
time saved by parallelizing: 2.1659178733825684
{25: 75025, 26: 121393, 27: 196418, 28: 317811, 29: 514229, 30: 832040, 31: 1346269}

io_bound, ProcessPoolExecutor
wall time to execute: 0.533886194229126
total of timings for each call: 1.2977914810180664
time saved by parallelizing: 0.7639052867889404
{'python.org': 301, 'stackoverflow.com': 200, 'stackexchange.com': 200, 'noaa.gov': 301, 'parler.com': 200, 'aaronhall.dev': 200}

io_bound, ThreadPoolExecutor
wall time to execute: 0.38941240310668945
total of timings for each call: 1.6049387454986572
time saved by parallelizing: 1.2155263423919678
{'python.org': 301, 'stackoverflow.com': 200, 'stackexchange.com': 200, 'noaa.gov': 301, 'parler.com': 200, 'aaronhall.dev': 200}

处理器密集型分析

在 Python 中执行处理器密集型计算时,期望的ProcessPoolExecutor性能优于ThreadPoolExecutor

由于全局解释器锁(又名 GIL),线程无法使用多个处理器,因此预计每次计算的时间和挂钟时间(实际经过时间)会更长。

IO 绑定分析

另一方面,当执行 IO 绑定操作时,期望ThreadPoolExecutor比 具有更高的性能ProcessPoolExecutor

Python 的线程是真实的 OS 线程。它们可以被操作系统置于休眠状态,并在信息到达时重新唤醒。

最后的想法

我怀疑 Windows 上的多处理速度会更慢,因为 Windows 不支持分叉,所以每个新进程都需要时间来启动。

你可以在多个进程内嵌套多个线程,但建议不要使用多个线程来分拆多个进程。

如果在 Python 中面临繁重的处理问题,您可以通过增加进程来轻松扩展 - 但使用线程则不行。

解决方案 6:

使用Ray有许多优点:

  • 除了多个核心之外,您还可以在多台机器上进行并行化(使用相同的代码)。

  • 通过共享内存(和零拷贝序列化)有效处理数值数据。

  • 通过分布式调度实现高任务吞吐量。

  • 容错。

就你的情况而言,你可以启动 Ray 并定义一个远程函数

import ray

ray.init()

@ray.remote(num_return_vals=3)
def calc_stuff(parameter=None):
    # Do something.
    return 1, 2, 3

然后并行调用它

output1, output2, output3 = [], [], []

# Launch the tasks.
for j in range(10):
    id1, id2, id3 = calc_stuff.remote(parameter=j)
    output1.append(id1)
    output2.append(id2)
    output3.append(id3)

# Block until the results have finished and get the results.
output1 = ray.get(output1)
output2 = ray.get(output2)
output3 = ray.get(output3)

要在集群上运行相同的示例,唯一需要改变的行是对 ray.init() 的调用。相关文档可在此处找到。

请注意,我正在帮助开发 Ray。

解决方案 7:

我发现joblib它对我很有用。请参见以下示例:

from joblib import Parallel, delayed
def yourfunction(k):   
    s=3.14*k*k
    print "Area of a circle with a radius ", k, " is:", s

element_run = Parallel(n_jobs=-1)(delayed(yourfunction)(k) for k in range(1,10))

n_jobs=-1:使用所有可用的核心

解决方案 8:

Dask 期货;我很惊讶还没有人提到它。。。

from dask.distributed import Client

client = Client(n_workers=8) # In this example I have 8 cores and processes (can also use threads if desired)

def my_function(i):
    output = <code to execute in the for loop here>
    return output

futures = []

for i in <whatever you want to loop across here>:
    future = client.submit(my_function, i)
    futures.append(future)

results = client.gather(futures)
client.close()

解决方案 9:

谢谢@iuryxavier

from multiprocessing import Pool
from multiprocessing import cpu_count


def add_1(x):
    return x + 1

if __name__ == "__main__":
    pool = Pool(cpu_count())
    results = pool.map(add_1, range(10**12))
    pool.close()  # 'TERM'
    pool.join()   # 'KILL'

解决方案 10:

tqdm 库的并发包装器是并行化长时间运行代码的好方法。tqdm 通过智能进度计提供有关当前进度和剩余时间的反馈,我发现这对于长时间计算非常有用。

可以通过简单调用 重写循环以作为并发线程运行thread_map,或者通过简单调用 重写循环以作为并发多进程运行process_map

from tqdm.contrib.concurrent import thread_map, process_map


def calc_stuff(num, multiplier):
    import time

    time.sleep(1)

    return num, num * multiplier


if __name__ == "__main__":

    # let's parallelize this for loop:
    # results = [calc_stuff(i, 2) for i in range(64)]

    loop_idx = range(64)
    multiplier = [2] * len(loop_idx)

    # either with threading:
    results_threading = thread_map(calc_stuff, loop_idx, multiplier)

    # or with multi-processing:
    results_processes = process_map(calc_stuff, loop_idx, multiplier)

解决方案 11:

为什么不使用线程和一个互斥锁来保护一个全局列表?

import os
import re
import time
import sys
import thread

from threading import Thread

class thread_it(Thread):
    def __init__ (self,param):
        Thread.__init__(self)
        self.param = param
    def run(self):
        mutex.acquire()
        output.append(calc_stuff(self.param))
        mutex.release()   


threads = []
output = []
mutex = thread.allocate_lock()

for j in range(0, 10):
    current = thread_it(j * offset)
    threads.append(current)
    current.start()

for t in threads:
    t.join()

#here you have output list filled with data

请记住,你的速度将与最慢的线程一样快

解决方案 12:

假设我们有一个异步函数

async def work_async(self, student_name: str, code: str, loop):
"""
Some async function
"""
    # Do some async procesing    

这需要在一个大型数组上运行。一些属性被传递给程序,一些属性从数组中的字典元素的属性中使用。

async def process_students(self, student_name: str, loop):
    market = sys.argv[2]
    subjects = [...] #Some large array
    batchsize = 5
    for i in range(0, len(subjects), batchsize):
        batch = subjects[i:i+batchsize]
        await asyncio.gather(*(self.work_async(student_name,
                                           sub['Code'],
                                           loop)
                       for sub in batch))

解决方案 13:

这在 Python 中实现多处理和并行/分布式计算时可能很有用。

有关使用 techila 包的 YouTube 教程

Techila 是一个分布式计算中间件,它使用 techila 包直接与 Python 集成。包中的 peach 函数可用于并行化循环结构。(以下代码片段来自Techila 社区论坛)

techila.peach(funcname = 'theheavyalgorithm', # Function that will be called on the compute nodes/ Workers
    files = 'theheavyalgorithm.py', # Python-file that will be sourced on Workers
    jobs = jobcount # Number of Jobs in the Project
    )

解决方案 14:

看看这个;

http://docs.python.org/library/queue.html

这可能不是正确的方法,但我会做类似的事情;

实际代码;

from multiprocessing import Process, JoinableQueue as Queue 

class CustomWorker(Process):
    def __init__(self,workQueue, out1,out2,out3):
        Process.__init__(self)
        self.input=workQueue
        self.out1=out1
        self.out2=out2
        self.out3=out3
    def run(self):
            while True:
                try:
                    value = self.input.get()
                    #value modifier
                    temp1,temp2,temp3 = self.calc_stuff(value)
                    self.out1.put(temp1)
                    self.out2.put(temp2)
                    self.out3.put(temp3)
                    self.input.task_done()
                except Queue.Empty:
                    return
                   #Catch things better here
    def calc_stuff(self,param):
        out1 = param * 2
        out2 = param * 4
        out3 = param * 8
        return out1,out2,out3
def Main():
    inputQueue = Queue()
    for i in range(10):
        inputQueue.put(i)
    out1 = Queue()
    out2 = Queue()
    out3 = Queue()
    processes = []
    for x in range(2):
          p = CustomWorker(inputQueue,out1,out2,out3)
          p.daemon = True
          p.start()
          processes.append(p)
    inputQueue.join()
    while(not out1.empty()):
        print out1.get()
        print out2.get()
        print out3.get()
if __name__ == '__main__':
    Main()

希望有所帮助。

解决方案 15:

并行处理的简单示例是

from multiprocessing import Process

output1 = list()
output2 = list()
output3 = list()

def yourfunction():
    for j in range(0, 10):
        # calc individual parameter value
        parameter = j * offset
        # call the calculation
        out1, out2, out3 = calc_stuff(parameter=parameter)

        # put results into correct output list
        output1.append(out1)
        output2.append(out2)
        output3.append(out3)

if __name__ == '__main__':
    p = Process(target=pa.yourfunction, args=('bob',))
    p.start()
    p.join()
相关推荐
  政府信创国产化的10大政策解读一、信创国产化的背景与意义信创国产化,即信息技术应用创新国产化,是当前中国信息技术领域的一个重要发展方向。其核心在于通过自主研发和创新,实现信息技术应用的自主可控,减少对外部技术的依赖,并规避潜在的技术制裁和风险。随着全球信息技术竞争的加剧,以及某些国家对中国在科技领域的打压,信创国产化显...
工程项目管理   1565  
  为什么项目管理通常仍然耗时且低效?您是否还在反复更新电子表格、淹没在便利贴中并参加每周更新会议?这确实是耗费时间和精力。借助软件工具的帮助,您可以一目了然地全面了解您的项目。如今,国内外有足够多优秀的项目管理软件可以帮助您掌控每个项目。什么是项目管理软件?项目管理软件是广泛行业用于项目规划、资源分配和调度的软件。它使项...
项目管理软件   1354  
  信创国产芯片作为信息技术创新的核心领域,对于推动国家自主可控生态建设具有至关重要的意义。在全球科技竞争日益激烈的背景下,实现信息技术的自主可控,摆脱对国外技术的依赖,已成为保障国家信息安全和产业可持续发展的关键。国产芯片作为信创产业的基石,其发展水平直接影响着整个信创生态的构建与完善。通过不断提升国产芯片的技术实力、产...
国产信创系统   21  
  信创生态建设旨在实现信息技术领域的自主创新和安全可控,涵盖了从硬件到软件的全产业链。随着数字化转型的加速,信创生态建设的重要性日益凸显,它不仅关乎国家的信息安全,更是推动产业升级和经济高质量发展的关键力量。然而,在推进信创生态建设的过程中,面临着诸多复杂且严峻的挑战,需要深入剖析并寻找切实可行的解决方案。技术创新难题技...
信创操作系统   27  
  信创产业作为国家信息技术创新发展的重要领域,对于保障国家信息安全、推动产业升级具有关键意义。而国产芯片作为信创产业的核心基石,其研发进展备受关注。在信创国产芯片的研发征程中,面临着诸多复杂且艰巨的难点,这些难点犹如一道道关卡,阻碍着国产芯片的快速发展。然而,科研人员和相关企业并未退缩,积极探索并提出了一系列切实可行的解...
国产化替代产品目录   28  
热门文章
项目管理软件有哪些?
云禅道AD
禅道项目管理软件

云端的项目管理软件

尊享禅道项目软件收费版功能

无需维护,随时随地协同办公

内置subversion和git源码管理

每天备份,随时转为私有部署

免费试用