您好,登錄后才能下訂單哦!
本篇內容主要講解“flink的Transformation數據處理方法是什么”,感興趣的朋友不妨來看看。本文介紹的方法操作簡單快捷,實用性強。下面就讓小編來帶大家學習“flink的Transformation數據處理方法是什么”吧!
將一個或多個DataStream生成新的DataStream的過程被稱為Transformation。轉換過程中,每種操作類型被定義為不同的Operator,Flink能將多個Transformation組合為一個DataFlow的拓撲。
所以DataStream的轉換操作可以分為SingleDataStream、MultiDataStream、物理分區三個類型。
SingleDataStream:單個DataStream的處理邏輯。
MultiDataStream:多個DataStream的處理邏輯。
物理分區:對數據集中的并行度和數據分區調整轉換的處理邏輯。
常用作對數據集內數據的清晰和轉換。如將輸入數據的每個數值全部加1,并將數據輸出到下游。
val dataStream = evn.formElements(("a",3),("d",4),("c",4),("c",5),("a",5)) //方法一 val mapStream:DataStream[(String,Int)] = dataStream.map(t => (t._1,t._2+1)) //方法二 val mapStream:DataStream[(String,Int)] = dataStream.map( new MapFunction[(String,Int),(String, Int)]{ override def map(t: (String,Int)): (String,Int) ={ (t._1, t._2+1) } })
主要應用于處理輸入一個元素轉換為多個元素場景,如WordCount,將沒行文本數據分割,生成單詞序列。
val dataStream:DataStream[String] = environment.fromCollections() val resultStream[String] =dataStream.flatMap{str => str.split(" ")}
按條件對輸入數據集進行篩選,輸出符合條件的數據。
//通配符 val filter:DataStream[Int] = dataStream.filter{ _ %2 == 0} //運算表達式 val filter:DataStream[Int] = dataStream.filter { x => x % 2 ==0}
根據指定的key對輸入的數據集執行Partition操作,將相同的key值的數據放置到相同的區域中。
將下標為1相同的數據放到一個分區
val dataStream = env.fromElements((1,5),(2,2),(2,4),(1,3)) //指定第一個字段為分區key val keyedStream: KeyedStream[(String,Int),Tuple]=dataSteam.keyBy(0)
與MapReduce中reduce原理基本一致,將輸入的KeyedStream通過傳入用戶自定義的ReduceFunction滾動進行數據聚合處理,定義的ReduceFunction必須滿足運算結合律和交換律。
val dataStream = env.fromElements(("a",3),("d",4),("c",2),("c",5),("a",5)) //指定第一個字段為分區key val keyedStream: KeyedStream[(String,Int),Tuple]=dataSteam.keyBy(0) //實現一:滾動第二個字段進行reduce相加求和 val reduceStream = keyedStream.reduce{(t1,t2) => (t1._1, t1._2+t2._2)} //實現二:實現ReduceFunction val reduceStream1 = keyedStream.reduce(new ReduceFunction[(String, Int)] { override def reduce(t1: (String,Int), t2:(String,Int)):(String, int) = { (t1._1, t1._2+ t2._2) } })
運行結果為:(c,2)(c,7)(a,3)(d,4)(a,8),結果不是最后求和的值,是將每條記錄累加后的結果輸出。
DataStream提供的聚合算子,根據指定的字段進行聚合操作,滾動產生一系列數據聚合結果。實際是將Reduce算子中函數進行封裝,封裝的聚合操作有sum、min、minBy、max、maxBy等。這樣就不需要用戶自己定義Reduce函數。
val dataStream = env.fromElements((1,5),(2,2),(2,4),(1,3)) //指定第一個字段為分區key val keyedStream: KeyedStream[(String,Int),Tuple]=dataSteam.keyBy(0) //對第二個字段進行sum統計 val sumStream: DataStream[(Int,Int)] = keyedStream.sum(1) //輸出統計結果 sumStream.print()
聚合函數中傳入參數必須是數值型,否則會拋出異常。
//統計計算指定key最小值 val minStream: DataStream[(Int,Int)] = keyedStream.min(1) //統計計算指定key最大值 val maxStream: DataStream[(Int,Int)] = keyedStream.max(1) //統計計算指定key最小值,返回最小值對應元素 val minByStream: DataStream[(Int,Int)] = keyedStream.minBy(1) //統計計算指定key最大值,返回最大值對應元素 val maxByStream: DataStream[(Int,Int)] = keyedStream.maxBy(1)
將兩個或多個輸入的數據集合并為一個數據集,需要保證輸入待合并數據集和輸出數據集格式一致。
//創建不同數據集 val dataStream1: DataStream [(String ,Int)]= env.fromElements(("a",3),("d",4),("c",2),("c",5),("a",5)) val dataStream2: DataStream [(String ,Int)]= env.fromElements(("d",1),("s",2),("a",4),("e",5),("a",6)) val dataStream3: DataStream [(String ,Int)]= env.fromElements(("a",2),("d",1),("s",2),("c",3),("b",1)) //合并兩個數據集 val unionStream = dataStream1.union(dataStream2) //合并多個數據集 val allUnionStream = dataStream1.union(dataStream2,dataStream3)
該算子為了合并兩種或多種不同類型的數據集,合并后會保留原始數據集的數類型。連接操作允許共享狀態數據,也就是說在多個數據集之間可以操作和查看對方數據集的狀態。
實例:dataStream1數據集為(String,Int)元祖類型,dataStream2數據集為Int類型,通過connect連接將兩種類型數據結合在一起,形成格式為ConnectedStream是的數據集,其內部數據為[(String,Int),Int]的混合數據類型,保留兩個數據集的數據類型。
val dataStream1: DataStream [(String ,Int)]= env.fromElements(("a",3),("d",4),("c",2),("c",5),("a",5)) val dataStream2: DataStream [Int]= env.fromElements(1,2,4,5,6) //連接兩個數據集 val connectedStream :ConnectedStreams[(String, Int), Int] = dataStream1.connect(dataStream2)
注意:ConnectedStreams類型的數據集不能進行類似Print()操作,需轉換為DataStream類型數據集。
ConnectedStreams提供map()和flatMap()需要定義CoMapFunction或CoFlatMapFunction分別處理輸入的DataStream數據集,或直接傳入MapFunction來分別處理兩個數據集。
map()實例如下:
val resultStream = connectedStream.map(new CoMapFunction[(String,Int),Int,(Int, String)]{ //定義第一個數據集函數處理邏輯,輸入值為第一個DataStream override def map1(in1: (String,Int)): (Int ,String) = { (int1._2 , in1._1) } //定義第二個數據集函數處理邏輯 override def amp2(in2: Int):(Int,String) = { (int2,"default") } })
以上實例中,兩個函數會多線程交替執行產生結果,最后根據定義生成目標數據集。
flatMap()方法中指定CoFlatMapFunction。兩個函數共享number變量,代碼如下:
val resultStream2 = connectedStream.flatMap(new CoFlatMapFunction[(String,Int), Int ,(String ,Int , Int)]{ //定義共享變量 var number=0 //定義第一個數據集處理函數 override def flatMap1(in1:(String ,Int ), collector : Collector[(String,Int ,Int)]): Unit = { collector.collect((in1._1,in1._2,number)) } //定義第二個數據集處理函數 override def flatMap2(in2: Int, collector : Collector[(String , Int ,Int)]):Unit = { number=in2 } })
如果想通過指定的條件對兩個數據集進行關聯,可以借助keyBy韓碩或broadcast廣播變量實現。keyBy會將相同key的數據路由在同一個Operator中。broadcast會在執行計算邏輯前,將DataStream2數據集廣播到所有并行計算的Operator中,再根據條件對數據集進行關聯。這兩種方式本質是分布式join算子的基本實現方式。
//通過keyby函數根據指定的key連接兩個數據集 val keyedConnect: ConnectedStreams[(String ,Int ), Int] = dataStream1.connect(dataStream2).keyBy(1,0) //通過broadcast關聯兩個數據集 val broadcastConnect: BroadcastConnectedStream [(String, Int), Int] = dataStream1.connect(dataStream2.broadcast())
將一個DataStream數據集按條件進行拆分,形成兩個數據集的過程,union的逆向操作。實例:如調用split函數,指定條件判斷,根據第二個字段的奇偶性將數據集標記出來,偶數標記為event,奇數標記為odd,再通過集合將標記返回,最終生成SplitStream數據集。
//創建數據集 val DataStream1: DataStream[(String, Int)] = env.fromElements(("a",3),("d",4),("c",2),("c",5),("a",5)) //合并連個DataStream數據集 val splitedStream : SplitStream[(String,Int)] = dataStream1.split(t => if(t._2 % 2 ==0 ) Seq("even") else Seq("odd"))
split函數只是標記數據,沒有拆分數據,因此需要select函數根據標記將數據切分為不同數據集。
//篩選出偶數數據集 val evenStream: DataStream[(String,Int)] = splitedStream.select("even") //篩選出奇數數據集 val oddStream: DataStream[(String,Int)] = splitedStream.select("odd") //篩選出偶數和奇數數據集 val allStream: DataStream[(String,Int)] = splitedStream.select("even","odd")
Iterate適合于迭代計算,通過每一次的迭代計算,并將計算結果反饋到下一次迭代計算中。
//創建數據集,map處理為對數據分區根據默認并行度進行平衡 val DataStream = env.fromElements(3,1,2,1,5).map{ t:Int => t} val iterated = dataStream.iterate((input: ConnectedStreams[Int , String]) => { //定義兩個map處理數據集,第一個map反饋操作,第二個map將數據輸出到下游 val head= input.map(i => (i+1).toString, s => s) (head.filter( _ == "2"), head.filter (_ != "2")) },1000) //超過1000ms沒有數據接入終止迭代
根據指定的分區策略將數據重新分發到不同節點的Task實例上執行,以此優化DataStream自身API對數據的分區控制。
隨機將數據集中數據分配到下游算子的每個分區中,優點數據相對均衡,缺點失去原有數據的分區結構
val shuffleStream=dataStream.shuffle
循環將數據集中數據進行重分區,能盡可能保證每個分區的數據平衡,可有效解決數據集的傾斜問題。
val shuffleStream= dataStream.rebalance();
一種通過循環方式進行數據重平衡的分區策略,與Roundrobin Partitioning不同,它僅會對上下游繼承的算子數據進行重新平衡,具體主要根據上下游算子的并行度決定。如上游算子的并發度為2,下游算子的并發度為4,上游算子中第一個分區數據按照同等比例將數據路由在下游的固定兩個分區中,另一個分區也是一樣。
//通過調用DataStream API中rescale()方法實現Rescaling Partitioning操作 val shuffleStream = dataStream.rescale();
將輸入的數據集復制到下游算子的并行的Tasks實例中,下游算子Tasks可直接從本地內存中獲取廣播數據集,不再依賴網絡傳輸。
這種分區策略適合于小集群,如大數據集關聯小數據集時,可通過廣播方式將小數據分發到算子的分區中。
//通過DataStream API的broadcast() 方法實現廣播分區 val shuffleStream= dataStream.broadcast()
實現自定義分區器,調用DataStream API上的partitionCustom()方法將創建的分區器應用到數據集上。
如下,自定義分區器實現將字段中包含flink關鍵字的數據放在partition為0的分區中,其余數據執行隨機分區策略,其中num Partitions是從系統中獲取的并行度參數。
Object customPartitioner extends Partitioner[String]{ //獲取隨機數生成器 val r=scala.util.Random override def partition(key: String, numPartitions: Int): Int ={ //定義分區策略,key中如果包含a則放入0分區中,其他情況則根據Partitions num隨機分區 if(key.contains("flink")) 0 else r.nextInt(numPartitions) } }
完成自定義分區器,調用DataStream API的partitionCustom應用分區器,第二個參數指定分區器使用到的字段,對于Tuple類型數據,分區字段可以通過字段名稱指定,其他類型數據集則通過位置索引指定。
//通過數據集字段名稱指定分區字段 dataStream.partitionCustom(customPartitioner,"filed_name"); //通過數據集字段索引指定分區字段 dataStream.partitionCustom(customPartitioner,0)
到此,相信大家對“flink的Transformation數據處理方法是什么”有了更深的了解,不妨來實際操作一番吧!這里是億速云網站,更多相關內容可以進入相關頻道進行查詢,關注我們,繼續學習!
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。