当前位置: 首页 > news >正文

celery独立部署接入数据库配置

目录结构:

config下配置:

__init__:

import os
import sys
sys.path.append(os.getcwd())
from celery import Celeryapp=Celery('celeryTester') # 创建一个Celery实例,名字自定义
app.config_from_object('config.celery_config') # 从celery_config获取配置

celery_config:

from .celery_signals import task_failure_handler,task_postrun_handler # 全局信号
broker_url = 'redis://xxx' # 作为任务队列,beat及手动调用往里添加任务,worker从这里取任务
result_backend = 'redis://xxx'    # 任务结果
broker_connection_retry_on_startup=True # 重连可以不配
# 序列化相关,默认都是json
task_serializer='json'
accept_content=['json']
result_serializer='json'
#时区设置
timezone='Asia/Shanghai'
enable_utc=True
# 从哪些模块获取任务,一般是从根目录启动worker及beat进程,目录以根目录起
include=['tasks.sample_tasks']
# 使用自定义调度器
beat_schedule= {} # 动态获取这里配置为空
beat_scheduler = 'my_scheduler.dynamic_scheduler.DynamicScheduler' # 使用自定义scheduler类
beat_scheduler_reload_interval = 300  # beat_scheduler多久重新加载,可覆盖默认值

celery_signals:

import datetimefrom celery.signals import task_postrun, task_failure
from utils.coon import DatabaseConnection@task_postrun.connect
def task_postrun_handler(task_id, task, args, kwargs, retval, state, **other):# 只记录有返回值的成功任务,暂时只更新periodic_task表里的状态if state == 'SUCCESS':duration = other.get('runtime', 0)last_run_at = datetime.datetime.now() - datetime.timedelta(seconds=duration)DatabaseConnection().update('periodic_task',{'last_run_at': last_run_at, 'status': True},# set值'task=%s',(task.name,))# where后condition值@task_failure.connect
def task_failure_handler(task_id, exception, traceback, task, args, kwargs, einfo, **other):# 任务失败走此信号机制duration = other.get('runtime', 0)last_run_at = datetime.datetime.now() - datetime.timedelta(seconds=duration)DatabaseConnection().update('periodic_task',{'last_run_at': last_run_at, 'status': False},'task=%s',(task.name,))

dynamic_scheduler:

import json
import time
from typing import Dictfrom celery.beat import Scheduler, ScheduleEntry
from celery.schedules import crontab
from utils.coon import DatabaseConnectionclass DynamicScheduler(Scheduler):def __init__(self, *args, **kwargs):super().__init__(*args, **kwargs)self._schedule = {}self._last_reload = time.time()self.reload_interval = kwargs.get('reload_interval', 60*5)  # 默认300秒重载def setup_schedule(self):"""初始化加载调度配置,起beat进程时会调用"""self._schedule = self.load_schedule()def load_schedule(self) -> Dict[str, ScheduleEntry]:"""从配置源加载调度配置"""schedule = {}tasks =DatabaseConnection().fetch_all('select * from periodic_task where enabled=1')for item in tasks:name = item['name']if item['interval']:sche = item['interval']else:cron = item['crontab']minute, hour, day, month, week_day = cron.strip().split(' ')sche = crontab(minute=minute, hour=hour, day_of_week=week_day, day_of_month=day,month_of_year=month)item['args']=item['args'] and json.loads(item['args'])item['kwargs']=item['kwargs'] and json.loads(item['kwargs'])schedule[name] = ScheduleEntry(name=name,task=item['task'],schedule=sche,args=item.get('args' or ()),kwargs=item.get('kwargs', {}))return scheduledef tick(self, event_t=..., min=..., max=...):"""重载tick方法实现定期检查"""now = time.time()if now - self._last_reload > self.reload_interval:self._schedule = self.load_schedule()self._last_reload = nowself.logger.debug('Reloaded schedule')return super().tick()@propertydef schedule(self) -> Dict[str, ScheduleEntry]:"""返回当前调度配置"""return self._schedule

sample_tasks:

from config import app@app.task
def add(x, y):return x + y@app.task
def multiply(x, y):return x * y@app.task
def hello_world():return "Hello World!"

coon下面就是一个数据库连接类,自己随便找个就行这里就不贴出来了

启动命令:

windows机器下
worker进程:
celery -A app worker --loglevel=info --pool=solo
beat进程
celery -A app beat --loglevel=info

最后解释下原理,一般最简单的是在celery_config配置beat_schedule,我们是通过自定义Scheduler类,从数据库里面取值,类似于动态拿到这个beat_schedule值。好处就是可以直接通过配置修改或者添加定时任务,不用再去代码修改添加了,并在最开始和结束添加落表等操作

http://www.xdnf.cn/news/557965.html

相关文章:

  • 【C++算法】68.栈_字符串解码
  • 关于Linux服务器数字取证一
  • pytorch小记(二十四):PyTorch 中的 `torch.full` 全面指南
  • Python 包管理工具 uv
  • RocketMQ 的事务消息是如何实现的
  • 【Java高阶面经:微服务篇】3.熔断机制深度优化:从抖动治理到微服务高可用架构实战
  • unipp === 状态管理 Pinia 使用
  • 萌新联赛第(三)场
  • 自建主机NAS
  • Java转Go日记(四十二):错误处理
  • 链表-设计链表
  • OBS Studio:windows免费开源的直播与录屏软件
  • Tractor S--二维转一维,然后最小生成树
  • Python 中 pass 语句的详解和使用
  • Java双指针法:原地移除数组元素
  • IEEE出版|2025年智能光子学与应用技术国际学术会议(IPAT2025)
  • CRC计算
  • doris数据分片逻辑
  • RFID技术在半导体晶圆卡塞盒中的应用方案
  • C语言学习笔记之结构体
  • Cribl 在的function 的活用 (pipeline中)
  • day018-磁盘管理-案例
  • PySide6 GUI 学习笔记——常用类及控件使用方法(常用控件调色板QPalette)
  • Linux X86平台安装ARM64交叉编译器方法
  • 如何在 AOSP 中判断一个源文件属于哪个模块(以 CameraService 为例)
  • 首次中医知识问答模型微调
  • CSS display有几种属性值
  • 深入理解 Python 中的几种方法:实例方法、类方法、静态方法与特殊方法
  • leetcode 162. Find Peak Element
  • python新手学习笔记①