91超碰碰碰碰久久久久久综合_超碰av人澡人澡人澡人澡人掠_国产黄大片在线观看画质优化_txt小说免费全本

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

如何實現Spark Streaming和Kafka整合

發布時間:2021-12-15 10:02:34 來源:億速云 閱讀:134 作者:柒染 欄目:云計算

本篇文章為大家展示了如何實現Spark Streaming和Kafka整合,內容簡明扼要并且容易理解,絕對能使你眼前一亮,通過這篇文章的詳細介紹希望你能有所收獲。

最近完成了Spark Streaming和Kafka的整合工作,耗時雖然不長,但是當中還是遇到了不少的坑,記錄下來,大家方便繞行。

先說一下環境:

Spark 2.0.0    kafka_2.11-0.10.0.0

之前的項目當中,已經在pom當中添加了需要的Spark Streaming的依賴,這次只需要添加Spark Streaming Kafka的以來就行了,問題來了。首先是我之前添加的Spark Streaming的依賴:

    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-streaming_2.11</artifactId>
      <version>2.0.0</version>
    </dependency>

然后是找到的spark streaming對kafka的支持依賴:

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-kafka_2.11</artifactId>
    <version>1.6.2</version>
</dependency>

請注意2個version部分,好像差的有點多。不管了,照著例子寫寫看,果然報了各種class not found的錯誤。基本可以判斷是版本差異造成的問題。

可是,在http://mvnrepository.com上找不到更高版本的依賴怎么辦呢?

考慮了一下,只有一個辦法了,下載spark源碼,自行編譯打包需要的jar包。

在github上找到spark項目,clone下來,懶病又犯了,也沒仔細看當中的說明,直接就clean compile等等。結果又是各種報錯。好吧,好好看看吧,github上給了個地址:http://spark.apache.org/docs/latest/building-spark.html,照著做就沒問題了。

然后把項目當中pom里面對streaming kafka的依賴刪掉,引入我們自己生成的jar包:

spark-streaming-kafka-0-10_2.11-2.1.0-SNAPSHOT.jar

然后貼上代碼:

    val conf = new SparkConf().setAppName("kafkastream").setMaster("spark://master:7077").
      set("spark.driver.host", "192.168.1.142").
      setJars(List("/src/git/msgstream/out/artifacts/msgstream_jar/msgstream.jar",
        "/src/git/msgstream/lib/kafka-clients-0.10.0.0.jar",
        "/src/git/msgstream/lib/kafka_2.11-0.10.0.0.jar",
        "/src/git/msgstream/lib/spark-streaming-kafka-0-10_2.11-2.1.0-SNAPSHOT.jar"))
    val ssc = new StreamingContext(conf, Seconds(2))

    val topics = List("woozoom")
    val kafkaParams = Map(("bootstrap.servers", "master:9092,slave01:9092,slave02:9092"),
      ("group.id", "sparkstreaming"), ("key.deserializer", classOf[StringDeserializer]),
      ("value.deserializer", classOf[StringDeserializer]))
    val preferredHosts = LocationStrategies.PreferConsistent
    val offsets = Map(new TopicPartition("woozoom", 0) -> 2L)

    val lines = KafkaUtils.createDirectStream[String, String](
      ssc,
      preferredHosts,
      ConsumerStrategies.Subscribe[String, String](topics, kafkaParams, offsets))

    lines.foreachRDD(rdd => {
      rdd.foreach(x => {
        println(x)
      })
    })

    ssc.start()
    ssc.awaitTermination()

上面標紅的部分,是需要注意的,而這些本來我也是不會寫的,后來去到spark源碼找到test代碼

/src/git/spark/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/DirectKafkaStreamSuite.scala

測試,通過!!!

上述內容就是如何實現Spark Streaming和Kafka整合,你們學到知識或技能了嗎?如果還想學到更多技能或者豐富自己的知識儲備,歡迎關注億速云行業資訊頻道。

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

太仆寺旗| 涟源市| 常州市| 兰州市| 涿鹿县| 葫芦岛市| 顺昌县| 建昌县| 邯郸市| 会东县| 大方县| 汉川市| 吉林省| 囊谦县| 南澳县| 闻喜县| 嘉峪关市| 汉川市| 耿马| 肇州县| 惠水县| 金湖县| 武汉市| 兴海县| 思南县| 锦屏县| 大关县| 牟定县| 崇信县| 曲松县| 方城县| 固始县| 鸡泽县| 大埔区| 天等县| 万安县| 高阳县| 五指山市| 汉寿县| 新余市| 成安县|