如何在 Python FastAPI 中记录原始 HTTP 请求/响应?

2024-12-12 08:40:00
admin
原创
131
摘要:问题描述:我们正在使用 Python FastAPI 编写一个 Web 服务,该服务将托管在 Kubernetes 中。出于审计目的,我们需要保存特定路由的request/的原始 JSON 主体。和JSON的主体大小约为1MB,最好不要影响响应时间。我们该怎么做?response`request`respon...

问题描述:

我们正在使用 Python FastAPI 编写一个 Web 服务,该服务将托管在 Kubernetes 中。出于审计目的,我们需要保存特定路由的request/的原始 JSON 主体。和JSON的主体大小约为1MB,最好不要影响响应时间。我们该怎么做?response`request`response


解决方案 1:

选项 1 - 使用中间件

您可以使用Middleware。Amiddleware接收应用程序收到的每个请求,因此,您可以request在任何特定端点处理之前处理 ,以及response在将 返回到客户端之前处理 。要创建,您可以在函数顶部middleware使用装饰器,如下所示。@app.middleware("http")

由于您需要从流中使用请求主体middleware— 使用request.body()或,如本答案request.stream()所示(在后台,前一种方法实际上调用后者,请参阅此处)— 那么当您稍后将 传递给相应的端点时它将不可用。因此,您可以按照本文中描述的方法使请求主体在后续可用(即使用下面的函数)。更新:此问题现已修复,因此,如果您使用的是 FastAPI 0.108.0 或更高版本,则无需使用该解决方法。request`set_body()`

至于正文response,您可以使用与此答案中描述的相同方法来使用正文,然后将 返回response给客户端。 上述链接答案中描述的任一选项都可以工作;但是,下面的方法使用选项 2,它将正文存储在字节对象中并Response直接返回自定义(以及status_code原始的headers和)。media_type`response`

要记录数据,您可以使用,如本答案以及本答案和本答案BackgroundTask中所述。只有在发送响应后才会运行(另请参阅Starlette文档);因此,客户端不必等待记录完成后再接收(因此,响应时间不会受到明显影响)。BackgroundTask`response`

笔记

如果您有一个流式传输request或,response其主体无法放入服务器的 RAM(例如,假设在运行 8GB RAM 的机器上有一个 100GB 的主体),那么就会出现问题,因为您将数据存储到 RAM,而 RAM 没有足够的空间来容纳累积的数据。此外,如果是大型response(例如,大型FileResponseStreamingResponse),您可能会在客户端(或反向代理端,如果您使用的话)遇到Timeout错误,因为您将无法响应客户端,直到您读取整个响应主体(因为您正在循环response.body_iterator)。您提到“请求和响应 JSON 的主体大小约为 1MB”;因此,这通常应该没问题(但是,事先考虑一些事项始终是一个好习惯,例如您的 API 预计会同时服务多少个请求,哪些其他应用程序可能会使用 RAM 等,以便判断这是否是问题)。如果需要,您可以使用例如SlowAPI来限制对 API 端点的请求数量(如此答案中所示)。

middleware限制仅在特定路线上使用

您可以通过以下方式限制对特定端点的使用middleware

  • 检查request.url.path中间件内部是否符合您想要记录的路由的预定义列表
    requestresponse如本答案中所述(请参阅“更新”部分),

  • 或使用子应用程序,如此答案中所示

  • 或者使用自定义APIRoute类,如下面的选项 2所示

工作示例

from fastapi import FastAPI, APIRouter, Response, Request
from starlette.background import BackgroundTask
from fastapi.routing import APIRoute
from starlette.types import Message
from typing import Dict, Any
import logging


app = FastAPI()
logging.basicConfig(filename='info.log', level=logging.DEBUG)


def log_info(req_body, res_body):
    logging.info(req_body)
    logging.info(res_body)


# not needed when using FastAPI>=0.108.0.
'''
async def set_body(request: Request, body: bytes):
    async def receive() -> Message:
        return {'type': 'http.request', 'body': body}
    request._receive = receive
'''

@app.middleware('http')
async def some_middleware(request: Request, call_next):
    req_body = await request.body()
    #await set_body(request, req_body)  # not needed when using FastAPI>=0.108.0.
    response = await call_next(request)
    
    chunks = []
    async for chunk in response.body_iterator:
        chunks.append(chunk)
    res_body = b''.join(chunks)
    
    task = BackgroundTask(log_info, req_body, res_body)
    return Response(content=res_body, status_code=response.status_code, 
        headers=dict(response.headers), media_type=response.media_type, background=task)


@app.post('/')
def main(payload: Dict[Any, Any]):
    return payload

如果您想对请求正文执行某些验证(例如,确保请求正文大小不超过某个值),则request.body()可以不使用,而是使用方法一次处理一个块的正文.stream(),如下所示(类似于这个答案)。

@app.middleware('http')
async def some_middleware(request: Request, call_next):
    chunks = []
    async for chunk in request.stream():
        chunks.append(chunk)
    req_body = b''.join(chunks)
    ...

选项 2 - 使用自定义APIRoute

