Flask 使用 Celery 实现基于数据库的动态定时任务
celery基本配置
# Celery configuration module, loaded with celery.config_from_object(celery_config).
#
# The signal-handler import is kept only for its side effect: importing the
# module connects the task_postrun / task_failure handlers.
from .celery_signals import task_failure_handler, task_postrun_handler  # noqa: F401

broker_connection_retry_on_startup = True
task_serializer = 'json'
accept_content = ['json']
result_serializer = 'json'
timezone = 'Asia/Shanghai'
enable_utc = True
include = ['app.tasks.test_tasks']

# Left empty on purpose: the schedule is loaded dynamically from the database.
beat_schedule = {}

# Use the custom database-backed scheduler.
beat_scheduler = 'app.scheduler.database_scheduler.FlaskDBScheduler'

# NOTE: not a built-in Celery setting. It is only a custom key; it takes effect
# only if the scheduler explicitly reads it from app.conf (seconds between
# database reloads).
beat_scheduler_reload_interval = 30
Celery 信号机制(记录任务执行日志):
"""Celery signal handlers that persist task executions as TaskExecutionLog rows."""
import datetime
import json
import time

from celery.signals import task_failure, task_postrun, task_prerun

from app.models import TaskExecutionLog, db

# task_id -> (UTC start datetime, time.monotonic() at start), recorded by
# task_prerun_handler. Celery's task_postrun / task_failure signals do not
# carry the run time, so the duration is measured here. Entries are removed
# by task_postrun_handler, which Celery sends after successful and failed runs.
_task_starts = {}


def _utcnow():
    """Return the current time as a timezone-aware UTC datetime."""
    return datetime.datetime.now(datetime.timezone.utc)


def _to_text(value):
    """Render *value* for a Text column: JSON when serializable, repr() otherwise."""
    if value is None or isinstance(value, str):
        return value
    try:
        return json.dumps(value, ensure_ascii=False)
    except (TypeError, ValueError):
        return repr(value)


def _timing(task_id, runtime=None):
    """Return ``(start_time, end_time, duration_seconds)`` for *task_id*.

    Uses the start recorded by task_prerun_handler. Without one, it falls back
    to *runtime* (or 0 seconds) measured backwards from now.
    """
    end_time = _utcnow()
    started = _task_starts.get(task_id)
    if started is not None:
        start_time, start_mono = started
        return start_time, end_time, time.monotonic() - start_mono
    duration = runtime or 0
    return end_time - datetime.timedelta(seconds=duration), end_time, duration


def _save_log(**fields):
    """Insert one TaskExecutionLog row inside the Flask application context."""
    # Imported lazily: this module is loaded through the Celery config while
    # app/__init__.py is still running create_app(), before `app` exists.
    from app import app

    with app.app_context():
        db.session.add(TaskExecutionLog(**fields))
        db.session.commit()


@task_prerun.connect
def task_prerun_handler(task_id=None, **other):
    """Record when *task_id* started so the duration can be computed later."""
    _task_starts[task_id] = (_utcnow(), time.monotonic())


@task_postrun.connect
def task_postrun_handler(task_id, task, args, kwargs, retval, state, **other):
    """Log successful executions; always discard the recorded start time."""
    start_time, end_time, duration = _timing(task_id, other.get('runtime'))
    # Drop the start record before touching the DB so it cannot leak if the insert fails.
    _task_starts.pop(task_id, None)
    if state == 'SUCCESS':
        _save_log(
            task_id=task_id,
            task_name=task.name,
            status='SUCCESS',
            result=_to_text(retval),
            args=args,
            kwargs=kwargs,
            start_time=start_time,
            end_time=end_time,
            duration=duration,
        )


@task_failure.connect
def task_failure_handler(task_id=None, exception=None, traceback=None, task=None,
                         args=None, kwargs=None, einfo=None, sender=None, **other):
    """Log a failed execution together with its traceback.

    Celery sends task_failure with the task object as ``sender`` and no
    ``task`` argument, so ``task`` falls back to ``sender``.
    """
    task = task if task is not None else sender
    start_time, end_time, duration = _timing(task_id, other.get('runtime'))
    _save_log(
        task_id=task_id,
        task_name=getattr(task, 'name', None),
        status='FAILURE',
        # The exception object itself cannot be stored in a Text column.
        result=_to_text(exception),
        traceback=str(einfo) if einfo is not None else None,
        args=args,
        kwargs=kwargs,
        start_time=start_time,
        end_time=end_time,
        duration=duration,
    )
flask基本配置:
"""Flask application factory plus the shared Celery and SQLAlchemy instances."""
from flask import Flask
from celery import Celery
from flask_sqlalchemy import SQLAlchemy

from config import Config

# Shared singletons, bound to the Flask app inside create_app().
celery = Celery()
db = SQLAlchemy()


def create_app(config_class=Config):
    """Build the Flask app: load config, wire Celery and the DB, register routes."""
    flask_app = Flask(__name__)
    flask_app.config.from_object(config_class)

    init_celery(flask_app)
    init_db(flask_app)

    # Imported here, not at module top: app.views imports from this package.
    from app.views import bp
    flask_app.register_blueprint(bp)

    return flask_app


def init_celery(app: Flask):
    """Configure the shared Celery instance and run each task inside *app*'s context."""
    from config import celery_config

    celery.config_from_object(celery_config)
    connection_settings = {
        'broker_url': app.config['CELERY_BROKER_URL'],
        'result_backend': app.config['CELERY_RESULT_BACKEND'],
    }
    celery.conf.update(**connection_settings)

    # Exposed so the custom beat scheduler can reach the Flask app.
    celery.flask_app = app

    class ContextTask(celery.Task):
        def __call__(self, *args, **kwargs):
            # Push an application context so tasks can use db / ORM queries.
            with app.app_context():
                return self.run(*args, **kwargs)

    celery.Task = ContextTask


def init_db(app: Flask):
    """Bind the shared SQLAlchemy instance to *app*."""
    db.init_app(app)


# Built at import time so `from app import app` works for other modules.
app = create_app()
重写 Scheduler 类(从数据库加载定时任务):
"""Celery beat scheduler that reads its schedule from the ScheduledTask table."""
import time
from typing import Dict

from celery.beat import Scheduler, ScheduleEntry
from celery.schedules import ParseException, crontab

from app.models import ScheduledTask, db

# Reload period in seconds, used when neither the constructor nor the Celery
# config key ``beat_scheduler_reload_interval`` provides one.
DEFAULT_RELOAD_INTERVAL = 30


class FlaskDBScheduler(Scheduler):
    """Beat scheduler whose entries are the enabled ``ScheduledTask`` rows.

    The table is re-read every ``reload_interval`` seconds. Rows added, edited
    or disabled through the API therefore take effect without restarting beat.
    """

    def __init__(self, *args, **kwargs):
        self._schedule = {}
        self._last_reload = 0.0
        self._tables_ready = False
        # `celery beat` builds the scheduler with only app/schedule_filename/
        # max_interval/lazy, so an explicit ``reload_interval`` kwarg is rarely
        # present. The config key is consulted once self.app is available.
        self.reload_interval = kwargs.get('reload_interval')
        super().__init__(*args, **kwargs)
        if self.reload_interval is None:
            self.reload_interval = self.app.conf.get(
                'beat_scheduler_reload_interval', DEFAULT_RELOAD_INTERVAL)
        # When setup_schedule() was skipped (lazy=True), still make sure the tables exist.
        if not self._tables_ready:
            self._ensure_tables()

    def _ensure_tables(self):
        """Create any missing tables inside the Flask application context."""
        with self.app.flask_app.app_context():
            db.create_all()
        self._tables_ready = True

    def setup_schedule(self):
        """Create the tables if needed, then load the initial schedule.

        ``Scheduler.__init__`` calls this method (unless lazy). The tables must
        therefore be created here, before the first query against them.
        """
        self._ensure_tables()
        self._schedule = self.load_schedule()
        self._last_reload = time.monotonic()

    def load_schedule(self) -> Dict[str, ScheduleEntry]:
        """Load enabled rows as ScheduleEntry objects keyed by task name.

        Rows whose schedule string cannot be parsed are logged and skipped, so
        one bad row does not stop beat.
        """
        schedule = {}
        with self.app.flask_app.app_context():
            for task in ScheduledTask.query.filter_by(enabled=True).all():
                try:
                    schedule[task.name] = self.create_entry(task)
                except (ValueError, ParseException) as exc:
                    self.logger.error(
                        'Skipping scheduled task %r: invalid schedule %r (%s)',
                        task.name, task.schedule, exc)
        return schedule

    @staticmethod
    def _parse_schedule(text):
        """Parse seconds (e.g. '10.0') or a five-field cron string (e.g. '0 8 * * *')."""
        try:
            return float(text)
        except (TypeError, ValueError):
            pass
        # Cron field order: minute hour day-of-month month day-of-week.
        # split() without an argument tolerates repeated/surrounding whitespace.
        minute, hour, day, month, week_day = text.split()
        return crontab(minute=minute, hour=hour, day_of_month=day,
                       month_of_year=month, day_of_week=week_day)

    def create_entry(self, db_task) -> ScheduleEntry:
        """Convert a ScheduledTask row into a ScheduleEntry.

        Raises:
            ValueError: the cron string does not have exactly five fields, or
                a field value is out of range.
            ParseException: a cron field cannot be parsed.
        """
        return ScheduleEntry(
            name=db_task.name,
            task=db_task.task,
            schedule=self._parse_schedule(db_task.schedule),
            args=db_task.args or [],
            kwargs=db_task.kwargs or {},
            # ``options`` is left empty: entry options are forwarded verbatim to
            # apply_async(), so an ``enabled`` flag does not belong there (only
            # enabled rows are loaded anyway).
            # Bind the schedule to this Celery app (its timezone and clock).
            app=self.app,
        )

    def _reload(self):
        """Replace the schedule with fresh rows, keeping run state for known names."""
        fresh = self.load_schedule()
        for name, entry in fresh.items():
            current = self._schedule.get(name)
            if current is not None:
                # Fresh ScheduleEntry objects start with last_run_at = now. When
                # the base tick() rebuilds its heap after a change, that would
                # push every interval task back by a full period.
                entry.last_run_at = current.last_run_at
                entry.total_run_count = current.total_run_count
        self._schedule = fresh

    def tick(self, event_t=None, min=None, max=None):
        """Reload the schedule from the database when due, then run one base tick.

        The parameters are accepted only for backward compatibility and are
        ignored; ``Scheduler.tick`` runs with its own defaults.
        """
        now = time.monotonic()  # immune to wall-clock adjustments
        if now - self._last_reload > self.reload_interval:
            self._reload()
            self._last_reload = now
            self.logger.debug('Reloaded schedule from Flask DB')
        return super().tick()

    @property
    def schedule(self) -> Dict[str, ScheduleEntry]:
        """Current name -> ScheduleEntry mapping consumed by the base tick()."""
        return self._schedule
表基本配置:
"""Database models: scheduled task definitions and task execution logs."""
from datetime import datetime, timezone

from app import db


def _utcnow():
    """Return the current time as a timezone-aware UTC datetime."""
    return datetime.now(timezone.utc)


class ScheduledTask(db.Model):
    """A periodic task definition read by the beat scheduler."""

    id = db.Column(db.Integer, primary_key=True, comment='id')
    # Unique alias that should describe the task at a glance.
    name = db.Column(db.String(128), unique=True, nullable=False)
    # Celery task name, e.g. 'app.tasks.sample_task'.
    task = db.Column(db.String(256), nullable=False)
    # Interval in seconds (e.g. '10.0') or a five-field cron string (e.g. '0 8 * * *').
    schedule = db.Column(db.String(128), nullable=False)
    args = db.Column(db.JSON, default=list)
    kwargs = db.Column(db.JSON, default=dict)
    enabled = db.Column(db.Boolean, default=True)
    # Set on insert and refreshed on every UPDATE issued through SQLAlchemy.
    last_updated = db.Column(db.DateTime, default=_utcnow, onupdate=_utcnow)

    def __repr__(self):
        # The former `'...' % self.name, self.task` formatted only one value into
        # two placeholders (TypeError) and would have returned a tuple.
        return f'<ScheduledTask {self.name!r} {self.task}>'


class TaskExecutionLog(db.Model):
    """One recorded execution of a Celery task."""

    id = db.Column(db.Integer, primary_key=True)
    task_id = db.Column(db.String(128), index=True)  # Celery task id
    task_name = db.Column(db.String(256))
    status = db.Column(db.String(50))  # SUCCESS, FAILURE, RETRY
    result = db.Column(db.Text)
    traceback = db.Column(db.Text)
    args = db.Column(db.JSON)
    kwargs = db.Column(db.JSON)
    start_time = db.Column(db.DateTime)
    end_time = db.Column(db.DateTime)
    duration = db.Column(db.Float)  # in seconds

    def __repr__(self):
        return f'<TaskExecution {self.task_name} {self.status}>'
接口:查看任务、添加任务、手动执行任务及查看执行日志:
"""HTTP API for listing, creating and triggering scheduled tasks."""
import logging

from flask import Blueprint, jsonify, request

from app import celery, db
from app.models import ScheduledTask, TaskExecutionLog

logger = logging.getLogger(__name__)

bp = Blueprint('api', __name__, url_prefix='/api')

# Fields a POST /api/tasks body must provide.
_REQUIRED_TASK_FIELDS = ('name', 'task', 'schedule')


def _json_body():
    """Return the request's JSON object, or {} when it is missing, invalid or not an object."""
    data = request.get_json(silent=True)
    return data if isinstance(data, dict) else {}


@bp.route('/tasks', methods=['GET'])
def list_tasks():
    """List every scheduled task."""
    tasks = ScheduledTask.query.all()
    return jsonify([
        {'id': t.id, 'name': t.name, 'task': t.task,
         'schedule': t.schedule, 'enabled': t.enabled}
        for t in tasks
    ])


@bp.route('/tasks', methods=['POST'])
def create_task():
    """Create a scheduled task from the JSON body.

    Returns 201 on success, 400 when a required field is missing, and 409 when
    a task with the same name already exists.
    """
    data = _json_body()
    missing = [field for field in _REQUIRED_TASK_FIELDS if data.get(field) in (None, '')]
    if missing:
        return jsonify({'message': f"Missing required field(s): {', '.join(missing)}"}), 400
    if ScheduledTask.query.filter_by(name=data['name']).first() is not None:
        return jsonify({'message': 'Task name already exists'}), 409

    logger.debug('Creating scheduled task: %s', data)
    task = ScheduledTask(
        name=data['name'],
        task=data['task'],
        # Stored as text; the scheduler parses seconds or a cron string from it.
        schedule=str(data['schedule']),
        args=data.get('args') or [],
        kwargs=data.get('kwargs') or {},
        enabled=data.get('enabled', True),
    )
    db.session.add(task)
    db.session.commit()
    return jsonify({'message': 'Task created'}), 201


@bp.route('/run', methods=['POST'])
def run_task():
    """Queue an enabled scheduled task, selected by ``id``, for immediate execution."""
    task_pk = _json_body().get('id')
    if task_pk is None:
        return jsonify({'message': 'Missing required field: id'}), 400

    data = ScheduledTask.query.filter_by(id=task_pk, enabled=True).first()
    if data and data.task:
        # The scheduler treats this column as a Celery task name (ScheduleEntry.task),
        # so dispatch by name too, instead of importing a module path read from the DB.
        result = celery.send_task(data.task, args=data.args or [], kwargs=data.kwargs or {})
        return jsonify({'task_id': result.task_id})
    return jsonify({'message': 'Task not found'}), 404


@bp.route('/task-logs', methods=['GET'])
def list_task_logs():
    """Return the 50 most recent execution logs, newest first."""
    logs = (TaskExecutionLog.query
            .order_by(TaskExecutionLog.start_time.desc())
            .limit(50)
            .all())
    return jsonify([
        {'task_name': log.task_name,
         'status': log.status,
         'start_time': log.start_time.isoformat() if log.start_time else None,
         'duration': log.duration}
        for log in logs
    ])
在项目根目录新建 make_celery.py 导出 celery 实例,便于 celery 命令通过 -A make_celery 加载:
# make_celery.py: re-exports the Celery instance at the project root so that
# `celery -A make_celery worker` / `celery -A make_celery beat` can find it.
from app import celery  # noqa: F401
tasks任务编写:
"""Example Celery tasks used to demonstrate the database-driven scheduler."""
from app import celery
from app.models import ScheduledTask


@celery.task
def add(x, y):
    """Return the sum of *x* and *y*."""
    return x + y


@celery.task
def multiply(x, y):
    """Return the product of *x* and *y*."""
    return x * y


@celery.task
def hello_world():
    """Return a fixed greeting."""
    return "Hello World!"


@celery.task
def monitor_task_list():
    """Return id/name/task/schedule/enabled for every ScheduledTask row."""
    columns = ('id', 'name', 'task', 'schedule', 'enabled')
    return [
        {column: getattr(row, column) for column in columns}
        for row in ScheduledTask.query.all()
    ]
版本依赖:
celery==5.2.7
Flask==3.1.1
flask_sqlalchemy==3.1.1
简单的示例任务(tasks),便于举例配置:
"""Minimal example tasks referenced by the scheduling examples."""
from app import celery
from app.models import ScheduledTask


@celery.task
def add(x, y):
    """Add two values."""
    return x + y


@celery.task
def multiply(x, y):
    """Multiply two values."""
    return x * y


@celery.task
def hello_world():
    """Return a constant greeting string."""
    return "Hello World!"


@celery.task
def monitor_task_list():
    """Summarise all ScheduledTask rows as plain dicts."""
    summaries = []
    for row in ScheduledTask.query.all():
        summaries.append({
            'id': row.id,
            'name': row.name,
            'task': row.task,
            'schedule': row.schedule,
            'enabled': row.enabled,
        })
    return summaries
celery启动:
# Start a worker. The solo pool runs tasks one at a time in the main process
# (often used on Windows, where the default prefork pool is not supported).
celery -A make_celery worker --pool=solo --loglevel=info

# Start beat in a separate terminal; it uses the scheduler set in beat_scheduler.
celery -A make_celery beat --loglevel=info
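数据库配置:
在 Flask 的 Config 中配置 SQLALCHEMY_DATABASE_URI(Flask-SQLAlchemy 必需),以及 CELERY_BROKER_URL、CELERY_RESULT_BACKEND(init_celery 从 app.config 读取);数据表由调度器初始化时调用 db.create_all() 自动创建。
按照上述步骤配置即可生效。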