# DEVELOPER GUIDE for the directory: sac

## Quick Summary
The `sac` (Solace AI Connector) directory provides the core implementation for hosting a Google ADK (Agent Development Kit) agent within the Solace AI Connector framework. It acts as a bridge, enabling ADK agents to communicate using the A2A (Agent-to-Agent) protocol over Solace messaging. This allows for the creation of distributed, collaborative agent systems where agents can delegate tasks, share information, and work together to solve complex problems.

## Files Overview
- `__init__.py` - Empty package marker file
- `app.py` - Custom SAC App class that automatically configures Solace subscriptions and broker settings for A2A communication
- `component.py` - Main SAC Component that hosts the ADK agent, manages its lifecycle, and handles all A2A protocol messaging
- `patch_adk.py` - Runtime patches for the Google ADK library to enhance or correct its behavior
- `task_execution_context.py` - State management class that encapsulates all runtime information for a single, in-flight A2A task

## Developer API Reference

### app.py
**Purpose:** Provides a custom SAC App class that simplifies the configuration of an A2A agent
**Import:** `from solace_agent_mesh.agent.sac.app import SamAgentApp`

**Classes:**
- `SamAgentApp(app_info: Dict[str, Any], **kwargs)` - Custom App class for SAM Agent Host with namespace prefixing and automatic subscription generation
  - `app_schema: Dict` - Class attribute defining comprehensive configuration schema for agent host validation

**Constants/Variables:**
- `info: Dict[str, str]` - Metadata dictionary about the SamAgentApp class for SAC framework discovery

**Usage Examples:**
```python
# SamAgentApp is typically instantiated by the SAC framework from YAML config
# Example agent-config.yaml:
# app:
#   class_name: solace_agent_mesh.agent.sac.app.SamAgentApp
#   app_config:
#     namespace: "my-org/production"
#     agent_name: "customer-support-agent"
#     model: "gemini-1.5-pro-latest"
#     tools:
#       - tool_type: "builtin"
#         tool_name: "file_search"
#     agent_card:
#       description: "An agent that can answer questions about customer accounts."
#     agent_card_publishing:
#       interval_seconds: 60
#     session_service:
#       type: "memory"
```

### component.py
**Purpose:** Core component that hosts a Google ADK agent and bridges communication to A2A protocol
**Import:** `from solace_agent_mesh.agent.sac.component import SamAgentComponent`

**Classes:**
- `SamAgentComponent(**kwargs)` - Solace AI Connector component that hosts a Google ADK agent
  - `process_event(event: Event) -> None` - Main entry point for all SAC framework events
  - `handle_timer_event(timer_data: Dict[str, Any]) -> None` - Handles scheduled timer events for agent card publishing
  - `handle_cache_expiry_event(cache_data: Dict[str, Any]) -> None` - Handles cache expiry events for peer agent timeouts
  - `finalize_task_success(a2a_context: Dict) -> None` - Async method to finalize successful task completion
  - `finalize_task_canceled(a2a_context: Dict) -> None` - Finalizes task as CANCELED
  - `finalize_task_error(exception: Exception, a2a_context: Dict) -> None` - Async method to finalize failed tasks
  - `cleanup() -> None` - Cleans up resources on component shutdown
  - `set_agent_specific_state(key: str, value: Any) -> None` - Sets key-value pair in agent state dictionary
  - `get_agent_specific_state(key: str, default: Optional[Any] = None) -> Any` - Retrieves value from agent state
  - `get_async_loop() -> Optional[asyncio.AbstractEventLoop]` - Returns dedicated asyncio event loop
  - `set_agent_system_instruction_string(instruction_string: str) -> None` - Sets static system prompt injection
  - `set_agent_system_instruction_callback(callback_function: Callable) -> None` - Sets dynamic system prompt callback
  - `get_gateway_id() -> str` - Returns unique identifier for agent host instance
  - `submit_a2a_task(target_agent_name: str, a2a_message: A2AMessage, user_id: str, user_config: Dict[str, Any], sub_task_id: str) -> str` - Submits task to peer agent
  - `get_agent_context() -> Dict[str, Any]` - Returns agent context for middleware interactions

**Constants/Variables:**
- `info: Dict` - Metadata dictionary for SAC framework
- `CORRELATION_DATA_PREFIX: str` - Prefix for cache keys when tracking peer requests
- `HOST_COMPONENT_VERSION: str` - Version string of the host component

