前往小程序,Get更优阅读体验!
立即前往
发布
社区首页 >专栏 >FastAPI后台开发基础(19): 异步任务BackgroundTask

FastAPI后台开发基础(19): 异步任务BackgroundTask

原创
作者头像
bowenerchen
发布2024-11-20 10:20:49
发布2024-11-20 10:20:49
21600
代码可运行
举报
文章被收录于专栏:编码视界编码视界
运行总次数:0
代码可运行

在路径操作中使用BackgroundTask

代码语言:python
代码运行次数:0
复制
def my_background_task(task_id: str):
    """
    测试 background_task
    """
    begin_time = time.time()
    time.sleep(random.randint(1, 5))
    end_time = time.time()
    print(f'task_id:{task_id}, task finished, time cost:{end_time - begin_time}s')


@app.post('/background_task')
async def background_task(background_tasks: BackgroundTasks):
    """
    测试接口 background-task
    """
    task_id = uuid.uuid4().hex
    background_tasks.add_task(my_background_task, task_id)
    return {
        'current_task_id': task_id
    }
执行异步任务
执行异步任务

结合依赖注入使用异步任务

代码语言:python
代码运行次数:0
复制
def my_background_task(task_id: str):
    """
    测试 background_task
    """
    begin_time = time.time()
    time.sleep(random.randint(1, 5))
    end_time = time.time()
    print(f'task_id:{task_id}, task finished, time cost:{end_time - begin_time}s')

def get_q(background_tasks: BackgroundTasks, q: str | None = None):
    """
    测试接口 background-task
    """
    if q is not None:
        task_id = uuid.uuid4().hex
        q = f'{q}-{task_id}'
        background_tasks.add_task(my_background_task, task_id)
    return q


@app.post('/background_task_2')
async def background_task_2(q: Annotated[str, Depends(get_q)]):
    """
    测试接口 background-task
    """
    return {
        'current_task_id': q
    }
触发异步任务
触发异步任务
不触发异步任务
不触发异步任务

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 在路径操作中使用BackgroundTask
  • 结合依赖注入使用异步任务
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档