91超碰碰碰碰久久久久久综合_超碰av人澡人澡人澡人澡人掠_国产黄大片在线观看画质优化_txt小说免费全本

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

Spark Streaming運行流程是怎樣的

發布時間:2021-12-16 16:29:38 來源:億速云 閱讀:119 作者:iii 欄目:云計算

本篇內容介紹了“Spark Streaming運行流程是怎樣的”的有關知識,在實際案例的操作過程中,不少人都會遇到這樣的困境,接下來就讓小編帶領大家學習一下如何處理這些情況吧!希望大家仔細閱讀,能夠學有所成!

通過下面的一個簡單的例子來理解spark streaming

object OnlineForeachRDD2DB {
  def main(args: Array[String]){
    /*
      * 第1步:創建Spark的配置對象SparkConf,設置Spark程序的運行時的配置信息,
      * 例如說通過setMaster來設置程序要鏈接的Spark集群的Master的URL,如果設置
      * 為local,則代表Spark程序在本地運行,特別適合于機器配置條件非常差(例如
      * 只有1G的內存)的初學者       *
      */
    val conf = new SparkConf() //創建SparkConf對象
    conf.setAppName("OnlineForeachRDD") //設置應用程序的名稱,在程序運行的監控界面可以看到名稱
//    conf.setMaster("spark://Master:7077") //此時,程序在Spark集群
    conf.setMaster("local[6]")
    //設置batchDuration時間間隔來控制Job生成的頻率并且創建Spark Streaming執行的入口
    val ssc = new StreamingContext(conf, Seconds(5))


    val lines = ssc.socketTextStream("Master", 9999)

    val words = lines.flatMap(_.split(" "))
    val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
    wordCounts.foreachRDD { rdd =>
      rdd.foreachPartition { partitionOfRecords => {
        // ConnectionPool is a static, lazily initialized pool of connections
        val connection = ConnectionPool.getConnection()
        partitionOfRecords.foreach(record => {
          val sql = "insert into streaming_itemcount(item,count) values('" + record._1 + "'," + record._2 + ")"
          val stmt = connection.createStatement();
          stmt.executeUpdate(sql);

        })
        ConnectionPool.returnConnection(connection)  // return to the pool for future reuse

      }

      }
    }


    /**
      * 在StreamingContext調用start方法的內部其實是會啟動JobScheduler的Start方法,進行消息循環,在JobScheduler
      * 的start內部會構造JobGenerator和ReceiverTacker,并且調用JobGenerator和ReceiverTacker的start方法:
      *   1,JobGenerator啟動后會不斷的根據batchDuration生成一個個的Job
      *   2,ReceiverTracker啟動后首先在Spark Cluster中啟動Receiver(其實是在Executor中先啟動ReceiverSupervisor),在Receiver收到
      *   數據后會通過ReceiverSupervisor存儲到Executor并且把數據的Metadata信息發送給Driver中的ReceiverTracker,在ReceiverTracker
      *   內部會通過ReceivedBlockTracker來管理接受到的元數據信息
      * 每個BatchInterval會產生一個具體的Job,其實這里的Job不是Spark Core中所指的Job,它只是基于DStreamGraph而生成的RDD
      * 的DAG而已,從Java角度講,相當于Runnable接口實例,此時要想運行Job需要提交給JobScheduler,在JobScheduler中通過線程池的方式找到一個
      * 單獨的線程來提交Job到集群運行(其實是在線程中基于RDD的Action觸發真正的作業的運行),為什么使用線程池呢?
      *   1,作業不斷生成,所以為了提升效率,我們需要線程池;這和在Executor中通過線程池執行Task有異曲同工之妙;
      *   2,有可能設置了Job的FAIR公平調度的方式,這個時候也需要多線程的支持;
      *
      */
    ssc.start()
    ssc.awaitTermination()

  }
}

Spark Streaming運行流程是怎樣的Spark Streaming運行流程是怎樣的

“Spark Streaming運行流程是怎樣的”的內容就介紹到這里了,感謝大家的閱讀。如果想了解更多行業相關的知識可以關注億速云網站,小編將為大家輸出更多高質量的實用文章!

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

遂昌县| 金秀| 江北区| 班玛县| 阜平县| 五寨县| 浦东新区| 郸城县| 十堰市| 东城区| 南宁市| 武平县| 大理市| 巨野县| 白银市| 浦县| 台中市| 上杭县| 花垣县| 宁城县| 兰溪市| 蕲春县| 岳阳市| 永胜县| 鲁甸县| 咸阳市| 富平县| 定陶县| 清新县| 五常市| 新闻| 叶城县| 衡南县| 灯塔市| 酒泉市| 林芝县| 灌南县| 墨竹工卡县| 安义县| 张家口市| 石泉县|