11. 多线程(8) --- 案例:阻塞式队列
文章目录
- 前言
- 1. 阻塞队列是什么
- 2.生产者消费者模型
- 3. 标准库中的阻塞队列
- 4. 生产者-消费者模型
- 5. 构建出一个阻塞队列
前言
上一个博客中,我们讲解了单例模式中的多线程安全的写法,这次我们学习阻塞队列。
1. 阻塞队列是什么
我们在数据结构中已经学习过两种队列,一个是队列,另一个是优先级队列。
而阻塞队列,是一种更加复杂的队列,需要去满足一下特征:
- 满足线程安全。
- 满足阻塞的特性
a. 队列为空,在尝试出队列,出队列操作就会阻塞,阻塞到其他线程向队列添加元素为止。
b. 队列为慢,在尝试入队列,入队列操作也会阻塞,阻塞到其他线程取走元素为止。
阻塞队列的一个典型应用场景就是 “生产者消费者模型”。这是一种非常典型的开发模型。
2.生产者消费者模型
生产者消费者模型就是通过一个容器来解决生产者和消费者钱耦合的问题。
生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯,所以生产者生成完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取。
例如:
A B C 三个一起包饺子。可以有两种情况:
- A B C,三个人每个人都分别擀个饺子皮,然后包一个饺子。
此时我们发现,三个人同时会竞争同一个资源 — 擀面杖。并且这种方法效率低下。 - A负责擀饺子皮,B和C负责包饺子。
此时资源 —> 饺子皮,A —> 生产者,BC --> 消费者,盖帘—>交易场所。
此时这里的交易场所,就是一个阻塞队列
阻塞产生的原因,是"极端情况"下生产者和消费者之间速度不协调的时候。
优点:
- 阻塞队列可以让生产者和消费者之间 解耦。
有两个服务器A和B,两个服务器可以直接进行请求和响应操作。
现在是 A 直接访问B,这样 A 和 B的耦合就很高。
编写 A 的代码的时候,多多少少会有一些和B相关的逻辑,同理编写 B 代码的时候,也会有一些 A 相关的逻辑。
A 或者 B 的内容稍加修改,此时对应的B或者A也会随之进行更改
现在在A和B服务器中间,添加一个队列。
A和队列交互,B和队列交互,这样A 和 B 不再直接进行交互。
A的代码中看不到B,B的代码中也看不到A,A的代码中和B的代码只能看到各自跟队列交互的部分。
本来是A和B耦合,现在变成了A和队列耦合,B和队列耦合。
降低耦合,是为了让后续修改的时候,成本降低,队列一般不修改。
因为阻塞队列太重要了,甚至会把队列单独部署到一个服务器中,此时的阻塞队列称为"消息队列",消息队列的服务器,里面可不止是一个队列,可以有N个队列。
- 阻塞队列相当于一个缓冲区,平衡了生产者和消费者的处理能力。
两个服务器进行请求和响应的时候,传递的的频率是一个波形,你可以想象成正弦函数,但是么只是想象,有助于理解。波形中有波峰和波谷。当A服务器遇到了一定流量的激增,那么A的每个请求都转发到B服务器,B也会承担一样的压力,很容易就挂了。
为什么服务器会挂呢?
服务器处理每个请求的时候,都是需要消耗一定的硬件资源的,包括但不局限于 CPU,内存,硬盘,网络带宽等,同时有N个请求呢?消耗量 * n,一旦消耗的总量,超出了机器硬件资源的上限,此时,对应的进程就可能会崩溃或者操作系统产生卡顿,就挂了。
所以说,A这种上游的服务器,尤其是入口服务器,干的话少、简单,单个请求消耗的资源数少,像B这种下游的服务器,通常承担更重的任务量,复杂的计算 / 存储工作,单个请求的资源数就更多了,日常工作中,确实是会给B这样角色的服务器分配更好的机器,即使如此,也能很难确保B承担的访问量能够比A更高。
因此,我们可以在A和B服务器中间添加一个队列服务器,只做存储和转发,这样B服务器就可以不关系队列中的数据量多少,就按照自己的节奏,慢慢处理队列中的请求数据即可,这样就可以做到达到峰值,B可以继续消费数据,利用波谷的时间,就赶紧消费之前挤压的数据。
任何事物都是有双面性的,生产者 - 消费者模型也会付出代价:
(1) 引入队列之后,整体的结构会更复杂。此时就需要更多的机器,进行部署。生产环境的结构会更加复杂,管理起来更加麻烦
(2) 效率也会受到影响
3. 标准库中的阻塞队列
在Java标准库中内置了阻塞队列。如果我们需要在一些程序中使用阻塞队列,直接使用标准库中的即可。
- BlockingQueue 是一个接口。真正实现的类是 LinkedBlockingQueue.
- put 方法用于阻塞式的入队列,take 用于阻塞式的出队列
- BlockingQueue 也有 offer, poll, peek 等方法,但是这些方法不带有阻塞特性。
public class Demo37 {public static void main(String[] args) throws InterruptedException {// 只进行拿出操作BlockingDeque<String> queue = new LinkedBlockingDeque<>();String elem = queue.take();System.out.println(elem);}
}
main线程现在一直等待着元素进入队列,会一直阻塞等待。我们打开jconsole.exe程序,观察效果。
如果使用 peek 或者 poll则没有这个效果。
public class Demo37 {public static void main(String[] args) throws InterruptedException {// 只进行拿出操作BlockingDeque<String> queue = new LinkedBlockingDeque<>();String elem = queue.peek();System.out.println(elem);}
}
如果只有进队列操作,就不会出现阻塞问题。
public class Demo37 {public static void main(String[] args) throws InterruptedException {BlockingDeque<Integer> queue = new LinkedBlockingDeque<>();for (int i = 0;i<100;i++){queue.put(10);}System.out.println("向队列中填充 100个10");queue.put(10);System.out.println("在填充 1 个 10");}
}
这个就是BlockingQueue的基本使用。
4. 生产者-消费者模型
我们根据1,2的思路和3的代码,使用BlockingQueue来进行编写生产者-消费者模型。
public class Demo38 {public static void main(String[] args) {BlockingDeque<Integer> queue = new LinkedBlockingDeque<>();// 生产者Thread producer = new Thread(()->{int n = 0;while (true){try {queue.put(n);System.out.println("生产元素: "+n);n++;Thread.sleep(1);} catch (InterruptedException e) {throw new RuntimeException(e);}}},"producer");// 消费者Thread customer = new Thread(()->{while (true){try {int n = queue.take();System.out.println("消费元素:"+n);Thread.sleep(10);} catch (InterruptedException e) {throw new RuntimeException(e);}}},"customer");producer.start();customer.start();}
}
5. 构建出一个阻塞队列
- 通过 “循环队列” 的方式来实现
- 使用 synchronized 进行加锁控制
- put 插入元素的时候,判定如果队列满了,就进行 wait. (注意,要在循环中进行wait,被唤醒时不一定队列就不满了,因为同时可能是唤醒对个线程)
- take 取出元素的时候,判定如果队列为空,就进行 wait (也是循环wait)
import java.util.concurrent.BlockingDeque;
import java.util.concurrent.LinkedBlockingDeque;/*** @Author: XXHH* @CreateTime: 2025-04-27*/
class MyBlockingQueue{private String[] data;// 用来记录当前队列中的元素个数private int size;private int head;private int tail;public MyBlockingQueue(int capacity){data = new String[capacity];}// 入队列public void put(String elem) throws InterruptedException {synchronized (this){while (size > data.length){this.wait();}data[tail] = elem;tail++;if (tail == data.length) tail = 0;size++;this.notify();}}// 出队列public String take(){// 如果队列为0,那么就阻塞synchronized (this){while (size == 0) {try {this.wait();} catch (InterruptedException e) {throw new RuntimeException(e);}}String elem = data[head];head++;if (head == data.length) head = 0;size--;this.notify();return elem;}}
}public class Demo39 {public static void main(String[] args) {MyBlockingQueue queue = new MyBlockingQueue(20);// 生产者Thread producer = new Thread(()->{int n = 0;while (true){try {queue.put(n+" ");System.out.println("生产元素: "+n);n++;} catch (InterruptedException e) {throw new RuntimeException(e);}}},"producer");// 消费者Thread customer = new Thread(()->{while (true){String n = queue.take();System.out.println("消费元素"+n);}},"customer");producer.start();customer.start();}
}
我们在讲解队列的时候,其中循环队列我们使用的是
rear = (rear + 1) % elem.length;
按照上面的写法是,
head = (head + 1) % data.length;
下面这两种写法有什么不同呢?
if (head == data.length) head = 0; 和
head = (head + 1) % data.length;
一个是代码的可读性,另外一个就是效率问题。
第一个,写成if条件语句,毋庸置疑,就知道当 head 或者 tail 数组越界,直接回到起点;
第二个,% 是 求余数,就要先进行除法,然后在得到余数,加法和减法都是可以通过位运算符组合得到,但是乘法和除法可不是(除非是针对2的N次方进行乘除,可以优化为移位运算),所以它的效率会比 if 判断要低。
因此我们在写代码,需要关注两点:
- 开发效率是否有影响。包括但不局限:是否容易理解,是否容易修改。
- 运行效率:代码是否高效。但是Java程序员并不关注这些。
完