您好,登錄后才能下訂單哦!
本篇內容介紹了“怎么用Java客戶端將數據加載到Grakn知識圖中”的有關知識,在實際案例的操作過程中,不少人都會遇到這樣的困境,接下來就讓小編帶領大家學習一下如何處理這些情況吧!希望大家仔細閱讀,能夠學有所成!
我們將概述遷移的發生方式。
首先,我們需要與我們的Grakn鍵空間進行對話。為此,我們將使用Grakn的Java客戶端。
我們將遍歷每個數據文件,提取每個數據項并將其解析為JSON對象。
我們將每個數據項(以JSON對象的形式)傳遞給其對應的模板。模板返回的是用于將該項插入Grakn的Graql查詢。
我們將執行每個查詢以將數據加載到目標鍵空間 - phone_calls
。
在繼續之前,請確保已安裝Java 1.8并在計算機上運行Grakn服務器。
該項目使用SDK 1.8并命名phone_calls
。我將使用IntelliJ作為IDE。
修改pom.xml
以包含最新版本的Grakn(1.4.2)作為依賴項。
<?xml version =“1.0”encoding =“UTF-8”?>
< project xmlns = “http://maven.apache.org/POM/4.0.0”
xmlns:xsi = “http://www.w3.org/2001/XMLSchema-instance”
xsi:schemaLocation = “http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd” >
< modelVersion > 4.0.0 </ modelVersion >
< groupId > ai.grakn.examples </ groupId >
< artifactId > migrate-csv-to-grakn </ artifactId >
< version > 1.0-SNAPSHOT </ version >
< 存儲庫>
< repository >
< id >發布</ id >
< url > https://oss.sonatype.org/content/repositories/releases </ url >
</ repository >
</ repositories >
< properties >
< grakn.version > 1.4.2 </ grakn.version >
< maven.compiler.source > 1.7 </ maven.compiler.source >
< maven.compiler.target > 1.7 </ maven.compiler.target >
</ properties >
< dependencies >
< 依賴>
< groupId > ai.grakn </ groupId >
< artifactId > client-java </ artifactId >
< version > $ {grakn.version} </ version >
</ dependency >
</ dependencies >
</ project >
我們希望能夠配置Grakn注銷的內容。為此,請修改pom.xml
以排除slf4j
附帶grakn
并logback
作為依賴項添加。
<?xml version =“1.0”encoding =“UTF-8”?>
< project xmlns = “http://maven.apache.org/POM/4.0.0”
xmlns:xsi = “http://www.w3.org/2001/XMLSchema-instance”
xsi:schemaLocation = “http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd” >
< modelVersion > 4.0.0 </ modelVersion >
<! - ... - >
< dependencies >
< 依賴>
< groupId > ai.grakn </ groupId >
< artifactId > client-java </ artifactId >
< version > $ {grakn.version} </ version >
< exclusions >
< 排除>
< groupId > org.slf4j </ groupId >
< artifactId > slf4j-simple </ artifactId >
</ exclusion >
</ exclusions >
</ dependency >
< 依賴>
< groupId > ch.qos.logback </ groupId >
< artifactId > logback-classic </ artifactId >
< version > 1.2.3 </ version >
</ dependency >
</ dependencies >
</ project >
接下來,添加一個logback.xml
使用以下內容調用的新文件并將其放在下面src/main/resources
。
< configuration debug = “false” >
< root level = “INFO” />
</ configuration >
在src/main
創建一個名為的新文件Migration.java
。這是我們要編寫所有代碼的地方。
選擇以下數據格式之一并下載文件。下載四個文件中的每個文件后,將它們放在src/main/resources/data
目錄下。我們將使用它們將數據加載到我們的phone_calls
知識圖中。
隨后的所有代碼都將被寫入Migration.java
。
在此之前,我們需要一個結構來包含讀取數據文件和構建Graql查詢所需的詳細信息。這些細節包括:
數據文件的路徑,和
接收JSON對象并生成Graql插入查詢的模板函數。
為此,我們創建了一個名為的新子類Input
。
導入 mjson。杰森 ;
公共 類 遷移 {
abstract static class Input {
字符串 路徑 ;
public Input(String path){
這個。path = path ;
}
String getDataPath(){ return path ;}
abstract String template(Json 數據);
}
}
在本文的后面,我們將看到如何Input
創建類的實例,但在我們開始之前,讓我們將mjson
依賴項添加到文件中的dependencies
標記中pom.xml
。
< 依賴>
< groupId > org.sharegov </ groupId >
< artifactId > mjson </ artifactId >
< version > 1.4.0 </ version >
</ dependency >
是時候初始化了inputs
。
下面的代碼調用initialiseInputs()
返回集合的方法inputs
。然后,我們將使用input
此集合中的每個元素將每個數據文件加載到Grakn中。
//其他進口
導入 java。util。ArrayList ;
導入 java。util。收藏 ;
公共 類 遷移 {
抽象 靜態 類 輸入 {...}
public static void main(String [] args){
集合< Input > inputs = initialiseInputs();
}
static Collection < Input > initialiseInputs(){
集合< Input > inputs = new ArrayList <>();
// 接下來的是
回報 輸入 ;
}
}
//進口
公共 類 遷移 {
抽象 靜態 類 輸入 {...}
public static void main(String [] args){...}
static Collection < Input > initialiseInputs(){
集合< Input > inputs = new ArrayList <>();
輸入。add(new Input(“data / companies”){
@覆蓋
public String template(Json company){
返回 “插入$ company isa company has name” + 公司。at(“name”)+ “;” ;
}
});
回報 輸入 ;
}
}
input.getDataPath()
會回來的data/companies
。
鑒于company
是
{ name:“Telecom” }
input.template(company)
將返回
插入 $公司 ISA 公司 擁有 的名稱 “電信” ;
//進口
公共 類 遷移 {
抽象 靜態 類 輸入 {...}
public static void main(String [] args){...}
static Collection < Input > initialiseInputs(){
集合< Input > inputs = new ArrayList <>();
輸入。add(new Input(“data / companies”){...});
輸入。add(new Input(“data / people”){
@覆蓋
public String template(Json person){
//插入人
String graqlInsertQuery = “insert $ person isa person has phone-number” + person。at(“phone_number”);
如果(! 人。有(“FIRST_NAME” )){
//人不是客戶
graqlInsertQuery + = “has is-customer false” ;
} else {
//人是顧客
graqlInsertQuery + = “has is-customer true” ;
graqlInsertQuery + = “有名字” + 人。at(“first_name”);
graqlInsertQuery + = “有姓氏” + 人。at(“last_name”);
graqlInsertQuery + = “有城市” + 人。在(“城市”);
graqlInsertQuery + = “有年齡” + 人。在(“年齡”)。asInteger();
}
graqlInsertQuery + = “;” ;
return graqlInsertQuery ;
}
});
回報 輸入 ;
}
}
input.getDataPath()
會回來的data/people
。
鑒于person
是
{ phone_number:“+ 44 091 xxx” }
input.template(person)
將返回
插入 $人 擁有 電話- 數字 “+44 091 XXX” ;
并給予person
被
{ 冷杉- 名稱:“成龍”,最后- 名:“喬”,城市:“即墨”,年齡:77,PHONE_NUMBER:“+00 091 XXX” }
input.template(person)
將返回
插入 $人 擁有 電話- 數字 “+44 091 XXX” 具有 第一- 名字 “成龍” 已經 過去- 名字 “喬” 有 城 “即墨” 具有 年齡 77 ;
//進口
公共 類 遷移 {
抽象 靜態 類 輸入 {...}
public static void main(String [] args){...}
static Collection < Input > initialiseInputs(){
集合< Input > inputs = new ArrayList <>();
輸入。add(new Input(“data / companies”){...});
輸入。add(new Input(“data / people”){...});
輸入。add(new Input(“data / contracts”){
@覆蓋
public String template(Json contract){
//匹配公司
String graqlInsertQuery = “匹配$ company isa company has name” + contract。at(“company_name”)+ “;” ;
//匹配人
graqlInsertQuery + = “$ customer isa person has phone-number” + contract。at(“person_id”)+ “;” ;
//插入合同
graqlInsertQuery + = “insert(provider:$ company,customer:$ customer)isa contract;” ;
return graqlInsertQuery ;
}
});
回報 輸入 ;
}
}
input.getDataPath()
會回來的data/contracts
。
鑒于contract
是
{ company_name:“Telecom”,person_id:“ + 00 091 xxx” }
input.template(contract)
將返回
比賽 $公司 ISA 公司 擁有 的名稱 “電信” ; $客戶 ISA 人 擁有 電話- 數字 “+00 091 XXX” ; insert(provider:$ company,customer:$ customer)isa contract ;
//進口
公共 類 遷移 {
抽象 靜態 類 輸入 {...}
public static void main(String [] args){...}
static Collection < Input > initialiseInputs(){
集合< Input > inputs = new ArrayList <>();
輸入。add(new Input(“data / companies”){...});
輸入。add(new Input(“data / people”){...});
輸入。add(new Input(“data / contracts”){...});
輸入。add(new Input(“data / calls”){
@覆蓋
public String template(Json call){
//匹配來電者
String graqlInsertQuery = “match $ caller isa person has phone-number” + call。at(“caller_id”)+ “;” ;
//匹配被叫者
graqlInsertQuery + = “$ callee isa person has phone-number” + call。at(“callee_id”)+ “;” ;
//插入電話
graqlInsertQuery + = “insert $ call(caller:$ caller,callee:$ callee)isa call;” +
“$ call已經開始” + 來電。at(“started_at”)。asString()+ “;” +
“$ call has duration” + call。在(“持續時間”)。asInteger()+ “;” ;
return graqlInsertQuery ;
}
});
回報 輸入 ;
}
}
input.getDataPath()
會回來的data/calls
。
鑒于call
是
{ caller_id:“44 091 XXX” ,callee_id:“00 091 XXX” ,started_at:2018 - 08 - 10 T07:57:51,持續時間:148 }
input.template(call)
將返回
比賽 $呼叫者 ISA 人 擁有 電話- 數字 “+44 091 XXX” ; $被叫 ISA 人 擁有 電話- 數字 “+00 091 XXX” ; insert $ call(caller:$ caller,callee:$ callee)isa call ; $電話 已經 開始- 在 2018 - 08 - 10 T07:57:51 ; $電話 具有 持續時間 148 ;
現在我們已經為每個數據文件定義了數據路徑和模板,我們可以繼續連接我們的phone_calls
知識圖并將數據加載到其中。
//其他進口
進口 ai。grakn。GraknTxType ;
進口 ai。grakn。Keyspace ;
進口 ai。grakn。客戶。Grakn ;
進口 ai。grakn。util。SimpleURI ;
導入 java。io。UnsupportedEncodingException ;
公共 類 遷移 {
抽象 靜態 類 輸入 {...}
public static void main(String [] args){
集合< Input > inputs = initialiseInputs();
connectAndMigrate(輸入);
}
static void connectAndMigrate(Collection < Input > inputs){
SimpleURI localGrakn = new SimpleURI(“localhost”,48555);
Keyspace keyspace = Keyspace。of(“phone_calls”);
Grakn grakn = new Grakn(localGrakn);
Grakn。會話 會話 = grakn。session(keyspace);
輸入。forEach(輸入 - > {
系統。出。的println(“[由加載” + 輸入。getDataPath()+ “]成Grakn ...”);
嘗試 {
loadDataIntoGrakn(輸入,會話);
} catch(UnsupportedEncodingException e){
e。printStackTrace();
}
});
會議。close();
}
static Collection < Input > initialiseInputs(){...}
static void loadDataIntoGrakn(輸入 輸入,Grakn。會話 會話)拋出 UnsupportedEncodingException {...}
}
connectAndMigrate(Collection<Input> inputs)
是啟動數據遷移到phone_calls
知識圖中的唯一方法。
此方法發生以下情況:
grakn
創建Grakn實例,連接到我們在本地運行的服務器localhost:48555
。
session
創建A ,連接到鍵空間phone_calls
。
對于集合中的每個input
對象inputs
,我們稱之為loadDataIntoGrakn(input, session)
。這將負責將input
對象中指定的數據加載到我們的鍵空間中。
最后session
關閉了。
現在我們已經session
連接到phone_calls
鍵空間,我們可以繼續將數據實際加載到我們的知識圖中。
//進口
公共 類 遷移 {
抽象 靜態 類 輸入 {...}
public static void main(String [] args){
集合< Input > inputs = initialiseInputs();
connectAndMigrate(輸入);
}
static Collection < Input > initialiseInputs(){...}
static void connectAndMigrate(Collection < Input > inputs){...}
static void loadDataIntoGrakn(輸入 輸入,Grakn。會話 會話)拋出 UnsupportedEncodingException {
ArrayList < Json > items = parseDataToJson(輸入);
物品。forEach(item - > {
Grakn。交易 tx = 會話。交易(GraknTxType。WRITE);
String graqlInsertQuery = input。模板(項目);
系統。出。println(“執行Graql查詢:” + graqlInsertQuery);
tx。graql()。解析(graqlInsertQuery)。execute();
tx。commit();
tx。close();
});
系統。出。的println(“\ nInserted” + 項目。大小()+ “由項目[” + 輸入。getDataPath()+ “]。到Grakn \ n”個);
}
static ArrayList < Json > parseDataToJson(輸入 輸入)拋出 UnsupportedEncodingException {...}
}
為了將每個文件中的數據加載到Grakn中,我們需要:
檢索一個ArrayList
JSON對象,每個對象代表一個數據項。為此,我們呼吁parseDataToJson(input)
,和
對于每個JSON對象items
:a)創建事務tx
,b)構造graqlInsertQuery
使用相應的template
,c)運行query
,d)commit
事務和e)close
事務。
有關創建和提交事務的注意事項:為避免內存不足,建議在單個事務中創建和提交每個查詢。但是,為了更快地遷移大型數據集,每次查詢都會發生一次,其中是保證在單個事務上運行的最大查詢數。
現在我們已經完成了上述所有操作,我們已準備好讀取每個文件并將每個數據項解析為JSON對象。這些JSON對象將被傳遞給template
每個Input
對象上的方法。
我們要編寫實現parseDataToJson(input)
。
parseDataToJson(input)
根據數據文件的格式,實現會有所不同。
但無論數據格式是什么,我們都需要正確的設置來逐行讀取文件。為此,我們將使用InputStreamReader
。
//其他進口
導入 java。io。InputStreamReader ;
導入 java。io。讀者 ;
公共 類 遷移 {
抽象 靜態 類 輸入 {...}
public static void main(String [] args){...}
static void connectAndMigrate(Collection < Input > inputs){...}
static Collection < Input > initialiseInputs(){...}
static void loadDataIntoGrakn(輸入 輸入,Grakn。會話 會話)拋出 UnsupportedEncodingException {...}
public static Reader getReader(String relativePath)throws UnsupportedEncodingException {
返回 新 的InputStreamReader(遷移。類。getClassLoader()。的getResourceAsStream(relativePath),“UTF-8”);
}
}
我們將使用Univocity CSV Parser來解析我們的.csv
文件。讓我們為它添加依賴項。我們需要在dependencies
標簽中添加以下內容pom.xml
。
< 依賴>
< groupId > com.univocity </ groupId >
< artifactId > univocity-parsers </ artifactId >
< version > 2.7.6 </ version >
</ dependency >
完成后,我們將編寫parseDataToJson(input)
解析.csv
文件的實現。
//其他進口
進口 com。不公平。解析器。csv。CsvParser ;
進口 com。不公平。解析器。csv。CsvParserSettings ;
公共 類 遷移 {
抽象 靜態 類 輸入 {...}
public static void main(String [] args){...}
static void connectAndMigrate(Collection < Input > inputs){...}
static Collection < Input > initialiseInputs(){...}
static void loadDataIntoGrakn(輸入 輸入,Grakn。會話 會話)拋出 UnsupportedEncodingException {...}
static ArrayList < Json > parseDataToJson(輸入 輸入)拋出 UnsupportedEncodingException {
ArrayList < Json > items = new ArrayList <>();
CsvParserSettings settings = new CsvParserSettings();
設置。setLineSeparatorDetectionEnabled(true);
CsvParser 解析器 = 新的 CsvParser(設置);
解析器。beginParsing(getReader(輸入。getDataPath()+ “的.csv” ));
String [] columns = parser。parseNext();
String [] row ;
而((行 = 分析器。parseNext())!= 空){
Json item = Json。object();
對于(詮釋 我 = 0 ; 我 < 行。長度 ; 我++){
項目。set(columns [ i ],row [ i ]);
}
物品。add(item);
}
返回 的項目 ;
}
public static Reader getReader(String relativePath)throws UnsupportedEncodingException {
返回 新 的InputStreamReader(遷移。類。getClassLoader()。的getResourceAsStream(relativePath),“UTF-8”);
}
}
除了這個實現,我們還需要進行一次更改。
鑒于CSV文件的性質,生成的JSON對象將把.csv
文件的所有列作為其鍵,即使該值不存在,它也將被視為一個null
。
出于這個原因,我們需要在persontemplate
的input
實例方法中更改一行。
if (! person.has("first_name")) {...}
變
if (person.at("first_name").isNull()) {...}
。
我們將使用Gson的JsonReader來讀取我們的.json
文件。讓我們為它添加依賴項。我們需要在dependencies
標簽中添加以下內容pom.xml
。
< 依賴>
< groupId > com.google.code.gson </ groupId >
< artifactId > gson </ artifactId >
< version > 2.7 </ version >
</ dependency >
完成后,我們將編寫parseDataToJson(input)
用于讀取.json
文件的實現。
//其他進口
進口 com。谷歌。gson。流。JsonReader ;
公共 類 遷移 {
抽象 靜態 類 輸入 {...}
public static void main(String [] args){...}
static void connectAndMigrate(Collection < Input > inputs){...}
static Collection < Input > initialiseInputs(){...}
static void loadDataIntoGrakn(輸入 輸入,Grakn。會話 會話)拋出 UnsupportedEncodingException {...}
static ArrayList < Json > parseDataToJson(輸入 輸入)拋出 IOException {
ArrayList < Json > items = new ArrayList <>();
JsonReader jsonReader = 新 JsonReader(getReader(輸入。getDataPath()+ “上傳.json” ));
jsonReader。beginArray();
而(jsonReader。hasNext()){
jsonReader。beginObject();
Json item = Json。object();
而(jsonReader。hasNext()){
String key = jsonReader。nextName();
開關(jsonReader。PEEK()){
案例 STRING:
項目。集(鍵,jsonReader。nextString());
打破 ;
案件 編號:
項目。集(鍵,jsonReader。nextInt());
打破 ;
}
}
jsonReader。endObject();
物品。add(item);
}
jsonReader。endArray();
返回 的項目 ;
}
public static Reader getReader(String relativePath)throws UnsupportedEncodingException {
返回 新 的InputStreamReader(遷移。類。getClassLoader()。的getResourceAsStream(relativePath),“UTF-8”);
}
}
我們將使用Java的內置StAX來解析我們的.xml
文件。
要解析XML數據,我們需要知道目標標記的名稱。這需要在Input
類中聲明并在構造每個input
對象時指定。
//進口
公共 類 XmlMigration {
abstract static class Input {
字符串 路徑 ;
串 選擇 ;
public Input(String path,String selector){
這個。path = path ;
這個。selector = selector ;
}
String getDataPath(){ return path ;}
String getSelector(){ return selector ;}
abstract String template(Json 數據);
}
public static void main(String [] args){...}
static void connectAndMigrate(Collection < Input > inputs){...}
static Collection < Input > initialiseInputs(){
集合< Input > inputs = new ArrayList <>();
輸入。add(new Input(“data / companies”,“company”){...});
輸入。添加(新 輸入(“數據/人”,“人”){...});
輸入。add(new Input(“data / contracts”,“contract”){...});
輸入。add(new Input(“data / calls”,“call”){...});
回報 輸入 ;
}
static void loadDataIntoGrakn(輸入 輸入,Grakn。會話 會話)拋出 UnsupportedEncodingException,XMLStreamException {...}
public static Reader getReader(String relativePath)throws UnsupportedEncodingException {...}
}
現在用于parseDataToJson(input)
解析.xml
文件的實現。
//其他進口
導入 javax。xml。流。XMLInputFactory ;
導入 javax。xml。流。XMLStreamConstants ;
導入 javax。xml。流。XMLStreamException ;
導入 javax。xml。流。XMLStreamReader ;
公共 類 XmlMigration {
abstract static class Input {
字符串 路徑 ;
串 選擇 ;
public Input(String path,String selector){
這個。path = path ;
這個。selector = selector ;
}
String getDataPath(){ return path ;}
String getSelector(){ return selector ;}
abstract String template(Json 數據);
}
public static void main(String [] args){...}
static void connectAndMigrate(Collection < Input > inputs){...}
static Collection < Input > initialiseInputs(){
集合< Input > inputs = new ArrayList <>();
輸入。add(new Input(“data / companies”,“company”){...});
輸入。添加(新 輸入(“數據/人”,“人”){...});
輸入。add(new Input(“data / contracts”,“contract”){...});
輸入。add(new Input(“data / calls”,“call”){...});
回報 輸入 ;
}
static void loadDataIntoGrakn(輸入 輸入,Grakn。會話 會話)拋出 UnsupportedEncodingException,XMLStreamException {...}
static ArrayList < Json > parseDataToJson(輸入 輸入)拋出 UnsupportedEncodingException,XMLStreamException {
ArrayList < Json > items = new ArrayList <>();
XMLStreamReader r = XMLInputFactory。newInstance()。createXMLStreamReader(getReader(輸入。getDataPath()+ “的.xml” ));
字符串 鍵 ;
String value = null ;
Boolean inSelector = false ;
Json item = null ;
而([R 。hasNext()){
int event = r。next();
開關(事件){
case XMLStreamConstants。START_ELEMENT:
如果(?。號·getLocalName()。等于(輸入。getSelector())){
inSelector = true ;
item = Json。object();
}
打破 ;
case XMLStreamConstants。字符:
value = r。getText();
打破 ;
case XMLStreamConstants。END_ELEMENT:
key = r。getLocalName();
如果(inSelector && ! 關鍵。平等(輸入。getSelector())){
項目。set(key,value);
}
如果(鍵。等于(輸入。getSelector())){
inSelector = false ;
物品。add(item);
}
打破 ;
}
}
返回 的項目 ;
}
public static Reader getReader(String relativePath)throws UnsupportedEncodingException {...}
}
以下是我們Migrate.java
將CSV數據加載到Grakn中的樣子,并在這里找到JSON和XML文件的樣子。
包 ai。grakn。例子 ;
進口 ai。grakn。GraknTxType ;
進口 ai。grakn。Keyspace ;
進口 ai。grakn。客戶。Grakn ;
進口 ai。grakn。util。SimpleURI ;
/ **
*用于CSV,TSV和固定寬度文件的快速可靠的基于Java的解析器的集合
* @see <a href="https://www.univocity.com/pages/univocity_parsers_documentation"> univocity </a>
* /
進口 com。不公平。解析器。csv。CsvParser ;
進口 com。不公平。解析器。csv。CsvParserSettings ;
/ **
*適用于Java的精簡JSON庫,
* @see <a href="https://bolerio.github.io/mjson/"> mjson </a>
* /
導入 mjson。杰森 ;
導入 java。io。InputStreamReader ;
導入 java。io。讀者 ;
導入 java。io。UnsupportedEncodingException ;
導入 java。util。ArrayList ;
導入 java。util。收藏 ;
公共 課 CsvMigration {
/ **
*表示將輸入文件鏈接到自己的模板函數的Input對象,
*用于將Json對象映射到Graql查詢字符串
* /
abstract static class Input {
字符串 路徑 ;
public Input(String path){
這個。path = path ;
}
String getDataPath(){ return path ;}
abstract String template(Json 數據);
}
/ **
* 1.創建Grakn實例
* 2.創建到目標鍵空間的會話
* 3.初始化輸入列表,每個輸入包含解析數據所需的詳細信息
* 4.將csv數據加載到每個文件的Grakn
* 5.結束會議
* /
public static void main(String [] args){
集合< Input > inputs = initialiseInputs();
connectAndMigrate(輸入);
}
static void connectAndMigrate(Collection < Input > inputs){
SimpleURI localGrakn = new SimpleURI(“localhost”,48555);
Grakn grakn = new Grakn(localGrakn);
Keyspace keyspace = Keyspace。of(“phone_calls”);
Grakn。會話 會話 = grakn。session(keyspace);
輸入。forEach(輸入 - > {
系統。出。的println(“[由加載” + 輸入。getDataPath()+ “]成Grakn ...”);
嘗試 {
loadDataIntoGrakn(輸入,會話);
} catch(UnsupportedEncodingException e){
e。printStackTrace();
}
});
會議。close();
}
static Collection < Input > initialiseInputs(){
集合< Input > inputs = new ArrayList <>();
//定義用于構建公司Graql插入查詢的模板
輸入。add(new Input(“data / companies”){
@覆蓋
public String template(Json company){
返回 “插入$ company isa company has name” + 公司。at(“name”)+ “;” ;
}
});
//定義用于構造人Graql插入查詢的模板
輸入。add(new Input(“data / people”){
@覆蓋
public String template(Json person){
//插入人
String graqlInsertQuery = “insert $ person isa person has phone-number” + person。at(“phone_number”);
如果(個人。在(“FIRST_NAME” )。參考isNull()){
//人不是客戶
graqlInsertQuery + = “has is-customer false” ;
} else {
//人是顧客
graqlInsertQuery + = “has is-customer true” ;
graqlInsertQuery + = “有名字” + 人。at(“first_name”);
graqlInsertQuery + = “有姓氏” + 人。at(“last_name”);
graqlInsertQuery + = “有城市” + 人。在(“城市”);
graqlInsertQuery + = “有年齡” + 人。在(“年齡”)。asInteger();
}
graqlInsertQuery + = “;” ;
return graqlInsertQuery ;
}
});
//定義用于構造合同的模板Graql插入查詢
輸入。add(new Input(“data / contracts”){
@覆蓋
public String template(Json contract){
//匹配公司
String graqlInsertQuery = “匹配$ company isa company has name” + contract。at(“company_name”)+ “;” ;
//匹配人
graqlInsertQuery + = “$ customer isa person has phone-number” + contract。at(“person_id”)+ “;” ;
//插入合同
graqlInsertQuery + = “insert(provider:$ company,customer:$ customer)isa contract;” ;
return graqlInsertQuery ;
}
});
//定義用于構造調用Graql插入查詢的模板
輸入。add(new Input(“data / calls”){
@覆蓋
public String template(Json call){
//匹配來電者
String graqlInsertQuery = “match $ caller isa person has phone-number” + call。at(“caller_id”)+ “;” ;
//匹配被叫者
graqlInsertQuery + = “$ callee isa person has phone-number” + call。at(“callee_id”)+ “;” ;
//插入電話
graqlInsertQuery + = “insert $ call(caller:$ caller,callee:$ callee)isa call;” +
“$ call已經開始” + 來電。at(“started_at”)。asString()+ “;” +
“$ call has duration” + call。在(“持續時間”)。asInteger()+ “;” ;
return graqlInsertQuery ;
}
});
回報 輸入 ;
}
/ **
*將csv數據加載到我們的Grakn phone_calls鍵空間:
* 1.將數據項作為json對象列表獲取
* 2.對于每個json對象
* 一個。創建Grakn事務
* b。構造相應的Graql插入查詢
* C。運行查詢
* d。提交交易
* e。關閉交易
*
* @param輸入包含解析數據所需的詳細信息
* @param會話將創建一個事務
* @throws UnsupportedEncodingException
* /
static void loadDataIntoGrakn(輸入 輸入,Grakn。會話 會話)拋出 UnsupportedEncodingException {
ArrayList < Json > items = parseDataToJson(輸入); // 1
物品。forEach(item - > {
Grakn。交易 tx = 會話。交易(GraknTxType。WRITE); // 2a
String graqlInsertQuery = input。模板(項目); // 2b
系統。出。println(“執行Graql查詢:” + graqlInsertQuery);
tx。graql()。解析(graqlInsertQuery)。execute(); // 2c
tx。commit(); // 2d
tx。close(); // 2e
});
系統。出。的println(“\ nInserted” + 項目。大小()+ “由項目[” + 輸入。getDataPath()+ “]。到Grakn \ n”個);
}
/ **
* 1.通過流讀取csv文件
* 2.將每行解析為json對象
* 3.將json對象添加到項列表中
*
* @param輸入用于獲取數據文件的路徑,減去格式
* @return json對象列表
* @throws UnsupportedEncodingException
* /
static ArrayList < Json > parseDataToJson(輸入 輸入)拋出 UnsupportedEncodingException {
ArrayList < Json > items = new ArrayList <>();
CsvParserSettings settings = new CsvParserSettings();
設置。setLineSeparatorDetectionEnabled(true);
CsvParser 解析器 = 新的 CsvParser(設置);
解析器。beginParsing(getReader(輸入。getDataPath()+ “的.csv” )); // 1
String [] columns = parser。parseNext();
String [] row ;
而((行 = 分析器。parseNext())!= 空){
Json item = Json。object();
對于(詮釋 我 = 0 ; 我 < 行。長度 ; 我++){
項目。set(columns [ i ],row [ i ]); // 2
}
物品。add(item); // 3
}
返回 的項目 ;
}
public static Reader getReader(String relativePath)throws UnsupportedEncodingException {
返回 新 的InputStreamReader(CsvMigration。類。getClassLoader()。的getResourceAsStream(relativePath),“UTF-8”);
}
}
運行main
方法,坐下來,放松并觀察日志,同時數據開始涌入Grakn。
我們從設置項目和定位數據文件開始。
接下來,我們繼續設置遷移機制,該機制獨立于數據格式。
然后,我們了解了如何將具有不同數據格式的文件解析為JSON對象。
最后,我們運行了使用給定main
方法觸發connectAndMigrate
方法的方法inputs
。這將數據加載到我們的Grakn知識圖中。
“怎么用Java客戶端將數據加載到Grakn知識圖中”的內容就介紹到這里了,感謝大家的閱讀。如果想了解更多行業相關的知識可以關注億速云網站,小編將為大家輸出更多高質量的實用文章!
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。