JAVA集合篇--深入理解ConcurrentHashMap图解版
一、前言
在Java并发编程中,线程安全的Map实现一直是一个重要话题。虽然我们可以使用Collections.synchronizedMap()
或者HashTable
来获得线程安全的Map,但它们的性能在高并发场景下往往不尽人意。ConcurrentHashMap作为Java并发包中的重要组件,以其优雅的设计和出色的性能成为了并发编程的首选。
二、为什么需要ConcurrentHashMap
1.1 传统方案的问题
在ConcurrentHashMap出现之前,我们主要有以下几种线程安全的Map实现:
HashTable
// HashTable的put方法
public synchronized V put(K key, V value) {// 整个方法都被synchronized修饰// 所有操作都是串行的
}
Collections.synchronizedMap()
// SynchronizedMap的实现
public V put(K key, V value) {synchronized (mutex) { // mutex通常是thisreturn m.put(key, value);}
}
这两种方案的共同问题是:
- 粗粒度锁:整个Map只有一把锁,所有操作都需要竞争这把锁
- 读写冲突:读操作也需要获取锁,无法并发进行
- 性能瓶颈:在高并发场景下,锁竞争激烈,性能急剧下降
1.2 性能对比示例
public class MapPerformanceTest {private static final int THREAD_COUNT = 16;private static final int OPERATION_COUNT = 100_000;public static void main(String[] args) throws InterruptedException {// 测试HashTableMap<String, Integer> hashTable = new Hashtable<>();testConcurrentAccess("HashTable", hashTable);// 测试ConcurrentHashMapMap<String, Integer> concurrentMap = new ConcurrentHashMap<>();testConcurrentAccess("ConcurrentHashMap", concurrentMap);}private static void testConcurrentAccess(String mapType, Map<String, Integer> map) throws InterruptedException {CountDownLatch startLatch = new CountDownLatch(1);CountDownLatch endLatch = new CountDownLatch(THREAD_COUNT);long startTime = System.currentTimeMillis();for (int i = 0; i < THREAD_COUNT; i++) {new Thread(() -> {try {startLatch.await();for (int j = 0; j < OPERATION_COUNT; j++) {map.put("key" + j, j);map.get("key" + j);}} catch (InterruptedException e) {Thread.currentThread().interrupt();} finally {endLatch.countDown();}}).start();}startLatch.countDown();endLatch.await();long endTime = System.currentTimeMillis();System.out.println(mapType + " 耗时: " + (endTime - startTime) + "ms");}
}
典型测试结果:
- HashTable: 2500ms+
- ConcurrentHashMap: 800ms-
三、ConcurrentHashMap的演进历史
2.1 JDK 1.5-1.7:分段锁时代
在JDK 1.7及之前,ConcurrentHashMap采用**分段锁(Segment)**的设计:
// JDK 1.7 ConcurrentHashMap结构
static final class Segment<K,V> extends ReentrantLock implements Serializable {transient volatile HashEntry<K,V>[] table; // 该段的哈希表transient int count; // 该段的元素数量transient int modCount; // 修改次数transient int threshold; // 扩容阈值final float loadFactor; // 负载因子
}// HashEntry结构
static final class HashEntry<K,V> {final int hash;final K key;volatile V value;volatile HashEntry<K,V> next; // 链表指针
}
分段锁原理:
- 将整个Map分成多个Segment(默认16个)
- 每个Segment相当于一个小的HashMap,拥有独立的锁
- 不同Segment之间的操作可以并发进行
- 只有操作同一个Segment时才需要竞争锁
// JDK 1.7 put操作
public V put(K key, V value) {Segment<K,V> s;if (value == null)throw new NullPointerException();int hash = hash(key);// 根据hash值确定segmentint j = (hash >>> segmentShift) & segmentMask;if ((s = (Segment<K,V>)UNSAFE.getObject(segments, (j << SSHIFT) + SBASE)) == null)s = ensureSegment(j);return s.put(key, hash, value, false);
}
2.2 JDK 1.8+:CAS + synchronized的革新
JDK 1.8对ConcurrentHashMap进行了重大改革:
主要变化:
- 取消分段锁:改为对每个桶(数组元素)进行锁定
- 引入红黑树:链表长度超过8时转换为红黑树
- CAS操作:大量使用CAS操作减少锁的使用
- 锁粒度更细:锁的粒度从Segment级别降低到桶级别
// JDK 1.8+ 主要数据结构
transient volatile Node<K,V>[] table; // 主数组
private transient volatile Node<K,V>[] nextTable; // 扩容时的新数组
private transient volatile int sizeCtl; // 控制标识符// Node结构
static class Node<K,V> implements Map.Entry<K,V> {final int hash;final K key;volatile V val;volatile Node<K,V> next;
}// 红黑树节点
static final class TreeNode<K,V> extends Node<K,V> {TreeNode<K,V> parent;TreeNode<K,V> left;TreeNode<K,V> right;TreeNode<K,V> prev;boolean red;
}
四、扩容的区别
4.1、JDK1.7中的扩容
- 基于Segment:ConcurrentHashMap 是由多个 Segment 组成的,每个 Segment 中包含一个 HashMap。当某个 HashMap 达到扩容阈值时,单独为该 Segment 进行扩容,而不会影响其他的 Segment。
- 扩容过程:每个 Segment 维护自己的负载因子,当 Segment 中的元素超过阈值时,该Segment 的 HashMap 会扩容,整体的 ConcurrentHashMap 并不是一次性全部扩容。
4.2、JDK1.8中的扩容
- 全局扩容:ConcurrentHashMap 取消了 Segment,变成了一个全局数组(类似于HashMap),因此当 ConcurrentHashMap 中的任意一个元素超出阈值时,整个 ConcurrentHashMap 都会触发扩容。
- 基于CAS扩容:在扩容时,ConcurrentHashMap 采用了类似 HashMap 的方式,但通过 CAS操作 确保线程的安全,避免了锁住整个数组。在扩容时多个线程可以协同完成扩容。
- 渐进式扩容:JDK1.8的 ConcurrentHashMap 引入了渐进式扩容机制,扩容不是一次性将所有数据重新分配,而是多个线程共同参与,逐步迁移旧数据到新数组,降低了扩容时性能的开销
4.3、深入JDK 1.8:多线程协作扩容
// JDK 1.8 扩容核心流程
private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {int n = tab.length, stride;// 计算每个线程的工作量if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)stride = MIN_TRANSFER_STRIDE;// 首个线程初始化新表if (nextTab == null) {try {Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];nextTab = nt;} catch (Throwable ex) {sizeCtl = Integer.MAX_VALUE;return;}nextTable = nextTab;transferIndex = n;}int nextn = nextTab.length;ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);boolean advance = true;boolean finishing = false;// 多线程协作迁移for (int i = 0, bound = 0;;) {Node<K,V> f; int fh;// 获取工作范围while (advance) {int nextIndex, nextBound;if (--i >= bound || finishing)advance = false;else if ((nextIndex = transferIndex) <= 0) {i = -1;advance = false;}// CAS获取工作范围else if (U.compareAndSwapInt(this, TRANSFERINDEX, nextIndex,nextBound = (nextIndex > stride ?nextIndex - stride : 0))) {bound = nextBound;i = nextIndex - 1;advance = false;}}// 处理边界和收尾if (i < 0 || i >= n || i + n >= nextn) {// 扩容完成处理...}// 迁移具体的桶else if ((f = tabAt(tab, i)) == null)advance = casTabAt(tab, i, null, fwd);else if ((fh = f.hash) == MOVED)advance = true;else {synchronized (f) { // 锁定单个桶if (tabAt(tab, i) == f) {// 链表/红黑树迁移逻辑// 分为高位和低位两个链表/树}}}}
}// 多线程协作时序图
Thread-1 Thread-2 Thread-3| | || 发起扩容 | || 初始化nextTable | || transfer(0-15) | helpTransfer || | transfer(16-31) | helpTransfer| 迁移bucket-0 | | transfer(32-47)| 迁移bucket-1 | 迁移bucket-16 || ... | ... | 迁移bucket-32| 完成分配范围 | 完成分配范围 | ...| 检查是否全部完成 | 退出扩容 | 完成分配范围| 替换table引用 | | 退出扩容| 扩容完成 | |
优点:
- 多线程并发迁移,效率高
- 细粒度锁,减少阻塞时间
- 渐进式迁移,内存使用平滑
- 支持扩容期间的读操作
缺点:
- 实现复杂度高
- 需要额外的协调机制
- 内存开销较大(ForwardingNode等)
五、核心源码分析
3.1 初始化过程
// 构造函数
public ConcurrentHashMap(int initialCapacity, float loadFactor, int concurrencyLevel) {if (!(loadFactor > 0.0f) || initialCapacity < 0 || concurrencyLevel <= 0)throw new IllegalArgumentException();if (initialCapacity < concurrencyLevel)initialCapacity = concurrencyLevel;long size = (long)(1.0 + (long)initialCapacity / loadFactor);// 计算最接近size的2的幂次方int cap = (size >= (long)MAXIMUM_CAPACITY) ?MAXIMUM_CAPACITY : tableSizeFor((int)size);this.sizeCtl = cap; // 设置初始容量
}// 懒初始化
private final Node<K,V>[] initTable() {Node<K,V>[] tab; int sc;while ((tab = table) == null || tab.length == 0) {if ((sc = sizeCtl) < 0)Thread.yield(); // 其他线程正在初始化,让出CPUelse if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {// CAS成功,获得初始化权限try {if ((tab = table) == null || tab.length == 0) {int n = (sc > 0) ? sc : DEFAULT_CAPACITY;Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];table = tab = nt;sc = n - (n >>> 2); // 0.75 * n}} finally {sizeCtl = sc;}break;}}return tab;
}
3.2 put操作详解
public V put(K key, V value) {return putVal(key, value, false);
}final V putVal(K key, V value, boolean onlyIfAbsent) {if (key == null || value == null) throw new NullPointerException();int hash = spread(key.hashCode()); // 计算hash值int binCount = 0;for (Node<K,V>[] tab = table;;) {Node<K,V> f; int n, i, fh;// 情况1:表未初始化if (tab == null || (n = tab.length) == 0)tab = initTable();// 情况2:目标桶为空,直接CAS插入else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {if (casTabAt(tab, i, null, new Node<K,V>(hash, key, value, null)))break; // CAS成功,结束循环}// 情况3:正在扩容,协助扩容else if ((fh = f.hash) == MOVED)tab = helpTransfer(tab, f);// 情况4:桶不为空,需要同步操作else {V oldVal = null;synchronized (f) { // 锁定桶的头节点if (tabAt(tab, i) == f) { // 双重检查// 4.1 链表结构if (fh >= 0) {binCount = 1;for (Node<K,V> e = f;; ++binCount) {K ek;// 找到相同key,更新valueif (e.hash == hash &&((ek = e.key) == key || (ek != null && key.equals(ek)))) {oldVal = e.val;if (!onlyIfAbsent)e.val = value;break;}Node<K,V> pred = e;// 到达链表尾部,插入新节点if ((e = e.next) == null) {pred.next = new Node<K,V>(hash, key, value, null);break;}}}// 4.2 红黑树结构else if (f instanceof TreeBin) {Node<K,V> p;binCount = 2;if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key, value)) != null) {oldVal = p.val;if (!onlyIfAbsent)p.val = value;}}}}// 检查是否需要树化if (binCount != 0) {if (binCount >= TREEIFY_THRESHOLD)treeifyBin(tab, i); // 转换为红黑树if (oldVal != null)return oldVal;break;}}}addCount(1L, binCount); // 更新计数return null;
}
3.3 get操作详解
public V get(Object key) {Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;int h = spread(key.hashCode());if ((tab = table) != null && (n = tab.length) > 0 &&(e = tabAt(tab, (n - 1) & h)) != null) {// 直接命中头节点if ((eh = e.hash) == h) {if ((ek = e.key) == key || (ek != null && key.equals(ek)))return e.val;}// hash值为负数,特殊节点(红黑树或转发节点)else if (eh < 0)return (p = e.find(h, key)) != null ? p.val : null;// 遍历链表while ((e = e.next) != null) {if (e.hash == h &&((ek = e.key) == key || (ek != null && key.equals(ek))))return e.val;}}return null;
}
get操作的特点:
- 无锁读取:get操作不需要加锁,依赖volatile保证可见性
- 快速路径:优先检查头节点,大多数情况下可快速返回
- 支持并发扩容:即使在扩容过程中也能正确读取数据