FastAPI 中的 run_in_threadpool() 和 run_in_executor() 之间的性能结果有所不同
- 2025-01-21 09:01:00
- admin 原创
- 91
问题描述:
这是我的 FastAPI 应用程序的一个最小可重现示例。我有一个奇怪的行为,我不确定我是否理解原因。
我正在使用 ApacheBench ( ab
) 发送多个请求,如下所示:
ab -n 1000 -c 50 -H 'accept: application/json' -H 'x-data-origin: source' 'http://localhost:8001/test/async'
FastAPI 应用
import time
import asyncio
import enum
from typing import Any
from fastapi import FastAPI, Path, Body
from starlette.concurrency import run_in_threadpool
app = FastAPI()
loop = asyncio.get_running_loop()
def sync_func() -> None:
time.sleep(3)
print("sync func")
async def sync_async_with_fastapi_thread() -> None:
await run_in_threadpool( time.sleep, 3)
print("sync async with fastapi thread")
async def sync_async_func() -> None:
await loop.run_in_executor(None, time.sleep, 3)
async def async_func() -> Any:
await asyncio.sleep(3)
print("async func")
@app.get("/test/sync")
def test_sync() -> None:
sync_func()
print("sync")
@app.get("/test/async")
async def test_async() -> None:
await async_func()
print("async")
@app.get("/test/sync_async")
async def test_sync_async() -> None:
await sync_async_func()
print("sync async")
@app.get("/test/sync_async_fastapi")
async def test_sync_async_with_fastapi_thread() -> None:
await sync_async_with_fastapi_thread()
print("sync async with fastapi thread")
以下是 ApacheBench 的结果:
异步(asyncio.sleep):*并发级别:50
测试时间:63.528 秒
完成请求:1000
失败请求:0
总共传输:128000 字节
HTML 传输:4000 字节
每秒请求数:15.74 [#/秒](平均)
每个请求的时间:3176.407 [毫秒](平均)
每个请求的时间:63.528 [毫秒](所有并发请求的平均值)传输速率:1.97 [千字节/秒] 已接收*
同步(与 time.sleep):
并发级别:50
*测试时间:78.615 秒
完成请求:1000
失败请求:0
总共传输:128000 字节
HTML 传输:4000 字节
每秒请求数:12.72 [#/秒](平均)
每个请求的时间:3930.751 [毫秒](平均)
每个请求的时间:78.615 [毫秒](所有并发请求的平均值)传输速率:1.59 [千字节/秒] 已接收*
sync_async(使用 run_in_executor 的时间睡眠): *并发级别:50
测试时间:256.201 秒
完成请求:1000
失败请求:0
总共传输:128000 字节
HTML 传输:4000 字节
每秒请求数:3.90 [#/秒](平均)
每个请求的时间:12810.038 [毫秒](平均)
每个请求的时间:256.201 [毫秒](所有并发请求的平均值)传输速率:0.49 [千字节/秒] 已接收*
sync_async_fastapi(使用 run_in 线程池进行时间休眠):
*并发级别:50
测试时间:78.877 秒
完成请求:1000
失败请求:0
总共传输:128000 字节
HTML 传输:4000 字节
每秒请求数:12.68 [#/秒](平均)
每个请求的时间:3943.841 [毫秒](平均)
每个请求的时间:78.877 [毫秒](所有并发请求的平均值)传输速率:1.58 [千字节/秒] 已接收*
总之,我遇到了令人惊讶的结果差异,尤其是在使用时run_in_executor
,我遇到了明显更高的平均时间(12 秒)。我不明白这个结果。
--- 编辑 ---
AKX 回答之后。
Here the code working as expected:
import time
import asyncio
from anyio import to_thread
to_thread.current_default_thread_limiter().total_tokens = 200
loop = asyncio.get_running_loop()
executor = ThreadPoolExecutor(max_workers=100)
def sync_func() -> None:
time.sleep(3)
print("sync func")
async def sync_async_with_fastapi_thread() -> None:
await run_in_threadpool( time.sleep, 3)
print("sync async with fastapi thread")
async def sync_async_func() -> None:
await loop.run_in_executor(executor, time.sleep, 3)
async def async_func() -> Any:
await asyncio.sleep(3)
print("async func")
@app.get("/test/sync")
def test_sync() -> None:
sync_func()
print("sync")
@app.get("/test/async")
async def test_async() -> None:
await async_func()
print("async")
@app.get("/test/sync_async")
async def test_sync_async() -> None:
await sync_async_func()
print("sync async")
@app.get("/test/sync_async_fastapi")
async def test_sync_async_with_fastapi_thread() -> None:
await sync_async_with_fastapi_thread()
print("sync async with fastapi thread")
解决方案 1:
使用run_in_threadpool()
FastAPI 与Starlette完全兼容(并基于 Starlette ),因此,使用 FastAPI 您可以获得 Starlette 的所有功能,例如run_in_threadpool()
。Starlette在幕后run_in_threadpool()
使用的“将在单独的线程中运行同步阻塞函数,以确保主线程(运行协程的地方)不会被阻塞”——有关更多详细信息,请参阅此答案和 AnyIO 的使用线程文档。anyio.to_thread.run_sync()
调用run_in_threadpool()
— 在内部调用anyio.to_thread.run_sync()
,随后 —AsyncIOBackend.run_sync_in_worker_thread()
将返回一个协程,然后该协程将被await
编辑以获取同步函数的最终结果(例如result = await run_in_threadpool(...)
),因此,FastAPI 仍将异步工作(而不是直接在事件循环中调用该同步函数,在这种情况下,会阻塞事件循环,从而阻塞主线程)。
从 Starlette 的源代码(上面给出了链接)中可以看出,该run_in_threadpool()
函数简单如下所示(支持序列和关键字参数):
async def run_in_threadpool(
func: typing.Callable[P, T], *args: P.args, **kwargs: P.kwargs
) -> T:
if kwargs: # pragma: no cover
# run_sync doesn't accept 'kwargs', so bind them in here
func = functools.partial(func, **kwargs)
return await anyio.to_thread.run_sync(func, *args)
正如AnyIO 的文档中所述:
调整默认最大工作线程数
默认的AnyIO 工作线程限制器的值为,这意味着任何没有明确参数
40
的调用都
将导致产生最大数量的线程。您可以像这样调整此限制:to_thread.run_sync()
`limiter**
40`**from anyio import to_thread async def foo(): # Set the maximum number of worker threads to 60 to_thread.current_default_thread_limiter().total_tokens = 60
笔记
AnyIO 的默认线程池限制器不会影响 上的默认线程池执行器
asyncio
。
由于FastAPI 使用 Startlette 的concurrency
模块在外部线程池中运行阻塞函数(FastAPI 也使用相同的def
线程池来运行用 normal而不是定义的端点async def
,如本答案中所述),线程限制器的默认值(如上所示)也适用于此处,即线程最大值 - 请参阅返回具有默认线程数的40
相关方法。因此,像您的情况一样,同时发送请求会导致线程池不足,这意味着线程池中没有足够的线程来同时处理所有传入的请求。AsyncIOBackend.current_default_thread_limiter()
`CapacityLimiter`50
如前所述,可以通过增加线程数来调整该值,这可能会导致性能结果的改善——始终取决于def
您的 API 预计同时服务的端点(或async def
对内部进行调用的端点)的请求数run_in_threadpool()
。例如,如果您希望 API 一次向此类端点提供不超过 50 个请求,则将最大线程数设置为 50。请注意,除了端点之外def
,如果您的 FastAPI 应用程序还使用同步/阻塞 后台任务和/或StreamingResponse
的生成器和/或依赖项(同步/阻塞函数是指用 normaldef
而不是定义的函数async def
),甚至是UploadFile
的async
方法,例如await file.read()
/ await file.close()
/等(它们都调用相应的同步 def
文件方法,run_in_threadpool()
在后台使用),那么您可以根据需要增加线程数,因为 FastAPI 实际上也在同一个外部线程池中运行所有这些函数——本答案对此进行了详细解释。
请注意,使用下面描述的方法对调整工作线程的数量具有相同的效果:
from anyio.lowlevel import RunVar
from anyio import CapacityLimiter
RunVar("_default_thread_limiter").set(CapacityLimiter(60))
但是,最好遵循 AnyIO 官方文档提供的方法(如前所示)。在应用程序启动时使用lifespan
事件处理程序完成此操作也是一个好主意,如此处所示。
在下面的工作示例中,由于/sync
端点是用 normaldef
而不是定义的async def
,因此 FastAPI 将在与外部线程池不同的线程中运行它,然后运行await
它,从而确保事件循环(以及主线程和整个服务器)不会由于将在该端点内执行的阻塞操作(阻塞 IO 绑定或 CPU 绑定)而被阻塞。
工作示例1
from fastapi import FastAPI
from contextlib import asynccontextmanager
from anyio import to_thread
import time
@asynccontextmanager
async def lifespan(app: FastAPI):
to_thread.current_default_thread_limiter().total_tokens = 60
yield
app = FastAPI(lifespan=lifespan)
@app.get("/sync")
def test_sync() -> None:
time.sleep(3)
print("sync")
@app.get('/get_available_threads')
async def get_available_threads():
return to_thread.current_default_thread_limiter().available_tokens
使用 ApacheBench,您可以按如下方式测试上述示例,它将1000
总共发送请求并50
同时发送(-n
:请求数,-c
:并发请求数):
ab -n 1000 -c 50 "http://localhost:8000/sync"
在对上述示例运行性能测试时,如果您/get_available_threads
从浏览器调用端点,例如,您会看到可用的http://localhost:8000/get_available_threads
线程数始终为 10 或更高(因为在此测试中一次仅使用 50 个线程,但线程限制器设置为),这意味着将 AnyIO 线程限制器上的最大线程数设置为远高于您需求的数字,如其他答案和您最近的示例中所示,不会带来任何性能改进;相反,您最终会得到一些“闲置”在那里而未被使用的线程。如前所述,最大线程数应取决于 (1) 您的 API 预计同时处理的请求数(即对端点的调用数或在内部调用的端点数),(2) FastAPI 本身将在线程池中运行的任何其他阻塞任务/函数,以及 (3) 服务器计算机的可用资源。60
`200def
async def`run_in_threadpool()
下面的示例与上面的示例相同,但是不是让 FastAPI 本身处理端点内的阻塞操作def
(通过def
在外部线程池中运行端点并await
对其进行操作),而是现在使用 定义端点async def
(这意味着 FastAPI 将直接在事件循环中运行它),并且在端点内部,run_in_threadpool()
(返回一个await
able)用于运行阻塞操作(即time.sleep()
在示例中)。对下面的示例执行基准测试将产生与上一个示例类似的结果。
工作示例2
from fastapi import FastAPI
from fastapi.concurrency import run_in_threadpool
from contextlib import asynccontextmanager
from anyio import to_thread
import time
@asynccontextmanager
async def lifespan(app: FastAPI):
to_thread.current_default_thread_limiter().total_tokens = 60
yield
app = FastAPI(lifespan=lifespan)
@app.get("/sync_async_run_in_tp")
async def test_sync_async_with_run_in_threadpool() -> None:
await run_in_threadpool(time.sleep, 3)
print("sync_async using FastAPI's run_in_threadpool")
@app.get('/get_available_threads')
async def get_available_threads():
return to_thread.current_default_thread_limiter().available_tokens
使用 ApacheBench,您可以按如下方式测试上述示例:
ab -n 1000 -c 50 "http://localhost:8000/sync_async_run_in_tp"
使用loop.run_in_executor()
ThreadPoolExecutor
在使用 时asyncio
—在loop.run_in_executor()
使用 获取正在运行的事件循环后asyncio.get_running_loop()
— 可以传递None
给executor
参数,这将导致使用默认执行器;即ThreadPoolExecutor
。请注意,在调用loop.run_in_executor()
并传递None
给executor
参数时,这不会在每次调用时创建 的新实例ThreadPoolExecutor
,例如 。await loop.run_in_executor(None, time.sleep, 3)
相反,默认值ThreadPoolExecutor
仅在第一次执行此操作时初始化一次,但对于后续使用参数集 调用loop.run_in_executor()
,executor
PythonNone
会重用该 的同一个实例ThreadPoolExecutor
(因此为默认执行器)。这可以在的源代码loop.run_in_executor()
中看到。这意味着,调用 时可以创建的线程数await loop.run_in_executor(None, ...)
限制为类中默认的线程工作器数ThreadPoolExecutor
。
如文档中所述— 以及此处ThreadPoolExecutor
的实现所示— 默认情况下,参数设置为,在这种情况下,工作线程数基于以下公式设置:。该函数返回当前系统中的逻辑CPU数量。如本文所述,物理核心是指硬件(例如芯片)中提供的 CPU 核心数,而逻辑核心是考虑超线程后的CPU 核心数。例如,如果您的机器有 4 个物理核心,每个核心都具有超线程(大多数现代 CPU 都有),那么 Python 将看到 8 个 CPU 并默认向池分配 12 个线程(8 个 CPU + 4 个)(Python 将线程数限制为 32 以“避免在多核机器上消耗惊人的大量资源”;但是,当使用自定义时,人们总是可以自行调整参数,而不是使用默认参数)。您可以按如下方式检查系统上的默认工作线程数:max_workers
`Nonemin(32, os.cpu_count() + 4)
os.cpu_count()max_workers
ThreadPoolExecutor`
import concurrent.futures
# create a thread pool with the default number of worker threads
pool = concurrent.futures.ThreadPoolExecutor()
# report the number of worker threads chosen by default
# Note: `_max_workers` is a protected variable and may change in the future
print(pool._max_workers)
现在,如原始示例所示,您没有使用自定义的ThreadPoolExecutor
;而是在每次请求到达时通过调用(在由端点触发的函数内部)使用默认值。假设您的机器有 4 个物理核心并且启用了超线程(如前面的示例所述),那么 默认值的默认工作线程数为12。这意味着,基于原始示例和触发该函数的端点,您的应用程序一次只能处理 12 个并发请求。这是与使用 (默认情况下附带分配的线程)相比性能结果存在差异的主要原因。尽管在这两种情况下,同时发送请求时都会导致线程池匮乏,但使用的端点(在您的示例中)性能更好,只是因为创建的默认线程数大于默认值(在您的另一个端点中)使用的线程数。ThreadPoolExecutor
`await loop.run_in_executor(None, time.sleep, 3)sync_async_func()
/test/sync_async
ThreadPoolExecutor/test/sync_async
await loop.run_in_executor(None, time.sleep, 3)run_in_threadpool()
4050
run_in_threadpool()
ThreadPoolExecutor`
解决这个问题的一种方法是每次请求到达时创建一个新的实例ThreadPoolExecutor
(自己创建,而不是使用默认执行器),并在任务完成后终止它(使用with
语句),如下所示:
import concurrent.futures
import asyncio
loop = asyncio.get_running_loop()
with concurrent.futures.ThreadPoolExecutor(max_workers=1) as pool:
await loop.run_in_executor(pool, time.sleep, 3)
尽管上述方法应该可以正常工作,但最好在应用程序启动时实例化ThreadPoolExecutor
一次,根据需要调整工作线程数,并在需要时重用该执行器。话虽如此,如果由于任何原因,您在重用时在完成任务后遇到内存泄漏 - 即不再需要但未释放的内存ThreadPoolExecutor
(可能是由于您可能用于该阻塞任务的外部库没有正确释放内存),您可能会发现ThreadPoolExecutor
每次创建一个新的实例(如上所示)更合适。但请注意,如果这是ProcessPoolExecutor
相反的,反复创建和销毁许多进程可能会变得计算成本高昂。创建和销毁太多线程也会消耗大量内存。
下面是一个完整的工作示例,演示了如何创建可重用的自定义ThreadPoolExecutor
。在使用 ApacheBench 运行性能测试(使用并发请求,如您的问题中所述和如下所示)时,/get_active_threads
从您的浏览器调用端点,例如,您会看到活动线程数从未超过(50 个并发线程 + 1,即主线程),尽管在下面的示例中将参数设置为。 这仅仅是因为,在这个性能测试中,应用程序永远不需要同时处理超过 个请求。 此外,如果有空闲线程可用,则不会产生新线程(从而节省资源) - 请参阅相关的实现部分。 因此,再次,如果您不希望 FastAPI 应用程序一次处理超过 50 个请求(到使用该请求的端点),则再次使用 初始化(如您最近的更新中所示)是不必要的。http://localhost:8000/get_active_threads
`5051
max_workers60
50ThreadPoolExecutor
ThreadPoolExecutormax_workers=100
ThreadPoolExecutor`
工作示例
from fastapi import FastAPI, Request
from contextlib import asynccontextmanager
import concurrent.futures
import threading
import asyncio
import time
@asynccontextmanager
async def lifespan(app: FastAPI):
pool = concurrent.futures.ThreadPoolExecutor(max_workers=60)
yield {'pool': pool}
pool.shutdown()
app = FastAPI(lifespan=lifespan)
@app.get("/sync_async")
async def test_sync_async(request: Request) -> None:
loop = asyncio.get_running_loop()
await loop.run_in_executor(request.state.pool, time.sleep, 3)
print("sync_async")
@app.get('/get_active_threads')
async def get_active_threads():
return threading.active_count()
使用 ApacheBench,您可以按如下方式测试上述示例:
ab -n 1000 -c 50 "http://localhost:8000/sync_async"
最后说明
一般来说,您应该始终尽可能使用异步代码(即使用async
/ await
),因为async
代码(也称为协程)直接在事件循环中运行- 事件循环在线程(通常是主线程)中运行并在其线程中执行所有回调和任务。 这意味着只有一个线程可以锁定解释器; 因此,避免了上下文切换的额外开销(即,CPU 从一个执行线程跳转到另一个执行线程)。 但是,在处理同步阻止 IO 绑定任务时,您可以(1)使用 定义端点def
并让 FastAPI 在后台处理它,如前所述,以及在此答案中,或者(2)使用 定义端点async def
并run_in_threadpool()
自行使用 在单独的线程中运行该阻塞任务await
,或者(3)使用 定义端点并使用async def
和自定义asyncio
的loop.run_in_executor()
(最好可重用)ThreadPoolExecutor
,根据需要调整工作线程的数量。当需要执行阻塞的 CPU 密集型任务时,虽然在与外部线程池不同的线程中运行此类任务,然后await
执行它们可以成功防止事件循环被阻塞,但它不会提供您期望通过并行运行代码获得的性能改进。因此,对于 CPU 密集型任务,可以选择使用loop.run_in_executor()
with — 相关示例可在此答案ProcessPoolExecutor
中找到(请注意,一般使用进程时,需要使用 明确保护入口点)。if __name__ == '__main__'
要在后台运行任务,而不必等待它们完成后再继续执行端点中的其余代码,您可以使用 FastAPI ,如此处和此处BackgroundTasks
所示。如果用定义后台任务函数,则 FastAPI 将直接在事件循环中运行它,而如果它是用 normal 定义的,则 FastAPI 将使用和返回的协程(与 API 端点相同的概念)。当您需要在后台运行一个函数,但不一定在返回 FastAPI 响应后触发它(在中就是这种情况)时,另一种选择是使用,如此处答案和此答案所示。如果您需要执行大量后台计算,并且不一定需要由同一进程运行它,则使用其他更大的工具(如 Celery)可能会受益。async def
`defrun_in_threadpool()
awaitasync def
BackgroundTasks`asyncio.create_task()
最后,关于最佳/最大工作线程数,我建议阅读这篇文章(也可以查看这篇文章ThreadPoolExecutor
以了解更多详细信息)。正如文章中解释的那样:
根据系统中的资源或您打算在任务中使用的资源数量,将线程池中的工作线程数量限制为您希望完成的异步任务的数量非常重要。
或者,考虑到您打算使用的资源容量更大,您可能希望大幅增加工作线程的数量。
[...]
系统中的线程数通常比 CPU(物理或逻辑)数多。原因是线程用于 IO 密集型任务,而不是 CPU 密集型任务。这意味着线程用于等待相对较慢的资源响应的任务,例如硬盘驱动器、DVD 驱动器、打印机、网络连接等等。
因此,根据您的具体需求,应用程序中有数十、数百甚至数千个线程并不罕见。拥有超过一个或几千个线程并不常见。如果您需要这么多线程,那么可能首选替代解决方案,例如。
AsyncIO
另外,在同一篇文章中:
线程数是否与
ThreadPoolExecutor
CPU 或核心数匹配?工作线程的数量与系统中的CPU 或 CPU 核心的数量
ThreadPoolExecutor
无关。您可以根据需要执行的任务数、可用的本地系统资源量(例如内存)以及您打算在任务中访问的资源限制(例如与远程服务器的连接)来配置线程数。
我应该使用多少个线程?
如果您有数百个任务,您可能应该将线程数设置为等于任务数。
如果您有数千个任务,则可能应该将线程数限制为数百或 1,000 个。
如果您的应用程序计划在未来多次执行,您可以测试不同数量的线程并比较总体执行时间,然后选择一个可提供最佳性能的线程数。您可能希望在这些测试中使用随机休眠操作来模拟任务。
中的最大工作线程数是多少
ThreadPoolExecutor
?中没有最大工作线程数
ThreadPoolExecutor
。尽管如此,根据您可用的主内存 (RAM) 大小,系统可以创建的线程数量会有上限。
在超出主内存之前,添加新线程和执行更多任务的收益将达到递减点。这是因为操作系统必须在线程之间切换,这称为上下文切换。如果同时激活的线程太多,程序在上下文切换上所花的时间可能比实际执行任务的时间还要多。
对于许多应用程序来说,合理的上限是数百个线程到几千个线程。现代系统上超过几千个线程可能会导致过多的上下文切换,具体取决于您的系统和正在执行的任务类型。
解决方案 2:
starlette.concurrency.run_in_threadpool
`anyio.to_thread.run_sync()`在引擎盖下使用。
默认情况下,并发数限制为 40,因此 50 个并发请求将导致线程池不足;您可以使用以下命令增加该限制
from anyio import to_thread
to_thread.current_default_thread_limiter().total_tokens = 200
类似地,如果您没有传入,则run_in_executor
使用默认值ThreadPoolExecutor
;默认执行器的默认工作程序数量为min(32, os.cpu_count() + 4)
,因此根据您的配置,这也可能太少。
解决方案 3:
设置
--limit-concurrency 100
为uvicorn
更改默认值
default_thread_limiter.total_tokens
。默认限制为 40。
from fastapi import FastAPI
from anyio.to_thread import get_asynclib
app = FastAPI()
@app.on_event("startup")
def startup():
print("starting app ... ")
get_asynclib().current_default_thread_limiter().total_tokens = 100
print('default _default_thread_limiter', get_asynclib().current_default_thread_limiter().total_tokens)
运行同步任务使用
run_in_threadpool
。
from starlette.concurrency import run_in_threadpool
@app.post("/blocking")
async def create_flow():
await run_in_threadpool(sub_task, t=100)
def sub_task(t=1):
import subprocess
cmd = f'echo sleeping...; sleep {t}'
stdout, stderr = subprocess.Popen(cmd, shell=True).communicate()
或使用:
run_in_threadpool(func, *args, limiter=get_asynclib().CapacityLimiter(10))
--limit-concurrency
当设置为5
且为 3 时导出日志default_thread_limiter.total_tokens
。
# start the web app
starting app ...
default _default_thread_limiter 3
# start call use `cURL`.
call sync ..
sleeping ...
call sync ..
sleeping ...
call sync ..
sleeping ...
# The first three execute normally, the rest are suspended
call sync ..
call sync ..
# Exceed concurrency limit(5) for `uvicorn`.
Exceeded concurrency limit.
Exceeded concurrency limit.
...
注意:Chrome 或其他浏览器限制
6
每个主机名的连接数,可能会影响测试。