当前位置: 首页 > news >正文

手撕定时任务

手撕定时任务

源码:https://gitee.com/bossDuy/hand-tearing-timed-task

我们想要实现的功能:

public class ScheduleService {//每隔delay毫秒执行一次taskvoid schedule(Runnable task,long delay) {}
}

那么首先考虑谁去执行这个task?线程池中线程可以吗?

    void schedule(Runnable task, long delay) throws InterruptedException {ExecutorService pool = Executors.newFixedThreadPool(5);while (true) {Thread.sleep(delay);pool.execute(task);}}

但是这样存在问题,我们线程池的大小是5,如果我们第6次调用该方法,也就是调用任务的第6次,就会出现问题

我们考虑一个组件trigger,该组件阻塞delay时间,然后被唤醒的时候将task将给线程池执行,具体如下

package com.yb0os1;import java.util.Objects;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.locks.LockSupport;public class ScheduleService {private ExecutorService pool = Executors.newFixedThreadPool(10);Trigger trigger = new Trigger();public void schedule(Runnable schedule, long delay) {Job job = new Job(schedule, System.currentTimeMillis()+delay,delay);trigger.jobQueue.offer(job);trigger.wakeUp();}public void schedule(Job job) {job.setStartTime(System.currentTimeMillis()+job.getDelay());trigger.jobQueue.offer(job);trigger.wakeUp();}//中断任务public void interrupt(Job job){trigger.jobQueue.removeIf(e->e.equals(job));}class Trigger {private final PriorityBlockingQueue<Job> jobQueue = new PriorityBlockingQueue<>();Thread thread = new Thread(() -> {while (true) {//如果为空 那么阻塞  为了防止虚假唤醒 需要whilewhile (jobQueue.isEmpty()) {LockSupport.park();}//先获取最先要执行的Job job = jobQueue.peek();if (job.getStartTime()<System.currentTimeMillis()){//任务可以执行//这里一次peek一次poll为了防止执行延迟为1s的任务的时候 插入了延迟为500ms的任务 需要先执行延迟为500ms的任务job = jobQueue.poll();pool.execute(job.getTask());//我们是定时任务 执行了这次还要计算下次要执行的时间点Job nextJob = new Job(job.getTask(),System.currentTimeMillis()+job.getDelay(),job.getDelay());jobQueue.offer(nextJob);}else{//最近要执行的任务都不可以执行 延迟等待到开始时间//如果等待的时候添加了一个任务 那么会被唤醒 重新走一遍逻辑LockSupport.parkUntil(job.getStartTime());}}});{thread.start();}public void wakeUp() {LockSupport.unpark(thread);}}//Job就是我们的执行的任务对象 包括如下static class Job implements Comparable<Job>{private Runnable task;private long startTime;private long delay;public void setStartTime(long startTime) {this.startTime = startTime;}public Job(Runnable task, long delay){this.task = task;this.delay = delay;}public Job(Runnable task, long startTime, long delay) {this.task = task;this.startTime = startTime;this.delay = delay;}public long getStartTime() {return startTime;}public Runnable getTask() {return task;}public long getDelay() {return delay;}public int compareTo(Job o) {return Long.compare(this.startTime,o.getStartTime());}public boolean equals(Object o) {if (this == o) return true;if (o == null || getClass() != o.getClass()) return false;Job job = (Job) o;return delay == job.delay && Objects.equals(task, job.task);}public int hashCode() {return Objects.hash(task, delay);}}
}

测试

package com.yb0os1;import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;public class Main {public static void main(String[] args) throws InterruptedException {ScheduleService scheduleService = new ScheduleService();DateTimeFormatter dateTimeFormatter = DateTimeFormatter.ofPattern("mm:ss SSS");ScheduleService.Job job = new ScheduleService.Job(() -> System.out.println(LocalDateTime.now().format(dateTimeFormatter) + "--100ms一次的任务"),100);scheduleService.schedule(job);Thread.sleep(1000);scheduleService.interrupt(job);scheduleService.schedule(() -> System.out.println(LocalDateTime.now().format(dateTimeFormatter) + "--200ms一次的任务"), 200);}
}

在这里插入图片描述

是有几十毫秒的误差,这是正常的

  • LockSupport.parkUntil() 的精度限制

  • 使用 PriorityBlockingQueue 实现优先级队列带来的额外开销

  • 多线程协作带来的额外延迟

    • schedule() --> Trigger.jobQueue.offer() --> LockSupport.unpark() --> Thread 被唤醒 --> poll 并执行任务
      

我们就算使用JDK提供的标准定时任务工具类ScheduledExecutorService

    public static void main(String[] args) throws InterruptedException {
//        ScheduleService scheduleService = new ScheduleService();
//        DateTimeFormatter dateTimeFormatter = DateTimeFormatter.ofPattern("mm:ss SSS");
//        ScheduleService.Job job = new ScheduleService.Job(() -> System.out.println(LocalDateTime.now().format(dateTimeFormatter) + "--100ms一次的任务"),100);
//        scheduleService.schedule(job);
//        Thread.sleep(1000);
//        scheduleService.interrupt(job);
//        scheduleService.schedule(() -> System.out.println(LocalDateTime.now().format(dateTimeFormatter) + "--200ms一次的任务"), 200);ScheduledExecutorService pool = Executors.newScheduledThreadPool(5);pool.scheduleAtFixedRate(() -> System.out.println(LocalDateTime.now().format(DateTimeFormatter.ofPattern("mm:ss SSS")) + "--100ms一次的任务"), 100, 100, java.util.concurrent.TimeUnit.MILLISECONDS);}

在这里插入图片描述

http://www.xdnf.cn/news/910387.html

相关文章:

  • mamba架构和transformer区别
  • 制作电子相册
  • 【深度学习新浪潮】RoPE对大模型的外推性有什么影响?
  • Gojs渲染实线、虚线
  • 单周期cpu和多周期cpu、单周期数据通路和多周期数据通路与总线结构数据通路和专用数据通路的关系
  • JAVA学习 DAY2 java程序运行、注意事项、转义字符
  • 实现echarts全屏的放大/缩小最优解
  • Kyosan K5BMC ELECTRONIC INTERLOCKING MANUAL 电子联锁
  • 【PmHub面试篇】性能监控与分布式追踪利器Skywalking面试专题分析
  • pp-ocrv5改进
  • 核弹级漏洞深度解析:Log4j2 JNDI注入攻击原理与防御实战
  • [IMX][UBoot] 01.UBoot 常用命令
  • 【八股消消乐】MySQL参数优化大汇总
  • 使用 Python 和 HuggingFace Transformers 进行对象检测
  • xpath表达式的常用知识点
  • K7 系列各种PCIE IP核的对比
  • 每日算法 -【Swift 算法】电话号码字母组合
  • Keil调试模式下,排查程序崩溃简述
  • 六、【ESP32开发全栈指南:深入解析ESP32 IDF中的WiFi AP模式开发】
  • 读《创新者的窘境》二分 - 破坏性创新与延续性创新
  • 飞牛使用Docker部署Tailscale 内网穿透教程
  • KL散度计算示例:用户画像 vs. 专辑播放分布的性别偏好分析
  • MySQL查询语句
  • 02 nginx 的环境搭建
  • 禅道5月更新速览 | 新增交付物配置功能,支持建立跨执行任务依赖关系,研发效能平台上线
  • 6个可提升社媒投资回报率的Facebook KPI
  • 基于tensorflow实现的猫狗识别
  • 配置git命令缩写
  • 学习记录aigc
  • 智能制造数字孪生全要素交付一张网:智造中枢,孪生领航,共建智造生态共同体