91超碰碰碰碰久久久久久综合_超碰av人澡人澡人澡人澡人掠_国产黄大片在线观看画质优化_txt小说免费全本

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

四、spark--sparkSQL原理和使用

發布時間:2020-07-04 13:35:16 來源:網絡 閱讀:1145 作者:隔壁小白 欄目:大數據

[TOC]

一、spark SQL概述

1.1 什么是spark SQL

? Spark SQL是Spark用來處理結構化數據的一個模塊,它提供了一個編程抽象叫做DataFrame并且作為分布式SQL查詢引擎的作用。類似于hive的作用。

1.2 spark SQL的特點

1、容易集成:安裝Spark的時候,已經集成好了。不需要單獨安裝。
2、統一的數據訪問方式:JDBC、JSON、Hive、parquet文件(一種列式存儲文件,是SparkSQL默認的數據源,hive中也支持)
3、完全兼容Hive。可以將Hive中的數據,直接讀取到Spark SQL中處理。
一般在生產中,基本都是使用hive做數據倉庫存儲數據,然后用spark從hive讀取數據進行處理。
4、支持標準的數據連接:JDBC、ODBC
5、計算效率比基于mr的hive高,而且hive2.x版本中,hive建議使用spark作為執行引擎

二、spark SQL基本原理

2.1 DataFrame和DataSet基本概念

2.1.1 DataFrame

DataFrame是組織成命名列的數據集。它在概念上等同于關系數據庫中的表,里面有表的結構以及數據,但在底層具有更豐富的優化。DataFrames可以從各種來源構建,
例如:
結構化數據文件
hive中的表
外部數據庫或現有RDDs
DataFrame API支持的語言有Scala,Java,Python和R。

? 比起RDD,DataFrame多了數據的結構信息,即schema。RDD是分布式的 Java對象的集合。DataFrame是分布式的Row對象的集合。DataFrame除了提供了比RDD更豐富的算子以外,更重要的特點是提升執行效率、減少數據讀取以及執行計劃的優化。

2.1.2 DataSet

Dataset是一個分布式的數據收集器。這是在Spark1.6之后新加的一個接口,兼顧了RDD的優點(強類型,可以使用功能強大的lambda)以及Spark SQL的執行器高效性的優點。所以可以把DataFrames看成是一種特殊的Datasets,即:Dataset(Row)

2.2 創建DataFrame的方式

2.2.1 SparkSession對象

? Apache Spark 2.0引入了SparkSession,其為用戶提供了一個統一的切入點來使用Spark的各項功能,并且允許用戶通過它調用DataFrame和Dataset相關API來編寫Spark程序。最重要的是,它減少了用戶需要了解的一些概念,使得我們可以很容易地與Spark交互。
? 在2.0版本之前,與Spark交互之前必須先創建SparkConf和SparkContext。然而在Spark 2.0中,我們可以通過SparkSession來實現同樣的功能,而不需要顯式地創建SparkConf, SparkContext 以及 SQLContext,因為這些對象已經封裝在SparkSession中。
? 要注意一點,在我用的這個spark版本中,直接使用new SQLContext() 來創建SQLContext對象,會顯示該方式已經被棄用了(IDEA中會顯示已棄用),建議使用SparkSession來獲取SQLContext對象。

2.2.2 通過case class樣本類

這種方式在scala中比較常用,因為case class是scala的特色

/**
表 t_stu 的結構為:
id name age
*/

object CreateDF {
  def main(args: Array[String]): Unit = {
    //這是最新的獲取SQLContext對象的方式
    //2、創建SparkSession對象,設置master,appname
    val spark = SparkSession.builder().master("local").appName("createDF case class").getOrCreate()
    //3、通過spark獲取sparkContext對象,讀取數據
    val lines = spark.sparkContext.textFile("G:\\test\\t_stu.txt").map(_.split(","))

    //4、將數據映射到case class中,也就是數據映射到表的對應字段中
    val tb = lines.map(t=>emp(t(0).toInt,t(1),t(2).toInt))
    //這里必須要加上隱式轉換,否則無法調用 toDF 函數
    import spark.sqlContext.implicits._

    //5、生成df
    val df2 = tb.toDF()

    //相當于select name from t_stu
    df1.select($"name").show()

    //關閉spark對象
    spark.stop()
  }
}

/*1、定義case class,每個屬性對應表中的字段名以及類型
     一般生產中為了方便,會全部定義為string類型,然后有需要的時候
     才根據實際情況將string轉為需要的類型
   這一步相當于定義表的結構
*/
case class emp(id:Int,name:String,age:Int)

總結步驟為:

1、定義case class,用來表結構
2、創建sparkSession對象,用來讀取數據
3、將rdd中的數據和case class映射
4、調用 toDF 函數將rdd轉為 DataFrame

2.2.3 通過StructType類

這種方式java比較常用

package SparkSQLExer

