Metadata-Version: 2.4
Name: setmore-etl
Version: 0.2.0
Summary: Setmore ETL pipelines for Databricks Unity Catalog
Author-email: KIMI <p4w3l@outlook.com>
License: MIT
License-File: LICENSE
Requires-Python: >=3.10
Requires-Dist: requests>=2.28.0
Provides-Extra: dev
Requires-Dist: black>=23.0.0; extra == 'dev'
Requires-Dist: mypy>=1.5.0; extra == 'dev'
Requires-Dist: pytest-mock>=3.10.0; extra == 'dev'
Requires-Dist: pytest>=7.0.0; extra == 'dev'
Requires-Dist: ruff>=0.1.0; extra == 'dev'
Description-Content-Type: text/markdown

# 🚀 Setmore ETL for Databricks

[![Python 3.10+](https://img.shields.io/badge/python-3.10+-blue.svg)](https://www.python.org/downloads/)
[![License: MIT](https://img.shields.io/badge/License-MIT-yellow.svg)](https://opensource.org/licenses/MIT)
[![Databricks](https://img.shields.io/badge/Databricks-Unity%20Catalog-orange.svg)](https://www.databricks.com/)

A production-grade Python library for building reliable, incremental ETL pipelines from the [Setmore REST API](https://setmore.docs.apiary.io/#) to Databricks Unity Catalog. It is built for Databricks Serverless environments and provides exactly-once semantics, automatic watermarking, and comprehensive audit logging.

## ✨ Features

- **🔄 Incremental Processing** - Smart watermark management with configurable lookback windows for late-arriving data
- **🎯 Exactly-Once Semantics** - Deterministic extraction IDs prevent duplicate processing
- **🔐 Thread-Safe OAuth** - Automatic token refresh with a 5-minute expiry buffer, safe for concurrent use on Databricks Serverless
- **📊 Unity Catalog Native** - First-class support for Databricks catalogs, schemas, and Delta tables
- **🔍 Comprehensive Audit Trail** - Track every extraction with detailed metrics and error logging
- **💾 Delta Lake Optimized** - Efficient upsert patterns for appointments, staff, and services
- **🏗️ Type-Safe Schemas** - Explicit PySpark schemas eliminate runtime type inference issues
- **🧪 Production Ready** - Immutable configurations, pure functions, and zero side effects on import
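The library's token handling is not shown in this README. The following is a minimal sketch of the idea behind the Thread-Safe OAuth bullet: a lock-guarded cache that fetches a new access token only when the current one is within 5 minutes of expiry. `TokenCache` and the injected `fetch_token` callable are hypothetical names for illustration, not `setmore_etl` APIs.

```python
import threading
import time
from typing import Callable, Optional, Tuple


class TokenCache:
    """Hypothetical sketch: cache an OAuth access token, refreshing it before expiry.

    `fetch_token` is any callable returning (access_token, lifetime_seconds);
    it stands in for a real token-refresh request and is not a library API.
    """

    def __init__(
        self,
        fetch_token: Callable[[], Tuple[str, float]],
        refresh_buffer_seconds: float = 300.0,  # 5-minute buffer
        clock: Callable[[], float] = time.monotonic,
    ) -> None:
        self._fetch_token = fetch_token
        self._buffer = refresh_buffer_seconds
        self._clock = clock
        self._lock = threading.Lock()
        self._token: Optional[str] = None
        self._expires_at = 0.0

    def get(self) -> str:
        # Holding the lock during the refresh means concurrent callers wait
        # for one fetch instead of each sending their own refresh request.
        with self._lock:
            if self._token is None or self._clock() >= self._expires_at - self._buffer:
                token, lifetime = self._fetch_token()
                self._token = token
                self._expires_at = self._clock() + lifetime
            return self._token
```

Refreshing early, rather than on a 401 response, avoids a request failing mid-pagination because its token expired in flight.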

## 📋 Prerequisites

- Databricks Workspace with Unity Catalog enabled
- Python 3.10 or higher
- Setmore account with API access
- Setmore OAuth refresh token ([obtain here](https://setmore.docs.apiary.io/#reference/authentication))

## 📦 Installation

### From PyPI (Coming Soon)

```bash
pip install setmore-etl
```

### From Source

```bash
git clone https://github.com/p4w3l/setmore-etl.git
cd setmore-etl
pip install -e .
```

### For Development

```bash
pip install -e ".[dev]"
```

## 🚀 Quick Start

### 1. Configure Your Pipeline

```python
from setmore_etl import SetmoreConfig

config = SetmoreConfig(
    catalog="main",
    schema="setmore_prod",
    landing_volume="raw_data",
    lookback_hours=2,  # Handle late-arriving appointments
)
```
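`lookback_hours` controls how far each run reaches back past the previous watermark. `WatermarkManager.calculate_extraction_window` is not shown here, so the following is a minimal sketch of the idea under stated assumptions: the stored watermark is the previous window's end, and a first run with no watermark backfills 30 days (an arbitrary choice). `compute_window` is a hypothetical helper.

```python
from datetime import datetime, timedelta, timezone
from typing import Optional, Tuple


def compute_window(
    last_window_end: Optional[datetime],
    lookback_hours: int,
    now: datetime,
    initial_backfill: timedelta = timedelta(days=30),  # assumed first-run range
) -> Tuple[datetime, datetime]:
    """Hypothetical sketch: return (window_start, window_end) for the next run.

    The start is pulled back `lookback_hours` before the last watermark so
    appointments that arrived late are picked up again.
    """
    if last_window_end is None:
        # No watermark yet: backfill a fixed period.
        start = now - initial_backfill
    else:
        start = last_window_end - timedelta(hours=lookback_hours)
    return start, now
```

Because consecutive windows overlap by `lookback_hours`, the same appointment can be fetched twice. A merge keyed on `key` (see the Delta Merge Pattern under Advanced Usage) absorbs these repeats, whereas a plain append does not.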

### 2. Extract Appointments

```python
from pyspark.sql import SparkSession
from setmore_etl import (
    SetmoreAPIClient,
    WatermarkManager,
    AuditLogger,
    flatten_appointment,
    APPOINTMENT_SCHEMA
)

# Initialize components
spark = SparkSession.builder.getOrCreate()
client = SetmoreAPIClient(refresh_token="your_token", config=config)
watermark = WatermarkManager(spark, config)
audit = AuditLogger(spark, config)

# Calculate extraction window
window_start, window_end, extraction_id = watermark.calculate_extraction_window(
    pipeline_name="appointments"
)

# Check idempotency: skip windows that were already loaded
if watermark.check_extraction_exists(extraction_id):
    print(f"Extraction {extraction_id} already completed")
    raise SystemExit(0)  # In a Databricks notebook, prefer dbutils.notebook.exit(...)

# Fetch and transform data, following the pagination cursor
appointments = []
cursor = None

while True:
    response = client.fetch_appointments_page(
        start_date=window_start.strftime("%d-%m-%Y"),  # DD-MM-YYYY
        end_date=window_end.strftime("%d-%m-%Y"),
        cursor=cursor,
    )

    # Fail loudly on an error response so a partial load never advances the watermark
    if not response.get("response"):
        raise RuntimeError(f"Setmore API error: {response}")

    batch = response["data"].get("appointments", [])
    appointments.extend(
        flatten_appointment(appt, "api", extraction_id) for appt in batch
    )
    cursor = response["data"].get("cursor")
    if not cursor:
        break

# Write to Delta table
df = spark.createDataFrame(appointments, schema=APPOINTMENT_SCHEMA)
df.write.format("delta").mode("append").saveAsTable(
    f"{config.catalog}.{config.schema}.appointments"
)

# Record watermark
watermark.set_watermark(
    pipeline_name="appointments",
    extraction_id=extraction_id,
    window_start=window_start,
    window_end=window_end,
    records=len(appointments)
)

# Audit logging
audit.log(
    pipeline_name="appointments",
    extraction_id=extraction_id,
    window_start=window_start,
    window_end=window_end,
    appointments=len(appointments),
    status="success"
)
```
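The paging loop can also be factored into a reusable generator. This is a minimal sketch assuming the response envelope used in the Quick Start (`response`, `data.appointments`, `data.cursor`); `iter_appointments` is a hypothetical helper, and `fetch_page` stands for any wrapper around `client.fetch_appointments_page` that takes only the cursor.

```python
from typing import Any, Callable, Dict, Iterator, Optional

Page = Dict[str, Any]


def iter_appointments(
    fetch_page: Callable[[Optional[str]], Page],
) -> Iterator[Dict[str, Any]]:
    """Hypothetical sketch: yield raw appointments until no cursor is returned."""
    cursor: Optional[str] = None
    while True:
        page = fetch_page(cursor)
        if not page.get("response"):
            # Raise instead of stopping quietly, so a truncated extraction
            # is never recorded as complete.
            raise RuntimeError(f"Setmore API returned an error page: {page}")
        data = page.get("data", {})
        yield from data.get("appointments", [])
        cursor = data.get("cursor")
        if not cursor:
            return
```

With the Quick Start objects, usage might look like `fetch = lambda c: client.fetch_appointments_page(start_date=s, end_date=e, cursor=c)` followed by `[flatten_appointment(a, "api", extraction_id) for a in iter_appointments(fetch)]`.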

### 3. Extract Staff and Services

```python
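from setmore_etl import (
    SERVICE_SCHEMA,
    STAFF_SCHEMA,
    process_service_record,
    process_staff_record,
)

# Staff members (extraction_id comes from step 2)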
staff_records = client.fetch_all_staff()
staff_df = spark.createDataFrame([
    process_staff_record(s, "api", extraction_id)
    for s in staff_records
], schema=STAFF_SCHEMA)

staff_df.write.format("delta").mode("overwrite").option(
    "overwriteSchema", "true"
).saveAsTable(f"{config.catalog}.{config.schema}.staff")

# Services
service_records = client.fetch_all_services()
services_df = spark.createDataFrame([
    process_service_record(s, "api", extraction_id)
    for s in service_records
], schema=SERVICE_SCHEMA)

services_df.write.format("delta").mode("overwrite").option(
    "overwriteSchema", "true"
).saveAsTable(f"{config.catalog}.{config.schema}.services")
```

## 🏗️ Architecture

```
┌─────────────────────────────────────────────────────────────┐
│                     Setmore REST API                        │
│            https://developer.setmore.com/api/v1             │
└────────────────────────┬────────────────────────────────────┘
                         │
                         ▼
┌─────────────────────────────────────────────────────────────┐
│                  SetmoreAPIClient                           │
│  • Thread-safe OAuth token management                       │
│  • Automatic pagination                                     │
│  • Request retry logic                                      │
└────────────────────────┬────────────────────────────────────┘
                         │
                         ▼
┌─────────────────────────────────────────────────────────────┐
│                  Transformation Layer                       │
│  • flatten_appointment()    • parse_iso_timestamp()         │
│  • process_staff_record()   • generate_extraction_id()      │
│  • process_service_record()                                 │
└────────────────────────┬────────────────────────────────────┘
                         │
                         ▼
┌─────────────────────────────────────────────────────────────┐
│              Databricks Unity Catalog                       │
│                                                             │
│  ┌────────────────┐  ┌──────────────┐  ┌────────────────┐   │
│  │  appointments  │  │    staff     │  │   services     │   │
│  │   (Delta)      │  │   (Delta)    │  │   (Delta)      │   │
│  └────────────────┘  └──────────────┘  └────────────────┘   │
│                                                             │
│  ┌────────────────┐  ┌──────────────────────────────────┐   │
│  │  watermarks    │  │     audit_log                    │   │
│  │   (Delta)      │  │     (Delta)                      │   │
│  └────────────────┘  └──────────────────────────────────┘   │
└─────────────────────────────────────────────────────────────┘
```
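The diagram lists request retry logic inside `SetmoreAPIClient` without detailing it. The following is a minimal sketch of one common approach, exponential backoff with jitter; `call_with_retry` is a hypothetical helper, not the library's retry code, and the attempt count and delays are illustrative defaults.

```python
import random
import time
from typing import Callable, Tuple, Type, TypeVar

T = TypeVar("T")


def call_with_retry(
    func: Callable[[], T],
    retry_on: Tuple[Type[BaseException], ...],
    max_attempts: int = 4,
    base_delay: float = 1.0,
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Hypothetical sketch: call `func`, retrying listed exceptions with backoff."""
    if max_attempts < 1:
        raise ValueError("max_attempts must be at least 1")
    for attempt in range(1, max_attempts + 1):
        try:
            return func()
        except retry_on:
            if attempt == max_attempts:
                raise  # Out of attempts: surface the last error to the caller
            # Delays grow 1s, 2s, 4s, ... plus up to 10% random jitter
            delay = base_delay * 2 ** (attempt - 1)
            sleep(delay + random.uniform(0, 0.1 * delay))
    raise AssertionError("unreachable")
```

With `requests`, a typical choice is `retry_on=(requests.ConnectionError, requests.Timeout)`. Retrying on HTTP 429 or 5xx responses would additionally require inspecting the status code, or mounting a `requests.adapters.HTTPAdapter` configured with `urllib3.util.Retry`.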

## 📊 Data Models

### Appointments
Booking details with customer information, partitioned by appointment date (`date_key`).

| Column | Type | Description |
|--------|------|-------------|
| `key` | STRING | Unique appointment identifier |
| `start_time` | TIMESTAMP | Appointment start (UTC) |
| `end_time` | TIMESTAMP | Appointment end (UTC) |
| `staff_key` | STRING | Foreign key to staff |
| `service_key` | STRING | Foreign key to service |
| `customer_key` | STRING | Customer identifier |
| `cost` | DOUBLE | Appointment cost |
| `customer_first_name` | STRING | Customer first name |
| `customer_email` | STRING | Customer email |
| `date_key` | STRING | Partition key (YYYY-MM-DD) |
| `extraction_id` | STRING | Extraction batch identifier |
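`flatten_appointment` maps each raw API record onto the columns above. Its implementation is not shown here, so the following is a minimal sketch under explicit assumptions: the raw record carries `start_time`/`end_time` as ISO-8601 strings, customer fields sit in a nested `customer` object with `first_name` and `email_id` keys, and `date_key` is derived from the UTC start time. `parse_utc` and `flatten_appointment_sketch` are hypothetical names; check the field names against real API payloads.

```python
from datetime import datetime, timezone
from typing import Any, Dict, Optional


def parse_utc(value: Optional[str]) -> Optional[datetime]:
    """Parse an ISO-8601 string such as '2024-05-01T09:30:00Z' into aware UTC."""
    if not value:
        return None
    # datetime.fromisoformat only accepts a trailing "Z" from Python 3.11 on
    parsed = datetime.fromisoformat(value.replace("Z", "+00:00"))
    if parsed.tzinfo is None:
        parsed = parsed.replace(tzinfo=timezone.utc)  # assume naive values are UTC
    return parsed.astimezone(timezone.utc)


def flatten_appointment_sketch(appt: Dict[str, Any], extraction_id: str) -> Dict[str, Any]:
    """Hypothetical sketch: map one raw appointment to the flat table layout."""
    start = parse_utc(appt.get("start_time"))
    customer = appt.get("customer") or {}  # assumed nested customer object
    return {
        "key": appt.get("key"),
        "start_time": start,
        "end_time": parse_utc(appt.get("end_time")),
        "staff_key": appt.get("staff_key"),
        "service_key": appt.get("service_key"),
        "customer_key": appt.get("customer_key"),
        "cost": float(appt["cost"]) if appt.get("cost") is not None else None,
        "customer_first_name": customer.get("first_name"),
        "customer_email": customer.get("email_id"),
        # Partition key derived from the UTC start time
        "date_key": start.strftime("%Y-%m-%d") if start else None,
        "extraction_id": extraction_id,
    }
```

Normalizing to UTC before deriving `date_key` keeps the partition stable regardless of the offset the API reports.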

### Staff
Team member directory with contact information.

| Column | Type | Description |
|--------|------|-------------|
| `key` | STRING | Unique staff identifier |
| `first_name` | STRING | Staff first name |
| `last_name` | STRING | Staff last name |
| `email_id` | STRING | Staff email |
| `work_phone` | STRING | Contact number |
| `extraction_id` | STRING | Extraction batch identifier |

### Services
Service catalog with pricing and duration.

| Column | Type | Description |
|--------|------|-------------|
| `key` | STRING | Unique service identifier |
| `service_name` | STRING | Service display name |
| `staff_keys` | ARRAY<STRING> | Keys of staff members who offer the service |
| `duration` | INT | Service duration (minutes) |
| `cost` | DOUBLE | Service price |
| `extraction_id` | STRING | Extraction batch identifier |

## 🔧 Configuration Reference

### SetmoreConfig

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class SetmoreConfig:
    catalog: str                  # Unity Catalog name
    schema: str                   # Schema name (e.g., "setmore_prod")
    landing_volume: str           # Volume for raw data storage
    base_url: str = "https://developer.setmore.com/api/v1"  # Setmore v1 API base URL
    lookback_hours: int = 2       # Late-arrival window in hours
    watermark_table: str = "watermarks"  # Watermark table name
    audit_table: str = "audit_log"       # Audit log table name
```

### Environment Variables

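In Databricks notebooks, keep the refresh token in a secret scope instead of hard-coding it, and pass it to the client:

```python
refresh_token = dbutils.secrets.get(scope="setmore", key="refresh_token")
client = SetmoreAPIClient(refresh_token=refresh_token, config=config)
```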
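Outside Databricks (for example in local tests), `dbutils` is not available. A minimal sketch of one way to resolve the token in both places: prefer a secret getter when given one, otherwise read an environment variable. `resolve_refresh_token` and the variable name `SETMORE_REFRESH_TOKEN` are hypothetical conventions, not something the library reads on its own.

```python
import os
from typing import Callable, Optional


def resolve_refresh_token(
    secret_getter: Optional[Callable[[], str]] = None,
    env_var: str = "SETMORE_REFRESH_TOKEN",  # hypothetical variable name
) -> str:
    """Hypothetical sketch: return the token from a secret getter or the environment."""
    if secret_getter is not None:
        return secret_getter()
    token = os.environ.get(env_var)
    if not token:
        raise RuntimeError(f"Set {env_var} or pass a secret_getter")
    return token
```

In a notebook this might be called as `resolve_refresh_token(lambda: dbutils.secrets.get(scope="setmore", key="refresh_token"))`; locally, export the environment variable and call it with no arguments.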

## 🎯 Use Cases

### Healthcare & Wellness
- Patient appointment analytics
- Provider utilization tracking
- Revenue forecasting
- No-show pattern analysis

### Beauty & Personal Care
- Stylist performance metrics
- Service popularity trends
- Customer retention analysis
- Revenue per service tracking

### Professional Services
- Consultant booking analytics
- Client meeting analysis
- Service demand forecasting
- Staff capacity planning

### Education & Tutoring
- Student session tracking
- Tutor availability optimization
- Subject popularity analysis
- Enrollment trend reporting

## 🔍 Advanced Usage

### Custom Extraction Windows

```python
from datetime import datetime, timedelta, timezone

from setmore_etl import generate_extraction_id

# Last 30 days; capture "now" once so both bounds share one reference point
window_end = datetime.now(timezone.utc)
window_start = window_end - timedelta(days=30)
extraction_id = generate_extraction_id(window_start, window_end)
```
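Exactly-once behavior rests on the extraction ID being deterministic: the same window must always yield the same ID. The real `generate_extraction_id` may use a different scheme; the following is a minimal sketch of the idea, hashing the pipeline name and UTC-normalized window bounds with SHA-256. `extraction_id_sketch` is a hypothetical name.

```python
import hashlib
from datetime import datetime, timezone


def extraction_id_sketch(pipeline_name: str, window_start: datetime, window_end: datetime) -> str:
    """Hypothetical sketch: derive a stable ID from the pipeline and window bounds.

    Bounds are normalized to UTC so equal instants in different time zones
    map to the same ID. Naive datetimes would be treated as local time.
    """
    parts = [
        pipeline_name,
        window_start.astimezone(timezone.utc).isoformat(),
        window_end.astimezone(timezone.utc).isoformat(),
    ]
    digest = hashlib.sha256("|".join(parts).encode("utf-8")).hexdigest()
    return f"{pipeline_name}_{digest[:16]}"
```

Because the custom-window bounds come from `datetime.now()`, two runs a second apart produce different IDs and will not deduplicate. Rounding the bounds (for example, to the hour) is one way to make reruns of the same logical window collide on purpose.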

### Error Handling

```python
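import requests

try:
    response = client.fetch_appointments_page(
        start_date=window_start.strftime("%d-%m-%Y"),
        end_date=window_end.strftime("%d-%m-%Y"),
    )
except requests.RequestException as e:  # HTTP errors, timeouts, connection failures
    # Record the failure in the audit log, then re-raise so the job fails visibly
    audit.log(
        pipeline_name="appointments",
        extraction_id=extraction_id,
        window_start=window_start,
        window_end=window_end,
        status="failed",
        error=str(e),
    )
    raise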
```

### Delta Merge Pattern (Upserts)

```python
from delta.tables import DeltaTable

target_table = f"{config.catalog}.{config.schema}.appointments"

# Upsert on the appointment key if the table exists; otherwise create it.
# spark.catalog.tableExists is available in PySpark 3.3+.
if spark.catalog.tableExists(target_table):
    delta_table = DeltaTable.forName(spark, target_table)

    (
        delta_table.alias("target")
        .merge(df.alias("source"), "target.key = source.key")
        .whenMatchedUpdateAll()
        .whenNotMatchedInsertAll()
        .execute()
    )
else:
    df.write.format("delta").saveAsTable(target_table)
```
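Delta Lake's MERGE fails when more than one source row matches the same target row, so the source batch should hold at most one row per `key`. Assuming `flatten_appointment` returns plain dicts (not confirmed here), a minimal sketch of deduplicating before `createDataFrame`, keeping the last row seen for each key; `dedupe_by_key` is a hypothetical helper.

```python
from typing import Any, Dict, Iterable, List


def dedupe_by_key(rows: Iterable[Dict[str, Any]], key: str = "key") -> List[Dict[str, Any]]:
    """Hypothetical sketch: keep the last row per key, in first-seen key order."""
    latest: Dict[Any, Dict[str, Any]] = {}
    for row in rows:
        # Reassigning an existing dict key replaces the value but keeps its
        # original position, so output order follows first appearance.
        latest[row[key]] = row
    return list(latest.values())
```

Usage might look like `df = spark.createDataFrame(dedupe_by_key(appointments), schema=APPOINTMENT_SCHEMA)`. Inside Spark, `df.dropDuplicates(["key"])` also removes duplicates, but which row it keeps is not deterministic.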

## 🧪 Testing

```bash
# Run tests
pytest tests/

# Run with coverage
pytest --cov=setmore_etl tests/

# Type checking
mypy src/setmore_etl

# Linting
ruff check src/
black --check src/
```

## 📚 API Reference

Full Setmore API documentation is available at [https://setmore.docs.apiary.io/](https://setmore.docs.apiary.io/)

### Key Endpoints Used

- `GET /o/oauth2/token` - OAuth token refresh
- `GET /bookingapi/appointments` - Fetch appointments with pagination
- `GET /bookingapi/staffs` - Fetch staff members
- `GET /bookingapi/services` - Fetch service catalog
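A minimal sketch of how request URLs for these endpoints could be assembled with the standard library. The base URL comes from the architecture diagram; the query parameter names `refreshToken`, `startDate`, `endDate`, and `cursor` are assumptions to verify against the Setmore API reference, and all helper names are hypothetical.

```python
from typing import Dict, Optional
from urllib.parse import urlencode

BASE_URL = "https://developer.setmore.com/api/v1"  # from the architecture diagram


def build_url(path: str, params: Optional[Dict[str, Optional[str]]] = None) -> str:
    """Hypothetical sketch: join base URL, path, and non-empty query parameters."""
    url = f"{BASE_URL.rstrip('/')}/{path.lstrip('/')}"
    query = {k: v for k, v in (params or {}).items() if v}  # drop None/empty values
    return f"{url}?{urlencode(query)}" if query else url


def token_url(refresh_token: str) -> str:
    # Assumed query parameter name: refreshToken
    return build_url("/o/oauth2/token", {"refreshToken": refresh_token})


def appointments_url(start_date: str, end_date: str, cursor: Optional[str] = None) -> str:
    # Assumed parameter names: startDate / endDate (DD-MM-YYYY) and cursor
    return build_url(
        "/bookingapi/appointments",
        {"startDate": start_date, "endDate": end_date, "cursor": cursor},
    )
```

The data endpoints are then called with the access token obtained from the refresh call, typically sent as an `Authorization: Bearer <access_token>` header; confirm the exact header format in the Setmore documentation.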

## 🤝 Contributing

Contributions are welcome! Please feel free to submit a Pull Request. For major changes, please open an issue first to discuss what you would like to change.

### Development Setup

```bash
# Clone repository
git clone https://github.com/p4w3l/setmore-etl.git
cd setmore-etl

# Create virtual environment
python -m venv venv
source venv/bin/activate  # On Windows: venv\Scripts\activate

# Install development dependencies
pip install -e ".[dev]"

# Run tests
pytest
```

### Code Style

- Follow PEP 8 guidelines
- Use type hints for all function signatures
- Write docstrings for public APIs
- Maintain test coverage above 80%

## 📝 License

This project is licensed under the MIT License - see the [LICENSE](LICENSE) file for details.

## 🙏 Acknowledgments

- Built for [Databricks](https://www.databricks.com/) Unity Catalog
- Integrates with [Setmore](https://www.setmore.com/) booking platform
- Inspired by modern data engineering best practices

## 📞 Support

- 💬 Issues: [GitHub Issues](https://github.com/p4w3l/setmore-etl/issues)
- 📖 Documentation: [Wiki](https://github.com/p4w3l/setmore-etl/wiki)

## 🗺️ Roadmap

- [ ] Add streaming support for real-time ingestion
- [ ] Implement data quality checks and validations
- [ ] Add support for Setmore webhooks
- [ ] Create dbt models for analytics
- [ ] Add Prefect/Airflow orchestration examples
- [ ] Develop data observability integrations (Monte Carlo, Great Expectations)

---

**Built with ❤️ for the data engineering community**
