当前位置: 首页 > ops >正文

深入了解阻塞队列的实现

目录

1、介绍

2、特性

2.1、线程安全

2.2、阻塞行为

3、实现方式

3.1、基于数组的实现

3.2、基于链表的实现

4、分类

4.1.ArrayBlockingQueue(有界)

4.2.LinkedBlockingQueue(可选界限)

4.3.PriorityBlockingQueue(无界)

4.4.DelayQueue(无界)

5、线程阻塞和唤醒

5.1. 阻塞状态

5.2. 监视器锁

5.3. 唤醒


前言

        阻塞队列是一种特殊的队列,同样遵循“先进先出”的原则,支持入队操作和出队操作。在此基础上,阻塞队列会在队列已满或队列为空时陷入阻塞,使其成为一个线程安全的数据结构。

如下图所示:


1、介绍

        阻塞队列(Blocking Queue)是一种支持在队列为空时进行等待获取元素在队列满时进行等待插入元素的队列数据结构。

        阻塞队列的特性使得多线程程序在处理生产者-消费者模型时更加简单有效。

如下图所示:


2、特性

        阻塞队列是设计用于多线程环境中的,确保多个线程在添加或删除元素时是安全的。内部实现通常会使用锁(如 ReentrantLock)来防止并发问题,如数据破损和不一致。

2.1、线程安全

        阻塞队列在多线程环境下是安全的,多个线程可以并发地进行插入、删除等操作,而不需要额外的同步措施。

如下图所示:

2.2、阻塞行为

        当一个线程试图从空队列中获取元素时,它会被阻塞,直到队列中有元素可用。当一个线程试图向满队列中插入元素时,它也会被阻塞,直到队列有空闲位置。


3、实现方式

根据java的类库,可以看到有以下几种常用的阻塞队列:

由上图可知主要分为有界和无界,数组和链表两种方式来划分。

3.1、基于数组的实现

        采用固定大小的数组来存储队列元素,使用两个指针(front 和 rear)来指示队列的头部和尾部。

        在创建 ArrayBlockingQueue 时,可以指定一个额外的参数,用于设置公平性策略(即 fair 属性)。这个参数控制多个线程对于访问队列的调度方式。

1.fair 参数

ArrayBlockingQueue 的构造函数可以接受一个布尔值 fair,用于指定是否采用公平策略:

ArrayBlockingQueue(int capacity, boolean fair)
  • capacity: 队列的容量(即最大存储元素的数量)。
  • fair: 如果为 true,则采用公平策略;如果为 false,则采用非公平策略。

2.公平与非公平

  1. 公平(fair)

    • 公平策略意味着,对于多个线程加入队列,线程将按照请求的顺序进行访问(即先入先服务)。在多个线程同时等待获取或插入元素时,先请求的线程会得到优先处理。
    • 这可以避免线程在竞争时饥饿(starvation)现象的发生。
  2. 非公平(unfair)

    • 非公平策略允许天生的竞争,线程在访问队列时不一定遵循请求的顺序。
    • 在高并发的情况下,它可能会提供更好的吞吐量,因为它允许等待的线程在队列释放锁时立即获取锁,而不是按照严格的顺序排队。

        尽管这个会造成不公平,但是不影响线程的安全问题。

3.2、基于链表的实现

        使用链表来动态存储元素,可以更灵活地处理队列大小。

        也需要实现 wait 和 notify 来控制线程的阻塞。

总结

   private final ReentrantLock takeLock = new ReentrantLock();/** Wait queue for waiting takes *///队列非空private final Condition notEmpty = takeLock.newCondition();/** Lock held by put, offer, etc */private final ReentrantLock putLock = new ReentrantLock();/** Wait queue for waiting puts *///队列未满private final Condition notFull = putLock.newCondition();

通过结合使用:

//阻塞 
notFull.await();
//唤醒
notFull.signal();

        两者需要实现 await() 和 singal() 机制来处理线程的阻塞和唤醒。


