Contents

ข้อมูลเบื้องต้นเกี่ยวกับการใช้สตรีมใน Node.js

ประเด็นที่สำคัญ

สตรีมมีบทบาทสำคัญใน Node.js เนื่องจากอำนวยความสะดวกในการประมวลผลและส่งข้อมูลที่มีประสิทธิภาพ จึงให้การสนับสนุนที่ดีที่สุดสำหรับแอปพลิเคชันแบบเรียลไทม์และที่กระตุ้นเหตุการณ์

การใช้ข้อเสนอของโมดูลระบบไฟล์ Node.js อาจใช้ฟังก์ชัน createWriteStream() เพื่อสร้างสตรีมแบบเขียนได้ซึ่งนำข้อมูลไปยังพื้นที่เฉพาะ

สภาพแวดล้อม Node.js ครอบคลุมประเภทสตรีมที่หลากหลายซึ่งตอบสนองวัตถุประสงค์และข้อกำหนดที่แตกต่างกัน หมวดหมู่หลักสี่ประเภทนี้ ได้แก่ สตรีมที่อ่านได้ เขียนได้ ดูเพล็กซ์ และสตรีมการแปลง แต่ละประเภทมีฟังก์ชันที่แตกต่างกันและมีความสามารถเฉพาะ ช่วยให้นักพัฒนาสามารถเลือกตัวเลือกที่เหมาะสมที่สุดสำหรับความต้องการใช้งานเฉพาะของตนได้

โดยพื้นฐานแล้ว สตรีมทำหน้าที่เป็นโครงสร้างการเขียนโปรแกรมที่สำคัญในการอำนวยความสะดวกในการส่งข้อมูลอย่างต่อเนื่องจากที่หนึ่งไปยังอีกที่หนึ่ง แนวคิดของกระแสข้อมูลโดยพื้นฐานแล้วหมุนรอบการถ่ายทอดไบต์อย่างเป็นระบบตามเส้นทางที่กำหนดไว้ล่วงหน้า ตามแหล่งข้อมูลที่เชื่อถือได้ของ Node.js สตรีมถือเป็นเฟรมเวิร์กสมมุติที่ช่วยให้ผู้ใช้สามารถโต้ตอบและจัดการข้อมูลได้

การส่งข้อมูลอย่างมีประสิทธิภาพผ่านสตรีมเป็นตัวอย่างที่สำคัญของการประยุกต์ใช้ในระบบคอมพิวเตอร์และการสื่อสารผ่านเครือข่าย

สตรีมใน Node.js

สตรีมเป็นปัจจัยสำคัญในความเจริญรุ่งเรืองของ Node.js เนื่องจากความเหมาะสมสำหรับการประมวลผลข้อมูลแบบเรียลไทม์และแอปพลิเคชันที่ขับเคลื่อนด้วยเหตุการณ์ ซึ่งเป็นทั้งสองแง่มุมพื้นฐานของสภาพแวดล้อมรันไทม์ของ Node.js

หากต้องการสร้างโฟลว์ใหม่ใน Node.js เราต้องใช้ stream API ซึ่งเน้นการจัดการอ็อบเจ็กต์ String และการบัฟเฟอร์ข้อมูลภายในระบบอย่างเคร่งครัด โดยเฉพาะอย่างยิ่ง มีสตรีมหลักสี่ประเภทที่ Node.js ยอมรับ ได้แก่ แบบเขียนได้ อ่านได้ ดูเพล็กซ์ และแบบแปรเปลี่ยนได้

วิธีสร้างและใช้สตรีมแบบเขียนได้

โมดูล File System (fs) จัดเตรียมคลาส WriteStream ที่เปิดใช้งานการสร้างสตรีมแบบเขียนได้สำหรับการส่งข้อมูลไปยังปลายทางที่กำหนด โดยใช้เมธอด fs.createWriteStream() เราสามารถสร้างสตรีมใหม่และระบุเส้นทางเป้าหมายที่ต้องการโดยใช้พารามิเตอร์ที่ให้มา นอกจากนี้ อาจรวมตัวเลือกการกำหนดค่าเพิ่มเติมไว้ด้วยหากจำเป็น

 const {createWriteStream} = require("fs");

