這篇文章主要為大家展示了“Node.js中Stream怎么用”,內(nèi)容簡而易懂,條理清晰,希望能夠幫助大家解決疑惑,下面讓小編帶領(lǐng)大家一起研究并學(xué)習(xí)一下“Node.js中Stream怎么用”這篇文章吧。
成都創(chuàng)新互聯(lián)于2013年成立,是專業(yè)互聯(lián)網(wǎng)技術(shù)服務(wù)公司,擁有項目網(wǎng)站制作、成都做網(wǎng)站網(wǎng)站策劃,項目實施與項目整合能力。我們以讓每一個夢想脫穎而出為使命,1280元明溪做網(wǎng)站,已為上家服務(wù),為明溪各地企業(yè)和個人服務(wù),聯(lián)系電話:18980820575一、什么是Stream(流)
流(stream)在 Node.js 中是處理流數(shù)據(jù)的抽象接口(abstract interface)。 stream 模塊提供了基礎(chǔ)的API。使用這些API可以很容易地來構(gòu)建實現(xiàn)流接口的對象。例如, HTTP 請求 和 process.stdout 就都是流的實例。
流可以是可讀的、可寫的,或是可讀寫的。注意,所有的流都是 EventEmitter 的實例。
二、流的類型
Node.js 中有四種基本的流類型:
Readable - 可讀的流 (例如 fs.createReadStream())。
Writable - 可寫的流 (例如 fs.createWriteStream())。
Duplex - 可讀寫的流(雙工流) (例如 net.Socket)。
Transform - 在讀寫過程中可以修改和變換數(shù)據(jù)的 Duplex 流 (例如 zlib.createDeflate())。
var Stream = require('stream') //stream 模塊引入方式 var Readable = Stream.Readable //可讀的流 var Writable = Stream.Writable //可寫的流 var Duplex = Stream.Duplex //可讀寫的流 var Transform = Stream.Transform //在讀寫過程中可以修改和變換數(shù)據(jù)的 Duplex 流
Node.js中關(guān)于流的操作被封裝到了Stream模塊中,這個模塊也被多個核心模塊所引用。例如在fs.createReadStream()和fs.createWriteStream()的源碼實現(xiàn)里,都調(diào)用了Stream模塊提供的抽象接口來實現(xiàn)對流數(shù)據(jù)的操作。
三、為什么使用Stream?
我們通過兩個例子,了解一下為什么要使用Stream。
Exp1:
下面是一個讀取文件內(nèi)容的例子:
const fs = require('fs') fs.readFile(file, function (err, content) { //讀出來的content是Buffer console.log(content) console.log(content.toString()) })
但如果文件內(nèi)容較大,譬如在500M時,執(zhí)行上述代碼的輸出為:
<Buffer 64 74 09 75 61 09 63 6f 75 6e 74 0a 0a 64 74 09 75 61 09 63 6f 75 6e 74 0a 32 30 31 35 31 32 30 38 09 4d 6f 7a 69 6c 6c 61 2f 35 2e 30 20 28 63 6f 6d ... > buffer.js:382 throw new Error('toString failed'); ^ Error: toString failed at Buffer.toString (buffer.js:382:11)
報錯的原因是content這個Buffer對象的長度過大,導(dǎo)致toString方法失敗。
可見,這種一次獲取全部內(nèi)容的做法,不適合操作大文件。
可以考慮使用流來讀取文件內(nèi)容。
var fs = require('fs') fs.createReadStream(bigFile).pipe(process.stdout)
fs.createReadStream創(chuàng)建一個可讀流,連接了源頭(上游,文件)和消耗方(下游,標準輸出)。
執(zhí)行上面代碼時,流會逐次調(diào)用fs.read(ReadStream這個類的源碼里有一個_read方法,這個_read方法在內(nèi)部調(diào)用了fs.read來實現(xiàn)對文件的讀?。?,將文件中的內(nèi)容分批取出傳給下游。
在文件看來,它的內(nèi)容被分塊地連續(xù)取走了。
在下游看來,它收到的是一個先后到達的數(shù)據(jù)序列。
如果不需要一次操作全部內(nèi)容,它可以處理完一個數(shù)據(jù)便丟掉。
在流看來,任一時刻它都只存儲了文件中的一部分數(shù)據(jù),只是內(nèi)容在變化而已。
這種情況就像是用水管去取池子中的水。
每當(dāng)用掉一點水,水管便會從池子中再取出一點。
無論水池有多大,都只存儲了與水管容積等量的水。
Exp2:
下面是一個在線看視頻的例子,假定我們通過HTTP請求返回視頻內(nèi)容給用戶
const http = require('http'); const fs = require('fs'); http.createServer((req, res) => { fs.readFile(videoPath, (err, data) => { res.end(data); }); }).listen(8080);
但這樣有兩個明顯的問題
視頻文件需要全部讀取完,才能返回給用戶,這樣等待時間會很長。
視頻文件一次全放入內(nèi)存中,內(nèi)存吃不消。
用流可以將視頻文件一點一點讀到內(nèi)存中,再一點一點返回給用戶,讀一部分,寫一部分。(利用了 HTTP 協(xié)議的 Transfer-Encoding: chunked 分段傳輸特性),用戶體驗得到優(yōu)化,同時對內(nèi)存的開銷明顯下降。
const http = require('http'); const fs = require('fs'); http.createServer((req, res) => { fs.createReadStream(videoPath).pipe(res); }).listen(8080);
通過上述兩個例子,我們知道,在大數(shù)據(jù)情況下必須使用流式處理。
四、可讀流(Readable Stream)
可讀流(Readable streams)是對提供數(shù)據(jù)的源頭(source)的抽象。
常見的可讀流:
HTTP responses, on the client
HTTP requests, on the server
fs read streams
TCP sockets //sockets是一個雙工流,即可讀可寫的流
process.stdin //標準輸入
所有的 Readable Stream 都實現(xiàn)了 stream.Readable 類定義的接口。
可讀流的兩種模式(flowing 和 paused)
在 flowing 模式下,可讀流自動從系統(tǒng)底層讀取數(shù)據(jù),并通過 EventEmitter 接口的事件盡快將數(shù)據(jù)提供給應(yīng)用(所有的流都是 EventEmitter 的實例)。
在 paused 模式下,必須顯式調(diào)用 stream.read()方法來從流中讀取數(shù)據(jù)片段。
創(chuàng)建流的Readable流,默認是非流動模式(paused模式),默認不會讀取數(shù)據(jù)。所有初始工作模式為paused的Readable流,可以通過下面三種途徑切換為flowing模式:
監(jiān)聽'data'事件
調(diào)用stream.resume()方法
調(diào)用stream.pipe()方法將數(shù)據(jù)發(fā)送到Writable
fs.createReadStream(path[, options])源碼實現(xiàn)
//文件名 ReadStream.js let fs = require('fs');//讀取文件 let EventEmitter = require('events'); class ReadStream extends EventEmitter {//流操作都是基于事件的 constructor(path, options = {}) { super(); //需要的參數(shù) this.path = path;//讀取文件的路徑 this.highWaterMark = options.highWaterMark || 64 * 1024;//緩沖區(qū)大小,默認64KB this.autoClose = options.autoClose || true;//是否需要自動關(guān)閉文件描述符,默認為true this.start = options.start || 0; //options 可以包括 start 和 end 值,使其可以從文件讀取一定范圍的字節(jié)而不是整個文件 this.pos = this.start; // 從文件的那個位置開始讀取內(nèi)容,pos會隨著讀取的位置而改變 this.end = options.end || null; // null表示沒傳遞 this.encoding = options.encoding || null; this.flags = options.flags || 'r';//以何種方式操作文件 // 參數(shù)的問題 this.flowing = null; // 默認為非流動模式 // 建一個buffer存放讀出來的數(shù)據(jù) this.buffer = Buffer.alloc(this.highWaterMark); this.open(); // {newListener:[fn]} // 次方法默認同步調(diào)用的 this.on('newListener', (type) => { // 等待著 它監(jiān)聽data事件 if (type === 'data') {//當(dāng)監(jiān)聽到data事件時,把流設(shè)置為流動模式 this.flowing = true; this.read();// 開始讀取 客戶已經(jīng)監(jiān)聽了data事件 } }) } pause(){//將流從flowing模式切換為paused模式 this.flowing = false; } resume(){//將流從paused模式切換為flowing模式 this.flowing =true; this.read();//將流從paused模式切換為flowing模式后,繼續(xù)讀取文件內(nèi)容 } read(){ // 默認第一次調(diào)用read方法時還沒有獲取fd,文件的打開是異步的,所以不能直接讀 if(typeof this.fd !== 'number'){ //如果fd不是number類型,證明文件還沒有打開,此時需要監(jiān)聽一次open事件,因為文件一打開,就會觸發(fā)open事件,這個在this.open()里寫了 return this.once('open',() => this.read()); // 等待著觸發(fā)open事件后fd肯定拿到了,拿到以后再去執(zhí)行read方法 } // 當(dāng)獲取到fd時 開始讀取文件了 // 第一次應(yīng)該讀2個 第二次應(yīng)該讀2個 // 第二次pos的值是4 end是4 // 讀取文件里一共4有個數(shù)為123 4,我們讀取里面的123 4 let howMuchToRead = this.end?Math.min(this.end-this.pos+1,this.highWaterMark): this.highWaterMark;//規(guī)定每次讀取多少個字節(jié) fs.read(this.fd, this.buffer, 0, howMuchToRead, this.pos, (error, byteRead) => { // byteRead為真實的讀到了幾個字節(jié)的內(nèi)容 // 讀取完畢 this.pos += byteRead; // 讀出來兩個,pos位置就往后移兩位 // this.buffer默認就是三個 let b = this.encoding ? this.buffer.slice(0, byteRead).toString(this.encoding) : this.buffer.slice(0, byteRead);//對讀出來的內(nèi)容進行編碼 this.emit('data', b);//觸發(fā)data事件,將讀到的內(nèi)容輸出給用戶 if ((byteRead === this.highWaterMark)&&this.flowing){ return this.read(); // 繼續(xù)讀 } // 這里就是沒有更多的邏輯了 if (byteRead < this.highWaterMark){ // 沒有更多了 this.emit('end'); // 讀取完畢 this.destroy(); // 銷毀即可 } }); } // 打開文件用的 destroy() { if (typeof this.fd != 'number') { return this.emit('close'); } //如果文件還沒打開,直接觸發(fā)close事件 fs.close(this.fd, () => { // 如果文件打開過了 那就關(guān)閉文件并且觸發(fā)close事件 this.emit('close'); }); } open() { fs.open(this.path, this.flags, (err, fd) => { //fd是文件描述符,它標識的就是當(dāng)前this.path這個文件,從3開始(number類型) if (err) { if (this.autoClose) { // 如果需要自動關(guān)閉我再去銷毀fd this.destroy(); // 銷毀(關(guān)閉文件,觸發(fā)關(guān)閉事件) } this.emit('error', err); // 如果有錯誤觸發(fā)error事件 return; } this.fd = fd; // 保存文件描述符 this.emit('open', this.fd); // 文件被打開了,觸發(fā)文件被打開的方法 }); } pipe(dest){//管道流的實現(xiàn) pipe()方法是ReadStream下的方法,它里面的參數(shù)是WritableStream this.on('data',(data)=>{ let flag = dest.write(data); if(!flag){//這個flag就是每次調(diào)用ws.write()后返回的讀狀態(tài)值 this.pause();// 已經(jīng)不能繼續(xù)寫了,等他寫完了再恢復(fù) } }); dest.on('drain',()=>{//當(dāng)讀取緩存區(qū)清空后 console.log('寫一下停一下') this.resume();//繼續(xù)往dest寫入數(shù)據(jù) }); } } module.exports = ReadStream;//導(dǎo)出可讀流
使用fs.createReadStream()
// 流:有序的有方向的,可以自己控制速率 // 讀:讀是將內(nèi)容讀取到內(nèi)存中 // 寫:寫是將內(nèi)存或者文件的內(nèi)容寫入到文件內(nèi) // 讀取的時候默認讀 默認一次讀取64k,encoding 讀取出來的內(nèi)容默認都是buffer //let fs = require('fs'); //let rs = fs.createReadStream({...});//原生實現(xiàn)可讀流 let ReadStream = require('./ReadStream'); let rs = new ReadStream('./2.txt', { highWaterMark: 3, // 字節(jié) flags:'r',//讀文件 autoClose:true, // 默認讀取完畢后自動關(guān)閉文件描述符 start:0, //end:3,// 流是閉合區(qū)間 包start也包end encoding:'utf8' }); // 默認創(chuàng)建一個流 是非流動模式(上述源碼中有寫的),默認不會讀取數(shù)據(jù) // 如果我們需要接收數(shù)據(jù),那我們要監(jiān)聽data事件,這樣數(shù)據(jù)會自動的流出來 rs.on('error',function (err) {// 通常,這會在底層系統(tǒng)內(nèi)部出錯從而不能產(chǎn)生數(shù)據(jù),或當(dāng)流的實現(xiàn)試圖傳遞錯誤數(shù)據(jù)時發(fā)生。 console.log(err) }); rs.on('open',function () {//文件被打開了,獲取到了fd。內(nèi)部會自動的觸發(fā)這個事件 rs.emit('data'); console.log('文件打開了'); }); rs.on('data',function (data) {//有數(shù)據(jù)流出來了 console.log(data); rs.pause(); // 暫停觸發(fā)on('data')事件,將流動模式又轉(zhuǎn)化成了非流動模式 }); setTimeout(()=>{rs.resume()},3000);//三秒鐘之后再將非流動模式轉(zhuǎn)化為流動模式 rs.on('end',function () {// 讀取完畢 console.log('讀取完畢了'); }); rs.on('close',function () {//close 事件將在流或其底層資源(比如一個文件)關(guān)閉后觸發(fā)。close 事件觸發(fā)后,該流將不會再觸發(fā)任何事件。 //console.log('關(guān)閉') });
四、可寫流(Writable Stream)
可寫流是對數(shù)據(jù)流向設(shè)備的抽象,用來消費上游流過來的數(shù)據(jù),通過可寫流程序可以把數(shù)據(jù)寫入設(shè)備,常見的是本地磁盤文件或者 TCP、HTTP 等網(wǎng)絡(luò)響應(yīng)。
常見的可寫流:
HTTP requests, on the client
HTTP responses, on the server
fs write streams
zlib streams
crypto streams
TCP sockets
child process stdin
process.stdout, process.stderr
所有 Writable 流都實現(xiàn)了 stream.Writable 類定義的接口。
可寫流的使用
調(diào)用可寫流實例的 write() 方法就可以把數(shù)據(jù)寫入可寫流
const fs = require('fs'); const rs = fs.createReadStream(sourcePath); const ws = fs.createWriteStream(destPath); rs.setEncoding('utf-8'); // 設(shè)置編碼格式 rs.on('data', chunk => { ws.write(chunk); // 寫入數(shù)據(jù) });
監(jiān)聽了可讀流的data事件就會使可讀流進入流動模式,我們在回調(diào)事件里調(diào)用了可寫流的 write() 方法,這樣數(shù)據(jù)就被寫入了可寫流抽象的設(shè)備destPath中。
write() 方法有三個參數(shù)
chunk {String| Buffer},表示要寫入的數(shù)據(jù)
encoding 當(dāng)寫入的數(shù)據(jù)是字符串的時候可以設(shè)置編碼
callback 數(shù)據(jù)被寫入之后的回調(diào)函數(shù)
drain事件
如果調(diào)用 stream.write(chunk)方法返回false,表示當(dāng)前緩存區(qū)已滿,流將在適當(dāng)?shù)臅r機(緩存區(qū)清空后)觸發(fā)drain事件。
const fs = require('fs'); const rs = fs.createReadStream(sourcePath); const ws = fs.createWriteStream(destPath); rs.setEncoding('utf-8'); // 設(shè)置編碼格式 rs.on('data', chunk => { let flag = ws.write(chunk); // 寫入數(shù)據(jù) if (!flag) { // 如果緩存區(qū)已滿暫停讀取 rs.pause(); } }); ws.on('drain', () => { rs.resume(); // 緩存區(qū)已清空 繼續(xù)讀取寫入 });
fs.createWriteStream(path[, options])源碼實現(xiàn)
// 文件 WriteStream.js let fs = require('fs'); let EventEmitter = require('events'); class WriteStream extends EventEmitter { constructor(path, options = {}) { super(); this.path = path; this.flags = options.flags || 'w'; this.encoding = options.encoding || 'utf8'; this.start = options.start || 0; this.pos = this.start; this.mode = options.mode || 0o666; this.autoClose = options.autoClose || true; this.highWaterMark = options.highWaterMark || 16 * 1024; this.open(); // fd 異步的 //觸發(fā)一個open事件,當(dāng)觸發(fā)open事件后fd肯定就存在了 // 寫文件的時候 需要的參數(shù)有哪些 // 第一次寫入是真的往文件里寫 this.writing = false; // 默認第一次就不是正在寫入 // 用簡單的數(shù)組來模擬一下緩存 this.cache = []; // 維護一個變量,表示緩存的長度 this.len = 0; // 是否觸發(fā)drain事件 this.needDrain = false; } clearBuffer() { let buffer = this.cache.shift(); if (buffer) { // 如果緩存里有 this._write(buffer.chunk, buffer.encoding, () => this.clearBuffer()); } else {// 如果緩存里沒有了 if (this.needDrain) { // 需要觸發(fā)drain事件 this.writing = false; // 告訴下次直接寫就可以了 不需要寫到內(nèi)存中了 this.needDrain = false; this.emit('drain'); } } } _write(chunk, encoding, clearBuffer) { // 因為write方法是同步調(diào)用的此時fd還沒有獲取到,所以等待獲取到再執(zhí)行write操作 if (typeof this.fd != 'number') { return this.once('open', () => this._write(chunk, encoding, clearBuffer)); } fs.write(this.fd, chunk, 0, chunk.length, this.pos, (err, byteWritten) => { this.pos += byteWritten; this.len -= byteWritten; // 每次寫入后就要在內(nèi)存中減少一下 clearBuffer(); // 第一次就寫完了 }) } write(chunk, encoding = this.encoding) { // 客戶調(diào)用的是write方法去寫入內(nèi)容 // 要判斷 chunk必須是buffer或者字符串 為了統(tǒng)一,如果傳遞的是字符串也要轉(zhuǎn)成buffer chunk = Buffer.isBuffer(chunk) ? chunk : Buffer.from(chunk, encoding); this.len += chunk.length; // 維護緩存的長度 3 let ret = this.len < this.highWaterMark; if (!ret) { this.needDrain = true; // 表示需要觸發(fā)drain事件 } if (this.writing) { // 表示正在寫入,應(yīng)該放到內(nèi)存中 this.cache.push({ chunk, encoding, }); } else { // 第一次 this.writing = true; this._write(chunk, encoding, () => this.clearBuffer()); // 專門實現(xiàn)寫的方法 } return ret; // 能不能繼續(xù)寫了,false表示下次的寫的時候就要占用更多內(nèi)存了 } destroy() { if (typeof this.fd != 'number') { this.emit('close'); } else { fs.close(this.fd, () => { this.emit('close'); }); } } open() { fs.open(this.path, this.flags, this.mode, (err, fd) => { if (err) { this.emit('error', err); if (this.autoClose) { this.destroy(); // 如果自動關(guān)閉就銷毀文件描述符 } return; } this.fd = fd; this.emit('open', this.fd); }); } } module.exports = WriteStream;
使用fs.createWriteStream()
// 可寫流有緩存區(qū)的概念 // 1.第一次寫入是真的向文件里寫,第二次在寫入的時候是放到了緩存區(qū)里 // 2.寫入時會返回一個boolean類型,返回為false時表示緩存區(qū)滿了,不要再寫入了 // 3.當(dāng)內(nèi)存和正在寫入的內(nèi)容消耗完后,會觸發(fā)一個drain事件 //let fs = require('fs'); //let rs = fs.createWriteStream({...});//原生實現(xiàn)可寫流 let WS = require('./WriteStream') let ws = new WS('./2.txt', { flags: 'w', // 寫入文件,默認文件不存在會創(chuàng)建 highWaterMark: 1, // 設(shè)置當(dāng)前緩存區(qū)的大小 encoding: 'utf8', // 文件里存放的都是二進制 start: 0, autoClose: true, // 自動關(guān)閉文件描述符 mode: 0o666, // 可讀可寫 }); // drain的觸發(fā)時機,只有當(dāng)highWaterMark填滿時,才可能觸發(fā)drain // 當(dāng)嘴里的和地下的都吃完了,就會觸發(fā)drain方法 let i = 9; function write() { let flag = true; while (flag && i >= 0) { i--; flag = ws.write('111'); // 987 // 654 // 321 // 0 console.log(flag) } } write(); ws.on('drain', function () { console.log('dry'); write(); });
以上是“Node.js中Stream怎么用”這篇文章的所有內(nèi)容,感謝各位的閱讀!相信大家都有了一定的了解,希望分享的內(nèi)容對大家有所幫助,如果還想學(xué)習(xí)更多知識,歡迎關(guān)注創(chuàng)新互聯(lián)成都網(wǎng)站設(shè)計公司行業(yè)資訊頻道!
另外有需要云服務(wù)器可以了解下創(chuàng)新互聯(lián)scvps.cn,海內(nèi)外云服務(wù)器15元起步,三天無理由+7*72小時售后在線,公司持有idc許可證,提供“云服務(wù)器、裸金屬服務(wù)器、高防服務(wù)器、香港服務(wù)器、美國服務(wù)器、虛擬主機、免備案服務(wù)器”等云主機租用服務(wù)以及企業(yè)上云的綜合解決方案,具有“安全穩(wěn)定、簡單易用、服務(wù)可用性高、性價比高”等特點與優(yōu)勢,專為企業(yè)上云打造定制,能夠滿足用戶豐富、多元化的應(yīng)用場景需求。
分享標題:Node.js中Stream怎么用-創(chuàng)新互聯(lián)
文章出自:http://m.rwnh.cn/article24/hsdce.html
成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供網(wǎng)站設(shè)計公司、網(wǎng)站建設(shè)、搜索引擎優(yōu)化、網(wǎng)站營銷、靜態(tài)網(wǎng)站、網(wǎng)頁設(shè)計公司
聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請盡快告知,我們將會在第一時間刪除。文章觀點不代表本網(wǎng)站立場,如需處理請聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時需注明來源: 創(chuàng)新互聯(lián)
猜你還喜歡下面的內(nèi)容