您好,登錄后才能下訂單哦!
這期內容當中小編將會給大家帶來有關如何從零開始搭建Kafka+SpringBoot分布式消息系統,文章內容豐富且以專業的角度為大家分析和敘述,閱讀完這篇文章希望大家可以有所收獲。
由于kafka強依賴于zookeeper,所以需先搭建好zookeeper集群。由于zookeeper是由java編寫的,需運行在jvm上,所以首先應具備java環境。 (ps:默認您的centos系統可聯網,本教程就不教配置ip什么的了) (ps2:沒有wget的先裝一下:yum install wget) (ps3:人啊,就是要條理。東邊放一點,西邊放一點,過段時間就不知道自己裝在哪里了。本教程所有下載均放在/usr/local目錄下) (ps4:kafka可能有內置zookeeper,感覺可以越過zookeeper教程,但是這里也配置出來了。我沒試過)
因為oracle 公司不允許直接通過wget 下載官網上的jdk包。所以你直接wget以下地址下載下來的是一個只有5k的網頁文件而已,并不是需要的jdk包。(壟斷地位就是任性)。 (請通過java -version判斷是否自帶jdk,我的沒帶)
下面是jdk8的官方下載地址:
https://www.oracle.com/technetwork/java/javase/downloads/java-archive-javase8u211-later-5573849.html
這里通過xftp上傳到服務器指定位置:/usr/local
運行命令使環境生效
source /etc/profile
等待下載完成之后解壓:
tar -zxvf zookeeper-3.4.6.tar.gz
重命名為zookeeper1
mv zookeeper-3.4.6 zookeeper1 cp -r zookeeper1 zookeeper2 cp -r zookeeper1 zookeeper3
在zookeeper1目錄下創建
在data目錄下新建myid文件。內容為1
cd /usr/local/zookeeper/zookeeper1/conf/ cp zoo_sample.cfg zoo.cfg
進行過上面兩步之后,有zoo.cfg文件了,現在修改內容為:
dataDir=/usr/local/zookeeper/zookeeper1/data dataLogDir=/usr/local/zookeeper/zookeeper1/logs server.1=192.168.233.11:2888:3888 server.2=192.168.233.11:2889:3889 server.3=192.168.233.11:2890:3890
首先,復制改名。
cd /usr/local/zookeeper/ cp -r zookeeper1 zookeeper2
然后修改具體的某些配置:
vim zookeeper2/conf/zoo.cfg
將下圖三個地方1改成2
vim zookeeper2/data/myid
同時將myid中的值改成2
vim zookeeper3/conf/zoo.cfg
修改為3
cd /usr/local/zookeeper/zookeeper1/bin/
由于啟動所需代碼比較多,這里簡單寫了一個啟動腳本:
vim start
start的內容如下
cd /usr/local/zookeeper/zookeeper1/bin/ ./zkServer.sh start ../conf/zoo.cfg cd /usr/local/zookeeper/zookeeper2/bin/ ./zkServer.sh start ../conf/zoo.cfg cd /usr/local/zookeeper/zookeeper3/bin/ ./zkServer.sh start ../conf/zoo.cfg
下面是連接腳本:
vim login
login內容如下:
./zkCli.sh -server 192.168.233.11:2181,192.168.233.11:2182,192.168.233.11:2183
腳本編寫完成,接下來啟動:
sh start sh login
啟動集群成功,如下圖:
首先創建kafka目錄:
mkdir /usr/local/kafka
然后在該目錄下載
cd /usr/local/kafka/ wget https://archive.apache.org/dist/kafka/1.1.0/kafka_2.11-1.1.0.tgz
下載成功之后解壓:
tar -zxvf kafka_2.11-1.1.0.tgz
首先進入conf目錄下:
cd /usr/local/kafka/kafka_2.11-1.1.0/config
修改server.properties 修改內容:
broker.id=0 log.dirs=/tmp/kafka-logs listeners=PLAINTEXT://192.168.233.11:9092
復制兩份server.properties
cp server.properties server2.properties cp server.properties server3.properties
修改server2.properties
vim server2.properties
修改主要內容為:
broker.id=1 log.dirs=/tmp/kafka-logs1 listeners=PLAINTEXT://192.168.233.11:9093
如上,修改server3.properties 修改內容為:
broker.id=2 log.dirs=/tmp/kafka-logs2 listeners=PLAINTEXT://192.168.233.11:9094
這里還是在bin目錄編寫一個腳本:
cd ../bin/ vim start
腳本內容為:
./kafka-server-start.sh ../config/server.properties & ./kafka-server-start.sh ../config/server2.properties & ./kafka-server-start.sh ../config/server3.properties &
通過jps命令可以查看到,共啟動了3個kafka。
cd /usr/local/kafka/kafka_2.11-1.1.0 bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 1 --topic my-replicated-topic
kafka打印了幾條日志
查看kafka狀態
bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic my-replicated-topic
可以看出,啟動消費者之后就會自動消費。
消費者自動捕獲成功!
不滿足的話啟動springboot的時候會拋異常的!!!ps:該走的岔路我都走了o(╥﹏╥)o (我的kafka-clients是1.1.0,spring-kafka是2.2.2,中間那列暫時不用管)
回歸正題,搞了兩個小時,終于搞好了,想哭… 遇到的問題基本就是jar版本不匹配。 上面的步驟我也都會相應的去修改,爭取大家按照本教程一遍過!!!
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>2.1.1.RELEASE</version> <relativePath/> <!-- lookup parent from repository --> </parent> <groupId>com.gzky</groupId> <artifactId>study</artifactId> <version>0.0.1-SNAPSHOT</version> <name>study</name> <description>Demo project for Spring Boot</description> <properties> <java.version>1.8</java.version> </properties> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> <exclusions> <exclusion> <groupId>org.junit.vintage</groupId> <artifactId>junit-vintage-engine</artifactId> </exclusion> </exclusions> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-redis</artifactId> <version>1.3.8.RELEASE</version> </dependency> <dependency> <groupId>redis.clients</groupId> <artifactId>jedis</artifactId> </dependency> <!-- https://mvnrepository.com/artifact/org.springframework.kafka/spring-kafka --> <dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> <version>2.2.0.RELEASE</version> </dependency> <!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients --> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-maven-plugin</artifactId> </plugin> </plugins> </build> </project>
pom文件中,重點是下面這兩個版本。
<parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>2.1.1.RELEASE</version> <relativePath/> <!-- lookup parent from repository --> </parent> <dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> <version>2.2.0.RELEASE</version> </dependency>
spring: redis: cluster: #設置key的生存時間,當key過期時,它會被自動刪除; expire-seconds: 120 #設置命令的執行時間,如果超過這個時間,則報錯; command-timeout: 5000 #設置redis集群的節點信息,其中namenode為域名解析,通過解析域名來獲取相應的地址; nodes: 192.168.233.11:9001,192.168.233.11:9002,192.168.233.11:9003,192.168.233.11:9004,192.168.233.11:9005,192.168.233.11:9006 kafka: # 指定kafka 代理地址,可以多個 bootstrap-servers: 192.168.233.11:9092,192.168.233.11:9093,192.168.233.11:9094 producer: retries: 0 # 每次批量發送消息的數量 batch-size: 16384 buffer-memory: 33554432 # 指定消息key和消息體的編解碼方式 key-serializer: org.apache.kafka.common.serialization.StringSerializer value-serializer: org.apache.kafka.common.serialization.StringSerializer consumer: # 指定默認消費者group id group-id: test-group auto-offset-reset: earliest enable-auto-commit: true auto-commit-interval: 100 # 指定消息key和消息體的編解碼方式 key-serializer: org.apache.kafka.common.serialization.StringSerializer value-serializer: org.apache.kafka.common.serialization.StringSerializer server: port: 8085 servlet: #context-path: /redis context-path: /kafka
沒有配置Redis的可以把Redis部分刪掉,也就是下圖: 想學習配置Redis集群的可以參考:《Redis集群redis-cluster的搭建及集成springboot》
package com.gzky.study.utils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.stereotype.Component; /** * kafka生產者工具類 * * @author biws * @date 2019/12/17 **/ @Component public class KfkaProducer { private static Logger logger = LoggerFactory.getLogger(KfkaProducer.class); @Autowired private KafkaTemplate<String, String> kafkaTemplate; /** * 生產數據 * @param str 具體數據 */ public void send(String str) { logger.info("生產數據:"> 4、消費者package com.gzky.study.utils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.kafka.annotation.KafkaListener; import org.springframework.stereotype.Component; /** * kafka消費者監聽消息 * * @author biws * @date 2019/12/17 **/ @Component public class KafkaConsumerListener { private static Logger logger = LoggerFactory.getLogger(KafkaConsumerListener.class); @KafkaListener(topics = "testTopic") public void onMessage(String str){ //insert(str);//這里為插入數據庫代碼 logger.info("監聽到:" + str); System.out.println("監聽到:" + str); } } 5、對外接口package com.gzky.study.controller; import com.gzky.study.utils.KfkaProducer; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.*; /** * kafka對外接口 * * @author biws * @date 2019/12/17 **/ @RestController public class KafkaController { @Autowired KfkaProducer kfkaProducer; /** * 生產消息 * @param str * @return */ @RequestMapping(value = "/sendKafkaWithTestTopic",method = RequestMethod.GET) @ResponseBody public boolean sendTopic(@RequestParam String str){ kfkaProducer.send(str); return true; } } 6、postman測試這里首先應該在服務器啟動監聽器(kafka根目錄),下面命令必須是具體的服務器ip,不能是localhost,是我踩過的坑:推薦此處重啟一下集群 關閉kafka命令:cd /usr/local/kafka/kafka_2.11-1.1.0/bin ./kafka-server-stop.sh ../config/server.properties & ./kafka-server-stop.sh ../config/server2.properties & ./kafka-server-stop.sh ../config/server3.properties & 此處應該jps看一下,等待所有的kafka都關閉(關不掉的kill掉),再重新啟動kafka:./kafka-server-start.sh ../config/server.properties & ./kafka-server-start.sh ../config/server2.properties & ./kafka-server-start.sh ../config/server3.properties & 等待kafka啟動成功后,啟動消費者監聽端口:cd /usr/local/kafka/kafka_2.11-1.1.0 bin/kafka-console-consumer.sh --bootstrap-server 192.168.233.11:9092 --from-beginning --topic testTopic 曾經我亂輸的測試信息全部被監聽過來了!啟動springboot服務 然后用postman生產消息: 然后享受成果,服務器端監聽成功。 項目中也監聽成功!
上述就是小編為大家分享的如何從零開始搭建Kafka+SpringBoot分布式消息系統了,如果剛好有類似的疑惑,不妨參照上述分析進行理解。如果想知道更多相關知識,歡迎關注億速云行業資訊頻道。
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。