在 FastAPI 端点中进行 concurrent.futures.ThreadPoolExecutor 调用是否危险?

2024-12-20 08:37:00
admin
原创
115
摘要:问题描述:我有以下测试代码:import concurrent.futures import urllib.request URLS = ['http://www.foxnews.com/', 'http://www.cnn.com/', 'http://europe.wsj...

问题描述:

我有以下测试代码:

import concurrent.futures
import urllib.request

URLS = ['http://www.foxnews.com/',
        'http://www.cnn.com/',
        'http://europe.wsj.com/',
        'http://www.bbc.co.uk/',
        'http://some-made-up-domain.com/']

# Retrieve a single page and report the URL and contents
def load_url(url, timeout):
    with urllib.request.urlopen(url, timeout=timeout) as conn:
        return conn.read()

# We can use a with statement to ensure threads are cleaned up promptly
with concurrent.futures.ThreadPoolExecutor() as executor:
    # Start the load operations and mark each future with its URL
    future_to_url = {executor.submit(load_url, url, 60): url for url in URLS}
    for future in concurrent.futures.as_completed(future_to_url):
        url = future_to_url[future]
        try:
            data = future.result()
        except Exception as exc:
            print('%r generated an exception: %s' % (url, exc))
        else:
            print('%r page is %d bytes' % (url, len(data)))

我需要concurrent.futures.ThreadPoolExecutor在 FastAPI 端点中使用部分代码。

我担心的是 API 调用数量和线程包含的影响。担心创建过多线程及其相关后果,导致主机资源不足,应用程序和/或主机崩溃。

对于这种方法您有什么想法或困惑吗?


解决方案 1:

您应该使用提供API 的HTTPX库。如本答案所述,每次需要时,您都会生成一个并重复使用它。要使用 发出异步请求,您需要一个。async`ClientHTTPXAsyncClient`

您还可以使用limits上的关键字参数来控制连接池大小,Client该参数采用 的一个实例httpx.Limits。例如:

limits = httpx.Limits(max_keepalive_connections=5, max_connections=10)
client = httpx.AsyncClient(limits=limits)

您可以根据需要调整上述内容。根据池限制配置的文档:

  • max_keepalive_connections,允许的保持连接数,或None始终允许。(默认值为20

  • max_connections,允许的最大连接数,或None 无限制。(默认100

  • keepalive_expiry,空闲保持连接的时间限制(以秒为单位),或None无限制。(默认5

如果您还想调整超时,则可以使用timeout参数设置单个请求或Client/AsyncClient实例的超时,这会导致给定的超时被用作使用此客户端发出的请求的默认值(Timeout另请参阅类的实现)。您可以详细指定超时行为;例如,设置read超时参数将指定等待接收数据块(即响应主体的块)的最大持续时间。如果HTTPX无法在此时间范围内接收数据,ReadTimeout则会引发异常。如果设置为None而不是某个正数值,则不会出现timeout。所有操作的read默认值为 5 秒timeout

当您使用完毕后,您可以使用它await client.aclose()来明确关闭AsyncClient(例如,这可以在关闭事件处理程序中完成)。

运行多个异步操作(因为在调用 API 端点时需要请求五个不同的 URL),您可以使用 awaitable asyncio.gather()它将执行操作并按照传递给该函数的awaitables ( )async的顺序返回结果列表。tasks

工作示例

from fastapi import FastAPI, Request
from contextlib import asynccontextmanager
import httpx
import asyncio


URLS = ['https://www.foxnews.com/',
        'https://edition.cnn.com/',
        'https://www.nbcnews.com/',
        'https://www.bbc.co.uk/',
        'https://www.reuters.com/']


@asynccontextmanager
async def lifespan(app: FastAPI):
    # customize settings
    limits = httpx.Limits(max_keepalive_connections=5, max_connections=10)
    timeout = httpx.Timeout(5.0, read=15.0)  # 15s timeout on read. 5s timeout elsewhere.

    # Initialize the Client on startup and add it to the state
    async with httpx.AsyncClient(limits=limits, timeout=timeout) as client:
        yield {'client': client}
        # The Client closes on shutdown 


app = FastAPI(lifespan=lifespan)


async def send(url, client):
    return await client.get(url)


@app.get('/')
async def main(request: Request):
    client = request.state.client
    tasks = [send(url, client) for url in URLS]
    responses = await asyncio.gather(*tasks)
    return [r.text[:50] for r in responses]  # for demo purposes, only return the first 50 chars of each response

如果您不想将整个响应主体读入 RAM,则可以使用中的流响应httpx,以及使用 FastAPI StreamingResponse,如此答案中所述并在下面演示:

from fastapi import FastAPI, Request
from fastapi.responses import StreamingResponse
from contextlib import asynccontextmanager
import httpx
import asyncio


URLS = ['https://www.foxnews.com/',
        'https://edition.cnn.com/',
        'https://www.nbcnews.com/',
        'https://www.bbc.co.uk/',
        'https://www.reuters.com/']


@asynccontextmanager
async def lifespan(app: FastAPI):
    # customize settings
    limits = httpx.Limits(max_keepalive_connections=5, max_connections=10)
    timeout = httpx.Timeout(5.0, read=15.0)  # 15s timeout on read. 5s timeout elsewhere.

    # Initialize the Client on startup and add it to the state
    async with httpx.AsyncClient(limits=limits, timeout=timeout) as client:
        yield {'client': client}
        # The Client closes on shutdown 


app = FastAPI(lifespan=lifespan)


async def send(url, client):
    req = client.build_request('GET', url)
    return await client.send(req, stream=True)


async def iter_content(responses):
     for r in responses:
        async for chunk in r.aiter_text():
            yield chunk[:50]  # for demo purposes, return only the first 50 chars of each response and then break the loop
            yield '

'
            break
        await r.aclose()


@app.get('/')
async def main(request: Request):
    client = request.state.client
    tasks = [send(url, client) for url in URLS]
    responses = await asyncio.gather(*tasks)
    return StreamingResponse(iter_content(responses), media_type='text/event-stream')
相关推荐
  为什么项目管理通常仍然耗时且低效?您是否还在反复更新电子表格、淹没在便利贴中并参加每周更新会议?这确实是耗费时间和精力。借助软件工具的帮助,您可以一目了然地全面了解您的项目。如今,国内外有足够多优秀的项目管理软件可以帮助您掌控每个项目。什么是项目管理软件?项目管理软件是广泛行业用于项目规划、资源分配和调度的软件。它使项...
项目管理软件   990  
  在项目管理领域,CDCP(Certified Data Center Professional)认证评审是一个至关重要的环节,它不仅验证了项目团队的专业能力,还直接关系到项目的成功与否。在这一评审过程中,沟通技巧的运用至关重要。有效的沟通不仅能够确保信息的准确传递,还能增强团队协作,提升评审效率。本文将深入探讨CDCP...
华为IPD流程   26  
  IPD(Integrated Product Development,集成产品开发)是一种以客户需求为核心、跨部门协同的产品开发模式,旨在通过高效的资源整合和流程优化,提升产品开发的成功率和市场竞争力。在IPD培训课程中,掌握关键成功因素是确保团队能够有效实施这一模式的核心。以下将从五个关键成功因素展开讨论,帮助企业和...
IPD项目流程图   27  
  华为IPD(Integrated Product Development,集成产品开发)流程是华为公司在其全球化进程中逐步构建和完善的一套高效产品开发管理体系。这一流程不仅帮助华为在技术创新和产品交付上实现了质的飞跃,还为其在全球市场中赢得了显著的竞争优势。IPD的核心在于通过跨部门协作、阶段性评审和市场需求驱动,确保...
华为IPD   26  
  华为作为全球领先的通信技术解决方案提供商,其成功的背后离不开一套成熟的管理体系——集成产品开发(IPD)。IPD不仅是一种产品开发流程,更是一种系统化的管理思想,它通过跨职能团队的协作、阶段评审机制和市场需求驱动的开发模式,帮助华为在全球市场中脱颖而出。从最初的国内市场到如今的全球化布局,华为的IPD体系在多个领域展现...
IPD管理流程   53  
热门文章
项目管理软件有哪些?
云禅道AD
禅道项目管理软件

云端的项目管理软件

尊享禅道项目软件收费版功能

无需维护,随时随地协同办公

内置subversion和git源码管理

每天备份,随时转为私有部署

免费试用