如何并行运行函数?

2024-12-20 08:37:00
admin
原创
113
摘要:问题描述:我正在尝试在 Python 中并行运行多个函数。我有类似的东西:files.py import common #common is a util class that handles all the IO stuff dir1 = 'C: older1' dir2 = 'C: older2' f...

问题描述:

我正在尝试在 Python 中并行运行多个函数。

我有类似的东西:

files.py

import common #common is a util class that handles all the IO stuff

dir1 = 'C:older1'
dir2 = 'C:older2'
filename = 'test.txt'
addFiles = [25, 5, 15, 35, 45, 25, 5, 15, 35, 45]

def func1():
   c = common.Common()
   for i in range(len(addFiles)):
       c.createFiles(addFiles[i], filename, dir1)
       c.getFiles(dir1)
       time.sleep(10)
       c.removeFiles(addFiles[i], dir1)
       c.getFiles(dir1)

def func2():
   c = common.Common()
   for i in range(len(addFiles)):
       c.createFiles(addFiles[i], filename, dir2)
       c.getFiles(dir2)
       time.sleep(10)
       c.removeFiles(addFiles[i], dir2)
       c.getFiles(dir2)

我想调用 func1 和 func2 并让它们同时运行。这些函数彼此之间或同一对象之间不交互。现在我必须等待 func1 完成后才能启动 func2。我该如何做如下操作:

process.py

from files import func1, func2

runBothFunc(func1(), func2())

我希望能够同时创建两个目录,因为我每分钟都会计算创建的文件数量。如果目录不存在,我的计时就会被打乱。


解决方案 1:

您可以使用threadingmultiprocessing

由于CPython 的特性,threading不太可能实现真正的并行性。因此,multiprocessing通常是更好的选择。

这是一个完整的例子:

from multiprocessing import Process


def func1():
    print("func1: starting")
    for i in range(10000000):
        pass

    print("func1: finishing")


def func2():
    print("func2: starting")
    for i in range(10000000):
        pass

    print("func2: finishing")


if __name__ == "__main__":
    p1 = Process(target=func1)
    p1.start()
    p2 = Process(target=func2)
    p2.start()
    p1.join()
    p2.join()

启动/加入子进程的机制可以很容易地封装到如下函数中runBothFunc

def runInParallel(*fns):
  proc = []
  for fn in fns:
    p = Process(target=fn)
    p.start()
    proc.append(p)
  for p in proc:
    p.join()

runInParallel(func1, func2)

解决方案 2:

如果你的函数主要执行I/O 工作(较少的 CPU 工作)并且你拥有 Python 3.2+,那么你可以使用ThreadPoolExecutor:

from concurrent.futures import ThreadPoolExecutor

def run_io_tasks_in_parallel(tasks):
    with ThreadPoolExecutor() as executor:
        running_tasks = [executor.submit(task) for task in tasks]
        for running_task in running_tasks:
            running_task.result()

run_io_tasks_in_parallel([
    lambda: print('IO task 1 running!'),
    lambda: print('IO task 2 running!'),
])

如果你的函数主要执行CPU 工作(较少的 I/O 工作)并且你拥有 Python 3.2+,那么你可以使用ProcessPoolExecutor:

from concurrent.futures import ProcessPoolExecutor

def run_cpu_tasks_in_parallel(tasks):
    with ProcessPoolExecutor() as executor:
        running_tasks = [executor.submit(task) for task in tasks]
        for running_task in running_tasks:
            running_task.result()

def task_1():
    print('CPU task 1 running!')

def task_2():
    print('CPU task 2 running!')

if __name__ == '__main__':
    run_cpu_tasks_in_parallel([
        task_1,
        task_2,
    ])

或者如果你只有 Python 2.6+,你可以直接使用多处理模块:

from multiprocessing import Process

def run_cpu_tasks_in_parallel(tasks):
    running_tasks = [Process(target=task) for task in tasks]
    for running_task in running_tasks:
        running_task.start()
    for running_task in running_tasks:
        running_task.join()

def task_1():
    print('CPU task 1 running!')

def task_2():
    print('CPU task 2 running!')

if __name__ == '__main__':
    run_cpu_tasks_in_parallel([
        task_1,
        task_2,
    ])

解决方案 3:

这可以通过Ray优雅地完成,该系统允许您轻松并行化和分发 Python 代码。

为了并行化您的示例,您需要用@ray.remote装饰器定义您的函数,然后用调用它们.remote

import ray

ray.init()

dir1 = 'C:\\folder1'
dir2 = 'C:\\folder2'
filename = 'test.txt'
addFiles = [25, 5, 15, 35, 45, 25, 5, 15, 35, 45]

# Define the functions. 
# You need to pass every global variable used by the function as an argument.
# This is needed because each remote function runs in a different process,
# and thus it does not have access to the global variables defined in 
# the current process.
@ray.remote
def func1(filename, addFiles, dir):
    # func1() code here...

@ray.remote
def func2(filename, addFiles, dir):
    # func2() code here...