(() => {
  const file = "myFile.txt";
   const myWriteStream = createWriteStream(file);
  let x = 0;
  const writeNumber = 10000;

  const writeData = () => {
    while (x < writeNumber) {
      const chunk = Buffer.from(`${x}, `, "utf-8");
      if (x === writeNumber-1) return myWriteStream.end(chunk);
       if (!myWriteStream.write(chunk)) break;
      x\+\+
    }
  };

  writeData();
})();

รหัสที่กำหนดนำเข้าฟังก์ชัน createWriteStream() ซึ่งใช้ภายในฟังก์ชันลูกศรที่ไม่ระบุชื่อเพื่อสร้างสตรีมไฟล์ที่ผนวกข้อมูลเข้ากับไฟล์ที่ระบุ ในกรณีนี้คือ"myFile.txt"ภายในฟังก์ชันที่ไม่ระบุชื่อ มีฟังก์ชันฝังตัวชื่อ writeData() ซึ่งรับผิดชอบในการเขียนข้อมูลไปยังไฟล์ที่กำหนด

ฟังก์ชัน createWriteStream() ใช้บัฟเฟอร์เพื่อเขียนชุดตัวเลข (ตั้งแต่ 0 ถึง 9,999) ภายในไฟล์เอาต์พุตที่ระบุ เมื่อดำเนินการ สคริปต์นี้จะสร้างไฟล์ที่อยู่ภายในไดเร็กทอรีปัจจุบันและเติมข้อมูลด้วยข้อมูลดังต่อไปนี้:

/th/images/myfile-initial-data-1.jpg

การแบ่งประเภทของตัวเลขในปัจจุบันสรุปได้ที่ 2,915; อย่างไรก็ตาม มันควรจะรวมตัวเลขให้สูงถึง 9 ความแตกต่างนี้เป็นผลมาจาก WriteStream แต่ละตัวที่ใช้แคชที่เก็บรักษาปริมาณข้อมูลที่กำหนดไว้ล่วงหน้า ณ เวลาใดก็ตาม เพื่อที่จะแยกแยะค่าเริ่มต้นของการตั้งค่านี้ เราจะต้องอ้างอิงถึงตัวเลือก highWaterMark

 console.log("The highWaterMark value is: " \+
  myWriteStream.writableHighWaterMark \+ " bytes."); 

การรวมคำสั่งดังกล่าวไว้ในฟังก์ชันที่ไม่มีชื่อจะส่งผลให้เกิดการสร้างข้อความที่ตามมาบนพร้อมท์คำสั่งดังต่อไปนี้:

/th/images/writestream-highwatermark-1.jpg

เอาต์พุตที่แสดงจากเทอร์มินัลบ่งชี้ว่าเกณฑ์ highWaterMark ที่กำหนดค่าไว้ล่วงหน้าถูกตั้งค่าเป็น 16,384 ไบต์ตามค่าเริ่มต้น ดังนั้นจึงจำกัดความจุของบัฟเฟอร์นี้เพื่อรองรับข้อมูลได้ไม่เกิน 16,384 ไบต์พร้อมกัน ดังนั้น อักขระ 2,915 ตัวแรก (รวมเครื่องหมายจุลภาคหรือช่องว่าง) จึงครอบคลุมขีดจำกัดของข้อมูลที่สามารถจัดเก็บภายในบัฟเฟอร์ได้ในคราวเดียว

เพื่อแก้ไขปัญหาข้อผิดพลาดของบัฟเฟอร์ ขอแนะนำให้ใช้เหตุการณ์สตรีม สตรีมมีเหตุการณ์หลายอย่างในระหว่างกระบวนการส่งข้อมูลซึ่งเกิดขึ้นที่จุดต่างๆ ของเวลา ในบรรดาเหตุการณ์เหล่านี้ เหตุการณ์การเดรนเหมาะอย่างยิ่งสำหรับการจัดการสถานการณ์ที่เกิดข้อผิดพลาดของบัฟเฟอร์

