91超碰碰碰碰久久久久久综合_超碰av人澡人澡人澡人澡人掠_国产黄大片在线观看画质优化_txt小说免费全本

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

好程序員分享大數據的架構體系

發布時間:2020-07-02 01:02:20 來源:網絡 閱讀:503 作者:wx5d42865f47214 欄目:大數據

好程序員分享大數據的架構體系:

????????????flume采集數據

????????????MapReduce

????????????HBse (HDFS)

????????????Yarn ??資源調度系統

??展示平臺 數據平臺

????????????1,提交任務

????????????2,展示結果數據

??spark 分析引擎 ?S3 ??可以進行各種的數據分析 , 可可以和hive進行整合 ,spark任務可以運行在Yarn

?

提交任務到集群的入口類 ???SC

?

為什么用spark : ??速度快,易用,通用,兼容性高

?

hadoop

scala

jdk

spark

?

如果結果為定長的 ?toBuffer編程變長的

?

啟動流程

?

spark集群啟動流程 ?和任務提交

?

主節點 master

子節點work ??多臺

start-allsh 腳本 先啟動master服務 ?????啟動work

master ?提交注冊信息 ??work 響應 ??work會定時發送心跳信息

?

?

集群啟動流程

??????1、調用start-all腳本 ??,開始啟動Master

??????2master啟動以后,preStart方法調用了一個定時器,定時的檢查超時的worker

??????3、啟動腳本會解析slaves配置文件,找到啟動work的相應節點,開始啟動worker

??????4worker服務啟動后開始調用prestart方法(生命周期方法)開始向所有的master注冊

??????5master接收到work發送過來的注冊信息,master 開始保存注冊信息并把自己的URL響應給worker

??????6worker接收到masterURL后并更新,開始掉用一個定時器,定時的向master發送心跳信息

?

?

任務提交流程

?

將任務rdd通過客戶端submit 提交給Master ?的管道 (隊列:先進先出)

??????????????????????worker啟動Executor子進程 ??來從master拿取任務信息

??????????????????????Executor ?向客戶端Driver端注冊

?????????????????????????客戶端收到注冊信息 ?客戶端就會將任務給 Executor進行人物計算

?

任務提交流程

????????1、首先Driver端會通過spark-submit腳本啟動sparkSubmint進程,此時開始創建重要的對象(SparkContext),啟動后開始向Master發送信息開始通信

????????2Master接收到發送過來的信息后,開始生成任務信息,并把任務信息放到隊列中

????????3master開始把所有有效的worker過濾出來并進行排序,按照空閑的資源進行排序

????????4Master開始向有效的worker通知拿取任務信息,并啟動相應的Executor

????????5worker啟動Executor ,并向Driver反向注冊

????????6Driver開始把生成的task發送給相應的ExecutorExecutor

?

?

WordCount中產生的RDD

?

hdfs上有三個文件 ?sc.textFile(“路徑”)方法 ?生成第一個RDD ?HadoopRDD ???第二個RDD ?MapPartitionsRDD ?flatMap(_.split()"") 生成 第三個RDD ?MapPartitionsRDD

????????????????????????????????map((_,1))生成第四個RDD ?MapPartitionsRDD ???reduceByKey ?生成第五個 ShuffledRDD ?????saveAsTextFile ?生成第六個RDD MapPartitionsRDD

?

.toDebugString ?可以看出RDD

?

分區

Partition ?后跟分區 ?分區本身不會改變 ?會生成以一個新的RDD分區為修改后 ?因為rdd本身不可變 ??修改后大于原本分區的會發生shullfer ?小于的不會發生

?

coalesce ???后跟分區少于原來的分區則會改變 ?因為不會發生shuffle ?大于時則不可改變

?

PartitionBy ?后跟新的分區器 new ?全名稱的分區器org.apache.spark.hparPartition

?

客戶端提交Job任務信息給Master

Master生成任務信息 master 生成任務信息描述任務的數據 ??通知work 創建相應的Executor

??????客戶端將job信息給work ?workExecutor 進行計算數據

?

