91超碰碰碰碰久久久久久综合_超碰av人澡人澡人澡人澡人掠_国产黄大片在线观看画质优化_txt小说免费全本

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

這樣進行Spark的解析

發布時間:2021-12-16 21:50:52 來源:億速云 閱讀:163 作者:柒染 欄目:云計算

這樣進行Spark的解析,相信很多沒有經驗的人對此束手無策,為此本文總結了問題出現的原因和解決方法,通過這篇文章希望你能解決這個問題。

Spark場景

   Spark是基于內存的迭代計算框架,適用于需要多次操作特定數據集的應用場合。需要反復操作的次數越多,
所需讀取的數據量越大,受益越大,數據量小但是計算密集度較大的場合,受益就相對較小

    由于RDD的特性,Spark不適用那種異步細粒度更新狀態的應用,例如web服務的存儲或者是增量的web
爬蟲和索引。就是對于那種增量修改的應用模型不適合

    數據量不是特別大,但是要求實時統計分析需求

Spark Master模式(Url)

 1、local:這種方式是在本地啟動一個線程來運行作業;
 2、local[N]:也是本地模式,但是啟動了N個線程;
 3、local[*]:還是本地模式,但是用了系統中所有的核;
 4、local[N,M]:這里有兩個參數,第一個代表的是用到的核個數;第二個參數代表的是容許該作業失敗M次;
 5、local-cluster[N, cores, memory] :本地偽集群模式;
 6、spark:// :這是用到了 Spark 的Standalone模;
 7、(mesos|zk):// :這是Mesos模式;
 8、yarn\yarn-cluster\yarn-client :這是YARN模式。前面兩種代表的是集群模式;后面代表的是客戶端模式;
 9、simr:// :simr其實是Spark In MapReduce的縮寫

Spark deploy模式

 1、Local模式
 local模式出了偽集群模式(local-cluster),所有的local都是用到了LocalBackend和TaskSchedulerImpl類。LocalBackend接收來自TaskSchedulerImpl的receiveOffers()調用,并根據運行Application傳進來的CPU核生成WorkerOffer,并調用scheduler.resourceOffers(offers)生成Task,最后通過 executor.launchTask來執行這些Task。

 2、Standalone
 Standalone模式使用SparkDeploySchedulerBackend和TaskSchedulerImpl,SparkDeploySchedulerBackend是繼承自CoarseGrainedSchedulerBackend類,并重寫了其中的一些方法。
 CoarseGrainedSchedulerBackend是一個粗粒度的資源調度類,在Spark job運行的整個期間,它會保存所有的Executor,在task運行完的時候,并不釋放該Executor,也不向Scheduler申請一個新的Executor。Executor的啟動方式有很多中,需要根據Application提交的Master URL進行判斷。在CoarseGrainedSchedulerBackend中封裝了一個DriverActor類,它接受Executor注冊(RegisterExecutor)、狀態更新(StatusUpdate)、響應Scheduler的ReviveOffers請求、殺死Task等等。
 在本模式中將會啟動一個或者多個CoarseGrainedExecutorBackend。具體是通過AppClient類向Master請求注冊Application。當注冊成功之后,Master會向Client進行反饋,并調用schedule啟動Driver和CoarseGrainedExecutorBackend,啟動的Executor會向DriverActor進行注冊。然后CoarseGrainedExecutorBackend通過aunchTask方法啟動已經提交的Task。

 3、yarn-cluster
 yarn-cluster集群模式涉及到的類有YarnClusterScheduler和YarnClusterSchedulerBackend。YarnClusterSchedulerBackend同樣是繼承自CoarseGrainedSchedulerBackend。而YarnClusterScheduler繼承自TaskSchedulerImpl,它只是簡單地對TaskSchedulerImpl進行封裝,并重寫了getRackForHost和postStartHook方法。
 Client類通過YarnClient在Hadoop集群上啟動一個Container,并在其中運行ApplicationMaster,并通過Yarn提供的接口在集群中啟動多個Container用于運行CoarseGrainedExecutorBackend,并向CoarseGrainedSchedulerBackend中的DriverActor進行注冊。

 4、yarn-client
 yarn-cluster集群模式涉及到的類有YarnClientClusterScheduler和YarnClientSchedulerBackend。YarnClientClusterScheduler繼承自TaskSchedulerImpl,并對其中的getRackForHost方法進行了重寫。Yarn-client模式下,會在集群外面啟動一個ExecutorLauncher來作為driver,并想集群申請Container,來啟動CoarseGrainedExecutorBackend,并向CoarseGrainedSchedulerBackend中的DriverActor進行注冊。

 5、Mesos
 Mesos模式調度方式有兩種:粗粒度和細粒度。粗粒度涉及到的類有CoarseMesosSchedulerBackend和TaskSchedulerImpl類;而細粒度涉及到的類有MesosSchedulerBackend和TaskSchedulerImpl類。CoarseMesosSchedulerBackend和 MesosSchedulerBackend都繼承了MScheduler(其實是Mesos的Scheduler),便于注冊到Mesos資源調度的框架中。選擇哪種模式可以通過spark.mesos.coarse參數配置。默認的是MesosSchedulerBackend。

 上面涉及到Spark的許多部署模式,究竟哪種模式好這個很難說,需要根據需求,如果只是測試Spark Application,可以選擇local模式。而如果數據量不是很多,Standalone 是個不錯的選擇。當你需要統一管理集群資源(Hadoop、Spark等)那么可以選擇Yarn,但是這樣維護成本就會變高。yarn-cluster和yarn-client模式內部實現還是有很大的區別。如果需要用于生產環境,那么請選擇yarn-cluster;而如果僅僅是Debug程序,可以選擇yarn-client。

