Fix: RxJS Not Working — Observable Not Emitting, Memory Leak from Unsubscribed Stream, or Operator Behaving Unexpectedly
Quick Answer
How to fix RxJS issues — subscription management, switchMap vs mergeMap vs concatMap, error handling with catchError, Subject types, cold vs hot observables, and Angular async pipe.
The Problem
An observable emits values but the subscriber never receives them:
const subject = new Subject<number>();
subject.subscribe(val => console.log(val)); // Never logs
// ... elsewhere in the code
subject.next(1); // Subject was completed before thisOr a memory leak because subscriptions aren’t cleaned up:
// Angular component
ngOnInit() {
this.userService.getUser().subscribe(user => {
this.user = user; // Subscription lives forever — even after component destroys
});
}Or switchMap drops requests unexpectedly:
this.searchInput.valueChanges.pipe(
switchMap(query => this.api.search(query)),
).subscribe(results => this.results = results);
// Fast typing cancels previous requests — some results are lostOr an error in one stream kills the entire observable chain:
this.data$.subscribe({
error: err => console.error(err),
});
// After one error, the observable completes — never emits againWhy This Happens
RxJS has nuanced behavior around subscription lifecycle and operator semantics:
- Completed or errored observables never emit again — once a
Subjectis completed (viacomplete()) or throws an unhandled error, all futurenext()calls are silently ignored. You must create a new Subject or handle errors before they reach the subscriber. - Every
subscribe()call creates a new subscription — subscriptions don’t clean themselves up. In Angular components or React effects, subscriptions created in lifecycle methods must be explicitly unsubscribed when the component destroys or the effect cleans up. switchMapcancels the previous inner observable — by design. UsemergeMapif you want all requests to complete,concatMapif order matters, orexhaustMapif you want to ignore new events while one is in progress.- Errors propagate downstream and terminate the stream — the default RxJS error handling terminates the observable. Use
catchErrorinside the pipe, before the subscriber, to recover.
Fix 1: Manage Subscriptions to Prevent Memory Leaks
// Angular — traditional unsubscribe pattern
import { Component, OnInit, OnDestroy } from '@angular/core';
import { Subscription } from 'rxjs';
@Component({ ... })
export class UserComponent implements OnInit, OnDestroy {
private subscription = new Subscription();
ngOnInit() {
this.subscription.add(
this.userService.getUser().subscribe(user => this.user = user)
);
this.subscription.add(
this.searchService.getResults().subscribe(r => this.results = r)
);
}
ngOnDestroy() {
this.subscription.unsubscribe(); // Cancels all added subscriptions
}
}
// Angular — modern approach with takeUntilDestroyed (Angular 16+)
import { takeUntilDestroyed } from '@angular/core/rxjs-interop';
import { DestroyRef, inject } from '@angular/core';
@Component({ ... })
export class UserComponent {
private destroyRef = inject(DestroyRef);
ngOnInit() {
this.userService.getUser()
.pipe(takeUntilDestroyed(this.destroyRef))
.subscribe(user => this.user = user);
}
}
// Angular — async pipe (best approach — auto-unsubscribes)
// template:
// <div *ngIf="user$ | async as user">{{ user.name }}</div>
user$ = this.userService.getUser();
// React — useEffect cleanup
useEffect(() => {
const subscription = observable$.subscribe(value => setValue(value));
return () => subscription.unsubscribe(); // Cleanup on unmount
}, []);
// Using take(1) for one-shot subscriptions
this.userService.getUser()
.pipe(take(1)) // Auto-completes after first emission
.subscribe(user => this.user = user);Fix 2: Choose the Right Flattening Operator
All four operators handle inner observables differently:
import { switchMap, mergeMap, concatMap, exhaustMap } from 'rxjs/operators';
// switchMap — cancels previous inner observable when new outer value arrives
// Use for: search autocomplete, navigation, latest-value-wins scenarios
this.searchInput.valueChanges.pipe(
debounceTime(300),
distinctUntilChanged(),
switchMap(query => this.api.search(query)),
// If user types fast, only the last request completes
).subscribe(results => this.results = results);
// mergeMap (flatMap) — all inner observables run concurrently, no cancellation
// Use for: parallel requests where all results matter
const userIds = [1, 2, 3, 4, 5];
from(userIds).pipe(
mergeMap(id => this.api.getUser(id)), // All 5 requests fire at once
).subscribe(user => this.allUsers.push(user));
// concatMap — processes inner observables one by one, in order
// Use for: sequential operations, file uploads, ordered processing
from(filesToUpload).pipe(
concatMap(file => this.api.upload(file)), // Uploads one at a time, in order
).subscribe(result => this.uploadResults.push(result));
// exhaustMap — ignores new outer values while inner is active
// Use for: form submit buttons, preventing double-submit
this.submitButton.clicks.pipe(
exhaustMap(() => this.api.submitForm(this.form)),
// Ignores additional clicks while submit is pending
).subscribe(response => this.handleResponse(response));Fix 3: Handle Errors Without Terminating the Stream
import { catchError, retry, retryWhen, EMPTY, of, throwError } from 'rxjs';
import { delay, take } from 'rxjs/operators';
// catchError — recover and continue
this.api.getUsers().pipe(
catchError(err => {
console.error('Failed to get users:', err);
return of([]); // Return empty array as fallback
// return EMPTY; // Complete without emitting
// return throwError(() => err); // Re-throw to propagate
}),
).subscribe(users => this.users = users);
// Keep the stream alive after errors (for long-lived streams)
this.websocket.messages$.pipe(
switchMap(msg => this.processMessage(msg).pipe(
catchError(err => {
console.error('Processing failed:', err);
return EMPTY; // Skip this message, continue listening
})
)),
).subscribe(result => this.handleResult(result));
// retry — retry on error
this.api.fetchData().pipe(
retry(3), // Retry up to 3 times before propagating error
).subscribe(data => this.data = data);
// Retry with exponential backoff
this.api.fetchData().pipe(
retryWhen(errors => errors.pipe(
delay(1000), // Wait 1 second between retries
take(3), // Give up after 3 retries
)),
).subscribe(data => this.data = data);
// Or use retry with config (RxJS 7+)
this.api.fetchData().pipe(
retry({
count: 3,
delay: (error, retryCount) => timer(retryCount * 1000), // Exponential backoff
}),
).subscribe();Fix 4: Understand Cold vs Hot Observables
import { Subject, BehaviorSubject, ReplaySubject, AsyncSubject, share, shareReplay } from 'rxjs';
// COLD observable — each subscriber gets its own execution
// HTTP calls are cold — each subscribe triggers a new request
const users$ = this.http.get('/api/users');
users$.subscribe(u => console.log('Sub 1:', u));
users$.subscribe(u => console.log('Sub 2:', u));
// Two HTTP requests fired!
// HOT observable — all subscribers share the same execution
// Make cold observables hot with share() or shareReplay()
const sharedUsers$ = this.http.get('/api/users').pipe(
shareReplay(1), // Cache last emission — new subscribers get it immediately
);
sharedUsers$.subscribe(u => console.log('Sub 1:', u));
sharedUsers$.subscribe(u => console.log('Sub 2:', u));
// Only ONE HTTP request, both subscribers receive the result
// Subject — manually push values, hot
const events$ = new Subject<Event>();
events$.subscribe(e => console.log('Subscriber 1:', e));
events$.next(new Event('click')); // Both subscribers receive it
events$.subscribe(e => console.log('Subscriber 2:', e));
events$.next(new Event('click')); // Only Subscriber 2 receives this one
// BehaviorSubject — has a current value, new subscribers get latest emission
const currentUser$ = new BehaviorSubject<User | null>(null);
currentUser$.value; // Access current value synchronously
currentUser$.subscribe(u => console.log(u)); // Immediately receives null
currentUser$.next(loggedInUser); // All subscribers get the new user
// ReplaySubject — replays N emissions to new subscribers
const recentActions$ = new ReplaySubject<Action>(5); // Replay last 5
recentActions$.next({ type: 'LOAD' });
recentActions$.next({ type: 'SUCCESS' });
// New subscriber receives both LOAD and SUCCESS immediately
// AsyncSubject — only emits the last value, on complete
const result$ = new AsyncSubject<number>();
result$.next(1);
result$.next(2);
result$.next(3);
result$.complete(); // Now emits 3 to all subscribersFix 5: Common Operators and Patterns
import {
map, filter, tap, take, skip, distinctUntilChanged,
debounceTime, throttleTime, startWith, pairwise,
combineLatest, forkJoin, merge, zip, withLatestFrom,
scan, reduce, buffer, window, groupBy,
} from 'rxjs';
// Transform and filter
source$.pipe(
map(x => x * 2),
filter(x => x > 10),
tap(x => console.log('Value:', x)), // Side effect without transforming
take(5), // Take only first 5 values
distinctUntilChanged(), // Skip duplicate consecutive values
);
// Rate limiting
searchInput$.pipe(
debounceTime(300), // Emit only after 300ms of silence
throttleTime(1000), // Emit at most once per second
);
// Combining streams
combineLatest([user$, settings$]).pipe(
map(([user, settings]) => ({ user, settings })),
);
// Emits when ANY input emits, with latest values from all
forkJoin([
this.api.getUser(id),
this.api.getPosts(id),
]).pipe(
map(([user, posts]) => ({ user, posts })),
);
// Emits once when ALL observables complete (like Promise.all)
withLatestFrom(currentFilter$).pipe(
map(([data, filter]) => applyFilter(data, filter)),
);
// Get latest value from secondary stream without subscribing to it
// Accumulate state
actions$.pipe(
scan((state, action) => reducer(state, action), initialState),
);
// Like Array.reduce, but for streams
// Previous and current values
values$.pipe(
startWith(null),
pairwise(), // Emits [previous, current]
map(([prev, curr]) => ({ prev, curr, changed: prev !== curr })),
);Fix 6: Angular Async Pipe Best Practices
// AVOID: manual subscription in component
@Component({ ... })
class UserComponent implements OnInit, OnDestroy {
user: User;
private sub: Subscription;
ngOnInit() {
this.sub = this.userService.getUser(this.id).subscribe(u => this.user = u);
}
ngOnDestroy() {
this.sub.unsubscribe();
}
}
// PREFER: async pipe in template (auto-unsubscribes, triggers change detection)
@Component({
template: `
<div *ngIf="user$ | async as user; else loading">
<h1>{{ user.name }}</h1>
</div>
<ng-template #loading>Loading...</ng-template>
`,
changeDetection: ChangeDetectionStrategy.OnPush,
})
class UserComponent {
user$ = this.userService.getUser(this.id);
}
// Combine multiple streams in template
@Component({
template: `
<ng-container *ngIf="{
user: user$ | async,
posts: posts$ | async,
loading: loading$ | async
} as vm">
<app-user [user]="vm.user" [posts]="vm.posts" />
<app-spinner *ngIf="vm.loading" />
</ng-container>
`,
})
class DashboardComponent {
user$ = this.userService.getUser();
posts$ = this.postsService.getPosts();
loading$ = combineLatest([this.user$, this.posts$]).pipe(
map(() => false),
startWith(true),
);
}Still Not Working?
Observable emits but async pipe doesn’t update the view — in OnPush change detection mode, Angular only checks for changes when: (1) an input property changes, (2) an event from the component or its children occurs, (3) ChangeDetectorRef.markForCheck() is called, or (4) an async pipe resolves. If you’re using async pipe and OnPush, this should work automatically. If it doesn’t, check that you’re not sharing the observable reference across multiple places without shareReplay.
Subject.next() called before any subscriber — values emitted before a subscriber exists are lost on a Subject. Use BehaviorSubject (replays last value) or ReplaySubject (replays N values) if late subscribers need previous emissions. This is the cold/hot distinction in practice.
forkJoin never emits — forkJoin waits for all inner observables to complete. If any observable is an infinite stream (like BehaviorSubject), it never completes, so forkJoin never emits. Use combineLatest for streams that don’t complete, or add take(1) to force completion:
forkJoin([
user$.pipe(take(1)), // Force completion
settings$.pipe(take(1)),
]).subscribe(([user, settings]) => { ... });For related Angular issues, see Fix: Angular RxJS Memory Leak and Fix: Angular Signals Not Updating.
Solo developer based in Japan. Every solution is cross-referenced with official documentation and tested before publishing.
Was this article helpful?
Related Articles
Fix: Angular Lazy Loading Not Working — Routes Not Code-Split
How to fix Angular lazy loading not working — loadChildren syntax, standalone components, route configuration mistakes, preloading strategies, and debugging bundle splits.
Fix: Angular RxJS Memory Leak — Subscriptions Not Unsubscribed
How to fix RxJS memory leaks in Angular — unsubscribing from Observables, takeUntilDestroyed, async pipe, subscription management patterns, and detecting leaks with Chrome DevTools.
Fix: Angular Form Validation Not Working — Validators Not Triggering
How to fix Angular form validation not working — Reactive Forms vs Template-Driven, custom validators, async validators, touched/dirty state, and error message display.
Fix: Angular Change Detection Not Working — View Not Updating
How to fix Angular change detection issues — OnPush strategy not triggering, async pipe, markForCheck vs detectChanges, zone.js and zoneless patterns, and manual change detection triggers.