FastAPI 以串行而非并行方式运行 API 调用

2024-11-18 08:41:00
admin
原创
14
摘要:问题描述:我有以下代码:import time from fastapi import FastAPI, Request app = FastAPI() @app.get("/ping") async def ping(request: Request): ...

问题描述:

我有以下代码:

import time
from fastapi import FastAPI, Request
    
app = FastAPI()
    
@app.get("/ping")
async def ping(request: Request):
        print("Hello")
        time.sleep(5)
        print("bye")
        return {"ping": "pong!"}

如果我在本地主机上运行我的代码(例如,http://localhost:8501/ping在同一个浏览器窗口的不同选项卡中),我会得到:

Hello
bye
Hello
bye

而不是:

Hello
Hello
bye
bye

我已阅读有关使用 的信息httpx,但仍然无法实现真正​​的并行化。问题出在哪里?


解决方案 1:

根据FastAPI 的文档:

当您使用 normal而不是声明路径操作函数(也称为端点)时,它将在随后的外部线程池中运行
而不是直接调用(因为它会阻塞服务器)。def`async def`await

另外,如下所述:

如果您正在使用与某些东西(数据库、API、文件系统等)通信且不支持使用 的第三方库await(目前大多数数据库库都是这种情况),那么只需使用 即可正常声明您的路径操作函数def

如果您的应用程序(不知何故)不需要与其他任何东西通信并等待其响应,请使用async def

如果您不知道,请使用正常def

注意:您可以根据需要在路径操作函数中混合使用def和,并使用最适合您的选项定义每个选项。FastAPI 将使用它们做正确的事情。async def

无论如何,在上述任何一种情况下,FastAPI仍然会异步工作并且速度极快。

但通过遵循上述步骤,它将能够进行一些性能优化。

因此,为了避免阻塞服务器, FastAPI 中的def端点(在异步编程上下文中,用 just 定义的函数def称为同步函数)仍将在事件循环中运行,但不是直接调用它,而是在外部线程池的单独线程def中运行端点(稍后将提供有关外部线程池的更多详细信息),因此,FastAPI 仍将异步工作换句话说,服务器将并发处理对这些端点的请求但将为每个传入请求生成一个新线程或重用线程池中的现有线程)。而端点直接在中运行——它在主(单)线程中运行,并在调用例如或其他 ASGI 服务器的等效方法时创建——也就是说,服务器也将并发/异步处理对此类端点的请求,只要这些端点/路由内调用非阻塞 I/O 绑定操作,例如等待(1)客户端通过网络发送数据、(2)读取磁盘中文件的内容、(3)数据库操作完成等(请查看此处)。await**async def`event loopuvicorn.run()await`async def

但是,如果 中定义的端点async defawait包含内部的某些协程(即,协程对象是调用async def函数的结果),则为了给 中的其他任务留出event loop运行时间(例如,对相同或其他端点的请求、后台任务等),对此类端点的每个请求都必须完全完成(即退出端点),然后才能将控制权交还给event loop并允许 中的其他任务event loop运行(如果您想获取和监视 中的所有待处理任务,请参阅此答案event loop)。换句话说,在这种情况下,服务器将被“阻止”,因此任何请求都将按顺序处理。

话虽如此,您仍然应该考虑用 定义一个端点async def,如果您的端点不必在内部执行阻塞操作并等待它响应,而是用来返回简单的JSON 数据,一个HTMLResponse(参见FastAPI 文档)或一个FileResponse(在这种情况下,文件内容将被异步读取,使用await anyio.open_file(),如在相关类实现中看到的那样),即使在这种情况下端点内部FileResponse没有语句,因为当直接在事件循环中运行这样一个简单的端点时,FastAPI 可能会表现得更好,而不是在与外部线程池分开的线程中运行端点(如果端点是用 normal 定义的,情况就会如此)。但是,如果您必须返回复杂而庞大的 JSON 数据,要么在端点内自行对它们进行编码,如前面链接的答案所示,要么使用 Starlette或 FastAPI 的/ ,所有这些类都会以同步方式使用和/对数据进行编码,在这种情况下,您应该考虑用 normal 定义端点——相关答案可以在这里和这里找到。await`defJSONResponseORJSONResponseUJSONResponsejson.dumps()orjson.dumps()ujson.dumps()`def