object Demo {

??def main(args: Array[String]): Unit = {

?

//SparkConf:構架配置信息類,優先于集群配置文件

//setAppName:指定應用程序名稱,如果不指定,會自動生成類似于uuid產生的名稱

//setMaster:指定運行模式:local[1]-用一個線程模擬集群運行,local[2] -用兩個集群模擬線程集群運行,local[*] -有多少線程就用多少線程運行

?

????val conf= new SparkConf().setAppName("") ???// setAppName起名稱 ?setMaster ?在哪里運行 ?是本地還是 ?[]是調用多少線程來運行

?.setMaster("local[2]") //在打包上傳集群時 ?不需要這一步直接刪除或是注釋掉

//創建提交任務到集群的入口類(上下文對象)

??????val sc = ?new SparkContext(conf)

//獲取hdfs的數據

val lines = sc.textFile("hdfs://suansn:9000/wc")

val words= lines.flatMap(_.split(" ")) // 切分后生成單詞

val tuples=words.map((_,1)) ??//將單詞生成一個元組

val ?sum= tuples.reduceBykey(_+_) ?// 進行聚合

val PX = sum.sortBy(_._2,false) ?// 倒敘拍尋

print(PX.collect.toBuffer) // 打印至控制臺 ??在打包上傳集群時 ?不需要這一步直接刪除或是注釋掉

?

PX.saveAsTextFile("hdfs://suansn:9000/ssss")

sc.stop ????//釋放資源

?

?

?

??}

?

}

?

?

RDD ????提供的方法 ?叫做算子

?

RDD ?數據集 數據的抽象 ?是一種類型,提供方法處理數據 ???分布式 ?僅僅是指向數據 , ??不可變 如果想要其他的操作 就在另外定義一個 RDD 。 可分區 ??如果一個文件 小于128M ?就是一個分區 如果大于將根據大小來分區

?

一組分片 ??一個計算每個分區的函數 ??RDD之間的依賴關系 ??一個Partitioner,即RDD的分片函數。 ??一個列表,存儲存取每個Partition的優先位置(preferred location)。

?

?

RDD ?有兩種類型 ?????一個算子對應一個Actionjob

??????1 Transformation ?轉換的類型 ???延遲加載 ??只是記錄計算過程 并不執行 ????只有調用 ?Action 類型的算子后 ?觸發job 生成計算

??????????????????????如果沒有Transformation 算子 ?而全是Action算子 ?就無法優化 ?集群一直處于繁忙狀態。

?

??????2Action

?sc.parallelize 并行方法創建RDD

?

?

?val rdd1 = sc.parallelize(List(3,4,6,5,8,7,9,2,1),2)

?

???每個數據乘10

?val rdd2 = rdd1.map(_*10)

?? ? ?Array[Int] = Array(30, 40, 60, 50, 80, 70, 90, 20, 10)

?

?

利用分區計算 ?mapPartitions

?

val rdd2= rdd1.mapPartitions(_.map(_*10)) ????//map_ ?表示每個分區的數據 ?封裝到Iterator

Array[Int] = Array(30, 40, 60, 50, 80, 70, 90, 20, 10)

?

mapWith ???????//map的變異 ?也可將元素數據遍歷出來 ?將分區號作為輸入 返回到A類型作為輸出

(constructA: Int => A)(f: (T, A) => RDD[U])

參數列表:(constructA: Int => A, preservesPartitioning: Boolean = false)(f: (T, A) => U) ???// Int => A 操作的每個分區的分區號,preservesPartitioning: Boolean = false ?是否記錄rdd的分區信息 ? ?(T, A) ??Trdd中的元素

// 實現了柯里化的步驟 ?兩個A的傳入

?

rdd1.mapWith(i => i*10)((a, b) => b+2).collect ???//分區號 i ?乘以10 ??B接收 A RDD的元素

Array[Int] = Array(2,2,2,2,12,12,12,12,12)

?

?

flatMapWith ??//分區排序

?

?

?(constructA: Int => A)(f: (T, A) => Seq[U])

參數列表:(constructA: Int => A, preservesPartitioning: Boolean = false)(f: (T, A) => Seq[U])

rdd1.flatMapWith(i => i, true)((x, y) => List((y, x))).collect ?// i為分區號 ??原樣不懂輸出 ??true 相當于 允許記錄分區信息 ???Y為拿到的分區號 ?X RDD的元素

Array[Int,Int)] = Array((0,3)(0,4)(0,6)(0,5)(1,8)(1,7)(1,9)(1,2)(1,1))

?

