91超碰碰碰碰久久久久久综合_超碰av人澡人澡人澡人澡人掠_国产黄大片在线观看画质优化_txt小说免费全本

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

Kafka 消息序列化和反序列化(下)

發布時間:2020-07-23 22:05:36 來源:網絡 閱讀:630 作者:Java_老男孩 欄目:編程語言

有序列化就會有反序列化,反序列化的操作是在Kafka Consumer中完成的,使用起來只需要配置一下key.deserializer和value.deseriaizer。對應上面自定義的Company類型的Deserializer就需要實現org.apache.kafka.common.serialization.Deserializer接口,這個接口同樣有三個方法:

public void configure(Map<String, ?> configs, boolean isKey):用來配置當前類。
public byte[] serialize(String topic, T data):用來執行反序列化。如果data為null建議處理的時候直接返回null而不是拋出一個異常。
public void close():用來關閉當前序列化器。
下面就來看一下DemoSerializer對應的反序列化的DemoDeserializer,詳細代碼如下:

public class DemoDeserializer implements Deserializer<Company> {
    public void configure(Map<String, ?> configs, boolean isKey) {}
    public Company deserialize(String topic, byte[] data) {
        if (data == null) {
            return null;
        }
        if (data.length < 8) {
            throw new SerializationException("Size of data received by DemoDeserializer is shorter than expected!");
        }
        ByteBuffer buffer = ByteBuffer.wrap(data);
        int nameLen, addressLen;
        String name, address;
        nameLen = buffer.getInt();
        byte[] nameBytes = new byte[nameLen];
        buffer.get(nameBytes);
        addressLen = buffer.getInt();
        byte[] addressBytes = new byte[addressLen];
        buffer.get(addressLen);
        try {
            name = new String(nameBytes, "UTF-8");
            address = new String(addressBytes, "UTF-8");
        } catch (UnsupportedEncodingException e) {
            throw new SerializationException("Error occur when deserializing!");
        }
        return new Company(name,address);
    }
    public void close() {}
}

有些讀者可能對新版的Consumer不是很熟悉,這里順帶著舉一個完整的消費示例,并以DemoDeserializer作為消息Value的反序列化器。

Properties properties = new Properties();
properties.put("bootstrap.servers", brokerList);
properties.put("group.id", consumerGroup);
properties.put("session.timeout.ms", 10000);
properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
properties.put("value.deserializer", "com.hidden.client.DemoDeserializer");
properties.put("client.id", "hidden-consumer-client-id-zzh-2");
KafkaConsumer<String, Company> consumer = new KafkaConsumer<String, Company>(properties);
consumer.subscribe(Arrays.asList(topic));
try {
    while (true) {
        ConsumerRecords<String, Company> records = consumer.poll(100);
        for (ConsumerRecord<String, Company> record : records) {
            String info = String.format("topic=%s, partition=%s, offset=%d, consumer=%s, country=%s",
                    record.topic(), record.partition(), record.offset(), record.key(), record.value());
            System.out.println(info);
        }
        consumer.commitAsync(new OffsetCommitCallback() {
            public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
                if (exception != null) {
                    String error = String.format("Commit failed for offsets {}", offsets, exception);
                    System.out.println(error);
                }
            }
        });
    }
} finally {
    consumer.close();
}

有些時候自定義的類型還可以和Avro、ProtoBuf等聯合使用,而且這樣更加的方便快捷,比如我們將前面Company的Serializer和Deserializer用Protostuff包裝一下,由于篇幅限制,筆者這里只羅列出對應的serialize和deserialize方法,詳細參考如下:

public byte[] serialize(String topic, Company data) {
    if (data == null) {
        return null;
    }
    Schema schema = (Schema) RuntimeSchema.getSchema(data.getClass());
    LinkedBuffer buffer = LinkedBuffer.allocate(LinkedBuffer.DEFAULT_BUFFER_SIZE);
    byte[] protostuff = null;
    try {
        protostuff = ProtostuffIOUtil.toByteArray(data, schema, buffer);
    } catch (Exception e) {
        throw new IllegalStateException(e.getMessage(), e);
    } finally {
        buffer.clear();
    }
    return protostuff;
}

public Company deserialize(String topic, byte[] data) {
    if (data == null) {
        return null;
    }
    Schema schema = RuntimeSchema.getSchema(Company.class);
    Company ans = new Company();
    ProtostuffIOUtil.mergeFrom(data, ans, schema);
    return ans;
}

如果Company的字段很多,我們使用Protostuff進一步封裝一下的方式就顯得簡潔很多。不過這個不是最主要的,而最主要的是經過Protostuff包裝之后,這個Serializer和Deserializer可以向前兼容(新加字段采用默認值)和向后兼容(忽略新加字段),這個特性Avro和Protobuf也都具備。

自定義的類型有一個不得不面對的問題就是Kafka Producer和Kafka Consumer之間的序列化和反序列化的兼容性,試想對于StringSerializer來說,Kafka Consumer可以順其自然的采用StringDeserializer,不過對于Company這種專用類型,某個服務使用DemoSerializer進行了序列化之后,那么下游的消費者服務必須也要實現對應的DemoDeserializer。再者,如果上游的Company類型改變,下游也需要跟著重新實現一個新的DemoSerializer,這個后面所面臨的難題可想而知。所以,如無特殊需要,筆者不建議使用自定義的序列化和反序列化器;如有業務需要,也要使用通用的Avro、Protobuf、Protostuff等序列化工具包裝,盡可能的實現得更加通用且向前后兼容。

題外話,對于Kafka的“深耕者”Confluent來說,還有其自身的一套序列化和反序列化解決方案(io.confluent.kafka.serializer.KafkaAvroSerializer),GitHub上有相關資料,讀者如有興趣可以自行擴展學習。


本文的重點是你有沒有收獲與成長,其余的都不重要,希望讀者們能謹記這一點。同時我經過多年的收藏目前也算收集到了一套完整的學習資料,包括但不限于:分布式架構、高可擴展、高性能、高并發、Jvm性能調優、Spring,MyBatis,Nginx源碼分析,Redis,ActiveMQ、、Mycat、Netty、Kafka、Mysql、Zookeeper、Tomcat、Docker、Dubbo、Nginx等多個知識點高級進階干貨,希望對想成為架構師的朋友有一定的參考和幫助

需要更詳細思維導圖和以下資料的可以加一下技術交流分享群:“708 701 457”免費獲取

Kafka 消息序列化和反序列化(下)
Kafka 消息序列化和反序列化(下)
Kafka 消息序列化和反序列化(下)
Kafka 消息序列化和反序列化(下)

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

泽普县| 竹山县| 浠水县| 石林| 永寿县| 濉溪县| 湖南省| 海晏县| 岳池县| 郎溪县| 阳曲县| 三亚市| 涪陵区| 林甸县| 金山区| 蒲江县| 扬中市| 阳春市| 平顺县| 龙山县| 新竹县| 汤原县| 隆安县| 屏边| 江华| 长治市| 郧西县| 沧源| 淮阳县| 翁牛特旗| 阿勒泰市| 龙井市| 华亭县| 青海省| 郁南县| 内江市| 太白县| 佳木斯市| 盐池县| 宝清县| 惠州市|