请注意,相同的概念不仅适用于端点,也适用于用作 的StreamingResponse生成器(请参阅StreamingResponse类实现)或Background Tasks(请参阅BackgroundTask类实现和此答案)的函数,这意味着 FastAPI 在后台也将在来自同一def外部线程池的单独线程中运行用 normal 定义的此类函数;而如果改为用 定义此类函数,它们将直接在 中运行。为了在单独的线程中运行上述端点或函数,FastAPI 使用了 Starlette 的异步函数,该函数在后台调用。该外部线程池的默认工作线程数为,可以根据需要进行调整——请查看此答案以了解有关外部线程池以及如何调整线程数的更多详细信息。因此,在阅读完这个答案后,您应该能够决定是否应该用 或 定义 FastAPI 端点、的生成器或函数。async def`event loopawaitrun_in_threadpool()anyio.to_thread.run_sync()40StreamingResponseBackgroundTaskdefasync def`

Python 的async def函数和await

关键字await(仅在async def函数内有效)将函数控制权传回给event loop。换句话说,它暂停周围协程的执行,并告诉event loop让其他某个任务运行,直到该awaited 任务完成。请注意,仅仅因为您可以在端点内使用 定义自定义函数,async def然后对其进行操作,并不意味着您的代码将异步工作,如果该自定义函数包含对 的调用、CPU 绑定任务、非异步 I/O 库或任何其他与异步 Python 代码不兼容的阻塞调用。例如,在 FastAPI 中,当使用 的方法(例如和 )时,FastAPI/Starlette 实际上在后台在与前面描述的外部线程池不同的线程中调用相应的同步File 方法(使用)并s 它;否则,这样的方法/操作会阻塞- 您可以通过查看类的实现了解更多信息。await`async deftime.sleep()asyncUploadFileawait file.read()await file.write()` `run_in_threadpool()awaitevent loopUploadFile`

请注意, 这async并不意味着并行,而是并发。如前所述,使用async和的异步代码await多次被总结为使用协程。协程是协作的(或协作多任务),这意味着“在任何给定时间,具有协程的程序运行其中一个协程,并且此正在运行的协程仅在明确请求暂停时才会暂停其执行”(有关协程的更多信息,请参阅此处和此处)。

正如本文所述:

具体来说,每当当前正在运行的协程执行到表达式时await,协程可能会被暂停,而另一个之前被暂停的协程可能会恢复执行(如果之前被暂停的协程已经返回了一个值)。当块async for从异步迭代器请求下一个值或async with进入或退出块时,也可能发生暂停,因为这些操作await在后台使用。

但是,如果在函数/端点内部直接执行/调用阻塞 I/O 绑定或 CPU 绑定操作async def,它将阻塞事件循环,因此主线程也将被阻塞(在进程/工作器的主线程中运行)。因此,诸如端点中的event loop阻塞操作将阻塞整个服务器(如问题中提供的代码示例所示)。因此,如果您的端点不进行任何调用,您可以改为使用 normal 声明它,在这种情况下,FastAPI 将在与外部线程池不同的线程中运行它,如前所述(以下部分提供了更多解决方案)。例:time.sleep()`async defasyncdef`await

@app.get("/ping")
def ping(request: Request):
    #print(request.client)
    print("Hello")
    time.sleep(5)
    print("bye")
    return "pong"

否则,如果您必须在端点内执行的函数是async您必须执行的函数await,则应使用 定义端点async def。为了演示这一点,下面的示例使用了asyncio.sleep()函数(来自asyncio库),它提供了非阻塞睡眠操作。该await asyncio.sleep()方法将暂停周围协程的执行(直到睡眠操作完成),从而允许 中的其他任务运行。这里和这里也event loop给出了类似的例子。

import asyncio
 
@app.get("/ping")
async def ping(request: Request):
    #print(request.client)
    print("Hello")
    await asyncio.sleep(5)
    print("bye")
    return "pong"

如果两个请求同时(大约)到达,则上述两个端点都会按照问题中提到的顺序将指定的消息打印到屏幕上,即:

Hello
Hello
bye
bye

重要提示

当使用 Web 浏览器第二次(第三次,依此类推)调用同一端点时,请记住从与浏览器主会话隔离的选项卡执行此操作;否则,后续请求(即第一个请求之后的请求)可能会被浏览器阻止(在客户端),因为浏览器可能在发送下一个请求之前等待服务器对上一个请求的响应。这至少是 Chrome Web 浏览器的常见行为,因为在再次请求同一资源之前,它会等待查看请求的结果并检查结果是否可以缓存。

