任务调度利器:Celery

celery 简介

Celery 是一个专注于实时处理和任务调度的分布式任务队列, 同时提供操作和维护分布式系统所需的工具.. 所谓任务就是消息, 消息中的有效载荷中包含要执行任务需要的全部数据.

Celery 是一个分布式队列的管理工具, 可以用 Celery 提供的接口快速实现并管理一个分布式的任务队列.

Celery 本身不是任务队列, 是管理分布式任务队列的工具. 它封装了操作常见任务队列的各种操作, 我们使用它可以快速进行任务队列的使用与管理.

Celery 特性 :

  • Celery易于使用和维护,且不需要配置文件,默认配置启动时自动写入消息代理
  • Celery高可用,连接丢失或失败时客户端或消费者会自动重试,并且可通过消息代理的双主/主从模式来提高高可用性
  • Celery快速,单个进程每分钟可处理百万任务,且优化后可保持往返延迟在亚毫秒级别
  • Celery灵活,几乎所有部分都支持扩展或单独使用,连接池,序列化,压缩模式,日志,调度器,消费者,生产者,自动扩展,中间人传输等

celery 组件

  • Celery Beat : 任务调度器. Beat 进程会读取配置文件的内容, 周期性的将配置中到期需要执行的任务发送给任务队列.
  • Celery Worker : 执行任务的消费者, 通常会在多台服务器运行多个消费者, 提高运行效率.
  • Broker : 消息代理, 队列本身. 也称为消息中间件. 接受任务生产者发送过来的任务消息, 存进队列再按序分发给任务消费方(通常是消息队列或者数据库). Celery支持的消息代理有 RabbitMQ 、Redis、SQS 等
  • Producer : 任务生产者. 调用 Celery API , 函数或者装饰器, 而产生任务并交给任务队列处理的都是任务生产者.
  • Result Backend : 任务处理完成之后保存状态信息和结果, 以供查询.存储支持多种方式:AMQP, Redis,SQLAlchemy,Elasticsearch 等

img

安装

pip install -U Celery

Celery还定义了多种可用于安装Celery以及给定功能的依赖项的捆绑包。

pip install celery[redis]
pip install celery[redis,msgpack]

更多详情请见:官方安装介绍

简单示例

# tasks.py
from celery import Celery

app = Celery('tasks', broker='amqp://admin:admin@localhost:5672')

@app.task
def add(x, y):
return x + y

启动worker

celery -A tasks worker --loglevel=info

调用任务

from tasks import add
add.delay(1,1)

Celery 库在使用之前必须初始化 ,一个celery实例被称为一个应用 ,Celery 应用是线程安全的,所以多个不同配置、不同组件、不同任务的 应用可以在一个进程空间里共存。

当你发送一个消息给 Celery,消息中不会包含任何源码,而只有你想要执行的任务的名称。这就好像因特网上的域名映射原理一般:每个执行单元维护着一个任务名称到实际任务函数的映射,这个映射被称为任务注册表。

理想的任务函数应该是具有幂等性的:这意味着即使一个任务函数以同样的参数被调用多次也不会导致不可预料的效果。因为工作单元无法探测任务是否是幂等的,所以默认的行为是在即将执行之前预先确认任务消息,这使得已经开始的任务不会再被执行。

参考:

0%