您好,登錄后才能下訂單哦!
這里小編先將需要的pom.xml的依賴提供給大家:(根據自己的版本進行修改)
<properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <!-- Spark 2.3.x and the Elasticsearch 6.x client both require Java 8 or newer -->
    <maven.compiler.source>1.8</maven.compiler.source>
    <maven.compiler.target>1.8</maven.compiler.target>
    <spark.version>2.3.2</spark.version>
</properties>
<dependencies>
    <!-- Unit testing -->
    <dependency>
        <groupId>junit</groupId>
        <artifactId>junit</artifactId>
        <version>4.10</version>
    </dependency>
    <!-- Elasticsearch transport client (TransportClient / PreBuiltTransportClient) -->
    <dependency>
        <groupId>org.elasticsearch.client</groupId>
        <artifactId>transport</artifactId>
        <version>6.2.0</version>
    </dependency>
    <!-- Must match the Scala binary version of the Spark artifacts below (_2.11);
         a 2.10.x library on the classpath is binary-incompatible with them -->
    <dependency>
        <groupId>org.scala-lang</groupId>
        <artifactId>scala-library</artifactId>
        <version>2.11.8</version>
    </dependency>
    <!-- org.json.JSONObject, used to build document sources -->
    <dependency>
        <groupId>org.json</groupId>
        <artifactId>json</artifactId>
        <version>20180813</version>
    </dependency>
    <!-- Elasticsearch connector for Hadoop/Spark (org.elasticsearch.spark._) -->
    <dependency>
        <groupId>org.elasticsearch</groupId>
        <artifactId>elasticsearch-hadoop</artifactId>
        <version>6.2.4</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.11</artifactId>
        <version>${spark.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_2.11</artifactId>
        <version>${spark.version}</version>
    </dependency>
</dependencies>
主要是提供一個Utils,通過讀取配置文件來創建ES的編程入口。
# elasticsearch.conf - loaded from the classpath by ElasticSearchUtil
# (the loader asks for the all-lowercase name "elasticsearch.conf")
# Host of the Elasticsearch node the transport client connects to
elastic.host=192.168.130.131
# Port passed to the TransportAddress together with the host
elastic.port=9300
# Value used for the client's "cluster.name" setting
elastic.cluster.name=zzy-application
#Constants
/**
 * Names of the properties read from the Elasticsearch connection configuration.
 */
public interface Constants {

    /** Property key holding the Elasticsearch host. */
    public static final String ELASTIC_HOST = "elastic.host";

    /** Property key holding the Elasticsearch port. */
    public static final String ELASTIC_PORT = "elastic.port";

    /** Property key holding the Elasticsearch cluster name. */
    public static final String ELASTIC_CLUSTER_NAME = "elastic.cluster.name";
}
#ElasticSearchUtil
import com.zy.es.constant.Constants;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.transport.client.PreBuiltTransportClient;
import java.io.IOException;
import java.io.InputStream;
import java.net.InetAddress;
import java.util.Properties;
/**
 * Utility class that builds a single shared Elasticsearch {@link TransportClient}
 * from the {@code elasticsearch.conf} classpath resource.
 *
 * <p>The client is created once, when this class is initialized. Any configuration
 * problem (missing file, unreadable file, missing or malformed port, unresolvable host)
 * aborts class initialization with an {@link IllegalStateException} instead of
 * leaving a {@code null} client behind.
 *
 * <p>If the configured cluster name does not match the real cluster, requests fail with
 * {@code NoNodeAvailableException[None of the configured nodes are available: ...]}.
 */
public class ElasticSearchUtil {
    /** Name of the classpath resource holding the connection settings. */
    private static final String CONFIG_FILE = "elasticsearch.conf";

    private static TransportClient client;
    private static Properties ps;

    static {
        ps = loadConfig();

        String host = ps.getProperty(Constants.ELASTIC_HOST);
        String portText = ps.getProperty(Constants.ELASTIC_PORT);
        String clusterName = ps.getProperty(Constants.ELASTIC_CLUSTER_NAME);
        if (portText == null) {
            throw new IllegalStateException(
                    "Missing property '" + Constants.ELASTIC_PORT + "' in " + CONFIG_FILE);
        }
        int port = Integer.parseInt(portText.trim());

        Settings settings = Settings.builder()
                .put("cluster.name", clusterName)
                .build();
        TransportClient created = new PreBuiltTransportClient(settings);
        try {
            // Several addresses may be registered here when running against a cluster;
            // addTransportAddresses takes a varargs parameter.
            created.addTransportAddresses(
                    new TransportAddress(InetAddress.getByName(host), port));
        } catch (IOException e) {
            // Do not leak the half-configured client when the host cannot be resolved.
            created.close();
            throw new IllegalStateException(
                    "Cannot resolve Elasticsearch host '" + host + "'", e);
        }
        client = created;
    }

    /**
     * Reads {@link #CONFIG_FILE} from the classpath and closes the stream afterwards.
     *
     * @return the loaded properties
     * @throws IllegalStateException if the resource is missing or cannot be read
     */
    private static Properties loadConfig() {
        Properties loaded = new Properties();
        try (InputStream in =
                     ElasticSearchUtil.class.getClassLoader().getResourceAsStream(CONFIG_FILE)) {
            if (in == null) {
                throw new IllegalStateException(CONFIG_FILE + " not found on the classpath");
            }
            loaded.load(in);
        } catch (IOException e) {
            throw new IllegalStateException("Failed to read " + CONFIG_FILE, e);
        }
        return loaded;
    }

    /**
     * Returns the shared transport client.
     *
     * @return the client created during class initialization
     */
    public static TransportClient getTransportClient() {
        return client;
    }

    /**
     * Closes the given client; a {@code null} argument is ignored.
     *
     * @param client the client to close, may be {@code null}
     */
    public static void close(TransportClient client) {
        if (client != null) {
            client.close();
        }
    }
}
小編這里提供了json、map、javabean、XContentBuilder四種創建方式。
import java.util
import com.zy.es.pojo.Book
import com.zy.es.utils.ElasticSearchUtil
import org.elasticsearch.action.index.IndexResponse
import org.elasticsearch.cluster.metadata.MetaData.XContentContext
import org.elasticsearch.common.xcontent.{XContentBuilder, XContentType}
import org.elasticsearch.common.xcontent.json.JsonXContent
import org.json.JSONObject
/**
 * Demonstrates four ways of indexing a document into `library/books`:
 * from a JSON string, a map, a Java bean and an XContentBuilder.
 */
object createIndex {
  private val index = "library"
  private val `type` = "books"
  private val client = ElasticSearchUtil.getTransportClient()

  def main(args: Array[String]): Unit = {
    try {
      createIndexByJson()
      //createIndexByMap()
      // createIndexByBean()
      // createIndexByXContentBuilder()
    } finally {
      // Release the ES connection even when the indexing call throws
      ElasticSearchUtil.close(client)
    }
  }

  /**
   * 1. Index a document from a JSON string.
   *
   * On ES 5.x and later the content type must be passed as the second argument,
   * i.e. `setSource(json.toString, XContentType.JSON)`; otherwise the call fails with
   * `java.lang.IllegalArgumentException: The number of object passed must be even but was [1]`.
   */
  def createIndexByJson(): Unit = {
    val json = new JSONObject
    json.put("name", "我愛你中國")
    json.put("author", "周迅")
    json.put("date", "2018-6-6")
    // Result of the index request
    val response: IndexResponse = client.prepareIndex(index, `type`, "9")
      .setSource(json.toString, XContentType.JSON)
      .get()
    // Print the document version
    println(response.getVersion)
  }

  /**
   * 2. Index a document from a java.util.Map.
   */
  def createIndexByMap(): Unit = {
    val sourceMap = new util.HashMap[String, String]()
    sourceMap.put("name", "朝花夕拾")
    sourceMap.put("author", "魯迅")
    sourceMap.put("date", "2009-4-5")
    val response: IndexResponse = client.prepareIndex(index, `type`, "2")
      .setSource(sourceMap)
      .get()
    // Print the document version
    println(response.getVersion)
  }

  /**
   * 3. Index a document from a plain Java bean, serialized through JSONObject.
   */
  def createIndexByBean(): Unit = {
    val book: Book = new Book("斗破蒼穹", "天蠶土豆", "2012-2-6")
    val json = new JSONObject(book)
    // Result of the index request
    val response: IndexResponse = client.prepareIndex(index, `type`, "3")
      .setSource(json.toString, XContentType.JSON)
      .get()
    // Print the document version
    println(response.getVersion)
  }

  /**
   * 4. Index a document built with an XContentBuilder.
   */
  def createIndexByXContentBuilder(): Unit = {
    val builder: XContentBuilder = JsonXContent.contentBuilder()
    builder.startObject()
      .field("name", "西游記")
      .field("author", "吳承恩")
      .field("version", "1.0")
      .endObject()
    val response: IndexResponse = client.prepareIndex(index, `type`, "4")
      .setSource(builder)
      .get()
    println(response.getVersion)
  }
}
小編這里提供了刪除數據,更新數據,批量操作。
import java.util
import com.zy.es.utils.ElasticSearchUtil
import org.elasticsearch.action.bulk.BulkResponse
import org.elasticsearch.action.delete.DeleteResponse
import org.elasticsearch.action.update.{UpdateRequestBuilder, UpdateResponse}
import org.elasticsearch.common.xcontent.{XContentBuilder, XContentType}
import org.elasticsearch.common.xcontent.json.JsonXContent
import org.json.JSONObject
/**
 * Demonstrates delete, partial update and bulk indexing against `library/books`.
 */
object ElasticsearchCRUD {
  private val index = "library"
  private val `type` = "books"
  private val client = ElasticSearchUtil.getTransportClient()

  def main(args: Array[String]): Unit = {
    try {
      // Delete a document
      testDelete()
      // Update a document
      //testUpdate()
      // Bulk operations
      //testBulk()
    } finally {
      // Release the connection even when the operation above throws
      ElasticSearchUtil.close(client)
    }
  }

  /** Deletes the document with id "2" and prints the resulting version. */
  def testDelete(): Unit = {
    val response: DeleteResponse = client.prepareDelete(index, `type`, "2").get()
    println("version:" + response.getVersion)
  }

  /** Sets the `version` field of document "4" to "3.0" through a partial update. */
  def testUpdate(): Unit = {
    val builder: XContentBuilder = JsonXContent.contentBuilder()
    builder.startObject()
      .field("version", "3.0")
      .endObject()
    val response: UpdateResponse = client.prepareUpdate(index, `type`, "4")
      .setDoc(builder)
      .get()
    println("version:" + response.getVersion)
  }

  /**
   * Indexes two documents (ids "7" and "8") in a single bulk request, one from a map
   * and one from a JSON string, then prints the version of every item.
   */
  def testBulk(): Unit = {
    val map = new util.HashMap[String, String]()
    map.put("name", "無雙")
    map.put("author", "周潤發")
    map.put("version", "2")
    val json = new JSONObject
    json.put("name", "紅樓夢")
    json.put("author", "曹雪芹")
    json.put("version", "1.0")
    val responses: BulkResponse = client.prepareBulk()
      .add(client.prepareIndex(index, `type`, "7").setSource(map))
      .add(client.prepareIndex(index, `type`, "8").setSource(json.toString(), XContentType.JSON))
      .get()
    // A bulk request does not throw when individual items fail; surface those failures
    if (responses.hasFailures) {
      println(responses.buildFailureMessage())
    }
    for (response <- responses.getItems) {
      print(response.getVersion)
    }
  }
}
import java.util
import com.zy.es.utils.ElasticSearchUtil
import org.elasticsearch.action.search.{SearchResponse, SearchType}
import org.elasticsearch.index.query.QueryBuilders
import org.elasticsearch.search.fetch.subphase.highlight.HighlightBuilder
import org.elasticsearch.search.{SearchHit, SearchHits}
import org.json.JSONObject
import scala.collection.JavaConversions
/**
 * Demonstrates full-text, paged and highlighted searches against the `library` index.
 */
object testSearch {
  private val index = "library"
  private val `type` = "books"
  private val client = ElasticSearchUtil.getTransportClient()

  def main(args: Array[String]): Unit = {
    try {
      // Full-text search
      //fullTextSearch()
      // Paged search
      //pagingSearch()
      // Highlighted search
      highlightSearch()
    } finally {
      // Release the connection, as the other demo objects do
      ElasticSearchUtil.close(client)
    }
  }

  /**
   * Full-text search: matches `author` against "天蠶土豆" and prints every hit
   * as a JSON object with its metadata and source.
   */
  def fullTextSearch(): Unit = {
    val response = client.prepareSearch(index) // index to search
      .setSearchType(SearchType.DEFAULT) // search type
      .setQuery(QueryBuilders.matchQuery("author", "天蠶土豆")) // query
      .get()
    val hits = response.getHits // search result
    println("totals:" + hits.getTotalHits) // number of matching documents
    println("maxScore:" + hits.getMaxScore) // highest score among the hits
    // Details of every hit
    for (hit <- hits.getHits) {
      // One JSON object per hit, so no state is carried over between hits
      val json = new JSONObject()
      json.put("_index", hit.getIndex)
      json.put("_id", hit.getId)
      json.put("_type", hit.getType)
      json.put("_score", hit.getScore)
      json.put("_source", new JSONObject(hit.getSourceAsString))
      println(json.toString())
    }
  }

  /**
   * Paged search: matches `name` against "西游記".
   *
   * @param from offset of the first hit to return; for page `num` (1-based) with
   *             `size` hits per page this is `size * (num - 1)`
   * @param size maximum number of hits to return
   */
  def pagingSearch(from: Int = 0, size: Int = 10): Unit = {
    val response: SearchResponse = client.prepareSearch(index)
      .setSearchType(SearchType.QUERY_THEN_FETCH)
      .setQuery(QueryBuilders.matchQuery("name", "西游記"))
      .setFrom(from)
      .setSize(size)
      .get()
    val myhits: SearchHits = response.getHits
    val total = myhits.totalHits
    println("zzy為您查詢出" + total + "記錄:")
    val hits: Array[SearchHit] = myhits.getHits
    for (hit <- hits) {
      val map: util.Map[String, AnyRef] = hit.getSourceAsMap
      val author = map.get("author")
      val name = map.get("name")
      val version = map.get("version")
      print(
        s"""
           |author:${author}
           |name:${name}
           |version:${version}
        """.stripMargin)
    }
  }

  /**
   * Highlighted search: matches `author` against "周潤發" and prints the highlighted
   * fragments of every hit.
   */
  def highlightSearch(): Unit = {
    val response = client.prepareSearch(index)
      .setSearchType(SearchType.DEFAULT)
      .setQuery(QueryBuilders.matchQuery("author", "周潤發"))
      .highlighter(new HighlightBuilder()
        .field("author") // field to highlight
        .preTags("<font color='red' size='20px'>") // opening tag
        .postTags("</font>")) // closing tag
      .get()
    val myHits = response.getHits
    val total = myHits.totalHits
    println("zzy為您查詢出" + total + "記錄:")
    val hits: Array[SearchHit] = myHits.getHits
    for (hit <- hits) {
      // Highlighted text is only available through the highlight fields, not the source
      val highlightFields = hit.getHighlightFields
      // Keys are the highlighted field names (here: author); values hold the fragments
      for (highlight <- JavaConversions.mapAsScalaMap(highlightFields).values) {
        print(highlight.getFragments.mkString)
      }
    }
  }
}
首先我們先在自己的ES集群中添加一些數據:
# Create the "chinese" index
curl -H "Content-Type: application/json" -XPUT 'http://192.168.130.131:9200/chinese'
# Add four documents of type "fulltext" (ids 1-4), each with a single "content" field
curl -H "Content-Type: application/json" -XPOST http://192.168.130.131:9200/chinese/fulltext/1 -d'{"content":"美國留給伊拉克的是個爛攤子嗎"}'
curl -H "Content-Type: application/json" -XPOST http://192.168.130.131:9200/chinese/fulltext/2 -d'{"content":"公安部:各地校車將享最高路權"}'
curl -H "Content-Type: application/json" -XPOST http://192.168.130.131:9200/chinese/fulltext/3 -d'{"content":"中韓漁警沖突調查:韓警平均每天扣1艘中國漁船"}'
curl -H "Content-Type: application/json" -XPOST http://192.168.130.131:9200/chinese/fulltext/4 -d'{"content":"中國駐洛杉磯領事館遭亞裔男子槍擊 嫌犯已自首"}'
#然后使用不同的查詢看看效果:
import com.zy.es.utils.ElasticSearchUtil
import org.elasticsearch.action.search.{SearchResponse, SearchType}
import org.elasticsearch.index.query.QueryBuilders
/**
 * Runs a match query for "中國" on the `content` field of the `chinese` index and
 * prints the source of every hit.
 */
object ChineseParticipleSearch {
  private val index = "chinese"
  private val `type` = "fulltext"
  private val client = ElasticSearchUtil.getTransportClient()

  def main(args: Array[String]): Unit = {
    try {
      val response: SearchResponse = client.prepareSearch(index)
        .setSearchType(SearchType.QUERY_THEN_FETCH)
        .setQuery(QueryBuilders.matchQuery("content", "中國"))
        .get()
      for (hit <- response.getHits.getHits) {
        println(hit.getSourceAsString)
      }
    } finally {
      // Release the connection once the search is done (or has failed)
      ElasticSearchUtil.close(client)
    }
  }
}
注意:我們這里使用match查詢,查詢的是“中國”
看看運行結果:
這里為什么美國也會被查詢出來?
這是因為:原生的查詢將“中國”這兩個字分開之后再進行檢索,所以會出現上圖中查詢錯誤的情況。
那我們該怎么辦呢,我只想查詢出來有關中國的內容啊,沒關系中文分詞幫你解決。
常見的中文分詞插件:IK,庖丁解牛中文分詞等等。這里我們使用IK分詞。
① 下載: https://github.com/medcl/elasticsearch-analysis-ik 版本對應
② 使用maven對源代碼進行編譯(在IK_HOME下):(mvn clean install -DskipTests)
③ 把編譯后的target/releases下的zip文件拷貝到 ES_HOME/plugins/analysis-ik目錄下面,然后解壓將其中的plugin-descriptor.properties 和plugin-security.policy文件中的ES的版本改為自己使用的版本
④ 修改ES_HOME/config/elasticsearch.yml文件,添加(ES6.x以上版本無需此操作)index.analysis.analyzer.default.type: ik
⑤ 重啟es服務
這里小編就有些粗暴了:
#ps -aux|grep elasticsearch
#kill -9 pid
#/ES_HOME/bin/elasticsearch -d 啟動
第一步: 將之前數據進行刪除
# Deleting documents 1-4 one by one is not enough: the "content" field already has a
# dynamically created mapping (standard analyzer), and the analyzer of an existing field
# cannot be changed in place. Drop the whole index (this removes the four documents too)
# and create it again empty, so that the ik_max_word mapping can be applied afterwards.
curl -XDELETE 'http://192.168.130.131:9200/chinese'
curl -H "Content-Type: application/json" -XPUT 'http://192.168.130.131:9200/chinese'
第二步: 重新加載數據,并設置為IK分詞
# Map the "content" field of type "fulltext" as text analyzed with IK (ik_max_word)
# at both index time and search time
curl -XPOST http://192.168.130.131:9200/chinese/fulltext/_mapping -H 'Content-Type:application/json' -d'
{
"properties": {
"content": {
"type": "text",
"analyzer": "ik_max_word",
"search_analyzer": "ik_max_word"
}
}
}'
# Add the same four documents again
curl -H "Content-Type: application/json" -XPOST http://192.168.130.131:9200/chinese/fulltext/1 -d'{"content":"美國留給伊拉克的是個爛攤子嗎"}'
curl -H "Content-Type: application/json" -XPOST http://192.168.130.131:9200/chinese/fulltext/2 -d'{"content":"公安部:各地校車將享最高路權"}'
curl -H "Content-Type: application/json" -XPOST http://192.168.130.131:9200/chinese/fulltext/3 -d'{"content":"中韓漁警沖突調查:韓警平均每天扣1艘中國漁船"}'
curl -H "Content-Type: application/json" -XPOST http://192.168.130.131:9200/chinese/fulltext/4 -d'{"content":"中國駐洛杉磯領事館遭亞裔男子槍擊 嫌犯已自首"}'
第三步:
重新執行剛剛上面的代碼,這里我們看看結果:
整合條件:
ES官網:
https://www.elastic.co/guide/en/elasticsearch/hadoop/current/install.html
maven依賴:https://mvnrepository.com/artifact/org.elasticsearch/elasticsearch-hadoop/6.2.4
<!-- Elasticsearch connector for Hadoop/Spark; supplies the org.elasticsearch.spark package imported below -->
<dependency>
<groupId>org.elasticsearch</groupId>
<artifactId>elasticsearch-hadoop</artifactId>
<version>6.2.4</version>
</dependency>
//如果想在spark中讀寫ES中的數據,需要導入隱式轉換(import org.elasticsearch.spark._)
import java.util.Date
import com.zy.es.utils.ElasticSearchUtil
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
import org.elasticsearch.cluster.metadata.MetaData.XContentContext
import org.elasticsearch.common.xcontent.XContentType
import org.elasticsearch.spark._
/**
 * Spark / Elasticsearch integration demo.
 *
 * Reads every document of `library/books` as JSON through the elasticsearch-hadoop
 * connector and writes the documents back to `es-spark/book`.
 *
 * The write goes through the connector's `saveJsonToEs` instead of a transport client
 * called from inside the RDD closure: Elasticsearch then generates the document ids,
 * whereas millisecond timestamps used as ids collide (and silently overwrite documents)
 * when several records are processed within the same millisecond.
 */
object EsOnSpark {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("EsOnSpark")
      .setMaster("local[2]")
      .set("es.index.auto.create", "true") // create the target index on write if it is missing
      .set("es.nodes", "192.168.130.131") // Elasticsearch node(s)
      .set("es.port", "9200") // Elasticsearch HTTP port
    val sc = new SparkContext(conf)
    try {
      // (document id, document source as JSON) pairs read from index/type
      val esRDD: RDD[(String, String)] = sc.esJsonRDD("library/books")
      esRDD
        .map { case (id, json) =>
          // Trace every record as it is copied
          println(id + "" + json)
          json
        }
        .saveJsonToEs("es-spark/book") // target index/type
    } finally {
      // Stop the context even when the job fails
      sc.stop()
    }
  }
}
這里只是小編介紹一些常見的API操作,大家知道ES最大的優勢在于他的查詢,后期小編會進一步的補充關于ElasticSearch強大的查詢功能的API。
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。