Metadata-Version: 2.4
Name: incrementalflow
Version: 0.1.0
Summary: The definitive framework for incremental data loading in PySpark and Delta Lake
Home-page: https://github.com/Maria25182/incrementalflow
Author: Maria Antonia Bermudez Cardona
Author-email: maria212502@gmail.com
Keywords: pyspark,delta-lake,incremental,data-engineering,etl,data-pipeline,watermark,upsert,merge,scd
Classifier: Development Status :: 4 - Beta
Classifier: Intended Audience :: Developers
Classifier: Topic :: Software Development :: Libraries :: Python Modules
Classifier: Topic :: Database
Classifier: License :: OSI Approved :: MIT License
Classifier: Programming Language :: Python :: 3
Classifier: Programming Language :: Python :: 3.8
Classifier: Programming Language :: Python :: 3.9
Classifier: Programming Language :: Python :: 3.10
Classifier: Programming Language :: Python :: 3.11
Requires-Python: >=3.8
Description-Content-Type: text/markdown
License-File: LICENSE
Requires-Dist: pyspark>=3.0.0
Requires-Dist: delta-spark>=2.0.0
Provides-Extra: dev
Requires-Dist: pytest>=7.0.0; extra == "dev"
Requires-Dist: pytest-cov>=4.0.0; extra == "dev"
Requires-Dist: black>=23.0.0; extra == "dev"
Requires-Dist: flake8>=6.0.0; extra == "dev"
Requires-Dist: mypy>=1.0.0; extra == "dev"
Provides-Extra: aws
Requires-Dist: boto3>=1.26.0; extra == "aws"
Provides-Extra: monitoring
Requires-Dist: datadog>=0.45.0; extra == "monitoring"
Dynamic: author
Dynamic: author-email
Dynamic: classifier
Dynamic: description
Dynamic: description-content-type
Dynamic: home-page
Dynamic: keywords
Dynamic: license-file
Dynamic: provides-extra
Dynamic: requires-dist
Dynamic: requires-python
Dynamic: summary

# IncrementalFlow

Stop writing the same incremental loading logic over and over again.

IncrementalFlow is a Python framework for PySpark that handles incremental data loading with automatic watermark management, merge operations, and data quality validation.

## The Problem

Every data engineer writes this code dozens of times:

```python
from pyspark.sql import functions as F

# Read the last watermark (get_last_watermark is your own helper)
last_processed = get_last_watermark("my_table")

# Read only rows newer than the watermark
df_new = spark.read.parquet(source).filter(F.col("date") > last_processed)

# Merge logic
df_new.createOrReplaceTempView("updates")
spark.sql("""
    MERGE INTO silver.my_table t
    USING updates s ON t.id = s.id
    WHEN MATCHED THEN UPDATE SET *
    WHEN NOT MATCHED THEN INSERT *
""")

# Update watermark (F.max is Spark's aggregate, not Python's built-in max)
save_watermark("my_table", df_new.agg(F.max("date")).collect()[0][0])
```

This same pattern. Every single table. Every single pipeline.

## The Solution

```python
from incrementalflow import IncrementalLoader

loader = IncrementalLoader(
    source="bronze.transactions",
    target="silver.transactions",
    watermark_column="updated_at",
    keys=["transaction_id"],
    mode="upsert"
)

loader.run()
```

That's it. Watermarks, merge logic, idempotency - all handled automatically.

## Installation

```bash
pip install incrementalflow
```

Requires Python 3.8+ and PySpark 3.0+.

## Quick Example

Here's a complete pipeline from bronze to silver:

```python
from pyspark.sql import SparkSession
from pyspark.sql.functions import upper, trim, col
from incrementalflow import IncrementalLoader
from incrementalflow.validators import DataQualityRule

spark = SparkSession.builder.getOrCreate()

# Define data cleaning
def clean_data(df):
    return df.withColumn("email", trim(upper(col("email"))))

# Configure loader
loader = IncrementalLoader(
    spark=spark,
    source="bronze.customers",
    target="silver.customers_clean",
    watermark_column="last_updated",
    keys=["customer_id"],
    mode="upsert",
    transformations=[clean_data],
    quality_rules=[
        DataQualityRule.no_nulls(["customer_id", "email"]),
        DataQualityRule.no_duplicates(["customer_id"])
    ]
)

# Run
result = loader.run()
print(f"Processed: {result.records_processed}")
print(f"Updated watermark: {result.new_watermark}")
```

On the first run there is no watermark yet, so the loader processes the full source table. Each later run processes only records whose `last_updated` value is newer than the last watermark.

## Core Features

**Automatic Watermark Management**

No more manual watermark tracking. The framework remembers where you left off.

```python
# First run: processes everything, saves watermark
loader.run()

# Second run: only processes new data
loader.run()
```
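
As a rough illustration of that two-run behaviour, here is a sketch in plain Python, with lists of dicts standing in for DataFrames so it runs without Spark. `select_new_rows` is a hypothetical helper written for this example, not part of IncrementalFlow, and the library's internals may differ.

```python
from datetime import datetime

def select_new_rows(rows, watermark_column, last_watermark):
    """Return the rows newer than the watermark and the watermark to keep next."""
    if last_watermark is None:
        new_rows = list(rows)  # first run: nothing has been processed yet
    else:
        new_rows = [r for r in rows if r[watermark_column] > last_watermark]
    if not new_rows:
        return new_rows, last_watermark  # nothing new: keep the old watermark
    return new_rows, max(r[watermark_column] for r in new_rows)

source = [
    {"id": 1, "updated_at": datetime(2024, 1, 1)},
    {"id": 2, "updated_at": datetime(2024, 1, 2)},
]

# First run: no watermark, so every row is selected.
first_batch, watermark = select_new_rows(source, "updated_at", None)

# A new record lands in the source before the second run.
source.append({"id": 3, "updated_at": datetime(2024, 1, 3)})
second_batch, watermark = select_new_rows(source, "updated_at", watermark)

print(len(first_batch), len(second_batch))  # 2 1
```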

**Multiple Load Modes**

- `upsert` - Insert new records, update existing ones
- `append` - Insert only, never update
- `replace` - Replace the target's data within a date range
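
One way to read the three modes, sketched on plain Python lists instead of Delta tables. `apply_load` and its `in_range` predicate are hypothetical names for this example. The sketch assumes `append` adds rows without matching on keys and `replace` drops every target row in the range before writing the batch. The library may implement these modes differently.

```python
def apply_load(target, batch, key, mode, in_range=None):
    """Return a new list of rows after applying `batch` to `target`."""
    if mode == "append":
        return target + batch  # existing rows are never touched
    if mode == "upsert":
        merged = {row[key]: row for row in target}
        merged.update({row[key]: row for row in batch})  # batch wins on key match
        return list(merged.values())
    if mode == "replace":
        if in_range is None:
            raise ValueError("replace mode needs an in_range predicate")
        return [row for row in target if not in_range(row)] + batch
    raise ValueError(f"unknown mode: {mode}")

target = [
    {"id": 1, "day": "2024-01-01", "amount": 10},
    {"id": 2, "day": "2024-01-02", "amount": 20},
]
batch = [
    {"id": 2, "day": "2024-01-02", "amount": 25},
    {"id": 3, "day": "2024-01-02", "amount": 30},
]

upserted = apply_load(target, batch, "id", "upsert")
appended = apply_load(target, batch, "id", "append")
replaced = apply_load(target, batch, "id", "replace",
                      in_range=lambda row: row["day"] == "2024-01-02")

print(len(upserted), len(appended), len(replaced))  # 3 4 3
```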

**Data Quality Validation**

Catch bad data before it reaches your clean tables.

```python
quality_rules=[
    DataQualityRule.no_nulls(["critical_field"]),
    DataQualityRule.no_duplicates(["id"]),
    DataQualityRule.value_range("amount", min_val=0, max_val=1000000)
]
```

If validation fails, the load stops and nothing gets written.
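
The validate-before-write idea can be sketched without Spark. The rule factories below only mirror the names used above. They are simplified stand-ins operating on lists of dicts, not the real `DataQualityRule` implementation, and `validate_then_write` is a hypothetical helper.

```python
def no_nulls(columns):
    def check(rows):
        return all(row.get(c) is not None for row in rows for c in columns)
    return check

def no_duplicates(columns):
    def check(rows):
        keys = [tuple(row[c] for c in columns) for row in rows]
        return len(keys) == len(set(keys))
    return check

def value_range(column, min_val, max_val):
    def check(rows):
        return all(min_val <= row[column] <= max_val for row in rows)
    return check

def validate_then_write(rows, rules, write):
    """Run every rule first; call `write` only if all of them pass."""
    failed = [name for name, rule in rules.items() if not rule(rows)]
    if failed:
        raise ValueError(f"quality rules failed: {failed}")
    write(rows)

rules = {
    "no_nulls": no_nulls(["id"]),
    "no_duplicates": no_duplicates(["id"]),
    "value_range": value_range("amount", 0, 1_000_000),
}

written = []  # stands in for the target table
good = [{"id": 1, "amount": 50}, {"id": 2, "amount": 75}]
bad = [{"id": 3, "amount": 10}, {"id": 3, "amount": 2_000_000}]

validate_then_write(good, rules, written.extend)

error = None
try:
    validate_then_write(bad, rules, written.extend)
except ValueError as exc:
    error = str(exc)  # the bad batch never reaches `written`

print(len(written), error)
```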

**Transformation Pipeline**

Apply transformations in sequence. Each function receives the DataFrame and returns a transformed version.

```python
transformations=[
    clean_strings,
    add_calculated_fields,
    filter_invalid_records
]
```
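
Applying functions in sequence amounts to a left fold over the list. The sketch below shows that with `functools.reduce` on plain Python rows. `apply_transformations` is a hypothetical helper, and the three transform functions are toy versions of the placeholder names above.

```python
from functools import reduce

def apply_transformations(rows, transformations):
    """Feed the output of each transformation into the next one, in list order."""
    return reduce(lambda current, transform: transform(current), transformations, rows)

def clean_strings(rows):
    return [{**r, "name": r["name"].strip().upper()} for r in rows]

def add_calculated_fields(rows):
    return [{**r, "total": r["qty"] * r["price"]} for r in rows]

def filter_invalid_records(rows):
    # Relies on "total", so it has to run after add_calculated_fields.
    return [r for r in rows if r["total"] > 0]

rows = [
    {"name": " ada ", "qty": 2, "price": 5.0},
    {"name": "bob", "qty": 0, "price": 3.0},
]

result = apply_transformations(
    rows, [clean_strings, add_calculated_fields, filter_invalid_records]
)
print(result)  # [{'name': 'ADA', 'qty': 2, 'price': 5.0, 'total': 10.0}]
```

Order matters: `filter_invalid_records` reads a column that an earlier step creates, so swapping the two would fail.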

**Late Data Handling**

Pick up records that arrive out of order by re-checking a window of recent days on each run.

```python
IncrementalLoader(
    # ... other arguments as in the examples above
    lookback_days=3,  # Re-check the last 3 days for late updates
)
```
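
A lookback window is commonly implemented by moving the lower bound of the read back from the watermark. The sketch below assumes that approach. `lower_bound` and `select_ids` are hypothetical helpers, and how IncrementalFlow applies `lookback_days` internally is not shown in this README.

```python
from datetime import datetime, timedelta

def lower_bound(last_watermark, lookback_days=0):
    """Earliest timestamp the next run should read from."""
    return last_watermark - timedelta(days=lookback_days)

def select_ids(rows, bound):
    return [r["id"] for r in rows if r["updated_at"] > bound]

last_watermark = datetime(2024, 1, 10)
rows = [
    {"id": 1, "updated_at": datetime(2024, 1, 8)},   # landed late, after the Jan 10 run
    {"id": 2, "updated_at": datetime(2024, 1, 11)},  # landed on time
]

without_lookback = select_ids(rows, lower_bound(last_watermark))
with_lookback = select_ids(rows, lower_bound(last_watermark, lookback_days=3))

print(without_lookback, with_lookback)  # [2] [1, 2]
```

Rows inside the window are read again on every run, so this pairs naturally with `upsert`, where reprocessing a row leaves the target unchanged.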

**Backfilling**

Reprocess specific date ranges when needed.

```python
from datetime import datetime

loader.run(
    start_date=datetime(2024, 1, 1),
    end_date=datetime(2024, 1, 31),
    mode_override="replace"
)
```

## Real-World Example

This is how you'd handle a typical bronze-to-silver pipeline with multiple sources:

```python
from incrementalflow import IncrementalLoader
from incrementalflow.validators import DataQualityRule
from pyspark.sql.functions import trim, upper, current_timestamp, lit, col

def standardize_format(df):
    return (df
        .withColumn("customer_name", upper(trim(col("customer_name"))))
        .withColumn("email", trim(col("email")))
        .withColumn("processed_at", current_timestamp())
    )

def add_source_tag(source_name):
    def tag(df):
        return df.withColumn("source", lit(source_name))
    return tag

# Process each source
for source in ["source_a", "source_b", "source_c"]:
    loader = IncrementalLoader(
        source=f"bronze.{source}_customers",
        target="silver.customers_consolidated",
        watermark_column="updated_at",
        keys=["customer_id"],
        mode="upsert",
        transformations=[
            standardize_format,
            add_source_tag(source)
        ],
        quality_rules=[
            DataQualityRule.no_nulls(["customer_id", "email"]),
            DataQualityRule.no_duplicates(["customer_id"])
        ]
    )
    
    result = loader.run()
    print(f"{source}: {result.records_processed} records processed")
```

## When You Don't Have a Date Column

If your source table doesn't have a timestamp column, you have a few options:

**Option 1: Add timestamp during load (recommended)**

```python
from pyspark.sql.functions import current_timestamp

df = spark.table("bronze.my_table")
df_with_ts = df.withColumn("loaded_at", current_timestamp())

loader = IncrementalLoader(
    source_df=df_with_ts,
    target="silver.my_table",
    watermark_column="loaded_at",
    keys=["id"],
    mode="upsert"
)
```

**Option 2: Use an ID column**

If you have an auto-incrementing ID, you can use it as a watermark:

```python
IncrementalLoader(
    # ... other arguments as in the examples above
    watermark_column="record_id",  # Uses the ID instead of a timestamp
)
```

Note: This only detects new inserts, not updates to existing records.

**Option 3: Full refresh**

For small tables, just reprocess everything:

```python
IncrementalLoader(
    # ... other arguments as in the examples above
    watermark_column=None,  # No watermark
)
```

## Configuration

The constructor accepts the following options:

```python
IncrementalLoader(
    spark=spark,                    # SparkSession (optional, uses active session)
    source="bronze.table",          # Source table name
    target="silver.table",          # Target table name
    watermark_column="updated_at",  # Column for incremental logic
    keys=["id"],                    # Primary key for merge
    mode="upsert",                  # Load mode
    
    # Optional
    partition_columns=["date"],     # Partition columns
    transformations=[...],          # Transform functions
    quality_rules=[...],            # Validation rules
    lookback_days=7,                # Days to look back for late data
    enable_z_ordering=True,         # Z-order optimization
    enable_auto_optimize=True       # Auto-optimize after write
)
```

## Watermark Storage

By default no separate watermark record is kept: the loader derives the watermark by reading the maximum value of the watermark column from the target table. You can also use explicit storage:

**Control Table**

```python
from incrementalflow.watermark import ControlTableWatermarkStore

loader = IncrementalLoader(
    # ... other arguments as in the examples above
    watermark_store=ControlTableWatermarkStore(
        spark,
        control_table="my_watermarks"
    )
)
```

**File Storage (for local development)**

```python
from incrementalflow.watermark import FileWatermarkStore

loader = IncrementalLoader(
    # ... other arguments as in the examples above
    watermark_store=FileWatermarkStore("watermarks.json")
)
```
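
To show what a file-backed store involves, here is a minimal sketch of a JSON watermark file. `JsonWatermarkStore` and its `get`/`set` methods are hypothetical and written only for illustration. The actual `FileWatermarkStore` interface and file layout are not documented in this README and may differ.

```python
import json
import tempfile
from pathlib import Path

class JsonWatermarkStore:
    """Keeps one watermark per table name in a single JSON file."""

    def __init__(self, path):
        self.path = Path(path)

    def _load(self):
        if not self.path.exists():
            return {}
        return json.loads(self.path.read_text())

    def get(self, table):
        return self._load().get(table)  # None when the table has never run

    def set(self, table, value):
        data = self._load()
        data[table] = value
        # Write to a temporary file, then rename it over the old one so a
        # crash mid-write cannot leave a half-written watermark file behind.
        tmp = self.path.with_suffix(".tmp")
        tmp.write_text(json.dumps(data, indent=2))
        tmp.replace(self.path)

with tempfile.TemporaryDirectory() as tmp_dir:
    store = JsonWatermarkStore(Path(tmp_dir) / "watermarks.json")
    before = store.get("silver.customers")
    store.set("silver.customers", "2024-01-31T00:00:00")
    after = store.get("silver.customers")

print(before, after)  # None 2024-01-31T00:00:00
```

Timestamps are kept as ISO 8601 strings here because JSON has no native datetime type.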

## Performance

IncrementalFlow is designed for production workloads:

- Reads only data newer than the last watermark (not the entire table)
- Partition pruning for partitioned tables
- Z-ordering support for Delta Lake
- Automatic optimization after writes

In our testing with a 10M record table receiving 100K daily updates:
- Without IncrementalFlow: 15 minutes (full table scan)
- With IncrementalFlow: 2 minutes (incremental only)

## Error Handling

If something goes wrong, the watermark isn't updated. This means you can safely retry:

```python
import logging

logger = logging.getLogger(__name__)

try:
    result = loader.run()
except Exception as e:
    # Watermark not updated - safe to retry
    logger.error(f"Load failed: {e}")
    # Fix the issue, then re-run
    result = loader.run()
```
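
The retry guarantee follows from ordering: the write happens first, and the watermark moves only if the write succeeded. A minimal sketch of that ordering, using a simulated failure and hypothetical names (`run_once`, `flaky_write`), with integers standing in for timestamps:

```python
def run_once(state, batch, write):
    """Write the batch, then advance the watermark.

    If `write` raises, the assignment below is never reached, so the
    watermark keeps its previous value and the same batch can be retried.
    """
    write(batch)
    state["watermark"] = max(row["updated_at"] for row in batch)

state = {"watermark": 0}
batch = [{"id": 1, "updated_at": 5}, {"id": 2, "updated_at": 7}]
attempts = []

def flaky_write(rows):
    attempts.append(len(rows))
    if len(attempts) == 1:
        raise RuntimeError("simulated transient failure")

watermark_after_failure = None
try:
    run_once(state, batch, flaky_write)
except RuntimeError:
    watermark_after_failure = state["watermark"]  # still 0

run_once(state, batch, flaky_write)  # the retry succeeds

print(watermark_after_failure, state["watermark"])  # 0 7
```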

## Requirements

- Python 3.8 or higher
- PySpark 3.0 or higher
- Delta Lake (`delta-spark`) 2.0 or higher (declared as a package dependency, so `pip` installs it)

## Contributing

Contributions are welcome. Please open an issue first to discuss what you'd like to change.

## License

MIT License - see LICENSE file for details.

## Support

- **Issues**: GitHub Issues
- **Discussions**: GitHub Discussions
- **Email**: maria212502@gmail.com

## Acknowledgments

Built by data engineers, for data engineers. Inspired by years of writing the same incremental loading code over and over.
