【Java ee初阶】多线程(6)
一、阻塞队列
队列的原则:“先进先出”,队列分为普通队列,优先级队列等等。在数据结构中,堆是特殊的完全二叉树,一定不要把堆和二叉搜索树混淆。
阻塞队列是一种特殊的队列,也遵循“先进先出”的原则。
阻塞队列的特点:1.线程安全
2.带有阻塞功能
(1)如果队列为空,尝试出队列,就会触发阻塞,直到队列不空
(2)如果队列满了,尝试入队列,也会触发阻塞,直到队列不满
*生产者消费模型
引入生产者消费模型,主要目的是为了减少“锁竞争”,生产者和消费者的步调,不一定完全一致。出现不一样的时候阻塞队列可以起到"协调"的作用。
阻塞队列,用于协调多个线程之间的工作。
生产者消费模型,是后端开发中,常见的编程手段。
*什么是耦合
目前,大多数公司采用的是分布式系统。“服务器开发”,开发的不是一个服务器程序,通常是一组服务器程序,一台机器搞不定,需要多台机器。
例如:
此时A B C之间是直接调用的关系,因此他们之间的耦合关系就比较大
如果C这个模块修改了,B这个模块就可能也要配合修改
如果要增加一个D,那么也要针对B进行需修改
于是,我们可以在B和C之间加上一个阻塞队列(消息队列MQ)
这样子,如果C产生变动,对于B的影响就非常小了。如果再增加D,那么对于B影响很小。
但是,这样子做也是有代价的,首先,系统结构就更加复杂了,而且网络通信的效率会变得更低。
优点如下:
1、减少资源竞争,提高效率
2、可以更好地做到模块之间的解耦合
3、削峰填谷
A收到多大的压力,此时BCD收到的压力是相同的(此处的压力指的是每秒钟需要处理的请求数目),一旦BCD中某个机器顶不住了,此时整个系统可能就崩溃了
此时,A收到的压力,只是传递给B。而CD则被MQ保护起来了。每个服务器完成的功能不同,有的服务器处理一个请求,消耗的资源更多,有的就更少。
这种单个请求消耗资源较多的服务器,就更容易挂掉(MySQL服务器,就属于是比较脆弱的,容易挂掉的)
一个服务器为什么会挂?因为一个服务器每次处理一个请求,都是要消耗一定的硬件资源的,包括但不限于cpu,内存,硬盘,网络宽带
一个机器,能够提供的硬件资源,是有上限的,同一时刻,如果请求太多了,消耗的总资源超出机器能够提供的资源的上限,那么整个时候机器就会挂机了(系统也就无法进行访问了)
生产者消费者模型:
优点:
1.减少资源竞争,提升效率
2.降低耦合
3.削峰填谷
缺点:
1.系统更加复杂
2.引入队列的层数太多,就会增加网络开销
二、BlockingQueue
针对BlockingQueue,offer / add 等方法,是不带有“阻塞功能”的。
多个线程交互
三、实现阻塞队列
package Thread;
//自己基于以前学过的知识,写一个阻塞队列class MyBlockingQueue{ // 阻塞队列。private String[] array = null;private int size = 0; // 队列的大小。private int head = 0; // 队列的头指针。private int tail = 0; // 队列的尾指针private Object locker = new Object(); // 锁对象,用于保证线程安全。public MyBlockingQueue(int capacity){ // 构造方法,初始化队列的大小。array = new String[capacity]; // 队列的数组。}public void put(String element) throws InterruptedException { synchronized(locker){// 入队列。if(size >= array.length){locker.wait(); // 队列已满,等待。}array[tail] = element; // 入队列。tail++; // 尾指针加1。if(tail >= array.length){ // 如果尾指针超过了数组的长度,就从头开始。tail = 0; // 尾指针回到0。}size++; // 队列的大小加1。locker.notify(); // 唤醒等待的线程。}}public String take() throws InterruptedException { // 出队列。synchronized(locker){ // 出队列。if (size==0) {locker.wait(); // 队列已空,等待。}String element = array[head]; // 出队列。head++; // 头指针加1。if(head >= array.length){ // 如果头指针超过了数组的长度,就从头开始。head = 0; // 头指针回到0。}size--; // 队列的大小减1。locker.notify(); // 唤醒等待的线程。return element; // 返回出队列的元素。}
}
}
public class demo37 {public static void main(String[] args) {MyBlockingQueue myBlockingQueue = new MyBlockingQueue(1000); // 创建一个阻塞队列。Thread producer = new Thread(() -> { // 创建一个生产者线程。int count = 0; // 计数器,用于记录生产的数量。try { // 捕获异常。while (true) { // 无限循环,一直生产。myBlockingQueue.put(" " + count); // 入队列。System.out.println("生产了一个元素:" + count); // 打印出生产的元素。count++; // 计数器加1。Thread.sleep(1000); // 生产者休眠 0.5 秒。} }catch (Exception e) {e.printStackTrace(); // 打印异常信息。}
});Thread consumer = new Thread(() -> { // 创建一个消费者线程。try { // 捕获异常。while (true) { // 无限循环,一直消费。String take = myBlockingQueue.take(); // 出队列。System.out.println("消耗了一个元素:" + take); // 打印出队列的元素。Thread.sleep(1000); // 消费者休眠 1.5 秒。}}catch (Exception e) {e.printStackTrace(); // 打印异常信息。}});producer.start(); // 启动生产者线程。consumer.start(); // 启动消费者线程。try { // 捕获异常。producer.join(); // 等待消费者线程结束。consumer.join(); // 等待生产者线程结束。} catch (InterruptedException e) { // 捕获异常。e.printStackTrace(); // 打印异常信息。}}
}
最终输出:
官方文档上建议用while来写
为什么此处要用while而不是if呢?
因为,进入wait之前,当然要判定一次条件,写作while当wait被唤醒之后,还需要再次判定一次条件。
正常来说,肯定得是条件被打破了才能唤醒,此处条件是size>=length,必然是有其他线程take,size就会<length,触发notify
如果是其他代码,不排除出现,唤醒之后条件仍然成立的可能性。
确定好条件确实是不成立了,然后再继续往下走,相当于“二次确认”的效果。