
Fix: Ray Not Working — Cluster Init, Object Store Memory, and Actor Lifecycle Errors

FixDevs

Quick Answer

How to fix common Ray errors: ray.init connection refused, ObjectStoreFullError when the object store fills, workers dying unexpectedly, PickleError when a remote function captures something unpicklable, Ray Tune trials failing with CUDA OOM, client/cluster version mismatch, and RayActorError from dead actors.

The Error

You start a Ray cluster and the client refuses to connect:

ConnectionError: Could not connect to Ray cluster at ray://head:10001
Ray cluster may not be running

Or workers die with object store memory pressure:

ray.exceptions.ObjectStoreFullError: Failed to put object
Workspace is at capacity (50GB / 50GB used)

Or a remote function raises a pickle error you can’t explain:

TypeError: Could not pickle object of type <class '_thread.lock'>

Or you scale up Ray Tune and trials fail one after another:

(TuneError): Error in trial run: RuntimeError: CUDA error: out of memory

Or an Actor suddenly raises a reference error long after you created it:

ray.exceptions.RayActorError: The actor died unexpectedly before finishing this task

Ray is a distributed execution framework — it runs Python functions and stateful Actors across a cluster with a single API. The mental model is simple (@ray.remote + .remote()), but the cluster memory model, object store, and actor lifecycle produce errors that look nothing like normal Python exceptions.

Why This Happens

Ray runs a head node (scheduler, global control store) and worker nodes (execution). Each node has a local object store (Plasma, default 30% of node RAM) that holds inputs and outputs of remote tasks. When you call func.remote(x), Ray serializes x, stores it in the object store, and ships a reference to the worker. Large arguments that don’t fit in the object store cause errors; actors that hold references to GPU memory or large objects compound the problem.

Actor failures cascade — if an actor dies while you hold a handle to it, every future method call raises RayActorError until you recreate the actor.

Fix 1: ray.init() — Cluster Setup Basics

import ray

# Option 1: Start a local cluster (most common for single-machine)
ray.init()
print(ray.cluster_resources())
# {'CPU': 16.0, 'memory': 32000000000, ...}

# Option 2: Connect to an existing cluster
ray.init(address="ray://head-node:10001")

# Option 3: Auto-detect (reads RAY_ADDRESS env var)
ray.init(address="auto")

# Option 4: Shut down before re-initializing
ray.shutdown()
ray.init()

Common connection errors:

ConnectionError: Could not connect to Ray cluster at ray://head:10001

Causes:

  1. Head node not running — check with ray status:
ray status
# Node status
# ---------------------------------------------------------------
# Active:
#  1 node_abc123
# Resources
# ---------------------------------------------------------------
  2. Firewall blocking ports — Ray needs: 6379 (GCS), 10001 (client server), 8265 (dashboard), plus ephemeral ports for the raylets.

  3. Version mismatch between client and cluster:

import ray
print(ray.__version__)   # Must match the cluster's version exactly

Start a head node:

# Start head node with dashboard
ray start --head --port=6379 --dashboard-host=0.0.0.0

# Start worker nodes pointing at the head
ray start --address=head-node:6379

# Stop
ray stop

Context manager pattern for scripts:

import ray

ray.init()
try:
    # Your Ray code
    result = some_remote_function.remote()
    print(ray.get(result))
finally:
    ray.shutdown()

Common Mistake: Calling ray.init() twice in the same process without ray.shutdown() in between. The second call fails silently or raises RuntimeError. If your code might re-init (Jupyter restart, nested calls), use ray.init(ignore_reinit_error=True) or check ray.is_initialized().

Fix 2: Object Store Full — ObjectStoreFullError

ray.exceptions.ObjectStoreFullError: Failed to put object of size 5.0 GB
Workspace has 50 GB total, 48 GB used

Ray’s object store holds all task inputs and outputs. When it fills up, new objects can’t be stored.

Increase the object store size:

import ray

ray.init(object_store_memory=80 * 10**9)   # 80 GB

Default is 30% of node RAM, capped at 200GB. For memory-intensive workloads, set this explicitly.
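
A back-of-envelope sizing check before picking a value (plain arithmetic, no cluster needed; the 1000-task / 100M-element figures match the batching example below, and the 64 GB node is a hypothetical):

```python
# Peak object store demand if all tasks were submitted up front:
# 1000 arrays of 100,000,000 float64 elements (8 bytes each).
n_tasks = 1000
bytes_per_array = 100_000_000 * 8
peak_bytes = n_tasks * bytes_per_array
print(f"Peak demand: {peak_bytes / 1e9:.0f} GB")   # 800 GB — will not fit

# Default store on a 64 GB node: 30% of RAM
default_store_bytes = 64 * 10**9 * 3 // 10
print(f"Default store: {default_store_bytes / 1e9:.1f} GB")   # 19.2 GB
```

When peak demand dwarfs the store like this, raising object_store_memory is not enough — batch the submissions as shown below.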

Check current object store usage:

import ray
print(ray.available_resources())
# Includes 'object_store_memory' — the bytes still free in the store

Avoid holding references to large objects:

import ray
import numpy as np

ray.init()

@ray.remote
def compute(arr):
    return arr.sum()

# WRONG — creates 1000 large arrays, all held in object store
refs = [compute.remote(np.random.rand(100_000_000)) for _ in range(1000)]
results = ray.get(refs)

# CORRECT — process in batches, release references as you go
import gc

batch_size = 10
all_results = []
for i in range(0, 1000, batch_size):
    batch_refs = [compute.remote(np.random.rand(100_000_000)) for _ in range(batch_size)]
    batch_results = ray.get(batch_refs)
    all_results.extend(batch_results)
    del batch_refs   # Allow Ray to evict
    gc.collect()

Use ray.put() for large shared data instead of passing it to every call:

import ray
import numpy as np

# WRONG — large array serialized and sent with every call
big_array = np.random.rand(100_000_000)

@ray.remote
def process(chunk_idx, data):
    return data[chunk_idx * 1000: (chunk_idx + 1) * 1000].sum()

results = ray.get([process.remote(i, big_array) for i in range(1000)])

# CORRECT — put once, pass reference
big_array_ref = ray.put(big_array)   # Stored once in object store

@ray.remote
def process(chunk_idx, data_ref):
    data = ray.get(data_ref)   # Retrieved from local store (zero-copy if on same node)
    return data[chunk_idx * 1000: (chunk_idx + 1) * 1000].sum()

results = ray.get([process.remote(i, big_array_ref) for i in range(1000)])

Clear the object store between stages:

# Free specific objects (internal, unstable API — may change between Ray versions)
ray.internal.internal_api.free([ref1, ref2])

# Or let references go out of scope so Ray can evict the objects
import gc

del some_ref
gc.collect()

Fix 3: Serialization Errors

TypeError: Could not pickle object of type '_thread.lock'
TypeError: cannot pickle 'SSLContext' object

Ray ships tasks and data across processes using pickle. Non-picklable objects (thread locks, database connections, file handles, CUDA contexts at module level) can’t be serialized.

WRONG — non-picklable closure:

import ray
import sqlite3

conn = sqlite3.connect("data.db")   # SQLite connection at module level

@ray.remote
def query(sql):
    return conn.execute(sql).fetchall()   # Uses closure over conn — can't pickle

ray.init()
ray.get(query.remote("SELECT 1"))   # PickleError

CORRECT — create resources inside the remote function:

import ray
import sqlite3

@ray.remote
def query(sql, db_path):
    conn = sqlite3.connect(db_path)   # Created on worker
    try:
        return conn.execute(sql).fetchall()
    finally:
        conn.close()

ray.init()
ray.get(query.remote("SELECT 1", "data.db"))

Or use Actors for persistent state:

import ray
import sqlite3

@ray.remote
class DatabaseActor:
    def __init__(self, db_path):
        self.conn = sqlite3.connect(db_path)

    def query(self, sql):
        return self.conn.execute(sql).fetchall()

    def __del__(self):
        self.conn.close()

ray.init()
db_actor = DatabaseActor.remote("data.db")
result = ray.get(db_actor.query.remote("SELECT 1"))

Pro Tip: Any persistent resource (DB connection, ML model, file handle) belongs in an Actor, not a remote function. Actors hold state across calls, so creating the resource in __init__ is paid once per actor. Remote tasks can land on any worker process and should be stateless — a resource created inside one is recreated on every call.
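
Ray serializes with a cloudpickle variant; plain pickle is a conservative approximation you can use to pre-check arguments before submitting. This is a debugging sketch, not Ray's exact serializer — cloudpickle handles some things plain pickle cannot (lambdas, local functions), so False here is a warning sign, not a guarantee of failure:

```python
import pickle
import threading

def is_probably_serializable(obj):
    """Return True if obj survives a plain-pickle round trip."""
    try:
        pickle.dumps(obj)
        return True
    except (TypeError, AttributeError, pickle.PicklingError):
        return False

print(is_probably_serializable({"lr": 0.01}))      # True
print(is_probably_serializable(threading.Lock()))  # False — the '_thread.lock' from the error above
```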

Fix 4: GPU Resources and Ray Tune OOM

(TuneError): Error in trial run: RuntimeError: CUDA error: out of memory

Ray Tune runs multiple hyperparameter trials in parallel. Without resource declarations, multiple trials can land on the same GPU, fighting for memory.

Declare per-trial GPU requirements:

from ray import tune
import ray

ray.init()

def train(config):
    # Training code using config["lr"], config["batch_size"]
    pass

tune.run(
    train,
    config={
        "lr": tune.loguniform(1e-5, 1e-1),
        "batch_size": tune.choice([16, 32, 64]),
    },
    resources_per_trial={"cpu": 2, "gpu": 1},   # Each trial gets one full GPU
    num_samples=20,
)

Multi-GPU trials:

tune.run(
    train,
    resources_per_trial={"cpu": 4, "gpu": 2},   # 2 GPUs per trial
    num_samples=10,
)

Fractional GPU — for small models that fit multiple per GPU:

tune.run(
    train,
    resources_per_trial={"cpu": 1, "gpu": 0.5},   # 2 trials per GPU
)

Set CUDA visibility explicitly inside the trial:

import os
import torch

def train(config):
    # Ray Tune sets CUDA_VISIBLE_DEVICES automatically based on resources_per_trial
    print(f"Visible GPUs: {os.environ.get('CUDA_VISIBLE_DEVICES')}")
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    # ... training code

For PyTorch-specific GPU memory issues and CUDA OOM patterns, see PyTorch not working.

Fix 5: Actor Death and Recovery

ray.exceptions.RayActorError: The actor died unexpectedly before finishing this task

Actors die from: unhandled exceptions, OOM kills, worker node failures, or explicit ray.kill(). Once dead, all future calls on the handle fail.

Detect and recreate:

import ray

@ray.remote
class Counter:
    def __init__(self):
        self.value = 0

    def increment(self):
        self.value += 1
        return self.value

ray.init()
counter = Counter.remote()

try:
    result = ray.get(counter.increment.remote())
except ray.exceptions.RayActorError:
    print("Actor died, recreating...")
    counter = Counter.remote()
    result = ray.get(counter.increment.remote())
    # WARNING: State is lost — new actor starts from scratch

Configure max_restarts so Ray recreates the actor automatically:

@ray.remote(max_restarts=3, max_task_retries=2)
class ResilientActor:
    def __init__(self):
        self.value = 0

    def work(self):
        return self.value

max_restarts=-1 restarts indefinitely. max_task_retries retries in-flight method calls on the restarted actor instead of surfacing RayActorError to the caller.

Detached actors survive their creator:

# Create a named, detached actor that persists beyond the script
counter = Counter.options(name="global_counter", lifetime="detached").remote()

# In another script or after restart, retrieve by name
counter = ray.get_actor("global_counter")
ray.get(counter.increment.remote())

Detached actors must be explicitly killed:

ray.kill(ray.get_actor("global_counter"))

Fix 6: Scheduling and Placement

import ray

ray.init()

@ray.remote(num_cpus=4, num_gpus=1)
def heavy_task(x):
    # Ray schedules this only on workers with 4 CPUs and 1 GPU available
    return x * 2

# For precise co-location, use placement groups
from ray.util.placement_group import placement_group

# Reserve resources on a specific node
pg = placement_group([{"CPU": 4, "GPU": 1}, {"CPU": 4, "GPU": 1}], strategy="PACK")
ray.get(pg.ready())

# Schedule actors within the placement group (Actor here is any @ray.remote class;
# newer Ray versions prefer scheduling_strategy=PlacementGroupSchedulingStrategy)
actor1 = Actor.options(placement_group=pg, placement_group_bundle_index=0).remote()
actor2 = Actor.options(placement_group=pg, placement_group_bundle_index=1).remote()

# Release when done
ray.util.remove_placement_group(pg)

Placement strategies:

Strategy        Behavior
PACK            All bundles on one node if possible
SPREAD          Distribute bundles across nodes for fault tolerance
STRICT_PACK     All bundles MUST be on one node (fails if impossible)
STRICT_SPREAD   Each bundle MUST be on a different node

Fix 7: Ray Dashboard and Debugging

The Ray dashboard (default port 8265) shows job progress, node health, actor state, and logs.

import ray

ray.init(dashboard_host="0.0.0.0", dashboard_port=8265)
# Open http://localhost:8265

Enable remote access:

ray start --head --dashboard-host=0.0.0.0
# Now the dashboard is accessible from other machines

Log remote task output:

import ray

@ray.remote
def noisy_task():
    print("This prints to the driver via the dashboard logs")
    import logging
    logging.warning("This also appears in dashboard logs")

ray.init()
ray.get(noisy_task.remote())

Debug timeouts — wait for tasks with timeout:

ready, not_ready = ray.wait(futures, num_returns=1, timeout=10.0)
if ready:
    print(f"One task finished: {ray.get(ready[0])}")
else:
    print("No tasks finished in 10 seconds")

Memory profiling:

pip install memray
memray run my_script.py
memray flamegraph memray-my_script.py.*.bin

For Jupyter notebook integration with Ray and dashboard rendering, see Jupyter not working.

Fix 8: Ray Train for Distributed Training

Ray Train wraps PyTorch/TensorFlow training loops for multi-GPU and multi-node training.

import ray
from ray import train
from ray.train import ScalingConfig
from ray.train.torch import TorchTrainer

ray.init()

def train_fn(config):
    import torch
    import torch.nn as nn
    from torch.utils.data import DataLoader, TensorDataset

    model = nn.Linear(10, 1)
    model = train.torch.prepare_model(model)   # Wraps with DDP, moves to the right device

    dataset = TensorDataset(torch.randn(1000, 10), torch.randn(1000, 1))
    loader = DataLoader(dataset, batch_size=32)
    loader = train.torch.prepare_data_loader(loader)   # Adds DistributedSampler

    optimizer = torch.optim.SGD(model.parameters(), lr=config["lr"])
    for epoch in range(10):
        for batch_x, batch_y in loader:
            optimizer.zero_grad()
            loss = nn.functional.mse_loss(model(batch_x), batch_y)
            loss.backward()
            optimizer.step()

        train.report({"loss": loss.item(), "epoch": epoch})

trainer = TorchTrainer(
    train_fn,
    train_loop_config={"lr": 0.001},
    scaling_config=ScalingConfig(num_workers=4, use_gpu=True),
)
result = trainer.fit()
print(result.metrics)

num_workers vs num_gpus — num_workers is the number of parallel training processes. Each gets one GPU if use_gpu=True.

For PyTorch DDP patterns that Ray Train wraps, see PyTorch not working.

Still Not Working?

Ray vs Dask — When to Use Which

  • Ray — Better for stateful distributed computing, ML-specific features (Tune, Train, RLlib, Serve), actor model. Large ecosystem for model serving and RL.
  • Dask — Better for DataFrame/array workloads, Pythonic API, lighter for data science. Simpler for single-machine parallelism.

For Dask’s lazy evaluation and DataFrame patterns, see Dask not working.

Ray on Kubernetes

Use KubeRay for production deployment:

kubectl apply -f https://raw.githubusercontent.com/ray-project/kuberay/master/ray-operator/config/samples/ray-cluster.complete.yaml

Hyperparameter Tuning Alternatives

For simpler hyperparameter search without Ray’s overhead, consider Optuna. For Optuna-specific patterns and storage, see Optuna not working.

ML Experiment Tracking

Ray Train integrates with MLflow and Weights & Biases for metric logging:

from ray.air.integrations.mlflow import MLflowLoggerCallback

trainer = TorchTrainer(
    train_fn,
    run_config=ray.train.RunConfig(
        callbacks=[MLflowLoggerCallback(experiment_name="my_experiment")]
    ),
)

For MLflow tracking URI configuration that interacts with Ray clusters, see MLflow not working.

Shutting Down Stale Clusters

# If scripts leak Ray processes
ray stop --force   # Kill all local Ray processes

# Clean up session directory
rm -rf /tmp/ray/

Ray accumulates logs in /tmp/ray/. On long-running dev machines, clean this periodically or point Ray at a managed location with ray start --temp-dir=/path (or ray.init(_temp_dir=...)).


FixDevs

Solo developer based in Japan. Every solution is cross-referenced with official documentation and tested before publishing.
