您好,登錄后才能下訂單哦!
這篇文章主要介紹RocketMQ消費中Broker端處理邏輯的示例分析,文中介紹的非常詳細,具有一定的參考價值,感興趣的小伙伴們一定要看完!
1.Broker是如何處理消費流程的?
2.消費進度是如何流轉的?
說明:本文分析均為PUSH消費模式
本部分將消費的切分成三塊梳理:Broker消費處理流程概覽、查找消息流程、以及消息查詢結果處理流程。
小結:在拉取消息時會進行Broker和主題讀權限的判斷,實戰中若有必要可以封鎖Broker的拉取權限從而禁止從該broker進行消費;或者封鎖某主題的讀權限禁止消費組從該主題消費消息。
小結:如果需要從磁盤拉取消息則一次默認最多拉取8條,一次消息的消息大小最大為64K。如果從緩存中拉取默認最多32條,一次拉取的消息大小最大256K。使用tagcode會在查找消息前進行過濾,使用SQL92過濾再消息查找出來后進行過濾。
小結:建議開啟slaveReadEnable=true,當拉取的消息超過Broker內存40%時會從Slave節點消費,Master不必從磁盤重新讀取數據;transferMsgByHeap默認為true即消息先拉取到堆空間再返回到客戶端;如果設置為false則使用Netty#FileRegion,可用零字節拷貝不必再拷貝到堆內存提高性能。
//@1 順序消費/并發消費流程相同
//ConsumeMessageOrderlyService#processConsumeResult
//ConsumeMessageConcurrentlyService#processConsumeResult
if (offset >= 0 && !consumeRequest.getProcessQueue().isDropped()) {
//更新消費進度偏移量
this.defaultMQPushConsumerImpl.getOffsetStore().updateOffset(consumeRequest.getMessageQueue(), offset, true);
}
@2 RemoteBrokerOffsetStore#updateOffset
AtomicLong offsetOld = this.offsetTable.get(mq);
MixAll.compareAndIncreaseOnly(offsetOld, offset);
@3 offsetTable存儲結構:key為MessageQueue value為消費的偏移量進度
ConcurrentMap<MessageQueue, AtomicLong> offsetTable =
new ConcurrentHashMap<MessageQueue, AtomicLong>()
@4 定時同步消費進度
//持久化消息消費進度,默認5秒保存一次
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
MQClientInstance.this.persistAllConsumerOffset();
} catch (Exception e) {
log.error("ScheduledTask persistAllConsumerOffset exception", e);
}
}
}, 1000 * 10, this.clientConfig.getPersistConsumerOffsetInterval(), TimeUnit.MILLISECONDS);
@5 RemoteBrokerOffsetStore#persistAll
for (Map.Entry<MessageQueue, AtomicLong> entry : this.offsetTable.entrySet())
this.updateConsumeOffsetToBroker(mq, offset.get());
小結:PUSH消費中消費進度存儲在offsetTable中,定時任務每5秒鐘上報Broker一次。
//@1 ConsumerManageProcessor#processRequest#updateConsumerOffset
this.brokerController.getConsumerOffsetManager().commitOffset
//@2 ConsumerOffsetManager#commitOffset
String key = topic + TOPIC_GROUP_SEPARATOR + group;
this.commitOffset(clientHost, key, queueId, offset);
Long storeOffset = map.put(queueId, offset);
//@3 消費進度緩存結構
//key=topic@group
//value=ConcurrentMap<Integer/* queueId*/, Long/*offset*/>>
offsetTable = new ConcurrentHashMap<String, ConcurrentMap<Integer, Long>>(512);
//@4 5秒鐘一次存儲消費進度
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
BrokerController.this.consumerOffsetManager.persist();
} catch (Throwable e) {
log.error("schedule persist consumerOffset error.", e);
}
}
}, 1000 * 10, this.brokerConfig.getFlushConsumerOffsetInterval(), TimeUnit.MILLISECONDS);
//@5 consumerOffset.json文件格式
"zeus-package-mismatch-topic@autosort-packagelog":{0:9055300,1:9055157,2:9055304,3:9055232}
小結:Broker接到客戶端消費進度上報后更新緩存offsetTable,每隔5秒中定時任務將offsetTable消費進度存儲在磁盤文件consumerOffset.json中。
//@1 PullMessageProcessor#processRequest
if (storeOffsetEnable) {
//更新消費進度
this.brokerController.getConsumerOffsetManager().commitOffset(RemotingHelper.parseChannelRemoteAddr(channel),
requestHeader.getConsumerGroup(), requestHeader.getTopic(), requestHeader.getQueueId(), requestHeader.getCommitOffset());
}
小結:PUSH消費客戶端拉取消息后會實時更新消費的進度。
以上是“RocketMQ消費中Broker端處理邏輯的示例分析”這篇文章的所有內容,感謝各位的閱讀!希望分享的內容對大家有幫助,更多相關知識,歡迎關注億速云行業資訊頻道!
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。