您好,登錄后才能下訂單哦!
這期內容當中小編將會給大家帶來有關大數據開發中Spark Streaming處理數據及寫入Kafka,文章內容豐富且以專業的角度為大家分析和敘述,閱讀完這篇文章希望大家可以有所收獲。
Spark Streaming從各種輸入源中讀取數據,并把數據分組為小的批次。新的批次按均勻的時間間隔創建出來。在每個時間區間開始的時候,一個新的批次就創建出來,在該區間內收到的數據都會被添加到這個批次中。在時間區間結束時,批次停止增長,時間區間的大小是由批次間隔這個參數決定的。批次間隔一般設在500毫秒到幾秒之間,由開發者配置。每個輸入批次都形成一個RDD,以 Spark 作業的方式處理并生成其他的 RDD。 處理的結果可以以批處理的方式傳給外部系統,Spark Streaming的編程抽象是離散化流,也就是DStream。它是一個 RDD 序列,每個RDD代表數據流中一個時間片內的數據。另外加入了窗口操作和狀態轉化,其他和批次處理類似。
與StructedStreaming的區別
StructedStreaming誕生于2.x后,主要用于處理結構化數據,除了實現與Spark Streaming的批處理,還實現了long-running的task,主要理解為處理的時機可以是數據的生產時間,而非收到數據的時間,可以細看下表:
流處理模式 | SparkStreaming | Structed Streaming |
---|---|---|
執行模式 | Micro Batch | Micro batch / Streaming |
API | Dstream/streamingContext | Dataset/DataFrame,SparkSession |
Job 生成方式 | Timer定時器定時生成job | Trigger觸發 |
支持數據源 | Socket,filstream,kafka,zeroMq,flume,kinesis | Socket,filstream,kafka,ratesource |
executed-based | Executed based on dstream api | Executed based on sparksql |
Time based | Processing Time | ProcessingTime & eventTIme |
UI | Built-in | No |
對于流處理,現在生產環境下使用Flink較多,數據源方式,現在基本是以kafka為主,所以本文對Spark Streaming的場景即ETL流處理結構化日志,將結果輸入Kafka隊列
1、客戶端提交Spark Streaming作業后啟動Driver,Driver啟動Receiver,Receiver接收數據源的數據
2、每個作業包含多個Executor,每個Executor以線程的方式運行task,SparkStreaming至少包含一個receiver task(一般情況下)
3、Receiver接收數據后生成Block,并把BlockId匯報給Driver,然后備份到另外一個 Executor 上
4、ReceiverTracker維護 Reciver 匯報的BlockId
5、Driver定時啟動JobGenerator,根據Dstream的關系生成邏輯RDD,然后創建Jobset,交給JobScheduler
6、JobScheduler負責調度Jobset,交給DAGScheduler,DAGScheduler根據邏輯RDD,生成相應的Stages,每個stage包含一到多個Task,將TaskSet提交給TaskSchedule
7、TaskScheduler負責把 Task 調度到 Executor 上,并維護 Task 的運行狀態
常用數據源的讀取方式
常數據流:
val rdd: RDD[String] = ssc.sparkContext.makeRDD(strArray) val wordDStream: ConstantInputDStream[String] = new ConstantInputDStream(ssc, rdd)
Socket:
val rdd: RDD[String] = ssc.sparkContext.makeRDD(strArray) val wordDStream: ConstantInputDStream[String] = new ConstantInputDStream(ssc, rdd)
RDD隊列:
val queue = new Queue[RDD[Int]]() val queueDStream: InputDStream[Int] = ssc.queueStream(queue)
文件夾:
val lines: DStream[String] = ssc.textFileStream("data/log/")
生產上,常用流程如下,批處理原始Kafka日志,比如請求打點日志等,使用Spark Streaming來將數據清洗轉變為一定格式再導入Kafka中,為了保證exact-once,會將offer自己來保存,主要保存在redis-offset中
數據地址:鏈接:https://pan.baidu.com/s/1FmFxSrPIynO3udernLU0yQ提取碼:hell
sample.log格式如下:
我們將它先放到文件里,模擬生產環境下xx.log
一個用來放原始的日志數據,一個用來放處理過后的日志
kafka-topics.sh --zookeeper localhost:2181/myKafka --create --topic mytopic1 --partitions 1 --replication-factor 1 kafka-topics.sh --zookeeper localhost:2181/myKafka --create --topic mytopic2 --partitions 1 --replication-factor 1
啟動redis服務:
./redis-server redis.conf
查看mytopic1數據
kafka-console-consumer.sh --bootstrap-server linux121:9092 --topic mytopic1 --from-beginning
第一部分,處理原始文件數據寫入mytopic1
package com.hoult.Streaming.work import java.util.Properties import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord} import org.apache.kafka.common.serialization.StringSerializer import org.apache.log4j.{Level, Logger} import org.apache.spark.rdd.RDD import org.apache.spark.{SparkConf, SparkContext} object FilerToKafka { def main(args: Array[String]): Unit = { Logger.getLogger("org").setLevel(Level.ERROR) val conf = new SparkConf().setAppName(this.getClass.getCanonicalName.init).setMaster("local[*]") val sc = new SparkContext(conf) // 定義 kafka producer參數 val lines: RDD[String] = sc.textFile("data/sample.log") // 定義 kafka producer參數 val prop = new Properties() prop.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "linux121:9092") prop.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer]) prop.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer]) // 將讀取到的數據發送到mytopic1 lines.foreachPartition{iter => // KafkaProducer val producer = new KafkaProducer[String, String](prop) iter.foreach{line => val record = new ProducerRecord[String, String]("mytopic1", line) producer.send(record) } producer.close() } } }
第二部分,streaming讀取mytopic1的數據,寫入mytopic2
package com.hoult.Streaming.work import java.util.Properties import com.hoult.Streaming.kafka.OffsetsWithRedisUtils import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord} import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord} import org.apache.kafka.common.serialization.{StringDeserializer, StringSerializer} import org.apache.log4j.{Level, Logger} import org.apache.spark.SparkConf import org.apache.spark.streaming.dstream.InputDStream import org.apache.spark.streaming.kafka010.{ConsumerStrategies, HasOffsetRanges, KafkaUtils, LocationStrategies, OffsetRange} import org.apache.spark.streaming.{Seconds, StreamingContext} /** * 每秒處理Kafka數據,生成結構化數據,輸入另外一個Kafka topic */ object KafkaStreamingETL { val log = Logger.getLogger(this.getClass) def main(args: Array[String]): Unit = { Logger.getLogger("org").setLevel(Level.ERROR) val conf = new SparkConf().setAppName(this.getClass.getCanonicalName).setMaster("local[*]") val ssc = new StreamingContext(conf, Seconds(5)) // 需要消費的topic val topics: Array[String] = Array("mytopic1") val groupid = "mygroup1" // 定義kafka相關參數 val kafkaParams: Map[String, Object] = getKafkaConsumerParameters(groupid) // 從Redis獲取offset val fromOffsets = OffsetsWithRedisUtils.getOffsetsFromRedis(topics, groupid) // 創建DStream val dstream: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream( ssc, LocationStrategies.PreferConsistent, // 從kafka中讀取數據 ConsumerStrategies.Subscribe[String, String](topics, kafkaParams, fromOffsets) ) // 轉換后的數據發送到另一個topic dstream.foreachRDD{rdd => if (!rdd.isEmpty) { val offsetRanges: Array[OffsetRange] = rdd.asInstanceOf[HasOffsetRanges].offsetRanges rdd.foreachPartition(process) // 將offset保存到Redis OffsetsWithRedisUtils.saveOffsetsToRedis(offsetRanges, groupid) } } // 啟動作業 ssc.start() ssc.awaitTermination() } def process(iter: Iterator[ConsumerRecord[String, String]]) = { iter.map(line => parse(line.value)) .filter(!_.isEmpty) // .foreach(println) .foreach(line =>sendMsg2Topic(line, "mytopic2")) } def parse(text: String): String = { try{ val arr = text.replace("<<<!>>>", "").split(",") if (arr.length != 15) return "" arr.mkString("|") } catch { case e: Exception => log.error("解析數據出錯!", e) "" } } def getKafkaConsumerParameters(groupid: String): Map[String, Object] = { Map[String, Object]( ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "linux121:9092", ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer], ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer], ConsumerConfig.GROUP_ID_CONFIG -> groupid, ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG -> (false: java.lang.Boolean), ConsumerConfig.AUTO_OFFSET_RESET_CONFIG -> "earliest" ) } def getKafkaProducerParameters(): Properties = { val prop = new Properties() prop.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "linux121:9092") prop.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer]) prop.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer]) prop } def sendMsg2Topic(msg: String, topic: String): Unit = { val producer = new KafkaProducer[String, String](getKafkaProducerParameters()) val record = new ProducerRecord[String, String](topic, msg) producer.send(record) } }
第三部分,從redis中讀寫offset的工具
package com.hoult.Streaming.kafka import java.util import org.apache.kafka.common.TopicPartition import org.apache.spark.streaming.kafka010.OffsetRange import redis.clients.jedis.{Jedis, JedisPool, JedisPoolConfig} import scala.collection.mutable object OffsetsWithRedisUtils { // 定義Redis參數 private val redisHost = "linux121" private val redisPort = 6379 // 獲取Redis的連接 private val config = new JedisPoolConfig // 最大空閑數 config.setMaxIdle(5) // 最大連接數 config.setMaxTotal(10) private val pool = new JedisPool(config, redisHost, redisPort, 10000) private def getRedisConnection: Jedis = pool.getResource private val topicPrefix = "kafka:topic" // Key:kafka:topic:TopicName:groupid private def getKey(topic: String, groupid: String) = s"$topicPrefix:$topic:$groupid" // 根據 key 獲取offsets def getOffsetsFromRedis(topics: Array[String], groupId: String): Map[TopicPartition, Long] = { val jedis: Jedis = getRedisConnection val offsets: Array[mutable.Map[TopicPartition, Long]] = topics.map { topic => val key = getKey(topic, groupId) import scala.collection.JavaConverters._ jedis.hgetAll(key) .asScala .map { case (partition, offset) => new TopicPartition(topic, partition.toInt) -> offset.toLong } } // 歸還資源 jedis.close() offsets.flatten.toMap } // 將offsets保存到Redis中 def saveOffsetsToRedis(offsets: Array[OffsetRange], groupId: String): Unit = { // 獲取連接 val jedis: Jedis = getRedisConnection // 組織數據 offsets.map{range => (range.topic, (range.partition.toString, range.untilOffset.toString))} .groupBy(_._1) .foreach{case (topic, buffer) => val key: String = getKey(topic, groupId) import scala.collection.JavaConverters._ val maps: util.Map[String, String] = buffer.map(_._2).toMap.asJava // 保存數據 jedis.hmset(key, maps) } jedis.close() } def main(args: Array[String]): Unit = { val topics = Array("mytopic1") val groupid = "mygroup1" val x: Map[TopicPartition, Long] = getOffsetsFromRedis(topics, groupid) x.foreach(println) } }
啟動redis ./redis-server ./redis.conf
啟動kafka并創建topic sh scripts/kafka.sh start
3.2 創建兩個topic,并創建KafkaProducer來嫁給你數據寫入mytopic1
啟動FilerToKafka 和 KafkaStreamingETL
spark-streaming讀文件讀不到的問題 ,讀取本地文件時候,要注意,它不會讀取原本就存在于該文件里的文本,只會讀取在監聽期間,傳進文件夾里的數據,而且本文本還有要求,必須是它組后一次更改并且保存的操作,是在監聽開始的那一刻 之后的,其實意思就是,如果要向被監聽的文件夾里傳一個文本,你就要在監聽開始之后,先打開這個文本,隨便輸入幾個空格,或者回車,或者其他不影響文本內容的操作,然后保存,最后再傳進文件夾里,這樣它才能 檢測到這個被傳進來的文本。
上述就是小編為大家分享的大數據開發中Spark Streaming處理數據及寫入Kafka了,如果剛好有類似的疑惑,不妨參照上述分析進行理解。如果想知道更多相關知識,歡迎關注億速云行業資訊頻道。
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。