您好,登錄后才能下訂單哦!
這篇“kafka消費者kafka-console-consumer接收不到數據如何解決”文章的知識點大部分人都不太理解,所以小編給大家總結了以下內容,內容詳細,步驟清晰,具有一定的借鑒價值,希望大家閱讀完這篇文章能有所收獲,下面我們一起來看看這篇“kafka消費者kafka-console-consumer接收不到數據如何解決”文章吧。
發送端
接收端
采用內置的zookeeper,發送端發送數據,接收端能夠接收數據
但是采用外置的zookeeper,發送端發送數據,接收端一直接收不到數據
先判斷主題是否一致,如果一致就在關閉kafka
./kafka-server-stop.sh ../config/server.properties
修改一下配置,確保這些配置已加上,不要用localhost,在listeners的ip地址和端口號要和消費者,生產者的的地址端口號一直
vim ../config/server.propertiesst3
最后把log.dirs后面的文件刪除或者重新換個地址
rm -rf /tmp/kafka
重新在前臺啟動kafka,注意查看打印在桌面的日志,有無報錯信息
./kafka-server-start.sh ../config/server.properties
如果沒有報錯信息,啟動正常,那么就可以在后臺啟動了
./kafka-server-start.sh -daemon ../config/server.properties
創建生產者
./kafka-console-producer.sh --broker-list 172.16.193.175:9092 --topic test3
創建消費者
./kafka-console-consumer.sh --bootstrap-server 172.16.193.175:9092 --topic test3 --from-beginning
我司現在有三臺kafka服務器作為一個集群
需求是我寫了一個監聽器去監聽活動失敗的情況,如果活動失敗則調用一個統計接口 做數據統計
我需要從失敗事件的隨路數據中取一些數據,做一些判斷.
現在我想從集群中看一下失敗事件中的隨路數據是否完整正確
于是,我xshell連接上了三臺服務器并且運行以下命令
./kafka-console-consumer.sh --bootstrap-server broker1IP:9092 --topic topicname ./kafka-console-consumer.sh --bootstrap-server broker2IP:9092 --topic topicname ./kafka-console-consumer.sh --bootstrap-server broker3IP:9092 --topic topicname
發現只要發送一個事件三個服務器都可以收到事件中的消息
怪了,為什么三臺都會顯示.
我第一反應是:這是否是傳說中的leader和follower 同步策略
我問了一下我的leader ,
leader:.....,你知道你這個命令是什么意思嗎?
這個命令就是相當于創建了一個消費者去消費了隊列中的消息!
你這個3個服務器相當于啟動了3個消費者去消費了,同一個消息三次!
我:不對啊,同一個消息不能被消費三次啊!?
leader:........,你知道什么是消費者組嗎?你這相當于三個消費者組 不信你看看
./kafka-console-consumer.sh -help ... --group <String: consumer group id> The consumer group id of the consumer. ...
kafka-console-consumer.sh腳本是一個簡易的消費者控制臺。該 shell 腳本的功能通過調用 kafka.tools 包下的 ConsoleConsumer 類,并將提供的命令行參數全部傳給該類實現。
./kafka-console-consumer.sh --bootstrap-server node:9092 --topic topicName //表示從 latest 位移位置開始消費該主題的所有分區消息,即僅消費正在寫入的消息。 bin/kafka-console-consumer.sh --bootstrap-server node1:9092,node2:9092,node3:9092 --from-beginning --topic topicName //?表示從指定主題中有效的起始位移位置開始消費所有分區的消息。 bin/kafka-console-consumer.sh --bootstrap-server node1:9092,node2:9092,node3:9092 --property print.key=true --topic topicName //?消費出的消息結果將打印出消息體的 key 和 value。
參數 | 值類型 | 說明 | 有效值 |
---|---|---|---|
--topic | string | 被消費的topic | |
--whitelist | string | 正則表達式,指定要包含以供使用的主題的白名單 | |
--partition | integer | 指定分區 除非指定’–offset’,否則從分區結束(latest)開始消費 | |
--offset | string | 執行消費的起始offset位置 默認值:latest | |
--from-beginning | 從存在的最早消息開始,而不是從最新消息開始 | ||
--max-messages | integer | 消費的最大數據量,若不指定,則持續消費下去 | |
--timeout-ms | integer | 在指定時間間隔內沒有消息可用時退出 | |
--bootstrap-server | string | 必需(除非使用舊版本的消費者),要連接的服務器 | |
--key-deserializer | string | ||
--value-deserializer | string | ||
--group | string | 指定消費者所屬組的ID | |
--zookeeper | string | 必需(僅當使用舊的使用者時)連接zookeeper的字符串。 可以給出多個URL以允許故障轉移 |
以上就是關于“kafka消費者kafka-console-consumer接收不到數據如何解決”這篇文章的內容,相信大家都有了一定的了解,希望小編分享的內容對大家有幫助,若想了解更多相關的知識內容,請關注億速云行業資訊頻道。
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。