您好,登錄后才能下訂單哦!
今天就跟大家聊聊有關Flink如何實時分析Iceberg數據湖的CDC數據,可能很多人都不太了解,為了讓大家更加了解,小編給大家總結了以下內容,希望大家根據這篇文章可以有所收獲。
我們先看一下今天的 topic 需要設計的是什么?輸入是一個 CDC 或者 upsert 的數據,輸出是 Database 或者是用于大數據 OLAP 分析的存儲。
我們常見的輸入主要有兩種數據,第一種數據是數據庫的 CDC 數據,不斷的產生 changeLog;另一種場景是流計算產生的 upsert 數據,在最新的 Flink 1.12 版本已經支持了 upsert 數據。
1.1 離線 HBase 集群分析 CDC 數據
我們通常想到的第一個方案,就是把 CDC upsert 的數據通過 Flink 進行一些處理之后,實時的寫到 HBase 當中。HBase 是一個在線的、能提供在線點查能力的一種數據庫,具有非常高的實時性,對寫入操作是非常友好的,也可以支持一些小范圍的查詢,而且集群可擴展。
這種方案其實跟普通的點查實時鏈路是同一套,那么用 HBase 來做大數據的 OLAP 的查詢分析有什么問題呢?
首先,HBase 是一個面向點查設計的一種數據庫,是一種在線服務,它的行存的索引不適合分析任務。典型的數倉設計肯定是要列存的,這樣壓縮效率和查詢效率才會高。第二,HBase 的集群維護成本比較高。最后,HBase 的數據是 HFile,不方便與大數據里數倉當中典型的 Parquet、Avro、Orc 等結合。
1.2 Apache Kudu 維護 CDC 數據集
針對 HBase 分析能力比較弱的情況,社區前幾年出現了一個新的項目,這就是 Apache Kudu 項目。Kudu 項目擁有 HBase 的點查能力的同時,采用列存,這樣列存加速非常適合 OLAP 分析。
這種方案會有什么問題呢?
首先 Kudu 是比較小眾的、獨立的集群,維護成本也比較高,跟 HDFS、S3、OSS 比較割裂。其次由于 Kudu 在設計上保留了點查能力,所以它的批量掃描性能不如 parquet,另外 Kudu 對于 delete 的支持也比較弱,最后它也不支持增量拉取。
1.3 直接導入 CDC 到 Hive 分析
第三種方案,也是大家在數倉中比較常用的方案,就是把 MySQL 的數據寫到 Hive,流程是:維護一個全量的分區,然后每天做一個增量的分區,最后把增量分區寫好之后進行一次 Merge ,寫入一個新的分區,流程上這樣是走得通的。Hive 之前的全量分區是不受增量的影響的,只有當增量 Merge 成功之后,分區才可查,才是一個全新的數據。這種純列存的 append 的數據對于分析是非常友好的。
這種方案會有什么問題呢?
增量數據和全量數據的 Merge 是有延時的,數據不是實時寫入的,典型的是一天進行一次 Merge,這就是 T+1 的數據了。所以,時效性很差,不支持實時 upsert。每次 Merge 都需要把所有數據全部重讀重寫一遍,效率比較差、比較浪費資源。
1.4 Spark + Delta 分析 CDC 數據
針對這個問題,Spark + Delta 在分析 CDC 數據的時候提供了 MERGE INTO 的語法。這并不僅僅是對 Hive 數倉的語法簡化,Spark + Delta 作為新型數據湖的架構(例如 Iceberg、Hudi),它對數據的管理不是分區,而是文件,因此 Delta 優化 MERGE INTO 語法,僅掃描和重寫發生變化的文件即可,因此高效很多。
我們評估一下這個方案,他的優點是僅依賴 Spark + Delta 架構簡潔、沒有在線服務、列存,分析速度非常快。優化之后的 MERGE INTO 語法速度也夠快。
這個方案,業務上是一個 Copy On Write 的一個方案,它只需要 copy 少量的文件,可以讓延遲做的相對低。理論上,在更新的數據跟現有的存量沒有很大重疊的話,可以把天級別的延遲做到小時級別的延遲,性能也是可以跟得上的。
這個方案在 Hive 倉庫處理 upsert 數據的路上已經前進了一小步了。但小時級別的延遲畢竟不如實時更有效,因此這個方案最大的缺點在 Copy On Write 的 Merge 有一定的開銷,延遲不能做的太低。
第一部分大概現有的方案就是這么多,同時還需要再強調一下,upsert 之所以如此重要,是因為在數據湖的方案中,upsert 是實現數據庫準實時、實時入湖的一個關鍵技術點。
2.1 Flink 對 CDC 數據消費的支持
第一,Flink 原生支持 CDC 數據消費。在前文 Spark + Delta 的方案中,MARGE INTO 的語法,用戶需要感知 CDC 的屬性概念,然后寫到 merge 的語法上來。但是 Flink 是原生支持 CDC 數據的。用戶只要聲明一個 Debezium 或者其他 CDC 的 format,Flink 上面的 SQL 是不需要感知任何 CDC 或者 upsert 的屬性的。Flink 中內置了 hidden column 來標識它 CDC 的類型數據,所以對用戶而言比較簡潔。
如下圖示例,在 CDC 的處理當中,Flink 在只用聲明一個 MySQL Binlog 的 DDL 語句,后面的 select 都不用感知 CDC 屬性。
2.2 Flink 對 Change Log Stream 的支持
下圖介紹的是 Flink 原生支持 Change Log Stream,Flink 在接入一個 Change Log Stream 之后,拓撲是不用關心 Change Log flag 的 SQL。拓撲完全是按照自己業務邏輯來定義,并且一直到最后寫入 Iceberg,中間不用感知 Change Log 的 flag。
2.3 Flink + Iceberg CDC 導入方案評估
最后,Flink + Iceberg 的 CDC 導入方案的優點是什么?
對比之前的方案,Copy On Write 跟 Merge On Read 都有適用的場景,側重點不同。Copy On Write 在更新部分文件的場景中,當只需要重寫其中的一部分文件時是很高效的,產生的數據是純 append 的全量數據集,在用于數據分析的時候也是最快的,這是 Copy On Write 的優勢。
另外一個是 Merge On Read,即將數據連同 CDC flag 直接 append 到 Iceberg 當中,在 merge 的時候,把這些增量的數據按照一定的組織格式、一定高效的計算方式與全量的上一次數據進行一次 merge。這樣的好處是支持近實時的導入和實時數據讀取;這套計算方案的 Flink SQL 原生支持 CDC 的攝入,不需要額外的業務字段設計。
Iceberg 是統一的數據湖存儲,支持多樣化的計算模型,也支持各種引擎(包括 Spark、Presto、hive)來進行分析;產生的 file 都是純列存的,對于后面的分析是非常快的;Iceberg 作為數據湖基于 snapshot 的設計,支持增量讀取;Iceberg 架構足夠簡潔,沒有在線服務節點,純 table format 的,這給了上游平臺方足夠的能力來定制自己的邏輯和服務化。
3.1 批量更新場景和 CDC 寫入場景
首先我們來了解一下在整個數據湖里面批量更新的兩個場景。
第一批量更新的這種場景,在這個場景中我們使用一個 SQL 更新了成千上萬行的數據,比如歐洲的 GDPR 策略,當一個用戶注銷掉自己的賬戶之后,后臺的系統是必須將這個用戶所有相關的數據全部物理刪除。
第二個場景是我們需要將 date lake 中一些擁有共同特性的數據刪除掉,這個場景也是屬于批量更新的一個場景,在這個場景中刪除的條件可能是任意的條件,跟主鍵(Primary key)沒有任何關系,同時這個待更新的數據集是非常大,這種作業是一個長耗時低頻次的作業。
另外是 CDC 寫入的場景,對于對 Flink 來說,一般常用的有兩種場景,第一種場景是上游的 Binlog 能夠很快速的寫到 data lake 中,然后供不同的分析引擎做分析使用; 第二種場景是使用 Flink 做一些聚合操作,輸出的流是 upsert 類型的數據流,也需要能夠實時的寫到數據湖或者是下游系統中去做分析。如下圖示例中 CDC 寫入場景中的 SQL 語句,我們使用單條 SQL 更新一行數據,這種計算模式是一種流式增量的導入,而且屬于高頻的更新。
3.2 Apache Iceberg 設計 CDC 寫入方案需要考慮的問題
接下來我們看下 iceberg 對于 CDC 寫入這種場景在方案設計時需要考慮哪些問題。
第一是正確性,即需要保證語義及數據的正確性,如上游數據 upsert 到 iceberg 中,當上游 upsert 停止后, iceberg 中的數據需要和上游系統中的數據保持一致。
第二是高效寫入,由于 upsert 的寫入頻率非常高,我們需要保持高吞吐、高并發的寫入。
第三是快速讀取,當數據寫入后我們需要對數據進行分析,這其中涉及到兩個問題,第一個問題是需要支持細粒度的并發,當作業使用多個 task 來讀取時可以保證為各個 task 進行均衡的分配以此來加速數據的計算;第二個問題是我們要充分發揮列式存儲的優勢來加速讀取。
第四是支持增量讀,例如一些傳統數倉中的 ETL,通過增量讀取來進行進一步數據轉換。
3.3 Apache Iceberg Basic
在介紹具體的方案細節之前,我們先了解一下 Iceberg 在文件系統中的布局,總體來講 Iceberg 分為兩部分數據,第一部分是數據文件,如下圖中的 parquet 文件,每個數據文件對應一個校驗文件(.crc文件)。第二部分是表元數據文件(Metadata 文件),包含 Snapshot 文件(snap-.avro)、Manifest 文件(.avro)、TableMetadata 文件(*.json)等。
下圖展示了在 iceberg 中 snapshot、manifest 及 partition 中的文件的對應關系。下圖中包含了三個 partition,第一個 partition 中有兩個文件 f1、f3,第二個 partition 有兩個文件f4、f5,第三個 partition 有一個文件f2。對于每一次寫入都會生成一個 manifest 文件,該文件記錄本次寫入的文件與 partition 的對應關系。再向上層有 snapshot 的概念,snapshot 能夠幫助快速訪問到整張表的全量數據,snapshot 記錄多個 manifest,如第二個 snapshot 包含 manifest2 和 manifest3。
3.4 INSERT、UPDATE、DELETE 寫入
在了解了基本的概念,下面介紹 iceberg 中 insert、update、delete 操作的設計。
下圖示例的 SQL 中展示的表包含兩個字段即 id、data,兩個字段都是 int 類型。在一個 transaction 中我們進行了圖示中的數據流操作,首先插入了(1,2)一條記錄,接下來將這條記錄更新為(1,3),在 iceberg 中 update 操作將會拆為 delete 和 insert 兩個操作。
這么做的原因是考慮到 iceberg 作為流批統一的存儲層,將 update 操作拆解為 delete 和 insert 操作可以保證流批場景做更新時讀取路徑的統一,如在批量刪除的場景下以 Hive 為例,Hive 會將待刪除的行的文件 offset 寫入到 delta 文件中,然后做一次 merge on read,因為這樣會比較快,在 merge 時通過 position 將原文件和 delta 進行映射,將會很快得到所有未刪除的記錄。
接下來又插入記錄(3,5),刪除了記錄(1,3),插入記錄(2,5),最終查詢是我們得到記錄(3,5)(2,5)。
上面操作看上去非常簡單,但在實現中是存在一些語義上的問題。如下圖中,在一個 transaction 中首先執行插入記錄(1,2)的操作,該操作會在 data file1 文件中寫入 INSERT(1,2),然后執行刪除記錄(1,2)操作,該操作會在 equalify delete file1 中寫入 DELETE(1,2),接著又執行插入記錄(1,2)操作,該操作會在 data file1 文件中再寫入INSERT(1,2),然后執行查詢操作。
在正常情況下查詢結果應該返回記錄 INSERT(1,2),但在實現中,DELETE(1,2)操作無法得知刪除的是 data file1 文件中的哪一行,因此兩行 INSERT(1,2)記錄都將被刪除。
那么如何來解決這個問題呢,社區當前的方式是采用了 Mixed position-delete and equality-delete。Equality-delete 即通過指定一列或多列來進行刪除操作,position-delete 是根據文件路徑和行號來進行刪除操作,通過將這兩種方法結合起來以保證刪除操作的正確性。
如下圖我們在第一個 transaction 中插入了三行記錄,即 INSERT(1,2)、INSERT(1,3)、INSERT(1,4),然后執行 commit 操作進行提交。接下來我們開啟一個新的 transaction 并執行插入一行數據(1,5),由于是新的 transaction,因此新建了一個 data file2 并寫入 INSERT(1,5)記錄,接下來執行刪除記錄(1,5),實際寫入 delete 時是:
在 position delete file1 文件寫入(file2, 0),表示刪除 data file2 中第 0 行的記錄,這是為了解決同一個 transaction 內同一行數據反復插入刪除的語義的問題。
在 equality delete file1 文件中寫入 DELETE (1,5),之所以寫入這個 delete 是為了確保本次 txn 之前寫入的 (1,5) 能被正確刪除。
然后執行刪除(1,4)操作,由于(1,4)在當前 transaction 中未曾插入過,因此該操作會使用 equality-delete 操作,即在 equality delete file1 中寫入(1,4)記錄。在上述流程中可以看出在當前方案中存在 data file、position delete file、equality delete file 三類文件。
在了解了寫入流程后,如何來讀取呢。如下圖所示,對于 position delete file 中的記錄(file2, 0)只需和當前 transaction 的 data file 進行 join 操作,對于 equality delete file 記錄(1,4)和之前的 transaction 中的 data file 進行 join 操作。最終得到記錄 INSERT(1,3)、INSERT(1,2)保證了流程的正確性。
3.5 Manifest 文件的設計
上面介紹了 insert、update 及 delete,但在設計 task 的執行計劃時我們對 manifest 進行了一些設計,目的是通過 manifest 能夠快速到找到 data file,并按照數據大小進行分割,保證每個 task 處理的數據盡可能的均勻分布。
如下圖示例,包含四個 transaction,前兩個 transaction 是 INSERT 操作,對應 M1、M2,第三個 transaction 是 DELETE 操作,對應 M3,第四個 transaction 是 UPDATE 操作,包含兩個 manifest 文件即 data manifest 和 delete manifest。
對于為什么要對 manifest 文件拆分為 data manifest 和 delete manifest 呢,本質上是為了快速為每個 data file 找到對應的 delete file 列表。可以看下圖示例,當我們在 partition-2 做讀取時,需要將 deletefile-4 與datafile-2、datafile-3 做一個 join 操作,同樣也需要將 deletefile-5 與 datafile-2、datafile-3 做一個 join 操作。
以 datafile-3 為例,deletefile 列表包含 deletefile-4 和 deletefile-5 兩個文件,如何快速找到對應的 deletefIle 列表呢,我們可以根據上層的 manifest 來進行查詢,當我們將 manifest 文件拆分為 data manifest 和 delete manifest 后,可以將 M2(data manifest)與 M3、M4(delete manifest)先進行一次 join 操作,這樣便可以快速的得到 data file 所對應的 delete file 列表。
3.6 文件級別的并發
另一個問題是我們需要保證足夠高的并發讀取,在 iceberg 中這點做得非常出色。在 iceberg 中可以做到文件級別的并發讀取,甚至文件中更細粒度的分段的并發讀取,比如文件有 256MB,可以分為兩個 128MB 進行并發讀取。這里舉例說明,假設 insert 文件跟 delete 文件在兩個 Bucket 中的布局方式如下圖所示。
我們通過 manifest 對比發現,datafile-2 的 delete file 列表只有 deletefile-4,這樣可以將這兩個文件作為一個單獨的 task(圖示中Task-2)進行執行,其他的文件也是類似,這樣可以保證每個 task 數據較為均衡的進行 merge 操作。
對于這個方案我們做了簡單的總結,如下圖所示。首先這個方案的優點可以滿足正確性,并且可以實現高吞吐寫入和并發高效的讀取,另外可以實現 snapshot 級別的增量的拉取。
當前該方案還是比較粗糙,下面也有一些可以優化的點。
第一點,如果同一個 task 內的 delete file 有重復可以做緩存處理,這樣可以提高 join 的效率。
第二點,當 delete file 比較大需要溢寫到磁盤時可以使用 kv lib 來做優化,但這不依賴外部服務或其他繁重的索引。
第三點,可以設計 Bloom filter(布隆過濾器)來過濾無效的 IO,因為對于 Flink 中常用的 upsert 操作會產生一個 delete 操作和一個 insert 操作,這會導致在 iceberg 中 data file 和 delete file 大小相差不大,這樣 join 的效率不會很高。如果采用 Bloom Filter,當 upsert 數據到來時,拆分為 insert 和 delete 操作,如果通過 bloom filter 過濾掉那些之前沒有 insert 過數據的 delete 操作(即如果這條數據之前沒有插入過,則不需要將 delete 記錄寫入到 delete file 中),這將極大的提高 upsert 的效率。
第四點,是需要一些后臺的 compaction 策略來控制 delete file 文件大小,當 delete file 越少,分析的效率越高,當然這些策略并不會影響正常的讀寫。
3.7 增量文件集的 Transaction 提交
前面介紹了文件的寫入,下圖我們介紹如何按照 iceberg 的語義進行寫入并且供用戶讀取。主要分為數據和 metastore 兩部分,首先會有 IcebergStreamWriter 進行數據的寫入,但此時寫入數據的元數據信息并沒有寫入到 metastore,因此對外不可見。第二個算子是 IcebergFileCommitter,該算子會將數據文件進行收集, 最終通過 commit transaction 來完成寫入。
在 Iceberg 中并沒有其他任何其他第三方服務的依賴,而 Hudi 在某些方面做了一些 service 的抽象,如將 metastore 抽象為獨立的 Timeline,這可能會依賴一些獨立的索引甚至是其他的外部服務來完成。
下面是我們未來的一些規劃,首先是 Iceberg 內核的一些優化,包括方案中涉及到的全鏈路穩定性測試及性能的優化, 并提供一些 CDC 增量拉取的相關 Table API 接口。
在 Flink 集成上,會實現 CDC 數據的自動和手動合并數據文件的能力,并提供 Flink 增量拉取 CDC 數據的能力。
在其他生態集成上,我們會對 Spark、Presto 等引擎進行集成,并借助 Alluxio 加速數據查詢。
看完上述內容,你們對Flink如何實時分析Iceberg數據湖的CDC數據有進一步的了解嗎?如果還想了解更多知識或者相關內容,請關注億速云行業資訊頻道,感謝大家的支持。
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。