Overview
StateGraph provides advanced features for complex workflows:
- 🔀 Send API - Dynamic parallel execution (orchestrator-worker pattern)
- ⚡ Parallel Nodes - Automatic concurrent execution
- 🎯 Task Decorators - Durable functions with retry and cache
Send API
The Send API enables dynamic parallelization where a node can spawn multiple worker instances that execute concurrently.
Basic Send Pattern
Copy
"""Basic Send pattern: an orchestrator fans work out to parallel workers.

Each Send("worker", payload) spawns one concurrent instance of the
"worker" node; the per-worker results are merged back into the shared
state by the operator.add reducer on `results`.
"""
from typing import Annotated, List
from typing_extensions import TypedDict
import operator
from upsonic.graphv2 import StateGraph, START, END, Send


class State(TypedDict):
    items: List[str]
    # Reducer: each worker returns a one-element list; operator.add
    # concatenates them into a single merged list.
    results: Annotated[List[str], operator.add]


def orchestrator(state: State) -> dict:
    """Prepare items for processing."""
    return {"items": state["items"]}


def fan_out(state: State) -> List[Send]:
    """Create a worker for each item (one Send per item)."""
    return [
        Send("worker", {"item": item})
        for item in state["items"]
    ]


def worker(state: dict) -> dict:
    """Process a single item.

    Note: receives the per-Send payload dict, not the full State.
    """
    item = state["item"]
    result = f"processed_{item}"
    return {"results": [result]}


# Build graph
builder = StateGraph(State)
builder.add_node("orchestrator", orchestrator)
builder.add_node("worker", worker)
builder.add_edge(START, "orchestrator")
# The conditional edge's router returns Send objects -> dynamic fan-out.
builder.add_conditional_edges("orchestrator", fan_out, ["worker"])
builder.add_edge("worker", END)
graph = builder.compile()

# Execute
result = graph.invoke({
    "items": ["a", "b", "c"],
    "results": []
})
print(result["results"])  # ['processed_a', 'processed_b', 'processed_c']
Workers execute in parallel and their results are automatically merged using reducers.
Map-Reduce Pattern
Copy
"""
MapReduce pattern implementation using StateGraph with Send objects.
This demonstrates:
1. Dynamic parallelization using Send objects
2. State merging with reducers (operator.add for lists)
3. Automatic deduplication of next nodes after Send execution
"""
from typing import List, Annotated
from typing_extensions import TypedDict
import operator
from upsonic.graphv2 import StateGraph, START, END, Send


class MapReduceState(TypedDict):
    data: List[int]
    # operator.add concatenates each mapper's one-element list.
    mapped: Annotated[List[int], operator.add]
    reduced: int


def map_phase(state: MapReduceState) -> List[Send]:
    """Map: send each item to a worker.

    Returns a list of Send objects, one for each data item.
    All Send objects will execute the "mapper" node in parallel.
    """
    return [
        Send("mapper", {"value": val})
        for val in state["data"]
    ]


def mapper(state: dict) -> dict:
    """Map function: square the value.

    Each mapper receives a single value and returns the squared result.
    The results are merged using the operator.add reducer.
    """
    squared = state["value"] ** 2
    return {"mapped": [squared]}


def reduce_phase(state: MapReduceState) -> dict:
    """Reduce: sum all mapped values.

    This executes ONCE after all mappers complete, thanks to deduplication.
    Even though 5 mappers all route to "reduce", it executes only once.
    """
    total = sum(state["mapped"])
    return {"reduced": total}


# Build the graph
builder = StateGraph(MapReduceState)
builder.add_node("start", lambda s: {"data": s["data"]})
builder.add_node("mapper", mapper)
builder.add_node("reduce", reduce_phase)

# Define edges
builder.add_edge(START, "start")
builder.add_conditional_edges("start", map_phase, ["mapper"])  # Returns Send objects
builder.add_edge("mapper", "reduce")  # All mappers route here (deduplicated)
builder.add_edge("reduce", END)

# Compile the graph
graph = builder.compile()

# Execute
result = graph.invoke({
    "data": [1, 2, 3, 4, 5],
    "mapped": [],
    "reduced": 0
})

