您好,登錄后才能下訂單哦!
今天小編給大家分享一下spring kafka @KafkaListener如何使用的相關知識點,內容詳細,邏輯清晰,相信大部分人都還太了解這方面的知識,所以分享這篇文章給大家參考一下,希望大家閱讀完這篇文章后有所收獲,下面我們一起來了解一下吧。
從2.2.4版開始,您可以直接在注釋上指定Kafka使用者屬性,這些屬性將覆蓋在使用者工廠中配置的具有相同名稱的所有屬性。您不能通過這種方式指定group.id和client.id屬性。他們將被忽略;
可以使用#{…}或屬性占位符(${…})在SpEL上配置注釋上的大多數屬性。
比如:
@KafkaListener(id = "consumer-id",topics = "SHI_TOPIC1",concurrency = "${listen.concurrency:3}", clientIdPrefix = "myClientId")
屬性concurrency
將會從容器中獲取listen.concurrency
的值,如果不存在就默認用3
①. 消費者線程命名規則
填寫:
2020-11-19 14:24:15 c.d.b.k.KafkaListeners 120 [INFO] 線程:Thread[
consumer-id5-1-C-1
,5,main]-groupId:BASE-DEMO consumer-id5 消費
沒有填寫ID:
2020-11-19 10:41:26 c.d.b.k.KafkaListeners 137 [INFO] 線程:Thread[
org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1
,5,main] consumer-id7
②.在相同容器中的監聽器ID不能重復
否則會報錯
Caused by: java.lang.IllegalStateException: Another endpoint is already registered with id
③.會覆蓋消費者工廠的消費組GroupId
假如配置文件屬性配置了消費組kafka.consumer.group-id=BASE-DEMO
正常情況它是該容器中的默認消費組
但是如果設置了 @KafkaListener(id = "consumer-id7", topics = {"SHI_TOPIC3"})
那么當前消費者的消費組就是consumer-id7
;
當然如果你不想要他作為groupId的話 可以設置屬性idIsGroup = false
;那么還是會使用默認的GroupId;
④. 如果配置了屬性groupId,則其優先級最高
@KafkaListener(id = "consumer-id5",idIsGroup = false,topics = "SHI_TOPIC3",groupId = "groupId-test")
例如上面代碼中最終這個消費者的消費組GroupId
是 “groupId-test”
該id屬性(如果存在)將用作Kafka消費者group.id屬性,并覆蓋消費者工廠中的已配置屬性(如果存在)您還可以groupId顯式設置或將其設置idIsGroup為false,以恢復使用使用者工廠的先前行為group.id。
指定該消費組的消費組名; 關于消費組名的配置可以看看上面的 id 監聽器的id
如何獲取消費者 group.id
在監聽器中調用KafkaUtils.getConsumerGroupId()
可以獲得當前的groupId; 可以在日志中打印出來; 可以知道是哪個客戶端消費的;
topics 指定要監聽哪些topic(與topicPattern、topicPartitions 三選一)
可以同時監聽多個topics = {"SHI_TOPIC3","SHI_TOPIC4"}
topicPattern 匹配Topic進行監聽(與topics、topicPartitions 三選一) topicPartitions 顯式分區分配
可以為監聽器配置明確的主題和分區(以及可選的初始偏移量)
@KafkaListener(id = "thing2", topicPartitions = { @TopicPartition(topic = "topic1", partitions = { "0", "1" }), @TopicPartition(topic = "topic2", partitions = "0", partitionOffsets = @PartitionOffset(partition = "1", initialOffset = "100")) }) public void listen(ConsumerRecord<?, ?> record) { ... }
上面例子意思是 監聽topic1
的0,1分區;監聽topic2
的第0分區,并且第1分區從offset為100的開始消費;
實現KafkaListenerErrorHandler
; 然后做一些異常處理;
@Component public class KafkaDefaultListenerErrorHandler implements KafkaListenerErrorHandler { @Override public Object handleError(Message<?> message, ListenerExecutionFailedException exception) { return null; } @Override public Object handleError(Message<?> message, ListenerExecutionFailedException exception, Consumer<?, ?> consumer) { //do someting return null; } }
調用的時候 填寫beanName;例如errorHandler="kafkaDefaultListenerErrorHandler"
指定生成監聽器的工廠類;
例如我寫一個 批量消費的工廠類
/** * 監聽器工廠 批量消費 * @return */ @Bean public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>> batchFactory() { ConcurrentKafkaListenerContainerFactory<Integer, String> factory = new ConcurrentKafkaListenerContainerFactory<>(); factory.setConsumerFactory(kafkaConsumerFactory()); //設置為批量消費,每個批次數量在Kafka配置參數中設置ConsumerConfig.MAX_POLL_RECORDS_CONFIG factory.setBatchListener(true); return factory; }
使用containerFactory = "batchFactory"
clientIdPrefix 客戶端前綴
會覆蓋消費者工廠的
kafka.consumer.client-id
屬性; 最為前綴后面接-n
n是數字
concurrency并發數
會覆蓋消費者工廠中的concurrency ,這里的并發數就是多線程消費; 比如說單機情況下,你設置了3; 相當于就是啟動了3個客戶端來分配消費分區;分布式情況 總線程數=concurrency*機器數量; 并不是設置越多越好,具體如何設置請看Java concurrency之集合
/** * 監聽器工廠 * @return */ @Bean public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>> concurrencyFactory() { ConcurrentKafkaListenerContainerFactory<Integer, String> factory = new ConcurrentKafkaListenerContainerFactory<>(); factory.setConsumerFactory(kafkaConsumerFactory()); factory.setConcurrency(6); return factory; }
@KafkaListener(id = "consumer-id5",idIsGroup = false,topics = "SHI_TOPIC3", containerFactory = "concurrencyFactory",concurrency = "1)
雖然使用的工廠是concurrencyFactory
(concurrency配置了6); 但是他最終生成的監聽器數量 是1;
kafka中的屬性看org.apache.kafka.clients.consumer.ConsumerConfig
;
同名的都可以修改掉;
用法
@KafkaListener(id = "consumer-id5",idIsGroup = false,topics = "SHI_TOPIC3", containerFactory = "concurrencyFactory",concurrency = "1" , clientIdPrefix = "myClientId5",groupId = "groupId-test", properties = { "enable.auto.commit:false","max.poll.interval.ms:6000" },errorHandler="kafkaDefaultListenerErrorHandler")
KafkaListenerEndpointRegistry
@Autowired private KafkaListenerEndpointRegistry registry; //.... 獲取所有注冊的監聽器 registry.getAllListenerContainers();
當您將Spring Boot與驗證啟動器一起使用時,將LocalValidatorFactoryBean自動配置:如下
@Configuration @EnableKafka public class Config implements KafkaListenerConfigurer { @Autowired private LocalValidatorFactoryBean validator; ... @Override public void configureKafkaListeners(KafkaListenerEndpointRegistrar registrar) { registrar.setValidator(this.validator); } }
使用
@KafkaListener(id="validated", topics = "annotated35", errorHandler = "validationErrorHandler", containerFactory = "kafkaJsonListenerContainerFactory") public void validatedListener(@Payload @Valid ValidatedClass val) { ... } @Bean public KafkaListenerErrorHandler validationErrorHandler() { return (m, e) -> { ... }; }
spring-kafka官方文檔
官方文檔: https://docs.spring.io/spring-kafka/reference/html/
@KafkaListener
The @KafkaListener
annotation is used to designate a bean method as a listener for a listener container. The bean is wrapped in a MessagingMessageListenerAdapter
configured with various features, such as converters to convert the data, if necessary, to match the method parameters.
If, say, six TopicPartition
instances are provided and the concurrency
is 3
; each container gets two partitions. For five TopicPartition
instances, two containers get two partitions, and the third gets one. If the concurrency
is greater than the number of TopicPartitions
, the concurrency
is adjusted down such that each container gets one partition.
You can now configure a KafkaListenerErrorHandler
to handle exceptions. See Handling Exceptions for more information.
By default, the @KafkaListener
id
property is now used as the group.id
property, overriding the property configured in the consumer factory (if present). Further, you can explicitly configure the groupId
on the annotation. Previously, you would have needed a separate container factory (and consumer factory) to use different group.id
values for listeners. To restore the previous behavior of using the factory configured group.id
, set the idIsGroup
property on the annotation to false
.
示例:
demo類:
public class Listener { @KafkaListener(id = "foo", topics = "myTopic", clientIdPrefix = "myClientId") public void listen(String data) { ... } }</code> 配置類及注解:
@Configuration @EnableKafka public class KafkaConfig { @Bean KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>> kafkaListenerContainerFactory() { ConcurrentKafkaListenerContainerFactory<Integer, String> factory = new ConcurrentKafkaListenerContainerFactory<>(); factory.setConsumerFactory(consumerFactory()); factory.setConcurrency(3); factory.getContainerProperties().setPollTimeout(3000); return factory; } @Bean public ConsumerFactory<Integer, String> consumerFactory() { return new DefaultKafkaConsumerFactory<>(consumerConfigs()); } @Bean public Map<String, Object> consumerConfigs() { Map<String, Object> props = new HashMap<>(); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, embeddedKafka.getBrokersAsString()); ... return props; } }
以上就是“spring kafka @KafkaListener如何使用”這篇文章的所有內容,感謝各位的閱讀!相信大家閱讀完這篇文章都有很大的收獲,小編每天都會為大家更新不同的知識,如果還想學習更多的知識,請關注億速云行業資訊頻道。
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。