Spark Jar/File Url格式

file:/
  文件絕對路徑,并且file:/URI是通過驅動器的HTTP文件服務器來下載的,每個執行器都從驅動器的HTTP server拉取這些文件。
hdfs:/http:/https:/ftp: 
  Spark將會從指定的URI位置下載所需的文件和jar包。
local:/
  指定在每個工作節點上都能訪問到的本地或共享文件。這意味著,不會占用網絡IO,特別是對一些大文件或jar包,最好使用這種方式,當需要把文件推送到每個工作節點上可以通過NFS和GlusterFS共享文件。

Spark執行模型

Dependency
  Dependency代表了RDD之間的依賴關系,即血緣
  NarrowDependency代表窄依賴,即父RDD的分區,最多被子RDD的一個分區使用。所以支持并行計算。
      OneToOneDependency表示父RDD和子RDD的分區依賴是一對一的
      RangeDependency表示在一個range范圍內,依賴關系是一對一的,所以初始化的時候會有一個范圍,范圍外的partitionId,傳進去之后返回的是Nil
  Shuffle代表寬依賴針對的RDD是KV形式的,需要一個partitioner指定分區方式,需要一個序列化工具類


Partition
    Partition具體表示RDD每個數據分區。


Partitioner
Partitioner決定KV形式的RDD如何根據key進行partition
    默認Partitioner
    Partitioner的伴生對象提供defaultPartitioner方法,邏輯為:
    傳入的RDD(至少兩個)中,遍歷(順序是partition數目從大到小)RDD,如果已經有Partitioner了,就使用。如果RDD們都沒有Partitioner,則使用默認的HashPartitioner。而HashPartitioner的初始化partition數目,取決于是否設置了Spark.default.parallelism,如果沒有的話就取RDD中partition數目最大的值
    HashPartitioner基于Java的Object.hashCode。會有個問題是Java的Array有自己的hashCode,不基于Array里的內容,所以RDD[Array[_]]或RDD[(Array[_], _)]使用HashPartitioner會有問題。
    RangePartitioner處理的KV RDD要求Key是可排序的,即滿足Scala的Ordered[K]類型


