This article explains what the basic Flink DataSet operators do. Many readers still have questions about these operators in day-to-day work, so this post walks through a set of simple, practical examples that should help clear things up. Let's get started.
To handle both bounded and unbounded data sets, Flink provides the DataSet API and the DataStream API, respectively. We can write Java or Scala programs against these APIs to implement the required processing logic. The examples below cover some of the basic operators of the DataSet API.
The following code demonstrates what each operator does.
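All of the snippets are meant to live inside the main method of an ordinary Java class, and they assume a Flink 1.x project with the Java DataSet API on the classpath (note that for the DataSet API, print() triggers execution, so no extra env.execute() call is needed). As a rough sketch, the imports they rely on are the following; exact packages may vary slightly with your Flink version:

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.MapPartitionFunction;
import org.apache.flink.api.common.operators.Order;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.util.Collector;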
//獲取運行環境 ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); ArrayList<String> data = new ArrayList<String>(); data.add("I love Beijing"); data.add("I love China"); data.add("Beijing is the capital of China"); DataSource<String> text = env.fromCollection(data); DataSet<List<String>> mapData = text.map(new MapFunction<String, List<String>>() { public List<String> map(String data) throws Exception { String[] words = data.split(" "); //創建一個List List<String> result = new ArrayList<String>(); for(String w:words){ result.add(w); } return result; } }); mapData.print(); System.out.println("*****************************************"); DataSet<String> flatMapData = text.flatMap(new FlatMapFunction<String, String>() { public void flatMap(String data, Collector<String> collection) throws Exception { String[] words = data.split(" "); for(String w:words){ collection.collect(w); } } }); flatMapData.print(); System.out.println("*****************************************"); /* new MapPartitionFunction<String, String> 第一個String:表示分區中的數據元素類型 第二個String:表示處理后的數據元素類型*/ DataSet<String> mapPartitionData = text.mapPartition(new MapPartitionFunction<String, String>() { public void mapPartition(Iterable<String> values, Collector<String> out) throws Exception { //針對分區進行操作的好處是:比如要進行數據庫的操作,一個分區只需要創建一個Connection //values中保存了一個分區的數據 Iterator<String> it = values.iterator(); while (it.hasNext()) { String next = it.next(); String[] split = next.split(" "); for (String word : split) { out.collect(word); } } //關閉鏈接 } }); mapPartitionData.print();
//獲取運行環境 ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); ArrayList<String> data = new ArrayList<String>(); data.add("I love Beijing"); data.add("I love China"); data.add("Beijing is the capital of China"); DataSource<String> text = env.fromCollection(data); DataSet<String> flatMapData = text.flatMap(new FlatMapFunction<String, String>() { public void flatMap(String data, Collector<String> collection) throws Exception { String[] words = data.split(" "); for(String w:words){ collection.collect(w); } } }); //去掉重復的單詞 flatMapData.distinct().print(); System.out.println("*********************"); //選出長度大于3的單詞 flatMapData.filter(new FilterFunction<String>() { public boolean filter(String word) throws Exception { int length = word.length(); return length>3?true:false; } }).print();
// Example: inner join

// Get the execution environment
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

// First table: user ID and name
ArrayList<Tuple2<Integer, String>> data1 = new ArrayList<Tuple2<Integer, String>>();
data1.add(new Tuple2(1, "Tom"));
data1.add(new Tuple2(2, "Mike"));
data1.add(new Tuple2(3, "Mary"));
data1.add(new Tuple2(4, "Jone"));

// Second table: user ID and city
ArrayList<Tuple2<Integer, String>> data2 = new ArrayList<Tuple2<Integer, String>>();
data2.add(new Tuple2(1, "北京"));
data2.add(new Tuple2(2, "上海"));
data2.add(new Tuple2(3, "廣州"));
data2.add(new Tuple2(4, "重慶"));

// Join the two tables to get: user ID, name, city
DataSet<Tuple2<Integer, String>> table1 = env.fromCollection(data1);
DataSet<Tuple2<Integer, String>> table2 = env.fromCollection(data2);

table1.join(table2).where(0).equalTo(0)
      /* first  Tuple2<Integer, String>: an element of the first table
       * second Tuple2<Integer, String>: an element of the second table
       * Tuple3<Integer, String, String>: the result of the join */
      .with(new JoinFunction<Tuple2<Integer, String>, Tuple2<Integer, String>, Tuple3<Integer, String, String>>() {
          public Tuple3<Integer, String, String> join(Tuple2<Integer, String> table1,
                                                      Tuple2<Integer, String> table2) throws Exception {
              return new Tuple3<Integer, String, String>(table1.f0, table1.f1, table2.f1);
          }
      }).print();
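When no custom JoinFunction logic is needed, the same three-column result can usually be produced with a projection join instead. A minimal sketch (field indices refer to the tuples above):

table1.join(table2).where(0).equalTo(0)
      .projectFirst(0, 1)   // user ID and name from the first table
      .projectSecond(1)     // city from the second table
      .print();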
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); //創建第一張表:用戶ID 姓名 ArrayList<Tuple2<Integer, String>> data1 = new ArrayList<Tuple2<Integer,String>>(); data1.add(new Tuple2(1,"Tom")); data1.add(new Tuple2(2,"Mike")); data1.add(new Tuple2(3,"Mary")); data1.add(new Tuple2(4,"Jone")); //創建第二張表:用戶ID 所在的城市 ArrayList<Tuple2<Integer, String>> data2 = new ArrayList<Tuple2<Integer,String>>(); data2.add(new Tuple2(1,"北京")); data2.add(new Tuple2(2,"上海")); data2.add(new Tuple2(3,"廣州")); data2.add(new Tuple2(4,"重慶")); //實現join的多表查詢:用戶ID 姓名 所在的程序 DataSet<Tuple2<Integer, String>> table1 = env.fromCollection(data1); DataSet<Tuple2<Integer, String>> table2 = env.fromCollection(data2); //生成笛卡爾積 table1.cross(table2).print();
// Example: first-n, sortPartition and groupBy

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

// The data here is: employee name, salary, department number
DataSet<Tuple3<String, Integer, Integer>> grade = env.fromElements(
        new Tuple3<String, Integer, Integer>("Tom", 1000, 10),
        new Tuple3<String, Integer, Integer>("Mary", 1500, 20),
        new Tuple3<String, Integer, Integer>("Mike", 1200, 30),
        new Tuple3<String, Integer, Integer>("Jerry", 2000, 10));

// Take the first three records in insertion order
grade.first(3).print();
System.out.println("**********************");

// Sort by department number first, then by salary
grade.sortPartition(2, Order.ASCENDING).sortPartition(1, Order.ASCENDING).print();
System.out.println("**********************");

// Group by department number and take the first record of each group
grade.groupBy(2).first(1).print();
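One thing to keep in mind is that sortPartition sorts each partition independently, so with a parallelism greater than 1 the printed result is not a single global ordering. For a small demo like this, you can force a single partition before building the job:

// Force a single partition so the sortPartition output is globally sorted
env.setParallelism(1);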
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); //創建第一張表:用戶ID 姓名 ArrayList<Tuple2<Integer, String>> data1 = new ArrayList<Tuple2<Integer,String>>(); data1.add(new Tuple2(1,"Tom")); data1.add(new Tuple2(3,"Mary")); data1.add(new Tuple2(4,"Jone")); //創建第二張表:用戶ID 所在的城市 ArrayList<Tuple2<Integer, String>> data2 = new ArrayList<Tuple2<Integer,String>>(); data2.add(new Tuple2(1,"北京")); data2.add(new Tuple2(2,"上海")); data2.add(new Tuple2(4,"重慶")); //實現join的多表查詢:用戶ID 姓名 所在的程序 DataSet<Tuple2<Integer, String>> table1 = env.fromCollection(data1); DataSet<Tuple2<Integer, String>> table2 = env.fromCollection(data2); //左外連接 table1.leftOuterJoin(table2).where(0).equalTo(0) .with(new JoinFunction<Tuple2<Integer,String>, Tuple2<Integer,String>, Tuple3<Integer,String,String>>() { public Tuple3<Integer, String, String> join(Tuple2<Integer, String> table1, Tuple2<Integer, String> table2) throws Exception { // 左外連接表示等號左邊的信息會被包含 if(table2 == null){ return new Tuple3<Integer, String, String>(table1.f0,table1.f1,null); }else{ return new Tuple3<Integer, String, String>(table1.f0,table1.f1,table2.f1); } } }).print(); System.out.println("***********************************"); //右外連接 table1.rightOuterJoin(table2).where(0).equalTo(0) .with(new JoinFunction<Tuple2<Integer,String>, Tuple2<Integer,String>, Tuple3<Integer,String,String>>() { public Tuple3<Integer, String, String> join(Tuple2<Integer, String> table1, Tuple2<Integer, String> table2) throws Exception { //右外鏈接表示等號右邊的表的信息會被包含 if(table1 == null){ return new Tuple3<Integer, String, String>(table2.f0,null,table2.f1); }else{ return new Tuple3<Integer, String, String>(table2.f0,table1.f1,table2.f1); } } }).print(); System.out.println("***********************************"); //全外連接 table1.fullOuterJoin(table2).where(0).equalTo(0) .with(new JoinFunction<Tuple2<Integer,String>, Tuple2<Integer,String>, Tuple3<Integer,String,String>>() { public Tuple3<Integer, String, String> join(Tuple2<Integer, String> table1, Tuple2<Integer, String> table2) throws Exception { if(table1 == null){ return new Tuple3<Integer, String, String>(table2.f0,null,table2.f1); }else if(table2 == null){ return new Tuple3<Integer, String, String>(table1.f0,table1.f1,null); }else{ return new Tuple3<Integer, String, String>(table1.f0,table1.f1,table2.f1); } } }).print();
This concludes the walkthrough of what the Flink DataSet operators do; hopefully it has answered your questions. Pairing the theory with hands-on practice is the best way to learn, so give the examples above a try. For more related articles, keep following the 億速云 (yisu.com) site.