并发编程之并发协同工具类
并发编程之并发协同工具类
认识并发协同
什么是并发协同
多个线程并发,协作来完成一件事情的过程中,因事情处理的需要,要控制某些线程阻塞,等待另一些线程完成某部分事情,再继续执行的过程。
并发协同原理:等待(阻塞)-> 通知(唤醒)
分析并发系统问题的常用思路:
- 并发的是什么?
- 在什么地方需要协同?
- 谁该等待?谁该通知?
并发协同的实现
多线程并发协同都是基于“条件等待-通知
”模式:
- 方式一:传统的基于synchronized及Object的wait、notify、notifyAll监视器方法的方式;
- 方式二:基于JUC包的LockSupport的park、uppark方法的等待-通知方式;
- 方式三:基于JUC包的Lock及Condition的await、singal方法的等待-通知方式;
- 方式四:基于JUC包提供的并发协同工具类API,来非常方便地实现多线程并发协同;
并发协同的利器
- CountDownLatch :倒计数锁存器
- CyclicBarrier : 循环屏障
- Phaser :阶段协同器/相位器
- Semaphore : 计数信息量
并发协同利器使用
CountDownLatch
介绍
CountDownLatch被称为倒计数锁存器,有一个构造器,传入一个参数,指定倒计数的大小,当指定的倒计数不为0时,则线程阻塞等待,当指定的倒计数为0时,执行下一步操作;
每个CountDownLatch对象,只可使用一次,计数变为0后,就不可再用了
方法
- 构造 方法: CountDownLatch(int count)
count 指定等待的条件数(操作数、任务数),不可再更改。
- 等待方法:await()
await()阻塞等待线程直到条件都满足(count等待条件计数减少到0)。
如果count已是0,则不会阻塞,继续执行。
- 条件完成减计数方法: countDown()
每一条件在完成时,都调用countDown()来对count计数减一。
- boolean await(long timeout, TimeUnit unit)
阻塞等待最多多长时间。返回true表示等待条件到达;false表示条件未
到达,但时间到了。
- long getCount()
获取当前计数值。该方法常用于调试或测试。
使用场景
- 统计线程执行的情况,比如统计子线程总计花费的时长;
- 压力测试中,使用CountDownLatch实现最大程度的并发处理;
- 多线程之间,互相通信,比如线程异步调用完接口,结果通知;
实例
示例一:
指定倒计数的大小,当指定的倒计数不为0时,则线程阻塞等待,当指定的倒计数为0时,执行下一步操作;
public static void main(String[] args) {test1();//new CountDownLatch(1) 当传入的参数大于0,阻塞等待test2();//new CountDownLatch(0),打印输入:接着执行任务}
public static void test1(){CountDownLatch latch = new CountDownLatch(1);try {latch.await();System.out.println("接着执行任务");} catch (InterruptedException e) {e.printStackTrace();}}public static void test2(){CountDownLatch latch = new CountDownLatch(0);try {latch.await();System.out.println("接着执行任务");} catch (InterruptedException e) {e.printStackTrace();}}
示例二:并发协同,多线程之间,互相通信
/*** 多个线程协作完成一件大的事情 CountDownLatch实现* 1.教官吹响集结号,等待士兵集合* 2.士兵开始起床准备,集合等待教官命令* 3.教官下达命令,等待士兵完成任务* 4.士兵完成*/
public class CountDownLatchDemo {public static void main(String[] args) {CountDownLatchDemo demo = new CountDownLatchDemo();demo.task();}public void task(){//准备教官等待士兵的CountDownLatchCountDownLatch startCountDownLatch = new CountDownLatch(1);//准备士兵等待教官的CountDownLatchCountDownLatch doneCountDownLatch = new CountDownLatch(10);//教官吹响集结号,等待士兵集合System.out.println(Thread.currentThread().getName()+"> 教官吹响集结号,等待士兵集合......");for(int i =0 ;i<10 ; i++){new Thread(new Worker(startCountDownLatch,doneCountDownLatch,i)).start();}try {Thread.sleep(1000 * 3);startCountDownLatch.countDown();System.out.println(Thread.currentThread().getName()+"> 教官开始下达命令,等待士兵完成任务");doneCountDownLatch.await();System.out.println(Thread.currentThread().getName()+"> 教官开始验收任务完成情况");} catch (InterruptedException e) {e.printStackTrace();}}}
class Worker implements Runnable{private CountDownLatch startCountDownLatch;private CountDownLatch doneCountDownLatch;private int i;public Worker(CountDownLatch startCountDownLatch, CountDownLatch doneCountDownLatch, int i) {this.startCountDownLatch = startCountDownLatch;this.doneCountDownLatch = doneCountDownLatch;this.i = i;}public void run() {System.out.println(Thread.currentThread().getName()+"> 士兵: "+this.i+" 听到集结号,开始起床,跑向操场,等待命令......");try {startCountDownLatch.await();System.out.println(Thread.currentThread().getName()+"> 士兵: "+this.i+" 按照指示,完成任务");doneCountDownLatch.countDown();} catch (InterruptedException e) {e.printStackTrace();}}
}
示例三:压力测试中,并发1000次
public class CountDownLatchDemo {public static void main(String[] args) {CountDownLatchDemo demo = new CountDownLatchDemo();demo.task1(1000);}/*** 压力测试中,并发1000次,准备1000个线程,同时调用一个方法*/public void task1(int num){CountDownLatch latch = new CountDownLatch(1);for(int i = 0;i<num;i++){new Thread(){@Overridepublic void run() {try {System.out.println(Thread.currentThread().getName()+" 已经准备好。。。。。。");latch.await();System.out.println(Thread.currentThread().getName()+" 开始调用接口。。。。。。");dbservice();} catch (InterruptedException e) {e.printStackTrace();}}}.start();}LockSupport.parkNanos(1000 * 1000 * 5);latch.countDown();}public void dbservice(){System.out.println(Thread.currentThread().getName()+"正在调用service接口");}
}
CyclicBarrier
介绍
CyclicBarrier 被称为循环屏障,又被称为“线程栅栏”,构造函数需要指定线程栅栏数量;它可以循环使用;
方法
- 构造方法: CyclicBarrier(int parties)
parties:指定有多少个部分(线程)参与,称为参与数。
- 构造方法: CyclicBarrier(int parties, Runnable barrierAction)
barrierAction:所有参与者都到达屏障时执行一次的命令。在一组线程中的最后一个线程到达
之后(但在释放所有线程之前),在该线程中执行该命令,该命令只在每个屏障点运行一次。
若要在继续执行所有参与线程之前更新共享状态,此屏障操作很有用。
- 等待方法:int await() throws InterruptedException, BrokenBarrierException
线程执行过程中调用await()方法,表明自己已到达屏障,自己阻塞,等待其他线程到达屏障;
当所有参与线程都到达屏障,即等待线程数==参与数,则释放所有线程,让它们继续执行。
返回int 值 是到达的当前线程的索引号,注意索引号是从parties-1开始递减到0。
BrokenBarrierException :屏障被破坏异常,当调用await时,或等待过程中屏障被破坏,则
会抛出该异常。
- int await(long timeout, TimeUnit unit) throws
InterruptedException,BrokenBarrierException,TimeoutExceptionn
等待指定时长,如到了时间还不能释放,则将抛出TimeoutException
int getNumberWaiting() 获取当前等在屏障处的线程数
boolean isBroken() 判断屏障是否被破坏
void reset()
重置屏障为初始状态。如果当前有线程正在等待,则这些线程将被释放并抛出
BrokenBarrierException
使用场景
1、 数据量比较大时,实现批量插入数据到数据库;
2、 数据统计,30个线程统计30天数据,全部统计完毕后,执行汇总;
注意事项
-
一定要确保有足够的参与者线程,否则会一直阻塞在屏障处。
-
在线程池中使用时要特别小心,确保池的线程数 >= 要求的参与数
实例
示例一:数据量比较大时,实现批量插入数据到数据库
public static void main(String[] args) {CyclicBarrierDemo demo = new CyclicBarrierDemo();demo.test1(4,5);
}
/*** 数据量比较大时,实现批量插入数据到数据库* 有20条数据,分5次插入数据库,每次插入需要4个线程来具体执行*/
public void test1(int threadSize,int batchSize){CyclicBarrier barrier = new CyclicBarrier(threadSize,()->{System.out.println(" 都准备好了,开始执行");});for(int i =0;i<threadSize * batchSize;i++){new Thread(){@Overridepublic void run() {try {System.out.println(Thread.currentThread().getName()+" 准备好了。。。");barrier.await();System.out.println(Thread.currentThread().getName()+" 开始插入数据库。。。");} catch (InterruptedException e) {e.printStackTrace();} catch (BrokenBarrierException e) {e.printStackTrace();}}}.start();LockSupport.parkNanos(1000 * 1000 * 1000L);}
}
示例二:数据统计,30个线程统计30天数据,全部统计完毕后,执行汇总
public static void main(String[] args) {CyclicBarrierDemo demo = new CyclicBarrierDemo();demo.test2(3);}/*** 数据统计,30个线程统计30天数据,全部统计完毕后,执行汇总* 将30天分为10天一统计,统计3次*/public void test2(int threadSize){ConcurrentHashMap<String,Integer> result = new ConcurrentHashMap<>();CyclicBarrier barrier = new CyclicBarrier(threadSize,()->{int sum = 0;final Set<Map.Entry<String, Integer>> entries = result.entrySet();for (Map.Entry<String,Integer> entry : entries){sum += entry.getValue();}System.out.println("最总统计的结果为:"+sum);});final ExecutorService executorService = Executors.newFixedThreadPool(threadSize);for(int i=0;i<threadSize;i++){executorService.execute(()->{Random random = new Random();int val = random.nextInt(10);System.out.println("阶段统计的结果为:"+val);result.put(Thread.currentThread().getName(),val);try {barrier.await();} catch (InterruptedException e) {e.printStackTrace();} catch (BrokenBarrierException e) {e.printStackTrace();}});}}
Phaser
介绍
Java7中增加的一个用于多阶段同步控制的工具类,他包含了CyclicBarrier和CountDownLatch的相关功能,比它们更强大灵活。
多个线程协作执行的任务分为多个阶段,每个阶段都可以有任意个参与者,线程可以随时注册并参与到某个阶段;当一个阶段中所有任务都成功完成之后,Phaser的onAdvance()被调用(可以通过覆盖添加自定义处理逻辑(类似循环屏障的使用的Runnable接口)),然后Phaser释放等待线程,自动进入下个阶段。如此循环,直到Phaser不再包含任何参与者
方法
原理:Phaser内部存在4个值,分别为:RegisteredParties、Phase、ArrivedParties、Awaits
Parties:代表同步器注册的数量;
phaser:代表返回当前所在阶段数;
ArrivedParties:代表当前到达的线程数;
Awaits:代表arriveAndAwaitAdvance等待的线程数;
具体的操作如下图:
- 构造方法
Phaser() 参与 任务数0
Phaser(int parties) 指定初始参与任务数
Phaser(Phaser parent) 指定父阶段器,子对象整体作为一个参与者加入到父对象,当子对象中没有参与者时,
自动从父对象解除注册。
Phaser(Phaser parent, int parties)
- 增减参与任务数方法
int register() 增加一个数,返回当前阶段号
int bulkRegister(int parties) 增加指定个数,返回当前阶段号
int arriveAndDeregister() 减少一个任务数,返回当前阶段号
- 到达、等待方法
int arrive() 到达(任务完成),返回当前阶段号
int arriveAndAwaitAdvance() 到达后等待其他任务到达,返回到达阶段号
int awaitAdvance(int phase) 在指定阶段等待(必须是当前阶段才有效)
int awaitAdvanceInterruptibly(int phase)
int awaitAdvanceInterruptibly(int phase,long timeout, TimeUnit unit)
- 阶段到达触发动作
protected boolean onAdvance(int phase, int registeredParties)
类似CyclicBarrier的触发命令,通过重写该方法来增加阶段到达动作,该方法返回true将终结Phaser对象。
- 其他API
void forceTermination() 强制结束
int getPhase() 获取当前阶段号
boolean isTerminated() 判断是否结束
使用场景
- 大数据里面:分布式任务调度系统-阿磁卡班。
实例
/*** 大数据里面:分布式任务调度系统-阿磁卡班。*/
public class PhaserDemo1 {public static void main(String[] args) {PhaserDemo demo = new PhaserDemo();demo.phaser();}public void phaser() {int parties = 3;Phaser phaser = new Phaser() {@Overrideprotected boolean onAdvance(int phase, int registeredParties) {int register = registeredParties;switch(phase) {case 0: System.out.println("===================万达广场,第一个节目("+register+")人参与看电影==================");break;case 1: System.out.println("===================第二个节目("+register+")人参与吃火锅==================");break;case 2: System.out.println("===================第三个节目("+register+")人参与唱歌==================");break;case 3: System.out.println("===================最后("+register+")人度过了一个愉快的夜晚==================");break;default:break;}printPhaserInfo(this);// 判断是否只剩下住线程一个参与者,是则返回true,阶段协同器终止。return phase >= 3 || registeredParties == 1;}};// 增加一个任务数,用来让主线程(男主角)全程参与phaser.register();// 让3个全程参与的子线程(女孩)加入for(int i = 0; i < parties; i++) {// 增加参与数phaser.register();new Thread(()->{stage1();phaser.arriveAndAwaitAdvance();stage2();printPhaserInfo(phaser);phaser.arriveAndAwaitAdvance();stage3();phaser.arriveAndAwaitAdvance();stage4();if(!"girl-1".equals(Thread.currentThread().getName())) {phaser.arriveAndDeregister();stage5();}else {// 完成了,注销离开phaser.arriveAndAwaitAdvance();}}, "girl-"+i).start();}// while(! phaser.isTerminated()) {int p = phaser.arriveAndAwaitAdvance();printPhaserInfo(phaser);// 中途来了两个男生吃火锅,唱歌// 为什么不能在p==2的时候进行?// 因为每个阶段都是根据人数来决定是否要开始的,在下一个阶段开始前就需要提前告知下阶段有多少人参与。if(p == 1) {stage2();for(int i = 0; i < 2; i++) {phaser.register();new Thread(()->{// 先到达集合地点stage1();phaser.arriveAndAwaitAdvance(); // 等待一起吃火锅,不能跑过来就开吃,得等其他人一起stage3();printPhaserInfo(phaser);phaser.arriveAndAwaitAdvance(); stage4();// 完成了,注销离开phaser.arriveAndDeregister();stage5();}, "boy-"+i).start();}}}}private void printPhaserInfo(Phaser phaser) {System.out.println(Thread.currentThread().getName()+" phaser:"+phaser.getPhase()+", parties: "+phaser.getRegisteredParties()+", arrived: "+phaser.getArrivedParties());}/*// 线程参与任务phaser.register(); // 增加一个任务数,返回当前阶段号phaser.bulkRegister(2); // 增加指定任务数,返回当前阶段号phaser.arrive(); // 任务完成// 线程会阻塞,当rigester与arrives数相等时,线程继续执行,阶段phase会增大phaser.arriveAndAwaitAdvance();phaser.awaitAdvance(0);// 线程离开,并且不参与后面的阶段phaser.arriveAndDeregister(); // 减少一个任务数,返回当前阶段号*/public void stage1() {Random r = new Random();try {/*第一阶段:集合一起出发*/System.out.println(Thread.currentThread().getName()+" 从家里出发...");Thread.sleep(r.nextInt(3000));System.out.println(Thread.currentThread().getName()+" 到达集合地点:万达广场");}catch (Exception e) {e.printStackTrace();}}public void stage2() {Random r = new Random();try {/*第二阶段:去电影院看电影*/System.out.println(Thread.currentThread().getName()+" 出发去电影院...");Thread.sleep(r.nextInt(3000));System.out.println(Thread.currentThread().getName()+" 到达万达电影院");}catch (Exception e) {e.printStackTrace();}}public void stage3() {Random r = new Random();try {/*第三阶段:吃个火锅*/System.out.println(Thread.currentThread().getName()+" 出发去火锅店...");Thread.sleep(r.nextInt(3000));System.out.println(Thread.currentThread().getName()+" 到达海底捞");}catch (Exception e) {e.printStackTrace();}}public void stage4() {try {/*第四阶段:唱个歌*/System.out.println(Thread.currentThread().getName()+" 出发去KTV...");System.out.println(Thread.currentThread().getName()+" 到达麦迪KTV");}catch (Exception e) {e.printStackTrace();}}public void stage5() {Random r = new Random();try {/*第四阶段:唱歌*/System.out.println(Thread.currentThread().getName()+" 回家去...");Thread.sleep(r.nextInt(3000));System.out.println(Thread.currentThread().getName()+" 到家了");}catch (Exception e) {e.printStackTrace();}}
}
Semaphore
介绍
Semaphore 计数信息量,又称“信号量”;控制多个线程争抢许可;
Semaphore一个计数信号量,一个信号量维护一组许可,acquire都会阻塞,直到获取到一个许可,release释放一个许可。
方法
acquire:获取一个许可,如果没有就等待,
release:释放一个许可。
availablePermits:方法得到可用的许可数目
使用场景
- 代码并发处理限流;
实例
public class SemaphoreDemo {public static void main(String[] args) {test(200);}/*** 数据库限制连接数*/public static void test(int num){Semaphore semaphore = new Semaphore(30);for (int i = 0;i< num ;i++){new Thread(){@Overridepublic void run() {try {semaphore.acquire();queryDB("localhost:3306");} catch (InterruptedException e) {e.printStackTrace();} finally {semaphore.release();}}}.start();}}public static void queryDB(String uri){System.out.println("do query db ......"+uri);LockSupport.parkNanos(1000 * 1000 * 1000L);}
}new Thread(){@Overridepublic void run() {try {semaphore.acquire();queryDB("localhost:3306");} catch (InterruptedException e) {e.printStackTrace();} finally {semaphore.release();}}}.start();}}public static void queryDB(String uri){System.out.println("do query db ......"+uri);LockSupport.parkNanos(1000 * 1000 * 1000L);}
}