Celery Notes

Getting Started

Celery is flexible. Almost every part of Celery can be extended or used on its own: custom pool implementations, serializers, compression schemes, logging, schedulers, consumers, producers, broker transports, and much more.

  • What current_app points to in Celery

    # main.py
    from celery import current_app
    from celery import Celery


    cel = Celery("lll")
    cel2 = Celery("bbb")
    cel2 = Celery("ddd")
    cel2 = Celery("fff")

    # current_app is a proxy to the most recently created app
    print(current_app.main)
    cel2 = Celery("ddd")
    print(current_app.main)

    # result of running main.py
    fff
    ddd
    
  • Running the Celery worker server

    main.py:

    from celery import Celery

    from celery.utils.log import get_task_logger
    logger = get_task_logger(__name__)

    app = Celery('hello', broker='amqp://guest:guest@127.0.0.1:5672//',
                 backend='redis://127.0.0.1:6379/0')

    @app.task
    def hello():
        logger.info("task hello")
        return 'hello'
    

    Start a worker for main.py:

    celery -A main worker
    

    By default tasks go to the celery queue: a direct exchange named celery, bound with binding key celery.
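
    In kombu terms, that default is equivalent to this sketch:

    from kombu import Exchange, Queue

    # the implicit default: queue "celery" bound to the direct
    # exchange "celery" with binding key "celery"
    default_queue = Queue('celery', Exchange('celery', type='direct'), routing_key='celery')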

  • Calling the task

    from main import hello
    
    a = hello.delay()
    print(a.id)
    a = a.get()
    print(a)
    

    delay() publishes the task; a.id is the task id, and a.get() blocks until the asynchronous result is available.

    The message is sent to RabbitMQ.

    With Redis as the broker, the message content is:

    {\"body\": \"W1sxLCAyXSwge30sIHsiY2FsbGJhY2tzIjogbnVsbCwgImVycmJhY2tzIjogbnVsbCwgImNoYWluIjogbnVsbCwgImNob3JkIjogbnVsbH1d\", \"content-encoding\": \"utf-8\", \"content-type\": \"application/json\", \"headers\": {\"lang\": \"py\", \"task\": \"main.add\", \"id\": \"95eb2c01-437e-40e6-9b15-e9b6c9218e99\", \"shadow\": null, \"eta\": null, \"expires\": null, \"group\": null, \"group_index\": null, \"retries\": 0, \"timelimit\": [null, null], \"root_id\": \"95eb2c01-437e-40e6-9b15-e9b6c9218e99\", \"parent_id\": null, \"argsrepr\": \"[1, 2]\", \"kwargsrepr\": \"{}\", \"origin\": \"gen226466@iZ8vb4rhbik3h9nvaj42m2Z\"}, \"properties\": {\"correlation_id\": \"95eb2c01-437e-40e6-9b15-e9b6c9218e99\", \"reply_to\": \"e227c785-cc00-35d9-b52a-5062bfcf22ce\", \"delivery_mode\": 2, \"delivery_info\": {\"exchange\": \"\", \"routing_key\": \"celery\"}, \"priority\": 0, \"body_encoding\": \"base64\", \"delivery_tag\": \"9a140915-c69d-40b7-a895-7978a8cc077a\"}}
    

    get() fetches the task execution result; with Redis as the result backend, the result is stored as JSON under a celery-task-meta-<task_id> key.
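
    A sketch of reading that stored result back with the redis-py client (the exact date_done value will differ):

    import json
    import redis

    r = redis.Redis(host='127.0.0.1', port=6379, db=0)
    raw = r.get('celery-task-meta-95eb2c01-437e-40e6-9b15-e9b6c9218e99')
    print(json.loads(raw))
    # e.g. {'status': 'SUCCESS', 'result': 3, 'traceback': None,
    #       'children': [], 'date_done': '...', 'task_id': '95eb2c01-...'}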

  • Configuration

    Celery can be configured in several ways; these notes use app.conf.update(**kwargs). See the celery configuration appendix for a complete example.

    Configuration keys were renamed between versions, e.g. CELERY_ACCEPT_CONTENT -> accept_content; see Configuration and defaults in the Celery docs.

    • Basic configuration

      # task serializer (json, pickle, yaml, msgpack)
      task_serializer = 'json'
      # serializer names the worker will accept, default ['json']
      accept_content = ['json']

      # serializer names accepted for results
      result_accept_content = ['json']

      # store dates and times in UTC
      enable_utc = True

      # timezone name (pytz)
      timezone = "UTC"

      # per-task rate limits via annotations ('*' matches all tasks)
      task_annotations = {'tasks.add': {'rate_limit': '10/s'}}
      task_annotations = {'*': {'rate_limit': '10/s'}}

      # If this is True, all tasks will be executed locally by blocking until the task returns
      task_always_eager = False

      task_ignore_result = False

      task_store_errors_even_if_ignored = False

      # acknowledge messages after the task runs instead of just before
      # task_acks_late = False

      # default 1 day, unit in seconds
      result_expires = 86400
      result_persistent = False
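
      These keys are applied with app.conf.update, for example:

      app.conf.update(
          task_serializer='json',
          accept_content=['json'],
          result_expires=86400,
      )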
      
    • Task failure handling

      def my_on_failure(self, exc, task_id, args, kwargs, einfo):
          print('Oh no! Task failed: {0!r}'.format(exc))

      task_annotations = {'*': {'on_failure': my_on_failure}}
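
      A minimal sketch of how the hook fires (hypothetical failing task):

      @app.task
      def fail():
          # raising here makes Celery call my_on_failure with the
          # exception, task id, args, kwargs, and traceback info
          raise RuntimeError('boom')

      fail.delay()   # from client code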
      
  • Message routing

    Tasks are published with delay(*args, **kwargs) or apply_async(args, kwargs, **options); apply_async supports many more options (queue, exchange, routing_key, countdown, eta, expires, ...).

    • Publishing tasks

      Publish by exchange and routing key:

      from main import hello
      from kombu import Exchange
      
      a = hello.apply_async((), exchange=Exchange('celery'), routing_key='celery')
      print(a.id)
      

      Publish by queue; if the queue does not exist, the exchange and queue are created and bound with the routing key:

      from main import hello
      
      a = hello.apply_async((), queue='celery')
      print(a.id)
      

      The task_routes setting is consulted at publish time: when delay() or apply_async() does not specify routing options, the task is routed to a queue according to task_routes.

      app.conf.update(task_routes=[
          {'main.hello': 'hello'},
          {'*': 'celery'}
      ])
      

      Periodic tasks are produced by celery beat, started with celery -A main beat:

      from celery import Celery
      from datetime import timedelta

      from celery.utils.log import get_task_logger
      logger = get_task_logger(__name__)

      app = Celery('hello', broker='amqp://guest:guest@127.0.0.1:5672//',
                   backend='redis://127.0.0.1:6379/0')

      app.conf.update(task_routes=[
          {'main.hello': 'hello'},
          {'*': 'celery'}
      ])

      # publish main.add(1, 2) every second
      app.conf.update(beat_schedule={
          'bbb': {
              'task': 'main.add',
              'schedule': timedelta(seconds=1),
              'args': [1, 2]
          }
      })

      @app.task(name='main.hello')
      def hello():
          logger.info("task hello")
          return 'hello'

      @app.task(name='main.add')
      def add(x, y):
          return x + y
      
    • Consuming tasks

      The task_queues setting determines which queues the worker consumes from at startup; if no Exchange is given, the exchange and routing key default to celery.

      # main.py
      from celery import Celery
      from kombu import Queue, Exchange
      
      from celery.utils.log import get_task_logger
      logger = get_task_logger(__name__)
      
      app = Celery('hello', broker='amqp://guest:guest@127.0.0.1:5672//',
                   backend='redis://127.0.0.1:6379/0')
      
      app.conf.update(task_routes=[
          {'main.hello': 'hello'},
          {'*': 'celery'}
      ])
      
      app.conf.update(task_queues=[
          Queue('normal', Exchange('normal'), routing_key='normal'),
          Queue('exception', Exchange('exception'), routing_key='exception')
      ])
      
      @app.task(name='main.hello')
      def hello():
          logger.info("task hello")
          return 'hello'
      
      @app.task
      def add(x, y):
          return x + y
      

      In celery -A main worker -Q normal, the -Q option specifies the queues the worker consumes.

      A worker instance can consume from any number of queues. By default it will consume from all queues defined in the task_queues setting (that if not specified falls back to the default queue named celery).
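
      For example, to consume both queues defined above:

      celery -A main worker -Q normal,exception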

  • Logging in Celery tasks

    from celery.utils.log import get_task_logger
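
    A minimal sketch of typical usage (the format string below is the documented default of worker_task_log_format):

    from celery import Celery
    from celery.utils.log import get_task_logger

    logger = get_task_logger(__name__)

    app = Celery('hello', broker='amqp://guest:guest@127.0.0.1:5672//')

    # the worker's task formatter fills %(task_name)s and %(task_id)s
    # from the current task context for records created via get_task_logger
    app.conf.update(
        worker_task_log_format=(
            '[%(asctime)s: %(levelname)s/%(processName)s] '
            '%(task_name)s[%(task_id)s]: %(message)s'
        ),
    )

    @app.task
    def hello():
        logger.info('task hello')   # logged with the task's name and id
        return 'hello'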
    

Advanced Celery

Celery Source Code Analysis

Appendix

  • celery configuration

    from datetime import timedelta

    from kombu import Exchange, Queue

    app.conf.update(
        task_serializer='json',
        task_annotations={'consumer.hello': {'rate_limit': '100/s'}},
        timezone='UTC',
        result_persistent=False,
        task_default_queue='bbb',
        task_default_routing_key='default',
        task_routes=[
            {'consumer.hello': {'queue': 'hello_celec'}},
        ],  # maps task names to queues
        task_queues=[
            Queue('hello'),
            Queue('hello_celec', exchange=Exchange('hello_celec'), routing_key='hello_celec')
        ],
        beat_schedule={
            'bbb': {
                'task': 'consumer.hello',
                'schedule': timedelta(seconds=1),
                'args': [1, 2]
            }
        }
    )
    

Problem:

  • Why is Celery highly available?

    Workers (and beat) automatically retry when the broker connection is dropped or lost, and the broker itself can be made highly available, e.g. via primary/replica or primary/primary replication.