跳至主要內容

Celery学习笔记

银角小王大约 8 分钟笔记笔记celerydjangopython

1. Celery 简介

1.1 什么是 Celery?

Celery 是一个简单、灵活且可靠的分布式任务队列系统,用于处理大量消息并提供实时操作或异步任务处理。它允许你将任务分配到不同的工作节点上,并支持任务的调度、执行和监控。

1.2 Celery 的使用场景

  • Web 应用中的异步任务:处理耗时操作,如发送电子邮件或生成报告。
  • 定时任务:例如每天定时执行某个任务。
  • 分布式处理:将任务分配到不同的节点,实现负载均衡。

1.3 Celery 的优势

  • 简单易用,支持 Python 原生任务。
  • 提供高度可扩展性,可以处理数百万个任务。
  • 与 Redis、RabbitMQ 等消息代理(Broker)无缝集成。

2. Celery 基础概念

2.1 任务(Task)

任务是 Celery 中的基本工作单元。每个任务都是一个 Python 函数,它可以被 Celery 的工作进程执行。

2.2 工作队列(Queue)

任务被放置在队列中,等待工人进程来处理。Celery 使用消息队列系统来存储这些任务。

2.3 工人进程(Worker)

工人是执行队列中任务的进程。Celery 中的工人可以运行在单个机器或多个机器上。

2.4 代理(Broker)

代理是任务队列系统的核心,负责传递消息。Celery 支持多种消息代理,最常用的是 Redis 和 RabbitMQ。

2.5 结果后端(Result Backend)

Celery 可以将任务执行的结果存储到某个后端,如 Redis、数据库等,方便后续查询。


3. Celery 安装与配置

3.1 Celery 安装步骤

首先,需要通过 pip 安装 Celery 和一个消息代理,比如 Redis:

pip install celery[redis]

或者安装rabbitmq:

pip install celery[rabbitmq]

3.2 Celery 与 Redis/RabbitMQ 代理的集成

要运行 Celery,首先需要设置代理。假设使用 Redis,启动 Redis 服务:

redis-server

或启动 Docker 容器:

docker run -d --restart=always -p 6379:6379 --name redis redis:alpine3.19  --requirepass ***
docker run -d --restart=always -p 5672:5672 --name rabbitmq rabbitmq:management

在你的项目中创建一个 celery.py 文件来初始化 Celery 实例:

from celery import Celery

app = Celery('tasks', broker='redis://localhost:6379/0')

配置 Celery:celeryconfig.py 文件详解

你可以在单独的配置文件中定义 Celery 的配置:

broker_url = 'redis://localhost:6379/0'
result_backend = 'redis://localhost:6379/1'
task_serializer = 'json'
result_serializer = 'json'
accept_content = ['json']
timezone = 'UTC'
enable_utc = True

4. 任务创建与管理

4.1 创建一个简单的任务

定义一个简单的任务,可以放在 tasks.py 文件中:

from celery import Celery

app = Celery('tasks', broker='redis://localhost:6379/0')

@app.task
def add(x, y):
    return x + y

4.2 使用装饰器定义任务

Celery 提供了 @app.task 装饰器,简单地将 Python 函数转换为 Celery 任务。

4.3 任务参数传递

任务可以接受任意数量的参数,例如:

@app.task
def multiply(x, y):
    return x * y

4.4 定义任务超时和重试机制

可以通过设置参数定义任务的超时和重试逻辑:

@app.task(bind=True, max_retries=3, default_retry_delay=60)
def failing_task(self, x):
    try:
        # 一些可能失败的操作
    except Exception as exc:
        raise self.retry(exc=exc)

5. 任务调度与异步执行

5.1 异步执行任务:delay() 和 apply_async()

  • delay():快速调用任务并异步执行。
* add.delay(4, 6)
  • apply_async():提供更多自定义选项,如延时执行。
add.apply_async((4, 6), countdown=10)  # 延迟10秒执行 

5.2 获取任务执行结果

通过任务的 AsyncResult 对象,可以检查任务状态和获取结果:

result = add.delay(4, 6)
print(result.status)  # 查看任务状态
print(result.result)  # 获取任务结果

5.3 任务的链式执行(Chain)

可以通过 Celery 的链式执行机制,将多个任务串行地执行:

from celery import chain
chain(add.s(2, 2), multiply.s(4))()

5.4 并行任务(Group、Chord)

  • Group:并行执行多个任务。
from celery import group
group(add.s(i, i) for i in range(10))()
  • Chord:在所有任务执行完毕后,执行回调任务。
from celery import chord
chord(add.s(i, i) for i in range(10))(final_task.s())

总结对比

  • group:只负责并行执行多个任务,结果返回后,处理逻辑由你自己决定。
  • chord:不仅并行执行多个任务,还会在这些任务完成后,自动执行一个回调任务来进一步处理结果。

6. 定时任务(Periodic Tasks)

6.1 使用 Celery Beat 进行定时任务调度

Celery Beat 是 Celery 的定时任务调度器,可以按计划定时执行任务。

6.2 定义定时任务

你可以在 Celery 的配置文件中定义定时任务:

from celery.schedules import crontab

beat_schedule = {
    'add-every-30-seconds': {
        'task': 'tasks.add',
        'schedule': 30.0,
        'args': (16, 16)
    },
    'multiply-at-midnight': {
        'task': 'tasks.multiply',
        'schedule': crontab(hour=0, minute=0),
        'args': (2, 8)
    },
}

6.3 Crontab 表达式详解

