Fix: LangGraph Not Working — State Errors, Checkpointer Setup, and Cyclic Graph Failures
Part of: Python Errors
Quick Answer
How to fix LangGraph errors — state not updating between nodes, checkpointer thread_id required, StateGraph compile error, conditional edges not routing, streaming events missing, recursion limit exceeded, and interrupt handling.
The Error
You build a simple LangGraph and state doesn’t update between nodes:
def node_a(state):
return {"count": state["count"] + 1}
# After running, state["count"] is still 0Or the checkpointer complains about a missing thread ID:
ValueError: Checkpointer requires a configurable with a thread_id keyOr conditional edges route everything to the same path:
workflow.add_conditional_edges(
"classifier",
route_fn,
{"positive": "positive_node", "negative": "negative_node"},
)
# All queries end up in positive_node regardless of route_fn's returnOr the graph runs forever and hits the recursion limit:
RecursionError: Recursion limit of 25 reached without hitting a stop condition.Or streaming events don’t appear until the graph finishes:
async for event in graph.astream_events(inputs, version="v1"):
print(event) # Nothing until the final node completesLangGraph is a state-machine framework for LLM agents. Unlike chains (linear) or simple agents (function calling), LangGraph explicitly models nodes, edges, and shared state — which is powerful but introduces failure modes around state reducers, cyclic graphs, and checkpointing that don’t exist elsewhere. This guide covers each.
Why This Happens
LangGraph’s state is a TypedDict with optional reducers — functions that merge updates. Without a reducer, updates replace the value entirely; with add_messages or operator.add, they accumulate. Beginners often forget to set reducers on list fields, causing state to reset each node.
The checkpointer persists state between runs (for long conversations and replay) but requires a thread_id in the config every time you invoke the graph. Forgetting this is the single most common LangGraph error.
Conditional edges are a function mapping state → next node name. When the return value doesn’t match any key in the mapping, LangGraph silently defaults to the first registered node, producing wrong routing.
Diagnostic Timeline — When State Resets Between Turns
The first reflex when state looks wrong is “let me redo the state schema.” That rarely helps. The actual causes cluster around three places: the checkpointer isn’t wired up, a conditional edge is returning the wrong literal, or you’re confusing END with an interrupt. Walk through it.
Minute 0 — Print the tracking URI of state. Add print(app.get_state(config)) after each invoke. If values is empty between turns, your checkpointer isn’t persisting. If values is populated but the next invoke ignores it, you’re passing a fresh input dict that overwrites the reducer-merged state. The checkpointer doesn’t auto-apply state — your reducers do.
Minute 1 — Confirm thread_id is the same. The single most common LangGraph bug: each invoke generates a new thread_id (because you forgot to pass config), so the checkpointer happily creates a new thread every turn. The graph “remembers nothing” because every call is a brand-new conversation. Log config["configurable"]["thread_id"] at the call site to verify continuity.
Minute 3 — Inspect conditional edge return values. Add print(f"route returns: {repr(decision)}") inside every routing function. Compare against the keys you passed to add_conditional_edges. A return of "END" (string) versus END (sentinel) versus "end" (your custom key) is three different routes, and LangGraph won’t tell you which one mismatched — it routes to whichever key it finds first.
Minute 5 — Distinguish END from interrupt. END terminates the graph; the checkpoint marks it as finished and the next invoke starts a new run. interrupt_before or NodeInterrupt pauses execution and the next invoke resumes from the checkpoint. Confusing the two produces “the graph restarts when I expected it to continue” or “the graph never finishes when I expected it to terminate.” Check state.next after each invoke — () means done, a tuple of node names means waiting on an interrupt.
Minute 8 — Reducer return-type audit. Returning {"messages": new_msg} (single message) versus {"messages": [new_msg]} (list) behaves identically with add_messages but breaks with operator.add. If a list field loses entries between nodes, check whether your reducer expects a list or a singleton.
The first guess is always “redo the state schema.” The actual answer is usually a missing checkpointer config, a conditional-edge return that doesn’t match any mapping key, or an END where you meant interrupt_before.
Fix 1: State Updates Don’t Persist — Reducer Problem
from typing import TypedDict
from langgraph.graph import StateGraph
class State(TypedDict):
messages: list
count: int
def increment(state: State) -> dict:
return {"count": state["count"] + 1}
workflow = StateGraph(State)
workflow.add_node("inc", increment)
workflow.add_edge("__start__", "inc")
workflow.add_edge("inc", "__end__")
app = workflow.compile()
result = app.invoke({"count": 0, "messages": []})
print(result["count"]) # 1 — this works
# But list fields get replaced, not appended
def add_message(state: State) -> dict:
return {"messages": ["hello"]}
# Each call replaces messages entirely — old messages are lostFix — use reducers for fields you want to accumulate:
from typing import Annotated, TypedDict
from langgraph.graph import StateGraph
from langgraph.graph.message import add_messages
import operator
class State(TypedDict):
# Annotated tells LangGraph to use this reducer to merge updates
messages: Annotated[list, add_messages]
logs: Annotated[list, operator.add] # Simple concatenation
count: int # No reducer — updates replaceadd_messages is the reducer for chat message lists:
from langchain_core.messages import HumanMessage, AIMessage
from langgraph.graph.message import add_messages
# add_messages handles:
# - Appending new messages
# - De-duplicating by ID
# - Replacing messages with the same ID (for edits)
class ChatState(TypedDict):
messages: Annotated[list, add_messages]
def respond(state: ChatState) -> dict:
last_message = state["messages"][-1]
response = AIMessage(content=f"Echo: {last_message.content}")
# Returning just the new message — reducer handles appending
return {"messages": [response]}Custom reducers for specific merge logic:
from typing import Annotated, TypedDict
def merge_dict(left: dict, right: dict) -> dict:
"""Custom reducer — merges two dicts."""
return {**left, **right}
class State(TypedDict):
config: Annotated[dict, merge_dict]
# Each node's config update merges with existing, doesn't replaceCommon Mistake: Forgetting the reducer on the messages field is the root cause of chatbots that lose conversation history between turns. If your agent doesn’t remember previous messages, check whether messages is Annotated[list, add_messages] — not just list.
Fix 2: Checkpointer — thread_id Required
ValueError: Checkpointer requires a configurable with a thread_id keyCheckpointers persist state to a storage backend. Every invocation needs a thread_id so the checkpointer knows which conversation to load/save.
from langgraph.checkpoint.memory import MemorySaver
from langgraph.graph import StateGraph
# Set up in-memory checkpointer (for development)
memory = MemorySaver()
workflow = StateGraph(State)
# ... add nodes and edges
app = workflow.compile(checkpointer=memory)
# WRONG — no thread_id
result = app.invoke({"messages": [HumanMessage("hi")]}) # ValueError
# CORRECT — pass thread_id in config
config = {"configurable": {"thread_id": "user-123"}}
result = app.invoke({"messages": [HumanMessage("hi")]}, config=config)
# Continue the same conversation later with the same thread_id
result = app.invoke({"messages": [HumanMessage("what did I just say?")]}, config=config)
# The agent remembers because checkpoint loaded previous statePersistent checkpointer for production — SQLite:
from langgraph.checkpoint.sqlite import SqliteSaver
import sqlite3
conn = sqlite3.connect("checkpoints.db", check_same_thread=False)
memory = SqliteSaver(conn)
app = workflow.compile(checkpointer=memory)Async variant:
from langgraph.checkpoint.sqlite.aio import AsyncSqliteSaver
from aiosqlite import connect
async with AsyncSqliteSaver.from_conn_string("checkpoints.db") as checkpointer:
app = workflow.compile(checkpointer=checkpointer)
await app.ainvoke(inputs, config={"configurable": {"thread_id": "user-1"}})PostgreSQL for production:
from langgraph.checkpoint.postgres import PostgresSaver
with PostgresSaver.from_conn_string("postgresql://user:pass@host/db") as checkpointer:
checkpointer.setup() # Creates tables on first run
app = workflow.compile(checkpointer=checkpointer)Retrieve saved state from a thread:
config = {"configurable": {"thread_id": "user-123"}}
checkpoint = app.get_state(config)
print(checkpoint.values) # Full state at the last checkpoint
print(checkpoint.next) # Which nodes will run nextTime-travel — replay from an earlier checkpoint:
# List all checkpoints for this thread
for snapshot in app.get_state_history(config):
print(snapshot.config["configurable"]["checkpoint_id"], snapshot.values.get("messages", [])[:1])
# Resume from a specific checkpoint
old_config = {"configurable": {"thread_id": "user-123", "checkpoint_id": "abc..."}}
app.invoke(None, config=old_config) # Re-runs from that pointFix 3: Conditional Edges — Route Function Return Values
workflow.add_conditional_edges(
"classifier",
route_fn,
{"positive": "positive_node", "negative": "negative_node"},
)The route function must return a string that matches a key in the mapping. If it returns anything else, LangGraph can’t route.
from langgraph.graph import END
def classify(state: State) -> dict:
sentiment = analyze(state["text"])
return {"sentiment": sentiment} # "positive" or "negative"
def route(state: State) -> str:
# MUST return a key from the mapping below
if state["sentiment"] == "positive":
return "positive"
elif state["sentiment"] == "negative":
return "negative"
else:
return "end" # Handles unknown cases
workflow.add_node("classify", classify)
workflow.add_node("positive", handle_positive)
workflow.add_node("negative", handle_negative)
workflow.add_conditional_edges(
"classify",
route,
{
"positive": "positive", # Route fn returns "positive" → go to positive node
"negative": "negative",
"end": END, # END is a special sentinel for termination
},
)Debug conditional routing by logging:
def route(state: State) -> str:
decision = "positive" if state["sentiment"] > 0 else "negative"
print(f"Routing: state={state}, decision={decision}")
return decisionUsing the Command return pattern (LangGraph 0.2+) — a cleaner alternative:
from langgraph.graph import StateGraph
from langgraph.types import Command
from typing import Literal
def classify(state: State) -> Command[Literal["positive", "negative"]]:
sentiment = analyze(state["text"])
if sentiment == "positive":
return Command(goto="positive", update={"sentiment": "positive"})
else:
return Command(goto="negative", update={"sentiment": "negative"})
# Classification node both updates state AND determines next node
workflow.add_node("classify", classify)
workflow.add_edge("__start__", "classify")
# No add_conditional_edges needed — Command handles routingPro Tip: Prefer Command return values for routing when a node already needs to inspect state to decide. It keeps routing logic and state updates in one place instead of splitting them between a node function and a separate route function. For graphs where many nodes make routing decisions, this dramatically reduces boilerplate.
Fix 4: Recursion Limit and Cyclic Graphs
RecursionError: Recursion limit of 25 reached without hitting a stop condition.LangGraph allows cycles (unlike linear chains) — agents can loop back to earlier nodes. But an agent that never terminates hits the default limit of 25 steps.
Raise the recursion limit:
config = {
"configurable": {"thread_id": "x"},
"recursion_limit": 100, # Default 25
}
app.invoke(inputs, config=config)The real fix — ensure the graph has a reachable termination:
from langgraph.graph import END
def should_continue(state: State) -> str:
# Agent decides to stop if the task is complete
if state["task_complete"]:
return "end"
if len(state["messages"]) > 20: # Hard limit on messages
return "end"
return "continue"
workflow.add_conditional_edges(
"agent",
should_continue,
{"continue": "tools", "end": END},
)Common agentic loop pattern (ReAct-style):
from langgraph.graph import StateGraph, END
from langchain_core.messages import ToolMessage
def call_model(state):
response = model.invoke(state["messages"])
return {"messages": [response]}
def should_continue(state):
last_message = state["messages"][-1]
if not last_message.tool_calls: # Model didn't request a tool → done
return "end"
return "continue"
def call_tool(state):
last_message = state["messages"][-1]
tool_call = last_message.tool_calls[0]
result = tools[tool_call["name"]].invoke(tool_call["args"])
return {"messages": [ToolMessage(content=str(result), tool_call_id=tool_call["id"])]}
workflow = StateGraph(State)
workflow.add_node("agent", call_model)
workflow.add_node("tools", call_tool)
workflow.add_edge("__start__", "agent")
workflow.add_conditional_edges("agent", should_continue, {"continue": "tools", "end": END})
workflow.add_edge("tools", "agent") # Loop back to model
app = workflow.compile()Fix 5: Streaming Events Not Appearing
async for event in graph.astream_events(inputs, version="v1"):
print(event)
# Only prints at the end, not in real-timeUse astream_events with the right version and filter by type:
async for event in graph.astream_events(
{"messages": [HumanMessage("Hello")]},
version="v2", # v2 is the current version
):
kind = event["event"]
if kind == "on_chat_model_stream":
content = event["data"]["chunk"].content
if content:
print(content, end="", flush=True)
elif kind == "on_tool_start":
print(f"\nUsing tool: {event['name']}")
elif kind == "on_tool_end":
print(f"\nTool result: {event['data']['output']}")Stream intermediate state with astream:
async for chunk in graph.astream(inputs, stream_mode="updates"):
# chunk is {node_name: {key: new_value}} for each node as it finishes
print(chunk)Stream modes:
| Mode | What it yields |
|---|---|
"values" | Full state after each node |
"updates" | Just the update dict from each node |
"messages" | Each message as it’s generated |
"debug" | Detailed task events (for logs) |
Multiple stream modes at once:
async for mode, chunk in graph.astream(inputs, stream_mode=["updates", "messages"]):
print(f"[{mode}] {chunk}")Fix 6: Human-in-the-Loop — Interrupts
LangGraph lets you pause execution for human review before critical actions (e.g., calling an external API, sending an email).
from langgraph.graph import StateGraph
from langgraph.checkpoint.memory import MemorySaver
memory = MemorySaver()
workflow = StateGraph(State)
workflow.add_node("generate_draft", generate_fn)
workflow.add_node("send_email", send_fn)
workflow.add_edge("__start__", "generate_draft")
workflow.add_edge("generate_draft", "send_email")
# Interrupt BEFORE send_email — requires explicit continuation
app = workflow.compile(
checkpointer=memory,
interrupt_before=["send_email"],
)
# Run until interrupt
config = {"configurable": {"thread_id": "x"}}
result = app.invoke({"topic": "meeting"}, config=config)
# Graph pauses before send_email — check the draft
# Inspect the state
state = app.get_state(config)
print(state.values["draft"])
# Human approves — resume
app.invoke(None, config=config) # Resumes from the last checkpointInterrupt after a node (after seeing its output):
app = workflow.compile(
checkpointer=memory,
interrupt_after=["generate_draft"],
)Modify state during interrupt:
# Human edits the draft before resuming
app.update_state(config, {"draft": "Edited version of the draft"})
app.invoke(None, config=config)Dynamic interrupt from within a node:
from langgraph.errors import NodeInterrupt
def send_email(state):
if state["email_content"] contains_sensitive_info:
raise NodeInterrupt("Human review required for sensitive content")
# ... actually sendFix 7: Subgraphs and Composition
Complex agents often need nested graphs — a planner at the top level, specialized sub-agents for specific tasks.
from langgraph.graph import StateGraph, END
# Subgraph for research
research_graph = StateGraph(ResearchState)
research_graph.add_node("search", search_fn)
research_graph.add_node("summarize", summarize_fn)
research_graph.add_edge("__start__", "search")
research_graph.add_edge("search", "summarize")
research_graph.add_edge("summarize", END)
research_app = research_graph.compile()
# Main graph uses the subgraph as a node
def run_research(state):
# Subgraph invocation
result = research_app.invoke({"query": state["question"]})
return {"research_output": result["summary"]}
main_graph = StateGraph(MainState)
main_graph.add_node("research", run_research)
main_graph.add_node("generate_answer", answer_fn)
main_graph.add_edge("__start__", "research")
main_graph.add_edge("research", "generate_answer")
main_graph.add_edge("generate_answer", END)Shared state between parent and subgraph — requires matching keys:
# Subgraph expects keys that are a subset of parent state keys
class ParentState(TypedDict):
messages: Annotated[list, add_messages]
user_id: str
context: str
class SubgraphState(TypedDict):
messages: Annotated[list, add_messages] # Matches parent
context: str # Matches parent
# When invoked, subgraph automatically receives these keys from parent stateFix 8: Debugging and Observability
Enable verbose logging:
import logging
logging.basicConfig(level=logging.INFO)
# Or specifically for langgraph
logging.getLogger("langgraph").setLevel(logging.DEBUG)Visualize the graph:
from IPython.display import Image, display
# Mermaid diagram
display(Image(app.get_graph().draw_mermaid_png()))
# Print text representation
app.get_graph().print_ascii()LangSmith integration for production tracing:
import os
os.environ["LANGCHAIN_TRACING_V2"] = "true"
os.environ["LANGCHAIN_API_KEY"] = "ls__..."
os.environ["LANGCHAIN_PROJECT"] = "my-agent"
# Every graph invocation is now traced in LangSmith
app.invoke(inputs, config=config)For LangChain-specific tracing and debug patterns, see LangChain Python not working.
Still Not Working?
LangGraph vs Other Agent Frameworks
- LangGraph — Explicit state machines, checkpointing, human-in-the-loop. Best for complex multi-step agents with branching logic.
- LangChain Agents — Simpler ReAct-style agents. Quick to set up but harder to customize.
- LlamaIndex Agents — Best when your agent’s primary job is RAG over documents.
- CrewAI — Multi-agent collaboration. Higher-level abstractions but less flexible.
For LlamaIndex RAG patterns that pair well with LangGraph agents, see LlamaIndex not working.
Using Different LLM Providers
from langchain_openai import ChatOpenAI
from langchain_anthropic import ChatAnthropic
# OpenAI
model = ChatOpenAI(model="gpt-4", temperature=0.7)
# Anthropic
model = ChatAnthropic(model="claude-opus-4-5", temperature=0.7)
# Local Ollama
from langchain_ollama import ChatOllama
model = ChatOllama(model="llama3", temperature=0)For OpenAI-specific issues, see OpenAI API not working. For local Ollama model setup, see Ollama not working.
Token Limits and Context Management
Long-running agents accumulate messages until they hit the model’s context limit. Summarize or prune older messages:
from langchain_core.messages import HumanMessage, AIMessage, SystemMessage
def trim_messages(state: State) -> dict:
messages = state["messages"]
if len(messages) > 20:
# Keep system message + last 15
return {"messages": [messages[0]] + messages[-15:]}
return {}Or use LangChain’s trim_messages utility for token-aware trimming.
Checkpoint Bloat in Production
SQLite and Postgres checkpointers write full state snapshots at every step. With long conversations or rich state objects, the checkpoint table grows fast — hundreds of MB per active thread over a week. Schedule a cleanup job that deletes checkpoints older than N days, or store only the latest checkpoint per thread by truncating on resume. For SQLite, vacuum after deletes to actually reclaim disk space.
Concurrent Invocations on the Same thread_id
If two requests for the same thread_id hit the graph simultaneously (rare in chat, common in webhooks), both load the same checkpoint, both write back, and the second silently overwrites the first. There’s no built-in lock. Either serialize per-thread (queue) or generate per-message thread_ids if you don’t actually need persistent memory across calls.
Subgraph State Key Collisions
When a parent and subgraph share a key like messages with add_messages, every message produced by the subgraph also lands in the parent. That’s usually what you want for chat. But if a subgraph uses messages for internal scratchpad reasoning, those internal messages leak into the parent transcript. Rename the subgraph’s internal key to internal_messages or use a non-shared state schema for clean isolation.
Solo developer based in Japan. Every solution is cross-referenced with official documentation and tested before publishing.
Was this article helpful?
Related Articles
Fix: CrewAI Not Working — Agent Delegation, Task Context, and LLM Configuration Errors
How to fix CrewAI errors — LLM not configured ValidationError, agent delegation loop, task context not passed between agents, tool output truncated, process hierarchical vs sequential, and memory not persisting across runs.
Fix: LangChain Python Not Working — ImportError, Pydantic, and Deprecated Classes
How to fix LangChain Python errors — ImportError from package split, Pydantic v2 compatibility, AgentExecutor deprecated, ConversationBufferMemory removed, LCEL output type mismatches, and tool calling failures.
Fix: Hugging Face Transformers Not Working — OSError, CUDA OOM, and Generation Errors
How to fix Hugging Face Transformers errors — OSError can't load tokenizer, gated repo access, CUDA out of memory with device_map auto, bitsandbytes not installed, tokenizer padding mismatch, pad_token_id warning, and LoRA adapter loading failures.
Fix: LlamaIndex Not Working — Import Errors, Vector Store Issues, and Query Engine Failures
How to fix LlamaIndex errors — ImportError llama_index.core module not found, ServiceContext deprecated use Settings instead, vector store index not persisting, query engine returns irrelevant results, and LlamaIndex 0.10 migration.