91超碰碰碰碰久久久久久综合_超碰av人澡人澡人澡人澡人掠_国产黄大片在线观看画质优化_txt小说免费全本

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

Flink DataSet算子的作用是什么

發布時間:2021-12-31 13:43:52 來源:億速云 閱讀:150 作者:iii 欄目:大數據

這篇文章主要介紹“Flink DataSet算子的作用是什么”,在日常操作中,相信很多人在Flink DataSet算子的作用是什么問題上存在疑惑,小編查閱了各式資料,整理出簡單好用的操作方法,希望對大家解答”Flink DataSet算子的作用是什么”的疑惑有所幫助!接下來,請跟著小編一起來學習吧!

Flink為了能夠處理有邊界的數據集和無邊界的數據集,提供了對應的DataSet API和DataStream API。我們可以開發對應的Java程序或者Scala程序來完成相應的功能。下面舉例了一些DataSet API中的基本的算子。

Flink DataSet算子的作用是什么

下面我們通過具體的代碼來為大家演示每個算子的作用。

1、Map、FlatMap與MapPartition

//獲取運行環境
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

ArrayList<String> data = new ArrayList<String>();
data.add("I love Beijing");
data.add("I love China");
data.add("Beijing is the capital of China");
DataSource<String> text = env.fromCollection(data);

DataSet<List<String>> mapData = text.map(new MapFunction<String, List<String>>() {

	public List<String> map(String data) throws Exception {
		String[] words = data.split(" ");
		
		//創建一個List
		List<String> result = new ArrayList<String>();
		for(String w:words){
			result.add(w);
		}
		return result;
	}
});
mapData.print();
System.out.println("*****************************************");

DataSet<String> flatMapData = text.flatMap(new FlatMapFunction<String, String>() {

	public void flatMap(String data, Collector<String> collection) throws Exception {
		String[] words = data.split(" ");
		for(String w:words){
			collection.collect(w);
		}
	}
});
flatMapData.print();

System.out.println("*****************************************");
/*	new MapPartitionFunction<String, String>
	第一個String:表示分區中的數據元素類型
	第二個String:表示處理后的數據元素類型*/
DataSet<String> mapPartitionData = text.mapPartition(new MapPartitionFunction<String, String>() {

	public void mapPartition(Iterable<String> values, Collector<String> out) throws Exception {
		//針對分區進行操作的好處是:比如要進行數據庫的操作,一個分區只需要創建一個Connection
		//values中保存了一個分區的數據
		 Iterator<String> it = values.iterator();
		while (it.hasNext()) {
			String next = it.next();
			String[] split = next.split(" ");
			for (String word : split) {
				out.collect(word);
			}
		}
		//關閉鏈接
	}
});
mapPartitionData.print();

2、Filter與Distinct

//獲取運行環境
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

ArrayList<String> data = new ArrayList<String>();
data.add("I love Beijing");
data.add("I love China");
data.add("Beijing is the capital of China");
DataSource<String> text = env.fromCollection(data);

DataSet<String> flatMapData = text.flatMap(new FlatMapFunction<String, String>() {

	public void flatMap(String data, Collector<String> collection) throws Exception {
		String[] words = data.split(" ");
		for(String w:words){
			collection.collect(w);
		}
	}
});

//去掉重復的單詞
flatMapData.distinct().print();
System.out.println("*********************");

//選出長度大于3的單詞
flatMapData.filter(new FilterFunction<String>() {
	
	public boolean filter(String word) throws Exception {
		int length = word.length();
		return length>3?true:false;
	}
}).print();

3、Join操作

//獲取運行的環境
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

//創建第一張表:用戶ID  姓名
ArrayList<Tuple2<Integer, String>> data1 = new ArrayList<Tuple2<Integer,String>>();
data1.add(new Tuple2(1,"Tom"));
data1.add(new Tuple2(2,"Mike"));
data1.add(new Tuple2(3,"Mary"));
data1.add(new Tuple2(4,"Jone"));
//創建第二張表:用戶ID 所在的城市
ArrayList<Tuple2<Integer, String>> data2 = new ArrayList<Tuple2<Integer,String>>();
data2.add(new Tuple2(1,"北京"));
data2.add(new Tuple2(2,"上海"));
data2.add(new Tuple2(3,"廣州"));
data2.add(new Tuple2(4,"重慶"));

//實現join的多表查詢:用戶ID  姓名  所在的程序
DataSet<Tuple2<Integer, String>> table1 = env.fromCollection(data1);
DataSet<Tuple2<Integer, String>> table2 = env.fromCollection(data2);

table1.join(table2).where(0).equalTo(0)
/*第一個Tuple2<Integer,String>:表示第一張表
 * 第二個Tuple2<Integer,String>:表示第二張表
 * Tuple3<Integer,String, String>:多表join連接查詢后的返回結果   */		                   
.with(new JoinFunction<Tuple2<Integer,String>, Tuple2<Integer,String>, Tuple3<Integer,String, String>>() {
	public Tuple3<Integer, String, String> join(Tuple2<Integer, String> table1,
			Tuple2<Integer, String> table2) throws Exception {
		return new Tuple3<Integer, String, String>(table1.f0,table1.f1,table2.f1);
	} }).print();

4、笛卡爾積

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

//創建第一張表:用戶ID  姓名
ArrayList<Tuple2<Integer, String>> data1 = new ArrayList<Tuple2<Integer,String>>();
data1.add(new Tuple2(1,"Tom"));
data1.add(new Tuple2(2,"Mike"));
data1.add(new Tuple2(3,"Mary"));
data1.add(new Tuple2(4,"Jone"));

