您好,登錄后才能下訂單哦!
這篇文章將為大家詳細講解有關如何實現node可讀流之流動模式,小編覺得挺實用的,因此分享給大家做個參考,希望大家閱讀完這篇文章后可以有所收獲。
node的可讀流基于事件
可讀流之流動模式,這種流動模式會有一個"開關",每次當"開關"開啟的時候,流動模式起作用,如果將這個"開關"設置成暫停的話,那么,這個可讀流將不會去讀取文件,直到將這個"開關"重新置為流動。
讀取文件流程
讀取文件內容的流程,主要為:
打開文件,打開文件成功,將觸發open事件,如果打開失敗,觸發error事件和close事件,將文件關閉。
開始讀取文件中的內容,監聽data事件,數據處于流動狀態,可通過修改開關的狀態來暫停讀取。
每次讀取到的內容放入緩存中,并通過data事件將數據發布出去。
當文件中的內容讀取完畢之后,將文件關閉。
這一系列動作都是基于事件來進行操作的,而node中的事件我們都知道是一種發布訂閱模式來實現的。
下面我們來看一看,node是如何使用可讀流來讀取文件中的內容?
node 可讀流參數
首先我們通過fs模塊來創建一個可讀流,可讀流接受兩個參數:
第一個參數是要讀取的文件地址,在這里指明你要讀取哪個文件。
第二個參數是可選項,這個參數是一個對象,用來指定可讀流的一些具體的參數。
如下幾個參數我們來一一說明:
highWaterMark:設置高水位線,這個參數主要用于在讀取文件時,可讀流會將文件中的內容讀取到緩存當中,而這里我們需要創建一個buffer來緩存這些數據,所以這個參數是用來設置buffer的大小,如果不對這個參數進行設置的話,可讀流默認的配置64k。
flags:這個參數主要用于設置文件的執行模式,比如說我們具體的操作適用于讀取文件還是寫入文件等這些操作。如果是寫入文件的話那我們,使用的是w。如果是讀取文件的話那這個操作符就應該是r。
下面這張表格就說明了不同的符號代表不同含義:
符號 | 含義 |
---|---|
r | 讀文件,文件不存在報錯 |
r+ | 讀取并寫入,文件不存在報錯 |
rs | 同步讀取文件并忽略緩存 |
w | 寫入文件,不存在則創建,存在則清空 |
wx | 排它寫入文件 |
w+ | 讀取并寫入文件,不存在則創建,存在則清空 |
wx+ | 和w+類似,排他方式打開 |
a | 追加寫入 |
ax | 與a類似,排他方式寫入 |
a+ | 讀取并追加寫入,不存在則創建 |
ax+ | 作用與a+類似,但是以排他方式打開文件 |
autoClose:這個參數主要用于,對文件的關閉的一些控制。如果文件再打開的過程或者其他操作的過程中出現了錯誤的情況下,需要將文件進行關閉。那這個參數是設置文件是否自動關閉的功能。
encoding:node中用buffer來讀取文件操作的東西二進制數據。這些數據展現出來的話我們是一堆亂碼,所以需要,要我們對這個數據指定一個具體的編碼格式。然后將會對這些數據進行編碼轉化,這樣轉化出來的數據就是我們能看懂的數據。
starts:這個參數主要用于指定從什么位置開始讀取文件中的內容,默認的話是從零開始。
ends:這個參數主要用于指定定具體要讀取文件多長的數據,這里需要說明一下,這個參數是包括本身的位置,也就是所謂的包前和包后。
下面我們來看看可讀流具體例子:
let fs = require("fs"); let rs = fs.createReadStream("./a.js", { highWaterMark: 3, encoding: "utf8", autoClose: true, start: 0, end: 9 }); rs.on("open", () => {console.log("open");}); rs.on("close", () => {console.log("close");}); rs.on("data", data => { console.log(data); rs.pause();//暫停讀取 此時流動模式為暫停模式 }); setInterval(() => { rs.resume();//重新設置為流動模式,開始讀取數據 }, 1000); rs.on("end", () => { console.log("end"); }); rs.on("error", err => { console.log(err); });
手寫可讀流第一步
上面我們說過,node可讀流是基于node的核心模塊事件來完成的,所以在實現我們自己的可讀流時需要繼承events模塊,代碼如下:
let fs = require('fs'); let EventEmitter = require('events'); class ReadStream extends EventEmitter { }
繼承了EventEmitter類,我們就可以使用EventEmitter類中的各個方法,并且同樣是采用發布訂閱的模式了處理事件。
第二步:處理可讀流配置的參數
上面我們提到,node中創建可讀流時可以對這個流配置具體的參數,比如
let rs = fs.createReadStream("./a.js", { highWaterMark: 3, encoding: "utf8", autoClose: true, start: 0, end: 9 });
那么對于這些參數,我們自己實現的可讀流類也需要對這些參數進行處理,那么這些參數該如何進行處理呢?
constructor(path, options = {}) { super(); this.path = path; //指定要讀取的文件地址 this.highWaterMark = options.highWaterMark || 64 * 1024; this.autoClose = options.autoClose || true; //是否自動關閉文件 this.start = options.start || 0; //從文件哪個位置開始讀取 this.pos = this.start; // pos會隨著讀取的位置改變 this.end = options.end || null; // null表示沒傳遞 this.encoding = options.encoding || null;// buffer編碼 this.flags = options.flags || 'r'; this.flowing = null; // 模式開關 this.buffer = Buffer.alloc(this.highWaterMark);// 根據設置創建一個buffer存儲讀出來的數 this.open(); }
通常配置的原則是以用戶配置的參數為準,如果用戶沒有對這個參數進行設置的話,就采用默認的配置。
實現可讀流第三步:打開文件
這里原理是使用node模塊fs中的open方法。首先我們來回顧下fs.open()方法的使用。
fs.open(filename,flags,[mode],callback); //實例 fs.open('./1,txt','r',function(err,fd){});
這里需要說明下,回調函數callback中有2個參數:
第一個是error,node中異步回調都會返回的一個參數,用來說明具體的錯誤信息
第二個參數是fd,是文件描述符,用來標識文件,等價于open函數的第一個參數
好了,現在我們來看看我們自己的可讀流的open方法該如何實現吧:
open() { fs.open(this.path, this.flags, (err, fd) => { //fd標識的就是當前this.path這個文件,從3開始(number類型) if (err) { if (this.autoClose) { // 如果需要自動關閉則去關閉文件 this.destroy(); // 銷毀(關閉文件,觸發關閉事件) } this.emit('error', err); // 如果有錯誤觸發error事件 return; } this.fd = fd; // 保存文件描述符 this.emit('open', this.fd); // 觸發文件的打開的方法 }); }
從代碼上我們可以看出:
fs.open函數是異步函數,也就是說callback是異步執行的,在成功打開文件的情況下,fd這個屬性也是異步獲取到的,這點需要注意。
另外重要的一點是,如果在打開文件發生錯誤時,則表明打開文件失敗,那么此時就需要將文件關閉。
實現可讀流第四步:讀取文件內容
上面我們詳細說過,可讀流自身定義了一個"開關",當我們要讀取文件中的內容的時候,我們需要將這個"開關"打開,那么node可讀流本身是如何來打開這個"開關"的呢?
監聽data事件
node可讀流通過監聽data事件來實現這個"開關"的開啟:
rs.on("data", data => { console.log(data); });
當用戶監聽data事件的時候,"開關"開啟,不停的從文件中讀取內容。那么node是怎么監聽data事件的呢?
答案就是 事件模塊的newListener
這是因為node可讀流是基于事件的,而事件中,服務器就可以通過newListener事件監聽到從用戶這邊過來的所有事件,每個事件都有對應的類型,當用戶監聽的是data事件的時候,我們就可以獲取到,然后就可以去讀取文件中的內容了,那我們自己的可讀流該如何實現呢?
// 監聽newListener事件,看是否監聽了data事件,如果監聽了data事件的話,就開始啟動流動模式,讀取文件中的內容 this.on("newListener", type => { if (type === "data") { // 開啟流動模式,開始讀取文件中的內容 this.flowing = true; this.read(); } });
好了,知道了這個"開關"是如何打開的,那么這個時候就到了真正讀取文件中內容的關鍵時候了,先上代碼先:
read() { // 第一次讀取文件的話,有可能文件是還沒有打開的,此時this.fd可能還沒有值 if (typeof this.fd !== "number") { // 如果此時文件還是沒有打開的話,就觸發一次open事件,這樣文件就真的打開了,然后再讀取 return this.once("open", () => this.read()); } // 具體每次讀取多少個字符,需要進行計算,因為最后一次讀取倒的可能比highWaterMark小 let howMuchRead = this.end ? Math.min(this.end - this.pos + 1, this.highWaterMark) : this.highWaterMark; fs.read(this.fd, this.buffer, 0, howMuchRead, this.pos, (err, byteRead) => { // this.pos 是每次讀取文件讀取的位置,是一個偏移量,每次讀取會發生變化 this.pos += byteRead; // 將讀取到的內容轉換成字符串串,然后通過data事件,將內容發布出去 let srr = this.encoding ? this.buffer.slice(0, byteRead).toString(this.encoding) : this.buffer.slice(0, byteRead); // 將內容通過data事件發布出去 this.emit("data", srr); // 當讀取到到內容長度和設置的highWaterMark一致的話,并且還是流動模式的話,就繼續讀取 if ((byteRead === this.highWaterMark) && this.flowing) { return this.read(); } // 沒有更多的內容了,此時表示文件中的內容已經讀取完畢 if (byteRead < this.highWaterMark) { // 讀取完成,發布end方法,并關閉文件 this.emit("end"); this.destory(); } }); }
這里我們特別要注意的是:
文件是否已經打開,是否獲取到fd,如果沒有打開的話,則再次觸發open方法
分批次讀取文件內容,每次讀取的內容是變化的,所以位置和偏移量是要動態計算的
控制讀取停止的條件。
實現可讀流第五步:關閉文件
好了,到現在,基礎的讀取工作已經完成,那么就需要將文件關閉了,上面的open和read方法里面都調用了一個方法:destory,沒錯,這個就是關閉文件的方法,好了,那么我們來看看這個方法該如何實現吧
destory() { if (typeof this.fd !== "number") { // 發布close事件 return this.emit("close"); } // 將文件關閉,發布close事件 fs.close(this.fd, () => { this.emit("close"); }); }
當然這塊的原理就是調用fs模塊的close方法啦。
實現可讀流第六步:暫停和恢復
既然都說了,node可讀流有一個神奇的"開關",就像大壩的閥門一樣,可以控制水的流動,同樣也可以控制水的暫停啦。當然在node可讀流中的暫停是停止對文件的讀取,恢復就是將開關打開,繼續讀取文件內容,那么這兩個分別對應的方法就是pause()和resume()方法。
那么我們自己的可讀流類里面該如何實現這兩個方法的功能呢?非常簡單:
我們在定義類的私有屬性的時候,定義了這樣一個屬性flowing,當它的值為true時表示開關打開,反之關閉。
pause() { this.flowing = false;// 將流動模式設置成暫停模式,不會讀取文件 } resume() { this.flowing = true;//將模式設置成流動模式,可以讀取文件 this.read();// 重新開始讀取文件 }
關于“如何實現node可讀流之流動模式”這篇文章就分享到這里了,希望以上內容可以對大家有一定的幫助,使各位可以學到更多知識,如果覺得文章不錯,請把它分享出去讓更多的人看到。
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。