如何在 Python FastAPI 中记录原始 HTTP 请求/响应?
- 2024-12-12 08:40:00
- admin 原创
- 131
问题描述:
我们正在使用 Python FastAPI 编写一个 Web 服务,该服务将托管在 Kubernetes 中。出于审计目的,我们需要保存特定路由的request
/的原始 JSON 主体。和JSON的主体大小约为1MB,最好不要影响响应时间。我们该怎么做?response
`request`response
解决方案 1:
选项 1 - 使用中间件
您可以使用Middleware
。Amiddleware
接收应用程序收到的每个请求,因此,您可以request
在任何特定端点处理之前处理 ,以及response
在将 返回到客户端之前处理 。要创建,您可以在函数顶部middleware
使用装饰器,如下所示。@app.middleware("http")
由于您需要从流中使用请求主体middleware
— 使用request.body()
或,如本答案request.stream()
所示(在后台,前一种方法实际上调用后者,请参阅此处)— 那么当您稍后将 传递给相应的端点时它将不可用。因此,您可以按照本文中描述的方法使请求主体在后续可用(即使用下面的函数)。更新:此问题现已修复,因此,如果您使用的是 FastAPI 0.108.0 或更高版本,则无需使用该解决方法。request
`set_body()`
至于正文response
,您可以使用与此答案中描述的相同方法来使用正文,然后将 返回response
给客户端。 上述链接答案中描述的任一选项都可以工作;但是,下面的方法使用选项 2,它将正文存储在字节对象中并Response
直接返回自定义(以及status_code
原始的headers
和)。media_type
`response`
要记录数据,您可以使用,如本答案以及本答案和本答案BackgroundTask
中所述。只有在发送响应后才会运行(另请参阅Starlette文档);因此,客户端不必等待记录完成后再接收(因此,响应时间不会受到明显影响)。BackgroundTask
`response`
笔记
如果您有一个流式传输request
或,response
其主体无法放入服务器的 RAM(例如,假设在运行 8GB RAM 的机器上有一个 100GB 的主体),那么就会出现问题,因为您将数据存储到 RAM,而 RAM 没有足够的空间来容纳累积的数据。此外,如果是大型response
(例如,大型FileResponse
或StreamingResponse
),您可能会在客户端(或反向代理端,如果您使用的话)遇到Timeout
错误,因为您将无法响应客户端,直到您读取整个响应主体(因为您正在循环response.body_iterator
)。您提到“请求和响应 JSON 的主体大小约为 1MB”;因此,这通常应该没问题(但是,事先考虑一些事项始终是一个好习惯,例如您的 API 预计会同时服务多少个请求,哪些其他应用程序可能会使用 RAM 等,以便判断这是否是问题)。如果需要,您可以使用例如SlowAPI来限制对 API 端点的请求数量(如此答案中所示)。
middleware
限制仅在特定路线上使用
您可以通过以下方式限制对特定端点的使用middleware
:
检查
request.url.path
中间件内部是否符合您想要记录的路由的预定义列表request
,response
如本答案中所述(请参阅“更新”部分),或使用子应用程序,如此答案中所示
或者使用自定义
APIRoute
类,如下面的选项 2所示
。
工作示例
from fastapi import FastAPI, APIRouter, Response, Request
from starlette.background import BackgroundTask
from fastapi.routing import APIRoute
from starlette.types import Message
from typing import Dict, Any
import logging
app = FastAPI()
logging.basicConfig(filename='info.log', level=logging.DEBUG)
def log_info(req_body, res_body):
logging.info(req_body)
logging.info(res_body)
# not needed when using FastAPI>=0.108.0.
'''
async def set_body(request: Request, body: bytes):
async def receive() -> Message:
return {'type': 'http.request', 'body': body}
request._receive = receive
'''
@app.middleware('http')
async def some_middleware(request: Request, call_next):
req_body = await request.body()
#await set_body(request, req_body) # not needed when using FastAPI>=0.108.0.
response = await call_next(request)
chunks = []
async for chunk in response.body_iterator:
chunks.append(chunk)
res_body = b''.join(chunks)
task = BackgroundTask(log_info, req_body, res_body)
return Response(content=res_body, status_code=response.status_code,
headers=dict(response.headers), media_type=response.media_type, background=task)
@app.post('/')
def main(payload: Dict[Any, Any]):
return payload
如果您想对请求正文执行某些验证(例如,确保请求正文大小不超过某个值),则request.body()
可以不使用,而是使用方法一次处理一个块的正文.stream()
,如下所示(类似于这个答案)。
@app.middleware('http')
async def some_middleware(request: Request, call_next):
chunks = []
async for chunk in request.stream():
chunks.append(chunk)
req_body = b''.join(chunks)
...
选项 2 - 使用自定义APIRoute
类
您也可以使用自定义 APIRoute
类(类似于此处和此处),除其他功能外,它还允许您request
在应用程序处理正文之前以及response
在将正文返回到客户端之前对其进行操作。此选项还允许您将此类的使用限制在您希望的路由中,因为只有 下的端点APIRouter
(即,router
在下面的示例中)将使用自定义APIRoute
类。
应该注意的是,上文“注释”部分下选项 1中提到的注释也适用于此选项。例如,如果您的 API 返回— 如以下示例所示,即从在线源流式传输视频文件(可在此处找到用于测试此问题的公共视频,您甚至可以使用比下面使用的更长的视频来更清楚地查看效果)— 如果服务器的 RAM 无法处理,您可能会在服务器端遇到问题,以及由于整个(流式传输)响应在返回客户端之前被读取并存储在 RAM 中(如前所述)而导致客户端(和反向代理服务器,如果使用)出现延迟。在这种情况下,您可以排除从自定义类中返回的端点,并将其使用限制在所需的路由上 - 特别是如果它是一个大型视频文件,或者甚至是实时视频,将其存储在日志中可能没有多大意义 - 只需不使用装饰器(即, 在下面的例子中)这样的端点,而是使用装饰器(即,在下面的例子中),或其他或子应用程序。StreamingResponse
`/videoStreamingResponse
APIRoute@<name_of_router>
@router@<name_of_app>
@app`APIRouter
工作示例
from fastapi import FastAPI, APIRouter, Response, Request
from starlette.background import BackgroundTask
from starlette.responses import StreamingResponse
from fastapi.routing import APIRoute
from starlette.types import Message
from typing import Callable, Dict, Any
import logging
import httpx
def log_info(req_body, res_body):
logging.info(req_body)
logging.info(res_body)
class LoggingRoute(APIRoute):
def get_route_handler(self) -> Callable:
original_route_handler = super().get_route_handler()
async def custom_route_handler(request: Request) -> Response:
req_body = await request.body()
response = await original_route_handler(request)
tasks = response.background
if isinstance(response, StreamingResponse):
chunks = []
async for chunk in response.body_iterator:
chunks.append(chunk)
res_body = b''.join(chunks)
task = BackgroundTask(log_info, req_body, res_body)
response = Response(content=res_body, status_code=response.status_code,
headers=dict(response.headers), media_type=response.media_type)
else:
task = BackgroundTask(log_info, req_body, response.body)
# check if the original response had background tasks already attached to it
if tasks:
tasks.add_task(task) # add the new task to the tasks list
response.background = tasks
else:
response.background = task
return response
return custom_route_handler
app = FastAPI()
router = APIRouter(route_class=LoggingRoute)
logging.basicConfig(filename='info.log', level=logging.DEBUG)
@router.post('/')
def main(payload: Dict[Any, Any]):
return payload
@router.get('/video')
def get_video():
url = 'https://storage.googleapis.com/gtv-videos-bucket/sample/ForBiggerBlazes.mp4'
def gen():
with httpx.stream('GET', url) as r:
for chunk in r.iter_raw():
yield chunk
return StreamingResponse(gen(), media_type='video/mp4')
app.include_router(router)
解决方案 2:
您可以尝试按照 FastAPI 官方文档自定义APIRouter :
import time
from typing import Callable
from fastapi import APIRouter, FastAPI, Request, Response
from fastapi.routing import APIRoute
class TimedRoute(APIRoute):
def get_route_handler(self) -> Callable:
original_route_handler = super().get_route_handler()
async def custom_route_handler(request: Request) -> Response:
before = time.time()
response: Response = await original_route_handler(request)
duration = time.time() - before
response.headers["X-Response-Time"] = str(duration)
print(f"route duration: {duration}")
print(f"route response: {response}")
print(f"route response headers: {response.headers}")
return response
return custom_route_handler
app = FastAPI()
router = APIRouter(route_class=TimedRoute)
@app.get("/")
async def not_timed():
return {"message": "Not timed"}
@router.get("/timed")
async def timed():
return {"message": "It's the time of my life"}
app.include_router(router)
解决方案 3:
由于其他答案对我来说不起作用,并且我在 stackoverflow 上进行了相当广泛的搜索以解决此问题,因此我将在下面展示我的解决方案。
主要问题是,当使用请求正文或响应正文时,许多在线提供的方法/解决方案根本不起作用,因为请求/响应正文在从流中读取时被消耗了。
为了解决这个问题,我采用了一种方法,基本上是在读取请求和响应后重建它们。这主要基于用户“kovalevvlad”在https://github.com/encode/starlette/issues/495上的评论。
创建自定义中间件,稍后将其添加到应用程序中以记录所有请求和响应。请注意,您需要某种记录器来存储日志。
from json import JSONDecodeError
import json
import logging
from typing import Callable, Awaitable, Tuple, Dict, List
from starlette.middleware.base import BaseHTTPMiddleware
from starlette.requests import Request
from starlette.responses import Response, StreamingResponse
from starlette.types import Scope, Message
# Set up your custom logger here
logger = ""
class RequestWithBody(Request):
"""Creation of new request with body"""
def __init__(self, scope: Scope, body: bytes) -> None:
super().__init__(scope, self._receive)
self._body = body
self._body_returned = False
async def _receive(self) -> Message:
if self._body_returned:
return {"type": "http.disconnect"}
else:
self._body_returned = True
return {"type": "http.request", "body": self._body, "more_body": False}
class CustomLoggingMiddleware(BaseHTTPMiddleware):
"""
Use of custom middleware since reading the request body and the response consumes the bytestream.
Hence this approach to basically generate a new request/response when we read the attributes for logging.
"""
async def dispatch( # type: ignore
self, request: Request, call_next: Callable[[Request], Awaitable[StreamingResponse]]
) -> Response:
# Store request body in a variable and generate new request as it is consumed.
request_body_bytes = await request.body()
request_with_body = RequestWithBody(request.scope, request_body_bytes)
# Store response body in a variable and generate new response as it is consumed.
response = await call_next(request_with_body)
response_content_bytes, response_headers, response_status = await self._get_response_params(response)
# Logging
# If there is no request body handle exception, otherwise convert bytes to JSON.
try:
req_body = json.loads(request_body_bytes)
except JSONDecodeError:
req_body = ""
# Logging of relevant variables.
logger.info(
f"{request.method} request to {request.url} metadata
"
f" Status_code: {response.status_code}
"
f" Request_Body: {req_body}
"
)
# Finally, return the newly instantiated response values
return Response(response_content_bytes, response_status, response_headers)
async def _get_response_params(self, response: StreamingResponse) -> Tuple[bytes, Dict[str, str], int]:
"""Getting the response parameters of a response and create a new response."""
response_byte_chunks: List[bytes] = []
response_status: List[int] = []
response_headers: List[Dict[str, str]] = []
async def send(message: Message) -> None:
if message["type"] == "http.response.start":
response_status.append(message["status"])
response_headers.append({k.decode("utf8"): v.decode("utf8") for k, v in message["headers"]})
else:
response_byte_chunks.append(message["body"])
await response.stream_response(send)
content = b"".join(response_byte_chunks)
return content, response_headers[0], response_status[0]