Flume + Kafka Integration
Prepare five internal (LAN) servers to build the Zookeeper and Kafka clusters.
Server addresses:
192.168.2.240
192.168.2.241
192.168.2.242
192.168.2.243
192.168.2.244
Server OS: CentOS 6.5 64-bit
Download the installation packages
Zookeeper:http://apache.fayea.com/zookeeper/zookeeper-3.4.6/zookeeper-3.4.6.tar.gz
Flume:http://apache.fayea.com/flume/1.7.0/apache-flume-1.7.0-bin.tar.gz
Kafka:http://apache.fayea.com/kafka/0.10.0.0/kafka_2.10-0.10.0.0.tgz
Zookeeper, Flume, and Kafka all require a Java environment, so install the JDK first:
yum install java-1.7.0-openjdk-devel
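A quick sanity check on each node that the JDK was installed correctly (any 1.7.x build is sufficient here):
java -version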
Choose three of the servers for the Zookeeper cluster; their IPs are:
192.168.2.240
192.168.2.241
192.168.2.242
Note: perform steps (1) through (3) on the first server, 192.168.2.240, first.
(1) Extract: place zookeeper-3.4.6.tar.gz in the /opt directory and unpack it:
tar zxf zookeeper-3.4.6.tar.gz
mv zookeeper-3.4.6 zookeeper    # rename so the /opt/zookeeper/... paths used later resolve
(2) Create the configuration file: copy conf/zoo_sample.cfg to zoo.cfg in the same conf directory, then change the following settings to these values:
tickTime=2000
dataDir=/opt/zookeeper/Data
initLimit=5
syncLimit=2
clientPort=2181
server.1=192.168.2.240:2888:3888
server.2=192.168.2.241:2888:3888
server.3=192.168.2.242:2888:3888
Meaning of each parameter:
tickTime: heartbeat interval in milliseconds (default: 2000)
clientPort: the port on which clients (for example Solr) connect to ZooKeeper (default: 2181)
initLimit: time allowed, in ticks, for the initial synchronization phase in which followers connect to the leader (default: 10)
syncLimit: time allowed, in ticks, for followers to synchronize with ZooKeeper (default: 5)
dataDir: path where ZooKeeper stores its data (for example the configuration data it manages)
server.X: X is the id of a server in the cluster and must match the id in that server's myid file. Two ports follow the address: the first is used for data synchronization and other communication between followers and the leader, the second for voting during leader election.
(3) Create the /opt/zookeeper/Data snapshot directory and create a myid file in it containing 1:
mkdir /opt/zookeeper/Data
echo 1 > /opt/zookeeper/Data/myid
(4) Copy the configured /opt/zookeeper/ directory from 192.168.2.240 to 192.168.2.241 and 192.168.2.242, then change the contents of the corresponding myid files to 2 and 3 (see the sketch below).
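A minimal sketch of step (4), assuming SSH access from 192.168.2.240 to the other two nodes (paths match the configuration above):
scp -r /opt/zookeeper 192.168.2.241:/opt/
scp -r /opt/zookeeper 192.168.2.242:/opt/
ssh 192.168.2.241 "echo 2 > /opt/zookeeper/Data/myid"
ssh 192.168.2.242 "echo 3 > /opt/zookeeper/Data/myid"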
(5) Start the Zookeeper cluster
Run the start command on each of the three servers:
/opt/zookeeper/bin/zkServer.sh start
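To confirm that the ensemble formed correctly, check each node's role; one node should report leader and the other two follower:
/opt/zookeeper/bin/zkServer.sh status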
The Kafka cluster uses all five servers. Server IP addresses and node names:
192.168.2.240 node1
192.168.2.241 node2
192.168.2.242 node3
192.168.2.243 node4
192.168.2.244 node5
1. Extract the installation package into the /opt/ directory:
cd /opt
tar -zxvf kafka_2.10-0.10.0.0.tgz
mv kafka_2.10-0.10.0.0 kafka
2. Edit the config/server.properties file on each node:
#node1 configuration
broker.id=0
port=9092
advertised.listeners=PLAINTEXT://58.246.xx.xx:9092
advertised.host.name=58.246.xx.xx
# Pitfall: the nginx logs are pulled from the online (public) server back to the office LAN, so these two advertised.* options must be set to the router's public IP; otherwise the online Flume agent reports that it cannot connect to the Kafka brokers and fails to deliver the log messages
advertised.port=9092
num.network.threads=3
num.io.threads=8
num.partitions=5
zookeeper.connect=192.168.2.240:2181,192.168.2.241:2181,192.168.2.242:2181
#node2 configuration
broker.id=1
port=9093
advertised.listeners=PLAINTEXT://58.246.xx.xx:9093
advertised.host.name=58.246.xx.xx
advertised.port=9093
num.network.threads=3
num.io.threads=8
num.partitions=5
zookeeper.connect=192.168.2.240:2181,192.168.2.241:2181,192.168.2.242:2181
#node3 configuration
broker.id=2
port=9094
advertised.listeners=PLAINTEXT://58.246.xx.xx:9094
advertised.host.name=58.246.xx.xx
advertised.port=9094
num.network.threads=3
num.io.threads=8
num.partitions=5
zookeeper.connect=192.168.2.240:2181,192.168.2.241:2181,192.168.2.242:2181
#node4 configuration
broker.id=3
port=9095
advertised.listeners=PLAINTEXT://58.246.xx.xx:9095
advertised.host.name=58.246.xx.xx
advertised.port=9095
num.network.threads=3
num.io.threads=8
num.partitions=5
zookeeper.connect=192.168.2.240:2181,192.168.2.241:2181,192.168.2.242:2181
#node5 configuration
broker.id=4
port=9096
advertised.listeners=PLAINTEXT://58.246.xx.xx:9096
advertised.host.name=58.246.xx.xx
advertised.port=9096
num.network.threads=3
num.io.threads=8
num.partitions=5
zookeeper.connect=192.168.2.240:2181,192.168.2.241:2181,192.168.2.242:2181
Start the Kafka cluster
Run the following command on every node to start the service:
/opt/kafka/bin/kafka-server-start.sh /opt/kafka/config/server.properties &
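With the brokers running, you can create the topic that the Flume agents use later (the topic name is taken from the Flume configuration below; the replication factor and partition count here are only example values) and verify that it is registered:
/opt/kafka/bin/kafka-topics.sh --create --zookeeper 192.168.2.240:2181 --replication-factor 3 --partitions 5 --topic unilife_nginx_production
/opt/kafka/bin/kafka-topics.sh --list --zookeeper 192.168.2.240:2181
If auto.create.topics.enable is left at its default, the topic would also be created automatically on first use, but creating it explicitly lets you control the replication factor.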
Install Flume on two machines: one on the online (public-facing) server, which ships the online nginx logs back to the local Kafka cluster, and one on a local server, which dumps the log messages from the Kafka cluster into HDFS.
Collect the nginx logs and send them to the company's internal Kafka
1. Extract the installation package
cd /opt
tar -zxvf apache-flume-1.7.0-bin.tar.gz
mv apache-flume-1.7.0-bin flume    # rename so the /opt/flume/... paths used later resolve
2. Create the configuration file
vi /opt/flume/conf/flume-conf.properties and add the following content:
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /unilifeData/logs/nginx/access.log
a1.sources.r1.channels = c1
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 100000
a1.channels.c1.transactionCapacity = 100000
#sinks
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.topic = unilife_nginx_production
a1.sinks.k1.kafka.bootstrap.servers = 58.246.xx.xx:9092,58.246.xx.xx:9093,58.246.xx.xx:9094
a1.sinks.k1.brokerList = 58.246.xx.xx:9092,58.246.xx.xx:9093,58.246.xx.xx:9094
a1.sinks.k1.kafka.producer.acks = 1
a1.sinks.k1.flumeBatchSize = 2000
a1.sinks.k1.channel = c1
Start the Flume service:
/opt/flume/bin/flume-ng agent --conf /opt/flume/conf/ --conf-file /opt/flume/conf/flume-conf.properties --name a1 -Dflume.root.logger=INFO,LOGFILE &
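To check that nginx log lines are actually reaching Kafka, run a console consumer against the topic from any broker node (a quick sanity check using the cluster configured above):
/opt/kafka/bin/kafka-console-consumer.sh --zookeeper 192.168.2.240:2181 --topic unilife_nginx_production
New access-log entries should appear a few seconds after requests hit nginx.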
Dump the logs to HDFS
1. Extract the installation package
cd /opt
tar -zxvf apache-flume-1.7.0-bin.tar.gz
mv apache-flume-1.7.0-bin flume    # rename so the /opt/flume/... paths used later resolve
2. Create the configuration file /opt/flume/conf/flume-nginx-log.properties and add the following content:
nginx.sources = source1
nginx.channels = channel1
nginx.sinks = sink1
nginx.sources.source1.type = org.apache.flume.source.kafka.KafkaSource
nginx.sources.source1.zookeeperConnect = master:2181,slave1:2181,slave2:2181
nginx.sources.source1.topic = unilife_nginx_production
nginx.sources.source1.groupId = flume_unilife_nginx_production
nginx.sources.source1.channels = channel1
nginx.sources.source1.interceptors = i1
nginx.sources.source1.interceptors.i1.type = timestamp
nginx.sources.source1.kafka.consumer.timeout.ms = 100
nginx.channels.channel1.type = memory
nginx.channels.channel1.capacity = 10000000
nginx.channels.channel1.transactionCapacity = 1000
nginx.sinks.sink1.type = hdfs
nginx.sinks.sink1.hdfs.path = hdfs://192.168.2.240:8020/user/hive/warehouse/nginx_log
nginx.sinks.sink1.hdfs.writeFormat=Text
nginx.sinks.sink1.hdfs.inUsePrefix=_
nginx.sinks.sink1.hdfs.rollInterval = 3600
nginx.sinks.sink1.hdfs.rollSize = 0
nginx.sinks.sink1.hdfs.rollCount = 0
nginx.sinks.sink1.hdfs.fileType = DataStream
nginx.sinks.sink1.hdfs.minBlockReplicas=1
nginx.sinks.sink1.channel = channel1
Start the service:
/opt/flume/bin/flume-ng agent --conf /opt/flume/conf/ --conf-file /opt/flume/conf/flume-nginx-log.properties --name nginx -Dflume.root.logger=INFO,LOGFILE &
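After the agent has been running for a while, verify that files are landing in the target directory (this assumes the HDFS client is installed on the local Flume machine); completed files lose the "_" in-use prefix configured above:
hdfs dfs -ls hdfs://192.168.2.240:8020/user/hive/warehouse/nginx_log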