您好,登錄后才能下訂單哦!
小編給大家分享一下Flink 1.11與Hive批流一體數倉的示例分析,相信大部分人都還不怎么了解,因此分享這篇文章給大家參考一下,希望大家閱讀完這篇文章后大有收獲,下面讓我們一起去了解一下吧!
為什么要做 Flink 和 Hive 集成的功能呢?最早的初衷是我們希望挖掘 Flink 在批處理方面的能力。眾所周知,Flink 在流計算方面已經是成功的引擎了,使用的用戶也非常多。在 Flink 的設計理念當中,批計算是流處理中的一個特例。也就意味著,如果 Flink 在流計算方面做好,其實它的架構也能很好的支持批計算的場景。在批計算的場景中,SQL 是一個很重要的切入點。因為做數據分析的同學,他們更習慣使用SQL 進行開發,而不是去寫 DataStream 或者 DataSet 這樣的程序。
Hadoop 生態圈的 SQL 引擎,Hive 是一個事實上的標準。大部分的用戶環境中都會使用到了 Hive 的一些功能,來搭建數倉。一些比較新的 SQL 的引擎,例如 Spark SQL、Impala ,它們其實都提供了與 Hive 集成的能力。為了方便的能夠對接上目前用戶已有的使用場景,所以我們認為對 Flink 而言,對接 Hive 也是不可缺少的功能。
因此,我們在 Flink 1.9 當中,就開始提供了與 Hive 集成的功能。當然在 1.9 版本里面,這個功能是作為試用版發布的。到了 Flink 1.10 版本,與 Hive 集成的功能就達到了生產可用。同時在 Flink 1.10 發布的時候,我們用 10TB 的 TPC-DS 測試集,對 Flink 和 Hive on MapReduce 進行了對比,對比結果如下:
藍色的方框表示 Flink 用的時間,桔紅色的方框表示 Hive on MapReduce 用的時間。最終的結果是 Flink 對于 Hive on MapReduce 大概提升了 7 倍左右的性能。所以驗證了 Flink SQL 可以很好的支持批計算的場景。
接下來介紹下 Flink 對接 Hive 的設計架構。對接 Hive 的時候需要幾個層面,分別是:
能夠訪問 Hive 的元數據;
讀寫 Hive 表數據;
Production Ready ;
1. 訪問 Hive 元數據
使用過 Hive 的同學應該都知道,Hive 的元數據是通過 Hive Metastore 來管理的。所以意味著 Flink 需要打通與 Hive Metastore 的通信。為了更好的訪問 Hive 元數據,在 Flink 這邊是提出了一套全新設計的 Catalog API 。
這個全新的接口是一個通用化的設計。它并不只是為了對接 Hive 元數據,理論上是它可以對接不同外部系統的元數據。
而且在一個 Flink Session 當中,是可以創建多個 Catalog ,每一個 Catalog 對應于一個外部系統。用戶可以在 Flink Table API 或者如果使用的是 SQL Client 的話,可以在 Yaml 文件里指定定義哪些 Catalog 。然后在 SQL Client 創建 TableEnvironment 的時候,就會把這些 Catalog 加載起來。TableEnvironment 通過CatalogManager 來管理這些不同的 Catalog 的實例。這樣 SQL Client 在后續的提交 SQL 語句的過程中,就可以使用這些 Catalog 去訪問外部系統的元數據了。
上面這張圖里列出了 2 個 Catalog 的實現。一個是 GenericlnMemoryCatalog ,把所有的元數據都保存在 Flink Client 端的內存里。它的行為是類似于 Catalog 接口出現之前 Flink 的行為。也就是所有的元數據的生命周期跟 SQL Client 的 Session 周期是一樣的。當 Session 結束,在 Session 里面創建的元數據也就自動的丟失了。
另一個是對接 Hive 著重介紹的 HiveCatalog 。HiveCatalog 背后對接的是 Hive Metastore 的實例,要與 Hive Metastore 進行通信來做元數據的讀寫。為了支持多個版本的 Hive,不同版本的 Hive Metastore 的API可能存在不兼容。所以在 HiveCatalog 和 Hive Metastore 之間又加了一個 HiveShim ,通過 HiveShim 可以支持不同版本的 Hive 。
這里的 HiveCatalog 一方面可以讓 Flink 去訪問 Hive 自身有的元數據,另一方面它也為 Flink 提供了持久化元數據的能力。也就是 HiveCatalog 既可以用來存儲 Hive的元數據,也可以存 Flink 使用的元數據。例如,在 Flink 中創建一張 Kafka 的表,那么這張表也是可以存到 HiveCatalog 里的。這樣也就是為 Flink 提供了持久化元數據的能力。在沒有 HiveCatalog 之前,是沒有持久化能力的。
2. 讀寫 Hive 表數據
有了訪問 Hive 元數據的能力后,另一個重要的方面是讀寫 Hive 表數據。Hive 的表是存在 Hadoop 的 file system 里的,這個 file system 是一個 HDFS ,也可能是其他文件系統。只要是實現了 Hadoop 的 file system 接口的,理論上都可以存儲Hive 的表。
在 Flink 當中:
讀數據時實現了 HiveTableSource
寫數據時實現了 HiveTableSink
而且設計的一個原則是:希望盡可能去復用 Hive 原有的 Input/Output Format、SerDe 等,來讀寫 Hive 的數據。這樣做的好處主要是 2 點,一個是復用可以減少開發的工作量。另一個是復用好處是盡可能與 Hive 保證寫入數據的兼容性。目標是Flink 寫入的數據,Hive 必須可以正常的讀取。反之, Hive 寫入的數據,Flink 也可以正常讀取。
3. Production Ready
在 Flink 1.10 中,對接 Hive 的功能已經實現了 Production Ready 。實現 Production Ready 主要是認為在功能上已經完備了。具體實現的功能如下:
下面將介紹下,在 Flink 1.11 版本中,對接 Hive 的一些新特性。
1. 簡化的依賴管理
首先做的是簡化使用 Hive connector 的依賴管理。Hive connector 的一個痛點是需要添加若干個 jar 包的依賴,而且使用的 Hive 版本的不同,所需添加的 jar 包就不同。例如下圖:
第一張圖是使用的 Hive 1.0.0 版本需要添加的 jar 包。第二張圖是用 Hive 2.2.0 版本需要添加的 jar 包。可以看出,不管是從 jar 包的個數、版本等,不同 Hive 版本添加的 jar 包是不一樣的。所以如果不仔細去讀文檔的話,就很容易導致用戶添加的依賴錯誤。一旦添加錯誤,例如添加少了或者版本不對,那么會報出來一些比較奇怪、難理解的錯誤。這也是用戶在使用 Hive connector 時暴露最多的問題之一。
所以我們希望能簡化依賴管理,給用戶提供更好的體驗。具體的做法是,在 Flink 1.11 版本中開始,會提供一些預先打好的 Hive 依賴包:
用戶可以根據自己的 Hive 版本,選擇對應的依賴包就可以了。
如果用戶使用的 Hive 并不是開源版本的 Hive ,用戶還是可以使用 1.10 那種方式,去自己添加單個 jar 包。
2. Hive Dialect 的增強
在 Flink 1.10 就引入了 Hive Dialect ,但是很少有人使用,因為這個版本的 Hive Dialect 功能比較弱。僅僅的一個功能是:是否允許創建分區表的開關。就是如果設置了 Hive Dialect ,那就可以在 Flink SQL 中創建分區表。如果沒設置,則不允許創建。
另一個關鍵的是它不提供 Hive 語法的兼容。如果設置了 Hive Dialect 并可以創建分區表,但是創建分區表的 DDL 并不是 Hive 的語法。
在 Flink 1.11 中著重對 Hive Dialect 的功能進行了增強。增強的目標是:希望用戶在使用 Flink SQL Client 的時候,能夠獲得與使用 Hive CLI 或 Beeline 近似的使用體驗。就是在使用 Flink SQL Client 中,可以去寫一些 Hive 特定的一些語法。或者說用戶在遷移至 Flink 的時候, Hive 的腳本可以完全不用修改。
為了實現上述目標,在 Flink 1.11 中做了如下改進:
給 Dialect 做了參數化,目前參數支持 default 和 hive 兩種值。default 是Flink 自身的 Dialect ,hive 是 Hive 的 Dialect。
SQL Client 和 API 均可以使用。
可以靈活的做動態切換,切換是語句級別的。例如 Session 創建后,第一個語句想用 Flink 的 Dialect 來寫,就設置成 default 。在執行了幾行語句后,想用 Hive 的 Dialect 來寫,就可以設置成 hive 。在切換時,就不需要重啟 Session。
兼容 Hive 常用 DDL 以及基礎的 DML。
提供與 Hive CLI 或 Beeline 近似的使用體驗。
3. 開啟 Hive Dialect
上圖是在 SQL Client 中開啟 Hive Dialect 的方法。在 SQL Client 中可以設置初始的 Dialect。可以在 Yaml 文件里設置,也可以在 SQL Client 起來后,進行動態的切換。
還可以通過 Flink Table API 的方式開啟 Hive Dialect :
可以看到通過 TableEnvironment 去獲取 Config 然后設置開啟。
4. Hive Dialect 支持的語法
Hive Dialect 的語法主要是在 DDL 方面進行了增強。因為在 1.10 中通過 Flink SQL寫 DDL 去操作 Hive 的元數據不是十分可用,所以要解決這個痛點,將主要精力集中在 DDL 方向了。
目前所支持的 DDL 如下:
5. 流式數據寫入Hive
在 Flink 1.11 中還做了流式數據場景,以及跟 Hive 相結合的功能,通過 Flink 與Hive 的結合,來幫助 Hive 數倉進行實時化的改造。
流式數據寫入 Hive 是借助 Streaming File Sink 實現的,它是完全 SQL 化的,不需要用戶進行代碼開發。流式數據寫入 Hive 也支持分區和非分區表。Hive 數倉一般都是離線數據,用戶對數據一致性要求比較高,所以支持 Exactly-Once 語義。流式數據寫 Hive 大概有 5-10 分鐘級別的延遲。如果希望延遲盡可能的低,那么產生的一個結果就是會生成更多的小文件。小文件對 HDFS 來說是不友好的,小文件多了以后,會影響 HDFS 的性能。這種情況下可以做一些小文的合并操作。
流式數據寫入 Hive 需要有幾個配置的地方:
對于分區表來說,要設置 Partition Commit Delay 的參數。這個參數的意義就是控制每個分區包含多長時間的數據,例如可設置成天、小時等。
Partition Commit Trigger 表示 Partition Commit 什么時候觸發,在 1.11 版本中支持 Process-time 和 Partition-time 觸發機制。
Partition Commit Policy 表示用什么方式提交分區。對于 Hive 來說,是需要將分區提交到 metastore, 這樣分區才是可見的。metastore 策略只支持 Hive 表。還有一個是 success-file 方式,success-file 是告訴下游的作業分區的數據已經準備好了。用戶也可以自定義,自己去實現一個提交方式。另外 Policy 可以指定多個的,例如可以同時指定 metastore 和 success-file。
下面看下流式數據寫入Hive的實現原理:
主要是兩個部分,一個是 StreamingFileWriter ,借助它實現數據的寫入,它會區分 Bucket,這里的 Buck 類似 Hive 的分區概念,每個 Subtask 都會往不同的 Bucket去寫數據。每個 Subtask 寫的 Bucket 同一個時間可能會維持 3 種文件,In-progress Files 表示正在寫的文件,Pending Files 表示文件已經寫完了但是還沒有提交,Finished Files 表示文件已經寫完并且也已經提交了。
另一個是 StreamingFileCommitter,在 StreamingFileWriter 后執行。它是用來提交分區的,所以對于非分區表就不需要它了。當 StreamingFileWriter 的一個分區數據準備好后,StreamingFileWriter 會向 StreamingFileCommitter 發一個 Commit Message,Commit Message 告訴 StreamingFileCommitter 那些數據已經準備好了的。然后進行提交的觸發 Commit Trigger,以及提交方式 Commit Policy。
下面是一個具體的例子:
例子中創建了一個叫 hive_table 的分區表,它有兩個分區 dt 和 hour。dt 代表的是日期的字符串,hour 代表小時的字符串。Commit trigger 設置的是 partition-time,Commit delay 設置的是1小時,Commit Policy 設置的是 metastore 和success-file。
6. 流式消費 Hive
在 Flink 1.10 中讀 Hive 數據的方式是批的方式去讀的,從 1.11 版本中,提供了流式的去讀 Hive 數據。
通過不斷的監控 Hive 數據表有沒有新數據,有的話就進行增量數據的消費。
如果要針對某一張 Hive 表開啟流式消費,可以在 table property 中開啟,或者也可以使用在 1.11 中新加的 dynamic options 功能,可以查詢的時候動態的指定 Hive 表是否需要打開流式讀取。
流式消費 Hive 支持分區表和非分區表。對于非分區表會監控表目錄下新文件添加,并增量讀取。對于分區表通過監控分區目錄和 Metastore 的方式確認是否有新分區添加,如果有新增分區,就會把新增分區數據讀取出來。這里需要注意,讀新增分區數據是一次性的。也就是新增加分區后,會把這個分區數據一次性都讀出來,在這之后就不再監控這個分區的數據了。所以如果需要用 Flink 流式消費 Hive 的分區表,那應該保證分區在添加的時候它的數據是完整的。
流式消費 Hive 數據也需要額外的指定一些參數。首先要指定消費順序,因為數據是增量讀取,所以需要指定要用什么順序消費數據,目前支持兩種消費順序 create-time 和 partition-time。
用戶還可以指定消費起點,類似于消費 kafka 指定 offset 這樣的功能,希望從哪個時間點的數據開始消費。Flink 去消費數據的時候,就會檢查并只會讀取這個時間點之后的數據。
最后還可以指定監控的間隔。因為目前監控新數據的添加都是要掃描文件系統的,可能你希望監控的不要太頻繁,太頻繁會給文件系統造成比較大的壓力。所以可以控制一個間隔。
最后看下流式消費的原理。先看流式消費非分區表:
圖中 ContinuoousFileMonitoringFunction 會不斷監控非分區表目錄下面的文件,會不斷的跟文件系統進行交互。一旦發現有新的文件添加了,就會對這些文件生成Splits ,并將 Splits 傳到 ContinuoousFileReaderOperator,FileReaderOperator 拿到 Splits 后就會到文件系統中實際的消費這些數據,然后把讀出來的數據再傳往下游處理。
對于流式消費分區表和非分區表區別不是很大,其中 HiveContinuousMonitoringFunction 也會去不斷的掃描文件系統,但是它掃描的是新增分區的目錄。當它發現有新增的分區目錄后,會進一步到 metstore 中做核查,查看是否這個分區已經提交到 metstore 中。如果已經提交,那就可以消費分區中的數據了。然后會把分區中的數據生成 Splits 傳給 ContinuousFileReaderOperator ,然后就可以對數據進行消費了。
7. 關聯 Hive 維表
關于 Hive 跟流式數據結合的另一個場景就是:關聯 Hive 維表。例如在消費流式數據時,與一張線下的 Hive 維表進行 join。
關聯Hive維表采用了 Flink 的 Temporal Table 的語法,就是把 Hive 的維表作為Temporal Table,然后與流式的表進行 join。想了解更多關于 Temporal Table 的內容,可查看 Flink 的官網。
關聯 Hive 維表的實現是每個 sub-task 將 Hive 表緩存在內存中,是緩存整張的Hive 表。如果 Hive 維表大小超過 sub-task 的可用內存,那么作業會失敗。
Hive 維表在關聯的時候,Hive 維表可能會發生更新,所以會允許用戶設置 hive 表緩存的超時時間。超過這個時間后,sub-task 會重新加載 Hive 維表。需要注意,這種場景不適用于 Hive 維表頻繁更新的情況,這樣會對 HDFS 文件系統造成很大的壓力。所以適用于 Hive 維表緩慢更新的情況。緩存超時時間一般設置的比較長,一般是小時級別的。
這張圖表示的是關聯 Hive 維表的原理。Streaming Data 代表流式數據,LookupJoinRunner 表示 Join 算子,它會拿到流式數據的 join key,并把 join key 傳給FileSystemLookupFunction。
FileSystemLookupFunction 是 一個Table function,它會去跟底層的文件系統交互并加載 Hive 表,然后在 Hive 表中查詢 join key,判斷哪些行數據是可以 join的。
下面是關聯 Hive 維表的例子:
這是 Flink 官網的一個例子,流式表是 Orders,LatestTates 是 Hive 的維表。
經過上面的介紹可以看出,在 Flink 1.11 中,在 Hive 數倉和批流一體的功能是進行了著重的開發。因為 Flink 是一個流處理的引擎,希望幫用戶更好的將批和流結合,讓 Hive 數倉實現實時化的改造,讓用戶更方便的挖掘數據的價值。
在 Flink 1.11 之前,Flink 對接 Hive 會做些批處理的計算,并且只支持離線的場景。離線的場景一個問題是延遲比較大,批作業的調度一般都會通過一些調度的框架去調度。這樣其實延遲會有累加的作用。例如第一個 job 跑完,才能去跑第二個 job...這樣依次執行。所以端對端的延遲就是所有 job 的疊加。
到了 1.11 之后,支持了 Hive 的流式處理的能力,就可以對 Hive 數倉進行一個實時化的改造。
例如 Online 的一些數據,用 Flink 做 ETL,去實時的寫入 Hive。當數據寫入 Hive之后,可以進一步接一個新的 Flink job,來做實時的查詢或者近似實時的查詢,可以很快的返回結果。同時,其他的 Flink job 還可以利用寫入 Hive 數倉的數據作為維表,來跟其它線上的數據進行關聯整合,來得到分析的結果。
以上是“Flink 1.11與Hive批流一體數倉的示例分析”這篇文章的所有內容,感謝各位的閱讀!相信大家都有了一定的了解,希望分享的內容對大家有所幫助,如果還想學習更多知識,歡迎關注億速云行業資訊頻道!
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。