阻塞队列BlockingQueue解析
阻塞队列是一个支持两个附加操作的队列。这两个附加的操作支持阻塞的插入和移除的方法。
阻塞插入:当队列满的时候,队列会阻塞插入元素的线程,直到队列不满。
阻塞移除:当队列空的时候,队列会阻塞移除元素的线程,直到队列不满。
对于插入和移除的操作则是提供了四种处理方式
抛出异常:当队列满时候插入元素的时候则会直接抛出异常,为空时取出元素也会抛出元素。
返回特殊值:这种方式不会阻塞线程而是直接返回boolean类型的值表示是否插入成功或者移除成功
一直阻塞:插入或者移除的操作不满足条件的时候会进行阻塞放入Condition的队列中直到满足条件后被唤醒
超时退出:进行阻塞是含有时间限制的,超出时间之后线程则会退出方法,返回boolean类型的值
如果采用的是无界队列那么进行使用put或者offer方法的时候永远不会阻塞,并且使用offer方法时候永远返回true
JDK 7 提供了 7 个阻塞队列,如下。
- ArrayBlockingQueue:一个由数组结构组成的有界阻塞队列。
- LinkedBlockingQueue:一个由链表结构组成的有界阻塞队列。
- PriorityBlockingQueue:一个支持优先级排序的无界阻塞队列。
- DelayQueue:一个使用优先级队列实现的无界阻塞队列。
- SynchronousQueue:一个不存储元素的阻塞队列。
- LinkedTransferQueue:一个由链表结构组成的无界阻塞队列。
ArrayBlockingQueue是一个由数组实现的有界阻塞队列,此队列按照先进先出的原则对元素进行排序。默认情况下不保证线程公平的访问队列,所谓公平访问队列是指阻塞的线程考研找阻塞的先后顺序访问队列,即先阻塞线程先访问队列。非公平则是当队列可用的时候所有的阻塞线程都会抢夺访问队列的资格。一般公平性通常会降低吞吐量。
public ArrayBlockingQueue(int capacity, boolean fair) {if (capacity <= 0)throw new IllegalArgumentException();this.items = new Object[capacity];lock = new ReentrantLock(fair);notEmpty = lock.newCondition();notFull = lock.newCondition();}
初始化ArrayBlockingQueue的时候第一个参数是容量第二个参数则是是否公平访问。
LinkedBlockingQueue是一个用链表实现的有界阻塞队列。此队列的默认和最大长度为Integer.MAX_VALUE。此队列按照先进先出的原则对元素进行排序
PriorityBlockingQueue则是一个支持优先级的无界阻塞队列。默认情况下元素采取自然顺序升序排序,也可以指定Comparator参数来实现对元素的排序,或者重写类对象的compareTo方法来进行排序。需要注意的是不能保证同优先级元素的顺序。
DelayQueue是一个支持延迟获取元素的无界阻塞队列。队列使用的是PriorityQueue来实现的队列中的元素必须实现了Delayed接口,在创建元素的时候可以指定多久才能从队列中获取当前的元素。只有延迟期满之后才能从队列中提取元素。DelayQueue使用的场景:缓存设置,使用一个线程循环查询DelayQueue如果获取到元素则表明这个元素的缓存有效期到期了,定时任务调度,当获取到元素的时候表示当前元素已经经历了指定时间的等待。
SynchronousQueue则是一个不存储元素的阻塞队列,队列中的每个put操作必须等待一个take操作才会执行否则不会执行,支持公平访问队列,默认情况下为非公平策略的访问。也就是说SynchronousQueue实际上没有队列而是直接将生产者的产品直接传递给消费者。
LinkedTransferQueue是一个无界的由链表组成的阻塞队列,相比于其他阻塞队列多了一个transfer和tryTransfer的方法来执行,这个方法的特点和SynchronousQueue类似而是将生产者或者消费者的数据直接传递给对方而不存储到队列中去,同时如果没有对应的线程接收数据就会进入等待。而tryTransfer则是不会等待直接返回boolean的结果
源码原理分析:
下面我们就根据源码来进行原理分析:
先从两个核心方法put和take开始逐步解析
public void put(E var1) throws InterruptedException {checkNotNull(var1);ReentrantLock var2 = this.lock;var2.lockInterruptibly();try {while(this.count == this.items.length) {this.notFull.await();}this.enqueue(var1);} finally {var2.unlock();}}public E take() throws InterruptedException {final ReentrantLock lock = this.lock;lock.lockInterruptibly();try {while (count == 0)notEmpty.await();return dequeue();} finally {lock.unlock();}}
在讲述put方法之前我们首先来看看ArrayBlockingQueue的内部属性
final Object[] items;int takeIndex;int putIndex;int count;final ReentrantLock lock;private final Condition notEmpty;private final Condition notFull;
其中items这个属性则是用来存储元素的,takeIndex
:消费者指针,标记队头位置(下一个取出的元素)。putIndex
:生产者指针,标记队尾位置(下一个插入的位置),count则是用来表示整个队列的资源数界限的。lock的目的很明显就是为了保证多线程调用时候的同步并且后续的两个Condition队列也是依赖lock来进行的为其提供了保障,同时也能证明阻塞队列是一个线程安全的队列。两个Condtion队列分别表示当前队列为空的时和队列满时候的等待队列。
然后我们开始分析put方法首先是调用了checkNotNull来检查传递的资源是否为空,为空则直接抛出异常。随后取出lock锁设置同步保障,然后通过while循环进行判断当前队列是否已经满了,如果满了就直接调用等待队列notFull来进行线程的阻塞处理(当线程被唤醒之后则会继续执行后续的方法),为什么设置while循环呢?确保条件真正满足:避免虚假唤醒和竞争条件导致的状态不一致。随后就调用enqueue来讲元素进行入队设置,更改索引以及队列数组等属性
最后我们再看看take方法,几乎与put方法差不多都是先获取锁然后判断当前队列条件是否满足不满足则会调用等待队列来讲线程进行阻塞。区别在于一个是将元素放到队列中去,另一个则是将元素进行取出。这就是阻塞队列的两个核心方法。
总结一下:
维度 | 实现机制 | 解决的问题 |
---|---|---|
线程安全 | ReentrantLock + 临界区操作 | 防止多线程数据竞争 |
阻塞控制 | Condition.await() /signal() | 精准线程挂起/唤醒 |
空间效率 | 循环指针(takeIndex /putIndex ) | O(1)复杂度避免数据迁移 |
状态管理 | count 原子计数器 | 快速判断空/满状态 |
资源协调 | 生产者-消费者条件分离(notFull /notEmpty ) | 解耦生产消费速率差异 |
阻塞队列的本质其实是依赖于Condition的同步队列工具来实现对线程的阻塞的,而Condtion又依赖于ReentrantLock的锁机制来实现线程的入队出队,这两个同步工具的底层依赖则都是AQS。AQS提供了这些操作的底层实现逻辑并且搭配LockSupport内的Unsafe本地方法来实现线程控制的精准控制,这些底层工具类一层层的堆叠从而完成了阻塞队列的阻塞功能。