https://docs.celeryq.dev/en/stable/getting-started/first-steps-with-celery.html#first-steps
Celery时一个自带电池的任务队列。本教程内容:
Celery需要一个方法来发送和接受消息,这个方法被称为消息代理(message broker)。Celery支持多种消息代理,如RabbitMQ、Redis等。
安装RabbitMQ
:(推荐)在Dockers上运行RabbitMQ:
docker run -d -p 5672:5672 rabbitmq
或者在Ubuntu上安装RabbitMQ:
sudo apt-get install rabbitmq-server
celery -A tasks worker --loglevel=INFO
Windows下有个坑:
celery
正常启动和接收任务但不能执行,报错:ValueError: not enough values to unpack (expected 3, got 0)。需要借助eventlet
:1.安装eventlet:pip install eventlet
2.借助eventlet启动celery:celery -A tasks worker --loglevel=INFO -P eventlet
参考1:https://www.cnblogs.com/qumogu/p/13284173.html 参考2:https://stackoverflow.com/questions/37255548/how-to-run-celery-on-windows 但这只是一个临时解决方案, celery对windows的支持很差,最好还是在Linux下运行。windows系统可以用WSL。
使用delay()
方法调用task:
在Python shell中:
from tasks import add
add.delay(4, 4)
注:
delay()
方法是apply_async()
方法的快捷方式。
然后,之前启动的worker进程会执行这个任务。可以在worker进程的日志中看到任务的执行情况:
[2024-04-10 21:58:25,217: INFO/MainProcess] Task tasks.add[987d2e18-0090-4b5b-bcb5-bd038b9690a3] received
[2024-04-10 21:58:25,221: INFO/MainProcess] Task tasks.add[987d2e18-0090-4b5b-bcb5-bd038b9690a3] succeeded in 0.0s: 8
如果要跟踪任务的状态, Celery需要将状态存储或发送到某个地方,如SQLAlchemy/Django ORM、MongoDB、Memcached、Redis、RPC(RabbitMQ/AMQP),并且可以自定义。
在此示例中,我们使用 rpc
作为结果后端(result backend),它将状态作为暂时性消息发送回。Celery
通过 backend
参数 指定后端(如果选择使用配置模块,则通过result_backend
设置指定)。因此,您可以在 tasks.py
文件中修改此行以启用 rpc://
后端:
app = Celery('tasks', backend='rpc://', broker='pyamqp://')
或者,如果您想使用 Redis 作为结果后端,但仍然使用 RabbitMQ 作为消息代理(一种流行的组合):
app = Celery('tasks', backend='redis://localhost', broker='pyamqp://')
现在配置了结果后端,关闭当前 python 会话并再次导入 tasks 模块以使更改生效。这一次,您将保留调用任务时返回的 AsyncResult 实例:
from tasks import add
result = add.delay(4, 4)
然后可以用ready()
方法检查任务是否完成:
result.ready()
您可以等待结果完成,但很少使用,因为这会将异步调用转换为同步调用:
result.get(timeout=1)
8
Celery就像家用电器一样,不需要太多配置。只需要配置输入(连接到代理 broker)和输出(连接到结果后端)即可使用。但是,如果你仔细观察,你会发现有很多按钮。这就是配置选项。默认的配置通常是足够的,但是也可以通过修改配置让Celery更适合你的需求。
可以直接在app上修改配置:
app.conf.task_serializer = 'json'
如果一次性修改多个配置,可以使用update
方法:
app.conf.update(
task_serializer='json',
accept_content=['json'], # Ignore other content
result_serializer='json',
timezone='Europe/Oslo',
enable_utc=True,
)
对于较大的项目,建议使用专用的配置模块。可以用app.config_from_object()
告诉 Celery 使用配置模块:
app.config_from_object('celeryconfig')
配置模块名称通常是celeryconfig
。
该模块必须在当前目录可以访问, celeryconfig.py
:
broker_url = 'pyamqp://'
result_backend = 'rpc://'
task_serializer = 'json'
result_serializer = 'json'
accept_content = ['json']
timezone = 'Europe/Oslo'
enable_utc = True
要验证配置文件是否正常工作且不包含任何语法错误,可以尝试导入它: python -m celeryconfig
下面是两个配置示例:将行为异常的任务路由到专用队列的方式
task_routes = {
'tasks.add': 'low-priority',
}
对任务进行速率限制
task_annotations = {
'tasks.add': {'rate_limit': '10/m'}
}