您好,登錄後才能下訂單哦!
本篇文章為大家展示了如何在 Spring Boot 中使用 spring-kafka 實現一個接收消息功能,內容簡明扼要並且容易理解,絕對能使你眼前一亮,通過這篇文章的詳細介紹希望你能有所收穫。
實現方法
pom.xml文件如下
<?xml version="1.0" encoding="UTF-8"?>
<!--
  Maven build for the linuxsogood-sync application: a Spring Boot 1.4.0 service that
  uses spring-kafka for messaging and MyBatis (with the common Mapper and PageHelper
  plugins) for persistence.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.linuxsogood.sync</groupId>
    <artifactId>linuxsogood-sync</artifactId>
    <version>1.0.0-SNAPSHOT</version>

    <!-- The Boot parent supplies dependency management for every artifact declared without a version. -->
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>1.4.0.RELEASE</version>
    </parent>

    <properties>
        <java.version>1.8</java.version>
        <!-- Dependency versions -->
        <mybatis.version>3.3.1</mybatis.version>
        <mybatis.spring.version>1.2.4</mybatis.spring.version>
        <mapper.version>3.3.6</mapper.version>
        <pagehelper.version>4.1.1</pagehelper.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-jdbc</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-aop</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-freemarker</artifactId>
        </dependency>
        <!--<dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-integration</artifactId>
            <scope>compile</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-kafka</artifactId>
            <version>2.0.1.RELEASE</version>
            <scope>compile</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-core</artifactId>
            <version>4.3.1.RELEASE</version>
            <scope>compile</scope>
        </dependency>-->
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
            <version>1.1.0.RELEASE</version>
        </dependency>
        <!--<dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka-test</artifactId>
            <version>1.1.0.RELEASE</version>
        </dependency>-->
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.12</version>
            <scope>test</scope>
        </dependency>
        <!-- NOTE(review): assertj-core has no test scope, so it ends up on the runtime classpath;
             confirm whether main code really needs it before adding <scope>test</scope>. -->
        <dependency>
            <groupId>org.assertj</groupId>
            <artifactId>assertj-core</artifactId>
            <version>3.5.2</version>
        </dependency>
        <dependency>
            <groupId>org.hamcrest</groupId>
            <artifactId>hamcrest-all</artifactId>
            <version>1.3</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.mockito</groupId>
            <artifactId>mockito-all</artifactId>
            <version>1.9.5</version>
            <scope>test</scope>
        </dependency>
        <!-- Version intentionally omitted (was pinned to 4.2.3.RELEASE): it is inherited from the
             Boot parent so spring-test matches the other Spring Framework modules on the classpath. -->
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
        </dependency>
        <dependency>
            <groupId>com.microsoft.sqlserver</groupId>
            <artifactId>sqljdbc4</artifactId>
            <version>4.0.0</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>druid</artifactId>
            <version>1.0.11</version>
        </dependency>
        <!-- MyBatis -->
        <dependency>
            <groupId>org.mybatis</groupId>
            <artifactId>mybatis</artifactId>
            <version>${mybatis.version}</version>
        </dependency>
        <dependency>
            <groupId>org.mybatis</groupId>
            <artifactId>mybatis-spring</artifactId>
            <version>${mybatis.spring.version}</version>
        </dependency>
        <!--<dependency>
            <groupId>org.mybatis.spring.boot</groupId>
            <artifactId>mybatis-spring-boot-starter</artifactId>
            <version>1.1.1</version>
        </dependency>-->
        <!-- MyBatis Generator -->
        <dependency>
            <groupId>org.mybatis.generator</groupId>
            <artifactId>mybatis-generator-core</artifactId>
            <version>1.3.2</version>
            <scope>compile</scope>
            <optional>true</optional>
        </dependency>
        <!-- Pagination plugin -->
        <dependency>
            <groupId>com.github.pagehelper</groupId>
            <artifactId>pagehelper</artifactId>
            <version>${pagehelper.version}</version>
        </dependency>
        <!-- Common Mapper -->
        <dependency>
            <groupId>tk.mybatis</groupId>
            <artifactId>mapper</artifactId>
            <version>${mapper.version}</version>
        </dependency>
        <!-- NOTE(review): fastjson 1.2.17 is an old release; later versions contain published
             deserialization security fixes. Consider upgrading after checking compatibility. -->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.17</version>
        </dependency>
    </dependencies>

    <repositories>
        <repository>
            <id>repo.spring.io.milestone</id>
            <name>Spring Framework Maven Milestone Repository</name>
            <url>https://repo.spring.io/libs-milestone</url>
        </repository>
    </repositories>

    <build>
        <finalName>mybatis_generator</finalName>
        <plugins>
            <plugin>
                <groupId>org.mybatis.generator</groupId>
                <artifactId>mybatis-generator-maven-plugin</artifactId>
                <version>1.3.2</version>
                <configuration>
                    <verbose>true</verbose>
                    <overwrite>true</overwrite>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
                <configuration>
                    <mainClass>org.linuxsogood.sync.Starter</mainClass>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>
orm層使用了MyBatis,又使用了通用Mapper和分頁插件.
kafka消費端配置
import org.linuxsogood.sync.listener.Listener; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.common.serialization.StringDeserializer; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.kafka.annotation.EnableKafka; import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; import org.springframework.kafka.config.KafkaListenerContainerFactory; import org.springframework.kafka.core.ConsumerFactory; import org.springframework.kafka.core.DefaultKafkaConsumerFactory; import org.springframework.kafka.listener.ConcurrentMessageListenerContainer; import java.util.HashMap; import java.util.Map; @Configuration @EnableKafka public class KafkaConsumerConfig { @Value("${kafka.broker.address}") private String brokerAddress; @Bean KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() { ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>(); factory.setConsumerFactory(consumerFactory()); factory.setConcurrency(3); factory.getContainerProperties().setPollTimeout(3000); return factory; } @Bean public ConsumerFactory<String, String> consumerFactory() { return new DefaultKafkaConsumerFactory<>(consumerConfigs()); } @Bean public Map<String, Object> consumerConfigs() { Map<String, Object> propsMap = new HashMap<>(); propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, this.brokerAddress); propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100"); propsMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000"); propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); 
propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, "firehome-group"); propsMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); return propsMap; } @Bean public Listener listener() { return new Listener(); } }
生產者的配置.
import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.common.serialization.StringSerializer; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.kafka.annotation.EnableKafka; import org.springframework.kafka.core.DefaultKafkaProducerFactory; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.kafka.core.ProducerFactory; import java.util.HashMap; import java.util.Map; @Configuration @EnableKafka public class KafkaProducerConfig { @Value("${kafka.broker.address}") private String brokerAddress; @Bean public ProducerFactory<String, String> producerFactory() { return new DefaultKafkaProducerFactory<>(producerConfigs()); } @Bean public Map<String, Object> producerConfigs() { Map<String, Object> props = new HashMap<>(); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, this.brokerAddress); props.put(ProducerConfig.RETRIES_CONFIG, 0); props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384); props.put(ProducerConfig.LINGER_MS_CONFIG, 1); props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); return props; } @Bean public KafkaTemplate<String, String> kafkaTemplate() { return new KafkaTemplate<String, String>(producerFactory()); } }
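監聽器:監聽器裡面寫的就是業務邏輯,也就是從 Kafka 取得數據後,具體怎麼去處理。如果需要開啟 Kafka 處理消息的廣播模式,多個監聽器要屬於不同的 group,即方法上的註解 @KafkaListener 裡的 group 一定要不一樣。如果多個監聽器裡的 group 寫的一樣,就會造成每條消息只有其中一個監聽器能處理,其他監聽器就處理不到該消息了,這也就是 Kafka 的分佈式消息處理方式。(注意:在本文使用的 spring-kafka 1.1.0 中,實際生效的消費者組是消費端配置裡 ConsumerConfig.GROUP_ID_CONFIG 所設定的 group.id;@KafkaListener 的 group 屬性在該版本表示監聽容器的分組,請依所用版本的官方文檔確認。)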
在同一個 group 裡的監聽器會共同分擔接收到的消息,Kafka 會根據一定的分區分配算法決定由誰來處理。如果不在同一個組,但是監聽的是同一個 topic 的話,就會形成廣播模式。
import com.alibaba.fastjson.JSON; import org.linuxsogood.qilian.enums.CupMessageType; import org.linuxsogood.qilian.kafka.MessageWrapper; import org.linuxsogood.qilian.model.store.Store; import org.linuxsogood.sync.mapper.StoreMapper; import org.linuxsogood.sync.model.StoreExample; import org.apache.commons.lang3.StringUtils; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.kafka.annotation.KafkaListener; import java.util.List; import java.util.Optional; public class Listener { private static final Logger LOGGER = LoggerFactory.getLogger(Listener.class); @Autowired private StoreMapper storeMapper; /** * 監聽kafka消息,如果有消息則消費,同步數據到新烽火的庫 * @param record 消息實體bean */ @KafkaListener(topics = "linuxsogood-topic", group = "sync-group") public void listen(ConsumerRecord<?, ?> record) { Optional<?> kafkaMessage = Optional.ofNullable(record.value()); if (kafkaMessage.isPresent()) { Object message = kafkaMessage.get(); try { MessageWrapper messageWrapper = JSON.parseObject(message.toString(), MessageWrapper.class); CupMessageType type = messageWrapper.getType(); //判斷消息的數據類型,不同的數據入不同的表 if (CupMessageType.STORE == type) { proceedStore(messageWrapper); } } catch (Exception e) { LOGGER.error("將接收到的消息保存到數據庫時異常, 消息:{}, 異常:{}",message.toString(),e); } } } /** * 消息是店鋪類型,店鋪消息處理入庫 * @param messageWrapper 從kafka中得到的消息 */ private void proceedStore(MessageWrapper messageWrapper) { Object data = messageWrapper.getData(); Store cupStore = JSON.parseObject(data.toString(), Store.class); StoreExample storeExample = new StoreExample(); String storeName = StringUtils.isBlank(cupStore.getStoreOldName()) ? 
cupStore.getStoreName() : cupStore.getStoreOldName(); storeExample.createCriteria().andStoreNameEqualTo(storeName); List<org.linuxsogood.sync.model.Store> stores = storeMapper.selectByExample(storeExample); org.linuxsogood.sync.model.Store convertStore = new org.linuxsogood.sync.model.Store(); org.linuxsogood.sync.model.Store store = convertStore.convert(cupStore); //如果查詢不到記錄則新增 if (stores.size() == 0) { storeMapper.insert(store); } else { store.setStoreId(stores.get(0).getStoreId()); storeMapper.updateByPrimaryKey(store); } } }
上述內容就是如何在spring boot中使用spring-kafka實現一個接收消息功能,你們學到知識或技能了嗎?如果還想學到更多技能或者豐富自己的知識儲備,歡迎關注億速云行業資訊頻道。
免責聲明:本站發佈的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯繫站長郵箱:is@yisu.com進行舉報,並提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。