目录
celery启动时调用的主程序
tasks.py
#! /usr/bin/env python
# -*- coding: utf-8 -*-
from celery import Celery
from config.config import CeleryConfig
from share.log_config import setup_log
# 创建 Celery 应用
setup_log()
aaa_app = Celery('attack_app',
include=[
'nanny',
]
)
aaa_app.config_from_object(CeleryConfig)
if __name__ == '__main__':
aaa_app.worker_main()
设置celery 参数 与 redis 连接
config/config.py
#! /usr/bin/env python
# -*- coding: utf-8 -*-
import os
import redis
from share.app_settings import AppSetting
class RedisConfig():
flag = AppSetting().get_config_value("log", "debug")
DEBUG = True if flag != "False" else False
ENV_REDIS_HOST = os.getenv('REDIS_URL')
#host = ENV_REDIS_HOST if ENV_REDIS_HOST else ('192.168.0.156' if DEBUG else '192.168.154.247')
host = ENV_REDIS_HOST if ENV_REDIS_HOST else ('127.0.0.1' if DEBUG else '127.0.0.1')
port = 6379
password = '123456' if DEBUG else '123456'
cache_db = 2
borker = 0
backend = 1
rc = redis.StrictRedis(
host=host,
port=port,
db=cache_db,
password=password
)
class CeleryConfig(object):
BROKER_URL = f"redis://:{RedisConfig.password}@{RedisConfig.host}:{RedisConfig.port}/{RedisConfig.borker}" # borker
CELERY_RESULT_BACKEND = f"redis://:{RedisConfig.password}@{RedisConfig.host}:{RedisConfig.port}/{RedisConfig.backend}" # backend
CELERY_TASK_SERIALIZER = 'json' # " json从4.0版本开始默认json,早期默认为pickle(可以传二进制对象)
CELERY_RESULT_SERIALIZER = 'json'
CELERY_ACCEPT_CONTENT = ['json']
CELERY_ENABLE_UTC = True # 启用UTC时区设置
CELERY_TIMEZONE = 'Asia/Shanghai' # 上海时区
CELERYD_MAX_TASKS_PER_CHILD = 1 # 每个进程最多执行1个任务后释放进程(再有任务,新建进程执行,解决内存泄漏)
WORKER_HIJACK_ROOT_LOGGER = False
[log]
debug = False
path = logs/celery.tasks.log
主要设置密码,举例
config/redis.conf
requirepass 123456
在入口文件中 include 中
用于异步动态调用模块使用
这里注意一处注释的代码
@aaa_app.task(base=CallbackTask, ignore_result=True)
ignore_result参数:如果使用该参数,则当调用 load_and_run_plugin.delay(module_path, data, task_meta) 函数时,返回值将无法获取
res = load_and_run_plugin.delay(module_path, data, task_meta)
result = res.get()
头文件中的?sys.path.append("./") 必不可缺,否则动态调用模块无法获取正确的路径
nanay.py
#! /usr/bin/env python
# -*- coding: utf-8 -*-
import os
import sys
import json
import logging
import traceback
import importlib
from celery import Task
from tasks import aaa_app
sys.path.append("./")
logger = logging.getLogger("log")
class CallbackTask(Task):
"""
exc : 失败时的错误的类型
task_id : 任务的id;
args : 任务函数的参数
kwargs : 键值对参数
einfo : 失败或重试时的异常详细信息
retval : 任务成功执行的返回值
"""
def on_success(self, retval, task_id, args, kwargs):
pass
"""
aaa_app.send_task(
"task_manager.load_worker_task_result",
args=(
{"retval": retval, "task_id": task_id, "args": args, "kwargs": kwargs}, "success"
),
queue="queue_task_manager"
)
"""
def on_failure(self, exc, task_id, args, kwargs, einfo):
pass
"""
attack_app.send_task(
"task_manager.load_worker_task_result",
args=(
{"exc": exc, "task_id": task_id, "args": args, "kwargs": kwargs, "einfo": einfo}, "failure"
),
queue="queue_task_manager"
)
"""
#@aaa_app.task(base=CallbackTask, ignore_result=True)
@aaa_app.task(base=CallbackTask)
def load_and_run_plugin(module, data, task_meta, func="run"):
model_obj = importlib.import_module(module)
print(f'开始获取{module}插件的{func}方法')
_func = getattr(model_obj, func, None)
if _func:
try:
print(f'开始运行{module}.{func}方法,入参:{data}')
print(f"task_meta:{task_meta}")
module_result = dict()
module_result["result"] = _func(data, task_meta)
except Exception as err:
errortrace = traceback.format_exc()
module_result["status"] = False
module_result["errinfo"] = f"{module}.{func}出错,错误信息:{errortrace}"
print(f"{module}.{func}出错,错误信息:{errortrace}")
logger.error(f"{module}.{func}出错,错误信息:{errortrace}")
else:
module_result["status"] = False
module_result["errinfo"] = f"{module}不存在方法{func},请检查插件"
logger.error(f"{module}不存在方法{func},请检查插件")
return module_result