# DEVELOPER GUIDE: gateway

## Quick Summary
The `gateway` directory provides a framework for building gateways that bridge external platforms with the Solace AI Connector's A2A (Agent-to-Agent) messaging system. It consists of a base framework and three gateway implementations: HTTP/SSE for web interfaces, Slack for team collaboration, and Webhook for external system integration. All gateways share common patterns for authentication, message translation, and real-time communication, and each adds platform-specific features.

## Files and Subdirectories Overview
- **Direct files:**
  - `__init__.py`: Marks the directory as a Python package.
- **Subdirectories:**
  - `base/`: Foundational classes and utilities for building all gateway implementations.
  - `http_sse/`: A complete HTTP/SSE gateway with a FastAPI web server for real-time web UI backends.
  - `slack/`: A gateway for integrating with the Slack collaboration platform.
  - `webhook/`: A universal webhook gateway for receiving HTTP requests from external systems.

## Developer API Reference

### Direct Files

#### __init__.py
**Purpose:** Initializes the `gateway` module, making it a Python package.
**Import:** `from solace_agent_mesh.gateway import ...`

**Classes/Functions/Constants:**
This file is empty and contains no direct exports.

### Subdirectory APIs

#### base/
**Purpose:** Provides the foundational, abstract classes for building all Gateway implementations. It establishes a framework for configuration, A2A message handling, and managing the lifecycle of requests from external platforms.
**Key Exports:** `BaseGatewayApp`, `BaseGatewayComponent`, `TaskContextManager`
**Import Examples:**
```python
from solace_agent_mesh.gateway.base.app import BaseGatewayApp
from solace_agent_mesh.gateway.base.component import BaseGatewayComponent
from solace_agent_mesh.gateway.base.task_context import TaskContextManager
```
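
To make "managing the lifecycle of requests" more concrete, the following is a minimal sketch of a thread-safe map from A2A task IDs to the external request context that produced them. The class `InMemoryTaskContextStore` and its method names are hypothetical stand-ins, not the actual `TaskContextManager` API; consult the source in `base/task_context.py` for the real interface.

```python
import threading
from typing import Any, Dict, Optional


class InMemoryTaskContextStore:
    """Hypothetical stand-in: maps task IDs to external request context."""

    def __init__(self) -> None:
        self._lock = threading.Lock()
        self._contexts: Dict[str, Dict[str, Any]] = {}

    def store(self, task_id: str, context: Dict[str, Any]) -> None:
        # Remember where a task came from so responses can be routed back
        with self._lock:
            self._contexts[task_id] = context

    def get(self, task_id: str) -> Optional[Dict[str, Any]]:
        with self._lock:
            return self._contexts.get(task_id)

    def remove(self, task_id: str) -> Optional[Dict[str, Any]]:
        # Called once the final response or an error has been delivered
        with self._lock:
            return self._contexts.pop(task_id, None)


store = InMemoryTaskContextStore()
store.store("task-123", {"platform": "my_platform", "user_id_for_artifacts": "u1"})
context = store.get("task-123")
removed = store.remove("task-123")
```

The lock matters because a gateway typically receives platform events and A2A responses on different threads.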

#### http_sse/
**Purpose:** Implements a complete HTTP/SSE gateway to serve a web-based user interface, bridging web protocols with the backend A2A messaging fabric.
**Key Exports:** `WebUIBackendApp`, `WebUIBackendComponent`, `SSEManager`, `SessionManager`, and various dependency injectors.
**Import Examples:**
```python
from solace_agent_mesh.gateway.http_sse.app import WebUIBackendApp
from solace_agent_mesh.gateway.http_sse.component import WebUIBackendComponent
from solace_agent_mesh.gateway.http_sse.sse_manager import SSEManager
from solace_agent_mesh.gateway.http_sse.session_manager import SessionManager
from solace_agent_mesh.gateway.http_sse.dependencies import get_agent_service, get_task_service, get_user_id
```
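
For readers unfamiliar with the SSE half of this gateway, here is a small sketch of how a single Server-Sent Events frame is laid out on the wire. The function `format_sse_event` and the event name `status_update` are illustrative assumptions and are not taken from `SSEManager`; only the frame layout (`id:`, `event:`, `data:` fields terminated by a blank line) comes from the SSE specification.

```python
import json
from typing import Any, List, Optional


def format_sse_event(data: Any, event: Optional[str] = None,
                     event_id: Optional[str] = None) -> str:
    """Serialize one SSE frame; non-string data is JSON-encoded."""
    lines: List[str] = []
    if event_id is not None:
        lines.append(f"id: {event_id}")
    if event is not None:
        lines.append(f"event: {event}")
    payload = data if isinstance(data, str) else json.dumps(data)
    # Each line of a multi-line payload needs its own "data:" field
    for line in payload.splitlines() or [""]:
        lines.append(f"data: {line}")
    # A blank line terminates the frame
    return "\n".join(lines) + "\n\n"


frame = format_sse_event({"task_id": "t1", "text": "hi"}, event="status_update")
```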

#### slack/
**Purpose:** Provides a gateway for integrating the Solace AI Connector with the Slack collaboration platform, enabling bot interactions within Slack channels and threads.
**Key Exports:** `SlackGatewayApp`, `SlackGatewayComponent`, and various utility functions.
**Import Examples:**
```python
from solace_agent_mesh.gateway.slack.app import SlackGatewayApp
from solace_agent_mesh.gateway.slack.component import SlackGatewayComponent
from solace_agent_mesh.gateway.slack.utils import generate_a2a_session_id, send_slack_message, correct_slack_markdown
```
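
Agent responses are usually written in standard Markdown, while Slack renders its own `mrkdwn` dialect (single asterisks for bold, `<url|label>` for links). The sketch below shows the kind of conversion a helper such as `correct_slack_markdown` might perform. The function `to_slack_mrkdwn` is a hypothetical illustration, not the library's implementation, and it deliberately ignores code blocks and nested formatting.

```python
import re


def to_slack_mrkdwn(text: str) -> str:
    """Convert a few common Markdown constructs to Slack mrkdwn."""
    # [label](url) -> <url|label>
    text = re.sub(r"\[([^\]]+)\]\(([^)]+)\)", r"<\2|\1>", text)
    # **bold** -> *bold*
    text = re.sub(r"\*\*(.+?)\*\*", r"*\1*", text)
    # "# Heading" -> "*Heading*" (Slack has no heading syntax)
    text = re.sub(r"^#{1,6}\s+(.+)$", r"*\1*", text, flags=re.MULTILINE)
    return text


converted = to_slack_mrkdwn("**Done**: see [docs](https://example.com)")
```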

#### webhook/
**Purpose:** Provides a universal webhook gateway for receiving HTTP requests from external systems and triggering A2A tasks. It is highly configurable for different authentication methods, payload formats, and target agents.
**Key Exports:** `WebhookGatewayApp`, `WebhookGatewayComponent`
**Import Examples:**
```python
from solace_agent_mesh.gateway.webhook.app import WebhookGatewayApp
from solace_agent_mesh.gateway.webhook.component import WebhookGatewayComponent
from solace_agent_mesh.gateway.webhook.dependencies import get_sac_component
```

## Complete Usage Guide

### 1. Creating a Custom Gateway

This example shows how to use the `base` module to build a new gateway for a hypothetical external platform.

```python
# my_gateway/app.py
from typing import Type, List, Dict, Any
from solace_agent_mesh.gateway.base.app import BaseGatewayApp
from .component import MyGatewayComponent

class MyGatewayApp(BaseGatewayApp):
    """Defines the application and its configuration for My Platform."""
    
    SPECIFIC_APP_SCHEMA_PARAMS: List[Dict[str, Any]] = [
        {
            "name": "my_platform_api_key",
            "required": True,
            "type": "string",
            "description": "API key for connecting to My Platform."
        },
        {
            "name": "my_platform_webhook_url",
            "required": False,
            "type": "string",
            "description": "Webhook URL for receiving events from My Platform."
        }
    ]

    def _get_gateway_component_class(self) -> Type[MyGatewayComponent]:
        return MyGatewayComponent

# my_gateway/component.py
from typing import Any, Dict, List, Optional, Tuple, Union
from solace_agent_mesh.gateway.base.component import BaseGatewayComponent
from a2a.types import Part as A2APart, TextPart, Task, TaskStatusUpdateEvent, TaskArtifactUpdateEvent, JSONRPCError

class MyGatewayComponent(BaseGatewayComponent):
    
    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        self.api_key = self.get_config("my_platform_api_key")
        self.webhook_url = self.get_config("my_platform_webhook_url")
        self.server = None
    
    async def _extract_initial_claims(self, external_event_data: Any) -> Optional[Dict[str, Any]]:
        """Extract user identity from platform-specific event."""
        # Events arrive as dicts from the webhook handler, so use key access
        if isinstance(external_event_data, dict) and external_event_data.get("user_id"):
            return {"id": external_event_data["user_id"], "source": "my_platform"}
        return None
    
    def _start_listener(self) -> None:
        """Start your platform listener (web server, etc.)."""
        # Example: Start a web server to receive webhooks
        from fastapi import FastAPI
        import uvicorn
        
        app = FastAPI()
        
        @app.post("/webhook")
        async def handle_webhook(request_data: dict):
            await self.handle_external_event(request_data)
            return {"status": "ok"}
        
        self.server = uvicorn.Server(
            uvicorn.Config(app, host="0.0.0.0", port=8080)
        )
        # Run the server in a daemon thread so it cannot block interpreter shutdown
        import threading
        self.server_thread = threading.Thread(target=self.server.run, daemon=True)
        self.server_thread.start()
    
    def _stop_listener(self) -> None:
        """Stop your platform listener."""
        if self.server:
            # Ask uvicorn to exit, then wait briefly for the thread to finish
            self.server.should_exit = True
            self.server_thread.join(timeout=5)
    
    def _translate_external_input(self, external_event: Any) -> Tuple[str, List[A2APart], Dict[str, Any]]:
        """Convert external event to A2A format."""
        target_agent = external_event.get("target_agent", "default-agent")
        message_text = external_event.get("message", "Hello")
        
        a2a_parts = [TextPart(text=message_text)]
        
        context = {
            "platform": "my_platform",
            "user_id_for_artifacts": external_event.get("user_id"),
            "a2a_session_id": f"my-platform-{external_event.get('session_id', 'default')}"
        }
        
        return target_agent, a2a_parts, context
    
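    async def _send_update_to_external(self, external_request_context: Dict[str, Any], 
                                     event_data: Union[TaskStatusUpdateEvent, TaskArtifactUpdateEvent], 
                                     is_final_chunk_of_update: bool) -> None:
        """Send streaming update back to external platform."""
        # Status updates carry parts in status.message; artifact updates in artifact.parts
        if isinstance(event_data, TaskStatusUpdateEvent):
            message = event_data.status.message
            parts = message.parts if message else []
        else:
            parts = event_data.artifact.parts
        # a2a Part objects may wrap the concrete part in `.root`
        text = "".join(
            getattr(getattr(p, "root", p), "text", "") or "" for p in parts
        )
        if text:
            await self.send_to_platform(
                user_id=external_request_context["user_id_for_artifacts"],
                message=text,
                is_partial=not is_final_chunk_of_update
            )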
    
    async def _send_final_response_to_external(self, external_request_context: Dict[str, Any], 
                                             task_data: Task) -> None:
        """Send final response back to external platform."""
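        final_text = ""
        # The final message, when present, is on task_data.status.message
        message = task_data.status.message
        for part in (message.parts if message else []):
            inner = getattr(part, "root", part)  # unwrap an a2a Part if needed
            if getattr(inner, "text", None):
                final_text += inner.text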
        
        await self.send_to_platform(
            user_id=external_request_context["user_id_for_artifacts"],
            message=final_text,
            is_final=True
        )
    
    async def _send_error_to_external(self, external_request_context: Dict[str, Any], 
                                     error_data: JSONRPCError) -> None:
        """Send error back to external platform."""
        await self.send_to_platform(
            user_id=external_request_context["user_id_for_artifacts"],
            message=f"Error: {error_data.message}",
            is_error=True
        )
    
    async def send_to_platform(self, user_id: str, message: str, **kwargs):
        """Helper method to send messages to your platform."""
        # Implement your platform-specific sending logic here
        print(f"Sending to {user_id}: {message}")

# Usage
if __name__ == "__main__":
    app_config = {
        "namespace": "myorg/prod",
        "gateway_id": "my-gateway-01",
        "my_platform_api_key": "secret-key-here",
        "my_platform_webhook_url": "https://myplatform.com/webhook"
    }
    
    gateway_app = MyGatewayApp(app_info=app_config)
    gateway_app.run()
```
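
The translation step in `_translate_external_input` is plain dictionary handling, so it can be exercised without a broker or the A2A SDK. The sketch below restates the same logic as a standalone function; `translate_event` is a hypothetical name, and it returns the raw message text rather than `TextPart` objects so that it has no third-party dependencies.

```python
from typing import Any, Dict, Tuple


def translate_event(external_event: Dict[str, Any]) -> Tuple[str, str, Dict[str, Any]]:
    """Map a platform event to (target agent, message text, request context)."""
    target_agent = external_event.get("target_agent", "default-agent")
    message_text = external_event.get("message", "Hello")
    context = {
        "platform": "my_platform",
        "user_id_for_artifacts": external_event.get("user_id"),
        # One A2A session per platform session; falls back to a shared default
        "a2a_session_id": f"my-platform-{external_event.get('session_id', 'default')}",
    }
    return target_agent, message_text, context


agent, text, context = translate_event(
    {"message": "Summarize this", "user_id": "u42", "session_id": "s1"}
)
```

Note that the `'default'` fallback puts every event without a `session_id` into the same A2A session, which may or may not be what you want for your platform.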

### 2. Using the HTTP/SSE Gateway

```python
from solace_agent_mesh.gateway.http_sse.app import WebUIBackendApp

# Configuration for the web UI gateway
app_config = {
    "name": "my-webui-gateway",
    "session_secret_key": "your-secret-key-here",
    "fastapi_host": "0.0.0.0",
    "fastapi_port": 8000,
    "namespace": "myorg/prod",
    "gateway_id": "webui-gateway-01",
    "cors_allowed_origins": ["http://localhost:3000"],
    "frontend_welcome_message": "Welcome to my A2A system!",
    "frontend_bot_name": "My Assistant",
    "frontend_enable_file_upload": True,
    "frontend_enable_agent_selection": True
}

# Initialize and run the web UI gateway
webui_app = WebUIBackendApp(app_info=app_config)
webui_app.run()

# The gateway will start a FastAPI server with endpoints like:
# GET /api/agents - List available agents
# POST /api/tasks - Submit new tasks
# GET /api/tasks/{task_id}/sse - Stream task updates via SSE
# POST /api/artifacts - Upload files
```
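
On the client side, the SSE stream is a sequence of text lines grouped into frames by blank lines. The following sketch parses such lines into events. It is a simplified reading of the SSE format (it handles `event`, `id`, `data`, and comment lines only), and both `parse_sse_lines` and the sample event name are assumptions made for illustration rather than part of the gateway.

```python
from typing import Dict, Iterable, Iterator, List


def parse_sse_lines(lines: Iterable[str]) -> Iterator[Dict[str, str]]:
    """Yield one dict per SSE frame that contains at least one data line."""
    event: Dict[str, str] = {}
    data_lines: List[str] = []
    for raw in lines:
        line = raw.rstrip("\r\n")
        if line == "":
            # Blank line ends the frame; frames without data are dropped
            if data_lines:
                event["data"] = "\n".join(data_lines)
                yield event
            event, data_lines = {}, []
            continue
        if line.startswith(":"):
            continue  # comment line, often used as a keep-alive
        field, _, value = line.partition(":")
        if value.startswith(" "):
            value = value[1:]  # a single leading space is not part of the value
        if field == "data":
            data_lines.append(value)
        elif field in ("event", "id"):
            event[field] = value


stream = [
    "event: status_update\n",
    'data: {"text": "hi"}\n',
    "\n",
    ": keep-alive\n",
    "data: done\n",
    "\n",
]
events = list(parse_sse_lines(stream))
```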

### 3. Creating Custom API Endpoints for HTTP/SSE Gateway

```python
from fastapi import APIRouter, Depends, Request
from solace_agent_mesh.gateway.http_sse.dependencies import (
    get_agent_registry,
    get_user_id,
    get_publish_a2a_func,
    get_sse_manager,
    get_session_manager
)

# Create a custom router
custom_router = APIRouter(prefix="/api/custom")

@custom_router.get("/my-agents")
async def get_my_agents(
    user_id: str = Depends(get_user_id),
    agent_registry = Depends(get_agent_registry)
):
    """Get agents filtered by user permissions."""
    all_agents = agent_registry.get_all_agents()
    # Filter agents based on user permissions.
    # `user_can_access_agent` is a hypothetical helper that you must supply;
    # it is not provided by the gateway.
    user_agents = [agent for agent in all_agents if user_can_access_agent(user_id, agent)]
    return {"agents": user_agents, "user_id": user_id}

@custom_router.post("/broadcast-message")
async def broadcast_message(
    message: str,
    publish_func = Depends(get_publish_a2a_func),
    user_id: str = Depends(get_user_id)
):
    """Broadcast a message to all agents."""
    publish_func(
        topic="myorg/prod/a2a/v1/broadcast",  # illustrative topic under the "myorg/prod" namespace
        payload={
            "method": "broadcast/message",
            "params": {"message": message, "from_user": user_id}
        },
        user_properties={"clientId": user_id}
    )
    return {"status": "broadcasted", "message": message}

# Add the router to your FastAPI app, typically in main.py or during app setup:
# app.include_router(custom_router)
```
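
The `/my-agents` endpoint above relies on a `user_can_access_agent` helper that the gateway does not provide. One possible shape for it is a simple allow-list, sketched below. The `USER_AGENT_ACCESS` table and the representation of agents as dictionaries with a `name` key are assumptions for illustration; the objects returned by the real agent registry may differ.

```python
from typing import Dict, List, Set

# Hypothetical allow-list: user ID -> names of agents the user may call
USER_AGENT_ACCESS: Dict[str, Set[str]] = {
    "alice": {"assistant", "data-processor"},
    "bob": {"assistant"},
}


def user_can_access_agent(user_id: str, agent: Dict[str, str]) -> bool:
    """Return True if the agent's name is on the user's allow-list."""
    return agent.get("name") in USER_AGENT_ACCESS.get(user_id, set())


all_agents: List[Dict[str, str]] = [{"name": "assistant"}, {"name": "data-processor"}]
bob_agents = [a for a in all_agents if user_can_access_agent("bob", a)]
```

Unknown users get an empty allow-list, so the filter fails closed.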

### 4. Using the Slack Gateway

```python
from solace_agent_mesh.gateway.slack.app import SlackGatewayApp

# Configuration for Slack gateway
slack_config = {
    "name": "my-slack-gateway",
    "namespace": "myorg/prod",
    "gateway_id": "slack-gateway-01",
    "slack_bot_token": "xoxb-your-bot-token",
    "slack_app_token": "xapp-your-app-token",
    "slack_signing_secret": "your-signing-secret",
    "default_agent_name": "assistant",
    "enable_socket_mode": True,
    "enable_file_upload": True
}

# Initialize and run the Slack gateway
slack_app = SlackGatewayApp(app_info=slack_config)
slack_app.run()

# The gateway will:
# - Connect to Slack via Socket Mode or HTTP
# - Listen for mentions and direct messages
# - Translate Slack messages to A2A format
# - Send responses back to Slack channels/threads
```
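
Replying in channels and threads implies that each Slack thread maps to its own A2A session. A rough sketch of that mapping follows. Slack message events carry a `ts` timestamp and, when posted inside a thread, a `thread_ts`; everything else here, including the function name `session_id_for_event` and the ID format, is hypothetical and unrelated to the signature of the real `generate_a2a_session_id`.

```python
from typing import Any, Dict


def session_id_for_event(event: Dict[str, Any], agent_name: str) -> str:
    """Derive a stable session ID so all messages in a thread share context."""
    # A top-level message starts a new thread keyed by its own timestamp
    thread_ts = event.get("thread_ts") or event["ts"]
    safe_ts = thread_ts.replace(".", "_")
    return f"slack-{event['channel']}-{safe_ts}-{agent_name}"


top_level = {"channel": "C123", "ts": "1712345678.000100"}
reply = {"channel": "C123", "ts": "1712345699.000200", "thread_ts": "1712345678.000100"}
same_session = (
    session_id_for_event(top_level, "assistant") == session_id_for_event(reply, "assistant")
)
```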

### 5. Using the Webhook Gateway

```python
from solace_agent_mesh.gateway.webhook.app import WebhookGatewayApp

# Configuration for webhook gateway
webhook_config = {
    "name": "my-webhook-gateway",
    "namespace": "myorg/prod",
    "gateway_id": "webhook-gateway-01",
    "webhook_host": "0.0.0.0",
    "webhook_port": 9000,
    "webhook_path": "/webhook",
    "target_agent_name": "data-processor",
    "auth_method": "header",
    "auth_header_name": "X-API-Key",
    "auth_header_value": "secret-webhook-key",
    "payload_format": "json",
    "user_id_extraction": {
        "method": "json_path",
        "path": "$.user.id"
    }
}

# Initialize and run the webhook gateway
webhook_app = WebhookGatewayApp(app_info=webhook_config)
webhook_app.run()

# The gateway will:
# - Start an HTTP server on the specified host/port
# - Authenticate incoming requests
# - Extract user identity from payloads
# - Convert webhook data to A2A messages
# - Send to the specified target agent
```
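
The authentication and identity-extraction steps listed above can be sketched with the standard library alone. The helpers `is_authorized` and `extract_by_path` are hypothetical and do not reflect how `WebhookGatewayComponent` is implemented; in particular, `extract_by_path` supports only simple dotted paths such as `$.user.id`, not full JSONPath.

```python
import hmac
from typing import Any, Mapping, Optional


def is_authorized(headers: Mapping[str, str], header_name: str, expected_value: str) -> bool:
    """Check a shared-secret header using a constant-time comparison."""
    # HTTP header names are case-insensitive
    normalized = {k.lower(): v for k, v in headers.items()}
    supplied = normalized.get(header_name.lower(), "")
    return hmac.compare_digest(supplied.encode(), expected_value.encode())


def extract_by_path(payload: Any, path: str) -> Optional[Any]:
    """Resolve a dotted path like '$.user.id' against nested dicts."""
    if path.startswith("$"):
        path = path[1:]
    current = payload
    for key in (k for k in path.split(".") if k):
        if not isinstance(current, dict) or key not in current:
            return None
        current = current[key]
    return current


headers = {"x-api-key": "secret-webhook-key"}
authorized = is_authorized(headers, "X-API-Key", "secret-webhook-key")
user_id = extract_by_path({"user": {"id": "u42"}}, "$.user.id")
```

`hmac.compare_digest` is used instead of `==` so that the comparison time does not leak how many leading characters of the secret matched.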

### 6. Working with Task Context Management

```python
from solace_agent_mesh.gateway.base.task_context import TaskContextManager

# Initialize the task context manager (usually done by BaseGat

