Overview
When compiled with a checkpointer, StateGraph saves execution state automatically as the graph runs, which enables:
- 🔄 Resume from failures - Restart where you left off
- 🕐 Time travel - Access any historical state
- 🌳 Fork execution - Create alternative timelines
- 💾 Multi-session - Continue conversations across sessions
Checkpointers
Checkpointers store graph state. Choose based on your needs:
| Checkpointer | Storage | Use Case | Persistence |
|---|---|---|---|
| MemorySaver | In-memory | Development, testing | Lost on restart |
| SqliteCheckpointer | SQLite file | Production, local apps | Survives restarts |
Using MemorySaver
from upsonic.graphv2 import StateGraph, MemorySaver
builder = StateGraph(MyState)
# ... add nodes and edges ...
checkpointer = MemorySaver()
graph = builder.compile(checkpointer=checkpointer)
Using SqliteCheckpointer
import sqlite3
from upsonic.graphv2 import StateGraph, SqliteCheckpointer
# `builder` is a StateGraph builder, constructed as in the MemorySaver example
conn = sqlite3.connect("graph_checkpoints.db")
checkpointer = SqliteCheckpointer(conn)
graph = builder.compile(checkpointer=checkpointer)
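To make "survives restarts" concrete, here is a minimal sketch of file-backed checkpointing built only on the standard `sqlite3` and `json` modules. It does not reflect SqliteCheckpointer's actual schema or API: the `checkpoints` table and the `open_store`, `save_checkpoint`, and `load_latest` helpers are hypothetical names chosen for illustration.

```python
import json
import os
import sqlite3
import tempfile
from typing import Any, Dict, Optional


def open_store(path: str) -> sqlite3.Connection:
    """Open (or create) a toy checkpoint database. Hypothetical schema."""
    conn = sqlite3.connect(path)
    conn.execute(
        "CREATE TABLE IF NOT EXISTS checkpoints ("
        " id INTEGER PRIMARY KEY AUTOINCREMENT,"
        " thread_id TEXT NOT NULL,"
        " state TEXT NOT NULL)"
    )
    return conn


def save_checkpoint(conn: sqlite3.Connection, thread_id: str, state: Dict[str, Any]) -> None:
    """Append one JSON-serialized state snapshot for a thread."""
    conn.execute(
        "INSERT INTO checkpoints (thread_id, state) VALUES (?, ?)",
        (thread_id, json.dumps(state)),
    )
    conn.commit()


def load_latest(conn: sqlite3.Connection, thread_id: str) -> Optional[Dict[str, Any]]:
    """Return the most recent snapshot for a thread, or None if there is none."""
    row = conn.execute(
        "SELECT state FROM checkpoints WHERE thread_id = ? ORDER BY id DESC LIMIT 1",
        (thread_id,),
    ).fetchone()
    return json.loads(row[0]) if row else None


with tempfile.TemporaryDirectory() as tmp:
    db_path = os.path.join(tmp, "graph_checkpoints.db")

    conn = open_store(db_path)
    save_checkpoint(conn, "conversation-1", {"turn_count": 1})
    save_checkpoint(conn, "conversation-1", {"turn_count": 2})
    conn.close()  # simulate the process exiting

    conn = open_store(db_path)  # simulate a restart
    restored = load_latest(conn, "conversation-1")
    missing = load_latest(conn, "unknown-thread")
    conn.close()

print(restored)  # {'turn_count': 2}
```

Because the snapshots live in a file rather than in process memory, a new connection opened after a restart still sees them. An in-memory store would start empty.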
Threads
Threads organize independent execution histories. Each thread has its own state and checkpoint history.
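As a rough mental model, a checkpointer can be pictured as a mapping from `thread_id` to a list of snapshots. The sketch below is illustrative only and is not how MemorySaver is implemented; `ToyThreadStore` and its methods are hypothetical, and only the `{"configurable": {"thread_id": ...}}` config shape is taken from this page.

```python
from collections import defaultdict
from typing import Any, Dict, List, Optional


class ToyThreadStore:
    """Hypothetical in-memory store: one checkpoint list per thread_id."""

    def __init__(self) -> None:
        self._history: Dict[str, List[Dict[str, Any]]] = defaultdict(list)

    def put(self, config: Dict[str, Any], state: Dict[str, Any]) -> None:
        thread_id = config["configurable"]["thread_id"]
        # Copy so that later mutation of `state` cannot rewrite history.
        self._history[thread_id].append(dict(state))

    def latest(self, config: Dict[str, Any]) -> Optional[Dict[str, Any]]:
        thread_id = config["configurable"]["thread_id"]
        history = self._history.get(thread_id)
        return history[-1] if history else None

    def count(self, config: Dict[str, Any]) -> int:
        thread_id = config["configurable"]["thread_id"]
        return len(self._history.get(thread_id, []))


store = ToyThreadStore()
first = {"configurable": {"thread_id": "conversation-1"}}
second = {"configurable": {"thread_id": "conversation-2"}}

store.put(first, {"turn_count": 1})
store.put(first, {"turn_count": 2})
store.put(second, {"turn_count": 1})

print(store.latest(first))   # {'turn_count': 2}
print(store.latest(second))  # {'turn_count': 1}
```

Writes to one thread never touch the other thread's history, which is the isolation property that lets separate conversations share one graph.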
Multi-Turn Conversations
from typing import Annotated, List
from typing_extensions import TypedDict
import operator
from upsonic.graphv2 import StateGraph, START, END, MemorySaver
class ChatState(TypedDict):
    messages: Annotated[List[str], operator.add]
    turn_count: Annotated[int, lambda a, b: a + b]

def chat_node(state: ChatState) -> dict:
    history = state["messages"]
    response = f"Response to: {history[-1] if history else 'Hello'}"
    return {
        "messages": [response],
        "turn_count": 1
    }

builder = StateGraph(ChatState)
builder.add_node("chat", chat_node)
builder.add_edge(START, "chat")
builder.add_edge("chat", END)
checkpointer = MemorySaver()
graph = builder.compile(checkpointer=checkpointer)
# Conversation thread
config = {"configurable": {"thread_id": "conversation-1"}}
# Turn 1
result1 = graph.invoke(
    {"messages": ["User: Hello"], "turn_count": 0},
    config=config
)
# Turn 2 - continues from previous state
result2 = graph.invoke(
    {"messages": ["User: What's the weather?"]},
    config=config
)
print(f"Turn count: {result2['turn_count']}") # 2
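When a checkpointer is attached, each call only needs to supply the new state changes. The graph loads the thread's previous state and merges the new input into it, which is why turn_count reaches 2 in the example above even though the second call does not pass it.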
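The merge step can be sketched in plain Python. This is one plausible reading of how `Annotated` reducers combine a saved state with a partial update, not the library's actual code; `merge_update` is a hypothetical helper, and `operator.add` stands in for the lambda used in the example above.

```python
import operator
from typing import Annotated, Any, Dict, List, TypedDict, get_type_hints


class ChatState(TypedDict):
    messages: Annotated[List[str], operator.add]
    turn_count: Annotated[int, operator.add]


def merge_update(schema: type, previous: Dict[str, Any], update: Dict[str, Any]) -> Dict[str, Any]:
    """Merge `update` into `previous`, using each field's reducer if it has one."""
    hints = get_type_hints(schema, include_extras=True)
    merged = dict(previous)
    for key, value in update.items():
        # Annotated[T, reducer] exposes the reducer through __metadata__.
        metadata = getattr(hints.get(key), "__metadata__", ())
        reducer = next((m for m in metadata if callable(m)), None)
        if reducer is not None and key in merged:
            merged[key] = reducer(merged[key], value)
        else:
            merged[key] = value  # no reducer, or no prior value: overwrite
    return merged


state: Dict[str, Any] = {}

# Turn 1: the caller's input, then the node's return value
state = merge_update(ChatState, state, {"messages": ["User: Hello"], "turn_count": 0})
state = merge_update(ChatState, state, {"messages": ["Response to: User: Hello"], "turn_count": 1})

# Turn 2: the caller passes only the new message
state = merge_update(ChatState, state, {"messages": ["User: What's the weather?"]})
state = merge_update(
    ChatState, state, {"messages": ["Response to: User: What's the weather?"], "turn_count": 1}
)

print(state["turn_count"])     # 2
print(len(state["messages"]))  # 4
```

Fields with a reducer accumulate across calls, while a field without one would simply be overwritten by the latest value.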
Time Travel
Access any point in execution history:
# Get last 10 checkpoints
history = graph.get_state_history(config, limit=10)
for i, checkpoint in enumerate(history):
    print(f"Checkpoint {i + 1}:")
    print(f"  State: {checkpoint.values}")
    print(f"  Timestamp: {checkpoint.metadata.get('timestamp')}")
Forking Execution
Create alternative timelines by resuming from historical checkpoints:
# Execute a few steps
graph.invoke({"step": 1}, config=config)
graph.invoke({"step": 2}, config=config)
# Get history
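# list() keeps indexing safe whether a list or an iterator is returned
history = list(graph.get_state_history(config))
# Fork from step 1: history is ordered newest first, so the last entry is the oldest checkpoint
fork_checkpoint = history[-1]
fork_config = {
    "configurable": {
        "thread_id": "experiment-1",
        "checkpoint_id": fork_checkpoint.config["configurable"]["checkpoint_id"]
    }
}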
# Continue from that point with different input
result = graph.invoke({"alternative_input": "data"}, config=fork_config)
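Conceptually, a fork copies the history up to the chosen checkpoint into a new thread and then lets the two threads diverge. The toy model below illustrates that idea under simplifying assumptions: histories are plain lists ordered oldest first, and `fork_thread` is a hypothetical helper, not part of the library.

```python
import copy
from typing import Any, Dict, List

Checkpoint = Dict[str, Any]  # {"checkpoint_id": str, "values": dict}


def fork_thread(
    threads: Dict[str, List[Checkpoint]],
    source_thread: str,
    checkpoint_id: str,
    new_thread: str,
) -> List[Checkpoint]:
    """Copy history up to and including `checkpoint_id` into `new_thread`."""
    source = threads[source_thread]
    for index, checkpoint in enumerate(source):
        if checkpoint["checkpoint_id"] == checkpoint_id:
            # Deep copy so edits on the fork never leak into the source thread.
            threads[new_thread] = copy.deepcopy(source[: index + 1])
            return threads[new_thread]
    raise KeyError(f"checkpoint {checkpoint_id!r} not found in thread {source_thread!r}")


threads: Dict[str, List[Checkpoint]] = {
    "main": [
        {"checkpoint_id": "cp-1", "values": {"step": 1}},
        {"checkpoint_id": "cp-2", "values": {"step": 2}},
    ]
}

fork = fork_thread(threads, "main", "cp-1", "experiment-1")
# Continue the fork with different input; the main thread is unaffected.
fork.append({"checkpoint_id": "cp-2b", "values": {"step": 1, "alternative_input": "data"}})

print([cp["checkpoint_id"] for cp in threads["main"]])          # ['cp-1', 'cp-2']
print([cp["checkpoint_id"] for cp in threads["experiment-1"]])  # ['cp-1', 'cp-2b']
```

Both timelines share the same starting point but keep separate histories afterwards, so the original run can still be inspected or resumed.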
Durability Modes
Control when checkpoints are saved:
| Mode | Behavior | Use Case |
|---|---|---|
| sync | Save before continuing | Maximum safety, slower |
| async | Save in background | Balance of safety and speed (default) |
| exit | Save only on completion | Maximum speed, less safe |
# Sync - guaranteed persistence at each step
graph = builder.compile(
    checkpointer=checkpointer,
    durability="sync"
)
# Async - background persistence (default)
graph = builder.compile(
    checkpointer=checkpointer,
    durability="async"
)
# Exit - only persist at the end
graph = builder.compile(
    checkpointer=checkpointer,
    durability="exit"
)
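The trade-off between the three modes can be illustrated with a toy step runner. This is a sketch of the general idea only, assuming a single background writer for the async case; `run_steps` and its parameters are hypothetical and say nothing about how the library schedules its writes.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Any, Callable, Dict, List

State = Dict[str, Any]
Step = Callable[[State], State]


def run_steps(
    steps: List[Step],
    state: State,
    save: Callable[[State], None],
    durability: str = "async",
) -> State:
    """Run steps in order, persisting snapshots according to `durability`."""
    if durability not in ("sync", "async", "exit"):
        raise ValueError(f"unknown durability mode: {durability!r}")

    # One worker keeps background writes in step order.
    pool = ThreadPoolExecutor(max_workers=1) if durability == "async" else None
    try:
        for step in steps:
            state = step(state)
            snapshot = dict(state)
            if durability == "sync":
                save(snapshot)  # block until the write finishes
            elif pool is not None:
                pool.submit(save, snapshot)  # write while the next step runs
    finally:
        if pool is not None:
            pool.shutdown(wait=True)  # flush pending writes before returning

    if durability == "exit":
        save(dict(state))  # reached only if every step succeeded
    return state


def increment(state: State) -> State:
    return {"n": state["n"] + 1}


saved_by_mode: Dict[str, List[int]] = {}
for mode in ("sync", "async", "exit"):
    saved: List[State] = []
    run_steps([increment, increment, increment], {"n": 0}, saved.append, durability=mode)
    saved_by_mode[mode] = [snapshot["n"] for snapshot in saved]

print(saved_by_mode)
# {'sync': [1, 2, 3], 'async': [1, 2, 3], 'exit': [3]}
```

In this model, a crash in the middle of a run loses nothing that was already written in sync mode, may lose queued writes in async mode, and loses everything in exit mode because the single save happens only after the last step.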
Next Steps