Writable Stream
Writable streams are sinks, or destinations, for data. For example, fs.createWriteStream() returns a Writable stream that saves the data written to it into a file, chunk by chunk.
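As a minimal sketch of the file example, assuming a throwaway path in the system temp directory (greetingPath, fileStream and greetingWritten are names chosen here for illustration):

```javascript
import fs from "fs";
import os from "os";
import path from "path";

// Hypothetical output location; any writable path would do.
const greetingPath = path.join(os.tmpdir(), "writable-greeting.txt");
const fileStream = fs.createWriteStream(greetingPath);

// Each write() hands one chunk to the stream. end() writes the last chunk
// and signals that no more data will follow.
fileStream.write("Hello, ");
fileStream.write("writable ");
fileStream.end("streams!\n");

// "finish" fires once every chunk has been flushed to the file.
const greetingWritten = new Promise((resolve, reject) => {
  fileStream.on("finish", resolve);
  fileStream.on("error", reject);
});

greetingWritten.then(() => console.log(`Wrote ${greetingPath}`));
```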
Writable stream events
- "close" event is emitted when the stream and any of its underlying resources have been closed. The event indicates that no more events will be emitted, and no further computation will occur.
- "drain" event is emitted when it is appropriate to resume writing data to the stream after a call to stream.write(chunk) returned false.
- "error" event is emitted if an error occurred while writing or piping data. The listener callback is passed a single Error argument when called.
- "finish" event is emitted after the stream.end() method has been called, and all data has been flushed to the underlying system.
- "pipe" event is emitted when the stream.pipe() method is called on a Readable stream, adding this Writable to its set of destinations.
- "unpipe" event is emitted when the stream.unpipe() method is called on a Readable stream, removing this Writable from its set of destinations.
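The order of the lifecycle events can be observed with a small sketch. It assumes an in-memory sink that simply discards its input and relies on the default autoDestroy and emitClose behavior; sink, seenEvents and closed are illustrative names, not part of the stream API.

```javascript
import { Writable } from "stream";

// Records the order in which lifecycle events fire.
const seenEvents = [];

const sink = new Writable({
  // Minimal sink: discard every chunk and report success.
  write(chunk, encoding, callback) {
    callback();
  },
});

sink.on("finish", () => seenEvents.push("finish"));
sink.on("close", () => seenEvents.push("close"));
sink.on("error", (err) => console.error("write failed:", err.message));

// Registered after the recording listener, so the log is complete on resolve.
const closed = new Promise((resolve) => sink.on("close", resolve));

sink.write("some data");
sink.end();

closed.then(() => console.log(seenEvents)); // [ 'finish', 'close' ]
```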
Writable stream methods
- writable.cork() method forces all written data to be buffered in memory. The buffered data is flushed when writable.uncork() or writable.end() is called. Implement the writable._writev() method to work with such batched data.
- writable.destroy() method destroys the stream. It optionally emits an "error" event, and then emits a "close" event. Note that previous calls to the writable.write() method may not have drained yet and could cause errors, so if the data must be flushed, consider using writable.end() or destroying the stream upon the "drain" event instead.
- writable.end() method signals that no more data will be written to the Writable. The optional chunk and encoding arguments allow one final additional chunk of data to be written immediately before closing the stream.
- writable.setDefaultEncoding(encoding) method sets the default encoding for a Writable stream.
- writable.uncork() method flushes all data buffered since writable.cork() was called. If the writable.cork() method is called multiple times on a stream, writable.uncork() must be called the same number of times to flush the buffered data.
- writable.write(chunk) method writes data to the stream, and calls the supplied callback once the data has been fully handled. If an error occurs, the callback is called with the error as its first argument. The callback is called asynchronously and before "error" is emitted. The return value is true if, after admitting chunk, the internal buffer is smaller than the highWaterMark configured when the stream was created. If false is returned, further attempts to write data to the stream should stop until the "drain" event is emitted. Once all currently buffered chunks are drained (accepted for delivery by the operating system), the "drain" event is emitted.
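How cork(), uncork() and _writev() interact can be sketched as follows. This assumes an in-memory stream that only records what it receives; batchingSink, batches and batchingDone are hypothetical names used for the illustration.

```javascript
import { Writable } from "stream";

// Each entry is the list of chunks delivered in one call.
const batches = [];

const batchingSink = new Writable({
  // Used when a single chunk is delivered on its own.
  write(chunk, encoding, callback) {
    batches.push([chunk.toString()]);
    callback();
  },
  // Used when several buffered chunks are delivered together.
  writev(chunks, callback) {
    batches.push(chunks.map(({ chunk }) => chunk.toString()));
    callback();
  },
});

const batchingDone = new Promise((resolve) => batchingSink.on("finish", resolve));

batchingSink.cork(); // start buffering in memory
batchingSink.write("a");
batchingSink.write("b");
batchingSink.write("c");
batchingSink.uncork(); // flush the three chunks as one batch
batchingSink.end();

batchingDone.then(() => console.log(batches)); // [ [ 'a', 'b', 'c' ] ]
```

Without the cork() call, the first chunk would go through write() on its own, because nothing is buffered when it arrives.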
Writable stream properties
- writable.closed is true after the "close" event has been emitted.
- writable.destroyed is true after writable.destroy() has been called.
- writable.writable is true when it is safe to call writable.write(), which means the stream has not been destroyed, errored, or ended.
- writable.writableEnded is true after writable.end() has been called.
- writable.writableCorked is the number of times writable.uncork() needs to be called in order to fully uncork the stream.
- writable.errored returns the error if the stream has been destroyed with an error.
- writable.writableFinished is set to true immediately before the "finish" event is emitted.
- writable.writableHighWaterMark returns the value of the highWaterMark property.
- writable.writableLength is the number of bytes (or objects) in the queue ready to be written.
- writable.writableNeedDrain is true if the stream's buffer has been full and the stream will emit a "drain" event.
- writable.writableObjectMode is the getter for the objectMode property.
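The properties can be inspected at different points in a stream's life. The sketch below assumes a deliberately tiny highWaterMark and a destination that completes each write on a later tick; slowSink and the three snapshot objects are names made up for this example.

```javascript
import { Writable } from "stream";

const slowSink = new Writable({
  highWaterMark: 4, // deliberately tiny so one write fills the buffer
  write(chunk, encoding, callback) {
    setImmediate(callback); // simulate a slow destination
  },
});

const accepted = slowSink.write("abcdef"); // 6 bytes, above highWaterMark
const whileBuffered = {
  accepted, // false
  writableLength: slowSink.writableLength, // 6
  writableNeedDrain: slowSink.writableNeedDrain, // true
  writableHighWaterMark: slowSink.writableHighWaterMark, // 4
};

slowSink.end();
const afterEnd = {
  writable: slowSink.writable, // false: no more writes allowed
  writableEnded: slowSink.writableEnded, // true
  writableFinished: slowSink.writableFinished, // false: data still pending
};

const afterClose = new Promise((resolve) => {
  slowSink.on("close", () => {
    resolve({
      writableFinished: slowSink.writableFinished, // true
      closed: slowSink.closed, // true
      destroyed: slowSink.destroyed, // true (autoDestroy defaults to true)
    });
  });
});

afterClose.then((state) => console.log(whileBuffered, afterEnd, state));
```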
Writable stream options
When creating a Writable stream implementation you can pass an options object to customize the behavior of the stream.
- highWaterMark: internal buffer level at which stream.write() starts returning false. Default: 65536 (64 KiB), or 16 for objectMode streams.
- decodeStrings: whether to encode strings passed to stream.write() into Buffers before handing them to _write(), or leave strings as they are. Default: true.
- defaultEncoding: default encoding used when no encoding is specified in the argument to the writable.write() method call. Default: "utf8".
- objectMode: whether to use the object mode. Default: false.
- emitClose: whether to emit the "close" event after the stream has been destroyed. Default: true.
- write(): implementation for the _write() method.
- writev(): implementation for the _writev() method.
- destroy(): implementation for the _destroy() method.
- final(): implementation for the _final() method.
- construct(callback): implementation for the _construct() method.
- autoDestroy: whether the stream should automatically call destroy() on itself after ending. Default: true.
- signal: an AbortSignal for possible cancellation.
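A short sketch of how some of these options change what the write() implementation receives. Both streams are in-memory stand-ins, and textSink, recordSink and the arrays that collect their input are hypothetical names.

```javascript
import { Writable } from "stream";

// decodeStrings: false keeps strings as strings and passes their encoding on.
const seenChunks = [];
const textSink = new Writable({
  decodeStrings: false,
  defaultEncoding: "latin1",
  write(chunk, encoding, callback) {
    seenChunks.push({ type: typeof chunk, encoding });
    callback();
  },
});
textSink.write("plain text");
console.log(seenChunks); // [ { type: 'string', encoding: 'latin1' } ]

// objectMode: true accepts arbitrary values; highWaterMark counts objects.
const receivedRecords = [];
const recordSink = new Writable({
  objectMode: true,
  highWaterMark: 2,
  write(record, encoding, callback) {
    receivedRecords.push(record);
    callback();
  },
});
recordSink.write({ id: 1 });
recordSink.write({ id: 2 });
console.log(recordSink.writableObjectMode, receivedRecords.length); // true 2
```

With the default decodeStrings: true, the first stream would instead receive a Buffer and the encoding "buffer".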
Writable stream implementation
The writable._construct(callback) method is called in a tick after the stream constructor has returned, delaying any _write(), _final() and _destroy() calls until callback is called. This is useful to initialize state or asynchronously initialize resources before the stream can be used.
The writable._write(chunk, encoding, callback) method is called when there is data to be written to the stream. The chunk is the data to be written, encoding is the encoding of the chunk if it is a string, and callback is a function that must be called when the write operation is complete.
The writable._writev(chunks, callback) method may be implemented in addition to, or instead of, _write() in stream implementations that are capable of processing multiple chunks of data at once. If implemented and if there is buffered data from previous writes, _writev() will be called instead of _write().
The writable._destroy(err, callback) method is called by writable.destroy(). It is the place for cleanup logic, and it receives the error passed to writable.destroy(), if any.
The writable._final(callback) method is called before the stream closes, delaying the "finish" event until callback is called. This is useful to close resources or write buffered data before a stream ends.
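import { Writable } from "stream";
import { StringDecoder } from "string_decoder";

// Collects everything written to it into a single string.
export default class CustomWritable extends Writable {
  constructor(options) {
    super(options);
    // The decoder keeps incomplete multi-byte characters between chunks.
    this._decoder = new StringDecoder(options && options.defaultEncoding);
    this.data = "";
  }

  _write(chunk, encoding, callback) {
    try {
      // Buffers arrive with the pseudo-encoding "buffer" and must be decoded.
      if (encoding === "buffer") {
        chunk = this._decoder.write(chunk);
      }
      this.data += chunk;
      callback();
    } catch (err) {
      callback(err);
    }
  }

  _final(callback) {
    try {
      // Flush any bytes the decoder is still holding.
      this.data += this._decoder.end();
      callback();
    } catch (err) {
      callback(err);
    }
  }
}

const writable = new CustomWritable();

// Read the result only after "finish", when _final() has completed.
writable.on("finish", () => {
  console.log(writable.data); // prints "€"
});

// The three bytes of the euro sign arrive in separate chunks.
writable.write(Buffer.from([0xe2]));
writable.write(Buffer.from([0x82]));
writable.end(Buffer.from([0xac]));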
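The _construct() and _destroy() hooks described earlier can be sketched with a stream that owns a file descriptor. This is only an illustration under simple assumptions: FileSink, sinkPath and sinkClosed are hypothetical names, the target is a scratch file in the system temp directory, and partial writes are not handled.

```javascript
import { Writable } from "stream";
import fs from "fs";
import os from "os";
import path from "path";

class FileSink extends Writable {
  constructor(filename, options) {
    super(options);
    this.filename = filename;
    this.fd = null;
  }

  // Open the file before any _write() call is allowed to run.
  _construct(callback) {
    fs.open(this.filename, "w", (err, fd) => {
      if (err) {
        callback(err);
      } else {
        this.fd = fd;
        callback();
      }
    });
  }

  _write(chunk, encoding, callback) {
    fs.write(this.fd, chunk, callback);
  }

  // Release the descriptor whether the stream ended normally or failed.
  _destroy(err, callback) {
    if (this.fd === null) {
      callback(err);
    } else {
      fs.close(this.fd, (closeErr) => callback(closeErr || err));
    }
  }
}

const sinkPath = path.join(os.tmpdir(), "file-sink-demo.txt");
const fileSink = new FileSink(sinkPath);

const sinkClosed = new Promise((resolve, reject) => {
  fileSink.on("close", resolve);
  fileSink.on("error", reject);
});

// Both calls happen before the file is open; the stream buffers them
// until _construct() has called back.
fileSink.write("queued before open, ");
fileSink.end("written after open\n");
```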
Handling backpressure
The write() method returns true if the internal buffer to which the data has been written is not yet full. Otherwise, if the internal buffer is full, it returns false. This return value is a backpressure signal, telling you that data is being written more quickly than it can be handled.
The proper response to this backpressure is to stop calling the write() method until the Writable stream emits the "drain" event, signaling that there is room in the buffer.
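The signal itself is easy to reproduce in isolation. The sketch below assumes a 3-byte highWaterMark and a destination that finishes each write on a later tick; throttled, backpressureLog and drained are illustrative names.

```javascript
import { Writable } from "stream";

const backpressureLog = [];

const throttled = new Writable({
  highWaterMark: 3, // bytes
  write(chunk, encoding, callback) {
    setImmediate(callback); // simulate a slow destination
  },
});

// 2 bytes buffered, still below the limit.
backpressureLog.push(throttled.write("ab"));
// 4 bytes buffered, the limit is exceeded: stop writing.
backpressureLog.push(throttled.write("cd"));

const drained = new Promise((resolve) => {
  throttled.once("drain", () => {
    backpressureLog.push("drain"); // the buffer is empty again
    throttled.end();
    resolve();
  });
});

drained.then(() => console.log(backpressureLog)); // [ true, false, 'drain' ]
```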
The following example handles backpressure in two ways: writeFile() waits for "drain" with an event listener, while copy() wraps the same wait in a Promise.
import fs from "fs";

// Writes the letters A to Z to a file, one 64 KiB line per letter.
function writeFile(filename, callback) {
  const writable = fs.createWriteStream(filename);

  writable.on("error", (err) => {
    writable.destroy();
    callback(err);
  });
  writable.on("finish", callback);

  // Builds a chunk filled with one letter and terminated by a newline.
  function getChunk(i) {
    const chunk = Buffer.alloc(65536, i);
    chunk.writeUInt8(10, chunk.length - 1);
    return chunk;
  }

  const idxLetterA = 65;
  const idxLetterZ = 90;
  let i = idxLetterA;

  function writeAlphabet() {
    let bufferHasRoom = true;
    // Check the letter bound on every iteration, not only once up front.
    while (bufferHasRoom && i <= idxLetterZ) {
      bufferHasRoom = writable.write(getChunk(i++));
    }
    if (i <= idxLetterZ) {
      // The buffer is full: continue once the stream has drained.
      writable.once("drain", writeAlphabet);
    } else {
      writable.end();
    }
  }

  writeAlphabet();
}

writeFile("data/alphabet.txt", (err) => {
  if (err) {
    console.error(err.message);
  } else {
    console.log("File was written successfully");
    copy();
  }
});

// Resolves immediately if the buffer has room, otherwise on "drain".
function write(writable, chunk) {
  if (writable.write(chunk)) {
    return Promise.resolve();
  }
  return new Promise((resolve) => {
    writable.once("drain", resolve);
  });
}

async function copy() {
  const source = fs.createReadStream("data/alphabet.txt");
  const destination = fs.createWriteStream("data/copy.txt");

  function handleError(err) {
    source.destroy();
    destination.destroy();
    console.error(err.message);
  }

  // Write errors arrive as "error" events on the destination.
  destination.on("error", handleError);
  // Report success only after all data has been flushed.
  destination.on("finish", () => console.log("File copied successfully"));

  try {
    for await (const chunk of source) {
      await write(destination, chunk);
    }
    destination.end();
  } catch (err) {
    // Read errors are thrown by the async iterator.
    handleError(err);
  }
}
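When the goal is simply to move data from a Readable to a Writable, the pipeline() helper from "stream/promises" is a possible alternative to writing the "drain" handling by hand, since it pauses the source while the destination is full and rejects if either stream fails. The sketch below swaps the files for in-memory streams so it stays self-contained; slowDestination, collected and copyDone are hypothetical names.

```javascript
import { Readable, Writable } from "stream";
import { pipeline } from "stream/promises";

const collected = [];

const slowDestination = new Writable({
  highWaterMark: 1, // forces backpressure after every chunk
  write(chunk, encoding, callback) {
    collected.push(chunk.toString());
    setImmediate(callback); // simulate a slow destination
  },
});

// pipeline() resolves once the destination has finished,
// and rejects if either stream emits an error.
const copyDone = pipeline(Readable.from(["a", "b", "c"]), slowDestination);

copyDone
  .then(() => console.log(collected.join(""))) // abc
  .catch((err) => console.error(err.message));
```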