This article walks through how to use Spark SQL in Spark. Many readers are not yet familiar with it, so it is shared here as a reference. Let's take a look.
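1. What is Spark SQL?
Spark SQL is the Spark module for processing structured data.
It provides a programming abstraction called DataFrame and also acts as a distributed SQL query engine.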
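2. Features of Spark SQL
Multi-language API support (Java, Python, Scala)
Unified data access
Compatible with Hive (HiveQL queries, the Hive metastore, and Hive UDFs)
Standard connectivity (JDBC and ODBC)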
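As a rough illustration of "unified data access", the sketch below writes the same rows as JSON and as Parquet and reads both back through the same `spark.read` entry point. It assumes Spark 2.x (`SparkSession`) running in local mode; the object name `UnifiedAccessSketch`, the helper `roundTrip`, and the scratch directory are made up for this example.

```scala
import java.nio.file.Files
import org.apache.spark.sql.SparkSession

object UnifiedAccessSketch {
  // Writes the same rows as JSON and as Parquet, then reads both back
  // through the same spark.read entry point. Returns the two row counts.
  def roundTrip(spark: SparkSession, baseDir: String): (Long, Long) = {
    import spark.implicits._
    val people = Seq(("admin1", 14, "man"), ("admin2", 16, "man"))
      .toDF("name", "age", "sex")

    people.write.json(s"$baseDir/people_json")
    people.write.parquet(s"$baseDir/people_parquet")

    val fromJson = spark.read.json(s"$baseDir/people_json")
    val fromParquet = spark.read.parquet(s"$baseDir/people_parquet")
    (fromJson.count(), fromParquet.count())
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("UnifiedAccessSketch").master("local[2]").getOrCreate()
    // Hypothetical scratch directory; any writable path would do
    val baseDir = Files.createTempDirectory("spark-sql-sketch").toString
    println(roundTrip(spark, baseDir))
    spark.stop()
  }
}
```

Only the format-specific method (`json`, `parquet`) changes between sources; the resulting DataFrames are queried in exactly the same way.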
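3. Why learn Spark SQL?
We have already learned Hive, which translates Hive SQL into MapReduce jobs and submits them to the cluster, greatly reducing the complexity of writing MapReduce programs by hand. However, the MapReduce execution model is relatively slow. Spark SQL emerged to address this: it translates SQL into RDD operations and submits them to the cluster, which is usually much faster.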
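4. DataFrame
Like an RDD, a DataFrame is a distributed data container.
However, a DataFrame is more like a two-dimensional table in a traditional database: besides the data itself, it also records the structure of the data, i.e. the schema.
In effect, a DataFrame is an RDD with schema information attached.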
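To make the "RDD plus schema" idea concrete, here is a minimal sketch, assuming Spark 2.x in local mode. The names `PersonRecord` and `SchemaSketch` are hypothetical and exist only for this example. It builds a small DataFrame, prints its schema, and then drops down to the underlying RDD of rows.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}

// Hypothetical record type; its fields become the DataFrame columns
case class PersonRecord(name: String, age: Int, sex: String)

object SchemaSketch {
  // Builds a DataFrame whose schema is derived from the case class
  def build(spark: SparkSession): DataFrame = {
    import spark.implicits._
    Seq(PersonRecord("admin1", 14, "man"), PersonRecord("admin2", 16, "man")).toDF()
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("SchemaSketch").master("local[2]").getOrCreate()
    val df = build(spark)

    df.printSchema()                             // column names and types
    println(df.schema.fieldNames.mkString(","))  // the schema as data

    // The same data viewed as a plain RDD[Row], without the table-like API
    val names = df.rdd.map(row => row.getAs[String]("name")).collect()
    println(names.mkString(","))
    spark.stop()
  }
}
```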
5. Programming with the Spark SQL 1.x API
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_2.11</artifactId>
    <version>${spark.version}</version>
</dependency>
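5.1 Creating a DataFrame with SQLContext (for testing)
    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.sql.{DataFrame, SQLContext}

    // Defined at the top level so Spark can derive the schema by reflection
    case class Person(name: String, age: Int, sex: String)

    object Ops3 {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setAppName("Ops3").setMaster("local[3]")
        val sc = new SparkContext(conf)
        val sqlContext = new SQLContext(sc)
        val rdd1 = sc.parallelize(List(
          Person("admin1", 14, "man"),
          Person("admin2", 16, "man"),
          Person("admin3", 18, "man")))
        // The columns (name, age, sex) are inferred from the case class fields
        val df1: DataFrame = sqlContext.createDataFrame(rdd1)
        df1.show(1) // print only the first row
      }
    }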
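5.2 Using the implicit conversions provided by SQLContext (for testing)
    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.sql.{DataFrame, SQLContext}

    // Person is a case class Person(name: String, age: Int, sex: String)
    val conf = new SparkConf().setAppName("Ops3").setMaster("local[3]")
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)
    val rdd1 = sc.parallelize(List(
      Person("admin1", 14, "man"),
      Person("admin2", 16, "man"),
      Person("admin3", 18, "man")))
    // Brings toDF into scope for RDDs of case classes
    import sqlContext.implicits._
    val df1: DataFrame = rdd1.toDF
    df1.show()
5.3 Creating a DataFrame with SQLContext and an explicit schema (commonly used)
    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.rdd.RDD
    import org.apache.spark.sql.{DataFrame, Row, SQLContext}
    import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

    val conf = new SparkConf().setAppName("Ops3").setMaster("local[3]")
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)
    val linesRDD: RDD[String] = sc.textFile("hdfs://uplooking02:8020/sparktest/")
    // Describe the columns explicitly instead of relying on a case class
    val schema = StructType(List(
      StructField("name", StringType),
      StructField("age", IntegerType),
      StructField("sex", StringType)))
    // Each input line is expected to look like "name,age,sex"
    val rowRDD: RDD[Row] = linesRDD.map(line => {
      val lineSplit: Array[String] = line.split(",")
      Row(lineSplit(0), lineSplit(1).toInt, lineSplit(2))
    })
    val rowDF: DataFrame = sqlContext.createDataFrame(rowRDD, schema)
    rowDF.show()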
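The line-splitting step in 5.3 throws an exception as soon as one line has too few fields or a non-numeric age. The sketch below shows one possible defensive variant in plain Scala, with no Spark dependency. The names `LineParsingSketch` and `parsePerson` are made up for this example and are not part of any Spark API.

```scala
import scala.util.Try

object LineParsingSketch {
  // Parses a "name,age,sex" line. Returns None for malformed input
  // (wrong field count, empty name, non-numeric age) instead of throwing.
  def parsePerson(line: String): Option[(String, Int, String)] =
    line.split(",", -1).map(_.trim) match {
      case Array(name, age, sex) if name.nonEmpty =>
        Try(age.toInt).toOption.map(parsedAge => (name, parsedAge, sex))
      case _ => None
    }

  def main(args: Array[String]): Unit = {
    val lines = List("admin1,14,man", "broken-line", "admin2,sixteen,man", "admin3,18,man")
    // Only the two well-formed lines survive
    lines.flatMap(line => parsePerson(line)).foreach(println)
  }
}
```

In a Spark job the same function could be applied with `flatMap` on the RDD of lines, so that bad records are dropped rather than failing the whole job. Whether dropping or failing is preferable depends on the data.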
6. Using the newer 2.x API
    import org.apache.spark.SparkConf
    import org.apache.spark.rdd.RDD
    import org.apache.spark.sql.{DataFrame, Row, SparkSession}
    import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

    val conf = new SparkConf().setAppName("Ops5").setMaster("local[3]")
    val sparkSession: SparkSession = SparkSession.builder().config(conf).getOrCreate()
    val sc = sparkSession.sparkContext
    val linesRDD: RDD[String] = sc.textFile("hdfs://uplooking02:8020/sparktest/")
    // Data cleaning: split each line and convert age to Int
    val rowRDD: RDD[Row] = linesRDD.map(line => {
      val splits: Array[String] = line.split(",")
      Row(splits(0), splits(1).toInt, splits(2))
    })
    val schema = StructType(List(
      StructField("name", StringType),
      StructField("age", IntegerType),
      StructField("sex", StringType)))
    val df: DataFrame = sparkSession.createDataFrame(rowRDD, schema)
    // Register the DataFrame as a temporary view so it can be queried with SQL
    df.createOrReplaceTempView("p1")
    val df2 = sparkSession.sql("select * from p1")
    df2.show()
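With the 2.x API, comma-separated input like this can also be loaded without the manual RDD step, by passing the schema to the built-in CSV reader. The sketch below is one way to do that, assuming Spark 2.x in local mode. The sample file, `CsvReaderSketch`, `writeSample`, and `load` are all hypothetical names for this example; the article itself reads from HDFS.

```scala
import java.nio.charset.StandardCharsets
import java.nio.file.Files
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

object CsvReaderSketch {
  // Same three columns as the article's schema
  val schema: StructType = StructType(List(
    StructField("name", StringType),
    StructField("age", IntegerType),
    StructField("sex", StringType)))

  // Writes a tiny sample file and returns its URI (stand-in for the HDFS path)
  def writeSample(): String = {
    val file = Files.createTempFile("people", ".csv")
    Files.write(file, "admin1,14,man\nadmin2,16,man\n".getBytes(StandardCharsets.UTF_8))
    file.toUri.toString
  }

  // Lets the CSV reader split the lines and apply the column types
  def load(spark: SparkSession, path: String): DataFrame =
    spark.read.schema(schema).csv(path)

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("CsvReaderSketch").master("local[2]").getOrCreate()
    val df = load(spark, writeSample())
    df.createOrReplaceTempView("p1")
    spark.sql("select name, age from p1 where age > 14").show()
    spark.stop()
  }
}
```

The manual `Row` approach in the article remains useful when lines need custom cleaning that the reader options do not cover.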
7. Ways to work with Spark SQL
7.1 Operating on a DataFrame with SQL statements
    import org.apache.spark.SparkConf
    import org.apache.spark.rdd.RDD
    import org.apache.spark.sql.{DataFrame, Row, SparkSession}
    import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

    val conf = new SparkConf().setAppName("Ops5").setMaster("local[3]")
    // SparkSession is the Spark 2.x entry point; it plays the role of the Spark 1.x SQLContext
    val sparkSession: SparkSession = SparkSession.builder().config(conf).getOrCreate()
    val sc = sparkSession.sparkContext
    val linesRDD: RDD[String] = sc.textFile("hdfs://uplooking02:8020/sparktest/")
    // Data cleaning: split each line and convert age to Int
    val rowRDD: RDD[Row] = linesRDD.map(line => {
      val splits: Array[String] = line.split(",")
      Row(splits(0), splits(1).toInt, splits(2))
    })
    val schema = StructType(List(
      StructField("name", StringType),
      StructField("age", IntegerType),
      StructField("sex", StringType)))
    val df: DataFrame = sparkSession.createDataFrame(rowRDD, schema)
    // Spark 2.x API; the equivalent of registerTempTable() in Spark 1.x
    df.createOrReplaceTempView("p1")
    val df2 = sparkSession.sql("select * from p1")
    df2.show()
7.2 Operating on a DataFrame with the DSL
DSL stands for domain-specific language.
    import org.apache.spark.SparkConf
    import org.apache.spark.rdd.RDD
    import org.apache.spark.sql.{DataFrame, Row, SparkSession}
    import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

    val conf = new SparkConf().setAppName("Ops5").setMaster("local[3]")
    val sparkSession: SparkSession = SparkSession.builder().config(conf).getOrCreate()
    val sc = sparkSession.sparkContext
    val linesRDD: RDD[String] = sc.textFile("hdfs://uplooking02:8020/sparktest/")
    // Data cleaning: split each line and convert age to Int
    val rowRDD: RDD[Row] = linesRDD.map(line => {
      val splits: Array[String] = line.split(",")
      Row(splits(0), splits(1).toInt, splits(2))
    })
    val schema = StructType(List(
      StructField("name", StringType),
      StructField("age", IntegerType),
      StructField("sex", StringType)))
    val rowDF: DataFrame = sparkSession.createDataFrame(rowRDD, schema)
    // Needed for the $"column" syntax
    import sparkSession.implicits._
    // Equivalent to: select name, age from ... where age > 10 order by age desc
    val df: DataFrame = rowDF.select("name", "age").where("age>10").orderBy($"age".desc)
    df.show()
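The SQL style and the DSL style are two front ends to the same engine, so an equivalent query should return the same rows either way. The following sketch checks this on a small in-memory dataset, assuming Spark 2.x in local mode. The object `SqlVsDslSketch`, its helper names, and the sample rows (including the "woman" value) are invented for this example.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{avg, count, lit}

object SqlVsDslSketch {
  // Small in-memory stand-in for the article's HDFS data
  def people(spark: SparkSession): DataFrame = {
    import spark.implicits._
    Seq(("admin1", 14, "man"), ("admin2", 16, "woman"), ("admin3", 18, "man"))
      .toDF("name", "age", "sex")
  }

  // SQL style: register a temporary view, then query it with a SQL string
  def viaSql(spark: SparkSession, df: DataFrame): DataFrame = {
    df.createOrReplaceTempView("people")
    spark.sql(
      "select sex, count(1) as cnt, avg(age) as avg_age " +
      "from people group by sex order by sex")
  }

  // DSL style: the same aggregation expressed as method calls
  def viaDsl(df: DataFrame): DataFrame =
    df.groupBy("sex")
      .agg(count(lit(1)).as("cnt"), avg("age").as("avg_age"))
      .orderBy("sex")

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("SqlVsDslSketch").master("local[2]").getOrCreate()
    val df = people(spark)
    viaSql(spark, df).show()
    viaDsl(df).show()
    spark.stop()
  }
}
```

Which style to use is mostly a matter of taste: SQL strings are familiar to analysts, while the DSL gets some compile-time checking of method names and composes more easily in Scala code.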
8. Spark SQL output
8.1 Writing to a JSON file
    import org.apache.spark.SparkConf
    import org.apache.spark.rdd.RDD
    import org.apache.spark.sql.{DataFrame, Row, SparkSession}
    import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

    val conf = new SparkConf().setAppName("Ops5").setMaster("local[3]")
    val sparkSession: SparkSession = SparkSession.builder().config(conf).getOrCreate()
    val sc = sparkSession.sparkContext
    val linesRDD: RDD[String] = sc.textFile("hdfs://uplooking02:8020/sparktest")
    // Data cleaning: split each line and convert age to Int
    val rowRDD: RDD[Row] = linesRDD.map(line => {
      val splits: Array[String] = line.split(",")
      Row(splits(0), splits(1).toInt, splits(2))
    })
    val schema = StructType(List(
      StructField("name", StringType),
      StructField("age", IntegerType),
      StructField("sex", StringType)))
    val rowDF: DataFrame = sparkSession.createDataFrame(rowRDD, schema)
    import sparkSession.implicits._
    val df: DataFrame = rowDF.select("name", "age").where("age>10").orderBy($"age".desc)
    // Writes a directory of JSON part files; fails if the path already exists
    df.write.json("hdfs://uplooking02:8020/sparktest1")
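8.2 Writing to a relational database (MySQL via JDBC)
    import java.util.Properties
    import org.apache.spark.SparkConf
    import org.apache.spark.rdd.RDD
    import org.apache.spark.sql.{DataFrame, Row, SaveMode, SparkSession}
    import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

    val conf = new SparkConf().setAppName("Ops5").setMaster("local[3]")
    val sparkSession: SparkSession = SparkSession.builder().config(conf).getOrCreate()
    val sc = sparkSession.sparkContext
    val linesRDD: RDD[String] = sc.textFile("hdfs://uplooking02:8020/sparktest")
    // Data cleaning: split each line and convert age to Int
    val rowRDD: RDD[Row] = linesRDD.map(line => {
      val splits: Array[String] = line.split(",")
      Row(splits(0), splits(1).toInt, splits(2))
    })
    val schema = StructType(List(
      StructField("name", StringType),
      StructField("age", IntegerType),
      StructField("sex", StringType)))
    val rowDF: DataFrame = sparkSession.createDataFrame(rowRDD, schema)
    import sparkSession.implicits._
    val df: DataFrame = rowDF.select("name", "age").where("age>10").orderBy($"age".desc)
    // The MySQL JDBC driver must be on the classpath
    val url = "jdbc:mysql://localhost:3306/test"
    // The table is created automatically if it does not exist
    val tbName = "person1"
    val prop = new Properties()
    prop.put("user", "root")
    prop.put("password", "root")
    // The default SaveMode is ErrorIfExists; Append adds rows to an existing table
    df.write.mode(SaveMode.Append).jdbc(url, tbName, prop)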
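The SaveMode setting is not specific to JDBC; the same modes apply to file outputs such as JSON. The sketch below illustrates three of them against a local scratch directory rather than MySQL, so it can be tried without a database. It assumes Spark 2.x in local mode, and `SaveModeSketch`, `demo`, and the paths are made-up names for this example.

```scala
import java.nio.file.Files
import scala.util.Try
import org.apache.spark.sql.{SaveMode, SparkSession}

object SaveModeSketch {
  // Returns (did the second default-mode write fail?,
  //          row count after Append, row count after Overwrite)
  def demo(spark: SparkSession, baseDir: String): (Boolean, Long, Long) = {
    import spark.implicits._
    val df = Seq(("admin1", 14), ("admin2", 16)).toDF("name", "age")
    val path = s"$baseDir/people_json"

    // First write creates the output directory
    df.write.json(path)
    // Default mode is ErrorIfExists, so writing to the same path again is rejected
    val secondFailed = Try(df.write.json(path)).isFailure

    // Append keeps the existing rows and adds the new ones
    df.write.mode(SaveMode.Append).json(path)
    val afterAppend = spark.read.json(path).count()

    // Overwrite replaces whatever was there
    df.write.mode(SaveMode.Overwrite).json(path)
    val afterOverwrite = spark.read.json(path).count()

    (secondFailed, afterAppend, afterOverwrite)
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("SaveModeSketch").master("local[2]").getOrCreate()
    // Hypothetical scratch directory; any writable path would do
    val baseDir = Files.createTempDirectory("save-mode-sketch").toString
    println(demo(spark, baseDir))
    spark.stop()
  }
}
```

For a JDBC target, Append is usually the safer choice for incremental loads, since Overwrite replaces the existing table contents.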
That is everything in this article on how to use Spark SQL in Spark. Thank you for reading; hopefully it has given you a working understanding of the topic.