阻塞式队列
一、什么是阻塞队列?
阻塞队列(Blocking Queue)是一种特殊的队列,支持以下两种附加操作:
- 阻塞插入:当队列满时,插入元素的线程会被阻塞,直到队列有空位。
- 阻塞移除:当队列空时,移除元素的线程会被阻塞,直到队列有新元素。
它通过内置的等待/唤醒机制,自动管理线程的阻塞和唤醒,极大简化了多线程协作的复杂度。
二、阻塞队列的用途
生产者-消费者模型
生产者消费者模式就是通过一个容器来解决生产者和消费者的强耦合问题。
生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等 待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取.
1) 阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力.
比如在 "秒杀" 场景下, 服务器同一时刻可能会收到大量的支付请求. 如果直接处理这些支付请求, 服务器可能扛不住(每个支付请求的处理都需要比较复杂的流程). 这个时候就可以把这些请求都放 到一个阻塞队列中, 然后再由消费者线程慢慢的来处理每个支付请求. 这样做可以有效进行 "削峰", 防止服务器被突然到来的一波请求直接冲垮.
2) 阻塞队列也能使生产者和消费者之间 解耦.
比如过年一家人一起包饺子. 一般都是有明确分工, 比如一个人负责擀饺子皮, 其他人负责包. 擀饺 子皮的人就是 "生产者", 包饺子的人就是 "消费者". 擀饺子皮的人不关心包饺子的人是谁(能包就行, 无论是手工包, 借助工具, 还是机器包), 包饺子的人 也不关心擀饺子皮的人是谁(有饺子皮就行, 无论是用擀面杖擀的, 还是拿罐头瓶擀, 还是直接从超 市买的).
线程池任务管理
线程池使用阻塞队列保存待执行任务,当任务满时拒绝新任务,空时工作线程等待。
流量削峰与缓冲
在突发流量场景下,队列作为缓冲区平衡生产者和消费者的处理速度差异。
三、Java标准库中的阻塞队列
Java在java.util.concurrent
包中提供了多种阻塞队列实现:
类名 | 特点 |
---|---|
ArrayBlockingQueue | 有界队列,基于数组实现,固定容量。 |
LinkedBlockingQueue | 可选有界或无界,基于链表实现,吞吐量较高。 |
PriorityBlockingQueue | 支持优先级排序的无界队列。 |
SynchronousQueue | 不存储元素,每个插入操作必须等待一个移除操作。 |
常用方法
put(E e)
: 插入元素,队列满时阻塞。take()
: 移除并返回元素,队列空时阻塞。offer(E e, long timeout, TimeUnit unit)
: 插入元素,超时后放弃。poll(long timeout, TimeUnit unit)
: 移除元素,超时后放弃。
示例代码
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;public class BlockingQueueDemo {public static void main(String[] args) {BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(10); // 容量10// 生产者线程new Thread(() -> {try {for (int i = 0; i < 20; i++) {queue.put(i); // 队列满时自动阻塞System.out.println("Produce: " + i);}} catch (InterruptedException e) {e.printStackTrace();}}).start();// 消费者线程new Thread(() -> {try {for (int i = 0; i < 20; i++) {Thread.sleep(1000);int num = queue.take(); // 队列空时自动阻塞System.out.println("Consume: " + num);}} catch (InterruptedException e) {e.printStackTrace();}}).start();}
}
四、阻塞队列实现
- 通过 "循环队列" 的方式来实现.
- 使用 synchronized 进行加锁控制.
- put 插入元素的时候, 判定如果队列满了, 就进行 wait. (注意, 要在循环中进行 wait. 被唤醒时不一 定队列就不满了, 因为同时可能是唤醒了多个线程).
- take 取出元素的时候, 判定如果队列为空, 就进行 wait. (也是循环 wait)
import java.util.Random;/*** 实现一个简单的阻塞队列*/
public class BlockQueueF {// 存储队列元素的数组(容量固定)private final int[] items = new int[1000]; private int size = 0; // 当前队列元素数量private int head = 0; // 队头指针(下一个要取的元素位置)private int tail = 0; // 队尾指针(下一个要放的元素位置)/*** 入队操作(生产者调用)* @param value 要添加的元素*/public void put(int value) throws InterruptedException {synchronized (this) { // 同步代码块保证线程安全// 当队列满时阻塞等待(使用while防止虚假唤醒)while (size == items.length) { // 循环检查队列是否已满this.wait(); // 释放锁并等待}items[tail] = value; // 将元素放入队尾tail = (tail + 1) % items.length; // 环形队列指针移动size++; // 更新队列大小this.notifyAll(); // 唤醒可能阻塞的消费者线程}}/*** 出队操作(消费者调用)* @return 取出的元素*/public int take() throws InterruptedException {int result;synchronized (this) {// 当队列空时阻塞等待while (size == 0) { // 循环检查队列是否为空this.wait();}result = items[head]; // 取出队头元素head = (head + 1) % items.length; // 环形队列指针移动size--; // 更新队列大小this.notifyAll(); // 唤醒可能阻塞的生产者线程}return result;}/*** 获取队列当前元素数量*/public synchronized int size() { // 添加方法同步return size;}/*** 测试代码:生产者-消费者模型演示*/public static void main(String[] args) throws InterruptedException {BlockQueueF blockingQueue = new BlockQueueF();// 消费者线程(持续取出数据)Thread customer = new Thread(() -> {while (true) {try {int value = blockingQueue.take(); // 阻塞获取数据System.out.println("消费: " + value);} catch (InterruptedException e) {e.printStackTrace();}}}, "消费者线程");// 生产者线程(持续生成随机数)Thread producer = new Thread(() -> {Random random = new Random();while (true) {try {int num = random.nextInt(10000);blockingQueue.put(num); // 阻塞插入数据System.out.println("生产: " + num);Thread.sleep(500); // 模拟生产耗时} catch (InterruptedException e) {e.printStackTrace();}}}, "生产者线程");// 启动线程producer.start();customer.start();// 等待线程结束(实际会无限运行)producer.join();customer.join();}
}
关键注释说明
同步控制
synchronized (this) { ... } // 保证原子性操作
使用内置锁确保多线程操作的原子性。
等待/唤醒机制
while (size == items.length) { this.wait(); // 队列满时阻塞生产者
}
this.notifyAll(); // 数据变化后唤醒等待线程
while
代替if
防止虚假唤醒(Spurious Wakeup)notifyAll()
同时唤醒生产者和消费者
五、总结
阻塞队列是多线程编程中的核心工具,它通过封装复杂的同步逻辑,让开发者更专注于业务实现。无论是使用Java标准库的实现,还是手动编写简易版本,其核心都是通过线程安全和等待唤醒机制协调不同线程的工作节奏。理解阻塞队列的设计思想,能帮助开发者更好地应对高并发场景下的资源共享问题。