Many readers don't fully understand the points covered in this article on "how to implement a Spark job", so I have put together the following write-up. It is detailed, the steps are clear, and it should serve as a useful reference; I hope you get something out of it. Let's take a look at this "how to implement a Spark job" article.
Task: send the data in sample.log to Kafka, process it with Spark Streaming, and transform each record into the following format:

commandid | houseid | gathertime | srcip | destip | srcport | destport | domainname | proxytype | proxyip | proxytype | title | content | url | logid

then send the result to another Kafka topic.

Requirements:
1. sample.log => read the file and send its lines to a Kafka topic
2. Consume that topic (the 0.10 API does not manage offsets for you) and change the record format
3. Send the processed records to another Kafka topic

Approach:
1. Use the Redis utility class from the course to manage offsets
2. Read the log file and send its lines to topic1
3. Consume the topic, change the field delimiter to a vertical bar, and send the result to topic2 (a short sketch of this delimiter change follows below)
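A minimal sketch of what the delimiter change amounts to. The line below simply uses the 15 field names as placeholder values; real records in sample.log carry actual values and may also contain the <<<!>>> marker that the job strips out:

// Placeholder record: the 15 field names stand in for real values
val raw = "commandid,houseid,gathertime,srcip,destip,srcport,destport," +
  "domainname,proxytype,proxyip,proxytype,title,content,url,logid"

// The transformation the streaming job performs: split on commas, rejoin with pipes
val converted = raw.split(",").mkString("|")
// converted == "commandid|houseid|gathertime|...|logid"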
1. OffsetsWithRedisUtils
package home.one

import java.util

import org.apache.kafka.common.TopicPartition
import org.apache.spark.streaming.kafka010.OffsetRange
import redis.clients.jedis.{Jedis, JedisPool, JedisPoolConfig}

import scala.collection.mutable

object OffsetsWithRedisUtils {
  // Redis connection parameters
  private val redisHost = "linux123"
  private val redisPort = 6379

  // Build the Redis connection pool
  private val config = new JedisPoolConfig
  // Maximum number of idle connections
  config.setMaxIdle(5)
  // Maximum number of connections
  config.setMaxTotal(10)

  private val pool = new JedisPool(config, redisHost, redisPort, 10000)

  private def getRedisConnection: Jedis = pool.getResource

  private val topicPrefix = "kafka:topic"

  // Key layout: kafka:topic:TopicName:groupid
  private def getKey(topic: String, groupid: String) = s"$topicPrefix:$topic:$groupid"

  // Read the offsets for the given topics and group from Redis
  def getOffsetsFromRedis(topics: Array[String], groupId: String): Map[TopicPartition, Long] = {
    val jedis: Jedis = getRedisConnection
    val offsets: Array[mutable.Map[TopicPartition, Long]] = topics.map { topic =>
      val key = getKey(topic, groupId)

      import scala.collection.JavaConverters._
      // hgetAll returns a Java map of {partition -> offset}; convert it to a Scala map
      jedis.hgetAll(key)
        .asScala
        .map { case (partition, offset) => new TopicPartition(topic, partition.toInt) -> offset.toLong }
    }
    // Return the connection to the pool
    jedis.close()

    offsets.flatten.toMap
  }

  // Save the offsets to Redis
  def saveOffsetsToRedis(offsets: Array[OffsetRange], groupId: String): Unit = {
    // Get a connection
    val jedis: Jedis = getRedisConnection

    // Organize the data: one hash per topic, field = partition, value = untilOffset
    offsets.map { range => (range.topic, (range.partition.toString, range.untilOffset.toString)) }
      .groupBy(_._1)
      .foreach { case (topic, buffer) =>
        val key: String = getKey(topic, groupId)

        import scala.collection.JavaConverters._
        // Convert the Scala map back to a Java map before writing it to Redis
        val maps: util.Map[String, String] = buffer.map(_._2).toMap.asJava

        // Save the hash
        jedis.hmset(key, maps)
      }

    jedis.close()
  }
}
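Once the streaming job (shown later) has processed a batch, the stored offsets can be inspected directly. A minimal sketch using Jedis, assuming the same Redis host/port as above and the topic/group names used later; the printed partition/offset values will of course depend on your data:

import redis.clients.jedis.Jedis

val jedis = new Jedis("linux123", 6379)
// Prints something like {0=12345}: partition -> last saved untilOffset (illustrative value)
println(jedis.hgetAll("kafka:topic:mytopic1:mygroup1"))
jedis.close()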
2. KafkaProducer
package home.one

import java.util.Properties

import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
import org.apache.kafka.common.serialization.StringSerializer
import org.apache.log4j.{Level, Logger}
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object KafkaProducer {
  def main(args: Array[String]): Unit = {
    Logger.getLogger("org").setLevel(Level.ERROR)
    val conf = new SparkConf().setAppName(this.getClass.getCanonicalName.init).setMaster("local[*]")
    val sc = new SparkContext(conf)

    // Read the sample.log file
    val lines: RDD[String] = sc.textFile("data/sample.log")

    // Kafka producer parameters
    val prop = new Properties()
    // Kafka broker address
    prop.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "linux121:9092")
    // Key and value serializers
    prop.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer])
    prop.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer])

    // Send the lines to mytopic1; the producer is created inside foreachPartition
    // because KafkaProducer is not serializable and must be built on the executors
    lines.foreachPartition { iter =>
      // Initialize one KafkaProducer per partition
      val producer = new KafkaProducer[String, String](prop)

      iter.foreach { line =>
        // Wrap the line in a ProducerRecord
        val record = new ProducerRecord[String, String]("mytopic1", line)
        // Send it
        producer.send(record)
      }
      producer.close()
    }
  }
}
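After running this producer job, it is worth confirming that the lines actually landed in mytopic1. Assuming a standard Kafka installation on the broker host (the script path depends on your setup), the console consumer can be used; the same command with --topic mytopic2 verifies the output topic once the streaming job below is running:

# adjust the path to your Kafka installation
kafka-console-consumer.sh --bootstrap-server linux121:9092 --topic mytopic1 --from-beginning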
3. HomeOne
package home.one

import java.util.Properties

import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord}
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
import org.apache.kafka.common.serialization.{StringDeserializer, StringSerializer}
import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.{Seconds, StreamingContext}

object HomeOne {
  val log = Logger.getLogger(this.getClass)

  def main(args: Array[String]): Unit = {
    Logger.getLogger("org").setLevel(Level.ERROR)
    val conf = new SparkConf().setAppName(this.getClass.getCanonicalName).setMaster("local[*]")
    val ssc = new StreamingContext(conf, Seconds(5))

    // Topic to consume
    val topics: Array[String] = Array("mytopic1")
    val groupid = "mygroup1"

    // Kafka consumer parameters
    val kafkaParams: Map[String, Object] = getKafkaConsumerParameters(groupid)

    // Read the previously saved offsets from Redis
    val fromOffsets = OffsetsWithRedisUtils.getOffsetsFromRedis(topics, groupid)

    // Create the direct DStream, starting from the offsets stored in Redis
    val dstream: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream(
      ssc,
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.Subscribe[String, String](topics, kafkaParams, fromOffsets)
    )

    // Transform each batch and forward it to the second topic
    dstream.foreachRDD { rdd =>
      if (!rdd.isEmpty) {
        // Capture the offset ranges of this batch
        val offsetRanges: Array[OffsetRange] = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

        // Transform the records and send them to mytopic2
        rdd.foreachPartition(process)

        // Save the offsets to Redis only after the batch has been processed
        OffsetsWithRedisUtils.saveOffsetsToRedis(offsetRanges, groupid)
      }
    }

    // Start the job
    ssc.start()
    // Keep it running
    ssc.awaitTermination()
  }

  // Transform the records of one partition and send them to mytopic2
  def process(iter: Iterator[ConsumerRecord[String, String]]): Unit = {
    iter.map(record => parse(record.value))
      .filter(_.nonEmpty)
      .foreach(line => sendMsg2Topic(line, "mytopic2"))
  }

  // Send one message with a Kafka producer
  // (creating a producer per message keeps the example simple but is expensive;
  // reusing one producer per partition would be the more efficient choice)
  def sendMsg2Topic(msg: String, topic: String): Unit = {
    val producer = new KafkaProducer[String, String](getKafkaProducerParameters())
    val record = new ProducerRecord[String, String](topic, msg)
    producer.send(record)
    producer.close()
  }

  // Change the record format: comma-separated fields become pipe-separated
  def parse(text: String): String = {
    try {
      val arr = text.replace("<<<!>>>", "").split(",")
      if (arr.length != 15) return ""
      arr.mkString("|")
    } catch {
      case e: Exception =>
        log.error("Failed to parse record!", e)
        ""
    }
  }

  // Kafka consumer configuration
  def getKafkaConsumerParameters(groupid: String): Map[String, Object] = {
    Map[String, Object](
      ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "linux121:9092",
      ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
      ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
      ConsumerConfig.GROUP_ID_CONFIG -> groupid,
      ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG -> (false: java.lang.Boolean)
    )
  }

  // Kafka producer configuration
  def getKafkaProducerParameters(): Properties = {
    val prop = new Properties()
    prop.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "linux121:9092")
    prop.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer])
    prop.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer])
    prop
  }
}
/*
  Suppose we have the following airports:
    1, "SFO"
    2, "ORD"
    3, "DFW"
  and the following routes and distances between pairs of airports:
    1, 2, 1800
    2, 3, 800
    3, 1, 1400

  Use GraphX to:
    - list all vertices
    - list all edges
    - list all triplets
    - count the vertices
    - count the edges
    - find how many routes are longer than 1000, and which ones they are
    - sort all routes by distance in descending order and print the result
*/
Code:
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.graphx.{Edge, Graph, VertexId}
import org.apache.spark.rdd.RDD

object TwoHome {
  def main(args: Array[String]): Unit = {
    // Initialize Spark
    val conf = new SparkConf().setAppName(this.getClass.getCanonicalName.init).setMaster("local[*]")
    val sc = new SparkContext(conf)
    sc.setLogLevel("WARN")

    // Input data
    val vertexArray: Array[(Long, String)] = Array((1L, "SFO"), (2L, "ORD"), (3L, "DFW"))
    val edgeArray: Array[Edge[Int]] = Array(
      Edge(1L, 2L, 1800),
      Edge(2L, 3L, 800),
      Edge(3L, 1L, 1400)
    )

    // Build the vertex RDD and edge RDD
    val vertexRDD: RDD[(VertexId, String)] = sc.makeRDD(vertexArray)
    val edgeRDD: RDD[Edge[Int]] = sc.makeRDD(edgeArray)

    // Build the graph
    val graph: Graph[String, Int] = Graph(vertexRDD, edgeRDD)

    // All vertices
    println("All vertices:")
    graph.vertices.foreach(println)

    // All edges
    println("All edges:")
    graph.edges.foreach(println)

    // All triplets
    println("All triplets:")
    graph.triplets.foreach(println)

    // Vertex count
    val vertexCnt = graph.vertices.count()
    println(s"Vertex count: $vertexCnt")

    // Edge count
    val edgeCnt = graph.edges.count()
    println(s"Edge count: $edgeCnt")

    // Routes longer than 1000
    println("Routes longer than 1000:")
    graph.edges.filter(_.attr > 1000).foreach(println)

    // All routes sorted by distance, descending
    println("All routes sorted by distance (descending):")
    graph.edges.sortBy(-_.attr).collect().foreach(println)
  }
}
Running results
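No program output is reproduced in the text. Based on the input data, it should look roughly like the following; note that the print order of foreach over an RDD is not deterministic, so lines from different partitions may interleave, and the exact toString formatting may vary slightly by Spark version:

All vertices:
(1,SFO)
(2,ORD)
(3,DFW)
All edges:
Edge(1,2,1800)
Edge(2,3,800)
Edge(3,1,1400)
All triplets:
((1,SFO),(2,ORD),1800)
((2,ORD),(3,DFW),800)
((3,DFW),(1,SFO),1400)
Vertex count: 3
Edge count: 3
Routes longer than 1000:
Edge(1,2,1800)
Edge(3,1,1400)
All routes sorted by distance (descending):
Edge(1,2,1800)
Edge(3,1,1400)
Edge(2,3,800)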
That is the content of this "how to implement a Spark job" article. Hopefully you now have a general understanding of it and found the material shared here useful. For more related content, follow the 億速云 industry news channel.