91超碰碰碰碰久久久久久综合_超碰av人澡人澡人澡人澡人掠_国产黄大片在线观看画质优化_txt小说免费全本

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

Apache Hudi使用是怎么樣的

發布時間:2021-11-23 10:52:58 來源:億速云 閱讀:212 作者:柒染 欄目:大數據

Apache Hudi使用是怎么樣的,相信很多沒有經驗的人對此束手無策,為此本文總結了問題出現的原因和解決方法,通過這篇文章希望你能解決這個問題。]

數據實時處理和實時的數據

實時分為處理的實時和數據的實時 即席分析是要求對數據實時的處理,馬上要得到對應的結果 Flink、Spark Streaming是用來對實時數據的實時處理,數據要求實時,處理也要迅速 數據不實時,處理也不及時的場景則是我們的數倉T+1數據

而本文探討的Apache Hudi,對應的場景是數據的實時,而非處理的實時。它旨在將Mysql中的時候以近實時的方式映射到大數據平臺,比如Hive中。

業務場景和技術選型

傳統的離線數倉,通常數據是T+1的,不能滿足對當日數據分析的需求 而流式計算一般是基于窗口,并且窗口邏輯相對比較固定。 而筆者所在的公司有一類特殊的需求,業務分析比較熟悉現有事務數據庫的數據結構,并且希望有很多即席分析,這些分析包含當日比較實時的數據。慣常他們是基于Mysql從庫,直接通過Sql做相應的分析計算。但很多時候會遇到如下障礙

  • 數據量較大、分析邏輯較為復雜時,Mysql從庫耗時較長

  • 一些跨庫的分析無法實現

因此,一些彌合在OLTP和OLAP之間的技術框架出現,典型有TiDB。它能同時支持OLTP和OLAP。而諸如Apache Hudi和Apache Kudu則相當于現有OLTP和OLAP技術的橋梁。他們能夠以現有OLTP中的數據結構存儲數據,支持CRUD,同時提供跟現有OLAP框架的整合(如Hive,Impala),以實現OLAP分析

Apache Kudu,需要單獨部署集群。而Apache Hudi則不需要,它可以利用現有的大數據集群比如HDFS做數據文件存儲,然后通過Hive做數據分析,相對來說更適合資源受限的環境 ###Apache hudi簡介

使用Aapche Hudi整體思路

Hudi 提供了Hudi 表的概念,這些表支持CRUD操作。我們可以基于這個特點,將Mysql Binlog的數據重放至Hudi表,然后基于Hive對Hudi表進行查詢分析。數據流向架構如下 Apache Hudi使用是怎么樣的

Hudi表數據結構

Hudi表的數據文件,可以使用操作系統的文件系統存儲,也可以使用HDFS這種分布式的文件系統存儲。為了后續分析性能和數據的可靠性,一般使用HDFS進行存儲。以HDFS存儲來看,一個Hudi表的存儲文件分為兩類。

Apache Hudi使用是怎么樣的

  • 包含_partition_key相關的路徑是實際的數據文件,按分區存儲,當然分區的路徑key是可以指定的,我這里使用的是_partition_key

  • .hoodie 由于CRUD的零散性,每一次的操作都會生成一個文件,這些小文件越來越多后,會嚴重影響HDFS的性能,Hudi設計了一套文件合并機制。 .hoodie文件夾中存放了對應的文件合并操作相關的日志文件。

數據文件

Hudi真實的數據文件使用Parquet文件格式存儲 Apache Hudi使用是怎么樣的

.hoodie文件

Hudi把隨著時間流逝,對表的一系列CRUD操作叫做Timeline。Timeline中某一次的操作,叫做Instant。Instant包含以下信息

  • Instant Action 記錄本次操作是一次數據提交(COMMITS),還是文件合并(COMPACTION),或者是文件清理(CLEANS)

  • Instant Time 本次操作發生的時間

  • state 操作的狀態,發起(REQUESTED),進行中(INFLIGHT),還是已完成(COMPLETED)

.hoodie文件夾中存放對應操作的狀態記錄 Apache Hudi使用是怎么樣的

Hudi記錄Id

hudi為了實現數據的CRUD,需要能夠唯一標識一條記錄。hudi將把數據集中的唯一字段(record key ) + 數據所在分區 (partitionPath) 聯合起來當做數據的唯一鍵

