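This article shows how to use Flume to collect data into HDFS. It walks through three source types (exec, spooldir, and TAILDIR) and gives the full agent configuration for each.
1. Requirement:
Collect the contents of a specified file into HDFS.
Technology choice: exec (source) - memory (channel) - hdfs (sink)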
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /home/hadoop/data/data.log

# Describe the sink
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = hdfs://192.168.0.129:9000/user/hadoop/flume
# Number of events written to a file before it is flushed to HDFS.
# File rolling is controlled separately by hdfs.rollInterval, hdfs.rollSize and hdfs.rollCount.
a1.sinks.k1.hdfs.batchSize = 10
# File type: DataStream writes the events as plain, uncompressed output
a1.sinks.k1.hdfs.fileType = DataStream
# Record format
a1.sinks.k1.hdfs.writeFormat = Text

# Use a channel which buffers events in memory
a1.channels.c1.type = memory

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
Start the agent:
./flume-ng agent \
  --name a1 \
  --conf $FLUME_HOME/conf \
  --conf-file /home/hadoop/script/flume/exec-memory-hdfs.conf \
  -Dflume.root.logger=INFO,console \
  -Dflume.monitoring.type=http \
  -Dflume.monitoring.port=34343
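The two -Dflume.monitoring.* options in the start command enable Flume's HTTP JSON reporting, which serves the agent's counters at /metrics on the given port. The following is a minimal sketch of how those counters could be polled, assuming the agent runs on localhost; the helper names fetch_metrics and channel_fill are hypothetical and not part of Flume.

```python
import json
from urllib.request import urlopen

# Assumption: the agent runs on this host; 34343 is the port from the start command.
METRICS_URL = "http://localhost:34343/metrics"


def fetch_metrics(url=METRICS_URL, timeout=5):
    """Download the JSON document served by Flume's HTTP monitoring endpoint."""
    with urlopen(url, timeout=timeout) as resp:
        return json.loads(resp.read().decode("utf-8"))


def channel_fill(metrics, channel="c1"):
    """Return (size, capacity, fill percentage) for one channel.

    Flume reports every counter as a string, so the values are converted here.
    """
    stats = metrics["CHANNEL." + channel]
    size = int(stats["ChannelSize"])
    capacity = int(stats["ChannelCapacity"])
    return size, capacity, 100.0 * size / capacity


# Example call against a running agent:
#   print(channel_fill(fetch_metrics()))
```

A fill percentage that stays high suggests the sink is not draining the memory channel as fast as the source fills it.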
Add test data:
[hadoop@hadoop001 data]$ touch data.log
[hadoop@hadoop001 data]$ echo test >> data.log
[hadoop@hadoop001 data]$ echo test >> data.log
[hadoop@hadoop001 data]$ echo test >> data.log
[hadoop@hadoop001 data]$ echo test >> data.log
[hadoop@hadoop001 data]$ echo test >> data.log
Check HDFS:
[hadoop@hadoop001 flume]$ hdfs dfs -text hdfs://192.168.0.129:9000/user/hadoop/flume/*
18/08/09 20:59:02 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
test
test
test
test
test
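2. Requirement:
Collect the contents of a specified directory to HDFS or the console.
==> Files in the directory must not be modified once they are placed there, and file names must not be reused.
==> After a file has been fully processed, the suffix .COMPLETED is appended to its name.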
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = spooldir
a1.sources.r1.spoolDir = /home/hadoop/data/
a1.sources.r1.fileHeader = true

# Describe the sink
a1.sinks.k1.type = logger

# Use a channel which buffers events in memory
a1.channels.c1.type = memory

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
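The spooldir source expects a file to be complete and unchanging once it appears in spoolDir, and each file name to be used only once. One way to meet both constraints is to write the file somewhere else and then rename it into the directory under a unique name. The following is a minimal sketch of that idea, assuming the staging location is on the same filesystem as the spooling directory (so that the rename is atomic); the helper name drop_into_spool and the naming scheme are illustrative and not part of Flume.

```python
import os
import time
import uuid


def drop_into_spool(src_path, spool_dir):
    """Move a finished file into the spooling directory under a unique name.

    os.rename is atomic when source and destination are on the same
    filesystem, so the source never sees a half-written file.
    """
    base = os.path.basename(src_path)
    # Timestamp plus a random suffix, so the same name is never used twice.
    unique = f"{base}.{int(time.time())}.{uuid.uuid4().hex[:8]}"
    dest = os.path.join(spool_dir, unique)
    os.rename(src_path, dest)
    return dest


# Example call (the staging path is hypothetical):
#   drop_into_spool("/home/hadoop/staging/app.log", "/home/hadoop/data/")
```

Because the file is only renamed after it has been completely written, nothing modifies it while Flume is reading it.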
3. Requirement: (for production use; records read offsets)
Collect the contents of specified directories and files to the console or HDFS.
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = TAILDIR
a1.sources.r1.channels = c1
# Record read offsets so that collection resumes where it left off after a restart
a1.sources.r1.positionFile = /home/hadoop/script/flume/taildir_position.json
a1.sources.r1.filegroups = f1 f2
# Monitor one specific log file
a1.sources.r1.filegroups.f1 = /home/hadoop/data/example.log
a1.sources.r1.headers.f1.headerKey1 = value1
# Monitor every file in the directory whose name matches .*log.*
a1.sources.r1.filegroups.f2 = /home/hadoop/data/test/.*log.*
a1.sources.r1.headers.f2.headerKey1 = value2
a1.sources.r1.headers.f2.headerKey2 = value2-2

# Console output
a1.sinks.k1.type = logger

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
Start the agent:
./flume-ng agent \
  --name a1 \
  --conf $FLUME_HOME/conf \
  --conf-file /home/hadoop/script/flume/taildir-memory-logger.conf \
  -Dflume.root.logger=INFO,console
Recorded offsets:
[hadoop@hadoop001 flume]$ cat taildir_position.json
[{"inode":679982,"pos":14,"file":"/home/hadoop/data/example.log"},
{"inode":679984,"pos":0,"file":"/home/hadoop/data/test/log1.log"}]
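The position file is plain JSON, so it is easy to check how far Flume has read each file. The sketch below compares each recorded pos with the file's current size. It assumes the script runs on the agent host, where the recorded paths are readable; the helper names read_positions and pending_bytes are hypothetical.

```python
import json
import os


def read_positions(position_file):
    """Return the list of {"inode", "pos", "file"} records from the position file."""
    with open(position_file, encoding="utf-8") as fh:
        return json.load(fh)


def pending_bytes(entry):
    """Bytes of the tailed file that lie beyond the recorded offset.

    Returns None if the file no longer exists (for example, after rotation).
    """
    try:
        size = os.path.getsize(entry["file"])
    except OSError:
        return None
    return max(size - entry["pos"], 0)


# Example call, using the positionFile path from the configuration:
#   for e in read_positions("/home/hadoop/script/flume/taildir_position.json"):
#       print(e["file"], e["pos"], pending_bytes(e))
```

A pending count that keeps growing for a file may indicate that the agent has stopped or is falling behind.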