您好,登錄后才能下訂單哦!
這篇文章主要介紹“如何理解微博基于Flink的實時計算平臺建設”,在日常操作中,相信很多人在如何理解微博基于Flink的實時計算平臺建設問題上存在疑惑,小編查閱了各式資料,整理出簡單好用的操作方法,希望對大家解答”如何理解微博基于Flink的實時計算平臺建設”的疑惑有所幫助!接下來,請跟著小編一起來學習吧!
相比于 Spark,目前 Spark 的生態總體更為完善一些,且在機器學習的集成和應用性暫時領先。但作為下一代大數據引擎的有力競爭者-Flink 在流式計算上有明顯優勢,Flink 在流式計算里屬于真正意義上的單條處理,每一條數據都觸發計算,而不是像 Spark 一樣的 Mini Batch 作為流式處理的妥協。Flink 的容錯機制較為輕量,對吞吐量影響較小,而且擁有圖和調度上的一些優化,使得 Flink 可以達到很高的吞吐量。而 Strom 的容錯機制需要對每條數據進行 ack,因此其吞吐量瓶頸也是備受詬病。
這里引用一張圖來對常用的實時計算框架做個對比。
cdn.com/3da0ac542030556f0def525ccf6ec7ee9eec5b1f.jpeg">
Flink 是一個開源的分布式實時計算框架。Flink 是有狀態的和容錯的,可以在維護一次應用程序狀態的同時無縫地從故障中恢復;它支持大規模計算能力,能夠在數千個節點上并發運行;它具有很好的吞吐量和延遲特性。同時,Flink 提供了多種靈活的窗口函數。
Flink 檢查點機制能保持 exactly-once 語義的計算。狀態保持意味著應用能夠保存已經處理的數據集結果和狀態。
Flink 支持流處理和窗口事件時間語義。事件時間可以很容易地通過事件到達的順序和事件可能的到達延遲流中計算出準確的結果。
Flink 支持基于時間、數目以及會話的非常靈活的窗口機制(window)。可以定制 window 的觸發條件來支持更加復雜的流模式。
Flink 高效的容錯機制允許系統在高吞吐量的情況下支持 exactly-once 語義的計算。Flink 可以準確、快速地做到從故障中以零數據丟失的效果進行恢復。
Flink 具有高吞吐量和低延遲(能快速處理大量數據)特性。下圖展示了 Apache Flink 和 Apache Storm 完成分布式項目計數任務的性能對比。
初期架構僅為計算與存儲兩層,新來的計算需求接入后需要新開發一個實時計算任務進行上線。重復模塊的代碼復用率低,重復率高,計算任務間的區別主要是集中在任務的計算指標口徑上。
在存儲層,各個需求方所需求的存儲路徑都不相同,計算指標可能在不通的存儲引擎上有重復,有計算資源以及存儲資源上的浪費情況。并且對于指標的計算口徑也是僅局限于單個任務需求里的,不通需求任務對于相同的指標的計算口徑沒有進行統一的限制于保障。各個業務方也是在不同的存儲引擎上開發數據獲取服務,對于那些專注于數據應用本身的團隊來說,無疑當前模式存在一些弊端。
隨著數據體量的增加以及業務線的擴展,前期架構模式的弊端逐步開始顯現。從當初單需求單任務的模式逐步轉變為通用的數據架構模式。為此,我們開發了一些基于 Flink 框架的通用組件來支持數據的快速接入,并保證代碼模式的統一性和維護性。在數據層,我們基于 Clickhouse 來作為我們數據倉庫的計算和存儲引擎,利用其支持多維 OLAP 計算的特性,來處理在多維多指標大數據量下的快速查詢需求。在數據分層上,我們參考與借鑒離線數倉的經驗與方法,構建多層實時數倉服務,并開發多種微服務來為數倉的數據聚合,指標提取,數據出口,數據質量,報警監控等提供支持。
整體架構分為五層:
1)接入層:接入原始數據進行處理,如 Kafka、RabbitMQ、File 等。
2)計算層:選用 Flink 作為實時計算框架,對實時數據進行清洗,關聯等操作。
3)存儲層:對清洗完成的數據進行數據存儲,我們對此進行了實時數倉的模型分層與構建,將不同應用場景的數據分別存儲在如 Clickhouse,Hbase,Redis,Mysql 等存儲。服務中,并抽象公共數據層與維度層數據,分層處理壓縮數據并統一數據口徑。
4)服務層:對外提供統一的數據查詢服務,支持從底層明細數據到聚合層數據 5min/10min/1hour 的多維計算服務。同時最上層特征指標類數據,如計算層輸入到Redis、Mysql 等也從此數據接口進行獲取。
5)應用層:以統一查詢服務為支撐對各個業務線數據場景進行支撐。
監控報警:對 Flink 任務的存活狀態進行監控,對異常的任務進行郵件報警并根據設定的參數對任務進行自動拉起與恢復。根據如 Kafka 消費的 offset 指標對消費處理延遲的實時任務進行報警提醒。
數據質量:監控實時數據指標,對歷史的實時數據與離線 hive 計算的數據定時做對比,提供實時數據的數據質量指標,對超過閾值的指標數據進行報警。
整體數據從原始數據接入后經過 ETL 處理, 進入實時數倉底層數據表,經過配置化聚合微服務組件向上進行分層數據的聚合。根據不同業務的指標需求也可通過特征抽取微服務直接配置化從數倉中抽取到如 Redis、ES、Mysql 中進行獲取。大部分的數據需求可通過統一數據服務接口進行獲取。
原始日志數據因為各業務日志的不同,所擁有的維度或指標數據并不完整。所以需要進行實時的日志的關聯才能獲取不同維度條件下的指標數據查詢結果。并且關聯日志的回傳周期不同,有在 10min 之內完成 95% 以上回傳的業務日志,也有類似于激活日志等依賴第三方回傳的有任務日志,延遲窗口可能大于1天。
并且最大日志關聯任務的日均數據量在 10 億級別以上,如何快速處理與構建實時關聯任務的問題首先擺在我們面前。對此我們基于 Flink 框架開發了配置化關聯組件。對于不同關聯日志的指標抽取,我們也開發了配置化指標抽取組件用于快速提取復雜的日志格式。以上兩個自研組件會在后面的內容里再做詳細介紹。
對于回傳晚的日志,我們在關聯窗口內未取得關聯結果。我們采用實時+離線的方式進行數據回刷補全。實時處理的日志我們會將未關聯的原始日志輸出到另外一個暫存地(Kafka),同時不斷消費處理這個未關聯的日志集合,設定超時重關聯次數與超時重關聯時間,超過所設定任意閾值后,便再進行重關聯。離線部分,我們采用 Hive 計算昨日全天日志與 N 天內的全量被關聯日志表進行關聯,將最終的結果回寫進去,替換實時所計算的昨日關聯數據。
① Operator Chain
為了更高效地分布式執行,Flink 會盡可能地將 operator 的 subtask 鏈接(chain)在一起形成 task。每個 task 在一個線程中執行。將 operators 鏈接成 task 是非常有效的優化:它能減少線程之間的切換,減少消息的序列化/反序列化,減少數據在緩沖區的交換,減少了延遲的同時提高整體的吞吐量。
Flink 會在生成 JobGraph 階段,將代碼中可以優化的算子優化成一個算子鏈(Operator Chains)以放到一個 task(一個線程)中執行,以減少線程之間的切換和緩沖的開銷,提高整體的吞吐量和延遲。下面以官網中的例子進行說明。
圖中棕色的長條表示等待時間,可以發現網絡等待時間極大地阻礙了吞吐和延遲。為了解決同步訪問的問題,異步模式可以并發地處理多個請求和回復。也就是說,你可以連續地向數據庫發送用戶 a、b、c 等的請求,與此同時,哪個請求的回復先返回了就處理哪個回復,從而連續的請求之間不需要阻塞等待,如上圖右邊所示。這也正是 Async I/O 的實現原理。
③ Checkpoint 優化
Flink 實現了一套強大的 checkpoint 機制,使它在獲取高吞吐量性能的同時,也能保證 Exactly Once 級別的快速恢復。
首先提升各節點 checkpoint 的性能考慮的就是存儲引擎的執行效率。Flink 官方支持的三種 checkpoint state 存儲方案中,Memory 僅用于調試級別,無法做故障后的數據恢復。其次還有 Hdfs 與 Rocksdb,當所做 Checkpoint 的數據大小較大時,可以考慮采用 Rocksdb 來作為 checkpoint 的存儲以提升效率。
其次的思路是資源設置,我們都知道 checkpoint 機制是在每個 task 上都會進行,那么當總的狀態數據大小不變的情況下,如何分配減少單個 task 所分的 checkpoint 數據變成了提升 checkpoint 執行效率的關鍵。
最后,增量快照. 非增量快照下,每次 checkpoint 都包含了作業所有狀態數據。而大部分場景下,前后 checkpoint 里,數據發生變更的部分相對很少,所以設置增量 checkpoint,僅會對上次 checkpoint 和本次 checkpoint 之間狀態的差異進行存儲計算,減少了 checkpoint 的耗時。
在任務執行過程中,會遇到各種各樣的問題,導致任務異常甚至失敗。所以如何做好異常情況下的恢復工作顯得異常重要。
① 設定重啟策略
Flink 支持不同的重啟策略,以在故障發生時控制作業如何重啟。集群在啟動時會伴隨一個默認的重啟策略,在沒有定義具體重啟策略時會使用該默認策略。如果在工作提交時指定了一個重啟策略,該策略會覆蓋集群的默認策略。
默認的重啟策略可以通過 Flink 的配置文件 flink-conf.yaml 指定。配置參數 restart-strategy 定義了哪個策略被使用。
常用的重啟策略:
固定間隔(Fixed delay);
失敗率(Failure rate);
無重啟(No restart)。
② 設置 HA
Flink 在任務啟動時指定 HA 配置主要是為了利用 Zookeeper 在所有運行的 JobManager 實例之間進行分布式協調 .Zookeeper 通過 leader 選取和輕量級一致性的狀態存儲來提供高可用的分布式協調服務。
③ 任務監控報警平臺
在實際環境中,我們遇見過因為集群狀態不穩定而導致的任務失敗。在 Flink 1.6 版本中,甚至遇見過任務出現假死的情況,也就是 Yarn 上的 job 資源依然存在,而 Flink 任務實際已經死亡。為了監測與恢復這些異常的任務,并且對實時任務做統一的提交、報警監控、任務恢復等管理,我們開發了任務提交與管理平臺。通過 Shell 拉取 Yarn 上 Running 狀態與 Flink Job 狀態的列表進行對比,心跳監測平臺上的所有任務,并進行告警、自動恢復等操作。
④ 作業指標監控
Flink 任務在運行過程中,各 Operator 都會產生各自的指標數據,例如,Source 會產出 numRecordIn、numRecordsOut 等各項指標信息,我們會將這些指標信息進行收集,并展示在我們的可視化平臺上。指標平臺如下圖:
⑤ 任務運行節點監控
我們的 Flink 任務都是運行在 Yarn 上,針對每一個運行的作業,我們需要監控其運行環境。會收集 JobManager 及 TaskManager 的各項指標。收集的指標有 jobmanager-fullgc-count、jobmanager-younggc-count、jobmanager-fullgc-time、jobmanager-younggc-time、taskmanager-fullgc-count、taskmanager-younggc-count、taskmanager-fullgc-time、taskmanager-younggc-time 等,用于判斷任務運行環境的健康度,及用于排查可能出現的問題。監控界面如下:
從 Flink 的官方文檔,我們知道 Flink 的編程模型分為四層,sql 是最高層的 api, Table api 是中間層,DataSteam/DataSet Api 是核心,stateful Streaming process 層是底層實現。
剛開始我們直接使用 Flink Table 做為數據關聯的方式,直接將接入進來的 DataStream 注冊為 Dynamic Table 后進行兩表關聯查詢,如下圖:
但嘗試后發現在做那些日志數據量大的關聯查詢時往往只能在較小的時間窗口內做查詢,否則會超過 datanode 節點單臺內存限制,產生異常。但為了滿足不同業務日志延遲到達的情況,這種實現方式并不通用。
之后,我們直接在 DataStream 上進行處理,在 CountWindow 窗口內進行關聯操作,將被關聯的數據 Hash 打散后存儲在各個 datanode 節點的 Rocksdb 中,利用 Flink State 原生支持 Rocksdb 做 Checkpoint 這一特性進行算子內數據的備份與恢復。這種方式是可行的,但受制于 Rocksdb 集群物理磁盤為非 SSD 的因素,這種方式在我們的實際線上場景中關聯耗時較高。
如 Redis 類的 KV 存儲的確在查詢速度上提升不少,但類似廣告日志數據這樣單條日志大小較大的情況,會占用不少寶貴的機器內存資源。經過調研后,我們選取了 Hbase 作為我們日志關聯組件的關聯數據存儲方案。
為了快速構建關聯任務,我們開發了基于 Flink 的配置化組件平臺,提交配置文件即可生成數據關聯任務并自動提交到集群。下圖是任務執行的處理流程。
示意圖如下:
下圖是關聯組件內的執行流程圖:
隨著日志量的增加,某些需要進行關聯的日志數量可能達到日均十幾億甚至幾十億的量級。前期關聯組件的配置化生成任務的方式的確解決了大部分線上業務需求,但隨著進一步的關聯需求增加,Hbase 面臨著巨大的查詢壓力。在我們對 Hbase 表包括 rowkey 等一系列完成優化之后,我們開始了對關聯組件的迭代與優化。
第一步,減少 Hbase 的查詢。我們使用 Flink Interval Join 的方式,先將大部分關聯需求在程序內部完成,只有少部分仍需查詢的日志會去查詢外部存儲(Hbase). 經驗證,以請求日志與實驗日志關聯為例,對于設置 Interval Join 窗口在 10s 左右即可減少 80% 的 hbase 查詢請求。
① Interval Join 的語義示意圖
數據 JOIN 的區間 - 比如時間為 3 的 EXP 會在 IMP 時間為[2, 4]區間進行JOIN;
WaterMark - 比如圖示 EXP 一條數據時間是 3,IMP 一條數據時間是 5,那么WaterMark是根據實際最小值減去 UpperBound 生成,即:Min(3,5)-1 = 2;
過期數據 - 出于性能和存儲的考慮,要將過期數據清除,如圖當 WaterMark 是 2 的時候時間為 2 以前的數據過期了,可以被清除。
② Interval Join 內部實現邏輯
③ Interval Join 改造
因 Flink 原生的 Intervak Join 實現的是 Inner Join,而我們業務中所需要的是 Left Join,具體改造如下:
取消右側數據流的 join 標志位;
左側數據流有 join 數據時不存 state。
2)關聯率動態監控
在任務執行中,往往會出現意想不到的情況,比如被關聯的數據日志出現缺失,或者日志格式錯誤引發的異常,造成關聯任務的關聯率下降嚴重。那么此時關聯任務雖然繼續在運行,但對于整體數據質量的意義不大,甚至是反向作用。在任務進行恢復的時,還需要清除異常區間內的數據,將 Kafka Offset 設置到異常前的位置再進行處理。
故我們在關聯組件的優化中,加入了動態監控,下面示意圖:
關聯任務中定時探測指定時間范圍 Hbase 是否有最新數據寫入,如果沒有,說明寫 Hbase 任務出現問題,則終止關聯任務;
當寫 Hbase 任務出現堆積時,相應的會導致關聯率下降,當關聯率低于指定閾值時終止關聯任務;
當關聯任務終止時會發出告警,修復上游任務后可重新恢復關聯任務,保證關聯數據不丟失。
為了快速進行日志數據的指標抽取,我們開發了基于 Flink 計算平臺的指標抽取組件Logwash。封裝了基于 Freemaker 的模板引擎做為日志格式的解析模塊,對日志進行提取,算術運算,條件判斷,替換,循環遍歷等操作。
下圖是 Logwash 組件的處理流程:
組件支持文本與 Json 兩種類型日志進行解析提取,目前該清洗組件已支持微博廣告近百個實時清洗需求,提供給運維組等第三方非實時計算方向人員快速進行提取日志的能力。
配置文件部分示例:
Flink 中 DataStream 的開發,對于通用的邏輯及相同的代碼進行了抽取,生成了我們的通用組件庫 FlinkStream。FlinkStream 包括了對 Topology 的抽象及默認實現、對 Stream 的抽象及默認實現、對 Source 的抽象和某些實現、對 Operator 的抽象及某些實現、Sink 的抽象及某些實現。任務提交統一使用可執行 Jar 和配置文件,Jar 會讀取配置文件構建對應的拓撲圖。
對于 Source 進行抽象,創建抽象類及對應接口,對于 Flink Connector 中已有的實現,例如 kafka,Elasticsearch 等,直接創建新 class 并繼承接口,實現對應的方法即可。對于需要自己去實現的 connector,直接繼承抽象類及對應接口,實現方法即可。目前只實現了 KafkaSource。
與 Source 抽象類似,我們實現了基于 Stream 到 Stream 級別的 Operator 抽象。創建抽象 Operate 類,抽象 Transform 方法。對于要實現的 Transform 操作,直接繼承抽象類,實現其抽象方法即可。目前實現的 Operator,直接按照文檔使用。如下:
針對 Sink,我們同樣創建了抽象類及接口。對 Flink Connector 中已有的 Sink 進行封裝。目前可通過配置進行數據輸出的 Sink。目前以實現和封裝的 Sink 組件有:Kafka、Stdout、Elasticsearch、Clickhouse、Hbase、Redis、MySQL。
創建 Stream 抽象類及抽象方法 buildStream,用于構建 StreamGraph。我們實現了默認的 Stream,buildStream 方法讀取 Source 配置生成 DataStream,通過 Operator 配置列表按順序生成拓撲圖,通過 Sink 配置生成數據寫出組件。
對于單 Stream,要處理的邏輯可能比較簡單,主要讀取一個 Source 進行數據的各種操作并輸出。對于復雜的多 Stream 業務需求,比如多流 Join,多流 Union、Split 流等,因此我們多流業務進行了抽象,產生了 Topology。在 Topology 這一層可以對多流進行配置化操作。對于通用的操作,我們實現了默認 Topology,直接通過配置文件就可以實現業務需求。對于比較復雜的業務場景,用戶可以自己實現 Topology。
我們對抽象的組件都是可配置化的,直接通過編寫配置文件,構造任務的運行拓撲結構,啟動任務時指定配置文件。
正文文本框 Flink Environment 配置化,包括時間處理類型、重啟策略,checkpoint 等;
Topology 配置化,可配置不同 Stream 之間的處理邏輯與 Sink;
Stream 配置化,可配置 Source,Operator 列表,Sink。
配置示例如下:
run_env: timeCharacteristic: "ProcessingTime" #ProcessingTime\IngestionTime\EventTime restart: # 重啟策略配置 type: # noRestart, fixedDelayRestart, fallBackRestart, failureRateRestart checkpoint: # 開啟checkpoint type: "rocksdb" # streams: impStream: #粉絲經濟曝光日志 type: "DefaultStream" config: source: type: "Kafka011" # 源是kafka011版本 config: parallelism: 20 operates: - type: "StringToMap" config: - type: "SplitElement" config: ... - type: "SelectElement" config: transforms: - type: "KeyBy" config: - type: "CountWindowWithTimeOut" #Window需要和KeyBy組合使用 config: - type: "SplitStream" config: - type: "SelectStream" config: sink: - type: Kafka config: - type: Kafka config:
在實時任務管理平臺,新建任務,填寫任務名稱,選擇任務類型(Flink)及版本,上傳可執行 Jar 文件,導入配置或者手動編寫配置,填寫 JobManager 及 TaskManager 內存配置,填寫并行度配置,選擇是否重試,選擇是否從 checkpoint 恢復等選項,保存后即可在任務列表中啟動任務,并觀察啟動日志用于排查啟動錯誤。
SQL 語言是一門聲明式的,簡單的,靈活的語言,Flink 本身提供了對 SQL 的支持。Flink 1.6 版本和 1.8 版本對 SQL 語言的支持有限,不支持建表語句,不支持對外部數據的關聯操作。因此我們通過 Apache Calcite 對 Flink SQL API 進行了擴展,用戶只需要關心業務需求怎么用 SQL 語言來表達即可。
擴展了支持創建源表 SQL,通過解析 SQL 語句,獲取數據源配置信息,創建對應的 TableSource 實例,并將其注冊到 Flink environment。示例如下:
使用 Apache Calcite 對 SQL 進行解析,通過維表關鍵字識別維表,使用 RichAsyncFunction 算子異步讀取維表數據,并通過 flatMap 操作生成關聯后的 DataStream,然后轉換為 Table 注冊到 Flink Environment。示例如下:
使用 SQLQuery 方法,支持從上一層表或者視圖中創建視圖表,并將新的視圖表注冊到 Flink Environment。創建語句需要按照順序寫,比如 myView2 是從視圖 myView1 中創建的,則 myView1 創建語句要在myView2語句前面。如下:
支持創建結果表,通過解析 SQL 語句,獲取配置信息,創建對應的 AppendStreamTableSink 或者 UpsertStreamTableSink 實例,并將其注冊到 Flink Environment。示例如下:
支持自定義 UDF 函數,繼承 ScalarFunction 或者 TableFunction。在 resources 目錄下有相應的 UDF 資源配置文件,默認會注冊全部可執行 Jar 包中配置的 UDF。直接按照使用方法使用即可。
部署方式同 Flink Stream 組件。
為了保證實時數據的統一對外出口以及保證數據指標的統一口徑,我們根據業界離線數倉的經驗來設計與構架微博廣告實時數倉。
數據倉庫分為三層,自下而上為:數據引入層(ODS,Operation Data Store)、數據公共層(CDM,Common Data Model)和數據應用層(ADS,Application Data Service)。
數據引入層(ODS,Operation Data Store):將原始數據幾乎無處理的存放在數據倉庫系統,結構上與源系統基本保持一致,是數據倉庫的數據準。
數據公共層(CDM,Common Data Model,又稱通用數據模型層):包含 DIM 維度表、DWD 和 DWS,由 ODS 層數據加工而成。主要完成數據加工與整合,建立一致性的維度,構建可復用的面向分析和統計的明細事實表,以及匯總公共粒度的指標。
公共維度層(DIM):基于維度建模理念思想,建立整個企業的一致性維度。降低數據計算口徑和算法不統一風險。
公共維度層的表通常也被稱為邏輯維度表,維度和維度邏輯表通常一一對應。
公共匯總粒度事實層(DWS,Data Warehouse Service):以分析的主題對象作為建模驅動,基于上層的應用和產品的指標需求,構建公共粒度的匯總指標事實表,以寬表化手段物理化模型。構建命名規范、口徑一致的統計指標,為上層提供公共指標,建立匯總寬表、明細事實表。
公共匯總粒度事實層的表通常也被稱為匯總邏輯表,用于存放派生指標數據。
明細粒度事實層(DWD,Data Warehouse Detail):以業務過程作為建模驅動,基于每個具體的業務過程特點,構建最細粒度的明細層事實表。可以結合企業的數據使用特點,將明細事實表的某些重要維度屬性字段做適當冗余,也即寬表化處理。
明細粒度事實層的表通常也被稱為邏輯事實表。
數據應用層(ADS,Application Data Service):存放數據產品個性化的統計指標數據。根據 CDM 與 ODS 層加工生成。
對于原始日志數據,ODS 層幾乎是每條日志抽取字段后進行保留,這樣便能對問題的回溯與追蹤。在 CDM 層對 ODS 的數據僅做時間粒度上的數據壓縮,也就是在指定時間切分窗口里,對所有維度下的指標做聚合操作,而不涉及業務性的操作。在 ADS 層,我們會有配置化抽取微服務,對底層數據做定制化計算和提取,輸出到用戶指定的存儲服務里。
到此,關于“如何理解微博基于Flink的實時計算平臺建設”的學習就結束了,希望能夠解決大家的疑惑。理論與實踐的搭配能更好的幫助大家學習,快去試試吧!若想繼續學習更多相關知識,請繼續關注億速云網站,小編會繼續努力為大家帶來更多實用的文章!
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。