Django系列之Celery异步框架+RabbitMQ使用

发布时间:2023年12月18日

在Django项目中,如何集成使用Celery框架来完成一些异步任务以及定时任务呢?

1. 安装

pip install celery		# celery框架
pip install django-celery-beat		# celery定时任务使用
pip install django-celery-results		# celery存储结果使用

2. Django集成celery

settings.py 配置文件中增加如下配置项:

INSTALLED_APPS = [
	...
    'celery',
    'django_celery_beat',
    'django_celery_results'
]


"""以下是celery的相关配置"""
# 时区配置
CELERY_TIMEZONE = 'Asia/Shanghai'
CELERY_ENABLE_UTC = False
DJANGO_CELERY_BEAT_TZ_AWARE = False

# broker backend 配置:使用rabbitmq作为中间件
CELERY_BROKER_URL = "amqp://devops:devops123@127.0.0.1:5672/alarm"
CELERY_RESULT_BACKEND = "amqp://devops:devops123@127.0.0.1:5672/alarm"

# 使用django_celery_beat动态配置任务
CELERY_BEAT_SCHEDULER = 'django_celery_beat.schedulers:DatabaseScheduler'

# celery序列化和反序列化配置
CELERY_TASK_SERIALIZER = 'pickle'
CELERY_RESULT_SERIALIZER = 'pickle'
CELERY_ACCEPT_CONTENT = ['pickle', 'json']

# 下面配置项在有些情况下可以防止死锁  非常重要!
CELERYD_FORCE_EXECV = True
# 任务结果存储的过期时间,默认1天过期。如果beat开启,Celery每天会自动清除,设为0,存储结果永不过期。
CELERY_RESULT_EXPIRES = 60 * 60 * 24
# 每个worker执行1000次任务后死掉,会自动重启worker,防止任务占用太多内存导致内存泄漏
CELERY_MAX_TASKS_PER_CHILD = 1000
# 禁用所有速度限制,如果网络资源有限,不建议开足马力。
CELERY_DISABLE_RATE_LIMITS = True
# 单个任务的运行时间限制,否则会被杀死
CELERYD_TASK_TIME_LIMIT = 60 * 60
CELERY_TASK_RETRY = 2

# celery 队列配置
from kombu import Exchange, Queue
# consumer_arguments设置队列的优先级
CELERY_TASK_QUEUES = (
    Queue('alarm_queue', Exchange('alarm_exchange'), routing_key='alarm_email', consumer_arguments={'x-priority': 5}),
    Queue('alarm_queue', Exchange('alarm_exchange'), routing_key='alarm_phone', consumer_arguments={'x-priority': 8}),
    Queue('calcu_queue', Exchange('calcu_exchange'), routing_key='calcu_feature', consumer_arguments={'x-priority': 10})
)

CELERY_TASK_ROUTES = {
    'alarm.tasks.call_phone': {'queue': 'alarm_queue', 'routing_key': 'alarm_phone'},
    'alarm.tasks.send_email': {'queue': 'alarm_queue', 'routing_key': 'alarm_email'},
    'calcu.tasks.execute_calcu': {'queue': 'calcu_queue', 'routing_key': 'calcu_feature'},
}

settings.py 同级目录下,新增一个 celery.py 文件:

from __future__ import absolute_import, unicode_literals    # absolute_import: 使用python的库而不是项目目录下的文件
import os
from celery import Celery
from django.conf import settings

os.environ.setdefault("DJANGO_SETTINGS_MODULE", "celerymq.settings")
celery_app = Celery("celerymq")

celery_app.config_from_object("django.conf:settings", namespace="CELERY")
celery_app.autodiscover_tasks(settings.INSTALLED_APPS)

在App中增加一个 tasks.py 文件,用于实现异步任务:

import time
from celery import shared_task

@shared_task(ignore_result=True)
def execute_calcu(dataframe):
    print(f'execute celery task: calcu feature')
    time.sleep(10)		# 这里写比较耗时的逻辑
    print(f'execute calcu feature task run over')

在其他文件逻辑中进行异步调用:

execute_calcu.delay(dataframe)

项目启动后,如果有异步任务进来,可以在 RabbitMQ 监控平台看到队列信息: http://127.0.0.1:15672/
在这里插入图片描述

3. 启动命令

启动 worker 去消费数据。

# 启动worker
celery worker -A celerymq -l INFO -n alarm_queue -Q alarm_queue -P eventlet

# 启动beat
celery beat -A celerymq -l INFO --scheduler django_celery_beat.schedulers:DatabaseScheduler

在linux中启动worker的话,可以去掉 -P eventlet 参数。

另外,定时任务推荐使用 django-admin 来下发。

文章来源:https://blog.csdn.net/CSDN1csdn1/article/details/134938311
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。