
Celery 是一个简单、灵活且可靠的分布式任务队列系统,专注于实时处理的异步任务队列,同时也支持任务调度。Celery是实现异步任务、定时任务的一种工具。
Celery 的核心功能
Celery 的架构
Celery 的工作流程
AsyncResult 查询任务的状态和结果。生产环境建议
安装Redis 作为消息中间件(过程略)
安装 Celery 和 Redis客户端
pip install redis
pip install celeryDjango项目结构示例
- mysite/
- manage.py
- mysite/
- __init__.py
- settings.py
- urls.py定义 Celery 实例:创建文件mysite\mysite\celery.py
"""定义和配置 Celery 实例"""
import os
from celery import Celery
from django.conf import settings
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "mysite.settings")
# 创建 Celery 实例
app = Celery("mysite")
# 加载配置文件中的 Celery 配置
app.config_from_object("django.conf:settings", namespace="CELERY")
# 自动发现并加载任务
app.autodiscover_tasks(["myapp_infra", "myapp_system"] + settings.MY_APPS, force=True)配置 Django 启动时会加载应用:修改文件mysite\mysite\__init__.py
"""Django 启动时加载Celery实例"""
from .celery import app as celery_app
__all__ = ("celery_app",)配置Celery:修改Django项目文件settings.py
### Celery配置
CELERY_RESULT_BACKEND = "redis://localhost:6379/2"
CELERY_BROKER_URL = "redis://localhost:6379/3"
CELERY_TIMEZONE = "Asia/Shanghai"
CELERY_ENABLE_UTC = True
CELERY_RESULT_EXTENDED = True # 启用后才会记录 task_name、date_started 等字段
CELERY_TASK_TRACK_STARTED = True # 记录任务开始时间发现任务:Celery 将自动从所有已安装的应用APP中发现任务,需要遵守以下目录结构
- myapp_system/
- tasks.py
- models.py
- myapp_infra/
- tasks.py
- models.py定义任务:创建文件myapp_infra/tasks.py,使用@shared_task装饰器定义 Celery 任务
"""定义 Celery 任务"""
from celery import shared_task
from time import sleep
@shared_task
def send_email_task(subject, message, recipient_list):
"""
发送电子邮件的任务
"""
print("发送邮件任务开始执行...")
sleep(3)
print(f"主题: {subject}")
print(f"内容: {message}")
print(f"收件人: {recipient_list}")
print("#" * 10, "\n")调用任务:在视图或其他代码中,使用 .delay() 方法将任务发送到 Celery 队列中。
from rest_framework.views import APIView
from rest_framework.response import Response
from .tasks import send_email_task
class SendEmailView(APIView):
def get(self, request):
subject = "Hello from Celery"
message = "This is a test email sent using Celery."
recipient_list = ["user@example.com"]
# 异步发送邮件
send_email_task.delay(subject, message, recipient_list)
return Response({"message": "Email sent successfully"})启动Celery工作进程:在开发和测试环境中,使用下面命令启动Celery工作进程
# 进入Django项目目录(包含manage.py的目录)
celery -A mysite worker -l INFO -P solo启动Django项目
# 进入Django项目目录(包含manage.py的目录)
python manage.py runserver当访问SendEmailView 视图,会看到日志中显示任务已触发和执行。
[2025-04-14 09:11:53,151: INFO/MainProcess] Task mybooks.tasks.send_email_task[c37a8725-aa59-43b4-9949-74a753c019a2] received
[2025-04-14 09:11:55,185: WARNING/MainProcess] 发送邮件任务开始执行...
[2025-04-14 09:11:58,186: WARNING/MainProcess] 主题: Hello from Celery
[2025-04-14 09:11:58,186: WARNING/MainProcess] 内容: This is a test email sent using Celery.
[2025-04-14 09:11:58,186: WARNING/MainProcess] 收件人: ['user@example.com']
[2025-04-14 09:11:58,187: WARNING/MainProcess] ##########
[2025-04-14 09:11:58,187: WARNING/MainProcess]delay_on_commit 是 Celery 提供的一个用于确保任务在数据库事务提交后执行的机制。考虑以下场景:
# views.py
def create_user(request):
user = User.objects.create(username=request.POST['username'])
send_email.delay(user.pk)
return HttpResponse('User created')
# task.py
@shared_task
def send_email(user_pk):
user = User.objects.get(pk=user_pk)
# send email ...delay_on_commit 会将任务调度延迟到当前事务成功提交后执行。如果事务回滚,任务也不会被调度。
# views.py
from django.db import transaction
from .tasks import send_email_task
@transaction.atomic
def create_user(request):
user = User.objects.create(username="zhangsan") # 数据库操作
# 正确:事务提交后才会触发任务
send_email_task.delay_on_commit(user.id)
# 如果此处抛出异常导致事务回滚,任务不会被执行
return HttpResponse("OK")您正在阅读的是《Django从入门到实战》专栏!关注不迷路~
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。