您也可以使用自定义 APIRoute类(类似于此处和此处),除其他功能外,它还允许您request在应用程序处理正文之前以及response在将正文返回到客户端之前对其进行操作。此选项还允许您将此类的使用限制在您希望的路由中,因为只有 下的端点APIRouter(即,router在下面的示例中)将使用自定义APIRoute类。

应该注意的是,上文“注释”部分下选项 1中提到的注释也适用于此选项。例如,如果您的 API 返回— 如以下示例所示,即从在线源流式传输视频文件(可在此处找到用于测试此问题的公共视频,您甚至可以使用比下面使用的更长的视频来更清楚地查看效果)— 如果服务器的 RAM 无法处理,您可能会在服务器端遇到问题,以及由于整个(流式传输)响应在返回客户端之前被读取并存储在 RAM 中(如前所述)而导致客户端(和反向代理服务器,如果使用)出现延迟。在这种情况下,您可以排除从自定义类中返回的端点,并将其使用限制在所需的路由上 - 特别是如果它是一个大型视频文件,或者甚至是实时视频,将其存储在日志中可能没有多大意义 - 只需不使用装饰器(即, 在下面的例子中)这样的端点,而是使用装饰器(即,在下面的例子中),或其他或子应用程序。StreamingResponse`/videoStreamingResponseAPIRoute@<name_of_router>@router@<name_of_app>@app`APIRouter

工作示例

from fastapi import FastAPI, APIRouter, Response, Request
from starlette.background import BackgroundTask
from starlette.responses import StreamingResponse
from fastapi.routing import APIRoute
from starlette.types import Message
from typing import Callable, Dict, Any
import logging
import httpx


def log_info(req_body, res_body):
    logging.info(req_body)
    logging.info(res_body)

       
class LoggingRoute(APIRoute):
    def get_route_handler(self) -> Callable:
        original_route_handler = super().get_route_handler()

        async def custom_route_handler(request: Request) -> Response:
            req_body = await request.body()
            response = await original_route_handler(request)
            tasks = response.background
            
            if isinstance(response, StreamingResponse):
                chunks = []
                async for chunk in response.body_iterator:
                    chunks.append(chunk)
                res_body = b''.join(chunks)
                
                task = BackgroundTask(log_info, req_body, res_body)
                response = Response(content=res_body, status_code=response.status_code, 
                        headers=dict(response.headers), media_type=response.media_type)
            else:
                task = BackgroundTask(log_info, req_body, response.body)
            
            # check if the original response had background tasks already attached to it
            if tasks:
                tasks.add_task(task)  # add the new task to the tasks list
                response.background = tasks
            else:
                response.background = task
                
            return response
            
        return custom_route_handler


app = FastAPI()
router = APIRouter(route_class=LoggingRoute)
logging.basicConfig(filename='info.log', level=logging.DEBUG)


@router.post('/')
def main(payload: Dict[Any, Any]):
    return payload


@router.get('/video')
def get_video():
    url = 'https://storage.googleapis.com/gtv-videos-bucket/sample/ForBiggerBlazes.mp4'
    
    def gen():
        with httpx.stream('GET', url) as r:
            for chunk in r.iter_raw():
                yield chunk

    return StreamingResponse(gen(), media_type='video/mp4')


app.include_router(router)

解决方案 2:

您可以尝试按照 FastAPI 官方文档自定义APIRouter :

import time
from typing import Callable

from fastapi import APIRouter, FastAPI, Request, Response
from fastapi.routing import APIRoute


class TimedRoute(APIRoute):
    def get_route_handler(self) -> Callable:
        original_route_handler = super().get_route_handler()

        async def custom_route_handler(request: Request) -> Response:
            before = time.time()
            response: Response = await original_route_handler(request)
            duration = time.time() - before
            response.headers["X-Response-Time"] = str(duration)
            print(f"route duration: {duration}")
            print(f"route response: {response}")
            print(f"route response headers: {response.headers}")
            return response

        return custom_route_handler


app = FastAPI()
router = APIRouter(route_class=TimedRoute)


@app.get("/")
async def not_timed():
    return {"message": "Not timed"}


@router.get("/timed")
async def timed():
    return {"message": "It's the time of my life"}


app.include_router(router)

解决方案 3:

由于其他答案对我来说不起作用,并且我在 stackoverflow 上进行了相当广泛的搜索以解决此问题,因此我将在下面展示我的解决方案。

主要问题是,当使用请求正文或响应正文时,许多在线提供的方法/解决方案根本不起作用,因为请求/响应正文在从流中读取时被消耗了。

为了解决这个问题,我采用了一种方法,基本上是在读取请求和响应后重建它们。这主要基于用户“kovalevvlad”在https://github.com/encode/starlette/issues/495上的评论。

创建自定义中间件,稍后将其添加到应用程序中以记录所有请求和响应。请注意,您需要某种记录器来存储日志。

from json import JSONDecodeError
import json
import logging
from typing import Callable, Awaitable, Tuple, Dict, List

