This article looks at how the RocketMQ Broker implements a highly available, high-throughput message relay service, by walking through the relevant source code.
The broker's main job is to store messages, so the focus here is on how it handles them. Here are a few questions to keep in mind; the code that follows answers them.
How does the broker register itself with the NameServer at startup?
How are the messages sent by producers stored?
How do consumers pull data from the broker?
How is high availability achieved? If a broker goes down, the data must have a replica somewhere.
Registration happens at startup: the broker registers its information with every NameServer (the NameServer addresses can be configured at startup). The code is in org.apache.rocketmq.broker.out.BrokerOuterAPI#registerBrokerAll; unrelated code is omitted here.
public List<RegisterBrokerResult> registerBrokerAll(
    final String clusterName,
    final String brokerAddr,
    final String brokerName,
    final long brokerId,
    final String haServerAddr,
    final TopicConfigSerializeWrapper topicConfigWrapper,
    final List<String> filterServerList,
    final boolean oneway,
    final int timeoutMills,
    final boolean enableActingMaster,
    final boolean compressed,
    final Long heartbeatTimeoutMillis,
    final BrokerIdentity brokerIdentity) {

    final List<RegisterBrokerResult> registerBrokerResultList = new CopyOnWriteArrayList<>();
    List<String> nameServerAddressList = this.remotingClient.getAvailableNameSrvList();
    if (nameServerAddressList != null && nameServerAddressList.size() > 0) {

        final CountDownLatch countDownLatch = new CountDownLatch(nameServerAddressList.size());
        for (final String namesrvAddr : nameServerAddressList) {
            brokerOuterExecutor.execute(new AbstractBrokerRunnable(brokerIdentity) {
                @Override
                public void run2() {
                    try {
                        RegisterBrokerResult result = registerBroker(namesrvAddr, oneway, timeoutMills, requestHeader, body);
                        if (result != null) {
                            registerBrokerResultList.add(result);
                        }

                        LOGGER.info("Registering current broker to name server completed. TargetHost={}", namesrvAddr);
                    } catch (Exception e) {
                        LOGGER.error("Failed to register current broker to name server. TargetHost={}", namesrvAddr, e);
                    } finally {
                        countDownLatch.countDown();
                    }
                }
            });
        }

        try {
            if (!countDownLatch.await(timeoutMills, TimeUnit.MILLISECONDS)) {
                LOGGER.warn("Registration to one or more name servers does NOT complete within deadline. Timeout threshold: {}ms", timeoutMills);
            }
        } catch (InterruptedException ignore) {
        }
    }

    return registerBrokerResultList;
}
A CountDownLatch is used here to check whether registration with all the NameServers finishes before the timeout; if not, a warning is logged.
The storage design is described in detail in the official documentation; part of it is quoted below.
The message storage architecture is mainly made up of the following three kinds of storage-related files.
(1) CommitLog: the storage body for message content and metadata. It stores the message bodies written by producers, which are variable-length. A single file is 1 GB by default; the file name is 20 digits long, left-padded with zeros, with the remainder being the starting offset. For example, 00000000000000000000 is the first file, with starting offset 0 and size 1 GB = 1073741824 bytes; when it is full, the second file is 00000000001073741824, with starting offset 1073741824, and so on (see the small sketch after this list). Messages are appended to the log files sequentially, and when a file is full, writing rolls over to the next one.
(2) ConsumeQueue: the message consumption index, introduced mainly to improve consumption performance. As the index used when consuming, it records, for each queue of a topic, the starting physical offset of the message in the CommitLog, the message size, and the hash code of the message tag. The consumequeue files can be seen as topic-based index files over the commitlog, so the consumequeue directory is organized in three levels: topic/queue/file.
(3) IndexFile: the index file provides a way to look up messages by key or by time range. Index files are stored under $HOME/store/index/{fileName}, where fileName is the creation timestamp. A single IndexFile is about 400 MB and can hold 20 million index entries. Its underlying storage implements a HashMap-like structure on the file system, so RocketMQ's index file is essentially a hash index.
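To make the file-naming rule concrete, here is a minimal sketch in plain Java (not RocketMQ code; the constant and method names below are made up for illustration) that maps a global commitlog offset to the file that contains it:

public class CommitLogFileName {
    // 1 GB, matching the default commitlog file size mentioned above
    private static final long FILE_SIZE = 1024L * 1024 * 1024;

    // The file a given global commitlog offset falls into: its name is the
    // 20-digit, zero-padded starting offset of that file.
    static String fileNameFor(long globalOffset) {
        long fileStartOffset = globalOffset - (globalOffset % FILE_SIZE);
        return String.format("%020d", fileStartOffset);
    }

    public static void main(String[] args) {
        System.out.println(fileNameFor(0L));           // 00000000000000000000
        System.out.println(fileNameFor(1073741824L));  // 00000000001073741824
        System.out.println(fileNameFor(1500000000L));  // 00000000001073741824
    }
}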
The concrete requests are handled through Netty.
NettyRemotingAbstract#processRequestCommand looks up the concrete processor by request code.
Among them:
SendMessageProcessor handles send requests from producers;
PullMessageProcessor handles pull requests from consumers;
QueryMessageProcessor handles requests that query messages by key and the like.
Writes mainly happen in CommitLog#asyncPutMessage (reached via DefaultMessageStore#asyncPutMessage):
public CompletableFuture<PutMessageResult> asyncPutMessage(final MessageExtBrokerInner msg) {
    ......
    topicQueueLock.lock(topicQueueKey);
    try {

        boolean needAssignOffset = true;
        if (defaultMessageStore.getMessageStoreConfig().isDuplicationEnable()
            && defaultMessageStore.getMessageStoreConfig().getBrokerRole() != BrokerRole.SLAVE) {
            needAssignOffset = false;
        }
        if (needAssignOffset) {
            defaultMessageStore.assignOffset(msg, getMessageNum(msg));
        }

        PutMessageResult encodeResult = putMessageThreadLocal.getEncoder().encode(msg);
        if (encodeResult != null) {
            return CompletableFuture.completedFuture(encodeResult);
        }
        msg.setEncodedBuff(putMessageThreadLocal.getEncoder().getEncoderBuffer());
        PutMessageContext putMessageContext = new PutMessageContext(topicQueueKey);

        putMessageLock.lock(); //spin or ReentrantLock ,depending on store config
        try {
            long beginLockTimestamp = this.defaultMessageStore.getSystemClock().now();
            this.beginTimeInLock = beginLockTimestamp;

            // Here settings are stored timestamp, in order to ensure an orderly
            // global
            if (!defaultMessageStore.getMessageStoreConfig().isDuplicationEnable()) {
                msg.setStoreTimestamp(beginLockTimestamp);
            }

            if (null == mappedFile || mappedFile.isFull()) {
                // First obtain the mappedFile
                mappedFile = this.mappedFileQueue.getLastMappedFile(0); // Mark: NewFile may be cause noise
            }
            if (null == mappedFile) {
                log.error("create mapped file1 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString());
                beginTimeInLock = 0;
                return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.CREATE_MAPPED_FILE_FAILED, null));
            }

            // Write the message data
            result = mappedFile.appendMessage(msg, this.appendMessageCallback, putMessageContext);
            switch (result.getStatus()) {
                case PUT_OK:
                    onCommitLogAppend(msg, result, mappedFile);
                    break;
                case END_OF_FILE:
                    onCommitLogAppend(msg, result, mappedFile);
                    unlockMappedFile = mappedFile;
                    // Create a new file, re-write the message
                    mappedFile = this.mappedFileQueue.getLastMappedFile(0);
                    if (null == mappedFile) {
                        // XXX: warn and notify me
                        log.error("create mapped file2 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString());
                        beginTimeInLock = 0;
                        return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.CREATE_MAPPED_FILE_FAILED, result));
                    }
                    result = mappedFile.appendMessage(msg, this.appendMessageCallback, putMessageContext);
                    if (AppendMessageStatus.PUT_OK.equals(result.getStatus())) {
                        onCommitLogAppend(msg, result, mappedFile);
                    }
                    break;
                case MESSAGE_SIZE_EXCEEDED:
                case PROPERTIES_SIZE_EXCEEDED:
                    beginTimeInLock = 0;
                    return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, result));
                case UNKNOWN_ERROR:
                    beginTimeInLock = 0;
                    return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result));
                default:
                    beginTimeInLock = 0;
                    return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result));
            }

            elapsedTimeInLock = this.defaultMessageStore.getSystemClock().now() - beginLockTimestamp;
            beginTimeInLock = 0;
        } finally {
            putMessageLock.unlock();
        }
    } finally {
        topicQueueLock.unlock(topicQueueKey);
    }

    if (elapsedTimeInLock > 500) {
        log.warn("[NOTIFYME]putMessage in lock cost time(ms)={}, bodyLength={} AppendMessageResult={}", elapsedTimeInLock, msg.getBody().length, result);
    }

    if (null != unlockMappedFile && this.defaultMessageStore.getMessageStoreConfig().isWarmMapedFileEnable()) {
        this.defaultMessageStore.unlockMappedFile(unlockMappedFile);
    }

    PutMessageResult putMessageResult = new PutMessageResult(PutMessageStatus.PUT_OK, result);

    // Statistics
    storeStatsService.getSinglePutMessageTopicTimesTotal(msg.getTopic()).add(result.getMsgNum());
    storeStatsService.getSinglePutMessageTopicSizeTotal(topic).add(result.getWroteBytes());

    // Flush strategy and HA handling
    return handleDiskFlushAndHA(putMessageResult, msg, needAckNums, needHandleHA);
}
The first step is getting a mappedFile, which can be thought of as a memory mapping of a commitlog file. When a mappedFile is created, the allocation service creates two files ahead of time (the one needed now and the next one), so there is no wait for file creation at the next rollover.
org.apache.rocketmq.store.AllocateMappedFileService#mmapOperation
private boolean mmapOperation() {
    boolean isSuccess = false;
    AllocateRequest req = null;
    try {
        req = this.requestQueue.take();
        AllocateRequest expectedRequest = this.requestTable.get(req.getFilePath());
        if (null == expectedRequest) {
            log.warn("this mmap request expired, maybe cause timeout " + req.getFilePath() + " "
                + req.getFileSize());
            return true;
        }
        if (expectedRequest != req) {
            log.warn("never expected here, maybe cause timeout " + req.getFilePath() + " "
                + req.getFileSize() + ", req:" + req + ", expectedRequest:" + expectedRequest);
            return true;
        }

        if (req.getMappedFile() == null) {
            long beginTime = System.currentTimeMillis();

            MappedFile mappedFile;
            if (messageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {
                try {
                    mappedFile = ServiceLoader.load(MappedFile.class).iterator().next();
                    mappedFile.init(req.getFilePath(), req.getFileSize(), messageStore.getTransientStorePool());
                } catch (RuntimeException e) {
                    log.warn("Use default implementation.");
                    mappedFile = new DefaultMappedFile(req.getFilePath(), req.getFileSize(), messageStore.getTransientStorePool());
                }
            } else {
                mappedFile = new DefaultMappedFile(req.getFilePath(), req.getFileSize());
            }

            long elapsedTime = UtilAll.computeElapsedTimeMilliseconds(beginTime);
            if (elapsedTime > 10) {
                int queueSize = this.requestQueue.size();
                log.warn("create mappedFile spent time(ms) " + elapsedTime + " queue size " + queueSize
                    + " " + req.getFilePath() + " " + req.getFileSize());
            }

            // pre write mappedFile
            if (mappedFile.getFileSize() >= this.messageStore.getMessageStoreConfig()
                .getMappedFileSizeCommitLog()
                && this.messageStore.getMessageStoreConfig().isWarmMapedFileEnable()) {
                mappedFile.warmMappedFile(this.messageStore.getMessageStoreConfig().getFlushDiskType(),
                    this.messageStore.getMessageStoreConfig().getFlushLeastPagesWhenWarmMapedFile());
            }

            req.setMappedFile(mappedFile);
            this.hasException = false;
            isSuccess = true;
        }
    } catch (InterruptedException e) {
        log.warn(this.getServiceName() + " interrupted, possibly by shutdown.");
        this.hasException = true;
        return false;
    } catch (IOException e) {
        log.warn(this.getServiceName() + " service has exception. ", e);
        this.hasException = true;
        if (null != req) {
            requestQueue.offer(req);
            try {
                Thread.sleep(1);
            } catch (InterruptedException ignored) {
            }
        }
    } finally {
        if (req != null && isSuccess)
            req.getCountDownLatch().countDown();
    }
    return true;
}
The mappedFile is initialized here:
org.apache.rocketmq.store.logfile.DefaultMappedFile#init
private void init(final String fileName, final int fileSize) throws IOException {
    ......
    try {
        this.fileChannel = new RandomAccessFile(this.file, "rw").getChannel();
        this.mappedByteBuffer = this.fileChannel.map(MapMode.READ_WRITE, 0, fileSize);
        TOTAL_MAPPED_VIRTUAL_MEMORY.addAndGet(fileSize);
        TOTAL_MAPPED_FILES.incrementAndGet();
        ok = true;
    } catch (FileNotFoundException e) {
        log.error("Failed to create file " + this.fileName, e);
        throw e;
    } catch (IOException e) {
        log.error("Failed to map file " + this.fileName, e);
        throw e;
    } finally {
        if (!ok && this.fileChannel != null) {
            this.fileChannel.close();
        }
    }
}
This is essentially using Java's FileChannel.map (mmap) to map the file into memory.
If the off-heap transient store pool is enabled, writes go through writeBuffer, while reads still go through mappedByteBuffer.
@Override
public void init(final String fileName, final int fileSize,
    final TransientStorePool transientStorePool) throws IOException {
    init(fileName, fileSize);
    this.writeBuffer = transientStorePool.borrowBuffer();
    this.transientStorePool = transientStorePool;
}
After the mappedFile is created, there is also a warm-up step.
public void warmMappedFile(FlushDiskType type, int pages) {
    this.mappedByteBufferAccessCountSinceLastSwap++;

    long beginTime = System.currentTimeMillis();
    ByteBuffer byteBuffer = this.mappedByteBuffer.slice();
    int flush = 0;
    long time = System.currentTimeMillis();
    // Write a zero byte into every OS page of the 1 GB file so the OS actually allocates
    // physical memory. Without touching the pages the OS would not allocate them, and page
    // faults would occur later when messages are written.
    for (int i = 0, j = 0; i < this.fileSize; i += DefaultMappedFile.OS_PAGE_SIZE, j++) {
        byteBuffer.put(i, (byte) 0);
        // force flush when flush disk type is sync
        if (type == FlushDiskType.SYNC_FLUSH) {
            if ((i / OS_PAGE_SIZE) - (flush / OS_PAGE_SIZE) >= pages) {
                flush = i;
                mappedByteBuffer.force();
            }
        }

        // Sleep briefly every so often so that other threads (including GC threads) get a
        // chance to run in the middle of this long loop, instead of GC having to wait until
        // the loop finishes.
        // prevent gc
        if (j % 1000 == 0) {
            log.info("j={}, costTime={}", j, System.currentTimeMillis() - time);
            time = System.currentTimeMillis();
            try {
                Thread.sleep(0);
            } catch (InterruptedException e) {
                log.error("Interrupted", e);
            }
        }
    }

    // force flush when prepare load finished
    if (type == FlushDiskType.SYNC_FLUSH) {
        log.info("mapped file warm-up done, force to disk, mappedFile={}, costTime={}",
            this.getFileName(), System.currentTimeMillis() - beginTime);
        mappedByteBuffer.force();
    }
    log.info("mapped file warm-up done. mappedFile={}, costTime={}", this.getFileName(),
        System.currentTimeMillis() - beginTime);
    this.mlock();
}
An mmap mapping only establishes the relationship between the process's virtual addresses and physical memory; it does not load the pages into the page cache. If a read or write misses the page cache, a page fault occurs and the data has to be loaded from disk, which hurts read/write performance. To avoid page faults and to stop the OS from moving the relevant pages out to swap space, RocketMQ warms the file up, loading the corresponding page cache into memory ahead of time.
The loop also sleeps every so often so that GC gets a chance to run. ChatGPT's answer, copied here for reference:
The if (j % 1000 == 0) statement is there to avoid problems with GC. In every iteration where j is a multiple of 1000, Thread.sleep(0) is executed, which briefly yields the current thread and gives the JVM a chance to reclaim objects that are no longer in use. The point is to keep this long loop from holding up GC, which helps the program's performance.
Finally, there is a locking step:
public void mlock() {
    final long beginTime = System.currentTimeMillis();
    final long address = ((DirectBuffer) (this.mappedByteBuffer)).address();
    Pointer pointer = new Pointer(address);
    {
        // Use the mlock system call to pin this file's page cache in memory so it cannot be
        // swapped out.
        int ret = LibC.INSTANCE.mlock(pointer, new NativeLong(this.fileSize));
        log.info("mlock {} {} {} ret = {} time consuming = {}", address, this.fileName, this.fileSize, ret, System.currentTimeMillis() - beginTime);
    }

    {
        // Use the madvise system call to advise the OS that this file will be accessed in the
        // near future.
        int ret = LibC.INSTANCE.madvise(pointer, new NativeLong(this.fileSize), LibC.MADV_WILLNEED);
        log.info("madvise {} {} {} ret = {} time consuming = {}", address, this.fileName, this.fileSize, ret, System.currentTimeMillis() - beginTime);
    }
}
Then the message is written into the mappedFile, i.e. the encoded bytes are appended through the buffer.
Next comes handling of the flush policy and high availability.
org.apache.rocketmq.store.CommitLog#handleDiskFlushAndHA
private CompletableFuture<PutMessageResult> handleDiskFlushAndHA(PutMessageResult putMessageResult,
    MessageExt messageExt, int needAckNums, boolean needHandleHA) {
    // Handle the flush policy
    CompletableFuture<PutMessageStatus> flushResultFuture = handleDiskFlush(putMessageResult.getAppendMessageResult(), messageExt);
    CompletableFuture<PutMessageStatus> replicaResultFuture;
    if (!needHandleHA) {
        replicaResultFuture = CompletableFuture.completedFuture(PutMessageStatus.PUT_OK);
    } else {
        // Handle HA (replication to slaves)
        replicaResultFuture = handleHA(putMessageResult.getAppendMessageResult(), putMessageResult, needAckNums);
    }

    return flushResultFuture.thenCombine(replicaResultFuture, (flushStatus, replicaStatus) -> {
        if (flushStatus != PutMessageStatus.PUT_OK) {
            putMessageResult.setPutMessageStatus(flushStatus);
        }
        if (replicaStatus != PutMessageStatus.PUT_OK) {
            putMessageResult.setPutMessageStatus(replicaStatus);
        }
        return putMessageResult;
    });
}
Handling the flush:
org.apache.rocketmq.store.CommitLog.DefaultFlushManager#handleDiskFlush
@Override
public CompletableFuture<PutMessageStatus> handleDiskFlush(AppendMessageResult result, MessageExt messageExt) {
    // Synchronization flush
    if (FlushDiskType.SYNC_FLUSH == CommitLog.this.defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {
        final GroupCommitService service = (GroupCommitService) this.flushCommitLogService;
        if (messageExt.isWaitStoreMsgOK()) {
            GroupCommitRequest request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes(),
                CommitLog.this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout());
            flushDiskWatcher.add(request);
            service.putRequest(request);
            return request.future();
        } else {
            service.wakeup();
            return CompletableFuture.completedFuture(PutMessageStatus.PUT_OK);
        }
    }
    // Asynchronous flush
    else {
        if (!CommitLog.this.defaultMessageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {
            flushCommitLogService.wakeup();
        } else {
            commitLogService.wakeup();
        }
        return CompletableFuture.completedFuture(PutMessageStatus.PUT_OK);
    }
}
Depending on whether synchronous or asynchronous flush is configured, the corresponding flush strategy is applied.
Handling high availability:
org.apache.rocketmq.store.CommitLog#handleHA
private CompletableFuture<PutMessageStatus> handleHA(AppendMessageResult result, PutMessageResult putMessageResult,
    int needAckNums) {
    if (needAckNums >= 0 && needAckNums <= 1) {
        return CompletableFuture.completedFuture(PutMessageStatus.PUT_OK);
    }

    HAService haService = this.defaultMessageStore.getHaService();

    long nextOffset = result.getWroteOffset() + result.getWroteBytes();

    // Wait enough acks from different slaves
    GroupCommitRequest request = new GroupCommitRequest(nextOffset, this.defaultMessageStore.getMessageStoreConfig().getSlaveTimeout(), needAckNums);
    haService.putRequest(request);
    haService.getWaitNotifyObject().wakeupAll();
    return request.future();
}
In fact a background replication thread keeps synchronizing messages: comparing the commitLog offsets of the master and the slave tells you exactly how much data the slave is missing, so all that's needed is to push the missing part over to the slave. I'll cover this in more detail in a separate article.
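As a rough illustration of that offset bookkeeping, here is a hand-written sketch (not the real HAService/GroupCommitRequest code; all names below are invented):

class ReplicationSketch {
    // How many bytes of commitlog the slave still has to catch up on.
    static long bytesBehind(long masterMaxOffset, long slaveAckOffset) {
        return Math.max(0L, masterMaxOffset - slaveAckOffset);
    }

    // A message written at nextOffset counts as replicated once enough brokers hold it:
    // the master itself counts as one ack, plus every slave whose acked offset has
    // reached nextOffset.
    static boolean enoughAcks(long nextOffset, long[] slaveAckedOffsets, int needAckNums) {
        int acks = 1; // the master's own copy
        for (long slaveOffset : slaveAckedOffsets) {
            if (slaveOffset >= nextOffset) {
                acks++;
            }
        }
        return acks >= needAckNums;
    }
}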
That leaves one more question: how are the ConsumeQueue and IndexFile built?
The ReputMessageService reads data from the commitLog and writes it into the ConsumeQueue and IndexFile.
Each dispatcher handles one of the two files; I won't go into every detail here.
The ConsumeQueue is handled in:
org.apache.rocketmq.store.DefaultMessageStore.CommitLogDispatcherBuildConsumeQueue#dispatch
The file name is effectively topic/queueId, and each entry written looks like this:
this.byteBufferIndex.flip();
this.byteBufferIndex.limit(CQ_STORE_UNIT_SIZE);
this.byteBufferIndex.putLong(offset);
this.byteBufferIndex.putInt(size);
this.byteBufferIndex.putLong(tagsCode);
So an entry is essentially a commitLog offset (plus size and tag hash code); with these values the full message can be retrieved.
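As a small sketch of the entry layout (plain Java, not the ConsumeQueue class itself): each unit is a fixed 20 bytes, commitLog offset (8) + size (4) + tagsCode (8), so the N-th message of a queue sits at byte position N * 20 in its consumequeue files.

import java.nio.ByteBuffer;

class ConsumeQueueEntrySketch {
    static final int CQ_STORE_UNIT_SIZE = 20;

    final long commitLogOffset;
    final int size;
    final long tagsCode;

    ConsumeQueueEntrySketch(long commitLogOffset, int size, long tagsCode) {
        this.commitLogOffset = commitLogOffset;
        this.size = size;
        this.tagsCode = tagsCode;
    }

    // consumeQueueOffset is the logical index of the message within the queue
    // (the position a consumer tracks between pulls).
    static ConsumeQueueEntrySketch readAt(ByteBuffer consumeQueueData, long consumeQueueOffset) {
        int pos = (int) (consumeQueueOffset * CQ_STORE_UNIT_SIZE);
        long offset = consumeQueueData.getLong(pos);
        int size = consumeQueueData.getInt(pos + 8);
        long tagsCode = consumeQueueData.getLong(pos + 12);
        return new ConsumeQueueEntrySketch(offset, size, tagsCode);
    }
}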
The IndexFile is built in
org.apache.rocketmq.store.DefaultMessageStore.CommitLogDispatcherBuildIndex
which writes the following data:
this.mappedByteBuffer.putInt(absIndexPos, keyHash);
this.mappedByteBuffer.putLong(absIndexPos + 4, phyOffset);
this.mappedByteBuffer.putInt(absIndexPos + 4 + 8, (int) timeDiff);
this.mappedByteBuffer.putInt(absIndexPos + 4 + 8 + 4, slotValue);

this.mappedByteBuffer.putInt(absSlotPos, this.indexHeader.getIndexCount());
That includes the key's hash, the physical offset, and timing information. Index files are named by their creation timestamp, so they are naturally ordered by time. For a key lookup, the write position was derived from the key's hash, so the matching slot can be located immediately.
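Here is a sketch of the position arithmetic behind those puts, assuming the usual IndexFile layout of a 40-byte header, followed by hashSlotNum 4-byte hash slots, followed by 20-byte index entries (illustrative only, not the real IndexFile class):

class IndexFilePositionSketch {
    static final int HEADER_SIZE = 40;
    static final int HASH_SLOT_SIZE = 4;
    static final int INDEX_ENTRY_SIZE = 20; // keyHash(4) + phyOffset(8) + timeDiff(4) + prevSlotValue(4)

    // Which hash slot a key falls into.
    static int slotPos(String key, int hashSlotNum) {
        int keyHash = key.hashCode() & 0x7fffffff; // keep the hash non-negative
        return keyHash % hashSlotNum;
    }

    // Absolute byte position of that slot inside the file (absSlotPos in the snippet above).
    static long absSlotPos(String key, int hashSlotNum) {
        return HEADER_SIZE + (long) slotPos(key, hashSlotNum) * HASH_SLOT_SIZE;
    }

    // Absolute byte position where the next index entry goes, given how many
    // entries have already been written (absIndexPos in the snippet above).
    static long absIndexPos(int hashSlotNum, int indexCount) {
        return HEADER_SIZE + (long) hashSlotNum * HASH_SLOT_SIZE + (long) indexCount * INDEX_ENTRY_SIZE;
    }
}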
That's about it for data storage. Now let's look at how messages are read.
Pulling messages has its own processor:
org.apache.rocketmq.broker.processor.PullMessageProcessor#processRequest
There is quite a bit of extra logic in there; the actual read happens in the following method:
org.apache.rocketmq.store.DefaultMessageStore#getMessage
Reading a message is straightforward: based on the topic and queueId, the broker reads the ConsumeQueue. The consumer knows where its last pull ended, so the broker reads the ConsumeQueue entries starting at that offset. Each entry stores the commitLog offset and the message size; with those two values the message is fetched from the commitLog and returned. The next offset is then computed and handed back to the consumer.
Queries by message key go through:
org.apache.rocketmq.store.DefaultMessageStore#queryMessage
This mainly queries the IndexFile. As mentioned earlier, index files are named by creation time, so the files matching the time range are selected first. Then the key's hash is used to find the corresponding slot in the file; since hashes can collide, the slot's chain of entries is walked one by one to collect every entry with a matching hash. Finally, the offsets recorded in the IndexFile are used to fetch the messages from the commitLog.
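A conceptual sketch of that traversal (not the real IndexFile#selectPhyOffset; the in-memory structures below are simplified stand-ins for the mapped file):

import java.util.ArrayList;
import java.util.List;

class IndexQuerySketch {
    // Simplified stand-in for a 20-byte index entry.
    static class IndexEntry {
        int keyHash;      // hash of the message key
        long phyOffset;   // physical offset into the CommitLog
        int prevIndexNo;  // previous entry in the same hash-slot chain (0 = end of chain)
    }

    // Walk one hash-slot chain and collect the CommitLog offsets of every entry whose
    // stored hash matches the query key's hash. Hash collisions are resolved afterwards,
    // when the message is loaded from the CommitLog and its actual key is compared.
    static List<Long> selectPhyOffsets(IndexEntry[] entries, int[] hashSlots, String key) {
        List<Long> phyOffsets = new ArrayList<>();
        int keyHash = key.hashCode() & 0x7fffffff;
        int slot = keyHash % hashSlots.length;

        // hashSlots[slot] holds the number of the entry most recently written for this slot.
        for (int indexNo = hashSlots[slot]; indexNo > 0; indexNo = entries[indexNo].prevIndexNo) {
            if (entries[indexNo].keyHash == keyHash) {
                phyOffsets.add(entries[indexNo].phyOffset);
            }
        }
        return phyOffsets;
    }
}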