您好,登錄后才能下訂單哦!
與spark core的編程類似,在編寫SparkStreaming的程序時,也需要一個通用的編程入口----StreamingContext。
StreamingContext的創建:
object StreamingContextTest {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setAppName("SCTest").setMaster("local[4]")
val streamingContext = new StreamingContext(sparkConf, Seconds(2))
}
}
注意:
如果在計算的時候,指定--master時 使用的是local 并且只指定了一個線程,那么只有receiver線程工作,計算的線程不會工作,所以在指定線程數的時候,最少指定2個。
在構建好StreamingContext之后,首先我們要讀取數據源的數據進行實時處理:
InputDStreams指的是從數據流的源頭接收的輸入數據流,每個 InputDStream 都關聯一個 Receiver 對象,該 Receiver 對象接收數據源傳來的數據并將其保存在內存中以便后期 Spark 處理。
Spark Streaming 提供兩種原生支持的流數據源和自定義的數據源:
- 直接通過 StreamingContext API 創建,例如文件系統(本地文件系統及分布式文件系統)、 Socket 連接及 Akka 的 Actor。
- Kafka, Flume, Kinesis, Twitter 等,需要借助外部工具類,在運行時需要外部依賴
-Spark Streaming 還支持用戶自定義數據源,它需要用戶定義 receiver
注意:
- 在本地運行 Spark Streaming 時,master URL 不能使用”local”或”local[1] ”,因為當 Input DStream 與 Receiver(如 sockets, Kafka, Flume 等)關聯時,Receiver 自身就需要一個線程 來運行,此時便沒有線程去處理接收到的數據。因此,在本地運行 SparkStreaming 程序時,要使用”local[n]”作為 master URL,n 要大于 receiver 的數量。
- 在集群上運行 Spark Streaming 時,分配給 Spark Streaming 程序的 CPU 核數也必須大于 receiver 的數量,否則系統將只接受數據,無法處理數據。
在編寫sparkStreaming時的注意點:
- streamingContext啟動后,增加新的操作將不起作用,一定要在啟動之前定義好邏輯,也就是說在調用start方法之后,在對sparkStreaming程序進行邏輯操作是不被允許的
- StreamingContext 是單例對象停止后,不能重新啟動,除非重新啟動任務,重新執行計算
- 在單個jvm中,一段時間內不能出現兩個active狀態的StreamingContext
- 當在調用 StreamingContext 的 stop 方法時,默認情況下 SparkContext 也將被 stop 掉, 如果希望 StreamingContext 關閉時,能夠保留 SparkContext,則需要在 stop 方法中傳入參 數 stop SparkContext=false
- 一個 SparkContext 可以用來創建多個 StreamingContext,只要前一個 StreamingContext 已經停止了。
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。