crontab 用于设置更复杂的时间调度规则,如:

  • crontab(minute=0, hour=0) 每天午夜执行。
  • crontab(minute=0, hour='*/2') 每两小时执行一次。

7. 任务路由(Routing)

7.1 基于任务的路由

可以为不同类型的任务指定不同的队列:

app.conf.task_routes = {'tasks.add': {'queue': 'low_priority'}}

7.2 使用队列进行任务分发

任务可以通过不同的队列进行分发和处理:

add.apply_async(queue='low_priority')

7.3 优先级队列的使用

通过优先级队列,可以控制任务的执行顺序。队列中设置 x-max-priority 属性。

8. 监控与管理

8.1 Celery 任务监控:Flower 工具

Flower 是一个 Celery 的 web 监控工具,可以监控任务状态、工人节点等。

pip install flower
celery -A tasks flower

8.2 使用日志记录任务执行

可以在 Celery 配置中定义日志记录:

worker_log_format = '[%(asctime)s: %(levelname)s/%(processName)s] %(message)s'

8.3 任务状态监控和错误处理 通过 AsyncResult 可以获取任务的执行状态,并处理错误。

9. 高级使用

9.1 使用自定义任务类

你可以通过继承 celery.Task 来创建自定义任务类,以便实现更加灵活的控制,例如在任务执行前后进行某些操作,或者添加特殊的行为。

创建自定义任务类 首先,定义一个自定义的任务类,继承自 celery.Task,并覆盖 run 方法。你还可以添加自定义的生命周期方法如 on_successon_failure

myapp/tasks.py

from celery import Celery, Task

app = Celery('myapp')

class MyCustomTask(Task):
    def on_success(self, retval, task_id, args, kwargs):
        print(f'Task {task_id} succeeded with result: {retval}')
        return super().on_success(retval, task_id, args, kwargs)

    def on_failure(self, exc, task_id, args, kwargs, einfo):
        print(f'Task {task_id} failed with error: {exc}')
        return super().on_failure(exc, task_id, args, kwargs, einfo)

    def run(self, x, y):
        return x + y

# 注册自定义任务
@app.task(base=MyCustomTask)
def add(x, y):
    return x + y

在这个例子中,我们定义了一个 MyCustomTask 任务类,它继承了 celery.Task 并自定义了任务执行成功和失败时的行为:

  • on_success:任务成功时调用,用于记录日志或进行其他成功后的操作。
  • on_failure:任务失败时调用,可以用来发送错误通知或记录失败原因。
  • run:实际执行任务的核心逻辑,这里定义了一个简单的加法任务。

使用自定义任务类

你可以像使用普通的 Celery 任务一样调用这个任务:

add.delay(4, 6)

当任务成功时,on_success 方法将被调用;如果任务失败,on_failure 将处理异常。

9.2 任务并发和幂等性问题

在处理并发任务时,务必确保任务的幂等性,避免重复执行相同的任务导致数据问题。

9.3 任务重试机制优化

利用 retry 参数和异常处理机制,优化任务的重试逻辑。

9.4 使用信号和事件

Celery 提供了一些信号来跟踪任务的生命周期(如 task_sent、task_received),可以用于监控和扩展。

10. 常见问题与调试

10.1 常见错误及解决方法

例如:任务未被执行可能是因为代理配置错误。

10.2 调试 Celery 任务

使用 worker --loglevel=INFO 提供更多的调试信息。

10.3 性能优化建议

  • 使用 Redis 作为结果后端时,可以开启压缩以节省内存。
  • 适当调节工作节点数以提高并发处理能力。

13. Celery 与 Django 的集成

在 Django 项目根目录创建一个 celery.py 文件来配置 Celery 实例,通常与 settings.py 文件放在同一目录下:

myproject/celery.py

from __future__ import absolute_import, unicode_literals
import os
from celery import Celery

# 设置 Django 的默认 settings 模块
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'myproject.settings')

app = Celery('myproject')

# 从 Django 的设置中导入 Celery 配置
app.config_from_object('django.conf:settings', namespace='CELERY')

# 自动发现任务
app.autodiscover_tasks()

接着,在 myproject/__init__.py 中导入 Celery 实例,以确保 Django 加载时 Celery 也会被加载:

myproject/__init__.py

from __future__ import absolute_import, unicode_literals

# 导入 Celery app
from .celery import app as celery_app

__all__ = ('celery_app',)
# 在这段代码中,__all__ = ('celery_app',) 表示只有 celery_app 这个名称可以被其他模块导入,所有其他模块或变量即使存在也不会被导出。

13.1 配置 Celery

在 Django 的 settings.py 中进行 Celery 的配置。常见的配置包括代理(Broker)和结果后端(Result Backend):

# Redis 作为代理和结果后端
CELERY_BROKER_URL = 'redis://localhost:6379/0'
CELERY_RESULT_BACKEND = 'redis://localhost:6379/0'

# 任务序列化格式
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'
CELERY_ACCEPT_CONTENT = ['json']

# 定时任务相关配置(可选)
CELERY_BEAT_SCHEDULE = {
    'add-every-30-seconds': {
        'task': 'tasks.add',
        'schedule': 30.0,
        'args': (16, 16)
    },
}

13.2 创建任务

在 Django 应用中创建一个 tasks.py 文件,并定义任务:

myapp/tasks.py

from celery import shared_task

@shared_task
def add(x, y):
    return x + y

13.3 运行 Celery

启动 Celery worker:

celery -A myproject worker --loglevel=info

如果你使用 Celery Beat 进行定时任务调度,还需要启动 Beat:

celery -A myproject beat --loglevel=info