当前位置: 首页 > news >正文

15. 多线程(进阶2) --- CAS 和 多线程常用的类

文章目录

  • 前言
  • 一. CAS
    • 1.1. 什么是 CAS
    • 1.2. 应用场景
      • 1.2.1. 实现原子类
    • 1.3. 工作流程
    • 1.4. 基于 CAS 的自旋锁
    • 1.5. ABA 问题
  • 二. JUC 的一些组件
    • 2.1. Callable 接口
    • 2.2. ReentrantLock
    • 2.3. Semaphore
    • 2.4. CountDownLatch


前言

上个博客中,我们学习了常见的锁策略和锁的特性,这次我们学习一些新的内容 — CAS 和 多线程常用的类


一. CAS

1.1. 什么是 CAS

CAS:全称是 Compare and Swap,字面意思是 “比较并交换”,一个 CAS 涉及到以下的操作:

我们假设内存中的原数据 V,旧的预期值 A,需要修改的新值 B

  1. 比较 A 与 V 是否相等。(比较)
  2. 如果比较相等,将 B 写入 V。(交换)
  3. 返回操作是否成功

咱们谈到的 CAS ,是 CPU 的一条指令,是原子的这样的指令,就给编写多线程代码 / 线程安全 打开了新世界的大门。
CAS 本质上是 CPU 的指令,操作系统就会把这个指令进行封装,提供一些 api,就可以在 C++ 中调用。而 JVM 又是基于 C++ 实现的,JVM 也能够使用 C++ 调用这样的 CAS 操作。

1.2. 应用场景

1.2.1. 实现原子类

标准库中提供了java.util.concurrent.atomic 包,里面的类都是基于 CAS 实现的,可以实现一些基本数据类型的线程安全,例如 AtomicInteger 类.