**Usage Examples:**
```python
# Custom initialization function example
from solace_agent_mesh.agent.sac.component import SamAgentComponent

def initialize_my_agent(host_component: SamAgentComponent, config: dict):
    """Custom initialization function for the agent."""
    # Store database connection in agent state
    db_connection = create_database_connection(config.get('db_url'))
    host_component.set_agent_specific_state('db_connection', db_connection)
    
    # Set custom system instruction
    host_component.set_agent_system_instruction_string(
        "You are a specialized customer service agent with access to our database."
    )

# Tool accessing agent state
def my_custom_tool(host_component: SamAgentComponent, query: str) -> str:
    """Tool that uses stored database connection."""
    db_connection = host_component.get_agent_specific_state('db_connection')
    if db_connection:
        return db_connection.execute_query(query)
    return "Database not available"

# Scheduling async work from synchronous code
def schedule_background_task(host_component: SamAgentComponent):
    """Schedule async work on the component's event loop."""
    loop = host_component.get_async_loop()
    if loop:
        asyncio.run_coroutine_threadsafe(my_async_task(), loop)
```

### patch_adk.py
**Purpose:** Contains runtime patches for the Google ADK library to enhance behavior
**Import:** `from solace_agent_mesh.agent.sac.patch_adk import patch_adk`

**Functions:**
- `patch_adk() -> None` - Applies all necessary patches to the ADK library

**Usage Examples:**
```python
from solace_agent_mesh.agent.sac.patch_adk import patch_adk

# Apply patches before using ADK
patch_adk()
```

### task_execution_context.py
**Purpose:** State management class for single, in-flight agent tasks
**Import:** `from solace_agent_mesh.agent.sac.task_execution_context import TaskExecutionContext`

**Classes:**
- `TaskExecutionContext(task_id: str, a2a_context: Dict[str, Any])` - Encapsulates runtime state for a single agent task
  - `cancel() -> None` - Signals that the task should be cancelled
  - `is_cancelled() -> bool` - Checks if cancellation event has been set
  - `append_to_streaming_buffer(text: str) -> None` - Appends text to streaming buffer
  - `flush_streaming_buffer() -> str` - Returns and clears streaming buffer content
  - `get_streaming_buffer_content() -> str` - Returns buffer content without clearing
  - `append_to_run_based_buffer(text: str) -> None` - Appends text to run-based response buffer
  - `register_peer_sub_task(sub_task_id: str, correlation_data: Dict[str, Any]) -> None` - Adds peer sub-task tracking
  - `claim_sub_task_completion(sub_task_id: str) -> Optional[Dict[str, Any]]` - Atomically retrieves and removes sub-task data
  - `register_parallel_call_sent(invocation_id: str) -> None` - Registers new parallel tool call
  - `handle_peer_timeout(sub_task_id: str, correlation_data: Dict, timeout_sec: int, invocation_id: str) -> bool` - Handles peer timeout
  - `record_parallel_result(result: Dict, invocation_id: str) -> bool` - Records parallel tool call result
  - `clear_parallel_invocation_state(invocation_id: str) -> None` - Removes completed invocation state
  - `register_produced_artifact(filename: str, version: int) -> None` - Tracks newly created artifacts
  - `add_artifact_signal(signal: Dict[str, Any]) -> None` - Adds artifact return signal
  - `get_and_clear_artifact_signals() -> List[Dict[str, Any]]` - Retrieves and clears artifact signals
  - `set_event_loop(loop: asyncio.AbstractEventLoop) -> None` - Stores event loop reference
  - `get_event_loop() -> Optional[asyncio.AbstractEventLoop]` - Retrieves stored event loop

**Usage Examples:**
```python
from solace_agent_mesh.agent.sac.task_execution_context import TaskExecutionContext

# Create task context
a2a_context = {
    "logical_task_id": "task-123",
    "user_id": "user-456",
    "session_id": "session-789"
}
task_context = TaskExecutionContext("task-123", a2a_context)

# Use streaming buffer
task_context.append_to_streaming_buffer("Hello ")
task_context.append_to_streaming_buffer("world!")
content = task_context.flush_streaming_buffer()  # Returns "Hello world!"

# Track peer sub-tasks
correlation_data = {
    "peer_agent_name": "math-agent",
    "adk_function_call_id": "call-123"
}
task_context.register_peer_sub_task("sub-task-456", correlation_data)

# Handle completion
completed_data = task_context.claim_sub_task_completion("sub-task-456")
if completed_data:
    print(f"Sub-task completed: {completed_data}")
```

# content_hash: 79c043cb89eecb2b6a6f05113ff223c5064861f1a537d8e832ebb2738f5801e8
