
Getting Started with Flume


1 Flume Overview

1.1 Definition

Flume is a highly available, highly reliable, distributed system provided by Cloudera for collecting, aggregating, and transporting massive amounts of log data.

Flume is based on a streaming architecture and is flexible and simple.

1.2 Features

It can be integrated with virtually any storage process.

When the incoming data rate exceeds the rate at which data can be written to the destination store, Flume buffers the data, easing the pressure on HDFS.

Transactions in Flume are based on the Channel and use two transaction models (sender + receiver) to ensure that messages are delivered reliably.

Flume uses two independent transactions to handle event delivery from Source to Channel and from Channel to Sink. Only when all of the data in a transaction has been successfully committed to the Channel does the Source consider that data read; likewise, only data that has been successfully written out by the Sink is removed from the Channel.

1.3 Architecture

[Figure: Flume agent architecture]

1.3.1 Agent

An Agent is a JVM process that moves data from a source to a destination in the form of events.

An Agent consists mainly of a Source, a Channel, and a Sink.

1.3.2 Source

The Source is the component responsible for receiving data into the Agent. It can handle many input types, including avro, thrift, exec, jms, spooling directory, netcat, sequence generator, syslog, http, and legacy.

1.3.3 Channel

The Channel is a buffer that sits between the Source and the Sink, so the Source and Sink can operate at different rates. The Channel is thread-safe and can handle writes from several Sources and reads from several Sinks at the same time.

Flume ships with two Channel types:

Memory Channel: an in-memory queue. It is fast and suitable when occasional data loss is acceptable.

File Channel: writes all events to disk, so data is not lost if the process exits or the machine goes down.
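For reference, a minimal File Channel configuration might look like the sketch below; the agent/channel names and the paths are illustrative, not taken from this article.

# File Channel sketch (names and paths are examples only)
a1.channels.c1.type = file
# Directory where the channel keeps its checkpoint
a1.channels.c1.checkpointDir = /opt/module/flume/checkpoint
# One or more comma-separated data directories; directories on different disks improve throughput
a1.channels.c1.dataDirs = /opt/module/flume/data
a1.channels.c1.capacity = 1000000
a1.channels.c1.transactionCapacity = 10000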

1.3.4 Sink

The Sink continuously polls the Channel for events, removes them in batches, and writes those batches to a storage or indexing system, or forwards them to another Flume Agent.

The Sink is fully transactional: before removing a batch of events from the Channel, each Sink starts a transaction with the Channel. Once the batch has been successfully written to the storage system or to the next Flume Agent, the Sink commits the transaction, and only then does the Channel delete those events from its internal buffer.

Sink destinations include hdfs, logger, avro, thrift, ipc, file, null, HBase, solr, and custom sinks.

1.3.5 Event

The transmission unit: the basic unit of data transfer in Flume. Data travels from source to destination in the form of events.

An Event consists of an optional header and a byte array that carries the data; the header is a HashMap of key-value string pairs.

Typically one record is one Event, and an Event is split off every 2048 bytes.

1.4 Topologies

[Figure: chained (serial) topology]

In this mode, multiple Flume agents are chained in sequence, from the first Source through to the storage system that the final Sink delivers to. It is not advisable to chain too many agents: too many agents reduce throughput, and if any agent in the chain goes down, the whole pipeline is affected.

[Figure: single source replicated to multiple channels and sinks]

Flume supports sending an event stream to one or more destinations. In this mode the source data is replicated into multiple Channels; every Channel holds the same data, and each Sink can deliver it to a different destination.
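Besides the replicating selector, Flume also provides a multiplexing channel selector, which routes each event to a channel based on a header value instead of copying it everywhere. A minimal sketch, where the header name state and the mapping values are made-up examples:

a1.sources.r1.selector.type = multiplexing
# Route on the value of the "state" event header
a1.sources.r1.selector.header = state
a1.sources.r1.selector.mapping.CZ = c1
a1.sources.r1.selector.mapping.US = c2
a1.sources.r1.selector.default = c2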

[Figure: sink group topology]

Flume allows multiple Sinks to be grouped logically into a Sink group; Flume then distributes data across the Sinks in the group, which is mainly used for load balancing and failover.
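Section 3.5 below demonstrates the load_balance sink processor; for failover, the processor type is failover and each sink in the group is given a priority, with the highest-priority live sink receiving all events. A hedged sketch (names and values are examples):

a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = k1 k2
a1.sinkgroups.g1.processor.type = failover
# The higher-priority sink wins; k2 takes over only if k1 fails
a1.sinkgroups.g1.processor.priority.k1 = 10
a1.sinkgroups.g1.processor.priority.k2 = 5
# Maximum back-off time (ms) for a failed sink
a1.sinkgroups.g1.processor.maxpenalty = 10000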

[Figure: aggregation topology]

This is the most common and most practical pattern. Web applications are usually spread across hundreds, sometimes thousands or even tens of thousands, of servers, and the logs they produce are painful to handle. This Flume combination solves the problem nicely: each server runs one Flume agent that collects its logs and sends them to a central log-collecting Flume agent, which then forwards them to hdfs, hive, hbase, jms, and so on for log analysis.

1.5 How an Agent Works

[Figure: agent internal data flow]

2 Deploying Flume

1. Extract apache-flume-1.7.0-bin.tar.gz into the /opt/module directory.

2. Rename apache-flume-1.7.0-bin to flume.

3. Rename flume/conf/flume-env.sh.template to flume-env.sh and configure JAVA_HOME in flume-env.sh.
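Taken together, the steps might look like this on the command line; this assumes the tarball was copied to /opt/software and that the JDK lives under /opt/module, so adjust the paths to your environment:

[djm@hadoop102 software]$ tar -zxf apache-flume-1.7.0-bin.tar.gz -C /opt/module/
[djm@hadoop102 module]$ mv apache-flume-1.7.0-bin flume
[djm@hadoop102 conf]$ mv flume-env.sh.template flume-env.sh
[djm@hadoop102 conf]$ vim flume-env.sh
# in flume-env.sh (example JDK path):
export JAVA_HOME=/opt/module/jdk1.8.0_144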

3 Development Case Studies

3.1 Monitoring Port Data

Requirements:

The agent listens on local port 44444.

The netcat tool is used to send messages to port 44444.

Finally, the received data is displayed on the console.

Steps:

1. Create the Agent configuration file flume-netcat-logger.conf in the job directory:

[djm@hadoop102 job]$ vim flume-netcat-logger.conf

2. Add the following content:

# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444

# Describe the sink
a1.sinks.k1.type = logger

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

3. Start the job:

[djm@hadoop102 flume]$ bin/flume-ng agent -c conf/ -n a1 -f job/flume-netcat-logger.conf -Dflume.root.logger=INFO,console

Parameter notes:

--conf conf/: the configuration files are stored in the conf/ directory.

--name a1: names the Agent a1.

--conf-file job/flume-netcat-logger.conf: the configuration file Flume reads for this run is flume-netcat-logger.conf in the job directory.

-Dflume.root.logger=INFO,console: -D overrides a Flume property at runtime; here it sets the console log level to INFO.
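To test the job, open another session and send a few lines to port 44444 with netcat; each line should be printed by the logger sink, and the netcat source replies with OK for every event it accepts:

[djm@hadoop102 ~]$ nc localhost 44444
hello
OK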

3.2 Streaming a Local File to HDFS in Real Time

Requirements:

Monitor the Hive log in real time and upload it to HDFS.

Steps:

1. Create the Agent configuration file flume-file-hdfs.conf in the job directory:

[djm@hadoop102 job]$ vim flume-file-hdfs.conf

2. Add the following content:

# Name the components on this agent
a2.sources = r2
a2.sinks = k2
a2.channels = c2

# Describe/configure the source
a2.sources.r2.type = exec
a2.sources.r2.command = tail -F /opt/module/hive/logs/hive.log
a2.sources.r2.shell = /bin/bash -c

# Describe the sink
a2.sinks.k2.type = hdfs
a2.sinks.k2.hdfs.path = hdfs://hadoop102:9000/flume/%Y%m%d/%H
# Prefix for uploaded files
a2.sinks.k2.hdfs.filePrefix = logs-
# Whether to roll directories based on time
a2.sinks.k2.hdfs.round = true
# How many time units before a new directory is created
a2.sinks.k2.hdfs.roundValue = 1
# The time unit used for rounding
a2.sinks.k2.hdfs.roundUnit = hour
# Whether to use the local timestamp
a2.sinks.k2.hdfs.useLocalTimeStamp = true
# Number of events to accumulate before flushing to HDFS
a2.sinks.k2.hdfs.batchSize = 1000
# File type; compression is supported
a2.sinks.k2.hdfs.fileType = DataStream
# How often (seconds) to roll to a new file
a2.sinks.k2.hdfs.rollInterval = 60
# Roll the file when it reaches this size (about 128 MB)
a2.sinks.k2.hdfs.rollSize = 134217700
# Do not roll files based on the number of events
a2.sinks.k2.hdfs.rollCount = 0

# Use a channel which buffers events in memory
a2.channels.c2.type = memory
a2.channels.c2.capacity = 1000
a2.channels.c2.transactionCapacity = 100

# Bind the source and sink to the channel
a2.sources.r2.channels = c2
a2.sinks.k2.channel = c2

3. Start the job:

[djm@hadoop102 flume]$ bin/flume-ng agent -c conf/ -n a2 -f job/flume-file-hdfs.conf

Note:

To read a file on a Linux system, you have to follow the rules of Linux commands. Since the Hive log lives on a Linux filesystem, the source type chosen here is exec (short for execute), which runs a Linux command to read the file.
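Note that tail -F through the exec source keeps no record of how far it has read, so data can be lost or re-read if the agent restarts. Flume 1.7 also ships a TAILDIR source that stores its read position in a JSON file; a minimal sketch for the same Hive log (the position-file path is an example):

a2.sources.r2.type = TAILDIR
# File in which the source records how far each log has been read
a2.sources.r2.positionFile = /opt/module/flume/tail_dir.json
a2.sources.r2.filegroups = f1
a2.sources.r2.filegroups.f1 = /opt/module/hive/logs/hive.log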

3.3 Streaming Directory Files to HDFS in Real Time

Requirements:

Use Flume to monitor an entire directory for new files.

Steps:

1. Create the Agent configuration file flume-dir-hdfs.conf in the job directory:

[djm@hadoop102 job]$ vim flume-dir-hdfs.conf

2. Add the following content:

a3.sources = r3
a3.sinks = k3
a3.channels = c3

# Describe/configure the source
a3.sources.r3.type = spooldir
a3.sources.r3.spoolDir = /opt/module/flume/upload
a3.sources.r3.fileSuffix = .COMPLETED
a3.sources.r3.fileHeader = true
# Ignore files ending in .tmp; do not upload them
a3.sources.r3.ignorePattern = ([^ ]*\.tmp)

# Describe the sink
a3.sinks.k3.type = hdfs
a3.sinks.k3.hdfs.path = hdfs://hadoop102:9000/flume/upload/%Y%m%d/%H
# Prefix for uploaded files
a3.sinks.k3.hdfs.filePrefix = upload-
# Whether to roll directories based on time
a3.sinks.k3.hdfs.round = true
# How many time units before a new directory is created
a3.sinks.k3.hdfs.roundValue = 1
# The time unit used for rounding
a3.sinks.k3.hdfs.roundUnit = hour
# Whether to use the local timestamp
a3.sinks.k3.hdfs.useLocalTimeStamp = true
# Number of events to accumulate before flushing to HDFS
a3.sinks.k3.hdfs.batchSize = 100
# File type; compression is supported
a3.sinks.k3.hdfs.fileType = DataStream
# How often (seconds) to roll to a new file
a3.sinks.k3.hdfs.rollInterval = 60
# Roll the file when it reaches this size (about 128 MB)
a3.sinks.k3.hdfs.rollSize = 134217700
# Do not roll files based on the number of events
a3.sinks.k3.hdfs.rollCount = 0

# Use a channel which buffers events in memory
a3.channels.c3.type = memory
a3.channels.c3.capacity = 1000
a3.channels.c3.transactionCapacity = 100

# Bind the source and sink to the channel
a3.sources.r3.channels = c3
a3.sinks.k3.channel = c3

3. Start the job:

[djm@hadoop102 flume]$ bin/flume-ng agent -c conf/ -n a3 -f job/flume-dir-hdfs.conf

Note:

Do not create files in the monitored directory and then keep modifying them; only drop in files that are already complete.
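To verify, create a finished file somewhere else and move it into the monitored directory; once Flume has uploaded it, the file is renamed with the .COMPLETED suffix (the temporary path is just an example):

[djm@hadoop102 flume]$ echo hello > /tmp/test.log
[djm@hadoop102 flume]$ mv /tmp/test.log /opt/module/flume/upload/
[djm@hadoop102 flume]$ ls /opt/module/flume/upload/
test.log.COMPLETED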

3.4 Single Source, Multiple Outputs (Selector)

Requirements:

Flume-1 monitors a file for changes and passes the new content to Flume-2.

Flume-2 stores the data in HDFS.

Flume-1 also passes the new content to Flume-3, and Flume-3 writes it to the local filesystem.

1. Create the Agent configuration file flume-file-flume.conf in the group1 directory:

[djm@hadoop102 group1]$ vim flume-file-flume.conf

2. Add the following content:

# Name the components on this agent
a1.sources = r1
a1.sinks = k1 k2
a1.channels = c1 c2
# Replicate the data stream to all channels
a1.sources.r1.selector.type = replicating

# Describe/configure the source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /opt/module/hive/logs/hive.log
a1.sources.r1.shell = /bin/bash -c

# Describe the sink
# An avro sink acts as a data sender
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = hadoop102 
a1.sinks.k1.port = 4141

a1.sinks.k2.type = avro
a1.sinks.k2.hostname = hadoop102
a1.sinks.k2.port = 4142

# Describe the channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

a1.channels.c2.type = memory
a1.channels.c2.capacity = 1000
a1.channels.c2.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1 c2
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c2

3. Create the Agent configuration file flume-flume-hdfs.conf in the group1 directory:

[djm@hadoop102 group1]$ vim flume-flume-hdfs.conf

4. Add the following content:

# Name the components on this agent
a2.sources = r1
a2.sinks = k1
a2.channels = c1

# Describe/configure the source
# An avro source acts as a data-receiving service
a2.sources.r1.type = avro
a2.sources.r1.bind = hadoop102
a2.sources.r1.port = 4141

# Describe the sink
a2.sinks.k1.type = hdfs
a2.sinks.k1.hdfs.path = hdfs://hadoop102:9000/flume2/%Y%m%d/%H
# Prefix for uploaded files
a2.sinks.k1.hdfs.filePrefix = flume2-
# Whether to roll directories based on time
a2.sinks.k1.hdfs.round = true
# How many time units before a new directory is created
a2.sinks.k1.hdfs.roundValue = 1
# The time unit used for rounding
a2.sinks.k1.hdfs.roundUnit = hour
# Whether to use the local timestamp
a2.sinks.k1.hdfs.useLocalTimeStamp = true
# Number of events to accumulate before flushing to HDFS
a2.sinks.k1.hdfs.batchSize = 100
# File type; compression is supported
a2.sinks.k1.hdfs.fileType = DataStream
# How often (seconds) to roll to a new file
a2.sinks.k1.hdfs.rollInterval = 600
# Roll the file when it reaches this size (about 128 MB)
a2.sinks.k1.hdfs.rollSize = 134217700
# Do not roll files based on the number of events
a2.sinks.k1.hdfs.rollCount = 0

# Describe the channel
a2.channels.c1.type = memory
a2.channels.c1.capacity = 1000
a2.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a2.sources.r1.channels = c1
a2.sinks.k1.channel = c1

5. Create the Agent configuration file flume-flume-dir.conf in the group1 directory:

[djm@hadoop102 group1]$ vim flume-flume-dir.conf

6. Add the following content:

# Name the components on this agent
a3.sources = r1
a3.sinks = k1
a3.channels = c2

# Describe/configure the source
a3.sources.r1.type = avro
a3.sources.r1.bind = hadoop102
a3.sources.r1.port = 4142

# Describe the sink
a3.sinks.k1.type = file_roll
a3.sinks.k1.sink.directory = /opt/module/data/flume3

# Describe the channel
a3.channels.c2.type = memory
a3.channels.c2.capacity = 1000
a3.channels.c2.transactionCapacity = 100

# Bind the source and sink to the channel
a3.sources.r1.channels = c2
a3.sinks.k1.channel = c2

7. Start the jobs:

[djm@hadoop102 flume]$ bin/flume-ng agent -c conf/ -n a3 -f job/group1/flume-flume-dir.conf
[djm@hadoop102 flume]$ bin/flume-ng agent -c conf/ -n a2 -f job/group1/flume-flume-hdfs.conf
[djm@hadoop102 flume]$ bin/flume-ng agent -c conf/ -n a1 -f job/group1/flume-file-flume.conf

Note:

Avro is a language-neutral data serialization and RPC framework.

The local output directory must already exist; if it does not, Flume will not create it.

The downstream jobs that the avro Sinks connect to (the agents containing the avro Sources) must be started first, which is why a3 and a2 are started before a1.

3.5 Single Source, Multiple Outputs (Sink Group)

Requirements:

Flume-1 monitors port data and passes the new content to Flume-2.

Flume-2 prints the data to the console.

Flume-1 also passes content to Flume-3, and Flume-3 likewise prints it to the console.

Steps:

1. Create the Agent configuration file flume-netcat-flume.conf in the group2 directory:

2. Add the following content:

# Name the components on this agent
a1.sources = r1
a1.channels = c1
a1.sinkgroups = g1
a1.sinks = k1 k2

# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444

a1.sinkgroups.g1.processor.type = load_balance
a1.sinkgroups.g1.processor.backoff = true
a1.sinkgroups.g1.processor.selector = round_robin
a1.sinkgroups.g1.processor.selector.maxTimeOut=10000

# Describe the sink
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = hadoop102
a1.sinks.k1.port = 4141

a1.sinks.k2.type = avro
a1.sinks.k2.hostname = hadoop102
a1.sinks.k2.port = 4142

# Describe the channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinkgroups.g1.sinks = k1 k2
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c1

3. Create the Agent configuration file flume-flume-console1.conf in the group2 directory:

4. Add the following content:

# Name the components on this agent
a2.sources = r1
a2.sinks = k1
a2.channels = c1

# Describe/configure the source
a2.sources.r1.type = avro
a2.sources.r1.bind = hadoop102
a2.sources.r1.port = 4141

# Describe the sink
a2.sinks.k1.type = logger

# Describe the channel
a2.channels.c1.type = memory
a2.channels.c1.capacity = 1000
a2.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a2.sources.r1.channels = c1
a2.sinks.k1.channel = c1

5. Create the Agent configuration file flume-flume-console2.conf in the group2 directory:

6. Add the following content:

# Name the components on this agent
a3.sources = r1
a3.sinks = k1
a3.channels = c2

# Describe/configure the source
a3.sources.r1.type = avro
a3.sources.r1.bind = hadoop102
a3.sources.r1.port = 4142

# Describe the sink
a3.sinks.k1.type = logger

# Describe the channel
a3.channels.c2.type = memory
a3.channels.c2.capacity = 1000
a3.channels.c2.transactionCapacity = 100

# Bind the source and sink to the channel
a3.sources.r1.channels = c2
a3.sinks.k1.channel = c2

7. Start the jobs:

[djm@hadoop102 flume]$ bin/flume-ng agent -c conf/ -n a3 -f job/group2/flume-flume-console2.conf -Dflume.root.logger=INFO,console
[djm@hadoop102 flume]$ bin/flume-ng agent -c conf/ -n a2 -f job/group2/flume-flume-console1.conf -Dflume.root.logger=INFO,console
[djm@hadoop102 flume]$ bin/flume-ng agent -c conf/ -n a1 -f job/group2/flume-netcat-flume.conf

3.6 Aggregating Multiple Data Sources

Requirements:

Flume-1 on hadoop103 monitors the file /opt/module/group.log.

Flume-2 on hadoop102 monitors the data stream on a port.

Flume-1 and Flume-2 send their data to Flume-3 on hadoop104, and Flume-3 prints the combined data to the console.

Steps:

1. Create the Agent configuration file flume1-logger-flume.conf in the group3 directory:

[djm@hadoop102 group3]$ vim flume1-logger-flume.conf

2. Add the following content:

# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /opt/module/group.log
a1.sources.r1.shell = /bin/bash -c

# Describe the sink
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = hadoop104
a1.sinks.k1.port = 4141

# Describe the channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

3. Create the Agent configuration file flume2-netcat-flume.conf in the group3 directory:

[djm@hadoop102 group3]$ vim flume2-netcat-flume.conf

4. Add the following content:

# Name the components on this agent
a2.sources = r1
a2.sinks = k1
a2.channels = c1

# Describe/configure the source
a2.sources.r1.type = netcat
a2.sources.r1.bind = hadoop102
a2.sources.r1.port = 44444

# Describe the sink
a2.sinks.k1.type = avro
a2.sinks.k1.hostname = hadoop104
a2.sinks.k1.port = 4141

# Use a channel which buffers events in memory
a2.channels.c1.type = memory
a2.channels.c1.capacity = 1000
a2.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a2.sources.r1.channels = c1
a2.sinks.k1.channel = c1

5. Create the Agent configuration file flume3-flume-logger.conf in the group3 directory:

[djm@hadoop102 group3]$ vim flume3-flume-logger.conf

6. Add the following content:

# Name the components on this agent
a3.sources = r1
a3.sinks = k1
a3.channels = c1

# Describe/configure the source
a3.sources.r1.type = avro
a3.sources.r1.bind = hadoop104
a3.sources.r1.port = 4141

# Describe the sink
a3.sinks.k1.type = logger

# Describe the channel
a3.channels.c1.type = memory
a3.channels.c1.capacity = 1000
a3.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a3.sources.r1.channels = c1
a3.sinks.k1.channel = c1

7. Distribute the configuration files:

[djm@hadoop102 group3]$ xsync /opt/module/flume/job

8. Start the jobs:

[djm@hadoop104 flume]$ bin/flume-ng agent -c conf/ -n a3 -f job/group3/flume3-flume-logger.conf -Dflume.root.logger=INFO,console
[djm@hadoop102 flume]$ bin/flume-ng agent -c conf/ -n a2 -f job/group3/flume2-netcat-flume.conf
[djm@hadoop103 flume]$ bin/flume-ng agent -c conf/ -n a1 -f job/group3/flume1-logger-flume.conf
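To verify, append a line to the monitored file on hadoop103 and send a line to port 44444 on hadoop102; both lines should appear in the a3 console on hadoop104:

[djm@hadoop103 module]$ echo 'hello from the log file' >> /opt/module/group.log
[djm@hadoop102 flume]$ nc hadoop102 44444
hello from netcat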

4 Deploying Ganglia

1. Install the httpd service and PHP:

yum -y install httpd php

2. Install the other dependencies:

yum -y install rrdtool perl-rrdtool rrdtool-devel

3. Install Ganglia:

rpm -Uvh http://dl.fedoraproject.org/pub/epel/6/x86_64/epel-release-6-8.noarch.rpm
yum -y install ganglia-gmetad ganglia-gmond ganglia-web

4. Modify the Ganglia configuration file:

vim /etc/httpd/conf.d/ganglia.conf
#
# Ganglia monitoring system php web frontend
#

Alias /ganglia /usr/share/ganglia

<Location /ganglia>
  # Require local
  Require all granted
  # Require ip 10.1.2.3
  # Require host example.org
</Location> 

Note in particular: the following configuration does NOT work:

<Location /ganglia>
  Order deny,allow
  Allow from all
</Location> 

5. Modify the gmetad configuration file:

vim /etc/ganglia/gmetad.conf 
data_source "hadoop102" 192.168.1.102

6. Modify the gmond configuration file:

vim /etc/ganglia/gmond.conf 
cluster {
  #name = "unspecified"
  name = "hadoop102"
  owner = "unspecified"
  latlong = "unspecified"
  url = "unspecified"
}

udp_send_channel { 
#bind_hostname = yes # Highly recommended, soon to be default. 
# This option tells gmond to use a source address
# that resolves to the machine's hostname. Without
# this, the metrics may appear to come from any
# interface and the DNS names associated with
# those IPs will be used to create the RRDs.
  #mcast_join = 239.2.11.71
  host = 192.168.10.102
  port = 8649
  ttl = 1
}

/* You can specify as many udp_recv_channels as you like as well. */
udp_recv_channel {
  #mcast_join = 239.2.11.71
  port = 8649
  #bind = 239.2.11.71
  bind = 192.168.10.102
  retry_bind = true 

# Size of the UDP buffer. If you are handling lots of metrics you really
# should bump it up to e.g. 10MB or even higher.
# buffer = 10485760
} 

7. Check the SELinux status:

sestatus

If it is not disabled, modify the following configuration file:

vim /etc/selinux/config

or temporarily disable SELinux:

setenforce 0

8. Start Ganglia:

systemctl start httpd
systemctl start gmetad
systemctl start gmond

9. Open the following URL in a browser:

http://hadoop102/ganglia/

If you still get a permission error after completing the steps above, try changing the permissions on the /var/lib/ganglia directory:

chmod -R 777 /var/lib/ganglia
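With Ganglia running, a Flume agent can report its metrics to gmond by passing the monitoring properties at startup; the host and port below assume the gmond receive channel configured above (192.168.10.102:8649), and the counters then show up in the Ganglia web UI:

[djm@hadoop102 flume]$ bin/flume-ng agent -c conf/ -n a1 -f job/flume-netcat-logger.conf \
-Dflume.root.logger=INFO,console \
-Dflume.monitoring.type=ganglia \
-Dflume.monitoring.hosts=192.168.10.102:8649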

5 Custom Source

Requirements:

Implement a custom Source that reads a delay and a field value from its configuration and continuously generates events of the form field + counter, writing them to the Channel.

Implementation:

1. Add the dependency:

<dependency>
    <groupId>org.apache.flume</groupId>
    <artifactId>flume-ng-core</artifactId>
    <version>1.7.0</version>
</dependency>

2. Write the code:

package com.djm.flume;

import org.apache.flume.Context;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.PollableSource;
import org.apache.flume.conf.Configurable;
import org.apache.flume.event.SimpleEvent;
import org.apache.flume.source.AbstractSource;

import java.util.HashMap;

public class MySource extends AbstractSource implements Configurable, PollableSource {

    // Fields that will be read from the agent configuration file
    private Long delay;

    private String field;

    /**
     * Receive data, wrap it into individual events, and write them to the channel.
     * @return
     * @throws EventDeliveryException
     */
    public Status process() throws EventDeliveryException {
        HashMap<String, String> headerMap = new HashMap<>();
        SimpleEvent event = new SimpleEvent();
            try {
                for (int i = 0; i < 5; i++) {
                    event.setHeaders(headerMap);
                    event.setBody((field + i).getBytes());
                    getChannelProcessor().processEvent(event);
                    Thread.sleep(delay);
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
                return Status.BACKOFF;
            }
            return Status.READY;
    }

    public long getBackOffSleepIncrement() {
        return 0;
    }

    public long getMaxBackOffSleepInterval() {
        return 0;
    }

    /**
     * Read the configuration file.
     * @param context
     */
    public void configure(Context context) {
        delay = context.getLong("delay");
        field = context.getString("field", "hello");
    }
}

3. Package and test

Build the jar with Maven and upload it to the /opt/module/flume/lib directory.

Create the Agent configuration file mysource.conf in the job directory:

[djm@hadoop102 job]$ vim mysource.conf

Add the following content:

# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = com.djm.flume.MySource
a1.sources.r1.delay = 1000
a1.sources.r1.field = djm

# Describe the sink
a1.sinks.k1.type = logger

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

Start the job:

[djm@hadoop102 flume]$ bin/flume-ng agent -c conf/ -f job/mysource.conf -n a1 -Dflume.root.logger=INFO,console

6 Custom Sink

Requirements:

Implement a custom Sink that reads a prefix and a suffix from its configuration, takes events from the Channel, and logs each event body wrapped with the prefix and suffix.

Implementation:

1. Add the dependency:

<dependency>
    <groupId>org.apache.flume</groupId>
    <artifactId>flume-ng-core</artifactId>
    <version>1.7.0</version>
</dependency>

2. Write the code:

package com.djm.flume;

import org.apache.flume.*;
import org.apache.flume.conf.Configurable;
import org.apache.flume.sink.AbstractSink;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MySink extends AbstractSink implements Configurable {

    private static final Logger LOG = LoggerFactory.getLogger(AbstractSink.class);

    private String prefix;
    private String suffix;

    @Override
    public Status process() throws EventDeliveryException {
        Status status = null;

        Channel channel = getChannel();
        Transaction transaction = channel.getTransaction();
        try {
            Event event;
            transaction.begin();
            while ((event = channel.take()) == null) {
                Thread.sleep(200);
            }
            LOG.info(prefix + new String(event.getBody()) + suffix);
            transaction.commit();
            status = Status.READY;
        } catch (Throwable e) {
            transaction.rollback();
            status = Status.BACKOFF;
            if (e instanceof Error)
                throw (Error) e;
        } finally {
            transaction.close();
        }
        return status;
    }

    @Override
    public void configure(Context context) {
        prefix = context.getString("prefix");
        suffix = context.getString("suffix");
    }
}

3. Package and test

Build the jar with Maven and upload it to the /opt/module/flume/lib directory.

Create the Agent configuration file mysink.conf in the job directory:

[djm@hadoop102 job]$ vim mysink.conf

Add the following content:

# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444

# Describe the sink
a1.sinks.k1.type = com.djm.flume.MySink
a1.sinks.k1.prefix = djm:
a1.sinks.k1.suffix = :end

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

Start the job:

[djm@hadoop102 flume]$ bin/flume-ng agent -c conf/ -f job/mysink.conf -n a1 -Dflume.root.logger=INFO,console

7 Tuning Flume Parameters

7.1 Source

Increasing the number of Sources increases the capacity to read data. For example, when a single directory produces too many files, split it into several directories and configure a Source for each, so that the Sources can keep up with newly produced data.

The batchSize parameter determines how many events a Source ships to the Channel per batch; increasing it appropriately improves the throughput of moving events into the Channel.

7.2 Channel

A Memory Channel gives the best performance, but data may be lost if the Flume process dies unexpectedly.

A File Channel has better fault tolerance but lower performance than a Memory Channel; when using a File Channel, setting dataDirs to directories on several different disks can improve performance.

The capacity parameter determines the maximum number of events a Channel can hold. transactionCapacity determines the maximum number of events a Source writes to the Channel in one transaction and the maximum number a Sink reads in one transaction; transactionCapacity needs to be larger than the batchSize of the Source and the Sink.

7.3 Sink

Increasing the number of Sinks increases the capacity to consume events, but more is not always better: extra Sinks occupy system resources unnecessarily, so add only as many as you need.

The batchSize parameter determines how many events a Sink reads from the Channel per batch; increasing it appropriately improves the throughput of moving events out of the Channel.
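As a concrete illustration of how these parameters relate, a hedged sizing sketch (the numbers are examples only) in which batchSize stays below transactionCapacity, which in turn stays below capacity:

# Channel can hold up to 10000 events in total
a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
# Each put/take transaction moves at most 1000 events
a1.channels.c1.transactionCapacity = 1000
# Sink batch size must not exceed the channel's transactionCapacity
a1.sinks.k1.hdfs.batchSize = 500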
