您好,登錄后才能下訂單哦!
Kafka是由Apache軟件基金會開發的一個開源流處理平臺,由Scala和Java編寫。Kafka是一種高吞吐量的分布式發布訂閱消息系統,它可以處理消費者規模的網站中的所有動作流數據。 這種動作(網頁瀏覽,搜索和其他用戶的行動)是在現代網絡上的許多社會功能的一個關鍵因素。 這些數據通常是由于吞吐量的要求而通過處理日志和日志聚合來解決。 對于像Hadoop的一樣的日志數據和離線分析系統,但又要求實時處理的限制,這是一個可行的解決方案。Kafka的目的是通過Hadoop的并行加載機制來統一線上和離線的消息處理,也是為了通過集群來提供實時的消息。
tar xvf jdk1.8.0_231.tar.gz -C /usr/local && cd /usr/local
ln -sv jdk1.8.0_231 jdk
vim /etc/profile.d/java.sh
JAVA_HOME=/usr/local/jdk
PATH=$JAVA_HOME/bin:$PATH
vim /usr/local/kafka/zookeeper/conf/zoo.cfg
tickTime=2000
initLimit=10
syncLimit=5
dataDir=/data/zookeeper
clientPort=2181
maxClientCnxns=0
# 集群版的zookeeper添加如下配置
# server.1=ip1:2888:3888
# server.2=ip2:2888:3888
# server.3=ip3:28888:3888
wget https://archive.apache.org/dist/kafka/0.10.2.1/kafka_2.11-0.10.2.1.tgz
tar xvf kafka_2.11-0.10.2.1.tgz -C /usr/local && cd /usr/local
ln -sv kafka_2.11-0.10.2.1.tgz kafka
vim /usr/local/kafka/bin/kafka-server-start.sh
export KAFKA_HEAP_OPTS="-Xmx2G -Xms2G"
/usr/local/kafka/bin/zookeeper-server-start.sh -deamon /usr/local/kafka/conf/zookeeper.properties
/usr/local/kafka/bin/kafka-server-start.sh -deamon /usr/local/kafka/conf/server.properties
/usr/local/kafka/bin/kafka-server-stop.sh /usr/local/kafka/conf/server.properties
/usr/local/kafka/bin/zookeeper-server-stop.sh /usr/local/kafka/conf/zookeeper.properties
/usr/local/zookeeper/bin/zkServer.sh stop|stop
需要一個解析到內網ip地址的域名,內網環境也可以設置/etc/hosts
host.name=kafka.test.com(對應的域名解析需要解到內網ip)
高版本已棄用。低版本0.10.2.1可以用, 僅當listeners屬性未配置時被使用,已用listeners屬性代替。表示broker的hostname
advertised.listeners=PLAINTEXT://kafka.test.com:9092(高版本用,替代host.name,設置了advertised.listeners不用設置host.name)
注冊到zookeeper上并提供給客戶端的監聽器,如果沒有配置則使用listeners。
advertised.host.name(不需要設置,僅作參考)
已棄用。僅當advertised.listeners或者listeners屬性未配置時被使用。官網建議使用advertised.listeners
listeners(不需要設置,僅作參考)
需要監聽的URL和協議,如:PLAINTEXT://myhost:9092,SSL://:9091 CLIENT://0.0.0.0:9092,REPLICATION://localhost:9093。如果未指定該配置,則使用java.net.InetAddress.getCanonicalHostName()函數的的返回值
[內網ip] kafka.test.com
[外網ip] kafka.test.com
/usr/local/kafka/bin/kafka-console-producer.sh --broker-list IP:9092 --topic TOPIC
/usr/local/kafka/bin/kafka-console-consumer.sh --bootstrap-server IP:9092 --topic TOPIC--from-beginning --max-messages 1
/usr/local/kafka/bin/kafka-console-consumer.sh --bootstrap-server 外網IP:9092 --topic TOPIC --from-beginning --max-messages 1
output {
stdout { codec => rubydebug { metadata => true } }
}
a、topics_pattern 通配問題".* ","."一定不能少
topics_pattern=>"prefix-.*"
b、filter中匹配規則,注意要能匹配到kafka中topic,不同的filebeat和不同的logstash版本對應的topic元數據可能不太一樣,這點需要注意
if [type] =~ "prefix-*" {
grok { match =>["[type]","^prefix-(?<index_name>)"] }
}
if [kafka][topic] =~ "prefix-*" {
grok { match => [ "[kafka][topic]", "^prefix-(?<index_name>.*$)" ]}
}
if [@metadata][topic] =~ "prefix-*" {
grok { match =>["[@metadata][topic]","^prefix-(?<index_name>)"] }
}
if [@metadata][kafka][topic] =~ "prefix-*" {
grok { match => [ "[@metadata][kafka][topic]", "^prefix-(?<index_name>.*$)" ]}
}
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。