91超碰碰碰碰久久久久久综合_超碰av人澡人澡人澡人澡人掠_国产黄大片在线观看画质优化_txt小说免费全本

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

Spark?Streaming編程初級源碼分析

發布時間:2023-04-20 17:16:23 來源:億速云 閱讀:106 作者:iii 欄目:開發技術

這篇“Spark Streaming編程初級源碼分析”文章的知識點大部分人都不太理解,所以小編給大家總結了以下內容,內容詳細,步驟清晰,具有一定的借鑒價值,希望大家閱讀完這篇文章能有所收獲,下面我們一起來看看這篇“Spark Streaming編程初級源碼分析”文章吧。

寫在前面

  • Linux:CentOS7.5

  • Spark: spark-3.0.0-bin-hadoop3.2

  • Flume:Flume-1.9.0

  • IDE:IntelliJ IDEA2020.2.3

1. 安裝Flume

Flume是Cloudera提供的一個分布式、可靠、可用的系統,它能夠將不同數據源的海量日志數據進行高效收集、聚合、移動,最后存儲到一個中心化數據存儲系統中。Flume 的核心是把數據從數據源收集過來,再送到目的地。

或者也可以直接到本教程官網的“下載專區”中的“軟件”目錄中下載apache-flume-1.7.0-bin.tar.gz。

下載后,把Flume1.7.0安裝到Linux系統的“/usr/local/flume”目錄下,具體安裝和使用方法可以參考教程官網的“實驗指南”欄目中的“日志采集工具Flume的安裝與使用方法。

安裝命令

tar -zxvf apache-flume-1.9.0-bin.tar.gz -C /export/server/
mv apache-flume-1.9.0-bin/ flume-1.9.0
sudo vi /etc/profile
export FLUME_HOME=/usr/local/flume
export PATH=$PATH:$FLUME_HOME/bin
source /etc/profile
mv flume-env.sh.template flume-env.sh
  • 查看版本號

bin/flume-ng version

Spark?Streaming編程初級源碼分析

2.使用Avro數據源測試Flume

題目描述

Avro可以發送一個給定的文件給Flume,Avro 源使用AVRO RPC機制。請對Flume的相關配置文件進行設置,從而可以實現如下功能:在一個終端中新建一個文件helloworld.txt(里面包含一行文本“Hello World”),在另外一個終端中啟動Flume以后,可以把helloworld.txt中的文本內容顯示出來。

Flume配置文件

al.sources = r1
a1.sinks = k1
a1.channels = c1
a1.sources.r1.type = avro
a1.sources.r1.channels= c1
a1.sources.r1.bind = 0.0.0.0
al.sources.r1.port = 4141
a1.sinks.k1.type = logger
a1.channels.c1.type = memory
al.channels.c1.capacity = 1000
a1.channels.c1.transaction = 100
al.sources.r1.channels = c1
a1.sinks.k1.channel=c1

執行命令

  • 先進入到Flume安裝目錄,執行以下第一行命令;

  • 開始新的一個會話窗口,執行第二行命令寫入數據到指定的文件中

  • 查看上一步驟中指定的文件內容

./bin/flume-ng agent -c . -f ./conf/avro.conf -n a1 -Dflume.root.logger=INFO,console
echo 'hello,world' >> ./log.00
bin/flume-ng avro-client --conf conf -H localhost -p 4141 -F ./log.00

執行結果如下

Spark?Streaming編程初級源碼分析

3. 使用netcat數據源測試Flume

題目描述

請對Flume的相關配置文件進行設置,從而可以實現如下功能:在一個Linux終端(這里稱為“Flume終端”)中,啟動Flume,在另一個終端(這里稱為“Telnet終端”)中,輸入命令“telnet localhost 44444”,然后,在Telnet終端中輸入任何字符,讓這些字符可以順利地在Flume終端中顯示出來。

編寫Flume配置文件

