当前位置: 首页 > news >正文

阻塞队列:线程安全与生产者消费者模型解析

一、阻塞队列

阻塞队列就是基于普通队列做出扩展

1.线程安全的

如果针对一个已经满了的队列进行入队列,此时入队列操作就会阻塞,一直阻塞到队列不满(其他线程出队列元素)之后

如果针对一个已经空了的队列进行出队列,此时出队列操作就会阻塞,一直阻塞到队列不空(其他线程入队列元素)之后

包饺子

A和B和C在过年期间要包饺子,为了快一点三个人决定一起包饺子

但包饺子得先擀饺子皮,现在就只有一个擀面杖,所以A擀饺子皮是BC就陷入了阻塞,显然这就会出现竞争擀面杖问题(锁竞争)。

我们加入了一个桌子(阻塞队列),A负责擀饺子皮,BC通过桌子来包饺子,

如果A干的特别快,BC赶不上他的速度,很快桌子就会被放满(队列满了),这下A就会停下来(阻塞)

如果BC干的特别快,A赶不上他俩的速度,桌子为空(队列为空),BC就得等待(阻塞)

 生产者消费者模型

1.引入生产者消费者模型,就可以更好的做到“解耦合”

     上述过程中,A和B,A和C的耦合性较强,如果B或者C挂了,对于A的影响是很大的

加了阻塞队列A和B,C几乎几乎就没有交集了,B,C就算是挂了,对于A的影响也是微乎其微

阻塞队列(消息队列)

1.上述的阻塞队列,是基于这个数据结构实现的服务器程序,又被部署到单独的主机上
2.整个系统的结构更复杂了,你要维护的服务器更多了

3.效率,请求从A发过来B受到,这个过程经过转发要有一定开销

2.削峰填谷

当外网的请求数量就像洪水一样,急剧增加,这时A的请求数量就会增加很多,即使工作一般比较简单,每个请求消耗的资源少,但架不住多。B和C完成的工作更复杂,消耗的资源更多,这就可能导致BC挂了。

我们加入了阻塞队列之后

队列就像抗洪用的水库,只是用来存储数据的,消耗资源很少,抗压能力很强,在数据多的时候,存下来保持稳定的速度,少的是否就会放出数据,同样保持稳定请求速度,需要注意的是这个队列只是防止BC挂掉了。

Java标准库提供了现成的阻塞队列数据结构

 使用put 和 offer一样都是入队列

但是put是带有阻塞功能,offer 没带阻塞(队列满了会返回结果)

take 方法用来出队列,同样带有阻塞功能

阻塞队列没有提供带有阻塞功能的获取队首元素的方法     

尝试实现一个阻塞队列

既然涉及到了出队列,那数据结构就得用到循环队列,保证空间利用率

class MyBlockingQueue{private String[] elems = null;private int head = 0;private int tail = 0;private int size = 0;//准备锁对象,使用this也可以private Object locker = new Object();public MyBlockingQueue(int capacity){elems= new String[capacity];}public void put(String elem) throws InterruptedException {while(size >= elems.length){}//新的元素要放到tail指向的位置上elems[tail] = elem;tail++;if(tail >= elems.length){tail = 0;}size++;}public String take() throws InterruptedException {String elem = null;while(size == 0){}//取出 head 位置的元素返回elem = elems[head];head++;if(head >= elems.length){head = 0;}size--;return elem;}}
现在我们就相当于实现了一个循环队列,我们需要加上阻塞效果,我们知道只有当队列满和队列空才会阻塞,而解除阻塞则需要队列中有一个元素出去和队列中有一个元素进来。

所以我们可以在put结束时加上notify,并在判断队列满了的时候进行阻塞,至于在判断条件加上while循环,是为了防止在多线程情况下被不小心解锁,毕竟,每次put,take之后都对解一次锁,为了保证代码的原子性,我们为整个方法都加上锁。试想一下如果没有这么加锁,就会引发数组越界等一系列问题。

 take方法也是同理

至此我们可以来验证一下阻塞队列是否成功,我们创建了两个线程分别是t1,t2,t1线程为生产元素的线程并且要比t2线程消费元素快的多。

Thread t1 = new Thread(() -> {int n = 1;while(true){try {queue.put(n + "");System.out.println("生产元素" + n);n++;} catch (InterruptedException e) {throw new RuntimeException(e);}}});//消费者Thread t2 = new Thread(() -> {while (true){try {String n = queue.take();System.out.println("消费元素" + n);Thread.sleep(500);} catch (InterruptedException e) {throw new RuntimeException(e);}}});t2.start();t1.start();

 

我们发现在一开始运行时,t1线程十分快都到了一千多了,但t2线程才到2,这就导致了t1线程要想继续添加元素,就得等t2线程,这就出现了消费一个元素,就生产一个元素。

