您好,登錄后才能下訂單哦!
Spark Streaming中的反壓機制是Spark 1.5.0推出的新特性,可以根據處理效率動態調整攝入速率。
當批處理時間(Batch Processing Time)大于批次間隔(Batch Interval,即 BatchDuration)時,說明處理數據的速度小于數據攝入的速度,持續時間過長或源頭數據暴增,容易造成數據在內存中堆積,最終導致Executor OOM或任務奔潰。
在這種情況下,若是基于Kafka Receiver的數據源,可以通過設置spark.streaming.receiver.maxRate來控制最大輸入速率;若是基于Direct的數據源(如Kafka Direct Stream),則可以通過設置spark.streaming.kafka.maxRatePerPartition來控制最大輸入速率。當然,在事先經過壓測,且流量高峰不會超過預期的情況下,設置這些參數一般沒什么問題。但最大值,不代表是最優值,最好還能根據每個批次處理情況來動態預估下個批次最優速率。在Spark 1.5.0以上,就可通過背壓機制來實現。開啟反壓機制,即設置spark.streaming.backpressure.enabled為true,Spark Streaming會自動根據處理能力來調整輸入速率,從而在流量高峰時仍能保證最大的吞吐和性能。
override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted) {
val elements = batchCompleted.batchInfo.streamIdToInputInfo
for {
// 處理結束時間
processingEnd <- batchCompleted.batchInfo.processingEndTime
// 處理時間,即`processingEndTime` - `processingStartTime`
workDelay <- batchCompleted.batchInfo.processingDelay
// 在調度隊列中的等待時間,即`processingStartTime` - `submissionTime`
waitDelay <- batchCompleted.batchInfo.schedulingDelay
// 當前批次處理的記錄數
elems <- elements.get(streamUID).map(_.numRecords)
} computeAndPublish(processingEnd, elems, workDelay, waitDelay)
}
可以看到,接著又調用的是computeAndPublish
方法,如下:
private def computeAndPublish(time: Long, elems: Long, workDelay: Long, waitDelay: Long): Unit =
Future[Unit] {
// 根據處理時間、調度時間、當前Batch記錄數,預估新速率
val newRate = rateEstimator.compute(time, elems, workDelay, waitDelay)
newRate.foreach { s =>
// 設置新速率
rateLimit.set(s.toLong)
// 發布新速率
publish(getLatestRate())
}
}
更深一層,具體調用的是rateEstimator.compute
方法來預估新速率,如下:
def compute(
time: Long,
elements: Long,
processingDelay: Long,
schedulingDelay: Long): Option[Double]
spark.streaming.backpressure.enabled
默認值false,是否啟用反壓機制。
spark.streaming.backpressure.initialRate
默認值無,初始最大接收速率。只適用于Receiver Stream,不適用于Direct Stream。類型為整數,默認直接讀取所有,在1開啟的情況下,限制第一次批處理應該消費的數據,因為程序冷啟動隊列里面有大量積壓,防止第一次全部讀取,造成系統阻塞
spark.streaming.kafka.maxRatePerPartition
類型為整數,默認直接讀取所有,限制每秒每個消費線程讀取每個kafka分區最大的數據量
注意: 只有 3 激活的時候,每次消費的最大數據量,就是設置的數據量,如果不足這個數,就有多少讀多少,如果超過這個數字,就讀取這個數字的設置的值
只有 1+3 激活的時候,每次消費讀取的數量最大會等于3設置的值,最小是spark根據系統負載自動推斷的值,消費的數據量會在這兩個范圍之內變化根據系統情況,但第一次啟動會有多少讀多少數據。此后按 1+3 設置規則運行
1+2+3 同時激活的時候,跟上一個消費情況基本一樣,但第一次消費會得到限制,因為我們設置第一次消費的頻率了。
spark.streaming.backpressure.rateEstimator
默認值pid,速率控制器,Spark 默認只支持此控制器,可自定義。
spark.streaming.backpressure.pid.proportional
默認值1.0,只能為非負值。當前速率與最后一批速率之間的差值對總控制信號貢獻的權重。用默認值即可。
spark.streaming.backpressure.pid.integral
默認值0.2,只能為非負值。比例誤差累積對總控制信號貢獻的權重。用默認值即可。
spark.streaming.backpressure.pid.derived
默認值0.0,只能為非負值。比例誤差變化對總控制信號貢獻的權重。用默認值即可。
spark.streaming.backpressure.pid.minRate
默認值100,只能為正數,最小速率。
//啟用反壓機制
conf.set("spark.streaming.backpressure.enabled","true")
//最小攝入條數控制
conf.set("spark.streaming.backpressure.pid.minRate","1")
//最大攝入條數控制
conf.set("spark.streaming.kafka.maxRatePerPartition","12")
//初始最大接收速率控制
conf.set("spark.streaming.backpressure.initialRate","10")
要保證反壓機制真正起作用前Spark 應用程序不會崩潰,需要控制每個批次最大攝入速率。以Direct Stream為例,如Kafka Direct Stream,則可以通過spark.streaming.kafka.maxRatePerPartition參數來控制。此參數代表了 每秒每個分區最大攝入的數據條數。假設BatchDuration為10秒,spark.streaming.kafka.maxRatePerPartition為12條,kafka topic 分區數為3個,則一個批(Batch)最大讀取的數據條數為360條(31210=360)。同時,需要注意,該參數也代表了整個應用生命周期中的最大速率,即使是背壓調整的最大值也不會超過該參數。
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。