Celery Notes
Getting Started
Celery is flexible. Almost every part of Celery can be extended or used on its own: custom pool implementations, serializers, compression schemes, logging, schedulers, consumers, producers, broker transports, and much more.
What `current_app` points to in Celery
```python
# main.py
from celery import current_app
from celery import Celery

cel = Celery("lll")
cel2 = Celery("bbb")
cel2 = Celery("ddd")
cel2 = Celery("fff")
print(current_app.main)
cel2 = Celery("ddd")
print(current_app.main)
```

```
# result of running main.py
fff
ddd
```
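As the output shows, `current_app` always proxies the most recently instantiated app. The mechanism can be mimicked with a minimal pure-Python sketch (the `App` class and `current_app` function here are hypothetical stand-ins, not Celery's real internals): constructing an app registers it as "current" in a module-level slot.

```python
_current_app = None  # module-level slot, analogous to celery._state


class App:
    """Toy stand-in for celery.Celery; registers itself on construction."""
    def __init__(self, main):
        global _current_app
        self.main = main
        _current_app = self  # the newest app becomes "current"


def current_app():
    """Return whichever app was constructed last."""
    return _current_app


App("lll")
App("fff")
print(current_app().main)  # fff
App("ddd")
print(current_app().main)  # ddd
```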
Running the Celery worker server
demo1.py
```python
from celery import Celery
from celery.utils.log import get_task_logger

logger = get_task_logger(__name__)

app = Celery('hello',
             broker='amqp://guest:guest@127.0.0.1:5672//',
             backend='redis://127.0.0.1:6379/0')


@app.task
def hello():
    logger.info("task hello")
    return 'hello'
```
Command to run demo1.py:
```shell
celery -A main worker
```
By default this uses the celery queue, with a direct-type exchange named celery and binding_key celery.
Calling the task
```python
from main import hello

a = hello.delay()
print(a.id)
result = a.get()
print(result)
```
delay() sends the task; a.id is its task ID; a.get() blocks until the asynchronous result is available.
The message is sent to RabbitMQ; its content is:
If Redis is used as the broker, the message content is:
```json
{
  "body": "W1sxLCAyXSwge30sIHsiY2FsbGJhY2tzIjogbnVsbCwgImVycmJhY2tzIjogbnVsbCwgImNoYWluIjogbnVsbCwgImNob3JkIjogbnVsbH1d",
  "content-encoding": "utf-8",
  "content-type": "application/json",
  "headers": {
    "lang": "py",
    "task": "main.add",
    "id": "95eb2c01-437e-40e6-9b15-e9b6c9218e99",
    "shadow": null,
    "eta": null,
    "expires": null,
    "group": null,
    "group_index": null,
    "retries": 0,
    "timelimit": [null, null],
    "root_id": "95eb2c01-437e-40e6-9b15-e9b6c9218e99",
    "parent_id": null,
    "argsrepr": "[1, 2]",
    "kwargsrepr": "{}",
    "origin": "gen226466@iZ8vb4rhbik3h9nvaj42m2Z"
  },
  "properties": {
    "correlation_id": "95eb2c01-437e-40e6-9b15-e9b6c9218e99",
    "reply_to": "e227c785-cc00-35d9-b52a-5062bfcf22ce",
    "delivery_mode": 2,
    "delivery_info": {"exchange": "", "routing_key": "celery"},
    "priority": 0,
    "body_encoding": "base64",
    "delivery_tag": "9a140915-c69d-40b7-a895-7978a8cc077a"
  }
}
```
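The body field is the base64-encoded JSON payload of the task call; decoding it with the standard library recovers the positional args, kwargs, and the embedded canvas options:

```python
import base64
import json

# the "body" value from the message above
body = ("W1sxLCAyXSwge30sIHsiY2FsbGJhY2tzIjogbnVsbCwgImVycmJhY2tzIjogbnVsbCwg"
        "ImNoYWluIjogbnVsbCwgImNob3JkIjogbnVsbH1d")

# a json task body is a 3-tuple: (args, kwargs, embed)
args, kwargs, embed = json.loads(base64.b64decode(body))
print(args)    # [1, 2]
print(kwargs)  # {}
print(embed)   # {'callbacks': None, 'errbacks': None, 'chain': None, 'chord': None}
```

This matches the argsrepr/kwargsrepr fields in the headers, which are only human-readable hints; the body is what the worker actually deserializes.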
get() retrieves the task's result; with Redis as the backend, the stored result looks like:
Configuration
There are several ways to configure Celery; these notes use app.conf.update(**kwargs). See celery configuration in the appendix for the format.
Configuration keys differ between versions, e.g. CELERY_ACCEPT_CONTENT –> accept_content; see Configuration and defaults.
Basic configuration
```python
# task serializer (json, pickle, yaml, msgpack)
task_serializer = 'json'            # serializer name, default 'json'
accept_content = ['json']           # list of accepted serializer names
result_accept_content = ['json']
# use UTC
enable_utc = True
# pytz timezone name
timezone = "UTC"
# consumer rate limit
task_annotations = {'tasks.add': {'rate_limit': '10/s'}}
task_annotations = {'*': {'rate_limit': '10/s'}}
# If True, all tasks are executed locally by blocking until the task returns
task_always_eager = False
task_ignore_result = False
task_store_errors_even_if_ignored = False
# task_acks_late = False
# default 1 day, in seconds
result_expires = 86400
result_persistent = False
```
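A rate_limit value like '10/s' caps how many tasks of that type each worker executes per second. The idea can be illustrated with a simple token bucket (a hypothetical sketch, not Celery's actual limiter implementation):

```python
import time


class TokenBucket:
    """Allow at most `rate` operations per second (illustrative only)."""
    def __init__(self, rate):
        self.rate = rate
        self.tokens = rate            # start with a full bucket
        self.last = time.monotonic()

    def acquire(self):
        now = time.monotonic()
        # refill proportionally to elapsed time, capped at the bucket size
        self.tokens = min(self.rate, self.tokens + (now - self.last) * self.rate)
        self.last = now
        if self.tokens >= 1:
            self.tokens -= 1
            return True
        return False  # caller should wait and retry later


bucket = TokenBucket(rate=10)  # analogous to '10/s'
passed = sum(bucket.acquire() for _ in range(20))
print(passed)  # roughly 10 of 20 immediate calls pass
```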
Task failure handling
```python
def my_on_failure(self, exc, task_id, args, kwargs, einfo):
    print('Oh no! Task failed: {0!r}'.format(exc))

task_annotations = {'*': {'on_failure': my_on_failure}}
```
Message routing
Celery tasks can be invoked via delay(*args, **kwargs) and apply_async(args, kwargs, **options); apply_async supports more options.
Dispatching tasks
Dispatch by routing_key
```python
from main import hello, add
from kombu import Exchange

a = hello.apply_async((), exchange=Exchange('celery'), routing_key='celery')
print(a.id)
```
Dispatch by queue; if the queue does not exist, the exchange and queue are created and bound with the routing_key
```python
from main import hello, add

a = hello.apply_async((), queue='celery')
print(a.id)
```
The task_routes setting also controls dispatch: when delay() and apply_async() do not specify routing options, tasks are routed to queues according to task_routes.
```python
app.conf.update(task_routes=[
    {'main.hello': 'hello'},
    {'*': 'celery'}
])
```
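The first-match-wins resolution with a '*' glob fallback can be sketched as follows (the resolve_queue helper is hypothetical, not Celery's internal router):

```python
from fnmatch import fnmatch

task_routes = [
    {'main.hello': 'hello'},
    {'*': 'celery'},   # fallback for everything else
]


def resolve_queue(task_name, routes):
    """Return the queue for a task name; the first matching pattern wins."""
    for table in routes:
        for pattern, queue in table.items():
            if fnmatch(task_name, pattern):
                return queue
    return None


print(resolve_queue('main.hello', task_routes))  # hello
print(resolve_queue('main.add', task_routes))    # celery
```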
Celery periodic tasks: celery -A main beat
```python
from celery import Celery
from kombu import Queue, Exchange
from datetime import timedelta
from celery.utils.log import get_task_logger

logger = get_task_logger(__name__)

app = Celery('hello',
             broker='amqp://guest:guest@127.0.0.1:5672//',
             backend='redis://127.0.0.1:6379/0')
app.conf.update(task_routes=[
    {'main.hello': 'hello'},
    {'*': 'celery'}
])
app.conf.update(beat_schedule={
    'bbb': {
        'task': 'main.add',
        'schedule': timedelta(seconds=1),
        'args': [1, 2]
    }
})


@app.task(name='main.hello')
def hello():
    logger.info("task hello")
    return 'hello'


@app.task(name='main.add')
def add(x, y: int):
    return x + y
```
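What beat does each tick can be sketched roughly like this: for every schedule entry, check whether the interval has elapsed since the entry last ran (the is_due helper below is a simplified illustration, not Celery's actual scheduler code):

```python
from datetime import datetime, timedelta

beat_schedule = {
    'bbb': {
        'task': 'main.add',
        'schedule': timedelta(seconds=1),
        'args': [1, 2],
    },
}


def is_due(last_run, interval, now):
    """An entry is due once `interval` has elapsed since its last run."""
    return now - last_run >= interval


last = datetime(2024, 1, 1, 0, 0, 0)
entry = beat_schedule['bbb']
print(is_due(last, entry['schedule'], last + timedelta(seconds=2)))        # True
print(is_due(last, entry['schedule'], last + timedelta(milliseconds=500))) # False
```

When an entry is due, beat publishes the named task to the broker just like an ordinary apply_async call; the worker consuming that queue then executes it.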
Consuming tasks
The task_queues setting defines the queues a worker consumes from at startup; if no Exchange is specified, the exchange and routing key both default to celery.
```python
# demo2.py
from celery import Celery
from kombu import Queue, Exchange
from celery.utils.log import get_task_logger

logger = get_task_logger(__name__)

app = Celery('hello',
             broker='amqp://guest:guest@127.0.0.1:5672//',
             backend='redis://127.0.0.1:6379/0')
app.conf.update(task_routes=[
    {'main.hello': 'hello'},
    {'*': 'celery'}
])
app.conf.update(task_queues=[
    Queue('normal', Exchange('normal'), routing_key='normal'),
    Queue('exception', Exchange('exception'), routing_key='exception')
])


@app.task(name='main.hello')
def hello():
    logger.info("task hello")
    return 'hello'


@app.task
def add(x, y: int):
    return x + y
```
In celery -A main worker -Q normal, the -Q flag specifies which queues the worker consumes from.
A worker instance can consume from any number of queues. By default it consumes from all queues defined in the task_queues setting (which, if not specified, falls back to the default queue named celery).
Logging in Celery with the logging module
```python
from celery.utils.log import get_task_logger
```
Celery advanced topics
How Celery periodic tasks are implemented
Rate-limiting Celery task consumption
Implementing Celery workflows
Extensions and Bootsteps
https://docs.celeryproject.org/en/stable/userguide/extending.html
Celery source code analysis
Appendix
celery configuration
```python
app.conf.update(
    task_serializer='json',
    task_annotations={'consumer.hello': {'rate_limit': '100/s'}},
    timezone='UTC',
    result_persistent=False,
    task_default_queue='bbb',
    task_default_routing_key='default',
    # a mapping that describes the relation between task names and queues
    task_routes=[
        {'consumer.hello': {'queue': 'hello_celec'}},
    ],
    task_queues={
        Queue('hello'),
        Queue('hello_celec', exchange=Exchange('hello_celec'),
              routing_key='hello_celec')
    },
    beat_schedule={
        'bbb': {
            'task': 'consumer.hello',
            'schedule': timedelta(seconds=1),
            'args': [1, 2]
        }
    }
)
```
Reference:
- https://www.jianshu.com/p/db4fead7a56c
- https://www.jianshu.com/p/581fd8f92509
- Celery source code analysis: https://liqiang.io/post/kombu-source-code-analysis-part-2?lang=ZH_CN
Problem:
Why is Celery highly available?
The worker and beat have retry mechanisms for when the connection is dropped or lost, and the broker itself can be made highly available via primary/replica replication or a dual-master setup.
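For example, the connection-retry behavior is tunable through broker settings. A minimal sketch (setting names are from the Celery configuration reference; broker_connection_retry_on_startup only exists in Celery 5.3+):

```python
from celery import Celery

app = Celery('hello', broker='amqp://guest:guest@127.0.0.1:5672//')

app.conf.update(
    broker_connection_retry=True,             # retry if the connection drops
    broker_connection_retry_on_startup=True,  # also retry the initial connection (5.3+)
    broker_connection_max_retries=100,        # give up after this many attempts
)
```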