import org.apache
import org.apache.spark
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.types.{DataTypes, StructField, StructType}

/**
  * 創建dataschema方式2:
  * 通過spark session對象創建,表結構通過StructType創建
  */
object CreateDF02 {
  def main(args: Array[String]): Unit = {
    val sparkS = SparkSession.builder().master("local").appName("create schema").getOrCreate()

    //1、通過StructType創建表結構schema,里面表的每個字段使用 StructField定義
    val tbSchema = StructType(List(
        StructField("id",DataTypes.IntegerType),
        StructField("name",DataTypes.StringType),
        StructField("age",DataTypes.IntegerType)
      ))

    //2、讀取數據
    var lines = sparkS.sparkContext.textFile("G:\\test\\t_stu.txt").map(_.split(","))

    //3、將數據映射為ROW對象
    val rdd1 = lines.map(t=>Row(t(0).toInt,t(1),t(2).toInt))

    //4、創建表結構和表數據映射,返回的就是df
    val df2 = sparkS.createDataFrame(rdd1, tbSchema)

    //打印表結構
    df2.printSchema()

    sparkS.stop()

  }

}

總結步驟為:

1、通過StructType創建表結構schema,里面表的每個字段使用 StructField定義
2、通過sparkSession.sparkContext讀取數據
3、將數據映射格式為Row對象
4、將StructType和數據Row對象映射,返回df

2.2.4 使用json等有表格式的文件類型

package SparkSQLExer

import org.apache.spark.sql.SparkSession

/**
  * 創建df方式3:通過有格式的文件直接導入數據以及表結構,比如json格式的文件
  * 返回的直接就是一個DF
  */
object CreateDF03 {
  def main(args: Array[String]): Unit = {
    val sparkS = SparkSession.builder().master("local").appName("create df through json").getOrCreate()

    //讀取json方式1:
    val jsonrdd1= sparkS.read.json("path")

    //讀取json方式2:
    val jsonrdd1= sparkS.read.format("json").load("path")

    sparkS.stop()
  }
}

這種方式比較簡單,就是直接讀取json文件而已
sparkS.read.xxxx讀取任意文件時,返回的都是DF對象

2.3 操作DataFrame

2.3.1 DSL語句

DSL語句其實就是將sql語句的一些操作轉為類似函數的方式去調用,比如:

df1.select("name").show

例子:

為了方便,直接在spark-shell里操作了,
spark-shell --master spark://bigdata121:7077

1、打印表結構
scala> df1.printSchema
root
|-- empno: integer (nullable = true)
|-- ename: string (nullable = true)
|-- job: string (nullable = true)
|-- mgr: integer (nullable = true)
|-- hiredate: string (nullable = true)
|-- sal: integer (nullable = true)
|-- comm: integer (nullable = true)
|-- deptno: integer (nullable = true)

2、顯示當前df的表數據或者查詢結果的數據
scala> df1.show
+-----+------+---------+----+----------+----+----+------+
|empno| ename|      job| mgr|  hiredate| sal|comm|deptno|
+-----+------+---------+----+----------+----+----+------+
| 7369| SMITH|    CLERK|7902|1980/12/17| 800|   0|    20|
| 7499| ALLEN| SALESMAN|7698| 1981/2/20|1600| 300|    30|
| 7521|  WARD| SALESMAN|7698| 1981/2/22|1250| 500|    30|
| 7566| JONES|  MANAGER|7839|  1981/4/2|2975|   0|    20|
| 7654|MARTIN| SALESMAN|7698| 1981/9/28|1250|1400|    30|
| 7698| BLAKE|  MANAGER|7839|  1981/5/1|2850|   0|    30|
| 7782| CLARK|  MANAGER|7839|  1981/6/9|2450|   0|    10|
| 7788| SCOTT|  ANALYST|7566| 1987/4/19|3000|   0|    20|
| 7839|  KING|PRESIDENT|7839|1981/11/17|5000|   0|    10|
| 7844|TURNER| SALESMAN|7698|  1981/9/8|1500|   0|    30|
| 7876| ADAMS|    CLERK|7788| 1987/5/23|1100|   0|    20|
| 7900| JAMES|    CLERK|7698| 1981/12/3| 950|   0|    30|
| 7902|  FORD|  ANALYST|7566| 1981/12/3|3000|   0|    20|
| 7934|MILLER|    CLERK|7782| 1982/1/23|1300|   0|    10|
+-----+------+---------+----+----------+----+----+------+

3、執行select, 相當于select xxx form  xxx where xxx
scala> df1.select("ename","sal").where("sal>2000").show
+------+----+
| ename| sal|
+------+----+
| SMITH| 800|
| ALLEN|1600|
|  WARD|1250|
| JONES|2975|
|MARTIN|1250|
| BLAKE|2850|
| CLARK|2450|
| SCOTT|3000|
|  KING|5000|
|TURNER|1500|
| ADAMS|1100|
| JAMES| 950|
|  FORD|3000|
|MILLER|1300|
+------+----+

