Skip to content

Fix: Node.js Stream pipeline() Not Working — Backpressure, Error Propagation, AbortSignal, and Web Streams Interop

FixDevs ·

Quick Answer

How to fix Node.js stream/promises pipeline errors — uncaught stream errors, backpressure ignored, AbortSignal not propagating, async iterators in pipeline, Transform stream object mode, and converting between Node and Web Streams.

The Error

You write a streaming pipeline and an error crashes your process:

import { createReadStream, createWriteStream } from "node:fs";

const src = createReadStream("input.bin");
const dst = createWriteStream("output.bin");
src.pipe(dst);
// If src errors mid-read, dst leaks an open file descriptor and the process can crash.

Or pipeline() swallows your Transform’s errors:

import { pipeline } from "node:stream/promises";

await pipeline(
  createReadStream("input.txt"),
  new Transform({ transform(chunk, enc, cb) { throw new Error("nope"); } }),
  createWriteStream("output.txt"),
);
// pipeline awaits and rejects — but only if you use the promise version.

Or AbortSignal doesn’t cancel a long-running pipeline:

const controller = new AbortController();
setTimeout(() => controller.abort(), 1000);

await pipeline(slowSource, slowSink);
// Keeps running until completion — signal not wired.

Or you mix Node and Web Streams and types break:

const webReadable: ReadableStream = ...;
await pipeline(webReadable, createWriteStream("out.bin"));
// TypeError: source.on is not a function

Why This Happens

Node.js streams have three layers of API:

  • Legacy .pipe() from EventEmitter days. Doesn’t handle errors well. Error in any stream leaves others dangling.
  • stream.pipeline(...) callback form. Handles cleanup and error propagation.
  • stream/promises pipeline() — same but returns a Promise. The modern default.

Backpressure (the consumer pausing the producer when it can’t keep up) is automatic with pipeline(). With pipe() you have to wire it manually.

Web Streams (ReadableStream/WritableStream) are a separate, browser-compatible API. Node has both — interop requires conversion via Readable.fromWeb() / Readable.toWeb().

Fix 1: Use pipeline() From stream/promises

The modern pattern:

import { pipeline } from "node:stream/promises";
import { createReadStream, createWriteStream } from "node:fs";
import { createGzip } from "node:zlib";

await pipeline(
  createReadStream("input.txt"),
  createGzip(),
  createWriteStream("output.txt.gz"),
);

pipeline():

  • Wires all the streams together.
  • Closes downstream when upstream finishes.
  • Closes upstream when downstream fails.
  • Surfaces the first error as a rejection.
  • Cleans up file descriptors on error.

For a stream that fails partway:

try {
  await pipeline(src, transform, dst);
  console.log("done");
} catch (err) {
  console.error("pipeline failed:", err);
}

Inside the try block, you don’t need stream-specific error handling — pipeline collects them all.

Pro Tip: Always prefer stream/promises pipeline() over the callback stream.pipeline() and certainly over .pipe(). The promise form composes with async/await and proper try/catch.

Fix 2: Backpressure Just Works (Mostly)

With pipeline(), backpressure is automatic:

const src = createReadStream("huge.bin");        // 10 GB file
const transform = new Transform({ ... });        // Slow
const dst = createWriteStream("output.bin");

await pipeline(src, transform, dst);
// src reads only as fast as transform can process.
// transform writes only as fast as dst accepts.
// Memory usage stays bounded.

With raw .pipe(), you need to respect write()’s return value:

// Without backpressure (bad):
src.on("data", (chunk) => dst.write(chunk));
// src may emit faster than dst drains; memory grows.

// With backpressure (good):
src.on("data", (chunk) => {
  const ok = dst.write(chunk);
  if (!ok) src.pause();
});
dst.on("drain", () => src.resume());

This is exactly what pipeline() does for you.

Common Mistake: Using for await on a stream then writing without checking backpressure:

