Obelieve's Blog  >  All Categories  >  Notes  >  Celery 分布式任务队列

Celery 分布式任务队列

经纪人(broker)🔗

需要发送消息/接收消息,通过第三方消息代理软件,来实现。

  • RabbitMQ
  • Redis
#RabbitMQ安装
#Linux系统
sudo apt-get install rabbitmq-server
#Docker
docker run -d -p 5672:5672 rabbitmq
#Redis安装
docker run -d -p 6379:6379 redis

安装🔗

pip install celery

简单例子🔗

#task.py
from celery import Celery

#broker 代理消息的软件 、backend 保存任务结果的后端
app = Celery('tasks', broker='pyamqp://guest@localhost// ',backend='rpc://')  
#rabbitMQ  pyamqp://guest@localhost// 
#redis redis://localhost:6379/0
@app.task  
def add(x, y):  
    return x + y

运行工作服务器🔗

celery -A task worker --loglevel=INFO 

调用任务🔗

  • 使用delay调用任务
from tasks import add
add.delay(4, 4)

保存任务🔗

  • Celery 内置了几个结果后端可供选择: SQLAlchemy / Django ORM, MongoDB 、 Memcached 、 Redis 、 RPC ( RabbitMQ /AMQP)以及 – 或者您可以定义自己的。
#RPC 结果后端 
#backend='rpc://'

from tasks import add
result = add.delay(4,4)
result.ready() #判断结果是否完成
result.get(timeout=1) #获取结果并设置超时1s
result.get() #获取结果 
# result.get(propagate=False) 任务异常时get()也会异常,propagate覆盖操作
  • error
AttributeError: 'DisabledBackend' object has no attribute '_get_task_meta_for'. Did you mean: 'get_task_meta_for'?
# 解决 重启调用任务的Python

配置🔗

# 单个配置
app.conf.task_serializer = 'json'
# update 多个配置
app.conf.update(
    task_serializer='json',
    accept_content=['json'],  # Ignore other content
    result_serializer='json',
    timezone='Europe/Oslo',
    enable_utc=True,
)
# 配置模块
app.config_from_object('celeryconfig')
celeryconfig.py
###
broker_url = 'pyamqp://'
result_backend = 'rpc://'

task_serializer = 'json'
result_serializer = 'json'
accept_content = ['json']
timezone = 'Europe/Oslo'
enable_utc = True
###
python -m celeryconfig # 验证配置模块是否正确
分类: Notes 
标签Python
发布于: