Skip to main content

Overview

StateGraph provides comprehensive reliability features:
  • 🔄 Retry Policies - Automatic retry with exponential backoff
  • 💾 Cache Policies - Avoid re-executing expensive operations
  • 🛡️ Durability Modes - Control when state is persisted
  • 🔁 Failure Recovery - Resume from the last successful checkpoint

Retry Policies

Retry policies handle transient failures automatically.

Basic Retry

from upsonic.graphv2 import StateGraph, RetryPolicy

def unstable_node(state: MyState) -> dict:
    """A flaky node: roughly 30% of calls raise a simulated connection error."""
    import random

    roll = random.random()
    if roll < 0.3:
        # Emulate a transient fault (e.g. a dropped network connection).
        raise ConnectionError("Simulated failure")
    return {"data": "success"}

builder = StateGraph(MyState)

# Exponential backoff: waits 1s, 2s, 4s, ... capped at 30s, with jitter
# so that concurrent retries do not all fire at the same instant.
fetch_retry = RetryPolicy(
    max_attempts=3,
    initial_interval=1.0,
    backoff_factor=2.0,
    max_interval=30.0,
    jitter=True,
)

# Attach the retry policy to the node that performs the flaky call.
builder.add_node("fetch_data", unstable_node, retry_policy=fetch_retry)

Retry Configuration

| Parameter          | Default     | Description                        |
|--------------------|-------------|------------------------------------|
| `max_attempts`     | `3`         | Maximum number of attempts         |
| `initial_interval` | `0.5`       | Seconds to wait before first retry |
| `backoff_factor`   | `2.0`       | Multiplier for wait time           |
| `max_interval`     | `128.0`     | Maximum seconds between retries    |
| `jitter`           | `True`      | Add random variation to intervals  |
| `retry_on`         | `Exception` | Which exceptions trigger retry     |

Selective Retry

# Retry only when the failure is a connection problem; other
# exception types propagate immediately.
retry_policy = RetryPolicy(max_attempts=3, retry_on=ConnectionError)

# A tuple widens the filter to several exception types.
retry_policy = RetryPolicy(
    max_attempts=3,
    retry_on=(ConnectionError, TimeoutError),
)

Cache Policies

Cache policies avoid re-executing expensive operations.

Basic Caching

from typing_extensions import TypedDict
from upsonic.graphv2 import StateGraph, START, END, CachePolicy, InMemoryCache
import time

# Define the State schema
class State(TypedDict):
    """Schema of the graph state shared by every node."""
    input: str  # raw value fed into the graph
    output: str  # result written by the "compute" node

def complex_calculation(input_value: str) -> str:
    """Pretend to do heavy work, then return the input tagged with a prefix."""
    time.sleep(0.1)  # stand-in for real computation latency
    return "processed_" + input_value

def expensive_node(state: State) -> dict:
    """Expensive computation - results are cached."""
    result = complex_calculation(state["input"])
    return {"output": result}

builder = StateGraph(State)

# Cache the expensive node's results for five minutes.
builder.add_node("compute", expensive_node, cache_policy=CachePolicy(ttl=300))

# Wire up the single-step pipeline.
builder.add_edge(START, "compute")
builder.add_edge("compute", END)

# A cache backend must be supplied at compile time for cache policies to apply.
graph = builder.compile(cache=InMemoryCache())


def _timed(payload):
    """Invoke the graph with *payload*, returning (result, elapsed_seconds)."""
    started = time.time()
    outcome = graph.invoke(payload)
    return outcome, time.time() - started


# First call - executes the node and stores the result in the cache.
print("First call (should be slow)...")
result1, first_duration = _timed({"input": "test"})
print(f"First call took {first_duration:.3f} seconds")
print(f"Result: {result1}")

# Second call with the same input - served straight from the cache.
print("\nSecond call (should be instant from cache)...")
result2, second_duration = _timed({"input": "test"})  # Instant!
print(f"Second call took {second_duration:.3f} seconds")
print(f"Result: {result2}")

# Cached result must match the original and arrive faster.
assert result1 == result2, "Results should be identical"
assert second_duration < first_duration, "Second call should be faster (cached)"

# A different input must miss the cache and recompute.
print("\nThird call with different input (should be slow again)...")
result3, third_duration = _timed({"input": "different"})
print(f"Third call took {third_duration:.3f} seconds")
print(f"Result: {result3}")
assert result3["output"] == "processed_different", "Different input should produce different output"

# The original input should still be cached from the first call.
print("\nFourth call with original input (should be instant from cache)...")
result4, fourth_duration = _timed({"input": "test"})
print(f"Fourth call took {fourth_duration:.3f} seconds")
print(f"Result: {result4}")
assert result4 == result1, "Cached result should match original"
assert fourth_duration < third_duration, "Cached call should be faster"

Cache keys are automatically generated from the node's input state: invoking a node with identical input state produces a cache hit, while any change to the input produces a miss and re-executes the node.

Cache Configuration

# Cache forever (no TTL)
cache_policy = CachePolicy(ttl=None)  # entries never expire

# Cache for 1 hour
cache_policy = CachePolicy(ttl=3600)  # ttl is measured in seconds

Combined Retry and Cache

Use both for maximum reliability and performance:
# Retry transient failures first; the eventual success is cached for 10 minutes.
fetch_retry = RetryPolicy(max_attempts=3, initial_interval=1.0)
fetch_cache = CachePolicy(ttl=600)
builder.add_node(
    "fetch",
    robust_node,
    retry_policy=fetch_retry,
    cache_policy=fetch_cache,
)
Order of Operations: Retry happens first, then successful results are cached.

Failure Recovery

Resume execution from the last successful checkpoint:
# Persist checkpoints so a failed run can be resumed under the same thread id.
checkpointer = MemorySaver()
graph = builder.compile(checkpointer=checkpointer)

run_config = {"configurable": {"thread_id": "job-123"}}

# First attempt - may fail partway through; completed nodes are checkpointed.
try:
    result = graph.invoke(initial_state, config=run_config)
except Exception as err:
    print(f"Failed: {err}")

# After fixing the underlying issue, re-invoking with the same thread_id
# resumes from the last successful checkpoint instead of starting over.
result = graph.invoke(updated_state, config=run_config)

Best Practices

1. Retry Transient Failures Only

# ✅ Good - retry network issues
retry_policy = RetryPolicy(
    # Only transient network faults are worth retrying; programming
    # errors (KeyError, TypeError, ...) should fail fast instead.
    retry_on=(ConnectionError, TimeoutError)
)

2. Set Appropriate TTLs

# ✅ Good - TTL matches data volatility
builder.add_node(
    "get_stock_price",
    get_price,
    # Short TTL: stale prices are worse than an extra fetch.
    cache_policy=CachePolicy(ttl=60)  # 1 min for live data
)

3. Choose Right Durability

# Critical operation - sync
# State is flushed to the checkpointer before each step returns.
builder.compile(checkpointer=cp, durability="sync")

# Normal workflow - async (default)
# State is persisted in the background for lower latency.
builder.compile(checkpointer=cp, durability="async")

Next Steps