您好,登錄后才能下訂單哦!
本篇內容介紹了“Spark Core讀取ES的分區問題案例分析”的有關知識,在實際案例的操作過程中,不少人都會遇到這樣的困境,接下來就讓小編帶領大家學習一下如何處理這些情況吧!希望大家仔細閱讀,能夠學有所成!
1.Spark Core讀取ES
ES官網直接提供的有elasticsearch-hadoop 插件,對于ES 7.x,hadoop和Spark版本支持如下:
hadoop2Version = 2.7.1hadoop22Version = 2.2.0spark13Version = 1.6.2spark20Version = 2.3.0
浪尖這了采用的ES版本是7.1.1,測試用的Spark版本是2.3.1,沒有問題。整合es和spark,導入相關依賴有兩種方式:
a,導入整個elasticsearch-hadoop包
<dependency> <groupId>org.elasticsearch</groupId> <artifactId>elasticsearch-hadoop</artifactId> <version>7.1.1</version> </dependency>
b,只導入spark模塊的包
<dependency> <groupId>org.elasticsearch</groupId> <artifactId>elasticsearch-spark-20_2.11</artifactId> <version>7.1.1</version> </dependency>
浪尖這里為了測試方便,只是在本機起了一個單節點的ES實例,簡單的測試代碼如下:
import org.apache.spark.{SparkConf, SparkContext}
import org.elasticsearch.hadoop.cfg.ConfigurationOptions
object es2sparkrdd {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setMaster("local[*]").setAppName(this.getClass.getCanonicalName)
conf.set(ConfigurationOptions.ES_NODES, "127.0.0.1")
conf.set(ConfigurationOptions.ES_PORT, "9200")
conf.set(ConfigurationOptions.ES_NODES_WAN_ONLY, "true")
conf.set(ConfigurationOptions.ES_INDEX_AUTO_CREATE, "true")
conf.set(ConfigurationOptions.ES_NODES_DISCOVERY, "false")
// conf.set(ConfigurationOptions.ES_NET_HTTP_AUTH_USER, esUser)
// conf.set(ConfigurationOptions.ES_NET_HTTP_AUTH_PASS, esPwd)
conf.set("es.write.rest.error.handlers", "ignoreConflict")
conf.set("es.write.rest.error.handler.ignoreConflict", "com.jointsky.bigdata.handler.IgnoreConflictsHandler")
val sc = new SparkContext(conf)
import org.elasticsearch.spark._
sc.esRDD("posts").foreach(each=>{
each._2.keys.foreach(println)
})
sc.esJsonRDD("posts").foreach(each=>{
println(each._2)
})
sc.stop()
}
}
可以看到Spark Core讀取RDD主要有兩種形式的API:
a,esRDD。這種返回的是一個tuple2的類型的RDD,第一個元素是id,第二個是一個map,包含ES的document元素。
RDD[(String, Map[String, AnyRef])]
b,esJsonRDD。這種返回的也是一個tuple2類型的RDD,第一個元素依然是id,第二個是json字符串。
RDD[(String, String)]
雖然是兩種類型的RDD,但是RDD都是ScalaEsRDD類型。
要分析Spark Core讀取ES的并行度,只需要分析ScalaEsRDD的getPartitions函數即可。
2.源碼分析
首先導入源碼https://github.com/elastic/elasticsearch-hadoop這個是gradle工程,可以直接導入idea,然后切換到7.x版本即可。
廢話少說直接找到ScalaEsRDD,發現gePartitions是在其父類實現的,方法內容如下:
override def getPartitions: Array[Partition] = {
esPartitions.zipWithIndex.map { case(esPartition, idx) =>
new EsPartition(id, idx, esPartition)
}.toArray
}
esPartitions是一個lazy型的變量:
@transient private[spark] lazy val esPartitions = { RestService.findPartitions(esCfg, logger) }
這種聲明原因是什么呢?
lazy+transient的原因大家可以考慮一下。
RestService.findPartitions方法也是僅是創建客戶端獲取分片等信息,然后調用,分兩種情況調用兩個方法。
final List<PartitionDefinition> partitions;// 5.x及以后版本 同時沒有配置es.input.max.docs.per.partitionif (clusterInfo.getMajorVersion().onOrAfter(EsMajorVersion.V_5_X) && settings.getMaxDocsPerPartition() != null) { partitions = findSlicePartitions(client.getRestClient(), settings, mapping, nodesMap, shards, log);} else { partitions = findShardPartitions(settings, mapping, nodesMap, shards, log);}
a).findSlicePartitions
這個方法其實就是在5.x及以后的ES版本,同時配置了
es.input.max.docs.per.partition
以后,才會執行,實際上就是將ES的分片按照指定大小進行拆分,必然要先進行分片大小統計,然后計算出拆分的分區數,最后生成分區信息。具體代碼如下:
long numDocs;if (readResource.isTyped()) { numDocs = client.count(index, readResource.type(), Integer.toString(shardId), query);} else { numDocs = client.countIndexShard(index, Integer.toString(shardId), query);}int numPartitions = (int) Math.max(1, numDocs / maxDocsPerPartition);for (int i = 0; i < numPartitions; i++) { PartitionDefinition.Slice slice = new PartitionDefinition.Slice(i, numPartitions); partitions.add(new PartitionDefinition(settings, resolvedMapping, index, shardId, slice, locations));}
實際上分片就是用游標的方式,對_doc進行排序,然后按照分片計算得到的分區偏移進行數據的讀取,組裝過程是SearchRequestBuilder.assemble方法來實現的。
這個其實個人覺得會浪費一定的性能,假如真的要ES結合Spark的話,建議合理設置分片數。
b).findShardPartitions方法
這個方法沒啥疑問了就是一個RDD分區對應于ES index的一個分片。
PartitionDefinition partition = new PartitionDefinition(settings, resolvedMapping, index, shardId,locationList.toArray(new String[0]));partitions.add(partition);
“Spark Core讀取ES的分區問題案例分析”的內容就介紹到這里了,感謝大家的閱讀。如果想了解更多行業相關的知識可以關注億速云網站,小編將為大家輸出更多高質量的實用文章!
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。