FastAPI 返回大量 JSON 数据非常慢
- 2025-02-08 08:52:00
- admin 原创
- 70
问题描述:
我有一个 FastAPIGET
端点,它返回大量 JSON 数据(约 160,000 行和 45 列)。毫不奇怪,使用 返回数据非常json.dumps()
慢。我首先使用 从文件中读取数据json.loads()
,然后根据输入的参数对其进行过滤。有没有比使用 更快的方法将数据返回给用户return data
?在当前状态下需要将近一分钟。
我的代码目前如下所示:
# helper function to parse parquet file (where data is stored)
def parse_parquet(file_path):
df = pd.read_parquet(file_path)
result = df.to_json(orient = 'records')
parsed = json.loads(result)
return parsed
@app.get('/endpoint')
# has several more parameters
async def some_function(year = int | None = None, id = str | None = None):
if year is None:
data = parse_parquet(f'path/{year}_data.parquet')
# no year
if year is not None:
data = parse_parquet(f'path/all_data.parquet')
if id is not None:
data = [d for d in data if d['id'] == id]
return data
解决方案 1:
响应如此缓慢的原因之一是,在您的parse_parquet()
方法中,您首先将文件转换为 JSON(使用df.to_json()
),然后转换为字典(使用json.loads()
),最后再次转换为 JSON,因为 FastAPI 在后台使用自动将返回值转换为 JSON 兼容数据jsonable_encoder
,然后使用 Python 标准json.dumps()
序列化对象 - 这个过程非常慢(有关更多详细信息,请参阅此答案)。
正如评论部分中 @MatsLindh 所建议的那样,您可以使用替代的 JSON 编码器,例如orjson或ujosn(也请参阅此答案),与让 FastAPI 使用jsonable_encoder
然后json.dumps()
将数据转换为 JSON 的标准相比,这确实可以加快该过程。但是,使用 pandasto_json()
并直接返回自定义Response
(如本答案的选项 1(更新 2)中所述)似乎是性能最佳的解决方案。您可以使用下面给出的代码(使用自定义类)来比较所有可用解决方案的响应时间。APIRoute
使用您自己的镶木地板文件或以下代码创建一个由 160K 行和 45 列组成的示例镶木地板文件。
创建镶木地板.py
import pandas as pd
import numpy as np
columns = ['C' + str(i) for i in range(1, 46)]
df = pd.DataFrame(data=np.random.randint(99999, 99999999, size=(160000,45)),columns=columns)
df.to_parquet('data.parquet')
运行下面的 FastAPI 应用程序并分别访问每个端点以检查完成加载和将数据转换为 JSON 的过程所需的时间。
应用程序
from fastapi import FastAPI, APIRouter, Response, Request
from fastapi.routing import APIRoute
from typing import Callable
import pandas as pd
import json
import time
import ujson
import orjson
class TimedRoute(APIRoute):
def get_route_handler(self) -> Callable:
original_route_handler = super().get_route_handler()
async def custom_route_handler(request: Request) -> Response:
before = time.time()
response: Response = await original_route_handler(request)
duration = time.time() - before
response.headers["Response-Time"] = str(duration)
print(f"route duration: {duration}")
return response
return custom_route_handler
app = FastAPI()
router = APIRouter(route_class=TimedRoute)
@router.get("/defaultFastAPIencoder")
def get_data_default():
df = pd.read_parquet('data.parquet')
return df.to_dict(orient="records")
@router.get("/orjson")
def get_data_orjson():
df = pd.read_parquet('data.parquet')
return Response(orjson.dumps(df.to_dict(orient='records')), media_type="application/json")
@router.get("/ujson")
def get_data_ujson():
df = pd.read_parquet('data.parquet')
return Response(ujson.dumps(df.to_dict(orient='records')), media_type="application/json")
# Preferred way
@router.get("/pandasJSON")
def get_data_pandasJSON():
df = pd.read_parquet('data.parquet')
return Response(df.to_json(orient="records"), media_type="application/json")
app.include_router(router)
尽管使用/pandasJSON
上述方法响应时间非常快(这应该是首选方法),但在浏览器上显示数据时可能会遇到一些延迟。然而,这与服务器端无关,而是与客户端有关,因为浏览器正在尝试显示大量数据。如果您不想显示数据,而是让用户将数据下载到他们的设备(这会快得多),您可以Content-Disposition
在Response
使用attachment
参数设置标头并传递一个filename
,指示浏览器应该下载该文件。有关更多详细信息,请查看此答案和此答案。
@router.get("/download")
def get_data():
df = pd.read_parquet('data.parquet')
headers = {'Content-Disposition': 'attachment; filename="data.json"'}
return Response(df.to_json(orient="records"), headers=headers, media_type='application/json')
我还应该提到,有一个名为的库Dask
可以处理大型数据集,如此处所述,以防您必须处理大量记录而需要很长时间才能完成。与 Pandas 类似,您可以使用方法.read_parquet()
读取文件。由于 Dask 似乎没有提供等效.to_json()
方法,您可以使用将 Dask DataFrame 转换为 Pandas DataFrame df.compute()
,然后使用 Pandasdf.to_json()
将 DataFrame 转换为 JSON 字符串,并按上述示例返回它。
我还建议您看一下这个答案,它提供了有关流式传输/返回 DataFrame 的详细信息和解决方案,以防您处理大量数据,将它们转换为 JSON(使用.to_json()
)或 CSV(使用)可能会导致服务器端出现内存问题,如果您选择将输出字符串(JSON 或 CSV)存储到 RAM 中(如果您不将路径.to_csv()
参数传递给上述函数,这是默认行为)——因为已经为原始 DataFrame 分配了大量内存。
解决方案 2:
我猜想json.loads(result)
在你的情况下会返回一个 dict 数据类型,并且你正在过滤 dict 数据类型。你可以按如下方式将 dict 数据类型发送为 JSON:
from fastapi.responses import JSONResponse
@app.get('/endpoint')
# has several more parameters
async def some_function(year = int | None = None, id = str | None = None):
if year is None:
data = parse_parquet(f'path/{year}_data.parquet')
# no year
if year is not None:
data = parse_parquet(f'path/all_data.parquet')
if id is not None:
data = [d for d in data if d['id'] == id]
return JSONResponse(content=json_compatible_item_data)