Metadata-Version: 2.4
Name: service-bridge
Version: 1.1.0.dev28
Summary: ServiceBridge SDK for Python — RPC, events, HTTP middleware
Author-email: Eugene Surkov <esurkovv@yandex.ru>
License: Free for non-commercial use. Commercial use requires a separate paid license. Contact esurkovv@yandex.ru or @esurkov1 on Telegram.
Project-URL: Source, https://github.com/service-bridge/sdk
Keywords: servicebridge,service-bridge,microservices,rpc,grpc,event-bus,event-driven,distributed-tracing,workflow,orchestration,background-jobs,cron,mtls,service-mesh,service-discovery,distributed-systems,zero-sidecar,proxyless,istio-alternative,rabbitmq-alternative,temporal-alternative,jaeger-alternative,postgresql,docker,kubernetes,dead-letter-queue,dlq,saga,distributed-transactions,ai-agent-orchestration,fastapi,flask,http-middleware,observability,prometheus,tracing,async,asyncio,production-ready,self-hosted
Classifier: Development Status :: 4 - Beta
Classifier: Intended Audience :: Developers
Classifier: License :: Free To Use But Restricted
Classifier: Programming Language :: Python :: 3
Classifier: Programming Language :: Python :: 3.10
Classifier: Programming Language :: Python :: 3.11
Classifier: Programming Language :: Python :: 3.12
Classifier: Topic :: Software Development :: Libraries
Requires-Python: >=3.10
Description-Content-Type: text/markdown
Requires-Dist: grpcio>=1.60.0
Requires-Dist: grpcio-tools>=1.60.0
Requires-Dist: protobuf>=4.25.0
Provides-Extra: mtls
Requires-Dist: cryptography>=40.0; extra == "mtls"
Provides-Extra: fastapi
Requires-Dist: fastapi>=0.110.0; extra == "fastapi"
Requires-Dist: starlette>=0.36.0; extra == "fastapi"
Provides-Extra: flask
Requires-Dist: flask>=3.0.0; extra == "flask"
Provides-Extra: dev
Requires-Dist: pytest>=8.0; extra == "dev"
Requires-Dist: pytest-asyncio>=0.23; extra == "dev"
Requires-Dist: respx>=0.20; extra == "dev"
Requires-Dist: httpx>=0.27; extra == "dev"
Requires-Dist: cryptography>=40.0; extra == "dev"
Requires-Dist: fastapi>=0.110.0; extra == "dev"
Requires-Dist: starlette>=0.36.0; extra == "dev"
Requires-Dist: flask>=3.0.0; extra == "dev"

<!-- keywords: service-bridge servicebridge pip install service-bridge Python microservices RPC gRPC event-bus event-driven distributed-tracing workflow orchestration background-jobs cron mTLS service-mesh service-discovery distributed-systems zero-sidecar Istio-alternative RabbitMQ-alternative Temporal-alternative Jaeger-alternative PostgreSQL Docker Kubernetes DLQ dead-letter-queue saga distributed-transactions AI-agent-orchestration FastAPI Flask HTTP-middleware observability Prometheus tracing service-catalog async-messaging durable-events retries idempotency auto-mTLS runtime-dashboard production-ready asyncio aiohttp -->

# service-bridge

[![PyPI version](https://img.shields.io/pypi/v/service-bridge?color=3775A9&logo=pypi&logoColor=white)](https://pypi.org/project/service-bridge/)
[![License](https://img.shields.io/badge/License-Free%20%2F%20Commercial-blue)](../LICENSE)
[![Python](https://img.shields.io/badge/Python-3.10%2B-3776AB?logo=python&logoColor=white)](https://www.python.org/)

**The Unified Bridge for Microservices Interaction**

Python SDK for [ServiceBridge](https://servicebridge.dev): production-ready RPC, durable events, workflows, jobs, and distributed tracing in a single SDK, backed by one Go runtime and PostgreSQL.

```
┌─────────────────────────────────────────────────────────────────┐
│                    BEFORE: 10 moving parts                      │
│  Istio · Envoy · RabbitMQ · Temporal · Jaeger · Consul ·       │
│  cert-manager · Alertmanager · cron · custom glue              │
└─────────────────────────────────────────────────────────────────┘
                              ↓
┌─────────────────────────────────────────────────────────────────┐
│               AFTER: ServiceBridge + PostgreSQL                 │
│  RPC · Events · Workflows · Jobs · Tracing · mTLS · Dashboard  │
│            One SDK  ·  One runtime  ·  Zero sidecars            │
└─────────────────────────────────────────────────────────────────┘
```

## Table of Contents

- [Why ServiceBridge](#why-servicebridge)
- [Use Cases](#use-cases)
- [Quick Start](#quick-start)
- [Runtime Setup](#runtime-setup)
- [End-to-End Example](#end-to-end-example)
- [Platform Features](#platform-features)
- [How It Compares](#how-it-compares)
- [API Reference](#api-reference)
- [HTTP Plugins](#http-plugins)
- [Configuration](#configuration)
- [Environment Variables](#environment-variables)
- [Error Handling](#error-handling)
- [When to Use / When Not to Use](#when-to-use--when-not-to-use)
- [FAQ](#faq)
- [Community and Support](#community-and-support)
- [License](#license)

---

## Why ServiceBridge

| Problem | Without ServiceBridge | With ServiceBridge |
|---|---|---|
| Service-to-service calls | Istio/Envoy sidecar proxy per pod | **Direct SDK-to-worker gRPC, zero proxy hops** |
| Async messaging | Kafka/RabbitMQ + retry logic + DLQ setup | **Built-in durable events with retry, DLQ, replay** |
| Background jobs | Celery + Redis + cron daemon | **Built-in cron and delayed jobs** |
| Workflow orchestration | Temporal/Conductor cluster + persistence | **Built-in DAG workflows** |
| Distributed tracing | Jaeger/Tempo + OTEL collector + dashboards | **Built-in traces + realtime UI** |
| Service discovery | Consul/etcd + DNS glue | **Built-in registry + health-aware balancing** |
| mTLS | cert-manager + Vault PKI | **Auto-provisioned certs from service key** |

**Result**: `10 tools → 1 runtime`. One Go binary + PostgreSQL replaces the entire stack.

---

## Use Cases

**Microservice communication** — Replace the sidecar mesh with direct RPC calls, and pay sub-millisecond SDK overhead instead of the latency of two proxy hops per call.

**Event-driven architecture** — Publish durable events with fan-out, retries, DLQ, idempotency, and server-side filtering. No broker infrastructure to manage.

**Background job scheduling** — Cron jobs, delayed execution, and job-triggered workflows in a single API. No Redis, no Celery, no separate queue workers.

**Saga / distributed transactions** — DAG workflows with typed steps (`rpc`, `event`, `event_wait`, `sleep`, child workflow). Compensations and rollbacks are expressed through workflow step dependencies.

**AI agent orchestration** — Stream LLM tokens via realtime run streams with replay. Orchestrate multi-step AI pipelines as workflows.

**Full-stack observability** — Every RPC call, event delivery, workflow step, and HTTP request traced automatically. One timeline, one dashboard. Prometheus metrics and Loki-compatible log API included.

---

## Quick Start

### 1. Install

```bash
pip install service-bridge
```

For HTTP middleware:

```bash
pip install "service-bridge[fastapi]"
# or
pip install "service-bridge[flask]"
```

### 2. Create a worker (service that handles calls)

```python
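import asyncio
import uuid
from service_bridge import ServiceBridge

# Service identity comes from the key. This example assumes it is the
# "payments" service, so callers address the handler as "payments/charge".
sb = ServiceBridge("localhost:14445", "payments-service-key")

@sb.handle_rpc("charge")
async def charge(payload: dict) -> dict:
    return {"ok": True, "tx_id": f"tx_{uuid.uuid4().hex[:12]}"}

asyncio.run(sb.serve())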
```

### 3. Call it from another service

```python
import asyncio
from service_bridge import ServiceBridge

# Each service connects with its own service key.
sb = ServiceBridge("localhost:14445", "orders-service-key")

async def main():
    try:
        result = await sb.rpc("payments/charge", {
            "order_id": "ord_42",
            "amount": 4990,
        })
        print(result["tx_id"])
    finally:
        await sb.stop()  # flush telemetry and close gRPC channels

asyncio.run(main())
```

That's it. No broker, no sidecar, no proxy — direct gRPC call between services.

---

## Runtime Setup

The SDK connects to a ServiceBridge runtime. The fastest way to start:

```bash
bash <(curl -fsSL https://servicebridge.dev/install.sh)
```

This installs ServiceBridge + PostgreSQL via Docker Compose and generates an admin password automatically. After install, the dashboard is at `http://localhost:14444` and the gRPC control plane at `localhost:14445`.

For manual Docker Compose setup, configuration reference, and all runtime environment variables, see the **[Runtime Setup](../README.md#runtime-setup)** section in the main SDK README.

---

## End-to-End Example

A complete order flow: RPC → event → event handler with streaming, followed by the same steps orchestrated as a workflow.

```python
# --- Payments service (worker) ---
import asyncio
import uuid
from service_bridge import ServiceBridge

payments = ServiceBridge("localhost:14445", "payments-key")

@payments.handle_rpc("charge")
async def charge(payload: dict, ctx) -> dict:
    # Progress chunks go to the run stream; observers can follow them with watch_run().
    await ctx.stream.write({"status": "charging", "order_id": payload["order_id"]}, "progress")
    # ... charge logic ...
    await ctx.stream.write({"status": "charged"}, "progress")
    return {"ok": True, "tx_id": f"tx_{uuid.uuid4().hex[:12]}"}

asyncio.run(payments.serve())
```

```python
# --- Orders service (caller + event publisher) ---
import asyncio
from service_bridge import ServiceBridge

orders = ServiceBridge("localhost:14445", "orders-key")

async def process_order():
    try:
        charge = await orders.rpc("payments/charge", {
            "order_id": "ord_42",
            "amount": 4990,
        })

        await orders.event("orders.completed", {
            "order_id": "ord_42",
            "tx_id": charge["tx_id"],
        }, idempotency_key="order:ord_42:completed", headers={"source": "checkout"})
    finally:
        await orders.stop()  # drain the offline queue before the process exits

asyncio.run(process_order())
```

```python
# --- Notifications service (event consumer) ---
import asyncio
from service_bridge import ServiceBridge, EventContext

notifications = ServiceBridge("localhost:14445", "notifications-key")

@notifications.handle_event("orders.*", group_name="notifications.orders")
async def on_order(payload: dict, ctx: EventContext) -> None:
    if not payload.get("order_id"):
        ctx.reject("missing_order_id")  # permanent failure: the message moves to the DLQ
        return
    await ctx.stream.write({"status": "sending_email"}, "progress")
    # ... send email ...

asyncio.run(notifications.serve())
```

```python
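# --- Orchestrate as a workflow ---
import asyncio
from service_bridge import ServiceBridge, WorkflowStep

orders = ServiceBridge("localhost:14445", "orders-key")

async def register_fulfillment():
    # Registers (or updates) the workflow definition.
    await orders.workflow("order.fulfillment", [
        WorkflowStep(id="reserve",  type="rpc",        ref="inventory/reserve"),
        WorkflowStep(id="charge",   type="rpc",        ref="payments/charge",     deps=["reserve"]),
        WorkflowStep(id="wait_dlv", type="event_wait", ref="shipping.delivered",  deps=["charge"]),
        WorkflowStep(id="notify",   type="event",      ref="orders.fulfilled",    deps=["wait_dlv"]),
    ])
    await orders.stop()

asyncio.run(register_fulfillment())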
```

Every step above — RPC, event publish, event delivery, workflow execution — appears in a single trace timeline in the built-in dashboard.

---

## Platform Features

### Communication
- **Direct RPC** — zero-hop gRPC calls with retries, deadlines, and mTLS identity
- **Durable Events** — fan-out delivery, at-least-once guarantees, retries, DLQ, replay, idempotency
- **Realtime Streams** — live chunks with replay for AI/progress/log streaming
- **Service Discovery** — automatic endpoint resolution and round-robin balancing
- **HTTP Middleware** — FastAPI and Flask instrumentation with automatic trace propagation

### Orchestration
- **Workflows** — DAG steps: `rpc`, `event`, `event_wait`, `sleep`, child workflow
- **Jobs** — cron, delayed, and workflow-triggered scheduling

### Security
- **TLS by default** — control plane TLS + worker mTLS with gRPC certificate provisioning
- **Access Policy** — service-level caller/target restrictions and RBAC

### Observability
- **Unified Tracing** — single trace timeline across HTTP, RPC, events, workflows, and jobs
- **Metrics** — Prometheus-compatible `/metrics` endpoint (30+ metric families)
- **Logs** — structured log ingest with Loki-compatible query API; auto-captures Python `logging` module
- **Alerts** — runtime alerts for delivery failures, errors, and service health
- **Dashboard** — realtime web UI for runs, events, workflows, jobs, DLQ, service map, and service keys

---

## How It Compares

| Concern | Istio + Envoy | Dapr | Temporal + Kafka | ServiceBridge |
|---|---|---|---|---|
| RPC data path | Sidecar proxy hop | Sidecar/daemon hop | N/A | **Direct (proxyless)** |
| Service discovery | K8s control plane | Sidecar placement | External registry | **Built-in registry** |
| Durable events + DLQ | External broker | Pub/Sub component | Kafka + consumers | **Built-in** |
| Workflow orchestration | External engine | External engine | Built-in | **Built-in** |
| Job scheduling | External cron/queue | External scheduler | External scheduler | **Built-in** |
| Traces + UI | Jaeger/Tempo + dashboards | OTEL backend + dashboards | Temporal UI | **Built-in** |
| Logs for Grafana | Loki + Promtail pipeline | Log pipeline | Log pipeline | **Built-in Loki API** |
| Metrics | App/exporter setup | App/exporter setup | Multiple exporters | **Built-in `/metrics`** |
| Security model | Mesh PKI + policy | Deployment-dependent mTLS | Mixed | **Service keys + auto mTLS** |
| Operational footprint | Multi-component mesh | Runtime + sidecars | Workflow + broker + DB | **One binary + PostgreSQL** |

---

## API Reference

### Cross-SDK parity notes

ServiceBridge keeps the core API surface aligned across Node.js, Go, and Python:
constructor, RPC, events, jobs, workflows, `run_workflow`, streams, serve/stop, and typed errors.

Constructor-level defaults for timeout, retries, and retry delay are available
across all three SDKs. The remaining differences are mostly naming and language idioms:

- Python uses decorators for handler registration (`@handle_rpc`, `@handle_event`)
- Handler hints are advisory in all SDKs (`timeout_ms`/`retryable`/`concurrency` for RPC, `concurrency`/`prefetch` for events)
- Shared serve controls: `max_in_flight` / `MaxInFlight` / `maxInFlight`
- Shared serve identity/TLS controls: `instance_id`/`InstanceID`/`instanceId`, `weight`/`Weight`, `tls`/`TLS`
- Python has no public `get_trace_context()` helper; pass `trace_id` explicitly or use middleware state

### `ServiceBridge(grpc_url, service_key, opts?)`

```python
from service_bridge import ServiceBridge, Options

sb = ServiceBridge(
    grpc_url="localhost:14445",
    service_key="sbv2.<id>.<secret>.<ca>",
    opts=Options(
        heartbeat_interval_ms=10_000,
        capture_logs=True,
        queue_max_size=1000,
        queue_overflow="drop-oldest",
        discovery_refresh_ms=10_000,
        timeout_ms=30_000,
        retries=3,
        retry_delay_ms=300,
    ),
)
```

Service identity is resolved by the runtime from `service_key`.

`Options`:

| Option | Type | Default | Description |
|---|---|---|---|
| `heartbeat_interval_ms` | `int` | `10000` | Base heartbeat period for worker registrations. |
| `capture_logs` | `bool` | `True` | Auto-attach a logging handler that forwards logs to ServiceBridge. |
| `queue_max_size` | `int` | `1000` | Max offline queue size for control-plane writes. |
| `queue_overflow` | `str` | `"drop-oldest"` | Overflow strategy: `"drop-oldest"`, `"drop-newest"`, `"error"`. |
| `discovery_refresh_ms` | `int` | `10000` | Discovery refresh period for endpoint updates. |
| `timeout_ms` | `int` | `30000` | Global default hard timeout per RPC attempt in ms. Per-call `timeout_ms` overrides. |
| `retries` | `int` | `3` | Global default retry count. Per-call `retries` overrides. |
| `retry_delay_ms` | `int` | `300` | Base retry delay in ms. Exponential backoff: `delay * 2^(attempt-1)`. |

### Advanced TLS overrides

| Option | Type | Default | Description |
|---|---|---|---|
| `ca_cert` | `str` | from `service_key` | Optional control-plane CA override. By default SDK reads CA from sbv2 service key. |
| `worker_tls` | `WorkerTLSOpts \| None` | `None` | Global explicit worker mTLS materials (`cert` + `key` required together, `ca_cert` optional). |

`WorkerTLSOpts`:

```python
from service_bridge import WorkerTLSOpts

WorkerTLSOpts(
    ca_cert="-----BEGIN CERTIFICATE-----...",
    cert="-----BEGIN CERTIFICATE-----...",
    key="-----BEGIN PRIVATE KEY-----...",
    server_name="workers.internal",
)
```

---

### `rpc(fn, payload?, *, retries?, timeout_ms?, retry_delay_ms?, trace_id?)`

```python
result = await sb.rpc(
    "payments/charge",
    {"order_id": "ord_42", "amount": 4990},
    retries=3,
    timeout_ms=5000,
    retry_delay_ms=500,
)
```

| Parameter | Type | Default | Description |
|---|---|---|---|
| `fn` | `str` | required | Function name (e.g. `"payments/charge"`). |
| `payload` | `Any` | `None` | JSON-serialisable payload. |
| `retries` | `int` | from `Options` (`3`) | Retry count (0 = no retry). |
| `timeout_ms` | `int` | from `Options` (`30000`) | Hard timeout per attempt in ms. |
| `retry_delay_ms` | `int` | from `Options` (`300`) | Base retry delay in ms. Exponential backoff: `delay * 2^(attempt-1)`. |
| `trace_id` | `str` | auto | Override trace ID. |

`rpc()` is bounded even when a downstream worker is silent:
each attempt has a hard timeout, retries are finite (`retries + 1` total attempts),
and after the last failed attempt the root RPC span is closed with `error`.

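To make that bound concrete, the sketch below computes the per-retry delays and an upper bound on the total wall-clock time of one `rpc()` call. It assumes the backoff formula from the parameter table with `attempt` counting retries from 1, no jitter, and every attempt running to its full `timeout_ms`; the helper names are hypothetical, not SDK functions.

```python
def retry_delays_ms(retries: int, retry_delay_ms: int) -> list[int]:
    """Delay before each retry: delay * 2^(attempt-1) for attempt = 1..retries (assumed)."""
    return [retry_delay_ms * 2 ** (attempt - 1) for attempt in range(1, retries + 1)]


def worst_case_rpc_ms(retries: int, timeout_ms: int, retry_delay_ms: int) -> int:
    """Upper bound: (retries + 1) attempts that all hit the hard timeout, plus every backoff delay."""
    attempts = retries + 1
    return attempts * timeout_ms + sum(retry_delays_ms(retries, retry_delay_ms))


# Per-call values from the rpc() example above.
print(retry_delays_ms(3, 500))             # [500, 1000, 2000]
print(worst_case_rpc_ms(3, 5000, 500))     # 23500
# Constructor defaults (retries=3, timeout_ms=30000, retry_delay_ms=300).
print(worst_case_rpc_ms(3, 30_000, 300))   # 122100
```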
---

### `event(topic, payload?, *, idempotency_key?, trace_id?, headers?)`

```python
message_id = await sb.event(
    "orders.created",
    {"order_id": "ord_42"},
    idempotency_key="order:ord_42",
    headers={"source": "checkout"},
)
```

| Parameter | Type | Default | Description |
|---|---|---|---|
| `topic` | `str` | required | Dot-separated event topic. |
| `payload` | `Any` | `None` | JSON-serialisable payload. |
| `idempotency_key` | `str` | `""` | Dedup key for safe publishing. |
| `trace_id` | `str` | auto | Override trace ID. |
| `headers` | `dict[str, str]` | `None` | Custom metadata headers. |

Returns `message_id` (empty string when buffered offline).

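Because `idempotency_key` is the deduplication key, it works best when derived deterministically from the business fact being published rather than generated per call: a retried publish with a fresh random key would not be recognised as a duplicate. A minimal sketch of such a helper, following the `order:ord_42:completed` convention from the end-to-end example; the function name and the colon-separated format are illustrative, not an SDK requirement.

```python
def idempotency_key(entity: str, entity_id: str, event_name: str) -> str:
    """Build a stable dedup key such as 'order:ord_42:completed'.

    The same inputs always give the same key, so re-publishing the same
    logical event (after a crash or a retry) reuses it.
    """
    for part in (entity, entity_id, event_name):
        # Empty parts or embedded ':' would make different events collide.
        if not part or ":" in part:
            raise ValueError(f"invalid key part: {part!r}")
    return f"{entity}:{entity_id}:{event_name}"


key = idempotency_key("order", "ord_42", "completed")
print(key)  # order:ord_42:completed
# With the SDK: await sb.event("orders.completed", payload, idempotency_key=key)
```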
---

### `job(target, opts?)`

```python
from service_bridge import ScheduleOpts

job_id = await sb.job("billing/collect", ScheduleOpts(
    cron="0 * * * *",
    timezone="UTC",
    via="rpc",
))
```

`ScheduleOpts`:

| Field | Type | Default | Description |
|---|---|---|---|
| `cron` | `str` | `""` | Cron expression. |
| `delay_ms` | `int` | `0` | One-shot delay in ms. |
| `timezone` | `str` | `"UTC"` | IANA timezone. |
| `misfire` | `str` | `"fire_now"` | `"fire_now"` or `"skip"`. |
| `via` | `str` | `"rpc"` | `"rpc"`, `"event"`, or `"workflow"`. |
| `retry_policy_json` | `str` | `""` | Retry policy JSON. |

---

### `workflow(name, steps, opts?)`

```python
async def workflow(name: str, steps: list[WorkflowStep], opts: WorkflowOpts | None = None) -> None
```

Registers (or updates) a workflow definition. If `opts` is `None`, defaults are used.

```python
from service_bridge import WorkflowStep

await sb.workflow("order.fulfillment", [
    WorkflowStep(id="reserve", type="rpc", ref="inventory/reserve"),
    WorkflowStep(id="charge", type="rpc", ref="payments/charge", deps=["reserve"]),
    WorkflowStep(id="wait_5m", type="sleep", duration_ms=300_000, deps=["charge"]),
    WorkflowStep(id="notify", type="event", ref="orders.fulfilled", deps=["wait_5m"]),
])
```

With explicit limits:

```python
from service_bridge import WorkflowOpts

# `steps` is a list[WorkflowStep], for example the one above.
await sb.workflow("checkout.flow", steps, WorkflowOpts(step_timeout_ms=60_000))
```

`WorkflowOpts`:

```python
from dataclasses import dataclass

@dataclass
class WorkflowOpts:
    state_limit_bytes: int = 0  # default 262144 (256 KB) when zero
    step_timeout_ms: int = 0    # default 30000 (30 s) when zero
```

| Field | Type | Default | Description |
|---|---|---|---|
| `state_limit_bytes` | `int` | `262144` (256 KB) | Maximum serialized state size in bytes. `0` uses the default. |
| `step_timeout_ms` | `int` | `30000` (30 s) | Default per-step timeout in milliseconds. `0` uses the default. |

`WorkflowStep`:

| Field | Type | Description |
|---|---|---|
| `id` | `str` | Unique step identifier in the DAG. |
| `type` | `str` | `"rpc"`, `"event"`, `"event_wait"`, `"sleep"`, `"workflow"`. |
| `ref` | `str` | Required for `rpc`, `event`, `event_wait`, `workflow`. |
| `deps` | `list[str]` | Dependencies. Empty/omitted means root step. |
| `if_expr` | `str` | Optional filter expression (step is skipped if false). |
| `timeout_ms` | `int` | Optional timeout for `rpc` and `event_wait` steps. |
| `duration_ms` | `int` | Required for `sleep` steps. |

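The `deps` field turns the step list into a DAG. As a local sanity check before calling `workflow()`, the sketch below validates the per-type field rules from the table and groups steps into dependency layers using the standard library's `graphlib.TopologicalSorter`. `Step` is a hypothetical stand-in with the same field names as `WorkflowStep`, and reading each layer as "could run in parallel" is an assumption about scheduling, not documented runtime behaviour.

```python
from dataclasses import dataclass, field
from graphlib import TopologicalSorter

REF_REQUIRED = {"rpc", "event", "event_wait", "workflow"}
VALID_TYPES = REF_REQUIRED | {"sleep"}


@dataclass
class Step:  # hypothetical stand-in for WorkflowStep
    id: str
    type: str
    ref: str = ""
    deps: list[str] = field(default_factory=list)
    duration_ms: int = 0


def plan_layers(steps: list[Step]) -> list[list[str]]:
    """Validate the steps and return their ids grouped into dependency layers."""
    ids = {s.id for s in steps}
    if len(ids) != len(steps):
        raise ValueError("step ids must be unique")
    for s in steps:
        if s.type not in VALID_TYPES:
            raise ValueError(f"{s.id}: unknown step type {s.type!r}")
        if s.type in REF_REQUIRED and not s.ref:
            raise ValueError(f"{s.id}: 'ref' is required for {s.type} steps")
        if s.type == "sleep" and s.duration_ms <= 0:
            raise ValueError(f"{s.id}: 'duration_ms' is required for sleep steps")
        unknown = set(s.deps) - ids
        if unknown:
            raise ValueError(f"{s.id}: unknown deps {sorted(unknown)}")

    # Map each step to its predecessors; prepare() raises CycleError on a cycle.
    sorter = TopologicalSorter({s.id: s.deps for s in steps})
    sorter.prepare()
    layers = []
    while sorter.is_active():
        ready = sorted(sorter.get_ready())  # root steps first, then their dependents
        layers.append(ready)
        sorter.done(*ready)
    return layers


steps = [
    Step(id="reserve", type="rpc", ref="inventory/reserve"),
    Step(id="charge", type="rpc", ref="payments/charge", deps=["reserve"]),
    Step(id="wait_5m", type="sleep", duration_ms=300_000, deps=["charge"]),
    Step(id="notify", type="event", ref="orders.fulfilled", deps=["wait_5m"]),
]
print(plan_layers(steps))  # [['reserve'], ['charge'], ['wait_5m'], ['notify']]
```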
---

### `run_workflow(name, input?)`

```python
async def run_workflow(name: str, input: Any = None) -> dict[str, str]
```

Starts a workflow run on demand. The workflow must be registered first via `workflow()`.
Unlike scheduling via `job(target, ScheduleOpts(via="workflow"))`, it triggers the run immediately.

| Parameter | Type | Default | Description |
|---|---|---|---|
| `name` | `str` | required | Name of a previously registered workflow. |
| `input` | `Any` | `None` | Optional JSON-serializable input payload. |

Returns a `dict` with `runId` and `traceId` keys. Use `traceId` with `watch_run()` to observe execution in real time.

```python
result = await sb.run_workflow("user.onboarding", {"userId": "u_123"})
```

---

### `cancel_workflow_run(run_id)`

Cancels an in-progress workflow run. `run_id` is the `runId` returned by `run_workflow()`.

```python
await sb.cancel_workflow_run("run_01HQ...XYZ")
```

---

### `@handle_rpc(fn, *, allowed_callers?, schema?, timeout_ms?, retryable?, concurrency?)`

Decorator that registers an RPC handler.

```python
@sb.handle_rpc("charge", allowed_callers=["orders"])
async def charge(payload: dict) -> dict:
    return {"ok": True}

# With full context (stream + trace):
@sb.handle_rpc("ai/generate")
async def generate(payload: dict, ctx) -> dict:
    await ctx.stream.write({"token": "Hello"}, "output")
    await ctx.stream.write({"token": " world"}, "output")
    return {"text": "Hello world"}
```

`timeout_ms`, `retryable`, and `concurrency` are advisory runtime hints; they are not enforced locally by the SDK.

---

### `@handle_event(topic, group_name?, retry_policy_json?, filter_expr?, concurrency?, prefetch?)`

Decorator that registers an event consumer handler.

```python
@sb.handle_event("orders.*", group_name="payments.orders")
async def on_order(payload: dict, ctx: EventContext) -> None:
    if not payload.get("order_id"):
        ctx.reject("missing_order_id")
        return
    await ctx.stream.write({"status": "processing"}, "progress")
```

`concurrency` and `prefetch` are advisory runtime hints; they are not enforced locally by the SDK.

`EventContext` helpers:

- `ctx.retry(delay_ms=1000)` — request redelivery with delay
- `ctx.reject(reason)` — reject permanently (moves to DLQ)
- `ctx.topic`, `ctx.group_name`, `ctx.message_id`, `ctx.attempt`, `ctx.headers` — delivery metadata
- `ctx.stream.write(data, key)` — append real-time chunks to run stream

Duplicate `group_name` registration raises `ValueError`.

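A common way to use these helpers is to retry transient failures with a growing delay and reject permanently once a message is clearly bad or has been retried too often. The sketch below is a hypothetical policy, not SDK behaviour: the attempt cap, the delay formula, and the assumption that `ctx.attempt` starts at 1 are illustrative. `FakeContext` only mimics the `retry`/`reject`/`attempt` members listed above so the logic runs without a runtime; in a real `@handle_event` handler you would call `await handle(payload, ctx, send_email)`.

```python
import asyncio
from dataclasses import dataclass, field
from typing import Awaitable, Callable

MAX_ATTEMPTS = 5        # illustrative cap, not an SDK default
BASE_DELAY_MS = 1_000   # matches the ctx.retry(delay_ms=1000) example above
MAX_DELAY_MS = 60_000   # illustrative ceiling


class TransientError(Exception):
    """Hypothetical error type for failures worth retrying (timeouts, 503s, ...)."""


async def handle(payload: dict, ctx, process: Callable[[dict], Awaitable[None]]) -> None:
    """Run `process` and map failures onto ctx.retry / ctx.reject."""
    if not payload.get("order_id"):
        ctx.reject("missing_order_id")  # bad input: retrying cannot help
        return
    try:
        await process(payload)
    except TransientError:
        if ctx.attempt >= MAX_ATTEMPTS:
            ctx.reject("max_attempts_exceeded")
        else:
            # Assumes ctx.attempt starts at 1: 1000, 2000, 4000, ... capped at 60 s.
            ctx.retry(delay_ms=min(BASE_DELAY_MS * 2 ** (ctx.attempt - 1), MAX_DELAY_MS))
    # Any other exception propagates to the caller unchanged.


@dataclass
class FakeContext:  # stand-in exposing only the EventContext members used above
    attempt: int = 1
    calls: list = field(default_factory=list)

    def retry(self, delay_ms: int = 1000) -> None:
        self.calls.append(("retry", delay_ms))

    def reject(self, reason: str) -> None:
        self.calls.append(("reject", reason))


async def flaky_send(payload: dict) -> None:
    raise TransientError("smtp timeout")


ctx = FakeContext(attempt=2)
asyncio.run(handle({"order_id": "ord_42"}, ctx, flaky_send))
print(ctx.calls)  # [('retry', 2000)]
```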
---

### `serve(*, host?, max_in_flight?, instance_id?, weight?, tls?)`

```python
from service_bridge import WorkerTLSOpts

await sb.serve(
    host="localhost",
    max_in_flight=256,
    instance_id="orders-a1",
    weight=10,
    tls=WorkerTLSOpts(cert=CERT_PEM, key=KEY_PEM, ca_cert=CA_PEM),
)
# or
asyncio.run(sb.serve())
```

Blocks until cancelled (for example, by process signal handling).
If the initial `OpenWorkerSession` fails, `serve()` fails immediately; once started, reconnects use exponential backoff (`1s` → `15s`).

| Parameter | Type | Default | Description |
|---|---|---|---|
| `host` | `str` | `"localhost"` | Bind host for the worker gRPC server. Use `0.0.0.0` in Docker/Kubernetes so ServiceBridge can reach the worker. |
| `max_in_flight` | `int` | `128` | Max in-flight runtime-originated commands over `OpenWorkerSession`. |
| `instance_id` | `str` | `""` | Stable worker instance identifier override. |
| `weight` | `int` | `1` | Scheduling/discovery weight hint. Values `<=0` normalize to `1`. |
| `tls` | `WorkerTLSOpts \| None` | `None` | Per-serve worker TLS override (takes precedence over `Options.worker_tls`). |

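The reconnect schedule described above (`1s` → `15s`) is a capped exponential backoff. The sketch below shows the resulting delays, assuming the delay doubles after each consecutive failure (the doubling factor is an assumption; the text only gives the bounds) and no jitter. `watch_run()` documents the same shape with `500ms` → `5000ms`. `capped_backoff` is a hypothetical helper, not an SDK function.

```python
from itertools import islice
from typing import Iterator


def capped_backoff(initial_s: float, cap_s: float, factor: float = 2.0) -> Iterator[float]:
    """Yield initial_s, initial_s * factor, ... with every value capped at cap_s."""
    delay = min(initial_s, cap_s)
    while True:
        yield delay
        delay = min(delay * factor, cap_s)


# Worker session reconnects (bounds from the text: 1 s -> 15 s).
print(list(islice(capped_backoff(1.0, 15.0), 6)))  # [1.0, 2.0, 4.0, 8.0, 15.0, 15.0]
# watch_run() stream reconnects (bounds from the text: 0.5 s -> 5 s).
print(list(islice(capped_backoff(0.5, 5.0), 6)))   # [0.5, 1.0, 2.0, 4.0, 5.0, 5.0]
```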
---

### `stop()`

```python
await sb.stop()
```

Gracefully shuts down: flushes logs, drains offline queue, closes gRPC channels.

---

### `watch_run(run_id, opts?)`

```python
from service_bridge import WatchRunOpts

async for event in sb.watch_run(run_id, WatchRunOpts(key="output", from_sequence=0)):
    print(event.data)
    if event.done:
        break
```

`run_id` is the stream identifier used by `ctx.stream.write(...)` (typically a trace ID).

Behavior:

- Auto-reconnect with exponential backoff (`500ms` → `5000ms`) on retryable stream failures.
- Deduplicates by `sequence` across reconnects.
- Enforces strict JSON for `type="chunk"` payloads (non-JSON chunk terminates stream with fatal error).
- Enforces internal queue limit `256`; overflow is fatal (consumer must drain promptly).

`WatchRunOpts`:

| Field | Type | Default | Description |
|---|---|---|---|
| `key` | `str` | `""` | Stream key filter (`""` = all keys). |
| `from_sequence` | `int` | `0` | Replay from sequence cursor. |

`RunStreamEvent`:

| Field | Type | Description |
|---|---|---|
| `sequence` | `int` | Monotonic sequence number. |
| `key` | `str` | Stream lane key. |
| `data` | `Any` | JSON-decoded chunk payload. |
| `done` | `bool` | True when terminal `run_complete` arrives. |
| `run_status` | `str` | Final status on terminal event. |

---
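Deduplication by `sequence` is what makes reconnects with replay safe: after a reconnect, the stream may resend chunks the consumer has already seen. `watch_run()` already does this internally. The sketch below only illustrates the idea for readers who persist the highest `sequence` they have processed and later resume with `from_sequence` (whether that cursor is inclusive is not specified here, which is exactly why deduplication still matters). It assumes sequences increase strictly within a run, and `Chunk` is a hypothetical stand-in for `RunStreamEvent`.

```python
from dataclasses import dataclass
from typing import Any, Iterable, Iterator


@dataclass
class Chunk:  # hypothetical stand-in for RunStreamEvent
    sequence: int
    data: Any


def dedupe(batches: Iterable[Iterable[Chunk]], last_seen: int = -1) -> Iterator[Chunk]:
    """Yield each chunk once across reconnect batches, skipping replayed sequences."""
    for batch in batches:          # one batch per (re)connection
        for chunk in batch:
            if chunk.sequence <= last_seen:
                continue           # already delivered before the reconnect
            last_seen = chunk.sequence
            yield chunk


# The second connection replays sequences 2 and 3 before new data arrives.
first = [Chunk(1, "a"), Chunk(2, "b"), Chunk(3, "c")]
second = [Chunk(2, "b"), Chunk(3, "c"), Chunk(4, "d")]
print([c.data for c in dedupe([first, second])])  # ['a', 'b', 'c', 'd']
```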

### HTTP Span Utilities

#### `start_http_span(method, path, trace_id?, parent_span_id?)`

```python
span = sb.start_http_span("GET", "/health")
try:
    # handler logic
    span.end(status_code=200)
except Exception as e:
    span.end(error=str(e))
```

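The try/except pattern above can be packaged as a context manager so that every code path ends the span exactly once. A sketch assuming only the `start_http_span(method, path)` and `span.end(status_code=..., error=...)` calls shown above; `http_span` is a hypothetical helper, the status code is fixed up front for simplicity, and `FakeSpan` stands in for the SDK's span object so the example runs on its own.

```python
from contextlib import contextmanager
from typing import Any, Callable, Iterator


@contextmanager
def http_span(start: Callable[[str, str], Any], method: str, path: str,
              status_code: int = 200) -> Iterator[Any]:
    """Start a span; end it with `status_code` on success or `error` on exception."""
    span = start(method, path)
    try:
        yield span
    except Exception as e:
        span.end(error=str(e))
        raise  # keep the original exception for the framework
    else:
        span.end(status_code=status_code)


class FakeSpan:  # stand-in for the object returned by sb.start_http_span
    def __init__(self) -> None:
        self.ended_with: dict = {}

    def end(self, **kwargs: Any) -> None:
        self.ended_with = kwargs


spans: list[FakeSpan] = []


def fake_start(method: str, path: str) -> FakeSpan:
    spans.append(FakeSpan())
    return spans[-1]


with http_span(fake_start, "GET", "/health"):
    pass  # handler logic; with the SDK, pass sb.start_http_span as `start`
print(spans[0].ended_with)  # {'status_code': 200}
```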
#### `register_http_endpoint(method, route, ...)`

```python
sb.register_http_endpoint(
    "GET", "/users/{id}",
    request_schema_json='{"type":"object"}',
    transport="http",
)
```

| Parameter | Type | Default | Description |
|---|---|---|---|
| `method` | `str` | required | HTTP method. |
| `route` | `str` | required | Route pattern, e.g. `"/users/{id}"`. |
| `instance_id` | `str` | `""` | Stable identifier for this process. |
| `endpoint` | `str` | `""` | Reachable address, e.g. `"http://10.0.0.1:8000"`. |
| `allowed_callers` | `list[str]` | `None` | Service names allowed to call (RBAC). |
| `request_schema_json` | `str` | `""` | JSON schema for request validation metadata. |
| `response_schema_json` | `str` | `""` | JSON schema for response validation metadata. |
| `transport` | `str` | `""` | Transport label (e.g. `"http"`, `"https"`). |

A periodic heartbeat is automatically sent to keep the HTTP endpoint alive in the registry.

---

## HTTP Plugins

### FastAPI (`service_bridge.http.fastapi`)

```bash
pip install "service-bridge[fastapi]"
```

```python
from fastapi import FastAPI, Request
from service_bridge import ServiceBridge
from service_bridge.http.fastapi import ServiceBridgeMiddleware, get_client

sb = ServiceBridge("localhost:14445", "key")
app = FastAPI()

app.add_middleware(ServiceBridgeMiddleware, client=sb, exclude_paths=["/health"], auto_register=True)

@app.get("/users/{user_id}")
async def get_user(user_id: str, request: Request):
    client = get_client(request)
    user = await client.rpc("users/get", {"id": user_id})
    return user
```

The middleware:
- Injects the SDK client into `request.state.servicebridge_client`
- Starts/ends HTTP span automatically
- Sets `x-trace-id` response header
- Auto-registers route patterns in the catalog

---

### Flask (`service_bridge.http.flask`)

```bash
pip install "service-bridge[flask]"
```

```python
from flask import Flask, g
from service_bridge import ServiceBridge
from service_bridge.http.flask import init_servicebridge

sb = ServiceBridge("localhost:14445", "key")
app = Flask(__name__)
init_servicebridge(app, sb, auto_register=True)

@app.get("/users/<user_id>")
def get_user(user_id):
    # g.service_bridge, g.trace_id, g.span_id available
    return {"id": user_id}
```

The middleware:
- Injects the SDK client into `g.service_bridge`
- Provides `g.trace_id` and `g.span_id` in handlers
- Starts/ends HTTP span automatically
- Sets `x-trace-id` response header

---

## Configuration

### TLS behavior

- The control plane is TLS-only. By default, the trust anchor (CA) is embedded in the sbv2 service key.
- The embedded or explicit CA PEM is validated with strict x509 parsing.
- Worker serving is always mTLS.
- Worker certificates are provisioned only over gRPC, via `ProvisionWorkerCertificate`.
- mTLS provisioning requires the `cryptography` package: `pip install "service-bridge[mtls]"`

### Offline queue behavior

When the control plane is unavailable, the SDK queues write operations (`event`, `job`, `workflow`, telemetry writes).

- Queue size: `queue_max_size` (default: 1000)
- Overflow policy: `queue_overflow` (default: `"drop-oldest"`)
- Return values for queued writes may be empty strings until flushed

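The three `queue_overflow` strategies differ in which write is lost once the queue is full. The sketch models them with a bounded `collections.deque` under the natural reading of each policy name; it is not the SDK's internal queue, and `OfflineQueue` and `QueueFullError` are hypothetical names.

```python
from collections import deque
from typing import Any


class QueueFullError(Exception):
    """Raised by the 'error' policy when the queue is full (hypothetical)."""


class OfflineQueue:
    def __init__(self, max_size: int = 1000, overflow: str = "drop-oldest") -> None:
        if overflow not in ("drop-oldest", "drop-newest", "error"):
            raise ValueError(f"unknown overflow policy: {overflow}")
        self.items: deque = deque()
        self.max_size = max_size
        self.overflow = overflow

    def put(self, item: Any) -> bool:
        """Enqueue a write; return False if this item was dropped."""
        if len(self.items) < self.max_size:
            self.items.append(item)
            return True
        if self.overflow == "drop-oldest":
            self.items.popleft()   # evict the oldest queued write
            self.items.append(item)
            return True
        if self.overflow == "drop-newest":
            return False           # keep the queue as is, discard the new write
        raise QueueFullError(f"offline queue full ({self.max_size})")

    def drain(self) -> list:
        """Return queued writes in FIFO order, e.g. once the control plane is back."""
        out = list(self.items)
        self.items.clear()
        return out


q = OfflineQueue(max_size=2, overflow="drop-oldest")
for write in ("e1", "e2", "e3"):
    q.put(write)
print(q.drain())  # ['e2', 'e3']
```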
### Log capture

By default (`capture_logs=True`), a `ServiceBridgeLogHandler` is attached to the root Python logger. All `logging.info()`, `logging.error()`, etc. calls are automatically shipped to ServiceBridge with batching (100 records or 500ms flush interval).

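The batching rule (flush at 100 records or every 500 ms) can be pictured as a `logging.Handler` that buffers formatted records and flushes when either limit is reached. This is a simplified sketch, not `ServiceBridgeLogHandler` itself: `send` is a hypothetical callback standing in for the network call, and the time limit is only checked when a new record arrives (a real handler would also need a timer so that a quiet logger still flushes).

```python
import logging
import time
from typing import Callable


class BatchingHandler(logging.Handler):
    def __init__(self, send: Callable[[list[str]], None],
                 max_batch: int = 100, max_interval_s: float = 0.5) -> None:
        super().__init__()
        self.send = send
        self.max_batch = max_batch
        self.max_interval_s = max_interval_s
        self.buffer: list[str] = []
        self.last_flush = time.monotonic()

    def emit(self, record: logging.LogRecord) -> None:
        self.buffer.append(self.format(record))
        too_many = len(self.buffer) >= self.max_batch
        too_old = time.monotonic() - self.last_flush >= self.max_interval_s
        if too_many or too_old:
            self.flush()

    def flush(self) -> None:
        if self.buffer:
            batch, self.buffer = self.buffer, []
            self.send(batch)
        self.last_flush = time.monotonic()


batches: list[list[str]] = []
logger = logging.getLogger("batching-demo")
logger.propagate = False
logger.setLevel(logging.INFO)
# Small batch size and long interval so the demo flushes on count only.
logger.addHandler(BatchingHandler(batches.append, max_batch=3, max_interval_s=60.0))
for i in range(7):
    logger.info("event %d", i)
print([len(b) for b in batches])  # [3, 3]; the 7th record waits for the next flush
```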
---

## Environment Variables

The SDK takes its connection settings from the arguments you pass to `ServiceBridge(...)`. A common convention is to read them from these environment variables:

| Variable | Required | Example | Description |
|---|---|---|---|
| `SERVICEBRIDGE_URL` | yes | `localhost:14445` | gRPC control plane URL |
| `SERVICEBRIDGE_SERVICE_KEY` | yes | `sbv2.<id>.<secret>.<ca>` | Service authentication key (sbv2 only) |

```python
import os
from service_bridge import ServiceBridge

sb = ServiceBridge(
    os.environ.get("SERVICEBRIDGE_URL", "localhost:14445"),
    os.environ["SERVICEBRIDGE_SERVICE_KEY"],
)
```

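To fail fast with a readable message instead of a bare `KeyError`, the settings can be read and checked before the client is constructed. A minimal sketch; `load_settings` is a hypothetical helper, and the only key check it makes is the `sbv2.` prefix implied by the format in the table above.

```python
import os
from collections.abc import Mapping


def load_settings(env: Mapping[str, str] = os.environ) -> tuple[str, str]:
    """Return (grpc_url, service_key) from the environment, or raise a clear error."""
    url = env.get("SERVICEBRIDGE_URL", "localhost:14445")
    key = env.get("SERVICEBRIDGE_SERVICE_KEY", "")
    if not key:
        raise RuntimeError("SERVICEBRIDGE_SERVICE_KEY is not set")
    if not key.startswith("sbv2."):
        raise RuntimeError("SERVICEBRIDGE_SERVICE_KEY must be an sbv2 key (sbv2.<id>.<secret>.<ca>)")
    return url, key


# With the SDK: sb = ServiceBridge(*load_settings())
print(load_settings({"SERVICEBRIDGE_SERVICE_KEY": "sbv2.id.secret.ca"}))
# ('localhost:14445', 'sbv2.id.secret.ca')
```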
---

## Error Handling

`ServiceBridgeError` is exported for normalized SDK and runtime errors.

```python
from service_bridge import ServiceBridge, ServiceBridgeError

try:
    await sb.rpc("payments/charge", {"order_id": "ord_1"})
except ServiceBridgeError as e:
    print(e.component, e.operation, e.severity, e.retryable, e.code)
    raise
```

| Field | Type | Description |
|---|---|---|
| `component` | `str` | SDK subsystem (for example, `"rpc"` or `"event"`). |
| `operation` | `str` | Operation that failed. |
| `severity` | `str` | `"fatal"`, `"retriable"`, or `"ignorable"`. |
| `retryable` | `bool` | Whether retry is recommended. |
| `code` | `str \| None` | gRPC status code string (when provided). |

---

## When to Use / When Not to Use

### ServiceBridge is a good fit when you:

- Have **3+ microservices** that need to communicate via RPC, events, or both
- Want **RPC + events + workflows + jobs** without managing separate infrastructure for each
- Need **end-to-end tracing** across all communication patterns in one timeline
- Want to **eliminate sidecar proxies** and reduce operational overhead
- Need **durable event delivery** with retry, DLQ, and replay without running a broker
- Are building **AI/LLM pipelines** and need realtime streaming with replay

### Consider alternatives when you:

- Run a **single monolith** with no service decomposition plans
- Need **ultra-high-throughput event streaming** (100K+ msg/s sustained) — Kafka is purpose-built for this
- Need a **full API gateway** with rate limiting, auth plugins, and request transformation — use Kong/Envoy Gateway
- Already have a **mature Istio/Linkerd mesh** and only need traffic management (no events/workflows/jobs)
- Need **multi-region event replication** — ServiceBridge currently targets single-region deployments

---

## FAQ

**How does ServiceBridge handle service failures?**
RPC calls have configurable retries with exponential backoff and hard per-attempt timeouts, so a silent downstream service cannot keep a call pending forever. Events are durable (PostgreSQL-backed) with at-least-once delivery per consumer group. Failed deliveries are retried according to policy, then moved to DLQ. Workflows track step state and can be resumed.

**Is there vendor lock-in?**
ServiceBridge is self-hosted. The runtime is a single Go binary + PostgreSQL. SDK calls map to standard patterns (RPC, pub/sub, cron) — migrating away means replacing SDK calls with equivalent library calls.

**How does tracing work without an OTEL collector?**
The SDK automatically reports trace spans for every RPC call, event publish/delivery, workflow step, and HTTP request. The runtime stores traces in PostgreSQL and shows them in the built-in dashboard; logs are exposed separately through a Loki-compatible query API for Grafana integration.

**Can I use ServiceBridge alongside existing infrastructure?**
Yes. You can adopt incrementally — start with RPC between two services, add events later, then workflows. ServiceBridge doesn't require replacing your existing broker or mesh all at once.

**What happens when the control plane is down?**
In-flight direct RPC calls continue working (they go service-to-service, not through the control plane). New discovery lookups, event publishes, and telemetry writes are queued in the SDK offline queue and flushed when the control plane recovers.

**What databases does the runtime support?**
PostgreSQL 16+. The runtime uses PostgreSQL for all persistence: traces, events, workflows, jobs, service registry, and configuration.

---

## Community and Support

- Website: [servicebridge.dev](https://servicebridge.dev)
- GitHub: [github.com/service-bridge](https://github.com/service-bridge)
- SDK monorepo: [README.md](../README.md)

---

## License

Free for non-commercial use. Commercial use requires a separate license. See [LICENSE](../LICENSE).

Copyright (c) 2026 Eugene Surkov.

---

## Keywords

service-bridge · servicebridge · pip install service-bridge · Python SDK · asyncio microservices · RPC · gRPC · event bus · event-driven · distributed tracing · workflow orchestration · background jobs · cron · mTLS · service mesh · service discovery · zero sidecar · Istio alternative · RabbitMQ alternative · Celery alternative · Temporal alternative · Jaeger alternative · PostgreSQL · Docker · Kubernetes · DLQ · dead letter queue · saga · distributed transactions · AI agent orchestration · FastAPI middleware · Flask middleware · HTTP middleware · observability · Prometheus · tracing · service catalog · durable events · retries · idempotency · auto mTLS · runtime dashboard · production ready · microservice communication · Python 3.10+
