Readable stream

Readable streams are sources of data. For example, you read a stream of data from a file by calling fs.createReadStream() or from standard input by using process.stdin.

Readable stream modes

  1. Flowing mode corresponds to the state readable.readableFlowing === true, in which data is read from the underlying source as quickly as possible and delivered to the application through the EventEmitter interface. To consume the data, register a handler for the "data" event.

  2. Paused mode corresponds to the state readable.readableFlowing === false (or null, when no consumer has been attached yet), in which the readable.read() method must be called explicitly to read chunks of data from the stream. To learn when data is available, register a handler for the "readable" event.

All Readable streams begin in paused mode but can be switched to the flowing mode when:

  1. "data" event handler is added.
  2. readable.resume() method is called.
  3. readable.pipe() method is called.

Readable streams switch back to the paused mode when:

  1. readable.pause() method is called and there are no pipe destinations.
  2. readable.unpipe() method is called, removing all pipe destinations.
  3. "readable" event handler is added.
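
The mode switches listed above can be observed through the readable.readableFlowing property. The following is a minimal sketch rather than part of the original examples: it assumes an in-memory source created with Readable.from(), and the helper name observeModes is hypothetical.

```javascript
import { Readable } from "stream";

// Hypothetical helper: records readableFlowing after each mode switch.
function observeModes() {
  const readable = Readable.from(["a", "b", "c"]);
  const states = [];

  states.push(readable.readableFlowing); // no consumer attached yet

  readable.on("data", () => {}); // adding a "data" handler starts the flow
  states.push(readable.readableFlowing);

  readable.pause(); // explicit pause, there are no pipe destinations
  states.push(readable.readableFlowing);

  readable.resume(); // back to the flowing mode
  states.push(readable.readableFlowing);

  readable.destroy(); // the data itself is not needed in this sketch
  return states;
}

console.log(observeModes()); // [ null, true, false, true ]
```

The initial null shows that a freshly created stream has no consumer yet, which is why it does not produce data until one of the switches above happens.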

Readable stream events

  1. "close" event is emitted when the stream and any of its underlying resources have been closed.

  2. "data" event is emitted when the stream is passing a chunk of data to a consumer. "data" event handlers deal with Readable streams in the flowing mode and must not be registered together with "readable" event handlers.

  3. "end" event is emitted when there is no more data to be consumed from the stream.

  4. "error" event is emitted when an error has occurred.

  5. "pause" event is emitted when the Readable stream is in flowing mode and readable.pause() method is called.

  6. "readable" event is emitted when the stream has new data available in the buffer or when the end of the stream has been reached. "readable" event handlers deal with Readable streams in the paused mode and must not be registered together with "data" event handlers.

  7. "resume" event is emitted when the stream is in paused mode and readable.resume() method is called.
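
To see how these events relate to each other, the sketch below records the events emitted by a small in-memory stream that is consumed in the flowing mode. It is only an illustration: the source created with Readable.from() and the helper name recordEvents are assumptions, not part of the original examples.

```javascript
import { Readable } from "stream";

// Hypothetical helper: resolves with the names of the events seen, in order.
function recordEvents() {
  return new Promise((resolve, reject) => {
    const readable = Readable.from(["a", "b", "c"]);
    const seen = [];

    for (const name of ["resume", "pause", "end", "close"]) {
      readable.on(name, () => {
        seen.push(name);
        // "close" is the last event of the stream lifecycle
        if (name === "close") resolve(seen);
      });
    }
    readable.on("error", reject);

    // Registering the "data" handler switches the stream to the flowing mode
    readable.on("data", (chunk) => seen.push(`data:${chunk}`));
  });
}

recordEvents().then((seen) => console.log(seen.join(" -> ")));
```

In this sketch every "data" event arrives before "end", and "close" comes after "end" because autoDestroy is enabled by default.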

Readable stream methods

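  1. readable.destroy([error]) method destroys the stream and releases its internal resources. It emits an "error" event if an error is passed, followed by a "close" event unless the emitClose option is set to false.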

  2. readable.pause() method switches a stream to the paused mode.

  3. readable.pipe(destination) method attaches a Writable stream to readable, switching it to the flowing mode with automatically managed data flow. Because pipe() returns the destination, piped streams can be chained, as in readable.pipe(z).pipe(w), when the intermediate destination is itself readable (a Duplex or Transform stream). If the Readable stream emits an error, the Writable destination is not closed automatically and must be closed manually.

  4. readable.read() method reads data from the internal buffer; if no data is available, it returns null. You must call readable.read() repeatedly to drain the buffer until null is returned, otherwise you won't get another "readable" event emitted.

  5. readable.resume() causes an explicitly paused stream to resume emitting "data" events, switching the stream to the flowing mode.

  6. readable.setEncoding(encoding) method sets the character encoding for data, causing data to be returned as strings instead of Buffer objects.

  7. readable.unpipe([destination]) method detaches the given Writable stream, or all attached Writable streams if no destination is specified.
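
As an illustration of readable.setEncoding(), the sketch below pushes the same Buffer into two streams and compares what the consumer receives with and without an encoding. The helper name collectChunks and the manually pushed data are assumptions made for this example.

```javascript
import { Readable } from "stream";

// Hypothetical helper: pushes one Buffer and collects what the consumer receives.
async function collectChunks(encoding) {
  const readable = new Readable({ read() {} }); // data is pushed manually below
  if (encoding) readable.setEncoding(encoding);

  readable.push(Buffer.from("hello"));
  readable.push(null); // signal the end of the stream

  const chunks = [];
  for await (const chunk of readable) chunks.push(chunk);
  return chunks;
}

collectChunks().then((chunks) => console.log(Buffer.isBuffer(chunks[0]))); // true
collectChunks("utf8").then((chunks) => console.log(typeof chunks[0])); // string
```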

Readable stream properties

  1. readable.closed is true after "close" event has been emitted.

  2. readable.destroyed is true after readable.destroy() has been called.

  3. readable.readable is true if it is safe to call readable.read().

  4. readable.readableEncoding is the getter for the encoding property.

  5. readable.readableEnded is true when "end" event has been emitted.

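  6. readable.readableFlowing reflects the current state of the Readable stream: null if no mechanism for consuming data has been provided yet, true in the flowing mode, and false when the stream has been paused.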

  7. readable.readableHighWaterMark is the getter for the highWaterMark property.

  8. readable.readableObjectMode is the getter for the objectMode property.

  9. readable.readableLength contains the number of bytes or objects in the queue to be read.
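
The sketch below takes a snapshot of several of these properties on a small stream. It is a hedged illustration only: the helper name inspectProperties and the highWaterMark of 8 are arbitrary choices for this example.

```javascript
import { Readable } from "stream";

// Hypothetical helper: snapshots a few properties before and after destroy().
function inspectProperties() {
  const readable = new Readable({ highWaterMark: 8, read() {} });
  readable.push("abc"); // 3 bytes are now queued in the internal buffer

  const before = {
    readableFlowing: readable.readableFlowing,
    readableHighWaterMark: readable.readableHighWaterMark,
    readableObjectMode: readable.readableObjectMode,
    readableEncoding: readable.readableEncoding,
    readableLength: readable.readableLength,
    readableEnded: readable.readableEnded,
    destroyed: readable.destroyed,
  };

  readable.destroy();
  return { before, destroyedAfter: readable.destroyed };
}

console.log(inspectProperties());
```

Here readableLength is 3 because the pushed string occupies three bytes, readableHighWaterMark echoes the option passed to the constructor, and destroyed flips to true as soon as destroy() is called.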

Readable stream in flowing mode

The following code is an example of using a Readable stream in the flowing mode to copy a file. Notice that when an error occurs in either of the streams, you need to explicitly destroy both of them to clean up resources.

import fs from "fs";

const file = "data/alphabet.txt";

// Readable stream in flowing mode
function copyFile(sourceFilename, destinationFilename, callback) {
  const readable = fs.createReadStream(sourceFilename);
  const writable = fs.createWriteStream(destinationFilename);

  readable.on("error", (err) => {
    readable.destroy();
    // "drain" is emitted only after write() has returned false, so
    // waiting for it could leave the destination file open forever.
    // The copy has failed anyway, so destroy the writable stream
    // right away and discard any data still in its buffer.
    writable.destroy();
    callback(err);
  });

  readable.on("data", (chunk) => {
    const writableBufferHasRoom = writable.write(chunk);
    if (!writableBufferHasRoom) {
      readable.pause();
    }
  });

  readable.on("end", () => writable.end());

  writable.on("error", (err) => {
    readable.destroy();
    // In case error occurs in writable stream, destroy it immediately
    writable.destroy();
    callback(err);
  });

  writable.on("drain", () => readable.resume());
  writable.on("finish", callback);
}

copyFile(file, "data/file_copy.txt", (err) => {
  if (err) console.error(err.message);
  else console.log("File copied successfully");
});

Readable stream in paused mode

This is how you can use the paused mode of the Readable stream. Notice that you have to read chunks of data in a loop to drain the internal buffer, otherwise the next "readable" event will not be emitted.

import fs from "fs";
import crypto from "crypto";

const file = "data/alphabet.txt";

function sha256(filename, callback) {
  const readable = fs.createReadStream(filename);
  const hasher = crypto.createHash("sha256");

  readable.on("readable", () => {
    let chunk;
    while ((chunk = readable.read()) !== null) {
      hasher.update(chunk);
    }
  });

  readable.on("end", () => {
    const hash = hasher.digest("hex");
    callback(null, hash);
  });

  readable.on("error", callback);
}

sha256(file, (err, hash) => {
  if (err) console.error(err.message);
  else console.log("Computed hash:", hash);
});

Readable stream as async iterator

Readable streams are async iterables, so you can use a for await...of loop to read chunks of data from a stream without registering event handlers or managing modes.

import fs from "fs";

const file = "data/alphabet.txt";

async function printStream(filename) {
  const readable = fs.createReadStream(filename);
  readable.setEncoding("utf8");

  for await (const chunk of readable) {
    console.log("Chunk length:", chunk.length);
  }
}

printStream(file).catch(console.error);

Piped Readable stream

The pipe() method attaches a Writable stream to the Readable so that Node handles backpressure for you automatically. Notice that you must close the Writable stream in case of an error in the Readable stream and likewise close the Readable stream in case of an error in the Writable.

import fs from "fs";
import zlib from "zlib";

const file = "data/alphabet.txt";

function gzip(filename, callback) {
  const gz = zlib.createGzip();
  const source = fs.createReadStream(filename);
  const destination = fs.createWriteStream(filename + ".gz");

  function handleError(err) {
    source.destroy();
    gz.destroy();
    destination.destroy();
    callback(err);
  }

  source
    .on("error", handleError)
    .pipe(gz)
    .on("error", handleError)
    .pipe(destination)
    .on("error", handleError)
    .on("finish", callback);
}

gzip(file, (err) => {
  if (err) console.error(err.message);
  else console.log("File gzipped successfully");
});
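
The manual cleanup shown above can also be delegated to Node. As an alternative technique, not the one used in the example above, the sketch below uses pipeline() from the "stream/promises" module, which forwards errors and destroys every stream in the chain when one of them fails. The helper name gzipWithPipeline and the file paths are hypothetical.

```javascript
import fs from "fs";
import zlib from "zlib";
import { pipeline } from "stream/promises";

// Hypothetical helper: gzips sourcePath into destinationPath.
// pipeline() rejects on the first error and destroys all streams in the chain.
async function gzipWithPipeline(sourcePath, destinationPath) {
  await pipeline(
    fs.createReadStream(sourcePath),
    zlib.createGzip(),
    fs.createWriteStream(destinationPath)
  );
}

gzipWithPipeline("data/alphabet.txt", "data/alphabet.txt.gz")
  .then(() => console.log("File gzipped successfully"))
  .catch((err) => console.error(err.message));
```

With this approach there is a single place to handle errors, at the cost of less control over what happens to each individual stream.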

Readable stream options

When creating a Readable stream implementation you can pass an options object to customize the behavior of the stream.

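  1. highWaterMark: maximum number of bytes to store in the internal buffer before the stream stops reading from the underlying resource. Default: 65536 (64 KiB) in Node.js 22 and later, 16384 (16 KiB) in earlier versions. For objectMode streams the value counts objects instead of bytes and defaults to 16.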

  2. encoding: encoding scheme to decode Buffer to a string. Default: null.

  3. objectMode: whether the stream should be a stream of objects. Default: false.

  4. emitClose: whether to emit "close" event after it has been destroyed. Default: true.

  5. read(): implementation for the _read() method.

  6. destroy(): implementation for the _destroy() method.

  7. construct(): implementation for the _construct() method.

  8. autoDestroy: whether the stream should automatically call destroy() method on itself after ending. Default: true.

  9. signal: an AbortSignal that allows the stream to be cancelled. When the signal is aborted, the stream is destroyed.
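
The sketch below combines several of these options (objectMode, highWaterMark, read and signal) to show how cancellation through an AbortSignal reaches the consumer. It is an illustration under stated assumptions: the helper name consumeUntilAborted and the pushed objects are hypothetical.

```javascript
import { Readable } from "stream";

// Hypothetical helper: reads objects until the stream is aborted.
async function consumeUntilAborted() {
  const controller = new AbortController();
  const readable = new Readable({
    objectMode: true, // chunks are arbitrary objects, not Buffers
    highWaterMark: 2, // counted in objects because of objectMode
    signal: controller.signal,
    read() {}, // data is pushed manually below
  });

  readable.push({ id: 1 });
  readable.push({ id: 2 });

  const received = [];
  try {
    for await (const item of readable) {
      received.push(item);
      controller.abort(); // aborting the signal destroys the stream
    }
  } catch (err) {
    return { received, errorName: err.name };
  }
  return { received, errorName: null };
}

consumeUntilAborted().then((result) => console.log(result));
```

In this sketch the loop receives only the first object: the abort destroys the stream, and the iteration then fails with an error whose name is "AbortError".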

Readable stream implementation

readable._construct(callback) is scheduled in the next tick by the stream constructor, delaying any _read() and _destroy() calls until callback is called. This is useful to initialize state or asynchronously initialize resources before the stream can be used.

The readable._read(size) method is called by the stream when it needs more data to push to the consumer. The size parameter is advisory: it suggests the amount of data the consumer is ready to handle, and an implementation may ignore it. The data should be added to the internal buffer with the push(chunk) method. After the data has been pushed, the _read() method is called again by the stream once the consumer is ready to accept more data.

The readable._destroy(err, callback) method is called by readable.destroy(). It includes logic for cleanup and handles any errors that might be provided.

The readable.push(chunk) method returns true if additional chunks of data may continue to be pushed, and false otherwise. Upon pushing, the chunk of data is added to the internal queue for users of the stream to consume. Passing null as the chunk signals the end of the stream (EOF), after which no more data can be pushed.

import { Readable } from "stream";

class CustomReadableStream extends Readable {
  constructor(options) {
    super(options);
    this._index = 65; // ASCII code for 'A'
    this._max = 90; // ASCII code for 'Z'
  }

  _read(size) {
    try {
      for (let bytesPushed = 0; bytesPushed < size; bytesPushed++) {
        if (this._index > this._max) {
          // Signal the end of the stream
          this.push(null);
          return;
        }

        const chunk = Buffer.alloc(1);
        chunk.writeUInt8(this._index++);
        console.log("About to add chunk:", chunk.toString("hex"));

        // Add chunk to the internal buffer
        const readableBufferHasRoom = this.push(chunk);

        if (!readableBufferHasRoom) {
          console.log("Buffer is full, stopping push");
          return;
        }
      }
    } catch (err) {
      // Errors must be propagated through stream.destroy(err)
      this.destroy(err);
    }
  }
}

const readable = new CustomReadableStream({
  encoding: "ascii",
  highWaterMark: 4,
});

readable.on("error", (err) => console.error(err.message));
readable.on("end", () => console.log("No more data."));
readable.on("data", (chunk) => console.log(chunk));
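
The class above implements only _read(). The following sketch, modelled on the pattern described for _construct() and _destroy(), opens a file descriptor before the first read and closes it on destruction. The class name FileChunkStream and the file path are hypothetical, and the code is a minimal illustration rather than a replacement for fs.createReadStream().

```javascript
import fs from "fs";
import { Readable } from "stream";

// Hypothetical class: reads a file through a manually managed file descriptor.
class FileChunkStream extends Readable {
  constructor(filename, options) {
    super(options);
    this.filename = filename;
    this.fd = null;
  }

  // Called once before any _read() or _destroy() call
  _construct(callback) {
    fs.open(this.filename, "r", (err, fd) => {
      if (err) return callback(err);
      this.fd = fd;
      callback();
    });
  }

  _read(size) {
    const buffer = Buffer.alloc(size);
    fs.read(this.fd, buffer, 0, size, null, (err, bytesRead) => {
      if (err) return this.destroy(err);
      // Push the bytes that were read, or null at the end of the file
      this.push(bytesRead > 0 ? buffer.subarray(0, bytesRead) : null);
    });
  }

  // Called by destroy(); closes the descriptor if it was opened
  _destroy(err, callback) {
    if (this.fd === null) return callback(err);
    fs.close(this.fd, (closeErr) => callback(closeErr || err));
  }
}

const fileStream = new FileChunkStream("data/alphabet.txt");
fileStream.on("error", (err) => console.error(err.message));
fileStream.on("data", (chunk) => console.log("Read bytes:", chunk.length));
fileStream.on("close", () => console.log("File descriptor closed."));
```

Because autoDestroy is enabled by default, _destroy() runs on its own after the stream ends, so the descriptor is closed without an explicit destroy() call.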