Persist/Unpersist
默認cache()過程是將RDD persist在內存里,persist()操作可以為RDD重新指定StorageLevel(useDisk, useMemory, useOffHeap, deserialized, replication), persist并不是action,并不會觸發任何計算


Checkpoint
RDD Actions api里提供了checkpoint()方法,會把本RDD save到SparkContext CheckpointDir
目錄下。建議該RDD已經persist在內存中,否則需要recomputation。


Transformations
RDD transformation 見下

Actions
RDD action 見下

Job->Stage->Task->Transformations/Action
一個Spark的Job分為多個stage,最后一個stage會包括一個或多個ResultTask,前面的stages會包括一個或多個ShuffleMapTasks。
ResultTask執行并將結果返回給driver application。
ShuffleMapTask將task的output根據task的partition分離到多個buckets里。一個ShuffleMapTask對應一個ShuffleDependency的partition,而總partition數同并行度、reduce數目是一致的

DAGScheduler
面向stage的調度層,為job生成以stage組成的DAG,以stage為單位,提交TaskSet給TaskScheduler執行。
每一個Stage內,都是獨立的tasks,他們共同執行同一個compute function,享有相同的shuffledependencies。DAG在切分stage的時候是依照出現shuffle為界限的。

DAGSchedulerEvent

TaskScheduler
TaskScheduler接收task、接收分到的資源和executor、維護信息、與backend打交道、分配任務

SchedulableBuilder
FIFO和Fair兩種實現, addTaskSetManager會把TaskSetManager加到pool里。FIFO的話只有一個pool。Fair有多個pool,Pool也分FIFO和Fair兩種模式

TaskSet,即Stage
封裝一個stage的所有的tasks, 以提交給TaskScheduler

ResultTask
對應于Result Stage直接產生結果

ShuffleMapTask
對應于ShuffleMap Stage, 產生的結果作為其他stage的輸入

TaskSetManager
負責這批Tasks的啟動,失敗重試,感知本地化等事情。每次reourseOffer方法會尋找合適(符合條件execId, host, locality)的Task并啟動它

TaskResultGetter
維護一個線程池,用來反序列化和從遠端獲取task結果

BlockManagerMaster/BlockManagerWorker
TaskResult里包含BolckId, BlockManagerMaster通過這個blockId的獲取bolck的locations,BlockManagerWorker通過這些locations來獲得(反序列化)block的數據

Spark RDD

RDD是Spark中的抽象數據結構類型,任何數據在Spark中都被表示為RDD。從編程的角度來看,RDD可以簡單看成是一個數組。
和普通數組的區別是,RDD中的數據是分區存儲的,這樣不同分區的數據就可以分布在不同的機器上,同時可以被并行處理。

Spark應用程序所做把需要處理的數據轉換為RDD,然后對RDD進行一系列的變換和操作從而得到結果

一個RDD對象,包含如下5個核心屬性。
  一個分區列表,每個分區里是RDD的部分數據(或稱數據塊)。
  一個依賴列表,存儲依賴的其他RDD。
  一個名為compute的計算函數(由子類實現),用于計算RDD各分區的值。
  一個分區器(可選),用于鍵/值類型的RDD,比如某個RDD是按散列來分區。
  一個計算各分區時優先的位置列表(可選),比如從HDFS上的文件生成RDD時,RDD分區的位置優先選擇數據所在的節點,這樣可以避免數據移動帶來的開銷。

Work with RDD

object array -> object list -> object rdd

object array -> object list -> Row list -> Row rdd + StructType schema -> object df

object arrays -> object lists -> object rdds -> object rdd queue -> object dstream

RDD Transformer

map(func):對調用map的RDD數據集中的每個element都使用func,然后返回一個新的RDD,這個返回的數據集是分布式的數據集
keyBy(f: T => K)

