
Fix: Ray Not Working — Cluster Init, Object Store Memory, and Actor Lifecycle Errors

FixDevs


Quick Answer

How to fix Ray errors — ray.init connection refused, object store full ObjectStoreFullError, worker died unexpectedly, serialization PickleError for remote function, Ray Tune trials fail, Ray cluster version mismatch, and actor ReferenceError.

The Error

You start a Ray cluster and the client refuses to connect:

ConnectionError: Could not connect to Ray cluster at ray://head:10001
Ray cluster may not be running

Or workers die with object store memory pressure:

ray.exceptions.ObjectStoreFullError: Failed to put object
Workspace is at capacity (50GB / 50GB used)

Or a remote function raises a pickle error you can’t explain:

TypeError: Could not pickle object of type <class '_thread.lock'>

Or you scale up Ray Tune and trials fail one after another:

(TuneError): Error in trial run: RuntimeError: CUDA error: out of memory

Or an actor handle suddenly starts raising RayActorError long after you created the actor:

ray.exceptions.RayActorError: The actor died unexpectedly before finishing this task

Ray is a distributed execution framework — it runs Python functions and stateful Actors across a cluster with a single API. The mental model is simple (@ray.remote + .remote()), but the cluster memory model, object store, and actor lifecycle produce errors that look nothing like normal Python exceptions.

Why This Happens

Ray runs a head node (scheduler, global control store) and worker nodes (execution). Each node has a local object store (Plasma, default 30% of node RAM) that holds inputs and outputs of remote tasks. When you call func.remote(x), Ray serializes x. Small arguments travel inline with the task; large ones are stored in the object store and the worker receives a reference. Large arguments that don’t fit in the object store cause errors; actors that hold references to GPU memory or large objects compound the problem.

Actor failures cascade — if an actor dies while you hold a handle to it, every future method call raises RayActorError until you recreate the actor.

Diagnostic Timeline

When Ray jobs fail or hang, the reflex is “restart the cluster.” That works rarely and wastes 10 minutes every time you try it. Walk this timeline first.

Minute 0 — Wrong first instinct. You run ray stop && ray start --head, resubmit the job, and watch it fail the same way. A cluster restart only resolves wedged Raylets or stale GCS state. It does not fix object spilling, actor placement, or resource heterogeneity — which is where most production Ray bugs live.

Minute 1 — Discriminating evidence. Open the dashboard at http://<head>:8265 and check three things before anything else: object store usage per node, the placement of your failing actor, and the spill log. The dashboard surfaces all three. If object store is at 90%+ on one node and 10% on others, you have skew. If your actor is pinned to a node that does not have the resources you declared (e.g., num_gpus=1 but the node has no GPU), it will sit waiting forever and never error.

Minute 2 — Next check. Run ray status from the head node. The “Demands” line shows what scheduling Ray is trying to satisfy but cannot. A line like Demands: {'GPU': 1.0}: 12+ pending means 12 tasks are queued for a GPU that no node has — usually a resource heterogeneity bug. Run ray.cluster_resources() and compare against what your @ray.remote(num_gpus=1, num_cpus=4) decorators ask for. Mismatches between requested and available resources cause silent hangs, not errors.
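The comparison described above can be automated before any task is submitted. The following is a minimal sketch, not part of Ray: unsatisfiable and assert_cluster_can_run are hypothetical helper names, and the only Ray call assumed is ray.cluster_resources(), which is imported lazily so the pure check runs without a cluster.

```python
def unsatisfiable(requested, cluster):
    """Return {resource: (requested, total)} for requests the cluster cannot meet.

    `requested` mirrors the decorator arguments, e.g. {"CPU": 4, "GPU": 1}.
    `cluster` is a dict shaped like the output of ray.cluster_resources().
    """
    missing = {}
    for name, amount in requested.items():
        total = cluster.get(name, 0.0)
        if amount > total:
            missing[name] = (amount, total)
    return missing


def assert_cluster_can_run(requested):
    """Hypothetical guard: call after ray.init(), before launching tasks."""
    import ray  # lazy import keeps the pure check above dependency-free

    missing = unsatisfiable(requested, ray.cluster_resources())
    if missing:
        raise RuntimeError(f"Cluster cannot satisfy request: {missing}")


# Example with a CPU-only cluster: the GPU request can never be scheduled.
cpu_only = {"CPU": 16.0, "memory": 32_000_000_000}
print(unsatisfiable({"CPU": 4, "GPU": 1}, cpu_only))   # {'GPU': (1, 0.0)}
```

This only compares against cluster-wide totals. A request can still hang if no single node has all the requested resources together, so treat a clean result as necessary rather than sufficient.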

Minute 3 — Actual root cause. Three causes account for most “Ray is broken” tickets that survive a restart:

  1. Object store spilling. When the object store fills, Ray spills to disk by default — turning a 5-second task into a 60-second one as the worker waits for the spill round trip. Symptom is slow throughput with no errors. Fix is either to size the store properly with object_store_memory, to release references aggressively with del, or to use ray.put() for shared data so the store holds one copy instead of N. Fix 2 covers this.
  2. Resource heterogeneity per node. A cluster mixing CPU-only and GPU nodes appears uniform from ray.init() but tasks tagged num_gpus=1 only schedule on the GPU subset. If you scale the cluster down to CPU-only, those tasks queue forever. Always inspect ray.cluster_resources() after autoscaling events and assert before launching tasks. Fix 4 and Fix 6 cover this.
  3. Actor placement on the wrong node. Without a placement_group, Ray schedules actors greedily — the first one lands on whatever node has resources, even if you wanted the second actor on the same node for data locality. Symptom is unexpected cross-node traffic and degraded throughput. Use placement_group([{...}, {...}], strategy="PACK") to pin co-location. Fix 6 covers this.

If none of these fit, then restart. By then you actually know which subsystem you are restarting.

Fix 1: ray.init() — Cluster Setup Basics

import ray

# Option 1: Start a local cluster (most common for single-machine)
ray.init()
print(ray.cluster_resources())
# {'CPU': 16.0, 'memory': 32000000000, ...}

# Option 2: Connect to an existing cluster
ray.init(address="ray://head-node:10001")

# Option 3: Auto-detect (reads RAY_ADDRESS env var)
ray.init(address="auto")

# Option 4: Shut down before re-initializing
ray.shutdown()
ray.init()

Common connection errors:

ConnectionError: Could not connect to Ray cluster at ray://head:10001

Causes:

  1. Head node not running — check with ray status:
ray status
# Node status
# ---------------------------------------------------------------
# Active:
#  1 node_abc123
# Resources
# ---------------------------------------------------------------
  2. Firewall blocking ports: Ray needs 6379 (GCS), 10001 (client server), 8265 (dashboard), and random ports for raylets.

  3. Version mismatch between client and cluster:

import ray
print(ray.__version__)   # Must match the cluster's version exactly
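For the firewall cause in the list above, a plain TCP connect from the client machine usually settles the question faster than reading firewall rules. This sketch uses only the standard library; check_ports is a hypothetical helper name, "head-node" is a placeholder hostname, and the port list is the one given in this article.

```python
import socket


def check_ports(host, ports, timeout=2.0):
    """Return {port: True/False} depending on whether a TCP connect succeeds."""
    results = {}
    for port in ports:
        try:
            with socket.create_connection((host, port), timeout=timeout):
                results[port] = True
        except OSError:  # refused, timed out, or hostname not resolvable
            results[port] = False
    return results


# Ports named in this article: GCS, client server, dashboard.
RAY_PORTS = [6379, 10001, 8265]
# Uncomment and replace the placeholder hostname to run against a real head node:
# print(check_ports("head-node", RAY_PORTS))
```

A False for 10001 with a True for 6379 would suggest the head node is up but the client server port is blocked or not started. This check does not cover the dynamically assigned raylet ports.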

Start a head node:

# Start head node with dashboard
ray start --head --port=6379 --dashboard-host=0.0.0.0

# Start worker nodes pointing at the head
ray start --address=head-node:6379

# Stop
ray stop

Cleanup pattern for scripts (try/finally guarantees that ray.shutdown() runs):

import ray

ray.init()
try:
    # Your Ray code
    result = some_remote_function.remote()
    print(ray.get(result))
finally:
    ray.shutdown()

Common Mistake: Calling ray.init() twice in the same process without ray.shutdown() in between. The second call raises RuntimeError. If your code might re-init (Jupyter cell re-runs, nested calls), use ray.init(ignore_reinit_error=True) or guard the call with ray.is_initialized().

Fix 2: Object Store Full — ObjectStoreFullError

ray.exceptions.ObjectStoreFullError: Failed to put object of size 5.0 GB
Workspace has 50 GB total, 48 GB used

Ray’s object store holds all task inputs and outputs. When it fills up, new objects can’t be stored.

Increase the object store size:

import ray

ray.init(object_store_memory=80 * 10**9)   # 80 GB

Default is 30% of node RAM, capped at 200GB. For memory-intensive workloads, set this explicitly.
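To see what that default means for a given machine, the arithmetic can be written out. This is a sketch of the rule as stated in this article (30% of RAM, 200 GB cap), not a call into Ray; default_object_store_bytes is a hypothetical name, and actual defaults may differ by Ray version and platform.

```python
GB = 10**9


def default_object_store_bytes(node_ram_bytes, percent=30, cap_bytes=200 * GB):
    """Store size under the rule above: `percent` of node RAM, capped at `cap_bytes`."""
    return min(node_ram_bytes * percent // 100, cap_bytes)


for ram_gb in (64, 256, 1024):
    size = default_object_store_bytes(ram_gb * GB)
    print(f"{ram_gb} GB RAM -> {size / GB:.1f} GB object store")
```

Under these assumptions a 64 GB node gets roughly 19 GB of object store, which a handful of 5 GB results can fill quickly, while a 1 TB node hits the cap instead of receiving 30%.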

Check current object store usage:

import ray
print(ray.cluster_resources())     # Total 'object_store_memory' across the cluster
print(ray.available_resources())   # 'object_store_memory' that is currently free

Avoid holding references to large objects:

import ray
import numpy as np

ray.init()

@ray.remote
def compute(arr):
    return arr.sum()

# WRONG — creates 1000 large arrays, all held in object store
refs = [compute.remote(np.random.rand(100_000_000)) for _ in range(1000)]
results = ray.get(refs)

# CORRECT — process in batches, release references as you go
import gc

batch_size = 10
all_results = []
for i in range(0, 1000, batch_size):
    batch_refs = [compute.remote(np.random.rand(100_000_000)) for _ in range(batch_size)]
    batch_results = ray.get(batch_refs)
    all_results.extend(batch_results)
    del batch_refs   # Allow Ray to evict
    gc.collect()
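Fixed-size batches wait for the slowest task in each batch. A common alternative is to cap the number of tasks in flight and submit a new one whenever one finishes. The sketch below shows that pattern with the three operations passed in as parameters so it runs without a cluster; run_bounded and the fake_* stand-ins are hypothetical names. With Ray, the three arguments would be compute.remote, ray.wait, and ray.get.

```python
def run_bounded(submit, wait, get, inputs, max_in_flight=10):
    """Keep at most `max_in_flight` tasks pending; collect results as they finish."""
    pending, results = [], []
    for item in inputs:
        if len(pending) >= max_in_flight:
            # Block until one task finishes, then drop its reference.
            ready, pending = wait(pending, num_returns=1)
            results.extend(get(ready))
        pending.append(submit(item))
    results.extend(get(pending))   # drain whatever is still running
    return results


# Stand-ins that mimic the call shapes of task.remote, ray.wait and ray.get.
def fake_submit(x):
    return x * 2


def fake_wait(refs, num_returns=1):
    return refs[:num_returns], refs[num_returns:]


def fake_get(refs):
    return list(refs)


print(run_bounded(fake_submit, fake_wait, fake_get, range(5), max_in_flight=2))
# [0, 2, 4, 6, 8]
```

On a real cluster the results arrive in completion order, not submission order, so carry an index in each result if ordering matters.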

Use ray.put() for large shared data instead of passing it to every call:

import ray
import numpy as np

# WRONG — large array serialized and sent with every call
big_array = np.random.rand(100_000_000)

@ray.remote
def process(chunk_idx, data):
    return data[chunk_idx * 1000: (chunk_idx + 1) * 1000].sum()

results = ray.get([process.remote(i, big_array) for i in range(1000)])

# CORRECT: put once, pass the reference
big_array_ref = ray.put(big_array)   # Stored once in the object store

@ray.remote
def process(chunk_idx, data):
    # Ray resolves a top-level ObjectRef argument before the task runs, so
    # `data` is already the array here (zero-copy read if on the same node).
    return data[chunk_idx * 1000: (chunk_idx + 1) * 1000].sum()

results = ray.get([process.remote(i, big_array_ref) for i in range(1000)])

Clear the object store between stages:

# Free specific objects (private API; the import path has changed between Ray releases)
from ray._private.internal_api import free
free([ref1, ref2])

# Or let references go out of scope and trigger GC
del some_ref
gc.collect()

Fix 3: Serialization Errors

TypeError: Could not pickle object of type '_thread.lock'
TypeError: cannot pickle 'SSLContext' object

Ray ships tasks and data across processes using pickle-based serialization (cloudpickle for functions and classes, pickle protocol 5 for data). Non-picklable objects (thread locks, database connections, file handles, CUDA contexts at module level) can’t be serialized.

WRONG — non-picklable closure:

import ray
import sqlite3

conn = sqlite3.connect("data.db")   # SQLite connection at module level

@ray.remote
def query(sql):
    return conn.execute(sql).fetchall()   # Uses closure over conn — can't pickle

ray.init()
ray.get(query.remote("SELECT 1"))   # Raises TypeError: the connection captured by the closure can't be pickled

CORRECT — create resources inside the remote function:

import ray
import sqlite3

@ray.remote
def query(sql, db_path):
    conn = sqlite3.connect(db_path)   # Created on worker
    try:
        return conn.execute(sql).fetchall()
    finally:
        conn.close()

ray.init()
ray.get(query.remote("SELECT 1", "data.db"))

Or use Actors for persistent state:

import ray
import sqlite3

@ray.remote
class DatabaseActor:
    def __init__(self, db_path):
        self.conn = sqlite3.connect(db_path)

    def query(self, sql):
        return self.conn.execute(sql).fetchall()

    def __del__(self):
        self.conn.close()

ray.init()
db_actor = DatabaseActor.remote("data.db")
result = ray.get(db_actor.query.remote("SELECT 1"))

Pro Tip: Any persistent resource (DB connection, ML model, file handle) belongs in an Actor, not a remote function. Actors hold state across calls, so the cost of creating the resource in __init__ is paid once per actor. Remote functions keep no state between calls (Ray may run each call on a different pooled worker process), so they should be stateless.
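When the pickle error names a type but not where it lives, it helps to test each attribute of the object you are sending. This sketch uses the standard library pickle as a conservative stand-in: Ray's cloudpickle accepts more than pickle does (lambdas, locally defined functions), so a failure here is a strong hint but not proof. unpicklable_attrs and the Job class are hypothetical names. Ray also ships ray.util.inspect_serializability for a deeper trace.

```python
import pickle
import threading


def unpicklable_attrs(obj):
    """Return names of instance attributes that the stdlib pickle rejects."""
    bad = []
    for name, value in vars(obj).items():
        try:
            pickle.dumps(value)
        except Exception:  # TypeError, PicklingError, AttributeError, ...
            bad.append(name)
    return bad


class Job:
    def __init__(self):
        self.name = "nightly"
        self.lock = threading.Lock()   # thread locks cannot be pickled


print(unpicklable_attrs(Job()))   # ['lock']
```

Once the offending attribute is known, create it on the worker (inside the remote function or the actor's __init__) instead of shipping it from the driver.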

Fix 4: GPU Resources and Ray Tune OOM

(TuneError): Error in trial run: RuntimeError: CUDA error: out of memory

Ray Tune runs multiple hyperparameter trials in parallel. Without resource declarations, multiple trials can land on the same GPU, fighting for memory.

Declare per-trial GPU requirements:

from ray import tune
import ray

ray.init()

def train(config):
    # Training code using config["lr"], config["batch_size"]
    pass

tune.run(
    train,
    config={
        "lr": tune.loguniform(1e-5, 1e-1),
        "batch_size": tune.choice([16, 32, 64]),
    },
    resources_per_trial={"cpu": 2, "gpu": 1},   # Each trial gets one full GPU
    num_samples=20,
)

Multi-GPU trials:

tune.run(
    train,
    resources_per_trial={"cpu": 4, "gpu": 2},   # 2 GPUs per trial
    num_samples=10,
)

Fractional GPU — for small models that fit multiple per GPU:

tune.run(
    train,
    resources_per_trial={"cpu": 1, "gpu": 0.5},   # 2 trials per GPU
)
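The resources_per_trial values above determine how many trials can run at once. The arithmetic can be sketched as follows; max_concurrent_trials is a hypothetical helper, it assumes every GPU is shared the same way, and it ignores per-node boundaries.

```python
import math


def max_concurrent_trials(cluster_cpus, cluster_gpus, cpu_per_trial, gpu_per_trial):
    """Upper bound on trials running at once (cpu_per_trial must be positive)."""
    by_cpu = math.floor(cluster_cpus / cpu_per_trial)
    if gpu_per_trial == 0:
        return by_cpu
    if gpu_per_trial < 1:
        # Fractional request: several trials share each GPU.
        per_gpu = math.floor(1 / gpu_per_trial + 1e-9)
        by_gpu = int(cluster_gpus) * per_gpu
    else:
        by_gpu = math.floor(cluster_gpus / gpu_per_trial)
    return min(by_cpu, by_gpu)


print(max_concurrent_trials(16, 4, cpu_per_trial=1, gpu_per_trial=0.5))   # 8
print(max_concurrent_trials(16, 4, cpu_per_trial=4, gpu_per_trial=2))     # 2
```

A fractional gpu value is scheduling bookkeeping only. Ray does not cap the GPU memory a trial allocates, so two trials at gpu=0.5 must each stay under half the device memory on their own.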

Check which GPUs a trial can see:

import os
import torch

def train(config):
    # Ray Tune sets CUDA_VISIBLE_DEVICES automatically based on resources_per_trial
    print(f"Visible GPUs: {os.environ.get('CUDA_VISIBLE_DEVICES')}")
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    # ... training code

When several trials run out of memory at the same time, first check how many trials share each GPU: a fractional gpu value is only a scheduling hint, and Ray does not cap the GPU memory a trial can allocate. Also avoid creating CUDA tensors or models at module level in the driver script; build them inside the trial function so each trial allocates memory only on the GPU it was assigned.

Fix 5: Actor Death and Recovery

ray.exceptions.RayActorError: The actor died unexpectedly before finishing this task

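Actors die when __init__ raises, when the actor process crashes or exits, when it is killed for using too much memory, when its worker node fails, or after an explicit ray.kill(). An exception raised inside an ordinary actor method does not kill the actor; it is returned to the caller as the error for that call. Once the actor is dead, all future calls on the handle fail.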

Detect and recreate:

import ray

@ray.remote
class Counter:
    def __init__(self):
        self.value = 0

    def increment(self):
        self.value += 1
        return self.value

ray.init()
counter = Counter.remote()

try:
    result = ray.get(counter.increment.remote())
except ray.exceptions.RayActorError:
    print("Actor died, recreating...")
    counter = Counter.remote()
    result = ray.get(counter.increment.remote())
    # WARNING: State is lost — new actor starts from scratch

Set max_restarts so Ray restarts the actor automatically:

@ray.remote(max_restarts=3, max_task_retries=2)
class ResilientActor:
    def __init__(self):
        self.value = 0

    def work(self):
        return self.value

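max_restarts=-1 restarts indefinitely. Each restart re-runs __init__, so in-memory state starts over. max_task_retries sets how many times a method call that failed because the actor died is retried once the actor is back; without it, calls in flight when the actor died raise RayActorError even though the actor restarts.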
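Because a restarted actor runs __init__ again, state that must survive has to be reloaded from somewhere durable. One way to do that is a small checkpoint file, sketched below with a plain class so it runs without a cluster. CheckpointedCounter is a hypothetical name, and the JSON file format and the path are assumptions. With Ray, the path would need to be reachable from whichever node the actor restarts on.

```python
import json
import os
import tempfile


class CheckpointedCounter:
    """Counter that reloads its value from disk whenever it is constructed."""

    def __init__(self, path):
        self.path = path
        self.value = 0
        if os.path.exists(path):
            with open(path) as f:
                self.value = json.load(f)["value"]

    def increment(self):
        self.value += 1
        # Write to a temp file and rename, so a crash never leaves a partial file.
        tmp = self.path + ".tmp"
        with open(tmp, "w") as f:
            json.dump({"value": self.value}, f)
        os.replace(tmp, self.path)
        return self.value


# With Ray, the same class could be wrapped as a restartable actor, e.g.:
#   import ray
#   Counter = ray.remote(max_restarts=3)(CheckpointedCounter)
#   counter = Counter.remote("/shared/counter.json")   # hypothetical shared path

# Local demonstration: a second instance resumes from the file.
path = os.path.join(tempfile.mkdtemp(), "counter.json")
first = CheckpointedCounter(path)
first.increment()
first.increment()
restarted = CheckpointedCounter(path)   # simulates __init__ re-running after a restart
print(restarted.value)                  # 2
```

Be careful combining this with max_task_retries: increment is not idempotent, so a call that was applied and checkpointed just before the actor died may be applied a second time when it is retried.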

Detached actors survive their creator:

# Create a named, detached actor that persists beyond the script.
# An explicit namespace ("shared" is just an example) lets other jobs find it.
counter = Counter.options(
    name="global_counter", namespace="shared", lifetime="detached"
).remote()

# In another script or after restart, retrieve by name and namespace
counter = ray.get_actor("global_counter", namespace="shared")
ray.get(counter.increment.remote())

Detached actors must be explicitly killed:

ray.kill(ray.get_actor("global_counter", namespace="shared"))

Fix 6: Scheduling and Placement

import ray

ray.init()

@ray.remote(num_cpus=4, num_gpus=1)
def heavy_task(x):
    # Ray schedules this only on workers with 4 CPUs and 1 GPU available
    return x * 2

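# For precise co-location, use placement groups
from ray.util.placement_group import placement_group
from ray.util.scheduling_strategies import PlacementGroupSchedulingStrategy

# Reserve two bundles; PACK puts them on one node when possible (best effort)
pg = placement_group([{"CPU": 4, "GPU": 1}, {"CPU": 4, "GPU": 1}], strategy="PACK")
ray.get(pg.ready())   # Blocks until the reservation succeeds

# Schedule actors within the placement group.
# Actor stands for any @ray.remote class whose resource request fits in one bundle.
actor1 = Actor.options(
    scheduling_strategy=PlacementGroupSchedulingStrategy(
        placement_group=pg, placement_group_bundle_index=0
    )
).remote()
actor2 = Actor.options(
    scheduling_strategy=PlacementGroupSchedulingStrategy(
        placement_group=pg, placement_group_bundle_index=1
    )
).remote()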

# Release when done
ray.util.remove_placement_group(pg)

Placement strategies:

Strategy        Behavior
PACK            All bundles on one node if possible
SPREAD          Distribute across nodes for fault tolerance
STRICT_PACK     All bundles MUST be on one node (fails if impossible)
STRICT_SPREAD   Each bundle on a different node
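Whether a strict strategy can succeed at all is a matter of simple arithmetic over per-node resources, which is worth checking before waiting on pg.ready(). The sketch below is illustrative only: both function names are hypothetical, and nodes is assumed to be a list of per-node resource totals, which with Ray could be built from the "Resources" field of each entry returned by ray.nodes().

```python
def bundles_fit_on_one_node(bundles, node):
    """True if the summed bundle requests fit within one node's resources."""
    needed = {}
    for bundle in bundles:
        for name, amount in bundle.items():
            needed[name] = needed.get(name, 0) + amount
    return all(node.get(name, 0) >= amount for name, amount in needed.items())


def strict_pack_feasible(bundles, nodes):
    """True if at least one node could host every bundle (STRICT_PACK requirement)."""
    return any(bundles_fit_on_one_node(bundles, node) for node in nodes)


bundles = [{"CPU": 4, "GPU": 1}, {"CPU": 4, "GPU": 1}]
nodes = [{"CPU": 16, "GPU": 1}, {"CPU": 8, "GPU": 2}]
print(strict_pack_feasible(bundles, nodes))       # True: the second node fits both
print(strict_pack_feasible(bundles, nodes[:1]))   # False: only one GPU on that node
```

The check uses node totals, not what is currently free, so a True result means the group is possible in principle, not that it will be placed immediately.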

Fix 7: Ray Dashboard and Debugging

The Ray dashboard (default port 8265) shows job progress, node health, actor state, and logs.

import ray

ray.init(dashboard_host="0.0.0.0", dashboard_port=8265)
# Open http://localhost:8265

Enable remote access:

ray start --head --dashboard-host=0.0.0.0
# Now the dashboard is accessible from other machines

Log remote task output:

import ray

@ray.remote
def noisy_task():
    print("Worker stdout is forwarded to the driver and shown in the dashboard logs")
    import logging
    logging.warning("This also appears in dashboard logs")

ray.init()
ray.get(noisy_task.remote())

Debug timeouts — wait for tasks with timeout:

ready, not_ready = ray.wait(futures, num_returns=1, timeout=10.0)
if ready:
    print(f"One task finished: {ray.get(ready[0])}")
else:
    print("No tasks finished in 10 seconds")

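Memory profiling of the driver script with memray:

pip install memray
memray run my_script.py
memray flamegraph memray-my_script.py.*.bin

# Separate check: import-time profiling (not memory), useful when startup is slow
python -X importtime my_script.py 2> import_times.txt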

When running Ray inside a Jupyter notebook, pass ignore_reinit_error=True to ray.init(), because re-running a cell calls ray.init() again and the resulting error looks like a cluster failure but is not. Set dashboard_host="0.0.0.0" if you need to open the dashboard from a different machine than the one running the notebook.

Fix 8: Ray Train for Distributed Training

Ray Train wraps PyTorch/TensorFlow training loops for multi-GPU and multi-node training.

import ray
from ray import train
from ray.train import ScalingConfig
from ray.train.torch import TorchTrainer

ray.init()

def train_fn(config):
    import torch
    import torch.nn as nn
    from torch.utils.data import DataLoader, TensorDataset

    # prepare_model moves the model to this worker's device and wraps it for distributed training
    model = nn.Linear(10, 1)
    model = train.torch.prepare_model(model)   # Wraps with DDP

    dataset = TensorDataset(torch.randn(1000, 10), torch.randn(1000, 1))
    loader = DataLoader(dataset, batch_size=32)
    loader = train.torch.prepare_data_loader(loader)   # Adds DistributedSampler

    optimizer = torch.optim.SGD(model.parameters(), lr=config["lr"])
    for epoch in range(10):
        for batch_x, batch_y in loader:
            optimizer.zero_grad()
            loss = nn.functional.mse_loss(model(batch_x), batch_y)
            loss.backward()
            optimizer.step()

        train.report({"loss": loss.item(), "epoch": epoch})

trainer = TorchTrainer(
    train_fn,
    train_loop_config={"lr": 0.001},
    scaling_config=ScalingConfig(num_workers=4, use_gpu=True),
)
result = trainer.fit()
print(result.metrics)

num_workers vs num_gpus — num_workers is the number of parallel training processes. Each gets one GPU if use_gpu=True.

For PyTorch DDP patterns that Ray Train wraps, see PyTorch not working.

Still Not Working?

Ray vs Dask — When to Use Which

  • Ray — Better for stateful distributed computing, ML-specific features (Tune, Train, RLlib, Serve), actor model. Large ecosystem for model serving and RL.
  • Dask — Better for DataFrame/array workloads, Pythonic API, lighter for data science. Simpler for single-machine parallelism.

For Dask’s lazy evaluation and DataFrame patterns, see Dask not working.

Ray on Kubernetes

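Use KubeRay for production deployment. The manifest below creates a RayCluster resource and assumes the KubeRay operator is already installed in the Kubernetes cluster: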

kubectl apply -f https://raw.githubusercontent.com/ray-project/kuberay/master/ray-operator/config/samples/ray-cluster.complete.yaml

Hyperparameter Tuning Alternatives

For simpler hyperparameter search without Ray’s overhead, consider Optuna. For Optuna-specific patterns and storage, see Optuna not working.

ML Experiment Tracking

Ray Train integrates with MLflow and Weights & Biases for metric logging:

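import ray.train
from ray.air.integrations.mlflow import MLflowLoggerCallback
from ray.train.torch import TorchTrainer

# train_fn is the training function defined in Fix 8
trainer = TorchTrainer(
    train_fn,
    run_config=ray.train.RunConfig(
        callbacks=[MLflowLoggerCallback(experiment_name="my_experiment")]
    ),
)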

For MLflow tracking URI configuration that interacts with Ray clusters, see MLflow not working.

Shutting Down Stale Clusters

# If scripts leak Ray processes
ray stop --force   # Kill all local Ray processes

# Clean up session directory
rm -rf /tmp/ray/

Ray accumulates logs in /tmp/ray/. On long-running dev machines, clean this periodically or start Ray with --temp-dir (ray.init(_temp_dir=...) from Python) pointing at a managed location.
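Deleting all of /tmp/ray/ also removes the logs of a cluster that is still running. A gentler approach is to list only old session directories first and review them. The sketch below assumes Ray's usual layout of session_* directories plus a session_latest symlink; stale_sessions is a hypothetical helper that only reports paths and deletes nothing.

```python
import os
import time


def stale_sessions(root="/tmp/ray", max_age_days=7, now=None):
    """List session_* directories under `root` not modified in `max_age_days`."""
    now = time.time() if now is None else now
    cutoff = now - max_age_days * 86400
    stale = []
    if not os.path.isdir(root):
        return stale
    for entry in os.scandir(root):
        # Skip symlinks such as session_latest, which points at the live session.
        if entry.is_symlink() or not entry.name.startswith("session_"):
            continue
        if entry.is_dir() and entry.stat().st_mtime < cutoff:
            stale.append(entry.path)
    return sorted(stale)


for path in stale_sessions():
    print("candidate for cleanup:", path)
```

The directory modification time is a rough proxy for activity, so confirm that no Ray process is using a session before removing it.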

Object Spilling Silently Slows Throughput

Your throughput drops 5x after 10 minutes of running and never recovers. The cause is almost always object store spilling: once the in-memory store fills, Ray starts writing objects to disk, and every subsequent ray.get() on a spilled object waits for I/O. Watch object store usage (the dashboard's per-node view, or object_store_memory in ray.available_resources()) and either size the store larger via ray.init(object_store_memory=...) or del references aggressively after consuming results.

Cluster Autoscaling Removes Nodes Holding Active Actors

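The Ray autoscaler (including under KubeRay) removes nodes it considers idle. An actor that is waiting for input and holds few or no logical resources may not keep its node alive, so the node can be removed and the actor dies with RayActorError. Solutions: set a minimum worker count (minReplicas in KubeRay, min_workers in the cluster launcher config) so the nodes you depend on are never removed; have the actor declare the resources it needs so its node counts as in use; or design actors to be cheap to recreate and use max_restarts=-1. Note that lifetime="detached" only decouples an actor from its driver; it does not protect the actor from node removal.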

Ray and Numpy Pickle Versions Mismatch Across Driver and Worker

Driver runs Python 3.11 with numpy 1.26; workers run Python 3.10 with numpy 1.24. Objects pickled under one combination may fail to deserialize under the other, or deserialize and then misbehave. The symptom is often a mysterious downstream error hours into a long job. Pin identical Python and numpy versions across the cluster in requirements.txt, and check at startup by comparing the driver's values with ray.get(ray.remote(lambda: (sys.version, np.__version__)).remote()).
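The startup check is easier to act on when it reports exactly which node differs and in what. The comparison step can be sketched as a pure function over collected version dictionaries. find_mismatches and the node names are hypothetical, and how the per-node values are gathered (for example one remote task per node) is left out.

```python
def find_mismatches(driver_env, worker_envs):
    """Return {worker: {key: (driver_value, worker_value)}} for every differing key."""
    mismatches = {}
    for worker, env in worker_envs.items():
        diff = {
            key: (expected, env.get(key))
            for key, expected in driver_env.items()
            if env.get(key) != expected
        }
        if diff:
            mismatches[worker] = diff
    return mismatches


driver = {"python": "3.11", "numpy": "1.26.4"}
workers = {
    "node-a": {"python": "3.11", "numpy": "1.26.4"},
    "node-b": {"python": "3.10", "numpy": "1.24.0"},
}
print(find_mismatches(driver, workers))
# {'node-b': {'python': ('3.11', '3.10'), 'numpy': ('1.26.4', '1.24.0')}}
```

Failing fast on a non-empty result at job start is cheaper than diagnosing a deserialization problem hours in.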


FixDevs

Solo developer based in Japan. Every solution is cross-referenced with official documentation and tested before publishing.
