如何并行运行函数?

2024-12-20 08:37:00
admin
原创
54
摘要:问题描述:我正在尝试在 Python 中并行运行多个函数。我有类似的东西:files.py import common #common is a util class that handles all the IO stuff dir1 = 'C: older1' dir2 = 'C: older2' f...

问题描述:

我正在尝试在 Python 中并行运行多个函数。

我有类似的东西:

files.py

import common #common is a util class that handles all the IO stuff

dir1 = 'C:older1'
dir2 = 'C:older2'
filename = 'test.txt'
addFiles = [25, 5, 15, 35, 45, 25, 5, 15, 35, 45]

def func1():
   c = common.Common()
   for i in range(len(addFiles)):
       c.createFiles(addFiles[i], filename, dir1)
       c.getFiles(dir1)
       time.sleep(10)
       c.removeFiles(addFiles[i], dir1)
       c.getFiles(dir1)

def func2():
   c = common.Common()
   for i in range(len(addFiles)):
       c.createFiles(addFiles[i], filename, dir2)
       c.getFiles(dir2)
       time.sleep(10)
       c.removeFiles(addFiles[i], dir2)
       c.getFiles(dir2)

我想调用 func1 和 func2 并让它们同时运行。这些函数彼此之间或同一对象之间不交互。现在我必须等待 func1 完成后才能启动 func2。我该如何做如下操作:

process.py

from files import func1, func2

runBothFunc(func1(), func2())

我希望能够同时创建两个目录,因为我每分钟都会计算创建的文件数量。如果目录不存在,我的计时就会被打乱。


解决方案 1:

您可以使用threadingmultiprocessing

由于CPython 的特性,threading不太可能实现真正的并行性。因此,multiprocessing通常是更好的选择。

这是一个完整的例子:

from multiprocessing import Process


def func1():
    print("func1: starting")
    for i in range(10000000):
        pass

    print("func1: finishing")


def func2():
    print("func2: starting")
    for i in range(10000000):
        pass

    print("func2: finishing")


if __name__ == "__main__":
    p1 = Process(target=func1)
    p1.start()
    p2 = Process(target=func2)
    p2.start()
    p1.join()
    p2.join()

启动/加入子进程的机制可以很容易地封装到如下函数中runBothFunc

def runInParallel(*fns):
  proc = []
  for fn in fns:
    p = Process(target=fn)
    p.start()
    proc.append(p)
  for p in proc:
    p.join()

runInParallel(func1, func2)

解决方案 2:

如果你的函数主要执行I/O 工作(较少的 CPU 工作)并且你拥有 Python 3.2+,那么你可以使用ThreadPoolExecutor:

from concurrent.futures import ThreadPoolExecutor

def run_io_tasks_in_parallel(tasks):
    with ThreadPoolExecutor() as executor:
        running_tasks = [executor.submit(task) for task in tasks]
        for running_task in running_tasks:
            running_task.result()

run_io_tasks_in_parallel([
    lambda: print('IO task 1 running!'),
    lambda: print('IO task 2 running!'),
])

如果你的函数主要执行CPU 工作(较少的 I/O 工作)并且你拥有 Python 3.2+,那么你可以使用ProcessPoolExecutor:

from concurrent.futures import ProcessPoolExecutor

def run_cpu_tasks_in_parallel(tasks):
    with ProcessPoolExecutor() as executor:
        running_tasks = [executor.submit(task) for task in tasks]
        for running_task in running_tasks:
            running_task.result()

def task_1():
    print('CPU task 1 running!')

def task_2():
    print('CPU task 2 running!')

if __name__ == '__main__':
    run_cpu_tasks_in_parallel([
        task_1,
        task_2,
    ])

或者如果你只有 Python 2.6+,你可以直接使用多处理模块:

from multiprocessing import Process

def run_cpu_tasks_in_parallel(tasks):
    running_tasks = [Process(target=task) for task in tasks]
    for running_task in running_tasks:
        running_task.start()
    for running_task in running_tasks:
        running_task.join()

def task_1():
    print('CPU task 1 running!')

def task_2():
    print('CPU task 2 running!')

if __name__ == '__main__':
    run_cpu_tasks_in_parallel([
        task_1,
        task_2,
    ])

解决方案 3:

这可以通过Ray优雅地完成,该系统允许您轻松并行化和分发 Python 代码。

为了并行化您的示例,您需要用@ray.remote装饰器定义您的函数,然后用调用它们.remote

import ray

ray.init()

dir1 = 'C:\\folder1'
dir2 = 'C:\\folder2'
filename = 'test.txt'
addFiles = [25, 5, 15, 35, 45, 25, 5, 15, 35, 45]

# Define the functions. 
# You need to pass every global variable used by the function as an argument.
# This is needed because each remote function runs in a different process,
# and thus it does not have access to the global variables defined in 
# the current process.
@ray.remote
def func1(filename, addFiles, dir):
    # func1() code here...

@ray.remote
def func2(filename, addFiles, dir):
    # func2() code here...

