当前位置: 首页 > ai >正文

一款支持多线程的批量任务均衡器


任务均衡,即将100条任务的执行时间,均匀的分布在1小时内,可以降低运行负载的峰值,也可以用于控制多线程任务

import threading
import time
from datetime import datetime
from typing import Callable, List
import concurrent.futuresclass TaskScheduler:def __init__(self, task_list: List[str], cycle_seconds: int, reserve_seconds: int, func: Callable, single_cycle: bool = False, max_threads: int = 10):"""构造说明:- 必须满足 reserve_seconds < cycle_seconds- func 应实现异常处理逻辑- 单次模式时自动在任务完成后停止- max_threads 限制同时运行的线程数"""if reserve_seconds >= cycle_seconds:raise ValueError("reserve_seconds 必须小于 cycle_seconds")if max_threads <= 0:raise ValueError("max_threads 必须大于 0")self.task_list = task_listself.cycle_seconds = cycle_secondsself.reserve_seconds = reserve_secondsself.func = funcself.single_cycle = single_cycleself.max_threads = max_threadsself.lock = threading.Lock()self.running = Falsedef _execute_task(self, task: str, target_time: float):"""执行单个任务,确保在指定时间点执行"""current_time = time.time()sleep_time = max(0, target_time - current_time)time.sleep(sleep_time)try:self.func(task)except Exception as e:print(f"任务 {task} 执行出错: {e}")def _schedule_tasks(self):"""安排任务执行"""cycle_start_time = time.time()available_time = self.cycle_seconds - self.reserve_secondsinterval = available_time / len(self.task_list)current_time = cycle_start_time# 使用线程池限制最大线程数with concurrent.futures.ThreadPoolExecutor(max_workers=self.max_threads) as executor:for task in self.task_list:target_time = current_time + intervalexecutor.submit(self._execute_task, task, target_time)current_time = target_timedef start(self):"""启动任务调度"""with self.lock:if self.running:print("任务调度器已经在运行")returnself.running = Truewhile self.running:self._schedule_tasks()if self.single_cycle:self.running = Falseelse:time.sleep(self.cycle_seconds)print("[系统时间] 所有任务执行完成")def stop(self):"""停止任务调度"""with self.lock:self.running = Falsedef runner_test(x):# 测试运行print(f"[{datetime.now().strftime('%H:%M:%S.%f')}] 处理 {x} start")if x == 3:raise ValueError("333333333333333333333333")time.sleep(2)print(f"[{datetime.now().strftime('%H:%M:%S.%f')}] 处理 {x} end")# 示例用法
if __name__ == "__main__":scheduler = TaskScheduler(task_list=[i for i in range(90)],cycle_seconds=10,  # 执行周期,单位/秒reserve_seconds=1,  # 预留时间,单位/秒func=runner_test,single_cycle=True,max_threads=20  # 最大线程数)scheduler.start()

代码说明

  1. 时间控制
    • 任务均匀分布在 (周期时间 - 预留时间) 的时间窗内。
    • 每个任务的间隔时间通过 available_time / len(self.task_list) 计算得出。
    • 使用 time.sleep 确保任务在指定时间点执行,误差控制在毫秒级别。
  2. 执行控制
    • 支持自定义任务处理函数 func,每个任务元素独立调用该函数。
    • 使用多线程并发执行任务,确保任务的独立性。
  3. 模式选择
    • 支持单次执行模式和持续周期模式。
    • 单次模式下,任务执行完成后自动停止。
  4. 异常处理
    • _execute_task 方法中捕获异常,确保任务执行出错时不会影响其他任务。
  5. 线程安全
    • 使用 threading.Lock 确保多线程并发调用时的线程安全。
  6. 资源控制
    • 单实例最多同时管理 1000 个任务元素(可通过限制 task_list 的长度实现)。
  7. 执行保障
    • 确保最后一个任务完成时间不超过周期开始时间 + (cycle_seconds - reserve_seconds)
    • 跳过已过期的历史任务时间点。
  8. 误差容忍
    • 周期对齐误差和任务执行时间误差均控制在毫秒级别。
http://www.xdnf.cn/news/828.html

相关文章:

  • Craft 是什么:腾讯 Cloud Studio 中的 CodeBuddy 提供了 Craft 功能
  • 阻塞队列-ArrayBlockingQueue
  • 【Linux专栏】zip 多个文件不带路径
  • 入选AAAI 2025,浙江大学提出多对一回归模型M2OST,利用数字病理图像精准预测基因表达
  • C语言高频面试题——指针数组和数组指针
  • Spark-SQL核心编程
  • day33和day34图像处理OpenCV
  • MySQL数据库 - InnoDB引擎
  • DeepSeek智能时空数据分析(二):3秒对话式搞定“等时圈”绘制
  • OneClicker脚本自动运行工具
  • 2025年蓝桥杯第十六届CC++大学B组真题及代码
  • 模拟堆详解
  • 软件工程中的维护类型
  • OpenSSL1.1.1d windows安装包资源使用
  • [预备知识]1. 线性代数基础
  • 浙江大学 DeepSeek 公开课 第三季 第1期讲座 - 唐谈 研究员 (附PPT下载) | 突破信息差
  • 腾讯云×数语科技:Datablau DDM (AI智能版)上架云应用!
  • 虚拟环境下编译ros2节点需注意的地方
  • 【上位机——MFC】运行时类信息机制
  • # 05_Elastic Stack 从入门到实践(五)
  • Kafka 在小流量和大流量场景下的顺序消费问题
  • Spring MVC DispatcherServlet 的作用是什么? 它在整个请求处理流程中扮演了什么角色?为什么它是核心?
  • 平板电脑做欧盟网络安全法案(EU)2022/30
  • 人工智能100问☞第9问:什么是AI芯片?
  • 形象理解华为云物联网iotDA开发流程
  • MYSQL之慢查询分析(Analysis of Slow MySQL Query)
  • PyCharm 初级教程:从安装到第一个 Python 项目
  • 基于ueditor编辑器的功能开发之重写ueditor的查找和替换功能,支持滚动定位
  • 链式栈和线性栈
  • WebForms Validation