您好,登錄后才能下訂單哦!
本文小編為大家詳細介紹“Apache Hudi異步Clustering部署操作的方法”,內容詳細,步驟清晰,細節處理妥當,希望這篇“Apache Hudi異步Clustering部署操作的方法”文章能幫助大家解決疑惑,下面跟著小編的思路慢慢深入,一起來學習新知識吧。
Clustering(聚簇)的表服務來重新組織數據來提供更好的查詢性能,而不用降低攝取速度,并且我們已經知道如何部署同步Clustering
,本篇博客中,我們將討論近期社區做的一些改進以及如何通過HoodieClusteringJob
和DeltaStreamer
工具來部署異步Clustering。
通常講,Clustering
根據可配置的策略創建一個計劃,根據特定規則對符合條件的文件進行分組,然后執行該計劃。Hudi支持并發寫入,并在多個表服務之間提供快照隔離,從而允許寫入程序在后臺運行Clustering
時繼續攝取。有關Clustering
的體系結構的更詳細概述請查看上一篇博文。
如前所述Clustering
計劃和執行取決于可插拔的配置策略。這些策略大致可分為三類:計劃策略、執行策略和更新策略。
該策略在創建Clustering計劃時發揮作用。它有助于決定應該對哪些文件組進行Clustering。讓我們看一下Hudi提供的不同計劃策略。請注意,使用此配置可以輕松地插拔這些策略。
SparkSizeBasedClusteringPlanStrategy:根據基本文件的小文件限制選擇文件切片并創建Clustering
組,最大大小為每個組允許的最大文件大小。可以使用此配置指定最大大小。此策略對于將中等大小的文件合并成大文件非常有用,以減少跨冷分區分布的大量文件。
SparkRecentDaysClusteringPlanStrategy:根據以前的N
天分區創建一個計劃,將這些分區中的小文件片進行Clustering
,這是默認策略,當工作負載是可預測的并且數據是按時間劃分時,它可能很有用。
SparkSelectedPartitionsClusteringPlanStrategy:如果只想對某個范圍內的特定分區進行Clustering
,那么無論這些分區是新分區還是舊分區,此策略都很有用,要使用此策略,還需要在下面設置兩個配置(包括開始和結束分區):
hoodie.clustering.plan.strategy.cluster.begin.partition hoodie.clustering.plan.strategy.cluster.end.partition
注意:所有策略都是分區感知的,后兩種策略仍然受到第一種策略的大小限制的約束。
在計劃階段構建Clustering
組后,Hudi主要根據排序列和大小為每個組應用執行策略,可以使用此配置指定策略。
SparkSortAndSizeExecutionStrategy
是默認策略。使用此配置進行Clustering
時,用戶可以指定數據排序列。除此之外我們還可以為Clustering
產生的Parquet文件設置最大文件大小。該策略使用bulk_insert
將數據寫入新文件,在這種情況下,Hudi隱式使用一個分區器,該分區器根據指定列進行排序。通過這種策略改變數據布局,不僅提高了查詢性能,而且自動平衡了重寫開銷。
現在該策略可以作為單個Spark作業或多個作業執行,具體取決于在計劃階段創建的Clustering
組的數量。默認情況下Hudi將提交多個Spark作業并合并結果。如果要強制Hudi使用單Spark作業,請將執行策略類配置設置為SingleSparkJobExecutionStrategy
。
目前只能為未接收任何并發更新的表/分區調度Clustering
。默認情況下更新策略的配置設置為SparkRejectUpdateStrategy
。如果某個文件組在Clustering
期間有更新,則它將拒絕更新并引發異常。然而在某些用例中,更新是非常稀疏的,并且不涉及大多數文件組。簡單拒絕更新的默認策略似乎不公平。在這種用例中用戶可以將配置設置為SparkAllowUpdateStregy
。
我們討論了關鍵策略配置,下面列出了與Clustering
相關的所有其他配置。在此列表中一些非常有用的配置包括:
配置項 | 解釋 | 默認值 |
---|---|---|
hoodie.clustering.async.enabled | 啟用在表上的異步運行Clustering服務。 | false |
hoodie.clustering.async.max.commits | 通過指定應觸發多少次提交來控制異步Clustering的頻率。 | 4 |
hoodie.clustering.preserve.commit.metadata | 重寫數據時保留現有的_hoodie_commit_time。這意味著用戶可以在Clustering數據上運行增量查詢,而不會產生任何副作用。 | false |
之前我們已經了解了用戶如何設置同步Clustering。此外用戶可以利用HoodiecClusteringJob設置兩步異步Clustering。
隨著Hudi版本0.9.0的發布,我們可以在同一步驟中調度和執行Clustering
。我們只需要指定-mode
或-m
選項。有如下三種模式:
schedule
(調度):制定一個Clustering計劃。這提供了一個可以在執行模式下傳遞的instant
。
execute
(執行):在給定的instant
執行Clustering計劃,這意味著這里需要instant
。
scheduleAndExecute
(調度并執行):首先制定Clustering計劃并立即執行該計劃。
請注意要在原始寫入程序仍在運行時運行作業請啟用多寫入:
hoodie.write.concurrency.mode=optimistic_concurrency_control hoodie.write.lock.provider=org.apache.hudi.client.transaction.lock.ZookeeperBasedLockProvider
使用spark submit
命令提交HoodieClusteringJob
示例如下:
spark-submit \ --class org.apache.hudi.utilities.HoodieClusteringJob \ /path/to/hudi-utilities-bundle/target/hudi-utilities-bundle_2.12-0.9.0-SNAPSHOT.jar \ --props /path/to/config/clusteringjob.properties \ --mode scheduleAndExecute \ --base-path /path/to/hudi_table/basePath \ --table-name hudi_table_schedule_clustering \ --spark-memory 1g
clusteringjob.properties
配置文件示例如下
hoodie.clustering.async.enabled=true hoodie.clustering.async.max.commits=4 hoodie.clustering.plan.strategy.target.file.max.bytes=1073741824 hoodie.clustering.plan.strategy.small.file.limit=629145600 hoodie.clustering.execution.strategy.class=org.apache.hudi.client.clustering.run.strategy.SparkSortAndSizeExecutionStrategy hoodie.clustering.plan.strategy.sort.columns=column1,column2
接著看下如何使用HudiDeltaStreamer
。現在我們可以使用DeltaStreamer
觸發異步Clustering。只需將hoodie.clustering.async.enabled為true
,并在屬性文件中指定其他Clustering配置,在啟動Deltastreamer
時可以將其位置設為-props
(與HoodieClusteringJob
配置類似)。
使用spark submit
命令提交HoodieDeltaStreamer
示例如下:
spark-submit \ --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer \ /path/to/hudi-utilities-bundle/target/hudi-utilities-bundle_2.12-0.9.0-SNAPSHOT.jar \ --props /path/to/config/clustering_kafka.properties \ --schemaprovider-class org.apache.hudi.utilities.schema.SchemaRegistryProvider \ --source-class org.apache.hudi.utilities.sources.AvroKafkaSource \ --source-ordering-field impresssiontime \ --table-type COPY_ON_WRITE \ --target-base-path /path/to/hudi_table/basePath \ --target-table impressions_cow_cluster \ --op INSERT \ --hoodie-conf hoodie.clustering.async.enabled=true \ --continuous
我們還可以使用Spark結構化流啟用異步Clustering,如下所示。
val commonOpts = Map( "hoodie.insert.shuffle.parallelism" -> "4", "hoodie.upsert.shuffle.parallelism" -> "4", DataSourceWriteOptions.RECORDKEY_FIELD.key -> "_row_key", DataSourceWriteOptions.PARTITIONPATH_FIELD.key -> "partition", DataSourceWriteOptions.PRECOMBINE_FIELD.key -> "timestamp", HoodieWriteConfig.TBL_NAME.key -> "hoodie_test" ) def getAsyncClusteringOpts(isAsyncClustering: String, clusteringNumCommit: String, executionStrategy: String):Map[String, String] = { commonOpts + (DataSourceWriteOptions.ASYNC_CLUSTERING_ENABLE.key -> isAsyncClustering, HoodieClusteringConfig.ASYNC_CLUSTERING_MAX_COMMITS.key -> clusteringNumCommit, HoodieClusteringConfig.EXECUTION_STRATEGY_CLASS_NAME.key -> executionStrategy ) } def initStreamingWriteFuture(hudiOptions: Map[String, String]): Future[Unit] = { val streamingInput = // define the source of streaming Future { println("streaming starting") streamingInput .writeStream .format("org.apache.hudi") .options(hudiOptions) .option("checkpointLocation", basePath + "/checkpoint") .mode(Append) .start() .awaitTermination(10000) println("streaming ends") } } def structuredStreamingWithClustering(): Unit = { val df = //generate data frame val hudiOptions = getClusteringOpts("true", "1", "org.apache.hudi.client.clustering.run.strategy.SparkSortAndSizeExecutionStrategy") val f1 = initStreamingWriteFuture(hudiOptions) Await.result(f1, Duration.Inf) }
讀到這里,這篇“Apache Hudi異步Clustering部署操作的方法”文章已經介紹完畢,想要掌握這篇文章的知識點還需要大家自己動手實踐使用過才能領會,如果想了解更多相關內容的文章,歡迎關注億速云行業資訊頻道。
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。