您好,登錄后才能下訂單哦!
本篇文章給大家分享的是有關Spark廣播變量分析以及如何動態更新廣播變量,小編覺得挺實用的,因此分享給大家學習,希望大家閱讀完這篇文章后可以有所收獲,話不多說,跟著小編一起來看看吧。
廣播變量存儲目前基于Spark實現的BlockManager分布式存儲系統,Spark中的shuffle數據、加載HDFS數據時切分過來的block塊都存儲在BlockManager中,不是今天的討論點,這里先不做詳述了。
廣播變量的創建方式和獲取
//創建廣播變量
val broadcastVar = sparkSession.sparkContext.broadcast(Array(1, 2, 3))
//獲取廣播變量
broadcastVar.value
1.首先調用val broadcastVar = sparkSession.sparkContext.broadcast(Array(1, 2, 3))
val bc = env.broadcastManager.newBroadcast[T](value, isLocal)
3.通過廣播工廠的newBroadcast方法進行創建
broadcastFactory.newBroadcast[T](value_, isLocal, nextBroadcastId.getAndIncrement())
在調用BroadcastManager的newBroadcast方法時已完成對廣播工廠的初始化(initialize方法),我們只需看BroadcastFactory的實現TorrentBroadcastFactory中對TorrentBroadcast的實例化過程:
new TorrentBroadcast[T](value_, id)
conf.getSizeAsKb("spark.broadcast.blockSize", "4m").toInt * 1024
廣播變量初始化過程
從driver端或者其他的executor中讀取,將讀取的對象存儲到本地,并存于緩存中
new ReferenceMap(AbstractReferenceMap.HARD, AbstractReferenceMap.WEAK)
Spark兩種廣播變量對比
TorrentBroadcast在executor端存儲一個對象的同時會將獲取的block存儲于BlockManager,并向driver端的BlockManager匯報block的存儲信息。
總之就是HttpBroadcast導致獲取廣播變量的請求集中于driver端,容易引起driver端單點故障,網絡IO過高影響性能等問題,而TorrentBroadcast獲取廣播變量的請求服務即可以請求到driver端也可以在executor,避免了上述問題,當然這只是主要的優化點。
既然無法更新,那么只能動態生成,應用場景有實時風控中根據業務情況調整規則庫、實時日志ETL服務中獲取最新的日志格式以及字段變更等。
@volatile private var instance: Broadcast[Array[Int]] = null
//獲取廣播變量單例對象
def getInstance(sc: SparkContext, ctime: Long): Broadcast[Array[Int]] = {
if (instance == null) {
synchronized {
if (instance == null) {
instance = sc.broadcast(fetchLastestData())
}
}
}
instance
}
//加載要廣播的數據,并更新廣播變量
def updateBroadCastVar(sc: SparkContext, blocking: Boolean = false): Unit = {
if (instance != null) {
//刪除緩存在executors上的廣播副本,并可選擇是否在刪除完成后進行block等待
//底層可選擇是否將driver端的廣播副本也刪除
instance.unpersist(blocking)
instance = sc.broadcast(fetchLastestData())
}
}
def fetchLastestData() = {
//動態獲取需要更新的數據
//這里是偽代碼
Array(1, 2, 3)
}
val dataFormat = FastDateFormat.getInstance("yyyy-MM-dd HH:mm:ss")
...
...
stream.foreachRDD { rdd =>
val current_time = dataFormat.format(new Date())
val new_time = current_time.substring(14, 16).toLong
//每10分鐘更新一次
if (new_time % 10 == 0) {
updateBroadCastVar(rdd.sparkContext, true)
}
rdd.foreachPartition { records =>
instance.value
...
}
}
Spark流式程序中為何使用單例模式
1.廣播變量是只讀的,使用單例模式可以減少Spark流式程序中每次job生成執行,頻繁創建廣播變量帶來的開銷
2.廣播變量單例模式也需要做同步處理。在FIFO調度模式下,基本不會發生并發問題。但是如果你改變了調度模式,如采用公平調度模式,同時設置Spark流式程序并行執行的job數大于1,如設置參數spark.streaming.concurrentJobs=4,則必須加上同步代碼
3.在多個輸出流共享廣播變量的情況下,同時配置了公平調度模式,也會產生并發問題。建議在foreachRDD或者transform中使用局部變量進行廣播,避免在公平調度模式下不同job之間產生影響。
除了廣播變量,累加器也是一樣。在Spark流式組件如Spark Streaming底層,每個輸出流都會產生一個job,形成一個job集合提交到線程池里并發執行。
以上就是Spark廣播變量分析以及如何動態更新廣播變量,小編相信有部分知識點可能是我們日常工作會見到或用到的。希望你能通過這篇文章學到更多知識。更多詳情敬請關注億速云行業資訊頻道。
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。