在 FastAPI 端点中进行 concurrent.futures.ThreadPoolExecutor 调用是否危险?

2024-12-20 08:37:00
admin
原创
231
摘要:问题描述:我有以下测试代码:import concurrent.futures import urllib.request URLS = ['http://www.foxnews.com/', 'http://www.cnn.com/', 'http://europe.wsj...

问题描述:

我有以下测试代码:

import concurrent.futures
import urllib.request

URLS = ['http://www.foxnews.com/',
        'http://www.cnn.com/',
        'http://europe.wsj.com/',
        'http://www.bbc.co.uk/',
        'http://some-made-up-domain.com/']

# Retrieve a single page and report the URL and contents
def load_url(url, timeout):
    with urllib.request.urlopen(url, timeout=timeout) as conn:
        return conn.read()

# We can use a with statement to ensure threads are cleaned up promptly
with concurrent.futures.ThreadPoolExecutor() as executor:
    # Start the load operations and mark each future with its URL
    future_to_url = {executor.submit(load_url, url, 60): url for url in URLS}
    for future in concurrent.futures.as_completed(future_to_url):
        url = future_to_url[future]
        try:
            data = future.result()
        except Exception as exc:
            print('%r generated an exception: %s' % (url, exc))
        else:
            print('%r page is %d bytes' % (url, len(data)))

我需要concurrent.futures.ThreadPoolExecutor在 FastAPI 端点中使用部分代码。

我担心的是 API 调用数量和线程包含的影响。担心创建过多线程及其相关后果,导致主机资源不足,应用程序和/或主机崩溃。

对于这种方法您有什么想法或困惑吗?


解决方案 1:

您应该使用提供API 的HTTPX库。如本答案所述,每次需要时,您都会生成一个并重复使用它。要使用 发出异步请求,您需要一个。async`ClientHTTPXAsyncClient`

您还可以使用limits上的关键字参数来控制连接池大小,Client该参数采用 的一个实例httpx.Limits。例如:

limits = httpx.Limits(max_keepalive_connections=5, max_connections=10)
client = httpx.AsyncClient(limits=limits)

您可以根据需要调整上述内容。根据池限制配置的文档:

  • max_keepalive_connections,允许的保持连接数,或None始终允许。(默认值为20

  • max_connections,允许的最大连接数,或None 无限制。(默认100

  • keepalive_expiry,空闲保持连接的时间限制(以秒为单位),或None无限制。(默认5

如果您还想调整超时,则可以使用timeout参数设置单个请求或Client/AsyncClient实例的超时,这会导致给定的超时被用作使用此客户端发出的请求的默认值(Timeout另请参阅类的实现)。您可以详细指定超时行为;例如,设置read超时参数将指定等待接收数据块(即响应主体的块)的最大持续时间。如果HTTPX无法在此时间范围内接收数据,ReadTimeout则会引发异常。如果设置为None而不是某个正数值,则不会出现timeout。所有操作的read默认值为 5 秒timeout

当您使用完毕后,您可以使用它await client.aclose()来明确关闭AsyncClient(例如,这可以在关闭事件处理程序中完成)。

运行多个异步操作(因为在调用 API 端点时需要请求五个不同的 URL),您可以使用 awaitable asyncio.gather()它将执行操作并按照传递给该函数的awaitables ( )async的顺序返回结果列表。tasks

工作示例

from fastapi import FastAPI, Request
from contextlib import asynccontextmanager
import httpx
import asyncio


URLS = ['https://www.foxnews.com/',
        'https://edition.cnn.com/',
        'https://www.nbcnews.com/',
        'https://www.bbc.co.uk/',
        'https://www.reuters.com/']


@asynccontextmanager
async def lifespan(app: FastAPI):
    # customize settings
    limits = httpx.Limits(max_keepalive_connections=5, max_connections=10)
    timeout = httpx.Timeout(5.0, read=15.0)  # 15s timeout on read. 5s timeout elsewhere.

    # Initialize the Client on startup and add it to the state
    async with httpx.AsyncClient(limits=limits, timeout=timeout) as client:
        yield {'client': client}
        # The Client closes on shutdown 


app = FastAPI(lifespan=lifespan)


async def send(url, client):
    return await client.get(url)


@app.get('/')
async def main(request: Request):
    client = request.state.client
    tasks = [send(url, client) for url in URLS]
    responses = await asyncio.gather(*tasks)
    return [r.text[:50] for r in responses]  # for demo purposes, only return the first 50 chars of each response

如果您不想将整个响应主体读入 RAM,则可以使用中的流响应httpx,以及使用 FastAPI StreamingResponse,如此答案中所述并在下面演示:

from fastapi import FastAPI, Request
from fastapi.responses import StreamingResponse
from contextlib import asynccontextmanager
import httpx
import asyncio


URLS = ['https://www.foxnews.com/',
        'https://edition.cnn.com/',
        'https://www.nbcnews.com/',
        'https://www.bbc.co.uk/',
        'https://www.reuters.com/']


@asynccontextmanager
async def lifespan(app: FastAPI):
    # customize settings
    limits = httpx.Limits(max_keepalive_connections=5, max_connections=10)
    timeout = httpx.Timeout(5.0, read=15.0)  # 15s timeout on read. 5s timeout elsewhere.

    # Initialize the Client on startup and add it to the state
    async with httpx.AsyncClient(limits=limits, timeout=timeout) as client:
        yield {'client': client}
        # The Client closes on shutdown 


app = FastAPI(lifespan=lifespan)


async def send(url, client):
    req = client.build_request('GET', url)
    return await client.send(req, stream=True)


async def iter_content(responses):
     for r in responses:
        async for chunk in r.aiter_text():
            yield chunk[:50]  # for demo purposes, return only the first 50 chars of each response and then break the loop
            yield '

'
            break
        await r.aclose()


@app.get('/')
async def main(request: Request):
    client = request.state.client
    tasks = [send(url, client) for url in URLS]
    responses = await asyncio.gather(*tasks)
    return StreamingResponse(iter_content(responses), media_type='text/event-stream')
相关推荐
  政府信创国产化的10大政策解读一、信创国产化的背景与意义信创国产化,即信息技术应用创新国产化,是当前中国信息技术领域的一个重要发展方向。其核心在于通过自主研发和创新,实现信息技术应用的自主可控,减少对外部技术的依赖,并规避潜在的技术制裁和风险。随着全球信息技术竞争的加剧,以及某些国家对中国在科技领域的打压,信创国产化显...
工程项目管理   1565  
  为什么项目管理通常仍然耗时且低效?您是否还在反复更新电子表格、淹没在便利贴中并参加每周更新会议?这确实是耗费时间和精力。借助软件工具的帮助,您可以一目了然地全面了解您的项目。如今,国内外有足够多优秀的项目管理软件可以帮助您掌控每个项目。什么是项目管理软件?项目管理软件是广泛行业用于项目规划、资源分配和调度的软件。它使项...
项目管理软件   1354  
  信创国产芯片作为信息技术创新的核心领域,对于推动国家自主可控生态建设具有至关重要的意义。在全球科技竞争日益激烈的背景下,实现信息技术的自主可控,摆脱对国外技术的依赖,已成为保障国家信息安全和产业可持续发展的关键。国产芯片作为信创产业的基石,其发展水平直接影响着整个信创生态的构建与完善。通过不断提升国产芯片的技术实力、产...
国产信创系统   21  
  信创生态建设旨在实现信息技术领域的自主创新和安全可控,涵盖了从硬件到软件的全产业链。随着数字化转型的加速,信创生态建设的重要性日益凸显,它不仅关乎国家的信息安全,更是推动产业升级和经济高质量发展的关键力量。然而,在推进信创生态建设的过程中,面临着诸多复杂且严峻的挑战,需要深入剖析并寻找切实可行的解决方案。技术创新难题技...
信创操作系统   27  
  信创产业作为国家信息技术创新发展的重要领域,对于保障国家信息安全、推动产业升级具有关键意义。而国产芯片作为信创产业的核心基石,其研发进展备受关注。在信创国产芯片的研发征程中,面临着诸多复杂且艰巨的难点,这些难点犹如一道道关卡,阻碍着国产芯片的快速发展。然而,科研人员和相关企业并未退缩,积极探索并提出了一系列切实可行的解...
国产化替代产品目录   28  
热门文章
项目管理软件有哪些?
云禅道AD
禅道项目管理软件

云端的项目管理软件

尊享禅道项目软件收费版功能

无需维护,随时随地协同办公

内置subversion和git源码管理

每天备份,随时转为私有部署

免费试用