COW和MOR

基于上述基礎概念之上,Hudi提供了兩類表格式COW和MOR。他們會在數據的寫入和查詢性能上有一些不同

Copy On Write Table

簡稱COW。顧名思義,他是在數據寫入的時候,復制一份原來的拷貝,在其基礎上添加新數據。正在讀數據的請求,讀取的是是近的完整副本,這類似Mysql 的MVCC的思想。

Apache Hudi使用是怎么樣的

上圖中,每一個顏色都包含了截至到其所在時間的所有數據。老的數據副本在超過一定的個數限制后,將被刪除。這種類型的表,沒有compact instant,因為寫入時相當于已經compact了。

  • 優點 讀取時,只讀取對應分區的一個數據文件即可,較為高效

  • 缺點 數據寫入的時候,需要復制一個先前的副本再在其基礎上生成新的數據文件,這個過程比較耗時。且由于耗時,讀請求讀取到的數據相對就會滯后

Merge On Read Table

簡稱MOR。新插入的數據存儲在delta log 中。定期再將delta log合并進行parquet數據文件。讀取數據時,會將delta log跟老的數據文件做merge,得到完整的數據返回。當然,MOR表也可以像COW表一樣,忽略delta log,只讀取最近的完整數據文件。下圖演示了MOR的兩種數據讀寫方式 Apache Hudi使用是怎么樣的

  • 優點 由于寫入數據先寫delta log,且delta log較小,所以寫入成本較低

  • 缺點 需要定期合并整理compact,否則碎片文件較多。讀取性能較差,因為需要將delta log 和 老數據文件合并

基于hudi的代碼實現

我在github上放置了基于Hudi的封裝實現,對應的源碼地址為 https://github.com/wanqiufeng/hudi-learn。

binlog數據寫入Hudi表
  • binlog-consumer分支使用Spark streaming消費kafka中的Binlog數據,并寫入Hudi表。Kafka中的binlog是通過阿里的Canal工具同步拉取的。程序入口是CanalKafkaImport2Hudi,它提供了一系列參數,配置程序的執行行為

參數名含義是否必填默認值
--base-save-pathhudi表存放在HDFS的基礎路徑,比如hdfs://192.168.16.181:8020/hudi_data/
--mapping-mysql-db-name指定處理的Mysql庫名
--mapping-mysql-table-name指定處理的Mysql表名
--store-table-name指定Hudi的表名默認會根據--mapping-mysql-db-name和--mapping-mysql-table-name自動生成。假設--mapping-mysql-db-name 為crm,--mapping-mysql-table-name為order。那么最終的hudi表名為crm__order
--real-save-path指定hudi表最終存儲的hdfs路徑默認根據--base-save-path和--store-table-name自動生成,生成格式為'--base-save-path'+'/'+'--store-table-name' ,推薦默認
--primary-key指定同步的mysql表中能唯一標識記錄的字段名默認id
--partition-key指定mysql表中可以用于分區的時間字段,字段必須是timestamp 或dateime類型
--precombine-key最終用于配置hudi的hoodie.datasource.write.precombine.field默認id
--kafka-server指定Kafka 集群地址
--kafka-topic指定消費kafka的隊列
--kafka-group指定消費kafka的group默認在存儲表名前加'hudi'前綴,比如'hudi_crm__order'
--duration-seconds由于本程序使用Spark streaming開發,這里指定Spark streaming微批的時長默認10秒

一個使用的demo如下

