# Argentic Framework - AI Agent Development Guide

## Framework Overview

Argentic is a Python microframework for building local AI agents with async MQTT messaging.

**Key Features:**
- Modular async architecture with Agent, Messager, ToolManager, Supervisor
- Multi-LLM support: Ollama, Llama.cpp, Google Gemini
- MQTT-based inter-component communication (agents, tools, clients)
- Single-agent and multi-agent (Supervisor) patterns
- Dynamic tool registration via messaging protocol
- Python 3.11+ with async/await throughout

**Use Cases:**
- Single agent with tools (RAG, file ops, APIs)
- Multi-agent systems with specialized workers (researcher, analyst, coder)
- Distributed tool services communicating via MQTT
- Long-running agent processes with graceful shutdown

**Installation:**
```bash
pip install argentic
# or from source
pip install -e .
```

## Core Components

### Agent (`argentic.core.agent.Agent`)
Main AI agent class that orchestrates LLM, messaging, and tools.

**Key Parameters:**
```python
Agent(
    llm: ModelProvider,                    # LLM provider instance
    messager: Messager,                    # MQTT messaging instance
    tool_manager: ToolManager,             # Tool management (optional but recommended)
    role: str = "agent",                   # Agent identifier/role name
    system_prompt: str = None,             # Custom system prompt
    description: str = "",                 # Agent capability description
    expected_output_format: Literal["json", "text", "code"] = "json",
    task_handling: Literal["direct", "llm"] = "llm",
    enable_dialogue_logging: bool = False, # Debug logging
    # Topic configuration for MQTT
    register_topic: str = "agent/tools/register",
    tool_call_topic_base: str = "agent/tools/call",
    tool_response_topic_base: str = "agent/tools/response",
    status_topic: str = "agent/status/info",
    # Advanced
    graph_id: str = None,                  # For multi-agent graphs
    state_mode: AgentStateMode = STATEFUL, # Stateful/stateless LLM
    publish_to_supervisor: bool = True,
    publish_to_agent_topic: bool = True,
)
```

**Key Methods:**
```python
await agent.async_init()                  # Initialize (subscribe to topics)
response = await agent.query(question)    # Direct query
await agent.process_task(task)            # Process AgentTaskMessage
agent.print_dialogue_summary()            # Print debug logs
```

**Lifecycle:**
```python
# 1. Create
agent = Agent(llm, messager, tool_manager, role="assistant", system_prompt="...")
# 2. Init
await agent.async_init()
# 3. Use
result = await agent.query("question")
# 4. Cleanup
await messager.disconnect()
```

### Messager (`argentic.core.messager.Messager`)
MQTT-based async messaging for all components.

**Initialization:**
```python
Messager(
    broker_address: str = "localhost",
    port: int = 1883,
    client_id: str = "",                  # Auto-generated if empty
    username: str = None,
    password: str = None,
    keepalive: int = 60,
)
```

**Key Methods:**
```python
await messager.connect()
await messager.disconnect()
await messager.subscribe(topic, handler, message_cls)
await messager.publish(topic, message_object)
await messager.log(message, level="info")
```

**Message Protocol:**
All messages inherit from `BaseMessage` (Pydantic models):
- `AgentTaskMessage` - Task for agent
- `AgentTaskResultMessage` - Task result from agent
- `RegisterToolMessage` - Tool registration
- `ToolRegisteredMessage` - Registration confirmation
- `TaskMessage` - Tool execution request
- `TaskResultMessage` / `TaskErrorMessage` - Tool execution response

### ToolManager (`argentic.core.tools.ToolManager`)
Manages tool registration and execution via MQTT.

**Initialization:**
```python
ToolManager(
    messager: Messager,
    register_topic: str = "agent/tools/register",
    tool_call_topic_base: str = "agent/tools/call",
    tool_response_topic_base: str = "agent/tools/response",
    status_topic: str = "agent/status/info",
    default_timeout: int = 30,
)
```

**Key Methods:**
```python
await tool_manager.async_init()           # Subscribe to registration topic
tools_desc = tool_manager.get_tools_description()  # LLM-formatted tool list
result = await tool_manager.execute_tool(tool_name, arguments, timeout=30)
```

**Important:** For multi-agent systems, share ONE ToolManager instance across agents:
```python
tool_manager = ToolManager(messager)
await tool_manager.async_init()

agent1 = Agent(llm, messager, tool_manager, role="researcher")
agent2 = Agent(llm, messager, tool_manager, role="coder")  # Same tool_manager!
```

### LLM Providers (`argentic.core.llm.providers`)

