Fix: PySpark Not Working — Java Heap Space, Serialization Errors, and OOM Exceptions
Quick Answer
How to fix PySpark errors — Java heap space out of memory, PicklingError cannot serialize, py4j gateway exception, Spark session not found, partition skew causing slow shuffle, lazy evaluation confusion, and collect crashing driver.
The Error
You run a Spark job and it dies with Java out-of-memory:
java.lang.OutOfMemoryError: Java heap space
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:750)
Or you pass a Python function to .map() and it crashes with a pickling error:
_pickle.PicklingError: Could not serialize object:
TypeError: cannot pickle '_thread.lock' object
Or the driver crashes when you call .collect():
java.lang.OutOfMemoryError: Java heap space
Driver heap exhausted
Or a simple query takes hours to finish despite a fast cluster:
Stage 5: [===> ] 120/500 tasks (40min elapsed)
# One task stuck processing 95% of the data
Or you try to create a SparkSession and nothing works:
Exception: Java gateway process exited before sending its port number
py4j.protocol.Py4JNetworkError: An error occurred while trying to connect
to the Java server
PySpark is the Python API for Apache Spark — a JVM-based distributed computation engine. Every PySpark call crosses the Python-JVM boundary via py4j, which makes certain errors (serialization, GC, JVM heap) look nothing like typical Python errors. This guide covers each category.
Why This Happens
PySpark runs two kinds of processes: a driver that orchestrates the job and executors (JVM processes) that do the actual work. When you write df.filter(...).show(), the Python DataFrame API translates your code into a physical plan (Scala/Java) that executes in the JVM. Python user-defined functions (UDFs) are serialized with pickle, shipped to executors, and run in a Python subprocess that communicates with the JVM.
Every memory error is a JVM heap error — not a Python memory error. Every serialization error happens because PySpark tried to pickle Python objects to ship to executors. Every “slow query” is either partition skew, a wide shuffle, or executor memory pressure causing spills to disk.
Fix 1: Java Heap Space — Out of Memory
java.lang.OutOfMemoryError: Java heap space
Spark’s default memory settings are conservative. On a laptop or small VM, you’ll hit heap limits quickly.
Set executor and driver memory at SparkSession creation:
from pyspark.sql import SparkSession
spark = SparkSession.builder \
    .appName("MyApp") \
    .config("spark.executor.memory", "4g") \
    .config("spark.driver.memory", "4g") \
    .config("spark.executor.memoryOverhead", "1g") \
    .config("spark.sql.shuffle.partitions", "200") \
    .getOrCreate()
For local mode (running Spark on your laptop), use local[*]:
spark = SparkSession.builder \
    .master("local[*]") \
    .config("spark.driver.memory", "8g") \
    .config("spark.driver.maxResultSize", "4g") \
    .getOrCreate()
Set via environment variables (before starting Python):
export PYSPARK_SUBMIT_ARGS="--driver-memory 8g --executor-memory 4g pyspark-shell"
python my_script.py
Memory breakdown by component:
| Setting | What it controls |
|---|---|
| spark.driver.memory | JVM heap for the driver (plan compilation, collect results) |
| spark.executor.memory | JVM heap for each executor (actual data processing) |
| spark.executor.memoryOverhead | Off-heap memory for each executor (default: max(384MB, 0.1 * executor.memory)) |
| spark.driver.maxResultSize | Max size of data returned via collect() (default: 1GB) |
Common Mistake: Setting spark.executor.memory to the total machine RAM. The JVM needs room for overhead, the OS needs memory, and other executors share the same machine. A practical cap is 75% of total RAM divided by the number of executors per machine.
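That sizing rule can be turned into a quick calculation. A sketch in plain Python — the 10% overhead term mirrors Spark’s default max(384MB, 0.1 * executor.memory), but the 75% usable fraction is a rule of thumb, not a Spark setting, and executor_memory_gb is a hypothetical helper:

```python
def executor_memory_gb(machine_ram_gb, executors_per_machine, usable_fraction=0.75):
    """Suggest a spark.executor.memory value, leaving room for the OS
    and for Spark's off-heap memoryOverhead."""
    budget = machine_ram_gb * usable_fraction / executors_per_machine
    # Spark adds overhead = max(384 MB, 0.10 * heap) on top of the heap,
    # so solve: heap + max(0.384, 0.10 * heap) <= budget
    heap = budget / 1.10
    if 0.10 * heap < 0.384:          # the 384 MB overhead floor kicks in
        heap = budget - 0.384
    return round(heap, 1)

# 64 GB machine, 4 executors -> 12 GB budget each -> ~10.9 GB heap apiece
print(executor_memory_gb(64, 4))  # → 10.9
```

The point of the split is that heap plus overhead — not heap alone — has to fit inside the per-executor budget.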
Fix 2: PicklingError — Can’t Serialize UDF
_pickle.PicklingError: Could not serialize object:
TypeError: cannot pickle '_thread.lock' object
Python UDFs must be picklable — Spark ships them to executors by serializing them. Anything that holds a thread lock, database connection, or file handle — whether captured in a closure or referenced at module level — can’t be pickled.
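You can reproduce the serialization check locally with Python’s standard pickle module — this is roughly what Spark does before shipping a function (Spark actually uses cloudpickle, which also serializes closures by value, so a lock captured anywhere in your UDF hits the same error). can_ship is a hypothetical helper:

```python
import pickle
import threading

def can_ship(obj):
    """Mimic the serialization step: can this object be pickled at all?"""
    try:
        pickle.dumps(obj)
        return True
    except TypeError:
        return False

print(can_ship({"scale": 2.5}))    # True  — plain data ships fine
print(can_ship(threading.Lock()))  # False — cannot pickle '_thread.lock' object
```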
# WRONG — DB connection at module level, not picklable
import psycopg2
db = psycopg2.connect("dbname=test") # Cannot be pickled
def enrich(row):
    result = db.execute("SELECT ...")  # Won't work on executors
    return row
spark_df = spark_df.rdd.map(enrich)  # PicklingError on collect
Fix — create resources inside the UDF, once per partition:
# Use mapPartitions so the connection is created once per partition, not per row
def enrich_partition(rows):
    import psycopg2
    db = psycopg2.connect("dbname=test")  # Fresh connection per partition
    try:
        for row in rows:
            result = db.execute("SELECT ...")
            yield (row, result)
    finally:
        db.close()
enriched_rdd = spark_df.rdd.mapPartitions(enrich_partition)
Avoid closures that capture large objects:
# WRONG — closure captures the entire 10GB dict
config = {"large_dict": load_10gb_config()}
def udf(x):
    return x * config["large_dict"]["scale"]
spark_df.rdd.map(udf)  # Serializes the 10GB dict to every executor!
# CORRECT — use a broadcast variable
config_bc = spark.sparkContext.broadcast({"scale": 2.5})
def udf(x):
    return x * config_bc.value["scale"]  # Downloaded once per executor
spark_df.rdd.map(udf)
Prefer built-in functions over UDFs — they run in the JVM without Python serialization overhead:
from pyspark.sql.functions import col, upper, when
# SLOW — Python UDF, serialization overhead per row
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType
@udf(StringType())
def upper_udf(s):
    return s.upper() if s else None
df.withColumn("name_upper", upper_udf(col("name")))
# FAST — JVM-native function, no Python round-trip
df.withColumn("name_upper", upper(col("name")))
Pandas UDFs (vectorized) are the fast alternative when you must use Python:
from pyspark.sql.functions import pandas_udf
from pyspark.sql.types import DoubleType
import pandas as pd
@pandas_udf(DoubleType())
def normalize(s: pd.Series) -> pd.Series:
    # Receives a whole Pandas Series, not one value at a time
    return (s - s.mean()) / s.std()
df.withColumn("normalized", normalize(col("value")))
Pandas UDFs use Apache Arrow for columnar Python↔JVM transfer, often 10–100x faster than row-by-row UDFs.
Fix 3: py4j.protocol.Py4JNetworkError — Gateway Exited
py4j.protocol.Py4JNetworkError: An error occurred while trying to connect
Exception: Java gateway process exited before sending its port number
Python can’t start the JVM subprocess. Common causes:
Cause 1: Java not installed or wrong version.
java -version
# openjdk version "17.0.12" — Spark 3.5+ supports Java 8, 11, 17, 21
# If no Java:
sudo apt install openjdk-17-jdk # Ubuntu
brew install openjdk@17 # macOS
Cause 2: JAVA_HOME not set.
# Linux/macOS
export JAVA_HOME=$(dirname $(dirname $(readlink -f $(which java))))
# Or explicitly
export JAVA_HOME=/usr/lib/jvm/java-17-openjdk-amd64 # Ubuntu
export JAVA_HOME=$(/usr/libexec/java_home -v 17) # macOS
# Verify
echo $JAVA_HOME
Cause 3: Spark version incompatible with Java version.
| Spark | Java |
|---|---|
| 3.0–3.2 | 8, 11 |
| 3.3 | 8, 11, 17 |
| 3.4–3.5 | 8, 11, 17 (21 supported from 3.5.0) |
| 4.0+ | 17, 21 |
Spark 4.0 dropped Java 8 and 11 support. If you’re running PySpark 4 with Java 8 or 11, you get the gateway error.
Cause 4: PYSPARK_PYTHON points to wrong Python.
export PYSPARK_PYTHON=$(which python)
export PYSPARK_DRIVER_PYTHON=$(which python)
Cause 5: Port already in use.
Spark uses ports 4040 (UI), 7077 (master), and others. If you see “BindException”, kill the previous Spark process:
jps # List Java processes
kill -9 <PID of SparkSubmit>
Fix 4: Partition Skew — One Task Takes Forever
Stage 5: [===============> ] 490/500 tasks (45min elapsed)
# 490 tasks finished in 2 minutes, last 10 running for 43 minutes
Partition skew happens when one key has dramatically more data than others. A groupBy on a heavily skewed column causes that key’s partition to do 99% of the work.
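The effect is easy to simulate without a cluster. A plain-Python sketch with hypothetical counts — hash-partitioning rows where one user produces 95% of the data leaves one partition doing almost all the work, while salting (the fix shown below) spreads it out:

```python
import random
from collections import Counter

# Hypothetical event log: one hot user produces 95% of all rows
rows = ["user_1"] * 9500 + [f"user_{i}" for i in range(2, 502)]

def partition_of(key, n=8):
    return hash(key) % n

# Plain hash partitioning: every "user_1" row lands in the same partition
loads = Counter(partition_of(k) for k in rows)
print(max(loads.values()))  # >= 9500 — one partition carries ~95% of the data

# Salted: append a random 0-99 suffix so the hot key spreads across partitions
salted_loads = Counter(partition_of(f"{k}_{random.randrange(100)}") for k in rows)
print(max(salted_loads.values()))  # roughly 10000/8 ≈ 1250 — far more balanced
```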
Diagnose skew:
from pyspark.sql.functions import count, col
# See partition size distribution
df.groupBy("skewed_column").agg(count("*").alias("n")) \
    .orderBy(col("n").desc()) \
    .show(20)
# Check actual partition sizes
df.rdd.glom().map(len).collect() # List of per-partition row counts
Fix 1: Salt the skewed key to split it across partitions:
from pyspark.sql.functions import col, concat, lit, rand, floor, split
from pyspark.sql.functions import sum as spark_sum  # avoid shadowing builtin sum
# Add a random salt to the skewed key
salted = df.withColumn(
    "user_id_salted",
    concat(col("user_id"), lit("_"), floor(rand() * 100).cast("int"))
)
# Aggregate on salted, then re-aggregate
partial = salted.groupBy("user_id_salted").agg(spark_sum("amount").alias("partial_sum"))
final = partial.withColumn("user_id", split("user_id_salted", "_").getItem(0)) \
    .groupBy("user_id") \
    .agg(spark_sum("partial_sum").alias("total_amount"))
Fix 2: Adaptive Query Execution (Spark 3.0+) automatically handles some skew:
spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")
Enable these on every SparkSession — they’re free speedups. (spark.sql.adaptive.enabled defaults to true since Spark 3.2, but setting it explicitly costs nothing.)
Fix 3: Broadcast join instead of shuffle join when one side is small:
from pyspark.sql.functions import broadcast
# Force broadcast join (default threshold is 10MB)
result = large_df.join(broadcast(small_df), "key")
# Or set the threshold
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "104857600") # 100MB
Fix 5: Lazy Evaluation — Why Your Code Did Nothing
df = spark.read.csv("huge.csv", header=True)
df = df.filter(col("status") == "active")
df = df.groupBy("user_id").agg(count("*").alias("n"))
print("Done!") # Nothing has actually executed yet
Spark operations are lazy — transformations (filter, groupBy, select) just build a plan. Only actions (show, count, collect, write) trigger execution.
Transformations (lazy):
select, filter, where, groupBy, agg, join, withColumn, drop, distinct, dropDuplicates, orderBy, sort, sample
Actions (eager):
show(), count(), first(), take(n), collect(), toPandas(), write.parquet(...), write.csv(...), foreach(), foreachPartition()
Pro Tip: Write all transformations, call one action at the end. Each intermediate count() or show() re-executes all preceding transformations — a 5-step pipeline with 5 debugging count() calls runs the pipeline 5 times. Use .cache() if you need to materialize intermediate results.
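The lazy model behaves like Python generators — a plain-Python analogy (not Spark code) showing that transformations do nothing until an action consumes the pipeline:

```python
log = []

def read_rows():
    """Pretend data source: records every row it actually reads."""
    for i in range(5):
        log.append(f"read {i}")
        yield i

# "Transformations": building the chain does no work at all
pipeline = (x * 10 for x in read_rows() if x % 2 == 0)
assert log == []          # nothing has been read yet

# The "action": consuming the pipeline executes the whole chain
result = list(pipeline)
print(result)             # [0, 20, 40]
print(len(log))           # 5 — rows were only read when the action ran
```

Spark works the same way, with the added twist that each separate action re-runs the chain from the start unless you cache.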
Cache expensive DataFrames that you’ll use multiple times:
from pyspark import StorageLevel
df_clean = df.filter(...).join(...).cache()
# Now multiple actions reuse the cached data
df_clean.count() # Triggers execution, caches result
df_clean.show() # Uses cache
df_clean.groupBy(...).agg(...).show() # Uses cache
df_clean.unpersist() # Free memory when done
Cache storage levels:
df.cache() # MEMORY_AND_DISK (default)
df.persist(StorageLevel.MEMORY_ONLY) # Fastest, fails if doesn't fit
df.persist(StorageLevel.MEMORY_AND_DISK) # Spills to disk if needed
df.persist(StorageLevel.DISK_ONLY) # Slow but survives OOM
df.persist(StorageLevel.MEMORY_AND_DISK_2) # Replicated on a second node (PySpark stores data serialized by default, so there is no separate MEMORY_ONLY_SER level)
Fix 6: collect() Crashes the Driver
all_rows = df.collect() # Driver OOM if df is large
collect() pulls every row from the cluster to the driver. If the data doesn’t fit in the driver’s memory, the JVM crashes.
Alternatives to collect():
# Sample — small subset for inspection
sample = df.sample(fraction=0.01).collect()
# Take first N rows
rows = df.take(100)
# Show in console without returning
df.show(20, truncate=False)
# Write to disk in parallel
df.write.mode("overwrite").parquet("output/")
# Convert to Pandas (still driver-side — same size limit)
pdf = df.toPandas()
# For large DataFrames, process partition-wise
for partition in df.rdd.glom().toLocalIterator():
    # partition is a list of rows from one executor
    process(partition)
toPandas() with Arrow for fast Python DataFrame conversion:
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")
pdf = df.toPandas() # Now uses Arrow format, 10–100x faster
Still limited by driver memory — only usable for small results.
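Before reaching for collect() or toPandas(), a rough back-of-the-envelope check is worth a few lines. A sketch — fits_in_driver is a hypothetical helper, and the 4x safety factor for Python/JVM object overhead is an assumption, not a measured constant:

```python
def fits_in_driver(row_count, avg_row_bytes, driver_mem_gb, safety_factor=4):
    """Rough check: will a collect()ed result plausibly fit in the driver heap?
    safety_factor pads for JVM + Python object overhead (an estimate)."""
    needed_bytes = row_count * avg_row_bytes * safety_factor
    return needed_bytes < driver_mem_gb * 1024**3

print(fits_in_driver(100_000, 200, driver_mem_gb=4))      # True  — ~80 MB needed
print(fits_in_driver(500_000_000, 200, driver_mem_gb=4))  # False — ~400 GB needed
```

When the check fails, use take(), sample(), toLocalIterator(), or write to disk instead.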
For Pandas DataFrame operations after toPandas(), see pandas SettingWithCopyWarning.
Fix 7: Shuffle Partitions — Too Many or Too Few
spark.conf.set("spark.sql.shuffle.partitions", "200") # Default
After a shuffle (groupBy, join, distinct), Spark creates 200 partitions by default — rarely the right number for either very small or very large datasets.
Symptoms of wrong partition count:
- Too few partitions: Tasks process too much data each → OOM or slow
- Too many partitions: Task overhead dominates → slow, tiny output files
Rule of thumb: target ~128MB per partition after shuffle.
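A tiny helper for that arithmetic — a sketch that assumes you can estimate the post-shuffle data size in bytes (compressed size on disk understates it):

```python
def shuffle_partitions(dataset_bytes, target_mb=128):
    """Partitions needed so each handles ~target_mb after the shuffle."""
    return max(1, round(dataset_bytes / (target_mb * 1024**2)))

print(shuffle_partitions(100 * 1024**3))  # 100 GB -> 800 partitions
print(shuffle_partitions(1 * 1024**3))    # 1 GB  -> 8 partitions
```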
# For a 100GB dataset, aim for ~800 partitions
spark.conf.set("spark.sql.shuffle.partitions", "800")
# For a 1GB test dataset, reduce to avoid overhead
spark.conf.set("spark.sql.shuffle.partitions", "8")
Coalesce before writing to avoid thousands of tiny output files:
# After aggregation, reduce partition count before writing
result.coalesce(10).write.parquet("output/")
# repartition for larger increases (does a full shuffle)
big_result.repartition(100).write.parquet("output/")
coalesce() vs repartition():
| Operation | When | Behavior |
|---|---|---|
| coalesce(n) | Reducing partitions | Combines existing partitions (no shuffle) |
| repartition(n) | Increasing or evenly redistributing | Full shuffle |
Fix 8: Reading Data — File Formats and Schemas
Always provide a schema for production — schema inference is slow and may be wrong:
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, TimestampType
schema = StructType([
    StructField("user_id", IntegerType(), nullable=False),
    StructField("event", StringType(), nullable=True),
    StructField("timestamp", TimestampType(), nullable=True),
    StructField("amount", IntegerType(), nullable=True),
])
df = spark.read.schema(schema).csv("events.csv", header=True)
Prefer Parquet over CSV for any non-trivial data:
# CSV — full scan, no column pruning, no predicate pushdown
df = spark.read.csv("events.csv", header=True)
# Parquet — columnar, supports pushdown, 10x+ faster
df.write.mode("overwrite").parquet("events.parquet")
df = spark.read.parquet("events.parquet")
# Read specific partitions only (partition pruning)
df = spark.read.parquet("events.parquet") \
    .filter(col("date") == "2025-04-09") # Pushed down to file scan
Read many files as one DataFrame:
df = spark.read.parquet("/data/events/*.parquet") # Glob pattern
df = spark.read.parquet("/data/events/") # Directory — reads all Parquet files
Still Not Working?
Checkpointing for Long Jobs
Long pipelines can lose progress if an executor fails. Checkpoint saves a DataFrame to disk so lineage is truncated:
spark.sparkContext.setCheckpointDir("/checkpoint-dir")
df_heavy = df.join(other_df, "key") \
    .groupBy("category") \
    .agg(...)
df_heavy = df_heavy.checkpoint() # Materializes to disk, truncates lineage — checkpoint() returns a new DataFrame, so reassign it
# Subsequent operations start from the checkpoint
df_heavy.filter(...).show()
Spark UI for Diagnostics
Spark’s web UI (default port 4040) shows stage-by-stage progress, task durations, and shuffle metrics. Access during a running job:
http://localhost:4040
The “SQL” tab shows the physical plan for DataFrame operations — invaluable for diagnosing why a query is slow.
PySpark vs Pandas / Polars
For datasets under 100GB on a single machine, Polars is often faster than PySpark. Spark’s overhead (JVM startup, query planning, shuffle) is too much for small data. For Polars-based alternatives, see Polars not working.
Deploying PySpark Jobs
# Submit to a cluster
spark-submit \
    --master yarn \
    --deploy-mode cluster \
    --num-executors 10 \
    --executor-memory 4g \
    --driver-memory 4g \
    my_job.py input_path output_path
For scheduling PySpark jobs in data pipelines, see Airflow not working. For integrating with dbt for SQL-based transformations on top of Spark, see dbt not working.
Solo developer based in Japan. Every solution is cross-referenced with official documentation and tested before publishing.