您好,登錄后才能下訂單哦!
這篇文章給大家介紹kafka-Storm中如何將日志文件打印到local,內容非常詳細,感興趣的小伙伴們可以參考借鑒,希望對大家能有所幫助。
閱讀前提:
1 : 您可能需要對 logback 日志系統有所了解
2 :您可能需要對于 kafka 有初步的了解
3:請代碼查看之前,請您仔細參考系統的業務圖解
由于kafka本身自帶了和『Hadoop』的接口,如果需要將kafka中的文件直接遷移到HDFS,請參看本ID的另外一篇博文:
業務系統-kafka-Storm【日志本地化】 - 2 :直接通過kafka將日志傳遞到HDFS
1: 一個正式環境系統的系統設計圖解:
通過kafka集群,在2個相同的topic之下,通過kafka-storm, he kafka-hadoop,2 個Consumer,針對同樣的一份數據,我們分流了2個管道:
其一: 實時通道
其二:離線通道
在日志本地化的過程之中,前期,由于日志的清洗,過濾的工作是放在Storm集群之中,也就是說,留存到本地locla的日志。是我們在Storm集群之中進行了清洗的數據。
也就是:
如下圖所示:
在kafka之中,通常而言,有如下的 代碼 用來處理:
在這里我們針對了2種日志,有兩個Consumer用來處理
package com.mixbox.kafka.consumer; public class logSave { public static void main(String[] args) throws Exception { Consumer_Thread visitlog = new Consumer_Thread(KafkaProperties.visit); visitlog.start(); Consumer_Thread orderlog = new Consumer_Thread(KafkaProperties.order); orderlog.start(); } }
在這里,我們依據不同的原始字段,將不同的數據保存到不同的文件之中。
package com.mixbox.kafka.consumer; import java.io.UnsupportedEncodingException; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Properties; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import kafka.consumer.ConsumerConfig; import kafka.consumer.ConsumerIterator; import kafka.consumer.KafkaStream; import kafka.javaapi.consumer.ConsumerConnector; import kafka.message.MessageAndMetadata; /** * @author Yin Shuai */ public class Consumer_Thread extends Thread { // 在事實上我們會依據傳遞的topic名稱,來生成不桐的記錄機器 // private Logger _log_order = LoggerFactory.getLogger("order"); // private Logger _log_visit = LoggerFactory.getLogger("visit"); private Logger _log = null; private final ConsumerConnector _consumer; private final String _topic; public Consumer_Thread(String topic) { _consumer = kafka.consumer.Consumer .createJavaConsumerConnector(createConsumerConfig()); this._topic = topic; _log = LoggerFactory.getLogger(_topic); System.err.println("log的名稱" + _topic); } private static ConsumerConfig createConsumerConfig() { Properties props = new Properties(); props.put("zookeeper.connect", KafkaProperties.zkConnect); // 在這里我們的組ID為logSave props.put("group.id", KafkaProperties.logSave); props.put("zookeeper.session.timeout.ms", "100000"); props.put("zookeeper.sync.time.ms", "200"); props.put("auto.commit.interval.ms", "1000"); return new ConsumerConfig(props); } public void run() { Map<String, Integer> topicCountMap = new HashMap<String, Integer>(); topicCountMap.put(_topic, new Integer(1)); Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = _consumer .createMessageStreams(topicCountMap); for (KafkaStream<byte[], byte[]> kafkaStream : consumerMap.get(_topic)) { ConsumerIterator<byte[], byte[]> iterator = kafkaStream.iterator(); while (iterator.hasNext()) { MessageAndMetadata<byte[], byte[]> next = iterator.next(); try { // 在這里我們分拆了一個Consumer 來處理visit日志 logFile(next); System.out.println("message:" + new String(next.message(), "utf-8")); } catch (UnsupportedEncodingException e) { e.printStackTrace(); } } } } private void logFile(MessageAndMetadata<byte[], byte[]> next) throws UnsupportedEncodingException { _log.info(new String(next.message(), "utf-8")); } }
一個簡單的小tips:
logback.xml ,提醒您注意,這里的配置文件太過粗淺。如有需要,請自行填充。
<?xml version="1.0" encoding="UTF-8" ?> <configuration> <jmxConfigurator /> <!-- 控制臺輸出日志 --> <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender"> <!-- 過濾掉 TRACE 和 DEBUG 級別的日志 --> <!-- <filter class="ch.qos.logback.classic.filter.ThresholdFilter"> --> <!-- <level>INFO</level> --> <!-- </filter> --> <!-- 按天來回滾,如果需要按小時來回滾,則設置為{yyyy-MM-dd_HH} --> <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy"> <fileNamePattern>f:/opt/log/test.%d{yyyy-MM-dd}.log</fileNamePattern> <!-- 如果按天來回滾,則最大保存時間為1天,1天之前的都將被清理掉 --> </rollingPolicy> <!-- 日志輸出格式 --> <layout class="ch.qos.logback.classic.PatternLayout"> <pattern> %d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{36}-%msg%n</pattern> </layout> </appender> <!-- 記錄到日志 文件的滾動日志 --> <appender name="ERROR" class="ch.qos.logback.core.rolling.RollingFileAppender"> <file> e:/logs/error/error.log </file> <filter class="ch.qos.logback.classic.filter.LevelFilter"> <level> ERROR </level> <onMatch>ACCEPT</onMatch> <onMismatch>DENY</onMismatch> </filter> <!-- 定義每天生成一個日志文件 --> <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy"> <fileNamePattern>e:/logs/yuanshi-%d{yyyy-MM-dd}.log</fileNamePattern> <MaxHistory>10</MaxHistory> </rollingPolicy> <!-- 日志樣式 --> <layout class="ch.qos.logback.classic.PatternLayout"> <pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{36}-%msg%n</pattern> </layout> </appender> <!-- 記錄到日志 文件的滾動日志 --> <appender name="FILE" class="ch.qos.logback.core.rolling.RollingFileAppender"> <file>E:\logs\file\file.log</file> <filter class="ch.qos.logback.classic.filter.LevelFilter"> <level>INFO</level> <onMatch>ACCEPT</onMatch> <onMismatch>DENY</onMismatch> </filter> <!-- 定義每天生成一個日志文件 --> <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy"> <fileNamePattern>e:/logs/venality-%d{yyyy-MM-dd}.log </fileNamePattern> <MaxHistory>10</MaxHistory> </rollingPolicy> <!-- 日志樣式 --> <layout class="ch.qos.logback.classic.PatternLayout"> <pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{36}-%msg%n</pattern> </layout> </appender> <appender name="visit" class="ch.qos.logback.core.rolling.RollingFileAppender"> <File> E:\logs\visitlog\visit.log </File> <encoder> <pattern>%msg%n</pattern> </encoder> <filter class="ch.qos.logback.classic.filter.ThresholdFilter"> <level>INFO</level> </filter> <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy"> <fileNamePattern>E:\logs\visit.log.%d{yyyy-MM-dd} </fileNamePattern> </rollingPolicy> </appender> <logger name="visit" additivity="false" level="INFO"> <appender-ref ref="visit" /> </logger> <appender name="order" class="ch.qos.logback.core.rolling.RollingFileAppender"> <File> E:\logs\orderlog\order.log </File> <encoder> <pattern>%msg%n </pattern> </encoder> <filter class="ch.qos.logback.classic.filter.ThresholdFilter"> <level>INFO</level> </filter> <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy"> <fileNamePattern>E:\logs\order.log.%d{yyyy-MM-dd} </fileNamePattern> </rollingPolicy> </appender> <logger name="order" additivity="false" level="INFO"> <appender-ref ref="order" /> </logger> <root level="DEBUG"> <appender-ref ref="FILE" /> </root> </configuration>
關于kafka-Storm中如何將日志文件打印到local就分享到這里了,希望以上內容可以對大家有一定的幫助,可以學到更多知識。如果覺得文章不錯,可以把它分享出去讓更多的人看到。
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。