Streams and Piping
Reading a 5 GB file with fs.readFile means buffering the entire file in memory, which can exhaust the available memory and crash a process running under a tight memory budget. Node.js streams avoid this by processing the data sequentially in small chunks (buffers), so only a fraction of the file is held in memory at any time.
1. The Four Types of Streams
Node.js defines four main types of streams:
- Readable: Streams from which data can be consumed (e.g., fs.createReadStream).
- Writable: Streams to which data can be written (e.g., fs.createWriteStream).
- Duplex: Streams that are both Readable and Writable (e.g., a TCP socket connection).
- Transform: A type of Duplex stream whose output is computed from its input (e.g., zlib.createGzip to compress files).
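To make the Transform type from the list above more concrete, here is a minimal sketch of a custom Transform that upper-cases text as it passes through. The helper names createUpperCase and collect are hypothetical and exist only for this illustration; the sketch also assumes ASCII input, since a multi-byte character split across two chunks would need extra handling.

```javascript
import { Readable, Transform } from "stream";

// Hypothetical helper (not part of Node.js): a Transform that upper-cases each chunk.
function createUpperCase() {
  return new Transform({
    transform(chunk, encoding, callback) {
      // chunk arrives as a Buffer by default; hand the modified data downstream
      callback(null, chunk.toString().toUpperCase());
    }
  });
}

// Hypothetical helper: gathers everything a Readable emits into one string.
async function collect(readable) {
  let result = "";
  for await (const chunk of readable) {
    result += chunk.toString();
  }
  return result;
}

// Readable.from() builds a Readable from any iterable, here two short strings.
const source = Readable.from(["hello ", "streams"]);
collect(source.pipe(createUpperCase())).then((text) => {
  console.log(text); // prints "HELLO STREAMS"
});
```

The same object is Writable on its input side and Readable on its output side, which is why it can sit in the middle of a chain.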
2. Consuming a Readable Stream via Events
A Readable Stream operates in two modes: flowing (pushes data automatically) and paused (data must be read explicitly).
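Paused mode is the less familiar of the two, so here is a small sketch of it, assuming an in-memory source built with Readable.from() in place of a real file. The helper name readPaused is hypothetical. Data is pulled explicitly with read() inside a "readable" listener rather than pushed through "data" events.

```javascript
import { Readable } from "stream";

// Hypothetical helper: drains a stream in paused mode and resolves with its chunks.
function readPaused(stream) {
  return new Promise((resolve, reject) => {
    const chunks = [];
    // With a "readable" listener attached the stream does not flow;
    // nothing is delivered until read() is called.
    stream.on("readable", () => {
      let chunk;
      // read() returns null once the internal buffer is empty
      while ((chunk = stream.read()) !== null) {
        chunks.push(chunk);
      }
    });
    stream.on("end", () => resolve(chunks));
    stream.on("error", reject);
  });
}

const demo = Readable.from(["first", "second", "third"]);
readPaused(demo).then((chunks) => {
  console.log(chunks); // prints [ 'first', 'second', 'third' ]
});
```

The event-based example that follows takes the opposite approach and lets the stream push chunks in flowing mode.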
import fs from "fs";

// Create a readable stream from a file
const reader = fs.createReadStream("largefile.txt", {
  encoding: "utf8",
  highWaterMark: 16 * 1024 // Size of each chunk in bytes (default is 64KB)
});

// Listen to data events (switches the stream to flowing mode)
reader.on("data", (chunk) => {
  console.log(`Received a new chunk of size: ${chunk.length} characters`);
});

// Listen to the end event
reader.on("end", () => {
  console.log("Finished reading all file contents");
});

// Handle errors
reader.on("error", (err) => {
  console.error("Stream error occurred:", err.message);
});

3. Writing Data with Writable Streams
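import fs from "fs";

const writer = fs.createWriteStream("output.txt", "utf8");

// Report failures such as a missing directory or a permissions problem
writer.on("error", (err) => {
  console.error("Write error occurred:", err.message);
});

// "finish" fires once end() has been called and all data has been flushed
writer.on("finish", () => {
  console.log("All data successfully flushed to disk");
});

// Write chunks of data
writer.write("First line of text\n");
writer.write("Second line of text\n");

// Write the final chunk and signal that no more data will follow
writer.end("Final line of text\n");

4. Piping Streams (pipe)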
The pipe() method connects the output of a Readable stream directly to the input of a Writable stream. It handles backpressure automatically: the reader is paused whenever the writer cannot keep up, and resumed once the writer's buffer has drained.
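To show what that automatic handling saves you from writing, here is a rough sketch of the same idea done by hand. It is an approximation of the behavior, not the actual pipe() implementation, and the names copyWithBackpressure and slowSink are hypothetical. The in-memory streams stand in for real files.

```javascript
import { Readable, Writable } from "stream";

// Hypothetical helper: copies source to destination, pausing the source
// whenever the destination signals that its buffer is full.
function copyWithBackpressure(source, destination) {
  return new Promise((resolve, reject) => {
    source.on("data", (chunk) => {
      // write() returns false when the destination's internal buffer is full
      if (!destination.write(chunk)) {
        source.pause();
        // "drain" fires once the buffered data has been written out
        destination.once("drain", () => source.resume());
      }
    });
    source.on("end", () => destination.end());
    destination.on("finish", resolve);
    source.on("error", reject);
    destination.on("error", reject);
  });
}

// Demo: a deliberately slow Writable with a tiny buffer.
const received = [];
const slowSink = new Writable({
  highWaterMark: 4, // bytes; small so that backpressure kicks in quickly
  write(chunk, encoding, callback) {
    received.push(chunk.toString());
    setTimeout(callback, 10); // simulate a slow destination
  }
});

copyWithBackpressure(Readable.from(["aaaa", "bbbb", "cccc"]), slowSink).then(() => {
  console.log(received.join("")); // prints "aaaabbbbcccc"
});
```

With pipe(), none of this bookkeeping is needed, as the following example shows.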
import fs from "fs";
import zlib from "zlib";

const readStream = fs.createReadStream("document.txt");
const gzipStream = zlib.createGzip();
const writeStream = fs.createWriteStream("document.txt.gz");

// Read -> Compress -> Write to file system
readStream.pipe(gzipStream).pipe(writeStream);
console.log("Piping pipeline started...");

5. Modern Stream Piping: pipeline()
While pipe() is convenient, it neither forwards errors nor cleans up the other streams when one stream in the chain fails, so file handles can be left open.
The stream/promises module (available since Node.js 15) provides a promise-based pipeline() utility that destroys every stream in the chain when any of them fails:
import fs from "fs";
import zlib from "zlib";
import { pipeline } from "stream/promises";

async function compressFile() {
  try {
    await pipeline(
      fs.createReadStream("app.log"),
      zlib.createGzip(),
      fs.createWriteStream("app.log.gz")
    );
    console.log("Compression pipeline completed successfully");
  } catch (err) {
    console.error("Compression pipeline failed:", err.message);
  }
}

compressFile();
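The cleanup behavior can be observed directly. The sketch below is an illustration under stated assumptions, not a test of every failure path: it points the pipeline at an input file that is assumed not to exist and then checks whether the destination stream was destroyed. The helper name compressAndReport and both file names are hypothetical.

```javascript
import fs from "fs";
import os from "os";
import path from "path";
import zlib from "zlib";
import { pipeline } from "stream/promises";

// Hypothetical helper: runs a gzip pipeline and reports what happened to the destination.
async function compressAndReport(inputPath, outputPath) {
  const destination = fs.createWriteStream(outputPath);
  try {
    await pipeline(fs.createReadStream(inputPath), zlib.createGzip(), destination);
    return { ok: true };
  } catch (err) {
    // By the time the promise rejects, pipeline() has destroyed the streams in the chain
    return { ok: false, code: err.code, destinationDestroyed: destination.destroyed };
  }
}

// Assumption: this input file does not exist, so the read stream fails to open it.
const missingInput = path.join(os.tmpdir(), "no-such-file-for-pipeline-demo.log");
const outputFile = path.join(os.tmpdir(), "pipeline-demo.log.gz");

compressAndReport(missingInput, outputFile).then((report) => {
  console.log(report);
});
```

Destroying a stream closes its file handle but does not delete anything, so an empty or partial output file may remain on disk and has to be removed separately if that matters.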