当前位置: 首页 > java >正文

9 定时任务与周期性调度

一、定时任务核心机制

1.1 基础调度配置

# celery.py
from celery import Celery
from celery.schedules import crontabapp = Celery('proj')
app.conf.beat_schedule = {'daily-report': {'task': 'report.generate','schedule': crontab(hour=3, minute=30),  # 每天3:30执行'args': (),'options': {'queue': 'reports'}},'every-5-min-check': {'task': 'monitor.check_status','schedule': 300.0,  # 每300秒执行'kwargs': {'service': 'api'}}
}

调度器类型对比:

调度器精度持久化动态修改适用场景
Default秒级开发环境
Database分钟级支持支持中小型生产环境
Redis(RedBeat)秒级支持支持大型分布式系统

1.2 启动Beat服务

# 基础启动命令
celery -A proj beat --loglevel=info# 生产环境推荐参数
celery -A proj beat \--scheduler=redbeat.RedBeatScheduler \--max-interval=10 \--pidfile=/var/run/celerybeat.pid

二、集群防重调度方案

2.1 Redis分布式锁方案

# redbeat_lock.py
import redis
from contextlib import contextmanagerr = redis.Redis(host='redis-ha')@contextmanager
def redis_lock(lock_key, timeout=30):identifier = str(uuid.uuid4())if r.setnx(lock_key, identifier):r.expire(lock_key, timeout)try:yield Truefinally:if r.get(lock_key) == identifier.encode():r.delete(lock_key)else:yield False# 在Beat启动时检查
if not redis_lock('celery:beat:master'):sys.exit("Another beat instance is running")

2.2 数据库标记方案

# models.py
from django.db import modelsclass BeatLock(models.Model):hostname = models.CharField(max_length=255)last_heartbeat = models.DateTimeField(auto_now=True)# beat.py
from django.utils import timezonedef acquire_lock():now = timezone.now()return BeatLock.objects.update_or_create(defaults={'last_heartbeat': now},hostname=settings.HOSTNAME,last_heartbeat__lt=now - timedelta(seconds=60))# 定时任务中持续更新心跳
@app.task
def refresh_beat_lock():BeatLock.objects.filter(hostname=settings.HOSTNAME).update(last_heartbeat=timezone.now())

三、动态任务管理实战

3.1 django-celery-beat 集成

安装配置:

pip install django-celery-beat# settings.py
INSTALLED_APPS += ('django_celery_beat',)
CELERY_BEAT_SCHEDULER = 'django_celery_beat.schedulers:DatabaseScheduler'

管理界面示例:

# 通过Admin创建周期任务
from django_celery_beat.models import PeriodicTask, CrontabScheduleschedule, _ = CrontabSchedule.objects.get_or_create(minute='30',hour='3',day_of_week='*',day_of_month='*',month_of_year='*',
)PeriodicTask.objects.create(crontab=schedule,name='Daily Inventory Sync',task='inventory.sync',args='["supplier1"]',queue='critical',enabled=True
)

3.2 API动态管理示例

# api/views.py
from rest_framework import generics
from django_celery_beat.models import PeriodicTask
from .serializers import TaskSerializerclass TaskAPI(generics.ListCreateAPIView):queryset = PeriodicTask.objects.all()serializer_class = TaskSerializerdef perform_create(self, serializer):instance = serializer.save()# 通知所有Beat节点刷新instance.enabled = Trueinstance.save()app.control.broadcast('beat_restart')

四、高可用架构设计

4.1 RedBeat集群方案

# 配置RedBeat
app.conf.redbeat_redis_url = 'redis://redis-ha:6379/1'
app.conf.redbeat_lock_timeout = 60
app.conf.redbeat_keyprefix = 'rb:'# 启动多个Beat节点
celery -A proj beat --scheduler=redbeat.RedBeatScheduler

RedBeat核心机制:

  1. 基于Redis的分布式锁
  2. 调度信息持久化存储
  3. 节点故障自动转移
  4. 支持动态修改调度计划

4.2 跨机房调度方案

app.conf.beat_sync_every = 10  # 同步间隔(秒)
app.conf.beat_max_loop_interval = 30  # 最大调度间隔
app.conf.broker_transport_options = {'global_prefix': 'sh1_',  # 上海机房'visibility_timeout': 1800
}

五、监控与排错指南

5.1 关键监控指标

# 查看待执行任务
redis-cli KEYS redbeat:* | xargs redis-cli TYPE# 检查锁状态
redis-cli GET redbeat::lock# 获取调度统计
celery -A proj inspect scheduled

5.2 日志分析模式

# 配置专用日志格式
app.conf.worker_log_format = '''[%(asctime)s] [BEAT] [%(levelname)s] 
%(message)s'''
app.conf.worker_task_log_format = '''%(asctime)s [BEAT] 
Task %(task_name)s: %(message)s'''

六、典型业务场景案例

6.1 电商大促动态调度

def adjust_schedule(event_type):if event_type == 'flash_sale':PeriodicTask.objects.filter(name='daily_cleanup').update(enabled=False)PeriodicTask.objects.create(name='flash_sale_monitor',task='monitor.track_sales',interval=IntervalSchedule.objects.get(every=10, period='seconds'),enabled=True)

6.2 金融对账系统

# 节假日感知调度
class HolidayAwareCrontab(CrontabSchedule):def is_due(self, last_run_at):if is_holiday(datetime.now()):return False, 3600*24  # 跳过节假日return super().is_due(last_run_at)# 自定义调度器
app.conf.beat_schedulers = {'holiday': 'proj.schedulers.HolidayAwareScheduler'
}

七、最佳实践总结

高可用架构检查清单:

  • 使用RedBeat或Database调度器
  • 配置合理的锁超时时间(建议30-60秒)
  • 实现节点心跳监控
  • 启用任务历史记录
  • 设置任务执行超时时间

性能优化参数推荐:

app.conf.beat_max_loop_interval = 30  # 最大调度间隔
app.conf.worker_prefetch_multiplier = 4  # 预取数量
app.conf.task_acks_late = True  # 确保任务不丢失
app.conf.task_reject_on_worker_lost = True

灾备方案示例:

# 主节点
celery -A proj beat --scheduler=redbeat.RedBeatScheduler# 备用节点
celery -A proj beat --scheduler=redbeat.RedBeatScheduler \--redbeat_initial_lock_ttl=30

通过合理设计定时任务架构,可以实现:

  • 99.99%的调度可用性
  • 分钟级的故障转移能力
  • 动态调整的灵活调度策略
  • 日均百万级定时任务处理能力

扩展建议:

  • 集成Prometheus监控指标
  • 实现调度可视化看板
  • 开发灰度发布功能
  • 构建自动化测试套件
http://www.xdnf.cn/news/7727.html

相关文章:

  • 活到老学到老-Spring注解-如何创建get和set
  • C++面向对象——多态
  • 进程之IPC通信一
  • 内核常见面试问题汇总
  • PN结的形成及特性
  • 技术派项目——注册登录(用户名密码的方式)
  • 瀚高安全版4.5.8/4.5.9字符串默认按字节存储导致数据无法写入(APP)
  • 前端流行框架Vue3教程:20. 插槽slot(2)
  • leetcode 找到字符串中所有字母异位词 java
  • 牛顿迭代法求解除法
  • C语言中三个点代表什么含义...
  • LeetCode 438. 找到字符串中所有字母异位词 | 滑动窗口与字符计数数组解法
  • base算法
  • Web开发-Python应用Flask框架Jinja模版绑定路由参数传递页面解析SSTI注入
  • Baumer工业相机堡盟工业相机的工业视觉如何使用三色光进行字符识别检测
  • 第十六届C++B组easyQuestions
  • AI产品经理课程推荐
  • 2025ICPC南昌邀请赛-G
  • 【实验增效】5 μL/Test 高浓度液体试剂!Elabscience PE Anti-Mouse Ly6G抗体 简化流式细胞术流程
  • 【操作系统】进程同步问题——生产者-消费者问题
  • 【Git】远程操作
  • spring cloud gateway配置
  • 探索自定义地图样式,打造应用专属个性化地图
  • 《探索具身智能机器人视觉-运动映射模型的创新训练路径》
  • 中级网络工程师知识点8
  • Rocketmq Broker与队列关系,怎么存储的
  • AI语音合成平台:AnKo开启免费创作新时代!
  • 基于Telink 8258配合Wireshark抓包测试SIG Mesh的IV Index Update过程
  • Java基础 Day16
  • leetcode hot100:四、解题思路大全:滑动窗口(无重复字符的最长子串、找到字符串中所有字母异位词)、子串(和为k的子数组、)