您好,登錄后才能下訂單哦!
這篇文章主要介紹了Spring Boot怎么整合Kafka的相關知識,內容詳細易懂,操作簡單快捷,具有一定借鑒價值,相信大家閱讀完這篇Spring Boot怎么整合Kafka文章都會有所收獲,下面我們一起來看看吧。
在 pom.xml 中添加以下依賴項:
<dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> <version>2.8.0</version> </dependency>
在 application.yml
文件中添加以下配置:
sping: kafka: bootstrap-servers: localhost:9092 consumer: group-id: my-group auto-offset-reset: earliest producer: value-serializer: org.apache.kafka.common.serialization.StringSerializer key-serializer: org.apache.kafka.common.serialization.StringSerializer
這里我們配置了 Kafka 的服務地址為 localhost:9092
,配置了一個消費者組 ID 為 my-group
,并設置了一個最早的偏移量來讀取消息。在生產者方面,我們配置了消息序列化程序為 StringSerializer
。
現在,我們將創建一個 Kafka 生產者,用于發送消息到 Kafka 服務器。在這里,我們將創建一個 RESTful 端點,用于接收 POST 請求并將消息發送到 Kafka。
首先,我們將創建一個 KafkaProducerConfig
類,用于配置 Kafka 生產者:
@Configuration public class KafkaProducerConfig { @Value("${spring.kafka.bootstrap-servers}") private String bootstrapServers; @Bean public Map<String, Object> producerConfigs() { Map<String, Object> props = new HashMap<>(); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); return props; } @Bean public ProducerFactory<String, String> producerFactory() { return new DefaultKafkaProducerFactory<>(producerConfigs()); } @Bean public KafkaTemplate<String, String> kafkaTemplate() { return new KafkaTemplate<>(producerFactory()); } }
在上面的代碼中,我們使用 @Configuration
注解將 KafkaProducerConfig
類聲明為配置類。然后,我們使用 @Value
注解注入配置文件中的 bootstrap-servers
屬性。
接下來,我們創建了一個 producerConfigs
方法,用于設置 Kafka 生產者的配置。在這里,我們設置了 BOOTSTRAP_SERVERS_CONFIG
、KEY_SERIALIZER_CLASS_CONFIG
和 VALUE_SERIALIZER_CLASS_CONFIG
三個屬性。
然后,我們創建了一個 producerFactory
方法,用于創建 Kafka 生產者工廠。在這里,我們使用了 DefaultKafkaProducerFactory
類,并傳遞了我們的配置。
最后,我們創建了一個 kafkaTemplate
方法,用于創建 KafkaTemplate
實例。在這里,我們使用了剛剛創建的生產者工廠作為參數,然后返回 KafkaTemplate
實例。
接下來,我們將創建一個 RESTful 端點,用于接收 POST 請求并將消息發送到 Kafka。在這里,我們將使用 @RestController
注解創建一個 RESTful 控制器:
@RestController public class KafkaController { @Autowired private KafkaTemplate<String, String> kafkaTemplate; @PostMapping("/send") public void sendMessage(@RequestBody String message) { kafkaTemplate.send("my-topic", message); } }
在上面的代碼中,我們使用 @Autowired
注解將 KafkaTemplate
實例注入到 KafkaController
類中。然后,我們創建了一個 sendMessage
方法,用于發送消息到 Kafka。
在這里,我們使用 kafkaTemplate.send
方法發送消息到 my-topic
主題。send 方法返回一個 ListenableFuture
對象,用于異步處理結果。
現在,我們將創建一個 Kafka 消費者,用于從 Kafka 服務器接收消息。在這里,我們將創建一個消費者組,并將其配置為從 my-topic
主題讀取消息。
首先,我們將創建一個 KafkaConsumerConfig
類,用于配置 Kafka 消費者:
@Configuration @EnableKafka public class KafkaConsumerConfig { @Value("${spring.kafka.bootstrap-servers}") private String bootstrapServers; @Value("${spring.kafka.consumer.group-id}") private String groupId; @Bean public Map<String, Object> consumerConfigs() { Map<String, Object> props = new HashMap<>(); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId); props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); return props; } @Bean public ConsumerFactory<String, String> consumerFactory() { return new DefaultKafkaConsumerFactory<>(consumerConfigs()); } @Bean public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() { ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>(); factory.setConsumerFactory(consumerFactory()); return factory; } }
在上面的代碼中,我們使用 @Configuration
注解將 KafkaConsumerConfig
類聲明為配置類,并使用 @EnableKafka
注解啟用 Kafka。
然后,我們使用 @Value
注解注入配置文件中的 bootstrap-servers
和 consumer.group-id
屬性。
接下來,我們創建了一個 consumerConfigs
方法,用于設置 Kafka 消費者的配置。在這里,我們設置了 BOOTSTRAP_SERVERS_CONFIG、GROUP_ID_CONFIG
、AUTO_OFFSET_RESET_CONFIG
、KEY_DESERIALIZER_CLASS_CONFIG
和 VALUE_DESERIALIZER_CLASS_CONFIG
五個屬性。
然后,我們創建了一個 consumerFactory
方法,用于創建 Kafka 消費者工廠。在這里,我們使用了 DefaultKafkaConsumerFactory
類,并傳遞了我們的配置。
最后,我們創建了一個 kafkaListenerContainerFactory
方法,用于創建一個 ConcurrentKafkaListenerContainerFactory
實例。在這里,我們將消費者工廠注入到 kafkaListenerContainerFactory
實例中。
接下來,我們將創建一個 Kafka 消費者類 KafkaConsumer
,用于監聽 my-topic
主題并接收消息:
@Service public class KafkaConsumer { @KafkaListener(topics = "my-topic", groupId = "my-group-id") public void consume(String message) { System.out.println("Received message: " + message); } }
在上面的代碼中,我們使用 @KafkaListener
注解聲明了一個消費者方法,用于接收從 my-topic
主題中讀取的消息。在這里,我們將消費者組 ID 設置為 my-group-id
。
現在,我們已經完成了 Kafka 生產者和消費者的設置。我們可以使用 mvn spring-boot:run
命令啟動應用程序,并使用 curl 命令發送 POST 請求到 http://localhost:8080/send
端點,以將消息發送到 Kafka。然后,我們可以在控制臺上查看消費者接收到的消息。這就是使用 Spring Boot 和 Kafka 的基本設置。我們可以根據需要進行更改和擴展,以滿足特定的需求。
關于“Spring Boot怎么整合Kafka”這篇文章的內容就介紹到這里,感謝各位的閱讀!相信大家對“Spring Boot怎么整合Kafka”知識都有一定的了解,大家如果還想學習更多知識,歡迎關注億速云行業資訊頻道。
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。