
Fix: LangGraph Not Working — State Errors, Checkpointer Setup, and Cyclic Graph Failures

FixDevs ·

Quick Answer

How to fix LangGraph errors — state not updating between nodes, checkpointer thread_id required, StateGraph compile error, conditional edges not routing, streaming events missing, recursion limit exceeded, and interrupt handling.

The Error

You build a simple LangGraph and state doesn’t update between nodes:

def node_a(state):
    return {"count": state["count"] + 1}

# After running, state["count"] is still 0

Or the checkpointer complains about a missing thread ID:

ValueError: Checkpointer requires a configurable with a thread_id key

Or conditional edges route everything to the same path:

workflow.add_conditional_edges(
    "classifier",
    route_fn,
    {"positive": "positive_node", "negative": "negative_node"},
)
# All queries end up in positive_node regardless of route_fn's return

Or the graph runs forever and hits the recursion limit:

langgraph.errors.GraphRecursionError: Recursion limit of 25 reached without hitting a stop condition.

Or streaming events don’t appear until the graph finishes:

async for event in graph.astream_events(inputs, version="v1"):
    print(event)   # Nothing until the final node completes

LangGraph is a state-machine framework for LLM agents. Unlike chains (linear) or simple agents (function calling), LangGraph explicitly models nodes, edges, and shared state — which is powerful but introduces failure modes around state reducers, cyclic graphs, and checkpointing that don’t exist elsewhere. This guide covers each.

Why This Happens

LangGraph’s state is a TypedDict with optional reducers — functions that merge updates. Without a reducer, updates replace the value entirely; with add_messages or operator.add, they accumulate. Beginners often forget to set reducers on list fields, causing state to reset each node.
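
The replace-vs-accumulate distinction can be modeled in plain Python. This is a simplified sketch of the merge step, not LangGraph's actual implementation — the `reducers` dict here stands in for the `Annotated[..., reducer]` declarations on the state class:

```python
import operator

# Reducers registered per state key; keys without a reducer are replaced.
reducers = {"logs": operator.add}   # e.g. Annotated[list, operator.add]

def apply_update(state: dict, update: dict) -> dict:
    """Merge a node's partial update into the state, LangGraph-style."""
    merged = dict(state)
    for key, value in update.items():
        if key in reducers:
            merged[key] = reducers[key](merged[key], value)   # accumulate
        else:
            merged[key] = value                               # replace
    return merged

state = {"logs": ["start"], "count": 0}
state = apply_update(state, {"logs": ["step 1"], "count": 1})
state = apply_update(state, {"logs": ["step 2"], "count": 2})
print(state)   # {'logs': ['start', 'step 1', 'step 2'], 'count': 2}
```

Note how `logs` grows across updates while `count` is simply overwritten each time — exactly the behavior you see in a graph with and without reducers.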

The checkpointer persists state between runs (for long conversations and replay) but requires a thread_id in the config every time you invoke the graph. Forgetting this is the single most common LangGraph error.

Conditional edges use a route function whose string return value is looked up in the mapping to pick the next node. When the return value doesn't match any key exactly — a typo, wrong case, stray whitespace, or an unhandled branch — routing goes wrong: depending on your LangGraph version you get an exception, or all traffic flows down one branch.
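
The lookup mechanics can be shown with a toy model (plain Python, not LangGraph internals — `route_fn` and `edge_map` mirror the `add_conditional_edges` arguments from the error above):

```python
# Toy model of conditional-edge routing: the route function's return value
# is looked up in the edge mapping to pick the next node.
edge_map = {"positive": "positive_node", "negative": "negative_node"}

def route_fn(state: dict) -> str:
    return state["sentiment"]   # must match a key in edge_map exactly

def next_node(state: dict) -> str:
    decision = route_fn(state)
    if decision not in edge_map:
        raise ValueError(f"Route returned {decision!r}, not in {list(edge_map)}")
    return edge_map[decision]

print(next_node({"sentiment": "negative"}))   # negative_node
# next_node({"sentiment": "Positive"}) raises ValueError — case mismatch
```

The fix is always the same: make the route function's return values and the mapping's keys identical strings.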

Fix 1: State Updates Don’t Persist — Reducer Problem

from typing import TypedDict
from langgraph.graph import StateGraph

class State(TypedDict):
    messages: list
    count: int

def increment(state: State) -> dict:
    return {"count": state["count"] + 1}

workflow = StateGraph(State)
workflow.add_node("inc", increment)
workflow.add_edge("__start__", "inc")
workflow.add_edge("inc", "__end__")
app = workflow.compile()

result = app.invoke({"count": 0, "messages": []})
print(result["count"])   # 1 — this works

# But list fields get replaced, not appended
def add_message(state: State) -> dict:
    return {"messages": ["hello"]}

# Each call replaces messages entirely — old messages are lost

Fix — use reducers for fields you want to accumulate:

from typing import Annotated, TypedDict
from langgraph.graph import StateGraph
from langgraph.graph.message import add_messages
import operator

class State(TypedDict):
    # Annotated tells LangGraph to use this reducer to merge updates
    messages: Annotated[list, add_messages]
    logs: Annotated[list, operator.add]   # Simple concatenation
    count: int   # No reducer — updates replace

add_messages is the reducer for chat message lists:

from langchain_core.messages import HumanMessage, AIMessage
from langgraph.graph.message import add_messages

# add_messages handles:
# - Appending new messages
# - De-duplicating by ID
# - Replacing messages with the same ID (for edits)

class ChatState(TypedDict):
    messages: Annotated[list, add_messages]

def respond(state: ChatState) -> dict:
    last_message = state["messages"][-1]
    response = AIMessage(content=f"Echo: {last_message.content}")
    # Returning just the new message — reducer handles appending
    return {"messages": [response]}
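
The append/replace-by-id behavior can be sketched in plain Python. This is a simplified mental model of `add_messages`, not the real implementation (which works on LangChain message objects, not dicts):

```python
# Simplified model of add_messages semantics:
# append new messages; a message whose id already exists replaces the old one.
def add_messages_sketch(left: list[dict], right: list[dict]) -> list[dict]:
    merged = list(left)
    index = {m["id"]: i for i, m in enumerate(merged)}
    for msg in right:
        if msg["id"] in index:
            merged[index[msg["id"]]] = msg   # replace by id (edits)
        else:
            index[msg["id"]] = len(merged)
            merged.append(msg)               # append new
    return merged

history = [{"id": "1", "content": "hi"}]
history = add_messages_sketch(history, [{"id": "2", "content": "hello"}])
history = add_messages_sketch(history, [{"id": "1", "content": "hi (edited)"}])
print(history)
# [{'id': '1', 'content': 'hi (edited)'}, {'id': '2', 'content': 'hello'}]
```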

Custom reducers for specific merge logic:

from typing import Annotated, TypedDict

def merge_dict(left: dict, right: dict) -> dict:
    """Custom reducer — merges two dicts."""
    return {**left, **right}

class State(TypedDict):
    config: Annotated[dict, merge_dict]
    # Each node's config update merges with existing, doesn't replace
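
A quick check of the merge behavior — here `model`, `temperature`, and `max_tokens` are just illustrative config keys:

```python
def merge_dict(left: dict, right: dict) -> dict:
    """Custom reducer — merges two dicts; keys from the update win."""
    return {**left, **right}

# Two nodes each contribute part of the config; updates merge, later keys win.
step1 = merge_dict({}, {"model": "gpt-4", "temperature": 0.7})
step2 = merge_dict(step1, {"temperature": 0.2, "max_tokens": 512})
print(step2)   # {'model': 'gpt-4', 'temperature': 0.2, 'max_tokens': 512}
```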

Common Mistake: Forgetting the reducer on the messages field is the root cause of chatbots that lose conversation history between turns. If your agent doesn’t remember previous messages, check whether messages is Annotated[list, add_messages] — not just list.

Fix 2: Checkpointer — thread_id Required

ValueError: Checkpointer requires a configurable with a thread_id key

Checkpointers persist state to a storage backend. Every invocation needs a thread_id so the checkpointer knows which conversation to load/save.

from langgraph.checkpoint.memory import MemorySaver
from langgraph.graph import StateGraph

# Set up in-memory checkpointer (for development)
memory = MemorySaver()
workflow = StateGraph(State)
# ... add nodes and edges
app = workflow.compile(checkpointer=memory)

# WRONG — no thread_id
result = app.invoke({"messages": [HumanMessage("hi")]})   # ValueError

# CORRECT — pass thread_id in config
config = {"configurable": {"thread_id": "user-123"}}
result = app.invoke({"messages": [HumanMessage("hi")]}, config=config)

# Continue the same conversation later with the same thread_id
result = app.invoke({"messages": [HumanMessage("what did I just say?")]}, config=config)
# The agent remembers because checkpoint loaded previous state

Persistent checkpointer for production — SQLite:

from langgraph.checkpoint.sqlite import SqliteSaver
import sqlite3

conn = sqlite3.connect("checkpoints.db", check_same_thread=False)
memory = SqliteSaver(conn)

app = workflow.compile(checkpointer=memory)

Async variant:

from langgraph.checkpoint.sqlite.aio import AsyncSqliteSaver
from aiosqlite import connect

async with AsyncSqliteSaver.from_conn_string("checkpoints.db") as checkpointer:
    app = workflow.compile(checkpointer=checkpointer)
    await app.ainvoke(inputs, config={"configurable": {"thread_id": "user-1"}})

PostgreSQL for production:

from langgraph.checkpoint.postgres import PostgresSaver

with PostgresSaver.from_conn_string("postgresql://user:pass@host/db") as checkpointer:
    checkpointer.setup()   # Creates tables on first run
    app = workflow.compile(checkpointer=checkpointer)

Retrieve saved state from a thread:

config = {"configurable": {"thread_id": "user-123"}}
checkpoint = app.get_state(config)
print(checkpoint.values)   # Full state at the last checkpoint
print(checkpoint.next)      # Which nodes will run next

Time-travel — replay from an earlier checkpoint:

# List all checkpoints for this thread
for snapshot in app.get_state_history(config):
    print(snapshot.config["configurable"]["checkpoint_id"], snapshot.values.get("messages", [])[:1])

# Resume from a specific checkpoint
old_config = {"configurable": {"thread_id": "user-123", "checkpoint_id": "abc..."}}
app.invoke(None, config=old_config)   # Re-runs from that point

Fix 3: Conditional Edges — Route Function Return Values

workflow.add_conditional_edges(
    "classifier",
    route_fn,
    {"positive": "positive_node", "negative": "negative_node"},
)

The route function must return a string that matches a key in the mapping. If it returns anything else, LangGraph can’t route.

from langgraph.graph import END

def classify(state: State) -> dict:
    sentiment = analyze(state["text"])
    return {"sentiment": sentiment}   # "positive" or "negative"

def route(state: State) -> str:
    # MUST return a key from the mapping below
    if state["sentiment"] == "positive":
        return "positive"
    elif state["sentiment"] == "negative":
        return "negative"
    else:
        return "end"   # Handles unknown cases

workflow.add_node("classify", classify)
workflow.add_node("positive", handle_positive)
workflow.add_node("negative", handle_negative)

workflow.add_conditional_edges(
    "classify",
    route,
    {
        "positive": "positive",   # Route fn returns "positive" → go to positive node
        "negative": "negative",
        "end": END,                # END is a special sentinel for termination
    },
)

Debug conditional routing by logging:

def route(state: State) -> str:
    # sentiment is a string here ("positive"/"negative"), matching the classify node above
    decision = "positive" if state["sentiment"] == "positive" else "negative"
    print(f"Routing: sentiment={state['sentiment']!r}, decision={decision}")
    return decision

Using the Command return pattern (LangGraph 0.2+) — a cleaner alternative:

from langgraph.graph import StateGraph
from langgraph.types import Command
from typing import Literal

def classify(state: State) -> Command[Literal["positive", "negative"]]:
    sentiment = analyze(state["text"])
    if sentiment == "positive":
        return Command(goto="positive", update={"sentiment": "positive"})
    else:
        return Command(goto="negative", update={"sentiment": "negative"})

# Classification node both updates state AND determines next node
workflow.add_node("classify", classify)
workflow.add_edge("__start__", "classify")
# No add_conditional_edges needed — Command handles routing

Pro Tip: Prefer Command return values for routing when a node already needs to inspect state to decide. It keeps routing logic and state updates in one place instead of splitting them between a node function and a separate route function. For graphs where many nodes make routing decisions, this dramatically reduces boilerplate.

Fix 4: Recursion Limit and Cyclic Graphs

langgraph.errors.GraphRecursionError: Recursion limit of 25 reached without hitting a stop condition.

LangGraph allows cycles (unlike linear chains) — agents can loop back to earlier nodes. But an agent that never terminates hits the default limit of 25 steps.

Raise the recursion limit:

config = {
    "configurable": {"thread_id": "x"},
    "recursion_limit": 100,   # Default 25
}
app.invoke(inputs, config=config)

The real fix — ensure the graph has a reachable termination:

from langgraph.graph import END

def should_continue(state: State) -> str:
    # Agent decides to stop if the task is complete
    if state["task_complete"]:
        return "end"
    if len(state["messages"]) > 20:   # Hard limit on messages
        return "end"
    return "continue"

workflow.add_conditional_edges(
    "agent",
    should_continue,
    {"continue": "tools", "end": END},
)

Common agentic loop pattern (ReAct-style):

from langgraph.graph import StateGraph, END
from langchain_core.messages import ToolMessage

def call_model(state):
    response = model.invoke(state["messages"])
    return {"messages": [response]}

def should_continue(state):
    last_message = state["messages"][-1]
    if not last_message.tool_calls:   # Model didn't request a tool → done
        return "end"
    return "continue"

def call_tool(state):
    last_message = state["messages"][-1]
    tool_call = last_message.tool_calls[0]
    result = tools[tool_call["name"]].invoke(tool_call["args"])
    return {"messages": [ToolMessage(content=str(result), tool_call_id=tool_call["id"])]}

workflow = StateGraph(State)
workflow.add_node("agent", call_model)
workflow.add_node("tools", call_tool)

workflow.add_edge("__start__", "agent")
workflow.add_conditional_edges("agent", should_continue, {"continue": "tools", "end": END})
workflow.add_edge("tools", "agent")   # Loop back to model

app = workflow.compile()

Fix 5: Streaming Events Not Appearing

async for event in graph.astream_events(inputs, version="v1"):
    print(event)
# Only prints at the end, not in real-time

Use astream_events with the right version and filter by type:

async for event in graph.astream_events(
    {"messages": [HumanMessage("Hello")]},
    version="v2",   # v2 is the current version
):
    kind = event["event"]
    if kind == "on_chat_model_stream":
        content = event["data"]["chunk"].content
        if content:
            print(content, end="", flush=True)
    elif kind == "on_tool_start":
        print(f"\nUsing tool: {event['name']}")
    elif kind == "on_tool_end":
        print(f"\nTool result: {event['data']['output']}")

Stream intermediate state with astream:

async for chunk in graph.astream(inputs, stream_mode="updates"):
    # chunk is {node_name: {key: new_value}} for each node as it finishes
    print(chunk)

Stream modes:

Mode         What it yields
"values"     Full state after each node
"updates"    Just the update dict from each node
"messages"   Each message as it's generated
"debug"      Detailed task events (for logs)
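
The difference between "values" and "updates" can be modeled with a toy executor (a mental model only — real LangGraph execution handles reducers, branches, and concurrency):

```python
def run_graph(nodes, state, stream_mode="values"):
    """Toy executor: run nodes in order, yielding per the stream mode."""
    for name, fn in nodes:
        update = fn(state)           # each node returns a partial update
        state = {**state, **update}
        if stream_mode == "updates":
            yield {name: update}     # just what this node changed
        else:
            yield dict(state)        # full state after this node

nodes = [("inc", lambda s: {"count": s["count"] + 1}),
         ("tag", lambda s: {"label": "done"})]

print(list(run_graph(nodes, {"count": 0}, "updates")))
# [{'inc': {'count': 1}}, {'tag': {'label': 'done'}}]
print(list(run_graph(nodes, {"count": 0}, "values")))
# [{'count': 1}, {'count': 1, 'label': 'done'}]
```

"updates" is usually what you want for progress displays — it's smaller and tells you which node just finished.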

Multiple stream modes at once:

async for mode, chunk in graph.astream(inputs, stream_mode=["updates", "messages"]):
    print(f"[{mode}] {chunk}")

Fix 6: Human-in-the-Loop — Interrupts

LangGraph lets you pause execution for human review before critical actions (e.g., calling an external API, sending an email).

from langgraph.graph import StateGraph
from langgraph.checkpoint.memory import MemorySaver

memory = MemorySaver()

workflow = StateGraph(State)
workflow.add_node("generate_draft", generate_fn)
workflow.add_node("send_email", send_fn)

workflow.add_edge("__start__", "generate_draft")
workflow.add_edge("generate_draft", "send_email")

# Interrupt BEFORE send_email — requires explicit continuation
app = workflow.compile(
    checkpointer=memory,
    interrupt_before=["send_email"],
)

# Run until interrupt
config = {"configurable": {"thread_id": "x"}}
result = app.invoke({"topic": "meeting"}, config=config)
# Graph pauses before send_email — check the draft

# Inspect the state
state = app.get_state(config)
print(state.values["draft"])

# Human approves — resume
app.invoke(None, config=config)   # Resumes from the last checkpoint

Interrupt after a node (after seeing its output):

app = workflow.compile(
    checkpointer=memory,
    interrupt_after=["generate_draft"],
)

Modify state during interrupt:

# Human edits the draft before resuming
app.update_state(config, {"draft": "Edited version of the draft"})
app.invoke(None, config=config)

Dynamic interrupt from within a node:

from langgraph.errors import NodeInterrupt

def send_email(state):
    # contains_sensitive_info is your own check — any predicate works here
    if contains_sensitive_info(state["email_content"]):
        raise NodeInterrupt("Human review required for sensitive content")
    # ... actually send

Fix 7: Subgraphs and Composition

Complex agents often need nested graphs — a planner at the top level, specialized sub-agents for specific tasks.

from langgraph.graph import StateGraph, END

# Subgraph for research
research_graph = StateGraph(ResearchState)
research_graph.add_node("search", search_fn)
research_graph.add_node("summarize", summarize_fn)
research_graph.add_edge("__start__", "search")
research_graph.add_edge("search", "summarize")
research_graph.add_edge("summarize", END)
research_app = research_graph.compile()

# Main graph uses the subgraph as a node
def run_research(state):
    # Subgraph invocation
    result = research_app.invoke({"query": state["question"]})
    return {"research_output": result["summary"]}

main_graph = StateGraph(MainState)
main_graph.add_node("research", run_research)
main_graph.add_node("generate_answer", answer_fn)
main_graph.add_edge("__start__", "research")
main_graph.add_edge("research", "generate_answer")
main_graph.add_edge("generate_answer", END)

Shared state between parent and subgraph — requires matching keys:

# Subgraph expects keys that are a subset of parent state keys
class ParentState(TypedDict):
    messages: Annotated[list, add_messages]
    user_id: str
    context: str

class SubgraphState(TypedDict):
    messages: Annotated[list, add_messages]   # Matches parent
    context: str                                # Matches parent

# When invoked, subgraph automatically receives these keys from parent state
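
The key-matching behavior can be pictured as projecting the parent state onto the subgraph's schema. The helper below is purely illustrative — LangGraph does this internally:

```python
# Hypothetical helper showing the idea: pass only the keys the
# subgraph's state schema declares; everything else stays in the parent.
def project_state(parent_state: dict, subgraph_keys: set) -> dict:
    return {k: v for k, v in parent_state.items() if k in subgraph_keys}

parent = {"messages": ["hi"], "user_id": "u1", "context": "billing"}
sub_input = project_state(parent, {"messages", "context"})
print(sub_input)   # {'messages': ['hi'], 'context': 'billing'}
# user_id stays in the parent; the subgraph never sees it.
```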

Fix 8: Debugging and Observability

Enable verbose logging:

import logging
logging.basicConfig(level=logging.INFO)

# Or specifically for langgraph
logging.getLogger("langgraph").setLevel(logging.DEBUG)

Visualize the graph:

from IPython.display import Image, display

# Mermaid diagram
display(Image(app.get_graph().draw_mermaid_png()))

# Print text representation
app.get_graph().print_ascii()

LangSmith integration for production tracing:

import os
os.environ["LANGCHAIN_TRACING_V2"] = "true"
os.environ["LANGCHAIN_API_KEY"] = "ls__..."
os.environ["LANGCHAIN_PROJECT"] = "my-agent"

# Every graph invocation is now traced in LangSmith
app.invoke(inputs, config=config)

For LangChain-specific tracing and debug patterns, see LangChain Python not working.

Still Not Working?

LangGraph vs Other Agent Frameworks

  • LangGraph — Explicit state machines, checkpointing, human-in-the-loop. Best for complex multi-step agents with branching logic.
  • LangChain Agents — Simpler ReAct-style agents. Quick to set up but harder to customize.
  • LlamaIndex Agents — Best when your agent’s primary job is RAG over documents.
  • CrewAI — Multi-agent collaboration. Higher-level abstractions but less flexible.

For LangChain setup and common errors, see LangChain Python not working. For LlamaIndex RAG patterns that pair well with LangGraph agents, see LlamaIndex not working.

Using Different LLM Providers

from langchain_openai import ChatOpenAI
from langchain_anthropic import ChatAnthropic

# OpenAI
model = ChatOpenAI(model="gpt-4", temperature=0.7)

# Anthropic
model = ChatAnthropic(model="claude-opus-4-5", temperature=0.7)

# Local Ollama
from langchain_ollama import ChatOllama
model = ChatOllama(model="llama3", temperature=0)

For OpenAI-specific issues, see OpenAI API not working. For local Ollama model setup, see Ollama not working.

Token Limits and Context Management

Long-running agents accumulate messages until they hit the model’s context limit. Summarize or prune older messages:

from langchain_core.messages import HumanMessage, AIMessage, SystemMessage

def trim_messages(state: State) -> dict:
    messages = state["messages"]
    if len(messages) > 20:
        # Keep system message + last 15
        return {"messages": [messages[0]] + messages[-15:]}
    return {}

Or use LangChain’s trim_messages utility for token-aware trimming.
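
A token-aware variant can be sketched in plain Python. The word-split counter below is a naive stand-in for a real tokenizer, and the function works on plain strings for illustration rather than message objects:

```python
def trim_by_tokens(messages, max_tokens, count_tokens=lambda m: len(m.split())):
    """Keep the most recent messages that fit within the token budget."""
    kept, total = [], 0
    for msg in reversed(messages):       # walk newest-first
        cost = count_tokens(msg)
        if total + cost > max_tokens:
            break                        # budget exhausted — drop the rest
        kept.append(msg)
        total += cost
    return list(reversed(kept))          # restore chronological order

history = ["hello there", "how are you today", "fine thanks", "tell me a joke"]
print(trim_by_tokens(history, max_tokens=7))
# ['fine thanks', 'tell me a joke']
```

Walking newest-first guarantees the model always sees the most recent turns, which matter most for coherent replies.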


FixDevs

Solo developer based in Japan. Every solution is cross-referenced with official documentation and tested before publishing.