// Bad — accumulates memory:
for await (const chunk of src) {
  dst.write(chunk);   // Doesn't wait for dst to drain
}

// Good — explicit await:
for await (const chunk of src) {
  if (!dst.write(chunk)) {
    await new Promise((resolve) => dst.once("drain", resolve));
  }
}

Or just use pipeline().

Fix 3: Transform Streams With Error Handling

import { Transform } from "node:stream";

const upper = new Transform({
  transform(chunk, encoding, callback) {
    try {
      const result = chunk.toString().toUpperCase();
      callback(null, result);
    } catch (err) {
      callback(err);
    }
  },
});

await pipeline(src, upper, dst);

Three rules for Transform:

  1. Always call callback. Forgetting to call it hangs the pipeline.
  2. Pass errors as the first arg. callback(err) — pipeline catches it.
  3. Pass output as the second arg. callback(null, chunk) — emits the transformed chunk.

For async transforms:

const fetchEnriched = new Transform({
  objectMode: true,
  async transform(item, encoding, callback) {
    try {
      const data = await fetchExternal(item.id);
      callback(null, { ...item, data });
    } catch (err) {
      callback(err);
    }
  },
});

objectMode: true lets the stream pass JS objects instead of strings/Buffers.

For higher-order patterns, use compose():

import { compose, pipeline } from "node:stream/promises";

const combined = compose(filter, enrich, format);
await pipeline(src, combined, dst);

compose returns a single Transform that chains the inputs.

Fix 4: AbortSignal for Cancellation

const controller = new AbortController();
setTimeout(() => controller.abort(), 5000);

try {
  await pipeline(src, transform, dst, { signal: controller.signal });
} catch (err) {
  if (err.name === "AbortError") {
    console.log("cancelled");
  } else {
    throw err;
  }
}

The signal option propagates to all streams; abort destroys them all and rejects the promise.

For interactive cancellation (e.g. user clicks cancel):

const controller = new AbortController();

uiCancelButton.onclick = () => controller.abort();

await pipeline(networkRead, transform, fileWrite, { signal: controller.signal });

The abort closes the network socket, stops the transform, closes the file. Cleanup is automatic.

Common Mistake: Wrapping pipeline() in a setTimeout(..., timeout) and trying to manually destroy streams on timeout. Use the signal option — it’s cleaner and handles cleanup correctly.

Fix 5: Async Iterators as Pipeline Sources

Any async iterable can be the source:

async function* generateLines() {
  for (let i = 0; i < 1000; i++) {
    yield `line ${i}\n`;
  }
}

await pipeline(generateLines, createWriteStream("output.txt"));

pipeline accepts:

  • Readable streams.
  • Async generators / iterables.
  • Strings, Buffers (one-shot).
  • Functions returning iterables.

For transforms via async generators:

async function* upperCase(source) {
  for await (const chunk of source) {
    yield chunk.toString().toUpperCase();
  }
}

await pipeline(
  createReadStream("input.txt"),
  upperCase,
  createWriteStream("output.txt"),
);

This is often clearer than a Transform subclass — straight async code with yield for emitting.

Pro Tip: For data-processing pipelines with multiple steps, async generators are easier to write and debug than Transform streams. The Transform class has its place when you need fine-grained control (writev, finalv, etc.).

Fix 6: Web Streams Interop

Convert between Node and Web Streams with the static helpers:

import { Readable, Writable } from "node:stream";

// Node Readable → Web ReadableStream:
const nodeReadable = createReadStream("input.txt");
const webReadable: ReadableStream<Uint8Array> = Readable.toWeb(nodeReadable);

// Web ReadableStream → Node Readable:
const response = await fetch("https://api.example.com/big.json");
const nodeReadable = Readable.fromWeb(response.body!);

await pipeline(nodeReadable, createWriteStream("downloaded.json"));

For fetch responses specifically:

