Java并发编程:深入浅出掌握多线程艺术
一、并发编程的定义与重要性
什么是并发编程?
当我们在单核CPU上边听音乐边写文档时,操作系统通过快速切换任务营造"同时执行"的假象,这就是并发。Java并发编程允许我们创建多个执行线程,充分利用多核CPU的计算能力。
重要性:
-
提升系统吞吐量(如Web服务器同时处理多个请求)
-
改善用户体验(如后台下载不影响界面响应)
-
提高资源利用率(如数据库连接池复用)
// 基础线程示例
public class BasicThread {public static void main(String[] args) {Thread downloadThread = new Thread(() -> {System.out.println("开始下载文件...");// 模拟耗时操作try { Thread.sleep(3000); } catch (InterruptedException e) {}System.out.println("文件下载完成");});downloadThread.start();System.out.println("主线程继续运行");}
}
二、线程安全与共享资源
线程安全陷阱
当多个线程同时修改共享数据时,可能产生不可预知的结果:
class UnsafeCounter {private int count = 0;public void increment() {count++; // 非原子操作}public int getCount() { return count; }
}// 测试代码
public static void main(String[] args) throws InterruptedException {UnsafeCounter counter = new UnsafeCounter();ExecutorService executor = Executors.newFixedThreadPool(10);for (int i = 0; i < 1000; i++) {executor.execute(counter::increment);}executor.shutdown();executor.awaitTermination(1, TimeUnit.MINUTES);System.out.println("Final count: " + counter.getCount()); // 预期1000,实际可能输出978等随机值
}
常见问题类型
-
数据竞争:未同步的共享数据访问
-
死锁:线程互相持有对方所需资源
-
活锁:线程不断重试失败操作
// 经典死锁示例
Object lockA = new Object();
Object lockB = new Object();new Thread(() -> {synchronized (lockA) {try { Thread.sleep(100); } catch (InterruptedException e) {}synchronized (lockB) {System.out.println("Thread1 got both locks");}}
}).start();new Thread(() -> {synchronized (lockB) {synchronized (lockA) {System.out.println("Thread2 got both locks");}}
}).start();
死锁问题全场景解决方案
银行家算法实现(预防死锁)
class BankersAlgorithm {private int[] available;private int[][] max;private int[][] allocation;private int[][] need;public synchronized boolean requestResources(int processId, int[] request) {if (!checkRequestValid(processId, request)) return false;// 试探性分配for (int i = 0; i < request.length; i++) {available[i] -= request[i];allocation[processId][i] += request[i];need[processId][i] -= request[i];}if (isSafeState()) {return true;} else {// 回滚操作for (int i = 0; i < request.length; i++) {available[i] += request[i];allocation[processId][i] -= request[i];need[processId][i] += request[i];}return false;}}private boolean isSafeState() { /* 安全性检查算法实现 */ }
}
锁排序解决方案
class Account {private int id;private BigDecimal balance;public void transfer(Account target, BigDecimal amount) {Account first = id < target.id ? this : target;Account second = id < target.id ? target : this;synchronized(first) {synchronized(second) {if (this.balance.compareTo(amount) >= 0) {this.balance = this.balance.subtract(amount);target.balance = target.balance.add(amount);}}}}
}
tryLock超时机制
class TimeoutLockExample {private final Lock lock1 = new ReentrantLock();private final Lock lock2 = new ReentrantLock();public boolean transferWithTimeout(long timeout, TimeUnit unit) throws InterruptedException {long startTime = System.nanoTime();while (true) {if (lock1.tryLock()) {try {if (lock2.tryLock(timeout, unit)) {try {// 执行业务逻辑return true;} finally {lock2.unlock();}}} finally {lock1.unlock();}}if (System.nanoTime() - startTime > unit.toNanos(timeout)) {return false;}Thread.sleep(50); // 避免活锁}}
}
三、同步机制与锁
synchronized关键字
class SafeCounter {private int count = 0;public synchronized void increment() {count++;}public synchronized int getCount() {return count;}
}
锁升级优化:
-
无锁 → 偏向锁 → 轻量级锁 → 重量级锁
ReentrantLock的灵活控制
class FairCounter {private int count = 0;private final ReentrantLock lock = new ReentrantLock(true); // 公平锁public void increment() {lock.lock();try {count++;} finally {lock.unlock();}}
}
读写锁应用场景
class CachedData {private Object data;private final ReadWriteLock rwLock = new ReentrantReadWriteLock();public void processCachedData() {rwLock.readLock().lock();if (!dataValid) {rwLock.readLock().unlock();rwLock.writeLock().lock();try {// 更新缓存} finally {rwLock.readLock().lock();rwLock.writeLock().unlock();}}// 使用数据rwLock.readLock().unlock();}
}
四、并发集合与工具类
ConcurrentHashMap的线程安全实现
// 高性能统计(原子操作方法)
ConcurrentHashMap<String, LongAdder> counterMap = new ConcurrentHashMap<>();public void count(String key) {counterMap.computeIfAbsent(key, k -> new LongAdder()).increment();
}// 分段统计示例
ConcurrentHashMap<String, AtomicLong> map = new ConcurrentHashMap<>();
map.putIfAbsent("key", new AtomicLong(0));
map.get("key").incrementAndGet();
CopyOnWriteArraySet应用场景
// 监听器列表管理
class EventManager {private final CopyOnWriteArraySet<EventListener> listeners = new CopyOnWriteArraySet<>();public void addListener(EventListener listener) {listeners.add(listener);}public void fireEvent(Event event) {for (EventListener listener : listeners) {executor.execute(() -> listener.onEvent(event));}}
}
Disruptor框架(高性能队列)
// 构建Disruptor环形队列
Disruptor<LogEvent> disruptor = new Disruptor<>(LogEvent::new, 1024, DaemonThreadFactory.INSTANCE,ProducerType.MULTI, new BlockingWaitStrategy()
);disruptor.handleEventsWith((event, sequence, endOfBatch) -> {// 处理日志事件
});RingBuffer<LogEvent> ringBuffer = disruptor.start();// 发布事件
long sequence = ringBuffer.next();
try {LogEvent event = ringBuffer.get(sequence);event.setMessage("Log message");
} finally {ringBuffer.publish(sequence);
}
同步工具应用示例
// 使用CountDownLatch实现并行任务等待
CountDownLatch latch = new CountDownLatch(3);Runnable task = () -> {// 执行任务latch.countDown();
};new Thread(task).start();
new Thread(task).start();
new Thread(task).start();latch.await(); // 等待所有任务完成
System.out.println("All tasks completed!");
五、内存模型与可见性
volatile关键字的作用
class VolatileExample {private volatile boolean flag = false;public void writer() {flag = true; // 写操作对其他线程立即可见}public void reader() {while (!flag) {// 当flag变为true时立即跳出循环}}
}
六、并发性能调优黄金法则
线程池参数优化公式
最佳线程数 = CPU核心数 * (1 + 等待时间/计算时间)
ThreadPoolExecutor executor = new ThreadPoolExecutor(Runtime.getRuntime().availableProcessors(), // 核心线程数Runtime.getRuntime().availableProcessors() * 2, // 最大线程数60L, TimeUnit.SECONDS, // 空闲线程存活时间new LinkedBlockingQueue<>(1000), // 有界队列new ThreadFactoryBuilder().setNameFormat("worker-%d").build(),new ThreadPoolExecutor.CallerRunsPolicy() // 拒绝策略
);
上下文切换优化技巧
@Suspendable
public void coroutineExample() throws SuspendExecution {Fiber<Void> fiber1 = new Fiber<Void>(() -> {System.out.println("Fiber 1 start");Fiber.park(1000); // 挂起1秒System.out.println("Fiber 1 resume");}).start();Fiber<Void> fiber2 = new Fiber<Void>(() -> {System.out.println("Fiber 2 start");Fiber.park(500); // 挂起0.5秒System.out.println("Fiber 2 resume");}).start();
}
锁消除与锁粗化
// 锁消除示例(JIT编译器优化)
public String concatString(String s1, String s2, String s3) {StringBuffer sb = new StringBuffer();sb.append(s1); // 自动消除锁(栈封闭)sb.append(s2);sb.append(s3);return sb.toString();
}// 锁粗化优化前
public void method() {synchronized(lock) { /* 操作1 */ }synchronized(lock) { /* 操作2 */ }synchronized(lock) { /* 操作3 */ }
}// 优化后
public void method() {synchronized(lock) {/* 操作1 *//* 操作2 *//* 操作3 */}
}
七、分布式环境下的并发挑战
分布式锁实现方案对比
方案 | 优点 | 缺点 | 适用场景 |
---|---|---|---|
Redis SETNX | 实现简单,性能高 | 时钟漂移问题 | 高并发短事务 |
ZooKeeper | 强一致性,可靠性高 | 性能较低 | 金融交易等强一致性场景 |
Etcd | 高可用,支持租约 | 需要维护连接 | 容器化环境 |
数据库行锁 | 无需额外组件 | 性能差,死锁风险高 | 低频长事务 |
RedLock算法实现
public boolean acquireDistributedLock(JedisPool[] jedisPools, String lockKey, String requestId, int expireTime) {int successCount = 0;long startTime = System.currentTimeMillis();try {for (JedisPool pool : jedisPools) {try (Jedis jedis = pool.getResource()) {if ("OK".equals(jedis.set(lockKey, requestId, "NX", "PX", expireTime))) {successCount++;}}}long costTime = System.currentTimeMillis() - startTime;// 超过半数且操作时间小于锁过期时间return successCount >= jedisPools.length/2 + 1 && costTime < expireTime;} finally {if (!acquired) {releaseDistributedLock(jedisPools, lockKey, requestId);}}
}