Metadata-Version: 2.4
Name: ai-pipeline-core
Version: 0.12.2
Summary: Core utilities for AI-powered processing pipelines using prefect
Project-URL: Homepage, https://github.com/bbarwik/ai-pipeline-core
Project-URL: Repository, https://github.com/bbarwik/ai-pipeline-core
Project-URL: Issues, https://github.com/bbarwik/ai-pipeline-core/issues
Author-email: "ResearchTech Inc." <bartosz@research.tech>
License: MIT
License-File: LICENSE
Classifier: Development Status :: 4 - Beta
Classifier: Intended Audience :: Developers
Classifier: License :: OSI Approved :: MIT License
Classifier: Programming Language :: Python :: 3
Classifier: Programming Language :: Python :: 3.12
Classifier: Programming Language :: Python :: 3.13
Classifier: Topic :: Scientific/Engineering :: Artificial Intelligence
Classifier: Topic :: Software Development :: Libraries :: Python Modules
Classifier: Typing :: Typed
Requires-Python: >=3.12
Requires-Dist: clickhouse-connect>=0.10.0
Requires-Dist: google-cloud-pubsub>=2.0
Requires-Dist: httpx>=0.28.1
Requires-Dist: jinja2>=3.1.6
Requires-Dist: lmnr>=0.7.37
Requires-Dist: openai>=2.16.0
Requires-Dist: opentelemetry-sdk>=1.39.1
Requires-Dist: pillow>=12.1.0
Requires-Dist: prefect-gcp>=0.6.15
Requires-Dist: prefect>=3.6.15
Requires-Dist: pydantic-settings>=2.12.0
Requires-Dist: pydantic>=2.12.5
Requires-Dist: pypdf>=5.0.0
Requires-Dist: python-magic>=0.4.27
Requires-Dist: ruamel-yaml>=0.19.1
Requires-Dist: tiktoken>=0.12.0
Provides-Extra: dev
Requires-Dist: basedpyright==1.37.3; extra == 'dev'
Requires-Dist: bump2version>=1.0.1; extra == 'dev'
Requires-Dist: interrogate==1.7.0; extra == 'dev'
Requires-Dist: pre-commit>=4.3.0; extra == 'dev'
Requires-Dist: pylint>=3.0.0; extra == 'dev'
Requires-Dist: pytest-asyncio>=1.1.0; extra == 'dev'
Requires-Dist: pytest-cov>=5.0.0; extra == 'dev'
Requires-Dist: pytest-mock>=3.14.0; extra == 'dev'
Requires-Dist: pytest-xdist>=3.8.0; extra == 'dev'
Requires-Dist: pytest>=8.4.1; extra == 'dev'
Requires-Dist: ruff==0.15.2; extra == 'dev'
Requires-Dist: testcontainers[clickhouse]>=4.0.0; extra == 'dev'
Requires-Dist: vulture==2.14; extra == 'dev'
Description-Content-Type: text/markdown

# AI Pipeline Core

Production framework for building type-safe AI pipelines — designed to be developed and used by AI coding agents. Open-sourced by [research.tech](https://research.tech).

[![Python Version](https://img.shields.io/badge/python-3.12%2B-blue)](https://www.python.org/downloads/)
[![License: MIT](https://img.shields.io/badge/License-MIT-yellow.svg)](https://opensource.org/licenses/MIT)
[![Code Style: Ruff](https://img.shields.io/badge/code%20style-ruff-000000.svg)](https://github.com/astral-sh/ruff)
[![Type Checked: Basedpyright](https://img.shields.io/badge/type%20checked-basedpyright-blue)](https://github.com/DetachHead/basedpyright)

## Overview

AI Pipeline Core is a production-ready framework that combines document processing, LLM integration, and workflow orchestration into a unified system. Built with strong typing (Pydantic), automatic retries, cost tracking, and distributed tracing, it enforces best practices while keeping application code minimal and straightforward.

This framework is the foundation of AI projects at [research.tech](https://research.tech). It is an internal-first solution, open-sourced because we believe in sharing production infrastructure publicly. The design prioritizes **strictness over flexibility** — all data structures are immutable, all inputs are validated at definition time, and all prompts are typed Python classes. These constraints exist because the framework is primarily developed and maintained by AI coding agents, which require rigid guardrails rather than flexible guidelines.

### Key Features

- **Document System**: Single `Document` base class with immutable content, SHA256-based identity, automatic MIME type detection, provenance tracking, multi-part attachments, and optional typed content via `Document[ModelType]`
- **Document Store**: Pluggable storage backends (ClickHouse production, local filesystem CLI/debug, in-memory testing) with automatic deduplication
- **Conversation Class**: Immutable, stateful multi-turn LLM conversations with context caching, automatic URL/address shortening, and eager response restoration
- **LLM Integration**: Unified interface to any model via LiteLLM proxy (OpenRouter compatible) with context caching (default 300s TTL)
- **Structured Output**: Type-safe generation with Pydantic model validation via `Conversation.send_structured()`
- **Workflow Orchestration**: Prefect-based flows and tasks with annotation-driven document types and decoration-time input/output validation
- **Auto-Persistence**: `@pipeline_task` saves returned documents to `DocumentStore` automatically
- **Image Processing**: Automatic image tiling/splitting for LLM vision models with model-specific presets
- **Observability**: Built-in distributed tracing via Laminar (LMNR) with cost tracking, local trace debugging, ClickHouse-based tracking, and remote trace download/reconstruction
- **Prompt Compiler**: Type-safe prompt specifications replacing Jinja2 templates — typed Python classes for roles, rules, guides, and output formats with definition-time validation and a CLI tool for inspection
- **Replay**: Capture and re-execute any LLM conversation, pipeline task, or flow from human-editable YAML files with document resolution via SHA256 references
- **Deployment**: Unified pipeline execution for local, CLI, and production environments with per-flow resume and dual-store support

## Installation

```bash
pip install ai-pipeline-core
```

This installs four CLI commands:
- `ai-prompt-compiler` — discover, inspect, render, and compile prompt specifications
- `ai-pipeline-deploy` — build and deploy pipelines to Prefect Cloud
- `ai-replay` — execute or inspect replay YAML files from trace output
- `ai-trace` — list, show, and download pipeline traces from ClickHouse

### Requirements

- Python 3.12 or higher
- Linux/macOS (Windows via WSL2)
- [uv](https://astral.sh/uv) (recommended)

### Versioning

This is an internal framework under active development. **No backward compatibility is guaranteed between versions** — pin your dependency to an exact version. There is no changelog; the git commit history serves as the changelog.

### Development Installation

```bash
git clone https://github.com/bbarwik/ai-pipeline-core.git
cd ai-pipeline-core
make install-dev     # Initializes uv environment and installs pre-commit hooks
```

## Quick Start

### Basic Pipeline

```python
from typing import ClassVar

from pydantic import BaseModel, Field

from ai_pipeline_core import (
    Document,
    DeploymentResult,
    FlowOptions,
    PipelineDeployment,
    pipeline_flow,
    pipeline_task,
    setup_logging,
    get_pipeline_logger,
)

setup_logging(level="INFO")
logger = get_pipeline_logger(__name__)


# 1. Define document types (subclass Document)
class InputDocument(Document):
    """Pipeline input."""

class AnalysisDocument(Document):
    """Per-document analysis result."""

class ReportDocument(Document):
    """Final compiled report."""


# 2. Structured output model
class AnalysisSummary(BaseModel):
    word_count: int
    top_keywords: list[str] = Field(default_factory=list)


# 3. Pipeline task -- auto-saves returned documents to DocumentStore
@pipeline_task
async def analyze_document(document: InputDocument) -> AnalysisDocument:
    return AnalysisDocument.create(
        name=f"analysis_{document.sha256[:12]}.json",
        content=AnalysisSummary(word_count=42, top_keywords=["ai", "pipeline"]),
        derived_from=(document.sha256,),
    )


# 4. Pipeline flow -- type contract is in the annotations
@pipeline_flow(estimated_minutes=5)
async def analysis_flow(
    run_id: str,
    documents: list[InputDocument],
    flow_options: FlowOptions,
) -> list[AnalysisDocument]:
    results: list[AnalysisDocument] = []
    for doc in documents:
        results.append(await analyze_document(doc))
    return results


@pipeline_flow(estimated_minutes=2)
async def report_flow(
    run_id: str,
    documents: list[AnalysisDocument],
    flow_options: FlowOptions,
) -> list[ReportDocument]:
    report = ReportDocument.create(
        name="report.md",
        content="# Report\n\nAnalysis complete.",
        derived_from=tuple(doc.sha256 for doc in documents),
    )
    return [report]


# 5. Deployment -- ties flows together with type chain validation
class MyResult(DeploymentResult):
    report_count: int = 0


class MyPipeline(PipelineDeployment[FlowOptions, MyResult]):
    flows: ClassVar = [analysis_flow, report_flow]

    @staticmethod
    def build_result(
        run_id: str,
        documents: list[Document],
        options: FlowOptions,
    ) -> MyResult:
        reports = [d for d in documents if isinstance(d, ReportDocument)]
        return MyResult(success=True, report_count=len(reports))


# 6. CLI initializer provides run ID and initial documents
def initialize(options: FlowOptions) -> tuple[str, list[Document]]:
    docs: list[Document] = [
        InputDocument.create_root(name="input.txt", content="Sample data", reason="CLI input"),
    ]
    return "my-project", docs


# Run from CLI (requires positional working_directory arg: python script.py ./output)
pipeline = MyPipeline()
pipeline.run_cli(initializer=initialize, trace_name="my-pipeline")
```

### Conversation (Multi-Turn LLM)

```python
from ai_pipeline_core.llm import Conversation

# Create a conversation with model and optional context
conv = Conversation(model="gemini-3-pro")

# Add documents to cacheable context prefix (shared across forks)
conv = conv.with_context(doc1, doc2, doc3)

# Add a document to dynamic messages suffix (NOT cached, per-fork content)
conv = conv.with_document(my_document)

# Send a message (returns NEW immutable Conversation instance — always capture!)
conv = await conv.send("Analyze the document")
print(conv.content)  # Response text

# Structured output
conv = await conv.send_structured("Extract key points", response_format=KeyPoints)
print(conv.parsed)  # KeyPoints instance

# Multi-turn: each send() appends to conversation history
conv = await conv.send("Now summarize the key points")
print(conv.content)

# Access response properties
print(conv.reasoning_content)  # Thinking/reasoning text (if available)
print(conv.usage)              # Token usage with input/output counts
print(conv.cost)               # Estimated cost
print(conv.citations)          # Citation objects (for search models)
```

### Structured Output

```python
from pydantic import BaseModel
from ai_pipeline_core import Conversation

class Analysis(BaseModel):
    summary: str
    sentiment: float
    key_points: list[str]

# Generate structured output via Conversation
conv = Conversation(model="gemini-3-pro")
conv = await conv.send_structured(
    "Analyze this product review: ...",
    response_format=Analysis,
)

# Access parsed result with type safety
analysis = conv.parsed
print(f"Sentiment: {analysis.sentiment}")
for point in analysis.key_points:
    print(f"- {point}")
```

### Document Handling

```python
from ai_pipeline_core import Document

class MyDocument(Document):
    """Custom document type -- must subclass Document."""

# Create documents with automatic conversion
doc = MyDocument.create(
    name="data.json",
    content={"key": "value"},  # Automatically converted to JSON bytes
    derived_from=("https://api.example.com/data",),
)

# Parse back to original type
data = doc.parse(dict)  # Returns {"key": "value"}

# Document provenance tracking
source_doc = MyDocument.create_root(name="source.txt", content="original data", reason="user upload")
plan_doc = MyDocument.create(name="plan.txt", content="research plan", derived_from=(source_doc.sha256,))
derived = MyDocument.create(
    name="derived.json",
    content={"result": "processed"},
    derived_from=("https://api.example.com/data",),  # Content came from this URL
    triggered_by=(plan_doc.sha256,),  # Created because of this plan (causal, not content)
)

# Check provenance
for sha256 in derived.content_documents:  # avoid shadowing the built-in hash()
    print(f"Derived from document: {sha256}")
for ref in derived.content_references:
    print(f"External source: {ref}")
```

### Typed Content (Document[T])

Declare a Pydantic BaseModel as the content schema for a Document subclass. Content is validated at creation time and accessible via `.parsed`:

```python
from pydantic import BaseModel
from ai_pipeline_core import Document

class ResearchDefinition(BaseModel, frozen=True):
    topic: str
    max_sources: int = 10

class ResearchPlanDocument(Document[ResearchDefinition]):
    """Plan document with typed content schema."""

# Content is validated against the schema at creation time
plan = ResearchPlanDocument.create(
    name="plan.json",
    content=ResearchDefinition(topic="AI safety"),
    derived_from=(input_doc.sha256,),
)

# Zero-boilerplate typed access (cached, returns ResearchDefinition)
definition = plan.parsed
print(definition.topic)       # "AI safety"
print(definition.max_sources)  # 10

# Wrong schema type is rejected at creation time
class WrongModel(BaseModel, frozen=True):
    x: int

plan = ResearchPlanDocument.create(
    name="plan.json",
    content=WrongModel(x=1),   # TypeError: Expected content of type ResearchDefinition
    derived_from=(input_doc.sha256,),
)

# Introspection
ResearchPlanDocument.get_content_type()  # ResearchDefinition
```

## Core Concepts

### Documents

Documents are immutable Pydantic models that wrap binary content with metadata. There is a single `Document` base class -- subclass it to define your document types:

```python
class MyDocument(Document):
    """All documents subclass Document directly."""

# Use create() for automatic conversion
doc = MyDocument.create(
    name="data.json",
    content={"key": "value"},  # Auto-converts to JSON
    derived_from=(source.sha256,),
)

# Access content
if doc.is_text:
    print(doc.text)

# Parse structured data
data = doc.as_json()  # or as_yaml()
model = doc.as_pydantic_model(MyModel)  # Requires model_type argument

# Convert between document types
other = doc.retype(OtherDocType, preserve_provenance=True)

# Content-addressed identity
print(doc.sha256)  # Full SHA256 hash (base32)
print(doc.id)      # Short 6-char identifier
```

**Typed content** — declare a content schema via generic parameter for automatic validation and typed access:

```python
class PlanDocument(Document[PlanModel]):
    """Content is validated against PlanModel at creation time."""

plan = PlanDocument.create(name="plan.json", content=PlanModel(...), derived_from=(...,))
plan.parsed  # → PlanModel (cached, typed)
PlanDocument.get_content_type()  # → PlanModel
```

**Document fields:**
- `name`: Filename (validated for security -- no path traversal)
- `description`: Optional human-readable description
- `content`: Raw bytes (auto-converted from str, dict, list, BaseModel via `create()`)
- `derived_from`: Content provenance — SHA256 hashes of source documents or URI-style references (must contain `://`). A SHA256 must not appear in both derived_from and triggered_by.
- `triggered_by`: Causal provenance — SHA256 hashes of documents that caused this document to be created without contributing to its content.
- `attachments`: Tuple of `Attachment` objects for multi-part content
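
The provenance rules above can be illustrated with a small standalone sketch. It is not the framework's implementation: `split_provenance` is a hypothetical helper, and treating every entry without `://` as a document hash is an assumption made only for this example.

```python
def split_provenance(
    derived_from: tuple[str, ...],
    triggered_by: tuple[str, ...] = (),
) -> tuple[tuple[str, ...], tuple[str, ...]]:
    """Split derived_from into (document hashes, external references).

    Hypothetical helper: entries containing "://" are treated as URI-style
    references, everything else as a document SHA256.
    """
    references = tuple(s for s in derived_from if "://" in s)
    hashes = tuple(s for s in derived_from if "://" not in s)

    # A hash may describe content provenance or causal provenance, not both.
    overlap = set(hashes) & set(triggered_by)
    if overlap:
        raise ValueError(f"hash in both derived_from and triggered_by: {sorted(overlap)}")
    return hashes, references
```

The two returned tuples play the same role as the `content_documents` and `content_references` accessors shown in the Quick Start.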

Documents support:
- Automatic content serialization based on file extension: `.json` → JSON, `.yaml`/`.yml` → YAML, others → UTF-8 text. Structured data (dict, list, BaseModel) requires `.json` or `.yaml` extension.
- Optional typed content schema via `Document[ModelType]` with creation-time validation and `.parsed` access
- MIME type detection via `mime_type` cached property, with `is_text`/`is_image`/`is_pdf` helpers
- SHA256-based identity and deduplication
- Provenance tracking (`derived_from` for content sources, `triggered_by` for causal lineage)
- `FILES` enum for filename restrictions (definition-time validation)
- `retype(new_type, preserve_provenance=True|False)` for type conversion between document subclasses
- `derive(from_documents=..., name=..., content=...)` convenience method for creating documents from other documents (extracts SHA256 hashes automatically)
- Token count estimation via `approximate_tokens_count`

### Document Store

Documents are automatically persisted by `@pipeline_task` to a document store. The framework provides four backend implementations (ClickHouse, local filesystem, in-memory, dual), all managed internally — application code interacts only through the read-only `DocumentReader` protocol.

**Backend implementations** (internal, auto-selected by execution mode):
- **ClickHouse**: Production backend (selected when `CLICKHOUSE_HOST` is configured)
- **Local filesystem**: CLI/debug mode (browsable files on disk, collision-safe filenames)
- **In-memory**: Testing (zero I/O, used by `run_local()`)
- **Dual**: Wraps primary (ClickHouse) + secondary (local). Saves to both, reads from primary only

**Store selection depends on the execution mode:**
- `run_cli()`: Uses dual store (ClickHouse + local) when `CLICKHOUSE_HOST` is configured, local otherwise
- `run_local()`: Always uses in-memory store (no persistence)
- `as_prefect_flow()`: Auto-selects based on settings

**Public API — `DocumentReader`** (read-only protocol; `get_document_store()` returns the full `DocumentStore | None` for framework-internal use):
- `load(run_scope, document_types)` -- Load documents by type from a run scope
- `has_documents(run_scope, document_type, *, max_age=None)` -- Check if documents of a type exist
- `check_existing(sha256s)` -- Check which SHA256 hashes exist globally
- `find_by_source(source_values, document_type, *, max_age=None)` -- Find most recent document per source value
- `load_summaries(document_sha256s)` -- Load summaries by SHA256 (global)
- `load_by_sha256s(sha256s, document_type, run_scope=None)` -- Batch-load full documents by SHA256
- `load_nodes_by_sha256s(sha256s)` -- Batch-load lightweight `DocumentNode` metadata (global, cross-scope)
- `load_scope_metadata(run_scope)` -- Load `DocumentNode` metadata for all documents in a scope
- `get_flow_completion(run_scope, flow_name, *, max_age=None)` -- Check flow completion record (returns `FlowCompletion | None`)

Write operations (`save`, `save_batch`, `flush`, `shutdown`) are framework-internal — `@pipeline_task` handles persistence automatically.

**Document summaries:** When enabled, stores automatically generate LLM-powered summaries in the background after each new document is saved. Summaries are best-effort and stored as store-level metadata (not on the `Document` model). Configure via `DOC_SUMMARY_ENABLED` and `DOC_SUMMARY_MODEL` environment variables.

### LLM Integration

The primary interface is the **`Conversation`** class for multi-turn interactions.

#### Conversation (Recommended)

The `Conversation` class provides immutable, stateful conversation management:

```python
from ai_pipeline_core.llm import Conversation, ModelOptions

# Create with model and optional configuration
conv = Conversation(model="gemini-3-pro")

# Add documents to cacheable context prefix (shared across forks)
conv = conv.with_context(doc1, doc2, doc3)

# Add a document to dynamic messages suffix (NOT cached, per-fork content)
conv = conv.with_document(my_document)

# Configure model options
conv = conv.with_model_options(ModelOptions(
    system_prompt="You are a research analyst.",
    reasoning_effort="high",
))

# Send a message (returns NEW Conversation instance)
conv = await conv.send("Analyze the document")
print(conv.content)  # Response text

# Multi-turn: conversation history is preserved
conv = await conv.send("Now summarize the key points")
print(conv.content)

# Structured output
conv = await conv.send_structured("Extract entities", response_format=Entities)
print(conv.parsed)  # Entities instance

# Add multiple documents at once
conv = conv.with_documents([doc1, doc2, doc3])

# Inject prior assistant output (e.g., from another conversation)
conv = conv.with_assistant_message("Previous analysis result...")

# Warmup + fork pattern for parallel calls with shared cache
import asyncio
base = await conv.send("Acknowledge the context")  # Warmup
# Fork: create parallel conversations from the same base
results = await asyncio.gather(
    base.send("Analyze source 1"),
    base.send("Analyze source 2"),
    base.send("Analyze source 3"),
)

# Approximate token count for all context and messages
print(conv.approximate_tokens_count)
```

**`send_spec()`** — sends a `PromptSpec` to the LLM. Handles document placement, stop sequences, and auto-extraction of `<result>` tags. For structured specs (`PromptSpec[SomeModel]`), dispatches to `send_structured()` automatically.

**Content protection (automatic):** URLs, blockchain addresses, and high-entropy strings in context documents are automatically shortened to `prefix...suffix` forms to save tokens. Both `.content` and `.parsed` are eagerly restored after every `send()`/`send_structured()` call — no manual restoration needed. A fuzzy fallback handles LLM-mangled forms (dropped suffix, prefix/suffix truncated by 1-2 chars).
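
The shorten-then-restore idea can be sketched in a few lines. This is an illustration only: `shorten`, `restore`, the URL regex, and the `keep`/`min_length` values are assumptions, and the fuzzy fallback for mangled forms is not reproduced here.

```python
import re

URL_RE = re.compile(r"https?://\S+")


def shorten(text: str, keep: int = 12, min_length: int = 40) -> tuple[str, dict[str, str]]:
    """Replace long URLs with prefix...suffix forms and remember the originals."""
    mapping: dict[str, str] = {}

    def _replace(match: re.Match[str]) -> str:
        original = match.group(0)
        if len(original) < min_length:
            return original  # short URLs are cheap enough to keep as-is
        short = f"{original[:keep]}...{original[-keep:]}"
        # Two URLs sharing prefix and suffix would collide; a real
        # implementation would need to disambiguate them.
        mapping[short] = original
        return short

    return URL_RE.sub(_replace, text), mapping


def restore(text: str, mapping: dict[str, str]) -> str:
    """Expand every shortened form back to its original string."""
    for short, original in mapping.items():
        text = text.replace(short, original)
    return text


source = "See https://example.com/a/very/long/path/to/some/resource?id=1234567890 now"
shortened, url_map = shorten(source)
```

Calling `restore(shortened, url_map)` returns the original text, which mirrors what the framework does automatically for `.content` and `.parsed`.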

**`ModelOptions` key fields (all optional with sensible defaults):**
- `cache_ttl`: Context cache TTL (default `"300s"`, set `None` to disable)
- `system_prompt`: System-level instructions
- `reasoning_effort`: `"low" | "medium" | "high"` for models with explicit reasoning
- `search_context_size`: `"low" | "medium" | "high"` for search-enabled models
- `retries`: Retry attempts (default `3`)
- `retry_delay_seconds`: Delay between retries (default `20`)
- `timeout`: Max wait seconds (default `600`)
- `service_tier`: `"auto" | "default" | "flex" | "scale" | "priority"` (OpenAI only)
- `max_completion_tokens`: Max output tokens
- `temperature`: Generation randomness (usually omit -- use provider defaults)
- `stop`: Stop sequences (tuple of strings, used internally by `send_spec` for `</result>` tags)

**ModelName predefined values:** `"gemini-3-pro"`, `"gpt-5.1"`, `"gemini-3-flash"`, `"gpt-5-mini"`, `"grok-4.1-fast"`, `"gemini-3-flash-search"`, `"gpt-5-mini-search"`, `"grok-4.1-fast-search"`, `"sonar-pro-search"` (also accepts any string for custom models).

### Image Processing

Image processing for LLM vision models is available from the `llm._images` module:

```python
from ai_pipeline_core.llm._images import process_image, ImagePreset

# Process an image with model-specific presets
result = process_image(screenshot_bytes, preset=ImagePreset.GEMINI)
for part in result:
    print(part.label, len(part.data))
```

Available presets: `GEMINI` (3000px, 9M pixels), `CLAUDE` (1568px, 1.15M pixels), `GPT4V` (2048px, 4M pixels), `DEFAULT` (1000px, 1M pixels).

**Token cost:** A single image is estimated at **1080 tokens** for token counting purposes (actual usage depends on provider).

The `Conversation` class automatically splits oversized images when documents are added to context — you typically don't need to call `process_image` directly.

### Exceptions

The framework re-exports key exceptions at the top level for convenient catching:

```python
from ai_pipeline_core import PipelineCoreError, LLMError, DocumentValidationError, DocumentSizeError, DocumentNameError
```

- `PipelineCoreError` — Base for all framework exceptions
- `LLMError` — LLM generation failures (retries exhausted, timeouts, degeneration)
- `DocumentValidationError` — Document validation failures
- `DocumentSizeError` — Document exceeds size limits
- `DocumentNameError` — Invalid document name (path traversal, etc.)

Output degeneration (token repetition loops) is detected automatically and raises `LLMError` after retry exhaustion.

### Pipeline Decorators

#### `@pipeline_task`

Decorates async functions as traced Prefect tasks with automatic document persistence:

```python
from ai_pipeline_core import pipeline_task

@pipeline_task  # No parameters needed for most cases
async def process_chunk(document: InputDocument) -> OutputDocument:
    return OutputDocument.create(
        name="result.json",
        content={"processed": True},
        derived_from=(document.sha256,),
    )

@pipeline_task(retries=3, estimated_minutes=5)
async def expensive_task(data: str) -> OutputDocument:
    # Retries, tracing, and document auto-save handled automatically
    ...
```

Key parameters:
- `retries`: Retry attempts on failure (default `0` -- no retries unless specified)
- `estimated_minutes`: Duration estimate for progress tracking (default `1`, must be >= 1)
- `timeout_seconds`: Task execution timeout
- `trace_level`: `"always" | "debug" | "off"` (default `"always"`)
- `expected_cost`: Expected cost budget for cost tracking

Key features:
- Async-only enforcement (raises `TypeError` if not `async def`)
- Decoration-time return type validation (must return Document types or None)
- Decoration-time input type validation (all parameters must be annotated with serializable types)
- Laminar tracing (automatic)
- Document auto-save to DocumentStore (returned documents are extracted and persisted)
- Source validation (warns if referenced SHA256s don't exist in store)

**Input type validation** — all `@pipeline_task` parameters must be annotated with serializable types. Allowed: `str`, `int`, `float`, `bool`, `Path`, `UUID`, `None`, `Any`, Document subclasses, FlowOptions subclasses, frozen BaseModel, Enum, `NewType` wrappers of valid types, and `list`/`tuple`/`dict[str, ...]`/`Union` containers thereof. Non-serializable types (mutable models, plain classes) are rejected at decoration time.

#### `@pipeline_flow`

Decorates async flow functions with annotation-driven document type extraction. Always requires parentheses:

```python
from ai_pipeline_core import pipeline_flow, FlowOptions

@pipeline_flow(estimated_minutes=10, retries=2, timeout_seconds=1200)
async def my_flow(
    run_id: str,                     # Must be named 'run_id' or '_run_id'
    documents: list[InputDoc],       # Input types extracted from annotation
    flow_options: MyFlowOptions,     # Must be FlowOptions or subclass
) -> list[OutputDoc]:                # Output types extracted from annotation
    ...
```

The flow's `documents` parameter annotation determines the input types, and the return annotation determines the output types. The function must have exactly three parameters, in this order: `run_id: str` (named `_run_id` when unused), `documents: list[...]`, and `flow_options` typed as `FlowOptions` or a subclass. No separate config class is needed -- the type contract is in the function signature.

**FlowOptions** is a base `BaseSettings` class for pipeline configuration. Subclass it to add flow-specific parameters:

```python
class ResearchOptions(FlowOptions):
    analysis_model: ModelName = "gemini-3-pro"
    verification_model: ModelName = "grok-4.1-fast"
    synthesis_model: ModelName = "gemini-3-pro"
    max_sources: int = 10
```

#### `PipelineDeployment`

Orchestrates multi-flow pipelines with resume, per-flow uploads, and event publishing:

```python
class MyPipeline(PipelineDeployment[MyOptions, MyResult]):
    flows: ClassVar = [flow_1, flow_2, flow_3]
    pubsub_service_type: ClassVar = "research"  # Enables Pub/Sub event publishing

    @staticmethod
    def build_result(
        run_id: str,
        documents: list[Document],
        options: MyOptions,
    ) -> MyResult:
        ...
```

**Execution modes:**

```python
pipeline = MyPipeline()

# CLI mode: parses sys.argv, requires positional working_directory argument
# Usage: python script.py ./output [--start N] [--end N] [--max-keywords 8]
pipeline.run_cli(initializer=init_fn, trace_name="my-pipeline")

# Local mode: in-memory store, returns result directly (synchronous)
result = pipeline.run_local(
    run_id="test",
    documents=input_docs,
    options=MyOptions(),
)

# Production: generates a Prefect flow for deployment
prefect_flow = pipeline.as_prefect_flow()
```

Features:
- **Per-flow resume**: Skips flows with a `FlowCompletion` record in the store (explicit completion tracking, not document-presence inference). Configurable `cache_ttl` (default 24h)
- **Type chain validation**: At class definition time, validates that at least one of each flow's declared input types is producible by preceding flows (union semantics)
- **Event publishing**: Lifecycle events (started, progress, heartbeat, completed, failed) via Pub/Sub. Enabled by setting `pubsub_service_type` ClassVar on the deployment class. Requires `PUBSUB_PROJECT_ID` and `PUBSUB_TOPIC_ID` env vars for infrastructure config
- **Concurrency limits**: Cross-run enforcement via Prefect global concurrency limits
- **CLI mode**: `--start N` / `--end N` for step control, `DualDocumentStore` when ClickHouse is configured

#### Concurrency Limits

Declare cross-run concurrency and rate limits on `PipelineDeployment` to prevent exceeding external API quotas across all concurrent pipeline runs:

```python
from ai_pipeline_core import LimitKind, PipelineLimit, PipelineDeployment, pipeline_concurrency

class MyPipeline(PipelineDeployment[MyOptions, MyResult]):
    flows = [my_flow]
    concurrency_limits = {
        "provider-a": PipelineLimit(500, LimitKind.CONCURRENT),       # max 500 simultaneous
        "provider-b": PipelineLimit(15, LimitKind.PER_MINUTE, timeout=300),  # 15/min token bucket
    }
    ...
```

Use `pipeline_concurrency()` at call sites to acquire slots:

```python
from ai_pipeline_core import pipeline_concurrency

async def fetch_data(url: str) -> Data:
    async with pipeline_concurrency("provider-a"):
        return await provider.fetch(url)
```

**Limit kinds:**
- `CONCURRENT` — Lease-based slots held during operation, released on exit
- `PER_MINUTE` — Token bucket with `limit/60` decay per second (allows bursting)
- `PER_HOUR` — Token bucket with `limit/3600` decay per second
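
The decay arithmetic behind the rate-based kinds can be illustrated with a small in-process bucket. This is a sketch of the general technique, not Prefect's implementation: `DecayingBucket` is a hypothetical class, and the injectable `clock` exists only to make the behavior easy to test.

```python
import time
from collections.abc import Callable


class DecayingBucket:
    """Occupancy rises by 1 per acquire and decays at limit/period per second."""

    def __init__(
        self,
        limit: int,
        period_seconds: float,
        clock: Callable[[], float] = time.monotonic,
    ) -> None:
        self.limit = limit
        self.decay_per_second = limit / period_seconds  # e.g. 15/60 for PER_MINUTE
        self._level = 0.0
        self._clock = clock
        self._last = clock()

    def try_acquire(self) -> bool:
        now = self._clock()
        decayed = (now - self._last) * self.decay_per_second
        self._level = max(0.0, self._level - decayed)
        self._last = now
        if self._level + 1 > self.limit:
            return False  # bucket is full; the caller must wait
        self._level += 1
        return True
```

With `limit=15` and a 60-second period, all 15 slots can be taken in one burst, after which one new slot frees up every 4 seconds.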

**Behavior:**
- Limits are auto-created in Prefect server at pipeline start (idempotent upsert)
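- If no slot becomes available within the limit's `timeout`, `AcquireConcurrencySlotTimeoutError` is raised; this means the limit is working as intended
- When Prefect is unavailable, limits are not enforced and calls proceed unthrottled (a warning is logged)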
- Limit names are validated at class definition time (alphanumeric, dashes, underscores)

#### Parallel Execution

`safe_gather` and `safe_gather_indexed` run coroutines in parallel with fault tolerance:

```python
from ai_pipeline_core import safe_gather, safe_gather_indexed

# safe_gather: returns successes only, filters out failures
results = await safe_gather(
    process(doc1), process(doc2), process(doc3),
    label="processing",
)  # Returns only the successful results; failures are dropped, so positions may not match the inputs

# safe_gather_indexed: preserves positional correspondence (None for failures)
results = await safe_gather_indexed(
    process(doc1), process(doc2), process(doc3),
    label="processing",
)  # Returns [result1, None, result3] if doc2 failed
```

Both raise if all coroutines fail (configurable via `raise_if_all_fail=False`).

#### Deploying to Prefect Cloud

The framework includes a deploy script that builds a fully bundled deployment (project wheel + all dependency wheels), uploads to GCS, and creates a Prefect deployment. The worker installs fully offline with `--no-index` — no PyPI contact, no stale cache issues.

```bash
# From your project root (where pyproject.toml lives)
ai-pipeline-deploy

# Also available as module:
python -m ai_pipeline_core.deployment.deploy
```

**Requirements:**
- `uv` (dependency resolution) and `pip` (wheel download) on the deploy machine
- `PREFECT_API_URL`, `PREFECT_GCS_BUCKET` configured
- `uv` on the worker (for offline install)

#### Remote Deployment Client

`RemoteDeployment` is a typed client for calling a remote `PipelineDeployment` via Prefect. Name the client class identically to the server's deployment class so the auto-derived deployment name matches:

```python
from ai_pipeline_core import RemoteDeployment, DeploymentResult, FlowOptions, Document

class RemoteInputDocument(Document):
    """Mirror type -- class_name must match the remote pipeline's document type."""

class RemoteResult(DeploymentResult):
    """Result type matching the remote pipeline's result."""
    report_count: int = 0

class MyPipeline(RemoteDeployment[RemoteInputDocument, FlowOptions, RemoteResult]):
    """Client for the remote MyPipeline deployment."""

client = MyPipeline()
result = await client.run(
    run_id="test",
    documents=input_docs,
    options=FlowOptions(),
    on_progress=my_progress_callback,  # Optional: async (fraction, message) -> None
)
```

The client defines local Document subclasses ("mirror types") whose `class_name` must match the remote pipeline's document types exactly. `run_remote_deployment()` is also available as a lower-level function.

**Deterministic run_id**: `RemoteDeployment` derives a deterministic `run_id` from the caller's run_id + a combined fingerprint hash of all document SHA256s and serialized options (format: `{run_id}-{fingerprint[:8]}`). Same inputs always produce the same derived run_id, enabling worker-side flow resume while preventing `task_results` key collisions.
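
The derivation can be pictured roughly as follows. Only the `{run_id}-{fingerprint[:8]}` format comes from the description above; the function name, the sorting of document hashes, and the JSON serialization of options are assumptions of this sketch, so the real fingerprint will differ.

```python
import hashlib
import json


def derive_run_id(run_id: str, doc_sha256s: list[str], options: dict) -> str:
    """Append an 8-character fingerprint of the inputs to the caller's run_id."""
    hasher = hashlib.sha256()
    # Assumption: hashes are sorted so document order does not change the result.
    for sha in sorted(doc_sha256s):
        hasher.update(sha.encode("ascii"))
    # Assumption: options are serialized as JSON with sorted keys for stability.
    hasher.update(json.dumps(options, sort_keys=True).encode("utf-8"))
    fingerprint = hasher.hexdigest()
    return f"{run_id}-{fingerprint[:8]}"


docs = [hashlib.sha256(b"doc one").hexdigest(), hashlib.sha256(b"doc two").hexdigest()]
first = derive_run_id("test", docs, {"depth": 2})
again = derive_run_id("test", docs, {"depth": 2})
changed = derive_run_id("test", docs, {"depth": 3})
```

Here `first` and `again` are identical, while `changed` gets a different suffix because the options differ.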

**ClickHouse fallback**: When the remote worker completes but `state.result()` fails (e.g., caller lacks GCS credentials for Prefect's result storage), the framework falls back to reading from the ClickHouse `task_results` table. This requires both the worker and caller to have `CLICKHOUSE_HOST` configured. The worker writes results to `task_results` automatically whenever ClickHouse is available, independent of Pub/Sub configuration.

**run_id validation**: All `run_id` values are validated at entry points — alphanumeric characters, underscores, and hyphens only, max 100 characters.
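
A check equivalent to the rule above might look like this. The helper name, the ASCII-only interpretation of "alphanumeric", and the rejection of empty strings are assumptions of the sketch.

```python
import re

# Letters, digits, underscores, and hyphens; 1 to 100 characters.
# Assumption: empty strings and non-ASCII letters are rejected.
_RUN_ID_RE = re.compile(r"[A-Za-z0-9_-]{1,100}")


def validate_run_id(run_id: str) -> str:
    """Return run_id unchanged if it is well-formed, otherwise raise ValueError."""
    if not _RUN_ID_RE.fullmatch(run_id):
        raise ValueError(f"invalid run_id: {run_id!r}")
    return run_id
```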

### Prompt Compiler

Type-safe prompt specifications that replace Jinja2 templates. Every piece of prompt content is a class or class attribute, validated at definition time (import time).

**Components** — define once, reuse across specs:

```python
from ai_pipeline_core import Role, Rule, OutputRule, Guide

class ResearchAnalyst(Role):
    """Analyst role for research pipelines."""
    text = "experienced research analyst with expertise in data synthesis"

class CiteEvidence(Rule):
    """Citation rule."""
    text = "Always cite specific evidence from the source documents.\nInclude document IDs when referencing."

class DontUseMarkdownTables(OutputRule):
    """Table formatting rule."""
    text = "Do not use markdown tables in the output."

class RiskFrameworkGuide(Guide):
    """Risk assessment framework guide."""
    template = "guides/risk_framework.md"  # Relative to module file, loaded at import time
```

**Specs** — typed prompt definitions with full validation:

```python
from ai_pipeline_core import PromptSpec, Document
from pydantic import Field

class SourceDocument(Document):
    """Source material for analysis."""

class AnalysisSpec(PromptSpec):
    """Analyze source documents for key findings."""
    role = ResearchAnalyst
    input_documents = (SourceDocument,)
    task = "Analyze the provided documents and identify key findings."
    rules = (CiteEvidence,)
    guides = (RiskFrameworkGuide,)
    output_structure = "## Key Findings\n## Evidence\n## Gaps"
    output_rules = (DontUseMarkdownTables,)

    # Dynamic fields — become template variables
    project_name: str = Field(description="Project name")
```

**Multi-line fields** — use `MultiLineField` for long or multiline content (e.g., review feedback, website content). All multi-line fields are combined into a single XML-tagged user message sent before the main prompt, not inlined in the Context section. Regular `Field()` values must be short, single-line strings (up to 500 chars) — longer or multiline values are auto-promoted to multi-line treatment with a warning:

```python
from ai_pipeline_core import PromptSpec, MultiLineField
from pydantic import Field

class ReviewSpec(PromptSpec):
    """Analyze a review."""
    role = ResearchAnalyst
    input_documents = (SourceDocument,)
    task = "Analyze the review and identify key themes."

    project_name: str = Field(description="Project name")          # Short, inline in prompt
    review: str = MultiLineField(description="Review text")        # Sent as <review>...</review> message
```
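
The inline versus multi-line rule can be expressed as a small predicate. This is a sketch under stated assumptions: the helper names are hypothetical, and the exact layout of the XML-tagged block (newlines around the value) is a guess rather than the framework's output format.

```python
MAX_INLINE_CHARS = 500  # limit stated above for regular Field() values


def is_inline_value(value: str) -> bool:
    """True when a value is short and single-line, so it can stay inline."""
    return len(value) <= MAX_INLINE_CHARS and "\n" not in value


def as_tagged_block(field_name: str, value: str) -> str:
    """Wrap a multi-line value in a tag named after its field."""
    return f"<{field_name}>\n{value}\n</{field_name}>"


print(is_inline_value("ACME"))                  # True
print(is_inline_value("line one\nline two"))    # False
print(as_tagged_block("review", "Great tool"))  # <review>, the text, </review> on three lines
```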

**Rendering and sending:**

```python
from ai_pipeline_core import Conversation, render_text, render_preview

# Create spec instance with dynamic field values
spec = AnalysisSpec(project_name="ACME")

# Render prompt text (for inspection/debugging)
prompt = render_text(spec, documents=[source_doc])

# Preview with placeholder values (for debugging)
preview = render_preview(AnalysisSpec)

# Send to LLM via Conversation
conv = await Conversation(model="gemini-3-flash").send_spec(spec, documents=[source_doc])
print(conv.content)  # <result> tags auto-extracted by send_spec()
```

**Structured output**: setting `output_structure` automatically enables `<result>` tag wrapping, sets a stop sequence at `</result>`, and makes `Conversation.send_spec()` extract the wrapped content, so `conv.content` returns clean text without tags. A spec declared with a typed output (`PromptSpec[SomeModel]`) is sent through `send_structured()` automatically.
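
To make the tag handling concrete, here is a minimal sketch of the extraction step. The function name is hypothetical, and it assumes that the provider omits the stop sequence from the returned text (the usual behavior), so the closing tag may be absent.

```python
def extract_result(raw: str) -> str:
    """Return the text inside <result>...</result>, tolerating a missing close tag."""
    open_tag, close_tag = "<result>", "</result>"
    start = raw.find(open_tag)
    if start == -1:
        return raw.strip()  # no wrapper present; fall back to the raw text
    body = raw[start + len(open_tag):]
    end = body.find(close_tag)
    if end != -1:
        body = body[:end]  # drop the closing tag and anything after it
    return body.strip()


print(extract_result("<result>ok</result> trailing"))  # ok
```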

**Follow-up specs** — use `follows=ParentSpec` to declare follow-up specs. Follow-up specs inherit context from the parent conversation and don't require `role` or `input_documents`.

**CLI tool** for discovery, inspection, rendering, and compilation:

```bash
# Inspect a spec's anatomy (role, docs, fields, rules, output config, token estimate)
ai-prompt-compiler inspect AnalysisSpec

# Render a prompt preview
ai-prompt-compiler render AnalysisSpec

# Discover, list, and compile all specs to .prompts/ directory as markdown files
ai-prompt-compiler compile

# Explicit module:class reference
ai-prompt-compiler render my_package.specs:AnalysisSpec

# Also available as module:
python -m ai_pipeline_core.prompt_compiler inspect AnalysisSpec
```

### Replay

Every LLM conversation, pipeline task, and pipeline flow is automatically captured as a replay YAML file alongside traces. Replay files contain everything needed to re-execute the call — documents are referenced by SHA256 hash and resolved from the local store at replay time.

**Replay files in trace output:**

```
.trace/001_my_flow/002_extract_insights/
    span.yaml              # Timing, tokens, cost
    input.yaml             # Trimmed browsable view
    output.yaml            # Response content
    task.yaml              # Replay payload with $doc_ref references
    003_insight_extraction/
        conversation.yaml  # LLM conversation replay
```
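
Resolving hash references back into documents can be sketched as a recursive walk over the payload. The `{"$doc_ref": "<sha256>"}` node shape and the dict-backed store are assumptions made for illustration; the real payload schema and store API may differ.

```python
import hashlib

REF_KEY = "$doc_ref"


def resolve_refs(node, store: dict[str, bytes]):
    """Replace every {"$doc_ref": sha} node with the content stored under sha."""
    if isinstance(node, dict):
        if set(node) == {REF_KEY}:
            return store[node[REF_KEY]]  # KeyError if the document is missing
        return {key: resolve_refs(value, store) for key, value in node.items()}
    if isinstance(node, list):
        return [resolve_refs(item, store) for item in node]
    return node


content = b"quarterly report"
sha = hashlib.sha256(content).hexdigest()
store = {sha: content}  # content-addressed: the key is the hash of the value
payload = {"arguments": {"source": {REF_KEY: sha}, "top_k": 3}}
resolved = resolve_refs(payload, store)
```

Because the key is the content hash, a replay file stays small and a missing document surfaces as an explicit lookup error rather than silently stale input.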

**Inspect a replay file:**

```bash
ai-replay show .trace/.../conversation.yaml
# Type: ConversationReplay
# Model: gemini-3-flash
# Options: {'reasoning_effort': 'low'}
# Response format: my_app.models:DocumentInsight
# Context docs: 1
# Prompt: Extract structured insights from the analysis.
```

**Re-execute with the same parameters:**

```bash
ai-replay run .trace/.../conversation.yaml --import my_app.tasks
```

**Override fields before execution:**

```bash
# Switch model
ai-replay run .trace/.../conversation.yaml --import my_app --set model=grok-4.1-fast

# Change prompt
ai-replay run .trace/.../conversation.yaml --import my_app --set prompt="Summarize in 3 bullet points"
```

**Replay task and flow payloads:**

```bash
ai-replay run .trace/.../task.yaml --import my_app
ai-replay run .trace/.../flow.yaml --import my_app
```

The `--import` flag is required when the original script was run as `__main__` — it imports the module so Document subclasses and functions are registered, and automatically remaps `__main__:X` references to the correct module path.
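
Conceptually, the remapping turns a `__main__:X` reference into a `module:X` reference that can be imported again. The helpers below are a hypothetical sketch of that idea; in particular, passing the target module explicitly is a simplification, not a description of how the tool locates the name.

```python
import importlib


def remap_main(ref: str, module: str) -> str:
    """Rewrite '__main__:Name' to '<module>:Name'; leave other references alone."""
    prefix = "__main__:"
    if ref.startswith(prefix):
        return f"{module}:{ref[len(prefix):]}"
    return ref


def load_reference(ref: str):
    """Import the module part of 'module:Name' and return the named attribute."""
    module_name, _, attr = ref.partition(":")
    return getattr(importlib.import_module(module_name), attr)


# A standard-library class stands in for a user-defined Document subclass.
remapped = remap_main("__main__:OrderedDict", "collections")
loaded = load_reference(remapped)
print(remapped)  # collections:OrderedDict
```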

**Output directory and tracing:**

By default, replay writes results to `{replay_file_stem}_replay/` next to the replay file. The output directory contains:

```
conversation_replay/
    output.yaml     # Execution result (content, usage, cost, timestamp)
    .trace/         # Full trace from replay execution
        summary.md
        llm_calls.yaml
        costs.md
        001_root_span/
            conversation.yaml  # Replayable again!
```
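
The default location follows directly from the `{replay_file_stem}_replay/` pattern. A small sketch with `pathlib` (the helper name is hypothetical):

```python
from pathlib import Path


def default_output_dir(replay_file: Path) -> Path:
    """Place '<stem>_replay' next to the replay file."""
    return replay_file.parent / f"{replay_file.stem}_replay"


out_dir = default_output_dir(Path(".trace/001_my_flow/conversation.yaml"))
print(out_dir.name)  # conversation_replay
```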

Override with `--output-dir` or skip tracing with `--no-trace`:

```bash
# Custom output directory
ai-replay run conversation.yaml --import my_app --output-dir ./my_output

# Skip tracing (only save output.yaml)
ai-replay run conversation.yaml --import my_app --no-trace
```

**Programmatic replay:**

```python
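from pathlib import Path

from ai_pipeline_core.replay import ConversationReplay

# Load from YAML (the path is illustrative)
yaml_text = Path("conversation.yaml").read_text()
replay = ConversationReplay.from_yaml(yaml_text)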

# Modify fields
replay = replay.model_copy(update={"model": "grok-4.1-fast"})

# Execute
result = await replay.execute(store_base=Path("./output"))
print(result.content)
```

**Three payload types:**
- `ConversationReplay` — captures `Conversation.send()` / `send_structured()` with model, prompt, context docs, multi-turn history, and response_format
- `TaskReplay` — captures `@pipeline_task` calls with function path and all arguments (Documents as `$doc_ref` references)
- `FlowReplay` — captures `@pipeline_flow` calls with function path, run_id, documents, and flow_options

### Local Trace Debugging

When running via `run_cli()`, trace spans are automatically saved to `<working_dir>/.trace/` for LLM-assisted debugging. Disable with `--no-trace`.

The directory structure mirrors the execution flow. Each new run overwrites the previous trace:

```
.trace/
  summary.md              # Execution tree, timing, LLM stats
  llm_calls.yaml          # All LLM calls with model, tokens, cost
  errors.yaml             # Failed spans with parent chain (only if errors exist)
  costs.md                # Cost aggregation by task
  001_my_flow/            # Root span (3-digit prefix, max 20 char name)
      span.yaml           # Span metadata (timing, status, type, I/O refs)
      input.yaml          # Span input (block scalar YAML for multiline)
      output.yaml         # Span output
      events.yaml         # OTel span events (log records)
      flow.yaml           # Replay payload (for @pipeline_flow spans)
      002_task_1/
          span.yaml
          input.yaml
          output.yaml
          task.yaml        # Replay payload (for @pipeline_task spans)
          003_analyze/
              span.yaml
              input.yaml
              output.yaml
              conversation.yaml  # Replay payload (for LLM conversation spans)
```

Duplicate LLM spans are filtered by default (every `Conversation` call creates both a DEFAULT and an inner LLM span). Use `verbose=True` in `TraceDebugConfig` to include all spans.
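
The span directory names in the tree above follow a "3-digit prefix, max 20 char name" pattern, which could be produced along these lines. The helper and its character sanitization are assumptions; only the prefix width and the 20-character cap come from the listing.

```python
import re


def span_dir_name(index: int, span_name: str, max_len: int = 20) -> str:
    """Build '<NNN>_<name>' with a zero-padded index and a truncated name."""
    # Assumption: characters unsafe for directory names are collapsed to '_'.
    safe = re.sub(r"[^A-Za-z0-9_-]+", "_", span_name).strip("_")
    return f"{index:03d}_{safe[:max_len]}"


print(span_dir_name(1, "my_flow"))                              # 001_my_flow
print(span_dir_name(3, "insight_extraction_with_long_suffix"))  # 003_insight_extraction_w
```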

### Remote Trace Download

When ClickHouse tracking is enabled, all span data (input/output, replay payloads, attributes, events, timing, LLM metrics) is stored in the `pipeline_spans` table with ZSTD-compressed content columns. This enables downloading traces from remote/cloud pipeline runs and rebuilding the `.trace/` directory locally for debugging and replay via `ai-trace download`.

The reconstructed directory has the same structure as local `.trace/` output — `summary.md`, `llm_calls.yaml`, per-span directories with `span.yaml`, `input.yaml`, `output.yaml`, replay files, and `events.yaml`.

### `ai-trace` CLI

The `ai-trace` command-line tool provides access to pipeline traces stored in ClickHouse.

```bash
# List recent pipeline runs
ai-trace list --limit 10 --status completed

# Show trace summary without downloading
ai-trace show 550e8400-e29b-41d4-a716-446655440000

# Download trace and rebuild .trace/ directory
ai-trace download 550e8400-e29b-41d4-a716-446655440000 -o ./debug/
```

Connection defaults to `CLICKHOUSE_*` environment variables. Override with `--host`, `--port`, `--database`, `--user`, `--password`, `--no-secure`.

## Configuration

### Environment Variables

```bash
# LLM Configuration (via LiteLLM proxy)
OPENAI_BASE_URL=http://localhost:4000
OPENAI_API_KEY=your-api-key

# Optional: Observability
LMNR_PROJECT_API_KEY=your-lmnr-key
LMNR_DEBUG=true  # Enable debug traces

# Optional: Orchestration
PREFECT_API_URL=http://localhost:4200/api
PREFECT_API_KEY=your-prefect-key
PREFECT_API_AUTH_STRING=your-auth-string
PREFECT_WORK_POOL_NAME=default
PREFECT_WORK_QUEUE_NAME=default
PREFECT_GCS_BUCKET=your-gcs-bucket

# Optional: GCS (for remote storage)
GCS_SERVICE_ACCOUNT_FILE=/path/to/service-account.json

# Optional: Document Store & Tracking (ClickHouse -- omit for local filesystem store)
CLICKHOUSE_HOST=your-clickhouse-host
CLICKHOUSE_PORT=8443
CLICKHOUSE_DATABASE=default
CLICKHOUSE_USER=default
CLICKHOUSE_PASSWORD=your-password
CLICKHOUSE_SECURE=true
CLICKHOUSE_CONNECT_TIMEOUT=10
CLICKHOUSE_SEND_RECEIVE_TIMEOUT=30
TRACKING_ENABLED=true

# Optional: Document Summaries (store-level, LLM-generated)
DOC_SUMMARY_ENABLED=true
DOC_SUMMARY_MODEL=gemini-3-flash

# Optional: Pub/Sub event delivery (deployment progress/status)
# Requires pubsub_service_type ClassVar on the PipelineDeployment subclass
PUBSUB_PROJECT_ID=your-gcp-project
PUBSUB_TOPIC_ID=pipeline-events
```

### Settings Management

Create custom settings by inheriting from the base Settings class:

```python
from ai_pipeline_core import Settings

class ProjectSettings(Settings):
    """Project-specific configuration."""
    app_name: str = "my-app"
    max_retries: int = 3

# Create singleton instance
settings = ProjectSettings()

# Access configuration (all env vars above are available)
print(settings.openai_base_url)
print(settings.app_name)
```

## Best Practices

### Framework Rules

1. **Decorators**: Use `@pipeline_task` without parameters in most cases. `@pipeline_flow` always requires parentheses, for example `@pipeline_flow(estimated_minutes=N)`, and is applied to a function with type annotations
2. **Logging**: Use `get_pipeline_logger(__name__)` -- never `print()` or the `logging` module directly
3. **LLM calls**: Use `Conversation` for all LLM interactions (multi-turn and single-shot)
4. **Options**: Omit `ModelOptions` unless specifically needed (defaults are production-optimized)
5. **Documents**: Use `create_root()` for pipeline inputs (no provenance), `create()` or `derive()` for derived documents. Always subclass `Document`
6. **Flow annotations**: Input/output types are in the function signature -- `list[InputDoc]` and `-> list[OutputDoc]`
7. **Initialization**: Logger at module scope, not in functions
8. **Document lists**: Use plain `list[Document]` -- no wrapper class needed

### Import Convention

Always import from the top-level package when possible:

```python
# Top-level imports (preferred)
from ai_pipeline_core import Document, pipeline_flow, pipeline_task, Conversation

# Sub-package imports for symbols not at top level
from ai_pipeline_core.deployment import CompletedRun, DeploymentResultData
from ai_pipeline_core.llm import ModelOptions
```

## Development

### Running Tests

```bash
make test              # Run all tests
make test-cov          # Run with coverage report
make test-clickhouse   # ClickHouse integration tests (requires Docker)
make test-pubsub       # Pub/Sub emulator integration tests (requires Docker)
make test-pubsub-live  # Pub/Sub tests against real GCP (requires PUBSUB_PROJECT_ID, PUBSUB_TOPIC_ID)
```

### Code Quality

```bash
make check             # Run ALL checks (lint, typecheck, deadcode, semgrep, docstrings-cover, filesize, check-claude-md, docs-ai-check, tests)
make lint              # Ruff linting (28 rule sets)
make format            # Auto-format and auto-fix code with ruff
make typecheck         # Type checking with basedpyright (strict mode)
make deadcode          # Dead code detection with vulture
make semgrep           # Project-specific AST pattern checks (.semgrep/ rules)
make docstrings-cover  # Docstring coverage (100% required)
```

**Static analysis tools:**
- **Ruff** — 28 rule sets including bugbear, security (bandit), complexity, async enforcement, exception patterns
- **Basedpyright** — strict mode with `reportUnusedCoroutine`, `reportUnreachable`, `reportImplicitStringConcatenation`
- **Vulture** — dead code detection with framework-aware whitelist
- **Semgrep** — custom rules in `.semgrep/` for frozen model mutable fields, async enforcement, docstring quality, architecture constraints
- **Interrogate** — 100% docstring coverage enforcement

### AI Documentation

The `.ai-docs/` directory contains auto-generated API guides designed to be fed directly to AI coding agents as context. Each module produces one self-contained guide — an AI agent should be able to correctly use any module's public API by reading only its guide.

Guides include full source signatures, constraint rules extracted from docstrings, usage examples extracted from tests, and internal types that appear in public API signatures. CI enforces that guides stay in sync with the source code.

```bash
make docs-ai-build  # Generate .ai-docs/ from source code
make docs-ai-check  # Validate .ai-docs/ freshness and completeness
```

When building applications on this framework, include the relevant `.ai-docs/*.md` guides in your AI agent's context window.

## Examples

The `examples/` directory contains:

- **`showcase.py`** -- Full 3-stage pipeline: Conversation API, multi-turn LLM analysis, structured extraction, PipelineDeployment with CLI, resume/skip, progress tracking, image processing
- **`showcase_document_store.py`** -- Document store usage: pipeline tasks with auto-save, flow execution via `run_local()`, document provenance tracking
- **`showcase_prompt_compiler.py`** -- Prompt compiler features: Role, Rule, OutputRule, Guide, PromptSpec, rendering, output_structure, follow-up specs, definition-time validation

Run examples:
```bash
# Full pipeline showcase (requires OPENAI_BASE_URL and OPENAI_API_KEY)
python examples/showcase.py ./output

# Document store showcase (no arguments needed)
python examples/showcase_document_store.py

# Prompt compiler showcase (no arguments needed)
python examples/showcase_prompt_compiler.py
```

## Project Structure

```
ai-pipeline-core/
|-- ai_pipeline_core/
|   |-- _llm_core/        # Internal LLM client, model types, and response handling
|   |-- deployment/        # Pipeline deployment, deploy script, CLI bootstrap, progress, remote
|   |-- docs_generator/    # AI-focused documentation generator
|   |-- document_store/    # Store protocol and backends (ClickHouse, local, memory)
|   |-- documents/         # Document system (Document base class, attachments, context)
|   |-- llm/               # Conversation class, URLSubstitutor, image processing, validation
|   |-- logging/           # Logging infrastructure
|   |-- observability/     # Tracing, ClickHouse tracking, local debug traces, and ai-trace CLI
|   |-- pipeline/          # Pipeline decorators, FlowOptions, and concurrency limits
|   |-- prompt_compiler/   # Type-safe prompt specs, rendering, and CLI tool
|   |-- replay/            # Trace-based replay system (capture, serialize, resolve, execute)
|   |-- settings.py        # Configuration management (Pydantic BaseSettings)
|   +-- exceptions.py      # Framework exceptions (LLMError, DocumentNameError, etc.)
|-- .ai-docs/             # Auto-generated API guides for AI coding agents
|-- tests/                 # Comprehensive test suite
|-- examples/              # Usage examples
+-- pyproject.toml         # Project configuration
```

## Contributing

1. Fork the repository
2. Create a feature branch
3. Make changes — run `make check` (must pass all linting, type checking, semgrep, and tests)
4. Open a Pull Request

Note: This is an internal-first framework. External contributions are welcome but the architecture and infrastructure choices (Prefect, ClickHouse) are fixed.

## License

This project is licensed under the MIT License - see the [LICENSE](LICENSE) file for details.

## Support

- **Issues**: [GitHub Issues](https://github.com/bbarwik/ai-pipeline-core/issues)
- **Discussions**: [GitHub Discussions](https://github.com/bbarwik/ai-pipeline-core/discussions)

## Acknowledgments

- Built on [Prefect](https://www.prefect.io/) for workflow orchestration
- Uses [LiteLLM](https://github.com/BerriAI/litellm) for LLM provider abstraction (also compatible with [OpenRouter](https://openrouter.ai/))
- Integrates [Laminar (LMNR)](https://www.lmnr.ai/) for observability
- Data validation with [Pydantic](https://pydantic.dev/) and type checking with [basedpyright](https://github.com/DetachHead/basedpyright)
