celery独立部署接入数据库配置
目录结构:
config下配置:
__init__:
import os
import sys

# Make the project root importable so absolute module paths such as
# `tasks.sample_tasks` resolve when worker/beat are started from the root.
sys.path.append(os.getcwd())

from celery import Celery

# The Celery application instance; the name is arbitrary.
app = Celery('celeryTester')
# Pull every setting from the config.celery_config module.
app.config_from_object('config.celery_config')
celery_config:
# Celery configuration module, loaded via app.config_from_object().

# Importing the handlers registers the global task signals (side-effect import).
from .celery_signals import task_failure_handler,task_postrun_handler
broker_url = 'redis://xxx'  # Task queue: beat and manual calls enqueue here; workers consume from it.
result_backend = 'redis://xxx'  # Backend where task results are stored.
broker_connection_retry_on_startup=True  # Retry the broker connection on startup (optional).
# Serialization settings — json is already the default for all three.
task_serializer='json'
accept_content=['json']
result_serializer='json'
# Timezone settings.
timezone='Asia/Shanghai'
enable_utc=True
# Modules to load tasks from. Worker and beat are normally started from the
# project root, so module paths are relative to it.
include=['tasks.sample_tasks']
# Custom scheduler: the static schedule stays empty because entries are
# fetched dynamically from the database by the scheduler class below.
beat_schedule= {}
beat_scheduler = 'my_scheduler.dynamic_scheduler.DynamicScheduler'  # Custom Scheduler class.
beat_scheduler_reload_interval = 300  # Seconds between schedule reloads; overrides the scheduler default.
celery_signals:
import datetime

from celery.signals import task_postrun, task_failure

from utils.coon import DatabaseConnection


@task_postrun.connect
def task_postrun_handler(task_id, task, args, kwargs, retval, state, **other):
    """Record successful runs in the periodic_task table.

    Only tasks that finished with state SUCCESS are recorded; for now this
    just updates the row's status/last_run_at columns.
    """
    if state == 'SUCCESS':
        # 'runtime' is read from the extra signal kwargs (assumed to be the
        # task duration in seconds — TODO confirm the signal supplies it).
        elapsed = other.get('runtime', 0)
        started_at = datetime.datetime.now() - datetime.timedelta(seconds=elapsed)
        DatabaseConnection().update(
            'periodic_task',
            {'last_run_at': started_at, 'status': True},  # SET values
            'task=%s',
            (task.name,),  # values for the WHERE condition
        )


@task_failure.connect
def task_failure_handler(task_id, exception, traceback, task, args, kwargs, einfo, **other):
    """Mark a task run as failed in the periodic_task table (failure signal path)."""
    elapsed = other.get('runtime', 0)
    started_at = datetime.datetime.now() - datetime.timedelta(seconds=elapsed)
    DatabaseConnection().update(
        'periodic_task',
        {'last_run_at': started_at, 'status': False},
        'task=%s',
        (task.name,),
    )
dynamic_scheduler:
import json
import time
from typing import Dict

from celery.beat import Scheduler, ScheduleEntry
from celery.schedules import crontab

from utils.coon import DatabaseConnection


class DynamicScheduler(Scheduler):
    """Beat scheduler that loads its entries from the periodic_task table.

    Instead of a static ``beat_schedule`` dict, the schedule is read from the
    database and periodically reloaded, so periodic tasks can be added or
    changed through configuration rows without touching code.
    """

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self._schedule = {}
        self._last_reload = time.time()
        # Reload period in seconds. BUGFIX: the beat_scheduler_reload_interval
        # config key was documented but never read; honour it first, then an
        # explicit kwarg, then the 300s default.
        self.reload_interval = (
            getattr(self.app.conf, 'beat_scheduler_reload_interval', None)
            or kwargs.get('reload_interval', 60 * 5)
        )

    def setup_schedule(self):
        """Initial load; called once when the beat process starts."""
        self._schedule = self.load_schedule()

    def load_schedule(self) -> Dict[str, ScheduleEntry]:
        """Build a {name: ScheduleEntry} mapping from enabled periodic_task rows."""
        schedule = {}
        tasks = DatabaseConnection().fetch_all(
            'select * from periodic_task where enabled=1')
        for item in tasks:
            name = item['name']
            if item['interval']:
                # A plain interval takes precedence over the crontab column.
                sche = item['interval']
            else:
                # Standard 5-field cron expression: minute hour day month weekday.
                # split() (not split(' ')) tolerates repeated whitespace.
                minute, hour, day, month, week_day = item['crontab'].split()
                sche = crontab(minute=minute, hour=hour, day_of_week=week_day,
                               day_of_month=day, month_of_year=month)
            # args/kwargs are stored as JSON text; NULL/empty stays falsy.
            item['args'] = item['args'] and json.loads(item['args'])
            item['kwargs'] = item['kwargs'] and json.loads(item['kwargs'])
            schedule[name] = ScheduleEntry(
                name=name,
                task=item['task'],
                schedule=sche,
                # BUGFIX: was item.get('args' or ()), which is just
                # item.get('args') and could hand None to ScheduleEntry;
                # fall back to empty containers instead.
                args=item['args'] or (),
                kwargs=item['kwargs'] or {},
            )
        return schedule

    def tick(self, *args, **kwargs):
        """Run one beat iteration, reloading the schedule when it is stale."""
        now = time.time()
        if now - self._last_reload > self.reload_interval:
            self._schedule = self.load_schedule()
            self._last_reload = now
            # NOTE(review): confirm celery's Scheduler exposes self.logger in
            # the deployed version; otherwise switch to a module-level logger.
            self.logger.debug('Reloaded schedule')
        # BUGFIX: forward whatever celery passed instead of dropping it (the
        # old signature also shadowed the min/max builtins with ... defaults).
        return super().tick(*args, **kwargs)

    @property
    def schedule(self) -> Dict[str, ScheduleEntry]:
        """Current in-memory schedule mapping."""
        return self._schedule
sample_tasks:
from config import app


@app.task
def add(x, y):
    """Return the sum of x and y."""
    return x + y


@app.task
def multiply(x, y):
    """Return the product of x and y."""
    return x * y


@app.task
def hello_world():
    """Demo task returning a constant greeting."""
    return "Hello World!"
utils/coon(模块名沿用代码中的 utils.coon,疑为 conn 的笔误,保持一致即可)下面是一个数据库连接类,提供 fetch_all / update 等方法,自行实现或随便找一个即可,这里不再贴出。
启动命令:
windows机器下
worker进程:
celery -A app worker --loglevel=info --pool=solo
beat进程
celery -A app beat --loglevel=info
最后解释一下原理:最简单的做法是在 celery_config 里静态配置 beat_schedule;这里改为使用自定义 Scheduler 类,从数据库中读取任务配置,相当于动态生成 beat_schedule。好处是可以直接通过修改或新增数据库配置来管理定时任务,无需改代码重新部署;同时借助任务开始/结束的信号机制,还能在任务前后做落表等操作。