用 Java 实现一个简单的阻塞队列
用 Java 实现一个简单的阻塞队列
这篇文章将分析阻塞队列的实现原理,并给出基于单锁 & 单条件变量的实现方案。
(一)定义接口 MyBlockingQueue
/*** 一个线程安全的阻塞队列接口 支持生产者和消费者模型* 当队列满时 生产者会被阻塞* 当队列空时 消费者会被阻塞** @param <T> 队列中元素的类型*/
public interface MyBlockingQueue<T> {/*** 尝试将指定元素插入队列 如果队列已满 则立即返回插入失败* 此方法通常是非阻塞的** @param t 要添加的元素* @return 如果元素被成功添加到队列中则返回 true 如果队列已满则返回 false* @throws NullPointerException 如果指定元素为 null*/boolean offer(T t);/*** 将指定元素插入队列 如果队列已满 则等待 直到有空间可用* 此方法是阻塞的** @param t 要添加的元素* @throws InterruptedException 如果在等待时被中断* @throws NullPointerException 如果指定元素为 null*/void put(T t) throws InterruptedException;/*** 检索并移除队列的头部元素 如果队列为空 则立即返回 null* 此方法通常是非阻塞的** @return 队列的头部元素 如果队列为空则返回 null*/T poll();/*** 检索并移除队列的头部元素 如果队列为空 则等待 直到有元素可用* 此方法是阻塞的** @return 队列的头部元素* @throws InterruptedException 如果在等待时被中断*/T take() throws InterruptedException;
}
(二)基于单锁 & 单条件变量的阻塞队列实现
在这个实现中,我们使用 Java 内置锁(即同步监视器)作用在所有方法上,并使用与该锁绑定的等待队列。具体工作原理如下:
-
竞争队列与等待队列
- 每一个同步监视器都有一个竞争队列和等待队列,分别用于管理由于竞争锁失败和等待条件而进入阻塞状态的线程
- 同一时刻只能有一个生产者或消费者线程持有锁,独占阻塞队列的使用权
- 其余线程需要进入同步监视器的竞争队列中阻塞,这些线程会在锁被释放后恢复运行态并竞争锁
- 持有锁的线程如果执行了
wait()
方法,说明某个条件不成立,就会释放锁,并进入同步监视器的等待队列中阻塞 - 持有锁的线程如果执行了
notifyAll()
方法,说明某个条件刚刚成立,就会将等待队列中所有等待的线程移至竞争队列,使其有机会重新竞争锁(需要注意这些线程依然处于阻塞态)
-
队列非空且非满时
- 消费者线程可以正常取出元素
- 生产者线程可以正常插入元素
- 所有操作完成后都会调用
notifyAll()
唤醒可能等待的线程
-
队列为空时
- 消费者线程行为
- 调用
poll()
方法:直接返回null
(非阻塞) - 调用
take()
方法:执行wait()
,线程进入同步监视器的等待队列阻塞
- 调用
- 生产者线程行为
- 调用
offer()
或put()
方法:成功插入元素后执行notifyAll()
方法
- 调用
- 消费者线程行为
-
队列为满时
- 生产者线程行为
- 调用
offer()
方法:直接返回false
(非阻塞) - 调用
put()
方法:执行wait()
,线程进入同步监视器的等待队列阻塞
- 调用
- 生产者线程行为
-
消费者线程行为
- 调用
poll()
或take()
方法:成功取出元素后执行notifyAll()
方法
- 调用
-
虚假唤醒问题分析及其解决方法
-
假设同步监视器中竞争队列和等待队列都为空
-
假设阻塞队列为空,消费者 C1 获取锁并调用
take()
方法,它将释放锁并进入阻塞态,并进入等待队列中等待,此时等待队列中有 C1 一个线程 -
紧接着消费者 C2 获取锁并调用
take()
方法,也释放锁并进入阻塞态,并进入等待队列中等待,此时等待队列中有 C1 和 C2 两个线程 -
此时生产者 P 获取锁并插入一个元素,然后执行
notifyAll()
方法,等待队列中 C1 和 C2 被移入竞争队列中,此时竞争队列有两个线程,等待队列为空 -
然后生产者 P 释放锁,此时竞争队列中的 C1 和 C2 争抢锁,假设 C1 抢到锁,C1 取出阻塞队列中的唯一元素并返回,然后释放锁
-
然后 C2 抢到锁,C2 进入阻塞队列中取出元素并发现阻塞队列为空,程序出错
-
-
根据上面的分析,消费者 C2 被虚假唤醒
- 我们需要让消费者消费之前,再次检查队列是否为空,如果为空继续进入等待队列中等待
- 因此,我们必须使用 while 循环块包围条件,而不是 if 条件块
-
public class MyLinkedBlockingQueue<T> implements MyBlockingQueue<T> {private final Queue<T> queue = new LinkedList<>();private final int capacity;public MyLinkedBlockingQueue(int capacity) {this.capacity = capacity;}public synchronized boolean offer(T t) {if (queue.size() >= capacity) {return false;}queue.offer(t);this.notifyAll();return true;}public synchronized void put(T t) throws InterruptedException {while (queue.size() == capacity) {this.wait();}queue.add(t);this.notifyAll();}public synchronized T poll() {T t = queue.poll();if (t != null) {this.notifyAll();}return t;}public synchronized T take() throws InterruptedException {while (queue.isEmpty()) {this.wait();}T t = queue.remove();this.notifyAll();return t;}
}