当前位置: 首页 > news >正文

Disruptor—3.核心源码实现分析一

大纲

1.Disruptor的生产者源码分析

2.Disruptor的消费者源码分析

3.Disruptor的WaitStrategy等待策略分析

4.Disruptor的高性能原因

5.Disruptor高性能之数据结构(内存预加载机制)

6.Disruptor高性能之内核(使用单线程写)

7.Disruptor高性能之系统内存优化(内存屏障)

8.Disruptor高性能之系统缓存优化(消除伪共享)

9.Disruptor高性能之序号获取优化(自旋 + CAS)

1.Disruptor的生产者源码分析

(1)通过Sequence序号发布消息

(2)通过Translator事件转换器发布消息

(1)通过Sequence序号发布消息

生产者可以先从RingBuffer中获取一个可用的Sequence序号,然后再根据该Sequence序号从RingBuffer的环形数组中获取对应的元素,接着对该元素进行赋值替换,最后调用RingBuffer的publish()方法设置当前生产者的Sequence序号来完成事件消息的发布。

//注意:这里使用的版本是3.4.4
//单生产者单消费者的使用示例
public class Main {public static void main(String[] args) {//参数准备OrderEventFactory orderEventFactory = new OrderEventFactory();int ringBufferSize = 4;ExecutorService executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());//参数一:eventFactory,消息(Event)工厂对象//参数二:ringBufferSize,容器的长度//参数三:executor,线程池(建议使用自定义线程池),RejectedExecutionHandler//参数四:ProducerType,单生产者还是多生产者//参数五:waitStrategy,等待策略//1.实例化Disruptor对象Disruptor<OrderEvent> disruptor = new Disruptor<OrderEvent>(orderEventFactory,ringBufferSize,executor,ProducerType.SINGLE,new BlockingWaitStrategy());//2.添加Event处理器,用于处理事件//也就是构建Disruptor与消费者的一个关联关系disruptor.handleEventsWith(new OrderEventHandler());//3.启动Disruptordisruptor.start();//4.获取实际存储数据的容器: RingBufferRingBuffer<OrderEvent> ringBuffer = disruptor.getRingBuffer();OrderEventProducer producer = new OrderEventProducer(ringBuffer);ByteBuffer bb = ByteBuffer.allocate(8);for (long i = 0; i < 5; i++) {bb.putLong(0, i);//向容器中投递数据producer.sendData(bb);}disruptor.shutdown();executor.shutdown();}
}public class OrderEventProducer {private RingBuffer<OrderEvent> ringBuffer;public OrderEventProducer(RingBuffer<OrderEvent> ringBuffer) {this.ringBuffer = ringBuffer;}public void sendData(ByteBuffer data) {//1.在生产者发送消息时, 首先需要从ringBuffer里获取一个可用的序号long sequence = ringBuffer.next();try {//2.根据这个序号, 找到具体的"OrderEvent"元素//注意:此时获取的OrderEvent对象是一个没有被赋值的"空对象"OrderEvent event = ringBuffer.get(sequence);//3.进行实际的赋值处理event.setValue(data.getLong(0));} finally {//4.提交发布操作ringBuffer.publish(sequence);}}
}public class OrderEventHandler implements EventHandler<OrderEvent> {public void onEvent(OrderEvent event, long sequence, boolean endOfBatch) throws Exception {Thread.sleep(1000);System.err.println("消费者: " + event.getValue());}
}
//多生产者多消费者的使用示例
public class Main {public static void main(String[] args) throws InterruptedException {//1.创建RingBufferRingBuffer<Order> ringBuffer = RingBuffer.create(ProducerType.MULTI,//多生产者new EventFactory<Order>() {public Order newInstance() {return new Order();}},1024 * 1024,new YieldingWaitStrategy());//2.通过ringBuffer创建一个屏障SequenceBarrier sequenceBarrier = ringBuffer.newBarrier();//3.创建消费者数组,每个消费者Consumer都需要实现WorkHandler接口Consumer[] consumers = new Consumer[10];for (int i = 0; i < consumers.length; i++) {consumers[i] = new Consumer("C" + i);}//4.构建多消费者工作池WorkerPool,因为多消费者模式下需要使用WorkerPoolWorkerPool<Order> workerPool = new WorkerPool<Order>(ringBuffer,sequenceBarrier,new EventExceptionHandler(),consumers);//5.设置多个消费者的sequence序号,用于单独统计每个消费者的消费进度, 并且设置到RingBuffer中ringBuffer.addGatingSequences(workerPool.getWorkerSequences());//6.启动workerPoolworkerPool.start(Executors.newFixedThreadPool(5));final CountDownLatch latch = new CountDownLatch(1);for (int i = 0; i < 100; i++) {final Producer producer = new Producer(ringBuffer);new Thread(new Runnable() {public void run() {try {latch.await();} catch (Exception e) {e.printStackTrace();}for (int j = 0; j < 100; j++) {producer.sendData(UUID.randomUUID().toString());}}}).start();}Thread.sleep(2000);System.err.println("----------线程创建完毕,开始生产数据----------");latch.countDown();Thread.sleep(10000);System.err.println("任务总数:" + consumers[2].getCount());}
}public class Producer {private RingBuffer<Order> ringBuffer;public Producer(RingBuffer<Order> ringBuffer) {this.ringBuffer = ringBuffer;}public void sendData(String uuid) {//1.在生产者发送消息时, 首先需要从ringBuffer里获取一个可用的序号long sequence = ringBuffer.next();try {//2.根据这个序号, 找到具体的"Order"元素//注意:此时获取的Order对象是一个没有被赋值的"空对象"Order order = ringBuffer.get(sequence);//3.进行实际的赋值处理order.setId(uuid);} finally {//4.提交发布操作ringBuffer.publish(sequence);}}
}public class Consumer implements WorkHandler<Order> {private static AtomicInteger count = new AtomicInteger(0);private String consumerId;private Random random = new Random();public Consumer(String consumerId) {this.consumerId = consumerId;}public void onEvent(Order event) throws Exception {Thread.sleep(1 * random.nextInt(5));System.err.println("当前消费者: " + this.consumerId + ", 消费信息ID: " + event.getId());count.incrementAndGet();}public int getCount() {return count.get();}
}

其中,RingBuffer的publish(sequence)方法会调用Sequencer接口的publish()方法来设置当前生产者的Sequence序号。

abstract class RingBufferPad {protected long p1, p2, p3, p4, p5, p6, p7;
}abstract class RingBufferFields<E> extends RingBufferPad {...private static final Unsafe UNSAFE = Util.getUnsafe();private final long indexMask;//环形数组存储事件消息private final Object[] entries;protected final int bufferSize;//RingBuffer的sequencer属性代表了当前线程对应的生产者protected final Sequencer sequencer;RingBufferFields(EventFactory<E> eventFactory, Sequencer sequencer) {this.sequencer = sequencer;this.bufferSize = sequencer.getBufferSize();if (bufferSize < 1) {throw new IllegalArgumentException("bufferSize must not be less than 1");}if (Integer.bitCount(bufferSize) != 1) {throw new IllegalArgumentException("bufferSize must be a power of 2");}this.indexMask = bufferSize - 1;//初始化数组this.entries = new Object[sequencer.getBufferSize() + 2 * BUFFER_PAD];//内存预加载fill(eventFactory);}private void fill(EventFactory<E> eventFactory) {for (int i = 0; i < bufferSize; i++) {entries[BUFFER_PAD + i] = eventFactory.newInstance();}}protected final E elementAt(long sequence) {return (E) UNSAFE.getObject(entries, REF_ARRAY_BASE + ((sequence & indexMask) << REF_ELEMENT_SHIFT));}...
}public final class RingBuffer<E> extends RingBufferFields<E> implements Cursored, EventSequencer<E>, EventSink<E> {protected long p1, p2, p3, p4, p5, p6, p7;...//Increment and return the next sequence for the ring buffer.  //Calls of this method should ensure that they always publish the sequence afterward.  //E.g.//long sequence = ringBuffer.next();//try {//    Event e = ringBuffer.get(sequence);//    //Do some work with the event.//} finally {//    ringBuffer.publish(sequence);//}//@return The next sequence to publish to.//@see RingBuffer#publish(long)//@see RingBuffer#get(long)@Overridepublic long next() {return sequencer.next();}//Publish the specified sequence.//This action marks this particular message as being available to be read.//@param sequence the sequence to publish.@Overridepublic void publish(long sequence) {sequencer.publish(sequence);}//Get the event for a given sequence in the RingBuffer.//This call has 2 uses.  //Firstly use this call when publishing to a ring buffer.//After calling RingBuffer#next() use this call to get hold of the preallocated event to fill with data before calling RingBuffer#publish(long).//Secondly use this call when consuming data from the ring buffer.  //After calling SequenceBarrier#waitFor(long) call this method with any value greater than that //your current consumer sequence and less than or equal to the value returned from the SequenceBarrier#waitFor(long) method.//@param sequence for the event//@return the event for the given sequence@Overridepublic E get(long sequence) {//调用父类RingBufferFields的elementAt()方法return elementAt(sequence);}...
}

RingBuffer的sequencer属性会在创建RingBuffer对象时传入,而创建RingBuffer对象的时机则是在初始化Disruptor的时候。

在Disruptor的构造方法中,会调用RingBuffer的create()方法,RingBuffer的create()方法会根据不同的生产者类型来初始化sequencer属性。

由生产者线程通过new创建的Sequencer接口实现类的实例就是一个生产者。单生产者的线程执行上面的main()方法时,会创建一个单生产者Sequencer实例来代表生产者。多生产者的线程执行如下的main()方法时,会创建一个多生产者Sequencer实例来代表生产者。

public class Disruptor<T> {private final RingBuffer<T> ringBuffer;private final Executor executor;private final ConsumerRepository<T> consumerRepository = new ConsumerRepository<T>();private final AtomicBoolean started = new AtomicBoolean(false);private ExceptionHandler<? super T> exceptionHandler;...//Create a new Disruptor.//@param eventFactory   the factory to create events in the ring buffer.//@param ringBufferSize the size of the ring buffer, must be power of 2.//@param executor       an Executor to execute event processors.//@param producerType   the claim strategy to use for the ring buffer.//@param waitStrategy   the wait strategy to use for the ring buffer.public Disruptor(final EventFactory<T> eventFactory, final int ringBufferSize, final Executor executor, final ProducerType producerType, final WaitStrategy waitStrategy) {this(RingBuffer.create(producerType, eventFactory, ringBufferSize, waitStrategy), executor);}private Disruptor(final RingBuffer<T> ringBuffer, final Executor executor) {this.ringBuffer = ringBuffer;this.executor = executor;}...
}public final class RingBuffer<E> extends RingBufferFields<E> implements Cursored, EventSequencer<E>, EventSink<E> {protected long p1, p2, p3, p4, p5, p6, p7;...//Create a new Ring Buffer with the specified producer type (SINGLE or MULTI)//@param producerType producer type to use ProducerType.//@param factory used to create events within the ring buffer.//@param bufferSize number of elements to create within the ring buffer.//@param waitStrategy used to determine how to wait for new elements to become available.public static <E> RingBuffer<E> create(ProducerType producerType, EventFactory<E> factory, int bufferSize, WaitStrategy waitStrategy) {switch (producerType) {case SINGLE://单生产者模式下的当前生产者是一个SingleProducerSequencer实例return createSingleProducer(factory, bufferSize, waitStrategy);case MULTI://多生产者模式下的当前生产者是一个MultiProducerSequencer实例return createMultiProducer(factory, bufferSize, waitStrategy);default:throw new IllegalStateException(producerType.toString());}}//Create a new single producer RingBuffer with the specified wait strategy.//@param <E> Class of the event stored in the ring buffer.//@param factory      used to create the events within the ring buffer.//@param bufferSize   number of elements to create within the ring buffer.//@param waitStrategy used to determine how to wait for new elements to become available.//@return a constructed ring buffer.public static <E> RingBuffer<E> createSingleProducer(EventFactory<E> factory, int bufferSize, WaitStrategy waitStrategy) {SingleProducerSequencer sequencer = new SingleProducerSequencer(bufferSize, waitStrategy);return new RingBuffer<E>(factory, sequencer);}//Create a new multiple producer RingBuffer with the specified wait strategy.//@param <E> Class of the event stored in the ring buffer.//@param factory      used to create the events within the ring buffer.//@param bufferSize   number of elements to create within the ring buffer.//@param waitStrategy used to determine how to wait for new elements to become available.//@return a constructed ring buffer.public static <E> RingBuffer<E> createMultiProducer(EventFactory<E> factory, int bufferSize, WaitStrategy waitStrategy) {MultiProducerSequencer sequencer = new MultiProducerSequencer(bufferSize, waitStrategy);return new RingBuffer<E>(factory, sequencer);}//Construct a RingBuffer with the full option set.//@param eventFactory to newInstance entries for filling the RingBuffer//@param sequencer    sequencer to handle the ordering of events moving through the RingBuffer.RingBuffer(EventFactory<E> eventFactory, Sequencer sequencer) {super(eventFactory, sequencer);}...
}abstract class RingBufferPad {protected long p1, p2, p3, p4, p5, p6, p7;
}abstract class RingBufferFields<E> extends RingBufferPad {...private final long indexMask;//环形数组存储事件消息private final Object[] entries;protected final int bufferSize;//RingBuffer的sequencer属性代表了当前线程对应的生产者protected final Sequencer sequencer;RingBufferFields(EventFactory<E> eventFactory, Sequencer sequencer) {this.sequencer = sequencer;this.bufferSize = sequencer.getBufferSize();if (bufferSize < 1) {throw new IllegalArgumentException("bufferSize must not be less than 1");}if (Integer.bitCount(bufferSize) != 1) {throw new IllegalArgumentException("bufferSize must be a power of 2");}this.indexMask = bufferSize - 1;//初始化数组this.entries = new Object[sequencer.getBufferSize() + 2 * BUFFER_PAD];//内存预加载fill(eventFactory);}private void fill(EventFactory<E> eventFactory) {for (int i = 0; i < bufferSize; i++) {entries[BUFFER_PAD + i] = eventFactory.newInstance();}}...
}

SingleProducerSequencer的publish()方法在发布事件消息时,首先会设置当前生产者的Sequence,然后会通过等待策略通知阻塞的消费者。

public final class RingBuffer<E> extends RingBufferFields<E> implements Cursored, EventSequencer<E>, EventSink<E> {...//Publish the specified sequence.//This action marks this particular message as being available to be read.//@param sequence the sequence to publish.@Overridepublic void publish(long sequence) {sequencer.publish(sequence);}...
}public abstract class AbstractSequencer implements Sequencer {private static final AtomicReferenceFieldUpdater<AbstractSequencer, Sequence[]> SEQUENCE_UPDATER =AtomicReferenceFieldUpdater.newUpdater(AbstractSequencer.class, Sequence[].class, "gatingSequences");//环形数组的大小protected final int bufferSize;//等待策略protected final WaitStrategy waitStrategy;//当前生产者的进度protected final Sequence cursor = new Sequence(Sequencer.INITIAL_CURSOR_VALUE);//每一个Sequence都对应着一个消费者(一个EventHandler或者一个WorkHandler)//这些Sequence会通过SEQUENCE_UPDATER在执行Disruptor的handleEventsWith()等方法时,//由RingBuffer的addGatingSequences()方法进行添加protected volatile Sequence[] gatingSequences = new Sequence[0];...//Create with the specified buffer size and wait strategy.//@param bufferSize The total number of entries, must be a positive power of 2.//@param waitStrategypublic AbstractSequencer(int bufferSize, WaitStrategy waitStrategy) {if (bufferSize < 1) {throw new IllegalArgumentException("bufferSize must not be less than 1");}if (Integer.bitCount(bufferSize) != 1) {throw new IllegalArgumentException("bufferSize must be a power of 2");}this.bufferSize = bufferSize;this.waitStrategy = waitStrategy;}...
}abstract class SingleProducerSequencerPad extends AbstractSequencer {protected long p1, p2, p3, p4, p5, p6, p7;public SingleProducerSequencerPad(int bufferSize, WaitStrategy waitStrategy) {super(bufferSize, waitStrategy);}
}abstract class SingleProducerSequencerFields extends SingleProducerSequencerPad {public SingleProducerSequencerFields(int bufferSize, WaitStrategy waitStrategy) {super(bufferSize, waitStrategy);}//表示生产者的当前序号,值为-1protected long nextValue = Sequence.INITIAL_VALUE;//表示消费者的最小序号,值为-1protected long cachedValue = Sequence.INITIAL_VALUE;
}public final class SingleProducerSequencer extends SingleProducerSequencerFields {protected long p1, p2, p3, p4, p5, p6, p7;//Construct a Sequencer with the selected wait strategy and buffer size.//@param bufferSize   the size of the buffer that this will sequence over.//@param waitStrategy for those waiting on sequences.public SingleProducerSequencer(int bufferSize, WaitStrategy waitStrategy) {super(bufferSize, waitStrategy);}@Overridepublic void publish(long sequence) {//设置当前生产者的进度,cursor代表了当前生产者的Sequencecursor.set(sequence);//通过等待策略通知阻塞的消费者waitStrategy.signalAllWhenBlocking();}@Overridepublic long next() {return next(1);}@Overridepublic long next(int n) {if (n < 1) {throw new IllegalArgumentException("n must be > 0");}long nextValue = this.nextValue;long nextSequence = nextValue + n;long wrapPoint = nextSequence - bufferSize;long cachedGatingSequence = this.cachedValue;if (wrapPoint > cachedGatingSequence || cachedGatingSequence > nextValue) {long minSequence;while (wrapPoint > (minSequence = Util.getMinimumSequence(gatingSequences, nextValue))) {LockSupport.parkNanos(1L); }this.cachedValue = minSequence;}this.nextValue = nextSequence;return nextSequence;}...
}class LhsPadding {protected long p1, p2, p3, p4, p5, p6, p7;
}class Value extends LhsPadding {protected volatile long value;
}class RhsPadding extends Value {protected long p9, p10, p11, p12, p13, p14, p15;
}//Concurrent sequence class used for tracking the progress of the ring buffer and event processors.  
//Support a number of concurrent operations including CAS and order writes.
//Also attempts to be more efficient with regards to false sharing by adding padding around the volatile field.
public class Sequence extends RhsPadding {static final long INITIAL_VALUE = -1L;private static final Unsafe UNSAFE;private static final long VALUE_OFFSET;static {UNSAFE = Util.getUnsafe();VALUE_OFFSET = UNSAFE.objectFieldOffset(Value.class.getDeclaredField("value"));}//Create a sequence initialised to -1.public Sequence() {this(INITIAL_VALUE);}//Create a sequence with a specified initial value.//@param initialValue The initial value for this sequence.public Sequence(final long initialValue) {UNSAFE.putOrderedLong(this, VALUE_OFFSET, initialValue);}//Perform a volatile read of this sequence's value.//@return The current value of the sequence.public long get() {return value;}//Perform an ordered write of this sequence.  //The intent is a Store/Store barrier between this write and any previous store.//@param value The new value for the sequence.public void set(final long value) {UNSAFE.putOrderedLong(this, VALUE_OFFSET, value);}//Performs a volatile write of this sequence.  //The intent is a Store/Store barrier between this write and //any previous write and a Store/Load barrier between this write and //any subsequent volatile read.//@param value The new value for the sequence.public void setVolatile(final long value) {UNSAFE.putLongVolatile(this, VALUE_OFFSET, value);}//Perform a compare and set operation on the sequence.//@param expectedValue The expected current value.//@param newValue The value to update to.//@return true if the operation succeeds, false otherwise.public boolean compareAndSet(final long expectedValue, final long newValue) {return UNSAFE.compareAndSwapLong(this, VALUE_OFFSET, expectedValue, newValue);}//Atomically increment the sequence by one.//@return The value after the incrementpublic long incrementAndGet() {return addAndGet(1L);}//Atomically add the supplied value.//@param increment The value to add to the sequence.//@return The value after the increment.public long addAndGet(final long increment) {long currentValue;long newValue;do {currentValue = get();newValue = currentValue + increment;} while (!compareAndSet(currentValue, newValue));return newValue;}@Overridepublic String toString() {return Long.toString(get());}
}

MultiProducerSequencer的publish()方法在发布事件消息时,则会通过UnSafe设置sequence在int数组中对应元素的值。

public final class MultiProducerSequencer extends AbstractSequencer {private static final Unsafe UNSAFE = Util.getUnsafe();private static final long BASE = UNSAFE.arrayBaseOffset(int[].class);private static final long SCALE = UNSAFE.arrayIndexScale(int[].class);private final int[] availableBuffer;private final int indexMask;private final int indexShift;//Construct a Sequencer with the selected wait strategy and buffer size.//@param bufferSize   the size of the buffer that this will sequence over.//@param waitStrategy for those waiting on sequences.public MultiProducerSequencer(int bufferSize, final WaitStrategy waitStrategy) {super(bufferSize, waitStrategy);availableBuffer = new int[bufferSize];indexMask = bufferSize - 1;indexShift = Util.log2(bufferSize);initialiseAvailableBuffer();}private void initialiseAvailableBuffer() {for (int i = availableBuffer.length - 1; i != 0; i--) {setAvailableBufferValue(i, -1);}setAvailableBufferValue(0, -1);}private void setAvailableBufferValue(int index, int flag) {long bufferAddress = (index * SCALE) + BASE;UNSAFE.putOrderedInt(availableBuffer, bufferAddress, flag);}@Overridepublic void publish(final long sequence) {setAvailable(sequence);waitStrategy.signalAllWhenBlocking();}//The below methods work on the availableBuffer flag.//The prime reason is to avoid a shared sequence object between publisher threads.//(Keeping single pointers tracking start and end would require coordination between the threads).//--  Firstly we have the constraint that the delta between the cursor and minimum gating sequence //will never be larger than the buffer size (the code in next/tryNext in the Sequence takes care of that).//-- Given that; take the sequence value and mask off the lower portion of the sequence //as the index into the buffer (indexMask). (aka modulo operator)//-- The upper portion of the sequence becomes the value to check for availability.//ie: it tells us how many times around the ring buffer we've been (aka division)//-- Because we can't wrap without the gating sequences moving forward //(i.e. the minimum gating sequence is effectively our last available position in the buffer), //when we have new data and successfully claimed a slot we can simply write over the top.private void setAvailable(final long sequence) {setAvailableBufferValue(calculateIndex(sequence), calculateAvailabilityFlag(sequence));}private int calculateIndex(final long sequence) {return ((int) sequence) & indexMask;}private int calculateAvailabilityFlag(final long sequence) {return (int) (sequence >>> indexShift);}@Overridepublic long next() {return next(1);}@Overridepublic long next(int n) {if (n < 1) {throw new IllegalArgumentException("n must be > 0");}long current;long next;do {current = cursor.get();next = current + n;long wrapPoint = next - bufferSize;long cachedGatingSequence = gatingSequenceCache.get();if (wrapPoint > cachedGatingSequence || cachedGatingSequence > current) {long gatingSequence = Util.getMinimumSequence(gatingSequences, current);if (wrapPoint > gatingSequence) {LockSupport.parkNanos(1); continue;}gatingSequenceCache.set(gatingSequence);} else if (cursor.compareAndSet(current, next)) {break;}} while (true);return next;}...
}

(2)通过Translator事件转换器发布消息

生产者还可以直接调用RingBuffer的tryPublishEvent()方法来完成发布事件消息到RingBuffer。该方法首先会调用Sequencer接口的tryNext()方法获取sequence序号,然后根据该sequence序号从RingBuffer的环形数组中获取对应的元素,接着再调用RingBuffer的translateAndPublish()方法将事件消息赋值替换到该元素中,最后调用Sequencer接口的publish()方法设置当前生产者的sequence序号来完成事件消息的发布。

abstract class RingBufferPad {protected long p1, p2, p3, p4, p5, p6, p7;
}abstract class RingBufferFields<E> extends RingBufferPad {...private static final Unsafe UNSAFE = Util.getUnsafe();private final long indexMask;//环形数组存储事件消息private final Object[] entries;protected final int bufferSize;//RingBuffer的sequencer属性代表了当前线程对应的生产者protected final Sequencer sequencer;RingBufferFields(EventFactory<E> eventFactory, Sequencer sequencer) {this.sequencer = sequencer;this.bufferSize = sequencer.getBufferSize();if (bufferSize < 1) {throw new IllegalArgumentException("bufferSize must not be less than 1");}if (Integer.bitCount(bufferSize) != 1) {throw new IllegalArgumentException("bufferSize must be a power of 2");}this.indexMask = bufferSize - 1;//初始化数组this.entries = new Object[sequencer.getBufferSize() + 2 * BUFFER_PAD];//内存预加载fill(eventFactory);}private void fill(EventFactory<E> eventFactory) {for (int i = 0; i < bufferSize; i++) {entries[BUFFER_PAD + i] = eventFactory.newInstance();}}protected final E elementAt(long sequence) {return (E) UNSAFE.getObject(entries, REF_ARRAY_BASE + ((sequence & indexMask) << REF_ELEMENT_SHIFT));}...
}public final class RingBuffer<E> extends RingBufferFields<E> implements Cursored, EventSequencer<E>, EventSink<E> {//值为-1public static final long INITIAL_CURSOR_VALUE = Sequence.INITIAL_VALUE;protected long p1, p2, p3, p4, p5, p6, p7;//Construct a RingBuffer with the full option set.//@param eventFactory to newInstance entries for filling the RingBuffer//@param sequencer    sequencer to handle the ordering of events moving through the RingBuffer.RingBuffer(EventFactory<E> eventFactory, Sequencer sequencer) {super(eventFactory, sequencer);}@Overridepublic boolean tryPublishEvent(EventTranslator<E> translator) {try {final long sequence = sequencer.tryNext();translateAndPublish(translator, sequence);return true;} catch (InsufficientCapacityException e) {return false;}}private void translateAndPublish(EventTranslator<E> translator, long sequence) {try {translator.translateTo(get(sequence), sequence);} finally {sequencer.publish(sequence);}}//Get the event for a given sequence in the RingBuffer.//This call has 2 uses.  //Firstly use this call when publishing to a ring buffer.//After calling RingBuffer#next() use this call to get hold of the preallocated event to fill with data before calling RingBuffer#publish(long).//Secondly use this call when consuming data from the ring buffer.  //After calling SequenceBarrier#waitFor(long) call this method with any value greater than that //your current consumer sequence and less than or equal to the value returned from the SequenceBarrier#waitFor(long) method.//@param sequence for the event//@return the event for the given sequence@Overridepublic E get(long sequence) {//调用父类RingBufferFields的elementAt()方法return elementAt(sequence);}...
}public abstract class AbstractSequencer implements Sequencer {private static final AtomicReferenceFieldUpdater<AbstractSequencer, Sequence[]> SEQUENCE_UPDATER =AtomicReferenceFieldUpdater.newUpdater(AbstractSequencer.class, Sequence[].class, "gatingSequences");//环形数组的大小protected final int bufferSize;//等待策略protected final WaitStrategy waitStrategy;//当前生产者的进度protected final Sequence cursor = new Sequence(Sequencer.INITIAL_CURSOR_VALUE);//每一个Sequence都对应着一个消费者(一个EventHandler或者一个WorkHandler)//这些Sequence会通过SEQUENCE_UPDATER在执行Disruptor的handleEventsWith()等方法时,//由RingBuffer的addGatingSequences()方法进行添加protected volatile Sequence[] gatingSequences = new Sequence[0];...//Create with the specified buffer size and wait strategy.//@param bufferSize The total number of entries, must be a positive power of 2.//@param waitStrategypublic AbstractSequencer(int bufferSize, WaitStrategy waitStrategy) {if (bufferSize < 1) {throw new IllegalArgumentException("bufferSize must not be less than 1");}if (Integer.bitCount(bufferSize) != 1) {throw new IllegalArgumentException("bufferSize must be a power of 2");}this.bufferSize = bufferSize;this.waitStrategy = waitStrategy;}...
}public final class SingleProducerSequencer extends SingleProducerSequencerFields {protected long p1, p2, p3, p4, p5, p6, p7;//Construct a Sequencer with the selected wait strategy and buffer size.//@param bufferSize   the size of the buffer that this will sequence over.//@param waitStrategy for those waiting on sequences.public SingleProducerSequencer(int bufferSize, WaitStrategy waitStrategy) {super(bufferSize, waitStrategy);}...@Overridepublic long tryNext() throws InsufficientCapacityException {return tryNext(1);}@Overridepublic long tryNext(int n) throws InsufficientCapacityException {if (n < 1) {throw new IllegalArgumentException("n must be > 0");}if (!hasAvailableCapacity(n, true)) {throw InsufficientCapacityException.INSTANCE;}long nextSequence = this.nextValue += n;return nextSequence;}private boolean hasAvailableCapacity(int requiredCapacity, boolean doStore) {long nextValue = this.nextValue;long wrapPoint = (nextValue + requiredCapacity) - bufferSize;long cachedGatingSequence = this.cachedValue;if (wrapPoint > cachedGatingSequence || cachedGatingSequence > nextValue) {if (doStore) {cursor.setVolatile(nextValue);//StoreLoad fence}long minSequence = Util.getMinimumSequence(gatingSequences, nextValue);this.cachedValue = minSequence;if (wrapPoint > minSequence) {return false;}}return true;}@Overridepublic void publish(long sequence) {//设置当前生产者的sequencecursor.set(sequence);//通过等待策略通知阻塞的消费者waitStrategy.signalAllWhenBlocking();}...
}abstract class SingleProducerSequencerFields extends SingleProducerSequencerPad {SingleProducerSequencerFields(int bufferSize, WaitStrategy waitStrategy) {super(bufferSize, waitStrategy);}//表示生产者的当前序号,值为-1protected long nextValue = Sequence.INITIAL_VALUE;//表示消费者的最小序号,值为-1protected long cachedValue = Sequence.INITIAL_VALUE;
}abstract class SingleProducerSequencerPad extends AbstractSequencer {protected long p1, p2, p3, p4, p5, p6, p7;SingleProducerSequencerPad(int bufferSize, WaitStrategy waitStrategy) {super(bufferSize, waitStrategy);}
}public final class MultiProducerSequencer extends AbstractSequencer {...@Overridepublic long tryNext() throws InsufficientCapacityException {return tryNext(1);}@Overridepublic long tryNext(int n) throws InsufficientCapacityException {if (n < 1) {throw new IllegalArgumentException("n must be > 0");}long current;long next;do {current = cursor.get();next = current + n;if (!hasAvailableCapacity(gatingSequences, n, current)) {throw InsufficientCapacityException.INSTANCE;}} while (!cursor.compareAndSet(current, next));return next;}private boolean hasAvailableCapacity(Sequence[] gatingSequences, final int requiredCapacity, long cursorValue) {long wrapPoint = (cursorValue + requiredCapacity) - bufferSize;long cachedGatingSequence = gatingSequenceCache.get();if (wrapPoint > cachedGatingSequence || cachedGatingSequence > cursorValue) {long minSequence = Util.getMinimumSequence(gatingSequences, cursorValue);gatingSequenceCache.set(minSequence);if (wrapPoint > minSequence) {return false;}}return true;}@Overridepublic void publish(final long sequence) {setAvailable(sequence);waitStrategy.signalAllWhenBlocking();}...
}//Implementations translate (write) data representations into events claimed from the RingBuffer.
//When publishing to the RingBuffer, provide an EventTranslator. 
//The RingBuffer will select the next available event by sequence and provide it to the EventTranslator (which should update the event), 
//before publishing the sequence update.
//@param <T> event implementation storing the data for sharing during exchange or parallel coordination of an event.
public interface EventTranslator<T> {//Translate a data representation into fields set in given event//@param event    into which the data should be translated.//@param sequence that is assigned to event.void translateTo(T event, long sequence);
}

http://www.xdnf.cn/news/638623.html

相关文章:

  • 黑马点评-分布式锁Lua脚本
  • 在线热更新 Upstream全面掌握 ngx_http_upstream_conf_module
  • 华为OD机试真题——字符串加密 (2025B卷:100分)Java/python/JavaScript/C/C++/GO最佳实现
  • HTTP 和 HTTPS 的区别
  • 量子力学:量子力学为什么不属于经典物理学的范畴?
  • 【面板数据】上市公司外资持股数据集(2005-2023年)
  • 临床研究统计分析核心概念解析
  • 【MATLAB代码】主动声纳多路径目标测距与定位,测距使用互相关,频率、采样率可调、声速可调,定位使用三边法|订阅专栏后可直接查看源代码
  • C++学习之STL学习:string类常用接口的模拟实现
  • 大语言模型的完整训练周期从0到1的体系化拆解
  • 基于Qt的app开发第十一天
  • C语言指针详解
  • 湖北理元理律师事务所债务优化方案:让还款与生活平衡的艺术
  • [项目总结] 基于Docker与Nginx对项目进行部署
  • 思考:chmod u+x等价于chmod u=x吗
  • baseParse 有参数可以处理重复属性的逻辑吗
  • 题目 3326: 蓝桥杯2025年第十六届省赛真题-最短距离
  • 医学写作人才管理策略
  • 如何提高用例的覆盖率,减少漏测
  • Java多线程JUC
  • 三重天理论
  • 【Simulink】IEEE5/IEEE9/IEEE14/IEEE30/IEEE33/IEEE39仿真模型
  • 【Stock】日本蜡烛图技术总结(1)——反转形态
  • 【PhysUnits】13 减法操作(sub.rs)
  • setup.py Pip wheel
  • MySQL权限管理:层级化作用域、权限分类、操作命令
  • 基于大模型的大肠癌全流程预测与诊疗方案研究报告
  • Qt环境的搭建
  • 互联网大厂Java求职面试:短视频平台大规模实时互动系统架构设计
  • [论文品鉴] DeepSeek V3 最新论文 之 MTP