Metadata-Version: 2.4
Name: pingpulse-airflow
Version: 0.1.0
Summary: Apache Airflow provider for PingPulse workflow monitoring
Author-email: PingPulse Team <support@pingpulse.com>
Maintainer-email: PingPulse Team <support@pingpulse.com>
License: MIT
Project-URL: Homepage, https://pingpulse.com
Project-URL: Documentation, https://pingpulse.com/api-docs
Project-URL: Repository, https://github.com/pingpulse/pingpulse-airflow
Project-URL: Issues, https://github.com/pingpulse/pingpulse-airflow/issues
Keywords: airflow,pingpulse,monitoring,observability,workflow,webhook,alerting
Classifier: Development Status :: 4 - Beta
Classifier: Environment :: Console
Classifier: Framework :: Apache Airflow
Classifier: Framework :: Apache Airflow :: Provider
Classifier: Intended Audience :: Developers
Classifier: Intended Audience :: System Administrators
Classifier: License :: OSI Approved :: MIT License
Classifier: Operating System :: OS Independent
Classifier: Programming Language :: Python :: 3
Classifier: Programming Language :: Python :: 3.8
Classifier: Programming Language :: Python :: 3.9
Classifier: Programming Language :: Python :: 3.10
Classifier: Programming Language :: Python :: 3.11
Classifier: Programming Language :: Python :: 3.12
Classifier: Topic :: System :: Monitoring
Requires-Python: >=3.8
Description-Content-Type: text/markdown
Requires-Dist: apache-airflow>=2.0.0
Requires-Dist: requests>=2.25.0
Provides-Extra: dev
Requires-Dist: pytest>=7.0.0; extra == "dev"
Requires-Dist: pytest-mock>=3.10.0; extra == "dev"
Requires-Dist: responses>=0.22.0; extra == "dev"

# PingPulse Airflow Provider

Official Apache Airflow provider for [PingPulse](https://pingpulse.com) workflow monitoring.

Monitor your Airflow DAGs in real time with PingPulse: get instant alerts when tasks fail, track execution metrics, and visualize your data pipelines.

## Installation

```bash
pip install pingpulse-airflow
```

## Quick Start

### 1. Configure Connection (Recommended)

In the Airflow UI, go to **Admin > Connections** and create a new connection:

| Field | Value |
|-------|-------|
| Conn Id | `pingpulse_default` |
| Conn Type | `HTTP` |
| Host | `app.pingpulse.com` |
| Password | Your API key (`ppk_xxx_xxx`) |
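If you manage connections outside the UI, Airflow can also read a connection from an environment variable named `AIRFLOW_CONN_<CONN_ID>` in URI form, where the URI scheme sets the connection type. The sketch below builds that value for `pingpulse_default`. It assumes, per the table above, that the provider only needs the host and the password field (login left empty), and it percent-encodes the key in case it ever contains URI-reserved characters. `build_conn_uri` is a hypothetical helper, not part of the provider.

```python
from urllib.parse import quote


def build_conn_uri(api_key: str, host: str = "app.pingpulse.com") -> str:
    """Build an Airflow connection URI for an HTTP-type connection.

    The login is left empty and the API key goes in the password slot,
    mirroring the Connection table (Host + Password). The key is
    percent-encoded so characters like '@' or '/' cannot break the URI.
    """
    return f"http://:{quote(api_key, safe='')}@{host}"


if __name__ == "__main__":
    # Export the printed value as AIRFLOW_CONN_PINGPULSE_DEFAULT in the
    # environment of the scheduler and workers.
    print(build_conn_uri("ppk_xxx_xxx"))  # http://:ppk_xxx_xxx@app.pingpulse.com
```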

### 2. Choose Your Integration Pattern

PingPulse offers three integration patterns. Pick the one that fits your workflow:

---

## Option 1: Operator (Explicit Tasks)

Best for: **New DAGs where you want visible ping tasks**

```python
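from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator
from pingpulse_airflow import PingPulseOperator


# Placeholder callables; replace with your own extract/transform/load logic.
def extract_data():
    ...


def transform_data():
    ...


def load_data():
    ...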

with DAG('etl_pipeline', start_date=datetime(2024, 1, 1), schedule='@daily') as dag:

    extract = PythonOperator(
        task_id='extract',
        python_callable=extract_data,
    )

    ping_extract = PingPulseOperator(
        task_id='ping_extract',
        workflow_id='dwf123abc',
        stage_path='1',
        start=True,  # Starts a new workflow instance
    )

    transform = PythonOperator(
        task_id='transform',
        python_callable=transform_data,
    )

    ping_transform = PingPulseOperator(
        task_id='ping_transform',
        workflow_id='dwf123abc',
        stage_path='2',
    )

    load = PythonOperator(
        task_id='load',
        python_callable=load_data,
    )

    ping_complete = PingPulseOperator(
        task_id='ping_complete',
        workflow_id='dwf123abc',
        stage_path='3',
        final=True,  # Marks the workflow instance as complete
    )

    extract >> ping_extract >> transform >> ping_transform >> load >> ping_complete
```

**DAG visualization:**
```
[Extract] → [Ping 1] → [Transform] → [Ping 2] → [Load] → [Ping 3]
```
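The `start=True` and `final=True` flags only make sense at the ends of the chain: per the comments in the example, the first ping starts a workflow instance and the last one marks it complete. A hypothetical pre-flight check like the one below (plain Python, no provider imports) can catch a misplaced flag before you deploy the DAG. The rule it enforces is an assumption drawn from those comments, not a documented PingPulse constraint, and `StagePing` is an illustrative stand-in for the operator arguments.

```python
from dataclasses import dataclass
from typing import List


@dataclass
class StagePing:
    """Hypothetical mirror of the PingPulseOperator arguments checked here."""
    stage_path: str
    start: bool = False
    final: bool = False


def check_stage_sequence(pings: List[StagePing]) -> List[str]:
    """Return a list of problems; an empty list means the sequence looks sane."""
    if not pings:
        return ["no pings defined"]
    problems = []
    if not pings[0].start:
        problems.append(f"first stage {pings[0].stage_path!r} should set start=True")
    if not pings[-1].final:
        problems.append(f"last stage {pings[-1].stage_path!r} should set final=True")
    # start=True anywhere but the first ping would open a second instance mid-run.
    for ping in pings[1:]:
        if ping.start:
            problems.append(f"stage {ping.stage_path!r} sets start=True mid-run")
    # final=True before the last ping would close the instance too early.
    for ping in pings[:-1]:
        if ping.final:
            problems.append(f"stage {ping.stage_path!r} sets final=True before the end")
    return problems


if __name__ == "__main__":
    pipeline = [StagePing("1", start=True), StagePing("2"), StagePing("3", final=True)]
    print(check_stage_sequence(pipeline))  # []
```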

---

## Option 2: Decorator (Auto-Ping)

Best for: **Clean DAGs with Python tasks, invisible instrumentation**

```python
from datetime import datetime

from airflow import DAG
from pingpulse_airflow import pingpulse_task

# fetch_from_source, process and save_to_destination stand in for your own code.

with DAG('etl_pipeline', start_date=datetime(2024, 1, 1), schedule='@daily') as dag:

    @pingpulse_task(workflow_id='dwf123abc', stage_path='1', start=True)
    def extract():
        data = fetch_from_source()
        return {'records': len(data)}

    @pingpulse_task(workflow_id='dwf123abc', stage_path='2')
    def transform(data):
        transformed = process(data)
        return {'processed': len(transformed)}

    @pingpulse_task(workflow_id='dwf123abc', stage_path='3', final=True)
    def load(data):
        save_to_destination(data)
        return {'status': 'complete'}

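    # With TaskFlow-style tasks, passing return values sets the order:
    # extract -> transform -> load
    extracted = extract()
    transformed = transform(extracted)
    load(transformed)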
```

**DAG visualization:**
```
[Extract] → [Transform] → [Load]
```
*(Pings happen automatically inside each task)*
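Conceptually, a decorator like `pingpulse_task` wraps your function, times it, and reports the outcome whether the function returns or raises. The sketch below shows that pattern in plain Python. `auto_ping` and its `send` argument are hypothetical stand-ins: the real decorator also turns the function into an Airflow task and makes the HTTP call, which this sketch does not do. The reported fields follow the shape documented under *What Gets Sent to PingPulse*.

```python
import functools
import time
from typing import Any, Callable, Dict


def auto_ping(send: Callable[[Dict[str, Any]], None]):
    """Hypothetical decorator that reports each call's outcome through `send`."""

    def decorator(func: Callable[..., Any]) -> Callable[..., Any]:
        @functools.wraps(func)
        def wrapper(*args: Any, **kwargs: Any) -> Any:
            started = time.monotonic()
            try:
                result = func(*args, **kwargs)
            except Exception as exc:
                # Report the failure, then re-raise so Airflow still fails (and retries) the task.
                send({
                    "status": "failed",
                    "duration": round(time.monotonic() - started, 3),
                    "error": str(exc),
                })
                raise
            send({"status": "success", "duration": round(time.monotonic() - started, 3)})
            return result

        return wrapper

    return decorator


if __name__ == "__main__":
    sent = []

    @auto_ping(sent.append)
    def extract():
        return {"records": 3}

    print(extract())          # {'records': 3}
    print(sent[0]["status"])  # success
```

Re-raising is the important design choice: if the wrapper swallowed the exception, Airflow would record the task as successful even though PingPulse was told it failed.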

---

## Option 3: Callbacks (Retrofit Existing DAGs)

Best for: **Adding monitoring to existing DAGs without changing task logic**

```python
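from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator
from pingpulse_airflow import pingpulse_failure, pingpulse_success


# Placeholder callables; replace with your existing task logic.
def extract_data():
    ...


def transform_data():
    ...


def load_data():
    ...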

with DAG('existing_pipeline', start_date=datetime(2024, 1, 1), schedule='@daily') as dag:

    extract = PythonOperator(
        task_id='extract',
        python_callable=extract_data,
        on_success_callback=pingpulse_success('dwf123abc', '1', start=True),
        on_failure_callback=pingpulse_failure('dwf123abc', '1'),
    )

    transform = PythonOperator(
        task_id='transform',
        python_callable=transform_data,
        on_success_callback=pingpulse_success('dwf123abc', '2'),
        on_failure_callback=pingpulse_failure('dwf123abc', '2'),
    )

    load = PythonOperator(
        task_id='load',
        python_callable=load_data,
        on_success_callback=pingpulse_success('dwf123abc', '3', final=True),
        on_failure_callback=pingpulse_failure('dwf123abc', '3'),
    )

    extract >> transform >> load
```
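Airflow calls `on_success_callback` and `on_failure_callback` with the task's context dictionary, so helpers such as `pingpulse_success(...)` are factories: they capture the workflow and stage, then return a function that takes the context. The sketch below illustrates that pattern with a hypothetical `make_callback`. It builds a subset of the documented `airflow` block from `context["task_instance"]`, reads `context["exception"]` for failures, and records payloads in a list instead of contacting PingPulse. It is not the provider's implementation.

```python
from types import SimpleNamespace
from typing import Any, Callable, Dict, List, Tuple

# (workflow_id, stage_path, payload) tuples collected instead of sending HTTP requests.
Record = Tuple[str, str, Dict[str, Any]]


def make_callback(workflow_id: str, stage_path: str, status: str,
                  sink: List[Record]) -> Callable[[Dict[str, Any]], None]:
    """Hypothetical factory returning an Airflow-style callback bound to one stage."""

    def callback(context: Dict[str, Any]) -> None:
        ti = context["task_instance"]
        payload: Dict[str, Any] = {
            "status": status,
            "duration": ti.duration,
            "airflow": {
                "dag_id": ti.dag_id,
                "task_id": ti.task_id,
                "run_id": ti.run_id,
                "try_number": ti.try_number,
            },
        }
        # Failure callbacks receive the raised exception in the context.
        if status == "failed" and context.get("exception") is not None:
            payload["error"] = str(context["exception"])
        # A real implementation would send this to PingPulse; here it is just recorded.
        sink.append((workflow_id, stage_path, payload))

    return callback


if __name__ == "__main__":
    sent: List[Record] = []
    on_failure = make_callback("dwf123abc", "2", "failed", sent)
    # Stand-in for the TaskInstance that Airflow would put in the context.
    fake_ti = SimpleNamespace(dag_id="etl_pipeline", task_id="transform",
                              run_id="manual__2024-01-15", try_number=1, duration=1.5)
    on_failure({"task_instance": fake_ti, "exception": RuntimeError("Connection refused")})
    print(sent[0][2]["error"])  # Connection refused
```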

---

## Comparison

| Pattern | Modifies Task Code | Visible in DAG | Best For |
|---------|-------------------|----------------|----------|
| **Operator** | No | Yes (extra nodes) | New DAGs, explicit tracking |
| **Decorator** | Yes (wrap function) | No | Clean DAGs, TaskFlow API |
| **Callback** | No | No | Existing DAGs, any operator |

---

## Advanced Usage

### Custom Payload

Send custom metrics with your pings:

```python
PingPulseOperator(
    task_id='ping_with_metrics',
    workflow_id='dwf123abc',
    stage_path='2',
    payload={
        'records_processed': 10000,
        'duration_seconds': 45.2,
        'source': 'postgres',
    },
)
```
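The ping body is JSON, so every value in `payload` needs to be JSON-serializable. Types such as `datetime` or `Decimal` make `json.dumps` raise `TypeError`. Assuming the operator does not convert these itself, a small hypothetical helper like `to_json_safe` can normalize a metrics dict before you pass it as `payload`.

```python
import json
from datetime import date, datetime
from decimal import Decimal
from typing import Any, Dict


def _convert(value: Any) -> Any:
    """json.dumps fallback for common non-JSON types."""
    if isinstance(value, (datetime, date)):
        return value.isoformat()
    if isinstance(value, Decimal):
        # Note: converting to float can lose precision for very long decimals.
        return float(value)
    raise TypeError(f"{type(value).__name__} is not JSON-serializable")


def to_json_safe(payload: Dict[str, Any]) -> Dict[str, Any]:
    """Round-trip through JSON so the result holds only JSON-native types."""
    return json.loads(json.dumps(payload, default=_convert))


if __name__ == "__main__":
    metrics = {
        "records_processed": 10000,
        "duration_seconds": Decimal("45.2"),
        "finished_at": datetime(2024, 1, 15, 0, 5),
    }
    print(to_json_safe(metrics))
    # {'records_processed': 10000, 'duration_seconds': 45.2, 'finished_at': '2024-01-15T00:05:00'}
```

Round-tripping through `json` also fails fast: any type the helper does not know about raises `TypeError` when the DAG file is parsed, not later inside the ping.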

### Direct API Key (No Connection)

```python
PingPulseOperator(
    task_id='ping',
    workflow_id='dwf123abc',
    stage_path='1',
    api_key='ppk_xxx_xxx',  # Direct key
    base_url='https://your-instance.pingpulse.com',  # Self-hosted
)
```
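Hard-coding a key in the DAG file commits it to version control along with the DAG. A minimal alternative, sketched below, reads the key from an environment variable when the DAG file is parsed. The variable name `PINGPULSE_API_KEY` is an assumption: nothing in this README says the provider reads it. The prefix check relies only on the `ppk_` key format shown in the examples.

```python
import os
from typing import Mapping, Optional


def resolve_api_key(explicit: Optional[str] = None,
                    env: Mapping[str, str] = os.environ,
                    var_name: str = "PINGPULSE_API_KEY") -> str:
    """Prefer an explicit key, else fall back to a (hypothetical) environment variable."""
    key = explicit or env.get(var_name)
    if not key:
        raise ValueError(f"No API key given and {var_name} is not set")
    if not key.startswith("ppk_"):
        # Keys in this README look like ppk_xxx_xxx; anything else is probably a mistake.
        raise ValueError("API key does not look like a PingPulse key (expected 'ppk_' prefix)")
    return key
```

In the DAG, you would then pass `api_key=resolve_api_key()` to `PingPulseOperator` instead of a literal key.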

### Static vs Dynamic Workflows

```python
# Dynamic workflow (default) - stages created on-the-fly
PingPulseOperator(
    workflow_id='dwf123abc',
    workflow_type='dynamic',  # Uses /dhk/ endpoint
    ...
)

# Static workflow - predefined stages
PingPulseOperator(
    workflow_id='swf456def',
    workflow_type='static',  # Uses /shk/ endpoint
    ...
)
```
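Besides how stages are defined, the examples show two concrete differences between the modes: the endpoint prefix (`/dhk/` vs `/shk/`), and the sample IDs, where dynamic ones start with `dwf` and static ones with `swf`. The hypothetical check below encodes both observations so that a mismatched `workflow_id`/`workflow_type` pair fails fast. Treat the ID-prefix rule as an inference from the examples, not a documented guarantee.

```python
from typing import Dict

# Endpoint prefixes, as noted in the example comments.
ENDPOINT_PREFIX: Dict[str, str] = {"dynamic": "/dhk/", "static": "/shk/"}

# ID prefixes inferred from the sample IDs (dwf123abc, swf456def); an assumption.
ID_PREFIX: Dict[str, str] = {"dynamic": "dwf", "static": "swf"}


def endpoint_for(workflow_id: str, workflow_type: str = "dynamic") -> str:
    """Return the endpoint prefix, rejecting unknown types and mismatched IDs."""
    if workflow_type not in ENDPOINT_PREFIX:
        raise ValueError(f"workflow_type must be one of {sorted(ENDPOINT_PREFIX)}")
    expected = ID_PREFIX[workflow_type]
    if not workflow_id.startswith(expected):
        raise ValueError(
            f"{workflow_id!r} does not start with {expected!r}; "
            f"is workflow_type={workflow_type!r} correct?"
        )
    return ENDPOINT_PREFIX[workflow_type]


if __name__ == "__main__":
    print(endpoint_for("dwf123abc"))            # /dhk/
    print(endpoint_for("swf456def", "static"))  # /shk/
```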

### Heartbeat Monitoring

For scheduled jobs that should run on a cron:

```python
from pingpulse_airflow.operators.pingpulse import PingPulseHeartbeatOperator

heartbeat = PingPulseHeartbeatOperator(
    task_id='send_heartbeat',
    monitor_id='hb123abc',
)
```
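Heartbeat monitoring typically inverts the stage-ping logic: the monitor expects a ping at a regular cadence, and the *absence* of one is the alert condition (a "dead man's switch"). The sketch below shows that arithmetic, last ping plus expected interval plus a grace period, with hypothetical parameter names. The real schedule and grace settings for a monitor such as `hb123abc` are presumably configured on the PingPulse side rather than in this operator.

```python
from datetime import datetime, timedelta, timezone


def is_overdue(last_ping: datetime, now: datetime, interval: timedelta,
               grace: timedelta = timedelta(minutes=5)) -> bool:
    """Dead-man's-switch check: True once interval + grace has passed with no new ping."""
    return now > last_ping + interval + grace


if __name__ == "__main__":
    last = datetime(2024, 1, 15, 0, 0, tzinfo=timezone.utc)
    daily = timedelta(days=1)
    # Next ping is due at 2024-01-16 00:00; alert only after the 5-minute grace period.
    print(is_overdue(last, datetime(2024, 1, 16, 0, 3, tzinfo=timezone.utc), daily))  # False
    print(is_overdue(last, datetime(2024, 1, 16, 0, 6, tzinfo=timezone.utc), daily))  # True
```

On the Airflow side, placing `send_heartbeat` downstream of your other tasks means that, under the default `all_success` trigger rule, a heartbeat is only sent when the upstream tasks actually succeeded.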

### DAG-Level Callbacks

Report success or failure of the DAG run as a whole, rather than per task:

```python
from datetime import datetime

from airflow import DAG
from pingpulse_airflow.callbacks import pingpulse_dag_callbacks

callbacks = pingpulse_dag_callbacks('dwf123abc')

with DAG(
    'my_dag',
    start_date=datetime(2024, 1, 1),
    schedule='@daily',
    on_success_callback=callbacks['on_success'],
    on_failure_callback=callbacks['on_failure'],
) as dag:
    ...  # your tasks
```

---

## What Gets Sent to PingPulse

Each ping includes:

```json
{
  "status": "success",
  "duration": 12.345,
  "airflow": {
    "dag_id": "etl_pipeline",
    "task_id": "transform",
    "run_id": "scheduled__2024-01-15T00:00:00+00:00",
    "execution_date": "2024-01-15T00:00:00+00:00",
    "try_number": 1
  }
}
```

On failure, `status` is `"failed"` and an `error` field describes what went wrong:
```json
{
  "status": "failed",
  "error": "Connection refused to database..."
}
```

---

## Requirements

- Python 3.8+
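- Apache Airflow 2.0+ (the examples use the `schedule` argument, which requires Airflow 2.4+; on earlier 2.x releases use `schedule_interval` instead)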
- PingPulse account with API key

---

## Support

- Documentation: https://pingpulse.com/api-docs
- Issues: https://github.com/pingpulse/pingpulse-airflow/issues
- Email: support@pingpulse.com

---

## License

MIT License - see LICENSE file for details.
