91超碰碰碰碰久久久久久综合_超碰av人澡人澡人澡人澡人掠_国产黄大片在线观看画质优化_txt小说免费全本

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

storm怎么構建拓撲代碼

發布時間:2021-12-23 14:43:39 來源:億速云 閱讀:151 作者:iii 欄目:大數據

這篇文章主要講解了“storm怎么構建拓撲代碼”,文中的講解內容簡單清晰,易于學習與理解,下面請大家跟著小編的思路慢慢深入,一起來研究和學習“storm怎么構建拓撲代碼”吧!

storm怎么構建拓撲代碼

storm怎么構建拓撲代碼

storm怎么構建拓撲代碼

1.  構建拓撲代碼

package demo;

import backtype.storm.topology.TopologyBuilder;
import backtype.storm.tuple.Fields;

public class AreaAmtTopo {

    public static void main(String[] args) {
    
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("spout", new OrderBaseSpout(KafkaProperties.Order_topic),5);
builder.setBolt("filter",new AreaFilterBolt(),5).shuffleGrouping("spout");
builder.setBolt("areabolt",new AreaAmtBolt(),2).fieldsGrouping("filter",new Fields("area_id"));
builder.setBolt("rsltbolt",new AreaRsltBolt(),1).shuffleGrouping("areabolt");
    }

}

2.一級過濾bolt

package demo;

import java.util.Map;

import backtype.storm.task.TopologyContext;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.IBasicBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
//一級的過濾bolt
public class AreaFilterBolt implements IBasicBolt {

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // TODO Auto-generated method stub
        declarer.declare(new Fields("area_id","order_amt","create_time"));//tuple里面每個value的對應name
    }

    @Override
    public Map<String, Object> getComponentConfiguration() {
        // TODO Auto-generated method stub
        return null;
    }

    @Override
    public void cleanup() {
        // TODO Auto-generated method stub

    }

    @Override
    public void execute(Tuple input, BasicOutputCollector collector) {
        //order_id,order_amt,create_time,area_id
        String order=input.getString(0);//取出集合values中的第一個value
        if(order!=null){
            
            String orderArr[]=order.split("\\t");
            collector.emit(new Values(orderArr[3],orderArr[1],DateFmt.getCountDate(orderArr[2], DateFmt.date_short)));//area_id,order_amt,create_time
            
        }

     }

    @Override
    public void prepare(Map arg0, TopologyContext arg1) {
        // TODO Auto-generated method stub

    }

}

3.局部匯總bolt(按日期和區域和匯總)

package demo;

import java.util.HashMap;
import java.util.Map;

import backtype.storm.task.TopologyContext;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.IBasicBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

//局部匯總
public class AreaAmtBolt implements IBasicBolt {

    
    Map<String,Double> countsMap=null;
    @Override
    public void declareOutputFields(
            OutputFieldsDeclarer declarer) {
        
        declarer.declare(new Fields("date_area","amt"));

    }

    @Override
    public Map<String, Object> getComponentConfiguration() {
        // TODO Auto-generated method stub
        return null;
    }

    @Override
    public void prepare(Map paramMap, TopologyContext paramTopologyContext) {
        // TODO Auto-generated method stub
         countsMap =new HashMap<String, Double>();
    }

    @Override
    public void execute(Tuple input,
            BasicOutputCollector collector) {
        
        if(input!=null)//如果spout端沒數據就會發空值,所以要做判斷再往下發
        {
        String area_id=input.getString(0);
        Double order_amt=input.getDouble(1);
        String  order_date=input.getStringByField("order_date");
        
        Double count=countsMap.get(area_id+"_"+order_date);
        if (count==null){
            count = 0.0;    
        }
        
        count+=order_amt;
        countsMap.put(area_id+"_"+order_date,count);
        System.err.println("areaAmtBolt"+order_date+"_"+area_id+"="+count);
        collector.emit(new Values(area_id+"_"+order_date,count));
        }
    }

    @Override
    public void cleanup() {
        countsMap.clear();
    }

}

4. 最終結果寫入Hbase

package demo;

import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

import backtype.storm.task.TopologyContext;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.IBasicBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Tuple;

//結果定時寫入hbase的bolt
public class AreaRsltBolt implements IBasicBolt {

    Map<String,Double> countsMap=null;
    long beginTime=System.currentTimeMillis();
    long endTime=0L;
    HBaseDao dao=null;
    @Override
    public void declareOutputFields(
            OutputFieldsDeclarer paramOutputFieldsDeclarer) {
        // TODO Auto-generated method stub

    }

    @Override
    public Map<String, Object> getComponentConfiguration() {
        // TODO Auto-generated method stub
        return null;
    }

    @Override
    public void prepare(Map paramMap, TopologyContext paramTopologyContext) {
         countsMap =new HashMap<String, Double>();
         dao=new HBaseDAOImp();
    }

    @Override
    public void execute(Tuple input,
            BasicOutputCollector paramBasicOutputCollector) {
        String date_areaid=input.getString(0);
        double  order_amt=input.getDouble(1); 
        countsMap.put(date_areaid,order_amt);
        endTime=System.currentTimeMillis();
        if (endTime-beginTime>=5*1000){
        

           for(String key:countsMap.keySet()){
              //put into hbase
            //2014-05-05_1,amt
              dao.insert("area_order","cf","order_amt",countsMap.get(key));
              System.err.println("rsltBolt put hbase: key="+key+"; order_amt="+countsMap.get(key));
            }
            beginTime=System.currentTimeMillis();
        }
        
    }

    @Override
    public void cleanup() {
        // TODO Auto-generated method stub

    }

}

5. DateFmt代碼

package demo;

import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Calendar;
import java.util.Date;

public class DateFmt {

    public static final String date_long="yyyy-MM-dd HH:mm:ss";
    public static final String date_short="yyyy-MM-dd";
    
    public static SimpleDateFormat sdf=new SimpleDateFormat(date_short);
    public static String getCountDate(String date,String patton){
        SimpleDateFormat sdf=new SimpleDateFormat(patton);
        Calendar cal =Calendar.getInstance();
        if (date!=null){
            
            try {
                cal.setTime(sdf.parse(date));
            } catch (ParseException e) {
                
                e.printStackTrace();
            }
        }
        
        return sdf.format(cal.getTime());
        
    }
    
    public static Date parseDate(String dateStr) throws Exception{
        
        return sdf.parse(dateStr);
    }
    
    
    public static void main(String[] args) {
        
        System.out.println(DateFmt.getCountDate("2015-09-08 09:09:08 ", DateFmt.date_long));
    }
}

感謝各位的閱讀,以上就是“storm怎么構建拓撲代碼”的內容了,經過本文的學習后,相信大家對storm怎么構建拓撲代碼這一問題有了更深刻的體會,具體使用情況還需要大家實踐驗證。這里是億速云,小編將為大家推送更多相關知識點的文章,歡迎關注!

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

宁津县| 邮箱| 兰西县| 游戏| 普兰店市| 富顺县| 葵青区| 怀宁县| 宁乡县| 阳江市| 涪陵区| 平南县| 习水县| 莫力| 曲松县| 惠东县| 孝感市| 庆云县| 馆陶县| 库尔勒市| 炎陵县| 横山县| 长沙县| 甘谷县| 新河县| 新绛县| 塔河县| 石景山区| 南宫市| 同德县| 太白县| 渝北区| 扶沟县| 白玉县| 苍山县| 罗定市| 平舆县| 呼伦贝尔市| 都安| 德州市| 裕民县|