4、分类

4.1.ArrayBlockingQueue(有界)

        基于数组的有界阻塞队列,创建时必须指定容量。插入和删除操作都是阻塞的。

        内部使用ReentrantLock(可重入锁)( 内部维护一个整数标志 state 来表示当前锁的状态。0 表示锁未被占用,正数表示锁被占用,数值表示当前获得此锁的线程的重入次数)来保证线程安全。

        由一个ReentrantLock在 put() 和 take() 方法中,ArrayBlockingQueue 使用锁来确保同一时间只有一个线程能够访问和修改队列状态。

 public ArrayBlockingQueue(int capacity, boolean fair) {if (capacity <= 0)throw new IllegalArgumentException();this.items = new Object[capacity];lock = new ReentrantLock(fair);notEmpty = lock.newCondition();notFull =  lock.newCondition();}

在构造函数的时候,直接加锁,而非对take和put方法进行单独加锁。

具体想了解ReentrantLock,可参考:java常用的锁_java 常用锁-CSDN博客

代码示例:

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;class Producer implements Runnable {private BlockingQueue<Integer> queue;public Producer(BlockingQueue<Integer> queue) {this.queue = queue;}@Overridepublic void run() {try {for (int i = 0; i < 10; i++) {System.out.println("Producing: " + i);queue.put(i); // 将元素放入队列,如果队列满则等待Thread.sleep(100); // 模拟生产延迟}// 生产完成后可以添加一个特殊元素表示结束queue.put(-1); // -1 表示结束信号} catch (InterruptedException e) {Thread.currentThread().interrupt(); // 处理中断}}
}class Consumer implements Runnable {private BlockingQueue<Integer> queue;public Consumer(BlockingQueue<Integer> queue) {this.queue = queue;}@Overridepublic void run() {try {while (true) {int value = queue.take(); // 从队列中获取元素,如果队列空则等待if (value == -1) {break; // 如果获取到结束信号,则退出}System.out.println("Consuming: " + value);Thread.sleep(200); // 模拟消费延迟}} catch (InterruptedException e) {Thread.currentThread().interrupt(); // 处理中断}}
}public class ArrayBlockingQueueExample {public static void main(String[] args) {BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(5); // 创建一个容量为5的阻塞队列Producer producer = new Producer(queue);Consumer consumer = new Consumer(queue);Thread producerThread = new Thread(producer);Thread consumerThread = new Thread(consumer);producerThread.start();consumerThread.start();try {producerThread.join(); // 等待生产者线程完成consumerThread.join(); // 等待消费者线程完成} catch (InterruptedException e) {Thread.currentThread().interrupt(); // 处理中断}System.out.println("Processing finished.");}
}

4.2.LinkedBlockingQueue(可选界限)

  1. 基于链表的阻塞队列,支持可选的界限,如果没有设定界限,默认的最大容量为 Integer.MAX_VALUE。

  2. 与 ArrayBlockingQueue 相比,具有更好的潜在性能,因为它支持更高的并发性。

同样内部也使用了ReentrantLock,不过put和toke的时候分别加锁。

    private final ReentrantLock putLock = new ReentrantLock();private final ReentrantLock takeLock = new ReentrantLock();

4.3.PriorityBlockingQueue(无界)

默认初始化容量为11。

private static final int DEFAULT_INITIAL_CAPACITY = 11;public PriorityBlockingQueue() {this(DEFAULT_INITIAL_CAPACITY, null);
}

对于无界的增加数据,可参考下面代码示例:

import java.util.concurrent.PriorityBlockingQueue;public class PriorityBlockingQueueDemo {public static void main(String[] args) {PriorityBlockingQueue<Integer> queue = new PriorityBlockingQueue<>();// 添加大量元素,观察扩容情况for (int i = 0; i < 100; i++) {queue.offer(i);System.out.println("Added: " + i);System.out.println("Current Queue Size: " + queue.size());}// 处理优先级队列里的元素while (!queue.isEmpty()) {System.out.println("Processing: " + queue.poll());}}
}

因此:

  1. 支持优先级的阻塞队列,队列中的元素会根据自然排序或指定 comparator 进行排序。

  2. 不支持界限,因此可以无限插入元素。

4.4.DelayQueue(无界)

        一种支持延迟获取元素的阻塞队列,元素会在设定的延迟时间后才可以被获取。

特别适合用于实现各种基于时间的功能,比如定时任务、缓存过期、任务调度等。


5、线程阻塞和唤醒

        在 多线程编程中,使用 wait()notify(), 和 notifyAll() 方法是实现线程间通信和协调的一种传统机制。

        这些方法通常与对象的监视器(monitor)相关联,是实现阻塞和唤醒机制的基础。

5.1. 阻塞状态

        当一个线程试图从空队列中读取元素时(例如,调用 take() 方法),如果该队列为空,线程会被阻塞,从而进入等待状态。

        同样,当一个线程试图向满队列中添加元素时(例如,调用 put() 方法),如果该队列已满,它也会被阻塞。

5.2. 监视器锁

        阻塞队列的实现通常使用一个监视器锁(或称为互斥锁)来保护访问其共享状态(即队列的元素和相关标志)。

        当线程在执行 put() 或 take() 方法时,它会先获取该锁,进行状态检查,然后决定是否需要阻塞。

        一旦当前线程被阻塞(例如,通过 wait() 方法),它会释放锁,允许其他线程进入队列的操作。

5.3. 唤醒

notify():

        唤醒在该对象监视器上等待的一个线程。如果有多个线程在等待,这个方法会随机选择其中一个进行唤醒。

notifyAll():

        唤醒在该对象监视器上等待的所有线程。这一般用于复杂的条件变量设计中,确保所有可能等待的线程都有机会被唤醒。


总结

        在之前的文章介绍数据结构的时候,简单对常用的数据结构进行了介绍,如数组、集合、树、hash、队列:java集合的介绍-CSDN博客

感兴趣的可以了解下哦!

http://www.xdnf.cn/news/4914.html

相关文章:

  • MySQL性能分析工具:SHOW PROCESSLIST
  • Mac电脑远程连接window系统服务器
  • 自定义分区器
  • 车载以太网转USB接口工具选型指南(2025版)
  • C++ stl中的list的相关函数用法
  • 学习黑客搜索技巧
  • Open CASCADE学习|实现裁剪操作
  • keepalived详细笔记
  • 2025 年数维杯数学建模 C 题完整论文代码模型:清明时节雨纷纷,何处踏青不误春
  • .net/C#进程间通信技术方案总结
  • C#黑魔法:鸭子类型(Duck Typing)
  • ChatGPT深度研究功能革新:GitHub直连与强化微调
  • qtcreater配置opencv
  • 对golang中CSP的理解
  • 34.笔记1
  • 【挑战项目】 --- 微服务编程测评系统(在线OJ系统)(二)
  • 多线程面试题总结
  • python 上海新闻爬虫, 上观新闻 + 腾讯新闻
  • C 语言中的 对象(object),值(Value),类型(Type)
  • C++ Lambda表达式应用详解
  • python实现点餐系统
  • MCP专题| 突破LLM三大瓶颈!模型上下文协议(MCP)如何重塑AI交互体验?
  • 高可用系统架构演进史——从单体节点到分布式系统的继承权治理方案
  • 【网安播报】Meta 推出 LlamaFirewall开源框架以阻止 AI 越狱、注入和不安全代码
  • 录播课收入增长四维模型与执行方案
  • 一种安全不泄漏、高效、免费的自动化脚本平台
  • 初识C++:入门基础(二)
  • POSE识别 神经网络
  • STM32--PWM--函数
  • 股票行情实时数据:港股、美股、沪深A股行情数据的具体细分内容介绍在哪里可以获取到便宜的股票实时行情?