
Fix: Python asyncio.gather Not Handling Errors — Exceptions Swallowed or All Tasks Cancelled

FixDevs


Quick Answer

How to fix asyncio.gather error handling — return_exceptions parameter, partial failures, task cancellation propagation, TaskGroup alternatives, and exception isolation patterns.

The Problem

asyncio.gather() raises the first exception and throws away every other result:

import asyncio

async def fetch_user(user_id: int):
    if user_id == 2:
        raise ValueError(f"User {user_id} not found")
    return {"id": user_id, "name": f"User {user_id}"}

async def main():
    results = await asyncio.gather(
        fetch_user(1),
        fetch_user(2),   # Raises ValueError
        fetch_user(3),
    )
    # Only the ValueError propagates out of gather()
    # fetch_user(1) and fetch_user(3) results are lost, but those tasks are NOT cancelled

Or exceptions go unnoticed because you pass return_exceptions=True but don’t check the results:

results = await asyncio.gather(
    fetch_user(1),
    fetch_user(2),   # Raises ValueError
    fetch_user(3),
    return_exceptions=True
)

# results = [{"id": 1, ...}, ValueError("User 2 not found"), {"id": 3, ...}]
for result in results:
    process(result)   # process() called with a ValueError object — unexpected behavior

Or tasks started with asyncio.gather() keep running after an exception:

# With return_exceptions=True: every task runs to completion, failures included
# Without it: the first exception propagates, but the other tasks keep running in the background
# The behavior surprises developers either way

Why This Happens

asyncio.gather() has two distinct behaviors controlled by return_exceptions:

  • return_exceptions=False (default) — the first exception immediately propagates to the gather() call. The other tasks are NOT automatically cancelled — they continue running in the background but their results are discarded. The exception from the failed task is raised.
  • return_exceptions=True — all tasks run to completion regardless of exceptions. Results (including exceptions as values) are returned in a list in the same order as the input tasks. No automatic cancellation.

Neither behavior matches what most developers expect. With the default, you lose partial results and leak running tasks. With return_exceptions=True, you get a mixed list of values and exceptions that requires careful filtering. The gap between “what I thought would happen” and “what actually happened” is the root cause of most gather() bugs.
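The default behavior is easy to confirm with a short experiment. The sketch below uses two made-up coroutines (fail_fast and slow_worker are illustrative names, not part of the article's code) and checks whether the surviving task is still running after gather() has raised:

```python
import asyncio

async def fail_fast():
    # Hypothetical coroutine that fails almost immediately
    await asyncio.sleep(0.01)
    raise ValueError("boom")

async def slow_worker(log):
    # Hypothetical coroutine that outlives the failure
    await asyncio.sleep(0.05)
    log.append("slow_worker finished")
    return "done"

async def demo():
    log = []
    worker = asyncio.create_task(slow_worker(log))
    try:
        await asyncio.gather(fail_fast(), worker)   # default: return_exceptions=False
    except ValueError:
        log.append("gather raised")
    still_running = not worker.done()   # gather() did not cancel the other task
    await worker                        # let it finish so nothing leaks out of the demo
    return log, still_running

log, still_running = asyncio.run(demo())
print(log, still_running)
```

The worker finishes after gather() has already raised, which is exactly the leaked task described above. Its return value is only reachable because the demo kept its own reference to the task.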

The behavior also varies by environment. On Windows, subprocess coroutines only work on ProactorEventLoop (the default since Python 3.8); if a library or your own startup code has switched to SelectorEventLoop, subprocess calls inside gather() raise NotImplementedError. In Jupyter notebooks, an event loop is already running, so calling asyncio.run() around gather() raises RuntimeError. And in AWS Lambda, the handler must return before the function times out, but orphaned tasks from a failed gather() can outlive the invocation that started them.

Common errors:

  • Not checking whether results are exceptions when using return_exceptions=True
  • Assuming other tasks stop when one fails with return_exceptions=False — they don’t
  • Using asyncio.gather() when asyncio.TaskGroup (Python 3.11+) would be safer and clearer
  • Wrapping gather() in asyncio.run() inside a Jupyter notebook instead of using top-level await (or nest_asyncio)

Fix 1: Use return_exceptions=True and Filter Results

Handle mixed success/failure results correctly:

import asyncio

async def fetch_user(user_id: int) -> dict:
    if user_id == 2:
        raise ValueError(f"User {user_id} not found")
    await asyncio.sleep(0.1)   # Simulate I/O
    return {"id": user_id, "name": f"User {user_id}"}

async def main():
    user_ids = [1, 2, 3, 4]

    results = await asyncio.gather(
        *[fetch_user(uid) for uid in user_ids],
        return_exceptions=True
    )

    # Separate successes from failures
    successes = []
    failures = []

    for user_id, result in zip(user_ids, results):
        if isinstance(result, Exception):
            failures.append((user_id, result))
            print(f"Failed to fetch user {user_id}: {result}")
        else:
            successes.append(result)

    print(f"Fetched {len(successes)} users, {len(failures)} failures")
    return successes

asyncio.run(main())

Generic helper for gather with error handling:

async def gather_with_errors(*coros, logger=None):
    """
    Run coroutines concurrently. Returns (results, errors) tuple.
    results: list of successful return values
    errors: list of (index, exception) tuples
    """
    raw_results = await asyncio.gather(*coros, return_exceptions=True)

    results = []
    errors = []

    for i, result in enumerate(raw_results):
        if isinstance(result, BaseException):
            errors.append((i, result))
            if logger:
                logger.error(f"Task {i} failed: {result}")
        else:
            results.append(result)

    return results, errors

# Usage
async def main():
    results, errors = await gather_with_errors(
        fetch_user(1),
        fetch_user(2),
        fetch_user(3),
    )
    # results = [{"id": 1, ...}, {"id": 3, ...}]
    # errors = [(1, ValueError("User 2 not found"))]

Fix 2: Cancel Remaining Tasks on First Failure

With return_exceptions=False, other tasks continue silently. Cancel them explicitly:

import asyncio

async def gather_cancel_on_first_error(*coros):
    """
    Like gather(), but cancels all remaining tasks when one fails.
    Returns results or raises the first exception.
    """
    tasks = [asyncio.create_task(coro) for coro in coros]

    try:
        return await asyncio.gather(*tasks)
    except Exception:
        # Cancel all remaining tasks
        for task in tasks:
            if not task.done():
                task.cancel()

        # Wait for cancellations to complete
        await asyncio.gather(*tasks, return_exceptions=True)
        raise   # Re-raise the original exception

async def main():
    try:
        results = await gather_cancel_on_first_error(
            fetch_user(1),
            fetch_user(2),   # Fails with ValueError
            fetch_user(3),
        )
    except ValueError as e:
        print(f"One task failed: {e}")
        print("All other tasks were cancelled")
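To check that the siblings really are cancelled, a test along these lines can help. It is a minimal sketch: the helper is repeated so the block runs on its own, and slow_ok and quick_fail are hypothetical stand-ins for real I/O coroutines.

```python
import asyncio

async def gather_cancel_on_first_error(*coros):
    # Same idea as the helper above, repeated so this sketch is self-contained
    tasks = [asyncio.create_task(coro) for coro in coros]
    try:
        return await asyncio.gather(*tasks)
    except Exception:
        for task in tasks:
            if not task.done():
                task.cancel()
        await asyncio.gather(*tasks, return_exceptions=True)
        raise

async def slow_ok(name, cancelled):
    try:
        await asyncio.sleep(1.0)
        return name
    except asyncio.CancelledError:
        cancelled.append(name)   # record that cancellation reached this task
        raise                    # always re-raise CancelledError

async def quick_fail():
    await asyncio.sleep(0.01)
    raise ValueError("boom")

async def demo():
    cancelled = []
    try:
        await gather_cancel_on_first_error(
            slow_ok("a", cancelled), quick_fail(), slow_ok("b", cancelled)
        )
    except ValueError as exc:
        return str(exc), cancelled

error, cancelled = asyncio.run(demo())
print(error, sorted(cancelled))
```

Recording the cancellation inside each coroutine, rather than inspecting the task objects afterwards, shows that the CancelledError was actually delivered to the running code.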

Fix 3: Use asyncio.TaskGroup (Python 3.11+)

asyncio.TaskGroup is the modern replacement for many gather() patterns. It guarantees all tasks are cancelled when any task fails:

import asyncio

async def main():
    results = []

    try:
        async with asyncio.TaskGroup() as tg:
            # All tasks start concurrently
            task1 = tg.create_task(fetch_user(1))
            task2 = tg.create_task(fetch_user(2))   # Will fail
            task3 = tg.create_task(fetch_user(3))

        # This line only reached if ALL tasks succeed
        # tg waits for all tasks; if any fails, remaining are cancelled
        results = [task1.result(), task2.result(), task3.result()]

    except* ValueError as eg:
        # Python 3.11+ ExceptionGroup — collect all failures
        for exc in eg.exceptions:
            print(f"Task failed: {exc}")

    return results

TaskGroup vs gather() key differences:

Feature | asyncio.gather() | asyncio.TaskGroup
Cancel others on failure | No (even with return_exceptions=False) | Yes, always
Return mixed results | Yes (with return_exceptions=True) | No, raises ExceptionGroup
Exception type | Single exception, or exceptions in the result list | ExceptionGroup
Python version | 3.4+ | 3.11+
Task tracking | Must save tasks manually | tg.create_task() returns Task

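If you need partial results with TaskGroup, catch the ExceptionGroup and retrieve results from the tasks that finished before the failure. Tasks that were still running when a sibling failed have been cancelled, so they contribute nothing: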

async def main():
    tasks = []
    try:
        async with asyncio.TaskGroup() as tg:
            for uid in [1, 2, 3, 4]:
                tasks.append(tg.create_task(fetch_user(uid)))
    except* Exception:
        pass   # Some tasks failed

    # Collect results from tasks that didn't raise
    results = []
    for task in tasks:
        if not task.cancelled() and task.exception() is None:
            results.append(task.result())
    return results

Fix 4: Windows Event Loop Limitations

On Windows, Python uses ProactorEventLoop by default (since Python 3.8), and it is the only built-in loop on that platform that supports subprocesses. Its main limitation is that it does not support loop.add_reader() and loop.add_writer(), which is why some libraries and setup scripts switch to SelectorEventLoop. Once that happens, subprocess coroutines inside gather() fail:

# Windows-specific issue: SelectorEventLoop + subprocess in gather()
import asyncio
import sys

async def run_command(cmd):
    proc = await asyncio.create_subprocess_shell(
        cmd,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE,
    )
    stdout, stderr = await proc.communicate()
    return stdout.decode()

async def main():
    # Raises NotImplementedError on Windows when a SelectorEventLoop is active,
    # because that loop cannot spawn subprocesses
    results = await asyncio.gather(
        run_command("echo hello"),
        run_command("echo world"),
    )

# Fix: make sure ProactorEventLoop is in use for subprocess tasks
# (event loop policies are deprecated in recent Python versions;
#  asyncio.run() accepts a loop_factory argument since 3.12)
if sys.platform == 'win32':
    asyncio.set_event_loop_policy(asyncio.WindowsProactorEventLoopPolicy())

asyncio.run(main())

uvloop (Linux/macOS only) for better performance:

# uvloop replaces the default event loop with a libuv-based implementation
# 2-4x faster for I/O-heavy gather() patterns
# NOT available on Windows

import asyncio

try:
    import uvloop
    asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())
except ImportError:
    pass   # Fall back to default loop on Windows or if uvloop not installed

asyncio.run(main())

Fix 5: Jupyter Notebook and Nested Event Loop

Jupyter notebooks run their own event loop, so asyncio.run() inside a cell raises RuntimeError: asyncio.run() cannot be called from a running event loop:

# WRONG in Jupyter
import asyncio

async def main():
    return await asyncio.gather(fetch_user(1), fetch_user(2))

asyncio.run(main())   # RuntimeError: asyncio.run() cannot be called from a running event loop

# FIX 1: Use top-level await (Jupyter/IPython 7.0+)
results = await asyncio.gather(fetch_user(1), fetch_user(2))

# FIX 2: Use nest_asyncio for compatibility
import nest_asyncio
nest_asyncio.apply()

asyncio.run(main())   # Now works inside Jupyter

# FIX 3: Schedule the coroutine on the already-running loop, then await the task
# (loop.run_until_complete() fails here with "This event loop is already running")
task = asyncio.get_running_loop().create_task(main())
results = await task

Fix 6: Lambda Handler Async Patterns

AWS Lambda’s Python runtime invokes a regular synchronous handler, so async code has to be driven from it with an event loop. Orphaned tasks from a failed gather() can then cause unexpected behavior across invocations:

# Lambda handler: gather() with proper cleanup
import asyncio

async def handler(event, context):
    tasks = [
        asyncio.create_task(fetch_data(url))
        for url in event['urls']
    ]

    try:
        # return_exceptions=True: task failures come back as values, not raises
        results = await asyncio.gather(*tasks, return_exceptions=True)
    finally:
        # Runs on success, timeout, or cancellation: stop anything still
        # pending before Lambda freezes the execution environment
        for task in tasks:
            if not task.done():
                task.cancel()
        await asyncio.gather(*tasks, return_exceptions=True)

    # Filter results (BaseException also covers CancelledError)
    successes = [r for r in results if not isinstance(r, BaseException)]
    return {"statusCode": 200, "body": successes}

# Lambda entry point: one module-level loop, reused across warm starts
_loop = asyncio.new_event_loop()

def lambda_handler(event, context):
    return _loop.run_until_complete(handler(event, context))

Note: If you don’t cancel orphaned tasks, they stay pending on the event loop when Lambda freezes the execution environment. When the same environment is reused for a warm start and the loop runs again, those zombie tasks resume and can cause data corruption or unexpected side effects.
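Tasks that the handler never tracked (for example, fire-and-forget tasks created deeper in the call stack) can be swept up at the entry point. This is a sketch under assumptions: run_and_cleanup, handler, and state are hypothetical names, and the approach mirrors what asyncio.run() does on shutdown rather than anything Lambda-specific.

```python
import asyncio

def run_and_cleanup(loop, coro):
    """Run coro on loop, then cancel whatever is still pending on that loop."""
    try:
        return loop.run_until_complete(coro)
    finally:
        leftovers = asyncio.all_tasks(loop)   # unfinished tasks bound to this loop
        for task in leftovers:
            task.cancel()
        if leftovers:
            # Give the cancelled tasks a chance to unwind
            loop.run_until_complete(
                asyncio.gather(*leftovers, return_exceptions=True)
            )

async def handler(state):
    async def background():
        try:
            await asyncio.sleep(60)   # would outlive the invocation
        except asyncio.CancelledError:
            state["cancelled"] = True
            raise

    state["task"] = asyncio.create_task(background())
    await asyncio.sleep(0)   # let the background task start
    return "ok"

loop = asyncio.new_event_loop()
state = {}
result = run_and_cleanup(loop, handler(state))
loop.close()
print(result, state["cancelled"])
```

In a real function the loop would stay open at module level for warm starts; it is closed here only to keep the sketch tidy.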

Fix 7: Set Timeouts on Concurrent Tasks

Individual tasks should have timeouts to prevent one slow task from blocking the group:

import asyncio

async def fetch_with_timeout(coro, timeout: float):
    """Wrap a coroutine with a timeout."""
    try:
        return await asyncio.wait_for(coro, timeout=timeout)
    except asyncio.TimeoutError:
        raise asyncio.TimeoutError(f"Task timed out after {timeout}s")

async def main():
    results = await asyncio.gather(
        fetch_with_timeout(fetch_user(1), timeout=5.0),
        fetch_with_timeout(fetch_user(2), timeout=5.0),
        fetch_with_timeout(fetch_user(3), timeout=5.0),
        return_exceptions=True,
    )

    for i, result in enumerate(results):
        if isinstance(result, asyncio.TimeoutError):
            print(f"Task {i} timed out")
        elif isinstance(result, Exception):
            print(f"Task {i} failed: {result}")
        else:
            print(f"Task {i} succeeded: {result}")

Global timeout with asyncio.wait_for:

async def main():
    try:
        # All tasks must complete within 10 seconds
        results = await asyncio.wait_for(
            asyncio.gather(
                fetch_user(1),
                fetch_user(2),
                fetch_user(3),
                return_exceptions=True,
            ),
            timeout=10.0
        )
    except asyncio.TimeoutError:
        print("Overall operation timed out")
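A global timeout is all-or-nothing: when it fires, wait_for() cancels the gather(), which cancels every child still running, and results from tasks that had already finished are discarded. The sketch below illustrates this with a hypothetical worker coroutine that records its own cancellation:

```python
import asyncio

async def worker(name, delay, cancelled):
    try:
        await asyncio.sleep(delay)
        return name
    except asyncio.CancelledError:
        cancelled.append(name)   # record which task the timeout reached
        raise

async def demo():
    cancelled = []
    try:
        results = await asyncio.wait_for(
            asyncio.gather(
                worker("fast", 0.01, cancelled),
                worker("slow", 1.0, cancelled),
            ),
            timeout=0.1,
        )
    except asyncio.TimeoutError:
        results = None   # the "fast" result is gone along with the gather()
    return results, cancelled

results, cancelled = asyncio.run(demo())
print(results, cancelled)
```

If partial results matter, per-task timeouts combined with return_exceptions=True (as in the first example of this fix) keep the values from tasks that finished in time.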

Fix 8: asyncio.wait for More Control

asyncio.wait() gives more control than gather(): you choose when it returns, and you get back separate sets of finished and pending tasks:

import asyncio

async def main():
    tasks = {
        asyncio.create_task(fetch_user(uid), name=f"fetch-{uid}")
        for uid in [1, 2, 3, 4, 5]
    }

    # Wait until the first failure (or until everything finishes); done and pending are unordered sets
    done, pending = await asyncio.wait(
        tasks,
        return_when=asyncio.FIRST_EXCEPTION,   # Stop after first error
        # Options: FIRST_EXCEPTION, FIRST_COMPLETED, ALL_COMPLETED
    )

    # Process completed tasks
    for task in done:
        if task.exception():
            print(f"Task {task.get_name()} failed: {task.exception()}")
        else:
            print(f"Task {task.get_name()} result: {task.result()}")

    # Cancel remaining tasks
    for task in pending:
        task.cancel()

    # Wait for cancellations
    if pending:
        await asyncio.wait(pending)

Process results as they arrive:

async def main():
    tasks = [
        asyncio.create_task(fetch_user(uid))
        for uid in range(1, 11)  # 10 users
    ]

    # Process each task as it completes
    for coro in asyncio.as_completed(tasks):
        try:
            result = await coro
            print(f"Got result: {result}")
        except Exception as e:
            print(f"Task failed: {e}")
            # Other tasks continue running

Fix 9: Common gather() Patterns

Batch processing — limit concurrent tasks:

import asyncio

async def process_in_batches(items, batch_size: int, processor):
    """Process items in batches to limit concurrency."""
    results = []

    for i in range(0, len(items), batch_size):
        batch = items[i:i + batch_size]
        batch_results = await asyncio.gather(
            *[processor(item) for item in batch],
            return_exceptions=True,
        )
        results.extend(batch_results)

    return results

# Or use a semaphore for fine-grained control
async def limited_gather(coros, max_concurrent: int):
    semaphore = asyncio.Semaphore(max_concurrent)

    async def with_semaphore(coro):
        async with semaphore:
            return await coro

    return await asyncio.gather(
        *[with_semaphore(coro) for coro in coros],
        return_exceptions=True,
    )

# Usage — max 5 concurrent HTTP requests
results = await limited_gather(
    [fetch_url(url) for url in urls],
    max_concurrent=5,
)

Retry individual failed tasks:

import asyncio
from typing import TypeVar, Callable, Awaitable

T = TypeVar('T')

async def retry(
    coro_factory: Callable[[], Awaitable[T]],
    retries: int = 3,
    delay: float = 1.0,
) -> T:
    last_error = None
    for attempt in range(retries):
        try:
            return await coro_factory()
        except asyncio.CancelledError:
            raise   # Never retry cancellation
        except Exception as e:
            last_error = e
            if attempt < retries - 1:
                await asyncio.sleep(delay * (2 ** attempt))   # Exponential backoff
    raise last_error

# Usage with gather
results = await asyncio.gather(
    retry(lambda: fetch_user(1)),
    retry(lambda: fetch_user(2), retries=5),
    retry(lambda: fetch_user(3)),
    return_exceptions=True,
)

Fix 10: Debug asyncio.gather Issues

Identify which tasks failed and why:

import asyncio
import traceback

async def debug_gather(*coros):
    """gather() with detailed error reporting."""
    tasks = [asyncio.create_task(coro) for coro in coros]

    # Add names to tasks for easier debugging
    for i, task in enumerate(tasks):
        task.set_name(f"task-{i}")

    results = await asyncio.gather(*tasks, return_exceptions=True)

    for task, result in zip(tasks, results):
        if isinstance(result, Exception):
            print(f"\nTask '{task.get_name()}' FAILED:")
            traceback.print_exception(type(result), result, result.__traceback__)
        else:
            print(f"Task '{task.get_name()}' succeeded: {result}")

    return results

# Enable asyncio debug mode for more verbose output
asyncio.run(debug_gather(
    fetch_user(1),
    fetch_user(2),
    fetch_user(3),
), debug=True)

Asyncio debug mode catches common mistakes:

# Enable via environment variable (shell)
PYTHONASYNCIODEBUG=1 python main.py

# Or in code
import asyncio
asyncio.run(main(), debug=True)

# Debug mode warns about:
# - Coroutines that were never awaited
# - Slow callbacks (>100ms blocking the event loop)
# - Misuse of thread-unsafe operations

Still Not Working?

BaseException vs Exception: asyncio.CancelledError is a BaseException, not an Exception, in Python 3.8+. Using isinstance(result, Exception) to check for failures won’t catch CancelledError. Use isinstance(result, BaseException) or check for CancelledError separately.
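The difference shows up as soon as one of the gathered tasks is cancelled. A minimal sketch, assuming Python 3.8+ and a hypothetical worker coroutine:

```python
import asyncio

async def worker(delay):
    await asyncio.sleep(delay)
    return "ok"

async def main():
    doomed = asyncio.create_task(worker(1.0))
    healthy = asyncio.create_task(worker(0.01))
    await asyncio.sleep(0)   # let both tasks start
    doomed.cancel()          # cancel one child, not the gather() itself
    return await asyncio.gather(doomed, healthy, return_exceptions=True)

results = asyncio.run(main())

# The narrow check misses the cancelled task; the broad one finds it
narrow = [r for r in results if isinstance(r, Exception)]
broad = [r for r in results if isinstance(r, BaseException)]
print(len(narrow), len(broad))
```

With the narrow check, the CancelledError instance would be passed along as if it were a successful result.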

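Tasks created before gather: tasks created with asyncio.create_task() are scheduled immediately, even before gather() is called. If you create_task() and then never await the result (and never call gather()), the task runs independently, and an exception it raises is only reported as “Task exception was never retrieved” when the task object is garbage-collected. Keep a reference to every task you create, because the event loop itself only holds weak references.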

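return_exceptions and exception chaining: with return_exceptions=True, each exception comes back as an object with its __traceback__, __cause__, and __context__ still attached, but nothing prints them for you. Log the full chain yourself (for example with traceback.print_exception), or use return_exceptions=False with try/except if you want the exception raised normally.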

Python 3.12 eager_task_factory changes: Python 3.12 introduced asyncio.eager_task_factory, which runs each new task synchronously up to the first point where it actually suspends. If you set this factory globally, gather() behavior changes subtly: tasks that never suspend (no real I/O) are already finished when they are created, while tasks that do suspend are scheduled as usual. This can cause ordering surprises in mixed sync/async gather patterns.

aiohttp / httpx session sharing in gather: if you create a new HTTP session inside each coroutine passed to gather(), every coroutine gets its own connection pool, so N requests open N connections at once and no shared limit applies. Share one session across coroutines and use the session’s built-in connection pool limit:

import asyncio
import aiohttp

async def main():
    # WRONG — each coroutine creates its own session
    results = await asyncio.gather(*[fetch_with_new_session(url) for url in urls])

    # CORRECT — share one session
    async with aiohttp.ClientSession() as session:
        results = await asyncio.gather(*[fetch_with_session(session, url) for url in urls])

For related Python issues, see Fix: Python asyncio Blocking the Event Loop, Fix: Python asyncio Not Running, Fix: Python Multiprocessing Not Working, and Fix: Python Decorator Not Working.


FixDevs

Solo developer based in Japan. Every solution is cross-referenced with official documentation and tested before publishing.
