Metadata-Version: 2.4
Name: deadpipe
Version: 0.1.2
Summary: Dead simple pipeline monitoring. Know when your pipelines die.
Project-URL: Homepage, https://deadpipe.com
Project-URL: Documentation, https://deadpipe.com/docs
Project-URL: Repository, https://github.com/deadpipe/deadpipe-python
Author-email: Deadpipe <hello@deadpipe.com>
Maintainer-email: Deadpipe <hello@deadpipe.com>
License-File: LICENSE
Keywords: dead-mans-switch,etl,heartbeat,monitoring,observability,pipelines
Classifier: Development Status :: 4 - Beta
Classifier: Intended Audience :: Developers
Classifier: Operating System :: OS Independent
Classifier: Programming Language :: Python :: 3
Classifier: Programming Language :: Python :: 3.8
Classifier: Programming Language :: Python :: 3.9
Classifier: Programming Language :: Python :: 3.10
Classifier: Programming Language :: Python :: 3.11
Classifier: Programming Language :: Python :: 3.12
Classifier: Topic :: System :: Monitoring
Requires-Python: >=3.8
Provides-Extra: async
Requires-Dist: aiohttp>=3.8.0; extra == 'async'
Description-Content-Type: text/markdown

# Deadpipe Python SDK

Dead simple pipeline monitoring. Know when your pipelines die.

## Installation

```bash
pip install deadpipe
```

## Quick Start

### Option 1: Decorator (Recommended)

```python
from deadpipe import Deadpipe

dp = Deadpipe("your-api-key")

@dp.heartbeat("daily-sales-etl")
def run_pipeline():
    # Your pipeline code here
    process_data()
    return {"records_processed": 1500}  # Optional: track records

# That's it! Deadpipe will ping on success or failure.
run_pipeline()
```

### Option 2: Context Manager

```python
from deadpipe import Deadpipe

dp = Deadpipe("your-api-key")

with dp.pipeline("hourly-sync"):
    # Your code here
    sync_data()
```

### Option 3: Manual Ping

```python
from deadpipe import Deadpipe

dp = Deadpipe("your-api-key")

try:
    run_my_job()
    dp.ping("my-job", status="success", records_processed=1000)
except Exception:
    dp.ping("my-job", status="failed")
    raise
```

### Option 4: Environment Variable

Set `DEADPIPE_API_KEY` and use module-level functions:

```python
import deadpipe

@deadpipe.heartbeat("my-pipeline")
def my_job():
    pass
```

## Async Support (FastAPI, aiohttp, asyncio)

For async applications, install with async extras:

```bash
pip install "deadpipe[async]"
```

### Async Decorator

```python
from deadpipe import AsyncDeadpipe

dp = AsyncDeadpipe("your-api-key")

@dp.heartbeat("async-pipeline")
async def my_async_job():
    await fetch_data()
    return {"records_processed": 500}
```

### Async Context Manager

```python
# Inside an async function, with dp = AsyncDeadpipe(...)
async with dp.pipeline("async-etl"):
    await process_async_data()
```

### Async Manual Ping

```python
await dp.ping("my-job", status="success")
```

### FastAPI Example

```python
from fastapi import FastAPI
from deadpipe import AsyncDeadpipe

app = FastAPI()
dp = AsyncDeadpipe()  # Uses DEADPIPE_API_KEY env var

@app.on_event("startup")
async def startup():
    # Optionally track app startup
    await dp.ping("fastapi-app", status="success")

@app.on_event("shutdown")
async def shutdown():
    await dp.close()  # Clean up HTTP session

@app.post("/process")
@dp.heartbeat("data-processor")
async def process_data():
    result = await heavy_processing()
    return {"records_processed": len(result)}
```

### Session Management

`AsyncDeadpipe` pools connections through an `aiohttp.ClientSession`. In long-running apps, either use the client as a context manager or call `close()` when you are done:

```python
# Option 1: Context manager (auto-closes)
async with AsyncDeadpipe("your-key") as dp:
    await dp.ping("my-pipeline")

# Option 2: Manual close
dp = AsyncDeadpipe("your-key")
try:
    await dp.ping("my-pipeline")
finally:
    await dp.close()
```

## Airflow Integration

```python
from airflow.models import Variable
from deadpipe import Deadpipe

dp = Deadpipe(api_key=Variable.get("DEADPIPE_API_KEY"))

@dp.heartbeat("{{ dag.dag_id }}")
def my_task():
    ...
```

Or add to the end of any task:

```python
from deadpipe import ping

def my_task():
    # ... your code ...
    ping("daily-etl", status="success")
```

## dbt Integration

Add to your `dbt_project.yml` on-run-end hook:

```yaml
on-run-end:
  - "{{ deadpipe_heartbeat('dbt-run') }}"
```

Or call from Python:

```python
# In your dbt runner script
import subprocess

from deadpipe import Deadpipe

dp = Deadpipe("your-api-key")

with dp.pipeline("dbt-daily"):
    subprocess.run(["dbt", "run"], check=True)
```

## API Reference

### `Deadpipe(api_key, base_url, timeout)`

Create a client instance.

- `api_key`: Your API key (or set `DEADPIPE_API_KEY` env var)
- `base_url`: Override for self-hosted (default: `https://www.deadpipe.com/api/v1`)
- `timeout`: Request timeout in seconds (default: 10)

### `dp.ping(pipeline_id, status, duration_ms, records_processed, app_name)`

Send a heartbeat.

- `pipeline_id`: Unique identifier for this pipeline
- `status`: `"success"` or `"failed"`
- `duration_ms`: How long the run took (optional)
- `records_processed`: Number of records (optional)
- `app_name`: Group pipelines under an app (optional)
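
The optional fields are easiest to see in a small wrapper. The sketch below is not part of the SDK: `run_and_report` is a hypothetical helper that times a job with `time.perf_counter()` and forwards the result to any callable that accepts the keyword arguments listed above. A stub stands in for `dp.ping` so the example runs without network access, and it assumes the job returns its record count.

```python
import time

def run_and_report(ping, pipeline_id, job):
    """Hypothetical helper: time `job`, then report through `ping`.

    `ping` is any callable accepting the keyword arguments documented
    above (for example, a client's `dp.ping`).
    """
    start = time.perf_counter()
    try:
        records = job()  # assumption: the job returns its record count
    except Exception:
        elapsed_ms = int((time.perf_counter() - start) * 1000)
        ping(pipeline_id, status="failed", duration_ms=elapsed_ms)
        raise
    elapsed_ms = int((time.perf_counter() - start) * 1000)
    ping(pipeline_id, status="success", duration_ms=elapsed_ms,
         records_processed=records)
    return records

# Stub that records calls instead of sending HTTP requests.
sent = []

def fake_ping(pipeline_id, **fields):
    sent.append((pipeline_id, fields))

run_and_report(fake_ping, "daily-sales-etl", lambda: 1500)
```

In real use you would pass `dp.ping` in place of `fake_ping`.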

### `@dp.heartbeat(pipeline_id, app_name, on_error)`

Decorator that auto-sends heartbeats.

- `on_error`: What to do on exception:
  - `"ping"` (default): Send failed heartbeat, then re-raise
  - `"raise"`: Re-raise without heartbeat
  - `"ignore"`: Send success heartbeat anyway
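
To make the three `on_error` modes concrete, here is a minimal stand-in decorator. It is a sketch, not the SDK's implementation: `heartbeat_sketch` and the injected `ping` callable are hypothetical names. Two behaviours are assumptions rather than documented facts: the exception is re-raised in every mode (the list above does not say whether `"ignore"` swallows it), and `records_processed` is picked up from a returned dict, as the Quick Start return value suggests.

```python
import functools

def heartbeat_sketch(ping, pipeline_id, on_error="ping"):
    """Illustrative stand-in for the documented on_error modes."""
    if on_error not in ("ping", "raise", "ignore"):
        raise ValueError(f"unknown on_error: {on_error!r}")

    def decorator(fn):
        @functools.wraps(fn)
        def wrapper(*args, **kwargs):
            try:
                result = fn(*args, **kwargs)
            except Exception:
                if on_error == "ping":
                    ping(pipeline_id, status="failed")
                elif on_error == "ignore":
                    ping(pipeline_id, status="success")
                # "raise": no heartbeat is sent.
                raise  # assumption: every mode propagates the exception
            extra = {}
            if isinstance(result, dict) and "records_processed" in result:
                extra["records_processed"] = result["records_processed"]
            ping(pipeline_id, status="success", **extra)
            return result
        return wrapper
    return decorator

# Record heartbeats in a list instead of sending them.
calls = []

def record(pipeline_id, **fields):
    calls.append((pipeline_id, fields["status"]))

def make_failing(mode):
    @heartbeat_sketch(record, f"job-{mode}", on_error=mode)
    def job():
        raise RuntimeError("boom")
    return job

for mode in ("ping", "raise", "ignore"):
    try:
        make_failing(mode)()
    except RuntimeError:
        pass
```

After the loop, `calls` holds a `"failed"` entry for `job-ping`, nothing for `job-raise`, and a `"success"` entry for `job-ignore`.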

### `with dp.pipeline(pipeline_id, app_name)`

Context manager for heartbeats.

### `AsyncDeadpipe(api_key, base_url, timeout)`

Async client with the same API as `Deadpipe`, but all methods are async:

- `await dp.ping(...)` - Async heartbeat
- `@dp.heartbeat(...)` - Decorator for async functions
- `async with dp.pipeline(...)` - Async context manager
- `await dp.run(pipeline_id, fn, *args)` - Run async function with heartbeat
- `await dp.close()` - Close HTTP session
- `async with AsyncDeadpipe(...) as dp:` - Auto-close on exit
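
The `run(pipeline_id, fn, *args)` signature suggests a wrap-and-await pattern. The following sketch shows one plausible reading of it using only `asyncio`; `run_sketch` and `fake_ping` are hypothetical names, and sending a `"failed"` heartbeat before re-raising is an assumption carried over from the decorator's default, not confirmed behaviour of `AsyncDeadpipe.run`.

```python
import asyncio

async def run_sketch(ping, pipeline_id, fn, *args):
    """Await fn(*args) and report the outcome through an async `ping`."""
    try:
        result = await fn(*args)
    except Exception:
        await ping(pipeline_id, status="failed")  # assumed failure behaviour
        raise
    await ping(pipeline_id, status="success")
    return result

async def _demo():
    seen = []

    async def fake_ping(pipeline_id, **fields):
        # Stub: record the heartbeat instead of making an HTTP request.
        seen.append((pipeline_id, fields["status"]))

    async def double(x):
        await asyncio.sleep(0)  # stand-in for real async work
        return x * 2

    value = await run_sketch(fake_ping, "async-etl", double, 21)
    return value, seen

demo_value, demo_seen = asyncio.run(_demo())
```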

## Dependencies

- **Sync**: Zero dependencies (uses Python standard library only)
- **Async**: Requires `aiohttp` (install with `pip install "deadpipe[async]"`)

## License

Deadpipe SDK License - see [LICENSE](LICENSE) file.

