Metadata-Version: 2.4
Name: kivot-agents
Version: 0.1.0
Summary: Kivot agent execution: canonical events, budget tracking, durable execution
Project-URL: Documentation, https://github.com/kivot-platform/kivot/tree/main/docs
Project-URL: Repository, https://github.com/kivot-platform/kivot
Requires-Python: >=3.11
Description-Content-Type: text/markdown
Requires-Dist: openai-agents>=0.0.16
Requires-Dist: aiohttp>=3.9.0
Provides-Extra: durable
Requires-Dist: temporalio>=1.7.0; extra == "durable"
Provides-Extra: all
Requires-Dist: kivot-agents[durable]; extra == "all"
Provides-Extra: dev
Requires-Dist: pytest>=7.0.0; extra == "dev"
Requires-Dist: pytest-asyncio>=0.21.0; extra == "dev"
Requires-Dist: pytest-cov>=4.0.0; extra == "dev"
Requires-Dist: mypy>=1.0.0; extra == "dev"
Requires-Dist: ruff>=0.1.0; extra == "dev"
Provides-Extra: examples

# Agent Workflows (Python)

Python Temporal worker for agent execution using **pure OpenAI Agents SDK** with KivotRunner.

## Architecture

```
Client → Go HTTP API → Python AgentWorkflow (Temporal) → Go Activities (Model/Tool) → Events (Kafka)
```

- **Python AgentWorkflow**: Workflow using pure OpenAI Agents SDK
- **KivotRunner**: Drop-in replacement for official Runner, routes to Go activities
- **Go Activities**: All I/O operations (model calls, tool calls, event emission)
- **Cross-language**: Python workflow calls Go activities via Temporal

## Project Structure

```
openai-agents/
├── kivot-agents/              # KivotRunner + Plugin implementation
│   ├── __init__.py
│   ├── runner.py                 # Main runner logic
│   ├── plugin.py                 # OpenAI Agents Plugin (NEW!)
│   ├── overrides.py              # Runtime configuration context (NEW!)
│   ├── types.py                  # AgentResult type
│   ├── runner_test.py            # Unit tests (14 passing)
│   └── plugin_test.py            # Plugin tests (7 passing, NEW!)
├── examples/
│   ├── README.md                 # Examples guide
│   ├── plugin_example.py         # Example: Streaming Path
│   ├── result_only_example.py    # Example: Result Path
│   └── both_paths_example.py     # Example: Using both paths
├── docs/
│   └── streaming-vs-result.md    # Guide: Two data paths (NEW!)
├── worker.py                     # Temporal worker with plugin
├── requirements.txt
├── pyproject.toml                # Package config with pytest, mypy
├── TODO.md                       # Known issues and future improvements
└── README.md
```

## Quick Start

### 1. Write Agent Workflow (Pure OpenAI SDK!)

```python
from agents import Agent
from kivot-agents import KivotRunner
from temporalio import workflow

@workflow.defn
class MyAgent:
    @workflow.run
    async def run(self, prompt: str) -> str:
        # Pure OpenAI Agents SDK code!
        agent = Agent(
            name="Assistant",
            instructions="You are helpful.",
            model="gpt-4o-mini",
        )

        result = await KivotRunner.run(agent, input=prompt)
        return result.final_output
```

That's it! No Kivot-specific wrappers.

### 2. Development Setup

#### Prerequisites

- Python 3.11+
- Temporal server running (via docker-compose)
- Go worker running (for activities)

#### Install Dependencies

```bash
cd openai-agents
python -m venv venv
source venv/bin/activate  # or `venv\Scripts\activate` on Windows
pip install -r requirements.txt
```

#### Run Example Worker

Each example includes its own worker mode:

```bash
# Option 1: Streaming example (interactive UI)
python examples/plugin_example.py worker

# Option 2: Result example (tests/API)
python examples/result_only_example.py worker

# Option 3: Both paths example (production)
python examples/both_paths_example.py worker
```

The worker will:
1. Connect to Temporal at `localhost:7233` (configurable via `TEMPORAL_HOST`)
2. Listen on task queue `agent-workflows`
3. Execute agent workflows started by Go HTTP API

### 3. Test Your Agent

Start a run:

```bash
curl -X POST http://localhost:8080/v1/runs/start \
  -H 'Content-Type: application/json' \
  -d '{
    "agent_id": "demo-agent",
    "input": "Hello!"
  }'
```

Stream events:

```bash
wscat -c "ws://localhost:8080/v1/runs/stream?run_id=<run_id>"
```

