Metadata-Version: 2.4
Name: fastapi-watch
Version: 1.0.1
Summary: Structured health and readiness check system for FastAPI
Project-URL: Repository, https://github.com/rgreen1207/fastapi-watch
License: MIT
Keywords: fastapi,health,kubernetes,liveness,probes,readiness
Classifier: Development Status :: 5 - Production/Stable
Classifier: Framework :: FastAPI
Classifier: Intended Audience :: Developers
Classifier: License :: OSI Approved :: MIT License
Classifier: Programming Language :: Python :: 3
Classifier: Programming Language :: Python :: 3.11
Classifier: Programming Language :: Python :: 3.12
Requires-Python: >=3.11
Requires-Dist: fastapi>=0.100.0
Requires-Dist: pydantic>=2.0
Provides-Extra: all
Requires-Dist: aio-pika>=9.4; extra == 'all'
Requires-Dist: aiohttp>=3.9; extra == 'all'
Requires-Dist: aiokafka>=0.9; extra == 'all'
Requires-Dist: aiomcache>=0.6; extra == 'all'
Requires-Dist: aiomysql>=0.2; extra == 'all'
Requires-Dist: asyncpg>=0.30; extra == 'all'
Requires-Dist: motor>=3.0; extra == 'all'
Requires-Dist: redis>=4.2; extra == 'all'
Requires-Dist: sqlalchemy>=2.0; extra == 'all'
Provides-Extra: http
Requires-Dist: aiohttp>=3.9; extra == 'http'
Provides-Extra: kafka
Requires-Dist: aiokafka>=0.9; extra == 'kafka'
Provides-Extra: memcached
Requires-Dist: aiomcache>=0.6; extra == 'memcached'
Provides-Extra: mongo
Requires-Dist: motor>=3.0; extra == 'mongo'
Provides-Extra: mysql
Requires-Dist: aiomysql>=0.2; extra == 'mysql'
Provides-Extra: postgres
Requires-Dist: asyncpg>=0.30; extra == 'postgres'
Provides-Extra: rabbitmq
Requires-Dist: aio-pika>=9.4; extra == 'rabbitmq'
Requires-Dist: aiohttp>=3.9; extra == 'rabbitmq'
Provides-Extra: redis
Requires-Dist: redis>=4.2; extra == 'redis'
Provides-Extra: sqlalchemy
Requires-Dist: sqlalchemy>=2.0; extra == 'sqlalchemy'
Description-Content-Type: text/markdown

# fastapi-watch