# Verify result
expected = sum(x**2 for x in [1, 2, 3, 4, 5])  # 1+4+9+16+25 = 55
assert result['reduced'] == expected, f"Expected {expected}, got {result['reduced']}"
assert result['mapped'] == [1, 4, 9, 16, 25], f"Expected [1,4,9,16,25], got {result['mapped']}"
print(f"✓ Sum of squares: {result['reduced']}")  # 1+4+9+16+25 = 55
print(f"✓ Mapped values: {result['mapped']}")
print(f"✓ All tests passed!")
Parallel Node Execution
StateGraph automatically executes nodes in parallel when they have no dependencies:
Copy
"""Parallel node execution: sibling nodes with a common parent run concurrently.

`process_a` and `process_b` both follow `setup` and are independent of each
other, so the graph runs them in parallel; `merge` joins their outputs.
"""
from typing import Annotated, List
from typing_extensions import TypedDict
import operator
from upsonic.graphv2 import StateGraph, START, END


class ParallelState(TypedDict):
    input: str
    # Separate reduced channels so the concurrent writers never conflict.
    results_a: Annotated[List[str], operator.add]
    results_b: Annotated[List[str], operator.add]
    final: str


def setup(state: ParallelState) -> dict:
    """Seed the shared input for both parallel branches."""
    return {"input": state["input"]}


def process_a(state: ParallelState) -> dict:
    """Branch A: transform the input independently of branch B."""
    result = f"A processed: {state['input']}"
    return {"results_a": [result]}


def process_b(state: ParallelState) -> dict:
    """Branch B: transform the input independently of branch A."""
    result = f"B processed: {state['input']}"
    return {"results_b": [result]}


def merge(state: ParallelState) -> dict:
    """Join: runs after BOTH branches have written their results."""
    combined = state['results_a'] + state['results_b']
    return {"final": ", ".join(combined)}


builder = StateGraph(ParallelState)
builder.add_node("setup", setup)
builder.add_node("process_a", process_a)
builder.add_node("process_b", process_b)
builder.add_node("merge", merge)

builder.add_edge(START, "setup")
# Fan-out: two edges from the same parent -> branches run concurrently.
builder.add_edge("setup", "process_a")
builder.add_edge("setup", "process_b")
# Fan-in: merge waits for both branches.
builder.add_edge("process_a", "merge")
builder.add_edge("process_b", "merge")
builder.add_edge("merge", END)
graph = builder.compile()

result = graph.invoke({
    "input": "test data",
    "results_a": [],
    "results_b": [],
    "final": ""
})
print(result["final"])
Automatic Parallelization: When multiple nodes have the same parent and don’t depend on each other, they execute concurrently.
Task Decorator
The @task decorator creates durable functions with built-in retry and caching:
Basic Task
Copy
"""Basic @task usage: a decorated function returns a future-like handle."""
from upsonic.graphv2 import task


@task
def expensive_computation(x: int) -> int:
    """A function decorated as a task."""
    import time
    time.sleep(0.1)  # Simulate work
    return x * 2


# Use in a node: call the task, then block on .result() for the value.
# NOTE: MyState is a TypedDict defined elsewhere in the docs (illustrative).
def my_node(state: MyState) -> dict:
    result = expensive_computation(state["value"]).result()
    return {"output": result}
Task with Retry and Cache
Copy
"""@task with retry and cache policies.

Failures are retried up to max_attempts; successful results are cached
for the cache policy's TTL (seconds).
"""
from upsonic.graphv2 import task, RetryPolicy, CachePolicy, InMemoryCache


@task(
    retry_policy=RetryPolicy(max_attempts=3, initial_interval=1.0),
    cache_policy=CachePolicy(ttl=300)
)
def robust_expensive_call(param: str) -> str:
    """Retries on failure, caches on success."""
    # Simulate API call
    if param == "fail":
        raise ConnectionError("Simulated failure")
    return f"Result for {param}"


# Provide cache to graph at compile time so cached task results persist.
# NOTE: `builder` is the StateGraph builder from an earlier example.
cache = InMemoryCache()
graph = builder.compile(cache=cache)
Next Steps
- Reliability - Master retry and durability modes
- Persistence - Review checkpointing features
- Building Agents - Apply to agent workflows

