您好,登錄后才能下訂單哦!
Disruptor是一個高性能的異步處理框架,或者可以認為是最快的消息框架(輕量的JMS),也可以認為是一個觀察者模式的實現,或者事件監聽模式的實現
性能遠遠高于傳統的BlockingQueue容器
Disruptor使用觀察者模式,主動將消息發送給消費者,而不是等消費者從隊列中取,在無鎖的情況下, 實現queue(環形, RingBuffer)的并發操作, 性能遠高于BlockingQueue
環形數組結構
為了避免垃圾回收,使用數組,數組對處理器的緩存機制更加友好
數組長度為 2^n,通過位運算,加快定位速度,下標采用遞增的方式,不用擔心索引溢出
無鎖設計
每個生產者或者消費者線程,會先申請可以操作的元素在數組中的位置,申請到之后,直接在該位置寫入或者讀取數據
pom
<dependency>
<groupId>com.lmax</groupId>
<artifactId>disruptor</artifactId>
<version>3.2.1</version>
</dependency>
LongEvent
// 聲明一個Event來包含需要傳遞的數據
public class LongEvent {
private Long value;
public Long getValue() {
return value;
}
public void setValue(Long value) {
this.value = value;
}
}
LongEventFactory
// Event工廠
public class LongEventFactory implements EventFactory<LongEvent> {
public LongEvent newInstance() {
return new LongEvent();
}
}
LongEventHandler
// 事件消費者
public class LongEventHandler implements EventHandler<LongEvent> {
public void onEvent(LongEvent event, long sequence, boolean endOfBatch) throws Exception {
System.out.println("消費者:"+event.getValue());
}
}
LongEventProducer
public class LongEventProducer {
private RingBuffer<LongEvent> ringBuffer;
public LongEventProducer(RingBuffer<LongEvent> ringBuffer) {
this.ringBuffer = ringBuffer;
}
public void onData(ByteBuffer byteBuffer) {
// 獲取事件隊列下標位置
long sequence = ringBuffer.next();
try {
// 取出空隊列
LongEvent longEvent = ringBuffer.get(sequence);
// 賦值
longEvent.setValue(byteBuffer.getLong(0));
} catch (Exception e) {
e.printStackTrace();
} finally {
System.out.println("生產者發送數據。。。");
// 發送數據
ringBuffer.publish(sequence);
}
}
}
Main
public class Main {
public static void main(String[] args) {
// 創建可緩存線程池
ExecutorService executorService = Executors.newCachedThreadPool();
// 創建工廠
EventFactory eventFactory = new LongEventFactory();
// 創建ringBufferSize
int ringBufferSize = 1024 * 1024;
// 創建disruptor
// MULTI表示可以多個生產者
Disruptor<LongEvent> longEventDisruptor = new Disruptor<LongEvent>(eventFactory, ringBufferSize, executorService, ProducerType.MULTI, new YieldingWaitStrategy());
// 注冊消費者
longEventDisruptor.handleEventsWith(new LongEventHandler());
// 啟動
longEventDisruptor.start();
// 創建RingBuffer容器
RingBuffer<LongEvent> ringBuffer = longEventDisruptor.getRingBuffer();
// 創建生產者
LongEventProducer longEventProducer = new LongEventProducer(ringBuffer);
// 指定緩沖區大小
ByteBuffer byteBuffer = ByteBuffer.allocate(8);
for (int i = 0; i < 100; i++) {
byteBuffer.putLong(0, i);
longEventProducer.onData(byteBuffer);
}
executorService.shutdown();
longEventDisruptor.shutdown();
}
}
它是一個環(首尾相接的環),作用是存儲數據,實現不同線程之間的數據傳輸
RingBuffer 每塊區是擁有一個序號的,這個序號指向環形數組結構的下一個可用元素
隨著不斷地寫進了填充這個圓環,這個指針序號會不斷地遞增,直到繞過這個環
如果圓環滿了,它會將金數據覆蓋,如上圖:現在12的區域的下個區域目前是3,如果有新的數據到來,那么指針往下移的時候就會把區域3的數據給覆蓋變成13,框架提供了一系列幫助我們平行消費的監控,會很好的控制生產者和消費者之間的速度,從而達到生產和消費之間的平衡
RingBuffer 為什么效率高?
采用數組,數組支持索引訪問
數組的內存分配是預先加載的,一但指定大小創建后,就一直存在,這也意味著不需要花大量的時間做垃圾回收,而阻塞隊列采用鏈表實現,需要不斷的刪除、創建節點
Sequence:序號,聲明一個序號,用于跟蹤 ringbuffer 中任務的變化和消費者的消費情況
WaitStrategy:有多種實現,用以表示當無可消費事件時,消費者的等待策略
Event:消費事件
EventProcessor:事件處理器,監聽 RingBuffer 的事件,并消費可用事件,從 RingBuffer 讀取的事件會交由實際的生產者實現類來消費,它會一直偵聽下一個可用的序號,直到該序號對應的事件已經準備好
EventHandler:業務處理器,是實際消費者的接口,完成具體的業務邏輯實現,第三方實現該接口,代表著消費者
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。