您好,登錄后才能下訂單哦!
這期內容當中小編將會給大家帶來有關如何使用Nebula Graph Exchange將數據從Neo4j導入到Nebula Graph Database,文章內容豐富且以專業的角度為大家分析和敘述,閱讀完這篇文章希望大家可以有所收獲。
下面主要講述如何使用數據導入工具 Nebula Graph Exchange 將數據從 Neo4j 導入到 Nebula Graph Database。在講述如何實操數據導入之前,我們先來了解下 Nebula Graph 內部是如何實現這個導入功能的。
我們這個導入工具名字是 Nebula Graph Exchange,采用 Spark 作為導入平臺,來支持海量數據的導入和保障性能。Spark 本身提供了不錯的抽象——DataFrame,使得可以輕松支持多種數據源。在 DataFrame 的支持下,添加新的數據源只需提供配置文件讀取的代碼和返回 DataFrame 的 Reader 類,即可支持新的數據源。
DataFrame 可以視為一種分布式存表格。DataFrame 可以存儲在多個節點的不同分區中,多個分區可以存儲在不同的機器上,從而支持并行操作。Spark 還提供了一套簡潔的 API 使用戶輕松操作 DataFrame 如同操作本地數據集一般。現在大多數數據庫提供直接將數據導出成 DataFrame 功能,即使某個數據庫并未提供此功能也可以通過數據庫 driver 手動構建 DataFrame。
Nebula Graph Exchange 將數據源的數據處理成 DataFrame 之后,會遍歷它的每一行,根據配置文件中 fields 的映射關系,按列名獲取對應的值。在遍歷 batchSize
個行之后,Exchange 會將獲取的數據一次性寫入到 Nebula Graph 中。目前,Exchange 是通過生成 nGQL 語句再由 Nebula Client 異步寫入數據,下一步會支持直接導出 Nebula Graph 底層存儲的 sst 文件,以獲取更好的性能。接下來介紹一下 Neo4j 數據源導入的具體實現。
雖然 Neo4j 官方提供了可將數據直接導出為 DataFrame 的庫,但使用它讀取數據難以滿足斷點續傳的需求,我們未直接使用這個庫,而是使用 Neo4j 官方的 driver 實現數據讀取。Exchange 通過在不同分區調取 Neo4j driver 執行不同 skip
和 limit
的 Cypher 語句,將數據分布在不同的分區,來獲取更好的性能。這個分區數量由配置項 partition
指定。
Exchange 中的 Neo4jReader 類會先將用戶配置中的 exec
Cypher 語句,return
后邊的語句替換成 count(*)
執行獲取數據總量,再根據分區數計算每個分區的起始偏移量和大小。這里如果用戶配置了 check_point_path
目錄,會讀取目錄中的文件,如果處于續傳的狀態,Exchange 會計算出每個分區應該的偏移量和大小。然后每個分區在 Cypher 語句后邊添加不同的 skip
和 limit
,調用 driver 執行。最后將返回的數據處理成 DataFrame 就完成了 Neo4j 的數據導入。
過程如下圖所示:
我們這里導入演示的系統環境如下:
cpu name:Intel(R) Xeon(R) CPU E5-2697 v3 @ 2.60GHz
cpu cores:14
memory size:251G
軟件環境如下:
Neo4j:3.5.20 社區版
Nebula graph:docker-compose 部署,默認配置
Spark:單機版,版本為 2.4.6 pre-build for hadoop2.7
由于 Nebula Graph 是強 schema 數據庫,數據導入前需先進行創建 Space,建 Tag 和 Edge 的 schema,具體的語法可以參考這里。
這里建了名為 test 的 Space,副本數為 1。這里創建了兩種 Tag 分別為 tagA 和 tagB,均含有 4 個屬性的點類型,此外,還創建一種名為 edgeAB 的邊類型,同樣含有 4 個屬性。具體的 nGQL 語句如下所示:
# 創建圖空間 CREATE SPACE test(replica_factor=1); # 選擇圖空間 test USE test; # 創建標簽 tagA CREATE TAG tagA(idInt int, idString string, tboolean bool, tdouble double); # 創建標簽 tagB CREATE TAG tagB(idInt int, idString string, tboolean bool, tdouble double); # 創建邊類型 edgeAB CREATE EDGE edgeAB(idInt int, idString string, tboolean bool, tdouble double);
同時向 Neo4j 導入 Mock 數據——標簽為 tagA 和 tagB 的點,數量總共為 100 萬,并且導入了連接 tagA 和 tagB 類型點邊類型為 edgeAB 的邊,共 1000 萬個。另外需要注意的是,從 Neo4j 導出的數據在 Nebula Graph 中必須存在屬性,且數據對應的類型要同 Nebula Graph 一致。
最后為了提升向 Neo4j 導入 Mock 數據的效率和 Mock 數據在 Neo4j 中的讀取效率,這里為 tagA 和 tagB 的 idInt
屬性建了索引。關于索引需要注意 Exchange 并不會將 Neo4j 中的索引、約束等信息導入到 Nebula Graph 中,所以需要用戶在執行數據寫入在 Nebula Graph 之后,自行創建索引和 REBUILD 索引(為已有數據建立索引)。
接下來就可以將 Neo4j 數據導入到 Nebula Graph 中了,首先我們需要下載和編譯打包項目,項目在 nebula-java 這個倉庫下 tools/exchange 文件夾中。可執行如下命令:
git clone https://github.com/vesoft-inc/nebula-java.git cd nebula-java/tools/exchange mvn package -DskipTests
然后就可以看到 target/exchange-1.0.1.jar
這個文件。
接下來編寫配置文件,配置文件的格式為:HOCON(Human-Optimized Config Object Notation),可以基于 src/main/resources/server_application.conf
文件的基礎上進行更改。首先對 nebula 配置項下的 address、user、pswd 和 space 進行配置,測試環境均為默認配置,所以這里不需要額外的修改。然后進行 tags 配置,需要 tagA 和 tagB 的配置,這里僅展示 tagA 配置,tagB 和 tagA 配置相同。
{ # ======neo4j連接設置======= name: tagA # 必須和 Nebula Graph 的中 tag 名字一致,需要在 Nebula Graph 中事先建好 tag server: "bolt://127.0.0.1:7687" # neo4j 的地址配置 user: neo4j # neo4j 的用戶名 password: neo4j # neo4j 的密碼 encryption: false # (可選): 傳輸是否加密,默認值為 false database: graph.db # (可選): neo4j database 名稱,社區版不支持 # ======導入設置============ type: { source: neo4j # 還支持 PARQUET、ORC、JSON、CSV、HIVE、MYSQL、PULSAR、KAFKA... sink: client # 寫入 Nebula Graph 的方式,目前僅支持 client,未來會支持直接導出 Nebula Graph 底層數據庫文件 } nebula.fields: [idInt, idString, tdouble, tboolean] fields : [idInt, idString, tdouble, tboolean] # 映射關系 fields,上方為 nebula 的屬性名,下方為 neo4j 的屬性名,一一對應 # 映射關系的配置是 List 而不是 Map,是為了保持 fields 的順序,未來直接導出 nebula 底層存儲文件時需要 vertex: idInt # 作為 nebula vid 的 neo4j field,類型需要是整數(long or int)。 partition: 10 # 分區數 batch: 2000 # 一次寫入 nebula 多少數據 check_point_path: "file:///tmp/test" # (可選): 保存導入進度信息的目錄,用于斷點續傳 exec: "match (n:tagA) return n.idInt as idInt, n.idString as idString, n.tdouble as tdouble, n.tboolean as tboolean order by n.idInt" }
邊的設置大部分與點的設置無異,但由于邊在 Nebula Graph 中有起點的 vid 和終點的 vid 標識,所以這里需要指定作為邊起點 vid 的域和作為邊終點 vid 的域。
下面給出邊的特別配置。
source: { field: a.idInt # policy: "hash" } # 起點的 vid 設置 target: { field: b.idInt # policy: "uuid" } # 終點的 vid 設置 ranking: idInt # (可選): 作為 rank 的 field partition: 1 # 這里分區數設置為 1,原因在后邊 exec: "match (a:tagA)-[r:edgeAB]->(b:tagB) return a.idInt, b.idInt, r.idInt as idInt, r.idString as idString, r.tdouble as tdouble, r.tboolean as tboolean order by id(r)"
點的 vertex 和邊的 source、target 配置項下都可以設置 policy hash/uuid,它可以將類型為字符串的域作為點的 vid,通過 hash/uuid 函數將字符串映射成整數。
上面的例子由于作為點的 vid 為整數,所以并不需要 policy 的設置。hash/uuid的 區別請看這里。
Cypher 標準中如果沒有 order by
約束的話就不能保證每次查詢結果的排序一致,雖然看起來即便不加 order by
Neo4j 返回的結果順序也是不變的,但為了防止可能造成的導入時數據丟失,還是強烈建議在 Cypher 語句中加入 order by
,雖然這會增加導入的時間。為了提升導入效率, order by
語句最好選取有索引的屬性作為排序的屬性。如果沒有索引,也可觀察默認的排序,選擇合適的排序屬性以提高效率。如果默認的排序找不到規律,可以使用點/關系的 ID 作為排序屬性,并且將 partition
的值盡量設小,減少 Neo4j 的排序壓力,本文中邊 edgeAB
的 partition
就設置為 1。
另外 Nebula Graph 在創建點和邊時會將 ID 作為唯一主鍵,如果主鍵已存在則會覆蓋該主鍵中的數據。所以假如將某個 Neo4j 屬性值作為 Nebula Graph 的 ID,而這個屬性值在 Neo4j 中是有重復的,就會導致“重復 ID”對應的數據有且只有一條會存入 Nebula Graph 中,其它的則會被覆蓋掉。由于數據導入過程是并發地往 Nebula Graph 中寫數據,最終保存的數據并不能保證是 Neo4j 中最新的數據。
這里還要留意下斷點續傳功能,在斷點和續傳之間,數據庫不應該改變狀態,如添加數據或刪除數據,且 partition
數量也不能更改,否則可能會有數據丟失。
最后由于 Exchange 需要在不同分區執行不同 skip
和 limit
的 Cypher 語句,所以用戶提供的 Cypher 語句不能含有 skip
和 limit
語句。
接下來就可以運行 Exchange 程序導數據了,執行如下命令:
$SPARK_HOME/bin/spark-submit --class com.vesoft.nebula.tools.importer.Exchange --master "local[10]" target/exchange-1.0.1.jar -c /path/to/conf/neo4j_application.conf
在上述這些配置下,導入 100 萬個點用時 13s,導入 1000 萬條邊用時 213s,總用時是 226s。
Neo4j 和 Nebula Graph 在系統架構、數據模型和訪問方式上都有一些差異,下表列舉了常見的異同
上述就是小編為大家分享的如何使用Nebula Graph Exchange將數據從Neo4j導入到Nebula Graph Database了,如果剛好有類似的疑惑,不妨參照上述分析進行理解。如果想知道更多相關知識,歡迎關注億速云行業資訊頻道。
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。