当前位置: 首页 > news >正文

Java 并发编程

黑马程序员深入学习Java并发编程

进程与线程

预备知识

java8,pom.xml

<dependencies><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><version>1.18.10</version></dependency><dependency><groupId>ch.qos.logback</groupId><artifactId>logback-classic</artifactId><version>1.2.3</version></dependency>
</dependencies>

logback.xml

<?xml version="1.0" encoding="UTF-8"?>
<configurationxmlns="http://ch.qos.logback/xml/ns/logback"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://ch.qos.logback/xml/ns/logback logback.xsd"><appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender"><encoder><pattern>%date{HH:mm:ss} [%t] %logger - %m%n</pattern></encoder></appender><logger name="c" level="debug" additivity="false"><appender-ref ref="STDOUT"/></logger><root level="ERROR"><appender-ref ref="STDOUT"/></root>
</configuration>

进程和线程、并发和并行、同步和异步的概念参考课后文档

注意:单核 cpu 下只有并发,多线程不能实际提高程序运行效率,只是为了能够在不同的任务之间切换;多核 cpu 既有并发也有并行,可以真正提高运行效率。

Java线程

创建和运行线程

方法一:直接使用Thread,用了匿名内部类的写法

Thread t1 = new Thread("t1") {@Overridepublic void run() {log.debug("hello");}
};
t1.start(); //交给操作系统的任务调度器

方法二:Runnable接口,任务和线程分开,更推荐这种写法

Runnable r = () -> {log.debug("hello");
};
Thread t1 = new Thread(r, "t1");
t1.start();

Thread源码:run()方法里,如果指定了Runnable,就优先使用Runnable对象的run()方法

方法三:FutureTask接收Callable类型的参数,处理有返回值的情况。
FutureTask间接实现了Runnable接口,get方法可以获取任务的执行结果。

