【零碎小知识点 】(四) Java多线程编程深入与实践
1. 线程基础
线程创建与生命周期
public class ThreadBasics {public static void main(String[] args) {// 1. 继承Thread类MyThread thread1 = new MyThread();thread1.start(); // 启动线程// 2. 实现Runnable接口Thread thread2 = new Thread(new MyRunnable());thread2.start();// 3. 实现Callable接口 + FutureTaskFutureTask<Integer> futureTask = new FutureTask<>(new MyCallable());Thread thread3 = new Thread(futureTask);thread3.start();try {// 获取Callable的返回值Integer result = futureTask.get();System.out.println("Callable返回值: " + result);} catch (Exception e) {e.printStackTrace();}// 4. 使用Lambda表达式Thread thread4 = new Thread(() -> {System.out.println("Lambda线程: " + Thread.currentThread().getName());});thread4.start();// 5. 线程池方式(后面详细讲解)// 演示线程状态demonstrateThreadStates();}// 演示线程状态转换private static void demonstrateThreadStates() {System.out.println("\n=== 线程状态演示 ===");Thread thread = new Thread(() -> {try {Thread.sleep(1000); // TIMED_WAITING状态synchronized (ThreadBasics.class) {ThreadBasics.class.wait(); // WAITING状态}} catch (InterruptedException e) {e.printStackTrace();}});System.out.println("新建后状态: " + thread.getState()); // NEWthread.start();System.out.println("启动后状态: " + thread.getState()); // RUNNABLEtry {Thread.sleep(100);System.out.println("睡眠中状态: " + thread.getState()); // TIMED_WAITINGThread.sleep(1000);System.out.println("等待中状态: " + thread.getState()); // WAITINGsynchronized (ThreadBasics.class) {ThreadBasics.class.notify(); // 唤醒线程}Thread.sleep(100);System.out.println("运行后状态: " + thread.getState()); // TERMINATED} catch (InterruptedException e) {e.printStackTrace();}}
}// 方式1: 继承Thread类
class MyThread extends Thread {@Overridepublic void run() {System.out.println("继承Thread: " + Thread.currentThread().getName());}
}// 方式2: 实现Runnable接口
class MyRunnable implements Runnable {@Overridepublic void run() {System.out.println("实现Runnable: " + Thread.currentThread().getName());}
}// 方式3: 实现Callable接口
class MyCallable implements Callable<Integer> {@Overridepublic Integer call() throws Exception {System.out.println("实现Callable: " + Thread.currentThread().getName());return 42; // 返回结果}
}
进程与线程的区别
public class ProcessVsThread {public static void main(String[] args) {// 进程示例:启动一个外部程序try {Process process = Runtime.getRuntime().exec("notepad.exe");System.out.println("进程ID: " + process.pid());// 等待进程结束process.waitFor();System.out.println("进程退出值: " + process.exitValue());} catch (Exception e) {e.printStackTrace();}// 线程示例:创建多个线程for (int i = 0; i < 3; i++) {new Thread(() -> {System.out.println("线程 " + Thread.currentThread().getName() + " 运行中,进程ID: " + ProcessHandle.current().pid());}).start();}System.out.println("主线程进程ID: " + ProcessHandle.current().pid());}
}
2. 线程安全
原子性、可见性、有序性问题
public class ThreadSafetyIssues {private static int count = 0; // 共享变量private static volatile boolean flag = false; // volatile测试private static int a = 0, b = 0, x = 0, y = 0; // 指令重排测试public static void main(String[] args) throws InterruptedException {// 1. 原子性问题演示demonstrateAtomicityIssue();// 2. 可见性问题演示demonstrateVisibilityIssue();// 3. 有序性问题演示demonstrateOrderingIssue();}// 原子性问题:count++不是原子操作private static void demonstrateAtomicityIssue() throws InterruptedException {System.out.println("\n=== 原子性问题演示 ===");count = 0;Thread t1 = new Thread(() -> {for (int i = 0; i < 10000; i++) {count++; // 不是原子操作}});Thread t2 = new Thread(() -> {for (int i = 0; i < 10000; i++) {count++; // 不是原子操作}});t1.start();t2.start();t1.join();t2.join();System.out.println("期望值: 20000, 实际值: " + count);}// 可见性问题:一个线程修改了变量,另一个线程可能看不到private static void demonstrateVisibilityIssue() throws InterruptedException {System.out.println("\n=== 可见性问题演示 ===");flag = false;// 线程1:修改flagThread t1 = new Thread(() -> {try {Thread.sleep(100); // 确保线程2先运行flag = true;System.out.println("线程1设置flag为true");} catch (InterruptedException e) {e.printStackTrace();}});// 线程2:循环读取flagThread t2 = new Thread(() -> {while (!flag) {// 空循环,等待flag变为true}System.out.println("线程2检测到flag为true");});t2.start();t1.start();t1.join();t2.join();}// 有序性问题:指令重排可能导致意外结果private static void demonstrateOrderingIssue() throws InterruptedException {System.out.println("\n=== 有序性问题演示 ===");int detected = 0;for (int i = 0; i < 100000; i++) {a = 0; b = 0; x = 0; y = 0;Thread t1 = new Thread(() -> {a = 1; // 语句1x = b; // 语句2});Thread t2 = new Thread(() -> {b = 1; // 语句3y = a; // 语句4});t1.start();t2.start();t1.join();t2.join();// 如果没有指令重排,x和y不可能同时为0if (x == 0 && y == 0) {detected++;System.out.println("检测到指令重排: x=" + x + ", y=" + y);break;}}if (detected == 0) {System.out.println("未检测到指令重排");}}
}
synchronized关键字
public class SynchronizedPractice {private static int count = 0;private static final Object lock = new Object();public static void main(String[] args) throws InterruptedException {// 1. 同步代码块Thread t1 = new Thread(() -> {for (int i = 0; i < 10000; i++) {synchronized (lock) { // 同步代码块count++;}}});Thread t2 = new Thread(() -> {for (int i = 0; i < 10000; i++) {synchronized (lock) { // 同步代码块count++;}}});t1.start();t2.start();t1.join();t2.join();System.out.println("同步后计数: " + count);// 2. 同步方法Counter counter = new Counter();Thread t3 = new Thread(() -> {for (int i = 0; i < 10000; i++) {counter.increment();}});Thread t4 = new Thread(() -> {for (int i = 0; i < 10000; i++) {counter.increment();}});t3.start();t4.start();t3.join();t4.join();System.out.println("同步方法计数: " + counter.getCount());// 3. 静态同步方法demonstrateStaticSynchronized();// 4. 锁升级过程演示(需要通过JVM参数观察)demonstrateLockUpgrade();}// 静态同步方法private static void demonstrateStaticSynchronized() throws InterruptedException {System.out.println("\n=== 静态同步方法演示 ===");count = 0;Thread t1 = new Thread(() -> {for (int i = 0; i < 10000; i++) {incrementStatic();}});Thread t2 = new Thread(() -> {for (int i = 0; i < 10000; i++) {incrementStatic();}});t1.start();t2.start();t1.join();t2.join();System.out.println("静态同步方法计数: " + count);}// 静态同步方法private static synchronized void incrementStatic() {count++;}// 锁升级过程演示(需要通过JVM参数观察)private static void demonstrateLockUpgrade() {System.out.println("\n=== 锁升级过程演示 ===");System.out.println("请使用JVM参数运行以观察锁升级: -XX:+PrintFlagsFinal");System.out.println("偏向锁延迟: -XX:BiasedLockingStartupDelay=0");System.out.println("禁用偏向锁: -XX:-UseBiasedLocking");Object obj = new Object();// 初始状态:无锁System.out.println("初始状态: 无锁");// 第一次加锁:偏向锁synchronized (obj) {System.out.println("第一次加锁: 偏向锁");}// 有竞争时:轻量级锁Thread t1 = new Thread(() -> {synchronized (obj) {System.out.println("线程1加锁: 轻量级锁");}});Thread t2 = new Thread(() -> {synchronized (obj) {System.out.println("线程2加锁: 轻量级锁");}});t1.start();t2.start();try {t1.join();t2.join();} catch (InterruptedException e) {e.printStackTrace();}// 重度竞争时:重量级锁for (int i = 0; i < 10; i++) {new Thread(() -> {synchronized (obj) {try {Thread.sleep(100); // 模拟耗时操作} catch (InterruptedException e) {e.printStackTrace();}}}).start();}System.out.println("重度竞争: 重量级锁");}
}// 同步方法示例
class Counter {private int count = 0;// 同步方法public synchronized void increment() {count++;}public synchronized int getCount() {return count;}
}
volatile关键字
public class VolatilePractice {private static volatile boolean running = true;private static volatile int count = 0;public static void main(String[] args) throws InterruptedException {// 1. 保证可见性Thread t1 = new Thread(() -> {System.out.println("线程1启动");while (running) {// 空循环}System.out.println("线程1结束");});t1.start();Thread.sleep(100);running = false; // 主线程修改running,t1线程可见System.out.println("主线程设置running为false");// 2. 不保证原子性for (int i = 0; i < 10; i++) {new Thread(() -> {for (int j = 0; j < 1000; j++) {count++; // 即使volatile也不保证原子性}}).start();}Thread.sleep(2000);System.out.println("volatile计数: " + count + " (期望: 10000)");// 3. 禁止指令重排demonstrateReorderPrevention();}// 演示volatile禁止指令重排private static void demonstrateReorderPrevention() {System.out.println("\n=== volatile禁止指令重排 ===");// 单例模式中的双重检查锁定Singleton instance1 = Singleton.getInstance();Singleton instance2 = Singleton.getInstance();System.out.println("单例实例相同: " + (instance1 == instance2));}
}// 单例模式:双重检查锁定 + volatile
class Singleton {private static volatile Singleton instance;private Singleton() {System.out.println("Singleton实例化");}public static Singleton getInstance() {if (instance == null) { // 第一次检查synchronized (Singleton.class) {if (instance == null) { // 第二次检查instance = new Singleton(); // volatile防止指令重排}}}return instance;}
}
3. JUC包
锁机制
import java.util.concurrent.locks.*;public class LockPractice {private static int count = 0;private static final ReentrantLock reentrantLock = new ReentrantLock();private static final ReadWriteLock readWriteLock = new ReentrantReadWriteLock();public static void main(String[] args) throws InterruptedException {// 1. ReentrantLock基本使用demonstrateReentrantLock();// 2. 可中断锁demonstrateInterruptibleLock();// 3. 公平锁 vs 非公平锁demonstrateFairLock();// 4. 读写锁demonstrateReadWriteLock();// 5. Condition条件变量demonstrateCondition();}private static void demonstrateReentrantLock() throws InterruptedException {System.out.println("\n=== ReentrantLock基本使用 ===");count = 0;Thread t1 = new Thread(() -> {for (int i = 0; i < 10000; i++) {reentrantLock.lock(); // 获取锁try {count++;} finally {reentrantLock.unlock(); // 释放锁}}});Thread t2 = new Thread(() -> {for (int i = 0; i < 10000; i++) {reentrantLock.lock();try {count++;} finally {reentrantLock.unlock();}}});t1.start();t2.start();t1.join();t2.join();System.out.println("ReentrantLock计数: " + count);// 尝试获取锁boolean acquired = reentrantLock.tryLock();if (acquired) {try {System.out.println("尝试获取锁成功");} finally {reentrantLock.unlock();}} else {System.out.println("尝试获取锁失败");}// 带超时的尝试获取锁try {acquired = reentrantLock.tryLock(1, TimeUnit.SECONDS);if (acquired) {try {System.out.println("带超时的尝试获取锁成功");} finally {reentrantLock.unlock();}} else {System.out.println("带超时的尝试获取锁失败");}} catch (InterruptedException e) {e.printStackTrace();}}private static void demonstrateInterruptibleLock() {System.out.println("\n=== 可中断锁 ===");Thread t1 = new Thread(() -> {try {// 可中断地获取锁reentrantLock.lockInterruptibly();try {System.out.println("线程1获取到锁,将睡眠5秒");Thread.sleep(5000);} finally {reentrantLock.unlock();}} catch (InterruptedException e) {System.out.println("线程1在等待锁时被中断");}});Thread t2 = new Thread(() -> {try {// 可中断地获取锁reentrantLock.lockInterruptibly();try {System.out.println("线程2获取到锁");} finally {reentrantLock.unlock();}} catch (InterruptedException e) {System.out.println("线程2在等待锁时被中断");}});t1.start();try {Thread.sleep(100); // 确保t1先获取锁} catch (InterruptedException e) {e.printStackTrace();}t2.start();// 2秒后中断t2try {Thread.sleep(2000);t2.interrupt(); // 中断t2线程} catch (InterruptedException e) {e.printStackTrace();}}private static void demonstrateFairLock() {System.out.println("\n=== 公平锁 vs 非公平锁 ===");// 公平锁ReentrantLock fairLock = new ReentrantLock(true);// 非公平锁ReentrantLock unfairLock = new ReentrantLock(false);System.out.println("公平锁: " + fairLock.isFair());System.out.println("非公平锁: " + unfairLock.isFair());// 测试公平性testLockFairness(fairLock, "公平锁");testLockFairness(unfairLock, "非公平锁");}private static void testLockFairness(ReentrantLock lock, String lockType) {System.out.println("\n测试" + lockType + "的公平性:");for (int i = 0; i < 5; i++) {final int threadId = i;new Thread(() -> {for (int j = 0; j < 2; j++) {lock.lock();try {System.out.println(lockType + " - 线程" + threadId + "获取到锁");Thread.sleep(100);} catch (InterruptedException e) {e.printStackTrace();} finally {lock.unlock();}}}).start();// 稍微延迟启动,让线程按顺序排队try {Thread.sleep(10);} catch (InterruptedException e) {e.printStackTrace();}}try {Thread.sleep(2000);} catch (InterruptedException e) {e.printStackTrace();}}private static void demonstrateReadWriteLock() throws InterruptedException {System.out.println("\n=== 读写锁 ===");ReadWriteLock rwLock = new ReentrantReadWriteLock();String[] data = new String[1]; // 共享数据// 写线程Thread writer = new Thread(() -> {rwLock.writeLock().lock(); // 获取写锁try {System.out.println("写线程开始写入数据");Thread.sleep(1000); // 模拟写入耗时data[0] = "Hello, World!";System.out.println("写线程完成数据写入");} catch (InterruptedException e) {e.printStackTrace();} finally {rwLock.writeLock().unlock(); // 释放写锁}});// 读线程Runnable reader = () -> {rwLock.readLock().lock(); // 获取读锁try {System.out.println("读线程 " + Thread.currentThread().getName() + " 读取数据: " + data[0]);Thread.sleep(500); // 模拟读取耗时} catch (InterruptedException e) {e.printStackTrace();} finally {rwLock.readLock().unlock(); // 释放读锁}};writer.start();// 等待写线程开始Thread.sleep(100);// 启动多个读线程for (int i = 0; i < 5; i++) {new Thread(reader, "R" + i).start();}writer.join();}private static void demonstrateCondition() {System.out.println("\n=== Condition条件变量 ===");ReentrantLock lock = new ReentrantLock();Condition condition = lock.newCondition();Thread t1 = new Thread(() -> {lock.lock();try {System.out.println("线程1等待条件");condition.await(); // 等待条件System.out.println("线程1被唤醒");} catch (InterruptedException e) {e.printStackTrace();} finally {lock.unlock();}});Thread t2 = new Thread(() -> {lock.lock();try {System.out.println("线程2执行一些工作");Thread.sleep(2000);condition.signal(); // 唤醒一个等待线程System.out.println("线程2发出信号");} catch (InterruptedException e) {e.printStackTrace();} finally {lock.unlock();}});t1.start();t2.start();try {t1.join();t2.join();} catch (InterruptedException e) {e.printStackTrace();}}
}
原子类
import java.util.concurrent.atomic.*;public class AtomicPractice {private static AtomicInteger atomicCount = new AtomicInteger(0);private static int normalCount = 0;public static void main(String[] args) throws InterruptedException {// 1. 基本原子类使用demonstrateBasicAtomic();// 2. CAS原理演示demonstrateCAS();// 3. ABA问题演示demonstrateABAProblem();// 4. 其他原子类demonstrateOtherAtomicClasses();}private static void demonstrateBasicAtomic() throws InterruptedException {System.out.println("\n=== 基本原子类使用 ===");// 使用原子类解决计数问题Thread t1 = new Thread(() -> {for (int i = 0; i < 10000; i++) {atomicCount.incrementAndGet(); // 原子操作}});Thread t2 = new Thread(() -> {for (int i = 0; i < 10000; i++) {atomicCount.incrementAndGet(); // 原子操作}});t1.start();t2.start();t1.join();t2.join();System.out.println("原子计数: " + atomicCount.get());// 对比普通变量normalCount = 0;t1 = new Thread(() -> {for (int i = 0; i < 10000; i++) {normalCount++; // 非原子操作}});t2 = new Thread(() -> {for (int i = 0; i < 10000; i++) {normalCount++; // 非原子操作}});t1.start();t2.start();t1.join();t2.join();System.out.println("普通计数: " + normalCount);// 其他原子操作System.out.println("当前值: " + atomicCount.get());System.out.println("增加10后: " + atomicCount.addAndGet(10));System.out.println("比较并设置(期望20,设置30): " + atomicCount.compareAndSet(20, 30) + ", 当前值: " + atomicCount.get());}private static void demonstrateCAS() {System.out.println("\n=== CAS原理演示 ===");// 模拟CAS操作int expected = atomicCount.get();int newValue = expected + 5;while (true) {int current = atomicCount.get();if (atomicCount.compareAndSet(current, current + 5)) {System.out.println("CAS成功: " + current + " -> " + (current + 5));break;} else {System.out.println("CAS失败,重试");}}}private static void demonstrateABAProblem() {System.out.println("\n=== ABA问题演示 ===");AtomicInteger value = new AtomicInteger(10);// 线程1:A->B->AThread t1 = new Thread(() -> {System.out.println("线程1: A->B " + value.compareAndSet(10, 20));System.out.println("线程1: B->A " + value.compareAndSet(20, 10));});// 线程2:检查A,但期间值可能经历了A->B->A的变化Thread t2 = new Thread(() -> {try {Thread.sleep(100); // 确保线程1先执行} catch (InterruptedException e) {e.printStackTrace();}boolean success = value.compareAndSet(10, 30);System.out.println("线程2: A->30 " + success + ", 最终值: " + value.get());});t1.start();t2.start();try {t1.join();t2.join();} catch (InterruptedException e) {e.printStackTrace();}// 使用AtomicStampedReference解决ABA问题System.out.println("\n使用AtomicStampedReference解决ABA问题:");AtomicStampedReference<Integer> stampedRef = new AtomicStampedReference<>(10, 0);t1 = new Thread(() -> {int stamp = stampedRef.getStamp();System.out.println("线程1: A->B " + stampedRef.compareAndSet(10, 20, stamp, stamp + 1));stamp = stampedRef.getStamp();System.out.println("线程1: B->A " + stampedRef.compareAndSet(20, 10, stamp, stamp + 1));});t2 = new Thread(() -> {try {Thread.sleep(100);} catch (InterruptedException e) {e.printStackTrace();}int stamp = stampedRef.getStamp();boolean success = stampedRef.compareAndSet(10, 30, stamp, stamp + 1);System.out.println("线程2: A->30 " + success + ", 最终值: " + stampedRef.getReference() +", 版本: " + stampedRef.getStamp());});t1.start();t2.start();}private static void demonstrateOtherAtomicClasses() {System.out.println("\n=== 其他原子类 ===");// AtomicLongAtomicLong atomicLong = new AtomicLong(100L);System.out.println("AtomicLong: " + atomicLong.getAndAdd(10));// AtomicBooleanAtomicBoolean atomicBoolean = new AtomicBoolean(true);System.out.println("AtomicBoolean: " + atomicBoolean.compareAndSet(true, false));// AtomicReferenceAtomicReference<String> atomicReference = new AtomicReference<>("Hello");System.out.println("AtomicReference: " + atomicReference.compareAndSet("Hello", "World"));System.out.println("当前值: " + atomicReference.get());// AtomicIntegerArrayint[] array = {1, 2, 3};AtomicIntegerArray atomicArray = new AtomicIntegerArray(array);atomicArray.compareAndSet(1, 2, 20);System.out.println("AtomicIntegerArray: " + atomicArray.toString());// AtomicIntegerFieldUpdaterclass Data {volatile int value;}Data data = new Data();AtomicIntegerFieldUpdater<Data> updater = AtomicIntegerFieldUpdater.newUpdater(Data.class, "value");updater.set(data, 100);System.out.println("AtomicIntegerFieldUpdater: " + updater.get(data));}
}
线程池
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;public class ThreadPoolPractice {public static void main(String[] args) {// 1. ThreadPoolExecutor七大参数demonstrateThreadPoolExecutor();// 2. 常见线程池demonstrateCommonPools();// 3. 线程池工作流程demonstrateThreadPoolWorkflow();// 4. 拒绝策略demonstrateRejectionPolicies();// 5. 线程工厂demonstrateThreadFactory();// 6. 定时任务线程池demonstrateScheduledThreadPool();}private static void demonstrateThreadPoolExecutor() {System.out.println("\n=== ThreadPoolExecutor七大参数 ===");// 1. 核心线程数int corePoolSize = 2;// 2. 最大线程数int maximumPoolSize = 4;// 3. 空闲线程存活时间long keepAliveTime = 10;// 4. 时间单位TimeUnit unit = TimeUnit.SECONDS;// 5. 工作队列BlockingQueue<Runnable> workQueue = new ArrayBlockingQueue<>(2);// 6. 线程工厂ThreadFactory threadFactory = Executors.defaultThreadFactory();// 7. 拒绝策略RejectedExecutionHandler handler = new ThreadPoolExecutor.AbortPolicy();ThreadPoolExecutor executor = new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);System.out.println("线程池创建成功");System.out.println("核心线程数: " + corePoolSize);System.out.println("最大线程数: " + maximumPoolSize);System.out.println("队列容量: " + workQueue.remainingCapacity());executor.shutdown();}private static void demonstrateCommonPools() {System.out.println("\n=== 常见线程池 ===");// 1. FixedThreadPool - 固定大小线程池ExecutorService fixedPool = Executors.newFixedThreadPool(3);System.out.println("FixedThreadPool: 固定3个线程");// 2. CachedThreadPool - 可缓存线程池ExecutorService cachedPool = Executors.newCachedThreadPool();System.out.println("CachedThreadPool: 根据需要创建新线程");// 3. SingleThreadExecutor - 单线程线程池ExecutorService singleThreadPool = Executors.newSingleThreadExecutor();System.out.println("SingleThreadExecutor: 单个线程顺序执行");// 4. ScheduledThreadPool - 定时任务线程池ScheduledExecutorService scheduledPool = Executors.newScheduledThreadPool(2);System.out.println("ScheduledThreadPool: 可安排任务在给定延迟后运行或定期执行");// 使用示例for (int i = 0; i < 5; i++) {final int taskId = i;fixedPool.execute(() -> {System.out.println("FixedPool任务" + taskId + "执行于: " + Thread.currentThread().getName());try {Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}});}// 关闭线程池fixedPool.shutdown();cachedPool.shutdown();singleThreadPool.shutdown();scheduledPool.shutdown();}private static void demonstrateThreadPoolWorkflow() {System.out.println("\n=== 线程池工作流程 ===");// 创建一个小型线程池演示工作流程ThreadPoolExecutor executor = new ThreadPoolExecutor(2, // 核心线程数3, // 最大线程数1, TimeUnit.SECONDS,new ArrayBlockingQueue<>(2), // 容量为2的队列new ThreadPoolExecutor.CallerRunsPolicy() // 调用者运行策略);System.out.println("提交6个任务,观察线程池行为:");System.out.println("核心线程:2, 最大线程:3, 队列容量:2");for (int i = 1; i <= 6; i++) {final int taskId = i;try {executor.execute(() -> {System.out.println("任务" + taskId + "执行于: " + Thread.currentThread().getName());try {Thread.sleep(2000); // 模拟任务执行} catch (InterruptedException e) {e.printStackTrace();}});System.out.println("提交任务" + taskId + "成功");} catch (Exception e) {System.out.println("提交任务" + taskId + "失败: " + e.getMessage());}}executor.shutdown();}private static void demonstrateRejectionPolicies() {System.out.println("\n=== 拒绝策略 ===");// 1. AbortPolicy - 默认策略,抛出RejectedExecutionExceptiontestRejectionPolicy(new ThreadPoolExecutor.AbortPolicy(), "AbortPolicy");// 2. CallerRunsPolicy - 由调用线程执行该任务testRejectionPolicy(new ThreadPoolExecutor.CallerRunsPolicy(), "CallerRunsPolicy");// 3. DiscardPolicy - 直接丢弃任务testRejectionPolicy(new ThreadPoolExecutor.DiscardPolicy(), "DiscardPolicy");// 4. DiscardOldestPolicy - 丢弃队列中最旧的任务,然后重试testRejectionPolicy(new ThreadPoolExecutor.DiscardOldestPolicy(), "DiscardOldestPolicy");}private static void testRejectionPolicy(RejectedExecutionHandler policy, String policyName) {System.out.println("\n测试拒绝策略: " + policyName);ThreadPoolExecutor executor = new ThreadPoolExecutor(1, 1, 0, TimeUnit.SECONDS,new ArrayBlockingQueue<>(1), policy);// 提交3个任务,但线程池只能处理2个(1个执行,1个排队)for (int i = 1; i <= 3; i++) {final int taskId = i;try {executor.execute(() -> {System.out.println(policyName + " - 任务" + taskId + "执行于: " + Thread.currentThread().getName());try {Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}});System.out.println(policyName + " - 提交任务" + taskId + "成功");} catch (Exception e) {System.out.println(policyName + " - 提交任务" + taskId + "失败: " + e.getMessage());}}executor.shutdown();}private static void demonstrateThreadFactory() {System.out.println("\n=== 线程工厂 ===");// 自定义线程工厂ThreadFactory customFactory = new CustomThreadFactory("CustomThread");ExecutorService executor = new ThreadPoolExecutor(2, 2, 0, TimeUnit.SECONDS,new LinkedBlockingQueue<>(),customFactory);for (int i = 0; i < 3; i++) {executor.execute(() -> {System.out.println("任务执行于: " + Thread.currentThread().getName());});}executor.shutdown();}// 自定义线程工厂static class CustomThreadFactory implements ThreadFactory {private final AtomicInteger threadNumber = new AtomicInteger(1);private final String namePrefix;CustomThreadFactory(String namePrefix) {this.namePrefix = namePrefix + "-";}@Overridepublic Thread newThread(Runnable r) {Thread t = new Thread(r, namePrefix + threadNumber.getAndIncrement());t.setDaemon(false);t.setPriority(Thread.NORM_PRIORITY);return t;}}private static void demonstrateScheduledThreadPool() {System.out.println("\n=== 定时任务线程池 ===");ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(2);// 1. 延迟执行System.out.println("提交延迟任务: " + new java.util.Date());scheduler.schedule(() -> {System.out.println("延迟任务执行: " + new java.util.Date());}, 2, TimeUnit.SECONDS);// 2. 固定速率执行System.out.println("提交固定速率任务: " + new java.util.Date());scheduler.scheduleAtFixedRate(() -> {System.out.println("固定速率任务执行: " + new java.util.Date());}, 1, 2, TimeUnit.SECONDS);// 3. 固定延迟执行System.out.println("提交固定延迟任务: " + new java.util.Date());scheduler.scheduleWithFixedDelay(() -> {System.out.println("固定延迟任务开始: " + new java.util.Date());try {Thread.sleep(1000); // 模拟任务执行时间} catch (InterruptedException e) {e.printStackTrace();}System.out.println("固定延迟任务结束: " + new java.util.Date());}, 1, 2, TimeUnit.SECONDS);// 10秒后关闭调度器scheduler.schedule(() -> {System.out.println("关闭调度器");scheduler.shutdown();}, 10, TimeUnit.SECONDS);}
}
并发集合
import java.util.*;
import java.util.concurrent.*;public class ConcurrentCollectionPractice {public static void main(String[] args) {// 1. CopyOnWriteArrayListdemonstrateCopyOnWriteArrayList();// 2. ConcurrentLinkedQueuedemonstrateConcurrentLinkedQueue();// 3. ConcurrentHashMapdemonstrateConcurrentHashMap();// 4. 其他并发集合demonstrateOtherConcurrentCollections();}private static void demonstrateCopyOnWriteArrayList() {System.out.println("\n=== CopyOnWriteArrayList ===");List<String> list = new CopyOnWriteArrayList<>();// 启动一个线程不断读取Thread reader = new Thread(() -> {while (true) {try {for (String item : list) {System.out.println("读取: " + item);}Thread.sleep(100);} catch (InterruptedException e) {break;}}});reader.start();// 主线程添加元素for (int i = 0; i < 5; i++) {String item = "Item" + i;list.add(item);System.out.println("添加: " + item);try {Thread.sleep(150);} catch (InterruptedException e) {e.printStackTrace();}}reader.interrupt();System.out.println("最终列表: " + list);}private static void demonstrateConcurrentLinkedQueue() {System.out.println("\n=== ConcurrentLinkedQueue ===");Queue<String> queue = new ConcurrentLinkedQueue<>();// 生产者线程Thread producer = new Thread(() -> {for (int i = 0; i < 5; i++) {String item = "Product" + i;queue.offer(item);System.out.println("生产: " + item);try {Thread.sleep(100);} catch (InterruptedException e) {e.printStackTrace();}}});// 消费者线程Thread consumer = new Thread(() -> {for (int i = 0; i < 5; i++) {String item = queue.poll();if (item != null) {System.out.println("消费: " + item);}try {Thread.sleep(150);} catch (InterruptedException e) {e.printStackTrace();}}});producer.start();consumer.start();try {producer.join();consumer.join();} catch (InterruptedException e) {e.printStackTrace();}System.out.println("队列剩余: " + queue.size());}private static void demonstrateConcurrentHashMap() {System.out.println("\n=== ConcurrentHashMap ===");ConcurrentHashMap<String, Integer> map = new ConcurrentHashMap<>();// 多个线程同时操作Thread[] threads = new Thread[5];for (int i = 0; i < threads.length; i++) {final int threadId = i;threads[i] = new Thread(() -> {for (int j = 0; j < 10; j++) {String key = "Key" + (threadId * 10 + j);map.put(key, threadId * 10 + j);System.out.println("线程" + threadId + " 添加: " + key);// 使用线程安全的方法map.computeIfPresent(key, (k, v) -> v + 1);}});}for (Thread thread : threads) {thread.start();}for (Thread thread : threads) {try {thread.join();} catch (InterruptedException e) {e.printStackTrace();}}System.out.println("最终映射大小: " + map.size());// 遍历map.forEach((k, v) -> System.out.println(k + " => " + v));// 搜索String result = map.search(100, (k, v) -> v > 40 ? k : null);System.out.println("搜索值>40的键: " + result);// 归约Integer sum = map.reduce(100, (k, v) -> v, (v1, v2) -> v1 + v2);System.out.println("所有值的和: " + sum);}private static void demonstrateOtherConcurrentCollections() {System.out.println("\n=== 其他并发集合 ===");// ConcurrentSkipListMap - 并发跳表映射ConcurrentSkipListMap<String, Integer> skipListMap = new ConcurrentSkipListMap<>();skipListMap.put("C", 3);skipListMap.put("A", 1);skipListMap.put("B", 2);System.out.println("ConcurrentSkipListMap: " + skipListMap);// ConcurrentSkipListSet - 并发跳表集合ConcurrentSkipListSet<String> skipListSet = new ConcurrentSkipListSet<>();skipListSet.add("C");skipListSet.add("A");skipListSet.add("B");System.out.println("ConcurrentSkipListSet: " + skipListSet);// CopyOnWriteArraySet - 写时复制集合CopyOnWriteArraySet<String> copyOnWriteSet = new CopyOnWriteArraySet<>();copyOnWriteSet.add("A");copyOnWriteSet.add("B");copyOnWriteSet.add("A"); // 重复元素System.out.println("CopyOnWriteArraySet: " + copyOnWriteSet);// BlockingQueue - 阻塞队列BlockingQueue<String> blockingQueue = new LinkedBlockingQueue<>(2);try {blockingQueue.put("Item1");blockingQueue.put("Item2");System.out.println("阻塞队列已满");// 新线程尝试取出元素new Thread(() -> {try {Thread.sleep(1000);String item = blockingQueue.take();System.out.println("取出: " + item);} catch (InterruptedException e) {e.printStackTrace();}}).start();// 主线程尝试添加第三个元素(会阻塞直到有空间)System.out.println("尝试添加第三个元素...");blockingQueue.put("Item3");System.out.println("添加第三个元素成功");} catch (InterruptedException e) {e.printStackTrace();}}
}
同步工具类
import java.util.concurrent.*;public class SynchronizerPractice {public static void main(String[] args) throws InterruptedException {// 1. CountDownLatchdemonstrateCountDownLatch();// 2. CyclicBarrierdemonstrateCyclicBarrier();// 3. SemaphoredemonstrateSemaphore();// 4. ExchangerdemonstrateExchanger();// 5. PhaserdemonstratePhaser();}private static void demonstrateCountDownLatch() throws InterruptedException {System.out.println("\n=== CountDownLatch ===");int workerCount = 3;CountDownLatch startSignal = new CountDownLatch(1);CountDownLatch doneSignal = new CountDownLatch(workerCount);for (int i = 0; i < workerCount; i++) {final int workerId = i;new Thread(() -> {try {System.out.println("工人" + workerId + "等待开工");startSignal.await(); // 等待开工信号System.out.println("工人" + workerId + "开始工作");Thread.sleep(1000 + workerId * 500); // 模拟工作System.out.println("工人" + workerId + "完成工作");doneSignal.countDown(); // 完成工作} catch (InterruptedException e) {e.printStackTrace();}}).start();}Thread.sleep(1000); // 准备时间System.out.println("经理发出开工信号");startSignal.countDown(); // 发出开工信号doneSignal.await(); // 等待所有工人完成System.out.println("所有工人已完成工作,经理开始验收");}private static void demonstrateCyclicBarrier() {System.out.println("\n=== CyclicBarrier ===");int threadCount = 3;CyclicBarrier barrier = new CyclicBarrier(threadCount, () -> {System.out.println("所有线程已到达屏障,执行屏障操作");});for (int i = 0; i < threadCount; i++) {final int threadId = i;new Thread(() -> {try {System.out.println("线程" + threadId + "开始第一阶段");Thread.sleep(1000 + threadId * 500);System.out.println("线程" + threadId + "到达屏障,等待其他线程");barrier.await();System.out.println("线程" + threadId + "开始第二阶段");Thread.sleep(1000 + threadId * 500);System.out.println("线程" + threadId + "再次到达屏障,等待其他线程");barrier.await();System.out.println("线程" + threadId + "完成所有工作");} catch (InterruptedException | BrokenBarrierException e) {e.printStackTrace();}}).start();}}private static void demonstrateSemaphore() {System.out.println("\n=== Semaphore ===");int resourceCount = 2;Semaphore semaphore = new Semaphore(resourceCount);for (int i = 0; i < 5; i++) {final int threadId = i;new Thread(() -> {try {System.out.println("线程" + threadId + "尝试获取资源");semaphore.acquire();System.out.println("线程" + threadId + "获取到资源,开始使用");Thread.sleep(2000); // 模拟使用资源System.out.println("线程" + threadId + "释放资源");semaphore.release();} catch (InterruptedException e) {e.printStackTrace();}}).start();}}private static void demonstrateExchanger() {System.out.println("\n=== Exchanger ===");Exchanger<String> exchanger = new Exchanger<>();// 生产者线程new Thread(() -> {try {String data = "生产的数据";System.out.println("生产者准备交换: " + data);Thread.sleep(1000);String received = exchanger.exchange(data);System.out.println("生产者收到: " + received);} catch (InterruptedException e) {e.printStackTrace();}}).start();// 消费者线程new Thread(() -> {try {String data = "消费的数据";System.out.println("消费者准备交换: " + data);Thread.sleep(2000);String received = exchanger.exchange(data);System.out.println("消费者收到: " + received);} catch (InterruptedException e) {e.printStackTrace();}}).start();}private static void demonstratePhaser() {System.out.println("\n=== Phaser ===");Phaser phaser = new Phaser(1); // 主线程注册for (int i = 0; i < 3; i++) {phaser.register(); // 注册新线程final int threadId = i;new Thread(() -> {System.out.println("线程" + threadId + "到达阶段1");phaser.arriveAndAwaitAdvance(); // 等待所有线程到达System.out.println("线程" + threadId + "到达阶段2");phaser.arriveAndAwaitAdvance(); // 等待所有线程到达System.out.println("线程" + threadId + "完成");phaser.arriveAndDeregister(); // 完成并注销}).start();}// 主线程等待所有阶段完成phaser.arriveAndAwaitAdvance();System.out.println("阶段1完成");phaser.arriveAndAwaitAdvance();System.out.println("阶段2完成");phaser.arriveAndDeregister();System.out.println("所有阶段完成");}
}
4. 综合练习与应用
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.*;public class MultithreadingExercises {public static void main(String[] args) throws Exception {// 练习1: 生产者-消费者问题producerConsumerProblem();// 练习2: 读者-写者问题readerWriterProblem();// 练习3: 哲学家就餐问题diningPhilosophersProblem();// 练习4: 高性能计数器highPerformanceCounter();// 练习5: 并行处理任务parallelTaskProcessing();}// 生产者-消费者问题private static void producerConsumerProblem() throws InterruptedException {System.out.println("\n=== 生产者-消费者问题 ===");BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(5);// 生产者Thread producer = new Thread(() -> {try {for (int i = 0; i < 10; i++) {queue.put(i);System.out.println("生产: " + i + ",队列大小: " + queue.size());Thread.sleep(100);}} catch (InterruptedException e) {e.printStackTrace();}});// 消费者Thread consumer = new Thread(() -> {try {for (int i = 0; i < 10; i++) {Integer item = queue.take();System.out.println("消费: " + item + ",队列大小: " + queue.size());Thread.sleep(200);}} catch (InterruptedException e) {e.printStackTrace();}});producer.start();consumer.start();producer.join();consumer.join();}// 读者-写者问题private static void readerWriterProblem() {System.out.println("\n=== 读者-写者问题 ===");ReadWriteLock lock = new ReentrantReadWriteLock();List<Integer> data = new ArrayList<>();// 写者Thread writer = new Thread(() -> {for (int i = 0; i < 5; i++) {lock.writeLock().lock();try {data.add(i);System.out.println("写入: " + i);Thread.sleep(500);} catch (InterruptedException e) {e.printStackTrace();} finally {lock.writeLock().unlock();}}});// 读者for (int i = 0; i < 3; i++) {final int readerId = i;new Thread(() -> {for (int j = 0; j < 5; j++) {lock.readLock().lock();try {if (!data.isEmpty()) {System.out.println("读者" + readerId + "读取: " + data.get(data.size() - 1));}Thread.sleep(200);} catch (InterruptedException e) {e.printStackTrace();} finally {lock.readLock().unlock();}}}).start();}writer.start();}// 哲学家就餐问题private static void diningPhilosophersProblem() {System.out.println("\n=== 哲学家就餐问题 ===");int philosopherCount = 5;ReentrantLock[] chopsticks = new ReentrantLock[philosopherCount];for (int i = 0; i < philosopherCount; i++) {chopsticks[i] = new ReentrantLock();}ExecutorService executor = Executors.newFixedThreadPool(philosopherCount);for (int i = 0; i < philosopherCount; i++) {final int philosopherId = i;executor.execute(() -> {ReentrantLock leftChopstick = chopsticks[philosopherId];ReentrantLock rightChopstick = chopsticks[(philosopherId + 1) % philosopherCount];try {while (true) {// 思考System.out.println("哲学家" + philosopherId + "思考中");Thread.sleep(1000);// 就餐if (leftChopstick.tryLock(100, TimeUnit.MILLISECONDS)) {try {if (rightChopstick.tryLock(100, TimeUnit.MILLISECONDS)) {try {System.out.println("哲学家" + philosopherId + "就餐中");Thread.sleep(1000);} finally {rightChopstick.unlock();}}} finally {leftChopstick.unlock();}}}} catch (InterruptedException e) {Thread.currentThread().interrupt();}});}// 运行一段时间后关闭try {Thread.sleep(10000);} catch (InterruptedException e) {e.printStackTrace();}executor.shutdownNow();}// 高性能计数器private static void highPerformanceCounter() throws InterruptedException {System.out.println("\n=== 高性能计数器 ===");int threadCount = 10;int incrementPerThread = 100000;// 1. 使用synchronizedlong startTime = System.currentTimeMillis();SynchronizedCounter synchronizedCounter = new SynchronizedCounter();testCounterPerformance(synchronizedCounter, threadCount, incrementPerThread);long synchronizedTime = System.currentTimeMillis() - startTime;// 2. 使用AtomicLongstartTime = System.currentTimeMillis();AtomicLong atomicCounter = new AtomicLong();testCounterPerformance(atomicCounter, threadCount, incrementPerThread);long atomicTime = System.currentTimeMillis() - startTime;// 3. 使用LongAdder (Java 8+)startTime = System.currentTimeMillis();LongAdder adderCounter = new LongAdder();testCounterPerformance(adderCounter, threadCount, incrementPerThread);long adderTime = System.currentTimeMillis() - startTime;System.out.println("synchronized耗时: " + synchronizedTime + "ms");System.out.println("AtomicLong耗时: " + atomicTime + "ms");System.out.println("LongAdder耗时: " + adderTime + "ms");}private static void testCounterPerformance(Object counter, int threadCount, int incrementPerThread) throws InterruptedException {Thread[] threads = new Thread[threadCount];for (int i = 0; i < threadCount; i++) {threads[i] = new Thread(() -> {for (int j = 0; j < incrementPerThread; j++) {if (counter instanceof SynchronizedCounter) {((SynchronizedCounter) counter).increment();} else if (counter instanceof AtomicLong) {((AtomicLong) counter).incrementAndGet();} else if (counter instanceof LongAdder) {((LongAdder) counter).increment();}}});}for (Thread thread : threads) {thread.start();}for (Thread thread : threads) {thread.join();}}static class SynchronizedCounter {private long count = 0;public synchronized void increment() {count++;}public synchronized long getCount() {return count;}}// 并行处理任务private static void parallelTaskProcessing() throws Exception {System.out.println("\n=== 并行处理任务 ===");int taskCount = 10;ExecutorService executor = Executors.newFixedThreadPool(4);CompletionService<String> completionService = new ExecutorCompletionService<>(executor);// 提交任务for (int i = 0; i < taskCount; i++) {final int taskId = i;completionService.submit(() -> {int sleepTime = 1000 + new Random().nextInt(2000);Thread.sleep(sleepTime);return "任务" + taskId + "完成,耗时" + sleepTime + "ms";});}// 获取结果for (int i = 0; i < taskCount; i++) {Future<String> future = completionService.take();System.out.println(future.get());}executor.shutdown();}
}
这些代码示例涵盖了Java多线程编程的核心概念,从线程基础到高级并发工具,再到实际应用场景。通过学习和实践这些示例,你将能够:
- 理解多线程的基本概念和创建方式
- 掌握线程安全问题的本质和解决方案
- 熟练使用synchronized和volatile关键字
- 理解并应用JUC包中的各种并发工具
- 能够设计和实现高效的多线程程序
多线程编程是Java开发中的难点和重点,需要大量的实践才能掌握。建议你不仅运行这些代码,还要尝试修改它们,创建自己的多线程应用,并学会使用调试工具分析多线程问题。