您好,登錄后才能下訂單哦!
Spark SQL 是 Spark 用來處理結構化數據(結構化數據可以來自外部結構化數據源也可以通 過 RDD 獲取)的一個模塊,它提供了一個編程抽象叫做 DataFrame 并且作為分布式 SQL 查 詢引擎的作用。
外部的結構化數據源包括 JSON、Parquet(默認)、RMDBS、Hive 等。當前 Spark SQL 使用 Catalyst 優化器來對 SQL 進行優化,從而得到更加高效的執行方案。并且可以將結果存儲到外部系統。
- 容易整合
- 統一的數據訪問方式
- 兼容hive
- 標準的數據連接
- spark sql 的前身是shark。但是spark sql拋棄了原有shark的代碼,汲取了shark的一些優點,如:列存儲(In-Memory Columnar Storage)、Hive 兼容性等,重新開發 SparkSQL。
- spark -1.1 2014 年 9 月 11 日,發布 Spark1.1.0。Spark 從 1.0 開始引入 SparkSQL(Shark 不再支持升級與維護)。Spark1.1.0 變化較大是 SparkSQL 和 MLlib
- spark -1.3 增加了dataframe新
- spark -1.4 增加了窗口分析函數
- spark - 1.5 鎢絲計劃。Hive 中有 UDF 與 UDAF,Spark 中對 UDF 支持較早
- spark 1.6 執行的 sql 中可以增加"--"注釋,Spark-1.5/1.6 的新特性,引入 DataSet 的概念
- spark 2.x SparkSQL+DataFrame+DataSet(正式版本),Structured Streaming(DataSet),引入 SparkSession 統一了 RDD,DataFrame,DataSet 的編程入口
SparkSession 是 Spark-2.0 引如的新概念。SparkSession 為用戶提供了統一的切入點,來讓用戶學習 Spark 的各項功能。
隨著 DataSet 和 DataFrame 的 API 逐漸成為標準的 API,SparkSession 作為 DataSet 和 DataFrame API 的切入點,SparkSession 封裝了 SparkConf、SparkContext 和 SQLContext。為了向后兼容,SQLContext 和 HiveContext 也被保存下來。
特點:
- 為用戶提供一個統一的切入點使用 Spark 各項功能
- 允許用戶通過它調用 DataFrame 和 Dataset 相關 API 來編寫程序
- 減少了用戶需要了解的一些概念,可以很容易的與 Spark 進行交互
- 與 Spark 交互之時不需要顯示的創建 SparkConf、SparkContext 以及 SQlContext,這些對 象已經封閉在 SparkSession 中
- SparkSession 提供對 Hive 特征的內部支持:用 HiveQL 寫 SQL 語句,訪問 Hive UDFs,從 Hive 表中讀取數據
SparkSession的創建:
在spark-shell中SparkSession 會被自動初始化一個對象叫做 spark,為了向后兼容,Spark-Shell 還提供了一個 SparkContext 的初始化對象,方便用戶操作:
在代碼開發的時候創建:
val conf = new SparkConf()
val spark: SparkSession = SparkSession.builder()
.appName("_01spark_sql")
.config(conf)
.getOrCreate()
這里主要說的是RDD的局限性:
- RDD是不支持spark-sql的
- RDD 僅表示數據集,RDD 沒有元數據,也就是說沒有字段語義定義
- RDD 需要用戶自己優化程序,對程序員要求較高
- 從不同數據源讀取數據相對困難,讀取到不同格式的數據都必須用戶自己定義轉換方式 合并多個數據源中的數據也較困難
DataFrame 被稱為 SchemaRDD。以行為單位構成的分布式數據集合,按照列賦予不同的名稱。對 select、fileter、aggregation 和 sort 等操作符的抽象。其中 Schema 是就是元數據,是語義描述信息。DataFrame是分布式的Row對象的集合.
DataFrame = RDD+Schema = SchemaRDD
優勢:
- DataFrame 是一種特殊類型的 Dataset,DataSet[Row] = DataFrame
- DataFrame 自帶優化器 Catalyst,可以自動優化程序
- DataFrame 提供了一整套的 Data Source API
特點:
- 支持 單機 KB 級到集群 PB 級的數據處理
- 支持多種數據格式和存儲系統
- 通過 Spark SQL Catalyst 優化器可以進行高效的代碼生成和優化
- 能夠無縫集成所有的大數據處理工具
- 提供 Python, Java, Scala, R 語言 API
由于 DataFrame 的數據類型統一是 Row,所以 DataFrame 也是有缺點的。Row 運行時類型檢查,比如 salary 是字符串類型,下面語句也只有運行時才進行類型檢查。 dataframe.filter("salary>1000").show()
Dataset擴展了 DataFrame API,提供了編譯時類型檢查,面向對象風格的 API。
Dataset 可以和 DataFrame、RDD 相互轉換。DataFrame=Dataset[Row],可見 DataFrame 是一種特殊的 Dataset。
這里小編要重點強調一下二者的區別,但是在學習spark-sql的時候就對二者的關系不太清楚,而且在面試的時候也問到了這個問題,真的是一番血淚史啊。
通過查看多個前輩對二者的總結我大概的總結一下二者的區別:
- Dataset可以認為是DataFrame的一個特例,主要區別是Dataset每一個record存儲的是一個強類型值而不是一個Row
- DataSet可以在編譯時檢查類型,而DataFrame只有在正真運行的時候才會檢查
- DataFrame每一行的類型都是Row,不解析我們就無法知曉其中有哪些字段,每個字段又是什么類型。我們只能通過getAs[類型]或者row(i)的方式來獲取特定的字段內容(超級大弊端);而dataSet每一行的類型是不一定的,在自定義了case class之后就可以很自由的獲取每一行的信息。
好了 廢話說了一堆,不如直接上代碼:
object SparkSqlTest {
def main(args: Array[String]): Unit = {
//屏蔽多余的日志
Logger.getLogger("org.apache.hadoop").setLevel(Level.WARN)
Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
Logger.getLogger("org.project-spark").setLevel(Level.WARN)
val conf: SparkConf = new SparkConf()
conf.setMaster("local[2]")
.setAppName("SparkSqlTest")
//設置spark的序列化器
.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
//將自定義的對象,加入序列化器中
.registerKryoClasses(Array(classOf[Person]))
//構建SparkSession對象
val spark: SparkSession = SparkSession.builder()
.config(conf).getOrCreate()
//創建sparkContext對象
val sc: SparkContext = spark.sparkContext
val list = List(
new Person("委xx", 18),
new Person("吳xx", 20),
new Person("戚xx", 30),
new Person("王xx", 40),
new Person("薛xx", 18)
)
//創建DataFrame
//構建元數據
val schema = StructType(List(
StructField("name", DataTypes.StringType),
StructField("age", DataTypes.IntegerType)
))
//構建RDD
val listRDD: RDD[Person] = sc.makeRDD(list)
val RowRDD: RDD[Row] = listRDD.map(field => {
Row(field.name, field.age)
})
val perDF: DataFrame = spark.createDataFrame(RowRDD,schema)
//創建DataSet
import spark.implicits._ //這句話一定要加
val perDS: Dataset[Person] = perDF.as[Person]
/**
* 這里主要介紹DF 和 DS的區別
*/
perDF.foreach(field=>{
val name=field.get(0) //根據元素的index,取出相應的元素的值
val age=field.getInt(1) //根據元素的index和元素的類型取出元素的值
field.getAs[Int]("age") //根據元素的類型和元素的名稱取出元素的值
println(s"${age},${name}")
})
perDS.foreach(field=>{
//直接根據上面定義的元素的名稱取值
val age=field.age
val name=field.name
println(s"${age},${name}")
})
}
}
case class Person(name: String, age: Int)
個人感覺,就是DataFrame雖然集成和很多優點,但是,如果想從DataFrame中取出具體的某個對象的某個屬性,是不能確定的,步驟比較繁瑣,而且類型不確定。但是使用DataSet則有效額的避免了所有的問題。
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。