ในการใช้งานฟังก์ชัน writeData() การเรียกใช้เมธอด write() ของอ็อบเจ็กต์ WriteStream จะส่งคืนบูลีนเพื่อระบุว่าก้อนข้อมูลหรือบัฟเฟอร์ภายในที่ได้รับการจัดสรรนั้นถึงเกณฑ์ที่กำหนดไว้ล่วงหน้าหรือไม่ ซึ่งเรียกว่า"ระดับน้ำสูง” หากตรงตามเงื่อนไขนี้ แสดงว่าแอปพลิเคชันสามารถส่งข้อมูลเพิ่มเติมไปยังสตรีมเอาต์พุตที่เกี่ยวข้องได้ ในทางกลับกัน เมื่อเมธอด write() ส่งกลับค่าเท็จ โฟลว์การควบคุมจะดำเนินการระบายข้อมูลที่เหลืออยู่ในบัฟเฟอร์ เนื่องจากไม่สามารถเขียนเพิ่มเติมได้จนกว่าบัฟเฟอร์จะว่างเปล่า

 myWriteStream.on('drain', () => {
  console.log("a drain has occurred...");
  writeData();
}); 

การรวมโค้ดเหตุการณ์การระบายที่กล่าวมาข้างต้นภายในฟังก์ชันที่ไม่ระบุชื่อทำให้บัฟเฟอร์ของ WriteStream หมดลงเมื่อถึงความจุสูงสุด ด้วยเหตุนี้ จึงทำให้เกิดการเรียกคืนเมธอด writeData() เพื่อเปิดใช้งานการรับส่งข้อมูลเพิ่มเติม เมื่อรันโปรแกรมที่แก้ไขแล้ว จะได้ผลลัพธ์ดังต่อไปนี้:

/th/images/the-writestream-drain-event-1.jpg

สิ่งสำคัญคือต้องคำนึงว่าแอปพลิเคชันถูกบังคับให้ล้างบัฟเฟอร์ WriteStream ในโอกาสที่แยกจากกันสามครั้งตลอดการดำเนินการ นอกจากนี้ ดูเหมือนว่าไฟล์ข้อความมีการเปลี่ยนแปลงบางอย่างเช่นกัน

/th/images/myfile-updated-data-1.jpg

วิธีสร้างและใช้สตรีมที่อ่านได้

ในการเริ่มต้นกระบวนการอ่านข้อมูล ให้เริ่มต้นด้วยการสร้างสตรีมที่เข้าใจได้ผ่านการใช้ฟังก์ชัน fs.createReadStream()

 const {createReadStream} = require("fs");

(() => {
  const file = "myFile.txt";
   const myReadStream = createReadStream(file);

  myReadStream.on("open", () => {
    console.log(`The read stream has successfully opened ${file}.`);
  });

  myReadStream.on("data", chunk => {
    console.log("The file contains the following data: " \+ chunk.toString());
  });

  myReadStream.on("close", () => {
    console.log("The file has been successfully closed.");
  });
})(); 

สคริปต์ใช้เมธอด createReadStream() เพื่อเข้าถึงไฟล์ชื่อ “myFile.txt” ซึ่งสร้างขึ้นก่อนหน้านี้โดยการวนซ้ำโค้ดก่อนหน้านี้ วิธีการเฉพาะนี้รับเส้นทางของไฟล์ซึ่งอาจแสดงในรูปแบบสตริง บัฟเฟอร์ หรือ URL พร้อมด้วยพารามิเตอร์ทางเลือกต่างๆ เป็นอาร์กิวเมนต์สำหรับการประมวลผล

ในบริบทของฟังก์ชันที่ไม่ระบุชื่อที่เกี่ยวข้องกับสตรีม เป็นที่น่าสังเกตว่ามีเหตุการณ์ที่เกี่ยวข้องกับสตรีมที่สำคัญหลายครั้ง อย่างไรก็ตาม อาจสังเกตได้ว่าไม่มีข้อบ่งชี้ใดๆ เกี่ยวกับการเกิดเหตุการณ์"ท่อระบายน้ำ"ปรากฏการณ์นี้สามารถนำมาประกอบกับความจริงที่ว่าสตรีมที่อ่านได้โดยทั่วไปจะไม่บัฟเฟอร์ข้อมูลจนกว่าจะมีการเรียกใช้ฟังก์ชัน"stream.push (chunk)“หรือใช้เหตุการณ์"ที่อ่านได้”

