您好,登錄后才能下訂單哦!
本篇內容介紹了“Spark Streaming運行流程是怎樣的”的有關知識,在實際案例的操作過程中,不少人都會遇到這樣的困境,接下來就讓小編帶領大家學習一下如何處理這些情況吧!希望大家仔細閱讀,能夠學有所成!
通過下面的一個簡單的例子來理解spark streaming
object OnlineForeachRDD2DB { def main(args: Array[String]){ /* * 第1步:創建Spark的配置對象SparkConf,設置Spark程序的運行時的配置信息, * 例如說通過setMaster來設置程序要鏈接的Spark集群的Master的URL,如果設置 * 為local,則代表Spark程序在本地運行,特別適合于機器配置條件非常差(例如 * 只有1G的內存)的初學者 * */ val conf = new SparkConf() //創建SparkConf對象 conf.setAppName("OnlineForeachRDD") //設置應用程序的名稱,在程序運行的監控界面可以看到名稱 // conf.setMaster("spark://Master:7077") //此時,程序在Spark集群 conf.setMaster("local[6]") //設置batchDuration時間間隔來控制Job生成的頻率并且創建Spark Streaming執行的入口 val ssc = new StreamingContext(conf, Seconds(5)) val lines = ssc.socketTextStream("Master", 9999) val words = lines.flatMap(_.split(" ")) val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _) wordCounts.foreachRDD { rdd => rdd.foreachPartition { partitionOfRecords => { // ConnectionPool is a static, lazily initialized pool of connections val connection = ConnectionPool.getConnection() partitionOfRecords.foreach(record => { val sql = "insert into streaming_itemcount(item,count) values('" + record._1 + "'," + record._2 + ")" val stmt = connection.createStatement(); stmt.executeUpdate(sql); }) ConnectionPool.returnConnection(connection) // return to the pool for future reuse } } } /** * 在StreamingContext調用start方法的內部其實是會啟動JobScheduler的Start方法,進行消息循環,在JobScheduler * 的start內部會構造JobGenerator和ReceiverTacker,并且調用JobGenerator和ReceiverTacker的start方法: * 1,JobGenerator啟動后會不斷的根據batchDuration生成一個個的Job * 2,ReceiverTracker啟動后首先在Spark Cluster中啟動Receiver(其實是在Executor中先啟動ReceiverSupervisor),在Receiver收到 * 數據后會通過ReceiverSupervisor存儲到Executor并且把數據的Metadata信息發送給Driver中的ReceiverTracker,在ReceiverTracker * 內部會通過ReceivedBlockTracker來管理接受到的元數據信息 * 每個BatchInterval會產生一個具體的Job,其實這里的Job不是Spark Core中所指的Job,它只是基于DStreamGraph而生成的RDD * 的DAG而已,從Java角度講,相當于Runnable接口實例,此時要想運行Job需要提交給JobScheduler,在JobScheduler中通過線程池的方式找到一個 * 單獨的線程來提交Job到集群運行(其實是在線程中基于RDD的Action觸發真正的作業的運行),為什么使用線程池呢? * 1,作業不斷生成,所以為了提升效率,我們需要線程池;這和在Executor中通過線程池執行Task有異曲同工之妙; * 2,有可能設置了Job的FAIR公平調度的方式,這個時候也需要多線程的支持; * */ ssc.start() ssc.awaitTermination() } }
“Spark Streaming運行流程是怎樣的”的內容就介紹到這里了,感謝大家的閱讀。如果想了解更多行業相關的知識可以關注億速云網站,小編將為大家輸出更多高質量的實用文章!
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。