# DEVELOPER GUIDE: solace_agent_mesh

## Quick Summary
The `solace_agent_mesh` directory contains the core implementation of a distributed AI agent communication system built on the Solace event mesh. It provides a complete framework for hosting Google ADK (Agent Development Kit) agents with Agent-to-Agent (A2A) protocol support, enabling real-time communication, task delegation, and multi-platform integration.

The architecture consists of four main subsystems: `agent/` for hosting ADK agents with comprehensive tool libraries, `common/` for foundational A2A protocol infrastructure, `core_a2a/` for reusable service layers, and `gateway/` for external platform integration. These components work together to create a distributed AI agent ecosystem with capabilities for data analysis, multimedia processing, web integration, and inter-agent collaboration.

## Files and Subdirectories Overview
- **Direct files:**
  - `__init__.py`: Empty package initialization file
  - `llm.txt`: Comprehensive developer guide documentation
  - `llm_detail.txt`: Detailed concatenated documentation from all subdirectories
- **Subdirectories:**
  - `agent/`: Complete ADK agent hosting framework with A2A protocol integration and comprehensive tool library
  - `common/`: Foundational A2A protocol infrastructure, type systems, and client/server implementations
  - `core_a2a/`: Reusable service layer for core A2A interactions and agent registry operations
  - `gateway/`: Gateway framework with HTTP/SSE, Slack, and Webhook implementations for external platform integration

## Developer API Reference

### Direct Files

#### __init__.py
**Purpose:** Standard Python package initializer that allows the `solace_agent_mesh` directory to be treated as a package
**Import:** `import solace_agent_mesh`

**Classes/Functions/Constants:** [None - empty file]

#### llm.txt
**Purpose:** Contains comprehensive developer guide documentation for the entire system
**Import:** Not applicable - documentation file

**Classes/Functions/Constants:** [None - documentation file]

#### llm_detail.txt
**Purpose:** Contains detailed concatenated documentation from all subdirectories
**Import:** Not applicable - documentation file

**Classes/Functions/Constants:** [None - documentation file]

### Subdirectory APIs

#### agent/
**Purpose:** Provides a complete framework for hosting Google ADK agents with A2A protocol support and a comprehensive, extensible tool library
**Key Exports:** `SamAgentApp`, `SamAgentComponent`, `AppLlmAgent`, and a wide array of built-in tools for data analysis, web requests, multimedia processing, and inter-agent communication
**Import Examples:**
```python
from solace_agent_mesh.agent.sac.app import SamAgentApp
from solace_agent_mesh.agent.sac.component import SamAgentComponent
from solace_agent_mesh.agent.adk.app_llm_agent import AppLlmAgent
from solace_agent_mesh.agent.tools.builtin_data_analysis_tools import query_data_with_sql
from solace_agent_mesh.agent.tools.peer_agent_tool import PeerAgentTool
from solace_agent_mesh.agent.tools.web_tools import web_request
from solace_agent_mesh.agent.tools.image_tools import create_image_from_description
```

#### common/
**Purpose:** Provides the foundational infrastructure for Agent-to-Agent (A2A) communication, including the core protocol, data types, message translation, and client/server implementations
**Key Exports:** A2A protocol functions, Pydantic type definitions (`Message`, `Task`, `AgentCard`), `A2AClient` for interacting with agents, `A2AServer` for building agents, and various utilities
**Import Examples:**
```python
from solace_agent_mesh.common.a2a_protocol import get_agent_request_topic
from solace_agent_mesh.common.types import Message, Task, AgentCard, TextPart
from solace_agent_mesh.common.client import A2AClient, A2ACardResolver
from solace_agent_mesh.common.server import A2AServer, InMemoryTaskManager
from solace_agent_mesh.common.agent_registry import AgentRegistry
from solace_agent_mesh.common.utils.embeds import resolve_embeds_recursively_in_string
```

#### core_a2a/
**Purpose:** Provides a reusable, decoupled service layer for core A2A interactions, handling task submission, cancellation, and agent discovery
**Key Exports:** `CoreA2AService` for managing A2A protocol logic without being tied to a specific gateway or messaging implementation
**Import Examples:**
```python
from solace_agent_mesh.core_a2a.service import CoreA2AService
```

#### gateway/
**Purpose:** Provides a framework and multiple implementations for building gateways that bridge external platforms (like web UIs, Slack, or webhooks) with the A2A messaging system
**Key Exports:** `BaseGatewayApp` and `BaseGatewayComponent` for creating custom gateways, and concrete implementations like `WebUIBackendApp`, `SlackGatewayApp`, and `WebhookGatewayApp`
**Import Examples:**
```python
from solace_agent_mesh.gateway.base.app import BaseGatewayApp
from solace_agent_mesh.gateway.http_sse.app import WebUIBackendApp
from solace_agent_mesh.gateway.slack.app import SlackGatewayApp
from solace_agent_mesh.gateway.webhook.app import WebhookGatewayApp
```

