Django Celery 任务增加多个队列,优先级设置,apply_async 函数详解
发布时间:2019-08-27T16:18:49:手机请访问
文章目录

在settings.py
设置 队列
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 |
# settings.py from kombu import Exchange, Queue CELERY_QUEUES = ( Queue('task_html', Exchange('task_html', type='direct'), routing_key='task_html', consumer_arguments={'x-priority': 0}), Queue('task_js', Exchange('task_js', type='direct'), routing_key='task_js', consumer_arguments={'x-priority': 10}), Queue('task_ajax', Exchange('task_ajax', type='direct'), routing_key='task_ajax', consumer_arguments={'x-priority': 10}), Queue('extract_data', Exchange('extract_data', type='direct'), routing_key='extract_data', consumer_arguments={'x-priority': 10}), ) |
根据任务制定对应的队列
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 |
# 设置 特定的队列 # .s 函数指定队列 group(parse_list_js.s(url_data) for url_data in urls_data)() group(parse_list_js.s(url_data).set(queue='task_js') for url_data in urls_data)() # apply_async,第一个参数一定要是元组,第二个才是队列 parse_list_single_json_task.delay([json_data, url_data]) parse_list_single_json_task.apply_async(args=([json_data, url_data],),queue='task_ajax') # countdown : 设置该任务等待一段时间再执行,单位为s; # eta : 定义任务的开始时间;eta=time.time()+10; # expires : 设置任务时间,任务在过期时间后还没有执行则被丢弃; # retry : 如果任务失败后, 是否重试;使用true或false,默认为true # shadow:重新指定任务的名字str,覆盖其在日志中使用的任务名称; # retry_policy : 重试策略. # max_retries : 最大重试次数, 默认为 3 次. # interval_start : 重试等待的时间间隔秒数, 默认为 0 , 表示直接重试不等待. # interval_step : 每次重试让重试间隔增加的秒数, 可以是数字或浮点数, 默认为 0.2 # interval_max : 重试间隔最大的秒数, 即 通过 interval_step 增大到多少秒之后, 就不在增加了, 可以是数字或者浮点数, 默认为 0.2 . # routing_key:自定义路由键; # queue:指定发送到哪个队列; # exchange:指定发送到哪个交换机; # priority:任务队列的优先级,0-9之间; # serializer:任务序列化方法;通常不设置; # compression:压缩方案,通常有zlib, bzip2 # headers:为任务添加额外的消息; # link:任务成功执行后的回调方法;是一个signature对象;可以用作关联任务 task.apply_async((2,2), compression='zlib', serialize='json', queue='priority.high', routing_key='web.add', priority=0, exchange='web_exchange') |
启动 ** worker ** 消费者
1 2 3 4 5 6 7 8 9 10 |
# 单独启动队列 worker celery worker -A taskproj -E -l info -c 5 -Q task_html -n worker_task_html@%h & celery worker -A taskproj -E -l info -c 5 -Q task_js -n worker_task_js@%h celery worker -A taskproj -E -l info -c 5 -Q task_ajax -n worker_task_ajax@%h celery worker -A taskproj -E -l info -c 5 -Q extract_data -n worker_extract_data@%h celery worker -A taskproj -E -l info -c 5 -Q celery -n worker_celery@%h # 启动一个 worker 消费 5个队列 celery worker -A taskproj -E -l info -c 5 -Q celery,task_html,task_js,task_ajax,extract_data -n worker_celery@%h |
启动 flower
1 2 |
celery flower -A taskproj |
