您好,登錄后才能下訂單哦!
在Unix中流是一個標準的概念,有標準的輸入、輸出和標準的錯誤
例如:
打印出所有的js文件交給grep 來過濾出包含http文件的內容,稱之為Unix的管道
cat *.js | grep http
從上節得知Buffer是保存字節的數據,而流是用來暫存和移動數據的,它倆通常是結合起來來使用,我們來拷貝文件,像讀取logo,是全部的讀取入到內存中,然后再寫入到文件中,對于體積比較大的的文件就不夠用了假設我們的服務器需要不斷的去讀取文件,然后返回給客戶端,同時又有好多人都在請求這個文件,這樣每個請求都去讀入一次內存,然后內存很快就爆掉了,最好的方式是邊讀邊寫
這就需要借助流來完成,那NodeJs中哪些模塊涉及到了流;比如http、文件系統、壓縮模塊、tcp socket并且流是以buffer的形式存在,這樣更高效。
改造logo圖片讀取操作
var fs = require('fs') var source = fs.readFileSync('logo.png') fs.writeFileSync('stream_copy_logo.png',source)
但是這樣的操作會不會太簡單了,而沒有辦法精細的控制數據在流里面的傳輸,
以上這些都不用擔心,Stream是基于事件機制進行工作的,
流在各個方面的變化都可以被我們監聽到
var fs = require('fs') //聲明一個可讀流 var readStream = fs.createReadStream('logo_stream.js') //Stream在傳輸的時候會觸發data事件 readStream .on('data',function(chunk){ console.log('data emits') console.log(Buffer.isBuffer(chunk)) console.log(chunk.toString('utf8')) }) //還有readable事件,可讀的 .on('readable',function(){ console.log('data readable') }) .on('end',function(){ console.log('data ends') }) .on('close',function(){ console.log('data close') }) .on('error',function(e){ console.log('data read error:'+e) })
運行結果如下:
借助于Stream的事件機制,我們就能實現更多個性化的定制,
從而對流里面的流程進行更精細化的控制,改造上述代碼:
var fs = require('fs') //聲明一個可讀流 var readStream = fs.createReadStream('logo_stream.js') var n = 0 //Stream在傳輸的時候會觸發data事件 readStream .on('data',function(chunk){ n++ console.log('data emits') console.log(Buffer.isBuffer(chunk)) //console.log(chunk.toString('utf8')) //流暫停 readStream.pause() //設置定時器,模擬異步處理 console.log('data pause') setTimeout(function(){ console.log('data pause end') //再重新啟動 readStream.resume() },3000) }) //還有readable事件,可讀的 .on('readable',function(){ console.log('data readable') }) .on('end',function(){ console.log(n) console.log('data ends') }) .on('close',function(){ console.log('data close') }) .on('error',function(e){ console.log('data read error:'+e) })
運行效果如下:
換一個大一點的文件,3M左右的;打印結果如下:
大概每次是64kb
用事件的方式來重構復制圖片的操作
var fs = require('fs') //放入一個大文件 var readStream = fs.createReadStream('1.pdf') var writeStream = fs.createWriteStream('1_stream.pdf'); //必然觸發一個事件 readStream.on('data',function(chunk){ //寫入目標 if(writeStream.write(chunk)=== false){ //判斷是否已經寫入到目標,來解決爆倉 console.log('still cached') readStream.pause() } }) readStream.on('end',function(){ writeStream.end() }) //耗盡方法 writeStream.on('drain',function(){ console.log('data drains') readStream.resume() }) /* 這是個標準的文件的拷貝操作,但是會有問題; 如果讀的快,寫的慢;因為讀寫的速度并不是恒定的,這個時候數據流內部的 緩存可能會被爆倉,那應該怎么辦 */
運行結果如下:
邊讀邊寫效果.
Stream的種類
Readable:可讀流,用來提供數據;外部來源的數據會被存儲到buffer里緩存起來,兩種模式:流動模式,暫停模式
Writable:可寫流,消費數據;
Duplex:雙通流,可讀可寫
Transform:轉換流,雙通
各自事件,屬性都大同小異.場景如下:請求一張圖片的數據,在瀏覽器中顯示出來
var http = require('http') var fs = require('fs') http .createServer(function(req,res){ /*fs.readFile('logo.png',function(err,data){ if(err){ res.end('file not exist') }else{ res.wirteHeader(200,{'Context-Type':'text/html'}) res.end(data) } })*/ //利用pipe就能夠更簡約的實現這套邏輯 fs.createReadStream('logo.png').pipe(res) }) .listen(8090)
運行效果如下:
不止是本地圖片的讀取,也可以是網絡環境下的
使用NodeJs中的request模塊
//使用之前先安裝,npm install request var request = require('request') request('url').pipe(res) //這樣就可以實現邊下載邊顯示
運行結果同上。
在這里pipe方法會自動幫我們監聽data和end事件,還可以自動控制后端壓力,通過對內存空間的調度就能自動控制流量、避免掉目標被快速讀取,只有末端真正需要數據的時候,數據才會從源頭被取出來然后順著管道一路走下去
再次重構讀取pdf文件
//只需要2行代碼 var fs = require('fs') fs.createReadStream('1.pdf').pipe(fs.createWriteStream('1_pipe.pdf'))
pipe做通道連接時的例子:
var Readable = require('stream').Readable var Writable = require('stream').Writable //拿到兩個實例 var readStream = new Readable() var writStream = new Writable() //push一些數據 readStream.push('I ') readStream.push('Love ') readStream.push('NodeJs ') //讀取完畢 readStream.push(null) //重寫方法 writStream._write = function(chunk,encode,cb){ console.log(chunk.toString()) cb() } //最后,使用 pipe連接起來 readStream.pipe(writStream)
運行結果如下:
來實現一個定制的可讀流,可寫流、轉換流
var stream = require('stream') var util = require('util') //定制的可寫流 function ReadStream(){ //首先改變它的上下文,讓它可以調用Stream里面可讀類的方法 stream.Readable.call(this) } //來讓我們聲明的可讀流繼承流里面可讀的原型 util.inherits(ReadStream,stream.Readable) //然后就可以為可讀流添加原型鏈上的read方法 ReadStream.prototype._read = function(){ //只干一件事,push數據 this.push('I ') this.push('Love ') this.push('NodeJs ') this.push(null) } //聲明可寫流 function WriteStream(){ stream.Writable.call(this) //聲明cache this._cached = new Buffer('') } util.inherits(WriteStream,stream.Writable) WriteStream.prototype._write = function(chunk,encode,cb){ console.log(chunk.toString()) cb() } //聲明轉換流 function TransformStream(){ stream.Transform.call(this) } util.inherits(TransformStream,stream.Transform) TransformStream.prototype._transform = function(chunk,encode,cb){ this.push(chunk) cb() } //flush TransformStream.prototype._flush = function(cb){ this.push('Oh Year!') cb() } //生成實例 var rs = new ReadStream() var ws = new WriteStream() var ts = new TransformStream() //讀到的數據pipe給轉換流 rs.pipe(ts).pipe(ws)
運行結果如下:
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。