Metadata-Version: 2.4
Name: h_message_bus
Version: 0.0.41
Summary: Message bus integration for HAI
Author-email: shoebill <shoebill.hai@gmail.com>
Classifier: Programming Language :: Python :: 3
Classifier: License :: OSI Approved :: MIT License
Classifier: Operating System :: OS Independent
Requires-Python: >=3.10
Description-Content-Type: text/markdown
Requires-Dist: nats-py~=2.10.0
Provides-Extra: dev
Requires-Dist: pytest; extra == "dev"

# H Message Bus

A message bus integration for HAI applications, based on NATS.io.

## Overview

H Message Bus provides asynchronous messaging for HAI applications on top of NATS.io. Components communicate through publish-subscribe topics, using either fire-and-forget messages or request-response exchanges.

## Features

- **Asynchronous Communication**: Built for modern, non-blocking I/O operations
- **Flexible Message Routing**: Publish and subscribe to specific topics
- **High Reliability**: Automatic reconnection handling and configurable timeouts
- **Simple API**: Focus on core messaging functionality with minimal dependencies

## Installation

```bash
pip install h_message_bus
```

## Requirements

- Python 3.10+
- NATS.io server (can be run via Docker)

## Topics

H Message Bus routes messages by topic string. The recommended convention is `hai.[source].[destination].[action]`, where the action may itself span several dot-separated segments (for example `chat.send`).

Example topics:

| Topic String           | Description                         |
|------------------------|-------------------------------------|
| `hai.ai.tg.chat.send`  | AI sending message to Telegram chat |
| `hai.ai.vectors.save`  | AI saving data to vector database   |
| `hai.ai.vectors.query` | AI querying vector database         |
| `hai.tg.ai.chat.send`  | Telegram sending message to AI      |

You can use these example topics or create your own topic strings based on your application's needs.
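
If you build topic strings in several places, a small helper can keep them consistent with the convention. The sketch below is illustrative only: `build_topic` is a hypothetical function, not part of `h_message_bus`, and the allowed character set is an assumption chosen to keep dots, whitespace, and the NATS wildcards (`*` and `>`) out of individual segments.

```python
import re

# Hypothetical helper, not part of h_message_bus.
# Assumption: segments are limited to letters, digits, "_" and "-".
_SEGMENT = re.compile(r"[A-Za-z0-9_-]+")


def build_topic(source: str, destination: str, *action: str) -> str:
    """Join segments into 'hai.<source>.<destination>.<action...>'."""
    if not action:
        raise ValueError("at least one action segment is required")
    segments = ["hai", source, destination, *action]
    for segment in segments:
        # Reject empty segments and anything outside the allowed characters
        if not _SEGMENT.fullmatch(segment):
            raise ValueError(f"invalid topic segment: {segment!r}")
    return ".".join(segments)


print(build_topic("ai", "vectors", "save"))     # hai.ai.vectors.save
print(build_topic("tg", "ai", "chat", "send"))  # hai.tg.ai.chat.send
```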

## Quick Start

### Start a NATS Server

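The easiest way to get started is with Docker. The command below assumes a `docker-compose.yml` that defines a NATS service is present in the current directory: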

```bash
docker-compose up -d
```

### Create a Publisher

```python
import asyncio
import uuid
from h_message_bus import NatsConfig, NatsPublisherAdapter, HaiMessage, NatsClientRepository

async def main():
    # Configure NATS connection
    config = NatsConfig(server="nats://localhost:4222")

    # Create NATS client repository
    client_repository = NatsClientRepository(config)

    # Create publisher adapter
    publisher = NatsPublisherAdapter(client_repository)

    # Connect to NATS
    await client_repository.connect()

    # Create and publish a message
    message = HaiMessage(
        id=str(uuid.uuid4()),
        topic="hai.tg.ai.chat.send",
        payload={"text": "Hello AI, this is a message from Telegram", "chat_id": 12345}
    )

    # Publish message
    await publisher.publish(message)

    # Clean up
    await client_repository.close()

if __name__ == "__main__":
    asyncio.run(main())
```

### Create a Subscriber

```python
import asyncio
from h_message_bus import NatsConfig, NatsSubscriberAdapter, HaiMessage, NatsClientRepository, MessageProcessor

class MyMessageProcessor(MessageProcessor):
    async def process(self, message: HaiMessage):
        print(f"Received message: {message.id}")
        print(f"Topic: {message.topic}")
        print(f"Payload: {message.payload}")
        print(f"Timestamp: {message.timestamp}")

async def main():
    # Configure NATS connection
    config = NatsConfig(server="nats://localhost:4222")

    # Create NATS client repository
    client_repository = NatsClientRepository(config)

    # Create subscriber
    subscriber = NatsSubscriberAdapter(client_repository)

    # Connect to NATS
    await client_repository.connect()

    # Create message processor
    processor = MyMessageProcessor()

    # Subscribe to a topic
    await subscriber.subscribe("hai.tg.ai.chat.send", processor)

    # Keep the application running
    try:
        print("Subscriber running. Press Ctrl+C to exit.")
        while True:
            await asyncio.sleep(1)
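    finally:
        # Clean up. `finally` also runs when Ctrl+C cancels this task, which
        # `except KeyboardInterrupt` inside a coroutine does not reliably catch.
        await client_repository.close()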

if __name__ == "__main__":
    asyncio.run(main())
```
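
As an alternative to the polling loop, a long-running subscriber can wait on an `asyncio.Event` that is set by a signal handler, which also covers `SIGTERM` (for example when a container is stopped). This is a minimal sketch using only the standard library; `run_until_stopped` and `install_stop_handlers` are hypothetical helper names, and the `cleanup` placeholder stands in for `client_repository.close()`.

```python
import asyncio
import signal


async def run_until_stopped(stop: asyncio.Event, cleanup) -> None:
    """Wait until `stop` is set, then always await the cleanup coroutine function."""
    try:
        await stop.wait()
    finally:
        await cleanup()


def install_stop_handlers(stop: asyncio.Event) -> None:
    """Set `stop` on SIGINT or SIGTERM. Must be called from a running event loop."""
    loop = asyncio.get_running_loop()
    for sig in (signal.SIGINT, signal.SIGTERM):
        try:
            loop.add_signal_handler(sig, stop.set)
        except NotImplementedError:
            # add_signal_handler is not available on Windows event loops
            pass


async def main() -> None:
    stop = asyncio.Event()
    install_stop_handlers(stop)

    async def cleanup() -> None:
        # Placeholder: in the subscriber example this would be
        # `await client_repository.close()`
        print("Connection closed")

    print("Subscriber running. Press Ctrl+C to exit.")
    await run_until_stopped(stop, cleanup)
```

Start it with `asyncio.run(main())`. Because the handler only sets an event, the coroutine exits normally and the cleanup runs without a traceback.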

## Advanced Usage

### Request-Response Pattern

```python
import asyncio
import uuid
from h_message_bus import NatsConfig, NatsPublisherAdapter, HaiMessage, NatsClientRepository

async def main():
    # Configure NATS connection
    config = NatsConfig(server="nats://localhost:4222")

    # Create NATS client repository
    client_repository = NatsClientRepository(config)

    # Create publisher adapter
    publisher = NatsPublisherAdapter(client_repository)

    # Connect to NATS
    await client_repository.connect()

    request_message = HaiMessage(
        id=str(uuid.uuid4()),
        topic="hai.ai.vectors.query",
        payload={"query": "find similar documents", "limit": 10}
    )

    # Send request and wait for response (with timeout)
    response = await publisher.request(request_message, timeout=5.0)

    if response:
        print(f"Received response: {response.payload}")
    else:
        print("Request timed out")

    # Clean up
    await client_repository.close()

if __name__ == "__main__":
    asyncio.run(main())
```
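
When a request times out, you may want to retry before giving up. The helper below is a sketch, not part of `h_message_bus`: `request_with_retry` is a hypothetical name, and it assumes (as the `if response:` check above suggests) that a timed-out request returns `None` rather than raising. It takes any zero-argument coroutine function, so it does not depend on the library itself.

```python
import asyncio
from typing import Any, Awaitable, Callable, Optional


async def request_with_retry(
    send: Callable[[], Awaitable[Optional[Any]]],
    attempts: int = 3,
    backoff: float = 0.5,
) -> Optional[Any]:
    """Call `send` until it returns a non-None response or attempts run out."""
    for attempt in range(1, attempts + 1):
        response = await send()
        if response is not None:
            return response
        if attempt < attempts:
            # Linear backoff: wait a little longer after each failed attempt
            await asyncio.sleep(backoff * attempt)
    return None


# Intended use inside main() from the example above:
# response = await request_with_retry(
#     lambda: publisher.request(request_message, timeout=5.0)
# )
```

Retrying is only safe when the service handling the request is idempotent, since a slow response and a lost response look the same to the caller.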

### Creating a Service for Request-Response

```python
import asyncio
import uuid
from h_message_bus import NatsConfig, NatsSubscriberAdapter, NatsPublisherAdapter, HaiMessage, NatsClientRepository, MessageProcessor

class RequestResponseProcessor(MessageProcessor):
    def __init__(self, publisher):
        self.publisher = publisher

    async def process(self, request: HaiMessage):
        print(f"Received request: {request.id}")
        print(f"Payload: {request.payload}")

        # Process the request
        result = {"status": "success", "data": {"result": 42}}

        # Create a response message
        response = HaiMessage(
            id=str(uuid.uuid4()),
            topic=f"{request.topic}.response",
            payload=result
        )

        # Publish the response
        await self.publisher.publish(response)

async def main():
    # Configure NATS connection
    config = NatsConfig(server="nats://localhost:4222")

    # Create NATS client repository
    client_repository = NatsClientRepository(config)

    # Connect to NATS
    await client_repository.connect()

    # Create publisher and subscriber
    publisher = NatsPublisherAdapter(client_repository)
    subscriber = NatsSubscriberAdapter(client_repository)

    # Create request processor that uses the publisher
    processor = RequestResponseProcessor(publisher)

    # Subscribe to the request topic
    await subscriber.subscribe("hai.ai.vectors.query", processor)

    # Keep the application running
    try:
        print("Service running. Press Ctrl+C to exit.")
        while True:
            await asyncio.sleep(1)
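    finally:
        # Clean up. `finally` also runs when Ctrl+C cancels this task, which
        # `except KeyboardInterrupt` inside a coroutine does not reliably catch.
        await client_repository.close()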

if __name__ == "__main__":
    asyncio.run(main())
```

## Configuration Options

The `NatsConfig` class allows you to customize your NATS connection:

| Parameter                | Description                                  | Default       |
|--------------------------|----------------------------------------------|---------------|
| `server`                 | NATS server address                          | Required      |
| `max_reconnect_attempts` | Maximum reconnection attempts                | 10            |
| `reconnect_time_wait`    | Time between reconnection attempts (seconds) | 2             |
| `connection_timeout`     | Connection timeout (seconds)                 | 2             |
| `ping_interval`          | Interval for ping frames (seconds)           | 20            |
| `max_outstanding_pings`  | Maximum unanswered pings before disconnect   | 5             |

## API Reference

### Exported Classes

The following classes are exported directly from the package:

- `NatsConfig` - Configuration for the NATS connection
- `HaiMessage` - Message structure for HAI communication
- `NatsPublisherAdapter` - Adapter for publishing messages
- `NatsSubscriberAdapter` - Adapter for subscribing to messages
- `MessageProcessor` - Base class for handling incoming messages (subclass it and implement `process`)
- `NatsClientRepository` - Low-level NATS client operations

### HaiMessage Structure

The `HaiMessage` class is the core data structure used for all messaging:

```python
class HaiMessage:
    id: str               # Unique identifier for the message
    topic: str            # The topic or channel for the message
    payload: dict         # Actual message data
    timestamp: str | None = None  # Optional message creation timestamp (ISO format)
```
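
To make the field layout concrete, the sketch below uses a stand-in dataclass with the same four fields and round-trips it through JSON. `MessageSketch`, `new_message`, `to_json`, and `from_json` are hypothetical names for illustration; this is not the library's class, and how `h_message_bus` actually serializes messages on the wire is not described here.

```python
import json
import uuid
from dataclasses import asdict, dataclass
from datetime import datetime, timezone
from typing import Optional


@dataclass
class MessageSketch:
    """Stand-in with the same four fields as HaiMessage; not the library class."""
    id: str
    topic: str
    payload: dict
    timestamp: Optional[str] = None


def new_message(topic: str, payload: dict) -> MessageSketch:
    # Fill in a random UUID and the current UTC time in ISO 8601 format
    return MessageSketch(
        id=str(uuid.uuid4()),
        topic=topic,
        payload=payload,
        timestamp=datetime.now(timezone.utc).isoformat(),
    )


def to_json(message: MessageSketch) -> str:
    return json.dumps(asdict(message))


def from_json(data: str) -> MessageSketch:
    return MessageSketch(**json.loads(data))


original = new_message("hai.ai.vectors.save", {"text": "hello"})
# The round trip preserves every field
assert from_json(to_json(original)) == original
```

The payload must contain only JSON-serializable values for this round trip to work.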

## Contributing

Contributions are welcome! Please feel free to submit a Pull Request.

## License

This project is licensed under the MIT License - see the LICENSE file for details.