al.sources = r1
a1.sinks = k1
a1.channels = c1
al.sources.r1.type = netcat
al.sources.r1.channels = c1
a1.sources.r1.bind = localhost
al.sources.r1.port = 44444
a1.sinks.k1.type = logger
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
al.channels.c1.transaction = 100
al.sources.r1.channels = c1
a1.sinks.k1.channel = c1
  • 執行以下命令

./bin/flume-ng agent -c . -f ./netcatExample.conf -n a1 -Dflume.root.logger=INFO,console
telnet localhost 44444
  • 會話窗口成功得到數據

Spark?Streaming編程初級源碼分析

4. 使用Flume作為Spark Streaming數據源

題目描述

Flume是非常流行的日志采集系統,可以作為Spark Streaming的高級數據源。請把Flume Source設置為netcat類型,從終端上不斷給Flume Source發送各種消息,Flume把消息匯集到Sink,這里把Sink類型設置為avro,由Sink把消息推送給Spark Streaming,由自己編寫的Spark Streaming應用程序對消息進行處理。

編寫Flume配置文件

al.sources = r1
a1.sinks = k1
a1.channels =  c1
al.sources.r1.type = netcat
al.sources.r1.bind = localhost
a1.sources.r1.port = 33333
a1.sinks.k1.type = avro
al.sinks.k1.hostname = localhost
a1.sinks.k1.port = 44444
a1.channels.c1.type = memory
al.channels.c1.capacity = 1000000
a1.channels.c1.transactionCapacity = 1000000
al.sources.r1.channels = c1
a1.sinks.k1.channel = c1

主程序代碼

import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming._
import org.apache.spark.streaming.Milliseconds
import org.apache.spark.streaming.flume._
import org.apache.spark.util.IntParam
object FlumeEventCount {
    def main(args: Array[String]): Unit = {
        if (args.length < 2) {
            System.err.println( "Usage: FlumeEventCount <host> <port>")
            System.exit(1)
        }
        StreamingExamples.setStreamingLogLevels()
        val Array(host, IntParam(port)) = args
        val batchInterval = Milliseconds(2000)
        val sc = new SparkConf()
          .setAppName("FlumeEventCount")
//          .setMaster("local[2]")
        val ssc = new StreamingContext(sc, batchInterval)
        val stream = FlumeUtils.createStream(ssc, host, port, StorageLevel.MEMORY_ONLY_SER_2)
        stream.count().map(cnt => "Received " + cnt + " flume events." ).print()
        ssc.start()
        ssc.awaitTermination()
    }
}

執行結果1

Spark?Streaming編程初級源碼分析

import org.apache.log4j.{Level, Logger}
import org.apache.spark.internal.Logging
object StreamingExamples extends Logging {
    def setStreamingLogLevels(): Unit = {
        val log4jInitialized = Logger.getRootLogger.getAllAppenders.hasMoreElements
        if (!log4jInitialized) {
            logInfo("Setting log level to [WARN] for streaming example." + " To override add a custom log4j.properties to the classpath.")
            Logger.getRootLogger.setLevel(Level.WARN)
        }
    }
}

執行結果2

Spark?Streaming編程初級源碼分析

以上就是關于“Spark Streaming編程初級源碼分析”這篇文章的內容,相信大家都有了一定的了解,希望小編分享的內容對大家有幫助,若想了解更多相關的知識內容,請關注億速云行業資訊頻道。

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

虎林市| 甘泉县| 伊吾县| 乐东| 余庆县| 南郑县| 通江县| 华安县| 云安县| 绥德县| 花莲市| 星座| 侯马市| 陕西省| 定兴县| 新密市| 西平县| 辽中县| 密山市| 达日县| 东乌| 永福县| 临清市| 鄂托克前旗| 酒泉市| 满洲里市| 太保市| 东乡县| 二手房| 沁阳市| 富宁县| 于都县| 梓潼县| 基隆市| 宜兴市| 长沙县| 仁寿县| 大港区| 巴彦县| 土默特右旗| 静乐县|