当前位置: 首页 > news >正文

并发编程之并发协同工具类

并发编程之并发协同工具类

认识并发协同

什么是并发协同

多个线程并发,协作来完成一件事情的过程中,因事情处理的需要,要控制某些线程阻塞,等待另一些线程完成某部分事情,再继续执行的过程。

并发协同原理:等待(阻塞)-> 通知(唤醒)

分析并发系统问题的常用思路

  1. 并发的是什么?
  2. 在什么地方需要协同?
  3. 谁该等待?谁该通知?

并发协同的实现

多线程并发协同都是基于“条件等待-通知”模式:

  • 方式一:传统的基于synchronized及Object的wait、notify、notifyAll监视器方法的方式;
  • 方式二:基于JUC包的LockSupport的park、uppark方法的等待-通知方式;
  • 方式三:基于JUC包的Lock及Condition的await、singal方法的等待-通知方式;
  • 方式四:基于JUC包提供的并发协同工具类API,来非常方便地实现多线程并发协同;

并发协同的利器

  • CountDownLatch :倒计数锁存器
  • CyclicBarrier : 循环屏障
  • Phaser :阶段协同器/相位器
  • Semaphore : 计数信息量

并发协同利器使用

CountDownLatch

介绍

CountDownLatch被称为倒计数锁存器,有一个构造器,传入一个参数,指定倒计数的大小,当指定的倒计数不为0时,则线程阻塞等待,当指定的倒计数为0时,执行下一步操作;

image-20220129163822709

每个CountDownLatch对象,只可使用一次,计数变为0后,就不可再用了

方法
  • 构造 方法: CountDownLatch(int count)

count 指定等待的条件数(操作数、任务数),不可再更改。

  • 等待方法:await()

await()阻塞等待线程直到条件都满足(count等待条件计数减少到0)。

如果count已是0,则不会阻塞,继续执行。

  • 条件完成减计数方法: countDown()

每一条件在完成时,都调用countDown()来对count计数减一。

  • boolean await(long timeout, TimeUnit unit)

阻塞等待最多多长时间。返回true表示等待条件到达;false表示条件未

到达,但时间到了。

  • long getCount()

获取当前计数值。该方法常用于调试或测试。

使用场景
  1. 统计线程执行的情况,比如统计子线程总计花费的时长;
  2. 压力测试中,使用CountDownLatch实现最大程度的并发处理;
  3. 多线程之间,互相通信,比如线程异步调用完接口,结果通知;
实例

示例一:

