Fix: Dask Not Working — Scheduler Errors, Out of Memory, and Delayed Not Computing
Quick Answer
How to fix Dask errors — KilledWorker out of memory, client cannot connect to scheduler, delayed not computing, DataFrame partition size wrong, map_partitions TypeError, diagnostics dashboard not showing, and version mismatch.
The Error
You scale up a computation with Dask and workers start dying:
distributed.scheduler.KilledWorker:
Attempted to run task ... on 3 different workers, but all workers died
Or the Dask Client can’t connect to the scheduler:
OSError: Timed out trying to connect to tcp://localhost:8786 after 30 s
Or a @delayed computation never actually runs:
@delayed
def process(x):
    return x * 2

result = process(5)
print(result) # Delayed('process-abc123') — not the value
Or Dask DataFrame operations are slow because partitions are too small (or too large):
UserWarning: Sending large graph of size 2.45 MiB.
This may cause some slowdown. Consider scattering data ahead of time.
Or .compute() on a large result crashes the client:
MemoryError
Dask parallelizes Python code via task graphs executed by schedulers (threaded, multiprocess, or distributed). The distributed scheduler is where most errors happen — workers die from memory pressure, the client loses connection, or partitions get imbalanced. This guide covers each failure mode.
Why This Happens
Dask is lazy by design — operations build a task graph, and nothing executes until you call .compute() or .persist(). Forgetting this is the most common source of confusion.
The distributed scheduler (via dask.distributed.Client) runs multiple worker processes that communicate via TCP. Workers track memory usage and kill themselves when they hit a threshold to protect the scheduler from cascading failures. Partition size controls both memory per task and parallelism — too small means task overhead dominates, too large means workers OOM.
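The lazy-graph model is easier to internalize with a toy sketch. This is plain stdlib Python, not Dask's real classes — `Delayed` and `delayed` here are simplified stand-ins: calling a wrapped function records a task node instead of running it, and nothing executes until `.compute()` walks the graph.

```python
# Toy model of Dask-style laziness — NOT Dask's real implementation
class Delayed:
    def __init__(self, fn, *args):
        self.fn, self.args = fn, args

    def compute(self):
        # Evaluate dependencies first, then this node
        args = [a.compute() if isinstance(a, Delayed) else a for a in self.args]
        return self.fn(*args)

def delayed(fn):
    return lambda *args: Delayed(fn, *args)

@delayed
def add(a, b):
    return a + b

graph = add(add(1, 2), 3)    # builds a two-node graph; nothing has run
print(type(graph).__name__)  # Delayed
print(graph.compute())       # 6
```

Real Dask adds scheduling, serialization, and parallel execution on top, but the deferral mechanism — build first, execute on demand — is the same idea.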
Fix 1: Lazy Evaluation — Nothing Executes Until .compute()
import dask.dataframe as dd
import dask
df = dd.read_csv("huge.csv")
result = df.groupby("category").sum()
print(result) # Dask DataFrame — not a Pandas DataFrame
# No computation has happened yet
Three ways to trigger execution:
# 1. .compute() — returns a regular Pandas DataFrame (brings to driver memory)
pdf = result.compute()
print(pdf.head())
# 2. .persist() — materializes in cluster memory, returns a Dask object
result_persisted = result.persist()
# Now subsequent operations reuse the materialized result
result_persisted.head() # Fast — data is in worker memory
# 3. .visualize() — shows the task graph without executing
result.visualize(filename="graph.png")
@dask.delayed for arbitrary Python functions:
import dask
import pandas as pd

@dask.delayed
def load(filename):
    return pd.read_csv(filename)

@dask.delayed
def process(df):
    return df.groupby("x").sum()

@dask.delayed
def combine(*results):
    return pd.concat(results)
files = ["a.csv", "b.csv", "c.csv"]
loaded = [load(f) for f in files]
processed = [process(df) for df in loaded]
final = combine(*processed)
# Nothing has run yet — just a graph of tasks
result = final.compute() # NOW everything runs in parallel
Pro Tip: Use dask.delayed when you have existing sequential Python code that you want to parallelize with minimal changes. Use dask.dataframe or dask.array when you’re starting from Pandas/NumPy-like operations. Mixing them works but adds complexity — pick the simpler option for your use case.
Common Mistake: Calling .compute() inside a loop. Each call triggers a separate task graph execution, defeating parallelism:
# WRONG — computes each result serially
results = []
for df in delayed_dfs:
    results.append(df.compute())  # Each compute() waits for completion

# CORRECT — compute everything in one graph
results = dask.compute(*delayed_dfs)  # Parallel execution
Fix 2: KilledWorker — Workers Running Out of Memory
distributed.scheduler.KilledWorker: Attempted to run task on 3 different workers,
but all workers died while running it.
A worker exceeded its memory limit and was killed (either by the OS or by Dask’s memory manager). After 3 retries on different workers, the scheduler gives up.
Check memory settings:
from dask.distributed import Client
# Limit each worker's memory
client = Client(
    n_workers=4,
    threads_per_worker=2,
    memory_limit="4GB",  # Per worker
)
Dask’s memory management thresholds:
| Threshold | Fraction of memory_limit | Behavior |
|---|---|---|
| target | 0.60 | Start spilling data to disk |
| spill | 0.70 | Spill more aggressively |
| pause | 0.80 | Stop accepting new tasks |
| terminate | 0.95 | Kill the worker |
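To make the fractions concrete, here is the arithmetic for the 4GB-per-worker configuration above — a plain-Python sketch of what the thresholds mean in absolute terms, not Dask internals:

```python
# Absolute spill/pause/kill points for memory_limit="4GB" (decimal bytes)
memory_limit = 4_000_000_000
thresholds = {"target": 0.60, "spill": 0.70, "pause": 0.80, "terminate": 0.95}
for name, frac in thresholds.items():
    print(f"{name}: {memory_limit * frac / 1e9:.1f} GB")
# target: 2.4 GB, spill: 2.8 GB, pause: 3.2 GB, terminate: 3.8 GB
```

So a 4GB worker starts spilling around 2.4GB of managed memory and is killed near 3.8GB — which is why a single task materializing a multi-GB partition dies even on a seemingly roomy worker.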
Configure them:
import dask
dask.config.set({
    "distributed.worker.memory.target": 0.60,
    "distributed.worker.memory.spill": 0.70,
    "distributed.worker.memory.pause": 0.85,
    "distributed.worker.memory.terminate": 0.95,
})
Fix the actual cause — reduce memory pressure:
# 1. Reduce partition size
df = dd.read_csv("huge.csv", blocksize="64MB") # Smaller chunks
# 2. Use .persist() strategically to avoid re-computing
df_clean = df.dropna().persist() # Materialize once, reuse many times
# 3. Write intermediate results to disk instead of holding in memory
df_transformed.to_parquet("intermediate/", compute=True)
df = dd.read_parquet("intermediate/")
# 4. Use dtypes efficiently
df = df.astype({
    "small_int": "int32",    # Instead of int64
    "category": "category",  # Instead of object for repeated strings
})
Fix 3: Client Can’t Connect to Scheduler
OSError: Timed out trying to connect to tcp://localhost:8786 after 30 s
The Client is trying to reach a scheduler that isn’t running, or the address is wrong.
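Before debugging Dask itself, confirm the port is reachable at all. A minimal stdlib check — the `port_open` helper is a hypothetical name, not part of Dask:

```python
import socket

def port_open(host, port, timeout=3.0):
    """Return True if a TCP connection to host:port succeeds."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False

# False here means the scheduler isn't listening, or a firewall is in the way
print(port_open("localhost", 8786))
```

If the port is closed, the fix is in the sections below (start the scheduler, correct the address, or open the firewall) — not in your Dask code.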
Start a local cluster properly:
from dask.distributed import Client, LocalCluster
# Option 1: Simple local cluster (most common)
client = Client() # Starts scheduler and workers locally
print(client.dashboard_link) # http://localhost:8787/status
# Option 2: Explicit LocalCluster
cluster = LocalCluster(n_workers=4, threads_per_worker=2, memory_limit="4GB")
client = Client(cluster)
# Option 3: Connect to an existing scheduler
client = Client("tcp://scheduler-host:8786")
# Always close the client when done
client.close()
cluster.close() # If you created one explicitly
Context manager pattern — recommended:
from dask.distributed import Client, LocalCluster
with LocalCluster(n_workers=4) as cluster, Client(cluster) as client:
    df = dd.read_csv("data.csv")
    result = df.groupby("x").sum().compute()
# Cluster and client cleaned up automatically
Start a standalone scheduler (for multi-machine setups):
# On the scheduler machine
dask scheduler
# Scheduler at: tcp://192.168.1.10:8786
# Dashboard at: http://192.168.1.10:8787
# On worker machines
dask worker tcp://192.168.1.10:8786 --nworkers 4 --nthreads 2 --memory-limit 4GB
# On client machine
python
>>> from dask.distributed import Client
>>> client = Client("tcp://192.168.1.10:8786")
Firewall blocking the scheduler port (8786) — common in cloud VMs:
# AWS: open security group for port 8786 and 8787 (dashboard)
# GCP: firewall rule for those ports
# SSH tunnel alternative:
ssh -L 8786:localhost:8786 -L 8787:localhost:8787 user@remote-scheduler
Fix 4: Partition Size — Too Small or Too Large
UserWarning: Sending large graph of size 2.45 MiB. This may cause some slowdown.
Rule of thumb: 100MB per partition. Too small → overhead dominates. Too large → OOM risk, poor parallelism.
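The arithmetic behind the rule is straightforward. With hypothetical numbers — a 6.4GB CSV read with the 64MB blocksize shown below — you can estimate the partition count before reading anything:

```python
import math

# Hypothetical file: 6.4GB CSV read with blocksize="64MB"
total_bytes = 6400 * 1024**2
blocksize = 64 * 1024**2
print(math.ceil(total_bytes / blocksize))  # 100 partitions
```

100 partitions of ~64MB each keeps per-task memory modest while still giving the scheduler plenty of parallelism; the same estimate with a 1MB blocksize would produce 6,400 tiny tasks where overhead dominates.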
Check partition sizes:
import dask.dataframe as dd
df = dd.read_csv("data.csv")
print(f"Partitions: {df.npartitions}")
print(f"Total size: {df.memory_usage(deep=True).sum().compute() / 1e6:.1f} MB")
# Per-partition size (approximate)
for i, partition in enumerate(df.partitions):
    size = partition.memory_usage(deep=True).sum().compute() / 1e6
    print(f"Partition {i}: {size:.1f} MB")
Repartition for better balance:
# Target 100MB partitions
df_balanced = df.repartition(partition_size="100MB")
# Or target a specific number of partitions
df_balanced = df.repartition(npartitions=10)
Control partition size at read time:
# CSV — control with blocksize
df = dd.read_csv("data.csv", blocksize="64MB") # Each partition ~64MB
# Parquet — one file per partition by default
df = dd.read_parquet("data/")
# Force fewer partitions on small files (by row group size for Parquet)
df = dd.read_parquet("data/", split_row_groups=10)
Coalesce small partitions after filtering:
# Filter creates many small partitions
df_filtered = df[df.status == "active"]
# Consolidate before further processing
df_filtered = df_filtered.repartition(partition_size="100MB").persist()
Fix 5: map_partitions — Apply Pandas Functions to Partitions
map_partitions applies a function to each partition (as a Pandas DataFrame). It’s more efficient than .apply() for row operations because it works at Pandas level, not individual rows.
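Conceptually, map_partitions just runs your function once per partition and stitches the results back together. A stdlib sketch, with lists of dicts standing in for pandas DataFrame partitions (the names here are illustrative, not Dask API):

```python
# Each "partition" is a list of row dicts; real Dask uses pandas DataFrames
def map_partitions(func, partitions):
    return [func(part) for part in partitions]

partitions = [[{"x": 1}, {"x": 2}], [{"x": 3}]]
doubled = map_partitions(
    lambda part: [{**row, "x_doubled": row["x"] * 2} for row in part],
    partitions,
)
print(doubled[0][0])  # {'x': 1, 'x_doubled': 2}
```

The key point: the function sees a whole partition at once, so inside it you can use fast vectorized Pandas operations instead of per-row Python calls.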
import dask.dataframe as dd
import pandas as pd
df = dd.read_csv("data.csv")
# WRONG — row-by-row, very slow
df.apply(lambda row: row.x * 2, axis=1)
# CORRECT — vectorized on each partition
def multiply_x(partition):
    partition = partition.copy()
    partition["x_doubled"] = partition["x"] * 2
    return partition

df_with_doubled = df.map_partitions(multiply_x)
Meta parameter for type inference:
# meta tells Dask the output structure — avoids computing a sample
def my_transform(partition):
    return partition.assign(new_col=partition.x * 2)
# Without meta, Dask runs the function on an empty DataFrame to infer output
df_new = df.map_partitions(my_transform)
# With meta — faster, more reliable
df_new = df.map_partitions(
    my_transform,
    meta={"x": "int64", "y": "float64", "new_col": "int64"},
)
map_overlap for windowed operations that need context from neighboring partitions:
# Rolling mean — needs data from previous partition
df["rolling_mean"] = df.x.map_overlap(
    lambda s: s.rolling(10).mean(),
    before=10,  # Rows from previous partition
    after=0,
)
Fix 6: Dask Array — Handling Chunk Sizes
import dask.array as da
import numpy as np
# Create a Dask array from NumPy
arr = da.from_array(np.random.rand(10000, 10000), chunks=(1000, 1000))
# Each chunk is a NumPy array of that size
print(arr.chunks) # ((1000, 1000, ..., 1000), (1000, 1000, ..., 1000))
# Compute chunks in parallel
result = (arr + arr.T).mean(axis=0)
final = result.compute() # Returns numpy array
Chunk size trade-offs (same rule as DataFrame partitions):
- Too small: scheduler overhead dominates
- Too large: workers OOM
- Sweet spot: 100MB–1GB per chunk
# 1D array of 1e9 elements (8GB as float64) — use 100MB chunks
arr = da.zeros(1_000_000_000, chunks=12_500_000) # 100MB per chunk
# 2D — balance both dimensions
arr = da.random.random((50000, 50000), chunks=(5000, 5000)) # 200MB per chunk
Rechunk for different access patterns:
# Original: row-oriented chunks (good for row operations)
arr = da.random.random((10000, 10000), chunks=(1000, 10000))
# Rechunk for column operations
arr_by_cols = arr.rechunk((10000, 1000))
col_means = arr_by_cols.mean(axis=0).compute()
For NumPy-level array operations that underlie Dask arrays, see NumPy not working.
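The chunk arithmetic used throughout this section can be wrapped in a small helper — `chunk_len` is a hypothetical name, stdlib only:

```python
def chunk_len(target_mb, itemsize):
    """Elements per 1-D chunk for a target chunk size in (decimal) MB."""
    return target_mb * 1_000_000 // itemsize

# float64 has an 8-byte itemsize
print(chunk_len(100, 8))  # 12500000 — matches chunks=12_500_000 used above
```

The same arithmetic generalizes to N dimensions: multiply the chunk shape's elements by the itemsize and aim for the 100MB–1GB sweet spot.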
Fix 7: Dask Dashboard and Diagnostics
The Dask dashboard (default port 8787) shows task progress, memory usage, and worker health in real time. It’s the single most useful debugging tool.
from dask.distributed import Client
client = Client()
print(client.dashboard_link) # http://localhost:8787/status
Dashboard tabs:
- Status — current task graph and worker utilization
- Workers — memory, CPU, network per worker
- Tasks — timing of recent tasks (find the slow ones)
- Graph — visual representation of the task DAG
- System — host-level metrics
Profile a computation to find bottlenecks:
from dask.distributed import performance_report
with performance_report(filename="dask-report.html"):
    result = df.groupby("x").agg({"y": "sum"}).compute()
# Open dask-report.html for an interactive breakdown
Log per-task info:
import dask
with dask.config.set({"distributed.worker.log_format": "detailed"}):
    result = df.compute()
Dashboard not accessible? Some environments need explicit host binding:
client = Client(dashboard_address=":8787") # Bind to all interfaces
# Or
client = Client(host="0.0.0.0", dashboard_address=":8787")
For Jupyter environments, the dashboard renders inline:
# Install the Jupyter extension first
# pip install dask-labextension
# Then activate it in JupyterLab
For Jupyter-specific display and port issues, see Jupyter not working.
Fix 8: Version Compatibility
ValueError: Mismatched versions found
Worker: 2024.1.0
Scheduler: 2024.8.0
Dask workers and scheduler must run the same version. Mismatches cause protocol errors.
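The distributed client can check this for you — `client.get_versions(check=True)` gathers versions from the scheduler and every worker and raises on mismatch. A stdlib sketch of the same comparison, with made-up hostnames and versions, for CI or deploy scripts:

```python
# Versions as you'd collect them from dask.__version__ on each machine
versions = {
    "scheduler": "2024.8.0",
    "worker-1": "2024.8.0",
    "worker-2": "2024.1.0",  # the odd one out
}

expected = versions["scheduler"]
mismatched = {host: v for host, v in versions.items() if v != expected}
print(mismatched)  # {'worker-2': '2024.1.0'}
```

Failing fast on a mismatch at deploy time is much cheaper than debugging serialization errors mid-job.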
Pin Dask versions explicitly:
# requirements.txt
dask[complete]==2024.8.0  # pulls in dask itself plus the optional extras
distributed==2024.8.0
For Dask DataFrame, also pin Pandas:
pandas==2.2.0
pyarrow==16.0.0 # Backing Parquet/Arrow support
On Kubernetes or cloud clusters, ensure the same Docker image everywhere:
FROM python:3.12-slim
RUN pip install --no-cache-dir \
    dask[complete]==2024.8.0 \
    distributed==2024.8.0 \
    pandas==2.2.0 \
    pyarrow==16.0.0
WORKDIR /app
Use this image for both scheduler and workers — never mix image tags.
Still Not Working?
Dask vs PySpark — When to Use Which
- Dask — Pythonic API, integrates with NumPy/Pandas/scikit-learn, lightweight for single-machine or small clusters. Best when your workflow is already Python-heavy.
- PySpark — Mature, larger ecosystem, better for multi-TB data, SQL-first. Has JVM overhead.
For PySpark-specific patterns and why it differs from Dask, see PySpark not working.
Dask vs Polars for Data Processing
For single-machine DataFrame operations under 100GB, Polars is often faster than Dask DataFrame due to less overhead. Use Dask when you need multi-machine parallelism or are building pipelines that mix DataFrames, arrays, and arbitrary Python code. For Polars-specific patterns, see Polars not working.
Integrating Dask with Machine Learning
from dask_ml.model_selection import train_test_split
from dask_ml.linear_model import LogisticRegression
X_train, X_test, y_train, y_test = train_test_split(X, y)
model = LogisticRegression()
model.fit(X_train, y_train) # Trains in parallel on Dask DataFrame/Array
For scikit-learn patterns that inspire Dask-ML’s API, see scikit-learn not working.
Adaptive Scaling on Cloud
from dask.distributed import Client
from dask_cloudprovider.aws import FargateCluster
cluster = FargateCluster(
    image="daskdev/dask:latest",
    n_workers=5,
)
cluster.adapt(minimum=2, maximum=20)  # Auto-scale based on load
client = Client(cluster)
Cleaning Up After Failed Jobs
# Restart the cluster if workers are in a bad state
client.restart()
# Close all workers and scheduler
client.shutdown()
# Clear memory on all workers
client.run(lambda: __import__("gc").collect())