您好,登錄后才能下訂單哦!
這篇文章主要介紹“RocketMQ延遲消息的實現方法”,在日常操作中,相信很多人在RocketMQ延遲消息的實現方法問題上存在疑惑,小編查閱了各式資料,整理出簡單好用的操作方法,希望對大家解答”RocketMQ延遲消息的實現方法”的疑惑有所幫助!接下來,請跟著小編一起來學習吧!
延時消息即消息發送后并不立即對消費者可見,而是在用戶指定的時間投遞給消費者。比如我們現在發送一條延時30秒的消息,消息發送后立即發送給服務器,但是服務器在30秒后才將該消息交給消費者。
RocketMQ通過配置的延遲級別延遲消息投遞到消費者,其中不同的延遲級別對應不同的延遲時間,可配置,默認的延遲級別有18種,分別是1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h,支持時間單位 s 秒 m分鐘 h小時 d天。
源碼 MessageStoreConfig.java 是定義如下:
private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";
可以在brocker配置 messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h,自定義其時間級別。
前提:先啟動消費者等待消息的發送,先發送消息,消費者啟動需要時間,影響測試結果。
public class DelayProducer { public static void main(String[] args) throws MQClientException, InterruptedException { DefaultMQProducer producer = new DefaultMQProducer("producer_test"); producer.setNamesrvAddr("10.10.12.203:9876;10.10.12.204:9876"); producer.start(); SimpleDateFormat sd = new SimpleDateFormat("YYYY-MM-dd HH:mm:ss"); for (int i = 0; i < 10; i++) { try { //構建消息 Message msg = new Message("TopicTest" /* Topic */, "TagA" /* Tag */, ("延遲消息" + i).getBytes(RemotingHelper.DEFAULT_CHARSET) ); //延時的級別為3 對應的時間為10s 就是發送后延時10S在把消息投遞出去 msg.setDelayTimeLevel(3); SendResult sendResult = producer.send(msg); System.out.printf("%s%n", sd.format(new Date())+" == "+sendResult); } catch (Exception e) { e.printStackTrace(); Thread.sleep(1000); } } producer.shutdown(); } }
查看結果:
public class DelayConsumer { public static void main(String[] args) { try { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(); consumer.setConsumerGroup("consumer_delay"); consumer.setNamesrvAddr("10.10.12.203:9876;10.10.12.204:9876"); consumer.subscribe("TopicTest", "*"); consumer.registerMessageListener(new MessageListenerConcurrently(){ @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> paramList, ConsumeConcurrentlyContext paramConsumeConcurrentlyContext) { try { for(MessageExt msg : paramList){ String msgbody = new String(msg.getBody(), "utf-8"); SimpleDateFormat sd = new SimpleDateFormat("YYYY-MM-dd HH:mm:ss"); System.out.println("接收時間 : "+ sd.format(new Date()) +" == MessageBody: "+ msgbody);//輸出消息內容 } } catch (Exception e) { e.printStackTrace(); return ConsumeConcurrentlyStatus.RECONSUME_LATER; //稍后再試 } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; //消費成功 } }); consumer.start(); System.out.println("DelayConsumer===啟動成功!"); } catch (Exception e) { // TODO Auto-generated catch block e.printStackTrace(); } } }
查看結果:
查看其消息投遞的核心方法org.apache.rocketmq.store.CommitLog.putMessage
public PutMessageResult putMessage(final MessageExtBrokerInner msg) { //設置消息存儲到文件中的時間 msg.setStoreTimestamp(System.currentTimeMillis()); // Set the message body BODY CRC (consider the most appropriate setting // on the client) msg.setBodyCRC(UtilAll.crc32(msg.getBody())); // Back to Results AppendMessageResult result = null; StoreStatsService storeStatsService = this.defaultMessageStore.getStoreStatsService(); String topic = msg.getTopic(); int queueId = msg.getQueueId(); final int tranType = MessageSysFlag.getTransactionValue(msg.getSysFlag()); if (tranType == MessageSysFlag.TRANSACTION_NOT_TYPE || tranType == MessageSysFlag.TRANSACTION_COMMIT_TYPE) { // Delay Delivery消息的延遲級別是否大于0 if (msg.getDelayTimeLevel() > 0) { //如果消息的延遲級別大于最大的延遲級別則置為最大延遲級別 if (msg.getDelayTimeLevel() > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) { msg.setDelayTimeLevel(this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()); } //將消息主題設置為SCHEDULE_TOPIC_XXXX topic = ScheduleMessageService.SCHEDULE_TOPIC; //將消息隊列設置為延遲的消息隊列的ID queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel()); // Backup real topic, queueId //消息的原有的主題和消息隊列存入屬性中 MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic()); MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId())); msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties())); msg.setTopic(topic); msg.setQueueId(queueId); } } long eclipseTimeInLock = 0; MappedFile unlockMappedFile = null; //獲取最后一個消息的映射文件,mappedFileQueue可看作是CommitLog文件夾下的一個個文件的映射 MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile(); //寫入消息之前先申請putMessageLock,也就是保證消息寫入CommitLog文件中串行的 putMessageLock.lock(); //spin or ReentrantLock ,depending on store config try { long beginLockTimestamp = this.defaultMessageStore.getSystemClock().now(); this.beginTimeInLock = beginLockTimestamp; // Here settings are stored timestamp, in order to ensure an orderly // global //設置消息的存儲時間 msg.setStoreTimestamp(beginLockTimestamp); //mappedFile==null標識CommitLog文件還未創建,第一次存消息則創建CommitLog文件 //mappedFile.isFull()表示mappedFile文件已滿,需要重新創建CommitLog文件 if (null == mappedFile || mappedFile.isFull()) { //里面的參數0代表偏移量 mappedFile = this.mappedFileQueue.getLastMappedFile(0); // Mark: NewFile may be cause noise } //mappedFile==null說明創建CommitLog文件失敗拋出異常,創建失敗可能是磁盤空間不足或者權限不夠 if (null == mappedFile) { log.error("create mapped file1 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString()); beginTimeInLock = 0; return new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, null); } //mappedFile文件后面追加消息 result = mappedFile.appendMessage(msg, this.appendMessageCallback); switch (result.getStatus()) { case PUT_OK: break; case END_OF_FILE: unlockMappedFile = mappedFile; // Create a new file, re-write the message mappedFile = this.mappedFileQueue.getLastMappedFile(0); if (null == mappedFile) { // XXX: warn and notify me log.error("create mapped file2 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString()); beginTimeInLock = 0; return new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, result); } result = mappedFile.appendMessage(msg, this.appendMessageCallback); break; case MESSAGE_SIZE_EXCEEDED: case PROPERTIES_SIZE_EXCEEDED: beginTimeInLock = 0; return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, result); case UNKNOWN_ERROR: beginTimeInLock = 0; return new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result); default: beginTimeInLock = 0; return new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result); } eclipseTimeInLock = this.defaultMessageStore.getSystemClock().now() - beginLockTimestamp; beginTimeInLock = 0; } finally { //釋放鎖 putMessageLock.unlock(); } if (eclipseTimeInLock > 500) { log.warn("[NOTIFYME]putMessage in lock cost time(ms)={}, bodyLength={} AppendMessageResult={}", eclipseTimeInLock, msg.getBody().length, result); } if (null != unlockMappedFile && this.defaultMessageStore.getMessageStoreConfig().isWarmMapedFileEnable()) { this.defaultMessageStore.unlockMappedFile(unlockMappedFile); } PutMessageResult putMessageResult = new PutMessageResult(PutMessageStatus.PUT_OK, result); // Statistics storeStatsService.getSinglePutMessageTopicTimesTotal(msg.getTopic()).incrementAndGet(); storeStatsService.getSinglePutMessageTopicSizeTotal(topic).addAndGet(result.getWroteBytes()); //消息刷盤 handleDiskFlush(result, putMessageResult, msg); //主從數據同步復制 handleHA(result, putMessageResult, msg); return putMessageResult; }
我們發現在通過putMessage 延遲消息就被放存放到了主題為 SCHEDULE_TOPIC_XXXX的commitlog中,消息的原有的主題和消息隊列存入屬性中,后面再通過定時的方式對這這些消息進行重新發送。
ScheduleMessageService.start()啟動會為每一個延遲隊列創建一個調度任務每一個調度任務對應SCHEDULE_TOPIC_XXXX主題下的一個消息消費隊列。
public void start() { for (Map.Entry<Integer, Long> entry : this.delayLevelTable.entrySet()) { Integer level = entry.getKey(); Long timeDelay = entry.getValue(); Long offset = this.offsetTable.get(level); if (null == offset) { offset = 0L; } if (timeDelay != null) { this.timer.schedule(new DeliverDelayedMessageTimerTask(level, offset), FIRST_DELAY_TIME); } } this.timer.scheduleAtFixedRate(new TimerTask() { @Override public void run() { try { ScheduleMessageService.this.persist(); } catch (Throwable e) { log.error("scheduleAtFixedRate flush exception", e); } } }, 10000, this.defaultMessageStore.getMessageStoreConfig().getFlushDelayOffsetInterval()); }
定時任務的實現類DeliverDelayedMessageTimerTask,核心方法是executeOnTimeup
public void executeOnTimeup() { //根據延遲級別獲取該延遲隊列信息 ConsumeQueue cq = ScheduleMessageService.this.defaultMessageStore.findConsumeQueue(SCHEDULE_TOPIC, delayLevel2QueueId(delayLevel)); long failScheduleOffset = offset; //未找到說明目前沒有該延遲級別的消息,忽略本次任務 if (cq != null) { //根據offset獲取隊列中獲取當前隊列中有效的消息, SelectMappedBufferResult bufferCQ = cq.getIndexBuffer(this.offset); if (bufferCQ != null) { try { long nextOffset = offset; int i = 0; ConsumeQueueExt.CqExtUnit cqExtUnit = new ConsumeQueueExt.CqExtUnit(); //遍歷ConsumeQueue,每一個ConsumeQueue條目是20個字節解析消息 for (; i < bufferCQ.getSize(); i += ConsumeQueue.CQ_STORE_UNIT_SIZE) { //物理偏移量 long offsetPy = bufferCQ.getByteBuffer().getLong(); //消息長度 int sizePy = bufferCQ.getByteBuffer().getInt(); //消息的tag的Hash值 long tagsCode = bufferCQ.getByteBuffer().getLong(); // if (cq.isExtAddr(tagsCode)) { if (cq.getExt(tagsCode, cqExtUnit)) { tagsCode = cqExtUnit.getTagsCode(); } else { //can't find ext content.So re compute tags code. log.error("[BUG] can't find consume queue extend file content!addr={}, offsetPy={}, sizePy={}", tagsCode, offsetPy, sizePy); long msgStoreTime = defaultMessageStore.getCommitLog().pickupStoreTimestamp(offsetPy, sizePy); tagsCode = computeDeliverTimestamp(delayLevel, msgStoreTime); } } long now = System.currentTimeMillis(); long deliverTimestamp = this.correctDeliverTimestamp(now, tagsCode); nextOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE); long countdown = deliverTimestamp - now; if (countdown <= 0) { //根據物理偏移量和消息的大小從Commitlog文件中查找消息 MessageExt msgExt = ScheduleMessageService.this.defaultMessageStore.lookMessageByOffset( offsetPy, sizePy); if (msgExt != null) { try { MessageExtBrokerInner msgInner = this.messageTimeup(msgExt); //消息存儲到Commitlog文件中,轉發到主題對應的消息隊列上,供消費者再次消費。 PutMessageResult putMessageResult = ScheduleMessageService.this.defaultMessageStore .putMessage(msgInner); if (putMessageResult != null && putMessageResult.getPutMessageStatus() == PutMessageStatus.PUT_OK) { continue; } else { // XXX: warn and notify me log.error( "ScheduleMessageService, a message time up, but reput it failed, topic: {} msgId {}", msgExt.getTopic(), msgExt.getMsgId()); ScheduleMessageService.this.timer.schedule( new DeliverDelayedMessageTimerTask(this.delayLevel, nextOffset), DELAY_FOR_A_PERIOD); ScheduleMessageService.this.updateOffset(this.delayLevel, nextOffset); return; } } catch (Exception e) { log.error( "ScheduleMessageService, messageTimeup execute error, drop it. msgExt=" + msgExt + ", nextOffset=" + nextOffset + ",offsetPy=" + offsetPy + ",sizePy=" + sizePy, e); } } } else { ScheduleMessageService.this.timer.schedule( new DeliverDelayedMessageTimerTask(this.delayLevel, nextOffset), countdown); ScheduleMessageService.this.updateOffset(this.delayLevel, nextOffset); return; } } // end of for nextOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE); ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask( this.delayLevel, nextOffset), DELAY_FOR_A_WHILE); ScheduleMessageService.this.updateOffset(this.delayLevel, nextOffset); return; } finally { bufferCQ.release(); } }else { //未找到有效的消息,更新延遲隊列定時拉取進度,并創建定時任務帶下一次繼續嘗試 long cqMinOffset = cq.getMinOffsetInQueue(); if (offset < cqMinOffset) { failScheduleOffset = cqMinOffset; log.error("schedule CQ offset invalid. offset=" + offset + ", cqMinOffset=" + cqMinOffset + ", queueId=" + cq.getQueueId()); } } } //創建延遲任務 ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(this.delayLevel, failScheduleOffset), DELAY_FOR_A_WHILE); }
圖解:
1、消息生產者發送消息,如果發送的消息DelayTimeLevel大于0,則改變消息主題為SCHEDULE_TOPIC_XXXX,消息的隊列為DelayTimeLevel-1
2、消息經由Commitlog轉發到消息隊列SCHEDULE_TOPIC_XXXX的消費隊列1。
3、定時任務Timer每隔1秒根據上次拉取消息的偏移量從消費隊列中取出所有消息。
4、根據消息的物理偏移量和消息大小從Commitlog中拉取消息。(PS:消息存儲章節中會重點講解)
5、根據消息的屬性重新創建消息,并恢復原主題TopicTest、原消息隊列ID,清除DelayTimeLevel屬性存入Commitlog中。
6、記錄原主題TopicTest的消息隊列的消息偏移量,供消費者索引檢索消息進行消費。
到此,關于“RocketMQ延遲消息的實現方法”的學習就結束了,希望能夠解決大家的疑惑。理論與實踐的搭配能更好的幫助大家學習,快去試試吧!若想繼續學習更多相關知識,請繼續關注億速云網站,小編會繼續努力為大家帶來更多實用的文章!
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。