您好,登錄后才能下訂單哦!
這篇文章將為大家詳細講解有關flume中hdfssink如何自定義EventSerializer序列化類,小編覺得挺實用的,因此分享給大家做個參考,希望大家閱讀完這篇文章后可以有所收獲。
因為之前做了hbasesink的序列化類,覺得寫hdfs的應該會很簡單,可是沒想到竟然不一樣。hdfs并沒有直接配置序列化類的選項需要根據fileType來選擇對相應序列化類,我們使用的datastream的類型,對應的類是HDFSDataStream,這個類默認的序列化類TEXT(這是個枚舉類型)
serializerType = context.getString("serializer", "TEXT");
枚舉的類如下:
public enum EventSerializerType { TEXT(BodyTextEventSerializer.Builder.class), HEADER_AND_TEXT(HeaderAndBodyTextEventSerializer.Builder.class), AVRO_EVENT(FlumeEventAvroEventSerializer.Builder.class), CUSTOM(CUSTOMEventSerializer.Builder.class),//自定義的序列化類 OTHER(null); private final Class<? extends EventSerializer.Builder> builderClass; EventSerializerType(Class<? extends EventSerializer.Builder> builderClass) { this.builderClass = builderClass; } public Class<? extends EventSerializer.Builder> getBuilderClass() { return builderClass; } }
在里面加了自定義的類型和枚舉,在配置agent的時候配置好filetype和serializer即可,同樣需要編譯上傳。
自定義的序列化類如下:
public class CUSTOMEventSerializer implements EventSerializer { private final static Logger logger = LoggerFactory.getLogger(CUSTOMEventSerializer.class); private final String SPLITCHAR = "\001";//列分隔符 // for legacy reasons, by default, append a newline to each event written // out private final String APPEND_NEWLINE = "appendNewline"; private final boolean APPEND_NEWLINE_DFLT = true; private final OutputStream out; private final boolean appendNewline; private CUSTOMEventSerializer(OutputStream out, Context ctx) { this.appendNewline = ctx.getBoolean(APPEND_NEWLINE, APPEND_NEWLINE_DFLT); this.out = out; } @Override public boolean supportsReopen() { return true; } @Override public void afterCreate() { // noop } @Override public void afterReopen() { // noop } @Override public void beforeClose() { // noop } @Override public void write(Event e) throws IOException { // 獲取日志信息 String log = new String(e.getBody(), StandardCharsets.UTF_8); logger.info("-----------logs-------" + log); // headers包含日志中項目編號和host信息 Map<String, String> headers = e.getHeaders(); String parsedLog = parseJson2Value(log, headers); out.write(parsedLog.getBytes()); logger.info("-----------values-------" + parsedLog); logger.info("-----------valueSSSSSS-------" + parsedLog.getBytes()); out.write('\n'); } /** * * @Title: parseJson2Value * @Description: 解析出json日志中的value。 * @param log json格式日志 * @param headers event頭信息 * @return * @return String 解析后的日志 * @throws */ private String parseJson2Value(String log, Map<String, String> headers) { log.replace("\\", "/"); String time = ""; String path = ""; Object value = ""; StringBuilder values = new StringBuilder(); ObjectMapper objectMapper = new ObjectMapper(); try { Map<String,Object> m = objectMapper.readValue(log, Map.class); for(String key:m.keySet()){ value = m.get(key); if (key.equals("uri")){ //解析訪問路徑 path = pasreUriToPath(value.toString()); } if(key.equals("time")){ time = value.toString().substring(10); } values.append(value).append(this.SPLITCHAR); } } catch (JsonParseException e) { // TODO Auto-generated catch block e.printStackTrace(); } catch (JsonMappingException e) { // TODO Auto-generated catch block e.printStackTrace(); } catch (IOException e) { // TODO Auto-generated catch block e.printStackTrace(); } // 解析headers中的項目編號和服務host String pcode = headers.get("pcode"); String host = headers.get("host"); values.append(path).append(this.SPLITCHAR). append(pcode).append(this.SPLITCHAR). append(host).append(this.SPLITCHAR). append(time).append(this.SPLITCHAR); //value字符串 return values.toString(); } @Override public void flush() throws IOException { // noop } public static class Builder implements EventSerializer.Builder { @Override public EventSerializer build(Context context, OutputStream out) { CUSTOMEventSerializer s = new CUSTOMEventSerializer(out, context); return s; } } /** * 把請求uri轉換成具體的訪問路徑 * * @param uri 請求uri * @return 訪問路徑 */ protected String pasreUriToPath(String uri){ if(uri == null || "".equals(uri.trim())){ return uri; } int index = uri.indexOf("/"); if(index > -1){ uri = uri.substring(index); } index = uri.indexOf("?"); if(index > -1){ uri = uri.substring(0, index); } index = uri.indexOf(";"); if(index > -1){ uri = uri.substring(0, index); } index = uri.indexOf(" HTTP/1.1"); if(index > -1){ uri = uri.substring(0, index); } index = uri.indexOf("HTTP/1.1"); if(index > -1){ uri = uri.substring(0, index); } return uri; } }
關于“flume中hdfssink如何自定義EventSerializer序列化類”這篇文章就分享到這里了,希望以上內容可以對大家有一定的幫助,使各位可以學到更多知識,如果覺得文章不錯,請把它分享出去讓更多的人看到。
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。