This article walks through what Spring Cloud Stream and Kafka are, how they work, and how they fit together, with simple, practical steps you can follow along.
Spring Cloud Stream is a framework for building highly scalable event-driven microservices connected with shared messaging systems.
The framework provides a flexible programming model built on already established and familiar Spring idioms and best practices, including support for persistent pub/sub semantics, consumer groups, and stateful partitions.
Loose translation: Spring Cloud Stream is the man who means to unify the harem of message middleware. He is agile, has Spring as his powerful backer, and has mastered all eighteen weapons (pub/sub semantics, consumer groups, stateful partitions, and so on). At present the harem holds Empress Kafka of the Eastern Palace and Empress RabbitMQ of the Western Palace.
Gossip corner: today we dig into the relationship between Spring Cloud Stream and Kafka; RabbitMQ can stay in the cold palace for now.
A streaming platform has three key capabilities:
Publish and subscribe to streams of records, similar to a message queue or enterprise messaging system.
Store streams of records in a fault-tolerant durable way.
Process streams of records as they occur.
Loose translation: I am a streaming platform, and I can do a lot:
Handle publish/subscribe messaging
Store messages in a durable, reliable way
Process messages the moment they arrive
In one sentence: fast, stable, accurate.
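The pub/sub and consumer-group semantics mentioned above can be simulated in a few lines of plain Java. This is a toy sketch with made-up names (`ToyBroker` and friends are not Kafka's API): every group gets its own copy of the stream, but within a group each record is delivered to exactly one member.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class GroupDemo {

    // Toy in-memory broker, purely illustrative (not Kafka's API):
    // each group sees the full stream (pub/sub), and within a group
    // records are load-balanced round-robin (queue semantics).
    static class ToyBroker {
        private final Map<String, List<List<String>>> groups = new LinkedHashMap<>();
        private int cursor = 0;

        // register a consumer in a group and return its private inbox
        List<String> subscribe(String group) {
            List<String> inbox = new ArrayList<>();
            groups.computeIfAbsent(group, g -> new ArrayList<>()).add(inbox);
            return inbox;
        }

        void publish(String record) {
            for (List<List<String>> members : groups.values()) {
                members.get(cursor % members.size()).add(record);
            }
            cursor++;
        }
    }

    public static void main(String[] args) {
        ToyBroker broker = new ToyBroker();
        List<String> a1 = broker.subscribe("group-A");
        List<String> a2 = broker.subscribe("group-A");
        List<String> b1 = broker.subscribe("group-B");

        broker.publish("m1");
        broker.publish("m2");

        // group-A's two members split the records between them,
        // while group-B's single member saw both
        System.out.println("a1=" + a1 + " a2=" + a2 + " b1=" + b1);
    }
}
```

Real Kafka achieves the same effect by assigning topic partitions to group members, rather than routing individual records.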
Running Kafka is very simple: download it, then start ZooKeeper first. The latest Kafka download package bundles a ZooKeeper you can use directly. Once ZooKeeper is up, configure its IP and port in Kafka's config file, config/server.properties.
############################# Zookeeper #############################

# Zookeeper connection string (see zookeeper docs for details).
# This is a comma separated host:port pairs, each corresponding to a zk
# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
# You can also append an optional chroot string to the urls to specify the
# root directory for all kafka znodes.
zookeeper.connect=localhost:2181

# Timeout in ms for connecting to zookeeper
zookeeper.connection.timeout.ms=6000
Then run the startup script in the bin directory and Kafka is up:
bin/kafka-server-start.sh -daemon config/server.properties
Kafka is running now, but if we want to understand her we still need a steward to report on the household. I use kafka-manager; the download address is here. Unfortunately only the source code is offered, with no runnable build, so you have to compile it yourself, and the build is rather slow. I've made a compiled version available; click here.
kafka-manager likewise needs to be told about Kafka, in the conf/application.conf file. What you configure there is not Kafka itself, though, but the ZooKeeper that Kafka is attached to.
kafka-manager.zkhosts="localhost:2181"
Then start bin/kafka-manager and you are done (on Windows there is also a kafka-manager.bat you can run).
One pitfall: on Windows the startup may fail with an "input line is too long" error.
That happens because the path is too long; shorten the kafka-manager-2.0.0.2 directory name and it runs fine.
After startup, use Add Cluster to fill in the ZooKeeper address and port under Cluster Zookeeper Host. The Kafka Version must match the Kafka version you are actually running, otherwise you may not see any of Kafka's content.
Then we can see Kafka's brokers, topics, consumers, partitions, and so on.
As always, everything starts at start.spring.io.
The pitch-black interface is Spring's Halloween stunt. What matters to us are the two dependencies on the right, which correspond to the following in pom.xml:
<dependencies>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-streams</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-stream</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-stream-binder-kafka-streams</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-stream-test-support</artifactId>
        <scope>test</scope>
    </dependency>
</dependencies>
<dependencyManagement>
    <dependencies>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-dependencies</artifactId>
            <version>${spring-cloud.version}</version>
            <type>pom</type>
            <scope>import</scope>
        </dependency>
    </dependencies>
</dependencyManagement>
These alone are not enough, though; run it directly and you get:
Caused by: java.lang.IllegalStateException: Unknown binder configuration: kafka
You also need to add one more dependency:
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-stream-binder-kafka</artifactId>
</dependency>
With the Spring Cloud Stream project skeleton in place, the work splits into two parts: sending messages and receiving them. Let's look at the sending side first, starting with the configuration file, application.yml:
spring:
  cloud:
    stream:
      default-binder: kafka        # default binder; use "rabbit" here for RabbitMQ
      kafka:
        binder:
          brokers:                 # Kafka broker addresses
            - localhost:9092
      bindings:
        output:                    # channel name
          binder: kafka
          destination: test1       # where messages go, i.e. the Kafka topic
          group: output-group-1    # maps to a Kafka group
          content-type: text/plain # message format
Note the output here: it marks the publishing side and corresponds to the subscription side later. This output is the message channel name, and it can be customized, as we'll see below.
Next we need to create a publisher:
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Source;

@EnableBinding(Source.class)
public class Producer {
    private Source mySource;

    public Producer(Source mySource) {
        super();
        this.mySource = mySource;
    }

    public Source getMysource() {
        return mySource;
    }

    public void setMysource(Source mySource) {
        this.mySource = mySource;
    }
}
@EnableBinding, as the name suggests, binds a channel, and the channel bound here is the output above. Source.class is provided by Spring; it marks a bindable publishing channel whose name is output, matching the output in application.yml.
The source code makes this clear:
package org.springframework.cloud.stream.messaging;

import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;

/**
 * Bindable interface with one output channel.
 *
 * @author Dave Syer
 * @author Marius Bogoevici
 * @see org.springframework.cloud.stream.annotation.EnableBinding
 */
public interface Source {

    /**
     * Name of the output channel.
     */
    String OUTPUT = "output";

    /**
     * @return output channel
     */
    @Output(Source.OUTPUT)
    MessageChannel output();
}
If we need to define our own channel, we can write a class like the one below, where the channel name becomes my-out:
import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.SubscribableChannel;

public interface MySource {

    String INPUT = "my-in";
    String OUTPUT = "my-out";

    @Input(INPUT)
    SubscribableChannel myInput();

    @Output(OUTPUT)
    MessageChannel myOutput();
}
That means application.yml has to change as well:
my-out:
  binder: kafka
  destination: mytest      # where messages go, i.e. the Kafka topic
  group: output-group-2    # maps to a Kafka group
  content-type: text/plain # message format
Producer's @EnableBinding needs to change too; to keep the two side by side, I wrote a separate MyProducer:
import org.springframework.cloud.stream.annotation.EnableBinding;

@EnableBinding(MySource.class)
public class MyProducer {
    private MySource mySource;

    public MyProducer(MySource mySource) {
        super();
        this.mySource = mySource;
    }

    public MySource getMysource() {
        return mySource;
    }

    public void setMysource(MySource mySource) {
        this.mySource = mySource;
    }
}
With that, the publishing side is done. Let's write a controller to send messages:
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;

import com.wphmoon.kscs.service.ChatMessage;
import com.wphmoon.kscs.service.MyProducer;
import com.wphmoon.kscs.service.Producer;

@RestController
public class MyController {

    @Autowired
    private Producer producer;

    @Autowired
    private MyProducer myProducer;

    // get the String message via HTTP, publish it to broker using spring cloud stream
    @RequestMapping(value = "/sendMessage/string", method = RequestMethod.POST)
    public String publishMessageString(@RequestBody String payload) {
        // send message to channel output
        producer.getMysource().output()
                .send(MessageBuilder.withPayload(payload).setHeader("type", "string").build());
        return "success";
    }

    @RequestMapping(value = "/sendMyMessage/string", method = RequestMethod.POST)
    public String publishMyMessageString(@RequestBody String payload) {
        // send message to channel my-out
        myProducer.getMysource().myOutput()
                .send(MessageBuilder.withPayload(payload).setHeader("type", "string").build());
        return "success";
    }
}
It's that simple: just call the producer to send a string. I use Postman to fire the request.
The message has been sent; how do we receive it? Read on.
Likewise, we use the same Spring Cloud Stream project skeleton for the receiving side. First, the application.yml file:
server:
  port: 8081
spring:
  cloud:
    stream:
      default-binder: kafka
      kafka:
        binder:
          brokers:
            - localhost:9092
      bindings:
        input:
          binder: kafka
          destination: test1
          content-type: text/plain
          group: input-group-1
        my-in:
          binder: kafka
          destination: mytest
          content-type: text/plain
          group: input-group-2
The parts to focus on are input and my-in, which correspond one-to-one with the earlier output and my-out.
The default counterpart of Source is Sink, also provided by Spring; its code is as follows:
package org.springframework.cloud.stream.messaging;

import org.springframework.cloud.stream.annotation.Input;
import org.springframework.messaging.SubscribableChannel;

/**
 * Bindable interface with one input channel.
 *
 * @author Dave Syer
 * @author Marius Bogoevici
 * @see org.springframework.cloud.stream.annotation.EnableBinding
 */
public interface Sink {

    /**
     * Input channel name.
     */
    String INPUT = "input";

    /**
     * @return input channel.
     */
    @Input(Sink.INPUT)
    SubscribableChannel input();
}
The class that uses it, Consumer, receives the messages:
import java.time.Instant;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.time.format.FormatStyle;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.messaging.handler.annotation.Payload;

@EnableBinding(Sink.class)
public class Consumer {
    private static final Logger logger = LoggerFactory.getLogger(Consumer.class);

    @StreamListener(target = Sink.INPUT)
    public void consume(String message) {
        logger.info("received a string message : " + message);
    }

    @StreamListener(target = Sink.INPUT, condition = "headers['type']=='chat'")
    public void handle(@Payload ChatMessage message) {
        final DateTimeFormatter df = DateTimeFormatter.ofLocalizedTime(FormatStyle.MEDIUM)
                .withZone(ZoneId.systemDefault());
        final String time = df.format(Instant.ofEpochMilli(message.getTime()));
        logger.info("received a complex message : [{}]: {}", time, message.getContents());
    }
}
And our custom-channel classes, MySink and MyConsumer, look like this:
import org.springframework.cloud.stream.annotation.Input;
import org.springframework.messaging.SubscribableChannel;

public interface MySink {

    String INPUT = "my-in";

    @Input(INPUT)
    SubscribableChannel myInput();
}
import java.time.Instant;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.time.format.FormatStyle;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.messaging.handler.annotation.Payload;

@EnableBinding(MySink.class)
public class MyConsumer {
    private static final Logger logger = LoggerFactory.getLogger(MyConsumer.class);

    @StreamListener(target = MySink.INPUT)
    public void consume(String message) {
        logger.info("received a string message : " + message);
    }

    @StreamListener(target = MySink.INPUT, condition = "headers['type']=='chat'")
    public void handle(@Payload ChatMessage message) {
        final DateTimeFormatter df = DateTimeFormatter.ofLocalizedTime(FormatStyle.MEDIUM)
                .withZone(ZoneId.systemDefault());
        final String time = df.format(Instant.ofEpochMilli(message.getTime()));
        logger.info("received a complex message : [{}]: {}", time, message.getContents());
    }
}
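The condition attribute on @StreamListener is a SpEL expression evaluated against each message's headers; only matching messages reach that handler. Stripped of Spring, the dispatch logic amounts to something like the plain-Java sketch below (ConditionDemo and its names are illustrative, not Spring's internals; the SpEL string is replaced by a plain Predicate over a header map):

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;
import java.util.function.Predicate;

public class ConditionDemo {

    // One listener = a header predicate plus a handler.
    static class Listener {
        final Predicate<Map<String, Object>> condition;
        final Consumer<String> handler;

        Listener(Predicate<Map<String, Object>> condition, Consumer<String> handler) {
            this.condition = condition;
            this.handler = handler;
        }
    }

    // deliver the payload to the first listener whose condition matches
    static void dispatch(List<Listener> listeners, Map<String, Object> headers, String payload) {
        for (Listener l : listeners) {
            if (l.condition.test(headers)) {
                l.handler.accept(payload);
                return; // first match wins in this toy model
            }
        }
    }

    public static void main(String[] args) {
        List<String> log = new ArrayList<>();
        List<Listener> listeners = new ArrayList<>();
        // analogous to condition = "headers['type']=='chat'"
        listeners.add(new Listener(h -> "chat".equals(h.get("type")),
                p -> log.add("chat: " + p)));
        // unconditioned fallback, like the plain @StreamListener
        listeners.add(new Listener(h -> true,
                p -> log.add("string: " + p)));

        Map<String, Object> chatHeaders = new LinkedHashMap<>();
        chatHeaders.put("type", "chat");
        dispatch(listeners, chatHeaders, "hello");

        Map<String, Object> plainHeaders = new LinkedHashMap<>();
        plainHeaders.put("type", "string");
        dispatch(listeners, plainHeaders, "hi");

        System.out.println(log);
    }
}
```

This is why the controller earlier sets a "type" header on every message it sends: the header is what the receiving side routes on.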
And that's it: once we send a message with Postman as above, it shows up directly in the logs on this side:
2019-10-29 18:42:39.455  INFO 13556 --- [container-0-C-1] com.wphmoon.kscsclient.MyConsumer : received a string message : 你瞅啥
2019-10-29 18:43:17.017  INFO 13556 --- [container-0-C-1] com.wphmoon.kscsclient.Consumer : received a string message : 你瞅啥
The destination we define in application.yml is the Kafka topic, and it is visible in kafka-manager's topic list.
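Within that topic, Kafka spreads records across partitions; records with the same key always land in the same partition, which is what preserves per-key ordering. The sketch below illustrates the idea only: Kafka's real default partitioner hashes keys with murmur2 (and uses sticky batching for null keys), whereas this toy substitutes String.hashCode and simple round-robin.

```java
import java.util.concurrent.atomic.AtomicInteger;

public class PartitionDemo {

    private final AtomicInteger counter = new AtomicInteger();

    // Simplified stand-in for Kafka's default partitioner: keyed records
    // hash to a stable partition; keyless records rotate round-robin.
    int partitionFor(String key, int numPartitions) {
        if (key == null) {
            return counter.getAndIncrement() % numPartitions;
        }
        // mask the sign bit so the result is a valid partition index
        return (key.hashCode() & 0x7fffffff) % numPartitions;
    }

    public static void main(String[] args) {
        PartitionDemo p = new PartitionDemo();
        // the same key always maps to the same partition
        System.out.println("user-42 -> partition " + p.partitionFor("user-42", 3));
        System.out.println("user-42 -> partition " + p.partitionFor("user-42", 3));
        // null keys spread across partitions
        System.out.println("null key -> partition " + p.partitionFor(null, 3));
        System.out.println("null key -> partition " + p.partitionFor(null, 3));
    }
}
```

The partition counts per topic are among the details kafka-manager displays.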
The consumer receiving the messages can be seen there as well.
So that is the imperial romance of Spring Cloud Stream and Kafka. A political marriage like theirs is never that simple, though; we'll get into the complicated parts later, so stay tuned. The carriage now returns to the palace (loose translation: The Return of the King).