9 定时任务与周期性调度
一、定时任务核心机制
1.1 基础调度配置
# celery.py
from celery import Celery
from celery.schedules import crontabapp = Celery('proj')
app.conf.beat_schedule = {'daily-report': {'task': 'report.generate','schedule': crontab(hour=3, minute=30), # 每天3:30执行'args': (),'options': {'queue': 'reports'}},'every-5-min-check': {'task': 'monitor.check_status','schedule': 300.0, # 每300秒执行'kwargs': {'service': 'api'}}
}
调度器类型对比:
调度器 | 精度 | 持久化 | 动态修改 | 适用场景 |
---|---|---|---|---|
Default | 秒级 | 无 | 否 | 开发环境 |
Database | 分钟级 | 支持 | 支持 | 中小型生产环境 |
Redis(RedBeat) | 秒级 | 支持 | 支持 | 大型分布式系统 |
1.2 启动Beat服务
# 基础启动命令
celery -A proj beat --loglevel=info# 生产环境推荐参数
celery -A proj beat \--scheduler=redbeat.RedBeatScheduler \--max-interval=10 \--pidfile=/var/run/celerybeat.pid
二、集群防重调度方案
2.1 Redis分布式锁方案
# redbeat_lock.py
import redis
from contextlib import contextmanagerr = redis.Redis(host='redis-ha')@contextmanager
def redis_lock(lock_key, timeout=30):identifier = str(uuid.uuid4())if r.setnx(lock_key, identifier):r.expire(lock_key, timeout)try:yield Truefinally:if r.get(lock_key) == identifier.encode():r.delete(lock_key)else:yield False# 在Beat启动时检查
if not redis_lock('celery:beat:master'):sys.exit("Another beat instance is running")
2.2 数据库标记方案
# models.py
from django.db import modelsclass BeatLock(models.Model):hostname = models.CharField(max_length=255)last_heartbeat = models.DateTimeField(auto_now=True)# beat.py
from django.utils import timezonedef acquire_lock():now = timezone.now()return BeatLock.objects.update_or_create(defaults={'last_heartbeat': now},hostname=settings.HOSTNAME,last_heartbeat__lt=now - timedelta(seconds=60))# 定时任务中持续更新心跳
@app.task
def refresh_beat_lock():BeatLock.objects.filter(hostname=settings.HOSTNAME).update(last_heartbeat=timezone.now())
三、动态任务管理实战
3.1 django-celery-beat 集成
安装配置:
pip install django-celery-beat# settings.py
INSTALLED_APPS += ('django_celery_beat',)
CELERY_BEAT_SCHEDULER = 'django_celery_beat.schedulers:DatabaseScheduler'
管理界面示例:
# 通过Admin创建周期任务
from django_celery_beat.models import PeriodicTask, CrontabScheduleschedule, _ = CrontabSchedule.objects.get_or_create(minute='30',hour='3',day_of_week='*',day_of_month='*',month_of_year='*',
)PeriodicTask.objects.create(crontab=schedule,name='Daily Inventory Sync',task='inventory.sync',args='["supplier1"]',queue='critical',enabled=True
)
3.2 API动态管理示例
# api/views.py
from rest_framework import generics
from django_celery_beat.models import PeriodicTask
from .serializers import TaskSerializerclass TaskAPI(generics.ListCreateAPIView):queryset = PeriodicTask.objects.all()serializer_class = TaskSerializerdef perform_create(self, serializer):instance = serializer.save()# 通知所有Beat节点刷新instance.enabled = Trueinstance.save()app.control.broadcast('beat_restart')
四、高可用架构设计
4.1 RedBeat集群方案
# 配置RedBeat
app.conf.redbeat_redis_url = 'redis://redis-ha:6379/1'
app.conf.redbeat_lock_timeout = 60
app.conf.redbeat_keyprefix = 'rb:'# 启动多个Beat节点
celery -A proj beat --scheduler=redbeat.RedBeatScheduler
RedBeat核心机制:
- 基于Redis的分布式锁
- 调度信息持久化存储
- 节点故障自动转移
- 支持动态修改调度计划
4.2 跨机房调度方案
app.conf.beat_sync_every = 10 # 同步间隔(秒)
app.conf.beat_max_loop_interval = 30 # 最大调度间隔
app.conf.broker_transport_options = {'global_prefix': 'sh1_', # 上海机房'visibility_timeout': 1800
}
五、监控与排错指南
5.1 关键监控指标
# 查看待执行任务
redis-cli KEYS redbeat:* | xargs redis-cli TYPE# 检查锁状态
redis-cli GET redbeat::lock# 获取调度统计
celery -A proj inspect scheduled
5.2 日志分析模式
# 配置专用日志格式
app.conf.worker_log_format = '''[%(asctime)s] [BEAT] [%(levelname)s]
%(message)s'''
app.conf.worker_task_log_format = '''%(asctime)s [BEAT]
Task %(task_name)s: %(message)s'''
六、典型业务场景案例
6.1 电商大促动态调度
def adjust_schedule(event_type):if event_type == 'flash_sale':PeriodicTask.objects.filter(name='daily_cleanup').update(enabled=False)PeriodicTask.objects.create(name='flash_sale_monitor',task='monitor.track_sales',interval=IntervalSchedule.objects.get(every=10, period='seconds'),enabled=True)
6.2 金融对账系统
# 节假日感知调度
class HolidayAwareCrontab(CrontabSchedule):def is_due(self, last_run_at):if is_holiday(datetime.now()):return False, 3600*24 # 跳过节假日return super().is_due(last_run_at)# 自定义调度器
app.conf.beat_schedulers = {'holiday': 'proj.schedulers.HolidayAwareScheduler'
}
七、最佳实践总结
高可用架构检查清单:
- 使用RedBeat或Database调度器
- 配置合理的锁超时时间(建议30-60秒)
- 实现节点心跳监控
- 启用任务历史记录
- 设置任务执行超时时间
性能优化参数推荐:
app.conf.beat_max_loop_interval = 30 # 最大调度间隔
app.conf.worker_prefetch_multiplier = 4 # 预取数量
app.conf.task_acks_late = True # 确保任务不丢失
app.conf.task_reject_on_worker_lost = True
灾备方案示例:
# 主节点
celery -A proj beat --scheduler=redbeat.RedBeatScheduler# 备用节点
celery -A proj beat --scheduler=redbeat.RedBeatScheduler \--redbeat_initial_lock_ttl=30
通过合理设计定时任务架构,可以实现:
- 99.99%的调度可用性
- 分钟级的故障转移能力
- 动态调整的灵活调度策略
- 日均百万级定时任务处理能力
扩展建议:
- 集成Prometheus监控指标
- 实现调度可视化看板
- 开发灰度发布功能
- 构建自动化测试套件