4、對某些列進行操作
對某個指定進行操作時,需要加上$符號,然后后面才能操作
$代表 取出來以后,再做一些操作。
注意:這個 $ 的用法在ideal中無法正常使用,解決方法下面說
scala> df1.select($"ename",$"sal",$"sal"+100).show
+------+----+-----------+
| ename| sal|(sal + 100)|
+------+----+-----------+
| SMITH| 800|        900|
| ALLEN|1600|       1700|
|  WARD|1250|       1350|
| JONES|2975|       3075|
|MARTIN|1250|       1350|
| BLAKE|2850|       2950|
| CLARK|2450|       2550|
| SCOTT|3000|       3100|
|  KING|5000|       5100|
|TURNER|1500|       1600|
| ADAMS|1100|       1200|
| JAMES| 950|       1050|
|  FORD|3000|       3100|
|MILLER|1300|       1400|
+------+----+-----------+

5、過濾行
scala> df1.filter($"sal">2000).show
+-----+-----+---------+----+----------+----+----+------+
|empno|ename|      job| mgr|  hiredate| sal|comm|deptno|
+-----+-----+---------+----+----------+----+----+------+
| 7566|JONES|  MANAGER|7839|  1981/4/2|2975|   0|    20|
| 7698|BLAKE|  MANAGER|7839|  1981/5/1|2850|   0|    30|
| 7782|CLARK|  MANAGER|7839|  1981/6/9|2450|   0|    10|
| 7788|SCOTT|  ANALYST|7566| 1987/4/19|3000|   0|    20|
| 7839| KING|PRESIDENT|7839|1981/11/17|5000|   0|    10|
| 7902| FORD|  ANALYST|7566| 1981/12/3|3000|   0|    20|
+-----+-----+---------+----+----------+----+----+------+

6、分組以及計數
scala> df1.groupBy($"deptno").count.show
+------+-----+                                                                  
|deptno|count|
+------+-----+
|    20|    5|
|    10|    3|
|    30|    6|
+------+-----+

上面說到在ide中 select($"name")中無法正常使用,解決方法為:

在該語句之前加上這么一句:
import spark.sqlContext.implicits._

主要還是因為類型的問題,加上隱式轉換就好了

2.3.2 sql語句

df對象不能直接執行sql。需要生成一個視圖,再執行SQL。
需要指定創建的視圖名稱,后面視圖名稱就相當于表名。
視圖后面還會細說,這里先有個概念
例子:

val spark = SparkSession.builder().master("local").appName("createDF case class").getOrCreate()
。。。。。。。。。。。。。。
//通過df對象創建臨時視圖。視圖名就相當于表名
df1.createOrReplaceTempView("emp")

//通過sparksession對象執行執行
spark.sql("select * from emp").show
spark.sql("select * from emp where sal > 2000").show
spark.sql("select deptno,count(1) from emp group by deptno").show

//可以創建多個視圖,不沖突
df1.createOrReplaceTempView("emp12345")
spark.sql("select e.deptno from emp12345 e").show

2.3.3 多表查詢

scala> case class Dept(deptno:Int,dname:String,loc:String)
defined class Dept

scala> val lines = sc.textFile("/usr/local/tmp_files/dept.csv").map(_.split(","))
lines: org.apache.spark.rdd.RDD[Array[String]] = MapPartitionsRDD[68] at map at <console>:24

scala> val allDept = lines.map(x=>Dept(x(0).toInt,x(1),x(2)))
allDept: org.apache.spark.rdd.RDD[Dept] = MapPartitionsRDD[69] at map at <console>:28

scala> val df2 = allDept.toDF
df2: org.apache.spark.sql.DataFrame = [deptno: int, dname: string ... 1 more field]

scala> df2.create
createGlobalTempView   createOrReplaceTempView   createTempView

scala> df2.createOrReplaceTempView("dept")

scala> spark.sql("select dname,ename from emp12345,dept where emp12345.deptno=dept.deptno").show
+----------+------+                                                             
|     dname| ename|
+----------+------+
|  RESEARCH| SMITH|
|  RESEARCH| JONES|
|  RESEARCH| SCOTT|
|  RESEARCH| ADAMS|
|  RESEARCH|  FORD|
|ACCOUNTING| CLARK|
|ACCOUNTING|  KING|
|ACCOUNTING|MILLER|
|     SALES| ALLEN|
|     SALES|  WARD|
|     SALES|MARTIN|
|     SALES| BLAKE|
|     SALES|TURNER|
|     SALES| JAMES|
+----------+------+

2.4 創建DataSet

2.4.1 通過case class

和DataFrame類似,只是把 toDF改為調用 toDS 方法

package SparkSQLExer

import org.apache.spark.sql.SparkSession
import org.apache.spark.{SparkConf, SparkContext}

