91超碰碰碰碰久久久久久综合_超碰av人澡人澡人澡人澡人掠_国产黄大片在线观看画质优化_txt小说免费全本

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

大數據開發中Spark Streaming處理數據及寫入Kafka

發布時間:2021-12-15 11:06:36 來源:億速云 閱讀:247 作者:柒染 欄目:大數據

這期內容當中小編將會給大家帶來有關大數據開發中Spark Streaming處理數據及寫入Kafka,文章內容豐富且以專業的角度為大家分析和敘述,閱讀完這篇文章希望大家可以有所收獲。

1.Spark Streaming簡介

Spark Streaming從各種輸入源中讀取數據,并把數據分組為小的批次。新的批次按均勻的時間間隔創建出來。在每個時間區間開始的時候,一個新的批次就創建出來,在該區間內收到的數據都會被添加到這個批次中。在時間區間結束時,批次停止增長,時間區間的大小是由批次間隔這個參數決定的。批次間隔一般設在500毫秒到幾秒之間,由開發者配置。每個輸入批次都形成一個RDD,以 Spark 作業的方式處理并生成其他的 RDD。 處理的結果可以以批處理的方式傳給外部系統,Spark Streaming的編程抽象是離散化流,也就是DStream。它是一個 RDD 序列,每個RDD代表數據流中一個時間片內的數據。另外加入了窗口操作和狀態轉化,其他和批次處理類似。

與StructedStreaming的區別

StructedStreaming誕生于2.x后,主要用于處理結構化數據,除了實現與Spark Streaming的批處理,還實現了long-running的task,主要理解為處理的時機可以是數據的生產時間,而非收到數據的時間,可以細看下表:

流處理模式SparkStreamingStructed Streaming
執行模式Micro BatchMicro batch / Streaming
APIDstream/streamingContextDataset/DataFrame,SparkSession
Job 生成方式Timer定時器定時生成jobTrigger觸發
支持數據源Socket,filstream,kafka,zeroMq,flume,kinesisSocket,filstream,kafka,ratesource
executed-basedExecuted based on dstream apiExecuted based on sparksql
Time basedProcessing TimeProcessingTime & eventTIme
UIBuilt-inNo



對于流處理,現在生產環境下使用Flink較多,數據源方式,現在基本是以kafka為主,所以本文對Spark Streaming的場景即ETL流處理結構化日志,將結果輸入Kafka隊列

2.Spark Sreaming的運行流程

1、客戶端提交Spark Streaming作業后啟動Driver,Driver啟動Receiver,Receiver接收數據源的數據

2、每個作業包含多個Executor,每個Executor以線程的方式運行task,SparkStreaming至少包含一個receiver task(一般情況下)

3、Receiver接收數據后生成Block,并把BlockId匯報給Driver,然后備份到另外一個 Executor 上

4、ReceiverTracker維護 Reciver 匯報的BlockId

5、Driver定時啟動JobGenerator,根據Dstream的關系生成邏輯RDD,然后創建Jobset,交給JobScheduler

6、JobScheduler負責調度Jobset,交給DAGScheduler,DAGScheduler根據邏輯RDD,生成相應的Stages,每個stage包含一到多個Task,將TaskSet提交給TaskSchedule

7、TaskScheduler負責把 Task 調度到 Executor 上,并維護 Task 的運行狀態

常用數據源的讀取方式

常數據流:

    val rdd: RDD[String] = ssc.sparkContext.makeRDD(strArray)
    val wordDStream: ConstantInputDStream[String] = new ConstantInputDStream(ssc, rdd)

Socket:

    val rdd: RDD[String] = ssc.sparkContext.makeRDD(strArray)
    val wordDStream: ConstantInputDStream[String] = new ConstantInputDStream(ssc, rdd)

RDD隊列:

    val queue = new Queue[RDD[Int]]()
    val queueDStream: InputDStream[Int] = ssc.queueStream(queue)

文件夾:

    val lines: DStream[String] = ssc.textFileStream("data/log/")

3.案例說明

生產上,常用流程如下,批處理原始Kafka日志,比如請求打點日志等,使用Spark Streaming來將數據清洗轉變為一定格式再導入Kafka中,為了保證exact-once,會將offer自己來保存,主要保存在redis-offset中

數據地址:鏈接:https://pan.baidu.com/s/1FmFxSrPIynO3udernLU0yQ提取碼:hell

3.1 原始Kafka日志

sample.log格式如下:

大數據開發中Spark Streaming處理數據及寫入Kafka

我們將它先放到文件里,模擬生產環境下xx.log

3.2 創建兩個topic,并創建KafkaProducer來嫁給你數據寫入mytopic1

一個用來放原始的日志數據,一個用來放處理過后的日志

