当前位置: 首页 > ds >正文

揭开并发编程的面纱:从零开始构建 Java 阻塞队列

并发编程是现代软件开发的基石,它让应用程序能够看似同时执行多个任务,从而提高响应速度和资源利用率。生产者-消费者模式是并发编程中一种非常经典的场景:一个或多个线程(生产者)负责生产数据或任务,而另外一个或多个线程(消费者)负责处理这些数据或任务。要高效、安全地实现这种模式,阻塞队列(Blocking Queue)扮演着至关重要的角色。

虽然 Java 的标准库(java.util.concurrent​)已经提供了非常健壮且高度优化的 BlockingQueue​ 实现(例如 ArrayBlockingQueue​, LinkedBlockingQueue​),但深入理解它们底层的工作原理,对于任何想要掌握并发编程的开发者来说都非常有价值。这能加深我们对同步(Synchronization)、等待(Wait)和通知(Notify)等核心概念的理解。

那么,今天就让我们一起踏上这段旅程,使用 Java 最基础的并发原语,从零开始构建一个简单的阻塞队列吧!

起点:一个基础的循环数组队列

我们阻塞队列的核心将是一个基于固定大小数组的循环缓冲区。我们需要以下几个关键组件:

  1. 一个数组 items​:用于实际存储元素。
  2. 一个索引 head​:指向队列头部,即下一个将被取出的元素的位置。
  3. 一个索引 tail​:指向队列尾部,即下一个新元素将被插入的位置。
  4. 一个计数器 size​:记录队列中当前元素的数量。

在不考虑线程安全的情况下,一个简单的循环队列实现大致如下(伪代码):

// 伪代码 - 非线程安全版本
public class SimpleCircularQueue {private int[] items;private int head = 0;private int tail = 0;private int size = 0;private int capacity;public SimpleCircularQueue(int capacity) {this.items = new int[capacity];this.capacity = capacity;}public void put(int elem) {if (size == capacity) {System.out.println("队列已满,无法添加!");return; // 或者抛出异常}items[tail] = elem;tail = (tail + 1) % capacity; // 循环的关键size++;}public int take() {if (size == 0) {System.out.println("队列为空,无法取出!");return -1; // 或者抛出异常}int elem = items[head];head = (head + 1) % capacity; // 循环的关键size--;return elem;}
}

这个基础版本在单线程环境下工作良好,但一旦有多个线程同时调用 put​ 和 take​,就会出现各种问题(竞态条件),导致数据不一致甚至程序崩溃。

引入线程安全:synchronized​ 关键字

为了让队列能在多线程环境下安全工作,我们需要确保对共享资源(items​ 数组、head​、tail​、size​)的访问是互斥的。Java 提供了 synchronized​ 关键字来实现这一点。我们可以用 synchronized(this)​ 将修改共享状态的关键代码块包裹起来,确保同一时间只有一个线程能进入这个代码块。

public class BlockingQueue {private int[] items;volatile private int head = 0; // 注意 volatilevolatile private int tail = 0; // 注意 volatilevolatile private int size = 0; // 注意 volatileprivate final Object lock = new Object(); // 可以用一个单独的对象锁,或直接用 thispublic BlockingQueue(int capacity){this.items = new int[capacity];}// ... put 和 take 方法将在这里添加 synchronized ...
}

(注:代码片段中提前加入了 volatile​,后面会解释)

实现“阻塞”:wait()​ 与 notify()​

仅仅 synchronized​ 只能保证互斥,但不能解决生产者在队列满时、消费者在队列空时需要等待的问题。这就是“阻塞”队列的核心所在。我们需要利用 Java 对象监视器(Monitor)提供的 wait()​ 和 notify()​ / notifyAll()​ 方法。

  • ​wait()​: 当一个线程调用某个对象(比如 this​ 或 lock​)的 wait()​ 方法时,它会:

    1. 释放该对象上的锁。
    2. 进入该对象的等待集(Wait Set),线程状态变为 WAITING 或 TIMED_WAITING。
    3. 暂停执行,直到其他线程调用该对象的 notify()​ 或 notifyAll()​,或者发生中断、虚假唤醒。
  • ​notify()​: 唤醒一个正在该对象等待集中的线程。被唤醒的线程并不会立即执行,而是需要重新尝试获取该对象的锁,成功后才能从 wait()​ 的地方继续执行。

