Fix: Python asyncio.gather Not Handling Errors — Exceptions Swallowed or All Tasks Cancelled
Quick Answer
How to fix asyncio.gather error handling — return_exceptions parameter, partial failures, task cancellation propagation, TaskGroup alternatives, and exception isolation patterns.
The Problem
asyncio.gather() raises as soon as one task fails, and the results of the other tasks are lost:

    import asyncio

    async def fetch_user(user_id: int):
        if user_id == 2:
            raise ValueError(f"User {user_id} not found")
        return {"id": user_id, "name": f"User {user_id}"}

    async def main():
        results = await asyncio.gather(
            fetch_user(1),
            fetch_user(2),  # Raises ValueError
            fetch_user(3),
        )
        # Never reached: only the ValueError propagates out of gather()
        # The results of fetch_user(1) and fetch_user(3) are lost

Or you pass return_exceptions=True but never check the results, so exceptions are silently treated as values:

    results = await asyncio.gather(
        fetch_user(1),
        fetch_user(2),  # Raises ValueError
        fetch_user(3),
        return_exceptions=True,
    )
    # results = [{"id": 1, ...}, ValueError("User 2 not found"), {"id": 3, ...}]
    for result in results:
        process(result)  # process() is called with a ValueError object: unexpected behavior

Or tasks started with asyncio.gather() keep running after an exception:

    # With return_exceptions=True: all tasks run to completion even on failure
    # Without it: the first exception propagates, but the other tasks keep running
    # The behavior surprises developers either way

Why This Happens
asyncio.gather() has two distinct behaviors controlled by return_exceptions:
- return_exceptions=False (default): the first exception immediately propagates to the gather() call. The other tasks are NOT automatically cancelled. They continue running in the background, but their results are discarded. The exception from the failed task is raised.
- return_exceptions=True: all tasks run to completion regardless of exceptions. Results (including exceptions as values) are returned in a list in the same order as the input tasks. No automatic cancellation.
Neither behavior matches what most developers expect. With the default, you lose partial results and leak running tasks. With return_exceptions=True, you get a mixed list of values and exceptions that requires careful filtering. The gap between “what I thought would happen” and “what actually happened” is the root cause of most gather() bugs.
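The default behavior can be observed with a small experiment. This is a minimal sketch: `fail_fast`, `slow`, and `demo` are hypothetical names used only for illustration, and the sleep durations are arbitrary.

```python
import asyncio

async def fail_fast():
    await asyncio.sleep(0.01)
    raise ValueError("boom")

async def slow(log):
    await asyncio.sleep(0.05)
    log.append("slow finished")
    return "slow result"

async def demo():
    log = []
    slow_task = asyncio.create_task(slow(log))
    try:
        # Default mode: the first exception propagates out of gather()
        await asyncio.gather(fail_fast(), slow_task)
    except ValueError:
        log.append("gather raised")
    # The sibling task was not cancelled and is still pending here
    still_running = not slow_task.done()
    await slow_task  # it finishes on its own later
    return log, still_running

log, still_running = asyncio.run(demo())
print(log, still_running)
```

The slow task is still pending right after gather() raises and only finishes later, which is the leak described above.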
The behavior also varies by environment. On Windows, subprocess coroutines only work on ProactorEventLoop (the default since Python 3.8); under SelectorEventLoop, the asyncio subprocess functions raise NotImplementedError, so a gather() over subprocess coroutines fails immediately. In Jupyter notebooks, an event loop is already running, so calling asyncio.run() with gather() inside raises RuntimeError. And in AWS Lambda, the handler must return before the function times out, but orphaned tasks from a failed gather() keep running until the Lambda runtime kills them.
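Whether a loop is already running (as in a Jupyter cell) can be checked before deciding between asyncio.run() and a plain await. The helper below is a sketch; `in_running_loop` and `probe` are hypothetical names, not part of asyncio.

```python
import asyncio

def in_running_loop() -> bool:
    """Return True when called from code driven by a running event loop."""
    try:
        asyncio.get_running_loop()
    except RuntimeError:
        # get_running_loop() raises RuntimeError when no loop is running
        return False
    return True

async def probe() -> bool:
    return in_running_loop()

outside = in_running_loop()    # plain script: no loop is running yet
inside = asyncio.run(probe())  # inside a coroutine: a loop is running
print(outside, inside)
```

When the check returns True, calling asyncio.run() would fail, so the coroutine has to be awaited or scheduled on the existing loop instead.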
Common errors:
- Not checking whether results are exceptions when using return_exceptions=True
- Assuming other tasks stop when one fails with return_exceptions=False (they don't)
- Using asyncio.gather() when asyncio.TaskGroup (Python 3.11+) would be safer and clearer
- Running gather() inside a Jupyter notebook without nest_asyncio or await at the top level
Fix 1: Use return_exceptions=True and Filter Results
Handle mixed success/failure results correctly:

    import asyncio

    async def fetch_user(user_id: int) -> dict:
        if user_id == 2:
            raise ValueError(f"User {user_id} not found")
        await asyncio.sleep(0.1)  # Simulate I/O
        return {"id": user_id, "name": f"User {user_id}"}

    async def main():
        user_ids = [1, 2, 3, 4]
        results = await asyncio.gather(
            *[fetch_user(uid) for uid in user_ids],
            return_exceptions=True,
        )

        # Separate successes from failures
        successes = []
        failures = []
        for user_id, result in zip(user_ids, results):
            # BaseException also matches asyncio.CancelledError (Python 3.8+)
            if isinstance(result, BaseException):
                failures.append((user_id, result))
                print(f"Failed to fetch user {user_id}: {result}")
            else:
                successes.append(result)

        print(f"Fetched {len(successes)} users, {len(failures)} failures")
        return successes

    asyncio.run(main())

Generic helper for gather with error handling:

    async def gather_with_errors(*coros, logger=None):
        """
        Run coroutines concurrently. Returns (results, errors) tuple.
        results: list of successful return values
        errors: list of (index, exception) tuples
        """
        raw_results = await asyncio.gather(*coros, return_exceptions=True)
        results = []
        errors = []
        for i, result in enumerate(raw_results):
            if isinstance(result, BaseException):
                errors.append((i, result))
                if logger:
                    logger.error(f"Task {i} failed: {result}")
            else:
                results.append(result)
        return results, errors

    # Usage
    async def main():
        results, errors = await gather_with_errors(
            fetch_user(1),
            fetch_user(2),
            fetch_user(3),
        )
        # results = [{"id": 1, ...}, {"id": 3, ...}]
        # errors = [(1, ValueError("User 2 not found"))]

Fix 2: Cancel Remaining Tasks on First Failure
With return_exceptions=False, other tasks continue silently. Cancel them explicitly:

    import asyncio

    async def gather_cancel_on_first_error(*coros):
        """
        Like gather(), but cancels all remaining tasks when one fails.
        Returns results or raises the first exception.
        """
        tasks = [asyncio.create_task(coro) for coro in coros]
        try:
            return await asyncio.gather(*tasks)
        except Exception:
            # Cancel all remaining tasks
            for task in tasks:
                if not task.done():
                    task.cancel()
            # Wait for cancellations to complete
            await asyncio.gather(*tasks, return_exceptions=True)
            raise  # Re-raise the original exception

    async def main():
        try:
            # fetch_user() is the coroutine defined in Fix 1
            results = await gather_cancel_on_first_error(
                fetch_user(1),
                fetch_user(2),  # Fails
                fetch_user(3),
            )
        except ValueError as e:
            print(f"One task failed: {e}")
            print("All other tasks were cancelled")

Fix 3: Use asyncio.TaskGroup (Python 3.11+)
asyncio.TaskGroup is the modern replacement for many gather() patterns. It guarantees that the remaining tasks are cancelled when any task fails:

    import asyncio

    async def main():
        results = []
        try:
            async with asyncio.TaskGroup() as tg:
                # All tasks start concurrently
                task1 = tg.create_task(fetch_user(1))
                task2 = tg.create_task(fetch_user(2))  # Will fail
                task3 = tg.create_task(fetch_user(3))
            # This line is only reached if ALL tasks succeed
            # tg waits for all tasks; if any fails, the remaining ones are cancelled
            results = [task1.result(), task2.result(), task3.result()]
        except* ValueError as eg:
            # Python 3.11+ ExceptionGroup: collect all failures
            for exc in eg.exceptions:
                print(f"Task failed: {exc}")
        return results

TaskGroup vs gather() key differences:
| Feature | asyncio.gather() | asyncio.TaskGroup |
|---|---|---|
| Cancel others on failure | No (in either mode) | Yes, always |
| Return mixed results | Yes (with return_exceptions=True) | No, raises ExceptionGroup |
| Exception type | First exception raised, or exceptions returned in the results list | ExceptionGroup |
| Python version | 3.4+ | 3.11+ |
| Task tracking | Must save tasks manually | tg.create_task() returns Task |
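The "Cancel others on failure" row can be checked with a short experiment. This is a sketch under stated assumptions: `fail_fast`, `slow`, and `demo` are hypothetical names, and the version guard is there only because asyncio.TaskGroup and ExceptionGroup require Python 3.11 or later.

```python
import asyncio
import sys

async def fail_fast():
    await asyncio.sleep(0.01)
    raise ValueError("boom")

async def slow():
    await asyncio.sleep(1)
    return "slow result"

async def demo():
    slow_task = None
    errors = []
    try:
        async with asyncio.TaskGroup() as tg:
            tg.create_task(fail_fast())
            slow_task = tg.create_task(slow())
    except ExceptionGroup as eg:
        # The group holds the real failures, not the sibling cancellations
        errors = [type(exc).__name__ for exc in eg.exceptions]
    return errors, slow_task.cancelled()

errors, sibling_cancelled = None, None
if sys.version_info >= (3, 11):
    errors, sibling_cancelled = asyncio.run(demo())
    print(errors, sibling_cancelled)
```

Unlike gather(), the slow sibling does not keep running after the failure: the TaskGroup cancels it and waits for the cancellation before leaving the `async with` block.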
If you need partial results with TaskGroup, catch the ExceptionGroup and retrieve results from tasks that succeeded:

    async def main():
        tasks = []
        try:
            async with asyncio.TaskGroup() as tg:
                for uid in [1, 2, 3, 4]:
                    tasks.append(tg.create_task(fetch_user(uid)))
        except* Exception:
            pass  # Some tasks failed

        # Collect results from tasks that finished before the failure;
        # tasks still running at that point were cancelled by the TaskGroup
        results = []
        for task in tasks:
            if not task.cancelled() and task.exception() is None:
                results.append(task.result())
        return results

Fix 4: Windows Event Loop Limitations
On Windows, Python uses ProactorEventLoop by default (since Python 3.8). Subprocess coroutines depend on it: SelectorEventLoop does not support subprocesses on Windows, so a gather() over subprocess coroutines raises NotImplementedError if your code (or a library you import) has switched the policy to WindowsSelectorEventLoopPolicy:

    # Windows-specific issue: SelectorEventLoop + subprocess in gather()
    import asyncio
    import sys

    async def run_command(cmd):
        proc = await asyncio.create_subprocess_shell(
            cmd,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
        )
        stdout, stderr = await proc.communicate()
        return stdout.decode()

    async def main():
        # Raises NotImplementedError on Windows when the loop is a SelectorEventLoop,
        # because that loop does not support subprocesses
        results = await asyncio.gather(
            run_command("echo hello"),
            run_command("echo world"),
        )

    # Fix: make sure subprocess tasks run on ProactorEventLoop on Windows
    if sys.platform == 'win32':
        asyncio.set_event_loop_policy(asyncio.WindowsProactorEventLoopPolicy())
    asyncio.run(main())

uvloop (Linux/macOS only) for better performance:

    # uvloop replaces the default event loop with a libuv-based implementation
    # The uvloop project reports roughly 2-4x faster I/O-heavy workloads
    # NOT available on Windows
    import asyncio

    try:
        import uvloop
        asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())
    except ImportError:
        pass  # Fall back to the default loop on Windows or if uvloop is not installed

    asyncio.run(main())

Fix 5: Jupyter Notebook and Nested Event Loop
Jupyter notebooks run their own event loop, so asyncio.run() inside a cell raises RuntimeError: asyncio.run() cannot be called from a running event loop:

    # WRONG in Jupyter
    import asyncio

    async def main():
        return await asyncio.gather(fetch_user(1), fetch_user(2))

    asyncio.run(main())  # RuntimeError: asyncio.run() cannot be called from a running event loop

    # FIX 1: Use top-level await (Jupyter/IPython 7.0+)
    results = await asyncio.gather(fetch_user(1), fetch_user(2))

    # FIX 2: Use nest_asyncio for compatibility
    import nest_asyncio
    nest_asyncio.apply()
    asyncio.run(main())  # Now works inside Jupyter

    # FIX 3: Schedule the work on the already-running loop and await it later
    # (loop.run_until_complete() fails with "This event loop is already running"
    # unless nest_asyncio has been applied)
    task = asyncio.create_task(main())
    results = await task

Fix 6: Lambda Handler Async Patterns
AWS Lambda's Python runtime calls a synchronous handler, so async code has to be driven from it through an event loop. Orphaned tasks from a failed gather() can then cause unexpected behavior:

    # Lambda handler: gather() with proper cleanup
    import asyncio

    async def handler(event, context):
        tasks = [
            asyncio.create_task(fetch_data(url))
            for url in event['urls']
        ]
        try:
            # With return_exceptions=True, task failures are returned, not raised
            results = await asyncio.gather(*tasks, return_exceptions=True)
        except Exception:
            # Defensive cleanup: cancel all tasks before Lambda freezes the execution environment
            for task in tasks:
                if not task.done():
                    task.cancel()
            await asyncio.gather(*tasks, return_exceptions=True)
            raise

        # Filter results
        successes = [r for r in results if not isinstance(r, BaseException)]
        return {"statusCode": 200, "body": successes}

    # Lambda entry point: use a module-level loop for warm start reuse
    _loop = asyncio.new_event_loop()

    def lambda_handler(event, context):
        return _loop.run_until_complete(handler(event, context))

Note: If you don't cancel orphaned tasks, they are still pending when Lambda freezes the execution environment. When the same environment is reused for a warm start, those zombie tasks may resume on the shared loop and cause data corruption or unexpected side effects.
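One way to avoid leaving tasks behind is to cancel whatever is still pending on the loop just before the handler returns. The helper below is a minimal sketch, not an AWS API: `cancel_leftover_tasks`, `background_job`, and `demo` are hypothetical names, and the 60-second sleep only stands in for work that would outlive the handler.

```python
import asyncio

async def cancel_leftover_tasks() -> int:
    """Cancel every unfinished task on the running loop except the caller."""
    current = asyncio.current_task()
    # all_tasks() returns the tasks of the running loop that are not done yet
    leftovers = [t for t in asyncio.all_tasks() if t is not current]
    for task in leftovers:
        task.cancel()
    # Wait until the cancellations have actually been processed
    await asyncio.gather(*leftovers, return_exceptions=True)
    return len(leftovers)

async def background_job():
    await asyncio.sleep(60)  # stands in for work that outlives the handler

async def demo():
    orphan = asyncio.create_task(background_job())
    await asyncio.sleep(0)  # give the orphan a chance to start
    cancelled_count = await cancel_leftover_tasks()
    return cancelled_count, orphan.cancelled()

cancelled_count, orphan_cancelled = asyncio.run(demo())
print(cancelled_count, orphan_cancelled)
```

Calling such a helper in a `finally` block at the end of the async handler keeps a reused loop free of tasks from the previous invocation.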
Fix 7: Set Timeouts on Concurrent Tasks
Individual tasks should have timeouts to prevent one slow task from blocking the group:

    import asyncio

    async def fetch_with_timeout(coro, timeout: float):
        """Wrap a coroutine with a timeout."""
        try:
            return await asyncio.wait_for(coro, timeout=timeout)
        except asyncio.TimeoutError:
            raise asyncio.TimeoutError(f"Task timed out after {timeout}s")

    async def main():
        results = await asyncio.gather(
            fetch_with_timeout(fetch_user(1), timeout=5.0),
            fetch_with_timeout(fetch_user(2), timeout=5.0),
            fetch_with_timeout(fetch_user(3), timeout=5.0),
            return_exceptions=True,
        )
        for i, result in enumerate(results):
            if isinstance(result, asyncio.TimeoutError):
                print(f"Task {i} timed out")
            elif isinstance(result, Exception):
                print(f"Task {i} failed: {result}")
            else:
                print(f"Task {i} succeeded: {result}")

Global timeout with asyncio.wait_for:

    async def main():
        try:
            # All tasks must complete within 10 seconds
            results = await asyncio.wait_for(
                asyncio.gather(
                    fetch_user(1),
                    fetch_user(2),
                    fetch_user(3),
                    return_exceptions=True,
                ),
                timeout=10.0,
            )
        except asyncio.TimeoutError:
            # wait_for() cancels the gather(), which cancels the unfinished tasks
            print("Overall operation timed out")

Fix 8: asyncio.wait for More Control
asyncio.wait() gives more control than gather(): it returns the sets of done and pending tasks, so you decide when to stop waiting and what to cancel:

    import asyncio

    async def main():
        tasks = {
            asyncio.create_task(fetch_user(uid), name=f"fetch-{uid}")
            for uid in [1, 2, 3, 4, 5]
        }

        # Wait until the first failure, or until every task has finished
        done, pending = await asyncio.wait(
            tasks,
            return_when=asyncio.FIRST_EXCEPTION,  # Stop after first error
            # Options: FIRST_EXCEPTION, FIRST_COMPLETED, ALL_COMPLETED
        )

        # Process completed tasks (a set, so not in the original order)
        for task in done:
            if task.exception():
                print(f"Task {task.get_name()} failed: {task.exception()}")
            else:
                print(f"Task {task.get_name()} result: {task.result()}")

        # Cancel remaining tasks
        for task in pending:
            task.cancel()

        # Wait for cancellations
        if pending:
            await asyncio.wait(pending)

Process results as they arrive:

    async def main():
        tasks = [
            asyncio.create_task(fetch_user(uid))
            for uid in range(1, 11)  # 10 users
        ]

        # Process each task as it completes
        for coro in asyncio.as_completed(tasks):
            try:
                result = await coro
                print(f"Got result: {result}")
            except Exception as e:
                print(f"Task failed: {e}")
                # Other tasks continue running

Fix 9: Common gather() Patterns
Batch processing — limit concurrent tasks:

    import asyncio

    async def process_in_batches(items, batch_size: int, processor):
        """Process items in batches to limit concurrency."""
        results = []
        for i in range(0, len(items), batch_size):
            batch = items[i:i + batch_size]
            batch_results = await asyncio.gather(
                *[processor(item) for item in batch],
                return_exceptions=True,
            )
            results.extend(batch_results)
        return results

    # Or use a semaphore for fine-grained control
    async def limited_gather(coros, max_concurrent: int):
        semaphore = asyncio.Semaphore(max_concurrent)

        async def with_semaphore(coro):
            async with semaphore:
                return await coro

        return await asyncio.gather(
            *[with_semaphore(coro) for coro in coros],
            return_exceptions=True,
        )

    # Usage (inside a coroutine): max 5 concurrent HTTP requests
    results = await limited_gather(
        [fetch_url(url) for url in urls],
        max_concurrent=5,
    )

Retry individual failed tasks:

    import asyncio
    from typing import TypeVar, Callable, Awaitable

    T = TypeVar('T')

    async def retry(
        coro_factory: Callable[[], Awaitable[T]],
        retries: int = 3,
        delay: float = 1.0,
    ) -> T:
        last_error = None
        for attempt in range(retries):
            try:
                return await coro_factory()
            except asyncio.CancelledError:
                raise  # Never retry cancellation
            except Exception as e:
                last_error = e
                if attempt < retries - 1:
                    await asyncio.sleep(delay * (2 ** attempt))  # Exponential backoff
        raise last_error

    # Usage with gather (inside a coroutine)
    results = await asyncio.gather(
        retry(lambda: fetch_user(1)),
        retry(lambda: fetch_user(2), retries=5),
        retry(lambda: fetch_user(3)),
        return_exceptions=True,
    )

Fix 10: Debug asyncio.gather Issues
Identify which tasks failed and why:

    import asyncio
    import traceback

    async def debug_gather(*coros):
        """gather() with detailed error reporting."""
        tasks = [asyncio.create_task(coro) for coro in coros]

        # Add names to tasks for easier debugging
        for i, task in enumerate(tasks):
            task.set_name(f"task-{i}")

        results = await asyncio.gather(*tasks, return_exceptions=True)
        for task, result in zip(tasks, results):
            # BaseException also covers asyncio.CancelledError
            if isinstance(result, BaseException):
                print(f"\nTask '{task.get_name()}' FAILED:")
                traceback.print_exception(type(result), result, result.__traceback__)
            else:
                print(f"Task '{task.get_name()}' succeeded: {result}")
        return results

    # Enable asyncio debug mode for more verbose output
    asyncio.run(debug_gather(
        fetch_user(1),
        fetch_user(2),
        fetch_user(3),
    ), debug=True)

Asyncio debug mode catches common mistakes:

    # Enable via environment variable (shell)
    PYTHONASYNCIODEBUG=1 python main.py

    # Or in code (Python)
    import asyncio
    asyncio.run(main(), debug=True)

    # Debug mode warns about:
    # - Coroutines that were never awaited
    # - Slow callbacks (>100ms blocking the event loop)
    # - Misuse of thread-unsafe operations

Still Not Working?
BaseException vs Exception — asyncio.CancelledError is a BaseException, not Exception in Python 3.8+. Using isinstance(result, Exception) to check for failures won’t catch CancelledError. Use isinstance(result, BaseException) or check for CancelledError separately.
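The difference is easy to see when one of the gathered tasks is cancelled. This is a minimal sketch assuming Python 3.8 or later; `sleeper`, `ok`, and `demo` are hypothetical names.

```python
import asyncio

async def sleeper():
    await asyncio.sleep(10)

async def ok():
    return "ok"

async def demo():
    task = asyncio.create_task(sleeper())
    await asyncio.sleep(0)  # let the task start
    task.cancel()
    # The cancelled child shows up as a CancelledError object in the results
    return await asyncio.gather(ok(), task, return_exceptions=True)

results = asyncio.run(demo())
flags = [
    (isinstance(r, Exception), isinstance(r, BaseException))
    for r in results
]
print(flags)
```

The second entry is a CancelledError instance: the `Exception` check misses it, while the `BaseException` check catches it.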
Tasks created before gather: tasks created with asyncio.create_task() are scheduled immediately, even before gather() is called. If you call create_task() and then never await the result (and never call gather()), the task runs independently, and an exception in it is only reported later in the log as "Task exception was never retrieved".
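For fire-and-forget tasks, one possible pattern is to keep a reference to each task and attach a done callback that records failures. This is a sketch, not the only approach; `log_task_exception`, `boom`, `sink`, and `background` are hypothetical names.

```python
import asyncio

def log_task_exception(task, sink):
    """Record the exception of a finished task, if it has one."""
    if task.cancelled():
        return
    exc = task.exception()  # retrieving it marks the exception as handled
    if exc is not None:
        sink.append(f"{task.get_name()}: {exc!r}")

async def boom():
    raise RuntimeError("background failure")

async def demo():
    sink = []
    background = set()  # strong references keep the tasks alive
    task = asyncio.create_task(boom(), name="bg-1")
    background.add(task)
    task.add_done_callback(background.discard)
    task.add_done_callback(lambda t: log_task_exception(t, sink))
    await asyncio.sleep(0.01)  # the task runs without ever being awaited
    return sink, len(background)

sink, remaining = asyncio.run(demo())
print(sink, remaining)
```

In real code the callback would typically write to a logger instead of a list.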
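return_exceptions and exception chaining: with return_exceptions=True, each exception is returned as an object that still carries its traceback and chain (__traceback__, __cause__, __context__), but printing it with str() or an f-string shows only the message. To see the full chain, pass the object to traceback.print_exception(), or use return_exceptions=False with try/except.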
Python 3.12 eager_task_factory changes — Python 3.12 introduced asyncio.eager_task_factory, which starts tasks synchronously until their first await. If you set this factory globally, gather() behavior changes subtly: tasks that complete synchronously (without any I/O) return results immediately, but tasks that await something are deferred as usual. This can cause ordering surprises in mixed sync/async gather patterns.
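The ordering change can be seen by logging around create_task(). This is a sketch that assumes asyncio.eager_task_factory is available (Python 3.12+) and skips the eager run otherwise; `no_io` and `demo` are hypothetical names.

```python
import asyncio

async def no_io(log, name):
    log.append(f"{name} ran")  # no await inside, so it never suspends
    return name

async def demo(use_eager: bool):
    if use_eager:
        loop = asyncio.get_running_loop()
        loop.set_task_factory(asyncio.eager_task_factory)
    log = []
    task = asyncio.create_task(no_io(log, "a"))
    log.append("after create_task")
    await task
    return log

# Each asyncio.run() call uses a fresh loop, so the factory does not leak
default_log = asyncio.run(demo(use_eager=False))
eager_log = (
    asyncio.run(demo(use_eager=True))
    if hasattr(asyncio, "eager_task_factory")
    else None
)
print(default_log, eager_log)
```

With the default factory the coroutine body runs only after the caller yields; with the eager factory it runs inside create_task(), before the next line of the caller.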
aiohttp / httpx session sharing in gather — if you create a new HTTP session inside each coroutine passed to gather(), you open N connections simultaneously. Share the session across coroutines and use the session’s built-in connection pool limit:

    import asyncio
    import aiohttp

    async def main():
        # WRONG: each coroutine creates its own session
        results = await asyncio.gather(*[fetch_with_new_session(url) for url in urls])

        # CORRECT: share one session
        async with aiohttp.ClientSession() as session:
            results = await asyncio.gather(*[fetch_with_session(session, url) for url in urls])

For related Python issues, see Fix: Python asyncio Blocking the Event Loop, Fix: Python asyncio Not Running, Fix: Python Multiprocessing Not Working, and Fix: Python Decorator Not Working.
Solo developer based in Japan. Every solution is cross-referenced with official documentation and tested before publishing.