手撕定时任务
手撕定时任务
源码:https://gitee.com/bossDuy/hand-tearing-timed-task
我们想要实现的功能:
public class ScheduleService {//每隔delay毫秒执行一次taskvoid schedule(Runnable task,long delay) {}
}
那么首先考虑谁去执行这个task?线程池中线程可以吗?
void schedule(Runnable task, long delay) throws InterruptedException {ExecutorService pool = Executors.newFixedThreadPool(5);while (true) {Thread.sleep(delay);pool.execute(task);}}
但是这样存在问题,我们线程池的大小是5,如果我们第6次调用该方法,也就是调用任务的第6次,就会出现问题
我们考虑一个组件trigger,该组件阻塞delay时间,然后被唤醒的时候将task将给线程池执行,具体如下
package com.yb0os1;import java.util.Objects;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.locks.LockSupport;public class ScheduleService {private ExecutorService pool = Executors.newFixedThreadPool(10);Trigger trigger = new Trigger();public void schedule(Runnable schedule, long delay) {Job job = new Job(schedule, System.currentTimeMillis()+delay,delay);trigger.jobQueue.offer(job);trigger.wakeUp();}public void schedule(Job job) {job.setStartTime(System.currentTimeMillis()+job.getDelay());trigger.jobQueue.offer(job);trigger.wakeUp();}//中断任务public void interrupt(Job job){trigger.jobQueue.removeIf(e->e.equals(job));}class Trigger {private final PriorityBlockingQueue<Job> jobQueue = new PriorityBlockingQueue<>();Thread thread = new Thread(() -> {while (true) {//如果为空 那么阻塞 为了防止虚假唤醒 需要whilewhile (jobQueue.isEmpty()) {LockSupport.park();}//先获取最先要执行的Job job = jobQueue.peek();if (job.getStartTime()<System.currentTimeMillis()){//任务可以执行//这里一次peek一次poll为了防止执行延迟为1s的任务的时候 插入了延迟为500ms的任务 需要先执行延迟为500ms的任务job = jobQueue.poll();pool.execute(job.getTask());//我们是定时任务 执行了这次还要计算下次要执行的时间点Job nextJob = new Job(job.getTask(),System.currentTimeMillis()+job.getDelay(),job.getDelay());jobQueue.offer(nextJob);}else{//最近要执行的任务都不可以执行 延迟等待到开始时间//如果等待的时候添加了一个任务 那么会被唤醒 重新走一遍逻辑LockSupport.parkUntil(job.getStartTime());}}});{thread.start();}public void wakeUp() {LockSupport.unpark(thread);}}//Job就是我们的执行的任务对象 包括如下static class Job implements Comparable<Job>{private Runnable task;private long startTime;private long delay;public void setStartTime(long startTime) {this.startTime = startTime;}public Job(Runnable task, long delay){this.task = task;this.delay = delay;}public Job(Runnable task, long startTime, long delay) {this.task = task;this.startTime = startTime;this.delay = delay;}public long getStartTime() {return startTime;}public Runnable getTask() {return task;}public long getDelay() {return delay;}public int compareTo(Job o) {return Long.compare(this.startTime,o.getStartTime());}public boolean equals(Object o) {if (this == o) return true;if (o == null || getClass() != o.getClass()) return false;Job job = (Job) o;return delay == job.delay && Objects.equals(task, job.task);}public int hashCode() {return Objects.hash(task, delay);}}
}
测试
package com.yb0os1;import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;public class Main {public static void main(String[] args) throws InterruptedException {ScheduleService scheduleService = new ScheduleService();DateTimeFormatter dateTimeFormatter = DateTimeFormatter.ofPattern("mm:ss SSS");ScheduleService.Job job = new ScheduleService.Job(() -> System.out.println(LocalDateTime.now().format(dateTimeFormatter) + "--100ms一次的任务"),100);scheduleService.schedule(job);Thread.sleep(1000);scheduleService.interrupt(job);scheduleService.schedule(() -> System.out.println(LocalDateTime.now().format(dateTimeFormatter) + "--200ms一次的任务"), 200);}
}
是有几十毫秒的误差,这是正常的
-
LockSupport.parkUntil() 的精度限制
-
使用 PriorityBlockingQueue 实现优先级队列带来的额外开销
-
多线程协作带来的额外延迟
-
schedule() --> Trigger.jobQueue.offer() --> LockSupport.unpark() --> Thread 被唤醒 --> poll 并执行任务
-
我们就算使用JDK提供的标准定时任务工具类ScheduledExecutorService
public static void main(String[] args) throws InterruptedException {
// ScheduleService scheduleService = new ScheduleService();
// DateTimeFormatter dateTimeFormatter = DateTimeFormatter.ofPattern("mm:ss SSS");
// ScheduleService.Job job = new ScheduleService.Job(() -> System.out.println(LocalDateTime.now().format(dateTimeFormatter) + "--100ms一次的任务"),100);
// scheduleService.schedule(job);
// Thread.sleep(1000);
// scheduleService.interrupt(job);
// scheduleService.schedule(() -> System.out.println(LocalDateTime.now().format(dateTimeFormatter) + "--200ms一次的任务"), 200);ScheduledExecutorService pool = Executors.newScheduledThreadPool(5);pool.scheduleAtFixedRate(() -> System.out.println(LocalDateTime.now().format(DateTimeFormatter.ofPattern("mm:ss SSS")) + "--100ms一次的任务"), 100, 100, java.util.concurrent.TimeUnit.MILLISECONDS);}