**Factory Pattern:**
```python
from argentic.core.llm.llm_factory import LLMFactory

# From config dict
llm = LLMFactory.create_from_config(config_dict)

# Direct instantiation
from argentic.core.llm.providers.google_gemini import GoogleGeminiProvider
from argentic.core.llm.providers.ollama import OllamaProvider

llm = GoogleGeminiProvider(config={"google_gemini_model_name": "gemini-2.0-flash"})
llm = OllamaProvider(config={"ollama_model_name": "gemma3:12b-it-qat"})
```

**Supported Providers:**
- `google_gemini`: Google Gemini API (requires `GOOGLE_GEMINI_API_KEY`)
- `ollama`: Ollama local models
- `llama_cpp_server`: Llama.cpp HTTP server
- `llama_cpp_cli`: Llama.cpp CLI (direct process)

### Supervisor (`argentic.core.graph.supervisor.Supervisor`)
Multi-agent orchestrator with LLM-based routing.

**Initialization:**
```python
Supervisor(
    llm: ModelProvider,
    messager: Messager,
    role: str = "supervisor",
    system_prompt: str = None,            # Custom supervisor logic
    enable_dialogue_logging: bool = False,
)
```

**Usage:**
```python
supervisor = Supervisor(llm=llm, messager=messager)
supervisor.add_agent(researcher_agent)
supervisor.add_agent(coder_agent)
await supervisor.async_init()

# Start task with callback
def completion_callback(task_id, success, result, error):
    print(f"Task {task_id}: {'Success' if success else 'Failed'}")

task_id = await supervisor.start_task("Build a web app", completion_callback)
```

## Quick Start Patterns

### Pattern 1: Single Agent with Direct Query

```python
import asyncio
from argentic import Agent, Messager, LLMFactory
from argentic.core.tools import ToolManager
import yaml

async def main():
    # Load config
    with open("config.yaml") as f:
        config = yaml.safe_load(f)
    
    # Setup LLM
    llm = LLMFactory.create_from_config(config["llm"])
    
    # Setup Messager
    messager = Messager(
        broker_address=config["messaging"]["broker_address"],
        port=config["messaging"]["port"],
    )
    await messager.connect()
    
    # Setup ToolManager
    tool_manager = ToolManager(messager)
    await tool_manager.async_init()
    
    # Create Agent
    agent = Agent(
        llm=llm,
        messager=messager,
        tool_manager=tool_manager,
        role="assistant",
        system_prompt="You are a helpful AI assistant.",
        enable_dialogue_logging=True,  # For debugging
    )
    await agent.async_init()
    
    # Use agent
    response = await agent.query("What is the capital of France?")
    print(response)
    
    # Cleanup
    await messager.disconnect()

if __name__ == "__main__":
    asyncio.run(main())
```

### Pattern 2: Custom Tool Development

```python
from argentic.core.tools.tool_base import BaseTool
from argentic.core.messager.messager import Messager
from pydantic import BaseModel, Field
from typing import Any
import json

# 1. Define input schema with Pydantic
class WeatherInput(BaseModel):
    city: str = Field(description="City name")
    units: str = Field(default="celsius", description="Temperature units")

# 2. Create tool class
class WeatherTool(BaseTool):
    def __init__(self, messager: Messager):
        super().__init__(
            name="weather_tool",
            manual="Get current weather for a city. Provide 'city' name and optionally 'units' (celsius/fahrenheit).",
            api=json.dumps(WeatherInput.model_json_schema()),
            argument_schema=WeatherInput,
            messager=messager,
        )
    
    async def _execute(self, **kwargs) -> Any:
        """Implement tool logic. Called when agent invokes this tool."""
        city = kwargs.get("city")
        units = kwargs.get("units", "celsius")
        
        # Your implementation here
        # Example: API call, database query, file operation, etc.
        result = f"Weather in {city}: 22°{units[0].upper()}"
        
        return result

# 3. Register tool
async def setup_tool(messager):
    tool = WeatherTool(messager)
    await tool.register(
        registration_topic="agent/tools/register",
        status_topic="agent/status/info",
        call_topic_base="agent/tools/call",
        response_topic_base="agent/tools/response",
    )
    return tool

# 4. Use in agent context
async def main():
    messager = Messager(broker_address="localhost", port=1883)
    await messager.connect()
    
    tool_manager = ToolManager(messager)
    await tool_manager.async_init()
    
    # Register tool
    tool = await setup_tool(messager)
    await asyncio.sleep(1)  # Wait for registration
    
    # Create agent (will auto-discover tool)
    agent = Agent(llm, messager, tool_manager, role="assistant")
    await agent.async_init()
    
    # Agent can now use the tool
    response = await agent.query("What's the weather in London?")
```

