您好,登錄后才能下訂單哦!
這篇文章主要介紹“Flume怎么自定義Event Serializer序列化類”,在日常操作中,相信很多人在Flume怎么自定義Event Serializer序列化類問題上存在疑惑,小編查閱了各式資料,整理出簡單好用的操作方法,希望對大家解答”Flume怎么自定義Event Serializer序列化類”的疑惑有所幫助!接下來,請跟著小編一起來學習吧!
把日志從flume打到hbase中,但是我們的日志由于前期是存到MongoDb中的,所以都是Json格式的日志,這時候使用flume自帶的SimpleHbaseEventSerializer和RegexHbaseEventSerializer這樣的就不行了,于是開始痛苦的看源碼,自己寫序列化的類(這里需要注意,如果是在flume的hbasesink包下編寫的代碼,License信息一定要加上。就是最上面那段英文,要不然在運行的時候會報錯),比較簡單,編寫好類之后,編譯打包,傳到flume的lib目錄下,然后在配置agent的時候指定Serializer的類為編寫的類即可。下面是代碼(類注釋沒貼出來,見諒哈):
public class PRTMSAsyncHbaseEventSerializer implements AsyncHbaseEventSerializer { private byte[] table;//hbase表 private byte[] cf;//列簇 private byte[][] payload;//列集合 private byte[][] payloadColumn;//列值 private byte[] incrementColumn; private String rowSuffix;//roykey后綴 private String rowPrefix;//rowkey前綴 private byte[] incrementRow; private KeyType keyType;//rowkey后綴類型 private static final Logger logger = LoggerFactory.getLogger(PRTMSAsyncHbaseEventSerializer.class); @Override public void configure(Context context) { // TODO Auto-generated method stub //設置主鍵后綴類型,這里使用時間戳 keyType = KeyType.TS; if (iCol != null && !iCol.isEmpty()) { incrementColumn = iCol.getBytes(Charsets.UTF_8); } incrementRow = context.getString("incrementRow", "incRow").getBytes(Charsets.UTF_8); } @Override public void configure(ComponentConfiguration conf) { // TODO Auto-generated method stub } @Override public void initialize(byte[] table, byte[] cf) { // TODO Auto-generated method stub this.table = table; this.cf = cf; } /** * * @Title: setEvent * @Description: 獲取日志信息,并解析出HBase的列以及列的value值 * @param event * @throws * @see org.apache.flume.sink.hbase.AsyncHbaseEventSerializer#setEvent(org.apache.flume.Event) */ @Override public void setEvent(Event event) { // TODO Auto-generated method stub //獲取日志信息 String log = new String(event.getBody(), StandardCharsets.UTF_8); //headers包含日志中項目編號和host信息 Map<String, String> headers = event.getHeaders(); JsonReader jsonReader = new JsonReader(new StringReader(log)); String name = ""; String value = ""; String path = ""; Map<String, String> kv = new HashMap<String, String>(); try { //解析日志中的鍵值對緩存到map中 jsonReader.beginObject(); while (jsonReader.hasNext()) { name = jsonReader.nextName(); value = jsonReader.nextString(); if(name.equals("uri")) path = value.split(" ")[1]; kv.put(name, value); } jsonReader.endObject(); } catch (IOException e) { // TODO Auto-generated catch block e.printStackTrace(); } //解析headers中的項目id和服務host、路徑 if(path.contains("?")){ path = path.substring(0, path.indexOf("?")); } String pcode = headers.get("pcode"); String host = headers.get("host"); //將項目編號和服務器host添加到map中 kv.put("pcode",pcode); kv.put("host", host); //初始化列和value數組 this.payloadColumn = new byte[kv.keySet().size()][]; this.payload = new byte[kv.keySet().size()][]; int i = 0; //給hbase的列和value賦值 for (String key : kv.keySet()) { this.payloadColumn[i] = key.getBytes(); this.payload[i] = kv.get(key).getBytes(); i++; } //設置rowkey的前綴 格式是項目編號+路徑 this.rowSuffix = new StringBuilder(pcode).reverse().toString() + ":"+path+":"+kv.get("time"); } @Override public List<PutRequest> getActions() { // TODO Auto-generated method stub List<PutRequest> actions = new ArrayList<PutRequest>(); if (payloadColumn != null) { byte[] rowKey; try { rowKey = rowSuffix.getBytes(); // for 循環,提交所有列和對于數據的put請求。 for (int i = 0; i < this.payload.length; i++) { PutRequest putRequest = new PutRequest(table, rowKey, cf, payloadColumn[i], payload[i]); actions.add(putRequest); } } catch (Exception e) { throw new FlumeException("Could not get row key!", e); } } return actions; } @Override public List<AtomicIncrementRequest> getIncrements() { // TODO Auto-generated method stub List<AtomicIncrementRequest> actions = new ArrayList<AtomicIncrementRequest>(); if (incrementColumn != null) { AtomicIncrementRequest inc = new AtomicIncrementRequest(table, incrementRow, cf, incrementColumn); actions.add(inc); } return actions; } @Override public void cleanUp() { // TODO Auto-generated method stub } }
到此,關于“Flume怎么自定義Event Serializer序列化類”的學習就結束了,希望能夠解決大家的疑惑。理論與實踐的搭配能更好的幫助大家學習,快去試試吧!若想繼續學習更多相關知識,請繼續關注億速云網站,小編會繼續努力為大家帶來更多實用的文章!
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。