您好,登錄后才能下訂單哦!
Storm中有一類特殊的Executor叫acker,它們負責跟蹤spout發出的每一個Tuple所對應的Tuple樹。當acker發現一棵Tuple樹已經全部處理完成時,它會通知框架回調Spout的ack();如果處理失敗或超時,則回調Spout的fail()。
Acker的跟蹤算法是Storm的主要突破之一,對任意大的一個Tuple樹,它只需要恒定的20字節就可以進行跟蹤。
我們期望的是,如果某個Tuple在Bolt中執行失敗了,Spout端可以重新發送該Tuple。但很遺憾,框架不會自動重新發送,需要我們自己手工編碼實現。下面給出一個實戰案例。
什麼是Tuple樹?Spout發出的一個Tuple,連同下游各個Bolt以它為錨點(anchor)陸續發出的所有Tuple,共同構成一棵Tuple樹。
Spout類代碼如下:
package les19.Ack_Fail;
import java.io.BufferedReader;
import java.io.FileInputStream;
import java.io.InputStreamReader;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.IRichSpout;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;
/**
 * Spout that reads tab-separated order records from {@code order.log} and emits one
 * {@code (date, orderAmt)} tuple per record, tagged with a random message id so that
 * {@link #ack(Object)} / {@link #fail(Object)} are called back for every tuple.
 *
 * <p>Every emitted tuple is remembered in {@code _pending} until it is acked or given up on.
 * A failed tuple is re-emitted by hand (the framework does not replay it) until it has failed
 * {@link #MAX_FAILURES} times.
 */
public class AckSpout implements IRichSpout{
    private static final long serialVersionUID = 1L;

    /** Number of failures after which a tuple is dropped instead of being re-emitted. */
    private static final int MAX_FAILURES = 3;

    FileInputStream fis;
    InputStreamReader isr;
    BufferedReader br;
    /** Thread-safe map of emitted tuples that are still awaiting an ack, keyed by message id. */
    private ConcurrentHashMap<Object, Values> _pending;
    /** Failure count per message id for tuples that have failed at least once. */
    private ConcurrentHashMap<Object, Integer> fail_pending;
    SpoutOutputCollector collector = null;
    /** Most recently read input line. */
    String str = null;

    /**
     * Reads the next line of the input file and emits it as a single tuple.
     *
     * <p>At most one tuple is emitted per call. The ISpout contract runs nextTuple, ack and
     * fail on the same thread, so draining the whole file here would hold back every ack/fail
     * callback and keep the entire file in {@code _pending}.
     *
     * <p>Malformed lines (fewer than three columns, or a third column shorter than ten
     * characters) are reported and skipped. Returns without emitting at end of file or when
     * the input could not be opened.
     */
    @Override
    public void nextTuple() {
        if (this.br == null) {
            // open() failed or a previous read error closed the input: nothing to emit.
            return;
        }
        try {
            str = this.br.readLine();
        } catch (java.io.IOException e) {
            e.printStackTrace();
            // Stop reading; otherwise the same error would be reported on every call.
            releaseInput();
            return;
        }
        if (str == null) {
            // End of file reached.
            return;
        }
        String[] arr = str.split("\t");
        if (arr.length < 3 || arr[2].length() < 10) {
            System.err.println("AckSpout: skipping malformed line: " + str);
            return;
        }
        // The first ten characters of the third column are used as the date field.
        String date = arr[2].substring(0, 10);
        String orderAmt = arr[1];
        UUID msgId = UUID.randomUUID();
        Values val = new Values(date,orderAmt);
        // Remember the tuple before emitting so that fail() can re-send it.
        this._pending.put(msgId, val);
        collector.emit(val, msgId);
        System.out.println("_pending.size()="+_pending.size());
    }

    /** Closes the input file. Safe to call even if {@link #open} failed. */
    @Override
    public void close() {
        releaseInput();
    }

    /**
     * Initialises the bookkeeping maps and opens {@code order.log} as UTF-8 text.
     *
     * <p>The maps are created before the file is opened so that ack/fail never see a
     * {@code null} map, even when the file cannot be opened. An open failure is reported and
     * leaves the spout idle (nextTuple emits nothing).
     */
    @Override
    public void open(Map conf, TopologyContext context,
            SpoutOutputCollector collector) {
        this.collector = collector;
        _pending = new ConcurrentHashMap<Object, Values>();
        fail_pending = new ConcurrentHashMap<Object, Integer>();
        try {
            this.fis = new FileInputStream("order.log");
            this.isr = new InputStreamReader(fis, "UTF-8");
            this.br = new BufferedReader(isr);
        } catch (Exception e) {
            e.printStackTrace();
            // Do not leak a partially opened stream chain.
            releaseInput();
        }
    }

