Skip to main content

Overview

StateGraph automatically saves execution state at every step, enabling powerful features:
  • 🔄 Resume from failures - Restart where you left off
  • 🕐 Time travel - Access any historical state
  • 🌳 Fork execution - Create alternative timelines
  • 💾 Multi-session - Continue conversations across sessions

Checkpointers

Checkpointers store graph state. Choose based on your needs:
CheckpointerStorageUse CasePersistence
MemorySaverIn-memoryDevelopment, testingLost on restart
SqliteCheckpointerSQLite fileProduction, local appsSurvives restarts

Using MemorySaver

from upsonic.graphv2 import StateGraph, MemorySaver

builder = StateGraph(MyState)
# ... add nodes and edges ...

checkpointer = MemorySaver()
graph = builder.compile(checkpointer=checkpointer)

Using SqliteCheckpointer

import sqlite3
from upsonic.graphv2 import StateGraph, SqliteCheckpointer

conn = sqlite3.connect("graph_checkpoints.db")
checkpointer = SqliteCheckpointer(conn)
graph = builder.compile(checkpointer=checkpointer)

Threads

Threads organize independent execution histories. Each thread has its own state and checkpoint history.

Multi-Turn Conversations

from typing import Annotated, List
from typing_extensions import TypedDict
import operator
from upsonic.graphv2 import StateGraph, START, END, MemorySaver

class ChatState(TypedDict):
    messages: Annotated[List[str], operator.add]
    turn_count: Annotated[int, lambda a, b: a + b]

def chat_node(state: ChatState) -> dict:
    history = state["messages"]
    response = f"Response to: {history[-1] if history else 'Hello'}"
    return {
        "messages": [response],
        "turn_count": 1
    }

builder = StateGraph(ChatState)
builder.add_node("chat", chat_node)
builder.add_edge(START, "chat")
builder.add_edge("chat", END)

checkpointer = MemorySaver()
graph = builder.compile(checkpointer=checkpointer)

# Conversation thread
config = {"configurable": {"thread_id": "conversation-1"}}

# Turn 1
result1 = graph.invoke(
    {"messages": ["User: Hello"], "turn_count": 0},
    config=config
)

# Turn 2 - continues from previous state
result2 = graph.invoke(
    {"messages": ["User: What's the weather?"]},
    config=config
)

print(f"Turn count: {result2['turn_count']}")  # 2
When using checkpointers, you only need to provide new state changes. The graph automatically loads and merges with the previous state.

Time Travel

Access any point in execution history:
# Get last 10 checkpoints
history = graph.get_state_history(config, limit=10)

for i, checkpoint in enumerate(history):
    print(f"Checkpoint {i + 1}:")
    print(f"  State: {checkpoint.values}")
    print(f"  Timestamp: {checkpoint.metadata.get('timestamp')}")

Forking Execution

Create alternative timelines by resuming from historical checkpoints:
# Execute a few steps
graph.invoke({"step": 1}, config=config)
graph.invoke({"step": 2}, config=config)

# Get history
history = graph.get_state_history(config)

# Fork from step 1
fork_checkpoint = history[-1]  # Oldest checkpoint

fork_config = {
    "configurable": {
        "thread_id": "experiment-1",
        "checkpoint_id": fork_checkpoint.config['configurable']['checkpoint_id']
    }
}

# Continue from that point with different input
result = graph.invoke({"alternative_input": "data"}, config=fork_config)

Durability Modes

Control when checkpoints are saved:
ModeBehaviorUse Case
syncSave before continuingMaximum safety, slower
asyncSave in backgroundBalance of safety and speed (default)
exitSave only on completionMaximum speed, less safe
# Sync - guaranteed persistence at each step
graph = builder.compile(
    checkpointer=checkpointer,
    durability="sync"
)

# Async - background persistence (default)
graph = builder.compile(
    checkpointer=checkpointer,
    durability="async"
)

# Exit - only persist at the end
graph = builder.compile(
    checkpointer=checkpointer,
    durability="exit"
)

Next Steps