您好,登錄后才能下訂單哦!
這期內容當中小編將會給大家帶來有關大數據SparkSQl指的是什么呢,文章內容豐富且以專業的角度為大家分析和敘述,閱讀完這篇文章希望大家可以有所收獲。
Spark SQL是Spark用來處理結構化數據的一個模塊,它提供了一個編程抽象叫做DataFrame并且作為分布式SQL查詢引擎的作用。SparkSql中返回的數據類型是DataFrame
1.1.1. 為什么要學習Spark SQL
我們已經學習了Hive,它是將Hive SQL轉換成MapReduce然后提交到集群上執行,大大簡化了編寫MapReduce的程序的復雜性,由于MapReduce這種計算模型執行效率比較慢。所有Spark SQL的應運而生,它是將Spark SQL轉換成RDD,然后提交到集群執行,執行效率非常快!
HIVE:簡化編寫MapReduce的程序的復雜性
Spark SQL轉換成RDD:替代MapReduce,提高效率
Spark1.0版本開始就推出了SparkSQL,最早是叫Shark
1、內存列存儲--可以大大優化內存使用效率,減少了內存消耗,避免了gc對大量數據的性能開銷
2、字節碼生成技術(byte-code generation)--可以使用動態字節碼生成技術來優化性能
3、Scala代碼的優化
結構化數據是指任何有結構信息的數據。所謂結構信息,就是每條記錄共用的已知的字段集合。當數據符合 這樣的條件時,Spark SQL 就會使得針對這些數據的讀取和查詢變得更加簡單高效。具體 來說,Spark SQL 提供了以下三大功能(見圖 9-1)。
(1) Spark SQL 可以從各種結構化數據源(例如 JSON、Hive、Parquet 等)中讀取數據。
(2) Spark SQL 不僅支持在 Spark 程序內使用 SQL 語句進行數據查詢,也支持從類似商業 智能軟件 Tableau 這樣的外部工具中通過標準數據庫連接器(JDBC/ODBC)連接 Spark SQL 進行查詢。
(3) 當在 Spark 程序內使用 Spark SQL 時,Spark SQL 支持 SQL 與常規的 Python/Java/Scala 代碼高度整合,包括連接 RDD 與 SQL 表、公開的自定義 SQL 函數接口等。這樣一來, 許多工作都更容易實現了。
為了實現這些功能,Spark SQL 提供了一種特殊的 RDD,叫作 SchemaRDD。SchemaRDD 是存放 Row 對象的 RDD,每個 Row 對象代表一行記錄。SchemaRDD 還包含記錄的結構信 息(即數據字段)。SchemaRDD 看起來和普通的 RDD 很像,但是在內部,SchemaRDD 可 以利用結構信息更加高效地存儲數據。此外,SchemaRDD 還支持 RDD 上所沒有的一些新 操作,比如運行 SQL 查詢。SchemaRDD 可以從外部數據源創建,也可以從查詢結果或普 通 RDD 中創建。
什么是DataFrames
(SparkSql中返回的數據類型: 它在概念上等同于關系數據庫中的表,但在查詢上進行了優化)
與RDD類似,DataFrame也是一個分布式數據容器。然而DataFrame更像傳統數據庫的二維表格,除了數據以外,還記錄數據的結構信息,即schema。
1.1.1. 創建DataFrames
在Spark SQL中SQLContext是創建DataFrames和執行SQL的入口,在spark-1.6.1中已經內置了一個sqlContext
1.在本地創建一個文件,有三列,分別是id、name、age,用空格分隔,然后上傳到hdfs上
hdfs dfs -put person.txt /
2.在spark shell執行下面命令,讀取數據,將每一行的數據使用列分隔符分割
val lineRDD = sc.textFile("hdfs://node01:9000/person.txt").map(_.split(" "))
3.定義case class(相當于表的schema)
case class Person(id:Int, name:String, age:Int)
4.將RDD和case class關聯
val personRDD = lineRDD.map(x => Person(x(0).toInt, x(1), x(2).toInt))
(里面的數據是在Array中)
5.將RDD轉換成DataFrame
val personDF = personRDD.toDF
6.對DataFrame進行處理
personDF.show
val seq1 = Seq(("1","bingbing",35),("2","yuanyuan",34),("3","mimi",33))
val rdd1 =sc.parallelize(seq1)
val df = rdd1.toDF("id","name","age")
df.show
DSL:領域特定語言
////查看DataFrame中的內容
//查看DataFrame部分列中的內容
1.
2.
3.
//打印DataFrame的Schema信息
//查詢所有的name和age,并將age+1
1.df.select(col("id"),col("name"),col("age")+1).show
2.df.select(df("id"), df("name"), df("age") + 1).show
//過濾age大于等于18的
df.filter(col("age") >= 35).show
//按年齡進行分組并統計相同年齡的人數
df.groupBy("age").count().show()
SQL風格語法
//查詢年齡最大的前兩名
1.如果想使用SQL風格的語法,需要將DataFrame注冊成表
df.registerTempTable("t_person")
2.sqlContext.sql("select * from t_person order by age desc limit 2").show
//顯示表的Schema信息
以編程方式執行Spark SQL查詢
1.編寫Spark SQL查詢程序
1.通過反射推斷Schema
=======================================================
package com.qf.gp1708.day06
//通過反射獲取用戶信息
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, SQLContext}
import org.apache.spark.{SparkConf, SparkContext}
object InferSchema {
def main(args: Array[String]): Unit = {
val conf = new SparkConf()
.setMaster("local")
.setAppName("inferschema")
val sc = new SparkContext(conf)
val sqlContext:SQLContext = new SQLContext(sc)
1. //獲取數據并切分
val line = sc.textFile("C://Users/Song/Desktop/person.txt").map(_.split(","))
3 //將獲取的數據和Person樣例類進行關聯
val personRdd: RDD[Godness] = line.map(arr=>Godness(arr(0).toLong,arr(1),arr(2).toInt,arr(3).toInt))
//引入隱式轉換函數,這樣才可以調用到toDF方法
import sqlContext.implicits._
4 //將personRDD轉換成DataFrame
val dF: DataFrame = personRdd.toDF
5. //注冊一張臨時表
dF.registerTempTable("t_person")
val sql = "select * from t_person where fv > 70 order by age"
//查詢
val res: DataFrame = sqlContext.sql(sql)
res.show()
sc.stop()
}
}
2//創建樣例類
case class Godness(id:Long,name:String,age:Int,fv:Int)
=========================================================
2.通過StructType直接指定Schema
===========================================
package com.qf.gp1708.day06
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Row, SQLContext}
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
import org.apache.spark.{SparkConf, SparkContext}
/**
* 通過StructType類型直接指定Schema
*/
object StructTypeSchema {
def main(args: Array[String]): Unit = {
val conf = new SparkConf()
.setAppName("str")
.setMaster("local")
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)
//獲取數據并切分
val lines = sc.textFile("hdfs://...").map(_.split(","))
//指定schema信息
StructType{
List(
StructField("id",IntegerType,false),
StructField("name",StringType,true),
StructField("age",IntegerType,true),
StructField("fv",IntegerType,true),
)
}
//開始映射
val rowRDD: RDD[Row] = lines.map(arr =>Row(arr(0).toInt,arr(1),arr(2).toInt,arr(3).toInt))
//把RDD轉換為DataFrame
val personDF: DataFrame = sqlContext.createDataFrame(rowRDD,schema)
//生成臨時表
personDF.registerTempTable("t_person")
val sql = "select name,age,fv from t_person where age >30 order by age desc"
val res = sqlContext.sql(sql)
res.write.mode("append").json("c://out-20180903-1")
sc.stop()
}
}
=================================================================
1. 數據源
1.1. JDBC
Spark SQL可以通過JDBC從關系型數據庫中讀取數據的方式創建DataFrame,通過對DataFrame一系列的計算后,還可以將數據再寫回關系型數據庫中。
1.1.1. 從MySQL中加載數據(Spark Shell方式)
1.啟動Spark Shell,必須指定mysql連接驅動jar包
/usr/local/spark-1.6.1-bin-hadoop2.6/bin/spark-shell \
--master spark://node01:7077 \
--jars /usr/local/spark-1.6.1-bin-hadoop2.6/mysql-connector-java-5.1.35-bin.jar \
(指定MySQL包)
--driver-class-path /usr/local/spark-1.6.1-bin-hadoop2.6/mysql-connector-java-5.1.35-bin.jar (指定驅動類)
2.從mysql中加載數據
val jdbcDF = sqlContext.read.format("jdbc").options(Map("url" -> "jdbc:mysql://node03:3306/bigdata", "driver" -> "com.mysql.jdbc.Driver", "dbtable" -> "person", "user" -> "root", "password" -> "root")).load()
3.執行查詢
jdbcDF.show()
1.1.2. 將數據寫入到MySQL中(打jar包方式)
package com.qf.gp1708.day06
import java.util.Properties
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
import org.apache.spark.{SparkConf, SparkContext}
/**
* 寫入數據到MySQL
*/
object InsertData2MySQLDemo {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName("").setMaster("local[2]")
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)
val lines= sc.textFile("").map(_.split(","))
//生成Schema
val schema = StructType {
Array(
StructField("name", StringType, true),
StructField("age", IntegerType, true),
StructField("fv", StringType, true),
)
}
//映射
val personRDD = lines.map(arr =>Row(arr(1).toString,arr(2).toInt,arr(3).toInt))
//生成DataFrame
val personDF = sqlContext.createDataFrame(personRDD,schema)
//生成用于寫入MySQL的配置信息
val prop = new Properties()
prop.put("user","root")
prop.put("password","root")
prop.put("driver","com.mysql.jdbc.Driver")
val jdbcUrl="jdbc:mysql://hadoop03:3306/bigdata"
val table="person"
//把數據寫入MySQL
personDF.write.mode("append").jdbc(jdbcUrl,table,prop)
sc.stop()
}
}
/usr/local/spark-1.6.3-bin-hadoop2.6/spark-submit \
--class com.qf..... \
--master spark://hadoop01:7077 \
--executor-memory 512m \
--total-executor-cores 2 \
--jars /usr/.../mysql-connector-java-5.1.35-bin.jar \
--driver-class-path /usr/.../mysql-connector-java-5.1.35-bin.jar \
/root/1.jar
=======================================================
kafka:消息中間件(緩存數據)---解耦
為處理實時數據提供一個統一、高吞吐量、低等待的平臺
3、為什么需要消息隊列(重要、了解)
消息系統的核心作用就是三點:解耦,異步和并行
Kafka對消息保存時根據Topic進行歸類
Topic:底層就是隊列,將不同的消息放在不同的隊列中進行分類
上述就是小編為大家分享的大數據SparkSQl指的是什么呢了,如果剛好有類似的疑惑,不妨參照上述分析進行理解。如果想知道更多相關知識,歡迎關注億速云行業資訊頻道。
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。