FutureTask<Integer> task = new FutureTask<>(new Callable<Integer>() {@Overridepublic Integer call() throws Exception { //可以用lambda表达式简化log.debug("hello");return 100;}
});
Thread t1 = new Thread(task, "t1");
t1.start();
System.out.println(task.get()); //get()是一个阻塞方法

查看和杀死进程、 jconsole

参考课后文档

栈帧原理

每个线程启动后,虚拟机就会为其分配一块栈内存。每个栈由多个 栈帧(Frame) 组成,对应着每次方法调用时所占用的内存。每个线程只能有一个 活动栈帧,对应着当前正在执行的那个方法

局部变量存放在栈帧内部的局部变量表中。

栈帧图解:
在这里插入图片描述
类加载:将类的字节码加载到java虚拟机中。字节码放在方法区
程序计数器:指向下一条要执行的JVM指令

多线程、上下文切换

断点模式要选成Thread,不要选All,否则看不到两个线程同时运行的效果

每个线程都有独立的栈内存

线程上下文切换:具体看课后文档

当线程上下文发生时,需要由操作系统保存当前线程的状态,并恢复另一个线程的状态,Java 中对应的概念就是程序计数器,它的作用是记住下一条 jvm 指令的执行地址,是线程私有的

状态包括程序计数器、虚拟机栈中每个栈帧的信息,如局部变量、操作数栈、返回地址等。
线程上下文频繁发生会影响性能(线程不是越多越好)
在这里插入图片描述

常见方法

调用 sleep() 会让当前线程从 Running 进入 Timed Waiting 状态(阻塞)
其它线程可以使用 interrupt 方法打断正在睡眠的线程,这时 sleep 方法会抛出 InterruptedException

yield() 会从运行状态进入就绪状态,具体实现依赖操作系统的任务调度器(可能出现想让但没让出去的情况)

setPriority() 设置线程优先级,会提示调度器优先调度该线程,但它仅仅是一个提示,调度器可以忽略它。
如果 cpu 比较忙,那么优先级高的线程会获得更多的时间片,但 cpu 闲时,优先级几乎没作用

sleep() 的一个应用:
在没有利用 cpu 来计算时,不要让 while(true) 空转浪费 cpu,这时可以使用 yield 或 sleep 来让出 cpu 的使用权给其他程序

while(true) {try {Thread.sleep(50);} catch (InterruptedException e) {e.printStackTrace();}
}

join() 同步等待线程,可以带参数n,表示最多等待n毫秒

过时的方法:stop()、suspend()、resume(),这三个方法会破坏同步代码块,造成死锁

常见方法(interrupt方法)

打断阻塞中的线程(wait、sleep、join),会清空打断状态,并抛出异常。只有打断非阻塞中的线程,打断标记才会变成true。

interrupt() 并不会真正打断线程,而是设置一个布尔类型的打断标记。想让线程停下来,依靠的是利用 isInterrupted() 来设置判断条件,手动设置线程什么时候停止。

当打断标记为true时,遇到阻塞的情况会抛出异常,并且将打断标记置为false

两阶段终止模式(interrupt)

在一个线程T1里优雅地终止线程T2

例:系统健康状态监控(定时监控),判断是否被打断要分两种情况:运行状态下被打断和睡眠中被打断
在这里插入图片描述

public class JUC_test {public static void main(String[] args) throws ExecutionException, InterruptedException {TwoPhaseTermination tpt = new TwoPhaseTermination();tpt.start();Thread.sleep(8000);tpt.stop();}
}
@Slf4j(topic = "c.TwoPhaseTermination")
class TwoPhaseTermination {private Thread monitor;//启动监控线程public void start() {monitor = new Thread(() -> {while (true) {Thread current = Thread.currentThread();if(current.isInterrupted()) {//料理后事log.debug("料理后事...");break;}try {Thread.sleep(2000);log.debug("执行监控记录...");} catch (InterruptedException e) { //睡眠中被打断e.printStackTrace();//重新设置打断标记current.interrupt();}}});monitor.start();}//停止监控线程public void stop() {monitor.interrupt();}
}

注意:线程有一个静态方法 interrupted(),也是判断是否被打断,但是该方法会清除打断标记

interrupt打断park线程

静态方法 LockSupport.park() 也可以将线程暂停,interrupt 打断park线程不会清除打断标记,也不会抛异常

如果打断标记为true,那么park()将无法暂停线程。此时要用 interrupted() 清除打断标记

@Slf4j(topic = "c.JUC_test")
public class JUC_test {public static void main(String[] args) throws ExecutionException, InterruptedException {Thread t1 = new Thread(() -> {log.debug("park...");LockSupport.park();log.debug("unpark...");log.debug("打断状态:{}", Thread.interrupted()); //打断标记设为falseLockSupport.park(); //如果打断标记为true,该方法不能暂停线程log.debug("unpark...");}, "t1");t1.start();Thread.sleep(2000);t1.interrupt();}
}

守护线程

默认情况下,Java 进程需要等待所有线程都运行结束,才会结束。有一种特殊的线程叫做守护线程,只要其它非守护线程运行结束了,即使守护线程的代码没有执行完,也会强制结束。

Thread t1 = new Thread(() -> {log.debug("开始运行...");sleep(5);log.debug("运行结束...");
}, "daemon");
// 设置该线程为守护线程
t1.setDaemon(true);
t1.start();
sleep(1);
log.debug("运行结束...");

垃圾回收器线程就是一种守护线程

Tomcat 中的 Acceptor 和 Poller 线程都是守护线程,所以 Tomcat 接收到 shutdown 命令后,不会等
待它们处理完当前请求

五种状态

五种状态是从 操作系统 层面来描述的
请添加图片描述

  • 【初始状态】:仅是在语言层面创建了线程对象,还未与操作系统线程关联
  • 【可运行状态】:(就绪状态)指该线程已经被创建(与操作系统线程关联),可以由 CPU 调度执行
  • 【运行状态】:指获取了 CPU 时间片运行中的状态。当 CPU 时间片用完,会从【运行状态】转换至【可运行状态】,会导致线程的上下文切换
  • 【阻塞状态】:如果调用了阻塞 API,如 BIO 读写文件,这时该线程实际不会用到 CPU,会导致线程上下文切换,进入【阻塞状态】。等 BIO 操作完毕,会由操作系统唤醒阻塞的线程,转换至【可运行状态】。与【可运行状态】的区别是,对【阻塞状态】的线程来说只要它们一直不唤醒,调度器就一直不会考虑调度它们
  • 【终止状态】表示线程已经执行完毕,生命周期已经结束,不会再转换为其它状态

六种状态

六种状态是从 Java API 层面来描述的(Thread类的源码指定了六种state)
请添加图片描述

  • NEW:线程刚被创建,但是还没有调用 start() 方法
  • RUNNABLE:调用了 start() 方法之后,注意,Java API 层面的 RUNNABLE 状态涵盖了 操作系统 层面的【可运行状态】、【运行状态】和 【阻塞状态】(由于 BIO 导致的线程阻塞,在 Java 里无法区分,仍然认为是可运行)
  • BLOCKED,WAITING,TIMED_WAITING 都是 Java API 层面对【阻塞状态】的细分,后面会在状态转换一节详述
  • TERMINATED:线程代码运行结束

注:IO读写是操作系统层面的阻塞,为RUNNABLE状态;sleep的线程为TIMED_WAITING状态;没有拿到锁的线程为BLOCKED状态,join的线程为WAITING状态

共享模型之管程 - 线程安全分析

上下文切换分析

两个线程,一个自增5000次,一个自减5000次,结果不为0

public static void main(String[] args) throws InterruptedException {Thread t1 = new Thread(() -> {for (int i = 0; i < 5000; i++) {counter++;}}, "t1");Thread t2 = new Thread(() -> {for (int i = 0; i < 5000; i++) {counter--;}}, "t2");t1.start();t2.start();t1.join();t2.join();log.debug("{}",counter);
}

问题分析:Java 中对静态变量的自增,自减并不是原子操作。

i ++ 对应的JVM指令:

getstatic i // 获取静态变量i的值
iconst_1 // 准备常量1
iadd // 自增
putstatic i // 将修改后的值存入静态变量i

i -- 对应的JVM指令:

getstatic i // 获取静态变量i的值
iconst_1 // 准备常量1
isub // 自减
putstatic i // 将修改后的值存入静态变量i

临界区与竞态条件

多个线程对共享资源进行读写操作时发生指令交错,就会出现问题

一段代码块内如果存在对共享资源的多线程读写操作,称这段代码块为临界区

static int counter = 0;
static void increment()
// 临界区
{counter++;
}
static void decrement()
// 临界区
{counter--;
}

多个线程在临界区内执行,由于代码的执行序列不同而导致结果无法预测,称之为发生了竞态条件

为了避免临界区的竞态条件发生,有多种手段可以达到目的:

  • 阻塞式的解决方案:synchronized,Lock
  • 非阻塞式的解决方案:原子变量

synchronized

static int counter = 0;
static final Object room = new Object();
public static void main(String[] args) throws InterruptedException {Thread t1 = new Thread(() -> {for (int i = 0; i < 5000; i++) {synchronized (room) {counter++;}}}, "t1");Thread t2 = new Thread(() -> {for (int i = 0; i < 5000; i++) {synchronized (room) {counter--;}}}, "t2");t1.start();t2.start();t1.join();t2.join();log.debug("{}",counter);
}

注意:

  • 线程A拿到了锁,其他拿不到锁的线程会进入阻塞状态(BLOCKED);
  • 即使线程A的时间片用完了,锁也不会被释放掉
  • 线程A执行完业务,释放了锁,其他线程会被唤醒

synchronized加在方法上

synchronized加在成员方法上,相当于锁住的是this对象

class Test {public synchronized void test() {}
}
等价于
class Test {public void test() {synchronized(this) {}}
}

synchronized加在静态方法上,相当于锁住的是类对象

class Test{public synchronized static void test() {}
}
等价于
class Test{public static void test() {synchronized(Test.class) {}}
}

线程八锁

情况三:易错

@Slf4j(topic = "c.JUC_test")
public class JUC_test {public static void main(String[] args) {Number n1 = new Number();new Thread(()->{ n1.a(); }).start();new Thread(()->{ n1.b(); }).start();new Thread(()->{ n1.c(); }).start();}
}
@Slf4j(topic = "c.Number")
class Number {public synchronized void a() throws InterruptedException {sleep(1);log.debug("1");}public synchronized void b() {log.debug("2");}public void c() {log.debug("3");}
}

线程安全分析

public static void test1() {int i = 10;i++;
}

以上方法不会出现线程安全问题。因为每个线程调用 test1() 方法时,局部变量 i 会在每个线程的栈帧内存中被创建多份,因此不存在共享。
在这里插入图片描述

局部变量引用

成员变量是一个对象的引用,可能会存在线程安全问题:

@Slf4j(topic = "c.JUC_test")
public class JUC_test {static final int THREAD_NUMBER = 2;static final int LOOP_NUMBER = 200;public static void main(String[] args) {ThreadUnsafe test = new ThreadUnsafe();for (int i = 0; i < THREAD_NUMBER; i++) {new Thread(() -> {test.method1(LOOP_NUMBER);}, "Thread" + i).start();}}
}
@Slf4j(topic = "c.ThreadUnsafe")
class ThreadUnsafe {ArrayList<String> list = new ArrayList<>();public void method1(int loopNumber) {for (int i = 0; i < loopNumber; i++) {//临界区, 会产生竞态条件method2();method3();}}private void method2() {list.add("1");}private void method3() {list.remove(0);}
}

请添加图片描述
将ArrayList改成method1里的局部变量,就不会出现问题了:

@Slf4j(topic = "c.JUC_test")
public class JUC_test {static final int THREAD_NUMBER = 2;static final int LOOP_NUMBER = 200;public static void main(String[] args) {ThreadUnsafe test = new ThreadUnsafe();for (int i = 0; i < THREAD_NUMBER; i++) {new Thread(() -> {test.method1(LOOP_NUMBER);}, "Thread" + i).start();}}
}
@Slf4j(topic = "c.ThreadUnsafe")
class ThreadUnsafe {public void method1(int loopNumber) {ArrayList<String> list = new ArrayList<>();for (int i = 0; i < loopNumber; i++) {//临界区, 会产生竞态条件method2(list);method3(list);}}private void method2(ArrayList<String> list) {list.add("1");}private void method3(ArrayList<String> list) {list.remove(0);}
}

注:method2、method3的修饰符不管是private还是public,都不会影响线程安全。

但如果局部变量的引用暴露给了外部,那么可能会引起线程安全问题。例如为 ThreadSafe 类添加子类,子类覆盖 method2 或 method3 方法:

@Slf4j(topic = "c.JUC_test")
public class JUC_test {static final int THREAD_NUMBER = 2;static final int LOOP_NUMBER = 200;public static void main(String[] args) {ThreadSafeSubClass test = new ThreadSafeSubClass();for (int i = 0; i < THREAD_NUMBER; i++) {new Thread(() -> {test.method1(LOOP_NUMBER);}, "Thread" + i).start();}}
}
@Slf4j(topic = "c.ThreadUnsafe")
class ThreadUnsafe {public void method1(int loopNumber) {ArrayList<String> list = new ArrayList<>();for (int i = 0; i < loopNumber; i++) {//临界区, 会产生竞态条件method2(list);method3(list);}}public void method2(ArrayList<String> list) {list.add("1");}public void method3(ArrayList<String> list) {list.remove(0);}
}
class ThreadSafeSubClass extends ThreadUnsafe {@Overridepublic void method3(ArrayList<String> list) {new Thread(() -> {list.remove(0);}).start();}
}

以上这段代码将method2、method3的修饰符改为了public,新建ThreadSafeSubClass子类继承ThreadUnsafe,重写了method3方法。此时可能会造成线程安全问题。

为了避免出现线程安全问题,最好用修饰符来修饰方法(如private、final),以防止子类影响到线程安全性

常见的线程安全类

String、Integer、StringBuffer、Random、Vector、Hashtable、java.util.concurrent 包下的类

这里说它们是线程安全的是指,多个线程调用它们同一个实例的某个方法时,是线程安全的。也可以理解为

Hashtable table = new Hashtable();new Thread(()->{table.put("key", "value1");
}).start();new Thread(()->{table.put("key", "value2");
}).start();

Hashtable的put方法是一个整体(原子性),不会被线程的上下文切换所干扰。

注意:它们多个方法的组合不是原子的,例如下面的代码,get、put方法各自都是安全的,但组合在一起使用,还是会有线程安全问题

Hashtable table = new Hashtable();
// 线程1,线程2
if(table.get("key") == null) {table.put("key", value);
}

不可变类

String、Integer 等都是不可变类,因为其内部的状态不可以改变,因此它们的方法都是线程安全的

虽然String 有 replace,substring 等方法可以改变值,但并不是真正的改变,而是创建了一个新的字符串对象并返回。(可以查看substring()方法的源码)

注:Java中的String类加上了final修饰符,目的是防止子类覆盖String的方法导致线程不安全

实例分析

例1:servlet只有一个实例,会被tomcat的多个线程共享使用

public class MyServlet extends HttpServlet {// 是否安全?Map<String,Object> map = new HashMap<>();// 是否安全?String S1 = "...";// 是否安全?final String S2 = "...";// 是否安全?Date D1 = new Date();// 是否安全?final Date D2 = new Date();public void doGet(HttpServletRequest request, HttpServletResponse response) {// 使用上述变量}
}

例2:

public class MyServlet extends HttpServlet {// 是否安全?private UserService userService = new UserServiceImpl();public void doGet(HttpServletRequest request, HttpServletResponse response) {userService.update(...);}
}
public class UserServiceImpl implements UserService {// 记录调用次数private int count = 0;public void update() {count++;}
}

例3:Spring注入的所有对象都是单例的(除非加@Scope限定)。将start设为环绕通知里的局部变量,可以避免线程安全问题。

@Aspect
@Component
public class MyAspect {// 是否安全?private long start = 0L;@Before("execution(* *(..))")public void before() {start = System.nanoTime();}@After("execution(* *(..))")public void after() {long end = System.nanoTime();System.out.println("cost time:" + (end-start));}
}

例4:

public class MyServlet extends HttpServlet {// 是否安全private UserService userService = new UserServiceImpl();public void doGet(HttpServletRequest request, HttpServletResponse response) {userService.update(...);}
}
public class UserServiceImpl implements UserService {// 是否安全private UserDao userDao = new UserDaoImpl();public void update() {userDao.update();}
}
public class UserDaoImpl implements UserDao {public void update() {String sql = "update user set password = ? where username = ?";// 是否安全try (Connection conn = DriverManager.getConnection("","","")){// ...} catch (Exception e) {// ...}}
}

例5:connection最好设为方法内的局部变量,如果设为成员变量可能会出现线程安全问题

public class UserDaoImpl implements UserDao {// 是否安全private Connection conn = null;public void update() throws SQLException {String sql = "update user set password = ? where username = ?";conn = DriverManager.getConnection("","","");// ...conn.close();}
}

例6:这么写不会有线程安全问题,但是不推荐这种写法,而是把connection设为方法内的局部变量

public class UserServiceImpl implements UserService {public void update() {UserDao userDao = new UserDaoImpl();userDao.update();}
}
public class UserDaoImpl implements UserDao {// 是否安全private Connection = null;public void update() throws SQLException {String sql = "update user set password = ? where username = ?";conn = DriverManager.getConnection("","","");// ...conn.close();}
}

例7:

public abstract class Test {public void bar() {// 是否安全SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");foo(sdf);}public abstract foo(SimpleDateFormat sdf);public static void main(String[] args) {new Test().bar();}
}

其中 foo 的行为是不确定的,可能导致不安全的发生,被称之为外星方法

习题 - 卖票

以下代码共有两个地方有对共享变量的读写操作:ticketWindow.sell() 和 sellCount.add()。

ticketWindow.sell()有线程安全问题,因此要给sell方法加synchronized关键字。如果sellCount不是Vector,而是ArrayList,那么也会有线程安全问题。

@Slf4j(topic = "c.ExerciseSell")
public class ExerciseSell {public static void main(String[] args) {TicketWindow ticketWindow = new TicketWindow(2000);List<Thread> list = new ArrayList<>();// 用来存储买出去多少张票List<Integer> sellCount = new Vector<>();for (int i = 0; i < 2000; i++) {Thread t = new Thread(() -> {try {Thread.sleep(100);} catch (InterruptedException e) {throw new RuntimeException(e);}// 分析这里的竞态条件int count = ticketWindow.sell(randomAmount());sellCount.add(count);});list.add(t);t.start();}list.forEach((t) -> {try {t.join();} catch (InterruptedException e) {e.printStackTrace();}});// 买出去的票求和log.debug("selled count:{}",sellCount.stream().mapToInt(c -> c).sum());// 剩余票数log.debug("remainder count:{}", ticketWindow.getCount());}// Random 为线程安全static Random random = new Random();// 随机 1~5public static int randomAmount() {return random.nextInt(5) + 1;}
}class TicketWindow {private int count;public TicketWindow(int count) {this.count = count;}public int getCount() {return count;}public int sell(int amount) {if (this.count >= amount) {this.count -= amount;return amount;} else {return 0;}}
}

注:可以写一个Java脚本,执行多次
在target/classes下打开cmd,输入以下命令:

for /L %n in (1, 1, 10) do java com.ljc.ExerciseSell

习题 - 转账

在transfer()方法上加synchronized关键字仍然会有线程安全问题,原因是synchronized关键字是给this对象加锁,但这里有a和b两个对象,相当于两把锁。

正确方法是添加synchronized代码块,给Account.class加锁(但这样仍有问题,同一时刻只能有两个线程转账,其他线程都得等待。后面的课程会优化)

@Slf4j(topic = "c.ExerciseTransfer")
public class ExerciseTransfer {public static void main(String[] args) throws InterruptedException {Account a = new Account(1000);Account b = new Account(1000);Thread t1 = new Thread(() -> {for (int i = 0; i < 1000; i++) {a.transfer(b, randomAmount());}}, "t1");Thread t2 = new Thread(() -> {for (int i = 0; i < 1000; i++) {b.transfer(a, randomAmount());}}, "t2");t1.start();t2.start();t1.join();t2.join();// 查看转账2000次后的总金额log.debug("total:{}",(a.getMoney() + b.getMoney()));}// Random 为线程安全static Random random = new Random();// 随机 1~100public static int randomAmount() {return random.nextInt(100) +1;}
}
class Account {private int money;public Account(int money) {this.money = money;}public int getMoney() {return money;}public void setMoney(int money) {this.money = money;}public synchronized void transfer(Account target, int amount) {if (this.money > amount) {this.setMoney(this.getMoney() - amount);target.setMoney(target.getMoney() + amount);}}
}

共享模型之管程 - synchronized原理

Java对象头

内存中的java对象都是由对象头和对象体(成员变量)组成。
在这里插入图片描述
普通对象的对象头由 Klass wordMark word 组成。
Klass word表示对象是什么类型,是一个指针,指向对象所属的class。Mark word的结构如图所示:
在这里插入图片描述
注意:32位机器中,int是4字节,但Integer在内存中是8+4字节,因为还要存储对象头

Monitor

Monitor 被翻译为监视器管程,是一个操作系统提供的锁对象,底层是C++实现的

每个 Java 对象都可以关联一个 Monitor 对象,如果使用 synchronized 给对象上锁(重量级锁)之后,该对象头的 Mark Word 中就被设置为指向 Monitor 对象的指针

Monitor 结构如下:
在这里插入图片描述
假设Thread-2执行synchronized(obj),获取到了锁。此时 obj 的 Mark word 的最后两位变成10,前面30位用来存放Monitor的地址。Monitor 的 Owner 置为 Thread-2。如果有其它线程来尝试获取锁,发现Monitor 的 Owner不为null,会进入 EntryList 被阻塞。Thread-2执行完,Monitor 的 Owner 置为null,唤醒EntryList的线程。

字节码角度分析Monitor原理:(参考课后资料原理篇的 synchronized原理)
拿到lock的引用,复制一份放到变量slot_1里;将 lock对象 MarkWord 置为 Monitor 指针;6~11行是实现count++的字节码;14行从slot_1里获取lock引用,将 lock对象 MarkWord 重置(还原之前Mark word中存储的hashcode、age等), 唤醒 EntryList。
19~24行是异常处理,一旦出现异常,立即释放锁。

轻量级锁

如果一个对象有多个线程要加锁,但加锁的时间是错开的(也就是没有竞争),那么可以使用轻量级锁来优化。轻量级锁对使用者是透明的,即语法仍然是 synchronized

假设有两个方法同步块,利用同一个对象加锁

static final Object obj = new Object();
public static void method1() {synchronized( obj ) {// 同步块 Amethod2();}
}
public static void method2() {synchronized( obj ) {// 同步块 B}
}

每个线程的栈帧创建一个锁记录对象(Lock Record),Object reference是一个指针,指向Object锁对象。
CAS:交换锁记录的内容(前30位是锁记录的地址,最后两位00表示轻量级锁)和Object的 Mark Word
在这里插入图片描述
CAS替换成功:
在这里插入图片描述
如果替换的时候发现Object的 Mark Word 已经是00,那么cas失败,有两种情况:

  • 如果是其它线程已经持有了该 Object 的轻量级锁,这时表明有竞争,进入锁膨胀过程
  • 如果是自己执行了 synchronized 锁重入,那么再添加一条 Lock Record 作为重入的计数,内容为null

在这里插入图片描述
有几个锁记录对象,说明重入了几次

当退出 synchronized 代码块(解锁时)如果有取值为 null 的锁记录,说明有重入,则清除掉该条锁记录,表示重入计数减一;

当退出 synchronized 代码块(解锁时)锁记录的值不为 null,使用 cas 将 Mark Word 的值恢复给对象头

  • 成功,则解锁成功
  • 失败,说明轻量级锁进行了锁膨胀或已经升级为重量级锁,进入重量级锁解锁流程

在这里插入图片描述

锁膨胀

如果在尝试加轻量级锁的过程中,CAS 操作无法成功,这时一种情况就是有其它线程为此对象加上了轻量级锁(有竞争),这时需要进行锁膨胀,将轻量级锁变为重量级锁。

当 Thread-1 进行轻量级加锁时,发现Thread-0 已经对该对象加了轻量级锁
在这里插入图片描述
这时 Thread-1 加轻量级锁失败,进入锁膨胀流程:

  • 为 Object 对象申请 Monitor 锁,让 Object 指向重量级锁地址(Mark word变成Monitor的地址)
  • 然后自己进入 Monitor 的 EntryList BLOCKED
    在这里插入图片描述
    当 Thread-0 退出同步块解锁时,使用 cas 将 Mark Word 的值恢复给对象头,失败。这时会进入重量级解锁
    流程,即按照 Monitor 地址找到 Monitor 对象,设置 Owner 为 null,唤醒 EntryList 中 BLOCKED 线程

自旋优化

  • 重量级锁竞争的时候,还可以使用自旋来进行优化:线程先不进入阻塞,而是循环几次尝试获取锁,如果此时该线程自旋成功,就可以避免阻塞带来的上下文切换。自旋失败则进入阻塞。

  • Java 6 之后自旋锁是自适应的,比如对象刚刚的一次自旋操作成功过,那么认为这次自旋成功的可能性会高,就多自旋几次;反之,就少自旋甚至不自旋

  • Java 7 之后不能人为控制是否开启自旋功能,而是java底层来控制的

偏向锁

轻量级锁在没有竞争时,每次发生重入,生成的锁记录仍然需要执行 CAS 操作

Java 6 中引入了偏向锁来做进一步优化:第一次加轻量级锁时,使用 CAS 将 线程 ID 设置到锁对象的 Mark Word,之后每次获取锁,只要发现这个线程 ID是自己的就表示没有竞争,不用重新 CAS。以后只要不发生竞争,这个锁对象就归该线程所有。

偏向锁细节

在这里插入图片描述

  • 如果开启了偏向锁(默认开启),那么对象创建后,markword 值为 0x05,即最后 3 位为 101,并且thread、epoch、age 都为 0

  • 偏向锁是默认是延迟的,不会在程序启动时立即生效,如果想避免延迟,可以加 VM 参数
    XX:BiasedLockingStartupDelay=0 来禁用延迟

  • JDK 15之后,偏向锁就被标记为废除状态需要手动开启,并且会报warning。设置 VM 参数
    -XX:+UseBiasedLocking 手动开启偏向锁

  • 如果没有开启偏向锁,那么对象创建后,markword 值为 0x01 即最后 3 位为 001,这时它的 hashcode、
    age 都为 0。第一次用到 hashcode 时才会赋值(如果没有手动调用 hashcode() 方法,hashcode 始终为0)

  • 一个可偏向的对象,调用了 hashcode() 方法之后,会撤销掉偏向锁,此时markword最后 3 位变成 001,且hashcode不再是0。

注:IDEA设置VM参数的方法如下:
在这里插入图片描述
使用 jol 第三方工具验证结论,导入依赖:

<dependency><groupId>org.openjdk.jol</groupId><artifactId>jol-core</artifactId><version>0.10</version>
</dependency>

新建JolUtils类,扩展jol

public class JolUtils {public static String toPrintableSimple(Object o) {return getHeader64Bit(o);}public static void main(String[] args) {System.out.println( getHeader64Bit(new Object()));}public static String getHeader64Bit(Object o) {VirtualMachine vm = VM.current();long word = vm.getLong(o, 0);List<String> list = new ArrayList<>(8);for (int i = 0; i < 8; i++) {list.add(toBinary((word >> i * 8) & 0xFF) );}Collections.reverse(list);return String.join(" ",list);}private static String toBinary(long x) {StringBuilder s = new StringBuilder(Long.toBinaryString(x));int deficit = 8 - s.length();for (int c = 0; c < deficit; c++) {s.insert(0, "0");}return s.toString();}
}

新建TestBiased类:

@Slf4j(topic = "c.TestBiased")
public class TestBiased {// 手动开启偏向锁 -XX:+UseBiasedLocking// 关闭偏向锁延迟 -XX:BiasedLockingStartupDelay=0public static void main(String[] args) throws IOException, InterruptedException {Dog d = new Dog();System.out.println(JolUtils.toPrintableSimple(d));new Thread(() -> {log.debug("synchronized 前");System.out.println(JolUtils.toPrintableSimple(d));synchronized (d) {log.debug("synchronized 中");//前54位是操作系统设置的线程IDSystem.out.println(JolUtils.toPrintableSimple(d));}log.debug("synchronized 后");System.out.println(JolUtils.toPrintableSimple(d));}, "t1").start();}
}
class Dog {}

偏向锁撤销

有三种情况会导致偏向锁被撤销:调用hashcode()、其他线程使用偏向锁对象、调用wait/notify

情况一:调用了对象的 hashCode。偏向锁的对象 MarkWord 中存储的是线程 id,如果调用 hashCode 会导致偏向锁被撤销。轻量级锁会在锁记录中记录 hashCode;重量级锁会在 Monitor 中记录 hashCode

情况二:当有其它线程使用偏向锁对象时,会将偏向锁升级为轻量级锁

@Slf4j(topic = "c.TestBiased")
public class TestBiased {public static void main(String[] args) throws IOException, InterruptedException {Dog d = new Dog();Thread t1 = new Thread(() -> {log.debug(JolUtils.toPrintableSimple(d));synchronized (d) {log.debug(JolUtils.toPrintableSimple(d));}log.debug(JolUtils.toPrintableSimple(d));synchronized (TestBiased.class) {TestBiased.class.notify();}}, "t1");t1.start();//等待t1的代码执行完并释放锁,t2才执行Thread t2 = new Thread(() -> {synchronized (TestBiased.class) {try {TestBiased.class.wait();} catch (InterruptedException e) {e.printStackTrace();}}log.debug(JolUtils.toPrintableSimple(d));synchronized (d) {log.debug(JolUtils.toPrintableSimple(d));}log.debug(JolUtils.toPrintableSimple(d));}, "t2");t2.start();}
}
class Dog {}

情况三:调用 wait/notify 也会撤销偏向锁,因为调用 wait/notify 会把对象升级成重量级锁

@Slf4j(topic = "c.TestBiased")
public class TestBiased {public static void main(String[] args) throws IOException, InterruptedException {Dog d = new Dog();Thread t1 = new Thread(() -> {log.debug(JolUtils.toPrintableSimple(d));synchronized (d) {log.debug(JolUtils.toPrintableSimple(d));try {d.wait();} catch (InterruptedException e) {e.printStackTrace();}log.debug(JolUtils.toPrintableSimple(d));}}, "t1");t1.start();new Thread(() -> {try {Thread.sleep(6000);} catch (InterruptedException e) {e.printStackTrace();}synchronized (d) {log.debug("notify");d.notify();}}, "t2").start();}
}
class Dog {}

批量重偏向

撤销偏向锁会带来性能损耗。当撤销偏向锁 阈值超过 20 次后,jvm 会觉得,我是不是偏向错了呢,于是会在给这些对象加锁时重新偏向至加锁线程

@Slf4j(topic = "c.TestBiased")
public class TestBiased {public static void main(String[] args) throws IOException, InterruptedException {Vector<Dog> list = new Vector<>();Thread t1 = new Thread(() -> {for (int i = 0; i < 30; i++) {Dog d = new Dog();list.add(d);synchronized (d) {log.debug(i + "\t" + JolUtils.toPrintableSimple(d));}}synchronized (list) {list.notify();}}, "t1");t1.start();Thread t2 = new Thread(() -> {synchronized (list) {try {list.wait();} catch (InterruptedException e) {e.printStackTrace();}}log.debug("===============> ");for (int i = 0; i < 30; i++) {Dog d = list.get(i);log.debug(i + "\t" + JolUtils.toPrintableSimple(d));synchronized (d) {log.debug(i + "\t" + JolUtils.toPrintableSimple(d));}log.debug(i + "\t" + JolUtils.toPrintableSimple(d));}}, "t2");t2.start();}
}
class Dog {}

T1先循环30次,给对象加偏向锁,然后T2开始执行。T2的前20轮循环加的是轻量级锁,第21轮以及之后加的都是偏向锁。

批量撤销

当撤销偏向锁 阈值超过 40 次 后,jvm 会觉得自己确实偏向错了,根本就不该偏向。于是整个类的所有对象都会变为不可偏向的,新建的对象也是不可偏向的。

@Slf4j(topic = "c.TestBiased")
public class TestBiased {static Thread t1,t2,t3;public static void main(String[] args) throws IOException, InterruptedException {Vector<Dog> list = new Vector<>();int loopNumber = 40;t1 = new Thread(() -> {for (int i = 0; i < loopNumber; i++) {Dog d = new Dog();list.add(d);synchronized (d) {log.debug(i + "\t" + JolUtils.toPrintableSimple(d));}}LockSupport.unpark(t2);}, "t1");t1.start();t2 = new Thread(() -> {LockSupport.park();log.debug("===============> ");for (int i = 0; i < loopNumber; i++) {Dog d = list.get(i);log.debug(i + "\t" + JolUtils.toPrintableSimple(d));synchronized (d) {log.debug(i + "\t" + JolUtils.toPrintableSimple(d));}log.debug(i + "\t" + JolUtils.toPrintableSimple(d));}LockSupport.unpark(t3);}, "t2");t2.start();t3 = new Thread(() -> {LockSupport.park();log.debug("===============> ");for (int i = 0; i < loopNumber; i++) {Dog d = list.get(i);log.debug(i + "\t" + JolUtils.toPrintableSimple(d));synchronized (d) {log.debug(i + "\t" + JolUtils.toPrintableSimple(d));}log.debug(i + "\t" + JolUtils.toPrintableSimple(d));}}, "t3");t3.start();t3.join();log.debug(JolUtils.toPrintableSimple(new Dog())); //新建的类也是不可偏向的,末尾是001}
}
class Dog {}

锁消除

即时编译器(JIT)会进一步优化字节码。如果发现某个对象没有被共享,即使加了锁,JIT也会消除锁。

-XX:-EliminateLocks 禁用锁消除

共享模型之管程 - wait / notify

wait / notify原理

在这里插入图片描述

  • Owner 线程发现条件不满足,调用 wait 方法,即可进入 WaitSet 变为WAITING 或 TIME WAITING状态

  • BLOCKED 和 WAITING 的线程都处于阻塞状态,不占用 CPU 时间片

  • BLOCKED 线程会在 Owner 线程释放锁时唤醒;
    WAITING 线程会在 Owner 线程调用 notify 或 notifyAll 时唤醒,但唤醒后并不意味着立刻获得锁,仍需进入 EntryList 重新竞争

相关api

obj.wait():让进入 object 监视器的线程到 waitSet 等待
obj.wait(long n):有时间限制的等待
obj.notify():在 object 上正在 waitSet 等待的线程中挑一个唤醒
obj.notifyAll():让 object 上正在 waitSet 等待的线程全部唤醒

它们都是线程之间进行协作的手段,都属于 Object 对象的方法。必须获得此对象的锁,才能调用这几个方法

@Slf4j(topic = "c.JUC_test")
public class JUC_test {private static final Object lock = new Object();public static void main(String[] args) throws InterruptedException {new Thread(() -> {synchronized (lock) {try {obj.wait();log.debug("t1...");} catch (InterruptedException e) {throw new RuntimeException(e);}}}, "t1").start();new Thread(() -> {synchronized (lock) {try {obj.wait();log.debug("t2...");} catch (InterruptedException e) {throw new RuntimeException(e);}}}, "t2").start();Thread.sleep(3000);synchronized (lock) {obj.notifyAll();}}
}

注:给lock加上final关键字,可以防止改变lock的引用,导致锁住不同的对象。

使用wait / notify的正确姿势

sleep(long n) 和 wait(long n) 的相同点:都是TIMED_WAITING状态

sleep(long n) 和 wait(long n) 的区别:

  • sleep 是 Thread 方法,而 wait 是 Object 的方法
  • wait 需要和 synchronized 一起用,而 sleep 不需要
  • sleep 在睡眠的同时,不会释放对象锁,但 wait 在等待的时候会释放对象锁

虚假唤醒:notify 只能随机唤醒一个 WaitSet 中的线程,这时如果有其它线程也在等待,那么就可能唤醒不了正确的线程。
解决方法是 notifyAll,并且 配合while循环,让错误唤醒的线程继续wait。

@Slf4j(topic = "c.JUC_test")
public class JUC_test {static final Object room = new Object();static boolean hasCigarette = false;static boolean hasTakeout = false;public static void main(String[] args) throws InterruptedException {new Thread(() -> {synchronized (room) {log.debug("有烟没?[{}]", hasCigarette);while (!hasCigarette) {log.debug("没烟,先歇会!");try {room.wait();} catch (InterruptedException e) {e.printStackTrace();}}log.debug("有烟没?[{}]", hasCigarette);if (hasCigarette) {log.debug("可以开始干活了");} else {log.debug("没干成活...");}}}, "小南").start();new Thread(() -> {synchronized (room) {log.debug("外卖送到没?[{}]", hasTakeout);while(!hasTakeout) {log.debug("没外卖,先歇会!");try {room.wait();} catch (InterruptedException e) {e.printStackTrace();}}log.debug("外卖送到没?[{}]", hasTakeout);if (hasTakeout) {log.debug("可以开始干活了");} else {log.debug("没干成活...");}}}, "小女").start();Thread.sleep(1000);new Thread(() -> {synchronized (room) {hasTakeout = true;log.debug("外卖到了噢!");//room.notify();room.notifyAll();}}, "送外卖的").start();}
}

总结:正确使用wait / notify

synchronized(lock) {while(条件不成立) {lock.wait();}// 干活
}
//另一个线程
synchronized(lock) {lock.notifyAll();
}

共享模型之管程 - 保护性暂停

保护性暂停

保护性暂停(Guarded Suspension),用在一个线程等待另一个线程的执行结果

  • 有一个结果需要从一个线程传递到另一个线程,让他们关联同一个 GuardedObject
  • 如果有结果不断从一个线程到另一个线程,那么可以使用消息队列(见生产者/消费者)
  • JDK 中,join、Future 的实现,采用的就是此模式
  • 因为要等待另一方的结果,因此归类到同步模式

在这里插入图片描述

@Slf4j(topic = "c.JUC_test")
public class JUC_test {public static void main(String[] args) throws InterruptedException {GuardedObject guardedObject = new GuardedObject();new Thread(() -> {Object response = guardedObject.get();log.debug("接收到response为{}", response);}, "t1").start();new Thread(() -> {log.debug("正在下载中...");try {Thread.sleep(3000);Integer response = 2025;guardedObject.set(response);log.debug("下载完成...");} catch (InterruptedException e) {throw new RuntimeException(e);}}, "t2").start();}
}
class GuardedObject {private Object response;public Object get() {synchronized (this) {while (response == null) {try {this.wait();} catch (InterruptedException e) {throw new RuntimeException(e);}}return response;}}public void set(Object response) {synchronized (this) {this.response = response;this.notifyAll();}}
}

实现超时等待

注意:wait() 里的实参,不能为 timeout,因为如果发生了虚假唤醒,那么会再一次等待 timeout。正确方法是每一轮循环应该记录一个剩余时间 waitTime,wait() 里传的是waitTime。

public Object get(long timeout) {synchronized (this) {long beginTime = System.currentTimeMillis();long passTime = 0; //经过了多长时间while (response == null) {long waitTime = timeout - passTime; //每一轮的剩余时间if(waitTime <= 0) {break;}try {this.wait(waitTime); //每一轮等待剩余时间} catch (InterruptedException e) {throw new RuntimeException(e);}passTime = System.currentTimeMillis() - beginTime;}return response;}
}

join 原理

保护性暂停是一个线程等待另一个线程的结果,join 是一个线程等待另一个线程的结束。

join 源码

public final synchronized void join(long millis)throws InterruptedException {long base = System.currentTimeMillis();long now = 0;if (millis < 0) {throw new IllegalArgumentException("timeout value is negative");}if (millis == 0) { //如果时执行的join(0)while (isAlive()) { //如果线程是运行状态,就会执行下面的等待wait(0);               }} else { //如果是执行的join(time)while (isAlive()) { //如果线程时运行状态long delay = millis - now;    if (delay <= 0) {break;}wait(delay); //等待delay时间后自动返回继续执行now = System.currentTimeMillis() - base;}}
}

解耦实现多任务版 GuardedObject

图中 Futures 相当于居民楼一层的信箱(每个信箱有房间编号),左侧的 t0,t2,t4 相当于等待邮件的居民,右侧的 t1,t3,t5 相当于邮递员

如果需要在多个类之间使用 GuardedObject 对象,将 GuardedObject 对象作为参数传递不是很方便,因此设计一个用来解耦的中间类,这样不仅能够解耦【结果等待者】和【结果生产者】,还能够同时支持多个任务的管理(即多个 GuardedObject 对象的管理)
在这里插入图片描述
这种设计的特点是:有几个居民,就有几个邮递员,他们是一一对应的

@Slf4j(topic = "c.JUC_test")
public class JUC_test {public static void main(String[] args) throws InterruptedException {for(int i = 0; i < 3; i ++) {new People().start();}Thread.sleep(1000);//循环所有的idfor (Integer id : Mailboxes.getIds()) {new Postman(id, "内容为:" + id).start();}}
}
@Slf4j(topic = "c.People")
class People extends Thread {@Overridepublic void run() {GuardedObject guardedObject = Mailboxes.createGuardedObject();log.debug("开始收信 id:{}", guardedObject.getId());Object mail = guardedObject.get(5000);//等待5slog.debug("收到信 id:{},内容:{}", guardedObject.getId(), mail);}
}
@Slf4j(topic = "c.Postman")
class Postman extends Thread {private int id;private String mail; //送信的内容public Postman(int id, String mail) {this.id = id;this.mail = mail;}@Overridepublic void run() {GuardedObject guardedObject = Mailboxes.getGuardedObject(id);guardedObject.set(mail);log.debug("送信 id:{},内容:{}", guardedObject.getId(), mail);}
}//中间类,用于解耦,具有通用性
class Mailboxes {private static Map<Integer, GuardedObject> boxes = new Hashtable<>();private static int id = 1;//产生唯一ID,要加上synchronized保证线程安全private static synchronized int generateId() {return id ++;}public static GuardedObject createGuardedObject() {GuardedObject go = new GuardedObject(generateId());boxes.put(go.getId(), go);return go;}public static GuardedObject getGuardedObject(int id) {return boxes.remove(id); //获得value的同时从集合中删除}//获得所有的keypublic static Set<Integer> getIds() {return boxes.keySet();}
}
class GuardedObject {private int id;public GuardedObject(int id) {this.id = id;}public int getId() {return id;}private Object response;public Object get(long timeout) {synchronized (this) {long beginTime = System.currentTimeMillis();long passTime = 0; //经过了多长时间while (response == null) {long waitTime = timeout - passTime; //每一轮的剩余时间if(waitTime <= 0) {break;}try {this.wait(waitTime); //每一轮等待剩余时间} catch (InterruptedException e) {throw new RuntimeException(e);}passTime = System.currentTimeMillis() - beginTime;}return response;}}public void set(Object response) {synchronized (this) {this.response = response;this.notifyAll();}}
}

共享模型之管程 - 生产者消费者

生产者消费者

在这里插入图片描述
保护性暂停是同步模式,生产者生产之后消费者可以立即拿到结果。而生产者消费者是异步模式,生产者生产的数据在队列中,不一定立即被消费者拿到。

与前面的保护性暂停中的 GuardObject 不同,不需要产生结果和消费结果的线程一一对应

消费队列可以用来平衡生产和消费的线程资源;生产者仅负责产生结果数据,不关心数据该如何处理,而消费者专心处理结果数据

消息队列是有容量限制的,满时不会再加入数据,空时不会再消耗数据

JDK 中各种阻塞队列,采用的就是这种模式

@Slf4j(topic = "c.JUC_test")
public class JUC_test {public static void main(String[] args) throws InterruptedException {MessageQueue messageQueue = new MessageQueue(2);for(int i = 0; i < 5; i ++) {int id = i; //lambda表达式里引用变量new Thread(() -> {messageQueue.put(new Message(id, "值" + id));}, "生产者" + i).start();}new Thread(() -> {while(true) {try {Thread.sleep(1000);Message message = messageQueue.get();} catch (InterruptedException e) {throw new RuntimeException(e);}}}, "消费者").start();}
}//消息队列类,用于线程间的通信
@Slf4j(topic = "c.MessageQueue")
class MessageQueue {//消息队列集合,双向队列LinkedListprivate LinkedList<Message> list = new LinkedList<>();//消息队列容量private int capacity;public MessageQueue(int capacity) {this.capacity = capacity;}//获取消息public Message get() {synchronized (list) {//检查队列是否为空while (list.isEmpty()) {try {log.debug("队列为空,消费者线程等待...");list.wait();} catch (InterruptedException e) {throw new RuntimeException(e);}}Message message = list.removeFirst(); //从队列头部获取消息log.debug("消费者取出消息,message为{}", message);list.notifyAll();return message;}}//存入消息public void put(Message message) {synchronized (list) {//检查队列是否为满while (list.size() == capacity) {try {log.debug("队列已满,生产者线程等待...");list.wait();} catch (InterruptedException e) {throw new RuntimeException(e);}}list.add(message); //将消息存入队列尾部log.debug("生产者{}已生产消息,message为{}", message.getId(), message);list.notifyAll();}}
}final class Message {private int id;private Object value;public Message(int id, Object value) {this.id = id;this.value = value;}public int getId() {return id;}public Object getValue() {return value;}@Overridepublic String toString() {return "Message{" +"id=" + id +", value=" + value +'}';}
}

注:lambda表达式里引用的外部变量必须是final的

park & unpark

LockSupport.park(); //暂停当前线程
LockSupport.unpark(暂停线程对象); //恢复某个线程的运行

park()之后线程处于WAITING状态;可以先unpark再park

@Slf4j(topic = "c.JUC_test")
public class JUC_test {public static void main(String[] args) throws InterruptedException {Thread t1 = new Thread(() -> {try {log.debug("start...");Thread.sleep(1000);log.debug("park...");LockSupport.park();log.debug("resume...");} catch (InterruptedException e) {throw new RuntimeException(e);}}, "t1");t1.start();Thread.sleep(5000);LockSupport.unpark(t1);}
}

和 wait / notify 比较:

  • wait、notify、notifyAll 必须配合 Object Monitor 一起使用,而 park & unpark 不需要
  • park & unpark 是以线程为单位来阻塞和唤醒线程,而 notify 只能随机唤醒一个等待线程,notifyAll是唤醒所有等待线程,就不那么精确
  • park & unpark 可以先 unpark,而 wait & notify 不能先 notify

park & unpark 底层原理
每个线程都有自己的一个 Parker 对象(由底层实现,java层面看不到),由三部分组成 _counter,_cond 和 _mutex

线程就像一个旅人,Parker 就像他随身携带的背包,条件变量_cond就好比背包中的帐篷。_counter 就好比背包中的备用干粮(0 为耗尽,1 为充足)

  • 调用 park 就是要看需不需要停下来歇息:
    如果备用干粮耗尽,那么钻进帐篷歇息
    如果备用干粮充足,那么不需停留,继续前进

  • 调用 unpark,就好比补充干粮
    如果这时线程还在帐篷,就唤醒让他继续前进
    如果这时线程还在运行,那么下次他调用 park 时,仅是消耗掉备用干粮,不需停留继续前进

因为背包空间有限,多次调用 unpark 仅会补充一份备用干粮

假设先park再unpark:
在这里插入图片描述
当前线程调用 Unsafe.park() 方法,检查_counter=0;这时,获得 _mutex 互斥锁,线程进入 _cond 条件变量阻塞;设置 _counter = 0
在这里插入图片描述
调用 Unsafe.unpark(Thread_0) 方法,设置 _counter 为 1;唤醒 _cond 条件变量中的 Thread_0;Thread_0 恢复运行;设置 _counter 为 0

假设先unpark再park:
在这里插入图片描述
调用 Unsafe.unpark(Thread_0) 方法,设置 _counter 为 1;当前线程调用 Unsafe.park() 方法,检查 _counter=1,这时线程无需阻塞,继续运行;设置 _counter 为 0

线程状态转换

请添加图片描述
参考课后文档,假设有线程 t

  • 情况 1 NEW --> RUNNABLE
    当调用 t.start() 方法时,由 NEW --> RUNNABLE

  • 情况 2 RUNNABLE <–> WAITING
    调用 obj.wait() 方法时,t 线程从 RUNNABLE --> WAITING
    调用 obj.notify(),obj.notifyAll(),t.interrupt() 时,t 线程进入entryList 变成BLOCKED

  • 情况 3 RUNNABLE <–> WAITING
    当前线程调用 t.join() 方法时,当前线程从 RUNNABLE --> WAITING
    注意是当前线程在 t 线程对象的监视器上等待
    t 线程运行结束,或调用了当前线程的 interrupt() 时,当前线程从 WAITING --> RUNNABLE

  • 情况 4 RUNNABLE <–> WAITING
    当前线程调用 LockSupport.park() 方法会让当前线程从 RUNNABLE --> WAITING
    调用 LockSupport.unpark(目标线程) 或调用了线程的 interrupt() ,会让目标线程从 WAITING -->RUNNABLE

  • 情况 5 RUNNABLE <–> TIMED_WAITING
    调用 obj.wait(long n) 方法时,t 线程从 RUNNABLE --> TIMED_WAITING
    当 t 线程等待时间超过了 n 毫秒,或调用 obj.notify(),obj.notifyAll(),t.interrupt() 时,t 线程进入entryList 变成BLOCKED

  • 情况 6 RUNNABLE <–> TIMED_WAITING
    当前线程调用 t.join(long n) 方法时,当前线程从 RUNNABLE --> TIMED_WAITING
    注意是当前线程在t 线程对象的监视器上等待
    当前线程等待时间超过了 n 毫秒,或 t 线程运行结束,或调用了当前线程的 interrupt() 时,当前线程从 TIMED_WAITING --> RUNNABLE

  • 情况 7 RUNNABLE <–> TIMED_WAITING
    当前线程调用 Thread.sleep(long n) ,当前线程从 RUNNABLE --> TIMED_WAITING
    当前线程等待时间超过了 n 毫秒,当前线程从 TIMED_WAITING --> RUNNABLE

  • 情况 8 RUNNABLE <–> TIMED_WAITING
    当前线程调用 LockSupport.parkNanos(long nanos)LockSupport.parkUntil(long millis) 时,当前线程从 RUNNABLE --> TIMED_WAITING
    调用 LockSupport.unpark(目标线程) 或调用了线程的 interrupt() ,或是等待超时,会让目标线程从 TIMED_WAITING–> RUNNABLE

  • 情况 9 RUNNABLE <–> BLOCKED
    t 线程用 synchronized(obj) 获取了对象锁时如果竞争失败,从 RUNNABLE --> BLOCKED
    持 obj 锁线程的同步代码块执行完毕,会唤醒该对象上所有 BLOCKED 的线程重新竞争,如果其中 t 线程竞争成功,从 BLOCKED --> RUNNABLE ,其它失败的线程仍然 BLOCKED

  • 情况 10 RUNNABLE <–> TERMINATED
    当前线程所有代码运行完毕,进入 TERMINATED

多把锁

有些情况下只用一把锁会导致并发度很低,例如线程1进入房间睡觉的时候,线程2不能进入房间学习(前提是睡觉和学习不会互相影响)

class BigRoom {public void sleep() {synchronized (this) {log.debug("sleeping 2 小时");Sleeper.sleep(2);}}public void study() {synchronized (this) {log.debug("study 1 小时");Sleeper.sleep(1);}}
}

解决办法是使用多把锁(细粒度的锁)

class BigRoom {private final Object studyRoom = new Object();private final Object bedRoom = new Object();public void sleep() {synchronized (bedRoom) {log.debug("sleeping 2 小时");Sleeper.sleep(2);}}public void study() {synchronized (studyRoom) {log.debug("study 1 小时");Sleeper.sleep(1);}}
}

将锁的粒度细分可以增强并发度,但是如果一个线程需要同时获得多把锁,就容易发生死锁

共享模型之管程 - 活跃性

注:死锁和饥饿可以由 ReentrantLock 解决

死锁

t1 线程 获得 A对象 锁,接下来想获取 B对象 的锁;
t2 线程 获得 B对象 锁,接下来想获取 A对象 的锁。

Object A = new Object();
Object B = new Object();
Thread t1 = new Thread(() -> {synchronized (A) {log.debug("lock A");sleep(1);synchronized (B) {log.debug("lock B");log.debug("操作...");}}
}, "t1");
Thread t2 = new Thread(() -> {synchronized (B) {log.debug("lock B");sleep(0.5);synchronized (A) {log.debug("lock A");log.debug("操作...");}}
}, "t2");
t1.start();
t2.start();

检测死锁可以使用 jconsole工具,或者使用 jps命令 定位进程 id,再用 jstack 进程id定位死锁。

例子:哲学家进餐
有五位哲学家,围坐在圆桌旁。吃饭时要用两根筷子吃,桌上共有 5 根筷子,每位哲学家左右手边各有一根筷子。如果每个哲学家都拿起一双筷子,就会导致死锁。

@Slf4j(topic = "c.JUC_test")
public class JUC_test {public static void main(String[] args) throws InterruptedException {Chopstick c1 = new Chopstick("1");Chopstick c2 = new Chopstick("2");Chopstick c3 = new Chopstick("3");Chopstick c4 = new Chopstick("4");Chopstick c5 = new Chopstick("5");new Philosopher("苏格拉底", c1, c2).start();new Philosopher("柏拉图", c2, c3).start();new Philosopher("亚里士多德", c3, c4).start();new Philosopher("赫拉克利特", c4, c5).start();new Philosopher("阿基米德", c5, c1).start();}
}@Slf4j(topic = "c.Philosopher")
class Philosopher extends Thread {Chopstick left;Chopstick right;public Philosopher(String name, Chopstick left, Chopstick right) {super(name);this.left = left;this.right = right;}private void eat() throws InterruptedException {log.debug("eating...");Thread.sleep(1000);}@Overridepublic void run() {while (true) {// 获得左手筷子synchronized (left) {// 获得右手筷子synchronized (right) {// 吃饭try {eat();} catch (InterruptedException e) {throw new RuntimeException(e);}}}}}
}
class Chopstick {String name;public Chopstick(String name) {this.name = name;}@Overridepublic String toString() {return "筷子{" + name + '}';}
}

解决办法:ReentrantLock

活锁

活锁出现在两个线程互相改变对方的结束条件,最后谁也无法结束,例如

@Slf4j(topic = "c.JUC_test")
public class JUC_test {static volatile int count = 10;static final Object lock = new Object();public static void main(String[] args) throws InterruptedException {new Thread(() -> {// 期望减到 0 退出循环while (count > 0) {try {Thread.sleep(200);} catch (InterruptedException e) {throw new RuntimeException(e);}count--;log.debug("count: {}", count);}}, "t1").start();new Thread(() -> {// 期望超过 20 退出循环while (count < 20) {try {Thread.sleep(200);} catch (InterruptedException e) {throw new RuntimeException(e);}count++;log.debug("count: {}", count);}}, "t2").start();}
}

解决办法:让每个线程的执行时间交错开(随机sleep时间)

饥饿

饥饿定义为,一个线程由于优先级太低,始终得不到 CPU 调度执行,也不能够结束。

使用顺序加锁的方式解决之前的死锁问题:每个线程只能先尝试获取对象A的锁,然后才能尝试获取对象B的锁。但这种情况会导致饥饿。
在这里插入图片描述
例如修改哲学家问题的代码:每个哲学家必须按照c1、c2 … c5的顺序拿筷子,将第五个哲学家从c5, c1改成c1, c5,就可以消除死锁,但这样会导致饥饿。

共享模型之管程 - ReentrantLock

ReentrantLock

ReentrantLock是JUC包下的一个类,相对于synchronized,它具备如下特点:

  • 可中断
  • 可以设置超时时间
  • 可以设置为公平锁
  • 支持多个条件变量
  • 与 synchronized 一样,都支持可重入

基本语法:加锁的代码放在try里面也可以

// 获取锁
reentrantLock.lock();
try {// 临界区
} finally {// 释放锁reentrantLock.unlock();
}

可重入

可重入是指同一个线程如果首次获得了这把锁,那么由于它是这把锁的拥有者,有权利再次获取这把锁

如果是不可重入锁,那么第二次获得锁时,自己也会被锁挡住

@Slf4j(topic = "c.JUC_test")
public class JUC_test {private static ReentrantLock lock = new ReentrantLock();public static void main(String[] args) {lock.lock();try {log.debug("enter main");m1();} finally {lock.unlock();}}public static void m1() {lock.lock();try {log.debug("enter m1");m2();} finally {lock.unlock();}}public static void m2() {lock.lock();try {log.debug("enter m2");} finally {lock.unlock();}}
}

可打断

lock.lock() 创建的锁是不可打断的
lock.lockInterruptibly() 如果没有竞争,则可以正常获取锁;如果有竞争进入了阻塞队列,此时可以被其他线程用 interrupt 方法打断,防止无限等待下去。

@Slf4j(topic = "c.JUC_test")
public class JUC_test {private static ReentrantLock lock = new ReentrantLock();public static void main(String[] args) throws InterruptedException {Thread t1 = new Thread(() -> {try {log.debug("尝试获取锁");lock.lockInterruptibly();} catch (InterruptedException e) {log.debug("没有获取到锁,返回");e.printStackTrace();return;}try {log.debug("获取锁成功");} finally {lock.unlock();}});lock.lock(); //主线程先获取锁,再启动t1线程t1.start();Thread.sleep(3000);log.debug("打断t1");t1.interrupt();}
}

锁超时

lock.tryLock() 如果不传参,获取不到锁会立即返回false;如果传参timeout,会在timeout时间内不断尝试获取锁,直至超时后返回false。

tryLock() 可以被打断,被打断之后要return,更严谨

@Slf4j(topic = "c.JUC_test")
public class JUC_test {private static ReentrantLock lock = new ReentrantLock();public static void main(String[] args) throws InterruptedException {Thread t1 = new Thread(() -> {log.debug("尝试获取锁");try {if(!lock.tryLock(5, TimeUnit.SECONDS)) {log.debug("获取锁失败");return;}} catch (InterruptedException e) {e.printStackTrace();log.debug("获取锁失败");return; //由于tryLock可以被打断,打断后也要return}try {log.debug("获取锁成功");} finally {lock.unlock();}});lock.lock();log.debug("主线程获取到锁");t1.start();Thread.sleep(3000);lock.unlock();log.debug("主线程释放锁");}
}

锁超时解决哲学家就餐问题:Chopstick继承可重入锁,如果获取不到右手筷子,就将左手筷子释放掉

@Slf4j(topic = "c.JUC_test")
public class JUC_test {public static void main(String[] args) throws InterruptedException {Chopstick c1 = new Chopstick("1");Chopstick c2 = new Chopstick("2");Chopstick c3 = new Chopstick("3");Chopstick c4 = new Chopstick("4");Chopstick c5 = new Chopstick("5");new Philosopher("苏格拉底", c1, c2).start();new Philosopher("柏拉图", c2, c3).start();new Philosopher("亚里士多德", c3, c4).start();new Philosopher("赫拉克利特", c4, c5).start();new Philosopher("阿基米德", c5, c1).start();}
}@Slf4j(topic = "c.Philosopher")
class Philosopher extends Thread {Chopstick left;Chopstick right;public Philosopher(String name, Chopstick left, Chopstick right) {super(name);this.left = left;this.right = right;}private void eat() throws InterruptedException {log.debug("eating...");Thread.sleep(1000);}@Overridepublic void run() {while (true) {// 获得左手筷子if(left.tryLock()) {try {// 获得右手筷子if(right.tryLock()) {try {// 吃饭eat();} catch (InterruptedException e) {throw new RuntimeException(e);} finally {right.unlock();}}} finally {left.unlock();}}}}
}
class Chopstick extends ReentrantLock {String name;public Chopstick(String name) {this.name = name;}@Overridepublic String toString() {return "筷子{" + name + '}';}
}

公平锁

ReentrantLock 默认是不公平的,即每个线程获取锁的优先级一样,不会按照先来先到的顺序获取锁。设置为公平锁后,先进入阻塞队列的线程优先获取锁。

设置为公平锁:

ReentrantLock lock = new ReentrantLock(true);

公平锁一般没有必要,会降低并发度。

条件变量

synchronized 中也有条件变量,即 waitSet ,当条件不满足时进入 waitSet 休息室等待。但 synchronized 只有一个休息室。

ReentrantLock 支持多个条件变量,这就好比 ReentrantLock 支持多间休息室,有专门等烟的休息室、专门等早餐的休息室、唤醒时也是按休息室来唤醒。

@Slf4j(topic = "c.JUC_test")
public class JUC_test {private static ReentrantLock lock = new ReentrantLock();public static void main(String[] args) throws InterruptedException {Condition condition1 = lock.newCondition();Thread t1 = new Thread(() -> {lock.lock();try {condition1.await();log.debug("t1休息完毕");} catch (InterruptedException e) {throw new RuntimeException(e);} finally {lock.unlock();}});t1.start();Thread.sleep(3000);lock.lock();try {log.debug("主线程获取锁");condition1.signal();} finally {lock.unlock();}}
}

注意:
await 前需要获得锁
await 执行后,会释放锁,进入 conditionObject 等待
await 的线程被唤醒(或打断、或超时)取重新竞争 lock 锁
竞争 lock 锁成功后,从 await 后继续执行

例子:

@Slf4j(topic = "c.JUC_test")
public class JUC_test {static boolean hasCigarette = false;static boolean hasTakeout = false;static ReentrantLock room = new ReentrantLock();static Condition waitCigaretteSet = room.newCondition();static Condition waitTakeoutSet = room.newCondition();public static void main(String[] args) throws InterruptedException {new Thread(() -> {room.lock();try {log.debug("有烟没?[{}]", hasCigarette);while (!hasCigarette) {log.debug("没烟,先歇会!");try {waitCigaretteSet.await();} catch (InterruptedException e) {e.printStackTrace();}log.debug("可以开始干活了");}} finally {room.unlock();}}, "小南").start();new Thread(() -> {room.lock();try {log.debug("外卖送到没?[{}]", hasTakeout);while(!hasTakeout) {log.debug("没外卖,先歇会!");try {waitTakeoutSet.await();} catch (InterruptedException e) {e.printStackTrace();}log.debug("可以开始干活了");}} finally {room.unlock();}}, "小女").start();Thread.sleep(3000);new Thread(() -> {room.lock();try {hasTakeout = true;log.debug("外卖到了噢!");waitTakeoutSet.signal();} finally {room.unlock();}}, "送外卖的").start();new Thread(() -> {room.lock();try {hasCigarette = true;log.debug("烟到了噢!");waitCigaretteSet.signal();} finally {room.unlock();}}, "送烟的").start();}
}

共享模型之管程 - 同步模式之顺序控制

固定运行顺序

有时候需要控制线程的先后顺序,例如 先 2 后 1

  1. wait / notify 版
@Slf4j(topic = "c.JUC_test")
public class JUC_test {static final Object lock = new Object();static boolean t2_runned = false;public static void main(String[] args) {Thread t1 = new Thread(() -> {synchronized (lock) {while (!t2_runned) {try {lock.wait();} catch (InterruptedException e) {throw new RuntimeException(e);}}}log.debug("t1");}, "t1");Thread t2 = new Thread(() -> {synchronized (lock) {log.debug("t2");t2_runned = true;lock.notify();}}, "t2");t1.start();t2.start();}
}

使用 ReentrantLock 的 await / signal 也是可以的

  1. park & unpark版
@Slf4j(topic = "c.JUC_test")
public class JUC_test {public static void main(String[] args) {Thread t1 = new Thread(() -> {LockSupport.park();log.debug("t1");}, "t1");Thread t2 = new Thread(() -> {log.debug("t2");LockSupport.unpark(t1);}, "t2");t1.start();t2.start();}
}

这种方法更为简单

交替输出

线程 1 输出 a 3 次,线程 2 输出 b 3 次,线程 3 输出 c 3 次。现在要求输出 abcabcabc 怎么实现

  1. wait / notify 版:
    使用整数标记 flag 判断是否轮到自己执行;nextFlag 指定下一个执行的线程
@Slf4j(topic = "c.JUC_test")
public class JUC_test {public static void main(String[] args) {WaitNotify wn = new WaitNotify(1, 3);new Thread(() -> {wn.print("a", 1, 2);}).start();new Thread(() -> {wn.print("b", 2, 3);}).start();new Thread(() -> {wn.print("c", 3, 1);}).start();}
}
class WaitNotify {//等待标记private int flag; //共3个值:1、2、3//循环次数private int loopNumber;public WaitNotify(int flag, int loopNumber) {this.flag = flag;this.loopNumber = loopNumber;}//打印信息public void print(String str, int waitFlag, int nextFlag) {for(int i = 0; i < loopNumber; i ++) {synchronized (this) {while(waitFlag != flag) {try {this.wait();} catch (InterruptedException e) {throw new RuntimeException(e);}}System.out.print(str);flag = nextFlag;this.notifyAll();}}}
}
  1. await / signal 版:
    设定多个条件变量(休息室),每个线程进入自己的休息室
@Slf4j(topic = "c.JUC_test")
public class JUC_test {public static void main(String[] args) throws InterruptedException {AwaitSignal as = new AwaitSignal(3);Condition a = as.newCondition();Condition b = as.newCondition();Condition c = as.newCondition();new Thread(() -> {as.print("a", a, b);}).start();new Thread(() -> {as.print("b", b, c);}).start();new Thread(() -> {as.print("c", c, a);}).start();Thread.sleep(1000);as.lock();try {a.signal();System.out.println("开始...");} finally {as.unlock();}}
}
class AwaitSignal extends ReentrantLock {//打印信息public void print(String str, Condition current, Condition next) {for(int i = 0; i < loopNumber; i ++) {lock();try {current.await();System.out.print(str);next.signal();} catch (InterruptedException e) {throw new RuntimeException(e);} finally {unlock();}}}//循环次数private int loopNumber;public AwaitSignal(int loopNumber) {this.loopNumber = loopNumber;}
}
  1. park & unpark 版:实现起来最简单
@Slf4j(topic = "c.JUC_test")
public class JUC_test {static Thread t1;static Thread t2;static Thread t3;public static void main(String[] args) throws InterruptedException {ParkUnpark parkUnpark = new ParkUnpark(3);t1 = new Thread(() -> {parkUnpark.print("a", t2);});t2 = new Thread(() -> {parkUnpark.print("b", t3);});t3 = new Thread(() -> {parkUnpark.print("c", t1);});t1.start();t2.start();t3.start();Thread.sleep(1000);LockSupport.unpark(t1);}
}
class ParkUnpark {//打印信息public void print(String str, Thread next) {for(int i = 0; i < loopNumber; i ++) {LockSupport.park();System.out.print(str);LockSupport.unpark(next);}}//循环次数private int loopNumber;public ParkUnpark(int loopNumber) {this.loopNumber = loopNumber;}
}

共享模型之JMM

Java 内存模型

JMM 即 Java Memory Model,从java层面定义了主存、工作内存抽象概念,底层对应着 CPU 寄存器、缓存、硬件内存、CPU 指令优化等。主存是所有线程共享的空间(静态变量、成员变量),工作内存是每个线程私有的空间(局部变量)

JMM 体现在以下几个方面:

  • 原子性 - 保证指令不会受到线程上下文切换的影响
  • 可见性 - 保证指令不会受 cpu 缓存的影响
  • 有序性 - 保证指令不会受 cpu 指令并行优化的影响

可见性

退不出的循环
先来看一个现象,main 线程对 run 变量的修改对于 t 线程不可见,导致了 t 线程无法停止:

static boolean run = true;public static void main(String[] args) throws InterruptedException {Thread t = new Thread(()->{while(run){// ....}});t.start();sleep(1000);run = false; // 线程t不会如预想的停下来
}

分析原因:
因为 t 线程要频繁从主存中读取 run 的值,JIT 编译器会将 run 的值缓存至自己工作内存里的高速缓存中,减少对主存中 run 的访问,提高效率。1 秒之后,main 线程修改了 run 的值,并同步至主存,而 t 是从自己工作内存中的高速缓存读取这个变量的值,结果永远是旧值
在这里插入图片描述
解决方法:给 run 变量加 volatile 关键字。

volatile 可以修饰成员变量和静态成员变量,他可以避免线程从自己的工作缓存中查找变量的值,必须到主存中获取变量的值,线程操作 volatile 变量都是直接操作主存。因此,如果遇到两个线程操作同一个共享变量,一个读一个修改,就要用到volatile

synchronized 也可以保证共享变量的可见性,但是synchronized要创建monitor,是重量级操作,性能差

static boolean run = true;
final static Object lock = new Object();public static void main(String[] args) throws InterruptedException {Thread t = new Thread(()->{while(true){synchronized (lock) {if(!run) {break;}}}});t.start();sleep(1000);synchronized (lock) {run = false; // 线程t不会如预想的停下来}
}

注意:volatile 只能保证可见性,即读取到的变量是最新的,但是不能保证原子性,在指令交错的情况下仍要加锁处理

如果不用 volatile,在while死循环里加一个System.out.println()也可以获取到最新的run变量,原因是println()的底层加了锁

两阶段终止模式 - volatile

设一个stop变量来控制监控线程monitor的运行,stop为true则停止监控线程

public class JUC_test {public static void main(String[] args) throws InterruptedException {TwoPhaseTermination tpt = new TwoPhaseTermination();tpt.start();Thread.sleep(8000);tpt.stop();}
}
@Slf4j(topic = "c.TwoPhaseTermination")
class TwoPhaseTermination {private Thread monitor;private volatile boolean stop = false; //停止标记//启动监控线程public void start() {monitor = new Thread(() -> {while (true) {if(stop) {//料理后事log.debug("料理后事...");break;}try {Thread.sleep(2000);log.debug("执行监控记录...");} catch (InterruptedException e) {}}}, "monitor");monitor.start();}//停止监控线程public void stop() {stop = true;monitor.interrupt(); //可以打断睡眠中的线程,立即停止}
}

停止监控线程的代码里最好加上monitor.interrupt(),否则要等到monitor线程睡眠结束,才能停止

犹豫模式(balking)

Balking (犹豫)模式用在一个线程发现另一个线程或本线程已经做了某一件相同的事,那么本线程就无需再做了,直接结束返回

保证某个方法只被执行一次,下次再次执行就直接返回(例如监控线程只需要启动一个)

public class JUC_test {public static void main(String[] args) throws InterruptedException {TwoPhaseTermination tpt = new TwoPhaseTermination();tpt.start();Thread.sleep(8000);tpt.stop();}
}
@Slf4j(topic = "c.TwoPhaseTermination")
class TwoPhaseTermination {//监控线程private Thread monitor;//停止标记private volatile boolean stop = false;//判断start方法是否执行过private boolean starting = false;//启动监控线程public void start() {synchronized (this) {if(starting) { //犹豫模式return;}starting = true;}monitor = new Thread(() -> {while (true) {if(stop) {//料理后事log.debug("料理后事...");break;}try {Thread.sleep(2000);log.debug("执行监控记录...");} catch (InterruptedException e) { //睡眠中被打断}}}, "monitor");monitor.start();}//停止监控线程public void stop() {stop = true;monitor.interrupt();}
}

同步代码块里的代码越少越好,有助于提升性能

犹豫模式应用

用springboot实现,可以多次开启和关闭监控线程(完整代码看课后资料)

@Service
@Slf4j
public class MonitorService {private volatile boolean stop;private volatile boolean starting;private Thread monitorThread;public void start() {// 缩小同步范围,提升性能synchronized (this) {log.info("该监控线程已启动?({})", starting);if (starting) {return;}starting = true;}// 由于之前的 balking 模式,以下代码只可能被一个线程执行,因此无需互斥monitorThread = new Thread(() -> {while (!stop) {report();sleep(2);}// 这里的监控线程只可能启动一个,因此只需要用 volatile 保证 starting 的可见性log.info("监控线程已停止...");starting = false;});stop = false;log.info("监控线程已启动...");monitorThread.start();}private void report() {Info info = new Info();info.setTotal(Runtime.getRuntime().totalMemory());info.setFree(Runtime.getRuntime().freeMemory());info.setMax(Runtime.getRuntime().maxMemory());info.setTime(System.currentTimeMillis());MonitorController.QUEUE.offer(info);}private void sleep(long seconds) {try {TimeUnit.SECONDS.sleep(seconds);} catch (InterruptedException e) {}}public synchronized void stop() {stop = true;// 不加打断需要等到下一次 sleep 结束才能退出循环,这里是为了更快结束monitorThread.interrupt();}
}

starting 变量要设置成 volatile,虽然starting = true是在同步代码块中,但是下面的starting = false可能会出现不可见问题

此外,犹豫模式还经常用来实现线程安全的单例(懒惰初始化)

public final class Singleton {private Singleton() {}private static Singleton INSTANCE = null;public static synchronized Singleton getInstance() {if (INSTANCE != null) {return INSTANCE;}INSTANCE = new Singleton();return INSTANCE;}
}

指令重排

JVM 会在不影响正确性的前提下,可以调整语句的执行顺序,思考下面一段代码

static int i;
static int j;
// 在某个线程内执行如下赋值操作
i = ...;
j = ...;

可以看到,至于是先执行 i 还是 先执行 j ,对最终的结果不会产生影响,因此实际运行中可能变为下面的顺序

j = ...;
i = ...;

这种特性称之为『指令重排』,多线程下『指令重排』会影响正确性

指令重排序优化
在这里插入图片描述
在不改变程序结果的前提下,这些指令的各个阶段可以通过重排序和组合来实现指令级并行

指令重排的前提是,重排指令不能影响结果,例如

// 可以重排的例子
int a = 10; // 指令1
int b = 20; // 指令2
System.out.println( a + b );
// 不能重排的例子
int a = 10; // 指令1
int b = a - 5; // 指令2

指令重排带来的问题

考虑r1的值为什么

int num = 0;
boolean ready = false;
// 线程1 执行此方法
public void actor1(I_Result r) {if(ready) {r.r1 = num + num;} else {r.r1 = 1;}
}
// 线程2 执行此方法
public void actor2(I_Result r) {num = 2;ready = true;
}

如果先执行actor1方法,那么r1为1;如果先执行actor2,那么r1为4;

如果actor2中发生了指令重排,那么r1可能为0。例如actor2先执行ready = true,然后切换到了actor1,此时num为0,所以r1为0。注意,需要大量测试才能观察到该结果。

有序性 - 禁用指令重排

volatile 修饰的变量,可以禁用指令重排

int num = 0;
volatile boolean ready = false;
// 线程1 执行此方法
public void actor1(I_Result r) {if(ready) {r.r1 = num + num;} else {r.r1 = 1;}
}
// 线程2 执行此方法
public void actor2(I_Result r) {num = 2;ready = true;
}

为什么给ready变量加volatile,而num变量不用加?因为给变量加volatile可以保证该变量之前的代码不会重排到它的后面。所以只要给最后面的变量加上volatile就可以了

共享模型之JMM - volatile原理

如何保证可见性

在volatile修饰的变量后面加写屏障,写屏障(sfence)保证在该屏障之前的,所有对共享变量的改动,都同步到主存当中(注意,volatile修饰的变量和该变量前面的变量都会被同步,即使它们没有被volatile修饰)

public void actor2(I_Result r) {num = 2;ready = true; // ready 是 volatile 赋值带写屏障// 写屏障
}

而读屏障(lfence)保证在该屏障之后,对共享变量的读取,加载的是主存中最新数据

public void actor1(I_Result r) {// 读屏障// ready 是 volatile 读取值带读屏障if(ready) {r.r1 = num + num;} else {r.r1 = 1;}
}

如何保证有序性

写屏障会确保指令重排序时,不会将写屏障之前的代码排在写屏障之后

public void actor2(I_Result r) {num = 2;ready = true; // ready 是 volatile 赋值带写屏障// 写屏障
}

读屏障会确保指令重排序时,不会将读屏障之后的代码排在读屏障之前

public void actor1(I_Result r) {// 读屏障// ready 是 volatile 读取值带读屏障if(ready) {r.r1 = num + num;} else {r.r1 = 1;}
}

注意:volatile 不能解决指令交错。写屏障仅仅是保证之后的读能够读到最新的结果,如果另一个线程在变量修改之前就读了,还是会出现问题;有序性的保证也只是保证了本线程内相关代码不被重排序,线程间执行的指令是cpu时间片决定的,还是会交错执行。

double-checked locking 问题

public final class Singleton {private Singleton() {}private static Singleton INSTANCE = null;public static synchronized Singleton getInstance() {if(INSTANCE == null){INSTANCE = new Singleton();}return INSTANCE;}
}
public final class Singleton {private Singleton() {}private static Singleton INSTANCE = null;// 分析这里的线程安全, 并说明有什么缺点public static synchronized Singleton getInstance() {synchronized (Singleton.class) {if(INSTANCE == null){INSTANCE = new Singleton();}return INSTANCE;}}
}

上面两个代码其实是等价的,但是每次调用 getInstance() 方法都要加锁,效率低,继续优化

public final class Singleton {private Singleton() { }private static volatile Singleton INSTANCE = null;public static Singleton getInstance() {if(INSTANCE == null) {// 首次访问会同步,而之后的使用没有 synchronizedsynchronized(Singleton.class) {if (INSTANCE == null) {INSTANCE = new Singleton();}}}return INSTANCE;}
}

以上的实现特点是:

  • 懒惰实例化
  • 首次使用 getInstance() 才使用 synchronized 加锁,后续使用时无需加锁
  • 有隐含的,但很关键的一点:第一个 if 使用了 INSTANCE 变量,是在同步块之外

但是以上代码在多线程环境下会出现问题! 问题出现在这一行:

INSTANCE = new Singleton();

底层字节码是:

21: invokespecial #4 // Method "<init>":()V
24: putstatic #2 // Field INSTANCE:Lcn/itcast/n5/Singleton;

21 表示利用一个对象引用,调用构造方法
24 表示利用一个对象引用,赋值给 static INSTANCE

如果发生指令重排,jvm 会优化为:先执行 24,再执行 21,此时INSTANCE对象完成了赋值,但还没有调用构造方法,那么就会导致另一个线程 if(INSTANCE == null) 的结果为false,直接返回了一个没有初始化的INSTANCE对象

注意一个误区:

  • synchronized并不能保证有序性,代码块里的代码,是有可能发生指令重排的
  • 如果某个对象完全被 monitor 监管(即某个对象的使用范围完全在synchronized代码块内部),那么即使发生了指令重排,也不会影响结果。但以上例子中,synchronized代码块外部也访问了INSTANCE(即判断INSTANCE是否为null),一旦发生指令重排,就可能会出现问题。

解决方法:给INSTANCE加volatile关键字

//正确代码
public final class Singleton {private Singleton() { }private static volatile Singleton INSTANCE = null;public static Singleton getInstance() {if(INSTANCE == null) {// 首次访问会同步,而之后的使用没有 synchronizedsynchronized(Singleton.class) {if (INSTANCE == null) {INSTANCE = new Singleton();}}}return INSTANCE;}
}

加上volatile关键字之后,会在字节码24行之后加一个写屏障,保证24行之前的代码不会排到24行之后

21: invokespecial #4 // Method "<init>":()V
24: putstatic #2 // Field INSTANCE:Lcn/itcast/n5/Singleton;

happens-before 规则

happens-before 规定了对共享变量的写操作对其它线程的读操作可见,它是可见性与有序性的一套规则总结。抛开以下 happens-before 规则,JMM 并不能保证一个线程对共享变量的写,对于其它线程对该共享变量的读可见

共7个规则,详见课后文档

线程安全单例

单例模式有很多实现方法,饿汉、懒汉、静态内部类、枚举类,试分析每种实现下获取单例对象(即调用
getInstance)时的线程安全,并思考注释中的问题

饿汉式:类加载就会导致该单实例对象被创建
懒汉式:类加载不会导致该单实例对象被创建,而是首次使用该对象时才会创建

实现1,饿汉式,没有线程安全问题(静态成员是在类加载时创建的)

// 问题1:为什么加 final
// 问题2:如果实现了序列化接口, 还要做什么来防止反序列化破坏单例
public final class Singleton implements Serializable {// 问题3:为什么设置为私有? 是否能防止反射创建新的实例?private Singleton() {}// 问题4:这样初始化是否能保证单例对象创建时的线程安全?private static final Singleton INSTANCE = new Singleton();// 问题5:为什么提供静态方法而不是直接将 INSTANCE 设置为 public, 说出你知道的理由public static Singleton getInstance() {return INSTANCE;}public Object readResolve() {return INSTANCE;}
}

实现2

// 问题1:枚举单例是如何限制实例个数的
// 问题2:枚举单例在创建时是否有并发问题
// 问题3:枚举单例能否被反射破坏单例
// 问题4:枚举单例能否被反序列化破坏单例
// 问题5:枚举单例属于懒汉式还是饿汉式
// 问题6:枚举单例如果希望加入一些单例创建时的初始化逻辑该如何做
enum Singleton {INSTANCE;
}

实现3

public final class Singleton {private Singleton() { }private static Singleton INSTANCE = null;// 分析这里的线程安全, 并说明有什么缺点public static synchronized Singleton getInstance() {if( INSTANCE != null ){return INSTANCE;}INSTANCE = new Singleton();return INSTANCE;}
}

实现4,懒汉式

public final class Singleton {private Singleton() { }// 问题1:解释为什么要加 volatile private static volatile Singleton INSTANCE = null;// 问题2:对比实现3, 说出这样做的意义public static Singleton getInstance() {if (INSTANCE != null) {return INSTANCE;}synchronized (Singleton.class) {// 问题3:为什么还要在这里加为空判断, 之前不是判断过了吗if (INSTANCE != null) { return INSTANCE;}INSTANCE = new Singleton();return INSTANCE;}}
}

实现5,推荐使用这种方式创建单例

public final class Singleton {private Singleton() { }// 问题1:属于懒汉式还是饿汉式private static class LazyHolder {static final Singleton INSTANCE = new Singleton();}// 问题2:在创建时是否有并发问题public static Singleton getInstance() {return LazyHolder.INSTANCE;}
}

类加载时不会加载这个静态内部类,所以是懒汉式的。只有第一次调用getInstance()方法时,才会触发静态内部类的类加载,而类加载过程中创建对象会由JVM保证其线程安全性。

共享模型之无锁并发

CAS案例

public class JUC_test {public static void main(String[] args) {Account a1 = new AccountUnsafe(10000);Account.demo(a1);Account a2 = new AccountCas(10000);Account.demo(a2);}
}//无锁方式
class AccountCas implements Account {private AtomicInteger balance;public AccountCas(Integer balance) {this.balance = new AtomicInteger(balance);}@Overridepublic Integer getBalance() {return balance.get();}@Overridepublic void withdraw(Integer amount) {while (true) {int prev = balance.get();int next = prev - amount;if (balance.compareAndSet(prev, next)) {break;}}}
}//加锁方式
class AccountUnsafe implements Account {private Integer balance;public AccountUnsafe(Integer balance) {this.balance = balance;}@Overridepublic Integer getBalance() {synchronized (this) {return balance;}}@Overridepublic void withdraw(Integer amount) {synchronized (this) {balance -= amount;}}
}
interface Account {// 获取余额Integer getBalance();// 取款void withdraw(Integer amount);/*** 方法内会启动 1000 个线程,每个线程做 -10 元 的操作* 如果初始余额为 10000 那么正确的结果应当是 0*/static void demo(Account account) {List<Thread> ts = new ArrayList<>();long start = System.nanoTime();for (int i = 0; i < 1000; i++) {ts.add(new Thread(() -> {account.withdraw(10);}));}ts.forEach(Thread::start);ts.forEach(t -> {try {t.join();} catch (InterruptedException e) {e.printStackTrace();}});long end = System.nanoTime();System.out.println(account.getBalance()+ " cost: " + (end-start)/1000_000 + " ms");}
}

CAS工作方式

compareAndSet,简称 CAS (也有 Compare And Swap 的说法),它必须是原子操作
在这里插入图片描述
注意:实现CAS必须配合volatile来使用,因为CAS通过读取到共享变量的最新值来实现比较与交换

为什么无锁效率高?
无锁情况下,即使重试失败,线程始终在高速运行,没有停歇;而 synchronized 会让线程在没有获得锁的时
候,发生上下文切换,进入阻塞。

CAS特点

结合 CAS 和 volatile 可以实现无锁并发,适用于线程数少、多核 CPU 的场景下(CAS的线程最好不要多于cpu核心数)

  • CAS 是基于乐观锁的思想:最乐观的估计,不怕别的线程来修改共享变量,就算改了也没关系,吃亏点再重试。
  • synchronized 是基于悲观锁的思想:最悲观的估计,得防着其它线程来修改共享变量
  • CAS 体现的是无锁并发、无阻塞并发
    因为没有使用 synchronized,所以线程不会陷入阻塞,这是效率提升的因素之一
    但如果竞争激烈,可以想到重试必然频繁发生,反而效率会受影响

原子整数

J.U.C 并发包提供了:AtomicBoolean、AtomicInteger、AtomicLong,下面以AtomicInteger为例

AtomicInteger内部维护了一个整数变量value,可以用cas保证其线程安全

public class Test34 {public static void main(String[] args) {AtomicInteger i = new AtomicInteger(5);System.out.println(i.incrementAndGet()); // ++i   1System.out.println(i.getAndIncrement()); // i++   2System.out.println(i.getAndAdd(5)); // 2 , 7System.out.println(i.addAndGet(5)); // 12, 12i.updateAndGet(value -> value * 10);System.out.println(i.get());//使用自己手写的updateAndGet方法System.out.println(updateAndGet(i, p -> p / 2));}//手写updateAndGet方法public static int updateAndGet(AtomicInteger i, IntUnaryOperator operator) {while (true) {int prev = i.get();int next = operator.applyAsInt(prev);if (i.compareAndSet(prev, next)) {return next;}}}
}

上面account取钱的案例可以优化为:

public void withdraw(Integer amount) {balance.addAndGet(-1 * amount);
}

原子引用

需要cas保护的是一个引用类型的数据,可以用AtomicReference、AtomicMarkableReference、AtomicStampedReference

假设一个BigDecimal类型的数据balance需要被保护:

@Slf4j(topic = "c.Test35")
public class Test35 {public static void main(String[] args) {DecimalAccount.demo(new DecimalAccountCas(new BigDecimal("10000")));}
}class DecimalAccountCas implements DecimalAccount {private AtomicReference<BigDecimal> balance;public DecimalAccountCas(BigDecimal balance) {
//        this.balance = balance;this.balance = new AtomicReference<>(balance);}@Overridepublic BigDecimal getBalance() {return balance.get();}@Overridepublic void withdraw(BigDecimal amount) {while(true) {BigDecimal prev = balance.get();BigDecimal next = prev.subtract(amount);if (balance.compareAndSet(prev, next)) {break;}}}
}interface DecimalAccount {// 获取余额BigDecimal getBalance();// 取款void withdraw(BigDecimal amount);/*** 方法内会启动 1000 个线程,每个线程做 -10 元 的操作* 如果初始余额为 10000 那么正确的结果应当是 0*/static void demo(DecimalAccount account) {List<Thread> ts = new ArrayList<>();for (int i = 0; i < 1000; i++) {ts.add(new Thread(() -> {account.withdraw(BigDecimal.TEN);}));}ts.forEach(Thread::start);ts.forEach(t -> {try {t.join();} catch (InterruptedException e) {e.printStackTrace();}});System.out.println(account.getBalance());}
}

ABA问题

public class JUC_test {static AtomicReference<String> ref = new AtomicReference<>("A");public static void main(String[] args) throws InterruptedException {log.debug("main start...");// 获取值 A// 这个共享变量被它线程修改过?String prev = ref.get();other();sleep(1);// 尝试改为 Clog.debug("change A->C {}", ref.compareAndSet(prev, "C"));}private static void other() {new Thread(() -> {log.debug("change A->B {}", ref.compareAndSet(ref.get(), "B"));}, "t1").start();sleep(0.5);new Thread(() -> {log.debug("change B->A {}", ref.compareAndSet(ref.get(), "A"));}, "t2").start();}
}

主线程仅能判断出共享变量的值与最初值 A 是否相同,不能感知到其他线程从 A 改为 B 又改回 A 的情况

解决办法:AtomicStampedReference,加版本号

@Slf4j(topic = "c.Test36")
public class Test36 {static AtomicStampedReference<String> ref = new AtomicStampedReference<>("A", 0); //版本号为0public static void main(String[] args) throws InterruptedException {log.debug("main start...");// 获取值 AString prev = ref.getReference();// 获取版本号int stamp = ref.getStamp();log.debug("版本 {}", stamp);// 如果中间有其它线程干扰,发生了 ABA 现象other();sleep(1);// 尝试改为 Clog.debug("change A->C {}", ref.compareAndSet(prev, "C", stamp, stamp + 1));}private static void other() {new Thread(() -> {//每次修改版本号加1log.debug("change A->B {}", ref.compareAndSet(ref.getReference(), "B", ref.getStamp(), ref.getStamp() + 1)); log.debug("更新版本为 {}", ref.getStamp());}, "t1").start();sleep(0.5);new Thread(() -> {log.debug("change B->A {}", ref.compareAndSet(ref.getReference(), "A", ref.getStamp(), ref.getStamp() + 1));log.debug("更新版本为 {}", ref.getStamp());}, "t2").start();}
}

也可以使用AtomicMarkableReference,这种方式不需要加版本号,而是用维护一个布尔值用于记录变量是否修改过

@Slf4j(topic = "c.Test38")
public class Test38 {public static void main(String[] args) throws InterruptedException {GarbageBag bag = new GarbageBag("装满了垃圾");// 参数2 mark 可以看作一个标记,表示垃圾袋满了AtomicMarkableReference<GarbageBag> ref = new AtomicMarkableReference<>(bag, true);log.debug("start...");GarbageBag prev = ref.getReference();log.debug(prev.toString());new Thread(() -> {log.debug("start...");bag.setDesc("空垃圾袋");ref.compareAndSet(bag, bag, true, false); //修改了marklog.debug(bag.toString());},"保洁阿姨").start();sleep(1);log.debug("想换一只新垃圾袋?");boolean success = ref.compareAndSet(prev, new GarbageBag("空垃圾袋"), true, false); log.debug("换了么?" + success);log.debug(ref.getReference().toString());}
}class GarbageBag {String desc;public GarbageBag(String desc) {this.desc = desc;}public void setDesc(String desc) {this.desc = desc;}@Overridepublic String toString() {return super.toString() + " " + desc;}
}

原子数组

AtomicReference保护的是对象的引用,但是它不能保护对象内部的值(比如数组)

AtomicIntegerArray、AtomicLongArray、AtomicReferenceArray

假设有一个int类型的数组。有100个线程,每个线程对数组里的每个元素做10000次自增。

@Slf4j(topic = "c.JUC_test")
public class JUC_test {public static void main(String[] args) throws InterruptedException {int[] a = {0, 0, 0, 0, 0};//        for(int i = 0; i < 100; i ++) { //100个线程
//            new Thread(() -> {
//                for(int j = 0; j < 10000; j ++) { //每个线程做10000次操作
//                    for(int k = 0; k < a.length; k ++)
//                        a[k] ++;
//                }
//            }).start();
//        }AtomicIntegerArray a1 = new AtomicIntegerArray(a);for(int i = 0; i < 100; i ++) { //100个线程new Thread(() -> {for(int j = 0; j < 10000; j ++) { //每个线程做10000次操作for(int k = 0; k < a1.length(); k ++)a1.getAndIncrement(k);}}).start();}Thread.sleep(3000);for(int i = 0; i < a.length; i ++)System.out.print(a1.get(i) + " ");}
}

字段更新器

需要保护的是一个对象的属性,根据类型分为以下三种:
AtomicReferenceFieldUpdater,AtomicIntegerFieldUpdater,AtomicLongFieldUpdater

@Slf4j(topic = "c.JUC_test")
public class JUC_test {public static void main(String[] args) throws InterruptedException {Student student = new Student("zhangsan");AtomicReferenceFieldUpdater<Student, String> updater = AtomicReferenceFieldUpdater.newUpdater(Student.class, String.class, "name");boolean b = updater.compareAndSet(student, "zhangsan", "lisi");System.out.println(b);System.out.println(student.name);}
}
class Student {volatile String name;public Student(String name) {this.name = name;}
}

原子累加器

虽然原子整数AtomicInteger、AtomicLong的getAndIncrement()方法可以自增,但是并发量非常大时,会导致大量线程竞争失败后,会通过无限循环不断进行自旋尝试CAS 的操作,而这会白白浪费CPU 资源。

JDK 8 新增了一个LongAdder类用来克服在高并发下使用AtomicLong的缺点。此外,还有LongAccumulator类,它比LongAdder的功能更强大,可以为累加器提供非0的初始值,还可以指定累加规则。

LongAdder原理

LongAdder 是并发大师 @author Doug Lea (大哥李)的作品,设计的非常精巧

LongAdder 类有几个关键域

// 累加单元数组, 懒惰初始化
transient volatile Cell[] cells;
// 基础值, 如果没有竞争, 则用 cas 累加这个域
transient volatile long base;
// 在 cells 创建或扩容时, 置为 1, 表示加锁
transient volatile int cellsBusy;

为什么要加锁?cells创建或扩容时,可能出现线程安全问题,这里使用一种cas的方式加锁。以下是一个简单例子,只是为了模拟cellsBusy的作用,并不能直接用于实践:

// 不要用于实践!!!
public class LockCas {private AtomicInteger state = new AtomicInteger(0);public void lock() {while (true) {if (state.compareAndSet(0, 1)) {break;}}}public void unlock() {log.debug("unlock...");state.set(0);}public static void main(String[] args) {//测试代码:LockCas lock = new LockCas();new Thread(() -> {log.debug("begin...");lock.lock();try {log.debug("lock...");sleep(1);} finally {lock.unlock();}}).start();new Thread(() -> {log.debug("begin...");lock.lock();try {log.debug("lock...");} finally {lock.unlock();}}).start();}
}

LongAdder原理 - 缓存行伪共享

LongAdder的原理是每个线程对应一个cell,每个线程只在自己的cell里计算,就不会出现共享变量的线程安全问题

Cell 为累加单元

// 该注解防止缓存行伪共享
@sun.misc.Contended
static final class Cell {volatile long value;Cell(long x) { value = x; }// 最重要的方法, 用来 cas 方式进行累加, prev 表示旧值, next 表示新值final boolean cas(long prev, long next) {return UNSAFE.compareAndSwapLong(this, valueOffset, prev, next);}// 省略不重要代码
}

在这里插入图片描述
因为 CPU 与 内存的速度差异很大,需要靠预读数据至缓存来提升效率。

而缓存以缓存行为单位,每个缓存行对应着一块内存,一般是 64 byte(8 个 long)
缓存的加入会造成数据副本的产生,即同一份数据会缓存在不同核心的缓存行中

CPU要保证数据的 一致性如果某个CPU核心更改了数据,其它CPU核心对应的整个缓存行必须失效 。如下图所示:
在这里插入图片描述
因为 Cell 是数组形式,在内存中是连续存储的,一个 Cell 为 24 字节(16 字节的对象头和 8 字节的 value),因此缓存行可以存下 2 个的 Cell 对象。这样问题来了:
Core-0 要修改 Cell[0]
Core-1 要修改 Cell[1]

无论谁修改成功,都会导致对方 Core 的缓存行失效,比如 Core-0 中 Cell[0]=6000, Cell[1]=8000 要累加
Cell[0]=6001, Cell[1]=8000,这时会让 Core-1 的缓存行失效,需要从内存中读取新的值

@sun.misc.Contended 用来解决这个问题,它的原理是在使用此注解的对象或字段的前后各增加 128 字节大小的 padding,从而让 CPU 将对象预读至缓存时占用不同的缓存行,这样,不会造成对方缓存行的失效
在这里插入图片描述

LongAdder源码 - add

在这里插入图片描述

public void add(long x) {// as 为累加单元数组// b 为基础值// x 为累加值Cell[] as; long b, v; int m; Cell a;// 进入 if 的两个条件// 1. as 有值, 表示已经发生过竞争, 进入 if// 2. cas 给 base 累加时失败了, 表示 base 发生了竞争, 进入 ifif ((as = cells) != null || !casBase(b = base, b + x)) {// uncontended 表示 cell 没有竞争boolean uncontended = true;if (// as 还没有创建as == null || (m = as.length - 1) < 0 ||// 当前线程对应的 cell 还没有创建(a = as[getProbe() & m]) == null ||// cas 给当前线程的 cell 累加失败 uncontended=false ( a 为当前线程的 cell )!(uncontended = a.cas(v = a.value, v + x))) {// 进入 cell 数组创建、cell 创建的流程longAccumulate(x, null, uncontended);}}
}

cells数组是懒惰创建的,只有竞争发生时才会创建

LongAdder源码 - longAccumulate

在这里插入图片描述

for(; ;)里面分成三部分

final void longAccumulate(long x, LongBinaryOperator fn, boolean wasUncontended) {int h;// 当前线程还没有对应的 cell, 需要随机生成一个 h 值用来将当前线程绑定到 cellif ((h = getProbe()) == 0) {// 初始化 probeThreadLocalRandom.current();// h 对应新的 probe 值, 用来对应 cellh = getProbe();wasUncontended = true;}// collide 为 true 表示需要扩容boolean collide = false;for (;;) {Cell[] as; Cell a; int n; long v;// 1.已经有了 cellsif ((as = cells) != null && (n = as.length) > 0) {}// 2.还没有 cells, 尝试给 cellsBusy 加锁else if (cellsBusy == 0 && cells == as && casCellsBusy()) {}// 3.上两种情况失败, 尝试给 base 累加else if (casBase(v = base, ((fn == null) ? v + x : fn.applyAsLong(v, x))))break;}
}

先看第二个条件:还没有 cells 数组,那么会尝试加锁。如果cells == as即别的线程没有创建cells,并且加锁成功,则创建一个长度为2的 cells 数组,但是并不是两个累加单元对象cell都初始化,而是只初始化一个。

else if (cellsBusy == 0 && cells == as && casCellsBusy()) {boolean init = false;try {                           // Initialize tableif (cells == as) {Cell[] rs = new Cell[2];rs[h & 1] = new Cell(x);cells = rs;init = true;}} finally {cellsBusy = 0;}if (init)break;
}

再看第一个条件:已经有了 cells 数组
在这里插入图片描述
如果没有累加单元cell,尝试加锁创建cell,创建失败进入下一轮循环。创建成功break。

如果有了累加单元cell,cas 尝试累加,累加成功break。累加失败判断cells 长度是否超过cpu核心数

// 已经有了 cells
if ((as = cells) != null && (n = as.length) > 0) {// 还没有 cellif ((a = as[(n - 1) & h]) == null) {// 为 cellsBusy 加锁, 创建 cell, cell 的初始累加值为 x// 成功则 break, 否则继续 continue 循环if (cellsBusy == 0) {       Cell r = new Cell(x);   if (cellsBusy == 0 && casCellsBusy()) {boolean created = false;try {               Cell[] rs; int m, j;if ((rs = cells) != null &&(m = rs.length) > 0 &&rs[j = (m - 1) & h] == null) { //对应的槽位是否为空rs[j] = r;created = true;}} finally {cellsBusy = 0;}if (created)break;continue; // cell没有创建成功,created为false,下一轮循环 }}collide = false;}// 有竞争, 改变线程对应的 cell 来重试 caselse if (!wasUncontended)       // CAS already known to failwasUncontended = true;      // Continue after rehash// cas 尝试累加, fn 配合 LongAccumulator 不为 null, 配合 LongAdder 为 nullelse if (a.cas(v = a.value, ((fn == null) ? v + x :fn.applyAsLong(v, x))))break;// 如果 cells 长度已经超过了最大长度, 或者已经扩容, 改变线程对应的 cell 来重试 caselse if (n >= NCPU || cells != as)collide = false;            // At max size or stale// 确保 collide 为 false 进入此分支, 就不会进入下面的 else if 进行扩容了else if (!collide)collide = true;// 加锁,加锁成功则扩容else if (cellsBusy == 0 && casCellsBusy()) {try {if (cells == as) {      // 扩容Cell[] rs = new Cell[n << 1];for (int i = 0; i < n; ++i) //旧数组的内容拷贝到新数组里去rs[i] = as[i];cells = rs;}} finally {cellsBusy = 0;}collide = false;continue;                   // Retry with expanded table}// 改变线程对应的 cellh = advanceProbe(h);
}

在这里插入图片描述

LongAdder源码 - sum

获取最终结果通过 sum 方法

 public long sum() {Cell[] as = cells; Cell a;long sum = base;if (as != null) {for (int i = 0; i < as.length; ++i) {if ((a = as[i]) != null)sum += a.value;}}return sum;
}

Unsafe

Unsafe 对象提供了非常底层的,操作内存、线程的方法,Unsafe 对象不能直接调用,只能通过反射获得

public class UnsafeAccessor {static Unsafe unsafe;static {try {Field theUnsafe = Unsafe.class.getDeclaredField("theUnsafe"); //通过反射获取对象theUnsafe.setAccessible(true);unsafe = (Unsafe) theUnsafe.get(null); //静态变量属于类而非对象,所以传递null} catch (NoSuchFieldException | IllegalAccessException e) {throw new Error(e);}}static Unsafe getUnsafe() {return unsafe;}
}

使用Unsafe对Teacher对象的属性进行修改

@Slf4j(topic = "c.JUC_test")
public class JUC_test {public static void main(String[] args) throws InterruptedException, NoSuchFieldException {Unsafe unsafe = UnsafeAccessor.getUnsafe();Field id = Student.class.getDeclaredField("id");Field name = Student.class.getDeclaredField("name");// 获得成员变量的偏移量long idOffset = UnsafeAccessor.unsafe.objectFieldOffset(id);long nameOffset = UnsafeAccessor.unsafe.objectFieldOffset(name);Student student = new Student();// 使用 cas 方法替换成员变量的值UnsafeAccessor.unsafe.compareAndSwapInt(student, idOffset, 0, 20); // 返回 trueUnsafeAccessor.unsafe.compareAndSwapObject(student, nameOffset, null, "张三"); // 返回 trueSystem.out.println(student);}
}
@Data
class Student {volatile int id;volatile String name;
}

使用自定义的 AtomicData 实现之前线程安全的原子整数 Account 实现

@Slf4j(topic = "c.JUC_test")
public class JUC_test {public static void main(String[] args) throws InterruptedException, NoSuchFieldException {Account.demo(new Account() {AtomicData atomicData = new AtomicData(10000);@Overridepublic Integer getBalance() {return atomicData.getData();}@Overridepublic void withdraw(Integer amount) {atomicData.decrease(amount);}});}
}
class AtomicData {private volatile int data;static final Unsafe unsafe;static final long DATA_OFFSET;static {unsafe = UnsafeAccessor.getUnsafe();try {// data 属性在 DataContainer 对象中的偏移量,用于 Unsafe 直接访问该属性DATA_OFFSET = unsafe.objectFieldOffset(AtomicData.class.getDeclaredField("data"));} catch (NoSuchFieldException e) {throw new Error(e);}}public AtomicData(int data) {this.data = data;}public void decrease(int amount) {int oldValue;while(true) {// 获取共享变量旧值,可以在这一行加入断点,修改 data 调试来加深理解oldValue = data;// cas 尝试修改 data 为 旧值 + amount,如果期间旧值被别的线程改了,返回 falseif (unsafe.compareAndSwapInt(this, DATA_OFFSET, oldValue, oldValue - amount)) {return;}}}public int getData() {return data;}
}
interface Account {// 获取余额Integer getBalance();// 取款void withdraw(Integer amount);/*** 方法内会启动 1000 个线程,每个线程做 -10 元 的操作* 如果初始余额为 10000 那么正确的结果应当是 0*/static void demo(Account account) {List<Thread> ts = new ArrayList<>();long start = System.nanoTime();for (int i = 0; i < 1000; i++) {ts.add(new Thread(() -> {account.withdraw(10);}));}ts.forEach(Thread::start);ts.forEach(t -> {try {t.join();} catch (InterruptedException e) {e.printStackTrace();}});long end = System.nanoTime();System.out.println(account.getBalance()+ " cost: " + (end-start)/1000_000 + " ms");}
}

共享模型之不可变

日期转换的问题

下面的代码在运行时,由于 SimpleDateFormat 不是线程安全的,会出现错误

SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd");
for (int i = 0; i < 10; i++) {new Thread(() -> {try {log.debug("{}", sdf.parse("1951-04-21"));} catch (Exception e) {log.error("{}", e);}}).start();
}

解决方法:

  1. 加锁,但是性能差
  2. 使用 jdk8 新增的 DateTimeFormatter 对象
DateTimeFormatter dtf = DateTimeFormatter.ofPattern("yyyy-MM-dd");
for (int i = 0; i < 10; i++) {new Thread(() -> {LocalDate date = dtf.parse("2018-10-01", LocalDate::from);log.debug("{}", date);}).start();
}

DateTimeFormatter 是一个不可变对象。一个对象不能够修改其内部状态(属性),那么它就是线程安全的。

不可变设计

String 类也是不可变的,以它为例,说明一下不可变设计的要素

public final class Stringimplements java.io.Serializable, Comparable<String>, CharSequence {/** The value is used for character storage. */private final char value[];/** Cache the hash code for the string */private int hash; // Default to 0// ...
}

final 的使用

  • 该类、类中所有属性都是 final 的
  • 属性用 final 修饰保证了该属性是只读的,不能修改
  • 类用 final 修饰保证了该类中的方法不能被覆盖,防止子类无意间破坏不可变性

final 加在数组上,只能保证数组的引用不能修改,但是数组里面的内容是可以被修改的。
不可变类用 保护性拷贝 来解决这个问题。如果 String构造方法传递的是一个char数组,那么会将数组拷贝一份。

public String(char value[]) {this.value = Arrays.copyOf(value, value.length);
}

保护性拷贝
构造新字符串对象时,会生成新的 char[] value,对内容进行复制 。这种通过创建副本对象来避
免共享的手段称之为 保护性拷贝(defensive copy)

再举一个例子,substring

public String substring(int beginIndex) {if (beginIndex < 0) {throw new StringIndexOutOfBoundsException(beginIndex);}int subLen = value.length - beginIndex;if (subLen < 0) {throw new StringIndexOutOfBoundsException(subLen);}return (beginIndex == 0) ? this : new String(value, beginIndex, subLen);
}

发现其内部是调用 String 的构造方法创建了一个新字符串,而这个构造方法内部也是将value数组拷贝了一份

this.value = Arrays.copyOfRange(value, offset, offset+count);

享元模式

不可变类需要创建对象,而创建大量对象会消耗性能。当需要重用数量有限的同一类对象时,可以采用享元模式

享元模式的体现:

  1. 包装类:
  2. String 串池
  3. BigDecimal BigInteger

在JDK中 Boolean,Byte,Short,Integer,Long,Character 等包装类提供了 valueOf 方法,例如 Long 的 valueOf 方法会缓存 -128~127 之间的 Long 对象,在这个范围之间会重用对象,大于这个范围,才会新建 Long 对象

public static Long valueOf(long l) {final int offset = 128;if (l >= -128 && l <= 127) { // will cachereturn LongCache.cache[(int)l + offset];}return new Long(l);
}

注意:
Byte, Short, Long 缓存的范围都是 -128~127
Character 缓存的范围是 0~127
Integer的默认范围是 -128~127,最小值不能变,最大值可以调整虚拟机参数:
-Djava.lang.Integer.IntegerCache.high 来改变
Boolean 缓存了 TRUE 和 FALSE

一个误区:BigInteger是线程安全的,为什么前面取款的例子需要用 AtomicReference 保护 BigInteger?
因为取款案例是多个方法的组合,每个方法是原子的,但方法的组合不是原子的:

@Override
public void withdraw(BigDecimal amount) {while(true) {BigDecimal prev = balance.get();BigDecimal next = prev.subtract(amount);if (balance.compareAndSet(prev, next)) {break;}}
}

get()、subtract()、compareAndSet() 三个方法的组合会产生线程安全问题

享元模式 - 自定义连接池

一个线上商城应用,QPS 达到数千,如果每次都重新创建和关闭数据库连接,性能会受到极大影响。 这时
预先创建好一批连接,放入连接池。一次请求到达后,从连接池获取连接,使用完毕后再还回连接池,这样既节约了连接的创建和关闭时间,也实现了连接的重用,不至于让庞大的连接数压垮数据库。

class Pool {// 1. 连接池大小private final int poolSize;// 2. 连接对象数组private Connection[] connections;// 3. 连接状态数组 0 表示空闲, 1 表示繁忙private AtomicIntegerArray states;// 4. 构造方法初始化public Pool(int poolSize) {this.poolSize = poolSize;this.connections = new Connection[poolSize];this.states = new AtomicIntegerArray(new int[poolSize]);for (int i = 0; i < poolSize; i++) {connections[i] = new MockConnection("连接" + (i+1));}}// 5. 借连接public Connection borrow() {while(true) {for (int i = 0; i < poolSize; i++) {// 获取空闲连接if(states.get(i) == 0) {if (states.compareAndSet(i, 0, 1)) {log.debug("borrow {}", connections[i]);return connections[i];}}}// 如果没有空闲连接,当前线程进入等待synchronized (this) {try {log.debug("wait...");this.wait();} catch (InterruptedException e) {e.printStackTrace();}}}}// 6. 归还连接public void free(Connection conn) {for (int i = 0; i < poolSize; i++) {if (connections[i] == conn) {states.set(i, 0);synchronized (this) {log.debug("free {}", conn);this.notifyAll();}break;}}}
}
class MockConnection implements Connection {private String name;public MockConnection(String name) {this.name = name;}// 方法实现略
}

测试代码:

Pool pool = new Pool(2);
for (int i = 0; i < 5; i++) {new Thread(() -> {Connection conn = pool.borrow();try {Thread.sleep(new Random().nextInt(1000));} catch (InterruptedException e) {e.printStackTrace();}pool.free(conn);}).start();
}

为什么 borrow() 里面,线程获取不到空闲连接的时候要wait(),而不能进入下一次while循环继续尝试获取空闲连接?
cas适合运行时间短的代码片段,这里其他线程拿到conncetion对象以后,会做一些耗时比较长的操作(增删改查),那么尝试获取空闲连接的线程就会一直循环,导致cpu资源被浪费

以上实现没有考虑:

  • 连接的动态增长与收缩(这里的poolSize是固定大小)
  • 连接保活(可用性检测)
  • 等待超时处理(可以参考保护性暂停里的超时等待)
  • 分布式 hash

final原理

public class TestFinal {final int a = 20;
}

字节码

0: aload_0
1: invokespecial #1 // Method java/lang/Object."<init>":()V
4: aload_0
5: bipush 20
7: putfield #2 // Field a:I<-- 写屏障
10: return

发现 final 变量的赋值也会通过 putfield 指令来完成,同样在这条指令之后也会加入写屏障,保证在其它线程读到它的值时不会出现为 0 的情况

注意:这个地方没彻底搞懂,需要查阅一下其他的资料

无状态

在 web 阶段学习时,设计 Servlet 时为了保证其线程安全,都会有这样的建议,不要为 Servlet 设置成员变量,这种没有任何成员变量的类是线程安全的

因为成员变量保存的数据也可以称为状态信息,因此没有成员变量就称之为无状态

共享模型之工具 - 线程池

自定义线程池

线程池是享元模式的体现,可以让线程重复利用,减少内存占用,避免线程频繁上下文切换。
在这里插入图片描述
注意:这个图画得有问题,应该是main往阻塞队列里放任务,线程池里从阻塞队列里取出任务并执行

阻塞队列:

@Slf4j(topic = "c.BlockingQueue")
class BlockingQueue<T> {// 1. 任务队列private Deque<T> queue = new ArrayDeque<>();// 2. 锁private ReentrantLock lock = new ReentrantLock();// 3. 生产者条件变量private Condition fullWaitSet = lock.newCondition();// 4. 消费者条件变量private Condition emptyWaitSet = lock.newCondition();// 5. 容量private int capcity;public BlockingQueue(int capcity) {this.capcity = capcity;}// 带超时阻塞获取public T poll(long timeout, TimeUnit unit) {lock.lock();try {// 将 timeout 统一转换为 纳秒long nanos = unit.toNanos(timeout);while (queue.isEmpty()) {try {if (nanos <= 0) {return null;}// 返回值是剩余时间,防止虚假唤醒nanos = emptyWaitSet.awaitNanos(nanos);} catch (InterruptedException e) {e.printStackTrace();}}T t = queue.removeFirst();fullWaitSet.signal();return t;} finally {lock.unlock();}}// 阻塞获取public T take() {lock.lock();try {while (queue.isEmpty()) { //队列为空,消费者阻塞try {emptyWaitSet.await();} catch (InterruptedException e) {e.printStackTrace();}}T t = queue.removeFirst();fullWaitSet.signal();return t;} finally {lock.unlock();}}// 阻塞添加public void put(T task) {lock.lock();try {while (queue.size() == capcity) { //队列满,生产者阻塞try {log.debug("等待加入任务队列 {} ...", task);fullWaitSet.await();} catch (InterruptedException e) {e.printStackTrace();}}log.debug("加入任务队列 {}", task);queue.addLast(task);emptyWaitSet.signal();} finally {lock.unlock();}}// 带超时时间阻塞添加public boolean offer(T task, long timeout, TimeUnit timeUnit) {lock.lock();try {long nanos = timeUnit.toNanos(timeout);while (queue.size() == capcity) {try {if(nanos <= 0) {return false;}log.debug("等待加入任务队列 {} ...", task);nanos = fullWaitSet.awaitNanos(nanos);} catch (InterruptedException e) {e.printStackTrace();}}log.debug("加入任务队列 {}", task);queue.addLast(task);emptyWaitSet.signal();return true;} finally {lock.unlock();}}public int size() {lock.lock();try {return queue.size();} finally {lock.unlock();}}public void tryPut(RejectPolicy<T> rejectPolicy, T task) {lock.lock();try {// 判断队列是否满if(queue.size() == capcity) {rejectPolicy.reject(this, task);} else {  // 有空闲log.debug("加入任务队列 {}", task);queue.addLast(task);emptyWaitSet.signal();}} finally {lock.unlock();}}
}

线程池:

@Slf4j(topic = "c.ThreadPool")
class ThreadPool {// 任务队列private BlockingQueue<Runnable> taskQueue;// 线程集合private HashSet<Worker> workers = new HashSet<>();// 核心线程数private int coreSize;// 获取任务时的超时时间private long timeout;private TimeUnit timeUnit;private RejectPolicy<Runnable> rejectPolicy;// 执行任务public void execute(Runnable task) {// 当任务数没有超过 coreSize 时,直接交给 worker 对象执行// 如果任务数超过 coreSize 时,加入任务队列暂存synchronized (workers) {if(workers.size() < coreSize) {Worker worker = new Worker(task);log.debug("新增 worker{}, {}", worker, task);workers.add(worker);worker.start();} else {
//                taskQueue.put(task);// 1) 死等// 2) 带超时等待// 3) 让调用者放弃任务执行// 4) 让调用者抛出异常// 5) 让调用者自己执行任务taskQueue.tryPut(rejectPolicy, task);}}}public ThreadPool(int coreSize, long timeout, TimeUnit timeUnit, int queueCapcity, RejectPolicy<Runnable> rejectPolicy) {this.coreSize = coreSize;this.timeout = timeout;this.timeUnit = timeUnit;this.taskQueue = new BlockingQueue<>(queueCapcity);this.rejectPolicy = rejectPolicy;}class Worker extends Thread{private Runnable task;public Worker(Runnable task) {this.task = task;}@Overridepublic void run() {// 执行任务// 1) 当 task 不为空,执行任务// 2) 当 task 执行完毕,再接着从任务队列获取任务并执行
//            while(task != null || (task = taskQueue.take()) != null) {while(task != null || (task = taskQueue.poll(timeout, timeUnit)) != null) {try {log.debug("正在执行...{}", task);task.run();} catch (Exception e) {e.printStackTrace();} finally {task = null;}}synchronized (workers) {log.debug("worker 被移除{}", this);workers.remove(this);}}}
}

拒绝策略:任务队列满了,此时有新的任务加进来,怎么办?(策略模式,具体实现由调用者决定)

@FunctionalInterface // 拒绝策略
interface RejectPolicy<T> {void reject(BlockingQueue<T> queue, T task);
}

测试代码:

@Slf4j(topic = "c.TestPool")
public class TestPool {public static void main(String[] args) {ThreadPool threadPool = new ThreadPool(1,1000, TimeUnit.MILLISECONDS, 1, (queue, task)->{// 1. 死等
//            queue.put(task);// 2) 带超时等待
//            queue.offer(task, 1500, TimeUnit.MILLISECONDS);// 3) 让调用者放弃任务执行
//            log.debug("放弃{}", task);// 4) 让调用者抛出异常
//            throw new RuntimeException("任务执行失败 " + task);// 5) 让调用者自己执行任务task.run(); //主线程执行});for (int i = 0; i < 4; i++) {int j = i;threadPool.execute(() -> {try {Thread.sleep(1000L);} catch (InterruptedException e) {e.printStackTrace();}log.debug("{}", j);});}}
}

ThreadPoolExecutor

在这里插入图片描述
线程池状态:
ThreadPoolExecutor 使用 int 的高 3 位来表示线程池状态,低 29 位表示线程数量
在这里插入图片描述
从数字上比较,TERMINATED > TIDYING > STOP > SHUTDOWN > RUNNING

为什么不用两个整数分别存储 线程池状态 和 线程数量?
这些信息存储在一个原子变量 ctl 中,目的是将线程池状态与线程个数合二为一,这样就可以用一次 cas 原子操作进行赋值

// c 为旧值, ctlOf 返回结果为新值
ctl.compareAndSet(c, ctlOf(targetState, workerCountOf(c))));
// rs 为高 3 位代表线程池状态, wc 为低 29 位代表线程个数,ctl 是合并它们
private static int ctlOf(int rs, int wc) { return rs | wc; }

构造方法

public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler)

corePoolSize 核心线程数目 (最多保留的线程数)
maximumPoolSize 最大线程数目
keepAliveTime 生存时间 - 针对救急线程
unit 时间单位 - 针对救急线程
workQueue 阻塞队列
threadFactory 线程工厂 - 可以为线程创建时起个好名字
handler 拒绝策略

在这里插入图片描述
救急线程
maximumPoolSize - corePoolSize = 救急线程数量

如果阻塞队列满了,此时又来了一个任务(假设任务5),那么会创建救急线程执行该任务。
救急线程执行完任务后,会触发生存时间,超过生存时间后会被销毁;核心线程执行完任务不会被销毁,仍然保留在线程池中。

如果阻塞队列满了,并且没有救急线程执行任务,那么会执行拒绝策略

注意:如果阻塞队列使用的是无界队列,那么不会创建救急线程。只有有界队列才会创建救急线程

拒绝策略
jdk提供了4种拒绝策略
在这里插入图片描述
AbortPolicy 让调用者抛出 RejectedExecutionException 异常,这是默认策略
CallerRunsPolicy 让调用者运行任务
DiscardPolicy 放弃本次任务
DiscardOldestPolicy 放弃队列中最早的任务,本任务取而代之

其他框架(如Dubbo、Netty)提供了其他的拒绝策略。

Executors工厂方法

根据构造方法,JDK Executors 类中提供了众多工厂方法来创建各种用途的线程池

  1. newFixedThreadPool,创建固定大小的线程池
public static ExecutorService newFixedThreadPool(int nThreads) {return new ThreadPoolExecutor(nThreads, nThreads,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>());
}
  • 核心线程数 == 最大线程数(没有救急线程被创建),因此也无需设置超时时间
  • 阻塞队列是无界的,可以放任意数量的任务
  • 适用于任务量已知,相对耗时的任务

注意:也可以使用两个形参的方法,第二个为ThreadFactory,可以给线程起名字

private static void test1() {ExecutorService pool = Executors.newFixedThreadPool(2, new ThreadFactory() {private AtomicInteger t = new AtomicInteger(1);@Overridepublic Thread newThread(Runnable r) {return new Thread(r, "mypool_t" + t.getAndIncrement());}});pool.execute(() -> {log.debug("1");});pool.execute(() -> {log.debug("2");});pool.execute(() -> {log.debug("3");});
}
  1. newCachedThreadPool,创建带缓冲的线程池
public static ExecutorService newCachedThreadPool() {return new ThreadPoolExecutor(0, Integer.MAX_VALUE,60L, TimeUnit.SECONDS,new SynchronousQueue<Runnable>());
}
  • 核心线程数是 0, 最大线程数是 Integer.MAX_VALUE,救急线程的空闲生存时间是 60s。这意味着全部都是救急线程(60s 后可以回收),救急线程可以无限创建
  • 队列采用了 SynchronousQueue 实现特点是,它没有容量,没有线程来取是放不进去的(一手交钱、一手交货
  • 整个线程池表现为线程数会根据任务量不断增长,没有上限,当任务执行完毕,空闲 1分钟后释放线程。 适合任务数比较密集,但每个任务执行时间较短的情况
  1. newSingleThreadExecutor,单线程线程池
public static ExecutorService newSingleThreadExecutor() {return new FinalizableDelegatedExecutorService(new ThreadPoolExecutor(1, 1,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>()));
}
  • 使用场景:希望多个任务排队执行。线程数固定为 1,任务数多于 1 时,会放入无界队列排队。任务执行完毕,这唯一的线程也不会被释放。
  • 单线程 的区别:
    自己创建一个单线程串行执行任务,如果任务执行失败而终止那么没有任何补救措施;而线程池还会新建一个线程,保证池的正常工作
  • Executors.newFixedThreadPool(1) 的区别:
    (1) newFixedThreadPool(1) 初始时为1,以后还可以修改;newSingleThreadExecutor() 线程个数始终为1,不能修改;
    (2) newFixedThreadPool(1) 直接返回了一个ThreadPoolExecutor 对象,可以强转后调用 setCorePoolSize 等方法进行修改。
    newSingleThreadExecutor() 返回的对象被 FinalizableDelegatedExecutorService 包装(装饰器模式), 只对外暴露了 ExecutorService 接口,因此不能调用 ThreadPoolExecutor 中特有的方法

提交任务

execute() 方法没有返回值

// 执行任务
void execute(Runnable command);

submit() 方法返回一个Future对象,调用 future.get() 获取返回的结果

// 提交任务 task,用返回值 Future 获得任务执行结果
<T> Future<T> submit(Callable<T> task);
ExecutorService pool = Executors.newFixedThreadPool(3);
Future<String> future = pool.submit(() -> {log.debug("running");Thread.sleep(1000);return "ok";
});
log.debug("{}", future.get());

invokeAll() 接收一个任务集合tasks,提交 tasks 中所有任务;返回值是一个Future集合

// 提交 tasks 中所有任务
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException;
// 提交 tasks 中所有任务,带超时时间
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException;
ExecutorService pool = Executors.newFixedThreadPool(3);
List<Future<String>> futures = pool.invokeAll(Arrays.asList(() -> {log.debug("begin");Thread.sleep(1000);return "1";},() -> {log.debug("begin");Thread.sleep(500);return "2";},() -> {log.debug("begin");Thread.sleep(2000);return "3";}
));futures.forEach(f -> {try {log.debug("{}", f.get());} catch (InterruptedException | ExecutionException e) {e.printStackTrace();}
});

invokeAny() 提交 tasks 中所有任务,哪个任务先成功执行完毕,返回此任务执行结果,其它任务取消

<T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException;
// 带超时时间
<T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException;

注意:下面的代码返回值是 String 类型;三个任务同时执行,但任务2时间最短先执行完毕,那么任务1和任务3会取消

ExecutorService pool = Executors.newFixedThreadPool(3);
String result = pool.invokeAny(Arrays.asList(() -> {log.debug("begin 1");Thread.sleep(1000);log.debug("end 1");return "1";},() -> {log.debug("begin 2");Thread.sleep(500);log.debug("end 2");return "2";},() -> {log.debug("begin 3");Thread.sleep(2000);log.debug("end 3");return "3";}
));
log.debug("{}", result);

关闭线程池

shutdown() 方法,线程池状态变为 SHUTDOWN,不会接收新任务,但已提交任务会执行完。此方法不会阻塞调用线程的执行(假设main线程调用shutdown,那么调用完后main线程继续往下执行)

public void shutdown() {final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {checkShutdownAccess();// 修改线程池状态advanceRunState(SHUTDOWN);// 仅会打断空闲线程interruptIdleWorkers();onShutdown(); // 扩展点 ScheduledThreadPoolExecutor} finally {mainLock.unlock();}// 尝试终结(没有运行的线程可以立刻终结,如果还有运行的线程也不会等)tryTerminate();
}

shutdownNow() 方法,线程池状态变为 STOP,不会接收新任务,会将队列中的任务返回,并用 interrupt 的方式中断正在执行的任务

public List<Runnable> shutdownNow() {List<Runnable> tasks;final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {checkShutdownAccess();// 修改线程池状态advanceRunState(STOP);// 打断所有线程interruptWorkers();// 获取队列中剩余任务tasks = drainQueue();} finally {mainLock.unlock();}// 尝试终结tryTerminate();return tasks;
}

其他方法

// 不在 RUNNING 状态的线程池,此方法就返回 true
boolean isShutdown();
// 线程池状态是否是 TERMINATED
boolean isTerminated();
// 调用 shutdown 后,由于调用线程并不会等待所有任务运行结束,因此如果它想在线程池 TERMINATED 后做些事情,可以利用此方法等待
boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException;

设计模式 - 工作线程

让有限的工作线程(Worker Thread)来轮流异步处理无限多的任务。也可以将其归类为分工模式,它的典型实现就是线程池,也体现了经典设计模式中的享元模式。

注意,不同任务类型应该使用不同的线程池,这样能够避免饥饿,并能提升效率
例如,如果一个餐馆的工人既要招呼客人(任务类型A),又要到后厨做菜(任务类型B)显然效率不咋地,分成服务员(线程池A)与厨师(线程池B)更为合理。

固定大小线程池 会有 饥饿 现象,如下所示:

@Slf4j(topic = "c.TestStarvation")
public class TestStarvation {static final List<String> MENU = Arrays.asList("地三鲜", "宫保鸡丁", "辣子鸡丁", "烤鸡翅");static Random RANDOM = new Random();static String cooking() {return MENU.get(RANDOM.nextInt(MENU.size()));}public static void main(String[] args) {ExecutorService pool = Executors.newFixedThreadPool(2);pool.execute(() -> {log.debug("处理点餐...");Future<String> f = pool.submit(() -> {log.debug("做菜");return cooking();});try {log.debug("上菜: {}", f.get());} catch (InterruptedException | ExecutionException e) {e.printStackTrace();}});pool.execute(() -> {log.debug("处理点餐...");Future<String> f = pool.submit(() -> {log.debug("做菜");return cooking();});try {log.debug("上菜: {}", f.get());} catch (InterruptedException | ExecutionException e) {e.printStackTrace();}});}
}

如果只有一个客人(只提交了一个任务),那么一个线程处理点餐,一个线程做饭,不会有问题。但如果来了两个客人,那么两个线程都去处理点餐,并且都在等待菜做好了上菜(调用 f.get() 方法),此时没有线程去做饭了。

分析原因:线程数不足导致了饥饿现象
解决方法:不同任务类型使用不同的线程池

@Slf4j(topic = "c.TestStarvation")
public class TestStarvation {static final List<String> MENU = Arrays.asList("地三鲜", "宫保鸡丁", "辣子鸡丁", "烤鸡翅");static Random RANDOM = new Random();static String cooking() {return MENU.get(RANDOM.nextInt(MENU.size()));}public static void main(String[] args) {ExecutorService waiterPool = Executors.newFixedThreadPool(1);ExecutorService cookPool = Executors.newFixedThreadPool(1);waiterPool.execute(() -> {log.debug("处理点餐...");Future<String> f = cookPool.submit(() -> {log.debug("做菜");return cooking();});try {log.debug("上菜: {}", f.get());} catch (InterruptedException | ExecutionException e) {e.printStackTrace();}});waiterPool.execute(() -> {log.debug("处理点餐...");Future<String> f = cookPool.submit(() -> {log.debug("做菜");return cooking();});try {log.debug("上菜: {}", f.get());} catch (InterruptedException | ExecutionException e) {e.printStackTrace();}});}
}

线程池多大合适

过小会导致程序不能充分地利用系统资源、容易导致饥饿
过大会导致更多的线程上下文切换,占用更多内存

  1. CPU 密集型运算
    通常采用 cpu 核数 + 1 能够实现最优的 CPU 利用率,+1 是保证当线程由于页缺失故障(操作系统)或其它原因导致暂停时,额外的这个线程就能顶上去,保证 CPU 时钟周期不被浪费

  2. I/O 密集型运算
    CPU 不总是处于繁忙状态,例如,当你执行业务计算时,这时候会使用 CPU 资源,但当你执行 I/O 操作时、远程RPC 调用时,包括进行数据库操作时,这时候 CPU 就闲下来了,你可以利用多线程提高它的利用率。
    经验公式如下
    线程数 = 核数 * 期望 CPU 利用率 * 总时间(CPU计算时间+等待时间) / CPU 计算时间
    例如 4 核 CPU 计算时间是 50% ,其它等待时间是 50%,期望 cpu 被 100% 利用,套用公式
    4 * 100% * 100% / 50% = 8
    例如 4 核 CPU 计算时间是 10% ,其它等待时间是 90%,期望 cpu 被 100% 利用,套用公式
    4 * 100% * 100% / 10% = 40

任务调度线程池

任务调度线程池功能 加入之前,可以使用 java.util.Timer 来实现定时功能。Timer 的优点在于简单易用,但由于所有任务都是由同一个线程来调度,因此所有任务都是串行执行的,同一时间只能有一个任务在执行,前一个任务的延迟或异常都将会影响到之后的任务。

Timer timer = new Timer(); 
TimerTask task1 = new TimerTask() {@Overridepublic void run() {log.debug("task 1");sleep(2000);
//int i = 1 / 0;}
};
TimerTask task2 = new TimerTask() {@Overridepublic void run() {log.debug("task 2");}
};log.debug("start...");
timer.schedule(task1, 1000); //延迟1秒后执行
timer.schedule(task2, 1000);

以上代码 task1 sleep两秒,或者出现异常,都会导致task2不是延迟一秒后执行,而是在task1执行完以后延迟一秒执行

而使用 ScheduledExecutorService 不会出现以上问题。注意task1里出现了异常,虽然不会影响task2运行,但是控制台并没有打印异常信息。后面会介绍如何捕获异常。

ScheduledExecutorService pool = Executors.newScheduledThreadPool(2);pool.schedule(() -> {log.debug("task1");sleep(2000);//int i = 1 / 0;
}, 1, TimeUnit.SECONDS);pool.schedule(() -> {log.debug("task2");
}, 1, TimeUnit.SECONDS);

除了延时任务,还可以处理定时任务

ScheduledExecutorService pool = Executors.newScheduledThreadPool(1);log.debug("start...");//scheduleAtFixedRate
pool.scheduleAtFixedRate(() -> {log.debug("running...");sleep(2000);
}, 1, 1, TimeUnit.SECONDS);//scheduleWithFixedDelay
ScheduledExecutorService pool = Executors.newScheduledThreadPool(1);
pool.scheduleWithFixedDelay(()-> {log.debug("running...");sleep(2000);
}, 1, 1, TimeUnit.SECONDS);

scheduleAtFixedRate:1s以后任务开始执行,但之后都是每 2s 执行一次任务。参数里的1s是每个任务开始后启动计时,如果发现该任务没有执行完,会先等待其执行完再执行下一个任务。

ScheduledExecutorService:1s以后任务开始执行,但之后都是每 3s 执行一次任务。参数里的1s是每个任务结束后启动计时,相当于任务之间的间隔,第一个任务结束1s之后才执行第二个任务。

线程池如何捕获异常

无论是普通线程池还是任务调度线程池,都不会自动捕获异常,有两种方法解决:

  1. 手动 try-catch 捕获异常
ExecutorService pool = Executors.newFixedThreadPool(1);
pool.submit(() -> {try {log.debug("task1");int i = 1 / 0;} catch (Exception e) {log.error("error:", e);}
});
  1. Future对象,返回结果之前如果发现有异常,会返回异常信息
ExecutorService pool = Executors.newFixedThreadPool(1);
Future<Boolean> f = pool.submit(() -> {log.debug("task1");int i = 1 / 0;return true;
});
log.debug("result:{}", f.get()); //get()方法会报异常

定时任务应用

每周四 18:00:00 定时执行任务

public class TestSchedule {// 如何让每周四 18:00:00 定时执行任务?public static void main(String[] args) {// 获取当前时间LocalDateTime now = LocalDateTime.now();System.out.println(now);// 获取周四时间LocalDateTime time = now.withHour(18).withMinute(0).withSecond(0).withNano(0).with(DayOfWeek.THURSDAY);// 如果 当前时间 > 本周周四,必须找到下周周四if(now.compareTo(time) > 0) {time = time.plusWeeks(1);}System.out.println(time);// initailDelay 代表当前时间和周四的时间差// period 一周的间隔时间long initailDelay = Duration.between(now, time).toMillis();long period = 1000 * 60 * 60 * 24 * 7;ScheduledExecutorService pool = Executors.newScheduledThreadPool(1);pool.scheduleAtFixedRate(() -> {System.out.println("running...");}, initailDelay, period, TimeUnit.MILLISECONDS);}
}

Tomcat 线程池

Tomcat 有两大部分:connector 和 container。其中连接器connector用到了线程池
在这里插入图片描述

  • LimitLatch 用来限流,可以控制最大连接个数,类似 J.U.C 中的 Semaphore 后面再讲
  • Acceptor 只负责【接收新的 socket 连接】
  • Poller 只负责监听 socket channel 是否有【可读的 I/O 事件】
  • 一旦可读,封装一个任务对象(socketProcessor),提交给 Executor 线程池处理
  • Executor 线程池中的工作线程最终负责【处理请求】

Tomcat 线程池扩展了 ThreadPoolExecutor,行为稍有不同

  • 如果总线程数达到 maximumPoolSize,这时不会立刻抛 RejectedExecutionException 异常,而是再次尝试将任务放入队列,如果还失败,才抛出 RejectedExecutionException 异常

Connector 配置
在这里插入图片描述
Executor 线程配置
在这里插入图片描述
在这里插入图片描述

Fork/Join 线程池

共享模型之工具 - JUC

AQS原理

全称是 AbstractQueuedSynchronizer,是阻塞式锁和相关的同步器工具的框架,有以下几个特点:

  1. AQS 用 state 属性来表示资源的状态(分独占模式和共享模式),子类需要定义如何维护这个状态,控制如何获取锁和释放锁
  • getState - 获取 state 状态
  • setState - 设置 state 状态
  • compareAndSetState - cas 机制设置 state 状态
  1. 独占模式是只有一个线程能够访问资源,而共享模式可以允许多个线程访问资源

  2. 提供了基于 FIFO 的等待队列,类似于 Monitor 的 EntryList

  3. 条件变量来实现等待、唤醒机制,支持多个条件变量,类似于 Monitor 的 WaitSet

如何使用?子类要实现这样一些方法(默认抛出 UnsupportedOperationException)

  • tryAcquire
  • tryRelease
  • tryAcquireShared
  • tryReleaseShared
  • isHeldExclusively

获取锁的姿势

// 如果获取锁失败
if (!tryAcquire(arg)) {// 入队, 可以选择阻塞当前线程 park unpark
}

释放锁的姿势

// 如果释放锁成功
if (tryRelease(arg)) {// 让阻塞线程恢复运行
}

AQS 自定义锁

实现一个不可重入锁

@Slf4j(topic = "c.TestAqs")
public class TestAqs {public static void main(String[] args) {MyLock lock = new MyLock();new Thread(() -> {lock.lock();try {log.debug("locking...");sleep(1);} finally {log.debug("unlocking...");lock.unlock();}},"t1").start();new Thread(() -> {lock.lock();try {log.debug("locking...");} finally {log.debug("unlocking...");lock.unlock();}},"t2").start();}
}// 自定义锁(不可重入锁)
class MyLock implements Lock {// 独占锁 同步器类class MySync extends AbstractQueuedSynchronizer {@Overrideprotected boolean tryAcquire(int arg) { //arg参数在可重入锁中才会用到if(compareAndSetState(0, 1)) {// 加上了锁,并设置 owner 为当前线程setExclusiveOwnerThread(Thread.currentThread());return true;}return false;}@Overrideprotected boolean tryRelease(int arg) {setExclusiveOwnerThread(null);setState(0); //state被volatile修饰了,放在后面,这样可以保证setState之前的代码不会被重排到setState后面return true;}@Override // 是否持有独占锁protected boolean isHeldExclusively() {return getState() == 1;}public Condition newCondition() {return new ConditionObject();}}private MySync sync = new MySync();@Override // 加锁(不成功会进入等待队列)public void lock() {sync.acquire(1);}@Override // 加锁,可打断public void lockInterruptibly() throws InterruptedException {sync.acquireInterruptibly(1);}@Override // 尝试加锁(一次)public boolean tryLock() {return sync.tryAcquire(1);}@Override // 尝试加锁,带超时public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {return sync.tryAcquireNanos(1, unit.toNanos(time));}@Override // 解锁public void unlock() {sync.release(1);}@Override // 创建条件变量public Condition newCondition() {return sync.newCondition();}
}

同步器类继承自AQS,大部分的方法已经实现好了。

ReentrantLock 非公平锁原理

在这里插入图片描述
加锁流程:
先从构造器开始看,默认为非公平锁实现

public ReentrantLock() {sync = new NonfairSync();
}

没有竞争时

final void lock() {if (compareAndSetState(0, 1)) //cas修改statesetExclusiveOwnerThread(Thread.currentThread()); //owner改为当前线程elseacquire(1);
}

在这里插入图片描述
第一个竞争出现时,进入 acquire()

public final void acquire(int arg) {if (!tryAcquire(arg) &&acquireQueued(addWaiter(Node.EXCLUSIVE), arg))selfInterrupt();
}

在这里插入图片描述
Thread-1 执行了

  • CAS 尝试将 state 由 0 改为 1,结果失败
  • 进入 tryAcquire 逻辑,这时 state 已经是1,结果仍然失败
  • 接下来进入 addWaiter 逻辑,构造 Node 队列
    图中黄色三角表示该 Node 的 waitStatus 状态,其中 0 为默认正常状态
    Node 的创建是懒惰的
    其中第一个 Node 称为 Dummy(哑元)或哨兵,用来占位,并不关联线程
    在这里插入图片描述

当前线程进入 acquireQueued 逻辑

final boolean acquireQueued(final Node node, int arg) {boolean failed = true;try {boolean interrupted = false;for (;;) {final Node p = node.predecessor();if (p == head && tryAcquire(arg)) {setHead(node);p.next = null; // help GCfailed = false;return interrupted;}if (shouldParkAfterFailedAcquire(p, node) &&parkAndCheckInterrupt())interrupted = true;}} finally {if (failed)cancelAcquire(node);}
}
  1. acquireQueued 会在一个死循环中不断尝试获得锁,失败后进入 park 阻塞
  2. 如果自己是紧邻着 head(排第二位),那么再次 tryAcquire 尝试获取锁,当然这时 state 仍为 1,失败
  3. 进入 shouldParkAfterFailedAcquire 逻辑,将前驱 node,即 head 的 waitStatus 改为 -1,这次返回 false
    在这里插入图片描述
  4. shouldParkAfterFailedAcquire 执行完毕回到 acquireQueued ,再次 tryAcquire 尝试获取锁,当然这时
    state 仍为 1,失败
  5. 当再次进入 shouldParkAfterFailedAcquire 时,这时因为其前驱 node 的 waitStatus 已经是 -1,这次返回
    true。(waitStatus为-1表示负责唤醒链表的下一个thread)
  6. 进入 parkAndCheckInterrupt, Thread-1 park(灰色表示)。
    在这里插入图片描述
    再次有多个线程经历上述过程竞争失败,变成这个样子
    在这里插入图片描述

释放锁流程:

public final boolean release(int arg) {if (tryRelease(arg)) {Node h = head;if (h != null && h.waitStatus != 0)unparkSuccessor(h);return true;}return false;
}

Thread-0 释放锁,进入 tryRelease 流程,如果成功,设置 exclusiveOwnerThread = null,state = 0
在这里插入图片描述
当前队列不为 null,并且 head 的 waitStatus = -1,进入 unparkSuccessor 流程
找到队列中离 head 最近的一个 Node(没取消的),unpark 恢复其运行,本例中即为 Thread-1
回到 Thread-1 的 acquireQueued 流程,再一次tryAcquire
在这里插入图片描述
解锁后竞争成功:
如果加锁成功(没有竞争),会设置
exclusiveOwnerThread 为 Thread-1,state = 1
head 指向刚刚 Thread-1 所在的 Node,该 Node 清空 Thread
原本的 head 因为从链表断开,而可被垃圾回收

解锁后竞争失败:
如果这时候有其它线程来竞争(非公平的体现),例如这时有 Thread-4 来了
在这里插入图片描述
如果不巧又被 Thread-4 占了先
Thread-4 被设置为 exclusiveOwnerThread,state = 1
Thread-1 再次进入 acquireQueued 流程,获取锁失败,重新进入 park 阻塞

ReentrantLock 可重入原理

state = 1时,如果发现还是当前线程来加锁,那么会设置一个计数器 nextc 并自增。解锁时只有当 nextc = 0 时才会真正释放锁

static final class NonfairSync extends Sync {// Sync 继承过来的方法, 方便阅读, 放在此处final boolean nonfairTryAcquire(int acquires) {final Thread current = Thread.currentThread();int c = getState();if (c == 0) {if (compareAndSetState(0, acquires)) {setExclusiveOwnerThread(current);return true;}}// 如果已经获得了锁, 线程还是当前线程, 表示发生了锁重入else if (current == getExclusiveOwnerThread()) {// state++int nextc = c + acquires;if (nextc < 0) // overflowthrow new Error("Maximum lock count exceeded");setState(nextc);return true;}return false;}// Sync 继承过来的方法, 方便阅读, 放在此处protected final boolean tryRelease(int releases) {// state--int c = getState() - releases;if (Thread.currentThread() != getExclusiveOwnerThread())throw new IllegalMonitorStateException();boolean free = false;// 支持锁重入, 只有 state 减为 0, 才释放成功if (c == 0) {free = true;setExclusiveOwnerThread(null);}setState(c);return free;}
}

ReentrantLock 可打断原理

不可打断模式 下,即使线程被打断,仍会驻留在 AQS 队列中,一直要等到获得锁后方能得知自己被打断了

在不可打断模式下,阻塞的线程被打断后,parkAndCheckInterrupt() 方法返回 true,进入 if 块,将 interrupted 变量设为 true。然后继续下一次 for 循环尝试获取锁,如果没有获取到锁,又会被阻塞。直到获取锁以后,才能返回打断状态,最终在 selfInterrupt() 设置中断标记,表示该线程被打断过。

// Sync 继承自 AQS
static final class NonfairSync extends Sync {// ...private final boolean parkAndCheckInterrupt() {// 如果打断标记已经是 true, 则 park 会失效LockSupport.park(this);// interrupted 会清除打断标记return Thread.interrupted();}final boolean acquireQueued(final Node node, int arg) {boolean failed = true;try {boolean interrupted = false;for (;;) {final Node p = node.predecessor();if (p == head && tryAcquire(arg)) {setHead(node);p.next = null;failed = false;// 还是需要获得锁后, 才能返回打断状态return interrupted;}if (shouldParkAfterFailedAcquire(p, node) &&parkAndCheckInterrupt()) {// 如果是因为 interrupt 被唤醒, 返回打断状态为 trueinterrupted = true;}}} finally {if (failed)cancelAcquire(node);}}public final void acquire(int arg) {if (!tryAcquire(arg) &&acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) {// 如果打断状态为 trueselfInterrupt();}}static void selfInterrupt() {// 重新产生一次中断Thread.currentThread().interrupt();}
}

打断模式:如果被打断了,继续向下运行,直接抛出异常

static final class NonfairSync extends Sync {public final void acquireInterruptibly(int arg) throws InterruptedException {if (Thread.interrupted())throw new InterruptedException();// 如果没有获得到锁, 进入 ㈠if (!tryAcquire(arg))doAcquireInterruptibly(arg);}// ㈠ 可打断的获取锁流程private void doAcquireInterruptibly(int arg) throws InterruptedException {final Node node = addWaiter(Node.EXCLUSIVE);boolean failed = true;try {for (;;) {final Node p = node.predecessor();if (p == head && tryAcquire(arg)) {setHead(node);p.next = null; // help GCfailed = false;return;}if (shouldParkAfterFailedAcquire(p, node) &&parkAndCheckInterrupt()) {// 在 park 过程中如果被 interrupt 会进入此// 这时候抛出异常, 而不会再次进入 for (;;)throw new InterruptedException();}}} finally {if (failed)cancelAcquire(node);}}
}

ReentrantLock 公平锁原理

非公平锁:tryAcquire 时发现只要 state 为 0,就尝试获取锁,不去检查 AQS 队列

final boolean nonfairTryAcquire(int acquires) {final Thread current = Thread.currentThread();int c = getState();// 如果还没有获得锁if (c == 0) {// 尝试用 cas 获得, 这里体现了非公平性: 不去检查 AQS 队列if (compareAndSetState(0, acquires)) {setExclusiveOwnerThread(current);return true;}}// 如果已经获得了锁, 线程还是当前线程, 表示发生了锁重入else if (current == getExclusiveOwnerThread()) {// state++int nextc = c + acquires;if (nextc < 0) // overflowthrow new Error("Maximum lock count exceeded");setState(nextc);return true;}// 获取失败, 回到调用处return false;
}

公平锁:先检查 AQS 队列中是否有前驱节点, 没有才去竞争

// 与非公平锁主要区别在于 tryAcquire 方法的实现
protected final boolean tryAcquire(int acquires) {final Thread current = Thread.currentThread();int c = getState();if (c == 0) {// 先检查 AQS 队列中是否有前驱节点, 没有才去竞争if (!hasQueuedPredecessors() &&compareAndSetState(0, acquires)) {setExclusiveOwnerThread(current);return true;}}else if (current == getExclusiveOwnerThread()) {int nextc = c + acquires;if (nextc < 0)throw new Error("Maximum lock count exceeded");setState(nextc);return true;}return false;
}public final boolean hasQueuedPredecessors() {Node t = tail;Node h = head;Node s;// h != t 时表示队列中有 Nodereturn h != t &&(// (s = h.next) == null 表示队列中还有没有老二(s = h.next) == null ||// 或者队列中老二线程不是此线程s.thread != Thread.currentThread());
}

ReentrantLock 条件变量原理

共享模型之工具 - 读写锁

ReentrantReadWriteLock

当读操作远远高于写操作时,这时候使用 读写锁 让 读-读 可以并发,提高性能。
类似于数据库中的 select … from … lock in share mode
注意:写-写、读-写还是互斥的。

提供一个 数据容器类 内部分别使用读锁保护数据的 read() 方法,写锁保护数据的 write() 方法

@Slf4j(topic = "c.TestReadWriteLock")
public class TestReadWriteLock {public static void main(String[] args) throws InterruptedException {DataContainer dataContainer = new DataContainer();new Thread(() -> {dataContainer.read();}, "t1").start();new Thread(() -> {dataContainer.read();}, "t2").start();}
}@Slf4j(topic = "c.DataContainer")
class DataContainer {private Object data;private ReentrantReadWriteLock rw = new ReentrantReadWriteLock();private ReentrantReadWriteLock.ReadLock r = rw.readLock();private ReentrantReadWriteLock.WriteLock w = rw.writeLock();public Object read() {log.debug("获取读锁...");r.lock();try {log.debug("读取");sleep(1);return data;} finally {log.debug("释放读锁...");r.unlock();}}public void write() {log.debug("获取写锁...");w.lock();try {log.debug("写入");sleep(1);} finally {log.debug("释放写锁...");w.unlock();}}
}

注意:

  • 读锁不支持条件变量
  • 重入时升级不支持:即持有读锁的情况下去获取写锁,会导致获取写锁永久等待
r.lock();
try {// ...w.lock();try {// ...} finally{w.unlock();}
} finally{r.unlock();
}
  • 重入时降级支持:即持有写锁的情况下去获取读锁
// ReentrantLock源码
class CachedData {Object data;// 是否有效,如果失效,需要重新计算 datavolatile boolean cacheValid;final ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();void processCachedData() {rwl.readLock().lock(); //先获取读锁,判断cache是否失效if (!cacheValid) { //cache失效// 获取写锁前必须释放读锁rwl.readLock().unlock();rwl.writeLock().lock();try {// 判断是否有其它线程已经获取了写锁、更新了缓存, 避免重复更新if (!cacheValid) {data = ...cacheValid = true;}// 降级为读锁, 释放写锁, 这样能够让其它线程读取缓存rwl.readLock().lock();} finally {rwl.writeLock().unlock();}}// cache没有失效,自己用完数据,释放读锁try {use(data);} finally {rwl.readLock().unlock();}}
}

以上例子中,线程加读锁读取cache时发现cache失效了,先释放读锁,再加写锁。写完后先降级为读锁,然后释放写锁,此时别的线程不能读也不能写,自己读完以后释放读锁。

ReentrantReadWriteLock 应用 - 缓存

ReentrantReadWriteLock 原理

StampedLock

ReentrantReadWriteLock 基于AQS,底层用cas修改状态,与不加锁的方式相比,性能有限。

StampedLock 类自 JDK 8 加入,是为了进一步优化读性能;
它的特点是在使用读锁、写锁时都必须配合【戳】使用。

读锁:

long stamp = lock.readLock();
lock.unlockRead(stamp);

写锁:

long stamp = lock.writeLock();
lock.unlockWrite(stamp);

乐观读,StampedLock 支持 tryOptimisticRead() 方法(乐观读,相当于没加锁),读取完毕后需要做一次 戳校验 。如果校验通过,表示这期间确实没有写操作,数据可以安全使用,如果校验没通过,需要重新获取读锁,保证数据安全。

long stamp = lock.tryOptimisticRead();
// 验戳
if(!lock.validate(stamp)){// 锁升级
}

例子:

@Slf4j(topic = "c.TestStampedLock")
public class TestStampedLock {public static void main(String[] args) {DataContainerStamped dataContainer = new DataContainerStamped(1);new Thread(() -> {dataContainer.read(1);}, "t1").start();sleep(0.5);new Thread(() -> {dataContainer.read(0);}, "t2").start();}
}@Slf4j(topic = "c.DataContainerStamped")
class DataContainerStamped {private int data;private final StampedLock lock = new StampedLock();public DataContainerStamped(int data) {this.data = data;}public int read(int readTime) {long stamp = lock.tryOptimisticRead();log.debug("optimistic read locking...{}", stamp);sleep(readTime);if (lock.validate(stamp)) {log.debug("read finish...{}, data:{}", stamp, data);return data;}// 锁升级 - 读锁log.debug("updating to read lock... {}", stamp);try {stamp = lock.readLock();log.debug("read lock {}", stamp);sleep(readTime);log.debug("read finish...{}, data:{}", stamp, data);return data;} finally {log.debug("read unlock {}", stamp);lock.unlockRead(stamp);}}public void write(int newData) {long stamp = lock.writeLock();log.debug("write lock {}", stamp);try {sleep(2);this.data = newData;} finally {log.debug("write unlock {}", stamp);lock.unlockWrite(stamp);}}
}
  • 读锁和写锁都会返回一个stamp
  • 第一次读是 tryOptimisticRead() 获取 乐观读锁,读取完后判断 stamp 是否被改过。没被改过直接return。如果被改过,进行 锁升级,即获取读锁。

StampedLock 的局限性:

  • StampedLock 不支持条件变量
  • StampedLock 不支持可重入

Semaphore

信号量,用来限制能同时访问共享资源的线程上限

// 1. 创建 semaphore 对象
Semaphore semaphore = new Semaphore(3);
// 2. 10个线程同时运行
for (int i = 0; i < 10; i++) {new Thread(() -> {// 3. 获取许可try {semaphore.acquire();} catch (InterruptedException e) {e.printStackTrace();}try {log.debug("running...");sleep(1);log.debug("end...");} finally {// 4. 释放许可semaphore.release();}}).start();
}

使用 Semaphore 限流,在访问高峰期时,让请求线程阻塞,高峰期过去再释放许可,当然它只适合限制单机线程数量,并且仅是限制线程数,而不是限制资源数(例如连接数,请对比 Tomcat LimitLatch 的实现)

用 Semaphore 实现简单连接池,对比『享元模式』下的实现(用wait notify),性能和可读性显然更好,注意下面的实现中线程数和数据库连接数是相等的。

Semaphore原理:

CountdownLatch

用来进行线程同步协作,等待所有线程完成倒计时。(一般是主线程等待其他线程)
其中构造参数用来初始化等待计数值,await() 用来等待计数归零,countDown() 用来让计数减一

public static void main(String[] args) throws InterruptedException {CountDownLatch latch = new CountDownLatch(3);new Thread(() -> {log.debug("begin...");sleep(1);latch.countDown();log.debug("end...{}", latch.getCount());}).start();new Thread(() -> {log.debug("begin...");sleep(2);latch.countDown();log.debug("end...{}", latch.getCount());}).start();new Thread(() -> {log.debug("begin...");sleep(1.5);latch.countDown();log.debug("end...{}", latch.getCount());}).start();log.debug("waiting...");latch.await();log.debug("wait end...");
}

虽然 CountdownLatch 用法上和 join 类似,但如果配合线程池使用,就必须用 CountdownLatch:

public static void main(String[] args) throws InterruptedException {CountDownLatch latch = new CountDownLatch(3);ExecutorService service = Executors.newFixedThreadPool(4);service.submit(() -> {log.debug("begin...");sleep(1);latch.countDown();log.debug("end...{}", latch.getCount());});service.submit(() -> {log.debug("begin...");sleep(1.5);latch.countDown();log.debug("end...{}", latch.getCount());});service.submit(() -> {log.debug("begin...");sleep(2);latch.countDown();log.debug("end...{}", latch.getCount());});service.submit(()->{try {log.debug("waiting...");latch.await();log.debug("wait end...");} catch (InterruptedException e) {e.printStackTrace();}});
}

CountdownLatch 应用1:同步等待多线程准备完毕

CountdownLatch 应用2:同步等待多个远程调用结束

CyclicBarrier

一个需求:如果希望CountdownLatch倒计时被执行多次,可以放在for循环里。但是CountdownLatch不能重用(即不能重新设置倒计时),必须重新new一个CountdownLatch对象

CyclicBarrier,循环栅栏,用来进行线程协作,等待线程满足某个计数。构造时设置『计数个数』,每个线程执行到某个需要“同步”的时刻调用 await() 方法进行等待,当等待的线程数满足『计数个数』时,继续执行

public static void main(String[] args) {CyclicBarrier cb = new CyclicBarrier(2); // 个数为2时才会继续执行for(int i = 0; i < 3; i ++) {new Thread(()->{System.out.println("线程1开始.."+new Date());try {cb.await(); // 当个数不足时,等待} catch (InterruptedException | BrokenBarrierException e) {e.printStackTrace();}System.out.println("线程1继续向下运行..."+new Date());}).start();new Thread(()->{System.out.println("线程2开始.."+new Date());try { Thread.sleep(2000); } catch (InterruptedException e) { }try {cb.await(); // 2 秒后,线程个数够2,继续运行} catch (InterruptedException | BrokenBarrierException e) {e.printStackTrace();}System.out.println("线程2继续向下运行..."+new Date());}).start();}
}

CyclicBarrier 和 CountdownLatch 用法几乎一样,唯一不同的是CyclicBarrier倒计时结束后会恢复计数

线程安全类

在这里插入图片描述
遗留的安全集合通常在方法上加synchronized关键字,而修饰的安全集合也是通过加锁实现,这两种集合性能较差,不推荐

重点介绍 java.util.concurrent.* 下的线程安全集合类,可以发现它们有规律,里面包含三类关键词:Blocking、CopyOnWrite、Concurrent

  • Blocking 大部分实现基于锁,并提供用来阻塞的方法
  • CopyOnWrite 之类容器修改开销相对较重
  • Concurrent 类型的容器:
    • 内部很多操作使用 cas 优化,一般可以提供较高吞吐量
    • 弱一致性
      • 遍历时弱一致性,例如,当利用迭代器遍历时,如果容器发生修改,迭代器仍然可以继续进行遍历,这时内容是旧的
      • 求大小弱一致性,size 操作未必是 100% 准确
      • 读取弱一致性

遍历时如果发生了修改,对于非安全容器来讲,使用 fail-fast 机制也就是让遍历立刻失败,抛出
ConcurrentModificationException,不再继续遍历

ConcurrentHashMap

案例:单词计数,有26个英文字母,每个字母有200个,打乱顺序后写入txt中。用多线程读取26个txt,统计每个字母的数量。

demo(// 创建 map 集合// 创建 ConcurrentHashMap 对不对?() -> new HashMap<String, Integer>(),// 进行计数(map, words) -> {for (String word : words) {Integer counter = map.get(word);int newValue = counter == null ? 1 : counter + 1;map.put(word, newValue);}}
);

以上代码会有并发安全问题,把 HashMap 换成 ConcurrentHashMap 仍有问题,原因是get方法和put方法各自是线程安全的,但他们的组合不是原子的,会有并发安全问题。

解决方法:computeIfAbsent 方法,如果key不存在,则new一个LongAdder对象

demo(() -> new ConcurrentHashMap<String, LongAdder>(),(map, words) -> {for (String word : words) {// 注意不能使用 putIfAbsent,此方法返回的是上一次的 value,首次调用返回 nullLongAdder value = map.computeIfAbsent(word, (key) -> new LongAdder());value.increment();}}
);

HashMap并发死链

HashMap原理:拉链法解决桶下标相同,jdk8中加入链表的元素放在链表尾部(尾插法),而jdk7中加入链表的元素放在链表头部(头插法)。当链表长度超过阈值时(数组长度的四分之三),会触发数组扩容,每个元素重新计算下标。

并发死链:jdk7中,多线程环境下HashMap扩容导致死循环。
JDK 8 虽然将扩容算法做了调整,不再将元素加入链表头(而是保持与扩容前一样的顺序),但仍不意味着能
够在多线程环境下能够安全扩容,还会出现其它问题(如扩容丢数据)

ConcurrentHashMap原理

重要属性和内部类

// 默认为 0
// 当初始化时, 为 -1
// 当扩容时, 为 -(1 + 扩容线程数)
// 当初始化或扩容完成后,为 下一次的扩容的阈值大小
private transient volatile int sizeCtl;
// 整个 ConcurrentHashMap 就是一个 Node[]
static class Node<K,V> implements Map.Entry<K,V> {}
// hash 表
transient volatile Node<K,V>[] table;
// 扩容时的 新 hash 表
private transient volatile Node<K,V>[] nextTable;
// 扩容时如果某个 bin 迁移完毕, 用 ForwardingNode 作为旧 table bin 的头结点
static final class ForwardingNode<K,V> extends Node<K,V> {}
// 用在 compute 以及 computeIfAbsent 时, 用来占位, 计算完成后替换为普通 Node
static final class ReservationNode<K,V> extends Node<K,V> {}
// 作为 treebin 的头节点, 存储 root 和 first
static final class TreeBin<K,V> extends Node<K,V> {}
// 作为 treebin 的节点, 存储 parent, left, right
static final class TreeNode<K,V> extends Node<K,V> {}

ForwardingNode的作用:当桶数组扩容时,按照旧数组下标从后往前的顺序,依次将元素放到新数组,如果某个下标的链表都处理完了,就在该下标加上ForwardingNode(hash值为-1)。当有其他线程处理该下标时,读到ForwardingNode就说明这个下标已经被处理过了。或者当其他线程get时,读到ForwardingNode,就说明要到新数组里面get。

TreeBin 和 TreeNode 是红黑树的节点。JDK8以后,当链表长度大于8时会先尝试扩容,如果数组长度已经扩容到了64,则会转化成一棵红黑树

重要方法:

// 获取 Node[] 中第 i 个 Node
static final <K,V> Node<K,V> tabAt(Node<K,V>[] tab, int i)
// cas 修改 Node[] 中第 i 个 Node 的值, c 为旧值, v 为新值
static final <K,V> boolean casTabAt(Node<K,V>[] tab, int i, Node<K,V> c, Node<K,V> v)
// 直接修改 Node[] 中第 i 个 Node 的值, v 为新值
static final <K,V> void setTabAt(Node<K,V>[] tab, int i, Node<K,V> v)

构造方法:
可以看到实现了懒惰初始化,在构造方法中仅仅计算了 table 的大小,以后在第一次使用时才会真正创建

public ConcurrentHashMap(int initialCapacity, float loadFactor, int concurrencyLevel) {if (!(loadFactor > 0.0f) || initialCapacity < 0 || concurrencyLevel <= 0)throw new IllegalArgumentException();if (initialCapacity < concurrencyLevel) // Use at least as many binsinitialCapacity = concurrencyLevel; // as estimated threadslong size = (long)(1.0 + (long)initialCapacity / loadFactor);// tableSizeFor 仍然是保证计算的大小是 2^n, 即 16,32,64 ...int cap = (size >= (long)MAXIMUM_CAPACITY) ?MAXIMUM_CAPACITY : tableSizeFor((int)size);this.sizeCtl = cap;
}

get 流程:

public V get(Object key) {Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;// spread 方法能确保返回结果是正数int h = spread(key.hashCode());if ((tab = table) != null && (n = tab.length) > 0 &&(e = tabAt(tab, (n - 1) & h)) != null) {// 如果头结点已经是要查找的 keyif ((eh = e.hash) == h) {if ((ek = e.key) == key || (ek != null && key.equals(ek)))return e.val;}// hash 为负数表示该 bin 在扩容中或是 treebin, 这时调用 find 方法来查找else if (eh < 0)return (p = e.find(h, key)) != null ? p.val : null;// 正常遍历链表, 用 equals 比较while ((e = e.next) != null) {if (e.hash == h &&((ek = e.key) == key || (ek != null && key.equals(ek))))return e.val;}}return null;
}

总结:

  1. 计算哈希并定位桶
  2. 查找:
    • 如果头结点就是目标节点,直接返回
    • 如果是链表,遍历链表查找
    • 如果是红黑树,走树查找逻辑
  3. 无需加锁,因为:value 是 volatile 修饰,保证可见性,get 是无锁操作,高并发下性能好

put 流程:

public V put(K key, V value) {return putVal(key, value, false);
}final V putVal(K key, V value, boolean onlyIfAbsent) {if (key == null || value == null) throw new NullPointerException();// 其中 spread 方法会综合高位低位, 具有更好的hash性,可以保证hash为非负数int hash = spread(key.hashCode());int binCount = 0;for (Node<K, V>[] tab = table; ; ) {// f 是链表头节点// fh 是链表头结点的 hash// i 是链表在 table 中的下标Node<K, V> f;int n, i, fh;// 要创建 tableif (tab == null || (n = tab.length) == 0)// 初始化 table 使用了 cas, 无需 synchronized 创建成功, 其他线程进入下一轮循环tab = initTable();// 要创建链表头节点else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {// 添加链表头使用了 cas, 无需 synchronizedif (casTabAt(tab, i, null,new Node<K, V>(hash, key, value, null)))break;}// 帮忙扩容else if ((fh = f.hash) == MOVED)// 帮忙之后, 进入下一轮循环tab = helpTransfer(tab, f);else { // 一定发生了桶下标冲突V oldVal = null;// 锁住链表头节点synchronized (f) {// 再次确认链表头节点没有被移动if (tabAt(tab, i) == f) {// 头节点大于0,是链表if (fh >= 0) {binCount = 1;// 遍历链表for (Node<K, V> e = f; ; ++binCount) {K ek;// 找到相同的 keyif (e.hash == hash &&((ek = e.key) == key ||(ek != null && key.equals(ek)))) {oldVal = e.val;// 更新if (!onlyIfAbsent)e.val = value;break;}Node<K, V> pred = e;// 已经是最后的节点了, 新增 Node, 追加至链表尾if ((e = e.next) == null) {pred.next = new Node<K, V>(hash, key,value, null);break;}}}// 头节点小于0,是红黑树else if (f instanceof TreeBin) {Node<K, V> p;binCount = 2;// putTreeVal 会看 key 是否已经在树中, 是, 则返回对应的 TreeNodeif ((p = ((TreeBin<K, V>) f).putTreeVal(hash, key,value)) != null) {oldVal = p.val;if (!onlyIfAbsent)p.val = value;}}}// 释放链表头节点的锁}if (binCount != 0) {if (binCount >= TREEIFY_THRESHOLD)// 如果链表长度 >= 树化阈值(8), 进行链表转为红黑树treeifyBin(tab, i);if (oldVal != null)return oldVal;break;}}}// 增加 size 计数addCount(1L, binCount);return null;
}

addCount 和 longAdder 原理类似,用到了累加单元,作用有两个:size+1,扩容

总结:

  1. 计算 hash,对 key 进行哈希后扰动处理
  2. 找对应的桶(数组位置),Node<K,V>[] table 是底层数组结构,如果该位置是 null,使用 CAS 插入新节点
  3. 如果桶不为空,判断当前桶是否为:
    • 链表:遍历链表看是否 key 已存在,若存在则替换 value,否则尾插
    • 红黑树:则使用红黑树方式插入
  4. 插入后判断是否需要树化(链表长度 ≥ 8):如果数组容量 ≥ 64,就将链表转为红黑树
  5. 并发控制:非空桶使用 synchronized 锁住链表头节点,实现锁粒度控制,避免了整个数组加锁,提高并发性能

LinkedBlockingQueue 原理

public class LinkedBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, java.io.Serializable {static class Node<E> {E item;/*** 下列三种情况之一* - 真正的后继节点* - 自己, 发生在出队时* - null, 表示是没有后继节点, 是最后了*/Node<E> next;Node(E x) { item = x; }}
}

在这里插入图片描述
出队操作看文档

加锁分析:用了两把锁和 dummy 节点

  • 用一把锁,同一时刻,最多只允许有一个线程(生产者或消费者,二选一)执行
  • 用两把锁,同一时刻,可以允许两个线程同时(一个生产者与一个消费者)执行
    • 消费者与消费者线程仍然串行
    • 生产者与生产者线程仍然串行
// 用于 put(阻塞) offer(非阻塞)
private final ReentrantLock putLock = new ReentrantLock();
// 用户 take(阻塞) poll(非阻塞)
private final ReentrantLock takeLock = new ReentrantLock();

当节点总数大于 2 时(包括 dummy 节点),putLock 保证的是 last 节点的线程安全,takeLock 保证的是
head 节点的线程安全。两把锁保证了入队和出队没有竞争

当节点总数等于 2 时(即一个 dummy 节点,一个正常节点)这时候,仍然是两把锁锁两个对象,不会竞争(如果没有 dummy 节点,那么两把锁锁住同一个对象,锁就失效了)

当节点总数等于 1 时(就一个 dummy 节点)这时 take 线程会被 notEmpty 条件阻塞,有竞争,会阻塞

LinkedBlockingQueue 与 ArrayBlockingQueue 的性能比较

  • Linked 支持有界,Array 强制有界
  • Linked 实现是链表,Array 实现是数组
  • Linked 是懒惰初始化,而 Array 需要提前初始化 Node 数组
  • Linked 每次入队会生成新 Node,而 Array 的 Node 是提前创建好的
  • Linked 两把锁,Array 一把锁

ConcurrentLinkedQueue 原理

ConcurrentLinkedQueue 的设计与 LinkedBlockingQueue 非常像,也是
两把【锁】,同一时刻,可以允许两个线程同时(一个生产者与一个消费者)执行
dummy 节点的引入让两把【锁】将来锁住的是不同对象,避免竞争

只是【锁】使用了 cas 来实现

CopyOnWriteArrayList

底层实现采用了 写入时拷贝 的思想,增删改操作会将底层数组拷贝一份,更改操作在新数组上执行,这时不影响其它线程的并发读,读写分离。适合『读多写少』的应用场景

CopyOnWriteArraySet 的底层是 CopyOnWriteArrayList

http://www.xdnf.cn/news/511201.html

相关文章:

  • discuz X3.5批量新建用户
  • Leetcode 3551. Minimum Swaps to Sort by Digit Sum
  • BAT32 Could not stop Cortex-M device
  • 如何根据三点求圆心
  • 多模态大语言模型arxiv论文略读(八十一)
  • 【Leetcode】取余/2的幂次方
  • ABP vNext 多租户系统实现登录页自定义 Logo 的最佳实践
  • CSS- 4.3 绝对定位(position: absolute)学校官网导航栏实例
  • LLM大语言模型系列1-token
  • Linux干货(六)
  • 机器学习-人与机器生数据的区分模型测试 - 模型选择与微调
  • Redis 学习笔记 4:优惠券秒杀
  • 单目测距和双目测距 bev 3D车道线
  • 如何快速显示首屏页面
  • 接口——类比摄像
  • Java大厂求职面试:探讨Spring Boot与微服务架构
  • StarRocks Community Monthly Newsletter (Apr)
  • 你引入的lodash充分利用了吗?
  • Python 条件语句详解
  • SAP集团内部公司间交易自动开票
  • Python高级特性深度解析:从熟练到精通的跃迁之路
  • JAVA学习-练习试用Java实现“音频文件的读取与写入 :使用Java音频库处理音频数据”
  • 《从零开始:Spring Cloud Eureka 配置与服务注册全流程》​
  • 主成分分析的应用之sklearn.decomposition模块的PCA函数
  • 初学c语言15(字符和字符串函数)
  • (5)python爬虫--BeautifulSoup(bs4)
  • 01 CentOS根分区满了扩容
  • 2025年- H30-Lc138- 141.环形链表(快慢指针,快2慢1)---java版
  • 学习是有方法的——费曼学习法
  • 先说爱的人为什么先离开