Java线程安全与同步机制全解析
Java 线程同步安全机制详解
在 Java 中实现线程同步安全是并发编程的核心,下面我将全面介绍各种同步机制及其应用场景:
一、基础同步机制
1. synchronized 关键字
public class SynchronizedCounter {private int count = 0;// 同步方法public synchronized void increment() {count++;}// 同步代码块public void decrement() {synchronized(this) {count--;}}// 静态同步方法(类级别锁)public static synchronized void staticMethod() {// ...}
}
2. volatile 关键字
public class VolatileExample {private volatile boolean flag = false;public void setFlag() {flag = true; // 写操作立即对其他线程可见}public void doWork() {while(!flag) {// 循环直到flag变为true}}
}
二、高级同步工具类
1. ReentrantLock
import java.util.concurrent.locks.*;public class LockCounter {private int count = 0;private final ReentrantLock lock = new ReentrantLock();public void increment() {lock.lock(); // 手动加锁try {count++;} finally {lock.unlock(); // 确保解锁}}// 尝试获取锁public boolean tryIncrement() {if(lock.tryLock()) {try {count++;return true;} finally {lock.unlock();}}return false;}
}
2. ReadWriteLock
public class ReadWriteMap<K, V> {private final Map<K, V> map = new HashMap<>();private final ReadWriteLock lock = new ReentrantReadWriteLock();public V get(K key) {lock.readLock().lock();try {return map.get(key);} finally {lock.readLock().unlock();}}public void put(K key, V value) {lock.writeLock().lock();try {map.put(key, value);} finally {lock.writeLock().unlock();}}
}
三、线程安全容器
1. ConcurrentHashMap
import java.util.concurrent.*;public class ConcurrentCache {private final ConcurrentMap<String, String> cache = new ConcurrentHashMap<>();public String get(String key) {return cache.get(key);}public void put(String key, String value) {cache.put(key, value);}// 原子操作public void atomicUpdate(String key, String value) {cache.compute(key, (k, v) -> v == null ? value : v + value);}
}
2. CopyOnWriteArrayList
import java.util.concurrent.*;public class EventDispatcher {private final CopyOnWriteArrayList<EventListener> listeners = new CopyOnWriteArrayList<>();public void addListener(EventListener listener) {listeners.add(listener);}public void dispatchEvent(Event event) {for(EventListener listener : listeners) {listener.onEvent(event);}}
}
四、原子操作类
import java.util.concurrent.atomic.*;public class AtomicCounter {private final AtomicInteger count = new AtomicInteger(0);public void increment() {count.incrementAndGet(); // 原子自增}public void add(int value) {// 原子更新count.updateAndGet(current -> current + value);}// 比较并交换(CAS)public boolean compareAndSet(int expect, int update) {return count.compareAndSet(expect, update);}
}
五、线程协调工具
1. CountDownLatch
public class ServiceInitializer {public void initializeServices() throws InterruptedException {int serviceCount = 3;CountDownLatch latch = new CountDownLatch(serviceCount);new Thread(() -> { startDB(); latch.countDown(); }).start();new Thread(() -> { startCache(); latch.countDown(); }).start();new Thread(() -> { startAPI(); latch.countDown(); }).start();latch.await(); // 等待所有服务初始化完成System.out.println("所有服务已就绪");}
}
2. CyclicBarrier
public class MatrixProcessor {public void processMatrix(int[][] matrix) {int rows = matrix.length;CyclicBarrier barrier = new CyclicBarrier(rows, () -> System.out.println("矩阵处理完成"));for(int i = 0; i < rows; i++) {final int row = i;new Thread(() -> {processRow(matrix[row]);barrier.await(); // 等待其他行处理完成}).start();}}
}
3. Semaphore
public class ConnectionPool {private final Semaphore semaphore;private final List<Connection> connections;public ConnectionPool(int size) {this.semaphore = new Semaphore(size);this.connections = new ArrayList<>(size);// 初始化连接池...}public Connection getConnection() throws InterruptedException {semaphore.acquire(); // 获取许可return acquireAvailableConnection();}public void releaseConnection(Connection conn) {releaseConnection(conn);semaphore.release(); // 释放许可}
}
六、最佳实践与性能考虑
-
锁粒度优化:
// 粗粒度锁(性能较差) public synchronized void processEverything() { /* ... */ }// 细粒度锁(推荐) public void process() {synchronized(lockA) { /* 操作A */ }synchronized(lockB) { /* 操作B */ } }
-
避免死锁:
// 使用tryLock避免死锁 public boolean transfer(Account from, Account to, int amount) {if (from.lock.tryLock()) {try {if (to.lock.tryLock()) {try {// 执行转账return true;} finally {to.lock.unlock();}}} finally {from.lock.unlock();}}return false; }
-
使用线程局部变量:
public class UserContext {private static final ThreadLocal<User> currentUser = new ThreadLocal<>();public static void setUser(User user) {currentUser.set(user);}public static User getUser() {return currentUser.get();} }
七、同步机制对比
机制 | 适用场景 | 优点 | 缺点 |
---|---|---|---|
synchronized | 简单同步场景 | 简单易用,自动释放锁 | 功能有限,不可中断 |
ReentrantLock | 复杂同步需求 | 可中断、公平锁、尝试获取锁 | 需手动释放锁 |
volatile | 状态标志 | 轻量级,保证可见性 | 不保证原子性 |
Atomic类 | 计数器等 | 无锁,高性能 | 只适合简单操作 |
Concurrent集合 | 高并发容器 | 高性能,线程安全 | API复杂 |
CountDownLatch | 一次性等待 | 简单协调 | 不可重置 |
CyclicBarrier | 循环等待 | 可重复使用 | 复杂场景控制难 |
八、高级并发模式
1. 生产者-消费者模式
public class ProducerConsumer {private final BlockingQueue<Item> queue = new LinkedBlockingQueue<>(100);class Producer implements Runnable {public void run() {while(true) {Item item = produceItem();queue.put(item); // 阻塞直到有空间}}}class Consumer implements Runnable {public void run() {while(true) {Item item = queue.take(); // 阻塞直到有项目consumeItem(item);}}}
}
2. Fork/Join框架
public class SumTask extends RecursiveTask<Long> {private final int[] array;private final int start, end;public SumTask(int[] array, int start, int end) {this.array = array;this.start = start;this.end = end;}@Overrideprotected Long compute() {if (end - start < 1000) {// 小任务直接计算long sum = 0;for (int i = start; i < end; i++) {sum += array[i];}return sum;} else {// 大任务拆分int mid = (start + end) >>> 1;SumTask left = new SumTask(array, start, mid);SumTask right = new SumTask(array, mid, end);left.fork(); // 异步执行return right.compute() + left.join();}}
}
总结
实现线程同步安全的核心原则:
- 识别共享状态:明确哪些数据需要保护
- 选择合适锁粒度:平衡安全性和性能
- 优先使用高级工具:如并发容器、原子类
- 避免锁嵌套:防止死锁
- 考虑不可变性:不可变对象天生线程安全
对于现代Java开发,优先考虑以下方案:
- 简单状态标志:
volatile
- 计数器:
AtomicInteger
/AtomicLong
- 并发集合:
ConcurrentHashMap
/CopyOnWriteArrayList
- 资源池:
Semaphore
- 复杂同步:
ReentrantLock
- 任务协调:
CountDownLatch
/CyclicBarrier
通过合理应用这些同步机制,可以构建出既安全又高效的并发程序。