我有一个FastAPI GET
端点,它返回大量JSON数据(~160,000行和45列)。毫不奇怪,使用json.dumps()
返回数据的速度非常慢。我首先使用json.loads()
从文件中读取数据,然后根据输入的参数对其进行过滤。是否有比使用return data
更快地将数据返回给用户的方法?在目前的状态下,这需要近一分钟的时间。
我的代码当前如下所示:
# helper function to parse parquet file (where data is stored)
def parse_parquet(file_path):
df = pd.read_parquet(file_path)
result = df.to_json(orient = 'records')
parsed = json.loads(result)
return parsed
@app.get('/endpoint')
# has several more parameters
async def some_function(year = int | None = None, id = str | None = None):
if year is None:
data = parse_parquet(f'path/{year}_data.parquet')
# no year
if year is not None:
data = parse_parquet(f'path/all_data.parquet')
if id is not None:
data = [d for d in data if d['id'] == id]
return data
发布于 2022-09-02 00:55:37
响应如此缓慢的原因之一是,在您的parse_parquet()
方法中,首先将文件转换为JSON (使用df.to_json()
),然后转换为字典(使用json.loads()
),最后再转换为JSON,作为FastAPI,在幕后使用jsonable_encoder
将返回的值转化为与JSON兼容的数据,然后使用Python json.dumps()
来序列化对象--这个过程非常慢(更多细节见这个答案 )。
正如@MatsLindh在注释部分中所建议的那样,您可以使用替代的JSON编码器,例如奥尔森或尤约森,这确实会加快进程,而不是让FastAPI使用jsonable_encoder
,然后让标准json.dumps()
将数据转换为JSON。然而,,使用熊猫to_json()
并直接重新定制Response
--正如这个答案-seems的选项1 (Update 2)所描述的那样,它是性能最好的解决方案。您可以使用下面给出的代码--它使用班级-to来比较所有可用解决方案的响应时间。
使用您自己的拼花文件或下面的代码创建一个由160 K行和45列组成的样例拼花文件。
create_parquet.py
import pandas as pd
import numpy as np
columns = ['C' + str(i) for i in range(1, 46)]
df = pd.DataFrame(data=np.random.randint(99999, 99999999, size=(160000,45)),columns=columns)
df.to_parquet('data.parquet')
运行下面的FastAPI应用程序,并分别访问每个端点,以检查完成将数据加载和转换为JSON过程所需的时间。
app.py
from fastapi import FastAPI, APIRouter, Response, Request
from fastapi.routing import APIRoute
from typing import Callable
import pandas as pd
import json
import time
import ujson
import orjson
class TimedRoute(APIRoute):
def get_route_handler(self) -> Callable:
original_route_handler = super().get_route_handler()
async def custom_route_handler(request: Request) -> Response:
before = time.time()
response: Response = await original_route_handler(request)
duration = time.time() - before
response.headers["Response-Time"] = str(duration)
print(f"route duration: {duration}")
return response
return custom_route_handler
app = FastAPI()
router = APIRouter(route_class=TimedRoute)
@router.get("/defaultFastAPIencoder")
def get_data_default():
df = pd.read_parquet('data.parquet')
return df.to_dict(orient="records")
@router.get("/orjson")
def get_data_orjson():
df = pd.read_parquet('data.parquet')
return Response(orjson.dumps(df.to_dict(orient='records')), media_type="application/json")
@router.get("/ujson")
def get_data_ujson():
df = pd.read_parquet('data.parquet')
return Response(ujson.dumps(df.to_dict(orient='records')), media_type="application/json")
# Preferable way to do it
@router.get("/pandasJSON")
def get_data_pandasJSON():
df = pd.read_parquet('data.parquet')
return Response(df.to_json(orient="records"), media_type="application/json")
app.include_router(router)
尽管使用上面的/pandasJSON
响应时间相当快(这是最好的方法),但在浏览器上显示数据时可能会遇到一些延迟。然而,这与服务器端无关,而与客户端有关,因为浏览器试图显示大量数据。如果您不想显示数据,而是让用户将数据下载到他们的设备(这会快得多),您可以使用Content-Disposition
参数将Response
头设置为Response
,并传递一个filename
,指示浏览器应该下载该文件。有关更多细节,请查看这个答案和这个答案。
@router.get("/download")
def get_data():
df = pd.read_parquet('data.parquet')
headers = {'Content-Disposition': 'attachment; filename="data.json"'}
return Response(df.to_json(orient="records"), headers=headers, media_type='application/json')
我还应该提到,有一个名为Dask
的库,它可以处理大型数据集,如这里所描述的那样,以防您必须处理大量记录,并且要花费太长时间才能完成该过程。与Pandas类似,您可以使用.read_parquet()
方法读取文件。由于Dask似乎没有提供等效的.to_json()
方法,所以可以使用df.compute()
将Dask DataFrame转换为Pandas DataFrame,然后使用Pandas df.to_json()
将DataFrame转换为JSON字符串,并按上面所示返回。
发布于 2022-08-31 22:53:05
我猜在您的情况下,json.loads(result)
将返回一个dict数据类型,并且您正在过滤dict数据类型。您可以将dict数据类型发送为JSON,如下所示:
from fastapi.responses import JSONResponse
@app.get('/endpoint')
# has several more parameters
async def some_function(year = int | None = None, id = str | None = None):
if year is None:
data = parse_parquet(f'path/{year}_data.parquet')
# no year
if year is not None:
data = parse_parquet(f'path/all_data.parquet')
if id is not None:
data = [d for d in data if d['id'] == id]
return JSONResponse(content=json_compatible_item_data)
https://stackoverflow.com/questions/73564771
复制相似问题