Metadata-Version: 2.4
Name: agentradar
Version: 1.0.4
Summary: Local-first visual debugger for multi-agent AI systems
Author: Yossef Ayman Zedan
License: MIT
License-File: LICENSE
Keywords: agents,autogen,crewai,debugging,langgraph,llm,observability
Classifier: Development Status :: 4 - Beta
Classifier: Intended Audience :: Developers
Classifier: License :: OSI Approved :: MIT License
Classifier: Programming Language :: Python :: 3
Classifier: Topic :: Software Development :: Debuggers
Requires-Python: >=3.9
Requires-Dist: aiofiles>=23.0.0
Requires-Dist: click>=8.0.0
Requires-Dist: fastapi>=0.110.0
Requires-Dist: pydantic>=2.0.0
Requires-Dist: python-multipart>=0.0.9
Requires-Dist: rich>=13.0.0
Requires-Dist: uvicorn[standard]>=0.27.0
Requires-Dist: websockets>=12.0
Description-Content-Type: text/markdown

# agentradar

**Local-first visual debugger for multi-agent AI systems.**

[![License: MIT](https://img.shields.io/badge/License-MIT-yellow.svg)](https://opensource.org/licenses/MIT)

Drop one import into any Python AI pipeline and get a live dashboard - graph, tokens, time travel, failures - no cloud, no setup.

---

## Demo

![agentradar dashboard demo](https://raw.githubusercontent.com/yossefaymanzedan/agentradar/main/assets/demo.gif)

---

## Dashboard views

| Live Graph | Tokens |
|:---:|:---:|
| ![Live Graph](https://raw.githubusercontent.com/yossefaymanzedan/agentradar/main/assets/screenshot_graph.png) | ![Tokens](https://raw.githubusercontent.com/yossefaymanzedan/agentradar/main/assets/screenshot_tokens.png) |

| Time Travel | Failures |
|:---:|:---:|
| ![Time Travel](https://raw.githubusercontent.com/yossefaymanzedan/agentradar/main/assets/screenshot_timetravel.png) | ![Failures](https://raw.githubusercontent.com/yossefaymanzedan/agentradar/main/assets/screenshot_failures.png) |

| Tab | What you see |
|-----|-------------|
| **Live Graph** | DAG of agents as they run - nodes light up green/red in real time |
| **State** | Every step's input/output with diff vs previous step |
| **Time Travel** | Scrub back through the run like a video timeline |
| **Tokens** | Bar chart per agent + cumulative line + donut breakdown |
| **Failures** | Auto-saved error runs with full traceback, updated live |

---

## Install

### Via pip

```bash
pip install agentradar
```

### Via git clone

```bash
git clone https://github.com/yossefaymanzedan/agentradar.git
cd agentradar
pip install -e .
```

---

## Quickstart

```python
import agentradar   # server starts on :7111, browser opens automatically
```

---

## The `@radar` decorator

For custom pipelines (no framework), use `@radar` for explicit tracing:

```python
from agentradar import radar, reset_run

@radar(tag="researcher")
def researcher(topic: str) -> str:
    ...

@radar(tag="writer", llm_input=["researcher"])
def writer(notes: str) -> str:
    ...

reset_run()
notes = researcher("quantum computing")
article = writer(notes)
```

`llm_input=["researcher"]` on `writer` draws the edge `researcher -> writer` in the graph.
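
To make the tag-to-edge relationship concrete, here is a minimal standard-library sketch of how a decorator could record upstream tags as graph edges. It is not agentradar's implementation: the `traced` decorator, the `EDGES` registry, and `edge_list()` are hypothetical names used only for illustration.

```python
from functools import wraps

# Hypothetical registry: maps each tag to the upstream tags it declared.
EDGES = {}

def traced(tag, llm_input=None):
    """Illustrative stand-in for a tracing decorator (not agentradar's code)."""
    def decorate(func):
        # Register the node even when it has no parents.
        EDGES.setdefault(tag, []).extend(llm_input or [])

        @wraps(func)
        def wrapper(*args, **kwargs):
            return func(*args, **kwargs)
        return wrapper
    return decorate

@traced(tag="researcher")
def researcher(topic):
    return f"notes on {topic}"

@traced(tag="writer", llm_input=["researcher"])
def writer(notes):
    return f"article from {notes}"

def edge_list():
    """Return (parent, child) pairs in declaration order."""
    return [(parent, child) for child, parents in EDGES.items() for parent in parents]

print(edge_list())
```

In this sketch the edges exist as soon as the functions are decorated, before any agent runs, so a graph could be laid out from the declarations alone.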

---

## Pipeline patterns

### Sequential chain

Each agent declares its upstream with `llm_input=["previous_tag"]`:

```python
@radar(tag="researcher")
def researcher(topic): ...

@radar(tag="writer", llm_input=["researcher"])
def writer(notes): ...

@radar(tag="editor", llm_input=["writer"])
def editor(draft): ...
```

Graph: `researcher -> writer -> editor`

---

### Fan-out (parallel agents)

Define a coordinator with no `llm_input`. Children declare `llm_input=["coordinator_tag"]`.
The library **auto-spawns all children in parallel** - no `asyncio.gather` needed:

```python
@radar(tag="coordinator")
async def coordinator(query: str):
    pass  # library spawns children, returns [fetch_a result, fetch_b result]

@radar(tag="fetch_a", llm_input=["coordinator"])
async def fetch_a(query: str) -> str: ...

@radar(tag="fetch_b", llm_input=["coordinator"])
async def fetch_b(query: str) -> str: ...
```

```python
results = await coordinator(query)  # -> [fetch_a result, fetch_b result]
```

Graph: `coordinator -> fetch_a, fetch_b` (parallel band)
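
For comparison, this is roughly the hand-written fan-out that auto-spawning replaces. It is a plain `asyncio` sketch with no agentradar involved; the function bodies are placeholders standing in for real LLM or network calls.

```python
import asyncio

async def fetch_a(query: str) -> str:
    await asyncio.sleep(0.01)  # stand-in for an LLM or network call
    return f"a:{query}"

async def fetch_b(query: str) -> str:
    await asyncio.sleep(0.01)  # stand-in for an LLM or network call
    return f"b:{query}"

async def coordinator(query: str) -> list:
    # gather runs both coroutines concurrently and returns results
    # in the same order as its arguments.
    return await asyncio.gather(fetch_a(query), fetch_b(query))

results = asyncio.run(coordinator("quantum computing"))
print(results)
```

The result order follows the argument order passed to `asyncio.gather`, not the order in which the children finish.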

---

### Fan-in (merge point)

Declare multiple parents - edges are drawn from all of them:

```python
@radar(tag="aggregator", llm_input=["fetch_a", "fetch_b", "fetch_c"])
async def aggregator(results: list) -> str: ...
```

Graph: `fetch_a, fetch_b, fetch_c -> aggregator`

---

### Full fan-out + fan-in

```python
from agentradar import radar, reset_run

@radar(tag="coordinator")
async def coordinator(query): pass

@radar(tag="fetch_a", llm_input=["coordinator"])
async def fetch_a(query): ...

@radar(tag="fetch_b", llm_input=["coordinator"])
async def fetch_b(query): ...

@radar(tag="aggregator", llm_input=["fetch_a", "fetch_b"])
async def aggregator(results): ...

async def main(query):
    reset_run()                          # start a fresh run
    results = await coordinator(query)   # auto-spawns fetch_a + fetch_b
    summary = await aggregator(results)  # merge point, draws edges from both
    return summary
```

---

## `@radar` decorator parameters

| Parameter | Type | Description |
|-----------|------|-------------|
| `tag` | `str` | Identifier for this agent. Used to draw edges and auto-spawn children. |
| `llm_input` | `list[str]` | Upstream tags. Draws an edge from each listed parent to this agent. An agent with exactly one parent is an auto-spawn candidate. |
| `notes` | `str` | Freeform annotation shown in the dashboard inspector panel. |

---

## Token tracking

agentradar automatically captures token usage from **LangChain** (`BaseChatModel.invoke` / `ainvoke`) with no extra code. Prompt tokens, completion tokens, and total are shown per agent in the Tokens tab.

For other libraries, return a dict with a `tokens` key:

```python
@radar(tag="my_agent")
def my_agent(prompt):
    response = my_llm.call(prompt)
    return {
        "content": response.text,
        "tokens": {"prompt": 120, "completion": 80, "total": 200}
    }
```
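
If several agents report tokens by hand, a small helper keeps the return shape consistent. The sketch below assumes only the dict layout shown above; `with_tokens` is a hypothetical name, not part of agentradar, and computing `total` as the sum of the two counts is an assumption about how you want to report it.

```python
def with_tokens(content: str, prompt: int, completion: int) -> dict:
    """Build a return value in the shape shown above (hypothetical helper)."""
    return {
        "content": content,
        "tokens": {
            "prompt": prompt,
            "completion": completion,
            "total": prompt + completion,  # assumed: total is the plain sum
        },
    }

result = with_tokens("hello", prompt=120, completion=80)
print(result["tokens"])
```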

---

## Multi-run across a session

Call `reset_run()` before each pipeline execution to start a fresh run:

```python
from agentradar import reset_run

reset_run()
result = await my_pipeline(user_input)  # run this inside an async function
```

Without `reset_run()`, all agent calls accumulate into the same run.

---

## Configuration

```python
import agentradar
agentradar.config(
    port=7111,           # dashboard port (auto-increments if taken)
    open_browser=True,   # open browser on start
)
```

Or suppress server start entirely (for CI/tests):

```bash
AGENT_TRACE_NO_SERVER=1 python your_script.py
```
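
The "auto-increments if taken" behaviour of `port` can be pictured as a probe loop over successive ports. The following is a standard-library sketch of that idea, not agentradar's actual code; `find_free_port`, its `attempts` limit, and the loopback host are assumptions made for illustration.

```python
import socket

def find_free_port(start: int = 7111, attempts: int = 20,
                   host: str = "127.0.0.1") -> int:
    """Return the first port >= start that can be bound on host."""
    for port in range(start, start + attempts):
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
            try:
                sock.bind((host, port))
            except OSError:
                continue  # port is taken, try the next one
            return port
    raise RuntimeError(f"no free port in {start}..{start + attempts - 1}")

print(find_free_port())
```

A probe like this is inherently racy: another process can claim the port between the check and the real bind, so a server would still need to handle a bind failure.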

---

## Examples

| File | Pattern |
|------|---------|
| `examples/01_single_agent.py` | Single agent baseline |
| `examples/02_sequential_chain.py` | 4-step linear chain (research -> write -> edit -> publish) |
| `examples/03_parallel_agents.py` | Fan-out coordinator + fan-in aggregator |
| `examples/04_error_handling.py` | Error capture, fallback chains, failure history |
| `examples/05_nested_agents.py` | Deep nested agent calls |
| `examples/06_rag_pipeline.py` | RAG: 3 parallel retrievers -> reranker -> answer |
| `examples/07_code_review_crew.py` | 3 specialist reviewers in parallel |
| `examples/08_debate_agents.py` | Multi-round debate with judge |
| `examples/09_self_healing_pipeline.py` | Retry loop with fallback |

---

## How it works

1. `import agentradar` starts a FastAPI + WebSocket server on a background thread
2. The `@radar` decorator emits `agent_start` / `agent_end` / `agent_error` events into a thread-safe queue
3. The server drains the queue and broadcasts events over WebSocket to the browser
4. The React dashboard renders the live DAG, state diffs, token charts, and failure history in real time
5. All runs are stored as `.agtrace` files in `~/.agentradar/` for offline replay

No cloud. No API keys. No data leaves your machine.
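
Steps 2 and 3 above can be sketched with the standard library alone. This is an illustration of the pattern under stated assumptions, not agentradar's source: `emit_events`, `EVENTS`, `drain`, and the event field names other than the three event types are hypothetical.

```python
import queue
import time
from functools import wraps

EVENTS = queue.Queue()  # thread-safe; a server thread would drain this

def emit_events(tag):
    """Illustrative decorator that emits start/end/error events for a call."""
    def decorate(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            EVENTS.put({"type": "agent_start", "tag": tag, "ts": time.time()})
            try:
                result = func(*args, **kwargs)
            except Exception as exc:
                EVENTS.put({"type": "agent_error", "tag": tag, "error": repr(exc)})
                raise  # the caller still sees the original exception
            EVENTS.put({"type": "agent_end", "tag": tag, "output": result})
            return result
        return wrapper
    return decorate

def drain():
    """Pull every queued event without blocking, as a broadcast loop might."""
    drained = []
    while True:
        try:
            drained.append(EVENTS.get_nowait())
        except queue.Empty:
            return drained

@emit_events("researcher")
def researcher(topic):
    return f"notes on {topic}"

@emit_events("broken")
def broken():
    raise ValueError("boom")

researcher("quantum computing")
try:
    broken()
except ValueError:
    pass

event_types = [(e["type"], e["tag"]) for e in drain()]
print(event_types)
```

Because `queue.Queue` is safe to use across threads, agent code can enqueue events from any thread while a single consumer forwards them to connected clients.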

---

## Contributing

Contributions are welcome. Please open an issue first to discuss what you'd like to change.

```bash
git clone https://github.com/yossefaymanzedan/agentradar.git
cd agentradar
pip install -e ".[dev]"
```

---

## License

[MIT](LICENSE) - Yossef Ayman Zedan
