91超碰碰碰碰久久久久久综合_超碰av人澡人澡人澡人澡人掠_国产黄大片在线观看画质优化_txt小说免费全本

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

spark 與flume 1.6.0的示例代碼

發布時間:2021-12-16 10:47:11 來源:億速云 閱讀:147 作者:小新 欄目:開發技術

小編給大家分享一下spark 與flume 1.6.0的示例代碼,希望大家閱讀完這篇文章之后都有所收獲,下面讓我們一起去探討吧!

package hgs.spark.streaming
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.Seconds
import org.apache.spark.streaming.flume.FlumeUtils
import org.apache.spark.storage.StorageLevel
import org.apache.spark.HashPartitioner
/*	pom.xml中加入如下配置
 * 	<dependency>
 
    		<groupId>org.apache.spark</groupId>
    		<artifactId>spark-streaming-flume_2.11</artifactId>
   			<version>2.1.0</version>
		</dependency>
		* 
		*/
/*flume的conf文件
a1.sources=r1
a1.sinks=k1
a1.channels=c1
a1.sources.r1.type=spooldir
a1.sources.r1.spoolDir=/home/logs
a1.sources.r1.fileHeader=true
a1.sinks.k1.type=avro
a1.sinks.k1.hostname= 192.168.1.9
a1.sinks.k1.port= 8888
a1.channels.c1.type=memory
a1.channels.c1.capacity=1000
a1.channels.c1.transactionCapacity=100
a1.sources.r1.channels=c1
a1.sinks.k1.channel=c1
#the command to start a agent
#bin/flume-ng agent -n $agent_name -c conf -f conf/flume-conf.properties.template
*/
object SparkStreamingFlumePush {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("flume-push").setMaster("local[2]");
    val sc = new SparkContext(conf)
    val ssc = new StreamingContext(sc,Seconds(5))
    ssc.checkpoint("d:\\checkpoint")
    val updateFunc=(iter:Iterator[(String,Seq[Int],Option[Int])])=>{
    //iter.flatMap(it=>Some(it._2.sum+it._3.getOrElse(0)).map((it._1,_)))//方式一
    //iter.flatMap{case(x,y,z)=>{Some(y.sum+z.getOrElse(0)).map((x,_))}}//方式二
    iter.flatMap(it=>Some(it._1,(it._2.sum.toInt+it._3.getOrElse(0))))//方式三
    }
    //總共有兩種獲取數據的方式,push和poll,這種是push即flume將數據推送給spark 該出的ip、port是spark的ip地址和port
    val rds = FlumeUtils.createStream(ssc, "192.168.1.9", 8888, StorageLevel.MEMORY_ONLY)
    val result = rds.flatMap(x=>(new String(x.event.getBody.array())).split(" "))
    .map(x=>(x,1))
    .updateStateByKey(updateFunc, new HashPartitioner(sc.defaultMinPartitions), true)
    
    result.print()
    
    ssc.start()
    ssc.awaitTermination()
    
    
  }
}
package hgs.spark.streaming
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.streaming.Seconds
import org.apache.spark.streaming.flume.FlumeUtils
import java.net.InetAddress
import java.net.InetSocketAddress
import org.apache.spark.storage.StorageLevel
import org.apache.spark.HashPartitioner
//spark支持1.6.0的flume版本
/*	pom.xml中加入如下配置
 * 	<dependency>
 
    		<groupId>org.apache.spark</groupId>
    		<artifactId>spark-streaming-flume_2.11</artifactId>
   			<version>2.1.0</version>
		</dependency>
		* 
		*/
/*
 * flume配置
a1.sources = r1
a1.sinks = k1
a1.channels = c1
a1.sources.r1.type=spooldir
a1.sources.r1.spoolDir = /home/logs
a1.sources.r1.fileHeader = true
a1.sinks.k1.type=org.apache.spark.streaming.flume.sink.SparkSink
a1.sinks.k1.hostname=192.168.6.129
a1.sinks.k1.port = 8888
a1.channels.c1.type=memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity=100
a1.sources.r1.channels=c1
a1.sinks.k1.channel = c1
#the command to start a agent
#bin/flume-ng agent -n $agent_name -c conf -f conf/flume-conf.properties.template
*/
//同時需要如下三個包 將三個包放到flume的classpath下面
/* groupId = org.apache.spark
 artifactId = spark-streaming-flume-sink_2.11
 version = 2.1.0
 
 groupId = org.scala-lang
 artifactId = scala-library
 version = 2.11.7
 
 groupId = org.apache.commons
 artifactId = commons-lang3
 version = 3.5*/
 