mapPartitions ??f: Iterator[T] => Iterator[U]

rdd1.mapPartitions(_.toList.reverse.iterator).collect ???// ?每個分區顛倒排列

?

Array[Int] = Array(5, 6, 4, 3, 1, 2, 9, 7, 8)

?

mapPartitionsWithIndex ??????????循環分區并可以操作分區號

參數列表:(f: (Int, Iterator[T]) => Iterator[U], preservesPartitioning: Boolean = false) ?????//Iterator[(Int) ?分區信息 ??index: Int ?分區號

val func = (index: Int, iter: Iterator[(Int)]) => {

??iter.toList.map(x => "[partID:" + ?index + ", val: " + x + "]").iterator

}

val rdd1 = sc.parallelize(List(1,2,3,4,5,6,7,8,9), 2)

rdd1.mapPartitionsWithIndex(func).collect

?Array[String] = Array([partID:0, val: 1], [partID:0, val: 2], [partID:0, val: 3], [partID:0, val: 4], [partID:1, val: 5], [partID:1, val: 6], [partID:1, val: 7], [partID:1, val: 8], [partID:1, val: 9])

?

aggregate ???// ?聚合算子

(zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) => U): U

?

def func1(index: Int, iter: Iterator[(Int)]) : Iterator[String] = {

??iter.toList.map(x => "[partID:" + ?index + ", val: " + x + "]").iterator

}

val rdd1 = sc.parallelize(List(1,2,3,4,5,6,7,8,9), 2)

rdd1.mapPartitionsWithIndex(func1).collect

Array[String] = Array([partID:0, val: 1], [partID:0, val: 2], [partID:0, val: 3], [partID:0, val: 4], [partID:1, val: 5], [partID:1, val: 6], [partID:1, val: 7], [partID:1, val: 8], [partID:1, val: 9])

?

?

rdd1.aggregate(0)(math.max(_, _), _ + _) ???// 循環時 ?第一個_ 拿到的時初始值0 ?第二個_拿到的0分區第一個元素 ?然后判斷最大值 ?依次類推 ????局部聚合完,最后全局聚合時 ??初始值+ 0分區的最大值。第1分區的最大值

Int=13

rdd1.aggregate(5)(math.max(_, _), _ + _) ??//原理和上面的相同不過初始值時5 ??這樣得到的第0 分區的最大值就是 初始值 ?5 ?1分區的最大值還是9 ???最后的全局聚合時 ?就是5 + 5+9

Int=19

?

val rdd2 = sc.parallelize(List("a","b","c","d","e","f"),2)

def func2(index: Int, iter: Iterator[(String)]) : Iterator[String] = {

??iter.toList.map(x => "[partID:" + ?index + ", val: " + x + "]").iterator

}

rdd2.mapPartitionsWithIndex(func2).collect

Array[String] = Array([partID:0, val: a], [partID:0, val: b], [partID:0, val: c], [partID:1, val: d], [partID:1, val: e], [partID:1, val: f])

?

?

rdd2.aggregate("")(_ + _, _ + _) ?//全局聚合和局部聚合 ???都屬于字符串拼接 ??初始值為空

String = abcdef ??String = defabc ?//因為不確定那個分區先完成任務所以 會出現兩種結果

rdd2.aggregate("=")(_ + _, _ + _)

String = ==abc=def

?

val rdd3 = sc.parallelize(List("12","23","345","4567"),2)

rdd3.aggregate("")((x,y) => math.max(x.length, y.length).toString, (x,y) => x + y) // 取每個字符串的長度 ??第一次與初始值 比較 而后用第二個數據的長度與上一次比較后的長度相比較, ??最后全局聚合時 兩個分區最長的字符串和初始值相加

String = 24 ?String = 42

val rdd4 = sc.parallelize(List("12","23","345",""),2)

rdd4.aggregate("")((x,y) => math.min(x.length, y.length).toString, (x,y) => x + y) ?// ?運算方法與上面的相同 這個求的字符串是最短的 因為在第二個分區內有個空數據字符串為0 ??第一個分區的因為初始值也為空 所以為空 ??tostring后第一次的變為字符串 0 長度為1 全局后為10

?String = 10

val rdd5 = sc.parallelize(List("12","23","","345"),2)

