您好,登錄后才能下訂單哦!
這篇文章主要介紹“RocketMQ順序消息是什么意思”,在日常操作中,相信很多人在RocketMQ順序消息是什么意思問題上存在疑惑,小編查閱了各式資料,整理出簡單好用的操作方法,希望對大家解答”RocketMQ順序消息是什么意思”的疑惑有所幫助!接下來,請跟著小編一起來學習吧!
我們知道消息隊列的特性導致其消息不是順序進行消費的,RocketMQ沒有提供所謂的順序消息來供我們使用,但是有時候一些場景需要需要順序的去接收消息。今天我們重點討論一下如何實現這種功能。雖然RocketMQ沒有提供順序消費但是我們可以變相的來實現它。我們知道消息需要放入隊列中才能被消費,而隊列本身的特性就是FIFO先進先出,我們可以將需要順序的消息放入一個隊列中,則就可以實現這個功能。
場景:兩個業務系統之間消息通過MQ傳輸,業務系統A數據傳輸至業務系統B,要求消息準確、實時。但是業務系統A的原始的數據可能會存在修改的情況,要求業務系統B需要實時的更改。保證消息的實時性、一致性、可靠性。
主題test_1發送消息到RocketMQ的雙主Broker1、Broker2上,每個broker上test_1主題對應4個隊列,消息id為001001的消息存在創建(create)、更新(update),MQ集群是雙主的,使用默認的消息發送算法,消息將輪詢的丟棄到各個隊列中。
默認按照輪詢算法將消息分發到各個broker的不同的隊列中,保證每個隊列的消息都是均勻分配,集群消費且消費者多個時,多個消費者會分散到不同的隊列中消費消息,保證消息能夠實時消費。
因為消息本身是放入到不同的隊列中消費的就不能保證其順序性,更新的消息可能是最先被消費掉,創建的消息消費時業務需要判斷消息是否是最新的,需要進行查庫驗證,是則更新,不是則丟棄保存最新的消息,保證業務系統A與業務系統B,數據的一致性,增加了業務處理的難度。
我們在生產消息的時候可以將同一個消息ID的消息放入到相同的隊列中,保證同一類需要順序消費的消息放入到同一個隊列中,這樣隊列中的消息就是有序的。但是同時也需要保證消息的消費也是有序的才可以保證消息的順序消費。
集群模式下同一個消費組內的消費者共同承擔其訂閱主題下的消息隊列的消費,同一個消息消息隊列在同一時刻只會被消費組內的一個消費者消費,一個消費者同一時刻可以分配多個消費隊列。
集群模式下的普通消息,線程池默認創建20個(可配置)線程。多線程從隊列中拉取消息,提高并發加快消息的消費。
集群模式下的順序消息,順序消費是單線程,一個線程只能去一個隊列獲取數據,當需要獲取某個隊列中的消息時,需要鎖定該消息隊列(PS:后面會根據源碼詳細分析其原理)。
廣播模式下的順序消息,順序消費是單線程,直接進行消費,無需鎖定消息隊列,因為相互之間無競爭
public static void main(String[] args) throws Exception { //Instantiate with a producer group name. DefaultMQProducer producer = new DefaultMQProducer("order_group_test_1"); //Launch the instance. producer.setNamesrvAddr("10.10.12.203:9876;10.10.12.204:9876"); producer.start(); String[] tags = new String[] {"TagA", "TagB", "TagC", "TagD", "TagE"}; for (int i = 0; i < 100; i++) { int orderId = i % 10; //Create a message instance, specifying topic, tag and message body. Message msg = new Message("order_test_1", tags[i % tags.length], "KEY" + i, ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET)); SendResult sendResult = producer.send(msg, new MessageQueueSelector() { @Override public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) { Integer id = (Integer) arg; int index = id % mqs.size(); return mqs.get(index); } }, orderId); System.out.printf("%s%n", sendResult); } //server shutdown producer.shutdown(); }
我們根據100個消息的序號來放入到不同的隊列中,根據序號%10取模,相同的放入到一個隊列中。
上面是我們實現自定義隊列選擇器的算法,RocketMQ也提供了三種隊列選擇算法
從圖中我們可以看到一共三種
SelectMessageQueueByHash:通過 hash 進行選擇 queue。
SelectMessageQueueByRandom:隨機選擇 queue。
SelectMessageQueueByMachineRoom:機房選擇queue(未實現)
我們分別來看一下實現
public class SelectMessageQueueByHash implements MessageQueueSelector { public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) { int value = arg.hashCode(); if (value < 0) { value = Math.abs(value); } value %= mqs.size(); return ((MessageQueue) mqs.get(value)); } }
我們差看源碼可以發現,通過提供的參數獲取其HashCode,如果為負值則取絕對值,hash值與隊列的總數進行取模獲取其隊列。
public class SelectMessageQueueByRandom implements MessageQueueSelector { private Random random; public SelectMessageQueueByRandom() { this.random = new Random(System.currentTimeMillis()); } public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) { int value = this.random.nextInt(mqs.size()); return ((MessageQueue) mqs.get(value)); } }
生成一個隊列數以內的隨機數,通過隨機數獲取隊列。
public class SelectMessageQueueByMachineRoom implements MessageQueueSelector { private Set<String> consumeridcs; public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) { return null; } public Set<String> getConsumeridcs() { return this.consumeridcs; } public void setConsumeridcs(Set<String> consumeridcs) { this.consumeridcs = consumeridcs; } }
我們發現其select方法為null,其實是沒有進行實現。需要我們自己實現。
雖然RocketMQ提供了三種(其實2種,SelectMessageQueueByMachineRoom未實現)隊列選擇算法,但是不建議使用,不同的業務規則其選擇隊列的算法也不盡相同,建議手動實現。
public static void main(String[] args){ try { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(); consumer.setConsumerGroup("order_consumer_test_push"); consumer.setNamesrvAddr("10.10.12.203:9876;10.10.12.204:9876"); consumer.subscribe("order_test_1", "*"); consumer.registerMessageListener(new MessageListenerOrderly(){ @Override public ConsumeOrderlyStatus consumeMessage(List<MessageExt> paramList, ConsumeOrderlyContext paramConsumeOrderlyContext) { try { for(MessageExt msg : paramList){ String msgbody = new String(msg.getBody(), "utf-8"); System.out.println(" MessageBody: "+ msgbody);//輸出消息內容 } } catch (Exception e) { e.printStackTrace(); return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT; } return ConsumeOrderlyStatus.SUCCESS; //消費成功 } }); consumer.start(); } catch (Exception e) { // TODO Auto-generated catch block e.printStackTrace(); } }
查看結果
我們找到兩個典型的一組數字,尾號是6和9的,尾號是6的計較集中,尾號是9的比較分散,但是結果都是一樣的,按照順序消費的。
到此,關于“RocketMQ順序消息是什么意思”的學習就結束了,希望能夠解決大家的疑惑。理論與實踐的搭配能更好的幫助大家學習,快去試試吧!若想繼續學習更多相關知識,請繼續關注億速云網站,小編會繼續努力為大家帶來更多實用的文章!
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。