/data/opt/spark-2.4.4-bin-hadoop2.6/bin/spark-submit --class com.niceshot.hudi.CanalKafkaImport2Hudi \
	--name hudi__goods \
    --master yarn \
    --deploy-mode cluster \
    --driver-memory 512m \
    --executor-memory 512m \
    --executor-cores 1 \
	--num-executors 1 \
    --queue hudi \
    --conf spark.executor.memoryOverhead=2048 \
    --conf "spark.executor.extraJavaOptions=-XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=\tmp\hudi-debug" \
	--conf spark.core.connection.ack.wait.timeout=300 \
	--conf spark.locality.wait=100 \
	--conf spark.streaming.backpressure.enabled=true \
	--conf spark.streaming.receiver.maxRate=500 \
	--conf spark.streaming.kafka.maxRatePerPartition=200 \
	--conf spark.ui.retainedJobs=10 \
	--conf spark.ui.retainedStages=10 \
	--conf spark.ui.retainedTasks=10 \
	--conf spark.worker.ui.retainedExecutors=10 \
	--conf spark.worker.ui.retainedDrivers=10 \
	--conf spark.sql.ui.retainedExecutions=10 \
	--conf spark.yarn.submit.waitAppCompletion=false \
	--conf spark.yarn.maxAppAttempts=4 \
	--conf spark.yarn.am.attemptFailuresValidityInterval=1h \
	--conf spark.yarn.max.executor.failures=20 \
	--conf spark.yarn.executor.failuresValidityInterval=1h \
	--conf spark.task.maxFailures=8 \
    /data/opt/spark-applications/hudi_canal_consumer/hudi-canal-import-1.0-SNAPSHOT-jar-with-dependencies.jar  --kafka-server local:9092 --kafka-topic dt_streaming_canal_xxx --base-save-path hdfs://192.168.2.1:8020/hudi_table/ --mapping-mysql-db-name crm --mapping-mysql-table-name order --primary-key id --partition-key createDate --duration-seconds 1200
歷史數據同步以及表元數據同步至hive

history_import_and_meta_sync 分支提供了將歷史數據同步至hudi表,以及將hudi表數據結構同步至hive meta的操作

同步歷史數據至hudi表

這里采用的思路是

  • 將mysql全量數據通過注入sqoop等工具,導入到hive表。

  • 然后采用分支代碼中的工具HiveImport2HudiConfig,將數據導入Hudi表

HiveImport2HudiConfig提供了如下一些參數,用于配置程序執行行為

參數名含義是否必填默認值
--base-save-pathhudi表存放在HDFS的基礎路徑,比如hdfs://192.168.16.181:8020/hudi_data/
--mapping-mysql-db-name指定處理的Mysql庫名
--mapping-mysql-table-name指定處理的Mysql表名
--store-table-name指定Hudi的表名默認會根據--mapping-mysql-db-name和--mapping-mysql-table-name自動生成。假設--mapping-mysql-db-name 為crm,--mapping-mysql-table-name為order。那么最終的hudi表名為crm__order
--real-save-path指定hudi表最終存儲的hdfs路徑默認根據--base-save-path和--store-table-name自動生成,生成格式為'--base-save-path'+'/'+'--store-table-name' ,推薦默認
--primary-key指定同步的hive歷史表中能唯一標識記錄的字段名默認id
--partition-key指定hive歷史表中可以用于分區的時間字段,字段必須是timestamp 或dateime類型
--precombine-key最終用于配置hudi的hoodie.datasource.write.precombine.field默認id
--sync-hive-db-name全量歷史數據所在hive的庫名
--sync-hive-table-name全量歷史數據所在hive的表名
--hive-base-pathhive的所有數據文件存放地址,需要參看具體的hive配置/user/hive/warehouse
--hive-site-pathhive-site.xml配置文件所在的地址
--tmp-data-path程序執行過程中臨時文件存放路徑。一般默認路徑是/tmp。有可能出現/tmp所在磁盤太小,而導致歷史程序執行失敗的情況。當出現該情況時,可以通過該參數自定義執行路徑默認操作系統臨時目錄

一個程序執行demo

nohup java -jar hudi-learn-1.0-SNAPSHOT.jar --sync-hive-db-name hudi_temp --sync-hive-table-name crm__wx_user_info --base-save-path hdfs://192.168.2.2:8020/hudi_table/ --mapping-mysql-db-name crm --mapping-mysql-table-name "order" --primary-key "id" --partition-key created_date --hive-site-path /etc/lib/hive/conf/hive-site.xml --tmp-data-path /data/tmp > order.log &
同步hudi表結構至hive meta

需要將hudi的數據結構和分區,以hive外表的形式同步至Hive meta,才能是Hive感知到hudi數據,并通過sql進行查詢分析。Hudi本身在消費Binlog進行存儲時,可以順帶將相關表元數據信息同步至hive。但考慮到每條寫入Apache Hudi表的數據,都要讀寫Hive Meta ,對Hive的性能可能影響很大。所以我單獨開發了HiveMetaSyncConfig工具,用于同步hudi表元數據至Hive。考慮到目前程序只支持按天分區,所以同步工具可以一天執行一次即可。參數配置如下

