您好,登錄后才能下訂單哦!
TaildirSource類圖如下(列出主要類)
TailDirSource類
TailDirSource繼承了AbstractSource類,而AbstractSource類中channelProcessor屬性負責將Source中的Event提交給Channel組件
TailDirSource類通過配置參數匹配日志文件,獲取日志文件更新內容并且將已經讀取的偏移量記錄到特定的文件當中(position file)
configure()方法:
1.判斷從配置文件加載的配置是否合法,其中包括了對filegroups,以及以filegroups為單位的文件路徑是否存在等條件。
2.對batchSize,skipToEnd,writePosInterval,idleTimeout等變量進行初始化工作
batchSize定義了往Channel中發送Event的批量處理大小
skipToEnd定義了每次程序啟動,對文件進行讀取的時候,是否從文件尾部開始讀取數據,或者從文件最開始讀取。
writePosInterval,TaildirSource讀取每個監控文件都在位置文件中記錄監控文件的已經讀取的偏移量,writePosInterval則是定義了更新位置文件的間隔。
idleTimeout日志文件在idleTimeout間隔時間,沒有被修改,文件將被關閉
start()方法:
通過configure()初始化后的變量創建了ReliableTaildirEventReader對象,同時創建兩個線程池idleFileChecker和positionWriter,分別用于監控日志文件和記錄日志文件讀取的偏移量。
idleFileChecker實現一個Runnable接口,遍歷reader所有監控的文件,檢查文件最后修改時間+idleTimeout是否小于當前時間,說明日志文件在idleTimeout時間內沒有被修改,該文件將被關閉。
private class idleFileCheckerRunnable implements Runnable {
@Override
public void run() {
try {
long now = System.currentTimeMillis();
for (TailFile tf : reader.getTailFiles().values()) {
if (tf.getLastUpdated() + idleTimeout < now && tf.getRaf() != null) {
idleInodes.add(tf.getInode());
}
}
} catch (Throwable t) {
logger.error("Uncaught exception in IdleFileChecker thread", t);
}
}
}
positionWriter主要作用是記錄日志文件讀取的偏移量,以json格式("inode", inode, "pos", tf.getPos(), "file", tf.getPath()),其中inode是linux系統中特有屬性,在適應其他系統(Windows等)日志采集時ReliableTaildirEventReader.getInode()方法需要修改(注意:在利用Linux系統上inode實現上,文件是通過inode記錄日志讀取偏移量。所以即使文件名改變了,也不影響日志讀取,在我實現Window版本上,只采用了文件名對應日志讀取偏移量,文件名改變影響日志讀取
)。pos則是記錄的日志讀取的偏移量,file記錄了日志文件的路徑
process()方法:
process方法記錄了TailDirSource類中主要的邏輯,獲取每個監控的日志文件,調用tailFileProcess獲取每個日志文件的更新數據,并將每條記錄轉換為Event(具體細節要看ReliableTaildirEventReader的readEvents方法)
public Status process() {
Status status = Status.READY;
try {
existingInodes.clear();
existingInodes.addAll(reader.updateTailFiles());
for (long inode : existingInodes) {
TailFile tf = reader.getTailFiles().get(inode);
if (tf.needTail()) {
tailFileProcess(tf, true);
}
}
closeTailFiles();
try {
TimeUnit.MILLISECONDS.sleep(retryInterval);
} catch (InterruptedException e) {
logger.info("Interrupted while sleeping");
}
} catch (Throwable t) {
logger.error("Unable to tail files", t);
status = Status.BACKOFF;
}
return status;
}
ReliableTaildirEventReader類
構造ReliableTaildirEventReader對象的時候,首先會判斷各種必須參數是否合法等,然后加載position file獲取每個文件上次記錄的日志文件讀取的偏移量
loadPositionFile(String filePath) 不粘貼方法的具體代碼,主要就是獲取每個監控日志文件的讀取偏移量
readEvents()的各個不同參數方法中,下面這個是最主要的,該方法獲取當前日志文件的偏移量,調用TailFile.readEvents(numEvents, backoffWithoutNL, addByteOffset)方法將日志文件每行轉換為Flume的消息對象Event,并循環將每個event添加header信息。
public List<Event> readEvents(int numEvents, boolean backoffWithoutNL)
throws IOException {
if (!committed) {
if (currentFile == null) {
throw new IllegalStateException("current file does not exist. " + currentFile.getPath());
}
logger.info("Last read was never committed - resetting position");
long lastPos = currentFile.getPos();
currentFile.updateFilePos(lastPos);
}
List<Event> events = currentFile.readEvents(numEvents, backoffWithoutNL, addByteOffset);
if (events.isEmpty()) {
return events;
}
Map<String, String> headers = currentFile.getHeaders();
if (annotateFileName || (headers != null && !headers.isEmpty())) {
for (Event event : events) {
if (headers != null && !headers.isEmpty()) {
event.getHeaders().putAll(headers);
}
if (annotateFileName) {
event.getHeaders().put(fileNameHeader, currentFile.getPath());
}
}
}
committed = false;
return events;
}
openFile(File file, Map<String, String> headers, long inode, long pos) 方法根據日志文件對象,headers,inode和偏移量pos創建一個TailFile對象
TailFile類
TaildirSource通過TailFile類操作處理每個日志文件,包含了RandomAccessFile類,以及記錄日志文件偏移量pos,最新更新時間lastUpdated等屬性
RandomAccessFile完美的符合TaildirSource的應用場景,RandomAccessFile支持使用seek()方法隨機訪問文件,配合position file中記錄的日志文件讀取偏移量,能夠輕松簡單的seek到文件偏移量,然后向后讀取日志內容,并重新將新的偏移量記錄到position file中。
readEvent(boolean backoffWithoutNL, boolean addByteOffset)方法:
下圖描述了該方法的調用層級,readEvent簡單的理解就是將每行日志轉為Event消息體,方法最終調用的是readFile()方法。
readLine()方法,有點難還在研究
public LineResult readLine() throws IOException {
LineResult lineResult = null;
while (true) {
if (bufferPos == NEED_READING) {
if (raf.getFilePointer() < raf.length()) {//當文件指針位置小于文件總長度的時候,就需要讀取指針位置到文件最后的數據
readFile();
} else {
if (oldBuffer.length > 0) {
lineResult = new LineResult(false, oldBuffer);
oldBuffer = new byte[0];
setLineReadPos(lineReadPos + lineResult.line.length);
}
break;
}
}
for (int i = bufferPos; i < buffer.length; i++) {
if (buffer[i] == BYTE_NL) {
int oldLen = oldBuffer.length;
// Don't copy last byte(NEW_LINE)
int lineLen = i - bufferPos;
// For windows, check for CR
if (i > 0 && buffer[i - 1] == BYTE_CR) {
lineLen -= 1;
} else if (oldBuffer.length > 0 && oldBuffer[oldBuffer.length - 1] == BYTE_CR) {
oldLen -= 1;
}
lineResult = new LineResult(true,
concatByteArrays(oldBuffer, 0, oldLen, buffer, bufferPos, lineLen));
setLineReadPos(lineReadPos + (oldBuffer.length + (i - bufferPos + 1)));
oldBuffer = new byte[0];
if (i + 1 < buffer.length) {
bufferPos = i + 1;
} else {
bufferPos = NEED_READING;
}
break;
}
}
if (lineResult != null) {
break;
}
// NEW_LINE not showed up at the end of the buffer
oldBuffer = concatByteArrays(oldBuffer, 0, oldBuffer.length,
buffer, bufferPos, buffer.length - bufferPos);
bufferPos = NEED_READING;
}
return lineResult;
}
readFile()按BUFFER_SIZE(默認8KB)作為緩沖讀取日志文件數據
private void readFile() throws IOException {
if ((raf.length() - raf.getFilePointer()) < BUFFER_SIZE) {
buffer = new byte[(int) (raf.length() - raf.getFilePointer())];
} else {
buffer = new byte[BUFFER_SIZE];
}
raf.read(buffer, 0, buffer.length);
bufferPos = 0;
}
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。