//創建第二張表:用戶ID 所在的城市
ArrayList<Tuple2<Integer, String>> data2 = new ArrayList<Tuple2<Integer,String>>();
data2.add(new Tuple2(1,"北京"));
data2.add(new Tuple2(2,"上海"));
data2.add(new Tuple2(3,"廣州"));
data2.add(new Tuple2(4,"重慶"));

//實現join的多表查詢:用戶ID  姓名  所在的程序
DataSet<Tuple2<Integer, String>> table1 = env.fromCollection(data1);
DataSet<Tuple2<Integer, String>> table2 = env.fromCollection(data2);

//生成笛卡爾積
table1.cross(table2).print();

5、First-N

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

//這里的數據是:員工姓名、薪水、部門號
DataSet<Tuple3<String, Integer,Integer>> grade = 
		env.fromElements(new Tuple3<String, Integer,Integer>("Tom",1000,10),
						 new Tuple3<String, Integer,Integer>("Mary",1500,20),
						 new Tuple3<String, Integer,Integer>("Mike",1200,30),
						 new Tuple3<String, Integer,Integer>("Jerry",2000,10));

//按照插入順序取前三條記錄
grade.first(3).print();
System.out.println("**********************");

//先按照部門號排序,在按照薪水排序
grade.sortPartition(2, Order.ASCENDING).sortPartition(1, Order.ASCENDING).print();
System.out.println("**********************");

//按照部門號分組,求每組的第一條記錄
grade.groupBy(2).first(1).print();

6、外鏈接操作

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

//創建第一張表:用戶ID  姓名
ArrayList<Tuple2<Integer, String>> data1 = new ArrayList<Tuple2<Integer,String>>();
data1.add(new Tuple2(1,"Tom"));
data1.add(new Tuple2(3,"Mary"));
data1.add(new Tuple2(4,"Jone"));

//創建第二張表:用戶ID 所在的城市
ArrayList<Tuple2<Integer, String>> data2 = new ArrayList<Tuple2<Integer,String>>();
data2.add(new Tuple2(1,"北京"));
data2.add(new Tuple2(2,"上海"));
data2.add(new Tuple2(4,"重慶"));

//實現join的多表查詢:用戶ID  姓名  所在的程序
DataSet<Tuple2<Integer, String>> table1 = env.fromCollection(data1);
DataSet<Tuple2<Integer, String>> table2 = env.fromCollection(data2);

//左外連接
table1.leftOuterJoin(table2).where(0).equalTo(0)
	  .with(new JoinFunction<Tuple2<Integer,String>, Tuple2<Integer,String>, Tuple3<Integer,String,String>>() {

		public Tuple3<Integer, String, String> join(Tuple2<Integer, String> table1,
				Tuple2<Integer, String> table2) throws Exception {
			// 左外連接表示等號左邊的信息會被包含
			if(table2 == null){
				return new Tuple3<Integer, String, String>(table1.f0,table1.f1,null);
			}else{
				return new Tuple3<Integer, String, String>(table1.f0,table1.f1,table2.f1);
			}
		}
	}).print();

System.out.println("***********************************");
//右外連接
table1.rightOuterJoin(table2).where(0).equalTo(0)
	  .with(new JoinFunction<Tuple2<Integer,String>, Tuple2<Integer,String>, Tuple3<Integer,String,String>>() {

		public Tuple3<Integer, String, String> join(Tuple2<Integer, String> table1,
				Tuple2<Integer, String> table2) throws Exception {
			//右外鏈接表示等號右邊的表的信息會被包含
			if(table1 == null){
				return new Tuple3<Integer, String, String>(table2.f0,null,table2.f1);
			}else{
				return new Tuple3<Integer, String, String>(table2.f0,table1.f1,table2.f1);
			}
		}
	}).print();

System.out.println("***********************************");

//全外連接
table1.fullOuterJoin(table2).where(0).equalTo(0)
.with(new JoinFunction<Tuple2<Integer,String>, Tuple2<Integer,String>, Tuple3<Integer,String,String>>() {

	public Tuple3<Integer, String, String> join(Tuple2<Integer, String> table1, Tuple2<Integer, String> table2)
			throws Exception {
		if(table1 == null){
			return new Tuple3<Integer, String, String>(table2.f0,null,table2.f1);
		}else if(table2 == null){
			return new Tuple3<Integer, String, String>(table1.f0,table1.f1,null);
		}else{
			return new Tuple3<Integer, String, String>(table1.f0,table1.f1,table2.f1);
		}
	}
	
}).print();

到此,關于“Flink DataSet算子的作用是什么”的學習就結束了,希望能夠解決大家的疑惑。理論與實踐的搭配能更好的幫助大家學習,快去試試吧!若想繼續學習更多相關知識,請繼續關注億速云網站,小編會繼續努力為大家帶來更多實用的文章!

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

来宾市| 闽侯县| 华阴市| 云林县| 翁牛特旗| 静海县| 喀什市| 屏东市| 南郑县| 湘潭市| 保德县| 金堂县| 文水县| 梧州市| 伽师县| 泗洪县| 婺源县| 牟定县| 同心县| 木里| 公主岭市| 郑州市| 民权县| 信宜市| 襄樊市| 微山县| 林口县| 二手房| 景宁| 峨眉山市| 万盛区| 黄山市| 门源| 开平市| 垦利县| 弋阳县| 芮城县| 广南县| 白城市| 贵南县| 景德镇市|