# LLM Summary Detail File

This file is a concatenation of all individual `*llm.txt` files found in the directory tree. Each section below corresponds to a specific directory's summary file.

================================================================================

## Section 1: llm.txt

**Source file:** `llm.txt`

# DEVELOPER GUIDE for the directory: src

## Quick Summary
The `src` directory is the main source code root for the Solace AI Connector. It contains four subsystems: the `agent` directory provides a framework for hosting Google ADK agents with A2A protocol support; the `common` directory offers foundational A2A protocol infrastructure and utilities; the `core_a2a` directory provides a reusable service layer for core A2A operations; and the `gateway` directory implements gateway patterns for external platform integration. Together they form a distributed AI agent ecosystem with real-time communication, task delegation, and multi-platform integration.

## Files and Subdirectories Overview
- **Direct files:**
  - `__init__.py`: Empty package initialization file.
- **Subdirectories:**
  - `agent/`: Complete ADK agent hosting framework with A2A protocol integration and comprehensive tool library.
  - `common/`: Foundational A2A protocol infrastructure, type systems, and client/server implementations.
  - `core_a2a/`: Reusable service layer for core A2A interactions and agent registry operations.
  - `gateway/`: Gateway framework with HTTP/SSE, Slack, and Webhook implementations for external platform integration.

## Developer API Reference

### Direct Files

#### __init__.py
**Purpose:** Standard Python package initializer. It allows the `src` directory and its subdirectories to be treated as a package.
**Import:** `from src import agent, common, core_a2a, gateway`

**Classes/Functions/Constants:**
This file is empty and has no public interfaces.

### Subdirectory APIs

#### agent/
**Purpose:** Provides a complete framework for hosting Google ADK agents with A2A protocol support and a comprehensive, extensible tool library.
**Key Exports:** `SamAgentApp`, `SamAgentComponent`, `AppLlmAgent`, and a wide array of built-in tools for data analysis, web requests, multimedia processing, and inter-agent communication.
**Import Examples:**
```python
from src.agent.sac.app import SamAgentApp
from src.agent.sac.component import SamAgentComponent
from src.agent.adk.app_llm_agent import AppLlmAgent
from src.agent.tools.builtin_data_analysis_tools import query_data_with_sql
from src.agent.tools.peer_agent_tool import PeerAgentTool
from src.agent.tools.web_tools import web_request
from src.agent.tools.image_tools import create_image_from_description
```

#### common/
**Purpose:** Provides the foundational infrastructure for Agent-to-Agent (A2A) communication, including the core protocol, data types, message translation, and client/server implementations.
**Key Exports:** A2A protocol functions, Pydantic type definitions (`Message`, `Task`, `AgentCard`), `A2AClient` for interacting with agents, `A2AServer` for building agents, and various utilities.
**Import Examples:**
```python
from src.common.a2a_protocol import get_agent_request_topic
from src.common.types import Message, Task, AgentCard, TextPart
from src.common.client import A2AClient, A2ACardResolver
from src.common.server import A2AServer, InMemoryTaskManager
from src.common.agent_registry import AgentRegistry
from src.common.utils.embeds import resolve_embeds_in_string
```

#### core_a2a/
**Purpose:** Provides a reusable, decoupled service layer for core A2A interactions, handling task submission, cancellation, and agent discovery.
**Key Exports:** `CoreA2AService` for managing A2A protocol logic without being tied to a specific gateway or messaging implementation.
**Import Examples:**
```python
from src.core_a2a.service import CoreA2AService
```

#### gateway/
**Purpose:** Provides a framework and multiple implementations for building gateways that bridge external platforms (like web UIs, Slack, or webhooks) with the A2A messaging system.
**Key Exports:** `BaseGatewayApp` and `BaseGatewayComponent` for creating custom gateways, and concrete implementations like `WebUIBackendApp`, `SlackGatewayApp`, and `WebhookGatewayApp`.
**Import Examples:**
```python
from src.gateway.base.app import BaseGatewayApp
from src.gateway.http_sse.app import WebUIBackendApp
from src.gateway.slack.app import SlackGatewayApp
from src.gateway.webhook.app import WebhookGatewayApp
from src.gateway.base.authorization_service import ConfigurableRbacAuthorizationService
```

## Complete Usage Guide
This guide demonstrates how the different subdirectories within `src` work together to build a complete, distributed AI agent system.

### 1. How to import and use functionality from subdirectories
The following examples show how to import and instantiate components from each major subdirectory.

```python
# 1. Import from the 'agent' directory to create an AI agent
from src.agent.sac.app import SamAgentApp

# 2. Import from the 'common' and 'core_a2a' directories for protocol infrastructure
from src.common.agent_registry import AgentRegistry
from src.common.types import AgentCard, AgentCapabilities, AgentSkill
from src.core_a2a.service import CoreA2AService

# 3. Import from the 'gateway' directory to create interfaces
from src.gateway.http_sse.app import WebUIBackendApp
from src.gateway.slack.app import SlackGatewayApp
from src.gateway.webhook.app import WebhookGatewayApp

# 4. Import tools from the 'agent/tools' subdirectory
from src.agent.tools.peer_agent_tool import PeerAgentTool
from src.agent.tools.builtin_data_analysis_tools import query_data_with_sql
```

### 2. How different parts work together
This section shows a step-by-step process for building a system, illustrating the synergy between the components.

#### Step 1: Create an ADK-powered agent (`agent/`)
First, define and configure an agent. This agent will automatically be equipped with a rich set of tools and A2A communication capabilities.

```python
# File: my_system.py
from src.agent.sac.app import SamAgentApp

# Configure the agent with all capabilities
agent_config = {
    "name": "data-analyst-agent",
    "app_config": {
        "namespace": "myorg/ai-agents",
        "agent_name": "data_analyst",
        "model": "gemini-1.5-pro",
        "instruction": "You are a data analysis expert with access to SQL, charting, web tools, and peer collaboration.",
        "agent_card": {
            "description": "AI agent for comprehensive data analysis and reporting",
            "capabilities": ["data_analysis", "web_research", "chart_generation", "peer_collaboration"]
        },
        "agent_card_publishing": {"interval_seconds": 30},
        "agent_discovery": {"enabled": True},
        "inter_agent_communication": {"allow_list": ["*"]}
    }
}

# Create the agent app (in a real scenario, this is run by the SAC framework)
agent_app = SamAgentApp(agent_config)
```

#### Step 2: Set Up A2A Protocol Infrastructure (`common/` and `core_a2a/`)
Next, set up the core services that manage agent discovery and task routing. This is often handled by the gateway components but can be used directly.

```python
# File: my_system.py (continued)
from src.common.agent_registry import AgentRegistry
from src.common.types import AgentCard, AgentCapabilities, AgentSkill
from src.core_a2a.service import CoreA2AService

# Initialize a shared agent registry
agent_registry = AgentRegistry()

# Create the core A2A service, which uses the registry
namespace = "myorg/ai-agents"
a2a_service = CoreA2AService(agent_registry, namespace)

# Manually register an agent's capabilities (this is usually done automatically by the agent itself)
data_analyst_card = AgentCard(
    name="data_analyst",
    display_name="Data Analyst",
    description="AI agent for data analysis",
    url=f"a2a://{namespace}/data_analyst",
    version="1.0.0",
    capabilities=AgentCapabilities(streaming=True, pushNotifications=True),
    skills=[AgentSkill(id="sql_analysis", name="SQL Data Analysis")]
)
a2a_service.process_discovery_message(data_analyst_card)
```

#### Step 3: Create Gateway Integrations (`gateway/`)
Create one or more gateways to expose the agent(s) to external platforms.

```python
# File: my_system.py (continued)
from src.gateway.http_sse.app import WebUIBackendApp
from src.gateway.slack.app import SlackGatewayApp

# Web UI Gateway for browser-based interactions
webui_config = {
    "name": "web-gateway",
    "app_config": {
        "namespace": "myorg/ai-agents",
        "gateway_id": "web-ui-gateway",
        "session_secret_key": "a-very-secret-key",
        "fastapi_host": "0.0.0.0",
        "fastapi_port": 8080,
        "artifact_service": {"type": "local_file", "base_path": "./artifacts"}
    }
}
webui_app = WebUIBackendApp(webui_config)

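# Slack Gateway for team collaboration
# Note: Python does not expand "${...}" placeholders inside string literals. When building
# this dict in code, supply the real tokens (for example, read them from os.environ).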
slack_config = {
    "name": "slack-gateway",
    "app_config": {
        "namespace": "myorg/ai-agents",
        "gateway_id": "slack-gateway",
        "slack_bot_token": "${SLACK_BOT_TOKEN}",
        "slack_app_token": "${SLACK_APP_TOKEN}",
        "default_agent_name": "data_analyst"
    }
}
slack_app = SlackGatewayApp(slack_config)
```

### 3. Common usage patterns

#### Pattern 1: Inter-Agent Communication
An agent can use the `PeerAgentTool` (from `agent/tools/`) to delegate tasks to other agents, leveraging the `common/` protocol infrastructure.

```python
# This code would run within an agent's tool execution context.
from src.agent.tools.peer_agent_tool import PeerAgentTool

async def analyze_and_delegate_report(component, tool_context):
    # Assume 'component' is the SamAgentComponent instance hosting the current agent.
    
    # Step 1: Perform local analysis (using another tool)
    # ... analysis_result = await query_data_with_sql(...) ...

    # Step 2: Delegate report generation to a specialist agent
    peer_tool = PeerAgentTool(
        target_agent_name="report_generator",
        host_component=component
    )
    
    report_result = await peer_tool.run_async(
        args={
            "task_description": "Generate a professional PDF report from this analysis",
            "analysis_data": "artifact://analysis_result.json",
            "report_format": "PDF"
        },
        tool_context=tool_context
    )
    
    return report_result
```

#### Pattern 2: Building Custom Tools
Developers can extend the agent's capabilities by creating custom tools that integrate with the existing framework.

```python
from src.agent.tools.registry import tool_registry
from src.agent.tools.tool_definition import BuiltinTool
from google.adk.tools import ToolContext

async def custom_database_query(
    query: str,
    database_name: str = "default",
    tool_context: ToolContext = None,
    tool_config: dict = None
) -> dict:
    """Execute a custom database query with enhanced features."""
    
    # Access the host component for shared resources
    host_component = tool_context._invocation_context.agent.host_component
    
    # Get database connection from agent state
    db_connection = host_component.get_agent_specific_state('db_connection')
    
    # Execute query and save results as artifact
    try:
        # NOTE: execute_query is a placeholder for your own async query helper;
        # it is not defined in this example.
        result = await execute_query(db_connection, query, database_name)
        
        # Save results using the artifact helpers
        from src.agent.utils.artifact_helpers import save_artifact_with_metadata
        import json
        from datetime import datetime, timezone
        
        # Use one UTC timestamp for both the filename and the artifact metadata
        now = datetime.now(timezone.utc)
        artifact_result = await save_artifact_with_metadata(
            artifact_service=host_component.get_shared_artifact_service(),
            app_name=host_component.get_config()["app_name"],
            user_id=tool_context.user_id,
            session_id=tool_context.session_id,
            filename=f"query_result_{now.strftime('%Y%m%d_%H%M%S')}.json",
            content_bytes=json.dumps(result).encode(),
            mime_type="application/json",
            metadata_dict={
                "query": query,
                "database": database_name,
                "tool": "custom_database_query"
            },
            timestamp=now
        )
        
        return {
            "status": "success",
            "rows_returned": len(result),
            "artifact_filename": artifact_result["filename"]
        }
        
    except Exception as e:
        return {"status": "error", "error_message": str(e)}

# Register the custom tool
custom_tool = BuiltinTool(
    name="custom_database_query",
    description="Execute custom database queries with enhanced features",
    function=custom_database_query,
    category="data_analysis"
)
tool_registry.register(custom_tool)
```

#### Pattern 3: Client-Side Integration
External applications can interact with the agent system using the client library from the `common/` directory.

```python
import asyncio
from src.common.client import A2AClient, A2ACardResolver

async def client_integration_example():
    # Discover available agents
    resolver = A2ACardResolver("https://agents.myorg.com")
    agent_card = resolver.get_agent_card()
    
    # Create client for agent interaction
    client = A2AClient(agent_card=agent_card)
    
    # Submit a task with streaming response
    task_payload = {
        "action": "analyze_data",
        "data_file": "sales_report.csv",
        "analysis_type": "quarterly_trends"
    }
    
    print("Submitting task and streaming response...")
    async for response in client.send_task_streaming(task_payload):
        if hasattr(response.result, 'text_delta'):
            print(response.result.text_delta, end='', flush=True)
        elif hasattr(response.result, 'artifact'):
            print(f"\nArtifact created: {response.result.artifact.name}")

# Run the client example
# asyncio.run(client_integration_example())
```

#### Pattern 4: Multimedia Processing Workflow
The system supports rich multimedia processing through specialized tools.

```python
from src.agent.tools.audio_tools import text_to_speech, multi_speaker_text_to_speech
from src.agent.tools.image_tools import create_image_from_description

async def multimedia_workflow(tool_context):
    # Generate speech from text
    tts_result = await text_to_speech(
        text="Welcome to our AI-powered presentation system!",
        output_filename="intro.mp3",
        gender="female",
        tone="professional",
        language="en-US",
        tool_context=tool_context
    )
    
    # Create a multi-speaker dialogue
    conversation_result = await multi_speaker_text_to_speech(
        conversation_text="""
        Presenter: Today we'll discuss our quarterly results.
        Analyst: The data shows significant growth in Q4.
        Presenter: Let's dive into the details.
        """,
        speaker_configs=[
            {"name": "Presenter", "gender": "female", "tone": "professional"},
            # The remaining speaker entries and arguments are truncated in the source.
        ]
    )
```

# content_hash: 0d0be1e24971f7ec1b894394591b39ffa220bb091711dd13c3f2834930698ba2

================================================================================

## Section 2: solace_agent_mesh/agent/adk/adk_llm.txt

**Source file:** `solace_agent_mesh/agent/adk/adk_llm.txt`

# DEVELOPER GUIDE for adk directory

## Quick Summary
The `adk` directory serves as the core integration layer between the Solace AI Connector framework and Google's Agent Development Kit (ADK). It provides the essential components for building, configuring, and running sophisticated AI agents within a Solace messaging environment.

The architecture is designed for modularity and extensibility. The `setup.py` module acts as the main configuration hub, using factory functions from `services.py` to initialize pluggable services (like `FilesystemArtifactService` for artifact storage) and loading tools (Python functions, MCP tools) via the `ADKToolWrapper`.

Once initialized, the `AppLlmAgent` (a custom agent class) is managed by the `runner.py` module, which handles the asynchronous task execution loop. The agent's behavior is dynamically augmented at runtime by a rich set of callbacks from `callbacks.py`. These callbacks inject dynamic instructions, manage large tool responses, log events to Solace, and handle advanced features like streaming artifact creation and auto-continuation of conversations. The `models/` subdirectory provides the concrete LLM clients, with `LiteLlm` offering broad compatibility with various model providers.

## Files and Subdirectories Overview
- **Direct files:**
  - `__init__.py`: Standard Python package initializer
  - `app_llm_agent.py`: Defines a custom `LlmAgent` subclass that holds a reference to its host component
  - `callbacks.py`: Provides a rich set of ADK callback functions for dynamic instructions, metadata injection, and Solace integration
  - `embed_resolving_mcp_toolset.py`: Custom MCPToolset that resolves embeds in tool parameters before calling MCP tools
  - `filesystem_artifact_service.py`: A local filesystem-based implementation of ADK's `BaseArtifactService`
  - `intelligent_mcp_callbacks.py`: Intelligent MCP callback functions for processing and saving MCP tool responses as typed artifacts
  - `invocation_monitor.py`: A utility for monitoring and logging agent invocations to YAML files for debugging
  - `mcp_content_processor.py`: Intelligent processing of MCP tool responses, converting raw content into appropriately typed artifacts
  - `runner.py`: Manages the asynchronous execution of ADK agent tasks, including cancellation support
  - `services.py`: Contains factory functions for initializing ADK services (session, artifact, memory) based on configuration
  - `setup.py`: Handles the high-level initialization of the ADK agent, tools, and runner
  - `stream_parser.py`: An internal utility for parsing fenced artifact blocks from an LLM's streaming response
  - `tool_wrapper.py`: A wrapper for Python functions to make them compatible with ADK, handling embed resolution and config injection
- **Subdirectories:**
  - `artifacts/`: Contains filesystem and S3-compatible artifact storage implementations
  - `models/`: Contains concrete `BaseLlm` implementations for interfacing with various LLM providers

## Developer API Reference

### Direct Files

#### app_llm_agent.py
**Purpose:** A custom `LlmAgent` subclass that includes a reference to its hosting component, allowing callbacks and tools to access host-level configurations and services.
**Import:** `from solace_agent_mesh.agent.adk.app_llm_agent import AppLlmAgent`

**Classes/Functions/Constants:**
- `AppLlmAgent(host_component: Any = None, **kwargs)`: A custom `LlmAgent` that can be linked to a host component. The `host_component` is set post-initialization and is excluded from serialization.

#### callbacks.py
**Purpose:** Provides a suite of ADK callback functions that hook into the agent's lifecycle to inject custom logic. These are typically not called directly but are assigned to the agent during setup.
**Import:** `from solace_agent_mesh.agent.adk import callbacks`

**Classes/Functions/Constants:**
- `inject_dynamic_instructions_callback(...)`: Injects instructions into the prompt based on host configuration, active tools, and peer agents
- `manage_large_mcp_tool_responses_callback(...)`: Intercepts large tool responses, saves them as artifacts, and returns a truncated summary to the LLM
- `after_tool_callback_inject_metadata(...)`: After a tool creates an artifact, this loads its metadata and injects it into the tool response
- `process_artifact_blocks_callback(...)`: Processes streaming text to identify and save fenced artifact blocks (e.g., `«««save_artifact:...»»»`)
- `auto_continue_on_max_tokens_callback(...)`: Automatically continues a conversation if the LLM response was interrupted due to token limits
- `notify_tool_invocation_start_callback(...)`: Sends a status update over Solace when a tool is about to be invoked
- `solace_llm_invocation_callback(...)`: Sends a status update over Solace when the agent calls the LLM
- `repair_history_callback(...)`: Proactively checks for and repairs dangling tool calls in conversation history
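
The large-response handling described for `manage_large_mcp_tool_responses_callback` can be pictured with a small, self-contained sketch. The threshold, the in-memory `saved_artifacts` dict, and the `summarize_large_response` name are illustrative assumptions, not the module's actual API; the real callback works with the configured artifact service.

```python
import json

# Hypothetical threshold and storage; the real callback reads these from configuration.
MAX_RESPONSE_CHARS = 200
saved_artifacts: dict[str, str] = {}


def summarize_large_response(tool_name: str, response: dict) -> dict:
    """Return the response unchanged if small; otherwise store it and return a preview."""
    serialized = json.dumps(response)
    if len(serialized) <= MAX_RESPONSE_CHARS:
        return response

    # Keep the full payload out of the LLM context by storing it under a filename.
    filename = f"{tool_name}_response.json"
    saved_artifacts[filename] = serialized
    return {
        "status": "truncated",
        "artifact_filename": filename,
        "preview": serialized[:MAX_RESPONSE_CHARS],
        "original_length": len(serialized),
    }


small = summarize_large_response("lookup", {"rows": [1, 2, 3]})
large = summarize_large_response("lookup", {"rows": list(range(500))})
```

The LLM sees only the preview and the artifact filename, while the complete payload stays retrievable from storage.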

#### embed_resolving_mcp_toolset.py
**Purpose:** Custom MCPToolset that resolves embeds in tool parameters before calling MCP tools, enabling dynamic content injection.
**Import:** `from solace_agent_mesh.agent.adk.embed_resolving_mcp_toolset import EmbedResolvingMCPToolset, EmbedResolvingMCPTool`

**Classes/Functions/Constants:**
- `EmbedResolvingMCPToolset(connection_params, tool_filter=None, auth_scheme=None, auth_credential=None, tool_config=None)`: Custom MCPToolset that creates EmbedResolvingMCPTool instances
- `EmbedResolvingMCPTool(original_mcp_tool, tool_config=None)`: Custom MCPTool that resolves embeds in parameters before calling the actual MCP tool
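
As a rough illustration of resolving embeds in tool parameters before a call, the sketch below walks nested arguments and substitutes embeds found in strings. The `«type:expression»` syntax, the `RESOLVERS` table, and the `resolve_embeds` name are assumptions made for this example; the actual embed grammar and resolver set are defined elsewhere in the project.

```python
import re
from typing import Any, Callable

# Assumed embed syntax for this sketch: «type:expression». The real syntax may differ.
EMBED_PATTERN = re.compile(r"«(\w+):([^»]*)»")

# Hypothetical resolvers keyed by embed type.
RESOLVERS: dict[str, Callable[[str], str]] = {
    "upper": lambda expr: expr.upper(),
    "len": lambda expr: str(len(expr)),
}


def resolve_embeds(value: Any) -> Any:
    """Recursively resolve embeds in strings nested inside dicts and lists."""
    if isinstance(value, str):
        def replace(match: re.Match) -> str:
            resolver = RESOLVERS.get(match.group(1))
            # Leave unknown embed types untouched rather than guessing.
            return resolver(match.group(2)) if resolver else match.group(0)
        return EMBED_PATTERN.sub(replace, value)
    if isinstance(value, dict):
        return {key: resolve_embeds(item) for key, item in value.items()}
    if isinstance(value, list):
        return [resolve_embeds(item) for item in value]
    return value


args = {"title": "Report for «upper:acme»", "tags": ["«len:abcd»", "«unknown:x»"], "limit": 5}
resolved = resolve_embeds(args)
```

Non-string values pass through unchanged, so the wrapped tool receives arguments with the same shape it would have received without embeds.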

#### filesystem_artifact_service.py
**Purpose:** An implementation of `BaseArtifactService` that stores artifacts on the local filesystem, organized by scope, user, and session.
**Import:** `from solace_agent_mesh.agent.adk.filesystem_artifact_service import FilesystemArtifactService`

**Classes/Functions/Constants:**
- `FilesystemArtifactService(base_path: str)`: A service for managing artifacts on the local disk
  - `async save_artifact(...) -> int`: Saves an artifact and returns its version number
  - `async load_artifact(...) -> Optional[adk_types.Part]`: Loads a specific version of an artifact, or the latest if unspecified
  - `async list_artifact_keys(...) -> List[str]`: Lists the names of all artifacts for a given user/session
  - `async delete_artifact(...)`: Deletes an artifact and all its versions
  - `async list_versions(...) -> List[int]`: Lists all version numbers for a specific artifact
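
To make the versioning behaviour concrete, here is a minimal synchronous sketch of a versioned file store. It is not the real `FilesystemArtifactService`: the directory layout, the version numbering starting at 0, and the `VersionedFileStore` name are all assumptions for illustration, and the real methods are `async` and operate on `adk_types.Part` objects.

```python
import tempfile
from pathlib import Path


class VersionedFileStore:
    """Toy store: each save writes a new numbered file under a per-artifact directory."""

    def __init__(self, base_path: str) -> None:
        self.base_path = Path(base_path)

    def _artifact_dir(self, user_id: str, session_id: str, filename: str) -> Path:
        # Assumed layout: <base>/<user>/<session>/<filename>/<version>
        return self.base_path / user_id / session_id / filename

    def list_versions(self, user_id: str, session_id: str, filename: str) -> list[int]:
        directory = self._artifact_dir(user_id, session_id, filename)
        if not directory.is_dir():
            return []
        return sorted(int(p.name) for p in directory.iterdir() if p.name.isdigit())

    def save(self, user_id: str, session_id: str, filename: str, data: bytes) -> int:
        versions = self.list_versions(user_id, session_id, filename)
        version = versions[-1] + 1 if versions else 0
        directory = self._artifact_dir(user_id, session_id, filename)
        directory.mkdir(parents=True, exist_ok=True)
        (directory / str(version)).write_bytes(data)
        return version

    def load(self, user_id, session_id, filename, version=None):
        versions = self.list_versions(user_id, session_id, filename)
        if not versions:
            return None
        # Default to the latest version when none is requested.
        chosen = versions[-1] if version is None else version
        path = self._artifact_dir(user_id, session_id, filename) / str(chosen)
        return path.read_bytes() if path.exists() else None


with tempfile.TemporaryDirectory() as tmp:
    store = VersionedFileStore(tmp)
    first = store.save("user123", "session456", "greeting.txt", b"Hello")
    second = store.save("user123", "session456", "greeting.txt", b"Hello again")
    latest = store.load("user123", "session456", "greeting.txt")
    versions = store.list_versions("user123", "session456", "greeting.txt")
```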

#### intelligent_mcp_callbacks.py
**Purpose:** Callback functions that run MCP tool responses through the MCP content processor and save them as appropriately typed artifacts.
**Import:** `from solace_agent_mesh.agent.adk.intelligent_mcp_callbacks import save_mcp_response_as_artifact_intelligent, McpSaveResult, McpSaveStatus`

**Classes/Functions/Constants:**
- `save_mcp_response_as_artifact_intelligent(tool, tool_context, host_component, mcp_response_dict, original_tool_args) -> McpSaveResult`: Intelligently processes and saves MCP tool response content as typed artifacts
- `McpSaveStatus`: Enumeration for the status of an MCP save operation (SUCCESS, PARTIAL_SUCCESS, ERROR)
- `McpSaveResult`: The definitive result of an MCP response save operation with status, message, and artifact details

#### invocation_monitor.py
**Purpose:** A debugging utility that logs the entire lifecycle of an agent invocation, from the initial request to the final response, into a structured YAML file.
**Import:** `from solace_agent_mesh.agent.adk.invocation_monitor import InvocationMonitor`

**Classes/Functions/Constants:**
- `InvocationMonitor()`: A class that monitors and logs agent message flows
  - `log_message_event(direction: str, topic: str, payload: any, ...)`: Logs a single message event
  - `cleanup()`: Finalizes any active logging sessions

#### mcp_content_processor.py
**Purpose:** Intelligent processing of MCP tool responses, converting raw MCP content into appropriately typed and formatted artifacts based on the MCP specification content types.
**Import:** `from solace_agent_mesh.agent.adk.mcp_content_processor import MCPContentProcessor, MCPContentItem, MCPContentProcessorConfig`

**Classes/Functions/Constants:**
- `MCPContentProcessor(tool_name: str, tool_args: Dict[str, Any])`: Main processor for MCP tool response content
  - `process_mcp_response(mcp_response_dict) -> List[MCPContentItem]`: Process an MCP tool response and extract content items
- `MCPContentItem`: Represents a processed MCP content item with metadata
- `MCPContentProcessorConfig`: Configuration for MCP content processing
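
The idea of turning MCP content entries into typed payloads can be sketched as follows. The `ContentItem` dataclass, the `process_content` function, and the rule that JSON-parseable text becomes `application/json` are assumptions for this example; only the `text`, `image`, and `audio` entry shapes (with base64 `data` and `mimeType`) follow the MCP specification.

```python
import base64
import json
from dataclasses import dataclass


@dataclass
class ContentItem:
    """Hypothetical stand-in for a processed content item."""
    mime_type: str
    data: bytes


def process_content(mcp_response: dict) -> list[ContentItem]:
    """Convert the `content` list of an MCP tool result into typed byte payloads."""
    items = []
    for entry in mcp_response.get("content", []):
        kind = entry.get("type")
        if kind == "text":
            text = entry.get("text", "")
            # Assumption: text that parses as JSON is stored as application/json.
            try:
                json.loads(text)
                mime = "application/json"
            except ValueError:
                mime = "text/plain"
            items.append(ContentItem(mime, text.encode("utf-8")))
        elif kind in ("image", "audio"):
            # The MCP specification carries binary content as base64 with a mimeType.
            items.append(ContentItem(entry["mimeType"], base64.b64decode(entry["data"])))
        # Other content types (for example embedded resources) are skipped in this sketch.
    return items


response = {
    "content": [
        {"type": "text", "text": '{"total": 3}'},
        {"type": "text", "text": "plain note"},
        {"type": "image", "mimeType": "image/png", "data": base64.b64encode(b"\x89PNG").decode()},
    ]
}
processed = process_content(response)
```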

#### runner.py
**Purpose:** Provides the core asynchronous task execution logic for the ADK agent, including robust cancellation handling.
**Import:** `from solace_agent_mesh.agent.adk.runner import run_adk_async_task_thread_wrapper, TaskCancelledError`

**Classes/Functions/Constants:**
- `run_adk_async_task_thread_wrapper(component, adk_session, adk_content, ...)`: A high-level wrapper that runs an ADK task in a separate thread and handles all cleanup and error finalization
- `run_adk_async_task(component, task_context, adk_session, adk_content, run_config, a2a_context) -> bool`: Runs the ADK Runner asynchronously and processes intermediate events
- `TaskCancelledError(Exception)`: Custom exception raised when an agent task is cancelled externally
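
A minimal sketch of cooperative cancellation, assuming the run loop checks a flag between intermediate events. The `fake_event_stream` generator and the `run_task` function are hypothetical stand-ins; the `TaskCancelledError` defined here only mirrors the name of the real exception, and the real runner also involves a separate thread and cleanup logic not shown.

```python
import asyncio


class TaskCancelledError(Exception):
    """Raised when a task is cancelled from outside the run loop."""


async def fake_event_stream(count: int):
    # Stand-in for the runner's stream of intermediate events.
    for index in range(count):
        await asyncio.sleep(0)
        yield f"event-{index}"


async def run_task(cancel_event: asyncio.Event, processed: list[str]) -> bool:
    """Consume events, checking the cancellation flag between events."""
    async for event in fake_event_stream(5):
        if cancel_event.is_set():
            raise TaskCancelledError("task cancelled by external request")
        processed.append(event)
        if event == "event-1":
            # Simulate an external cancel request arriving mid-run.
            cancel_event.set()
    return True


async def main() -> tuple[str, list[str]]:
    cancel_event = asyncio.Event()
    processed: list[str] = []
    try:
        await run_task(cancel_event, processed)
        outcome = "completed"
    except TaskCancelledError:
        outcome = "cancelled"
    return outcome, processed


outcome, processed_events = asyncio.run(main())
```

Raising a dedicated exception lets the caller distinguish an external cancel from an ordinary failure and finalize the task accordingly.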

#### services.py
**Purpose:** Provides factory functions to initialize the various ADK services based on the agent's configuration file.
**Import:** `from solace_agent_mesh.agent.adk.services import initialize_session_service, initialize_artifact_service, initialize_memory_service, ScopedArtifactServiceWrapper`

**Classes/Functions/Constants:**
- `initialize_session_service(component) -> BaseSessionService`: Creates a session service (e.g., `InMemorySessionService`)
- `initialize_artifact_service(component) -> BaseArtifactService`: Creates an artifact service (e.g., `FilesystemArtifactService`, `GcsArtifactService`)
- `initialize_memory_service(component) -> BaseMemoryService`: Creates a memory service (e.g., `InMemoryMemoryService`)
- `ScopedArtifactServiceWrapper`: A wrapper that transparently applies configured scope to artifact operations
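
The factory approach can be sketched with a registry that maps a configured type to a constructor. The class names, the `"memory"` and `"filesystem"` type strings, and the `initialize_service` function are hypothetical; the real factories take the host `component` and read its configuration.

```python
from typing import Callable


class InMemoryStore:
    def __init__(self, **_: object) -> None:
        self.kind = "memory"


class FilesystemStore:
    def __init__(self, base_path: str) -> None:
        self.kind = "filesystem"
        self.base_path = base_path


# Hypothetical registry mapping a config "type" value to a constructor.
ARTIFACT_SERVICE_FACTORIES: dict[str, Callable[..., object]] = {
    "memory": InMemoryStore,
    "filesystem": FilesystemStore,
}


def initialize_service(config: dict) -> object:
    """Build a service from a config dict such as {"type": "filesystem", "base_path": "..."}."""
    options = dict(config)  # copy so the caller's config is not mutated
    service_type = options.pop("type", "memory")
    try:
        factory = ARTIFACT_SERVICE_FACTORIES[service_type]
    except KeyError:
        raise ValueError(f"Unsupported service type: {service_type!r}") from None
    return factory(**options)


fs_service = initialize_service({"type": "filesystem", "base_path": "/tmp/artifacts"})
default_service = initialize_service({})
```

Adding a new backend then only requires registering another constructor, which is what keeps the services pluggable.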

#### setup.py
**Purpose:** The main entry point for configuring and instantiating the ADK agent and its dependencies. These functions tie all the other modules together.
**Import:** `from solace_agent_mesh.agent.adk.setup import load_adk_tools, initialize_adk_agent, initialize_adk_runner`

**Classes/Functions/Constants:**
- `async load_adk_tools(component) -> Tuple[List[Union[BaseTool, Callable]], List[BuiltinTool]]`: Loads all configured tools, including Python functions, MCP tools, and built-ins, wrapping them with `ADKToolWrapper`
- `initialize_adk_agent(component, loaded_explicit_tools, enabled_builtin_tools) -> AppLlmAgent`: Creates an `AppLlmAgent` instance, assigns all the necessary callbacks from `callbacks.py`, and attaches the tools
- `initialize_adk_runner(component) -> Runner`: Initializes the ADK Runner with the agent and services

#### stream_parser.py
**Purpose:** A stateful stream parser for identifying and extracting fenced artifact blocks from an LLM's text stream.
**Import:** `from solace_agent_mesh.agent.adk.stream_parser import FencedBlockStreamParser, BlockStartedEvent, BlockCompletedEvent`

**Classes/Functions/Constants:**
- `FencedBlockStreamParser(progress_update_interval_bytes=4096)`: Processes a stream of text chunks to identify and extract fenced artifact blocks
  - `process_chunk(text_chunk: str) -> ParserResult`: Processes the next chunk of text from the stream
  - `finalize() -> ParserResult`: Call at the end of an LLM turn to handle any unterminated blocks
- `BlockStartedEvent`, `BlockCompletedEvent`, `BlockProgressedEvent`, `BlockInvalidatedEvent`: Events emitted by the parser
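
The core difficulty for a stream parser is that a delimiter can be split across chunks. The toy parser below shows one way to handle that, assuming `«««` and `»»»` as the opening and closing fences. It collects results in lists instead of emitting events, does not interpret the block header, and the `FencedBlockParser` name is hypothetical; it is a sketch of the technique, not the real `FencedBlockStreamParser`.

```python
OPEN, CLOSE = "«««", "»»»"


class FencedBlockParser:
    """Toy parser: separates plain text from «««...»»» blocks across chunk boundaries."""

    def __init__(self) -> None:
        self._buffer = ""
        self._in_block = False
        self.text_parts: list[str] = []
        self.blocks: list[str] = []

    def process_chunk(self, chunk: str) -> None:
        self._buffer += chunk
        while True:
            if self._in_block:
                end = self._buffer.find(CLOSE)
                if end == -1:
                    return  # wait for more input before emitting the block
                self.blocks.append(self._buffer[:end])
                self._buffer = self._buffer[end + len(CLOSE):]
                self._in_block = False
            else:
                start = self._buffer.find(OPEN)
                if start == -1:
                    # Hold back a trailing partial delimiter such as "««".
                    keep = 0
                    for size in range(len(OPEN) - 1, 0, -1):
                        if self._buffer.endswith(OPEN[:size]):
                            keep = size
                            break
                    cut = len(self._buffer) - keep
                    if cut:
                        self.text_parts.append(self._buffer[:cut])
                    self._buffer = self._buffer[cut:]
                    return
                if start:
                    self.text_parts.append(self._buffer[:start])
                self._buffer = self._buffer[start + len(OPEN):]
                self._in_block = True

    def finalize(self) -> None:
        # An unterminated block is returned to the text stream instead of being dropped.
        if self._in_block:
            self.text_parts.append(OPEN + self._buffer)
        elif self._buffer:
            self.text_parts.append(self._buffer)
        self._buffer, self._in_block = "", False


parser = FencedBlockParser()
for piece in ["Intro «", "««save_artifact: a.txt\nhello»»", "» done"]:
    parser.process_chunk(piece)
parser.finalize()
```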

#### tool_wrapper.py
**Purpose:** A wrapper for Python functions to make them compatible with ADK, handling embed resolution and config injection.
**Import:** `from solace_agent_mesh.agent.adk.tool_wrapper import ADKToolWrapper`

**Classes/Functions/Constants:**
- `ADKToolWrapper(original_func, tool_config, tool_name, origin, raw_string_args=None)`: A consolidated wrapper for ADK tools that handles metadata preservation, embed resolution, config injection, and error handling
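
Metadata preservation, config injection, and error handling can be combined in a few lines, as in the sketch below. It is synchronous and uses a hypothetical `wrap_tool` helper and `divide` tool; the real `ADKToolWrapper` is a class, also resolves embeds, and its exact behaviour may differ.

```python
import functools
import inspect


def wrap_tool(func, tool_config: dict):
    """Wrap a tool function, injecting `tool_config` only if the function accepts it."""
    accepts_config = "tool_config" in inspect.signature(func).parameters

    @functools.wraps(func)  # preserves __name__ and __doc__ for tool discovery
    def wrapper(**kwargs):
        if accepts_config:
            kwargs.setdefault("tool_config", tool_config)
        try:
            return func(**kwargs)
        except Exception as exc:
            # Return a structured error instead of propagating the exception.
            return {"status": "error", "message": str(exc)}

    return wrapper


def divide(numerator: float, denominator: float, tool_config: dict = None) -> dict:
    """Divide two numbers, rounding to the configured precision."""
    precision = (tool_config or {}).get("precision", 2)
    return {"status": "success", "result": round(numerator / denominator, precision)}


wrapped = wrap_tool(divide, {"precision": 3})
ok = wrapped(numerator=1, denominator=3)
failed = wrapped(numerator=1, denominator=0)
```

Because the wrapper keeps the original name and docstring, a framework that builds tool descriptions from function metadata still sees the underlying tool.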

### Subdirectory APIs

#### artifacts/
**Purpose:** Contains filesystem and S3-compatible artifact storage implementations for managing artifacts with versioning, user namespacing, and session-based organization
**Key Exports:** `FilesystemArtifactService`, `S3ArtifactService` classes for local and cloud artifact storage
**Import Examples:**
```python
from solace_agent_mesh.agent.adk.artifacts.filesystem_artifact_service import FilesystemArtifactService
from solace_agent_mesh.agent.adk.artifacts.s3_artifact_service import S3ArtifactService
```

#### models/
**Purpose:** Contains concrete `BaseLlm` implementations for interfacing with various LLM providers
**Key Exports:** `LiteLlm` class for broad model provider compatibility
**Import Examples:**
```python
from solace_agent_mesh.agent.adk.models.lite_llm import LiteLlm
```

## Complete Usage Guide

### 1. Basic ADK Agent Setup

```python
from solace_agent_mesh.agent.adk.setup import load_adk_tools, initialize_adk_agent, initialize_adk_runner
from solace_agent_mesh.agent.adk.services import initialize_session_service, initialize_artifact_service, initialize_memory_service


# `set_up_adk_agent` is an illustrative wrapper name; `component` is the host agent
# component that carries the agent configuration.
async def set_up_adk_agent(component):
    # Initialize services
    session_service = initialize_session_service(component)
    artifact_service = initialize_artifact_service(component)
    memory_service = initialize_memory_service(component)

    # Load tools (load_adk_tools is a coroutine, so it must be awaited)
    loaded_tools, builtin_tools = await load_adk_tools(component)

    # Initialize agent
    agent = initialize_adk_agent(component, loaded_tools, builtin_tools)

    # Initialize runner
    runner = initialize_adk_runner(component)
    return agent, runner
```

### 2. Custom LLM Model Configuration

```python
from solace_agent_mesh.agent.adk.models.lite_llm import LiteLlm

# Configure LiteLlm for different providers
# OpenAI
llm = LiteLlm(
    model="gpt-4-turbo",
    temperature=0.7,
    max_completion_tokens=1000
)

# Anthropic
llm = LiteLlm(
    model="claude-3-sonnet-20240229",
    temperature=0.5
)

# Vertex AI
llm = LiteLlm(
    model="gemini-pro",
    temperature=0.3
)
```

### 3. Artifact Service Usage

```python
from solace_agent_mesh.agent.adk.artifacts.filesystem_artifact_service import FilesystemArtifactService
from solace_agent_mesh.agent.adk.artifacts.s3_artifact_service import S3ArtifactService
from google.genai import types as adk_types

# Initialize filesystem artifact service
artifact_service = FilesystemArtifactService(base_path="/tmp/artifacts")

# Or initialize S3 artifact service
artifact_service = S3ArtifactService(bucket_name="my-artifacts-bucket")

# Save an artifact
content = "Hello, world!"
artifact = adk_types.Part.from_text(text=content)
# The remainder of this example is truncated in the source.
```

# content_hash: 629ea64c24ef561214d3d053d3b8ecb94c0903034ab01354904f0b05ef06f235

================================================================================

## Section 3: solace_agent_mesh/agent/adk/artifacts/artifacts_llm.txt

**Source file:** `solace_agent_mesh/agent/adk/artifacts/artifacts_llm.txt`

# DEVELOPER GUIDE: artifacts

## Quick Summary
The artifacts directory provides ADK ArtifactService implementations for the Solace Agent Mesh. It includes filesystem and S3-compatible storage backends for managing artifacts with versioning, user namespacing, and session-based organization.

## Files Overview
- `__init__.py` - Package initialization for artifact service implementations
- `filesystem_artifact_service.py` - Local filesystem-based artifact storage implementation
- `s3_artifact_service.py` - Amazon S3 compatible storage implementation for artifacts

## Developer API Reference

### filesystem_artifact_service.py
**Purpose:** Provides local filesystem storage for artifacts with structured directory organization and metadata management.

**Import:** `from solace_agent_mesh.agent.adk.artifacts.filesystem_artifact_service import FilesystemArtifactService`

**Classes:**
- `FilesystemArtifactService(base_path: str)` - Filesystem-based artifact service implementation
  - `save_artifact(*, app_name: str, user_id: str, session_id: str, filename: str, artifact: adk_types.Part) -> int` - Saves an artifact and returns version number
  - `load_artifact(*, app_name: str, user_id: str, session_id: str, filename: str, version: int | None = None) -> adk_types.Part | None` - Loads an artifact by version (latest if None)
  - `list_artifact_keys(*, app_name: str, user_id: str, session_id: str) -> list[str]` - Lists all artifact filenames for a scope
  - `delete_artifact(*, app_name: str, user_id: str, session_id: str, filename: str) -> None` - Deletes all versions of an artifact
  - `list_versions(*, app_name: str, user_id: str, session_id: str, filename: str) -> list[int]` - Lists all version numbers for an artifact
  - `base_path: str` - Root directory for artifact storage

**Constants/Variables:**
- `METADATA_FILE_SUFFIX: str` - File suffix for metadata files (".meta")

**Usage Examples:**
```python
from solace_agent_mesh.agent.adk.artifacts.filesystem_artifact_service import FilesystemArtifactService
from google.genai import types as adk_types

# Initialize the service
artifact_service = FilesystemArtifactService(base_path="/path/to/artifacts")

# Save an artifact
artifact_data = b"Hello, World!"
artifact_part = adk_types.Part.from_bytes(data=artifact_data, mime_type="text/plain")
version = await artifact_service.save_artifact(
    app_name="my_app",
    user_id="user123",
    session_id="session456",
    filename="greeting.txt",
    artifact=artifact_part
)

# Load the latest version
loaded_artifact = await artifact_service.load_artifact(
    app_name="my_app",
    user_id="user123", 
    session_id="session456",
    filename="greeting.txt"
)

# Load a specific version
specific_version = await artifact_service.load_artifact(
    app_name="my_app",
    user_id="user123",
    session_id="session456", 
    filename="greeting.txt",
    version=1
)

# List all artifacts
artifact_keys = await artifact_service.list_artifact_keys(
    app_name="my_app",
    user_id="user123",
    session_id="session456"
)

# List versions of an artifact
versions = await artifact_service.list_versions(
    app_name="my_app",
    user_id="user123",
    session_id="session456",
    filename="greeting.txt"
)

# Delete an artifact
await artifact_service.delete_artifact(
    app_name="my_app",
    user_id="user123",
    session_id="session456",
    filename="greeting.txt"
)
```
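
The versioning behavior used above (each save returns a version number, and `load_artifact` with `version=None` returns the latest copy) can be illustrated with a small in-memory stand-in. This is only a sketch: `InMemoryVersionedStore` is a hypothetical class, it stores raw bytes rather than `adk_types.Part`, and it assumes versions start at 0 and increase by one per save, which this guide does not state.

```python
import asyncio
from typing import Optional


class InMemoryVersionedStore:
    """Hypothetical in-memory stand-in; not part of solace_agent_mesh."""

    def __init__(self) -> None:
        # (app_name, user_id, session_id, filename) -> payloads, oldest first.
        self._versions: dict = {}

    async def save_artifact(self, *, app_name: str, user_id: str, session_id: str,
                            filename: str, artifact: bytes) -> int:
        history = self._versions.setdefault(
            (app_name, user_id, session_id, filename), []
        )
        history.append(artifact)
        # Assumption: the version number is the 0-based index of the saved copy.
        return len(history) - 1

    async def load_artifact(self, *, app_name: str, user_id: str, session_id: str,
                            filename: str, version: Optional[int] = None) -> Optional[bytes]:
        history = self._versions.get((app_name, user_id, session_id, filename), [])
        if not history:
            return None
        if version is None:
            return history[-1]  # latest version
        return history[version] if 0 <= version < len(history) else None

    async def list_versions(self, *, app_name: str, user_id: str, session_id: str,
                            filename: str) -> list:
        history = self._versions.get((app_name, user_id, session_id, filename), [])
        return list(range(len(history)))


async def demo() -> None:
    store = InMemoryVersionedStore()
    scope = dict(app_name="my_app", user_id="user123",
                 session_id="session456", filename="greeting.txt")
    first = await store.save_artifact(**scope, artifact=b"draft")
    second = await store.save_artifact(**scope, artifact=b"final")
    latest = await store.load_artifact(**scope)
    print(first, second, latest)


if __name__ == "__main__":
    asyncio.run(demo())
```

A stand-in like this can also be handy in unit tests that need the save/load contract without touching the filesystem.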

### s3_artifact_service.py
**Purpose:** Provides S3-compatible storage for artifacts with structured key organization and AWS integration.

**Import:** `from solace_agent_mesh.agent.adk.artifacts.s3_artifact_service import S3ArtifactService`

**Classes:**
- `S3ArtifactService(bucket_name: str, s3_client: BaseClient | None = None, **kwargs)` - S3-based artifact service implementation
  - `save_artifact(*, app_name: str, user_id: str, session_id: str, filename: str, artifact: adk_types.Part) -> int` - Saves an artifact to S3 and returns version number
  - `load_artifact(*, app_name: str, user_id: str, session_id: str, filename: str, version: int | None = None) -> adk_types.Part | None` - Loads an artifact from S3 by version (latest if None)
  - `list_artifact_keys(*, app_name: str, user_id: str, session_id: str) -> list[str]` - Lists all artifact filenames for a scope
  - `delete_artifact(*, app_name: str, user_id: str, session_id: str, filename: str) -> None` - Deletes all versions of an artifact from S3
  - `list_versions(*, app_name: str, user_id: str, session_id: str, filename: str) -> list[int]` - Lists all version numbers for an artifact
  - `bucket_name: str` - S3 bucket name for storage
  - `s3: BaseClient` - Boto3 S3 client instance

**Usage Examples:**
```python
from solace_agent_mesh.agent.adk.artifacts.s3_artifact_service import S3ArtifactService
from google.genai import types as adk_types
import boto3

# Initialize with default credentials
artifact_service = S3ArtifactService(bucket_name="my-artifacts-bucket")

# Initialize with custom S3 client
s3_client = boto3.client(
    's3',
    endpoint_url='https://minio.example.com',
    aws_access_key_id='access_key',
    aws_secret_access_key='secret_key'
)
artifact_service = S3ArtifactService(
    bucket_name="my-artifacts-bucket",
    s3_client=s3_client
)

# Save an artifact
artifact_data = b"Hello, S3!"
artifact_part = adk_types.Part.from_bytes(data=artifact_data, mime_type="text/plain")
version = await artifact_service.save_artifact(
    app_name="my_app",
    user_id="user123",
    session_id="session456", 
    filename="greeting.txt",
    artifact=artifact_part
)

# Load the latest version
loaded_artifact = await artifact_service.load_artifact(
    app_name="my_app",
    user_id="user123",
    session_id="session456",
    filename="greeting.txt"
)

# Save user-scoped artifact (persists across sessions)
user_artifact = adk_types.Part.from_bytes(data=b"User data", mime_type="text/plain")
await artifact_service.save_artifact(
    app_name="my_app", 
    user_id="user123",
    session_id="session456",
    filename="user:profile.json",  # user: prefix for user-scoped storage
    artifact=user_artifact
)

# List all artifacts (includes both session and user-scoped)
artifact_keys = await artifact_service.list_artifact_keys(
    app_name="my_app",
    user_id="user123", 
    session_id="session456"
)

# Delete an artifact
await artifact_service.delete_artifact(
    app_name="my_app",
    user_id="user123",
    session_id="session456",
    filename="greeting.txt"
)
```
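
The `user:` filename prefix in the example above marks an artifact as user-scoped rather than session-scoped. One way to picture the difference is as a key-building function. The layout below is an assumption made for illustration, and `build_object_key` is a hypothetical helper, not a method of `S3ArtifactService`.

```python
USER_PREFIX = "user:"


def build_object_key(app_name: str, user_id: str, session_id: str,
                     filename: str, version: int) -> str:
    """Return a storage key for one artifact version (hypothetical layout)."""
    if filename.startswith(USER_PREFIX):
        # User-scoped: the session ID is left out of the key, so the same
        # object is reachable from every session of this user.
        bare_name = filename[len(USER_PREFIX):]
        return f"{app_name}/{user_id}/user/{bare_name}/{version}"
    # Session-scoped: the session ID is part of the key.
    return f"{app_name}/{user_id}/{session_id}/{filename}/{version}"


if __name__ == "__main__":
    print(build_object_key("my_app", "user123", "session456", "greeting.txt", 0))
    print(build_object_key("my_app", "user123", "session456", "user:profile.json", 0))
```

Under this assumed layout, two different sessions of the same user resolve `user:profile.json` to the same key, while `greeting.txt` resolves to a different key per session.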

# content_hash: bc2a538c503d0082a1ea06c7bfd9c69450f53d5aa958ca1f3c19819c676eb183

================================================================================

## Section 4: solace_agent_mesh/agent/adk/models/models_llm.txt

**Source file:** `solace_agent_mesh/agent/adk/models/models_llm.txt`

# DEVELOPER GUIDE for models directory

## Quick Summary
This directory contains concrete implementations of the `BaseLlm` interface, providing wrappers for various Large Language Model APIs. These classes translate the ADK's standard `LlmRequest` into provider-specific formats and parse responses back into standard `LlmResponse` objects.

## Files Overview
- `lite_llm.py` - LLM client using the `litellm` library to support hundreds of models from different providers
- `models_llm.txt` - Documentation file containing developer guide content

## Developer API Reference

### lite_llm.py
**Purpose:** Provides the `LiteLlm` class, a `BaseLlm` implementation that interfaces with hundreds of LLM models through the `litellm` library. Supports models from OpenAI, Anthropic, Vertex AI, and many other providers by simply changing the model string.

**Import:** `from solace_agent_mesh.agent.adk.models.lite_llm import LiteLlm`

**Classes:**
- `LiteLlm(model: str, **kwargs)` - Wrapper around `litellm` supporting any model it recognizes
  - `generate_content_async(llm_request: LlmRequest, stream: bool = False) -> AsyncGenerator[LlmResponse, None]` - Generates content asynchronously with optional streaming
  - `supported_models() -> list[str]` - Returns list of supported models (empty for LiteLlm due to dynamic model support)
  - `model: str` - The name of the LiteLlm model
  - `llm_client: LiteLLMClient` - The LLM client instance used for API calls

- `LiteLLMClient()` - Internal client providing completion methods for better testability
  - `acompletion(model, messages, tools, **kwargs) -> Union[ModelResponse, CustomStreamWrapper]` - Asynchronous completion call
  - `completion(model, messages, tools, stream=False, **kwargs) -> Union[ModelResponse, CustomStreamWrapper]` - Synchronous completion call

- `FunctionChunk(BaseModel)` - Represents a function call chunk in streaming responses
  - `id: Optional[str]` - Function call ID
  - `name: Optional[str]` - Function name
  - `args: Optional[str]` - Function arguments as JSON string
  - `index: Optional[int]` - Index of the function call

- `TextChunk(BaseModel)` - Represents a text chunk in streaming responses
  - `text: str` - The text content

- `UsageMetadataChunk(BaseModel)` - Represents token usage information
  - `prompt_tokens: int` - Number of tokens in the prompt
  - `completion_tokens: int` - Number of tokens in the completion
  - `total_tokens: int` - Total number of tokens used
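
When streaming, a function call usually arrives in fragments: the ID and name once, and the JSON arguments in pieces. The sketch below shows how `FunctionChunk`-like fragments could be merged by `index` and parsed once the stream ends. It uses a plain dataclass that mirrors the fields listed above instead of the package's Pydantic model, and `merge_function_chunks` is a hypothetical helper, not part of `lite_llm.py`.

```python
import json
from dataclasses import dataclass
from typing import Iterable, Optional


@dataclass
class FunctionChunk:
    """Simplified mirror of the documented fields (illustration only)."""
    id: Optional[str] = None
    name: Optional[str] = None
    args: Optional[str] = None
    index: Optional[int] = 0


def merge_function_chunks(chunks: Iterable[FunctionChunk]) -> list:
    """Combine streamed fragments into complete calls, ordered by index."""
    calls = {}
    for chunk in chunks:
        slot = calls.setdefault(chunk.index or 0, {"id": None, "name": "", "args": ""})
        if chunk.id:
            slot["id"] = chunk.id
        if chunk.name:
            slot["name"] += chunk.name
        if chunk.args:
            # Argument JSON is only valid once every fragment has been appended.
            slot["args"] += chunk.args
    return [
        {
            "id": calls[i]["id"],
            "name": calls[i]["name"],
            "args": json.loads(calls[i]["args"] or "{}"),
        }
        for i in sorted(calls)
    ]


if __name__ == "__main__":
    stream = [
        FunctionChunk(id="call_1", name="get_weather", args='{"loc', index=0),
        FunctionChunk(args='ation": "Tokyo"}', index=0),
    ]
    print(merge_function_chunks(stream))
```

Parsing the arguments only after the stream finishes avoids attempting `json.loads` on a partial string.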

**Functions:**
- `_content_to_message_param(content: types.Content) -> Union[Message, list[Message]]` - Converts ADK Content to litellm Message format
- `_get_content(parts: Iterable[types.Part]) -> Union[OpenAIMessageContent, str]` - Converts parts to litellm content format
- `_function_declaration_to_tool_param(function_declaration: types.FunctionDeclaration) -> dict` - Converts function declarations to OpenAPI spec format
- `_model_response_to_generate_content_response(response: ModelResponse) -> LlmResponse` - Converts litellm response to LlmResponse

**Usage Examples:**
```python
import asyncio
import os
from solace_agent_mesh.agent.adk.models.lite_llm import LiteLlm
from solace_agent_mesh.agent.adk.models.llm_request import LlmRequest, LlmConfig
from google.genai.types import Content, Part

# Set environment variables for your chosen provider
# For OpenAI:
# os.environ["OPENAI_API_KEY"] = "your-api-key"
# For Vertex AI:
# os.environ["VERTEXAI_PROJECT"] = "your-project-id"
# os.environ["VERTEXAI_LOCATION"] = "your-location"

async def main():
    # Initialize LiteLlm with a specific model
    llm = LiteLlm(
        model="gpt-4-turbo",
        temperature=0.7,
        max_completion_tokens=150
    )
    
    # Create a request
    request = LlmRequest(
        contents=[
            Content(
                role="user",
                parts=[Part.from_text(text="Explain quantum computing in simple terms")]
            )
        ],
        config=LlmConfig(
            temperature=0.5,
            max_output_tokens=200
        )
    )
    
    # Non-streaming generation
    print("=== Non-streaming ===")
    async for response in llm.generate_content_async(request, stream=False):
        print(f"Response: {response.text}")
        if response.usage_metadata:
            print(f"Tokens used: {response.usage_metadata.total_token_count}")
    
    # Streaming generation
    print("\n=== Streaming ===")
    async for response in llm.generate_content_async(request, stream=True):
        if response.text:
            print(response.text, end="", flush=True)
        if response.usage_metadata:
            print(f"\nTotal tokens: {response.usage_metadata.total_token_count}")

# Example with function calling
async def function_calling_example():
    from google.genai.types import FunctionDeclaration, Schema, Type, Tool
    
    # Define a function for the LLM to call
    get_weather_func = FunctionDeclaration(
        name="get_weather",
        description="Get current weather for a location",
        parameters=Schema(
            type=Type.OBJECT,
            properties={
                "location": Schema(type=Type.STRING, description="City name"),
                "unit": Schema(type=Type.STRING, description="Temperature unit")
            },
            required=["location"]
        )
    )
    
    llm = LiteLlm(model="gpt-4-turbo")
    
    request = LlmRequest(
        contents=[
            Content(
                role="user", 
                parts=[Part.from_text(text="What's the weather like in Tokyo?")]
            )
        ],
        config=LlmConfig(
            tools=[Tool(function_declarations=[get_weather_func])]
        )
    )
    
    async for response in llm.generate_content_async(request):
        if response.function_calls:
            for func_call in response.function_calls:
                print(f"Function called: {func_call.name}")
                print(f"Arguments: {func_call.args}")

if __name__ == "__main__":
    asyncio.run(main())
    # asyncio.run(function_calling_example())
```

# content_hash: 12789ad2e16cd9ea5a81abdd68258d9ef30520bed5c51ba8d00ea66014191964

================================================================================

## Section 5: solace_agent_mesh/agent/agent_llm.txt

**Source file:** `solace_agent_mesh/agent/agent_llm.txt`

# DEVELOPER GUIDE for agent directory

## Quick Summary
The `agent` directory provides a comprehensive framework for hosting Google ADK (Agent Development Kit) agents within the Solace AI Connector ecosystem. It bridges ADK agents with the A2A (Agent-to-Agent) protocol over Solace messaging, enabling distributed agent communication, task delegation, and rich tool functionality.

The architecture is modular, consisting of several key components:
- **`sac/` (Solace AI Connector):** The main entry point, providing the `SamAgentApp` and `SamAgentComponent` to host the agent and manage its lifecycle and communication over the Solace event mesh.
- **`adk/` (Agent Development Kit):** The core integration layer with Google's ADK. It defines the custom `AppLlmAgent`, manages asynchronous task execution, and provides a rich set of callbacks to augment agent behavior.
- **`tools/`:** A comprehensive and extensible library of tools available to the agent, covering data analysis, artifact management, web requests, multimedia processing, and inter-agent communication.
- **`protocol/`:** The underlying implementation of the A2A (Agent-to-Agent) communication protocol, handling message routing and event processing.
- **`utils/`:** A collection of helper modules for common tasks like artifact management, configuration parsing, and context handling.
- **`testing/`:** Utilities to aid in debugging and testing custom agent implementations.

These components work together to create a robust environment where an ADK agent can be configured with specific instructions and tools, communicate with other agents, and execute complex tasks in a distributed, event-driven manner.

## Files and Subdirectories Overview
- **Direct files:**
  - `__init__.py`: Standard Python package initializer that marks the `agent` directory as a Python package
  - `agent_llm.txt`: Documentation file containing comprehensive developer guide content
- **Subdirectories:**
  - `adk/`: Provides the core integration layer with Google's ADK, including custom agents, services, and callbacks
  - `protocol/`: Implements the A2A protocol event handlers for message routing and agent communication
  - `sac/`: Contains the Solace AI Connector app and component implementations for hosting ADK agents
  - `testing/`: Provides utilities for testing the A2A framework and debugging agent behavior
  - `tools/`: A comprehensive, registry-based tool library for AI agents
  - `utils/`: Contains helper utilities for configuration, context handling, and artifact management

## Developer API Reference

### Direct Files

#### __init__.py
**Purpose:** Standard Python package initializer. It allows the `agent` directory to be treated as a package.
**Import:** `import solace_agent_mesh.agent`

**Classes/Functions/Constants:** [None - empty file]

#### agent_llm.txt
**Purpose:** Documentation file containing comprehensive developer guide content
**Import:** Not applicable - this is a documentation file, not a code module

**Classes/Functions/Constants:** [None - documentation file]

### Subdirectory APIs

#### adk/
**Purpose:** Provides the core integration layer between the Solace AI Connector and Google's ADK
**Key Exports:** `AppLlmAgent`, `initialize_adk_agent`, `initialize_adk_runner`, `load_adk_tools`, `FilesystemArtifactService`, `LiteLlm`
**Import Examples:**
```python
from solace_agent_mesh.agent.adk.setup import load_adk_tools, initialize_adk_agent, initialize_adk_runner
from solace_agent_mesh.agent.adk.app_llm_agent import AppLlmAgent
from solace_agent_mesh.agent.adk.artifacts.filesystem_artifact_service import FilesystemArtifactService
from solace_agent_mesh.agent.adk.models.lite_llm import LiteLlm
from solace_agent_mesh.agent.adk.services import initialize_session_service, initialize_artifact_service
```

#### protocol/
**Purpose:** Implements the core logic for Agent-to-Agent (A2A) communication protocol
**Key Exports:** `process_event`, `handle_a2a_request`, `handle_agent_card_message`, `publish_agent_card`
**Import Examples:**
```python
from solace_agent_mesh.agent.protocol.event_handlers import process_event, publish_agent_card
```

#### sac/
**Purpose:** Provides the Solace AI Connector app and component implementations for hosting ADK agents
**Key Exports:** `SamAgentApp`, `SamAgentComponent`, `TaskExecutionContext`
**Import Examples:**
```python
from solace_agent_mesh.agent.sac.app import SamAgentApp
from solace_agent_mesh.agent.sac.component import SamAgentComponent
from solace_agent_mesh.agent.sac.task_execution_context import TaskExecutionContext
```

#### testing/
**Purpose:** Provides utilities for testing the A2A framework and debugging agent behavior
**Key Exports:** `pretty_print_event_history`
**Import Examples:**
```python
from solace_agent_mesh.agent.testing.debug_utils import pretty_print_event_history
```

#### tools/
**Purpose:** A comprehensive, registry-based tool library for AI agents
**Key Exports:** `tool_registry`, `BuiltinTool`, `PeerAgentTool`, and various tool functions
**Import Examples:**
```python
from solace_agent_mesh.agent.tools.registry import tool_registry
from solace_agent_mesh.agent.tools.peer_agent_tool import PeerAgentTool
from solace_agent_mesh.agent.tools.audio_tools import text_to_speech
from solace_agent_mesh.agent.tools.builtin_artifact_tools import list_artifacts, load_artifact
```

#### utils/
**Purpose:** Contains helper utilities for configuration, context handling, and artifact management
**Key Exports:** `save_artifact_with_metadata`, `load_artifact_content_or_metadata`, `resolve_instruction_provider`
**Import Examples:**
```python
from solace_agent_mesh.agent.utils.artifact_helpers import save_artifact_with_metadata, load_artifact_content_or_metadata
from solace_agent_mesh.agent.utils.config_parser import resolve_instruction_provider
from solace_agent_mesh.agent.utils.context_helpers import get_session_from_callback_context
```

## Complete Usage Guide

### 1. Basic Agent Setup and Configuration

```python
# Import the main SAC components
from solace_agent_mesh.agent.sac.app import SamAgentApp
from solace_agent_mesh.agent.sac.component import SamAgentComponent

# The agent is typically configured via YAML and instantiated by the SAC framework
# Example agent-config.yaml:
"""
app:
  class_name: solace_agent_mesh.agent.sac.app.SamAgentApp
  app_config:
    namespace: "my-org/production"
    agent_name: "customer-support-agent"
    model: "gemini-1.5-pro-latest"
    tools:
      - tool_type: "builtin"
        tool_name: "text_to_speech"
      - tool_type: "builtin"
        tool_name: "list_artifacts"
    agent_card:
      description: "An agent that can answer questions about customer accounts."
    session_service:
      type: "memory"
    artifact_service:
      type: "filesystem"
      base_path: "/tmp/artifacts"
"""
```

### 2. Working with ADK Components

```python
from solace_agent_mesh.agent.adk.setup import load_adk_tools, initialize_adk_agent, initialize_adk_runner
from solace_agent_mesh.agent.adk.services import initialize_session_service, initialize_artifact_service
from solace_agent_mesh.agent.adk.models.lite_llm import LiteLlm

async def setup_adk_agent(component):
    # Initialize services
    session_service = initialize_session_service(component)
    artifact_service = initialize_artifact_service(component)
    
    # Load tools
    loaded_tools, builtin_tools = await load_adk_tools(component)
    
    # Initialize agent with LLM
    agent = initialize_adk_agent(component, loaded_tools, builtin_tools)
    
    # Initialize runner
    runner = initialize_adk_runner(component)
    
    return agent, runner
```

### 3. Custom Tool Development

```python
from solace_agent_mesh.agent.tools.registry import tool_registry
from solace_agent_mesh.agent.tools.tool_definition import BuiltinTool
from google.adk.tools import ToolContext

# Define a custom tool function
async def my_custom_tool(
    query: str,
    tool_context: ToolContext = None,
    tool_config: dict = None
) -> dict:
    """A custom tool that processes queries."""
    # Access the host component
    host_component = tool_context._invocation_context.agent.host_component
    
    # Use agent state
    db_connection = host_component.get_agent_specific_state('db_connection')
    
    # Process the query (process_query is a placeholder for your own async logic)
    result = await process_query(query, db_connection)
    
    return {"result": result, "status": "success"}

# Register the tool
custom_tool = BuiltinTool(
    name="my_custom_tool",
    description="Processes custom queries",
    function=my_custom_tool,
    category="custom"
)
tool_registry.register(custom_tool)
```

### 4. Artifact Management

```python
from solace_agent_mesh.agent.utils.artifact_helpers import (
    save_artifact_with_metadata,
    load_artifact_content_or_metadata,
    get_artifact_info_list
)
from datetime import datetime, timezone

async def artifact_operations(component, artifact_service):
    # Save an artifact with metadata
    content = b"Hello, world!"
    result = await save_artifact_with_metadata(
        artifact_service=artifact_service,
        app_name=component.get_config()["app_name"],
        user_id="user123",
        session_id="session456",
        filename="greeting.txt",
        content_bytes=content,
        mime_type="text/plain",
        metadata_dict={"source": "custom_tool", "description": "A greeting"},
        timestamp=datetime.now(timezone.utc)
    )
    
    # Load the artifact
    loaded = await load_artifact_content_or_metadata(
        artifact_service=artifact_service,
        app_name=component.get_config()["app_name"],
        user_id="user123",
        session_id="session456",
        filename="greeting.txt",
        version="latest"
    )
    
    # List all artifacts
    artifacts = await get_artifact_info_list(
        artifact_service=artifact_service,
        app_name=component.get_config()["app_name"],
        user_id="user123",
        session_id="session456"
    )
    
    return artifacts
```

### 5. Inter-Agent Communication

```python
from solace_agent_mesh.agent.tools.peer_agent_tool import PeerAgentTool

# Create a peer agent tool (typically done automatically by the framework).
# host_component is the SamAgentComponent instance that hosts this agent.
peer_tool = PeerAgentTool("data_analyst_agent", host_component)

# The LLM can then use this tool to delegate tasks:
# "Please use the data_analyst_agent to analyze the sales data in report.csv"

# The framework handles the A2A protocol communication automatically
```

### 6. Audio and Multimedia Tools

```python
from solace_agent_mesh.agent.tools.audio_tools import text_to_speech, multi_speaker_text_to_speech
from solace_agent_mesh.agent.tools.image_tools import create_image_from_description, describe_image

async def multimedia_example(tool_context):
    # Generate speech from text
    tts_result = await text_to_speech(
        text="Welcome to our service!",
        output_filename="welcome.mp3",
        gender="female",
        tone="friendly",
        tool_context=tool_context
    )
    
    # Create a multi-speaker conversation
    conversation_result = await multi_speaker_text_to_speech(
        conversation_text="Alice: Hello there!\nBob: Hi Alice, how are you?",
        speaker_configs=[
            {"name": "Alice", "gender": "female", "tone": "bright"},
            {"name": "Bob", "gender": "male", "tone": "warm"}
        ],
        tool_context=tool_context
    )
    
    # Generate an image
    image_result = await create_image_from_description(
        image_description="A futuristic cityscape at sunset",
        output_filename="cityscape.png",
        tool_context=tool_context
    )
    
    return tts_result, conversation_result, image_result
```

### 7. Testing and Debugging

```python
from solace_agent_mesh.agent.testing.debug_utils import pretty_print_event_history
from solace_agent_mesh.agent.adk.invocation_monitor import InvocationMonitor

# Debug event history in tests
def test_agent_behavior():
    event_history = [
        {"result": {"status": {"state": "EXECUTING"}}},
        {"result": {"status": {"state": "COMPLETED"}}}
    ]
    
    # Print formatted event history for debugging
    pretty_print_event_history(event_history)

# Monitor agent invocations
monitor = InvocationMonitor()
monitor.log_message_event("incoming", "agent/request", {"task": "analyze data"})
```

### 8. Configuration and Context Handling

```python
from solace_agent_mesh.agent.utils.config_parser import resolve_instruction_provider
from solace_agent_mesh.agent.utils.context_helpers import (
    get_original_session_id,
    get_session_from_callback_context,
)

# Resolve dynamic instructions
def setup_agent_instructions(component):
    # Static instruction
    static_instruction = resolve_instruction_provider(
        component, 
        "You are a helpful customer service agent."
    )
    
    # Dynamic instruction function
    def dynamic_instruction(context):
        return f"You are assisting user {context.user_id} in session {context.session_id}"
    
    dynamic_provider = resolve_instruction_provider(component, dynamic_instruction)
    
    return static_instruction, dynamic_provider

# Safe context extraction in tools
def my_tool_with_context(tool_context):
    session = get_session_from_callback_context(tool_context)
    original_session_id = get_original_session_id(tool_context._invocation_context)
    
    return {"session_id": original_session_id, "session_data": session}
```
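
The example above passes both a plain string and a function to `resolve_instruction_provider`. The general pattern of accepting either form can be sketched as below. This is an illustration under assumptions: `normalize_instruction` and `FakeContext` are hypothetical names, and the real function also takes the component and may return the static string unchanged rather than wrapping it.

```python
from dataclasses import dataclass
from typing import Callable, Union


@dataclass
class FakeContext:
    """Hypothetical stand-in for the context object passed to providers."""
    user_id: str
    session_id: str


InstructionProvider = Callable[[FakeContext], str]


def normalize_instruction(value: Union[str, InstructionProvider]) -> InstructionProvider:
    """Return a callable provider for either a static string or a function."""
    if callable(value):
        return value
    if isinstance(value, str):
        # Wrap the static text so callers can always invoke the result.
        return lambda _context: value
    raise TypeError(f"Unsupported instruction type: {type(value).__name__}")


if __name__ == "__main__":
    ctx = FakeContext(user_id="user123", session_id="session456")
    static = normalize_instruction("You are a helpful customer service agent.")
    dynamic = normalize_instruction(lambda c: f"You are assisting user {c.user_id}")
    print(static(ctx))
    print(dynamic(ctx))
```

Always returning a callable is a design choice of this sketch: it lets the caller invoke the provider without checking its type first.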

### 9. Complete Agent Implementation Example

```python
from solace_agent_mesh.agent.sac.app import SamAgentApp
from solace_agent_mesh.agent.sac.component import SamAgentComponent
from solace_agent_mesh.agent.adk.setup import load_adk_tools, initialize_adk_agent, initialize_adk_runner
from solace_agent_mesh.agent.tools.registry import tool_registry
from solace_agent_mesh.agent.tools.tool_definition import BuiltinTool

# Custom initialization function
def initialize_my_agent(host_component: SamAgentComponent, config: dict):
    """Custom initialization function for the agent."""
    # Store custom state
    host_component.set_agent_specific_state('custom_data', config.get('custom_data'))
    
    # Set dynamic system instruction
    def dynamic_instruction(context):
        return f"You are a specialized agent for user {context.user_id}"
    
    host_component.set_agent_system_instruction_callback(dynamic_instruction)

# Custom tool
async def my_business_tool(
    query: str,
    tool_context = None,
    tool_config: dict = None
) -> dict:
    """Custom business logic tool."""
    host_component = tool_context._invocation_context.agent.host_component
    custom_data = host_component.get_agent_specific_state('custom_data')
    
    # Process business logic
    result = ...  # business logic is elided in the source example
```

# content_hash: 0f715df09d7832604a952539e00e04abb912609b2657b9801bcf5f46a1bf6369

================================================================================

## Section 6: solace_agent_mesh/agent/protocol/protocol_llm.txt

**Source file:** `solace_agent_mesh/agent/protocol/protocol_llm.txt`

# DEVELOPER GUIDE: protocol

## Quick Summary
The `protocol` directory implements the core logic for Agent-to-Agent (A2A) communication. It handles receiving and processing requests, responses, and discovery messages (Agent Cards) over the Solace event mesh. It acts as the bridge between the A2A protocol and the underlying Google ADK execution environment.

## Files Overview
- `__init__.py` - Empty package initialization file
- `event_handlers.py` - Contains the primary logic for handling all A2A protocol events, including routing incoming messages, managing task execution, and handling agent discovery

## Developer API Reference

### __init__.py
**Purpose:** Standard Python package initialization file
**Import:** `from solace_agent_mesh.agent.protocol import *`

This is an empty package initialization file and has no public interfaces.

### event_handlers.py
**Purpose:** Central hub for processing all events related to the A2A protocol. Routes events to appropriate handlers and manages task lifecycle.
**Import:** `from solace_agent_mesh.agent.protocol.event_handlers import process_event, handle_a2a_request, handle_agent_card_message, handle_a2a_response, publish_agent_card`

**Functions:**
- `process_event(component, event: Event) -> None` - Main event router that processes incoming events and delegates to specific handlers based on event type and topic
- `handle_a2a_request(component, message: SolaceMessage) -> None` - Handles incoming A2A request messages, starts ADK runner for SendTask requests, and processes CancelTask requests
- `handle_agent_card_message(component, message: SolaceMessage) -> None` - Processes incoming Agent Card discovery messages and updates peer agent registry
- `handle_a2a_response(component, message: SolaceMessage) -> None` - Handles responses and status updates from peer agents, manages parallel task completion
- `publish_agent_card(component) -> None` - Publishes the agent's capabilities and information to the discovery topic

**Internal Helper Functions:**
- `_register_peer_artifacts_in_parent_context(parent_task_context: "TaskExecutionContext", peer_task_object: Task, log_identifier: str) -> None` - Registers artifacts produced by peer agents in the parent task context
- `_publish_peer_tool_result_notification(component: "SamAgentComponent", correlation_data: Dict[str, Any], payload_to_queue: Any, log_identifier: str) -> None` - Publishes a ToolResultData status update for a completed peer tool call

**Usage Examples:**
```python
# Main event processing (typically called by the SAC framework)
from solace_agent_mesh.agent.protocol.event_handlers import process_event
from solace_ai_connector.common.event import Event, EventType

# Process an incoming event
await process_event(component, event)

# Publish agent discovery card
from solace_agent_mesh.agent.protocol.event_handlers import publish_agent_card

publish_agent_card(component)

# Handle specific message types (usually called internally by process_event)
from solace_agent_mesh.agent.protocol.event_handlers import handle_a2a_request

await handle_a2a_request(component, solace_message)
```

**Key Event Flow:**
1. `process_event()` receives all events and routes based on type (MESSAGE, TIMER, CACHE_EXPIRY)
2. For MESSAGE events, routes to specific handlers based on topic patterns:
   - Agent request topics → `handle_a2a_request()`
   - Discovery topics → `handle_agent_card_message()`
   - Response/status topics → `handle_a2a_response()`
3. For TIMER events, handles periodic agent card publishing
4. For CACHE_EXPIRY events, delegates to component's cache handling

**Dependencies:**
- Requires `SamAgentComponent` instance with proper configuration
- Uses A2A protocol types from `a2a.types`
- Integrates with Google ADK for task execution
- Manages task contexts through `TaskExecutionContext`

# content_hash: 7bb2b04b131e3e12275d860f347ad5830de20541f5bf1d5470c31bf7a38044a3

================================================================================

## Section 7: solace_agent_mesh/agent/sac/sac_llm.txt

**Source file:** `solace_agent_mesh/agent/sac/sac_llm.txt`

# DEVELOPER GUIDE for the directory: sac

## Quick Summary
The `sac` (Solace AI Connector) directory provides the core implementation for hosting a Google ADK (Agent Development Kit) agent within the Solace AI Connector framework. It acts as a bridge, enabling ADK agents to communicate using the A2A (Agent-to-Agent) protocol over Solace messaging. This allows for the creation of distributed, collaborative agent systems where agents can delegate tasks, share information, and work together to solve complex problems.

## Files Overview
- `__init__.py` - Empty package marker file
- `app.py` - Custom SAC App class that automatically configures Solace subscriptions and broker settings for A2A communication
- `component.py` - Main SAC Component that hosts the ADK agent, manages its lifecycle, and handles all A2A protocol messaging
- `patch_adk.py` - Runtime patches for the Google ADK library to enhance or correct its behavior
- `task_execution_context.py` - State management class that encapsulates all runtime information for a single, in-flight A2A task

## Developer API Reference

### app.py
**Purpose:** Provides a custom SAC App class that simplifies the configuration of an A2A agent
**Import:** `from solace_agent_mesh.agent.sac.app import SamAgentApp`

**Classes:**
- `SamAgentApp(app_info: Dict[str, Any], **kwargs)` - Custom App class for SAM Agent Host with namespace prefixing and automatic subscription generation
  - `app_schema: Dict` - Class attribute defining comprehensive configuration schema for agent host validation

**Constants/Variables:**
- `info: Dict[str, str]` - Metadata dictionary about the SamAgentApp class for SAC framework discovery

**Usage Examples:**
```python
# SamAgentApp is typically instantiated by the SAC framework from YAML config
# Example agent-config.yaml:
# app:
#   class_name: solace_agent_mesh.agent.sac.app.SamAgentApp
#   app_config:
#     namespace: "my-org/production"
#     agent_name: "customer-support-agent"
#     model: "gemini-1.5-pro-latest"
#     tools:
#       - tool_type: "builtin"
#         tool_name: "file_search"
#     agent_card:
#       description: "An agent that can answer questions about customer accounts."
#     agent_card_publishing:
#       interval_seconds: 60
#     session_service:
#       type: "memory"
```

### component.py
**Purpose:** Core component that hosts a Google ADK agent and bridges communication to A2A protocol
**Import:** `from solace_agent_mesh.agent.sac.component import SamAgentComponent`

**Classes:**
- `SamAgentComponent(**kwargs)` - Solace AI Connector component that hosts a Google ADK agent
  - `process_event(event: Event) -> None` - Main entry point for all SAC framework events
  - `handle_timer_event(timer_data: Dict[str, Any]) -> None` - Handles scheduled timer events for agent card publishing
  - `handle_cache_expiry_event(cache_data: Dict[str, Any]) -> None` - Handles cache expiry events for peer agent timeouts
  - `finalize_task_success(a2a_context: Dict) -> None` - Async method to finalize successful task completion
  - `finalize_task_canceled(a2a_context: Dict) -> None` - Finalizes task as CANCELED
  - `finalize_task_error(exception: Exception, a2a_context: Dict) -> None` - Async method to finalize failed tasks
  - `cleanup() -> None` - Cleans up resources on component shutdown
  - `set_agent_specific_state(key: str, value: Any) -> None` - Sets key-value pair in agent state dictionary
  - `get_agent_specific_state(key: str, default: Optional[Any] = None) -> Any` - Retrieves value from agent state
  - `get_async_loop() -> Optional[asyncio.AbstractEventLoop]` - Returns dedicated asyncio event loop
  - `set_agent_system_instruction_string(instruction_string: str) -> None` - Sets static system prompt injection
  - `set_agent_system_instruction_callback(callback_function: Callable) -> None` - Sets dynamic system prompt callback
  - `get_gateway_id() -> str` - Returns unique identifier for agent host instance
  - `submit_a2a_task(target_agent_name: str, a2a_message: A2AMessage, user_id: str, user_config: Dict[str, Any], sub_task_id: str) -> str` - Submits task to peer agent
  - `get_agent_context() -> Dict[str, Any]` - Returns agent context for middleware interactions

**Constants/Variables:**
- `info: Dict` - Metadata dictionary for SAC framework
- `CORRELATION_DATA_PREFIX: str` - Prefix for cache keys when tracking peer requests
- `HOST_COMPONENT_VERSION: str` - Version string of the host component

**Usage Examples:**
```python
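# Custom initialization function example
import asyncio

from solace_agent_mesh.agent.sac.component import SamAgentComponent

# Note: create_database_connection() and my_async_task() are placeholders
# for application code; they are not part of solace_agent_mesh.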

def initialize_my_agent(host_component: SamAgentComponent, config: dict):
    """Custom initialization function for the agent."""
    # Store database connection in agent state
    db_connection = create_database_connection(config.get('db_url'))
    host_component.set_agent_specific_state('db_connection', db_connection)
    
    # Set custom system instruction
    host_component.set_agent_system_instruction_string(
        "You are a specialized customer service agent with access to our database."
    )

# Tool accessing agent state
def my_custom_tool(host_component: SamAgentComponent, query: str) -> str:
    """Tool that uses stored database connection."""
    db_connection = host_component.get_agent_specific_state('db_connection')
    if db_connection:
        return db_connection.execute_query(query)
    return "Database not available"

# Scheduling async work from synchronous code
def schedule_background_task(host_component: SamAgentComponent):
    """Schedule async work on the component's event loop."""
    loop = host_component.get_async_loop()
    if loop:
        asyncio.run_coroutine_threadsafe(my_async_task(), loop)
```
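
The pattern behind `get_async_loop()` and `asyncio.run_coroutine_threadsafe` can be tried in isolation. The sketch below uses only the standard library and assumes a dedicated event loop running in a background thread, which is a stand-in for the component's loop; `start_background_loop` and `my_async_task` are hypothetical names for this example.

```python
import asyncio
import threading


def start_background_loop():
    """Create an event loop and run it forever in a daemon thread."""
    loop = asyncio.new_event_loop()
    thread = threading.Thread(target=loop.run_forever, daemon=True)
    thread.start()
    return loop, thread


async def my_async_task(value: int) -> int:
    """Placeholder coroutine standing in for real async work."""
    await asyncio.sleep(0.01)
    return value * 2


loop, thread = start_background_loop()

# Called from synchronous code: hand the coroutine to the loop's thread.
future = asyncio.run_coroutine_threadsafe(my_async_task(21), loop)
result = future.result(timeout=5)  # blocks the calling thread until done

# Shut the loop down cleanly once it is no longer needed.
loop.call_soon_threadsafe(loop.stop)
thread.join(timeout=5)
loop.close()
```

`run_coroutine_threadsafe` returns a `concurrent.futures.Future`, so the caller can either wait on `result()` as shown or attach a callback and continue.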

### patch_adk.py
**Purpose:** Contains runtime patches for the Google ADK library to enhance behavior
**Import:** `from solace_agent_mesh.agent.sac.patch_adk import patch_adk`

**Functions:**
- `patch_adk() -> None` - Applies all necessary patches to the ADK library

**Usage Examples:**
```python
from solace_agent_mesh.agent.sac.patch_adk import patch_adk

# Apply patches before using ADK
patch_adk()
```

### task_execution_context.py
**Purpose:** State management class for single, in-flight agent tasks
**Import:** `from solace_agent_mesh.agent.sac.task_execution_context import TaskExecutionContext`

**Classes:**
- `TaskExecutionContext(task_id: str, a2a_context: Dict[str, Any])` - Encapsulates runtime state for a single agent task
  - `cancel() -> None` - Signals that the task should be cancelled
  - `is_cancelled() -> bool` - Checks if cancellation event has been set
  - `append_to_streaming_buffer(text: str) -> None` - Appends text to streaming buffer
  - `flush_streaming_buffer() -> str` - Returns and clears streaming buffer content
  - `get_streaming_buffer_content() -> str` - Returns buffer content without clearing
  - `append_to_run_based_buffer(text: str) -> None` - Appends text to run-based response buffer
  - `register_peer_sub_task(sub_task_id: str, correlation_data: Dict[str, Any]) -> None` - Adds peer sub-task tracking
  - `claim_sub_task_completion(sub_task_id: str) -> Optional[Dict[str, Any]]` - Atomically retrieves and removes sub-task data
  - `register_parallel_call_sent(invocation_id: str) -> None` - Registers new parallel tool call
  - `handle_peer_timeout(sub_task_id: str, correlation_data: Dict, timeout_sec: int, invocation_id: str) -> bool` - Handles peer timeout
  - `record_parallel_result(result: Dict, invocation_id: str) -> bool` - Records parallel tool call result
  - `clear_parallel_invocation_state(invocation_id: str) -> None` - Removes completed invocation state
  - `register_produced_artifact(filename: str, version: int) -> None` - Tracks newly created artifacts
  - `add_artifact_signal(signal: Dict[str, Any]) -> None` - Adds artifact return signal
  - `get_and_clear_artifact_signals() -> List[Dict[str, Any]]` - Retrieves and clears artifact signals
  - `set_event_loop(loop: asyncio.AbstractEventLoop) -> None` - Stores event loop reference
  - `get_event_loop() -> Optional[asyncio.AbstractEventLoop]` - Retrieves stored event loop

**Usage Examples:**
```python
from solace_agent_mesh.agent.sac.task_execution_context import TaskExecutionContext

# Create task context
a2a_context = {
    "logical_task_id": "task-123",
    "user_id": "user-456",
    "session_id": "session-789"
}
task_context = TaskExecutionContext("task-123", a2a_context)

# Use streaming buffer
task_context.append_to_streaming_buffer("Hello ")
task_context.append_to_streaming_buffer("world!")
content = task_context.flush_streaming_buffer()  # Returns "Hello world!"

# Track peer sub-tasks
correlation_data = {
    "peer_agent_name": "math-agent",
    "adk_function_call_id": "call-123"
}
task_context.register_peer_sub_task("sub-task-456", correlation_data)

# Handle completion
completed_data = task_context.claim_sub_task_completion("sub-task-456")
if completed_data:
    print(f"Sub-task completed: {completed_data}")
```
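
The difference between flushing and reading the buffer, and the "atomic claim" of a sub-task, can be illustrated with a small stand-in class. `MiniTaskContext` is hypothetical and only mimics the documented behavior; the use of a `threading.Lock` and `dict.pop` is an assumption about how such atomicity could be achieved, not a description of the real implementation.

```python
import threading
from typing import Any, Dict, Optional


class MiniTaskContext:
    """Hypothetical stand-in mimicking two TaskExecutionContext behaviors."""

    def __init__(self) -> None:
        self._lock = threading.Lock()
        self._streaming_buffer = ""
        self._peer_sub_tasks: Dict[str, Dict[str, Any]] = {}

    def append_to_streaming_buffer(self, text: str) -> None:
        with self._lock:
            self._streaming_buffer += text

    def flush_streaming_buffer(self) -> str:
        # Return the buffered text and reset the buffer in one locked step.
        with self._lock:
            content, self._streaming_buffer = self._streaming_buffer, ""
            return content

    def register_peer_sub_task(
        self, sub_task_id: str, correlation_data: Dict[str, Any]
    ) -> None:
        with self._lock:
            self._peer_sub_tasks[sub_task_id] = correlation_data

    def claim_sub_task_completion(self, sub_task_id: str) -> Optional[Dict[str, Any]]:
        # Popping under the lock means only one caller can claim an entry.
        with self._lock:
            return self._peer_sub_tasks.pop(sub_task_id, None)


ctx = MiniTaskContext()
ctx.append_to_streaming_buffer("Hello ")
ctx.append_to_streaming_buffer("world!")
first_flush = ctx.flush_streaming_buffer()
second_flush = ctx.flush_streaming_buffer()

ctx.register_peer_sub_task("sub-task-456", {"peer_agent_name": "math-agent"})
first_claim = ctx.claim_sub_task_completion("sub-task-456")
second_claim = ctx.claim_sub_task_completion("sub-task-456")
```

A second claim of the same sub-task returns `None`, which is what lets a response handler and a timeout handler race safely: whichever claims first processes the result.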

# content_hash: 9e9079e6a746598c6cb0653d11458f19ac567dabc3e8e1f3d138a5fb10153b1c

================================================================================

## Section 8: solace_agent_mesh/agent/testing/testing_llm.txt

**Source file:** `solace_agent_mesh/agent/testing/testing_llm.txt`

## Quick Summary
The `testing` directory provides utilities for testing the A2A (Agent-to-Agent) framework, with a focus on debugging tools that help developers understand test failures by providing readable representations of agent event histories.

## Files Overview
- `__init__.py` - Package initialization file marking the directory as a Python package
- `debug_utils.py` - Debugging utilities including pretty-printing for A2A event history
- `testing_llm.txt` - Documentation file (not a code module)

## Developer API Reference

### debug_utils.py
**Purpose:** Provides debugging utilities for the declarative test framework, including a pretty-printer for A2A event history
**Import:** `from solace_agent_mesh.agent.testing.debug_utils import pretty_print_event_history`

**Functions:**
- `pretty_print_event_history(event_history: List[Dict[str, Any]], max_string_length: int = 200) -> None` - Formats and prints a list of A2A event payloads for debugging, intelligently parsing different event types and truncating long strings for readability

**Usage Examples:**
```python
# Import the debugging utility
from solace_agent_mesh.agent.testing.debug_utils import pretty_print_event_history
from typing import List, Dict, Any

# Example: Debug a failed test by printing event history
event_history: List[Dict[str, Any]] = [
    {
        "result": {
            "status": {
                "state": "EXECUTING",
                "message": {
                    "parts": [
                        {"type": "text", "text": "Processing your request..."}
                    ]
                }
            },
            "final": False
        }
    },
    {
        "error": {
            "code": "TIMEOUT_ERROR",
            "message": "Request timed out after 30 seconds"
        }
    }
]

# Print formatted event history for debugging
pretty_print_event_history(event_history)

# Print with custom string truncation length
pretty_print_event_history(event_history, max_string_length=100)

# Handle empty event history (when test fails before any events)
pretty_print_event_history([])
```
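
The effect of `max_string_length` can be pictured as a recursive walk that shortens every long string in a nested payload. The helper below is a hypothetical sketch of that idea; `truncate_strings` and the `"..."` suffix are assumptions and may differ from what `pretty_print_event_history` does internally.

```python
from typing import Any


def truncate_strings(value: Any, max_string_length: int = 200) -> Any:
    """Return a copy of value with every long string shortened."""
    if isinstance(value, str):
        if len(value) > max_string_length:
            return value[:max_string_length] + "..."
        return value
    if isinstance(value, dict):
        return {k: truncate_strings(v, max_string_length) for k, v in value.items()}
    if isinstance(value, list):
        return [truncate_strings(v, max_string_length) for v in value]
    return value  # numbers, booleans, None pass through unchanged


event = {"error": {"code": "TIMEOUT_ERROR", "message": "x" * 300}, "final": False}
shortened = truncate_strings(event, max_string_length=20)
```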

# content_hash: 890ba89aa47c5be30f5ec9cdbb4a05e9ee3bd022e56a56fcc4feea72aac653e8

================================================================================

## Section 9: solace_agent_mesh/agent/tools/tools_llm.txt

**Source file:** `solace_agent_mesh/agent/tools/tools_llm.txt`

# DEVELOPER GUIDE for tools

## Quick Summary
The `tools` directory contains the built-in tool system for the Solace Agent Mesh. It provides ADK tools for artifact management, audio processing, data analysis, image generation, web scraping, and peer agent communication. Each tool module defines its functions and registers them declaratively with a central `tool_registry`, which enables automatic discovery and dynamic availability.

## Files Overview
- `__init__.py` - Imports all tool modules to trigger declarative registration
- `audio_tools.py` - Text-to-speech, voice selection, audio concatenation and transcription tools
- `builtin_artifact_tools.py` - Artifact CRUD operations, content extraction, and embed processing
- `builtin_data_analysis_tools.py` - Plotly chart generation from JSON/YAML configs
- `general_agent_tools.py` - File conversion (MarkItDown), Mermaid diagram generation
- `image_tools.py` - Image generation, description, editing with various APIs
- `peer_agent_tool.py` - Dynamic tool for delegating tasks to peer agents
- `registry.py` - Singleton registry for tool discovery and management
- `test_tools.py` - Testing utilities (delays, failures, dangling calls)
- `tool_definition.py` - Pydantic model for tool definitions
- `web_tools.py` - Web scraping and content extraction tools

## Developer API Reference

### __init__.py
**Purpose:** Ensures all tool modules are imported to trigger declarative registration
**Import:** `from solace_agent_mesh.agent.tools import *` or `import solace_agent_mesh.agent.tools`

**Usage Examples:**
```python
# Import triggers registration of all tools
from solace_agent_mesh.agent.tools import *

# Or import the package to register all tools
import solace_agent_mesh.agent.tools

# Access the registry to see all registered tools
from solace_agent_mesh.agent.tools.registry import tool_registry
all_tools = tool_registry.get_all_tools()
print(f"Registered {len(all_tools)} tools")
```

### audio_tools.py
**Purpose:** Audio processing tools for TTS, voice selection, and transcription
**Import:** `from solace_agent_mesh.agent.tools.audio_tools import select_voice, text_to_speech, multi_speaker_text_to_speech, concatenate_audio, transcribe_audio`

**Functions:**
- `select_voice(gender: Optional[str] = None, tone: Optional[str] = None, exclude_voices: Optional[List[str]] = None, tool_context: ToolContext = None, tool_config: Optional[Dict[str, Any]] = None) -> Dict[str, Any]` - Selects a voice based on gender/tone criteria
- `text_to_speech(text: str, output_filename: Optional[str] = None, voice_name: Optional[str] = None, gender: Optional[str] = None, tone: Optional[str] = None, language: Optional[str] = None, tool_context: ToolContext = None, tool_config: Optional[Dict[str, Any]] = None) -> Dict[str, Any]` - Converts text to speech using Gemini TTS
- `multi_speaker_text_to_speech(conversation_text: str, output_filename: Optional[str] = None, speaker_configs: Optional[List[Dict[str, str]]] = None, language: Optional[str] = None, tool_context: ToolContext = None, tool_config: Optional[Dict[str, Any]] = None) -> Dict[str, Any]` - Multi-speaker TTS for conversations
- `concatenate_audio(clips_to_join: List[Dict[str, Any]], output_filename: Optional[str] = None, tool_context: ToolContext = None, tool_config: Optional[Dict[str, Any]] = None) -> Dict[str, Any]` - Combines multiple audio clips with custom pauses
- `transcribe_audio(audio_filename: str, tool_context: ToolContext = None, tool_config: Optional[Dict[str, Any]] = None) -> Dict[str, Any]` - Transcribes audio using OpenAI-compatible API

**Constants/Variables:**
- `VOICE_TONE_MAPPING: Dict[str, List[str]]` - Maps tones to available voices
- `GENDER_TO_VOICE_MAPPING: Dict[str, List[str]]` - Maps genders to available voices
- `ALL_AVAILABLE_VOICES: List[str]` - Complete list of available voice names
- `SUPPORTED_LANGUAGES: Dict[str, str]` - Maps language names to BCP-47 codes
- `DEFAULT_VOICE: str` - Default voice name ("Kore")

**Usage Examples:**
```python
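from google.adk.tools import ToolContext
from solace_agent_mesh.agent.tools.audio_tools import text_to_speech, select_voice

# `context` is the ToolContext that ADK passes to a tool at call time;
# the awaits below are assumed to run inside an async function.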

# Select a voice with specific criteria
voice_result = await select_voice(
    gender="female",
    tone="friendly",
    tool_context=context
)

# Convert text to speech
result = await text_to_speech(
    text="Hello world",
    voice_name="Kore",
    language="en-US",
    tool_context=context,
    tool_config={"gemini_api_key": "your_key"}
)
```
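
Selecting a voice by criteria can be thought of as intersecting the gender and tone mappings and then removing excluded voices. The sketch below shows that filtering step only. The mapping contents are illustrative: apart from `"Kore"`, the voice names are placeholders, and `candidate_voices` is a hypothetical helper, not part of `audio_tools`.

```python
from typing import Dict, List, Optional

# Illustrative mappings only; apart from "Kore", voice names are placeholders.
GENDER_TO_VOICE: Dict[str, List[str]] = {
    "female": ["Kore", "voice_b"],
    "male": ["voice_c", "voice_d"],
}
TONE_TO_VOICE: Dict[str, List[str]] = {
    "friendly": ["Kore", "voice_c"],
    "serious": ["voice_b", "voice_d"],
}
ALL_VOICES: List[str] = sorted(
    {voice for voices in GENDER_TO_VOICE.values() for voice in voices}
)


def candidate_voices(
    gender: Optional[str] = None,
    tone: Optional[str] = None,
    exclude_voices: Optional[List[str]] = None,
) -> List[str]:
    """Return the voices matching every given criterion, sorted by name."""
    candidates = set(ALL_VOICES)
    if gender is not None:
        candidates &= set(GENDER_TO_VOICE.get(gender, []))
    if tone is not None:
        candidates &= set(TONE_TO_VOICE.get(tone, []))
    candidates -= set(exclude_voices or [])
    return sorted(candidates)


friendly_female = candidate_voices(gender="female", tone="friendly")
female_without_kore = candidate_voices(gender="female", exclude_voices=["Kore"])
```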

### builtin_artifact_tools.py
**Purpose:** Comprehensive artifact management and content processing tools
**Import:** `from solace_agent_mesh.agent.tools.builtin_artifact_tools import list_artifacts, load_artifact, signal_artifact_for_return, apply_embed_and_create_artifact, extract_content_from_artifact, append_to_artifact, delete_artifact`

**Functions:**
- `list_artifacts(tool_context: ToolContext = None) -> Dict[str, Any]` - Lists all artifacts with metadata summaries
- `load_artifact(filename: str, version: int, load_metadata_only: bool = False, max_content_length: Optional[int] = None, tool_context: ToolContext = None) -> Dict[str, Any]` - Loads artifact content or metadata
- `signal_artifact_for_return(filename: str, version: int, tool_context: ToolContext = None) -> Dict[str, Any]` - Signals artifact for return to caller
- `apply_embed_and_create_artifact(output_filename: str, embed_directive: str, output_metadata: Optional[Dict[str, Any]] = None, tool_context: ToolContext = None) -> Dict[str, Any]` - Resolves embeds and creates new artifacts
- `extract_content_from_artifact(filename: str, extraction_goal: str, version: Optional[str] = "latest", output_filename_base: Optional[str] = None, tool_context: ToolContext = None) -> Dict[str, Any]` - Uses LLM to extract/transform artifact content
- `append_to_artifact(filename: str, content_chunk: str, mime_type: str, tool_context: ToolContext = None) -> Dict[str, Any]` - Appends content to existing artifacts
- `delete_artifact(filename: str, version: Optional[int] = None, tool_context: ToolContext = None) -> Dict[str, Any]` - Deletes artifact versions

**Usage Examples:**
```python
from solace_agent_mesh.agent.tools.builtin_artifact_tools import (
    extract_content_from_artifact,
    list_artifacts,
    load_artifact,
)

# List all artifacts
artifacts = await list_artifacts(tool_context=context)

# Load version 1 of an artifact (the signature takes an integer version)
result = await load_artifact(
    filename="data.csv",
    version=1,
    load_metadata_only=False,
    tool_context=context
)

# Extract content using LLM
extracted = await extract_content_from_artifact(
    filename="document.pdf",
    extraction_goal="Extract all email addresses",
    tool_context=context
)
```

### builtin_data_analysis_tools.py
**Purpose:** Data visualization and chart generation tools
**Import:** `from solace_agent_mesh.agent.tools.builtin_data_analysis_tools import create_chart_from_plotly_config`

**Functions:**
- `create_chart_from_plotly_config(config_content: str, config_format: Literal["json", "yaml"], output_filename: str, output_format: Optional[str] = "png", tool_context: ToolContext = None) -> Dict[str, Any]` - Generates charts from Plotly configurations

**Usage Examples:**
```python
from solace_agent_mesh.agent.tools.builtin_data_analysis_tools import create_chart_from_plotly_config

# Create chart from JSON config
result = await create_chart_from_plotly_config(
    config_content='{"data": [{"x": [1,2,3], "y": [4,5,6], "type": "scatter"}]}',
    config_format="json",
    output_filename="chart.png",
    tool_context=context
)
```
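
Because `config_content` arrives as a string in either JSON or YAML, a first step is to parse it according to `config_format`. The following is a minimal sketch of that step under stated assumptions: `parse_chart_config` is a hypothetical helper, the YAML branch assumes PyYAML is installed, and requiring a top-level `data` key is a simplification chosen for the example.

```python
import json
from typing import Any, Dict


def parse_chart_config(config_content: str, config_format: str) -> Dict[str, Any]:
    """Parse a Plotly figure config given as a JSON or YAML string."""
    if config_format == "json":
        config = json.loads(config_content)
    elif config_format == "yaml":
        # PyYAML is imported lazily so the JSON path needs no extra dependency.
        import yaml
        config = yaml.safe_load(config_content)
    else:
        raise ValueError(f"Unsupported config_format: {config_format!r}")
    if not isinstance(config, dict) or "data" not in config:
        raise ValueError("Plotly config must be a mapping with a 'data' key")
    return config


config = parse_chart_config(
    '{"data": [{"x": [1, 2, 3], "y": [4, 5, 6], "type": "scatter"}]}',
    "json",
)
```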

### general_agent_tools.py
**Purpose:** General utility tools for file conversion and diagram generation
**Import:** `from solace_agent_mesh.agent.tools.general_agent_tools import convert_file_to_markdown, mermaid_diagram_generator`

**Functions:**
- `convert_file_to_markdown(input_filename: str, tool_context: ToolContext = None, tool_config: Optional[Dict[str, Any]] = None) -> Dict[str, Any]` - Converts files to Markdown using MarkItDown
- `mermaid_diagram_generator(mermaid_syntax: str, output_filename: Optional[str] = None, tool_context: ToolContext = None, tool_config: Optional[Dict[str, Any]] = None) -> Dict[str, Any]` - Generates PNG diagrams from Mermaid syntax

**Usage Examples:**
```python
from solace_agent_mesh.agent.tools.general_agent_tools import mermaid_diagram_generator, convert_file_to_markdown

# Convert PDF to Markdown
markdown_result = await convert_file_to_markdown(
    input_filename="document.pdf",
    tool_context=context
)

# Generate diagram from Mermaid syntax
result = await mermaid_diagram_generator(
    mermaid_syntax="graph TD; A-->B; B-->C;",
    output_filename="diagram.png",
    tool_context=context
)
```

### image_tools.py
**Purpose:** Image generation, description, and editing capabilities
**Import:** `from solace_agent_mesh.agent.tools.image_tools import create_image_from_description, describe_image, describe_audio, edit_image_with_gemini`

**Functions:**
- `create_image_from_description(image_description: str, output_filename: Optional[str] = None, tool_context: ToolContext = None, tool_config: Optional[Dict[str, Any]] = None) -> Dict[str, Any]` - Generates images from text descriptions
- `describe_image(image_filename: str, prompt: str = "What is in this image?", tool_context: ToolContext = None, tool_config: Optional[Dict[str, Any]] = None) -> Dict[str, Any]` - Describes images using vision APIs
- `describe_audio(audio_filename: str, prompt: str = "What is in this recording?", tool_context: ToolContext = None, tool_config: Optional[Dict[str, Any]] = None) -> Dict[str, Any]` - Describes audio using multimodal APIs
- `edit_image_with_gemini(image_filename: str, edit_prompt: str, output_filename: Optional[str] = None, tool_context: ToolContext = None, tool_config: Optional[Dict[str, Any]] = None) -> Dict[str, Any]` - Edits images using Gemini

**Usage Examples:**
```python
from solace_agent_mesh.agent.tools.image_tools import create_image_from_description, describe_image

# Generate image from description
result = await create_image_from_description(
    image_description="A sunset over mountains",
    output_filename="sunset.png",
    tool_context=context,
    tool_config={"model": "dall-e-3", "api_key": "key", "api_base": "url"}
)

# Describe an image
description = await describe_image(
    image_filename="photo.jpg",
    prompt="What objects are in this image?",
    tool_context=context,
    tool_config={"model": "gpt-4-vision", "api_key": "key", "api_base": "url"}
)
```

### peer_agent_tool.py
**Purpose:** Dynamic tool for delegating tasks to peer agents
**Import:** `from solace_agent_mesh.agent.tools.peer_agent_tool import PeerAgentTool, ArtifactIdentifier`

**Classes:**
- `ArtifactIdentifier(filename: str, version: Union[str, int] = "latest")` - Identifies specific artifact versions
- `PeerAgentTool(target_agent_name: str, host_component)` - Tool for delegating to specific peer agents
  - `run_async(args: Dict[str, Any], tool_context: ToolContext) -> Any` - Executes peer delegation
  - `is_long_running: bool` - Always True for async delegation

**Usage Examples:**
```python
from solace_agent_mesh.agent.tools.peer_agent_tool import PeerAgentTool, ArtifactIdentifier

# Create tool for specific peer agent (typically done automatically)
peer_tool = PeerAgentTool("data_analyst", host_component)

# Use in agent tool registry
result = await peer_tool.run_async(
    args={"task_description": "Analyze this data", "artifacts": [{"filename": "data.csv"}]},
    tool_context=context
)

# Artifact identifier usage
artifact_id = ArtifactIdentifier("report.pdf", version=2)
```

### registry.py
**Purpose:** Singleton registry for tool discovery and management
**Import:** `from solace_agent_mesh.agent.tools.registry import tool_registry`

**Classes:**
- `_ToolRegistry()` - Singleton registry for built-in tools
  - `register(tool: BuiltinTool) -> None` - Registers a tool definition
  - `get_tool_by_name(name: str) -> Optional[BuiltinTool]` - Retrieves tool by name
  - `get_tools_by_category(category_name: str) -> List[BuiltinTool]` - Gets tools by category
  - `get_all_tools() -> List[BuiltinTool]` - Returns all registered tools
  - `clear() -> None` - Clears registry (testing only)

**Constants/Variables:**
- `tool_registry: _ToolRegistry` - Global singleton instance

**Usage Examples:**
```python
from solace_agent_mesh.agent.tools.registry import tool_registry
from solace_agent_mesh.agent.tools.tool_definition import BuiltinTool

# Get all registered tools
all_tools = tool_registry.get_all_tools()
print(f"Total tools: {len(all_tools)}")

# Register a new tool (my_tool_def is a BuiltinTool instance defined elsewhere)
tool_registry.register(my_tool_def)

# Get tool by name
tool = tool_registry.get_tool_by_name("my_tool")

# Get all audio tools
audio_tools = tool_registry.get_tools_by_category("audio")
```
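
The registry pattern itself is small enough to sketch without any dependencies. In the example below, `ToolDef` is a hypothetical stand-in for `BuiltinTool` (its `name` and `category` fields are assumptions), and `MiniRegistry` mirrors the method names listed above without claiming to match the real implementation.

```python
from dataclasses import dataclass
from typing import Dict, List, Optional


@dataclass
class ToolDef:
    """Hypothetical stand-in for BuiltinTool."""
    name: str
    category: str


class MiniRegistry:
    """Name-keyed registry with lookup by name or category."""

    def __init__(self) -> None:
        self._tools: Dict[str, ToolDef] = {}

    def register(self, tool: ToolDef) -> None:
        self._tools[tool.name] = tool

    def get_tool_by_name(self, name: str) -> Optional[ToolDef]:
        return self._tools.get(name)

    def get_tools_by_category(self, category_name: str) -> List[ToolDef]:
        return [t for t in self._tools.values() if t.category == category_name]

    def get_all_tools(self) -> List[ToolDef]:
        return list(self._tools.values())

    def clear(self) -> None:
        self._tools.clear()


# A single module-level instance is shared by every importer of the module.
registry = MiniRegistry()
registry.register(ToolDef(name="text_to_speech", category="audio"))
registry.register(ToolDef(name="my_web_tool", category="web"))  # illustrative name

audio_names = [t.name for t in registry.get_tools_by_category("audio")]
```

Creating the instance at module level is what makes this a singleton in practice: Python caches the module after the first import, so each tool module that imports the registry registers into the same object.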

### test_tools.py
**Purpose:** Testing utilities for tool behavior validation
**Import:** `from solace_agent_mesh.agent.tools.test_tools import time_delay, always_fail_tool, dangling_tool_call_test_tool`

**Functions:**
- `time_delay(seconds: float, tool_context: ToolContext = None, tool_config: Optional[Dict[str, Any]] = None) -> Dict[str, Any]` - Pauses execution for testing
- `always_fail_tool() -> dict` - Always raises exception for error testing
- `dangling_tool_call_test_tool() -> None` - Returns None to test history repair

**Usage Examples:**
```python
from solace_agent_mesh.agent.tools.test_tools import time_delay

# Add delay for testing
result = await time_delay(
    seconds=2.0,
    tool_context=context
)
```

# content_hash: f7491febb2c839b6bcdea784c843205240da0fea5de565d830fef6a544524875

================================================================================

## Section 10: solace_agent_mesh/agent/utils/utils_llm.txt

**Source file:** `solace_agent_mesh/agent/utils/utils_llm.txt`

# DEVELOPER GUIDE for utils

## Quick Summary
The `utils` directory provides a collection of helper modules designed to support the core functionality of the agent. These utilities encapsulate common, reusable logic for tasks such as artifact management (saving, loading, schema inference), configuration parsing, and safe interaction with the ADK's invocation context.

## Files Overview
- `__init__.py` - Empty package marker file
- `artifact_helpers.py` - Comprehensive artifact management functions including save/load operations, metadata handling, and schema inference
- `config_parser.py` - Configuration parsing utilities for resolving instruction providers
- `context_helpers.py` - Safe utilities for extracting data from ADK callback and invocation contexts

## Developer API Reference

### artifact_helpers.py
**Purpose:** Comprehensive artifact management with automatic metadata generation, schema inference, and async operations
**Import:** `from solace_agent_mesh.agent.utils.artifact_helpers import save_artifact_with_metadata, load_artifact_content_or_metadata, get_artifact_info_list, is_filename_safe, ensure_correct_extension`

**Functions:**
- `is_filename_safe(filename: str) -> bool` - Validates filename safety (no path traversal, separators, or reserved names)
- `ensure_correct_extension(filename_from_llm: str, desired_extension: str) -> str` - Ensures filename has correct extension
- `save_artifact_with_metadata(artifact_service: BaseArtifactService, app_name: str, user_id: str, session_id: str, filename: str, content_bytes: bytes, mime_type: str, metadata_dict: Dict[str, Any], timestamp: datetime, explicit_schema: Optional[Dict] = None, schema_inference_depth: int = 2, schema_max_keys: int = 20, tool_context: Optional["ToolContext"] = None) -> Dict[str, Any]` - Saves artifact with auto-generated metadata and schema inference
- `load_artifact_content_or_metadata(artifact_service: BaseArtifactService, app_name: str, user_id: str, session_id: str, filename: str, version: Union[int, str], load_metadata_only: bool = False, return_raw_bytes: bool = False, max_content_length: Optional[int] = None, component: Optional[Any] = None, log_identifier_prefix: str = "[ArtifactHelper:load]", encoding: str = "utf-8", error_handling: str = "strict") -> Dict[str, Any]` - Loads artifact content or metadata with flexible options
- `get_artifact_info_list(artifact_service: BaseArtifactService, app_name: str, user_id: str, session_id: str) -> List[ArtifactInfo]` - Retrieves detailed info for all artifacts
- `get_latest_artifact_version(artifact_service: BaseArtifactService, app_name: str, user_id: str, session_id: str, filename: str) -> Optional[int]` - Gets latest version number for an artifact
- `format_metadata_for_llm(metadata: Dict[str, Any]) -> str` - Formats metadata into LLM-friendly text
- `decode_and_get_bytes(content_str: str, mime_type: str, log_identifier: str) -> Tuple[bytes, str]` - Decodes content based on MIME type (base64 for binary, UTF-8 for text)
- `generate_artifact_metadata_summary(component: "SamAgentComponent", artifact_identifiers: List[Dict[str, Any]], user_id: str, session_id: str, app_name: str, header_text: Optional[str] = None) -> str` - Generates YAML summary of multiple artifacts' metadata

**Constants/Variables:**
- `METADATA_SUFFIX: str` - Suffix for metadata files (".metadata.json")
- `DEFAULT_SCHEMA_MAX_KEYS: int` - Default max keys for schema inference (20)

**Usage Examples:**
```python
import asyncio
from datetime import datetime, timezone
from solace_agent_mesh.agent.utils.artifact_helpers import (
    save_artifact_with_metadata,
    load_artifact_content_or_metadata,
    get_artifact_info_list,
    ensure_correct_extension
)

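async def artifact_example(service):
    # `service` is a BaseArtifactService instance supplied by the caller
    # Normalize the extension on an LLM-provided filename
    safe_name = ensure_correct_extension("report", "csv")  # -> "report.csv"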
    
    # Save artifact with metadata
    csv_data = b"name,age\nAlice,30\nBob,25"
    result = await save_artifact_with_metadata(
        artifact_service=service,
        app_name="my_app",
        user_id="user123",
        session_id="session456",
        filename=safe_name,
        content_bytes=csv_data,
        mime_type="text/csv",
        metadata_dict={"source": "user_upload", "description": "Employee data"},
        timestamp=datetime.now(timezone.utc)
    )
    
    # Load artifact content
    loaded = await load_artifact_content_or_metadata(
        artifact_service=service,
        app_name="my_app",
        user_id="user123", 
        session_id="session456",
        filename=safe_name,
        version="latest"
    )
    
    # List all artifacts
    artifacts = await get_artifact_info_list(
        artifact_service=service,
        app_name="my_app",
        user_id="user123",
        session_id="session456"
    )
```
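
The two filename helpers are described only briefly, so the sketch below shows one plausible reading of them. Both functions are hypothetical re-implementations written for illustration: the exact rules in `is_filename_safe` and `ensure_correct_extension` (for example, how reserved names or an existing different extension are handled) may differ.

```python
def is_filename_safe_sketch(filename: str) -> bool:
    """Reject empty names, path traversal, and path separators."""
    if not filename or filename in {".", ".."}:
        return False
    if ".." in filename:  # deliberately strict: also rejects names like "a..b"
        return False
    if "/" in filename or "\\" in filename:
        return False
    return True


def ensure_extension_sketch(filename: str, desired_extension: str) -> str:
    """Append the extension unless the name already ends with it."""
    ext = "." + desired_extension.lstrip(".").lower()
    if filename.lower().endswith(ext):
        return filename
    # Assumption: an existing, different extension is kept and the new one appended.
    return filename + ext


safe = is_filename_safe_sketch("report.csv")
unsafe = is_filename_safe_sketch("../etc/passwd")
named = ensure_extension_sketch("report", "csv")
```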

### config_parser.py
**Purpose:** Resolves configuration values that can be static strings or dynamic callable providers
**Import:** `from solace_agent_mesh.agent.utils.config_parser import resolve_instruction_provider, InstructionProvider`

**Functions:**
- `resolve_instruction_provider(component, config_value: Any) -> Union[str, InstructionProvider]` - Resolves instruction config from string or invoke block

**Usage Examples:**
```python
from solace_agent_mesh.agent.utils.config_parser import resolve_instruction_provider

# Static string instruction
instruction = resolve_instruction_provider(component, "You are a helpful assistant.")
# Returns: "You are a helpful assistant."

# Dynamic instruction provider (from YAML invoke block)
def dynamic_instruction(context):
    return f"Assistant for {context.user_id}"

instruction_func = resolve_instruction_provider(component, dynamic_instruction)
# Returns: the callable function
```
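
The "static string or callable provider" contract can be sketched as follows. This is a simplified illustration: `resolve_instruction_sketch` and `render_instruction` are hypothetical names, the sketch omits the `component` argument, and it does not handle YAML invoke blocks, which the real `resolve_instruction_provider` is described as supporting.

```python
from types import SimpleNamespace
from typing import Any, Callable, Union

InstructionProviderSketch = Callable[[Any], str]


def resolve_instruction_sketch(
    config_value: Any,
) -> Union[str, InstructionProviderSketch]:
    """Return a static string or a callable provider unchanged; reject the rest."""
    if isinstance(config_value, str):
        return config_value
    if callable(config_value):
        return config_value
    raise ValueError(
        f"Instruction must be a string or callable, got {type(config_value).__name__}"
    )


def render_instruction(
    resolved: Union[str, InstructionProviderSketch], context: Any
) -> str:
    """Produce the final instruction text for a given context."""
    return resolved(context) if callable(resolved) else resolved


static_text = render_instruction(
    resolve_instruction_sketch("You are a helpful assistant."), context=None
)
dynamic_text = render_instruction(
    resolve_instruction_sketch(lambda ctx: f"Assistant for {ctx.user_id}"),
    context=SimpleNamespace(user_id="user-456"),
)
```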

### context_helpers.py
**Purpose:** Safe utilities for extracting information from ADK contexts
**Import:** `from solace_agent_mesh.agent.utils.context_helpers import get_session_from_callback_context, get_original_session_id`

**Functions:**
- `get_session_from_callback_context(callback_context: CallbackContext) -> Session` - Safely extracts Session object from CallbackContext
- `get_original_session_id(invocation_context: Any) -> str` - Extracts base session ID, removing any colon-separated suffixes

**Usage Examples:**
```python
from solace_agent_mesh.agent.utils.context_helpers import (
    get_session_from_callback_context,
    get_original_session_id
)

# In a tool function with callback_context
def my_tool(callback_context):
    # Get full session object
    session = get_session_from_callback_context(callback_context)
    
    # Get original session ID (strips suffixes after colon)
    original_id = get_original_session_id(callback_context._invocation_context)
    # "session123:tool456" -> "session123"
    return session, original_id
```
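
The suffix stripping described for `get_original_session_id` amounts to keeping everything before the first colon. The function below is a hypothetical sketch of that string operation only; it works on a plain session ID string rather than an invocation context.

```python
def original_session_id_sketch(session_id: str) -> str:
    """Return the part of the session ID before the first colon."""
    return session_id.split(":", 1)[0]


with_suffix = original_session_id_sketch("session123:tool456")
without_suffix = original_session_id_sketch("session123")
```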

# content_hash: 1a5c75fb7c2f7fef4a6476d720c82bdf459656e39ff089be1fe5554bd089e173

================================================================================

## Section 11: solace_agent_mesh/common/a2a/a2a_llm.txt

**Source file:** `solace_agent_mesh/common/a2a/a2a_llm.txt`

# DEVELOPER GUIDE: a2a

## Quick Summary
The `a2a` directory provides a comprehensive abstraction layer for the A2A (Agent-to-Agent) protocol, offering helper functions for creating, consuming, and translating A2A protocol objects. It acts as a facade that insulates applications from the specifics of the underlying a2a-sdk, providing simplified interfaces for messages, artifacts, tasks, events, and protocol-level operations.

## Files Overview
- `__init__.py` - Main entry point exposing all commonly used A2A helpers
- `artifact.py` - Helpers for creating and consuming A2A Artifact objects
- `events.py` - Helpers for creating and consuming A2A asynchronous event objects
- `message.py` - Helpers for creating and consuming A2A Message and Part objects
- `protocol.py` - Helpers for A2A protocol-level concerns like topic construction and JSON-RPC
- `task.py` - Helpers for creating and consuming A2A Task objects
- `translation.py` - Helpers for translating between A2A protocol objects and other domains
- `types.py` - Custom type aliases and models for the A2A helper layer

## Developer API Reference

### __init__.py
**Purpose:** Main entry point that exposes all commonly used A2A helpers for easy access
**Import:** `from solace_agent_mesh.common.a2a import *`

This file re-exports all public functions from the other modules, allowing developers to import everything from the main package.

### artifact.py
**Purpose:** Provides helpers for creating and consuming A2A Artifact objects
**Import:** `from solace_agent_mesh.common.a2a.artifact import create_text_artifact, create_data_artifact, get_artifact_id`

**Functions:**
- `create_text_artifact(name: str, text: str, description: str = "", artifact_id: Optional[str] = None) -> Artifact` - Creates a new Artifact containing a single TextPart
- `create_data_artifact(name: str, data: dict[str, Any], description: str = "", artifact_id: Optional[str] = None) -> Artifact` - Creates a new Artifact containing a single DataPart
- `update_artifact_parts(artifact: Artifact, new_parts: List[ContentPart]) -> Artifact` - Returns a new Artifact with replaced parts
- `get_artifact_id(artifact: Artifact) -> str` - Safely retrieves the ID from an Artifact
- `get_artifact_name(artifact: Artifact) -> Optional[str]` - Safely retrieves the name from an Artifact
- `get_parts_from_artifact(artifact: Artifact) -> List[ContentPart]` - Extracts unwrapped content parts from an Artifact

**Usage Examples:**
```python
from solace_agent_mesh.common.a2a.artifact import create_text_artifact, get_artifact_id

# Create a text artifact
artifact = create_text_artifact(
    name="My Document",
    text="This is the content of my document",
    description="A sample text document"
)

# Get artifact ID
artifact_id = get_artifact_id(artifact)
```

### events.py
**Purpose:** Provides helpers for creating and consuming A2A asynchronous event objects
**Import:** `from solace_agent_mesh.common.a2a.events import create_status_update, create_artifact_update`

**Functions:**
- `create_data_signal_event(task_id: str, context_id: str, signal_data: SignalData, agent_name: str, part_metadata: Optional[Dict[str, Any]] = None) -> TaskStatusUpdateEvent` - Creates a TaskStatusUpdateEvent from signal data
- `create_status_update(task_id: str, context_id: str, message: Message, is_final: bool = False, metadata: Optional[Dict[str, Any]] = None) -> TaskStatusUpdateEvent` - Creates a new TaskStatusUpdateEvent
- `create_artifact_update(task_id: str, context_id: str, artifact: Artifact, append: bool = False, last_chunk: bool = False, metadata: Optional[Dict[str, Any]] = None) -> TaskArtifactUpdateEvent` - Creates a new TaskArtifactUpdateEvent
- `get_message_from_status_update(event: TaskStatusUpdateEvent) -> Optional[Message]` - Extracts Message from TaskStatusUpdateEvent
- `get_data_parts_from_status_update(event: TaskStatusUpdateEvent) -> List[DataPart]` - Extracts DataPart objects from status update
- `get_artifact_from_artifact_update(event: TaskArtifactUpdateEvent) -> Optional[Artifact]` - Extracts Artifact from TaskArtifactUpdateEvent

**Usage Examples:**
```python
from solace_agent_mesh.common.a2a.events import create_status_update
from solace_agent_mesh.common.a2a.message import create_agent_text_message

# Create a status update event
message = create_agent_text_message("Processing your request...")
status_event = create_status_update(
    task_id="task-123",
    context_id="context-456",
    message=message,
    is_final=False
)
```

### message.py
**Purpose:** Provides helpers for creating and consuming A2A Message and Part objects
**Import:** `from solace_agent_mesh.common.a2a.message import create_agent_text_message, create_text_part, get_text_from_message`

**Functions:**
- `create_agent_text_message(text: str, task_id: Optional[str] = None, context_id: Optional[str] = None, message_id: Optional[str] = None) -> Message` - Creates agent message with TextPart
- `create_agent_data_message(data: dict[str, Any], task_id: Optional[str] = None, context_id: Optional[str] = None, message_id: Optional[str] = None, part_metadata: Optional[Dict[str, Any]] = None) -> Message` - Creates agent message with DataPart
- `create_agent_parts_message(parts: List[ContentPart], task_id: Optional[str] = None, context_id: Optional[str] = None, message_id: Optional[str] = None, metadata: Optional[Dict[str, Any]] = None) -> Message` - Creates agent message with multiple parts
- `create_user_message(parts: List[ContentPart], task_id: Optional[str] = None, context_id: Optional[str] = None, message_id: Optional[str] = None, metadata: Optional[Dict[str, Any]] = None) -> Message` - Creates user message with multiple parts
- `create_text_part(text: str, metadata: Optional[Dict[str, Any]] = None) -> TextPart` - Creates a TextPart object
- `create_file_part_from_uri(uri: str, name: Optional[str] = None, mime_type: Optional[str] = None, metadata: Optional[Dict[str, Any]] = None) -> FilePart` - Creates FilePart from URI
- `create_file_part_from_bytes(content_bytes: bytes, name: Optional[str] = None, mime_type: Optional[str] = None, metadata: Optional[Dict[str, Any]] = None) -> FilePart` - Creates FilePart from bytes
- `create_data_part(data: Dict[str, Any], metadata: Optional[Dict[str, Any]] = None) -> DataPart` - Creates a DataPart object
- `get_text_from_message(message: Message, delimiter: str = "\n") -> str` - Extracts and joins all text content from Message
- `get_data_parts_from_message(message: Message) -> List[DataPart]` - Extracts DataPart objects from Message
- `get_file_parts_from_message(message: Message) -> List[FilePart]` - Extracts FilePart objects from Message
- `get_message_id(message: Message) -> str` - Gets message ID
- `get_context_id(message: Message) -> Optional[str]` - Gets context ID
- `get_task_id(message: Message) -> Optional[str]` - Gets task ID

**Usage Examples:**
```python
from solace_agent_mesh.common.a2a.message import (
    create_agent_text_message,
    create_data_part,
    create_text_part,
    create_user_message,
)

# Create a simple text message
message = create_agent_text_message(
    text="Hello, how can I help you?",
    task_id="task-123",
    context_id="context-456"
)

# Create a user message with multiple parts
text_part = create_text_part("Please analyze this data:")
data_part = create_data_part({"values": [1, 2, 3, 4, 5]})
user_message = create_user_message(
    parts=[text_part, data_part],
    task_id="task-123"
)
```
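
To make the behavior of `get_text_from_message` concrete, the sketch below performs the same kind of extraction on a plain-dict message shaped like the A2A JSON form (entries in `parts` tagged with `kind`). It is an illustration under that assumption, not the helper's actual code, and `join_text_parts` is a hypothetical name.

```python
from typing import Any, Dict


def join_text_parts(message: Dict[str, Any], delimiter: str = "\n") -> str:
    """Join the text of every part whose kind is "text" (hypothetical helper)."""
    texts = [
        part["text"]
        for part in message.get("parts", [])
        if part.get("kind") == "text"
    ]
    return delimiter.join(texts)


message = {
    "kind": "message",
    "messageId": "msg-123",
    "role": "user",
    "parts": [
        {"kind": "text", "text": "Please analyze this data:"},
        {"kind": "data", "data": {"values": [1, 2, 3]}},
        {"kind": "text", "text": "Thanks!"},
    ],
}

# Non-text parts are skipped; text parts keep their original order.
print(join_text_parts(message))
```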

### protocol.py
**Purpose:** Provides helpers for A2A protocol-level concerns like topic construction and JSON-RPC
**Import:** `from solace_agent_mesh.common.a2a.protocol import get_agent_request_topic, create_send_message_request`

**Constants/Variables:**
- `A2A_VERSION: str` - Current A2A protocol version ("v1")
- `A2A_BASE_PATH: str` - Base path for A2A topics ("a2a/v1")

**Functions:**
- `get_a2a_base_topic(namespace: str) -> str` - Returns base topic prefix for A2A communication
- `get_discovery_topic(namespace: str) -> str` - Returns topic for agent card discovery
- `get_agent_request_topic(namespace: str, agent_name: str) -> str` - Returns topic for sending requests to specific agent
- `get_gateway_status_topic(namespace: str, gateway_id: str, task_id: str) -> str` - Returns topic for publishing status updates to gateway
- `get_gateway_response_topic(namespace: str, gateway_id: str, task_id: str) -> str` - Returns topic for publishing final response to gateway
- `create_send_message_request(message: Message, task_id: str, metadata: Optional[Dict[str, Any]] = None) -> SendMessageRequest` - Creates SendMessageRequest object
- `create_send_streaming_message_request(message: Message, task_id: str, metadata: Optional[Dict[str, Any]] = None) -> SendStreamingMessageRequest` - Creates SendStreamingMessageRequest object
- `create_success_response(result: Any, request_id: Optional[Union[str, int]]) -> JSONRPCResponse` - Creates successful JSON-RPC response
- `create_internal_error_response(message: str, request_id: Optional[Union[str, int]], data: Optional[Dict[str, Any]] = None) -> JSONRPCResponse` - Creates internal error response
- `get_request_id(request: A2ARequest) -> str | int` - Gets JSON-RPC request ID
- `get_request_method(request: A2ARequest) -> str` - Gets JSON-RPC method name
- `topic_matches_subscription(topic: str, subscription: str) -> bool` - Checks if topic matches Solace subscription pattern

**Usage Examples:**
```python
from solace_agent_mesh.common.a2a.protocol import get_agent_request_topic, create_send_message_request
from solace_agent_mesh.common.a2a.message import create_agent_text_message

# Get topic for sending request to an agent
topic = get_agent_request_topic("my-namespace", "my-agent")

# Create a send message request
message = create_agent_text_message("Hello agent!")
request = create_send_message_request(
    message=message,
    task_id="task-123",
    metadata={"priority": "high"}
)
```
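
For readers unfamiliar with Solace subscription patterns, the following sketch shows the kind of check `topic_matches_subscription` performs. It is not the library's implementation. It assumes the usual Solace wildcard rules (`*` matches any characters within one level, and a trailing `>` level matches one or more remaining levels). The name `matches_subscription` and the topic strings are hypothetical.

```python
import re


def matches_subscription(topic: str, subscription: str) -> bool:
    """Sketch of Solace-style topic matching (hypothetical helper).

    Assumed rules: '*' matches any characters within a single level,
    and a trailing '>' level matches one or more remaining levels.
    """
    levels = subscription.split("/")
    pattern_levels = []
    for index, level in enumerate(levels):
        is_last = index == len(levels) - 1
        if level == ">" and is_last:
            pattern_levels.append(".+")  # one or more trailing levels
        else:
            # Escape literal text, then re-enable '*' as a within-level wildcard.
            pattern_levels.append(re.escape(level).replace(r"\*", "[^/]*"))
    return re.fullmatch("/".join(pattern_levels), topic) is not None


# Illustrative topic strings only; real topics come from the helpers above.
print(matches_subscription("ns/demo/request/my-agent", "ns/demo/request/>"))
print(matches_subscription("ns/demo/status/gw1/task-1", "ns/demo/status/*/task-1"))
```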

### task.py
**Purpose:** Provides helpers for creating and consuming A2A Task objects
**Import:** `from solace_agent_mesh.common.a2a.task import create_initial_task, create_final_task, get_task_id`

**Functions:**
- `create_initial_task(task_id: str, context_id: str, agent_name: str) -> Task` - Creates initial Task with 'submitted' status
- `create_task_status(state: TaskState, message: Optional[Message] = None) -> TaskStatus` - Creates TaskStatus object with current timestamp
- `create_final_task(task_id: str, context_id: str, final_status: TaskStatus, artifacts: Optional[List[Artifact]] = None, metadata: Optional[Dict[str, Any]] = None) -> Task` - Creates final Task object
- `get_task_id(task: Task) -> str` - Gets task ID
- `get_task_context_id(task: Task) -> str` - Gets context ID
- `get_task_status(task: Task) -> TaskState` - Gets task state
- `get_task_history(task: Task) -> Optional[List[Message]]` - Gets task history
- `get_task_artifacts(task: Task) -> Optional[List[Artifact]]` - Gets task artifacts
- `get_task_metadata(task: Task) -> Optional[Dict[str, Any]]` - Gets task metadata

**Usage Examples:**
```python
from solace_agent_mesh.common.a2a.task import create_initial_task, create_task_status
from a2a.types import TaskState

# Create an initial task
task = create_initial_task(
    task_id="task-123",
    context_id="context-456",
    agent_name="my-agent"
)

# Create a task status
status = create_task_status(
    state=TaskState.working,
    message=None
)
```
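
`create_task_status` is described as stamping the status with the current time. The sketch below shows what that could look like for a plain-dict status in the A2A JSON form. It assumes the fields `state`, `timestamp`, and optional `message`, and an ISO 8601 UTC timestamp; `build_status` is a hypothetical name, not part of the library.

```python
from datetime import datetime, timezone
from typing import Any, Dict, Optional


def build_status(state: str, message: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
    """Build a status dict stamped with the current UTC time (hypothetical helper)."""
    status: Dict[str, Any] = {
        "state": state,
        # Timezone-aware timestamp so consumers do not have to guess the offset.
        "timestamp": datetime.now(timezone.utc).isoformat(),
    }
    if message is not None:
        status["message"] = message
    return status


status = build_status("working")
print(status)
```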

### translation.py
**Purpose:** Provides helpers for translating between A2A protocol objects and other domains like Google ADK
**Import:** `from solace_agent_mesh.common.a2a.translation import translate_a2a_to_adk_content, format_adk_event_as_a2a`

**Functions:**
- `translate_a2a_to_adk_content(a2a_message: A2AMessage, log_identifier: str) -> adk_types.Content` - Translates A2A Message to ADK Content
- `format_adk_event_as_a2a(adk_event: ADKEvent, a2a_context: Dict, log_identifier: str) -> Tuple[Optional[JSONRPCResponse], List[Tuple[int, Any]]]` - Translates ADK Event to A2A JSON-RPC message
- `format_and_route_adk_event(adk_event: ADKEvent, a2a_context: Dict, component) -> Tuple[Optional[Dict], Optional[str], Optional[Dict], List[Tuple[int, Any]]]` - Formats ADK event to A2A payload and determines routing

**Usage Examples:**
```python
from solace_agent_mesh.common.a2a.translation import translate_a2a_to_adk_content
from solace_agent_mesh.common.a2a.message import create_agent_text_message

# Translate A2A message to ADK content
a2a_message = create_agent_text_message("Hello from A2A!")
adk_content = translate_a2a_to_adk_content(a2a_message, "my-component")
```

### types.py
**Purpose:** Defines custom type aliases and models for the A2A helper layer
**Import:** `from solace_agent_mesh.common.a2a.types import ContentPart, ArtifactInfo`

**Classes:**
- `ArtifactInfo(BaseModel)` - Represents information about an artifact for listing or display
  - `filename: str` - The filename of the artifact
  - `mime_type: Optional[str]` - MIME type of the artifact
  - `size: int` - Size of the artifact in bytes
  - `last_modified: Optional[str]` - Last modification timestamp
  - `description: Optional[str]` - Description of the artifact
  - `schema_definition: Optional[Dict[str, Any]]` - Schema definition for the artifact
  - `uri: Optional[str]` - URI location of the artifact
  - `version: Optional[Union[int, str]]` - Version of the artifact

# content_hash: 57a4b476fc7379ff3fb9589a459c54231dd93eec9c305de5812330debc006da6

================================================================================

## Section 12: solace_agent_mesh/common/a2a_spec/a2a_spec_llm.txt

**Source file:** `solace_agent_mesh/common/a2a_spec/a2a_spec_llm.txt`

# DEVELOPER GUIDE: a2a_spec

## Quick Summary
The `a2a_spec` directory contains the complete Agent-to-Agent (A2A) communication specification for the Solace Agent Mesh. It includes the main JSON schema definition (`a2a.json`) that defines all data structures, request/response types, and error codes for agent communication, plus a `schemas/` subdirectory containing specialized validation schemas for various agent signals and progress updates. Together, these provide a comprehensive framework for validating and implementing compliant agent-to-agent communication.

## Files and Subdirectories Overview
- **Direct files:**
  - `a2a.json` - Complete JSON Schema specification for A2A protocol including all data types, requests, responses, and error definitions
  - `a2a_spec_llm.txt` - Developer guide documentation for the A2A specification
- **Subdirectories:**
  - `schemas/` - JSON Schema definitions for agent communication signals (progress updates, tool invocations, LLM calls, artifact creation)

## Developer API Reference

### Direct Files

#### a2a.json
**Purpose:** Complete JSON Schema specification defining the Agent-to-Agent communication protocol
**Import:** This is a JSON Schema file, typically loaded for validation purposes

**Key Schema Definitions:**
- **AgentCard** - Self-describing manifest for agents with capabilities, skills, and endpoints
- **Message** - Individual messages in agent conversations with parts (text, files, data)
- **Task** - Stateful operations/conversations between clients and agents
- **A2ARequest/A2AResponse** - All supported JSON-RPC request and response types
- **Security Schemes** - OAuth2, API Key, mTLS, and other authentication methods
- **Error Types** - Standard JSON-RPC and A2A-specific error definitions

**Core Data Structures:**
```typescript
// Agent Card - describes agent capabilities
AgentCard {
  name: string
  description: string
  url: string
  skills: AgentSkill[]
  capabilities: AgentCapabilities
  security: SecurityRequirement[]
  // ... additional fields
}

// Message - conversation content
Message {
  messageId: string
  role: "user" | "agent"
  parts: Part[] // TextPart | FilePart | DataPart
  taskId?: string
  contextId?: string
}

// Task - stateful operation
Task {
  id: string
  contextId: string
  status: TaskStatus
  history?: Message[]
  artifacts?: Artifact[]
}
```

#### a2a_spec_llm.txt
**Purpose:** Developer documentation and usage guide for the A2A specification
**Import:** Documentation file for reference

### Subdirectory APIs

#### schemas/
**Purpose:** Provides JSON Schema definitions for agent communication signals and progress updates
**Key Exports:** Schema definitions for progress tracking, tool invocations, LLM calls, and artifact creation
**Import Examples:**
```python
import json
from jsonschema import validate

# Load and use schemas for validation
with open('solace_agent_mesh/common/a2a_spec/schemas/agent_progress_update.json') as f:
    progress_schema = json.load(f)
```

**Available Schemas:**
- `agent_progress_update.json` - General progress status messages
- `artifact_creation_progress.json` - File/artifact creation tracking with chunked data
- `llm_invocation.json` - LLM model invocation signals
- `tool_invocation_start.json` - Tool execution start notifications
- `tool_result.json` - Tool execution completion results

## Complete Usage Guide

### 1. Loading and Using the A2A Schema

```python
import json
from jsonschema import validate

# Load the main A2A schema
with open('solace_agent_mesh/common/a2a_spec/a2a.json') as f:
    a2a_schema = json.load(f)

def definition_schema(name):
    """Wrap one definition so nested "#/definitions/..." references still resolve"""
    # Validating against a bare sub-schema would make it the root document,
    # so references to sibling definitions could no longer be found.
    return {"$ref": f"#/definitions/{name}", "definitions": a2a_schema['definitions']}

# Create validators for specific types
def validate_agent_card(card_data):
    """Validate an AgentCard against the schema"""
    validate(instance=card_data, schema=definition_schema('AgentCard'))

def validate_message(message_data):
    """Validate a Message against the schema"""
    validate(instance=message_data, schema=definition_schema('Message'))

def validate_request(request_data):
    """Validate an A2A request"""
    validate(instance=request_data, schema=definition_schema('A2ARequest'))
```

### 2. Creating Valid A2A Data Structures

```python
# Create a valid Message
message = {
    "kind": "message",
    "messageId": "msg-123",
    "role": "user",
    "parts": [
        {
            "kind": "text",
            "text": "Hello, can you help me with a task?"
        }
    ]
}

# Create a SendMessage request
send_request = {
    "jsonrpc": "2.0",
    "id": "req-456",
    "method": "message/send",
    "params": {
        "message": message
    }
}

# Validate the request
validate_request(send_request)
```
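
When many requests are built by hand, a small factory keeps the JSON-RPC envelope consistent. The sketch below uses only the standard library and the field names from the example above. `build_send_request` and the generated ID formats are hypothetical choices, not part of the specification.

```python
import json
import uuid
from typing import Any, Dict


def build_send_request(text: str) -> Dict[str, Any]:
    """Build a `message/send` JSON-RPC request as a plain dict (hypothetical helper)."""
    message = {
        "kind": "message",
        "messageId": f"msg-{uuid.uuid4().hex}",  # ID format is an assumption
        "role": "user",
        "parts": [{"kind": "text", "text": text}],
    }
    return {
        "jsonrpc": "2.0",
        "id": f"req-{uuid.uuid4().hex}",  # ID format is an assumption
        "method": "message/send",
        "params": {"message": message},
    }


request = build_send_request("Hello, can you help me with a task?")
print(json.dumps(request, indent=2))
```

The resulting dict can be validated against the `A2ARequest` definition in the same way as a hand-written request.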

### 3. Using Agent Communication Schemas

```python
import json
from jsonschema import validate

# Load and validate progress update
with open('solace_agent_mesh/common/a2a_spec/schemas/agent_progress_update.json') as f:
    progress_schema = json.load(f)

progress_update = {
    "type": "agent_progress_update",
    "status_text": "Processing your request..."
}
validate(instance=progress_update, schema=progress_schema)

# Load and validate tool invocation
with open('solace_agent_mesh/common/a2a_spec/schemas/tool_invocation_start.json') as f:
    tool_schema = json.load(f)

tool_invocation = {
    "type": "tool_invocation_start",
    "tool_name": "file_reader",
    "tool_args": {"filepath": "/data/file.txt"},
    "function_call_id": "call_123"
}
validate(instance=tool_invocation, schema=tool_schema)

# Load and validate tool result
with open('solace_agent_mesh/common/a2a_spec/schemas/tool_result.json') as f:
    result_schema = json.load(f)

tool_result = {
    "type": "tool_result",
    "tool_name": "file_reader",
    "result_data": {"content": "File contents...", "size": 1024},
    "function_call_id": "call_123"
}
validate(instance=tool_result, schema=result_schema)
```

### 4. Working with Agent Cards

```python
# Create a complete AgentCard
agent_card = {
    "name": "Document Processor",
    "description": "Agent that processes and analyzes documents",
    "url": "https://api.example.com/agent",
    "version": "1.0.0",
    "protocolVersion": "0.3.0",
    "capabilities": {
        "streaming": True,
        "pushNotifications": False,
        "stateTransitionHistory": True
    },
    "defaultInputModes": ["text/plain", "application/pdf"],
    "defaultOutputModes": ["text/plain", "application/json"],
    "skills": [
        {
            "id": "document-analysis",
            "name": "Document Analysis",
            "description": "Analyze and extract information from documents",
            "tags": ["document", "analysis", "extraction"]
        }
    ]
}

# Validate the agent card
validate_agent_card(agent_card)
```

### 5. Artifact Creation Progress Tracking

```python
# Load artifact creation schema
with open('solace_agent_mesh/common/a2a_spec/schemas/artifact_creation_progress.json') as f:
    artifact_schema = json.load(f)

# Track artifact creation with chunked data
artifact_progress = {
    "type": "artifact_creation_progress",
    "filename": "report.pdf",
    "bytes_saved": 1024,
    "artifact_chunk": "JVBERi0xLjQKJcOkw7zDtsO..."  # Base64 encoded chunk
}
validate(instance=artifact_progress, schema=artifact_schema)
```
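
A producer of these signals typically splits the artifact into chunks and reports a running byte count. The following sketch shows one way to do that with the standard library. It assumes `bytes_saved` is cumulative and that each `artifact_chunk` is Base64 text, as the example above suggests. `iter_progress_signals` and the default chunk size are hypothetical.

```python
import base64
from typing import Any, Dict, Iterator


def iter_progress_signals(
    filename: str, content: bytes, chunk_size: int = 1024
) -> Iterator[Dict[str, Any]]:
    """Yield one artifact_creation_progress signal per chunk (hypothetical helper)."""
    bytes_saved = 0
    for start in range(0, len(content), chunk_size):
        chunk = content[start:start + chunk_size]
        bytes_saved += len(chunk)  # running total, assumed cumulative
        yield {
            "type": "artifact_creation_progress",
            "filename": filename,
            "bytes_saved": bytes_saved,
            "artifact_chunk": base64.b64encode(chunk).decode("ascii"),
        }


signals = list(iter_progress_signals("report.pdf", b"x" * 2500))
print([signal["bytes_saved"] for signal in signals])
```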

### 6. LLM Invocation Tracking

```python
# Load LLM invocation schema
with open('solace_agent_mesh/common/a2a_spec/schemas/llm_invocation.json') as f:
    llm_schema = json.load(f)

# Track LLM calls
llm_invocation = {
    "type": "llm_invocation",
    "request": {
        "model": "gpt-4",
        "messages": [{"role": "user", "content": "Analyze this data"}],
        "temperature": 0.7
    }
}
validate(instance=llm_invocation, schema=llm_schema)
```

### 7. Complete Request/Response Flow with Progress Tracking

```python
# 1. Create and send a message
message = {
    "kind": "message",
    "messageId": "msg-001",
    "role": "user",
    "parts": [{"kind": "text", "text": "Analyze this document"}]
}

request = {
    "jsonrpc": "2.0",
    "id": "req-001",
    "method": "message/send",
    "params": {
        "message": message,
        "configuration": {
            "blocking": False,
            "acceptedOutputModes": ["text/plain", "application/json"]
        }
    }
}

# 2. Send progress updates during processing
def send_progress_update(status_text):
    progress = {
        "type": "agent_progress_update",
        "status_text": status_text
    }
    # Validate and send progress update
    validate(instance=progress, schema=progress_schema)
    return progress

# 3. Track tool invocations
def track_tool_invocation(tool_name, args, call_id):
    invocation = {
        "type": "tool_invocation_start",
        "tool_name": tool_name,
        "tool_args": args,
        "function_call_id": call_id
    }
    validate(instance=invocation, schema=tool_schema)
    return invocation

# 4. Track tool results
def track_tool_result(tool_name, result_data, call_id):
    result = {
        "type": "tool_result",
        "tool_name": tool_name,
        "result_data": result_data,
        "function_call_id": call_id
    }
    validate(instance=result, schema=result_schema)
    return result
```
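
Because `tool_invocation_start` and `tool_result` share a `function_call_id`, a consumer can correlate them to see which tool calls have finished. The sketch below illustrates that idea on plain dicts; `pair_tool_signals` is a hypothetical helper and is not part of the schemas or the library.

```python
from typing import Any, Dict, Iterable, List, Tuple

Signal = Dict[str, Any]


def pair_tool_signals(
    signals: Iterable[Signal],
) -> Tuple[List[Tuple[Signal, Signal]], List[Signal]]:
    """Match start/result signals by function_call_id (hypothetical helper).

    Returns (completed pairs, starts that have no result yet).
    """
    pending: Dict[str, Signal] = {}
    completed: List[Tuple[Signal, Signal]] = []
    for signal in signals:
        call_id = signal.get("function_call_id")
        if signal.get("type") == "tool_invocation_start":
            pending[call_id] = signal
        elif signal.get("type") == "tool_result" and call_id in pending:
            completed.append((pending.pop(call_id), signal))
    return completed, list(pending.values())


stream = [
    {"type": "tool_invocation_start", "tool_name": "file_reader",
     "tool_args": {"filepath": "/data/file.txt"}, "function_call_id": "call_1"},
    {"type": "tool_invocation_start", "tool_name": "file_reader",
     "tool_args": {"filepath": "/data/other.txt"}, "function_call_id": "call_2"},
    {"type": "tool_result", "tool_name": "file_reader",
     "result_data": {"size": 1024}, "function_call_id": "call_1"},
]
completed, still_running = pair_tool_signals(stream)
print(len(completed), len(still_running))  # 1 1
```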

### 8. Comprehensive Schema Validation Utilities

```python
import json
from jsonschema import validate

class A2AValidator:
    """Utility class for A2A schema validation"""
    
    def __init__(self, schema_dir='solace_agent_mesh/common/a2a_spec'):
        self.schema_dir = schema_dir
        self.main_schema = self._load_main_schema()
        self.signal_schemas = self._load_signal_schemas()
    
    def _load_main_schema(self):
        with open(f'{self.schema_dir}/a2a.json') as f:
            return json.load(f)
    
    def _load_signal_schemas(self):
        schemas = {}
        schema_files = [
            'agent_progress_update.json',
            'artifact_creation_progress.json',
            'llm_invocation.json',
            'tool_invocation_start.json',
            'tool_result.json'
        ]
        for filename in schema_files:
            with open(f'{self.schema_dir}/schemas/{filename}') as f:
                schema_name = filename.replace('.json', '')
                schemas[schema_name] = json.load(f)
        return schemas
    
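    def validate_definition(self, data, definition_name):
        """Validate data against a specific A2A definition"""
        # Wrap the definition so nested "#/definitions/..." references still
        # resolve against the full set of definitions.
        schema = {
            "$ref": f"#/definitions/{definition_name}",
            "definitions": self.main_schema['definitions'],
        }
        validate(instance=data, schema=schema)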
    
    def validate_signal(self, data, signal_type):
        """Validate agent communication signal"""
        schema = self.signal_schemas[signal_type]
        validate(instance=data, schema=schema)
    
    def validate_a2a_message(self, message_data):
        """Validate a complete A2A message"""
        self.validate_definition(message_data, 'Message')
    
    def validate_agent_card(self, card_data):
        """Validate an agent card"""
        self.validate_definition(card_data, 'AgentCard')
    
    def validate_task(self, task_data):
        """Validate a task object"""
        self.validate_definition(task_data, 'Task')

# Usage example
validator = A2AValidator()

# Validate main A2A objects
validator.validate_agent_card(agent_card)
validator.validate_a2a_message(message)

# Validate communication signals
validator.validate_signal(progress_update, 'agent_progress_update')
validator.validate_signal(tool_invocation, 'tool_invocation_start')
validator.validate_signal(tool_result, 'tool_result')
validator.validate_signal(artifact_progress, 'artifact_creation_progress')
validator.validate_signal(llm_invocation, 'llm_invocation')
```

### 9. Error Handling with A2A Error Types

```python
# Create A2A-specific errors using the schema definitions
task_not_found_error = {
    "code": -32001,
    "message": "Task not found",
    "data": {"taskId": "task-123"}
}

content_type_error = {
    "code": -32005,
    "message": "Incompatible content types",
    "data": {"requested": "image/png", "supported": ["text/plain", "application/json"]}
}

# Create error response
error_response = {
    "jsonrpc": "2.0",
    "id": "req-456",
    "error": task_not_found_error
}

# Validate error response
validator.validate_definition(error_response, 'JSONRPCErrorResponse')
```
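
The error objects above all share the same JSON-RPC shape, so a small builder avoids repeating the envelope. This is a sketch under that assumption: the two codes are taken from the examples above, while the constant names and `build_error_response` are hypothetical.

```python
from typing import Any, Dict, Optional, Union

# Codes copied from the examples above; the constant names are illustrative.
TASK_NOT_FOUND = -32001
CONTENT_TYPE_NOT_SUPPORTED = -32005


def build_error_response(
    request_id: Optional[Union[str, int]],
    code: int,
    message: str,
    data: Optional[Dict[str, Any]] = None,
) -> Dict[str, Any]:
    """Build a JSON-RPC 2.0 error response as a plain dict (hypothetical helper)."""
    error: Dict[str, Any] = {"code": code, "message": message}
    if data is not None:
        error["data"] = data  # only include "data" when there is something to report
    return {"jsonrpc": "2.0", "id": request_id, "error": error}


response = build_error_response(
    "req-456", TASK_NOT_FOUND, "Task not found", {"taskId": "task-123"}
)
print(response)
```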

This comprehensive guide shows how to use both the main A2A specification and the specialized signal schemas together to build compliant agent-to-agent communication systems in the Solace Agent Mesh, including progress tracking, tool invocation monitoring, LLM call tracking, and artifact creation progress.

# content_hash: f9b143dcd949464f8cf51960be146ae5a87bdd25fd2af5cd93c400925212bb24

================================================================================

## Section 13: solace_agent_mesh/common/a2a_spec/schemas/schemas_llm.txt

**Source file:** `solace_agent_mesh/common/a2a_spec/schemas/schemas_llm.txt`

# DEVELOPER GUIDE: schemas

## Quick Summary
This directory contains JSON Schema definitions for various agent-to-agent (A2A) communication signals in the Solace Agent Mesh. These schemas define the structure and validation rules for different types of progress updates, tool invocations, and LLM interactions that agents can send to each other.

## Files Overview
- `agent_progress_update.json` - Schema for general agent progress status messages
- `artifact_creation_progress.json` - Schema for tracking file/artifact creation progress with chunked data
- `llm_invocation.json` - Schema for LLM model invocation signals
- `tool_invocation_start.json` - Schema for tool execution start notifications
- `tool_result.json` - Schema for tool execution completion results
- `schemas_llm.txt` - Previous developer guide (legacy documentation)

## Developer API Reference

### agent_progress_update.json
**Purpose:** Defines the schema for agent progress update signals that communicate human-readable status messages between agents.

**Schema Structure:**
```json
{
  "type": "agent_progress_update",
  "status_text": "string"
}
```

**Properties:**
- `type: "agent_progress_update"` - Constant identifier for this signal type (required)
- `status_text: string` - Human-readable progress message (required)

**Usage Examples:**
```python
import json
from jsonschema import validate

# Load and use schema
with open('solace_agent_mesh/common/a2a_spec/schemas/agent_progress_update.json') as f:
    schema = json.load(f)

# Valid data example
data = {
    "type": "agent_progress_update",
    "status_text": "Analyzing the report..."
}
validate(instance=data, schema=schema)
```

### artifact_creation_progress.json
**Purpose:** Defines the schema for tracking progress during file or artifact creation operations with chunked data transfer.

**Schema Structure:**
```json
{
  "type": "artifact_creation_progress",
  "filename": "string",
  "bytes_saved": "integer",
  "artifact_chunk": "string"
}
```

**Properties:**
- `type: "artifact_creation_progress"` - Constant identifier for this signal type (required)
- `filename: string` - Name of the artifact being created (required)
- `bytes_saved: integer` - Number of bytes saved so far (required)
- `artifact_chunk: string` - The chunk of artifact data that was saved in this update (required)

**Usage Examples:**
```python
import json
from jsonschema import validate

# Load and use schema
with open('solace_agent_mesh/common/a2a_spec/schemas/artifact_creation_progress.json') as f:
    schema = json.load(f)

# Valid data example
data = {
    "type": "artifact_creation_progress",
    "filename": "report.pdf",
    "bytes_saved": 1024,
    "artifact_chunk": "JVBERi0xLjQKJcOkw7zDtsO..."
}
validate(instance=data, schema=schema)
```

### llm_invocation.json
**Purpose:** Defines the schema for LLM invocation signals that communicate when an agent is calling a language model.

**Schema Structure:**
```json
{
  "type": "llm_invocation",
  "request": "object"
}
```

**Properties:**
- `type: "llm_invocation"` - Constant identifier for this signal type (required)
- `request: object` - Sanitized representation of the LlmRequest object sent to the model (required)

**Usage Examples:**
```python
import json
from jsonschema import validate

# Load and use schema
with open('solace_agent_mesh/common/a2a_spec/schemas/llm_invocation.json') as f:
    schema = json.load(f)

# Valid data example
data = {
    "type": "llm_invocation",
    "request": {
        "model": "gpt-4",
        "messages": [{"role": "user", "content": "Analyze this data"}],
        "temperature": 0.7
    }
}
validate(instance=data, schema=schema)
```

### tool_invocation_start.json
**Purpose:** Defines the schema for tool invocation start signals that notify when an agent begins executing a tool.

**Schema Structure:**
```json
{
  "type": "tool_invocation_start",
  "tool_name": "string",
  "tool_args": "object",
  "function_call_id": "string"
}
```

**Properties:**
- `type: "tool_invocation_start"` - Constant identifier for this signal type (required)
- `tool_name: string` - Name of the tool being called (required)
- `tool_args: object` - Arguments passed to the tool (required)
- `function_call_id: string` - ID from the LLM's function call (required)

**Usage Examples:**
```python
import json
from jsonschema import validate

# Load and use schema
with open('solace_agent_mesh/common/a2a_spec/schemas/tool_invocation_start.json') as f:
    schema = json.load(f)

# Valid data example
data = {
    "type": "tool_invocation_start",
    "tool_name": "file_reader",
    "tool_args": {
        "filepath": "/path/to/file.txt",
        "encoding": "utf-8"
    },
    "function_call_id": "call_abc123"
}
validate(instance=data, schema=schema)
```

### tool_result.json
**Purpose:** Defines the schema for tool execution result signals that communicate the completion and results of tool invocations.

**Schema Structure:**
```json
{
  "type": "tool_result",
  "tool_name": "string",
  "result_data": "any",
  "function_call_id": "string"
}
```

**Properties:**
- `type: "tool_result"` - Constant identifier for this signal type (required)
- `tool_name: string` - Name of the tool that was called (required)
- `result_data: any` - The data returned by the tool (required, can be any type)
- `function_call_id: string` - ID from the LLM's function call that this result corresponds to (required)

**Usage Examples:**
```python
import json
from jsonschema import validate

# Load and use schema
with open('solace_agent_mesh/common/a2a_spec/schemas/tool_result.json') as f:
    schema = json.load(f)

# Valid data example
data = {
    "type": "tool_result",
    "tool_name": "file_reader",
    "result_data": {
        "content": "File contents here...",
        "size": 1024,
        "encoding": "utf-8"
    },
    "function_call_id": "call_abc123"
}
validate(instance=data, schema=schema)
```

**Common Usage Pattern:**
```python
import json
from jsonschema import ValidationError, validate
from pathlib import Path

def validate_a2a_signal(signal_data: dict, schema_name: str) -> bool:
    """Validate A2A signal data against its schema."""
    schema_path = Path(f"solace_agent_mesh/common/a2a_spec/schemas/{schema_name}.json")

    with open(schema_path) as f:
        schema = json.load(f)

    try:
        validate(instance=signal_data, schema=schema)
        return True
    except ValidationError as e:
        # Only validation failures are reported here; a missing or malformed
        # schema file still raises so that it is not mistaken for bad data.
        print(f"Validation failed: {e.message}")
        return False

# Example usage
progress_data = {
    "type": "agent_progress_update",
    "status_text": "Processing request..."
}

if validate_a2a_signal(progress_data, "agent_progress_update"):
    print("Signal is valid!")
```
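
Because both `tool_invocation_start` and `tool_result` carry a `function_call_id`, a consumer can pair each result with the invocation that produced it. The following is a minimal sketch of that correlation using plain dictionaries; the `pair_tool_signals` helper and the `web_search` tool name are hypothetical and not part of the package.

```python
from typing import Any, Dict, List, Tuple

Signal = Dict[str, Any]


def pair_tool_signals(signals: List[Signal]) -> Tuple[List[Tuple[Signal, Signal]], List[Signal]]:
    """Pair tool_invocation_start signals with their tool_result signals.

    Returns (pairs, pending), where pending holds invocations that have
    not received a result yet.
    """
    started: Dict[str, Signal] = {}
    pairs: List[Tuple[Signal, Signal]] = []
    for signal in signals:
        call_id = signal.get("function_call_id")
        if signal.get("type") == "tool_invocation_start":
            started[call_id] = signal
        elif signal.get("type") == "tool_result" and call_id in started:
            # Remove the invocation once its result has arrived
            pairs.append((started.pop(call_id), signal))
    return pairs, list(started.values())


signals = [
    {"type": "tool_invocation_start", "tool_name": "file_reader",
     "tool_args": {"filepath": "/path/to/file.txt"}, "function_call_id": "call_abc123"},
    {"type": "tool_invocation_start", "tool_name": "web_search",
     "tool_args": {"query": "solace"}, "function_call_id": "call_def456"},
    {"type": "tool_result", "tool_name": "file_reader",
     "result_data": {"size": 1024}, "function_call_id": "call_abc123"},
]
pairs, pending = pair_tool_signals(signals)
```

Here `pairs` holds the matched `file_reader` invocation and result, while `pending` still contains the unanswered `call_def456` invocation.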

# content_hash: 3e5a2f693dc3f4536f9c958168677fe2e3a0e18b3dc319891aceccb5ccfdda8a

================================================================================

## Section 14: solace_agent_mesh/common/common_llm.txt

**Source file:** `solace_agent_mesh/common/common_llm.txt`

# DEVELOPER GUIDE: common

## Quick Summary
The `common` directory provides the foundational infrastructure for Agent-to-Agent (A2A) communication within the Solace Agent Mesh. It establishes the core protocol, data types, and message translation logic that underpins all interactions between AI agents and gateways.

The architecture is designed for clarity and extensibility. Core, low-level definitions are located in **direct files**:
- `types.py` defines the canonical data structures (e.g., `Message`, `Task`, `AgentCard`).
- `a2a_protocol.py` handles the construction of Solace topics and the translation between A2A and Google ADK message formats.
- `agent_registry.py` provides a simple, thread-safe mechanism for discovering and tracking available agents.

This foundation is then leveraged by specialized **subdirectories**, which provide higher-level, ready-to-use components:
- `client/`: A complete client library for discovering and interacting with remote agents.
- `server/`: A stand-alone server implementation for building A2A-compliant agents.
- `middleware/`: A pluggable framework for customizing configuration and feature access.
- `services/`: A factory-based system for integrating identity and other external data sources.
- `utils/`: A collection of cross-cutting utilities for caching, logging, and dynamic content processing.

Together, these components form a cohesive ecosystem, enabling developers to either build new agents from scratch using the `server` components or interact with existing agents using the `client` library, all while relying on the same underlying protocol and types.

## Files and Subdirectories Overview
- **Direct files:**
  - `__init__.py`: Package initialization file.
  - `a2a_protocol.py`: Handles A2A topic construction and translation between A2A and ADK message formats.
  - `agent_registry.py`: A thread-safe registry for managing discovered agent cards.
  - `constants.py`: Common constants used across the system.
  - `data_parts.py`: Pydantic models for structured data payloads used in A2A DataPart objects.
  - `exceptions.py`: Custom exceptions for Solace Agent Mesh.
  - `types.py`: Contains all Pydantic models for A2A protocol messages, tasks, and data structures.
- **Subdirectories:**
  - `a2a/`: Comprehensive abstraction layer providing helper functions for creating, consuming, and translating A2A protocol objects.
  - `a2a_spec/`: Complete JSON Schema specification for the A2A protocol.
  - `client/`: Provides a high-level client for discovering and communicating with remote A2A agents.
  - `middleware/`: A pluggable framework for configuration resolution and system extensibility.
  - `server/`: A complete A2A server implementation with JSON-RPC support and task management.
  - `services/`: Provides shared services like identity management using a factory pattern.
  - `utils/`: Contains common utility functions and an embedded expression processing system.

## Developer API Reference

### Direct Files

#### a2a_protocol.py
**Purpose:** Provides the core functions for constructing Solace topics according to the A2A specification and for translating messages between the A2A format and the Google ADK format.
**Import:** `from solace_agent_mesh.common.a2a_protocol import get_agent_request_topic, translate_a2a_to_adk_content`

**Classes/Functions/Constants:**
- **Constants**:
  - `A2A_VERSION: str`: The current version of the A2A protocol (e.g., "v1").
  - `A2A_BASE_PATH: str`: The base path used in all A2A topics (e.g., "a2a/v1").
- **Topic Construction Functions**:
  - `get_a2a_base_topic(namespace: str) -> str`: Returns the base topic prefix for all A2A communication.
  - `get_discovery_topic(namespace: str) -> str`: Returns the topic for agent card discovery.
  - `get_agent_request_topic(namespace: str, agent_name: str) -> str`: Returns the topic for sending requests to a specific agent.
  - `get_gateway_status_topic(namespace: str, gateway_id: str, task_id: str) -> str`: Returns the topic for an agent to publish status updates to a gateway.
  - `get_gateway_response_topic(namespace: str, gateway_id: str, task_id: str) -> str`: Returns the topic for an agent to publish final responses to a gateway.
  - `get_client_response_topic(namespace: str, client_id: str) -> str`: Returns the topic for publishing final responses to a specific client.
  - `get_client_status_topic(namespace: str, client_id: str, task_id: str) -> str`: Returns the topic for publishing status updates to a specific client.
- **Message Translation Functions**:
  - `translate_a2a_to_adk_content(a2a_message: A2AMessage, log_identifier: str) -> adk_types.Content`: Translates an A2A `Message` object into the Google ADK `Content` format.
  - `format_adk_event_as_a2a(...) -> Tuple[Optional[JSONRPCResponse], ...]`: Translates an ADK `Event` into an A2A `JSONRPCResponse` containing a `TaskStatusUpdateEvent`.
  - `format_and_route_adk_event(...) -> Tuple[Optional[Dict], Optional[str], ...]`: A higher-level wrapper that formats an ADK event and determines the correct Solace topic to publish it to.
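
To illustrate how a namespace and the base path can combine into a hierarchical topic, here is a small stand-alone sketch. Only the `"a2a/v1"` value comes from the constants above; the `sketch_*` function names and the `agent/request` segment are assumptions for illustration and may not match the library's real topic layout.

```python
A2A_BASE_PATH = "a2a/v1"  # value quoted in the constants list above


def sketch_base_topic(namespace: str) -> str:
    """Join the namespace and base path into a topic prefix (assumed layout)."""
    return f"{namespace.strip('/')}/{A2A_BASE_PATH}"


def sketch_agent_request_topic(namespace: str, agent_name: str) -> str:
    """Build a per-agent request topic under the base prefix."""
    # The "agent/request" segment is illustrative, not the library's confirmed layout.
    return f"{sketch_base_topic(namespace)}/agent/request/{agent_name}"


topic = sketch_agent_request_topic("my-company/ai-agents/", "document-analyzer")
```

Stripping slashes from the namespace keeps the resulting topic free of empty levels, which matters for brokers that match topics level by level.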

#### agent_registry.py
**Purpose:** Provides a simple, thread-safe, in-memory store for discovered `AgentCard` objects. This is useful for components that need to keep track of available agents in the network.
**Import:** `from solace_agent_mesh.common.agent_registry import AgentRegistry`

**Classes/Functions/Constants:**
- **`AgentRegistry`**: A thread-safe class for storing and managing agent cards.
  - `add_or_update_agent(self, agent_card: AgentCard)`: Adds a new agent or updates an existing one.
  - `get_agent(self, agent_name: str) -> Optional[AgentCard]`: Retrieves an agent card by its unique name.
  - `get_agent_names(self) -> List[str]`: Returns a sorted list of all discovered agent names.
  - `clear(self)`: Clears all agents from the registry.
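
One plausible way to obtain the thread safety described above is a dictionary guarded by a lock. The sketch below mirrors the four listed methods under that assumption; `SketchAgentCard` and `SketchAgentRegistry` are hypothetical stand-ins, not the package's implementation.

```python
import threading
from dataclasses import dataclass
from typing import Dict, List, Optional


@dataclass
class SketchAgentCard:
    """Hypothetical stand-in for AgentCard; only `name` matters here."""
    name: str
    description: str = ""


class SketchAgentRegistry:
    """Lock-guarded dict exposing the four methods listed above."""

    def __init__(self) -> None:
        self._lock = threading.Lock()
        self._agents: Dict[str, SketchAgentCard] = {}

    def add_or_update_agent(self, agent_card: SketchAgentCard) -> None:
        with self._lock:
            self._agents[agent_card.name] = agent_card

    def get_agent(self, agent_name: str) -> Optional[SketchAgentCard]:
        with self._lock:
            return self._agents.get(agent_name)

    def get_agent_names(self) -> List[str]:
        with self._lock:
            return sorted(self._agents)

    def clear(self) -> None:
        with self._lock:
            self._agents.clear()


registry = SketchAgentRegistry()
registry.add_or_update_agent(SketchAgentCard(name="summarizer"))
registry.add_or_update_agent(SketchAgentCard(name="document-analyzer"))
names = registry.get_agent_names()
```

Holding the lock for every read and write keeps the sorted name list consistent even when discovery messages arrive on another thread.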

#### constants.py
**Purpose:** Defines common constants used throughout the Solace Agent Mesh system.
**Import:** `from solace_agent_mesh.common.constants import DEFAULT_COMMUNICATION_TIMEOUT`

**Classes/Functions/Constants:**
- `DEFAULT_COMMUNICATION_TIMEOUT: int`: Default timeout for communication operations (600 seconds / 10 minutes).
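
A timeout constant like this is typically used to bound an awaited call. The sketch below shows one way to do so with `asyncio.wait_for`; the local constant is a stand-in for the imported one, and `call_with_timeout` and `slow_reply` are hypothetical helpers.

```python
import asyncio

# Stand-in for solace_agent_mesh.common.constants.DEFAULT_COMMUNICATION_TIMEOUT
DEFAULT_COMMUNICATION_TIMEOUT = 600  # seconds


async def call_with_timeout(coro, timeout: float = DEFAULT_COMMUNICATION_TIMEOUT):
    """Await `coro`, returning None if it does not finish within `timeout`."""
    try:
        return await asyncio.wait_for(coro, timeout=timeout)
    except asyncio.TimeoutError:
        return None


async def slow_reply() -> str:
    # Hypothetical peer call that takes longer than the caller allows
    await asyncio.sleep(0.2)
    return "done"


fast = asyncio.run(call_with_timeout(asyncio.sleep(0, result="ok")))
slow = asyncio.run(call_with_timeout(slow_reply(), timeout=0.01))
```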

#### data_parts.py
**Purpose:** Defines Pydantic models for structured data payloads used in A2A DataPart objects, corresponding to JSON schemas for agent communication signals.
**Import:** `from solace_agent_mesh.common.data_parts import ToolInvocationStartData, LlmInvocationData`

**Classes/Functions/Constants:**
- **`ToolInvocationStartData`**: Data model for tool invocation start signals.
  - `type: Literal["tool_invocation_start"]`: The constant type identifier.
  - `tool_name: str`: The name of the tool being called.
  - `tool_args: Dict[str, Any]`: The arguments passed to the tool.
  - `function_call_id: str`: The ID from the LLM's function call.
- **`LlmInvocationData`**: Data model for LLM invocation signals.
  - `type: Literal["llm_invocation"]`: The constant type identifier.
  - `request: Dict[str, Any]`: A sanitized representation of the LlmRequest object.
- **`AgentProgressUpdateData`**: Data model for agent progress update signals.
  - `type: Literal["agent_progress_update"]`: The constant type identifier.
  - `status_text: str`: A human-readable progress message.
- **`ArtifactCreationProgressData`**: Data model for artifact creation progress signals.
  - `type: Literal["artifact_creation_progress"]`: The constant type identifier.
  - `filename: str`: The name of the artifact being created.
  - `bytes_saved: int`: The number of bytes saved so far.
  - `artifact_chunk: str`: The chunk of artifact data that was saved in this progress update.
- **`ToolResultData`**: Data model for tool execution result signals.
  - `type: Literal["tool_result"]`: The constant type identifier.
  - `tool_name: str`: The name of the tool that was called.
  - `result_data: Any`: The data returned by the tool.
  - `function_call_id: str`: The ID from the LLM's function call.

#### exceptions.py
**Purpose:** Defines custom exceptions specific to the Solace Agent Mesh system.
**Import:** `from solace_agent_mesh.common.exceptions import MessageSizeExceededError`

**Classes/Functions/Constants:**
- **`MessageSizeExceededError(Exception)`**: Raised when a message exceeds the maximum allowed size.
  - `__init__(self, actual_size: int, max_size: int, message: str = None)`: Initialize with size information.
  - `actual_size: int`: The actual size of the message in bytes.
  - `max_size: int`: The maximum allowed size in bytes.
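
As a rough illustration of how an exception with this signature might be defined and raised, consider the sketch below. The default message text and the `check_payload_size` guard are assumptions; the package's real wording and call sites may differ.

```python
from typing import Optional


class MessageSizeExceededError(Exception):
    """Sketch of the exception described above (not the package source)."""

    def __init__(self, actual_size: int, max_size: int, message: Optional[str] = None):
        self.actual_size = actual_size
        self.max_size = max_size
        # Assumed default text; the real default message may differ
        super().__init__(
            message or f"Message size {actual_size} bytes exceeds limit of {max_size} bytes"
        )


def check_payload_size(payload: bytes, max_size: int) -> None:
    """Hypothetical guard that raises when a payload is too large."""
    if len(payload) > max_size:
        raise MessageSizeExceededError(len(payload), max_size)


try:
    check_payload_size(b"x" * 2048, max_size=1024)
except MessageSizeExceededError as exc:
    overflow = exc.actual_size - exc.max_size
```

Keeping both sizes as attributes lets a caller decide how to react, for example by chunking the payload, without parsing the message string.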

#### types.py
**Purpose:** Defines all the Pydantic data models that constitute the A2A protocol. These types ensure data consistency and provide validation across all components.
**Import:** `from solace_agent_mesh.common.types import Message, Task, AgentCard, JSONRPCRequest, TaskState`

**Classes/Functions/Constants:**
- **Core Data Structures**:
  - `Message`: Represents a message from a user or agent, containing a list of `Part` objects.
  - `Part`: A discriminated union of `TextPart`, `FilePart`, and `DataPart`.
  - `Task`: The central object representing a complete task, including its ID, status, history, and artifacts.
  - `TaskStatus`: Describes the current state of a task (e.g., `WORKING`, `COMPLETED`).
  - `TaskState(Enum)`: An enumeration of all possible task states.
  - `AgentCard`: A comprehensive description of an agent's identity, capabilities, and skills.
  - `Artifact`: Represents a task output, such as a generated file or structured data.
- **JSON-RPC Structures**:
  - `JSONRPCRequest`: The base model for all JSON-RPC requests.
  - `JSONRPCResponse`: The base model for all JSON-RPC responses.
  - `SendTaskRequest`, `GetTaskRequest`, etc.: Specific request types inheriting from `JSONRPCRequest`.
- **Error Structures**:
  - `JSONRPCError`: The base model for errors.
  - `InternalError`, `TaskNotFoundError`, etc.: Specific error types inheriting from `JSONRPCError`.

### Subdirectory APIs

#### a2a/
**Purpose:** Comprehensive abstraction layer providing helper functions for creating, consuming, and translating A2A protocol objects
**Key Exports:** Helper functions for messages, tasks, artifacts, events, and protocol operations
**Import Examples:**
```python
from solace_agent_mesh.common.a2a import create_agent_text_message, create_initial_task, translate_a2a_to_adk_content
from solace_agent_mesh.common.a2a.message import create_text_part, get_text_from_message
from solace_agent_mesh.common.a2a.protocol import get_agent_request_topic, create_send_message_request
```

#### a2a_spec/
**Purpose:** Contains the complete Agent-to-Agent (A2A) communication specification including JSON schema definitions
**Key Exports:** JSON Schema specifications for A2A protocol and agent communication signals
**Import Examples:**
```python
import json
from jsonschema import validate

# Load main A2A schema
with open('solace_agent_mesh/common/a2a_spec/a2a.json') as f:
    a2a_schema = json.load(f)
```

#### client/
**Purpose:** Provides a high-level, asynchronous client library for discovering and interacting with remote A2A agents.
**Key Exports:** `A2AClient`, `A2ACardResolver`
**Import Examples:**
```python
from solace_agent_mesh.common.client import A2AClient, A2ACardResolver
```

#### middleware/
**Purpose:** A pluggable middleware framework for customizing system behavior, such as resolving user-specific configurations and feature flags.
**Key Exports:** `ConfigResolver`, `MiddlewareRegistry`
**Import Examples:**
```python
from solace_agent_mesh.common.middleware import ConfigResolver, MiddlewareRegistry
```

#### server/
**Purpose:** A complete, stand-alone server for building A2A-compliant agents, handling HTTP requests, JSON-RPC, and task lifecycle management.
**Key Exports:** `A2AServer`, `TaskManager`, `InMemoryTaskManager`
**Import Examples:**
```python
from solace_agent_mesh.common.server import A2AServer, TaskManager, InMemoryTaskManager
```

#### services/
**Purpose:** A factory-based system for integrating external data sources for identity, employee information, and more.
**Key Exports:** `BaseIdentityService`, `create_identity_service`
**Import Examples:**
```python
from solace_agent_mesh.common.services.identity_service import create_identity_service, BaseIdentityService
```

#### utils/
**Purpose:** A collection of cross-cutting utilities for caching, logging, MIME type handling, and dynamic content processing.
**Key Exports:** `InMemoryCache`, `is_text_based_mime_type`, `resolve_embeds_in_string`
**Import Examples:**
```python
from solace_agent_mesh.common.utils.in_memory_cache import InMemoryCache
from solace_agent_mesh.common.utils import is_text_based_mime_type
from solace_agent_mesh.common.utils.embeds import resolve_embeds_recursively_in_string
```

## Complete Usage Guide

### 1. Basic A2A Protocol Usage
This example shows how to use the core protocol functions and types to build A2A communication.

```python
import uuid
from datetime import datetime, timezone
from solace_agent_mesh.common.a2a_protocol import (
    get_agent_request_topic, 
    get_gateway_status_topic,
    translate_a2a_to_adk_content
)
from solace_agent_mesh.common.types import (
    Message, 
    TextPart, 
    Task, 
    TaskStatus, 
    TaskState,
    AgentCard
)
from solace_agent_mesh.common.agent_registry import AgentRegistry

# Create a message
message = Message(
    role="user",
    parts=[TextPart(text="Hello, can you help me analyze this document?")]
)

# Translate to ADK format for processing
adk_content = translate_a2a_to_adk_content(message, "[Example]")
print(f"ADK Content: {adk_content}")

# Generate topic names for communication
namespace = "my-company/ai-agents"
agent_name = "document-analyzer"
gateway_id = "gateway-001"
task_id = str(uuid.uuid4())

request_topic = get_agent_request_topic(namespace, agent_name)
status_topic = get_gateway_status_topic(namespace, gateway_id, task_id)

print(f"Request Topic: {request_topic}")
print(f"Status Topic: {status_topic}")

# Use agent registry to track discovered agents
registry = AgentRegistry()
agent_card = AgentCard(
    name="document-analyzer",
    display_name="Document Analyzer",
    description="Analyzes and extracts information from documents",
    url="https://agents.company.com/document-analyzer",
    version="1.0.0",
    capabilities={"streaming": True, "pushNotifications": False},
    skills=[],
    peer_agents={}
)

registry.add_or_update_agent(agent_card)
```

# content_hash: 1e02a8692a4f781b66ebf5b56ec8adedbbcfec3a02cc8f1cd02a22b95e057b14

================================================================================

## Section 15: solace_agent_mesh/common/middleware/middleware_llm.txt

**Source file:** `solace_agent_mesh/common/middleware/middleware_llm.txt`

# DEVELOPER GUIDE: middleware

## Quick Summary
The `middleware` directory provides a pluggable framework for system components that can be extended or replaced at runtime. It offers a registry system to dynamically bind custom implementations for core functionalities like configuration resolution. The default implementations provide permissive behavior, making them suitable for development and testing environments where all features are enabled by default.

## Files Overview
- `__init__.py`: Exposes the main public classes of the middleware package for easy importing.
- `config_resolver.py`: Defines the default, permissive configuration resolution middleware.
- `registry.py`: Provides the `MiddlewareRegistry` for dynamically binding custom middleware implementations.

## Developer API Reference

### __init__.py
**Purpose:** This file serves as the entry point to the `middleware` package, exporting the primary public interfaces for developers to use.

**Import:** `from solace_agent_mesh.common.middleware import ConfigResolver, MiddlewareRegistry`

**Usage Examples:**
```python
# Import the main classes directly from the middleware package
from solace_agent_mesh.common.middleware import ConfigResolver, MiddlewareRegistry

# Now you can use ConfigResolver and MiddlewareRegistry
print(ConfigResolver)
print(MiddlewareRegistry)
```

### config_resolver.py
**Purpose:** This file provides a pluggable interface for resolving user-specific configuration and determining feature availability. The default `ConfigResolver` class is permissive, allowing all operations and enabling all features, which is ideal for development or simple deployments.

**Import:** `from solace_agent_mesh.common.middleware import ConfigResolver`

**Classes:**
- `ConfigResolver()` - A class containing static methods to resolve user-specific configuration and determine feature availability. This default implementation is permissive.
  - `resolve_user_config(user_identity: Any, gateway_context: Dict[str, Any], base_config: Dict[str, Any]) -> Dict[str, Any]` - (async) Resolves user-specific configuration. The default implementation returns the `base_config` unchanged.
  - `is_feature_enabled(user_config: Dict[str, Any], feature_descriptor: Dict[str, Any], context: Dict[str, Any]) -> bool` - Checks if a feature is enabled for a user. The default implementation always returns `True`.
  - `validate_operation_config(user_config: Dict[str, Any], operation_spec: Dict[str, Any], validation_context: Dict[str, Any]) -> Dict[str, Any]` - Validates if an operation is allowed for a user. The default implementation always returns a dictionary with `{'valid': True}`.
  - `filter_available_options(user_config: Dict[str, Any], available_options: List[Dict[str, Any]], filter_context: Dict[str, Any]) -> List[Dict[str, Any]]` - Filters a list of options based on user permissions. The default implementation returns the original `available_options` list.

**Usage Examples:**
```python
import asyncio
from solace_agent_mesh.common.middleware import ConfigResolver

async def main():
    # Example user identity and base configuration
    user_id = "test-user@example.com"
    base_conf = {"api_key": "default_key", "allowed_models": ["gpt-3.5-turbo"]}

    # 1. Resolve user configuration (default implementation returns base_conf)
    user_config = await ConfigResolver.resolve_user_config(
        user_identity=user_id,
        gateway_context={"gateway_id": "gw-1"},
        base_config=base_conf
    )
    print(f"Resolved User Config: {user_config}")

    # 2. Check if a feature is enabled (default is always True)
    feature_desc = {"feature_type": "ai_tool", "function_name": "code_interpreter"}
    is_enabled = ConfigResolver.is_feature_enabled(
        user_config=user_config,
        feature_descriptor=feature_desc,
        context={}
    )
    print(f"Is Feature Enabled: {is_enabled}")

    # 3. Validate an operation (default is always valid)
    op_spec = {"operation_type": "model_inference", "model": "gpt-4"}
    validation = ConfigResolver.validate_operation_config(
        user_config=user_config,
        operation_spec=op_spec,
        validation_context={}
    )
    print(f"Operation Validation: {validation}")

    # 4. Filter available options (default returns all options)
    all_models = [
        {"name": "gpt-3.5-turbo", "provider": "openai"},
        {"name": "gpt-4", "provider": "openai"},
    ]
    available_models = ConfigResolver.filter_available_options(
        user_config=user_config,
        available_options=all_models,
        filter_context={"type": "language_model"}
    )
    print(f"Filtered Options: {available_models}")

if __name__ == "__main__":
    asyncio.run(main())
```

### registry.py
**Purpose:** This file provides the `MiddlewareRegistry`, a static class that allows developers to dynamically bind, or "plug in," their own custom middleware implementations at runtime. This is the core of the pluggable system.

**Import:** `from solace_agent_mesh.common.middleware import MiddlewareRegistry`

**Classes:**
- `MiddlewareRegistry()` - A registry for managing middleware implementations. All methods are class methods.
  - `bind_config_resolver(resolver_class: Type)` - Binds a custom class that implements the `ConfigResolver` interface. This new class will be used for all subsequent configuration resolution calls.
  - `get_config_resolver() -> Type` - Returns the currently bound `ConfigResolver` class. If no custom resolver has been bound, it returns the default `ConfigResolver`.
  - `register_initialization_callback(callback: callable)` - Registers a function to be executed when `initialize_middleware()` is called. Useful for setting up custom middleware components at application startup.
  - `initialize_middleware()` - Executes all registered initialization callbacks. This should be called once during application startup.
  - `reset_bindings()` - Resets all bindings back to their defaults. This is primarily useful for testing environments.
  - `get_registry_status() -> Dict[str, Any]` - Returns a dictionary containing the current status of the registry, such as which resolver is bound.

**Usage Examples:**
```python
import asyncio
from typing import Any, Dict, List
from solace_agent_mesh.common.middleware import MiddlewareRegistry, ConfigResolver

# 1. Define a custom ConfigResolver implementation
class MyCustomConfigResolver:
    """A custom resolver that only allows 'admin' users to use 'gpt-4'."""
    @staticmethod
    async def resolve_user_config(user_identity: Any, gateway_context: Dict[str, Any], base_config: Dict[str, Any]) -> Dict[str, Any]:
        if user_identity == "admin":
            return {"role": "admin", "allowed_models": ["gpt-4", "gpt-3.5-turbo"]}
        return {"role": "user", "allowed_models": ["gpt-3.5-turbo"]}

    @staticmethod
    def validate_operation_config(user_config: Dict, operation_spec: Dict, validation_context: Dict) -> Dict:
        model = operation_spec.get("model")
        if model and model not in user_config.get("allowed_models", []):
            return {"valid": False, "reason": f"Model '{model}' not allowed for this user."}
        return {"valid": True}
    
    # Reuse the default implementations for the remaining methods
    # (plain assignment, not inheritance; call them on the class, not an instance)
    is_feature_enabled = ConfigResolver.is_feature_enabled
    filter_available_options = ConfigResolver.filter_available_options

# 2. Define an initialization callback
def setup_custom_logging():
    print("Custom middleware initialization logic is running!")

# 3. Bind the custom components
MiddlewareRegistry.bind_config_resolver(MyCustomConfigResolver)
MiddlewareRegistry.register_initialization_callback(setup_custom_logging)

# 4. Initialize the middleware (e.g., at application startup)
print("--- Initializing Middleware ---")
MiddlewareRegistry.initialize_middleware()
print("--- Initialization Complete ---")

# 5. Use the middleware system
async def check_permissions():
    # The registry will now use MyCustomConfigResolver automatically
    CurrentResolver = MiddlewareRegistry.get_config_resolver()
    print(f"Current resolver is: {CurrentResolver.__name__}")

    # Check an admin user
    admin_config = await CurrentResolver.resolve_user_config("admin", {}, {})
    validation_result = CurrentResolver.validate_operation_config(
        admin_config, {"model": "gpt-4"}, {}
    )
    print(f"Admin validation for gpt-4: {validation_result}")

    # Check a regular user
    user_config = await CurrentResolver.resolve_user_config("user", {}, {})
    validation_result = CurrentResolver.validate_operation_config(
        user_config, {"model": "gpt-4"}, {}
    )
    print(f"User validation for gpt-4: {validation_result}")

# Run the example
asyncio.run(check_permissions())

# 6. Check status and reset (useful for testing)
print(f"\nRegistry Status: {MiddlewareRegistry.get_registry_status()}")
MiddlewareRegistry.reset_bindings()
print(f"Registry Status after reset: {MiddlewareRegistry.get_registry_status()}")
```
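
The binding mechanism can be pictured as state stored on the class itself and manipulated through class methods. The following sketch illustrates that pattern only; `SketchRegistry`, `DefaultResolver`, and `DenyAll` are hypothetical, and whether the real `reset_bindings()` also clears callbacks is an assumption.

```python
from typing import Callable, Dict, List, Optional, Type


class DefaultResolver:
    """Hypothetical permissive default."""

    @staticmethod
    def is_feature_enabled(user_config: Dict, feature_descriptor: Dict, context: Dict) -> bool:
        return True


class SketchRegistry:
    """Class-level registry: state lives on the class, so no instance is needed."""

    _config_resolver: Optional[Type] = None
    _callbacks: List[Callable[[], None]] = []

    @classmethod
    def bind_config_resolver(cls, resolver_class: Type) -> None:
        cls._config_resolver = resolver_class

    @classmethod
    def get_config_resolver(cls) -> Type:
        # Fall back to the default when nothing has been bound
        return cls._config_resolver or DefaultResolver

    @classmethod
    def register_initialization_callback(cls, callback: Callable[[], None]) -> None:
        cls._callbacks.append(callback)

    @classmethod
    def initialize_middleware(cls) -> None:
        for callback in cls._callbacks:
            callback()

    @classmethod
    def reset_bindings(cls) -> None:
        # Assumption: a reset drops both the resolver and the callbacks
        cls._config_resolver = None
        cls._callbacks = []


class DenyAll(DefaultResolver):
    @staticmethod
    def is_feature_enabled(user_config: Dict, feature_descriptor: Dict, context: Dict) -> bool:
        return False


started: List[str] = []
SketchRegistry.register_initialization_callback(lambda: started.append("ready"))
SketchRegistry.bind_config_resolver(DenyAll)
SketchRegistry.initialize_middleware()
bound_name = SketchRegistry.get_config_resolver().__name__
SketchRegistry.reset_bindings()
default_name = SketchRegistry.get_config_resolver().__name__
```

Because the state is global to the process, tests that bind a custom resolver should reset the bindings afterwards so later tests see the default.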

# content_hash: 9f505d5203463aeca42959c92c78fe35212f34e4ea0288b67c1c6a6450117d6c

================================================================================

## Section 16: solace_agent_mesh/common/server/server_llm.txt

**Source file:** `solace_agent_mesh/common/server/server_llm.txt`

# DEVELOPER GUIDE: server

## Quick Summary
The `server` directory provides a complete Agent-to-Agent (A2A) communication server implementation built on Starlette. It implements JSON-RPC 2.0 protocol for handling task-related requests, supports both standard request-response and streaming via Server-Sent Events (SSE), and includes an extensible task management system with push notification capabilities.

## Files Overview
- `__init__.py` - Exposes main public classes for easy import
- `server.py` - Core HTTP server implementation with JSON-RPC request routing
- `task_manager.py` - Abstract task manager interface and in-memory implementation
- `utils.py` - Utility functions for error responses and modality compatibility checks

## Developer API Reference

### __init__.py
**Purpose:** Provides convenient access to the main server components
**Import:** `from solace_agent_mesh.common.server import A2AServer, TaskManager, InMemoryTaskManager`

### server.py
**Purpose:** Main HTTP server that handles A2A communication via JSON-RPC 2.0 protocol
**Import:** `from solace_agent_mesh.common.server import A2AServer`

**Classes:**
- `A2AServer(host: str = "0.0.0.0", port: int = 5000, endpoint: str = "/", agent_card: AgentCard = None, task_manager: TaskManager = None)` - Starlette-based web server for A2A communication
  - `start() -> None` - Starts the uvicorn server (raises ValueError if agent_card or task_manager not set)
  - `host: str` - Server bind address
  - `port: int` - Server port
  - `endpoint: str` - Main API endpoint path
  - `agent_card: AgentCard` - Agent metadata served at `/.well-known/agent.json`
  - `task_manager: TaskManager` - Handler for task operations

**Usage Examples:**
```python
from solace_agent_mesh.common.server import A2AServer, InMemoryTaskManager
from solace_agent_mesh.common.types import AgentCard, SendTaskRequest, SendTaskResponse

# Define agent metadata (fields follow the AgentCard example in the `common` guide)
agent_card = AgentCard(
    name="my-agent",
    display_name="My Agent",
    description="A sample agent",
    url="http://127.0.0.1:8080/",
    version="1.0.0",
    capabilities={"streaming": True, "pushNotifications": False},
    skills=[],
    peer_agents={}
)

# Create custom task manager
class MyTaskManager(InMemoryTaskManager):
    async def on_send_task(self, request: SendTaskRequest) -> SendTaskResponse:
        task = await self.upsert_task(request.params)
        # Process task here
        return SendTaskResponse(id=request.id, result=task)
    
    async def on_send_task_subscribe(self, request):
        # Handle streaming tasks
        pass

# Start server
server = A2AServer(
    host="127.0.0.1",
    port=8080,
    agent_card=agent_card,
    task_manager=MyTaskManager()
)
server.start()
```
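
Since the server speaks JSON-RPC 2.0, a client request is a JSON object with `jsonrpc`, `id`, `method`, and `params` members. The sketch below builds such an envelope with the standard library only; the `tasks/send` method name and the shape of `params` are assumptions for illustration, and `build_jsonrpc_request` is a hypothetical helper.

```python
import json
import uuid
from typing import Any, Dict


def build_jsonrpc_request(method: str, params: Dict[str, Any]) -> str:
    """Serialize a JSON-RPC 2.0 request with a fresh string id."""
    return json.dumps(
        {"jsonrpc": "2.0", "id": str(uuid.uuid4()), "method": method, "params": params}
    )


# "tasks/send" and the params layout are assumptions, not confirmed by this guide
raw = build_jsonrpc_request(
    "tasks/send",
    {"id": "task-1", "message": {"role": "user", "parts": [{"type": "text", "text": "Hi"}]}},
)
decoded = json.loads(raw)
```

The `id` lets the caller match the eventual response to this request, which is what `SendTaskResponse(id=request.id, ...)` echoes back in the example above.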

### task_manager.py
**Purpose:** Defines task management interface and provides in-memory implementation
**Import:** `from solace_agent_mesh.common.server import TaskManager, InMemoryTaskManager`

**Classes:**
- `TaskManager()` - Abstract base class defining task operation interface
  - `on_get_task(request: GetTaskRequest) -> GetTaskResponse` - Retrieve task status and details
  - `on_cancel_task(request: CancelTaskRequest) -> CancelTaskResponse` - Cancel an ongoing task
  - `on_send_task(request: SendTaskRequest) -> SendTaskResponse` - Handle standard task submission
  - `on_send_task_subscribe(request: SendTaskStreamingRequest) -> Union[AsyncIterable[SendTaskStreamingResponse], JSONRPCResponse]` - Handle streaming task submission
  - `on_set_task_push_notification(request: SetTaskPushNotificationRequest) -> SetTaskPushNotificationResponse` - Configure push notifications
  - `on_get_task_push_notification(request: GetTaskPushNotificationRequest) -> GetTaskPushNotificationResponse` - Get push notification config
  - `on_resubscribe_to_task(request: TaskResubscriptionRequest) -> Union[AsyncIterable[SendTaskResponse], JSONRPCResponse]` - Resubscribe to streaming task

- `InMemoryTaskManager()` - Concrete implementation with in-memory storage and SSE support
  - `upsert_task(task_send_params: TaskSendParams) -> Task` - Create or update task with new message
  - `update_store(task_id: str, status: TaskStatus, artifacts: list[Artifact]) -> Task` - Update task status and artifacts
  - `set_push_notification_info(task_id: str, notification_config: PushNotificationConfig) -> None` - Store push notification config
  - `get_push_notification_info(task_id: str) -> PushNotificationConfig` - Retrieve push notification config
  - `has_push_notification_info(task_id: str) -> bool` - Check if push notification config exists
  - `setup_sse_consumer(task_id: str, is_resubscribe: bool = False) -> asyncio.Queue` - Create SSE subscriber queue
  - `enqueue_events_for_sse(task_id: str, task_update_event: Any) -> None` - Send event to all SSE subscribers
  - `dequeue_events_for_sse(request_id: str, task_id: str, sse_event_queue: asyncio.Queue) -> AsyncIterable[SendTaskStreamingResponse]` - Async generator for SSE events

**Usage Examples:**
```python
import asyncio
from solace_agent_mesh.common.server import InMemoryTaskManager
from solace_agent_mesh.common.types import (
    Message, SendTaskRequest, SendTaskResponse, SendTaskStreamingRequest,
    TaskStatus, TaskState, TaskStatusUpdateEvent, TextPart
)

class CustomTaskManager(InMemoryTaskManager):
    async def on_send_task(self, request: SendTaskRequest) -> SendTaskResponse:
        # Create/update task
        task = await self.upsert_task(request.params)

        # Process task: a Message holds a list of parts, so collect the text parts
        text = " ".join(
            part.text for part in request.params.message.parts
            if isinstance(part, TextPart)
        )
        reply = Message(role="agent", parts=[TextPart(text=f"Processed: {text}")])

        # Update status and return the updated task
        status = TaskStatus(state=TaskState.COMPLETED, message=reply)
        task = await self.update_store(task.id, status, [])

        return SendTaskResponse(id=request.id, result=task)

    async def on_send_task_subscribe(self, request: SendTaskStreamingRequest):
        await self.upsert_task(request.params)
        sse_queue = await self.setup_sse_consumer(request.params.id)

        # Start background processing
        asyncio.create_task(self._process_streaming(request.params.id))

        return self.dequeue_events_for_sse(request.id, request.params.id, sse_queue)

    async def _process_streaming(self, task_id: str):
        for i in range(3):
            await asyncio.sleep(1)
            # Progress text travels inside TaskStatus.message; the event carries the task ID
            event = TaskStatusUpdateEvent(
                id=task_id,
                status=TaskStatus(
                    state=TaskState.WORKING,
                    message=Message(role="agent", parts=[TextPart(text=f"Step {i+1}")])
                ),
                final=False
            )
            await self.enqueue_events_for_sse(task_id, event)

        # Final event
        final_event = TaskStatusUpdateEvent(
            id=task_id,
            status=TaskStatus(state=TaskState.COMPLETED),
            final=True
        )
        await self.enqueue_events_for_sse(task_id, final_event)
```
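
The three SSE helpers can be understood as a fan-out over per-task `asyncio.Queue` objects: each subscriber gets its own queue, every event is copied to all queues for that task, and a consumer stops once it sees a final event. The sketch below illustrates that idea in isolation; `SseFanout` is hypothetical, uses plain dictionaries as events, and is not the `InMemoryTaskManager` source.

```python
import asyncio
from typing import Any, AsyncIterator, Dict, List


class SseFanout:
    """Hypothetical per-task subscriber queues, mirroring the method names above."""

    def __init__(self) -> None:
        self._lock = asyncio.Lock()
        self._subscribers: Dict[str, List[asyncio.Queue]] = {}

    async def setup_sse_consumer(self, task_id: str) -> asyncio.Queue:
        queue: asyncio.Queue = asyncio.Queue()
        async with self._lock:
            self._subscribers.setdefault(task_id, []).append(queue)
        return queue

    async def enqueue_events_for_sse(self, task_id: str, event: Any) -> None:
        # Copy the event to every subscriber of this task
        async with self._lock:
            for queue in self._subscribers.get(task_id, []):
                await queue.put(event)

    async def dequeue_events_for_sse(
        self, task_id: str, queue: asyncio.Queue
    ) -> AsyncIterator[Any]:
        try:
            while True:
                event = await queue.get()
                yield event
                if event.get("final"):
                    break
        finally:
            # Unsubscribe once the stream ends or the consumer goes away
            async with self._lock:
                self._subscribers[task_id].remove(queue)


async def demo() -> List[dict]:
    fanout = SseFanout()
    queue = await fanout.setup_sse_consumer("task-1")
    await fanout.enqueue_events_for_sse("task-1", {"state": "working", "final": False})
    await fanout.enqueue_events_for_sse("task-1", {"state": "completed", "final": True})
    return [event async for event in fanout.dequeue_events_for_sse("task-1", queue)]


events = asyncio.run(demo())
```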

### utils.py
**Purpose:** Utility functions for error handling and compatibility checks
**Import:** `from solace_agent_mesh.common.server.utils import are_modalities_compatible, new_incompatible_types_error, new_not_implemented_error`

**Functions:**
- `are_modalities_compatible(server_output_modes: List[str], client_output_modes: List[str]) -> bool` - Check whether the server and client output modality lists share at least one element
- `new_incompatible_types_error(request_id) -> JSONRPCResponse` - Create content type not supported error response
- `new_not_implemented_error(request_id) -> JSONRPCResponse` - Create unsupported operation error response

**Usage Examples:**
```python
from solace_agent_mesh.common.server.utils import are_modalities_compatible, new_not_implemented_error

# Check modality compatibility
server_modes = ["text/plain", "application/json"]
client_modes = ["text/plain"]
compatible = are_modalities_compatible(server_modes, client_modes)  # True

# Create error responses
error_response = new_not_implemented_error("request-123")
```
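
The compatibility check described above amounts to a set-overlap test. The following is a minimal sketch of that idea, not the library's implementation: the function name `modalities_overlap` is hypothetical, and treating a missing or empty list as "no restriction" is an assumption that the real helper may not share.

```python
from typing import List, Optional


def modalities_overlap(
    server_output_modes: Optional[List[str]],
    client_output_modes: Optional[List[str]],
) -> bool:
    """Return True when the two lists share at least one modality.

    Assumption: a missing or empty list means "no restriction" and is
    therefore compatible with anything. The real helper may differ.
    """
    if not server_output_modes or not client_output_modes:
        return True
    # isdisjoint() is True when there is no common element, so negate it.
    return not set(server_output_modes).isdisjoint(client_output_modes)


print(modalities_overlap(["text/plain", "application/json"], ["text/plain"]))
print(modalities_overlap(["text/plain"], ["image/png"]))
```

A set-based test keeps the check independent of list order and duplicate entries.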

# content_hash: 0c17c4e0887c76ab8302a54f91fb714956b040c181af103198193442fe18697f

================================================================================

## Section 17: solace_agent_mesh/common/services/providers/providers_llm.txt

**Source file:** `solace_agent_mesh/common/services/providers/providers_llm.txt`

## Quick Summary
This directory contains concrete implementations (providers) for the abstract services defined in the parent `services` package. These providers offer specific ways to fulfill service contracts, such as sourcing user identity information from a local file.

## Files Overview
- `__init__.py` - Package initialization file marking the directory as a Python package
- `local_file_identity_service.py` - File-based identity service implementation that reads user data from local JSON files

## Developer API Reference

### __init__.py
**Purpose:** Initializes the providers package
**Import:** `from solace_agent_mesh.common.services import providers`

This file contains no public classes or functions; it only marks the directory as a Python package.

### local_file_identity_service.py
**Purpose:** Provides a file-based identity service that reads user profiles from a local JSON file, ideal for development, testing, or small-scale deployments
**Import:** `from solace_agent_mesh.common.services.providers.local_file_identity_service import LocalFileIdentityService`

**Classes:**
- `LocalFileIdentityService(config: Dict[str, Any])` - Identity service that sources user data from a local JSON file
  - `async get_user_profile(auth_claims: Dict[str, Any]) -> Optional[Dict[str, Any]]` - Looks up a user profile using the lookup key from auth claims
  - `async search_users(query: str, limit: int = 10) -> List[Dict[str, Any]]` - Performs case-insensitive search on user names and emails
  - `file_path: str` - Path to the JSON file containing user data
  - `lookup_key: str` - Key used to identify users (defaults to "id")
  - `all_users: List[Dict[str, Any]]` - Complete list of user profiles loaded from file
  - `user_index: Dict[str, Dict[str, Any]]` - In-memory index mapping lookup keys to user profiles

**Usage Examples:**
```python
import asyncio
import json
from solace_agent_mesh.common.services.providers.local_file_identity_service import LocalFileIdentityService

# Create sample users.json file
users_data = [
    {
        "id": "jdoe",
        "email": "jane.doe@example.com", 
        "name": "Jane Doe",
        "title": "Senior Engineer",
        "manager_id": "ssmith"
    },
    {
        "id": "ssmith",
        "email": "sam.smith@example.com",
        "name": "Sam Smith", 
        "title": "Engineering Manager"
    }
]

with open("users.json", "w") as f:
    json.dump(users_data, f)

async def main():
    # Initialize the service
    config = {
        "file_path": "users.json",
        "lookup_key": "id"  # Optional, defaults to "id"
    }
    
    identity_service = LocalFileIdentityService(config)
    
    # Get user profile by ID
    auth_claims = {"id": "jdoe"}
    profile = await identity_service.get_user_profile(auth_claims)
    print(f"User profile: {profile}")
    
    # Search for users
    results = await identity_service.search_users("jane", limit=5)
    print(f"Search results: {results}")
    
    # Handle missing user
    missing = await identity_service.get_user_profile({"id": "nonexistent"})
    print(f"Missing user: {missing}")  # Returns None

asyncio.run(main())
```
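
To make the roles of `all_users`, `user_index`, and `lookup_key` concrete, here is a rough sketch of how an in-memory index and a case-insensitive search could fit together. The class `InMemoryUserDirectory` is hypothetical and synchronous for brevity; it illustrates the pattern only and is not the code of `LocalFileIdentityService`.

```python
from typing import Any, Dict, List, Optional


class InMemoryUserDirectory:
    """Hypothetical stand-in showing an index plus a substring search."""

    def __init__(self, users: List[Dict[str, Any]], lookup_key: str = "id"):
        self.lookup_key = lookup_key
        self.all_users = list(users)
        # Build the index once so profile lookups avoid scanning the list.
        self.user_index: Dict[str, Dict[str, Any]] = {
            str(user[lookup_key]): user
            for user in self.all_users
            if lookup_key in user
        }

    def get_user_profile(self, auth_claims: Dict[str, Any]) -> Optional[Dict[str, Any]]:
        key = auth_claims.get(self.lookup_key)
        if key is None:
            return None
        return self.user_index.get(str(key))

    def search_users(self, query: str, limit: int = 10) -> List[Dict[str, Any]]:
        needle = query.strip().lower()
        if not needle:
            return []
        # Case-insensitive substring match on the name and email fields.
        hits = [
            user
            for user in self.all_users
            if needle in str(user.get("name", "")).lower()
            or needle in str(user.get("email", "")).lower()
        ]
        return hits[:limit]


directory = InMemoryUserDirectory(
    [
        {"id": "jdoe", "email": "jane.doe@example.com", "name": "Jane Doe"},
        {"id": "ssmith", "email": "sam.smith@example.com", "name": "Sam Smith"},
    ]
)
print(directory.get_user_profile({"id": "jdoe"}))
print(directory.search_users("SMITH"))
```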

# content_hash: 3661ffe07466b1391797c2feb90e2c97544b1a9c6c8cd15fc448cf95f4be6015

================================================================================

## Section 18: solace_agent_mesh/common/services/services_llm.txt

**Source file:** `solace_agent_mesh/common/services/services_llm.txt`

# DEVELOPER GUIDE: services

## Quick Summary
The `services` directory provides a modular and extensible framework for integrating external data sources related to identity and employee information into the Solace AI Connector. It is built on a provider pattern, defining abstract base classes (`BaseIdentityService`, `BaseEmployeeService`) that establish a clear contract for what data and functionality a service must provide.

The core architecture revolves around factory functions (`create_identity_service`, `create_employee_service`) that instantiate specific service providers based on a configuration dictionary. This allows the application to remain decoupled from the concrete implementations. Providers can be either built-in (like the file-based identity service located in the `providers/` subdirectory) or dynamically loaded as external plugins, making the system highly flexible and easy to extend.

## Files and Subdirectories Overview
- **Direct files:**
  - `__init__.py`: Marks the directory as a Python package with shared, reusable services
  - `employee_service.py`: Defines the abstract contract and factory for employee data services
  - `identity_service.py`: Defines the abstract contract and factory for user identity services
- **Subdirectories:**
  - `providers/`: Contains concrete implementations of the service contracts, including a file-based identity provider

## Developer API Reference

### Direct Files

#### employee_service.py
**Purpose:** Defines the abstract base class (`BaseEmployeeService`) that all employee service providers must implement, and a factory function (`create_employee_service`) to instantiate them. It enforces a canonical schema for employee data to ensure consistency across different providers.
**Import:** `from solace_agent_mesh.common.services.employee_service import BaseEmployeeService, create_employee_service`

**Classes/Functions/Constants:**
- **`class BaseEmployeeService(ABC)`**: The abstract base class for employee service providers.
    - **`__init__(self, config: Dict[str, Any])`**: Initializes the service, setting up configuration and an optional in-memory cache.
    - **`async def get_employee_dataframe(self) -> pd.DataFrame`**: (Abstract) Returns the entire employee directory as a pandas DataFrame.
    - **`async def get_employee_profile(self, employee_id: str) -> Optional[Dict[str, Any]]`**: (Abstract) Fetches the profile for a single employee, conforming to the canonical schema.
    - **`async def get_time_off_data(self, employee_id: str) -> List[Dict[str, Any]]`**: (Abstract) Retrieves a list of time-off entries for an employee.
    - **`async def get_employee_profile_picture(self, employee_id: str) -> Optional[str]`**: (Abstract) Fetches an employee's profile picture as a data URI string.
- **`def create_employee_service(config: Optional[Dict[str, Any]]) -> Optional[BaseEmployeeService]`**: A factory function that dynamically loads and instantiates an employee service provider based on the `type` specified in the configuration. It primarily uses Python's entry points to find and load external plugins.
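
The configuration examples in this guide pass a `cache_ttl_seconds` value, which suggests time-based expiry for the optional in-memory cache. The sketch below shows one common way such a cache can work. `TTLCache` and its injectable `clock` parameter are hypothetical names introduced for illustration; the base class's actual caching behavior is not documented here.

```python
import time
from typing import Any, Callable, Dict, Tuple


class TTLCache:
    """Hypothetical time-to-live cache (single-threaded sketch)."""

    def __init__(self, ttl_seconds: float, clock: Callable[[], float] = time.monotonic):
        self._ttl = ttl_seconds
        self._clock = clock  # injectable so expiry can be tested without sleeping
        self._entries: Dict[str, Tuple[float, Any]] = {}

    def set(self, key: str, value: Any) -> None:
        # Store the absolute expiry time next to the value.
        self._entries[key] = (self._clock() + self._ttl, value)

    def get(self, key: str, default: Any = None) -> Any:
        entry = self._entries.get(key)
        if entry is None:
            return default
        expires_at, value = entry
        if self._clock() >= expires_at:
            # Drop the stale entry lazily, on first access after expiry.
            del self._entries[key]
            return default
        return value


cache = TTLCache(ttl_seconds=3600)
cache.set("profile:jdoe", {"displayName": "Jane Doe"})
print(cache.get("profile:jdoe"))
```

Using a monotonic clock avoids surprises when the system wall clock is adjusted.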

#### identity_service.py
**Purpose:** Defines the abstract base class (`BaseIdentityService`) for identity providers and a factory function (`create_identity_service`) to create instances of them. This service is used for user lookups and profile enrichment.
**Import:** `from solace_agent_mesh.common.services.identity_service import BaseIdentityService, create_identity_service`

**Classes/Functions/Constants:**
- **`class BaseIdentityService(ABC)`**: The abstract base class for identity service providers.
    - **`__init__(self, config: Dict[str, Any])`**: Initializes the service, setting up configuration and an optional in-memory cache.
    - **`async def get_user_profile(self, auth_claims: Dict[str, Any]) -> Optional[Dict[str, Any]]`**: (Abstract) Fetches additional profile details for an authenticated user based on claims.
    - **`async def search_users(self, query: str, limit: int = 10) -> List[Dict[str, Any]]`**: (Abstract) Searches for users based on a query string (e.g., for autocomplete).
- **`def create_identity_service(config: Optional[Dict[str, Any]]) -> Optional[BaseIdentityService]`**: A factory function that instantiates an identity service provider. It has special handling for the built-in `local_file` provider and uses Python entry points for all other provider types.
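
The factory behavior described above (a built-in provider handled specially, everything else resolved through Python entry points) can be sketched roughly as follows. This is an illustration under stated assumptions, not the library's code: the group name `example.identity_providers`, the `FileBackedProvider` placeholder, and the choice to raise `ValueError` for an unknown type are all hypothetical.

```python
from importlib.metadata import entry_points
from typing import Any, Callable, Dict, Optional

# Hypothetical entry-point group name, chosen for illustration only.
PLUGIN_GROUP = "example.identity_providers"


class FileBackedProvider:
    """Placeholder standing in for a built-in provider."""

    def __init__(self, config: Dict[str, Any]):
        self.config = config


BUILT_IN_PROVIDERS: Dict[str, Callable[[Dict[str, Any]], Any]] = {
    "local_file": FileBackedProvider,
}


def _find_plugin(name: str) -> Optional[Callable[[Dict[str, Any]], Any]]:
    """Look up a provider class registered under PLUGIN_GROUP."""
    eps = entry_points()
    if hasattr(eps, "select"):  # Python 3.10 and later
        matches = eps.select(group=PLUGIN_GROUP, name=name)
    else:  # Python 3.8/3.9 return a mapping of group name to entry points
        matches = [ep for ep in eps.get(PLUGIN_GROUP, []) if ep.name == name]
    for ep in matches:
        return ep.load()
    return None


def create_service(config: Optional[Dict[str, Any]]) -> Optional[Any]:
    """Instantiate a provider from config, or return None when unconfigured."""
    if not config:
        return None
    provider_type = config.get("type")
    if not provider_type:
        raise ValueError("Service config must include a 'type' key")
    factory = BUILT_IN_PROVIDERS.get(provider_type) or _find_plugin(provider_type)
    if factory is None:
        raise ValueError(f"No provider registered for type '{provider_type}'")
    return factory(config)


service = create_service({"type": "local_file", "file_path": "users.json"})
print(type(service).__name__)
```

Keeping built-ins in a plain dictionary means the common case works without any installed plugins, while entry points leave room for third-party packages.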

### Subdirectory APIs

#### providers/
**Purpose:** This subdirectory contains concrete implementations of the abstract service classes. It ships with a built-in provider for the `IdentityService` that is useful for development and testing.
**Key Exports:** `LocalFileIdentityService`
**Import Examples:**
```python
# Typically, you would use the factory function.
# But for direct instantiation (e.g., in tests), you can do this:
from solace_agent_mesh.common.services.providers.local_file_identity_service import LocalFileIdentityService
```

## Complete Usage Guide

### 1. Using Service Factories (Recommended Approach)
The factories are the primary way to create and use services. They abstract away the specific implementation details and handle plugin loading.

**Example: Creating Identity and Employee Services**

```python
import asyncio
from solace_agent_mesh.common.services.identity_service import create_identity_service
from solace_agent_mesh.common.services.employee_service import create_employee_service

async def main():
    # --- Identity Service Example (using built-in provider) ---
    identity_config = {
        "type": "local_file",
        "file_path": "path/to/your/users.json",
        "lookup_key": "email",  # Key to use for lookups from auth_claims
        "cache_ttl_seconds": 3600
    }
    identity_service = create_identity_service(identity_config)

    if identity_service:
        print("Identity Service created.")
        # Fetch a user profile
        auth_claims = {"email": "jane.doe@example.com"}
        user_profile = await identity_service.get_user_profile(auth_claims)
        print(f"User Profile: {user_profile}")

        # Search for users
        search_results = await identity_service.search_users("Jane")
        print(f"Search Results: {search_results}")

    # --- Employee Service Example (using external plugin) ---
    # The 'type' must match the name of a registered plugin entry point
    employee_config = {
        "type": "bamboohr_plugin",
        "api_key": "your-secret-api-key",
        "subdomain": "your-company",
        "cache_ttl_seconds": 7200
    }
    employee_service = create_employee_service(employee_config)

    if employee_service:
        print("\nEmployee Service created.")
        # Get a detailed employee profile
        employee_profile = await employee_service.get_employee_profile("jane.doe@example.com")
        print(f"Employee Profile: {employee_profile}")

        # Get time off data
        time_off = await employee_service.get_time_off_data("jane.doe@example.com")
        print(f"Time Off Data: {time_off}")

        # Get employee directory as DataFrame
        df = await employee_service.get_employee_dataframe()
        print(f"Employee Directory Shape: {df.shape}")

# Run the example
asyncio.run(main())
```

### 2. Direct Provider Instantiation
While factories are preferred, you can instantiate providers from the `providers/` directory directly. This is useful for testing or when you know you will always use a specific built-in provider.

**Example: Direct Use of LocalFileIdentityService**

```python
import asyncio
import json
from solace_agent_mesh.common.services.providers.local_file_identity_service import LocalFileIdentityService

async def main():
    # First, create a sample users.json file
    users_data = [
        {
            "id": "jdoe",
            "email": "jane.doe@example.com", 
            "name": "Jane Doe",
            "title": "Senior Engineer",
            "manager_id": "ssmith"
        },
        {
            "id": "ssmith",
            "email": "sam.smith@example.com",
            "name": "Sam Smith", 
            "title": "Engineering Manager"
        }
    ]

    with open("users.json", "w") as f:
        json.dump(users_data, f)

    # Configuration does not need a 'type' key for direct instantiation
    config = {
        "file_path": "users.json",
        "lookup_key": "id",
        "cache_ttl_seconds": 1800
    }

    # Instantiate the class directly
    local_service = LocalFileIdentityService(config)
    print("LocalFileIdentityService created directly")

    # Get user profile by ID
    auth_claims = {"id": "jdoe"}
    profile = await local_service.get_user_profile(auth_claims)
    print(f"User profile: {profile}")
    
    # Search for users
    results = await local_service.search_users("jane", limit=5)
    print(f"Search results: {results}")

asyncio.run(main())
```

### 3. Creating Custom Service Providers
To create your own service provider, inherit from the appropriate base class and implement all abstract methods.

**Example: Custom Employee Service Provider**

```python
import pandas as pd
from typing import Any, Dict, List, Optional
from solace_agent_mesh.common.services.employee_service import BaseEmployeeService

class CustomEmployeeService(BaseEmployeeService):
    """Custom employee service that connects to your HR system."""
    
    def __init__(self, config: Dict[str, Any]):
        super().__init__(config)
        self.api_endpoint = config.get("api_endpoint")
        self.api_key = config.get("api_key")
    
    async def get_employee_dataframe(self) -> pd.DataFrame:
        """Fetch all employees and return as DataFrame."""
        # Your implementation here
        # This should return a DataFrame with canonical schema columns:
        # id, displayName, workEmail, jobTitle, department, location, supervisorId, hireDate, mobilePhone
        employees_data = [
            {
                "id": "jdoe@company.com",
                "displayName": "Jane Doe",
                "workEmail": "jdoe@company.com",
                "jobTitle": "Software Engineer",
                "department": "Engineering",
                "location": "San Francisco",
                "supervisorId": "manager@company.com",
                "hireDate": "2023-01-15",
                "mobilePhone": "+1-555-0123"
            }
        ]
        return pd.DataFrame(employees_data)
    
    async def get_employee_profile(self, employee_id: str) -> Optional[Dict[str, Any]]:
        """Get single employee profile."""
        # Your implementation here
        return {
            "id": employee_id,
            "displayName": "Jane Doe",
            "workEmail": employee_id,
            "jobTitle": "Software Engineer"
        }
    
    async def get_time_off_data(self, employee_id: str) -> List[Dict[str, Any]]:
        """Get employee time off data."""
        # Your implementation here
        return [
            {
                'start': '2025-07-04',
                'end': '2025-07-04',
                'type': 'Holiday',
                'amount': 'full_day'
            }
        ]
    
    async def get_employee_profile_picture(self, employee_id: str) -> Optional[str]:
        """Get employee profile picture as data URI."""
        # Your implementation here
        return None  # or return "data:image/jpeg;base64,..."

# Usage
import asyncio

async def use_custom_service():
    config = {
        "api_endpoint": "https://your-hr-api.com",
        "api_key": "your-api-key",
        "cache_ttl_seconds": 3600
    }
    
    service = CustomEmployeeService(config)
    profile = await service.get_employee_profile("jdoe@company.com")
    print(f"Custom service profile: {profile}")

asyncio.run(use_custom_service())
```

### 4. Working with Both Services Together
Often you'll want to use both identity and employee services together for comprehensive user information.

**Example: Combined Service Usage**

```python
import asyncio
from solace_agent_mesh.common.services.identity_service import create_identity_service
from solace_agent_mesh.common.services.employee_service import create_employee_service

async def get_complete_user_info(user_email: str):
    """Get comprehensive user information from both services."""
    
    # Configure services
    identity_config = {
        "type": "local_file",
        "file_path": "users.json",
        "lookup_key": "email"
    }
    
    employee_config = {
        "type": "your_hr_plugin",
        "api_key": "your-key"
    }
    
    # Create services
    identity_service = create_identity_service(identity_config)
    employee_service = create_employee_service(employee_config)
    
    # Gather information
    user_info = {}
    
    if identity_service:
        auth_claims = {"email": user_email}
        identity_profile = await identity_service.get_user_profile(auth_claims)
        if identity_profile:
            user_info.update(identity_profile)
    
    if employee_service:
        employee_profile = await employee_service.get_employee_profile(user_email)
        if employee_profile:
            user_info.update(employee_profile)
            
        # Get additional employee data
        time_off = await employee_service.get_time_off_data(user_email)
        user_info["time_off"] = time_off
        
        profile_pic = await employee_service.get_employee_profile_picture(user_email)
        if profile_pic:
            user_info["profile_picture"] = profile_pic
    
    return user_info

# Usage
async def main():
    complete_info = await get_complete_user_info("jane.doe@example.com")
    print(f"Complete user information: {complete_info}")

asyncio.run(main())
```

Together, these patterns show how the services framework integrates multiple data sources behind consistent interfaces, using either built-in providers or external plugins.

# content_hash: 63457e2c72256810f713e62325c1601a675951fe47c1121a647c404690179206

================================================================================

## Section 19: solace_agent_mesh/common/utils/embeds/embeds_llm.txt

**Source file:** `solace_agent_mesh/common/utils/embeds/embeds_llm.txt`

# DEVELOPER GUIDE: embeds

## Quick Summary
The `embeds` directory provides a comprehensive system for finding, parsing, and resolving embedded expressions within strings. These expressions use `«...»` syntax and can represent dynamic values like mathematical calculations, datetimes, UUIDs, or content from stored artifacts. The system supports multi-step data transformation pipelines, recursive embed resolution, and includes safety features like depth and size limits. It's designed as a core component for dynamic content generation and data processing in agent workflows.

## Files Overview
- `__init__.py` - Main public entry point exporting key functions and constants
- `constants.py` - Defines embed syntax (delimiters, separators), regex patterns, and type classifications
- `converter.py` - Data format conversion and serialization functions
- `evaluators.py` - Specific evaluation logic for simple embed types (math, datetime, uuid, etc.)
- `modifiers.py` - Data transformation functions that can be chained together (jsonpath, slice, grep, etc.)
- `resolver.py` - Core orchestration engine handling embed resolution, modifier chains, and recursion
- `types.py` - DataFormat enum for tracking data types during transformations

## Developer API Reference

### __init__.py
**Purpose:** Main public entry point that exports the most commonly used functions and constants from other modules.

**Import:** `from solace_agent_mesh.common.utils.embeds import resolve_embeds_recursively_in_string, evaluate_embed, EMBED_REGEX`

**Functions:**
- `evaluate_embed(embed_type: str, expression: str, format_spec: Optional[str], context: Dict[str, Any], log_identifier: str, config: Optional[Dict] = None, current_depth: int = 0, visited_artifacts: Optional[Set[Tuple[str, int]]] = None) -> Union[Tuple[str, Optional[str], int], Tuple[None, str, Any]]` - Evaluates a single parsed embed expression
- `resolve_embeds_in_string(text: str, context: Any, resolver_func: Callable, types_to_resolve: Set[str], log_identifier: str = "[EmbedUtil]", config: Optional[Dict[str, Any]] = None) -> Tuple[str, int, List[Tuple[int, Any]]]` - Resolves embeds in a string for a single pass (non-recursive)
- `resolve_embeds_recursively_in_string(text: str, context: Any, resolver_func: Callable, types_to_resolve: Set[str], log_identifier: str, config: Optional[Dict], max_depth: int, current_depth: int = 0, visited_artifacts: Optional[Set[Tuple[str, int]]] = None, accumulated_size: int = 0, max_total_size: int = -1) -> str` - Recursively resolves all embeds in a string with depth and size limits

**Constants/Variables:**
- `EMBED_DELIMITER_OPEN: str` - Opening delimiter (`«`)
- `EMBED_DELIMITER_CLOSE: str` - Closing delimiter (`»`)
- `EMBED_TYPE_SEPARATOR: str` - Type/expression separator (`:`)
- `EMBED_FORMAT_SEPARATOR: str` - Format specifier separator (`|`)
- `EMBED_CHAIN_DELIMITER: str` - Modifier chain separator (`>>>`)
- `EMBED_REGEX: re.Pattern` - Compiled regex for finding embeds
- `EARLY_EMBED_TYPES: Set[str]` - Types resolved in initial pass
- `LATE_EMBED_TYPES: Set[str]` - Types resolved in subsequent pass

**Usage Examples:**
```python
from solace_agent_mesh.common.utils.embeds import resolve_embeds_recursively_in_string, evaluate_embed, EMBED_REGEX

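# Basic embed resolution
# `my_artifact_service` is a placeholder for your own artifact service instance.
# The `await` below must run inside a coroutine (for example, under asyncio.run()).
context = {
    "artifact_service": my_artifact_service,
    "session_context": {"app_name": "myapp", "user_id": "user123", "session_id": "sess456"}
}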

text = "The result is «math:10 * 1.15 | .2f» and ID is «uuid:new»"
resolved = await resolve_embeds_recursively_in_string(
    text=text,
    context=context,
    resolver_func=evaluate_embed,
    types_to_resolve={"math", "uuid"},
    log_identifier="[MyApp]",
    config={},
    max_depth=5
)
```
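
The depth limit on recursive resolution can be illustrated with a much smaller, synchronous model. The sketch below is an assumption-laden simplification: `SIMPLE_EMBED` is not the library's `EMBED_REGEX`, the `var` embed type is invented for the example, and size limits and artifact tracking are omitted.

```python
import re
from typing import Callable, Dict

# Simplified illustrative pattern for «type:expression»; not the library's EMBED_REGEX.
SIMPLE_EMBED = re.compile(r"«(\w+):([^«»]*)»")


def resolve_recursively(
    text: str,
    resolvers: Dict[str, Callable[[str], str]],
    max_depth: int,
    current_depth: int = 0,
) -> str:
    """Resolve embeds pass by pass until nothing changes or max_depth is hit."""
    if current_depth >= max_depth:
        return text  # leave any remaining embeds untouched

    def substitute(match: "re.Match[str]") -> str:
        resolver = resolvers.get(match.group(1))
        if resolver is None:
            return match.group(0)  # unknown type: keep the embed as written
        return resolver(match.group(2))

    resolved = SIMPLE_EMBED.sub(substitute, text)
    if resolved == text:
        return resolved  # fixed point reached, nothing left to resolve
    return resolve_recursively(resolved, resolvers, max_depth, current_depth + 1)


# Hypothetical "var" embed type whose values may themselves contain embeds.
VALUES = {"greeting": "Hello «var:name»", "name": "Ada"}
RESOLVERS = {"var": lambda name: VALUES.get(name, "")}

print(resolve_recursively("«var:greeting»!", RESOLVERS, max_depth=5))
print(resolve_recursively("«var:greeting»!", RESOLVERS, max_depth=1))
```

With `max_depth=1` only the outer embed is expanded, which is how a depth limit stops runaway or cyclic expansion.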

### constants.py
**Purpose:** Defines all static constants governing embed syntax and classification.

**Import:** `from solace_agent_mesh.common.utils.embeds.constants import EMBED_REGEX, EARLY_EMBED_TYPES`

**Constants/Variables:**
- `EMBED_DELIMITER_OPEN: str` - Opening delimiter (`«`)
- `EMBED_DELIMITER_CLOSE: str` - Closing delimiter (`»`)
- `EMBED_TYPE_SEPARATOR: str` - Type/expression separator (`:`)
- `EMBED_FORMAT_SEPARATOR: str` - Format specifier separator (`|`)
- `EMBED_CHAIN_DELIMITER: str` - Modifier chain separator (`>>>`)
- `EMBED_REGEX: re.Pattern` - Compiled regex with capture groups for type, expression, and format
- `EARLY_EMBED_TYPES: Set[str]` - Simple embed types resolved first (`math`, `datetime`, `uuid`, `artifact_meta`, `status_update`)
- `LATE_EMBED_TYPES: Set[str]` - Complex embed types resolved later (`artifact_content`)
- `TEXT_CONTAINER_MIME_TYPES: Set[str]` - MIME types considered text-based

**Usage Examples:**
```python
from solace_agent_mesh.common.utils.embeds.constants import EMBED_REGEX

text = "Price: «math:10 * 1.15 | .2f» ID: «uuid:new»"
for match in EMBED_REGEX.finditer(text):
    embed_type = match.group(1)      # "math" or "uuid"
    expression = match.group(2)      # "10 * 1.15 " or "new"
    format_spec = match.group(3)     # " .2f" or None
    print(f"Type: {embed_type}, Expr: '{expression}', Format: '{format_spec}'")
```
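
To show how the delimiter and separator constants relate to a matching pattern, the sketch below builds a regex from the documented delimiter values. The pattern itself is an assumption written for illustration and may differ from the real `EMBED_REGEX`; `parse_embeds` is a hypothetical helper.

```python
import re
from typing import List, Optional, Tuple

# Values mirror the documented constants; the pattern built from them is illustrative.
OPEN, CLOSE = "«", "»"
TYPE_SEP, FORMAT_SEP = ":", "|"

ILLUSTRATIVE_EMBED = re.compile(
    re.escape(OPEN)
    + r"(\w+)"                                      # group 1: embed type
    + re.escape(TYPE_SEP)
    + r"(.*?)"                                      # group 2: expression (lazy)
    + r"(?:" + re.escape(FORMAT_SEP) + r"(.*?))?"   # group 3: optional format
    + re.escape(CLOSE)
)


def parse_embeds(text: str) -> List[Tuple[str, str, Optional[str]]]:
    """Return (type, expression, format) triples with whitespace trimmed."""
    found = []
    for match in ILLUSTRATIVE_EMBED.finditer(text):
        fmt = match.group(3)
        found.append(
            (match.group(1), match.group(2).strip(), fmt.strip() if fmt is not None else None)
        )
    return found


print(parse_embeds("Price: «math:10 * 1.15 | .2f» ID: «uuid:new»"))
```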

### converter.py
**Purpose:** Provides data conversion between different formats and serialization to final string representations.

**Import:** `from solace_agent_mesh.common.utils.embeds.converter import convert_data, serialize_data`

**Functions:**
- `convert_data(current_data: Any, current_format: Optional[DataFormat], target_format: DataFormat, log_id: str = "[Converter]", original_mime_type: Optional[str] = None) -> Tuple[Any, DataFormat, Optional[str]]` - Converts data between DataFormat types using MIME type hints
- `serialize_data(data: Any, data_format: Optional[DataFormat], target_string_format: Optional[str], original_mime_type: Optional[str], log_id: str = "[Serializer]") -> Tuple[str, Optional[str]]` - Serializes data to final string format (text, json, csv, datauri, or Python format specs)

**Usage Examples:**
```python
from solace_agent_mesh.common.utils.embeds.converter import convert_data, serialize_data
from solace_agent_mesh.common.utils.embeds.types import DataFormat

# Convert CSV bytes to list of dictionaries
csv_bytes = b"id,name\n1,Alice\n2,Bob"
list_data, new_format, err = convert_data(
    current_data=csv_bytes,
    current_format=DataFormat.BYTES,
    target_format=DataFormat.LIST_OF_DICTS,
    original_mime_type="text/csv"
)

# Serialize to pretty JSON
json_str, err = serialize_data(
    data=list_data,
    data_format=DataFormat.LIST_OF_DICTS,
    target_string_format="json_pretty",
    original_mime_type=None
)
```
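
For readers who want to see what a bytes-to-rows conversion involves, here is a standard-library sketch of the same CSV-to-JSON path. It is a stand-in for illustration rather than the behavior of `convert_data` or `serialize_data`; `csv_bytes_to_rows` is a hypothetical helper, and UTF-8 input is assumed.

```python
import csv
import io
import json
from typing import Dict, List


def csv_bytes_to_rows(raw: bytes, encoding: str = "utf-8") -> List[Dict[str, str]]:
    """Decode CSV bytes and return one dictionary per data row."""
    # newline="" lets the csv module handle line endings itself.
    reader = csv.DictReader(io.StringIO(raw.decode(encoding), newline=""))
    return [dict(row) for row in reader]


rows = csv_bytes_to_rows(b"id,name\n1,Alice\n2,Bob")
pretty_json = json.dumps(rows, indent=2)
print(pretty_json)
```

Note that every value stays a string after parsing; numeric columns would need explicit conversion.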

### evaluators.py
**Purpose:** Contains evaluation logic for simple embed types and the evaluator registry.

**Import:** `from solace_agent_mesh.common.utils.embeds.evaluators import EMBED_EVALUATORS`

**Functions:**
- `_evaluate_math_embed(expression: str, context: Any, log_identifier: str, format_spec: Optional[str] = None) -> Tuple[str, Optional[str], int]` - Evaluates mathematical expressions using asteval
- `_evaluate_datetime_embed(expression: str, context: Any, log_identifier: str, format_spec: Optional[str] = None) -> Tuple[str, Optional[str], int]` - Formats current datetime
- `_evaluate_uuid_embed(expression: str, context: Any, log_identifier: str, format_spec: Optional[str] = None) -> Tuple[str, Optional[str], int]` - Generates UUID4 strings
- `_evaluate_artifact_meta_embed(expression: str, context: Dict[str, Any], log_identifier: str, format_spec: Optional[str] = None) -> Tuple[str, Optional[str], int]` - Loads and formats artifact metadata
- `_evaluate_artifact_content_embed(expression: str, context: Any, log_identifier: str, config: Optional[Dict] = None) -> Tuple[Optional[bytes], Optional[str], Optional[str]]` - Loads raw artifact content

**Constants/Variables:**
- `EMBED_EVALUATORS: Dict[str, Callable]` - Registry mapping embed types to evaluator functions
- `MATH_SAFE_SYMBOLS: Dict[str, Any]` - Safe mathematical functions and constants for math embeds

**Usage Examples:**
```python
from solace_agent_mesh.common.utils.embeds.evaluators import EMBED_EVALUATORS

# Math evaluation
result, error, size = EMBED_EVALUATORS["math"]("2 + 3 * 4", {}, "[Test]", ".2f")
# result: "14.00", error: None, size: 5

# DateTime formatting  
result, error, size = EMBED_EVALUATORS["datetime"]("%Y-%m-%d", {}, "[Test]")
# result: e.g. "2024-01-15" (the current date), error: None, size: 10
```
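
The math evaluator is documented as using asteval. As a dependency-free illustration of the same idea (evaluate arithmetic without exposing arbitrary Python), the sketch below swaps in a small `ast` walker from the standard library. It is a different technique from asteval, supports only basic operators plus two constants, and `evaluate_math` is a hypothetical name.

```python
import ast
import math
import operator
from typing import Optional, Union

Number = Union[int, float]

_BINARY = {
    ast.Add: operator.add, ast.Sub: operator.sub, ast.Mult: operator.mul,
    ast.Div: operator.truediv, ast.Mod: operator.mod, ast.Pow: operator.pow,
}
_UNARY = {ast.UAdd: operator.pos, ast.USub: operator.neg}
_NAMES = {"pi": math.pi, "e": math.e}  # the only names the sketch allows


def _eval(node: ast.AST) -> Number:
    if isinstance(node, ast.Expression):
        return _eval(node.body)
    if (
        isinstance(node, ast.Constant)
        and isinstance(node.value, (int, float))
        and not isinstance(node.value, bool)
    ):
        return node.value
    if isinstance(node, ast.BinOp) and type(node.op) in _BINARY:
        return _BINARY[type(node.op)](_eval(node.left), _eval(node.right))
    if isinstance(node, ast.UnaryOp) and type(node.op) in _UNARY:
        return _UNARY[type(node.op)](_eval(node.operand))
    if isinstance(node, ast.Name) and node.id in _NAMES:
        return _NAMES[node.id]
    # Calls, attributes, subscripts and everything else are rejected.
    raise ValueError(f"Unsupported expression element: {type(node).__name__}")


def evaluate_math(expression: str, format_spec: Optional[str] = None) -> str:
    """Evaluate an arithmetic expression and apply an optional format spec."""
    value = _eval(ast.parse(expression.strip(), mode="eval"))
    return format(value, format_spec.strip()) if format_spec else str(value)


print(evaluate_math("2 + 3 * 4", ".2f"))
```

Walking the parsed tree and allowing only listed node types means anything unexpected, such as a function call, fails closed with an error.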

### modifiers.py
**Purpose:** Implements data transformation functions that can be chained together in artifact_content embeds.

**Import:** `from solace_agent_mesh.common.utils.embeds.modifiers import MODIFIER_DEFINITIONS, _parse_modifier_chain`

**Functions:**
- `_apply_jsonpath(current_data: Any, expression: str, mime_type: Optional[str], log_id: str) -> Tuple[Any, Optional[str], Optional[str]]` - Applies JSONPath expressions to JSON data
- `_apply_select_cols(current_data: List[Dict], cols_str: str, mime_type: Optional[str], log_id: str) -> Tuple[Any, Optional[str], Optional[str]]` - Selects specific columns from tabular data
- `_apply_filter_rows_eq(current_data: List[Dict], filter_spec: str, mime_type: Optional[str], log_id: str) -> Tuple[Any, Optional[str], Optional[str]]` - Filters rows by column value equality
- `_apply_slice_rows(current_data: List[Dict], slice_spec: str, mime_type: Optional[str], log_id: str) -> Tuple[Any, Optional[str], Optional[str]]` - Slices rows using Python slice notation
- `_apply_slice_lines(current_data: str, slice_spec: str, mime_type: Optional[str], log_id: str) -> Tuple[Any, Optional[str], Optional[str]]` - Slices text lines
- `_apply_grep(current_data: str, pattern: str, mime_type: Optional[str], log_id: str) -> Tuple[Any, Optional[str], Optional[str]]` - Filters lines matching regex pattern
- `_apply_head(current_data: str, n_str: str, mime_type: Optional[str], log_id: str) -> Tuple[Any, Optional[str], Optional[str]]` - Returns first N lines
- `_apply_tail(current_data: str, n_str: str, mime_type: Optional[str], log_id: str) -> Tuple[Any, Optional[str], Optional[str]]` - Returns last N lines
- `_apply_template(current_data: Any, template_spec: str, mime_type: Optional[str], log_id: str, context: Any) -> Tuple[Any, Optional[str], Optional[str]]` - Applies Mustache templates from artifacts
- `_parse_modifier_chain(expression: str) -> Tuple[str, List[Tuple[str, str]], Optional[str]]` - Parses artifact_content expression into components

**Constants/Variables:**
- `MODIFIER_IMPLEMENTATIONS: Dict[str, Callable]` - Registry of modifier functions
- `MODIFIER_DEFINITIONS: Dict[str, Dict[str, Any]]` - Modifier metadata including accepted/produced formats

**Usage Examples:**
```python
from solace_agent_mesh.common.utils.embeds.modifiers import _parse_modifier_chain

# Parse a complex artifact_content expression
expression = "data.csv:1 >>> select_cols:name,age >>> filter_rows_eq:age:25 >>> format:json"
artifact_spec, modifiers, output_format = _parse_modifier_chain(expression)
# artifact_spec: "data.csv:1"
# modifiers: [("select_cols", "name,age"), ("filter_rows_eq", "age:25")]
# output_format: "json"
```
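
The parse result shown above can be reproduced with simple string splitting, and the resulting modifier list can then be applied step by step. The sketch below is a simplified model under assumptions: `parse_chain`, `select_cols`, `filter_rows_eq`, and `apply_chain` are hypothetical plain functions that return data directly, whereas the real modifiers return tuples and take extra arguments.

```python
from typing import Callable, Dict, List, Optional, Tuple

CHAIN_DELIMITER = ">>>"
Rows = List[Dict[str, str]]


def parse_chain(expression: str) -> Tuple[str, List[Tuple[str, str]], Optional[str]]:
    """Split an expression into artifact spec, modifier steps, and output format."""
    parts = [part.strip() for part in expression.split(CHAIN_DELIMITER)]
    artifact_spec, steps = parts[0], parts[1:]
    modifiers: List[Tuple[str, str]] = []
    output_format: Optional[str] = None
    for step in steps:
        # Split on the first colon only, so values like "age:25" stay intact.
        name, _, value = step.partition(":")
        if name.strip() == "format":
            output_format = value.strip()
        else:
            modifiers.append((name.strip(), value.strip()))
    return artifact_spec, modifiers, output_format


def select_cols(rows: Rows, cols_str: str) -> Rows:
    cols = [col.strip() for col in cols_str.split(",")]
    return [{col: row[col] for col in cols if col in row} for row in rows]


def filter_rows_eq(rows: Rows, filter_spec: str) -> Rows:
    column, _, expected = filter_spec.partition(":")
    return [row for row in rows if str(row.get(column.strip())) == expected.strip()]


MODIFIERS: Dict[str, Callable[[Rows, str], Rows]] = {
    "select_cols": select_cols,
    "filter_rows_eq": filter_rows_eq,
}


def apply_chain(rows: Rows, modifiers: List[Tuple[str, str]]) -> Rows:
    """Feed the output of each modifier into the next one."""
    for name, value in modifiers:
        rows = MODIFIERS[name](rows, value)
    return rows


spec, mods, fmt = parse_chain(
    "data.csv:1 >>> select_cols:name,age >>> filter_rows_eq:age:25 >>> format:json"
)
sample = [
    {"name": "Alice", "age": "25", "city": "Paris"},
    {"name": "Bob", "age": "30", "city": "Rome"},
]
print(spec, mods, fmt)
print(apply_chain(sample, mods))
```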

### resolver.py
**Purpose:** Core orchestration engine that handles the complete embed resolution process including modifier chains and recursion.

**Import:** `from solace_agent_mesh.common.utils.embeds.resolver import resolve_embeds_in_string, evaluate_embed`

**Functions:**
- `resolve_embeds_in_string(text: str, context: Any, resolver_func: Callable, types_to_resolve: Set[str], log_identifier: str = "[EmbedUtil]", config: Optional[Dict[str, Any]] = None) -> Tuple[str, int, List[Tuple[int, Any]]]` - Single-pass embed resolution with buffering support
- `resolve_embeds_recursively_in_string(text: str, context: Any, resolver_func: Callable, types_to_resolve: Set[str], log_identifier: str, config: Optional[Dict], max_depth: int, current_depth: int = 0, visited_artifacts: Optional[Set[Tuple[str, int]]] = None, accumulated_size: int = 0, max_total_size: int = -1) -> str` - Recursive embed resolution with safety limits
- `evaluate_embed(embed_type: str, expression: str, format_spec: Optional[str], context: Dict[str, Any], log_identifier: str, config: Optional[Dict] = None, current_depth: int = 0, visited_artifacts: Optional[Set[Tuple[str, int]]] = None) -> Union[Tuple[str, Optional[str], int], Tuple[None, str, Any]]` - Main embed evaluation dispatcher

**Usage Examples:**
```python
from solace_agent_mesh.common.utils.embeds.resolver import resolve_embeds_in_string, evaluate_embed

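# Single-pass resolution
# `service` and `session_ctx` are placeholders for your artifact service and session context.
# Run the awaited calls below inside a coroutine.
text = "Result: «math:2+3» and «uuid:new»"
context = {"artifact_service": service, "session_context": session_ctx}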

resolved_text, processed_index, signals = await resolve_embeds_in_string(
    text=text,
    context=context,
    resolver_func=evaluate_embed,
    types_to_resolve={"math", "uuid"},
    log_identifier="[MyApp]",
    config={}
)

# Complex artifact content with modifiers
result, error, size = await evaluate_embed(
    embed_type="artifact_content",
    expression="sales.csv >>> select_cols:product,revenue >>> format:json",
    format_spec=None,
    context=context,
    log_identifier="[Sales]"
)
```

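### types.py
**Purpose:** Defines the `DataFormat` enum used to track data types during transformations.

**Import:** `from solace_agent_mesh.common.utils.embeds.types import DataFormat`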

# content_hash: cbca48790a6e817c5bf49ddd8212cbbc663e920e9dd4dbece95bcf57d79c1837

================================================================================

## Section 20: solace_agent_mesh/common/utils/utils_llm.txt

**Source file:** `solace_agent_mesh/common/utils/utils_llm.txt`

## Developer API Reference

### Direct Files

#### __init__.py
**Purpose:** Serves as the main entry point for the `utils` package, exporting the most common utility functions for easy importing.
**Import:** `from solace_agent_mesh.common.utils import is_text_based_mime_type`

**Classes/Functions/Constants:**
*   `is_text_based_mime_type(mime_type: Optional[str]) -> bool`: Checks if a given MIME type is considered text-based.

#### asyncio_macos_fix.py
**Purpose:** Provides a targeted, automatic fix for a `NotImplementedError` that occurs when creating subprocesses with asyncio on macOS. This module is imported for its side effects and should be loaded early in the application's lifecycle.
**Import:** `from solace_agent_mesh.common.utils import asyncio_macos_fix` (Importing the module is sufficient to apply the patch).

**Classes/Functions/Constants:**
*   `apply_macos_asyncio_fix() -> bool`: Applies the asyncio fix for macOS subprocess support.
*   `ensure_asyncio_compatibility() -> bool`: Ensures asyncio compatibility for subprocess creation on macOS.

#### in_memory_cache.py
**Purpose:** Provides a simple, thread-safe, in-memory cache implemented as a singleton. It's useful for storing frequently accessed data with an optional time-to-live (TTL).
**Import:** `from solace_agent_mesh.common.utils.in_memory_cache import InMemoryCache`

**Classes/Functions/Constants:**
*   **`InMemoryCache`**: A singleton class for caching.
    *   `set(self, key: str, value: Any, ttl: Optional[int] = None) -> None`: Sets a key-value pair with an optional TTL in seconds.
    *   `get(self, key: str, default: Any = None) -> Any`: Retrieves a value by its key, returning a default if the key is not found or has expired.
    *   `delete(self, key: str) -> bool`: Deletes a key-value pair from the cache.
    *   `clear(self) -> bool`: Removes all items from the cache.
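
The singleton-plus-TTL behaviour described above can be illustrated with a small standard-library sketch. `SketchCache` is a hypothetical stand-in written for this guide, not the library's implementation; the use of `time.monotonic()` and the lazy eviction on read are assumptions.

```python
import threading
import time
from typing import Any, Optional


class SketchCache:
    """Hypothetical stand-in for a singleton TTL cache; not the library's code."""

    _instance: Optional["SketchCache"] = None
    _instance_lock = threading.Lock()

    def __new__(cls) -> "SketchCache":
        # Guard creation so concurrent callers still share one instance.
        with cls._instance_lock:
            if cls._instance is None:
                instance = super().__new__(cls)
                instance._data = {}
                instance._lock = threading.Lock()
                cls._instance = instance
        return cls._instance

    def set(self, key: str, value: Any, ttl: Optional[float] = None) -> None:
        # Store the value together with its absolute expiry time (or None).
        expires_at = None if ttl is None else time.monotonic() + ttl
        with self._lock:
            self._data[key] = (value, expires_at)

    def get(self, key: str, default: Any = None) -> Any:
        with self._lock:
            entry = self._data.get(key)
            if entry is None:
                return default
            value, expires_at = entry
            # Expired entries are evicted lazily, on the next read.
            if expires_at is not None and time.monotonic() >= expires_at:
                del self._data[key]
                return default
            return value

    def delete(self, key: str) -> bool:
        with self._lock:
            return self._data.pop(key, None) is not None


cache = SketchCache()
cache.set("greeting", "hello")
cache.set("stale", "x", ttl=0)  # expires immediately in this sketch
```

Because every call to `SketchCache()` returns the same object, any module can read values written elsewhere in the process without passing the cache around.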

#### initializer.py
**Purpose:** Handles initialization of enterprise features if available, loading configuration from environment variables.
**Import:** `from solace_agent_mesh.common.utils.initializer import initialize`

**Classes/Functions/Constants:**
*   `initialize() -> None`: Initializes enterprise features if available, loading configuration from the `SAM_AUTHORIZATION_CONFIG` environment variable.

#### log_formatters.py
**Purpose:** Contains custom logging formatters to structure log output for specific platforms, such as Datadog.
**Import:** `from solace_agent_mesh.common.utils.log_formatters import DatadogJsonFormatter`

**Classes/Functions/Constants:**
*   **`DatadogJsonFormatter(logging.Formatter)`**: A formatter that outputs log records as a JSON string, compatible with Datadog's standard log attributes. It automatically includes tracing information (`dd.trace_id`, `dd.span_id`) if available.
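
To make the idea of a JSON log formatter concrete, here is a minimal standard-library sketch. `SketchJsonFormatter` and its field names (`timestamp`, `status`, `logger.name`) are illustrative assumptions; the actual attributes emitted by `DatadogJsonFormatter` may differ.

```python
import json
import logging


class SketchJsonFormatter(logging.Formatter):
    """Hypothetical JSON formatter; field names are illustrative only."""

    def format(self, record: logging.LogRecord) -> str:
        entry = {
            "timestamp": self.formatTime(record),
            "status": record.levelname,
            "logger.name": record.name,
            "message": record.getMessage(),
        }
        # Copy tracing IDs only when the caller attached them via `extra`.
        for key in ("dd.trace_id", "dd.span_id"):
            value = record.__dict__.get(key)
            if value is not None:
                entry[key] = value
        return json.dumps(entry)


# Build a record by hand so the example runs without configuring handlers.
record = logging.LogRecord(
    name="my_app", level=logging.INFO, pathname="example.py", lineno=1,
    msg="User %s logged in", args=("alice",), exc_info=None,
)
record.__dict__["dd.trace_id"] = "abc"
formatted = SketchJsonFormatter().format(record)
```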

#### message_utils.py
**Purpose:** Provides utilities for calculating and validating message sizes to ensure they don't exceed configured limits.
**Import:** `from solace_agent_mesh.common.utils.message_utils import calculate_message_size, validate_message_size`

**Classes/Functions/Constants:**
*   `calculate_message_size(payload: Dict[str, Any]) -> int`: Calculates the exact size of a message payload in bytes using JSON serialization + UTF-8 encoding.
*   `validate_message_size(payload: Dict[str, Any], max_size_bytes: int, component_identifier: str = "Unknown") -> Tuple[bool, int]`: Validates that a message payload doesn't exceed the maximum size limit.
*   `MAX_UTF8_BYTES_PER_CHARACTER: int`: Maximum bytes per character in UTF-8 encoding (4 bytes).
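
The "JSON serialization + UTF-8 encoding" measurement can be sketched in a few lines. The helper names below are hypothetical, and the serializer options (default `json.dumps` settings) and the inclusive limit check are assumptions rather than confirmed library behaviour.

```python
import json
from typing import Any, Dict, Tuple


def sketch_message_size(payload: Dict[str, Any]) -> int:
    """Byte length of the payload once serialized to JSON and UTF-8 encoded."""
    return len(json.dumps(payload).encode("utf-8"))


def sketch_validate_size(payload: Dict[str, Any], max_size_bytes: int) -> Tuple[bool, int]:
    """Return (fits_within_limit, actual_size_in_bytes)."""
    size = sketch_message_size(payload)
    return size <= max_size_bytes, size


fits, size = sketch_validate_size({"a": 1}, max_size_bytes=8)
fits_smaller_limit, _ = sketch_validate_size({"a": 1}, max_size_bytes=7)
```

Measuring encoded bytes rather than characters matters because a single character can occupy up to four bytes in UTF-8, which is what `MAX_UTF8_BYTES_PER_CHARACTER` records.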

#### mime_helpers.py
**Purpose:** Provides utilities for handling and classifying MIME types, with a focus on identifying which types represent text-based content.
**Import:** `from solace_agent_mesh.common.utils.mime_helpers import is_text_based_mime_type, get_extension_for_mime_type`

**Classes/Functions/Constants:**
*   `is_text_based_mime_type(mime_type: Optional[str]) -> bool`: Returns `True` if the MIME type starts with `text/` or is in the list of known text-based application types.
*   `is_text_based_file(mime_type: Optional[str], content_bytes: Optional[bytes] = None) -> bool`: Determines whether a file is text-based from its MIME type and, optionally, its content.
*   `get_extension_for_mime_type(mime_type: Optional[str], default_extension: str = ".dat") -> str`: Returns a file extension for a given MIME type.
*   `TEXT_CONTAINER_MIME_TYPES: Set[str]`: A set of non-`text/*` MIME types that are considered to contain text.
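
The classification rule described for `is_text_based_mime_type` (a `text/` prefix or membership in a known set) can be sketched as follows. The set contents and the stripping of parameters such as `charset` are assumptions made for illustration; the library's `TEXT_CONTAINER_MIME_TYPES` may list different types.

```python
from typing import Optional, Set

# Assumed contents; the library's TEXT_CONTAINER_MIME_TYPES may differ.
SKETCH_TEXT_CONTAINER_TYPES: Set[str] = {
    "application/json",
    "application/xml",
    "application/yaml",
}


def sketch_is_text_based(mime_type: Optional[str]) -> bool:
    """Return True for text/* types and for known text-carrying types."""
    if not mime_type:
        return False
    # Drop parameters such as "; charset=utf-8" and normalize case.
    base_type = mime_type.split(";", 1)[0].strip().lower()
    return base_type.startswith("text/") or base_type in SKETCH_TEXT_CONTAINER_TYPES
```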

#### push_notification_auth.py
**Purpose:** Implements JWT-based authentication for sending and receiving push notifications with request integrity verification.
**Import:** `from solace_agent_mesh.common.utils.push_notification_auth import PushNotificationSenderAuth, PushNotificationReceiverAuth`

**Classes/Functions/Constants:**
*   **`PushNotificationSenderAuth`**: Handles sending authenticated push notifications.
    *   `generate_jwk(self) -> None`: Generates RSA key pair for signing.
    *   `handle_jwks_endpoint(self, request: Request) -> JSONResponse`: Returns public keys for client verification.
    *   `send_push_notification(self, url: str, data: dict[str, Any]) -> None`: Sends authenticated push notification.
    *   `verify_push_notification_url(url: str) -> bool`: Verifies a push notification URL.
*   **`PushNotificationReceiverAuth`**: Handles receiving and verifying authenticated push notifications.
    *   `load_jwks(self, jwks_url: str) -> None`: Loads public keys from JWKS endpoint.
    *   `verify_push_notification(self, request: Request) -> bool`: Verifies incoming push notification authenticity.
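
One common way to bind a token to a request body is to carry a hash of the body as a claim and have the receiver recompute it. The sketch below shows only that hashing and comparison step, using the standard library; token signing and JWKS handling are omitted, and the canonical JSON encoding and helper names are assumptions, not the module's confirmed behaviour.

```python
import hashlib
import hmac
import json
from typing import Any, Dict


def body_digest(data: Dict[str, Any]) -> str:
    """SHA-256 hex digest of a canonical JSON encoding of the body."""
    # Sorting keys and fixing separators makes the encoding deterministic.
    canonical = json.dumps(data, sort_keys=True, separators=(",", ":"))
    return hashlib.sha256(canonical.encode("utf-8")).hexdigest()


def body_matches_claim(data: Dict[str, Any], claimed_digest: str) -> bool:
    """Constant-time comparison of the recomputed digest with the claimed one."""
    return hmac.compare_digest(body_digest(data), claimed_digest)


# Sender side: compute the digest that would travel inside the signed token.
digest = body_digest({"event": "update", "data": "value"})
```

A receiver would call `body_matches_claim` with the parsed request body and the digest taken from the verified token, rejecting the request when the two disagree.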

### Subdirectory APIs

#### embeds/
**Purpose:** Provides a comprehensive system for finding, parsing, and resolving embedded dynamic expressions within strings using `«...»` syntax.
**Key Exports:** Main functions for embed resolution, evaluation, and constants for embed syntax.
**Import Examples:**
```python
from solace_agent_mesh.common.utils.embeds import resolve_embeds_recursively_in_string, evaluate_embed, EMBED_REGEX
from solace_agent_mesh.common.utils.embeds.constants import EARLY_EMBED_TYPES, LATE_EMBED_TYPES
from solace_agent_mesh.common.utils.embeds.types import DataFormat
```
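
As a rough illustration of the `«type:expression | format»` syntax, the sketch below finds non-nested embeds with a regular expression. The pattern is hypothetical and deliberately simpler than whatever `EMBED_REGEX` actually contains; in particular, treating the text after the last ` | ` as the format specifier is an assumption.

```python
import re
from typing import List, NamedTuple, Optional


class ParsedEmbed(NamedTuple):
    embed_type: str
    expression: str
    format_spec: Optional[str]


# Hypothetical pattern; the library's EMBED_REGEX may differ.
SKETCH_EMBED_PATTERN = re.compile(r"«(\w+):([^«»]*)»")


def find_embeds(text: str) -> List[ParsedEmbed]:
    """Return every non-nested embed found in the text, in order."""
    found = []
    for match in SKETCH_EMBED_PATTERN.finditer(text):
        embed_type, body = match.group(1), match.group(2)
        # Treat the text after the last " | " as a format spec (assumption).
        expression, separator, format_spec = body.rpartition(" | ")
        if separator:
            found.append(ParsedEmbed(embed_type, expression.strip(), format_spec.strip()))
        else:
            found.append(ParsedEmbed(embed_type, body.strip(), None))
    return found


embeds = find_embeds("Total: «math:10 * 1.15 | .2f» id «uuid:new»")
```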

## Complete Usage Guide

### 1. Basic Utility Imports and Usage

```python
# Import commonly used utilities
from solace_agent_mesh.common.utils import is_text_based_mime_type
from solace_agent_mesh.common.utils.in_memory_cache import InMemoryCache
from solace_agent_mesh.common.utils.message_utils import validate_message_size
from solace_agent_mesh.common.utils.mime_helpers import get_extension_for_mime_type

# Check if content type is text-based
if is_text_based_mime_type("application/json"):
    print("JSON is text-based")

# Use singleton cache
cache = InMemoryCache()
cache.set("user_data", {"name": "Alice"}, ttl=300)  # 5 minute TTL
user_data = cache.get("user_data", {})

# Validate message size
payload = {"message": "Hello world", "data": [1, 2, 3]}
is_valid, size = validate_message_size(payload, max_size_bytes=1024)
if not is_valid:
    print(f"Message too large: {size} bytes")

# Get file extension from MIME type
extension = get_extension_for_mime_type("image/png")  # Returns ".png"
```

### 2. Platform Compatibility and Initialization

```python
# Early in your application startup
from solace_agent_mesh.common.utils import asyncio_macos_fix  # Auto-applies fix
from solace_agent_mesh.common.utils.initializer import initialize

# Initialize enterprise features if available
initialize()

# Now asyncio subprocess creation will work on macOS
import asyncio
async def run_subprocess():
    process = await asyncio.create_subprocess_exec("echo", "hello")
    await process.wait()
```

### 3. Custom Logging Setup

```python
import logging
from solace_agent_mesh.common.utils.log_formatters import DatadogJsonFormatter

# Set up Datadog-compatible JSON logging
logger = logging.getLogger("my_app")
handler = logging.StreamHandler()
handler.setFormatter(DatadogJsonFormatter())
logger.addHandler(handler)
logger.setLevel(logging.INFO)

# Logs will be formatted as JSON with Datadog attributes
logger.info("User logged in", extra={"user_id": "123", "dd.trace_id": "abc"})
```

### 4. Push Notification Authentication

```python
from solace_agent_mesh.common.utils.push_notification_auth import (
    PushNotificationSenderAuth, 
    PushNotificationReceiverAuth
)
from starlette.applications import Starlette
from starlette.requests import Request
from starlette.responses import Response

# Sender setup
sender_auth = PushNotificationSenderAuth()
sender_auth.generate_jwk()

# Verify and send a notification (these awaits, like those below, must run inside a coroutine)
url = "https://client.example.com/webhook"
if await sender_auth.verify_push_notification_url(url):
    await sender_auth.send_push_notification(url, {"event": "update", "data": "value"})

# Receiver setup
app = Starlette()
receiver_auth = PushNotificationReceiverAuth()
await receiver_auth.load_jwks("https://sender.example.com/.well-known/jwks.json")

@app.route("/webhook", methods=["POST"])
async def webhook(request: Request):
    if await receiver_auth.verify_push_notification(request):
        # Process authenticated notification
        data = await request.json()
        return Response("OK")
    return Response("Unauthorized", status_code=401)

# JWKS endpoint for sender
@app.route("/.well-known/jwks.json")
async def jwks(request: Request):
    return sender_auth.handle_jwks_endpoint(request)
```

### 5. Advanced Embed Processing

```python
from solace_agent_mesh.common.utils.embeds import (
    resolve_embeds_recursively_in_string, 
    evaluate_embed,
    EARLY_EMBED_TYPES,
    LATE_EMBED_TYPES
)

# Set up context for embed resolution
context = {
    "artifact_service": my_artifact_service,
    "session_context": {
        "app_name": "myapp",
        "user_id": "user123", 
        "session_id": "sess456"
    }
}

# Simple embed resolution
text = """
Current time: «datetime:%Y-%m-%d %H:%M:%S»
Calculation: «math:10 * 1.15 | .2f»
Unique ID: «uuid:new»
"""

resolved = await resolve_embeds_recursively_in_string(
    text=text,
    context=context,
    resolver_func=evaluate_embed,
    types_to_resolve=EARLY_EMBED_TYPES,
    log_identifier="[MyApp]",
    config={},
    max_depth=5
)

# Complex artifact content processing
artifact_text = """
Sales Report:
«artifact_content:sales.csv:1 >>> select_cols:product,revenue >>> filter_rows_eq:category:electronics >>> format:json_pretty»

Summary: «artifact_content:summary.txt >>> head:5»
"""

final_result = await resolve_embeds_recursively_in_string(
    text=artifact_text,
    context=context,
    resolver_func=evaluate_embed,
    types_to_resolve=LATE_EMBED_TYPES,
    log_identifier="[Report]",
    config={"max_artifact_size": 1024*1024},
    max_depth=3
)
```
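
The `>>>` modifier chains used in the `artifact_content` embeds above read as a small pipeline: an artifact specifier followed by `name:argument` steps. A minimal sketch of splitting such an expression is shown below; `split_modifier_chain` is a hypothetical helper, and the real parser may handle quoting, escaping, or validation that this sketch ignores.

```python
from typing import List, Tuple


def split_modifier_chain(expression: str) -> Tuple[str, List[Tuple[str, str]]]:
    """Split 'artifact >>> mod:arg >>> mod:arg' into its parts."""
    artifact_spec, *steps = [part.strip() for part in expression.split(">>>")]
    modifiers = []
    for step in steps:
        # Only the first ':' separates the modifier name from its argument.
        name, _, argument = step.partition(":")
        modifiers.append((name.strip(), argument.strip()))
    return artifact_spec, modifiers


artifact, modifiers = split_modifier_chain(
    "sales.csv:1 >>> select_cols:product,revenue >>> filter_rows_eq:category:electronics >>> format:json_pretty"
)
```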

### 6. Integrated Workflow Example

```python
from solace_agent_mesh.common.utils import is_text_based_mime_type
from solace_agent_mesh.common.utils.in_memory_cache import InMemoryCache
from solace_agent_mesh.common.utils.message_utils import validate_message_size
from solace_agent_mesh.common.utils.embeds import resolve_embeds_recursively_in_string, evaluate_embed

async def process_user_request(user_id: str, template: str, context: dict):
    """Complete workflow using multiple utils together."""
    
    # Get cache instance
    cache = InMemoryCache()
    
    # Check cache first
    cache_key = f"processed_template_{user_id}_{hash(template)}"
    cached_result = cache.get(cache_key)
    if cached_result is not None:  # an empty string is still a valid cached result
        return cached_result
    
    # Process embeds in template
    full_context = {
        **context,
        "session_context": {"user_id": user_id}
    }
    
    processed_text = await resolve_embeds_recursively_in_string(
        text=template,
        context=full_context,
        resolver_func=evaluate_embed,
        types_to_resolve={"math", "datetime", "uuid", "artifact_content"},
        log_identifier=f"[User:{user_id}]",
        config={},
        max_depth=5
    )
    
    # Validate result size
    result_payload = {"processed_text": processed_text, "user_id": user_id}
    is_valid, size = validate_message_size(result_payload, max_size_bytes=10*1024*1024)
    
    if not is_valid:
        raise ValueError(f"Processed result too large: {size} bytes")
    
    # Cache result for 10 minutes
    cache.set(cache_key, processed_text, ttl=600)
    
    return processed_text

# Usage
template = """
Hello! Today is «datetime:%Y-%m-%d».
Your order total: «math:«artifact_content:cart.json >>> jsonpath:$.total» * 1.08 | .2f»
Order ID: «uuid:new»
"""

result = await process_user_request("user123", template, {"artifact_service": service})
```

Together, these examples show how the `utils` package combines in-memory caching, message-size validation, MIME classification, Datadog-style logging, push-notification authentication, macOS asyncio compatibility, and embed resolution.

# content_hash: c81d0ce65418094841137a435b82562156225f13e9ccb949474aa23ac5ff9a47

================================================================================

## Section 21: solace_agent_mesh/core_a2a/core_a2a_llm.txt

**Source file:** `solace_agent_mesh/core_a2a/core_a2a_llm.txt`

# DEVELOPER GUIDE: core_a2a

## Quick Summary
The `core_a2a` directory provides a reusable service layer for core Agent-to-Agent (A2A) interactions. It handles task submission (both regular and streaming), task cancellation, and agent discovery processing while staying decoupled from specific gateway implementations and from Solace AI Connector (SAC) messaging details.

## Files Overview
- `__init__.py` - Package initialization file for the core A2A service layer
- `service.py` - Main service class that encapsulates A2A protocol logic and agent registry operations

## Developer API Reference

### __init__.py
**Purpose:** Package initialization for the core A2A service layer
**Import:** `import solace_agent_mesh.core_a2a`

No public classes, functions, or constants defined.

### service.py
**Purpose:** Provides the main CoreA2AService class for handling A2A protocol operations
**Import:** `from solace_agent_mesh.core_a2a.service import CoreA2AService`

**Classes:**
- `CoreA2AService(agent_registry: AgentRegistry, namespace: str)` - Main service class for A2A operations
  - `submit_task(agent_name: str, a2a_message: A2AMessage, session_id: str, client_id: str, reply_to_topic: str, user_id: str = "default_user", a2a_user_config: Optional[Dict[str, Any]] = None, metadata_override: Optional[Dict[str, Any]] = None) -> Tuple[str, Dict, Dict]` - Constructs topic, payload, and user properties for non-streaming task requests
  - `submit_streaming_task(agent_name: str, a2a_message: A2AMessage, session_id: str, client_id: str, reply_to_topic: str, status_to_topic: str, user_id: str = "default_user", a2a_user_config: Optional[Dict[str, Any]] = None, metadata_override: Optional[Dict[str, Any]] = None) -> Tuple[str, Dict, Dict]` - Constructs topic, payload, and user properties for streaming task requests
  - `cancel_task(agent_name: str, task_id: str, client_id: str, user_id: str = "default_user") -> Tuple[str, Dict, Dict]` - Constructs topic, payload, and user properties for task cancellation
  - `get_agent(agent_name: str) -> Optional[AgentCard]` - Retrieves a specific agent card by name from the registry
  - `get_all_agents() -> List[AgentCard]` - Retrieves all currently discovered agent cards from the registry
  - `process_discovery_message(agent_card: AgentCard)` - Processes an incoming agent card discovery message
  - `agent_registry: AgentRegistry` - The shared agent registry instance
  - `namespace: str` - The namespace string
  - `log_identifier: str` - Identifier used for logging

**Functions:**
None (all functionality is encapsulated in the CoreA2AService class)

**Constants/Variables:**
None

**Usage Examples:**
```python
# Import required dependencies
from solace_agent_mesh.core_a2a.service import CoreA2AService
from solace_agent_mesh.common.agent_registry import AgentRegistry
from a2a.types import AgentCard, Message as A2AMessage

# Initialize the service
agent_registry = AgentRegistry()
namespace = "my_NAMESPACE"
service = CoreA2AService(agent_registry, namespace)

# Submit a regular task
message = A2AMessage(parts=[{"type": "text", "content": "Hello"}])
topic, payload, user_props = service.submit_task(
    agent_name="my_agent",
    a2a_message=message,
    session_id="session_123",
    client_id="client_456",
    reply_to_topic="responses/client_456",
    user_id="user_789"
)

# Submit a streaming task
topic, payload, user_props = service.submit_streaming_task(
    agent_name="my_agent",
    a2a_message=message,
    session_id="session_123",
    client_id="client_456",
    reply_to_topic="responses/client_456",
    status_to_topic="status/client_456",
    user_id="user_789"
)

# Cancel a task
topic, payload, user_props = service.cancel_task(
    agent_name="my_agent",
    task_id="task-abc123",
    client_id="client_456"
)

# Get agent information
agent = service.get_agent("my_agent")
all_agents = service.get_all_agents()

# Process discovery message
agent_card = AgentCard(name="new_agent", description="A new agent")
service.process_discovery_message(agent_card)
```

# content_hash: fcc947b32fa06db11f6cdf1064dfd9a68ee152373677a574602c182fc8c62f56

================================================================================

## Section 22: solace_agent_mesh/gateway/base/base_llm.txt

**Source file:** `solace_agent_mesh/gateway/base/base_llm.txt`

# DEVELOPER GUIDE: base

## Quick Summary
The `base` directory provides foundational abstract classes for building Gateway implementations within the Solace AI Connector. It establishes a framework for handling common gateway tasks such as application configuration, Solace broker integration, A2A (Agent-to-Agent) message protocol handling, and managing the lifecycle of requests from external platforms. Developers should subclass `BaseGatewayApp` and `BaseGatewayComponent` to create new gateways.

## Files Overview
- `__init__.py` - Marks the directory as a Python package
- `app.py` - Contains the base application class that handles configuration, schema merging, and broker setup
- `component.py` - Contains the core logic class for processing A2A messages and integrating with external platforms
- `task_context.py` - Provides a thread-safe manager for mapping A2A task IDs to their original request context

## Developer API Reference

### __init__.py
**Purpose:** Initializes the `gateway.base` Python package
**Import:** `from solace_agent_mesh.gateway.base import ...`

---

### app.py
**Purpose:** Provides the base application class for gateway implementations with automated configuration schema merging, Solace broker setup, and component instantiation
**Import:** `from solace_agent_mesh.gateway.base.app import BaseGatewayApp, BaseGatewayComponent`

**Classes:**
- `BaseGatewayComponent(ComponentBase)` - Base marker class for gateway components
- `BaseGatewayApp(app_info: Dict[str, Any], **kwargs)` - Main application class to be subclassed for new gateways
  - `_get_gateway_component_class(self) -> Type[BaseGatewayComponent]` - **[Abstract Method]** Must return the specific gateway component class
  - `namespace: str` - Absolute topic prefix for A2A communication (e.g., 'myorg/dev')
  - `gateway_id: str` - Unique ID for this gateway instance (auto-generated if not provided)
  - `artifact_service_config: Dict` - Configuration for the shared ADK Artifact Service
  - `enable_embed_resolution: bool` - Flag to enable/disable late-stage 'artifact_content' embed resolution
  - `gateway_max_artifact_resolve_size_bytes: int` - Maximum size for resolving artifacts (default: 104857600 bytes, i.e. 100 MiB)
  - `gateway_recursive_embed_depth: int` - Maximum depth for recursive embed resolution (default: 12)

**Constants/Variables:**
- `BASE_GATEWAY_APP_SCHEMA: Dict[str, List[Dict[str, Any]]]` - Base configuration schema automatically merged with subclass parameters
- `SPECIFIC_APP_SCHEMA_PARAMS_ATTRIBUTE_NAME: str` - Class attribute name ("SPECIFIC_APP_SCHEMA_PARAMS") for subclass-specific config parameters

**Usage Examples:**
```python
from typing import Type, List, Dict, Any
from solace_agent_mesh.gateway.base.app import BaseGatewayApp
from .component import MyGatewayComponent

class MyGatewayApp(BaseGatewayApp):
    """Custom gateway application for My Platform."""
    
    # Define additional configuration parameters
    SPECIFIC_APP_SCHEMA_PARAMS: List[Dict[str, Any]] = [
        {
            "name": "my_platform_api_key",
            "required": True,
            "type": "string",
            "description": "API key for connecting to My Platform."
        }
    ]

    def _get_gateway_component_class(self) -> Type[MyGatewayComponent]:
        return MyGatewayComponent

# Usage in YAML config:
# app_config:
#   namespace: "myorg/prod"
#   gateway_id: "my-gateway-instance-01"
#   artifact_service:
#     type: "local_file"
#     base_path: "/data/artifacts"
#   my_platform_api_key: "secret-key-here"
```
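
The automatic merge of the base schema with `SPECIFIC_APP_SCHEMA_PARAMS` can be pictured with a small stand-alone sketch. Everything here is hypothetical: the `config_parameters` key, the `merged_schema` method, and the class names are assumptions chosen for illustration, not the actual `BaseGatewayApp` internals.

```python
from typing import Any, Dict, List

# Hypothetical base schema; the "config_parameters" key is an assumption.
SKETCH_BASE_SCHEMA: Dict[str, List[Dict[str, Any]]] = {
    "config_parameters": [
        {"name": "namespace", "required": True, "type": "string"},
        {"name": "gateway_id", "required": False, "type": "string"},
    ]
}


class SketchBaseApp:
    @classmethod
    def merged_schema(cls) -> Dict[str, List[Dict[str, Any]]]:
        # Subclasses opt in by defining SPECIFIC_APP_SCHEMA_PARAMS.
        specific = getattr(cls, "SPECIFIC_APP_SCHEMA_PARAMS", [])
        # Build a new list so the shared base schema is never mutated.
        return {"config_parameters": SKETCH_BASE_SCHEMA["config_parameters"] + list(specific)}


class SketchMyApp(SketchBaseApp):
    SPECIFIC_APP_SCHEMA_PARAMS = [
        {"name": "my_platform_api_key", "required": True, "type": "string"},
    ]


names = [param["name"] for param in SketchMyApp.merged_schema()["config_parameters"]]
```

Looking the attribute up by name means a subclass only has to declare its extra parameters; it never needs to repeat or reference the base schema.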

---

### component.py
**Purpose:** Provides the abstract base class for gateway components containing core A2A protocol logic, service management, and external platform integration interface
**Import:** `from solace_agent_mesh.gateway.base.component import BaseGatewayComponent`

**Classes:**
- `BaseGatewayComponent(**kwargs: Any)` - Abstract base class for gateway components
  - **Public Methods:**
    - `get_config(self, key: str, default: Any = None) -> Any` - Retrieves configuration from nested app_config or component_config
    - `publish_a2a_message(self, topic: str, payload: Dict, user_properties: Optional[Dict] = None) -> None` - Publishes A2A messages to Solace broker
    - `authenticate_and_enrich_user(self, external_event_data: Any) -> Optional[Dict[str, Any]]` - Orchestrates user authentication and identity enrichment
    - `submit_a2a_task(self, target_agent_name: str, a2a_parts: List[ContentPart], external_request_context: Dict[str, Any], user_identity: Any, is_streaming: bool = True, api_version: str = "v2") -> str` - Submits task to target agent, returns task_id
    - `run(self) -> None` - Starts component's async operations and external platform listener
    - `cleanup(self) -> None` - Cleans up resources and stops background threads
  - **Abstract Methods (Must be Implemented):**
    - `_extract_initial_claims(self, external_event_data: Any) -> Optional[Dict[str, Any]]` - Extract identity claims from platform event (must return dict with 'id' key)
    - `_start_listener(self) -> None` - Start external platform listener (e.g., web server, WebSocket)
    - `_stop_listener(self) -> None` - Stop external platform listener
    - `_translate_external_input(self, external_event: Any) -> Tuple[str, List[ContentPart], Dict[str, Any]]` - Convert external event to A2A format: (target_agent_name, a2a_parts, context)
    - `_send_update_to_external(self, external_request_context: Dict[str, Any], event_data: Union[TaskStatusUpdateEvent, TaskArtifactUpdateEvent], is_final_chunk_of_update: bool) -> None` - Send streaming update to external platform
    - `_send_final_response_to_external(self, external_request_context: Dict[str, Any], task_data: Task) -> None` - Send final response to external platform
    - `_send_error_to_external(self, external_request_context: Dict[str, Any], error_data: JSONRPCError) -> None` - Send error to external platform
  - **Properties:**
    - `namespace: str` - A2A communication namespace
    - `gateway_id: str` - Unique gateway instance ID
    - `agent_registry: AgentRegistry` - Registry for discovered agents
    - `core_a2a_service: CoreA2AService` - Core A2A protocol service
    - `shared_artifact_service: Optional[BaseArtifactService]` - Artifact service instance
    - `task_context_manager: TaskContextManager` - Thread-safe task context storage
    - `identity_service: Optional[BaseIdentityService]` - Identity enrichment service

**Usage Examples:**
```python
from typing import Any, Dict, List, Optional, Tuple, Union
from solace_agent_mesh.gateway.base.component import BaseGatewayComponent
from solace_agent_mesh.common.a2a.types import ContentPart
from a2a.types import TextPart, Task, TaskStatusUpdateEvent, TaskArtifactUpdateEvent, JSONRPCError

class MyGatewayComponent(BaseGatewayComponent):
    
    async def _extract_initial_claims(self, external_event_data: Any) -> Optional[Dict[str, Any]]:
        """Extract user identity from platform-specific event."""
        # Example for HTTP request
        if hasattr(external_event_data, 'headers'):
            user_id = external_event_data.headers.get('X-User-ID')
            if user_id:
                return {"id": user_id, "source": "http_header"}
        return None
    
    def _start_listener(self) -> None:
        """Start your platform listener (web server, etc.)."""
        # Example: Start FastAPI server, WebSocket connection, etc.
        pass
    
    def _stop_listener(self) -> None:
        """Stop your platform listener."""
        pass
    
    async def _translate_external_input(self, external_event: Any) -> Tuple[str, List[ContentPart], Dict[str, Any]]:
        """Convert external event to A2A format."""
        # Example translation
        target_agent = "my-agent"
        message_text = getattr(external_event, 'message', 'Hello')
        a2a_parts = [TextPart(text=message_text)]
        context = {
            "platform": "my_platform",
            "user_id_for_artifacts": "user123",
            "a2a_session_id": "session456"
        }
        return target_agent, a2a_parts, context
    
    async def _send_update_to_external(self, external_request_context: Dict[str, Any], 
                                     event_data: Union[TaskStatusUpdateEvent, TaskArtifactUpdateEvent], 
                                     is_final_chunk_of_update: bool) -> None:
        """Send streaming update back to external platform."""
        # Extract text from event and send to your platform
        pass
    
    async def _send_final_response_to_external(self, external_request_context: Dict[str, Any], 
                                             task_data: Task) -> None:
        """Send final response back to external platform."""
        # Extract final result and send to your platform
        pass
    
    async def _send_error_to_external(self, external_request_context: Dict[str, Any], 
                                     error_data: JSONRPCError) -> None:
        """Send error back to external platform."""
        # Send error message to your platform
        pass

# Usage in your handler:
async def handle_external_request(request):
    # Authenticate user
    user_identity = await component.authenticate_and_enrich_user(request)
    if not user_identity:
        return "Authentication failed"
    
    # Translate request
    target_agent, a2a_parts, context = await component._translate_external_input(request)
    
    # Submit to A2A system
    task_id = await component.submit_a2a_task(
        target_agent_name=target_agent,
        a2a_parts=a2a_parts,
        external_request_context=context,
        user_identity=user_identity,
        is_streaming=True
    )
    
    return f"Task submitted: {task_id}"
```

---

### task_context.py
**Purpose:** Provides thread-safe storage for mapping A2A task IDs to their original external request context
**Import:** `from solace_agent_mesh.gateway.base.task_context import TaskContextManager`

**Classes:**
- `TaskContextManager()` - Thread-safe context storage manager
  - `store_context(self, task_id: str, context_data: Dict[str, Any]) -> None` - Store context for a task ID
  - `get_context(self, task_id: str) -> Optional[Dict[str, Any]]` - Retrieve context for a task ID
  - `remove_context(self, task_id: str) -> Optional[Dict[str, Any]]` - Remove and return context for a task ID
  - `clear_all_contexts_for_testing(self) -> None` - Clear all contexts (testing only)

**Usage Examples:**
```python
from solace_agent_mesh.gateway.base.task_context import TaskContextManager

# Initialize manager
context_manager = TaskContextManager()

# Store context when submitting task
task_id = "gdk-task-abc123"
context = {
    "platform": "slack",
    "channel_id": "C1234567890",
    "thread_ts": "1234567890.123456",
    "user_identity": {"id": "user123", "name": "John Doe"}
}
context_manager.store_context(task_id, context)

# Retrieve context when processing response
retrieved_context = context_manager.get_context(task_id)
if retrieved_context:
    channel_id = retrieved_context["channel_id"]
    # Send response back to Slack channel

# Clean up when task is complete
context_manager.remove_context(task_id)
```
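
To show what "thread-safe" means for this kind of mapping, here is a minimal lock-guarded sketch exercised from several threads. `SketchTaskContextStore` is a hypothetical re-implementation for illustration only; the real `TaskContextManager` may use a different locking strategy.

```python
import threading
from typing import Any, Dict, Optional


class SketchTaskContextStore:
    """Hypothetical lock-guarded mapping; not the library's implementation."""

    def __init__(self) -> None:
        self._contexts: Dict[str, Dict[str, Any]] = {}
        self._lock = threading.Lock()

    def store_context(self, task_id: str, context_data: Dict[str, Any]) -> None:
        with self._lock:
            self._contexts[task_id] = context_data

    def get_context(self, task_id: str) -> Optional[Dict[str, Any]]:
        with self._lock:
            return self._contexts.get(task_id)

    def remove_context(self, task_id: str) -> Optional[Dict[str, Any]]:
        # pop() returns the stored context, or None when the task is unknown.
        with self._lock:
            return self._contexts.pop(task_id, None)


store = SketchTaskContextStore()
threads = [
    threading.Thread(target=store.store_context, args=(f"task-{i}", {"n": i}))
    for i in range(8)
]
for thread in threads:
    thread.start()
for thread in threads:
    thread.join()
```

Holding one lock around every read and write keeps the mapping consistent when the platform listener and the broker callback run on different threads.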

# content_hash: 427a78e5f29b1bf3caf01b3388c0cc4925fdbd3d519920aade46598c9062e02a

================================================================================

## Section 23: solace_agent_mesh/gateway/gateway_llm.txt

**Source file:** `solace_agent_mesh/gateway/gateway_llm.txt`

# DEVELOPER GUIDE: gateway

## Quick Summary
The `gateway` directory provides a comprehensive framework for building gateway implementations that bridge external platforms with the Solace AI Connector's A2A (Agent-to-Agent) messaging system. The architecture consists of a foundational base framework and three specialized gateway implementations: HTTP/SSE for web interfaces, Slack for team collaboration, and Webhook for external system integration. All gateways share common patterns for authentication, message translation, and real-time communication while providing platform-specific features.

## Files and Subdirectories Overview
- **Direct files:**
  - `__init__.py`: Marks the directory as a Python package.
- **Subdirectories:**
  - `base/`: Foundational classes and utilities for building all gateway implementations.
  - `http_sse/`: A complete HTTP/SSE gateway with a FastAPI web server for real-time web UI backends.
  - `slack/`: A gateway for integrating with the Slack collaboration platform.
  - `webhook/`: A universal webhook gateway for receiving HTTP requests from external systems.

## Developer API Reference

### Direct Files

#### __init__.py
**Purpose:** Initializes the `gateway` module, making it a Python package.
**Import:** `from solace_agent_mesh.gateway import ...`

**Classes/Functions/Constants:**
This file is empty and contains no direct exports.

### Subdirectory APIs

#### base/
**Purpose:** Provides the foundational, abstract classes for building all Gateway implementations. It establishes a framework for configuration, A2A message handling, and managing the lifecycle of requests from external platforms.
**Key Exports:** `BaseGatewayApp`, `BaseGatewayComponent`, `TaskContextManager`
**Import Examples:**
```python
from solace_agent_mesh.gateway.base.app import BaseGatewayApp
from solace_agent_mesh.gateway.base.component import BaseGatewayComponent
from solace_agent_mesh.gateway.base.task_context import TaskContextManager
```

#### http_sse/
**Purpose:** Implements a complete HTTP/SSE gateway to serve a web-based user interface, bridging web protocols with the backend A2A messaging fabric.
**Key Exports:** `WebUIBackendApp`, `WebUIBackendComponent`, `SSEManager`, `SessionManager`, and various dependency injectors.
**Import Examples:**
```python
from solace_agent_mesh.gateway.http_sse.app import WebUIBackendApp
from solace_agent_mesh.gateway.http_sse.component import WebUIBackendComponent
from solace_agent_mesh.gateway.http_sse.sse_manager import SSEManager
from solace_agent_mesh.gateway.http_sse.session_manager import SessionManager
from solace_agent_mesh.gateway.http_sse.dependencies import get_agent_service, get_task_service, get_user_id
```

#### slack/
**Purpose:** Provides a gateway for integrating the Solace AI Connector with the Slack collaboration platform, enabling bot interactions within Slack channels and threads.
**Key Exports:** `SlackGatewayApp`, `SlackGatewayComponent`, and various utility functions.
**Import Examples:**
```python
from solace_agent_mesh.gateway.slack.app import SlackGatewayApp
from solace_agent_mesh.gateway.slack.component import SlackGatewayComponent
from solace_agent_mesh.gateway.slack.utils import generate_a2a_session_id, send_slack_message, correct_slack_markdown
```

#### webhook/
**Purpose:** Provides a universal webhook gateway for receiving HTTP requests from external systems and triggering A2A tasks. It is highly configurable for different authentication methods, payload formats, and target agents.
**Key Exports:** `WebhookGatewayApp`, `WebhookGatewayComponent`
**Import Examples:**
```python
from solace_agent_mesh.gateway.webhook.app import WebhookGatewayApp
from solace_agent_mesh.gateway.webhook.component import WebhookGatewayComponent
from solace_agent_mesh.gateway.webhook.dependencies import get_sac_component
```

## Complete Usage Guide

### 1. Creating a Custom Gateway

This example shows how to use the `base` module to build a new gateway for a hypothetical external platform.

```python
# my_gateway/app.py
from typing import Type, List, Dict, Any
from solace_agent_mesh.gateway.base.app import BaseGatewayApp
from .component import MyGatewayComponent

class MyGatewayApp(BaseGatewayApp):
    """Defines the application and its configuration for My Platform."""
    
    SPECIFIC_APP_SCHEMA_PARAMS: List[Dict[str, Any]] = [
        {
            "name": "my_platform_api_key",
            "required": True,
            "type": "string",
            "description": "API key for connecting to My Platform."
        },
        {
            "name": "my_platform_webhook_url",
            "required": False,
            "type": "string",
            "description": "Webhook URL for receiving events from My Platform."
        }
    ]

    def _get_gateway_component_class(self) -> Type[MyGatewayComponent]:
        return MyGatewayComponent

# my_gateway/component.py
from typing import Any, Dict, List, Optional, Tuple, Union
from solace_agent_mesh.gateway.base.component import BaseGatewayComponent
from a2a.types import Part as A2APart, TextPart, Task, TaskStatusUpdateEvent, TaskArtifactUpdateEvent, JSONRPCError

class MyGatewayComponent(BaseGatewayComponent):
    
    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        self.api_key = self.get_config("my_platform_api_key")
        self.webhook_url = self.get_config("my_platform_webhook_url")
        self.server = None
    
    async def _extract_initial_claims(self, external_event_data: Any) -> Optional[Dict[str, Any]]:
        """Extract user identity from platform-specific event."""
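        # The webhook handler below passes a dict, so read the user ID as a key.
        if isinstance(external_event_data, dict) and external_event_data.get("user_id"):
            return {"id": external_event_data["user_id"], "source": "my_platform"}
        return None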
    
    def _start_listener(self) -> None:
        """Start your platform listener (web server, etc.)."""
        # Example: Start a web server to receive webhooks
        from fastapi import FastAPI
        import uvicorn
        
        app = FastAPI()
        
        @app.post("/webhook")
        async def handle_webhook(request_data: dict):
            await self.handle_external_event(request_data)
            return {"status": "ok"}
        
        self.server = uvicorn.Server(
            uvicorn.Config(app, host="0.0.0.0", port=8080)
        )
        # Start server in background thread
        import threading
        self.server_thread = threading.Thread(target=self.server.run)
        self.server_thread.start()
    
    def _stop_listener(self) -> None:
        """Stop your platform listener."""
        if self.server:
            self.server.should_exit = True
    
    def _translate_external_input(self, external_event: Any) -> Tuple[str, List[A2APart], Dict[str, Any]]:
        """Convert external event to A2A format."""
        target_agent = external_event.get("target_agent", "default-agent")
        message_text = external_event.get("message", "Hello")
        
        a2a_parts = [TextPart(text=message_text)]
        
        context = {
            "platform": "my_platform",
            "user_id_for_artifacts": external_event.get("user_id"),
            "a2a_session_id": f"my-platform-{external_event.get('session_id', 'default')}"
        }
        
        return target_agent, a2a_parts, context
    
    async def _send_update_to_external(self, external_request_context: Dict[str, Any], 
                                     event_data: Union[TaskStatusUpdateEvent, TaskArtifactUpdateEvent], 
                                     is_final_chunk_of_update: bool) -> None:
        """Send streaming update back to external platform."""
        # Extract text from event and send to your platform
        if hasattr(event_data, 'text_delta'):
            await self.send_to_platform(
                user_id=external_request_context["user_id_for_artifacts"],
                message=event_data.text_delta,
                is_partial=not is_final_chunk_of_update
            )
    
    async def _send_final_response_to_external(self, external_request_context: Dict[str, Any], 
                                             task_data: Task) -> None:
        """Send final response back to external platform."""
        final_text = ""
        for part in task_data.result.parts:
            if hasattr(part, 'text'):
                final_text += part.text
        
        await self.send_to_platform(
            user_id=external_request_context["user_id_for_artifacts"],
            message=final_text,
            is_final=True
        )
    
    async def _send_error_to_external(self, external_request_context: Dict[str, Any], 
                                     error_data: JSONRPCError) -> None:
        """Send error back to external platform."""
        await self.send_to_platform(
            user_id=external_request_context["user_id_for_artifacts"],
            message=f"Error: {error_data.message}",
            is_error=True
        )
    
    async def send_to_platform(self, user_id: str, message: str, **kwargs):
        """Helper method to send messages to your platform."""
        # Implement your platform-specific sending logic here
        print(f"Sending to {user_id}: {message}")

# Usage
if __name__ == "__main__":
    app_config = {
        "namespace": "myorg/prod",
        "gateway_id": "my-gateway-01",
        "my_platform_api_key": "secret-key-here",
        "my_platform_webhook_url": "https://myplatform.com/webhook"
    }
    
    gateway_app = MyGatewayApp(app_info=app_config)
    gateway_app.run()
```

### 2. Using the HTTP/SSE Gateway

```python
from solace_agent_mesh.gateway.http_sse.app import WebUIBackendApp

# Configuration for the web UI gateway
app_config = {
    "name": "my-webui-gateway",
    "session_secret_key": "your-secret-key-here",
    "fastapi_host": "0.0.0.0",
    "fastapi_port": 8000,
    "namespace": "myorg/prod",
    "gateway_id": "webui-gateway-01",
    "cors_allowed_origins": ["http://localhost:3000"],
    "frontend_welcome_message": "Welcome to my A2A system!",
    "frontend_bot_name": "My Assistant",
    "frontend_enable_file_upload": True,
    "frontend_enable_agent_selection": True
}

# Initialize and run the web UI gateway
webui_app = WebUIBackendApp(app_info=app_config)
webui_app.run()

# The gateway will start a FastAPI server with endpoints like:
# GET /api/agents - List available agents
# POST /api/tasks - Submit new tasks
# GET /api/tasks/{task_id}/sse - Stream task updates via SSE
# POST /api/artifacts - Upload files
```
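On the client side, an SSE stream is plain text: each event is a group of `event:` and `data:` lines terminated by a blank line. The following is a minimal, standard-library-only sketch of parsing such a stream. `parse_sse_lines` is a hypothetical helper, not part of the gateway, and the event names in the sample are illustrative.

```python
from typing import Dict, Iterable, Iterator, List


def parse_sse_lines(lines: Iterable[str]) -> Iterator[Dict[str, str]]:
    """Yield one dict per SSE event from an iterable of text lines.

    Hypothetical helper for illustration; it handles only the `event:` and
    `data:` fields and ignores comments, `id:` and `retry:`.
    """
    event_type = "message"  # SSE default when no `event:` field is sent
    data_lines: List[str] = []
    for raw in lines:
        line = raw.rstrip("\r\n")
        if line == "":
            # A blank line terminates the current event.
            if data_lines:
                yield {"event": event_type, "data": "\n".join(data_lines)}
            event_type, data_lines = "message", []
        elif line.startswith("event:"):
            event_type = line[len("event:"):].strip()
        elif line.startswith("data:"):
            value = line[len("data:"):]
            # A single leading space after the colon is not part of the value.
            data_lines.append(value[1:] if value.startswith(" ") else value)


# Illustrative stream; the event names are assumptions, not gateway constants.
sample_stream = [
    "event: status_update\n",
    'data: {"text": "working"}\n',
    "\n",
    "event: final_response\n",
    'data: {"text": "done"}\n',
    "\n",
]
events = list(parse_sse_lines(sample_stream))
```

In a real client the lines would come from a streaming HTTP response rather than a list.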

### 3. Creating Custom API Endpoints for HTTP/SSE Gateway

```python
from fastapi import APIRouter, Depends, Request
from solace_agent_mesh.gateway.http_sse.dependencies import (
    get_agent_registry,
    get_user_id,
    get_publish_a2a_func,
    get_sse_manager,
    get_session_manager
)

# Create a custom router
custom_router = APIRouter(prefix="/api/custom")

@custom_router.get("/my-agents")
async def get_my_agents(
    user_id: str = Depends(get_user_id),
    agent_registry = Depends(get_agent_registry)
):
    """Get agents filtered by user permissions."""
    all_agents = agent_registry.get_all_agents()
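    # Filter agents based on user permissions.
    # NOTE: `user_can_access_agent` is a placeholder for your own permission
    # check; define it in your application before using this endpoint.
    user_agents = [agent for agent in all_agents if user_can_access_agent(user_id, agent)]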
    return {"agents": user_agents, "user_id": user_id}

@custom_router.post("/broadcast-message")
async def broadcast_message(
    message: str,
    publish_func = Depends(get_publish_a2a_func),
    user_id: str = Depends(get_user_id)
):
    """Broadcast a message to all agents."""
    publish_func(
        topic="/myorg/prod/a2a/v1/broadcast",
        payload={
            "method": "broadcast/message",
            "params": {"message": message, "from_user": user_id}
        },
        user_properties={"clientId": user_id}
    )
    return {"status": "broadcasted", "message": message}

# Add the router to your FastAPI app
# This would typically be done in the main.py or during app setup
```

### 4. Using the Slack Gateway

```python
from solace_agent_mesh.gateway.slack.app import SlackGatewayApp

# Configuration for Slack gateway
slack_config = {
    "name": "my-slack-gateway",
    "namespace": "myorg/prod",
    "gateway_id": "slack-gateway-01",
    "slack_bot_token": "xoxb-your-bot-token",
    "slack_app_token": "xapp-your-app-token",
    "slack_signing_secret": "your-signing-secret",
    "default_agent_name": "assistant",
    "enable_socket_mode": True,
    "enable_file_upload": True
}

# Initialize and run the Slack gateway
slack_app = SlackGatewayApp(app_info=slack_config)
slack_app.run()

# The gateway will:
# - Connect to Slack via Socket Mode or HTTP
# - Listen for mentions and direct messages
# - Translate Slack messages to A2A format
# - Send responses back to Slack channels/threads
```

### 5. Using the Webhook Gateway

```python
from solace_agent_mesh.gateway.webhook.app import WebhookGatewayApp

# Configuration for webhook gateway
webhook_config = {
    "name": "my-webhook-gateway",
    "namespace": "myorg/prod",
    "gateway_id": "webhook-gateway-01",
    "webhook_host": "0.0.0.0",
    "webhook_port": 9000,
    "webhook_path": "/webhook",
    "target_agent_name": "data-processor",
    "auth_method": "header",
    "auth_header_name": "X-API-Key",
    "auth_header_value": "secret-webhook-key",
    "payload_format": "json",
    "user_id_extraction": {
        "method": "json_path",
        "path": "$.user.id"
    }
}

# Initialize and run the webhook gateway
webhook_app = WebhookGatewayApp(app_info=webhook_config)
webhook_app.run()

# The gateway will:
# - Start an HTTP server on the specified host/port
# - Authenticate incoming requests
# - Extract user identity from payloads
# - Convert webhook data to A2A messages
# - Send to the specified target agent
```
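The authentication and identity-extraction steps listed above can be illustrated without the gateway. This sketch only assumes the header-based settings from the configuration above; `is_authorized` and `extract_by_path` are hypothetical helpers, and the path resolver supports only simple dotted paths such as `$.user.id`, not full JSONPath.

```python
import hmac
from typing import Any, Dict, Optional


def is_authorized(headers: Dict[str, str], header_name: str, expected: str) -> bool:
    """Compare the supplied header value with the expected secret in constant time."""
    lowered = {k.lower(): v for k, v in headers.items()}  # header names are case-insensitive
    supplied = lowered.get(header_name.lower(), "")
    return hmac.compare_digest(supplied.encode(), expected.encode())


def extract_by_path(payload: Dict[str, Any], path: str) -> Optional[Any]:
    """Resolve a simple dotted path like `$.user.id` against a decoded JSON payload."""
    current: Any = payload
    for key in path.lstrip("$").strip(".").split("."):
        if not isinstance(current, dict) or key not in current:
            return None
        current = current[key]
    return current


# Simulated incoming request (values are illustrative).
request_headers = {"x-api-key": "secret-webhook-key"}
request_body = {"user": {"id": "u-123"}, "message": "process this"}

authorized = is_authorized(request_headers, "X-API-Key", "secret-webhook-key")
user_id = extract_by_path(request_body, "$.user.id")
```

Using `hmac.compare_digest` rather than `==` avoids leaking how many leading characters of the secret matched.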

### 6. Working with Task Context Management

```python
from solace_agent_mesh.gateway.base.task_context import TaskContextManager

# Initialize the task context manager (usually done by BaseGat
```

# content_hash: 8ff3c690b39ed85edd13c1e9f2965e9da9050eeeea66823dbbad322f600f0d09

================================================================================

## Section 24: solace_agent_mesh/gateway/http_sse/components/components_llm.txt

**Source file:** `solace_agent_mesh/gateway/http_sse/components/components_llm.txt`

# DEVELOPER GUIDE: components

## Quick Summary
This directory contains components for the HTTP SSE (Server-Sent Events) gateway, designed to work within the Solace AI Connector (SAC) framework. The primary component forwards messages received from the Solace broker to an internal queue, enabling real-time visualization in a web-based user interface.

## Files Overview
- `__init__.py` - Makes the `VisualizationForwarderComponent` class directly importable from the `components` package
- `visualization_forwarder_component.py` - Defines a SAC component that forwards messages from a broker input to a Python `queue.Queue` for visualization

## Developer API Reference

### __init__.py
**Purpose:** Exposes the public components of this directory for easy importing
**Import:** `from solace_agent_mesh.gateway.http_sse.components import VisualizationForwarderComponent`

**Exports:**
- `VisualizationForwarderComponent` - The main component class for forwarding messages to a visualization queue

### visualization_forwarder_component.py
**Purpose:** A Solace AI Connector (SAC) component that listens for messages from a `BrokerInput` and forwards them to a specified Python `queue.Queue` for real-time visualization
**Import:** `from solace_agent_mesh.gateway.http_sse.components.visualization_forwarder_component import VisualizationForwarderComponent`

**Classes:**
- `VisualizationForwarderComponent(**kwargs: Any)` - A component that forwards messages to a target queue, initialized with configuration parameters including `target_queue_ref`
  - `invoke(message: SolaceMessage, data: Dict[str, Any]) -> None` - Core method called by SAC framework for each incoming message; formats data and places it onto the target queue
  - `target_queue: queue.Queue` - The queue instance where messages are forwarded

**Constants/Variables:**
- `info: Dict` - Metadata dictionary required by SAC framework describing component configuration, input schema, and purpose

**Usage Examples:**
```python
import queue
from solace_agent_mesh.gateway.http_sse.components import VisualizationForwarderComponent
from solace_ai_connector.common.message import Message as SolaceMessage

# 1. Create a target queue that will receive the forwarded messages
visualization_queue = queue.Queue()

# 2. Instantiate the component with target queue reference
forwarder = VisualizationForwarderComponent(
    name="my_forwarder",
    target_queue_ref=visualization_queue
)

# 3. The invoke method is called automatically by SAC framework
# when messages arrive from connected BrokerInput component

# 4. Consume forwarded messages from the queue
if not visualization_queue.empty():
    forwarded_data = visualization_queue.get()
    print(f"Topic: {forwarded_data['topic']}")
    print(f"Payload: {forwarded_data['payload']}")
    print(f"User Properties: {forwarded_data['user_properties']}")
    
# Expected structure of forwarded_data:
# {
#     "topic": "some/broker/topic",
#     "payload": {"key": "value"},
#     "user_properties": {"prop1": "value1"},
#     "_original_broker_message": <SolaceMessage object>
# }
```
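Checking `empty()` works for a one-off poll, but a long-running consumer usually blocks on `get()` with a timeout instead. A minimal sketch using only the standard library follows; the loop that fills the queue merely simulates forwarded messages in the shape documented above, and `drain` is a hypothetical helper.

```python
import queue
from typing import Any, Dict, List


def drain(q: "queue.Queue[Dict[str, Any]]", timeout: float = 0.1) -> List[Dict[str, Any]]:
    """Collect messages until the queue stays empty for `timeout` seconds."""
    collected: List[Dict[str, Any]] = []
    while True:
        try:
            collected.append(q.get(timeout=timeout))
        except queue.Empty:
            return collected


# Simulated forwarded messages (illustrative topics and payloads).
visualization_queue: "queue.Queue[Dict[str, Any]]" = queue.Queue()
for i in range(3):
    visualization_queue.put(
        {"topic": f"demo/topic/{i}", "payload": {"n": i}, "user_properties": {}}
    )

messages = drain(visualization_queue)
topics = [m["topic"] for m in messages]
```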

# content_hash: cee7f7b4ea7e87ab3f94f7e24d463f22fa045e50d54991c61b84fe95e8a7f77d

================================================================================

## Section 25: solace_agent_mesh/gateway/http_sse/http_sse_llm.txt

**Source file:** `solace_agent_mesh/gateway/http_sse/http_sse_llm.txt`

# DEVELOPER GUIDE for the http_sse directory

## Quick Summary
The `http_sse` directory implements a complete HTTP/SSE (Server-Sent Events) gateway for the A2A (Agent-to-Agent) system. Its primary purpose is to serve a web-based user interface and act as a bridge between standard web protocols (HTTP and SSE) and the backend A2A messaging fabric.

The architecture is centered around the `WebUIBackendComponent`, a custom Solace AI Connector (SAC) component that hosts an embedded FastAPI web server. This component manages shared state and resources, such as the `SSEManager` for real-time updates, the `SessionManager` for user sessions, and the `AgentRegistry` for discovering available agents.

Subdirectories organize the functionality:
- `routers/` defines the REST API endpoints (e.g., `/tasks`, `/agents`).
- `services/` contains the business logic that the API endpoints call.
- `dependencies.py` uses FastAPI's dependency injection system to provide the routers and services with safe access to the shared resources managed by the main component.
- `components/` contains specialized SAC components, for example, to forward A2A messages for real-time visualization.

This design creates a clean separation of concerns, where the web layer (FastAPI) is decoupled from the core messaging and state management layer (SAC Component).

## Files and Subdirectories Overview
- **Direct files:**
  - `__init__.py`: Standard Python package initializer.
  - `app.py`: Defines the main SAC `WebUIBackendApp`, which specifies configuration and launches the component.
  - `component.py`: The core SAC component that hosts the FastAPI server and manages all shared resources and A2A logic.
  - `dependencies.py`: Provides FastAPI dependency injectors for accessing shared resources like services and managers.
  - `main.py`: The main FastAPI application instance, including middleware, router mounting, and exception handling.
  - `session_manager.py`: Manages web user sessions and maps them to unique A2A client and session IDs.
  - `sse_manager.py`: Manages Server-Sent Event (SSE) connections for streaming real-time updates to clients.
- **Subdirectories:**
  - `components/`: Contains specialized SAC components, such as for forwarding messages to the visualization system.
  - `routers/`: Defines the FastAPI `APIRouter` modules for all REST API endpoints.
  - `services/`: Encapsulates business logic for agents, tasks, and other domain-specific operations.

## Developer API Reference

### Direct Files

#### app.py
**Purpose:** This file defines the `WebUIBackendApp`, a custom SAC (Solace AI Connector) App class. It is responsible for defining the configuration schema for the entire HTTP/SSE gateway and programmatically creating the `WebUIBackendComponent`.
**Import:** `from solace_agent_mesh.gateway.http_sse.app import WebUIBackendApp`

**Classes/Functions/Constants:**
- **`WebUIBackendApp(BaseGatewayApp)`**: The main application class. It extends `BaseGatewayApp` and adds a list of WebUI-specific configuration parameters to the application schema.
- **`SPECIFIC_APP_SCHEMA_PARAMS: List[Dict[str, Any]]`**: A constant list defining the configuration parameters specific to the HTTP/SSE gateway, such as `session_secret_key`, `fastapi_host`, `fastapi_port`, and various frontend-related settings.
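Schema parameter lists of this kind are lists of dictionaries; the custom gateway example for `BaseGatewayApp` uses `name`, `required`, `type`, and `description` keys. To illustrate how such a list can drive validation, here is a small sketch. It is not how SAC validates configuration, and `missing_required`, `SCHEMA_PARAMS`, and the descriptions are hypothetical.

```python
from typing import Any, Dict, List

# Hypothetical entries in the same shape as a SPECIFIC_APP_SCHEMA_PARAMS list.
SCHEMA_PARAMS: List[Dict[str, Any]] = [
    {"name": "session_secret_key", "required": True, "type": "string",
     "description": "Illustrative description only."},
    {"name": "fastapi_port", "required": False, "type": "integer",
     "description": "Illustrative description only."},
]


def missing_required(params: List[Dict[str, Any]], config: Dict[str, Any]) -> List[str]:
    """Return the names of required parameters that are absent from `config`."""
    return [p["name"] for p in params if p.get("required") and p["name"] not in config]


missing = missing_required(SCHEMA_PARAMS, {"fastapi_port": 8000})
```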

#### component.py
**Purpose:** This is the core component of the gateway. It hosts the FastAPI server, manages all shared state (like the SSE and Session managers), handles the lifecycle of the web server, and implements the logic for translating between external HTTP requests and internal A2A messages.
**Import:** `from solace_agent_mesh.gateway.http_sse.component import WebUIBackendComponent`

**Classes/Functions/Constants:**
- **`WebUIBackendComponent(BaseGatewayComponent)`**: The main component class. Developers will primarily interact with its instances via the dependency injection system.
  - **Public Accessor Methods (for Dependencies):**
    - `get_sse_manager() -> SSEManager`: Returns the shared `SSEManager` instance.
    - `get_session_manager() -> SessionManager`: Returns the shared `SessionManager` instance.
    - `get_agent_registry() -> AgentRegistry`: Returns the shared `AgentRegistry` instance.
    - `get_core_a2a_service() -> CoreA2AService`: Returns the core service for creating A2A messages.
    - `get_shared_artifact_service() -> Optional[BaseArtifactService]`: Returns the service for artifact storage.
    - `get_namespace() -> str`: Returns the configured A2A namespace.
    - `get_gateway_id() -> str`: Returns the unique ID of this gateway.
  - **Core Logic Methods:**
    - `publish_a2a(topic: str, payload: Dict, user_properties: Optional[Dict] = None)`: Publishes a message onto the A2A messaging fabric. This is the primary method for sending data to agents.
  - **Gateway-Development-Kit (GDK) Hooks:** These methods implement the `BaseGatewayComponent` abstract interface.
    - `_start_listener()`: Starts the FastAPI/Uvicorn server.
    - `_stop_listener()`: Stops the FastAPI/Uvicorn server.
    - `_translate_external_input(...)`: Translates an incoming HTTP request (e.g., form data with files) into a structured A2A message (`List[ContentPart]`).
    - `_send_update_to_external(...)`: Sends an intermediate status update from an agent back to the client via SSE.
    - `_send_final_response_to_external(...)`: Sends the final A2A Task result to the client via SSE.
    - `_send_error_to_external(...)`: Sends an error notification to the client via SSE.

#### dependencies.py
**Purpose:** Defines FastAPI dependency injectors to access shared resources managed by the WebUIBackendComponent.
**Import:** `from solace_agent_mesh.gateway.http_sse.dependencies import *`

**Functions:**
- `get_sac_component() -> WebUIBackendComponent`: Returns the main component instance.
- `get_agent_registry() -> AgentRegistry`: Returns the shared agent registry.
- `get_sse_manager() -> SSEManager`: Returns the SSE manager for real-time updates.
- `get_session_manager() -> SessionManager`: Returns the session manager.
- `get_user_id(request: Request) -> str`: Returns the current user's identity.
- `get_publish_a2a_func() -> PublishFunc`: Returns the function for publishing A2A messages.
- `get_core_a2a_service() -> CoreA2AService`: Returns the core A2A service.
- `get_shared_artifact_service() -> Optional[BaseArtifactService]`: Returns the artifact service.
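A common way to implement injectors like these is a module-level reference that is set once at startup and read by each accessor. The sketch below shows that pattern in plain Python. It is an assumption about the general approach, not this module's actual code, and `FakeComponent`, `set_component`, `get_component`, and `get_namespace_dep` are hypothetical names.

```python
from typing import Optional


class FakeComponent:
    """Hypothetical stand-in for the gateway component."""

    def get_namespace(self) -> str:
        return "myorg/prod"


_component: Optional[FakeComponent] = None  # set once during application startup


def set_component(component: FakeComponent) -> None:
    """Store the component reference that later accessors will read."""
    global _component
    _component = component


def get_component() -> FakeComponent:
    """Return the stored component, failing loudly if startup has not run."""
    if _component is None:
        raise RuntimeError("Component has not been initialized")
    return _component


def get_namespace_dep() -> str:
    """Example accessor built on top of the component reference."""
    return get_component().get_namespace()


# Before startup the accessor raises; after startup it returns the value.
try:
    get_component()
    failed_before_setup = False
except RuntimeError:
    failed_before_setup = True

set_component(FakeComponent())
namespace = get_namespace_dep()
```

With FastAPI, accessors written this way can be passed to `Depends(...)` so route handlers never import the component directly.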

#### main.py
**Purpose:** Defines the FastAPI application instance, mounts routers, and configures middleware.
**Import:** `from solace_agent_mesh.gateway.http_sse.main import app, setup_dependencies`

**Classes/Functions/Constants:**
- **`app: FastAPI`**: The main FastAPI application instance.
- **`setup_dependencies(component: WebUIBackendComponent)`**: Configures middleware, routers, and dependency injection based on the component instance.

#### session_manager.py
**Purpose:** Manages web user sessions and mapping to A2A Client IDs.
**Import:** `from solace_agent_mesh.gateway.http_sse.session_manager import SessionManager`

**Classes/Functions/Constants:**
- **`SessionManager(secret_key: str, app_config: Dict[str, Any])`**: Manages user sessions and A2A identity mapping.
  - `get_a2a_client_id(request: Request) -> str`: Returns the A2A client ID for the current request.
  - `start_new_a2a_session(request: Request) -> str`: Creates a new A2A session ID.
  - `ensure_a2a_session(request: Request) -> str`: Ensures a session ID exists, creating one if necessary.
  - `store_auth_tokens(request: Request, access_token: str, refresh_token: Optional[str])`: Stores authentication tokens.
  - `get_access_token(request: Request) -> Optional[str]`: Retrieves stored access token.
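As described above, `start_new_a2a_session` creates a new session ID, while `ensure_a2a_session` creates one only if necessary. A minimal sketch of that difference follows, with a plain dict standing in for the request's session storage. The key name and ID format are assumptions, and both functions are hypothetical.

```python
import uuid
from typing import Dict

SESSION_KEY = "a2a_session_id"  # assumed key name, for illustration only


def start_new_session(session: Dict[str, str]) -> str:
    """Always generate a fresh session ID and store it."""
    session[SESSION_KEY] = f"web-session-{uuid.uuid4().hex}"
    return session[SESSION_KEY]


def ensure_session(session: Dict[str, str]) -> str:
    """Return the existing session ID, creating one only if none is stored."""
    if SESSION_KEY not in session:
        return start_new_session(session)
    return session[SESSION_KEY]


session_store: Dict[str, str] = {}
first = ensure_session(session_store)     # creates an ID
again = ensure_session(session_store)     # reuses the same ID
fresh = start_new_session(session_store)  # replaces it
```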

#### sse_manager.py
**Purpose:** Manages Server-Sent Event (SSE) connections for streaming task updates.
**Import:** `from solace_agent_mesh.gateway.http_sse.sse_manager import SSEManager`

**Classes/Functions/Constants:**
- **`SSEManager(max_queue_size: int = 200)`**: Manages active SSE connections and distributes events.
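  - `create_sse_connection(task_id: str) -> asyncio.Queue`: Creates a new SSE connection queue for a task. It is awaited in the usage examples, so call it with `await`.
  - `remove_sse_connection(task_id: str, connection_queue: asyncio.Queue)`: Removes a single connection queue for a task. It is also awaited in the usage examples.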
  - `send_event(task_id: str, event_data: Dict[str, Any], event_type: str)`: Sends an event to all connections for a task.
  - `close_all_for_task(task_id: str)`: Closes all SSE connections for a specific task.
  - `close_all()`: Closes all active SSE connections.
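The fan-out behavior described above can be sketched with one `asyncio.Queue` per connection, grouped by task ID. This is a simplified illustration, not the real `SSEManager`: the class name `MiniSSEManager` is hypothetical, it does not handle full queues or locking, and the `None` close signal follows the convention used in the SSE endpoint example in this guide.

```python
import asyncio
from collections import defaultdict
from typing import Any, Dict, List, Optional


class MiniSSEManager:
    """Hypothetical, simplified per-task fan-out of events to connection queues."""

    def __init__(self, max_queue_size: int = 200) -> None:
        self._max_queue_size = max_queue_size
        self._connections: Dict[str, List[asyncio.Queue]] = defaultdict(list)

    async def create_sse_connection(self, task_id: str) -> asyncio.Queue:
        q: asyncio.Queue = asyncio.Queue(maxsize=self._max_queue_size)
        self._connections[task_id].append(q)
        return q

    async def send_event(self, task_id: str, event_data: Dict[str, Any], event_type: str) -> None:
        # Deliver the same event to every connection registered for the task.
        for q in self._connections.get(task_id, []):
            await q.put({"event": event_type, "data": event_data})

    async def close_all_for_task(self, task_id: str) -> None:
        # None tells each consumer to stop reading.
        for q in self._connections.pop(task_id, []):
            await q.put(None)


async def demo() -> List[Optional[Dict[str, Any]]]:
    manager = MiniSSEManager()
    q1 = await manager.create_sse_connection("task-1")
    q2 = await manager.create_sse_connection("task-1")
    await manager.send_event("task-1", {"text": "hello"}, "status_update")
    await manager.close_all_for_task("task-1")
    return [await q1.get(), await q1.get(), await q2.get(), await q2.get()]


received = asyncio.run(demo())
```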

### Subdirectory APIs

#### components/
**Purpose:** Contains specialized SAC components for message forwarding and visualization.
**Key Exports:** `VisualizationForwarderComponent`
**Import Examples:**
```python
from solace_agent_mesh.gateway.http_sse.components import VisualizationForwarderComponent
```

#### routers/
**Purpose:** Defines FastAPI `APIRouter` modules for all REST API endpoints.
**Key Exports:** Router instances for agents, tasks, SSE, artifacts, auth, config, sessions, people, users, visualization
**Import Examples:**
```python
from solace_agent_mesh.gateway.http_sse.routers import agents, tasks, sse, artifacts
from solace_agent_mesh.gateway.http_sse.routers.tasks import CancelTaskApiPayload
```

#### services/
**Purpose:** Encapsulates business logic for domain-specific operations.
**Key Exports:** `AgentService`, `TaskService`, `PeopleService`
**Import Examples:**
```python
from solace_agent_mesh.gateway.http_sse.services.agent_service import AgentService
from solace_agent_mesh.gateway.http_sse.services.task_service import TaskService
from solace_agent_mesh.gateway.http_sse.services.people_service import PeopleService
```

## Complete Usage Guide

### 1. Setting Up the Gateway Application

```python
from solace_agent_mesh.gateway.http_sse.app import WebUIBackendApp

# Create the gateway app with configuration
app_config = {
    "name": "my-webui-gateway",
    "session_secret_key": "your-secret-key-here",
    "fastapi_host": "0.0.0.0",
    "fastapi_port": 8000,
    "namespace": "/my-namespace",
    "gateway_id": "webui-gateway-01",
    "cors_allowed_origins": ["http://localhost:3000"],
    "frontend_welcome_message": "Welcome to my A2A system!",
    "frontend_bot_name": "My Assistant",
    # ... other configuration parameters
}

# Initialize the app
webui_app = WebUIBackendApp(app_info=app_config)

# Run the app (this starts the FastAPI server)
webui_app.run()
```

### 2. Using Dependencies in Custom Routers

```python
from fastapi import APIRouter, Depends
from solace_agent_mesh.gateway.http_sse.dependencies import (
    get_agent_registry,
    get_user_id,
    get_publish_a2a_func,
    get_core_a2a_service
)
from solace_agent_mesh.common.agent_registry import AgentRegistry

router = APIRouter()

@router.get("/my-custom-endpoint")
async def my_custom_endpoint(
    user_id: str = Depends(get_user_id),
    agent_registry: AgentRegistry = Depends(get_agent_registry),
    publish_func = Depends(get_publish_a2a_func)
):
    # Access discovered agents
    agents = agent_registry.get_all_agents()
    
    # Publish a message to the A2A fabric
    publish_func(
        topic="/my-namespace/a2a/v1/agent/request/some-agent",
        payload={"method": "custom/request", "params": {"user": user_id}},
        user_properties={"clientId": user_id}
    )
    
    return {"agents_count": len(agents), "user_id": user_id}
```

### 3. Managing SSE Connections

```python
from fastapi import APIRouter, Depends
from fastapi.responses import StreamingResponse
from solace_agent_mesh.gateway.http_sse.dependencies import get_sse_manager
from solace_agent_mesh.gateway.http_sse.sse_manager import SSEManager
import asyncio
import json

router = APIRouter()

@router.get("/my-sse-endpoint/{task_id}")
async def my_sse_endpoint(
    task_id: str,
    sse_manager: SSEManager = Depends(get_sse_manager)
):
    # Create SSE connection for the task
    connection_queue = await sse_manager.create_sse_connection(task_id)
    
    async def event_generator():
        try:
            while True:
                # Wait for events from the queue
                event = await connection_queue.get()
                if event is None:  # Close signal
                    break
                
                # Format as SSE
                yield f"event: {event['event']}\n"
                yield f"data: {event['data']}\n\n"
                
        except asyncio.CancelledError:
            pass
        finally:
            # Clean up connection
            await sse_manager.remove_sse_connection(task_id, connection_queue)
    
    return StreamingResponse(
        event_generator(),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "Connection": "keep-alive",
        }
    )

# Send events to SSE connections
async def send_custom_event(sse_manager: SSEManager, task_id: str):
    await sse_manager.send_event(
        task_id=task_id,
        event_data={"message": "Custom event", "timestamp": "2024-01-01T00:00:00Z"},
        event_type="custom_event"
    )
```
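If event payloads are dictionaries, they need to be serialized before being written to the stream. The helper below is a hedged sketch of formatting one SSE frame with `json.dumps`. `format_sse` is hypothetical, and whether the real `SSEManager` already serializes `data` is not stated in this guide.

```python
import json
from typing import Any, Dict


def format_sse(event_type: str, data: Dict[str, Any]) -> str:
    """Build a single SSE frame: an `event:` line, a `data:` line, and a blank line."""
    # json.dumps emits no raw newlines by default, so one data line suffices.
    return f"event: {event_type}\ndata: {json.dumps(data)}\n\n"


frame = format_sse("custom_event", {"message": "Custom event"})
```

A generator passed to `StreamingResponse` could yield `format_sse(...)` for each queued event instead of building the two lines by hand.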

### 4. Working with Sessions and User Identity

```python
from fastapi import APIRouter, Depends, Request
from solace_agent_mesh.gateway.http_sse.dependencies import (
    get_session_manager,
    get_user_id,
    ensure_session_id
)
from solace_agent_mesh.gateway.http_sse.session_manager import SessionManager

router = APIRouter()

@router.post("/start-conversation")
async def start_conversation(
    request: Request,
    user_id: str = Depends(get_user_id),
    session_id: str = Depends(ensure_session_id),
    session_manager: SessionManager = Depends(get_session_manager)
):
    # Start a new A2A session for this conversation
    new_session_id = session_manager.start_new_a2a_session(request)
    
    return {
        "user_id": user_id,
        "old_session_id": session_id,
        "new_session_id": new_session_id,
        "message": "New conversation started"
    }
```

### 5. Using Services for Business Logic

```python
from fastapi import APIRouter, Depends
from solace_agent_mesh.gateway.http_sse.dependencies import (
    get_agent_service,
    get_task_service,
    get_people_service
)
from solace_agent_mesh.gateway.http_sse.services.agent_service import AgentService
```

# content_hash: 765fb33c6a30298c67ac7c1525df5787791b2b37e0d69b046e405aec85287feb

================================================================================

## Section 26: solace_agent_mesh/gateway/http_sse/routers/routers_llm.txt

**Source file:** `solace_agent_mesh/gateway/http_sse/routers/routers_llm.txt`

# DEVELOPER GUIDE for the routers directory

## Quick Summary
The `routers` directory contains FastAPI `APIRouter` modules that define the REST API endpoints for the HTTP SSE Gateway. Each file groups endpoints by a specific domain of functionality, such as agent discovery, artifact management, user authentication, task submission, and real-time event streaming. These routers are the primary interface for frontend applications and other clients to interact with the gateway.

## Files Overview
- `__init__.py`: Marks the directory as a Python package.
- `agents.py`: API endpoints for discovering available A2A agents.
- `artifacts.py`: REST endpoints for managing session-specific artifacts (upload, download, list, delete).
- `auth.py`: Endpoints for handling the user authentication flow (login, callback, refresh, CSRF).
- `config.py`: API endpoint for providing configuration settings to the frontend application.
- `people.py`: API endpoints for user search functionality, typically for autocomplete features.
- `sessions.py`: API endpoints for managing user sessions (creating new sessions, getting current session info).
- `sse.py`: The Server-Sent Events (SSE) endpoint for streaming real-time task updates to the client.
- `tasks.py`: API endpoints for submitting tasks to agents and managing their lifecycle (e.g., cancellation).
- `users.py`: API endpoint for retrieving information about the currently authenticated user.
- `visualization.py`: API endpoints for managing A2A message visualization streams for monitoring and debugging.

## Developer API Reference

### agents.py
**Purpose:** Provides REST endpoints for agent discovery.
**Import:** `from solace_agent_mesh.gateway.http_sse.routers.agents import router`

**Functions:**
- `get_discovered_agents() -> List[AgentCard]` - Retrieves a list of all currently discovered and available A2A agents.

**Usage Examples:**
```python
# To include this router in a FastAPI application
from fastapi import FastAPI
from solace_agent_mesh.gateway.http_sse.routers.agents import router

app = FastAPI()
app.include_router(router, prefix="/api/v1")

# A client would make a GET request to /api/v1/agents
```

### artifacts.py
**Purpose:** Manages session-specific artifacts via REST endpoints (upload, download, list, delete).
**Import:** `from solace_agent_mesh.gateway.http_sse.routers.artifacts import router`

**Functions:**
- `list_artifact_versions(filename: str) -> List[int]` - Lists available version numbers for a specific artifact.
- `list_artifacts() -> List[ArtifactInfo]` - Retrieves detailed information for all artifacts in the current session.
- `get_latest_artifact(filename: str) -> StreamingResponse` - Downloads the latest version of an artifact.
- `get_specific_artifact_version(filename: str, version: Union[int, str]) -> StreamingResponse` - Downloads a specific version of an artifact.
- `get_artifact_by_uri(uri: str) -> StreamingResponse` - Downloads an artifact using its formal artifact:// URI.
- `upload_artifact(filename: str, upload_file: UploadFile, metadata_json: Optional[str]) -> Dict[str, Any]` - Uploads a new artifact version with optional metadata.
- `delete_artifact(filename: str) -> Response` - Deletes an artifact and all its versions.

**Usage Examples:**
```python
# Include in FastAPI app
from fastapi import FastAPI
from solace_agent_mesh.gateway.http_sse.routers.artifacts import router

app = FastAPI()
app.include_router(router, prefix="/api/v1/artifacts")

# Client usage examples:
# GET /api/v1/artifacts/ - List all artifacts
# GET /api/v1/artifacts/myfile.txt - Download latest version
# GET /api/v1/artifacts/myfile.txt/versions/1 - Download specific version
# POST /api/v1/artifacts/myfile.txt - Upload new version
# DELETE /api/v1/artifacts/myfile.txt - Delete artifact
```
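
The client routes listed above can also be assembled in code. The following is a minimal sketch of a hypothetical client-side helper (it is not part of `solace_agent_mesh`); the host, port, and `/api/v1/artifacts` prefix are assumptions taken from the mounting example above. It percent-encodes the filename so that names containing spaces or slashes cannot change the path.

```python
from typing import Optional, Union
from urllib.parse import quote

# Assumed values: adjust to match how the router is mounted in your app.
BASE_URL = "http://localhost:8080"
ARTIFACTS_PREFIX = "/api/v1/artifacts"


def artifact_url(filename: str, version: Optional[Union[int, str]] = None) -> str:
    """Build the URL for the latest or a specific version of an artifact."""
    # Percent-encode the filename so spaces and slashes cannot alter the path.
    path = f"{ARTIFACTS_PREFIX}/{quote(filename, safe='')}"
    if version is not None:
        path += f"/versions/{version}"
    return BASE_URL + path
```

Calling `artifact_url("myfile.txt")` targets the latest version, while passing a `version` argument targets the versioned route.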

### auth.py
**Purpose:** Handles user authentication flow including login, callback, and token refresh.
**Import:** `from solace_agent_mesh.gateway.http_sse.routers.auth import router`

**Functions:**
- `initiate_login(request: FastAPIRequest) -> RedirectResponse` - Initiates the login flow by redirecting to external auth service.
- `get_csrf_token(response: Response) -> Dict[str, str]` - Generates and returns a CSRF token.
- `auth_callback(request: FastAPIRequest) -> RedirectResponse` - Handles the callback from the OIDC provider.
- `refresh_token(request: FastAPIRequest) -> Dict[str, str]` - Refreshes an access token using the external auth service.

**Usage Examples:**
```python
# Include in FastAPI app
from fastapi import FastAPI
from solace_agent_mesh.gateway.http_sse.routers.auth import router

app = FastAPI()
app.include_router(router, prefix="/api/v1")

# Client usage:
# GET /api/v1/auth/login - Start login flow
# GET /api/v1/csrf-token - Get CSRF token
# GET /api/v1/auth/callback - OAuth callback endpoint
# POST /api/v1/auth/refresh - Refresh access token
```

### config.py
**Purpose:** Provides configuration settings needed by the frontend application.
**Import:** `from solace_agent_mesh.gateway.http_sse.routers.config import router`

**Functions:**
- `get_app_config() -> Dict[str, Any]` - Returns frontend configuration settings including auth URLs, welcome messages, and feature flags.

**Usage Examples:**
```python
# Include in FastAPI app
from fastapi import FastAPI
from solace_agent_mesh.gateway.http_sse.routers.config import router

app = FastAPI()
app.include_router(router, prefix="/api/v1")

# Client usage:
# GET /api/v1/config - Get frontend configuration
```

### people.py
**Purpose:** Provides user search functionality for autocomplete features.
**Import:** `from solace_agent_mesh.gateway.http_sse.routers.people import router`

**Functions:**
- `search_people(q: str, limit: int = 10) -> List[Dict[str, Any]]` - Searches for users to populate frontend autocomplete suggestions.

**Usage Examples:**
```python
# Include in FastAPI app
from fastapi import FastAPI
from solace_agent_mesh.gateway.http_sse.routers.people import router

app = FastAPI()
app.include_router(router, prefix="/api/v1")

# Client usage:
# GET /api/v1/people/search?q=john&limit=5 - Search for users
```

### sessions.py
**Purpose:** Manages user sessions including creation and retrieval of session information.
**Import:** `from solace_agent_mesh.gateway.http_sse.routers.sessions import router`

**Functions:**
- `create_new_session(request: FastAPIRequest) -> JSONRPCSuccessResponse` - Forces creation of a new A2A session.
- `get_current_session(request: FastAPIRequest) -> JSONRPCSuccessResponse` - Returns information about the current session.

**Usage Examples:**
```python
# Include in FastAPI app
from fastapi import FastAPI
from solace_agent_mesh.gateway.http_sse.routers.sessions import router

app = FastAPI()
app.include_router(router, prefix="/api/v1/sessions")

# Client usage:
# POST /api/v1/sessions/new - Create new session
# GET /api/v1/sessions/current - Get current session info
```
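
Both session endpoints return a `JSONRPCSuccessResponse`. Assuming the serialized body follows the JSON-RPC 2.0 envelope (`jsonrpc`, `id`, and either `result` or `error`), a client could unwrap it as sketched below. The helper name, the exception class, and the shape of the `result` payload are hypothetical.

```python
import json
from typing import Any


class JsonRpcError(Exception):
    """Raised when a JSON-RPC response carries an `error` member."""


def unwrap_jsonrpc(body: str) -> Any:
    """Return the `result` member of a JSON-RPC 2.0 response body."""
    envelope = json.loads(body)
    if envelope.get("jsonrpc") != "2.0":
        raise ValueError("not a JSON-RPC 2.0 response")
    if "error" in envelope:
        err = envelope["error"]
        raise JsonRpcError(f"{err.get('code')}: {err.get('message')}")
    return envelope["result"]
```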

### sse.py
**Purpose:** Provides Server-Sent Events (SSE) endpoint for streaming real-time task updates.
**Import:** `from solace_agent_mesh.gateway.http_sse.routers.sse import router`

**Functions:**
- `subscribe_to_task_events(task_id: str, request: FastAPIRequest) -> EventSourceResponse` - Establishes an SSE connection for real-time task updates.

**Usage Examples:**
```python
# Include in FastAPI app
from fastapi import FastAPI
from solace_agent_mesh.gateway.http_sse.routers.sse import router

app = FastAPI()
app.include_router(router, prefix="/api/v1/sse")

# Client usage:
# GET /api/v1/sse/subscribe/{task_id} - Subscribe to task events via SSE
```
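
On the wire, an SSE stream is plain text: each event is a group of `field: value` lines terminated by a blank line. The sketch below parses such a stream using only the standard library. It is a simplified reader (it handles the `event`, `data`, and `id` fields and skips comment lines, but ignores `retry`). The event names and payloads this gateway emits are not specified here, so any sample values you feed it are your own.

```python
from typing import Dict, Iterable, Iterator, List


def parse_sse(lines: Iterable[str]) -> Iterator[Dict[str, str]]:
    """Yield one dict per SSE event from an iterable of text lines."""
    event: Dict[str, str] = {}
    data_parts: List[str] = []
    for raw in lines:
        line = raw.rstrip("\r\n")
        if line == "":
            # A blank line dispatches the event, but only if it carried data.
            if data_parts:
                event["data"] = "\n".join(data_parts)
                yield event
            event, data_parts = {}, []
            continue
        if line.startswith(":"):
            continue  # comment or keep-alive line
        field, _, value = line.partition(":")
        if value.startswith(" "):
            value = value[1:]  # drop the single optional leading space
        if field == "data":
            data_parts.append(value)
        elif field in ("event", "id"):
            event[field] = value
```

Multiple `data:` lines within one event are joined with newlines, which matches how the SSE format represents multi-line payloads.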

### tasks.py
**Purpose:** Handles task submission and management including cancellation.
**Import:** `from solace_agent_mesh.gateway.http_sse.routers.tasks import router`

**Functions:**
- `send_task_to_agent(request: FastAPIRequest, payload: SendMessageRequest) -> SendMessageSuccessResponse` - Submits a non-streaming task request.
- `subscribe_task_from_agent(request: FastAPIRequest, payload: SendStreamingMessageRequest) -> SendStreamingMessageSuccessResponse` - Submits a streaming task request.
- `cancel_agent_task(request: FastAPIRequest, taskId: str, payload: CancelTaskRequest) -> Dict[str, str]` - Sends a cancellation request for a specific task.

**Usage Examples:**
```python
# Include in FastAPI app
from fastapi import FastAPI
from solace_agent_mesh.gateway.http_sse.routers.tasks import router

app = FastAPI()
app.include_router(router, prefix="/api/v1/tasks")

# Client usage:
# POST /api/v1/tasks/message:send - Submit non-streaming task
# POST /api/v1/tasks/message:stream - Submit streaming task
# POST /api/v1/tasks/{taskId}:cancel - Cancel a task
```
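
The task endpoints accept JSON-RPC request bodies. The sketch below builds a body for the non-streaming route, assuming it follows the A2A JSON-RPC `message/send` shape (a `message` with `role`, `messageId`, and a list of `parts`). The helper is hypothetical, and how the target agent is selected is not covered here; treat the `SendMessageRequest` model as the authoritative field list.

```python
import uuid
from typing import Any, Dict


def build_send_message_request(text: str) -> Dict[str, Any]:
    """Build a JSON-serializable request body for a single text message."""
    return {
        "jsonrpc": "2.0",
        "id": str(uuid.uuid4()),  # request id, echoed back in the response
        "method": "message/send",  # assumed A2A method name
        "params": {
            "message": {
                "kind": "message",
                "role": "user",
                "messageId": str(uuid.uuid4()),
                "parts": [{"kind": "text", "text": text}],
            }
        },
    }
```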

### users.py
**Purpose:** Provides information about the currently authenticated user.
**Import:** `from solace_agent_mesh.gateway.http_sse.routers.users import router`

**Functions:**
- `get_current_user(request: FastAPIRequest) -> Dict[str, Any]` - Retrieves information about the currently authenticated user.

**Usage Examples:**
```python
# Include in FastAPI app
from fastapi import FastAPI
from solace_agent_mesh.gateway.http_sse.routers.users import router

app = FastAPI()
app.include_router(router, prefix="/api/v1/users")

# Client usage:
# GET /api/v1/users/me - Get current user info
```

### visualization.py
**Purpose:** Manages A2A message visualization streams for monitoring and debugging.
**Import:** `from solace_agent_mesh.gateway.http_sse.routers.visualization import router, SubscriptionTarget, VisualizationSubscribeRequest`

**Classes:**
- `SubscriptionTarget(BaseModel)` - Defines an abstract target for A2A message visualization.
  - `type: str` - Type of target to monitor
  - `identifier: Optional[str]` - Identifier for the target
- `VisualizationSubscribeRequest(BaseModel)` - Request body for initiating a visualization stream.
  - `subscription_targets: Optional[List[SubscriptionTarget]]` - Targets to monitor
  - `client_stream_id: Optional[str]` - Client-generated ID for idempotency
- `VisualizationSubscribeResponse(BaseModel)` - Response for successful visualization subscription.
- `VisualizationConfigUpdateRequest(BaseModel)` - Request for updating stream configuration.
- `VisualizationConfigUpdateResponse(BaseModel)` - Response for configuration updates.

**Functions:**
- `subscribe_to_visualization_stream(request_data: VisualizationSubscribeRequest) -> VisualizationSubscribeResponse` - Initiates a new A2A message visualization stream.
- `get_visualization_stream_events(stream_id: str) -> EventSourceResponse` - Establishes SSE connection for filtered A2A messages.
- `update_visualization_stream_config(stream_id: str, update_request: VisualizationConfigUpdateRequest) -> VisualizationConfigUpdateResponse` - Modifies stream configuration.
- `unsubscribe_from_visualization_stream(stream_id: str) -> Response` - Terminates an active visualization stream.

**Usage Examples:**
```python
# Include in FastAPI app
from fastapi import FastAPI
from solace_agent_mesh.gateway.http_sse.routers.visualization import router, SubscriptionTarget, VisualizationSubscribeRequest

app = FastAPI()
app.include_router(router, prefix="/api/v1/visualization")

# Client usage:
# POST /api/v1/visualization/subscribe - Start visualization stream
# GET /api/v1/visualization/{stream_id}/events - SSE endpoint for events
# PUT /api/v1/visualization/{stream_id}/config - Update stream config
# DELETE /api/v1/visualization/{stream_id}/unsubscribe - Stop stream

# Example subscription request:
targets = [SubscriptionTarget(type="current_namespace_a2a_messages")]
request = VisualizationSubscribeRequest(subscription_targets=targets)
```

# content_hash: 06355b3b169d52d43490f8937b9b15c25a298b29debe3daab4ee810c3131288a

================================================================================

## Section 27: solace_agent_mesh/gateway/http_sse/services/services_llm.txt

**Source file:** `solace_agent_mesh/gateway/http_sse/services/services_llm.txt`

# DEVELOPER GUIDE: services

## Quick Summary
The `services` directory contains the business logic layer for the HTTP SSE Gateway. It provides high-level services for agent management (discovering and retrieving A2A agents), user identity operations (searching for users), and task management (cancelling A2A tasks). These services abstract the complexities of interacting with agent registries, identity providers, and A2A messaging protocols.

## Files Overview
- `__init__.py` - Package initialization file marking the directory as a Python package
- `agent_service.py` - Service for retrieving information about discovered A2A agents from the registry
- `people_service.py` - Service for searching users via configured identity services
- `task_service.py` - Service for handling A2A task cancellation operations

## Developer API Reference

### __init__.py
**Purpose:** Marks the services directory as a Python package
**Import:** N/A - No public interfaces

### agent_service.py
**Purpose:** Provides methods for accessing information about discovered A2A agents from the shared AgentRegistry
**Import:** `from solace_agent_mesh.gateway.http_sse.services.agent_service import AgentService`

**Classes:**
- `AgentService(agent_registry: AgentRegistry)` - Service for accessing discovered A2A agent information
  - `get_all_agents() -> List[AgentCard]` - Retrieves all currently discovered and registered agent cards
  - `get_agent_by_name(agent_name: str) -> Optional[AgentCard]` - Retrieves a specific agent card by name, returns None if not found

**Usage Examples:**
```python
from solace_agent_mesh.gateway.http_sse.services.agent_service import AgentService
from solace_agent_mesh.common.agent_registry import AgentRegistry

# Initialize with shared agent registry
agent_registry = AgentRegistry()  # Usually injected as shared instance
agent_service = AgentService(agent_registry=agent_registry)

# Get all available agents
all_agents = agent_service.get_all_agents()
print(f"Found {len(all_agents)} agents")

# Get specific agent by name
agent = agent_service.get_agent_by_name("data-processor")
if agent:
    print(f"Found agent: {agent.name}")
else:
    print("Agent not found")
```
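
For unit tests it can help to exercise code that depends on agent lookup without a live registry. The sketch below mirrors the two `AgentService` method names over an in-memory stand-in. `FakeAgentCard`, `InMemoryRegistry`, `AgentLookup`, and their methods are hypothetical names invented for this example; they are not the real `AgentCard` or `AgentRegistry` API.

```python
from dataclasses import dataclass
from typing import Dict, List, Optional


@dataclass
class FakeAgentCard:
    """Hypothetical stand-in for AgentCard with only the fields used here."""
    name: str
    description: str = ""


class InMemoryRegistry:
    """Hypothetical stand-in registry keyed by agent name."""

    def __init__(self) -> None:
        self._cards: Dict[str, FakeAgentCard] = {}

    def add(self, card: FakeAgentCard) -> None:
        self._cards[card.name] = card

    def get(self, name: str) -> Optional[FakeAgentCard]:
        return self._cards.get(name)

    def names(self) -> List[str]:
        return sorted(self._cards)


class AgentLookup:
    """Same method names as AgentService, backed by the stand-in registry."""

    def __init__(self, registry: InMemoryRegistry) -> None:
        self._registry = registry

    def get_all_agents(self) -> List[FakeAgentCard]:
        return [self._registry.get(n) for n in self._registry.names()]

    def get_agent_by_name(self, agent_name: str) -> Optional[FakeAgentCard]:
        return self._registry.get(agent_name)
```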

### people_service.py
**Purpose:** Provides user search functionality via configured identity services
**Import:** `from solace_agent_mesh.gateway.http_sse.services.people_service import PeopleService`

**Classes:**
- `PeopleService(identity_service: Optional[BaseIdentityService])` - Service for searching and retrieving user information
  - `search_for_users(query: str, limit: int = 10) -> List[Dict[str, Any]]` - Asynchronously searches for users; returns an empty list if no identity service is configured or the query is too short

**Usage Examples:**
```python
import asyncio
from solace_agent_mesh.gateway.http_sse.services.people_service import PeopleService
from solace_agent_mesh.common.services.identity_service import BaseIdentityService

# Initialize with an identity service.
# SomeIdentityService is a placeholder: substitute your own BaseIdentityService subclass.
identity_service = SomeIdentityService()
people_service = PeopleService(identity_service=identity_service)

async def search_users():
    # Search for users
    users = await people_service.search_for_users("john", limit=5)
    for user in users:
        print(f"User: {user.get('name')} - {user.get('email')}")

# Initialize without identity service (graceful degradation)
people_service_no_id = PeopleService(identity_service=None)
# search_for_users will return an empty list

asyncio.run(search_users())
```
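
The graceful-degradation behavior described above (an empty result when no identity service is configured or the query is too short) can be sketched independently of the library. Everything in this block is a hypothetical stand-in: the `IdentityBackend` protocol, `StaticIdentityBackend`, and in particular `MIN_QUERY_LENGTH`, since the real minimum query length is not documented here.

```python
import asyncio
from typing import Any, Dict, List, Optional, Protocol

MIN_QUERY_LENGTH = 2  # assumed threshold, not taken from the library


class IdentityBackend(Protocol):
    async def search_users(self, query: str, limit: int) -> List[Dict[str, Any]]: ...


class StaticIdentityBackend:
    """Hypothetical backend that searches a fixed list by name substring."""

    def __init__(self, users: List[Dict[str, Any]]) -> None:
        self._users = users

    async def search_users(self, query: str, limit: int) -> List[Dict[str, Any]]:
        needle = query.lower()
        return [u for u in self._users if needle in u["name"].lower()][:limit]


async def search_for_users(
    backend: Optional[IdentityBackend], query: str, limit: int = 10
) -> List[Dict[str, Any]]:
    # Degrade gracefully: no backend or a too-short query yields no results.
    if backend is None or len(query.strip()) < MIN_QUERY_LENGTH:
        return []
    return await backend.search_users(query, limit)
```

Returning an empty list instead of raising lets an autocomplete UI keep working when the identity integration is switched off.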

### task_service.py
**Purpose:** Handles A2A task operations, specifically task cancellation using CoreA2AService and message publishing
**Import:** `from solace_agent_mesh.gateway.http_sse.services.task_service import TaskService, PublishFunc`

**Type Aliases:**
- `PublishFunc: Callable[[str, Dict, Optional[Dict]], None]` - Function type for publishing messages (topic, payload, user_properties)

**Classes:**
- `TaskService(core_a2a_service: CoreA2AService, publish_func: PublishFunc, namespace: str, gateway_id: str, sse_manager: SSEManager, task_context_map: Dict[str, Dict], task_context_lock: threading.Lock, app_name: str)` - Service for managing A2A task operations
  - `cancel_task(agent_name: str, task_id: str, client_id: str, user_id: str = "web_user") -> None` - Asynchronously cancels a task by publishing an A2A `CancelTaskRequest` message

**Usage Examples:**
```python
import asyncio
import threading
from typing import Optional

from solace_agent_mesh.gateway.http_sse.services.task_service import TaskService, PublishFunc
from solace_agent_mesh.core_a2a.service import CoreA2AService
from solace_agent_mesh.common.agent_registry import AgentRegistry
from solace_agent_mesh.gateway.http_sse.sse_manager import SSEManager

# Define publish function (matches the PublishFunc alias: topic, payload, user_properties)
def my_publish_func(topic: str, payload: dict, user_properties: Optional[dict] = None):
    print(f"Publishing to {topic}: {payload}")
    # Your actual message publishing logic here

# Initialize dependencies
# CoreA2AService is constructed from the shared registry and the namespace,
# as in the other guides in this file
core_a2a_service = CoreA2AService(AgentRegistry(), "my-namespace")
sse_manager = SSEManager()
task_context_map = {}
task_context_lock = threading.Lock()

# Create task service
task_service = TaskService(
    core_a2a_service=core_a2a_service,
    publish_func=my_publish_func,
    namespace="my-namespace",
    gateway_id="gateway-01",
    sse_manager=sse_manager,
    task_context_map=task_context_map,
    task_context_lock=task_context_lock,
    app_name="my-app"
)

async def cancel_task_example():
    # Cancel a task
    await task_service.cancel_task(
        agent_name="data-processor",
        task_id="task-123",
        client_id="client-456",
        user_id="user@example.com"
    )

asyncio.run(cancel_task_example())
```
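
Because `TaskService` receives its publisher as a plain callable, a test can pass in a recorder instead of a real broker client. The sketch below defines such a recorder against the `PublishFunc` signature given above. `RecordingPublisher` and the topic string in the usage note are hypothetical, and the alias is redefined locally only so the block runs without the library installed.

```python
from typing import Callable, Dict, List, Optional, Tuple

# Local copy of the alias documented above, so this sketch is self-contained.
PublishFunc = Callable[[str, Dict, Optional[Dict]], None]


class RecordingPublisher:
    """Callable that stores every published message instead of sending it."""

    def __init__(self) -> None:
        self.messages: List[Tuple[str, Dict, Optional[Dict]]] = []

    def __call__(
        self, topic: str, payload: Dict, user_properties: Optional[Dict] = None
    ) -> None:
        self.messages.append((topic, payload, user_properties))


publisher = RecordingPublisher()
publish: PublishFunc = publisher  # usable wherever a PublishFunc is expected
```

After the code under test runs, `publisher.messages` holds `(topic, payload, user_properties)` tuples that can be asserted on, for example after calling `publish("example/topic", {"id": "task-123"}, None)`.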

# content_hash: 83ddf6b403dc50598ed550e4b3a5445f832b3956dad75f7a3fbbb7e6e5c6c115

================================================================================

## Section 28: solace_agent_mesh/llm.txt

**Source file:** `solace_agent_mesh/llm.txt`

# DEVELOPER GUIDE for the directory: src

## Quick Summary
The `src` directory serves as the main source code root for the Solace AI Connector, containing four primary subsystems that work together to enable comprehensive AI agent communication and hosting. The `agent` directory provides a complete framework for hosting Google ADK agents with A2A protocol support, the `common` directory offers foundational A2A protocol infrastructure and utilities, the `core_a2a` directory provides a reusable service layer for core A2A operations, and the `gateway` directory implements various gateway patterns for external platform integration. These components work together to create a distributed AI agent ecosystem with real-time communication, task delegation, and multi-platform integration capabilities.

## Files and Subdirectories Overview
- **Direct files:**
  - `__init__.py`: Empty package initialization file.
- **Subdirectories:**
  - `agent/`: Complete ADK agent hosting framework with A2A protocol integration and comprehensive tool library.
  - `common/`: Foundational A2A protocol infrastructure, type systems, and client/server implementations.
  - `core_a2a/`: Reusable service layer for core A2A interactions and agent registry operations.
  - `gateway/`: Gateway framework with HTTP/SSE, Slack, and Webhook implementations for external platform integration.

## Developer API Reference

### Direct Files

#### __init__.py
**Purpose:** Standard Python package initializer. It allows the `src` directory and its subdirectories to be treated as a package.
**Import:** `from src import agent, common, gateway`

**Classes/Functions/Constants:**
This file is empty and has no public interfaces.

### Subdirectory APIs

#### agent/
**Purpose:** Provides a complete framework for hosting Google ADK agents with A2A protocol support and a comprehensive, extensible tool library.
**Key Exports:** `SamAgentApp`, `SamAgentComponent`, `AppLlmAgent`, and a wide array of built-in tools for data analysis, web requests, multimedia processing, and inter-agent communication.
**Import Examples:**
```python
from src.solace_agent_mesh.agent.sac.app import SamAgentApp
from src.solace_agent_mesh.agent.sac.component import SamAgentComponent
from src.solace_agent_mesh.agent.adk.app_llm_agent import AppLlmAgent
from src.solace_agent_mesh.agent.tools.builtin_data_analysis_tools import query_data_with_sql
from src.solace_agent_mesh.agent.tools.peer_agent_tool import PeerAgentTool
from src.solace_agent_mesh.agent.tools.web_tools import web_request
from src.solace_agent_mesh.agent.tools.image_tools import create_image_from_description
```

#### common/
**Purpose:** Provides the foundational infrastructure for Agent-to-Agent (A2A) communication, including the core protocol, data types, message translation, and client/server implementations.
**Key Exports:** A2A protocol functions, Pydantic type definitions (`Message`, `Task`, `AgentCard`), `A2AClient` for interacting with agents, `A2AServer` for building agents, and various utilities.
**Import Examples:**
```python
from src.solace_agent_mesh.common.a2a_protocol import get_agent_request_topic
from src.solace_agent_mesh.common.types import Message, Task, AgentCard, TextPart
from src.solace_agent_mesh.common.client import A2AClient, A2ACardResolver
from src.solace_agent_mesh.common.server import A2AServer, InMemoryTaskManager
from src.solace_agent_mesh.common.agent_registry import AgentRegistry
from src.solace_agent_mesh.common.utils.embeds import resolve_embeds_in_string
```

#### core_a2a/
**Purpose:** Provides a reusable, decoupled service layer for core A2A interactions, handling task submission, cancellation, and agent discovery.
**Key Exports:** `CoreA2AService` for managing A2A protocol logic without being tied to a specific gateway or messaging implementation.
**Import Examples:**
```python
from src.solace_agent_mesh.core_a2a.service import CoreA2AService
```

#### gateway/
**Purpose:** Provides a framework and multiple implementations for building gateways that bridge external platforms (like web UIs, Slack, or webhooks) with the A2A messaging system.
**Key Exports:** `BaseGatewayApp` and `BaseGatewayComponent` for creating custom gateways, and concrete implementations like `WebUIBackendApp`, `SlackGatewayApp`, and `WebhookGatewayApp`.
**Import Examples:**
```python
from src.solace_agent_mesh.gateway.base.app import BaseGatewayApp
from src.solace_agent_mesh.gateway.http_sse.app import WebUIBackendApp
from src.solace_agent_mesh.gateway.slack.app import SlackGatewayApp
from src.solace_agent_mesh.gateway.webhook.app import WebhookGatewayApp
from src.solace_agent_mesh.gateway.base.authorization_service import ConfigurableRbacAuthorizationService
```

## Complete Usage Guide
This guide demonstrates how the different subdirectories within `src` work together to build a complete, distributed AI agent system.

### 1. How to import and use functionality from subdirectories
The following examples show how to import and instantiate components from each major subdirectory.

```python
# 1. Import from the 'agent' directory to create an AI agent
from src.solace_agent_mesh.agent.sac.app import SamAgentApp

# 2. Import from the 'common' and 'core_a2a' directories for protocol infrastructure
from src.solace_agent_mesh.common.agent_registry import AgentRegistry
from src.solace_agent_mesh.common.types import AgentCard, AgentCapabilities, AgentSkill
from src.solace_agent_mesh.core_a2a.service import CoreA2AService

# 3. Import from the 'gateway' directory to create interfaces
from src.solace_agent_mesh.gateway.http_sse.app import WebUIBackendApp
from src.solace_agent_mesh.gateway.slack.app import SlackGatewayApp
from src.solace_agent_mesh.gateway.webhook.app import WebhookGatewayApp

# 4. Import tools from the 'agent/tools' subdirectory
from src.solace_agent_mesh.agent.tools.peer_agent_tool import PeerAgentTool
from src.solace_agent_mesh.agent.tools.builtin_data_analysis_tools import query_data_with_sql
```

### 2. How different parts work together
This section builds a system step by step: first an agent, then the A2A protocol infrastructure, then the gateways that expose the agent to external platforms.

#### Step 1: Create an ADK-powered agent (`agent/`)
First, define and configure an agent. This agent will automatically be equipped with a rich set of tools and A2A communication capabilities.

```python
# File: my_system.py
from src.solace_agent_mesh.agent.sac.app import SamAgentApp

# Configure the agent with all capabilities
agent_config = {
    "name": "data-analyst-agent",
    "app_config": {
        "namespace": "myorg/ai-agents",
        "agent_name": "data_analyst",
        "model": "gemini-1.5-pro",
        "instruction": "You are a data analysis expert with access to SQL, charting, web tools, and peer collaboration.",
        "agent_card": {
            "description": "AI agent for comprehensive data analysis and reporting",
            "capabilities": ["data_analysis", "web_research", "chart_generation", "peer_collaboration"]
        },
        "agent_card_publishing": {"interval_seconds": 30},
        "agent_discovery": {"enabled": True},
        "inter_agent_communication": {"allow_list": ["*"]}
    }
}

# Create the agent app (in a real scenario, this is run by the SAC framework)
agent_app = SamAgentApp(agent_config)
```

#### Step 2: Set Up A2A Protocol Infrastructure (`common/` and `core_a2a/`)
Next, set up the core services that manage agent discovery and task routing. This is often handled by the gateway components but can be used directly.

```python
# File: my_system.py (continued)
from src.solace_agent_mesh.common.agent_registry import AgentRegistry
from src.solace_agent_mesh.common.types import AgentCard, AgentCapabilities, AgentSkill
from src.solace_agent_mesh.core_a2a.service import CoreA2AService

# Initialize a shared agent registry
agent_registry = AgentRegistry()

# Create the core A2A service, which uses the registry
namespace = "myorg/ai-agents"
a2a_service = CoreA2AService(agent_registry, namespace)

# Manually register an agent's capabilities (this is usually done automatically by the agent itself)
data_analyst_card = AgentCard(
    name="data_analyst",
    display_name="Data Analyst",
    description="AI agent for data analysis",
    url=f"a2a://{namespace}/data_analyst",
    version="1.0.0",
    capabilities=AgentCapabilities(streaming=True, pushNotifications=True),
    skills=[AgentSkill(id="sql_analysis", name="SQL Data Analysis")]
)
a2a_service.process_discovery_message(data_analyst_card)
```

#### Step 3: Create Gateway Integrations (`gateway/`)
Create one or more gateways to expose the agent(s) to external platforms.

```python
# File: my_system.py (continued)
from src.solace_agent_mesh.gateway.http_sse.app import WebUIBackendApp
from src.solace_agent_mesh.gateway.slack.app import SlackGatewayApp

# Web UI Gateway for browser-based interactions
webui_config = {
    "name": "web-gateway",
    "app_config": {
        "namespace": "myorg/ai-agents",
        "gateway_id": "web-ui-gateway",
        "session_secret_key": "a-very-secret-key",
        "fastapi_host": "0.0.0.0",
        "fastapi_port": 8080,
        "artifact_service": {"type": "local_file", "base_path": "./artifacts"}
    }
}
webui_app = WebUIBackendApp(webui_config)

# Slack Gateway for team collaboration
slack_config = {
    "name": "slack-gateway",
    "app_config": {
        "namespace": "myorg/ai-agents",
        "gateway_id": "slack-gateway",
        "slack_bot_token": "${SLACK_BOT_TOKEN}",
        "slack_app_token": "${SLACK_APP_TOKEN}",
        "default_agent_name": "data_analyst"
    }
}
slack_app = SlackGatewayApp(slack_config)
```
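
Note that `"${SLACK_BOT_TOKEN}"` and `"${SLACK_APP_TOKEN}"` are literal strings when the configuration is written as a Python dictionary. Whether the framework expands them later is not stated here; if it does not, they can be resolved explicitly before the config is passed on. The sketch below is one way to do that with the standard library; `expand_env` is a hypothetical helper, not part of `solace_agent_mesh`.

```python
import os
from typing import Any


def expand_env(value: Any) -> Any:
    """Recursively expand ${VAR} placeholders in strings inside dicts and lists."""
    if isinstance(value, str):
        # os.path.expandvars leaves unknown variables unchanged.
        return os.path.expandvars(value)
    if isinstance(value, dict):
        return {key: expand_env(item) for key, item in value.items()}
    if isinstance(value, list):
        return [expand_env(item) for item in value]
    return value  # numbers, booleans, None pass through untouched
```

Because unset variables are left as-is, a follow-up check for any remaining `${` substrings is a cheap way to catch missing secrets before startup.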

### 3. Common usage patterns

#### Pattern 1: Inter-Agent Communication
An agent can use the `PeerAgentTool` (from `agent/tools/`) to delegate tasks to other agents, leveraging the `common/` protocol infrastructure.

```python
# This code would run within an agent's tool execution context.
from src.solace_agent_mesh.agent.tools.peer_agent_tool import PeerAgentTool

async def analyze_and_delegate_report(component, tool_context):
    # Assume 'component' is the SamAgentComponent instance hosting the current agent.
    
    # Step 1: Perform local analysis (using another tool)
    # ... analysis_result = await query_data_with_sql(...) ...

    # Step 2: Delegate report generation to a specialist agent
    peer_tool = PeerAgentTool(
        target_agent_name="report_generator",
        host_component=component
    )
    
    report_result = await peer_tool.run_async(
        args={
            "task_description": "Generate a professional PDF report from this analysis",
            "analysis_data": "artifact://analysis_result.json",
            "report_format": "PDF"
        },
        tool_context=tool_context
    )
    
    return report_result
```

================================================================================

## Section 29: solace_agent_mesh/solace_agent_mesh_llm.txt

**Source file:** `solace_agent_mesh/solace_agent_mesh_llm.txt`

# DEVELOPER GUIDE: solace_agent_mesh

## Quick Summary
The `solace_agent_mesh` directory contains the core implementation of a distributed AI agent communication system built on the Solace event mesh. It provides a complete framework for hosting Google ADK (Agent Development Kit) agents with Agent-to-Agent (A2A) protocol support, enabling real-time communication, task delegation, and multi-platform integration.

The architecture consists of four main subsystems: `agent/` for hosting ADK agents with comprehensive tool libraries, `common/` for foundational A2A protocol infrastructure, `core_a2a/` for reusable service layers, and `gateway/` for external platform integration. These components work together to create a distributed AI agent ecosystem with capabilities for data analysis, multimedia processing, web integration, and inter-agent collaboration.

## Files and Subdirectories Overview
- **Direct files:**
  - `__init__.py`: Empty package initialization file
  - `llm.txt`: Comprehensive developer guide documentation
  - `llm_detail.txt`: Detailed concatenated documentation from all subdirectories
- **Subdirectories:**
  - `agent/`: Complete ADK agent hosting framework with A2A protocol integration and comprehensive tool library
  - `common/`: Foundational A2A protocol infrastructure, type systems, and client/server implementations
  - `core_a2a/`: Reusable service layer for core A2A interactions and agent registry operations
  - `gateway/`: Gateway framework with HTTP/SSE, Slack, and Webhook implementations for external platform integration

## Developer API Reference

### Direct Files

#### __init__.py
**Purpose:** Standard Python package initializer that allows the `solace_agent_mesh` directory to be treated as a package
**Import:** `import solace_agent_mesh`

**Classes/Functions/Constants:** [None - empty file]

#### llm.txt
**Purpose:** Contains comprehensive developer guide documentation for the entire system
**Import:** Not applicable - documentation file

**Classes/Functions/Constants:** [None - documentation file]

#### llm_detail.txt
**Purpose:** Contains detailed concatenated documentation from all subdirectories
**Import:** Not applicable - documentation file

**Classes/Functions/Constants:** [None - documentation file]

### Subdirectory APIs

#### agent/
**Purpose:** Provides a complete framework for hosting Google ADK agents with A2A protocol support and a comprehensive, extensible tool library
**Key Exports:** `SamAgentApp`, `SamAgentComponent`, `AppLlmAgent`, and a wide array of built-in tools for data analysis, web requests, multimedia processing, and inter-agent communication
**Import Examples:**
```python
from solace_agent_mesh.agent.sac.app import SamAgentApp
from solace_agent_mesh.agent.sac.component import SamAgentComponent
from solace_agent_mesh.agent.adk.app_llm_agent import AppLlmAgent
from solace_agent_mesh.agent.tools.builtin_data_analysis_tools import query_data_with_sql
from solace_agent_mesh.agent.tools.peer_agent_tool import PeerAgentTool
from solace_agent_mesh.agent.tools.web_tools import web_request
from solace_agent_mesh.agent.tools.image_tools import create_image_from_description
```

#### common/
**Purpose:** Provides the foundational infrastructure for Agent-to-Agent (A2A) communication, including the core protocol, data types, message translation, and client/server implementations
**Key Exports:** A2A protocol functions, Pydantic type definitions (`Message`, `Task`, `AgentCard`), `A2AClient` for interacting with agents, `A2AServer` for building agents, and various utilities
**Import Examples:**
```python
from solace_agent_mesh.common.a2a_protocol import get_agent_request_topic
from solace_agent_mesh.common.types import Message, Task, AgentCard, TextPart
from solace_agent_mesh.common.client import A2AClient, A2ACardResolver
from solace_agent_mesh.common.server import A2AServer, InMemoryTaskManager
from solace_agent_mesh.common.agent_registry import AgentRegistry
from solace_agent_mesh.common.utils.embeds import resolve_embeds_recursively_in_string
```

#### core_a2a/
**Purpose:** Provides a reusable, decoupled service layer for core A2A interactions, handling task submission, cancellation, and agent discovery
**Key Exports:** `CoreA2AService` for managing A2A protocol logic without being tied to a specific gateway or messaging implementation
**Import Examples:**
```python
from solace_agent_mesh.core_a2a.service import CoreA2AService
```

#### gateway/
**Purpose:** Provides a framework and multiple implementations for building gateways that bridge external platforms (like web UIs, Slack, or webhooks) with the A2A messaging system
**Key Exports:** `BaseGatewayApp` and `BaseGatewayComponent` for creating custom gateways, and concrete implementations like `WebUIBackendApp`, `SlackGatewayApp`, and `WebhookGatewayApp`
**Import Examples:**
```python
from solace_agent_mesh.gateway.base.app import BaseGatewayApp
from solace_agent_mesh.gateway.http_sse.app import WebUIBackendApp
from solace_agent_mesh.gateway.slack.app import SlackGatewayApp
from solace_agent_mesh.gateway.webhook.app import WebhookGatewayApp
```

## Complete Usage Guide

### 1. Setting Up a Complete AI Agent System

This example demonstrates how to create a comprehensive AI agent system with multiple components working together.

```python
# Step 1: Create an ADK-powered agent
from solace_agent_mesh.agent.sac.app import SamAgentApp

# Configure the agent with comprehensive capabilities
agent_config = {
    "name": "data-analyst-agent",
    "app_config": {
        "namespace": "myorg/ai-agents",
        "agent_name": "data_analyst",
        "model": "gemini-1.5-pro",
        "instruction": "You are a data analysis expert with access to SQL, charting, web tools, and peer collaboration.",
        "tools": [
            {"tool_type": "builtin", "tool_name": "query_data_with_sql"},
            {"tool_type": "builtin", "tool_name": "create_chart"},
            {"tool_type": "builtin", "tool_name": "web_request"},
            {"tool_type": "builtin", "tool_name": "peer_agent_tool"}
        ],
        "agent_card": {
            "description": "AI agent for comprehensive data analysis and reporting",
            "capabilities": ["data_analysis", "web_research", "chart_generation", "peer_collaboration"]
        },
        "agent_card_publishing": {"interval_seconds": 30},
        "agent_discovery": {"enabled": True},
        "inter_agent_communication": {"allow_list": ["*"]}
    }
}

# Create the agent app
agent_app = SamAgentApp(agent_config)

# Step 2: Set up A2A protocol infrastructure
from solace_agent_mesh.common.agent_registry import AgentRegistry
from solace_agent_mesh.common.types import AgentCard, AgentCapabilities, AgentSkill
from solace_agent_mesh.core_a2a.service import CoreA2AService

# Initialize shared agent registry
agent_registry = AgentRegistry()

# Create core A2A service
namespace = "myorg/ai-agents"
a2a_service = CoreA2AService(agent_registry, namespace)

# Register agent capabilities
data_analyst_card = AgentCard(
    name="data_analyst",
    display_name="Data Analyst",
    description="AI agent for data analysis",
    url=f"a2a://{namespace}/data_analyst",
    version="1.0.0",
    capabilities=AgentCapabilities(streaming=True, pushNotifications=True),
    skills=[AgentSkill(id="sql_analysis", name="SQL Data Analysis")]
)
a2a_service.process_discovery_message(data_analyst_card)

# Step 3: Create gateway integrations
from solace_agent_mesh.gateway.http_sse.app import WebUIBackendApp
from solace_agent_mesh.gateway.slack.app import SlackGatewayApp

# Web UI Gateway for browser-based interactions
webui_config = {
    "name": "web-gateway",
    "app_config": {
        "namespace": "myorg/ai-agents",
        "gateway_id": "web-ui-gateway",
        "session_secret_key": "a-very-secret-key",
        "fastapi_host": "0.0.0.0",
        "fastapi_port": 8080,
        "artifact_service": {"type": "local_file", "base_path": "./artifacts"}
    }
}
webui_app = WebUIBackendApp(webui_config)

# Slack Gateway for team collaboration
slack_config = {
    "name": "slack-gateway",
    "app_config": {
        "namespace": "myorg/ai-agents",
        "gateway_id": "slack-gateway",
        "slack_bot_token": "${SLACK_BOT_TOKEN}",
        "slack_app_token": "${SLACK_APP_TOKEN}",
        "default_agent_name": "data_analyst"
    }
}
slack_app = SlackGatewayApp(slack_config)
```
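
The Slack configuration above stores its tokens as `${SLACK_BOT_TOKEN}`-style placeholders. A plain Python dict does not expand such placeholders on its own, and this guide does not say whether the app classes resolve them for dict-based configs. As a minimal sketch under that uncertainty, the hypothetical helper `expand_env_placeholders` (not part of `solace_agent_mesh`) resolves `${VAR}` references from the environment before the dict is handed to an app class:

```python
import os
import re
from typing import Any, Mapping, Optional

# Matches ${NAME} where NAME is a conventional environment variable name.
_PLACEHOLDER = re.compile(r"\$\{([A-Za-z_][A-Za-z0-9_]*)\}")


def expand_env_placeholders(value: Any, env: Optional[Mapping[str, str]] = None) -> Any:
    """Recursively replace ${VAR} in strings using `env` (defaults to os.environ).

    Hypothetical helper for illustration; not a solace_agent_mesh API.
    """
    env = os.environ if env is None else env

    if isinstance(value, str):
        def substitute(match: "re.Match[str]") -> str:
            name = match.group(1)
            if name not in env:
                # Fail loudly instead of passing a literal "${VAR}" downstream.
                raise KeyError(f"Environment variable {name!r} is not set")
            return env[name]

        return _PLACEHOLDER.sub(substitute, value)
    if isinstance(value, dict):
        return {key: expand_env_placeholders(item, env) for key, item in value.items()}
    if isinstance(value, list):
        return [expand_env_placeholders(item, env) for item in value]
    # Numbers, booleans, None, and other types pass through unchanged.
    return value
```

With this in place, `SlackGatewayApp(expand_env_placeholders(slack_config))` would receive concrete token values, and a missing variable surfaces as a `KeyError` at startup rather than as an authentication failure later.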

### 2. Inter-Agent Communication Pattern

```python
# This code would run within an agent's tool execution context
from solace_agent_mesh.agent.tools.peer_agent_tool import PeerAgentTool

async def analyze_and_delegate_report(component, tool_context):
    # Step 1: Perform local analysis using built-in tools
    from solace_agent_mesh.agent.tools.builtin_data_analysis_tools import query_data_with_sql
    
    analysis_result = await query_data_with_sql(
        sql_query="SELECT * FROM sales_data WHERE date >= '2024-01-01'",
        tool_context=tool_context
    )

    # Step 2: Delegate report generation to a specialist agent
    peer_tool = PeerAgentTool(
        target_agent_name="report_generator",
        host_component=component
    )
    
    report_result = await peer_tool.run_async(
        args={
            "task_description": "Generate a professional PDF report from this analysis",
            "analysis_data": "artifact://analysis_result.json",
            "report_format": "PDF"
        },
        tool_context=tool_context
    )
    
    return report_result
```
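
Delegation like this is subject to the `inter_agent_communication` setting shown in the agent configuration earlier, where `"allow_list": ["*"]` permits every peer. The exact matching rules are not documented in this section. The sketch below assumes shell-style wildcard matching and uses a hypothetical function, `is_peer_allowed`, to show how a narrower allow-list might behave:

```python
from fnmatch import fnmatchcase
from typing import Iterable


def is_peer_allowed(agent_name: str, allow_list: Iterable[str]) -> bool:
    """Return True if agent_name matches any pattern in allow_list.

    Assumption: patterns use shell-style wildcards ("*", "?"), matched
    case-sensitively. This is illustrative, not the library's own logic.
    """
    return any(fnmatchcase(agent_name, pattern) for pattern in allow_list)
```

Under these assumptions, `["*"]` allows `report_generator`, `["report_*"]` also allows it, and `["data_*"]` or an empty list rejects it.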

### 3. Building Custom Tools

```python
from solace_agent_mesh.agent.tools.registry import tool_registry
from solace_agent_mesh.agent.tools.tool_definition import BuiltinTool
from google.adk.tools import ToolContext

async def custom_database_query(
    query: str,
    database_name: str = "default",
    tool_context: ToolContext = None,
    tool_config: dict = None
) -> dict:
    """Execute a custom database query with enhanced features."""
    
    # Access the host component for shared resources
    host_component = tool_context._invocation_context.agent.host_component
    
    # Get database connection from agent state
    db_connection = host_component.get_agent_specific_state('db_connection')
    
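    # Execute query with error handling.
    # NOTE: execute_query is not defined in this snippet; supply your own
    # async helper that returns a JSON-serializable list of rows.
    try:
        result = await execute_query(db_connection, query, database_name)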
        
        # Save results as an artifact
        from solace_agent_mesh.agent.utils.artifact_helpers import save_artifact_with_metadata
        import json
        from datetime import datetime, timezone
        
        artifact_result = await save_artifact_with_metadata(
            artifact_service=host_component.get_shared_artifact_service(),
            app_name=host_component.get_config()["app_name"],
            user_id=tool_context.user_id,
            session_id=tool_context.session_id,
            filename=f"query_result_{datetime.now(timezone.utc).strftime('%Y%m%d_%H%M%S')}.json",
            content_bytes=json.dumps(result).encode(),
            mime_type="application/json",
            metadata_dict={
                "query": query,
                "database": database_name,
                "tool": "custom_database_query"
            },
            timestamp=datetime.now(timezone.utc)
        )
        
        return {
            "status": "success",
            "rows_returned": len(result),
            "artifact_filename": artifact_result["filename"],
            "preview": result[:5]
        }
        
    except Exception as e:
        return {
            "status": "error",
            "error_message": str(e)
        }

# Register the custom tool
custom_tool = BuiltinTool(
    name="custom_database_query",
    description="Execute custom database queries with enhanced features",
    function=custom_database_query,
    category="data_analysis"
)
tool_registry.register(custom_tool)
```
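
The tool above awaits `execute_query`, which the snippet does not define. As a stand-in for local experimentation, the sketch below implements that helper on top of the standard library's `sqlite3`. It is an assumption about what such a helper could look like, not the project's implementation. The `database_name` argument is accepted only to match the call site and is ignored here.

```python
import asyncio
import sqlite3
from typing import Any, Dict, List


async def execute_query(
    db_connection: sqlite3.Connection,
    query: str,
    database_name: str = "default",  # unused in this sketch
) -> List[Dict[str, Any]]:
    """Run a SQL query off the event loop and return rows as dicts.

    The connection must be created with check_same_thread=False because
    the query runs in a worker thread via asyncio.to_thread.
    """

    def run() -> List[Dict[str, Any]]:
        cursor = db_connection.execute(query)
        # cursor.description is None for statements that return no rows.
        columns = [column[0] for column in cursor.description or []]
        return [dict(zip(columns, row)) for row in cursor.fetchall()]

    return await asyncio.to_thread(run)
```

Returning a list of dicts keeps the result compatible with the `json.dumps(result)`, `len(result)`, and `result[:5]` calls in the tool body. `asyncio.to_thread` requires Python 3.9 or later.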

### 4. Working with Multimedia Tools

```python
from solace_agent_mesh.agent.tools.audio_tools import text_to_speech, multi_speaker_text_to_speech
from solace_agent_mesh.agent.tools.image_tools import create_image_from_description, describe_image

async def multimedia_workflow(tool_context):
    # Generate speech from text
    tts_result = await text_to_speech(
        text="Welcome to our AI-powered presentation system!",
        output_filename="intro.mp3",
        gender="female",
        tone="professional",
        language="en-US",
        tool_context=tool_context
    )
    
    # Create a multi-speaker dialogue
    conversation_result = await multi_speaker_text_to_speech(
        conversation_text="""
        Presenter: Today we'll discuss our quarterly results.
        Analyst: The data shows significant growth in Q4.
        Presenter: Let's dive into the details.
        """,
        speaker_configs=[
            {"name": "Presenter", "gender": "female", "tone": "professional"},
            {"name": "Analyst", "gender": "male", "tone": "analytical"}
        ],
        output_filename="dialogue.mp3",
        tool_context=tool_context
    )
    
    # Generate supporting visuals
    chart_image = await create_image_from_description(
        image_description="A professional bar chart showing quarterly growth with blue and green colors",
        output_filename="quarterly_chart.png",
        tool_context=tool_context
    )
    
    return {
        "intro_audio": tts_result,
        "dialogue_audio": conversation_result,
        "chart_image": chart_image
    }
```
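
The multi-speaker call pairs `Speaker: text` lines in `conversation_text` with entries in `speaker_configs` by name. How the tool parses the text internally is not documented here, so the sketch below is only a pre-flight check under the assumption that each non-blank line has the form `Name: utterance`. Both functions are hypothetical helpers for catching a misspelled or unconfigured speaker before calling the tool.

```python
from typing import Dict, List, Tuple


def parse_conversation(conversation_text: str) -> List[Tuple[str, str]]:
    """Split 'Speaker: utterance' lines into (speaker, utterance) pairs."""
    turns = []
    for raw_line in conversation_text.splitlines():
        line = raw_line.strip()
        if not line:
            continue  # skip blank lines left by the triple-quoted string
        # Split on the first colon only, so colons in the utterance survive.
        speaker, separator, utterance = line.partition(":")
        if not separator or not speaker.strip() or not utterance.strip():
            raise ValueError(f"Expected 'Speaker: text', got: {line!r}")
        turns.append((speaker.strip(), utterance.strip()))
    return turns


def speakers_without_config(
    turns: List[Tuple[str, str]], speaker_configs: List[Dict[str, str]]
) -> List[str]:
    """Return speaker names used in the dialogue that have no config entry."""
    configured = {config["name"] for config in speaker_configs}
    missing: List[str] = []
    for speaker, _ in turns:
        if speaker not in configured and speaker not in missing:
            missing.append(speaker)
    return missing
```

For the dialogue above, `parse_conversation` yields three turns, and `speakers_without_config` returns an empty list because both `Presenter` and `Analyst` are configured.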

### 5. Client-Side Integration

```python
import asyncio
from solace_agent_mesh.common.client import A2AClient, A2ACardResolver
from solace_agent_mesh.common.types import Message, TextPart

async def client_integration_example():
    # Discover available agents
    resolver = A2ACardResolver("https://agents.myorg.com")
    agent_card = resolver.get_agent_card()
    
    # Create client for agent interaction
    client = A2AClient(agent_card=agent_card)
    
    # Submit a complex task with file upload
    task_payload = {
        "message": {
            "role": "user",
            "parts": [
                {"type": "text", "text": "Please analyze this sales data and create a report"},
                {"type": "file", "file": {"name": "sales_data.csv", "uri": "file://./sales_data.csv"}}
            ]
        }
    }
    
    # Stream the response
    print("Submitting task and streaming response...")
    async for response in client.send_task_streaming(task_payload):
        if hasattr(response.result, 'text_delta'):
            print(response.result.text_delta, end="", flush=True)
```

# content_hash: 59d897c57c8a54f55bda0548509f1b6bc7af3a1afde93f59a8525c0eb98659e1

================================================================================

