Metadata-Version: 2.4
Name: deadpipe
Version: 0.1.0
Summary: Dead simple pipeline monitoring. Know when your pipelines die.
Project-URL: Homepage, https://deadpipe.com
Project-URL: Documentation, https://deadpipe.com/docs
Project-URL: Repository, https://github.com/deadpipe/deadpipe-python
Author-email: Deadpipe <hello@deadpipe.com>
License-Expression: MIT
Keywords: dead-mans-switch,etl,heartbeat,monitoring,observability,pipelines
Classifier: Development Status :: 4 - Beta
Classifier: Intended Audience :: Developers
Classifier: License :: OSI Approved :: MIT License
Classifier: Operating System :: OS Independent
Classifier: Programming Language :: Python :: 3
Classifier: Programming Language :: Python :: 3.8
Classifier: Programming Language :: Python :: 3.9
Classifier: Programming Language :: Python :: 3.10
Classifier: Programming Language :: Python :: 3.11
Classifier: Programming Language :: Python :: 3.12
Classifier: Topic :: System :: Monitoring
Requires-Python: >=3.8
Description-Content-Type: text/markdown

# Deadpipe Python SDK

Dead simple pipeline monitoring. Know when your pipelines die.

## Installation

```bash
pip install deadpipe
```

## Quick Start

### Option 1: Decorator (Recommended)

```python
from deadpipe import Deadpipe

dp = Deadpipe("your-api-key")

@dp.heartbeat("daily-sales-etl")
def run_pipeline():
    # Your pipeline code here
    process_data()
    return {"records_processed": 1500}  # Optional: track records

# That's it! Deadpipe will ping on success or failure.
run_pipeline()
```

### Option 2: Context Manager

```python
from deadpipe import Deadpipe

dp = Deadpipe("your-api-key")

with dp.pipeline("hourly-sync"):
    # Your code here
    sync_data()
```

### Option 3: Manual Ping

```python
from deadpipe import Deadpipe

dp = Deadpipe("your-api-key")

try:
    run_my_job()
    dp.ping("my-job", status="success", records_processed=1000)
except Exception:
    dp.ping("my-job", status="failed")
    raise
```

### Option 4: Environment Variable

Set `DEADPIPE_API_KEY` and use module-level functions:

```python
import deadpipe

@deadpipe.heartbeat("my-pipeline")
def my_job():
    pass
```

## Airflow Integration

```python
from airflow.models import Variable
from deadpipe import Deadpipe

dp = Deadpipe(api_key=Variable.get("DEADPIPE_API_KEY"))

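# Decorator arguments are plain Python strings evaluated at import time;
# Airflow does not render Jinja templates here, so pass the ID literally.
@dp.heartbeat("daily-etl")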
def my_task():
    ...
```

Or add to the end of any task:

```python
from deadpipe import ping

def my_task():
    # ... your code ...
    ping("daily-etl", status="success")
```

## dbt Integration

Add an `on-run-end` hook to your `dbt_project.yml`. This assumes a `deadpipe_heartbeat` macro is available in your dbt project:

```yaml
on-run-end:
  - "{{ deadpipe_heartbeat('dbt-run') }}"
```

Or call from Python:

```python
# In your dbt runner script
import subprocess

from deadpipe import Deadpipe

dp = Deadpipe("your-api-key")

with dp.pipeline("dbt-daily"):
    subprocess.run(["dbt", "run"], check=True)
```

## API Reference

### `Deadpipe(api_key, base_url, timeout)`

Create a client instance.

- `api_key`: Your API key (or set the `DEADPIPE_API_KEY` environment variable)
- `base_url`: Override for self-hosted deployments (default: `https://www.deadpipe.com/api/v1`)
- `timeout`: Request timeout in seconds (default: 10)
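
The `api_key` fallback described above can be mirrored in your own startup code so that a missing key fails fast before any pipeline runs. This is a minimal sketch that assumes only that the key lives in `DEADPIPE_API_KEY`; `resolve_api_key` is a hypothetical helper, not part of the SDK:

```python
import os
from typing import Optional


def resolve_api_key(explicit: Optional[str] = None) -> str:
    """Return the explicit key if given, else DEADPIPE_API_KEY from the environment.

    Hypothetical helper for illustration; not part of the deadpipe package.
    """
    key = explicit or os.environ.get("DEADPIPE_API_KEY")
    if not key:
        # Fail early with a clear message instead of at the first heartbeat.
        raise RuntimeError("No API key: pass one explicitly or set DEADPIPE_API_KEY")
    return key
```

The returned value can then be passed along as `Deadpipe(api_key=resolve_api_key())`.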

### `dp.ping(pipeline_id, status, duration_ms, records_processed, app_name)`

Send a heartbeat.

- `pipeline_id`: Unique identifier for this pipeline
- `status`: `"success"` or `"failed"`
- `duration_ms`: Run duration in milliseconds (optional)
- `records_processed`: Number of records (optional)
- `app_name`: Group pipelines under an app (optional)
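
When pinging manually, `duration_ms` has to be measured by the caller. One way to do that with the standard library is sketched below; `timed` is a hypothetical helper, not part of the SDK, and the final `dp.ping(...)` call is shown only as a comment because it needs a configured client:

```python
import time
from typing import Any, Callable, Tuple


def timed(fn: Callable[[], Any]) -> Tuple[Any, int]:
    """Run fn and return (result, elapsed time in whole milliseconds).

    Hypothetical helper for illustration; not part of the deadpipe package.
    """
    start = time.monotonic()  # monotonic clock ignores wall-clock adjustments
    result = fn()
    duration_ms = int((time.monotonic() - start) * 1000)
    return result, duration_ms


# Stand-in workload: pretend each list element is one processed record.
rows, duration_ms = timed(lambda: list(range(1000)))

# With a client `dp`, these values map onto the documented parameters:
# dp.ping("my-job", status="success",
#         duration_ms=duration_ms, records_processed=len(rows))
```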

### `@dp.heartbeat(pipeline_id, app_name, on_error)`

Decorator that auto-sends heartbeats.

- `on_error`: What to do on exception:
  - `"ping"` (default): Send failed heartbeat, then re-raise
  - `"raise"`: Re-raise without heartbeat
  - `"ignore"`: Send success heartbeat anyway
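
The three `on_error` modes can be illustrated with a stand-in decorator that records heartbeats in a list instead of sending them. This is a sketch of the behavior documented above, not the SDK's implementation; `heartbeat_sketch` and `sent` are hypothetical names. The text does not say whether `"ignore"` also swallows the exception, so this sketch assumes the exception still propagates:

```python
import functools

sent = []  # stand-in for network calls: records (pipeline_id, status) tuples


def heartbeat_sketch(pipeline_id, on_error="ping"):
    """Illustrative stand-in for the documented on_error modes; not the SDK's code."""
    def decorator(fn):
        @functools.wraps(fn)
        def wrapper(*args, **kwargs):
            try:
                result = fn(*args, **kwargs)
            except Exception:
                if on_error == "ping":
                    sent.append((pipeline_id, "failed"))
                elif on_error == "ignore":
                    sent.append((pipeline_id, "success"))
                # on_error == "raise": record nothing
                raise  # assumption: every mode lets the exception propagate
            sent.append((pipeline_id, "success"))
            return result
        return wrapper
    return decorator


@heartbeat_sketch("nightly-load", on_error="ping")
def failing_job():
    raise ValueError("boom")


try:
    failing_job()
except ValueError:
    pass

print(sent)  # [('nightly-load', 'failed')]
```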

### `with dp.pipeline(pipeline_id, app_name)`

Context manager for heartbeats.

## Zero Dependencies

This SDK has no external dependencies. It uses only the Python standard library.

## License

MIT

