91超碰碰碰碰久久久久久综合_超碰av人澡人澡人澡人澡人掠_国产黄大片在线观看画质优化_txt小说免费全本

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

Kafka+SparkStream+Hive的項目實現方法是什么

發布時間:2021-11-22 10:01:03 來源:億速云 閱讀:126 作者:iii 欄目:大數據

本篇內容主要講解“Kafka+SparkStream+Hive的項目實現方法是什么”,感興趣的朋友不妨來看看。本文介紹的方法操作簡單快捷,實用性強。下面就讓小編來帶大家學習“Kafka+SparkStream+Hive的項目實現方法是什么”吧!

目前的項目中需要將kafka隊列的數據實時存到hive表中。

import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.types.{StringType, StructField, StructType}
import org.apache.spark.sql.{DataFrame, Row, SaveMode, SparkSession}
import org.apache.spark.streaming.{Durations, Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.{DStream, InputDStream}
import org.apache.spark.streaming.kafka010.{CanCommitOffsets, ConsumerStrategies, HasOffsetRanges, KafkaUtils, LocationStrategies, OffsetRange}
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
  def main(args: Array[String]): Unit = {
      //    val conf = new SparkConf()
      //    conf.setMaster("local")
      //    conf.setAppName("SparkStreamingOnKafkaDirect")
      val spark = SparkSession.builder().appName("test").master("local").enableHiveSupport().getOrCreate()
      val ssc = new StreamingContext(spark.sparkContext, Durations.seconds(3))
      //設置日志級別
      ssc.sparkContext.setLogLevel("Error")

      val kafkaParams = Map[String, Object](
        "bootstrap.servers" -> "node01:9092,node02:9092,node03:9092",
        "key.deserializer" -> classOf[StringDeserializer],
        "value.deserializer" -> classOf[StringDeserializer],
        "group.id" -> "MyGroupId", //

        /**
         * 當沒有初始的offset,或者當前的offset不存在,如何處理數據
         * earliest :自動重置偏移量為最小偏移量
         * latest:自動重置偏移量為最大偏移量【默認】
         * none:沒有找到以前的offset,拋出異常
         */
        "auto.offset.reset" -> "earliest",

        /**
         * 當設置 enable.auto.commit為false時,不會自動向kafka中保存消費者offset.需要異步的處理完數據之后手動提交
         */
        "enable.auto.commit" -> (false: java.lang.Boolean) //默認是true
      )

      //設置Kafka的topic
      val topics = Array("test")
      //創建與Kafka的連接,接收數據
      /*這里接收到數據的樣子
      2019-09-26  1569487411604   1235    497 Kafka   Register
      2019-09-26  1569487411604   1235    497 Kafka   Register
      2019-09-26  1569487414838   390    778  Flink   View
      */
      val stream: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream[String, String](
        ssc,
        PreferConsistent, //
        Subscribe[String, String](topics, kafkaParams)
      )

      //對接收到的數據進行處理,打印出來接收到的key跟value,最后放回的是value
      val transStrem: DStream[String] = stream.map(record => {
        val key_value = (record.key, record.value)
        println("receive message key = " + key_value._1)
        println("receive message value = " + key_value._2)
        key_value._2
      })


      //這里用了一下動態創建的Schema
      val structType: StructType = StructType(List[StructField](
        StructField("Date_", StringType, nullable = true),
        StructField("Timestamp_", StringType, nullable = true),
        StructField("UserID", StringType, nullable = true),
        StructField("PageID", StringType, nullable = true),
        StructField("Channel", StringType, nullable = true),
        StructField("Action", StringType, nullable = true)
      ))

      //因為foreachRDD可以拿到封裝到DStream中的rdd,可以對里面的rdd進行,
      /*代碼解釋:
          先從foreach中拿到一條數據,,在函數map中對接收來的數據用 “\n” 進行切分,放到Row中,用的是動態創建Schema,因為我們需要再將數據存儲到hive中,所以需要Schema。
          因為map是transformance算子,所以用rdd.count()觸發一下
           spark.createDataFrame:創建一個DataFrame,因為要注冊一個臨時表,必須用到DataFrame
           frame.createOrReplaceTempView("t1"):注冊臨時表
             spark.sql("use spark"):使用 hive 的 spark 庫
           result.write.mode(SaveMode.Append).saveAsTable("test_kafka"):將數據放到 test_kafka 中
      */
      transStrem.foreachRDD(one => {
        val rdd: RDD[Row] = one.map({
          a =>
            val arr = a.toString.split("\t")
            Row(arr(0).toString, arr(1).toString, arr(2).toString, arr(3).toString, arr(4).toString, arr(5).toString)
        })
        rdd.count()
        val frame: DataFrame = spark.createDataFrame(rdd, structType)
        //      println(" Scheme: "+frame.printSchema())

        frame.createOrReplaceTempView("t1")
        //      spark.sql("select * from t1").show()
        spark.sql("use spark")
        spark.sql("select * from t1").
          write.mode(SaveMode.Append).saveAsTable("test_kafka")
      }
      )

      /**
       * 以上業務處理完成之后,異步的提交消費者offset,這里將 enable.auto.commit 設置成false,就是使用kafka 自己來管理消費者offset
       * 注意這里,獲取 offsetRanges: Array[OffsetRange] 每一批次topic 中的offset時,必須從 源頭讀取過來的 stream中獲取,不能從經過stream轉換之后的DStream中獲取。
       */
      stream.foreachRDD { rdd =>
        val offsetRanges: Array[OffsetRange] = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
        // some time later, after outputs have completed
        stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
      }
      ssc.start()
      ssc.awaitTermination()
      ssc.stop()
  }

到此,相信大家對“Kafka+SparkStream+Hive的項目實現方法是什么”有了更深的了解,不妨來實際操作一番吧!這里是億速云網站,更多相關內容可以進入相關頻道進行查詢,關注我們,繼續學習!

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

涿鹿县| 武城县| 周至县| 磐石市| 舒兰市| 南木林县| 大英县| 调兵山市| 新竹县| 高州市| 北票市| 彭州市| 历史| 京山县| 拉萨市| 香港| 辽宁省| 合肥市| 金溪县| 邵阳市| 庆云县| 邵武市| 胶州市| 瑞昌市| 安徽省| 奎屯市| 德庆县| 万载县| 正定县| 两当县| 新宁县| 山西省| 鲁甸县| 政和县| 敦煌市| 钦州市| 胶州市| 乐亭县| 方城县| 乌鲁木齐县| 洪湖市|