您好,登錄后才能下訂單哦!
這篇文章主要為大家展示了“hbase0.98 coprocessor Endpoint如何實現HelloWorld”,內容簡而易懂,條理清晰,希望能夠幫助大家解決疑惑,下面讓小編帶領大家一起研究并學習一下“hbase0.98 coprocessor Endpoint如何實現HelloWorld”這篇文章吧。
HBase作為列族數據庫最經常被人詬病的特性包括:無法輕易建立“二級索引”,難以執行求和、計數、排序等操作。比如,在舊版本的(<0.92)Hbase中,統計數據表的總行數,需要使用Counter方法,執行一次MapReduce Job才能得到。雖然HBase在數據存儲層中集成了MapReduce,能夠有效用于數據表的分布式計算。然而在很多情況下,做一些簡單的相加或者聚合計算的時候,如果直接將計算過程放置在server端,能夠減少通訊開銷,從而獲得很好的性能提升。于是,HBase在0.92之后引入了協處理器(coprocessors),實現一些激動人心的新特性:能夠輕易建立二次索引、復雜過濾器(謂詞下推)以及訪問控制等。 HBase協處理器的靈感來自于Jeff Dean 09年的演講( P66-67)。
####hbase coprocessor 大類分為兩種coprocessor分別是:
RegionObserver :它是一種類似于傳統數據庫的觸發器,提供了鉤子函數:Get、Put、Delete、Scan等。
Endpoint:是一個遠程rpc調用,類似于webservice形式調用,但他不適用xml,而是使用的序列化框架是protobuf(序列化后數據更小),本文將介紹此種Coprocessor.
Endpoint 允許您定義自己的動態RPC協議,用于客戶端與region servers通訊。Coprocessor 與region server在相同的進程空間中,因此您可以在region端定義自己的方法(endpoint),將計算放到region端,減少網絡開銷,常用于提升hbase的功能,如:count,sum等。
###我的環境
hadoop : 2.2
hbase-hadoop2 :0.98+
JDK:1.6 ##這里必須要1.6 要不然會出現不能加載jar包的現象。
操作系統:CentOS 6.4
###編寫代碼
首先你需要利用protobuf(網上自己搜google維護的目前發展到2.5版本) 工具成一個HelloWorld 序列化對象。
####HelloWorld.proto
option java_package = "com.gzhdi.coprocessor.generated"; option java_outer_classname = "ServerHelloworld"; option java_generic_services = true; option java_generate_equals_and_hash = true; option optimize_for = SPEED; message HelloRequest { required bytes askWord = 10; } message HelloResponse { required bytes retWord = 10; } message AskRequest { required bytes ask = 100; } message AnsResponse { required bytes ans = 100; } service HelloWorld { rpc sendHello(HelloRequest) returns (HelloResponse); rpc question(AskRequest) returns (AnsResponse); }
使用命令生成代碼,并拷貝到你的工程里邊去,我的文件在工程下面放著呢,直接生成到工程里邊。 這段代碼就會生成一個HelloWorld.java文件.
protoc.exe --java_out=../src HelloWorld.proto
編寫主要代碼
####server端代碼
package com.gzhdi.copocessor; import java.io.IOException; import org.apache.hadoop.hbase.Coprocessor; import org.apache.hadoop.hbase.CoprocessorEnvironment; import org.apache.hadoop.hbase.coprocessor.CoprocessorException; import org.apache.hadoop.hbase.coprocessor.CoprocessorService; import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; import com.google.protobuf.ByteString; import com.google.protobuf.RpcCallback; import com.google.protobuf.RpcController; import com.google.protobuf.Service; import com.gzhdi.coprocessor.generated.ServerHelloworld; import com.gzhdi.coprocessor.generated.ServerHelloworld.AnsResponse; import com.gzhdi.coprocessor.generated.ServerHelloworld.AskRequest; import com.gzhdi.coprocessor.generated.ServerHelloworld.HelloRequest; import com.gzhdi.coprocessor.generated.ServerHelloworld.HelloResponse; public class HelloWorldEndPoint extends ServerHelloworld.HelloWorld implements Coprocessor,CoprocessorService{ private RegionCoprocessorEnvironment env; @Override public void sendHello(RpcController controller, HelloRequest request, RpcCallback<HelloResponse> done) { System.out.println("request HelloRequest:"+request.getAskWord()); HelloResponse resp=HelloResponse.newBuilder().setRetWord(ByteString.copyFromUtf8("hello world!!!")).build(); done.run(resp); } @Override public void question(RpcController controller, AskRequest request, RpcCallback<AnsResponse> done) { System.out.println("request question:"+request.getAsk()); AnsResponse resp=AnsResponse.newBuilder().setAns(ByteString.copyFromUtf8("helloworld,"+request.getAsk().toStringUtf8())).build(); done.run(resp); } @Override public Service getService() { return this; } @Override public void start(CoprocessorEnvironment env) throws IOException { if (env instanceof RegionCoprocessorEnvironment) { this.env = (RegionCoprocessorEnvironment)env; } else { throw new CoprocessorException("Must be loaded on a table region!"); } } @Override public void stop(CoprocessorEnvironment env) throws IOException { } }
####client 端代碼
package com.gzhdi.copocessor; import java.io.IOException; import java.util.Map; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.client.coprocessor.Batch; import org.apache.hadoop.hbase.ipc.BlockingRpcCallback; import org.apache.hadoop.hbase.ipc.ServerRpcController; import com.google.protobuf.ByteString; import com.google.protobuf.ServiceException; import com.gzhdi.coprocessor.generated.ServerHelloworld.AnsResponse; import com.gzhdi.coprocessor.generated.ServerHelloworld.AskRequest; import com.gzhdi.coprocessor.generated.ServerHelloworld.HelloWorld; public class HelloWorldClient { public static void main(String[] args) throws ServiceException, Throwable { myclient(); } //如果你沒有寫好自己的例子可以跑跑hbase自帶的小例子 // private static void example1() throws IOException, ServiceException, // Throwable { // System.out.println("begin....."); // long begin_time=System.currentTimeMillis(); // Configuration config=HBaseConfiguration.create(); //// String master_ip="192.168.150.128"; // String master_ip="10.10.113.211"; // String zk_ip="10.10.113.211"; // String table_name="t1"; // config.set("hbase.zookeeper.property.clientPort", "2181"); // config.set("hbase.zookeeper.quorum", zk_ip); // config.set("hbase.master", master_ip+":600000"); // // HTable table = new HTable(config, table_name); // final ExampleProtos.CountRequest request = ExampleProtos.CountRequest.getDefaultInstance(); // Map results = table.coprocessorService( // ExampleProtos.RowCountService.class, // the protocol interface we're invoking // null, null, // start and end row keys // // new Batch.Call() { // // public Long call(Object counter) throws IOException { // BlockingRpcCallback<ExampleProtos.CountResponse> rpcCallback = // new BlockingRpcCallback(); // ((ExampleProtos.RowCountService)counter).getRowCount(null, request, rpcCallback); // ExampleProtos.CountResponse response = rpcCallback.get(); // System.out.println("count :::::"+response.getCount()); // return response.hasCount() ? response.getCount() : 0; // } // // }); // } public static void myclient(){ // TODO Auto-generated method stub System.out.println("begin....."); long begin_time=System.currentTimeMillis(); Configuration config=HBaseConfiguration.create(); // String master_ip="192.168.150.128"; String master_ip="10.10.113.211"; String zk_ip="10.10.113.211"; String table_name="t1"; config.set("hbase.zookeeper.property.clientPort", "2181"); config.set("hbase.zookeeper.quorum", zk_ip); config.set("hbase.master", master_ip+":600000"); final AskRequest req=AskRequest.newBuilder().setAsk(ByteString.copyFromUtf8("hello")).build(); AnsResponse resp=null; try { HTable table=new HTable(config,table_name); Map<byte[], ByteString> re=table.coprocessorService(HelloWorld.class, null, null, new Batch.Call<HelloWorld, ByteString>() { @Override public ByteString call(HelloWorld instance) throws IOException { ServerRpcController controller = new ServerRpcController(); BlockingRpcCallback<AnsResponse> rpccall=new BlockingRpcCallback<AnsResponse>(); instance.question(controller, req, rpccall); AnsResponse resp=rpccall.get(); //result System.out.println("resp:"+ resp.getAns().toStringUtf8()); return resp.getAns(); } }); } catch (IOException e) { e.printStackTrace(); } catch (ServiceException e) { e.printStackTrace(); } catch (Throwable e) { e.printStackTrace(); } } }
利用jdk 1.6打包(切記jdk1.6,因為hbase用1.6打包的) 導出hellworld.jar 包名隨便起。
###部署
將包helloworld.jar 放在 %HBASE_HOME/lib/ 下就可以了。
重新啟動hbase
驗證
[root@hdp22 ~ Desktop]# hbase shell hbase(main):001:0> import com.gzhdi.copocessor.HelloWorldEndPoint => Java::ComGzhdiCopocessor::HelloWorldEndPoint //如果打印出這句話就說明包已經加載完畢
向指定表添加endpoint
hbase(main):002:0> create 't1','f1' 0 row(s) in 6.5290 seconds => Hbase::Table - t1 //創建表t1 hbase(main):003:0> alter 't1','coprocessor'=>'|com.gzhdi.copocessor.HelloWorldEndPoint|1001|' Updating all regions with the new schema... 0/1 regions updated. 1/1 regions updated. Done. 0 row(s) in 2.5960 seconds hbase(main):005:0> describe 't1' DESCRIPTION ENABLED 't1', {TABLE_ATTRIBUTES => {coprocessor$1 => '|com.gzhdi.copocessor.HelloWorldEndPoint|1001|'}, {NAME => 'f1', DATA_BLO true CK_ENCODING => 'NONE', BLOOMFILTER => 'ROW', REPLICATION_SCOPE => '0', VERSIONS => '1', COMPRESSION => 'NONE', MIN_VERS IONS => '0', TTL => '2147483647', KEEP_DELETED_CELLS => 'false', BLOCKSIZE => '65536', IN_MEMORY => 'false', BLOCKCACHE => 'true'} 1 row(s) in 0.0940 seconds //OK 成功了
###調用 現在就可以使用你的客戶端代碼調用該服務了,需要制定zookeeper地址和表名(因為服務是針對表的)。
以上是“hbase0.98 coprocessor Endpoint如何實現HelloWorld”這篇文章的所有內容,感謝各位的閱讀!相信大家都有了一定的了解,希望分享的內容對大家有所幫助,如果還想學習更多知識,歡迎關注億速云行業資訊頻道!
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。