Machine list:
192.168.137.115 slave0 (agent)
192.168.137.116 slave1 (agent)
192.168.137.117 slave2 (agent)
192.168.137.118 slave3 (collector)
192.168.137.119 slave4 (collector)
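The configuration files later in this walkthrough refer to the collectors by hostname (slave3, slave4), so every machine has to be able to resolve those names. The original does not say how names are resolved; assuming no DNS is available, /etc/hosts entries built from the list above would look roughly like this:

```
192.168.137.115 slave0
192.168.137.116 slave1
192.168.137.117 slave2
192.168.137.118 slave3
192.168.137.119 slave4
```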
Create the directories on every machine:
mkdir -p /home/qun/data/flume/logs
mkdir -p /home/qun/data/flume/data
mkdir -p /home/qun/data/flume/checkpoint
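Since the same three directories are needed on all five machines, the commands can be wrapped in a small helper. This is only a sketch: `create_flume_dirs` is a hypothetical name, and the optional base-directory argument is an addition that defaults to the path used in this article.

```shell
# Create the logs, data and checkpoint directories under a base directory.
# The base directory defaults to the path used throughout this article.
create_flume_dirs() (
  base=${1:-/home/qun/data/flume}
  for d in logs data checkpoint; do
    mkdir -p "$base/$d" || exit 1
  done
)
```

Calling `create_flume_dirs` with no argument on each machine has the same effect as the three mkdir commands above.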
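Download and unpack the latest Flume package (1.6.0 here):
wget [URL of apache-flume-1.6.0-bin.tar.gz; omitted in the original]
tar -zxvf apache-flume-1.6.0-bin.tar.gz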
Configure the collectors on slave3 and slave4:
touch $FLUME_HOME/conf/server.conf
with the following content:
a1.sources = r1
a1.channels = c1
a1.sinks = k1

#set channel
a1.channels.c1.type = file
a1.channels.c1.checkpointDir=/home/qun/data/flume/checkpoint
a1.channels.c1.dataDirs=/home/qun/data/flume/data

# other node,nna to nns
a1.sources.r1.type = avro
a1.sources.r1.bind = slave3
a1.sources.r1.port = 52020
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = static
a1.sources.r1.interceptors.i1.key = Collector
a1.sources.r1.interceptors.i1.value = SLAVE3
a1.sources.r1.channels = c1

#set sink to kafka
a1.sinks.k1.type=org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.topic=mytopic
a1.sinks.k1.brokerList=kafkahost:9092
a1.sinks.k1.requiredAcks=1
a1.sinks.k1.batchSize=100
a1.sinks.k1.channel=c1
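The listing above is written for slave3: the Avro source binds to slave3 and the static interceptor tags events with SLAVE3. The original does not show the slave4 variant. Presumably the same file is used there with only the following two lines changed (an assumption, since a source can only bind to an address of its own machine):

```
# Assumed overrides for server.conf on slave4; all other lines stay the same
a1.sources.r1.bind = slave4
a1.sources.r1.interceptors.i1.value = SLAVE4
```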
Configure the agents on slave0, slave1 and slave2:
touch $FLUME_HOME/conf/client.conf
with the following content:
agent1.channels = c1
agent1.sources = r1
agent1.sinks = k1 k2

#set group
agent1.sinkgroups = g1

#set channel
agent1.channels.c1.type = file
agent1.channels.c1.checkpointDir=/home/qun/data/flume/checkpoint
agent1.channels.c1.dataDirs=/home/qun/data/flume/data

agent1.sources.r1.channels = c1
agent1.sources.r1.type = spooldir
agent1.sources.r1.spoolDir=/home/qun/data/flume/logs
agent1.sources.r1.fileHeader = false
agent1.sources.r1.interceptors = i1 i2
agent1.sources.r1.interceptors.i1.type = static
agent1.sources.r1.interceptors.i1.key = Type
agent1.sources.r1.interceptors.i1.value = LOGIN
agent1.sources.r1.interceptors.i2.type = timestamp

# set sink1
agent1.sinks.k1.channel = c1
agent1.sinks.k1.type = avro
agent1.sinks.k1.hostname = slave3
agent1.sinks.k1.port = 52020

# set sink2
agent1.sinks.k2.channel = c1
agent1.sinks.k2.type = avro
agent1.sinks.k2.hostname = slave4
agent1.sinks.k2.port = 52020

#set sink group
agent1.sinkgroups.g1.sinks = k1 k2

#set failover
agent1.sinkgroups.g1.processor.type = failover
agent1.sinkgroups.g1.processor.priority.k1 = 10
agent1.sinkgroups.g1.processor.priority.k2 = 1
agent1.sinkgroups.g1.processor.maxpenalty = 10000
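The failover settings at the end of this file decide which collector receives events. The failover sink processor sends to the available sink with the highest priority (k1, priority 10, pointing at slave3) and falls back to k2 (priority 1, slave4) when k1 fails. A failed sink is retried after a cool-down that grows with repeated failures, capped by maxpenalty (in milliseconds). The sketch below is a simplified model of the selection rule only, not Flume's implementation. The `pick_active_sink` function and its name:priority:status argument format are hypothetical, and the cool-down is left out.

```shell
# Simplified model of failover selection (not Flume's actual code).
# Each argument is name:priority:status, with status "up" or "down".
pick_active_sink() (
  best_name=""
  best_prio=-1
  for entry in "$@"; do
    name=${entry%%:*}
    rest=${entry#*:}
    prio=${rest%%:*}
    status=${rest#*:}
    # Keep the highest-priority sink among those that are up.
    if [ "$status" = "up" ] && [ "$prio" -gt "$best_prio" ]; then
      best_name=$name
      best_prio=$prio
    fi
  done
  # Print the winner; return non-zero when every sink is down.
  [ -n "$best_name" ] && printf '%s\n' "$best_name"
)
```

With the priorities from client.conf, `pick_active_sink k1:10:up k2:1:up` prints k1, and `pick_active_sink k1:10:down k2:1:up` prints k2, which is the behaviour the failover test later in this article exercises.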
Start the collectors on slave3 and slave4:
flume-ng agent -n a1 -c conf -f /home/qun/apache-flume-1.6.0-bin/conf/server.conf -Dflume.root.logger=DEBUG,console
Start the agents on slave0, slave1 and slave2:
flume-ng agent -n agent1 -c conf -f /home/qun/apache-flume-1.6.0-bin/conf/client.conf -Dflume.root.logger=DEBUG,console
Test the setup
echo "hello flume">>/home/qun/data/flume/logs/test.txt

The collector on slave3 receives the agent's events and logs:
16/05/26 12:44:24 INFO file.EventQueueBackingStoreFile: Start checkpoint for /home/qun/data/flume/checkpoint/checkpoint, elements to sync = 2
16/05/26 12:44:24 INFO file.EventQueueBackingStoreFile: Updating checkpoint metadata: logWriteOrderID: 1464235734894, queueSize: 0, queueHead: 0
16/05/26 12:44:24 INFO file.Log: Updated checkpoint for file: /home/qun/data/flume/data/log-3 position: 786 logWriteOrderID: 1464235734894
16/05/26 12:44:24 INFO file.Log: Removing old file: /home/qun/data/flume/data/log-1
16/05/26 12:44:24 INFO file.Log: Removing old file: /home/qun/data/flume/data/log-1.meta
16/05/26 12:44:54 INFO file.EventQueueBackingStoreFile: Start checkpoint for /home/qun/data/flume/checkpoint/checkpoint, elements to sync = 2
16/05/26 12:44:54 INFO file.EventQueueBackingStoreFile: Updating checkpoint metadata: logWriteOrderID: 1464235734901, queueSize: 0, queueHead: 0
16/05/26 12:44:54 INFO file.Log: Updated checkpoint for file: /home/qun/data/flume/data/log-3 position: 1179 logWriteOrderID: 1464235734901
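One caveat about the test command: according to the Flume user guide, the spooling directory source expects files to be complete and uniquely named once they appear in the spool directory, and it renames finished files with a .COMPLETED suffix by default. Appending to a file in place, or reusing the name test.txt, can therefore cause errors outside a quick smoke test. A more cautious variant writes the file elsewhere and moves it in. The sketch below assumes the staging directory is on the same filesystem as the spool directory; `spool_event` and its arguments are hypothetical names.

```shell
# Write a message to a staging file, then move the finished file into the
# spool directory under a unique name. Prints the final path on success.
spool_event() (
  message=$1
  spool_dir=${2:-/home/qun/data/flume/logs}
  # Stage next to the spool directory so the move stays on one filesystem.
  staging_dir=${3:-$(dirname "$spool_dir")}
  tmp=$(mktemp "$staging_dir/event.XXXXXX") || exit 1
  printf '%s\n' "$message" > "$tmp" || exit 1
  target="$spool_dir/$(basename "$tmp").log"
  mv "$tmp" "$target" || exit 1
  printf '%s\n' "$target"
)
```

For example, `spool_event "hello flume"` would drop a one-line file with a random unique name into /home/qun/data/flume/logs.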
Test collector failover
Kill the Flume process on slave3: kill -9 pid
echo "hello flume">>/home/qun/data/flume/logs/test.txt

The collector on slave4 now receives the agent's events and logs:
16/05/26 12:08:27 INFO file.EventQueueBackingStoreFile: Start checkpoint for /home/qun/data/flume/checkpoint/checkpoint, elements to sync = 2
16/05/26 12:08:27 INFO file.EventQueueBackingStoreFile: Updating checkpoint metadata: logWriteOrderID: 1464234987484, queueSize: 0, queueHead: 0
16/05/26 12:08:27 INFO file.Log: Updated checkpoint for file: /home/qun/data/flume/data/log-3 position: 393 logWriteOrderID: 1464234987484
16/05/26 12:08:27 INFO file.LogFile: Closing RandomReader /home/qun/data/flume/data/log-1
16/05/26 12:54:38 INFO client.ClientUtils$: Fetching metadata from broker id:0,host:xiaobin,port:9092 with correlation id 4 for 1 topic(s) Set(mytopic)
16/05/26 12:54:38 INFO producer.SyncProducer: Connected to xiaobin:9092 for producing
16/05/26 12:54:38 INFO producer.SyncProducer: Disconnecting from xiaobin:9092
16/05/26 12:54:38 INFO producer.SyncProducer: Disconnecting from xiaobin:9092
16/05/26 12:54:38 INFO producer.SyncProducer: Connected to xiaobin:9092 for producing
16/05/26 12:54:57 INFO file.EventQueueBackingStoreFile: Start checkpoint for /home/qun/data/flume/checkpoint/checkpoint, elements to sync = 2
16/05/26 12:54:57 INFO file.EventQueueBackingStoreFile: Updating checkpoint metadata: logWriteOrderID: 1464234987491, queueSize: 0, queueHead: 0
16/05/26 12:54:57 INFO file.Log: Updated checkpoint for file: /home/qun/data/flume/data/log-3 position: 786 logWriteOrderID: 1464234987491
16/05/26 12:54:57 INFO file.Log: Removing old file: /home/qun/data/flume/data/log-1
16/05/26 12:54:57 INFO file.Log: Removing old file: /home/qun/data/flume/data/log-1.meta
More to follow later.