Metadata-Version: 2.4
Name: kurier-py
Version: 0.1.0
Summary: Kurier — a unified messaging abstraction layer supporting Kafka, RabbitMQ, and Azure Service Bus
Project-URL: Homepage, https://github.com/peteriadamgabor/kurier-py
Project-URL: Repository, https://github.com/peteriadamgabor/kurier-py
Project-URL: Issues, https://github.com/peteriadamgabor/kurier-py/issues
Author: Peter Adam
License: MIT
License-File: LICENSE
Keywords: async,azure-service-bus,cloudevents,kafka,message-bus,messaging,pubsub,rabbitmq
Classifier: Development Status :: 3 - Alpha
Classifier: Framework :: AsyncIO
Classifier: Intended Audience :: Developers
Classifier: License :: OSI Approved :: MIT License
Classifier: Operating System :: OS Independent
Classifier: Programming Language :: Python :: 3
Classifier: Programming Language :: Python :: 3.10
Classifier: Programming Language :: Python :: 3.11
Classifier: Programming Language :: Python :: 3.12
Classifier: Programming Language :: Python :: 3.13
Classifier: Topic :: Software Development :: Libraries :: Application Frameworks
Classifier: Topic :: System :: Networking
Classifier: Typing :: Typed
Requires-Python: >=3.10
Requires-Dist: defusedxml>=0.7.1
Requires-Dist: pydantic>=2.11.7
Requires-Dist: pyyaml>=6.0.2
Requires-Dist: tomli>=1.1.0; python_version < '3.11'
Requires-Dist: xmltodict>=0.14.2
Provides-Extra: all
Requires-Dist: azure-servicebus>=7.14.2; extra == 'all'
Requires-Dist: kafka-python-ng>=2.2.3; extra == 'all'
Requires-Dist: pika>=1.3.2; extra == 'all'
Provides-Extra: azure
Requires-Dist: azure-servicebus>=7.14.2; extra == 'azure'
Provides-Extra: dev
Requires-Dist: pytest-asyncio>=1.0.0; extra == 'dev'
Requires-Dist: pytest-cov>=6.2.1; extra == 'dev'
Requires-Dist: pytest>=8.4.1; extra == 'dev'
Requires-Dist: ruff>=0.12.0; extra == 'dev'
Requires-Dist: testcontainers[kafka,rabbitmq]>=4.9.2; extra == 'dev'
Provides-Extra: kafka
Requires-Dist: kafka-python-ng>=2.2.3; extra == 'kafka'
Provides-Extra: rabbitmq
Requires-Dist: pika>=1.3.2; extra == 'rabbitmq'
Description-Content-Type: text/markdown

<p align="center">
  <img src="assets/logo.svg" alt="Kurier Logo" width="300">
</p>

# Kurier

**Kurier** is a unified messaging abstraction layer for Python, designed to simplify interactions with multiple message brokers. It supports **Apache Kafka**, **RabbitMQ**, **Azure Service Bus**, and an **In-Memory** transport, providing a consistent async API for publishing and consuming messages.

Inspired by .NET's MassTransit, Kurier aims to bring robust, type-safe, and easy-to-use messaging patterns to the Python ecosystem.

## Features

- **Multi-Line Support:** Seamlessly switch between Kafka, RabbitMQ, Azure Service Bus, and In-Memory lines.
- **Unified Async API:** Use the same `await publish_async()` and handler registration logic regardless of the underlying transport.
- **Typed Passengers:** Built-in support for Pydantic models and dataclasses, ensuring message schema validation.
- **Flexible Configuration:** Configure lines using JSON, TOML, YAML, XML, or INI files, with `${VAR}` environment variable interpolation.
- **Extensible:** Register custom transport implementations without modifying Kurier's core.
- **Line Registry:** All lines are registered by name, enabling string-based lookups and enforcing global uniqueness.
- **Distributed Tracing:** Automatic `correlation_id` (UUIDv7) injection for end-to-end message tracking.
- **Asyncio Native:** `publish_async()` and `publish_batch_async()` are the primary APIs; sync wrappers are provided for convenience.
- **Qualified Routing:** Automatically uses fully-qualified type names (`module.ClassName`) to prevent handler collisions in large applications.
- **Python Compatibility:** Supports Python 3.10 through 3.13.

## Installation

```bash
pip install kurier-py
```

To install with specific transport support:

```bash
pip install "kurier-py[azure]"
pip install "kurier-py[kafka]"
pip install "kurier-py[rabbitmq]"
pip install "kurier-py[all]"
```

## Quick Start

### 1. Define a Message

```python
from pydantic import BaseModel
from datetime import datetime

class OrderCreated(BaseModel):
    order_id: str
    amount: float
    created_at: datetime  # Nested datetimes are supported recursively
```

### 2. Configure and Publish

```python
import asyncio
from datetime import datetime

from kurier.transports.inmemory.bus import InMemoryLine
from kurier.core.exceptions import KurierPublishError

# OrderCreated is the model defined in step 1

async def main():
    # Lines are registered by name for global lookup
    line = InMemoryLine(name="orders_line")

    message = OrderCreated(
        order_id="12345", 
        amount=99.99,
        created_at=datetime.now()
    )

    try:
        # publish_async is the native async API
        await line.publish_async(message)
    except KurierPublishError as e:
        print(f"Failed to publish: {e}")

    await line.close()

asyncio.run(main())
```

### 3. Consume at a Station

Register handlers with `@Station.add_handler`. The handler receives two positional arguments:

1. **The typed message** — Pydantic model, dataclass, or primitive. The type annotation determines routing.
2. **The full envelope dict** — contains `correlation_id`, `object_type_name`, `message_payload`, and `metadata`.

Use `*context` in the signature to accept the envelope as an optional variadic argument:

```python
from kurier import Station, BusType

# Handle messages from ANY in-memory line
@Station.add_handler(BusType.IN_MEMORY)
async def handle_order_created(message: OrderCreated, *context):
    print(f"Received order: {message.order_id} for ${message.amount}")
    # context[0] is the full envelope dict (if needed)

# Handle messages from a SPECIFIC line by name
@Station.add_handler("payments_kafka")
async def handle_payment_order(message: OrderCreated, *context):
    print(f"Payment line received: {message.order_id}")
```

When a message arrives on a line, Kurier first looks up handlers registered under that line's name, then handlers registered under its `BusType`. If both match, both are invoked.

## Handler Routing

### By BusType (category-level)

Handlers registered with a `BusType` receive messages from **all** lines of that type:

```python
@Station.add_handler(BusType.KAFKA)
def log_all_kafka_messages(message: OrderCreated, *context):
    print(f"Kafka message: {message}")
```

### By Line Name (instance-level)

Handlers registered with a string name only receive messages from that specific line:

```python
@Station.add_handler("orders_kafka")
def handle_orders_only(message: OrderCreated, *context):
    print(f"Orders line: {message}")
```

### Multiple Handlers

Multiple handlers can be registered for the same message type. All matching handlers are invoked:

```python
@Station.add_handler(BusType.IN_MEMORY)
def handler_a(message: OrderCreated, *context):
    print("Handler A")

@Station.add_handler(BusType.IN_MEMORY)
def handler_b(message: OrderCreated, *context):
    print("Handler B")

# Both handler_a and handler_b are called when an OrderCreated arrives
```

### Sync and Async Handlers

Both synchronous and asynchronous handlers are supported. Sync handlers are automatically offloaded to a thread-pool executor so they never block the event loop:

```python
@Station.add_handler(BusType.IN_MEMORY)
def sync_handler(message: OrderCreated, *context):
    db.save(message)  # runs in a thread-pool executor

@Station.add_handler(BusType.IN_MEMORY)
async def async_handler(message: OrderCreated, *context):
    await external_api.notify(message)  # runs on the event loop
```
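One way to implement that split, sketched with the standard library only. `invoke_handler` is a hypothetical helper, not part of Kurier's API, and assumes handlers take `(message, envelope)` positionally as described in Quick Start step 3.

```python
import asyncio
import inspect
import threading
from typing import Any, Callable


async def invoke_handler(handler: Callable[..., Any], message: Any, envelope: dict[str, Any]) -> Any:
    """Await coroutine handlers; run plain functions in the loop's default thread pool."""
    if inspect.iscoroutinefunction(handler):
        return await handler(message, envelope)
    loop = asyncio.get_running_loop()
    # run_in_executor forwards positional arguments only, matching the (message, *context) contract
    return await loop.run_in_executor(None, handler, message, envelope)


def sync_handler(message: str, *context: Any) -> str:
    return threading.current_thread().name  # a worker thread, not the loop's thread


async def async_handler(message: str, *context: Any) -> str:
    return threading.current_thread().name  # the thread running the event loop


async def demo() -> tuple[str, str, str]:
    loop_thread = threading.current_thread().name
    sync_thread = await invoke_handler(sync_handler, "hello", {})
    async_thread = await invoke_handler(async_handler, "hello", {})
    return loop_thread, sync_thread, async_thread
```

Running `asyncio.run(demo())` shows the async handler on the loop's thread and the sync handler on a separate worker thread, which is why a blocking `db.save()` does not stall other handlers.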

### Supported Message Types

Handlers support Pydantic models, dataclasses, and primitive types:

```python
from dataclasses import dataclass

@dataclass
class SensorReading:
    device_id: str
    temperature: float

@Station.add_handler(BusType.KAFKA)
def handle_sensor(reading: SensorReading, *context):
    print(f"Device {reading.device_id}: {reading.temperature}C")

@Station.add_handler(BusType.IN_MEMORY)
def handle_raw(message: str, *context):
    print(f"Raw string: {message}")
```

### Handler Errors

Handler exceptions propagate to the caller (the consumer loop / `Station.dispatch`). If a message payload cannot be converted to the handler's target type, a `KurierDeserializationError` is raised:

```python
from kurier.core.exceptions import KurierDeserializationError
```
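A sketch of that conversion step under stated assumptions: `convert_payload` and `DeserializationError` are hypothetical stand-ins (the real exception is `KurierDeserializationError`), and the order dataclass, then Pydantic v2 `model_validate`, then primitive is an illustration rather than Kurier's actual logic.

```python
import dataclasses
from typing import Any, TypeVar

T = TypeVar("T")


class DeserializationError(Exception):
    """Hypothetical stand-in for KurierDeserializationError."""


def convert_payload(payload: Any, target: type[T]) -> T:
    """Convert a decoded JSON payload into the handler's annotated type."""
    try:
        if dataclasses.is_dataclass(target):
            return target(**payload)  # unknown or missing fields raise TypeError
        if hasattr(target, "model_validate"):  # Pydantic v2 models
            return target.model_validate(payload)  # ValidationError subclasses ValueError
        if target in (str, int, float):
            return target(payload)  # e.g. int("abc") raises ValueError
    except (TypeError, ValueError) as exc:
        raise DeserializationError(
            f"cannot convert payload to {target.__qualname__}: {exc}"
        ) from exc
    raise DeserializationError(f"unsupported target type {target!r}")


@dataclasses.dataclass
class SensorReading:
    device_id: str
    temperature: float
```

Wrapping the original exception with `from exc` keeps the underlying cause in the traceback, so the consumer loop can log both the conversion failure and its reason.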

## Batch Publishing

Efficiently send multiple messages in a single logical operation. Kurier handles chunking and assigns a batch-level correlation ID:

```python
from datetime import datetime

messages = [OrderCreated(order_id=str(i), amount=10.0 * i, created_at=datetime.now()) for i in range(100)]
await line.publish_batch_async(messages, meta_data={"source": "bulk_import"}, batch_size=50)
```
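How the chunking might work, as a stdlib-only sketch. `chunked`, `plan_batches`, and the `batch_correlation_id` / `chunk_index` keys are hypothetical names chosen for illustration, and `uuid4` stands in for Kurier's UUIDv7.

```python
import uuid
from itertools import islice
from typing import Any, Iterable, Iterator


def chunked(items: Iterable[Any], size: int) -> Iterator[list[Any]]:
    """Yield successive lists of at most `size` items."""
    if size < 1:
        raise ValueError("batch_size must be >= 1")
    it = iter(items)
    while chunk := list(islice(it, size)):
        yield chunk


def plan_batches(messages: list[Any], meta_data: dict[str, Any], batch_size: int) -> list[dict[str, Any]]:
    """Group messages into chunks that all share one batch-level correlation ID."""
    batch_id = str(uuid.uuid4())  # hypothetical: Kurier uses UUIDv7
    return [
        {"batch_correlation_id": batch_id, "chunk_index": i, "messages": chunk, "meta_data": meta_data}
        for i, chunk in enumerate(chunked(messages, batch_size))
    ]
```

With 100 messages and `batch_size=50` this yields two chunks of 50; with 101 messages, a third chunk holds the remaining one. Because every chunk shares the same batch ID, the pieces can be correlated after the fact.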

## CloudEvents Interop

Kurier supports CloudEvents as an opt-in serializer, intended for cross-language interoperability where both sides explicitly agree on event type strings and broker topology.

### Programmatic Usage

```python
import asyncio
from datetime import datetime

from kurier import Station
from kurier.transports.inmemory.bus import InMemoryLine
from kurier.utilities.serializers import CloudEventSerializer

# The handler is keyed by the CloudEvents `type` string on the "orders" line
@Station.add_event_handler("com.example.orders.created", "orders")
async def handle_order(message: OrderCreated, context):
    print(context["event_type"])

async def main():
    line = InMemoryLine(
        name="orders",
        serializer=CloudEventSerializer(source="kurier://orders-service"),
    )
    await line.publish_async(
        OrderCreated(order_id="12345", amount=99.99, created_at=datetime.now()),
        meta_data={"cloudevents_type": "com.example.orders.created"},
    )
    await line.close()

asyncio.run(main())
```
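For reference, a CloudEvents 1.0 event in JSON structured mode carries the required attributes `specversion`, `id`, `source`, and `type`, with the payload under `data`. The stdlib-only sketch below builds and parses such an event following the public CloudEvents specification; it is not necessarily byte-for-byte what `CloudEventSerializer` emits, and both helper names are hypothetical.

```python
import json
import uuid
from datetime import datetime, timezone
from typing import Any

REQUIRED_ATTRIBUTES = ("specversion", "id", "source", "type")


def to_structured_cloudevent(data: dict[str, Any], event_type: str, source: str) -> str:
    """Serialize `data` as a CloudEvents 1.0 JSON event (structured content mode)."""
    event = {
        "specversion": "1.0",                             # required
        "id": str(uuid.uuid4()),                          # required; unique per source
        "source": source,                                 # required; URI-reference
        "type": event_type,                               # required; e.g. reverse-DNS string
        "time": datetime.now(timezone.utc).isoformat(),   # optional; RFC 3339 timestamp
        "datacontenttype": "application/json",            # optional
        "data": data,
    }
    return json.dumps(event)


def parse_structured_cloudevent(raw: str) -> tuple[str, Any]:
    """Return (type, data); the type string is what an event handler would be keyed on."""
    event = json.loads(raw)
    missing = [name for name in REQUIRED_ATTRIBUTES if name not in event]
    if missing:
        raise ValueError(f"not a CloudEvent, missing attributes: {missing}")
    return event["type"], event.get("data")
```

Because routing rides on the `type` string rather than a Python class name, a consumer written in another language only needs to agree on `"com.example.orders.created"`.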

### File-Based Configuration

```toml
[rabbitmq.orders]
name = "orders"
host = "localhost"
port = 5672
username = "guest"
password = "guest"
virtual_host = "/"
exchange_name = "contracts.orders"
routing_key = "submit-order"
serializer = "cloudevents"
cloudevents_source = "kurier://orders-service"
```

### Topology Strategy

For interoperability, Kurier uses explicit broker configuration rather than trying to reproduce another framework's topology conventions automatically.

- RabbitMQ: set `exchange_name`, `exchange_type`, `routing_key`, and `queue_name` explicitly
- Azure Service Bus: set `queue_name` or `topic_name` / `subscription_name` explicitly
- Kafka: set `topic_name` explicitly

This keeps topology ownership clear and avoids coupling Kurier to MassTransit-specific infrastructure rules.

## Distributed Tracing

Every message in Kurier carries a `correlation_id` (UUIDv7). This ID is preserved across transport boundaries, allowing you to trace a message from its initial `publish` to its final execution in a handler.

Logs emitted by **Hermes**, Kurier's built-in logging utility, automatically include this ID at the `DEBUG` and `TRACE` levels.
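UUIDv7 (RFC 9562) stores a 48-bit Unix millisecond timestamp in its most significant bits, so IDs sort roughly by creation time, which helps when scanning logs. The standard library only gained `uuid.uuid7()` in Python 3.14, so the following is a minimal hand-rolled sketch for older versions; it is not Kurier's generator and makes no ordering guarantee for IDs created within the same millisecond.

```python
import os
import time
import uuid


def uuid7() -> uuid.UUID:
    """Generate a UUIDv7: 48-bit Unix ms timestamp, version 7, variant bits, random bits."""
    unix_ms = time.time_ns() // 1_000_000
    rand = int.from_bytes(os.urandom(10), "big")   # 80 random bits
    rand_a = rand >> 68                            # top 12 random bits
    rand_b = rand & ((1 << 62) - 1)                # low 62 random bits
    value = (unix_ms & ((1 << 48) - 1)) << 80      # bits 127..80: timestamp
    value |= 0x7 << 76                             # bits 79..76: version 7
    value |= rand_a << 64                          # bits 75..64: rand_a
    value |= 0b10 << 62                            # bits 63..62: RFC 4122/9562 variant
    value |= rand_b                                # bits 61..0: rand_b
    return uuid.UUID(int=value)


def uuid7_unix_ms(u: uuid.UUID) -> int:
    """Recover the embedded millisecond timestamp, e.g. to order log lines."""
    return u.int >> 80
```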

## Configuration

### Environment Variable Interpolation

Configuration values can reference environment variables using `${VAR}` or `${VAR:-default}` syntax. If a variable is unset and no default is provided, a `ValueError` is raised at load time — this prevents silent misconfiguration:

```toml
[rabbitmq]
name = "main_line"
host = "${RABBIT_HOST:-localhost}"
port = 5672
username = "${RABBIT_USER}"
password = "${RABBIT_PASS}"
virtual_host = "/"
```

### From File

Kurier supports multiple configuration formats (JSON, TOML, YAML, XML, INI/CFG). Here is an example `config.toml`:

```toml
[rabbitmq]
name = "main_line"
host = "localhost"
port = 5672
username = "guest"
password = "guest"
virtual_host = "/"
queue_name = "orders_queue"

[kafka]
name = "events_kafka"
bootstrap_servers = ["localhost:9092"]
topic_name = "events"
group_id = "kurier-consumer"
```

Load and initialize all lines from config:

```python
import asyncio
from kurier.configurations.configurator import Configurator

async def main():
    lines = await Configurator.init_transports("config.toml")
    # lines = {"main_line": <RabbitMQLine>, "events_kafka": <KafkaLine>}

asyncio.run(main())
```
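This README shows two section layouts: `[rabbitmq]`, where the section holds one line's settings, and `[rabbitmq.orders]`, where each sub-table is a line. The sketch below indexes a parsed config dict by line name under that assumption; `index_lines` and the use of a `"name"` key as the discriminator are hypothetical, not a description of how `Configurator` works internally.

```python
from typing import Any


def index_lines(config: dict[str, Any]) -> dict[str, tuple[str, dict[str, Any]]]:
    """Map each line name to (transport key, settings) from a parsed config dict.

    Assumed layouts:
      [rabbitmq]         -> the section itself holds one line's settings
      [rabbitmq.orders]  -> each sub-table under the section is one line
    """
    lines: dict[str, tuple[str, dict[str, Any]]] = {}
    for transport, section in config.items():
        # A section with its own "name" key is a single line; otherwise iterate sub-tables.
        candidates = [section] if "name" in section else list(section.values())
        for settings in candidates:
            name = settings["name"]
            if name in lines:  # line names must be globally unique
                raise ValueError(f"duplicate line name {name!r}")
            lines[name] = (transport, settings)
    return lines
```

On Python 3.11+, `tomllib.load()` on `config.toml` opened in binary mode returns a dict of the shape this function expects.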

### Programmatic

Lines can also be created directly:

```python
from kurier.transports.kafka.kafkaline import KafkaLine
from kurier.configurations.models.kafka import KafkaConfig

config = KafkaConfig(
    name="orders_kafka",
    bootstrap_servers=["localhost:9092"],
    topic_name="orders",
    group_id="order-service",
)
kafka_line = KafkaLine(config=config)

# Start consuming
KafkaLine.start_consumer("orders_kafka")
```

## Line Registry

All lines are registered globally by name when created. Names must be unique — creating a line with a duplicate name raises `ValueError`:

```python
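from kurier.core.basekurier import BaseKurier
from kurier.transports.inmemory.bus import InMemoryLine
from kurier.transports.kafka.kafkaline import KafkaLine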

# Lookup a line by name
line = BaseKurier.from_registry("orders_kafka")

# List all lines of a specific type
all_kafka_lines = KafkaLine.get_all_items()

# Duplicate names are rejected
line_a = InMemoryLine()  # name="in_memory_instance_line" -> OK
line_b = InMemoryLine()  # -> ValueError: already registered
```
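A minimal sketch of a name-unique registry, including why `_register()` belongs at the end of `__init__` (see the custom transport guide). `Line` and `MemoryLine` are hypothetical classes; the method names mirror the ones used above for readability but are not Kurier's implementation.

```python
from typing import ClassVar


class Line:
    """Hypothetical base class with a global, name-unique registry."""

    _registry: ClassVar[dict[str, "Line"]] = {}

    def __init__(self, name: str) -> None:
        self.name = name

    def _register(self) -> None:
        if self.name in Line._registry:
            raise ValueError(f"line {self.name!r} is already registered")
        Line._registry[self.name] = self

    @classmethod
    def from_registry(cls, name: str) -> "Line":
        return Line._registry[name]

    @classmethod
    def get_all_items(cls) -> list["Line"]:
        # Only instances of the calling class (or its subclasses)
        return [line for line in Line._registry.values() if isinstance(line, cls)]


class MemoryLine(Line):
    def __init__(self, name: str = "in_memory_instance_line", fail: bool = False) -> None:
        super().__init__(name)
        if fail:
            raise ConnectionError("setup failed")  # _register() is never reached
        self._register()  # register last, only after all setup succeeded
```

If setup raises, the half-built instance never enters the registry, so a retry with the same name does not hit a spurious duplicate-name error.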

## Logging

Kurier uses a built-in logging utility called **Hermes**. It provides a simple way to capture internal logs or route them to your preferred logging framework.

### Basic Usage

```python
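import sys
from kurier.hermes import Hermes

# Redirect all INFO logs to stdout
Hermes.set_consumer("INFO", lambda msg: print(f"[MyLog] {msg}", end=""))

def my_error_handler(msg: str) -> None:
    # Example custom handler: forward ERROR logs to stderr
    print(msg, end="", file=sys.stderr)

# Redirect ERROR logs to a custom handler
Hermes.set_consumer("ERROR", my_error_handler)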
```

### Integration with Python `logging`

```python
import logging
from kurier.hermes import Hermes

logger = logging.getLogger("kurier")

Hermes.set_consumer("INFO", lambda msg: logger.info(msg.strip()))
Hermes.set_consumer("ERROR", lambda msg: logger.error(msg.strip()))
Hermes.set_consumer("DEBUG", lambda msg: logger.debug(msg.strip()))
```

## Extending Kurier: Custom Transports

You can add support for other message brokers (e.g., Redis, SQS) without modifying Kurier's core code.

#### 1. Define a Configuration Model

```python
from pydantic import BaseModel

class RedisConfig(BaseModel):
    name: str
    host: str = "localhost"
    port: int = 6379
    channel: str
```

#### 2. Implement the Transport Line

Subclass `BaseKurier` and implement the abstract methods. Call `self._register()` at the end of `__init__` so that partially-initialized instances are never left in the registry on error.

```python
from typing import Any
from kurier.core.basekurier import BaseKurier
from kurier.core.bus_types import BusType
from kurier.utilities.compatibility import override

class RedisLine(BaseKurier):
    bus_type = BusType.IN_MEMORY  # or define your own

    def __init__(self, config: RedisConfig):
        super().__init__(config.name)
        self.config = config
        # ... set up connections ...
        self._register()  # register only after all init succeeds

    @override
    def _publish_to_bus(self, json_message: str, meta_data: dict[str, Any]):
        # Publish to Redis channel
        pass

    @override
    def _run_consumer_loop(self):
        # Subscribe and dispatch via Station
        # loop.run_until_complete(Station.dispatch(self, envelope_str))
        pass

    @override
    async def close(self):
        pass

    @classmethod
    @override
    def start_consumer(cls, name: str) -> bool:
        pass
```

#### 3. Register with Configurator (optional)

If you want file-based configuration support, register the transport class directly (no import-path strings needed):

```python
from kurier.configurations.configurator import Configurator

Configurator.register_transport(
    name="redis",
    class_or_path=RedisLine,          # direct class reference
    config_class=RedisConfig,
)
```

For lazy-loaded transports with optional dependencies, you can pass a dotted import path string instead:

```python
Configurator.register_transport(
    name="redis",
    class_or_path="my_project.transports.redis.RedisLine",
    config_class=RedisConfig,
    extra_requires="my_project[redis]",
)
```

Then in your `config.toml`:

```toml
[redis]
name = "cache_line"
host = "localhost"
port = 6379
channel = "notifications"
```

## Development

Kurier uses [uv](https://github.com/astral-sh/uv) for dependency management.

```bash
# Install dependencies including all transport extras and dev tools
uv sync --all-extras --dev

# Run the complete test suite (unit and serialization)
uv run pytest

# Run transport-specific mock integration tests
uv run pytest tests/integration/

# Lint and format
uv run ruff check src/ tests/
uv run ruff format src/ tests/
```

## License

Distributed under the MIT License. See `LICENSE` for more information.
