您好,登錄后才能下訂單哦!
如何實現Kafka精確傳遞一次語義,相信很多沒有經驗的人對此束手無策,為此本文總結了問題出現的原因和解決方法,通過這篇文章希望你能解決這個問題。
我們都知道Kafka的吞吐量很大,但是Kafka究竟會不會丟失消息呢?又會不會重復消費消息呢?
有很多公司因為業務要求必須保證消息不丟失、不重復的到達,比如無人機實時監控系統,當無人機闖入機場區域,我們必須立刻報警,不允許消息丟失。而無人機離開禁飛區域后我們需要將及時報警解除。如果消息重復了呢,我們是否需要復雜的邏輯來自己處理消息重復的情況呢,這種情況恐怕相當復雜而難以處理。但是如果我們能保證消息exactly once,那么一切都容易得多。
下面我們來簡單了解一下消息傳遞語義,以及kafka的消息傳遞機制。
首先我們要了解的是message delivery semantic 也就是消息傳遞語義。
這是一個通用的概念,也就是消息傳遞過程中消息傳遞的保證性。
分為三種:
最多一次(at most once): 消息可能丟失也可能被處理,但最多只會被處理一次。
可能丟失 不會重復
至少一次(at least once): 消息不會丟失,但可能被處理多次。
可能重復 不會丟失
精確傳遞一次(exactly once): 消息被處理且只會被處理一次。
不丟失 不重復 就一次
而kafka其實有兩次消息傳遞,一次生產者發送消息給kafka,一次消費者去kafka消費消息。
兩次傳遞都會影響最終結果,
兩次都是精確一次,最終結果才是精確一次。
兩次中有一次會丟失消息,或者有一次會重復,那么最終的結果就是可能丟失或者重復的。
這是producer端的代碼:
Properties properties = new Properties();
properties.put("bootstrap.servers", "kafka01:9092,kafka02:9092");
properties.put("acks", "all");
properties.put("retries", 0);
properties.put("batch.size", 16384);
properties.put("linger.ms", 1);
properties.put("buffer.memory", 33554432);
properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(properties);
for (int i = 1; i <= 600; i++) {
kafkaProducer.send(new ProducerRecord<String, String>("z_test_20190430", "testkafka0613"+i));
System.out.println("testkafka"+i);
}
kafkaProducer.close();
其中指定了一個參數acks 可以有三個值選擇:
0:producer完全不管broker的處理結果 回調也就沒有用了 并不能保證消息成功發送 但是這種吞吐量最高
all或者-1:leader broker會等消息寫入 并且ISR都寫入后 才會響應,這種只要ISR有副本存活就肯定不會丟失,但吞吐量最低。
1:默認的值 leader broker自己寫入后就響應,不會等待ISR其他的副本寫入,只要leader broker存活就不會丟失,即保證了不丟失,也保證了吞吐量。
所以設置為0時,實現了at most once,而且從這邊看只要保證集群穩定的情況下,不設置為0,消息不會丟失。
但是還有一種情況就是消息成功寫入,而這個時候由于網絡問題producer沒有收到寫入成功的響應,producer就會開啟重試的操作,直到網絡恢復,消息就發送了多次。這就是at least once了。
kafka producer 的參數acks 的默認值為1,所以默認的producer級別是at least once。并不能exactly once。
consumer是靠offset保證消息傳遞的。
consumer消費的代碼如下:
Properties props = new Properties();
props.put("bootstrap.servers", "kafka01:9092,kafka02:9092");
props.put("group.id", "test");
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("auto.offset.reset","earliest");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("foo", "bar"));
try{
while (true) {
ConsumerRecords<String, String> records = consumer.poll(1000);
for (ConsumerRecord<String, String> record : records) {
System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
}
}
}finally{
consumer.close();
}
其中有一個參數是 enable.auto.commit
若設置為true consumer在消費之前提交位移 就實現了at most once
若是消費后提交 就實現了 at least once 默認的配置就是這個。
kafka consumer的參數enable.auto.commit的默認值為true ,所以默認的consumer級別是at least once。也并不能exactly once。
通過了解producer端與consumer端的設置,我們發現kafka在兩端的默認配置都是at least once,肯能重復,通過配置的話呢也不能做到exactly once,好像kafka的消息一定會丟失或者重復的,是不是沒有辦法做到exactly once了呢?
確實在kafka 0.11.0.0版本之前producer端確實是不可能的,但是在kafka 0.11.0.0版本之后,kafka正式推出了idempotent producer。
也就是冪等的producer還有對事務的支持。
kafka 0.11.0.0版本引入了idempotent producer機制,在這個機制中同一消息可能被producer發送多次,但是在broker端只會寫入一次,他為每一條消息編號去重,而且對kafka開銷影響不大。
如何設置開啟呢? 需要設置producer端的新參數 enable.idempotent 為true。
而多分區的情況,我們需要保證原子性的寫入多個分區,即寫入到多個分區的消息要么全部成功,要么全部回滾。
這時候就需要使用事務,在producer端設置 transcational.id為一個指定字符串。
這樣冪等producer只能保證單分區上無重復消息;事務可以保證多分區寫入消息的完整性。
這樣producer端實現了exactly once,那么consumer端呢?
consumer端由于可能無法消費事務中所有消息,并且消息可能被刪除,所以事務并不能解決consumer端exactly once的問題,我們可能還是需要自己處理這方面的邏輯。比如自己管理offset的提交,不要自動提交,也是可以實現exactly once的。
還有一個選擇就是使用kafka自己的流處理引擎,也就是Kafka Streams,
設置processing.guarantee=exactly_once,就可以輕松實現exactly once了。
看完上述內容,你們掌握如何實現Kafka精確傳遞一次語義的方法了嗎?如果還想學到更多技能或想了解更多相關內容,歡迎關注億速云行業資訊頻道,感謝各位的閱讀!
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。