Check Temporal UI:

```bash
open http://localhost:8088
```

## How KivotRunner Works

```
Developer Code (Pure OpenAI SDK)
    Agent(...) + KivotRunner.run(agent, input)
        ↓
KivotRunner (Kivot Integration)
    Agent loop: model call → tool calls → model call → completion
        ↓ ModelStepActivity (Go)
        ↓ ToolStepActivity (Go)
            ↓
        ┌───────────────────────────────┐
        │   TWO INDEPENDENT PATHS:      │
        ├───────────────────────────────┤
        │ PATH 1: Events → Kafka →      │
        │         WebSocket (Streaming) │
        │                               │
        │ PATH 2: Result → Workflow     │
        │         Return (Final Answer) │
        └───────────────────────────────┘
```

**Inside workflow**: Routes to Go activities for durable execution

**Outside workflow**: Falls back to official Runner (perfect for local testing)

## Two Independent Data Paths

Kivot provides **two ways** to get data from agent workflows:

### Path 1: Streaming (Kafka → WebSocket)
**Purpose:** Real-time events for interactive UX

```python
# Stream events as they happen
async with session.ws_connect(ws_url) as ws:
    async for event in ws:
        if event["type"] == "model.delta":
            print(event["delta"]["content"], end="")  # Typing effect!
```

**Use cases:**
- Web UI with typing effect
- CLI with progress indicators
- Real-time monitoring

### Path 2: Result (Temporal Workflow)
**Purpose:** Final answer for programmatic access

```python
# Get final result
result = await client.execute_workflow(
    SimpleAgent.run,
    "What is 2+2?",
    id="calc-123",
    task_queue="agent-workflows"
)
print(result)  # "The answer is 4"
```

**Use cases:**
- Unit tests
- API integrations
- Batch processing

### Both Paths Work Simultaneously!

```python
# Start workflow (PATH 2)
handle = await client.start_workflow(Agent.run, question)

# Stream events (PATH 1) in background
asyncio.create_task(stream_events(handle.id))

# Await final result (PATH 2)
result = await handle.result()
```

**📖 Full guide:** [`docs/streaming-vs-result.md`](docs/streaming-vs-result.md)

## Agent Loop Example

### Simple Agent (No Tools)

```python
result = await KivotRunner.run(agent, input="Hello")

# KivotRunner internally:
# 1. Call Go ModelStepActivity
# 2. Return final output
```

### Agent with Tools

```python
result = await KivotRunner.run(agent, input="Search logs for errors")

# KivotRunner agent loop:
# Iteration 1:
#   1. Call Go ModelStepActivity (analyze request)
#   2. Detect tool_calls in response
#   3. For each tool call:
#      - Call Go ToolStepActivity
#      - Add tool result to message history
#
# Iteration 2:
#   4. Call Go ModelStepActivity (with tool results)
#   5. Return final output (no more tool_calls)
```

## Testing

### Unit Tests

Run all tests:

```bash
cd openai-agents
pytest kivot-agents/ -v
```

**Test results**: 21 passed, 1 skipped

#### Runner Tests (`runner_test.py`)

```bash
pytest kivot-agents/runner_test.py -v
```

**Coverage**:
- Message preparation (string, dict, messages format)
- Tool formatting (dict, callable, to_dict method)
- Agent loop (simple completion, tool calls, max iterations)

**Results**: 14 passed, 1 skipped

#### Plugin Tests (`plugin_test.py`)

```bash
pytest kivot-agents/plugin_test.py -v
```

**Coverage**:
- Plugin initialization with defaults
- Custom parameters configuration
- Context manager (set_kivot_agent_overrides)
- Nested contexts
- Task queue and policy refs

**Results**: 7 passed

### Run All Tests

```bash
# All tests
pytest kivot-agents/ -v

# Or just use pytest (configured in pyproject.toml)
pytest
```

## Examples

We provide **3 comprehensive examples** demonstrating different data access patterns:

| Example | Focus | Use Case |
|---------|-------|----------|
| **`plugin_example.py`** | 🌊 Streaming Path | Interactive UI with real-time events |
| **`result_only_example.py`** | 📦 Result Path | Tests, API, batch processing |
| **`both_paths_example.py`** | 🔀 Both Paths | Production APIs with streaming + storage |

### Quick Start: Plugin Example (Streaming)

**File**: `examples/plugin_example.py`

