您好,登錄后才能下訂單哦!
這篇文章將為大家詳細講解有關Flink支持的數據類型有哪些,文章內容質量較高,因此小編分享給大家做個參考,希望大家閱讀完這篇文章后對相關知識有一定的了解。
Flink 對可以在 DataSet 或 DataStream 中的元素類型進行了一些限制。這樣做的原因是系統會分析類型以確定有效的執行策略。
1.Java Tuple 和 Scala Case類;
2.Java POJO;
3.基本類型;
4.通用類;
5.值;
6.Hadoop Writables;
7.特殊類型
Tuple類型 Tuple
是flink
一個很特殊的類型 (元組類型),是一個抽象類,共26個Tuple
子類繼承Tuple
他們是 Tuple0
一直到Tuple25
package org.apache.flink.api.java.tuple; import java.io.Serializable; import org.apache.flink.annotation.Public; import org.apache.flink.types.NullFieldException; @Public public abstract class Tuple implements Serializable { private static final long serialVersionUID = 1L; public static final int MAX_ARITY = 25; private static final Class<?>[] CLASSES = new Class[]{Tuple0.class, Tuple1.class, Tuple2.class, Tuple3.class, Tuple4.class, Tuple5.class, Tuple6.class, Tuple7.class, Tuple8.class, Tuple9.class, Tuple10.class, Tuple11.class, Tuple12.class, Tuple13.class, Tuple14.class, Tuple15.class, Tuple16.class, Tuple17.class, Tuple18.class, Tuple19.class, Tuple20.class, Tuple21.class, Tuple22.class, Tuple23.class, Tuple24.class, Tuple25.class}; public Tuple() { } public abstract <T> T getField(int var1); public <T> T getFieldNotNull(int pos) { T field = this.getField(pos); if (field != null) { return field; } else { throw new NullFieldException(pos); } } public abstract <T> void setField(T var1, int var2); public abstract int getArity(); public abstract <T extends Tuple> T copy(); public static Class<? extends Tuple> getTupleClass(int arity) { if (arity >= 0 && arity <= 25) { return CLASSES[arity]; } else { throw new IllegalArgumentException("The tuple arity must be in [0, 25]."); } } public static Tuple newInstance(int arity) { switch(arity) { case 0: return Tuple0.INSTANCE; case 1: return new Tuple1(); case 2: return new Tuple2(); case 3: return new Tuple3(); case 4: return new Tuple4(); case 5: return new Tuple5(); case 6: return new Tuple6(); case 7: return new Tuple7(); case 8: return new Tuple8(); case 9: return new Tuple9(); case 10: return new Tuple10(); case 11: return new Tuple11(); case 12: return new Tuple12(); case 13: return new Tuple13(); case 14: return new Tuple14(); case 15: return new Tuple15(); case 16: return new Tuple16(); case 17: return new Tuple17(); case 18: return new Tuple18(); case 19: return new Tuple19(); case 20: return new Tuple20(); case 21: return new Tuple21(); case 22: return new Tuple22(); case 23: return new Tuple23(); case 24: return new Tuple24(); case 25: return new Tuple25(); default: throw new IllegalArgumentException("The tuple arity must be in [0, 25]."); } } }
查看源碼我們看到Tuple0
一直到Tuple25
我們看flink為我們為我們構造好了0-25個字段的模板類
ackage org.apache.flink.api.java.tuple; import java.io.ObjectStreamException; import org.apache.flink.annotation.Public; @Public public class Tuple0 extends Tuple { private static final long serialVersionUID = 1L; public static final Tuple0 INSTANCE = new Tuple0(); public Tuple0() { } public int getArity() { return 0; } public <T> T getField(int pos) { throw new IndexOutOfBoundsException(String.valueOf(pos)); } public <T> void setField(T value, int pos) { throw new IndexOutOfBoundsException(String.valueOf(pos)); } public Tuple0 copy() { return new Tuple0(); } public String toString() { return "()"; } public boolean equals(Object o) { return this == o || o instanceof Tuple0; } public int hashCode() { return 0; } private Object readResolve() throws ObjectStreamException { return INSTANCE; } }
方式一:初始化元組
可使用靜態方法 newInstance進行元組構造 指定元組空間大小;
ex: 1 則元組只有一個空間,則實際使用的Tuple1 字段只有f0
ex: 12 則元組只有兩個空間,則實際使用的Tuple2 字段只有f0,f1
指定 Tuple元組空間大小 (可理解為字段個數)
Tuple tuple = Tuple.newInstance(1);
方式一:構造元組
使用Tuple.newInstance(xx),指定元組空間大小的話,這樣存取雖然能夠實現,但會存在存儲索引位置使用不正確的情況,可能由于失誤操作編寫出索引越界異常,而且使用不太方便,使用Tuplex.of(數據)方法構造Tuple元組
Tuple3<String, String, String> tuple3 = Tuple3.of("test0", "test1", "test2"); System.out.println(tuple3.f0); // test0 System.out.println(tuple3.f1); // test1 System.out.println(tuple3.f2); // test2
Java和Scala的類在滿足下列條件時,將會被Flink視作特殊的POJO數據類型專門進行處理:
1.是公共類;
2.無參構造是公共的;
3.所有的屬性都是可獲得的(聲明為公共的,或提供get,set方法);
4.字段的類型必須是Flink支持的。Flink會用Avro來序列化任意的對象。
Flink會分析POJO類型的結構獲知POJO的字段。POJO類型要比一般類型好用。此外,Flink訪問POJO要比一般類型更高效。
public class WordWithCount { public String word; public int count; public WordWithCount() {} public WordWithCount(String word, int count) { this.word = word; this.count = count; } } DataStream<WordWithCount> wordCounts = env.fromElements( new WordWithCount("hello", 1), new WordWithCount("world", 2)); wordCounts.keyBy("word");
Flink支持Java和Scala所有的基本數據類型,比如 Integer,String,和Double。
Flink支持大多數的Java,Scala類(API和自定義)。包含不能序列化字段的類在增加一些限制后也可支持。遵循Java Bean規范的類一般都可以使用。
所有不能視為POJO的類Flink都會當做一般類處理。這些數據類型被視作黑箱,其內容是不可見的。通用類使用Kryo進行序列/反序列化。
通過實現org.apache.flinktypes.Value接口的read和write方法提供自定義代碼來進行序列化/反序列化,而不是使用通用的序列化框架。
Flink預定義的值類型與原生數據類型是一一對應的(例如:ByteValue, ShortValue, IntValue, LongValue, FloatValue, DoubleValue, StringValue, CharValue, BooleanValue)。這些值類型作為原生數據類型的可變變體,他們的值是可以改變的,允許程序重用對象從而緩解GC的壓力。
它實現org.apache.hadoop.Writable接口的類型,該類型的序列化邏輯在write()和readFields()方法中實現。
Flink比較特殊的類型有以下兩種:
1.Scala的 Either、Option和Try。
2.Java ApI有自己的Either實現。
Java Api 與 Scala 的 類似Either
,它表示兩種可能類型的值,Left或Right。Either
對于錯誤處理或需要輸出兩種不同類型的記錄的運算符很有用。
類型擦除和類型推理
Java編譯器在編譯之后會丟棄很多泛型類型信息。這在Java中稱為類型擦除。這意味著在運行時,對象的實例不再知道其泛型類型。
例如,在JVM中,DataStream<String>和DataStream<Long>的實例看起來是相同的。
List<String> l1 = new ArrayList<String>(); List<Integer> l2 = new ArrayList<Integer>(); System.out.println(l1.getClass() == l2.getClass());
泛型:一種較為準確的說法就是為了參數化類型,或者說可以將類型當作參數傳遞給一個類或者是方法。Flink 的Java API會試圖去重建(可以做類型推理)這些被丟棄的類型信息,并將它們明確地存儲在數據集以及操作中。你可以通過DataStream.getType()方法來獲取類型,這個方法將返回一個TypeInformation的實例,這個實例是Flink內部表示類型的方式。
關于Flink支持的數據類型有哪些就分享到這里了,希望以上內容可以對大家有一定的幫助,可以學到更多知識。如果覺得文章不錯,可以把它分享出去讓更多的人看到。
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。