您好,登錄后才能下訂單哦!
前言:通常情況下,我們將Kafka的日志數據通過logstash訂閱輸出到ES,然后用Kibana來做可視化分析,這就是我們通常用的ELK日志分析模式。但是基于ELK的日志分析,通常比較常用的是實時分析,日志存個十天半個月都會刪掉。那么在一些情況下,我需要將日志數據也存一份到我HDFS,積累到比較久的時間做半年、一年甚至更長時間的大數據分析。下面就來說如何最簡單的通過logstash將kafka中的數據訂閱一份到hdfs。
一:安裝logstash(下載tar包安裝也行,我直接yum裝了)
#yum install logstash-2.1.1
二:從github上克隆代碼
#git clone https://github.com/heqin5136/logstash-output-webhdfs-discontinued.git #ls logstash-output-webhdfs-discontinued
三:安裝logstash-output-webhdfs插件
#cd logstash-output-webhdfs-discontinued logstash的bin目錄下有個plugin,使用plugin來安裝插件 #/opt/logstash/bin/plugin install logstash-output-webhdfs
四:配置logstash
#vim /etc/logstash/conf.d/logstash.conf input { kafka { zk_connect => '10.10.10.1:2181,10.10.10.2:2181,10.10.10.3:2181' #kafka的zk集群地址 group_id => 'hdfs' #消費者組,不要和ELK上的消費者一樣 topic_id => 'apiAppWebCms-topic' #topic consumer_id => 'logstash-consumer-10.10.8.8' #消費者id,自定義,我寫本機ip。 consumer_threads => 1 queue_size => 200 codec => 'json' } } output { #如果你一個topic中會有好幾種日志,可以提取出來分開存儲在hdfs上。 if [type] == "apiNginxLog" { webhdfs { workers => 2 host => "10.10.8.1" #hdfs的namenode地址 port => 50070 #webhdfs端口 user => "hdfs" #hdfs運行的用戶啊,以這個用戶的權限去寫hdfs。 path => "/data/logstash/apiNginxLog-%{+YYYY}-%{+MM}-%{+dd}/logstash-%{+HH}.log #按天建目錄,按小時建log文件。 flush_size => 500 # compression => "snappy" #壓縮格式,可以不壓縮 idle_flush_time => 10 retry_interval => 0.5 } } if [type] == "apiAppLog" { webhdfs { workers => 2 host => "10.64.8.1" port => 50070 user => "hdfs" path => "/data/logstash/api/apiAppLog-%{+YYYY}-%{+MM}-%{+dd}.log" flush_size => 500 # compression => "snappy" idle_flush_time => 10 retry_interval => 0.5 } } stdout { codec => rubydebug } }
五:啟動logstash
#/etc/init.d/logstash start
已經可以成功寫入了。
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。