91超碰碰碰碰久久久久久综合_超碰av人澡人澡人澡人澡人掠_国产黄大片在线观看画质优化_txt小说免费全本

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

SpringBoot如何使用RocketMQ

發布時間:2021-09-28 09:59:55 來源:億速云 閱讀:145 作者:小新 欄目:編程語言

這篇文章主要為大家展示了“SpringBoot如何使用RocketMQ”,內容簡而易懂,條理清晰,希望能夠幫助大家解決疑惑,下面讓小編帶領大家一起研究并學習一下“SpringBoot如何使用RocketMQ”這篇文章吧。

什么是RocketMQ?#

官方說明:

隨著使用越來越多的隊列和虛擬主題,ActiveMQ IO模塊遇到了瓶頸。我們盡力通過節流,斷路器或降級來解決此問題,但效果不佳。因此,我們那時開始關注流行的消息傳遞解決方案Kafka。不幸的是,Kafka不能滿足我們的要求,特別是在低延遲和高可靠性方面。

看到這里可以很清楚的知道RcoketMQ 是一款低延遲、高可靠、可伸縮、易于使用的消息中間件。

具有以下特性:

支持發布/訂閱(Pub/Sub)和點對點(P2P)消息模型  能夠保證嚴格的消息順序,在一個隊列中可靠的先進先出(FIFO)和嚴格的順序傳遞  提供豐富的消息拉取模式,支持拉(pull)和推(push)兩種消息模式  單一隊列百萬消息的堆積能力,億級消息堆積能力  支持多種消息協議,如 JMS、MQTT 等  分布式高可用的部署架構,滿足至少一次消息傳遞語義

RocketMQ環境安裝#

下載地址:https://rocketmq.apache.org/dowloading/releases/

從官方下載二進制或者源碼來進行使用。源碼編譯需要Maven3.2x,JDK8

在根目錄進行打包:

mvn -Prelease-all -DskipTests clean packager -U

distribution/target/apache-rocketmq文件夾中會存在一個文件夾版,zip,tar三個可運行的完整程序。

使用rocketmq-4.6.0.zip:

啟動名稱服務 mqnamesrv.cmd  啟動數據中心 mqbroker.cmd -n localhost:9876

SpringBoot環境中使用RocketMQ#

SpringBoot 入門:https://www.jb51.net/article/177449.htm

SpringBoot 常用start:https://www.jb51.net/article/177451.htm

當前環境版本為:

SpringBoot 2.0.6.RELEASE  SpringCloud Finchley.RELEASE  SpringCldod Alibaba 0.2.1.RELEASE  RocketMQ 4.3.0

在項目工程中導入:

<!-- MQ Begin --><dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-client</artifactId> <version>${rocketmq.version}</version></dependency><!-- MQ End -->

由于我們這邊已經有工程了所以就不在進行創建這種過程了。主要是看看如何使用RocketMQ。

創建RocketMQProperties配置屬性類,類中內容如下:

