
Fix: PySpark Not Working — Java Heap Space, Serialization Errors, and OOM Exceptions

FixDevs ·

Quick Answer

How to fix common PySpark failures: Java heap space out-of-memory errors, PicklingError when serializing UDFs, py4j gateway exceptions at session startup, partition skew causing slow shuffles, lazy-evaluation confusion, and collect() crashing the driver.

The Error

You run a Spark job and it dies with Java out-of-memory:

java.lang.OutOfMemoryError: Java heap space
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:750)

Or you pass a Python function to .map() and it crashes with a pickling error:

_pickle.PicklingError: Could not serialize object:
TypeError: cannot pickle '_thread.lock' object

Or the driver crashes when you call .collect():

java.lang.OutOfMemoryError: Java heap space
Driver heap exhausted

Or a simple query takes hours to finish despite a fast cluster:

Stage 5: [===>                   ] 120/500 tasks (40min elapsed)
# One task stuck processing 95% of the data

Or you try to create a SparkSession and nothing works:

Exception: Java gateway process exited before sending its port number
py4j.protocol.Py4JNetworkError: An error occurred while trying to connect
to the Java server

PySpark is the Python API for Apache Spark — a JVM-based distributed computation engine. Every PySpark call crosses the Python-JVM boundary via py4j, which makes certain errors (serialization, GC, JVM heap) look nothing like typical Python errors. This guide covers each category.

Why This Happens

PySpark runs two kinds of processes: a driver that orchestrates the job, and executors (JVM processes) that do the actual work. When you write df.filter(...).show(), the Python DataFrame API translates your code into a physical plan (Scala/Java), which executes in the JVM. Python user-defined functions (UDFs) are serialized with pickle, shipped to executors, and run in a Python subprocess that communicates with the JVM.

Every memory error is a JVM heap error — not a Python memory error. Every serialization error happens because PySpark tried to pickle Python objects to ship to executors. Every “slow query” is either partition skew, a wide shuffle, or executor memory pressure causing spills to disk.

Fix 1: Java Heap Space — Out of Memory

java.lang.OutOfMemoryError: Java heap space

Spark’s default memory settings are conservative. On a laptop or small VM, you’ll hit heap limits quickly.

Set executor and driver memory at SparkSession creation:

from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("MyApp") \
    .config("spark.executor.memory", "4g") \
    .config("spark.driver.memory", "4g") \
    .config("spark.executor.memoryOverhead", "1g") \
    .config("spark.sql.shuffle.partitions", "200") \
    .getOrCreate()

For local mode (running Spark on your laptop), use local[*]:

spark = SparkSession.builder \
    .master("local[*]") \
    .config("spark.driver.memory", "8g") \
    .config("spark.driver.maxResultSize", "4g") \
    .getOrCreate()

Set via environment variables (before starting Python):

export PYSPARK_SUBMIT_ARGS="--driver-memory 8g --executor-memory 4g pyspark-shell"
python my_script.py

Memory breakdown by component:

| Setting | What it controls |
| --- | --- |
| spark.driver.memory | JVM heap for the driver (plan compilation, collect results) |
| spark.executor.memory | JVM heap for each executor (actual data processing) |
| spark.executor.memoryOverhead | Off-heap memory for each executor (default: max(384MB, 0.1 × executor.memory)) |
| spark.driver.maxResultSize | Max size of data returned via collect() (default: 1GB) |

Common Mistake: Setting spark.executor.memory to the total machine RAM. The JVM needs room for overhead, the OS needs memory, and other executors share the same machine. A practical cap is 75% of total RAM divided by the number of executors per machine.
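
The 75% rule above is just arithmetic. A rough sizing helper as a sketch — `executor_memory_gb` and its defaults are invented for illustration, not a Spark API:

```python
# Hypothetical sizing helper illustrating the 75% rule described above.
def executor_memory_gb(total_ram_gb, executors_per_machine, overhead_fraction=0.10):
    """Suggest spark.executor.memory, leaving headroom for the OS and
    for spark.executor.memoryOverhead (off-heap)."""
    usable = total_ram_gb * 0.75                    # leave ~25% for the OS and daemons
    per_executor = usable / executors_per_machine   # split among co-located executors
    heap = per_executor / (1 + overhead_fraction)   # carve out memoryOverhead
    return round(heap, 1)

# A 64GB machine running 4 executors:
print(executor_memory_gb(64, 4))   # ~10.9 -> set spark.executor.memory=10g
```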

Fix 2: PicklingError — Can’t Serialize UDF

_pickle.PicklingError: Could not serialize object:
TypeError: cannot pickle '_thread.lock' object

Python UDFs must be picklable — Spark ships them to executors by serializing them. Anything with thread locks, database connections, or file handles at module level can’t be pickled.
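
You can reproduce the failure without Spark at all — the limitation comes from pickle itself (Spark ships UDFs with cloudpickle, an extended pickle, so the same rule applies):

```python
import pickle
import threading

# Objects holding OS-level state (locks, sockets, DB connections) can't be pickled:
try:
    pickle.dumps(threading.Lock())
except TypeError as e:
    print(e)   # cannot pickle '_thread.lock' object

# Plain module-level functions and data pickle fine -- this is what Spark ships:
def double(x):
    return x * 2

assert pickle.loads(pickle.dumps(double))(21) == 42
```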

# WRONG — DB connection at module level, not picklable
import psycopg2
db = psycopg2.connect("dbname=test")   # Cannot be pickled

def enrich(row):
    cur = db.cursor()
    cur.execute("SELECT ...")          # Won't work on executors
    return row

spark_df = spark_df.rdd.map(enrich)    # PicklingError when an action runs

Fix — create resources inside the UDF, once per partition:

# Use mapPartitions so the connection is created once per partition, not per row
def enrich_partition(rows):
    import psycopg2
    db = psycopg2.connect("dbname=test")   # Fresh connection per partition
    cur = db.cursor()
    try:
        for row in rows:
            cur.execute("SELECT ...")
            yield (row, cur.fetchone())
    finally:
        db.close()

enriched_rdd = spark_df.rdd.mapPartitions(enrich_partition)

Avoid closures that capture large objects:

# WRONG — the closure captures the entire 10GB dict
config = {"large_dict": load_10gb_config()}

def udf(x):
    return x * config["large_dict"]["scale"]

spark_df.rdd.map(udf)   # Serializes the 10GB dict to every executor!

# CORRECT — use a broadcast variable
config_bc = spark.sparkContext.broadcast({"scale": 2.5})

def udf(x):
    return x * config_bc.value["scale"]   # Downloaded once per executor

spark_df.rdd.map(udf)

Prefer built-in functions over UDFs — they run in the JVM without Python serialization overhead:

from pyspark.sql.functions import col, upper, when

# SLOW — Python UDF, serialization overhead per row
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

@udf(StringType())
def upper_udf(s):
    return s.upper() if s else None

df.withColumn("name_upper", upper_udf(col("name")))

# FAST — JVM-native function, no Python round-trip
df.withColumn("name_upper", upper(col("name")))

Pandas UDFs (vectorized) are the fast alternative when you must use Python:

from pyspark.sql.functions import col, pandas_udf
from pyspark.sql.types import DoubleType
import pandas as pd

@pandas_udf(DoubleType())
def normalize(s: pd.Series) -> pd.Series:
    # Receives a whole Pandas Series, not one value at a time
    return (s - s.mean()) / s.std()

df.withColumn("normalized", normalize(col("value")))

Pandas UDFs use Apache Arrow for efficient columnar Python↔JVM transfer, often 10–100x faster than row-by-row UDFs.

Fix 3: py4j.protocol.Py4JNetworkError — Gateway Exited

py4j.protocol.Py4JNetworkError: An error occurred while trying to connect
Exception: Java gateway process exited before sending its port number

Python can’t start the JVM subprocess. Common causes:

Cause 1: Java not installed or wrong version.

java -version
# openjdk version "17.0.12" — Java 17 works with Spark 3.3 through 4.x

# If no Java:
sudo apt install openjdk-17-jdk   # Ubuntu
brew install openjdk@17           # macOS

Cause 2: JAVA_HOME not set.

# Linux/macOS
export JAVA_HOME=$(dirname $(dirname $(readlink -f $(which java))))

# Or explicitly
export JAVA_HOME=/usr/lib/jvm/java-17-openjdk-amd64   # Ubuntu
export JAVA_HOME=$(/usr/libexec/java_home -v 17)      # macOS

# Verify
echo $JAVA_HOME

Cause 3: Spark version incompatible with Java version.

| Spark | Supported Java |
| --- | --- |
| 3.0–3.2 | 8, 11 |
| 3.3 | 8, 11, 17 |
| 3.4–3.5 | 8, 11, 17 |
| 4.0+ | 17, 21 |

Spark 4.0 dropped Java 8 support. If you’re running PySpark 4 with Java 8, you get the gateway error.

Cause 4: PYSPARK_PYTHON points to wrong Python.

export PYSPARK_PYTHON=$(which python)
export PYSPARK_DRIVER_PYTHON=$(which python)

Cause 5: Port already in use.

Spark uses ports 4040 (UI), 7077 (master), and others. If you see “BindException”, kill the previous Spark process:

jps   # List Java processes
kill -9 <PID of SparkSubmit>
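
You can sanity-check Causes 1, 2, and 4 from Python before building the SparkSession. A minimal diagnostic sketch — `java_preflight` is a name invented here, not a PySpark utility:

```python
import os
import shutil
import subprocess

def java_preflight():
    """Collect the environment facts the py4j gateway needs; returns a dict of findings."""
    report = {}
    java = shutil.which("java")
    report["java_on_path"] = java                              # Cause 1: is java installed?
    if java:
        out = subprocess.run([java, "-version"], capture_output=True, text=True)
        lines = (out.stderr or out.stdout).splitlines()        # java prints version to stderr
        report["java_version"] = lines[0] if lines else "unknown"
    report["JAVA_HOME"] = os.environ.get("JAVA_HOME")          # Cause 2: is JAVA_HOME set?
    report["PYSPARK_PYTHON"] = os.environ.get("PYSPARK_PYTHON")  # Cause 4: which Python?
    return report

print(java_preflight())
```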

Fix 4: Partition Skew — One Task Takes Forever

Stage 5: [===============>         ] 490/500 tasks (45min elapsed)
# 490 tasks finished in 2 minutes, last 10 running for 43 minutes

Partition skew happens when one key has dramatically more data than others. A groupBy on a heavily skewed column causes that key’s partition to do 99% of the work.

Diagnose skew:

from pyspark.sql.functions import count, col

# See partition size distribution
df.groupBy("skewed_column").agg(count("*").alias("n")) \
  .orderBy(col("n").desc()) \
  .show(20)

# Check actual partition sizes
df.rdd.glom().map(len).collect()   # List of per-partition row counts

Fix 1: Salt the skewed key to split it across partitions:

from pyspark.sql.functions import col, concat, lit, rand, floor, split
from pyspark.sql.functions import sum as spark_sum

# Add a random salt to the skewed key
salted = df.withColumn(
    "user_id_salted",
    concat(col("user_id"), lit("_"), floor(rand() * 100).cast("int"))
)

# Aggregate on the salted key, then re-aggregate on the original key
partial = salted.groupBy("user_id_salted").agg(spark_sum("amount").alias("partial_sum"))
final = partial.withColumn("user_id", split(col("user_id_salted"), "_").getItem(0)) \
               .groupBy("user_id") \
               .agg(spark_sum("partial_sum").alias("total_amount"))

Fix 2: Adaptive Query Execution (Spark 3.0+) automatically handles some skew:

spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")

Adaptive Query Execution is enabled by default since Spark 3.2; on older versions, set these explicitly — they're free speedups.

Fix 3: Broadcast join instead of shuffle join when one side is small:

from pyspark.sql.functions import broadcast

# Force broadcast join (default threshold is 10MB)
result = large_df.join(broadcast(small_df), "key")

# Or set the threshold
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "104857600")   # 100MB

Fix 5: Lazy Evaluation — Why Your Code Did Nothing

df = spark.read.csv("huge.csv", header=True)
df = df.filter(col("status") == "active")
df = df.groupBy("user_id").agg(count("*").alias("n"))
print("Done!")   # Nothing has actually executed yet

Spark operations are lazy — transformations (filter, groupBy, select) just build a plan. Only actions (show, count, collect, write) trigger execution.

Transformations (lazy):

  • select, filter, where
  • groupBy, agg
  • join
  • withColumn, drop
  • distinct, dropDuplicates
  • orderBy, sort
  • sample

Actions (eager):

  • show(), count(), first(), take(n)
  • collect(), toPandas()
  • write.parquet(...), write.csv(...)
  • foreach(), foreachPartition()

Pro Tip: Write all transformations, call one action at the end. Each intermediate count() or show() re-executes all preceding transformations — a 5-step pipeline with 5 debugging count() calls runs the pipeline 5 times. Use .cache() if you need to materialize intermediate results.
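
The re-execution behavior is easy to model outside Spark. This toy class is not Spark code — just an illustration of the lazy pattern, showing why each debugging action replays the whole plan:

```python
class LazyPipeline:
    """Toy model of Spark's lazy evaluation: transformations queue up,
    actions replay the whole queue from scratch."""
    def __init__(self, data):
        self.data = data
        self.plan = []           # queued transformations, like Spark's logical plan
        self.executions = 0      # how many times the plan has actually run

    def filter(self, pred):      # transformation: just records the step
        self.plan.append(("filter", pred))
        return self

    def count(self):             # action: replays every queued step
        self.executions += 1
        rows = self.data
        for _, pred in self.plan:
            rows = [r for r in rows if pred(r)]
        return len(rows)

p = LazyPipeline(range(100)).filter(lambda x: x % 2 == 0)
p.count(); p.count(); p.count()   # three debugging counts...
print(p.executions)               # ...ran the whole pipeline 3 times
```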

Cache expensive DataFrames that you’ll use multiple times:

from pyspark import StorageLevel

df_clean = df.filter(...).join(...).cache()

# Now multiple actions reuse the cached data
df_clean.count()            # Triggers execution, caches result
df_clean.show()             # Uses cache
df_clean.groupBy(...).agg(...).show()   # Uses cache

df_clean.unpersist()        # Free memory when done

Cache storage levels:

df.cache()                                     # MEMORY_AND_DISK (default)
df.persist(StorageLevel.MEMORY_ONLY)           # Fastest; recomputes partitions that don't fit
df.persist(StorageLevel.MEMORY_AND_DISK)       # Spills to disk if needed
df.persist(StorageLevel.DISK_ONLY)             # Slow but survives memory pressure
df.persist(StorageLevel.MEMORY_AND_DISK_2)     # Replicated on two nodes

Fix 6: collect() Crashes the Driver

all_rows = df.collect()   # Driver OOM if df is large

collect() pulls every row from the cluster to the driver. If the data doesn’t fit in the driver’s memory, the JVM crashes.
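
A back-of-envelope guard before collecting can save a crashed driver. This sketch is pure arithmetic — `safe_to_collect` and its 25% safety margin are assumptions, not a Spark feature:

```python
def safe_to_collect(row_count, avg_row_bytes, driver_heap_bytes, safety=0.25):
    """Rough guard: only collect if the estimated result fits in a
    conservative fraction of the driver's heap."""
    return row_count * avg_row_bytes < driver_heap_bytes * safety

# 10M rows of ~200 bytes against a 4GB driver heap:
print(safe_to_collect(10_000_000, 200, 4 * 1024**3))   # False -> don't collect
```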

Alternatives to collect():

# Sample — small subset for inspection
sample = df.sample(fraction=0.01).collect()

# Take first N rows
rows = df.take(100)

# Show in console without returning
df.show(20, truncate=False)

# Write to disk in parallel
df.write.mode("overwrite").parquet("output/")

# Convert to Pandas (still driver-side — same size limit)
pdf = df.toPandas()

# For large DataFrames, process partition-wise
for partition in df.rdd.glom().toLocalIterator():
    # partition is a list of rows from one executor
    process(partition)

toPandas() with Arrow for fast Python DataFrame conversion:

spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")
pdf = df.toPandas()   # Now uses Arrow format, 10–100x faster

Still limited by driver memory — only usable for small results.

For Pandas DataFrame operations after toPandas(), see pandas SettingWithCopyWarning.

Fix 7: Shuffle Partitions — Too Many or Too Few

spark.conf.set("spark.sql.shuffle.partitions", "200")   # Default

After a shuffle (groupBy, join, distinct), Spark creates 200 partitions by default — rarely the right number for either very small or very large datasets.

Symptoms of wrong partition count:

  • Too few partitions: Tasks process too much data each → OOM or slow
  • Too many partitions: Task overhead dominates → slow, tiny output files

Rule of thumb: target ~128MB per partition after shuffle.
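
The rule of thumb is plain division. A sketch — `shuffle_partitions` is a hypothetical helper, not part of PySpark:

```python
def shuffle_partitions(dataset_bytes, target_partition_bytes=128 * 1024**2):
    """Suggest spark.sql.shuffle.partitions so each post-shuffle
    partition lands near the ~128MB sweet spot."""
    return max(1, round(dataset_bytes / target_partition_bytes))

print(shuffle_partitions(100 * 1024**3))   # 100GB -> 800 partitions
print(shuffle_partitions(1 * 1024**3))     # 1GB   -> 8 partitions
```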

# For a 100GB dataset, aim for ~800 partitions
spark.conf.set("spark.sql.shuffle.partitions", "800")

# For a 1GB test dataset, reduce to avoid overhead
spark.conf.set("spark.sql.shuffle.partitions", "8")

Coalesce before writing to avoid thousands of tiny output files:

# After aggregation, reduce partition count before writing
result.coalesce(10).write.parquet("output/")

# repartition for larger increases (does a full shuffle)
big_result.repartition(100).write.parquet("output/")

coalesce() vs repartition():

| Operation | When | Behavior |
| --- | --- | --- |
| coalesce(n) | Reducing partitions | Combines existing partitions (no shuffle) |
| repartition(n) | Increasing or evenly redistributing | Full shuffle |

Fix 8: Reading Data — File Formats and Schemas

Always provide a schema for production — schema inference is slow and may be wrong:

from pyspark.sql.types import StructType, StructField, StringType, IntegerType, TimestampType

schema = StructType([
    StructField("user_id", IntegerType(), nullable=False),
    StructField("event", StringType(), nullable=True),
    StructField("timestamp", TimestampType(), nullable=True),
    StructField("amount", IntegerType(), nullable=True),
])

df = spark.read.schema(schema).csv("events.csv", header=True)

Prefer Parquet over CSV for any non-trivial data:

# CSV — full scan, no column pruning, no predicate pushdown
df = spark.read.csv("events.csv", header=True)

# Parquet — columnar, supports pushdown, 10x+ faster
df.write.mode("overwrite").parquet("events.parquet")
df = spark.read.parquet("events.parquet")

# Read specific partitions only (partition pruning)
df = spark.read.parquet("events.parquet") \
          .filter(col("date") == "2025-04-09")   # Pushed down to file scan

Read many files as one DataFrame:

df = spark.read.parquet("/data/events/*.parquet")   # Glob pattern
df = spark.read.parquet("/data/events/")            # Directory — reads all Parquet files

Still Not Working?

Checkpointing for Long Jobs

Long pipelines can lose progress if an executor fails. Checkpoint saves a DataFrame to disk so lineage is truncated:

spark.sparkContext.setCheckpointDir("/checkpoint-dir")

df_heavy = df.join(other_df, "key") \
             .groupBy("category") \
             .agg(...)
df_heavy = df_heavy.checkpoint()   # Materializes to disk, truncates lineage

# Subsequent operations start from the checkpoint
df_heavy.filter(...).show()

Spark UI for Diagnostics

Spark’s web UI (default port 4040) shows stage-by-stage progress, task durations, and shuffle metrics. Access during a running job:

http://localhost:4040

The “SQL” tab shows the physical plan for DataFrame operations — invaluable for diagnosing why a query is slow.

PySpark vs Pandas / Polars

For datasets under 100GB on a single machine, Polars is often faster than PySpark. Spark’s overhead (JVM startup, query planning, shuffle) is too much for small data. For Polars-based alternatives, see Polars not working.

Deploying PySpark Jobs

# Submit to a cluster
spark-submit \
    --master yarn \
    --deploy-mode cluster \
    --num-executors 10 \
    --executor-memory 4g \
    --driver-memory 4g \
    my_job.py input_path output_path

For scheduling PySpark jobs in data pipelines, see Airflow not working. For integrating with dbt for SQL-based transformations on top of Spark, see dbt not working.

FixDevs

Solo developer based in Japan. Every solution is cross-referenced with official documentation and tested before publishing.
