def my_background_task(task_id: str):
"""
测试 background_task
"""
begin_time = time.time()
time.sleep(random.randint(1, 5))
end_time = time.time()
print(f'task_id:{task_id}, task finished, time cost:{end_time - begin_time}s')
@app.post('/background_task')
async def background_task(background_tasks: BackgroundTasks):
"""
测试接口 background-task
"""
task_id = uuid.uuid4().hex
background_tasks.add_task(my_background_task, task_id)
return {
'current_task_id': task_id
}
def my_background_task(task_id: str):
"""
测试 background_task
"""
begin_time = time.time()
time.sleep(random.randint(1, 5))
end_time = time.time()
print(f'task_id:{task_id}, task finished, time cost:{end_time - begin_time}s')
def get_q(background_tasks: BackgroundTasks, q: str | None = None):
"""
测试接口 background-task
"""
if q is not None:
task_id = uuid.uuid4().hex
q = f'{q}-{task_id}'
background_tasks.add_task(my_background_task, task_id)
return q
@app.post('/background_task_2')
async def background_task_2(q: Annotated[str, Depends(get_q)]):
"""
测试接口 background-task
"""
return {
'current_task_id': q
}
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。