kafka-topics.sh --zookeeper localhost:2181/myKafka --create --topic mytopic1 --partitions 1 --replication-factor 1
kafka-topics.sh --zookeeper localhost:2181/myKafka --create --topic mytopic2 --partitions 1 --replication-factor 1

啟動redis服務:

./redis-server redis.conf

查看mytopic1數據

kafka-console-consumer.sh --bootstrap-server linux121:9092 --topic mytopic1 --from-beginning

3.3 代碼實現

第一部分,處理原始文件數據寫入mytopic1

package com.hoult.Streaming.work

import java.util.Properties

import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
import org.apache.kafka.common.serialization.StringSerializer
import org.apache.log4j.{Level, Logger}
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object FilerToKafka {
  def main(args: Array[String]): Unit = {
    Logger.getLogger("org").setLevel(Level.ERROR)
    val conf = new SparkConf().setAppName(this.getClass.getCanonicalName.init).setMaster("local[*]")
    val sc = new SparkContext(conf)

    // 定義 kafka producer參數
    val lines: RDD[String] = sc.textFile("data/sample.log")

    // 定義 kafka producer參數
    val prop = new Properties()
    prop.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "linux121:9092")
    prop.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer])
    prop.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer])

    // 將讀取到的數據發送到mytopic1
    lines.foreachPartition{iter =>
      // KafkaProducer
      val producer = new KafkaProducer[String, String](prop)
      iter.foreach{line =>
        val record = new ProducerRecord[String, String]("mytopic1", line)
        producer.send(record)
      }
      producer.close()
    }
  }
}

第二部分,streaming讀取mytopic1的數據,寫入mytopic2

package com.hoult.Streaming.work

import java.util.Properties

import com.hoult.Streaming.kafka.OffsetsWithRedisUtils
import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord}
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
import org.apache.kafka.common.serialization.{StringDeserializer, StringSerializer}
import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, HasOffsetRanges, KafkaUtils, LocationStrategies, OffsetRange}
import org.apache.spark.streaming.{Seconds, StreamingContext}


/**
 * 每秒處理Kafka數據,生成結構化數據,輸入另外一個Kafka topic
 */
object KafkaStreamingETL {
  val log = Logger.getLogger(this.getClass)

  def main(args: Array[String]): Unit = {
    Logger.getLogger("org").setLevel(Level.ERROR)
    val conf = new SparkConf().setAppName(this.getClass.getCanonicalName).setMaster("local[*]")
    val ssc = new StreamingContext(conf, Seconds(5))

    // 需要消費的topic
    val topics: Array[String] = Array("mytopic1")
    val groupid = "mygroup1"
    // 定義kafka相關參數
    val kafkaParams: Map[String, Object] = getKafkaConsumerParameters(groupid)
    // 從Redis獲取offset
    val fromOffsets = OffsetsWithRedisUtils.getOffsetsFromRedis(topics, groupid)

    // 創建DStream
    val dstream: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream(
      ssc,
      LocationStrategies.PreferConsistent,
      // 從kafka中讀取數據
      ConsumerStrategies.Subscribe[String, String](topics, kafkaParams, fromOffsets)
    )

    // 轉換后的數據發送到另一個topic
    dstream.foreachRDD{rdd =>
      if (!rdd.isEmpty) {
        val offsetRanges: Array[OffsetRange] = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
        rdd.foreachPartition(process)
        // 將offset保存到Redis
        OffsetsWithRedisUtils.saveOffsetsToRedis(offsetRanges, groupid)
      }
    }

    // 啟動作業
    ssc.start()
    ssc.awaitTermination()
  }

  def process(iter: Iterator[ConsumerRecord[String, String]]) = {
    iter.map(line => parse(line.value))
      .filter(!_.isEmpty)
//      .foreach(println)
      .foreach(line =>sendMsg2Topic(line, "mytopic2"))
  }

  def parse(text: String): String = {
    try{
      val arr = text.replace("<<<!>>>", "").split(",")
      if (arr.length != 15) return ""
      arr.mkString("|")
    } catch {
      case e: Exception =>
        log.error("解析數據出錯!", e)
        ""
    }
  }

  def getKafkaConsumerParameters(groupid: String): Map[String, Object] = {
    Map[String, Object](
      ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "linux121:9092",
      ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
      ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
      ConsumerConfig.GROUP_ID_CONFIG -> groupid,
      ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG -> (false: java.lang.Boolean),
      ConsumerConfig.AUTO_OFFSET_RESET_CONFIG -> "earliest"
    )
  }

