Fix: RxJS Not Working — Observable Not Emitting, Memory Leak from Unsubscribed Stream, or Operator Behaving Unexpectedly
Quick Answer
How to fix RxJS issues — subscription management, switchMap vs mergeMap vs concatMap, error handling with catchError, Subject types, cold vs hot observables, and Angular async pipe.
The Problem
An observable emits values but the subscriber never receives them:
const subject = new Subject<number>();
subject.subscribe(val => console.log(val)); // Never logs
// ... elsewhere in the code
subject.next(1); // Subject was completed before this
Or a memory leak because subscriptions aren’t cleaned up:
// Angular component
ngOnInit() {
  this.userService.getUser().subscribe(user => {
    this.user = user; // Never unsubscribed: outlives the component if getUser() does not complete
  });
}
Or switchMap drops requests unexpectedly:
this.searchInput.valueChanges.pipe(
  switchMap(query => this.api.search(query)),
).subscribe(results => this.results = results);
// Fast typing cancels previous requests, so results for earlier queries are dropped
Or an error in one stream kills the entire observable chain:
this.data$.subscribe({
  error: err => console.error(err),
});
// After one error, the observable terminates and never emits again
Why This Happens
RxJS has nuanced behavior around subscription lifecycle and operator semantics:
- Completed or errored observables never emit again: once a Subject is completed (via complete()) or errors, all future next() calls are silently ignored. You must create a new Subject or handle errors before they reach the subscriber.
- Every subscribe() call creates a new subscription: subscriptions don’t clean themselves up. In Angular components or React effects, subscriptions created in lifecycle methods must be explicitly unsubscribed when the component destroys or the effect cleans up.
- switchMap cancels the previous inner observable: this is by design. Use mergeMap if you want all requests to complete, concatMap if order matters, or exhaustMap if you want to ignore new events while one is in progress.
- Errors propagate downstream and terminate the stream: the default RxJS error handling terminates the observable. Use catchError inside the pipe, before the subscriber, to recover.
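The first point is easy to confirm in isolation. The sketch below assumes nothing beyond a plain Subject; the `received` array and `lateCompleted` flag are illustrative names used only to record what happens.

```typescript
import { Subject } from 'rxjs';

const received: number[] = [];
let lateCompleted = false;

const subject = new Subject<number>();
subject.subscribe(v => received.push(v));

subject.next(1);     // delivered
subject.complete();  // terminates the Subject for every subscriber
subject.next(2);     // silently ignored: no error is thrown, nothing is emitted

// A subscriber that arrives after completion gets only the complete notification.
subject.subscribe({
  next: v => received.push(v),
  complete: () => { lateCompleted = true; },
});

console.log(received);      // [1]
console.log(lateCompleted); // true
```

Because nothing throws, the only visible symptom is the missing value, which is why this failure tends to look like "the observable is not emitting".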
The deeper trap with RxJS is that subscriptions are push-based callbacks disguised as data structures. The pipeline reads like a chain of pure functions, so engineers think of it as “the observable owns its values.” It doesn’t. Every subscribe() is a fresh consumer attached to a producer that may or may not replay history. A Subject doesn’t replay. A BehaviorSubject replays the latest value. A ReplaySubject(n) replays the last n. Picking the wrong Subject type doesn’t break the build — it produces a UI that looks correct on the first interaction and breaks on the second (e.g. opening a dialog twice, where the second open never receives the initial state).
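To make the replay difference concrete, here is a minimal sketch that pushes the same three values into each Subject type and only then subscribes. The `seen` record is an illustrative helper, not part of RxJS.

```typescript
import { Subject, BehaviorSubject, ReplaySubject } from 'rxjs';

const plain = new Subject<string>();
const behavior = new BehaviorSubject<string>('initial');
const replay = new ReplaySubject<string>(2);

// Emit before anyone has subscribed.
[plain, behavior, replay].forEach(s => {
  s.next('a');
  s.next('b');
  s.next('c');
});

// Late subscribers record whatever each Subject type replays to them.
const seen = { plain: [] as string[], behavior: [] as string[], replay: [] as string[] };
plain.subscribe(v => seen.plain.push(v));
behavior.subscribe(v => seen.behavior.push(v));
replay.subscribe(v => seen.replay.push(v));

console.log(seen.plain);    // []
console.log(seen.behavior); // ['c']
console.log(seen.replay);   // ['b', 'c']
```

The late subscriber to the plain Subject sees nothing, which matches the "works the first time, breaks the second" symptom described above.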
The other class of issue is the invisible subscription — a stream that’s still emitting in memory because no one called unsubscribe(). This shows up as memory growth in DevTools, but also as a more confusing symptom: console.log statements firing multiple times for what looks like a single event. Each ghost subscription executes the side effect once. If you ever ask “why is this running three times?” the answer is almost always that you’ve created three subscribers and only think you have one. The fix is structural — adopt takeUntil(destroy$), takeUntilDestroyed(), or the async pipe and stop reaching for raw .subscribe() in components.
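A framework-free sketch of a ghost subscription follows. `LeakyWidget` and `TidyWidget` are hypothetical stand-ins for components; only the tidy one pipes through takeUntil(destroy$).

```typescript
import { Subject } from 'rxjs';
import { takeUntil } from 'rxjs/operators';

const clicks$ = new Subject<string>();
const log: string[] = [];

// Hypothetical component that subscribes and never cleans up.
class LeakyWidget {
  constructor(name: string) {
    clicks$.subscribe(e => log.push(`${name} handled ${e}`));
  }
  destroy(): void {
    // Nothing here: the subscription above stays attached to clicks$.
  }
}

// Hypothetical component that ends its subscription when destroyed.
class TidyWidget {
  private readonly destroy$ = new Subject<void>();
  constructor(name: string) {
    clicks$
      .pipe(takeUntil(this.destroy$))
      .subscribe(e => log.push(`${name} handled ${e}`));
  }
  destroy(): void {
    this.destroy$.next();
    this.destroy$.complete();
  }
}

const leaky = new LeakyWidget('leaky');
const tidy = new TidyWidget('tidy');
leaky.destroy();
tidy.destroy();

clicks$.next('click'); // both widgets are "gone", yet one handler still runs
console.log(log);      // ['leaky handled click']
```

Create and destroy the leaky widget three times and the same click is handled three times, which is the "why is this running three times?" symptom.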
Diagnostic Timeline
The classic RxJS “it’s not working” feels like the stream is broken. Almost always, the stream is fine and something around it is wrong.
Minute 0–2. First wrong suspicion: “I need takeUntil.” Maybe true for cleanup, but if the problem is “value never arrives,” takeUntil doesn’t help. Add a tap(v => console.log('seen:', v)) immediately before subscribe. If tap fires, your subscriber callback has a bug. If tap doesn’t fire, the stream really isn’t emitting.
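The probe technique can be sketched as follows. The `filter` step is a hypothetical stand-in for whatever operator is under suspicion, and `trace` is an illustrative log; placing a second tap at the source shows where values stop.

```typescript
import { Subject } from 'rxjs';
import { filter, tap } from 'rxjs/operators';

const source$ = new Subject<number>();
const trace: string[] = [];
const delivered: number[] = [];

source$.pipe(
  tap(v => trace.push(`source ${v}`)),            // probe at the source
  filter(v => v > 10),                            // hypothetical step under suspicion
  tap(v => trace.push(`before subscribe ${v}`)),  // probe immediately before subscribe
).subscribe(v => delivered.push(v));

source$.next(5);
source$.next(20);

console.log(trace);     // ['source 5', 'source 20', 'before subscribe 20']
console.log(delivered); // [20]
```

Here the value 5 reaches the first probe but not the second, so the drop happens in the pipe and not in the subscriber callback.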
Minute 2–7. The stream doesn’t emit. Check what type of source it is. new Subject() will not emit values published before subscribe(). Replace with BehaviorSubject(initial) or ReplaySubject(1) and re-test. If it’s an HTTP call (cold observable), confirm subscribe() is actually being called — Angular’s async pipe subscribes lazily inside *ngIf, and a hidden template branch means the HTTP call never fires.
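The laziness of a cold source is worth seeing once. In this sketch, `request$` is a hypothetical stand-in for an HTTP call, and `executions` counts how many times its producer function actually runs.

```typescript
import { Observable } from 'rxjs';

let executions = 0;

// Hypothetical stand-in for an HTTP request: the body runs once per subscriber.
const request$ = new Observable<string>(subscriber => {
  executions++;
  subscriber.next('response');
  subscriber.complete();
});

const beforeSubscribe = executions; // 0: creating the observable runs nothing

request$.subscribe();
request$.subscribe();

console.log(beforeSubscribe); // 0
console.log(executions);      // 2
```

If the only subscriber is an async pipe inside a template branch that never renders, the count stays at zero and the request never fires.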
Minute 7–15. Stream emits once then stops. Real cause: a catchError returned EMPTY after an error, or the source observable completed. Inspect with .pipe(tap({ next: v => log('next', v), error: e => log('err', e), complete: () => log('done') })). The Subject pattern that bites hardest: calling .complete() somewhere on a global Subject, after which all subscribers go dead.
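The "emits once then stops" case can be reproduced with a small sketch, assuming RxJS 7 for the factory form of throwError. `process` is a hypothetical step that fails for one input, and `record` is an illustrative helper that logs every notification through tap.

```typescript
import { EMPTY, Observable, Subject, of, throwError } from 'rxjs';
import { catchError, mergeMap, tap } from 'rxjs/operators';

// Hypothetical processing step that fails for the value 2.
const process = (n: number): Observable<number> =>
  n === 2 ? throwError(() => new Error('boom')) : of(n * 10);

// Logs next, error, and complete notifications into the given array.
const record = (log: string[]) => tap<number>({
  next: v => log.push(`next ${v}`),
  error: () => log.push('err'),
  complete: () => log.push('done'),
});

// catchError on the outer pipe: the error replaces the whole stream with EMPTY.
const outerLog: string[] = [];
const outer$ = new Subject<number>();
outer$.pipe(
  mergeMap(n => process(n)),
  catchError(() => EMPTY),
  record(outerLog),
).subscribe();
[1, 2, 3].forEach(n => outer$.next(n));

// catchError on the inner observable: only the failing item is skipped.
const innerLog: string[] = [];
const inner$ = new Subject<number>();
inner$.pipe(
  mergeMap(n => process(n).pipe(catchError(() => EMPTY))),
  record(innerLog),
).subscribe();
[1, 2, 3].forEach(n => inner$.next(n));

console.log(outerLog); // ['next 10', 'done']
console.log(innerLog); // ['next 10', 'next 30']
```

A 'done' entry appearing right after the first value is the signature of an outer catchError that swallowed an error and completed the stream.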
Minute 15–25. “Same event handled twice” symptom. First wrong suspicion: “RxJS bug.” Real cause: you called subscribe() twice — once on the raw observable, once again after piping through shareReplay. Without shareReplay({ refCount: true }), the cached observable keeps its inner subscription alive forever. Search the codebase for every .subscribe( to count subscribers. Use shareReplay({ bufferSize: 1, refCount: true }) to release the inner subscription when the last subscriber leaves.
Minute 25–35. Memory leak. Open Chrome DevTools → Performance → record a session → look at the heap. If a component subscribes inside ngOnInit without takeUntil(this.destroy$) or takeUntilDestroyed(), every navigation away leaves a stale subscription. The fix is mechanical: adopt one cleanup strategy and apply it consistently. Mixing approaches (some components use takeUntil, some use takeUntilDestroyed, some never clean up) makes audits expensive.
Minute 35+. Last-resort cause: a nested subscribe(...) inside another subscribe(...). Anti-pattern. The inner subscription is never cleaned up and never participates in the outer pipeline’s error handling. Refactor with a flattening operator (switchMap/mergeMap/concatMap) so the inner observable joins the same pipe.
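A sketch of the refactor, assuming RxJS 7: `getUser` and `getPosts` are hypothetical service calls, with `getPosts` made to fail so the error path is visible. The nested form is shown only as a comment.

```typescript
import { Observable, of, throwError } from 'rxjs';
import { switchMap } from 'rxjs/operators';

interface User { id: number; }

// Hypothetical service calls; getPosts always fails in this sketch.
const getUser = (): Observable<User> => of({ id: 7 });
const getPosts = (userId: number): Observable<string[]> =>
  throwError(() => new Error(`posts for user ${userId} failed`));

// Nested form (anti-pattern), not executed here:
//   getUser().subscribe(user => getPosts(user.id).subscribe(posts => render(posts)));
// The inner subscription has no error handler and is never unsubscribed.

const errors: string[] = [];
const posts: string[][] = [];

// Flattened form: one chain, one subscription, one error handler.
getUser().pipe(
  switchMap(user => getPosts(user.id)),
).subscribe({
  next: p => posts.push(p),
  error: (err: Error) => errors.push(err.message),
});

console.log(errors); // ['posts for user 7 failed']
```

Unsubscribing from the flattened chain also tears down the inner request, which the nested form cannot do.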
Fix 1: Manage Subscriptions to Prevent Memory Leaks
// Angular — traditional unsubscribe pattern
import { Component, OnInit, OnDestroy } from '@angular/core';
import { Subscription } from 'rxjs';

@Component({ ... })
export class UserComponent implements OnInit, OnDestroy {
  private subscription = new Subscription();

  ngOnInit() {
    this.subscription.add(
      this.userService.getUser().subscribe(user => this.user = user)
    );
    this.subscription.add(
      this.searchService.getResults().subscribe(r => this.results = r)
    );
  }

  ngOnDestroy() {
    this.subscription.unsubscribe(); // Cancels all added subscriptions
  }
}
// Angular — modern approach with takeUntilDestroyed (Angular 16+)
import { Component, DestroyRef, OnInit, inject } from '@angular/core';
import { takeUntilDestroyed } from '@angular/core/rxjs-interop';

@Component({ ... })
export class UserComponent implements OnInit {
  private destroyRef = inject(DestroyRef);

  ngOnInit() {
    this.userService.getUser()
      // Pass destroyRef explicitly: ngOnInit runs outside the injection context
      .pipe(takeUntilDestroyed(this.destroyRef))
      .subscribe(user => this.user = user);
  }
}
// Angular — async pipe (best approach — auto-unsubscribes)
// template:
// <div *ngIf="user$ | async as user">{{ user.name }}</div>
user$ = this.userService.getUser();
// React — useEffect cleanup
useEffect(() => {
  const subscription = observable$.subscribe(value => setValue(value));
  return () => subscription.unsubscribe(); // Cleanup on unmount
}, []);
// Using take(1) for one-shot subscriptions
this.userService.getUser()
  .pipe(take(1)) // Auto-completes after first emission
  .subscribe(user => this.user = user);
Fix 2: Choose the Right Flattening Operator
All four operators handle inner observables differently:
import { from } from 'rxjs';
import {
  switchMap, mergeMap, concatMap, exhaustMap,
  debounceTime, distinctUntilChanged,
} from 'rxjs/operators';
// switchMap — cancels previous inner observable when new outer value arrives
// Use for: search autocomplete, navigation, latest-value-wins scenarios
this.searchInput.valueChanges.pipe(
  debounceTime(300),
  distinctUntilChanged(),
  switchMap(query => this.api.search(query)),
  // If user types fast, only the last request completes
).subscribe(results => this.results = results);
// mergeMap (flatMap) — all inner observables run concurrently, no cancellation
// Use for: parallel requests where all results matter
const userIds = [1, 2, 3, 4, 5];
from(userIds).pipe(
  mergeMap(id => this.api.getUser(id)), // All 5 requests fire at once
).subscribe(user => this.allUsers.push(user));
// concatMap — processes inner observables one by one, in order
// Use for: sequential operations, file uploads, ordered processing
from(filesToUpload).pipe(
  concatMap(file => this.api.upload(file)), // Uploads one at a time, in order
).subscribe(result => this.uploadResults.push(result));
// exhaustMap — ignores new outer values while inner is active
// Use for: form submit buttons, preventing double-submit
this.submitButton.clicks.pipe(
  exhaustMap(() => this.api.submitForm(this.form)),
  // Ignores additional clicks while submit is pending
).subscribe(response => this.handleResponse(response));
Fix 3: Handle Errors Without Terminating the Stream
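// Operators are exported from 'rxjs' in RxJS 7.2+; on older versions import them from 'rxjs/operators'
import { catchError, retry, retryWhen, EMPTY, of, throwError, timer } from 'rxjs';
import { delay, switchMap, take } from 'rxjs/operators';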
// catchError — recover and continue
this.api.getUsers().pipe(
  catchError(err => {
    console.error('Failed to get users:', err);
    return of([]); // Return empty array as fallback
    // return EMPTY; // Complete without emitting
    // return throwError(() => err); // Re-throw to propagate
  }),
).subscribe(users => this.users = users);
// Keep the stream alive after errors (for long-lived streams)
this.websocket.messages$.pipe(
  switchMap(msg => this.processMessage(msg).pipe(
    catchError(err => {
      console.error('Processing failed:', err);
      return EMPTY; // Skip this message, continue listening
    })
  )),
).subscribe(result => this.handleResult(result));
// retry — retry on error
this.api.fetchData().pipe(
  retry(3), // Retry up to 3 times before propagating error
).subscribe(data => this.data = data);
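// Retry with a fixed delay (retryWhen is deprecated in RxJS 7; prefer retry with a config object)
this.api.fetchData().pipe(
  retryWhen(errors => errors.pipe(
    delay(1000), // Wait 1 second before each retry (fixed delay, not exponential)
    take(3), // Stop after the third error; the stream then completes instead of erroring
  )),
).subscribe(data => this.data = data);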
// Or use retry with config (RxJS 7+)
this.api.fetchData().pipe(
  retry({
    count: 3,
    // retryCount starts at 1, so this waits 1s, 2s, then 4s (exponential backoff)
    delay: (error, retryCount) => timer(2 ** (retryCount - 1) * 1000),
  }),
).subscribe();
Fix 4: Understand Cold vs Hot Observables
import { Subject, BehaviorSubject, ReplaySubject, AsyncSubject, share, shareReplay } from 'rxjs';
// COLD observable — each subscriber gets its own execution
// HTTP calls are cold — each subscribe triggers a new request
const users$ = this.http.get('/api/users');
users$.subscribe(u => console.log('Sub 1:', u));
users$.subscribe(u => console.log('Sub 2:', u));
// Two HTTP requests fired!
// HOT observable — all subscribers share the same execution
// Make cold observables hot with share() or shareReplay()
const sharedUsers$ = this.http.get('/api/users').pipe(
  shareReplay(1), // Cache last emission; new subscribers get it immediately
);
sharedUsers$.subscribe(u => console.log('Sub 1:', u));
sharedUsers$.subscribe(u => console.log('Sub 2:', u));
// Only ONE HTTP request, both subscribers receive the result
// Subject — manually push values, hot
const events$ = new Subject<Event>();
events$.subscribe(e => console.log('Subscriber 1:', e));
events$.next(new Event('click')); // Only Subscriber 1 receives this one
events$.subscribe(e => console.log('Subscriber 2:', e));
events$.next(new Event('click')); // Both subscribers receive this one
// BehaviorSubject — has a current value, new subscribers get latest emission
const currentUser$ = new BehaviorSubject<User | null>(null);
currentUser$.value; // Access current value synchronously
currentUser$.subscribe(u => console.log(u)); // Immediately receives null
currentUser$.next(loggedInUser); // All subscribers get the new user
// ReplaySubject — replays N emissions to new subscribers
const recentActions$ = new ReplaySubject<Action>(5); // Replay last 5
recentActions$.next({ type: 'LOAD' });
recentActions$.next({ type: 'SUCCESS' });
// New subscriber receives both LOAD and SUCCESS immediately
// AsyncSubject — only emits the last value, on complete
const result$ = new AsyncSubject<number>();
result$.next(1);
result$.next(2);
result$.next(3);
result$.complete(); // Now emits 3 to all subscribers
Fix 5: Common Operators and Patterns
import {
  map, filter, tap, take, skip, distinctUntilChanged,
  debounceTime, throttleTime, startWith, pairwise,
  combineLatest, forkJoin, merge, zip, withLatestFrom,
  scan, reduce, buffer, window, groupBy,
} from 'rxjs';
// Transform and filter
source$.pipe(
  map(x => x * 2),
  filter(x => x > 10),
  tap(x => console.log('Value:', x)), // Side effect without transforming
  take(5), // Take only first 5 values
  distinctUntilChanged(), // Skip duplicate consecutive values
);
// Rate limiting
searchInput$.pipe(
  // Shown together for comparison; a real pipe usually needs only one of the two
  debounceTime(300), // Emit only after 300ms of silence
  throttleTime(1000), // Emit at most once per second
);
// Combining streams
combineLatest([user$, settings$]).pipe(
  map(([user, settings]) => ({ user, settings })),
);
// Emits when ANY input emits (once every input has emitted at least once), with latest values from all
forkJoin([
  this.api.getUser(id),
  this.api.getPosts(id),
]).pipe(
  map(([user, posts]) => ({ user, posts })),
);
// Emits once when ALL observables complete (like Promise.all)
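data$.pipe(
  withLatestFrom(currentFilter$),
  map(([data, filter]) => applyFilter(data, filter)),
);
// data$ is a placeholder for the primary stream. Emits only when data$ emits, paired with
// the latest value from currentFilter$ (which is subscribed to but never triggers output)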
// Accumulate state
actions$.pipe(
  scan((state, action) => reducer(state, action), initialState),
);
// Like Array.reduce, but for streams
// Previous and current values
values$.pipe(
  startWith(null),
  pairwise(), // Emits [previous, current]
  map(([prev, curr]) => ({ prev, curr, changed: prev !== curr })),
);
Fix 6: Angular Async Pipe Best Practices
// AVOID: manual subscription in component
@Component({ ... })
class UserComponent implements OnInit, OnDestroy {
  user: User;
  private sub: Subscription;

  ngOnInit() {
    this.sub = this.userService.getUser(this.id).subscribe(u => this.user = u);
  }

  ngOnDestroy() {
    this.sub.unsubscribe();
  }
}
// PREFER: async pipe in template (auto-unsubscribes, triggers change detection)
@Component({
  template: `
    <div *ngIf="user$ | async as user; else loading">
      <h1>{{ user.name }}</h1>
    </div>
    <ng-template #loading>Loading...</ng-template>
  `,
  changeDetection: ChangeDetectionStrategy.OnPush,
})
class UserComponent {
  user$ = this.userService.getUser(this.id);
}
// Combine multiple streams in template
@Component({
  template: `
    <ng-container *ngIf="{
      user: user$ | async,
      posts: posts$ | async,
      loading: loading$ | async
    } as vm">
      <app-user [user]="vm.user" [posts]="vm.posts" />
      <app-spinner *ngIf="vm.loading" />
    </ng-container>
  `,
})
class DashboardComponent {
  // Assuming these are cold HTTP calls: shareReplay stops loading$ from triggering a second request
  user$ = this.userService.getUser().pipe(shareReplay({ bufferSize: 1, refCount: true }));
  posts$ = this.postsService.getPosts().pipe(shareReplay({ bufferSize: 1, refCount: true }));
  loading$ = combineLatest([this.user$, this.posts$]).pipe(
    map(() => false),
    startWith(true),
  );
}
Still Not Working?
Observable emits but async pipe doesn’t update the view: in OnPush change detection mode, Angular only checks a component when (1) an input reference changes, (2) an event handled by the component or its children fires, (3) ChangeDetectorRef.markForCheck() is called, or (4) an async pipe in its template receives a new value. If you’re using the async pipe with OnPush, this should work automatically. If it doesn’t, check that the template is bound to one stable observable instance. A getter or method that builds a new observable on every change-detection pass makes the async pipe resubscribe each time, and a cold source re-executes for every binding unless it is shared with shareReplay.
Subject.next() called before any subscriber — values emitted before a subscriber exists are lost on a Subject. Use BehaviorSubject (replays last value) or ReplaySubject (replays N values) if late subscribers need previous emissions. This is the cold/hot distinction in practice.
forkJoin never emits — forkJoin waits for all inner observables to complete. If any observable is an infinite stream (like BehaviorSubject), it never completes, so forkJoin never emits. Use combineLatest for streams that don’t complete, or add take(1) to force completion:
forkJoin([
  user$.pipe(take(1)), // Force completion
  settings$.pipe(take(1)),
]).subscribe(([user, settings]) => { ... });
shareReplay never releases the inner subscription: the default shareReplay(1) keeps the source subscription alive even after all consumers unsubscribe, for as long as the source has not completed. For long-lived sources held in services (WebSocket streams, intervals, requests still in flight), this leaks the subscription and any retained closures. Switch to shareReplay({ bufferSize: 1, refCount: true }) so the inner subscription is torn down when the last subscriber leaves, and re-created when a new subscriber arrives.
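The teardown difference can be observed with a source that counts its own active subscriptions. This is a sketch: `makeSource` and the two counter objects are illustrative, and the source deliberately never completes, like a socket.

```typescript
import { Observable } from 'rxjs';
import { shareReplay } from 'rxjs/operators';

// Hypothetical long-lived source that tracks how many subscriptions are open.
const makeSource = (counter: { active: number }) =>
  new Observable<number>(subscriber => {
    counter.active++;
    subscriber.next(1);
    return () => { counter.active--; }; // teardown runs when the source subscription ends
  });

const sticky = { active: 0 };
const counted = { active: 0 };

const sticky$ = makeSource(sticky).pipe(shareReplay(1));
const counted$ = makeSource(counted).pipe(
  shareReplay({ bufferSize: 1, refCount: true }),
);

// One consumer subscribes and immediately leaves.
sticky$.subscribe().unsubscribe();
counted$.subscribe().unsubscribe();

console.log(sticky.active);  // 1: the source is still subscribed with no consumers
console.log(counted.active); // 0: the source was torn down with the last consumer
```

The trade-off is that a torn-down refCounted source runs again for the next subscriber if it had not completed, so the right setting depends on whether re-execution is acceptable.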
Nested subscribe() inside another subscribe() callback — service.getUser().subscribe(u => service.getPosts(u.id).subscribe(...)) looks fine but creates an orphan subscription that ignores the outer pipeline’s error handling and cleanup. Flatten with switchMap: service.getUser().pipe(switchMap(u => service.getPosts(u.id))).subscribe(...). Now both errors and cleanup propagate through one chain.
combineLatest waits forever on first emit — combineLatest only emits after every source has emitted at least once. If one of your sources is a Subject that hasn’t been .next()ed yet, nothing happens. Either seed with BehaviorSubject(initial) or startWith(value) so each branch has a starting value.
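A minimal sketch of the stall and the startWith fix. The names `user$` and `filter$` and the seed value 'all' are illustrative assumptions.

```typescript
import { BehaviorSubject, Subject, combineLatest } from 'rxjs';
import { startWith } from 'rxjs/operators';

const user$ = new BehaviorSubject<string>('ada');
const filter$ = new Subject<string>(); // has not emitted yet

// Unseeded: waits until filter$ emits for the first time.
const stuck: string[] = [];
combineLatest([user$, filter$])
  .subscribe(([user, filter]) => stuck.push(`${user}/${filter}`));

// Seeded: startWith gives the filter branch an initial value.
const seeded: string[] = [];
combineLatest([user$, filter$.pipe(startWith('all'))])
  .subscribe(([user, filter]) => seeded.push(`${user}/${filter}`));

const stuckBefore = [...stuck];   // []
const seededBefore = [...seeded]; // ['ada/all']

filter$.next('active');

console.log(stuckBefore, seededBefore);
console.log(stuck);  // ['ada/active']
console.log(seeded); // ['ada/all', 'ada/active']
```

Seeding each branch is usually preferable when the UI should render a default state before the user has interacted.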
For related Angular issues, see Fix: Angular RxJS Memory Leak, Fix: Angular Signals Not Updating, Fix: TanStack Query Not Working, and Fix: React useState Not Updating.
Solo developer based in Japan. Every solution is cross-referenced with official documentation and tested before publishing.