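How does Disruptor implement a high-performance queue? Many newcomers find this unclear, so this article walks through it in detail, starting from a small demo and then tracing the library source.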
import java.util.concurrent.ThreadFactory

import com.lmax.disruptor.dsl.{Disruptor, ProducerType}
import com.lmax.disruptor.{BlockingWaitStrategy, EventFactory, EventHandler, EventTranslatorOneArg, WaitStrategy}

object DisruptorTest {
  val disruptor = {
    val factory = new EventFactory[Event] {
      override def newInstance(): Event = Event(-1)
    }
    val threadFactory = new ThreadFactory() {
      override def newThread(r: Runnable): Thread = new Thread(r)
    }
    val disruptor = new Disruptor[Event](factory, 4, threadFactory, ProducerType.SINGLE, new BlockingWaitStrategy())
    disruptor.handleEventsWith(TestHandler).`then`(ThenHandler)
    disruptor
  }

  val translator = new EventTranslatorOneArg[Event, Int]() {
    override def translateTo(event: Event, sequence: Long, arg: Int): Unit = {
      event.id = arg
      println(s"translator: ${event}, sequence: ${sequence}, arg: ${arg}")
    }
  }

  def main(args: Array[String]): Unit = {
    disruptor.start()
    (0 until 100).foreach { i =>
      disruptor.publishEvent(translator, i)
    }
    disruptor.shutdown()
  }
}

case class Event(var id: Int) {
  override def toString: String = s"event: ${id}"
}

object TestHandler extends EventHandler[Event] {
  override def onEvent(event: Event, sequence: Long, endOfBatch: Boolean): Unit = {
    println(s"${this.getClass.getSimpleName} ${System.nanoTime()} ${event}")
  }
}

object ThenHandler extends EventHandler[Event] {
  override def onEvent(event: Event, sequence: Long, endOfBatch: Boolean): Unit = {
    println(s"${this.getClass.getSimpleName} ${System.nanoTime()} ${event}")
  }
}
First, look at the Disruptor constructor.
public Disruptor(final EventFactory<T> eventFactory,
                 final int ringBufferSize,
                 final ThreadFactory threadFactory,
                 final ProducerType producerType,
                 final WaitStrategy waitStrategy) {
    this(RingBuffer.create(producerType, eventFactory, ringBufferSize, waitStrategy),
         new BasicExecutor(threadFactory));
}
Next, look at RingBuffer.create. It ultimately calls fill, which pre-populates every slot of the ring buffer with an instance returned by eventFactory.newInstance().
public static <E> RingBuffer<E> create(ProducerType producerType,
                                       EventFactory<E> factory,
                                       int bufferSize,
                                       WaitStrategy waitStrategy) {
    switch (producerType) {
        case SINGLE:
            return createSingleProducer(factory, bufferSize, waitStrategy);
        case MULTI:
            return createMultiProducer(factory, bufferSize, waitStrategy);
        default:
            throw new IllegalStateException(producerType.toString());
    }
}

public static <E> RingBuffer<E> createSingleProducer(EventFactory<E> factory,
                                                     int bufferSize,
                                                     WaitStrategy waitStrategy) {
    SingleProducerSequencer sequencer = new SingleProducerSequencer(bufferSize, waitStrategy);
    return new RingBuffer<E>(factory, sequencer);
}

RingBufferFields(EventFactory<E> eventFactory, Sequencer sequencer) {
    this.sequencer = sequencer;
    this.bufferSize = sequencer.getBufferSize();

    if (bufferSize < 1) {
        throw new IllegalArgumentException("bufferSize must not be less than 1");
    }
    if (Integer.bitCount(bufferSize) != 1) {
        throw new IllegalArgumentException("bufferSize must be a power of 2");
    }

    this.indexMask = bufferSize - 1;
    this.entries = new Object[sequencer.getBufferSize() + 2 * BUFFER_PAD];
    fill(eventFactory);
}

private void fill(EventFactory<E> eventFactory) {
    for (int i = 0; i < bufferSize; i++) {
        entries[BUFFER_PAD + i] = eventFactory.newInstance();
    }
}
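To make the power-of-2 check and indexMask concrete, here is a minimal standard-library sketch of a pre-allocated ring. PreallocatedRing and its method names are made up for this illustration and the padding (BUFFER_PAD) is left out; it is not the LMAX implementation, only the same idea in a few lines.

```java
import java.util.function.Supplier;

/** Minimal sketch of a pre-allocated ring; names are hypothetical, not Disruptor's API. */
final class PreallocatedRing<E> {
    private final Object[] entries;
    private final int indexMask;

    PreallocatedRing(Supplier<E> factory, int bufferSize) {
        if (bufferSize < 1 || Integer.bitCount(bufferSize) != 1) {
            throw new IllegalArgumentException("bufferSize must be a power of 2");
        }
        this.indexMask = bufferSize - 1;
        this.entries = new Object[bufferSize];
        for (int i = 0; i < bufferSize; i++) {
            entries[i] = factory.get();   // every slot is created once, up front
        }
    }

    /** Maps an ever-increasing sequence onto a slot; same result as sequence % bufferSize. */
    int indexOf(long sequence) {
        return (int) (sequence & indexMask);
    }

    @SuppressWarnings("unchecked")
    E get(long sequence) {
        return (E) entries[indexOf(sequence)];
    }

    public static void main(String[] args) {
        PreallocatedRing<int[]> ring = new PreallocatedRing<int[]>(() -> new int[] {-1}, 4);
        System.out.println(ring.indexOf(5));             // 1, because 5 & 3 == 1
        System.out.println(ring.get(1) == ring.get(5));  // true: sequences 1 and 5 share a slot
    }
}
```

Because the size is a power of 2, the mask replaces a modulo operation, and because the slots are filled once, publishing later only mutates existing objects instead of allocating new ones.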
Now look at disruptor.start(), the entry point for consuming events.
private final ConsumerRepository<T> consumerRepository = new ConsumerRepository<>();

public RingBuffer<T> start() {
    checkOnlyStartedOnce();
    for (final ConsumerInfo consumerInfo : consumerRepository) {
        consumerInfo.start(executor);
    }
    return ringBuffer;
}
consumerRepository is populated by disruptor.handleEventsWith(TestHandler), which also builds the event-processing chain.
public final EventHandlerGroup<T> handleEventsWith(final EventHandler<? super T>... handlers) {
    return createEventProcessors(new Sequence[0], handlers);
}

EventHandlerGroup<T> createEventProcessors(final Sequence[] barrierSequences,
                                           final EventHandler<? super T>[] eventHandlers) {
    checkNotStarted();

    final Sequence[] processorSequences = new Sequence[eventHandlers.length];
    final SequenceBarrier barrier = ringBuffer.newBarrier(barrierSequences);

    for (int i = 0, eventHandlersLength = eventHandlers.length; i < eventHandlersLength; i++) {
        final EventHandler<? super T> eventHandler = eventHandlers[i];
        final BatchEventProcessor<T> batchEventProcessor =
            new BatchEventProcessor<>(ringBuffer, barrier, eventHandler);

        if (exceptionHandler != null) {
            batchEventProcessor.setExceptionHandler(exceptionHandler);
        }

        consumerRepository.add(batchEventProcessor, eventHandler, barrier);
        processorSequences[i] = batchEventProcessor.getSequence();
    }

    updateGatingSequencesForNextInChain(barrierSequences, processorSequences);

    return new EventHandlerGroup<>(this, consumerRepository, processorSequences);
}
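Back in disruptor.start(), look at consumerInfo.start(executor). Here executor = new BasicExecutor(threadFactory), and BasicExecutor creates a new thread every time it executes a task. Because the number of consumers registered in consumerRepository is bounded, creating one thread per consumer is not a problem.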
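The thread-per-task behavior described above can be sketched with the standard library alone. ThreadPerTaskExecutor is a name invented for this example; it only mirrors the idea of BasicExecutor (one new thread per execute call) and is not the library class.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.ThreadFactory;

/** Sketch of a thread-per-task executor; the class name is hypothetical. */
final class ThreadPerTaskExecutor implements Executor {
    private final ThreadFactory factory;
    private final Queue<Thread> threads = new ConcurrentLinkedQueue<>();

    ThreadPerTaskExecutor(ThreadFactory factory) {
        this.factory = factory;
    }

    @Override
    public void execute(Runnable command) {
        Thread thread = factory.newThread(command);   // one new thread per call, no pooling
        if (thread == null) {
            throw new RuntimeException("Failed to create thread to run: " + command);
        }
        thread.start();
        threads.add(thread);
    }

    int threadCount() {
        return threads.size();
    }

    /** Waits for every started thread to finish. */
    void joinAll() {
        try {
            for (Thread t : threads) {
                t.join();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        ThreadPerTaskExecutor executor = new ThreadPerTaskExecutor(Thread::new);
        executor.execute(() -> System.out.println("consumer A on " + Thread.currentThread().getName()));
        executor.execute(() -> System.out.println("consumer B on " + Thread.currentThread().getName()));
        executor.joinAll();
        System.out.println(executor.threadCount());   // 2: one thread per submitted task
    }
}
```

With a fixed, small set of long-running consumers this is reasonable; it would be a poor fit for many short tasks, which is why the bounded size of consumerRepository matters.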
public Disruptor(final EventFactory<T> eventFactory,
                 final int ringBufferSize,
                 final ThreadFactory threadFactory,
                 final ProducerType producerType,
                 final WaitStrategy waitStrategy) {
    this(RingBuffer.create(producerType, eventFactory, ringBufferSize, waitStrategy),
         new BasicExecutor(threadFactory));
}

private Disruptor(final RingBuffer<T> ringBuffer, final Executor executor) {
    this.ringBuffer = ringBuffer;
    this.executor = executor;
}

// consumerInfo.start(executor), as invoked from Disruptor.start()
@Override
public void start(final java.util.concurrent.Executor executor) {
    // EventProcessor extends Runnable
    // executor = BasicExecutor
    executor.execute(eventprocessor);
}

public final class BatchEventProcessor<T> implements EventProcessor {
    @Override
    public void run() {
        if (running.compareAndSet(IDLE, RUNNING)) {
            sequenceBarrier.clearAlert();
            notifyStart();
            try {
                if (running.get() == RUNNING) {
                    processEvents();
                }
            } finally {
                notifyShutdown();
                running.set(IDLE);
            }
        } else {
            if (running.get() == RUNNING) {
                throw new IllegalStateException("Thread is already running");
            } else {
                earlyExit();
            }
        }
    }

    private void processEvents() {
        T event = null;
        long nextSequence = sequence.get() + 1L;

        while (true) {
            try {
                final long availableSequence = sequenceBarrier.waitFor(nextSequence);
                if (batchStartAware != null) {
                    batchStartAware.onBatchStart(availableSequence - nextSequence + 1);
                }

                // Handle every available event in one batch.
                while (nextSequence <= availableSequence) {
                    event = dataProvider.get(nextSequence);
                    eventHandler.onEvent(event, nextSequence, nextSequence == availableSequence);
                    nextSequence++;
                }

                sequence.set(availableSequence);
            } catch (final TimeoutException e) {
                notifyTimeout(sequence.get());
            } catch (final AlertException ex) {
                if (running.get() != RUNNING) {
                    break;
                }
            } catch (final Throwable ex) {
                exceptionHandler.handleEventException(ex, nextSequence, event);
                sequence.set(nextSequence);
                nextSequence++;
            }
        }
    }
}
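executor.execute, that is BasicExecutor.execute(eventprocessor), runs the event processor asynchronously on its own thread, which means it invokes BatchEventProcessor.run.
That raises a question: since the processors run asynchronously, how do multiple eventHandlers process events in order?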
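Let's follow the logic of processEvents.
First it reads BatchEventProcessor.sequence and adds 1 to get nextSequence.
Then it calls sequenceBarrier.waitFor, which delegates to WaitStrategy.waitFor, to obtain the availableSequence.
Start with the implementation of BlockingWaitStrategy.waitFor.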
public long waitFor(long sequence, Sequence cursorSequence, Sequence dependentSequence, SequenceBarrier barrier)
    throws AlertException, InterruptedException {
    long availableSequence;
    if (cursorSequence.get() < sequence) {
        lock.lock();
        try {
            while (cursorSequence.get() < sequence) {
                barrier.checkAlert();
                processorNotifyCondition.await();
            }
        } finally {
            lock.unlock();
        }
    }

    while ((availableSequence = dependentSequence.get()) < sequence) {
        barrier.checkAlert();
    }

    return availableSequence;
}
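If cursorSequence (the ring buffer's published index) is less than sequence (the index the batchEventProcessor wants next), the batchEventProcessor blocks on the condition until the producer publishes. Once the cursor has caught up, it spins until dependentSequence reaches sequence and returns that value as availableSequence. The batchEventProcessor then processes every event up to and including availableSequence in one batch and updates its own sequence.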
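The wait-then-drain pattern can be sketched without the library. In the example below, BlockingCursorSketch and produceAndConsume are hypothetical names, an AtomicLong stands in for Sequence, and alert handling and the dependentSequence spin are omitted; it is a simplified illustration of the blocking path, not the BlockingWaitStrategy source.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/** Sketch of a lock/condition based wait on a cursor; names are hypothetical. */
final class BlockingCursorSketch {
    private final Lock lock = new ReentrantLock();
    private final Condition published = lock.newCondition();
    private final AtomicLong cursor = new AtomicLong(-1);

    /** Producer side: advance the cursor, then wake any blocked consumer. */
    void publish(long sequence) {
        cursor.set(sequence);
        lock.lock();
        try {
            published.signalAll();
        } finally {
            lock.unlock();
        }
    }

    /** Consumer side: block until the cursor reaches sequence, then return the latest cursor. */
    long waitFor(long sequence) throws InterruptedException {
        if (cursor.get() < sequence) {
            lock.lock();
            try {
                while (cursor.get() < sequence) {
                    published.await();
                }
            } finally {
                lock.unlock();
            }
        }
        return cursor.get();
    }

    /** Publishes 0..last from a second thread and drains them in batches on the caller's thread. */
    static List<Long> produceAndConsume(long last) {
        BlockingCursorSketch barrier = new BlockingCursorSketch();
        Thread producer = new Thread(() -> {
            for (long s = 0; s <= last; s++) {
                barrier.publish(s);
            }
        });
        producer.start();

        List<Long> handled = new ArrayList<>();
        long next = 0;
        try {
            while (next <= last) {
                long available = barrier.waitFor(next);
                while (next <= available) {   // drain the whole batch without waiting again
                    handled.add(next++);
                }
            }
            producer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return handled;
    }

    public static void main(String[] args) {
        System.out.println(produceAndConsume(9));   // [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
    }
}
```

The batch sizes vary from run to run depending on thread timing, but the handled sequences always come out in order, because the consumer never reads past the value returned by waitFor.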
dependentSequence is initialized in the ProcessingSequenceBarrier constructor.
final class ProcessingSequenceBarrier implements SequenceBarrier {
    private final WaitStrategy waitStrategy;
    private final Sequence dependentSequence;
    private volatile boolean alerted = false;
    private final Sequence cursorSequence;
    private final Sequencer sequencer;

    ProcessingSequenceBarrier(final Sequencer sequencer,
                              final WaitStrategy waitStrategy,
                              final Sequence cursorSequence,
                              final Sequence[] dependentSequences) {
        this.sequencer = sequencer;
        this.waitStrategy = waitStrategy;
        this.cursorSequence = cursorSequence;
        if (0 == dependentSequences.length) {
            dependentSequence = cursorSequence;
        } else {
            dependentSequence = new FixedSequenceGroup(dependentSequences);
        }
    }
}
The ProcessingSequenceBarrier is created in Disruptor.createEventProcessors:
final SequenceBarrier barrier = ringBuffer.newBarrier(barrierSequences)
createEventProcessors is called only from Disruptor.handleEventsWith and EventHandlerGroup.handleEventsWith.
public class Disruptor<T> {
    public final EventHandlerGroup<T> handleEventsWith(final EventHandler<? super T>... handlers) {
        return createEventProcessors(new Sequence[0], handlers);
    }

    EventHandlerGroup<T> createEventProcessors(final Sequence[] barrierSequences,
                                               final EventHandler<? super T>[] eventHandlers) {
        checkNotStarted();

        final Sequence[] processorSequences = new Sequence[eventHandlers.length];
        final SequenceBarrier barrier = ringBuffer.newBarrier(barrierSequences);

        for (int i = 0, eventHandlersLength = eventHandlers.length; i < eventHandlersLength; i++) {
            final EventHandler<? super T> eventHandler = eventHandlers[i];
            final BatchEventProcessor<T> batchEventProcessor =
                new BatchEventProcessor<>(ringBuffer, barrier, eventHandler);

            if (exceptionHandler != null) {
                batchEventProcessor.setExceptionHandler(exceptionHandler);
            }

            consumerRepository.add(batchEventProcessor, eventHandler, barrier);
            processorSequences[i] = batchEventProcessor.getSequence();
        }

        updateGatingSequencesForNextInChain(barrierSequences, processorSequences);

        return new EventHandlerGroup<>(this, consumerRepository, processorSequences);
    }
}

public class EventHandlerGroup<T> {
    private final Disruptor<T> disruptor;
    private final ConsumerRepository<T> consumerRepository;
    private final Sequence[] sequences;

    EventHandlerGroup(final Disruptor<T> disruptor,
                      final ConsumerRepository<T> consumerRepository,
                      final Sequence[] sequences) {
        this.disruptor = disruptor;
        this.consumerRepository = consumerRepository;
        this.sequences = Arrays.copyOf(sequences, sequences.length);
    }

    public final EventHandlerGroup<T> handleEventsWith(final EventHandler<? super T>... handlers) {
        return disruptor.createEventProcessors(sequences, handlers);
    }

    public final EventHandlerGroup<T> then(final EventHandler<? super T>... handlers) {
        return handleEventsWith(handlers);
    }
}
EventHandlerGroup keeps a copy of the sequences of the batchEventProcessors it was created with. In the demo, disruptor.handleEventsWith(TestHandler).then(ThenHandler) uses then to pass TestHandler's sequence into the barrier built for ThenHandler. ThenHandler therefore depends on TestHandler and handles each event only after TestHandler has handled it.
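The dependency between the two handlers can be illustrated with plain AtomicLong counters. In this sketch HandlerChainSketch, stage, and run are made-up names, busy-waiting with Thread.yield() stands in for the wait strategy, and each stage simply refuses to move past the sequence it depends on; it is an assumption-laden model of the chain, not Disruptor code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;

/** Sketch of two chained consumers gated by sequences; names are hypothetical. */
final class HandlerChainSketch {

    /** Builds a stage that handles 0..last but never passes the sequence it depends on. */
    static Thread stage(String name, AtomicLong dependsOn, AtomicLong own, long last, List<String> log) {
        return new Thread(() -> {
            long next = 0;
            while (next <= last) {
                long available = Math.min(dependsOn.get(), last);
                if (available < next) {
                    Thread.yield();            // upstream has not reached this sequence yet
                    continue;
                }
                while (next <= available) {
                    synchronized (log) {
                        log.add(name + ":" + next);
                    }
                    next++;
                }
                own.set(available);            // publish progress for downstream stages
            }
        }, name);
    }

    /** Runs producer -> first -> second and returns the order in which events were handled. */
    static List<String> run(long last) {
        List<String> log = new ArrayList<>();
        AtomicLong cursor = new AtomicLong(-1);      // producer's published sequence
        AtomicLong firstSeq = new AtomicLong(-1);
        AtomicLong secondSeq = new AtomicLong(-1);

        Thread first = stage("first", cursor, firstSeq, last, log);     // depends on the cursor
        Thread second = stage("second", firstSeq, secondSeq, last, log); // depends on first
        second.start();
        first.start();
        for (long s = 0; s <= last; s++) {
            cursor.set(s);
        }
        try {
            first.join();
            second.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return log;
    }

    public static void main(String[] args) {
        List<String> log = run(4);
        for (long s = 0; s <= 4; s++) {
            boolean ordered = log.indexOf("first:" + s) < log.indexOf("second:" + s);
            System.out.println(s + " ordered: " + ordered);   // always true
        }
    }
}
```

The two threads interleave freely, yet for any given sequence the first stage always logs before the second, which is the same guarantee then(ThenHandler) gives in the demo.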
Next, look at disruptor.publishEvent(translator, i).
It simply puts data into the ring buffer.
public <A> void publishEvent(EventTranslatorOneArg<E, A> translator, A arg0) {
    final long sequence = sequencer.next();
    translateAndPublish(translator, sequence, arg0);
}

private <A> void translateAndPublish(EventTranslatorOneArg<E, A> translator, long sequence, A arg0) {
    try {
        translator.translateTo(get(sequence), sequence, arg0);
    } finally {
        sequencer.publish(sequence);
    }
}

public E get(long sequence) {
    return elementAt(sequence);
}
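get(sequence) uses sequence (the ring buffer index) to fetch the pre-allocated object from the ring buffer array. The translator updates that object in place, so the slot now holds the new values. publish then advances the cursor, and waitStrategy.signalAllWhenBlocking() wakes any consumers that are blocked waiting for new events.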
protected final Sequence cursor = new Sequence(Sequencer.INITIAL_CURSOR_VALUE);

@Override
public void publish(long sequence) {
    cursor.set(sequence);
    waitStrategy.signalAllWhenBlocking();
}
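The claim, translate, publish sequence can be sketched for a single producer as follows. PublishSketch, its Event class, and cursorValue are hypothetical names, ObjIntConsumer stands in for the translator, and the sketch deliberately omits the check that keeps a producer from overwriting slots that slow consumers have not read yet.

```java
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.ObjIntConsumer;

/** Single-producer publish sketch; names are hypothetical and wrap protection is omitted. */
final class PublishSketch {
    static final class Event {
        int id = -1;
    }

    private final Event[] entries;
    private final int indexMask;
    private long nextValue = -1;                         // touched only by the single producer
    private final AtomicLong cursor = new AtomicLong(-1);

    PublishSketch(int bufferSize) {                      // bufferSize is assumed to be a power of 2
        entries = new Event[bufferSize];
        for (int i = 0; i < bufferSize; i++) {
            entries[i] = new Event();
        }
        indexMask = bufferSize - 1;
    }

    Event get(long sequence) {
        return entries[(int) (sequence & indexMask)];
    }

    void publishEvent(ObjIntConsumer<Event> translator, int arg) {
        long sequence = ++nextValue;                     // claim the next slot
        try {
            translator.accept(get(sequence), arg);       // mutate the existing object in place
        } finally {
            cursor.set(sequence);                        // make it visible even if the translator throws
        }
    }

    long cursorValue() {
        return cursor.get();
    }

    public static void main(String[] args) {
        PublishSketch ring = new PublishSketch(4);
        Event slot = ring.get(1);
        for (int i = 0; i < 6; i++) {
            ring.publishEvent((event, arg) -> event.id = arg, i);
        }
        System.out.println(ring.cursorValue());          // 5
        System.out.println(ring.get(5).id);              // 5
        System.out.println(slot == ring.get(5));         // true: the same object was reused
    }
}
```

Publishing in the finally block matches the library code shown above: the sequence has already been claimed, so it must be published even on failure or consumers waiting on it would stall.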
With the flow clear, let's look at the key concepts behind it.
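Ring buffer
Memory utilization is high: there is no fragmentation and almost no waste. At any given moment the memory being accessed is concentrated in one region, which adapts well to different systems and yields high performance. The physical layout is simple and uniform, so bugs such as out-of-bounds access and dangling pointers are less likely, and problems are easier to analyze and debug at the memory level. Systems built this way tend to stay robust.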
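CPU cache
When the CPU accesses main memory it has to wait, which leaves compute resources idle and lowers overall CPU throughput. Because memory accesses tend to concentrate on hot spots, placing a faster but more expensive (relative to main memory) layer of cache between the CPU and memory is very cost-effective.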