### Pattern 3: Multi-Agent System with Supervisor

```python
from argentic.core.graph.supervisor import Supervisor
from argentic import Agent, Messager, LLMFactory
from argentic.core.tools import ToolManager
import asyncio

async def main():
    # Setup (same as single agent)
    llm = LLMFactory.create_from_config(config["llm"])
    messager = Messager(broker_address="localhost", port=1883)
    await messager.connect()
    
    tool_manager = ToolManager(messager)
    await tool_manager.async_init()
    
    # Create specialized agents
    researcher = Agent(
        llm=llm,
        messager=messager,
        tool_manager=tool_manager,
        role="researcher",
        description="Research and information gathering specialist",
        system_prompt="You are a researcher. Find and synthesize information.",
        expected_output_format="text",
        register_topic="agent/researcher/tools/register",  # Separate topics
        tool_call_topic_base="agent/researcher/tools/call",
        tool_response_topic_base="agent/researcher/tools/response",
        status_topic="agent/researcher/status/info",
        graph_id="multi_agent_system",
    )
    await researcher.async_init()
    
    coder = Agent(
        llm=llm,
        messager=messager,
        tool_manager=tool_manager,
        role="coder",
        description="Code writing and debugging specialist",
        system_prompt="You are a coder. Write clean, efficient code.",
        expected_output_format="code",
        register_topic="agent/coder/tools/register",
        tool_call_topic_base="agent/coder/tools/call",
        tool_response_topic_base="agent/coder/tools/response",
        status_topic="agent/coder/status/info",
        graph_id="multi_agent_system",
    )
    await coder.async_init()
    
    # Create supervisor
    supervisor = Supervisor(
        llm=llm,
        messager=messager,
        role="supervisor",
        system_prompt="You are a supervisor. Route tasks to appropriate agents.",
    )
    supervisor.add_agent(researcher)
    supervisor.add_agent(coder)
    await supervisor.async_init()
    
    # Execute workflow
    workflow_complete = asyncio.Event()
    result_data = {}
    
    def completion_callback(task_id, success, result="", error=""):
        result_data["success"] = success
        result_data["result"] = result
        result_data["error"] = error
        workflow_complete.set()
    
    task_id = await supervisor.start_task(
        "Research Python async patterns and write example code",
        completion_callback
    )
    
    await asyncio.wait_for(workflow_complete.wait(), timeout=180)
    
    if result_data["success"]:
        print(f"Success: {result_data['result']}")
    else:
        print(f"Failed: {result_data['error']}")
    
    await messager.disconnect()

if __name__ == "__main__":
    asyncio.run(main())
```

## Configuration

### config.yaml Structure

```yaml
llm:
  # Provider selection (one of: google_gemini, ollama, llama_cpp_server, llama_cpp_cli)
  provider: google_gemini
  
  # Google Gemini
  google_gemini_model_name: gemini-2.0-flash
  google_gemini_api_key: ${GOOGLE_GEMINI_API_KEY}  # Use env var
  google_gemini_parameters:
    temperature: 0.7
    top_p: 0.95
    top_k: 40
    max_output_tokens: 2048
  
  # Ollama
  ollama_model_name: llama3
  ollama_base_url: http://localhost:11434
  ollama_use_chat_model: true
  ollama_parameters:
    temperature: 0.7
    num_predict: 128
    top_p: 0.9
  
  # Llama.cpp Server
  llama_cpp_server_host: 127.0.0.1
  llama_cpp_server_port: 5000
  llama_cpp_server_auto_start: false
  llama_cpp_server_binary: ~/llama.cpp/build/bin/llama-server
  llama_cpp_server_args:
    - -m
    - ~/models/model.gguf
  
  # Llama.cpp CLI
  llama_cpp_cli_binary: ~/llama.cpp/build/bin/llama-cli
  llama_cpp_cli_model_path: ~/models/model.gguf
  llama_cpp_cli_args:
    - --temp
    - "0.7"

messaging:
  protocol: mqtt
  broker_address: localhost
  port: 1883
  keepalive: 60
  client_id: ""  # Auto-generated if empty
  username: null
  password: null

topics:
  # Tool communication
  tools:
    register: "agent/tools/register"
    call: "agent/tools/call"
    response_base: "agent/tools/response"
    status: "agent/tools/status"
  
  # Agent communication
  commands:
    ask_question: "agent/command/ask_question"
  responses:
    answer: "agent/response/answer"
  
  # Logging
  log: "agent/log"
```

