
# Running Agents

> Execute agents with different methods

Agents can be run synchronously, asynchronously, or as a stream, depending on your needs.

## Synchronous Execution

The simplest way to run an agent is using the `do()` method, which executes synchronously and returns the result.

```python theme={null}
from upsonic import Agent, Task

# Create agent
agent = Agent("anthropic/claude-sonnet-4-5")

# Execute with a Task object
task = Task("What is the capital of France?")
result = agent.do(task)
print(result)  # e.g. "Paris" (exact wording depends on the model)

# Or execute directly with a string
result = agent.do("What is the capital of France?")
print(result)
```

## List Input (Multiple Tasks)

`do()`, `do_async()`, `print_do()`, and `print_do_async()` accept a **list of strings** or a **list of `Task` objects**, and a list may mix `str` and `Task` items. Items run in sequence, and the call returns a list of string results, one per input. Two edge cases: a single-element list returns a bare string rather than a one-element list, and an empty list returns an empty list.
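Because a single-element list yields a bare string, code that loops over the result can end up iterating over characters instead of answers. The helper below is a minimal sketch in plain Python for normalizing the return shape before looping. `as_result_list` is a hypothetical name and not part of Upsonic, and the sketch assumes results are plain strings as described above.

```python
from typing import List, Union


def as_result_list(result: Union[str, List[str]]) -> List[str]:
    """Wrap a bare string in a list; return a copy of any list."""
    # Hypothetical helper, not part of Upsonic.
    if isinstance(result, str):
        return [result]
    return list(result)


# The three return shapes described above, using literal stand-in values
print(as_result_list("4"))         # bare string from a single-element input
print(as_result_list(["4", "6"]))  # list from a multi-element input
print(as_result_list([]))          # empty list from an empty input
```

With this in place, `for r in as_result_list(agent.do(items))` would behave the same regardless of how many items were passed.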

### List of strings

```python theme={null}
from upsonic import Agent

agent = Agent("anthropic/claude-sonnet-4-5")
results = agent.do(["What is 2+2?", "What is 3+3?"])
# results is a list of strings, one per input
for r in results:
    print(r)
```

### List of Task objects

```python theme={null}
from upsonic import Agent, Task

agent = Agent("anthropic/claude-sonnet-4-5")
tasks = [
    Task("What is the capital of France?"),
    Task("What is the capital of Germany?"),
]
results = agent.do(tasks)
# results is a list of strings, one per task
for r in results:
    print(r)
```

## Asynchronous Execution

For concurrent operations or async applications, use `do_async()`, which returns a coroutine that you `await`.

```python theme={null}
from upsonic import Agent, Task
import asyncio

async def main():
    # Create agent
    agent = Agent("anthropic/claude-sonnet-4-5")
    
    # Execute asynchronously (accepts Task or string)
    result = await agent.do_async("Explain quantum computing in simple terms")
    print(result)

# Run async function
asyncio.run(main())
```
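Since `do_async()` returns a coroutine, several calls can be awaited together with `asyncio.gather`. The sketch below uses a stand-in class (`StubAgent`, hypothetical) so that it runs without an API key; swap in a real `Agent` to try it against a model. Whether a single `Agent` instance can safely serve concurrent calls is not stated on this page, so the sketch assumes one agent per prompt.

```python
import asyncio
from typing import List


class StubAgent:
    """Stand-in for an agent so the example runs offline; NOT the Upsonic class."""

    async def do_async(self, prompt: str) -> str:
        await asyncio.sleep(0.01)  # simulate model latency
        return f"answer to: {prompt}"


async def run_all(prompts: List[str]) -> List[str]:
    # One agent per prompt (an assumption, see the note above)
    coros = [StubAgent().do_async(p) for p in prompts]
    # gather runs the coroutines concurrently and keeps results in input order
    return list(await asyncio.gather(*coros))


results = asyncio.run(run_all(["What is 2+2?", "What is 3+3?"]))
for r in results:
    print(r)
```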

## Streaming Text Output

For real-time output, use `astream()` in async code (or `stream()` in synchronous code) to receive text chunks as they are generated.

```python theme={null}
import asyncio
from upsonic import Agent, Task


async def main():
    # Create agent and task
    agent = Agent("anthropic/claude-sonnet-4-5")
    task = Task("Write a short poem about coding")
    
    # Stream the output
    async for text_chunk in agent.astream(task):
        print(text_chunk, end='', flush=True)
    print()  # New line after streaming


if __name__ == "__main__":
    asyncio.run(main())
```
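A common follow-up is to keep the full text while still printing chunks as they arrive. The sketch below shows one way to do that with any async iterator of strings. `stub_stream` is a hypothetical stand-in for the agent's text stream so the example runs offline, and `print_and_collect` is an illustrative helper, not an Upsonic API.

```python
import asyncio
from typing import AsyncIterator, List


async def stub_stream() -> AsyncIterator[str]:
    """Stand-in for a text stream; NOT an Upsonic API."""
    for chunk in ["Code ", "flows ", "like ", "rivers."]:
        await asyncio.sleep(0)  # yield control, as a real stream would
        yield chunk


async def print_and_collect(chunks: AsyncIterator[str]) -> str:
    """Print each chunk immediately and return the joined text."""
    parts: List[str] = []
    async for chunk in chunks:
        print(chunk, end="", flush=True)
        parts.append(chunk)
    print()  # newline after the stream ends
    return "".join(parts)


full_text = asyncio.run(print_and_collect(stub_stream()))
```

Joining a list once at the end avoids repeated string concatenation inside the loop.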

## Event Streaming

For full visibility into agent execution, use `astream()` (or `stream()` for synchronous code) with `events=True` to receive detailed events about every step of the pipeline.

```python theme={null}
import asyncio
from upsonic import Agent, Task
from upsonic.run.events.events import (
    PipelineStartEvent,
    PipelineEndEvent,
    TextDeltaEvent,
    ToolCallEvent,
    ToolResultEvent,
)
from upsonic.tools import tool

@tool
def calculate(x: int, y: int) -> int:
    """Add two numbers."""
    return x + y

async def main():
    agent = Agent("anthropic/claude-sonnet-4-5")
    task = Task("Calculate 5 + 3", tools=[calculate])
    
    async for event in agent.astream(task, events=True):
        if isinstance(event, PipelineStartEvent):
            print(f"🚀 Starting pipeline with {event.total_steps} steps")
        
        elif isinstance(event, ToolCallEvent):
            print(f"\n🔧 Calling: {event.tool_name}({event.tool_args})")
        
        elif isinstance(event, ToolResultEvent):
            status = "❌" if event.is_error else "✅"
            print(f"\n{status} Result: {event.result_preview}")
        
        elif isinstance(event, TextDeltaEvent):
            print("\nText Delta Event: ", event.content, end='', flush=True)
        
        elif isinstance(event, PipelineEndEvent):
            print(f"\n✅ Completed in {event.total_duration:.2f}s")

asyncio.run(main())
```

### Event Categories

#### Pipeline Events

| Event                | Description                   | Key Attributes                                                               |
| -------------------- | ----------------------------- | ---------------------------------------------------------------------------- |
| `PipelineStartEvent` | Emitted when execution begins | `total_steps`, `is_streaming`, `task_description`                            |
| `PipelineEndEvent`   | Emitted when execution ends   | `total_steps`, `executed_steps`, `total_duration`, `status`, `error_message` |

#### Step Events

| Event            | Description                | Key Attributes                                                   |
| ---------------- | -------------------------- | ---------------------------------------------------------------- |
| `StepStartEvent` | Emitted when a step begins | `step_name`, `step_index`, `step_description`, `total_steps`     |
| `StepEndEvent`   | Emitted when a step ends   | `step_name`, `step_index`, `status`, `execution_time`, `message` |

#### Tool Events

| Event                    | Description                                     | Key Attributes                                                                                         |
| ------------------------ | ----------------------------------------------- | ------------------------------------------------------------------------------------------------------ |
| `ToolCallEvent`          | Emitted when a tool is called                   | `tool_name`, `tool_call_id`, `tool_args`, `tool_index`                                                 |
| `ToolResultEvent`        | Emitted when a tool returns                     | `tool_name`, `tool_call_id`, `result`, `result_preview`, `execution_time`, `is_error`, `error_message` |
| `ExternalToolPauseEvent` | Emitted when execution pauses for an external tool | `tool_name`, `tool_call_id`, `tool_args`                                                               |
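The `tool_call_id` attribute listed in the table above makes it possible to match each tool result to the call that produced it. The sketch below illustrates that pairing with stand-in dataclasses (`StubToolCall` and `StubToolResult`, both hypothetical) that carry a subset of the listed attributes. It assumes `execution_time` is numeric; with real events, the `isinstance` checks would target `ToolCallEvent` and `ToolResultEvent` instead.

```python
from dataclasses import dataclass
from typing import Dict, Iterable, List


@dataclass
class StubToolCall:
    """Stand-in with a subset of the ToolCallEvent attributes."""
    tool_name: str
    tool_call_id: str


@dataclass
class StubToolResult:
    """Stand-in with a subset of the ToolResultEvent attributes."""
    tool_name: str
    tool_call_id: str
    execution_time: float  # assumed numeric
    is_error: bool


def summarize_tools(events: Iterable[object]) -> List[dict]:
    """Pair each result with its call and record timing and status."""
    pending: Dict[str, str] = {}  # tool_call_id -> tool_name
    summary: List[dict] = []
    for event in events:
        if isinstance(event, StubToolCall):
            pending[event.tool_call_id] = event.tool_name
        elif isinstance(event, StubToolResult):
            # Fall back to the result's own name if the call was never seen
            name = pending.pop(event.tool_call_id, event.tool_name)
            summary.append({
                "tool": name,
                "execution_time": event.execution_time,
                "ok": not event.is_error,
            })
    return summary


events = [
    StubToolCall("calculate", "call_1"),
    StubToolResult("calculate", "call_1", 0.02, False),
]
tool_summary = summarize_tools(events)
print(tool_summary)
```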

#### LLM Stream Events

| Event                | Description                              | Key Attributes                                          |
| -------------------- | ---------------------------------------- | ------------------------------------------------------- |
| `TextDeltaEvent`     | Text chunk during streaming              | `content`, `accumulated_content`, `part_index`          |
| `TextCompleteEvent`  | Text streaming complete                  | `content`, `part_index`                                 |
| `ThinkingDeltaEvent` | Reasoning content (for supported models) | `content`, `part_index`                                 |
| `ToolCallDeltaEvent` | Tool call arguments streaming            | `tool_name`, `tool_call_id`, `args_delta`, `part_index` |
| `FinalOutputEvent`   | Final output ready                       | `output`, `output_type`                                 |

#### Initialization & Model Events

| Event                    | Description                             | Key Attributes                                                                  |
| ------------------------ | --------------------------------------- | ------------------------------------------------------------------------------- |
| `AgentInitializedEvent`  | Agent initialized for execution         | `agent_id`, `is_streaming`                                                      |
| `StorageConnectionEvent` | Storage connection established          | `storage_type`, `is_connected`, `has_memory`, `session_id`                      |
| `LLMPreparedEvent`       | LLM manager prepared                    | `default_model`, `requested_model`, `model_changed`                             |
| `ModelSelectedEvent`     | Model selected for execution            | `model_name`, `provider`, `is_override`                                         |
| `ToolsConfiguredEvent`   | Tools configured for the task           | `tool_count`, `tool_names`, `has_mcp_handlers`                                  |
| `MessagesBuiltEvent`     | Request messages built                  | `message_count`, `has_system_prompt`, `has_memory_messages`, `is_continuation`  |
| `ModelRequestStartEvent` | Model request starting                  | `model_name`, `is_streaming`, `has_tools`, `tool_call_count`, `tool_call_limit` |
| `ModelResponseEvent`     | Model response received (non-streaming) | `model_name`, `has_text`, `has_tool_calls`, `tool_call_count`, `finish_reason`  |

#### Context & Input Build Events

These events fire once per run as the agent assembles the request sent to the model. They are useful for inspecting what the model actually receives (history size, prompt length, attached media).

| Event                    | Description                                  | Key Attributes                                              |
| ------------------------ | -------------------------------------------- | ----------------------------------------------------------- |
| `MemoryPreparedEvent`    | Memory manager preparation finished          | `memory_enabled`, `history_count`                           |
| `ChatHistoryLoadedEvent` | Chat history loaded; run boundary marked     | `history_count`                                             |
| `SystemPromptBuiltEvent` | System prompt assembled                      | `prompt_length`, `has_culture`, `has_skills`                |
| `ContextBuiltEvent`      | Task context assembled (RAG + prior outputs) | `context_length`, `has_knowledge_base`, `has_prior_outputs` |
| `UserInputBuiltEvent`    | User input assembled (text + media)          | `input_type`, `has_images`, `has_documents`, `input_length` |

#### Cache Events

| Event              | Description                         | Key Attributes                                                              |
| ------------------ | ----------------------------------- | --------------------------------------------------------------------------- |
| `CacheCheckEvent`  | Cache checked for existing response | `cache_enabled`, `cache_method`, `cache_hit`, `similarity`, `input_preview` |
| `CacheHitEvent`    | Cache hit occurred                  | `cache_method`, `similarity`, `cached_response_preview`                     |
| `CacheMissEvent`   | Cache miss occurred                 | `cache_method`, `reason`                                                    |
| `CacheStoredEvent` | Response stored in cache            | `cache_method`, `duration_minutes`                                          |

#### Policy Events

| Event                 | Description                 | Key Attributes                                                                     |
| --------------------- | --------------------------- | ---------------------------------------------------------------------------------- |
| `PolicyCheckEvent`    | Policy validation performed | `policy_type`, `action`, `policies_checked`, `content_modified`, `blocked_reason`  |
| `PolicyFeedbackEvent` | Policy feedback for retry   | `policy_type`, `feedback_message`, `retry_count`, `max_retries`, `violated_policy` |

#### Memory, Reflection & Reliability Events

| Event                    | Description                   | Key Attributes                                                                      |
| ------------------------ | ----------------------------- | ----------------------------------------------------------------------------------- |
| `MemoryUpdateEvent`      | Memory updated                | `messages_added`, `memory_type`                                                     |
| `ReflectionEvent`        | Reflection processing applied | `reflection_applied`, `improvement_made`, `original_preview`, `improved_preview`    |
| `ReliabilityEvent`       | Reliability layer processing  | `reliability_applied`, `modifications_made`                                         |
| `ExecutionCompleteEvent` | Execution complete            | `output_type`, `has_output`, `output_preview`, `total_tool_calls`, `total_duration` |

#### Run Lifecycle Events

| Event               | Description                        | Key Attributes                        |
| ------------------- | ---------------------------------- | ------------------------------------- |
| `RunStartedEvent`   | Run started                        | `agent_id`, `task_description`        |
| `RunCompletedEvent` | Run completed successfully         | `agent_id`, `output_preview`          |
| `RunPausedEvent`    | Run paused (for HITL requirements) | `reason`, `requirements`, `step_name` |
| `RunCancelledEvent` | Run cancelled                      | `message`, `step_name`                |

### Common Attributes

All events inherit from `AgentEvent` and share these base attributes:

| Attribute    | Type            | Description                                                    |
| ------------ | --------------- | -------------------------------------------------------------- |
| `event_id`   | `str`           | Unique identifier (8 chars)                                    |
| `run_id`     | `Optional[str]` | The agent run ID this event belongs to (matches `Agent.run_id`) |
| `timestamp`  | `datetime`      | When the event occurred                                        |
| `event_type` | `str`           | Class name of the event (property, not a field)                |
| `event_kind` | `str`           | Event category identifier                                      |
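The shared attributes make generic bookkeeping possible without knowing each event class. The sketch below counts events per `event_type` for one `run_id`. `StubEvent` and its subclasses are hypothetical stand-ins that mirror only the `run_id` field and the `event_type` property described in the table, and `count_by_type` is an illustrative helper, not an Upsonic API.

```python
from collections import Counter
from dataclasses import dataclass
from typing import Iterable, Optional


@dataclass
class StubEvent:
    """Stand-in base event; NOT the Upsonic AgentEvent class."""
    run_id: Optional[str] = None

    @property
    def event_type(self) -> str:
        # Mirrors the table: the class name, exposed as a property
        return type(self).__name__


class StubStepStart(StubEvent):
    pass


class StubStepEnd(StubEvent):
    pass


def count_by_type(events: Iterable[StubEvent], run_id: str) -> Counter:
    """Count events of each type that belong to the given run."""
    return Counter(e.event_type for e in events if e.run_id == run_id)


events = [
    StubStepStart(run_id="run-a"),
    StubStepEnd(run_id="run-a"),
    StubStepStart(run_id="run-b"),
    StubStepStart(run_id="run-a"),
]
counts = count_by_type(events, "run-a")
print(counts)
```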

### Synchronous Event Streaming

For synchronous code, use `stream()` with `events=True`:

```python theme={null}
from upsonic import Agent, Task
from upsonic.run.events.events import TextDeltaEvent, PipelineStartEvent, PipelineEndEvent

agent = Agent("anthropic/claude-sonnet-4-5")
task = Task("Explain AI briefly")

# Stream events synchronously
for event in agent.stream(task, events=True):
    if isinstance(event, PipelineStartEvent):
        print(f"Starting pipeline with {event.total_steps} steps")
    elif isinstance(event, TextDeltaEvent):
        print(event.content, end='', flush=True)
    elif isinstance(event, PipelineEndEvent):
        print(f"\nCompleted in {event.total_duration:.2f}s")

# After streaming completes, access the final output
# Option 1: From the task (most direct)
final_output = task.response
print(f"\nFinal (from task.response): {final_output}")

# Option 2: From the agent's run output (includes additional metadata)
run_output = agent.get_run_output()
if run_output:
    print(f"Final (from agent.get_run_output().output): {run_output.output}")
    # You can also access other metadata:
    # - run_output.usage (token usage)
    # - run_output.messages (all messages)
    # - run_output.tools (tool executions)
    # - run_output.execution_stats (execution statistics)
```

**Note:** The `stream()` method returns an iterator that yields either:

* `str` chunks when `events=False` (default) - for text streaming
* `AgentStreamEvent` objects when `events=True` - for event streaming

For async streaming, use `astream()` with the same `events` parameter.

## Tool Management

Tools can be added to agents during initialization or dynamically using `add_tools()`. Use `get_tool_defs()` to retrieve all registered tool definitions.

```python theme={null}
from upsonic import Agent
from upsonic.tools import tool

@tool
def calculate(x: int, y: int) -> int:
    """Add two numbers."""
    return x + y

@tool
def another_tool() -> str:
    """Another tool."""
    return "Another tool"

# Add tools during initialization
agent = Agent("anthropic/claude-sonnet-4-5", tools=[calculate])

# Or add tools after initialization
agent.add_tools([another_tool])

# Get all registered tool definitions
tool_defs = agent.get_tool_defs()

print(tool_defs)
```