## Complete Usage Guide

### 1. Setting Up a Complete AI Agent System

This example demonstrates how to create a comprehensive AI agent system with multiple components working together.

```python
# Step 1: Create an ADK-powered agent
from solace_agent_mesh.agent.sac.app import SamAgentApp

# Configure the agent with comprehensive capabilities
agent_config = {
    "name": "data-analyst-agent",
    "app_config": {
        "namespace": "myorg/ai-agents",
        "agent_name": "data_analyst",
        "model": "gemini-1.5-pro",
        "instruction": "You are a data analysis expert with access to SQL, charting, web tools, and peer collaboration.",
        "tools": [
            {"tool_type": "builtin", "tool_name": "query_data_with_sql"},
            {"tool_type": "builtin", "tool_name": "create_chart"},
            {"tool_type": "builtin", "tool_name": "web_request"},
            {"tool_type": "builtin", "tool_name": "peer_agent_tool"}
        ],
        "agent_card": {
            "description": "AI agent for comprehensive data analysis and reporting",
            "capabilities": ["data_analysis", "web_research", "chart_generation", "peer_collaboration"]
        },
        "agent_card_publishing": {"interval_seconds": 30},
        "agent_discovery": {"enabled": True},
        "inter_agent_communication": {"allow_list": ["*"]}
    }
}

# Create the agent app
agent_app = SamAgentApp(agent_config)

# Step 2: Set up A2A protocol infrastructure
from solace_agent_mesh.common.agent_registry import AgentRegistry
from solace_agent_mesh.common.types import AgentCard, AgentCapabilities, AgentSkill
from solace_agent_mesh.core_a2a.service import CoreA2AService

# Initialize shared agent registry
agent_registry = AgentRegistry()

# Create core A2A service
namespace = "myorg/ai-agents"
a2a_service = CoreA2AService(agent_registry, namespace)

# Register agent capabilities
data_analyst_card = AgentCard(
    name="data_analyst",
    display_name="Data Analyst",
    description="AI agent for data analysis",
    url=f"a2a://{namespace}/data_analyst",
    version="1.0.0",
    capabilities=AgentCapabilities(streaming=True, pushNotifications=True),
    skills=[AgentSkill(id="sql_analysis", name="SQL Data Analysis")]
)
a2a_service.process_discovery_message(data_analyst_card)

# Step 3: Create gateway integrations
from solace_agent_mesh.gateway.http_sse.app import WebUIBackendApp
from solace_agent_mesh.gateway.slack.app import SlackGatewayApp

# Web UI Gateway for browser-based interactions
webui_config = {
    "name": "web-gateway",
    "app_config": {
        "namespace": "myorg/ai-agents",
        "gateway_id": "web-ui-gateway",
        "session_secret_key": "a-very-secret-key",
        "fastapi_host": "0.0.0.0",
        "fastapi_port": 8080,
        "artifact_service": {"type": "local_file", "base_path": "./artifacts"}
    }
}
webui_app = WebUIBackendApp(webui_config)

# Slack Gateway for team collaboration
slack_config = {
    "name": "slack-gateway",
    "app_config": {
        "namespace": "myorg/ai-agents",
        "gateway_id": "slack-gateway",
        "slack_bot_token": "${SLACK_BOT_TOKEN}",
        "slack_app_token": "${SLACK_APP_TOKEN}",
        "default_agent_name": "data_analyst"
    }
}
slack_app = SlackGatewayApp(slack_config)
```

### 2. Inter-Agent Communication Pattern

```python
# This code would run within an agent's tool execution context
from solace_agent_mesh.agent.tools.peer_agent_tool import PeerAgentTool

async def analyze_and_delegate_report(component, tool_context):
    # Step 1: Perform local analysis using built-in tools
    from solace_agent_mesh.agent.tools.builtin_data_analysis_tools import query_data_with_sql
    
    analysis_result = await query_data_with_sql(
        sql_query="SELECT * FROM sales_data WHERE date >= '2024-01-01'",
        tool_context=tool_context
    )

    # Step 2: Delegate report generation to a specialist agent
    peer_tool = PeerAgentTool(
        target_agent_name="report_generator",
        host_component=component
    )
    
    report_result = await peer_tool.run_async(
        args={
            "task_description": "Generate a professional PDF report from this analysis",
            "analysis_data": "artifact://analysis_result.json",
            "report_format": "PDF"
        },
        tool_context=tool_context
    )
    
    return report_result
```