object CreateDS {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local").appName("createDF case class").getOrCreate()
    val lines = spark.sparkContext.textFile("G:\\test\\t_stu.txt").map(_.split(","))
    val tb = lines.map(t=>emp1(t(0).toInt,t(1),t(2).toInt))
    import spark.sqlContext.implicits._
    val df1 = tb.toDS()
    df1.select($"name")

  }
}

case class emp1(id:Int,name:String,age:Int)

2.4.2 通過序列Seq類對象

package SparkSQLExer

import org.apache.spark.sql.SparkSession
import org.apache.spark.{SparkConf, SparkContext}

object CreateDS {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local").appName("createDF case class").getOrCreate()

    //創建一個序列對象,里面都是emp1對象,映射的數據,然后直接toDS轉為DataSet
    val ds1 = Seq(emp1(1,"king",20)).toDS()
    ds1.printSchema()

  }
}

case class emp1(id:Int,name:String,age:Int)

2.4.3 使用json格式文件

定義case class
case class Person(name:String,age:BigInt)

使用JSON數據生成DataFrame
val df = spark.read.format("json").load("/usr/local/tmp_files/people.json")

將DataFrame轉換成DataSet
df.as[Person].show

df.as[Person] 是一個 DataSet
as[T]中的泛型需要是一個case class類,用于映射表頭

2.5 操作DataSet

DataSet支持的算子其實就是rdd和DataFrame算子的結合。

使用emp.json 生成DataFrame
val empDF = spark.read.json("/usr/local/tmp_files/emp.json")

scala> empDF.show
+----+------+-----+------+----------+---------+----+----+
|comm|deptno|empno| ename|  hiredate|      job| mgr| sal|
+----+------+-----+------+----------+---------+----+----+
|    |    20| 7369| SMITH|1980/12/17|    CLERK|7902| 800|
| 300|    30| 7499| ALLEN| 1981/2/20| SALESMAN|7698|1600|
| 500|    30| 7521|  WARD| 1981/2/22| SALESMAN|7698|1250|
|    |    20| 7566| JONES|  1981/4/2|  MANAGER|7839|2975|
|1400|    30| 7654|MARTIN| 1981/9/28| SALESMAN|7698|1250|
|    |    30| 7698| BLAKE|  1981/5/1|  MANAGER|7839|2850|
|    |    10| 7782| CLARK|  1981/6/9|  MANAGER|7839|2450|
|    |    20| 7788| SCOTT| 1987/4/19|  ANALYST|7566|3000|
|    |    10| 7839|  KING|1981/11/17|PRESIDENT|    |5000|
|   0|    30| 7844|TURNER|  1981/9/8| SALESMAN|7698|1500|
|    |    20| 7876| ADAMS| 1987/5/23|    CLERK|7788|1100|
|    |    30| 7900| JAMES| 1981/12/3|    CLERK|7698| 950|
|    |    20| 7902|  FORD| 1981/12/3|  ANALYST|7566|3000|
|    |    10| 7934|MILLER| 1982/1/23|    CLERK|7782|1300|
+----+------+-----+------+----------+---------+----+----+

scala> empDF.where($"sal" >= 3000).show
+----+------+-----+-----+----------+---------+----+----+
|comm|deptno|empno|ename|  hiredate|      job| mgr| sal|
+----+------+-----+-----+----------+---------+----+----+
|    |    20| 7788|SCOTT| 1987/4/19|  ANALYST|7566|3000|
|    |    10| 7839| KING|1981/11/17|PRESIDENT|    |5000|
|    |    20| 7902| FORD| 1981/12/3|  ANALYST|7566|3000|
+----+------+-----+-----+----------+---------+----+----+

#### empDF 轉換成 DataSet 需要 case class

scala> case class Emp(empno:BigInt,ename:String,job:String,mgr:String,hiredate:String,sal:BigInt,comm:String,deptno:BigInt)
defined class Emp

scala> val empDS = empDF.as[Emp]
empDS: org.apache.spark.sql.Dataset[Emp] = [comm: string, deptno: bigint ... 6 more fields]

scala> empDS.filter(_.sal > 3000).show
+----+------+-----+-----+----------+---------+---+----+
|comm|deptno|empno|ename|  hiredate|      job|mgr| sal|
+----+------+-----+-----+----------+---------+---+----+
|    |    10| 7839| KING|1981/11/17|PRESIDENT|   |5000|
+----+------+-----+-----+----------+---------+---+----+

scala> empDS.filter(_.deptno == 10).show
+----+------+-----+------+----------+---------+----+----+
|comm|deptno|empno| ename|  hiredate|      job| mgr| sal|
+----+------+-----+------+----------+---------+----+----+
|    |    10| 7782| CLARK|  1981/6/9|  MANAGER|7839|2450|
|    |    10| 7839|  KING|1981/11/17|PRESIDENT|    |5000|
|    |    10| 7934|MILLER| 1982/1/23|    CLERK|7782|1300|
+----+------+-----+------+----------+---------+----+----+