參數名含義是否必填默認值
--hive-db-name指定hudi表同步至哪個hive數據庫
--hive-table-name指定hudi表同步至哪個hive表
--hive-jdbc-url指定hive meta的jdbc鏈接地址,例如jdbc:hive2://192.168.16.181:10000
--hive-user-name指定hive meta的鏈接用戶名默認hive
--hive-pwd指定hive meta的鏈接密碼默認hive
--hudi-table-path指定hudi表所在hdfs的文件路徑
--hive-site-path指定hive的hive-site.xml路徑

一個程序執行demo

java -jar hudi-learn-1.0-SNAPSHOT.jar --hive-db-name streaming --hive-table-name crm__order --hive-user-name hive --hive-pwd hive --hive-jdbc-url jdbc:hive2://192.168.16.181:10000 --hudi-table-path hdfs://192.168.16.181:8020/hudi_table/crm__order --hive-site-path /lib/hive/conf/hive-site.xml

一些踩坑

hive相關配置

有些hive集群的hive.input.format配置,默認是org.apache.hadoop.hive.ql.io.CombineHiveInputFormat,這會導致掛載Hudi數據的Hive外表讀取到所有Hudi的Parquet數據,從而導致最終的讀取結果重復。需要將hive的format改為org.apache.hadoop.hive.ql.io.HiveInputFormat,為了避免在整個集群層面上更改對其余離線Hive Sql造成不必要的影響,建議只對當前hive session設置set hive.input.format=org.apache.hadoop.hive.ql.io.HiveInputFormat;

spark streaming的一些調優

由于binlog寫入Hudi表的是基于Spark streaming實現的,這里給出了一些spark 和spark streaming層面的配置,它能使整個程序工作更穩定

配置含義
spark.streaming.backpressure.enabled=true啟動背壓,該配置能使Spark Streaming消費速率,基于上一次的消費情況,進行調整,避免程序崩潰
spark.ui.retainedJobs=10 <br> spark.ui.retainedStages=10 <br> spark.ui.retainedTasks=10 <br>spark.worker.ui.retainedExecutors=10 <br>spark.worker.ui.retainedDrivers=10 <br>spark.sql.ui.retainedExecutions=10默認情況下,spark 會在driver中存儲一些spark 程序執行過程中各stage和task的歷史信息,當driver內存過小時,可能使driver崩潰,通過上述參數,調節這些歷史數據存儲的條數,從而減小對內層使用
spark.yarn.maxAppAttempts=4配置當driver崩潰后,嘗試重啟的次數
spark.yarn.am.attemptFailuresValidityInterval=1h假若driver執行一周才崩潰一次,那我們更希望每次都能重啟,而上述配置在累計到重啟4次后,driver就再也不會被重啟,該配置則用于重置maxAppAttempts的時間間隔
spark.yarn.max.executor.failures=20executor執行也可能失敗,失敗后集群會自動分配新的executor, 該配置用于配置允許executor失敗的次數,超過次數后程序會報(reason: Max number of executor failures (400) reached),并退出
spark.yarn.executor.failuresValidityInterval=1h指定executor失敗重分配次數重置的時間間隔
spark.task.maxFailures=8允許任務執行失敗的次數

未來改進

  • 支持無分區,或非日期分區表。目前只支持日期分區表

  • 多數據類型支持,目前為了程序的穩定性,會將Mysql中的字段全部以String類型存儲至Hudi

看完上述內容,你們掌握Apache Hudi使用是怎么樣的的方法了嗎?如果還想學到更多技能或想了解更多相關內容,歡迎關注億速云行業資訊頻道,感謝各位的閱讀!

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

钟祥市| 七台河市| 准格尔旗| 江永县| 乌鲁木齐县| 文昌市| 民权县| 顺平县| 白玉县| 桐庐县| 和田市| 永康市| 富裕县| 葵青区| 沧州市| 岳西县| 宁南县| 玉龙| 兴安盟| 武夷山市| 南部县| 安宁市| 石景山区| 翁牛特旗| 喀喇沁旗| 石家庄市| 齐河县| 泰宁县| 邮箱| 梅州市| 梅河口市| 浙江省| 台东市| 旅游| 静宁县| 马山县| 宁国市| 云龙县| 松滋市| 大名县| 肥城市|