Django Celery  任务增加多个队列,优先级设置,apply_async 函数详解-Python 技术分享 Java技术分享 Python 爬虫技术_微信公众号:zeropython—昊天博客

settings.py 设置 队列

# settings.py
from kombu import Exchange, Queue
CELERY_QUEUES = (
    Queue('task_html',
          Exchange('task_html', type='direct'),
          routing_key='task_html',
          consumer_arguments={'x-priority': 0}),
    Queue('task_js',
          Exchange('task_js', type='direct'),
          routing_key='task_js',
          consumer_arguments={'x-priority': 10}),
    Queue('task_ajax',
          Exchange('task_ajax', type='direct'),
          routing_key='task_ajax',
          consumer_arguments={'x-priority': 10}),
     Queue('extract_data',
          Exchange('extract_data', type='direct'),
          routing_key='extract_data',
          consumer_arguments={'x-priority': 10}),
)

根据任务制定对应的队列

# 设置 特定的队列

# .s 函数指定队列
group(parse_list_js.s(url_data) for url_data in urls_data)()

group(parse_list_js.s(url_data).set(queue='task_js') for url_data in urls_data)()

# apply_async,第一个参数一定要是元组,第二个才是队列
parse_list_single_json_task.delay([json_data, url_data])
parse_list_single_json_task.apply_async(args=([json_data, url_data],),queue='task_ajax')

# countdown : 设置该任务等待一段时间再执行,单位为s;
# eta : 定义任务的开始时间;eta=time.time()+10;
# expires : 设置任务时间,任务在过期时间后还没有执行则被丢弃;
# retry : 如果任务失败后, 是否重试;使用true或false,默认为true
# shadow:重新指定任务的名字str,覆盖其在日志中使用的任务名称;
# retry_policy : 重试策略.
#   max_retries : 最大重试次数, 默认为 3 次.
#   interval_start : 重试等待的时间间隔秒数, 默认为 0 , 表示直接重试不等待.
#   interval_step : 每次重试让重试间隔增加的秒数, 可以是数字或浮点数, 默认为 0.2
#   interval_max : 重试间隔最大的秒数, 即 通过 interval_step 增大到多少秒之后, 就不在增加了, 可以是数字或者浮点数, 默认为 0.2 .
# routing_key:自定义路由键;
# queue:指定发送到哪个队列;
# exchange:指定发送到哪个交换机;
# priority:任务队列的优先级,0-9之间;
# serializer:任务序列化方法;通常不设置;
# compression:压缩方案,通常有zlib, bzip2
# headers:为任务添加额外的消息;
# link:任务成功执行后的回调方法;是一个signature对象;可以用作关联任务

task.apply_async((2,2),
    compression='zlib',
    serialize='json',
    queue='priority.high',
    routing_key='web.add',
    priority=0,
    exchange='web_exchange')

启动 ** worker ** 消费者

# 单独启动队列 worker
celery worker -A taskproj -E  -l info -c 5 -Q task_html -n worker_task_html@%h &
celery worker -A taskproj -E  -l info -c 5 -Q task_js -n worker_task_js@%h
celery worker -A taskproj -E  -l info -c 5 -Q task_ajax -n worker_task_ajax@%h
celery worker -A taskproj -E  -l info -c 5 -Q extract_data -n worker_extract_data@%h
celery worker -A taskproj -E -l info -c 5 -Q celery -n worker_celery@%h

# 启动一个 worker 消费 5个队列
celery worker -A taskproj -E -l info -c 5 -Q celery,task_html,task_js,task_ajax,extract_data -n worker_celery@%h

启动 flower

celery flower -A taskproj 
HTTPX 基础教程-新乡seo|网站优化,网站建设_微信公众号:zeropython—昊天博客