多表查詢:

1、創建部門表
scala> val deptRDD = sc.textFile("/usr/local/tmp_files/dept.csv").map(_.split(","))
deptRDD: org.apache.spark.rdd.RDD[Array[String]] = MapPartitionsRDD[154] at map at <console>:24

scala> case class Dept(deptno:Int,dname:String,loc:String)
defined class Dept

scala> val deptDS = deptRDD.map(x=>Dept(x(0).toInt,x(1),x(2))).toDS
deptDS: org.apache.spark.sql.Dataset[Dept] = [deptno: int, dname: string ... 1 more field]

scala> deptDS.show
+------+----------+--------+
|deptno|     dname|     loc|
+------+----------+--------+
|    10|ACCOUNTING|NEW YORK|
|    20|  RESEARCH|  DALLAS|
|    30|     SALES| CHICAGO|
|    40|OPERATIONS|  BOSTON|
+------+----------+--------+

2、員工表
同上 empDS

empDS.join(deptDS,"deptno").where(xxxx) 連接兩個表,通過deptno字段
empDS.joinWith(deptDS,deptDS("deptno")===empDS("deptno")) 這個用于連接的字段名稱不一樣的情況

2.6 視圖view

? 如果想使用標準的sql語句來操作df或者ds對象時,必須先給df或者ds對象創建視圖,然后通過SparkSession對象的sql函數來對相應的視圖進行操作才可以。那么視圖是什么?
? 視圖是一個虛表,不存儲數據,可以當做是表的一個訪問鏈接。視圖有兩種類型:
普通視圖:也叫本地視圖,只在當前session會話中有效
全局視圖:在全部session中都有效,全局視圖創建在指定命名空間中:global_temp 類似于一個庫
操作說明:

val spark = SparkSession.builder().master("local").appName("createDF case class").getOrCreate()
val empDF = spark.read.json("/usr/local/tmp_files/emp.json")

創建本地視圖:
empDF.createOrReplaceTempView(視圖名),視圖存在就會重新創建
empDF.createTempView(視圖名),如果視圖存在就不會創建

創建全局視圖:
empDF.createGlobalTempView(視圖名)

對視圖執行sql操作,這里視圖名就類似于表名
spark.sql("xxxxx")

例子:
empDF.createOrReplaceTempView("emp")
spark.sql("select * from emp").show

注意,只要創建了視圖,那么就可以通過sparksession對象在任意一個類中操作視圖,也就是表。這個特性很好用,當我們要操作一些表時,可以一開始就讀取成df,然后創建成視圖,那么就可以在任意一個地方查詢表了。

2.7 數據源

通過SparkSession對象可以讀取不同格式的數據源:

val spark = SparkSession.builder().master("local").appName("createDF case class").getOrCreate()

下面都用上面的spark代稱SparkSession。

2.7.1 SparkSession讀取數據的方式

1、load
spark.read.load(path):讀取指定路徑的文件,要求文件存儲格式為Parquet文件

2、format
spark.read.format("格式").load(path) :指定讀取其他格式的文件,如json
例子:
spark.read.format("json").load(path)

3、直接讀取其他格式文件
spark.read.格式名(路徑),這是上面2中的一個簡寫方式,例子:
spark.read.json(路徑)  json格式文件
spark.read.text(路徑)  讀取文本文件

注意:這些方式返回的都是 DataFrame 對象

2.7.2 SparkSession保存數據的方式

可以將DataFrame 對象寫入到指定格式的文件中,假設有個DataFrame 對象為df1.

1、save
df1.write.save(路徑) 
他會將文件保存到這個目錄下,文件名spark隨機生成的,所以使用上面的讀取方式的時候,直接指定讀取目錄即可,不用指定文件名。輸出的文件格式為 Parquet。可以直接指定hdfs的路徑,否則就存儲到本地
如:
df1.write.save("/test")
spark.read.load("/test")

2、直接指定格式存儲
df1.write.json(路徑)  這樣就會以json格式保存文件,生成的文件名的情況和上面類似

3、指定保存模式
如果沒有指定保存模式,輸出路徑存在的情況下,就會報錯
df1.write.mode("append").json(路徑) 
mode("append") 就表示文件存在時就追加
mode("overwrite") 表示覆蓋舊數據

4、保存為表
df1.write.saveAsTable(表名) 會保存在當前目錄的spark-warehouse 目錄下

5、format
df1.write.format(格式).save()
使用指定特定格式的方式來輸出保存數據,比如保存到MongoDB數據庫中

2.7.3 Parquet格式

? 這種一種列式存儲格式,具體原理可以看看之前hive的文章。這種格式是默認的存儲格式,使用load和save時默認的格式,操作方式很像前面說的,這里不重復。這里要講的是Parquet的一個特殊的功能,支持schema(表結構)的合并。例子:

