FastAPI 以串行而非并行方式运行 API 调用
- 2024-11-18 08:41:00
- admin 原创
- 14
问题描述:
我有以下代码:
import time
from fastapi import FastAPI, Request
app = FastAPI()
@app.get("/ping")
async def ping(request: Request):
print("Hello")
time.sleep(5)
print("bye")
return {"ping": "pong!"}
如果我在本地主机上运行我的代码(例如,http://localhost:8501/ping
在同一个浏览器窗口的不同选项卡中),我会得到:
Hello
bye
Hello
bye
而不是:
Hello
Hello
bye
bye
我已阅读有关使用 的信息httpx
,但仍然无法实现真正的并行化。问题出在哪里?
解决方案 1:
根据FastAPI 的文档:
当您使用 normal而不是声明路径操作函数(也称为端点)时,它将在随后的外部线程池中运行
,而不是直接调用(因为它会阻塞服务器)。def
`async def`await
另外,如下所述:
如果您正在使用与某些东西(数据库、API、文件系统等)通信且不支持使用 的第三方库
await
(目前大多数数据库库都是这种情况),那么只需使用 即可正常声明您的路径操作函数def
。如果您的应用程序(不知何故)不需要与其他任何东西通信并等待其响应,请使用
async def
。如果您不知道,请使用正常
def
。注意:您可以根据需要在路径操作函数中混合使用
def
和,并使用最适合您的选项定义每个选项。FastAPI 将使用它们做正确的事情。async def
无论如何,在上述任何一种情况下,FastAPI仍然会异步工作并且速度极快。
但通过遵循上述步骤,它将能够进行一些性能优化。
因此,为了避免阻塞服务器, FastAPI 中的def
端点(在异步编程上下文中,用 just 定义的函数def
称为同步函数)仍将在事件循环中运行,但不是直接调用它,而是在外部线程池的单独线程def
中运行端点(稍后将提供有关外部线程池的更多详细信息),因此,FastAPI 仍将异步工作。换句话说,服务器将并发处理对这些端点的请求(但将为每个传入请求生成一个新线程或重用线程池中的现有线程)。而端点直接在中运行——它在主(单)线程中运行,并在调用例如或其他 ASGI 服务器的等效方法时创建——也就是说,服务器也将并发/异步处理对此类端点的请求,只要在这些端点/路由内调用非阻塞 I/O 绑定操作,例如等待(1)客户端通过网络发送数据、(2)读取磁盘中文件的内容、(3)数据库操作完成等(请查看此处)。await
**async def
`event loopuvicorn.run()
await`async def
但是,如果 中定义的端点async def
不await
包含内部的某些协程(即,协程对象是调用async def
函数的结果),则为了给 中的其他任务留出event loop
运行时间(例如,对相同或其他端点的请求、后台任务等),对此类端点的每个请求都必须完全完成(即退出端点),然后才能将控制权交还给event loop
并允许 中的其他任务event loop
运行(如果您想获取和监视 中的所有待处理任务,请参阅此答案event loop
)。换句话说,在这种情况下,服务器将被“阻止”,因此任何请求都将按顺序处理。
话虽如此,您仍然应该考虑用 定义一个端点async def
,如果您的端点不必在内部执行阻塞操作并等待它响应,而是用来返回简单的JSON 数据,一个HTMLResponse
(参见FastAPI 文档)或一个FileResponse
(在这种情况下,文件内容将被异步读取,使用await anyio.open_file()
,如在相关类实现中看到的那样),即使在这种情况下端点内部FileResponse
没有语句,因为当直接在事件循环中运行这样一个简单的端点时,FastAPI 可能会表现得更好,而不是在与外部线程池分开的线程中运行端点(如果端点是用 normal 定义的,情况就会如此)。但是,如果您必须返回复杂而庞大的 JSON 数据,要么在端点内自行对它们进行编码,如前面链接的答案所示,要么使用 Starlette或 FastAPI 的/ ,所有这些类都会以同步方式使用和/对数据进行编码,在这种情况下,您应该考虑用 normal 定义端点——相关答案可以在这里和这里找到。await
`defJSONResponse
ORJSONResponseUJSONResponse
json.dumps()orjson.dumps()
ujson.dumps()`def
请注意,相同的概念不仅适用于端点,也适用于用作 的StreamingResponse
生成器(请参阅StreamingResponse
类实现)或Background Tasks
(请参阅BackgroundTask
类实现和此答案)的函数,这意味着 FastAPI 在后台也将在来自同一def
外部线程池的单独线程中运行用 normal 定义的此类函数;而如果改为用 定义此类函数,它们将直接在 中运行。为了在单独的线程中运行上述端点或函数,FastAPI 使用了 Starlette 的异步函数,该函数在后台调用。该外部线程池的默认工作线程数为,可以根据需要进行调整——请查看此答案以了解有关外部线程池以及如何调整线程数的更多详细信息。因此,在阅读完这个答案后,您应该能够决定是否应该用 或 定义 FastAPI 端点、的生成器或函数。async def
`event loopawait
run_in_threadpool()anyio.to_thread.run_sync()
40StreamingResponse
BackgroundTaskdef
async def`
Python 的async def
函数和await
关键字await
(仅在async def
函数内有效)将函数控制权传回给event loop
。换句话说,它暂停周围协程的执行,并告诉event loop
让其他某个任务运行,直到该await
ed 任务完成。请注意,仅仅因为您可以在端点内使用 定义自定义函数,async def
然后对其进行操作,并不意味着您的代码将异步工作,如果该自定义函数包含对 的调用、CPU 绑定任务、非异步 I/O 库或任何其他与异步 Python 代码不兼容的阻塞调用。例如,在 FastAPI 中,当使用 的方法(例如和 )时,FastAPI/Starlette 实际上在后台在与前面描述的外部线程池不同的线程中调用相应的同步File 方法(使用)并s 它;否则,这样的方法/操作会阻塞- 您可以通过查看类的实现了解更多信息。await
`async deftime.sleep()
asyncUploadFile
await file.read()await file.write()` `run_in_threadpool()
awaitevent loop
UploadFile`
请注意, 这async
并不意味着并行,而是并发。如前所述,使用async
和的异步代码await
多次被总结为使用协程。协程是协作的(或协作多任务),这意味着“在任何给定时间,具有协程的程序只运行其中一个协程,并且此正在运行的协程仅在明确请求暂停时才会暂停其执行”(有关协程的更多信息,请参阅此处和此处)。
正如本文所述:
具体来说,每当当前正在运行的协程执行到表达式时
await
,协程可能会被暂停,而另一个之前被暂停的协程可能会恢复执行(如果之前被暂停的协程已经返回了一个值)。当块async for
从异步迭代器请求下一个值或async with
进入或退出块时,也可能发生暂停,因为这些操作await
在后台使用。
但是,如果在函数/端点内部直接执行/调用阻塞 I/O 绑定或 CPU 绑定操作async def
,它将阻塞事件循环,因此主线程也将被阻塞(在进程/工作器的主线程中运行)。因此,诸如端点中的event loop
阻塞操作将阻塞整个服务器(如问题中提供的代码示例所示)。因此,如果您的端点不进行任何调用,您可以改为使用 normal 声明它,在这种情况下,FastAPI 将在与外部线程池不同的线程中运行它,如前所述(以下部分提供了更多解决方案)。例:time.sleep()
`async defasync
def`await
@app.get("/ping")
def ping(request: Request):
#print(request.client)
print("Hello")
time.sleep(5)
print("bye")
return "pong"
否则,如果您必须在端点内执行的函数是async
您必须执行的函数await
,则应使用 定义端点async def
。为了演示这一点,下面的示例使用了asyncio.sleep()
函数(来自asyncio
库),它提供了非阻塞睡眠操作。该await asyncio.sleep()
方法将暂停周围协程的执行(直到睡眠操作完成),从而允许 中的其他任务运行。这里和这里也event loop
给出了类似的例子。
import asyncio
@app.get("/ping")
async def ping(request: Request):
#print(request.client)
print("Hello")
await asyncio.sleep(5)
print("bye")
return "pong"
如果两个请求同时(大约)到达,则上述两个端点都会按照问题中提到的顺序将指定的消息打印到屏幕上,即:
Hello
Hello
bye
bye
重要提示
当使用 Web 浏览器第二次(第三次,依此类推)调用同一端点时,请记住从与浏览器主会话隔离的选项卡执行此操作;否则,后续请求(即第一个请求之后的请求)可能会被浏览器阻止(在客户端),因为浏览器可能在发送下一个请求之前等待服务器对上一个请求的响应。这至少是 Chrome Web 浏览器的常见行为,因为在再次请求同一资源之前,它会等待查看请求的结果并检查结果是否可以缓存。
print(request.client)
您可以通过在端点内部使用来确认这一点,您将看到所有传入请求的hostname
和port
数字都是相同的(如果请求是从同一浏览器窗口/会话中打开的选项卡发起的);否则,port
每个请求的数字通常都会不同——因此,这些请求将由服务器按顺序处理,因为浏览器/客户端首先按顺序发送它们。为了解决这个问题,您可以:
重新加载相同的选项卡(正在运行),或者
在隐身窗口中打开新标签页,或
使用不同的 Web 浏览器/客户端发送请求,或者
使用该
httpx
库和awaitable进行异步 HTTP 请求,它允许同时执行多个异步操作,然后按照传递给该函数的 awaitable(任务)的相同顺序返回结果列表(有关更多详细信息,请查看此答案)。asyncio.gather()
例子:
import httpx
import asyncio
URLS = ['http://127.0.0.1:8000/ping'] * 2
async def send(url, client):
return await client.get(url, timeout=10)
async def main():
async with httpx.AsyncClient() as client:
tasks = [send(url, client) for url in URLS]
responses = await asyncio.gather(*tasks)
print(*[r.json() for r in responses], sep='
')
asyncio.run(main())
如果您必须调用不同的端点,而这些端点可能需要不同的时间来处理请求,并且您希望在服务器返回响应后立即在客户端打印出来 - 而不是等待asyncio.gather()
收集所有任务的结果并按照任务传递给函数的相同顺序打印出来send()
- 您可以将send()
上面示例中的函数替换为下面显示的函数:
async def send(url, client):
res = await client.get(url, timeout=10)
print(res.json())
return res
Async
/await
和阻止 I/O 密集型或 CPU 密集型操作
如果您需要定义一个 FastAPI 端点(或 的StreamingResponse
生成器或后台任务函数)async def
(因为您可能需要await
其中的一些协程),但也有一些同步阻塞 I/O 绑定或 CPU 绑定操作(计算密集型任务)会阻塞event loop
(基本上是整个服务器)并且不会让其他请求通过,例如:
@app.post("/ping")
async def ping(file: UploadFile = File(...)):
print("Hello")
try:
contents = await file.read()
res = cpu_bound_task(contents) # this would block the event loop
finally:
await file.close()
print("bye")
return "pong"
然后:
您应该检查是否可以将端点的定义更改为 normal
def
而不是async def
。例如,如果端点中唯一需要等待的方法是读取文件内容的方法(如您在下面的评论部分中提到的),那么您可以将端点参数的类型声明为bytes
(即file: bytes = File()
),这样 FastAPI 就会为您读取文件,您会以 的形式收到内容bytes
。因此,没有必要使用await file.read()
。请注意,上述方法适用于小文件,因为整个文件内容将存储到内存中(请参阅有关参数的文档File
);因此,如果您的系统没有足够的 RAM 来容纳累积的数据(例如,如果您有 8GB 的 RAM,则无法加载 50GB 的文件),您的应用程序最终可能会崩溃。或者,您可以直接调用.read()
的方法SpooledTemporaryFile
(可以通过对象.file
的属性访问UploadFile
),这样您就不必再次使用await
该.read()
方法 - 并且由于您现在可以用 normal 声明端点def
,每个请求将在单独的线程中运行(示例如下)。有关如何上传File
以及Starlette / FastAPI如何SpooledTemporaryFile
在幕后使用的更多详细信息,请查看此答案和此答案。
@app.post("/ping")
def ping(file: UploadFile = File(...)):
print("Hello")
try:
contents = file.file.read()
res = cpu_bound_task(contents)
finally:
file.file.close()
print("bye")
return "pong"
使用模块中的 FastAPI(Starlette)
run_in_threadpool()
函数concurrency
(如 @tiangolo在此处建议的那样),该函数“将在单独的线程中运行该函数,以确保主线程(运行协程的地方)不会被阻塞”(请参阅此处)。run_in_threadpool
是一个await
able 函数,其第一个参数是一个普通函数,后面的参数直接传递给该函数。它支持序列和关键字参数。
from fastapi.concurrency import run_in_threadpool
res = await run_in_threadpool(cpu_bound_task, contents)
或者,在获取正在运行的using之后,使用
asyncio
's来运行任务,在这种情况下,您可以让它完成并返回结果,然后再转到下一行代码。传递给executor参数,将使用默认执行器;即:loop.run_in_executor()
`event loopasyncio.get_running_loop()
awaitNone
ThreadPoolExecutor`
import asyncio
loop = asyncio.get_running_loop()
res = await loop.run_in_executor(None, cpu_bound_task, contents)
或者,如果您想要传递关键字参数,则可以使用lambda
表达式(例如lambda: cpu_bound_task(some_arg=contents)
),或者最好是,functools.partial()
这是文档中特别推荐的loop.run_in_executor()
:
import asyncio
from functools import partial
loop = asyncio.get_running_loop()
res = await loop.run_in_executor(None, partial(cpu_bound_task, some_arg=contents))
在 Python 3.9+ 中,您还可以使用asyncio.to_thread()
在单独的线程中异步运行同步函数 — 本质上,它使用await loop.run_in_executor(None, func_call)
后台,如 的实现asyncio.to_thread()
中所示。该to_thread()
函数采用要执行的阻塞函数的名称以及该函数的任何参数(*args
和/或**kwargs
),然后返回可执行的协程await
。示例:
import asyncio
res = await asyncio.to_thread(cpu_bound_task, contents)
请注意,正如本答案中所述,传递None
给executor
参数不会ThreadPoolExecutor
在每次调用时创建新的await loop.run_in_executor(None, ...)
,而是重新使用具有默认工作线程数(即)的默认执行器。因此,根据应用程序的要求,该数量可能不够。在这种情况下,您应该使用自定义的。例如:min(32, os.cpu_count() + 4)
`ThreadPoolExecutor`
import asyncio
import concurrent.futures
loop = asyncio.get_running_loop()
with concurrent.futures.ThreadPoolExecutor() as pool:
res = await loop.run_in_executor(pool, cpu_bound_task, contents)
我强烈建议您查看上面链接的答案,以了解使用run_in_threadpool()
和之间的区别run_in_executor()
,以及如何ThreadPoolExecutor
在应用程序启动时创建可重复使用的自定义,并根据需要调整最大工作线程数。
ThreadPoolExecutor
将成功防止被event loop
阻止,但不会为您带来并行运行代码所期望的性能提升;尤其是当需要执行诸如此处描述的任务(例如音频或图像处理、机器学习等)时。因此,最好在单独的进程中运行 CPU 密集型任务(使用,如下所示),同样,您可以将其与集成,以便它完成其工作并返回结果。如此处所述,保护程序的入口点以避免递归生成子进程等非常重要。基本上,您的代码必须在下。CPU-bound
`ProcessPoolExecutorasyncio
await`if __name__ == '__main__'
import concurrent.futures
loop = asyncio.get_running_loop()
with concurrent.futures.ProcessPoolExecutor() as pool:
res = await loop.run_in_executor(pool, cpu_bound_task, contents)
再次,我建议您查看前面链接的答案,了解如何ProcessPoolExecutor
在应用程序启动时创建可重用项。您应该也会发现这个答案很有帮助。此外,上述方法的替代解决方案包括生成新线程或使用,如本答案asyncio.create_task()
所示,以及生成新进程,使用模块而不是。multiprocessing
`concurrent.futures`
使用更多工作程序来利用多核 CPU,以便并行运行多个进程并能够处理更多请求。例如,
uvicorn main:app --workers 4
(如果您使用Gunicorn 作为带有 Uvicorn 工作程序的进程管理器,请查看此答案)。当使用 1 个工作程序时,仅运行一个进程。当使用多个工作程序时,这将产生多个进程(所有单线程)。每个进程都有一个单独的全局解释器锁(GIL)以及它自己的event loop
,它在每个进程的主线程中运行并在其线程中执行所有任务。这意味着,只有一个线程可以锁定每个进程的解释器;除非您使用额外的线程,无论是在 外部还是内部event loop
,例如,当使用 时,或ThreadPoolExecutor
用normal而不是loop.run_in_executor
定义端点/后台任务/的生成器,以及在调用的方法时(有关详细信息,请参阅本答案的前两段)。StreamingResponse
`defasync def
UploadFile`
注意:每个 worker “都有自己的内容、变量和内存”。这意味着global
变量/对象等不会在进程/worker 之间共享。在这种情况下,您应该考虑使用数据库存储或键值存储(缓存),如此处和此处所述。此外,请注意“如果您的代码消耗了大量内存,则每个进程都会消耗等量的内存”。
如果您需要执行繁重的后台计算,并且不一定需要由同一进程运行它(例如,您不需要共享内存,变量等),那么您可能会受益于使用其他更大的工具,例如Celery,如FastAPI 的文档中所述。 使用
AsyncIOScheduler
fromapscheduler
,如本答案中所示,也可能是另一种选择。
解决方案 2:
问:
“...有什么问题吗?”
答:
FastAPI 文档明确指出该框架使用进程内任务(从Starlette继承)。
这本身就意味着,所有这些任务都会竞争接收(不时)Python 解释器的 GIL 锁 - 实际上是 MUTEX 恐怖的全局解释器锁,它实际上重新设置了[SERIAL]
所有数量的 Python 解释器进程内线程,
使其作为一个且仅有一个工作,而所有其他线程都在等待......
在细粒度上,您会看到结果 - 如果为第二个到达的 http 请求生成另一个处理程序(从第二个 FireFox-tab 手动启动)实际上花费的时间比睡眠所花费的时间长,则 GIL 锁交错时间量循环的结果(在下一轮 GIL 锁释放获取轮盘之前~ 100 [ms]
所有等待一个都可以工作)Python 解释器的内部工作不会显示更多细节,您可以使用更多细节(取决于 O/S 类型或版本)从这里查看更多线程内LoD,就像在执行的异步装饰代码中一样:~ 100 [ms]
import time
import threading
from fastapi import FastAPI, Request
TEMPLATE = "INF[{0:_>20d}]: t_id( {1: >20d} ):: {2:}"
print( TEMPLATE.format( time.perf_counter_ns(),
threading.get_ident(),
"Python Interpreter __main__ was started ..."
)
...
@app.get("/ping")
async def ping( request: Request ):
""" __doc__
[DOC-ME]
ping( Request ): a mock-up AS-IS function to yield
a CLI/GUI self-evidence of the order-of-execution
RETURNS: a JSON-alike decorated dict
[TEST-ME] ...
"""
print( TEMPLATE.format( time.perf_counter_ns(),
threading.get_ident(),
"Hello..."
)
#------------------------------------------------- actual blocking work
time.sleep( 5 )
#------------------------------------------------- actual blocking work
print( TEMPLATE.format( time.perf_counter_ns(),
threading.get_ident(),
"...bye"
)
return { "ping": "pong!" }
最后,但并非最不重要的一点是,不要犹豫,阅读有关所有其他基于鲨鱼线程的代码可能遭受的...甚至导致...幕后...
备忘录
GIL 锁、基于线程的池、异步装饰器、阻塞和事件处理的混合 —— 不确定性的可靠组合 & HWY2HELL ;o)
- 2024年20款好用的项目管理软件推荐,项目管理提效的20个工具和技巧
- 2024年开源项目管理软件有哪些?推荐5款好用的项目管理工具
- 项目管理软件有哪些?推荐7款超好用的项目管理工具
- 项目管理软件哪个最好用?盘点推荐5款好用的项目管理工具
- 项目管理软件有哪些最好用?推荐6款好用的项目管理工具
- 项目管理软件有哪些,盘点推荐国内外超好用的7款项目管理工具
- 2024项目管理软件排行榜(10类常用的项目管理工具全推荐)
- 项目管理软件排行榜:2024年项目经理必备5款开源项目管理软件汇总
- 2024年常用的项目管理软件有哪些?推荐这10款国内外好用的项目管理工具
- 项目管理必备:盘点2024年13款好用的项目管理软件