Celery学习笔记
1. Celery 简介
1.1 什么是 Celery?
Celery 是一个简单、灵活且可靠的分布式任务队列系统,用于处理大量消息并提供实时操作或异步任务处理。它允许你将任务分配到不同的工作节点上,并支持任务的调度、执行和监控。
1.2 Celery 的使用场景
- Web 应用中的异步任务:处理耗时操作,如发送电子邮件或生成报告。
- 定时任务:例如每天定时执行某个任务。
- 分布式处理:将任务分配到不同的节点,实现负载均衡。
1.3 Celery 的优势
- 简单易用,支持 Python 原生任务。
- 提供高度可扩展性,可以处理数百万个任务。
- 与 Redis、RabbitMQ 等消息代理(Broker)无缝集成。
2. Celery 基础概念
2.1 任务(Task)
任务是 Celery 中的基本工作单元。每个任务都是一个 Python 函数,它可以被 Celery 的工作进程执行。
2.2 工作队列(Queue)
任务被放置在队列中,等待工人进程来处理。Celery 使用消息队列系统来存储这些任务。
2.3 工人进程(Worker)
工人是执行队列中任务的进程。Celery 中的工人可以运行在单个机器或多个机器上。
2.4 代理(Broker)
代理是任务队列系统的核心,负责传递消息。Celery 支持多种消息代理,最常用的是 Redis 和 RabbitMQ。
2.5 结果后端(Result Backend)
Celery 可以将任务执行的结果存储到某个后端,如 Redis、数据库等,方便后续查询。
3. Celery 安装与配置
3.1 Celery 安装步骤
首先,需要通过 pip 安装 Celery 和一个消息代理,比如 Redis:
pip install celery[redis]
或者安装rabbitmq:
pip install celery[rabbitmq]
3.2 Celery 与 Redis/RabbitMQ 代理的集成
要运行 Celery,首先需要设置代理。假设使用 Redis,启动 Redis 服务:
redis-server
或启动 Docker 容器:
docker run -d --restart=always -p 6379:6379 --name redis redis:alpine3.19 --requirepass ***
docker run -d --restart=always -p 5672:5672 --name rabbitmq rabbitmq:management
在你的项目中创建一个 celery.py 文件来初始化 Celery 实例:
from celery import Celery
app = Celery('tasks', broker='redis://localhost:6379/0')
配置 Celery:celeryconfig.py 文件详解
你可以在单独的配置文件中定义 Celery 的配置:
broker_url = 'redis://localhost:6379/0'
result_backend = 'redis://localhost:6379/1'
task_serializer = 'json'
result_serializer = 'json'
accept_content = ['json']
timezone = 'UTC'
enable_utc = True
4. 任务创建与管理
4.1 创建一个简单的任务
定义一个简单的任务,可以放在 tasks.py 文件中:
from celery import Celery
app = Celery('tasks', broker='redis://localhost:6379/0')
@app.task
def add(x, y):
return x + y
4.2 使用装饰器定义任务
Celery 提供了 @app.task 装饰器,简单地将 Python 函数转换为 Celery 任务。
4.3 任务参数传递
任务可以接受任意数量的参数,例如:
@app.task
def multiply(x, y):
return x * y
4.4 定义任务超时和重试机制
可以通过设置参数定义任务的超时和重试逻辑:
@app.task(bind=True, max_retries=3, default_retry_delay=60)
def failing_task(self, x):
try:
# 一些可能失败的操作
except Exception as exc:
raise self.retry(exc=exc)
5. 任务调度与异步执行
5.1 异步执行任务:delay() 和 apply_async()
delay()
:快速调用任务并异步执行。
* add.delay(4, 6)
apply_async()
:提供更多自定义选项,如延时执行。
add.apply_async((4, 6), countdown=10) # 延迟10秒执行
5.2 获取任务执行结果
通过任务的 AsyncResult 对象,可以检查任务状态和获取结果:
result = add.delay(4, 6)
print(result.status) # 查看任务状态
print(result.result) # 获取任务结果
5.3 任务的链式执行(Chain)
可以通过 Celery 的链式执行机制,将多个任务串行地执行:
from celery import chain
chain(add.s(2, 2), multiply.s(4))()
5.4 并行任务(Group、Chord)
- Group:并行执行多个任务。
from celery import group
group(add.s(i, i) for i in range(10))()
- Chord:在所有任务执行完毕后,执行回调任务。
from celery import chord
chord(add.s(i, i) for i in range(10))(final_task.s())
总结对比
- group:只负责并行执行多个任务,结果返回后,处理逻辑由你自己决定。
- chord:不仅并行执行多个任务,还会在这些任务完成后,自动执行一个回调任务来进一步处理结果。
6. 定时任务(Periodic Tasks)
6.1 使用 Celery Beat 进行定时任务调度
Celery Beat 是 Celery 的定时任务调度器,可以按计划定时执行任务。
6.2 定义定时任务
你可以在 Celery 的配置文件中定义定时任务:
from celery.schedules import crontab
beat_schedule = {
'add-every-30-seconds': {
'task': 'tasks.add',
'schedule': 30.0,
'args': (16, 16)
},
'multiply-at-midnight': {
'task': 'tasks.multiply',
'schedule': crontab(hour=0, minute=0),
'args': (2, 8)
},
}
6.3 Crontab 表达式详解
crontab 用于设置更复杂的时间调度规则,如:
- crontab(minute=0, hour=0) 每天午夜执行。
- crontab(minute=0, hour='*/2') 每两小时执行一次。
7. 任务路由(Routing)
7.1 基于任务的路由
可以为不同类型的任务指定不同的队列:
app.conf.task_routes = {'tasks.add': {'queue': 'low_priority'}}
7.2 使用队列进行任务分发
任务可以通过不同的队列进行分发和处理:
add.apply_async(queue='low_priority')
7.3 优先级队列的使用
通过优先级队列,可以控制任务的执行顺序。队列中设置 x-max-priority 属性。
8. 监控与管理
8.1 Celery 任务监控:Flower 工具
Flower 是一个 Celery 的 web 监控工具,可以监控任务状态、工人节点等。
pip install flower
celery -A tasks flower
8.2 使用日志记录任务执行
可以在 Celery 配置中定义日志记录:
worker_log_format = '[%(asctime)s: %(levelname)s/%(processName)s] %(message)s'
8.3 任务状态监控和错误处理 通过 AsyncResult 可以获取任务的执行状态,并处理错误。
9. 高级使用
9.1 使用自定义任务类
你可以通过继承 celery.Task 来创建自定义任务类,以便实现更加灵活的控制,例如在任务执行前后进行某些操作,或者添加特殊的行为。
创建自定义任务类 首先,定义一个自定义的任务类,继承自 celery.Task
,并覆盖 run 方法。你还可以添加自定义的生命周期方法如 on_success
和 on_failure
。
myapp/tasks.py
from celery import Celery, Task
app = Celery('myapp')
class MyCustomTask(Task):
def on_success(self, retval, task_id, args, kwargs):
print(f'Task {task_id} succeeded with result: {retval}')
return super().on_success(retval, task_id, args, kwargs)
def on_failure(self, exc, task_id, args, kwargs, einfo):
print(f'Task {task_id} failed with error: {exc}')
return super().on_failure(exc, task_id, args, kwargs, einfo)
def run(self, x, y):
return x + y
# 注册自定义任务
@app.task(base=MyCustomTask)
def add(x, y):
return x + y
在这个例子中,我们定义了一个 MyCustomTask 任务类,它继承了 celery.Task 并自定义了任务执行成功和失败时的行为:
- on_success:任务成功时调用,用于记录日志或进行其他成功后的操作。
- on_failure:任务失败时调用,可以用来发送错误通知或记录失败原因。
- run:实际执行任务的核心逻辑,这里定义了一个简单的加法任务。
使用自定义任务类
你可以像使用普通的 Celery 任务一样调用这个任务:
add.delay(4, 6)
当任务成功时,on_success
方法将被调用;如果任务失败,on_failure
将处理异常。
9.2 任务并发和幂等性问题
在处理并发任务时,务必确保任务的幂等性,避免重复执行相同的任务导致数据问题。
9.3 任务重试机制优化
利用 retry 参数和异常处理机制,优化任务的重试逻辑。
9.4 使用信号和事件
Celery 提供了一些信号来跟踪任务的生命周期(如 task_sent、task_received),可以用于监控和扩展。
10. 常见问题与调试
10.1 常见错误及解决方法
例如:任务未被执行可能是因为代理配置错误。
10.2 调试 Celery 任务
使用 worker --loglevel=INFO 提供更多的调试信息。
10.3 性能优化建议
- 使用 Redis 作为结果后端时,可以开启压缩以节省内存。
- 适当调节工作节点数以提高并发处理能力。
13. Celery 与 Django 的集成
在 Django 项目根目录创建一个 celery.py
文件来配置 Celery 实例,通常与 settings.py
文件放在同一目录下:
myproject/celery.py
from __future__ import absolute_import, unicode_literals
import os
from celery import Celery
# 设置 Django 的默认 settings 模块
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'myproject.settings')
app = Celery('myproject')
# 从 Django 的设置中导入 Celery 配置
app.config_from_object('django.conf:settings', namespace='CELERY')
# 自动发现任务
app.autodiscover_tasks()
接着,在 myproject/__init__.py
中导入 Celery 实例,以确保 Django 加载时 Celery 也会被加载:
myproject/__init__.py
from __future__ import absolute_import, unicode_literals
# 导入 Celery app
from .celery import app as celery_app
__all__ = ('celery_app',)
# 在这段代码中,__all__ = ('celery_app',) 表示只有 celery_app 这个名称可以被其他模块导入,所有其他模块或变量即使存在也不会被导出。
13.1 配置 Celery
在 Django 的 settings.py
中进行 Celery 的配置。常见的配置包括代理(Broker)和结果后端(Result Backend):
# Redis 作为代理和结果后端
CELERY_BROKER_URL = 'redis://localhost:6379/0'
CELERY_RESULT_BACKEND = 'redis://localhost:6379/0'
# 任务序列化格式
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'
CELERY_ACCEPT_CONTENT = ['json']
# 定时任务相关配置(可选)
CELERY_BEAT_SCHEDULE = {
'add-every-30-seconds': {
'task': 'tasks.add',
'schedule': 30.0,
'args': (16, 16)
},
}
13.2 创建任务
在 Django 应用中创建一个 tasks.py
文件,并定义任务:
myapp/tasks.py
from celery import shared_task
@shared_task
def add(x, y):
return x + y
13.3 运行 Celery
启动 Celery worker:
celery -A myproject worker --loglevel=info
如果你使用 Celery Beat 进行定时任务调度,还需要启动 Beat:
celery -A myproject beat --loglevel=info