  def getKafkaProducerParameters(): Properties = {
    val prop = new Properties()
    prop.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "linux121:9092")
    prop.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer])
    prop.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer])
    prop
  }

  def sendMsg2Topic(msg: String, topic: String): Unit = {
    val producer = new KafkaProducer[String, String](getKafkaProducerParameters())
    val record = new ProducerRecord[String, String](topic, msg)
    producer.send(record)
  }
}

第三部分,從redis中讀寫offset的工具

package com.hoult.Streaming.kafka

import java.util

import org.apache.kafka.common.TopicPartition
import org.apache.spark.streaming.kafka010.OffsetRange
import redis.clients.jedis.{Jedis, JedisPool, JedisPoolConfig}

import scala.collection.mutable

object OffsetsWithRedisUtils {
  // 定義Redis參數
  private val redisHost = "linux121"
  private val redisPort = 6379

  // 獲取Redis的連接
  private val config = new JedisPoolConfig
  // 最大空閑數
  config.setMaxIdle(5)
  // 最大連接數
  config.setMaxTotal(10)

  private val pool = new JedisPool(config, redisHost, redisPort, 10000)
  private def getRedisConnection: Jedis = pool.getResource

  private val topicPrefix = "kafka:topic"

  // Key:kafka:topic:TopicName:groupid
  private def getKey(topic: String, groupid: String) = s"$topicPrefix:$topic:$groupid"

  // 根據 key 獲取offsets
  def getOffsetsFromRedis(topics: Array[String], groupId: String): Map[TopicPartition, Long] = {
    val jedis: Jedis = getRedisConnection

    val offsets: Array[mutable.Map[TopicPartition, Long]] = topics.map { topic =>
      val key = getKey(topic, groupId)

      import scala.collection.JavaConverters._

      jedis.hgetAll(key)
        .asScala
        .map { case (partition, offset) => new TopicPartition(topic, partition.toInt) -> offset.toLong }
    }

    // 歸還資源
    jedis.close()

    offsets.flatten.toMap
  }

  // 將offsets保存到Redis中
  def saveOffsetsToRedis(offsets: Array[OffsetRange], groupId: String): Unit = {
    // 獲取連接
    val jedis: Jedis = getRedisConnection

    // 組織數據
    offsets.map{range => (range.topic, (range.partition.toString, range.untilOffset.toString))}
        .groupBy(_._1)
      .foreach{case (topic, buffer) =>
        val key: String = getKey(topic, groupId)

        import scala.collection.JavaConverters._
        val maps: util.Map[String, String] = buffer.map(_._2).toMap.asJava

        // 保存數據
        jedis.hmset(key, maps)
      }

    jedis.close()
  }

  def main(args: Array[String]): Unit = {
    val topics = Array("mytopic1")
    val groupid = "mygroup1"
    val x: Map[TopicPartition, Long] = getOffsetsFromRedis(topics, groupid)
    x.foreach(println)
  }
}

3.4 演示

  • 啟動redis ./redis-server ./redis.conf

  • 啟動kafka并創建topic sh scripts/kafka.sh start 3.2 創建兩個topic,并創建KafkaProducer來嫁給你數據寫入mytopic1

  • 啟動FilerToKafka 和 KafkaStreamingETL

大數據開發中Spark Streaming處理數據及寫入Kafka 大數據開發中Spark Streaming處理數據及寫入Kafka

4.spark-streamin注意事項

spark-streaming讀文件讀不到的問題 ,讀取本地文件時候,要注意,它不會讀取原本就存在于該文件里的文本,只會讀取在監聽期間,傳進文件夾里的數據,而且本文本還有要求,必須是它組后一次更改并且保存的操作,是在監聽開始的那一刻 之后的,其實意思就是,如果要向被監聽的文件夾里傳一個文本,你就要在監聽開始之后,先打開這個文本,隨便輸入幾個空格,或者回車,或者其他不影響文本內容的操作,然后保存,最后再傳進文件夾里,這樣它才能 檢測到這個被傳進來的文本。

上述就是小編為大家分享的大數據開發中Spark Streaming處理數據及寫入Kafka了,如果剛好有類似的疑惑,不妨參照上述分析進行理解。如果想知道更多相關知識,歡迎關注億速云行業資訊頻道。

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

杭锦旗| 海晏县| 乐至县| 永济市| 沅江市| 马公市| 达孜县| 大丰市| 西吉县| 博兴县| 米脂县| 瑞金市| 芦山县| 阿拉尔市| 宜兰县| 祁门县| 舟山市| 益阳市| 海城市| 青铜峡市| 万荣县| 泰宁县| 社旗县| 易门县| 铁岭县| 甘洛县| 富平县| 康平县| 山东| 东乡族自治县| 和顺县| 手游| 淮滨县| 顺昌县| 南汇区| 白银市| 塔河县| 噶尔县| 克东县| 剑川县| 罗城|