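This article is a loose translation of "Hadoop Hive UDAF Tutorial - Extending Hive with Aggregation Functions". I chose it because its examples are easy to follow, and I have woven my own understanding of Hive UDAFs into the text.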
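A UDAF is a user-defined aggregate function in Hive; Hive's built-in UDAFs include sum() and count(). A UDAF can be implemented in two ways, simple or generic. Simple UDAFs rely on Java reflection, which costs performance, and they cannot use some features, so they have been deprecated. In this post we focus on Hive's generic aggregate functions, GenericUDAF. Developing one mainly involves the following two abstract classes:
```java
org.apache.hadoop.hive.ql.udf.generic.AbstractGenericUDAFResolver
org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator
```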
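All the code and data used in this post are available here: hive examples
First, create a table named people that holds the sample data. It has a single column, name, and each value contains one or more names. The data is stored in the file people.txt: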
```text
~$ cat ./people.txt

John Smith
John and Ann White
Ted Green
Dorothy
```
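Upload the file to the HDFS directory /user/matthew/people (relative HDFS paths resolve to the current user's home directory, here /user/matthew):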
```shell
hadoop fs -mkdir people
hadoop fs -put ./people.txt people
```
Next, create an external Hive table by running the following in the Hive shell:
```sql
CREATE EXTERNAL TABLE people (name string)
ROW FORMAT DELIMITED FIELDS
    TERMINATED BY '\t'
    ESCAPED BY ''
    LINES TERMINATED BY '\n'
STORED AS TEXTFILE
LOCATION '/user/matthew/people';
```
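Because the table declares a single `name` column with `FIELDS TERMINATED BY '\t'`, and the sample lines contain spaces but no tabs, each whole line ends up as one `name` value (Hive ignores extra fields beyond the declared columns). The plain-Java sketch below only imitates that rule to show why the spaces stay inside the values. It is not Hive's SerDe code, and `TabDelimitedSketch`, `firstField`, and `nameColumn` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration (not Hive's SerDe): how a one-column,
// tab-delimited text table maps each line to a single column value.
final class TabDelimitedSketch {

    // Everything before the first tab (or the whole line if there is no tab)
    // becomes the value of the only column; further fields are dropped.
    static String firstField(String line) {
        int tab = line.indexOf('\t');
        return tab < 0 ? line : line.substring(0, tab);
    }

    // LINES TERMINATED BY '\n': one record per line.
    static List<String> nameColumn(String fileContents) {
        List<String> names = new ArrayList<>();
        for (String line : fileContents.split("\n")) {
            names.add(firstField(line));
        }
        return names;
    }
}
```

With the sample file, "John and Ann White" stays a single value, spaces included, which is why the character count later in the post counts spaces too.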
Writing a GenericUDAF starts with understanding these two abstract classes:
```java
org.apache.hadoop.hive.ql.udf.generic.AbstractGenericUDAFResolver
org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator
```
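To understand the APIs of these abstract classes, keep in mind that Hive (on its MapReduce engine) simply runs MapReduce jobs: Hive writes and hides the MapReduce code for us and exposes concise SQL functions on top. It therefore helps to think in terms of a Mapper, a Combiner, and a Reducer. Also remember that a Hadoop cluster has many machines, and Mapper and Reducer tasks run independently on different machines.
So, broadly speaking, the UDAF reads data (mapper), aggregates a batch of mapper output into a partial aggregate (combiner), and finally produces the final aggregate (reducer). Because the aggregation spans multiple combiners, we need to store the partial aggregation results.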
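Here is a plain-Java sketch of that flow over the four sample names, assuming a hypothetical layout of two nodes with two single-row mappers each. No Hadoop API is involved; the method names only mirror the roles. Since each partial result is a plain count, the combiner and the reducer can perform the same addition, which is what makes partial results safe to store and merge later.

```java
// Plain-Java sketch of map -> combine -> reduce for a character count.
// No Hadoop APIs are used; the two-node layout is a hypothetical example.
final class MapCombineReduceSketch {

    // Mapper: one raw row -> its character count
    static int map(String row) {
        return row.length();
    }

    // Combiner and reducer: add up partial counts. Addition can be regrouped
    // freely, so partials may be merged at any level.
    static int combine(int... partials) {
        int sum = 0;
        for (int p : partials) {
            sum += p;
        }
        return sum;
    }

    // Each hypothetical node combines its own mappers' output,
    // then one reducer merges the two partial counts.
    static int run() {
        int nodeA = combine(map("John Smith"), map("John and Ann White"));
        int nodeB = combine(map("Ted Green"), map("Dorothy"));
        return combine(nodeA, nodeB);
    }
}
```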
AbstractGenericUDAFResolver
The resolver is simple: override the method below, which decides, based on the types of the arguments passed in from SQL, which Evaluator will process them.
```java
public GenericUDAFEvaluator getEvaluator(TypeInfo[] parameters) throws SemanticException;
```
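Conceptually, the resolver is a type check plus a factory. The sketch below imitates that role in plain Java: it receives the argument type names (a hypothetical stand-in for Hive's `TypeInfo[]`), rejects anything other than exactly one string argument, and returns an evaluator. `SketchResolver`, `SketchEvaluator`, and the type-name strings are illustrative assumptions, not Hive API.

```java
// Hypothetical stand-ins for Hive's resolver/evaluator pair, used only
// to illustrate "inspect the argument types, then pick an evaluator".
interface SketchEvaluator {
    String describe();
}

final class SketchResolver {

    static SketchEvaluator getEvaluator(String[] argumentTypes) {
        // Same checks as the full example later in this post:
        // exactly one argument, and it must be a string.
        if (argumentTypes.length != 1) {
            throw new IllegalArgumentException("Exactly one argument is expected.");
        }
        if (!argumentTypes[0].equals("string")) {
            throw new IllegalArgumentException(
                    "Argument must be String, but " + argumentTypes[0] + " was passed.");
        }
        return () -> "total-length evaluator";
    }
}
```

A resolver can also return different evaluators for different argument types; Hive's built-in sum(), for instance, uses separate evaluators for integer and floating-point arguments.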
GenericUDAFEvaluator
Most of the UDAF's logic lives in the Evaluator, which must implement several methods of this abstract class.
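Before looking at the Evaluator, you need to understand the ObjectInspector interface and Mode, the inner enum of GenericUDAFEvaluator.
The main job of ObjectInspector is to decouple how data is used from how it is stored, so that a data stream can switch between input and output formats and different Operators can work with different formats. For an introduction to ObjectInspector, see these two articles: first post on Hive UDFs, and "Hive中ObjectInspector的作用" (The role of ObjectInspector in Hive).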
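The idea behind ObjectInspector is that code reads a value through an inspector instead of casting the object directly, so the same aggregation logic works whatever the underlying representation is. The plain-Java sketch below illustrates the idea with a hypothetical `IntInspector` interface and two representations (a boxed `Integer` and a decimal `String`); it is a simplified analogy, not Hive's ObjectInspector API.

```java
import java.util.List;

// Hypothetical, heavily simplified analogue of a primitive ObjectInspector:
// it knows how to extract an int from one particular storage format.
interface IntInspector {
    int get(Object data);
}

final class InspectorSketch {

    // Inspector for values stored as java.lang.Integer
    static final IntInspector BOXED = data -> (Integer) data;

    // Inspector for values stored as decimal text, e.g. "28"
    static final IntInspector TEXT = data -> Integer.parseInt((String) data);

    // The summing code never casts the data itself; it only asks the inspector.
    static int sum(List<?> values, IntInspector inspector) {
        int total = 0;
        for (Object v : values) {
            total += inspector.get(v);
        }
        return total;
    }
}
```

Swapping `BOXED` for `TEXT` changes how values are read without touching `sum`, which is the decoupling described above.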
Mode represents the MapReduce stage in which the UDAF is running.
```java
public static enum Mode {
    /**
     * PARTIAL1: the map stage of MapReduce, from raw data to a partial aggregate.
     * iterate() and terminatePartial() are called.
     */
    PARTIAL1,
    /**
     * PARTIAL2: roughly the map-side Combiner stage, which merges map output,
     * from a partial aggregate to a partial aggregate.
     * merge() and terminatePartial() are called.
     */
    PARTIAL2,
    /**
     * FINAL: the reduce stage, from partial aggregates to the full aggregate.
     * merge() and terminate() are called.
     */
    FINAL,
    /**
     * COMPLETE: from raw data directly to the full aggregate in a single stage,
     * e.g. a map-only job with no reduce phase, or a reducer that receives
     * raw rows because map-side aggregation is disabled.
     * iterate() and terminate() are called.
     */
    COMPLETE
}
```
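Normally a complete UDAF computation is one MapReduce job. With a mapper and a reducer it goes through PARTIAL1 (mapper) and then FINAL (reducer); if there is also a combiner, it goes through PARTIAL1 (mapper), PARTIAL2 (combiner), and FINAL (reducer).
In some cases the aggregation runs in a single stage, for example a MapReduce job with mappers but no reducers, so only COMPLETE is used: it takes raw data as input and produces the result directly.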
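The Mode comments above boil down to a lookup table: in each mode Hive calls one input method (iterate() or merge()) and one output method (terminatePartial() or terminate()). The sketch below encodes that table in a local enum, `SketchMode`, which is a hypothetical stand-in rather than Hive's `GenericUDAFEvaluator.Mode`, and lists the call sequence for a mapper + combiner + reducer plan.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for GenericUDAFEvaluator.Mode that records which
// evaluator methods are called in each mode (per the Mode comments above).
enum SketchMode {
    PARTIAL1("iterate", "terminatePartial"),  // raw rows -> partial aggregate
    PARTIAL2("merge", "terminatePartial"),    // partial -> partial
    FINAL("merge", "terminate"),              // partials -> full aggregate
    COMPLETE("iterate", "terminate");         // raw rows -> full aggregate

    final String consume;
    final String produce;

    SketchMode(String consume, String produce) {
        this.consume = consume;
        this.produce = produce;
    }

    // Method calls made by one stage, in order
    String calls() {
        return consume + " -> " + produce;
    }

    // Call sequence for a plan given as a list of stages
    static List<String> plan(SketchMode... stages) {
        List<String> out = new ArrayList<>();
        for (SketchMode m : stages) {
            out.add(m.name() + ": " + m.calls());
        }
        return out;
    }
}
```

For example, `SketchMode.plan(SketchMode.PARTIAL1, SketchMode.PARTIAL2, SketchMode.FINAL)` lists the three stages of the combiner plan, each with its input and output method.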
The main methods of GenericUDAFEvaluator are:
```java
// Set up the ObjectInspectors (data formats) of this mode's input and output
public ObjectInspector init(Mode m, ObjectInspector[] parameters) throws HiveException;

// Create the object that holds the aggregation result
public abstract AggregationBuffer getNewAggregationBuffer() throws HiveException;

// Reset an aggregation buffer so it can be reused
public abstract void reset(AggregationBuffer agg) throws HiveException;

// Map stage: process the column values passed in from SQL, one row at a time
public abstract void iterate(AggregationBuffer agg, Object[] parameters) throws HiveException;

// End of map or combiner: return the partial aggregation result
public abstract Object terminatePartial(AggregationBuffer agg) throws HiveException;

// Combiner merges map results; reducer merges mapper or combiner results
public abstract void merge(AggregationBuffer agg, Object partial) throws HiveException;

// Reduce stage: return the final result
public abstract Object terminate(AggregationBuffer agg) throws HiveException;
```
[Figure: Evaluator methods called in each Mode]
[Figure: how the Evaluator processes data at each MapReduce stage]
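To see how these methods fit together without a cluster, the sketch below defines a hypothetical, Hive-free `MiniEvaluator` with the same method names but simplified signatures (no ObjectInspectors, one argument instead of `Object[]`), plus a driver that runs two PARTIAL1 map stages and one FINAL stage. `MiniEvaluator`, `CountBuffer`, `LengthEvaluator`, and `LifecycleDriver` are all illustrative names; the real abstract methods are the ones listed above.

```java
import java.util.List;

// Hypothetical, simplified mirror of GenericUDAFEvaluator's lifecycle (not Hive API).
abstract class MiniEvaluator<B, P, R> {
    abstract B getNewAggregationBuffer();
    abstract void reset(B agg);
    abstract void iterate(B agg, Object row);   // PARTIAL1 / COMPLETE
    abstract P terminatePartial(B agg);         // PARTIAL1 / PARTIAL2
    abstract void merge(B agg, P partial);      // PARTIAL2 / FINAL
    abstract R terminate(B agg);                // FINAL / COMPLETE
}

// Buffer holding the running character count
final class CountBuffer {
    int sum;
}

final class LengthEvaluator extends MiniEvaluator<CountBuffer, Integer, Integer> {
    @Override CountBuffer getNewAggregationBuffer() { return new CountBuffer(); }
    @Override void reset(CountBuffer agg) { agg.sum = 0; }
    @Override void iterate(CountBuffer agg, Object row) {
        if (row != null) {
            agg.sum += row.toString().length();
        }
    }
    @Override Integer terminatePartial(CountBuffer agg) { return agg.sum; }
    @Override void merge(CountBuffer agg, Integer partial) {
        if (partial != null) {
            agg.sum += partial;
        }
    }
    @Override Integer terminate(CountBuffer agg) { return agg.sum; }
}

final class LifecycleDriver {
    static int run(List<String> split1, List<String> split2) {
        LengthEvaluator eval = new LengthEvaluator();

        // PARTIAL1 on each split; one buffer is reused here only to exercise reset()
        CountBuffer mapBuf = eval.getNewAggregationBuffer();
        Integer p1 = mapStage(eval, mapBuf, split1);
        eval.reset(mapBuf);
        Integer p2 = mapStage(eval, mapBuf, split2);

        // FINAL: merge the partial results, then produce the final value
        CountBuffer reduceBuf = eval.getNewAggregationBuffer();
        eval.merge(reduceBuf, p1);
        eval.merge(reduceBuf, p2);
        return eval.terminate(reduceBuf);
    }

    private static Integer mapStage(LengthEvaluator eval, CountBuffer buf, List<String> rows) {
        for (String row : rows) {
            eval.iterate(buf, row);
        }
        return eval.terminatePartial(buf);
    }
}
```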
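Next, let's walk through an example UDAF that counts the characters in the name column of the people table.
The function counts every character in the given column, spaces included, so despite its name (letters) it returns a total character count rather than a letter count.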
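Before reading the code, it helps to know the expected answer. Counting every character, spaces included, for the four sample names gives the following total, which matches the query output at the end of the post:

$$
\underbrace{10}_{\text{John Smith}} + \underbrace{18}_{\text{John and Ann White}} + \underbrace{9}_{\text{Ted Green}} + \underbrace{7}_{\text{Dorothy}} = 44
$$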
```java
package com.matthewrathbone.example;

import org.apache.hadoop.hive.ql.exec.Description;
import org.apache.hadoop.hive.ql.exec.UDFArgumentTypeException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.parse.SemanticException;
import org.apache.hadoop.hive.ql.udf.generic.AbstractGenericUDAFResolver;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory.ObjectInspectorOptions;
import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;

@Description(name = "letters", value = "_FUNC_(expr) - returns the total number of characters in all strings of the column")
public class TotalNumOfLettersGenericUDAF extends AbstractGenericUDAFResolver {

    @Override
    public GenericUDAFEvaluator getEvaluator(TypeInfo[] parameters)
            throws SemanticException {
        if (parameters.length != 1) {
            throw new UDFArgumentTypeException(parameters.length - 1,
                    "Exactly one argument is expected.");
        }

        ObjectInspector oi = TypeInfoUtils.getStandardJavaObjectInspectorFromTypeInfo(parameters[0]);

        if (oi.getCategory() != ObjectInspector.Category.PRIMITIVE) {
            throw new UDFArgumentTypeException(0,
                    "Argument must be PRIMITIVE, but "
                    + oi.getCategory().name()
                    + " was passed.");
        }

        PrimitiveObjectInspector inputOI = (PrimitiveObjectInspector) oi;

        if (inputOI.getPrimitiveCategory() != PrimitiveObjectInspector.PrimitiveCategory.STRING) {
            throw new UDFArgumentTypeException(0,
                    "Argument must be String, but "
                    + inputOI.getPrimitiveCategory().name()
                    + " was passed.");
        }

        return new TotalNumOfLettersEvaluator();
    }

    public static class TotalNumOfLettersEvaluator extends GenericUDAFEvaluator {

        // Inspector for raw string rows (PARTIAL1 and COMPLETE)
        PrimitiveObjectInspector inputOI;
        // Inspector for Integer partial results (PARTIAL2 and FINAL)
        PrimitiveObjectInspector integerOI;
        // Output inspector: every mode emits an Integer
        ObjectInspector outputOI;

        @Override
        public ObjectInspector init(Mode m, ObjectInspector[] parameters)
                throws HiveException {
            assert (parameters.length == 1);
            super.init(m, parameters);

            if (m == Mode.PARTIAL1 || m == Mode.COMPLETE) {
                // Map stage reads the SQL column: a primitive String
                inputOI = (PrimitiveObjectInspector) parameters[0];
            } else {
                // Other stages receive Integer partial results
                integerOI = (PrimitiveObjectInspector) parameters[0];
            }

            // Every stage outputs an Integer
            outputOI = ObjectInspectorFactory.getReflectionObjectInspector(Integer.class,
                    ObjectInspectorOptions.JAVA);
            return outputOI;
        }

        /**
         * Buffer holding the running character count.
         */
        static class LetterSumAgg implements AggregationBuffer {
            int sum = 0;
            void add(int num) {
                sum += num;
            }
        }

        @Override
        public AggregationBuffer getNewAggregationBuffer() throws HiveException {
            return new LetterSumAgg();
        }

        @Override
        public void reset(AggregationBuffer agg) throws HiveException {
            // Clear the existing buffer so it can be reused
            ((LetterSumAgg) agg).sum = 0;
        }

        @Override
        public void iterate(AggregationBuffer agg, Object[] parameters)
                throws HiveException {
            assert (parameters.length == 1);
            if (parameters[0] != null) {
                LetterSumAgg myagg = (LetterSumAgg) agg;
                Object p1 = inputOI.getPrimitiveJavaObject(parameters[0]);
                myagg.add(String.valueOf(p1).length());
            }
        }

        @Override
        public Object terminatePartial(AggregationBuffer agg) throws HiveException {
            // Return this buffer's count; no running state is kept in the evaluator
            return ((LetterSumAgg) agg).sum;
        }

        @Override
        public void merge(AggregationBuffer agg, Object partial)
                throws HiveException {
            if (partial != null) {
                LetterSumAgg myagg = (LetterSumAgg) agg;
                Integer partialSum = (Integer) integerOI.getPrimitiveJavaObject(partial);
                myagg.add(partialSum);
            }
        }

        @Override
        public Object terminate(AggregationBuffer agg) throws HiveException {
            return ((LetterSumAgg) agg).sum;
        }
    }
}
```
For more on combiners, Philippe Adjiman's explanation is a good resource.
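AggregationBuffer lets us store intermediate results. By defining our own buffer we can handle data in any format; in this example, the running character count is stored in the AggregationBuffer: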
```java
/**
 * Class that holds the current character count.
 */
static class LetterSumAgg implements AggregationBuffer {
    int sum = 0;
    void add(int num) {
        sum += num;
    }
}
```
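Keeping the running count inside the buffer, rather than in a field of the evaluator, matters because Hive can use a single evaluator instance for many groups, for example one buffer per key in a `GROUP BY`. The plain-Java sketch below simulates that with a map from a hypothetical group key to its own buffer shaped like `LetterSumAgg`; `PerGroupBufferSketch`, `Buffer`, and `perGroupTotals` are illustrative names, not Hive API.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical simulation: one evaluator serving several groups, with one
// buffer per group key so counts never leak from one group into another.
final class PerGroupBufferSketch {

    // Same shape as LetterSumAgg in the tutorial
    static final class Buffer {
        int sum = 0;
        void add(int num) { sum += num; }
    }

    // rows[i][0] is a hypothetical group key, rows[i][1] the string value
    static Map<String, Integer> perGroupTotals(String[][] rows) {
        Map<String, Buffer> buffers = new LinkedHashMap<>();
        for (String[] row : rows) {
            Buffer buf = buffers.computeIfAbsent(row[0], k -> new Buffer());
            buf.add(row[1].length());  // what iterate() does for one row
        }
        Map<String, Integer> result = new LinkedHashMap<>();
        for (Map.Entry<String, Buffer> e : buffers.entrySet()) {
            result.put(e.getKey(), e.getValue().sum);  // what terminate() returns
        }
        return result;
    }
}
```

Grouping the sample names by first letter, for instance, gives the "J" group its own buffer (10 + 18) independent of the "T" and "D" groups.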
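This means the UDAF receives different inputs at different MapReduce stages. iterate() reads one row of our table (strictly speaking, one input record) and outputs an aggregate in another format.
The partial aggregation step (merge()) combines these aggregates into a new aggregate of the same format, and finally the reducer takes these aggregates and outputs the final result, whose format may differ from that of the data it received.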
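The letter-count UDAF uses an integer at every stage, so its formats never differ. An average is the standard case where they do: rows are numbers, the partial aggregate must be a (sum, count) pair so that partials can still be merged exactly, and only terminate() turns it into a double. Hive's own avg() keeps a count and a sum in its partial result for the same reason. The plain-Java sketch below illustrates the point and is not part of the tutorial's code; `AvgPartial` and its methods are hypothetical names.

```java
// Hypothetical average aggregate: input = int rows, partial = (sum, count),
// final = double. The partial keeps the count so that merging stays exact.
final class AvgPartial {
    long sum;
    long count;

    // iterate(): consume one raw row
    void iterate(int value) {
        sum += value;
        count++;
    }

    // merge(): combine another partial of the same format
    void merge(AvgPartial other) {
        sum += other.sum;
        count += other.count;
    }

    // terminate(): produce the final result in a different format
    // (NaN for empty input is a simplification of this sketch)
    double terminate() {
        return count == 0 ? Double.NaN : (double) sum / count;
    }

    static AvgPartial of(int... values) {
        AvgPartial p = new AvgPartial();
        for (int v : values) {
            p.iterate(v);
        }
        return p;
    }
}
```

Using the four name lengths, merging `AvgPartial.of(10, 18, 9)` with `AvgPartial.of(7)` yields 44 / 4 = 11.0. Averaging the two partial averages instead would give about 9.67, which is why the partial format has to carry the count.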
In init() we specify that the input is a string and the final output is an integer; the partial aggregation result (stored in the aggregation buffer) is also an integer. Both terminate() and terminatePartial() output an integer.
```java
// In init(), choose the input ObjectInspector according to the mode
if (m == Mode.PARTIAL1 || m == Mode.COMPLETE) {
    inputOI = (PrimitiveObjectInspector) parameters[0];
} else {
    integerOI = (PrimitiveObjectInspector) parameters[0];
}

// Output format for every mode: Integer
outputOI = ObjectInspectorFactory.getReflectionObjectInspector(Integer.class,
        ObjectInspectorOptions.JAVA);
```
The iterate() function reads the column string of each row, then computes and stores its length:
```java
public void iterate(AggregationBuffer agg, Object[] parameters)
    throws HiveException {
    ...
        Object p1 = ((PrimitiveObjectInspector) inputOI).getPrimitiveJavaObject(parameters[0]);
        myagg.add(String.valueOf(p1).length());
    }
}
```
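The `parameters[0] != null` check in the full iterate() is not optional. In Java, `String.valueOf(Object)` turns a null reference into the four-character string "null", so without the check every NULL row would add 4 to the count. The plain-Java sketch below compares both variants on a list containing a null; `NullCheckSketch`, `countWithCheck`, and `countWithoutCheck` are illustrative helpers, not part of the tutorial.

```java
import java.util.List;

// Hypothetical helpers showing why iterate() skips null inputs
// before calling String.valueOf.
final class NullCheckSketch {

    static int countWithCheck(List<?> values) {
        int sum = 0;
        for (Object p : values) {
            if (p != null) {                       // mirrors parameters[0] != null
                sum += String.valueOf(p).length();
            }
        }
        return sum;
    }

    static int countWithoutCheck(List<?> values) {
        int sum = 0;
        for (Object p : values) {
            sum += String.valueOf(p).length();     // null becomes "null" (4 chars)
        }
        return sum;
    }
}
```

For the rows "Ted Green", NULL, "Dorothy", the checked version counts 16 characters while the unchecked one counts 20.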
The merge() function adds a partial count to the AggregationBuffer:
```java
public void merge(AggregationBuffer agg, Object partial)
        throws HiveException {
    if (partial != null) {
        LetterSumAgg myagg = (LetterSumAgg) agg;
        // Partial results arrive as Integers (see init())
        Integer partialSum = (Integer) integerOI.getPrimitiveJavaObject(partial);
        myagg.add(partialSum);
    }
}
```
The terminate() function returns the contents of the AggregationBuffer; this is where the final result is produced.
```java
public Object terminate(AggregationBuffer agg) throws HiveException {
    LetterSumAgg myagg = (LetterSumAgg) agg;
    return myagg.sum;
}
```
Finally, add the jar, register the function in the Hive shell, and query the table:
```text
ADD JAR ./hive-extension-examples-master/target/hive-extensions-1.0-SNAPSHOT-jar-with-dependencies.jar;
CREATE TEMPORARY FUNCTION letters as 'com.matthewrathbone.example.TotalNumOfLettersGenericUDAF';

SELECT letters(name) FROM people;
OK
44
Time taken: 20.688 seconds
```