filter(func): 對調用filter的RDD數據集中的每個元素都使用func,然后返回一個包含使func為true的元素構成的RDD
flatMap(func):和map差不多,但是flatMap生成的是多個結果
mapPartitions(func):和map很像,但是map是每個element,而mapPartitions是每個partition
mapPartitionsWithSplit(func):和mapPartitions很像,但是func作用的是其中一個split上,所以func中應該有index
sample(withReplacement,faction,seed):抽樣
union(otherDataset):并集, 返回一個新的dataset,包含源dataset和給定dataset的元素的集合
intersection(otherDataset):交集
subtract(otherDataset):差集
distinct([numTasks]):返回一個新的dataset,這個dataset含有的是源dataset中的distinct的element
groupByKey(numTasks):返回(K,Seq[V]),也就是hadoop中reduce函數接受的key-valuelist
reduceByKey(func,[numTasks]):就是用一個給定的reducefunc再作用在groupByKey產生的(K,Seq[V]),比如求和,求平均數
sortByKey([ascending],[numTasks]):按照key來進行排序,是升序還是降序,ascending是boolean類型
join(otherDataset,[numTasks]):當有兩個KV的dataset(K,V)和(K,W),返回的是(K,(V,W))的dataset,numTasks為并發的任務數
cogroup(otherDataset,[numTasks]):當有兩個KV的dataset(K,V)和(K,W),返回的是(K,Seq[V],Seq[W])的dataset,numTasks為并發的任務數
cartesian(otherDataset):笛卡爾積就是m*n
pipe(command: String)	把RDD數據通過ProcessBuilder創建額外的進程輸出走
zip(RDD[U]): RDD[(T, U)]	兩個RDD分區數目一致,且每個分區數據條數一致

RDD Action

reduce(func):說白了就是聚集,但是傳入的函數是兩個參數輸入返回一個值,這個函數必須是滿足交換律和結合律的
fold(zeroValue: T)(op: (T, T) => T)	特殊的reduce,帶初始值,函數式語義的fold
aggregate(zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) => U):帶初始值、reduce聚合、merge聚合三個完整條件的聚合方法。rdd的做法是把函數傳入分區里去做計算,最后匯總各分區的結果再一次combOp計算
subtract(RDD[T]):rdd實現為map(x => (x, null)).subtractByKey(other.map((_, null)), p2).keys 與求交類似
collect():一般在filter或者足夠小的結果的時候,再用collect封裝返回一個數組
count():返回的是dataset中的element的個數
first():返回的是dataset中的第一個元素
top(n)(ordering):每個分區內傳入top的處理函數,得到分區的堆,使用rdd.reduce(),把每個分區的堆合起來,排序,取前n個
take(n):返回前n個elements,這個士driverprogram返回的
takeSample(withReplacement,num,seed):抽樣返回一個dataset中的num個元素,隨機種子seed
saveAsTextFile(path):把dataset寫到一個textfile中,或者hdfs,或者hdfs支持的文件系統中,spark把每條記錄都轉換為一行記錄,然后寫到file中
saveAsSequenceFile(path):只能用在key-value對上,然后生成SequenceFile寫到本地或者hadoop文件系統
countByKey():返回的是key對應的個數的一個map,作用于一個RDD
countByValue(): Map[T, Long]	rdd實現為map(value => (value, null)).countByKey():本質上是一次簡單的combineByKey,返回Map,會全load進driver的內存里,需要數據集規模較小
foreach(func):對dataset中的每個元素都使用func
max()/min()	特殊的reduce,傳入max/min比較函數



PairRDDFunctions
.....


DoubleRDDFunctions
sum()	rdd實現是reduce(_ + _)
stats()	rdd實現是mapPartitions(nums => Iterator(StatCounter(nums))).reduce((a, b) => a.merge(b)) StatCounter在一次遍歷里統計出中位數、方差、count三個值,merge()是他內部的方法
mean()	rdd實現是stats().mean
variance()/sampleVariance()	rdd實現是stats().variance
stdev()/sampleStdev()	rdd實現是stats().stdev 求標準差
meanApprox()/sumApprox()	調用runApproximateJob
histogram()	比較復雜的計算,rdd實現是先mapPartitions再reduce,包含幾次遞歸

