您好,登錄后才能下訂單哦!
今天就跟大家聊聊有關如何分析Spark SQL中的Parquet,可能很多人都不太了解,為了讓大家更加了解,小編給大家總結了以下內容,希望大家根據這篇文章可以有所收獲。
Parquet是一種列式存儲格式,很多種處理引擎都支持這種存儲格式,也是sparksql的默認存儲格式。Spark SQL支持靈活的讀和寫Parquet文件,并且對parquet文件的schema可以自動解析。當Spark SQL需要寫成Parquet文件時,處于兼容的原因所有的列都被自動轉化為了nullable。
讀寫Parquet文件
// Encoders for most common types are automatically provided by importing spark.implicits._
import spark.implicits._
val peopleDF = spark.read.json("examples/src/main/resources/people.json")
// DataFrames can be saved as Parquet files, maintaining the schema information
peopleDF.write.parquet("people.parquet")
// Read in the parquet file created above
// Parquet files are self-describing so the schema is preserved
// The result of loading a Parquet file is also a DataFrame
val parquetFileDF = spark.read.parquet("people.parquet")
// Parquet files can also be used to create a temporary view and then used in SQL statements
parquetFileDF.createOrReplaceTempView("parquetFile")
val namesDF = spark.sql("SELECT name FROM parquetFile WHERE age BETWEEN 13 AND 19")
namesDF.map(attributes => "Name: " + attributes(0)).show()
// +------------+
// | value|
// +------------+
// |Name: Justin|
// +------------+
分區發現
分區表時很多系統支持的,比如hive,對于一個分區表,往往是采用表中的某一或多個列去作為分區的依據,分區是以文件目錄的形式體現。所有內置的文件源(Text/CSV/JSON/ORC/Parquet)都支持自動的發現和推測分區信息。例如,我們想取兩個分區列,gender和country,先按照性別分區,再按照國家分區:
path└── to └── table ├── gender=male │ ├── ... │ │ │ ├── country=US │ │ └── data.parquet │ ├── country=CN │ │ └── data.parquet │ └── ... └── gender=female ├── ... │ ├── country=US │ └── data.parquet ├── country=CN │ └── data.parquet └── ...
SparkSession.read.parquet 或者 SparkSession.read.load讀取的目錄為path/to/table的時候,會自動從路徑下抽取分區信息。返回DataFrame的表結構為:
root|-- name: string (nullable = true)|-- age: long (nullable = true)|-- gender: string (nullable = true)|-- country: string (nullable = true)
細細分析一下你也會發現分區列的數據類型也是自動推斷的。當前支持的數據類型有,數字類型,date,timestamp和string類型。有時候用戶可能不希望自動推斷分區列的類型,這時候只需要將spark.sql.sources.partitionColumnTypeInference.enabled配置為false即可。如果分區列的類型推斷這個參數設置為了false,那么分區列的類型會被認為是string。
從spark 1.6開始,分區發現默認情況只會發現給定路徑下的分區。比如,上面的分區表,假如你講路徑path/to/table/gender=male傳遞給SparkSession.read.parquet 或者 SparkSession.read.load 那么gender不會被認為是分區列。如果想檢測到該分區,傳給spark的路徑應該是其父路徑也即是path/to/table/,這樣gender就會被認為是分區列。
schema合并
跟protocol buffer,avro,thrift一樣,parquet也支持schema演變升級。用戶可以在剛開始的時候創建簡單的schema,然后根據需要隨時擴展新的列。
Parquet 數據源支持自動檢測新作列并且會合并schema。
由于合并schema是一個相當耗費性能的操作,而且很多情況下都是不必要的,所以從spark 1.5開始就默認關閉掉該功能。有兩種配置開啟方式:
通過數據源option設置mergeSchema為true。
在全局sql配置中設置spark.sql.parquet.mergeSchema 為true.
// This is used to implicitly convert an RDD to a DataFrame.
import spark.implicits._
// Create a simple DataFrame, store into a partition directory
val squaresDF = spark.sparkContext.makeRDD(1 to 5).map(i => (i, i * i)).toDF("value", "square")
squaresDF.write.parquet("data/test_table/key=1")
// Create another DataFrame in a new partition directory,
// adding a new column and dropping an existing column
val cubesDF = spark.sparkContext.makeRDD(6 to 10).map(i => (i, i * i * i)).toDF("value", "cube")
cubesDF.write.parquet("data/test_table/key=2")
// Read the partitioned table
val mergedDF = spark.read.option("mergeSchema", "true").parquet("data/test_table")
mergedDF.printSchema()
// The final schema consists of all 3 columns in the Parquet files together
// with the partitioning column appeared in the partition directory paths
// root
// |-- value: int (nullable = true)
// |-- square: int (nullable = true)
// |-- cube: int (nullable = true)
// |-- key: int (nullable = true)
hive metastore Parquet表轉換
當讀寫hive metastore parquet格式表的時候,Spark SQL為了較好的性能會使用自己默認的parquet格式而不是采用hive SerDe。該行為是通過參數spark.sql.hive.convertMetastoreParquet空值,默認是true。
Hive和parquet兼容性
從表schema處理角度講hive和parquet有兩個主要的區別
hive是大小寫敏感的,但是parquet不是。
hive會講所有列視為nullable,但是nullability在parquet里有獨特的意義。
由于上面的原因,在將hive metastore parquet轉化為spark parquet表的時候,需要處理兼容一下hive的schema和parquet的schema。兼容處理的原則是:
有相同名字的字段必須要有相同的數據類型,忽略nullability。兼容處理的字段應該保持parquet側的數據類型,這樣就可以處理到nullability類型了。
兼容處理的schema應直接包含在hive元數據里的schema信息:
任何僅僅出現在parquet schema的字段將會被刪除
任何僅僅出現在hive 元數據里的字段將會被視為nullable。
元數據刷新
Spark SQL為了更好的性能會緩存parquet的元數據。當spark 讀取hive表的時候,schema一旦從hive轉化為spark sql的,就會被spark sql緩存,如果此時表的schema被hive或者其他外部工具更新,必須要手動的去刷新元數據,才能保證元數據的一致性。
spark.catalog.refreshTable("my_table")
配置
parquet的相關的參數可以通過setconf或者set key=value的形式配置。
spark.sql.parquet.binaryAsString 默認值是false。一些parquet生產系統,尤其是impala,hive和老版本的spark sql,不區分binary和string類型。該參數告訴spark 講binary數據當作字符串處理。
spark.sql.parquet.int96AsTimestamp 默認是true。有些parquet生產系統,尤其是parquet和hive,將timestamp翻譯成INT96.該參數會提示Spark SQL講INT96翻譯成timestamp。
spark.sql.parquet.compression.codec 默認是snappy。當寫parquet文件的時候設置壓縮格式。如果在option或者properties里配置了compression或者parquet.compression優先級依次是:compression,parquet.compression,spark.sql.parquet.compression.codec。支持的配置類型有:none,uncompressed,snappy,gzip,lzo,brotli,lz4,zstd。在hadoop2.9.0之前,zstd需要安裝ZstandardCodec,brotli需要安裝BrotliCodec。
spark.sql.parquet.filterPushdown 默認是true。設置為true代表開啟parquet下推執行優化。
spark.sql.hive.convertMetastoreParquet 默認是true。假如設置為false,spark sql會讀取hive parquet表的時候使用Hive SerDe,替代內置的。
spark.sql.parquet.mergeSchema 默認是false。當設置為true的時候,parquet數據源會合并讀取所有的parquet文件的schema,否則會從summary文件或者假如沒有summary文件的話隨機的選一些數據文件來合并schema。
spark.sql.parquet.writeLegacyFormat 默認是false。如果設置為true 數據會以spark 1.4和更早的版本的格式寫入。比如,decimal類型的值會被以apache parquet的fixed-length byte array格式寫出,該格式是其他系統例如hive,impala等使用的。如果是false,會使用parquet的新版格式。例如,decimals會以int-based格式寫出。如果spark sql要以parquet輸出并且結果會被不支持新格式的其他系統使用的話,需要設置為true。
看完上述內容,你們對如何分析Spark SQL中的Parquet有進一步的了解嗎?如果還想了解更多知識或者相關內容,請關注億速云行業資訊頻道,感謝大家的支持。
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。