# Start two tasks in the background and wait for them to finish.
ray.get([func1.remote(filename, addFiles, dir1), func2.remote(filename, addFiles, dir2)]) 

如果将相同的参数传递给两个函数,并且该参数很大,则更有效的方法是使用ray.put()。这避免了对大参数进行两次序列化并创建它的两个内存副本:

largeData_id = ray.put(largeData)

ray.get([func1(largeData_id), func2(largeData_id)])

重要-如果func1()返回func2()结果,则需要重写代码如下:

ret_id1 = func1.remote(filename, addFiles, dir1)
ret_id2 = func2.remote(filename, addFiles, dir2)
ret1, ret2 = ray.get([ret_id1, ret_id2])

与多处理模块相比,使用 Ray 有许多优势。特别是,同一段代码既可以在一台机器上运行,也可以在一组机器上运行。有关 Ray 的更多优势,请参阅此相关文章。

解决方案 4:

似乎你有一个需要调用两个不同参数的函数。这可以通过结合使用Python 3.2+concurrent.futures和Python 3.2.1 来优雅地实现。map

import time
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor

def sleep_secs(seconds):
  time.sleep(seconds)
  print(f'{seconds} has been processed')

secs_list = [2,4, 6, 8, 10, 12]

现在,如果你的操作是 IO 绑定的,那么你可以ThreadPoolExecutor这样使用:

with ThreadPoolExecutor() as executor:
  results = executor.map(sleep_secs, secs_list)

注意map这里是如何将map你的函数添加到参数列表中的。

现在,如果你的函数是 CPU 密集型的,那么你可以使用ProcessPoolExecutor

with ProcessPoolExecutor() as executor:
  results = executor.map(sleep_secs, secs_list)

如果您不确定,您可以简单地尝试两者,看看哪一个能给您更好的结果。

最后,如果您想打印出结果,您可以简单地执行以下操作:

with ThreadPoolExecutor() as executor:
  results = executor.map(sleep_secs, secs_list)
  for result in results:
    print(result)

解决方案 5:

在 2021 年,最简单的方法是使用 asyncio:

import asyncio, time

async def say_after(delay, what):
    await asyncio.sleep(delay)
    print(what)

async def main():

    task1 = asyncio.create_task(
        say_after(4, 'hello'))

    task2 = asyncio.create_task(
        say_after(3, 'world'))

    print(f"started at {time.strftime('%X')}")

    # Wait until both tasks are completed (should take
    # around 2 seconds.)
    await task1
    await task2

    print(f"finished at {time.strftime('%X')}")


asyncio.run(main())

参考:

[1] https://docs.python.org/3/library/asyncio-task.html

解决方案 6:

如果您是 Windows 用户并使用 Python 3,那么这篇文章将帮助您在 Python 中进行并行编程。当您运行常用的多处理库的池编程时,您将收到有关程序中主函数的错误。这是因为 Windows 没有 fork() 功能。下面的文章给出了上述问题的解决方案。

http://python.6.x6.nabble.com/Multiprocessing-Pool-woes-td5047050.html

因为我用的是python3,所以我对程序做了一些修改,如下:

from types import FunctionType
import marshal

def _applicable(*args, **kwargs):
  name = kwargs['__pw_name']
  code = marshal.loads(kwargs['__pw_code'])
  gbls = globals() #gbls = marshal.loads(kwargs['__pw_gbls'])
  defs = marshal.loads(kwargs['__pw_defs'])
  clsr = marshal.loads(kwargs['__pw_clsr'])
  fdct = marshal.loads(kwargs['__pw_fdct'])
  func = FunctionType(code, gbls, name, defs, clsr)
  func.fdct = fdct
  del kwargs['__pw_name']
  del kwargs['__pw_code']
  del kwargs['__pw_defs']
  del kwargs['__pw_clsr']
  del kwargs['__pw_fdct']
  return func(*args, **kwargs)

def make_applicable(f, *args, **kwargs):
  if not isinstance(f, FunctionType): raise ValueError('argument must be a function')
  kwargs['__pw_name'] = f.__name__  # edited
  kwargs['__pw_code'] = marshal.dumps(f.__code__)   # edited
  kwargs['__pw_defs'] = marshal.dumps(f.__defaults__)  # edited
  kwargs['__pw_clsr'] = marshal.dumps(f.__closure__)  # edited
  kwargs['__pw_fdct'] = marshal.dumps(f.__dict__)   # edited
  return _applicable, args, kwargs

def _mappable(x):
  x,name,code,defs,clsr,fdct = x
  code = marshal.loads(code)
  gbls = globals() #gbls = marshal.loads(gbls)
  defs = marshal.loads(defs)
  clsr = marshal.loads(clsr)
  fdct = marshal.loads(fdct)
  func = FunctionType(code, gbls, name, defs, clsr)
  func.fdct = fdct
  return func(x)

