Celery — 异步任务分发模块详解
Celery 是 Python 中一个非常流行的异步任务队列/调度系统。它被广泛应用于 Web 应用程序中的后台任务处理、定时任务调度以及并发计算等场景。它支持分布式任务队列,可以非常方便地将任务异步执行,从而提高系统的响应速度和处理能力。
1. Celery 简介
Celery 是一个分布式任务队列,可以实现异步任务的执行,并且能够处理大量并发的任务。其主要优势是:
- 异步任务处理:任务会被异步执行,从而避免阻塞主线程。
- 分布式:可以在多个节点上分布式执行任务,提升系统的可扩展性和高可用性。
- 定时任务:支持周期性任务调度,类似于 Linux 的
cron
任务。
2. 安装 Celery
首先,需要安装 Celery 和一个消息中间件(例如 Redis 或 RabbitMQ)。最常用的中间件是 Redis,下面我们以 Redis 为例。
安装 Celery 和 Redis:
pip install celery
pip install redis
3. Celery 工作原理
Celery 基于消息中间件(Broker)来分发任务。常见的消息中间件包括:
- RabbitMQ:高效、功能强大的消息代理。
- Redis:支持简单高效的任务队列,非常适合于 Celery。
Celery 的工作流程大致如下:
- 生产者:Django、Flask 或其他应用将任务提交到队列中(Celery Worker)。
- 消息中间件(Broker):Celery 使用消息中间件(如 Redis 或 RabbitMQ)作为任务队列的承载者,将任务存储在队列中。
- 消费者(Worker):Celery Worker 从队列中取出任务并执行。
- 结果存储(Backend):执行完成后,结果可以存储到结果后端(如数据库、Redis、RabbitMQ 等)。
4. 创建一个简单的 Celery 示例
以下是一个简单的示例,展示了如何使用 Celery 来执行异步任务。
1. 创建 Django 项目并安装 Celery
假设你已经有一个 Django 项目,并安装了 Celery 和 Redis:
pip install celery
pip install redis
2. 配置 Celery
在 Django 项目的根目录下创建一个 celery.py
文件。
# myproject/celery.py
from __future__ import absolute_import, unicode_literals
import os
from celery import Celery
# 设置 Django 配置环境变量
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'myproject.settings')
# 创建 Celery 实例
app = Celery('myproject')
# 使用 Django 配置来初始化 Celery
app.config_from_object('django.conf:settings', namespace='CELERY')
# 自动发现任务模块
app.autodiscover_tasks()
在 Django 项目的 settings.py
中,添加以下 Celery 配置:
# settings.py
# Celery 配置
CELERY_BROKER_URL = 'redis://localhost:6379/0' # Redis 作为消息中间件
CELERY_RESULT_BACKEND = 'redis://localhost:6379/0' # 结果存储
3. 创建任务
在 Django 应用的 tasks.py
文件中创建一个简单的任务。例如,假设我们有一个名为 tasks.py
的文件:
# myapp/tasks.py
from celery import shared_task
@shared_task
def add(x, y):
return x + y
4. 调用任务
你可以在 Django 的视图或其他逻辑中调用 Celery 任务。例如,创建一个视图来触发异步任务:
# myapp/views.py
from django.http import HttpResponse
from .tasks import add
def add_numbers(request):
result = add.delay(4, 6) # 异步执行任务
return HttpResponse(f"Task result: {result.id}")
在视图中,我们使用 add.delay()
来异步执行任务。delay()
方法将任务推送到 Celery 队列中,Celery Worker 会从队列中取出任务并执行。
5. 启动 Celery Worker
启动 Celery Worker 来执行任务:
celery -A myproject worker --loglevel=info
这将启动 Celery Worker,它会监听队列并执行任务。
6. 查看结果
你可以使用 Celery 的结果存储来获取任务的执行结果。例如,获取上面任务的执行结果:
result = add.delay(4, 6)
print(result.get(timeout=1)) # 结果: 10
这样,result.get()
会阻塞直到任务执行完成并返回结果。
5. 定时任务(Periodic Tasks)
Celery 还支持定时任务。我们可以使用 Celery Beat
来实现周期性任务调度。
1. 安装 Celery Beat
pip install celery[redis]
2. 配置 Celery Beat
在 settings.py
中添加定时任务配置:
# settings.py
CELERY_BEAT_SCHEDULE = {
'add-every-30-seconds': {
'task': 'myapp.tasks.add',
'schedule': 30.0, # 每 30 秒执行一次
'args': (4, 6), # 参数
},
}
3. 启动 Celery Beat
启动 Celery Beat 来调度定时任务:
celery -A myproject beat --loglevel=info
此时,任务将每 30 秒自动执行一次。
6. Celery 高级功能
1. 任务重试
Celery 允许你在任务失败时进行重试。例如:
@shared_task(bind=True, max_retries=3)
def add(self, x, y):
try:
return x + y
except Exception as exc:
raise self.retry(exc=exc)
这里,max_retries
指定最大重试次数,self.retry()
方法用于执行任务重试。
2. 任务优先级
Celery 支持通过队列的优先级来控制任务执行顺序。你可以为任务指定队列并设置优先级。
@shared_task(queue='high_priority')
def high_priority_task():
pass
3. 任务链与回调
Celery 提供了任务链(task chain)和回调(callback)机制来处理任务之间的依赖关系。例如,任务 A
完成后,自动执行任务 B
:
from celery import chain
result = chain(task_A.s(), task_B.s())()
7. Celery 优化建议
- 任务拆分:将复杂任务拆分为多个小任务,并使用任务链和回调来提高并发执行效率。
- 并发控制:根据任务的类型和负载需求,配置适当数量的 Worker 进程和线程。
- 定时任务:尽量避免过于频繁的定时任务,合理分配任务的执行频率,避免过多的系统资源消耗。
8. 总结
Celery 是一个强大的分布式任务队列系统,能够处理高并发任务、定时任务以及异步任务。它能够帮助开发者将任务从主进程中剥离出来,在后台进行处理,从而提高系统的响应速度和可扩展性。通过与 Redis 或 RabbitMQ 等消息中间件的结合,Celery 可以轻松扩展到分布式环境,处理数百万级的并发任务。
通过上述示例和配置,你可以在 Django 中轻松集成 Celery,进行异步任务处理与定时任务调度。
发表回复