您好,登錄后才能下訂單哦!
本篇內容介紹了“RocketMQ普通消息同步發送怎么實現”的有關知識,在實際案例的操作過程中,不少人都會遇到這樣的困境,接下來就讓小編帶領大家學習一下如何處理這些情況吧!希望大家仔細閱讀,能夠學有所成!
同步消息是指發送出消息后,同步等待,直到接收到Broker發送成功的響應才會繼續發送下一個消息。這個方式可以確保消息發送到Broker成功,一些重要的消息可以使用此方式,比如重要的通知。
public static void main(String[] args) throws Exception { //實例化消息生產者對象 DefaultMQProducer producer = new DefaultMQProducer("group_luke"); //設置NameSever地址 producer.setNamesrvAddr("127.0.0.1:9876"); //啟動Producer實例 producer.start(); for (int i = 0; i < 10; i++) { Message msg = new Message("topic_luke", "tag", ("這是第"+i+"條消息。").getBytes(StandardCharsets.UTF_8)); //同步發送方式 SendResult send = producer.send(msg); //確認返回 System.out.println(send); } //關閉producer producer.shutdown(); }
異步消息發送方在發送了一條消息后,不等接收方發回響應,接著進行第二條消息發送。發送方通過回調接口的方式接收服務器響應,并對響應結果進行處理。
public static void main(String[] args) throws Exception { //實例化消息生產者對象 DefaultMQProducer producer = new DefaultMQProducer("group_luke"); //設置NameSever地址 producer.setNamesrvAddr("127.0.0.1:9876"); //啟動Producer實例 producer.start(); for (int i = 0; i < 10; i++) { Message msg = new Message("topic_luke", "tag", ("這是第"+i+"條消息。").getBytes(StandardCharsets.UTF_8)); //SendCallback會接收異步返回結果的回調 producer.send(msg, new SendCallback() { @Override public void onSuccess(SendResult sendResult) { System.out.println(sendResult); } @Override public void onException(Throwable throwable) { throwable.printStackTrace(); } }); } //若是過早關閉producer,會拋出The producer service state not OK, SHUTDOWN_ALREADY的錯 Thread.sleep(10000); //關閉producer producer.shutdown(); }
單項發送不關心發送的結果,只發送請求不等待應答。發送消息耗時極短。
public static void main(String[] args) throws Exception { //實例化消息生產者對象 DefaultMQProducer producer = new DefaultMQProducer("group_luke"); //設置NameSever地址 producer.setNamesrvAddr("127.0.0.1:9876"); //啟動Producer實例 producer.start(); for (int i = 0; i < 10; i++) { Message msg = new Message("topic_luke", "tag", ("這是第"+i+"條消息。").getBytes(StandardCharsets.UTF_8)); //同步發送方式 producer.sendOneway(msg); } //關閉producer producer.shutdown(); }
消費者采用負載均衡的方式消費消息,同一個Group下的多個Consumer共同消費Queue里的Message,每個Consumer處理的消息不同。
一個Consumer Group中的各個Consumer實例分共同消費消息,即一條消息只會投遞到一個Group下面的一個實例,并且只消費一遍。
例如某個Topic有3個隊列,其中一個Consumer Group 有 3 個實例,那么每個實例只消費其中的1個隊列。集群消費模式是消費者默認的消費方式。
public static void main(String[] args) throws Exception { //實例化消息消費者 DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group_luke"); //指定nameserver地址 consumer.setNamesrvAddr("127.0.0.1:9876"); //訂閱topic,"*"表示所有tag consumer.subscribe("topic_luke","*"); consumer.setMessageModel(MessageModel.CLUSTERING); // 注冊回調實現類來處理從broker拉取回來的消息 consumer.registerMessageListener(new MessageListenerConcurrently() { @SneakyThrows @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { for (MessageExt msg : msgs) { System.out.println(new String(msg.getBody())); } // 標記該消息已經被成功消費 return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); // 啟動消費者實例 consumer.start(); System.out.printf("Consumer Started.%n"); }
廣播消費模式中把消息對一個Group下的各個Consumer實例都投遞一遍。也就是說消息也會被 Group 中的每個Consumer都消費一次。
實際上,是一個消費組下的每個消費者實例都獲取到了topic下面的每個Message Queue去拉取消費。所以消息會投遞到每個消費者實例。
public static void main(String[] args) throws Exception { //實例化消息消費者 DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group_luke"); //指定nameserver地址 consumer.setNamesrvAddr("127.0.0.1:9876"); //訂閱topic,"*"表示所有tag consumer.subscribe("topic_luke","*"); consumer.setMessageModel(MessageModel.BROADCASTING); // 注冊回調實現類來處理從broker拉取回來的消息 consumer.registerMessageListener(new MessageListenerConcurrently() { @SneakyThrows @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { for (MessageExt msg : msgs) { System.out.println(new String(msg.getBody())); } // 標記該消息已經被成功消費 return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); // 啟動消費者實例 consumer.start(); System.out.printf("Consumer Started.%n"); }
“RocketMQ普通消息同步發送怎么實現”的內容就介紹到這里了,感謝大家的閱讀。如果想了解更多行業相關的知識可以關注億速云網站,小編將為大家輸出更多高質量的實用文章!
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。