# Start two tasks in the background and wait for them to finish.
ray.get([func1.remote(filename, addFiles, dir1), func2.remote(filename, addFiles, dir2)]) 

如果将相同的参数传递给两个函数,并且该参数很大,则更有效的方法是使用ray.put()。这避免了对大参数进行两次序列化并创建它的两个内存副本:

largeData_id = ray.put(largeData)

ray.get([func1(largeData_id), func2(largeData_id)])

重要-如果func1()返回func2()结果,则需要重写代码如下:

ret_id1 = func1.remote(filename, addFiles, dir1)
ret_id2 = func2.remote(filename, addFiles, dir2)
ret1, ret2 = ray.get([ret_id1, ret_id2])

与多处理模块相比,使用 Ray 有许多优势。特别是,同一段代码既可以在一台机器上运行,也可以在一组机器上运行。有关 Ray 的更多优势,请参阅此相关文章。

解决方案 4:

似乎你有一个需要调用两个不同参数的函数。这可以通过结合使用Python 3.2+concurrent.futures和Python 3.2.1 来优雅地实现。map

import time
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor

def sleep_secs(seconds):
  time.sleep(seconds)
  print(f'{seconds} has been processed')

secs_list = [2,4, 6, 8, 10, 12]

现在,如果你的操作是 IO 绑定的,那么你可以ThreadPoolExecutor这样使用:

with ThreadPoolExecutor() as executor:
  results = executor.map(sleep_secs, secs_list)

注意map这里是如何将map你的函数添加到参数列表中的。

现在,如果你的函数是 CPU 密集型的,那么你可以使用ProcessPoolExecutor

with ProcessPoolExecutor() as executor:
  results = executor.map(sleep_secs, secs_list)

如果您不确定,您可以简单地尝试两者,看看哪一个能给您更好的结果。

最后,如果您想打印出结果,您可以简单地执行以下操作:

with ThreadPoolExecutor() as executor:
  results = executor.map(sleep_secs, secs_list)
  for result in results:
    print(result)

解决方案 5:

在 2021 年,最简单的方法是使用 asyncio:

import asyncio, time

async def say_after(delay, what):
    await asyncio.sleep(delay)
    print(what)

async def main():

    task1 = asyncio.create_task(
        say_after(4, 'hello'))

    task2 = asyncio.create_task(
        say_after(3, 'world'))

    print(f"started at {time.strftime('%X')}")

    # Wait until both tasks are completed (should take
    # around 2 seconds.)
    await task1
    await task2

    print(f"finished at {time.strftime('%X')}")


asyncio.run(main())

参考:

[1] https://docs.python.org/3/library/asyncio-task.html

解决方案 6:

如果您是 Windows 用户并使用 Python 3,那么这篇文章将帮助您在 Python 中进行并行编程。当您运行常用的多处理库的池编程时,您将收到有关程序中主函数的错误。这是因为 Windows 没有 fork() 功能。下面的文章给出了上述问题的解决方案。

http://python.6.x6.nabble.com/Multiprocessing-Pool-woes-td5047050.html

因为我用的是python3,所以我对程序做了一些修改,如下:

from types import FunctionType
import marshal

def _applicable(*args, **kwargs):
  name = kwargs['__pw_name']
  code = marshal.loads(kwargs['__pw_code'])
  gbls = globals() #gbls = marshal.loads(kwargs['__pw_gbls'])
  defs = marshal.loads(kwargs['__pw_defs'])
  clsr = marshal.loads(kwargs['__pw_clsr'])
  fdct = marshal.loads(kwargs['__pw_fdct'])
  func = FunctionType(code, gbls, name, defs, clsr)
  func.fdct = fdct
  del kwargs['__pw_name']
  del kwargs['__pw_code']
  del kwargs['__pw_defs']
  del kwargs['__pw_clsr']
  del kwargs['__pw_fdct']
  return func(*args, **kwargs)

def make_applicable(f, *args, **kwargs):
  if not isinstance(f, FunctionType): raise ValueError('argument must be a function')
  kwargs['__pw_name'] = f.__name__  # edited
  kwargs['__pw_code'] = marshal.dumps(f.__code__)   # edited
  kwargs['__pw_defs'] = marshal.dumps(f.__defaults__)  # edited
  kwargs['__pw_clsr'] = marshal.dumps(f.__closure__)  # edited
  kwargs['__pw_fdct'] = marshal.dumps(f.__dict__)   # edited
  return _applicable, args, kwargs

def _mappable(x):
  x,name,code,defs,clsr,fdct = x
  code = marshal.loads(code)
  gbls = globals() #gbls = marshal.loads(gbls)
  defs = marshal.loads(defs)
  clsr = marshal.loads(clsr)
  fdct = marshal.loads(fdct)
  func = FunctionType(code, gbls, name, defs, clsr)
  func.fdct = fdct
  return func(x)

def make_mappable(f, iterable):
  if not isinstance(f, FunctionType): raise ValueError('argument must be a function')
  name = f.__name__    # edited
  code = marshal.dumps(f.__code__)   # edited
  defs = marshal.dumps(f.__defaults__)  # edited
  clsr = marshal.dumps(f.__closure__)  # edited
  fdct = marshal.dumps(f.__dict__)  # edited
  return _mappable, ((i,name,code,defs,clsr,fdct) for i in iterable)

