Metadata-Version: 2.4
Name: dory-sdk
Version: 2.1.5
Summary: Python SDK for building stateful processors with zero-downtime migration, auto-initialization, and smart instrumentation
Author-email: Dory Team <dory@example.com>
License: Apache-2.0
Project-URL: Homepage, https://github.com/example/dory-sdk
Project-URL: Documentation, https://dory-sdk.readthedocs.io
Project-URL: Repository, https://github.com/example/dory-sdk
Project-URL: Issues, https://github.com/example/dory-sdk/issues
Keywords: kubernetes,stateful,migration,orchestration,sdk
Classifier: Development Status :: 4 - Beta
Classifier: Intended Audience :: Developers
Classifier: License :: OSI Approved :: Apache Software License
Classifier: Operating System :: OS Independent
Classifier: Programming Language :: Python :: 3
Classifier: Programming Language :: Python :: 3.10
Classifier: Programming Language :: Python :: 3.11
Classifier: Programming Language :: Python :: 3.12
Classifier: Topic :: Software Development :: Libraries :: Python Modules
Classifier: Topic :: System :: Distributed Computing
Classifier: Typing :: Typed
Requires-Python: >=3.10
Description-Content-Type: text/markdown
Requires-Dist: aiohttp>=3.8.0
Requires-Dist: pydantic>=2.0.0
Requires-Dist: PyYAML>=6.0
Provides-Extra: kubernetes
Requires-Dist: kubernetes>=28.0.0; extra == "kubernetes"
Provides-Extra: s3
Requires-Dist: boto3>=1.28.0; extra == "s3"
Provides-Extra: tracing
Requires-Dist: opentelemetry-api>=1.20.0; extra == "tracing"
Requires-Dist: opentelemetry-sdk>=1.20.0; extra == "tracing"
Requires-Dist: opentelemetry-exporter-otlp-proto-grpc>=1.20.0; extra == "tracing"
Provides-Extra: resilience
Provides-Extra: monitoring
Provides-Extra: dev
Requires-Dist: pytest>=7.0.0; extra == "dev"
Requires-Dist: pytest-asyncio>=0.21.0; extra == "dev"
Requires-Dist: pytest-cov>=4.0.0; extra == "dev"
Requires-Dist: mypy>=1.0.0; extra == "dev"
Requires-Dist: ruff>=0.1.0; extra == "dev"
Requires-Dist: black>=23.0.0; extra == "dev"
Provides-Extra: production
Requires-Dist: kubernetes>=28.0.0; extra == "production"
Requires-Dist: opentelemetry-api>=1.20.0; extra == "production"
Requires-Dist: opentelemetry-sdk>=1.20.0; extra == "production"
Requires-Dist: opentelemetry-exporter-otlp-proto-grpc>=1.20.0; extra == "production"
Provides-Extra: all
Requires-Dist: kubernetes>=28.0.0; extra == "all"
Requires-Dist: boto3>=1.28.0; extra == "all"
Requires-Dist: opentelemetry-api>=1.20.0; extra == "all"
Requires-Dist: opentelemetry-sdk>=1.20.0; extra == "all"
Requires-Dist: opentelemetry-exporter-otlp-proto-grpc>=1.20.0; extra == "all"
Requires-Dist: pytest>=7.0.0; extra == "all"
Requires-Dist: pytest-asyncio>=0.21.0; extra == "all"
Requires-Dist: pytest-cov>=4.0.0; extra == "all"
Requires-Dist: mypy>=1.0.0; extra == "all"
Requires-Dist: ruff>=0.1.0; extra == "all"
Requires-Dist: black>=23.0.0; extra == "all"

# Dory SDK

A production-ready Python SDK for building **stateful, fault-tolerant processors** on Kubernetes with zero-downtime migration, automatic state persistence, and comprehensive observability.

[![PyPI version](https://badge.fury.io/py/dory-sdk.svg)](https://pypi.org/project/dory-sdk/)
[![Python 3.10+](https://img.shields.io/badge/python-3.10+-blue.svg)](https://www.python.org/downloads/)
[![License](https://img.shields.io/badge/License-Apache%202.0-blue.svg)](https://opensource.org/licenses/Apache-2.0)

## Why Dory SDK?

| Challenge | Without Dory | With Dory SDK |
|-----------|--------------|---------------|
| Pod termination | State lost, start from scratch | State auto-saved to ConfigMap, restored on new pod |
| Node drain/maintenance | Downtime, manual intervention | Zero-downtime migration with state transfer |
| Transient failures | DIY retry logic | Built-in retry with exponential backoff |
| Cascading failures | Service degradation | Circuit breakers protect dependencies |
| Debugging distributed systems | Scattered logs | OpenTelemetry tracing across services |
| Health monitoring | Custom implementation | Built-in `/healthz`, `/ready`, `/metrics` |

## Features

### Core
- **`@stateful` decorator** - Automatic state persistence and restoration
- **`DoryApp`** - Application lifecycle management
- **`BaseProcessor`** - Base class with built-in hooks
- **`ExecutionContext`** - Pod metadata, logging, shutdown detection

### Resilience
- **Circuit Breaker** - Prevent cascading failures with configurable thresholds
- **Retry with Backoff** - Exponential backoff with jitter and retry budgets
- **Error Classification** - Categorizes errors as transient, permanent, or resource-related so callers can choose between retrying, backing off, and failing fast

### Observability
- **OpenTelemetry** - Distributed tracing with automatic span creation
- **Prometheus Metrics** - Built-in `/metrics` endpoint
- **Structured Logging** - JSON logs with request context

### State Management
- **ConfigMap Backend** - Persist state in Kubernetes ConfigMaps
- **S3 Backend** - Store large state in S3 (with offline buffering)
- **Local Backend** - File-based storage for development
- **State Versioning** - Forward/backward compatible state formats

### Edge Support
- **Fencing Manager** - Split-brain prevention for edge deployments
- **Heartbeat Manager** - Connectivity monitoring
- **Role Manager** - Primary/standby failover

## Installation

```bash
pip install dory-sdk
```

With optional dependencies:
```bash
pip install dory-sdk[tracing]        # OpenTelemetry support
pip install dory-sdk[s3]             # S3 state backend
pip install dory-sdk[all]            # All optional dependencies
```

## Quick Start

### Minimal Example

```python
from dory import DoryApp, BaseProcessor, stateful

class MyApp(BaseProcessor):
    counter = stateful(0)  # Automatically saved and restored

    async def run(self):
        async for _ in self.run_loop(interval=1):
            self.counter += 1
            print(f"Count: {self.counter}")

if __name__ == "__main__":
    DoryApp().run(MyApp)
```

### With Resilience Features

```python
from dory import DoryApp, BaseProcessor, stateful
from dory.resilience import CircuitBreaker, retry_with_backoff
from dory.monitoring import create_span

class MyApp(BaseProcessor):
    counter = stateful(0)
    db_breaker = CircuitBreaker(name="database", failure_threshold=5)

    @retry_with_backoff(max_attempts=3)
    async def fetch_data(self):
        with create_span("fetch_data"):
            return await self.db_breaker.call(self.database.query)

    async def run(self):
        async for _ in self.run_loop(interval=1):
            try:
                data = await self.fetch_data()
                self.counter += 1
            except Exception as e:
                self.context.logger().error(f"Failed: {e}")

if __name__ == "__main__":
    DoryApp().run(MyApp)
```

### Function-Based API

```python
from dory.simple import processor, state

counter = state(0)

@processor
async def main(ctx):
    async for _ in ctx.run_loop(interval=1):
        counter.value += 1
        ctx.logger().info(f"Count: {counter.value}")
```

## Configuration

### dory.yaml

```yaml
# Lifecycle
startup_timeout_sec: 30
shutdown_timeout_sec: 30

# Health Server
health_port: 8080

# State Backend: local, configmap, s3
state_backend: configmap

# Logging
log_level: INFO
log_format: json

# Metrics
metrics_enabled: true

# Retry Configuration
retry:
  max_attempts: 3
  initial_delay: 0.1
  multiplier: 2.0
  max_delay: 30.0
  jitter: true

# Circuit Breaker
circuit_breaker:
  failure_threshold: 5
  success_threshold: 2
  timeout: 30.0

# OpenTelemetry
opentelemetry:
  enabled: true
  service_name: my-service
  otlp:
    endpoint: "http://jaeger:4317"
    console_export: false
```

### Environment Variables

| Variable | Default | Description |
|----------|---------|-------------|
| `DORY_HEALTH_PORT` | 8080 | Health server port |
| `DORY_STATE_BACKEND` | configmap | State storage (local/configmap/s3) |
| `DORY_LOG_LEVEL` | INFO | Log level |
| `DORY_LOG_FORMAT` | json | Log format (json/text) |
| `DORY_STARTUP_TIMEOUT_SEC` | 30 | Startup timeout |
| `DORY_SHUTDOWN_TIMEOUT_SEC` | 30 | Shutdown timeout |

## API Reference

### BaseProcessor

```python
class MyApp(BaseProcessor):
    # Stateful fields (auto-saved/restored)
    counter = stateful(0)
    data = stateful(dict)

    async def startup(self):
        """Called once on startup (optional)"""
        pass

    async def run(self):
        """Main processing loop (required)"""
        async for i in self.run_loop(interval=1):
            self.counter += 1

    async def shutdown(self):
        """Called on graceful shutdown (optional)"""
        pass

    # Fault handling hooks (optional)
    async def on_state_restore_failed(self, error: Exception):
        """Called when state restoration fails"""
        pass

    async def on_rapid_restart_detected(self, restart_count: int):
        """Called when rapid restart loop detected"""
        pass

    def reset_caches(self):
        """Called on golden image reset"""
        pass
```

### Circuit Breaker

```python
from dory.resilience import CircuitBreaker, CircuitState

# Create circuit breaker
breaker = CircuitBreaker(
    name="database",
    failure_threshold=5,    # Open after 5 failures
    success_threshold=2,    # Close after 2 successes in half-open
    timeout=30.0            # Seconds before trying half-open
)

# Use with async call
result = await breaker.call(async_function, arg1, arg2)

# Check state
if breaker.state == CircuitState.OPEN:
    print("Circuit is open, requests will fail fast")

# Manual control
breaker.trip()   # Force open
breaker.reset()  # Force closed
```
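
To make the three parameters concrete, here is a standard-library sketch of the closed / open / half-open state machine they describe. It is an illustration under assumptions, not the SDK's `CircuitBreaker`: the names `SketchBreaker`, `State`, `record_success`, and `record_failure` are hypothetical, and counting *consecutive* failures is an assumed policy.

```python
import time
from enum import Enum


class State(Enum):
    CLOSED = "closed"
    OPEN = "open"
    HALF_OPEN = "half_open"


class SketchBreaker:
    """Illustrative state machine only; not the SDK's CircuitBreaker."""

    def __init__(self, failure_threshold=5, success_threshold=2,
                 timeout=30.0, clock=time.monotonic):
        self.failure_threshold = failure_threshold
        self.success_threshold = success_threshold
        self.timeout = timeout
        self._clock = clock  # injectable so tests need not sleep
        self._state = State.CLOSED
        self._failures = 0
        self._successes = 0
        self._opened_at = 0.0

    @property
    def state(self):
        # An open breaker becomes half-open once the timeout has elapsed.
        if (self._state is State.OPEN
                and self._clock() - self._opened_at >= self.timeout):
            self._state = State.HALF_OPEN
            self._successes = 0
        return self._state

    def record_success(self):
        if self.state is State.HALF_OPEN:
            self._successes += 1
            if self._successes >= self.success_threshold:
                self._state = State.CLOSED
                self._failures = 0
        else:
            # Assumed policy: a success resets the consecutive-failure count.
            self._failures = 0

    def record_failure(self):
        state = self.state
        if state is State.OPEN:
            return  # calls are rejected while open, so there is nothing to count
        if state is State.HALF_OPEN:
            self._open()  # a single failed probe re-opens the circuit
            return
        self._failures += 1
        if self._failures >= self.failure_threshold:
            self._open()

    def _open(self):
        self._state = State.OPEN
        self._opened_at = self._clock()
        self._failures = 0
```

Injecting the clock keeps the sketch deterministic: a test can advance a fake time value past `timeout` and observe the move to half-open without waiting.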

### Retry with Backoff

```python
from dory.resilience import retry_with_backoff, RetryPolicy

# Decorator usage
@retry_with_backoff(max_attempts=3, initial_delay=0.1)
async def flaky_operation():
    return await api.call()

# With custom policy
policy = RetryPolicy(
    max_attempts=5,
    initial_delay=0.1,
    multiplier=2.0,
    max_delay=30.0,
    jitter=True
)

@retry_with_backoff(policy=policy)
async def custom_retry():
    pass
```
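
The policy fields combine into a delay schedule. The sketch below computes that schedule with the standard library so the effect of `multiplier` and `max_delay` is visible; `backoff_delays` is a hypothetical helper, and the "full jitter" formula is an assumption, since the SDK's exact jitter calculation is not described here.

```python
import random


def backoff_delays(max_attempts, initial_delay, multiplier, max_delay,
                   jitter=False, rng=random):
    """Return the wait in seconds before each retry (max_attempts - 1 waits)."""
    delays = []
    for retry in range(max_attempts - 1):
        # Grow geometrically, but never beyond the configured cap.
        delay = min(initial_delay * multiplier ** retry, max_delay)
        if jitter:
            # Assumed "full jitter": pick uniformly between 0 and the capped delay.
            delay = rng.uniform(0, delay)
        delays.append(delay)
    return delays
```

With the custom policy above and jitter disabled, `backoff_delays(5, 0.1, 2.0, 30.0)` returns `[0.1, 0.2, 0.4, 0.8]`: four waits for five attempts, all well under the 30-second cap.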

### Error Classification

```python
import asyncio
import logging

from dory.errors import ErrorClassifier, ErrorType

logger = logging.getLogger(__name__)

classifier = ErrorClassifier()

try:
    await operation()
except Exception as e:
    result = classifier.classify(e)

    if result.error_type == ErrorType.TRANSIENT:
        # Retry the operation
        await retry_operation()
    elif result.error_type == ErrorType.RESOURCE:
        # Back off and scale
        await asyncio.sleep(result.recommended_delay)
    elif result.error_type == ErrorType.PERMANENT:
        # Don't retry, log and alert
        logger.error(f"Permanent error: {e}")
```
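
As a rough illustration of the idea, the sketch below maps built-in exception types onto the three categories. The rule table is hypothetical and is not the SDK's classification logic; `Kind`, `RULES`, and `classify` are names invented for this example.

```python
from enum import Enum


class Kind(Enum):
    TRANSIENT = "transient"
    RESOURCE = "resource"
    PERMANENT = "permanent"


# Hypothetical rules: the first entry whose types match the exception wins.
RULES = [
    ((TimeoutError, ConnectionError), Kind.TRANSIENT),  # likely to pass on retry
    ((MemoryError,), Kind.RESOURCE),                    # back off before retrying
    ((ValueError, TypeError, KeyError), Kind.PERMANENT),
]


def classify(exc):
    """Return the category for an exception instance."""
    for types, kind in RULES:
        if isinstance(exc, types):
            return kind
    # Assumed default: unknown errors are treated as permanent and not retried.
    return Kind.PERMANENT
```

Because the lookup uses `isinstance`, subclasses such as `ConnectionResetError` inherit the category of their parent type.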

### OpenTelemetry

```python
from dory.monitoring import create_span, add_span_attributes, trace_function

# Context manager
with create_span("database_query", {"table": "users"}):
    result = await db.query("SELECT * FROM users")

# Decorator
@trace_function("process_item")
async def process_item(item):
    add_span_attributes({"item_id": item.id})
    return await transform(item)
```

### ExecutionContext

```python
async def run(self):
    ctx = self.context

    # Logging
    ctx.logger().info("Processing started")

    # Pod metadata
    print(f"Pod: {ctx.pod_name}")
    print(f"Namespace: {ctx.pod_namespace}")
    print(f"Processor ID: {ctx.processor_id}")

    # Shutdown detection
    while not ctx.is_shutdown_requested():
        if ctx.is_migration_imminent():
            print("Migration coming, saving state...")
        await process()
```

## HTTP Endpoints

| Endpoint | Method | Description |
|----------|--------|-------------|
| `/healthz` | GET | Liveness probe (is process alive?) |
| `/ready` | GET | Readiness probe (ready to serve?) |
| `/metrics` | GET | Prometheus metrics |
| `/state` | GET | Export current state |
| `/state` | POST | Import/restore state |
| `/prestop` | GET | PreStop hook handler |

## Kubernetes Deployment

### Required RBAC (for ConfigMap state backend)

```yaml
apiVersion: v1
kind: ServiceAccount
metadata:
  name: my-app
---
apiVersion: rbac.authorization.k8s.io/v1
kind: Role
metadata:
  name: my-app-state
rules:
- apiGroups: [""]
  resources: ["configmaps"]
  verbs: ["get", "create", "update", "patch", "delete"]
---
apiVersion: rbac.authorization.k8s.io/v1
kind: RoleBinding
metadata:
  name: my-app-state
subjects:
- kind: ServiceAccount
  name: my-app
roleRef:
  kind: Role
  name: my-app-state
  apiGroup: rbac.authorization.k8s.io
```

### Deployment

```yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: my-app
spec:
  replicas: 1
  selector:
    matchLabels:
      app: my-app
  template:
    metadata:
      labels:
        app: my-app
    spec:
      serviceAccountName: my-app
      terminationGracePeriodSeconds: 45
      containers:
      - name: my-app
        image: my-app:latest
        env:
        - name: DORY_POD_NAME
          valueFrom:
            fieldRef:
              fieldPath: metadata.name
        - name: DORY_POD_NAMESPACE
          valueFrom:
            fieldRef:
              fieldPath: metadata.namespace
        ports:
        - containerPort: 8080
          name: health
        livenessProbe:
          httpGet:
            path: /healthz
            port: 8080
          initialDelaySeconds: 5
        readinessProbe:
          httpGet:
            path: /ready
            port: 8080
          initialDelaySeconds: 5
        lifecycle:
          preStop:
            httpGet:
              path: /prestop
              port: 8080
```

## CLI Tool

```bash
# Initialize new project
dory init my-app --image my-app:latest

# Generate Kubernetes manifests
dory generate rbac --name my-app
dory generate deployment --name my-app --image my-app:latest

# Validate configuration
dory validate
```

## State Migration Flow

### Pod Shutdown
```
1. Kubernetes calls /prestop (preStop hook), then sends SIGTERM
2. SDK marks processor as not-ready
3. SDK saves state to ConfigMap
4. Your shutdown() is called
5. Pod terminates
```

### Pod Startup
```
1. New pod starts
2. SDK checks for existing state in ConfigMap
3. Your startup() is called
4. SDK restores state (calls restore_state or sets @stateful fields)
5. SDK marks processor as ready
6. Your run() starts
```
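
The save-on-shutdown and restore-on-startup steps can be sketched with a JSON file standing in for the ConfigMap. This is an illustration under assumptions rather than the SDK's backend code: `save_state` and `restore_state` are hypothetical helpers, and the atomic rename is a design choice of this sketch.

```python
import json
import os
import tempfile


def save_state(path, state):
    """Write state atomically so an interrupted write cannot truncate the file."""
    directory = os.path.dirname(os.path.abspath(path))
    fd, tmp_path = tempfile.mkstemp(dir=directory, suffix=".tmp")
    try:
        with os.fdopen(fd, "w") as handle:
            json.dump(state, handle)
        # Swap the finished file into place in a single step.
        os.replace(tmp_path, path)
    finally:
        # Only present if the write failed before the rename.
        if os.path.exists(tmp_path):
            os.unlink(tmp_path)


def restore_state(path, default):
    """Return the saved state, or a copy of the default on first start."""
    try:
        with open(path) as handle:
            return json.load(handle)
    except FileNotFoundError:
        return dict(default)
```

On shutdown a processor would call `save_state(path, {"counter": n})`; the next pod calls `restore_state(path, {"counter": 0})` before its main loop and continues from the saved count.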

## Examples

| Example | Description |
|---------|-------------|
| [dory-info-logger-py](examples/dory-info-logger-py/) | Complete demo with all SDK features |
| [dory-cloud-processor-py](examples/dory-cloud-processor-py/) | Cloud deployment with circuit breakers, retry, OpenTelemetry |
| [dory-edge-processor-py](examples/dory-edge-processor-py/) | Edge deployment with fencing, heartbeat, failover |

## License

Apache 2.0