public class Demo48 {private static AtomicInteger count = new AtomicInteger(0);public static void main(String[] args) throws InterruptedException {Thread t1  = new Thread(()->{for (int i = 0;i<50000;i++){count.getAndIncrement(); // count++}});Thread t2  = new Thread(()->{for (int i = 0;i<50000;i++){count.getAndIncrement(); // count++}});t1.start();t2.start();t1.join();t2.join();System.out.println(count);}
}

在这里插入图片描述

1.3. 工作流程

我们先通过下面的伪代码来学习一个 单线程下 CAS 的 执行流程。

boolean CAS(address, expectValue, swapValue) {if (&address == expectedValue) {&address = swapValue;return true;}	return false;
}

address 是内存地址,expectValue是寄存器1的值,swapValue是寄存器2的值。
if条件语句的意思就是:判定内存中的值,和寄存器1的值是否一样,如果一致,就把寄存器2 中的值进行交换。
但是由于基本上只是关心交换后内存中的值,不关心寄存器2的值,此处可以把这样的操作理解成 “赋值”,
本质上是交换,基于交换下的赋值。
上面伪代码中这么多行的内容,其实只是 CPU上的一条指令。
我们把上面count.getAndIncrement();的 执行过程也使用伪代码来演示一下。

private static AtomicInteger count = new AtomicInteger(0);
count.getAndIncrement();

class AtomicInteger {private int value;public int getAndIncrement() {int oldValue = value;while ( CAS(value, oldValue, oldValue+1) != true) {oldValue = value;}return oldValue;}
}

我们现在有两个线程同时执行 count.getAndIncrement();。发生冲突时候,是怎么进行的。
在这里插入图片描述
初始 value = 0,t1 线程 先执行到 int oldValue = value,此时 oldValue = 1,然后 t2 线程抢先一步执行完所有的 getAndIncrement() 的代码之后,value 等于 1,oldValue 等于 0,
然后 切回到 t1 线程,发现 value 内存中的值 和 oldValue 寄存器的自豪不一样,CAS 交换操作是不会进行的,并且返回 false,于是在进入到循环体中,重新读取 value 到 oldValue 中

1.4. 基于 CAS 的自旋锁

我们根据 基于 CAS 的自旋锁 的伪代码来巩固一下。
伪代码:

public class SpinLock {private Thread owner = null;public void lock(){// 通过 CAS 看当前锁是否被某个线程持有.// 如果这个锁已经被别的线程持有, 那么就⾃旋等待.// 如果这个锁没有被别的线程持有, 那么就把 owner 设为当前尝试加锁的线程.while(!CAS(this.owner, null, Thread.currentThread())){}}public void unlock (){this.owner = null;}
}

当线程1 使用 lock() ,owner = null,那么进入到 CAS(this.owner,null,Thread.currentThread())中, this.owner 就给了 线程1,然后进行后续操作。
当线程2 此时使用 lock(),发现 owner 不为 null,进入到 CAS(this.owner,null,Thread.currentThread()),返回 false,直接忙等,这样就模拟实现了 自旋锁。
在这里插入图片描述

1.5. ABA 问题

使用 CAS 能够进行线程安全的编程,核心就是 先比较 “相等”,内存 和 寄存器是否相等 (这里本质上再判断是否有其他线程插入进来,做了一些修改)。
一般认为,如果发现这里寄存器和内存值一致,就可以认为是没有线程穿插到里面进行修改的,因此接下来剩余的操作都认为是线程安全的。
但是实际上,可能就存在一种情况,另一个线程,把内存中 A 改成 B,又从 B 修改成了 A。这个就是 CAS的ABA问题。
在这里插入图片描述
CAS 中的ABA问题,其实在大部分情况下,即使出现了 ABA 问题,最终的程序,一般问题不大,但是在一些极端的场景下,ABA才会出现一些严重的 bug。
举个例子:
滑稽老铁的账户初始余额为1000元,当他试图取出500元时,由于系统卡顿,他连续点击取款按钮,导致产生了两个并发执行的线程。这两个线程同时读取了账户余额为1000元,并各自准备进行取款操作。

在第一个线程(线程1)中,它读取到oldBalance为1000元,并尝试通过CAS操作将余额更新为500元(即balance = oldBalance - 500)。此时,balance成功被更新为500元。

然而,在第二个线程(线程2)中,它同样读取到了oldBalance为1000元,并准备执行相同的CAS操作。但在它执行CAS之前,第三个线程突然介入,给账户打入了500元,使得账户余额从500元变回了1000元。

当线程2执行CAS操作时,它会比较内存中的balance(此时为1000元)与它的oldBalance(也是1000元),发现两者相等,于是再次将balance减去500元,最终导致账户余额又被错误地更新为500元。

这个过程揭示了一个典型的并发问题:尽管每个线程的操作看似合理,但由于多个线程并发执行且缺乏有效的同步机制,导致最终结果不符合预期,账户余额出现了错误的变动。
在这里插入图片描述
上述问题中,使用 来判定中间是否有现成来穿插修改余额,可以加也可以减,但是如果换成 其他指标,约定,只能增加,不能减少,就能有效的避免 ABA 问题。
例如,引入 “版本号”,整数,每次修改一次余额,版本号 +1。
那能不能使用 时间 呢? 不行。因为时间会调整的,万一赶上,一次手动修改的时间的问题,就可能出现时间 “往前跳” 的情况。
天文学家给出了 闰年 的定义,在日常生活中,通过天数来弥补误差,但是在计算机中误差可是 秒级别的,天文学家会提前发出公告,要 闰秒一次,此时全世界的计算机系统的维护人员,就会手动的把系统时间往前调一秒,正常 59,59,60,闰一秒 就会 58,59,59,60, 手动把时间往前设置了一下,使得 59秒重复了2次。


二. JUC 的一些组件

JUC 的 全称是 java.util.concurrent 。

2.1. Callable 接口

我们先举个例子,例如,创建线程实现 1+2+3+4+…+100,
步骤:

  • 创建一个 匿名内部类,实现 Callable 接口。Callable 带有泛型参数,泛型参数表示返回值的类型
  • 重写 Callable 的 call 方法,完成累加的过程。直接通过返回值返回计算结果
  • 把 Callable 实例使用 FutureTask 包装一下
  • 创建线程,线程的构造方法传入 FutureTask,此时新线程就会执行 FutureTask 内部的 Callable的 call方法,完成计算,计算结果就会放到了 FutureTask 对象中。
  • 在主线程中调用 futureTask.get() 能够阻塞等待新线程计算完。并获取到 FutureTask 中的结果
public class Demo49 {public static void main(String[] args) throws ExecutionException, InterruptedException {Callable<Integer> callable = new Callable<Integer>() {@Overridepublic Integer call() throws Exception {int result = 0;for(int i =1;i<=100;i++){result += i;}return result;}};FutureTask<Integer> futureTask = new FutureTask<>(callable);Thread t = new Thread(futureTask);t.start();// get 操作就是获取到 FutureTask 的返回值,这个返回值就来自于 Callable 的 call 方法// get 可能会阻塞,如果当前线程执行完毕,get拿到返回结果// 如果当前线程还没有执行完毕,get就会一直阻塞System.out.println(futureTask.get());}
}

理解 Callable
Callable 和 Runnable 相对,都是描述一个 “任务”。Callable 描述的是带有返回值的任务,Runnable 描述的事不带返回值的任务。
Callable 通过需要搭配 FutureTask来使用。FutureTask 用来保存 Callable 的返回结果。因为 Callable 往往是在另一个线程中执行的,什么时候执行完并不清楚。
FutureTask 就可以负责这个等待结果出来的工作。

2.2. ReentrantLock

ReentrantLock 是可重入互斥锁,和 synchronized 定位类似,都是用来实现互斥效果,保证线程安全。
ReentrantLock 的用法:

  • lock(): 加锁,如果获取不到锁就死等
  • trylock(等待时间): 加锁,如果获取不到锁,等待一定的时间之后就放弃加锁
  • unlock(): 解锁
public class Demo50 {private static int count = 0;public static void main(String[] args) throws InterruptedException {ReentrantLock locker = new ReentrantLock(true);Thread t1 = new Thread(()->{for (int i = 0; i < 50000; i++) {locker.lock();try {count++;}finally {locker.unlock();}}});Thread t2 = new Thread(()->{for(int i = 0;i<50000;i++){locker.lock();count++;locker.unlock();}});t1.start();t2.start();t1.join();t2.join();System.out.println(count);}
}

两个线程:
t1:循环50,000次,每次获取锁后对count递增,用try-finally确保释放锁。
t2:循环50,000次,直接获取锁后递增count并释放锁(无异常处理)。
synchronized 和 ReentrantLock 的区别:

  1. synchronized 是 关键字 (内部是通过 C++ 实现的),ReentrantLock 是标准库的类

  2. synchronized 通过代码块控制加锁,解锁,ReentrantLock 需要 lock / unlock 方法,需要注意 unclock 不被调用的问题

  3. ReentrantLock 除了提供lock,unlock之外,还提供了一个方法,tryLock()
    tryLock(): 不会阻塞,加锁成功,返回 true,加锁失败,返回 false
    调用者可以通过判定返回值,决定下面该怎么做,
    并且也可以设置超时时间,等待时间到达超时后,再返回 true / false

  4. ReentrantLock 提供了公平锁的实现,默认是非公平锁
    在这里插入图片描述

  5. ReentrantLock 搭配的事等待通知机制,是 Condition类,相比于 wait notify 来说,功能更加强大

如果选择使用哪个锁?

  • 锁竞争不激烈的时候,使用 synchronized,效率更高,自动释放更方便
  • 锁竞争激烈的时候,使用 ReentrantLock,搭配 trylock 更灵活控制加锁的行为,而不是死等
  • 如果需要使用公平锁,使用 ReentrantLock

2.3. Semaphore

信号量,用来表示 “可用资源个数”。本质上就是一个计数器

可以把信号量想象成是停车场的展示牌:当前有 100个车位,表示有 100 个可用资源。
当有车开进行的时候,就相当于申请一个可用资源,可用车位就 -1 (这个称为信号量的 P 操作)
当有车开出来的时候,就相当于释放一个可用资源,可用车位就 +1 (这个称为信号量的 V 操作)
如果计数器的值已经为0了,还尝试申请资源,就会阻塞等待,直到有其他线程释放资源

Semaphore 的 PV 操作中的加减操作都是原子的,可以在多线程环境下直接使用。

public class Demo51 {public static void main(String[] args) throws InterruptedException {Semaphore semaphore = new Semaphore(4);semaphore.acquire();System.out.println("进行一次 p 操作");semaphore.acquire();System.out.println("进行一次 p 操作");semaphore.acquire();System.out.println("进行一次 p 操作");semaphore.acquire();System.out.println("进行一次 p 操作");}
}

在这里插入图片描述
如果信号量设置为3,就会阻塞等待,就会报错。

信号量有一个特殊情况,初始值为 1 的信号量,取值要么是 1 要么是 0 (二元信号量)
等价于 “锁” (普通的信号量,就相当于锁更广泛的推广)
如果是普通的N信号量,就可以限制,同时有多少个线程来执行某个逻辑。

public class Demo44 {private static int count = 0;public static void main(String[] args) throws InterruptedException {Semaphore semaphore = new Semaphore(1);Thread t1 = new Thread(() -> {for (int i = 0; i < 50000; i++) {try {semaphore.acquire();count++;semaphore.release();} catch (InterruptedException e) {throw new RuntimeException(e);}}});Thread t2 = new Thread(()->{for (int i = 0;i < 50000;i++){try {semaphore.acquire();count++;semaphore.release();} catch (InterruptedException e) {throw new RuntimeException(e);}}});t1.start();t2.start();t1.join();t2.join();System.out.println(count);}
}

2.4. CountDownLatch

同时等待 N 个任务执行结束

好比如是跑步比赛,10个选手依次就为,哨声响才同时出发;所有选手都通过终点,才公布成绩

  • 构造 CountDownLatch 实例,初始化 10 表示有 10 个任务需要完成
  • 每次任务执行完毕,都调用 latch.countDown() . 在 CountDownLatch 内部的计数器同时自减
  • 主线程中使用 latch.await(); 阻塞等待所有任务执行完毕,相当于计数器为0

代码实例:

  1. 将一个大型任务拆分成 10 个子任务
  2. 使用线程池(4个线程)并行执行这些子任务
  3. 使用 CountDownLatch 来等待所有子任务完成
  4. 在所有子任务完成后,继续执行主线程
public class Demo52 {private static int count = 0;public static void main(String[] args) throws InterruptedException {Semaphore semaphore = new Semaphore(1);Thread t1 = new Thread(()->{for (int i = 0; i <50000; i++) {try {semaphore.acquire();count++;semaphore.release();} catch (InterruptedException e) {throw new RuntimeException(e);}}});Thread t2 = new Thread(()->{for (int i = 0; i <50000; i++) {try {semaphore.acquire();count++;semaphore.release();} catch (InterruptedException e) {throw new RuntimeException(e);}}});t1.start();t2.start();t1.join();t2.join();System.out.println(count);}
}
http://www.xdnf.cn/news/1341703.html

相关文章:

  • Mokker AI:一键更换照片背景的AI神器
  • 粗粮厂的基于flink的汽车实时数仓解决方案
  • selenium一些进阶方法如何使用
  • K8s快速上手-微服务篇
  • 机器学习中的聚类与集成算法:从基础到应用
  • 前端视频流处理从 0 到 “能跑”:可复制 Demo+WebGL/Worker 优化,覆盖会议 / 直播 / 监控场景
  • 【尝试】在macOS上安装cvat
  • 【51单片机】【protues仿真】基于51单片机水位监测系统
  • 鸿蒙开发中的List组件详解
  • 机器学习-集成算法
  • Django的生命周期
  • 项目1总结其三(图片上传功能)
  • leetcode-python-242有效的字母异位词
  • 阿里巴巴推出Qoder:面向真实软件开发的智能编程平台
  • 计算机视觉(opencv)实战六——图像形态学(腐蚀、膨胀、开运算、闭运算、梯度、顶帽、黑帽)
  • 计算机视觉第一课opencv(三)保姆级教学
  • 大语言模型原理(Transformer架构)
  • c# 和 c++ 怎样结合
  • Chrome 插件开发实战:从入门到进阶
  • Docker:安装配置
  • 【框架篇二】FastAPI路由与请求处理
  • Linux 网络命令大全
  • uniapp 自定义组件封装、easycom匹配规则
  • 2025-08-21 Python进阶4——错误和异常
  • 用 Python 写的自动化测试 WPF 程序的一个案例
  • 【GaussDB】使用gdb定位GaussDB编译package报错
  • Spring Boot整合Amazon SNS实战:邮件订阅通知系统开发
  • 第三阶段数据库-6:sql中函数,多表查询,运算符,索引,约束
  • 我从零开始学微积分(2)- 函数与图形
  • 与森马品牌代言人王安宇专注日常力量,再启新常服故事