如何并行运行函数?
- 2024-12-20 08:37:00
- admin 原创
- 54
问题描述:
我正在尝试在 Python 中并行运行多个函数。
我有类似的东西:
files.py
import common #common is a util class that handles all the IO stuff
dir1 = 'C:older1'
dir2 = 'C:older2'
filename = 'test.txt'
addFiles = [25, 5, 15, 35, 45, 25, 5, 15, 35, 45]
def func1():
c = common.Common()
for i in range(len(addFiles)):
c.createFiles(addFiles[i], filename, dir1)
c.getFiles(dir1)
time.sleep(10)
c.removeFiles(addFiles[i], dir1)
c.getFiles(dir1)
def func2():
c = common.Common()
for i in range(len(addFiles)):
c.createFiles(addFiles[i], filename, dir2)
c.getFiles(dir2)
time.sleep(10)
c.removeFiles(addFiles[i], dir2)
c.getFiles(dir2)
我想调用 func1 和 func2 并让它们同时运行。这些函数彼此之间或同一对象之间不交互。现在我必须等待 func1 完成后才能启动 func2。我该如何做如下操作:
process.py
from files import func1, func2
runBothFunc(func1(), func2())
我希望能够同时创建两个目录,因为我每分钟都会计算创建的文件数量。如果目录不存在,我的计时就会被打乱。
解决方案 1:
您可以使用threading
或multiprocessing
。
由于CPython 的特性,threading
不太可能实现真正的并行性。因此,multiprocessing
通常是更好的选择。
这是一个完整的例子:
from multiprocessing import Process
def func1():
print("func1: starting")
for i in range(10000000):
pass
print("func1: finishing")
def func2():
print("func2: starting")
for i in range(10000000):
pass
print("func2: finishing")
if __name__ == "__main__":
p1 = Process(target=func1)
p1.start()
p2 = Process(target=func2)
p2.start()
p1.join()
p2.join()
启动/加入子进程的机制可以很容易地封装到如下函数中runBothFunc
:
def runInParallel(*fns):
proc = []
for fn in fns:
p = Process(target=fn)
p.start()
proc.append(p)
for p in proc:
p.join()
runInParallel(func1, func2)
解决方案 2:
如果你的函数主要执行I/O 工作(较少的 CPU 工作)并且你拥有 Python 3.2+,那么你可以使用ThreadPoolExecutor:
from concurrent.futures import ThreadPoolExecutor
def run_io_tasks_in_parallel(tasks):
with ThreadPoolExecutor() as executor:
running_tasks = [executor.submit(task) for task in tasks]
for running_task in running_tasks:
running_task.result()
run_io_tasks_in_parallel([
lambda: print('IO task 1 running!'),
lambda: print('IO task 2 running!'),
])
如果你的函数主要执行CPU 工作(较少的 I/O 工作)并且你拥有 Python 3.2+,那么你可以使用ProcessPoolExecutor:
from concurrent.futures import ProcessPoolExecutor
def run_cpu_tasks_in_parallel(tasks):
with ProcessPoolExecutor() as executor:
running_tasks = [executor.submit(task) for task in tasks]
for running_task in running_tasks:
running_task.result()
def task_1():
print('CPU task 1 running!')
def task_2():
print('CPU task 2 running!')
if __name__ == '__main__':
run_cpu_tasks_in_parallel([
task_1,
task_2,
])
或者如果你只有 Python 2.6+,你可以直接使用多处理模块:
from multiprocessing import Process
def run_cpu_tasks_in_parallel(tasks):
running_tasks = [Process(target=task) for task in tasks]
for running_task in running_tasks:
running_task.start()
for running_task in running_tasks:
running_task.join()
def task_1():
print('CPU task 1 running!')
def task_2():
print('CPU task 2 running!')
if __name__ == '__main__':
run_cpu_tasks_in_parallel([
task_1,
task_2,
])
解决方案 3:
这可以通过Ray优雅地完成,该系统允许您轻松并行化和分发 Python 代码。
为了并行化您的示例,您需要用@ray.remote
装饰器定义您的函数,然后用调用它们.remote
。
import ray
ray.init()
dir1 = 'C:\\folder1'
dir2 = 'C:\\folder2'
filename = 'test.txt'
addFiles = [25, 5, 15, 35, 45, 25, 5, 15, 35, 45]
# Define the functions.
# You need to pass every global variable used by the function as an argument.
# This is needed because each remote function runs in a different process,
# and thus it does not have access to the global variables defined in
# the current process.
@ray.remote
def func1(filename, addFiles, dir):
# func1() code here...
@ray.remote
def func2(filename, addFiles, dir):
# func2() code here...
# Start two tasks in the background and wait for them to finish.
ray.get([func1.remote(filename, addFiles, dir1), func2.remote(filename, addFiles, dir2)])
如果将相同的参数传递给两个函数,并且该参数很大,则更有效的方法是使用ray.put()
。这避免了对大参数进行两次序列化并创建它的两个内存副本:
largeData_id = ray.put(largeData)
ray.get([func1(largeData_id), func2(largeData_id)])
重要-如果func1()
返回func2()
结果,则需要重写代码如下:
ret_id1 = func1.remote(filename, addFiles, dir1)
ret_id2 = func2.remote(filename, addFiles, dir2)
ret1, ret2 = ray.get([ret_id1, ret_id2])
与多处理模块相比,使用 Ray 有许多优势。特别是,同一段代码既可以在一台机器上运行,也可以在一组机器上运行。有关 Ray 的更多优势,请参阅此相关文章。
解决方案 4:
似乎你有一个需要调用两个不同参数的函数。这可以通过结合使用Python 3.2+concurrent.futures
和Python 3.2.1 来优雅地实现。map
import time
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
def sleep_secs(seconds):
time.sleep(seconds)
print(f'{seconds} has been processed')
secs_list = [2,4, 6, 8, 10, 12]
现在,如果你的操作是 IO 绑定的,那么你可以ThreadPoolExecutor
这样使用:
with ThreadPoolExecutor() as executor:
results = executor.map(sleep_secs, secs_list)
注意map
这里是如何将map
你的函数添加到参数列表中的。
现在,如果你的函数是 CPU 密集型的,那么你可以使用ProcessPoolExecutor
with ProcessPoolExecutor() as executor:
results = executor.map(sleep_secs, secs_list)
如果您不确定,您可以简单地尝试两者,看看哪一个能给您更好的结果。
最后,如果您想打印出结果,您可以简单地执行以下操作:
with ThreadPoolExecutor() as executor:
results = executor.map(sleep_secs, secs_list)
for result in results:
print(result)
解决方案 5:
在 2021 年,最简单的方法是使用 asyncio:
import asyncio, time
async def say_after(delay, what):
await asyncio.sleep(delay)
print(what)
async def main():
task1 = asyncio.create_task(
say_after(4, 'hello'))
task2 = asyncio.create_task(
say_after(3, 'world'))
print(f"started at {time.strftime('%X')}")
# Wait until both tasks are completed (should take
# around 2 seconds.)
await task1
await task2
print(f"finished at {time.strftime('%X')}")
asyncio.run(main())
参考:
[1] https://docs.python.org/3/library/asyncio-task.html
解决方案 6:
如果您是 Windows 用户并使用 Python 3,那么这篇文章将帮助您在 Python 中进行并行编程。当您运行常用的多处理库的池编程时,您将收到有关程序中主函数的错误。这是因为 Windows 没有 fork() 功能。下面的文章给出了上述问题的解决方案。
http://python.6.x6.nabble.com/Multiprocessing-Pool-woes-td5047050.html
因为我用的是python3,所以我对程序做了一些修改,如下:
from types import FunctionType
import marshal
def _applicable(*args, **kwargs):
name = kwargs['__pw_name']
code = marshal.loads(kwargs['__pw_code'])
gbls = globals() #gbls = marshal.loads(kwargs['__pw_gbls'])
defs = marshal.loads(kwargs['__pw_defs'])
clsr = marshal.loads(kwargs['__pw_clsr'])
fdct = marshal.loads(kwargs['__pw_fdct'])
func = FunctionType(code, gbls, name, defs, clsr)
func.fdct = fdct
del kwargs['__pw_name']
del kwargs['__pw_code']
del kwargs['__pw_defs']
del kwargs['__pw_clsr']
del kwargs['__pw_fdct']
return func(*args, **kwargs)
def make_applicable(f, *args, **kwargs):
if not isinstance(f, FunctionType): raise ValueError('argument must be a function')
kwargs['__pw_name'] = f.__name__ # edited
kwargs['__pw_code'] = marshal.dumps(f.__code__) # edited
kwargs['__pw_defs'] = marshal.dumps(f.__defaults__) # edited
kwargs['__pw_clsr'] = marshal.dumps(f.__closure__) # edited
kwargs['__pw_fdct'] = marshal.dumps(f.__dict__) # edited
return _applicable, args, kwargs
def _mappable(x):
x,name,code,defs,clsr,fdct = x
code = marshal.loads(code)
gbls = globals() #gbls = marshal.loads(gbls)
defs = marshal.loads(defs)
clsr = marshal.loads(clsr)
fdct = marshal.loads(fdct)
func = FunctionType(code, gbls, name, defs, clsr)
func.fdct = fdct
return func(x)
def make_mappable(f, iterable):
if not isinstance(f, FunctionType): raise ValueError('argument must be a function')
name = f.__name__ # edited
code = marshal.dumps(f.__code__) # edited
defs = marshal.dumps(f.__defaults__) # edited
clsr = marshal.dumps(f.__closure__) # edited
fdct = marshal.dumps(f.__dict__) # edited
return _mappable, ((i,name,code,defs,clsr,fdct) for i in iterable)
经过这个函数之后,上面的问题代码也稍微改变了一下,如下所示:
from multiprocessing import Pool
from poolable import make_applicable, make_mappable
def cube(x):
return x**3
if __name__ == "__main__":
pool = Pool(processes=2)
results = [pool.apply_async(*make_applicable(cube,x)) for x in range(1,7)]
print([result.get(timeout=10) for result in results])
我得到的输出为:
[1, 8, 27, 64, 125, 216]
我认为这篇文章可能对一些 Windows 用户有用。
解决方案 7:
没有办法保证两个函数能够同步执行,这似乎正是您想要做的。
您能做的最好的事情是将功能分成几个步骤,然后等待两个步骤在关键同步点完成,Process.join
就像@aix 的答案提到的那样。
这比无法保证准确时间要好time.sleep(10)
。通过明确等待,您说函数必须先执行完该步骤,然后才能转到下一步,而不是假设它会在 10ms 内完成,而根据机器上正在发生的其他情况,这无法保证。
解决方案 8:
仅举一个简单的相关示例。如果要将函数并行应用于整数数组,可以使用ProcessPoolExecutor
的map
方法。这是一个简单的例子:
from concurrent.futures import ProcessPoolExecutor
def square(n):
return n * n
if __name__ == "__main__":
numbers = [1, 2, 3, 4, 5]
with ProcessPoolExecutor() as executor:
results = list(executor.map(square, numbers))
print("Squares:", results)
在此示例中,square
函数应用于numbers
数组的每个元素。结果收集在results
列表中,保持输入数组的顺序。
解决方案 9:
(关于如何在 python 中同时运行两个(或更多)函数? )
使用asyncio
,同步/异步任务可以通过以下方式同时运行:
import asyncio
import time
def function1():
# performing blocking tasks
while True:
print("function 1: blocking task ...")
time.sleep(1)
async def function2():
# perform non-blocking tasks
while True:
print("function 2: non-blocking task ...")
await asyncio.sleep(1)
async def main():
loop = asyncio.get_running_loop()
await asyncio.gather(
# https://docs.python.org/3/library/asyncio-eventloop.html#asyncio.loop.run_in_executor
loop.run_in_executor(None, function1),
function2(),
)
if __name__ == '__main__':
asyncio.run(main())
- 2024年20款好用的项目管理软件推荐,项目管理提效的20个工具和技巧
- 2024年开源项目管理软件有哪些?推荐5款好用的项目管理工具
- 2024年常用的项目管理软件有哪些?推荐这10款国内外好用的项目管理工具
- 项目管理软件有哪些?推荐7款超好用的项目管理工具
- 项目管理软件有哪些最好用?推荐6款好用的项目管理工具
- 项目管理软件哪个最好用?盘点推荐5款好用的项目管理工具
- 项目管理软件有哪些,盘点推荐国内外超好用的7款项目管理工具
- 项目管理软件排行榜:2024年项目经理必备5款开源项目管理软件汇总
- 2024项目管理软件排行榜(10类常用的项目管理工具全推荐)
- 项目管理必备:盘点2024年13款好用的项目管理软件