Metadata-Version: 2.4
Name: reliable-cmd
Version: 0.1.7
Summary: Command Bus abstraction over PostgreSQL + PGMQ for reliable async command processing
Project-URL: Homepage, https://github.com/FreeSideNomad/rcmd
Project-URL: Documentation, https://github.com/FreeSideNomad/rcmd#readme
Project-URL: Repository, https://github.com/FreeSideNomad/rcmd
Project-URL: Issues, https://github.com/FreeSideNomad/rcmd/issues
Project-URL: Changelog, https://github.com/FreeSideNomad/rcmd/releases
Author: FreeSideNomad
License-Expression: MIT
Keywords: async,command-bus,commandbus,cqrs,message-queue,pgmq,postgresql
Classifier: Development Status :: 4 - Beta
Classifier: Framework :: AsyncIO
Classifier: Intended Audience :: Developers
Classifier: License :: OSI Approved :: MIT License
Classifier: Operating System :: OS Independent
Classifier: Programming Language :: Python :: 3
Classifier: Programming Language :: Python :: 3.11
Classifier: Programming Language :: Python :: 3.12
Classifier: Programming Language :: Python :: 3.13
Classifier: Topic :: Database
Classifier: Topic :: Software Development :: Libraries :: Python Modules
Classifier: Typing :: Typed
Requires-Python: >=3.11
Requires-Dist: orjson>=3.10.0
Requires-Dist: psycopg[binary,pool]>=3.2.0
Provides-Extra: dev
Requires-Dist: mypy>=1.13.0; extra == 'dev'
Requires-Dist: pre-commit>=4.0.0; extra == 'dev'
Requires-Dist: pytest-asyncio>=0.24.0; extra == 'dev'
Requires-Dist: pytest-cov>=6.0.0; extra == 'dev'
Requires-Dist: pytest>=8.0.0; extra == 'dev'
Requires-Dist: ruff>=0.8.0; extra == 'dev'
Provides-Extra: e2e
Requires-Dist: fastapi>=0.128.0; extra == 'e2e'
Requires-Dist: jinja2>=3.1.6; extra == 'e2e'
Requires-Dist: python-dotenv>=1.0.1; extra == 'e2e'
Requires-Dist: python-multipart>=0.0.20; extra == 'e2e'
Requires-Dist: uvicorn[standard]>=0.40.0; extra == 'e2e'
Provides-Extra: test
Requires-Dist: pytest-asyncio>=0.24.0; extra == 'test'
Requires-Dist: pytest-cov>=6.0.0; extra == 'test'
Requires-Dist: pytest>=8.0.0; extra == 'test'
Requires-Dist: testcontainers[postgres]>=4.0.0; extra == 'test'
Description-Content-Type: text/markdown

# rcmd - Reliable Commands

[![PyPI version](https://img.shields.io/pypi/v/reliable-cmd)](https://pypi.org/project/reliable-cmd/)
[![Python Versions](https://img.shields.io/pypi/pyversions/reliable-cmd)](https://pypi.org/project/reliable-cmd/)
[![License: MIT](https://img.shields.io/pypi/l/reliable-cmd)](https://opensource.org/licenses/MIT)

A Python library providing a Command Bus abstraction over PostgreSQL + PGMQ for reliable, transactional command processing.

## Table of Contents

- [Why Reliable Commands?](#why-reliable-commands)
- [Ubiquitous Language](#ubiquitous-language)
- [Architecture Overview](#architecture-overview)
- [Command Lifecycle](#command-lifecycle)
- [PGMQ Integration](#pgmq-integration)
- [Installation](#installation)
- [Quick Start](#quick-start)
- [Developer Guide](#developer-guide)
- [Worker Best Practices](#worker-best-practices)
- [E2E Test Application](#e2e-test-application)

---

## Why Reliable Commands?

In distributed systems, ensuring that operations complete reliably is challenging. Consider these common problems:

### The Lost Update Problem

```
┌─────────────┐     ┌─────────────┐     ┌─────────────┐
│   Client    │────▶│   Service   │────▶│  External   │
│             │     │             │     │    API      │
└─────────────┘     └─────────────┘     └─────────────┘
                           │
                    ❌ Crash here
                           │
                    Data saved to DB
                    but API never called
```

When your service crashes between saving data and calling an external API, you end up with inconsistent state.

### The Dual-Write Problem

```python
# DANGEROUS: Two separate operations, no atomicity
await db.save(order)           # ✓ Succeeds
await email_service.send(...)  # ❌ Fails - but order is already saved!
```

### The Solution: Transactional Outbox

Reliable Commands implements the **Transactional Outbox Pattern**:

```
┌─────────────────────────────────────────────────────────┐
│                    PostgreSQL                           │
│  ┌─────────────────┐    ┌─────────────────┐             │
│  │  Business Data  │    │  Command Queue  │             │
│  │    (orders)     │    │     (PGMQ)      │             │
│  └────────┬────────┘    └────────┬────────┘             │
│           │                      │                      │
│           └──────────┬───────────┘                      │
│                      │                                  │
│              SINGLE TRANSACTION                         │
└──────────────────────┼──────────────────────────────────┘
                       │
                       ▼
               ┌───────────────┐
               │    Worker     │
               │  (separate    │
               │   process)    │
               └───────────────┘
```

**Benefits:**
- **Atomicity**: Command is queued in the same transaction as your business data
- **Durability**: PostgreSQL guarantees persistence
- **At-least-once delivery**: Failed commands are automatically retried
- **Observability**: Full audit trail of all state transitions
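
To make the atomicity claim concrete, here is a minimal, runnable sketch of the outbox idea. It is not rcmd's API: SQLite stands in for PostgreSQL + PGMQ so the example runs without a server, and the `orders`/`outbox` tables and the `place_order` helper are hypothetical names chosen for illustration.

```python
import json
import sqlite3

# SQLite stands in for PostgreSQL so the sketch runs anywhere; the table
# names and the place_order helper are hypothetical, not part of rcmd.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE orders (id TEXT PRIMARY KEY, qty INTEGER NOT NULL)")
conn.execute("CREATE TABLE outbox (command_id TEXT PRIMARY KEY, payload TEXT NOT NULL)")
conn.commit()


def place_order(order_id: str, qty: int) -> None:
    """Write the business row and the queued command in one transaction."""
    with conn:  # commits on success, rolls back if anything raises
        conn.execute("INSERT INTO orders (id, qty) VALUES (?, ?)", (order_id, qty))
        payload = json.dumps({"type": "CreateOrder", "order_id": order_id})
        conn.execute(
            "INSERT INTO outbox (command_id, payload) VALUES (?, ?)",
            (order_id, payload),
        )


def count(table: str) -> int:
    # Table names are fixed literals in this sketch, never user input.
    return conn.execute(f"SELECT COUNT(*) FROM {table}").fetchone()[0]


place_order("order-1", 2)

try:
    # The outbox insert fails (duplicate command_id), so the order row
    # written just before it is rolled back as well.
    with conn:
        conn.execute("INSERT INTO orders (id, qty) VALUES (?, ?)", ("order-2", 1))
        conn.execute(
            "INSERT INTO outbox (command_id, payload) VALUES (?, ?)",
            ("order-1", "{}"),
        )
except sqlite3.IntegrityError:
    pass

print(count("orders"), count("outbox"))
```

Because both writes share one transaction, the failed second attempt leaves no orphaned `order-2` row: each table still holds exactly one row.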

---

## Ubiquitous Language

Understanding these terms is essential for working with the library:

### Core Concepts

| Term | Definition |
|------|------------|
| **Command** | A request to perform an action. Immutable once created. Contains a unique ID, type, and payload data. |
| **Domain** | A logical grouping of related commands (e.g., "orders", "payments", "inventory"). Each domain has its own queue. |
| **Handler** | An async function that processes a specific command type. Registered via the `@handler` decorator. |
| **Worker** | A long-running process that polls for commands and dispatches them to handlers. |

### State & Lifecycle

| Term | Definition |
|------|------------|
| **Pending** | Command is queued and waiting to be processed. |
| **In Progress** | Worker has claimed the command and handler is executing. |
| **Completed** | Handler returned successfully. Terminal state. |
| **Failed** | Handler raised an error. May be retried or moved to TSQ. |
| **Troubleshooting Queue (TSQ)** | Holds commands that exhausted retries or had permanent failures. Requires operator intervention. |

### Reliability Mechanisms

| Term | Definition |
|------|------------|
| **Visibility Timeout** | How long a worker has to process a command before it becomes visible to other workers again. Prevents message loss if a worker crashes. |
| **Retry Policy** | Rules for how many times and how often to retry failed commands. |
| **Backoff Schedule** | Increasing delays between retry attempts (e.g., 10s, 60s, 300s). |
| **Transient Error** | Temporary failure (network timeout, database lock). Command will be retried. |
| **Permanent Error** | Unrecoverable failure (invalid data, business rule violation). Command goes directly to TSQ. |

### Batches & Correlation

| Term | Definition |
|------|------------|
| **Batch** | A group of related commands tracked together. Useful for bulk operations. |
| **Correlation ID** | Links related commands across a workflow. Useful for tracing. |
| **Audit Event** | Immutable record of a state change. Provides complete history. |

---

## Architecture Overview

```
┌─────────────────────────────────────────────────────────────────────────┐
│                              Your Application                           │
├─────────────────────────────────────────────────────────────────────────┤
│                                                                         │
│   ┌──────────────┐         ┌──────────────────────────────────────┐     │
│   │  CommandBus  │         │              Worker                  │     │
│   │              │         │  ┌────────────────────────────────┐  │     │
│   │  .send()     │         │  │        HandlerRegistry         │  │     │
│   │  .send_batch │         │  │  ┌──────────┐  ┌──────────┐    │  │     │
│   │  .get_batch  │         │  │  │ Handler  │  │ Handler  │    │  │     │
│   │              │         │  │  │ @orders  │  │ @payments│    │  │     │
│   └──────┬───────┘         │  │  └──────────┘  └──────────┘    │  │     │
│          │                 │  └────────────────────────────────┘  │     │
│          │                 └──────────────────┬───────────────────┘     │
│          │                                    │                         │
└──────────┼────────────────────────────────────┼─────────────────────────┘
           │                                    │
           ▼                                    ▼
┌─────────────────────────────────────────────────────────────────────────┐
│                            PostgreSQL + PGMQ                            │
│                                                                         │
│  ┌─────────────────┐  ┌─────────────────┐  ┌─────────────────┐          │
│  │  commandbus.    │  │    pgmq.        │  │  commandbus.    │          │
│  │    command      │  │  q_<domain>     │  │    audit        │          │
│  │                 │  │                 │  │                 │          │
│  │  - command_id   │  │  - msg_id       │  │  - event_type   │          │
│  │  - status       │  │  - payload      │  │  - timestamp    │          │
│  │  - attempts     │  │  - vt (visible) │  │  - details      │          │
│  └─────────────────┘  └─────────────────┘  └─────────────────┘          │
│                                                                         │
│  ┌─────────────────┐  ┌─────────────────┐                               │
│  │  commandbus.    │  │  commandbus.    │                               │
│  │    batch        │  │    tsq          │                               │
│  │                 │  │                 │                               │
│  │  - batch_id     │  │  - Failed       │                               │
│  │  - total_count  │  │    commands     │                               │
│  │  - completed    │  │    for review   │                               │
│  └─────────────────┘  └─────────────────┘                               │
│                                                                         │
└─────────────────────────────────────────────────────────────────────────┘
```

---

## Command Lifecycle

### State Machine

```
                                    ┌────────────────────────────────┐
                                    │                                │
                                    ▼                                │
┌─────────┐    ┌─────────────┐    ┌─────────┐    ┌───────────┐       │
│ PENDING │───▶│ IN_PROGRESS │───▶│ FAILED  │───▶│  PENDING  │───────┘
└─────────┘    └──────┬──────┘    └────┬────┘    └───────────┘
                      │                │          (retry with backoff)
                      │                │
                      ▼                ▼
               ┌───────────┐    ┌─────────────┐
               │ COMPLETED │    │     TSQ     │
               │  (final)  │    │   (final)   │
               └───────────┘    └─────────────┘
```

### State Transitions

| From | To | Trigger |
|------|----|---------|
| — | PENDING | `bus.send()` or `bus.create_batch()` |
| PENDING | IN_PROGRESS | Worker receives message from queue |
| IN_PROGRESS | COMPLETED | Handler returns successfully |
| IN_PROGRESS | FAILED | Handler raises exception |
| FAILED | PENDING | Transient error + retries remaining |
| FAILED | TSQ | Permanent error OR retries exhausted |

### Visibility Timeout Flow

```
Time ──────────────────────────────────────────────────────────────▶

Worker A claims message (VT = now + 30s)
     │
     ├─────────── Processing ───────────┤
     │                                  │
     │                          Worker A completes
     │                          Message deleted ✓
     │
     │
Alternative: Worker A crashes
     │
     ├─────────── Processing ───────────┤
     │                                  │
     X  Worker A dies                   │
                                        │
                              ┌─────────┴─────────┐
                              │  VT expires       │
                              │  Message visible  │
                              │  again            │
                              └─────────┬─────────┘
                                        │
                              Worker B claims message
                              Processing continues...
```

---

## PGMQ Integration

### What is PGMQ?

[PGMQ](https://github.com/tembo-io/pgmq) is a lightweight message queue built as a PostgreSQL extension. It provides:

- **Transactional enqueue**: Messages are only visible after commit
- **Visibility timeout**: Claimed messages are invisible to other consumers
- **Delivery guarantees**: Messages persist until explicitly deleted

### How rcmd Uses PGMQ

```
┌─────────────────────────────────────────────────────────────────┐
│                        Your Transaction                          │
│                                                                  │
│   BEGIN;                                                         │
│                                                                  │
│   -- Your business logic                                         │
│   INSERT INTO orders (id, product, qty) VALUES (...);            │
│                                                                  │
│   -- rcmd: Queue command in same transaction                     │
│   SELECT pgmq.send('orders__commands',                           │
│                    '{"type": "CreateOrder", ...}');              │
│                                                                  │
│   COMMIT;  ◀── Both succeed or both fail                         │
│                                                                  │
└──────────────────────────────────────────────────────────────────┘
```

### Queue Naming Convention

Each domain gets its own PGMQ queue with the naming pattern `{domain}__commands`:

| Domain | Queue Name |
|--------|------------|
| `orders` | `orders__commands` |
| `payments` | `payments__commands` |
| `inventory` | `inventory__commands` |
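
As a quick illustration of the naming pattern, a helper like the following would produce the names in the table. `queue_name` is a hypothetical function written for this example; the library may expose its own equivalent.

```python
def queue_name(domain: str) -> str:
    """Build the queue name for a domain using the `{domain}__commands` pattern."""
    if not domain:
        raise ValueError("domain must be a non-empty string")
    return f"{domain}__commands"


for domain in ("orders", "payments", "inventory"):
    print(domain, "->", queue_name(domain))
```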

### Message Flow

```
Producer                           PGMQ                           Consumer
   │                                │                                │
   │  pgmq.send(queue, payload)     │                                │
   ├───────────────────────────────▶│                                │
   │                                │                                │
   │                                │◀─── pgmq.read(queue, vt=30) ───┤
   │                                │                                │
   │                                │     Returns message + msg_id   │
   │                                ├───────────────────────────────▶│
   │                                │                                │
   │                                │     (invisible for 30s)        │
   │                                │                                │
   │                                │◀─── pgmq.delete(queue, id) ────┤
   │                                │                                │
   │                                │     (on success)               │
   │                                │                                │
```

### LISTEN/NOTIFY for Low Latency

Instead of constant polling, rcmd uses PostgreSQL's LISTEN/NOTIFY:

```
┌──────────────┐                    ┌──────────────┐
│   Producer   │                    │    Worker    │
└──────┬───────┘                    └──────┬───────┘
       │                                   │
       │  INSERT + NOTIFY                  │  LISTEN q_orders
       ├──────────────────────────────────▶│
       │                                   │
       │                                   │  Immediate wake-up!
       │                                   │  (no polling delay)
       │                                   │
```
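
The wake-up behavior can be simulated without a database. The sketch below assumes a worker that waits for a notification but falls back to a poll interval when none arrives; that combination is an assumption about the design, and `wait_for_work` is a hypothetical helper, with an `asyncio.Event` standing in for a PostgreSQL notification.

```python
import asyncio


async def wait_for_work(wakeup: asyncio.Event, poll_interval: float) -> str:
    """Sleep until notified, or until the poll interval elapses."""
    try:
        await asyncio.wait_for(wakeup.wait(), timeout=poll_interval)
        return "notified"
    except asyncio.TimeoutError:
        return "polled"
    finally:
        wakeup.clear()  # reset so the next wait blocks again


async def demo() -> list[str]:
    wakeup = asyncio.Event()
    reasons = []

    # No producer activity: the worker falls back to its poll interval.
    reasons.append(await wait_for_work(wakeup, poll_interval=0.05))

    # A producer "notifies" shortly after the worker starts waiting.
    loop = asyncio.get_running_loop()
    loop.call_later(0.01, wakeup.set)
    reasons.append(await wait_for_work(wakeup, poll_interval=5.0))
    return reasons


reasons = asyncio.run(demo())
print(reasons)
```

In the second wait the worker resumes after roughly 10 ms instead of sitting out the full 5-second interval, which is the latency benefit the diagram describes.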

---

## Installation

```bash
pip install reliable-cmd
```

### Requirements

- Python 3.11+
- PostgreSQL 15+ with [PGMQ extension](https://github.com/tembo-io/pgmq)

## Quick Start

### 1. Database Setup

First, ensure you have PostgreSQL with the PGMQ extension installed. Then set up the commandbus schema:

```python
import asyncio
from psycopg_pool import AsyncConnectionPool
from commandbus import setup_database

async def main():
    pool = AsyncConnectionPool(
        conninfo="postgresql://user:pass@localhost:5432/mydb"
    )
    await pool.open()

    # Create commandbus schema, tables, and stored procedures
    created = await setup_database(pool)
    if created:
        print("Database schema created successfully")
    else:
        print("Schema already exists")

    await pool.close()

asyncio.run(main())
```

The `setup_database()` function is idempotent: if the schema already exists, it returns without making changes.

### 2. Alternative: Manual SQL Setup

If you prefer to manage migrations separately (e.g., with Flyway or Alembic), you can get the raw SQL:

```python
from commandbus import get_schema_sql

sql = get_schema_sql()
# Execute this SQL in your migration tool
```

Or write the SQL to a file from the command line:
```bash
python -c "from commandbus import get_schema_sql; print(get_schema_sql())" > schema.sql
```

---

## Developer Guide

This section covers how to set up command handlers and configure workers for your domain.

### 1. Define Command Handlers

Use the `@handler` decorator to mark methods as command handlers. Handlers are organized in classes with constructor-injected dependencies:

```python
from typing import Any

from psycopg_pool import AsyncConnectionPool
from commandbus import Command, HandlerContext, handler

class OrderHandlers:
    """Handlers for order domain commands."""

    def __init__(self, pool: AsyncConnectionPool) -> None:
        """Inject dependencies via constructor."""
        self._pool = pool

    @handler(domain="orders", command_type="CreateOrder")
    async def handle_create_order(
        self, cmd: Command, ctx: HandlerContext
    ) -> dict[str, Any]:
        """Handle CreateOrder command.

        Args:
            cmd: The command with command_id and data
            ctx: Handler context (metadata, plus ctx.conn and ctx.extend_visibility())

        Returns:
            Result dict stored in command record
        """
        order_data = cmd.data
        # Process the order...
        return {"status": "created", "order_id": str(cmd.command_id)}

    @handler(domain="orders", command_type="CancelOrder")
    async def handle_cancel_order(
        self, cmd: Command, ctx: HandlerContext
    ) -> dict[str, Any]:
        """Handle CancelOrder command."""
        # Cancel logic...
        return {"status": "cancelled"}
```

### 2. Handle Errors

Use built-in exception types to control retry behavior:

```python
from typing import Any

from commandbus import Command, HandlerContext, handler
from commandbus.exceptions import PermanentCommandError, TransientCommandError

# PaymentDeclined, GatewayTimeout, and payment_gateway are application-defined
# placeholders; this method belongs in a handler class such as OrderHandlers.
@handler(domain="orders", command_type="ProcessPayment")
async def handle_payment(self, cmd: Command, ctx: HandlerContext) -> dict[str, Any]:
    try:
        result = await payment_gateway.process(cmd.data)
        return {"status": "paid", "transaction_id": result.id}
    except PaymentDeclined as e:
        # Permanent failure - no retry, moves to troubleshooting queue
        raise PermanentCommandError(
            code="PAYMENT_DECLINED",
            message=str(e)
        ) from e
    except GatewayTimeout as e:
        # Transient failure - will be retried according to policy
        raise TransientCommandError(
            code="GATEWAY_TIMEOUT",
            message=str(e)
        ) from e
```

### 3. Register Handlers and Create Worker

Create a composition root that wires up dependencies and registers handlers:

```python
from psycopg_pool import AsyncConnectionPool
from commandbus import HandlerRegistry, RetryPolicy, Worker

async def create_pool() -> AsyncConnectionPool:
    pool = AsyncConnectionPool(
        conninfo="postgresql://localhost:5432/mydb",  # configure auth as needed
        min_size=2,
        max_size=10,
    )
    await pool.open()
    return pool

def create_registry(pool: AsyncConnectionPool) -> HandlerRegistry:
    """Create registry and register all handlers."""
    # Create handler instances with dependencies
    order_handlers = OrderHandlers(pool)
    inventory_handlers = InventoryHandlers(pool)

    # Register handlers - decorator metadata is used for routing
    registry = HandlerRegistry()
    registry.register_instance(order_handlers)
    registry.register_instance(inventory_handlers)

    return registry

def create_worker(pool: AsyncConnectionPool) -> Worker:
    """Create worker with retry policy."""
    registry = create_registry(pool)

    retry_policy = RetryPolicy(
        max_attempts=3,
        backoff_schedule=[10, 60, 300],  # seconds between retries
    )

    return Worker(
        pool=pool,
        domain="orders",
        registry=registry,
        retry_policy=retry_policy,
        visibility_timeout=30,  # seconds before message redelivery
    )

async def run_worker() -> None:
    """Main entry point."""
    pool = await create_pool()
    try:
        worker = create_worker(pool)
        await worker.run(
            concurrency=4,      # concurrent command handlers
            poll_interval=1.0,  # seconds between queue polls
        )
    finally:
        await pool.close()

if __name__ == "__main__":
    import asyncio
    asyncio.run(run_worker())
```

### 4. Send Commands

Use the `CommandBus` to send commands:

```python
from uuid import UUID, uuid4
from commandbus import CommandBus

async def create_order(bus: CommandBus, order_data: dict) -> UUID:
    command_id = uuid4()

    await bus.send(
        domain="orders",
        command_type="CreateOrder",
        command_id=command_id,
        data=order_data,
        max_attempts=3,  # optional, overrides retry policy
    )

    return command_id
```

### 5. Using Batches

Batches group related commands together and track their collective progress. Use batches when you need to:
- Track completion of multiple related commands
- Get notified when all commands in a group complete
- Monitor success/failure rates for a set of operations

```python
from uuid import UUID, uuid4
from commandbus import CommandBus, BatchCommand, BatchMetadata

async def process_monthly_billing(bus: CommandBus, accounts: list[dict]) -> UUID:
    """Create a batch of billing commands with completion callback."""

    # Define callback for when batch completes
    async def on_batch_complete(batch: BatchMetadata) -> None:
        print(f"Batch {batch.batch_id} finished:")
        print(f"  - Completed: {batch.completed_count}/{batch.total_count}")
        print(f"  - Failed: {batch.in_troubleshooting_count}")
        print(f"  - Status: {batch.status.value}")

    # Create batch with commands
    result = await bus.create_batch(
        domain="billing",
        commands=[
            BatchCommand(
                command_type="ProcessPayment",
                command_id=uuid4(),
                data={"account_id": acc["id"], "amount": acc["balance"]},
            )
            for acc in accounts
        ],
        name="Monthly billing - January 2026",
        on_complete=on_batch_complete,  # Called when all commands finish
    )

    print(f"Created batch {result.batch_id} with {result.total_commands} commands")
    return result.batch_id


async def monitor_batch(bus: CommandBus, batch_id: UUID) -> None:
    """Poll batch status for progress monitoring."""
    batch = await bus.get_batch("billing", batch_id)
    if batch:
        progress = (batch.completed_count + batch.in_troubleshooting_count) / batch.total_count
        print(f"Batch progress: {progress:.1%}")
        print(f"  Status: {batch.status.value}")
        print(f"  Completed: {batch.completed_count}")
        print(f"  In TSQ: {batch.in_troubleshooting_count}")
```

**Batch Status Lifecycle:**
- `PENDING` → Batch created, commands waiting to be processed
- `IN_PROGRESS` → At least one command has started processing
- `COMPLETED` → All commands completed successfully
- `COMPLETED_WITH_FAILURES` → All commands finished, some failed (in TSQ)
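
The status rules above can be read as a small function of the batch counters. This is a sketch of that reading, not the library's implementation: `derive_batch_status` and its `started` flag are hypothetical, while the counter names mirror the `BatchMetadata` fields used in the example.

```python
def derive_batch_status(
    total_count: int,
    completed_count: int,
    in_troubleshooting_count: int,
    started: bool = False,
) -> str:
    """Map batch counters to one of the four lifecycle statuses."""
    finished = completed_count + in_troubleshooting_count
    if total_count > 0 and finished >= total_count:
        if in_troubleshooting_count == 0:
            return "COMPLETED"
        return "COMPLETED_WITH_FAILURES"
    # Not everything has finished yet: has any command been picked up?
    if started or finished > 0:
        return "IN_PROGRESS"
    return "PENDING"


print(derive_batch_status(10, 0, 0))
print(derive_batch_status(10, 4, 1))
print(derive_batch_status(10, 10, 0))
print(derive_batch_status(10, 8, 2))
```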

**Note:** Batch callbacks are in-memory only and will be lost on worker restart. For critical workflows, poll `get_batch()` as a fallback.

---

## Worker Best Practices

Writing efficient and reliable handlers requires understanding how the worker processes commands.

### Concurrency Model

```
┌─────────────────────────────────────────────────────────────────────┐
│                           Worker Process                            │
│                                                                     │
│   ┌─────────────┐                                                   │
│   │  Event Loop │                                                   │
│   │   (asyncio) │                                                   │
│   └──────┬──────┘                                                   │
│          │                                                          │
│          ├──────────────┬──────────────┬──────────────┐             │
│          ▼              ▼              ▼              ▼             │
│   ┌───────────┐  ┌───────────┐  ┌───────────┐  ┌───────────┐        │
│   │  Handler  │  │  Handler  │  │  Handler  │  │  Handler  │        │
│   │  Task 1   │  │  Task 2   │  │  Task 3   │  │  Task 4   │        │
│   │           │  │           │  │           │  │           │        │
│   │  await    │  │  await    │  │  await    │  │  await    │        │
│   │  db.query │  │  api.call │  │  db.save  │  │  sleeping │        │
│   └───────────┘  └───────────┘  └───────────┘  └───────────┘        │
│                                                                     │
│   concurrency=4 means 4 handlers run concurrently                   │
│                                                                     │
└─────────────────────────────────────────────────────────────────────┘
```

### Critical: Use Async I/O

**The #1 rule: Never block the event loop.**

```python
# ❌ BAD: Blocking call destroys concurrency
@handler(domain="orders", command_type="ProcessOrder")
async def handle_order(self, cmd: Command, ctx: HandlerContext):
    import requests
    response = requests.get("https://api.example.com")  # BLOCKS!
    # While this runs, ALL other handlers are frozen

# ✓ GOOD: Async call allows concurrency
@handler(domain="orders", command_type="ProcessOrder")
async def handle_order(self, cmd: Command, ctx: HandlerContext):
    import httpx
    async with httpx.AsyncClient() as client:
        response = await client.get("https://api.example.com")  # Yields!
        # Other handlers continue while waiting for response
```

### Impact of Blocking Code

```
concurrency=4, but with blocking I/O:

Time ─────────────────────────────────────────────────▶

Handler 1: ████████████████████████  (blocking HTTP call)
Handler 2:                         ▓▓▓▓▓▓▓▓  (waiting)
Handler 3:                                   ░░░░░░░░  (waiting)
Handler 4:                                             (waiting)

Effective throughput: 1 command at a time!


concurrency=4, with async I/O:

Time ─────────────────────────────────────────────────▶

Handler 1: ██░░░░██░░░░██  (I/O waits yield to others)
Handler 2: ░░██░░░░██░░░░██
Handler 3: ░░░░██░░░░██░░░░██
Handler 4: ██░░░░██░░░░██░░░░

Effective throughput: 4 commands concurrently!
```
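
The difference is easy to measure. The following self-contained sketch runs four simulated handlers three ways: with a blocking sleep, with an async sleep, and with the blocking call offloaded via `asyncio.to_thread` (a standard-library option for libraries that have no async API). The handler names and the 50 ms delay are made up for the demonstration.

```python
import asyncio
import time

DELAY = 0.05  # simulated I/O latency per handler, in seconds


async def blocking_handler() -> None:
    time.sleep(DELAY)  # blocks the event loop; nothing else can run


async def async_handler() -> None:
    await asyncio.sleep(DELAY)  # yields to the event loop while waiting


async def offloaded_handler() -> None:
    # For libraries with no async API, run the blocking call in a thread.
    await asyncio.to_thread(time.sleep, DELAY)


async def timed(handler, concurrency: int = 4) -> float:
    """Run `concurrency` copies of the handler and return elapsed seconds."""
    start = time.perf_counter()
    await asyncio.gather(*(handler() for _ in range(concurrency)))
    return time.perf_counter() - start


async def main() -> dict[str, float]:
    return {
        "blocking": await timed(blocking_handler),
        "async": await timed(async_handler),
        "offloaded": await timed(offloaded_handler),
    }


timings = asyncio.run(main())
for name, seconds in timings.items():
    print(f"{name:>9}: {seconds:.3f}s")
```

The blocking variant takes about four times the single-call delay because the handlers run one after another, while the async and offloaded variants finish in roughly the time of one call.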

### Transaction Participation via ctx.conn

**Handlers can participate in the worker's transaction** by using `ctx.conn`. This ensures your business logic and command completion are atomic:

```python
from psycopg.types.json import Json

@handler(domain="orders", command_type="CreateOrder")
async def handle_create_order(self, cmd: Command, ctx: HandlerContext):
    # Use ctx.conn to participate in the worker's transaction
    # Your operations + command completion are now atomic
    await ctx.conn.execute(
        "INSERT INTO orders (id, data) VALUES (%s, %s)",
        (cmd.command_id, Json(cmd.data))
    )
    await ctx.conn.execute(
        "UPDATE inventory SET qty = qty - %s WHERE product_id = %s",
        (cmd.data["quantity"], cmd.data["product_id"])
    )
    # If handler returns successfully: both statements + command completion COMMIT
    # If handler raises exception: everything ROLLS BACK, command retries

    return {"status": "created"}
```

**Why use ctx.conn?**

```
┌─────────────────────────────────────────────────────────────────────┐
│                    Worker Transaction                                │
│                                                                      │
│  ┌──────────────────────────────────────────────────────────────┐   │
│  │  Handler executes (using ctx.conn)                           │   │
│  │  ├── INSERT INTO orders ...                                  │   │
│  │  └── UPDATE inventory ...                                    │   │
│  └──────────────────────────────────────────────────────────────┘   │
│                              │                                       │
│                              ▼                                       │
│  ┌──────────────────────────────────────────────────────────────┐   │
│  │  Command completion                                          │   │
│  │  ├── DELETE from PGMQ queue                                  │   │
│  │  ├── UPDATE command status to COMPLETED                      │   │
│  │  └── INSERT audit event                                      │   │
│  └──────────────────────────────────────────────────────────────┘   │
│                              │                                       │
│                              ▼                                       │
│                          COMMIT                                      │
│              (all operations succeed together)                       │
│                                                                      │
└─────────────────────────────────────────────────────────────────────┘
```

**If handler fails:**

```
┌─────────────────────────────────────────────────────────────────────┐
│                    Worker Transaction                                │
│                                                                      │
│  Handler executes...                                                 │
│  ├── INSERT INTO orders ...  ✓                                      │
│  ├── UPDATE inventory ...    ✓                                      │
│  └── raise TransientCommandError("timeout")  ✗                      │
│                              │                                       │
│                              ▼                                       │
│                          ROLLBACK                                    │
│              (orders insert reverted, inventory unchanged)           │
│              (command will be retried after backoff)                 │
│                                                                      │
└─────────────────────────────────────────────────────────────────────┘
```

### Service Layer Pattern

For services that can be called from handlers or standalone, accept an optional `conn` parameter:

```python
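from decimal import Decimal

from psycopg import AsyncConnection
from psycopg_pool import AsyncConnectionPool
from commandbus import Command, HandlerContext, handler


class PaymentService: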
    def __init__(self, pool: AsyncConnectionPool):
        self._pool = pool

    async def transfer(
        self,
        from_account: str,
        to_account: str,
        amount: Decimal,
        conn: AsyncConnection | None = None,  # Optional connection
    ) -> dict:
        """Transfer money. Uses provided conn or acquires from pool."""
        if conn is not None:
            # Participate in caller's transaction
            return await self._transfer_impl(from_account, to_account, amount, conn)

        # Standalone call - manage own transaction
        async with self._pool.connection() as acquired_conn:
            async with acquired_conn.transaction():
                return await self._transfer_impl(from_account, to_account, amount, acquired_conn)


class PaymentHandlers:
    def __init__(self, payment_service: PaymentService):
        self._service = payment_service

    @handler(domain="payments", command_type="Transfer")
    async def handle_transfer(self, cmd: Command, ctx: HandlerContext) -> dict:
        # Pass ctx.conn to service - transfer is atomic with command completion
        return await self._service.transfer(
            from_account=cmd.data["from"],
            to_account=cmd.data["to"],
            amount=Decimal(cmd.data["amount"]),
            conn=ctx.conn,  # Transaction participation
        )
```

### Idempotency

Commands may be delivered more than once (at-least-once delivery). Design handlers to be idempotent:

```python
@handler(domain="payments", command_type="ProcessPayment")
async def handle_payment(self, cmd: Command, ctx: HandlerContext):
    async with self._pool.connection() as conn:
        # Check if already processed
        result = await conn.execute(
            "SELECT 1 FROM payments WHERE command_id = %s",
            (cmd.command_id,)
        )
        if await result.fetchone():
            # Already processed - skip the work and report the duplicate
            return {"status": "already_processed"}

        # Process payment
        await conn.execute(
            "INSERT INTO payments (command_id, amount) VALUES (%s, %s)",
            (cmd.command_id, cmd.data["amount"])
        )

    return {"status": "processed"}
```
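
The check-then-insert above leaves a small window in which two concurrent deliveries can both pass the `SELECT`. One way to close it is to let a unique constraint on `command_id` decide the winner. The sketch below assumes `payments.command_id` is a primary key or has a unique index (the schema is not shown in this README). It uses the standard-library `sqlite3` module purely so that it runs without a database, and `record_payment_once` is a hypothetical helper name.

```python
import sqlite3


def record_payment_once(conn: sqlite3.Connection, command_id: str, amount: str) -> bool:
    """Insert the payment row; return True only if this call created it."""
    cur = conn.execute(
        "INSERT INTO payments (command_id, amount) VALUES (?, ?) "
        "ON CONFLICT (command_id) DO NOTHING",
        (command_id, amount),
    )
    # rowcount is 1 when the row was inserted, 0 when the conflict clause skipped it
    return cur.rowcount == 1


# In-memory table standing in for the real payments table (assumed schema)
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE payments (command_id TEXT PRIMARY KEY, amount TEXT NOT NULL)")

first = record_payment_once(conn, "cmd-1", "10.00")   # first delivery
second = record_payment_once(conn, "cmd-1", "10.00")  # redelivery of the same command
```

PostgreSQL accepts the same `INSERT ... ON CONFLICT (command_id) DO NOTHING` statement, so a psycopg handler could use it with `%s` placeholders and inspect `rowcount` on the returned cursor in the same way.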

### Long-Running Operations

For operations that may exceed the visibility timeout, you have two options:

**Option 1: Extend visibility timeout periodically**

```python
@handler(domain="reports", command_type="GenerateReport")
async def handle_report(self, cmd: Command, ctx: HandlerContext):
    records = cmd.data["records"]
    results = []

    for i, record in enumerate(records):
        # Extend visibility every 100 records to prevent timeout
        if i > 0 and i % 100 == 0:
            await ctx.extend_visibility(30)  # Add 30 more seconds

        results.append(await process_record(record))

    return {"status": "completed", "processed": len(results)}
```
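
Extending every 100 records works when each record takes a similar amount of time. If per-record cost varies, a time-based trigger may be safer. The following is a minimal sketch, not part of the library: `VisibilityKeeper` is a hypothetical helper, and the only API it relies on is the `ctx.extend_visibility(seconds)` call shown above. The interval and extension values are illustrative.

```python
import time
from collections.abc import Callable
from typing import Any


class VisibilityKeeper:
    """Hypothetical helper: request an extension once enough time has passed."""

    def __init__(
        self,
        ctx: Any,  # assumed to expose `await ctx.extend_visibility(seconds)`
        every_s: float,
        extend_by_s: int,
        clock: Callable[[], float] = time.monotonic,
    ) -> None:
        self._ctx = ctx
        self._every_s = every_s
        self._extend_by_s = extend_by_s
        self._clock = clock
        self._last = clock()

    async def tick(self) -> bool:
        """Call between units of work; returns True if an extension was requested."""
        now = self._clock()
        if now - self._last < self._every_s:
            return False
        await self._ctx.extend_visibility(self._extend_by_s)
        self._last = now
        return True
```

In the loop above, `await keeper.tick()` would take the place of the `i % 100` check, with `every_s` chosen comfortably below the visibility timeout.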

**Option 2: Break into a tracked batch of smaller commands**

```python
from uuid import uuid4

from commandbus import BatchCommand


@handler(domain="reports", command_type="GenerateReport")
async def handle_report(self, cmd: Command, ctx: HandlerContext):
    chunks = split_into_chunks(cmd.data["records"], chunk_size=100)
    report_id = cmd.data["report_id"]

    # Create a batch to track all chunk processing
    result = await self._bus.create_batch(
        domain="reports",
        commands=[
            BatchCommand(
                command_type="ProcessReportChunk",
                command_id=uuid4(),
                data={"chunk": chunk, "report_id": report_id, "chunk_index": i},
            )
            for i, chunk in enumerate(chunks)
        ],
        name=f"Report {report_id} chunks",
    )

    return {"status": "chunked", "batch_id": str(result.batch_id), "chunks": len(chunks)}
```
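
The `split_into_chunks` helper used in Option 2 is not defined in this README. A minimal sketch, assuming it takes a sequence of records and returns a list of lists with at most `chunk_size` items each, could look like this:

```python
from collections.abc import Sequence
from typing import TypeVar

T = TypeVar("T")


def split_into_chunks(records: Sequence[T], chunk_size: int) -> list[list[T]]:
    """Split records into consecutive chunks; the last chunk may be shorter."""
    if chunk_size <= 0:
        raise ValueError("chunk_size must be positive")
    return [
        list(records[start:start + chunk_size])
        for start in range(0, len(records), chunk_size)
    ]
```

Returning plain lists keeps each chunk easy to serialize into the command's `data` payload.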

### Summary: Handler Checklist

| Requirement | Why |
|-------------|-----|
| Use `async`/`await` for all I/O | Blocking calls kill concurrency |
| Use async database drivers | Sync drivers block the event loop; use psycopg3 with `AsyncConnection` |
| Use async HTTP clients | `requests` blocks the event loop; use httpx or aiohttp |
| Handle idempotency | Commands may be delivered multiple times |
| Keep handlers fast | Long operations risk visibility timeout |
| Use transactions | Ensure atomicity of database operations |
| Raise appropriate errors | `TransientCommandError` vs `PermanentCommandError` |

---

## E2E Test Application

The repository includes an end-to-end test application with a web UI for testing command processing with **probabilistic behaviors**.

### Prerequisites

- Docker and Docker Compose
- Python 3.11+ with dependencies installed (see Quick Start)

### Running the E2E Application

**1. Start the database:**

```bash
make docker-up
```

**2. Start the web UI:**

```bash
make e2e-app
```

The web UI is available at http://localhost:5001.

**3. Start workers (in a separate terminal):**

```bash
cd tests/e2e
python -m app.worker
```

To run multiple workers for load testing:

```bash
cd tests/e2e
for i in {1..4}; do
  python -m app.worker &
done
```

### Probabilistic Behavior Model

Commands use a **probabilistic behavior model** with configurable outcome percentages:

| Parameter | Description |
|-----------|-------------|
| `fail_permanent_pct` | Chance of permanent failure (0-100%) |
| `fail_transient_pct` | Chance of transient failure (0-100%) |
| `timeout_pct` | Chance of timeout behavior (0-100%) |
| `min_duration_ms` | Minimum execution time (ms) |
| `max_duration_ms` | Maximum execution time (ms) |

**Evaluation Order:** Probabilities are evaluated sequentially - permanent failure first, then transient, then timeout. If none trigger, the command succeeds with execution time sampled from a normal distribution between min and max duration.
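
The evaluation order can be sketched as follows. This is an illustration, not the test application's actual code: `Behavior`, `pick_outcome`, and `sample_duration_ms` are hypothetical names, each check is assumed to be an independent roll, and the duration is assumed to be a normal sample centered on the midpoint and clamped to the configured range.

```python
import random
from dataclasses import dataclass


@dataclass(frozen=True)
class Behavior:
    fail_permanent_pct: float = 0.0
    fail_transient_pct: float = 0.0
    timeout_pct: float = 0.0
    min_duration_ms: int = 0
    max_duration_ms: int = 0


def pick_outcome(b: Behavior, rng: random.Random) -> str:
    """Try each failure mode in the documented order; the first hit wins."""
    checks = (
        ("permanent_failure", b.fail_permanent_pct),
        ("transient_failure", b.fail_transient_pct),
        ("timeout", b.timeout_pct),
    )
    for name, pct in checks:
        # Assumption: a fresh roll in [0, 100) per check
        if rng.random() * 100 < pct:
            return name
    return "success"


def sample_duration_ms(b: Behavior, rng: random.Random) -> float:
    """Normal sample around the midpoint, clamped to [min, max] (assumed shape)."""
    mean = (b.min_duration_ms + b.max_duration_ms) / 2
    sigma = (b.max_duration_ms - b.min_duration_ms) / 6
    return min(max(rng.gauss(mean, sigma), b.min_duration_ms), b.max_duration_ms)
```

Because the checks short-circuit, a command configured with 100% permanent failure never reaches the transient or timeout checks.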

**Example Configurations:**

| Scenario | Settings |
|----------|----------|
| **Pure throughput test** | All percentages 0%, duration 0ms |
| **Realistic workload** | 1% permanent, 5% transient, 100-500ms duration |
| **High failure rate** | 10% permanent, 20% transient |
| **Stress test retries** | 50% transient failure rate |

### Outcome Calculator

The UI includes an **Expected Outcomes Calculator** that shows predicted results based on your probability settings:

```
For 10,000 commands with 2% permanent, 8% transient:
├── ~200 permanent failures → TSQ immediately
├── ~800 transient failures → Retry (some recover)
└── ~9,000 succeed on first attempt
```
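
The arithmetic behind those figures can be reproduced with a few lines. This is a sketch under the assumption that each percentage is applied to the full command count, which matches the numbers above; `expected_outcomes` is a hypothetical name, not the UI's actual function.

```python
def expected_outcomes(total: int, permanent_pct: float, transient_pct: float) -> dict[str, int]:
    """Expected first-attempt outcome counts for a batch of commands."""
    permanent = round(total * permanent_pct / 100)
    transient = round(total * transient_pct / 100)
    return {
        "permanent": permanent,
        "transient": transient,
        "first_attempt_success": total - permanent - transient,
    }


counts = expected_outcomes(10_000, permanent_pct=2, transient_pct=8)
# counts == {"permanent": 200, "transient": 800, "first_attempt_success": 9000}
```

If each check is instead an independent roll made only after the previous one passes, the expected transient count becomes 8% of the remaining 9,800 commands, or 784, which is still within the approximation shown above.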

### Bulk Generation

For load testing, use the bulk generation form:
1. Adjust probability sliders for desired failure rates
2. Set execution time range (0ms for maximum throughput)
3. Set count (up to 1,000,000)
4. Click "Generate Bulk Commands"

### Monitoring

The E2E UI provides:
- **Dashboard**: Real-time status counts and throughput metrics
- **Commands**: List and filter commands by status
- **Troubleshooting Queue**: View and action failed commands
- **Audit Trail**: Full event history per command

---

## License

MIT