from starlette.middleware.base import BaseHTTPMiddleware
from starlette.requests import Request
from starlette.responses import Response, StreamingResponse
from starlette.types import Scope, Message

# Set up your custom logger here
logger = ""

class RequestWithBody(Request):
    """Creation of new request with body"""
    def __init__(self, scope: Scope, body: bytes) -> None:
        super().__init__(scope, self._receive)
        self._body = body
        self._body_returned = False

    async def _receive(self) -> Message:
        if self._body_returned:
            return {"type": "http.disconnect"}
        else:
            self._body_returned = True
            return {"type": "http.request", "body": self._body, "more_body": False}


class CustomLoggingMiddleware(BaseHTTPMiddleware):
    """
    Use of custom middleware since reading the request body and the response consumes the bytestream.
    Hence this approach to basically generate a new request/response when we read the attributes for logging.
    """
    async def dispatch(  # type: ignore
        self, request: Request, call_next: Callable[[Request], Awaitable[StreamingResponse]]
    ) -> Response:
            # Store request body in a variable and generate new request as it is consumed.
            request_body_bytes = await request.body()
        
        request_with_body = RequestWithBody(request.scope, request_body_bytes)

        # Store response body in a variable and generate new response as it is consumed.
        response = await call_next(request_with_body)
        response_content_bytes, response_headers, response_status = await self._get_response_params(response)

        # Logging

        # If there is no request body handle exception, otherwise convert bytes to JSON.
        try:
            req_body = json.loads(request_body_bytes)
        except JSONDecodeError:
            req_body = ""
        # Logging of relevant variables.
        logger.info(
            f"{request.method} request to {request.url} metadata
"
            f"    Status_code: {response.status_code}
"
            f"    Request_Body: {req_body}
"
        )
        # Finally, return the newly instantiated response values
        return Response(response_content_bytes, response_status, response_headers)

async def _get_response_params(self, response: StreamingResponse) -> Tuple[bytes, Dict[str, str], int]:
    """Getting the response parameters of a response and create a new response."""
    response_byte_chunks: List[bytes] = []
    response_status: List[int] = []
    response_headers: List[Dict[str, str]] = []

    async def send(message: Message) -> None:
        if message["type"] == "http.response.start":
            response_status.append(message["status"])
            response_headers.append({k.decode("utf8"): v.decode("utf8") for k, v in message["headers"]})
        else:
            response_byte_chunks.append(message["body"])

    await response.stream_response(send)
    content = b"".join(response_byte_chunks)
    return content, response_headers[0], response_status[0]
相关推荐
  政府信创国产化的10大政策解读一、信创国产化的背景与意义信创国产化,即信息技术应用创新国产化,是当前中国信息技术领域的一个重要发展方向。其核心在于通过自主研发和创新,实现信息技术应用的自主可控,减少对外部技术的依赖,并规避潜在的技术制裁和风险。随着全球信息技术竞争的加剧,以及某些国家对中国在科技领域的打压,信创国产化显...
工程项目管理   1565  
  为什么项目管理通常仍然耗时且低效?您是否还在反复更新电子表格、淹没在便利贴中并参加每周更新会议?这确实是耗费时间和精力。借助软件工具的帮助,您可以一目了然地全面了解您的项目。如今,国内外有足够多优秀的项目管理软件可以帮助您掌控每个项目。什么是项目管理软件?项目管理软件是广泛行业用于项目规划、资源分配和调度的软件。它使项...
项目管理软件   1354  
  信创国产芯片作为信息技术创新的核心领域,对于推动国家自主可控生态建设具有至关重要的意义。在全球科技竞争日益激烈的背景下,实现信息技术的自主可控,摆脱对国外技术的依赖,已成为保障国家信息安全和产业可持续发展的关键。国产芯片作为信创产业的基石,其发展水平直接影响着整个信创生态的构建与完善。通过不断提升国产芯片的技术实力、产...
国产信创系统   21  
  信创生态建设旨在实现信息技术领域的自主创新和安全可控,涵盖了从硬件到软件的全产业链。随着数字化转型的加速,信创生态建设的重要性日益凸显,它不仅关乎国家的信息安全,更是推动产业升级和经济高质量发展的关键力量。然而,在推进信创生态建设的过程中,面临着诸多复杂且严峻的挑战,需要深入剖析并寻找切实可行的解决方案。技术创新难题技...
信创操作系统   27  
  信创产业作为国家信息技术创新发展的重要领域,对于保障国家信息安全、推动产业升级具有关键意义。而国产芯片作为信创产业的核心基石,其研发进展备受关注。在信创国产芯片的研发征程中,面临着诸多复杂且艰巨的难点,这些难点犹如一道道关卡,阻碍着国产芯片的快速发展。然而,科研人员和相关企业并未退缩,积极探索并提出了一系列切实可行的解...
国产化替代产品目录   28  
热门文章
项目管理软件有哪些?
云禅道AD
禅道项目管理软件

云端的项目管理软件

尊享禅道项目软件收费版功能

无需维护,随时随地协同办公

内置subversion和git源码管理

每天备份,随时转为私有部署

免费试用