91超碰碰碰碰久久久久久综合_超碰av人澡人澡人澡人澡人掠_国产黄大片在线观看画质优化_txt小说免费全本

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

Java和scala實現 Spark RDD轉換成DataFrame的兩種方法小結

發布時間:2020-10-18 23:12:58 來源:腳本之家 閱讀:141 作者:黑白調92 欄目:編程語言

一:準備數據源

在項目下新建一個student.txt文件,里面的內容為:

1,zhangsan,20 
2,lisi,21 
3,wanger,19 
4,fangliu,18 

二:實現

Java版:

1.首先新建一個student的Bean對象,實現序列化和toString()方法,具體代碼如下:

package com.cxd.sql;
import java.io.Serializable;
@SuppressWarnings("serial")
public class Student implements Serializable {
 String sid;
 String sname;
 int sage;
 public String getSid() {
  return sid;
 }
 public void setSid(String sid) {
  this.sid = sid;
 }
 public String getSname() {
  return sname;
 }
 public void setSname(String sname) {
  this.sname = sname;
 }
 public int getSage() {
  return sage;
 }
 public void setSage(int sage) {
  this.sage = sage;
 }
 @Override
 public String toString() {
  return "Student [sid=" + sid + ", sname=" + sname + ", sage=" + sage + "]";
 }
 
}
		

2.轉換,具體代碼如下

package com.cxd.sql;
import java.util.ArrayList;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
public class TxtToParquetDemo {
 public static void main(String[] args) {
  
  SparkConf conf = new SparkConf().setAppName("TxtToParquet").setMaster("local");
  SparkSession spark = SparkSession.builder().config(conf).getOrCreate();
  reflectTransform(spark);//Java反射
  dynamicTransform(spark);//動態轉換
 }
 
 /**
  * 通過Java反射轉換
  * @param spark
  */
 private static void reflectTransform(SparkSession spark)
 {
  JavaRDD<String> source = spark.read().textFile("stuInfo.txt").javaRDD();
  
  JavaRDD<Student> rowRDD = source.map(line -> {
   String parts[] = line.split(",");
   Student stu = new Student();
   stu.setSid(parts[0]);
   stu.setSname(parts[1]);
   stu.setSage(Integer.valueOf(parts[2]));
   return stu;
  });
  
  Dataset<Row> df = spark.createDataFrame(rowRDD, Student.class);
  df.select("sid", "sname", "sage").
  coalesce(1).write().mode(SaveMode.Append).parquet("parquet.res");
 }
 /**
  * 動態轉換
  * @param spark
  */
 private static void dynamicTransform(SparkSession spark)
 {
  JavaRDD<String> source = spark.read().textFile("stuInfo.txt").javaRDD();
  
  JavaRDD<Row> rowRDD = source.map( line -> {
   String[] parts = line.split(",");
   String sid = parts[0];
   String sname = parts[1];
   int sage = Integer.parseInt(parts[2]);
   
   return RowFactory.create(
     sid,
     sname,
     sage
     );
  });
  
  ArrayList<StructField> fields = new ArrayList<StructField>();
  StructField field = null;
  field = DataTypes.createStructField("sid", DataTypes.StringType, true);
  fields.add(field);
  field = DataTypes.createStructField("sname", DataTypes.StringType, true);
  fields.add(field);
  field = DataTypes.createStructField("sage", DataTypes.IntegerType, true);
  fields.add(field);
  
  StructType schema = DataTypes.createStructType(fields);
  
  Dataset<Row> df = spark.createDataFrame(rowRDD, schema);
  df.coalesce(1).write().mode(SaveMode.Append).parquet("parquet.res1");
  
  
 }
 
}

scala版本:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.StringType
import org.apache.spark.sql.types.StructField
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.IntegerType
object RDD2Dataset {
 
 case class Student(id:Int,name:String,age:Int)
 def main(args:Array[String])
 {
 
 val spark=SparkSession.builder().master("local").appName("RDD2Dataset").getOrCreate()
 import spark.implicits._
 reflectCreate(spark)
 dynamicCreate(spark)
 }
 
 /**
	 * 通過Java反射轉換
	 * @param spark
	 */
 private def reflectCreate(spark:SparkSession):Unit={
 import spark.implicits._
 val stuRDD=spark.sparkContext.textFile("student2.txt")
 //toDF()為隱式轉換
 val stuDf=stuRDD.map(_.split(",")).map(parts⇒Student(parts(0).trim.toInt,parts(1),parts(2).trim.toInt)).toDF()
 //stuDf.select("id","name","age").write.text("result") //對寫入文件指定列名
 stuDf.printSchema()
 stuDf.createOrReplaceTempView("student")
 val nameDf=spark.sql("select name from student where age<20")
 //nameDf.write.text("result") //將查詢結果寫入一個文件
 nameDf.show()
 }
 
 /**
	 * 動態轉換
	 * @param spark
	 */
 private def dynamicCreate(spark:SparkSession):Unit={
 val stuRDD=spark.sparkContext.textFile("student.txt")
 import spark.implicits._
 val schemaString="id,name,age"
 val fields=schemaString.split(",").map(fieldName => StructField(fieldName, StringType, nullable = true))
 val schema=StructType(fields)
 val rowRDD=stuRDD.map(_.split(",")).map(parts⇒Row(parts(0),parts(1),parts(2)))
 val stuDf=spark.createDataFrame(rowRDD, schema)
  stuDf.printSchema()
 val tmpView=stuDf.createOrReplaceTempView("student")
 val nameDf=spark.sql("select name from student where age<20")
 //nameDf.write.text("result") //將查詢結果寫入一個文件
 nameDf.show()
 }
}

注:

1.上面代碼全都已經測試通過,測試的環境為spark2.1.0,jdk1.8。

2.此代碼不適用于spark2.0以前的版本。

以上這篇Java和scala實現 Spark RDD轉換成DataFrame的兩種方法小結就是小編分享給大家的全部內容了,希望能給大家一個參考,也希望大家多多支持億速云。

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

富阳市| 绥江县| 泾源县| 洪泽县| 唐河县| 宜都市| 池州市| 普兰店市| 确山县| 蕲春县| 友谊县| 繁峙县| 名山县| 永和县| 平南县| 白水县| 芜湖市| 广州市| 河北省| 铁岭市| 新建县| 昭平县| 自贡市| 永德县| 剑河县| 萨嘎县| 察雅县| 小金县| 浦北县| 禹城市| 长兴县| 乌恰县| 卓尼县| 江口县| 上蔡县| 巨鹿县| 望谟县| 商河县| 搜索| 六盘水市| 泰顺县|