LinkedBlockingQueue、ConcurrentLinkedQueue和ArrayBlockingQueue深度解析
LinkedBlockingQueue
LinkedBlockingQueue
是 Java 并发包中一个非常重要的组件,它是一个基于链表节点实现的、可选容量的阻塞队列。
LinkedBlockingQueue
在并发编程中应用广泛,主要场景包括:
-
生产者 - 消费者模式:这是最经典的用法。生产者线程向队列中添加元素(生产数据),消费者线程从队列中取出元素(消费数据)。
LinkedBlockingQueue
提供了阻塞的put
和take
方法,能够有效地协调生产者和消费者的速度。当队列满时,生产者阻塞;当队列空时,消费者阻塞。 -
线程池的任务队列:
ThreadPoolExecutor
构造时可以接收一个BlockingQueue
作为任务等待队列。LinkedBlockingQueue
由于其可伸缩性(默认容量为Integer.MAX_VALUE
)和较好的吞吐量,常被用作线程池的任务队列。例如,Executors.newFixedThreadPool()
内部就可能使用LinkedBlockingQueue
。 -
数据缓冲:在不同处理速率的模块之间传递数据时,
LinkedBlockingQueue
可以作为缓冲区,平滑数据流,防止快速生产者压垮慢速消费者,或慢速生产者饿死快速消费者。 -
解耦:通过队列,可以将消息的发送方和接收方解耦,发送方不需要知道接收方的存在,也不需要等待接收方处理完成。
方法结构
LinkedBlockingQueue
继承自 AbstractQueue
并实现了 BlockingQueue
接口。其方法可以大致分为以下几类:
添加元素(生产者操作)
-
put(E e)
: 将元素插入队列尾部。如果队列已满(对于有界队列),则阻塞当前线程直到队列有空间。会抛出InterruptedException
。 -
offer(E e, long timeout, TimeUnit unit)
: 将元素插入队列尾部。如果队列已满,则阻塞当前线程,直到队列有空间或超时。返回true
表示成功,false
表示超时。会抛出InterruptedException
。 -
offer(E e)
: 将元素插入队列尾部。如果队列已满(对于有界队列),立即返回false
而不阻塞。如果队列未满,则插入成功返回true
。这是非阻塞的。 -
add(E e)
: 继承自Collection
。对于LinkedBlockingQueue
,如果队列已满(有界情况),此方法会抛出IllegalStateException
。通常推荐使用offer
或put
。
移除元素(消费者操作)
-
take()
: 从队列头部移除并返回元素。如果队列为空,则阻塞当前线程直到队列中有元素。会抛出InterruptedException
。 -
poll(long timeout, TimeUnit unit)
: 从队列头部移除并返回元素。如果队列为空,则阻塞当前线程,直到队列中有元素或超时。返回元素或null
(超时)。会抛出InterruptedException
。 -
poll()
: 从队列头部移除并返回元素。如果队列为空,立即返回null
而不阻塞。这是非阻塞的。 -
remove(Object o)
: 从队列中移除指定元素的单个实例(如果存在)。需要获取两把锁,效率较低。
检查元素(不移除)
-
peek()
: 返回队列头部的元素,但不移除。如果队列为空,返回null
。 -
element()
: 继承自Queue
。返回队列头部的元素,但不移除。如果队列为空,抛出NoSuchElementException
。
批量操作
-
drainTo(Collection<? super E> c)
: 移除此队列中所有可用的元素,并将它们添加到给定集合中。 -
drainTo(Collection<? super E> c, int maxElements)
: 最多从此队列中移除maxElements
个可用元素,并将它们添加到给定集合中。
其他工具方法
-
size()
: 返回队列中的元素数量。由于并发特性,返回的是一个近似值,因为在调用此方法时,其他线程可能正在添加或删除元素。 -
remainingCapacity()
: 返回队列剩余可用容量。对于无界队列(容量为Integer.MAX_VALUE
),此方法总是返回Integer.MAX_VALUE
减去当前size()
。 -
contains(Object o)
: 判断队列是否包含指定元素。需要获取两把锁。 -
toArray()
: 将队列元素转换为数组。 -
clear()
: 清空队列。需要获取两把锁。 -
iterator()
: 返回一个迭代器。迭代器是“弱一致性”的,可能不会反映迭代器创建后的修改。
主要和难点方法
LinkedBlockingQueue
的核心设计在于其高效的并发控制,主要通过以下机制实现:
内部节点 Node<E>
static class Node<E> { E item; /** * One of: * - the real successor Node * - this Node, meaning the successor is head.next * - null, meaning there is no successor (this is the last node) */ Node<E> next; Node(E x) { item = x; }
}
这是一个典型的链表节点结构,包含元素 item
和指向下一个节点的 next
指针。
两把锁(Two Lock Queue)算法
-
putLock (ReentrantLock)
: 控制所有添加操作(put
,offer
)。 -
takeLock (ReentrantLock)
: 控制所有移除操作(take
,poll
)。
这种分离使得生产者和消费者可以并行操作,只要队列既不空也不满,从而提高吞吐量。
两个条件变量(Condition
)
-
notEmpty
: 与takeLock
关联。当队列为空时,消费者线程在此条件上等待 (notEmpty.await()
)。当生产者添加元素后,会唤醒等待的消费者 (notEmpty.signal()
)。 -
notFull
: 与putLock
关联。当队列已满时(对于有界队列),生产者线程在此条件上等待 (notFull.await()
)。当消费者移除元素后,会唤醒等待的生产者 (notFull.signal()
)。
原子计数器 count (AtomicInteger)
用于记录队列中元素的数量。使用 AtomicInteger
可以在不获取锁的情况下更新计数,减少锁竞争。put
和 take
操作在各自的锁保护下更新 count
,但检查队列是否满/空时,可以在锁外读取 count.get()
(这依赖于锁的内存可见性保证以及 AtomicInteger
本身的特性)。
head
和 last
指针
-
head
: 指向链表的头节点。head
节点本身不存储实际元素 (head.item == null
),它是一个哨兵节点或哑节点。真正的第一个元素是head.next
。 -
last
: 指向链表的尾节点。新元素总是添加到last.next
,然后last
指针后移。
初始化时:last = head = new Node<E>(null);
信令机制 signalNotEmpty()
和 signalNotFull()
-
signalNotEmpty()
: 在put
或offer
成功添加元素后,如果添加前队列为空 (c == 0
),则调用此方法。它会获取takeLock
,然后调用notEmpty.signal()
来唤醒一个可能正在等待的消费者线程。 -
signalNotFull()
: 在take
或poll
成功移除元素后,如果移除前队列已满 (c == capacity
),则调用此方法。它会获取putLock
,然后调用notFull.signal()
来唤醒一个可能正在等待的生产者线程。
级联唤醒 (Cascading Notify)
在 put
方法中,如果 c + 1 < capacity
(即添加后队列仍有空间),会调用 notFull.signal()
。类似地,在 take
方法中,如果 c > 1
(即取出后队列仍有元素),会调用 notEmpty.signal()
。这种设计是为了在某些情况下,一个生产者可以唤醒另一个等待的生产者(如果队列仍未满),一个消费者可以唤醒另一个等待的消费者(如果队列仍未空),从而提高并发性能,避免了“惊群效应”并减少了不必要的上下文切换。
fullyLock()
和 fullyUnlock()
某些操作如 remove(Object)
, contains(Object)
, toArray()
, clear()
需要遍历整个队列或修改队列的全局状态,这些操作需要同时获取 putLock
和 takeLock
以确保独占访问。
入队 enqueue(Node<E> node)
和出队 dequeue()
方法
-
enqueue(Node<E> node)
:
// ...
private void enqueue(Node<E> node) { // assert putLock.isHeldByCurrentThread(); // assert last.next == null; last = last.next = node;
}
// ...
此方法在 putLock
的保护下执行。它将新节点链接到当前 last
节点的 next
,然后更新 last
指向新节点。
-
dequeue()
:
// ...
private E dequeue() { // assert takeLock.isHeldByCurrentThread(); // assert head.item == null; Node<E> h = head; Node<E> first = h.next; h.next = h; // help GC head = first; E x = first.item; first.item = null; return x;
}
// ...
此方法在 takeLock
的保护下执行。
-
h
指向当前的哑head
节点。 -
first
指向第一个实际元素节点 (h.next
)。 -
h.next = h;
这是一个帮助 GC 的技巧。将旧的head
节点指向自身,意味着它不再指向队列中的任何有效节点,如果迭代器不持有对旧head
的引用,它可以被回收。这也用于弱一致性迭代器,当迭代器发现前驱节点自指时,知道需要跳到新的head.next
。 -
新的
head
更新为first
(原来的第一个元素节点现在成为新的哑head
)。 -
取出元素
x = first.item
。 -
first.item = null;
将新的哑head
节点的item
置空,符合head.item == null
的不变性。
put(E e)
方法详解
中断处理的责任被传递给了 LinkedBlockingQueue 的使用者。当调用 linkedBlockingQueue.put(element) 时,需要准备处理可能抛出的 InterruptedException。
// ...
public void put(E e) throws InterruptedException { if (e == null) throw new NullPointerException(); final int c; final Node<E> node = new Node<E>(e); final ReentrantLock putLock = this.putLock; final AtomicInteger count = this.count; putLock.lockInterruptibly(); // 响应中断地获取putLock try { while (count.get() == capacity) { // 检查队列是否已满 notFull.await(); // 如果已满,在notFull条件上等待 } enqueue(node); // 入队 c = count.getAndIncrement(); // 原子地增加计数,并获取增加前的值 if (c + 1 < capacity) // 如果队列在添加后仍有空间 notFull.signal(); // 唤醒其他可能在等待的生产者 (级联唤醒) } finally { putLock.unlock(); // 释放putLock } if (c == 0) // 如果是在空队列中添加的第一个元素 signalNotEmpty(); // 唤醒等待的消费者
}
// ...
take()
方法详解
只有空的时候 head == last ,因此take是不会和put冲突的
// ...
public E take() throws InterruptedException { final E x; final int c; final AtomicInteger count = this.count; final ReentrantLock takeLock = this.takeLock; takeLock.lockInterruptibly(); // 响应中断地获取takeLock try { while (count.get() == 0) { // 检查队列是否为空 notEmpty.await(); // 如果为空,在notEmpty条件上等待 } x = dequeue(); // 出队 c = count.getAndDecrement(); // 原子地减少计数,并获取减少前的值 if (c > 1) // 如果队列在取出后仍有元素 notEmpty.signal(); // 唤醒其他可能在等待的消费者 (级联唤醒) } finally { takeLock.unlock(); // 释放takeLock } if (c == capacity) // 如果队列在取出前是满的 signalNotFull(); // 唤醒等待的生产者 return x;
}
// ...
深入分析 ConcurrentLinkedQueue
java.util.concurrent.ConcurrentLinkedQueue
是并发编程中非常实用的无界线程安全队列。
ConcurrentLinkedQueue
是基于链接节点的、无界的、线程安全队列,核心特性是非阻塞,其大部分操作(如 offer
、poll
、peek
)通过 CAS (Compare-And-Set) 原子指令完成,不会导致线程挂起等待锁。常见用法如下:
-
高并发生产者 - 消费者场景:当有大量线程并发生产和消费数据,且不希望因锁竞争导致性能瓶颈时,
ConcurrentLinkedQueue
是不错的选择。由于其无界性,生产者通常不会被阻塞(除非内存耗尽)。 -
任务调度与事件处理:可作为任务队列或事件队列,多个工作线程可从中拉取任务/事件进行处理。
-
数据收集与传递:在需要将多个线程的数据汇总到一个共享队列,然后由一个或多个线程处理的场景中使用。
-
替代需要高吞吐量的
LinkedBlockingQueue
:如果对队列容量没有限制要求,且更看重吞吐量和避免线程阻塞,ConcurrentLinkedQueue
通常比LinkedBlockingQueue
(即使是无界配置)有更好的性能,因为它避免了锁的开销。
与 LinkedBlockingQueue
的关键区别
-
阻塞 vs. 非阻塞:
LinkedBlockingQueue
是阻塞队列,当队列满时put
会阻塞,队列空时take
会阻塞。ConcurrentLinkedQueue
的offer
总是成功(返回true
,除非传入null
元素),poll
在队列为空时返回null
而不阻塞。 -
有界 vs. 无界:
LinkedBlockingQueue
可以是有界的,而ConcurrentLinkedQueue
总是无界的。 -
size()
方法 :LinkedBlockingQueue
的size()
通常是 O(1)(通过AtomicInteger
实现)。ConcurrentLinkedQueue
的size()
是 O(N) 操作,需要遍历队列,且结果可能不精确,不推荐频繁调用。
方法结构
ConcurrentLinkedQueue
继承自 AbstractQueue<E>
并实现了 Queue<E>
接口,其主要方法可分为:
添加元素
-
offer(E e)
: 将指定元素插入此队列的尾部。由于队列是无界的,此方法从不返回false
,是核心的入队操作,非阻塞。 -
add(E e)
: 行为与offer(E e)
相同。
移除元素
-
poll()
: 检索并移除此队列的头,如果此队列为空,则返回null
,是核心的出队操作,非阻塞。 -
remove(Object o)
: 从队列中移除指定元素的单个实例(如果存在)。此操作相对复杂,因为需要在无锁的情况下安全地移除内部节点。
检查元素(不移除)
-
peek()
: 检索但不移除此队列的头,如果此队列为空,则返回null
,非阻塞。 -
element()
: 检索但不移除此队列的头。此方法与peek
的不同之处在于,如果此队列为空,它会抛出NoSuchElementException
。
其他方法
-
isEmpty()
: 如果此队列不包含任何元素,则返回true
,通过调用first() == null
实现。 -
size()
: 返回此队列中的元素数,是一个 O(N) 操作,需要遍历队列,且由于队列的异步特性,结果可能不准确。 -
contains(Object o)
: 如果此队列包含指定元素,则返回true
,也需要遍历。 -
iterator()
: 返回在此队列元素上进行迭代的迭代器。迭代器是“弱一致性”的,可能不会反映迭代器创建后的修改,并且不会抛出ConcurrentModificationException
。
主要和难点算法与方法 (源码级别分析)
ConcurrentLinkedQueue 的核心在于其高效的无锁(lock-free)算法,基于 Maged M. Michael 和 Michael L. Scott 的论文 "Simple, Fast, and Practical Non-Blocking and Blocking Concurrent Queue Algorithms"。它主要依赖 CAS (Compare-And-Set) 原子操作来实现线程安全。
内部节点 Node<E>
// ...static final class Node<E> {volatile E item;volatile Node<E> next;/*** Constructs a node holding item. Uses relaxed write because* item can only be seen after piggy-backing publication via CAS.*/Node(E item) {ITEM.set(this, item);}/** Constructs a dead dummy node. */Node() {}void appendRelaxed(Node<E> next) {// assert next != null;// assert this.next == null;NEXT.set(this, next);}boolean casItem(E cmp, E val) {// assert item == cmp || item == null;// assert cmp != null;// assert val == null;return ITEM.compareAndSet(this, cmp, val);}}
// ...// VarHandle mechanicsprivate static final VarHandle HEAD;private static final VarHandle TAIL;static final VarHandle ITEM; // Node.itemstatic final VarHandle NEXT; // Node.nextstatic {try {MethodHandles.Lookup l = MethodHandles.lookup();HEAD = l.findVarHandle(ConcurrentLinkedQueue.class, "head", Node.class);TAIL = l.findVarHandle(ConcurrentLinkedQueue.class, "tail", Node.class);ITEM = l.findVarHandle(Node.class, "item", Object.class);NEXT = l.findVarHandle(Node.class, "next", Node.class);} catch (ReflectiveOperationException e) {throw new Error(e);}}
// ...
-
item
和next
字段:-
item
和next
字段都是volatile
的,确保了多线程之间的可见性,并防止指令重排序。
-
-
构造函数
Node(E item)
:-
构造函数
Node(E item)
使用ITEM.set(this, item)
(这是一个VarHandle
的普通写操作,在 Java 9 之前可能是Unsafe.putObject
等)。注释中提到 "relaxed write" 是因为item
的可见性最终会通过后续的 CAS 操作(例如将此节点链接到队列中)来保证。
-
-
casItem(E cmp, E val)
方法:-
casItem(E cmp, E val)
使用ITEM.compareAndSet
(通过VarHandle
)原子地更新item
字段。这在poll()
操作中用于将节点标记为已删除(将其item
置为null
)。
-
-
VarHandle
的使用:-
VarHandle
(自 Java 9 起)用于对字段进行原子操作和内存排序控制,取代了之前版本中对sun.misc.Unsafe
的依赖。HEAD
、TAIL
、Node.ITEM
、Node.NEXT
都是通过VarHandle
来访问和修改的。
-
head
和 tail
指针:
// ...transient volatile Node<E> head;
// ...private transient volatile Node<E> tail;
// ...public ConcurrentLinkedQueue() {head = tail = new Node<E>(); // 初始化时,head和tail指向同一个哑节点 (item为null)}
// ...
1. 节点引用
-
head
和tail
都是volatile
的Node<E>
引用,它们都指向链表中的节点。初始化时,head
和tail
指向一个哑节点(dummy node),其item
字段为null
。这个哑节点作为链表的起始标记。
2. 延迟更新 (Lazy Updates)
-
head
和tail
指针并不会在每次入队或出队操作后都立即更新到最新的头或尾节点。这是一种优化策略,称为“松弛更新”或“延迟更新”,目的是减少 CAS 操作的次数,因为 CAS 操作相对昂贵。 -
通常,当指针(如内部遍历指针)发现
head
或tail
落后超过一定步数(例如两步)时,才会尝试更新它们。 -
由于并发和延迟更新,
tail
甚至可能落后于head
,或者指向一个已经从逻辑上被移除的节点。算法的鲁棒性保证了即使在这种情况下,操作也能正确进行。
入队操作 offer(E e)
-
offer
方法的目标是将新节点newNode
添加到链表的末尾。
// ...public boolean offer(E e) {final Node<E> newNode = new Node<E>(Objects.requireNonNull(e)); // 1. 创建新节点for (Node<E> t = tail, p = t;;) { // 2. 无限循环,t是当前线程认为的尾节点,p是当前操作节点,初始为tNode<E> q = p.next; // 3. q是p的下一个节点if (q == null) { // 4. 如果q为null,说明p是当前链表的最后一个节点// p is last nodeif (NEXT.compareAndSet(p, null, newNode)) { // 5. CAS尝试将newNode设置为p的next节点// Successful CAS is the linearization point// for e to become an element of this queue,// and for newNode to become "live".// 6. CAS成功,newNode已入队if (p != t) // 7. 如果p不等于t (说明tail指针t落后了,或者p在循环中前进了)TAIL.weakCompareAndSet(this, t, newNode); // 8. 尝试(弱)更新全局tail指向newNodereturn true; // 9. 入队成功}// 10. CAS失败,说明其他线程修改了p.next,循环继续,重新读取p.next}else if (p == q) { // 11. 如果p == q (即 p.next == p),说明p是一个“哨兵”节点或已失效节点// We have fallen off list. If tail is unchanged, it// will also be off-list, in which case we need to// jump to head, from which all live nodes are always// reachable. Else the new tail is a better bet.// 12. p需要重新定位: 如果全局tail (t = tail) 没变,则p从head开始;否则p从新的tail开始p = (t != (t = tail)) ? t : head;}else { // 13. p不是尾节点,也不是失效节点 (q != null && p != q)// Check for tail updates after two hops.// 14. 推进p: 如果p不是旧tail (p!=t) 且全局tail已更新 (t!=(t=tail)), 则p跳到新tail; 否则p跳到q (p.next)p = (p != t && t != (t = tail)) ? t : q;}}}
// ...
Offer 逻辑详解
1. 创建新节点
创建一个包含元素 e
的新节点 newNode
,不允许 null
元素。
2. 进入无限循环
-
t
保存当前线程最初看到的tail
值。 -
p
是当前迭代的指针,初始也为t
。 -
q
保存p.next
。
3. 不同情况处理
情况 A:p 是尾节点 (q == null
)
-
通过
NEXT.compareAndSet(p, null, newNode)
尝试原子地将p
的next
指针从null
更新为newNode
。 -
线性化点:如果这个 CAS 成功,那么
newNode
就正式成为队列的一部分,这是offer
操作的线性化点。 -
(延迟更新
tail
):如果 CAS 成功,并且p
不是当前线程最初认为的tail
(t
)(这意味着t
可能已经落后,或者p
在循环中前进了),则尝试使用TAIL.weakCompareAndSet(this, t, newNode)
来更新全局的tail
指针指向newNode
。weakCompareAndSet
是一种可能发生“伪失败”(即变量值未改变但返回false
)的 CAS,但在这里是可接受的,因为tail
的更新本身就是一种优化,即使失败,队列的正确性也由p.next
的 CAS 保证。【在 Java 的 VarHandle 实现中,weakCompareAndSet 通常等同于 compareAndSet,但它暗示了这是一个优化尝试】 -
返回
true
,表示成功。如果 CAS 失败,说明在当前线程尝试更新p.next
时,有其他线程已经修改了它(例如,另一个offer
操作)。循环会继续,重新读取p.next
并重试。
t 是进入 offer 方法时 tail 的一个快照。p 是在循环中不断尝试定位到链表实际尾部的指针。当 NEXT.compareAndSet(p, null, newNode) 成功时,p 是新节点的前驱。
如果 p != t,说明从当前线程开始执行 offer 操作到成功插入新节点期间,链表的实际尾部已经不是最初 t 指向的位置了。在这种情况下,尝试将全局 tail 指针向前移动到新插入的 newNode 是一个合理的优化,以减少后续操作遍历链表的开销。
情况 B:p 是失效节点 (p == q
,即 p.next == p
)
-
失效标记:当一个节点(通常是旧的
head
)被poll
操作移除后,它的next
指针会被设置为指向自身(见updateHead
方法),这作为一个标记,表示该节点已不在队列的有效路径上。 -
如果
p
是这样的失效节点,当前线程的p
指针需要重新定位。-
t != (t = tail)
:这个表达式首先读取全局tail
并赋值给局部变量t
,然后比较这个新的t
和循环开始时保存的t
。如果它们不同,说明全局tail
在此期间被其他线程更新了。 -
如果全局
tail
已经被更新,那么将p
设置为这个新的tail
(t
) 是一个较好的选择,因为它可能更接近实际的链表尾部。 -
如果全局
tail
没有改变(或者p
本身就是从旧tail
开始的,并且旧tail
已经失效),那么最安全的选择是将p
设置为head
,因为所有有效的活动节点都保证可以从head
到达。
-
情况 C:p 是中间节点 (q != null && p != q
)
-
p
不是尾节点,也不是失效节点,意味着p
后面还有其他节点。 -
(推进
p
并尝试跳到新tail
):-
p = (p != t && t != (t = tail)) ? t : q;
-
这个逻辑是为了让
p
更快地接近链表尾部。 -
如果
p
不是当前线程最初看到的tail
(p != t
),并且全局tail
在此期间被其他线程更新了 (t != (t = tail)
,这里t
会先被赋予新的全局tail
值),那么将p
直接设置为这个新的tail
(t
)。这是一种“跳跃”优化。 -
否则(即
p
就是最初的tail
,或者全局tail
没有被更新),则正常地将p
推进到其后继节点q
(p = p.next
)。这种“两步跳跃”的判断(p != t
且tail
已更新)是tail
延迟更新策略的一部分。
-
// Check for tail updates after two hops.
p = (p != t && t != (t = tail)) ? t : q;
为什么是 "two hops"?
在 offer
方法的循环开始时:for (Node<E> t = tail, p = t;;)
这里 p
初始化为 t
(当前线程看到的 tail
)。
第一次循环
q = p.next;
如果 q
不为 null
且 p != q
,则会执行到我们讨论的这行代码。此时 p
仍然等于初始的 t
。所以 p != t
为 false
。因此,p
会被赋值为 q
。这是第一次跳跃 (p
从初始的 t
跳到了 t.next
)。
第二次循环
p
现在是上一次的 q
(即初始 t
的 next
节点)。
q = p.next;
如果 q
不为 null
且 p != q
,再次执行到这行代码。此时 p
(等于初始 t.next
) 通常不等于初始的 t
。所以 p != t
(这里的 t
仍然是循环开始时或者上一次条件判断中更新的 tail
) 可能为 true
。
这时,t != (t = tail)
这个条件就变得关键了。如果全局 tail
在这两次跳跃期间被更新了,并且新的 tail
比当前 p
更靠前,那么 p
就会直接跳到新的 tail
。
所以,"after two hops" 指的是在 p
从它最初观察到的 tail
(即变量 t
的初始值) 出发,经过了至少一次 p = q
的赋值(完成了一次跳跃)之后,在下一次迭代中,当 p
准备进行第二次跳跃到 q
之前,会执行这个检查。如果检查通过,p
可能会直接跳到最新的 tail
,而不是 q
。
这种“两步一回头”或者说“两步一更新”的策略,是在完全不更新 tail
(可能导致遍历很多已经不再是实际尾部的节点) 和每次都尝试读取 volatile 的 tail 变量之间的一种折中。它允许 tail
指针有一定的“延迟”,但又不会延迟太多,以保持较好的性能。
出队操作 poll()
poll
方法的目标是移除并返回队列头部的元素。
// ...public E poll() {restartFromHead: for (;;) { // 1. 外部无限循环,用于处理p==q(失效节点)的情况for (Node<E> h = head, p = h, q;; p = q) { // 2. 内部无限循环,h是当前线程认为的头,p是当前操作节点final E item;if ((item = p.item) != null && p.casItem(item, null)) { // 3. 如果p.item存在,并且CAS成功将其置为null// Successful CAS is the linearization point// for item to be removed from this queue.// 4. CAS成功,item已从逻辑上移除if (p != h) // 5. 如果p不是h (说明head指针h落后了)updateHead(h, ((q = p.next) != null) ? q : p); // 6. 更新全局headreturn item; // 7. 返回取出的item}else if ((q = p.next) == null) { // 8. p.item为null (p是哑节点或已被删除),且p是尾节点 (q==null)updateHead(h, p); // 9. 更新head到p (队列可能为空,p是最后的哑节点)return null; // 10. 队列为空}else if (p == q) // 11. p.next == p,p是失效节点continue restartFromHead; // 12. 跳到外部循环,从新的全局head重试// 13. else: p.item为null,但p不是尾节点也不是失效节点 (p是已删除的中间节点)// 隐式地 p = q; (在循环的 p = q 处执行) 继续向后查找}}}
// ...
Poll 逻辑详解
1. restartFromHead
标签
-
restartFromHead
是一个标签,当检测到当前遍历指针p
指向一个失效节点(p.next == p
)时,会执行continue restartFromHead
,从当前的全局head
重新开始遍历。
2. 内部循环
-
h
:保存当前线程最初看到的head
值。 -
p
:当前迭代的指针,初始值也为h
。 -
q
:用于保存p.next
。
3. 情况 A:找到有效元素并成功移除
-
item = p.item
:读取p
节点的元素。 -
如果
item
不为null
(说明p
是一个包含有效数据的节点),则尝试通过p.casItem(item, null)
原子地将其item
字段设置为null
。 -
线性化点:如果这个 CAS 成功,元素
item
就从逻辑上被移除了队列,这是poll
操作的线性化点。 -
第 5 - 6 步(延迟更新
head
):如果 CAS 成功,并且p
不是当前线程最初认为的head
(h
)(这意味着h
可能已经落后),则调用updateHead(h, newHead)
。-
newHead
的选择:-
如果
p.next
(即q
,在updateHead
调用前会重新读取p.next
赋值给q
)不为null
,则新的head
应该是q
(即p
的后继节点,它将成为新的哑头节点或第一个有效节点)。 -
如果
p.next
为null
(意味着p
是最后一个节点,现在它的元素也被取走了),则新的head
应该是p
自身(它现在是一个item
为null
的节点,可以作为新的哑头节点)。
-
-
updateHead
内部会 CAS 更新全局HEAD
,并将旧head
(h
)的next
指向自身(h.next = h
),标记为失效。
-
-
第 7 步:返回取出的元素
item
。
4. 情况 B:队列为空或遍历到末尾
-
此条件在
p.item
为null
(即p
是一个哑节点,或者其元素已被其他线程移除)之后检查。 -
如果
p.next
(即q
)为null
,说明p
是链表的最后一个节点。由于p.item
此时为null
,这意味着队列为空(如果p
是初始的哑head
且没有后继),或者遍历到了链表末端的哑节点。 -
第 9 步:调用
updateHead(h, p)
尝试更新全局head
到p
(如果h
落后于p
)。 -
第 10 步:返回
null
,表示队列为空。
5. 情况 C:p
是失效节点
-
如果
p.next
指向p
自身,说明p
是一个已被其他poll
操作处理并标记为失效的旧head
节点。 -
第 12 步:
continue restartFromHead
,跳到外层循环,从当前的全局head
重新开始遍历,因为当前路径可能已经无效。
6. 情况 D:p
是已删除的中间节点
-
这意味着
p.item
为null
,但p
不是尾节点(p.next != null
),也不是失效节点(p.next != p
)。 -
这说明
p
是一个已经被其他线程逻辑删除(其item
被置为null
)但尚未从链中断开(物理删除)的中间节点。 -
此时,循环的
p = q
(即p = p.next
)会使p
指向下一个节点,继续向后遍历查找第一个包含有效元素的节点。
peek()
方法
peek()
的逻辑与 poll()
非常相似,主要的区别在于它不会通过 casItem(item, null)
来将节点的 item
置为 null
,即它只读取头元素而不移除它。
// ...public E peek() {restartFromHead: for (;;) {for (Node<E> h = head, p = h, q;; p = q) {final E item;if ((item = p.item) != null // 找到一个item不为null的节点|| (q = p.next) == null) { // 或者 p.next 为null (p是尾节点,item可能为null也可能不为null)updateHead(h, p); // 尝试更新headreturn item; // 返回item (如果队列空且p是尾哑节点,item会是null)}else if (p == q) // p是失效节点continue restartFromHead;// else: p.item为null,但p不是尾节点也不是失效节点,继续向后 (p=q)}}}
// ...
peek() 同样包含 restartFromHead 逻辑来处理失效节点,以及 updateHead 来帮助推进 head 指针。当找到第一个 p.item != null 的节点,或者遍历到链表尾部 (p.next == null) 时,它会尝试更新 head 并返回该节点的 item(如果队列为空且 p 是尾部的哑节点,则 item 将为 null)。
内部辅助方法说明
updateHead(Node<E> h, Node<E> p)
和 succ(Node<E> p)
是内部辅助方法,对理解队列如何处理 head
的推进和失效节点至关重要。
updateHead(Node<E> h, Node<E> p)
方法
// ... existing code ...
final void updateHead(Node<E> h, Node<E> p) { // assert h != null && p != null && (h == p || h.item == null); if (h != p && HEAD.compareAndSet(this, h, p)) { // 1. CAS更新全局HEAD从h到p NEXT.setRelease(h, h); // 2. 将旧head (h) 的next指向自身,标记为失效 }
}
// ... existing code ...
该方法逻辑如下:
如果当前线程看到的旧 head
(h
) 与期望的新 head
(p
) 不同,并且通过 CAS 成功将全局 HEAD
引用从 h
更新到 p
,则将旧 head
(h
) 的 next
字段通过 NEXT.setRelease(h, h)
(这是一个具有释放屏障的写操作,确保之前的写对其他线程可见)设置为指向 h
自身。
标记失效:这使得 h
成为一个自指节点。其他线程在遍历时(例如通过 succ
方法)如果遇到 node.next == node
的情况,就知道这个 node
是一个过时的、已被移除的头节点,需要从当前的全局 head
重新开始查找。
帮助 GC:断开旧 head
(h
) 对队列中后续节点的强引用。如果 h
没有被其他地方(如迭代器)引用,它就可以被垃圾回收。
succ(Node<E> p)
方法
// ... existing code ...
final Node<E> succ(Node<E> p) { if (p == (p = p.next)) { // 1. 如果 p.next 就是 p 自己 (即 p 是一个自指的失效节点) p = head; // 2. 则从当前的全局 head 重新获取后继 }return p; // 返回真正的后继节点或新的 head
}
// ... existing code ...
p = p.next
会先执行,将 p
更新为其原始的 next
节点。
然后 p == (新的p)
实际上是在比较原始的 p
和原始的 p.next
。如果它们相等,说明原始的 p
的 next
指针指向它自己。如果节点 p
是自指的,那么它是一个失效的旧头节点。此时,应该从当前的全局 head
重新获取后继节点,因为从失效节点继续遍历是没有意义的。
这个方法主要被迭代器和某些内部遍历逻辑(如 remove(Object)
)使用,以确保它们能够正确地跳过已经被 poll
操作移除并标记为失效的旧头节点。
总结
ConcurrentLinkedQueue
是一个高度优化的无锁并发队列。其核心是基于 CAS 的原子操作和 Michael & Scott 算法的变体。关键特性和难点包括:
-
无锁设计:通过
VarHandle
(或旧版的Unsafe
) 进行 CAS 操作,避免了传统锁带来的开销和线程阻塞。 -
哑节点:
head
和tail
通常指向item
为null
的哑节点,简化了边界条件处理。 -
延迟更新
head
和tail
:这是一种重要的性能优化,减少了对head
和tail
的 CAS 操作频率。 -
p.next == p
作为失效标记:当头节点出队后,其next
指针会指向自身,作为后续遍历判断其已失效的依据。 -
offer
和poll
中的复杂循环和条件判断:这些是为了在并发环境下正确地找到链表的尾部(用于offer
)或头部(用于poll
),并处理其他线程可能同时进行的修改、head
/tail
指针的滞后以及失效节点等情况。 -
size()
的 O(N) 复杂度:由于没有集中的计数器,获取大小需要遍历,效率较低且结果不精确。
理解 ConcurrentLinkedQueue
的源码需要对 Java 内存模型、CAS 操作以及无锁数据结构的设计思想有较好的掌握。它是学习高级并发编程的一个优秀案例。
ArrayBlockingQueue
ArrayBlockingQueue 相较于 LinkedBlockingQueue 而言,在并发控制上确实采用了更简单的单锁机制。它使用一个 ReentrantLock 来保护对整个队列状态(包括数组、计数器、以及 putIndex 和 takeIndex)的访问。
然而,尽管其核心并发模型相对直接,ArrayBlockingQueue 仍然有一些值得注意的细节和潜在的复杂点,可能不像表面看起来那么简单:
公平性策略 (Fairness Policy)
ArrayBlockingQueue 的构造函数允许指定一个 fair 参数。
// ... existing code ...
public ArrayBlockingQueue(int capacity, boolean fair) { if (capacity <= 0) throw new IllegalArgumentException(); this.items = new Object[capacity]; lock = new ReentrantLock(fair); // 注意这里的 fair 参数 notEmpty = lock.newCondition(); notFull = lock.newCondition();
}
// ... existing code ...
如果 fair 为 true,ReentrantLock 会使用公平策略,这意味着等待时间最长的线程将优先获得锁。这有助于防止线程饥饿,但通常会降低整体吞吐量,因为需要额外的簿记来维护等待线程的顺序。
如果 fair 为 false(默认),则锁的获取顺序是不确定的(非公平策略),这可能导致更高的吞吐量,但也可能导致某些线程长时间等待。
这个公平性选择会直接影响到生产者和消费者线程在队列满或空时等待锁的行为,是一个重要的性能和行为权衡点。
removeAt(int removeIndex)
当从队列中间移除一个元素时(例如通过迭代器的 remove() 方法,或者 remove(Object) 找到元素后),ArrayBlockingQueue 需要移动数组中的元素来填补空缺,以保持队列的紧凑性。
// ... existing code ...
void removeAt(final int removeIndex) { // assert lock.isHeldByCurrentThread(); // assert lock.getHoldCount() == 1; // assert items[removeIndex] != null; // assert removeIndex >= 0 && removeIndex < items.length; final Object[] items = this.items; if (removeIndex == takeIndex) { // 如果移除的是头部元素 // removing front item; just advance items[takeIndex] = null; if (++takeIndex == items.length) takeIndex = 0; count--; if (itrs != null) itrs.elementDequeued(); } else { // 如果移除的是内部元素 // an "interior" remove // slide over all others up through putIndex. // 将 removeIndex 之后的元素向前移动 for (int i = removeIndex, putIndex = this.putIndex;;) { int pred = i; if (++i == items.length) i = 0; // 处理数组环绕 if (i == putIndex) { // 到达了 putIndex 的位置 items[pred] = null; this.putIndex = pred; // 更新 putIndex break; } items[pred] = items[i]; // 移动元素 } count--; if (itrs != null) itrs.removedAt(removeIndex); } notFull.signal();
}
// ... existing code ...
这个移动操作涉及到对 putIndex 的调整,并且需要正确处理数组的环形结构(当索引到达数组末尾时回到开头)。这个逻辑比简单的头部出队或尾部入队要复杂。
迭代器 (Itrs 类和弱一致性)
ArrayBlockingQueue 维护了一个内部类 Itrs 来管理活动的迭代器。
当队列元素发生变化时(如 dequeue 或 removeAt),需要通知这些迭代器,以便它们能够尝试保持某种程度的一致性(尽管迭代器本身是弱一致性的,不保证反映迭代器创建后的所有修改)。
// ... existing code ...
/** * Shared state for currently active iterators, or null if there * are known not to be any. Allows queue operations to update * iterator state. */
transient Itrs itrs;
// ... existing code ...
private E dequeue() { // ... final Object[] items = this.items; @SuppressWarnings("unchecked") E e = (E) items[takeIndex]; items[takeIndex] = null; if (++takeIndex == items.length) takeIndex = 0; count--; if (itrs != null) // 通知迭代器 itrs.elementDequeued(); notFull.signal(); return e;
}
// ... existing code ...
Itrs 内部使用了 WeakReference 来跟踪迭代器,以避免内存泄漏。当迭代器不再被外部引用时,GC 可以回收它们,WeakReference 会被清除。
Itrs 类本身需要处理迭代器的注册、注销,以及在队列结构变化时(如元素被移除或队列被清空)更新迭代器状态的逻辑,这部分实现也相当精巧。例如,itrs.removedAt(removeIndex) 和 itrs.takeIndexWrapped() 等方法。
构造函数中的初始化逻辑
当通过一个集合来初始化 ArrayBlockingQueue 时,构造函数需要将集合中的元素复制到内部数组中,并正确设置 count 和 putIndex。
// ... existing code ...
public ArrayBlockingQueue(int capacity, boolean fair, Collection<? extends E> c) { this(capacity, fair); final ReentrantLock lock = this.lock; lock.lock(); // Lock only for visibility, not mutual exclusion try { final Object[] items = this.items; int i = 0; try { for (E e : c) items[i++] = Objects.requireNonNull(e); } catch (ArrayIndexOutOfBoundsException ex) { throw new IllegalArgumentException(); } count = i; putIndex = (i == capacity) ? 0 : i; // 正确设置 putIndex } finally { lock.unlock(); }
}
// ... existing code ...
这里获取锁主要是为了保证内存可见性 (visibility),而不是严格意义上的互斥,因为在构造阶段,其他线程还无法访问这个队列实例。
需要处理集合大小超过容量或者包含 null 元素等异常情况。
toString()
toString() 方法也需要获取锁,并正确地遍历环形数组来构建队列内容的字符串表示。这涉及到从 takeIndex 开始,遍历 count 个元素,并处理数组索引的环绕。
inc(int i, int modulus) 和 dec(int i, int modulus)
这两个静态辅助方法用于在环形数组中递增和递减索引。虽然简单,但它们是保证 takeIndex 和 putIndex 正确环绕的关键。
// ... existing code ...
static final int inc(int i, int modulus) {
if (++i >= modulus) i = 0; return i;
} // ... existing code ...
Itrs 类的作用和机制(介绍)
其它并发安全队列也有迭代器,这里以ArrayBlockingQueue
作为例子
Itrs 类是 ArrayBlockingQueue
的一个内部类,其核心职责是管理所有活动的迭代器(Itr
实例),并在队列发生结构性变化时通知这些迭代器,使其能调整自身状态,尽量保持与队列的一致性。
主要机制
迭代器跟踪 (Iterator Tracking)
-
Itrs 内部维护一个由
Node
对象组成的单向链表(private Node head;
)。 -
Node
是 Itrs 的一个私有内部类,继承自WeakReference<Itr>
,这意味着 Itrs 持有对Itr
对象的弱引用。当一个Itr
对象没有被其他地方强引用时(例如迭代完成、局部变量超出作用域),GC 可以回收它,避免内存泄漏。
// ... existing code ... private class Node extends WeakReference<Itr> { Node next; Node(Itr iterator, Node next) { super(iterator); this.next = next; } } // ... existing code ... /** Linked list of weak iterator references */ private Node head; // ... existing code ...
周期计数 (Cycles)
-
int cycles;
字段记录了takeIndex
环绕数组回到 0 的次数。 -
当
takeIndex
从items.length - 1
变为 0 时,cycles
会递增。 -
cycles
计数与takeIndex
一起,可帮助迭代器判断自上次操作以来有多少元素被出队。
迭代器注册与清理 (Registering and Sweeping)
-
register(Itr itr)
:新的Itr
被创建时,通过此方法注册到 Itrs 的链表中。 -
doSomeSweeping(boolean tryHarder)
:负责清理链表中“失效”的迭代器。迭代器失效可能是因为 GC 已回收它(p.get() == null
),或者迭代器自身报告已“分离”(it.isDetached()
)。 -
清理操作有两种模式:
-
SHORT_SWEEP_PROBES
(默认 4 次探测):创建新迭代器时进行少量探测。 -
LONG_SWEEP_PROBES
(默认 16 次探测):确定至少有一个迭代器可被清理(tryHarder = true
),或在某些特定事件后,进行更彻底的探测。
-
-
sweeper
字段用于记住上次清理到达的位置,避免每次都从头开始扫描整个链表。
回调通知机制 (Callback Notifications)
-
takeIndexWrapped()
:takeIndex
环绕时调用,通知所有迭代器,并清理在一个完整周期内未被使用或已失效的迭代器。 -
removedAt(int removedIndex)
:内部元素(非头部元素)被移除时调用,迭代器需调整内部指针。 -
queueIsEmpty()
:队列变为空时调用,通知所有迭代器,清空弱引用,并将ArrayBlockingQueue.this.itrs
设置为null
。 -
elementDequeued()
:头部元素出队时调用,若队列变空则调用queueIsEmpty()
,若takeIndex
环绕则调用takeIndexWrapped()
。
Itr 类的作用和机制
Itr
是实际的迭代器实现,实现了 Iterator<E>
接口,其复杂性在于如何在队列并发修改(特别是出队和内部删除)的情况下,维护内部状态以提供弱一致性的遍历。
关键字段和方法
核心状态字段
-
cursor
:指向下一个可能返回的元素在items
数组中的索引,若为NONE (-1)
,表示没有更多元素或迭代结束。 -
nextItem
:预读取的下一个元素,hasNext()
主要依赖此字段。 -
nextIndex
:nextItem
在items
数组中的索引。 -
lastItem
:上一个通过next()
返回的元素(仅在分离模式下,或hasNext()
首次返回false
后使用)。 -
lastRet
:上一个通过next()
返回的元素在items
数组中的索引,用于remove()
方法。 -
prevTakeIndex
:上次迭代器操作时队列的takeIndex
,若为DETACHED (-3)
,表示迭代器已进入“分离模式”。 -
prevCycles
:上次迭代器操作时Itrs.cycles
的值。
特殊索引值
-
NONE = -1
:表示索引无效或未定义。 -
REMOVED = -2
:表示元素已被其他操作(非当前迭代器的remove()
)移除。 -
DETACHED = -3
:prevTakeIndex
的特殊值,表示迭代器已分离。
构造函数 Itr()
-
获取队列的锁。
-
若队列为空,则初始化为结束状态,并将
prevTakeIndex
设为DETACHED
。 -
若队列不为空:
-
记录当前的
takeIndex
和itrs.cycles
。 -
预读取第一个元素到
nextItem
,并设置nextIndex
和cursor
。 -
若
ArrayBlockingQueue.this.itrs
为null
,则创建一个新的Itrs
实例。 -
将当前迭代器注册到
itrs
中,并执行一次doSomeSweeping
。
-
分离模式 (isDetached()
, detach()
)
-
当迭代器确定不再需要(或无法再精确地)跟踪队列的实时变化时,进入分离模式(
prevTakeIndex = DETACHED
)。 -
通常在以下情况进入分离模式:
-
所有内部索引(
cursor
,nextIndex
,lastRet
)都变为无效(< 0)。 -
hasNext()
第一次返回false
时(通过noNext()
方法)。 -
调用
forEachRemaining()
后。
-
-
一旦分离,迭代器会尝试从
Itrs
链表中移除自身(通过itrs.doSomeSweeping(true)
),之后不再接收来自Itrs
的更新。
incorporateDequeues()
- 核心同步逻辑
-
该私有方法是维持迭代器状态的关键,在
hasNext()
(间接通过noNext()
)、next()
、remove()
等操作中被调用(前提是迭代器未分离)。 -
比较当前队列的
takeIndex
、itrs.cycles
与迭代器保存的prevTakeIndex
、prevCycles
,计算出从上次操作到现在有多少元素被出队(dequeues
)。 -
根据
dequeues
的数量,检查迭代器内部的lastRet
、nextIndex
、cursor
是否因出队操作而失效,若失效则将相应索引标记为REMOVED
或更新cursor
。 -
若所有相关索引都失效,则调用
detach()
使迭代器进入分离模式;否则,更新prevTakeIndex
和prevCycles
为当前队列的值。
hasNext()
和 noNext()
-
hasNext()
:简单检查nextItem
是否为null
。若nextItem
为null
,则调用noNext()
。 -
noNext()
:获取锁,若迭代器未分离,调用incorporateDequeues()
更新状态。若lastRet
仍然有效,则将lastRet
对应的元素存入lastItem
,然后调用detach()
,以支持在hasNext()
返回false
后调用remove()
删除最后一个元素的场景。
next()
-
若
nextItem
为null
,抛出NoSuchElementException
。 -
获取锁,若未分离,调用
incorporateDequeues()
。 -
更新
lastRet
为当前的nextIndex
。 -
若
cursor
有效,则从cursor
位置预读取下一个元素到nextItem
,更新nextIndex
,并移动cursor
。 -
若
cursor
无效,则将nextItem
和nextIndex
置为无效状态。若此时lastRet
也被标记为REMOVED
,则进入分离模式。 -
返回之前保存的
nextItem
。
remove()
-
获取锁,若未分离,调用
incorporateDequeues()
。 -
检查
lastRet
:-
若
lastRet >= 0
(有效索引):-
若未分离,直接调用
ArrayBlockingQueue.this.removeAt(lastRet)
删除元素。 -
若已分离,需额外检查
items[lastRet]
是否仍然是之前保存的lastItem
,若是则调用removeAt(lastRet)
。
-
-
若
lastRet == NONE
,说明之前没有调用next()
或者已经调用过remove()
,抛出IllegalStateException
。 -
若
lastRet == REMOVED
,说明元素已被异步移除,不做任何操作。
-
-
重置
lastRet
为NONE
。若迭代器状态表明迭代已结束,则进入分离模式。
Itr 的回调方法 (被 Itrs 调用)
-
takeIndexWrapped()
:当Itrs
通知takeIndex
环绕时调用。迭代器检查自己的prevCycles
,若在一个完整的cycles
周期内没有活动(即itrs.cycles - prevCycles > 1
),则认为已失效,返回true
以便Itrs
将其移除;否则返回false
。 -
removedAt(int removedIndex)
:当Itrs
通知内部元素被移除时调用。迭代器调整其内部索引(cursor
,nextIndex
,lastRet
),若调整后发现自身已无有效状态,则返回true
以便Itrs
将其移除。 -
shutdown()
:当Itrs
通知队列变空时调用。迭代器将所有内部索引设为NONE
,并将prevTakeIndex
设为DETACHED
。
总结 Itr 和 Itrs 的复杂性
-
弱一致性:目标不是提供快照式的强一致性,而是在并发修改下,迭代器尽力提供合理的元素序列,不保证反映迭代器创建后的所有修改,但会避免一些明显的错误(如重复报告元素或跳过元素)。
-
并发修改处理:核心挑战在于处理队列在迭代过程中的
put
、take
和remove(Object)
操作。Itrs
和Itr
之间的回调和状态同步机制为此设计。 -
环形数组:索引的环绕增加了计算元素位置和距离的复杂性。
-
内部删除:
removeAt
导致元素移动,是迭代器最难处理的情况之一,需要Itrs
通过removedAt
回调精确通知所有迭代器。 -
资源管理:使用
WeakReference
和主动清理机制(doSomeSweeping
)避免废弃的迭代器对象造成内存泄漏或不必要的开销。 -
分离模式:允许迭代器在迭代接近尾声或无法精确跟踪时,“放弃”与队列的同步,简化逻辑并允许
Itrs
及时清理。 -
锁的精细使用:所有操作都在获取
ArrayBlockingQueue
的主锁lock
后进行,保证了Itrs
和Itr
内部状态修改的原子性和可见性。
Doug Lea 在注释中提到:“This adds a lot of complexity for the sake of correctly handling some uncommon operations...”,表明为了在特定场景下(如迭代过程中发生内部删除)迭代器的行为尽可能正确,付出了相当大的实现复杂性代价,体现了对并发数据结构正确性和健壮性的极致追求。