Structured health and readiness check system for [FastAPI](https://fastapi.tiangolo.com/).

Add `/health/live`, `/health/ready`, `/health/status`, and `/health/history` endpoints to any FastAPI app with a single registry call. All probes run concurrently, so a slow dependency never blocks the others. Each probe returns rich service-specific details alongside the pass/fail result.

Connect a browser or monitoring tool to the SSE streaming endpoints (`/health/ready/stream`, `/health/status/stream`) and receive live updates as long as you stay connected — the background poll loop starts automatically on the first connection and stops when the last client disconnects.

---

## Table of contents

- [Installation](#installation)
- [Quick start](#quick-start)
- [Endpoints](#endpoints)
- [Probe management](#probe-management)
  - [Adding probes](#adding-probes)
  - [Critical vs non-critical probes](#critical-vs-non-critical-probes)
  - [Per-probe timeout](#per-probe-timeout)
- [Live streaming](#live-streaming)
- [Polling and caching](#polling-and-caching)
- [State-change callbacks](#state-change-callbacks)
- [Startup grace period](#startup-grace-period)
- [Probe result history](#probe-result-history)
- [Response format](#response-format)
- [Writing a custom probe](#writing-a-custom-probe)
- [Built-in probes](#built-in-probes)
  - [Watching PostgreSQL](#watching-postgresql)
  - [Watching MySQL / MariaDB](#watching-mysql--mariadb)
  - [Watching Redis](#watching-redis)
  - [Watching Memcached](#watching-memcached)
  - [Watching RabbitMQ](#watching-rabbitmq)
  - [Watching Kafka](#watching-kafka)
  - [Watching MongoDB](#watching-mongodb)
  - [Watching an HTTP endpoint](#watching-an-http-endpoint)
  - [SQLAlchemy engine probe](#sqlalchemy-engine-probe)
  - [All built-in probes](#all-built-in-probes)
- [Configuration reference](#configuration-reference)
- [Kubernetes integration](#kubernetes-integration)
- [License](#license)

---

## Installation

Install only the extras you actually use. Nothing is pulled in by default beyond FastAPI and Pydantic.

> **zsh users:** wrap the package name in quotes to prevent the shell from interpreting `[` and `]` as glob patterns.

```bash
# Core package — includes the always-passing MemoryProbe, no other deps
pip install fastapi-watch

# Add individual service probes as needed
pip install "fastapi-watch[postgres]"     # PostgreSQL        (asyncpg)
pip install "fastapi-watch[mysql]"        # MySQL / MariaDB   (aiomysql)
pip install "fastapi-watch[sqlalchemy]"   # Any SQLAlchemy 2.x async engine
pip install "fastapi-watch[redis]"        # Redis             (redis)
pip install "fastapi-watch[memcached]"    # Memcached         (aiomcache)
pip install "fastapi-watch[rabbitmq]"     # RabbitMQ          (aio-pika + aiohttp)
pip install "fastapi-watch[kafka]"        # Kafka             (aiokafka)
pip install "fastapi-watch[mongo]"        # MongoDB           (motor)
pip install "fastapi-watch[http]"         # HTTP endpoint     (aiohttp)

# Or pull everything in one shot
pip install "fastapi-watch[all]"
```

Multiple extras can be combined:

```bash
pip install "fastapi-watch[postgres,redis,rabbitmq]"
```

---

## Quick start

Create a `HealthRegistry`, attach it to your FastAPI `app`, and call `.add()` for each service you want to monitor. The registry mounts all health endpoints automatically.

```python
import logging
from fastapi import FastAPI
from fastapi_watch import HealthRegistry
from fastapi_watch.probes import PostgreSQLProbe, RedisProbe

app = FastAPI()

registry = HealthRegistry(
    app,
    poll_interval_ms=60_000,           # re-run probes every 60 s while streaming
    logger=logging.getLogger(__name__), # optional — omit to silence all logging
    grace_period_ms=10_000,            # hold /ready for 10 s while the app warms up
    history_size=20,                   # keep the last 20 results per probe
)

registry.add(PostgreSQLProbe(url="postgresql://user:pass@localhost/mydb"))
registry.add(RedisProbe(url="redis://localhost:6379"), critical=False)
```

That's it. The following routes are now live:

```
GET /health/live          → always 200
GET /health/ready         → 200 / 503
GET /health/status        → 200 / 207
GET /health/ready/stream  → SSE stream
GET /health/status/stream → SSE stream
GET /health/history       → rolling probe history
```

---

## Endpoints

| Endpoint | Purpose | Healthy | Degraded |
|---|---|---|---|
| `GET /health/live` | **Liveness** — is the process alive? | `200 OK` | never fails |
| `GET /health/ready` | **Readiness** — are all critical probes passing? | `200 OK` | `503 Service Unavailable` |
| `GET /health/status` | **Status** — full detail on every probe | `200 OK` | `207 Multi-Status` |
| `GET /health/ready/stream` | **Readiness stream** — SSE; polls while connected | `200 OK` | stream of events |
| `GET /health/status/stream` | **Status stream** — SSE; polls while connected | `200 OK` | stream of events |
| `GET /health/history` | **History** — last N results per probe | `200 OK` | always `200` |

The prefix defaults to `/health` and can be changed at construction time:

```python
registry = HealthRegistry(app, prefix="/ops/health")
# → /ops/health/live
# → /ops/health/ready
# → /ops/health/status
# → /ops/health/ready/stream
# → /ops/health/status/stream
# → /ops/health/history
```

---

## Probe management

### Adding probes

Add probes one at a time with `add()`, or pass a list with `add_probes()`. Both methods return `self` for chaining. Adding the same instance twice is a no-op.

```python
# Single probe
registry.add(probe_a)

# Multiple probes in one call
registry.add_probes([probe_a, probe_b, probe_c])

# Chained
registry.add(probe_a).add(probe_b).add(probe_c)

# Duplicate ignored — probe_a is only registered once
registry.add(probe_a)
registry.add(probe_a)
```

Probes run **concurrently** on every check — a slow or failing probe never delays the others.

### Critical vs non-critical probes

By default every probe is **critical** — a failing critical probe sets the overall `status` to `"unhealthy"` and causes `/health/ready` to return `503`.

Mark a probe as non-critical when its failure should be visible in reports but shouldn't block traffic:

```python
# Database is essential; fail readiness if it's unreachable
registry.add(PostgreSQLProbe(url="postgresql://..."), critical=True)

# Cache is nice-to-have; don't fail readiness if it's down
registry.add(RedisProbe(url="redis://localhost"), critical=False)
```

Non-critical probes always appear in `/health/status` with their real result and a `"critical": false` field. They simply don't affect the overall `status` or `/ready`.

`add_probes()` accepts the same flag, applied to every probe in the list:

```python
registry.add_probes([probe_a, probe_b], critical=False)
```

### Per-probe timeout

Set a `timeout` (in seconds) on any probe class or instance. If the check doesn't complete within that time, the probe is recorded as unhealthy — all other probes are unaffected and still run concurrently.

**On the class:**

```python
class MyServiceProbe(BaseProbe):
    name = "my-service"
    timeout = 5.0  # fail after 5 seconds

    async def check(self) -> ProbeResult:
        ...
```

**On an instance:**

```python
probe = MyServiceProbe()
probe.timeout = 2.0
registry.add(probe)
```

`timeout = None` (the default) means no limit. Timed-out probes produce an unhealthy result with `error: "TimeoutError: "`.

---

## Live streaming

The two streaming endpoints (`/health/ready/stream`, `/health/status/stream`) use [Server-Sent Events](https://developer.mozilla.org/en-US/docs/Web/API/Server-sent_events) (SSE) to push probe results to connected clients.

**The poll loop is demand-driven** — it starts when the first SSE client connects and stops automatically when the last one disconnects. No background work is done when nobody is watching.

Each event is a JSON-encoded health report on a `data:` line:

```
data: {"status": "healthy", "checked_at": "2024-06-01T12:00:00.123456+00:00", "probes": [...]}

data: {"status": "unhealthy", "checked_at": "2024-06-01T12:00:05.456789+00:00", "probes": [...]}
```

### Connecting from JavaScript

```js
const es = new EventSource('/health/status/stream');

es.onmessage = (event) => {
    const report = JSON.parse(event.data);
    console.log(report.status, report.probes);
};

es.onerror = () => es.close();
```

### Connecting with curl

```bash
curl -N http://localhost:8000/health/status/stream
```

### Configuring the poll interval

```python
# Default — poll every 60 seconds while a client is connected
registry = HealthRegistry(app)

# Custom interval
registry = HealthRegistry(app, poll_interval_ms=10_000)  # every 10 s

# Minimum enforced interval is 1000 ms; lower values are clamped
registry = HealthRegistry(app, poll_interval_ms=500)     # → 1000 ms

# Disable polling — streaming endpoints emit one result then close
registry = HealthRegistry(app, poll_interval_ms=None)
```

The interval can also be changed at any point after startup:

```python
registry.set_poll_interval(30_000)   # switch to every 30 s
registry.set_poll_interval(0)        # disable — single-fetch mode
```

---

## Polling and caching

The regular `GET /health/ready` and `GET /health/status` endpoints always respond immediately:

- **When SSE clients are connected** — the poll loop is running, so these endpoints serve the most recent cached probe results without re-running any probes.
- **When no streaming is active** — probes are run on demand. A built-in lock prevents a thundering herd if multiple requests arrive simultaneously before the first result is cached.

This means your GET endpoints are fast under all conditions, regardless of whether anyone is streaming.

---

## State-change callbacks

React to probe status transitions in real time. Register one or more callbacks with `on_state_change()`; each receives the probe name, old status, and new status whenever a probe's result changes.

```python
import logging

logger = logging.getLogger(__name__)

def on_change(probe_name: str, old_status, new_status):
    logger.warning("Probe %s changed: %s → %s", probe_name, old_status, new_status)

registry.on_state_change(on_change)
```

Async callbacks are also supported:

```python
async def alert(probe_name, old_status, new_status):
    await send_slack_alert(f"{probe_name} is now {new_status}")

registry.on_state_change(alert)
```

Key behaviours:

- Callbacks fire after every `run_all()` for each probe whose status differs from the previous run.
- The **first run** seeds the initial state — no callbacks are fired until a subsequent run sees a different result.
- Multiple callbacks can be registered; all are called in registration order.
- `on_state_change()` returns `self` for chaining.

---

## Startup grace period

Pass `grace_period_ms` to hold `/health/ready` in a `503 {"status": "starting"}` state for a fixed window after the registry is created. This prevents a load balancer from routing traffic before the application has had time to warm up — without requiring all probes to pass immediately on boot.

```python
registry = HealthRegistry(
    app,
    grace_period_ms=15_000,  # hold readiness for 15 s after startup
)
```

- `/health/ready` returns `503 {"status": "starting"}` while the grace period is active.
- `/health/status` and `/health/live` are **not** affected — they always reflect real probe results.
- After the grace period expires, `/ready` resumes normal probe-based behaviour.
- `grace_period_ms=0` (default) disables the grace period entirely.

Pair with Kubernetes' `initialDelaySeconds` for belt-and-suspenders protection during slow startup:

```yaml
readinessProbe:
  httpGet:
    path: /health/ready
    port: 8000
  initialDelaySeconds: 5   # k8s waits 5 s before its first check
  periodSeconds: 10
```

```python
# App-side grace covers the remaining warmup window
registry = HealthRegistry(app, grace_period_ms=20_000)
```

---

## Probe result history

Every probe result is stored in a rolling per-probe history. Use `GET /health/history` to inspect past runs — useful for debugging flapping probes or tracking latency over time.

```python
registry = HealthRegistry(
    app,
    history_size=20,  # keep the last 20 results per probe (default: 10)
)
```

**`GET /health/history` — response format:**

```json
{
  "probes": {
    "postgresql": [
      {
        "name": "postgresql",
        "status": "healthy",
        "critical": true,
        "latency_ms": 1.8,
        "error": null,
        "details": { "version": "PostgreSQL 16.2 ...", "active_connections": 5 }
      },
      {
        "name": "postgresql",
        "status": "healthy",
        "critical": true,
        "latency_ms": 2.1,
        "error": null,
        "details": { "version": "PostgreSQL 16.2 ...", "active_connections": 6 }
      }
    ],
    "redis": [
      {
        "name": "redis",
        "status": "unhealthy",
        "critical": false,
        "latency_ms": 5002.0,
        "error": "Connection refused",
        "details": null
      },
      {
        "name": "redis",
        "status": "healthy",
        "critical": false,
        "latency_ms": 0.9,
        "error": null,
        "details": { "version": "7.2.4", "total_keys": 312 }
      }
    ]
  }
}
```

Results are ordered oldest-first. History is in-memory and resets on process restart.

---

## Response format

Every response from `/health/ready`, `/health/status`, and the SSE streams shares the same shape.

### Health report

| Field | Type | Description |
|-------|------|-------------|
| `status` | `"healthy"` \| `"unhealthy"` | Overall result — determined by critical probes only |
| `checked_at` | `string` \| `null` | UTC ISO 8601 timestamp of the last probe run; `null` before the first run |
| `probes` | `array` | Individual probe results (see below) |

### Probe result

| Field | Type | Description |
|-------|------|-------------|
| `name` | `string` | Probe identifier |
| `status` | `"healthy"` \| `"unhealthy"` | Pass/fail for this probe |
| `critical` | `boolean` | `true` if the probe affects overall status and readiness |
| `latency_ms` | `number` | How long the check took in milliseconds |
| `error` | `string` \| `null` | Error message; only present on failure |
| `details` | `object` \| `null` | Service-specific metadata (see each probe's section) |

### Example: all healthy — `200`

```json
{
  "status": "healthy",
  "checked_at": "2024-06-01T12:00:00.123456+00:00",
  "probes": [
    {
      "name": "postgresql",
      "status": "healthy",
      "critical": true,
      "latency_ms": 1.8,
      "error": null,
      "details": {
        "version": "PostgreSQL 16.2 on aarch64-unknown-linux-gnu",
        "active_connections": 5,
        "max_connections": 100,
        "database_size": "42 MB"
      }
    },
    {
      "name": "redis",
      "status": "healthy",
      "critical": false,
      "latency_ms": 0.6,
      "error": null,
      "details": {
        "version": "7.2.4",
        "uptime_seconds": 86400,
        "used_memory_human": "2.50M",
        "connected_clients": 8,
        "total_keys": 312
      }
    }
  ]
}
```

### Example: one critical probe failing — `503` on `/ready`, `207` on `/status`

```json
{
  "status": "unhealthy",
  "checked_at": "2024-06-01T12:00:05.456789+00:00",
  "probes": [
    {
      "name": "postgresql",
      "status": "unhealthy",
      "critical": true,
      "latency_ms": 5002.1,
      "error": "Connection refused",
      "details": null
    },
    {
      "name": "redis",
      "status": "healthy",
      "critical": false,
      "latency_ms": 0.6,
      "error": null,
      "details": { "version": "7.2.4" }
    }
  ]
}
```

### Example: non-critical probe failing — still `200` on `/ready`

```json
{
  "status": "healthy",
  "checked_at": "2024-06-01T12:00:10.000000+00:00",
  "probes": [
    {
      "name": "postgresql",
      "status": "healthy",
      "critical": true,
      "latency_ms": 1.9,
      "error": null,
      "details": { "active_connections": 5 }
    },
    {
      "name": "redis",
      "status": "unhealthy",
      "critical": false,
      "latency_ms": 5001.3,
      "error": "Connection timed out",
      "details": null
    }
  ]
}
```

---

## Writing a custom probe

Any class that extends `BaseProbe` and implements `check()` works as a probe. This is the right approach for internal services, third-party SDKs, business-logic checks, or composite conditions.

### Minimal probe

```python
from fastapi_watch.probes import BaseProbe
from fastapi_watch.models import ProbeResult, ProbeStatus

class MyServiceProbe(BaseProbe):
    name = "my-service"

    async def check(self) -> ProbeResult:
        ok = await call_my_service()
        return ProbeResult(
            name=self.name,
            status=ProbeStatus.HEALTHY if ok else ProbeStatus.UNHEALTHY,
        )

registry.add(MyServiceProbe())
```

`check()` must be an async method and must return a `ProbeResult`. Any unhandled exception raised by `check()` is caught by the registry, automatically recorded as an unhealthy result, and optionally logged — your probe never needs to worry about crashing the health system.

### Recording latency and details

Use `time.perf_counter()` to measure the check duration and populate `latency_ms`. The `details` dict accepts any JSON-serializable data.

```python
import time
from fastapi_watch.probes import BaseProbe
from fastapi_watch.models import ProbeResult, ProbeStatus

class PaymentGatewayProbe(BaseProbe):
    name = "payment-gateway"

    async def check(self) -> ProbeResult:
        start = time.perf_counter()
        try:
            info = await ping_payment_gateway()
            latency = (time.perf_counter() - start) * 1000
            return ProbeResult(
                name=self.name,
                status=ProbeStatus.HEALTHY,
                latency_ms=round(latency, 2),
                details={
                    "region": info.region,
                    "provider_version": info.version,
                    "response_ms": round(latency, 2),
                },
            )
        except Exception as exc:
            latency = (time.perf_counter() - start) * 1000
            return ProbeResult(
                name=self.name,
                status=ProbeStatus.UNHEALTHY,
                latency_ms=round(latency, 2),
                error=str(exc),
            )
```

### Configurable probe

Pass configuration through `__init__` so the same probe class can be reused with different settings.

```python
class S3BucketProbe(BaseProbe):
    def __init__(self, bucket: str, region: str = "us-east-1", name: str = "s3") -> None:
        self.bucket = bucket
        self.region = region
        self.name = name

    async def check(self) -> ProbeResult:
        import time
        import aiobotocore.session

        start = time.perf_counter()
        try:
            session = aiobotocore.session.get_session()
            async with session.create_client("s3", region_name=self.region) as client:
                await client.head_bucket(Bucket=self.bucket)
            latency = (time.perf_counter() - start) * 1000
            return ProbeResult(
                name=self.name,
                status=ProbeStatus.HEALTHY,
                latency_ms=round(latency, 2),
                details={"bucket": self.bucket, "region": self.region},
            )
        except Exception as exc:
            latency = (time.perf_counter() - start) * 1000
            return ProbeResult(
                name=self.name,
                status=ProbeStatus.UNHEALTHY,
                latency_ms=round(latency, 2),
                error=str(exc),
            )

# Register multiple buckets as separate probes
registry.add(S3BucketProbe(bucket="my-app-uploads", region="eu-west-1", name="s3-uploads"))
registry.add(S3BucketProbe(bucket="my-app-backups", region="us-east-1", name="s3-backups"))
```

### Adding a timeout

Set the `timeout` attribute (in seconds) on the class or instance. The registry will cancel the check and record it as unhealthy if it runs too long.

```python
class SlowExternalProbe(BaseProbe):
    name = "slow-external"
    timeout = 3.0  # class-level default

    async def check(self) -> ProbeResult:
        result = await call_slow_external_api()
        return ProbeResult(name=self.name, status=ProbeStatus.HEALTHY)

# Override on a specific instance
probe = SlowExternalProbe()
probe.timeout = 1.5
registry.add(probe)
```

### Composite probe

Wrap multiple inner probes to build custom aggregation logic — for example, reporting unhealthy only when both a primary and replica are down simultaneously.

```python
import asyncio
from fastapi_watch.probes import BaseProbe, RedisProbe
from fastapi_watch.models import ProbeResult, ProbeStatus

class RedisHAProbe(BaseProbe):
    name = "redis-ha"

    def __init__(self, primary_url: str, replica_url: str) -> None:
        self._primary = RedisProbe(url=primary_url, name="primary")
        self._replica = RedisProbe(url=replica_url, name="replica")

    async def check(self) -> ProbeResult:
        primary, replica = await asyncio.gather(
            self._primary.check(), self._replica.check()
        )
        if primary.is_healthy or replica.is_healthy:
            return ProbeResult(
                name=self.name,
                status=ProbeStatus.HEALTHY,
                details={
                    "primary": primary.status.value,
                    "replica": replica.status.value,
                },
            )
        return ProbeResult(
            name=self.name,
            status=ProbeStatus.UNHEALTHY,
            error=f"both nodes down — primary: {primary.error}, replica: {replica.error}",
        )

registry.add(RedisHAProbe(
    primary_url="redis://primary.internal:6379",
    replica_url="redis://replica.internal:6379",
))
```

### Exception handling

If `check()` raises an unhandled exception, the registry catches it and returns an unhealthy result automatically — you do **not** need to wrap your entire probe body in a try/except for this purpose. The auto-generated result looks like:

```json
{
  "name": "my-service",
  "status": "unhealthy",
  "critical": true,
  "latency_ms": 0.0,
  "error": "RuntimeError: connection pool exhausted",
  "details": null
}
```

If a `logger` was passed to `HealthRegistry`, the exception is also logged with full traceback via `logger.exception()`.

You should still catch exceptions yourself inside `check()` if you want to record partial details, a meaningful `latency_ms`, or a more specific `error` message.

### Testing a custom probe

Use `pytest-asyncio` to test `check()` directly without needing to spin up an HTTP server.

```python
import pytest
from fastapi_watch.models import ProbeStatus
from myapp.probes import MyServiceProbe

@pytest.mark.asyncio
async def test_healthy_when_service_responds():
    probe = MyServiceProbe()
    result = await probe.check()
    assert result.status == ProbeStatus.HEALTHY
    assert result.name == "my-service"

@pytest.mark.asyncio
async def test_unhealthy_when_service_raises(monkeypatch):
    async def fail():
        raise ConnectionError("refused")

    monkeypatch.setattr("myapp.probes.call_my_service", fail)
    probe = MyServiceProbe()
    result = await probe.check()
    assert result.status == ProbeStatus.UNHEALTHY
    assert "refused" in result.error
```

You can also run the full registry against a real or mock dependency:

```python
from fastapi import FastAPI
from fastapi_watch import HealthRegistry

@pytest.mark.asyncio
async def test_registry_run_all():
    app = FastAPI()
    registry = HealthRegistry(app, poll_interval_ms=None)
    registry.add(MyServiceProbe())

    results = await registry.run_all()
    assert results[0].status == ProbeStatus.HEALTHY
```

### Probe implementation checklist

- `name` must be set — either as a class attribute or in `__init__` via `self.name`.
- `check()` must be `async` and return a `ProbeResult`.
- Set `latency_ms` for probes where response time matters.
- Populate `details` with any data useful for diagnosis.
- Set `timeout` if the underlying call can hang indefinitely.
- Do not call `registry.run_all()` or other registry methods from inside `check()`.

---

## Built-in probes

### Probe details

Every built-in probe populates the `details` field with service-specific metadata. Details are always best-effort — if the metadata query fails after a successful connectivity check, `details` will contain whatever was collected up to that point. The probe status reflects connectivity only, not the completeness of `details`.

---

### Watching PostgreSQL

```bash
pip install "fastapi-watch[postgres]"
```

`PostgreSQLProbe` uses `asyncpg` directly — no SQLAlchemy required. It opens a connection, runs `SELECT version()` and a set of metadata queries concurrently, then closes the connection.

```python
from fastapi_watch.probes import PostgreSQLProbe

registry.add(
    PostgreSQLProbe(
        url="postgresql://app_user:secret@localhost:5432/mydb",
        name="primary-db",  # default: "postgresql"
    )
)
```

**Details returned:**

```json
{
  "version": "PostgreSQL 16.2 on aarch64-unknown-linux-gnu, compiled by gcc 12.2.0",
  "active_connections": 5,
  "max_connections": 100,
  "database_size": "42 MB"
}
```

**Checking a read replica separately:**

```python
registry.add(PostgreSQLProbe(url="postgresql://reader:secret@replica.host/mydb", name="replica-db"))
```

**With a connection timeout** (default 5 seconds):

```python
registry.add(PostgreSQLProbe(url="postgresql://...", timeout=2.0))
```

> If you are already using SQLAlchemy, see [SQLAlchemy engine probe](#sqlalchemy-engine-probe) to reuse your existing engine instead.

---

### Watching MySQL / MariaDB

```bash
pip install "fastapi-watch[mysql]"
```

`MySQLProbe` accepts either a URL or explicit connection kwargs.

```python
from fastapi_watch.probes import MySQLProbe

# URL form
registry.add(MySQLProbe(url="mysql://app_user:secret@localhost:3306/mydb"))

# Keyword form
registry.add(MySQLProbe(host="localhost", port=3306, user="app_user", password="secret", db="mydb"))
```

**Details returned:**

```json
{
  "version": "8.0.36",
  "connected_threads": 4,
  "uptime_seconds": 172800,
  "max_used_connections": 12
}
```

**Constructor arguments:**

| Argument | Default | Description |
|----------|---------|-------------|
| `url` | `None` | Full DSN — overrides all other kwargs when set |
| `host` | `"localhost"` | |
| `port` | `3306` | |
| `user` | `"root"` | |
| `password` | `""` | |
| `db` | `""` | |
| `name` | `"mysql"` | Probe label |
| `connect_timeout` | `5` | Seconds |

---

### Watching Redis

```bash
pip install "fastapi-watch[redis]"
```

`RedisProbe` sends `PING`, then collects server info and scans key prefixes to build a cluster breakdown.

```python
from fastapi_watch.probes import RedisProbe

registry.add(RedisProbe(url="redis://localhost:6379"))
```

**Details returned:**

```json
{
  "version": "7.2.4",
  "uptime_seconds": 86400,
  "used_memory_human": "2.50M",
  "connected_clients": 8,
  "role": "master",
  "total_keys": 312,
  "clusters": {
    "session": { "keys": 150, "ttl_seconds": 3600 },
    "cache":   { "keys": 162, "ttl_seconds": 900  }
  }
}
```

`clusters` groups keys by the segment before the first `:`. For example, a key named `session:abc123` falls into the `session` cluster. `ttl_seconds` is sampled from one key in the group; `null` means the key has no expiry.

**Common URL forms:**

```python
# Password-protected
RedisProbe(url="redis://:mypassword@localhost:6379")

# Specific database index
RedisProbe(url="redis://localhost:6379/2", name="task-queue")

# TLS
RedisProbe(url="rediss://redis.internal:6380")

# Watching Redis as both a cache and a queue
registry.add(RedisProbe(url="redis://localhost:6379/0", name="cache"))
registry.add(RedisProbe(url="redis://localhost:6379/1", name="task-queue"))
```

---

### Watching Memcached

```bash
pip install "fastapi-watch[memcached]"
```

`MemcachedProbe` calls `stats()` to verify the server is reachable and responding.

```python
from fastapi_watch.probes import MemcachedProbe

registry.add(MemcachedProbe(host="localhost", port=11211))
```

**Constructor arguments:**

| Argument | Default | Description |
|----------|---------|-------------|
| `host` | `"localhost"` | |
| `port` | `11211` | |
| `name` | `"memcached"` | Probe label |
| `pool_size` | `1` | aiomcache connection pool size |

---

### Watching RabbitMQ

```bash
pip install "fastapi-watch[rabbitmq]"
```

`RabbitMQProbe` has two modes:

- **Connectivity only** (default) — opens and closes an AMQP connection. No channels or queues are touched.
- **Rich mode** — when `management_url` is set, the probe also calls the RabbitMQ Management HTTP API and returns per-queue stats, message rates, and cluster metadata.

#### Connectivity only

```python
from fastapi_watch.probes import RabbitMQProbe

registry.add(
    RabbitMQProbe(
        url="amqp://guest:guest@localhost:5672/",
        name="rabbitmq",  # default
    )
)
```

**Details returned (connectivity only):**

```json
{ "connected": true }
```

#### Rich mode — with Management API

Pass `management_url` pointing at the RabbitMQ Management plugin (default port `15672`). Credentials are taken from the AMQP URL automatically.

```python
registry.add(
    RabbitMQProbe(
        url="amqp://guest:guest@localhost:5672/",
        management_url="http://localhost:15672",
    )
)
```

**Details returned (rich mode):**

```json
{
  "connected": true,
  "server": {
    "rabbitmq_version": "3.12.0",
    "erlang_version": "26.0",
    "cluster_name": "rabbit@my-node",
    "node": "rabbit@my-node",
    "connections": 4,
    "channels": 8,
    "exchanges": 14,
    "queues": 3,
    "consumers": 6
  },
  "totals": {
    "messages": 142,
    "messages_ready": 140,
    "messages_unacknowledged": 2,
    "publish_rate": 12.5,
    "deliver_rate": 11.8,
    "ack_rate": 11.8
  },
  "queues": {
    "tasks": {
      "state": "running",
      "messages": 120,
      "messages_ready": 118,
      "messages_unacknowledged": 2,
      "consumers": 4,
      "memory_bytes": 32768,
      "publish_rate": 10.0,
      "deliver_rate": 9.5,
      "ack_rate": 9.5,
      "durable": true,
      "auto_delete": false,
      "idle_since": null
    }
  }
}
```

If the Management API is unreachable, a `management_api_error` key is added to `details` and the probe still reports the AMQP connection status.

**Other connection forms:**

```python
# Dedicated monitoring vhost
RabbitMQProbe(url="amqp://monitor:secret@rabbitmq.internal/monitoring", management_url="http://rabbitmq.internal:15672")

# TLS / AMQPS
RabbitMQProbe(url="amqps://user:secret@rabbitmq.internal/", name="rabbitmq-tls")

# Multiple cluster nodes — one probe per node
for i, host in enumerate(["rmq-1.internal", "rmq-2.internal", "rmq-3.internal"], start=1):
    registry.add(RabbitMQProbe(url=f"amqp://guest:guest@{host}/", name=f"rabbitmq-node-{i}"))
```

**Constructor arguments:**

| Argument | Default | Description |
|----------|---------|-------------|
| `url` | `"amqp://guest:guest@localhost/"` | AMQP(S) connection URL |
| `name` | `"rabbitmq"` | Probe label |
| `management_url` | `None` | Base URL of the Management HTTP API. When set, enables rich queue-level details. Credentials are taken from `url`. |

---

### Watching Kafka

```bash
pip install "fastapi-watch[kafka]"
```

`KafkaProbe` starts an `AIOKafkaAdminClient` to verify broker reachability, then lists topics and describes the cluster.

```python
from fastapi_watch.probes import KafkaProbe

# Single broker
registry.add(KafkaProbe(bootstrap_servers="localhost:9092"))

# Multiple brokers
registry.add(KafkaProbe(bootstrap_servers=["b1:9092", "b2:9092", "b3:9092"]))
```

**Details returned:**

```json
{
  "broker_count": 3,
  "controller_id": 1,
  "topics": ["orders", "payments", "notifications"],
  "internal_topics": ["__consumer_offsets"]
}
```

`topics` contains user-defined topics only. `internal_topics` lists Kafka-managed topics (those prefixed with `__`).

**Constructor arguments:**

| Argument | Default | Description |
|----------|---------|-------------|
| `bootstrap_servers` | `"localhost:9092"` | String or list of `host:port` entries |
| `name` | `"kafka"` | Probe label |
| `request_timeout_ms` | `5000` | Admin client metadata request timeout |

---

### Watching MongoDB

```bash
pip install "fastapi-watch[mongo]"
```

`MongoProbe` runs `serverStatus` on the `admin` database to collect version, connection pool stats, memory, and storage engine.

```python
from fastapi_watch.probes import MongoProbe

registry.add(MongoProbe(url="mongodb://localhost:27017"))
```

**Details returned:**

```json
{
  "version": "7.0.5",
  "uptime_seconds": 172800,
  "connections": {
    "current": 12,
    "available": 838,
    "total_created": 150
  },
  "memory_mb": {
    "resident": 128,
    "virtual": 1024
  },
  "storage_engine": "wiredTiger"
}
```

**Constructor arguments:**

| Argument | Default | Description |
|----------|---------|-------------|
| `url` | `"mongodb://localhost:27017"` | MongoDB connection URI |
| `name` | `"mongodb"` | Probe label |
| `server_selection_timeout_ms` | `2000` | How long to wait for a server before giving up |

---

### Watching an HTTP endpoint

```bash
pip install "fastapi-watch[http]"
```

`HttpProbe` performs an HTTP GET and checks the response status code.

```python
from fastapi_watch.probes import HttpProbe

registry.add(HttpProbe(url="https://api.upstream.com/health"))
```

**Details returned:**

```json
{
  "status_code": 200,
  "content_type": "application/json",
  "response_bytes": 43
}
```

`details` is populated for both healthy and unhealthy responses so you can see what status code an upstream actually returned.

**Constructor arguments:**

| Argument | Default | Description |
|----------|---------|-------------|
| `url` | required | URL to GET |
| `timeout` | `5.0` | Request timeout in seconds |
| `name` | URL host | Probe label |
| `expected_status` | `200` | HTTP status code considered healthy |

```python
# Expect a 204 instead of 200
registry.add(HttpProbe(url="https://api.example.com/ping", expected_status=204))

# Shorter timeout, explicit name
registry.add(HttpProbe(url="https://api.payments.com/health", timeout=2.0, name="payments-api"))
```

---

### SQLAlchemy engine probe

```bash
pip install "fastapi-watch[sqlalchemy]"
```

`SqlAlchemyProbe` reuses your existing `AsyncEngine` so no extra connections are opened. Works with any database SQLAlchemy supports (PostgreSQL, MySQL, SQLite, etc.).

```python
from sqlalchemy.ext.asyncio import create_async_engine
from fastapi_watch.probes import SqlAlchemyProbe

engine = create_async_engine("postgresql+asyncpg://app_user:secret@localhost/mydb")

registry.add(SqlAlchemyProbe(engine=engine, name="primary-db"))
```

**Details returned:**

```json
{
  "dialect": "postgresql",
  "driver": "asyncpg",
  "server_version": "16.2.0"
}
```

**Constructor arguments:**

| Argument | Default | Description |
|----------|---------|-------------|
| `engine` | required | A SQLAlchemy 2.x `AsyncEngine` instance |
| `name` | `"database"` | Probe label |

---

### All built-in probes

#### Databases

| Probe | Extra | Key constructor args | Details fields |
|-------|-------|---------------------|----------------|
| `PostgreSQLProbe` | `postgres` | `url`, `name`, `timeout` | `version`, `active_connections`, `max_connections`, `database_size` |
| `MySQLProbe` | `mysql` | `url` or `host`/`port`/`user`/`password`/`db`, `name`, `connect_timeout` | `version`, `connected_threads`, `uptime_seconds`, `max_used_connections` |
| `SqlAlchemyProbe` | `sqlalchemy` | `engine`, `name` | `dialect`, `driver`, `server_version` |

#### Caches

| Probe | Extra | Key constructor args | Details fields |
|-------|-------|---------------------|----------------|
| `RedisProbe` | `redis` | `url`, `name` | `version`, `uptime_seconds`, `used_memory_human`, `connected_clients`, `role`, `total_keys`, `clusters` |
| `MemcachedProbe` | `memcached` | `host`, `port`, `name`, `pool_size` | — |

#### Queues / messaging

| Probe | Extra | Key constructor args | Details fields |
|-------|-------|---------------------|----------------|
| `RabbitMQProbe` | `rabbitmq` | `url`, `name`, `management_url` | `connected`; + `server`, `totals`, `queues` when `management_url` is set |
| `KafkaProbe` | `kafka` | `bootstrap_servers`, `name`, `request_timeout_ms` | `broker_count`, `controller_id`, `topics`, `internal_topics` |

#### Document stores

| Probe | Extra | Key constructor args | Details fields |
|-------|-------|---------------------|----------------|
| `MongoProbe` | `mongo` | `url`, `name`, `server_selection_timeout_ms` | `version`, `uptime_seconds`, `connections`, `memory_mb`, `storage_engine` |

#### HTTP

| Probe | Extra | Key constructor args | Details fields |
|-------|-------|---------------------|----------------|
| `HttpProbe` | `http` | `url`, `timeout`, `name`, `expected_status` | `status_code`, `content_type`, `response_bytes` |

#### Testing / placeholder

| Probe | Extra | Key constructor args | Details fields |
|-------|-------|---------------------|----------------|
| `MemoryProbe` | built-in | `name` | — |

---

## Configuration reference

### `HealthRegistry`

| Argument | Type | Default | Description |
|----------|------|---------|-------------|
| `app` | `FastAPI` | required | The FastAPI application instance |
| `prefix` | `str` | `"/health"` | URL prefix for all health endpoints |
| `tags` | `list[str]` | `["health"]` | OpenAPI tags applied to all health routes |
| `poll_interval_ms` | `int \| None` | `60000` | How often (ms) to re-run probes while an SSE client is connected. `0` or `None` disables polling — each request or stream event runs probes on demand. Values below `1000` are clamped to `1000`. |
| `logger` | `logging.Logger \| None` | `None` | Logger for warnings (e.g. clamped interval) and probe exception messages. Pass `None` to emit no logs. |
| `grace_period_ms` | `int` | `0` | How long (ms) after startup to return `503 {"status": "starting"}` from `/ready`. `0` disables the grace period. |
| `history_size` | `int` | `10` | Number of past probe results to retain per probe. Retrieved via `GET /health/history`. Minimum `1`. |

### `HealthRegistry.add(probe, critical=True)`

Adds a single probe. Returns `self` for chaining. Adding the same instance more than once is a no-op.

`critical=True` (default) — a failing probe causes the overall status to be `"unhealthy"` and `/ready` to return `503`. Set `critical=False` to include the probe in reports without affecting readiness.

### `HealthRegistry.add_probes(probes, critical=True)`

Adds a list of probes. The `critical` flag applies to every probe in the list. Returns `self` for chaining. Duplicate instances are silently skipped.

### `HealthRegistry.on_state_change(callback)`

Registers a callback invoked whenever a probe's status changes between runs. The callback receives `(probe_name: str, old_status: ProbeStatus, new_status: ProbeStatus)` and may be a plain function or an `async` coroutine. Returns `self` for chaining.

### `HealthRegistry.set_poll_interval(ms)`

Updates the poll interval at runtime. Pass `0` or `None` to switch to single-fetch mode. If SSE clients are currently connected the poll task is restarted immediately with the new interval.

```python
registry.set_poll_interval(30_000)   # every 30 s
registry.set_poll_interval(0)        # disable — each event runs probes on demand
```

### `HealthRegistry.run_all()`

Async method — runs every registered probe concurrently and returns `list[ProbeResult]`. Probe exceptions are caught and converted to unhealthy results. Useful for testing or building custom aggregation outside of the mounted routes.

```python
results = await registry.run_all()
for r in results:
    print(r.name, r.status, r.latency_ms, r.details)
```

### `BaseProbe`

| Attribute | Type | Default | Description |
|-----------|------|---------|-------------|
| `name` | `str` | `"unnamed"` | Label used in health reports. Override as a class attribute or set in `__init__`. |
| `timeout` | `float \| None` | `None` | Per-probe timeout in seconds. The check is cancelled and recorded as unhealthy if it exceeds this value. `None` means no limit. |

### `ProbeResult`

| Field | Type | Description |
|-------|------|-------------|
| `name` | `str` | Probe identifier |
| `status` | `ProbeStatus` | `"healthy"` or `"unhealthy"` |
| `critical` | `bool` | `True` if the probe was registered as critical; affects overall status and readiness |
| `latency_ms` | `float` | Duration of the check in milliseconds |
| `error` | `str \| None` | Error message; only present on failure |
| `details` | `dict \| None` | Service-specific metadata |
| `is_healthy` | `bool` (property) | `True` when `status == "healthy"` |

---

## Kubernetes integration

```yaml
livenessProbe:
  httpGet:
    path: /health/live
    port: 8000
  initialDelaySeconds: 5
  periodSeconds: 10

readinessProbe:
  httpGet:
    path: /health/ready
    port: 8000
  initialDelaySeconds: 10
  periodSeconds: 15
  failureThreshold: 3
```

Use `/health/ready` for the readiness probe — Kubernetes stops routing traffic to a pod the moment any critical dependency becomes unreachable. Use `/health/live` for liveness so the process is only restarted when it is genuinely stuck, not because an external service is temporarily down.

For applications that need time to warm up (loading models, seeding caches, running migrations), combine `grace_period_ms` with a short `initialDelaySeconds`:

```yaml
readinessProbe:
  httpGet:
    path: /health/ready
    port: 8000
  initialDelaySeconds: 5
  periodSeconds: 10
  failureThreshold: 6   # allow up to 60 s of failures before marking unready
```

```python
# App holds /ready as "starting" for 30 s regardless of probe results
registry = HealthRegistry(app, grace_period_ms=30_000)
```

---

## License

MIT
