您好,登錄后才能下訂單哦!
這篇文章主要介紹spark 3.0.1中AQE配置的示例分析,文中介紹的非常詳細,具有一定的參考價值,感興趣的小伙伴們一定要看完!
從spark configuration,到在最早在spark 1.6版本就已經有了AQE;到了spark 2.x版本,intel大數據團隊進行了相應的原型開發和實踐;到了spark 3.0時代,Databricks和intel一起為社區貢獻了新的AQE
配置項 | 默認值 | 官方說明 | 分析 |
---|---|---|---|
spark.sql.adaptive.enabled | false | 是否開啟自適應查詢 | 此處設置為true開啟 |
spark.sql.adaptive.coalescePartitions.enabled | true | 是否合并臨近的shuffle分區(根據'spark.sql.adaptive.advisoryPartitionSizeInBytes'的閾值來合并) | 此處默認為true開啟,分析見: 分析1 |
spark.sql.adaptive.coalescePartitions.initialPartitionNum | (none) | shuffle合并分區之前的初始分區數,默認為spark.sql.shuffle.partitions的值 | 分析見:分析2 |
spark.sql.adaptive.coalescePartitions.minPartitionNum | (none) | shuffle 分區合并后的最小分區數,默認為spark集群的默認并行度 | 分析見: 分析3 |
spark.sql.adaptive.advisoryPartitionSizeInBytes | 64MB | 建議的shuffle分區的大小,在合并分區和處理join數據傾斜的時候用到 | 分析見:分析3 |
spark.sql.adaptive.skewJoin.enabled | true | 是否開啟join中數據傾斜的自適應處理 | |
spark.sql.adaptive.skewJoin.skewedPartitionFactor | 5 | 數據傾斜判斷因子,必須同時滿足skewedPartitionFactor和skewedPartitionThresholdInBytes | 分析見:分析4 |
spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes | 256MB | 數據傾斜判斷閾值,必須同時滿足skewedPartitionFactor和skewedPartitionThresholdInBytes | 分析見:分析4 |
spark.sql.adaptive.logLevel | debug | 配置自適應執行的計劃改變日志 | 調整為info級別,便于觀察自適應計劃的改變 |
spark.sql.adaptive.nonEmptyPartitionRatioForBroadcastJoin | 0.2 | 轉為broadcastJoin的非空分區比例閾值,>=該值,將不會轉換為broadcastjoin | 分析見:分析5 |
在OptimizeSkewedJoin.scala中,我們看到ADVISORY_PARTITION_SIZE_IN_BYTES,也就是spark.sql.adaptive.advisoryPartitionSizeInBytes被引用的地方, (OptimizeSkewedJoin是物理計劃中的規則)
/** * The goal of skew join optimization is to make the data distribution more even. The target size * to split skewed partitions is the average size of non-skewed partition, or the * advisory partition size if avg size is smaller than it. */ private def targetSize(sizes: Seq[Long], medianSize: Long): Long = { val advisorySize = conf.getConf(SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES) val nonSkewSizes = sizes.filterNot(isSkewed(_, medianSize)) // It's impossible that all the partitions are skewed, as we use median size to define skew. assert(nonSkewSizes.nonEmpty) math.max(advisorySize, nonSkewSizes.sum / nonSkewSizes.length) }
其中:
nonSkewSizes為task非傾斜的分區
targetSize返回的是max(非傾斜的分區的平均值,advisorySize),其中advisorySize為spark.sql.adaptive.advisoryPartitionSizeInBytes值,所以說 targetSize不一定是spark.sql.adaptive.advisoryPartitionSizeInBytes值
medianSize值為task的分區大小的中位值
在SQLConf.scala
def numShufflePartitions: Int = { if (adaptiveExecutionEnabled && coalesceShufflePartitionsEnabled) { getConf(COALESCE_PARTITIONS_INITIAL_PARTITION_NUM).getOrElse(defaultNumShufflePartitions) } else { defaultNumShufflePartitions } }
從spark 3.0.1開始如果開啟了AQE和shuffle分區合并,則用的是spark.sql.adaptive.coalescePartitions.initialPartitionNum,這在如果有多個shuffle stage的情況下,增加分區數,可以有效的增強shuffle分區合并的效果
在CoalesceShufflePartitions.scala,CoalesceShufflePartitions是一個物理計劃的規則,會執行如下操作
if (!shuffleStages.forall(_.shuffle.canChangeNumPartitions)) { plan } else { // `ShuffleQueryStageExec#mapStats` returns None when the input RDD has 0 partitions, // we should skip it when calculating the `partitionStartIndices`. val validMetrics = shuffleStages.flatMap(_.mapStats) // We may have different pre-shuffle partition numbers, don't reduce shuffle partition number // in that case. For example when we union fully aggregated data (data is arranged to a single // partition) and a result of a SortMergeJoin (multiple partitions). val distinctNumPreShufflePartitions = validMetrics.map(stats => stats.bytesByPartitionId.length).distinct if (validMetrics.nonEmpty && distinctNumPreShufflePartitions.length == 1) { // We fall back to Spark default parallelism if the minimum number of coalesced partitions // is not set, so to avoid perf regressions compared to no coalescing. val minPartitionNum = conf.getConf(SQLConf.COALESCE_PARTITIONS_MIN_PARTITION_NUM) .getOrElse(session.sparkContext.defaultParallelism) val partitionSpecs = ShufflePartitionsUtil.coalescePartitions( validMetrics.toArray, advisoryTargetSize = conf.getConf(SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES), minNumPartitions = minPartitionNum) // This transformation adds new nodes, so we must use `transformUp` here. val stageIds = shuffleStages.map(_.id).toSet plan.transformUp { // even for shuffle exchange whose input RDD has 0 partition, we should still update its // `partitionStartIndices`, so that all the leaf shuffles in a stage have the same // number of output partitions. case stage: ShuffleQueryStageExec if stageIds.contains(stage.id) => CustomShuffleReaderExec(stage, partitionSpecs, COALESCED_SHUFFLE_READER_DESCRIPTION) } } else { plan } } }
也就是說:
如果是用戶自己指定的分區操作,如repartition操作,spark.sql.adaptive.coalescePartitions.minPartitionNum無效,且跳過分區合并優化
如果多個task進行shuffle,且task有不同的分區數的話,spark.sql.adaptive.coalescePartitions.minPartitionNum無效,且跳過分區合并優化
見ShufflePartitionsUtil.coalescePartition分析
在OptimizeSkewedJoin.scala中,我們看到
/** * A partition is considered as a skewed partition if its size is larger than the median * partition size * ADAPTIVE_EXECUTION_SKEWED_PARTITION_FACTOR and also larger than * ADVISORY_PARTITION_SIZE_IN_BYTES. */ private def isSkewed(size: Long, medianSize: Long): Boolean = { size > medianSize * conf.getConf(SQLConf.SKEW_JOIN_SKEWED_PARTITION_FACTOR) && size > conf.getConf(SQLConf.SKEW_JOIN_SKEWED_PARTITION_THRESHOLD) }
OptimizeSkewedJoin是個物理計劃的規則,會根據isSkewed來判斷是否數據數據有傾斜,而且必須是滿足SKEW_JOIN_SKEWED_PARTITION_FACTOR和SKEW_JOIN_SKEWED_PARTITION_THRESHOLD才會判斷為數據傾斜了
medianSize為task的分區大小的中位值
在AdaptiveSparkPlanExec方法getFinalPhysicalPlan中調用了reOptimize方法,而reOptimize方法則會執行邏輯計劃的優化操作:
private def reOptimize(logicalPlan: LogicalPlan): (SparkPlan, LogicalPlan) = { logicalPlan.invalidateStatsCache() val optimized = optimizer.execute(logicalPlan) val sparkPlan = context.session.sessionState.planner.plan(ReturnAnswer(optimized)).next() val newPlan = applyPhysicalRules(sparkPlan, preprocessingRules ++ queryStagePreparationRules) (newPlan, optimized) }
而optimizer 中有個DemoteBroadcastHashJoin規則:
@transient private val optimizer = new RuleExecutor[LogicalPlan] { // TODO add more optimization rules override protected def batches: Seq[Batch] = Seq( Batch("Demote BroadcastHashJoin", Once, DemoteBroadcastHashJoin(conf)) ) }
而對于DemoteBroadcastHashJoin則有對是否broadcastjoin的判斷:
case class DemoteBroadcastHashJoin(conf: SQLConf) extends Rule[LogicalPlan] { private def shouldDemote(plan: LogicalPlan): Boolean = plan match { case LogicalQueryStage(_, stage: ShuffleQueryStageExec) if stage.resultOption.isDefined && stage.mapStats.isDefined => val mapStats = stage.mapStats.get val partitionCnt = mapStats.bytesByPartitionId.length val nonZeroCnt = mapStats.bytesByPartitionId.count(_ > 0) partitionCnt > 0 && nonZeroCnt > 0 && (nonZeroCnt * 1.0 / partitionCnt) < conf.nonEmptyPartitionRatioForBroadcastJoin case _ => false } def apply(plan: LogicalPlan): LogicalPlan = plan.transformDown { case j @ Join(left, right, _, _, hint) => var newHint = hint if (!hint.leftHint.exists(_.strategy.isDefined) && shouldDemote(left)) { newHint = newHint.copy(leftHint = Some(hint.leftHint.getOrElse(HintInfo()).copy(strategy = Some(NO_BROADCAST_HASH)))) } if (!hint.rightHint.exists(_.strategy.isDefined) && shouldDemote(right)) { newHint = newHint.copy(rightHint = Some(hint.rightHint.getOrElse(HintInfo()).copy(strategy = Some(NO_BROADCAST_HASH)))) } if (newHint.ne(hint)) { j.copy(hint = newHint) } else { j } } }
shouldDemote就是對是否進行broadcastjoin的判斷:
首先得是ShuffleQueryStageExec操作
如果非空分區比列大于nonEmptyPartitionRatioForBroadcastJoin,也就是spark.sql.adaptive.nonEmptyPartitionRatioForBroadcastJoin,則不會把mergehashjoin轉換為broadcastJoin
這在sql中先join在groupby的場景中比較容易出現
見coalescePartition如示:
def coalescePartitions( mapOutputStatistics: Array[MapOutputStatistics], advisoryTargetSize: Long, minNumPartitions: Int): Seq[ShufflePartitionSpec] = { // If `minNumPartitions` is very large, it is possible that we need to use a value less than // `advisoryTargetSize` as the target size of a coalesced task. val totalPostShuffleInputSize = mapOutputStatistics.map(_.bytesByPartitionId.sum).sum // The max at here is to make sure that when we have an empty table, we only have a single // coalesced partition. // There is no particular reason that we pick 16. We just need a number to prevent // `maxTargetSize` from being set to 0. val maxTargetSize = math.max( math.ceil(totalPostShuffleInputSize / minNumPartitions.toDouble).toLong, 16) val targetSize = math.min(maxTargetSize, advisoryTargetSize) val shuffleIds = mapOutputStatistics.map(_.shuffleId).mkString(", ") logInfo(s"For shuffle($shuffleIds), advisory target size: $advisoryTargetSize, " + s"actual target size $targetSize.") // Make sure these shuffles have the same number of partitions. val distinctNumShufflePartitions = mapOutputStatistics.map(stats => stats.bytesByPartitionId.length).distinct // The reason that we are expecting a single value of the number of shuffle partitions // is that when we add Exchanges, we set the number of shuffle partitions // (i.e. map output partitions) using a static setting, which is the value of // `spark.sql.shuffle.partitions`. Even if two input RDDs are having different // number of partitions, they will have the same number of shuffle partitions // (i.e. map output partitions). assert( distinctNumShufflePartitions.length == 1, "There should be only one distinct value of the number of shuffle partitions " + "among registered Exchange operators.") val numPartitions = distinctNumShufflePartitions.head val partitionSpecs = ArrayBuffer[CoalescedPartitionSpec]() var latestSplitPoint = 0 var coalescedSize = 0L var i = 0 while (i < numPartitions) { // We calculate the total size of i-th shuffle partitions from all shuffles. var totalSizeOfCurrentPartition = 0L var j = 0 while (j < mapOutputStatistics.length) { totalSizeOfCurrentPartition += mapOutputStatistics(j).bytesByPartitionId(i) j += 1 } // If including the `totalSizeOfCurrentPartition` would exceed the target size, then start a // new coalesced partition. if (i > latestSplitPoint && coalescedSize + totalSizeOfCurrentPartition > targetSize) { partitionSpecs += CoalescedPartitionSpec(latestSplitPoint, i) latestSplitPoint = i // reset postShuffleInputSize. coalescedSize = totalSizeOfCurrentPartition } else { coalescedSize += totalSizeOfCurrentPartition } i += 1 } partitionSpecs += CoalescedPartitionSpec(latestSplitPoint, numPartitions) partitionSpecs }
totalPostShuffleInputSize 先計算出總的shuffle的數據大小
maxTargetSize取max(totalPostShuffleInputSize/minNumPartitions,16)的最大值,minNumPartitions也就是spark.sql.adaptive.coalescePartitions.minPartitionNum的值
targetSize取min(maxTargetSize,advisoryTargetSize),advisoryTargetSize也就是spark.sql.adaptive.advisoryPartitionSizeInBytes的值,所以說該值只是建議值,不一定是targetSize
while循環就是取相鄰的分區合并,對于每個task中的每個相鄰分區合并,直到不大于targetSize
見optimizeSkewJoin如示:
def optimizeSkewJoin(plan: SparkPlan): SparkPlan = plan.transformUp { case smj @ SortMergeJoinExec(_, _, joinType, _, s1 @ SortExec(_, _, ShuffleStage(left: ShuffleStageInfo), _), s2 @ SortExec(_, _, ShuffleStage(right: ShuffleStageInfo), _), _) if supportedJoinTypes.contains(joinType) => assert(left.partitionsWithSizes.length == right.partitionsWithSizes.length) val numPartitions = left.partitionsWithSizes.length // Use the median size of the actual (coalesced) partition sizes to detect skewed partitions. val leftMedSize = medianSize(left.partitionsWithSizes.map(_._2)) val rightMedSize = medianSize(right.partitionsWithSizes.map(_._2)) logDebug( s""" |Optimizing skewed join. |Left side partitions size info: |${getSizeInfo(leftMedSize, left.partitionsWithSizes.map(_._2))} |Right side partitions size info: |${getSizeInfo(rightMedSize, right.partitionsWithSizes.map(_._2))} """.stripMargin) val canSplitLeft = canSplitLeftSide(joinType) val canSplitRight = canSplitRightSide(joinType) // We use the actual partition sizes (may be coalesced) to calculate target size, so that // the final data distribution is even (coalesced partitions + split partitions). val leftActualSizes = left.partitionsWithSizes.map(_._2) val rightActualSizes = right.partitionsWithSizes.map(_._2) val leftTargetSize = targetSize(leftActualSizes, leftMedSize) val rightTargetSize = targetSize(rightActualSizes, rightMedSize) val leftSidePartitions = mutable.ArrayBuffer.empty[ShufflePartitionSpec] val rightSidePartitions = mutable.ArrayBuffer.empty[ShufflePartitionSpec] val leftSkewDesc = new SkewDesc val rightSkewDesc = new SkewDesc for (partitionIndex <- 0 until numPartitions) { val isLeftSkew = isSkewed(leftActualSizes(partitionIndex), leftMedSize) && canSplitLeft val leftPartSpec = left.partitionsWithSizes(partitionIndex)._1 val isLeftCoalesced = leftPartSpec.startReducerIndex + 1 < leftPartSpec.endReducerIndex val isRightSkew = isSkewed(rightActualSizes(partitionIndex), rightMedSize) && canSplitRight val rightPartSpec = right.partitionsWithSizes(partitionIndex)._1 val isRightCoalesced = rightPartSpec.startReducerIndex + 1 < rightPartSpec.endReducerIndex // A skewed partition should never be coalesced, but skip it here just to be safe. val leftParts = if (isLeftSkew && !isLeftCoalesced) { val reducerId = leftPartSpec.startReducerIndex val skewSpecs = createSkewPartitionSpecs( left.mapStats.shuffleId, reducerId, leftTargetSize) if (skewSpecs.isDefined) { logDebug(s"Left side partition $partitionIndex is skewed, split it into " + s"${skewSpecs.get.length} parts.") leftSkewDesc.addPartitionSize(leftActualSizes(partitionIndex)) } skewSpecs.getOrElse(Seq(leftPartSpec)) } else { Seq(leftPartSpec) } // A skewed partition should never be coalesced, but skip it here just to be safe. val rightParts = if (isRightSkew && !isRightCoalesced) { val reducerId = rightPartSpec.startReducerIndex val skewSpecs = createSkewPartitionSpecs( right.mapStats.shuffleId, reducerId, rightTargetSize) if (skewSpecs.isDefined) { logDebug(s"Right side partition $partitionIndex is skewed, split it into " + s"${skewSpecs.get.length} parts.") rightSkewDesc.addPartitionSize(rightActualSizes(partitionIndex)) } skewSpecs.getOrElse(Seq(rightPartSpec)) } else { Seq(rightPartSpec) } for { leftSidePartition <- leftParts rightSidePartition <- rightParts } { leftSidePartitions += leftSidePartition rightSidePartitions += rightSidePartition } } logDebug("number of skewed partitions: " + s"left ${leftSkewDesc.numPartitions}, right ${rightSkewDesc.numPartitions}") if (leftSkewDesc.numPartitions > 0 || rightSkewDesc.numPartitions > 0) { val newLeft = CustomShuffleReaderExec( left.shuffleStage, leftSidePartitions, leftSkewDesc.toString) val newRight = CustomShuffleReaderExec( right.shuffleStage, rightSidePartitions, rightSkewDesc.toString) smj.copy( left = s1.copy(child = newLeft), right = s2.copy(child = newRight), isSkewJoin = true) } else { smj } }
SortMergeJoinExec說明適用于sort merge join
assert(left.partitionsWithSizes.length == right.partitionsWithSizes.length)保證進行join的兩個task的分區數相等
分別計算進行join的task的分區中位數的大小leftMedSize和rightMedSize
分別計算進行join的task的分區的targetzise大小leftTargetSize和rightTargetSize
循環判斷兩個task的每個分區的是否存在傾斜,如果傾斜且滿足沒有進行過shuffle分區合并,則進行傾斜分區處理,否則不處理
createSkewPartitionSpecs方法為: 1.獲取每個join的task的對應分區的數據大小 2.根據targetSize分成多個slice
如果存在數據傾斜,則構造包裝成CustomShuffleReaderExec,進行后續任務的運行,最最終調用ShuffledRowRDD的compute方法 匹配case PartialMapperPartitionSpec進行數據的讀取,其中還會自動開啟“spark.sql.adaptive.fetchShuffleBlocksInBatch”批量fetch減少io
如:AdaptiveSparkPlanExec
@transient private val queryStageOptimizerRules: Seq[Rule[SparkPlan]] = Seq( ReuseAdaptiveSubquery(conf, context.subqueryCache), CoalesceShufflePartitions(context.session), // The following two rules need to make use of 'CustomShuffleReaderExec.partitionSpecs' // added by `CoalesceShufflePartitions`. So they must be executed after it. OptimizeSkewedJoin(conf), OptimizeLocalShuffleReader(conf) )
可見在AdaptiveSparkPlanExec中被調用 ,且CoalesceShufflePartitions先于OptimizeSkewedJoin, 而AdaptiveSparkPlanExec在InsertAdaptiveSparkPlan中被調用 ,而InsertAdaptiveSparkPlan在QueryExecution中被調用
而在InsertAdaptiveSparkPlan.shouldApplyAQE方法和supportAdaptive中我們看到
private def shouldApplyAQE(plan: SparkPlan, isSubquery: Boolean): Boolean = { conf.getConf(SQLConf.ADAPTIVE_EXECUTION_FORCE_APPLY) || isSubquery || { plan.find { case _: Exchange => true case p if !p.requiredChildDistribution.forall(_ == UnspecifiedDistribution) => true case p => p.expressions.exists(_.find { case _: SubqueryExpression => true case _ => false }.isDefined) }.isDefined } } private def supportAdaptive(plan: SparkPlan): Boolean = { // TODO migrate dynamic-partition-pruning onto adaptive execution. sanityCheck(plan) && !plan.logicalLink.exists(_.isStreaming) && !plan.expressions.exists(_.find(_.isInstanceOf[DynamicPruningSubquery]).isDefined) && plan.children.forall(supportAdaptive) }
如果不滿足以上條件也是不會開啟AQE的,如果要強制開啟,也可以配置spark.sql.adaptive.forceApply 為true(文檔中提示是內部配置)
在spark 3.0.1中已經廢棄了如下的配置:
spark.sql.adaptive.skewedPartitionMaxSplits spark.sql.adaptive.skewedPartitionRowCountThreshold spark.sql.adaptive.skewedPartitionSizeThreshold
以上是“spark 3.0.1中AQE配置的示例分析”這篇文章的所有內容,感謝各位的閱讀!希望分享的內容對大家有幫助,更多相關知識,歡迎關注億速云行業資訊頻道!
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。