Metadata-Version: 2.4
Name: aperion-flow
Version: 1.2.1
Summary: The Conductor - Pipeline & Workflow Engine for Aperion Ecosystem
Project-URL: Homepage, https://github.com/invictustitan2/aperion-flow
Project-URL: Documentation, https://github.com/invictustitan2/aperion-flow#readme
Project-URL: Repository, https://github.com/invictustitan2/aperion-flow
Project-URL: Bug Tracker, https://github.com/invictustitan2/aperion-flow/issues
Author: Aperion Team
License: MIT
License-File: LICENSE
Classifier: Development Status :: 3 - Alpha
Classifier: Intended Audience :: Developers
Classifier: Programming Language :: Python :: 3.11
Classifier: Programming Language :: Python :: 3.12
Classifier: Topic :: Software Development :: Libraries
Classifier: Typing :: Typed
Requires-Python: >=3.11
Requires-Dist: fastapi>=0.100.0
Requires-Dist: pydantic>=2.0.0
Requires-Dist: uvicorn>=0.30.0
Provides-Extra: dev
Requires-Dist: httpx>=0.27.0; extra == 'dev'
Requires-Dist: mypy>=1.10.0; extra == 'dev'
Requires-Dist: pytest-asyncio>=0.23.0; extra == 'dev'
Requires-Dist: pytest-cov>=4.0.0; extra == 'dev'
Requires-Dist: pytest>=8.0.0; extra == 'dev'
Requires-Dist: ruff>=0.4.0; extra == 'dev'
Description-Content-Type: text/markdown

# Aperion Flow - The Conductor

> Pipeline & Workflow Engine for the Aperion Ecosystem

**Identity:** The Conductor orchestrates the symphony of agents with stateful execution and crash recovery.

## Overview

Aperion Flow is a standalone workflow engine extracted from `aperion-legendary-ai`. It provides:

- **DAG-based Pipeline Execution** - Define workflows with dependencies
- **Crash Recovery** - Checkpoint-based state persistence
- **LLM Assistance Integration** - Pre-validate stages, analyze outputs, and diagnose errors with an LLM
- **Circuit Breaker Protection** - Prevent cascade failures
- **Event Emission** - Real-time progress tracking via The Nervous System

## Installation

```bash
git clone https://github.com/invictustitan2/aperion-flow
cd aperion-flow
pip install -e ".[dev]"
```

## Quick Start

### Define a Pipeline

```python
from aperion_flow import Pipeline, PipelineStage, LLMAssistanceMode, LLMAssistanceConfig

# Create stages
stages = [
    PipelineStage(
        id="fetch",
        name="Fetch Data",
        handler="myapp.handlers.fetch_data",
        timeout=30.0,
    ),
    PipelineStage(
        id="process",
        name="Process Data",
        handler="myapp.handlers.process_data",
        depends_on=["fetch"],
        llm_assistance=[
            LLMAssistanceConfig(mode=LLMAssistanceMode.ERROR_DIAGNOSIS)
        ],
    ),
    PipelineStage(
        id="store",
        name="Store Results",
        handler="myapp.handlers.store_results",
        depends_on=["process"],
    ),
]

# Create pipeline
pipeline = Pipeline(
    name="data_pipeline",
    description="ETL pipeline with LLM assistance",
    stages=stages,
    max_parallel=2,
    fail_fast=True,
)
```
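The `depends_on` lists above define a DAG. The following minimal sketch is not Aperion Flow's actual scheduler. It shows how the standard-library `graphlib.TopologicalSorter` can group the three stages into batches that respect their dependencies and a `max_parallel` cap. `execution_waves` is a hypothetical helper.

```python
from graphlib import TopologicalSorter

# Stage id -> depends_on, mirroring the three stages defined above.
DEPENDS_ON: dict[str, list[str]] = {
    "fetch": [],
    "process": ["fetch"],
    "store": ["process"],
}


def execution_waves(depends_on: dict[str, list[str]], max_parallel: int) -> list[list[str]]:
    """Hypothetical helper: batch stage ids so every stage comes after all of
    its dependencies and no batch holds more than max_parallel stages."""
    sorter = TopologicalSorter(depends_on)
    sorter.prepare()  # raises graphlib.CycleError if the stages form a cycle
    waves: list[list[str]] = []
    while sorter.is_active():
        ready = sorted(sorter.get_ready())  # stages whose dependencies are all done
        for start in range(0, len(ready), max_parallel):
            waves.append(ready[start : start + max_parallel])
        sorter.done(*ready)
    return waves


print(execution_waves(DEPENDS_ON, max_parallel=2))  # [['fetch'], ['process'], ['store']]
```

A wave plan is easy to reason about. An async runner can do better by starting each stage as soon as its own dependencies finish, without waiting for the rest of the wave.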

### Using the Builder

```python
from aperion_flow import PipelineBuilder, LLMAssistanceMode

pipeline = (
    PipelineBuilder("my_pipeline", "Description")
    .add_stage("fetch", "myapp.fetch", timeout=60.0)
    .add_stage("process", "myapp.process", depends_on=["fetch"], retry_count=3)
    .add_stage("analyze", "myapp.analyze", depends_on=["process"],
               llm_modes=[LLMAssistanceMode.ANALYZE_OUTPUT])
    .with_max_parallel(4)
    .with_fail_fast(True)
    .build()
)
```

### Execute a Pipeline

```python
import asyncio
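from aperion_flow import PipelineExecutor

# `pipeline` comes from one of the examples above; `fetch_handler` and
# `process_handler` stand in for your own handler functions.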

async def main():
    executor = PipelineExecutor()

    # Register handlers (or use fully qualified module paths)
    executor.register_handler("fetch", fetch_handler)
    executor.register_handler("process", process_handler)

    result = await executor.execute(
        pipeline,
        initial_data={"input": "value"},
        correlation_id="req-123",
    )

    print(f"Status: {result.status}")
    print(f"Completed: {result.stages_completed}/{result.stage_count}")
    print(f"Output: {result.data}")

asyncio.run(main())
```
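The sketch below uses only `asyncio` to show what an async DAG runner with `max_parallel` and per-stage timeouts involves. It makes some assumptions for illustration. The handler shape (receive the shared data dict, return a dict to merge back) and the `run_dag` name are invented here, and `PipelineExecutor`'s real handler contract may differ.

```python
import asyncio
from collections.abc import Awaitable, Callable
from typing import Any

# Assumed handler shape: receives the shared data dict, returns a dict to merge into it.
Handler = Callable[[dict[str, Any]], Awaitable[dict[str, Any]]]


async def run_dag(
    handlers: dict[str, Handler],
    depends_on: dict[str, list[str]],
    data: dict[str, Any],
    max_parallel: int = 2,
    timeout: float = 30.0,
) -> dict[str, Any]:
    """Run each stage once its dependencies finish, at most max_parallel at a time."""
    semaphore = asyncio.Semaphore(max_parallel)
    tasks: dict[str, asyncio.Task[None]] = {}

    async def run_stage(stage_id: str) -> None:
        # Wait for all upstream stages; if one failed, its exception surfaces here.
        await asyncio.gather(*(tasks[dep] for dep in depends_on[stage_id]))
        async with semaphore:
            output = await asyncio.wait_for(handlers[stage_id](data), timeout=timeout)
        data.update(output)

    # Create every task before any of them runs, so run_stage can look up its dependencies.
    for stage_id in depends_on:
        tasks[stage_id] = asyncio.create_task(run_stage(stage_id))
    try:
        await asyncio.gather(*tasks.values())
    except BaseException:
        # Fail fast: cancel stages still pending, let them finish cancelling, then re-raise.
        for task in tasks.values():
            task.cancel()
        await asyncio.gather(*tasks.values(), return_exceptions=True)
        raise
    return data


async def fetch(data: dict[str, Any]) -> dict[str, Any]:
    return {"rows": [1, 2, 3]}


async def process(data: dict[str, Any]) -> dict[str, Any]:
    return {"total": sum(data["rows"])}


if __name__ == "__main__":
    result = asyncio.run(run_dag({"fetch": fetch, "process": process},
                                 {"fetch": [], "process": ["fetch"]}, {}))
    print(result)  # {'rows': [1, 2, 3], 'total': 6}
```

Each stage runs as its own task and awaits its upstream tasks. Independent branches can therefore overlap, while the semaphore caps how many handlers run at once. On the first failure the remaining tasks are cancelled, similar in spirit to `fail_fast=True`.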

### Crash Recovery

```python
# Resume from the last checkpoint of a previous run.
# Like the example above, this must run inside an async function.
result = await executor.execute(
    pipeline,
    resume_from="execution-id-from-previous-run",
)
```
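Resuming amounts to skipping the stages that a checkpoint already records as complete. The following minimal file-based sketch assumes a hypothetical JSON layout (`execution_id`, `completed`, `data`), not the real checkpoint format, and uses invented helper names.

```python
import json
from graphlib import TopologicalSorter
from pathlib import Path
from typing import Any


def save_checkpoint(path: Path, execution_id: str, completed: list[str], data: dict[str, Any]) -> None:
    """Write progress after each stage (hypothetical JSON layout)."""
    payload = {"execution_id": execution_id, "completed": completed, "data": data}
    tmp = path.with_suffix(".tmp")
    tmp.write_text(json.dumps(payload))
    tmp.replace(path)  # rename over the old file; atomic on POSIX, so a crash never leaves half a file


def plan_resume(path: Path, depends_on: dict[str, list[str]]) -> tuple[list[str], dict[str, Any]]:
    """Return the stages still to run (in dependency order) and the restored data."""
    checkpoint = json.loads(path.read_text())
    done = set(checkpoint["completed"])
    order = TopologicalSorter(depends_on).static_order()
    return [stage for stage in order if stage not in done], checkpoint["data"]
```

This sketch re-runs from scratch any stage that is not marked complete, which assumes handlers are safe to repeat after a crash.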

## Architecture

```
aperion-flow/
├── src/aperion_flow/
│   ├── definitions/
│   │   ├── pipeline.py    # Pipeline, Stage, LLMConfig models
│   │   └── registry.py    # Strategic pipeline catalog
│   ├── engine/
│   │   ├── context.py     # PipelineContext, Checkpoint, State
│   │   ├── executor.py    # Async DAG runner
│   │   └── recovery.py    # Retry, backoff, circuit breaker
│   └── api/
│       └── routes.py      # FastAPI endpoints
└── tests/
```

## Key Components

### PipelineContext

The "state bag" that carries shared data and per-stage results through an execution, and the state that checkpoints capture:

```python
from aperion_flow import PipelineContext

ctx = PipelineContext(
    pipeline_id="p1",
    pipeline_name="my_pipeline",
)

# Store data between stages
ctx.set("key", "value")
ctx.update({"batch": [1, 2, 3]})

# Track stage results
ctx.record_stage_start("stage_id", "stage_name")
ctx.record_stage_complete("stage_id", {"output": "data"})
```

### Recovery Engine

Resilient execution: retries with a configurable strategy, plus circuit breakers keyed per resource:

```python
from aperion_flow import RecoveryConfig, RecoveryEngine, RecoveryStrategy

config = RecoveryConfig(
    strategy=RecoveryStrategy.EXPONENTIAL_BACKOFF,
    max_retries=5,
    base_delay=1.0,
    max_delay=60.0,
    failure_threshold=10,  # Circuit breaker: failures before the circuit opens
)

engine = RecoveryEngine(config)
# Inside an async function; `my_handler` is your own callable
result = await engine.execute_with_recovery(my_handler, circuit_key="api_calls")
```
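Here is a sketch of the two mechanisms that `RecoveryConfig` combines: exponential backoff and a per-key circuit breaker. The delay formula `min(base_delay * 2**attempt, max_delay)` is the common definition of exponential backoff. The `CircuitBreaker` class, its `reset_after` half-open window, and `call_with_retry` are hypothetical and are not `RecoveryEngine`'s code.

```python
import asyncio
import random
import time
from collections.abc import Awaitable, Callable
from dataclasses import dataclass, field
from typing import TypeVar

T = TypeVar("T")


def backoff_delay(attempt: int, base_delay: float = 1.0, max_delay: float = 60.0,
                  jitter: bool = False) -> float:
    """Seconds to wait before retry `attempt` (0-based): base_delay * 2**attempt, capped."""
    delay = min(base_delay * 2**attempt, max_delay)
    # Optional "full jitter" spreads out retries coming from many clients.
    return random.uniform(0.0, delay) if jitter else delay


@dataclass
class CircuitBreaker:
    """Opens the circuit for a key after failure_threshold consecutive failures."""
    failure_threshold: int = 10
    reset_after: float = 30.0  # assumed: seconds before a trial call is let through
    failures: dict[str, int] = field(default_factory=dict)
    opened_at: dict[str, float] = field(default_factory=dict)

    def allow(self, key: str, now: float) -> bool:
        opened = self.opened_at.get(key)
        return opened is None or now - opened >= self.reset_after

    def record_success(self, key: str) -> None:
        self.failures.pop(key, None)
        self.opened_at.pop(key, None)

    def record_failure(self, key: str, now: float) -> None:
        self.failures[key] = self.failures.get(key, 0) + 1
        if self.failures[key] >= self.failure_threshold:
            self.opened_at[key] = now  # (re)open the circuit


async def call_with_retry(fn: Callable[[], Awaitable[T]], key: str, breaker: CircuitBreaker,
                          max_retries: int = 5, base_delay: float = 1.0,
                          max_delay: float = 60.0) -> T:
    for attempt in range(max_retries + 1):
        if not breaker.allow(key, time.monotonic()):
            raise RuntimeError(f"circuit open for {key!r}")
        try:
            result = await fn()
        except Exception:
            breaker.record_failure(key, time.monotonic())
            if attempt == max_retries:
                raise
            await asyncio.sleep(backoff_delay(attempt, base_delay, max_delay))
        else:
            breaker.record_success(key)
            return result
    raise AssertionError("unreachable")
```

With `base_delay=1.0` and `max_delay=60.0`, successive delays are 1, 2, 4, 8, 16, and 32 seconds, after which they stay capped at 60 seconds.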

### Strategic Registry

Pre-configured pipelines with intelligent defaults:

```python
from aperion_flow import LLMAssistanceMode
from aperion_flow.definitions.registry import get_registry, PipelineCategory

registry = get_registry()

# Get LLM config with smart defaults
llm_config = registry.get_llm_config(
    "pytest_execution",
    LLMAssistanceMode.ERROR_DIAGNOSIS,
)

# List pipelines by category
testing_pipelines = registry.get_by_category(PipelineCategory.TESTING)
```
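One way to picture the "smart defaults" is as layering. Start from a global default config, then apply per-pipeline overrides for the requested mode. The sketch below uses local stand-ins for the enums. It also invents `LLMConfig`'s fields (`timeout`, `enabled`), the `RegistryEntry` type, and the `DEPLOYMENT` category. Only the method names `get_llm_config` and `get_by_category` come from the example above.

```python
from dataclasses import dataclass, field, replace
from enum import Enum
from typing import Any


class PipelineCategory(Enum):  # local stand-in; DEPLOYMENT is hypothetical
    TESTING = "testing"
    DEPLOYMENT = "deployment"


class LLMAssistanceMode(Enum):  # local stand-in using mode names shown in this README
    ERROR_DIAGNOSIS = "error_diagnosis"
    ANALYZE_OUTPUT = "analyze_output"


@dataclass(frozen=True)
class LLMConfig:  # hypothetical fields
    mode: LLMAssistanceMode
    timeout: float = 10.0
    enabled: bool = True


@dataclass
class RegistryEntry:
    name: str
    category: PipelineCategory
    # Per-mode overrides applied on top of the global defaults.
    overrides: dict[LLMAssistanceMode, dict[str, Any]] = field(default_factory=dict)


class Registry:
    def __init__(self, entries: list[RegistryEntry]) -> None:
        self._entries = {entry.name: entry for entry in entries}

    def get_llm_config(self, name: str, mode: LLMAssistanceMode) -> LLMConfig:
        # Global defaults first, then this pipeline's overrides for the requested mode.
        return replace(LLMConfig(mode=mode), **self._entries[name].overrides.get(mode, {}))

    def get_by_category(self, category: PipelineCategory) -> list[str]:
        return [name for name, entry in self._entries.items() if entry.category is category]
```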

## API Server

Run the Flow API server:

```bash
python -m aperion_flow.api.routes
# or
uvicorn aperion_flow.api.routes:create_app --factory --port 8001
```

### Endpoints

| Method | Path | Description |
|--------|------|-------------|
| POST | `/flow/start` | Start a pipeline execution |
| POST | `/flow/resume` | Resume from checkpoint |
| GET | `/flow/{id}` | Get execution status |
| GET | `/flow/{id}/data` | Get execution output data |
| GET | `/flow/` | List executions |
| DELETE | `/flow/{id}` | Delete execution record |
| GET | `/healthz` | Health check |

## Integration Points

Aperion Flow integrates with other ecosystem components:

- **The Nervous System (Event Bus)** - Emits `flow.started`, `flow.stage.completed`, `flow.failed` events
- **The Switchboard (LLM Router)** - Provides LLM assistance for validation and diagnosis
- **The Cortex (State Gateway)** - Persists checkpoints and execution history
- **The Gatekeeper (Auth)** - Validates access to pipeline execution

### Event Emitter Protocol

```python
from typing import Protocol

class EventEmitter(Protocol):
    async def emit(
        self,
        event_type: str,
        payload: dict,
        *,
        source: str | None = None,
        correlation_id: str | None = None,
    ) -> str:
        ...
```

### LLM Assistant Protocol

```python
from typing import Protocol

class LLMAssistant(Protocol):
    async def pre_validate(self, stage, context, timeout) -> dict:
        ...
    async def analyze_output(self, stage, output, context, timeout) -> dict:
        ...
    async def diagnose_error(self, stage, error, context, timeout) -> dict:
        ...
```
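An offline stand-in for `LLMAssistant` keeps tests deterministic. This sketch annotates the protocol's parameters with assumed types and invents the returned keys. It also wraps the call in `asyncio.wait_for`, on the assumption that LLM assistance should be best-effort and should never stall a stage.

```python
import asyncio
from typing import Any, Protocol


class LLMAssistant(Protocol):
    async def pre_validate(self, stage: Any, context: Any, timeout: float) -> dict: ...
    async def analyze_output(self, stage: Any, output: Any, context: Any, timeout: float) -> dict: ...
    async def diagnose_error(self, stage: Any, error: BaseException, context: Any,
                             timeout: float) -> dict: ...


class RuleBasedAssistant:
    """Offline stand-in: makes no model calls and returns deterministic answers."""

    async def pre_validate(self, stage: Any, context: Any, timeout: float) -> dict:
        return {"valid": True}

    async def analyze_output(self, stage: Any, output: Any, context: Any, timeout: float) -> dict:
        return {"summary": f"{len(output)} output keys"}

    async def diagnose_error(self, stage: Any, error: BaseException, context: Any,
                             timeout: float) -> dict:
        if isinstance(error, TimeoutError):
            hint = "raise the stage timeout"
        else:
            hint = "inspect the handler"
        return {"error_type": type(error).__name__, "hint": hint}


async def diagnose_best_effort(assistant: LLMAssistant, stage: Any, error: BaseException,
                               context: Any, timeout: float = 5.0) -> dict:
    # Bound the assistant call so a slow model never stalls the pipeline.
    try:
        return await asyncio.wait_for(
            assistant.diagnose_error(stage, error, context, timeout), timeout)
    except asyncio.TimeoutError:
        return {"error_type": type(error).__name__, "hint": None}
```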

## Migration from aperion-legendary-ai

### Source Files Migrated

| Original | New Location |
|----------|--------------|
| `stack/aperion/pipelines/executor.py` | `src/aperion_flow/engine/executor.py` |
| `stack/aperion/pipelines/pipeline.py` | `src/aperion_flow/definitions/pipeline.py` |
| `stack/aperion/pipelines/recovery.py` | `src/aperion_flow/engine/recovery.py` |
| `stack/aperion/pipelines/strategic_registry.py` | `src/aperion_flow/definitions/registry.py` |

### Key Changes

1. **Async-first** - Replaced ThreadPoolExecutor with asyncio
2. **Stateless Executor** - All state in PipelineContext with checkpoints
3. **Pydantic v2** - Immutable models with `frozen=True`
4. **Protocol-based Integration** - Pluggable EventEmitter, LLMAssistant, CheckpointStore
5. **No Internal Dependencies** - Removed `@with_doc_context` and internal service coupling

## Testing

```bash
# Run all tests
pytest

# Run with coverage
pytest --cov=aperion_flow --cov-report=html

# Run specific test file
pytest tests/unit/test_executor.py -v
```

## Security Considerations

- **Secrets** - Never store secrets in PipelineContext; store a `SecretRef` that points to the secret and resolve its value from the environment where it is used
- **Timeouts** - All stages have configurable timeouts (default 30s)
- **Circuit Breakers** - Prevent cascade failures from external services
- **Audit Trail** - Checkpoints provide execution history
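The following is one possible reading of the `SecretRef` pattern, offered as an assumption since the project's own type may differ. The context holds only the *name* of an environment variable, and the handler resolves the value when it needs it. Checkpoints and events then never contain the secret. `SecretRef` here and the variable name are hypothetical.

```python
import json
import os
from dataclasses import asdict, dataclass


@dataclass(frozen=True)
class SecretRef:
    """Hypothetical: names an environment variable; the value itself is never stored."""
    env_var: str

    def resolve(self) -> str:
        # Read the secret at the moment of use, inside the handler that needs it.
        value = os.environ.get(self.env_var)
        if value is None:
            raise KeyError(f"secret {self.env_var!r} is not set")
        return value


# Only the reference goes into context data, so checkpoints and events stay secret-free.
api_token = SecretRef("MYAPP_API_TOKEN")  # hypothetical variable name
checkpoint_safe = json.dumps({"api_token": asdict(api_token)})
```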

## Constitution Alignment

| Constitution | Implementation |
|--------------|----------------|
| A4 - Agents Analyze, Not Execute | Executor controls flow; handlers execute |
| A5 - Latency SLOs | Stage timeouts, parallel execution |
| B1 - Secrets via Env | SecretRef pattern, no secrets in context |
| D3 - Audit Logging | Checkpoint history, event emission |

## License

MIT
