This article looks at how Thrift is used in Storm, walking through the IDL, the client side, and the server side.
1 IDL
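First there is storm.thrift, the IDL file that defines the data structures and services Storm uses.
The backtype.storm.generated package holds the Java code that Thrift generates from this IDL.
Take the Nimbus service as an example.
Its definition in the IDL is: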
service Nimbus {
    void submitTopology(1: string name, 2: string uploadedJarLocation, 3: string jsonConf, 4: StormTopology topology) throws (1: AlreadyAliveException e, 2: InvalidTopologyException ite);
    void submitTopologyWithOpts(1: string name, 2: string uploadedJarLocation, 3: string jsonConf, 4: StormTopology topology, 5: SubmitOptions options) throws (1: AlreadyAliveException e, 2: InvalidTopologyException ite);
    void killTopology(1: string name) throws (1: NotAliveException e);
    void killTopologyWithOpts(1: string name, 2: KillOptions options) throws (1: NotAliveException e);
    void activate(1: string name) throws (1: NotAliveException e);
    void deactivate(1: string name) throws (1: NotAliveException e);
    void rebalance(1: string name, 2: RebalanceOptions options) throws (1: NotAliveException e, 2: InvalidTopologyException ite);

    // need to add functions for asking about status of storms, what nodes they're running on, looking at task logs

    string beginFileUpload();
    void uploadChunk(1: string location, 2: binary chunk);
    void finishFileUpload(1: string location);

    string beginFileDownload(1: string file);
    // can stop downloading chunks when receive 0-length byte array back
    binary downloadChunk(1: string id);

    // returns json
    string getNimbusConf();

    // stats functions
    ClusterSummary getClusterInfo();
    TopologyInfo getTopologyInfo(1: string id) throws (1: NotAliveException e);
    // returns json
    string getTopologyConf(1: string id) throws (1: NotAliveException e);
    StormTopology getTopology(1: string id) throws (1: NotAliveException e);
    StormTopology getUserTopology(1: string id) throws (1: NotAliveException e);
}
The corresponding Java code in Nimbus.java is:
public class Nimbus {

    public interface Iface {
        public void submitTopology(String name, String uploadedJarLocation, String jsonConf, StormTopology topology) throws AlreadyAliveException, InvalidTopologyException, org.apache.thrift7.TException;
        public void submitTopologyWithOpts(String name, String uploadedJarLocation, String jsonConf, StormTopology topology, SubmitOptions options) throws AlreadyAliveException, InvalidTopologyException, org.apache.thrift7.TException;
        public void killTopology(String name) throws NotAliveException, org.apache.thrift7.TException;
        public void killTopologyWithOpts(String name, KillOptions options) throws NotAliveException, org.apache.thrift7.TException;
        public void activate(String name) throws NotAliveException, org.apache.thrift7.TException;
        public void deactivate(String name) throws NotAliveException, org.apache.thrift7.TException;
        public void rebalance(String name, RebalanceOptions options) throws NotAliveException, InvalidTopologyException, org.apache.thrift7.TException;
        public String beginFileUpload() throws org.apache.thrift7.TException;
        public void uploadChunk(String location, ByteBuffer chunk) throws org.apache.thrift7.TException;
        public void finishFileUpload(String location) throws org.apache.thrift7.TException;
        public String beginFileDownload(String file) throws org.apache.thrift7.TException;
        public ByteBuffer downloadChunk(String id) throws org.apache.thrift7.TException;
        public String getNimbusConf() throws org.apache.thrift7.TException;
        public ClusterSummary getClusterInfo() throws org.apache.thrift7.TException;
        public TopologyInfo getTopologyInfo(String id) throws NotAliveException, org.apache.thrift7.TException;
        public String getTopologyConf(String id) throws NotAliveException, org.apache.thrift7.TException;
        public StormTopology getTopology(String id) throws NotAliveException, org.apache.thrift7.TException;
        public StormTopology getUserTopology(String id) throws NotAliveException, org.apache.thrift7.TException;
    }

    // ... other generated members (Client, Processor, and so on) omitted
}
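The IDL comment on downloadChunk says a caller can stop once it receives a 0-length byte array. A minimal sketch of that loop follows. It assumes a hypothetical ChunkSource interface that mirrors only beginFileDownload and downloadChunk, plus an in-memory implementation invented for the example, so it runs without Storm or Thrift on the classpath.

```java
import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;
import java.util.Arrays;

// Hypothetical stand-in for the two download methods of Nimbus.Iface.
interface ChunkSource {
    String beginFileDownload(String file);
    ByteBuffer downloadChunk(String id);
}

// In-memory implementation used only for illustration; it is not Storm code.
final class InMemoryChunkSource implements ChunkSource {
    private final byte[] data;
    private final int chunkSize;
    private int offset = 0;

    InMemoryChunkSource(byte[] data, int chunkSize) {
        this.data = data;
        this.chunkSize = chunkSize;
    }

    @Override
    public String beginFileDownload(String file) {
        offset = 0;
        return "download-" + file; // made-up session id
    }

    @Override
    public ByteBuffer downloadChunk(String id) {
        int len = Math.min(chunkSize, data.length - offset);
        ByteBuffer chunk = ByteBuffer.wrap(data, offset, len).slice();
        offset += len;
        return chunk; // has zero remaining bytes once the data is exhausted
    }
}

final class ChunkedDownload {
    // Pulls chunks until an empty one arrives, as the IDL comment describes.
    static byte[] fetch(ChunkSource source, String file) {
        String id = source.beginFileDownload(file);
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        while (true) {
            ByteBuffer chunk = source.downloadChunk(id);
            if (!chunk.hasRemaining()) {
                break;
            }
            byte[] bytes = new byte[chunk.remaining()];
            chunk.get(bytes);
            out.write(bytes, 0, bytes.length);
        }
        return out.toByteArray();
    }

    public static void main(String[] args) {
        byte[] jar = {1, 2, 3, 4, 5, 6, 7};
        byte[] copy = fetch(new InMemoryChunkSource(jar, 3), "topology.jar");
        System.out.println(Arrays.equals(jar, copy));
    }
}
```

Against a real cluster the same loop would call the generated client instead of ChunkSource, and each call could also throw TException.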
2 Client
1. First, get a client:
NimbusClient client = NimbusClient.getConfiguredClient(conf);
Here is the logic of NimbusClient.getConfiguredClient in backtype.storm.utils.
It simply reads the Nimbus host and port from the configuration and creates a new NimbusClient.
public static NimbusClient getConfiguredClient(Map conf) {
    try {
        String nimbusHost = (String) conf.get(Config.NIMBUS_HOST);
        int nimbusPort = Utils.getInt(conf.get(Config.NIMBUS_THRIFT_PORT));
        return new NimbusClient(conf, nimbusHost, nimbusPort);
    } catch (TTransportException ex) {
        throw new RuntimeException(ex);
    }
}
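The lookup above can be sketched with the JDK alone. This is a hedged illustration, not Storm's implementation: the NimbusAddress class, its toInt helper, and the two key strings are assumptions made for the example (in Storm the keys come from the Config constants and the coercion is done by Utils.getInt).

```java
import java.util.HashMap;
import java.util.Map;

final class NimbusAddress {
    // Assumed key names for this sketch; Storm reads them from Config constants.
    static final String HOST_KEY = "nimbus.host";
    static final String PORT_KEY = "nimbus.thrift.port";

    final String host;
    final int port;

    NimbusAddress(String host, int port) {
        this.host = host;
        this.port = port;
    }

    // Coerces a config value to int, since a parsed config may hold Integer, Long, or String.
    static int toInt(Object value) {
        if (value instanceof Number) {
            return ((Number) value).intValue();
        }
        if (value instanceof String) {
            return Integer.parseInt(((String) value).trim());
        }
        throw new IllegalArgumentException("not an integer: " + value);
    }

    // Reads host and port from the config map and validates both.
    static NimbusAddress fromConf(Map<String, Object> conf) {
        Object host = conf.get(HOST_KEY);
        if (!(host instanceof String)) {
            throw new IllegalArgumentException("host is not set");
        }
        int port = toInt(conf.get(PORT_KEY));
        if (port <= 0) {
            throw new IllegalArgumentException("invalid port: " + port);
        }
        return new NimbusAddress((String) host, port);
    }

    public static void main(String[] args) {
        Map<String, Object> conf = new HashMap<>();
        conf.put(HOST_KEY, "nimbus.example.com");
        conf.put(PORT_KEY, 6627L);
        NimbusAddress address = fromConf(conf);
        System.out.println(address.host + ":" + address.port);
    }
}
```

Validating the host and port here, before any socket is opened, gives the same early failures that the ThriftClient constructor produces with its own checks.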
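NimbusClient extends ThriftClient: public class NimbusClient extends ThriftClient
So what does ThriftClient do? The key questions are how data is serialized and how it is sent to the remote side.
This is where Thrift's Transport and Protocol abstractions come in.
The Transport is essentially a wrapper around a socket: a TSocket(host, port) is created and then passed through the configured transport plugin.
For the Protocol, the constructor below uses TBinaryProtocol on top of the connected transport.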
public ThriftClient(Map storm_conf, String host, int port, Integer timeout) throws TTransportException {
    try {
        // locate login configuration
        Configuration login_conf = AuthUtils.GetConfiguration(storm_conf);

        // construct a transport plugin
        ITransportPlugin transportPlugin = AuthUtils.GetTransportPlugin(storm_conf, login_conf);

        // create a socket with server
        if (host == null) {
            throw new IllegalArgumentException("host is not set");
        }
        if (port <= 0) {
            throw new IllegalArgumentException("invalid port: " + port);
        }
        TSocket socket = new TSocket(host, port);
        if (timeout != null) {
            socket.setTimeout(timeout);
        }
        final TTransport underlyingTransport = socket;

        // establish client-server transport via plugin
        _transport = transportPlugin.connect(underlyingTransport, host);
    } catch (IOException ex) {
        throw new RuntimeException(ex);
    }
    _protocol = null;
    if (_transport != null)
        _protocol = new TBinaryProtocol(_transport);
}
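To make the split between the two layers concrete, here is a minimal sketch using only the JDK. ByteTransport, LoopbackTransport, and SimpleBinaryProtocol are hypothetical names invented for the example and are not Thrift classes. The encoding (big-endian 32-bit integers, strings as a 32-bit length followed by UTF-8 bytes) follows the general shape of Thrift's binary protocol, but the sketch makes no claim to be wire compatible.

```java
import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical minimal transport: it only moves bytes.
interface ByteTransport {
    void write(byte[] bytes);
    byte[] read(int length);
}

// Loopback transport backed by memory, standing in for a socket.
final class LoopbackTransport implements ByteTransport {
    private final ByteArrayOutputStream written = new ByteArrayOutputStream();
    private int readPos = 0;

    @Override
    public void write(byte[] bytes) {
        written.write(bytes, 0, bytes.length);
    }

    @Override
    public byte[] read(int length) {
        byte[] all = written.toByteArray();
        if (readPos + length > all.length) {
            throw new IllegalStateException("not enough bytes buffered");
        }
        byte[] out = new byte[length];
        System.arraycopy(all, readPos, out, 0, length);
        readPos += length;
        return out;
    }
}

// Protocol layer: decides how values are encoded, independent of the transport.
final class SimpleBinaryProtocol {
    private final ByteTransport transport;

    SimpleBinaryProtocol(ByteTransport transport) {
        this.transport = transport;
    }

    void writeI32(int value) {
        // ByteBuffer is big-endian by default.
        transport.write(ByteBuffer.allocate(4).putInt(value).array());
    }

    int readI32() {
        return ByteBuffer.wrap(transport.read(4)).getInt();
    }

    void writeString(String value) {
        byte[] utf8 = value.getBytes(StandardCharsets.UTF_8);
        writeI32(utf8.length);
        transport.write(utf8);
    }

    String readString() {
        int length = readI32();
        return new String(transport.read(length), StandardCharsets.UTF_8);
    }
}

final class LayeringDemo {
    public static void main(String[] args) {
        SimpleBinaryProtocol protocol = new SimpleBinaryProtocol(new LoopbackTransport());
        protocol.writeString("word-count");
        protocol.writeI32(6627);
        System.out.println(protocol.readString() + " " + protocol.readI32());
    }
}
```

Because the protocol only sees the ByteTransport interface, swapping the loopback for a socket-backed transport would not change any encoding code, which is the point of the layering in ThriftClient.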
2. Call any RPC
Take submitTopologyWithOpts as an example:
client.getClient().submitTopologyWithOpts(name, submittedJar, serConf, topology, opts);
The Nimbus interface above declares this method. Thrift generates not only the Java interface but also the complete client-side RPC implementation:
public void submitTopologyWithOpts(String name, String uploadedJarLocation, String jsonConf, StormTopology topology, SubmitOptions options) throws AlreadyAliveException, InvalidTopologyException, org.apache.thrift7.TException
{
    send_submitTopologyWithOpts(name, uploadedJarLocation, jsonConf, topology, options);
    recv_submitTopologyWithOpts();
}
The call has two steps.
First, send_submitTopologyWithOpts calls sendBase.
Then recv_submitTopologyWithOpts calls receiveBase.
protected void sendBase(String methodName, TBase args) throws TException {
    oprot_.writeMessageBegin(new TMessage(methodName, TMessageType.CALL, ++seqid_));
    args.write(oprot_);
    oprot_.writeMessageEnd();
    oprot_.getTransport().flush();
}

protected void receiveBase(TBase result, String methodName) throws TException {
    TMessage msg = iprot_.readMessageBegin();
    if (msg.type == TMessageType.EXCEPTION) {
        TApplicationException x = TApplicationException.read(iprot_);
        iprot_.readMessageEnd();
        throw x;
    }
    if (msg.seqid != seqid_) {
        throw new TApplicationException(TApplicationException.BAD_SEQUENCE_ID, methodName + " failed: out of sequence response");
    }
    result.read(iprot_);
    iprot_.readMessageEnd();
}
This shows how Thrift encapsulates the protocol: the caller never handles serialization itself and only calls the protocol's interface.
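The checks in receiveBase (surface a remote exception, reject a reply whose sequence id does not match the call) can be illustrated with a simplified envelope. This is a sketch under stated assumptions: RpcEnvelope is a hypothetical class, the payload is a single string rather than a generated struct, and the header is written with DataOutputStream.writeUTF, which is not Thrift's wire format.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

final class RpcEnvelope {
    // Message types named after TMessageType; used here only as tags for the sketch.
    static final byte CALL = 1;
    static final byte REPLY = 2;
    static final byte EXCEPTION = 3;

    // Writes a simplified header (method name, type, seqid) followed by one string payload.
    static byte[] encode(String method, byte type, int seqid, String payload) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(bytes);
            out.writeUTF(method);
            out.writeByte(type);
            out.writeInt(seqid);
            out.writeUTF(payload);
            out.flush();
            return bytes.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Mirrors the two checks in receiveBase before handing back the result.
    static String decodeReply(byte[] message, int expectedSeqid) {
        try {
            DataInputStream in = new DataInputStream(new ByteArrayInputStream(message));
            String method = in.readUTF();
            byte type = in.readByte();
            int seqid = in.readInt();
            String payload = in.readUTF();
            if (type == EXCEPTION) {
                throw new IllegalStateException(method + " failed remotely: " + payload);
            }
            if (seqid != expectedSeqid) {
                throw new IllegalStateException(method + " failed: out of sequence response");
            }
            return payload;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        int seqid = 1;
        byte[] reply = encode("getNimbusConf", REPLY, seqid, "{}");
        System.out.println(decodeReply(reply, seqid));
    }
}
```

The sequence id check matters because the client reuses one connection for many calls; a reply that carries a different id cannot belong to the call that is currently waiting.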
3 Server
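Thrift's strength is that it implements the whole protocol stack rather than only translating the IDL, and it ships several server implementations.
The Nimbus server side is written in Clojure.
It uses Thrift's TNonblockingServerSocket, THsHaServer, TBinaryProtocol, and Processor, and the code is very short.
The processor hands the received data to service-handler, so the user only has to implement Nimbus$Iface in service-handler; Thrift takes care of everything else on the server side. The generated code used here also lives in backtype.storm.generated: Clojure runs on the JVM, so the IDL only needs to be compiled to Java.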
(defn launch-server! [conf nimbus]
  (validate-distributed-mode! conf)
  (let [service-handler (service-handler conf nimbus)
        options (-> (TNonblockingServerSocket. (int (conf NIMBUS-THRIFT-PORT)))
                    (THsHaServer$Args.)
                    (.workerThreads 64)
                    (.protocolFactory (TBinaryProtocol$Factory.))
                    (.processor (Nimbus$Processor. service-handler)))
        ;; build the server from the options assembled above
        server (THsHaServer. options)]
    (.addShutdownHook (Runtime/getRuntime) (Thread. (fn [] (.shutdown service-handler) (.stop server))))
    (log-message "Starting Nimbus server...")
    (.serve server)))
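The role of the processor, routing an incoming method name to the handler that implements the service interface, can be sketched in plain Java. MiniNimbus and MiniProcessor are hypothetical names invented for this example; the real Nimbus$Processor is generated by Thrift and also deserializes arguments and serializes results, which the sketch leaves out.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Hypothetical slice of a service interface, standing in for Nimbus.Iface.
interface MiniNimbus {
    String getNimbusConf();
    String getTopologyConf(String id);
}

// Routes a method name to the handler, which is the job a generated Processor does.
final class MiniProcessor {
    private final Map<String, Function<String, String>> routes = new HashMap<>();

    MiniProcessor(MiniNimbus handler) {
        routes.put("getNimbusConf", arg -> handler.getNimbusConf());
        routes.put("getTopologyConf", handler::getTopologyConf);
    }

    String process(String method, String arg) {
        Function<String, String> route = routes.get(method);
        if (route == null) {
            throw new IllegalArgumentException("Invalid method name: '" + method + "'");
        }
        return route.apply(arg);
    }
}

final class DispatchDemo {
    public static void main(String[] args) {
        // The handler is the only part a service author writes.
        MiniNimbus handler = new MiniNimbus() {
            @Override
            public String getNimbusConf() {
                return "{\"nimbus.thrift.port\":6627}";
            }

            @Override
            public String getTopologyConf(String id) {
                return "{\"topology.id\":\"" + id + "\"}";
            }
        };
        MiniProcessor processor = new MiniProcessor(handler);
        System.out.println(processor.process("getTopologyConf", "wc-1"));
    }
}
```

Keeping the dispatch table separate from the handler is what lets the server code stay generic: the same server loop works for any service once it is given a processor.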