您好,登錄后才能下訂單哦!
本篇文章給大家分享的是有關Flume+Kafka+SparkStreaming的整合是怎么樣的,小編覺得挺實用的,因此分享給大家學習,希望大家閱讀完這篇文章后可以有所收獲,話不多說,跟著小編一起來看看吧。
第一步,Flume和Kakfa對接,Flume抓取日志,寫到Kafka中
第二部,Spark Streaming讀取Kafka中的數據,進行實時分析
下面首先使用Kakfa自帶的消息處理(腳本)來獲取消息,走通Flume和Kafka的對接
flume install: http://my.oschina.net/u/192561/blog/692225
kafka install: http://my.oschina.net/u/192561/blog/692357
3.1 兩者整合優勢
Flume更傾向于數據傳輸本身,Kakfa是典型的消息中間件用于解耦生產者消費者。
具體架構上,Agent并沒把數據直接發送到Kafka,在Kafka前面有層由Flume構成的forward。這樣做有兩個原因:
Kafka的API對非JVM系的語言支持很不友好,forward對外提供更加通用的HTTP接口。forward層可以做路由、Kafka topic和Kafkapartition key等邏輯,進一步減少Agent端的邏輯。
數據有數據源到flume再到Kafka時,數據一方面可以同步到HDFS做離線計算,另一方面可以做實時計算。本文實時計算采用SparkStreaming做測試。
3.2 Flume和Kafka整合安裝
1. 下載Flume和Kafka集成的插件,下載地址:
https://github.com/beyondj2ee/flumeng-kafka- plugin
將package目錄中的flumeng-kafka-plugin.jar拷貝到Flume安裝目錄的lib目錄下
2. 將Kakfa安裝目錄libs目錄下的如下jar包拷貝到Flume安裝目錄的lib目錄下
kafka_2.11-0.10.0.0.jar
scala-library-2.11.8.jar
metrics-core-2.2.0.jar
提取插件中的flume-conf.properties文件:修改如下:flume源采用exec
producer.sources.s.type = exec producer.sources.s.command=tail -F -n+1 /home/eric/bigdata/kafka-logs/a.log producer.sources.s.channels = c1
修改producer代理的topic為 HappyBirthDayToAnYuan
將配置放到 apache-flume-1.6.0-bin/conf/producer.conf中
完整 producer.conf:
#agentsectionproducer.sources= s1producer.channels= c1producer.sinks= k1#配置數據源producer.sources.s1.type=exec#配置需要監控的日志輸出文件或目錄producer.sources.s1.command=tail -F -n+1 /home/eric/bigdata/kafka-logs/a.log#配置數據通道producer.channels.c1.type=memoryproducer.channels.c1.capacity=10000producer.channels.c1.transactionCapacity=100#配置數據源輸出#設置Kafka接收器,此處最坑,注意版本,此處為Flume 1.6.0的輸出槽類型producer.sinks.k1.type= org.apache.flume.sink.kafka.KafkaSink#設置Kafka的broker地址和端口號producer.sinks.k1.brokerList=localhost:9092#設置Kafka的Topicproducer.sinks.k1.topic=HappyBirthDayToAnYuan#設置序列化方式producer.sinks.k1.serializer.class=kafka.serializer.StringEncoder#將三者級聯producer.sources.s1.channels=c1producer.sinks.k1.channel=c1
3.3 啟動kafka flume相關服務
啟動ZK bin/zookeeper-server-start.sh config/zookeeper.properties
啟動Kafka服務 bin/kafka-server-start.sh config/server.properties
創建主題
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic HappyBirthDayToAnYuan
查看主題
bin/kafka-topics.sh --list --zookeeper localhost:2181
查看主題詳情
bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic HappyBirthDayToAnYuan
刪除主題
bin/kafka-topics.sh --delete --zookeeper localhost:2181 --topic test
創建消費者
bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning
啟動flume
bin/flume-ng agent -n producer -c conf -f conf/producer.conf -Dflume.root.logger=INFO,console
向flume發送數據:
echo "yuhai" >> a.log
kafka消費數據:
注意:當前文件內容刪除,服務器重啟,主題需重新創建,但是消費內容有落地文件,當前消費內容不消失.
以上就是Flume+Kafka+SparkStreaming的整合是怎么樣的,小編相信有部分知識點可能是我們日常工作會見到或用到的。希望你能通過這篇文章學到更多知識。更多詳情敬請關注億速云行業資訊頻道。
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。