91超碰碰碰碰久久久久久综合_超碰av人澡人澡人澡人澡人掠_国产黄大片在线观看画质优化_txt小说免费全本

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

Kafka Producer 攔截器

發布時間:2020-06-16 18:06:48 來源:網絡 閱讀:1630 作者:Java_老男孩 欄目:編程語言

Kafka中的攔截器(Interceptor)是0.10.x.x版本引入的一個功能,一共有兩種:Kafka Producer端的攔截器和Kafka Consumer端的攔截器。本篇主要講述的是Kafka Producer端的攔截器,它主要用來對消息進行攔截或者修改,也可以用于Producer的Callback回調之前進行相應的預處理。

使用Kafka Producer端的攔截器非常簡單,主要是實現ProducerInterceptor接口,此接口包含4個方法:

    1. ProducerRecord<K, V> onSend(ProducerRecord<K, V> record):Producer在將消息序列化和分配分區之前會調用攔截器的這個方法來對消息進行相應的操作。一般來說最好不要修改消息ProducerRecord的topic、key以及partition等信息,如果要修改,也需確保對其有準確的判斷,否則會與預想的效果出現偏差。比如修改key不僅會影響分區的計算,同樣也會影響Broker端日志壓縮(Log Compaction)的功能。
    1. void onAcknowledgement(RecordMetadata metadata, Exception exception):在消息被應答(Acknowledgement)之前或者消息發送失敗時調用,優先于用戶設定的Callback之前執行。這個方法運行在Producer的IO線程中,所以這個方法里實現的代碼邏輯越簡單越好,否則會影響消息的發送速率。
    1. void close():關閉當前的攔截器,此方法主要用于執行一些資源的清理工作。
    1. configure(Map<String, ?> configs):用來初始化此類的方法,這個是ProducerInterceptor接口的父接口Configurable中的方法。

一般情況下只需要關注并實現onSend或者onAcknowledgement方法即可。下面我們來舉個案例,通過onSend方法來過濾消息體為空的消息以及通過onAcknowledgement方法來計算發送消息的成功率。

public class ProducerInterceptorDemo implements ProducerInterceptor<String,String> {
    private volatile long sendSuccess = 0;
    private volatile long sendFailure = 0;

    @Override
    public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
        if(record.value().length()<=0)
            return null;
        return record;
    }

    @Override
    public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
        if (exception == null) {
            sendSuccess++;
        } else {
            sendFailure ++;
        }
    }

    @Override
    public void close() {
        double succe***atio = (double)sendSuccess / (sendFailure + sendSuccess);
        System.out.println("[INFO] 發送成功率="+String.format("%f", succe***atio * 100)+"%");
    }

    @Override
    public void configure(Map<String, ?> configs) {}
}

自定義的ProducerInterceptorDemo類實現之后就可以在Kafka Producer的主程序中指定,示例代碼如下:

public class ProducerMain {
    public static final String brokerList = "localhost:9092";
    public static final String topic = "hidden-topic";

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        Properties properties = new Properties();
        properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("bootstrap.servers", brokerList);
        properties.put("interceptor.classes", "com.hidden.producer.ProducerInterceptorDemo");

        Producer<String, String> producer = new KafkaProducer<String, String>(properties);

        for(int i=0;i<100;i++) {
            ProducerRecord<String, String> producerRecord = new ProducerRecord<String, String>(topic, "msg-" + i);
            producer.send(producerRecord).get();
        }
        producer.close();
    }
}

Kafka Producer不僅可以指定一個攔截器,還可以指定多個攔截器以形成攔截鏈,這個攔截鏈會按照其中的攔截器的加入順序一一執行。比如上面的程序多添加一個攔截器,示例如下:

properties.put("interceptor.classes", "com.hidden.producer.ProducerInterceptorDemo,com.hidden.producer.ProducerInterceptorDemoPlus");1

這樣Kafka Producer會先執行攔截器ProducerInterceptorDemo,之后再執行ProducerInterceptorDemoPlus。

有關interceptor.classes參數,在kafka 1.0.0版本中的定義如下:

NAME DESCRIPTION TYPE DEFAULT VALID VALUES IMPORTANCE
interceptor.calssses A list of classes to use as interceptors. Implementing the org.apache.kafka.clients.producer.ProducerInterceptor interface allows you to intercept (and possibly mutate) the records received by the producer before they are published to the Kafka cluster. By default, there no interceptors. list null low

本文的重點是你有沒有收獲與成長,其余的都不重要,希望讀者們能謹記這一點。同時我經過多年的收藏目前也算收集到了一套完整的學習資料,包括但不限于:分布式架構、高可擴展、高性能、高并發、Jvm性能調優、Spring,MyBatis,Nginx源碼分析,Redis,ActiveMQ、、Mycat、Netty、Kafka、Mysql、Zookeeper、Tomcat、Docker、Dubbo、Nginx等多個知識點高級進階干貨,希望對想成為架構師的朋友有一定的參考和幫助

需要更詳細思維導圖和以下資料的可以加一下技術交流分享群:“708 701 457”免費獲取

Kafka Producer 攔截器
Kafka Producer 攔截器
Kafka Producer 攔截器
Kafka Producer 攔截器

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

比如县| 永福县| 通化县| 新和县| 中超| 且末县| 金堂县| 莱阳市| 神池县| 吉隆县| 封丘县| 邯郸县| 尉氏县| 三原县| 新干县| 彝良县| 赤峰市| 南汇区| 田东县| 科尔| 江城| 南宫市| 盘山县| 闽清县| 祁门县| 桦南县| 图片| 麦盖提县| 宁安市| 和政县| 老河口市| 获嘉县| 嘉定区| 集贤县| 新晃| 思南县| 嘉鱼县| 宜兰市| 岳池县| 岱山县| 申扎县|