91超碰碰碰碰久久久久久综合_超碰av人澡人澡人澡人澡人掠_国产黄大片在线观看画质优化_txt小说免费全本

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

Delta Lake在Soul的應用實踐是怎么樣的

發布時間:2021-12-24 10:43:40 來源:億速云 閱讀:144 作者:柒染 欄目:大數據

Delta Lake在Soul的應用實踐是怎么樣的,相信很多沒有經驗的人對此束手無策,為此本文總結了問題出現的原因和解決方法,通過這篇文章希望你能解決這個問題。

一、背景介紹

(一)業務場景

傳統離線數倉模式下,日志入庫前首要階段便是ETL,Soul的埋點日志數據量龐大且需動態分區入庫,在按day分區的基礎上,每天的動態分區1200+,分區數據量大小不均,數萬條到數十億條不等。下圖為我們之前的ETL過程,埋點日志輸入Kafka,由Flume采集到HDFS,再經由天級Spark ETL任務,落表入Hive。任務凌晨開始運行,數據處理階段約1h,Load階段1h+,整體執行時間為2-3h。

Delta Lake在Soul的應用實踐是怎么樣的

(二)存在的問題

在上面的架構下,我們面臨如下問題:

1.天級ETL任務耗時久,影響下游依賴的產出時間。
2.凌晨占用資源龐大,任務高峰期搶占大量集群資源。
3.ETL任務穩定性不佳且出錯需凌晨解決、影響范圍大。

二、為什么選擇Delta?

為了解決天級ETL逐漸尖銳的問題,減少資源成本、提前數據產出,我們決定將T+1級ETL任務轉換成T+0實時日志入庫,在保證數據一致的前提下,做到數據落地即可用。
之前我們也實現了Lambda架構下離線、實時分別維護一份數據,但在實際使用中仍存在一些棘手問題,比如:無法保證事務性,小文件過多帶來的集群壓力及查詢性能等問題,最終沒能達到理想化使用。

所以這次我們選擇了近來逐漸進入大家視野的數據湖架構,數據湖的概念在此我就不過多贅述了,我理解它就是一種將元數據視為大數據的Table Format。目前主流的數據湖分別有Delta Lake(分為開源版和商業版)、Hudi、Iceberg,三者都支持了ACID語義、Upsert、Schema動態變更、Time Travel等功能,其他方面我們做些簡單的總結對比:

開源版Delta

優勢:

1.支持作為source流式讀
2.Spark3.0支持sql操作

劣勢:

1.引擎強綁定Spark
2.手動Compaction
3.Join式Merge,成本高

Hudi

優勢:

1.基于主鍵的快速Upsert/Delete
2.Copy on Write / Merge on Read 兩種merge方式,分別適配讀寫場景優化
3.自動Compaction

劣勢:

1.寫入綁定Spark/DeltaStreamer
2.API較為復雜

Iceberg

優勢:

1.可插拔引擎

劣勢:

1.調研時還在發展階段,部分功能尚未完善
2.Join式Merge,成本高

調研時期,阿里云的同學提供了EMR版本的Delta,在開源版本的基礎上進行了功能和性能上的優化,諸如:SparkSQL/Spark Streaming SQL的集成,自動同步Delta元數據信息到HiveMetaStore(MetaSync功能),自動Compaction,適配Tez、Hive、Presto等更多查詢引擎,優化查詢性能(Zorder/DataSkipping/Merge性能)等等

三、實踐過程

測試階段,我們反饋了多個EMR Delta的bug,比如:Delta表無法自動創建Hive映射表,Tez引擎無法正常讀取Delta類型的Hive表,Presto和Tez讀取Delta表數據不一致,均得到了阿里云同學的快速支持并一一解決。

引入Delta后,我們實時日志入庫架構如下所示:

Delta Lake在Soul的應用實踐是怎么樣的

數據由各端埋點上報至Kafka,通過Spark任務分鐘級以Delta的形式寫入HDFS,然后在Hive中自動化創建Delta表的映射表,即可通過Hive MR、Tez、Presto等查詢引擎直接進行數據查詢及分析。