การเริ่มทำงานของเหตุการณ์เปิดจะเกิดขึ้นทุกครั้งที่เปิดไฟล์เพื่อให้ผู้ใช้อ่าน ด้วยการแนบเหตุการณ์ข้อมูลเข้ากับสตรีมที่ต่อเนื่องโดยเนื้อแท้ สตรีมจะเปลี่ยนไปสู่สถานะที่สามารถส่งข้อมูลได้ทันทีเมื่อมีความพร้อมใช้งาน การรันโค้ดที่ให้มาจะสร้างผลลัพธ์ต่อไปนี้:

/th/images/read-stream-terminal-output-1.jpg

วิธีสร้างและใช้สตรีมดูเพล็กซ์

สตรีมดูเพล็กซ์ครอบคลุมทั้งความสามารถในการเขียนและอ่านได้ ดังนั้นจึงช่วยให้สามารถดำเนินการอ่านและเขียนพร้อมกันผ่านอินสแตนซ์เดียวกันได้ กรณีตัวอย่างเกี่ยวข้องกับการใช้ net module ร่วมกับการสร้างซ็อกเก็ต TCP

วิธีการที่ตรงไปตรงมาในการแสดงให้เห็นลักษณะของช่องทางการสื่อสารแบบดูเพล็กซ์นั้นเกี่ยวข้องกับการพัฒนาเซิร์ฟเวอร์ Transmission Control Protocol (TCP) และระบบไคลเอนต์ที่สามารถแลกเปลี่ยนข้อมูลได้

ไฟล์ server.js

 const net = require('net');
const port = 5000;
const host = '127.0.0.1';

const server = net.createServer();

server.on('connection', (socket)=> {
    console.log('Connection established from client.');

    socket.on('data', (data) => {
        console.log(data.toString());
    });

    socket.write("Hi client, I am server " \+ server.address().address);

    socket.on('close', ()=> {
        console.log('the socket is closed')
    });
});

server.listen(port, host, () => {
    console.log('TCP server is running on port: ' \+ port);
}); 

ไฟล์ client.js

 const net = require('net');
const client = new net.Socket();
const port = 5000;
const host = '127.0.0.1';

client.connect(port, host, ()=> {
    console.log("connected to server!");
    client.write("Hi, I'm client " \+ client.address().address);
});

client.on('data', (data) => {
    console.log(data.toString());
    client.write("Goodbye");
    client.end();
});

client.on('end', () => {
    console.log('disconnected from server.');
});

แท้จริงแล้วทั้งสคริปต์เซิร์ฟเวอร์และไคลเอนต์ใช้สตรีมที่สามารถอ่านและเขียนได้เพื่ออำนวยความสะดวกในการสื่อสาร ทำให้เกิดการแลกเปลี่ยนข้อมูลระหว่างกัน สะดวก แอปพลิเคชันเซิร์ฟเวอร์จะเริ่มต้นก่อนและเริ่มรอการเชื่อมต่อขาเข้า เมื่อเริ่มต้นไคลเอนต์ มันจะสร้างการเชื่อมต่อกับเซิร์ฟเวอร์โดยการระบุ

/th/images/tcp-server-and-client-terminal-output-1.jpg

เมื่อสร้างการเชื่อมต่อ ไคลเอ็นต์จะอำนวยความสะดวกในการส่งข้อมูลโดยใช้ WriteStream เพื่อส่งข้อมูลไปยังเซิร์ฟเวอร์ ในขณะเดียวกัน เซิร์ฟเวอร์จะบันทึกข้อมูลขาเข้าในเทอร์มินัลก่อนที่จะใช้ WriteStream ของตัวเองเพื่อส่งข้อมูลกลับไปยังไคลเอนต์ ต่อจากนั้น ไคลเอนต์จะบันทึกข้อมูลที่ได้รับและยังคงแลกเปลี่ยนข้อมูลเพิ่มเติมต่อไป ตามด้วยการยกเลิกการเชื่อมต่อ ณ จุดนี้ เซิร์ฟเวอร์จะรักษาสถานะเปิดไว้เพื่อรองรับการเชื่อมต่อเพิ่มเติมจากไคลเอนต์

