深入解读ConcurrentHashMap特性以及源码
这个太常用了是吗,面试必问,现在才来整理,不过只要在路上就不晚,此时此刻哈哈
来吧宝子们,让我们更加熟悉这个类
首先我们得知道1.7和1.8版本是不一样的
下图是JDK1.7的数据结构
1.7
- segment+分段锁技术实现
- 默认将Hash表分为16个桶
- segment继承自ReentrantLock,数组+单向链表,每个segment提供相同的线程安全,内部拥有一个Entry数组
- 读不加锁,定位一个元素的过程需要进行两次Hash操作,第一次Hash定位到Segment,第二次Hash定位到元素所在的链表的头部
- Lock锁控制,分段锁技术锁住Node,而锁住node之前的操作基于volatile和CAS之上原子性的操作保证线程安全
- 段内扩容:段内超过数组长度的75%触发扩容,不会对整个Map进行扩容
这个大家了解一下就好了,现在用1.7的寥寥无几了,如果还有公司在用,兄弟该升升级了
1.8
1.8取消了segment,采用Node,结构为 数组+链表+红黑树,时间复杂度O(log(N)),Java 8 中,锁粒度更细,synchronized 只锁定当前链表或红黑二叉树的首节点,这样只要 hash 不冲突,就不会产生并发,就不会影响其他 Node 的读写,效率大幅提升
- CAS和synchronized来保证线程 并发安全
- 键值不允许null
看下类上下结构
核心结构
Node<K,V>[] table
:存储节点的数组。
Node<K,V>
:基本节点结构,链表节点。
TreeNode<K,V>
:红黑树节点(当桶中链表过长时转为红黑树)。
ForwardingNode
:迁移中的占位节点,用于扩容时的协助
Java 8 中,锁粒度更细,synchronized 只锁定当前链表或红黑二叉树的首节点,这样只要 hash 不冲突,就不会产生并发,就不会影响其他 Node 的读写,效率大幅提升
ConcurrentHashMap 提供了一些原子性的复合操作,如 putIfAbsent、compute、computeIfAbsent 、computeIfPresent、merge等内部方法
putIfAbsent(K key, V value): 只有map中还没有指定key时才添加
remove(Object key, Object value): 如果指定的键所映射的当前值等于给定值,则将其移除。
replace(K key, V oldValue, V newValue): 只有传入old值等于map中的值,才会替换成新值。
replace(K key, V value): 不管原始值是什么,都将指定键的映射值替换为新值。
compute(K key, BiFunction<? super K, ? super V, ? extends V> remappingFunction): 对给定的键应用提供的函数,如果键存在,则使用其当前映射值作为函数的第一个参数,函数的返回值将作为新的映射值。如果键不存在,则根据函数的返回值(通常应为非null)插入新条目。
computeIfAbsent(K key, Function<? super K,? extends V> mappingFunction): 如果指定的键不存在或映射到null,则使用给定的函数来计算其映射值,并将其作为新值插入。
computeIfPresent(K key, BiFunction<? super K,? super V,? extends V> remappingFunction): 如果指定的键存在,则使用给定的函数来计算其新映射值,如果函数返回非null,则更新映射值。
这里说下computeIfAbsent方法使用场景
- 延迟初始化
- 避免多线程竞争
- 高效计算
import java.util.concurrent.ConcurrentHashMap;public class ComputeIfAbsentExample {public static void main(String[] args) {// 创建一个 ConcurrentHashMap 实例ConcurrentHashMap<String, String> map = new ConcurrentHashMap<>();// 使用 computeIfAbsent 方法String result = map.computeIfAbsent("key1", key -> {// 假设在这里执行复杂计算或数据库查询操作System.out.println("Calculating value for key: " + key);return "computedValue";});// 输出: Calculating value for key: key1// 输出Result: computedValueSystem.out.println("Result: " + result); // 再次调用 computeIfAbsent 时,计算函数不会再执行,因为键已经存在String result2 = map.computeIfAbsent("key1", key -> {System.out.println("This will not be executed");return "anotherValue";});// 输出: Result2: computedValueSystem.out.println("Result2: " + result2); }
}//延迟加载缓存
ConcurrentHashMap<String, Data> cache = new ConcurrentHashMap<>();
public Data getData(String key) {return cache.computeIfAbsent(key, k -> loadDataFromDB(k));
}//并发分类统计
ConcurrentHashMap<String, AtomicInteger> wordCount = new ConcurrentHashMap<>();
public void count(String word) {wordCount.computeIfAbsent(word, k -> new AtomicInteger(0)).incrementAndGet();
}
源码分析
标准Node结构
static class Node<K,V> implements Map.Entry<K,V> {final int hash;final K key;volatile V val;volatile Node<K,V> next;
sizeCtl
private transient volatile int sizeCtl;
- 控制扩容阈值
- 扩容过程的状态管理
- 正数:表示哈希表容量值,何时开始扩容
- 负数:正在扩容,不同值含义不同
initTable()
// 初始化哈希表数组
private final Node<K,V>[] initTable() {Node<K,V>[] tab; // 声明一个Node类型的引用变量tab,用于存放当前table的引用int sc; // 声明一个整型变量sc,用于存放sizeCtl的值// 使用while循环确保在并发环境下正确初始化while ((tab = table) == null || tab.length == 0) { // 当table未初始化或者长度为0时,继续循环// 检查是否因其他线程正在进行初始化而导致的竞争失败if ((sc = sizeCtl) < 0) {Thread.yield(); // 如果是竞争失败(sizeCtl负数表示有线程正在初始化或扩容),则让出CPU时间片,稍后再试} else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) { // 尝试通过CAS操作获取初始化权限try {// 再次检查table是否已被其他线程初始化if ((tab = table) == null || tab.length == 0) {// 计算容量n,如果sc大于0,则使用sc作为容量;否则使用默认容量DEFAULT_CAPACITYint n = (sc > 0) ? sc : DEFAULT_CAPACITY;// 创建一个新的Node数组nt,类型转换以避免泛型警告@SuppressWarnings("unchecked")Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];// 将新创建的数组赋值给tabletable = tab = nt;// 计算新的sizeCtl值,大约为容量的3/4,用于控制并发扩容sc = n - (n >>> 2);}} finally { // 确保在异常情况下也能释放初始化锁sizeCtl = sc; // 设置sizeCtl为计算好的值,允许其他线程执行扩容或进一步的初始化}break; // 成功初始化后退出循环}}return tab; // 返回初始化后的table引用
}
get
/** 相比put方法,get就很单纯了,支持并发操作,* 当key为null的时候回抛出NullPointerException的异常* get操作通过首先计算key的hash值来确定该元素放在数组的哪个位置* 然后遍历该位置的所有节点* 如果不存在的话返回null*/
public V get(Object key) {Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;// 求取hashint h = spread(key.hashCode());// table不为空,并且hash节点不为空if ((tab = table) != null && (n = tab.length) > 0 &&(e = tabAt(tab, (n - 1) & h)) != null) {// 节点hash等于key的hash,就equals比较当前值,如果相等就直接返回if ((eh = e.hash) == h) {if ((ek = e.key) == key || (ek != null && key.equals(ek)))return e.val;}else if (eh < 0)return (p = e.find(h, key)) != null ? p.val : null;// 遍历下一个往后table,equals比较查找while ((e = e.next) != null) {if (e.hash == h &&((ek = e.key) == key || (ek != null && key.equals(ek))))return e.val;}}return null;
}
put
/** 当添加一对键值对的时候,首先会去判断保存这些键值对的数组是不是初始化了,* 如果没有的话就初始化数组* 然后通过计算hash值来确定放在数组的哪个位置* 如果这个位置为空则直接添加,如果不为空的话,则取出这个节点来* 如果取出来的节点的hash值是MOVED(-1)的话,则表示当前正在对这个数组进行扩容,复制到新的数组,则当前线程也去帮助复制* 最后一种情况就是,如果这个节点,不为空,也不在扩容,则通过synchronized来加锁,进行添加操作* 然后判断当前取出的节点位置存放的是链表还是树* 如果是链表的话,则遍历整个链表,直到取出来的节点的key来个要放的key进行比较,如果key相等,并且key的hash值也相等的话,* 则说明是同一个key,则覆盖掉value,否则的话则添加到链表的末尾* 如果是树的话,则调用putTreeVal方法把这个元素添加到树中去* 最后在添加完成之后,会判断在该节点处共有多少个节点(注意是添加前的个数),如果达到8个以上了的话,* 则调用treeifyBin方法来尝试将处的链表转为树,或者扩容数组*/
final V putVal(K key, V value, boolean onlyIfAbsent) {if (key == null || value == null) throw new NullPointerException();//K,V都不能为空,否则的话跑出异常//取得key的hash值int hash = spread(key.hashCode()); //用来计算在这个节点总共有多少个元素,用来控制扩容或者转移为树int binCount = 0; for (Node<K,V>[] tab = table;;) { //Node<K,V> f; int n, i, fh;if (tab == null || (n = tab.length) == 0) //第一次put的时候table没有初始化,则初始化tabletab = initTable(); //通过哈希计算出一个表中的位置因为n是数组的长度,所以(n-1)&hash肯定不会出现数组越界else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) { //如果这个位置没有元素的话,则通过cas的方式尝试添加,注意这个时候是没有加锁的if (casTabAt(tab, i, null, new Node<K,V>(hash, key, value, null))) //创建一个Node添加到数组中区,null表示的是下一个节点为空break; // no lock when adding to empty bin}/** 如果检测到某个节点的hash值是MOVED,则表示正在进行数组扩张的数据复制阶段,* 则当前线程也会参与去复制,通过允许多线程复制的功能,一次来减少数组的复制所带来的性能损失*/else if ((fh = f.hash) == MOVED) tab = helpTransfer(tab, f);else {/** 如果在这个位置有元素的话,就采用synchronized的方式加锁,锁的f是要操作的Node节点* 如果是链表的话(hash大于0),就对这个链表的所有元素进行遍历,* 如果找到了key和key的hash值都一样的节点,则把它的值替换到* 如果没找到的话,则添加在链表的最后面* 否则,是树的话,则调用putTreeVal方法添加到树中去* * 在添加完之后,会对该节点上关联的的数目进行判断,* 如果在8个以上的话,则会调用treeifyBin方法,来尝试转化为树,或者是扩容*/V oldVal = null;synchronized (f) {//再次取出要存储的位置的元素,跟前面取出来的比较if (tabAt(tab, i) == f) { //取出来的元素的hash值大于0,当转换为树之后,hash值为-2if (fh >= 0) { binCount = 1; //遍历这个链表for (Node<K,V> e = f;; ++binCount) { K ek;//要存的元素的hash,key跟要存储的位置的节点的相同的时候,替换掉该节点的value即可if (e.hash == hash && ((ek = e.key) == key ||(ek != null && key.equals(ek)))) {oldVal = e.val;//当使用putIfAbsent的时候,只有在这个key没有设置值得时候才设置if (!onlyIfAbsent) e.val = value;break;}Node<K,V> pred = e;//如果不是同样的hash,同样的key的时候,则判断该节点的下一个节点是否为空,if ((e = e.next) == null) { //为空的话把这个要加入的节点设置为当前节点的下一个节点pred.next = new Node<K,V>(hash, key, value, null);break;}}}//表示已经转化成红黑树类型了else if (f instanceof TreeBin) { Node<K,V> p;binCount = 2;//调用putTreeVal方法,将该元素添加到树中去if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key, value)) != null) {oldVal = p.val;if (!onlyIfAbsent)p.val = value;}}}}if (binCount != 0) {//当在同一个节点的数目达到8个的时候,则扩张数组或将给节点的数据转为treeif (binCount >= TREEIFY_THRESHOLD) treeifyBin(tab, i); if (oldVal != null)return oldVal;break;}}}//计数addCount(1L, binCount); return null;
}
static final int spread(int h) {return (h ^ (h >>> 16)) & HASH_BITS;
}
这块处理放入,包括树节点
// 在给定的哈希值h下,向树中插入键k和值v,返回已存在的包含相同键的节点(如果有)
final TreeNode<K,V> putTreeVal(int h, K k, V v) {Class<?> kc = null; // 用于存储K类的Class对象,用于比较boolean searched = false; // 标记是否已经搜索过左右子树// 从根节点开始遍历for (TreeNode<K,V> p = root;;) {int dir, ph; // dir指示插入方向,ph为当前节点的哈希值K pk; // 当前节点的键// 如果树为空,创建新节点作为根if (p == null) {first = root = new TreeNode<K,V>(h, k, v, null, null);break;}// 根据哈希值确定插入方向else if ((ph = p.hash) > h)dir = -1; // 插入左子树else if (ph < h)dir = 1; // 插入右子树// 如果哈希值相同且键相等,则返回现有节点else if ((pk = p.key) == k || (pk != null && k.equals(pk)))return p;// 如果键不可比较或比较结果为0(即相等),则需要进一步查找或决定插入位置else if ((kc == null &&(kc = comparableClassFor(k)) == null) || // 尝试获取可比较的Class(dir = compareComparables(kc, k, pk)) == 0) { // 比较键,dir=0表示相等if (!searched) { // 如果还没搜索过左右子树TreeNode<K,V> q, ch; // q为找到的节点,ch为子树searched = true; // 设置已搜索标志// 在左子树和右子树中查找键kif (((ch = p.left) != null && (q = ch.findTreeNode(h, k, kc)) != null) ||((ch = p.right) != null && (q = ch.findTreeNode(h, k, kc)) != null))return q; // 找到则返回}// 键不可比或比较结果为0时,使用tieBreakOrder决定插入方向dir = tieBreakOrder(k, pk);}// 沿dir方向移动到下一个节点TreeNode<K,V> xp = p;if ((p = (dir <= 0) ? p.left : p.right) == null) {// 创建新节点,并插入到树中TreeNode<K,V> x, f = first;first = x = new TreeNode<K,V>(h, k, v, f, xp);// 更新前驱节点if (f != null)f.prev = x;// 插入新节点if (dir <= 0)xp.left = x;elsexp.right = x;// 红黑树平衡处理if (!xp.red)x.red = true; // 父节点不是红色,新节点变为红色else {lockRoot(); // 锁住根节点以进行红黑树调整try {root = balanceInsertion(root, x); // 平衡插入后的新树根} finally {unlockRoot(); // 解锁根节点}}break; // 插入完成,跳出循环}}// 确保红黑树性质不变assert checkInvariants(root);return null; // 返回null,因为插入总是成功并返回已有节点或无返回(新插入)
}
spread方法通过对输入整数,h进行无符号右移、按位异或操作,以及按位与操作,生成了一个更加均匀分布的散列值,这对于在ConcurrentHashMap中高效定位元素存储位置至关重要,减少了哈希冲突,提升了并发访问性能
-
(h ^ (h >>> 16))这部分是散列值计算的核心逻辑。
-
- h >>> 16:这里使用无符号右移操作符,将h的二进制表示向右移动16位。无符号右移意味着无论h是正数还是负数,高位都会补0。这样做可以将原始的高位和低位信息混合。
-
- ^:这是一个按位异或操作符。将原始的h与右移16位后的h进行异或操作。异或操作能够使得高低位的信息进一步混合,有助于消除连续的0或1,从而提高散列值的随机性和均匀性。
-
& HASH_BITS
-
HASH_BITS是一个未在代码片段中直接定义的常量,但在ConcurrentHashMap的上下文中,它通常用来限制散列值的范围,确保散列值适配于表的大小。例如,如果HASH_BITS等于32(即0x7FFFFFFF),那么此操作会将散列值截断到32位,丢弃更高位的比特,这对于保持散列值在预期范围内很有帮助,尤其是在做模运算分配桶位置时。
-
&是按位与操作符,用于确保最终的散列值不会超出预定义的范围,提高了散列值的有效性和使用效率
-
这段是一个大佬那记的,整不太明白,大家大概理解一下就好
computeIfAbsent
public V computeIfAbsent(K key, Function<? super K, ? extends V> mappingFunction) {if (key == null || mappingFunction == null) {throw new NullPointerException();}int hash = spread(key.hashCode()); // 1. 对 key 的 hash 进行扰动计算,避免 hash 冲突Node<K,V>[] tab; Node<K,V> first; int n, i;V val;// 2. 查找 key 所在桶位(i),若桶非空,则遍历链表或树查找 keyif ((tab = table) != null && (n = tab.length) > 0 &&(first = tabAt(tab, i = (n - 1) & hash)) != null) {Node<K,V> e; K ek;// a. 若首节点即为目标 key,直接返回其 valueif (first.hash == hash && ((ek = first.key) == key || (ek != null && key.equals(ek)))) {val = first.val;}// b. 若不是,进入链表/树查找流程(锁住桶)else if ((val = (e = first).find(hash, key)) != null) {// 如果找到了,也返回 value}// c. 找不到则尝试插入(需加 synchronized 锁)else {synchronized (e) {if (tabAt(tab, i) == e) {if (e instanceof TreeBin) {val = ((TreeBin<K,V>) e).computeIfAbsent(hash, key, mappingFunction);} else {val = mappingFunction.apply(key);if (val != null) {setTabAt(tab, i, new Node<K,V>(hash, key, val, e));}}}}}if (val != null) {return val;}}// 3. 若桶为空(tab[i] == null),使用 CAS 创建节点插入val = mappingFunction.apply(key);if (val == null) {return null;}// 尝试插入新节点(线程安全 CAS)Node<K,V> newNode = new Node<>(hash, key, val, null);if ((tab = table) == null || (n = tab.length) == 0 ||tabAt(tab, i = (n - 1) & hash) != null ||!casTabAt(tab, i, null, newNode)) {putVal(key, val, false); // CAS 失败则退化使用 putVal}return val;
}
扩容
/***链表转为红黑树 Replaces all linked nodes in bin at given index unless table is* too small, in which case resizes instead.* 当数组长度小于64的时候,扩张数组长度一倍,否则的话把链表转为树*/
// 将指定索引处的链表转换为红黑树结构
private final void treeifyBin(Node<K,V>[] tab, int index) {Node<K,V> b; // 用于暂存当前桶的头结点int n, sc; // n为哈希表长度,sc为sizeCtl的值// 检查哈希表是否为空,不为空则继续if (tab != null) {// 如果哈希表长度小于MIN_TREEIFY_CAPACITY(默认为64),则尝试扩容if ((n = tab.length) < MIN_TREEIFY_CAPACITY)tryPresize(n << 1); // 尝试将表大小加倍else {// 获取桶index处的头结点if ((b = tabAt(tab, index)) != null && b.hash >= 0) { // 确保头结点存在且不是ForwardingNode(迁移标识)synchronized (b) { // 对桶头结点加锁,防止并发修改// 再次检查桶内头结点,确保在加锁前后未被其他线程改变,类似单例的双重检测if (tabAt(tab, index) == b) {// 初始化红黑树的头结点和尾结点TreeNode<K,V> hd = null, tl = null;// 遍历桶中的链表for (Node<K,V> e = b; e != null; e = e.next) {// 将链表节点转换为树节点TreeNode<K,V> p = new TreeNode<K,V>(e.hash, e.key, e.val, null, null);// 构建红黑树的双向链表结构if ((p.prev = tl) == null)hd = p; // 第一个节点elsetl.next = p;tl = p; // 更新尾节点}// 将桶设置为新的TreeBin节点,完成树化setTabAt(tab, index, new TreeBin<K,V>(hd));}}}}}
}/*** 预扩容 它的主要目的是在实际添加元素之前,根据预期的元素数量size来决定是否需要扩大哈希表的容量,以避免在高并发情况下频繁的扩容操作扩容表为指可以容纳指定个数的大小(总是2的N次方)* 假设原来的数组长度为16,则在调用tryPresize的时候,size参数的值为16<<1(32),此时sizeCtl的值为12* 计算出来c的值为64,则要扩容到sizeCtl≥为止* 第一次扩容之后 数组长:32 sizeCtl:24* 第二次扩容之后 数组长:64 sizeCtl:48* 第二次扩容之后 数组长:128 sizeCtl:94 --> 这个时候才会退出扩容*/
private final void tryPresize(int size) {/** 计算目标容量,如果size大于最大容量的一半,则直接使用最大容量MAXIMUM_CAPACITY = 1 << 30* 否则使用tableSizeFor算出来* 后面table一直要扩容到这个值小于等于sizeCtrl(数组长度的3/4)才退出扩容*/int c = (size >= (MAXIMUM_CAPACITY >>> 1)) ? MAXIMUM_CAPACITY :tableSizeFor(size + (size >>> 1) + 1);int sc; //sizeCtl的副本,用于控制并发扩容while ((sc = sizeCtl) >= 0) {Node<K,V>[] tab = table; int n;// 当前哈希表长度/** 如果数组table还没有被初始化,则初始化一个大小为sizeCtrl和刚刚算出来的c中较大的一个大小的数组* 初始化的时候,设置sizeCtrl为-1,初始化完成之后把sizeCtrl设置为数组长度的3/4* 为什么要在扩张的地方来初始化数组呢?这是因为如果第一次put的时候不是put单个元素,* 而是调用putAll方法直接put一个map的话,在putALl方法中没有调用initTable方法去初始化table,* 而是直接调用了tryPresize方法,所以这里需要做一个是不是需要初始化table的判断*/if (tab == null || (n = tab.length) == 0) {//计算新容量,取sc和c中较大的一个n = (sc > c) ? sc : c;// 尝试将sizeCtl设为-1,标记正在进行初始化或扩容准备 所以我们一般可以用标识位来标记我们想做什么,相当于获取了这个权限if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {try {// 确保table没有被其他线程初始化 如果其他简称操作了那么就不相等了if (table == tab) {@SuppressWarnings("unchecked")Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];table = nt;// 设置sizeCtl为新容量的3/4,用于后续扩容控制sc = n - (n >>> 2);}} finally {sizeCtl = sc;}}}/** 一直扩容到的c小于等于sizeCtl或者数组长度大于最大长度的时候,则退出* 所以在一次扩容之后,不是原来长度的两倍,而是2的n次方倍*/else if (c <= sc || n >= MAXIMUM_CAPACITY) {break; //退出扩张}else if (tab == table) {int rs = resizeStamp(n); // 计算resize的stamp,用于并发控制if (sc < 0) { // 如果sizeCtl负值,说明已经有其他线程在进行扩容Node<K,V>[] nt;// 检查是否应该协助扩容或扩容已完成if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||transferIndex <= 0)break;/** transfer的线程数加一,该线程将进行transfer的帮忙* 在transfer的时候,sc表示在transfer工作的线程数*/if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))transfer(tab, nt);// 参与扩容操作}/** 没有在初始化或扩容,则开始扩容*/else if (U.compareAndSwapInt(this, SIZECTL, sc,(rs << RESIZE_STAMP_SHIFT) + 2)) {transfer(tab, null);// 开始扩容,nextTable为null表示新建}}}
}
/*** 把数组中的节点复制到新的数组的相同位置,或者移动到扩张部分的相同位置* 在这里首先会计算一个步长,表示一个线程处理的数组长度,用来控制对CPU的使用,* 每个CPU最少处理16个长度的数组元素,也就是说,如果一个数组的长度只有16,那只有一个线程会对其进行扩容的复制移动操作* 扩容的时候会一直遍历,直到复制完所有节点,每处理一个节点的时候会在链表的头部设置一个fwd节点,这样其他线程就会跳过他,* 复制后在新数组中的链表不是绝对的反序的*/
private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {// 获取旧表的长度,并计算每个CPU应处理的步长(stride)int n = tab.length, stride;// 如果CPU数量大于1,根据CPU数量和旧表长度计算每个线程处理的区间大小;否则,整个表由一个线程处理if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)stride = MIN_TRANSFER_STRIDE; // 如果计算出的步长太小,设置最小步长以确保有效并发// 初始化新表if (nextTab == null) { // 如果新表未初始化(表示当前线程是第一个参与扩容的线程)try {// 创建新表,长度为旧表的两倍@SuppressWarnings("unchecked") // 压制类型安全警告Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1]; nextTab = nt;} catch (Throwable ex) { // 处理可能的内存不足异常sizeCtl = Integer.MAX_VALUE; // 设置标志表示扩容失败return;}nextTable = nextTab; // 设置新表引用transferIndex = n; // 设置转移索引,开始于旧表的末尾}int nextn = nextTab.length; // 新表长度ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab); // 创建一个转发节点,用于新表完成前的临时访问boolean advance = true; // 是否继续处理下一个桶boolean finishing = false; // 标记是否已完成所有桶的处理// 主循环:遍历旧表并转移节点到新表for (int i = 0, bound = 0;;) {Node<K,V> f; int fh;// 决定是否推进到下一个桶while (advance) {int nextIndex, nextBound;// 达到边界或完成阶段则停止推进if (--i >= bound || finishing)advance = false;else if ((nextIndex = transferIndex) <= 0) {i = -1;advance = false;}// 尝试更新transferIndex,以便其他线程可以处理不同的部分else if (U.compareAndSwapInt(this, TRANSFERINDEX, nextIndex,nextBound = (nextIndex > stride ?nextIndex - stride : 0))) {bound = nextBound;i = nextIndex - 1;advance = false;}}// 检查是否越界或已完成所有桶的处理if (i < 0 || i >= n || i + n >= nextn) {int sc;// 如果完成阶段,完成转移并更新控制状态if (finishing) {nextTable = null;table = nextTab; // 更新表引用为新表sizeCtl = (n << 1) - (n >>> 1); // 重置sizeCtl为新容量的调整值return;}// 减少sizeCtl以表明有线程完成其工作部分,检查是否所有线程完成if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)return; // 如果不是所有线程都在尝试扩容,则退出finishing = advance = true; // 进入完成阶段并重新检查桶i = n; // 重新检查所有桶,确保没有遗漏}}// 处理当前桶else if ((f = tabAt(tab, i)) == null)advance = casTabAt(tab, i, null, fwd); // 置空的桶直接放入转发节点并推进else if ((fh = f.hash) == MOVED)advance = true; // 已经被处理过的桶,直接跳过else {// 对链表或树形节点进行同步处理和转移synchronized (f) {// 确保桶内容未被其他线程修改 if (tabAt(tab, i) == f) {// 分离链表或树为两个部分,根据hash值分配到新表的不同部分Node<K,V> ln, hn;//该节点的hash值大于等于0,说明是一个Node节点if (fh >= 0) { /** 因为n的值为数组的长度,且是power(2,x)的,所以,在&操作的结果只可能是0或者n* 根据这个规则* 0--> 放在新表的相同位置* n--> 放在新表的(n+原来位置)*/int runBit = fh & n; Node<K,V> lastRun = f;/** lastRun 表示的是需要复制的最后一个节点* 每当新节点的hash&n -> b 发生变化的时候,就把runBit设置为这个结果b* 这样for循环之后,runBit的值就是最后不变的hash&n的值* 而lastRun的值就是最后一次导致hash&n 发生变化的节点(假设为p节点)* 为什么要这么做呢?因为p节点后面的节点的hash&n 值跟p节点是一样的,* 所以在复制到新的table的时候,它肯定还是跟p节点在同一个位置* 在复制完p节点之后,p节点的next节点还是指向它原来的节点,就不需要进行复制了,自己就被带过去了* 这也就导致了一个问题就是复制后的链表的顺序并不一定是原来的倒序*/for (Node<K,V> p = f.next; p != null; p = p.next) {//n的值为扩张前的数组的长度int b = p.hash & n;if (b != runBit) {runBit = b;lastRun = p;}}if (runBit == 0) {ln = lastRun;hn = null;}else {hn = lastRun;ln = null;}/** 构造两个链表,顺序大部分和原来是反的* 分别放到原来的位置和新增加的长度的相同位置(i/n+i)*/for (Node<K,V> p = f; p != lastRun; p = p.next) {int ph = p.hash; K pk = p.key; V pv = p.val;if ((ph & n) == 0)/** 假设runBit的值为0,* 则第一次进入这个设置的时候相当于把旧的序列的最后一次发生hash变化的节点(该节点后面可能还有hash计算后同为0的节点)设置到旧的table的第一个hash计算后为0的节点下一个节点* 并且把自己返回,然后在下次进来的时候把它自己设置为后面节点的下一个节点*/ln = new Node<K,V>(ph, pk, pv, ln);else/** 假设runBit的值不为0,* 则第一次进入这个设置的时候相当于把旧的序列的最后一次发生hash变化的节点(该节点后面可能还有hash计算后同不为0的节点)设置到旧的table的第一个hash计算后不为0的节点下一个节点* 并且把自己返回,然后在下次进来的时候把它自己设置为后面节点的下一个节点*/hn = new Node<K,V>(ph, pk, pv, hn); }setTabAt(nextTab, i, ln); setTabAt(nextTab, i + n, hn);setTabAt(tab, i, fwd);advance = true;}else if (f instanceof TreeBin) { //否则的话是一个树节点TreeBin<K,V> t = (TreeBin<K,V>)f;TreeNode<K,V> lo = null, loTail = null;TreeNode<K,V> hi = null, hiTail = null;int lc = 0, hc = 0;for (Node<K,V> e = t.first; e != null; e = e.next) {int h = e.hash;TreeNode<K,V> p = new TreeNode<K,V>(h, e.key, e.val, null, null);if ((h & n) == 0) {if ((p.prev = loTail) == null)lo = p;elseloTail.next = p;loTail = p;++lc;}else {if ((p.prev = hiTail) == null)hi = p;elsehiTail.next = p;hiTail = p;++hc;}}/** 在复制完树节点之后,判断该节点处构成的树还有几个节点,* 如果≤6个的话,就转回为一个链表*/ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :(hc != 0) ? new TreeBin<K,V>(lo) : t;hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :(lc != 0) ? new TreeBin<K,V>(hi) : t;setTabAt(nextTab, i, ln);setTabAt(nextTab, i + n, hn);setTabAt(tab, i, fwd);advance = true;}}}}}
}
什么时候触发扩容?
treeIfybin主要是在put添加元素完之后,判断该数组节点相关元素是不是已经超过8个的时候,如果超过则会调用这个方法来扩容数组或者把链表转为树
helpTransfer是在当一个线程要对table中元素进行操作的时候,如果检测到节点的HASH值为MOVED的时候,就会调用helpTransfer方法,在helpTransfer中再调用transfer方法来帮助完成数组的扩容
addCount是在当对数组进行操作,使得数组中存储的元素个数发生了变化的时候会调用的方法。
引起扩容的结论如下:
只有在往map中添加元素的时候,在某一个节点的数目已经超过了8个,同时数组的长度又小于64的时候,才会触发数组的扩容
当数组中元素达到了sizeCtl的数量的时候,则会调用transfer方法来进行扩容
多线程如何做到同步处理
使用Synchronized和CAS
在数组扩容的时候,则通过处理的步长和fwd节点来达到并发安全的目的,通过设置hash值为MOVED,使其他线程执行helpTransfer
transfer方法和helpTransfer扩容方法是如何保证线程安全的?
transfer方法
- 目的
transfer方法负责将哈希表从旧数组迁移到新数组,这是在扩容时发生的核心操作。它首先会检查是否需要进行扩容(通过读取sizeCtl变量),然后创建一个新的、容量更大的数组nextTable。
- 线程协作
在扩容过程中,多个线程可能会同时尝试执行扩容操作。为了协调这些线程,transfer方法采用了“分段迁移”的策略,即将整个表分成多个小部分,每个线程负责一部分的迁移工作。这样可以减少线程间的竞争,提高效率。
- CAS操作
在转移过程中,使用了CAS操作来原子性地更新桶的状态,确保在多线程环境下操作的原子性和一致性。例如,当需要将桶中的节点迁移到新表时,会先尝试用CAS操作将桶标记为正在迁移状态。
helpTransfer方法
- 协助扩容
当一个线程尝试插入新元素时,如果发现正在进行扩容操作,它会调用helpTransfer方法来协助完成迁移工作,而不是等待扩容完成。这增加了并发度,使得扩容过程更加高效。
- 安全协作
helpTransfer会检查当前的扩容状态,如果发现有新的扩容任务尚未分配(即transferIndex还未达到新表的大小),它会尝试获取一部分任务并执行迁移工作。这一步同样使用了原子性的操作来避免冲突。
- 控制并发度
通过sizeCtl变量,ConcurrentHashMap能够控制参与扩容的线程数量,避免过多线程同时参与导致的竞争激烈。sizeCtl不仅用来表示扩容的命令,还用来控制并发帮助者的数量,确保资源的有效利用和线程安全。
有些源码点还是没太懂,也许后面应该好好debug理解一下