我們基于Spark,封裝了通用化ETL工具,實現了配置化接入,用戶無需寫代碼即可實現源數據到Hive的整體流程接入。并且,為了更加適配業務場景,我們在封裝層實現了多種實用功能:

1. 實現了類似Iceberg的hidden partition功能,用戶可選擇某些列做適當變化形成一個新的列,此列可作為分區列,也可作為新增列,使用SparkSql操作。如:有日期列date,那么可以通過 'substr(date,1,4) as year' 生成新列,并可以作為分區。
2. 為避免臟數據導致分區出錯,實現了對動態分區的正則檢測功能,比如:Hive中不支持中文分區,用戶可以對動態分區加上'\w+'的正則檢測,分區字段不符合的臟數據則會被過濾。
3. 實現自定義事件時間字段功能,用戶可選數據中的任意時間字段作為事件時間落入對應分區,避免數據漂移問題。
4. 嵌套Json自定義層數解析,我們的日志數據大都為Json格式,其中難免有很多嵌套Json,此功能支持用戶選擇對嵌套Json的解析層數,嵌套字段也會被以單列的形式落入表中。
5. 實現SQL化自定義配置動態分區的功能,解決埋點數據傾斜導致的實時任務性能問題,優化資源使用,此場景后面會詳細介紹。

平臺化建設:我們已經把日志接入Hive的整體流程嵌入了Soul的數據平臺中,用戶可通過此平臺申請日志接入,由審批人員審批后進行相應參數配置,即可將日志實時接入Hive表中,簡單易用,降低操作成本。

Delta Lake在Soul的應用實踐是怎么樣的

為了解決小文件過多的問題,EMR Delta實現了Optimize/Vacuum語法,可以定期對Delta表執行Optimize語法進行小文件的合并,執行Vacuum語法對過期文件進行清理,使HDFS上的文件保持合適的大小及數量。值得一提的是,EMR Delta目前也實現了一些auto-compaction的策略,可以通過配置來自動觸發compaction,比如:小文件數量達到一定值時,在流式作業階段啟動minor compaction任務,在對實時任務影響較小的情況下,達到合并小文件的目的。

四、問題 & 方案

接下來介紹一下我們在落地Delta的過程中遇到過的問題

(一)埋點數據動態分區數據量分布不均導致的數據傾斜問題

Soul的埋點數據是落入分區寬表中的,按埋點類型分區,不同類型的埋點數據量分布不均,例如:通過Spark寫入Delta的過程中,5min為一個Batch,大部分類型的埋點,5min的數據量很小(10M以下),但少量埋點數據量卻在5min能達到1G或更多。數據落地時,我們假設DataFrame有M個partition,表有N個動態分區,每個partition中的數據都是均勻且混亂的,那么每個partition中都會生成N個文件分別對應N個動態分區,那么每個Batch就會生成M*N個小文件。

Delta Lake在Soul的應用實踐是怎么樣的

為了解決上述問題,數據落地前對DataFrame按動態分區字段repartition,這樣就能保證每個partition中分別有不同分區的數據,這樣每個Batch就只會生成N個文件,即每個動態分區一個文件,這樣解決了小文件膨脹的問題。但與此同時,有幾個數據量過大的分區的數據也會只分布在一個partition中,就導致了某幾個partition數據傾斜,且這些分區每個Batch產生的文件過大等問題。

解決方案:如下圖,我們實現了用戶通過SQL自定義配置repartition列的功能,簡單來說,用戶可以使用SQL,把數據量過大的幾個埋點,通過加鹽方式打散到多個partition,對于數據量正常的埋點則無需操作。通過此方案,我們把Spark任務中每個Batch執行最慢的partition的執行時間從3min提升到了40s,解決了文件過小或過大的問題,以及數據傾斜導致的性能問題。

Delta Lake在Soul的應用實踐是怎么樣的

(二)應用層基于元數據的動態schema變更