def make_mappable(f, iterable):
  if not isinstance(f, FunctionType): raise ValueError('argument must be a function')
  name = f.__name__    # edited
  code = marshal.dumps(f.__code__)   # edited
  defs = marshal.dumps(f.__defaults__)  # edited
  clsr = marshal.dumps(f.__closure__)  # edited
  fdct = marshal.dumps(f.__dict__)  # edited
  return _mappable, ((i,name,code,defs,clsr,fdct) for i in iterable)

经过这个函数之后,上面的问题代码也稍微改变了一下,如下所示:

from multiprocessing import Pool
from poolable import make_applicable, make_mappable

def cube(x):
  return x**3

if __name__ == "__main__":
  pool    = Pool(processes=2)
  results = [pool.apply_async(*make_applicable(cube,x)) for x in range(1,7)]
  print([result.get(timeout=10) for result in results])

我得到的输出为:

[1, 8, 27, 64, 125, 216]

我认为这篇文章可能对一些 Windows 用户有用。

解决方案 7:

没有办法保证两个函数能够同步执行,这似乎正是您想要做的。

您能做的最好的事情是将功能分成几个步骤,然后等待两个步骤在关键同步点完成,Process.join就像@aix 的答案提到的那样。

这比无法保证准确时间要好time.sleep(10)。通过明确等待,您说函数必须先执行完该步骤,然后才能转到下一步,而不是假设它会在 10ms 内完成,而根据机器上正在发生的其他情况,这无法保证。

解决方案 8:

仅举一个简单的相关示例。如果要将函数并行应用于整数数组,可以使用ProcessPoolExecutormap方法。这是一个简单的例子:

from concurrent.futures import ProcessPoolExecutor

def square(n):
    return n * n

if __name__ == "__main__":
    numbers = [1, 2, 3, 4, 5]
    
    with ProcessPoolExecutor() as executor:
        results = list(executor.map(square, numbers))

    print("Squares:", results)

在此示例中,square函数应用于numbers数组的每个元素。结果收集在results列表中,保持输入数组的顺序。

解决方案 9:

(关于如何在 python 中同时运行两个(或更多)函数? )

使用asyncio,同步/异步任务可以通过以下方式同时运行:

import asyncio
import time

def function1():
    # performing blocking tasks
    while True:
        print("function 1: blocking task ...")
        time.sleep(1)

async def function2():
    # perform non-blocking tasks
    while True:
        print("function 2: non-blocking task ...")
        await asyncio.sleep(1)

async def main():
    loop = asyncio.get_running_loop()

    await asyncio.gather(
        # https://docs.python.org/3/library/asyncio-eventloop.html#asyncio.loop.run_in_executor
        loop.run_in_executor(None, function1),
        function2(),
    )

if __name__ == '__main__':
    asyncio.run(main())

相关推荐
  为什么项目管理通常仍然耗时且低效?您是否还在反复更新电子表格、淹没在便利贴中并参加每周更新会议?这确实是耗费时间和精力。借助软件工具的帮助,您可以一目了然地全面了解您的项目。如今,国内外有足够多优秀的项目管理软件可以帮助您掌控每个项目。什么是项目管理软件?项目管理软件是广泛行业用于项目规划、资源分配和调度的软件。它使项...
项目管理软件   990  
  在项目管理领域,CDCP(Certified Data Center Professional)认证评审是一个至关重要的环节,它不仅验证了项目团队的专业能力,还直接关系到项目的成功与否。在这一评审过程中,沟通技巧的运用至关重要。有效的沟通不仅能够确保信息的准确传递,还能增强团队协作,提升评审效率。本文将深入探讨CDCP...
华为IPD流程   26  
  IPD(Integrated Product Development,集成产品开发)是一种以客户需求为核心、跨部门协同的产品开发模式,旨在通过高效的资源整合和流程优化,提升产品开发的成功率和市场竞争力。在IPD培训课程中,掌握关键成功因素是确保团队能够有效实施这一模式的核心。以下将从五个关键成功因素展开讨论,帮助企业和...
IPD项目流程图   27  
  华为IPD(Integrated Product Development,集成产品开发)流程是华为公司在其全球化进程中逐步构建和完善的一套高效产品开发管理体系。这一流程不仅帮助华为在技术创新和产品交付上实现了质的飞跃,还为其在全球市场中赢得了显著的竞争优势。IPD的核心在于通过跨部门协作、阶段性评审和市场需求驱动,确保...
华为IPD   26  
  华为作为全球领先的通信技术解决方案提供商,其成功的背后离不开一套成熟的管理体系——集成产品开发(IPD)。IPD不仅是一种产品开发流程,更是一种系统化的管理思想,它通过跨职能团队的协作、阶段评审机制和市场需求驱动的开发模式,帮助华为在全球市场中脱颖而出。从最初的国内市场到如今的全球化布局,华为的IPD体系在多个领域展现...
IPD管理流程   53  
热门文章
项目管理软件有哪些?
云禅道AD
禅道项目管理软件

云端的项目管理软件

尊享禅道项目软件收费版功能

无需维护,随时随地协同办公

内置subversion和git源码管理

每天备份,随时转为私有部署

免费试用