  • ​notifyAll()​: 唤醒所有正在该对象等待集中的线程。

关键点:while​ 循环检查条件

在使用 wait()​ 时,必须将其放在一个 while​ 循环中来检查等待条件。这有两个原因:

  1. 虚假唤醒 (Spurious Wakeup): 线程可能在没有被 notify()​ 的情况下从 wait()​ 中醒来。如果只用 if​,线程醒来后不会重新检查条件,可能导致错误。
  2. 多个等待者: 如果有多个生产者(或消费者)在等待,notify()​ 只唤醒一个。如果使用 notifyAll()​,所有等待者都会被唤醒,它们都需要重新检查条件,只有满足条件的才能继续执行。

改造 put​ 方法 (入队):

    public void put(int elem) throws InterruptedException {synchronized (this){ // 获取锁// 使用 while 循环检查条件while(size >= items.length){// 队列满了,释放锁并等待System.out.println("队列已满,生产者 " + Thread.currentThread().getName() + " 进入等待...");this.wait(); // 释放锁,进入等待状态System.out.println("生产者 " + Thread.currentThread().getName() + " 被唤醒...");}// 条件满足 (队列未满),执行入队操作if(tail >= items.length ){ // 处理循环tail = 0;}items[tail] = elem;tail++;size++;System.out.println("生产元素: " + elem + ", size=" + size);// 成功入队,通知可能在等待的消费者this.notify(); // 唤醒一个等待的线程 (可能是消费者)} // 释放锁}

改造 take​ 方法 (出队):

    public int take() throws InterruptedException {synchronized (this){ // 获取锁// 使用 while 循环检查条件while (size == 0){// 队列为空,释放锁并等待System.out.println("队列为空,消费者 " + Thread.currentThread().getName() + " 进入等待...");this.wait(); // 释放锁,进入等待状态System.out.println("消费者 " + Thread.currentThread().getName() + " 被唤醒...");}// 条件满足 (队列非空),执行出队操作if (head >= items.length){ // 处理循环head = 0;}int elem = items[head];head++;size--;System.out.println("消费元素: " + elem + ", size=" + size);// 成功出队,通知可能在等待的生产者this.notify(); // 唤醒一个等待的线程 (可能是生产者)return elem;} // 释放锁}

​volatile​ 的作用:内存可见性

我们给 head​, tail​, size​ 加上了 volatile​ 关键字。它的主要作用是保证内存可见性和禁止指令重排。

  • 可见性: 当一个线程修改了 volatile​ 变量的值,这个新值对其他线程来说是立即可见的。这可以防止线程读取到过期的值。
  • 禁止指令重排: 编译器和处理器为了优化性能可能会改变代码的执行顺序,volatile​ 可以阻止对相关变量的读写操作进行重排。

在这个特定实现中 volatile​ 是否必需?

严格来说,因为所有对 head​, tail​, size​ 的读写操作都已经被包含在 synchronized​ 块内部,synchronized​ 本身就提供了比 volatile​ 更强的保证(原子性 + 可见性)。根据 Java 内存模型(JMM)的 happens-before 原则,synchronized​ 块的释放(unlock)happens-before 于后续对同一个锁的获取(lock)。这意味着前一个线程在 synchronized​ 块内做的所有修改,对于下一个获取到锁的线程来说都是可见的。

因此,在这个特定的实现里,volatile​ 对 head​, tail​, size​ 来说不是绝对必要的。但是,加上 volatile​ 体现了对多线程共享变量可见性问题的意识。如果这些变量在 synchronized​ 块之外被读取(例如,你添加了一个没有同步的 getSize()​ 方法),那么 volatile​ 就变得至关重要了。

完整的示例代码与测试

下面是整合了以上所有概念的完整 BlockingQueue​ 代码,以及一个简单的生产者-消费者测试:

import java.util.concurrent.TimeUnit;public class BlockingQueue {private int[] items;volatile private int head = 0;volatile private int tail = 0;volatile private int size = 0;private final int capacity; // 容量最好是 final 的public BlockingQueue(int capacity){if (capacity <= 0) {throw new IllegalArgumentException("Capacity must be positive");}this.items = new int[capacity];this.capacity = capacity;}/*** 手撕阻塞队列——入队* @param elem* @throws InterruptedException*/public void put(int elem) throws InterruptedException {synchronized (this){while(size >= capacity){ // 使用 capacity// 队列满了,采取等待策略System.out.println("队列已满,生产者 " + Thread.currentThread().getName() + " 进入等待...");this.wait();System.out.println("生产者 " + Thread.currentThread().getName() + " 被唤醒...");}// 队列没满:添加元素 (已处理循环逻辑,tail<capacity 恒成立)items[tail] = elem;tail = (tail + 1) % capacity; // 显式使用取模更清晰size++;System.out.println(Thread.currentThread().getName() + " 生产元素: " + elem + ", 当前大小: " + size);// 成功入队,唤醒可能在等待的消费者this.notify(); // 唤醒一个即可}}/*** 出队列* @return* @throws InterruptedException*/public int take() throws InterruptedException {synchronized (this){while (size == 0){//队列空System.out.println("队列为空,消费者 " + Thread.currentThread().getName() + " 进入等待...");this.wait();System.out.println("消费者 " + Thread.currentThread().getName() + " 被唤醒...");}// 队列非空 (已处理循环逻辑,head<capacity 恒成立)int elem = items[head];head = (head + 1) % capacity; // 显式使用取模更清晰size--;System.out.println(Thread.currentThread().getName() + " 消费元素: " + elem + ", 当前大小: " + size);// 成功出队,唤醒可能在等待的生产者this.notify(); // 唤醒一个即可return elem;}}// 可选:获取当前大小的方法 (也需要同步)public int getSize() {synchronized (this) {return size;}}public static void main(String[] args) {BlockingQueue queue = new BlockingQueue(5); // 容量设为 5//生产者线程Thread producer = new Thread(()->{int count = 0;while (true){try {queue.put(count);count++;// 稍微减慢生产速度,方便观察TimeUnit.MILLISECONDS.sleep(500);} catch (InterruptedException e) {Thread.currentThread().interrupt(); // 正确处理中断System.out.println("生产者被中断");break;}}}, "生产者-1");//消费者线程Thread consumer = new Thread(()->{while (true){try {int elem = queue.take();// 稍微减慢消费速度TimeUnit.MILLISECONDS.sleep(1000);} catch (InterruptedException e) {Thread.currentThread().interrupt(); // 正确处理中断System.out.println("消费者被中断");break;}}}, "消费者-1");producer.start();consumer.start();// (可选) 让主线程等待一段时间后停止生产者和消费者try {TimeUnit.SECONDS.sleep(10);producer.interrupt();consumer.interrupt();producer.join();consumer.join();System.out.println("程序结束,最终队列大小: " + queue.getSize());} catch (InterruptedException e) {Thread.currentThread().interrupt();}}
}

运行 main​ 方法,你将看到生产者和消费者线程交替执行,并在队列满或空时正确地等待和唤醒。

讨论与改进空间

我们自制的 BlockingQueue​ 是一个很棒的学习工具,但与 java.util.concurrent​ 包中的实现相比,它还有一些局限和可以思考的地方:

  1. ​notify()​ vs notifyAll()​:

    • 我们使用了 notify()​。在只有一个生产者和一个消费者的情况下,这通常没问题。

    • 但如果有多个生产者和多个消费者,notify()​ 可能导致信号丢失或死锁。例如,一个生产者 notify()​ 之后,唤醒的可能是另一个正在等待的生产者(因为它也需要检查 size < capacity​),而不是等待队列非空的消费者。同样,消费者 notify()​ 可能唤醒另一个消费者。

      • 信号丢失详解
    • 在这种多对多场景下,使用 notifyAll()​ 通常更安全。它会唤醒所有等待的线程,让它们各自重新检查条件 (while​ 循环的重要性再次体现)。虽然效率可能略低(唤醒了不该唤醒的线程),但它避免了死锁的风险。

  2. 公平性 (Fairness):

    • 我们的实现是非公平的。当调用 notify()​ 或 notifyAll()​ 时,哪个等待的线程被唤醒并能获取到锁是随机的(取决于 JVM 的线程调度)。
    • ​java.util.concurrent.ArrayBlockingQueue​ 构造函数可以接受一个 fair​ 参数,设置为 true​ 时,会倾向于唤醒等待时间最长的线程,实现公平性,但这通常会带来性能开销。
  3. 性能:

    • ​java.util.concurrent​ 包下的类通常使用更底层的 Lock​ 和 Condition​ 接口(例如 ReentrantLock​ 及其 newCondition()​ 方法)。
    • ​Condition​ 接口允许我们为“队列满”(生产者等待)和“队列空”(消费者等待)创建不同的条件变量 (Condition Object)。这样,生产者调用 conditionFull.await()​,消费者调用 conditionEmpty.await()​。生产者入队后只需调用 conditionEmpty.signal()​ (或 signalAll()​),精确唤醒等待非空的消费者;消费者出队后只需调用 conditionFull.signal()​ (或 signalAll()​),精确唤醒等待空间的生产者。这比使用 this.notifyAll()​ 唤醒所有线程(包括同类等待者)通常更高效。
    • 此外,标准库的实现还可能利用了 CAS(Compare-and-Swap)等更高级的无锁或低锁技术来进一步优化性能。
  4. 中断处理:

    • 我们的代码捕获了 InterruptedException​,并在 catch​ 块中重新设置了中断标志 (Thread.currentThread().interrupt()​) 并退出循环,这是处理中断的良好实践。阻塞方法(如 wait()​, sleep()​, join()​, 以及 BlockingQueue​ 的 put​/take​)都应该响应中断。

结论

通过亲手实现一个简单的 BlockingQueue​,我们不仅实践了 Java 并发编程的基础知识,如 synchronized​, wait()​, notify()​, volatile​,还更深刻地理解了阻塞队列的工作机制、线程间的协作方式以及潜在的并发问题(如虚假唤醒、信号丢失)。

尽管这个手写版本对于学习非常有益,但在实际的生产项目中,强烈推荐优先使用 java.util.concurrent​ 包提供的成熟实现。它们功能更完善、性能经过深度优化,并且经历了广泛的测试,能为我们构建稳定、高效的并发应用程序提供坚实的保障。

http://www.xdnf.cn/news/3642.html

相关文章:

  • 【AI提示词】系统分析员
  • Redis怎么避免热点数据问题
  • 软件第三方测试:关键部分、意义、流程及方法全解析?
  • 轻量级在线Excel预览工具
  • PyTorch、Flash-Attn、Transformers与Triton技术全景解析+环境包
  • 第 13 届蓝桥杯 C++ 青少组省赛中 / 高级组 2022 年真题
  • Python全流程开发实战:基于IMAP协议安全下载个人Gmail邮箱内所有PDF附件
  • SQL语句练习 自学SQL网 在查询中使用表达式 统计
  • 组件通信-mitt
  • 数据结构之哈夫曼树
  • 【Hive入门】Hive性能调优之Join优化:深入解析MapJoin与Sort-Merge Join策略
  • 安装深度环境anaconda+cuda+cudnn+pycharm+qt+MVS
  • python 桌面程序开发简述及示例
  • 玩转Docker(一):基本概念
  • 觅知解析计费系统重构版在线支付卡密充值多解析接口免授权无后门源码扶风二开
  • Git 完整教程:初学者分步指南
  • 网工_IP协议
  • 前端面经-VUE3篇--vue3基础知识(一)插值表达式、ref、reactive
  • 2000-2020年全国各地级市资本存量测算数据(以2000年为基期)(含原始数据+计算过程+结果)
  • ASP.NET MVC​ 入门与提高指南七
  • 性能测试工具篇
  • 龙虎榜——20250430
  • 雅思写作--70个高频表达
  • CloudCompare中CCCoreLib模块内容
  • 数字智慧方案5981丨智慧农业解决方案(55页PPT)(文末有下载方式)
  • 机箱结构的EMC设计
  • 数字智慧方案6157丨智慧医疗建设方案(85页PPT)(文末有下载方式)
  • 协议(消息)配置
  • 【数据结构与算法】位图 布隆过滤器 海量数据问题处理 哈希切分
  • AdaBoost算法详解:原理、实现与应用指南