您好,登錄后才能下訂單哦!
今天就跟大家聊聊有關spring-boot 2.1.x中怎么集成kafka,可能很多人都不太了解,為了讓大家更加了解,小編給大家總結了以下內容,希望大家根據這篇文章可以有所收獲。
pom.xml配置:
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter</artifactId> <version>${spring-boot.version}</version> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> <version>${spring-boot.version}</version> <exclusions> <exclusion> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-tomcat</artifactId> </exclusion> </exclusions> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-undertow</artifactId> <version>${spring-boot.version}</version> </dependency> <!--- 以下是調試后的配置 --> <dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> <version>${kafka.version}</version> </dependency> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>2.3.1</version> </dependency> <dependency> <groupId>com.fasterxml.jackson.core</groupId> <artifactId>jackson-databind</artifactId> <version>2.10.0</version> </dependency> <dependency> <groupId>com.fasterxml.jackson.core</groupId> <artifactId>jackson-core</artifactId> <version>2.10.0</version> </dependency>
yml配置:
spring: kafka: # 以逗號分隔的地址列表,用于建立與 Kafka 集群的初始連接 (kafka 默認的端口號為 9092) bootstrap-servers: 127.0.0.1:9092 producer: # 發生錯誤后,消息重發的次數。 retries: 0 #當有多個消息需要被發送到同一個分區時,生產者會把它們放在同一個批次里。該參數指定了一個批次可以使用的內存大小,按照字節數計算。 batch-size: 16384 # 設置生產者內存緩沖區的大小。 buffer-memory: 33554432 # 鍵的序列化方式 key-serializer: org.apache.kafka.common.serialization.StringSerializer # 值的序列化方式 value-serializer: org.apache.kafka.common.serialization.StringSerializer # acks=0 : 生產者在成功寫入消息之前不會等待任何來自服務器的響應。 # acks=1 : 只要集群的首領節點收到消息,生產者就會收到一個來自服務器成功響應。 # acks=all :只有當所有參與復制的節點全部收到消息時,生產者才會收到一個來自服務器的成功響應。 acks: 1 consumer: # 自動提交的時間間隔 在 spring boot 2.X 版本中這里采用的是值的類型為 Duration 需要符合特定的格式,如 1S,1M,2H,5D auto-commit-interval: 1S # 該屬性指定了消費者在讀取一個沒有偏移量的分區或者偏移量無效的情況下該作何處理: # latest(默認值)在偏移量無效的情況下,消費者將從最新的記錄開始讀取數據(在消費者啟動之后生成的記錄) # earliest :在偏移量無效的情況下,消費者將從起始位置讀取分區的記錄 auto-offset-reset: earliest # 是否自動提交偏移量,默認值是 true,為了避免出現重復數據和數據丟失,可以把它設置為 false,然后手動提交偏移量 enable-auto-commit: true # 鍵的反序列化方式 key-deserializer: org.apache.kafka.common.serialization.StringDeserializer # 值的反序列化方式 value-deserializer: org.apache.kafka.common.serialization.StringDeserializer listener: # 在偵聽器容器中運行的線程數。 concurrency: 5
看完上述內容,你們對spring-boot 2.1.x中怎么集成kafka有進一步的了解嗎?如果還想了解更多知識或者相關內容,請關注億速云行業資訊頻道,感謝大家的支持。
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。