前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >[Python] FastAPI踩坑记录--中间件消费请求体后请求卡死的分析和解决

[Python] FastAPI踩坑记录--中间件消费请求体后请求卡死的分析和解决

原创
作者头像
kr
发布2024-09-25 02:48:35
1220
发布2024-09-25 02:48:35
举报
文章被收录于专栏:开发流水账

概括

倘若在中间件中消费了请求体,会导致程序卡死在下一步骤处理函数call_next中。

要想对请求体做预处理,需要通过自定义请求+APIRouter解决。

问题代码示例

当使用request_json = await request.json()消费了请求对象后,程序会卡在response = await call_next(request)无法进行下去。

经过测试,通过await request.json()或者await request.body()消费后,程序均会卡在await call_next(request)

而使用request.headers.get("X-Sign")获取请求头信息则不会出现这种情况

代码语言:python
代码运行次数:0
复制
@app.middleware("http")
async def sync_middleware(request: Request, call_next):
    request_json = await request.json()
    _data = {
        "ip":request.client.host,
        "X-Sign":request.headers.get("X-Sign"),
        "body":request_json,
    }
    # 同步代码,做鉴权
    result = await run_in_threadpool(sync_code, _data)
    if result != 200:
        return Response(status_code=result)
    
    response = await call_next(request)
    return response

问题原因

这是FastAPI的设计问题,可以在FastAPI GitHub的issues中找到不少相关问题,例如:

https://github.com/fastapi/fastapi/issues/394

https://github.com/fastapi/fastapi/issues/5386

这是一个起码从2019年便被发现并存在至今的问题。

用一句话描述就是请求体只能被读取一次,如果在中间件中已经读取了请求体,那么后续的任何尝试再次读取请求体的操作都将陷入无限等待。

详细原因可以看GitHub中的讨论,这里不细说。

解决办法可以参考官网这篇教程:

https://fastapi.tiangolo.com/how-to/custom-request-and-route/#create-a-custom-gziproute-class

中文的问题分析和自定义请求编写可以参考如下文章:

https://www.modb.pro/db/144294

https://www.cnblogs.com/a00ium/p/13662335.html

问题解决

代码语言:python
代码运行次数:0
复制
from fastapi import FastAPI, Request, Response, APIRouter
from typing import Callable
from fastapi.routing import APIRoute
from pydantic import BaseModel

class middlewareRoute(APIRoute):
    def get_route_handler(self) -> Callable:
        original_route_handler = super().get_route_handler()

        async def custom_route_handler(request: Request) -> Response:
            before = time.time()
            
            # 这里可以获取的我们的请求的体的信息----
            
            response: Response = await original_route_handler(request)

            request_body = await request.body()

            _data = {
                "ip":request.client.host,
                "X-Sign":request.headers.get("X-Sign"),
                "body":request_body,
            }

            # 同步代码,做鉴权
            result = await run_in_threadpool(sync_code, _data)
            if result != 200:
                return Response(status_code=result)
            
            # 下面可以处理我们的响应体的报文信息
            
            duration = time.time() - before
            response.headers["X-Response-Time"] = str(duration)
            # print(f"route duration: {duration}")
            # print(f"route response: {response}")
            # print(f"route response headers: {response.headers}")
            return response

        return custom_route_handler
        
app = FastAPI()
router = APIRouter(route_class=middlewareRoute)

@app.get("/")
def index():
    return {}

class SignData(BaseModel):
    Zzz: str
    Abc: str
    Def: str

@router.post("/func")
def func_test(signData:SignData):
    # 省略功能代码
    return {}

app.include_router(router)

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 概括
  • 问题代码示例
  • 问题原因
  • 问题解决
相关产品与服务
消息队列 TDMQ
消息队列 TDMQ (Tencent Distributed Message Queue)是腾讯基于 Apache Pulsar 自研的一个云原生消息中间件系列,其中包含兼容Pulsar、RabbitMQ、RocketMQ 等协议的消息队列子产品,得益于其底层计算与存储分离的架构,TDMQ 具备良好的弹性伸缩以及故障恢复能力。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档