Overview
StateGraph provides comprehensive reliability features:
- 🔄 Retry Policies - Automatic retry with exponential backoff
- 💾 Cache Policies - Avoid re-executing expensive operations
- 🛡️ Durability Modes - Control when state is persisted
- 🔁 Failure Recovery - Resume from the last successful checkpoint
Retry Policies
Retry policies handle transient failures automatically.
Basic Retry
from upsonic.graphv2 import StateGraph, RetryPolicy

def unstable_node(state: MyState) -> dict:
    """A node that might fail."""
    # Simulate a flaky API call: roughly 30% of invocations raise,
    # which is what the retry policy below is there to absorb.
    import random
    if random.random() < 0.3:
        raise ConnectionError("Simulated failure")
    return {"data": "success"}

builder = StateGraph(MyState)

# Add node with retry policy
builder.add_node(
    "fetch_data",
    unstable_node,
    retry_policy=RetryPolicy(
        max_attempts=3,        # give up after 3 total attempts
        initial_interval=1.0,  # wait 1s before the first retry
        backoff_factor=2.0,    # double the wait on each subsequent retry
        max_interval=30.0,     # cap the wait at 30s
        jitter=True,           # randomize waits to avoid thundering-herd retries
    ),
)
Retry Configuration
| Parameter | Default | Description |
|---|---|---|
| max_attempts | 3 | Maximum number of attempts |
| initial_interval | 0.5 | Seconds to wait before first retry |
| backoff_factor | 2.0 | Multiplier for wait time |
| max_interval | 128.0 | Maximum seconds between retries |
| jitter | True | Add random variation to intervals |
| retry_on | Exception | Which exceptions trigger retry |
Selective Retry
# Restrict retries to connection failures only
retry_policy = RetryPolicy(
    max_attempts=3,
    retry_on=ConnectionError,
)

# Several exception types can be retried by passing a tuple
retry_policy = RetryPolicy(
    max_attempts=3,
    retry_on=(ConnectionError, TimeoutError),
)
Cache Policies
Cache policies let you avoid re-executing expensive operations by reusing previously computed results.
Basic Caching
from typing_extensions import TypedDict
from upsonic.graphv2 import StateGraph, START, END, CachePolicy, InMemoryCache
import time

# Define the State schema
class State(TypedDict):
    input: str   # value fed to the expensive computation
    output: str  # cached result of the computation

def complex_calculation(input_value: str) -> str:
    """Simulate an expensive computation."""
    # Simulate some computation time so cache hits are measurably faster.
    time.sleep(0.1)
    return f"processed_{input_value}"

def expensive_node(state: State) -> dict:
    """Expensive computation - results are cached."""
    result = complex_calculation(state["input"])
    return {"output": result}

builder = StateGraph(State)

# Add node with cache policy
builder.add_node(
    "compute",
    expensive_node,
    cache_policy=CachePolicy(ttl=300)  # Cache for 5 minutes
)

# Add edges to connect the graph
builder.add_edge(START, "compute")
builder.add_edge("compute", END)

# Compile with cache
cache = InMemoryCache()
graph = builder.compile(cache=cache)

# First call - executes and caches
print("First call (should be slow)...")
start_time = time.time()
result1 = graph.invoke({"input": "test"})
first_duration = time.time() - start_time
print(f"First call took {first_duration:.3f} seconds")
print(f"Result: {result1}")

# Second call with same input - uses cache
print("\nSecond call (should be instant from cache)...")
start_time = time.time()
result2 = graph.invoke({"input": "test"})  # Instant!
second_duration = time.time() - start_time
print(f"Second call took {second_duration:.3f} seconds")
print(f"Result: {result2}")

# Verify results are the same
assert result1 == result2, "Results should be identical"
assert second_duration < first_duration, "Second call should be faster (cached)"

# Test with different input to ensure cache key is working correctly
print("\nThird call with different input (should be slow again)...")
start_time = time.time()
result3 = graph.invoke({"input": "different"})
third_duration = time.time() - start_time
print(f"Third call took {third_duration:.3f} seconds")
print(f"Result: {result3}")
assert result3["output"] == "processed_different", "Different input should produce different output"

# Test cache hit again with original input
print("\nFourth call with original input (should be instant from cache)...")
start_time = time.time()
result4 = graph.invoke({"input": "test"})
fourth_duration = time.time() - start_time
print(f"Fourth call took {fourth_duration:.3f} seconds")
print(f"Result: {result4}")
assert result4 == result1, "Cached result should match original"
assert fourth_duration < third_duration, "Cached call should be faster"
Cache keys are automatically generated from the node’s input state. Same input = cache hit.
Cache Configuration
# A ttl of None means entries never expire
cache_policy = CachePolicy(ttl=None)

# Entries expire after one hour (3600 seconds)
cache_policy = CachePolicy(ttl=3600)
Combined Retry and Cache
Use both for maximum reliability and performance:
# Attach both policies: failed attempts are retried, and the
# eventual successful result is cached for ten minutes.
builder.add_node(
    "fetch",
    robust_node,
    retry_policy=RetryPolicy(max_attempts=3, initial_interval=1.0),
    cache_policy=CachePolicy(ttl=600),
)
Order of Operations: Retry happens first, then successful results are cached.
Failure Recovery
Resume execution from the last successful checkpoint:
checkpointer = MemorySaver()
graph = builder.compile(checkpointer=checkpointer)

# The thread_id ties both invocations to the same checkpointed run.
config = {"configurable": {"thread_id": "job-123"}}

# First attempt - fails at some point
try:
    result = graph.invoke(initial_state, config=config)
except Exception as e:
    print(f"Failed: {e}")

# Fix the issue and resume; execution continues from the last
# successful checkpoint rather than restarting from the beginning.
result = graph.invoke(updated_state, config=config)
Best Practices
1. Retry Transient Failures Only
# ✅ Good - only transient network errors trigger a retry
retry_policy = RetryPolicy(
    retry_on=(ConnectionError, TimeoutError),
)
2. Set Appropriate TTLs
# ✅ Good - the TTL is chosen to match how fast the data goes stale
builder.add_node(
    "get_stock_price",
    get_price,
    cache_policy=CachePolicy(ttl=60),  # 1 min for live data
)
3. Choose Right Durability
# Critical operation: persist state synchronously before proceeding
builder.compile(checkpointer=cp, durability="sync")

# Normal workflow: persist in the background (the default)
builder.compile(checkpointer=cp, durability="async")
Next Steps