您好,登錄后才能下訂單哦!
這期內容當中小編將會給大家帶來有關怎么使用Spring Cloud Stream玩轉RabbitMQ,RocketMQ和Kafka,文章內容豐富且以專業的角度為大家分析和敘述,閱讀完這篇文章希望大家可以有所收獲。
業務的發展對MQ的依賴越來越重,地位也越來越高,對它的需求也越來越多。比如順序消費,事務消息,回溯消費等,性能方面也有更高要求。越來越多的趨勢提醒我們有更好MQ方案。
假如我們將“MQ從Rabbit替換成Rocket”的方案提上議程,就會發放這是一個非常浩大的工程。以前好多服務都是用的有RabbitMQ
的特征代碼,如果要替換相當于所有服務的代碼都要較大的更新,這帶來的運營風險是巨大的,需要非常多的開發測試資源的投入。
那回頭來講,我們最開始使用rabbitmq的時候能不能盡量隱藏特征代碼嗎,為以后的升級替換保留可能性。
這個時候就需要使用Spring Cloud
的子組件Spring Cloud Stream
。它是一個構建消息驅動微服務的框架,提供一套消息訂閱消費的標準為不同供應商的消息中間件進行集成。目前官方提供Kafka
和RabbitMQ
的集成實現,而阿里也實現對RocketMQ
的集成。
Spring Cloud Stream應用由第三方的中間件組成。應用間的通信通過輸入通道(input channel)和輸出通道(output channel)完成。這些通道是由Spring Cloud Stream 注入的。而通道與外部的代理的連接又是通過Binder實現的。
<dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-stream-rabbit</artifactId> </dependency>
public interface Source { String OUTPUT = "myOutput"; @Output(OUTPUT) MessageChannel message(); } public interface Sink { String INPUT = "myInput"; @Input(INPUT) SubscribableChannel sub1(); }
輸出通道為消息的發送方,輸入通道為消息的接收方
myOutput
,myInput
為通道名,后續通過配置文件進行特性配置,切記兩個通道的綁定最好是分開定義,不然有可能產生莫名錯誤
spring cloud: stream: bindings: myOutput: destination: login-user myInput: # 通道名,對應代碼中的消費監聽組 destination: login-user # exchange group: logined-member # 消費組 rabbit: bindings: myOutput: producer: routing-key-expression: headers.routingKey # 發送端路由key delayed-exchange: true # 開啟延時隊列 myInput: consumer: binding-routing-key: login.user.succeed # 消費監聽路由表達式 delayed-exchange: true # 開啟延時隊列 auto-bind-dlq: true # 綁定死信隊列 republish-to-dlq: true # 重投到死信隊列并帶有報錯信息
destination
消息的主題名在Rabbit中用來定義exchange
以及成為queue
的一部分
group
消費組沒有定義消費組時,如果啟動多實例則一個消息同時都消費
定義了消費組后,多實例共用一個queue,負載消費。從圖可以看出queue名為destination.group
組成
binding-routing-key:消費路由監聽表達式
delayed-exchange: 開啟延時隊列
auto-bind-dlq:開啟死信隊列
republish-to-dlq:此設置可以讓死信消息帶報錯信息
發送消息
@Autowired private Source source; @GetMapping("/") public void sendSucceed() { source.message().send(MessageBuilder.withPayload("Hello World...") .setHeader("routingKey", "login.user.succeed") .setHeader("version", "1.0") .setHeader("x-delay", 5000) .build()); }
這里可以為消息設置不同header,以現實不同的功能,這部分每種MQ有不同的特性,需要視情況而定
接收消息
@StreamListener(value = Sink.MY_INPUT_1, condition = "headers['version']=='1.0'") public void receiveSucceed_v1(@Payload String message) { String msg = "StreamReceiver v1: " + message; log.error(msg); }
@EnableBinding(value = {Source.class, Sink.class}) @SpringBootApplication public class RabbitApplication { public static void main(String[] args) { SpringApplication.run(RabbitApplication.class, args); } }
實現這5步就可以正常發送接收消息了,你會發現除了引入不同的包和消息特性配置外,其它的代碼都是抽象代碼,沒有任何rabbitmq的特征代碼
根據RabbitMQ
的相關代碼,只需要修改引入包和特片配置就可以替換成RocketMQ
了(一些特性功能除外)
<dependency> <groupId>com.alibaba.cloud</groupId> <artifactId>spring-cloud-starter-stream-rocketmq</artifactId> </dependency>
spring cloud: stream: bindings: myOutput: destination: login-user content-type: application/json myInput: # 通道名,對應代碼中的消費監聽組 destination: login-user # exchange group: logined-member # 消費者組, 同組負載消費 rocketmq: binder: name-server: 127.0.0.1:9876
<dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-stream-binder-kafka</artifactId> </dependency>
spring cloud: stream: bindings: myOutput: destination: login-user content-type: application/json myInput: # 通道名,對應代碼中的消費監聽組 destination: login-user # exchange group: logined-member # 消費者組, 同組負載消費 kafka: binder: brokers: localhost:9092 #Kafka的消息中間件服務器 auto-create-topics: true
由上面三個簡單的例子可以看出,Spring Cloud Stream
對消息訂閱和消費做了高度抽象,用一套代碼實現多種消息中間件的支持。同時它也可以非常簡單的實現多種消息中間件的混用,大大擴展了消息中間件的玩法。
上述就是小編為大家分享的怎么使用Spring Cloud Stream玩轉RabbitMQ,RocketMQ和Kafka了,如果剛好有類似的疑惑,不妨參照上述分析進行理解。如果想知道更多相關知識,歡迎關注億速云行業資訊頻道。
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。