Duplex Stream

Duplex streams combine a Readable stream and a Writable stream into one object. The two sides operate independently, each with its own internal buffer. For example, the net.Socket object returned by net.connect() is a Duplex stream.
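As a quick check of the claim above, the following minimal sketch inspects an unconnected net.Socket and tests it against the three stream base classes. The variable names are illustrative only, and CommonJS require is used so the snippet runs as a plain script.

```javascript
const net = require("node:net");
const { Duplex, Readable, Writable } = require("node:stream");

// An unconnected socket is enough to inspect the class relationships.
const socket = new net.Socket();

// A Duplex stream passes the instanceof check for both sides.
const isDuplex = socket instanceof Duplex;
const isReadable = socket instanceof Readable;
const isWritable = socket instanceof Writable;

console.log({ isDuplex, isReadable, isWritable });

// Release the socket; it was never connected.
socket.destroy();
```

All three checks should report true, which is what lets a socket be used both as a source and as a destination in a pipe.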

Duplex stream options

The options object is passed to both the Writable and Readable constructors. In addition to the Readable and Writable option fields, the Duplex options object accepts the following fields:

  1. allowHalfOpen: if set to false, the stream automatically ends the Writable side when the Readable side ends. Default: true.

  2. readable: sets whether the Duplex stream should be readable. Default: true.

  3. writable: sets whether the Duplex stream should be writable. Default: true.

  4. readableObjectMode: sets objectMode for the Readable side of the stream. Has no effect if objectMode is true. Default: false.

  5. writableObjectMode: sets objectMode for the Writable side of the stream. Has no effect if objectMode is true. Default: false.

  6. readableHighWaterMark: sets highWaterMark for the Readable side of the stream. Has no effect if highWaterMark is provided.

  7. writableHighWaterMark: sets highWaterMark for the Writable side of the stream. Has no effect if highWaterMark is provided.
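The options listed above can be sketched as follows. This is only an illustration: the handlers object and the variable names mixed and writeOnly are made up for the example, and the read and write handlers are deliberate no-ops so that only the option handling is visible. CommonJS require is used so the snippet runs as a plain script.

```javascript
const { Duplex } = require("node:stream");

// No-op handlers (hypothetical); only the option handling matters here.
const handlers = {
  read() {},
  write(chunk, encoding, callback) {
    callback();
  },
};

// Readable side emits objects, writable side accepts bytes,
// and each side gets its own buffer limit.
const mixed = new Duplex({
  ...handlers,
  allowHalfOpen: false,
  readableObjectMode: true,
  writableObjectMode: false,
  readableHighWaterMark: 4, // counted in objects
  writableHighWaterMark: 1024, // counted in bytes
});

// Write-only Duplex: the readable side is disabled up front.
const writeOnly = new Duplex({ ...handlers, readable: false });

console.log({
  allowHalfOpen: mixed.allowHalfOpen,
  readableObjectMode: mixed.readableObjectMode,
  writableObjectMode: mixed.writableObjectMode,
  readableHighWaterMark: mixed.readableHighWaterMark,
  writableHighWaterMark: mixed.writableHighWaterMark,
});
console.log({ readable: writeOnly.readable, writable: writeOnly.writable });
```

Each option is reflected in a property of the same name on the stream, so the configuration can be read back after construction.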

Duplex stream implementation

A custom Duplex stream extends stream.Duplex, calls the parent constructor, and implements both the readable._read() and writable._write() methods.

import { Duplex } from "stream";

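class CustomDuplex extends Duplex {
  constructor(options) {
    super(options);
    this.buffer = []; // Chunks written but not yet read
    this.readPending = false; // True while _read is waiting for data
    this.writeEnded = false; // True once end() was called and all writes completed
  }

  // _read is called when the consumer wants more data from the readable side
  _read(size) {
    if (this.buffer.length > 0) {
      // Hand the oldest buffered chunk to the readable side
      this.push(this.buffer.shift());
    } else if (this.writeEnded) {
      // Nothing buffered and no more writes will come: end the readable side
      this.push(null);
    } else {
      // Nothing buffered yet: remember the request so _write or _final can satisfy it
      this.readPending = true;
    }
  }

  // _write is called for each chunk written to the writable side
  _write(chunk, encoding, callback) {
    if (this.readPending) {
      // A read is waiting, so deliver the chunk directly
      this.readPending = false;
      this.push(chunk);
    } else {
      // Otherwise store it until _read asks for it
      this.buffer.push(chunk);
    }
    // Signal that this write is complete
    callback();
  }

  // _final is called after end(), once all pending writes have completed
  _final(callback) {
    this.writeEnded = true;
    if (this.readPending) {
      // A read is waiting and nothing more will arrive: end the readable side
      this.readPending = false;
      this.push(null);
    }
    callback();
  }
}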

const duplex = new CustomDuplex();

duplex.write("First chunk of data");
duplex.write("Second chunk of data");
duplex.end();

duplex.on("data", (chunk) => {
  console.log(`Data received: ${chunk.toString()}`);
});

duplex.on("end", () => {
  console.log("Stream ended");
});

duplex.on("error", (err) => {
  console.error(`Stream error: ${err.message}`);
});
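
For small streams, the same idea can be expressed without a subclass by passing read, write, and final functions in the options object. The sketch below is a hypothetical echo stream, not a replacement for the class above: it pushes each written chunk straight to the readable side and ignores the return value of push(), so it does not handle backpressure. CommonJS require is used so the snippet runs as a plain script.

```javascript
const { Duplex } = require("node:stream");

// Hypothetical echo stream: whatever is written becomes readable.
const echo = new Duplex({
  // Data is pushed from write(), so there is nothing to do on demand.
  read() {},
  write(chunk, encoding, callback) {
    this.push(chunk); // forward the chunk to the readable side
    callback(); // signal that the write is complete
  },
  final(callback) {
    this.push(null); // end the readable side once writing has finished
    callback();
  },
});

echo.write("hello");

// The chunk is already buffered on the readable side, so it can be read back at once.
const firstChunk = echo.read(); // a Buffer, because objectMode is off
console.log(firstChunk.toString());

echo.end();
```

The functions are assigned to the stream as its _read, _write, and _final methods, so this refers to the stream inside them as long as they are not arrow functions.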