object SparkStreamingFlumePoll {
  def main(args: Array[String]): Unit = {
   val conf = new SparkConf().setAppName("flume-push").setMaster("local[2]");
   val sc = new SparkContext(conf)
   val ssc = new StreamingContext(sc,Seconds(5))
   ssc.checkpoint("d:\\checkpoint")
   val ipSeq =  Seq(new InetSocketAddress("192.168.6.129",8888))
   //這種方式通過spark從flume拉取數據
   val rds = FlumeUtils.createPollingStream(ssc, ipSeq, StorageLevel.MEMORY_AND_DISK)
   
   val updateFunc=(iter:Iterator[(String,Seq[Int],Option[Int])])=>{
    //iter.flatMap(it=>Some(it._2.sum+it._3.getOrElse(0)).map((it._1,_)))//方式一
    //iter.flatMap{case(x,y,z)=>{Some(y.sum+z.getOrElse(0)).map((x,_))}}//方式二
    iter.flatMap(it=>Some(it._1,(it._2.sum.toInt+it._3.getOrElse(0))))//方式三
    }
   val result = rds.flatMap(x=>(new String(x.event.getBody.array())).split(" "))
    .map(x=>(x,1))
    .updateStateByKey(updateFunc, new HashPartitioner(sc.defaultMinPartitions), true)
    
    result.print()
    
    ssc.start()
    ssc.awaitTermination()
  }
}
//遇到的錯誤 scala-library包在flume 的lib下面本來就有,包重復導致的沖突,刪除一個
/*18 Oct 2018 20:58:32,123 WARN  [Spark Sink Processor Thread - 10] (org.apache.spark.streaming.flume.sink.Logging$class.logWarning:80)  - Error while processing transaction.
java.lang.IllegalStateException: begin() called when transaction is OPEN!
	at com.google.common.base.Preconditions.checkState(Preconditions.java:145)
	at org.apache.flume.channel.BasicTransactionSemantics.begin(BasicTransactionSemantics.java:131)
	at org.apache.spark.streaming.flume.sink.TransactionProcessor$$anonfun$populateEvents$1.apply(TransactionProcessor.scala:114)
	at org.apache.spark.streaming.flume.sink.TransactionProcessor$$anonfun$populateEvents$1.apply(TransactionProcessor.scala:113)
	at scala.Option.foreach(Option.scala:236)
	at org.apache.spark.streaming.flume.sink.TransactionProcessor.populateEvents(TransactionProcessor.scala:113)
	at org.apache.spark.streaming.flume.sink.TransactionProcessor.call(TransactionProcessor.scala:243)
	at org.apache.spark.streaming.flume.sink.TransactionProcessor.call(TransactionProcessor.scala:43)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
18 Oct 2018 20:58:32,128 WARN  [Spark Sink Processor Thread - 10] (org.apache.spark.streaming.flume.sink.Logging$class.logWarning:59)  - Spark was unable to successfully process the events. Transaction is being rolled back.
18 Oct 2018 20:58:32,128 WARN  [New I/O  worker #1] (org.apache.spark.streaming.flume.sink.Logging$class.logWarning:59)  - Received an error batch - no events were received from channel! */

看完了這篇文章,相信你對“spark 與flume 1.6.0的示例代碼”有了一定的了解,如果想了解更多相關知識,歡迎關注億速云行業資訊頻道,感謝各位的閱讀!

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

繁峙县| 大余县| 尼勒克县| 安岳县| 志丹县| 南投市| 涿鹿县| 油尖旺区| 铅山县| 霍邱县| 西华县| 株洲县| 龙口市| 柳河县| 宁城县| 常熟市| 牡丹江市| 郎溪县| 章丘市| 台山市| 南阳市| 舒城县| 建昌县| 蒙阴县| 邮箱| 岳普湖县| 合山市| 遂宁市| 三门县| 南华县| 东乌珠穆沁旗| 长丰县| 山东省| 涡阳县| 买车| 高州市| 新津县| 尉犁县| 石景山区| 界首市| 同仁县|