有狀態的計算作為容錯以及數據一致性的保證,是當今實時計算必不可少的特性之一,流行的實時計算引擎包括 Google Dataflow、Flink、Spark (Structure) Streaming、Kafka Streams 都分別提供對內置 State 的支持。State 的引入使得實時應用可以不依賴外部數據庫來存儲元數據及中間數據,部分情況下甚至可以直接用 State 存儲結果數據,這讓業界不禁思考: State 和 Database 是何種關系?有沒有可能用 State 來代替數據庫呢?
在這個課題上,Flink 社區是比較早就開始探索的。總體來說,Flink 社區的努力可以分為兩條線: 一是在作業運行時通過作業查詢接口訪問 State 的能力,即 QueryableState;二是通過 State 的離線 dump 文件(Savepoint)來離線查詢和修改 State 的能力,即即將引入的 Savepoint Processor API。
QueryableState
在 2017 年發布的 Flink 1.2 版本,Flink 引入了 QueryableState 的特性以允許用戶通過特定的 client 查詢作業 State 的內容 [1],這意味著 Flink 應用可以在完全不依賴 State 存儲介質以外的外部存儲的情況下提供實時訪問計算結果的能力。
只通過 Queryable State 提供實時數據訪問
然而,QueryableState 雖然設想上比較理想化,但由于依賴底層架構的改動較多且功能也比較受限,它一直處于 Beta 版本并不能用于生產環境。針對這個問題,在前段時間騰訊的工程師楊華提出 QueryableState 的改進計劃 [2]。在郵件列表中,社區就 QueryableState 是否可以用于代替數據庫作了討論并出現了不同的觀點。筆者結合個人見解將 State as Database 的主要優缺點整理如下。
優點:
更低的數據延遲。一般情況下 Flink 應用的計算結果需要同步到外部的數據庫,比如定時觸發輸出窗口計算結果,而這種同步通常是定時的會帶來一定的延遲,導致計算是實時的而查詢卻不是實時的尷尬局面,而直接 State 則可以避免這個問題。
相對地,假如用 Operator State 來記錄總得分和總時長(并行度設為 1),我們注冊 total_score 和 total_time 兩個 State,得到的表有兩個:
total_score |
------- |
14,500 |
total_time5,600
至此 Savepoint 和 Database 的對應關系應該是比較清晰明了的。而對于 Savepoint 來說還有不同的 StateBackend 來決定 State 具體如何持續化,這顯然對應的是數據庫的存儲引擎。在 MySQL 中,我們可以通過簡單的一行命令 ALTER TABLE xxx ENGINE = InnoDB; 來改變存儲引擎,在背后 MySQL 會自動完成繁瑣的格式轉換工作。而對于 Savepoint 來說,由于 StateBackend 各自的存儲格式不兼容,目前尚不能方便地切換 StateBackend。為此,社區在不久前創建 FLIP-41 [5] 來進一步完善 Savepoint 的可操作性。
總結
State as Database 是實時計算發展的大趨勢,它并不是要代替數據庫的使用,而是借鑒數據庫領域的經驗拓展 State 接口使其操作方式更接近我們熟悉的數據庫。對于 Flink 而言,State 的外部使用可以分為在線的實時訪問和離線的訪問和修改,分別將由 Queryable State 和 Savepoint Processor API 兩個特性支持。