Fix: Node.js Stream Error — Pipe Not Working, Backpressure, or Premature Close
Quick Answer
How to fix Node.js stream issues — pipe and pipeline errors, backpressure handling, Transform streams, async iteration, error propagation, and common stream anti-patterns.
The Problem
A Node.js stream pipe produces an error or stops unexpectedly:
const fs = require('fs');
const readable = fs.createReadStream('large-file.csv');
const writable = fs.createWriteStream('output.csv');
readable.pipe(writable);
// Error: write after end
// Or: the output file is empty
// Or: the process hangs without completing
Or a Transform stream silently drops data:
const { Transform } = require('stream');
const upper = new Transform({
  transform(chunk, encoding, callback) {
    this.push(chunk.toString().toUpperCase());
    // Missing: callback() ← stream halts here
  },
});
Or an error in one piped stream crashes the process instead of being caught:
readable.pipe(transform).pipe(writable);
// If transform fails and has no 'error' listener: uncaught exception
// The error doesn't propagate to writable
Why This Happens
Node.js streams have behaviors that aren’t obvious until they break:
- .pipe() doesn’t propagate errors: if a stream in a .pipe() chain emits an error, the destination stream is NOT automatically closed or destroyed. You must handle each stream’s error event individually.
- Missing callback() in Transform: every call to transform() must call its callback when done, even if no data is pushed. Without it, the stream stalls permanently.
- Backpressure ignored: when writable.write() returns false, the readable should pause until drain is emitted. Ignoring this lets the writable’s internal buffer grow without bound.
- Async operations in streams: mixing async/await with the callback-based stream API requires care. An unhandled rejection in a transform doesn’t emit an error event.
- Stream consumed twice: a readable stream’s data can only be consumed once. A consumer attached after the stream has already been piped or read misses every chunk delivered before it attached, and gets nothing if the stream has already ended.
In production, the worst-case failure mode for stream errors is process termination. When a stream emits an 'error' event and nothing is listening, Node.js does not silently ignore it — it throws an uncaught exception. In a stock Express or Fastify server with no uncaughtException handler, that exception terminates the entire process. Your container orchestrator (Kubernetes, ECS, Nomad, PM2) restarts it, but every in-flight request on that process dies with a connection reset. Under steady traffic, a stream error in one user’s file upload can drop dozens of unrelated requests.
The blast radius is “the entire process and every concurrent request on it.” This is not a per-request failure like a 500 error — it is a service-wide failure that wipes connections, breaks WebSockets, drops in-progress writes, and produces a noisy crash report. If you see your process restart count spike in monitoring, an unhandled stream error is one of the top suspects. The fix is mechanical and applies everywhere streams are used: every stream in a pipe chain must have an 'error' listener, or you must use stream.pipeline(), which handles error propagation and cleanup automatically.
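The difference between the two approaches can be observed directly. The following is a minimal sketch, assuming two hypothetical helpers (`failingSource` and `nullSink`, introduced here only for illustration): after the source fails, `.pipe()` leaves the destination untouched, while `pipeline()` destroys it and reports the error to its callback.

```javascript
const { Readable, Writable, pipeline } = require('stream');

// Hypothetical helper: a source that fails as soon as it is read.
function failingSource() {
  return new Readable({
    read() {
      this.destroy(new Error('simulated read failure'));
    },
  });
}

// Hypothetical helper: a sink that discards everything it receives.
function nullSink() {
  return new Writable({
    write(chunk, encoding, callback) {
      callback();
    },
  });
}

// Resolves with sink.destroyed after the source has failed under .pipe().
function sinkDestroyedAfterPipe() {
  return new Promise((resolve) => {
    const source = failingSource();
    const sink = nullSink();
    // Without this listener the error would be thrown as an uncaught exception.
    source.on('error', () => {
      setImmediate(() => resolve(sink.destroyed));
    });
    source.pipe(sink);
  });
}

// Resolves with sink.destroyed once pipeline() has reported the failure.
function sinkDestroyedAfterPipeline() {
  return new Promise((resolve) => {
    const source = failingSource();
    const sink = nullSink();
    pipeline(source, sink, () => resolve(sink.destroyed));
  });
}
```

With a file-backed sink, the "not destroyed" case is what leaves a file descriptor open after the failure.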
Fix 1: Use pipeline() Instead of pipe()
stream.pipeline() is the modern replacement for .pipe(). It handles error propagation and stream cleanup automatically:
const { pipeline } = require('stream');
const fs = require('fs');
const zlib = require('zlib');
// WRONG: pipe() doesn't propagate errors or clean up
fs.createReadStream('input.txt')
  .pipe(zlib.createGzip())
  .pipe(fs.createWriteStream('output.txt.gz'));
// If zlib fails: the write stream stays open and its file descriptor leaks
// CORRECT: pipeline() cleans up all streams on error
pipeline(
  fs.createReadStream('input.txt'),
  zlib.createGzip(),
  fs.createWriteStream('output.txt.gz'),
  (err) => {
    if (err) {
      console.error('Pipeline failed:', err);
    } else {
      console.log('Pipeline succeeded');
    }
  }
);
// With Promises (Node.js 15+): import from 'stream/promises' instead of 'stream'
const { pipeline } = require('stream/promises');
async function compress() {
  await pipeline(
    fs.createReadStream('input.txt'),
    zlib.createGzip(),
    fs.createWriteStream('output.txt.gz'),
  );
  console.log('Done');
}
pipeline() vs pipe() comparison:
| | .pipe() | pipeline() |
|---|---|---|
| Error propagation | Manual (each stream) | Automatic |
| Stream cleanup on error | Manual | Automatic |
| Callback/Promise | No | Yes |
| Multiple transforms | Chained calls | Single call |
| Recommended | No (legacy) | Yes |
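With the Promise form, a failure surfaces as a rejected promise, so it needs a try/catch. The sketch below assumes a hypothetical helper named `compressFile`; the best-effort removal of the partial output file is a design choice made for this example, not something `pipeline()` does itself.

```javascript
const fs = require('fs');
const zlib = require('zlib');
const { pipeline } = require('stream/promises');

// Hypothetical helper: gzip srcPath into destPath.
// Returns true on success and false on failure.
async function compressFile(srcPath, destPath) {
  try {
    await pipeline(
      fs.createReadStream(srcPath),
      zlib.createGzip(),
      fs.createWriteStream(destPath),
    );
    return true;
  } catch (err) {
    // pipeline() has already destroyed every stream at this point.
    console.error('Compression failed:', err.message);
    // Best-effort cleanup of a partially written output file.
    await fs.promises.rm(destPath, { force: true });
    return false;
  }
}
```

A caller can then branch on the boolean result, or the function could rethrow if the failure should propagate further up.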
Fix 2: Always Call callback() in Transform Streams
Every invocation of the transform() method must call its callback:
const { Transform } = require('stream');
// WRONG: missing callback() call stalls the stream
const brokenTransform = new Transform({
  transform(chunk, encoding, callback) {
    const processed = chunk.toString().toUpperCase();
    this.push(processed);
    // callback() not called → stream hangs
  },
});
// CORRECT: always call callback when done
const upperCaseTransform = new Transform({
  transform(chunk, encoding, callback) {
    const processed = chunk.toString().toUpperCase();
    this.push(processed);
    callback(); // Signal: ready for next chunk
  },
});
// CORRECT: callback takes optional error and data arguments
const parseJSONTransform = new Transform({
  objectMode: true, // Work with objects, not Buffers
  transform(chunk, encoding, callback) {
    try {
      const obj = JSON.parse(chunk.toString());
      callback(null, obj); // Pass transformed data via callback
    } catch (err) {
      callback(err); // Pass error: stream emits 'error' event
    }
  },
});
Async transform function:
const asyncTransform = new Transform({
  transform(chunk, encoding, callback) {
    // transform() ignores a returned promise, so wrap the async work in a
    // self-invoking async function and route rejections to callback
    (async () => {
      const result = await processChunk(chunk); // processChunk: your own async function
      this.push(result);
      callback();
    })().catch(callback); // Pass async errors to callback
  },
});
Fix 3: Handle Backpressure
Backpressure prevents memory overflow when the writable can’t keep up with the readable:
const fs = require('fs');
const readable = fs.createReadStream('huge-file.bin');
const writable = fs.createWriteStream('destination.bin');
// WRONG: ignore backpressure signal, overwhelms writable buffer
readable.on('data', (chunk) => {
  writable.write(chunk); // write() may return false, ignored here
});
// CORRECT: handle backpressure
readable.on('data', (chunk) => {
  const canContinue = writable.write(chunk);
  if (!canContinue) {
    readable.pause(); // Stop reading until writable drains
  }
});
writable.on('drain', () => {
  readable.resume(); // Writable buffer cleared, resume reading
});
readable.on('end', () => {
  writable.end();
});
readable.on('error', (err) => console.error('Read error:', err));
writable.on('error', (err) => console.error('Write error:', err));
Note: pipeline() handles backpressure automatically. Use it instead of manually wiring pause/resume.
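When the data comes from a loop rather than another stream, the same rule can be written with `events.once()`. This is a sketch under assumptions: `writeAll` is a hypothetical helper, and an 'error' emitted while the function is not awaiting would still need its own listener on the writable.

```javascript
const { once } = require('events');

// Hypothetical helper: write every chunk from a sync or async iterable,
// pausing whenever write() reports that the internal buffer is full.
async function writeAll(chunks, writable) {
  let waits = 0;
  for await (const chunk of chunks) {
    if (!writable.write(chunk)) {
      waits += 1;
      // once() rejects if the writable emits 'error' while we wait.
      await once(writable, 'drain');
    }
  }
  writable.end();
  await once(writable, 'finish');
  return waits; // number of times the producer had to wait for 'drain'
}
```

The returned count is only there to make the backpressure visible in tests or logs; a real caller would likely ignore it.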
Fix 4: Propagate Errors in .pipe() Chains
When you must use .pipe(), add error handlers to every stream in the chain:
const fs = require('fs');
const readable = fs.createReadStream('input.csv');
const transform = new MyTransform(); // MyTransform: your own Transform subclass
const writable = fs.createWriteStream('output.csv');
function destroyAll(err) {
  console.error('Stream error:', err);
  readable.destroy();
  transform.destroy();
  writable.destroy();
}
readable.on('error', destroyAll);
transform.on('error', destroyAll);
writable.on('error', destroyAll);
readable.pipe(transform).pipe(writable);
Or switch to pipeline() entirely, which is the proper solution:
// This replaces all the manual error handling above
pipeline(readable, transform, writable, (err) => {
  if (err) console.error('Pipeline failed:', err);
});
Fix 5: Use Async Iteration for Simpler Stream Consumption
Node.js 12+ supports for await...of on readable streams, which is cleaner than event-based reading:
const fs = require('fs');
const readline = require('readline');
// Read a file line by line
async function processLines(filePath) {
  const fileStream = fs.createReadStream(filePath);
  const rl = readline.createInterface({ input: fileStream });
  for await (const line of rl) {
    if (line.startsWith('#')) continue; // Skip comments
    await processLine(line);
  }
}
// Process a readable stream chunk by chunk
async function processStream(readable) {
  for await (const chunk of readable) {
    console.log('Chunk:', chunk.length, 'bytes');
  }
}
// With error handling
async function safeProcessStream(readable) {
  try {
    for await (const chunk of readable) {
      await handleChunk(chunk);
    }
  } catch (err) {
    console.error('Stream error during iteration:', err);
    readable.destroy();
  }
}
Collecting all stream data into a buffer:
async function streamToBuffer(readable) {
  const chunks = [];
  for await (const chunk of readable) {
    chunks.push(chunk);
  }
  return Buffer.concat(chunks);
}
async function streamToString(readable, encoding = 'utf8') {
  const buffer = await streamToBuffer(readable);
  return buffer.toString(encoding);
}
// Usage (inside an async function, since CommonJS has no top-level await)
const content = await streamToString(fs.createReadStream('file.txt'));
Fix 6: Create Custom Readable and Writable Streams
Common patterns for building streams correctly:
const { Readable, Writable, Transform } = require('stream');
// Custom Readable — push data on demand
class CounterStream extends Readable {
  constructor(max) {
    super({ objectMode: true });
    this.current = 0;
    this.max = max;
  }
  _read() {
    // Emits 0 through max, inclusive
    if (this.current <= this.max) {
      this.push(this.current++);
    } else {
      this.push(null); // Signal end of stream
    }
  }
}
// Custom Writable: consume data
class CollectorStream extends Writable {
  constructor() {
    super({ objectMode: true });
    this.collected = [];
  }
  _write(chunk, encoding, callback) {
    this.collected.push(chunk);
    callback(); // Must call: signals ready for next chunk
  }
  _final(callback) {
    // Called when writable is ending: flush any remaining data
    console.log('Collected:', this.collected);
    callback();
  }
}
// Custom Transform
class DoubleTransform extends Transform {
  constructor() {
    super({ objectMode: true });
  }
  _transform(chunk, encoding, callback) {
    this.push(chunk * 2);
    callback();
  }
}
// Use them together (wrapped in an async function: CommonJS has no top-level await)
const { pipeline } = require('stream/promises');
async function main() {
  await pipeline(
    new CounterStream(10),
    new DoubleTransform(),
    new CollectorStream(),
  );
}
main().catch(console.error);
Fix 7: Read Streams Correctly from HTTP Responses
Streaming HTTP responses has specific patterns:
const https = require('https');
const fs = require('fs');
const { pipeline } = require('stream/promises');
// Download a file using streams — memory efficient for large files
async function downloadFile(url, destPath) {
  const response = await new Promise((resolve, reject) => {
    https.get(url, resolve).on('error', reject);
  });
  if (response.statusCode !== 200) {
    response.resume(); // Drain the response to free memory
    throw new Error(`Download failed: ${response.statusCode}`);
  }
  await pipeline(
    response,
    fs.createWriteStream(destPath),
  );
}
// With a library such as 'got' (shown here) or 'undici' (simpler API).
// Separate ES module file: recent got releases are ESM-only, so use import throughout.
import fs from 'fs';
import { pipeline } from 'stream/promises';
import got from 'got';
await pipeline(
  got.stream('https://example.com/large-file.zip'),
  fs.createWriteStream('file.zip'),
);
Fix 8: Defend Against Unhandled Stream Errors at the Process Level
Even with careful per-stream error handling, third-party libraries and forgotten edge cases can still emit unhandled stream errors. As a backstop, register process-level handlers that log the error and shut down gracefully instead of letting Node throw an uncaught exception:
process.on('uncaughtException', (err) => {
  // Log with full context: this fires for unhandled stream errors too
  console.error('Uncaught exception:', err);
  // Don't try to recover: the process is in an unknown state.
  // Let your supervisor (PM2, systemd, Kubernetes) restart it.
  process.exit(1);
});
process.on('unhandledRejection', (reason, promise) => {
  console.error('Unhandled rejection at:', promise, 'reason:', reason);
  process.exit(1);
});
Pro Tip: Combine the process-level handler with structured logging so you can identify which stream operation caused the crash. Include request IDs, file paths, and stack traces. Without context, “uncaughtException” alerts are useless: you need to know what code path triggered it.
Audit your codebase for unguarded streams. Search for .pipe( and createReadStream, createWriteStream, createGzip, createDecipheriv, etc. Every one of these should be wrapped in pipeline() or have explicit .on('error', ...) handlers. A grep pass after each major change catches new instances before they ship.
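If a shell grep is not convenient, the audit can be approximated in Node itself. The function below is a rough, text-based heuristic under a hypothetical name (`findPipeCalls`); it does not parse JavaScript, so it will miss unusual formatting and will not tell you whether error handlers are attached.

```javascript
// Hypothetical helper: return the 1-based line numbers in `source`
// that contain a call to a method named pipe, skipping // comment lines.
function findPipeCalls(source) {
  return source
    .split('\n')
    .map((line, index) => ({ line, number: index + 1 }))
    .filter(({ line }) => /\.pipe\s*\(/.test(line) && !line.trim().startsWith('//'))
    .map(({ number }) => number);
}
```

Feeding it the result of `fs.readFileSync(file, 'utf8')` for each source file gives a quick list of places to review by hand.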
Production Incident Playbook: Stream Error Crashes the Service
Scenario: Your Node.js service handles file uploads. At peak traffic, the process restart count starts climbing. Users see “connection reset” errors. Logs show Error: ENOENT: no such file or directory followed by Process exited with code 1. The orchestrator restarts the container, but in-flight requests die with it.
Blast radius: Every concurrent request on the crashing process. With 100 concurrent connections per worker and a process restart, that’s 100 dropped requests per crash. Under high traffic, this compounds: load balancer sends new connections to the surviving workers, they get overloaded, latency rises, more requests time out. A single bad upload can degrade the whole service.
Detection: Watch the process restart count metric (kube_pod_container_status_restarts_total for Kubernetes, or your supervisor’s restart counter). Any non-zero rate is worth investigating. Also alert on uncaughtException and unhandledRejection log entries.
Diagnosis checklist:
- Pull the crash logs from the last restart. The stack trace usually points at the stream operation that failed.
- Identify the stream chain. Is it .pipe() or pipeline()? .pipe() is the prime suspect for missing error handlers.
- Look for streams that originate from user input: file uploads, HTTP responses from third parties, S3 reads. These are the most likely error sources.
- Check if the failing path has .on('error', ...) on every stream. Missing handlers on intermediate streams (like a gzip transform) are common oversights.
- Reproduce locally by triggering the failure mode (truncated file, network disconnect, disk full) and confirm the fix.
Recovery: Add a process-level uncaughtException handler (Fix 8) immediately as a bandage. Then rewrite the offending pipe chain to use pipeline(). Deploy the fix and monitor the restart count until it returns to zero. Recovery time is dominated by deploy duration, typically 15-30 minutes for a hotfix.
Prevention: Add a lint rule against .pipe() (for example, ESLint's built-in no-restricted-syntax rule can flag calls to a method named pipe). Mandate pipeline() in code review. Add a smoke test that intentionally triggers a stream error (e.g., read a non-existent file) and confirms the process does not crash. For long-running services, this test catches regressions that local development misses. See Fix: Node.js Unhandled Rejection Crash for the related Promise-side pattern.
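The smoke test described above could look roughly like the following. It is a sketch, assuming a hypothetical helper named `readMissingFile`; in a real test suite the `pipeline()` call would be replaced by the production code path under test.

```javascript
const fs = require('fs');
const { Writable } = require('stream');
const { pipeline } = require('stream/promises');

// Hypothetical helper: try to stream a file that does not exist.
// Resolves with the error code when the failure is contained,
// or with null if the pipeline unexpectedly succeeds.
async function readMissingFile(missingPath) {
  const sink = new Writable({
    write(chunk, encoding, callback) {
      callback();
    },
  });
  try {
    await pipeline(fs.createReadStream(missingPath), sink);
    return null;
  } catch (err) {
    return err.code;
  }
}
```

If the process is still alive and the helper resolves with `'ENOENT'`, the stream error was caught instead of escaping as an uncaught exception.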
Still Not Working?
'finish' vs 'end' event — 'end' is emitted on Readable streams when all data is consumed. 'finish' is emitted on Writable streams when writable.end() is called and all data is flushed. Using the wrong event for the wrong stream type is a common source of timing bugs.
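A small experiment makes the distinction concrete. This sketch uses a hypothetical function, `recordCompletionEvents`, that pipes an in-memory source into a discarding sink and records which event fires on which side.

```javascript
const { Readable, Writable } = require('stream');

// Hypothetical helper: resolves with the completion events in the
// order they fired for a simple source.pipe(sink) chain.
function recordCompletionEvents() {
  return new Promise((resolve, reject) => {
    const events = [];
    const source = Readable.from(['a', 'b']);
    const sink = new Writable({
      write(chunk, encoding, callback) {
        callback();
      },
    });
    // 'end' belongs to the readable side.
    source.on('end', () => events.push('readable:end'));
    // 'finish' belongs to the writable side.
    sink.on('finish', () => {
      events.push('writable:finish');
      resolve(events);
    });
    source.on('error', reject);
    sink.on('error', reject);
    source.pipe(sink);
  });
}
```

Code that must run after the data is actually written should wait for 'finish' on the writable, not 'end' on the readable.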
Stream is paused and never resumes: if a 'readable' listener and a 'data' listener are attached to the same stream, 'readable' takes precedence and 'data' is emitted only when readable.read() is called. Use one API consistently: either event-based ('data'/'end') or the readable.read() API.
Streams in objectMode — by default, streams work with Buffer or string. Pass { objectMode: true } in the constructor options when working with JavaScript objects. Mixing objectMode and non-objectMode streams in a pipeline causes type errors.
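When one side of a chain deals in objects and the other in bytes, a Transform can bridge the two by setting `writableObjectMode` and `readableObjectMode` separately. The sketch below assumes hypothetical helpers (`createNdjsonSerializer`, `serializeRecords`) and newline-delimited JSON as the output format, chosen only for illustration.

```javascript
const { Readable, Transform, Writable } = require('stream');
const { pipeline } = require('stream/promises');

// Hypothetical helper: accepts objects on the writable side and emits
// newline-delimited JSON text on the readable side.
function createNdjsonSerializer() {
  return new Transform({
    writableObjectMode: true,
    readableObjectMode: false,
    transform(record, encoding, callback) {
      let line;
      try {
        line = JSON.stringify(record) + '\n';
      } catch (err) {
        callback(err); // e.g. a circular structure
        return;
      }
      callback(null, line);
    },
  });
}

// Hypothetical helper: run records through the serializer into a
// byte-oriented sink and return the resulting text.
async function serializeRecords(records) {
  const parts = [];
  const byteSink = new Writable({
    write(chunk, encoding, callback) {
      parts.push(chunk); // chunk is a Buffer here, not an object
      callback();
    },
  });
  await pipeline(Readable.from(records), createNdjsonSerializer(), byteSink);
  return Buffer.concat(parts).toString('utf8');
}
```

In practice the byte sink would be something like `fs.createWriteStream()`, which only accepts strings and Buffers.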
write() after end(): calling writable.write() after writable.end() fails with Error: write after end (code ERR_STREAM_WRITE_AFTER_END), delivered through the write callback and the stream's 'error' event. This often happens in concurrent scenarios where multiple async operations try to write to the same stream. Use a queue or ensure writes are serialized.
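One way to serialize writes is to funnel them through a single promise chain. `SerializedWriter` below is a hypothetical wrapper, not a Node.js API, and is only a sketch: once a queued write fails, every later write on the same instance rejects as well.

```javascript
const { once } = require('events');

// Hypothetical wrapper: queues concurrent writes on one promise chain
// and rejects writes that arrive after close() has been called.
class SerializedWriter {
  constructor(writable) {
    this.writable = writable;
    this.tail = Promise.resolve();
    this.closed = false;
  }

  write(chunk) {
    if (this.closed) {
      return Promise.reject(new Error('SerializedWriter is closed'));
    }
    this.tail = this.tail.then(async () => {
      if (!this.writable.write(chunk)) {
        await once(this.writable, 'drain'); // respect backpressure
      }
    });
    return this.tail;
  }

  async close() {
    this.closed = true;
    await this.tail; // let queued writes go first
    this.writable.end();
    await once(this.writable, 'finish');
  }
}
```

Callers share one `SerializedWriter` per destination and await `close()` exactly once, so a late write is reported as a rejected promise instead of reaching the ended stream.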
Memory grows unbounded during large file processing — even with backpressure, certain Transform implementations buffer everything in memory before pushing. Watch out for JSON.parse on a full buffer, or string concatenation patterns. Process in true streaming fashion (line by line, chunk by chunk) or you will hit JavaScript heap out of memory. See Fix: Node.js Heap Out of Memory for the broader issue.
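A line-by-line stage only needs to hold the current partial line in memory. The sketch below passes an async generator to `pipeline()` as the middle stage; `splitLines` and `forEachLine` are hypothetical names, and UTF-8 input with `\n` line endings is assumed.

```javascript
const { Writable } = require('stream');
const { pipeline } = require('stream/promises');
const { StringDecoder } = require('string_decoder');

// Hypothetical stage: re-chunks arbitrary input into complete lines,
// carrying the trailing partial line over to the next chunk.
async function* splitLines(source) {
  const decoder = new StringDecoder('utf8');
  let carry = '';
  for await (const chunk of source) {
    const text = typeof chunk === 'string' ? chunk : decoder.write(chunk);
    const parts = (carry + text).split('\n');
    carry = parts.pop(); // the last piece may be an incomplete line
    yield* parts;
  }
  carry += decoder.end();
  if (carry) yield carry;
}

// Hypothetical helper: call onLine for each line, one at a time.
async function forEachLine(readable, onLine) {
  await pipeline(
    readable,
    splitLines,
    new Writable({
      objectMode: true,
      write(line, encoding, callback) {
        // Supports sync or async onLine; failures fail the pipeline.
        Promise.resolve()
          .then(() => onLine(line))
          .then(() => callback(), callback);
      },
    }),
  );
}
```

Because the sink calls `callback()` only after `onLine` settles, a slow consumer applies backpressure all the way back to the source.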
File descriptor leaks under stream errors — when a stream errors and you do not destroy related streams, the underlying file descriptor stays open. Over time this exhausts the process limit (EMFILE: too many open files). pipeline() cleans up automatically; manual .pipe() does not.
Streams behave differently in workers vs main thread — worker_threads have their own event loop and stream state. Passing a stream across the worker boundary via MessageChannel requires transferableObject semantics and is error-prone. Do the stream processing entirely inside one thread.
For related Node.js issues, see Fix: Node.js Heap Out of Memory, Fix: Node.js Uncaught Exception, and Fix: Node.js Cannot Find Module.
Solo developer based in Japan. Every solution is cross-referenced with official documentation and tested before publishing.