# Fix: LangGraph Not Working — State Errors, Checkpointer Setup, and Cyclic Graph Failures

## Quick Answer

How to fix LangGraph errors — state not updating between nodes, checkpointer `thread_id` required, `StateGraph` compile errors, conditional edges not routing, streaming events missing, recursion limit exceeded, and interrupt handling.

## The Error
You build a simple LangGraph and state doesn't update between nodes:

```python
def node_a(state):
    return {"count": state["count"] + 1}

# After running, state["count"] is still 0
```

Or the checkpointer complains about a missing thread ID:

```
ValueError: Checkpointer requires a configurable with a thread_id key
```

Or conditional edges route everything to the same path:

```python
workflow.add_conditional_edges(
    "classifier",
    route_fn,
    {"positive": "positive_node", "negative": "negative_node"},
)
# All queries end up in positive_node regardless of route_fn's return
```

Or the graph runs forever and hits the recursion limit:

```
GraphRecursionError: Recursion limit of 25 reached without hitting a stop condition.
```

Or streaming events don't appear until the graph finishes:

```python
async for event in graph.astream_events(inputs, version="v1"):
    print(event)  # Nothing until the final node completes
```

LangGraph is a state-machine framework for LLM agents. Unlike chains (linear) or simple agents (function calling), LangGraph explicitly models nodes, edges, and shared state — which is powerful but introduces failure modes around state reducers, cyclic graphs, and checkpointing that don't exist elsewhere. This guide covers each.
## Why This Happens
LangGraph’s state is a TypedDict with optional reducers — functions that merge updates. Without a reducer, updates replace the value entirely; with add_messages or operator.add, they accumulate. Beginners often forget to set reducers on list fields, causing state to reset each node.
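The replace-vs-accumulate distinction can be sketched without LangGraph at all. The `apply_update` function below is a hypothetical illustration of what a reducer-aware state merge does — it is not LangGraph's actual implementation:

```python
import operator
from typing import Callable

# Hypothetical sketch: reducers maps field name -> merge function.
# Fields with a reducer accumulate; fields without one are replaced.
def apply_update(state: dict, update: dict, reducers: dict[str, Callable]) -> dict:
    merged = dict(state)
    for key, value in update.items():
        if key in reducers:
            merged[key] = reducers[key](merged.get(key, []), value)  # accumulate
        else:
            merged[key] = value  # replace
    return merged

reducers = {"messages": operator.add}  # like Annotated[list, operator.add]
state = {"messages": ["hi"], "count": 0}
state = apply_update(state, {"messages": ["hello"], "count": 1}, reducers)

print(state["messages"])  # ['hi', 'hello'] — accumulated
print(state["count"])     # 1 — replaced
```

The same node return value (`{"messages": [...]}`) either appends or wipes history depending solely on whether the field has a reducer — which is why a missing reducer is so easy to overlook.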
The checkpointer persists state between runs (for long conversations and replay) but requires a thread_id in the config every time you invoke the graph. Forgetting this is the single most common LangGraph error.
Conditional edges use a function that maps state to the name of the next node. If the return value doesn't match any key in the mapping, the graph can't route correctly — and a bug in the route function itself (for example, always returning the same label) silently sends every input down the same path.
## Fix 1: State Updates Don't Persist — Reducer Problem
```python
from typing import TypedDict

from langgraph.graph import StateGraph

class State(TypedDict):
    messages: list
    count: int

def increment(state: State) -> dict:
    return {"count": state["count"] + 1}

workflow = StateGraph(State)
workflow.add_node("inc", increment)
workflow.add_edge("__start__", "inc")
workflow.add_edge("inc", "__end__")

app = workflow.compile()
result = app.invoke({"count": 0, "messages": []})
print(result["count"])  # 1 — this works

# But list fields get replaced, not appended
def add_message(state: State) -> dict:
    return {"messages": ["hello"]}
# Each call replaces messages entirely — old messages are lost
```

Fix — use reducers for fields you want to accumulate:
```python
import operator
from typing import Annotated, TypedDict

from langgraph.graph import StateGraph
from langgraph.graph.message import add_messages

class State(TypedDict):
    # Annotated tells LangGraph to use this reducer to merge updates
    messages: Annotated[list, add_messages]
    logs: Annotated[list, operator.add]  # Simple concatenation
    count: int  # No reducer — updates replace
```

`add_messages` is the reducer for chat message lists:
```python
from typing import Annotated, TypedDict

from langchain_core.messages import AIMessage, HumanMessage
from langgraph.graph.message import add_messages

# add_messages handles:
# - Appending new messages
# - De-duplicating by ID
# - Replacing messages with the same ID (for edits)

class ChatState(TypedDict):
    messages: Annotated[list, add_messages]

def respond(state: ChatState) -> dict:
    last_message = state["messages"][-1]
    response = AIMessage(content=f"Echo: {last_message.content}")
    # Return just the new message — the reducer handles appending
    return {"messages": [response]}
```

Custom reducers for specific merge logic:
```python
from typing import Annotated, TypedDict

def merge_dict(left: dict, right: dict) -> dict:
    """Custom reducer — merges two dicts."""
    return {**left, **right}

class State(TypedDict):
    config: Annotated[dict, merge_dict]
    # Each node's config update merges with the existing value, doesn't replace it
```

**Common Mistake:** Forgetting the reducer on the messages field is the root cause of chatbots that lose conversation history between turns. If your agent doesn't remember previous messages, check whether `messages` is `Annotated[list, add_messages]` — not just `list`.
## Fix 2: Checkpointer — thread_id Required
```
ValueError: Checkpointer requires a configurable with a thread_id key
```

Checkpointers persist state to a storage backend. Every invocation needs a `thread_id` so the checkpointer knows which conversation to load and save.
```python
from langchain_core.messages import HumanMessage
from langgraph.checkpoint.memory import MemorySaver
from langgraph.graph import StateGraph

# Set up an in-memory checkpointer (for development)
memory = MemorySaver()

workflow = StateGraph(State)
# ... add nodes and edges
app = workflow.compile(checkpointer=memory)

# WRONG — no thread_id
result = app.invoke({"messages": [HumanMessage("hi")]})  # ValueError

# CORRECT — pass thread_id in config
config = {"configurable": {"thread_id": "user-123"}}
result = app.invoke({"messages": [HumanMessage("hi")]}, config=config)

# Continue the same conversation later with the same thread_id
result = app.invoke({"messages": [HumanMessage("what did I just say?")]}, config=config)
# The agent remembers because the checkpointer loaded the previous state
```

A persistent checkpointer for production — SQLite:
```python
import sqlite3

from langgraph.checkpoint.sqlite import SqliteSaver

conn = sqlite3.connect("checkpoints.db", check_same_thread=False)
memory = SqliteSaver(conn)
app = workflow.compile(checkpointer=memory)
```

Async variant:
```python
from langgraph.checkpoint.sqlite.aio import AsyncSqliteSaver

async with AsyncSqliteSaver.from_conn_string("checkpoints.db") as checkpointer:
    app = workflow.compile(checkpointer=checkpointer)
    await app.ainvoke(inputs, config={"configurable": {"thread_id": "user-1"}})
```

PostgreSQL for production:
```python
from langgraph.checkpoint.postgres import PostgresSaver

with PostgresSaver.from_conn_string("postgresql://user:pass@host/db") as checkpointer:
    checkpointer.setup()  # Creates tables on first run
    app = workflow.compile(checkpointer=checkpointer)
```

Retrieve saved state from a thread:
```python
config = {"configurable": {"thread_id": "user-123"}}
checkpoint = app.get_state(config)
print(checkpoint.values)  # Full state at the last checkpoint
print(checkpoint.next)    # Which nodes will run next
```

Time-travel — replay from an earlier checkpoint:
```python
# List all checkpoints for this thread
for snapshot in app.get_state_history(config):
    print(snapshot.config["configurable"]["checkpoint_id"], snapshot.values.get("messages", [])[:1])

# Resume from a specific checkpoint
old_config = {"configurable": {"thread_id": "user-123", "checkpoint_id": "abc..."}}
app.invoke(None, config=old_config)  # Re-runs from that point
```

## Fix 3: Conditional Edges — Route Function Return Values
```python
workflow.add_conditional_edges(
    "classifier",
    route_fn,
    {"positive": "positive_node", "negative": "negative_node"},
)
```

The route function must return a string that matches a key in the mapping. If it returns anything else, LangGraph can't route.
```python
from langgraph.graph import END

def classify(state: State) -> dict:
    sentiment = analyze(state["text"])
    return {"sentiment": sentiment}  # "positive" or "negative"

def route(state: State) -> str:
    # MUST return a key from the mapping below
    if state["sentiment"] == "positive":
        return "positive"
    elif state["sentiment"] == "negative":
        return "negative"
    else:
        return "end"  # Handles unknown cases

workflow.add_node("classify", classify)
workflow.add_node("positive", handle_positive)
workflow.add_node("negative", handle_negative)

workflow.add_conditional_edges(
    "classify",
    route,
    {
        "positive": "positive",  # Route fn returns "positive" → go to positive node
        "negative": "negative",
        "end": END,  # END is a special sentinel for termination
    },
)
```

Debug conditional routing by logging:
```python
def route(state: State) -> str:
    decision = "positive" if state["sentiment"] > 0 else "negative"
    print(f"Routing: state={state}, decision={decision}")
    return decision
```

Using the `Command` return pattern (LangGraph 0.2+) — a cleaner alternative:
```python
from typing import Literal

from langgraph.graph import StateGraph
from langgraph.types import Command

def classify(state: State) -> Command[Literal["positive", "negative"]]:
    sentiment = analyze(state["text"])
    if sentiment == "positive":
        return Command(goto="positive", update={"sentiment": "positive"})
    else:
        return Command(goto="negative", update={"sentiment": "negative"})

# The classification node both updates state AND determines the next node
workflow.add_node("classify", classify)
workflow.add_edge("__start__", "classify")
# No add_conditional_edges needed — Command handles routing
```

**Pro Tip:** Prefer `Command` return values for routing when a node already needs to inspect state to decide. It keeps routing logic and state updates in one place instead of splitting them between a node function and a separate route function. For graphs where many nodes make routing decisions, this dramatically reduces boilerplate.
## Fix 4: Recursion Limit and Cyclic Graphs
```
GraphRecursionError: Recursion limit of 25 reached without hitting a stop condition.
```

LangGraph allows cycles (unlike linear chains) — agents can loop back to earlier nodes. But an agent that never terminates hits the default limit of 25 steps.
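You can see this failure mode in miniature with a plain Python loop — a step counter plays the role of LangGraph's recursion limit. This is an illustrative model, not LangGraph code:

```python
# Miniature model of a cyclic graph: agent -> tools -> agent -> ...
# The steps counter stands in for LangGraph's recursion_limit.
def run_graph(state: dict, recursion_limit: int = 25) -> dict:
    steps = 0
    while not state["task_complete"]:
        steps += 1
        if steps > recursion_limit:
            raise RuntimeError(f"Recursion limit of {recursion_limit} reached")
        # A buggy agent never sets task_complete and loops until the limit;
        # a correct one flips the flag once its stop condition is met.
        if state["iterations"] >= 3:
            state["task_complete"] = True
        state["iterations"] += 1
    return state

result = run_graph({"task_complete": False, "iterations": 0})
print(result["iterations"])  # 4 — terminated well under the limit
```

The lesson transfers directly: raising the limit only buys a looping agent more wasted steps, while a reachable stop condition (like the `task_complete` flag here) actually fixes the cycle.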
Raise the recursion limit:
```python
config = {
    "configurable": {"thread_id": "x"},
    "recursion_limit": 100,  # Default is 25
}
app.invoke(inputs, config=config)
```

The real fix — ensure the graph has a reachable termination:
```python
from langgraph.graph import END

def should_continue(state: State) -> str:
    # The agent decides to stop once the task is complete
    if state["task_complete"]:
        return "end"
    if len(state["messages"]) > 20:  # Hard limit on messages
        return "end"
    return "continue"

workflow.add_conditional_edges(
    "agent",
    should_continue,
    {"continue": "tools", "end": END},
)
```

Common agentic loop pattern (ReAct-style):
```python
from langchain_core.messages import ToolMessage
from langgraph.graph import StateGraph, END

def call_model(state):
    response = model.invoke(state["messages"])
    return {"messages": [response]}

def should_continue(state):
    last_message = state["messages"][-1]
    if not last_message.tool_calls:  # Model didn't request a tool → done
        return "end"
    return "continue"

def call_tool(state):
    last_message = state["messages"][-1]
    tool_call = last_message.tool_calls[0]
    result = tools[tool_call["name"]].invoke(tool_call["args"])
    return {"messages": [ToolMessage(content=str(result), tool_call_id=tool_call["id"])]}

workflow = StateGraph(State)
workflow.add_node("agent", call_model)
workflow.add_node("tools", call_tool)
workflow.add_edge("__start__", "agent")
workflow.add_conditional_edges("agent", should_continue, {"continue": "tools", "end": END})
workflow.add_edge("tools", "agent")  # Loop back to the model

app = workflow.compile()
```

## Fix 5: Streaming Events Not Appearing
```python
async for event in graph.astream_events(inputs, version="v1"):
    print(event)
# Only prints at the end, not in real time
```

Use `astream_events` with the right version and filter by event type:
```python
from langchain_core.messages import HumanMessage

async for event in graph.astream_events(
    {"messages": [HumanMessage("Hello")]},
    version="v2",  # v2 is the current version
):
    kind = event["event"]
    if kind == "on_chat_model_stream":
        content = event["data"]["chunk"].content
        if content:
            print(content, end="", flush=True)
    elif kind == "on_tool_start":
        print(f"\nUsing tool: {event['name']}")
    elif kind == "on_tool_end":
        print(f"\nTool result: {event['data']['output']}")
```

Stream intermediate state with `astream`:
```python
async for chunk in graph.astream(inputs, stream_mode="updates"):
    # chunk is {node_name: {key: new_value}} for each node as it finishes
    print(chunk)
```

Stream modes:
| Mode | What it yields |
|---|---|
| `"values"` | Full state after each node |
| `"updates"` | Just the update dict from each node |
| `"messages"` | Each message as it's generated |
| `"debug"` | Detailed task events (for logs) |

Multiple stream modes at once:
```python
async for mode, chunk in graph.astream(inputs, stream_mode=["updates", "messages"]):
    print(f"[{mode}] {chunk}")
```

## Fix 6: Human-in-the-Loop — Interrupts
LangGraph lets you pause execution for human review before critical actions (e.g., calling an external API, sending an email).
```python
from langgraph.checkpoint.memory import MemorySaver
from langgraph.graph import StateGraph

memory = MemorySaver()

workflow = StateGraph(State)
workflow.add_node("generate_draft", generate_fn)
workflow.add_node("send_email", send_fn)
workflow.add_edge("__start__", "generate_draft")
workflow.add_edge("generate_draft", "send_email")

# Interrupt BEFORE send_email — requires explicit continuation
app = workflow.compile(
    checkpointer=memory,
    interrupt_before=["send_email"],
)

# Run until the interrupt
config = {"configurable": {"thread_id": "x"}}
result = app.invoke({"topic": "meeting"}, config=config)
# The graph pauses before send_email — check the draft

# Inspect the state
state = app.get_state(config)
print(state.values["draft"])

# Human approves — resume
app.invoke(None, config=config)  # Resumes from the last checkpoint
```

Interrupt after a node (to review its output):
```python
app = workflow.compile(
    checkpointer=memory,
    interrupt_after=["generate_draft"],
)
```

Modify state during an interrupt:
```python
# Human edits the draft before resuming
app.update_state(config, {"draft": "Edited version of the draft"})
app.invoke(None, config=config)
```

Dynamic interrupt from within a node:
```python
from langgraph.errors import NodeInterrupt

def send_email(state):
    # contains_sensitive_info is a placeholder for your own check
    if contains_sensitive_info(state["email_content"]):
        raise NodeInterrupt("Human review required for sensitive content")
    # ... actually send
```

## Fix 7: Subgraphs and Composition
Complex agents often need nested graphs — a planner at the top level, specialized sub-agents for specific tasks.
```python
from langgraph.graph import StateGraph, END

# Subgraph for research
research_graph = StateGraph(ResearchState)
research_graph.add_node("search", search_fn)
research_graph.add_node("summarize", summarize_fn)
research_graph.add_edge("__start__", "search")
research_graph.add_edge("search", "summarize")
research_graph.add_edge("summarize", END)
research_app = research_graph.compile()

# The main graph uses the subgraph as a node
def run_research(state):
    # Subgraph invocation
    result = research_app.invoke({"query": state["question"]})
    return {"research_output": result["summary"]}

main_graph = StateGraph(MainState)
main_graph.add_node("research", run_research)
main_graph.add_node("generate_answer", answer_fn)
main_graph.add_edge("__start__", "research")
main_graph.add_edge("research", "generate_answer")
main_graph.add_edge("generate_answer", END)
```

Shared state between parent and subgraph — requires matching keys:
```python
from typing import Annotated, TypedDict

from langgraph.graph.message import add_messages

# Subgraph keys must be a subset of the parent state keys
class ParentState(TypedDict):
    messages: Annotated[list, add_messages]
    user_id: str
    context: str

class SubgraphState(TypedDict):
    messages: Annotated[list, add_messages]  # Matches parent
    context: str  # Matches parent

# When invoked, the subgraph automatically receives these keys from the parent state
```

## Fix 8: Debugging and Observability
Enable verbose logging:
```python
import logging

logging.basicConfig(level=logging.INFO)

# Or specifically for langgraph
logging.getLogger("langgraph").setLevel(logging.DEBUG)
```

Visualize the graph:
```python
from IPython.display import Image, display

# Mermaid diagram
display(Image(app.get_graph().draw_mermaid_png()))

# Print a text representation
app.get_graph().print_ascii()
```

LangSmith integration for production tracing:
```python
import os

os.environ["LANGCHAIN_TRACING_V2"] = "true"
os.environ["LANGCHAIN_API_KEY"] = "ls__..."
os.environ["LANGCHAIN_PROJECT"] = "my-agent"

# Every graph invocation is now traced in LangSmith
app.invoke(inputs, config=config)
```

For LangChain-specific tracing and debug patterns, see LangChain Python not working.
## Still Not Working?

### LangGraph vs Other Agent Frameworks
- LangGraph — Explicit state machines, checkpointing, human-in-the-loop. Best for complex multi-step agents with branching logic.
- LangChain Agents — Simpler ReAct-style agents. Quick to set up but harder to customize.
- LlamaIndex Agents — Best when your agent’s primary job is RAG over documents.
- CrewAI — Multi-agent collaboration. Higher-level abstractions but less flexible.
For LangChain setup and common errors, see LangChain Python not working. For LlamaIndex RAG patterns that pair well with LangGraph agents, see LlamaIndex not working.
### Using Different LLM Providers
```python
from langchain_anthropic import ChatAnthropic
from langchain_openai import ChatOpenAI

# OpenAI
model = ChatOpenAI(model="gpt-4", temperature=0.7)

# Anthropic
model = ChatAnthropic(model="claude-opus-4-5", temperature=0.7)

# Local Ollama
from langchain_ollama import ChatOllama
model = ChatOllama(model="llama3", temperature=0)
```

For OpenAI-specific issues, see OpenAI API not working. For local Ollama model setup, see Ollama not working.
### Token Limits and Context Management
Long-running agents accumulate messages until they hit the model’s context limit. Summarize or prune older messages:
```python
from langchain_core.messages import AIMessage, HumanMessage, SystemMessage

def trim_messages(state: State) -> dict:
    messages = state["messages"]
    if len(messages) > 20:
        # Keep the system message + the last 15
        return {"messages": [messages[0]] + messages[-15:]}
    return {}
```

Note that if `messages` uses the `add_messages` reducer, returning a shorter list appends and de-duplicates rather than replaces — to actually drop messages, return `RemoveMessage` objects (from `langchain_core.messages`) for the IDs you want removed. Or use LangChain's `trim_messages` utility for token-aware trimming.
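A token-aware variant can be sketched with a rough length-based estimate. The 4-characters-per-token heuristic below is a crude approximation, not a real tokenizer — swap in your model's tokenizer for accurate counts:

```python
# Rough token-budget trimming: keep the first (system) message plus as many
# of the most recent messages as fit within the budget.
def estimate_tokens(text: str) -> int:
    # Crude heuristic: ~4 characters per token (approximation only)
    return max(1, len(text) // 4)

def trim_to_budget(messages: list[str], budget: int) -> list[str]:
    system, rest = messages[0], messages[1:]
    remaining = budget - estimate_tokens(system)
    kept: list[str] = []
    for msg in reversed(rest):  # walk newest-first
        cost = estimate_tokens(msg)
        if cost > remaining:
            break
        kept.append(msg)
        remaining -= cost
    return [system] + list(reversed(kept))

history = ["You are helpful."] + [f"message number {i}" * 5 for i in range(50)]
trimmed = trim_to_budget(history, budget=200)
print(len(trimmed))  # far fewer than 51, and the system message survives
```

Walking the history newest-first means the budget is spent on recent context, which is usually what the model needs most; the system message is pinned so the agent's instructions always survive trimming.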
Solo developer based in Japan. Every solution is cross-referenced with official documentation and tested before publishing.