我正在用Flask编写一个应用程序,除了WSGI
是同步的和阻塞的之外,它工作得非常好。我有一个特别的任务,它调用第三方API,该任务可能需要几分钟才能完成。我想进行这个调用(实际上是一系列调用),并让它运行。同时将控制权返回给Flask。
我的观点如下所示:
@app.route('/render/<id>', methods=['POST'])
def render_script(id=None):
...
data = json.loads(request.data)
text_list = data.get('text_list')
final_file = audio_class.render_audio(data=text_list)
# do stuff
return Response(
mimetype='application/json',
status=200
)
现在,我想要做的就是把这句话
final_file = audio_class.render_audio()
运行并提供方法返回时要执行的回调,同时Flask可以继续处理请求。这是我唯一需要Flask异步运行的任务,我想要一些关于如何最好地实现它的建议。
我看过Twisted和Klein,但我不确定它们是否过度杀伤力,因为也许线程就足够了。或许芹菜是个不错的选择?
发布于 2015-08-06 22:56:05
我将使用Celery为您处理异步任务。您需要安装一个代理来作为您的任务队列(推荐使用RabbitMQ和Redis )。
app.py
from flask import Flask
from celery import Celery
broker_url = 'amqp://guest@localhost' # Broker URL for RabbitMQ task queue
app = Flask(__name__)
celery = Celery(app.name, broker=broker_url)
celery.config_from_object('celeryconfig') # Your celery configurations in a celeryconfig.py
@celery.task(bind=True)
def some_long_task(self, x, y):
# Do some long task
...
@app.route('/render/<id>', methods=['POST'])
def render_script(id=None):
...
data = json.loads(request.data)
text_list = data.get('text_list')
final_file = audio_class.render_audio(data=text_list)
some_long_task.delay(x, y) # Call your async task and pass whatever necessary variables
return Response(
mimetype='application/json',
status=200
)
运行您的Flask应用程序,并启动另一个进程来运行您的芹菜工人。
$ celery worker -A app.celery --loglevel=debug
我还会参考Miguel Gringberg的write up,以获得更深入的指南,了解如何在Flask中使用芹菜。
发布于 2020-03-27 15:50:19
您还可以尝试在daemon=True
中使用multiprocessing.Process
;process.start()
方法不会阻塞,您可以在代价高昂的函数在后台执行时立即向调用者返回响应/状态。
在使用falcon框架和使用daemon
process时,我遇到了类似的问题。
您需要执行以下操作:
from multiprocessing import Process
@app.route('/render/<id>', methods=['POST'])
def render_script(id=None):
...
heavy_process = Process( # Create a daemonic process with heavy "my_func"
target=my_func,
daemon=True
)
heavy_process.start()
return Response(
mimetype='application/json',
status=200
)
# Define some heavy function
def my_func():
time.sleep(10)
print("Process finished")
您应该会立即得到响应,10秒后,您应该会在控制台中看到打印的消息。
注意:请记住,daemonic
进程不允许派生任何子进程。
发布于 2021-06-05 16:37:55
烧瓶2.0
Flask 2.0现在支持异步路由。为此,您可以使用httpx库和asyncio协程。你可以像下面这样修改你的代码
@app.route('/render/<id>', methods=['POST'])
async def render_script(id=None):
...
data = json.loads(request.data)
text_list = data.get('text_list')
final_file = await asyncio.gather(
audio_class.render_audio(data=text_list),
do_other_stuff_function()
)
# Just make sure that the coroutine should not having any blocking calls inside it.
return Response(
mimetype='application/json',
status=200
)
上面的代码只是一个伪代码,但是你可以检查asyncio是如何与flask 2.0一起工作的,对于HTTP调用,你可以使用httpx。还要确保协程只执行一些I/O任务。
https://stackoverflow.com/questions/31866796
复制