scala> val df1 = sc.makeRDD(1 to 5).map(i=>(i,i*2)).toDF("single","double")
df1: org.apache.spark.sql.DataFrame = [single: int, double: int]

scala> df1.show
+------+------+
|single|double|
+------+------+
|     1|     2|
|     2|     4|
|     3|     6|
|     4|     8|
|     5|    10|
+------+------+

scala> sc.makeRDD(1 to 5)
res8: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[26] at makeRDD at <console>:25

scala> sc.makeRDD(1 to 5).collect
res9: Array[Int] = Array(1, 2, 3, 4, 5)

//導出表1
scala> df1.write.parquet("/usr/local/tmp_files/test_table/key=1")

scala> val df2 = sc.makeRDD(6 to 10).map(i=>(i,i*3)).toDF("single","triple")
df2: org.apache.spark.sql.DataFrame = [single: int, triple: int]

scala> df2.show
+------+------+
|single|triple|
+------+------+
|     6|    18|
|     7|    21|
|     8|    24|
|     9|    27|
|    10|    30|
+------+------+

//導出表2
scala> df2.write.parquet("/usr/local/tmp_files/test_table/key=2")

scala> val df3 = spark.read.parquet("/usr/local/tmp_files/test_table")
df3: org.apache.spark.sql.DataFrame = [single: int, double: int ... 1 more field]

//直接讀取會丟失字段
scala> df3.show
+------+------+---+
|single|double|key|
+------+------+---+
|     8|  null|  2|
|     9|  null|  2|
|    10|  null|  2|
|     3|     6|  1|
|     4|     8|  1|
|     5|    10|  1|
|     6|  null|  2|
|     7|  null|  2|
|     1|     2|  1|
|     2|     4|  1|
+------+------+---+

//加上option,指定"mergeSchema"為true,就可以合并
scala> val df3 = spark.read.option("mergeSchema",true).parquet("/usr/local/tmp_files/test_table")
df3: org.apache.spark.sql.DataFrame = [single: int, double: int ... 2 more fields]

scala> df3.show
+------+------+------+---+
|single|double|triple|key|
+------+------+------+---+
|     8|  null|    24|  2|
|     9|  null|    27|  2|
|    10|  null|    30|  2|
|     3|     6|  null|  1|
|     4|     8|  null|  1|
|     5|    10|  null|  1|
|     6|  null|    18|  2|
|     7|  null|    21|  2|
|     1|     2|  null|  1|
|     2|     4|  null|  1|
+------+------+------+---+

補充問題:key 是什么?必須用key嘛?
key是不同表的一個區分字段,在合并的時候,會作為合并之后的表的一個字段,并且值等于key=xx 中設置的值
如果目錄下,兩個表的目錄名不一樣,是無法合并的,合并字段名可以任意,
如:一個是key ,一個是 test 這兩個無法合并,必須統一key或者test

2.7.4 json文件

這種一種帶表格式字段的文件,例子:

scala> val peopleDF = spark.read.json("/usr/local/tmp_files/people.json")
peopleDF: org.apache.spark.sql.DataFrame = [age: bigint, name: string]

scala> peopleDF.printSchema()
root
|-- age: long (nullable = true)
|-- name: string (nullable = true)

scala> peopleDF.createOrReplaceTempView("people")

scala> spark.sql("select * from people where age=19")
res25: org.apache.spark.sql.DataFrame = [age: bigint, name: string]

scala> spark.sql("select * from people where age=19").show
+---+------+
|age|  name|
+---+------+
| 19|Justin|
+---+------+

scala> spark.sql("select age,count(1) from people group by  age").show
+----+--------+                                                                 
| age|count(1)|
+----+--------+
|  19|       1|
|null|       1|
|  30|       1|
+----+--------+

2.7.5 JDBC 連接

df對象支持通過jdbc連接數據庫,寫入數據到數據庫,或者從數據庫讀取數據。
例子:
1、通過jdbc 從mysql讀取數據:

使用 format(xx).option()的方式指定連接數據庫的一些參數,比如用戶名密碼,使用的連接驅動等

import java.util.Properties

import org.apache.spark.sql.SparkSession

object ConnMysql {
  def main(args: Array[String]): Unit = {
    val sparkS = SparkSession.builder().appName("spark sql conn mysql").master("local").getOrCreate()
    //連接mysql方式1:
    //創建properties配置對象,用于存放連接mysql的參數
    val mysqlConn = new Properties()
    mysqlConn.setProperty("user","root")
    mysqlConn.setProperty("password","wjt86912572")
    //使用jdbc連接,指定連接字符串,表名,以及其他連接參數,并返回對應的dataframe
    val mysqlDF1 = sparkS.read.jdbc("jdbc:mysql://bigdata121:3306/test?serverTimezone=UTC&characterEncoding=utf-8","customer",mysqlConn)
    mysqlDF1.printSchema()
    mysqlDF1.show()
    mysqlDF1.createTempView("customer")
    sparkS.sql("select * from customer limit 2").show()

    //連接mysql方式2,這種方式比較常用:
    val mysqlConn2 = sparkS.read.format("jdbc")
      .option("url","jdbc:mysql://bigdata121:3306/test?serverTimezone=UTC&characterEncoding=utf-8")              
      .option("user","root")
      .option("password","wjt86912572")
      .option("driver","com.mysql.jdbc.Driver")
      .option("dbtable","customer").load()

    mysqlConn2.printSchema()
  }
}