print(request.client)您可以通过在端点内部使用来确认这一点,您将看到所有传入请求的hostnameport数字都是相同的(如果请求是从同一浏览器窗口/会话中打开的选项卡发起的);否则,port每个请求的数字通常都会不同——因此,这些请求将由服务器按顺序处理,因为浏览器/客户端首先按顺序发送它们。为了解决这个问题,您可以:

  1. 重新加载相同的选项卡(正在运行),或者

  2. 在隐身窗口中打开新标签页,或

  3. 使用不同的 Web 浏览器/客户端发送请求,或者

  4. 使用该httpx库和awaitable进行异步 HTTP 请求,它允许同时执行多个异步操作,然后按照传递给该函数的 awaitable(任务)的相同顺序返回结果列表(有关更多详细信息,请查看此答案)。 asyncio.gather()

例子

import httpx
import asyncio

URLS = ['http://127.0.0.1:8000/ping'] * 2

async def send(url, client):
    return await client.get(url, timeout=10)

async def main():
    async with httpx.AsyncClient() as client:
        tasks = [send(url, client) for url in URLS]
        responses = await asyncio.gather(*tasks)
        print(*[r.json() for r in responses], sep='
')

asyncio.run(main())

如果您必须调用不同的端点,而这些端点可能需要不同的时间来处理请求,并且您希望在服务器返回响应后立即在客户端打印出来 - 而不是等待asyncio.gather()收集所有任务的结果并按照任务传递给函数的相同顺序打印出来send()- 您可以将send()上面示例中的函数替换为下面显示的函数:

async def send(url, client):
    res = await client.get(url, timeout=10)
    print(res.json())
    return res

Async/await和阻止 I/O 密集型或 CPU 密集型操作

如果您需要定义一个 FastAPI 端点(或 的StreamingResponse生成器或后台任务函数)async def(因为您可能需要await其中的一些协程),但也有一些同步阻塞 I/O 绑定或 CPU 绑定操作(计算密集型任务)会阻塞event loop(基本上是整个服务器)并且不会让其他请求通过,例如:

@app.post("/ping")
async def ping(file: UploadFile = File(...)):
    print("Hello")
    try:
        contents = await file.read()
        res = cpu_bound_task(contents)  # this would block the event loop
    finally:
        await file.close()
    print("bye")
    return "pong"

然后:

  1. 您应该检查是否可以将端点的定义更改为 normaldef而不是async def。例如,如果端点中唯一需要等待的方法是读取文件内容的方法(如您在下面的评论部分中提到的),那么您可以将端点参数的类型声明为bytes(即file: bytes = File()),这样 FastAPI 就会为您读取文件,您会以 的形式收到内容bytes。因此,没有必要使用await file.read()。请注意,上述方法适用于小文件,因为整个文件内容将存储到内存中(请参阅有关参数的文档File);因此,如果您的系统没有足够的 RAM 来容纳累积的数据(例如,如果您有 8GB 的​​ RAM,则无法加载 50GB 的文件),您的应用程序最终可能会崩溃。或者,您可以直接调用.read()的方法SpooledTemporaryFile(可以通过对象.file的属性访问UploadFile),这样您就不必再次使用await.read()方法 - 并且由于您现在可以用 normal 声明端点def,每个请求将在单独的线程中运行(示例如下)。有关如何上传File以及Starlette / FastAPI如何SpooledTemporaryFile在幕后使用的更多详细信息,请查看此答案和此答案。

@app.post("/ping")
def ping(file: UploadFile = File(...)):
    print("Hello")
    try:
        contents = file.file.read()
        res = cpu_bound_task(contents)
    finally:
        file.file.close()
    print("bye")
    return "pong"
  1. 使用模块中的 FastAPI(Starlette)run_in_threadpool()函数concurrency(如 @tiangolo在此处建议的那样),该函数“将在单独的线程中运行该函数,以确保主线程(运行协程的地方)不会被阻塞”(请参阅​​此处)。run_in_threadpool是一个awaitable 函数,其第一个参数是一个普通函数,后面的参数直接传递给该函数。它支持序列关键字参数。

from fastapi.concurrency import run_in_threadpool

res = await run_in_threadpool(cpu_bound_task, contents)
  1. 或者,在获取正在运行的using之后,使用asyncio's来运行任务,在这种情况下,您可以让它完成并返回结果,然后再转到下一行代码。传递给executor参数,将使用默认执行器;即:loop.run_in_executor()`event loopasyncio.get_running_loop()awaitNoneThreadPoolExecutor`

import asyncio

loop = asyncio.get_running_loop()
res = await loop.run_in_executor(None, cpu_bound_task, contents)

或者,如果您想要传递关键字参数,则可以使用lambda表达式(例如lambda: cpu_bound_task(some_arg=contents)),或者最好是,functools.partial()这是文档中特别推荐的loop.run_in_executor()

import asyncio
from functools import partial

loop = asyncio.get_running_loop()
res = await loop.run_in_executor(None, partial(cpu_bound_task, some_arg=contents))

在 Python 3.9+ 中,您还可以使用asyncio.to_thread()在单独的线程中异步运行同步函数 — 本质上,它使用await loop.run_in_executor(None, func_call)后台,如 的实现asyncio.to_thread()中所示。该to_thread()函数采用要执行的阻塞函数的名称以及该函数的任何参数(*args和/或**kwargs),然后返回可执行的协程await。示例:

import asyncio

res = await asyncio.to_thread(cpu_bound_task, contents)

请注意,正如本答案中所述,传递Noneexecutor参数不会ThreadPoolExecutor在每次调用时创建新的await loop.run_in_executor(None, ...),而是重新使用具有默认工作线程数(即)的默认执行器。因此,根据应用程序的要求,该数量可能不够。在这种情况下,您应该使用自定义的。例如:min(32, os.cpu_count() + 4)`ThreadPoolExecutor`

import asyncio
import concurrent.futures

loop = asyncio.get_running_loop()
with concurrent.futures.ThreadPoolExecutor() as pool:
    res = await loop.run_in_executor(pool, cpu_bound_task, contents)

我强烈建议您查看上面链接的答案,以了解使用run_in_threadpool()和之间的区别run_in_executor(),以及如何ThreadPoolExecutor在应用程序启动时创建可重复使用的自定义,并根据需要调整最大工作线程数。

  1. ThreadPoolExecutor将成功防止被event loop阻止,但不会为您带来并行运行代码所期望的性能提升;尤其是当需要执行诸如此处描述的任务(例如音频或图像处理、机器学习等)时。因此,最好在单独的进程中运行 CPU 密集型任务(使用,如下所示),同样,您可以将其与集成,以便它完成其工作并返回结果。如此处所述,保护程序的入口点以避免递归生成子进程等非常重要。基本上,您的代码必须在下。CPU-bound`ProcessPoolExecutorasyncioawait`if __name__ == '__main__'

import concurrent.futures

loop = asyncio.get_running_loop()
with concurrent.futures.ProcessPoolExecutor() as pool:
    res = await loop.run_in_executor(pool, cpu_bound_task, contents) 

再次,我建议您查看前面链接的答案,了解如何ProcessPoolExecutor在应用程序启动时创建可重用项。您应该也会发现这个答案很有帮助。此外,上述方法的替代解决方案包括生成新线程或使用,如本答案asyncio.create_task()所示,以及生成新进程,使用模块而不是。multiprocessing`concurrent.futures`

  1. 使用更多工作程序来利用多核 CPU,以便并行运行多个进程并能够处理更多请求。例如,uvicorn main:app --workers 4(如果您使用Gunicorn 作为带有 Uvicorn 工作程序的进程管理器,请查看此答案)。当使用 1 个工作程序时,仅运行一个进程。当使用多个工作程序时,这将产生多个进程(所有单线程)。每个进程都有一个单独的全局解释器锁(GIL)以及它自己的event loop,它在每个进程的主线程中运行并在其线程中执行所有任务。这意味着,只有一个线程可以锁定每个进程的解释器;除非您使用额外的线程,无论是在 外部还是内部event loop,例如,当使用 时,或ThreadPoolExecutor用normal而不是loop.run_in_executor定义端点/后台任务/的生成器,以及在调用的方法时(有关详细信息,请参阅本答案的前两段)。StreamingResponse`defasync defUploadFile`

注意:每个 worker “都有自己的内容、变量和内存”。这意味着global变量/对象等不会在进程/worker 之间共享。在这种情况下,您应该考虑使用数据库存储或键值存储(缓存),如此处和此处所述。此外,请注意“如果您的代码消耗了大量内存,则每个进程都会消耗等量的内存”。

  1. 如果您需要执行繁重的后台计算,并且不一定需要由同一进程运行它(例如,您不需要共享内存,变量等),那么您可能会受益于使用其他更大的工具,例如Celery,如FastAPI 的文档中所述。 使用AsyncIOSchedulerfrom apscheduler,如本答案中所示,也可能是另一种选择。

解决方案 2:

问:
“...有什么问题吗?”

答:
FastAPI 文档明确指出该框架使用进程内任务(从Starlette继承)。

这本身就意味着,所有这些任务都会竞争接收(不时)Python 解释器的 GIL 锁 - 实际上是 MUTEX 恐怖的全局解释器锁,它实际上重新设置了[SERIAL]所有数量的 Python 解释器进程内线程,
使其作为一个且仅有一个工作,而所有其他线程都在等待......

在细粒度上,您会看到结果 - 如果为第二个到达的 http 请求生成另一个处理程序(从第二个 FireFox-tab 手动启动)实际上花费的时间比睡眠所花费的时间长,则 GIL 锁交错时间量循环的结果(在下一轮 GIL 锁释放获取轮盘之前~ 100 [ms]所有等待一个都可以工作)Python 解释器的内部工作不会显示更多细节,您可以使用更多细节(取决于 O/S 类型或版本)从这里查看更多线程内LoD,就像在执行的异步装饰代码中一样:~ 100 [ms]

import time
import threading
from   fastapi import FastAPI, Request

TEMPLATE = "INF[{0:_>20d}]: t_id( {1: >20d} ):: {2:}"

print( TEMPLATE.format( time.perf_counter_ns(),
                        threading.get_ident(),
                       "Python Interpreter __main__ was started ..."
                        )
...
@app.get("/ping")
async def ping( request: Request ):
        """                                __doc__
        [DOC-ME]
        ping( Request ):  a mock-up AS-IS function to yield
                          a CLI/GUI self-evidence of the order-of-execution
        RETURNS:          a JSON-alike decorated dict

        [TEST-ME]         ...
        """
        print( TEMPLATE.format( time.perf_counter_ns(),
                                threading.get_ident(),
                               "Hello..."
                                )
        #------------------------------------------------- actual blocking work
        time.sleep( 5 )
        #------------------------------------------------- actual blocking work
        print( TEMPLATE.format( time.perf_counter_ns(),
                                threading.get_ident(),
                               "...bye"
                                )
        return { "ping": "pong!" }

最后,但并非最不重要的一点是,不要犹豫,阅读有关所有其他基于鲨鱼线程的代码可能遭受的...甚至导致...幕后...

备忘录

GIL 锁、基于线程的池、异步装饰器、阻塞和事件处理的混合 —— 不确定性的可靠组合 & HWY2HELL ;o)

相关推荐
  为什么项目管理通常仍然耗时且低效?您是否还在反复更新电子表格、淹没在便利贴中并参加每周更新会议?这确实是耗费时间和精力。借助软件工具的帮助,您可以一目了然地全面了解您的项目。如今,国内外有足够多优秀的项目管理软件可以帮助您掌控每个项目。什么是项目管理软件?项目管理软件是广泛行业用于项目规划、资源分配和调度的软件。它使项...
项目管理软件   601  
  华为IPD与传统研发模式的8大差异在快速变化的商业环境中,产品研发模式的选择直接决定了企业的市场响应速度和竞争力。华为作为全球领先的通信技术解决方案供应商,其成功在很大程度上得益于对产品研发模式的持续创新。华为引入并深度定制的集成产品开发(IPD)体系,相较于传统的研发模式,展现出了显著的差异和优势。本文将详细探讨华为...
IPD流程是谁发明的   7  
  如何通过IPD流程缩短产品上市时间?在快速变化的市场环境中,产品上市时间成为企业竞争力的关键因素之一。集成产品开发(IPD, Integrated Product Development)作为一种先进的产品研发管理方法,通过其结构化的流程设计和跨部门协作机制,显著缩短了产品上市时间,提高了市场响应速度。本文将深入探讨如...
华为IPD流程   9  
  在项目管理领域,IPD(Integrated Product Development,集成产品开发)流程图是连接创意、设计与市场成功的桥梁。它不仅是一个视觉工具,更是一种战略思维方式的体现,帮助团队高效协同,确保产品按时、按质、按量推向市场。尽管IPD流程图可能初看之下显得错综复杂,但只需掌握几个关键点,你便能轻松驾驭...
IPD开发流程管理   8  
  在项目管理领域,集成产品开发(IPD)流程被视为提升产品上市速度、增强团队协作与创新能力的重要工具。然而,尽管IPD流程拥有诸多优势,其实施过程中仍可能遭遇多种挑战,导致项目失败。本文旨在深入探讨八个常见的IPD流程失败原因,并提出相应的解决方法,以帮助项目管理者规避风险,确保项目成功。缺乏明确的项目目标与战略对齐IP...
IPD流程图   8  
热门文章
项目管理软件有哪些?
云禅道AD
禅道项目管理软件

云端的项目管理软件

尊享禅道项目软件收费版功能

无需维护,随时随地协同办公

内置subversion和git源码管理

每天备份,随时转为私有部署

免费试用