數據湖支持了動態schema變更,但在Spark寫入之前,構造DataFrame時,是需要獲取數據schema的,如果此時無法動態變更,那么便無法把新字段寫入Delta表,Delta的動態schena便也成了擺設。埋點數據由于類型不同,每條埋點數據的字段并不完全相同,那么在落表時,必須取所有數據的字段并集,作為Delta表的schema,這就需要我們在構建DataFrame時便能感知是否有新增字段。

解決方案:我們額外設計了一套元數據,在Spark構建DataFrame時,首先根據此元數據判斷是否有新增字段,如有,就把新增字段更新至元數據,以此元數據為schema構建DataFrame,就能保證我們在應用層動態感知schema變更,配合Delta的動態schema變更,新字段自動寫入Delta表,并把變化同步到對應的Hive表中。

(三)Spark Kafka偏移量提交機制導致的數據重復

我們在使用Spark Streaming時,會在數據處理完成后將消費者偏移量提交至Kafka,調用的是
spark-streaming-kafka-0-10中的commitAsync API。我一直處于一個誤區,以為數據在處理完成后便會提交當前Batch消費偏移量。但后來遇到Delta表有數據重復現象,排查發現偏移量提交時機為下一個Batch開始時,并不是當前Batch數據處理完成后就提交。那么問題來了:假如一個批次5min,在3min時數據處理完成,此時成功將數據寫入Delta表,但偏移量卻在5min后(第二個批次開始時)才成功提交,如果在3min-5min這個時間段中,重啟任務,那么就會重復消費當前批次的數據,造成數據重復。

解決方案:

1.StructStreaming支持了對Delta的exactly-once,可以使用StructStreaming適配解決。
2.可以通過其他方式維護消費偏移量解決。

(四)查詢時解析元數據耗時較多

因為Delta單獨維護了自己的元數據,在使用外部查詢引擎查詢時,需要先解析元數據以獲取數據文件信息。隨著Delta表的數據增長,元數據也逐漸增大,此操作耗時也逐漸變長。
解決方案:阿里云同學也在不斷優化查詢方案,通過緩存等方式盡量減少對元數據的解析成本。

(五)關于CDC場景

目前我們基于Delta實現的是日志的Append場景,還有另外一種經典業務場景CDC場景。Delta本身是支持Update/Delete的,是可以應用在CDC場景中的。但是基于我們的業務考量,暫時沒有將Delta使用在CDC場景下,原因是Delta表的Update/Delete方式是Join式的Merge方式,我們的業務表數據量比較大,更新頻繁,并且更新數據涉及的分區較廣泛,在Merge上可能存在性能問題。
阿里云的同學也在持續在做Merge的性能優化,比如Join的分區裁剪、Bloomfilter等,能有效減少Join時的文件數量,尤其對于分區集中的數據更新,性能更有大幅提升,后續我們也會嘗試將Delta應用在CDC場景。

五、后續計劃

1.基于Delta Lake,進一步打造優化實時數倉結構,提升部分業務指標實時性,滿足更多更實時的業務需求。
2.打通我們內部的元數據平臺,實現日志接入->實時入庫->元數據+血緣關系一體化、規范化管理。
3.持續觀察優化Delta表查詢計算性能,嘗試使用Delta的更多功能,比如Z-Ordering,提升在即席查詢及數據分析場景下的性能。

看完上述內容,你們掌握Delta Lake在Soul的應用實踐是怎么樣的的方法了嗎?如果還想學到更多技能或想了解更多相關內容,歡迎關注億速云行業資訊頻道,感謝各位的閱讀!

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

通化县| 肥城市| 舟山市| 德兴市| 佛冈县| 高阳县| 鄯善县| 澎湖县| 崇明县| 西吉县| 饶河县| 彭州市| 阜平县| 兖州市| 亳州市| 临桂县| 玛多县| 娄烦县| 徐水县| 普洱| 松潘县| 关岭| 呼图壁县| 顺平县| 互助| 靖边县| 云浮市| 涡阳县| 台东市| 中山市| 卓资县| 平昌县| 庆元县| 兴业县| 元江| 杭锦后旗| 东莞市| 嘉义市| 鄂伦春自治旗| 梅河口市| 万年县|