您好,登錄后才能下訂單哦!
這篇文章主要講解了“flink中怎么使用自定義聚合函數統計網站TP指標”,文中的講解內容簡單清晰,易于學習與理解,下面請大家跟著小編的思路慢慢深入,一起來研究和學習“flink中怎么使用自定義聚合函數統計網站TP指標”吧!
在網站性能測試中,我們經常會選擇 TP50、TP95 或者 TP99 等作為性能指標。接下來我們講講這些指標的含義、以及在flink中如何實時統計:
TP50,top percent 50,即 50% 的數據都滿足某一條件;
TP95,top percent 95,即 95% 的數據都滿足某一條件;
TP99,top percent 99,即 99% 的數據都滿足某一條件;
我們舉一個例子,我們要統計網站一分鐘之內的的響應時間的TP90,正常的處理邏輯就是把這一分鐘之內所有的網站的響應時間從小到大排序,然后計算出總條數count,然后計算出排名在90%的響應時間是多少(count*0.9),就是我們要的值。
這個需求很明顯就是一個使用聚合函數來做的案例,Flink中提供了大量的聚合函數,比如count,max,min等等,但是對于這個需求,卻無法滿足,所以我們需要自定義一個聚合函數來實現我們的需求。
在前段時間,我們聊了聊flink的聚合算子,具體可參考: flink實戰-聊一聊flink中的聚合算子 , 聚合算子是我們在寫代碼的時候用來實現一個聚合功能,聚合函數其實和聚合算子類似,只不過聚合函數用于在寫sql的時候使用。
自定義聚合函數需要繼承抽象類org.apache.flink.table.functions.AggregateFunction。并實現下面幾個方法。
createAccumulator():這個方法會在一次聚合操作的開始調用一次,主要用于構造一個Accumulator,用于存儲在聚合過程中的臨時對象。
accumulate() 這個方法,每來一條數據會調用一次這個方法,我們就在這個方法里實現我們的聚合函數的具體邏輯。
getValue() 這個方法是在聚合結束以后,對中間結果做處理,然后將結果返回,最終sql中得到的結果數據就是這個值。
對于TP指標,正常的思路我們可以先創建一個臨時變量,里面有一個list,每來一個數據,就放到這個list里面,在getValue方法里,進行排序,取相應的TP值。
但是這種思路會有一個問題,就是如果要聚合的時間范圍內,數據過多的話。就會在list存儲大量的數據,會造成checkpoint過大,時間過長,最后導致程序失敗。得不到正確的結果。
所以我們需要換一個思路,既然最后我們想要的是一個有序列表,那么我們是不是可以把這個list結構優化一下,使用Treemap來存儲,map的key就是指標,比如響應時間。value就是對應的指標出現的次數。這樣getValue方法里,只需要將map的value值累加,就能得到總數count,然后計算出來相應的tp值的位置position,最后我們再從頭累加map的value,直到累加結果大于相應的位置position,則map的key即為所求。
示例如下:我們先構建一個source,只是隨機生成一個變量,網站的相應時間response_time。
String sql = "CREATE TABLE source (\n" +
" response_time INT,\n" +
" ts AS localtimestamp,\n" +
" WATERMARK FOR ts AS ts," +
"proctime as proctime()\n" +
") WITH (\n" +
" 'connector' = 'datagen',\n" +
" 'rows-per-second'='1000',\n" +
" 'fields.response_time.min'='1',\n" +
" 'fields.response_time.max'='1000'" +
")";
定義一個聚合函數用的臨時變量:
public static class TPAccum{
public Integer tp;
public Map<Integer,Integer> map = new HashMap<>();
}
實現自定義聚合函數類
public static class TP extends AggregateFunction<Integer,TPAccum>{
@Override
public TPAccum createAccumulator(){
return new TPAccum();
}
@Override
public Integer getValue(TPAccum acc){
if (acc.map.size() == 0){
return null;
} else {
Map<Integer,Integer> map = new TreeMap<>(acc.map);
int sum = map.values().stream().reduce(0, Integer::sum);
int tp = acc.tp;
int responseTime = 0;
int p = 0;
Double d = sum * (tp / 100D);
for (Map.Entry<Integer,Integer> entry: map.entrySet()){
p += entry.getValue();
int position = d.intValue() - 1;
if (p >= position){
responseTime = entry.getKey();
break;
}
}
return responseTime;
}
}
public void accumulate(TPAccum acc, Integer iValue, Integer tp){
acc.tp = tp;
if (acc.map.containsKey(iValue)){
acc.map.put(iValue, acc.map.get(iValue) + 1);
} else {
acc.map.put(iValue, 1);
}
}
}
實際的查詢sql如下:
String sqlSelect =
"select TUMBLE_START(proctime,INTERVAL '1' SECOND) as starttime,mytp(response_time,50) from source" +
" group by TUMBLE(proctime,INTERVAL '1' SECOND)";
感謝各位的閱讀,以上就是“flink中怎么使用自定義聚合函數統計網站TP指標”的內容了,經過本文的學習后,相信大家對flink中怎么使用自定義聚合函數統計網站TP指標這一問題有了更深刻的體會,具體使用情況還需要大家實踐驗證。這里是億速云,小編將為大家推送更多相關知識點的文章,歡迎關注!
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。