Flume Overview
Flume is a highly available, highly reliable, distributed system provided by Cloudera for collecting, aggregating, and transporting massive volumes of log data. Flume is based on a streaming architecture and is flexible and simple.
It can be integrated with any storage process.
When the input data rate exceeds the rate at which data can be written to the destination storage, Flume buffers the data, reducing the pressure on HDFS.
Transactions in Flume are based on the Channel and use a two-transaction model (sender + receiver) to ensure that messages are delivered reliably.
Flume uses two independent transactions to handle event delivery from the Source to the Channel and from the Channel to the Sink. Only when all data in a transaction has been successfully committed to the Channel does the Source consider that data fully read; likewise, only data that has been successfully written out by the Sink is removed from the Channel.
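The following is a minimal Java sketch of that take/commit/rollback contract on the Sink side, written against the public org.apache.flume API; the class and method names are made up for illustration and are not part of Flume itself.
import org.apache.flume.Channel;
import org.apache.flume.Event;
import org.apache.flume.Transaction;
public class ChannelTransactionSketch {
    // Drains up to batchSize events; they leave the Channel only after commit().
    public static int drainOneBatch(Channel channel, int batchSize) {
        Transaction tx = channel.getTransaction();
        tx.begin();
        int taken = 0;
        try {
            for (int i = 0; i < batchSize; i++) {
                Event event = channel.take();   // may return null when the Channel is empty
                if (event == null) {
                    break;
                }
                // ... write the event to the destination here ...
                taken++;
            }
            tx.commit();                        // only now are the events removed from the Channel
        } catch (Throwable t) {
            tx.rollback();                      // on failure the events stay in the Channel
            throw new RuntimeException("Batch failed, events rolled back", t);
        } finally {
            tx.close();
        }
        return taken;
    }
}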
Agent
An Agent is a JVM process that moves data from a source to a destination in the form of events.
An Agent consists mainly of a Source, a Channel, and a Sink.
Source
The Source is the component that receives data into the Agent. It can handle many input types, including avro, thrift, exec, jms, spooling directory, netcat, sequence generator, syslog, http, and legacy.
Channel
The Channel is a buffer between the Source and the Sink, so it allows the Source and the Sink to operate at different rates. The Channel is thread-safe and can handle writes from several Sources and reads from several Sinks at the same time.
Flume ships with two Channel types:
Memory Channel: an in-memory queue; it is fast and suitable when data loss is not a concern.
File Channel: writes all events to disk, so no data is lost if the process exits or the machine goes down.
Sink
The Sink continuously polls the Channel for events, removes them in batches, and writes those batches to a storage or indexing system, or forwards them to another Flume Agent.
The Sink is fully transactional. Before removing a batch of data from the Channel, each Sink starts a transaction with the Channel. Once the batch of events has been successfully written to the storage system or to the next Flume Agent, the Sink commits the transaction with the Channel, and only then does the Channel delete those events from its internal buffer.
Sink destinations include hdfs, logger, avro, thrift, ipc, file, null, HBase, solr, and custom sinks.
Event
The Event is the transfer unit, the basic unit of Flume data transport; data travels from the source to the destination in the form of events.
An Event consists of an optional header and a byte array carrying the data. The header is a HashMap of key-value string pairs.
Usually one record is one Event, and events are split every 2048 bytes.
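As a small, hedged Java sketch of what an Event looks like in code (the header key and body text are invented for illustration), an Event with a header can be built with the public EventBuilder API:
import org.apache.flume.Event;
import org.apache.flume.event.EventBuilder;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
public class EventSketch {
    public static Event buildSampleEvent() {
        // Optional header: key-value string pairs
        Map<String, String> headers = new HashMap<>();
        headers.put("host", "hadoop102");   // illustrative header entry
        // Body: the payload as a byte array
        byte[] body = "one log line".getBytes(StandardCharsets.UTF_8);
        return EventBuilder.withBody(body, headers);
    }
}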
In the chained topology, multiple Flume agents are connected in sequence, from the first Source all the way to the destination storage system fed by the final Sink. Bridging too many agents is not recommended: too many agents not only reduces the transfer rate, but if any agent in the chain goes down, the whole pipeline is affected.
Flume supports sending the event flow to one or more destinations. In this topology, the data from the source is replicated to multiple Channels; every Channel holds the same data, and each Sink can deliver it to a different destination.
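A hedged sketch of the replicating selector keys involved (agent, channel, and sink names are placeholders; a complete working example appears in the Flume-1/Flume-2/Flume-3 replication exercise later in this article):
a1.sources.r1.selector.type = replicating
a1.sources.r1.channels = c1 c2
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c2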
Flume also supports logically grouping multiple Sinks into a Sink group; Flume then distributes data across the different Sinks in the group, which mainly addresses load balancing and failover.
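For reference, a minimal sketch of a failover Sink processor (the agent and sink names are assumptions; the Sink-group exercise later in this article demonstrates the load_balance processor instead):
a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = k1 k2
a1.sinkgroups.g1.processor.type = failover
a1.sinkgroups.g1.processor.priority.k1 = 10
a1.sinkgroups.g1.processor.priority.k2 = 5
a1.sinkgroups.g1.processor.maxpenalty = 10000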
The aggregation topology is the most common and very practical. A typical web application is spread across hundreds of servers, sometimes thousands or tens of thousands, and the logs they produce are painful to handle. This Flume combination solves the problem well: each server runs a Flume agent that collects its logs and sends them to a central log-collecting Flume agent, which then uploads them to hdfs, hive, hbase, jms, and so on for log analysis.
Agent internals
Flume deployment
1. Extract apache-flume-1.7.0-bin.tar.gz into the /opt/module directory.
2. Rename apache-flume-1.7.0-bin to flume.
3. Rename the flume-env.sh.template file under flume/conf to flume-env.sh and configure JAVA_HOME in flume-env.sh.
Requirements:
The server listens on local port 44444.
The client uses the netcat tool to send messages to port 44444.
Finally, the data is displayed on the console.
Steps:
1. Create the Agent configuration file flume-netcat-logger.conf in the job folder.
[djm@hadoop102 job]$ vim flume-netcat-logger.conf
2. Add the following content:
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444
# Describe the sink
a1.sinks.k1.type = logger
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
3. Start the task
[djm@hadoop102 flume]$ bin/flume-ng agent -c conf/ -n a1 -f job/flume-netcat-logger.conf -Dflume.root.logger=INFO,console
Parameter notes:
--conf conf/ : the configuration files are stored in the conf/ directory.
--name a1 : names the Agent a1.
--conf-file job/flume-netcat-logger.conf : the configuration file Flume reads for this run is flume-netcat-logger.conf in the job folder.
-Dflume.root.logger=INFO,console : -D dynamically overrides the flume.root.logger property at runtime and sets the console log level to INFO.
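To exercise the agent (assuming the netcat client is installed as nc), open another session and send a line to port 44444; it should appear in the agent's console log:
[djm@hadoop102 ~]$ nc localhost 44444
hello flume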
Real-time reading of a local file to HDFS
Requirements:
Monitor the Hive log in real time and upload it to HDFS.
Steps:
1. Create the Agent configuration file flume-file-hdfs.conf in the job folder.
[djm@hadoop102 job]$ vim flume-file-hdfs.conf
2. Add the following content:
# Name the components on this agent
a2.sources = r2
a2.sinks = k2
a2.channels = c2
# Describe/configure the source
a2.sources.r2.type = exec
a2.sources.r2.command = tail -F /opt/module/hive/logs/hive.log
a2.sources.r2.shell = /bin/bash -c
# Describe the sink
a2.sinks.k2.type = hdfs
a2.sinks.k2.hdfs.path = hdfs://hadoop102:9000/flume/%Y%m%d/%H
# Prefix of the uploaded files
a2.sinks.k2.hdfs.filePrefix = logs-
# Whether to roll folders based on time
a2.sinks.k2.hdfs.round = true
# Number of time units before a new folder is created
a2.sinks.k2.hdfs.roundValue = 1
# Redefine the time unit
a2.sinks.k2.hdfs.roundUnit = hour
# Whether to use the local timestamp
a2.sinks.k2.hdfs.useLocalTimeStamp = true
# Number of Events to accumulate before flushing to HDFS
a2.sinks.k2.hdfs.batchSize = 1000
# File type; compression is supported
a2.sinks.k2.hdfs.fileType = DataStream
# How often (seconds) to roll a new file
a2.sinks.k2.hdfs.rollInterval = 60
# Roll size of each file (bytes)
a2.sinks.k2.hdfs.rollSize = 134217700
# File rolling is independent of the number of Events
a2.sinks.k2.hdfs.rollCount = 0
# Use a channel which buffers events in memory
a2.channels.c2.type = memory
a2.channels.c2.capacity = 1000
a2.channels.c2.transactionCapacity = 100
# Bind the source and sink to the channel
a2.sources.r2.channels = c2
a2.sinks.k2.channel = c2
3. Start the task
[djm@hadoop102 flume]$ bin/flume-ng agent -c conf/ -n a2 -f job/flume-file-hdfs.conf
Note:
To read a file on a Linux system, you have to follow the rules of Linux commands. Since the Hive log lives on a Linux system, the source type chosen for reading the file is exec, short for execute, meaning a Linux command is executed to read the file.
Real-time reading of directory files to HDFS
Requirements:
Use Flume to monitor all files in a directory.
Steps:
1. Create the Agent configuration file flume-dir-hdfs.conf in the job folder.
[djm@hadoop102 job]$ vim flume-dir-hdfs.conf
2. Add the following content:
a3.sources = r3
a3.sinks = k3
a3.channels = c3
# Describe/configure the source
a3.sources.r3.type = spooldir
a3.sources.r3.spoolDir = /opt/module/flume/upload
a3.sources.r3.fileSuffix = .COMPLETED
a3.sources.r3.fileHeader = true
# Ignore (do not upload) files ending in .tmp
a3.sources.r3.ignorePattern = ([^ ]*\.tmp)
# Describe the sink
a3.sinks.k3.type = hdfs
a3.sinks.k3.hdfs.path = hdfs://hadoop102:9000/flume/upload/%Y%m%d/%H
# Prefix of the uploaded files
a3.sinks.k3.hdfs.filePrefix = upload-
# Whether to roll folders based on time
a3.sinks.k3.hdfs.round = true
# Number of time units before a new folder is created
a3.sinks.k3.hdfs.roundValue = 1
# Redefine the time unit
a3.sinks.k3.hdfs.roundUnit = hour
# Whether to use the local timestamp
a3.sinks.k3.hdfs.useLocalTimeStamp = true
# Number of Events to accumulate before flushing to HDFS
a3.sinks.k3.hdfs.batchSize = 100
# File type; compression is supported
a3.sinks.k3.hdfs.fileType = DataStream
# How often (seconds) to roll a new file
a3.sinks.k3.hdfs.rollInterval = 60
# Roll size of each file (bytes), roughly 128 MB
a3.sinks.k3.hdfs.rollSize = 134217700
# File rolling is independent of the number of Events
a3.sinks.k3.hdfs.rollCount = 0
# Use a channel which buffers events in memory
a3.channels.c3.type = memory
a3.channels.c3.capacity = 1000
a3.channels.c3.transactionCapacity = 100
# Bind the source and sink to the channel
a3.sources.r3.channels = c3
a3.sinks.k3.channel = c3
3. Start the task
[djm@hadoop102 flume]$ bin/flume-ng agent -c conf/ -n a3 -f job/flume-dir-hdfs.conf
Note:
Do not create and continuously modify files inside the monitored directory.
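To try it out (the file name is only an example), copy an already-finished file into the monitored directory; once it is picked up, the spooldir source renames it with the .COMPLETED suffix:
[djm@hadoop102 flume]$ cp README.md /opt/module/flume/upload/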
Requirements:
Flume-1 monitors file changes and passes the changes on to Flume-2.
Flume-2 stores them in HDFS.
At the same time, Flume-1 passes the changes to Flume-3, and Flume-3 writes them to the local file system.
1. Create the Agent configuration file flume-file-flume.conf in the group1 folder.
[djm@hadoop102 group1]$ vim flume-file-flume.conf
2. Add the following content:
# Name the components on this agent
a1.sources = r1
a1.sinks = k1 k2
a1.channels = c1 c2
# Replicate the data flow to all channels
a1.sources.r1.selector.type = replicating
# Describe/configure the source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /opt/module/hive/logs/hive.log
a1.sources.r1.shell = /bin/bash -c
# Describe the sink
# On the sink side, avro acts as a data sender
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = hadoop102
a1.sinks.k1.port = 4141
a1.sinks.k2.type = avro
a1.sinks.k2.hostname = hadoop102
a1.sinks.k2.port = 4142
# Describe the channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.channels.c2.type = memory
a1.channels.c2.capacity = 1000
a1.channels.c2.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1 c2
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c2
3. Create the Agent configuration file flume-flume-hdfs.conf in the group1 folder.
[djm@hadoop102 group1]$ vim flume-flume-hdfs.conf
4. Add the following content:
# Name the components on this agent
a2.sources = r1
a2.sinks = k1
a2.channels = c1
# Describe/configure the source
# On the source side, avro acts as a data-receiving service
a2.sources.r1.type = avro
a2.sources.r1.bind = hadoop102
a2.sources.r1.port = 4141
# Describe the sink
a2.sinks.k1.type = hdfs
a2.sinks.k1.hdfs.path = hdfs://hadoop102:9000/flume2/%Y%m%d/%H
# Prefix of the uploaded files
a2.sinks.k1.hdfs.filePrefix = flume2-
# Whether to roll folders based on time
a2.sinks.k1.hdfs.round = true
# Number of time units before a new folder is created
a2.sinks.k1.hdfs.roundValue = 1
# Redefine the time unit
a2.sinks.k1.hdfs.roundUnit = hour
# Whether to use the local timestamp
a2.sinks.k1.hdfs.useLocalTimeStamp = true
# Number of Events to accumulate before flushing to HDFS
a2.sinks.k1.hdfs.batchSize = 100
# File type; compression is supported
a2.sinks.k1.hdfs.fileType = DataStream
# How often (seconds) to roll a new file
a2.sinks.k1.hdfs.rollInterval = 600
# Roll size of each file (bytes), roughly 128 MB
a2.sinks.k1.hdfs.rollSize = 134217700
# File rolling is independent of the number of Events
a2.sinks.k1.hdfs.rollCount = 0
# Describe the channel
a2.channels.c1.type = memory
a2.channels.c1.capacity = 1000
a2.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a2.sources.r1.channels = c1
a2.sinks.k1.channel = c1
5. Create the Agent configuration file flume-flume-dir.conf in the group1 folder.
[djm@hadoop102 group1]$ vim flume-flume-dir.conf
6. Add the following content:
# Name the components on this agent
a3.sources = r1
a3.sinks = k1
a3.channels = c2
# Describe/configure the source
a3.sources.r1.type = avro
a3.sources.r1.bind = hadoop102
a3.sources.r1.port = 4142
# Describe the sink
a3.sinks.k1.type = file_roll
a3.sinks.k1.sink.directory = /opt/module/data/flume3
# Describe the channel
a3.channels.c2.type = memory
a3.channels.c2.capacity = 1000
a3.channels.c2.transactionCapacity = 100
# Bind the source and sink to the channel
a3.sources.r1.channels = c2
a3.sinks.k1.channel = c2
7. Start the tasks
[djm@hadoop102 flume]$ bin/flume-ng agent -c conf/ -n a3 -f job/group1/flume-flume-dir.conf
[djm@hadoop102 flume]$ bin/flume-ng agent -c conf/ -n a2 -f job/group1/flume-flume-hdfs.conf
[djm@hadoop102 flume]$ bin/flume-ng agent -c conf/ -n a1 -f job/group1/flume-file-flume.conf
Note:
Avro is a language-neutral data serialization and RPC framework.
The output directory on the local file system must already exist; if it does not, it will not be created automatically.
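Since the file_roll sink will not create it, create the directory beforehand (the path is the one used in the a3 configuration above):
[djm@hadoop102 flume]$ mkdir -p /opt/module/data/flume3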
When avro is used, the agents hosting the avro Sources (the receivers) must be started before the agent whose avro Sinks connect to them, which is why a3 and a2 are started before a1 above.
Load balancing and failover (Sink group)
Requirements:
Flume-1 monitors port data and passes it on to Flume-2.
Flume-2 displays the data on the console.
At the same time, Flume-1 passes the data to Flume-3, and Flume-3 also displays it on the console.
Steps:
1. Create the Agent configuration file flume-netcat-flume.conf in the group2 folder.
2. Add the following content:
# Name the components on this agent
a1.sources = r1
a1.channels = c1
a1.sinkgroups = g1
a1.sinks = k1 k2
# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444
a1.sinkgroups.g1.processor.type = load_balance
a1.sinkgroups.g1.processor.backoff = true
a1.sinkgroups.g1.processor.selector = round_robin
a1.sinkgroups.g1.processor.selector.maxTimeOut=10000
# Describe the sink
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = hadoop102
a1.sinks.k1.port = 4141
a1.sinks.k2.type = avro
a1.sinks.k2.hostname = hadoop102
a1.sinks.k2.port = 4142
# Describe the channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinkgroups.g1.sinks = k1 k2
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c1
3. Create the Agent configuration file flume-flume-console1.conf in the group2 folder.
4. Add the following content:
# Name the components on this agent
a2.sources = r1
a2.sinks = k1
a2.channels = c1
# Describe/configure the source
a2.sources.r1.type = avro
a2.sources.r1.bind = hadoop102
a2.sources.r1.port = 4141
# Describe the sink
a2.sinks.k1.type = logger
# Describe the channel
a2.channels.c1.type = memory
a2.channels.c1.capacity = 1000
a2.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a2.sources.r1.channels = c1
a2.sinks.k1.channel = c1
5. Create the Agent configuration file flume-flume-console2.conf in the group2 folder.
6. Add the following content:
# Name the components on this agent
a3.sources = r1
a3.sinks = k1
a3.channels = c2
# Describe/configure the source
a3.sources.r1.type = avro
a3.sources.r1.bind = hadoop102
a3.sources.r1.port = 4142
# Describe the sink
a3.sinks.k1.type = logger
# Describe the channel
a3.channels.c2.type = memory
a3.channels.c2.capacity = 1000
a3.channels.c2.transactionCapacity = 100
# Bind the source and sink to the channel
a3.sources.r1.channels = c2
a3.sinks.k1.channel = c2
7. Start the tasks
[djm@hadoop102 flume]$ bin/flume-ng agent -c conf/ -n a3 -f job/group2/flume-flume-console2.conf -Dflume.root.logger=INFO,console
[djm@hadoop102 flume]$ bin/flume-ng agent -c conf/ -n a2 -f job/group2/flume-flume-console1.conf -Dflume.root.logger=INFO,console
[djm@hadoop102 flume]$ bin/flume-ng agent -c conf/ -n a1 -f job/group2/flume-netcat-flume.conf
Requirements:
Flume-1 on hadoop103 monitors the file /opt/module/group.log.
Flume-2 on hadoop102 monitors the data stream on a port.
Flume-1 and Flume-2 send their data to Flume-3 on hadoop104, and Flume-3 prints the final data to the console.
Steps:
1. Create the Agent configuration file flume1-logger-flume.conf in the group3 folder.
[djm@hadoop102 group3]$ vim flume1-logger-flume.conf
2. Add the following content:
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /opt/module/group.log
a1.sources.r1.shell = /bin/bash -c
# Describe the sink
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = hadoop104
a1.sinks.k1.port = 4141
# Describe the channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
3. Create the Agent configuration file flume2-netcat-flume.conf in the group3 folder.
[djm@hadoop102 group3]$ vim flume2-netcat-flume.conf
4. Add the following content:
# Name the components on this agent
a2.sources = r1
a2.sinks = k1
a2.channels = c1
# Describe/configure the source
a2.sources.r1.type = netcat
a2.sources.r1.bind = hadoop102
a2.sources.r1.port = 44444
# Describe the sink
a2.sinks.k1.type = avro
a2.sinks.k1.hostname = hadoop104
a2.sinks.k1.port = 4141
# Use a channel which buffers events in memory
a2.channels.c1.type = memory
a2.channels.c1.capacity = 1000
a2.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a2.sources.r1.channels = c1
a2.sinks.k1.channel = c1
5. Create the Agent configuration file flume3-flume-logger.conf in the group3 folder.
[djm@hadoop102 group3]$ vim flume3-flume-logger.conf
6. Add the following content:
# Name the components on this agent
a3.sources = r1
a3.sinks = k1
a3.channels = c1
# Describe/configure the source
a3.sources.r1.type = avro
a3.sources.r1.bind = hadoop104
a3.sources.r1.port = 4141
# Describe the sink
a3.sinks.k1.type = logger
# Describe the channel
a3.channels.c1.type = memory
a3.channels.c1.capacity = 1000
a3.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a3.sources.r1.channels = c1
a3.sinks.k1.channel = c1
7. Distribute the configuration files
[djm@hadoop102 group3]$ xsync /opt/module/flume/job
8. Start the tasks
[djm@hadoop104 flume]$ bin/flume-ng agent -c conf/ -n a3 -f job/group3/flume3-flume-logger.conf -Dflume.root.logger=INFO,console
[djm@hadoop102 flume]$ bin/flume-ng agent -c conf/ -n a2 -f job/group3/flume2-netcat-flume.conf
[djm@hadoop103 flume]$ bin/flume-ng agent -c conf/ -n a1 -f job/group3/flume1-logger-flume.conf
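To feed both inputs (the commands are only illustrative), append a line to the monitored log on hadoop103 and send a line to the netcat source on hadoop102; both should show up in the a3 console on hadoop104:
[djm@hadoop103 module]$ echo "hello from group.log" >> /opt/module/group.log
[djm@hadoop102 ~]$ nc hadoop102 44444
hello from netcat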
Ganglia deployment
1. Install the httpd service and php
yum -y install httpd php
2. Install the other dependencies
yum -y install rrdtool perl-rrdtool rrdtool-devel
3. Install ganglia
rpm -Uvh http://dl.fedoraproject.org/pub/epel/6/x86_64/epel-release-6-8.noarch.rpm
yum -y install ganglia-gmetad ganglia-gmond ganglia-web
4. Modify the ganglia configuration file
vim /etc/httpd/conf.d/ganglia.conf
#
# Ganglia monitoring system php web frontend
#
Alias /ganglia /usr/share/ganglia
<Location /ganglia>
# Require local
Require all granted
# Require ip 10.1.2.3
# Require host example.org
</Location>
Note in particular: the following configuration does not work:
<Location /ganglia>
Order deny,allow
Allow from all
</Location>
5. Modify the gmetad configuration file
vim /etc/ganglia/gmetad.conf
data_source "hadoop102" 192.168.1.102
6. Modify the gmond configuration file
vim /etc/ganglia/gmond.conf
cluster {
#name = "unspecified"
name = "hadoop102"
owner = "unspecified"
latlong = "unspecified"
url = "unspecified"
}
udp_send_channel {
#bind_hostname = yes # Highly recommended, soon to be default.
# This option tells gmond to use a source address
# that resolves to the machine's hostname. Without
# this, the metrics may appear to come from any
# interface and the DNS names associated with
# those IPs will be used to create the RRDs.
#mcast_join = 239.2.11.71
host = 192.168.10.102
port = 8649
ttl = 1
}
/* You can specify as many udp_recv_channels as you like as well. */
udp_recv_channel {
#mcast_join = 239.2.11.71
port = 8649
#bind = 239.2.11.71
bind = 192.168.10.102
retry_bind = true
# Size of the UDP buffer. If you are handling lots of metrics you really
# should bump it up to e.g. 10MB or even higher.
# buffer = 10485760
}
7. Check the SELinux status
sestatus
If it is not disabled, modify the following configuration file:
vim /etc/selinux/config
Or temporarily turn SELinux off:
setenforce 0
8. Start ganglia
systemctl start httpd
systemctl start gmetad
systemctl start gmond
9. Open a browser and visit
http://hadoop102/ganglia/
If you still get a permission error after completing the steps above, try changing the permissions of the /var/lib/ganglia directory:
chmod -R 777 /var/lib/ganglia
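With Ganglia in place, a Flume agent can report its metrics to gmond through Flume's built-in ganglia monitoring properties; the agent name, job file, and host:port below are assumptions that reuse the earlier netcat example and the gmond address configured above:
[djm@hadoop102 flume]$ bin/flume-ng agent -c conf/ -n a1 -f job/flume-netcat-logger.conf -Dflume.root.logger=INFO,console -Dflume.monitoring.type=ganglia -Dflume.monitoring.hosts=192.168.10.102:8649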
Custom Source
Requirements:
Implementation:
1. Add the dependency
<dependency>
<groupId>org.apache.flume</groupId>
<artifactId>flume-ng-core</artifactId>
<version>1.7.0</version>
</dependency>
2. Write the code
package com.djm.flume;
import org.apache.flume.Context;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.PollableSource;
import org.apache.flume.conf.Configurable;
import org.apache.flume.event.SimpleEvent;
import org.apache.flume.source.AbstractSource;
import java.util.HashMap;
public class MySource extends AbstractSource implements Configurable, PollableSource {
    // Fields that will be read from the configuration file
    private Long delay;
    private String field;

    /**
     * Receive data, wrap it into events, and write them to the channel.
     * @return the processing status
     * @throws EventDeliveryException
     */
    public Status process() throws EventDeliveryException {
        HashMap<String, String> headerMap = new HashMap<>();
        SimpleEvent event = new SimpleEvent();
        try {
            for (int i = 0; i < 5; i++) {
                event.setHeaders(headerMap);
                event.setBody((field + i).getBytes());
                getChannelProcessor().processEvent(event);
                Thread.sleep(delay);
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
            return Status.BACKOFF;
        }
        return Status.READY;
    }

    public long getBackOffSleepIncrement() {
        return 0;
    }

    public long getMaxBackOffSleepInterval() {
        return 0;
    }

    /**
     * Read the configuration file.
     * @param context the source context
     */
    public void configure(Context context) {
        delay = context.getLong("delay");
        field = context.getString("field", "hello");
    }
}
3. Package and test
Package the project with Maven and upload the jar to the /opt/module/flume/lib directory.
Create the Agent configuration file mysource.conf in the job folder.
[djm@hadoop102 job]$ vim mysource.conf
Add the following content:
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = com.djm.flume.MySource
a1.sources.r1.delay = 1000
a1.sources.r1.field = djm
# Describe the sink
a1.sinks.k1.type = logger
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
Start the task
[djm@hadoop102 flume]$ bin/flume-ng agent -c conf/ -f job/mysource.conf -n a1 -Dflume.root.logger=INFO,console
Custom Sink
Requirements:
Implementation:
1. Add the dependency
<dependency>
<groupId>org.apache.flume</groupId>
<artifactId>flume-ng-core</artifactId>
<version>1.7.0</version>
</dependency>
2. Write the code
package com.djm.flume;
import org.apache.flume.*;
import org.apache.flume.conf.Configurable;
import org.apache.flume.sink.AbstractSink;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class MySink extends AbstractSink implements Configurable {
    private static final Logger LOG = LoggerFactory.getLogger(AbstractSink.class);
    private String prefix;
    private String suffix;

    @Override
    public Status process() throws EventDeliveryException {
        Status status = null;
        Channel channel = getChannel();
        Transaction transaction = channel.getTransaction();
        try {
            Event event;
            transaction.begin();
            // Poll the channel until an event becomes available
            while ((event = channel.take()) == null) {
                Thread.sleep(200);
            }
            // Log the event body wrapped with the configured prefix and suffix
            LOG.info(prefix + new String(event.getBody()) + suffix);
            transaction.commit();
            status = Status.READY;
        } catch (Throwable e) {
            transaction.rollback();
            status = Status.BACKOFF;
            if (e instanceof Error)
                throw (Error) e;
        } finally {
            transaction.close();
        }
        return status;
    }

    @Override
    public void configure(Context context) {
        prefix = context.getString("prefix");
        suffix = context.getString("suffix");
    }
}
3. Package and test
Package the project with Maven and upload the jar to the /opt/module/flume/lib directory.
Create the Agent configuration file mysink.conf in the job folder.
[djm@hadoop102 job]$ vim mysink.conf
Add the following content:
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444
# Describe the sink
a1.sinks.k1.type = com.djm.flume.MySink
a1.sinks.k1.prefix = djm:
a1.sinks.k1.suffix = :end
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
Start the task
[djm@hadoop102 flume]$ bin/flume-ng agent -c conf/ -f job/mysink.conf -n a1 -Dflume.root.logger=INFO,console
Flume parameter tuning
Source
Increasing the number of Sources increases the capacity to read data. For example, when a single directory produces too many files, split it into several directories and configure a Source for each, so that there is enough capacity to pick up newly produced data.
The batchSize parameter determines how many Events the Source ships to the Channel in one batch; increasing it appropriately improves the performance of moving Events from the Source to the Channel.
Channel
With type set to Memory Channel, the Channel performs best, but data may be lost if the Flume process dies unexpectedly.
With type set to File Channel, the Channel is more fault-tolerant, but performance is worse than a Memory Channel; when using a File Channel, configuring dataDirs with directories on several different disks improves performance.
The capacity parameter determines the maximum number of Events the Channel can hold. The transactionCapacity parameter determines the maximum number of Events the Source writes to the Channel per transaction and the maximum number the Sink reads from the Channel per transaction; transactionCapacity must be larger than the batchSize of the Source and of the Sink.
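As a hedged illustration of that sizing rule (the numbers and the multi-disk paths are examples, not recommendations), the related parameters might be set like this:
# batchSize <= transactionCapacity <= capacity
a1.channels.c1.type = file
a1.channels.c1.dataDirs = /data1/flume/data,/data2/flume/data
a1.channels.c1.capacity = 1000000
a1.channels.c1.transactionCapacity = 10000
a1.sinks.k1.hdfs.batchSize = 1000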
Sink
Increasing the number of Sinks increases the capacity to consume Events, but more Sinks is not always better; enough is enough, because too many Sinks occupy system resources and waste them unnecessarily.
The batchSize parameter determines how many Events the Sink reads from the Channel in one batch; increasing it appropriately improves the Sink's performance in moving Events out of the Channel.