這是兩種連接讀取數據的方式。

2、jdbc寫入數據到mysql

和讀取類似,只不過換成了write操作

import java.util.Properties

import org.apache.spark.sql.SparkSession

object WriteToMysql {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("write to mysql").master("local").getOrCreate()

    val df1 = spark.read.text("G:\\test\\t_stu.json")

    //方式1:
    df1.write.format("jdbc")
      .option("url","jdbc:mysql://bigdata121:3306/test?serverTimezone=UTC&characterEncoding=utf-8")
      .option("user","root")
      .option("password","wjt86912572")
      .option("driver","com.mysql.jdbc.Driver")
      .option("dbtable","customer").save()

    //方式2:
    val mysqlConn = new Properties()
    mysqlConn.setProperty("user","root")
    mysqlConn.setProperty("password","wjt86912572")
    df1.write.jdbc("jdbc:mysql://bigdata121:3306/test?serverTimezone=UTC&characterEncoding=utf-8","customer",mysqlConn)

  }

}

必須要保證df的表格式和寫入的mysql的表格式一樣,字段名也要一樣

2.7.6 hive

1、通過jdbc連接hive
方式和普通jdbc類似,例如:

import java.util.Properties

import org.apache.spark.sql.SparkSession

/**
  * 連接hive的情況有兩種:
  * 1、如果是直接在ideal中運行spark程序的話,則必須在程序中指定jdbc連接的hiveserver的地址
  * 且hiveserver必須以后臺服務的形式暴露10000端口出來.這種方式是直接通過jdbc連接hive
  *
  * 2、如果程序是打包到spark集群中運行的話,一般spark集群的conf目錄下,已經有hive client
  * 的配置文件,就會直接啟動hive client來連接hive。這時不需要啟動hiveserver服務。
  * 這種方式是通過hive client連接hive
  */
object ConnHive {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("spark sql conn mysql").master("local").getOrCreate()
    val properties = new Properties()
    properties.setProperty("user","")
    properties.setProperty("password","")

    val hiveDF = spark.read.jdbc("jdbc:hive2://bigdata121:10000/default","customer",properties)
    hiveDF.printSchema()
    spark.stop()
  }
}

這種方式要注意一點:
hiveserver必須以后臺服務的形式暴露10000端口出來.這種方式是直接通過jdbc連接hive。

2、通過hive client連接hive
? 這種方式一般用在生產中,因為任務一般都是通過spark-submit提交到集群中運行,這時候就會直接通過hive client來連接hive,不會通過jdbc來連接了。
? 要注意:需要在spark的節點上都配置上hive client,然后將hive-site.xml配置文件拷貝到 spark的conf目錄下。同時需要將hadoop的core-site.xml hdfs-site.xml也拷貝過去。另外一方面,因為要使用hive client,所以hive server那邊,一般都要配置metastore server,具體配置看hive的文章。
? 這樣在spark集群中的程序就可以直接使用

spark.sql("xxxx").show

這樣的操作,默認就會從hive中讀取對應的表進行操作。不用另外做任何連接hive 的操作

或者直接到 spark-shell中,也是可以直接使用 上面的方式操作hive的表
例如:

import org.apache.spark.sql.SparkSession

object ConnHive02 {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("spark sql conn hive").getOrCreate()
    spark.sql("select * from customer").show()
  }

}

這樣直接操作的就是 hive 的表了

2.8 小案例--讀取hive數據分析結果寫入mysql

import java.util.Properties

import org.apache.spark.sql.SparkSession

object HiveToMysql {
  def main(args: Array[String]): Unit = {
    //直接通過spark集群中的hive client連接hive,不需要jdbc以及hive server
    val spark = SparkSession.builder().appName("hive to mysql").enableHiveSupport().getOrCreate()
    val resultDF = spark.sql("select * from default.customer")

    //一般中間寫的處理邏輯都是處理從hive讀取的數據,處理完成后寫入到mysql

    val mysqlConn = new Properties()
    mysqlConn.setProperty("user","root")
    mysqlConn.setProperty("password","wjt86912572")
    //通過jdbc寫入mysql
  resultDF.write.mode("append").jdbc("jdbc:mysql://bigdata121:3306/test?serverTimezone=UTC&characterEncoding=utf-8", "customer", mysqlConn)

    spark.stop()
  }

}

三、性能優化

3.1 內存中緩存數據