Spark Core

    提供了有向無環圖(DAG)的分布式并行計算框架,并提供Cache機制來支持多次迭代計算或者數據共享,大大減少
迭代計算之間讀取數據局的開銷,這對于需要進行多次迭代的數據挖掘和分析性能有很大提升
    在Spark中引入了RDD (Resilient Distributed Dataset) 的抽象,它是分布在一組節點中的只讀對象集合,
這些集合是彈性的,如果數據集一部分丟失,則可以根據“血統”對它們進行重建,保證了數據的高容錯性;
    移動計算而非移動數據,RDD Partition可以就近讀取分布式文件系統中的數據塊到各個節點內存中進行計算
    使用多線程池模型來減少task啟動開稍
    采用容錯的、高可伸縮性的akka作為通訊框架

Spark SQL

    引入了新的RDD類型SchemaRDD,可以象傳統數據庫定義表一樣來定義SchemaRDD,SchemaRDD由定義了列數據類型的行對象構成。SchemaRDD可以從RDD轉換過來,也可以從Parquet文件讀入,也可以使用HiveQL從Hive中獲取。
    內嵌了Catalyst查詢優化框架,在把SQL解析成邏輯執行計劃之后,利用Catalyst包里的一些類和接口,執行了一些簡單的執行計劃優化,最后變成RDD的計算
    在應用程序中可以混合使用不同來源的數據,如可以將來自HiveQL的數據和來自SQL的數據進行Join操作。
    內存列存儲(In-Memory Columnar Storage),sparkSQL的表數據在內存中存儲不是采用原生態的JVM對象存儲方式,而是采用內存列存儲;
    字節碼生成技術(Bytecode Generation),Spark1.1.0在Catalyst模塊的expressions增加了codegen模塊,使用動態字節碼生成技術,對匹配的表達式采用特定的代碼動態編譯。另外對SQL表達式都作了CG優化, CG優化的實現主要還是依靠Scala2.10的運行時放射機制(runtime reflection);
    Scala代碼優化 SparkSQL在使用Scala編寫代碼的時候,盡量避免低效的、容易GC的代碼;盡管增加了編寫代碼的難度,但對于用戶來說接口統一。

Spark MLlib

MLBase    是Spark生態圈的一部分專注于機器學習,讓機器學習的門檻更低,讓一些可能并不了解機器學習的用戶也能方便地使用MLbase。MLBase分為四部分:MLlib、MLI、ML Optimizer和MLRuntime。
ML Optimizer    會選擇它認為最適合的已經在內部實現好了的機器學習算法和相關參數,來處理用戶輸入的數據,并返回模型或別的幫助分析的結果;
MLI     是一個進行特征抽取和高級ML編程抽象的算法實現的API或平臺;
MLlib    是Spark實現一些常見的機器學習算法和實用程序,包括分類、回歸、聚類、協同過濾、降維以及底層優化,該算法可以進行可擴充; MLRuntime 基于Spark計算框架,將Spark的分布式計算應用到機器學習領域。

Spark GraphX

    GraphX是Spark中用于圖(e.g., Web-Graphs and Social Networks)和圖并行計算(e.g., PageRank and Collaborative Filtering)的API,可以認為是GraphLab(C++)和Pregel(C++)在Spark(Scala)上的重寫及優化,跟其他分布式圖計算框架相比,GraphX最大的貢獻是,在Spark之上提供一棧式數據解決方案,可以方便且高效地完成圖計算的一整套流水作業。GraphX最先是伯克利AMPLAB的一個分布式圖計算框架項目,后來整合到Spark中成為一個核心組件。