@ConfigurationProperties(prefix = "rocketmq")public class RocketMQProperties { private boolean isEnable = false; private String namesrvAddr = "localhost:9876"; private String groupName = "default"; private int producerMaxMessageSize = 1024; private int producerSendMsgTimeout = 2000; private int producerRetryTimesWhenSendFailed = 2; private int consumerConsumeThreadMin = 5; private int consumerConsumeThreadMax = 30; private int consumerConsumeMessageBatchMaxSize = 1; //省略get set}

現在我們所有子系統中的生產者,消費者對應:

isEnable 是否開啟mq

namesrvAddr 集群地址

groupName 分組名稱

設置為統一已方便系統對接,如有其它需求在進行擴展,類中我們已經給了默認值也可以在配置文件或配置中心中獲取配置,配置如下:

#發送同一類消息的設置為同一個group,保證唯一,默認不需要設置,rocketmq會使用ip@pid(pid代表jvm名字)作為唯一標示rocketmq.groupName=please_rename_unique_group_name#是否開啟自動配置rocketmq.isEnable=true#mq的nameserver地址rocketmq.namesrvAddr=127.0.0.1:9876#消息最大長度 默認1024*4(4M)rocketmq.producer.maxMessageSize=4096#發送消息超時時間,默認3000rocketmq.producer.sendMsgTimeout=3000#發送消息失敗重試次數,默認2rocketmq.producer.retryTimesWhenSendFailed=2#消費者線程數量rocketmq.consumer.consumeThreadMin=5rocketmq.consumer.consumeThreadMax=32#設置一次消費消息的條數,默認為1條rocketmq.consumer.consumeMessageBatchMaxSize=1

創建消費者接口 RocketConsumer.java 該接口用戶約束消費者需要的核心步驟:

/** * 消費者接口 *  * @author SimpleWu * */public interface RocketConsumer {/** * 初始化消費者 */ public abstract void init(); /** * 注冊監聽 *  * @param messageListener */ public void registerMessageListener(MessageListener messageListener);}

創建抽象消費者 AbstractRocketConsumer.java:

/** * 消費者基本信息 *  * @author SimpelWu */public abstract class AbstractRocketConsumer implements RocketConsumer { protected String topics; protected String tags; protected MessageListener messageListener; protected String consumerTitel; protected MQPushConsumer mqPushConsumer; /** * 必要的信息 *  * @param topics * @param tags * @param consumerTitel */ public void necessary(String topics, String tags, String consumerTitel) { this.topics = topics; this.tags = tags; this.consumerTitel = consumerTitel; } public abstract void init(); @Override public void registerMessageListener(MessageListener messageListener) { this.messageListener = messageListener; } }

在類中我們必須指定這個topics,tags與消息監聽邏輯

public abstract void init();該方法是用于初始化消費者,由子類實現。

接下來我們編寫自動配置類RocketMQConfiguation.java,該類用戶初始化一個默認的生產者連接,以及加載所有的消費者。

@EnableConfigurationProperties({ RocketMQProperties.class }) 使用該配置文件

@Configuration 標注為配置類

@ConditionalOnProperty(prefix = "rocketmq", value = "isEnable", havingValue = "true") 只有當配置中指定rocketmq.isEnable = true的時候才會生效

核心內容如下:

/** * mq配置 *  * @author SimpleWu */@Configuration@EnableConfigurationProperties({ RocketMQProperties.class })@ConditionalOnProperty(prefix = "rocketmq", value = "isEnable", havingValue = "true")public class RocketMQConfiguation { private RocketMQProperties properties; private ApplicationContext applicationContext; private Logger log = LoggerFactory.getLogger(RocketMQConfiguation.class); public RocketMQConfiguation(RocketMQProperties properties, ApplicationContext applicationContext) { this.properties = properties; this.applicationContext = applicationContext; } /** * 注入一個默認的消費者 * @return * @throws MQClientException */ @Bean public DefaultMQProducer getRocketMQProducer() throws MQClientException { if (StringUtils.isEmpty(properties.getGroupName())) {  throw new MQClientException(-1, "groupName is blank"); } if (StringUtils.isEmpty(properties.getNamesrvAddr())) {  throw new MQClientException(-1, "nameServerAddr is blank"); } DefaultMQProducer producer; producer = new DefaultMQProducer(properties.getGroupName()); producer.setNamesrvAddr(properties.getNamesrvAddr()); // producer.setCreateTopicKey("AUTO_CREATE_TOPIC_KEY"); // 如果需要同一個jvm中不同的producer往不同的mq集群發送消息,需要設置不同的instanceName // producer.setInstanceName(instanceName); producer.setMaxMessageSize(properties.getProducerMaxMessageSize()); producer.setSendMsgTimeout(properties.getProducerSendMsgTimeout()); // 如果發送消息失敗,設置重試次數,默認為2次 producer.setRetryTimesWhenSendFailed(properties.getProducerRetryTimesWhenSendFailed()); try {  producer.start();  log.info("producer is start ! groupName:{},namesrvAddr:{}", properties.getGroupName(),   properties.getNamesrvAddr()); } catch (MQClientException e) {  log.error(String.format("producer is error {}", e.getMessage(), e));  throw e; } return producer; } /** * SpringBoot啟動時加載所有消費者 */ @PostConstruct public void initConsumer() { Map<String, AbstractRocketConsumer> consumers = applicationContext.getBeansOfType(AbstractRocketConsumer.class); if (consumers == null || consumers.size() == 0) {  log.info("init rocket consumer 0"); } Iterator<String> beans = consumers.keySet().iterator(); while (beans.hasNext()) {  String beanName = (String) beans.next();  AbstractRocketConsumer consumer = consumers.get(beanName);  consumer.init();  createConsumer(consumer);  log.info("init success consumer title {} , toips {} , tags {}", consumer.consumerTitel, consumer.tags,   consumer.topics); } } /** * 通過消費者信心創建消費者 *  * @param consumerPojo */ public void createConsumer(AbstractRocketConsumer arc) { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(this.properties.getGroupName()); consumer.setNamesrvAddr(this.properties.getNamesrvAddr()); consumer.setConsumeThreadMin(this.properties.getConsumerConsumeThreadMin()); consumer.setConsumeThreadMax(this.properties.getConsumerConsumeThreadMax()); consumer.registerMessageListener(arc.messageListenerConcurrently); /**  * 設置Consumer第一次啟動是從隊列頭部開始消費還是隊列尾部開始消費 如果非第一次啟動,那么按照上次消費的位置繼續消費  */ // consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); /**  * 設置消費模型,集群還是廣播,默認為集群  */ // consumer.setMessageModel(MessageModel.CLUSTERING); /**  * 設置一次消費消息的條數,默認為1條  */ consumer.setConsumeMessageBatchMaxSize(this.properties.getConsumerConsumeMessageBatchMaxSize()); try {  consumer.subscribe(arc.topics, arc.tags);  consumer.start();  arc.mqPushConsumer=consumer; } catch (MQClientException e) {  log.error("info consumer title {}", arc.consumerTitel, e); } }}

然后在src/main/resources文件夾中創建目錄與文件META-INF/spring.factories里面添加自動配置類即可開啟啟動配置,我們只需要導入依賴即可:

org.springframework.boot.autoconfigure.EnableAutoConfiguration=\com.xcloud.config.rocketmq.RocketMQConfiguation

接下來在服務中導入依賴,然后通過我們的抽象類獲取所有必要信息對消費者進行創建,該步驟會在所有消費者初始化完成后進行,且只會管理是Spring Bean的消費者。

下面我們看看如何創建一個消費者,創建消費者的步驟非常簡單,只需要繼承AbstractRocketConsumer然后再加上Spring的@Component就能夠完成消費者的創建,我們可以在類中自定義消費的主題與標簽。

在項目可以根據需求當消費者創建失敗的時候是否繼續啟動工程。

創建一個默認的消費者 DefaultConsumerMQ.java

@Componentpublic class DefaultConsumerMQ extends AbstractRocketConsumer { /** * 初始化消費者 */ @Override public void init() { // 設置主題,標簽與消費者標題 super.necessary("TopicTest", "*", "這是標題"); //消費者具體執行邏輯 registerMessageListener(new MessageListenerConcurrently() {  @Override  public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {  msgs.forEach(msg -> {   System.out.printf("consumer message boyd %s %n", new String(msg.getBody()));  });  // 標記該消息已經被成功消費  return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;  } }); }}

super.necessary("TopicTest", "*", "這是標題"); 是必須要設置的,代表該消費者監聽TopicTest主題下所有tags,標題那個字段是我自己定義的,所以對于該配置來說沒什么意義。

我們可以在這里注入Spring的Bean來進行任意邏輯處理。

創建一個消息發送類進行測試

@Overridepublic String qmtest(@PathVariable("name")String name) throws MQClientException, RemotingException, MQBrokerException, InterruptedException, UnsupportedEncodingException { Message msg = new Message("TopicTest", "tags1", name.getBytes(RemotingHelper.DEFAULT_CHARSET)); // 發送消息到一個Broker SendResult sendResult = defaultMQProducer.send(msg); // 通過sendResult返回消息是否成功送達 System.out.printf("%s%n", sendResult); return null;}

我們來通過Http請求測試:

http://localhost:10001/demo/base/mq/hello consumer message boyd hello http://localhost:10001/demo/base/mq/嘿嘿嘿嘿嘿 consumer message boyd 嘿嘿嘿嘿嘿

好了到這里簡單的start算是設計完成了,后面還有一些:順序消息生產,順序消費消息,異步消息生產等一系列功能,官人可參照官方去自行處理。

ActiveMQ 沒經過大規模吞吐量場景的驗證,社區不高不活躍。  RabbitMQ 集群動態擴展麻煩,且與當前程序語言不至于難以定制化。  kafka 支持主要的MQ功能,功能無法達到程序需求的要求,所以不使用,且與當前程序語言不至于難以定制化。  rocketMQ 經過全世界的女人的洗禮,已經很強大;MQ功能較為完善,還是分布式的,擴展性好;支持復雜MQ業務場景。(業務復雜可做首選)

以上是“SpringBoot如何使用RocketMQ”這篇文章的所有內容,感謝各位的閱讀!相信大家都有了一定的了解,希望分享的內容對大家有所幫助,如果還想學習更多知識,歡迎關注億速云行業資訊頻道!

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

天等县| 天祝| 新郑市| 个旧市| 乐都县| 晋宁县| 江川县| 根河市| 武冈市| 杭州市| 丘北县| 英山县| 于田县| 怀宁县| 江永县| 江华| 大英县| 常德市| 丹寨县| 泰兴市| 彭州市| 寻甸| 洛川县| 江达县| 白水县| 旬阳县| 黎平县| 邹平县| 兰西县| 长阳| 乳山市| 仙居县| 南开区| 蒙山县| 峡江县| 黄大仙区| 连平县| 山阴县| 特克斯县| 娄底市| 驻马店市|