const response = await fetch(url);
await pipeline(
  Readable.fromWeb(response.body!),
  createWriteStream("downloaded.bin"),
);

For Writables:

import { WritableStream } from "node:stream/web";

const webWritable: WritableStream<Uint8Array> = ...;
const nodeWritable = Writable.fromWeb(webWritable);

await pipeline(src, nodeWritable);

Note: Web Streams in Node.js are the same standard as in browsers but the implementations differ in subtle ways (object mode, error timing). For Node-only code, prefer Node streams — only convert at API boundaries.

Fix 7: Performance — Highwatermark and Object Mode

Default chunk buffering is 16 KB (64 for object mode). For high-throughput files:

const src = createReadStream("huge.bin", { highWaterMark: 1024 * 1024 });  // 1 MB chunks
const dst = createWriteStream("out.bin", { highWaterMark: 1024 * 1024 });

await pipeline(src, dst);

Larger chunks = fewer syscalls, better throughput at the cost of more memory.

For object streams (records, not bytes):

const lineStream = new Transform({
  objectMode: true,
  highWaterMark: 100,  // Up to 100 buffered objects
  transform(chunk, enc, cb) {
    // Process per record
  },
});

objectMode: true is essential for streams of structured data — without it, Node tries to coerce to Buffer.

Common Mistake: Mixing object-mode and byte-mode streams in one pipeline. The first non-object-mode stream coerces objects to strings via String(). Either keep the pipeline all-object-mode or serialize/deserialize explicitly.

Fix 8: Memory Leaks From Forgotten Cleanup

For pipeline(), cleanup is automatic. For raw .pipe() or manual streams:

import { finished } from "node:stream/promises";

const src = createReadStream("input.bin");
const dst = createWriteStream("output.bin");

src.pipe(dst);

try {
  await finished(dst);  // Wait for downstream to finish
} catch (err) {
  src.destroy(err);
  dst.destroy(err);
  throw err;
}

finished() is the Promise-based version of stream’s “finish”/“end” events. Use it when pipeline() doesn’t fit (e.g. you need to keep the source open for multiple sinks).

For multiplexing (one source → many sinks):

import { PassThrough } from "node:stream";

const source = createReadStream("data.bin");
const sink1 = createWriteStream("copy1.bin");
const sink2 = createWriteStream("copy2.bin");

const tee = new PassThrough();
source.pipe(tee);
tee.pipe(sink1);
tee.pipe(sink2);

await Promise.all([finished(sink1), finished(sink2)]);

For more sinks, write a Tee stream that emits to N destinations.

Still Not Working?

A few less-obvious failures:

  • stream emit error after end. A stream finished but an error fired afterward (cleanup error). Listen for error before destruction completes.
  • Premature close errors. A downstream closed before upstream finished. Usually means a write to a closed file/socket. Use pipeline() to handle cleanup uniformly.
  • Cannot read properties of null (reading 'read'). A stream was destroyed mid-pipeline. Check whether something is calling .destroy() externally.
  • High CPU usage in a pipeline. Transform is sync-CPU-heavy. Offload to a Worker thread or use worker_threads + a message-based pipeline pattern.
  • Different behavior between Node versions. Streams API has had subtle changes across 18/20/22. Pin Node version or test on the lowest supported.
  • Compression slower than expected. zlib level defaults to 6 (balanced). For speed: createGzip({ level: 1 }). For size: createGzip({ level: 9 }).
  • Cannot pipe, not readable. Trying to pipe from a Writable. Direction matters — sources go on the left of pipeline, sinks on the right.
  • Pipeline returns immediately without processing. Empty source. createReadStream("missing.txt") doesn’t throw; it emits an error event during the pipeline.

For related Node, async, and IO issues, see Node stream error, Node heap out of memory, Node uncaught exception, and JavaScript heap out of memory.

F

FixDevs

Solo developer based in Japan. Every solution is cross-referenced with official documentation and tested before publishing.

Was this article helpful?

Related Articles