วิธีสร้างและใช้สตรีมการแปลง

zlib และสตรีมการเข้ารหัส สตรีม Zlib อำนวยความสะดวกในการบีบอัดและคลายการบีบอัดไฟล์ข้อความในระหว่างการถ่ายโอนไฟล์ ในขณะที่สตรีมการเข้ารหัสช่วยให้การสื่อสารที่ปลอดภัยผ่านการดำเนินการเข้ารหัสและถอดรหัส

แอปพลิเคชัน compressFile.js

 const zlib = require('zlib');
const { createReadStream, createWriteStream } = require('fs');

(() => {
    const source = createReadStream('myFile.txt');
    const destination = createWriteStream('myFile.txt.gz');

    source.pipe(zlib.createGzip()).pipe(destination);
})(); 

สคริปต์ที่ตรงไปตรงมานี้ทำงานโดยนำเอกสารข้อความเริ่มต้น บีบอัด และเก็บไว้ในโฟลเดอร์ปัจจุบัน เนื่องจากประสิทธิภาพของฟังก์ชันไปป์ () ของสตรีมที่อ่านได้ การกำจัดบัฟเฟอร์ผ่านเทคโนโลยีไปป์ไลน์ช่วยให้ขั้นตอนนี้สะดวกขึ้น

ก่อนที่ข้อมูลจะถูกเขียนลงในสตรีมที่เขียนได้ของสคริปต์ ข้อมูลนั้นจะต้องผ่านกระบวนการบีบอัดเล็กน้อยซึ่งอำนวยความสะดวกโดยเมธอด createGzip() ของไลบรารี zlib วิธีการดังกล่าวจะสร้างไฟล์เวอร์ชันบีบอัดและส่งกลับอินสแตนซ์ใหม่ของออบเจ็กต์ Gzip ซึ่งต่อมาจะกลายเป็นผู้รับสตรีมการเขียน

แอปพลิเคชัน decompressFile.js

 const zlib = require('zlib');
 const { createReadStream, createWriteStream } = require('fs');
 
(() => {
    const source = createReadStream('myFile.txt.gz');
    const destination = createWriteStream('myFile2.txt');
       
    source.pipe(zlib.createUnzip()).pipe(destination);
})(); 

แอปพลิเคชันในปัจจุบันขยายออกไปตามเอกสารที่บีบอัด และหากใครตรวจสอบไฟล์ที่สร้างขึ้นใหม่ชื่อ “myFile2.txt” พวกเขาจะพบว่าเนื้อหาของไฟล์นั้นสอดคล้องกับเนื้อหาในเอกสารต้นฉบับทุกประการ

/th/images/myfile2-data-1.jpg

เหตุใดสตรีมจึงมีความสำคัญ

สตรีมมีบทบาทสำคัญในการเพิ่มประสิทธิภาพการรับส่งข้อมูลโดยอำนวยความสะดวกในการสื่อสารที่ราบรื่นระหว่างระบบไคลเอนต์และเซิร์ฟเวอร์ ขณะเดียวกันก็สนับสนุนการบีบอัดและถ่ายโอนไฟล์ขนาดมากอย่างมีประสิทธิภาพ

สตรีมช่วยเพิ่มประสิทธิภาพการทำงานของภาษาการเขียนโปรแกรมอย่างมากโดยทำให้กระบวนการถ่ายโอนข้อมูลง่ายขึ้น การไม่มีฟีเจอร์สตรีมนำไปสู่ความซับซ้อนที่เพิ่มขึ้นในการดำเนินการถ่ายโอนข้อมูล จำเป็นต้องมีการแทรกแซงด้วยตนเองในระดับที่สูงขึ้นจากนักพัฒนา ซึ่งอาจส่งผลให้ข้อผิดพลาดเพิ่มขึ้นและประสิทธิภาพลดลง