Celery 分布式任务队列
经纪人(broker)🔗
需要发送消息/接收消息,通过第三方消息代理软件,来实现。
- RabbitMQ
- Redis
#RabbitMQ安装
#Linux系统
sudo apt-get install rabbitmq-server
#Docker
docker run -d -p 5672:5672 rabbitmq
#Redis安装
docker run -d -p 6379:6379 redis
安装🔗
pip install celery
简单例子🔗
#task.py
from celery import Celery
#broker 代理消息的软件 、backend 保存任务结果的后端
app = Celery('tasks', broker='pyamqp://guest@localhost// ',backend='rpc://')
#rabbitMQ pyamqp://guest@localhost//
#redis redis://localhost:6379/0
@app.task
def add(x, y):
return x + y
运行工作服务器🔗
celery -A task worker --loglevel=INFO
调用任务🔗
- 使用delay调用任务
from tasks import add
add.delay(4, 4)
保存任务🔗
- Celery 内置了几个结果后端可供选择: SQLAlchemy / Django ORM, MongoDB 、 Memcached 、 Redis 、 RPC ( RabbitMQ /AMQP)以及 – 或者您可以定义自己的。
#RPC 结果后端
#backend='rpc://'
from tasks import add
result = add.delay(4,4)
result.ready() #判断结果是否完成
result.get(timeout=1) #获取结果并设置超时1s
result.get() #获取结果
# result.get(propagate=False) 任务异常时get()也会异常,propagate覆盖操作
- error
AttributeError: 'DisabledBackend' object has no attribute '_get_task_meta_for'. Did you mean: 'get_task_meta_for'?
# 解决 重启调用任务的Python
配置🔗
# 单个配置
app.conf.task_serializer = 'json'
# update 多个配置
app.conf.update(
task_serializer='json',
accept_content=['json'], # Ignore other content
result_serializer='json',
timezone='Europe/Oslo',
enable_utc=True,
)
# 配置模块
app.config_from_object('celeryconfig')
celeryconfig.py
###
broker_url = 'pyamqp://'
result_backend = 'rpc://'
task_serializer = 'json'
result_serializer = 'json'
accept_content = ['json']
timezone = 'Europe/Oslo'
enable_utc = True
###
python -m celeryconfig # 验证配置模块是否正确