经过这个函数之后,上面的问题代码也稍微改变了一下,如下所示:

from multiprocessing import Pool
from poolable import make_applicable, make_mappable

def cube(x):
  return x**3

if __name__ == "__main__":
  pool    = Pool(processes=2)
  results = [pool.apply_async(*make_applicable(cube,x)) for x in range(1,7)]
  print([result.get(timeout=10) for result in results])

我得到的输出为:

[1, 8, 27, 64, 125, 216]

我认为这篇文章可能对一些 Windows 用户有用。

解决方案 7:

没有办法保证两个函数能够同步执行,这似乎正是您想要做的。

您能做的最好的事情是将功能分成几个步骤,然后等待两个步骤在关键同步点完成,Process.join就像@aix 的答案提到的那样。

这比无法保证准确时间要好time.sleep(10)。通过明确等待,您说函数必须先执行完该步骤,然后才能转到下一步,而不是假设它会在 10ms 内完成,而根据机器上正在发生的其他情况,这无法保证。

解决方案 8:

仅举一个简单的相关示例。如果要将函数并行应用于整数数组,可以使用ProcessPoolExecutormap方法。这是一个简单的例子:

from concurrent.futures import ProcessPoolExecutor

def square(n):
    return n * n

if __name__ == "__main__":
    numbers = [1, 2, 3, 4, 5]
    
    with ProcessPoolExecutor() as executor:
        results = list(executor.map(square, numbers))

    print("Squares:", results)

在此示例中,square函数应用于numbers数组的每个元素。结果收集在results列表中,保持输入数组的顺序。

解决方案 9:

(关于如何在 python 中同时运行两个(或更多)函数? )

使用asyncio,同步/异步任务可以通过以下方式同时运行:

import asyncio
import time

def function1():
    # performing blocking tasks
    while True:
        print("function 1: blocking task ...")
        time.sleep(1)

async def function2():
    # perform non-blocking tasks
    while True:
        print("function 2: non-blocking task ...")
        await asyncio.sleep(1)

async def main():
    loop = asyncio.get_running_loop()

    await asyncio.gather(
        # https://docs.python.org/3/library/asyncio-eventloop.html#asyncio.loop.run_in_executor
        loop.run_in_executor(None, function1),
        function2(),
    )

if __name__ == '__main__':
    asyncio.run(main())

相关推荐
  政府信创国产化的10大政策解读一、信创国产化的背景与意义信创国产化,即信息技术应用创新国产化,是当前中国信息技术领域的一个重要发展方向。其核心在于通过自主研发和创新,实现信息技术应用的自主可控,减少对外部技术的依赖,并规避潜在的技术制裁和风险。随着全球信息技术竞争的加剧,以及某些国家对中国在科技领域的打压,信创国产化显...
工程项目管理   1565  
  为什么项目管理通常仍然耗时且低效?您是否还在反复更新电子表格、淹没在便利贴中并参加每周更新会议?这确实是耗费时间和精力。借助软件工具的帮助,您可以一目了然地全面了解您的项目。如今,国内外有足够多优秀的项目管理软件可以帮助您掌控每个项目。什么是项目管理软件?项目管理软件是广泛行业用于项目规划、资源分配和调度的软件。它使项...
项目管理软件   1354  
  信创国产芯片作为信息技术创新的核心领域,对于推动国家自主可控生态建设具有至关重要的意义。在全球科技竞争日益激烈的背景下,实现信息技术的自主可控,摆脱对国外技术的依赖,已成为保障国家信息安全和产业可持续发展的关键。国产芯片作为信创产业的基石,其发展水平直接影响着整个信创生态的构建与完善。通过不断提升国产芯片的技术实力、产...
国产信创系统   21  
  信创生态建设旨在实现信息技术领域的自主创新和安全可控,涵盖了从硬件到软件的全产业链。随着数字化转型的加速,信创生态建设的重要性日益凸显,它不仅关乎国家的信息安全,更是推动产业升级和经济高质量发展的关键力量。然而,在推进信创生态建设的过程中,面临着诸多复杂且严峻的挑战,需要深入剖析并寻找切实可行的解决方案。技术创新难题技...
信创操作系统   27  
  信创产业作为国家信息技术创新发展的重要领域,对于保障国家信息安全、推动产业升级具有关键意义。而国产芯片作为信创产业的核心基石,其研发进展备受关注。在信创国产芯片的研发征程中,面临着诸多复杂且艰巨的难点,这些难点犹如一道道关卡,阻碍着国产芯片的快速发展。然而,科研人员和相关企业并未退缩,积极探索并提出了一系列切实可行的解...
国产化替代产品目录   28  
热门文章
项目管理软件有哪些?
云禅道AD
禅道项目管理软件

云端的项目管理软件

尊享禅道项目软件收费版功能

无需维护,随时随地协同办公

内置subversion和git源码管理

每天备份,随时转为私有部署

免费试用