GraphX的核心抽象是Resilient Distributed Property Graph,一種點和邊都帶屬性的有向多重圖。它擴展了Spark RDD的抽象,有Table和Graph兩種視圖,而只需要一份物理存儲。兩種視圖都有自己獨有的操作符,從而獲得了靈活操作和執行效率。如同Spark,GraphX的代碼非常簡潔。GraphX的核心代碼只有3千多行,而在此之上實現的Pregel模型,只要短短的20多行。GraphX的代碼結構整體下圖所示,其中大部分的實現,都是圍繞Partition的優化進行的。這在某種程度上說明了點分割的存儲和相應的計算優化的確是圖計算框架的重點和難點。
GraphX的底層設計有以下幾個關鍵點。
    1.對Graph視圖的所有操作,最終都會轉換成其關聯的Table視圖的RDD操作來完成。這樣對一個圖的計算,最終在邏輯上,等價于一系列RDD的轉換過程。因此,Graph最終具備了RDD的3個關鍵特性:Immutable、Distributed和Fault-Tolerant。其中最關鍵的是Immutable(不變性)。邏輯上,所有圖的轉換和操作都產生了一個新圖;物理上,GraphX會有一定程度的不變頂點和邊的復用優化,對用戶透明。
    2.兩種視圖底層共用的物理數據,由RDD[VertexPartition]和RDD[EdgePartition]這兩個RDD組成。點和邊實際都不是以表Collection[tuple]的形式存儲的,而是由VertexPartition/EdgePartition在內部存儲一個帶索引結構的分片數據塊,以加速不同視圖下的遍歷速度。不變的索引結構在RDD轉換過程中是共用的,降低了計算和存儲開銷。
    3.圖的分布式存儲采用點分割模式,而且使用partitionBy方法,由用戶指定不同的劃分策略(PartitionStrategy)。劃分策略會將邊分配到各個EdgePartition,頂點Master分配到各個VertexPartition,EdgePartition也會緩存本地邊關聯點的Ghost副本。劃分策略的不同會影響到所需要緩存的Ghost副本數量,以及每個EdgePartition分配的邊的均衡程度,需要根據圖的結構特征選取最佳策略。目前有EdgePartition2d、EdgePartition1d、RandomVertexCut和CanonicalRandomVertexCut這四種策略。在淘寶大部分場景下,EdgePartition2d效果最好。

Spark Streaming

SparkStreaming是一個對實時數據流進行高通量、容錯處理的流式處理系統,可以對多種數據源(如Kdfka、Flume、Twitter、Zero和TCP 套接字)進行類似Map、Reduce和Join等復雜操作,并將結果保存到外部文件系統、數據庫或應用到實時儀表盤。
計算流程:Spark Streaming是將流式計算分解成一系列短小的批處理作業。這里的批處理引擎是Spark Core,也就是把Spark Streaming的輸入數據按照batch size(如1秒)分成一段一段的數據(Discretized Stream),每一段數據都轉換成Spark中的RDD(Resilient Distributed Dataset),然后將Spark Streaming中對DStream的Transformation操作變為針對Spark中對RDD的Transformation操作,將RDD經過操作變成中間結果保存在內存中。整個流式計算根據業務的需求可以對中間的結果進行疊加或者存儲到外部設備。下圖顯示了Spark Streaming的整個流程。
容錯性:對于流式計算來說,容錯性至關重要。首先我們要明確一下Spark中RDD的容錯機制。每一個RDD都是一個不可變的分布式可重算的數據集,其記錄著確定性的操作繼承關系(lineage),所以只要輸入數據是可容錯的,那么任意一個RDD的分區(Partition)出錯或不可用,都是可以利用原始輸入數據通過轉換操作而重新算出的。  
實時性:對于實時性的討論,會牽涉到流式處理框架的應用場景。Spark Streaming將流式計算分解成多個Spark Job,對于每一段數據的處理都會經過Spark DAG圖分解以及Spark的任務集的調度過程。對于目前版本的Spark Streaming而言,其最小的Batch Size的選取在0.5~2秒鐘之間(Storm目前最小的延遲是100ms左右),所以Spark Streaming能夠滿足除對實時性要求非常高(如高頻實時交易)之外的所有流式準實時計算場景。
擴展性與吞吐量:Spark目前在EC2上已能夠線性擴展到100個節點(每個節點4Core),可以以數秒的延遲處理6GB/s的數據量(60M records/s),其吞吐量也比流行的Storm高2~5倍,Berkeley利用WordCount和Grep兩個用例所做的測試,在Grep這個測試中,Spark Streaming中的每個節點的吞吐量是670k records/s,而Storm是115k records/s。

