Contents

在 Node.js 中使用流簡介

要點

串流在 Node.js 中發揮著重要作用,因為它們促進高效的資料處理和傳輸,從而為即時和事件觸發的應用程式提供最佳支援。

利用 Node.js 檔案系統模組的功能,我們可以使用 createWriteStream() 函數來建立一個可寫流,將資料導向到特定區域。

Node.js 環境包含滿足不同目的和要求的各種流類型。這四個主要類別包括可讀、可寫入、雙工和轉換流。每種類型都有不同的功能並提供特定的功能,讓開發人員可以根據其特定的應用程式需求選擇最合適的選項。

本質上,流是促進資訊從一個位置連續傳輸到另一個位置的關鍵程式結構。流的概念本質上圍繞著沿著預定路徑系統地傳送位元組。根據Node.js的權威資源,流構成了一個假設的框架,使用戶能夠與資料互動和操作。

透過串流有效傳輸資訊是其在電腦系統和網路通訊中應用的一個主要例子。

Node.js 中的流

串流一直是 Node.js 繁榮的關鍵因素,因為它們適合即時資料處理和事件驅動應用程序,而這兩者都是 Node.js 運行時環境的基本方面。

要在 Node.js 中建立一種新穎的流程,必須使用流 API,該 API 嚴格致力於處理 String 物件並在系統內緩衝資料。值得注意的是,Node.js 辨識的流有四種主要類別,即可寫入流、可讀流、雙工流和轉換流。

如何建立和使用可寫入流

檔案系統 (fs) 模組提供 WriteStream 類,該類允許建立可寫入流以將資料傳輸到指定目的地。透過利用 fs.createWriteStream() 方法,可以建立一個新流並使用提供的參數指定所需的目標路徑。此外,如有必要,還可以包括一系列可選的配置選項。

 const {createWriteStream} = require("fs");

(() => {
  const file = "myFile.txt";
   const myWriteStream = createWriteStream(file);
  let x = 0;
  const writeNumber = 10000;

  const writeData = () => {
    while (x < writeNumber) {
      const chunk = Buffer.from(`${x}, `, "utf-8");
      if (x === writeNumber-1) return myWriteStream.end(chunk);
       if (!myWriteStream.write(chunk)) break;
      x\+\+
    }
  };

  writeData();
})();

給定的程式碼匯入了createWriteStream()函數,該函數在匿名箭頭函數中使用來產生將資料附加到指定檔案的檔案流,在本例中為「myFile.txt」。在匿名函數中,有一個名為 writeData() 的嵌入函數,負責將資訊寫入指定檔案。

createWriteStream() 函數利用緩衝區在指定的輸出檔案中寫入一系列數字(範圍從 0 到 9,999)。執行時,此腳本會在目前目錄中產生一個文件,並用以下資訊填入該文件:

/bc/images/myfile-initial-data-1.jpg

目前的數據總數為 2,915;但是,它應該包含高達 9 的數字。這種差異是由於每個 WriteStream 使用在任何給定時刻保留預定數量的資訊的快取造成的。為了辨別該設定的預設值,必須參考 highWaterMark 選項。

 console.log("The highWaterMark value is: " \+
  myWriteStream.writableHighWaterMark \+ " bytes."); 

將上述指令合併到未命名函數中將導致在命令提示字元上產生後續訊息,如下所示:

/bc/images/writestream-highwatermark-1.jpg

終端顯示的輸出顯示預先配置的 highWaterMark 閾值預設為 16,384 位元組。因此,它限制了該緩衝區的容量,使其同時容納不超過 16,384 位元組的資訊。因此,前 2,915 個字元(包括任何逗號或空格)涵蓋了緩衝區中可以一次性儲存的資料的限制。

為了解決緩衝區錯誤問題,建議使用流事件。流在資料傳輸過程中會經歷在不同時間點發生的多個事件。在這些事件中,drain 事件特別適合處理出現緩衝區錯誤的情況。

在 writeData() 函數的實作中,呼叫 WriteStream 物件的 write() 方法傳回一個布林值,指示分配的資料區塊或內部緩衝區是否已達到其預定閾值,稱為“高水位線。”如果滿足此條件,則表示應用程式能夠將附加資料傳輸到關聯的輸出流。相反,一旦 write() 方法傳回假值,控制流就會繼續耗盡緩衝區中的所有剩餘數據,因為在緩衝區清空之前無法執行進一步的寫入。

 myWriteStream.on('drain', () => {
  console.log("a drain has occurred...");
  writeData();
}); 

將上述耗盡事件程式碼合併到匿名函數中可以在 WriteStream 的緩衝區達到其最大容量時耗盡它。因此,這會觸發呼叫 writeData() 方法以啟用進一步的資料傳輸。執行修改後的程序,得到以下結果:

/bc/images/the-writestream-drain-event-1.jpg

重要的是要考慮到應用程式在整個操作過程中被迫在三個不同的場合清除 WriteStream 緩衝區。此外,文字檔案似乎也進行了某些更改。

/bc/images/myfile-updated-data-1.jpg

如何建立和使用可讀流

要啟動讀取資料的過程,首先透過使用「fs.createReadStream()」函數建立一個可理解的流。

 const {createReadStream} = require("fs");