```python
# Worker setup with plugin
plugin = OpenAIAgentsKivotPlugin(
    model_params=ActivityParameters(
        start_to_close_timeout=timedelta(seconds=90)
    )
)
client = await Client.connect("localhost:7233", plugins=[plugin])
worker = Worker(client, task_queue="agent-workflows", workflows=[...])

# Run and stream events
python examples/plugin_example.py worker   # Terminal 1
python examples/plugin_example.py client   # Terminal 2 - see streaming!
```

### Quick Start: Result Only (Programmatic)

**File**: `examples/result_only_example.py`

```python
# Simple await - no streaming complexity
result = await client.execute_workflow(
    Agent.run,
    "What is 2+2?",
    id="calc-123",
    task_queue="agent-workflows"
)
print(result)  # "The answer is 4"

# Run examples
python examples/result_only_example.py worker     # Terminal 1
python examples/result_only_example.py examples   # Terminal 2
```

### Quick Start: Both Paths (Production)

**File**: `examples/both_paths_example.py`

```python
# Start workflow
handle = await client.start_workflow(Agent.run, question)

# Stream events in background
asyncio.create_task(stream_events(handle.id))

# Await final result
result = await handle.result()

# Run demo
python examples/both_paths_example.py worker  # Terminal 1
python examples/both_paths_example.py demo    # Terminal 2
```

**📖 Full examples guide:** [`examples/README.md`](examples/README.md)

## Environment Variables

- `TEMPORAL_HOST`: Temporal server address (default: `localhost:7233`)
- `TASK_QUEUE`: Task queue name (default: `openai-agents`)

## Implementation Status

### ✅ Phase 1-2: KivotRunner Implementation (Complete)

- [x] Package structure created (`kivot-agents/`)
- [x] `KivotRunner` class with agent loop
- [x] Helper functions for messages and tools
- [x] Type hints and error handling
- [x] Comprehensive unit tests (14 passing)
- [x] Falls back to official Runner outside workflow

### ✅ Phase 3: Examples & Documentation (Complete)

- [x] Simple agent example (`examples/simple_agent.py`)
- [x] Ops agent with tools (`examples/ops_agent.py`)
- [x] Documentation updated (`docs/AGENT_WORKFLOWS.md`)
- [x] Migration guide from old approach
- [x] Worker updated to use new examples
- [x] Legacy workflows removed

### ✅ Phase 4: Plugin Integration (Complete)

- [x] `OpenAIAgentsKivotPlugin` class (`plugin.py`)
- [x] Runtime configuration context (`overrides.py`)
- [x] Plugin unit tests (7 passing)
- [x] Worker integration with plugin
- [x] Example: Streaming Path (`plugin_example.py`)
- [x] Example: Result Path (`result_only_example.py`)
- [x] Example: Both Paths (`both_paths_example.py`)
- [x] Documentation: Two data paths (`docs/streaming-vs-result.md`)
- [x] Examples guide (`examples/README.md`)

### 🔧 Phase 5: Integration & Testing (Next)

- [ ] Integration tests with actual Temporal workflows
- [ ] E2E tests with Go activities
- [ ] Performance testing
- [ ] Error scenario testing

## Why This Approach?

### ✅ CORRECT Developer Experience

```python
from agents import Agent
from kivot-agents import KivotRunner

agent = Agent(name="Assistant", instructions="You are helpful.")
result = await KivotRunner.run(agent, input=prompt)
```

**Benefits**:
- Pure OpenAI SDK code
- Compatible with official Temporal integration examples
- Easy to port from https://temporal.io/blog/announcing-openai-agents-sdk-integration
- No Kivot-specific wrappers in developer code
- Works locally without Temporal (falls back to official Runner)

### ❌ WRONG (Old Approach)

```python
# Kivot-specific code
result = await self._call_model(agent_config, user_question)
```

**Problems**:
- Not compatible with OpenAI SDK
- Can't use official examples
- Higher learning curve
- Kivot lock-in

## References

- **Developer Guide**: `../docs/AGENT_WORKFLOWS.md`
- **Integration Plan**: `../docs/agent-service-integration-plan.md`
- **Integration Approach**: `../docs/INTEGRATION-APPROACH.md`
- **Temporal + OpenAI**: https://temporal.io/blog/announcing-openai-agents-sdk-integration
- **Official Examples**: https://github.com/temporalio/samples-python/tree/main/openai_agents
- **Temporal Python SDK**: https://docs.temporal.io/develop/python
- **OpenAI Agents SDK**: https://github.com/openai/openai-agents-python
