# DEVELOPER GUIDE: base

## Quick Summary
The `base` directory provides foundational abstract classes for building Gateway implementations within the Solace AI Connector. It establishes a framework for handling common gateway tasks such as application configuration, Solace broker integration, A2A (Agent-to-Agent) message protocol handling, and managing the lifecycle of requests from external platforms. Developers should subclass `BaseGatewayApp` and `BaseGatewayComponent` to create new gateways.

## Files Overview
- `__init__.py` - Marks the directory as a Python package
- `app.py` - Contains the base application class that handles configuration, schema merging, and broker setup
- `component.py` - Contains the core logic class for processing A2A messages and integrating with external platforms
- `task_context.py` - Provides a thread-safe manager for mapping A2A task IDs to their original request context

## Developer API Reference

### __init__.py
**Purpose:** Initializes the `gateway.base` Python package
**Import:** `from solace_agent_mesh.gateway.base import ...`

---

### app.py
**Purpose:** Provides the base application class for gateway implementations with automated configuration schema merging, Solace broker setup, and component instantiation
**Import:** `from solace_agent_mesh.gateway.base.app import BaseGatewayApp, BaseGatewayComponent`

**Classes:**
- `BaseGatewayComponent(ComponentBase)` - Base marker class for gateway components
- `BaseGatewayApp(app_info: Dict[str, Any], **kwargs)` - Main application class to be subclassed for new gateways
  - `_get_gateway_component_class(self) -> Type[BaseGatewayComponent]` - **[Abstract Method]** Must return the specific gateway component class
  - `namespace: str` - Absolute topic prefix for A2A communication (e.g., 'myorg/dev')
  - `gateway_id: str` - Unique ID for this gateway instance (auto-generated if not provided)
  - `artifact_service_config: Dict` - Configuration for the shared ADK Artifact Service
  - `enable_embed_resolution: bool` - Flag to enable/disable late-stage 'artifact_content' embed resolution
  - `gateway_max_artifact_resolve_size_bytes: int` - Maximum artifact size, in bytes, that will be resolved (default: 104857600, i.e. 100 MiB)
  - `gateway_recursive_embed_depth: int` - Maximum depth for recursive embed resolution (default: 12)

**Constants/Variables:**
- `BASE_GATEWAY_APP_SCHEMA: Dict[str, List[Dict[str, Any]]]` - Base configuration schema automatically merged with subclass parameters
- `SPECIFIC_APP_SCHEMA_PARAMS_ATTRIBUTE_NAME: str` - Class attribute name ("SPECIFIC_APP_SCHEMA_PARAMS") for subclass-specific config parameters

**Usage Examples:**
```python
from typing import Type, List, Dict, Any
from solace_agent_mesh.gateway.base.app import BaseGatewayApp
from .component import MyGatewayComponent

class MyGatewayApp(BaseGatewayApp):
    """Custom gateway application for My Platform."""
    
    # Define additional configuration parameters
    SPECIFIC_APP_SCHEMA_PARAMS: List[Dict[str, Any]] = [
        {
            "name": "my_platform_api_key",
            "required": True,
            "type": "string",
            "description": "API key for connecting to My Platform."
        }
    ]

    def _get_gateway_component_class(self) -> Type[MyGatewayComponent]:
        return MyGatewayComponent

# Usage in YAML config:
# app_config:
#   namespace: "myorg/prod"
#   gateway_id: "my-gateway-instance-01"
#   artifact_service:
#     type: "local_file"
#     base_path: "/data/artifacts"
#   my_platform_api_key: "secret-key-here"
```
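
The schema merging mentioned above can be pictured with a small, self-contained sketch. This is not the library's implementation: `merge_schema_params`, `BASE_PARAMS`, and `SPECIFIC_PARAMS` are hypothetical names, and the real logic in `BaseGatewayApp` may order, validate, or de-duplicate entries differently. The sketch assumes that subclass parameters are appended after the base ones and that an entry with a repeated name replaces the earlier one.

```python
from typing import Any, Dict, List

# Hypothetical stand-in for the base schema parameters; the real
# BASE_GATEWAY_APP_SCHEMA contains more entries than shown here.
BASE_PARAMS: List[Dict[str, Any]] = [
    {"name": "namespace", "required": True, "type": "string"},
    {"name": "gateway_id", "required": False, "type": "string"},
]

# Mirrors the SPECIFIC_APP_SCHEMA_PARAMS list from the example above.
SPECIFIC_PARAMS: List[Dict[str, Any]] = [
    {"name": "my_platform_api_key", "required": True, "type": "string"},
]


def merge_schema_params(
    base: List[Dict[str, Any]], specific: List[Dict[str, Any]]
) -> List[Dict[str, Any]]:
    """Combine two parameter lists keyed by name (assumed behaviour)."""
    merged: Dict[str, Dict[str, Any]] = {}
    for param in [*base, *specific]:
        # A later entry with the same name overwrites the earlier one.
        merged[param["name"]] = dict(param)
    return list(merged.values())


merged_params = merge_schema_params(BASE_PARAMS, SPECIFIC_PARAMS)
merged_names = [p["name"] for p in merged_params]
```

Under these assumptions, a subclass only declares what is specific to its platform and inherits the common parameters unchanged.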

---

### component.py
**Purpose:** Provides the abstract base class for gateway components containing core A2A protocol logic, service management, and external platform integration interface
**Import:** `from solace_agent_mesh.gateway.base.component import BaseGatewayComponent`

**Classes:**
- `BaseGatewayComponent(**kwargs: Any)` - Abstract base class for gateway components
  - **Public Methods:**
    - `get_config(self, key: str, default: Any = None) -> Any` - Retrieves configuration from nested app_config or component_config
    - `publish_a2a_message(self, topic: str, payload: Dict, user_properties: Optional[Dict] = None) -> None` - Publishes A2A messages to Solace broker
    - `authenticate_and_enrich_user(self, external_event_data: Any) -> Optional[Dict[str, Any]]` - Orchestrates user authentication and identity enrichment
    - `submit_a2a_task(self, target_agent_name: str, a2a_parts: List[ContentPart], external_request_context: Dict[str, Any], user_identity: Any, is_streaming: bool = True, api_version: str = "v2") -> str` - Submits task to target agent, returns task_id
    - `run(self) -> None` - Starts component's async operations and external platform listener
    - `cleanup(self) -> None` - Cleans up resources and stops background threads
  - **Abstract Methods (Must be Implemented):**
    - `_extract_initial_claims(self, external_event_data: Any) -> Optional[Dict[str, Any]]` - Extract identity claims from platform event (must return dict with 'id' key)
    - `_start_listener(self) -> None` - Start external platform listener (e.g., web server, WebSocket)
    - `_stop_listener(self) -> None` - Stop external platform listener
    - `_translate_external_input(self, external_event: Any) -> Tuple[str, List[ContentPart], Dict[str, Any]]` - Convert external event to A2A format: (target_agent_name, a2a_parts, context)
    - `_send_update_to_external(self, external_request_context: Dict[str, Any], event_data: Union[TaskStatusUpdateEvent, TaskArtifactUpdateEvent], is_final_chunk_of_update: bool) -> None` - Send streaming update to external platform
    - `_send_final_response_to_external(self, external_request_context: Dict[str, Any], task_data: Task) -> None` - Send final response to external platform
    - `_send_error_to_external(self, external_request_context: Dict[str, Any], error_data: JSONRPCError) -> None` - Send error to external platform
  - **Properties:**
    - `namespace: str` - A2A communication namespace
    - `gateway_id: str` - Unique gateway instance ID
    - `agent_registry: AgentRegistry` - Registry for discovered agents
    - `core_a2a_service: CoreA2AService` - Core A2A protocol service
    - `shared_artifact_service: Optional[BaseArtifactService]` - Artifact service instance
    - `task_context_manager: TaskContextManager` - Thread-safe task context storage
    - `identity_service: Optional[BaseIdentityService]` - Identity enrichment service

**Usage Examples:**
```python
from typing import Any, Dict, List, Optional, Tuple, Union
from solace_agent_mesh.gateway.base.component import BaseGatewayComponent
from solace_agent_mesh.common.a2a.types import ContentPart
from a2a.types import TextPart, Task, TaskStatusUpdateEvent, TaskArtifactUpdateEvent, JSONRPCError

class MyGatewayComponent(BaseGatewayComponent):
    
    async def _extract_initial_claims(self, external_event_data: Any) -> Optional[Dict[str, Any]]:
        """Extract user identity from platform-specific event."""
        # Example for HTTP request
        if hasattr(external_event_data, 'headers'):
            user_id = external_event_data.headers.get('X-User-ID')
            if user_id:
                return {"id": user_id, "source": "http_header"}
        return None
    
    def _start_listener(self) -> None:
        """Start your platform listener (web server, etc.)."""
        # Example: Start FastAPI server, WebSocket connection, etc.
        pass
    
    def _stop_listener(self) -> None:
        """Stop your platform listener."""
        pass
    
    async def _translate_external_input(self, external_event: Any) -> Tuple[str, List[ContentPart], Dict[str, Any]]:
        """Convert external event to A2A format."""
        # Example translation; the agent name and context values are placeholders
        target_agent = "my-agent"
        message_text = getattr(external_event, 'message', 'Hello')
        a2a_parts = [TextPart(text=message_text)]
        context = {
            "platform": "my_platform",
            "user_id_for_artifacts": "user123",
            "a2a_session_id": "session456"
        }
        return target_agent, a2a_parts, context
    
    async def _send_update_to_external(self, external_request_context: Dict[str, Any], 
                                     event_data: Union[TaskStatusUpdateEvent, TaskArtifactUpdateEvent], 
                                     is_final_chunk_of_update: bool) -> None:
        """Send streaming update back to external platform."""
        # Extract text from event and send to your platform
        pass
    
    async def _send_final_response_to_external(self, external_request_context: Dict[str, Any], 
                                             task_data: Task) -> None:
        """Send final response back to external platform."""
        # Extract final result and send to your platform
        pass
    
    async def _send_error_to_external(self, external_request_context: Dict[str, Any], 
                                     error_data: JSONRPCError) -> None:
        """Send error back to external platform."""
        # Send error message to your platform
        pass

# Usage in your handler (`component` is your running MyGatewayComponent instance):
async def handle_external_request(component: MyGatewayComponent, request: Any) -> str:
    # Authenticate user
    user_identity = await component.authenticate_and_enrich_user(request)
    if not user_identity:
        return "Authentication failed"
    
    # Translate request
    target_agent, a2a_parts, context = await component._translate_external_input(request)
    
    # Submit to A2A system
    task_id = await component.submit_a2a_task(
        target_agent_name=target_agent,
        a2a_parts=a2a_parts,
        external_request_context=context,
        user_identity=user_identity,
        is_streaming=True
    )
    
    return f"Task submitted: {task_id}"
```
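
The claim-extraction step can be exercised on its own, without a broker or a running gateway. The following is a minimal sketch, assuming the external event exposes a dict-like `headers` attribute. The standalone function `extract_initial_claims` and the `SimpleNamespace` request objects are hypothetical test stand-ins, not part of the gateway API.

```python
import asyncio
from types import SimpleNamespace
from typing import Any, Dict, Optional


async def extract_initial_claims(external_event_data: Any) -> Optional[Dict[str, Any]]:
    """Standalone version of the header-based logic shown above (illustrative only)."""
    headers = getattr(external_event_data, "headers", None)
    if not headers:
        return None
    user_id = headers.get("X-User-ID")
    if not user_id:
        return None
    # The returned dict must contain an 'id' key, as the abstract method requires.
    return {"id": user_id, "source": "http_header"}


# Fake request objects standing in for a real platform event.
fake_request = SimpleNamespace(headers={"X-User-ID": "alice"})
anonymous_request = SimpleNamespace(headers={})

claims = asyncio.run(extract_initial_claims(fake_request))
no_claims = asyncio.run(extract_initial_claims(anonymous_request))
```

Keeping this logic free of platform objects makes it easy to unit test before wiring it into a `BaseGatewayComponent` subclass.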

---

### task_context.py
**Purpose:** Provides thread-safe storage for mapping A2A task IDs to their original external request context
**Import:** `from solace_agent_mesh.gateway.base.task_context import TaskContextManager`

**Classes:**
- `TaskContextManager()` - Thread-safe context storage manager
  - `store_context(self, task_id: str, context_data: Dict[str, Any]) -> None` - Store context for a task ID
  - `get_context(self, task_id: str) -> Optional[Dict[str, Any]]` - Retrieve context for a task ID
  - `remove_context(self, task_id: str) -> Optional[Dict[str, Any]]` - Remove and return context for a task ID
  - `clear_all_contexts_for_testing(self) -> None` - Clear all contexts (testing only)

**Usage Examples:**
```python
from solace_agent_mesh.gateway.base.task_context import TaskContextManager

# Initialize manager
context_manager = TaskContextManager()

# Store context when submitting task
task_id = "gdk-task-abc123"
context = {
    "platform": "slack",
    "channel_id": "C1234567890",
    "thread_ts": "1234567890.123456",
    "user_identity": {"id": "user123", "name": "John Doe"}
}
context_manager.store_context(task_id, context)

# Retrieve context when processing response
retrieved_context = context_manager.get_context(task_id)
if retrieved_context:
    channel_id = retrieved_context["channel_id"]
    # Send response back to Slack channel

# Clean up when task is complete
context_manager.remove_context(task_id)
```