### Environment Variables (.env)

```bash
# Google Gemini
GOOGLE_GEMINI_API_KEY=your_api_key_here

# MQTT (if using authentication)
MQTT_USERNAME=username
MQTT_PASSWORD=password

# Logging
LOG_LEVEL=INFO
CONFIG_PATH=config.yaml
```

## Important Details

### Tool Registration Flow

**Step-by-step protocol:**

1. **Tool → ToolManager**: Tool publishes `RegisterToolMessage` to `agent/tools/register`
   ```python
   RegisterToolMessage(tool_name="my_tool", manual="description", api=schema_json)
   ```

2. **ToolManager internal**:
   - Generates unique `tool_id` (UUID)
   - Stores tool metadata
   - Subscribes to `agent/tools/response/{tool_id}` for results

3. **ToolManager → Tool**: Publishes `ToolRegisteredMessage` to `agent/status/info`
   ```python
   ToolRegisteredMessage(tool_name="my_tool", tool_id="uuid-1234...")
   ```

4. **Tool receives confirmation**:
   - Stores assigned `tool_id`
   - Subscribes to `agent/tools/call/{tool_id}` for task requests
   - Tool is now active

### Tool Execution Flow

1. **Agent → ToolManager**: `agent.query()` → LLM decides to use tool
2. **ToolManager → Tool**: Publishes `TaskMessage` to `agent/tools/call/{tool_id}`
   ```python
   TaskMessage(task_id="uuid", tool_id="uuid", arguments={"param": "value"})
   ```
3. **Tool executes**: Validates args, runs `_execute()`, publishes result
4. **Tool → ToolManager**: Publishes `TaskResultMessage` to `agent/tools/response/{tool_id}`
5. **ToolManager → Agent**: Returns result to agent logic
6. **Agent → LLM**: Provides tool result, LLM generates final response

### Async Patterns

**All operations are async/await:**
```python
# Always use await
await messager.connect()
await agent.async_init()
result = await agent.query(question)
await tool_manager.execute_tool(name, args)

# Proper cleanup
try:
    await agent.query(question)
finally:
    await messager.disconnect()
```

**Graceful Shutdown:**
```python
import signal

async def shutdown(messager):
    print("Shutting down...")
    await messager.disconnect()

def signal_handler(sig, frame):
    asyncio.create_task(shutdown(messager))

signal.signal(signal.SIGINT, signal_handler)
signal.signal(signal.SIGTERM, signal_handler)
```

### Message Protocol Classes

**Import from `argentic.core.protocol`:**
```python
from argentic.core.protocol.message import (
    AgentTaskMessage,
    AgentTaskResultMessage,
    BaseMessage,
)
from argentic.core.protocol.task import (
    TaskMessage,
    TaskResultMessage,
    TaskErrorMessage,
)
from argentic.core.protocol.tool import (
    RegisterToolMessage,
    ToolRegisteredMessage,
    ToolCallRequest,
)
```

**All inherit from Pydantic BaseModel** - automatic validation.

### Best Practices

1. **Shared ToolManager for Multi-Agent:**
   ```python
   # GOOD: One instance shared
   tool_manager = ToolManager(messager)
   agent1 = Agent(llm, messager, tool_manager, ...)
   agent2 = Agent(llm, messager, tool_manager, ...)
   
   # BAD: Multiple instances
   agent1 = Agent(llm, messager, ToolManager(messager), ...)
   agent2 = Agent(llm, messager, ToolManager(messager), ...)
   ```

2. **Clear System Prompts:**
   ```python
   # GOOD: Clear role definition
   system_prompt = "You are a researcher. Find and synthesize information on any topic."
   
   # BAD: Vague
   system_prompt = "You are helpful."
   ```

3. **Enable Dialogue Logging for Debug:**
   ```python
   agent = Agent(..., enable_dialogue_logging=True)
   # Later
   agent.print_dialogue_summary()
   ```

4. **Handle MQTT Lifecycle:**
   ```python
   try:
       await messager.connect()
       # Use agents/tools
   finally:
       await messager.disconnect()
   ```

5. **Validate Tool Inputs:**
   Always use Pydantic schemas for tool arguments - automatic validation.

6. **Separate Topics for Multi-Agent:**
   Each agent should have its own topic namespace to avoid message conflicts.

7. **Use .env for Secrets:**
   Never hardcode API keys - use environment variables.

## Common Patterns

### Environment Setup

```bash
# Install
pip install argentic

# From source
git clone https://github.com/angkira/argentic.git
cd argentic
pip install -e .

# With dev dependencies
pip install -e ".[dev]"
```

