多处理 vs 多线程 vs asyncio
- 2025-02-12 10:03:00
- admin 原创
- 85
问题描述:
我发现在 Python 3.4 中,有几个用于多处理/线程的不同库:多处理vs线程vs asyncio。
但我不知道该使用哪一个或“推荐哪一个”。它们的作用相同还是不同?如果是,哪一个用于什么?我想编写一个使用计算机中多核的程序。但我不知道应该学习哪个库。
解决方案 1:
总结
做出正确的选择:
我们已经介绍了最流行的并发形式。但问题仍然存在——什么时候应该选择哪一种?这实际上取决于用例。根据我的经验(和阅读),我倾向于遵循以下伪代码:
if io_bound:
if io_very_slow:
print("Use Asyncio")
else:
print("Use Threads")
else:
print("Multi Processing")
CPU 限制 => 多处理
I/O 受限、快速 I/O、连接数有限 => 多线程
I/O 受限、I/O 速度慢、连接数多 => Asyncio
参考
[笔记]:
如果您有一个长调用方法(例如包含睡眠时间或延迟 I/O 的方法),最好的选择是asyncio,Twisted或Tornado方法(协同方法),以单线程作为并发工作。
asyncio适用于Python3.4及更高版本。
Tornado和Twisted自Python2.7起已准备就绪
uvloop是超快速
asyncio
事件循环(uvloop速度提高asyncio
2-4 倍)。
[更新(2019)]:
Japranto (GitHub)是一个基于uvloop的非常快速的流水线 HTTP 服务器。
[更新(2024)]:
concurrent.futures
:提供使用线程或进程异步执行可调用函数的高级接口。
解决方案 2:
它们用于(略微)不同的目的和/或要求。CPython(一种典型的主流 Python 实现)仍然具有全局解释器锁,因此多线程应用程序(当今实现并行处理的标准方式)不是最佳的。这就是为什么multiprocessing
may比 更受欢迎threading
。但并非每个问题都可以有效地分成 [几乎独立的] 部分,因此可能需要进行繁重的进程间通信。这就是为什么may总体上multiprocessing
不受欢迎的原因。threading
asyncio
(这种技术不仅在 Python 中可用,其他语言和/或框架也有,例如Boost.ASIO)是一种有效处理来自多个同时源的大量 I/O 操作的方法,无需并行执行代码。因此,它只是针对特定任务的解决方案(确实是一个好解决方案!),而不是针对一般的并行处理。
解决方案 3:
在多处理中,您可以利用多个 CPU 来分配计算。由于每个 CPU 都并行运行,因此您可以同时运行多个任务。您可能希望使用多处理来执行CPU 密集型任务。例如,尝试计算一个大列表中所有元素的总和。如果您的机器有 8 个内核,您可以将列表“切分”为 8 个较小的列表,并在不同的内核上分别计算每个列表的总和,然后将这些数字相加。这样做将获得约 8 倍的速度提升。
在 (多)线程中,您不需要多个 CPU。想象一下一个向 Web 发送大量 HTTP 请求的程序。如果您使用单线程程序,它会在每个请求时停止执行 (阻止),等待响应,然后在收到响应后继续。这里的问题是您的 CPU 在等待某个外部服务器完成工作时并没有真正工作;它实际上可以在此期间做一些有用的工作!解决方法是使用线程 - 您可以创建许多线程,每个线程负责从 Web 请求一些内容。线程的好处是,即使它们在一个 CPU 上运行,CPU 也会不时“冻结”一个线程的执行并跳转到执行另一个线程(这称为上下文切换,并且它会以非确定性间隔不断发生)。因此,如果您的任务是I/O 绑定的- 使用线程。
asyncio本质上是一种线程,其中不是 CPU,而是你,作为程序员(或者实际上是你的应用程序),决定在何时何地进行上下文切换。在 Python 中,你可以使用await
关键字来暂停协程的执行(使用关键字定义async
)。
解决方案 4:
这是基本思想:
它是IO绑定的吗? -----------> 使用
asyncio
它占用大量CPU吗? ---------> 使用
multiprocessing
其他?---------------------->使用
threading
因此,除非遇到 IO/CPU 问题,否则基本上坚持线程。
解决方案 5:
许多答案都建议如何只选择 1 个选项,但为什么不能使用所有 3 个选项呢?在这个答案中,我解释了如何使用这 3 种并发形式asyncio
进行管理,以及在需要时如何在它们之间轻松切换。
简短的回答
许多初次接触 Python 并发的开发人员最终会使用processing.Process
和threading.Thread
。但是,这些是低级 API,已被concurrent.futures
模块提供的高级 API 合并在一起。此外,生成进程和线程会产生开销,例如需要更多内存,这个问题困扰着我下面展示的一个示例。在一定程度上,为concurrent.futures
您管理这个问题,这样您就不会像生成一千个进程并让您的计算机崩溃一样轻易地发生这种情况,因为您只生成几个进程,然后在每次完成一个进程时重新使用这些进程。
这些高级 API 通过 提供,然后由和concurrent.futures.Executor
实现。在大多数情况下,您应该使用这些 API 而不是和,因为将来使用时更容易从一个切换到另一个,并且您不必了解每个 API 的详细差异。concurrent.futures.ProcessPoolExecutor
`concurrent.futures.ThreadPoolExecutormultiprocessing.Process
threading.Thread`concurrent.futures
由于它们共享统一的接口,您还会发现使用multiprocessing
或threading
经常使用的代码也不例外,并提供了通过以下代码使用它的方法concurrent.futures
:asyncio
import asyncio
from concurrent.futures import Executor
from functools import partial
from typing import Any, Callable, Optional, TypeVar
T = TypeVar("T")
async def run_in_executor(
executor: Optional[Executor],
func: Callable[..., T],
/,
*args: Any,
**kwargs: Any,
) -> T:
"""
Run `func(*args, **kwargs)` asynchronously, using an executor.
If the executor is None, use the default ThreadPoolExecutor.
"""
return await asyncio.get_running_loop().run_in_executor(
executor,
partial(func, *args, **kwargs),
)
# Example usage for running `print` in a thread.
async def main():
await run_in_executor(None, print, "O" * 100_000)
asyncio.run(main())
事实上,使用threading
withasyncio
非常普遍,因此在 Python 3.9 中他们添加了asyncio.to_thread(func, *args, **kwargs)
缩写以将其作为默认值ThreadPoolExecutor
。
详细答案
这种方法有什么缺点吗?
是的。使用asyncio
,最大的缺点是异步函数与同步函数不同。如果您从一开始asyncio
就没有考虑到这一点,这可能会让很多新用户感到困惑,并导致大量返工。asyncio
另一个缺点是,使用您代码的用户也会被迫使用asyncio
。所有这些必要的返工通常会让首次使用asyncio
的用户感到非常不爽。
这有什么非性能方面的优势吗?
是的。类似于使用concurrent.futures
优于threading.Thread
和multiprocessing.Process
的统一接口,这种方法可以被视为从Executor
到异步函数的进一步抽象。您可以先使用asyncio
,如果后来发现需要其中的一部分threading
或multiprocessing
,则可以使用asyncio.to_thread
或run_in_executor
。同样,您以后可能会发现您尝试使用线程运行的异步版本已经存在,因此您可以轻松地放弃使用threading
并改用asyncio
。
这有什么性能优势吗?
是的……也不是。最终这取决于任务。在某些情况下,它可能没有帮助(尽管它可能没有伤害),而在其他情况下它可能有很大帮助。这个答案的其余部分提供了一些解释,说明为什么使用asyncio
运行Executor
可能会有好处。
- 组合多个执行器和其他异步代码
asyncio
本质上提供了对并发的更多控制,但代价是您需要更多地控制并发。如果您想同时运行使用 的代码和ThreadPoolExecutor
使用 的其他代码ProcessPoolExecutor
,使用同步代码管理起来并不容易,但使用 就很容易了asyncio
。
import asyncio
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor
async def with_processing():
with ProcessPoolExecutor() as executor:
tasks = [...]
for task in asyncio.as_completed(tasks):
result = await task
...
async def with_threading():
with ThreadPoolExecutor() as executor:
tasks = [...]
for task in asyncio.as_completed(tasks):
result = await task
...
async def main():
await asyncio.gather(with_processing(), with_threading())
asyncio.run(main())
它是如何工作的?本质上asyncio
要求执行器运行它们的函数。然后,当执行器运行时,asyncio
将运行其他代码。例如,ProcessPoolExecutor
启动一组进程,然后在等待这些进程完成时,ThreadPoolExecutor
启动一组线程。asyncio
然后会在这些执行器完成后检查它们并收集它们的结果。此外,如果您有其他使用的代码asyncio
,您可以在等待进程和线程完成时运行它们。
- 缩小需要执行器的代码部分的范围
代码中有许多执行器的情况并不常见,但我见过人们在使用线程/进程时遇到的一个常见问题是,他们会将整个代码塞进一个线程/进程中,并期望它能正常工作。例如,我曾经看到过以下代码(大致):
from concurrent.futures import ThreadPoolExecutor
import requests
def get_data(url):
return requests.get(url).json()["data"]
urls = [...]
with ThreadPoolExecutor() as executor:
for data in executor.map(get_data, urls):
print(data)
这段代码的有趣之处在于,有并发时比没有并发时慢。为什么?因为结果json
很大,而许多线程消耗大量内存是灾难性的。幸运的是,解决方案很简单:
from concurrent.futures import ThreadPoolExecutor
import requests
urls = [...]
with ThreadPoolExecutor() as executor:
for response in executor.map(requests.get, urls):
print(response.json()["data"])
现在一次只有一个json
被卸载到内存中,一切正常。
这里的教训是什么?
您不应该尝试将所有代码都放入线程/进程中,而应该关注代码的哪部分真正需要并发。
但是,如果get_data
函数不是像这个例子这么简单呢?如果我们必须在函数的中间某个地方应用执行器会怎么样?这就是asyncio
需要用到的地方:
import asyncio
import requests
async def get_data(url):
# A lot of code.
...
# The specific part that needs threading.
response = await asyncio.to_thread(requests.get, url, some_other_params)
# A lot of code.
...
return data
urls = [...]
async def main():
tasks = [get_data(url) for url in urls]
for task in asyncio.as_completed(tasks):
data = await task
print(data)
asyncio.run(main())
尝试使用相同的方法concurrent.futures
绝对不是什么好事。您可以使用诸如回调、队列等方法,但与基本代码相比,管理起来要困难得多asyncio
。
解决方案 6:
我不是专业的 Python 用户,但作为计算机架构专业的学生,我认为我可以分享一些在多处理和多线程之间进行选择时的考虑因素。此外,其他一些答案(即使是那些投票率较高的答案)也滥用了技术术语,所以我认为也有必要对这些术语进行一些澄清,我会先这样做。
多处理和多线程之间的根本区别在于它们是否共享相同的内存空间。线程共享对同一虚拟内存空间的访问,因此线程可以高效且轻松地交换其计算结果(零拷贝,完全在用户空间执行)。
另一方面,进程具有单独的虚拟内存空间。它们不能直接读取或写入其他进程的内存空间,就像一个人不能在不与他人交谈的情况下读懂或改变他人的想法一样。(允许这样做会违反内存保护并违背使用虚拟内存的目的。)要在进程之间交换数据,它们必须依赖操作系统的功能(例如消息传递),并且由于多种原因,这比线程使用的“共享内存”方案成本更高。一个原因是调用操作系统的消息传递机制需要进行系统调用,从而将代码执行从用户模式切换到内核模式,这很耗时;另一个原因可能是操作系统消息传递方案必须将数据字节从发送者的内存空间复制到接收者的内存空间,因此复制成本不为零。
说多线程程序只能使用一个 CPU 是错误的。许多人之所以这么说,是因为 CPython 实现的一个产物:全局解释器锁 (GIL)。由于 GIL,CPython 进程中的线程是序列化的。因此,看起来多线程 Python 程序只使用一个 CPU。
但一般来说,多线程计算机程序并不局限于一个核心,对于 Python 来说,不使用 GIL 的实现确实可以并行运行多个线程,也就是说,同时在多个 CPU 上运行。(请参阅https://wiki.python.org/moin/GlobalInterpreterLock)。
鉴于 CPython 是 Python 的主要实现,因此可以理解为什么多线程 Python 程序通常等同于绑定到单个核心。
对于具有 GIL 的 Python,释放多核能力的唯一方法是使用多处理(下面提到了例外情况)。但您的问题最好能够轻松划分为具有最少相互通信的并行子问题,否则将不得不进行大量的进程间通信,并且如上所述,使用操作系统的消息传递机制的开销将非常高昂,有时甚至高昂到完全抵消了并行处理的好处。如果您的问题的性质需要并发例程之间的密集通信,那么多线程是自然的选择。不幸的是,由于 GIL,使用 CPython 无法实现真正的、有效的并行多线程。在这种情况下,您应该意识到 Python 不是您项目的最佳工具,并考虑使用其他语言。
还有一种替代解决方案,即在用 C(或其他语言)编写的外部库中实现并发处理例程,并将该模块导入 Python。CPython GIL 不会费心阻止该外部库生成的线程。
那么,有了 GIL 的负担,CPython 中的多线程有什么好处吗?不过,正如其他答案所提到的,如果您正在进行 IO 或网络通信,它仍然会带来好处。在这些情况下,相关计算不是由您的 CPU 完成的,而是由其他设备完成的(在 IO 的情况下,磁盘控制器和 DMA(直接内存访问)控制器将以最少的 CPU 参与传输数据;在网络的情况下,NIC(网络接口卡)和 DMA 将在没有 CPU 参与的情况下处理大部分任务),因此一旦线程将此类任务委托给 NIC 或磁盘控制器,操作系统就可以将该线程置于休眠状态并切换到同一程序的其他线程来执行有用的工作。
在我的理解中,asyncio模块本质上是IO操作多线程的一个特例。
因此:CPU 密集型程序可以轻松地划分为在具有有限通信的多个进程上运行:如果不存在 GIL,则使用多线程(例如 Jython),如果存在 GIL,则使用多进程(例如 CPython)。
CPU 密集型程序,需要并发例程之间进行密集通信:如果不存在 GIL,请使用多线程,或使用其他编程语言。
大量 IO:asyncio
解决方案 7:
已经有很多好答案了。无法详细说明何时使用每个答案。这是一个更有趣的组合。多处理 + asyncio:https://pypi.org/project/aiomultiprocess/。
它的设计用例是 highio,但仍然利用尽可能多的可用核心。Facebook 使用这个库编写了某种基于 Python 的文件服务器。Asyncio 允许 IO 绑定流量,但多处理允许多个核心上的多个事件循环和线程。
来自 repo 的 Ex 代码:
import asyncio
from aiohttp import request
from aiomultiprocess import Pool
async def get(url):
async with request("GET", url) as response:
return await response.text("utf-8")
async def main():
urls = ["https://jreese.sh", ...]
async with Pool() as pool:
async for result in pool.map(get, urls):
... # process result
if __name__ == '__main__':
# Python 3.7
asyncio.run(main())
# Python 3.6
# loop = asyncio.get_event_loop()
# loop.run_until_complete(main())
补充一下,在 jupyter notebook 中效果不太好,因为 notebook 已经有一个 asyncio 循环在运行。这只是一个小提示,让你不要抓狂。
解决方案 8:
多处理可以并行运行。
多线程和asyncio不能并行运行。
使用Intel(R) Core(TM) i7-8700K CPU @ 3.70GHz和32.0 GB RAM ,我使用2 个进程、2 个线程和2 个 asyncio 任务2
计算了和100000
之间有多少个素数,如下所示。 *这是CPU 密集型计算:
多处理 | 多线程 | 异步 |
---|---|---|
23.87 秒 | 45.24 秒 | 44.77 秒 |
因为多处理可以并行运行,所以多处理比多线程和异步快两倍,如上所示。
我使用了以下 3 组代码:
多处理:
# "process_test.py"
from multiprocessing import Process
import time
start_time = time.time()
def test():
num = 100000
primes = 0
for i in range(2, num + 1):
for j in range(2, i):
if i % j == 0:
break
else:
primes += 1
print(primes)
if __name__ == "__main__": # This is needed to run processes on Windows
process_list = []
for _ in range(0, 2): # 2 processes
process = Process(target=test)
process_list.append(process)
for process in process_list:
process.start()
for process in process_list:
process.join()
print(round((time.time() - start_time), 2), "seconds") # 23.87 seconds
结果:
...
9592
9592
23.87 seconds
多线程:
# "thread_test.py"
from threading import Thread
import time
start_time = time.time()
def test():
num = 100000
primes = 0
for i in range(2, num + 1):
for j in range(2, i):
if i % j == 0:
break
else:
primes += 1
print(primes)
thread_list = []
for _ in range(0, 2): # 2 threads
thread = Thread(target=test)
thread_list.append(thread)
for thread in thread_list:
thread.start()
for thread in thread_list:
thread.join()
print(round((time.time() - start_time), 2), "seconds") # 45.24 seconds
结果:
...
9592
9592
45.24 seconds
异步:
# "asyncio_test.py"
import asyncio
import time
start_time = time.time()
async def test():
num = 100000
primes = 0
for i in range(2, num + 1):
for j in range(2, i):
if i % j == 0:
break
else:
primes += 1
print(primes)
async def call_tests():
tasks = []
for _ in range(0, 2): # 2 asyncio tasks
tasks.append(test())
await asyncio.gather(*tasks)
asyncio.run(call_tests())
print(round((time.time() - start_time), 2), "seconds") # 44.77 seconds
结果:
...
9592
9592
44.77 seconds
解决方案 9:
只是为了在和之间的比较中添加一个代码示例asyncio
,multithreading
因为我在这篇文章中没有看到:
asyncio
这是运行后输出确定性的代码
import asyncio
async def foo():
print('Start foo()')
for x in range(10):
await asyncio.sleep(0.1)
print(x, "foooo", x, "foooo",)
print('End foo()')
async def bar():
print('Start bar()')
for x in range(10):
await asyncio.sleep(0.1)
print(x, "barrr", x, "barrr",)
print('End bar()')
async def main():
await asyncio.gather(foo(), bar())
asyncio.run(main())
输出:
Start foo()
Start bar()
0 foooo 0 foooo
0 barrr 0 barrr
1 foooo 1 foooo
1 barrr 1 barrr
2 foooo 2 foooo
2 barrr 2 barrr
3 foooo 3 foooo
3 barrr 3 barrr
4 foooo 4 foooo
4 barrr 4 barrr
5 foooo 5 foooo
5 barrr 5 barrr
6 foooo 6 foooo
6 barrr 6 barrr
7 foooo 7 foooo
7 barrr 7 barrr
8 foooo 8 foooo
8 barrr 8 barrr
9 foooo 9 foooo
End foo()
9 barrr 9 barrr
End bar()
与运行的此代码相比multithreading
,输出不确定,并且会在运行之间发生变化
import threading
import time
def foo():
print('Start foo()')
for x in range(10):
time.sleep(0.1)
print(x, "foooo", x, "foooo",)
print('End foo()')
def bar():
print('Start bar()')
for x in range(10):
time.sleep(0.1)
print(x, "barrr", x, "barrr",)
print('End bar()')
t1 = threading.Thread(target=foo)
t2 = threading.Thread(target=bar)
t1.start()
t2.start()
t1.join()
t2.join()
输出:
Start bar()Start foo()
0 0 foooo 0 foooo
barrr 0 barrr
11 foooo barrr 11 foooobarrr
22 foooobarrr 22 barrr
foooo
3 3 barrr foooo3 3 foooobarrr
44 barrr 4 barrr
foooo 4 foooo
55 barrr foooo5 5barrr
foooo
66 foooo 6 barrr foooo
6 barrr
7 7 foooo 7 foooo
barrr 7 barrr
88 foooo 8 foooo
barrr 8 barrr
99 foooo barrr 99 foooobarrr
End foo()
End bar()
在multithreading
上下文中切换会自动发生,并且asyncio
只有在 await 语句之后才会发生上下文切换。
还要注意,在asyncio
没有await asyncio.sleep(0.1)
代码的示例中,代码的行为将像正常的同步代码一样,但multithreading
即使没有,示例中的代码也将保持异步time.sleep
解决方案 10:
如果您想同时执行大量 IO 任务(并发),请使用 Asyncio;如果您想并行使用多个 CPU 内核(并行),请使用 Multiprocessing。
由于全局解释器锁,Python 中的线程具有线程模型的所有缺点,而没有任何优点(由于 GIL,只有一个线程可以真正执行 Python 代码,所以线程实际上无法进行并行)。
如果您发现异步正在感染非 IO 重点部分的代码,请构建您的应用程序,以便您拥有一个子进程,其唯一职责是 IO 并在其中使用异步 IO,以及另一个不执行 IO(因此没有异步)的进程,并让这两个进程通过多处理模块中的队列进行通信。这可以为您提供一个摆脱异步 IO 的良好出口。
您还可以使用两个以上的进程,在这些情况下,尝试编写执行不同部分工作的进程(最好形成管道或 DAG),这通常比尝试让并行进程在执行相同操作时互相绊倒要容易得多。如果您的任务很容易并行化,请检查是否可以对其进行矢量化/批处理并卸载到快速库中,或者您是否可以在并行化之前(而不是之后)改进算法。不要试图通过投入更多资源来改进低效的程序。
解决方案 11:
多处理
每个进程都有自己的 Python 解释器,可以在处理器的单独核心上运行。Python 多处理是一个支持使用类似于线程模块的 API 生成进程的包。多处理包提供真正的并行性,通过使用子进程而不是线程有效地避开了全局解释器锁。
当您有 CPU 密集型任务时,请使用多处理。
多线程
Python 多线程允许您在进程内生成多个线程。这些线程可以共享进程的相同内存和资源。在 CPython 中,由于全局解释器锁定,在任何给定时间只有一个线程可以运行,因此您无法使用多个核心。由于 GIL 限制,Python 中的多线程不提供真正的并行性。
Asyncio
Asyncio 采用协作式多任务概念。Asyncio 任务在同一个线程上运行,因此不存在并行性,但它为开发人员提供了更好的控制,而不是像多线程中的操作系统那样。
此链接上有一篇关于 asyncio 相对于线程的优势的很好的讨论。
Lei Mao 有一篇关于 Python 并发的精彩博客
Python 中的多处理 VS 线程 VS AsyncIO 摘要
解决方案 12:
只是另一种观点
多线程和 asyncio 的并发性质有所不同。线程可以在任何执行点交错。操作系统控制何时踢出一个线程并给另一个线程一个机会(分配 CPU)。线程交错的时间没有一致性和可预测性。这就是为什么在多线程中会出现竞争条件。但是,只要您没有等待某事,asyncio 就是同步的。事件循环将继续执行,直到await
您可以清楚地看到协程交错的位置。当协程正在等待时,事件循环将踢出协程。从这个意义上说,多线程是一个“真正的”并发模型。正如我所说,除非您没有等待,否则 asyncio 不是并发的。我并不是说 asyncio 更好或更坏。
# Python 3.9.6
import asyncio
import time
async def test(name: str):
print(f"sleeping: {name}")
time.sleep(3) # imagine that this is big chunk of code/ or a number crunching block that takes a while to execute
print(f"awaiting sleep: {name}")
await asyncio.sleep(2)
print(f"woke up: {name}")
async def main():
print("In main")
tasks = [test(name="1"), test(name="2"), test(name="3")]
await asyncio.gather(*tasks)
if __name__ == "__main__":
asyncio.run(main())
输出:
In main
sleeping: 1
awaiting sleep: 1
sleeping: 2
awaiting sleep: 2
sleeping: 3
awaiting sleep: 3
woke up: 1
woke up: 2
woke up: 3
您可以看到顺序是可预测的,并且始终相同且同步。没有交错。而使用多线程时,您无法预测顺序(始终不同)。
解决方案 13:
许多人声称 asyncio 应该用于 I/O 密集型任务。然而,事实可能并非如此。我编写了一个简单的基准测试,并观察到对于 I/O 密集型任务,asyncio 和多线程的性能非常相似。特别是当 IO 特别慢时,两者之间几乎没有区别。
事实上,我认为只有在处理非常复杂的状态共享和争用时,asyncio 才是必要的。否则,如果使用 asyncio 会严重影响编码风格,最好避免使用它。