您好,登錄后才能下訂單哦!
本篇文章為大家展示了Pyspark處理數據中帶有列分隔符的數據集,內容簡明扼要并且容易理解,絕對能使你眼前一亮,通過這篇文章的詳細介紹希望你能有所收獲。
下面主要內容是處理在數據集中存在列分隔符或分隔符的特殊場景。對于Pyspark開發人員來說,處理這種類型的數據集有時是一件令人頭疼的事情,但無論如何都必須處理它。
數據集基本上如下所示:
#first line is the headerNAME|AGE|DEP
Vivek|Chaudhary|32|BSC
John|Morgan|30|BE
Ashwin|Rao|30|BE
數據集包含三個列" Name ", " AGE ", " DEP ",用分隔符" | "分隔。如果我們關注數據集,它也包含' | '列名。
讓我們看看如何進行下一步:
步驟1。使用spark的Read .csv()方法讀取數據集:
#create spark session
import pyspark
from pyspark.sql import SparkSession
spark=SparkSession.builder.appName(‘delimit’).getOrCreate()
上面的命令幫助我們連接到spark環境,并讓我們使用spark.read.csv()讀取數據集
#create
df=spark.read.option(‘delimiter’,’|’).csv(r’<path>/delimit_data.txt’,inferSchema=True,header=True)
df.show()
從文件中讀取數據并將數據放入內存后我們發現,最后一列數據在哪里,列年齡必須有一個整數數據類型,但是我們看到了一些其他的東西。這不是我們所期望的。一團糟,完全不匹配,不是嗎?答案是肯定的,確實一團糟。
現在,讓我們來學習如何解決這個問題。
步驟2。再次讀取數據,但這次使用Read .text()方法:
df=spark.read.text(r’/Python_Pyspark_Corp_Training/delimit_data.txt’)
df.show(truncate=0)
#extract first row as this is our header
head=df.first()[0]
schema=[‘fname’,’lname’,’age’,’dep’]
print(schema)
Output: ['fname', 'lname', 'age', 'dep']
下一步是根據列分隔符對數據集進行分割:
#filter the header, separate the columns and apply the schema
df_new=df.filter(df[‘value’]!=head).rdd.map(lambda x:x[0].split(‘|’)).toDF(schema)
df_new.show()
現在,我們已經成功分離出列。
我們已經成功地將“|”分隔的列(“name”)數據分成兩列。現在,數據更加干凈,可以輕松地使用。
接下來,連接列“fname”和“lname”:
from pyspark.sql.functions import concat, col, lit
df1=df_new.withColumn(‘fullname’,concat(col(‘fname’),lit(“|”),col(‘lname’)))
df1.show()
要驗證數據轉換,我們將把轉換后的數據集寫入CSV文件,然后使用read. CSV()方法讀取它。
df1.write.option(‘sep’,’|’).mode(‘overwrite’).option(‘header’,’true’).csv(r’<file_path>\cust_sep.csv’)
下一步是數據驗證:
df=spark.read.option(‘delimiter’,’|’).csv(r<filepath>,inferSchema=True,header=True)
df.show()
現在的數據看起來像我們想要的那樣。
上述內容就是Pyspark處理數據中帶有列分隔符的數據集,你們學到知識或技能了嗎?如果還想學到更多技能或者豐富自己的知識儲備,歡迎關注億速云行業資訊頻道。
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。