在Django项目中,如何集成使用Celery框架来完成一些异步任务以及定时任务呢?
pip install celery # celery框架
pip install django-celery-beat # celery定时任务使用
pip install django-celery-results # celery存储结果使用
在 settings.py
配置文件中增加如下配置项:
INSTALLED_APPS = [
...
'celery',
'django_celery_beat',
'django_celery_results'
]
"""以下是celery的相关配置"""
# 时区配置
CELERY_TIMEZONE = 'Asia/Shanghai'
CELERY_ENABLE_UTC = False
DJANGO_CELERY_BEAT_TZ_AWARE = False
# broker backend 配置:使用rabbitmq作为中间件
CELERY_BROKER_URL = "amqp://devops:devops123@127.0.0.1:5672/alarm"
CELERY_RESULT_BACKEND = "amqp://devops:devops123@127.0.0.1:5672/alarm"
# 使用django_celery_beat动态配置任务
CELERY_BEAT_SCHEDULER = 'django_celery_beat.schedulers:DatabaseScheduler'
# celery序列化和反序列化配置
CELERY_TASK_SERIALIZER = 'pickle'
CELERY_RESULT_SERIALIZER = 'pickle'
CELERY_ACCEPT_CONTENT = ['pickle', 'json']
# 下面配置项在有些情况下可以防止死锁 非常重要!
CELERYD_FORCE_EXECV = True
# 任务结果存储的过期时间,默认1天过期。如果beat开启,Celery每天会自动清除,设为0,存储结果永不过期。
CELERY_RESULT_EXPIRES = 60 * 60 * 24
# 每个worker执行1000次任务后死掉,会自动重启worker,防止任务占用太多内存导致内存泄漏
CELERY_MAX_TASKS_PER_CHILD = 1000
# 禁用所有速度限制,如果网络资源有限,不建议开足马力。
CELERY_DISABLE_RATE_LIMITS = True
# 单个任务的运行时间限制,否则会被杀死
CELERYD_TASK_TIME_LIMIT = 60 * 60
CELERY_TASK_RETRY = 2
# celery 队列配置
from kombu import Exchange, Queue
# consumer_arguments设置队列的优先级
CELERY_TASK_QUEUES = (
Queue('alarm_queue', Exchange('alarm_exchange'), routing_key='alarm_email', consumer_arguments={'x-priority': 5}),
Queue('alarm_queue', Exchange('alarm_exchange'), routing_key='alarm_phone', consumer_arguments={'x-priority': 8}),
Queue('calcu_queue', Exchange('calcu_exchange'), routing_key='calcu_feature', consumer_arguments={'x-priority': 10})
)
CELERY_TASK_ROUTES = {
'alarm.tasks.call_phone': {'queue': 'alarm_queue', 'routing_key': 'alarm_phone'},
'alarm.tasks.send_email': {'queue': 'alarm_queue', 'routing_key': 'alarm_email'},
'calcu.tasks.execute_calcu': {'queue': 'calcu_queue', 'routing_key': 'calcu_feature'},
}
在 settings.py
同级目录下,新增一个 celery.py
文件:
from __future__ import absolute_import, unicode_literals # absolute_import: 使用python的库而不是项目目录下的文件
import os
from celery import Celery
from django.conf import settings
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "celerymq.settings")
celery_app = Celery("celerymq")
celery_app.config_from_object("django.conf:settings", namespace="CELERY")
celery_app.autodiscover_tasks(settings.INSTALLED_APPS)
在App中增加一个 tasks.py
文件,用于实现异步任务:
import time
from celery import shared_task
@shared_task(ignore_result=True)
def execute_calcu(dataframe):
print(f'execute celery task: calcu feature')
time.sleep(10) # 这里写比较耗时的逻辑
print(f'execute calcu feature task run over')
在其他文件逻辑中进行异步调用:
execute_calcu.delay(dataframe)
项目启动后,如果有异步任务进来,可以在 RabbitMQ
监控平台看到队列信息: http://127.0.0.1:15672/
。
启动 worker
去消费数据。
# 启动worker
celery worker -A celerymq -l INFO -n alarm_queue -Q alarm_queue -P eventlet
# 启动beat
celery beat -A celerymq -l INFO --scheduler django_celery_beat.schedulers:DatabaseScheduler
在linux中启动worker的话,可以去掉 -P eventlet
参数。
另外,定时任务推荐使用 django-admin
来下发。