您好,登錄后才能下訂單哦!
本篇內容主要講解“GraphX的基礎知識有哪些”,感興趣的朋友不妨來看看。本文介紹的方法操作簡單快捷,實用性強。下面就讓小編來帶大家學習“GraphX的基礎知識有哪些”吧!
Spark GraphX是一個分布式圖處理框架,Spark GraphX基于Spark平臺提供對圖計算和圖挖掘簡潔易用的而豐富多彩的接口,極大的方便了大家對分布式圖處理的需求。Spark GraphX由于底層是基于Spark來處理的,所以天然就是一個分布式的圖處理系統。圖的分布式或者并行處理其實是把這張圖拆分成很多的子圖,然后我們分別對這些子圖進行計算,計算的時候可以分別迭代進行分階段的計算,即對圖進行并行計算。
設計GraphX時,點分割和GAS都已成熟,在設計和編碼中針對它們進行了優化,并在功能和性能之間尋找最佳的平衡點。如同Spark本身,每個子模塊都有一個核心抽象。GraphX的核心抽象是Resilient Distributed Property Graph,一種點和邊都帶屬性的有向多重圖。它擴展了Spark RDD的抽象,有Table和Graph兩種視圖,而只需要一份物理存儲。兩種視圖都有自己獨有的操作符,從而獲得了靈活操作和執行效率。
類成員
在GraphX中,圖的基礎類為Garph,它包含兩個RDD:一個為邊RDD,另一個為頂點RDD。可以用給定的邊RDD和頂點RDD構建一個圖。一旦構建好圖,就可以用edges()和vertices()來訪問邊和頂點的集合。VD和ED代表了用戶自定義的頂點和邊類,對應的圖是參數化類型的泛類型Graph[VD,ED]。GraphX中圖必須要有頂點和邊屬性。GraphX中Vertice和Edge持有VerticeId值,而不是頂點的引用。圖在集群中是分布式存儲的,不屬于單個JVM,因此一條邊的頂點可能在不同的集群節點上。
頂點: Vertice(VertexId, VD)
abstract class VertexRDD[VD] extends RDD[(VertexId, VD)]
抽象值成員innerJoin
leftJoin
mapValues
···
具體值成員collect
count
distinct
filter
foreach
groupBy
isEmpty
persist
map
reduce
sortBy
toString
···
邊: Edge(VertexId, VertexId, ED)
class Edge[ED](srcId:VertexId, dstId:VertexId, attire:E
abstract class EdgeRDD[ED] extends RDD[Edge[ED]]
class EdgeTriplet[VD, ED] extends Edge[ED]
值成員Attr
srcId
srcAttr
dstId
dstAttr
抽象值成員innerJoin
mapValues
reverse
具體值成員++
aggregate
cache
collect
count
distinct
filter
foreach
groupBy
isEmpty
map
persist
reduce
sortBy
toString
···
圖: Graph(VD, ED)
abstract class Graph[VD,ED] extend Serializable
class GraphOps[VD,ED] extends Serializable
值成員collectEdges
collectNeiborIds
collectNeibors
degrees
filter
inDegrees
joinVertices
numEdges
numVertices
outDegrees
pageRank
personalizedPageRank
pickRandomVertex
pregel
triangleCount
···
抽象值成員cache
edges
mapEdges
mapTriplets
mapVertices
mask
outerJoinVertices
persist
reverse
subgraph
triplets
vertices
···
具體值成員aggregateMessages
mapEdges
mapTriplets
···
GraphX實例
引用
import org.apache.spark._ import org.apache.spark.graphx._ import org.apache.spark.rdd.RDD
構圖
有很多方式從一個原始文件、RDD構造一個屬性圖。最一般的方法是利用Graph object。 下面的代碼從RDD集合生成屬性圖。
// 假設SparkContext已經被構造 val sc: SparkContext // 創建點RDD val users: RDD[(VertexId, (String, String))] =sc.parallelize( Array((3L, ("rxin", "student")), (7L, ("jgonzal","postdoc")), (5L, ("franklin", "prof")), (2L, ("istoica", "prof")))) // 創建邊RDD val relationships: RDD[Edge[String]] = sc.parallelize( Array(Edge(3L, 7L, "collab"), Edge(5L, 3L, "advisor"), Edge(2L, 5L, "colleague"), Edge(5L, 7L, "pi") Edge(5L, 0L, "colleague"))) // 定義一個默認用戶,避免有不存在用戶的關系 val defaultUser = ("John Doe", "Missing") // 構造Graph val graph = Graph(users, relationships, defaultUser)
緩存
//緩存。默認情況下,緩存在內存的圖會在內存緊張的時候被強制清理,采用的是LRU算法 graph.cache() graph.persist(StorageLevel.MEMORY_ONLY) graph.unpersistVertices(true)
點、邊和三元組
下面的代碼用到了Edge樣本類。邊有一個srcId和dstId分別對應于源和目標頂點的標示符。另外,Edge類有一個attr成員用來存儲邊屬性。可以分別用graph.vertices和graph.edges成員將一個圖解構為相應的頂點和邊。graph.vertices返回一個VertexRDD[(String, String)],它繼承于 RDD[(VertexID, (String, String))]。所以我們可以用scala的case表達式解構這個元組。另一方面,graph.edges返回一個包含Edge[String]對象的EdgeRDD,我們也可以用到case類的類型構造器。
除了屬性圖的頂點和邊視圖,GraphX也包含了一個三元組視圖,三元視圖邏輯上將頂點和邊的屬性保存為一個RDD[EdgeTriplet[VD, ED]],它包含EdgeTriplet類的實例。EdgeTriplet類繼承于Edge類,并且加入了srcAttr和dstAttr成員,這兩個成員分別包含源和目的的屬性。我們可以用一個三元組視圖渲染字符串集合用來描述用戶之間的關系。
// 找出職業為postdoc的人 graph.vertices.filter { case (id, (name, pos)) => pos == "postdoc" }.collect // 計算源頂點ID大于目標頂點ID的邊的數量 graph.edges.filter(e => e.srcId > e.dstId).count graph.edges.filter { case Edge(src, dst, prop) => src > dst }.count // 使用三元組視圖描述關系事實 val facts: RDD[String] = graph.triplets.map(triplet =>triplet.srcAttr._1 + " is the " + triplet.attr + " of " + triplet.dstAttr._1) facts.collect.foreach(println(_))
度、入度、出度
正如RDDs有基本的操作map, filter和reduceByKey一樣,屬性圖也有基本的集合操作,這些操作采用用戶自定義的函數并產生包含轉換特征和結構的新圖。定義在Graph中的 核心操作是經過優化的實現。表示為核心操作的組合的便捷操作定義在GraphOps中。然而, 因為有Scala的隱式轉換,定義在GraphOps中的操作可以作為Graph的成員自動使用。例如,我們可以通過下面的方式計算每個頂點(定義在GraphOps中)的入度。區分核心圖操作和GraphOps的原因是為了在將來支持不同的圖表示。每個圖表示都必須提供核心操作的實現并重用很多定義在GraphOps中的有用操作。
val degrees: VertexRDD[Int] = graph.degrees; degrees.collect().foreach(println) val inDegrees: VertexRDD[Int] = graph.inDegrees inDegrees.collect().foreach(println) val outDegrees: VertexRDD[Int] = graph.outDegrees outDegrees.collect().foreach(println)
屬性操作:修改頂點和邊的屬性
屬性操作每個操作都產生一個新的圖,這個新的圖包含通過用戶自定義的map操作修改后的頂點或邊的屬性。Map操作根據原圖的一些特性得到新圖,原圖結構是不變的。這些操作的一個重要特征是它允許所得圖形重用原有圖形的結構索引(indices)。下面的兩行代碼在邏輯上是等價的,但是第一個不是圖操作,它不保存結構索引,所以不會從GraphX系統優化中受益。Map操作根據原圖的一些特性得到新圖,原圖結構是不變的。這些操作經常用來初始化的圖形,用作特定計算或者用來處理項目不需要的屬性。例如,給定一個圖,這個圖的頂點特征包含出度,我們為PageRank初始化它。
//頂點轉換,頂點age+1 //RDD操作,再構造新圖,不保存結構索引,不會被系統優化 val newVertices = graph.vertices.map { case (id, attr) => (id, (attr._1 + "-1", attr._2 + "-2")) } val newGraph2 = Graph(newVertices, graph.edges) //圖Map操作,被系統優化 val newGraph3 = graph.mapVertices((id, attr) => (id, (attr._1 + "-1", attr._2 + "-2")))
//構造一個新圖,頂點屬性是出度 val inputGraph: Graph[Int, String] = graph.outerJoinVertices( graph.outDegrees)((vid, _, degOpt) => degOpt.getOrElse(0)) //根據頂點屬性為出度的圖構造一個新圖,依據PageRank算法初始化邊與點 val outputGraph: Graph[Double, Double] =inputGraph.mapTriplets( triplet => 1.0 / triplet.srcAttr).mapVertices((id, _) => 1.0)
//創建一個新圖,頂點 VD 的數據類型為 User,并從 graph 做類型轉換 case class User(name: String, pos: String, inDeg: Int, outDeg: Int) val initialUserGraph: Graph[User, String] = graph.mapVertices { case (id, (name, age)) => User(name, pos, 0, 0)} //initialUserGraph 與 inDegrees、outDegrees(RDD)進行連接,并修改 initialUserGraph中 inDeg 值、outDeg 值 val userGraph = initialUserGraph.outerJoinVertices( initialUserGraph.inDegrees) { case (id, u, inDegOpt) => User(u.name, u.pos, inDegOpt.getOrElse(0), u.outDeg) }.outerJoinVertices( initialUserGraph.outDegrees) { case (id, u, outDegOpt) => User(u.name, u.pos, u.inDeg, outDegOpt.getOrElse(0)) } userGraph.vertices.collect.foreach(v => println(s"${v._2.name} inDeg: ${v._2.inDeg} outDeg: ${v._2.outDeg}")) //出度和入讀相同的人員 userGraph.vertices.filter { case (id, u) => u.inDeg == u.outDeg }.collect.foreach { case (id, property) => println(property.name) }
自定義類型
Join 操作
map 操作
結構操作
//由已定義的頂點構成的子圖 val subGraph = graph.subgraph(vpred = (id, attr) => attr._2 != "Missing" ) subGraph.vertices.collect().foreach(println(_)) subGraph.triplets.map(triplet => triplet.srcAttr._1 + " is the " + triplet.attr + " of " + triplet.dstAttr._1).collect().foreach(println(_))
//圖的反向操作,新的圖形的所有邊的方向相反,不修改頂點或邊性屬性、不改變的邊的數目,它可以有效地實現不必要的數據移動或復制 var rGraph = graph.reverse
//Mask操作也是根據輸入圖構造一個新圖,達到一個限制制約的效果 val ccGraph = graph.connectedComponents() val validGraph = graph.subgraph(vpred = (id, attr) => attr._2 != "Missing") val validCCGraph = ccGraph.mask(validGraph)
Mask
圖反向
子圖
聚合操作
//計算年齡大于自己的關注者的總人數和總年齡 val olderFollowers: VertexRDD[(Int, Double)] = graph.mapReduceTriplets[(Int, Double)]( //Map函數 triplet => { if (triplet.srcAttr > triplet.dstAttr) { Iterator((triplet.dstId, (1, triplet.srcAttr))) } else { Iterator.empty } }, //Reduce函數 (a, b) => (a._1 + b._1, a._2 + b._2) ) //計算年齡大于自己的關注者的平均年齡 val avgAgeOfOlderFollowers: VertexRDD[Double] = olderFollowers.mapValues((id, value) => value match {case (count, totalAge) => totalAge / count }) avgAgeOfOlderFollowers.collect.foreach(println(_)) //定義一個Reduce函數來計算圖中較大度的點 def max(a: (VertexId, Int), b: (VertexId, Int)): (VertexId, Int) = { if (a._2 > b._2) a else b } val maxInDegree: (VertexId, Int) = graph.inDegrees.reduce(max) println(s"maxInDegree: $maxInDegree") val maxOutDegree: (VertexId, Int) = graph.outDegrees.reduce(max) println(s"maxOutDegree: $maxOutDegree") val maxDegrees: (VertexId, Int) = graph.degrees.reduce(max) println(s"maxDegrees: $maxDegrees")
相鄰聚合
//計算鄰居相關函數,這些操作是相當昂貴的,需要大量的重復信息作為他們的通信,因此相同的計算還是推薦用mapReduceTriplets val neighboorIds:VertexRDD[Array[VertexId]] = graph.collectNeighborIds(EdgeDirection.Out) val neighboors:VertexRDD[Array[(VertexId, Double)]]= graph.collectNeighbors(EdgeDirection.Out);
Pregel API
//Pregel API。計算單源最短路徑 //通過GraphGenerators構建一個隨機圖 val numVertices = 100 val numEParts = 2 val mu = 4.0 val sigma = 1.3 val graph2 = GraphGenerators.logNormalGraph(sc, numVertices, numEParts, mu, sigma).mapEdges(e=> e.attr.toDouble) //定義一個源值 點 val sourceId: VertexId = 42 //初始化圖的所有點,除了與指定的源值點相同值的點為0.0以外,其他點為無窮大 val initialGraph = graph2.mapVertices((id, _) => if (id == sourceId) 0.0 else Double.PositiveInfinity) //Pregel有兩個參數列表,第一個參數列表包括的是:初始化消息、迭代較大數、邊的方向(Out)。第二個參數列表包括的是:用戶定義的接受消息、計算消息、聯合合并消息的函數。 val sssp = initialGraph.pregel(Double.PositiveInfinity)( //點程序 (id, dist, newDist) => math.min(dist, newDist), //發送消息 triplet => { if (triplet.srcAttr + triplet.attr < triplet.dstAttr) { Iterator((triplet.dstId, triplet.srcAttr + triplet.attr)) } else { Iterator.empty } }, //合并消息 (a, b) => math.min(a, b) ) println(sssp.vertices.collect.mkString("\n"))
主要圖算法
val pageRankGraph = graph2.pageRank(0.001) pageRankGraph.vertices.sortBy(_._2,false).saveAsTextFile( "/user/hadoop/data/temp/graph/graph.pr") pageRankGraph.vertices.top(5)(Ordering.by(_._2)).foreach(println)
val connectedComponentsGraph = graph2.connectedComponents() connectedComponentsGraph.vertices.sortBy(_._2, false).saveAsTextFile( "/user/hadoop/data/temp/graph/graph.cc") connectedComponentsGraph.vertices.top(5)(Ordering.by(_._2)).foreach(println)
//TriangleCount主要用途之一是用于社區發現 保持sourceId小于destId val graph3 = GraphLoader.edgeListFile(sc, path, true) val triangleCountGraph = graph3.triangleCount() triangleCountGraph.vertices.sortBy(_._2,false).saveAsTextFile( "/user/hadoop/data/temp/graph/graph.tc") triangleCountGraph.vertices.top(5)(Ordering.by(_._2)).foreach(println)
TriangleCount
Connected Components
PageRank
其他操作
var path = "/user/Hadoop/data/temp/graph/graph.txt" var minEdgePartitions = 1 var canonicalOrientation = false // if sourceId < destId this value is true val graph2 = GraphLoader.edgeListFile(sc, path, canonicalOrientation, minEdgePartitions,StorageLevel.MEMORY_ONLY, StorageLevel.MEMORY_ONLY)
//通過GraphGenerators構建一個隨機圖 val numVertices = 100 val numEParts = 2 val mu = 4.0 val sigma = 1.3 val graph: Graph[Double, Int] = GraphGenerators.logNormalGraph(sc, numVertices, numEParts, mu, sigma).mapVertices((id, _) => id.toDouble) graph.triplets.collect.foreach(triplet => println(triplet.srcId + "-" + triplet.srcAttr + "-" + triplet.attr + "-" + triplet.dstId + "-" + triplet.dstAttr))
val setA: VertexRDD[Int] = VertexRDD( sc.parallelize(0L until 100L).map(id => (id, 1))) val rddB: RDD[(VertexId, Double)] = sc.parallelize( 0L until 100L).flatMap(id => List((id, 1.0), (id, 2.0))) val setB: VertexRDD[Double] = setA.aggregateUsingIndex(rddB, _ + _) val setC: VertexRDD[Double] = setA.innerJoin(setB)((id, a, b) => a + b)
到此,相信大家對“GraphX的基礎知識有哪些”有了更深的了解,不妨來實際操作一番吧!這里是億速云網站,更多相關內容可以進入相關頻道進行查詢,關注我們,繼續學習!
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。