91超碰碰碰碰久久久久久综合_超碰av人澡人澡人澡人澡人掠_国产黄大片在线观看画质优化_txt小说免费全本

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

flume中如何自定義Interceptor

發布時間:2021-07-29 15:44:33 來源:億速云 閱讀:150 作者:Leah 欄目:云計算

這期內容當中小編將會給大家帶來有關flume中如何自定義Interceptor,文章內容豐富且以專業的角度為大家分析和敘述,閱讀完這篇文章希望大家可以有所收獲。

flume現狀:

這種比較個性化的轉換flume沒有相關插件

分析:

flume event 針對source為文本文件時,會一行一個event(默認小于2048長度)

而攔截器就是針對event來做處理的

代碼:

package com.wy.flume.interceptor;

import com.google.common.base.Charsets;
import com.google.common.collect.Lists;
import java.util.HashMap;
import java.util.List;
import java.util.Set;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;  

public class AdRefererLogFormatInterceptor implements Interceptor {
    //匹配user-agent
    private static final Pattern pattern = Pattern.compile("^\"(.*)\"\\s\"(.*)\"(.*)$");
    private static final HashMap<String, String> platform = initPlatforms();
    private static final HashMap<String, String> browser = initBrowsers();

    private static HashMap<String, String> initPlatforms() {
        HashMap<String, String> platforms = new HashMap<>();
        platforms.put("windows nt 6.2", "Win8");
        platforms.put("windows nt 6.2", "Win8");
        platforms.put("windows nt 6.1", "Win7");
        platforms.put("windows nt 6.0", "Win Longhorn");
        platforms.put("windows nt 5.2", "Win2003");
        platforms.put("windows nt 5.0", "Win2000");
        platforms.put("windows nt 5.1", "WinXP");
        platforms.put("windows nt 4.0", "Windows NT 4.0");
        platforms.put("winnt4.0", "Windows NT 4.0");
        platforms.put("winnt 4.0", "Windows NT");
        platforms.put("winnt", "Windows NT");
        platforms.put("windows 98", "Win98");
        platforms.put("win98", "Win98");
        platforms.put("windows 95", "Win95");
        platforms.put("win95", "Win95");
        platforms.put("windows", "Unknown Windows OS");
        platforms.put("os x", "MacOS X");
        platforms.put("ppc mac", "Power PC Mac");
        platforms.put("freebsd", "FreeBSD");
        platforms.put("ppc", "Macintosh");
        platforms.put("linux", "Linux");
        platforms.put("debian", "Debian");
        platforms.put("sunos", "Sun Solaris");
        platforms.put("beos", "BeOS");
        platforms.put("apachebench", "ApacheBench");
        platforms.put("aix", "AIX");
        platforms.put("irix", "Irix");
        platforms.put("osf", "DEC OSF");
        platforms.put("hp-ux", "HP-UX");
        platforms.put("netbsd", "NetBSD");
        platforms.put("bsdi", "BSDi");
        platforms.put("openbsd", "OpenBSD");
        platforms.put("gnu", "GNU/Linux");
        platforms.put("unix", "Unknown Unix OS");
        return platforms;
    }

    private static HashMap<String, String> initBrowsers() {
        HashMap<String, String> browsers = new HashMap<>();
        browsers.put("Flock", "Flock");
        browsers.put("Chrome", "Chrome");
        browsers.put("Opera", "Opera");
        browsers.put("MSIE", "IE");
        browsers.put("Internet Explorer", "IE");
        browsers.put("Shiira", "Shiira");
        browsers.put("Firefox", "Firefox");
        browsers.put("Chimera", "Chimera");
        browsers.put("Phoenix", "Phoenix");
        browsers.put("Firebird", "Firebird");
        browsers.put("Camino", "Camino");
        browsers.put("Netscape", "Netscape");
        browsers.put("OmniWeb", "OmniWeb");
        browsers.put("Safari", "Safari");
        browsers.put("Mozilla", "Mozilla");
        browsers.put("Konqueror", "Konqueror");
        browsers.put("icab", "iCab");
        browsers.put("Lynx", "Lynx");
        browsers.put("Links", "Links");
        browsers.put("hotjava", "HotJava");
        browsers.put("amaya", "Amaya");
        browsers.put("IBrowse", "IBrowse");
        return browsers;
    }