### MQTT Broker Setup

```bash
# Docker (recommended)
docker run -d -p 1883:1883 --name mosquitto eclipse-mosquitto:2.0

# Ubuntu/Debian
sudo apt install mosquitto mosquitto-clients
sudo systemctl start mosquitto
```

### Running Argentic Components

```bash
# CLI commands (after installation)
argentic agent --config-path config.yaml --log-level INFO
argentic rag --config-path config.yaml
argentic environment --config-path config.yaml
argentic cli --config-path config.yaml

# Python module
python -m argentic agent --config-path config.yaml
python -m argentic rag
python -m argentic cli

# Help
argentic --help
argentic agent --help
```

### Project Structure

```
my_project/
├── config.yaml           # Configuration
├── .env                  # API keys (gitignored)
├── main.py              # Main agent script
├── tools/
│   ├── weather_tool.py
│   └── file_tool.py
└── agents/
    ├── researcher.py
    └── analyst.py
```

### Import Patterns

```python
# Top-level imports
from argentic import Agent, Messager, LLMFactory

# Core modules
from argentic.core.tools import ToolManager, BaseTool
from argentic.core.graph.supervisor import Supervisor
from argentic.core.protocol.message import AgentTaskMessage

# LLM providers
from argentic.core.llm.providers.google_gemini import GoogleGeminiProvider
from argentic.core.llm.providers.ollama import OllamaProvider
```

### Testing Tool Independently

```python
# Test tool without agent
async def test_tool():
    messager = Messager(broker_address="localhost")
    await messager.connect()
    
    tool = MyTool(messager)
    
    # Direct execution (bypass MQTT)
    result = await tool._execute(param="test_value")
    print(result)
    
    await messager.disconnect()

asyncio.run(test_tool())
```

## Advanced Topics

### Built-in Tools

Argentic includes ready-to-use tools:

```python
# RAG Tool
from argentic.tools.RAG import KnowledgeBaseTool

# Environment Tool
from argentic.tools.Environment import EnvironmentTool

# Google Search Tool
from argentic.tools.Google import GoogleSearchTool
```

### Endless Cycle Support

For long-running agents:
```python
agent = Agent(
    ...,
    adaptive_max_iterations=True,
    max_consecutive_tool_calls=3,
    tool_call_window_size=5,
    enable_completion_analysis=True,
)
```

### State Management

```python
# Stateful (default) - maintains conversation history
agent = Agent(..., state_mode=AgentStateMode.STATEFUL)

# Stateless - each query independent
agent = Agent(..., state_mode=AgentStateMode.STATELESS)
```

### Custom Message Handling

```python
async def custom_handler(message: CustomMessage):
    print(f"Received: {message}")

await messager.subscribe("custom/topic", custom_handler, CustomMessage)
```

## Troubleshooting

**MQTT Connection Failed:**
- Ensure mosquitto is running: `docker ps` or `systemctl status mosquitto`
- Check firewall: port 1883 must be open
- Verify broker_address in config.yaml

**Tool Not Registered:**
- Wait 1-2 seconds after registration: `await asyncio.sleep(1)`
- Check topics match between tool and ToolManager
- Enable logging: `enable_dialogue_logging=True`

**LLM API Errors:**
- Verify API key in .env: `GOOGLE_GEMINI_API_KEY`
- Check model name in config
- Test LLM independently before agent integration

**Tool Timeout:**
- Increase timeout: `await tool_manager.execute_tool(..., timeout=60)`
- Check tool execution time
- Ensure tool publishes result message

**Multi-Agent Not Routing:**
- Verify supervisor system_prompt includes routing logic
- Check all agents have unique roles
- Enable dialogue logging on supervisor

## Quick Reference

**Core Classes:**
- `Agent` - AI agent with LLM and tools
- `Messager` - MQTT messaging
- `ToolManager` - Tool registry and execution
- `Supervisor` - Multi-agent coordinator
- `BaseTool` - Tool base class

**Key Imports:**
```python
from argentic import Agent, Messager, LLMFactory
from argentic.core.tools import ToolManager, BaseTool
from argentic.core.graph.supervisor import Supervisor
```

**Essential Methods:**
```python
await messager.connect()
await tool_manager.async_init()
await agent.async_init()
result = await agent.query(question)
task_id = await supervisor.start_task(task, callback)
```

**Documentation:**
- README: Framework overview and setup
- examples/: Working examples (single_agent_example.py, multi_agent_example.py)
- docs/: Full documentation site
- ARGENTIC_QUICKREF.md: Extended reference (this directory)

