Transform stream

Transform streams are Duplex streams that modify or transform data as it passes from the writable side to the readable side. Examples include zlib.createGzip() for compressing data, zlib.createGunzip() for decompressing it, and crypto.createCipheriv() and crypto.createDecipheriv() for encrypting and decrypting data written to them.

The output is not required to be the same size as the input, to consist of the same number of chunks, or to arrive at the same time.
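As a rough illustration of that point, the sketch below sends a highly repetitive buffer through zlib.createGzip() and back through zlib.createGunzip(). The collect() helper and the sample text are assumptions made for this example only; they are not part of the stream API.

```javascript
import { Readable } from "node:stream";
import { createGzip, createGunzip } from "node:zlib";

// Hypothetical helper: gather every chunk a stream produces into one Buffer.
async function collect(stream) {
  const chunks = [];
  for await (const chunk of stream) {
    chunks.push(chunk);
  }
  return Buffer.concat(chunks);
}

// Repetitive sample input, so the compressed form is clearly smaller.
const original = Buffer.from("hello transform streams\n".repeat(200));

// Both zlib streams are Transform streams: bytes are written in, different bytes are read out.
const compressed = await collect(Readable.from([original]).pipe(createGzip()));
const restored = await collect(Readable.from([compressed]).pipe(createGunzip()));

console.log(`original: ${original.length} bytes, compressed: ${compressed.length} bytes`);
console.log(`round trip intact: ${restored.equals(original)}`);
```

The compressed output has a different size from the input, while the decompressed output matches the original byte for byte.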

Transform stream options

The options object is passed to both the Writable and the Readable constructors. In addition to the Readable and Writable option fields, the Transform options object accepts the following fields:

  1. transform: implementation for the _transform() method.

  2. flush: implementation for the _flush() method.
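A minimal sketch of these two options in use, assuming an uppercase conversion and a "[done]" trailer that were chosen purely for illustration. Passing transform and flush in the options object avoids writing a subclass.

```javascript
import { Readable, Transform } from "node:stream";

// Regular functions (not arrow functions) are used so that `this` is the stream.
const shout = new Transform({
  decodeStrings: false, // Writable option: keep written strings as strings
  encoding: "utf8", // Readable option: emit strings instead of Buffers
  transform(chunk, encoding, callback) {
    this.push(chunk.toString().toUpperCase());
    callback();
  },
  flush(callback) {
    // Hypothetical trailer, emitted once after all input has been consumed.
    this.push("[done]\n");
    callback();
  },
});

let shouted = "";
for await (const chunk of Readable.from(["hello ", "world\n"]).pipe(shout)) {
  shouted += chunk;
}

console.log(shouted);
```

The same object carries Writable options (decodeStrings), Readable options (encoding), and the two Transform-specific functions.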

Transform stream implementation

The transform._flush(callback) method will be called when there is no more written data to be consumed, but before the "end" event is emitted signaling the end of the Readable stream. Within the transform._flush() implementation, the transform.push() method may be called zero or more times, as appropriate. The callback function must be called when the flush operation is complete.
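One way to see _flush() at work is a stream that produces nothing until its input ends. The ByteCounter class below is a hypothetical example, not something taken from the Node.js API: it only counts bytes in _transform() and pushes a single summary from _flush().

```javascript
import { Readable, Transform } from "node:stream";

// Hypothetical example class: consumes all input silently, reports once at the end.
class ByteCounter extends Transform {
  constructor(options) {
    super({ ...options, encoding: "utf8" });
    this.total = 0;
  }

  _transform(chunk, encoding, callback) {
    // With default options, written strings arrive here as Buffers.
    this.total += chunk.length;
    callback(); // nothing is pushed for individual chunks
  }

  _flush(callback) {
    // Called after the last chunk, before "end" is emitted on the readable side.
    this.push(`${this.total} bytes\n`);
    callback();
  }
}

let summary = "";
for await (const chunk of Readable.from(["abc", "defgh"]).pipe(new ByteCounter())) {
  summary += chunk;
}

console.log(summary); // "8 bytes" followed by a newline
```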

The transform._transform(chunk, encoding, callback) must be provided by an implementation to accept input and produce output. The transform._transform() implementation handles the bytes being written, computes an output, then passes that output off to the readable portion using the transform.push() method.

The transform.push() method may be called zero or more times to generate output from a single input chunk, depending on how much is to be output as a result of the chunk.
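The following sketch shows push() being called a varying number of times per input chunk. WordSplitter is a hypothetical class, and it assumes for simplicity that no word straddles a chunk boundary. The readable side is put in object mode so that every pushed word is delivered as a separate item.

```javascript
import { Readable, Transform } from "node:stream";

// Hypothetical example class: emits one item per whitespace-separated word.
class WordSplitter extends Transform {
  constructor(options) {
    super({ ...options, readableObjectMode: true });
  }

  _transform(chunk, encoding, callback) {
    for (const word of chunk.toString().split(/\s+/)) {
      if (word) {
        this.push(word); // zero, one, or many pushes per chunk
      }
    }
    callback();
  }
}

const words = [];
const input = Readable.from(["one two  three", "   ", "four"]);
for await (const word of input.pipe(new WordSplitter())) {
  words.push(word);
}

console.log(words); // [ 'one', 'two', 'three', 'four' ]
```

The first chunk results in three pushes, the whitespace-only chunk in none, and the last chunk in one.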

The callback function must be called only when the current chunk is completely consumed. The first argument passed to the callback must be an Error object if an error occurred while processing the input or null otherwise. If a second argument is passed to the callback, it will be forwarded on to the transform.push() method, but only if the first argument is falsy.
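Both uses of the callback can be sketched with a small object-mode stream. The createDoubler() factory and the collect() helper are hypothetical names introduced for this example. On success the result is passed as the second argument instead of calling push(); on failure an Error is passed as the first argument, which destroys the stream.

```javascript
import { Readable, Transform } from "node:stream";

// Hypothetical factory: doubles every numeric value written to the stream.
function createDoubler() {
  return new Transform({
    objectMode: true,
    transform(chunk, encoding, callback) {
      const n = Number(chunk);
      if (Number.isNaN(n)) {
        // First argument is an Error: the chunk failed and the stream errors.
        callback(new Error(`Not a number: ${chunk}`));
        return;
      }
      // Equivalent to calling this.push(n * 2) followed by callback().
      callback(null, n * 2);
    },
  });
}

// Hypothetical helper: collect every item a stream produces.
async function collect(stream) {
  const items = [];
  for await (const item of stream) {
    items.push(item);
  }
  return items;
}

const doubled = await collect(Readable.from([1, "2", 3]).pipe(createDoubler()));
console.log(doubled); // [ 2, 4, 6 ]

let failure = null;
try {
  await collect(Readable.from([1, "oops"]).pipe(createDoubler()));
} catch (err) {
  failure = err;
}
console.log(failure.message); // Not a number: oops
```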

import { Readable, Writable, Transform } from "stream";

class CustomTransform extends Transform {
  constructor(pattern, options) {
    super({ ...options, decodeStrings: false });
    this.pattern = pattern;
    this.incompleteLine = "";
  }

  _transform(chunk, encoding, callback) {
    if (typeof chunk !== "string") {
      callback(new Error("Expected a string to be written"));
      return;
    }

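    // Prepend the leftover from the previous chunk, then split into lines.
    const lines = (this.incompleteLine + chunk).split("\n");
    // The last element is an unfinished line (or "" if the chunk ended with "\n").
    this.incompleteLine = lines.pop();

    // Push only when at least one complete line matches the pattern.
    const matches = lines.filter((l) => this.pattern.test(l));
    if (matches.length > 0) {
      this.push(matches.join("\n") + "\n");
    }

    callback();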
  }

  _flush(callback) {
    // Emit the final line if the input did not end with a newline.
    if (this.incompleteLine && this.pattern.test(this.incompleteLine)) {
      this.push(this.incompleteLine + "\n");
    }

    callback();
  }
}

const readable = new Readable({
  encoding: "utf-8",
  read() {
    this.push("hello world\n");
    this.push("nodejs streams are powerful\n");
    this.push(null);
  },
});

const writable = new Writable({
  write(chunk, encoding, callback) {
    console.log(chunk.toString());
    callback();
  },
});

const transform = new CustomTransform(/js/);
readable.pipe(transform).pipe(writable);