您好,登錄后才能下訂單哦!
這篇文章主要介紹“如何把JSON文件轉化為DataFrame ”,在日常操作中,相信很多人在如何把JSON文件轉化為DataFrame 問題上存在疑惑,小編查閱了各式資料,整理出簡單好用的操作方法,希望對大家解答”如何把JSON文件轉化為DataFrame ”的疑惑有所幫助!接下來,請跟著小編一起來學習吧!
一:簡單了解SparkSQL。
Spark SQL 是結構化的數據處理一個Spark模塊。與基本的Spark RDD API不同,Spark SQL 所提供的接口為Spark 提供有關數據和正在執行的計算的結構的詳細信息。Spark SQL內部使用這些額外的信息來執行額外的優化。有幾種方法與Spark SQL 包括 SQL、 DataFrames API 和數據集 API 進行交互。計算結果相同的執行引擎在使用時,獨立的 API/語言使用的表達計算。這種統一意味著開發人員很容易可以提供最自然的方式來表達一個給定的轉換基于各種 Api 之間來回切換。
Spark SQL是Spark中的一個模塊,主要用于進行結構化數據的處理。它提供的最核心的編程抽象,就是DataFrame。同時Spark SQL還可以作為分布式的SQL查詢引擎。Spark SQL最重要的功能之一,就是從Hive中查詢數據。
二:簡單了解DataFrame。
DataFrame是一個以命名列方式組織的分布式數據集,等同于關系型數據庫中的一個表,也相當于R/Python中的data frames(但是進行了更多的優化)。DataFrame可以通過很多來源進行構建,包括:結構化的數據文件,Hive中的表,外部的關系型數據庫,以及RDD。
接下來是對 結構化數據集 與 非結構化數據集 的操作。
三:結構化數據集: 如何把JSON文件轉化為DataFrame
3.1.在HDFS上放置了兩個JSON文件,即
people.json, 文件內容如下:
{"id": "19","name": "berg","sex": "male","age": 19} {"id": "20","name": "cccc","sex": "female","age": 20} {"id": "21","name": "xxxx","sex": "male","age": 21} {"id": "22","name": "jjjj","sex": "female","age": 21}
student.json,文件內容如下:
{"id": "1","name": "china","sex": "female","age": 100} {"id": "19","name": "xujun","sex": "male","age": 22}
3.2 通過DataFrame的API來操作數據,熟悉下DataFrame中方法的使用:
public class SparkSqlDemo { private static String appName = "Test Spark RDD"; private static String master = "local"; public static void main(String[] args) { SparkConf conf = new SparkConf(); conf.set("spark.testing.memory", "269522560000"); JavaSparkContext sc = new JavaSparkContext(master, appName, conf); //創建了 sqlContext的上下文,注意,它是DataFrame的起點。 SQLContext sqlContext = new SQLContext(sc); //本地的JSON文件轉化為DataFrame DataFrame df = sqlContext.read().json("hdfs://192.168.226.129:9000/txt/sparkshell/people.json"); //輸出表結構 df.printSchema(); //顯示DataFrame的內容。 df.show(); //選擇name df.select(df.col("name")).show(); // 選擇所有年齡大于21歲的人,只保留name字段 df.filter(df.col("age").lt(21)).select("name").show(); // 選擇name,并把age字段自增 1 df.select(df.col("name"), df.col("age").plus(1)).show(); //按年齡分組計數: df.groupBy("age").count().show(); // 應該有一條數據記錄為 2 //把另個JSON文件轉化為DataFrame DataFrame df2 = sqlContext.read().json("hdfs://192.168.226.129:9000/txt/sparkshell/student.json"); df2.show(); //表的關聯。 df.join(df2,df.col("id").equalTo(df2.col("id"))).show(); //以編程方式運行SQL: //把DataFrame對象轉化為一個虛擬的表 df.registerTempTable("people"); sqlContext.sql("select age,count(*) from people group by age").show(); System.out.println( "-------------" ); sqlContext.sql("select * from people").show(); } }
3.3 以編程方式運行 SQL 查詢并返回作為綜合結果,通過注冊表,操作sql的方式來操作數據:
public class SparkSqlDemo1 { private static String appName = "Test Spark RDD"; private static String master = "local"; public static void main(String[] args) { SparkConf conf = new SparkConf(); conf.set("spark.testing.memory", "269522560000"); JavaSparkContext sc = new JavaSparkContext(master, appName, conf); //創建了 sqlContext的上下文,注意,它是DataFrame的起點。 SQLContext sqlContext = new SQLContext(sc); //本地的JSON文件轉化為DataFrame DataFrame df = sqlContext.read().json("hdfs://192.168.226.129:9000/txt/sparkshell/people.json"); //把另一個JSON文件轉化為DataFrame DataFrame df2 = sqlContext.read().json("hdfs://192.168.226.129:9000/txt/sparkshell/student.json"); //以編程方式運行SQL: //把DataFrame對象轉化為一個虛擬的表 df.registerTempTable("people"); df2.registerTempTable("student"); // 查詢虛擬表 people 中所有數據 sqlContext.sql("select * from people").show(); //查看某個字段 sqlContext.sql("select name from people ").show(); //查看多個字段 sqlContext.sql("select name,age+1 from people ").show(); //過濾某個字段的值 sqlContext.sql("select name, age from people where age>=21").show(); //count group 某個字段的值 sqlContext.sql("select age,count(*) countage from people group by age").show(); //關聯: 內聯 。 sqlContext.sql("select * from people inner join student on people.id = student.id ").show(); /* +---+---+----+----+---+---+-----+----+ |age| id|name| sex|age| id| name| sex| +---+---+----+----+---+---+-----+----+ | 19| 19|berg|male| 22| 19|xujun|male| +---+---+----+----+---+---+-----+----+ */ } }
四:非結構化數據集:
第一種方法使用反射來推斷架構 RDD 包含特定類型的對象。
這種基于反射方法導致更簡潔的代碼和工程好當您已經知道該Schema編寫Spark應用程序時。
創建 DataFrames 的第二個方法是通過允許您構建一個Schema,然后將它應用于現有 RDD 的編程接口。
雖然這種方法更為詳細,它允許您構建 DataFrames 時直到運行時才知道的列和它們的類型。
4.1 非結構化的數據集文件,user.txt,內容如下:
1,"Hadoop",20 2,"HBase", 21 3,"Zookeeper",22 4,"Hive",23 5,"Spark",24 6,"Berg",22 7,"Xujun",23
4.2 通過 class反射來注冊一張表。
Spark SQL 支持 JavaBeans RDD 自動轉換分布式數據集。BeanInfo,使用反射來獲取定義表的架構。目前,Spark SQL 不支持包含嵌套的 JavaBeans 或包含復雜的類型,例如列表或數組。您可以通過創建一個類,實現可序列化并有 getter 和 setter 方法的所有其字段創建 JavaBean。
public class SparkSqlDemo2 { private static String appName = "Test Spark RDD"; private static String master = "local"; public static void main(String[] args) { SparkConf conf = new SparkConf(); conf.set("spark.testing.memory", "269522560000"); JavaSparkContext sc = new JavaSparkContext(master, appName, conf); //創建了 sqlContext的上下文,注意,它是DataFrame的起點。 SQLContext sqlContext = new SQLContext(sc); //把加載的文本文件 并 每行轉換 JavaBean JavaRDD<String> rdd = sc.textFile("hdfs://192.168.226.129:9000/txt/sparkshell/user.txt"); JavaRDD<User> userRDD = rdd.map( new Function<String, User>() { private static final long serialVersionUID = 1L; public User call(String line) throws Exception { String[] parts = line.split(","); User user = new User(); user.setId(Integer.parseInt(parts[0].trim())); user.setName(parts[1].trim()); user.setAge(Integer.parseInt(parts[2].trim())); return user; } }); // collect 屬于行動算子Action 提交作業并觸發運算。 List<User> list = userRDD.collect(); for (User user : list) { System.out.println( user ); } //通過 class 反射注冊一張表 DataFrame df = sqlContext.createDataFrame(userRDD, User.class); df.registerTempTable("user"); DataFrame df1 = sqlContext.sql("SELECT id,name,age FROM user WHERE age >= 21 AND age <= 23"); // 通過sql 查詢的結果是 DataFrame 即df1 它還是支持 RDD的所有正常操作。 df1.show(); //并且 結果中的行列可以按序號訪問。 List<String> listString = df1.javaRDD().map(new Function<Row, String>() { private static final long serialVersionUID = 1L; public String call(Row row) { return "Id: " + row.getInt(0) + ", Name: "+row.getString(1) + ", Age: " + row.getInt(2); } }).collect(); for (String string : listString) { System.out.println( string ); } } }
4.3 以編程方式指定 schema, 通過字段反射來映射注冊臨時表
在某些情況下不能提前定義 JavaBean 類 (例如,記錄的結構編碼的字符串,或將解析文本數據集和領域預計將以不同的方式為不同的用戶),
三個步驟,可以以編程方式創建分布式數據集。
1. 從原始 RDD; 創建行 RDD
2. 創建由 StructType 中 RDD 在步驟 1 中創建的行結構相匹配的schema。
3.適用于行 RDD 通過 createDataFrame 方法由 SQLContext 提供的schema。
public class SparkSqlDemo3 { private static String appName = "Test Spark RDD"; private static String master = "local"; public static void main(String[] args) { SparkConf conf = new SparkConf(); conf.set("spark.testing.memory", "269522560000"); JavaSparkContext sc = new JavaSparkContext(master, appName, conf); //創建了 sqlContext的上下文,注意,它是DataFrame的起點。 SQLContext sqlContext = new SQLContext(sc); //把加載的文本文件 并 每行轉換 JavaBean JavaRDD<String> rdd = sc.textFile("hdfs://192.168.226.129:9000/txt/sparkshell/user.txt"); // schema 以字符串形式編碼 String schemaString = "id name age"; // 基于 字符串的schema生成 schema。 List<StructField> fields = new ArrayList<StructField>(); String[] str = schemaString.split(" "); fields.add(DataTypes.createStructField(str[0], DataTypes.IntegerType, true)); fields.add(DataTypes.createStructField(str[1], DataTypes.StringType, true)); fields.add(DataTypes.createStructField(str[2], DataTypes.IntegerType, true)); StructType schema = DataTypes.createStructType(fields); // id name age JavaRDD<Row> rowRDD = rdd.map( new Function<String, Row>() { private static final long serialVersionUID = 1L; public Row call(String record) throws Exception { String[] fields = record.split(","); return RowFactory.create(Integer.parseInt(fields[0].trim()), fields[1].trim(),Integer.parseInt(fields[2].trim())); } }); List<Row> list = rowRDD.collect(); for (Row row : list) { System.out.println( row.getInt(0) + "\t"+ row.getString(1) + "\t"+row.getInt(2) ); } //對RDD應用schema 并注冊一張表: DataFrame df = sqlContext.createDataFrame(rowRDD, schema); System.out.println( "df : " + df); df.registerTempTable("user"); df.show(); DataFrame df2 = sqlContext.sql("SELECT id,name,age FROM user WHERE age >= 21 AND age <= 23"); // 通過sql 查詢的結果是 DataFrame 即df1 它還是支持 RDD的所有正常操作。 df2.show(); // 并且 結果中的行列可以按序號訪問。 List<String> listString = df2.javaRDD().map(new Function<Row, String>() { private static final long serialVersionUID = 1L; public String call(Row row) { System.out.println( row ); return "Id: " + row.getInt(0) + ", Name: "+row.getString(1) + ", Age: " + row.getInt(2); } }).collect(); for (String string : listString) { System.out.println( string ); } } }
注意如果將上述代碼段中的一段,即:
String[] str = schemaString.split(" ");
fields.add(DataTypes.createStructField(str[0], DataTypes.IntegerType, true));
fields.add(DataTypes.createStructField(str[1], DataTypes.StringType, true));
fields.add(DataTypes.createStructField(str[2], DataTypes.IntegerType, true));
改為下面這段代碼:
for (String fieldName: schemaString.split(" ")) {
fields.add(DataTypes.createStructField(fieldName, DataTypes.StringType, true));
}
將會出現以下錯誤:
Caused by: scala.MatchError: 1 (of class java.lang.Integer)
到此,關于“如何把JSON文件轉化為DataFrame ”的學習就結束了,希望能夠解決大家的疑惑。理論與實踐的搭配能更好的幫助大家學習,快去試試吧!若想繼續學習更多相關知識,請繼續關注億速云網站,小編會繼續努力為大家帶來更多實用的文章!
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。