您好,登錄后才能下訂單哦!
本篇文章為大家展示了RocketMQ推拉模式是什么,內容簡明扼要并且容易理解,絕對能使你眼前一亮,通過這篇文章的詳細介紹希望你能有所收獲。
消費者客戶端有兩種方式從消息中間件獲取消息并消費。嚴格意義上來講,RocketMQ并沒有實現PUSH模式,而是對拉模式進行一層包裝,名字雖然是 Push 開頭,實際在實現時,使用 Pull 方式實現。通過 Pull 不斷輪詢 Broker 獲取消息。當不存在新消息時,Broker 會掛起請求,直到有新消息產生,取消掛起,返回新消息。
由消費者客戶端主動向消息中間件(MQ消息服務器代理)拉取消息;采用Pull方式,如何設置Pull消息的拉取頻率需要重點去考慮,舉個例子來說,可能1分鐘內連續來了1000條消息,然后2小時內沒有新消息產生(概括起來說就是“消息延遲與忙等待”)。如果每次Pull的時間間隔比較久,會增加消息的延遲,即消息到達消費者的時間加長,MQ中消息的堆積量變大;若每次Pull的時間間隔較短,但是在一段時間內MQ中并沒有任何消息可以消費,那么會產生很多無效的Pull請求的RPC開銷,影響MQ整體的網絡性能;
由消息中間件(MQ消息服務器代理)主動地將消息推送給消費者;采用Push方式,可以盡可能實時地將消息發送給消費者進行消費。但是,在消費者的處理消息的能力較弱的時候(比如,消費者端的業務系統處理一條消息的流程比較復雜,其中的調用鏈路比較多導致消費時間比較久。概括起來地說就是“慢消費問題”),而MQ不斷地向消費者Push消息,消費者端的緩沖區可能會溢出,導致異常;
主動推送的模式實現起來簡單,避免了拉取的消費端業務邏輯的復雜度,消息的消費可以認為是實時的,同時也存在一定的弊端,要求消費端要有很強的消費能力。
public class Consumer1 { public static void main(String[] args){ try { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(); consumer.setConsumerGroup("consumer_push"); consumer.setNamesrvAddr("10.10.12.203:9876;10.10.12.204:9876"); consumer.subscribe("TopicTest", "*"); consumer.registerMessageListener(new MessageListenerConcurrently(){ @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> paramList, ConsumeConcurrentlyContext paramConsumeConcurrentlyContext) { try { for(MessageExt msg : paramList){ String msgbody = new String(msg.getBody(), "utf-8"); SimpleDateFormat sd = new SimpleDateFormat("YYYY-MM-dd HH:mm:ss"); Date date = new Date(msg.getStoreTimestamp()); System.out.println("Consumer1=== 存入時間 : "+ sd.format(date) +" == MessageBody: "+ msgbody);//輸出消息內容 } } catch (Exception e) { e.printStackTrace(); return ConsumeConcurrentlyStatus.RECONSUME_LATER; //稍后再試 } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; //消費成功 } }); consumer.start(); System.out.println("Consumer1===啟動成功!"); } catch (Exception e) { // TODO Auto-generated catch block e.printStackTrace(); } } }
PUSH消費方式,需要注冊一個監聽器Listener,,用來監聽最新的消息,進行業務處理,同時反饋消息的消費狀態,消費成功(CONSUME_SUCCESS)、消費重試(RECONSUME_LATER),消息重試會根據配置的消息的延遲等級的時間間隔,定時重新發送消費失敗的記錄。(PS:延遲消息中會重點討論)
PUSH消息方式由于返回了消息的狀態,服務端會維護每個消費端的消費進度,內部會記錄消費進度,消息發送成功后會更新消費進度。
PUSH消息方式的局限性,是在HOLD住Consumer請求的時候需要占用資源,它適合用在消息隊列這種客戶端連接數可控的場景中。
上一個章節說明了服務端存儲的每個主題對應的消費組的每個消息隊列的偏移量
查看服務器文件上的消費進度信息:/usr/local/rocketmq-all-4.2.0/store/config/consumerOffset.json
public class PullConsumer { private static final Map<MessageQueue, Long> offseTable = new HashMap<MessageQueue, Long>(); public static void main(String[] args) throws MQClientException { DefaultMQPullConsumer consumer = new DefaultMQPullConsumer("pullConsumer"); consumer.setNamesrvAddr("10.10.12.203:9876;10.10.12.204:9876"); consumer.start(); Set<MessageQueue> mqs = consumer.fetchSubscribeMessageQueues("TopicTest"); for (MessageQueue mq : mqs) { SINGLE_MQ: while (true) { try { PullResult pullResult = consumer.pullBlockIfNotFound(mq, null, getMessageQueueOffset(mq), 32); System.out.println("============================================================="); System.out.println("Consume from the queue: " + mq + "offset:" + getMessageQueueOffset(mq) + "結果:" + pullResult.getPullStatus()); putMessageQueueOffset(mq, pullResult.getNextBeginOffset()); switch (pullResult.getPullStatus()) { case FOUND: List<MessageExt> messageExtList = pullResult.getMsgFoundList(); for (MessageExt m : messageExtList) { System.out.print(new String(m.getBody()) +" == "); } System.out.println(""); case NO_MATCHED_MSG: break; case NO_NEW_MSG: break SINGLE_MQ; case OFFSET_ILLEGAL: break; default: break; } } catch (Exception e) { e.printStackTrace(); } } } consumer.shutdown(); } private static void putMessageQueueOffset(MessageQueue mq, long offset) { offseTable.put(mq, offset); } private static long getMessageQueueOffset(MessageQueue mq) { Long offset = offseTable.get(mq); if (offset != null) return offset; return 0; } }
結果:
每次拉取消息的時候需要提供偏移量和拉取的消息的個數,需要自己業務實現每個主題下的隊列的消費進度。
代碼實現(1)這種方式只能拉取歷史的消息,最新的消息拉取不了,也可以進行改造,來實現一直拉取。
在MQPullConsumer這個類里面,有一個MessageQueueListener,它的目的就是當queue發生變化的時候,通知Consumer。也正是這個借口,幫助我們在Pull模式里面,實現負載均衡。
注意,這個接口在MQPushConsumer里面是沒有的,那里面有的是上面代碼里的MessageListener。
void registerMessageQueueListener(final String topic, final MessageQueueListener listener); public interface MessageQueueListener { void messageQueueChanged(final String topic, final Set<MessageQueue> mqAll, final Set<MessageQueue> mqDivided); }
有了這個Listener,我們就可以動態的知道當前的Consumer分攤到了幾個MessageQueue。然后對這些MessageQueue,我們可以開個線程池來消費。
public class PullConsumerExtend { public static void main(String[] args) throws MQClientException { //消費組 final MQPullConsumerScheduleService scheduleService = new MQPullConsumerScheduleService("pullConsumer"); //MQ NameService地址 scheduleService.getDefaultMQPullConsumer().setNamesrvAddr("10.10.12.203:9876;10.10.12.204:9876"); //負載均衡模式 scheduleService.setMessageModel(MessageModel.CLUSTERING); //需要處理的消息topic scheduleService.registerPullTaskCallback("TopicTest", new PullTaskCallback() { @Override public void doPullTask(MessageQueue mq, PullTaskContext context) { MQPullConsumer consumer = context.getPullConsumer(); try { long offset = consumer.fetchConsumeOffset(mq, false); if (offset < 0) offset = 0; PullResult pullResult = consumer.pull(mq, "*", offset, 32); System.out.println(""); System.out.println("Consume from the queue: " + mq + "offset:" + offset + "結果:" + pullResult.getPullStatus()); switch (pullResult.getPullStatus()) { case FOUND: List<MessageExt> messageExtList = pullResult.getMsgFoundList(); for (MessageExt m : messageExtList) { System.out.print(new String(m.getBody()) +" == "); } break; case NO_MATCHED_MSG: break; case NO_NEW_MSG: case OFFSET_ILLEGAL: break; default: break; } consumer.updateConsumeOffset(mq, pullResult.getNextBeginOffset()); //設置下一下拉取的間隔時間 context.setPullNextDelayTimeMillis(10000); } catch (Exception e) { e.printStackTrace(); } } }); scheduleService.start(); } }
結果:
比較**代碼實現(1)**這種方式改進了很多,不需要業務維護每個消費隊列的消費進度,可以更新到服務端的。
弊端也很明顯就是每次隊列拉取消息的時間間隔,時間長導致消息擠壓,時間段消息少,影響服務端性能。
上述內容就是RocketMQ推拉模式是什么,你們學到知識或技能了嗎?如果還想學到更多技能或者豐富自己的知識儲備,歡迎關注億速云行業資訊頻道。
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。