您好,登錄后才能下訂單哦!
這篇文章將為大家詳細講解有關Spark SQL編程的示例分析,小編覺得挺實用的,因此分享給大家做個參考,希望大家閱讀完這篇文章后可以有所收獲。
#Spark SQL 編程指南#
##簡介## Spark SQL支持在Spark中執行SQL,或者HiveQL的關系查詢表達式。它的核心組件是一個新增的RDD類型JavaSchemaRDD。JavaSchemaRDD由Row對象和表述這個行的每一列的數據類型的schema組成。一個JavaSchemaRDD類似于傳統關系數據庫的一個表。JavaSchemaRDD可以通過一個已存在的RDD,Parquet文件,JSON數據集,或者通過運行HiveSQL獲得存儲在Apache Hive上的數據創建。
Spark SQL目前是一個alpha組件。盡管我們會盡量減少API變化,但是一些API任然后再以后的發布中改變。
##入門## 在Spark中,所有關系函數功能的入口點是JavaSQLContext類。或者他的子類。要創建一個基本的JavaSQLContext,所有你需要的只是一個JavaSparkContext。
JavaSparkContext sc = ...; // An existing JavaSparkContext. JavaSQLContext sqlContext = new org.apache.spark.sql.api.java.JavaSQLContext(sc);
##數據源## Spark SQL支持通過JavaSchemaRDD接口操作各種各樣的數據源。一單一個數據集被加載,它可以被注冊成一個表,甚至和來自其他源的數據連接。
###RDDs### Spark SQL支持的表的其中一個類型是由JavaBeans的RDD。BeanInfo定義了這個表的schema。現在 ,Spark SQL 不支持包括嵌套或者復雜類型例如Lists或者Arrays的JavaBeans。你可以通過創建一個實現了Serializable并且它的所有字段都有getters和setters方法的類類創建一個JavaBeans。
public static class Person implements Serializable { private String name; private int age; public String getName() { return name; } public void setName(String name) { this.name = name; } public int getAge() { return age; } public void setAge(int age) { this.age = age; } }
一個schema可以被應用在一個已存在的RDD上,通過調用applySchema并且提供這個JavaBean的類對象。
// sc is an existing JavaSparkContext. JavaSQLContext sqlContext = new org.apache.spark.sql.api.java.JavaSQLContext(sc) // Load a text file and convert each line to a JavaBean. JavaRDD<Person> people = sc.textFile("examples/src/main/resources/people.txt").map( new Function<String, Person>() { public Person call(String line) throws Exception { String[] parts = line.split(","); Person person = new Person(); person.setName(parts[0]); person.setAge(Integer.parseInt(parts[1].trim())); return person; } }); // Apply a schema to an RDD of JavaBeans and register it as a table. JavaSchemaRDD schemaPeople = sqlContext.applySchema(people, Person.class); schemaPeople.registerAsTable("people"); // SQL can be run over RDDs that have been registered as tables. JavaSchemaRDD teenagers = sqlContext.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19") // The results of SQL queries are SchemaRDDs and support all the normal RDD operations. // The columns of a row in the result can be accessed by ordinal. List<String> teenagerNames = teenagers.map(new Function<Row, String>() { public String call(Row row) { return "Name: " + row.getString(0); } }).collect();
注意,Spark SQL目前使用一個非常簡單的SQL解析器。用戶如果想獲得一個更加完整的SQL方言,應該看看HiveContext提供的HiveQL支持。
###Parquet Files### Parquet是一個columnar格式,并且被許多其他數據處理系統支持。Spark SQL對讀寫Parquet文件提供支持,并且自動保存原始數據的Schema。通過下面的例子使用數據:
// sqlContext from the previous example is used in this example. JavaSchemaRDD schemaPeople = ... // The JavaSchemaRDD from the previous example. // JavaSchemaRDDs can be saved as Parquet files, maintaining the schema information. schemaPeople.saveAsParquetFile("people.parquet"); // Read in the Parquet file created above. Parquet files are self-describing so the schema is preserved. // The result of loading a parquet file is also a JavaSchemaRDD. JavaSchemaRDD parquetFile = sqlContext.parquetFile("people.parquet"); //Parquet files can also be registered as tables and then used in SQL statements. parquetFile.registerAsTable("parquetFile"); JavaSchemaRDD teenagers = sqlContext.sql("SELECT name FROM parquetFile WHERE age >= 13 AND age <= 19"); List<String> teenagerNames = teenagers.map(new Function<Row, String>() { public String call(Row row) { return "Name: " + row.getString(0); } }).collect();
###JSON Datasets### Spark SQL可以自動推斷一個JSON數據集的schema,并加載成一個JavaSchemaRDD。這個轉換可以通過JavaSQLContext中的兩個方法中的一個完成:
jsonFile -從一個目錄下的文件中加載數據,這個文件中的每一行都是一個JSON對象。
jsonRdd -從一個已存在的RDD加載數據,這個RDD中的每一個元素是一個包含一個JSON對象的String。
// sc is an existing JavaSparkContext. JavaSQLContext sqlContext = new org.apache.spark.sql.api.java.JavaSQLContext(sc); // A JSON dataset is pointed to by path. // The path can be either a single text file or a directory storing text files. String path = "examples/src/main/resources/people.json"; // Create a JavaSchemaRDD from the file(s) pointed to by path JavaSchemaRDD people = sqlContext.jsonFile(path); // The inferred schema can be visualized using the printSchema() method. people.printSchema(); // root // |-- age: IntegerType // |-- name: StringType // Register this JavaSchemaRDD as a table. people.registerAsTable("people"); // SQL statements can be run by using the sql methods provided by sqlContext. JavaSchemaRDD teenagers = sqlContext.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19"); // Alternatively, a JavaSchemaRDD can be created for a JSON dataset represented by // an RDD[String] storing one JSON object per string. List<String> jsonData = Arrays.asList( "{\"name\":\"Yin\",\"address\":{\"city\":\"Columbus\",\"state\":\"Ohio\"}}"); JavaRDD<String> anotherPeopleRDD = sc.parallelize(jsonData); JavaSchemaRDD anotherPeople = sqlContext.jsonRDD(anotherPeopleRDD);
###Hive Tables### Spark SQL也支持讀和寫存儲在apache Hive中的數據。然而,由于Hive有一個非常大的依賴,他沒有在Spark默認寶中包括。為了使用Hive,你必須運行‘SPARK_HIVE=true sbt/sbt assembly/assembly'(或者對Maven使用 -Phive)。這個命令構建一個包含Hive的assembly。注意,這個Hive assembly 必須放在所有的工作節點上,因為它們需要訪問Hive的序列化和方序列化包(SerDes),以此訪問存儲在Hive中的數據。
可以通過conf目錄下的hive-site.xml文件完成Hive配置 。
要和Hive配合工作,你需要構造一個JavaHiveContext,它繼承了JavaSQLContext,并且添加了發現MetaStore中的表和使用HiveQL編寫查詢的功能。此外,除了sql方法,JavaHiveContext方法還提供了一個hql方法,它允許查詢使用HiveQL表達。
##Writing Language-Integrated Relational Queries## Language-Integrated查詢目前只在Scala中被支持。
Spark SQL同樣支持使用領域特定的語言來編寫查詢。再次,使用上面例子中的數據:
// sc is an existing SparkContext. val sqlContext = new org.apache.spark.sql.SQLContext(sc) // Importing the SQL context gives access to all the public SQL functions and implicit conversions. import sqlContext._ val people: RDD[Person] = ... // An RDD of case class objects, from the first example. // The following is the same as 'SELECT name FROM people WHERE age >= 10 AND age <= 19' val teenagers = people.where('age >= 10).where('age <= 19).select('name) teenagers.map(t => "Name: " + t(0)).collect().foreach(println)
DSL使用Scala中得到標記來表示基礎表中的表,他們使用一個前綴’標識。隱式轉換這些標記為被SQL 執行引擎評估的表達式。支持這些功能的完成列表可以再ScalaDoc找到。
關于“Spark SQL編程的示例分析”這篇文章就分享到這里了,希望以上內容可以對大家有一定的幫助,使各位可以學到更多知識,如果覺得文章不錯,請把它分享出去讓更多的人看到。
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。