深入了解阻塞队列的实现
目录
1、介绍
2、特性
2.1、线程安全
2.2、阻塞行为
3、实现方式
3.1、基于数组的实现
3.2、基于链表的实现
4、分类
4.1.ArrayBlockingQueue(有界)
4.2.LinkedBlockingQueue(可选界限)
4.3.PriorityBlockingQueue(无界)
4.4.DelayQueue(无界)
5、线程阻塞和唤醒
5.1. 阻塞状态
5.2. 监视器锁
5.3. 唤醒
前言
阻塞队列是一种特殊的队列,同样遵循“先进先出”的原则,支持入队操作和出队操作。在此基础上,阻塞队列会在队列已满或队列为空时陷入阻塞,使其成为一个线程安全的数据结构。
如下图所示:
1、介绍
阻塞队列(Blocking Queue)是一种支持在队列为空时进行等待获取元素、在队列满时进行等待插入元素的队列数据结构。
阻塞队列的特性使得多线程程序在处理生产者-消费者模型时更加简单有效。
如下图所示:
2、特性
阻塞队列是设计用于多线程环境中的,确保多个线程在添加或删除元素时是安全的。内部实现通常会使用锁(如
ReentrantLock
)来防止并发问题,如数据破损和不一致。
2.1、线程安全
阻塞队列在多线程环境下是安全的,多个线程可以并发地进行插入、删除等操作,而不需要额外的同步措施。
如下图所示:
2.2、阻塞行为
当一个线程试图从空队列中获取元素时,它会被阻塞,直到队列中有元素可用。当一个线程试图向满队列中插入元素时,它也会被阻塞,直到队列有空闲位置。
3、实现方式
根据java的类库,可以看到有以下几种常用的阻塞队列:
由上图可知主要分为有界和无界,数组和链表两种方式来划分。
3.1、基于数组的实现
采用固定大小的数组来存储队列元素,使用两个指针(front 和 rear)来指示队列的头部和尾部。
在创建
ArrayBlockingQueue
时,可以指定一个额外的参数,用于设置公平性策略(即fair
属性)。这个参数控制多个线程对于访问队列的调度方式。
1.fair 参数
ArrayBlockingQueue
的构造函数可以接受一个布尔值 fair
,用于指定是否采用公平策略:
ArrayBlockingQueue(int capacity, boolean fair)
- capacity: 队列的容量(即最大存储元素的数量)。
- fair: 如果为
true
,则采用公平策略;如果为false
,则采用非公平策略。
2.公平与非公平
-
公平(fair):
- 公平策略意味着,对于多个线程加入队列,线程将按照请求的顺序进行访问(即先入先服务)。在多个线程同时等待获取或插入元素时,先请求的线程会得到优先处理。
- 这可以避免线程在竞争时饥饿(starvation)现象的发生。
-
非公平(unfair):
- 非公平策略允许天生的竞争,线程在访问队列时不一定遵循请求的顺序。
- 在高并发的情况下,它可能会提供更好的吞吐量,因为它允许等待的线程在队列释放锁时立即获取锁,而不是按照严格的顺序排队。
尽管这个会造成不公平,但是不影响线程的安全问题。
3.2、基于链表的实现
使用链表来动态存储元素,可以更灵活地处理队列大小。
也需要实现 wait
和 notify
来控制线程的阻塞。
总结
private final ReentrantLock takeLock = new ReentrantLock();/** Wait queue for waiting takes *///队列非空private final Condition notEmpty = takeLock.newCondition();/** Lock held by put, offer, etc */private final ReentrantLock putLock = new ReentrantLock();/** Wait queue for waiting puts *///队列未满private final Condition notFull = putLock.newCondition();
通过结合使用:
//阻塞
notFull.await();
//唤醒
notFull.signal();
两者需要实现 a
wait()
和singal()
机制来处理线程的阻塞和唤醒。
4、分类
4.1.ArrayBlockingQueue(有界)
基于数组的有界阻塞队列,创建时必须指定容量。插入和删除操作都是阻塞的。
内部使用ReentrantLock(可重入锁)( 内部维护一个整数标志 state
来表示当前锁的状态。0 表示锁未被占用,正数表示锁被占用,数值表示当前获得此锁的线程的重入次数)来保证线程安全。
由一个ReentrantLock在
put()
和take()
方法中,ArrayBlockingQueue
使用锁来确保同一时间只有一个线程能够访问和修改队列状态。
public ArrayBlockingQueue(int capacity, boolean fair) {if (capacity <= 0)throw new IllegalArgumentException();this.items = new Object[capacity];lock = new ReentrantLock(fair);notEmpty = lock.newCondition();notFull = lock.newCondition();}
在构造函数的时候,直接加锁,而非对take和put方法进行单独加锁。
具体想了解ReentrantLock,可参考:java常用的锁_java 常用锁-CSDN博客
代码示例:
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;class Producer implements Runnable {private BlockingQueue<Integer> queue;public Producer(BlockingQueue<Integer> queue) {this.queue = queue;}@Overridepublic void run() {try {for (int i = 0; i < 10; i++) {System.out.println("Producing: " + i);queue.put(i); // 将元素放入队列,如果队列满则等待Thread.sleep(100); // 模拟生产延迟}// 生产完成后可以添加一个特殊元素表示结束queue.put(-1); // -1 表示结束信号} catch (InterruptedException e) {Thread.currentThread().interrupt(); // 处理中断}}
}class Consumer implements Runnable {private BlockingQueue<Integer> queue;public Consumer(BlockingQueue<Integer> queue) {this.queue = queue;}@Overridepublic void run() {try {while (true) {int value = queue.take(); // 从队列中获取元素,如果队列空则等待if (value == -1) {break; // 如果获取到结束信号,则退出}System.out.println("Consuming: " + value);Thread.sleep(200); // 模拟消费延迟}} catch (InterruptedException e) {Thread.currentThread().interrupt(); // 处理中断}}
}public class ArrayBlockingQueueExample {public static void main(String[] args) {BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(5); // 创建一个容量为5的阻塞队列Producer producer = new Producer(queue);Consumer consumer = new Consumer(queue);Thread producerThread = new Thread(producer);Thread consumerThread = new Thread(consumer);producerThread.start();consumerThread.start();try {producerThread.join(); // 等待生产者线程完成consumerThread.join(); // 等待消费者线程完成} catch (InterruptedException e) {Thread.currentThread().interrupt(); // 处理中断}System.out.println("Processing finished.");}
}
4.2.LinkedBlockingQueue(可选界限)
-
基于链表的阻塞队列,支持可选的界限,如果没有设定界限,默认的最大容量为 Integer.MAX_VALUE。
-
与 ArrayBlockingQueue 相比,具有更好的潜在性能,因为它支持更高的并发性。
同样内部也使用了ReentrantLock,不过put和toke的时候分别加锁。
private final ReentrantLock putLock = new ReentrantLock();private final ReentrantLock takeLock = new ReentrantLock();
4.3.PriorityBlockingQueue(无界)
默认初始化容量为11。
private static final int DEFAULT_INITIAL_CAPACITY = 11;public PriorityBlockingQueue() {this(DEFAULT_INITIAL_CAPACITY, null);
}
对于无界的增加数据,可参考下面代码示例:
import java.util.concurrent.PriorityBlockingQueue;public class PriorityBlockingQueueDemo {public static void main(String[] args) {PriorityBlockingQueue<Integer> queue = new PriorityBlockingQueue<>();// 添加大量元素,观察扩容情况for (int i = 0; i < 100; i++) {queue.offer(i);System.out.println("Added: " + i);System.out.println("Current Queue Size: " + queue.size());}// 处理优先级队列里的元素while (!queue.isEmpty()) {System.out.println("Processing: " + queue.poll());}}
}
因此:
-
支持优先级的阻塞队列,队列中的元素会根据自然排序或指定 comparator 进行排序。
-
不支持界限,因此可以无限插入元素。
4.4.DelayQueue(无界)
一种支持延迟获取元素的阻塞队列,元素会在设定的延迟时间后才可以被获取。
特别适合用于实现各种基于时间的功能,比如定时任务、缓存过期、任务调度等。
5、线程阻塞和唤醒
在 多线程编程中,使用
wait()
,notify()
, 和notifyAll()
方法是实现线程间通信和协调的一种传统机制。这些方法通常与对象的监视器(monitor)相关联,是实现阻塞和唤醒机制的基础。
5.1. 阻塞状态
当一个线程试图从空队列中读取元素时(例如,调用 take()
方法),如果该队列为空,线程会被阻塞,从而进入等待状态。
同样,当一个线程试图向满队列中添加元素时(例如,调用 put()
方法),如果该队列已满,它也会被阻塞。
5.2. 监视器锁
阻塞队列的实现通常使用一个监视器锁(或称为互斥锁)来保护访问其共享状态(即队列的元素和相关标志)。
当线程在执行
put()
或take()
方法时,它会先获取该锁,进行状态检查,然后决定是否需要阻塞。一旦当前线程被阻塞(例如,通过
wait()
方法),它会释放锁,允许其他线程进入队列的操作。
5.3. 唤醒
notify()
:
唤醒在该对象监视器上等待的一个线程。如果有多个线程在等待,这个方法会随机选择其中一个进行唤醒。
notifyAll()
:
唤醒在该对象监视器上等待的所有线程。这一般用于复杂的条件变量设计中,确保所有可能等待的线程都有机会被唤醒。
总结
在之前的文章介绍数据结构的时候,简单对常用的数据结构进行了介绍,如数组、集合、树、hash、队列:java集合的介绍-CSDN博客
感兴趣的可以了解下哦!