在 FastAPI 端点中进行 concurrent.futures.ThreadPoolExecutor 调用是否危险?
- 2024-12-20 08:37:00
- admin 原创
- 115
问题描述:
我有以下测试代码:
import concurrent.futures
import urllib.request
URLS = ['http://www.foxnews.com/',
'http://www.cnn.com/',
'http://europe.wsj.com/',
'http://www.bbc.co.uk/',
'http://some-made-up-domain.com/']
# Retrieve a single page and report the URL and contents
def load_url(url, timeout):
with urllib.request.urlopen(url, timeout=timeout) as conn:
return conn.read()
# We can use a with statement to ensure threads are cleaned up promptly
with concurrent.futures.ThreadPoolExecutor() as executor:
# Start the load operations and mark each future with its URL
future_to_url = {executor.submit(load_url, url, 60): url for url in URLS}
for future in concurrent.futures.as_completed(future_to_url):
url = future_to_url[future]
try:
data = future.result()
except Exception as exc:
print('%r generated an exception: %s' % (url, exc))
else:
print('%r page is %d bytes' % (url, len(data)))
我需要concurrent.futures.ThreadPoolExecutor
在 FastAPI 端点中使用部分代码。
我担心的是 API 调用数量和线程包含的影响。担心创建过多线程及其相关后果,导致主机资源不足,应用程序和/或主机崩溃。
对于这种方法您有什么想法或困惑吗?
解决方案 1:
您应该使用提供API 的HTTPX
库。如本答案所述,每次需要时,您都会生成一个并重复使用它。要使用 发出异步请求,您需要一个。async
`ClientHTTPX
AsyncClient`
您还可以使用limits
上的关键字参数来控制连接池大小,Client
该参数采用 的一个实例httpx.Limits
。例如:
limits = httpx.Limits(max_keepalive_connections=5, max_connections=10)
client = httpx.AsyncClient(limits=limits)
您可以根据需要调整上述内容。根据池限制配置的文档:
max_keepalive_connections
,允许的保持连接数,或None
始终允许。(默认值为20)
max_connections
,允许的最大连接数,或None
无限制。(默认100)
keepalive_expiry
,空闲保持连接的时间限制(以秒为单位),或None
无限制。(默认5)
如果您还想调整超时,则可以使用timeout
参数设置单个请求或Client
/AsyncClient
实例的超时,这会导致给定的超时被用作使用此客户端发出的请求的默认值(Timeout
另请参阅类的实现)。您可以详细指定超时行为;例如,设置read
超时参数将指定等待接收数据块(即响应主体的块)的最大持续时间。如果HTTPX
无法在此时间范围内接收数据,ReadTimeout
则会引发异常。如果设置为None
而不是某个正数值,则不会出现timeout
。所有操作的read
默认值为 5 秒timeout
。
当您使用完毕后,您可以使用它await client.aclose()
来明确关闭AsyncClient
(例如,这可以在关闭事件处理程序中完成)。
要运行多个异步操作(因为在调用 API 端点时需要请求五个不同的 URL),您可以使用 awaitable 。 asyncio.gather()
它将执行操作并按照传递给该函数的awaitables ( )async
的顺序返回结果列表。tasks
工作示例
from fastapi import FastAPI, Request
from contextlib import asynccontextmanager
import httpx
import asyncio
URLS = ['https://www.foxnews.com/',
'https://edition.cnn.com/',
'https://www.nbcnews.com/',
'https://www.bbc.co.uk/',
'https://www.reuters.com/']
@asynccontextmanager
async def lifespan(app: FastAPI):
# customize settings
limits = httpx.Limits(max_keepalive_connections=5, max_connections=10)
timeout = httpx.Timeout(5.0, read=15.0) # 15s timeout on read. 5s timeout elsewhere.
# Initialize the Client on startup and add it to the state
async with httpx.AsyncClient(limits=limits, timeout=timeout) as client:
yield {'client': client}
# The Client closes on shutdown
app = FastAPI(lifespan=lifespan)
async def send(url, client):
return await client.get(url)
@app.get('/')
async def main(request: Request):
client = request.state.client
tasks = [send(url, client) for url in URLS]
responses = await asyncio.gather(*tasks)
return [r.text[:50] for r in responses] # for demo purposes, only return the first 50 chars of each response
如果您不想将整个响应主体读入 RAM,则可以使用中的流响应httpx
,以及使用 FastAPI StreamingResponse
,如此答案中所述并在下面演示:
from fastapi import FastAPI, Request
from fastapi.responses import StreamingResponse
from contextlib import asynccontextmanager
import httpx
import asyncio
URLS = ['https://www.foxnews.com/',
'https://edition.cnn.com/',
'https://www.nbcnews.com/',
'https://www.bbc.co.uk/',
'https://www.reuters.com/']
@asynccontextmanager
async def lifespan(app: FastAPI):
# customize settings
limits = httpx.Limits(max_keepalive_connections=5, max_connections=10)
timeout = httpx.Timeout(5.0, read=15.0) # 15s timeout on read. 5s timeout elsewhere.
# Initialize the Client on startup and add it to the state
async with httpx.AsyncClient(limits=limits, timeout=timeout) as client:
yield {'client': client}
# The Client closes on shutdown
app = FastAPI(lifespan=lifespan)
async def send(url, client):
req = client.build_request('GET', url)
return await client.send(req, stream=True)
async def iter_content(responses):
for r in responses:
async for chunk in r.aiter_text():
yield chunk[:50] # for demo purposes, return only the first 50 chars of each response and then break the loop
yield '
'
break
await r.aclose()
@app.get('/')
async def main(request: Request):
client = request.state.client
tasks = [send(url, client) for url in URLS]
responses = await asyncio.gather(*tasks)
return StreamingResponse(iter_content(responses), media_type='text/event-stream')
- 2024年20款好用的项目管理软件推荐,项目管理提效的20个工具和技巧
- 2024年开源项目管理软件有哪些?推荐5款好用的项目管理工具
- 2024年常用的项目管理软件有哪些?推荐这10款国内外好用的项目管理工具
- 项目管理软件有哪些?推荐7款超好用的项目管理工具
- 项目管理软件有哪些最好用?推荐6款好用的项目管理工具
- 项目管理软件哪个最好用?盘点推荐5款好用的项目管理工具
- 项目管理软件有哪些,盘点推荐国内外超好用的7款项目管理工具
- 项目管理软件排行榜:2024年项目经理必备5款开源项目管理软件汇总
- 2024项目管理软件排行榜(10类常用的项目管理工具全推荐)
- 项目管理必备:盘点2024年13款好用的项目管理软件