您好,登錄后才能下訂單哦!
Spark2.2.0中RDD轉DataFrame的方式是怎樣的,相信很多沒有經驗的人對此束手無策,為此本文總結了問題出現的原因和解決方法,通過這篇文章希望你能解決這個問題。
Spark SQL將現有的RDDs轉換為數據集。
方法:使用反射來推斷包含特定對象類型的RDD的模式。這種基于反射的方法使代碼更加簡潔,并且當您在編寫Spark應用程序時已經了解了模式時,它可以很好地工作。
第一種方法代碼實例java版本實現:
數據準備studentDatatxt
1001,20,zhangsan1002,17,lisi1003,24,wangwu1004,16,zhaogang
本地模式代碼實現:
package com.unicom.ljs.spark220.study;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;
/**
* @author: Created By lujisen
* @company ChinaUnicom Software JiNan
* @date: 2020-01-20 08:58
* @version: v1.0
* @description: com.unicom.ljs.spark220.study
*/
public class RDD2DataFrameReflect {
public static void main(String[] args) {
SparkConf sparkConf = new SparkConf().setMaster("local[*]").setAppName("RDD2DataFrameReflect");
JavaSparkContext sc = new JavaSparkContext(sparkConf);
SQLContext sqlContext=new SQLContext(sc);
JavaRDD<String> lines = sc.textFile("C:\\Users\\Administrator\\Desktop\\studentData.txt");
JavaRDD<Student2> studentRDD = lines.map(new Function<String, Student2>() {
@Override
public Student2 call(String line) throws Exception {
String[] split = line.split(",");
Student2 student=new Student2();
student.setId(Integer.valueOf(split[0]));
student.setAge(Integer.valueOf(split[1]));
student.setName(split[2]);
return student;
}
});
//使用反射方式將RDD轉換成dataFrame
//將Student.calss傳遞進去,其實就是利用反射的方式來創建DataFrame
Dataset<Row> dataFrame = sqlContext.createDataFrame(studentRDD, Student2.class);
//拿到DataFrame之后將其注冊為臨時表,然后針對其中的數據執行SQL語句
dataFrame.registerTempTable("studentTable");
//針對student臨時表,執行sql語句查詢年齡小于18歲的學生,
/*DataFrame rowDF */
Dataset<Row> dataset = sqlContext.sql("select * from studentTable where age < 18");
JavaRDD<Row> rowJavaRDD = dataset.toJavaRDD();
JavaRDD<Student2> ageRDD = rowJavaRDD.map(new Function<Row, Student2>() {
@Override
public Student2 call(Row row) throws Exception {
Student2 student = new Student2();
student.setId(row.getInt(0));
student.setAge(row.getInt(1));
student.setName(row.getString(2));
return student;
}
});
ageRDD.foreach(new VoidFunction<Student2>() {
@Override
public void call(Student2 student) throws Exception {
System.out.println(student.toString());
}
});
}
}
Student2類:
package com.unicom.ljs.spark220.study;
import java.io.Serializable;
/**
* @author: Created By lujisen
* @company ChinaUnicom Software JiNan
* @date: 2020-01-20 08:57
* @version: v1.0
* @description: com.unicom.ljs.spark220.study
*/
public class Student2 implements Serializable {
int id;
int age;
String name;
public int getId() {
return id;
}
public void setId(int id) {
this.id = id;
}
public int getAge() {
return age;
}
public void setAge(int age) {
this.age = age;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
@Override
public String toString() {
return "Student2{" +
"id=" + id +
", age=" + age +
", name='" + name + '\'' +
'}';
}
}
pom.xml關鍵依賴:
<spark.version>2.2.0</spark.version>
<scala.version>2.11.8</scala.version>
<dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-sql_2.11</artifactId> <version>${spark.version}</version></dependency><dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-core_2.11</artifactId> <version>${spark.version}</version></dependency>
看完上述內容,你們掌握Spark2.2.0中RDD轉DataFrame的方式是怎樣的的方法了嗎?如果還想學到更多技能或想了解更多相關內容,歡迎關注億速云行業資訊頻道,感謝各位的閱讀!
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。