您好,登錄后才能下訂單哦!
這篇文章主要講解了“SparkStreaming的實現和使用方法”,文中的講解內容簡單清晰,易于學習與理解,下面請大家跟著小編的思路慢慢深入,一起來研究和學習“SparkStreaming的實現和使用方法”吧!
生產中使用多的是一個文件中有很多域名,另一個中是黑名單,要進行剔除 數據一:日志信息 DStream domain,traffic xinlang.com xinlang.com baidu.com 數據二:已有的文件 黑名單 RDD domain baidu.com
package sparkstreaming02 import org.apache.spark.{SparkConf, SparkContext} import scala.collection.mutable.ListBuffer object Demo1 { def main(args: Array[String]): Unit = { val conf = new SparkConf().setAppName("Demo1").setMaster("local[2]") val sc = new SparkContext(conf) val input1 = new ListBuffer[(String,Long)] input1.append(("www.xinlang.com", 8888)) input1.append(("www.xinalng.com", 9999)) input1.append(("www.baidu.com", 7777)) val data1 = sc.parallelize(input1) //進行join一定要是key,value形式的 val input2 = new ListBuffer[(String,Boolean)] input2.append(("www.baidu.com",true)) val data2 = sc.parallelize(input2) data1.leftOuterJoin(data2) .filter(x => { x._2._2.getOrElse(false) != true }).map(x => (x._1,x._2._1)) .collect().foreach(println) } }
package sparkstreaming02 import org.apache.spark.SparkConf import org.apache.spark.streaming.{Seconds, StreamingContext} import scala.collection.mutable.ListBuffer object Streaming { def main(args: Array[String]): Unit = { val conf = new SparkConf().setAppName("Streaming").setMaster("local[2]") val ssc = new StreamingContext(conf,Seconds(10)) val lines = ssc.socketTextStream("s201",9999) // 數據二: rdd val input2 = new ListBuffer[(String,Boolean)] input2.append(("www.baidu.com",true)) val data2 = ssc.sparkContext.parallelize(input2) lines.map(x=>(x.split(",")(0), x)).transform( rdd => { rdd.leftOuterJoin(data2) .filter(x => { x._2._2.getOrElse(false) != true //注意 join之后過濾 }).map(x => (x._1,x._2._1)) } ).print() ssc.start() ssc.awaitTermination() } }
、
connect 在Driver端創建,record在executor,發過去序列化錯誤
解決:第一種把connect放到executor端 這樣弊端是每條記錄會生成一個connect太耗費資源 words.foreachRDD { rdd => rdd.foreach { record => val connection = createConnection() // executed at the driver val word = record._1 val count = record._2.toInt val sql = s"insert into wc (wc,count) values($word,$count)" connection.createStatement().execute(sql) }
package sparkstreaming02 import java.sql.DriverManager import org.apache.spark.SparkConf import org.apache.spark.streaming.{Seconds, StreamingContext} object MysqlStreaming { def main(args: Array[String]): Unit = { val conf = new SparkConf().setMaster("local[2]").setAppName("MysqlStreaming") val ssc = new StreamingContext(conf,Seconds(1)) val lines = ssc.socketTextStream("s201",9999) val words = lines.flatMap(x => x.split(",")).map((_,1)).reduceByKey(_+_) // words.foreachRDD { rdd => // val connection = createConnection() // executed at the driver // rdd.foreach { record => // val word = record._1 // val count = record._2 // val sql = s"insert into wc (word,count) values($word,$count)" // connection.createStatement().execute(sql) // } // } // words.foreachRDD { rdd => // rdd.foreach { record => // val connection = createConnection() // executed at the driver // val word = record._1 // val count = record._2.toInt // val sql = s"insert into wc (wc,count) values($word,$count)" // connection.createStatement().execute(sql) // } // } //最終的寫法 words.foreachRDD { rdd => rdd.foreachPartition { partitionOfRecords => val connection = createConnection() partitionOfRecords.foreach( record =>{ val word = record._1 val count = record._2 val sql = s"insert into wc (wc,count) values('$word',$count)" connection.createStatement().execute(sql)} ) } } ssc.start() ssc.awaitTermination() } def createConnection() = { Class.forName("com.mysql.cj.jdbc.Driver") DriverManager.getConnection("jdbc:mysql://localhost:3306/hive?serverTimezone=UTC&useSSL=false","root","123456") } }
錯誤,插入數據庫時,你要插入字符串要用'' 例如: val sql = s"insert into wc (wc,count) values($word,$count)" word是字符串,你要不加雙引號就報這個錯誤 正確 val sql = s"insert into wc (wc,count) values('$word',$count)"
感謝各位的閱讀,以上就是“SparkStreaming的實現和使用方法”的內容了,經過本文的學習后,相信大家對SparkStreaming的實現和使用方法這一問題有了更深刻的體會,具體使用情況還需要大家實踐驗證。這里是億速云,小編將為大家推送更多相關知識點的文章,歡迎關注!
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。