先啟動個spark-shell

spark-shell --master spark://bigdata121:7077

要在spark-shell中操作mysql,所以記得自己找個 mysql-connector的jar放到spark的jars目錄下

例子:

創建df,從mysql讀取表
scala> val mysqDF = spark.read.format("jdbc").option("url","jdbc:mysql://bigdata121:3306/test?serverTimezone=UTC&characterEncoding=utf-8").option("user","root").option("password","wjt86912572").option("driver","com.mysql.jdbc.Driver").option("dbtable","customer").load()
mysqDF: org.apache.spark.sql.DataFrame = [id: int, name: string ... 1 more field]

scala> mysqDF.show
+---+------+--------------------+
| id|  name|            last_mod|
+---+------+--------------------+
|  1|  neil|2019-07-20 17:09:...|
|  2|  jack|2019-07-20 17:09:...|
|  3|martin|2019-07-20 17:09:...|
|  4|  tony|2019-07-20 17:09:...|
|  5|  eric|2019-07-20 17:09:...|
|  6|  king|2019-07-20 17:42:...|
|  7|   tao|2019-07-20 17:45:...|
+---+------+--------------------+

必須注冊成一張表,才可以緩存。
scala> mysqDF.registerTempTable("customer")
warning: there was one deprecation warning; re-run with -deprecation for details

標識這張表可以被緩存,但是現在數據并沒有直接緩存
scala> spark.sqlContext.cacheTable("customer")

第一次查詢表,從mysql讀取數據,并緩存到內存中
scala> spark.sql("select * from customer").show
+---+------+--------------------+
| id|  name|            last_mod|
+---+------+--------------------+
|  1|  neil|2019-07-20 17:09:...|
|  2|  jack|2019-07-20 17:09:...|
|  3|martin|2019-07-20 17:09:...|
|  4|  tony|2019-07-20 17:09:...|
|  5|  eric|2019-07-20 17:09:...|
|  6|  king|2019-07-20 17:42:...|
|  7|   tao|2019-07-20 17:45:...|
+---+------+--------------------+

這一次查詢從內存中返回
scala> spark.sql("select * from customer").show
+---+------+--------------------+
| id|  name|            last_mod|
+---+------+--------------------+
|  1|  neil|2019-07-20 17:09:...|
|  2|  jack|2019-07-20 17:09:...|
|  3|martin|2019-07-20 17:09:...|
|  4|  tony|2019-07-20 17:09:...|
|  5|  eric|2019-07-20 17:09:...|
|  6|  king|2019-07-20 17:42:...|
|  7|   tao|2019-07-20 17:45:...|
+---+------+--------------------+

清空緩存
scala> spark.sqlContext.clearCache

3.2 調優相關參數

將數據緩存到內存中的相關優化參數
?   spark.sql.inMemoryColumnarStorage.compressed
?   默認為 true
?   Spark SQL 將會基于統計信息自動地為每一列選擇一種壓縮編碼方式。

?   spark.sql.inMemoryColumnarStorage.batchSize
?   默認值:10000
?   緩存批處理大小。緩存數據時, 較大的批處理大小可以提高內存利用率和壓縮率,但同時也會帶來 OOM(Out Of Memory)的風險。

其他性能相關的配置選項(不過不推薦手動修改,可能在后續版本自動的自適應修改)
?   spark.sql.files.maxPartitionBytes
?   默認值:128 MB
?   讀取文件時單個分區可容納的最大字節數

?   spark.sql.files.openCostInBytes
?   默認值:4M
?   打開文件的估算成本, 按照同一時間能夠掃描的字節數來測量。當往一個分區寫入多個文件的時候會使用。高估更好, 這樣的話小文件分區將比大文件分區更快 (先被調度)。

spark.sql.autoBroadcastJoinThreshold
?   默認值:10M
?   用于配置一個表在執行 join 操作時能夠廣播給所有 worker 節點的最大字節大小。通過將這個值設置為 -1 可以禁用廣播。注意,當前數據統計僅支持已經運行了 ANALYZE TABLE <tableName> COMPUTE STATISTICS noscan 命令的 Hive Metastore 表。

spark.sql.shuffle.partitions
?   默認值:200
?   用于配置 join 或聚合操作混洗(shuffle)數據時使用的分區數。
向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

巴林左旗| 右玉县| 改则县| 宁化县| 济南市| 景德镇市| 高淳县| 西乡县| 南岸区| 洪泽县| 崇州市| 天祝| 静海县| 洪湖市| 临夏市| 红桥区| 蓬莱市| 开封市| 巫溪县| 孝感市| 南昌市| 宁陵县| 鄢陵县| 北海市| 新邵县| 临安市| 油尖旺区| 凤翔县| 化州市| 七台河市| 榆社县| 大方县| 高唐县| 依安县| 祁连县| 红原县| 安新县| 商水县| 子洲县| 共和县| 北辰区|