#Spark Streaming Programming Guide#
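##Overview##
Spark Streaming is an extension of the core Spark API that enables high-throughput, fault-tolerant processing of live data streams.
Data can be ingested from many sources, such as Kafka, Flume, Twitter, ZeroMQ, or plain old TCP sockets, and processed with complex algorithms expressed through high-level functions such as map, reduce, join, and window. Finally, the processed data can be pushed out to filesystems, databases, and live dashboards. You can even apply Spark's built-in machine learning and graph processing algorithms to data streams.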
<img src="https://cache.yisu.com/upload/information/20210522/355/658120.png" />
Internally, it works as follows: Spark Streaming receives live input data streams and divides the data into batches, which are then processed by the Spark engine to generate the final stream of results in batches.
<img src="https://cache.yisu.com/upload/information/20210522/355/658121.png" />
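Spark Streaming provides a high-level abstraction called a discretized stream, or DStream, which represents a continuous stream of data. DStreams can be created either from input data streams coming from sources such as Kafka and Flume, or by applying high-level operations on other DStreams. Internally, a DStream is represented as a sequence of RDDs.
This guide shows you how to start writing Spark Streaming programs with DStreams. Spark Streaming programs can be written in Scala or Java; the code snippets in this guide are in Java, apart from the Spark shell example, which uses Scala.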
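##A Quick Example##
Before going into the details of how to write your own Spark Streaming program, let's take a quick look at what a simple one looks like. Say we want to count the number of words in text data received from a data server listening on a TCP socket. All you need to do is as follows.
First, we create a JavaStreamingContext object, which is the main entry point for all streaming functionality. Besides the Spark configuration, we specify that any DStream will be processed in 1-second batches.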
import java.util.Arrays;
import org.apache.spark.api.java.function.*;
import org.apache.spark.streaming.*;
import org.apache.spark.streaming.api.java.*;
import scala.Tuple2;
// Create a StreamingContext with a local master (2 threads) and a 1-second batch interval
JavaStreamingContext jssc = new JavaStreamingContext("local[2]", "JavaNetworkWordCount", new Duration(1000));
Using this context, we create a new DStream by specifying the IP address and port of the data server.
// Create a DStream that will connect to serverIP:serverPort, like localhost:9999
JavaReceiverInputDStream<String> lines = jssc.socketTextStream("localhost", 9999);
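The lines DStream represents the stream of data that will be received from the data server. Each record in this stream is a line of text. Next, we split the lines by spaces into words.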
// Split each line into words
JavaDStream<String> words = lines.flatMap(
  new FlatMapFunction<String, String>() {
    @Override public Iterable<String> call(String x) {
      return Arrays.asList(x.split(" "));
    }
  });
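flatMap is a DStream operation that creates a new DStream by generating multiple new records from each record in the source DStream. In this case, each line will be split into multiple words, and the stream of words is represented as the words DStream. Note that we defined the transformation using a FlatMapFunction object. As we will discover along the way, the Java API has a number of such convenience classes that help define DStream transformations.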
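For readers who want to see the flatMap semantics without a cluster, the following is a minimal plain-Java sketch, not Spark code: one batch is modeled as a List<String> of lines, and the class name FlatMapSketch is hypothetical. It applies the same split-on-a-space logic as the FlatMapFunction above, so one input line can turn into several words.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Plain-Java model of DStream.flatMap applied to a single batch.
// Not Spark code: a batch is modeled as an in-memory List<String> of lines.
final class FlatMapSketch {

    // Each line is split on single spaces, mirroring the FlatMapFunction
    // in the quick example; all resulting words are collected in order.
    static List<String> flatMapWords(List<String> batchOfLines) {
        List<String> words = new ArrayList<>();
        for (String line : batchOfLines) {
            words.addAll(Arrays.asList(line.split(" ")));
        }
        return words;
    }

    public static void main(String[] args) {
        List<String> batch = List.of("hello world", "hello spark");
        System.out.println(flatMapWords(batch)); // [hello, world, hello, spark]
    }
}
```

In Spark the same logic runs on every RDD of the lines DStream; here it runs on a single in-memory list.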
Next, we want to count these words:
// Count each word in each batch
JavaPairDStream<String, Integer> pairs = words.mapToPair(
  new PairFunction<String, String, Integer>() {
    @Override public Tuple2<String, Integer> call(String s) throws Exception {
      return new Tuple2<String, Integer>(s, 1);
    }
  });
JavaPairDStream<String, Integer> wordCounts = pairs.reduceByKey(
  new Function2<Integer, Integer, Integer>() {
    @Override public Integer call(Integer i1, Integer i2) throws Exception {
      return i1 + i2;
    }
  });
wordCounts.print(); // Print a few of the counts to the console
Using a PairFunction, the words DStream is further mapped (a one-to-one transformation) to a DStream of (word, 1) pairs. Then, using a Function2 object, it is reduced to get the frequency of words in each batch of data. Finally, wordCounts.print() will print a few of the counts generated every second.
Note that when these lines are executed, Spark Streaming only sets up the computation it will perform once it is started; no real processing has started yet. To start the processing after all the transformations have been set up, we finally call:
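The per-batch logic of these two steps can be sketched in plain Java. This is not Spark code, and the WordCountSketch class and its methods are hypothetical names: toPairs plays the role of the PairFunction, and reduceByKey merges the values of equal keys with the supplied function, here Integer::sum in place of the Function2 above.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BinaryOperator;

// Plain-Java model (no Spark) of one batch of the quick example:
// words -> (word, 1) pairs -> reduceByKey(+).
final class WordCountSketch {

    // Counterpart of the PairFunction: emit (word, 1) for every word.
    static List<Map.Entry<String, Integer>> toPairs(List<String> words) {
        List<Map.Entry<String, Integer>> pairs = new ArrayList<>();
        for (String w : words) {
            pairs.add(new SimpleEntry<>(w, 1));
        }
        return pairs;
    }

    // Counterpart of reduceByKey: combine all values that share a key
    // using the given function (Function2 in the Java API).
    static Map<String, Integer> reduceByKey(List<Map.Entry<String, Integer>> pairs,
                                            BinaryOperator<Integer> func) {
        Map<String, Integer> result = new LinkedHashMap<>();
        for (Map.Entry<String, Integer> p : pairs) {
            result.merge(p.getKey(), p.getValue(), func);
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> batch = List.of("hello", "world", "hello");
        Map<String, Integer> counts = reduceByKey(toPairs(batch), Integer::sum);
        System.out.println(counts); // {hello=2, world=1}
    }
}
```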
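jssc.start();            // Start the computation
jssc.awaitTermination(); // Wait for the computation to terminate
The complete code can be found in the Spark Streaming example JavaNetworkWordCount.
If you have already downloaded and built Spark, you can run this example as follows. You first need to run Netcat (a small utility found in most Unix-like systems) as a data server by using: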
$ nc -lk 9999
Then, in a different terminal, you can start the example by using:
$ ./bin/run-example org.apache.spark.examples.streaming.JavaNetworkWordCount localhost 9999
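Then, the words in every line typed into the terminal running the netcat server will be counted, and the counts printed on screen every second. It will look something like this: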
<pre>
# TERMINAL 1:
# Running Netcat
$ nc -lk 9999
hello world
...

# TERMINAL 2: RUNNING NetworkWordCount or JavaNetworkWordCount
$ ./bin/run-example org.apache.spark.examples.streaming.NetworkWordCount localhost 9999
...
-------------------------------------------
Time: 1357008430000 ms
-------------------------------------------
(hello,1)
(world,1)
...
</pre>
You can also use Spark Streaming directly from the Spark shell:
$ bin/spark-shell
... and create your StreamingContext by wrapping the existing interactive shell SparkContext object, sc:
import org.apache.spark.streaming._
val ssc = new StreamingContext(sc, Seconds(1))
When working with the shell, you may also need to send a ^D to your netcat session to force the pipeline to print the word counts to the console at the sink.
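##Basics##
Now we move beyond the simple example and elaborate on the basics of Spark Streaming that you need to know to write your own streaming applications.
###Linking###
To write your own Spark Streaming program, you will have to add the following dependency to your SBT or Maven project: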
groupId = org.apache.spark
artifactId = spark-streaming_2.10
version = 1.0.2
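For ingesting data from sources like Kafka and Flume that are not present in the Spark Streaming core API, you will have to add the corresponding artifact spark-streaming-xyz_2.10 to the dependencies. For example, some of the common ones are as follows: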
<pre>
Source    Artifact
Kafka     spark-streaming-kafka_2.10
Flume     spark-streaming-flume_2.10
Twitter   spark-streaming-twitter_2.10
ZeroMQ    spark-streaming-zeromq_2.10
MQTT      spark-streaming-mqtt_2.10
</pre>
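For an up-to-date list, please refer to the Apache repository for all supported sources and artifacts.
###Initializing###
In Java, to initialize a Spark Streaming program, you need to create a JavaStreamingContext object, which is the main entry point of all Spark Streaming functionality. A JavaStreamingContext object can be created by using: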
new JavaStreamingContext(master, appName, batchInterval, [sparkHome], [jars])
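The master parameter is a standard Spark cluster URL and can be "local" for local testing. The appName is the name of your program, which will be shown on your cluster's web UI. The batchInterval is the size of the batches, as explained earlier. Finally, the last two parameters are needed to deploy your code to a cluster if running in distributed mode, as described in the Spark programming guide. Additionally, the underlying SparkContext can be accessed as ssc.sparkContext.
The batch interval must be set based on the latency requirements of your application and the available cluster resources. See the Performance Tuning section for more details.
###DStreams###
A Discretized Stream, or DStream, is the basic abstraction provided by Spark Streaming. It represents a continuous stream of data: either the input data stream received from a source, or the processed data stream generated by transforming the input stream. Internally, it is represented by a continuous sequence of RDDs, Spark's abstraction of an immutable, distributed dataset. Each RDD in a DStream contains data from a certain interval, as shown in the following figure: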
<img src="https://cache.yisu.com/upload/information/20210522/355/658122.png"/>
Any operation applied on a DStream translates to operations on the underlying RDDs. For example, in the earlier example of converting a stream of lines to words, the flatMap operation is applied on each RDD in the lines DStream to generate the RDDs of the words DStream. This is shown in the following figure:
<img src="https://cache.yisu.com/upload/information/20210522/355/658124.png" />
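These underlying RDD transformations are computed by the Spark engine. The DStream operations hide most of these details and provide the developer with a higher-level API for convenience. These operations are discussed in detail in later sections.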
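As a rough mental model only, a DStream can be pictured as an ordered list of batches, with each DStream operation applied independently to every batch. The plain-Java sketch below assumes exactly that simplification (no partitioning, no distribution, no Spark classes; DStreamModel is a hypothetical name) and reproduces the lines-to-words example from the figure.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Plain-Java model, not Spark code: a DStream is modeled as an ordered
// list of batches (one per batch interval), and a DStream operation is an
// ordinary batch-to-batch function applied to every batch independently.
final class DStreamModel<T> {
    private final List<List<T>> batches;

    DStreamModel(List<List<T>> batches) {
        this.batches = batches;
    }

    List<List<T>> batches() {
        return batches;
    }

    // Apply a per-batch operation, yielding a new "DStream".
    <R> DStreamModel<R> applyPerBatch(Function<List<T>, List<R>> op) {
        List<List<R>> out = new ArrayList<>();
        for (List<T> batch : batches) {
            out.add(op.apply(batch));
        }
        return new DStreamModel<>(out);
    }

    public static void main(String[] args) {
        DStreamModel<String> lines = new DStreamModel<>(List.of(
                List.of("a b"),          // batch at time 1
                List.of("c", "d e")));   // batch at time 2
        DStreamModel<String> words = lines.applyPerBatch(batch -> {
            List<String> ws = new ArrayList<>();
            for (String line : batch) {
                ws.addAll(List.of(line.split(" ")));
            }
            return ws;
        });
        System.out.println(words.batches()); // [[a, b], [c, d, e]]
    }
}
```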
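###Input Sources###
We have already taken a look at jssc.socketTextStream(...) in the quick example, which creates a DStream from text data received over a TCP socket connection. Besides sockets, the core Spark Streaming API provides methods for creating DStreams from files and from Akka actors as input sources.
Specifically, for files, a DStream can be created as: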
jssc.fileStream(dataDirectory);
Spark Streaming will monitor the directory dataDirectory on any Hadoop-compatible filesystem and process any files created in that directory.
Note:
The files must have the same data format.
The files must be created in the dataDirectory by atomically moving or renaming them into the data directory.
Once moved, the files must not be changed.
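One way for a Java producer to meet the atomic-move requirement on a local filesystem is to write each file completely in a staging directory and then move it into the monitored directory with java.nio's ATOMIC_MOVE option. This is a sketch under that assumption: the class AtomicPublish, its publishAtomically method, and the staging directory are hypothetical, and the staging directory must sit on the same filesystem as dataDirectory for an atomic move to be possible. On HDFS, Hadoop's FileSystem.rename would play the analogous role.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

// Sketch (local filesystem, not Spark code): write a file completely in a
// staging directory, then atomically move it into the monitored directory
// so the streaming job never sees a half-written file.
final class AtomicPublish {

    static Path publishAtomically(Path stagingDir, Path dataDir, String name, String contents)
            throws IOException {
        Path tmp = stagingDir.resolve(name + ".tmp");
        Files.write(tmp, contents.getBytes(StandardCharsets.UTF_8)); // fully written first
        Path target = dataDir.resolve(name);
        // ATOMIC_MOVE throws AtomicMoveNotSupportedException if the two
        // directories cannot be linked by an atomic rename.
        return Files.move(tmp, target, StandardCopyOption.ATOMIC_MOVE);
    }

    public static void main(String[] args) throws IOException {
        Path staging = Files.createTempDirectory("staging");
        Path data = Files.createTempDirectory("data");
        Path published = publishAtomically(staging, data, "batch-0001.txt", "hello world\n");
        System.out.print(Files.readString(published)); // prints "hello world"
    }
}
```

After the move, the producer should leave the file untouched, in line with the last note above.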
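For more details on streams from files, Akka actors, and sockets, see the API documentation of the relevant functions in StreamingContext for Scala and JavaStreamingContext for Java.
Additionally, the functionality for creating DStreams from sources such as Kafka, Flume, and Twitter can be imported by adding the right dependencies, as explained in the earlier section. In the case of Kafka, after adding the artifact spark-streaming-kafka_2.10 to the project dependencies, you can create a DStream from Kafka as: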
import org.apache.spark.streaming.kafka.*;
KafkaUtils.createStream(jssc, kafkaParams, ...);
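For more details on these additional sources, see the corresponding API documentation. Furthermore, you can also implement your own custom receiver for your sources; see the Custom Receiver Guide.
###Operations###
There are two kinds of DStream operations: transformations and output operations. Similar to RDD transformations, DStream transformations operate on one or more DStreams to create new DStreams with transformed data. After applying a sequence of transformations to the input streams, output operations need to be called, which write the data out to an external data sink, such as a filesystem or a database.
####Transformations####
DStreams support many of the transformations available on normal Spark RDDs. Some of the common ones are as follows: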
<pre>
Transformation
    Meaning
----------------------------------------
map(func)
    Return a new DStream by passing each element of the source DStream through a function func.
flatMap(func)
    Similar to map, but each input item can be mapped to 0 or more output items.
filter(func)
    Return a new DStream by selecting only the records of the source DStream on which func returns true.
repartition(numPartitions)
    Changes the level of parallelism in this DStream by creating more or fewer partitions.
union(otherStream)
    Return a new DStream that contains the union of the elements in the source DStream and otherStream.
count()
    Return a new DStream of single-element RDDs by counting the number of elements in each RDD of the source DStream.
reduce(func)
    Return a new DStream of single-element RDDs by aggregating the elements in each RDD of the source DStream using a function func (which takes two arguments and returns one). The function should be associative so that it can be computed in parallel.
countByValue()
    When called on a DStream of elements of type K, return a new DStream of (K, Long) pairs where the value of each key is its frequency in each RDD of the source DStream.
reduceByKey(func, [numTasks])
    When called on a DStream of (K, V) pairs, return a new DStream of (K, V) pairs where the values for each key are aggregated using the given reduce function. Note: By default, this uses Spark's default number of parallel tasks (2 for local mode, and in cluster mode the number is determined by the config property spark.default.parallelism) to do the grouping. You can pass an optional numTasks argument to set a different number of tasks.
join(otherStream, [numTasks])
    When called on two DStreams of (K, V) and (K, W) pairs, return a new DStream of (K, (V, W)) pairs with all pairs of elements for each key.
cogroup(otherStream, [numTasks])
    When called on DStreams of (K, V) and (K, W) pairs, return a new DStream of (K, Seq[V], Seq[W]) tuples.
transform(func)
    Return a new DStream by applying an RDD-to-RDD function to every RDD of the source DStream. This can be used to do arbitrary RDD operations on the DStream.
updateStateByKey(func)
    Return a new "state" DStream where the state for each key is updated by applying the given function on the previous state of the key and the new values for the key. This can be used to maintain arbitrary state data for each key.
</pre>
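The note on reduce(func) that the function should be associative can be checked with a small plain-Java experiment. This sketch does not use Spark: it simulates two partitions by reducing each half of a list separately and then combining the partial results, which is the kind of regrouping that parallel execution allows. Addition gives the same answer both ways; subtraction, which is not associative, does not. The class name ReduceAssociativity is hypothetical.

```java
import java.util.List;
import java.util.function.BinaryOperator;

// Plain-Java illustration (not Spark): why reduce(func) needs an associative
// func. Parallel execution reduces each partition first and then combines
// the partial results, which regroups the operations.
final class ReduceAssociativity {

    // Left-to-right reduce; assumes a non-empty list.
    static int reduceSequential(List<Integer> xs, BinaryOperator<Integer> f) {
        int acc = xs.get(0);
        for (int i = 1; i < xs.size(); i++) {
            acc = f.apply(acc, xs.get(i));
        }
        return acc;
    }

    // Simulates two partitions, [0, split) and [split, size); assumes 0 < split < size.
    static int reducePartitioned(List<Integer> xs, int split, BinaryOperator<Integer> f) {
        int left = reduceSequential(xs.subList(0, split), f);
        int right = reduceSequential(xs.subList(split, xs.size()), f);
        return f.apply(left, right);
    }

    public static void main(String[] args) {
        List<Integer> xs = List.of(10, 4, 3, 2);
        BinaryOperator<Integer> plus = Integer::sum;
        BinaryOperator<Integer> minus = (a, b) -> a - b;
        System.out.println(reduceSequential(xs, plus) + " " + reducePartitioned(xs, 2, plus));   // 19 19
        System.out.println(reduceSequential(xs, minus) + " " + reducePartitioned(xs, 2, minus)); // 1 5
    }
}
```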
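The last two transformations in the table, transform and updateStateByKey, deserve further explanation.
####UpdateStateByKey Operation####
The updateStateByKey operation allows you to maintain arbitrary state while continuously updating it with new information. To use it, you have to do two steps: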
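Define the state: the state can be of any data type.
Define the state update function: specify with a function how to update the state using the previous state and the new values from the input stream.
Let's illustrate this with an example. Say you want to maintain a running count of each word seen in a text data stream. Here, the running count is the state, and it is an Integer. We define the update function as: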
import com.google.common.base.Optional;
import java.util.List;
Function2<List<Integer>, Optional<Integer>, Optional<Integer>> updateFunction =
  new Function2<List<Integer>, Optional<Integer>, Optional<Integer>>() {
    @Override public Optional<Integer> call(List<Integer> values, Optional<Integer> state) {
      Integer newSum = ...  // add the new values with the previous running count to get the new count
      return Optional.of(newSum);
    }
  };
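This update function is applied to a DStream containing words (say, the pairs DStream containing (word, 1) pairs in the quick example):
JavaPairDStream<String, Integer> runningCounts = pairs.updateStateByKey(updateFunction);
The update function will be called for each word, with values holding a sequence of 1's (from the (word, 1) pairs) and state holding the previous count. For the complete Scala code, take a look at the example StatefulNetworkWordCount.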
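To see how the update function and the state interact across batches, here is a plain-Java simulation, not Spark code (the class RunningWordCount and its step method are hypothetical). It uses java.util.Optional in place of the Guava Optional from the snippet above and implements the running-count logic that the comment in that snippet describes. Following the updateStateByKey contract, the function is called for every key that either already has state or appears in the current batch, and a key whose function returns an empty Optional is dropped.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.function.BiFunction;

// Plain-Java simulation (not Spark) of updateStateByKey for one batch.
// java.util.Optional stands in for the Guava Optional of the Java API.
final class RunningWordCount {

    // The update function: previous running count plus the new 1's.
    static Optional<Integer> update(List<Integer> newValues, Optional<Integer> runningCount) {
        int sum = runningCount.orElse(0);
        for (int v : newValues) {
            sum += v;
        }
        return Optional.of(sum);
    }

    // Apply the update function to every key that has state or new values.
    // A key whose update returns an empty Optional is dropped from the state.
    static Map<String, Integer> step(Map<String, Integer> state,
                                     List<String> batchWords,
                                     BiFunction<List<Integer>, Optional<Integer>, Optional<Integer>> f) {
        Map<String, List<Integer>> grouped = new HashMap<>();
        for (String w : batchWords) {
            grouped.computeIfAbsent(w, k -> new ArrayList<>()).add(1); // (word, 1) pairs
        }
        Set<String> keys = new HashSet<>(state.keySet());
        keys.addAll(grouped.keySet());
        Map<String, Integer> next = new HashMap<>();
        for (String k : keys) {
            Optional<Integer> s = f.apply(grouped.getOrDefault(k, List.of()),
                                          Optional.ofNullable(state.get(k)));
            s.ifPresent(v -> next.put(k, v));
        }
        return next;
    }

    public static void main(String[] args) {
        Map<String, Integer> state = new HashMap<>();
        state = step(state, List.of("hello", "world", "hello"), RunningWordCount::update);
        state = step(state, List.of("hello"), RunningWordCount::update);
        System.out.println(state.get("hello") + " " + state.get("world")); // 3 1
    }
}
```

Note that "world" keeps its count in the second batch even though it does not appear there, because the function returns the previous state when there are no new values.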
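####Transform Operation####
The transform operation applies an arbitrary RDD-to-RDD function to every RDD of a DStream. This makes it possible to use any RDD operation that is not directly exposed in the DStream API, for example joining every batch of a data stream with another, precomputed dataset.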
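In plain Java, the idea can be sketched as applying one arbitrary whole-batch function to every batch. The example below is not Spark code (TransformSketch and sortByCountDesc are hypothetical names); it uses sorting as the batch-to-batch function because sorting needs to see a whole batch at once, unlike per-record operations such as map.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Plain-Java model (not Spark) of transform: apply an arbitrary
// batch-to-batch function to every batch of the stream.
final class TransformSketch {

    static <T, R> List<List<R>> transform(List<List<T>> batches,
                                          Function<List<T>, List<R>> func) {
        List<List<R>> out = new ArrayList<>();
        for (List<T> batch : batches) {
            out.add(func.apply(batch)); // the same function runs on every batch
        }
        return out;
    }

    // Batch-level operation: order (word, count) pairs by descending count.
    static List<Map.Entry<String, Integer>> sortByCountDesc(List<Map.Entry<String, Integer>> batch) {
        List<Map.Entry<String, Integer>> sorted = new ArrayList<>(batch);
        sorted.sort(Map.Entry.<String, Integer>comparingByValue().reversed());
        return sorted;
    }

    public static void main(String[] args) {
        List<List<Map.Entry<String, Integer>>> counts = List.of(
                List.of(Map.entry("a", 1), Map.entry("b", 3)),
                List.of(Map.entry("c", 2), Map.entry("d", 5)));
        System.out.println(transform(counts, TransformSketch::sortByCountDesc));
        // [[b=3, a=1], [d=5, c=2]]
    }
}
```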
####Window Operations####
Finally, Spark Streaming also provides windowed computations.
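A windowed computation combines the data of several consecutive batches. The plain-Java model below is an illustration, not Spark code: it assumes, as Spark requires, that the window length and the sliding interval are whole multiples of the batch interval, expresses both as numbers of batches, and merges per-batch word counts for each window. WindowSketch and windowedCounts are hypothetical names.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model (not Spark) of a windowed word count. Window length and
// slide are given in numbers of batches.
final class WindowSketch {

    // Returns one merged count map per window. A window is emitted every
    // `slide` batches and covers the last `windowLength` batches (fewer at
    // the start of the stream).
    static List<Map<String, Integer>> windowedCounts(List<Map<String, Integer>> perBatch,
                                                     int windowLength, int slide) {
        List<Map<String, Integer>> windows = new ArrayList<>();
        for (int end = slide; end <= perBatch.size(); end += slide) {
            int start = Math.max(0, end - windowLength);
            Map<String, Integer> merged = new HashMap<>();
            for (Map<String, Integer> batch : perBatch.subList(start, end)) {
                batch.forEach((w, c) -> merged.merge(w, c, Integer::sum));
            }
            windows.add(merged);
        }
        return windows;
    }

    public static void main(String[] args) {
        List<Map<String, Integer>> perBatch = List.of(
                Map.of("a", 1), Map.of("a", 2), Map.of("b", 1), Map.of("a", 4));
        // Window of 3 batches sliding every 2 batches:
        // window 1 covers batches 1..2, window 2 covers batches 2..4.
        System.out.println(windowedCounts(perBatch, 3, 2));
    }
}
```

Here the second window counts "a" as 2 + 4 = 6 because batch 1 has slid out of it.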
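####Output Operations####
When an output operation is called, it triggers the actual execution of the stream computation. Currently, the following output operations are defined: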
<pre>
Output Operation
    Meaning
----------------------------------------
print()
    Prints the first ten elements of every batch of data in a DStream on the driver.
foreachRDD(func)
    The fundamental output operator. Applies a function, func, to each RDD generated from the stream. This function should have side effects, such as printing output, saving the RDD to external files, or writing it over the network to an external system.
saveAsObjectFiles(prefix, [suffix])
    Save this DStream's contents as a SequenceFile of serialized objects. The file name at each batch interval is generated based on prefix and suffix: "prefix-TIME_IN_MS[.suffix]".
saveAsTextFiles(prefix, [suffix])
    Save this DStream's contents as text files. The file name at each batch interval is generated based on prefix and suffix: "prefix-TIME_IN_MS[.suffix]".
saveAsHadoopFiles(prefix, [suffix])
    Save this DStream's contents as a Hadoop file. The file name at each batch interval is generated based on prefix and suffix: "prefix-TIME_IN_MS[.suffix]".
</pre>
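The saveAs*Files operations name each batch's output "prefix-TIME_IN_MS[.suffix]". The helper below is a hypothetical plain-Java sketch of that documented pattern, useful for predicting where a given batch will be written; it is not taken from Spark's source, and it assumes that an empty or missing suffix simply drops the "[.suffix]" part.

```java
// Sketch (not Spark code) of the naming pattern documented for the
// saveAs*Files operations: "prefix-TIME_IN_MS[.suffix]", where the
// suffix part is omitted when no suffix is given.
final class OutputNaming {

    static String batchOutputName(String prefix, String suffix, long batchTimeMs) {
        String base = prefix + "-" + batchTimeMs;
        return (suffix == null || suffix.isEmpty()) ? base : base + "." + suffix;
    }

    public static void main(String[] args) {
        // Batch time taken from the sample console output earlier in this guide.
        System.out.println(batchOutputName("wordcounts", "txt", 1357008430000L)); // wordcounts-1357008430000.txt
        System.out.println(batchOutputName("wordcounts", null, 1357008430000L));  // wordcounts-1357008430000
    }
}
```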
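The complete list of DStream operations is available in the API documentation. For the Scala API, see DStream and PairDStreamFunctions; for the Java API, see JavaDStream and JavaPairDStream.
###Persistence###
Similar to RDDs, DStreams also allow developers to persist the stream's data in memory. That is, calling persist() on a DStream will automatically persist every RDD of that DStream in memory. This is useful if the data in the DStream will be computed multiple times (for example, multiple operations on the same data). Window-based operations such as reduceByWindow and reduceByKeyAndWindow, and state-based operations such as updateStateByKey, are persisted by default. Hence, DStreams generated by window-based operations are automatically persisted in memory, without the developer calling persist().
For input streams that receive data over the network (such as Kafka, Flume, and sockets), the default persistence level is set to replicate the data to two nodes for fault tolerance.
Note that, unlike RDDs, the default persistence level of DStreams keeps the data serialized in memory. This is further discussed in the Performance Tuning section. More information on the different persistence levels can be found in the Spark Programming Guide.
###RDD Checkpointing###
A stateful operation is one that operates over multiple batches of data. This includes all window-based operations and the updateStateByKey operation. Since stateful operations depend on previous batches of data, they continuously accumulate metadata over time. To clear this metadata, Spark Streaming supports periodic checkpointing by saving intermediate data to HDFS.
To enable checkpointing, the developer has to provide the HDFS path to which the RDDs will be saved. This is done by: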
ssc.checkpoint(hdfsPath) // assuming ssc is the StreamingContext or JavaStreamingContext
The checkpoint interval of a DStream can be set by using:
dstream.checkpoint(checkpointInterval)
For DStreams that must be checkpointed (that is, DStreams created by updateStateByKey and by reduceByKeyAndWindow with an inverse function), the checkpoint interval is by default set to a multiple of the DStream's sliding interval such that it is at least 10 seconds.
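The default rule stated above, the smallest multiple of the sliding interval that is at least 10 seconds, is simple ceiling arithmetic. This plain-Java sketch is not Spark's code; the class CheckpointDefault and its method are hypothetical, and durations are in milliseconds.

```java
// Plain-Java sketch of the default checkpoint-interval rule described above:
// the smallest multiple of the sliding interval that is at least 10 seconds.
final class CheckpointDefault {

    static final long MIN_CHECKPOINT_MS = 10_000L;

    static long defaultCheckpointIntervalMs(long slideIntervalMs) {
        if (slideIntervalMs <= 0) {
            throw new IllegalArgumentException("sliding interval must be positive");
        }
        // Ceiling division: how many slides are needed to reach 10 s.
        long multiples = (MIN_CHECKPOINT_MS + slideIntervalMs - 1) / slideIntervalMs;
        return multiples * slideIntervalMs;
    }

    public static void main(String[] args) {
        System.out.println(defaultCheckpointIntervalMs(1_000));  // 10000
        System.out.println(defaultCheckpointIntervalMs(3_000));  // 12000
        System.out.println(defaultCheckpointIntervalMs(15_000)); // 15000
    }
}
```

For example, a 3-second sliding interval yields a 12-second default, while a 15-second sliding interval already exceeds 10 seconds and is used as is.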
###Deployment###
As with any Spark application, Spark Streaming applications are deployed on a cluster. Please refer to the deployment guide for more information.
If a running Spark Streaming application needs to be upgraded (with new application code), there are two possible mechanisms:
The upgraded Spark Streaming application is started and run in parallel to the existing application. Once the new one (receiving the same data as the old one) has been warmed up and is ready for prime time, the old one can be brought down. Note that this can be done for data sources that support sending the data to two destinations (i.e., the earlier and upgraded applications).
The existing application is shut down gracefully (see StreamingContext.stop(...) or JavaStreamingContext.stop(...) for graceful shutdown options), which ensures that data that has been received is completely processed before shutdown. Then the upgraded application can be started, which will start processing from the same point where the earlier application left off. Note that this can be done only with input sources that support source-side buffering (like Kafka and Flume), as data needs to be buffered while the previous application is down and the upgraded application is not yet up.