rdd5.aggregate("")((x,y) => math.min(x.length, y.length).toString, (x,y) => x + y) 與上面相同

String = 11

?

aggregateByKey ??通過相同的key 進行聚合

(zeroValue: U, partitioner: Partitioner)(seqOp: (U, V) => U, combOp: (U, U) => U): RDD[(K, U)

//Partitioner ?分區器

val pairRDD = sc.parallelize(List(("mouse", 2),("cat",2), ("cat", 5), ("mouse", 4),("cat", 12), ("dog", 12)), 2)

def func2(index: Int, iter: Iterator[(String, Int)]) : Iterator[String] = {

??iter.toList.map(x => "[partID:" + ?index + ", val: " + x + "]").iterator

??}

pairRDD.mapPartitionsWithIndex(func2).collect

??// ???????全局聚合時 不會加 初始值

????pairRDD.aggregateByKey(0)(math.max(_, _), _ + _).collect ?// 相同的keyvalue值進行操作

??pairRDD.aggregateByKey(100)(math.max(_, _), _ + _).collect

?

??combineByKey ??// 聚合的算子

??????(createCombiner: V => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C)

??val rdd1 = sc.textFile("hdfs://node01:9000/wc").flatMap(_.split(" ")).map((_, 1))

val rdd2 = rdd1.combineByKey(x => x, (a: Int, b: Int) => a + b, (m: Int, n: Int) => m + n)

??rdd2.collect

?

??val rdd3 = rdd1.combineByKey(x => x + 10, (a: Int, b: Int) => a + b, (m: Int, n: Int) => m + n)

rdd.collect

?

?

??val rdd4 = sc.parallelize(List("dog","cat","gnu","salmon","rabbit","turkey","wolf","bear","bee"), 3)

??val rdd5 = sc.parallelize(List(1,1,2,2,2,1,2,2,2), 3)

??val rdd6 = rdd5.zip(rdd4)

??val rdd7 = rdd6.combineByKey(List(_), (x: List[String], y: String) => x :+ y, (m: List[String], n: List[String]) => m

?

??countByKey

?

??val rdd1 = sc.parallelize(List(("a", 1), ("b", 2), ("b", 2), ("c", 2), ("c", 1)))

??rdd1.countByKey ??//相同key value的個數

??rdd1.countByValue // 把整個rdd看成Value

?

?

??filterByRange ?//給定范圍 ?求

?

??val rdd1 = sc.parallelize(List(("e", 5), ("c", 3), ("d", 4), ("c", 2), ("a", 1)))

??val rdd2 = rdd1.filterByRange("c", "d")

??rdd2.collect

?

?

??flatMapValues

??val rdd3 = sc.parallelize(List(("a", "1 2"), ("b", "3 4")))

??rdd3.flatMapValues(_.split(" "))

?

?

??foldByKey

?

??val rdd1 = sc.parallelize(List("dog", "wolf", "cat", "bear"), 2)

??val rdd2 = rdd1.map(x => (x.length, x))

??val rdd3 = rdd2.foldByKey("")(_+_)

?

??val rdd = sc.textFile("hdfs://node01:9000/wc").flatMap(_.split(" ")).map((_, 1))

??rdd.foldByKey(0)(_+_)

?

?

??foreachPartition ?//

??val rdd1 = sc.parallelize(List(1, 2, 3, 4, 5, 6, 7, 8, 9), 3)

??rdd1.foreachPartition(x => println(x.reduce(_ + _))) ?表示每個分區的數據的聚合值

?

?

??keyBy

??val rdd1 = sc.parallelize(List("dog", "salmon", "salmon", "rat", "elephant"), 3)

??val rdd2 = rdd1.keyBy(_.length) ??元素數據的長度生成為key 元素數據生成為value

??rdd2.collect

?

??keys values

??val rdd1 = sc.parallelize(List("dog", "tiger", "lion", "cat", "panther", "eagle"), 2)

??val rdd2 = rdd1.map(x => (x.length, x))

??rdd2.keys.collect

??rdd2.values.collect

?

?

??checkpoint

??sc.setCheckpointDir("hdfs://node01:9000/cp")

??val rdd = sc.textFile("hdfs://node01:9000/wc").flatMap(_.split(" ")).map((_, 1)).reduceByKey(_+_)

??rdd.checkpoint ??checkpoint后的文件 準備存儲 ?還未存儲 沒有Action 算子沒有運行job

??rdd.isCheckpointed ?查看是否運行checkpoint

??rdd.count ????隨便調動Avtion的算子 提交job

??rdd.isCheckpointed

??rdd.getCheckpointFile ??查看checkpoint的文件存儲的位置

?

??repartition, coalesce, partitionBy

??val rdd1 = sc.parallelize(1 to 10, 3)

??val rdd2 = rdd1.coalesce(2, false)

??rdd2.partitions.length

?

??collectAsMap ??Array 轉換map kv)對

??val rdd = sc.parallelize(List(("a", 1), ("b", 2)))

??rdd.collectAsMap

?

?

在一定時間范圍內,求所有用戶在經過所有基站停留時間最長的TOP2

?

?

思路:獲取用戶產生的log日志并切分

??????用戶在基站停留的總時長

??????過去基站的基礎信息

??????把經緯度信息join到用戶數據中

??????求出用戶在某些基站停留的時長的TOP2

?

?

??????object Demo ?{

????????def main(args: Array[String]): Unit = {

//模板代碼

val ??conf = new SparkConf()

.setAppName("ML")

.setMaster("local[2]")

val sc= new SparkContext(conf)

?

?

//獲取用戶訪問基站的log

val files=sc.textFile("地址")

//切分用戶的log

val userInfo=files.map(line=>{

val fields=line.split(",")

val phone = fields(0)//用戶手機號

val time = fields(1).toLong//時間戳

val lac = fields(2) //基站ID

val eventType = fields(3)//事件類型

val time_long = if(eventType.equals("1")) -time else time

?

?

((phone,lac),time_long)

})

?

//用戶在相同的基站停留的總時長

val ?sumedUserAndTime ?= userInfo.reduceByKey(_+_)

?

//為了便于和基站基礎信息進行Join 需要把數據調整,把基站ID作為key

val lacAndPhoneAndTime sumedUserAndTime.map(tup =>{

?

val phone = tup._1._1 //用戶手機號

val lac= tup._1._2//基站的ID

val time = tup._2 //用戶在某個基站停留的總時長

(lac,(phone,time))

})

?//獲取基站的基礎信息

?val lacInfo= sc.textFile("路徑")

//切分基站基礎數據

?val lacAndXY=lacInfo.map (line =>{

val fields = line.split(",")

val lac= fields(0)//基站ID

val x = files(1)//經度

val y = fields(2)//緯度

?

(lac,(x,y))

?

?})

?

//把經緯度信息join到用戶的訪問信息

val ?joined=lacAndPhoneAndTime join ?lacAndXY

?

//為了便于以后發呢組排序計算,需要整合數據

val phoneAndTimeAndXY=joined,map(tup=>{

val phone = tup._2._1._1//手機號

val lac = tup._1// ID

val time ?= tup._2._1._2

val xy = tup._2._2 //經緯度

phone,time,xy

?

})

//按照用戶手機號進行分組

val grouped=phoneAndTimeAndXY.groupBy(_._1)

//按照時長進行組內排序

//val ?sorted = grouped.map(x => (x._,x._2.toList.sortBy(_._2).reverse))

val ?sorted = grouped.mapValues(_.toList.sortBy(_._2).reverse)

//整合數據

val filterede=sorted.map(tup =>{

val phone= tup._1

?

val list = tup._2

val filteredList=list.map(x =>{

?

val time ?= x._2

val xy = x._3

?

??List(time,xy)

})

?

?

(phone,filteredList)

?

})

?

?

?

val res = filterede.mapValues(_.take(2))

?

?

?

sc.stop()

????????}

?

??????}


向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

子长县| 石河子市| 霍州市| 乌拉特后旗| 乐业县| 巫溪县| 漠河县| 常德市| 凉城县| 元谋县| 会同县| 平顺县| 弥勒县| 乌兰浩特市| 白水县| 福建省| 兴山县| 万山特区| 海林市| 罗源县| 剑阁县| 辽源市| 定南县| 巴中市| 疏附县| 游戏| 庄河市| 郓城县| 宁阳县| 西安市| 年辖:市辖区| 尚义县| 黔江区| 衡山县| 稻城县| 临湘市| 高台县| 邛崃市| 石柱| 泰顺县| 武隆县|