SparkR

    SparkR是AMPLab發布的一個R開發包,使得R擺脫單機運行的命運,可以作為Spark的job運行在集群上,極大得擴展了R的數據處理能力。
SparkR的幾個特性:
    提供了Spark中彈性分布式數據集(RDD)的API,用戶可以在集群上通過R shell交互性的運行Spark job。
    支持序化閉包功能,可以將用戶定義函數中所引用到的變量自動序化發送到集群中其他的機器上。
    SparkR還可以很容易地調用R開發包,只需要在集群上執行操作前用includePackage讀取R開發包就可以了,當然集群上要安裝R開發包。

SparkPython

    Spark Python

pom.xml

<dependency>
  <groupId>org.scala-lang</groupId>
  <artifactId>scala-library</artifactId>
  <version>2.11.8</version>
</dependency>

<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-mllib_2.11</artifactId>
  <version>2.0.0</version>
</dependency>

<dependency>
  <groupId>com.typesafe.play</groupId>
  <artifactId>play-json_2.11</artifactId>
  <version>2.3.9</version>
</dependency>

<plugin>
  <groupId>net.alchim31.maven</groupId>
  <artifactId>scala-maven-plugin</artifactId>
  <version>3.1.3</version>
  <executions>
    <execution>
      <id>scala-compile-first</id>
      <phase>process-resources</phase>
      <goals>
        <goal>add-source</goal>
        <goal>compile</goal>
      </goals>
    </execution>
    <execution>
      <id>scala-test-compile</id>
      <phase>process-test-resources</phase>
      <goals>
        <goal>testCompile</goal>
      </goals>
    </execution>
  </executions>
</plugin>

	<repositories>
		<repository>
			<id>maven_central</id>
			<url>http://central.maven.org/maven2/</url>
		</repository>

		<repository>
			<id>sonatype-nexus-snapshots</id>
			<url>https://oss.sonatype.org/content/repositories/snapshots</url>
		</repository>
		
		<repository>
			<id>typesafe</id>
			<url>http://repo.typesafe.com/typesafe/releases/</url>
		</repository>
		
	</repositories>

	<pluginRepositories>
		<pluginRepository>
			<id>maven_central</id>
			<url>http://central.maven.org/maven2/</url>
		</pluginRepository>

		<pluginRepository>
			<id>sonatype-nexus-snapshots</id>
			<url>https://oss.sonatype.org/content/repositories/releases/</url>
		</pluginRepository>
	</pluginRepositories>

Tests.scala 

  def listRdd(){
    var sc = new SparkContext("local[1]", "spdb")
    var sqlContext = new SQLContext(sc)
  
    var listStr1 = """zm,zn,zq""" 
    var list = listStr1.split(",").toList
    var rdd = sc.parallelize(list, 2)
    var max = rdd.max()
    println(max)
  }


 

看完上述內容,你們掌握這樣進行Spark的解析的方法了嗎?如果還想學到更多技能或想了解更多相關內容,歡迎關注億速云行業資訊頻道,感謝各位的閱讀!

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

大安市| 兰西县| 花莲市| 澄城县| 嵊州市| 徐闻县| 鹤壁市| 嘉黎县| 通城县| 信阳市| 建阳市| 南陵县| 南川市| 峡江县| 长子县| 阿勒泰市| 饶平县| 乐清市| 吴旗县| 遵义市| 东海县| 岗巴县| 石首市| 娱乐| 韶山市| 桑日县| 贡嘎县| 临江市| 广宁县| 平原县| 竹山县| 西华县| 湖北省| 嘉祥县| 隆昌县| 紫金县| 舒城县| 霍州市| 静宁县| 大城县| 光山县|