 定时器

public class ThreadDemo30 {public static void main(String[] args) {Timer timer = new Timer();timer.schedule(new TimerTask() {@Overridepublic void run() {//时间到了之后要执行的代码System.out.println("hello timer 3000");}},3000);timer.schedule(new TimerTask() {@Overridepublic void run() {System.out.println("hello timer 2000");}}, 2000);timer.schedule(new TimerTask() {@Overridepublic void run() {System.out.println("hello timer 1000");}},1000);System.out.println("hello main");}
}

我们发现进程并没有结束,Timer 里内置了前台线程

实现一个简单timer

 要想实现定时器,得先剖析它,怎么剖析呢,我们发现定时器就像一个容器一样将线程存储起来,等线程的时间到了就调度,时间短的限制性,这个容器我们就可以用

优先级队列

而优先级队列存储的是啥呢?优先级队列比较器基准是啥呢?

 存储的这个元素得有时间和进程,这时没问题的,我们主要的是根据时间来做文章,首先便是初始化问题,时间得是相对时间所以我们发现

在构造方法上我们是现在的时间加上等待的时间、

其次就是比较器

我们比较的基准是时间,时间少的往前走反之往后走,

比较的顺序往往是试出来的,而不是去猜出来的

回到优先级队列那里,我们要有一个方法创建任务对象,并且将任务放入队列中 

接下来是进行任务的时刻

我们将代码放到构造方法中

我们得有一个线程来做遍历队列中任务的工作

我们之所以加上了锁

是因为防止线程安全问题出现,试着想一下没有锁会咋样,可能队列中都没有元素了它还进行访问,可能队列中的这个元素都出去来,还去执行

为了保持原子性还是得加上锁的,在漫长的等待后,队列进入元素,locker重新获得锁,需要注意的是我们每次去往队列添加一个元素都会解锁。之后我们得获取当前时间和任务规定的时间进行计较,如果没达到约定时间就要进行等待。

至此我们就完成了定时器的设计了,我们发现做完后好像云里雾里啊,我们来重新画图梳理

 首先

因为定时是时间越短越先执行,所以我们用优先级队列来存储线程

我们存储的元素不仅要有线程还要有时间,并且还要有比较器

我们元素构建好了得把元素放到队列中就衍生出了schdule方法,它负责创建对象,并将对象放入队列

接下来我们就要执行进程,我们选在了构造方法里

一进来就先加锁,防止出现线程安全问题,然后判断是否为空,如果为空就要等待,等schdule方法放入对象,并notify。

然后比较当前时间和任务预期执行时间,没到预期时间就去等待相应时间、

到达时间出队列,并且运行线程。

结束!!!

http://www.xdnf.cn/news/529597.html

相关文章:

  • nginx 流量控制
  • map与set封装
  • Web安全基础
  • 十三、面向对象底层逻辑-Dubbo序列化Serialization接口
  • MacBook连接不上星巴克Wi-Fi的解决方法
  • 《Effective Python》第三章 循环和迭代器——在遍历参数时保持防御性
  • 江协科技EXTI外部中断hal库实现
  • 需求频繁变更?AI 驱动的自动化解决方案实践
  • 企业销售管理痛点解析与数字化解决方案
  • Unity 如何使用Timeline预览、播放特效
  • 第十六届蓝桥杯复盘
  • C#中的ThreadStart委托
  • 软件架构风格系列(7):闭环控制架构
  • 基于不透光法的柴油机排放精准监测
  • Android13 以太网(YT8531)
  • 【JavaScript】用 Proxy 拦截对象属性
  • Xshell实战:远程连接VMware CentOS7虚拟机与高效运维指南——从零配置到自动化操作,解锁Xshell的核心价值
  • Bootstrap 5 容器与网格系统详解
  • 项目删除了,为什么vscode中的git还是存在未提交记录,应该怎么删除掉
  • vue3个生命周期解析,及setup
  • 遨游科普:三防平板是什么?有什么作用?
  • 线光谱共焦传感器:复杂材质检测
  • MCU 温度采样理论(-ADC Temperature sensor)
  • 用户账号及权限管理:企业安全的基石与艺术
  • python训练营day29
  • CAN总线采样点不一致的危害
  • 26、DAPO论文笔记(解耦剪辑与动态采样策略优化,GRPO的改进)
  • 计算机网络(2)——应用层(上)
  • Spring_Boot(一)Hello spring boot!
  • 2025年- H32-Lc140 --21. 合并两个有序链表--Java版