在 Uvicorn/FastAPI 内部发出下游 HTTP 请求的正确方法是什么?
- 2024-12-03 08:44:00
- admin 原创
- 218
问题描述:
我有一个 API 端点(FastAPI / Uvicorn)。除其他事项外,它还会向另一个 API 发出信息请求。当我使用多个并发请求加载我的 API 时,我开始收到以下错误:
h11._util.LocalProtocolError: can't handle event type ConnectionClosed when role=SERVER and state=SEND_RESPONSE
在正常环境中,我会利用request.session
,但我知道它不是完全线程安全的。
那么,在 FastAPI 等框架中使用请求的正确方法是什么,其中多个线程会requests
同时使用该库?
解决方案 1:
除了使用requests
,您还可以使用httpx
,它也提供了async
API (在执行测试时httpx
也在 FastAPI 的文档中建议async
使用,并且 FastAPI/Starlette 最近将 HTTP 客户端TestClient
从替换requests
为httpx
)。
下面的示例基于httpx
文档中给出的示例,演示了如何使用库发出异步 HTTP(s) 请求,随后将响应流回客户端。httpx.AsyncClient()
您可以用 代替requests.Session()
,这在向同一主机发出多个请求时很有用,因为底层 TCP 连接将被重用,而不是为每个请求都重新创建一个连接 - 因此,可以显着提高性能。 此外,它允许您在请求之间重用headers
和其他设置(例如proxies
和timeout
)以及保留。每次需要时,cookies
您都会生成一个并重用它。 您可以使用在完成后显式关闭客户端(您可以在事件处理程序中执行此操作)。 在此答案中也可以找到示例和更多详细信息。Client
`await client.aclose()`shutdown
例子
from fastapi import FastAPI
from starlette.background import BackgroundTask
from fastapi.responses import StreamingResponse
import httpx
app = FastAPI()
@app.on_event("startup")
async def startup_event():
app.state.client = httpx.AsyncClient()
@app.on_event('shutdown')
async def shutdown_event():
await app.state.client.aclose()
@app.get('/')
async def home():
client = app.state.client
req = client.build_request('GET', 'https://www.example.com/')
r = await client.send(req, stream=True)
return StreamingResponse(r.aiter_raw(), background=BackgroundTask(r.aclose))
示例(已更新)
由于startup
和shutdown
现已弃用(将来可能会完全删除),因此您可以使用lifespan
处理程序来初始化httpx
客户端,并在关闭时关闭客户端实例,类似于此答案中演示的内容。 Starlette 在其文档页面中特别提供了一个使用lifespan
处理程序和httpx
客户端的示例。 如Starlette 的文档中所述:
有的
lifespan
概念state
,它是一个字典,可以用来在生命周期和请求之间共享对象。
state
请求上接收到的是生命周期处理程序上接收到的状态的浅表副本。
因此,可以使用 在端点内访问添加到生命周期处理程序中的状态的对象request.state
。下面的示例使用流式响应与外部服务器通信,并将响应发送回客户端。有关(即、、等)响应流式方法的更多详细信息,请参阅此处。async
`httpxaiter_bytes()
aiter_text()`aiter_lines()
如果您想将 用作media_type
,StreamingResponse
则可以使用原始响应中的 ,如下所示:media_type=r.headers['content-type']
。但是,正如本答案中所述,您需要确保media_type
未设置为text/plain
;否则,内容将不会按预期在浏览器中传输,除非您禁用MIME 嗅探(请查看链接的答案以了解更多详细信息和解决方案)。
from fastapi import FastAPI, Request
from contextlib import asynccontextmanager
from fastapi.responses import StreamingResponse
from starlette.background import BackgroundTask
import httpx
@asynccontextmanager
async def lifespan(app: FastAPI):
# Initialise the Client on startup and add it to the state
async with httpx.AsyncClient() as client:
yield {'client': client}
# The Client closes on shutdown
app = FastAPI(lifespan=lifespan)
@app.get('/')
async def home(request: Request):
client = request.state.client
req = client.build_request('GET', 'https://www.example.com')
r = await client.send(req, stream=True)
return StreamingResponse(r.aiter_raw(), background=BackgroundTask(r.aclose))
如果由于某种原因,您需要在服务器端逐块读取内容,然后再响应客户端,那么您可以按如下方式执行此操作:
@app.get('/')
async def home(request: Request):
client = request.state.client
req = client.build_request('GET', 'https://www.example.com')
r = await client.send(req, stream=True)
async def gen():
async for chunk in r.aiter_raw():
yield chunk
await r.aclose()
return StreamingResponse(gen())
如果您不想使用流式响应,而是首先读取响应(这会将响应数据存储到服务器的 RAM 中;因此,您应该确保有足够的空间来容纳数据),则可以使用以下方法。请注意,使用仅httpx
适用r.json()
于响应数据为 JSON 格式的情况;否则,您可以直接返回PlainTextResponse
或自定义Response
,如下所示。
from fastapi import Response
from fastapi.responses import PlainTextResponse
@app.get('/')
async def home(request: Request):
client = request.state.client
req = client.build_request('GET', 'https://www.example.com')
r = await client.send(req)
content_type = r.headers.get('content-type')
if content_type == 'application/json':
return r.json()
elif content_type == 'text/plain':
return PlainTextResponse(content=r.text)
else:
return Response(content=r.content)
使用async
APIhttpx
意味着您必须使用 定义您的端点async def
;否则,您必须使用标准同步 API(对于def
vs,async def
请参阅此答案),如此github 讨论中所述:
是的。
HTTPX
旨在实现线程安全,是的,跨所有线程的单个客户端实例在连接池方面会比使用每个线程一个实例做得更好。
limits
您还可以使用关键字参数控制连接池大小Client
(请参阅池限制配置)。例如:
limits = httpx.Limits(max_keepalive_connections=5, max_connections=10)
client = httpx.Client(limits=limits)