Metadata-Version: 2.4
Name: rapids-streams
Version: 0.2.0
Summary: Rapids - A Python package
Author-email: Fedir Skitsko <fed@azx.io>
Classifier: Development Status :: 3 - Alpha
Classifier: Intended Audience :: Developers
Classifier: Programming Language :: Python :: 3
Classifier: Programming Language :: Python :: 3.12
Requires-Python: >=3.12
Requires-Dist: orjson>=3.9.0
Requires-Dist: pydantic>=2.0.0
Requires-Dist: redis>=5.0.0
Description-Content-Type: text/markdown

# Rapids

A typed, async-first event router for Redis Streams. Build event-driven architectures with Pydantic-powered type safety, automatic event registration, and production-ready reliability features.

## Table of Contents

- [Features](#features)
- [Installation](#installation)
- [Quick Start](#quick-start)
- [Events](#events)
  - [BaseEvent](#baseevent)
  - [Event Inheritance](#event-inheritance)
  - [GenericEvent](#genericevent)
  - [Schema Versioning](#schema-versioning)
- [EventEmitter](#eventemitter)
  - [Causation Chains](#causation-chains)
- [Event Router](#event-router)
  - [Basic Setup](#basic-setup)
  - [Handler Matching](#handler-matching)
  - [Multiple Handlers](#multiple-handlers)
- [Correlation Tracking](#correlation-tracking)
- [Error Handling and Retries](#error-handling-and-retries)
  - [Two-Tier Retry System](#two-tier-retry-system)
  - [Handler-Level Configuration](#handler-level-configuration)
  - [Backoff Strategies](#backoff-strategies)
  - [Dead-Letter Queue (DLQ)](#dead-letter-queue-dlq)
  - [Exception Types](#exception-types)
- [Configuration Reference](#configuration-reference)
- [Complete Example](#complete-example)
- [Framework Integration](#framework-integration)
- [Redis Streams Primer](#redis-streams-primer)
- [Development](#development)
- [License](#license)

## Features

- **Typed Events** - Define events as Pydantic models with automatic naming and registration
- **Async-First** - Built for `asyncio` with non-blocking I/O
- **Flexible Routing** - Match events by type, base class, union types, or glob patterns
- **Consumer Groups** - Distributed consumption with automatic load balancing
- **Reliability** - Retries, backoff strategies, timeouts, and dead-letter queues
- **Correlation Tracking** - Trace event chains with correlation and causation IDs
- **Fast Serialization** - Uses `orjson` for roughly 6x faster JSON encoding than the standard library `json` module

## Installation

```bash
pip install rapids-streams
```

Requirements: Python 3.12+, Redis 6.2+ (for `XAUTOCLAIM` support)

## Quick Start

### 1. Define Events

```python
from rapids import BaseEvent

class UserCreated(BaseEvent):
    user_id: str
    email: str

class OrderPlaced(BaseEvent):
    order_id: str
    user_id: str
    total: float
```

Event names are derived automatically from the class name:
- `UserCreated` → `user.created`
- `OrderPlaced` → `order.placed`
- `S3ObjectUploaded` → `s3_object.uploaded`
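
The naming rule can be inferred from the examples above: the last CamelCase word becomes the action, and the preceding words are joined with underscores. The sketch below only illustrates that inferred rule. `event_name` is a hypothetical helper, not part of the Rapids API, and the library's handling of edge cases such as acronyms (`HTTPRequest`) may differ.

```python
import re


def event_name(class_name: str) -> str:
    """Derive a dotted event name from a CamelCase class name (illustrative only)."""
    # Split into words that start with an uppercase letter:
    # "S3ObjectUploaded" becomes ["s3", "object", "uploaded"].
    words = [w.lower() for w in re.findall(r"[A-Z][a-z0-9]*", class_name)]
    if len(words) < 2:
        return "".join(words)
    # The last word is the action; everything before it is the subject.
    *subject, action = words
    return f"{'_'.join(subject)}.{action}"


print(event_name("UserCreated"))       # user.created
print(event_name("S3ObjectUploaded"))  # s3_object.uploaded
```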

### 2. Emit Events

```python
from redis.asyncio import Redis
from rapids import EventEmitter

redis = Redis()

# Create an emitter instance
emitter = EventEmitter(redis=redis, stream="events:all")

# Emit events
await emitter.emit(UserCreated(user_id="123", email="user@example.com"))

# Batch emit (single Redis round-trip)
await emitter.emit([
    OrderPlaced(order_id="order-1", user_id="123", total=99.99),
    OrderPlaced(order_id="order-2", user_id="456", total=149.99),
])
```

### 3. Consume Events

```python
from rapids import EventRouter

router = EventRouter(
    redis=redis,
    stream="events:all",
    group="order-service",
)

@router.on(OrderPlaced)
async def handle_order(event: OrderPlaced):
    print(f"Processing order {event.order_id} for ${event.total}")

# Start consuming (blocks until stopped)
await router.start()
```

## Events

### BaseEvent

All custom events inherit from `BaseEvent`, a frozen (immutable) Pydantic model that provides these built-in fields:

| Field | Type | Description |
|-------|------|-------------|
| `id` | `str` | Unique UUID (auto-generated) |
| `timestamp` | `datetime` | UTC creation time (auto-generated) |
| `correlation_id` | `str \| None` | Groups related events together |
| `causation_id` | `str \| None` | ID of the event that caused this one |
| `metadata` | `dict` | Arbitrary key-value data |

```python
from rapids import BaseEvent

class DocumentCreated(BaseEvent):
    doc_id: str
    title: str
    author: str

# Events are frozen (immutable)
event = DocumentCreated(
    doc_id="doc-123",
    title="Annual Report",
    author="alice",
    correlation_id="request-abc",
    metadata={"source": "api", "version": 2},
)

print(event.id)           # "550e8400-e29b-41d4-a716-446655440000"
print(event.timestamp)    # 2024-11-28T12:00:00+00:00
print(event.event_type)   # "document.created"
```

### Event Inheritance

Create event hierarchies for shared fields and handler matching:

```python
class DocumentEvent(BaseEvent):
    """Base class for all document events."""
    doc_id: str

class DocumentCreated(DocumentEvent):
    title: str
    author: str

class DocumentDeleted(DocumentEvent):
    deleted_by: str

class DocumentArchived(DocumentEvent):
    archive_location: str
```

### GenericEvent

Unknown event types are deserialized as `GenericEvent`:

```python
from rapids import GenericEvent

@router.on(GenericEvent)
async def handle_unknown(event: GenericEvent):
    print(f"Unknown type: {event.raw_type}")
    print(f"Raw data: {event.data}")
```

### Schema Versioning

Events evolve over time. Rapids supports schema versioning to handle breaking changes without disrupting consumers.

#### Forward Compatibility

By default, events ignore unknown fields (`extra="ignore"`), so adding new optional fields is non-breaking:

```python
# v1: Original event
class UserCreated(BaseEvent):
    user_id: str
    email: str

# v2: Added optional field - old consumers ignore it
class UserCreated(BaseEvent):
    user_id: str
    email: str
    display_name: str | None = None  # New optional field
```

#### Breaking Changes with Migrations

For breaking changes (new required fields, type changes, renamed fields), increment `schema_version` and use a Pydantic `model_validator` to migrate old data:

```python
from datetime import datetime
from typing import ClassVar
from pydantic import model_validator
from rapids import BaseEvent

class UserCreated(BaseEvent):
    schema_version: ClassVar[int] = 2  # Increment for breaking changes

    user_id: str
    email: str
    created_at: datetime  # New required field in v2

    @model_validator(mode="before")
    @classmethod
    def _migrate(cls, data: dict) -> dict:
        # _schema_version is injected during deserialization
        version = data.pop("_schema_version", cls.schema_version)

        if version < 2:
            # v1 → v2: Use event timestamp as created_at
            data["created_at"] = data.get("_timestamp")

        return data
```

The deserializer injects `_schema_version` (the version stored in the message) and `_timestamp` (the event timestamp) for use in migrations.

#### Multiple Version Migrations

For events with multiple version upgrades, chain migrations sequentially:

```python
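from decimal import Decimal
from typing import ClassVar
from pydantic import model_validator
from rapids import BaseEvent

class OrderPlaced(BaseEvent):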
    schema_version: ClassVar[int] = 3

    order_id: str
    user_id: str
    total: Decimal      # Changed from float in v2
    currency: str       # Added in v3

    @model_validator(mode="before")
    @classmethod
    def _migrate(cls, data: dict) -> dict:
        version = data.pop("_schema_version", cls.schema_version)

        # v1 → v2: Convert total from float to Decimal string
        if version < 2:
            data["total"] = str(data.get("total", 0))

        # v2 → v3: Add default currency
        if version < 3:
            data["currency"] = "USD"

        return data
```

#### Best Practices

1. **Increment `schema_version`** only for breaking changes
2. **Keep migrations simple** - transform data, don't add business logic
3. **Test migrations** with fixture data from each historical version
4. **Document changes** in the migration method docstring
5. **Use `_timestamp`** when new fields can derive from event metadata
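
Practice 3 can be sketched without Redis or Pydantic by pulling the migration steps into a plain function and running it over one fixture per version. This is a minimal sketch: `migrate_order`, `CURRENT_VERSION`, and the fixture payloads are hypothetical and simply mirror the `OrderPlaced` migration shown earlier.

```python
from decimal import Decimal

CURRENT_VERSION = 3


def migrate_order(data: dict) -> dict:
    """Upgrade a raw OrderPlaced payload to the current schema, one step at a time."""
    data = dict(data)  # Work on a copy so the fixture is left untouched
    version = data.pop("_schema_version", CURRENT_VERSION)

    if version < 2:
        # v1 -> v2: total moves from float to a Decimal-compatible string
        data["total"] = str(data.get("total", 0))
    if version < 3:
        # v2 -> v3: currency becomes required; default to USD
        data["currency"] = "USD"
    return data


# One hypothetical fixture per historical version
FIXTURES = {
    1: {"_schema_version": 1, "order_id": "o-1", "user_id": "u-1", "total": 99.99},
    2: {"_schema_version": 2, "order_id": "o-1", "user_id": "u-1", "total": "99.99"},
    3: {"_schema_version": 3, "order_id": "o-1", "user_id": "u-1",
        "total": "99.99", "currency": "USD"},
}

for version, fixture in FIXTURES.items():
    migrated = migrate_order(fixture)
    # Every historical payload should end up in the same current shape
    assert Decimal(migrated["total"]) == Decimal("99.99"), version
    assert migrated["currency"] == "USD", version
    assert "_schema_version" not in migrated
```

In a real test suite the same fixtures would be passed through the event class itself, so that field validation is exercised as well.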

## EventEmitter

```python
from rapids import EventEmitter

emitter = EventEmitter(
    redis=redis,
    stream="events:all",
    maxlen=100_000,      # Auto-trim stream (None to disable)
    max_retries=3,       # Retry on connection failures
    retry_delay=0.1,     # Base delay for exponential backoff
)

# Single event
msg_id = await emitter.emit(UserCreated(user_id="123", email="a@b.com"))

# Batch emit (returns list of message IDs)
msg_ids = await emitter.emit([event1, event2, event3])
```

### Causation Chains

Link events to track cause-effect relationships:

```python
@router.on(DocumentCreated)
async def index_document(event: DocumentCreated):
    # Index the document...

    # Emit with causation link
    await emitter.emit(
        DocumentIndexed(doc_id=event.doc_id),
        caused_by=event,  # Sets causation_id to event.id
    )
```
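
Once every event carries a `causation_id`, a chain can be reconstructed by walking those links back to the root event. The helper below is a hypothetical sketch over plain dictionaries. It is not part of Rapids, and it assumes the events have already been loaded and indexed by `id`.

```python
def causation_chain(events_by_id: dict[str, dict], event_id: str) -> list[str]:
    """Return event IDs from the root cause down to `event_id`."""
    chain: list[str] = []
    seen: set[str] = set()  # Guards against accidental cycles
    current: str | None = event_id
    while current is not None and current not in seen:
        seen.add(current)
        chain.append(current)
        event = events_by_id.get(current)
        # Stop when the parent is unknown or the event has no cause
        current = event.get("causation_id") if event else None
    chain.reverse()
    return chain


events = {
    "evt-1": {"type": "order.placed", "causation_id": None},
    "evt-2": {"type": "payment.processed", "causation_id": "evt-1"},
    "evt-3": {"type": "order.shipped", "causation_id": "evt-2"},
}
print(causation_chain(events, "evt-3"))  # ['evt-1', 'evt-2', 'evt-3']
```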

## Event Router

### Basic Setup

```python
from rapids import EventRouter, GroupStartPosition

router = EventRouter(
    redis=redis,
    stream="events:all",
    group="my-service",
    consumer="worker-1",  # Auto-generated if None
)

@router.on(UserCreated)
async def handle_user(event: UserCreated):
    print(f"New user: {event.email}")

# Start consuming (blocks until the router is stopped)
await router.start()

# Graceful shutdown (call from another task or a signal handler)
await router.stop()
```

### Handler Matching

Rapids supports multiple ways to match events to handlers:

#### Exact Type Match

```python
@router.on(UserCreated)
async def handle_user_created(event: UserCreated):
    pass
```

#### Base Class Match

Matches the base class and all its subclasses:

```python
@router.on(DocumentEvent)
async def audit_all_documents(event: DocumentEvent):
    # Handles DocumentCreated, DocumentDeleted, DocumentArchived
    print(f"Document {event.doc_id}: {event.event_type}")
```

#### Union Types

Match multiple specific types:

```python
@router.on(DocumentCreated | DocumentDeleted)
async def handle_lifecycle(event: DocumentCreated | DocumentDeleted):
    pass
```

#### Glob Patterns

Use wildcards for flexible matching:

```python
@router.on("document.*")
async def handle_all_document_events(event):
    # Matches document.created, document.deleted, document.archived
    pass

@router.on("user.*")
async def handle_all_user_events(event):
    pass

@router.on("*")
async def log_everything(event):
    # Catch-all handler
    print(f"Event: {event.routing_key}")
```

### Multiple Handlers

Multiple handlers can process the same event:

```python
@router.on(OrderPlaced)
async def send_confirmation(event: OrderPlaced):
    # Send email
    pass

@router.on(OrderPlaced)
async def update_inventory(event: OrderPlaced):
    # Decrease stock
    pass

@router.on("*")
async def record_metrics(event):
    # Track all events
    pass
```

## Correlation Tracking

Track related events across your system.

### Correlation Scope

```python
from rapids import correlation_scope, EventEmitter

emitter = EventEmitter(redis=redis, stream="events:all")

async def process_upload(file):
    async with correlation_scope() as correlation_id:
        # All events in this scope share the same correlation_id
        await emitter.emit(UploadStarted(filename=file.name))

        doc = await parse(file)
        await emitter.emit(DocumentCreated(doc_id=doc.id))

        await emitter.emit(UploadCompleted(doc_id=doc.id))

        print(f"All events have correlation_id: {correlation_id}")
```

### Explicit Correlation

```python
from rapids import get_correlation_id, get_causation_id

@router.on(OrderPlaced)
async def process_order(event: OrderPlaced):
    # Access correlation context
    print(f"Correlation: {event.correlation_id}")
    print(f"Causation: {event.causation_id}")

    # Continue the chain
    await emitter.emit(
        PaymentProcessed(order_id=event.order_id),
        caused_by=event,
    )
```

## Error Handling and Retries

### Two-Tier Retry System

Rapids provides two levels of retry protection:

1. **Quick In-Process Retries** - Fast retries within the handler
2. **Redis-Level Retries** - Automatic re-delivery via consumer groups
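
The first tier can be sketched as a small retry loop around the handler. This is an illustration of the idea only, not the router's code: `run_with_quick_retries` and its parameters are hypothetical. When the loop gives up it re-raises, which in a real consumer would leave the message unacknowledged so that the second tier (redelivery through the consumer group) can take over.

```python
import asyncio


async def run_with_quick_retries(handler, event, *, retries, delay_for, retryable, timeout):
    """Tier 1: retry in-process; re-raise so redelivery can take over."""
    for attempt in range(retries + 1):
        try:
            return await asyncio.wait_for(handler(event), timeout=timeout)
        except retryable:
            if attempt == retries:
                raise  # Quick retries exhausted
            await asyncio.sleep(delay_for(attempt + 1))


calls = 0


async def flaky(event):
    """Fails twice with a retryable error, then succeeds."""
    global calls
    calls += 1
    if calls < 3:
        raise ConnectionError("temporary failure")
    return f"handled {event}"


result = asyncio.run(
    run_with_quick_retries(
        flaky,
        "order-1",
        retries=3,
        delay_for=lambda attempt: 0.01 * attempt,
        retryable=(ConnectionError,),
        timeout=1.0,
    )
)
print(result)  # handled order-1
```

Exceptions outside `retryable` propagate on the first attempt, which corresponds to the non-retryable path.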

### Handler-Level Configuration

```python
import httpx
from rapids.backoff import exponential

@router.on(
    ExternalApiEvent,
    timeout=60.0,                           # Handler timeout in seconds
    retries=3,                              # Quick retry attempts
    backoff=exponential(base=2, max_delay=30),  # Delay between retries
    retryable_exceptions=(ConnectionError, TimeoutError),
)
async def call_external_api(event: ExternalApiEvent):
    async with httpx.AsyncClient() as client:
        await client.post("https://api.example.com", json=event.model_dump())
```

### Backoff Strategies

```python
from rapids.backoff import exponential, linear, constant

# Exponential: 2, 4, 8, 16, 32, 60... (with jitter to prevent thundering herd)
exponential(base=2.0, max_delay=60.0, jitter=True)

# Linear: 1, 2, 3, 4, 5...
linear(delay=1.0, max_delay=60.0)

# Constant: 5, 5, 5, 5...
constant(delay=5.0)
```
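
To make the schedules concrete, here is a minimal sketch of what such strategies might compute. The function names, the 1-based attempt number, and the "full jitter" formula are assumptions for illustration. They are not taken from `rapids.backoff`.

```python
import random
from collections.abc import Callable

Backoff = Callable[[int], float]  # attempt number (1-based) -> delay in seconds


def exponential_sketch(base: float = 2.0, max_delay: float = 60.0, jitter: bool = True) -> Backoff:
    def delay(attempt: int) -> float:
        capped = min(base ** attempt, max_delay)
        # "Full jitter" (an assumption): pick a random delay up to the capped value
        return random.uniform(0, capped) if jitter else capped
    return delay


def linear_sketch(delay: float = 1.0, max_delay: float = 60.0) -> Backoff:
    return lambda attempt: min(delay * attempt, max_delay)


def constant_sketch(delay: float = 5.0) -> Backoff:
    return lambda attempt: delay


schedule = exponential_sketch(base=2.0, max_delay=60.0, jitter=False)
print([schedule(n) for n in range(1, 7)])  # [2.0, 4.0, 8.0, 16.0, 32.0, 60.0]
```

Jitter spreads retries from many consumers over time, so they do not all hit a recovering dependency at the same instant.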

### Dead-Letter Queue (DLQ)

Failed messages are sent to a DLQ for later inspection:

```python
router = EventRouter(
    redis=redis,
    stream="events:all",
    group="my-service",
    max_deliveries=5,              # Send to DLQ after 5 failed deliveries
    dlq_stream="events:dlq",       # DLQ stream name
    dlq_maxlen=10_000,             # Trim DLQ to 10k messages
)
```

Messages are sent to the DLQ when:
- Delivery count exceeds `max_deliveries`
- Deserialization fails
- Handler raises a non-retryable exception

DLQ entry format:
```json
{
  "original_msg_id": "1701234567890-0",
  "original_stream": "events:all",
  "original_group": "my-service",
  "reason": "max_deliveries_exceeded",
  "error": "Connection refused",
  "handler": "call_external_api",
  "failed_at": "2024-11-28T12:00:00+00:00",
  "event_type": "external_api.event",
  "data": "{...}"
}
```

### Exception Types

```python
from rapids.exceptions import (
    RapidsError,              # Base exception
    EmitError,                # Failed to emit after retries
    DeserializationError,     # Invalid event data
    HandlerTimeoutError,      # Handler exceeded timeout
    MaxRetriesExceededError,  # Quick retries exhausted
    NonRetryableError,        # Handler raised non-retryable exception
)
```

## Configuration Reference

### EventEmitter

| Option | Default | Description |
|--------|---------|-------------|
| `redis` | required | Redis async client |
| `stream` | `"events:all"` | Target stream name |
| `maxlen` | `100_000` | Auto-trim stream (`None` to disable) |
| `max_retries` | `3` | Retry attempts on emit failure |
| `retry_delay` | `0.1` | Base delay for exponential backoff |

### EventRouter

| Option | Default | Description |
|--------|---------|-------------|
| `redis` | required | Redis async client |
| `stream` | `"events:all"` | Source stream name |
| `group` | `"default"` | Consumer group name |
| `consumer` | auto | Consumer name (hostname-pid if None) |
| `batch_size` | `10` | Messages per XREADGROUP |
| `block_ms` | `5000` | Blocking read timeout (ms) |
| `handler_timeout` | `30.0` | Default handler timeout (seconds) |
| `group_start_id` | `BEGINNING` | Where new groups start |
| `claim_interval` | `10` | XAUTOCLAIM every N reads |
| `claim_min_idle_ms` | `60_000` | Claim messages that have been idle longer than this (ms) |
| `default_retries` | `0` | Default quick retries |
| `default_backoff` | `exponential()` | Default backoff strategy |
| `max_deliveries` | `5` | Deliveries before DLQ |
| `dlq_stream` | `None` | DLQ stream (None to disable) |
| `dlq_maxlen` | `10_000` | DLQ trim size |

### GroupStartPosition

```python
from rapids import GroupStartPosition

# Read entire stream history (for new groups that need historical data)
GroupStartPosition.BEGINNING  # "0"

# Only new messages after group creation
GroupStartPosition.LATEST     # "$"
```

### Handler Decorator

```python
@router.on(
    target,                    # Event class, union, or pattern
    timeout=30.0,              # Override router default
    retries=0,                 # Quick retry attempts
    backoff=exponential(),     # Backoff strategy
    retryable_exceptions=(),   # Exceptions to retry
)
```

## Complete Example

```python
import asyncio
from redis.asyncio import Redis
from rapids import BaseEvent, EventEmitter, EventRouter
from rapids.backoff import exponential


# Define events
class OrderPlaced(BaseEvent):
    order_id: str
    user_id: str
    items: list[str]
    total: float


class PaymentProcessed(BaseEvent):
    order_id: str
    payment_id: str
    amount: float


class OrderShipped(BaseEvent):
    order_id: str
    tracking_number: str


# Application setup
async def main():
    redis = Redis(host="localhost", port=6379)

    # Create emitter instance
    emitter = EventEmitter(redis=redis, stream="orders:events")

    # Setup router
    router = EventRouter(
        redis=redis,
        stream="orders:events",
        group="fulfillment-service",
        handler_timeout=60.0,
        max_deliveries=3,
        dlq_stream="orders:dlq",
    )

    @router.on(OrderPlaced)
    async def process_payment(event: OrderPlaced):
        print(f"Processing payment for order {event.order_id}")
        # Process payment...
        payment_id = f"pay-{event.order_id}"

        await emitter.emit(
            PaymentProcessed(
                order_id=event.order_id,
                payment_id=payment_id,
                amount=event.total,
            ),
            caused_by=event,
        )

    @router.on(
        PaymentProcessed,
        retries=3,
        backoff=exponential(base=2, max_delay=30),
    )
    async def ship_order(event: PaymentProcessed):
        print(f"Shipping order {event.order_id}")
        # Create shipment...

        await emitter.emit(
            OrderShipped(
                order_id=event.order_id,
                tracking_number="1Z999AA10123456784",
            ),
            caused_by=event,
        )

    @router.on("order.*")
    async def audit_log(event):
        print(f"[AUDIT] {event.routing_key}: {event.id}")

    # Start consuming
    print("Starting order fulfillment service...")
    await router.start()


if __name__ == "__main__":
    asyncio.run(main())
```

## Framework Integration

### FastAPI

Rapids supports deferred Redis setup, ideal for FastAPI's lifespan pattern:

```python
import asyncio
from contextlib import asynccontextmanager
from fastapi import FastAPI
from redis.asyncio import Redis
from rapids import BaseEvent, EventEmitter, EventRouter

# Define events
class UserCreated(BaseEvent):
    user_id: str
    email: str

# Create at module level - no Redis yet
router = EventRouter(stream="events", group="my-service")
emitter = EventEmitter(stream="events")

@router.on(UserCreated)
async def handle_user(event: UserCreated):
    print(f"New user: {event.email}")

@asynccontextmanager
async def lifespan(app: FastAPI):
    redis = Redis()
    emitter.configure(redis)       # Configure emitter
    task = asyncio.create_task(router.start(redis))  # Start router
    yield
    await router.stop()
    task.cancel()
    await redis.close()

app = FastAPI(lifespan=lifespan)

@app.post("/users")
async def create_user(email: str):
    event = UserCreated(user_id="123", email=email)
    await emitter.emit(event)
    return {"status": "ok"}
```

## Redis Streams Primer

Rapids uses Redis Streams consumer groups for reliable message delivery:

- **At-least-once delivery**: Messages are redelivered until acknowledged
- **Consumer groups**: Multiple consumers share the workload
- **Pending messages**: Failed messages stay pending for redelivery
- **XAUTOCLAIM**: Automatically reclaim messages from dead consumers

Key Redis commands used:
- `XADD` - Add events to stream
- `XGROUP CREATE` - Create consumer group
- `XREADGROUP` - Read messages for consumer
- `XACK` - Acknowledge message processing
- `XAUTOCLAIM` - Claim idle messages from dead consumers

## Development

### Setup

```bash
# Install uv if not already installed
curl -LsSf https://astral.sh/uv/install.sh | sh

# Install dependencies
make install
```

### Commands

```bash
make help          # Show all commands
make test          # Run all tests
make test-unit     # Run unit tests only (fast, no Docker)
make test-integration  # Run integration tests (requires Docker)
make lint          # Run linting
make format        # Format code
make typecheck     # Run type checking
make ci            # Run full CI checks
```

### Testing

Unit tests use `fakeredis` for fast, isolated testing. Integration tests use `testcontainers` to spin up real Redis instances.

## License

MIT
