您好,登錄后才能下訂單哦!
這篇文章主要介紹了hive中lateral view怎么用,具有一定借鑒價值,感興趣的朋友可以參考下,希望大家閱讀完這篇文章之后大有收獲,下面讓小編帶著大家一起了解一下。
Lateral view與UDTF函數一起使用,UDTF對每個輸入行產生0或者多個輸出行。Lateral view首先在基表的每個輸入行應用UDTF,然后連接結果輸出行與輸入行組成擁有指定表別名的虛擬表。
explain SELECT id, sq,myCol from window_test_table LATERAL VIEW explode(split(sq,',')) myTab as myCol;
這個sql 經歷了兩條線:
ts(TableScan)-->lvf(Lateral View Forward)-->sel(Select)-->lvj(Lateral View Join)-->sel(Select) ts(TableScan)-->lvf(Lateral View Forward)-->sel(Select)-->udtf-->lvj(Lateral View Join)-->sel(Select)
不多說,常規讀表操作
@Override public void process(Object row, int tag) throws HiveException { forward(row, inputObjInspectors[tag]); }
幾乎什么都沒做,數據怎么來的,還怎么送出去。
篩選出你需要的非explode的列:id,sq
篩選出explode的列:split(sq, ',')
@Override public void process(Object row, int tag) throws HiveException { StructObjectInspector soi = (StructObjectInspector) inputObjInspectors[tag]; List<? extends StructField> fields = soi.getAllStructFieldRefs(); //從row里解出字段 for (int i = 0; i < fields.size(); i++) { objToSendToUDTF[i] = soi.getStructFieldData(row, fields.get(i)); } //真正處理數據的是 genericUDTF的某個實現類,比如,explode,那就是GenericUDTFExplode.java 的process genericUDTF.process(objToSendToUDTF); //這里判斷一下有沒有outer關鍵字。這里真的真的真的是,可能用了很久了,還不知道udtf還有個outer 關鍵字 if (conf.isOuterLV() && collector.getCounter() == 0) { //思考一下這一步是干嘛? collector.collect(outerObj); } collector.reset(); }
GenericUDTFExplode.java就相當容易理解了,畢竟我們自己寫udtf時,也是這么做的:
/** * GenericUDTFExplode. * */ @Description(name = "explode", value = "_FUNC_(a) - separates the elements of array a into multiple rows," + " or the elements of a map into multiple rows and columns ") public class GenericUDTFExplode extends GenericUDTF { .... @Override //主要處理數據的方法 public void process(Object[] o) throws HiveException { switch (inputOI.getCategory()) { case LIST: //處理list ListObjectInspector listOI = (ListObjectInspector)inputOI; List<?> list = listOI.getList(o[0]); if (list == null) { return; //當數組里沒有值時,不發送數據 } for (Object r : list) { forwardListObj[0] = r; forward(forwardListObj); } break; case MAP: //處理map MapObjectInspector mapOI = (MapObjectInspector)inputOI; Map<?,?> map = mapOI.getMap(o[0]); if (map == null) { return; } for (Entry<?,?> r : map.entrySet()) { forwardMapObj[0] = r.getKey(); forwardMapObj[1] = r.getValue(); forward(forwardMapObj); } break; default: throw new TaskExecutionException("explode() can only operate on an array or a map"); } } .... }
當UDTF不產生任何行時,比如explode()函數的輸入列為空,LATERALVIEW就不會生成任何輸出行。在這種情況下原有行永遠不會出現在結果中。OUTRE可被用于阻止這種情況,輸出行中來自UDTF的列將被設置為NULL。
例如:
實際上從代碼里,也能夠看到:
UDTF會借助UDTFCollector為其展開的結果計數,并forward:
@Override public void collect(Object input) throws HiveException { op.forwardUDTFOutput(input); counter++; }
如果沒有展開結果,counter就為0。 這樣,進入outer之后,會把之前建好的沒有內容的outerObj給forward到下個 算子LateralViewJoinOperator
@Override public void process(Object row, int tag) throws HiveException { StructObjectInspector soi = (StructObjectInspector) inputObjInspectors[tag]; //標識是左側select過來的 if (tag == SELECT_TAG) { selectObjs.clear(); selectObjs.addAll(soi.getStructFieldsDataAsList(row)); } else if (tag == UDTF_TAG) { //代表是右側udtf過來的 acc.clear(); acc.addAll(selectObjs); acc.addAll(soi.getStructFieldsDataAsList(row)); //合并數據 forward(acc, outputObjInspector); } else { throw new HiveException("Invalid tag"); } }
LateralViewJoinOperator處理邏輯也是很簡單明了,這里的join也是簡單的List.addAll
Lateral view explode 會產生shuffle嗎?
當然不會,毋庸置疑! 其實一開始看執行計劃就會發現,沒有reduce任務呀~~
這里的Join代表的是兩份數據聯接到一起的意思,并不是真正的意義上的join。
感謝你能夠認真閱讀完這篇文章,希望小編分享的“hive中lateral view怎么用”這篇文章對大家有幫助,同時也希望大家多多支持億速云,關注億速云行業資訊頻道,更多相關知識等著你來學習!
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。