Celery — 异步任务分发模块详解

Celery 是 Python 中一个非常流行的异步任务队列/调度系统。它被广泛应用于 Web 应用程序中的后台任务处理、定时任务调度以及并发计算等场景。它支持分布式任务队列,可以非常方便地将任务异步执行,从而提高系统的响应速度和处理能力。

1. Celery 简介

Celery 是一个分布式任务队列,可以实现异步任务的执行,并且能够处理大量并发的任务。其主要优势是:

  • 异步任务处理:任务会被异步执行,从而避免阻塞主线程。
  • 分布式:可以在多个节点上分布式执行任务,提升系统的可扩展性和高可用性。
  • 定时任务:支持周期性任务调度,类似于 Linux 的 cron 任务。

2. 安装 Celery

首先,需要安装 Celery 和一个消息中间件(例如 Redis 或 RabbitMQ)。最常用的中间件是 Redis,下面我们以 Redis 为例。

安装 Celery 和 Redis:

pip install celery
pip install redis

3. Celery 工作原理

Celery 基于消息中间件(Broker)来分发任务。常见的消息中间件包括:

  • RabbitMQ:高效、功能强大的消息代理。
  • Redis:支持简单高效的任务队列,非常适合于 Celery。

Celery 的工作流程大致如下:

  1. 生产者:Django、Flask 或其他应用将任务提交到队列中(Celery Worker)。
  2. 消息中间件(Broker):Celery 使用消息中间件(如 Redis 或 RabbitMQ)作为任务队列的承载者,将任务存储在队列中。
  3. 消费者(Worker):Celery Worker 从队列中取出任务并执行。
  4. 结果存储(Backend):执行完成后,结果可以存储到结果后端(如数据库、Redis、RabbitMQ 等)。

4. 创建一个简单的 Celery 示例

以下是一个简单的示例,展示了如何使用 Celery 来执行异步任务。

1. 创建 Django 项目并安装 Celery

假设你已经有一个 Django 项目,并安装了 Celery 和 Redis:

pip install celery
pip install redis

2. 配置 Celery

在 Django 项目的根目录下创建一个 celery.py 文件。

# myproject/celery.py
from __future__ import absolute_import, unicode_literals
import os
from celery import Celery

# 设置 Django 配置环境变量
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'myproject.settings')

# 创建 Celery 实例
app = Celery('myproject')

# 使用 Django 配置来初始化 Celery
app.config_from_object('django.conf:settings', namespace='CELERY')

# 自动发现任务模块
app.autodiscover_tasks()

在 Django 项目的 settings.py 中,添加以下 Celery 配置:

# settings.py

# Celery 配置
CELERY_BROKER_URL = 'redis://localhost:6379/0'  # Redis 作为消息中间件
CELERY_RESULT_BACKEND = 'redis://localhost:6379/0'  # 结果存储

3. 创建任务

在 Django 应用的 tasks.py 文件中创建一个简单的任务。例如,假设我们有一个名为 tasks.py 的文件:

# myapp/tasks.py
from celery import shared_task

@shared_task
def add(x, y):
    return x + y

4. 调用任务

你可以在 Django 的视图或其他逻辑中调用 Celery 任务。例如,创建一个视图来触发异步任务:

# myapp/views.py
from django.http import HttpResponse
from .tasks import add

def add_numbers(request):
    result = add.delay(4, 6)  # 异步执行任务
    return HttpResponse(f"Task result: {result.id}")

在视图中,我们使用 add.delay() 来异步执行任务。delay() 方法将任务推送到 Celery 队列中,Celery Worker 会从队列中取出任务并执行。

5. 启动 Celery Worker

启动 Celery Worker 来执行任务:

celery -A myproject worker --loglevel=info

这将启动 Celery Worker,它会监听队列并执行任务。

6. 查看结果

你可以使用 Celery 的结果存储来获取任务的执行结果。例如,获取上面任务的执行结果:

result = add.delay(4, 6)
print(result.get(timeout=1))  # 结果: 10

这样,result.get() 会阻塞直到任务执行完成并返回结果。


5. 定时任务(Periodic Tasks)

Celery 还支持定时任务。我们可以使用 Celery Beat 来实现周期性任务调度。

1. 安装 Celery Beat

pip install celery[redis]

2. 配置 Celery Beat

在 settings.py 中添加定时任务配置:

# settings.py

CELERY_BEAT_SCHEDULE = {
    'add-every-30-seconds': {
        'task': 'myapp.tasks.add',
        'schedule': 30.0,  # 每 30 秒执行一次
        'args': (4, 6),     # 参数
    },
}

3. 启动 Celery Beat

启动 Celery Beat 来调度定时任务:

celery -A myproject beat --loglevel=info

此时,任务将每 30 秒自动执行一次。


6. Celery 高级功能

1. 任务重试

Celery 允许你在任务失败时进行重试。例如:

@shared_task(bind=True, max_retries=3)
def add(self, x, y):
    try:
        return x + y
    except Exception as exc:
        raise self.retry(exc=exc)

这里,max_retries 指定最大重试次数,self.retry() 方法用于执行任务重试。

2. 任务优先级

Celery 支持通过队列的优先级来控制任务执行顺序。你可以为任务指定队列并设置优先级。

@shared_task(queue='high_priority')
def high_priority_task():
    pass

3. 任务链与回调

Celery 提供了任务链(task chain)和回调(callback)机制来处理任务之间的依赖关系。例如,任务 A 完成后,自动执行任务 B

from celery import chain

result = chain(task_A.s(), task_B.s())()

7. Celery 优化建议

  • 任务拆分:将复杂任务拆分为多个小任务,并使用任务链和回调来提高并发执行效率。
  • 并发控制:根据任务的类型和负载需求,配置适当数量的 Worker 进程和线程。
  • 定时任务:尽量避免过于频繁的定时任务,合理分配任务的执行频率,避免过多的系统资源消耗。

8. 总结

Celery 是一个强大的分布式任务队列系统,能够处理高并发任务、定时任务以及异步任务。它能够帮助开发者将任务从主进程中剥离出来,在后台进行处理,从而提高系统的响应速度和可扩展性。通过与 Redis 或 RabbitMQ 等消息中间件的结合,Celery 可以轻松扩展到分布式环境,处理数百万级的并发任务。

通过上述示例和配置,你可以在 Django 中轻松集成 Celery,进行异步任务处理与定时任务调度。