您好,登錄后才能下訂單哦!
本篇文章為大家展示了spark 流式去重的示例分析,內容簡明扼要并且容易理解,絕對能使你眼前一亮,通過這篇文章的詳細介紹希望你能有所收獲。
大數據去重本身很蛋疼,針對個別數據去重更是不可理喻但是spark的Structured Streaming就很容易能實現這個功能。
數據從采集到最終處理結束是會存在一條數據在某一個點被重復接收處理的情況。如 kafka支持的是至少一次寫語義,也即是當寫數據到kafka的時候,有些記錄可能重復,例如如果消息已經被broker接收并寫入文件但是并沒有應答,這時生產者向kafka重發一個消息,就可能重復。由于kafka的至少一次的寫語義,structured streaming不能避免這種類型數據重復。所以一旦寫入成功,可以假設structured Streaming的查詢輸出是以至少一次語義寫入kafka的。一個可行去除重復記錄的解決方案是數據中引入一個primary(unique)key,這樣就可以在讀取數據的時候實行去重。
structured streaming是可以使用事件中的唯一標識符對數據流中的記錄進行重復數據刪除。這與使用唯一標識符列的靜態重復數據刪除完全相同。該查詢將存儲來自先前記錄的一定量的數據,以便可以過濾重復的記錄。與聚合類似,可以使用帶有或不帶有watermark 的重復數據刪除功能。
A),帶watermark:如果重復記錄可能到達的時間有上限,則可以在事件時間列上定義watermark,并使用guid和事件時間列進行重復數據刪除。
B),不帶watermark:由于重復記錄可能到達時間沒有界限,所以查詢將來自所有過去記錄的數據存儲為狀態。
源代碼,已測試通過~
package bigdata.spark.StructuredStreaming.KafkaSourceOperator
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.get_json_object
import org.apache.spark.sql.streaming.{OutputMode, Trigger}
object KafkaDropDuplicate {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setAppName(this.getClass.getName).setMaster("local[*]")
.set("yarn.resourcemanager.hostname", "mt-mdh.local")
.set("spark.executor.instances","2")
.set("spark.default.parallelism","4")
.set("spark.sql.shuffle.partitions","4")
.setJars(List("/opt/sparkjar/bigdata.jar"
,"/opt/jars/spark-streaming-kafka-0-10_2.11-2.3.1.jar"
,"/opt/jars/kafka-clients-0.10.2.2.jar"
,"/opt/jars/kafka_2.11-0.10.2.2.jar"
,"/opt/jars/spark-sql-kafka-0-10_2.11-2.0.2.jar"))
val spark = SparkSession
.builder
.appName("StructuredKafkaWordCount")
.config(sparkConf)
.getOrCreate()
import spark.implicits._
val df = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers","mt-mdh.local:9093")
.option("subscribe", "jsontest")
.load()
val words = df.selectExpr("CAST(value AS STRING)")
val fruit = words.select(
get_json_object($"value", "$.time").alias("timestamp").cast("long")
, get_json_object($"value", "$.fruit").alias("fruit"))
val fruitCast = fruit
.select(fruit("timestamp")
.cast("timestamp"),fruit("fruit"))
.withWatermark("timestamp", "10 Seconds")
.dropDuplicates("fruit")
.groupBy("fruit").count()
fruitCast.writeStream
.outputMode(OutputMode.Complete())
.format("console")
.trigger(Trigger.ProcessingTime(5000))
.option("truncate","false")
.start()
.awaitTermination()
}
}
上述內容就是spark 流式去重的示例分析,你們學到知識或技能了嗎?如果還想學到更多技能或者豐富自己的知識儲備,歡迎關注億速云行業資訊頻道。
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。