您好,登錄后才能下訂單哦!
[TOC]
? Spark SQL是Spark用來處理結構化數據的一個模塊,它提供了一個編程抽象叫做DataFrame并且作為分布式SQL查詢引擎的作用。類似于hive的作用。
1、容易集成:安裝Spark的時候,已經集成好了。不需要單獨安裝。
2、統一的數據訪問方式:JDBC、JSON、Hive、parquet文件(一種列式存儲文件,是SparkSQL默認的數據源,hive中也支持)
3、完全兼容Hive。可以將Hive中的數據,直接讀取到Spark SQL中處理。
一般在生產中,基本都是使用hive做數據倉庫存儲數據,然后用spark從hive讀取數據進行處理。
4、支持標準的數據連接:JDBC、ODBC
5、計算效率比基于mr的hive高,而且hive2.x版本中,hive建議使用spark作為執行引擎
DataFrame是組織成命名列的數據集。它在概念上等同于關系數據庫中的表,里面有表的結構以及數據,但在底層具有更豐富的優化。DataFrames可以從各種來源構建,
例如:
結構化數據文件
hive中的表
外部數據庫或現有RDDs
DataFrame API支持的語言有Scala,Java,Python和R。
? 比起RDD,DataFrame多了數據的結構信息,即schema。RDD是分布式的 Java對象的集合。DataFrame是分布式的Row對象的集合。DataFrame除了提供了比RDD更豐富的算子以外,更重要的特點是提升執行效率、減少數據讀取以及執行計劃的優化。
Dataset是一個分布式的數據收集器。這是在Spark1.6之后新加的一個接口,兼顧了RDD的優點(強類型,可以使用功能強大的lambda)以及Spark SQL的執行器高效性的優點。所以可以把DataFrames看成是一種特殊的Datasets,即:Dataset(Row)
? Apache Spark 2.0引入了SparkSession,其為用戶提供了一個統一的切入點來使用Spark的各項功能,并且允許用戶通過它調用DataFrame和Dataset相關API來編寫Spark程序。最重要的是,它減少了用戶需要了解的一些概念,使得我們可以很容易地與Spark交互。
? 在2.0版本之前,與Spark交互之前必須先創建SparkConf和SparkContext。然而在Spark 2.0中,我們可以通過SparkSession來實現同樣的功能,而不需要顯式地創建SparkConf, SparkContext 以及 SQLContext,因為這些對象已經封裝在SparkSession中。
? 要注意一點,在我用的這個spark版本中,直接使用new SQLContext() 來創建SQLContext對象,會顯示該方式已經被棄用了(IDEA中會顯示已棄用),建議使用SparkSession來獲取SQLContext對象。
這種方式在scala中比較常用,因為case class是scala的特色
/**
表 t_stu 的結構為:
id name age
*/
object CreateDF {
def main(args: Array[String]): Unit = {
//這是最新的獲取SQLContext對象的方式
//2、創建SparkSession對象,設置master,appname
val spark = SparkSession.builder().master("local").appName("createDF case class").getOrCreate()
//3、通過spark獲取sparkContext對象,讀取數據
val lines = spark.sparkContext.textFile("G:\\test\\t_stu.txt").map(_.split(","))
//4、將數據映射到case class中,也就是數據映射到表的對應字段中
val tb = lines.map(t=>emp(t(0).toInt,t(1),t(2).toInt))
//這里必須要加上隱式轉換,否則無法調用 toDF 函數
import spark.sqlContext.implicits._
//5、生成df
val df2 = tb.toDF()
//相當于select name from t_stu
df1.select($"name").show()
//關閉spark對象
spark.stop()
}
}
/*1、定義case class,每個屬性對應表中的字段名以及類型
一般生產中為了方便,會全部定義為string類型,然后有需要的時候
才根據實際情況將string轉為需要的類型
這一步相當于定義表的結構
*/
case class emp(id:Int,name:String,age:Int)
總結步驟為:
1、定義case class,用來表結構
2、創建sparkSession對象,用來讀取數據
3、將rdd中的數據和case class映射
4、調用 toDF 函數將rdd轉為 DataFrame
這種方式java比較常用
package SparkSQLExer
import org.apache
import org.apache.spark
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.types.{DataTypes, StructField, StructType}
/**
* 創建dataschema方式2:
* 通過spark session對象創建,表結構通過StructType創建
*/
object CreateDF02 {
def main(args: Array[String]): Unit = {
val sparkS = SparkSession.builder().master("local").appName("create schema").getOrCreate()
//1、通過StructType創建表結構schema,里面表的每個字段使用 StructField定義
val tbSchema = StructType(List(
StructField("id",DataTypes.IntegerType),
StructField("name",DataTypes.StringType),
StructField("age",DataTypes.IntegerType)
))
//2、讀取數據
var lines = sparkS.sparkContext.textFile("G:\\test\\t_stu.txt").map(_.split(","))
//3、將數據映射為ROW對象
val rdd1 = lines.map(t=>Row(t(0).toInt,t(1),t(2).toInt))
//4、創建表結構和表數據映射,返回的就是df
val df2 = sparkS.createDataFrame(rdd1, tbSchema)
//打印表結構
df2.printSchema()
sparkS.stop()
}
}
總結步驟為:
1、通過StructType創建表結構schema,里面表的每個字段使用 StructField定義
2、通過sparkSession.sparkContext讀取數據
3、將數據映射格式為Row對象
4、將StructType和數據Row對象映射,返回df
package SparkSQLExer
import org.apache.spark.sql.SparkSession
/**
* 創建df方式3:通過有格式的文件直接導入數據以及表結構,比如json格式的文件
* 返回的直接就是一個DF
*/
object CreateDF03 {
def main(args: Array[String]): Unit = {
val sparkS = SparkSession.builder().master("local").appName("create df through json").getOrCreate()
//讀取json方式1:
val jsonrdd1= sparkS.read.json("path")
//讀取json方式2:
val jsonrdd1= sparkS.read.format("json").load("path")
sparkS.stop()
}
}
這種方式比較簡單,就是直接讀取json文件而已
sparkS.read.xxxx讀取任意文件時,返回的都是DF對象
DSL語句其實就是將sql語句的一些操作轉為類似函數的方式去調用,比如:
df1.select("name").show
例子:
為了方便,直接在spark-shell里操作了,
spark-shell --master spark://bigdata121:7077
1、打印表結構
scala> df1.printSchema
root
|-- empno: integer (nullable = true)
|-- ename: string (nullable = true)
|-- job: string (nullable = true)
|-- mgr: integer (nullable = true)
|-- hiredate: string (nullable = true)
|-- sal: integer (nullable = true)
|-- comm: integer (nullable = true)
|-- deptno: integer (nullable = true)
2、顯示當前df的表數據或者查詢結果的數據
scala> df1.show
+-----+------+---------+----+----------+----+----+------+
|empno| ename| job| mgr| hiredate| sal|comm|deptno|
+-----+------+---------+----+----------+----+----+------+
| 7369| SMITH| CLERK|7902|1980/12/17| 800| 0| 20|
| 7499| ALLEN| SALESMAN|7698| 1981/2/20|1600| 300| 30|
| 7521| WARD| SALESMAN|7698| 1981/2/22|1250| 500| 30|
| 7566| JONES| MANAGER|7839| 1981/4/2|2975| 0| 20|
| 7654|MARTIN| SALESMAN|7698| 1981/9/28|1250|1400| 30|
| 7698| BLAKE| MANAGER|7839| 1981/5/1|2850| 0| 30|
| 7782| CLARK| MANAGER|7839| 1981/6/9|2450| 0| 10|
| 7788| SCOTT| ANALYST|7566| 1987/4/19|3000| 0| 20|
| 7839| KING|PRESIDENT|7839|1981/11/17|5000| 0| 10|
| 7844|TURNER| SALESMAN|7698| 1981/9/8|1500| 0| 30|
| 7876| ADAMS| CLERK|7788| 1987/5/23|1100| 0| 20|
| 7900| JAMES| CLERK|7698| 1981/12/3| 950| 0| 30|
| 7902| FORD| ANALYST|7566| 1981/12/3|3000| 0| 20|
| 7934|MILLER| CLERK|7782| 1982/1/23|1300| 0| 10|
+-----+------+---------+----+----------+----+----+------+
3、執行select, 相當于select xxx form xxx where xxx
scala> df1.select("ename","sal").where("sal>2000").show
+------+----+
| ename| sal|
+------+----+
| SMITH| 800|
| ALLEN|1600|
| WARD|1250|
| JONES|2975|
|MARTIN|1250|
| BLAKE|2850|
| CLARK|2450|
| SCOTT|3000|
| KING|5000|
|TURNER|1500|
| ADAMS|1100|
| JAMES| 950|
| FORD|3000|
|MILLER|1300|
+------+----+
4、對某些列進行操作
對某個指定進行操作時,需要加上$符號,然后后面才能操作
$代表 取出來以后,再做一些操作。
注意:這個 $ 的用法在ideal中無法正常使用,解決方法下面說
scala> df1.select($"ename",$"sal",$"sal"+100).show
+------+----+-----------+
| ename| sal|(sal + 100)|
+------+----+-----------+
| SMITH| 800| 900|
| ALLEN|1600| 1700|
| WARD|1250| 1350|
| JONES|2975| 3075|
|MARTIN|1250| 1350|
| BLAKE|2850| 2950|
| CLARK|2450| 2550|
| SCOTT|3000| 3100|
| KING|5000| 5100|
|TURNER|1500| 1600|
| ADAMS|1100| 1200|
| JAMES| 950| 1050|
| FORD|3000| 3100|
|MILLER|1300| 1400|
+------+----+-----------+
5、過濾行
scala> df1.filter($"sal">2000).show
+-----+-----+---------+----+----------+----+----+------+
|empno|ename| job| mgr| hiredate| sal|comm|deptno|
+-----+-----+---------+----+----------+----+----+------+
| 7566|JONES| MANAGER|7839| 1981/4/2|2975| 0| 20|
| 7698|BLAKE| MANAGER|7839| 1981/5/1|2850| 0| 30|
| 7782|CLARK| MANAGER|7839| 1981/6/9|2450| 0| 10|
| 7788|SCOTT| ANALYST|7566| 1987/4/19|3000| 0| 20|
| 7839| KING|PRESIDENT|7839|1981/11/17|5000| 0| 10|
| 7902| FORD| ANALYST|7566| 1981/12/3|3000| 0| 20|
+-----+-----+---------+----+----------+----+----+------+
6、分組以及計數
scala> df1.groupBy($"deptno").count.show
+------+-----+
|deptno|count|
+------+-----+
| 20| 5|
| 10| 3|
| 30| 6|
+------+-----+
上面說到在ide中 select($"name")中無法正常使用,解決方法為:
在該語句之前加上這么一句:
import spark.sqlContext.implicits._
主要還是因為類型的問題,加上隱式轉換就好了
df對象不能直接執行sql。需要生成一個視圖,再執行SQL。
需要指定創建的視圖名稱,后面視圖名稱就相當于表名。
視圖后面還會細說,這里先有個概念
例子:
val spark = SparkSession.builder().master("local").appName("createDF case class").getOrCreate()
。。。。。。。。。。。。。。
//通過df對象創建臨時視圖。視圖名就相當于表名
df1.createOrReplaceTempView("emp")
//通過sparksession對象執行執行
spark.sql("select * from emp").show
spark.sql("select * from emp where sal > 2000").show
spark.sql("select deptno,count(1) from emp group by deptno").show
//可以創建多個視圖,不沖突
df1.createOrReplaceTempView("emp12345")
spark.sql("select e.deptno from emp12345 e").show
scala> case class Dept(deptno:Int,dname:String,loc:String)
defined class Dept
scala> val lines = sc.textFile("/usr/local/tmp_files/dept.csv").map(_.split(","))
lines: org.apache.spark.rdd.RDD[Array[String]] = MapPartitionsRDD[68] at map at <console>:24
scala> val allDept = lines.map(x=>Dept(x(0).toInt,x(1),x(2)))
allDept: org.apache.spark.rdd.RDD[Dept] = MapPartitionsRDD[69] at map at <console>:28
scala> val df2 = allDept.toDF
df2: org.apache.spark.sql.DataFrame = [deptno: int, dname: string ... 1 more field]
scala> df2.create
createGlobalTempView createOrReplaceTempView createTempView
scala> df2.createOrReplaceTempView("dept")
scala> spark.sql("select dname,ename from emp12345,dept where emp12345.deptno=dept.deptno").show
+----------+------+
| dname| ename|
+----------+------+
| RESEARCH| SMITH|
| RESEARCH| JONES|
| RESEARCH| SCOTT|
| RESEARCH| ADAMS|
| RESEARCH| FORD|
|ACCOUNTING| CLARK|
|ACCOUNTING| KING|
|ACCOUNTING|MILLER|
| SALES| ALLEN|
| SALES| WARD|
| SALES|MARTIN|
| SALES| BLAKE|
| SALES|TURNER|
| SALES| JAMES|
+----------+------+
和DataFrame類似,只是把 toDF改為調用 toDS 方法
package SparkSQLExer
import org.apache.spark.sql.SparkSession
import org.apache.spark.{SparkConf, SparkContext}
object CreateDS {
def main(args: Array[String]): Unit = {
val spark = SparkSession.builder().master("local").appName("createDF case class").getOrCreate()
val lines = spark.sparkContext.textFile("G:\\test\\t_stu.txt").map(_.split(","))
val tb = lines.map(t=>emp1(t(0).toInt,t(1),t(2).toInt))
import spark.sqlContext.implicits._
val df1 = tb.toDS()
df1.select($"name")
}
}
case class emp1(id:Int,name:String,age:Int)
package SparkSQLExer
import org.apache.spark.sql.SparkSession
import org.apache.spark.{SparkConf, SparkContext}
object CreateDS {
def main(args: Array[String]): Unit = {
val spark = SparkSession.builder().master("local").appName("createDF case class").getOrCreate()
//創建一個序列對象,里面都是emp1對象,映射的數據,然后直接toDS轉為DataSet
val ds1 = Seq(emp1(1,"king",20)).toDS()
ds1.printSchema()
}
}
case class emp1(id:Int,name:String,age:Int)
定義case class
case class Person(name:String,age:BigInt)
使用JSON數據生成DataFrame
val df = spark.read.format("json").load("/usr/local/tmp_files/people.json")
將DataFrame轉換成DataSet
df.as[Person].show
df.as[Person] 是一個 DataSet
as[T]中的泛型需要是一個case class類,用于映射表頭
DataSet支持的算子其實就是rdd和DataFrame算子的結合。
使用emp.json 生成DataFrame
val empDF = spark.read.json("/usr/local/tmp_files/emp.json")
scala> empDF.show
+----+------+-----+------+----------+---------+----+----+
|comm|deptno|empno| ename| hiredate| job| mgr| sal|
+----+------+-----+------+----------+---------+----+----+
| | 20| 7369| SMITH|1980/12/17| CLERK|7902| 800|
| 300| 30| 7499| ALLEN| 1981/2/20| SALESMAN|7698|1600|
| 500| 30| 7521| WARD| 1981/2/22| SALESMAN|7698|1250|
| | 20| 7566| JONES| 1981/4/2| MANAGER|7839|2975|
|1400| 30| 7654|MARTIN| 1981/9/28| SALESMAN|7698|1250|
| | 30| 7698| BLAKE| 1981/5/1| MANAGER|7839|2850|
| | 10| 7782| CLARK| 1981/6/9| MANAGER|7839|2450|
| | 20| 7788| SCOTT| 1987/4/19| ANALYST|7566|3000|
| | 10| 7839| KING|1981/11/17|PRESIDENT| |5000|
| 0| 30| 7844|TURNER| 1981/9/8| SALESMAN|7698|1500|
| | 20| 7876| ADAMS| 1987/5/23| CLERK|7788|1100|
| | 30| 7900| JAMES| 1981/12/3| CLERK|7698| 950|
| | 20| 7902| FORD| 1981/12/3| ANALYST|7566|3000|
| | 10| 7934|MILLER| 1982/1/23| CLERK|7782|1300|
+----+------+-----+------+----------+---------+----+----+
scala> empDF.where($"sal" >= 3000).show
+----+------+-----+-----+----------+---------+----+----+
|comm|deptno|empno|ename| hiredate| job| mgr| sal|
+----+------+-----+-----+----------+---------+----+----+
| | 20| 7788|SCOTT| 1987/4/19| ANALYST|7566|3000|
| | 10| 7839| KING|1981/11/17|PRESIDENT| |5000|
| | 20| 7902| FORD| 1981/12/3| ANALYST|7566|3000|
+----+------+-----+-----+----------+---------+----+----+
#### empDF 轉換成 DataSet 需要 case class
scala> case class Emp(empno:BigInt,ename:String,job:String,mgr:String,hiredate:String,sal:BigInt,comm:String,deptno:BigInt)
defined class Emp
scala> val empDS = empDF.as[Emp]
empDS: org.apache.spark.sql.Dataset[Emp] = [comm: string, deptno: bigint ... 6 more fields]
scala> empDS.filter(_.sal > 3000).show
+----+------+-----+-----+----------+---------+---+----+
|comm|deptno|empno|ename| hiredate| job|mgr| sal|
+----+------+-----+-----+----------+---------+---+----+
| | 10| 7839| KING|1981/11/17|PRESIDENT| |5000|
+----+------+-----+-----+----------+---------+---+----+
scala> empDS.filter(_.deptno == 10).show
+----+------+-----+------+----------+---------+----+----+
|comm|deptno|empno| ename| hiredate| job| mgr| sal|
+----+------+-----+------+----------+---------+----+----+
| | 10| 7782| CLARK| 1981/6/9| MANAGER|7839|2450|
| | 10| 7839| KING|1981/11/17|PRESIDENT| |5000|
| | 10| 7934|MILLER| 1982/1/23| CLERK|7782|1300|
+----+------+-----+------+----------+---------+----+----+
多表查詢:
1、創建部門表
scala> val deptRDD = sc.textFile("/usr/local/tmp_files/dept.csv").map(_.split(","))
deptRDD: org.apache.spark.rdd.RDD[Array[String]] = MapPartitionsRDD[154] at map at <console>:24
scala> case class Dept(deptno:Int,dname:String,loc:String)
defined class Dept
scala> val deptDS = deptRDD.map(x=>Dept(x(0).toInt,x(1),x(2))).toDS
deptDS: org.apache.spark.sql.Dataset[Dept] = [deptno: int, dname: string ... 1 more field]
scala> deptDS.show
+------+----------+--------+
|deptno| dname| loc|
+------+----------+--------+
| 10|ACCOUNTING|NEW YORK|
| 20| RESEARCH| DALLAS|
| 30| SALES| CHICAGO|
| 40|OPERATIONS| BOSTON|
+------+----------+--------+
2、員工表
同上 empDS
empDS.join(deptDS,"deptno").where(xxxx) 連接兩個表,通過deptno字段
empDS.joinWith(deptDS,deptDS("deptno")===empDS("deptno")) 這個用于連接的字段名稱不一樣的情況
? 如果想使用標準的sql語句來操作df或者ds對象時,必須先給df或者ds對象創建視圖,然后通過SparkSession對象的sql函數來對相應的視圖進行操作才可以。那么視圖是什么?
? 視圖是一個虛表,不存儲數據,可以當做是表的一個訪問鏈接。視圖有兩種類型:
普通視圖:也叫本地視圖,只在當前session會話中有效
全局視圖:在全部session中都有效,全局視圖創建在指定命名空間中:global_temp 類似于一個庫
操作說明:
val spark = SparkSession.builder().master("local").appName("createDF case class").getOrCreate()
val empDF = spark.read.json("/usr/local/tmp_files/emp.json")
創建本地視圖:
empDF.createOrReplaceTempView(視圖名),視圖存在就會重新創建
empDF.createTempView(視圖名),如果視圖存在就不會創建
創建全局視圖:
empDF.createGlobalTempView(視圖名)
對視圖執行sql操作,這里視圖名就類似于表名
spark.sql("xxxxx")
例子:
empDF.createOrReplaceTempView("emp")
spark.sql("select * from emp").show
注意,只要創建了視圖,那么就可以通過sparksession對象在任意一個類中操作視圖,也就是表。這個特性很好用,當我們要操作一些表時,可以一開始就讀取成df,然后創建成視圖,那么就可以在任意一個地方查詢表了。
通過SparkSession對象可以讀取不同格式的數據源:
val spark = SparkSession.builder().master("local").appName("createDF case class").getOrCreate()
下面都用上面的spark代稱SparkSession。
1、load
spark.read.load(path):讀取指定路徑的文件,要求文件存儲格式為Parquet文件
2、format
spark.read.format("格式").load(path) :指定讀取其他格式的文件,如json
例子:
spark.read.format("json").load(path)
3、直接讀取其他格式文件
spark.read.格式名(路徑),這是上面2中的一個簡寫方式,例子:
spark.read.json(路徑) json格式文件
spark.read.text(路徑) 讀取文本文件
注意:這些方式返回的都是 DataFrame 對象
可以將DataFrame 對象寫入到指定格式的文件中,假設有個DataFrame 對象為df1.
1、save
df1.write.save(路徑)
他會將文件保存到這個目錄下,文件名spark隨機生成的,所以使用上面的讀取方式的時候,直接指定讀取目錄即可,不用指定文件名。輸出的文件格式為 Parquet。可以直接指定hdfs的路徑,否則就存儲到本地
如:
df1.write.save("/test")
spark.read.load("/test")
2、直接指定格式存儲
df1.write.json(路徑) 這樣就會以json格式保存文件,生成的文件名的情況和上面類似
3、指定保存模式
如果沒有指定保存模式,輸出路徑存在的情況下,就會報錯
df1.write.mode("append").json(路徑)
mode("append") 就表示文件存在時就追加
mode("overwrite") 表示覆蓋舊數據
4、保存為表
df1.write.saveAsTable(表名) 會保存在當前目錄的spark-warehouse 目錄下
5、format
df1.write.format(格式).save()
使用指定特定格式的方式來輸出保存數據,比如保存到MongoDB數據庫中
? 這種一種列式存儲格式,具體原理可以看看之前hive的文章。這種格式是默認的存儲格式,使用load和save時默認的格式,操作方式很像前面說的,這里不重復。這里要講的是Parquet的一個特殊的功能,支持schema(表結構)的合并。例子:
scala> val df1 = sc.makeRDD(1 to 5).map(i=>(i,i*2)).toDF("single","double")
df1: org.apache.spark.sql.DataFrame = [single: int, double: int]
scala> df1.show
+------+------+
|single|double|
+------+------+
| 1| 2|
| 2| 4|
| 3| 6|
| 4| 8|
| 5| 10|
+------+------+
scala> sc.makeRDD(1 to 5)
res8: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[26] at makeRDD at <console>:25
scala> sc.makeRDD(1 to 5).collect
res9: Array[Int] = Array(1, 2, 3, 4, 5)
//導出表1
scala> df1.write.parquet("/usr/local/tmp_files/test_table/key=1")
scala> val df2 = sc.makeRDD(6 to 10).map(i=>(i,i*3)).toDF("single","triple")
df2: org.apache.spark.sql.DataFrame = [single: int, triple: int]
scala> df2.show
+------+------+
|single|triple|
+------+------+
| 6| 18|
| 7| 21|
| 8| 24|
| 9| 27|
| 10| 30|
+------+------+
//導出表2
scala> df2.write.parquet("/usr/local/tmp_files/test_table/key=2")
scala> val df3 = spark.read.parquet("/usr/local/tmp_files/test_table")
df3: org.apache.spark.sql.DataFrame = [single: int, double: int ... 1 more field]
//直接讀取會丟失字段
scala> df3.show
+------+------+---+
|single|double|key|
+------+------+---+
| 8| null| 2|
| 9| null| 2|
| 10| null| 2|
| 3| 6| 1|
| 4| 8| 1|
| 5| 10| 1|
| 6| null| 2|
| 7| null| 2|
| 1| 2| 1|
| 2| 4| 1|
+------+------+---+
//加上option,指定"mergeSchema"為true,就可以合并
scala> val df3 = spark.read.option("mergeSchema",true).parquet("/usr/local/tmp_files/test_table")
df3: org.apache.spark.sql.DataFrame = [single: int, double: int ... 2 more fields]
scala> df3.show
+------+------+------+---+
|single|double|triple|key|
+------+------+------+---+
| 8| null| 24| 2|
| 9| null| 27| 2|
| 10| null| 30| 2|
| 3| 6| null| 1|
| 4| 8| null| 1|
| 5| 10| null| 1|
| 6| null| 18| 2|
| 7| null| 21| 2|
| 1| 2| null| 1|
| 2| 4| null| 1|
+------+------+------+---+
補充問題:key 是什么?必須用key嘛?
key是不同表的一個區分字段,在合并的時候,會作為合并之后的表的一個字段,并且值等于key=xx 中設置的值
如果目錄下,兩個表的目錄名不一樣,是無法合并的,合并字段名可以任意,
如:一個是key ,一個是 test 這兩個無法合并,必須統一key或者test
這種一種帶表格式字段的文件,例子:
scala> val peopleDF = spark.read.json("/usr/local/tmp_files/people.json")
peopleDF: org.apache.spark.sql.DataFrame = [age: bigint, name: string]
scala> peopleDF.printSchema()
root
|-- age: long (nullable = true)
|-- name: string (nullable = true)
scala> peopleDF.createOrReplaceTempView("people")
scala> spark.sql("select * from people where age=19")
res25: org.apache.spark.sql.DataFrame = [age: bigint, name: string]
scala> spark.sql("select * from people where age=19").show
+---+------+
|age| name|
+---+------+
| 19|Justin|
+---+------+
scala> spark.sql("select age,count(1) from people group by age").show
+----+--------+
| age|count(1)|
+----+--------+
| 19| 1|
|null| 1|
| 30| 1|
+----+--------+
df對象支持通過jdbc連接數據庫,寫入數據到數據庫,或者從數據庫讀取數據。
例子:
1、通過jdbc 從mysql讀取數據:
使用 format(xx).option()的方式指定連接數據庫的一些參數,比如用戶名密碼,使用的連接驅動等
import java.util.Properties
import org.apache.spark.sql.SparkSession
object ConnMysql {
def main(args: Array[String]): Unit = {
val sparkS = SparkSession.builder().appName("spark sql conn mysql").master("local").getOrCreate()
//連接mysql方式1:
//創建properties配置對象,用于存放連接mysql的參數
val mysqlConn = new Properties()
mysqlConn.setProperty("user","root")
mysqlConn.setProperty("password","wjt86912572")
//使用jdbc連接,指定連接字符串,表名,以及其他連接參數,并返回對應的dataframe
val mysqlDF1 = sparkS.read.jdbc("jdbc:mysql://bigdata121:3306/test?serverTimezone=UTC&characterEncoding=utf-8","customer",mysqlConn)
mysqlDF1.printSchema()
mysqlDF1.show()
mysqlDF1.createTempView("customer")
sparkS.sql("select * from customer limit 2").show()
//連接mysql方式2,這種方式比較常用:
val mysqlConn2 = sparkS.read.format("jdbc")
.option("url","jdbc:mysql://bigdata121:3306/test?serverTimezone=UTC&characterEncoding=utf-8")
.option("user","root")
.option("password","wjt86912572")
.option("driver","com.mysql.jdbc.Driver")
.option("dbtable","customer").load()
mysqlConn2.printSchema()
}
}
這是兩種連接讀取數據的方式。
2、jdbc寫入數據到mysql
和讀取類似,只不過換成了write操作
import java.util.Properties
import org.apache.spark.sql.SparkSession
object WriteToMysql {
def main(args: Array[String]): Unit = {
val spark = SparkSession.builder().appName("write to mysql").master("local").getOrCreate()
val df1 = spark.read.text("G:\\test\\t_stu.json")
//方式1:
df1.write.format("jdbc")
.option("url","jdbc:mysql://bigdata121:3306/test?serverTimezone=UTC&characterEncoding=utf-8")
.option("user","root")
.option("password","wjt86912572")
.option("driver","com.mysql.jdbc.Driver")
.option("dbtable","customer").save()
//方式2:
val mysqlConn = new Properties()
mysqlConn.setProperty("user","root")
mysqlConn.setProperty("password","wjt86912572")
df1.write.jdbc("jdbc:mysql://bigdata121:3306/test?serverTimezone=UTC&characterEncoding=utf-8","customer",mysqlConn)
}
}
必須要保證df的表格式和寫入的mysql的表格式一樣,字段名也要一樣
1、通過jdbc連接hive
方式和普通jdbc類似,例如:
import java.util.Properties
import org.apache.spark.sql.SparkSession
/**
* 連接hive的情況有兩種:
* 1、如果是直接在ideal中運行spark程序的話,則必須在程序中指定jdbc連接的hiveserver的地址
* 且hiveserver必須以后臺服務的形式暴露10000端口出來.這種方式是直接通過jdbc連接hive
*
* 2、如果程序是打包到spark集群中運行的話,一般spark集群的conf目錄下,已經有hive client
* 的配置文件,就會直接啟動hive client來連接hive。這時不需要啟動hiveserver服務。
* 這種方式是通過hive client連接hive
*/
object ConnHive {
def main(args: Array[String]): Unit = {
val spark = SparkSession.builder().appName("spark sql conn mysql").master("local").getOrCreate()
val properties = new Properties()
properties.setProperty("user","")
properties.setProperty("password","")
val hiveDF = spark.read.jdbc("jdbc:hive2://bigdata121:10000/default","customer",properties)
hiveDF.printSchema()
spark.stop()
}
}
這種方式要注意一點:
hiveserver必須以后臺服務的形式暴露10000端口出來.這種方式是直接通過jdbc連接hive。
2、通過hive client連接hive
? 這種方式一般用在生產中,因為任務一般都是通過spark-submit提交到集群中運行,這時候就會直接通過hive client來連接hive,不會通過jdbc來連接了。
? 要注意:需要在spark的節點上都配置上hive client,然后將hive-site.xml配置文件拷貝到 spark的conf目錄下。同時需要將hadoop的core-site.xml hdfs-site.xml也拷貝過去。另外一方面,因為要使用hive client,所以hive server那邊,一般都要配置metastore server,具體配置看hive的文章。
? 這樣在spark集群中的程序就可以直接使用
spark.sql("xxxx").show
這樣的操作,默認就會從hive中讀取對應的表進行操作。不用另外做任何連接hive 的操作
或者直接到 spark-shell中,也是可以直接使用 上面的方式操作hive的表
例如:
import org.apache.spark.sql.SparkSession
object ConnHive02 {
def main(args: Array[String]): Unit = {
val spark = SparkSession.builder().appName("spark sql conn hive").getOrCreate()
spark.sql("select * from customer").show()
}
}
這樣直接操作的就是 hive 的表了
import java.util.Properties
import org.apache.spark.sql.SparkSession
object HiveToMysql {
def main(args: Array[String]): Unit = {
//直接通過spark集群中的hive client連接hive,不需要jdbc以及hive server
val spark = SparkSession.builder().appName("hive to mysql").enableHiveSupport().getOrCreate()
val resultDF = spark.sql("select * from default.customer")
//一般中間寫的處理邏輯都是處理從hive讀取的數據,處理完成后寫入到mysql
val mysqlConn = new Properties()
mysqlConn.setProperty("user","root")
mysqlConn.setProperty("password","wjt86912572")
//通過jdbc寫入mysql
resultDF.write.mode("append").jdbc("jdbc:mysql://bigdata121:3306/test?serverTimezone=UTC&characterEncoding=utf-8", "customer", mysqlConn)
spark.stop()
}
}
先啟動個spark-shell
spark-shell --master spark://bigdata121:7077
要在spark-shell中操作mysql,所以記得自己找個 mysql-connector的jar放到spark的jars目錄下
例子:
創建df,從mysql讀取表
scala> val mysqDF = spark.read.format("jdbc").option("url","jdbc:mysql://bigdata121:3306/test?serverTimezone=UTC&characterEncoding=utf-8").option("user","root").option("password","wjt86912572").option("driver","com.mysql.jdbc.Driver").option("dbtable","customer").load()
mysqDF: org.apache.spark.sql.DataFrame = [id: int, name: string ... 1 more field]
scala> mysqDF.show
+---+------+--------------------+
| id| name| last_mod|
+---+------+--------------------+
| 1| neil|2019-07-20 17:09:...|
| 2| jack|2019-07-20 17:09:...|
| 3|martin|2019-07-20 17:09:...|
| 4| tony|2019-07-20 17:09:...|
| 5| eric|2019-07-20 17:09:...|
| 6| king|2019-07-20 17:42:...|
| 7| tao|2019-07-20 17:45:...|
+---+------+--------------------+
必須注冊成一張表,才可以緩存。
scala> mysqDF.registerTempTable("customer")
warning: there was one deprecation warning; re-run with -deprecation for details
標識這張表可以被緩存,但是現在數據并沒有直接緩存
scala> spark.sqlContext.cacheTable("customer")
第一次查詢表,從mysql讀取數據,并緩存到內存中
scala> spark.sql("select * from customer").show
+---+------+--------------------+
| id| name| last_mod|
+---+------+--------------------+
| 1| neil|2019-07-20 17:09:...|
| 2| jack|2019-07-20 17:09:...|
| 3|martin|2019-07-20 17:09:...|
| 4| tony|2019-07-20 17:09:...|
| 5| eric|2019-07-20 17:09:...|
| 6| king|2019-07-20 17:42:...|
| 7| tao|2019-07-20 17:45:...|
+---+------+--------------------+
這一次查詢從內存中返回
scala> spark.sql("select * from customer").show
+---+------+--------------------+
| id| name| last_mod|
+---+------+--------------------+
| 1| neil|2019-07-20 17:09:...|
| 2| jack|2019-07-20 17:09:...|
| 3|martin|2019-07-20 17:09:...|
| 4| tony|2019-07-20 17:09:...|
| 5| eric|2019-07-20 17:09:...|
| 6| king|2019-07-20 17:42:...|
| 7| tao|2019-07-20 17:45:...|
+---+------+--------------------+
清空緩存
scala> spark.sqlContext.clearCache
將數據緩存到內存中的相關優化參數
? spark.sql.inMemoryColumnarStorage.compressed
? 默認為 true
? Spark SQL 將會基于統計信息自動地為每一列選擇一種壓縮編碼方式。
? spark.sql.inMemoryColumnarStorage.batchSize
? 默認值:10000
? 緩存批處理大小。緩存數據時, 較大的批處理大小可以提高內存利用率和壓縮率,但同時也會帶來 OOM(Out Of Memory)的風險。
其他性能相關的配置選項(不過不推薦手動修改,可能在后續版本自動的自適應修改)
? spark.sql.files.maxPartitionBytes
? 默認值:128 MB
? 讀取文件時單個分區可容納的最大字節數
? spark.sql.files.openCostInBytes
? 默認值:4M
? 打開文件的估算成本, 按照同一時間能夠掃描的字節數來測量。當往一個分區寫入多個文件的時候會使用。高估更好, 這樣的話小文件分區將比大文件分區更快 (先被調度)。
spark.sql.autoBroadcastJoinThreshold
? 默認值:10M
? 用于配置一個表在執行 join 操作時能夠廣播給所有 worker 節點的最大字節大小。通過將這個值設置為 -1 可以禁用廣播。注意,當前數據統計僅支持已經運行了 ANALYZE TABLE <tableName> COMPUTE STATISTICS noscan 命令的 Hive Metastore 表。
spark.sql.shuffle.partitions
? 默認值:200
? 用于配置 join 或聚合操作混洗(shuffle)數據時使用的分區數。
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。