您好,登錄后才能下訂單哦!
Serverless 銜接Kafka上下游數據流轉的實戰分析,針對這個問題,這篇文章詳細介紹了相對應的分析和解答,希望可以幫助更多想解決這個問題的小伙伴找到更簡單易行的方法。
CKafka 作為大數據架構中的關鍵組件,起到了數據聚合,流量削峰,消息管道的作用。在 CKafka 上下游中的數據流轉中有各種優秀的開源解決方案。如 Logstash,File Beats,Spark,Flink 等等。下面將帶來一種新的解決方案:Serverless Function。其在學習成本,維護成本,擴縮容能力等方面相對已有開源方案將有優異的表現。
Tencent Cloud Kafka 是基于開源 Kafka 引擎研發的適合大規模公有云部署的 Cloud Kafka。是一款適合公有云部署、運行、運維的分布式的、高可靠、高吞吐和高可擴展的消息隊列系統。它 100% 兼容開源的 Kafka API,目前主要支持開源的 0.9, 0.10, 1.1.1, 2.4.2 四個大版本 ,并提供向下兼容的能力。
目前 Tencent Cloud Kafka 維護了超過 4000+ 節點的集群,每日吞吐的消息量超過 9 萬億+條,峰值帶寬達到了 800GB+/s, 堆積數據達到了 20PB+。是一款集成了租戶隔離、限流、鑒權、安全、數據監控告警、故障快速切換、跨可用區容災等等一系列特性的,歷經大流量檢驗的、可靠的公有云上 Kafka 集群。
CKafka 作為一款高吞吐,高可靠的消息隊列引擎。需要承接大量數據的流入和流出,數據流動的這一過程我們稱之它為數據流轉。而在處理數據的流入和流出過程中,會有很多成熟豐富的開源的解決方案,如 Logstash,Spark,Fllink等。從簡單的數據轉儲,到復雜的數據清洗,過濾,聚合等,都有現成的解決方案。
如圖所示,在 Kafka 上下游生態圖中,CKafka 處于中間層,起到數據聚合,流量削峰,消息管道的作用。圖左和圖上是數據寫入的組件概覽,圖右和圖下是下游流式數據處理方案和持久化存儲引擎。這些構成了 Kafka 周邊的數據流動的生態。
下圖是流式計算典型數據流動示意圖。其中承接數據流轉方案的是各種開源解決方案。單純從功能和性能的角度來講,開源解決方案都有很優秀的表現。
而從學習成本,維護成本,金錢成本,擴縮容能力等角度來看,這些開源方案還是有欠缺的。怎么說呢?開源方案的缺點主要在于如下三點:
學習成本
調優、維護、解決問題的成本
擴縮容能力
以 Logstash 為例,它的入門使用學習門檻不高,進階使用有一定的成本,主要包括眾多 release 版本的使用成本,參數調優和故障處理成本,后續的維護成本(進程可用性,單機的負載處理)等。如果用流式計算引擎,如 spark 和 flink,其雖然具有分布式調度能力和即時的數據處理能力,但是其學習門檻和后期的集群維護成本,將大大提高。
來看 Serverless Function 是怎么處理數據流轉的。如圖所示,Serverless Function 運行在數據的流入和流出的處理層的位置,代替了開源的解決方案。Serverless Function 是以自定義代碼的形式來實現數據清洗、過濾、聚合、轉儲等能力的。它具有學習成本低、無維護成本、自動擴縮容和按量計費等優秀特性。
接下來我們來看一下 Serverless Function 是怎么實現數據流轉的,并且了解一下其底層的運行機制及其優勢。
首先來看一下怎么使用 Serverless Function 實現 Kafka To Elasticsearch 的數據流轉。下面以 Function 事件觸發的方式來說明 Function 是怎么實現低成本的數據清洗、過濾、格式化和轉儲的:
在業務錯誤日志采集分析的場景中,會將機器上的日志信息采集并發送到服務端。服務端選擇 Kafka 作為消息中間件,起到數據可靠存儲,流量削峰的作用。為了保存長時間的數據(月,年),一般會將數據清洗、格式化、過濾、聚合后,存儲到后端的分布式存儲系統,如 HDFS,HBASE,Elasticsearch 中。
以下代碼段分為三部分:數據源的消息格式,處理后的目標消息格式,功能實現的 Function 代碼段
源數據格式:
{ "version": 1, "componentName": "trade", "timestamp": 1595944295, "eventId": 9128499, "returnValue": -1, "returnCode": 101103, "returnMessage": "return has no deal return error[錯誤:缺少**c參數][seqId:u3Becr8iz*]", "data": [], "seqId": "@kibana-highlighted-field@u3Becr8iz@/kibana-highlighted-field@*" }
目標數據格式:
{ "timestamp": "2020-07-28 21:51:35", "returnCode": 101103, "returnError": "return has no deal return error", "returnMessage": "錯誤:缺少**c參數", "requestId": "u3Becr8iz*" }
Function 代碼
Function 實現的功能是將數據從源格式,通過清洗,過濾,格式化轉化為目標數據格式,并轉儲到 Elasticsearch。代碼的邏輯很簡單:CKafka 收到消息后,觸發了函數的執行,函數接收到信息后會執行 convertAndFilter 函數的過濾,重組,格式化操作,將源數據轉化為目標格式,最后數據會被存儲到 Elasticsearch。
#!/usr/bin/python # -*- coding: UTF-8 -*- from datetime import datetime from elasticsearch import Elasticsearch from elasticsearch import helpers esServer = "http://172.16.16.53:9200" # 修改為 es server 地址+端口 E.g. http://172.16.16.53:9200 esUsr = "elastic" # 修改為 es 用戶名 E.g. elastic esPw = "PW123" # 修改為 es 密碼 E.g. PW2312321321 esIndex = "pre1" # es 的 index 設置 # ... or specify common parameters as kwargs es = Elasticsearch([esServer], http_auth=(esUsr, esPw), sniff_on_start=False, sniff_on_connection_fail=False, sniffer_timeout=None) def convertAndFilter(sourceStr): target = {} source = json.loads(sourceStr) # 過濾掉returnCode=0的日志 if source["returnCode"] == 0: return dateArray = datetime.datetime.fromtimestamp(source["timestamp"]) target["timestamp"] = dateArray.strftime("%Y-%m-%d %H:%M:%S") target["returnCode"] = source["returnCode"] message = source["returnMessage"] message = message.split("][") errorInfo = message[0].split("[") target["returnError"] = errorInfo[0] target["returnMessage"] = errorInfo[1] target["requestId"] = message[1].replace("]", "").replace("seqId:", "") return target def main_handler(event, context): # 獲取 event Records 字段并做轉化操作 數據結構 https://cloud.tencent.com/document/product/583/17530 for record in event["Records"]: target = convertAndFilter(record) action = { "_index": esIndex, "_source": { "msgBody": target # 獲取 Ckafka 觸發器 msgBody } } helpers.bulk(es, action) return ("successful!")
看到這里,大家可能會發現,這個代碼段平時是處理單機的少量數據的腳本是一樣的,就是做轉化,轉儲,很簡單。其實很多分布式的系統做的系統從微觀的角度看,其實就是做的這么簡單的事情。分布式框架本身做的更多的是分布式調度,分布式運行,可靠性,可用性等等工作,細化到執行單元,功能其實和上面的代碼段是一樣的。
從宏觀來看,Serverless Function 做的事情和分布式計算框架 Spark, Flink 等做的事情是一樣的,都是調度,執行基本的執行單元,處理業務邏輯。區別在于用開源的方案,需要使用方去學習,使用,維護運行引擎,而 Serverless Function 則是平臺來幫用戶做這些事情。
接下來我們來看 Serverless Function 在底層是怎么去支持這些功能的,來看一下其底層的運行機制。如圖所示:
Function 作為一個代碼片段,提交給平臺以后。需要有一種觸發函數運行的方式,目前主要有如下三種:事件觸發、定時觸發和主動觸發。
在上面的例子中,我們是以事件觸發為例的。當消息提交到 Kafka,就會觸發函數的運行。此時 Serverless 調度運行平臺就會調度底層的 Container 并發去執行函數,并執行函數的邏輯。此時關于 Container 的并發度是由系統自動調度,自動計算的,當 Kafka 的源數據多的時候,并發量就大,當數據少的時候,相應的就會較少并發數。因為函數是以運行時長計費的,當源消息數據量少的時候,并發量小,自然運行時長就少,自然所需付出的資金成本就降下來。
在函數執行過程當中,函數的可靠性運行,自動擴縮容調度,并發度等都是用戶不需要關心的。用戶需要 Cover 的只是函數代碼段的可運行,無 BUG。這對于研發人員的精力投入成本就降低很多。
值得一談的是,在開發語言方面,開源方案只支持其相對應的語言,如 Logstash 的嵌入腳本用的是 ruby,spark 主要支持java,scala,python 等。而 Serverless Function 支持的是幾乎業界常見到的開發語言,包括不限于 java,golang,python,node JS,php 等等。這點就可以讓研發人員用其熟悉的語言去解決數據流轉問題,這在無形中就減少了很多代碼出錯和出問題的機會。
下面我們來統一看一下 Serverless Function 和開源的方案的主要區別及優勢。如圖5所示,和開源方案相比。在非實時的數據流轉場景中,Serverless Function 相對現有的開源方案,它具有的優勢幾乎是壓倒性的。從功能和性能的角度,它在批式計算(實時)的場景中是完全可以滿足的。但是它相對開源方案在學習成本,運維成本幾乎可以忽略,其動態擴縮容,按需付費,毫秒級付費對于資金成本的投入也是非常友好的。
用一句話總結就是:Serverless Function 能用一段熟悉的語言編寫一小段代碼去銜接契合流式計算中的數據流轉。
隨著流式計算的發展,慢慢演化出了批量計算 (batch computing)、流式計算 (stream computing)、交互計算 (interactive computing)、圖計算 (graph computing) 等方向。而架構師在業務中選擇批式計算或者流式計算,其核心是希望按需使用批式計算或流式計算,以取得在延時、吞吐、容錯、成本投入等方面的平衡。在使用者看來,批式處理可以提供精確的批式數據視圖,流式處理可以提供近實時的數據視圖。而在批式處理當中,或者說在未來的批式處理和流式處理的底層技術的合流過程中,Lambda 架構是其發展的必然路徑。
Serverless Function 以其按需使用,自動擴縮容及近乎無限的橫向擴容能力給現階段的批式處理提供了一種選擇,并且在未來批流一體化的過程中,未來可期。
關于Serverless 銜接Kafka上下游數據流轉的實戰分析問題的解答就分享到這里了,希望以上內容可以對大家有一定的幫助,如果你還有很多疑惑沒有解開,可以關注億速云行業資訊頻道了解更多相關知識。
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。