(() => {
  const file = "myFile.txt";
   const myReadStream = createReadStream(file);

  myReadStream.on("open", () => {
    console.log(`The read stream has successfully opened ${file}.`);
  });

  myReadStream.on("data", chunk => {
    console.log("The file contains the following data: " \+ chunk.toString());
  });

  myReadStream.on("close", () => {
    console.log("The file has been successfully closed.");
  });
})(); 

該腳本利用 createReadStream() 方法來存取名為「myFile.txt」的文件,該文件是先前由早期程式碼迭代產生的。此特定方法接收一個檔案路徑,該路徑可以以字串、緩衝區或 URL 格式呈現,以及各種可選參數作為其處理參數。

在與流相關的匿名函數的上下文中,值得注意的是存在多個與流相關的重要事件。儘管如此,人們可能會觀察到沒有任何關於「耗盡」事件發生的跡象。這種現象可以歸因於以下事實:可讀流通常不會緩衝數據,直到調用“stream.push(chunk)”函數或利用“read”事件。

每當使用者開啟檔案供讀取時,都會觸發 open 事件。透過將資料事件附加到本質上連續的流,該流會轉變為資料可用時可以立即傳輸的狀態。執行提供的程式碼將產生以下輸出:

/bc/images/read-stream-terminal-output-1.jpg

如何建立和使用雙工流

雙工流包含可寫入和可讀的能力,從而允許透過同一實例同時進行讀取和寫入操作。一個說明性的例子涉及網路模組的使用以及 TCP 套接字的建立。

用於說明雙工通訊通道特性的一種直接方法涉及開發能夠交換資訊的傳輸控制協定(TCP)伺服器和客戶端系統。

server.js 文件

 const net = require('net');
const port = 5000;
const host = '127.0.0.1';

const server = net.createServer();

server.on('connection', (socket)=> {
    console.log('Connection established from client.');

    socket.on('data', (data) => {
        console.log(data.toString());
    });

    socket.write("Hi client, I am server " \+ server.address().address);

    socket.on('close', ()=> {
        console.log('the socket is closed')
    });
});

server.listen(port, host, () => {
    console.log('TCP server is running on port: ' \+ port);
}); 

client.js 文件

 const net = require('net');
const client = new net.Socket();
const port = 5000;
const host = '127.0.0.1';

client.connect(port, host, ()=> {
    console.log("connected to server!");
    client.write("Hi, I'm client " \+ client.address().address);
});

client.on('data', (data) => {
    console.log(data.toString());
    client.write("Goodbye");
    client.end();
});

client.on('end', () => {
    console.log('disconnected from server.');
});

事實上,伺服器和客戶端腳本都利用可讀可寫入的流來促進通信,從而實現它們之間的資料交換。方便的是,首先啟動伺服器應用程式並開始主動等待傳入連線。啟動客戶端後,它透過指定與伺服器建立連接

/bc/images/tcp-server-and-client-terminal-output-1.jpg

形成連接後,客戶端透過使用其 WriteStream 向伺服器發送訊息來促進資料傳輸。同時,伺服器在利用自己的 WriteStream 將資料傳回客戶端之前,將傳入的資料記錄在終端中。隨後,客戶端記錄接收到的資料並繼續交換更多信息,然後終止連線。此時,伺服器保持開啟以容納來自客戶端的任何進一步連線。

如何建立和使用轉換流

zlib 和加密流。 Zlib 流有助於在文件傳輸期間壓縮和隨後解壓縮文字文件,而加密流則透過加密和解密操作實現安全通訊。

compressFile.js 應用程式

 const zlib = require('zlib');
const { createReadStream, createWriteStream } = require('fs');

(() => {
    const source = createReadStream('myFile.txt');
    const destination = createWriteStream('myFile.txt.gz');

    source.pipe(zlib.createGzip()).pipe(destination);
})(); 

由於可讀流的 pipeline() 功能的有效性,這個簡單的腳本透過取得初始文字文件、壓縮它並將其歸檔到目前資料夾中來運行。透過流管道技術消除緩衝區有助於此過程。

在將資料寫入腳本的可寫入流之前,它會透過 zlib 庫的 createGzip() 方法促進的壓縮過程進行輕微的轉移。所述方法產生檔案的壓縮版本並傳回 Gzip 物件的新實例,該實例隨後成為寫入流的接收者。

decompressFile.js 應用程式

 const zlib = require('zlib');
 const { createReadStream, createWriteStream } = require('fs');
 
(() => {
    const source = createReadStream('myFile.txt.gz');
    const destination = createWriteStream('myFile2.txt');
       
    source.pipe(zlib.createUnzip()).pipe(destination);
})(); 

現在的應用程式擴展了壓縮文檔,如果檢查新建立的名為「myFile2.txt」的文件,他們會發現其內容與原始文檔的內容完全一致。

/bc/images/myfile2-data-1.jpg

為什麼流很重要?

串流透過促進客戶端和伺服器系統之間的無縫通信,同時也支援有效壓縮和傳輸大量檔案大小,在優化資料傳輸方面發揮至關重要的作用。

流透過簡化資料傳輸過程顯著增強了程式語言的功能。缺乏串流功能會導致資料傳輸操作的複雜性增加,需要開發人員進行更高程度的手動幹預,這可能會導致錯誤增加和效能下降。