您好,登錄后才能下訂單哦!
這篇文章將為大家詳細講解有關如何理解數據湖技術中的Apache Hudi,文章內容質量較高,因此小編分享給大家做個參考,希望大家閱讀完這篇文章后對相關知識有一定的了解。
隨著Apache Parquet和Apache ORC等存儲格式以及Presto和Apache Impala等查詢引擎的發展,Hadoop生態系統有潛力作為面向分鐘級延時場景的通用統一服務層。然而,為了實現這一點,這需要在HDFS中實現高效且低延遲的數據攝取及數據準備。
為了解決這個問題,優步開發了Hudi項目,這是一個增量處理框架,高效和低延遲地為所有業務關鍵數據鏈路提供有力支持。事實上,Uber已經將Hudi開源。在深入的了解Hudi之前,我們首先討論一下為什么將Hadoop作為統一的服務層是一個不錯的想法。
Lambda架構是一種常見的數據處理體系結構,它的數據的處理依賴流式計算層(Streaming Layer)和批處理計算層(Batch Layer)的雙重計算。每隔幾個小時,批處理過程被啟動以計算精確的業務狀態,并將批量更新加載到服務層(Serving Layer)。同時,為了消除上述幾個小時的等待時間我們會在流式計算層對這個業務數據進行實時的狀態更新。然而,這個流計算的狀態只是一個最終結果的近似值,最終需要被批處理的計算結果所覆蓋。由于兩種模式提供的狀態差異,我們需要為批處理和流處理提供不同的服務層,并在這個上面再做合并抽象,或者設計應用一個相當復雜的服務系統(如Druid),用于同時在行級更新和批量加載中提供優異表現。
Lambda架構需要雙重計算和雙重服務
對于是否需要一個額外單獨的批處理層,Kappa架構認為一個單獨的流式計算層足以成為數據處理的通用解決方案。廣義上,所有數據計算都可以描述為生產者生產一個數據流,而消費者不斷的逐條迭代消費這個流中的記錄,如火山模型(Volcano Iterator model)。這就意味著流式計算層可以依靠堆資源以增加并行能力的方式來對業務狀態進行重算更新。這類系統可以依靠有效的檢查點(checkpoint)和大量的狀態管理來讓流式處理的結果不再只是一個近似值。這個模型被應用于很多的數據攝取任務。盡管如此,雖然批處理層在這個模型中被去掉了,但是在服務層仍然存在兩個問題。
如今很多流式處理引擎都支持行級的數據處理,這就要求我們的服務層也需要能夠支持行級更新的能力。通常,這類系統并不能對分析類的查詢掃描優化到這個地步,除非我們在內存中緩存大量記錄(如Memsql)或者有強大的索引支持(如ElasticSearch)。這些系統為了獲得數據攝取和掃描的性能往往需要增加成本和犧牲服務的可擴展性。出于這個原因,這類服務系統的數據駐留的能力往往是有限的,從時間上可能30~90天,從總量上來說幾個TB的數據就是他們的極限了。對于歷史數據的分析又會被重新定向到時延要求不那么高的HDFS上。
Kappa架構統一了處理層,但服務復雜性仍然存在
對于數據攝取延時、掃描性能和計算資源和操作復雜性的權衡是無法避免的。但是如果我們的業務場景對時延的要求并不是那么的高,比如能接受10分鐘左右的延遲,在我們如果有路子可以在HDFS上快速的進行數據攝取和數據準備的基礎上,服務層中的Speed Serving就不必要了。這么做可以統一服務層,大大降低系統整體的復雜度和資源消耗。
要將HDFS用作統一的服務層,我們不但需要使它支持存儲變更日志(或者叫日志記錄系統),而且需要支持根據實際業務維度來分區、壓縮、去重的業務狀態管理。這類統一服務層需具備如下幾個特性:
大型HDFS數據集的快速變更能力
數據存儲需要針對分析類掃描進行優化(列存)
有效的連接和將更新傳播到上層建模數據集的能力
被壓縮的業務狀態變更是無法避免的,即使我們以事件時間(Event time)作為業務分區字段。由于遲到數據和事件時間和處理時間(Processing time)的不一致,在數據攝取場景中我們依然需要對老的分區進行必要的更新操作。最后就算我們把處理時間作為分區字段,依然存在一些需要進行更新的場景,比如由于安全、審計方面的原因對原數據進行校正的需求。
作為一個增量處理框架,我們的Hudi支持前面章節中所述的所有需求。一言以蔽之,Hudi是一種針對分析型業務的、掃描優化的數據存儲抽象,它能夠使HDFS數據集在分鐘級的時延內支持變更,也支持下游系統對這個數據集的增量處理。
Hudi數據集通過自定義的InputFormat
兼容當前Hadoop生態系統,包括Apache Hive,Apache Parquet,Presto和Apache Spark,使得終端用戶可以無縫的對接。
基于Hudi簡化的服務架構,分鐘級延時
該數據流模型通過時延和數據完整性保證兩個維度去權衡以構建數據管道。下圖所示的是Uber Engineering如何根據這兩個維度進行處理方式的劃分。
Uber在不同延遲和完整性級別上的用例分布
對于很少一些需要真正做到約1分鐘的延時的用例及簡單業務指標的展示應用,我們基于行級的流式處理。對于傳統的機器學習和實驗有效性分析用例,我們選擇更加擅長較重計算的批處理。對于包含復雜連接或者重要數據處理的近實時場景,我們基于Hudi以及它的增量處理原語來獲得兩全其美的結果。想要了解Uber使用Hudi的更多用例和場景,可以去他們的Githup文檔(https://uber.github.io/hudi/use_cases.html)里面看一下。
Hudi數據集的組織目錄結構與Hive表示非常相似,一份數據集對應這一個根目錄。數據集被打散為多個分區,分區字段以文件夾形式存在,該文件夾包含該分區的所有文件。在根目錄下,每個分區都有唯一的分區路徑。每個分區記錄分布于多個文件中。每個文件都有惟一的fileId
和生成文件的commit
所標識。如果發生更新操作時,多個文件共享相同的fileId,但會有不同的commit
。
每條記錄由記錄的key值進行標識并映射到一個fileId。一條記錄的key與fileId之間的映射一旦在第一個版本寫入該文件時就是永久確定的。換言之,一個fileId標識的是一組文件,每個文件包含一組特定的記錄,不同文件之間的相同記錄通過版本號區分。
Hudi Storage由三個不同部分組成:
Metadata - 以時間軸(timeline)的形式將數據集上的各項操作元數據維護起來,以支持數據集的瞬態視圖,這部分元數據存儲于根目錄下的元數據目錄。一共有三種類型的元數據:
Commits - 一個單獨的commit包含對數據集之上一批數據的一次原子寫入操作的相關信息。我們用單調遞增的時間戳來標識commits,標定的是一次寫入操作的開始。
Cleans - 用于清除數據集中不再被查詢所用到的舊版本文件的后臺活動。
Compactions - 用于協調Hudi內部的數據結構差異的后臺活動。例如,將更新操作由基于行存的日志文件歸集到列存數據上。
Index - Hudi維護著一個索引,以支持在記錄key存在情況下,將新記錄的key快速映射到對應的fileId。索引的實現是插件式的,
Bloom filter - 存儲于數據文件頁腳。默認選項,不依賴外部系統實現。數據和索引始終保持一致。
Apache HBase - 可高效查找一小批key。在索引標記期間,此選項可能快幾秒鐘。
Data - Hudi以兩種不同的存儲格式存儲所有攝取的數據。這塊的設計也是插件式的,用戶可選擇滿足下列條件的任意數據格式:
讀優化的列存格式(ROFormat)。缺省值為Apache Parquet
寫優化的行存格式(WOFormat)。缺省值為Apache Avro
Hudi存儲內核。
Hudi對HDFS的使用模式進行了優化。Compaction是將數據從寫優化格式轉換為讀優化格式的關鍵操作。Compaction操作的基本并行單位是對一個fileID的重寫,Hudi保證所有的數據文件的大小和HDFS的塊大小對齊,這樣可以使Compaction操作的并行度、查詢的并行度和HDFS文件總數間取得平衡。Compaction操作也是插件式的,可以擴展為合并不頻繁更新的老的數據文件已進一步減少文件總數。
Hudi是一個Spark的第三方庫,以Spark Streaming的方式運行數據攝取作業,這些作業一般建議以1~2分鐘左右的微批(micro-batch)進行處理。當然,在權衡自己業務在時延要求和資源層面的前提下,我們也可以用Apache Oozie或者Apache Airflow來進行離線作業周期性調度。
在默認配置下,Hudi使用一下寫入路徑:
Hudi從相關的分區下的parquet文件中加載BloomFilter索引,并通過傳入key值映射到對應的文件來標記是更新還是插入。此處的連接操作可能由于輸入數據的大小,分區的分布或者單個分區下的文件數問題導致數據傾斜。通過對連接字段進行范圍分區以及新建子分區的方式處理,以避免Spark某些低版本中處理Shuffle文件時的2GB限制的問題 - https://issues.apache.org/jira/browse/SPARK-6190。
Hudi按分區對insert
進行分組,分配一個fileId,然后對相應的日志文件進行append操作,知道文件大小達到HDSF塊大小。然后,新的fileId生成,重復上述過程,直到所有的數據都被插入。
一個有時間限制compaction操作會被后臺以幾分鐘為周期調度起來,生成一個compactions的優先級列表,并壓縮一個fileId包含的所有avro文件以生成進行當前parquet文件的下一個版本。
Compaction操作是異步的,鎖定幾個特定的日志版本進行壓縮,并以新的日志記錄更新到對應fileId中。鎖維護在Zookeeper中。
Compaction操作的優先級順序由被壓縮的日志數據大小決定,并基于一個Compaction策略可配置。每一輪壓縮迭代過程中,大文件優先被壓縮,因為重寫parquet文件的開銷并不會根據文件的更新次數進行分攤。
Hudi在針對一個fileId進行更新操作時,如果對應的日志文件存在則append,反之,會新建日志文件。
如果數據攝取作業成功,一個commit
記錄會在Hudi的元數據時間軸中記錄,即將inflight文件重命名為commit文件,并將分區和所創建fileId版本的詳細信息記錄下來。
如上所述,Hudi會努力將文件大小和HDFS底層塊大小對齊。取決于一個分區下數據的總量和列存的壓縮效果,compaction操作依然能夠創建parquet小文件。因為對分區的插入操作會是以對現有小文件的更新來進行的,所有這些小文件的問題最終會被一次次的迭代不斷修正。最終,文件大小會不斷增長直到與HDFS塊大小一致。
首先,Spark的本身的重試機制會cover一些間歇性的異常,當然如果超過了重試次數的閾值,我們的整個作業都會失敗。下一次的迭代作業會在同一批次數據上進行重試。以下列出兩個重要的區別:
攝取失敗可能在日志文件中生成包含部分數據的avro塊 - 這個問題通過在commit
元數據中存儲對應數據塊的起始偏移量和日志文件版本來解決。當讀取日志文件時,偶爾發生的部分寫入的數據塊會被跳過,且會從正確的位置開始讀取avro文件。
Compaction過程失敗會生產包含部分數據的parquet文件 - 這個問題在查詢階段被解決,通過commit
元數據進行文件版本的過濾。查詢階段只會讀取最新的完成的compaction后的文件。這些失敗的compaction文件會在下一個compaction周期被回滾。
commit
時間軸元數據可以讓我們在同一份HDFS數據上同時享有讀取優化的視圖和實時視圖。客戶端可以基于延遲要求和查詢性能決定使用哪種視圖。Hudi以自定義的InputFormat
和一個Hive注冊模塊來提供這兩種視圖,后者可以將這兩種視圖注冊為Hive Metastore表。這兩種輸入格式都可以識別fileId和commit
時間,可以篩選并讀取最新提交的文件。然后,Hudi會基于這些數據文件生成輸入分片供查詢使用。
InputFormat
的具體信息如下:
HoodieReadOptimizedInputFormat - 提供掃描優化的視圖,篩選所有的日志文件并獲取最新版本的parquet壓縮文件
HoodieRealtimeInputFormat - 提供一個實時的視圖,除了會獲取最新的parquet壓縮文件之外,還提供一個RecordReader
以合并與parquet文件相關的日志文件。
這兩類InputFormat
都擴展了MapredParquetInputFormat
和VectorizedParquetRecordReader
,因此所有針對parquet文件的優化依然被保留。依賴于hoodie-hadoop-mr
類庫,Presto和Spark SQL可以對Hudi格式的Hive Metastore表做到開箱即用。
Hudi篩選出最新版本,在提供記錄之前將他們與日志文件合并
前面提到過,數據模型表需要在HDFS中處理和提供,才能使的HDFS算的上是一個統一的服務層。構建低延時的數據模型表需要能夠鏈接HDFS數據集記性增量處理。由于Hudi在元數據中維護了每次提交的提交時間以及對應的文件版本,使得我們可以基于起始時間戳和結束時間戳從特定的Hudi數據集中提取增量的變更數據集。
這個過程基本上與普通的查詢大致相同,只是選取特定時間范圍內的文件版本進行讀取而不是選最新的,提交時間會最為過濾條件被謂詞下推到文件掃描階段。這個增量結果集也收到文件自動清理的影響,如果某些時間范圍內的文件被自動清理掉了,那自然也是不能被訪問到了。
這樣我們就可以基于watermark做雙流join和流與靜態數據的join以對存儲在HDFS中的數據模型表計算和upsert
。
基于Hudi增量計算的建模過程
關于如何理解數據湖技術中的Apache Hudi就分享到這里了,希望以上內容可以對大家有一定的幫助,可以學到更多知識。如果覺得文章不錯,可以把它分享出去讓更多的人看到。
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。