    /** Declares the two output fields {@code date} and {@code orderAmt}. */
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("date","orderAmt"));
    }

    /** No component-specific configuration. */
    @Override
    public Map<String, Object> getComponentConfiguration() {
        return null;
    }

    /**
     * Called when the tuple identified by {@code msgId} was fully processed; forgets the tuple
     * and any failure count recorded for it.
     */
    @Override
    public void ack(Object msgId) {
        System.out.println("_pending size 共有:"+_pending.size());
        System.out.println("spout ack:"+msgId.toString()+"---"+msgId.getClass());
        this._pending.remove(msgId);
        // A tuple that succeeded on a retry must not leave its failure count behind.
        this.fail_pending.remove(msgId);
        System.out.println("_pending size 剩余:"+_pending.size());
    }

    @Override
    public void activate() {
        // Nothing to do.
    }

    @Override
    public void deactivate() {
        // Nothing to do.
    }

    /**
     * Called when the tuple identified by {@code msgId} failed. Re-emits the stored tuple
     * under the same message id until it has failed {@link #MAX_FAILURES} times, then drops
     * it from both maps.
     */
    @Override
    public void fail(Object msgId) {
        System.out.println("spout fail:"+msgId.toString());
        Values val = this._pending.get(msgId);
        if (val == null) {
            // Unknown or already completed tuple: there is nothing to re-send.
            fail_pending.remove(msgId);
            return;
        }
        Integer fail_count = fail_pending.get(msgId); // failures recorded so far
        if (fail_count == null) {
            fail_count = 0;
        }
        fail_count ++ ;
        if (fail_count >= MAX_FAILURES) {
            // Retry budget exhausted: give up and release the stored tuple as well.
            fail_pending.remove(msgId);
            this._pending.remove(msgId);
        }else {
            // Record the failure and re-send the tuple.
            fail_pending.put(msgId, fail_count);
            this.collector.emit(val, msgId);
        }
    }

    /** Closes reader, decoder and file stream independently and clears the references. */
    private void releaseInput() {
        closeQuietly(this.br);
        closeQuietly(this.isr);
        closeQuietly(this.fis);
        this.br = null;
        this.isr = null;
        this.fis = null;
    }

    /** Closes {@code c} if it is non-null, reporting (not propagating) any failure. */
    private static void closeQuietly(java.io.Closeable c) {
        if (c == null) {
            return;
        }
        try {
            c.close();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
Bolt如下:
package les19.Ack_Fail;
import java.util.Map;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.IRichBolt;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
/**
 * Bolt that parses the {@code orderAmt} field of each incoming tuple into a number,
 * re-emits {@code (date, orderAmt)} anchored to the input tuple, and then acks the input.
 * Any exception during processing causes the input tuple to be failed instead.
 */
public class AckBolt implements IRichBolt {
    private static final long serialVersionUID = 1L;

    OutputCollector collector = null;
    TopologyContext context = null;

    int num = 0;
    String url = null;
    String session_id = null;
    /** Date field of the most recently processed tuple. */
    String date = null;
    String province_id = null;

    /** Initialisation hook (the bolt-side counterpart of a spout's open): stores context and collector. */
    @Override
    public void prepare(Map stormConf, TopologyContext context,
            OutputCollector collector) {
        this.collector = collector;
        this.context = context;
    }

    /**
     * Processes one tuple: reads {@code date} and {@code orderAmt}, emits the converted pair
     * anchored to {@code input}, and acks it; on any exception the tuple is failed and the
     * stack trace printed.
     */
    @Override
    public void execute(Tuple input) {
        try {
            this.date = input.getStringByField("date");
            String rawAmount = input.getStringByField("orderAmt");
            Double amount = Double.valueOf(rawAmount);
            // Passing the input tuple as first argument anchors the new tuple to it.
            Values out = new Values(this.date, amount);
            this.collector.emit(input, out);
            this.collector.ack(input);
        } catch (Exception ex) {
            this.collector.fail(input);
            ex.printStackTrace();
        }
    }

    /** Nothing to release. */
    @Override
    public void cleanup() {
    }

    /** Declares the two output fields {@code date} and {@code orderAmt}. */
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        Fields outputFields = new Fields("date", "orderAmt");
        declarer.declare(outputFields);
    }

    /** No component-specific configuration. */
    @Override
    public Map<String, Object> getComponentConfiguration() {
        return null;
    }
}
TOPO類如下:
package les19.Ack_Fail;
import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.topology.TopologyBuilder;
/**
 * Entry point that wires {@code AckSpout} to {@code AckBolt} and submits the topology.
 */
public class Ack_FailTopo {
    /**
     * Builds the topology and submits it.
     *
     * @param args if non-empty, {@code args[0]} is used as the topology name and the topology
     *             is submitted through {@link StormSubmitter}; otherwise it is run in a
     *             {@link LocalCluster} under the name {@code mytopology}
     */
    public static void main(String[] args) {
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("spout", new AckSpout(), 1);
        builder.setBolt("bolt", new AckBolt(), 1).shuffleGrouping("spout");

        Config conf = new Config() ;
        //conf.put(Config.TOPOLOGY_ACKER_EXECUTORS, 0);
        conf.setDebug(false);

        // One handler for both submission paths: the local path previously had none.
        // NOTE(review): on Storm 2.x the LocalCluster constructor and submitTopology appear to
        // declare checked exceptions, so the local branch must sit inside the try - confirm
        // against the Storm version in use.
        try {
            if (args.length > 0) {
                StormSubmitter.submitTopology(args[0], conf, builder.createTopology());
            } else {
                LocalCluster localCluster = new LocalCluster();
                localCluster.submitTopology("mytopology", conf, builder.createTopology());
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
想了解更多,請見我在51CTO上的Storm視頻教程:http://edu.51cto.com/course/course_id-9041.html (本節內容來自第18-19講)。
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場。如果涉及侵權,請聯繫站長郵箱 is@yisu.com 進行舉報,並提供相關證據;一經查實,將立刻刪除涉嫌侵權內容。