    private AdRefererLogFormatInterceptor() {
    }

    @Override
    public void initialize() {
        // NO-OP...
    }

    @Override
    public void close() {
        // NO-OP...
    }

    @Override
    public Event intercept(Event event) {
        String body = new String(event.getBody(), Charsets.UTF_8);
        String[] fields = body.split(",", 8);
        StringBuilder sb = new StringBuilder();
        sb.append(fields[0]);
        sb.append('\t');
        sb.append(fields[1]);
        sb.append('\t');
        sb.append(fields[2]);
        sb.append('\t');
        sb.append(fields[3]);
        sb.append('\t');
        sb.append(fields[4]);
        sb.append('\t');
        sb.append(fields[5]);
        sb.append('\t');
        sb.append(fields[6]);
        sb.append('\t');

        Matcher submatcher = pattern.matcher(fields[7].trim());
        String url = "";
        String os = "others";
        String br = "others";
        String ver = "";
        if (submatcher.matches()) {
            url = submatcher.group(1);
            String agent = submatcher.group(2);
            //匹配操作系統
            Set<String> platformKeys = platform.keySet();
            for (String platformKey : platformKeys) {
                Pattern pattern = Pattern.compile( Pattern.quote(platformKey) , Pattern.CASE_INSENSITIVE);
                Matcher matcher = pattern.matcher(agent);
                if (matcher.find()) {
                    os = platform.get(platformKey);
                    break;
                }
            }
            //匹配瀏覽器 和版本
            Set<String> browserKeys = browser.keySet();
            for (String browserKey : browserKeys) {
                Pattern pattern = Pattern.compile( Pattern.quote(browserKey) + ".*?([0-9\\.]+)", Pattern.CASE_INSENSITIVE);
                Matcher matcher = pattern.matcher(agent);
                if (matcher.find()) {
                    ver = matcher.group(1);
                    br = browser.get(browserKey);
                    break;
                }
            }
        }
        sb.append(url);
        sb.append('\t');
        sb.append(os);
        sb.append('\t');
        sb.append(br);
        sb.append('\t');
        sb.append(ver);
        //修改event body
        event.setBody(sb.toString().getBytes());
        return event;
    }

    @Override
    public List<Event> intercept(List<Event> events) {
        List<Event> intercepted = Lists.newArrayListWithCapacity(events.size());
        for (Event event : events) {
            Event interceptedEvent = intercept(event);
            if (interceptedEvent != null) {
                intercepted.add(interceptedEvent);
            }
        }
        return intercepted;
    }
    
    public static class Builder implements Interceptor.Builder {
        //使用Builder初始化Interceptor
        @Override
        public Interceptor build() {
            return new AdRefererLogFormatInterceptor();
        }

        @Override
        public void configure(Context context) {
            
        }
    }
}

部署:

1、將程序打包成AdRerfererLogInterceptor.jar

2、將jar包上傳到FLUME_HOME的lib目錄下(flume1.5采用bin安裝)

3、在配置文件中使用Interceptor

hdp2.sources.s1.interceptors = i1
hdp2.sources.s1.interceptors.i1.type = com.wy.flume.interceptor.AdRefererLogFormatInterceptor$Builder

優勢:

在數據傳輸的同時進行數據的處理,節省步驟,而且有flume幫組管理文件進度,程序中斷時不用手動做恢復(file channel)

總結:

在Interceptor中可以對event的header 和 body 進行處理,進而達到定制化的目的。

上述就是小編為大家分享的flume中如何自定義Interceptor了,如果剛好有類似的疑惑,不妨參照上述分析進行理解。如果想知道更多相關知識,歡迎關注億速云行業資訊頻道。

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

东安县| 泸定县| 旬阳县| 綦江县| 湘西| 邢台县| 洛南县| 万宁市| 新民市| 汉中市| 土默特右旗| 龙门县| 伽师县| 辉南县| 亳州市| 晋宁县| 城市| 凤城市| 西林县| 游戏| 原平市| 富裕县| 黑山县| 大同市| 大城县| 手游| 东兰县| 河南省| 逊克县| 勃利县| 斗六市| 阳东县| 克东县| 清水县| 全椒县| 固阳县| 茂名市| 运城市| 安远县| 延川县| 科技|