您好,登錄后才能下訂單哦!
小編給大家分享一下spark delta寫操作ACID事務中基礎類FileFormat/FileCommitProtocol的示例分析,相信大部分人都還不怎么了解,因此分享這篇文章給大家參考一下,希望大家閱讀完這篇文章后大有收獲,下面讓我們一起去了解一下吧!
直接進入主題FileFormatWriter.write,這個是spark寫入文件的入口:
def write( sparkSession: SparkSession, plan: SparkPlan, fileFormat: FileFormat, committer: FileCommitProtocol, outputSpec: OutputSpec, hadoopConf: Configuration, partitionColumns: Seq[Attribute], bucketSpec: Option[BucketSpec], statsTrackers: Seq[WriteJobStatsTracker], options: Map[String, String]) : Set[String] = {
因為delta是基于parquet實現的, 所以我們fileformat選擇分析ParquetFileFormat, 而對于FileCommitProtocol,我們分析SQLHadoopMapReduceCommitProtocol
該write方法實現比較長,我們講重點 :
committer.setupJob(job)
這一步做job運行前的準備工作,比如設置jobId、taskId,以及設置OutputCommitter。OutputCommitter是Hadoop中負責管理任務輸出提交的組件:作業開始前建立臨時目錄,任務/作業提交時把臨時文件移動到最終目錄並清理臨時目錄(見下文)。
/**
 * Job setup of the FileCommitProtocol analysed here (HadoopMapReduceCommitProtocol family).
 *
 * Creates a job ID, a task ID and a task-attempt ID (all with index 0), writes them
 * into the job's Hadoop configuration, builds a TaskAttemptContext from that
 * configuration, obtains the underlying Hadoop OutputCommitter via setupCommitter
 * and finally delegates to that committer's setupJob.
 *
 * @param jobContext Hadoop job context; its configuration is mutated in place
 */
override def setupJob(jobContext: JobContext): Unit = {
  // Setup IDs
  val jobId = SparkHadoopWriterUtils.createJobID(new Date, 0)
  val taskId = new TaskID(jobId, TaskType.MAP, 0)
  val taskAttemptId = new TaskAttemptID(taskId, 0)

  // Set up the configuration object
  jobContext.getConfiguration.set("mapreduce.job.id", jobId.toString)
  jobContext.getConfiguration.set("mapreduce.task.id", taskAttemptId.getTaskID.toString)
  jobContext.getConfiguration.set("mapreduce.task.attempt.id", taskAttemptId.toString)
  jobContext.getConfiguration.setBoolean("mapreduce.task.ismap", true)
  jobContext.getConfiguration.setInt("mapreduce.task.partition", 0)

  val taskAttemptContext = new TaskAttemptContextImpl(jobContext.getConfiguration, taskAttemptId)
  // Resolve the Hadoop OutputCommitter (for Parquet output: ParquetOutputCommitter).
  committer = setupCommitter(taskAttemptContext)
  // For a FileOutputCommitter this creates the job attempt directory under _temporary.
  committer.setupJob(jobContext)
}
ParquetFileFormat對應的OutputCommitter是ParquetOutputCommitter,它是通過format.getOutputCommitter(context)獲取的。對於Parquet,該方法(即ParquetOutputFormat.getOutputCommitter)的實現為:
/**
 * Returns the OutputCommitter for this output format.
 * The committer is created lazily on the first call, as a ParquetOutputCommitter
 * for the configured output path, and the same instance is returned afterwards.
 */
@Override
public OutputCommitter getOutputCommitter(TaskAttemptContext context) throws IOException {
  // Create the committer only once and cache it in the field.
  if (committer == null) {
    Path output = getOutputPath(context);
    committer = new ParquetOutputCommitter(output, context);
  }
  return committer;
}
ParquetOutputCommitter的構造方法最終調用了父類FileOutputCommitter的構造方法:
/**
 * Creates a committer for a task attempt.
 * Delegates to the JobContext-based constructor and then, when an output path
 * is given, sets the workPath field to the task attempt path returned by
 * getTaskAttemptPath. This work path is what getWorkPath later exposes
 * (HadoopMapReduceCommitProtocol.newTaskTempFile writes files under it).
 */
public FileOutputCommitter(Path outputPath, TaskAttemptContext context) throws IOException {
  this(outputPath, (JobContext)context);
  if (outputPath != null) {
    workPath = getTaskAttemptPath(context, outputPath);
  }
}
注意這里的workPath(成員變量)被賦值為任務嘗試目錄,即$outputPath/_temporary/$appAttemptId/_temporary/$taskAttemptId(見文末示例),在以下newTaskTempFile方法中會用到
接著進行setupJob操作:
public void setupJob(JobContext context) throws IOException { if (hasOutputPath()) { Path jobAttemptPath = getJobAttemptPath(context); FileSystem fs = jobAttemptPath.getFileSystem( context.getConfiguration()); if (!fs.mkdirs(jobAttemptPath)) { LOG.error("Mkdirs failed to create " + jobAttemptPath); } } else { LOG.warn("Output Path is null in setupJob()");
而getJobAttemptPath返回的是$path/_temporary/$appAttemptId目錄(其中path是文件輸出目錄),setupJob會通過mkdirs建立該目錄(連同父目錄$path/_temporary)
接下來是進行任務的提交:
// Launch one write task per non-empty partition. Each task runs executeTask;
// the result handler passes each task's commit message to committer.onTaskCommit
// and stores the task result at its partition index in `ret`.
sparkSession.sparkContext.runJob(
  rddWithNonEmptyPartitions,
  (taskContext: TaskContext, iter: Iterator[InternalRow]) => {
    executeTask(
      description = description,
      jobIdInstant = jobIdInstant,
      sparkStageId = taskContext.stageId(),
      sparkPartitionId = taskContext.partitionId(),
      // Clear the sign bit so the truncated attempt id is non-negative.
      sparkAttemptNumber = taskContext.taskAttemptId().toInt & Integer.MAX_VALUE,
      committer,
      iterator = iter)
  },
  rddWithNonEmptyPartitions.partitions.indices,
  (index, res: WriteTaskResult) => {
    committer.onTaskCommit(res.commitMsg)
    ret(index) = res
  })
其中重點看看executeTask方法:
committer.setupTask(taskAttemptContext)

// Choose the data writer for this task.
val dataWriter = if (sparkPartitionId != 0 && !iterator.hasNext) {
  // In case of empty job, leave first partition to save meta for file format like parquet.
  new EmptyDirectoryDataWriter(description, taskAttemptContext, committer)
} else if (description.partitionColumns.isEmpty && description.bucketIdExpression.isEmpty) {
  // No partition columns and no bucketing: all rows go to files in a single directory.
  new SingleDirectoryDataWriter(description, taskAttemptContext, committer)
} else {
  // Partitioned and/or bucketed output.
  new DynamicPartitionDataWriter(description, taskAttemptContext, committer)
}
對于SQLHadoopMapReduceCommitProtocol:setupTask實現如下:
// Obtain the Hadoop OutputCommitter for this task attempt and set the task up on it.
committer = setupCommitter(taskContext)
committer.setupTask(taskContext)
// Reset the per-task bookkeeping that commitTask later returns in its TaskCommitMessage.
addedAbsPathFiles = mutable.Map[String, String]()
partitionPaths = mutable.Set[String]()
而committer.setupTask(taskContext)對應到ParquetOutputCommitter(繼承自FileOutputCommitter)是空實現。
之后看數據寫入的最終執行者dataWriter, 如果是沒有分區,則是SingleDirectoryDataWriter:
/**
 * Data writer used when the output has neither partition columns nor a bucket
 * expression: every record of the task is written to files in one directory.
 *
 * A first file is opened at construction time. When `maxRecordsPerFile` is
 * positive, a new file is started each time the current one reaches that limit.
 */
class SingleDirectoryDataWriter(
    description: WriteJobDescription,
    taskAttemptContext: TaskAttemptContext,
    committer: FileCommitProtocol)
  extends FileFormatDataWriter(description, taskAttemptContext, committer) {
  // Index of the current file within this task; embedded in the file name as "-cNNN".
  private var fileCounter: Int = _
  // Number of records written into the current file.
  private var recordsInFile: Long = _
  // Initialize currentWriter and statsTrackers
  newOutputWriter()

  /**
   * Closes the current writer (via releaseResources) and opens a new file whose
   * path is obtained from committer.newTaskTempFile, then notifies the stats trackers.
   */
  private def newOutputWriter(): Unit = {
    recordsInFile = 0
    releaseResources()

    val ext = description.outputWriterFactory.getFileExtension(taskAttemptContext)
    // The commit protocol decides where the file is placed (None = no partition sub-directory).
    val currentPath = committer.newTaskTempFile(
      taskAttemptContext,
      None,
      f"-c$fileCounter%03d" + ext)

    currentWriter = description.outputWriterFactory.newInstance(
      path = currentPath,
      dataSchema = description.dataColumns.toStructType,
      context = taskAttemptContext)

    statsTrackers.foreach(_.newFile(currentPath))
  }

  /** Writes one record, rolling over to a new file when the per-file record limit is reached. */
  override def write(record: InternalRow): Unit = {
    if (description.maxRecordsPerFile > 0 && recordsInFile >= description.maxRecordsPerFile) {
      fileCounter += 1
      assert(fileCounter < MAX_FILE_COUNTER,
        s"File counter $fileCounter is beyond max value $MAX_FILE_COUNTER")

      newOutputWriter()
    }

    currentWriter.write(record)
    statsTrackers.foreach(_.newRow(record))
    recordsInFile += 1
  }
}
這里寫文件是哪里呢?
// Ask the commit protocol for the path of the next output file (suffix "-cNNN" plus the format's extension).
val currentPath = committer.newTaskTempFile( taskAttemptContext, None, f"-c$fileCounter%03d" + ext)
對應到HadoopMapReduceCommitProtocol的newTaskTempFile方法為:
/**
 * Returns the path of a new temporary output file for the current task.
 *
 * The base directory is chosen as follows:
 *  - dynamic partition overwrite: the protocol's own `this.stagingDir`
 *    (`dir` must be defined and is recorded in `partitionPaths`);
 *  - the Hadoop committer is a FileOutputCommitter: its work path, or `path` if the work path is null;
 *  - otherwise: `path`.
 * The optional sub-directory `dir` and the generated file name are appended to it.
 *
 * @param taskContext task attempt context, used to build the file name
 * @param dir         optional (partition) sub-directory relative to the base directory
 * @param ext         suffix appended to the generated file name
 * @return the full path of the file to write, as a string
 */
override def newTaskTempFile(
    taskContext: TaskAttemptContext, dir: Option[String], ext: String): String = {
  val filename = getFilename(taskContext, ext)

  // Note: this local `stagingDir` shadows the field; `this.stagingDir` below refers to the field.
  val stagingDir: Path = committer match {
    case _ if dynamicPartitionOverwrite =>
      assert(dir.isDefined,
        "The dataset to be written must be partitioned when dynamicPartitionOverwrite is true.")
      partitionPaths += dir.get
      this.stagingDir
    // For FileOutputCommitter it has its own staging path called "work path".
    case f: FileOutputCommitter =>
      new Path(Option(f.getWorkPath).map(_.toString).getOrElse(path))
    case _ => new Path(path)
  }

  dir.map { d =>
    new Path(new Path(stagingDir, d), filename).toString
  }.getOrElse {
    new Path(stagingDir, filename).toString
  }
}
如果開啟了動態分區覆蓋(dynamicPartitionOverwrite,即partitionOverwriteMode=dynamic下的覆蓋寫),則根目錄為this.stagingDir,即new Path(path, ".spark-staging-" + jobId);如果沒有開啟,且committer是FileOutputCommitter的子類,則在workPath存在時使用workPath,否則使用path。注意之前FileOutputCommitter的構造方法中已經設置了workPath,所以文件最終會寫到任務嘗試目錄$path/_temporary/$appAttemptId/_temporary/$taskAttemptId下
所以task向該目錄寫入數據。DynamicPartitionDataWriter可以類似地分析,區別只是目錄中多了分區信息,每個文件只寫入各自的分區子目錄中
如果寫入成功的話執行如下:
try { Utils.tryWithSafeFinallyAndFailureCallbacks(block = { // Execute the task to write rows out and commit the task. while (iterator.hasNext) { dataWriter.write(iterator.next()) } dataWriter.commit() })(catchBlock = { // If there is an error, abort the task dataWriter.abort() logError(s"Job $jobId aborted.") }, finallyBlock = { dataWriter.close() })
dataWriter.commit()如下:
/**
 * Commits this task's output.
 * Releases resources (closing the current writer), builds a summary of the
 * updated partitions and each stats tracker's final stats, then asks the
 * commit protocol to commit the task and returns its message with the summary.
 */
override def commit(): WriteTaskResult = {
  releaseResources()
  val summary = ExecutedWriteSummary(
    updatedPartitions = updatedPartitions.toSet,
    stats = statsTrackers.map(_.getFinalStats()))
  WriteTaskResult(committer.commitTask(taskAttemptContext), summary)
}
首先釋放資源(也就是關閉當前的writer),之後調用FileCommitProtocol.commitTask():
/**
 * Commits one task attempt.
 * Delegates to SparkHadoopMapRedUtil.commitTask with the Hadoop committer,
 * the job id and the task id, then returns a TaskCommitMessage carrying the
 * pair (addedAbsPathFiles, partitionPaths) collected by this task.
 */
override def commitTask(taskContext: TaskAttemptContext): TaskCommitMessage = {
  val attemptId = taskContext.getTaskAttemptID
  logTrace(s"Commit task ${attemptId}")
  SparkHadoopMapRedUtil.commitTask(
    committer, taskContext, attemptId.getJobID.getId, attemptId.getTaskID.getId)
  new TaskCommitMessage(addedAbsPathFiles.toMap -> partitionPaths.toSet)
}
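而SparkHadoopMapRedUtil.commitTask最終調用FileOutputCommitter的commitTask方法,提交本task寫在$PATH/_temporary下的文件。具體行為取決於mapreduce.fileoutputcommitter.algorithm.version:算法版本2下直接把文件mv到$PATH下;算法版本1下先移動到已提交任務目錄$PATH/_temporary/$appAttemptId/$taskId,等commitJob時再移動到$PATH下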
之后返回統計的數值,數據格式如下:
/**
 * Write statistics reported by one task to the basic stats tracker.
 *
 * @param numPartitions number of partitions the task wrote to
 * @param numFiles      number of files the task wrote
 * @param numBytes      number of bytes the task wrote
 * @param numRows       number of rows the task wrote
 */
case class BasicWriteTaskStats(
    numPartitions: Int,
    numFiles: Int,
    numBytes: Long,
    numRows: Long)
  extends WriteTaskStats
之后會committer.onTaskCommit(res.commitMsg)操作,
對于SQLHadoopMapReduceCommitProtocol的實現為: logDebug(s"onTaskCommit($taskCommit)")
下一步committer.commitJob(job, commitMsgs):
... committer.commitJob(jobContext) ... for ((src, dst) <- filesToMove) { fs.rename(new Path(src), new Path(dst)) } ... fs.delete(stagingDir, true)
這里主要涉及清理job,以及把task所產生的文件(writer輸出的臨時文件)移動到path目錄下,且清理臨時目錄,至此文件真正的寫入到了path目錄下
指標記錄
/**
 * Hands the per-task write statistics to the job's stats trackers.
 *
 * `statsPerTask` holds, for every task, one WriteTaskStats per tracker (in
 * tracker order). It is transposed so that each tracker receives the stats of
 * all tasks, and each tracker's processStats is then called.
 *
 * @param statsTrackers the job's stats trackers
 * @param statsPerTask  per-task stats; every inner Seq must have one entry per tracker
 */
private[datasources] def processStats(
    statsTrackers: Seq[WriteJobStatsTracker],
    statsPerTask: Seq[Seq[WriteTaskStats]])
  : Unit = {

  val numStatsTrackers = statsTrackers.length
  // A task with a different number of stats objects would misalign the transpose below.
  assert(statsPerTask.forall(_.length == numStatsTrackers),
    s"""Every WriteTask should have produced one `WriteTaskStats` object for every tracker. |There are $numStatsTrackers statsTrackers, but some task returned |${statsPerTask.find(_.length != numStatsTrackers).get.length} results instead. """.stripMargin)

  // With no tasks, give every tracker an empty Seq so that each tracker is still called.
  val statsPerTracker = if (statsPerTask.nonEmpty) {
    statsPerTask.transpose
  } else {
    statsTrackers.map(_ => Seq.empty)
  }

  statsTrackers.zip(statsPerTracker).foreach {
    case (statsTracker, stats) => statsTracker.processStats(stats)
  }
}
主要是把各個task返回的寫入指標(statsPerTask)按tracker轉置後,交給對應的statsTracker匯總。目前的statsTracker實現類為BasicWriteJobStatsTracker,它把匯總結果累加到SQLMetric中,並通過SQLMetrics.postDriverMetricUpdates經listenerbus以事件的形式發佈,如下代碼:
class BasicWriteJobStatsTracker( serializableHadoopConf: SerializableConfiguration, @transient val metrics: Map[String, SQLMetric]) extends WriteJobStatsTracker { ... override def processStats(stats: Seq[WriteTaskStats]): Unit = { ... metrics(BasicWriteJobStatsTracker.NUM_FILES_KEY).add(numFiles) metrics(BasicWriteJobStatsTracker.NUM_OUTPUT_BYTES_KEY).add(totalNumBytes) metrics(BasicWriteJobStatsTracker.NUM_OUTPUT_ROWS_KEY).add(totalNumOutput) metrics(BasicWriteJobStatsTracker.NUM_PARTS_KEY).add(numPartitions) val executionId = sparkContext.getLocalProperty(SQLExecution.EXECUTION_ID_KEY) SQLMetrics.postDriverMetricUpdates(sparkContext, executionId, metrics.values.toList) } }
至此整個spark parquet write文件的數據流程我們就已經全部過了一遍了,部分細節沒有展示。 最終的數據流如下:
實例化Job對象
   |
   v
FileCommitProtocol.setupJob -> OutputCommitter.setupJob
進行作業運行前的準備,如建立臨時目錄_temporary等
   |
   v
executeTask() -> FileCommitProtocol.setupTask -> OutputCommitter.setupTask
目前為空實現
   |
   v
FileCommitProtocol.newTaskTempFile/newTaskTempFileAbsPath
返回寫任務使用的臨時文件路徑
   |
   v
dataWriter.write()
   |
   v
dataWriter.commit()
釋放資源以及返回寫入文件的指標信息 -> HadoopMapReduceCommitProtocol.commitTask
   |
   v
SparkHadoopMapRedUtil.commitTask
提交task在$PATH/_temporary下寫入的文件,以及做outputCommitCoordination
   |
   v
返回需要額外臨時目錄的信息
   |
   v
FileCommitProtocol.onTaskCommit
   |
   v
FileCommitProtocol.commitJob -> OutputCommitter.commitJob
清理$PATH/_temporary目錄,且把額外臨時目錄下的文件mv到最終path目錄下
   |
   v
processStats,處理寫入的文件指標
那對應到delta中,spark寫入delta數據是怎么寫入的呢?其實流程和以上的流程一模一樣,唯一不同的是FileCommitProtocol類的實現,直接到TransactionalWrite.writeFiles:
def writeFiles( data: Dataset[_], writeOptions: Option[DeltaOptions], isOptimize: Boolean): Seq[AddFile] = { hasWritten = true ... val committer = getCommitter(outputPath) ... FileFormatWriter.write( sparkSession = spark, plan = physicalPlan, fileFormat = snapshot.fileFormat, // TODO doesn't support changing formats. committer = committer, outputSpec = outputSpec, hadoopConf = spark.sessionState.newHadoopConfWithOptions(metadata.configuration), partitionColumns = partitioningColumns, bucketSpec = None, statsTrackers = statsTrackers, options = Map.empty) } committer.addedStatuses }
而這里的commiter為DelayedCommitProtocol,如下:
// Delta's FileCommitProtocol: job id "delta", output path = outputPath.
// NOTE(review): the third argument (None) presumably leaves randomPrefixLength unset,
// i.e. no random file prefix in newTaskTempFile — confirm against the constructor.
new DelayedCommitProtocol("delta", outputPath.toString, None)
我們來看一下DelayedCommitProtocol方法:
/**
 * Returns the path of a new output file for the task and records it in `addedFiles`.
 *
 * The file is placed directly under the output `path`, with no _temporary staging
 * directory. Its relative location is a random prefix directory if
 * `randomPrefixLength` is set; otherwise the partition directory `dir` if given;
 * otherwise `path` itself. The partition values parsed from `dir` and the relative
 * path are appended to `addedFiles`.
 */
override def newTaskTempFile(
    taskContext: TaskAttemptContext, dir: Option[String], ext: String): String = {
  val filename = getFileName(taskContext, ext)
  val partitionValues = dir.map(parsePartitions).getOrElse(Map.empty[String, String])
  val relativePath = randomPrefixLength.map { prefixLength =>
    getRandomPrefix(prefixLength) // Generate a random prefix as a first choice
  }.orElse {
    dir // or else write into the partition directory if it is partitioned
  }.map { subDir =>
    new Path(subDir, filename)
  }.getOrElse(new Path(filename)) // or directly write out to the output path

  addedFiles.append((partitionValues, relativePath.toUri.toString))
  new Path(path, relativePath).toString
}

/**
 * Turns the files recorded by newTaskTempFile into AddFile entries.
 * No file is moved. Each file is stat'ed to obtain its length and modification
 * time, and the resulting Seq[AddFile] (or Nil if nothing was written) is returned
 * in the TaskCommitMessage.
 */
override def commitTask(taskContext: TaskAttemptContext): TaskCommitMessage = {
  if (addedFiles.nonEmpty) {
    // All recorded files live under `path`, so the file system is resolved once from the first.
    val fs = new Path(path, addedFiles.head._2).getFileSystem(taskContext.getConfiguration)
    val statuses: Seq[AddFile] = addedFiles.map { f =>
      // f is (partitionValues, relative path)
      val filePath = new Path(path, new Path(new URI(f._2)))
      val stat = fs.getFileStatus(filePath)

      AddFile(f._2, f._1, stat.getLen, stat.getModificationTime, true)
    }

    new TaskCommitMessage(statuses)
  } else {
    new TaskCommitMessage(Nil)
  }
}

/**
 * Collects the AddFile entries returned by all tasks into `addedStatuses`.
 * Nothing is committed to the table here; the caller reads addedStatuses afterwards.
 */
override def commitJob(jobContext: JobContext, taskCommits: Seq[TaskCommitMessage]): Unit = {
  val fileStatuses = taskCommits.flatMap(_.obj.asInstanceOf[Seq[AddFile]]).toArray
  addedStatuses ++= fileStatuses
}
其中newTaskTempFile生成的文件中多了一個UUID.randomUUID.toString,這能減少文件的沖突
newTaskTempFile直接返回最終輸出目錄(path)下的文件路徑,而不是_temporary目錄下的路徑
commitTask不移動任何文件,只是為本task寫入的每個文件生成AddFile(包含分區值、文件大小和修改時間)並返回
commitJob并沒有真正的提交job,只是把AddFile保存到了內存中
后續我們會分析delta怎么處理AddFile,從而做到事務性
注意:在前面分析的基於FileOutputCommitter的普通寫入流程中(不是delta的DelayedCommitProtocol),task輸出的文件路徑為: ${output.dir.root}/_temporary/${appAttempt}/_temporary/${taskAttempt}/${fileName} 如:/data/_temporary/0/_temporary/attempt_20190219101232_0003_m_000005_0/part-00000-936a4f3b-8f71-48ce-960a-e60d3cf4488f.c000.snappy.parquet
以上是“spark delta寫操作ACID事務中基礎類FileFormat/FileCommitProtocol的示例分析”這篇文章的所有內容,感謝各位的閱讀!相信大家都有了一定的了解,希望分享的內容對大家有所幫助,如果還想學習更多知識,歡迎關注億速云行業資訊頻道!
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。