Writable Stream

Writable streams are an abstraction for destinations (sinks) to which data is written. For example, fs.createWriteStream() returns a Writable stream that writes data to a file chunk by chunk.
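As a minimal sketch of the file example above, the snippet below writes two chunks to a file and reads the result back once the stream has finished. The file name and its location in the temporary directory are assumptions made only for illustration.

```javascript
import fs from "fs";
import os from "os";
import path from "path";

// Hypothetical target file in the OS temporary directory.
const introFile = path.join(os.tmpdir(), "writable-intro.txt");
const out = fs.createWriteStream(introFile);

out.on("error", (err) => console.error(err.message));

// "finish" fires after end() was called and all data has been flushed.
out.on("finish", () => {
  console.log(fs.readFileSync(introFile, "utf8"));
});

out.write("first chunk\n");
out.end("last chunk\n"); // end() may carry one final chunk
```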

Writable stream events

  1. "close" event is emitted when the stream and any of its underlying resources have been closed. The event indicates that no more events will be emitted, and no further computation will occur.

  2. "drain" event will be emitted when it is appropriate to resume writing data to the stream after a call to stream.write(chunk) returned false.

  3. "error" event is emitted if an error occurred while writing or piping data. The listener callback is passed a single Error argument when called.

  4. "finish" event is emitted after the stream.end() method has been called, and all data has been flushed to the underlying system.

  5. "pipe" event is emitted when the stream.pipe() method is called on a Readable stream, adding this Writable to its set of destinations.

  6. "unpipe" event is emitted when the stream.unpipe() method is called on a Readable stream, removing this Writable from its set of destinations.
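To make the events above more concrete, here is a small sketch that records the order in which "drain", "finish" and "close" fire. The in-memory sink, its tiny highWaterMark and the seen array are assumptions chosen for the demonstration, not part of any real destination.

```javascript
import { Writable } from "stream";

const seen = [];

// Hypothetical sink with a 4-byte buffer so that a single write fills it.
const eventSink = new Writable({
  highWaterMark: 4,
  write(chunk, encoding, callback) {
    // Complete the write asynchronously, as a real I/O destination would.
    setImmediate(callback);
  },
});

for (const name of ["drain", "finish", "close"]) {
  eventSink.on(name, () => seen.push(name));
}
eventSink.on("error", (err) => console.error(err.message));

const ok = eventSink.write("hello"); // 5 bytes exceed the 4-byte buffer
console.log(ok); // false, so wait for "drain" before writing more

eventSink.once("drain", () => eventSink.end());
eventSink.on("close", () => console.log(seen.join(" -> ")));
```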

Writable stream methods

  1. writable.cork() method forces all written data to be buffered in memory until writable.uncork() or writable.end() is called. Implementing the writable._writev() method lets the stream process such batched data in a single call.

  2. writable.destroy([error]) destroys the stream. If an error is passed, an "error" event is emitted; a "close" event is emitted afterwards unless the emitClose option is false. Note that previous calls to the writable.write() method may not have drained yet and could cause errors. If pending data should be flushed first, call writable.end() instead, or wait for the "drain" event before destroying the stream.

  3. writable.end() method signals that no more data will be written to the Writable. The optional chunk and encoding arguments allow one final additional chunk of data to be written immediately before closing the stream.

  4. writable.setDefaultEncoding(encoding) method sets the default encoding for a Writable stream.

  5. writable.uncork() method flushes all data buffered since writable.cork() was called. If the writable.cork() method is called multiple times on a stream, the same number of calls to writable.uncork() must be called to flush the buffered data.

  6. writable.write(chunk[, encoding][, callback]) method writes data to the stream and calls the supplied callback once the data has been fully handled. If an error occurs, the callback is called with the error as its first argument. The callback is called asynchronously and before "error" is emitted. The return value is true if, after admitting chunk, the internal buffer is still below the highWaterMark configured when the stream was created. If false is returned, stop writing to the stream until the "drain" event is emitted, which happens once all currently buffered chunks have been drained (accepted for delivery by the operating system).
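The interplay of cork(), uncork() and _writev() can be sketched as follows. The batches array and the stream name are hypothetical; the stream implements only writev(), which is enough for this demonstration.

```javascript
import { Writable } from "stream";

const batches = [];

// Hypothetical stream that records every batch it receives.
const batched = new Writable({
  // Receives all chunks that were buffered while the stream was corked.
  writev(chunks, callback) {
    batches.push(chunks.map(({ chunk }) => chunk.toString()));
    callback();
  },
});

batched.cork();
batched.write("a");
batched.write("b");
batched.write("c");

// Deferring uncork() lets every write made in this tick join one batch.
process.nextTick(() => {
  batched.uncork();
  console.log(batches); // [ [ 'a', 'b', 'c' ] ]
  batched.end();
});
```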

Writable stream properties

  1. writable.closed is true after the "close" event has been emitted.

  2. writable.destroyed is true after writable.destroy() has been called.

  3. writable.writable is true when it is safe to call writable.write(), which means the stream has not been destroyed, errored, or ended.

  4. writable.writableEnded is true after writable.end() has been called.

  5. writable.writableCorked is the number of times writable.uncork() needs to be called in order to fully uncork the stream.

  6. writable.errored returns the error if the stream has been destroyed with an error.

  7. writable.writableFinished is set to true immediately before the "finish" event is emitted.

  8. writable.writableHighWaterMark returns the value of highWaterMark passed when creating the stream.

  9. writable.writableLength is the number of bytes (or objects) in the queue ready to be written.

  10. writable.writableNeedDrain is true if the stream's buffer has been full and the stream will emit a "drain" event.

  11. writable.writableObjectMode is the getter for the property objectMode.
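A short sketch of how some of these properties change over the life of a stream. The probe stream is hypothetical and completes each write asynchronously so that the queued state can be observed.

```javascript
import { Writable } from "stream";

// Hypothetical stream whose writes complete on a later tick.
const probe = new Writable({
  write(chunk, encoding, callback) {
    setImmediate(callback);
  },
});

probe.write("abc");
console.log(probe.writableLength); // 3 (bytes not yet handled)
console.log(probe.writable, probe.writableEnded); // true false

probe.end();
// After end() the stream is no longer writable, but not yet finished.
console.log(probe.writable, probe.writableEnded, probe.writableFinished); // false true false

probe.on("finish", () => {
  console.log(probe.writableFinished, probe.writableLength); // true 0
});
```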

Writable stream options

When creating a Writable stream implementation you can pass an options object to customize the behavior of the stream.

  1. highWaterMark: buffer level at which stream.write() starts returning false. Default: 65536 (64 KiB) in Node.js 22 and later, 16384 (16 KiB) in earlier versions, or 16 for objectMode streams.

  2. decodeStrings: whether to encode strings passed to writable.write() into Buffers (using the encoding given in the write() call) before passing them to _write(). Default: true.

  3. defaultEncoding: default encoding used when no encoding is specified in the argument to writable.write() method call. Default: "utf8".

  4. objectMode: whether to use the object mode. Default: false.

  5. emitClose: whether to emit "close" event after the stream has been destroyed. Default: true.

  6. write(): implementation for the _write() method.

  7. writev(): implementation for the _writev() method.

  8. destroy(): implementation for the _destroy() method.

  9. final(): implementation for the _final() method.

  10. construct(callback): implementation for the _construct() method.

  11. autoDestroy: whether the stream should automatically call destroy() on itself after ending. Default: true.

  12. signal: an AbortSignal representing possible cancellation. Aborting it destroys the stream.
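The following sketch passes a few of these options to the Writable constructor. Both streams and the arrays they fill are hypothetical and exist only to show the effect of objectMode, highWaterMark and decodeStrings.

```javascript
import { Writable } from "stream";

const records = [];

// Object mode: chunks are arbitrary values and highWaterMark counts objects.
const collector = new Writable({
  objectMode: true,
  highWaterMark: 2,
  write(record, encoding, callback) {
    records.push(record);
    callback();
  },
});

console.log(collector.writableObjectMode); // true
console.log(collector.writableHighWaterMark); // 2
collector.write({ id: 1 });
collector.end({ id: 2 });
collector.on("finish", () => console.log(records)); // [ { id: 1 }, { id: 2 } ]

const received = [];

// decodeStrings: false passes strings to write() without converting them.
const raw = new Writable({
  decodeStrings: false,
  write(chunk, encoding, callback) {
    received.push(typeof chunk, encoding);
    callback();
  },
});

raw.write("hello", "latin1");
console.log(received); // [ 'string', 'latin1' ]
raw.end();
```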

Writable stream implementation

The writable._construct(callback) method is called in a tick after the stream constructor has returned, delaying any _write(), _final() and _destroy() calls until callback is called. This is useful to initialize state or asynchronously initialize resources before the stream can be used.

The writable._write(chunk, encoding, callback) method is called when there is data to be written to the stream. The chunk is the data to be written, encoding is the encoding type for the chunk if it is a string, and callback is a function that should be called when the write operation is complete.

The writable._writev(chunks, callback) method may be implemented in addition or alternatively to _write() in stream implementations that are capable of processing multiple chunks of data at once. If implemented and if there is buffered data from previous writes, _writev() will be called instead of _write().

The writable._destroy(err, callback) method is called by writable.destroy(). Override it to release underlying resources; err is the error passed to destroy(), if any, and callback must be called once cleanup is done. It must not be called directly.

The writable._final(callback) method is called after writable.end() has been called and all pending writes have completed, delaying the "finish" event until callback is called. This is useful to close resources or write buffered data before a stream ends.

import { Writable } from "stream";
import { StringDecoder } from "string_decoder";

export default class CustomWritable extends Writable {
  constructor(options) {
    super(options);
    this._decoder = new StringDecoder(options && options.defaultEncoding);
    this.data = "";
  }

  _write(chunk, encoding, callback) {
    try {
      if (encoding === "buffer") {
        chunk = this._decoder.write(chunk);
      }
      this.data += chunk;
      callback();
    } catch (err) {
      callback(err);
    }
  }

  _final(callback) {
    try {
      this.data += this._decoder.end();
      callback();
    } catch (err) {
      callback(err);
    }
  }
}

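const writable = new CustomWritable();

// The three bytes of the UTF-8 encoded euro sign arrive in separate chunks;
// StringDecoder holds incomplete sequences until the character is complete.
writable.on("finish", () => {
  console.log(writable.data); // €
});
writable.write(Buffer.from([0xe2]));
writable.write(Buffer.from([0x82]));
writable.end(Buffer.from([0xac]));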
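
The example above covers _write() and _final(). The _construct() and _destroy() methods described earlier could be sketched as below: a file descriptor is opened asynchronously before the first write and closed when the stream is destroyed. The FileSink class name and the temporary file are assumptions made for illustration, and the sketch does not handle partial writes.

```javascript
import fs from "fs";
import os from "os";
import path from "path";
import { Writable } from "stream";

// Hypothetical stream that manages its own file descriptor.
class FileSink extends Writable {
  constructor(filename) {
    super();
    this.filename = filename;
    this.fd = null;
  }

  // Runs before any _write(); writes are queued until callback is called.
  _construct(callback) {
    fs.open(this.filename, "w", (err, fd) => {
      if (err) return callback(err);
      this.fd = fd;
      callback();
    });
  }

  _write(chunk, encoding, callback) {
    fs.write(this.fd, chunk, callback);
  }

  // Releases the descriptor and forwards any error to the callback.
  _destroy(err, callback) {
    if (this.fd === null) return callback(err);
    fs.close(this.fd, (closeErr) => callback(closeErr || err));
  }
}

const target = path.join(os.tmpdir(), "file-sink-demo.txt");
const fileSink = new FileSink(target);

fileSink.on("error", (err) => console.error(err.message));
fileSink.on("close", () => {
  console.log(fs.readFileSync(target, "utf8")); // hello world
});

fileSink.write("hello ");
fileSink.end("world");
```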

Handling backpressure

The write() method returns true if the internal buffer to which the data has been written is not yet full. Otherwise, if the internal buffer is full, it returns false. This return value is a backpressure signal, telling you that data is being written more quickly than it can be handled.

The proper response to this backpressure is to stop calling the write() method until the Writable stream emits the "drain" event, signaling that there is room in the buffer again.

The following example handles backpressure in two ways: writeFile() uses a callback and listens for the "drain" event directly, while copy() wraps the wait for "drain" in a Promise. The example assumes that a data directory already exists.

import fs from "fs";

function writeFile(filename, callback) {
  const writable = fs.createWriteStream(filename);

  writable.on("error", (err) => {
    writable.destroy();
    callback(err);
  });

  writable.on("finish", callback);

  function getChunk(i) {
    const chunk = Buffer.alloc(65536, i);
    chunk.writeUInt8(10, chunk.length - 1);
    return chunk;
  }

  const idxLetterA = 65;
  const idxLetterZ = 90;
  let i = idxLetterA;

  function writeAlphabet() {
    let bufferHasRoom = true;

    // Write until the buffer fills up or the alphabet is exhausted.
    while (bufferHasRoom && i <= idxLetterZ) {
      const chunk = getChunk(i++);
      bufferHasRoom = writable.write(chunk);
    }

    if (i <= idxLetterZ) {
      // The buffer is full: resume once the stream has drained.
      writable.once("drain", writeAlphabet);
    } else {
      writable.end();
    }
  }

  writeAlphabet();
}

writeFile("data/alphabet.txt", (err) => {
  if (err) {
    console.error(err.message);
  } else {
    console.log("File was written successfully");
    copy();
  }
});

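// Writes a chunk and returns a Promise that resolves when it is safe
// to write again: immediately if the buffer has room, else on "drain".
function write(writable, chunk) {
  const ok = writable.write(chunk);

  if (ok) {
    return Promise.resolve(null);
  } else {
    return new Promise((resolve) => {
      writable.once("drain", resolve);
    });
  }
}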

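async function copy() {
  const source = fs.createReadStream("data/alphabet.txt");
  const destination = fs.createWriteStream("data/copy.txt");

  function handleError(err) {
    source.destroy();
    destination.destroy();
    console.error(err.message);
  }

  // A destination error is reported here; a source error makes the
  // for await loop below throw and is reported by the catch block.
  destination.on("error", handleError);

  try {
    for await (const chunk of source) {
      await write(destination, chunk);
    }
  } catch (err) {
    handleError(err);
    return;
  }

  // Report success only after all data has been flushed.
  destination.on("finish", () => console.log("File copied successfully"));
  destination.end();
}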
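
As a variation on the write() helper above, the wait for "drain" can also be expressed with once() from the events module, which returns a Promise that rejects if the stream emits "error" while waiting. The writeAll() function and the slow in-memory stream below are hypothetical and serve only as a sketch.

```javascript
import { once } from "events";
import { Writable } from "stream";

// Hypothetical helper: writes every chunk while respecting backpressure.
async function writeAll(writable, chunks) {
  for (const chunk of chunks) {
    if (!writable.write(chunk)) {
      // Buffer is full: pause until the stream has drained.
      await once(writable, "drain");
    }
  }
  writable.end();
  await once(writable, "finish");
}

let total = 0;

// Slow destination with an 8-byte buffer, so each 10-byte chunk fills it.
const slow = new Writable({
  highWaterMark: 8,
  write(chunk, encoding, callback) {
    total += chunk.length;
    setTimeout(callback, 5);
  },
});

await writeAll(slow, ["0123456789", "0123456789", "0123456789"]);
console.log(total); // 30
```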