### 3. Building Custom Tools

```python
from solace_agent_mesh.agent.tools.registry import tool_registry
from solace_agent_mesh.agent.tools.tool_definition import BuiltinTool
from google.adk.tools import ToolContext

async def custom_database_query(
    query: str,
    database_name: str = "default",
    tool_context: ToolContext = None,
    tool_config: dict = None
) -> dict:
    """Execute a custom database query with enhanced features."""
    
    # Access the host component for shared resources
    host_component = tool_context._invocation_context.agent.host_component
    
    # Get database connection from agent state
    db_connection = host_component.get_agent_specific_state('db_connection')
    
    # Execute query with error handling
    try:
        result = await execute_query(db_connection, query, database_name)
        
        # Save results as an artifact
        from solace_agent_mesh.agent.utils.artifact_helpers import save_artifact_with_metadata
        import json
        from datetime import datetime, timezone
        
        artifact_result = await save_artifact_with_metadata(
            artifact_service=host_component.get_shared_artifact_service(),
            app_name=host_component.get_config()["app_name"],
            user_id=tool_context.user_id,
            session_id=tool_context.session_id,
            filename=f"query_result_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json",
            content_bytes=json.dumps(result).encode(),
            mime_type="application/json",
            metadata_dict={
                "query": query,
                "database": database_name,
                "tool": "custom_database_query"
            },
            timestamp=datetime.now(timezone.utc)
        )
        
        return {
            "status": "success",
            "rows_returned": len(result),
            "artifact_filename": artifact_result["filename"],
            "preview": result[:5] if len(result) > 5 else result
        }
        
    except Exception as e:
        return {
            "status": "error",
            "error_message": str(e)
        }

# Register the custom tool
custom_tool = BuiltinTool(
    name="custom_database_query",
    description="Execute custom database queries with enhanced features",
    function=custom_database_query,
    category="data_analysis"
)
tool_registry.register(custom_tool)
```

### 4. Working with Multimedia Tools

```python
from solace_agent_mesh.agent.tools.audio_tools import text_to_speech, multi_speaker_text_to_speech
from solace_agent_mesh.agent.tools.image_tools import create_image_from_description, describe_image

async def multimedia_workflow(tool_context):
    # Generate speech from text
    tts_result = await text_to_speech(
        text="Welcome to our AI-powered presentation system!",
        output_filename="intro.mp3",
        gender="female",
        tone="professional",
        language="en-US",
        tool_context=tool_context
    )
    
    # Create a multi-speaker dialogue
    conversation_result = await multi_speaker_text_to_speech(
        conversation_text="""
        Presenter: Today we'll discuss our quarterly results.
        Analyst: The data shows significant growth in Q4.
        Presenter: Let's dive into the details.
        """,
        speaker_configs=[
            {"name": "Presenter", "gender": "female", "tone": "professional"},
            {"name": "Analyst", "gender": "male", "tone": "analytical"}
        ],
        output_filename="dialogue.mp3",
        tool_context=tool_context
    )
    
    # Generate supporting visuals
    chart_image = await create_image_from_description(
        image_description="A professional bar chart showing quarterly growth with blue and green colors",
        output_filename="quarterly_chart.png",
        tool_context=tool_context
    )
    
    return {
        "intro_audio": tts_result,
        "dialogue_audio": conversation_result,
        "chart_image": chart_image
    }
```

### 5. Client-Side Integration

```python
import asyncio
from solace_agent_mesh.common.client import A2AClient, A2ACardResolver
from solace_agent_mesh.common.types import Message, TextPart

async def client_integration_example():
    # Discover available agents
    resolver = A2ACardResolver("https://agents.myorg.com")
    agent_card = resolver.get_agent_card()
    
    # Create client for agent interaction
    client = A2AClient(agent_card=agent_card)
    
    # Submit a complex task with file upload
    task_payload = {
        "message": {
            "role": "user",
            "parts": [
                {"type": "text", "text": "Please analyze this sales data and create a report"},
                {"type": "file", "file": {"name": "sales_data.csv", "uri": "file://./sales_data.csv"}}
            ]
        }
    }
    
    # Stream the response
    print("Submitting task and streaming response...")
    async for response in client.send_task_streaming(task_payload):
        if hasattr(response.result, 'text_delta'):
            print(response.result.text_delta,

# content_hash: 59d897c57c8a54f55bda0548509f1b6bc7af3a1afde93f59a8525c0eb98659e1
