# X-Ray SDK & API for Decision Reasoning – RPG PRD

> A general-purpose observability system to capture **why** multi-step, non-deterministic pipelines make specific decisions, using decorator- and middleware-style instrumentation and a decision-centric data model.

---

<overview>

## Problem Statement

Modern pipelines (LLM + ranking + filters + heuristics) behave like black boxes: when a final output is wrong, engineers cannot see **which decision step** went bad or **why** that choice was made.

**Core pain points:**
- Traditional tracing/logging focuses on control flow and latency ("which function called which, how long"), but does not capture **candidates, filters, scores, and reasoning** behind business-level decisions
- Debugging requires re-running pipelines with print statements or guesswork
- No visibility into systematic issues across pipelines (e.g., "filter X removes >90% of candidates in all runs")
- When output is wrong, there's no audit trail of what options existed and why one was chosen

**Why existing solutions don't work:**
- **OpenTelemetry/Jaeger**: Great for distributed tracing (spans, latency), but not designed for decision reasoning (candidates, scores, selection logic)
- **Custom logging**: Inconsistent, hard to query across pipelines, no standard schema for decision data
- **LLM observability tools** (LangSmith, etc.): Focused on LLM calls specifically, not general decision pipelines

## Target Users

**Primary: ML/LLM Pipeline Engineers**
- Own complex multi-step flows: retrieval → filtering → ranking → LLM calls → post-processing
- Workflow: Pipeline produces bad output → need to identify which step failed and why
- Goal: Debug bad recommendations, wrong classifications, or nonsensical generations without re-running

**Secondary: Backend Engineers Building Decision-Heavy Services**
- Own: competitor selection, listing optimization, product categorization, routing bots, fraud rules
- Workflow: Need to understand patterns across many runs (e.g., "why does filter X always remove too much?")
- Goal: Cross-run analysis and systematic issue detection

**Tertiary: Debuggers/Oncall/SREs**
- Investigate incidents where "output looks wrong" without clear stack traces
- Workflow: Get alert → inspect single run → walk through each step's inputs/outputs/reasoning
- Goal: Quick root cause identification without deep pipeline knowledge

## Success Metrics

| Metric | Target | How Measured |
|--------|--------|--------------|
| Debugging efficiency | 80% of sessions identify problematic step from single X-Ray run | User surveys, time-to-resolution |
| Performance overhead | < 5% latency increase; no pipeline errors caused by the SDK | A/B testing instrumented vs non-instrumented |
| Instrumentation effort | Basic visibility in < 10 LOC; full visibility in < 30 LOC | Code samples, user feedback |
| Query capability | Single query answers "filter steps where removed_ratio > 0.9" | API functionality test |

</overview>

---

<functional-decomposition>

## Capability Tree

### Capability: Decision Run Capture

Capture one full execution of a multi-step pipeline as a **Run** with structured metadata, lifecycle management, and context propagation.

#### Feature: Run lifecycle management
- **Description**: Create, track, and finalize a complete pipeline execution from initial input to final output
- **Inputs**: `pipeline_name` (string), `input_summary` (JSON), optional metadata (`request_id`, `user_id`, `environment`, arbitrary key-values)
- **Outputs**: Persisted `Run` record with `id`, timestamps, status, input/output summaries
- **Behavior**:
  - `start_run(...)` creates Run, assigns UUID, sets `started_at`, status="running"
  - `run.end(status, output_summary)` sets `ended_at`, final status, output summary
  - Supports both explicit API (`run = xray.start_run(...); run.end(...)`) and context manager (`with xray.start_run(...) as run:`)
  - Run object provides methods to create Steps attached to it
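
The lifecycle described above can be sketched as follows. This is a minimal illustration under stated assumptions, not the SDK implementation: the `Run` class keeps everything in memory, omits the transport, and treats a second `end()` call as a no-op (an assumption the spec does not state).

```python
import uuid
from datetime import datetime, timezone


class Run:
    """Illustrative Run: records lifecycle fields in memory (no transport)."""

    def __init__(self, pipeline_name, input_summary=None, **metadata):
        self.id = str(uuid.uuid4())
        self.pipeline_name = pipeline_name
        self.input_summary = input_summary
        self.metadata = metadata
        self.status = "running"
        self.started_at = datetime.now(timezone.utc)
        self.ended_at = None
        self.output_summary = None

    def end(self, status="success", output_summary=None):
        # Assumption: ending twice keeps the first result.
        if self.ended_at is None:
            self.status = status
            self.output_summary = output_summary
            self.ended_at = datetime.now(timezone.utc)

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc, tb):
        # Mark the Run as failed if the block raised; never swallow the error.
        self.end(status="error" if exc_type else "success")
        return False


# Explicit API
run = Run("competitor_selection", {"query": "usb-c cable"}, environment="dev")
run.end("success", {"selected": "B0123"})

# Context manager: status is derived from whether the block raised
with Run("competitor_selection") as managed:
    pass
```

Returning `False` from `__exit__` keeps the pipeline's own exception visible to the caller, which matches the fail-open intent: X-Ray observes failures but does not change them.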

#### Feature: Run context management
- **Description**: Make the current Run available to nested code without explicit parameter passing
- **Inputs**: Run object, execution context (thread/async task)
- **Outputs**: Ability to retrieve current Run from anywhere in the call stack
- **Behavior**:
  - Uses `contextvars` to store current Run per async context/thread
  - `current_run()` returns active Run or None if no Run is active
  - Decorators and helpers use this to auto-attach Steps to the correct Run
  - Context properly propagates across async boundaries
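
A small sketch of the context mechanism, assuming a module-level `ContextVar` named `_current_run` (a hypothetical name). It stores a plain string where the SDK would store a Run object, and shows that two concurrent asyncio tasks each see only their own value.

```python
import asyncio
import contextvars

# Holds the active Run for the current thread / asyncio task.
_current_run = contextvars.ContextVar("xray_current_run", default=None)


def current_run():
    """Return the active Run, or None when no Run is active."""
    return _current_run.get()


async def handle(run_name):
    token = _current_run.set(run_name)  # a real SDK would store a Run object
    try:
        await asyncio.sleep(0)          # yield so the two tasks interleave
        return current_run()
    finally:
        _current_run.reset(token)


async def main():
    # gather() wraps each coroutine in a task; each task runs in its own
    # copy of the context, so values set in one task do not leak to another.
    return await asyncio.gather(handle("run-a"), handle("run-b"))


results = asyncio.run(main())  # results == ["run-a", "run-b"]
```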

#### Feature: Request-scoped run via middleware
- **Description**: Automatically start/end a Run for each HTTP request using web framework middleware
- **Inputs**: HTTP request (method, path, headers), response (status code, duration)
- **Outputs**: One Run per request with request/response metadata attached
- **Behavior**:
  - On request start: create Run with `pipeline_name = "http:{method}:{path_pattern}"`, set into context
  - On response: attach status code, duration, end Run with appropriate status
  - Expose current Run to all downstream code (route handlers, services, etc.)
  - Handle exceptions: mark Run as error with error info

---

### Capability: Step-Level Decision Tracing

Capture each **decision step** within a Run, including inputs, outputs, counts, and reasoning.

#### Feature: Manual step creation
- **Description**: Explicitly mark the start and end of a decision step with full control over captured data
- **Inputs**: `step_name` (string), `step_type` (enum), `input` (any), optional `metadata`, `reasoning`
- **Outputs**: Persisted Step record with timing, counts, summaries, reasoning
- **Behavior**:
  - `run.start_step(name, type, input)` returns Step handle, records `started_at`
  - `step.end(output, status)` records `ended_at`, computes duration
  - Auto-infers `input_count`/`output_count` if input/output are list-like
  - Captures exceptions: marks Step as error, records error message, re-raises
  - Step index auto-increments within Run
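
A minimal sketch of manual step creation, assuming that "list-like" means `list`, `tuple`, `set`, or `frozenset` (the spec does not define it). The `Run` here is a stripped-down stand-in that only hands out step indexes.

```python
import time


def infer_count(obj):
    """Return the item count for list-like values, else None."""
    if isinstance(obj, (list, tuple, set, frozenset)):
        return len(obj)
    return None


class Step:
    def __init__(self, name, type, input, index):
        self.name, self.type, self.index = name, type, index
        self.input_count = infer_count(input)
        self.output_count = None
        self.status = "running"
        self.error_message = None
        self.duration_ms = None
        self._t0 = time.perf_counter()

    def end(self, output=None, status="success", error=None):
        self.output_count = infer_count(output)
        self.status = status
        self.error_message = error
        self.duration_ms = (time.perf_counter() - self._t0) * 1000


class Run:
    """Stand-in Run: only tracks its Steps and assigns their index."""

    def __init__(self):
        self.steps = []

    def start_step(self, name, type, input):
        step = Step(name, type, input, index=len(self.steps))
        self.steps.append(step)
        return step


run = Run()
step = run.start_step("price_filter", "filter", input=[10, 25, 40, 99])
step.end(output=[10, 25])

failing = run.start_step("rank", "rank", input={"not": "a list"})
try:
    raise ValueError("scoring model unavailable")
except ValueError as exc:
    # The SDK would re-raise after recording; this demo stops here so the
    # script keeps running.
    failing.end(status="error", error=str(exc))
```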

#### Feature: Decorator-based function instrumentation
- **Description**: Automatically turn a function/method call into a Step without changing call sites
- **Inputs**: Decorated function, decorator params (`name`, `type`, `detail`), function args/kwargs
- **Outputs**: Step record per call with input/output summaries and timing
- **Behavior**:
  - Decorator wrapper calls `current_run()` to get active Run
  - If Run exists: `start_step` before function, `end` after (with output or error)
  - If no Run: function executes normally; the only added cost is the `current_run()` lookup
  - Default `step_name` = function name; can override via decorator param
  - Captures summarized view of args/kwargs as input, return value as output
  - Supports both sync and async functions
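
The decorator behavior can be sketched as below. `RecordingRun` and the dict-shaped step records are hypothetical stand-ins for the real Run and Step, and the `detail` parameter is left out for brevity; the point is the sync/async branching and the pass-through when no Run is active.

```python
import asyncio
import contextvars
import functools
import inspect

_current_run = contextvars.ContextVar("xray_current_run", default=None)


class RecordingRun:
    """Stand-in for the SDK Run: appends one dict per finished Step."""

    def __init__(self):
        self.steps = []


def step(name=None, type="other"):
    def decorator(fn):
        step_name = name or fn.__name__

        def record(run, status, args, kwargs, output=None, error=None):
            run.steps.append({
                "name": step_name, "type": type, "status": status,
                "input": {"args": len(args), "kwargs": sorted(kwargs)},
                "output": output, "error": error,
            })

        if inspect.iscoroutinefunction(fn):
            @functools.wraps(fn)
            async def async_wrapper(*args, **kwargs):
                run = _current_run.get()
                if run is None:  # no active Run: plain call
                    return await fn(*args, **kwargs)
                try:
                    result = await fn(*args, **kwargs)
                except Exception as exc:
                    record(run, "error", args, kwargs, error=str(exc))
                    raise
                record(run, "success", args, kwargs, output=result)
                return result
            return async_wrapper

        @functools.wraps(fn)
        def wrapper(*args, **kwargs):
            run = _current_run.get()
            if run is None:  # no active Run: plain call
                return fn(*args, **kwargs)
            try:
                result = fn(*args, **kwargs)
            except Exception as exc:
                record(run, "error", args, kwargs, error=str(exc))
                raise
            record(run, "success", args, kwargs, output=result)
            return result
        return wrapper
    return decorator


@step(type="filter")
def drop_expensive(items, max_price):
    return [i for i in items if i <= max_price]


@step(name="rerank", type="rank")
async def rerank(items):
    return sorted(items, reverse=True)


no_run_result = drop_expensive([5, 50, 500], max_price=100)

run = RecordingRun()
token = _current_run.set(run)
try:
    kept = drop_expensive([5, 50, 500], max_price=100)
    ranked = asyncio.run(rerank(kept))
finally:
    _current_run.reset(token)
```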

#### Feature: Class-level auto-instrumentation
- **Description**: Instrument all public methods of a class as Steps with a single decorator
- **Inputs**: Class definition, decorator params (`type`, `name_prefix`)
- **Outputs**: All public methods wrapped with `@step` decorator
- **Behavior**:
  - Class decorator iterates over class `__dict__`
  - Wraps each callable public method (not starting with `_`) with `@step`
  - Step name = `{ClassName}.{method_name}` by default
  - Respects inheritance: only wraps methods defined on the class itself
  - No-op if no active Run (methods work normally)
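
A sketch of the class decorator, assuming a simplified `step(name)` stand-in that only records the step name in a list. The part that matters is the iteration over `vars(cls)`, which is what keeps inherited and private methods untouched.

```python
import functools
import inspect

calls = []  # stand-in for Step records


def step(name):
    """Tiny stand-in for @step: records the step name on each call."""
    def decorator(fn):
        @functools.wraps(fn)
        def wrapper(*args, **kwargs):
            calls.append(name)
            return fn(*args, **kwargs)
        return wrapper
    return decorator


def instrument_class(name_prefix=None):
    def decorate(cls):
        prefix = name_prefix or cls.__name__
        # vars(cls) only holds attributes defined on this class, so
        # inherited methods are left as they are.
        for attr, value in list(vars(cls).items()):
            if attr.startswith("_") or not inspect.isfunction(value):
                continue  # skip private names and non-function attributes
            setattr(cls, attr, step(f"{prefix}.{attr}")(value))
        return cls
    return decorate


class BasePipeline:
    def load(self):
        return [3, 1, 2]


@instrument_class()
class CompetitorPipeline(BasePipeline):
    def rank(self, items):
        return sorted(items)

    def _helper(self):
        return "private"


pipeline = CompetitorPipeline()
ranked = pipeline.rank(pipeline.load())
pipeline._helper()
```

In this sketch `staticmethod` and `classmethod` attributes are skipped because they are not plain functions in the class `__dict__`; whether the real decorator should wrap them is an open design choice.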

#### Feature: Payload summarization
- **Description**: Convert arbitrary inputs/outputs into bounded, safe summaries for storage
- **Inputs**: Raw input/output objects, `detail` level ("summary" or "full"), size thresholds
- **Outputs**: JSON-serializable summary with counts, samples, key fields
- **Behavior**:
  - For lists: extract `count`, `sample_ids` (first N items' IDs), type info
  - For dicts: extract key names, selected scalar values, nested counts
  - For objects: extract key attributes, type name
  - `detail="summary"` (default): counts + small samples only
  - `detail="full"`: full payload up to size threshold, then truncate
  - Never fails: gracefully handles non-serializable objects
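
One possible shape for the summarizer, shown as a sketch. The output keys (`sample_ids`, `keys`, `scalars`, `counts`, `truncated`), the `max_chars` threshold, and the helper names `_item_id` and `_summarize` are assumptions chosen for illustration. The final JSON round-trip is what keeps the result serializable, and the outer `try` is what keeps the function from raising.

```python
import json

_SCALARS = (str, int, float, bool, type(None))


def _item_id(item):
    """Best-effort ID lookup for dicts and objects."""
    if isinstance(item, dict):
        return item.get("id")
    return getattr(item, "id", None)


def _summarize(obj, detail, max_items, max_chars):
    kind = type(obj).__name__
    if detail == "full":
        text = json.dumps(obj, default=repr)
        if len(text) <= max_chars:
            return {"type": kind, "value": json.loads(text)}
        return {"type": kind, "truncated": True, "value": text[:max_chars]}
    if isinstance(obj, (list, tuple)):
        ids = [_item_id(item) for item in obj[:max_items]]
        return {"type": kind, "count": len(obj), "sample_ids": ids}
    if isinstance(obj, dict):
        return {
            "type": kind,
            "keys": sorted(str(k) for k in obj),
            "scalars": {str(k): v for k, v in obj.items()
                        if isinstance(v, _SCALARS)},
            "counts": {str(k): len(v) for k, v in obj.items()
                       if isinstance(v, (list, tuple, dict))},
        }
    return {"type": kind, "repr": repr(obj)[:200]}


def summarize_payload(obj, detail="summary", max_items=3, max_chars=2000):
    """Return a bounded, JSON-serializable summary; never raises."""
    try:
        summary = _summarize(obj, detail, max_items, max_chars)
        # Round-trip through JSON so the result is guaranteed serializable.
        return json.loads(json.dumps(summary, default=repr))
    except Exception as exc:
        return {"type": type(obj).__name__, "error": repr(exc)}


candidates = [{"id": "a", "score": 0.9}, {"id": "b"}, {"id": "c"}, {"id": "d"}]
list_summary = summarize_payload(candidates)
dict_summary = summarize_payload({"q": "cable", "items": [1, 2]})
full_summary = summarize_payload(list(range(1000)), detail="full", max_chars=20)
```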

#### Feature: Attach helpers
- **Description**: Enrich the current Step with reasoning and candidate information mid-execution
- **Inputs**: Reasoning (dict or string), candidates (list with scores), selected_id
- **Outputs**: Updated Step metadata/reasoning fields
- **Behavior**:
  - `xray.attach_reasoning(reasoning)`: merges into current Step's `reasoning` field
  - `xray.attach_candidates(candidates, selected_id)`: merges into Step's `metadata.candidates`
  - Works within decorated functions (uses context to find current Step)
  - If no active Step: logs warning, no-op (doesn't crash)
  - Candidates stored as: `[{id, score, selected, ...custom fields}]`

---

### Capability: Data Transport & Reliability

Ensure X-Ray data reaches the backend without impacting pipeline performance or reliability.

#### Feature: Async buffered transport
- **Description**: Queue events in memory and send to backend asynchronously
- **Inputs**: Run/Step events, backend URL, buffer configuration
- **Outputs**: Best-effort delivery of events to backend
- **Behavior**:
  - Events enqueued into in-memory buffer (bounded size)
  - Background worker flushes buffer to backend via HTTP POST
  - Batches multiple events per request for efficiency
  - Pipeline code never blocks waiting for X-Ray sends
  - Configurable flush interval and batch size
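
A thread-based sketch of the buffered transport. The `send_batch` callable is a hypothetical placeholder for the HTTP POST, and the class name `BufferedTransport` is illustrative. Overflow handling is simplified here: a full buffer rejects the new event instead of applying the configurable drop policy.

```python
import queue
import threading


class BufferedTransport:
    def __init__(self, send_batch, buffer_size=1000, batch_size=50,
                 flush_interval=0.05):
        self._send_batch = send_batch  # hypothetical callable doing the POST
        self._queue = queue.Queue(maxsize=buffer_size)
        self._batch_size = batch_size
        self._flush_interval = flush_interval
        self._stop = threading.Event()
        self._worker = threading.Thread(target=self._loop, daemon=True)
        self._worker.start()

    def enqueue(self, event):
        """Never blocks; returns False if the buffer is full."""
        try:
            self._queue.put_nowait(event)
            return True
        except queue.Full:
            return False

    def _drain(self):
        batch = []
        while len(batch) < self._batch_size:
            try:
                batch.append(self._queue.get_nowait())
            except queue.Empty:
                break
        return batch

    def _flush_all(self):
        while True:
            batch = self._drain()
            if not batch:
                return
            try:
                self._send_batch(batch)
            except Exception:
                pass  # fail-open: a real SDK would log and back off here

    def _loop(self):
        # Event.wait returns False on timeout, True once shutdown is requested.
        while not self._stop.wait(self._flush_interval):
            self._flush_all()

    def shutdown(self):
        self._stop.set()
        self._worker.join()
        self._flush_all()  # final flush on the caller's thread


received = []
transport = BufferedTransport(received.append, batch_size=2)
for seq in range(5):
    transport.enqueue({"type": "step", "seq": seq})
transport.shutdown()
```

Pipeline code only ever touches `enqueue`, which is a non-blocking queue insert; all network work happens on the worker thread or during `shutdown`.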

#### Feature: Fail-open behavior
- **Description**: Prevent X-Ray from breaking or slowing down business pipelines under any circumstance
- **Inputs**: Backend connectivity state, error responses, buffer state
- **Outputs**: Graceful degradation, never pipeline failures
- **Behavior**:
  - On backend error: log warning, mark client as degraded, back off retries
  - On buffer full: drop oldest events (configurable: oldest vs newest)
  - On SDK exceptions: catch internally, log, continue pipeline execution
  - SDK initialization failure: SDK becomes no-op, pipeline runs normally
  - Periodic health checks to detect backend recovery
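
Two of the fail-open rules above, sketched in isolation: a bounded buffer that drops the oldest event when full, and a wrapper that swallows SDK-internal exceptions. `DropOldestBuffer` and `fail_open` are hypothetical names, and the buffer is not thread-safe as written.

```python
import collections
import functools
import logging

logger = logging.getLogger("xray")


class DropOldestBuffer:
    """Bounded buffer; when full, the oldest event is evicted."""

    def __init__(self, maxlen):
        self._events = collections.deque(maxlen=maxlen)
        self.dropped = 0

    def push(self, event):
        if len(self._events) == self._events.maxlen:
            self.dropped += 1  # the deque evicts the oldest item on append
        self._events.append(event)

    def drain(self):
        events = list(self._events)
        self._events.clear()
        return events


def fail_open(fn):
    """Swallow and log any exception raised by SDK-internal code."""
    @functools.wraps(fn)
    def wrapper(*args, **kwargs):
        try:
            return fn(*args, **kwargs)
        except Exception:
            logger.warning("xray: %s failed; continuing", fn.__name__,
                           exc_info=True)
            return None
    return wrapper


@fail_open
def send_to_backend(batch):
    raise ConnectionError("backend unreachable")  # simulated outage


buffer = DropOldestBuffer(maxlen=3)
for seq in range(5):
    buffer.push(seq)

remaining = buffer.drain()              # [2, 3, 4]
send_result = send_to_backend(remaining)  # None; the error is only logged
```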

---

### Capability: Data Persistence

Store Runs and Steps in a queryable database.

#### Feature: Run storage
- **Description**: Persist Run records with all metadata
- **Inputs**: Run data from SDK (via API)
- **Outputs**: Stored Run record, retrievable by ID
- **Behavior**:
  - Insert on `POST /xray/runs`
  - Update on `POST /xray/runs/{id}/end`
  - Index on: `pipeline_name`, `status`, `started_at`, `request_id`, `user_id`, `environment`

#### Feature: Step storage
- **Description**: Persist Step records linked to their Run
- **Inputs**: Step data from SDK (via API)
- **Outputs**: Stored Step record with computed fields
- **Behavior**:
  - Insert on `POST /xray/steps`
  - Store `input_count`, `output_count` as columns for efficient queries
  - Index on: `run_id`, `step_type`, `step_name`, `input_count`, `output_count`
  - JSON fields: `input_summary`, `output_summary`, `reasoning`, `metadata`, `labels`

---

### Capability: Query & Analysis API

Provide endpoints to retrieve and analyze Runs and Steps for debugging and pattern detection.

#### Feature: Single run retrieval
- **Description**: Fetch one Run with all its Steps for detailed debugging
- **Inputs**: `run_id`
- **Outputs**: Run object with ordered array of Steps, all fields populated
- **Behavior**:
  - `GET /xray/runs/{run_id}` returns Run + Steps
  - Steps ordered by `index` (execution order)
  - Includes computed `removed_ratio` per Step
  - Includes `duration_ms` computed from timestamps

#### Feature: Run listing
- **Description**: List Runs with filtering and pagination
- **Inputs**: Filters (`pipeline_name`, `status`, `environment`, `user_id`, date range), pagination (`limit`, `offset`)
- **Outputs**: Paginated list of Runs (without Steps for performance)
- **Behavior**:
  - `GET /xray/runs?pipeline=...&status=...&limit=20`
  - Returns summary info per Run (no nested Steps)
  - Sorted by `started_at` descending (most recent first)

#### Feature: Cross-pipeline step queries
- **Description**: Query Steps across all Runs by type, counts, and computed metrics
- **Inputs**: Filters (`step_type`, `step_name`, `removed_ratio_gt`, `removed_ratio_lt`, `pipeline`, `status`), pagination
- **Outputs**: List of matching Steps with Run references
- **Behavior**:
  - `GET /xray/steps?step_type=filter&removed_ratio_gt=0.9`
  - Compute `removed_ratio = (input_count - output_count) / input_count` at query time, guarding against `input_count = 0` and missing counts
  - Include `run_id` and `pipeline_name` for context
  - Support basic aggregations: count by pipeline, avg removed_ratio by step_name
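
The query-time `removed_ratio` filter can be sketched directly in SQL. The example uses the standard-library `sqlite3` module and a hypothetical `steps` table with only the columns the query needs; `NULLIF` turns a zero `input_count` into NULL so the row is excluded instead of causing a division error.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE steps (run_id TEXT, step_name TEXT, step_type TEXT,"
    " input_count INTEGER, output_count INTEGER)"
)
conn.executemany(
    "INSERT INTO steps VALUES (?, ?, ?, ?, ?)",
    [
        ("r1", "price_filter", "filter", 100, 5),  # removes 95%
        ("r1", "brand_filter", "filter", 5, 4),    # removes 20%
        ("r2", "price_filter", "filter", 0, 0),    # empty input: ratio is NULL
        ("r2", "rank", "rank", 4, 4),
    ],
)

QUERY = """
SELECT run_id, step_name,
       CAST(input_count - output_count AS REAL)
           / NULLIF(input_count, 0) AS removed_ratio
FROM steps
WHERE step_type = :step_type
  AND CAST(input_count - output_count AS REAL)
          / NULLIF(input_count, 0) > :removed_ratio_gt
ORDER BY run_id, step_name
"""

rows = conn.execute(
    QUERY, {"step_type": "filter", "removed_ratio_gt": 0.9}
).fetchall()
```

The `CAST(... AS REAL)` matters: without it, SQLite performs integer division and every ratio below 1 collapses to 0.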

</functional-decomposition>

---

<structural-decomposition>

## Repository Structure

```
xray/
├── xray_sdk/                     # Python SDK package
│   ├── __init__.py               # Public exports
│   ├── config.py                 # SDK configuration
│   ├── types.py                  # Enums, type definitions
│   ├── client.py                 # Global client, context management
│   ├── run.py                    # Run class
│   ├── step.py                   # Step class, payload utilities
│   ├── decorators.py             # @step, @instrument_class
│   ├── middleware.py             # FastAPI/Starlette middleware
│   └── transport.py              # Async buffer, HTTP client
│
├── xray_api/                     # Backend API package
│   ├── __init__.py
│   ├── config.py                 # API configuration
│   ├── models.py                 # SQLAlchemy ORM models
│   ├── schemas.py                # Pydantic request/response schemas
│   ├── store.py                  # Data access layer
│   ├── routes.py                 # API route handlers
│   └── main.py                   # FastAPI app factory
│
├── examples/
│   └── competitor_pipeline/      # Demo application
│       ├── pipeline.py           # Instrumented pipeline class
│       ├── app.py                # FastAPI app with middleware
│       └── README.md
│
├── tests/
│   ├── sdk/                      # SDK unit tests
│   ├── api/                      # API unit tests
│   └── integration/              # End-to-end tests
│
├── pyproject.toml                # Project config, dependencies
└── README.md
```

## Module Definitions

### Module: `xray_sdk.config`
- **Maps to capability**: Foundation (cross-cutting)
- **Responsibility**: SDK configuration loading and defaults
- **Exports**:
  - `XRayConfig` dataclass: `base_url`, `api_key`, `buffer_size`, `flush_interval`, `default_detail`
  - `load_config(overrides)` → `XRayConfig`
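
A sketch of how `load_config` could resolve values. The `XRAY_*` environment variable names, the default values, the extra `env` parameter, and the precedence order (overrides, then environment, then defaults) are assumptions for illustration.

```python
import os
from dataclasses import dataclass, fields
from typing import Optional


@dataclass(frozen=True)
class XRayConfig:
    base_url: str = "http://localhost:8000"  # assumed default
    api_key: Optional[str] = None
    buffer_size: int = 1000
    flush_interval: float = 1.0
    default_detail: str = "summary"


# Environment values arrive as strings; cast the numeric fields.
_CASTS = {"buffer_size": int, "flush_interval": float}


def load_config(overrides=None, env=None):
    """Assumed precedence: explicit overrides > XRAY_* env vars > defaults."""
    env = os.environ if env is None else env
    values = {}
    for field in fields(XRayConfig):
        raw = env.get(f"XRAY_{field.name.upper()}")
        if raw is not None:
            values[field.name] = _CASTS.get(field.name, str)(raw)
    values.update(overrides or {})
    return XRayConfig(**values)


config = load_config(
    {"buffer_size": 10},
    env={"XRAY_BUFFER_SIZE": "500", "XRAY_FLUSH_INTERVAL": "0.5"},
)
```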

### Module: `xray_sdk.types`
- **Maps to capability**: Foundation (cross-cutting)
- **Responsibility**: Shared type definitions and enums
- **Exports**:
  - `StepType` enum: `filter`, `rank`, `llm`, `retrieval`, `transform`, `other`
  - `RunStatus` enum: `running`, `success`, `error`
  - `StepStatus` enum: `running`, `success`, `error`
  - `DetailLevel` enum: `summary`, `full`
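
One way to make these enums JSON-serializable without a custom encoder is to mix in `str`. The sketch below shows two of the four enums; the uppercase member names are an assumption, while the values come from the list above.

```python
import json
from enum import Enum


class StepType(str, Enum):
    FILTER = "filter"
    RANK = "rank"
    LLM = "llm"
    RETRIEVAL = "retrieval"
    TRANSFORM = "transform"
    OTHER = "other"


class RunStatus(str, Enum):
    RUNNING = "running"
    SUCCESS = "success"
    ERROR = "error"


# Members serialize as their string values and parse back by value.
payload = json.dumps({"step_type": StepType.FILTER, "status": RunStatus.SUCCESS})
parsed = StepType(json.loads(payload)["step_type"])
```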

### Module: `xray_sdk.transport`
- **Maps to capability**: Data Transport & Reliability
- **Responsibility**: Async event buffering and HTTP sending with fail-open semantics
- **Exports**:
  - `Transport` class:
    - `__init__(config: XRayConfig)`
    - `enqueue_run(run_data: dict)`
    - `enqueue_step(step_data: dict)`
    - `enqueue_run_end(run_id: str, data: dict)`
    - `flush()` → force immediate send
    - `shutdown()` → graceful shutdown with final flush

### Module: `xray_sdk.step`
- **Maps to capability**: Step-Level Decision Tracing
- **Responsibility**: Step representation, payload summarization, attach helpers
- **Exports**:
  - `Step` class:
    - `__init__(run, name, type, input, metadata)`
    - `end(output, status)` → finalize and enqueue
    - `attach_reasoning(reasoning: dict | str)`
    - `attach_candidates(candidates: list, selected_id: str)`
  - `summarize_payload(obj, detail: DetailLevel, max_items: int)` → dict
  - `infer_count(obj)` → int | None

### Module: `xray_sdk.run`
- **Maps to capability**: Decision Run Capture
- **Responsibility**: Run representation and Step factory
- **Exports**:
  - `Run` class:
    - `__init__(transport, pipeline_name, input_summary, metadata)`
    - `start_step(name, type, input, metadata, reasoning)` → Step
    - `end(status, output_summary)`
    - Context manager support (`__enter__`, `__exit__`)

### Module: `xray_sdk.client`
- **Maps to capability**: Decision Run Capture (context management)
- **Responsibility**: Global client singleton, context variable management
- **Exports**:
  - `init_xray(config: dict)` → XRayClient
  - `XRayClient` class:
    - `start_run(pipeline_name, input, **metadata)` → Run
    - `current_run()` → Run | None
    - `current_step()` → Step | None
    - `attach_reasoning(reasoning)`
    - `attach_candidates(candidates, selected_id)`
    - `middleware()` → FastAPI middleware

### Module: `xray_sdk.decorators`
- **Maps to capability**: Step-Level Decision Tracing (decorator instrumentation)
- **Responsibility**: Function and class decorators for automatic Step creation
- **Exports**:
  - `step(name=None, type="other", detail="summary")` → decorator
  - `instrument_class(type="other", name_prefix=None)` → class decorator

### Module: `xray_sdk.middleware`
- **Maps to capability**: Decision Run Capture (request-scoped runs)
- **Responsibility**: HTTP middleware for automatic Run per request
- **Exports**:
  - `XRayMiddleware` class (Starlette/FastAPI compatible)
  - `create_middleware(client: XRayClient)` → middleware instance

### Module: `xray_api.config`
- **Maps to capability**: Foundation (API)
- **Responsibility**: API configuration (database URL, settings)
- **Exports**:
  - `APIConfig` dataclass: `database_url`, `debug`
  - `load_config()` → APIConfig

### Module: `xray_api.models`
- **Maps to capability**: Data Persistence
- **Responsibility**: SQLAlchemy ORM models for Run and Step
- **Exports**:
  - `Run` model: all columns as defined in data model
  - `Step` model: all columns as defined in data model
  - `create_tables(engine)` → create schema

### Module: `xray_api.schemas`
- **Maps to capability**: Query & Analysis API
- **Responsibility**: Pydantic models for API request/response validation
- **Exports**:
  - `RunCreate`, `RunEnd`, `RunResponse`, `RunListResponse`
  - `StepCreate`, `StepResponse`, `StepListResponse`
  - `StepQueryParams`

### Module: `xray_api.store`
- **Maps to capability**: Data Persistence, Query & Analysis API
- **Responsibility**: Database operations (CRUD, queries)
- **Exports**:
  - `create_run(db, data)` → Run
  - `end_run(db, run_id, data)` → Run
  - `get_run(db, run_id)` → Run with Steps
  - `list_runs(db, filters, pagination)` → list[Run]
  - `create_step(db, data)` → Step
  - `query_steps(db, filters, pagination)` → list[Step]

### Module: `xray_api.routes`
- **Maps to capability**: Query & Analysis API
- **Responsibility**: FastAPI route handlers
- **Exports**:
  - `router` APIRouter with all endpoints

### Module: `xray_api.main`
- **Maps to capability**: Query & Analysis API
- **Responsibility**: FastAPI application factory
- **Exports**:
  - `create_app()` → FastAPI app

</structural-decomposition>

---

<dependency-graph>

## Dependency Chain

### Foundation Layer (Phase 0)
No dependencies - these are built first.

- **xray_sdk.config**: Provides SDK configuration loading. No dependencies.
- **xray_sdk.types**: Provides enums and type definitions. No dependencies.
- **xray_api.config**: Provides API configuration loading. No dependencies.

### SDK Transport Layer (Phase 1a)
- **xray_sdk.transport**: Depends on [xray_sdk.config, xray_sdk.types]
  - Needs config for URLs, buffer settings
  - Needs types for payload structure

### SDK Core Layer (Phase 1b)
- **xray_sdk.step**: Depends on [xray_sdk.types, xray_sdk.transport]
  - Needs types for StepType, StepStatus, DetailLevel
  - Needs transport to enqueue step data

- **xray_sdk.run**: Depends on [xray_sdk.types, xray_sdk.step, xray_sdk.transport]
  - Needs types for RunStatus
  - Needs step to create Step instances
  - Needs transport to enqueue run data

- **xray_sdk.client**: Depends on [xray_sdk.config, xray_sdk.types, xray_sdk.run, xray_sdk.transport]
  - Needs config for initialization
  - Needs run to create Run instances
  - Needs transport for lifecycle management
  - Manages context variables

### SDK DX Layer (Phase 2a)
- **xray_sdk.decorators**: Depends on [xray_sdk.client, xray_sdk.types]
  - Needs client for current_run(), current_step()
  - Needs types for StepType, DetailLevel

- **xray_sdk.middleware**: Depends on [xray_sdk.client, xray_sdk.run]
  - Needs client to start runs and set context
  - Needs run for Run type

### API Data Layer (Phase 1c)
- **xray_api.models**: Depends on [xray_api.config]
  - Needs config for database connection

- **xray_api.store**: Depends on [xray_api.models]
  - Needs models for ORM operations

### API Interface Layer (Phase 2b)
- **xray_api.schemas**: Depends on [xray_sdk.types]
  - Needs types for enum values in validation

- **xray_api.routes**: Depends on [xray_api.schemas, xray_api.store]
  - Needs schemas for request/response validation
  - Needs store for database operations

- **xray_api.main**: Depends on [xray_api.routes, xray_api.config]
  - Needs routes to mount
  - Needs config for app settings

### Integration Layer (Phase 3)
- **examples/competitor_pipeline**: Depends on [xray_sdk.*, xray_api.*]
  - Needs full SDK for instrumentation
  - Needs running API for data storage
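
The module dependencies listed above can be checked mechanically for cycles and turned into a valid build order. The sketch below transcribes the graph from this section and uses the standard-library `graphlib` module (Python 3.9+); the helper name `has_cycle` is illustrative.

```python
from graphlib import CycleError, TopologicalSorter

# module -> modules it depends on, as listed in this section
DEPENDENCIES = {
    "xray_sdk.config": set(),
    "xray_sdk.types": set(),
    "xray_api.config": set(),
    "xray_sdk.transport": {"xray_sdk.config", "xray_sdk.types"},
    "xray_sdk.step": {"xray_sdk.types", "xray_sdk.transport"},
    "xray_sdk.run": {"xray_sdk.types", "xray_sdk.step", "xray_sdk.transport"},
    "xray_sdk.client": {"xray_sdk.config", "xray_sdk.types",
                        "xray_sdk.run", "xray_sdk.transport"},
    "xray_sdk.decorators": {"xray_sdk.client", "xray_sdk.types"},
    "xray_sdk.middleware": {"xray_sdk.client", "xray_sdk.run"},
    "xray_api.models": {"xray_api.config"},
    "xray_api.store": {"xray_api.models"},
    "xray_api.schemas": {"xray_sdk.types"},
    "xray_api.routes": {"xray_api.schemas", "xray_api.store"},
    "xray_api.main": {"xray_api.routes", "xray_api.config"},
}


def has_cycle(graph):
    """Return True if the dependency graph contains a cycle."""
    try:
        TopologicalSorter(graph).prepare()
        return False
    except CycleError:
        return True


# Dependencies always appear before the modules that need them.
build_order = list(TopologicalSorter(DEPENDENCIES).static_order())
```

A check like this could run in CI to enforce the Phase 0 exit criterion of no circular dependencies.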

</dependency-graph>

---

<implementation-roadmap>

## Development Phases

### Phase 0: Foundation
**Goal**: Establish configuration, types, and project structure so all other modules can be built.

**Entry Criteria**: Empty repository

**Tasks**:
- [ ] Setup project structure with pyproject.toml (depends on: none)
  - Acceptance: `pip install -e .` works, both packages importable
  - Test: Import statements succeed

- [ ] Implement `xray_sdk.config` (depends on: none)
  - Acceptance: Can load config from env vars and kwargs, has sensible defaults
  - Test: Unit tests for config loading, override precedence

- [ ] Implement `xray_sdk.types` (depends on: none)
  - Acceptance: All enums defined, JSON serializable
  - Test: Unit tests for enum values, serialization

- [ ] Implement `xray_api.config` (depends on: none)
  - Acceptance: Can load database URL, debug settings
  - Test: Unit tests for config loading

**Exit Criteria**: All foundation modules importable, no circular dependencies

**Delivers**: Stable base for all other development

---

### Phase 1: Core Data Path
**Goal**: Enable basic Run/Step capture in SDK and storage in API - the minimum viable data flow.

**Entry Criteria**: Phase 0 complete

**Tasks**:
- [ ] Implement `xray_sdk.transport` (depends on: [config, types])
  - Acceptance: Can enqueue events, background worker sends to URL, fail-open on errors
  - Test: Unit tests with mock HTTP; test buffer overflow behavior

- [ ] Implement `xray_sdk.step` (depends on: [types, transport])
  - Acceptance: Step class captures input/output, computes duration, infers counts
  - Test: Unit tests for Step lifecycle, summarization, count inference

- [ ] Implement `xray_sdk.run` (depends on: [types, step, transport])
  - Acceptance: Run class creates Steps, enqueues data, supports context manager
  - Test: Unit tests for Run lifecycle, Step creation

- [ ] Implement `xray_sdk.client` (depends on: [config, types, run, transport])
  - Acceptance: `init_xray()` works, `start_run()` creates Run, context vars work
  - Test: Unit tests for client init, run creation, context management

- [ ] Implement `xray_api.models` (depends on: [api.config])
  - Acceptance: Run and Step tables created, all columns present, indexes defined
  - Test: Migration test, model instantiation

- [ ] Implement `xray_api.store` (depends on: [models])
  - Acceptance: CRUD operations work for Run and Step
  - Test: Integration tests with test database

- [ ] Implement `xray_api.schemas` (depends on: [sdk.types])
  - Acceptance: All request/response models defined, validation works
  - Test: Unit tests for schema validation

- [ ] Implement `xray_api.routes` - ingest only (depends on: [schemas, store])
  - Acceptance: `POST /xray/runs`, `POST /xray/runs/{id}/end`, `POST /xray/steps` work
  - Test: Integration tests with TestClient

- [ ] Implement `xray_api.main` (depends on: [routes, config])
  - Acceptance: App starts, routes mounted, database connected
  - Test: Smoke test app startup

**Exit Criteria**: Can create Run and Steps via SDK, verify data in database via direct query

**Delivers**: End-to-end data capture (SDK → API → DB)

---

### Phase 2: Developer Experience
**Goal**: Make instrumentation ergonomic with decorators, middleware, and query API.

**Entry Criteria**: Phase 1 complete

**Tasks**:
- [ ] Implement `@step` decorator (depends on: [client, types])
  - Acceptance: Decorating function creates Step when Run active, no-op otherwise
  - Test: Unit tests for sync/async functions, with/without Run, error handling

- [ ] Implement `@instrument_class` decorator (depends on: [decorators.step])
  - Acceptance: All public methods wrapped, names correct, inheritance respected
  - Test: Unit tests for class decoration, method wrapping

- [ ] Implement attach helpers in client (depends on: [client, step])
  - Acceptance: `attach_reasoning()` and `attach_candidates()` update current Step
  - Test: Unit tests for attach operations, no-op when no Step

- [ ] Implement `xray_sdk.middleware` (depends on: [client, run])
  - Acceptance: Each request creates Run, sets context, ends Run on response
  - Test: Integration tests with FastAPI TestClient

- [ ] Implement query routes (depends on: [routes, store, schemas])
  - Acceptance: `GET /xray/runs/{id}`, `GET /xray/runs`, `GET /xray/steps` with filters
  - Test: Integration tests for each endpoint, filter combinations

**Exit Criteria**: Can instrument a FastAPI app with decorators + middleware, query data via API

**Delivers**: Production-ready instrumentation DX

---

### Phase 3: Example & Polish
**Goal**: Demonstrate full system with realistic example, ensure production readiness.

**Entry Criteria**: Phase 2 complete

**Tasks**:
- [ ] Build competitor_pipeline example (depends on: [SDK, API])
  - Acceptance: Multi-step pipeline with filter/rank/llm steps, fully instrumented
  - Test: Example runs, creates expected Run/Steps

- [ ] Build example FastAPI app with middleware (depends on: [middleware, example pipeline])
  - Acceptance: `/run_pipeline` endpoint triggers instrumented pipeline
  - Test: Request creates Run with all Steps visible in API

- [ ] Write integration test suite (depends on: [example])
  - Acceptance: Full flow tested: request → SDK → API → query
  - Test: CI passes

- [ ] Documentation and README (depends on: [all])
  - Acceptance: README shows minimal and full instrumentation examples
  - Test: Code samples run successfully

**Exit Criteria**: Demo-able system with clear documentation

**Delivers**: Complete, usable X-Ray system

</implementation-roadmap>

---

<test-strategy>

## Test Pyramid

```
          /\
         /E2E\           ← 10% (Full flow: HTTP → SDK → API → DB → Query)
        /------\
       /Integration\     ← 25% (SDK↔Transport, API↔Store, Middleware)
      /--------------\
     /   Unit Tests   \  ← 65% (Step logic, decorators, summarization, store)
    /------------------\
```

## Coverage Requirements
- Line coverage: 80% minimum
- Branch coverage: 70% minimum
- Function coverage: 90% minimum
- Critical paths (transport fail-open, context management): 100%

## Critical Test Scenarios

### Module: xray_sdk.transport

**Happy path**:
- Enqueue events, background worker sends to backend
- Expected: Events arrive at backend, no blocking in enqueue

**Edge cases**:
- Buffer reaches capacity
- Expected: Oldest events dropped, no crash, warning logged

**Error cases**:
- Backend returns 500, connection timeout, DNS failure
- Expected: Events buffered, retried with backoff, pipeline unaffected

**Integration**:
- Transport against a local mock HTTP server
- Expected: Batched sends, correct payload format

---

### Module: xray_sdk.step

**Happy path**:
- Create Step, attach reasoning/candidates, end with output
- Expected: All fields populated, counts inferred, duration computed

**Edge cases**:
- Non-list input/output (single object, None)
- Expected: `input_count`/`output_count` = None, no crash
- Very large payload with `detail="summary"`
- Expected: Truncated to configured limits

**Error cases**:
- Exception during step execution
- Expected: Step marked error, error_message captured, exception re-raised

---

### Module: xray_sdk.decorators

**Happy path**:
- Decorated function called with active Run
- Expected: Step created with correct name/type, input/output captured

**Edge cases**:
- Decorated function called with NO active Run
- Expected: Function runs normally, no Step created, no error
- Async decorated function
- Expected: Works correctly, context propagates

**Error cases**:
- Decorated function raises exception
- Expected: Step recorded with error status, exception propagates

---

### Module: xray_sdk.middleware

**Happy path**:
- HTTP request through middleware
- Expected: Run created, context set, Steps inside handler attach to Run

**Edge cases**:
- Request with no instrumented handlers
- Expected: Run still created/ended, just no Steps

**Error cases**:
- Handler raises exception
- Expected: Run marked error, error info captured, 500 returned

---

### Module: xray_api.store (query)

**Happy path**:
- Query steps with `step_type=filter&removed_ratio_gt=0.9`
- Expected: Correct Steps returned, removed_ratio computed correctly

**Edge cases**:
- Query with no matches
- Expected: Empty list, not error
- Step with `input_count=0`
- Expected: removed_ratio handled (null or 0, not division error)

---

## Test Generation Guidelines

- Use pytest with fixtures for database and client setup
- Mock transport HTTP calls with `respx` or `pytest-httpx` (if the transport uses `httpx`) or the `responses` library (if it uses `requests`)
- Use FastAPI `TestClient` for API integration tests
- Test both sync and async code paths for decorators
- Use `contextvars.copy_context()` for testing context propagation
- Parameterize tests for different StepTypes, DetailLevels

</test-strategy>

---

<architecture>

## System Components

```
┌─────────────────────────────────────────────────────────────────┐
│                      User Application                           │
│  ┌─────────────┐  ┌─────────────┐  ┌─────────────────────────┐ │
│  │ @step       │  │ @instrument │  │ xray.start_run()        │ │
│  │ decorator   │  │ _class      │  │ run.start_step()        │ │
│  └──────┬──────┘  └──────┬──────┘  └───────────┬─────────────┘ │
│         │                │                     │               │
│         └────────────────┼─────────────────────┘               │
│                          ▼                                     │
│              ┌───────────────────────┐                         │
│              │   XRayClient          │                         │
│              │   (context vars)      │                         │
│              └───────────┬───────────┘                         │
│                          │                                     │
│              ┌───────────▼───────────┐                         │
│              │   Transport           │                         │
│              │   (async buffer)      │                         │
│              └───────────┬───────────┘                         │
└──────────────────────────┼──────────────────────────────────────┘
                           │ HTTP POST (async, fail-open)
                           ▼
┌──────────────────────────────────────────────────────────────────┐
│                      X-Ray API                                   │
│  ┌─────────────┐  ┌─────────────┐  ┌─────────────────────────┐  │
│  │ POST /runs  │  │ POST /steps │  │ GET /runs, GET /steps   │  │
│  └──────┬──────┘  └──────┬──────┘  └───────────┬─────────────┘  │
│         │                │                     │                │
│         └────────────────┼─────────────────────┘                │
│                          ▼                                      │
│              ┌───────────────────────┐                          │
│              │   Store (SQLAlchemy)  │                          │
│              └───────────┬───────────┘                          │
│                          ▼                                      │
│              ┌───────────────────────┐                          │
│              │   Database            │                          │
│              │   (SQLite/PostgreSQL) │                          │
│              └───────────────────────┘                          │
└──────────────────────────────────────────────────────────────────┘
```
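
The upper half of the diagram can be sketched roughly as follows. This is an illustration under assumptions, not the SDK's API: `_current_run_id`, `InMemoryTransport`, and this simplified `step` decorator are hypothetical, and the transport only collects events in memory where a real one would buffer and POST them.

```python
import contextvars
import functools
import time
from typing import Any, Callable, Optional

# Hypothetical names throughout; this is not the SDK's actual API.
_current_run_id: contextvars.ContextVar[Optional[str]] = contextvars.ContextVar(
    "xray_run_id", default=None
)


class InMemoryTransport:
    """Stand-in for the buffered transport; a real one would POST to the API."""

    def __init__(self) -> None:
        self.events: list = []

    def enqueue(self, event: dict) -> None:
        self.events.append(event)


transport = InMemoryTransport()


def step(name: str) -> Callable[[Callable[..., Any]], Callable[..., Any]]:
    """Record one step event per call, tagged with the run found in context."""

    def decorator(fn: Callable[..., Any]) -> Callable[..., Any]:
        @functools.wraps(fn)
        def wrapper(*args: Any, **kwargs: Any) -> Any:
            started = time.perf_counter()
            result = fn(*args, **kwargs)
            transport.enqueue(
                {
                    "run_id": _current_run_id.get(),
                    "step_name": name,
                    "duration_ms": int((time.perf_counter() - started) * 1000),
                }
            )
            return result

        return wrapper

    return decorator


@step("filter_cheap")
def filter_cheap(prices: list) -> list:
    return [p for p in prices if p < 10]


_current_run_id.set("run-1")
kept = filter_cheap([5, 20, 7])
```

The decorated function never sees the run or the transport; the decorator picks both up from the surrounding context, which is what lets instrumentation stay out of business logic.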

## Data Models

### Run Table

| Column | Type | Constraints | Index |
|--------|------|-------------|-------|
| `id` | UUID | PRIMARY KEY | - |
| `pipeline_name` | VARCHAR(255) | NOT NULL | YES |
| `status` | VARCHAR(20) | NOT NULL, DEFAULT 'running' | YES |
| `started_at` | TIMESTAMP | NOT NULL, DEFAULT NOW | YES |
| `ended_at` | TIMESTAMP | NULL | - |
| `input_summary` | JSON | NULL | - |
| `output_summary` | JSON | NULL | - |
| `metadata` | JSON | NULL | - |
| `request_id` | VARCHAR(255) | NULL | YES |
| `user_id` | VARCHAR(255) | NULL | YES |
| `environment` | VARCHAR(50) | NULL | YES |

### Step Table

| Column | Type | Constraints | Index |
|--------|------|-------------|-------|
| `id` | UUID | PRIMARY KEY | - |
| `run_id` | UUID | FOREIGN KEY → Run.id, NOT NULL | YES |
| `step_name` | VARCHAR(255) | NOT NULL | YES |
| `step_type` | VARCHAR(50) | NOT NULL | YES |
| `index` | INTEGER | NOT NULL | - |
| `started_at` | TIMESTAMP | NOT NULL | - |
| `ended_at` | TIMESTAMP | NULL | - |
| `duration_ms` | INTEGER | NULL (computed on end) | - |
| `input_summary` | JSON | NULL | - |
| `output_summary` | JSON | NULL | - |
| `input_count` | INTEGER | NULL | YES |
| `output_count` | INTEGER | NULL | YES |
| `reasoning` | JSON | NULL | - |
| `metadata` | JSON | NULL | - |
| `labels` | JSON | NULL | - |
| `status` | VARCHAR(20) | NULL | YES |
| `error_message` | TEXT | NULL | - |

**Notes:**
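- `removed_ratio` computed at query time: `1.0 * (input_count - output_count) / NULLIF(input_count, 0)` (the `1.0 *` forces floating-point division, since both counts are INTEGER and integer division would truncate the ratio to 0; the result is NULL when `input_count` is 0)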
- Candidate info stored in `metadata.candidates` as JSON array
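- JSON columns use JSONB in PostgreSQL for indexing capability
- `metadata` is a reserved attribute name in SQLAlchemy's Declarative API, so the ORM models must map that column under a different Python attribute name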
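
A small sketch of the query-time `removed_ratio` computation, run against an in-memory SQLite database. The table is reduced to three columns and the sample rows are invented for illustration. The `1.0 *` factor is included to force floating-point division, because dividing two INTEGER columns would truncate.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
# Reduced, illustrative version of the Step table.
conn.execute(
    "CREATE TABLE step (step_name TEXT, input_count INTEGER, output_count INTEGER)"
)
conn.executemany(
    "INSERT INTO step VALUES (?, ?, ?)",
    [("price_filter", 200, 10), ("dedupe", 10, 10), ("empty_input", 0, 0)],
)

rows = conn.execute(
    """
    SELECT step_name,
           1.0 * (input_count - output_count) / NULLIF(input_count, 0) AS removed_ratio
    FROM step
    ORDER BY rowid
    """
).fetchall()

for name, ratio in rows:
    print(name, ratio)
```

`NULLIF(input_count, 0)` turns a zero denominator into NULL, so a step that received no input yields a NULL ratio (`None` in Python) instead of a division error.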

## Technology Stack

| Component | Technology | Rationale |
|-----------|------------|-----------|
| SDK Language | Python 3.10+ | Target users are ML/LLM engineers, decorator support |
| API Framework | FastAPI | Async, fast, good typing, OpenAPI docs |
| ORM | SQLAlchemy 2.0 | Mature, async support, migrations via Alembic |
| Database | SQLite (dev) / PostgreSQL (prod) | Simple dev, scalable prod |
| HTTP Client | httpx | Async support, modern API |
| Context | contextvars | Standard library, async-safe |
| Testing | pytest + pytest-asyncio | Standard, good async support |

**Decision: Python-only SDK (no polyglot)**
- **Rationale**: Target users are Python ML engineers; decorator/middleware patterns are Python-native; a single language is faster to ship
- **Trade-offs**: Teams using Node/Go need to wait for future ports
- **Alternatives**: Build HTTP-only API and let users write their own thin clients

**Decision: Counts stored, removed_ratio computed**
- **Rationale**: Counts are stable after insert; the ratio is derived, so computing it at query time always applies the latest formula
- **Trade-offs**: Query-time computation adds slight overhead
- **Alternatives**: Store ratio (risk of stale values if formula changes)

**Decision: No separate Candidate table (v1)**
- **Rationale**: Simplifies schema; candidate info often varies wildly; JSON in metadata is flexible
- **Trade-offs**: Cannot query individual candidates across steps
- **Alternatives**: Add Candidate table in v2 if querying needs emerge

</architecture>

---

<risks>

## Technical Risks

**Risk**: Context variable bugs in async code (wrong Run/Step association)
- **Impact**: High - incorrect traces undermine trust in the system
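- **Likelihood**: Medium - each asyncio task runs in a copy of the context taken at task creation, and `loop.run_in_executor()` does not carry the context into worker threads (unlike `asyncio.to_thread()`), so the current Run/Step can silently go missing or stale
- **Mitigation**: Extensive testing with concurrent async requests; wrap work handed to threads or callbacks in `copy_context().run(...)`
- **Fallback**: Allow users to pass `run` explicitly to all methods, bypassing context lookup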
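
The association this risk is about can be checked with a sketch like the one below. `current_run` and `record_step` are hypothetical names. The optional `run` argument illustrates the explicit escape hatch, under the assumption that an explicit value takes precedence over context.

```python
import asyncio
import contextvars
from typing import Optional

# Hypothetical stand-in for the SDK's "current run" context variable.
current_run: contextvars.ContextVar[Optional[str]] = contextvars.ContextVar(
    "current_run", default=None
)


def record_step(step_name: str, run: Optional[str] = None) -> tuple:
    """Resolve the run: an explicit argument wins, otherwise use the context."""
    resolved = run if run is not None else current_run.get()
    return (resolved, step_name)


async def handle_request(run_id: str) -> tuple:
    current_run.set(run_id)
    await asyncio.sleep(0)  # yield so the other tasks interleave
    return record_step("rank")


async def main() -> list:
    # gather() wraps each coroutine in a Task, and each Task runs in its own
    # copy of the context, so the set() calls do not overwrite one another.
    return await asyncio.gather(*(handle_request(f"run-{i}") for i in range(3)))


results = asyncio.run(main())
explicit = record_step("rank", run="run-x")
```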

**Risk**: Transport buffer memory bloat under high throughput
- **Impact**: Medium - could cause OOM in high-traffic services
- **Likelihood**: Medium - depends on backend latency and request volume
- **Mitigation**: Bounded buffer with drop policy; configurable limits; metrics on buffer size and dropped events; alerts when events are dropped
- **Fallback**: Disable buffering, switch to sync send with timeout
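
One possible shape for the bounded buffer, sketched under assumptions. The PRD does not fix the drop policy; drop-oldest is used here purely for illustration, and `BoundedBuffer` is a hypothetical name.

```python
from collections import deque


class BoundedBuffer:
    """Holds at most max_size events; evicts the oldest when full."""

    def __init__(self, max_size: int) -> None:
        if max_size < 1:
            raise ValueError("max_size must be at least 1")
        self._events: deque = deque(maxlen=max_size)
        self.dropped = 0  # exposed so it can feed a metric

    def put(self, event: dict) -> None:
        if len(self._events) == self._events.maxlen:
            self.dropped += 1  # append() below evicts the oldest event
        self._events.append(event)

    def drain(self) -> list:
        """Return all buffered events in order and empty the buffer."""
        items = list(self._events)
        self._events.clear()
        return items


buf = BoundedBuffer(max_size=2)
for n in range(3):
    buf.put({"n": n})
drained = buf.drain()
```

Because memory use is capped by `max_size`, a slow backend costs trace completeness rather than process memory, and the `dropped` counter makes that loss visible.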

**Risk**: JSON payload size explosion
- **Impact**: Medium - storage costs, slow queries
- **Likelihood**: High - users may accidentally log large objects
- **Mitigation**: Default `detail="summary"` with aggressive truncation; max payload size; warn on large payloads
- **Fallback**: Hard limit on payload size, reject oversized
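
A rough sketch of summary-mode truncation plus a hard size cap. The limits and the `summarize` / `enforce_max_bytes` helpers are hypothetical, as is the choice to replace an oversized payload with a marker rather than reject the request.

```python
import json
from typing import Any

MAX_STRING_CHARS = 200  # hypothetical default
MAX_LIST_ITEMS = 5      # hypothetical default


def summarize(value: Any) -> Any:
    """Shrink a payload: cap strings, replace sequences with count + sample."""
    if isinstance(value, str) and len(value) > MAX_STRING_CHARS:
        return value[:MAX_STRING_CHARS] + "...[truncated]"
    if isinstance(value, (list, tuple)):
        sample = [summarize(v) for v in value[:MAX_LIST_ITEMS]]
        return {"count": len(value), "sample": sample}
    if isinstance(value, dict):
        return {str(k): summarize(v) for k, v in value.items()}
    return value


def enforce_max_bytes(payload: Any, max_bytes: int = 16_384) -> Any:
    """Replace a payload that is still too large with a small marker."""
    size = len(json.dumps(payload, default=str).encode("utf-8"))
    if size > max_bytes:
        return {"truncated": True, "original_bytes": size}
    return payload


summary = summarize({"query": "shoes", "items": list(range(100))})
capped = enforce_max_bytes({"blob": "x" * 100}, max_bytes=50)
```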

## Dependency Risks

**Risk**: Backend API unavailable
- **Impact**: Low (on pipeline) / High (on observability)
- **Likelihood**: Medium - any service can have downtime
- **Mitigation**: Fail-open design; async buffering; clear degraded-mode behavior documented
- **Fallback**: Local file fallback for critical debugging (future)
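
Fail-open behavior can be illustrated with a small wrapper. `fail_open` and the `"xray"` logger name are hypothetical; the point is only that a transport failure is logged and swallowed rather than raised into the pipeline.

```python
import logging
from typing import Callable

logger = logging.getLogger("xray")  # hypothetical logger name


def fail_open(send: Callable[[dict], None]) -> Callable[[dict], bool]:
    """Wrap a send function so its failures never reach the caller."""

    def safe_send(event: dict) -> bool:
        try:
            send(event)
            return True
        except Exception:
            # Observability must not break the observed pipeline.
            logger.warning("X-Ray send failed; dropping event", exc_info=True)
            return False

    return safe_send


def broken_backend(event: dict) -> None:
    raise ConnectionError("backend down")


delivered = fail_open(broken_backend)({"step_name": "rank"})
```

Returning a boolean instead of raising lets the caller count failures for degraded-mode metrics without adding its own error handling.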

**Risk**: Database performance under query load
- **Impact**: Medium - slow queries frustrate users
- **Likelihood**: Medium - depends on data volume and query patterns
- **Mitigation**: Proper indexing; pagination required; query timeouts
- **Fallback**: Read replica; caching layer (future)

## Scope Risks

**Risk**: Scope creep into UI/dashboard
- **Impact**: High - diverts effort from core SDK/API
- **Likelihood**: Medium - users will ask for visualizations
- **Mitigation**: Explicitly out of scope for v1; JSON API is the product; point to Postman/curl for now
- **Fallback**: Simple HTML debug view (read-only) if absolutely needed

**Risk**: Over-engineering auto-instrumentation
- **Impact**: Medium - time sink, complexity
- **Likelihood**: Medium - tempting to add more magic
- **Mitigation**: Limit to explicit decorators + class decorator; document manual approach; no monkey-patching
- **Fallback**: Remove class decorator if too complex

</risks>

---

<appendix>

## References

- OpenTelemetry Python instrumentation patterns: https://opentelemetry.io/docs/instrumentation/python/
- FastAPI middleware documentation: https://fastapi.tiangolo.com/tutorial/middleware/
- Python contextvars: https://docs.python.org/3/library/contextvars.html
- SQLAlchemy 2.0 async: https://docs.sqlalchemy.org/en/20/orm/extensions/asyncio.html

## Glossary

| Term | Definition |
|------|------------|
| **Run** | One complete execution of a pipeline from initial input to final output |
| **Step** | One decision-making stage within a Run (e.g., filter, rank, LLM call) |
| **X-Ray** | This system - decision-reasoning observability layer focused on "why this output?" |
| **Fail-open** | Design pattern where failures in observability never impact the observed system |
| **removed_ratio** | `(input_count - output_count) / input_count`, computed at query time; NULL when `input_count` is 0. Measures the fraction of its input a step filtered out |
| **Detail level** | Configuration controlling payload capture depth: "summary" (counts only) vs "full" (complete data) |

## Open Questions (Deferred to v2)

1. **Candidate table**: Add separate table for queryable individual candidate data?
2. **LLM reasoning**: Auto-generate explanations from structured step data?
3. **UI**: Simple web dashboard vs JSON API only?
4. **Multi-language**: Node.js, Go SDKs?
5. **Sampling**: Probabilistic capture for high-volume pipelines?
6. **Retention**: Auto-delete old runs after N days?

</appendix>

---

<task-master-integration>

# How Task Master Parses This PRD

1. **Capabilities** (`### Capability:`) → Top-level tasks
2. **Features** (`#### Feature:`) → Subtasks under capabilities
3. **Dependency graph** → Task dependencies and ordering
4. **Phases** → Task priorities (Phase 0 = highest)
5. **Test scenarios** → Context for test generation

## Parsing Hints

- Each Feature has clear Inputs/Outputs/Behavior → maps to acceptance criteria
- Dependency chain explicitly states what each module needs → task dependencies
- Phase tasks have explicit `(depends on: [...])` annotations
- Test scenarios provide specific cases for Surgical Test Generator

</task-master-integration>
