您好,登錄后才能下訂單哦!
本篇文章給大家分享的是有關Spark是如何實現資源管理器比如yarn等可插拔的,小編覺得挺實用的,因此分享給大家學習,希望大家閱讀完這篇文章后可以有所收獲,話不多說,跟著小編一起來看看吧。
作為Spark源碼閱讀愛好者,有誰想過Spark是如何實現資源管理器比如yarn等可插拔的呢?
其實,在這里不得不說一下,spark1.6及之前,資源管理器還是不可插拔,代碼是寫死在sparkContext類里的,你要想增加一種資源管理器,必須要修改SparkContext的代碼。
spark2.以后開始可以實現資源管理器的熱插拔,主要工具是ServiceLoader。本文就給大家揭示一下。
ServiceLoader與ClassLoader是Java中2個即相互區別又相互聯系的加載器.JVM利用ClassLoader將類載入內存,這是一個類聲明周期的第一步(一個java類的完整的生命周期會經歷加載、連接、初始化、使用、和卸載五個階段,當然也有在加載或者連接之后沒有被初始化就直接被使用的情況)。詳情請參閱:詳解Java類的生命周期
那ServiceLoader又是什么呢?ServiceLoader:一個簡單的服務提供者加載設施。服務 是一個熟知的接口和類(通常為抽象類)集合。服務提供者 是服務的特定實現。提供者中的類通常實現接口,并子類化在服務本身中定義的子類。服務提供者可以以擴展的形式安裝在 Java 平臺的實現中,也就是將 jar 文件放入任意常用的擴展目錄中。也可通過將提供者加入應用程序類路徑,或者通過其他某些特定于平臺的方式使其可用。……唯一強制要求的是,提供者類必須具有不帶參數的構造方法,以便它們可以在加載中被實例化。
通過在資源目錄META-INF/services中放置提供者配置文件 來標識服務提供者。文件名稱是服務類型的完全限定二進制名稱。該文件包含一個具體提供者類的完全限定二進制名稱列表,每行一個。忽略各名稱周圍的空格、制表符和空行。注釋字符為'#'('\u0023', NUMBER SIGN);忽略每行第一個注釋字符后面的所有字符。文件必須使用 UTF-8 編碼。
以延遲方式查找和實例化提供者,也就是說根據需要進行。服務加載器維護到目前為止已經加載的提供者緩存。每次調用 iterator 方法返回一個迭代器,它首先按照實例化順序生成緩存的所有元素,然后以延遲方式查找和實例化所有剩余的提供者,依次將每個提供者添加到緩存。可以通過 reload 方法清除緩存。
以上來源于Java API里的說明,也許說的很專業,讓我們有點暈頭轉向,我們可以簡單的認為:ServiceLoader也像ClassLoader一樣,能裝載類文件,但是使用時有區別,具體區別如下:
(1) ServiceLoader裝載的是一系列有某種共同特征的實現類,而ClassLoader是個萬能加載器;
(2)ServiceLoader裝載時需要特殊的配置,使用時也與ClassLoader有所區別;
(3)ServiceLoader還實現了Iterator接口。
[如有錯誤或不到的地方敬請指出,互相學習:)]
鏈接:https://www.cnblogs.com/sparkbj/articles/6208328.html
首先看一下SparkContext內部初始化管理器的代碼
// 創建和啟動調度器
val (sched, ts) = SparkContext.createTaskScheduler(this, master, deployMode)
_schedulerBackend = sched
_taskScheduler = ts
_dagScheduler = new DAGScheduler(this)
_heartbeatReceiver.ask[Boolean](TaskSchedulerIsSet)
主要類方法是createTaskScheduler,其中有片段是通過url來找到資源管理器的。
case masterUrl => val cm = getClusterManager(masterUrl) match { case Some(clusterMgr) => clusterMgr case None => throw new SparkException("Could not parse Master URL: '" + master + "'") } try { val scheduler = cm.createTaskScheduler(sc, masterUrl) val backend = cm.createSchedulerBackend(sc, masterUrl, scheduler) cm.initialize(scheduler, backend) (backend, scheduler) } catch { case se: SparkException => throw se case NonFatal(e) => throw new SparkException("External scheduler cannot be instantiated", e) }
getClusterManager內部實現了資源管理器的加載。
private def getClusterManager(url: String): Option[ExternalClusterManager] = { val loader = Utils.getContextOrSparkClassLoader val serviceLoaders = ServiceLoader.load(classOf[ExternalClusterManager], loader).asScala.filter(_.canCreate(url)) if (serviceLoaders.size > 1) { throw new SparkException( s"Multiple external cluster managers registered for the url $url: $serviceLoaders") } serviceLoaders.headOption }
然后我們可以找到相關配置了。
以上就是Spark是如何實現資源管理器比如yarn等可插拔的,小編相信有部分知識點可能是我們日常工作會見到或用到的。希望你能通過這篇文章學到更多知識。更多詳情敬請關注億速云行業資訊頻道。
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。