您好,登錄后才能下訂單哦!
本篇內容主要講解“kafka序列化器和攔截器怎么自定義使用”,感興趣的朋友不妨來看看。本文介紹的方法操作簡單快捷,實用性強。下面就讓小編來帶大家學習“kafka序列化器和攔截器怎么自定義使用”吧!
序列化器是和數據在網絡中的傳輸有關,數據在網絡中的傳輸為字節流,所以生產者在發送時需要將其序列化為字節流,消費者收到消息時,需要將字節流反序列化為我們能夠識別的對象,我們不難看出,這就是RPC通信,kafka中實現了很多自定義協議,我們知道,在RPC通信中,只有生產者和消費者的協議一樣,才能相互傳輸和解析數據,在使用HTTP時,我們就不用去關注協議本身,因為HTTP是TCP的上層建筑,它自己實現了一套協議,我們不用去關注,但是使用RPC,我們是面向TCP編程,所以自然得約定和實現自己的協議,而序列化就是這過程中很重要的一部分。
攔截器是一個隨處可見的詞,基本上很多框架中都有攔截器機制,它的作用主要是對請求進行攔截,我們可以對請求進行過濾和處理,以達到業務目的,比如Spring中有HandlerInterceptor
攔截器,在kafka種也有攔截器,我們可以自定義攔截器,對消息進行攔截,比如某些異常消息我們不需要發送,那么就將其攔截下來。
數據在網絡中傳輸是以字節流的形式進行傳輸,在生產者端發送消息需要先進行序列化,消費者端進行反序列化,序列化的方式有很多,比如jdk,json,protobuf,kryo,hessian,avro等等,在大數據量的傳輸中,序列化和反序列化的效率對吞吐量有一定的影響,kafka提供了許多序列化和反序列化器,如StringDeserializer
和StringSerializer
,如果我們需要自定義一個序列化和反序列化器,那么實現Serializer
,Deserializer
接口即可。
如下,kafka生產者在發送消息到broker之前需要序列化,消費者從broker獲取消息后需要反序列化。
生產者端設置序列化
//序列化 props.put("key.serializer", StringSerializer.class.getName()); props.put("value.serializer", StringSerializer.class.getName());
消費者端設置反序列化
props.put("key.deserializer", StringDeserializer.class.getName()); props.put("value.deserializer", StringDeserializer.class.getName());
/** * 功能說明: JSON序列化 * <p> * Original @Author: steakliu , 2022-11-02 15:14 */ public class JsonSerializer<T> implements Serializer<T> { @Override public byte[] serialize(String topic, T obj) { try { return obj == null ? null : JSON.toJSONBytes(obj); }catch (Exception e){ throw new SerializationException("json serializing exception"); } } }
/** * 功能說明:JSON反序列化 * <p> * Original @Author: steakliu-劉牌, 2022-11-11 09:38 */ public class JsonDeserializer<T> implements Deserializer<T> { @Override public T deserialize(String topic, byte[] data) { return (T) JSON.parse(data); } }
如上簡單的使用fastjson作為序列化和反序列化工具,演示了自定義kafka的序列化和反序列化機制,我們可以根據實際情況來設計不同的序列化反序列化機制,當然,不會是像上面這些簡單,如果使用spring,那么spring提供了JSON序列化和反序列化器直接使用。
雖然我們可以自定義序列化和反序列化器,但是自定義序列化和反序列化器在使用上也要保持一些一致,也就是說生產者和消費者要保持使用一種類型的序列化機制,不然會出現消息轉換問題,如果我們以kafka的方式向別人提供服務,那么他們就需要使用我們的制定的序列化方式,所以這可能就存在一定的耦合,如果使用Kafka的String序列化和反序列化機制,因為是它是默認方式并且是字符串,通用性比較好,所以就不用去考慮序列化和反序列化,直接拿到字符串轉為對象,再進行業務處理,使用自定義序列化的話,就直接拿到序列化后的對象,不用進行字符串轉對象操作。
在實際場景中,我們可以根據自己的業務來使用何種序列化方式,沒有最好的,只有合適的。
kafka中消費者和生產者都有攔截器,分別為ConsumerInterceptor
和ProducerInterceptor
,只需實現它們即可實現攔截,加入攔截器后,生產者會在發送消息之前對消息進行攔截處理,消費者在收到消息之前也會經過攔截器,那么我們就可以在攔截器中加入一些自己需要的邏輯。
如下消費者攔截器對消息進行攔截,如果有異常消息,則對異常消息進行處理,只要需要對消息進行處理,監控等,都可以使用攔截器。
/** * 功能說明: 消費者攔截器 * <p> * Original @Author: steakliu-劉牌, 2023-03-15 10:17 */ public class MyConsumerInterceptor implements ConsumerInterceptor<String, Message> { @Override public ConsumerRecords<String, Message> onConsume(ConsumerRecords<String, Message> records) { long currentTimeMillis = System.currentTimeMillis(); records.forEach(record -> { if ("消息異常".equals(record.value().getMessageText())) { //處理異常消息 this.handleMsg(record); } }); return records; } private void handleMsg(ConsumerRecord<String, Message> record) { //處理異常消息 } @Override public void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets) {} @Override public void close() {} @Override public void configure(Map<String, ?> configs) { } }
攔截器可以有多個,如果設置多個攔截器,那么就形成一個攔截器鏈,一個一個地執行。
下面是使用spring-kafka時所配置的攔截器和序列化器的基本配置。
spring: kafka: bootstrap-servers: 127.0.0.1:9092 consumer: # 反序列化器 key-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer properties: # 攔截器 interceptor: classes: com.steakliu.kafka.interceptor.MyConsumerInterceptor spring: json: trusted: packages: '*' producer: key-serializer: org.springframework.kafka.support.serializer.JsonSerializer value-serializer: org.springframework.kafka.support.serializer.JsonSerializer properties: # 攔截器 interceptor: classes: com.steakliu.kafka.interceptor.MyProducerInterceptor,com.steakliu.kafka.interceptor.MyProducerInterceptor2
到此,相信大家對“kafka序列化器和攔截器怎么自定義使用”有了更深的了解,不妨來實際操作一番吧!這里是億速云網站,更多相關內容可以進入相關頻道進行查詢,關注我們,繼續學習!
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。