Fix: Kotlin Flow Not Working — Not Collecting, StateFlow Not Updating, or Flow Cancelled Unexpectedly
Quick Answer
How to fix Kotlin Flow issues — cold vs hot flows, collectLatest vs collect, StateFlow and SharedFlow setup, lifecycle-aware collection in Android, and common Flow cancellation problems.
The Problem
A Flow emits values but collect never receives them:
val flow = flow {
emit(1)
emit(2)
emit(3)
}
// Never prints anything
CoroutineScope(Dispatchers.Main).launch {
flow.collect { value ->
println(value)
}
}
Or StateFlow updates are missed in the UI:
class MyViewModel : ViewModel() {
private val _state = MutableStateFlow(0)
val state: StateFlow<Int> = _state
fun increment() {
_state.value++ // Updates — but UI never re-renders
}
}
Or a Flow that should emit continuously gets cancelled after the first value:
viewModel.dataFlow
.collect { data ->
updateUI(data)
// After calling another suspending function here — Flow stops
}
Why This Happens
Kotlin Flow has several behaviors that differ from RxJava or LiveData:
- Cold flows don’t execute until collected: creating a flow { } builder doesn’t start the producer. Each collect call starts a new execution. If no one collects, nothing happens.
- Flow is tied to the collecting coroutine’s lifecycle: if the coroutine is cancelled (e.g., because the screen rotates, the ViewModel is cleared, or the scope is closed), the Flow collection stops. This is intentional, but can cause values to be missed if the scope is too narrow.
- StateFlow never re-emits a value equal to the current one: StateFlow is a conflated hot flow. If you set the same value twice (_state.value = 5; _state.value = 5), collectors see only one emission. It also starts with an initial value, so late collectors receive the current value immediately.
- collect suspends until the flow completes: it does not block the thread, but code after collect in the same coroutine does not run while the flow is active. Use launch to collect in a separate coroutine.
- collectLatest cancels the previous block: if the flow emits faster than you process, collectLatest cancels the in-progress block and starts a new one. Long processing inside collectLatest may never complete.
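The first and last points in the list above can be checked with a small sketch. It assumes only kotlinx-coroutines-core; the function name coldAndLatestDemo is made up for illustration.

```kotlin
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.collectLatest
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Returns (how many times the producer ran, values collectLatest finished processing).
fun coldAndLatestDemo(): Pair<Int, List<Int>> = runBlocking {
    var producerStarts = 0
    val numbers = flow<Int> {
        producerStarts++              // runs once per collector, not when the flow is built
        emit(1)
        emit(2)
        emit(3)
    }
    check(producerStarts == 0)        // cold: nothing has executed yet

    numbers.toList()                  // first collection starts the producer
    val finished = mutableListOf<Int>()
    numbers.collectLatest { value ->  // second collection starts it again
        delay(100)                    // a newer emission cancels this block while it is suspended
        finished += value
    }
    producerStarts to finished
}
```

Calling coldAndLatestDemo() should return (2, [3]): the producer ran twice, and only the block for the last value survived long enough to finish.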
Flow was designed to embrace structured concurrency, which is the second-biggest source of “my flow stopped working” reports after lifecycle scoping. Every collect runs inside a coroutine scope, and when that scope cancels — viewModelScope.cancel(), coroutineScope { } completion, the parent job throwing — every active collector is cancelled with it. This is intentional and prevents leaks, but it means a Flow that “used to work” can fall silent after refactoring an outer coroutine. The fix is almost always reasoning about which scope owns the collector, not about the Flow itself.
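A minimal sketch of that ownership rule, assuming kotlinx-coroutines-core. The owner scope is a hypothetical stand-in for viewModelScope or lifecycleScope, and the short delays only give the collector time to run.

```kotlin
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Job
import kotlinx.coroutines.cancel
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Returns the values the collector saw before its owning scope was cancelled.
fun scopeOwnsCollector(): List<Int> = runBlocking {
    val state = MutableStateFlow(0)
    val received = mutableListOf<Int>()
    // Hypothetical owner scope, standing in for viewModelScope or lifecycleScope.
    val owner = CoroutineScope(coroutineContext + Job())
    val collector = owner.launch { state.collect { received += it } }

    delay(50)          // collector starts and receives the initial 0
    state.value = 1
    delay(50)          // collector receives 1
    owner.cancel()     // cancelling the scope cancels every collector launched in it
    collector.join()
    state.value = 2    // still updates the StateFlow, but nobody is collecting
    delay(50)
    received
}
```

scopeOwnsCollector() should return [0, 1]. The StateFlow itself keeps working after the cancel; only the collector is gone, which is why the place to look is the scope rather than the flow.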
Backpressure is the other architectural gap newcomers stumble into. Flow has no built-in subscriber pull-request mechanism the way Reactive Streams does. Instead, suspension provides natural backpressure: an upstream emit suspends until the downstream consumer is ready. That works for in-process pipelines but breaks when you wrap a callback API with callbackFlow and emit faster than the consumer can drain. trySend returns a ChannelResult you must inspect, and buffer(capacity = ..., onBufferOverflow = ...) decides what happens when the channel fills. Skipping that decision leads to dropped events or memory growth.
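The sketch below illustrates the callbackFlow case under stated assumptions: PushSource is a hypothetical push-style API invented for the example, and the buffer capacity of 1 is chosen only to make overflow easy to trigger. It inspects the result of trySend rather than ignoring it.

```kotlin
import kotlinx.coroutines.cancelAndJoin
import kotlinx.coroutines.channels.awaitClose
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.buffer
import kotlinx.coroutines.flow.callbackFlow
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Hypothetical push-style API, invented for this sketch.
class PushSource {
    private var listener: ((Int) -> Unit)? = null
    fun register(l: (Int) -> Unit) { listener = l }
    fun unregister() { listener = null }
    fun fire(value: Int) { listener?.invoke(value) }
}

fun PushSource.events(onDropped: (Int) -> Unit): Flow<Int> = callbackFlow<Int> {
    register { value ->
        // trySend never suspends; a failed result means the buffer was full or the flow was closed.
        val result = trySend(value)
        if (result.isFailure) onDropped(value)
    }
    awaitClose { unregister() }
}.buffer(capacity = 1) // default overflow policy is SUSPEND, so trySend fails when the buffer is full

// Returns (values the collector received, values reported as dropped).
fun burstDemo(): Pair<List<Int>, List<Int>> = runBlocking {
    val source = PushSource()
    val received = mutableListOf<Int>()
    val dropped = mutableListOf<Int>()
    val job = launch { source.events { dropped += it }.collect { received += it } }

    delay(50)                       // let the collector register its listener
    for (i in 1..5) source.fire(i)  // burst with no suspension point for the consumer
    delay(50)                       // let the collector drain what was buffered
    job.cancelAndJoin()             // cancellation runs awaitClose and unregisters the listener
    received to dropped
}
```

Every value in the burst ends up in exactly one of the two lists. Passing onBufferOverflow = BufferOverflow.DROP_OLDEST to buffer instead would keep the newest values and make trySend succeed, which is the policy decision the paragraph above refers to.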
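StateFlow’s equality check is the third hidden gotcha. The check uses equals, so two distinct list instances with identical contents compare equal and the second assignment emits nothing. The opposite problem comes from types that fall back to reference equality, such as a regular class that does not override equals or a data class holding an array or lambda: states that look identical re-emit on every assignment. Properties declared in a data class body rather than its primary constructor are excluded from equals, so a change to those alone is never emitted. Audit the equals of any class you stash in a StateFlow.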
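To see the equality check in action, here is a small sketch assuming kotlinx-coroutines-core. Cart and PlainCart are hypothetical types made up for the comparison.

```kotlin
import kotlinx.coroutines.cancelAndJoin
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Hypothetical state types for this sketch.
data class Cart(val items: List<String>)  // generated equals() compares contents
class PlainCart(val items: List<String>)  // no equals(): compared by reference

// Counts how many values a collector sees when `second` is assigned after `first`.
fun <T> emissionsAfterReassigning(first: T, second: T): Int = runBlocking {
    val state = MutableStateFlow(first)
    var emissions = 0
    val job = launch { state.collect { emissions += 1 } }
    delay(50)             // collector receives the initial value
    state.value = second  // emitted only if second != first according to equals()
    delay(50)
    job.cancelAndJoin()
    emissions
}
```

With two equal Cart instances the function should return 1 (the reassignment is swallowed); with two PlainCart instances holding the same items it should return 2.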
Fix 1: Understand Cold vs Hot Flows
Cold flows start fresh for each collector. Hot flows (like StateFlow and SharedFlow) run independently:
// COLD flow — runs once per collector, no sharing
val coldFlow = flow {
println("Producer started") // Prints for EACH collector
emit(1)
emit(2)
emit(3)
}
// Two collectors start two separate producers:
launch { coldFlow.collect { println("A: $it") } } // A: 1, A: 2, A: 3
launch { coldFlow.collect { println("B: $it") } } // B: 1, B: 2, B: 3
// HOT StateFlow — one shared state, late collectors get current value
val stateFlow = MutableStateFlow(0)
stateFlow.value = 5
launch { stateFlow.collect { println("Collector: $it") } }
// Immediately prints: "Collector: 5" (current value)
// HOT SharedFlow — configurable replay, no initial value
val sharedFlow = MutableSharedFlow<Int>(replay = 3) // Replays last 3 values
launch { sharedFlow.emit(1) }
launch { sharedFlow.emit(2) }
// Late collector gets: 1, 2 (if within replay buffer)
Convert cold to hot with shareIn or stateIn:
// Share an expensive cold flow between multiple collectors
// A plain Repository has no viewModelScope, so the sharing scope is injected
// (for example viewModelScope from the caller, or an application-level scope)
class Repository(private val db: Database, private val scope: CoroutineScope) {
    // WITHOUT sharing: each collector starts a new database query
    val users: Flow<List<User>> = db.getUsersFlow()
    // WITH sharing: one database subscription, shared among all collectors
    val usersShared: SharedFlow<List<User>> = db.getUsersFlow()
        .shareIn(
            scope = scope,
            started = SharingStarted.WhileSubscribed(5000), // Keep alive 5s after last subscriber
            replay = 1
        )
    // Convert to StateFlow: always has a current value
    val usersState: StateFlow<List<User>> = db.getUsersFlow()
        .stateIn(
            scope = scope,
            started = SharingStarted.WhileSubscribed(5000),
            initialValue = emptyList()
        )
}
Fix 2: Collect Flows in the Right Lifecycle Scope
Always collect Android Flows in a lifecycle-aware scope to prevent crashes and leaks:
// WRONG — collects in a scope that survives config changes
// May update destroyed views; also leaks if not cancelled manually
class MyFragment : Fragment() {
override fun onViewCreated(view: View, savedInstanceState: Bundle?) {
CoroutineScope(Dispatchers.Main).launch {
viewModel.state.collect { state ->
updateUI(state) // May run after view is destroyed
}
}
}
}
// CORRECT — use repeatOnLifecycle (recommended for Android Fragments/Activities)
class MyFragment : Fragment() {
override fun onViewCreated(view: View, savedInstanceState: Bundle?) {
viewLifecycleOwner.lifecycleScope.launch {
// Starts collection when STARTED, cancels when STOPPED
// Re-collects when STARTED again (e.g., after returning from background)
viewLifecycleOwner.repeatOnLifecycle(Lifecycle.State.STARTED) {
viewModel.state.collect { state ->
updateUI(state)
}
}
}
}
}
// Multiple flows at the same time
viewLifecycleOwner.lifecycleScope.launch {
viewLifecycleOwner.repeatOnLifecycle(Lifecycle.State.STARTED) {
launch {
viewModel.users.collect { updateUserList(it) }
}
launch {
viewModel.errors.collect { showError(it) }
}
}
}
In Compose, use collectAsStateWithLifecycle:
// Add dependency: lifecycle-runtime-compose
@Composable
fun UserScreen(viewModel: UserViewModel = viewModel()) {
// Lifecycle-aware collection — stops when the screen is not visible
val state by viewModel.state.collectAsStateWithLifecycle()
// Or collectAsState(): not lifecycle-aware, so it keeps collecting while the app is in the background
val stateAlways by viewModel.state.collectAsState()
UserContent(state = state)
}
Fix 3: Choose the Right Collection Terminal
Collection operators differ in how they handle a consumer that is slower than the producer:
// collect: processes every emission in order; a slow block suspends the producer
flow.collect { value ->
delay(1000) // Slow processing: the upstream emit() suspends until this block returns
processValue(value)
}
// collectLatest — cancels previous block when new value arrives
// Best for: UI updates, search queries (only care about latest)
flow.collectLatest { value ->
delay(1000) // If new value arrives before 1s, this is cancelled
updateUI(value) // Only runs if no new value arrived within 1s
}
// With buffer — decouple producer and consumer speeds
flow
.buffer(capacity = 64) // Buffer up to 64 items
.collect { value ->
delay(100) // Consumer is slower; the producer keeps going until the buffer fills, then suspends
processValue(value)
}
// conflate: drops intermediate values while the collector is busy; unlike collectLatest it never cancels the running block
flow
.conflate() // Drop intermediate values when consumer is slow
.collect { value ->
processValue(value)
}
Choosing the right operator:
// Search box — only care about the latest query
searchQuery
.debounce(300) // Wait 300ms after typing stops
.distinctUntilChanged() // Skip if same query
.flatMapLatest { query ->
repository.search(query) // Cancel previous search on new query
}
.collectLatest { results ->
updateResults(results)
}
// Progress events — need all values
uploadProgress
.collect { percent ->
progressBar.progress = percent
}
// Sensor data — only need latest, not historical
accelerometer
.conflate()
.collect { reading ->
updateVisualization(reading)
}
Fix 4: Fix StateFlow and SharedFlow Issues
// StateFlow: initial value required, equality-based, always has a value
class CounterViewModel : ViewModel() {
    private val _count = MutableStateFlow(0)
    val count: StateFlow<Int> = _count.asStateFlow()
    // Declared so addToCart compiles; Item is the app's own type
    private val _cart = MutableStateFlow<List<Item>>(emptyList())
    fun increment() {
        // Atomic update; _count.value++ is a separate read and write and can lose updates across threads
        _count.update { it + 1 }
    }
    // update { } for read-modify-write (thread-safe)
    fun addToCart(item: Item) {
        _cart.update { currentCart ->
            currentCart + item
        }
    }
}
// StateFlow won't emit if the value is equal to the current value:
_count.value = 5
_count.value = 5 // No emission — same value
// For data classes, ensure equals() is implemented correctly:
data class UserState(val name: String, val count: Int)
_state.value = UserState("Alice", 1)
_state.value = UserState("Alice", 1) // No emission — same data class value
_state.value = UserState("Alice", 2) // Emits: different value
// SharedFlow: no initial value, configurable replay and buffer
class EventViewModel : ViewModel() {
// One-time events (no replay — new collectors don't get past events)
private val _events = MutableSharedFlow<UiEvent>()
val events: SharedFlow<UiEvent> = _events.asSharedFlow()
fun showError(message: String) {
viewModelScope.launch {
_events.emit(UiEvent.Error(message))
}
}
// With replay: new collectors see last N events
private val _notifications = MutableSharedFlow<String>(
replay = 3,
onBufferOverflow = BufferOverflow.DROP_OLDEST
)
}
// Collecting SharedFlow in Fragment
viewLifecycleOwner.lifecycleScope.launch {
viewLifecycleOwner.repeatOnLifecycle(Lifecycle.State.STARTED) { // Tie to the view lifecycle, not the Fragment's
viewModel.events.collect { event ->
when (event) {
is UiEvent.Error -> showSnackbar(event.message)
is UiEvent.Navigate -> findNavController().navigate(event.destination)
}
}
}
}
Fix 5: Combine and Transform Flows
Build complex flows by combining operators:
// Combine multiple flows
val userId: StateFlow<Int> = userViewModel.userId
val preferences: Flow<Preferences> = preferencesRepository.flow
combine(userId, preferences) { id, prefs ->
Pair(id, prefs)
}.collect { (id, prefs) ->
loadUserWithPrefs(id, prefs)
}
// zip — pairs emissions one-to-one
val numbers = flowOf(1, 2, 3)
val letters = flowOf("a", "b", "c")
numbers.zip(letters) { num, letter ->
"$num$letter"
}.collect { println(it) }
// 1a, 2b, 3c
// flatMapLatest — cancel previous inner flow when outer emits
viewModel.selectedUserId
.flatMapLatest { userId ->
repository.getUser(userId) // New flow for each userId
}
.collect { user ->
updateUI(user)
}
// flatMapMerge — run inner flows concurrently
urls.flatMapMerge(concurrency = 4) { url ->
flow { emit(httpClient.get(url)) }
}.collect { response ->
process(response)
}
// mapNotNull — filter and transform in one step
dataFlow
.mapNotNull { it.data } // Skips nulls
.collect { data ->
processData(data)
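}
Fix 6: Handle Errors in Flows
An unhandled exception terminates the flow and propagates to the collector. The catch operator intercepts exceptions thrown upstream of it (it does not see exceptions thrown inside collect) and lets you emit a fallback before the flow completes:
// WRONG: the exception escapes collect and ends the collection
repository.getDataFlow()
.collect { data ->
riskyOperation(data) // Throws: collection stops completely
}
// CORRECT: run risky work upstream of catch, then emit a fallback or error state
repository.getDataFlow()
.onEach { data -> riskyOperation(data) } // catch only sees exceptions thrown above it
.catch { e ->
emit(emptyList()) // Emit fallback value; the upstream flow has already ended
// Or: emit(Resource.Error(e.message))
}
.collect { data ->
updateUI(data)
}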
// Retry up to 3 times, immediately (retry adds no delay; use retryWhen with delay() for backoff)
repository.getDataFlow()
.retry(3) { e ->
e is IOException // Only retry IOExceptions
}
.catch { e ->
// Only reached if all retries fail
showPersistentError(e)
}
.collect { updateUI(it) }
// onEach for side effects (like logging) without consuming the flow
repository.getDataFlow()
.onEach { data -> logAnalytics(data) }
.onStart { showLoading() }
.onCompletion { hideLoading() }
.catch { e -> showError(e) }
.collect { updateUI(it) }
Cross-Tool Comparison: Kotlin Flow vs RxJava vs Reactor vs RxJS vs Combine
Reactive streams libraries look superficially identical — they all map, filter, and combine async sequences — but the cold/hot distinction, cancellation model, and backpressure strategy diverge in ways that bite when you port code between ecosystems.
Kotlin Flow is cold by default, integrated with coroutines, and uses structured concurrency for cancellation. When the enclosing scope cancels, every collector is cancelled deterministically. Backpressure is implicit via suspension: the upstream emit suspends until the downstream is ready. Hot variants (StateFlow, SharedFlow) are explicit, not the default, and their behavior is configured via replay, started, and BufferOverflow parameters.
RxJava Observable (and its later Flowable variant) was the inspiration for Flow but predates coroutines. Observable does not support backpressure; Flowable does, and when you create or convert one you choose among five BackpressureStrategy values (MISSING, ERROR, BUFFER, DROP, LATEST). Subscriptions return Disposable objects you must explicitly dispose, and forgetting to dispose causes leaks that LeakCanary will surface in Android. The mental model is “manage subscriptions” rather than “manage scopes.” Migrating from RxJava to Flow usually means deleting half the CompositeDisposable plumbing.
Project Reactor Flux is the JVM equivalent for the Spring ecosystem. It implements the Reactive Streams specification (with Publisher/Subscriber/Subscription), which mandates explicit request-based backpressure. A Flux does not emit until the subscriber calls request(n). subscribeOn and publishOn control thread switching. Reactor and Flow interop via kotlinx-coroutines-reactor: flux { send(...) } builds a Flux from a coroutine, and someFlow.asFlux() bridges an existing Flow.
RxJS Observable (TypeScript/JavaScript) is closest to RxJava in API shape but runs in single-threaded JavaScript environments. There is no thread-switching scheduler in the same sense as JVM reactive libraries; observeOn(asyncScheduler) defers work to a timer-based macrotask, while asapScheduler uses the microtask queue. Cold/hot semantics are identical to RxJava: Subject and BehaviorSubject provide the hot variants. Cancellation is via Subscription.unsubscribe().
Swift Combine (Apple) shares the Reactive Streams design with Reactor but plugs into Swift’s structured concurrency via AsyncPublisher. Subscribers return AnyCancellable objects, and .store(in: &cancellables) is the idiomatic way to keep them alive. Combine has built-in operators (assign(to:)) that integrate directly with SwiftUI state — the rough equivalent of StateFlow.collectAsStateWithLifecycle in Compose.
// Kotlin Flow with structured concurrency — cancellation is automatic
viewModelScope.launch {
flow.collect { value -> handle(value) }
} // Cancelled when viewModelScope is cleared
// RxJava equivalent — explicit Disposable management
val disposable = observable
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe { value -> handle(value) }
// Must call disposable.dispose() in onCleared(): easy to forget
The biggest cross-tool gotcha is backpressure. Flow’s implicit suspension works great for in-process pipelines but breaks when adapting external push-based APIs. RxJava’s Flowable and Reactor’s Flux force you to choose a strategy at the source; there is no implicit suspension because Java has no coroutines. When wrapping a BroadcastReceiver in Flow’s callbackFlow, you must pick a BufferOverflow policy; when wrapping the same in Reactor, you pick a backpressure strategy at the create() call. The decision is the same, but Flow’s API makes it feel optional and it is not.
If you’re migrating from RxJava to Flow, the rough mapping is Observable → Flow, Single → suspend fun, Maybe → suspend fun returning T?, Completable → suspend fun returning Unit, Subject → SharedFlow, BehaviorSubject → StateFlow. The hard part is the cancellation model: replace every CompositeDisposable with a coroutine scope, and let structured concurrency do the cleanup.
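The coroutine side of that mapping might look like the sketch below. ProfilePresenter, fetchName, and migrationDemo are hypothetical names invented for illustration, and the RxJava shapes in the comments describe what such code typically replaces rather than any specific codebase.

```kotlin
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Job
import kotlinx.coroutines.cancel
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.StateFlow
import kotlinx.coroutines.flow.asStateFlow
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Hypothetical presenter, invented for this sketch.
class ProfilePresenter(private val scope: CoroutineScope) {  // was: a CompositeDisposable field
    private val _name = MutableStateFlow("loading")          // was: BehaviorSubject.createDefault("loading")
    val name: StateFlow<String> = _name.asStateFlow()

    private suspend fun fetchName(id: Int): String {         // was: a function returning Single<String>
        delay(10)                                            // stands in for a network call
        return "user-$id"
    }

    fun load(id: Int): Job = scope.launch {                  // was: disposables.add(fetchName(id).subscribe(...))
        _name.value = fetchName(id)
    }

    fun clear() = scope.cancel()                             // was: disposables.dispose()
}

fun migrationDemo(): String = runBlocking {
    val presenter = ProfilePresenter(CoroutineScope(coroutineContext + Job()))
    presenter.load(7).join()
    val result = presenter.name.value
    presenter.clear()
    result
}
```

migrationDemo() should return "user-7". Note there is no per-subscription bookkeeping: cancelling the one scope cleans up everything launched in it.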
Still Not Working?
Flow in ViewModel not updating after config change — viewModelScope survives configuration changes (screen rotation), but the Fragment/Activity is recreated. If you’re collecting in the old Fragment/Activity, it’s collecting in a cancelled scope. Always use viewLifecycleOwner.lifecycleScope in Fragments (not lifecycleScope), and use repeatOnLifecycle to restart collection.
StateFlow initial value causes unintended UI update — StateFlow always emits its current value to new collectors immediately. If you initialize with null or an empty state, your UI receives that before real data arrives. Use a sealed class for loading states:
sealed class UiState<out T> {
object Loading : UiState<Nothing>()
data class Success<T>(val data: T) : UiState<T>()
data class Error(val message: String) : UiState<Nothing>()
}
private val _state = MutableStateFlow<UiState<List<User>>>(UiState.Loading)
callbackFlow or channelFlow emitting after close: if you use callbackFlow to wrap a callback-based API, always call awaitClose and clean up listeners:
fun listenToSensor(sensor: Sensor): Flow<SensorEvent> = callbackFlow {
    // SensorEventListener has two methods, so it needs an object expression rather than a lambda
    val listener = object : SensorEventListener {
        override fun onSensorChanged(event: SensorEvent) {
            trySend(event) // Use trySend instead of send (non-suspending)
        }
        override fun onAccuracyChanged(sensor: Sensor, accuracy: Int) = Unit
    }
    sensorManager.registerListener(listener, sensor, SensorManager.SENSOR_DELAY_NORMAL)
    awaitClose {
        // Called when the Flow is cancelled or closed: clean up here
        sensorManager.unregisterListener(listener)
    }
}
distinctUntilChanged dropping updates with mutable types: Flow’s distinctUntilChanged compares consecutive values with equals. If you emit a MutableList, mutate it in place, and emit it again, the operator compares the instance with itself, finds it equal, and skips the emission entirely. StateFlow applies the same check on assignment. Always emit immutable snapshots: call toList() before emit, or use copy() on data classes. This bug is easy to introduce when adapting code that previously used LiveData.setValue(mutableList).
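A short sketch of the difference between emitting the same mutable instance and emitting snapshots, assuming kotlinx-coroutines-core. The function names are made up for illustration.

```kotlin
import kotlinx.coroutines.flow.distinctUntilChanged
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Emits the SAME MutableList twice; returns how many emissions survive distinctUntilChanged.
fun emitInPlace(): Int = runBlocking {
    val items = mutableListOf(1)
    flow<List<Int>> {
        emit(items)
        items += 2   // mutate the instance that was already emitted
        emit(items)  // compared against itself, so it counts as unchanged
    }.distinctUntilChanged().toList().size
}

// Emits an immutable copy each time; both emissions survive.
fun emitSnapshots(): List<List<Int>> = runBlocking {
    val items = mutableListOf(1)
    flow<List<Int>> {
        emit(items.toList())
        items += 2
        emit(items.toList())
    }.distinctUntilChanged().toList()
}
```

emitInPlace() should return 1 and emitSnapshots() should return [[1], [1, 2]].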
combine emits only after every flow emits at least once — combine(a, b, c) { ... } waits for all upstream flows to produce a value before emitting its first combined result. If one of those flows is a Flow that emits on user action, your combine will never produce a value until that action fires. For initial values, use onStart { emit(defaultValue) } upstream or convert to StateFlow with stateIn(...) so a current value is always available.
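The effect is easy to reproduce in a sketch, assuming kotlinx-coroutines-core. Here emptyFlow stands in for a flow that has not emitted yet (a real user-action flow would stay open rather than complete), and combineDemo is a made-up name.

```kotlin
import kotlinx.coroutines.flow.combine
import kotlinx.coroutines.flow.emptyFlow
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.onStart
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Returns everything combine produced, with or without a seeded initial value.
fun combineDemo(seedClicks: Boolean): List<String> = runBlocking {
    val name = flowOf("Alice")
    // Stands in for a flow that has produced nothing so far (e.g. waiting on user action).
    val clicks = emptyFlow<Int>()
    val source = if (seedClicks) clicks.onStart { emit(0) } else clicks
    combine(name, source) { n, c -> "$n:$c" }.toList()
}
```

combineDemo(false) should return an empty list because one input never emitted, while combineDemo(true) should return ["Alice:0"].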
For related Kotlin issues, see Fix: Kotlin Coroutine Not Executing, Fix: Kotlin Coroutine Scope Cancelled, Fix: Kotlin Sealed Class Not Working, and Fix: RxJS Not Working.
Solo developer based in Japan. Every solution is cross-referenced with official documentation and tested before publishing.