您好,登錄后才能下訂單哦!
基于數據庫系統的 T+0 全量實時查詢,在數據量很大時一般只能進行數據庫擴容(包括分庫手段),成本高昂;如果采用文件系統和生產數據庫混合運算,就可以實現低成本高性能的 T+0 查詢,而熱導出機制則是這個方案的基礎!
在報表的應用系統中,用戶越來越關注數據的實時性,希望最新發生的數據能在報表中體現出來,也就是我們常說的T+0場景, 以此及時輔助決策、驅動運營。
比如交通大數據應用的場景:需要結合實時數據了解車輛通行密度,合理進行道路規劃,同時根據歷史數據預測線路擁堵情況、事故多發地提醒等等。
但常規的方案:報表+數據倉庫+ETL工具很難實現此類實時報表,往往只能看到昨天、上周甚至是上個月的情況,也就是T+1、T+7、T+30等,我們統稱為T+n報表。
究其原因,困難大概體現在如下三個方面:
1、如果報表的歷史數據和最新數據都從生產系統讀取,雖然可以實現T+0報表,但是會對生產數據庫造成壓力,當數據量越來越大時,產生性能瓶頸,直接影響業務;并且大量的歷史數據會占用高昂的數據庫成本(存儲成本和性能成本)。
2、如果采用數據倉庫的方式,那么ETL從生產庫中取出數據,需要較長的“窗口時間”,一般是業務人員下班之后,到第二天早上上班之前,所以能看到的最新數據也只能是T+1。
3、雖然理論上可以從歷史庫中和生產庫中同時取數據形成實時報表,但是一般的報表工具都不具備跨庫混合計算的能力,其他的跨庫計算方案又比較復雜,難以實施,并且性能較低。
那么,是否有成本更低、實施起來更簡單的T+0報表方案呢?下面將要介紹的潤乾集算器,就是這樣一款利器,利用集算器的混合數據源能力就能實現低成本的T+0實時報表。
實現思路:把不再發生變動的大量歷史數據采用數據文件存儲,僅從生產庫讀取少量新數據,在保證報表實時性的同時,降低了歷史數據存儲的成本,減少了報表系統對生產數據庫造成的負載。
下圖顯示了常規T+n方案和集算器T+0方案的結構對比,應該說,引入集算器后,減少了很多不必要的成本和多余的組件,整個體系架構也變得更加清新與合理了:
上圖新處理方式體系結構中的”導出(非實時)”是指在非工作時間(例如晚上),定時將生產數據庫的新增數據同步到存儲歷史數據的文件中;
關于數據外置方案、設計數據存儲組織、定時任務等相關準備和外圍工作,具體做法可參考<<基于文件系統實現可追加的數據集市>>的相關章節,這里不再贅述。
下面,我們就通過制作“實時流程工站不良柏拉圖”這個例子,來看一下集算器是如何利用歷史數據結合當期數據進行混合運算,實現T+0方案的。報表最終的展示效果如下圖:
這張報表清楚地顯示了電子設備在生產過程中,80%的問題是由20%的原因造成的,對于找出產生大多數問題的關鍵原因很有優勢。
報表中數據的查詢過程是:根據選擇開始日期、結束日期進行過濾查詢;先按照不良代碼分組,統計匯總每個分類的不良數量,并按照匯總數量降序,然后計算出不良累計比率(算法為“(不良數量累計匯總/總不良數量匯總)*100”)。報表上部的查詢按鈕是報表工具提供的“參數模板”功能,具體做法參見教程,這里不再贅述。
我們假定已經將變化不大的歷史數據搬出了數據庫,采用集文件(集文件利用集算器提供的壓縮格式,具有更好IO性能)存儲,命名為MES-pre.btx,同時每天定時執行數據同步腳本,把前一天的數據追加到當前數據文件中;查詢涉及的當天少量數據直接從生產數據庫(demo)取出,以此保證數據的實時性。集算器SPL腳本如下(也支持僅查歷史數據的情況):
A | B | |
1 | =connect("demo") | =A1.query("SELECT code,name FROM watch") |
2 | =A1.cursor@x("SELECT code,nums FROM meta_resource WHERE "+if(Efiledate>=date(now()),"DATE_FORMAT(fildate,'%Y-%m-%d')>='"+string(date(now()))+"'","1=0")) | |
3 | =file("D:/PT/MES-pre.btx").cursor@b().select(filedate>=Bfiledate && filedate<Efiledate) | =[A2,A3].conjx() |
4 | =B3.groups(code:不良代碼,code:不良名稱;sum(nums):不良數量,sum(nums):不良累計數量,sum(0):不良累計比率) | |
5 | >A4.switch(不良名稱,B1:code) | =A4.sort(不良數量:-1) |
6 | >B5.run(不良累計數量+=不良累計數量[-1]) | =A4.sum(不良數量) |
7 | >B5.run(不良累計比率=round(不良累計數量/B6*100,2)) | =B5.new(不良代碼,不良名稱.name:不良名稱,不良數量,不良累計數量,不良累計比率) |
8 | return B7 |
A1:連接預先配置好的生產數據庫(demo)
B1:查詢字典表,不良代碼、不良名稱
A2:建立數據庫游標,用簡單的sql讀取數據表的數據。Sql的過濾條件部分會根據邏輯判斷進行動態拼接,當結束日期>=當前系統日期時,代表查詢當天的實時數據,否則做一次結果為空的查詢動作,以適應只查歷史數據的業務場景。@x選項是指讀完數據庫后關閉連接。
A3:建立數據文件D:/PT/MES-pre.btx的游標。文件游標允許分批從大數據文件中讀取數據,從而避免內存溢出。@b選項是指按照集算器提供的二進制格式來讀取文件,同時根據傳入的開始日期(Bfiledate)、結束日期(Efiledate)過濾出符合條件的記錄
B3:將數據庫游標(新數據)和文件游標(歷史數據)合并
A4:利用groups函數,完成對合并后游標的分組匯總,同時多構造了幾列:不良名稱、不良累計數量、不良累計比率,方便后面的賦值計算。
A5:通過switch()函數在A4結果的”不良名稱”字段上建立指向B1表中code字段的指針引用記錄,實現關聯,如下圖:
B5:按照不良數量降序排列。如下圖:
A6:計算不良累計數量;可以看到,集算器用“不良累計數量[-1]”來表示上一行的不良數量,可以輕松進行相對位置的計算。
B6:對不良數量進行總計
A7:計算出不良累計比率,算法為“(不良數量累計匯總/總不良數量匯總)*100”,同時保留兩位小數,計算結果如下圖:
B7-A8:取出需要的字段,將關聯了不良名稱后的結果集返回給報表工具,如下圖:
在利用集算器完成了數據查詢工作后,為了在報表中使用查詢結果,可以在報表中直接將集算器設置為數據源,用法和使用數據庫一樣簡單,具體做法如下:
l 在報表中定義參數(Bfiledate、Efiledate),
l 設置集算器數據集,并傳遞報表參數,
l 設計報表統計圖
如下圖所示:
完成報表設計后,輸入參數進行計算,就可以得到希望的報表了。
上一章節中,通過對歷史數據(文件)和實時數據(數據庫)進行混合計算,就能夠輕松實現實時報表(T+0)方案;而為了做到這一點,相應的數據預處理,包括怎么導出到文件、設計怎樣的存儲組織等,也就顯得尤為重要了。
接下來將討論歷史數據導出到文件的幾種模式及優缺點分析:冷導出、折中辦法、熱導出。
關于用文件存儲歷史數據能夠帶來的諸多好處,可以參考<<基于文件系統實現可追加的數據集市>>的相關章節,這里不再贅述。
所謂冷導出,就是允許有一段 “時間窗口”,能夠從生產庫取出歷史數據追加導出到文件中。例如每天的凌晨2-6點為定時執行任務的時間窗口。
冷導出的缺點也很明顯,在追加數據導出到文件的這段時間里,這個文件是不可讀的,也就是說相關的查詢也無法進行了,所以,從本質上說,冷導出并沒有真正意義上做到T+0實時查詢(生產系統不停機,查詢系統也不停機)。
不過,這里順便解釋一下:如果使用另一個數據庫存儲歷史數據,就不會有這樣的問題。原因在于關系型數據庫支持事務一致性,數據寫入的同時仍然可以很好地支持查詢。當然這樣做肯定也會犧牲一部分性能,當每天導出的數據量較多時對資源占用相當巨大(因為數據庫回滾段會很大)。
所以一致性和高性能在一定程度上是矛盾的。數據庫雖然有一致性,但數據庫本身太慢太貴;而集算器(集文件)可以獲得高性能,但沒有事務一致性,在維護數據的同時不能參與其他計算。
不過,在對業務場景要求不是很高的情況下,冷導出也是夠用了,下面我們還是簡單舉例說明一下如何編寫集算器腳本,獲取昨天的歷史數據追加到當前集文件中,代碼如下:
A | B | |
1 | =file(“D:/PT/MES-pre.btx”) | =connect("demo") |
2 | =B1.cursor@x("SELECT * FROM meta_resource WHERE DATE_FORMAT(fildate,'%Y-%m-%d')=?", after(date(now()),-1)) | >A1.export@ab(A2) |
A1:按路徑打開需要導出的集文件路徑
B1:連接數據庫(demo)
A2:根據sql創建數據庫游標,獲取昨日數據,參數為昨天日期; @x選項是指讀完數據庫后關閉
B2:執行結果追加寫入到集文件
針對”冷導出”方案的不足,比較容易想到的折中辦法就是:歷史數據不再按照追加的模式寫入到一個集文件中,而是把文件拆開,讓彼此之間的耦合度更低,互不影響。這樣做的話,就需要考慮以下兩條規則:
1、每天導出一個獨立的集文件,可以用年月日命名,這樣導出過程中,就不會影響對已導出的歷史數據的查詢。
2、在查詢腳本中增加時間范圍判斷,規避掉導出的”時間窗口”;比如定時任務的時間窗口為每天凌晨2-6點;在查詢腳本中,可以根據查詢動作的當前時間點進行邏輯判斷,如果查詢發生在當天6點以后,說明數據導出已經完成,那么數據來源就是集文件(到昨天為止的歷史數據)+當前數據庫(到今天當前時間點的新數據),若查詢發生在當天6點以前的,那就是集文件(到前天為止的歷史數據)+當前數據庫(昨天到今天當前時間點的新數據)。
這種辦法的缺點就是在設計數據存儲組織時,文件會分的比較碎,邏輯判斷部分的代碼也會顯得比較冗長,而文件管理也會麻煩一些。但不管怎樣,還是能夠達到要求,實現真正意義上的實時報表(T+0)方案。下面介紹一下實現步驟。
歷史數據按照業務模塊進行劃分,每天數據存一份集文件。目錄結構為:/業務模塊/數據明細表/年月日文件名,如下圖所示:
改造“冷導出”方案中數據導出腳本,從數據庫中獲取昨天的歷史數據每天存一份集文件,用年月日命名,代碼如下:
A | B | |
1 | =file(“D:/PT/”+string(after(date(now()),-1),"yyyyMMdd")) | =connect("demo") |
2 | =B1.cursor@x("SELECT * FROM meta_resource WHERE DATE_FORMAT(fildate,'%Y-%m-%d')=?", after(date(now()),-1)) | >A1.export@b(A2) |
A1:按路徑打開需要導出的集文件路徑,每天一個,用年月日命名
前面已經解釋過的格子的代碼這里不再贅述。
首先,我們需要寫一個工具腳本,主要功能是能夠根據傳入的開始日期、結束日期,過濾出需要查詢跨度范圍的多個集文件路徑,同時判斷路徑下的集文件對象是否存在。腳本命名為:判斷讀取文件的范圍.dfx,編寫代碼如下:
A | |
1 | =if(endDate>=date(now()),if(now()>datetime(concat(date(now())," 06:00:00")),after(endDate,-1),after(endDate,-2)),endDate) |
2 | =periods(startDate,A1,1) |
3 | =A2.(path+string(~,"yyyyMMdd")) |
4 | =A3.select(file(~).exists()) |
5 | return A4 |
腳本接收3個參數,開始日期(startDate),結束日期(endDate),集文件的存儲路徑(path)
A1:當傳入的結束日期>=當前系統日期時,并且當前時間是在當天6點之后的,返回昨天日期,在當天6點之前的,返回前天日期,否則就返回傳入的實際結束日期
A2:根據開始日期,計算后的結束日期,默認按天間隔獲取日期范圍
A3:循環A2,通過集文件的存儲路徑與該日期段內的年月日進行拼接,利用string()函數進行格式化
A4:判斷路徑下的文件是否真實存在,由A5返回實際存在的文件路徑,最終結果如下圖:
然后,我們需要對前面章節中“混合運算場景”數據查詢的腳本做一些改造,值得注意的是這里將采用多路游標的概念,將多個游標合并成一個游標使用,改造后的腳本如下:
A | B | C | |
1 | =connect("demo") | =A1.query("SELECT code,name FROM watch") | |
2 | =A1.cursor@x("SELECT code,nums FROM meta_resource WHERE "+if(Efiledate>=date(now()),if(now()>datetime(concat(date(now())," 06:00:00")),"DATE_FORMAT(fildate,'%Y-%m-%d')>='"+string(date(now()))+"'","DATE_FORMAT(fildate,'%Y-%m-%d')>='"+string(after(date(now()),-1))+"'"),"1=0")) | ||
3 | =call("D:/PT/判斷讀取文件的范圍.dfx",Bfiledate,Efiledate,"D:/PT/數據表A/") | ||
4 | =A3.(file(~).cursor@b()) | =(A2|A4).mcursor() | |
5 | =B4.groups(code:不良代碼,code:不良名稱;sum(nums):不良數量,sum(nums):不良累計數量,sum(0):不良累計比率) | ||
6 | >A5.switch(不良名稱,B1:code) | =A5.sum(不良數量) | =A5.sort(不良數量:-1) |
7 | >C6.run(不良累計數量+=不良累計數量[-1]) | >C6.run(不良累計比率=round((不良累計數量/B6)*100,2)) | |
8 | =C6.new(不良代碼,不良名稱.name:不良名稱,不良數量,不良累計數量,不良累計比率) | ||
9 | return A8 |
前面已經解釋過的格子代碼這里不再贅述。
A2:建立數據庫游標,根據邏輯判斷動態拼接sql,當查詢的結束日期>=當前系統日期時,并且當前查詢時間點是在當天6點之后的,只查詢當天的實時數據,當前查詢時間點發生在當天6點之前的,查詢返回昨天和當天的實時數據;否則都不滿足的情況下,做一次結果為空的查詢動作,適應只查詢歷史數據的業務場景。
A3:調用”判斷讀取文件的范圍.dfx”,傳入腳本參數開始日期、結束日期的值,獲得起止日期內的所有集文件的集合
A4:循環A3,分別打開每個集文件對象,根據文件創建游標,其中cursor()函數使用@b選項代表從集文件中讀取。
B4:利用集算器提供的多路游標概念,把數據結構相同的多個游標合并成一個游標使用。使用時,多路游標采用并行計算來處理各個游標的數據,可以通過設置cs.mcursor(n) 函數中的n來決定并行數,當n空缺時,將按默認自動設置并行數
A9:最后返回結果集給報表工具使用
所謂熱導出,是相對于冷導出而言的。熱導出要保證查詢系統永不停機,在導出數據的過程中有查詢請求進來,依然能夠工作。熱導出一般適用于實時查詢場景要求較高的情況。
熱導出需要利用文件的備份機制結合數據庫的一致性來實現熱切換動作。為了便于理解,可參考以下邏輯圖:
首先,在數據庫中建備份表,主要目的是為了記錄當前正在使用的是哪個備份文件,以及從DB中取的熱數據的日期范圍,查詢系統啟動時把這個表清空。
其次,導出歷史數據到集文件A,同時備份一個文件B,然后在數據庫備份表中記錄該文件A,以及設定從DB中取的熱數據的日期(比如某個時刻之后);這個動作在系統初始化運行時,只做一次。
然后,設計數據查詢的流程:
1、在數據庫中建狀態表,當數據查詢時,先從備份表中查出可用的備份是哪個文件以及熱數據的日期范圍,然后加入一條記錄到狀態表中,表明該備份文件正有一個查詢,當查詢完成后將在狀態表中把這條記錄刪除,可以用自增列的方式。
再次,設計數據導出到集文件的流程:
1、每天凌晨2點執行定時任務,先同步歷史數據追加到文件B上,當導出完成后,修改數據庫備份表的記錄為使用文件B,同時修改從DB中取熱數據的范圍,以后新產生的查詢動作都將使用文件B
2、檢查并等待狀態表中A的使用記錄都已清空(基于A的所有查詢都結束了),這時才會同步歷史數據追加到文件A上,否則每等待1分鐘就循環檢查一次。
3、當步驟2的數據追加完成后,再修改數據庫備份表為使用文件A,以后的新產生的查詢又會回到了使用文件A,從而達到熱切換的動作。
4、直到等待狀態表中B的使用記錄都清空(基于B的查詢也都結束了)
5、整個過程執行完成,可以等待下一輪導出
這里需要特別說明的是,備份表、狀態表必須用數據庫作為媒介,從而利用數據庫的一致性;不能用文件記錄備份表、狀態表的內容,因為文件無法保持一致性,當多任務并發時可能就亂了。
第一步,在數據庫中定義”備份表”,包含三個字段(文件名稱/邊界時間/標識),同時定義”查詢狀態表”,包含三個字段(唯一標識/文件名稱/當前系統時間,其中定義唯一標識為自增列),數據結構分別如下圖示:
第二步,通過集文件A備份一個集文件B,然后在“備份表”中記錄可查詢的備份文件為A,并設定從DB中取的熱數據邊界時間(定義為每日的零點),此步操作如果用集算器腳本執行,樣例代碼如下:
A | B | |
1 | =movefile@c(file("D:/PT/MES-A.btx"),"D:/PT/MES-B.btx") | =connect("demo") |
2 | >B1.execute("INSERT INTO backup (name,crashtime,flag) VALUES (?,?,?)","A",date(now()),"WORKING_STATUS") | >B1.close() |
A1:根據導出的集文件A,復制備份同樣的文件B
A2:備份文件B完成后,往數據表中寫入當前可用的集文件A,當前系統時間(零點),給定標識列為:WORKING_STATUS
第三步,我們需要對前面章節中“混合運算場景”數據查詢的腳本做一些改造,改造后的腳本如下(此例中也支持僅查歷史數據的情況):
A | B | |
1 | =connect("demo") | =A1.query("SELECT code,name FROM watch") |
2 | =A1.query@1("SELECT NAME,crashtime FROM BACKUP WHERE flag='WORKING_STATUS'") | =name=A2(1),crashtime=A2(2) |
3 | =A1.cursor("SELECT code,nums FROM meta_resource WHERE "+if(Efiledate>=date(crashtime),"DATE_FORMAT(fildate,'%Y-%m-%d')>='"+string(date(crashtime))+"'","1=0")) | >A1.execute("INSERT INTO status (name,time) VALUES (?,?)",name,now()),uniques =A1.query@1("SELECT @@identity") |
4 | =file(concat("D:/PT/MES-",name,".btx")).cursor@b().select(filedate>=Bfiledate && filedate<Efiledate) | =[A3,A4].conjx() |
5 | =B4.groups(code:不良代碼,code:不良名稱;sum(nums):不良數量,sum(nums):不良累計數量,sum(0):不良累計比率) | |
6 | >A5.switch(不良名稱,B1:code) | =A5.sort(不良數量:-1) |
7 | >B6.run(不良累計數量+=不良累計數量[-1]) | =A5.sum(不良數量) |
8 | >B6.run(不良累計比率=round((不良累計數量/B7)*100,2)) | =B6.new(不良代碼,不良名稱.name:不良名稱,不良數量,不良累計數量,不良累計比率) |
9 | >A1.execute("DELETE FROM STATUS WHERE uniques=?",uniques) | >A1.close() |
10 | return B8 |
前面已經解釋過的格子代碼這里不再贅述。
A2:根據標識WORKING_STATUS作為條件,查詢出來當前可用的集文件名稱,以及熱數據取值的邊界日期時間
B2:定義變量name, crashtime并賦值,便于后面單元格計算引用。
B3:此單元格做了兩步動作,首先,寫入一條記錄到狀態表中,表明該當前備份文件正有一個查詢,其中uniques為自增列;接著在插入記錄后,通過執行【SELECT @@IDENTITY】獲取上一條插入語句中生成的自增長字段的值,賦值給變量uniques,便于A9查詢時引用。數據庫中的效果如下圖:
A9:當查詢完成后,根據變量uniques的值作為條件,在狀態表中把這條記錄刪除,效果如下圖:
改造“冷導出”方案中數據導出腳本,每天凌晨2點定時執行,代碼如下:
A | B | C | |
1 | =file("D:/PT/MES-B.btx") | =connect("demo") | =file("D:/PT/MES-A.btx") |
2 | =B1.cursor("SELECT * FROM meta_resource WHERE DATE_FORMAT(fildate,'%Y-%m-%d')=?", after(date(now()),-1)) | >A1.export@ab(A2) | >B1.execute("UPDATE BACKUP SET NAME = ?,crashtime=? WHERE flag ='WORKING_STATUS'","B",date(now())) |
3 | for connect("demo").query@1x("SELECT COUNT(*) FROM STATUS WHERE NAME='A'")>0 | >sleep(60*1000) | |
4 | =B1.cursor("SELECT * FROM meta_resource WHERE DATE_FORMAT(fildate,'%Y-%m-%d')=?", after(date(now()),-1)) | >C1.export@ab(A4) | >B1.execute("UPDATE BACKUP SET NAME = ?,crashtime=? WHERE flag = 'WORKING_STATUS'","A",date(now())) |
5 | for connect("demo").query@1x("SELECT COUNT(*) FROM STATUS WHERE NAME='B'")>0 | >sleep(60*1000) | |
6 | >B1.close() |
前面已經解釋過的格子代碼這里不再贅述。
C2:當歷史數據同步追加到文件B上,修改數據庫”備份表”的記錄為使用文件B,同時修改從DB中取熱數據的邊界日期范圍,執行結果如下圖:
A3-B3:循環查詢”狀態表”中A的使用記錄是否已清空,若發現還有基于A的查詢沒有結束,那就等待1分鐘,然后接著循環,直到基于A的查詢全部結束;
其中A3的數據庫連接表達式需要特別說明一下:通常情況下,在B1單元格中已經定義了數據庫連接,在A3中,可直接引用,寫成:
for B1.query@1("SELECT COUNT(*) FROM STATUS WHERE NAME='A'")>0
不過,有些數據庫在默認情況下做成了一次連接只處理一個事務,這樣會導致A3在循環的時候結果不會變化,總是按照第一次查詢出來的結果為主,比如第一次查詢返回是true,當數據庫發生變化了,它還是返回true,為了保險期間,可以寫成如下格式:
for connect("demo").query@1x("SELECT COUNT(*) FROM STATUS WHERE NAME='A'")>0
這個屬于數據庫配置的范疇,可以通過數據庫的連接參數來控制,這里不再詳解。
C4:當文件A的數據追加完成后,再修改數據庫”備份表”的記錄為使用文件A,以后新產生的查詢就會再使用文件A,執行結果如下圖:
A5-B5:循環查詢”狀態表”中B的使用記錄是否已清空,若發現還有基于B的查詢沒有結束,那就等待1分鐘,然后接著循環,直到基于B的查詢全部結束, 此輪整個導出過程全部完成,然后等待下一輪導出
實時報表(T+0)的場景下,數據的熱導出是個有些復雜的話題,不過,利用集算器(集文件)的備份機制結合數據庫的一致性就可以輕松應對這類難題了,其中主要用到了以下兩個優勢:
1、跨庫混合計算
集算器作為獨立的計算引擎,可以并行指揮各個數據庫分別計算,收集結果后再進行一輪匯總運算,然后向前端提交或者落地,從而可以很簡單的實現T+0全量查詢報表。
同時,在集算器跨庫混合計算模型下,也不要求數據庫是否同構,歷史數據可以選擇存儲在成本更低的開源數據庫中,例如Oracle和MySQL的混搭集群。
2、高性價比、高性能的集文件
無需構建數倉,將歷史數據外置存放到文件系統中,不僅便于管理,而且可以獲得更高效的IO性能和計算能力,從而很好的解決了關系型數據庫中由于數據量大而導致的性能瓶頸和存儲成本。
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。