指定倒计数的大小,当指定的倒计数不为0时,则线程阻塞等待,当指定的倒计数为0时,执行下一步操作

  public static void main(String[] args) {test1();//new CountDownLatch(1) 当传入的参数大于0,阻塞等待test2();//new CountDownLatch(0),打印输入:接着执行任务}
public static void test1(){CountDownLatch latch = new CountDownLatch(1);try {latch.await();System.out.println("接着执行任务");} catch (InterruptedException e) {e.printStackTrace();}}public static void test2(){CountDownLatch latch = new CountDownLatch(0);try {latch.await();System.out.println("接着执行任务");} catch (InterruptedException e) {e.printStackTrace();}}

示例二:并发协同,多线程之间,互相通信

  /*** 多个线程协作完成一件大的事情  CountDownLatch实现* 1.教官吹响集结号,等待士兵集合* 2.士兵开始起床准备,集合等待教官命令* 3.教官下达命令,等待士兵完成任务* 4.士兵完成*/
public class CountDownLatchDemo {public static void main(String[] args) {CountDownLatchDemo demo = new CountDownLatchDemo();demo.task();}public void task(){//准备教官等待士兵的CountDownLatchCountDownLatch startCountDownLatch = new CountDownLatch(1);//准备士兵等待教官的CountDownLatchCountDownLatch doneCountDownLatch = new CountDownLatch(10);//教官吹响集结号,等待士兵集合System.out.println(Thread.currentThread().getName()+"> 教官吹响集结号,等待士兵集合......");for(int i =0 ;i<10 ; i++){new Thread(new Worker(startCountDownLatch,doneCountDownLatch,i)).start();}try {Thread.sleep(1000 * 3);startCountDownLatch.countDown();System.out.println(Thread.currentThread().getName()+"> 教官开始下达命令,等待士兵完成任务");doneCountDownLatch.await();System.out.println(Thread.currentThread().getName()+"> 教官开始验收任务完成情况");} catch (InterruptedException e) {e.printStackTrace();}}}
class Worker implements Runnable{private CountDownLatch startCountDownLatch;private CountDownLatch doneCountDownLatch;private int i;public Worker(CountDownLatch startCountDownLatch, CountDownLatch doneCountDownLatch, int i) {this.startCountDownLatch = startCountDownLatch;this.doneCountDownLatch = doneCountDownLatch;this.i = i;}public void run() {System.out.println(Thread.currentThread().getName()+"> 士兵: "+this.i+" 听到集结号,开始起床,跑向操场,等待命令......");try {startCountDownLatch.await();System.out.println(Thread.currentThread().getName()+"> 士兵: "+this.i+" 按照指示,完成任务");doneCountDownLatch.countDown();} catch (InterruptedException e) {e.printStackTrace();}}
}

示例三:压力测试中,并发1000次

public class CountDownLatchDemo {public static void main(String[] args) {CountDownLatchDemo demo = new CountDownLatchDemo();demo.task1(1000);}/*** 压力测试中,并发1000次,准备1000个线程,同时调用一个方法*/public void task1(int num){CountDownLatch latch = new CountDownLatch(1);for(int i = 0;i<num;i++){new Thread(){@Overridepublic void run() {try {System.out.println(Thread.currentThread().getName()+" 已经准备好。。。。。。");latch.await();System.out.println(Thread.currentThread().getName()+" 开始调用接口。。。。。。");dbservice();} catch (InterruptedException e) {e.printStackTrace();}}}.start();}LockSupport.parkNanos(1000 * 1000 * 5);latch.countDown();}public void dbservice(){System.out.println(Thread.currentThread().getName()+"正在调用service接口");}
}

CyclicBarrier

介绍

CyclicBarrier 被称为循环屏障,又被称为“线程栅栏”,构造函数需要指定线程栅栏数量;它可以循环使用;

方法
  • 构造方法: CyclicBarrier(int parties)

parties:指定有多少个部分(线程)参与,称为参与数。

  • 构造方法: CyclicBarrier(int parties, Runnable barrierAction)

barrierAction:所有参与者都到达屏障时执行一次的命令。在一组线程中的最后一个线程到达

之后(但在释放所有线程之前),在该线程中执行该命令,该命令只在每个屏障点运行一次。

若要在继续执行所有参与线程之前更新共享状态,此屏障操作很有用。

  • 等待方法:int await() throws InterruptedException, BrokenBarrierException

线程执行过程中调用await()方法,表明自己已到达屏障,自己阻塞,等待其他线程到达屏障;

当所有参与线程都到达屏障,即等待线程数==参与数,则释放所有线程,让它们继续执行。

返回int 值 是到达的当前线程的索引号,注意索引号是从parties-1开始递减到0。

BrokenBarrierException :屏障被破坏异常,当调用await时,或等待过程中屏障被破坏,则

会抛出该异常。

  • int await(long timeout, TimeUnit unit) throws

InterruptedException,BrokenBarrierException,TimeoutExceptionn

等待指定时长,如到了时间还不能释放,则将抛出TimeoutException

  • int getNumberWaiting() 获取当前等在屏障处的线程数

  • boolean isBroken() 判断屏障是否被破坏

  • void reset()

重置屏障为初始状态。如果当前有线程正在等待,则这些线程将被释放并抛出

BrokenBarrierException

使用场景

1、 数据量比较大时,实现批量插入数据到数据库;

2、 数据统计,30个线程统计30天数据,全部统计完毕后,执行汇总;

注意事项

  • 一定要确保有足够的参与者线程,否则会一直阻塞在屏障处。

  • 在线程池中使用时要特别小心,确保池的线程数 >= 要求的参与数

实例

示例一:数据量比较大时,实现批量插入数据到数据库

public static void main(String[] args) {CyclicBarrierDemo demo = new CyclicBarrierDemo();demo.test1(4,5);
}
/*** 数据量比较大时,实现批量插入数据到数据库* 有20条数据,分5次插入数据库,每次插入需要4个线程来具体执行*/
public void test1(int threadSize,int batchSize){CyclicBarrier barrier = new CyclicBarrier(threadSize,()->{System.out.println(" 都准备好了,开始执行");});for(int i =0;i<threadSize * batchSize;i++){new Thread(){@Overridepublic void run() {try {System.out.println(Thread.currentThread().getName()+" 准备好了。。。");barrier.await();System.out.println(Thread.currentThread().getName()+" 开始插入数据库。。。");} catch (InterruptedException e) {e.printStackTrace();} catch (BrokenBarrierException e) {e.printStackTrace();}}}.start();LockSupport.parkNanos(1000 * 1000 * 1000L);}
}

示例二:数据统计,30个线程统计30天数据,全部统计完毕后,执行汇总

    public static void main(String[] args) {CyclicBarrierDemo demo = new CyclicBarrierDemo();demo.test2(3);}/*** 数据统计,30个线程统计30天数据,全部统计完毕后,执行汇总*  将30天分为10天一统计,统计3次*/public void test2(int threadSize){ConcurrentHashMap<String,Integer> result = new ConcurrentHashMap<>();CyclicBarrier barrier = new CyclicBarrier(threadSize,()->{int sum = 0;final Set<Map.Entry<String, Integer>> entries = result.entrySet();for (Map.Entry<String,Integer>  entry : entries){sum += entry.getValue();}System.out.println("最总统计的结果为:"+sum);});final ExecutorService executorService = Executors.newFixedThreadPool(threadSize);for(int i=0;i<threadSize;i++){executorService.execute(()->{Random random = new Random();int val = random.nextInt(10);System.out.println("阶段统计的结果为:"+val);result.put(Thread.currentThread().getName(),val);try {barrier.await();} catch (InterruptedException e) {e.printStackTrace();} catch (BrokenBarrierException e) {e.printStackTrace();}});}}

Phaser

介绍

Java7中增加的一个用于多阶段同步控制的工具类,他包含了CyclicBarrier和CountDownLatch的相关功能,比它们更强大灵活。

多个线程协作执行的任务分为多个阶段,每个阶段都可以有任意个参与者,线程可以随时注册并参与到某个阶段;当一个阶段中所有任务都成功完成之后,Phaser的onAdvance()被调用(可以通过覆盖添加自定义处理逻辑(类似循环屏障的使用的Runnable接口)),然后Phaser释放等待线程,自动进入下个阶段。如此循环,直到Phaser不再包含任何参与者

方法

原理:Phaser内部存在4个值,分别为:RegisteredParties、Phase、ArrivedParties、Awaits

Parties:代表同步器注册的数量;

phaser:代表返回当前所在阶段数;

ArrivedParties:代表当前到达的线程数;

Awaits:代表arriveAndAwaitAdvance等待的线程数;

具体的操作如下图:

image-20220129200918613

  • 构造方法

Phaser() 参与 任务数0

Phaser(int parties) 指定初始参与任务数

Phaser(Phaser parent) 指定父阶段器,子对象整体作为一个参与者加入到父对象,当子对象中没有参与者时,

自动从父对象解除注册。

Phaser(Phaser parent, int parties)

  • 增减参与任务数方法

int register() 增加一个数,返回当前阶段号

int bulkRegister(int parties) 增加指定个数,返回当前阶段号

int arriveAndDeregister() 减少一个任务数,返回当前阶段号

  • 到达、等待方法

int arrive() 到达(任务完成),返回当前阶段号

int arriveAndAwaitAdvance() 到达后等待其他任务到达,返回到达阶段号

int awaitAdvance(int phase) 在指定阶段等待(必须是当前阶段才有效)

int awaitAdvanceInterruptibly(int phase)

int awaitAdvanceInterruptibly(int phase,long timeout, TimeUnit unit)

  • 阶段到达触发动作

protected boolean onAdvance(int phase, int registeredParties)

类似CyclicBarrier的触发命令,通过重写该方法来增加阶段到达动作,该方法返回true将终结Phaser对象。

  • 其他API

void forceTermination() 强制结束

int getPhase() 获取当前阶段号

boolean isTerminated() 判断是否结束

使用场景
  1. 大数据里面:分布式任务调度系统-阿磁卡班。
实例
/*** 大数据里面:分布式任务调度系统-阿磁卡班。*/
public class PhaserDemo1 {public static void main(String[] args) {PhaserDemo demo = new PhaserDemo();demo.phaser();}public void phaser() {int parties = 3;Phaser phaser = new Phaser() {@Overrideprotected boolean onAdvance(int phase, int registeredParties) {int register = registeredParties;switch(phase) {case 0: System.out.println("===================万达广场,第一个节目("+register+")人参与看电影==================");break;case 1: System.out.println("===================第二个节目("+register+")人参与吃火锅==================");break;case 2: System.out.println("===================第三个节目("+register+")人参与唱歌==================");break;case 3: System.out.println("===================最后("+register+")人度过了一个愉快的夜晚==================");break;default:break;}printPhaserInfo(this);// 判断是否只剩下住线程一个参与者,是则返回true,阶段协同器终止。return phase >= 3 || registeredParties == 1;}};// 增加一个任务数,用来让主线程(男主角)全程参与phaser.register();// 让3个全程参与的子线程(女孩)加入for(int i = 0; i < parties; i++) {// 增加参与数phaser.register();new Thread(()->{stage1();phaser.arriveAndAwaitAdvance();stage2();printPhaserInfo(phaser);phaser.arriveAndAwaitAdvance();stage3();phaser.arriveAndAwaitAdvance();stage4();if(!"girl-1".equals(Thread.currentThread().getName())) {phaser.arriveAndDeregister();stage5();}else {// 完成了,注销离开phaser.arriveAndAwaitAdvance();}}, "girl-"+i).start();}// while(! phaser.isTerminated()) {int p = phaser.arriveAndAwaitAdvance();printPhaserInfo(phaser);// 中途来了两个男生吃火锅,唱歌// 为什么不能在p==2的时候进行?// 因为每个阶段都是根据人数来决定是否要开始的,在下一个阶段开始前就需要提前告知下阶段有多少人参与。if(p == 1) {stage2();for(int i = 0; i < 2; i++) {phaser.register();new Thread(()->{// 先到达集合地点stage1();phaser.arriveAndAwaitAdvance(); // 等待一起吃火锅,不能跑过来就开吃,得等其他人一起stage3();printPhaserInfo(phaser);phaser.arriveAndAwaitAdvance();    stage4();// 完成了,注销离开phaser.arriveAndDeregister();stage5();}, "boy-"+i).start();}}}}private void printPhaserInfo(Phaser phaser) {System.out.println(Thread.currentThread().getName()+" phaser:"+phaser.getPhase()+", parties: "+phaser.getRegisteredParties()+", arrived: "+phaser.getArrivedParties());}/*// 线程参与任务phaser.register(); // 增加一个任务数,返回当前阶段号phaser.bulkRegister(2);    // 增加指定任务数,返回当前阶段号phaser.arrive();   // 任务完成// 线程会阻塞,当rigester与arrives数相等时,线程继续执行,阶段phase会增大phaser.arriveAndAwaitAdvance();phaser.awaitAdvance(0);// 线程离开,并且不参与后面的阶段phaser.arriveAndDeregister();  // 减少一个任务数,返回当前阶段号*/public void stage1() {Random r = new Random();try {/*第一阶段:集合一起出发*/System.out.println(Thread.currentThread().getName()+" 从家里出发...");Thread.sleep(r.nextInt(3000));System.out.println(Thread.currentThread().getName()+" 到达集合地点:万达广场");}catch (Exception e) {e.printStackTrace();}}public void stage2() {Random r = new Random();try {/*第二阶段:去电影院看电影*/System.out.println(Thread.currentThread().getName()+" 出发去电影院...");Thread.sleep(r.nextInt(3000));System.out.println(Thread.currentThread().getName()+" 到达万达电影院");}catch (Exception e) {e.printStackTrace();}}public void stage3() {Random r = new Random();try {/*第三阶段:吃个火锅*/System.out.println(Thread.currentThread().getName()+" 出发去火锅店...");Thread.sleep(r.nextInt(3000));System.out.println(Thread.currentThread().getName()+" 到达海底捞");}catch (Exception e) {e.printStackTrace();}}public void stage4() {try {/*第四阶段:唱个歌*/System.out.println(Thread.currentThread().getName()+" 出发去KTV...");System.out.println(Thread.currentThread().getName()+" 到达麦迪KTV");}catch (Exception e) {e.printStackTrace();}}public void stage5() {Random r = new Random();try {/*第四阶段:唱歌*/System.out.println(Thread.currentThread().getName()+" 回家去...");Thread.sleep(r.nextInt(3000));System.out.println(Thread.currentThread().getName()+" 到家了");}catch (Exception e) {e.printStackTrace();}}  
}

Semaphore

介绍

Semaphore 计数信息量,又称“信号量”;控制多个线程争抢许可;

Semaphore一个计数信号量,一个信号量维护一组许可,acquire都会阻塞,直到获取到一个许可,release释放一个许可。

方法

acquire:获取一个许可,如果没有就等待,

release:释放一个许可。

availablePermits:方法得到可用的许可数目

使用场景
  1. 代码并发处理限流;
实例
public class SemaphoreDemo {public static void main(String[] args) {test(200);}/*** 数据库限制连接数*/public static void test(int num){Semaphore semaphore = new Semaphore(30);for (int i = 0;i< num ;i++){new Thread(){@Overridepublic void run() {try {semaphore.acquire();queryDB("localhost:3306");} catch (InterruptedException e) {e.printStackTrace();} finally {semaphore.release();}}}.start();}}public static void queryDB(String uri){System.out.println("do query db ......"+uri);LockSupport.parkNanos(1000 * 1000 * 1000L);}
}new Thread(){@Overridepublic void run() {try {semaphore.acquire();queryDB("localhost:3306");} catch (InterruptedException e) {e.printStackTrace();} finally {semaphore.release();}}}.start();}}public static void queryDB(String uri){System.out.println("do query db ......"+uri);LockSupport.parkNanos(1000 * 1000 * 1000L);}
}
http://www.xdnf.cn/news/587755.html

相关文章:

  • ollama+open-webui搭建可视化大模型聊天
  • 【计算机网络】TCP如何保障传输可靠性_笔记
  • Python结合ollama和stramlit开发聊天机器人
  • 栈和队列总结
  • ISO 26262-5 生产维护和报废
  • 前端性能优化的秘密武器:Preload 与 Prefetch 的深度解析
  • fatal error: uuid/uuid.h: No such file or directory 编译问题修复。
  • VS Code中Maven未能正确读取`settings.xml`中配置的新路径
  • 将MCP(ModelContextProtocol)与Semantic Kernel集成(调用github)
  • [密码学实战]使用C语言实现TCP服务端(二十九)
  • SAR ADC 的常见架构
  • 广州能源所重大突破:闪蒸焦耳加热助力粉煤灰 / 赤泥中关键金属低碳回收
  • Netty学习专栏(一):Java NIO编程与核心组件详解
  • Android View的事件分发机制
  • docker容器暴露端口的作用
  • kafka在线增加分区副本数
  • RK3588 RGA 测试
  • 工商业预付费系统组成架构及系统特点介绍
  • 【MySQL成神之路】MySQL插入、删除、更新操作汇总
  • Unity Shader入门(更新中)
  • python安装与使用
  • Java的列表、集合、数组的添加一个元素各自用的什么方法?
  • 【论文阅读】——AN EXPRESSIVE REPRESENTATION OF GENERAL 3D SHAPES
  • Linux环境基础开发工具->vim
  • 实现FAT12文件管理
  • 线性回归模型的参数估计
  • AutoMapper .net Framework 的 Model转换扩展方法
  • python学习 day5
  • 部署人工智能Qlib量化投资平台
  • 你通俗易懂的理解——线程、多线程与线程池