91超碰碰碰碰久久久久久综合_超碰av人澡人澡人澡人澡人掠_国产黄大片在线观看画质优化_txt小说免费全本

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

向kafka集群發布記錄的kafka客戶端怎么實現

發布時間:2021-12-16 16:49:37 來源:億速云 閱讀:122 作者:iii 欄目:云計算

這篇文章主要介紹“向kafka集群發布記錄的kafka客戶端怎么實現”,在日常操作中,相信很多人在向kafka集群發布記錄的kafka客戶端怎么實現問題上存在疑惑,小編查閱了各式資料,整理出簡單好用的操作方法,希望對大家解答”向kafka集群發布記錄的kafka客戶端怎么實現”的疑惑有所幫助!接下來,請跟著小編一起來學習吧!

生產者是線程安全的,而且,多線程共享同一個producer實例通常比多個producer實例更快。

這里是一個簡單的例子,使用producer發送字符串數據,包含key和value。

Properties props = new Properties();
 props.put("bootstrap.servers", "localhost:9092");
 props.put("acks", "all");
 props.put("retries", 0);
 props.put("batch.size", 16384);
 props.put("linger.ms", 1);
 props.put("buffer.memory", 33554432);
 props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
 props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

 Producer<String, String> producer = new KafkaProducer<>(props);
 for(int i = 0; i < 100; i++)
     producer.send(new ProducerRecord<String, String>("my-topic", Integer.toString(i), Integer.toString(i)));

 producer.close();

一個producer由幾部分組成:1、一個buff poll,保存尚未發送的數據;2、一個后臺運行的I/O線程,負責執行數據發送。producer使用完畢后,務必執行close操作,否則將會造成資源的泄漏。

send()方法是異步的。當調用它時,它將記錄添加到緩沖區中,并立即返回。這使得producer能夠批量的執行數據的生產。

acks有3個可能的值,0:客戶端不必等待任何的server響應;1:leader of partition將會在把數據寫入自己的log之后,響應客戶端,而不必等待其他的follower完成同步的操作;all:leader和follower全部完成log寫入操作。服務器才會響應客戶端。相比之下,all最慢但是可靠性更好。

如果請求失敗,生產者可以自動重試,但是我們已經設置retries = 0,那么重試將不會發生。如果我們開啟了重試,可能會出現重復記錄的問題。

producer保持每個partition的未發送數據的緩沖區。這些緩沖的大小由batch.size配置指定。如果增大這個配置,可以一次性執行更大的批量操作,但需要更多的內存(因為我們通常會有一個緩沖區為每個partition)。

默認情況下,緩沖區可以立即發送,即使在緩沖區中有額外的未使用的空間。但是如果你想減少請求的數量,可以設置linger.ms > 0。producer會等待一段時間(單位是毫秒)之后在進行發送,以期獲得更大的批量操作。例如,在上面的代碼片段,設置linger.ms = 1, 可能會有100條記錄被批量發送。但是,如果在1毫秒的時間內,沒有跟多的數據到達緩沖區,那么這1毫秒的等待僅僅是增加了延遲,而沒有達到任何正面的效果。需要注意的是,如果在短時間內,大量的數據到達緩沖區,即使 linger.ms = 0 ,仍然會發生批量操作。

buffer.memory控制提供給producer的緩沖內存總量,如果該緩沖區的寫入速率長時間大于輸出速率,那么這個緩沖區將耗盡。當緩沖區耗盡后,額外的發送調用將被阻塞。阻塞一段時間之后(max.block.ms ),將會拋出一個TimeoutException。

key.serializervalue.serializer負責把record當中key和value 分別轉換為byte數組,kafka提供了一組簡單的序列化class。

到此,關于“向kafka集群發布記錄的kafka客戶端怎么實現”的學習就結束了,希望能夠解決大家的疑惑。理論與實踐的搭配能更好的幫助大家學習,快去試試吧!若想繼續學習更多相關知識,請繼續關注億速云網站,小編會繼續努力為大家帶來更多實用的文章!

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

如皋市| 兴仁县| 互助| 张家界市| 百色市| 天峨县| 宜君县| 大冶市| 宝清县| 益阳市| 宁河县| 日照市| 张北县| 北安市| 枣阳市| 大城县| 静安区| 博爱县| 巴南区| 日喀则市| 体育| 隆尧县| 和林格尔县| 平湖市| 武冈市| 江西省| 奉贤区| 商都县| 永济市| 重庆市| 阿克| 祁东县| 三门县| 石河子市| 泰兴市| 晋州市| 汉源县| 独山县| 永顺县| 体育| 包头市|