Metadata-Version: 2.4
Name: ntb-dq-framework
Version: 0.2.1
Summary: Data Quality framework for Databricks — rule-based checks across six dimensions with Delta table persistence
Author-email: Ake-Adul <adul.chu@turbo.co.th>
License: MIT
Requires-Python: >=3.11
Provides-Extra: dev
Requires-Dist: chispa; extra == 'dev'
Requires-Dist: hypothesis; extra == 'dev'
Requires-Dist: pytest-cov; extra == 'dev'
Requires-Dist: pytest>=7.0; extra == 'dev'
Provides-Extra: spark
Requires-Dist: pyspark>=3.4; extra == 'spark'
Description-Content-Type: text/markdown

# NTB DQ Framework

Data Quality framework for Databricks — rule-based checks across six dimensions with Delta table persistence.

## Installation

```bash
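pip install ntb-dq-framework

# Outside Databricks, where PySpark is not preinstalled, pull it in via the `spark` extra
pip install "ntb-dq-framework[spark]"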
```

Requires Python >= 3.11.

## Quick Start

```python
from pyspark.sql import SparkSession
from ntb_dq_framework import DQEngine

spark = SparkSession.builder.getOrCreate()

# Initialize — creates Delta tables if they don't exist yet
engine = DQEngine(spark, catalog="my_catalog")

# Register a rule
rule_id = engine.add_rule(
    rule_name="null_email",
    dimension="Completeness",
    target_table="my_catalog.my_schema.customers",
    target_column="email",
    rule_type="null_check",
    severity="critical",
    owner="data-team",
)

# Run all active rules for a table
summary = engine.run_checks("my_catalog.my_schema.customers")
print(f"Passed: {summary.total_rows_passed}, Failed: {summary.total_rows_failed}")

# Query results
results_df = engine.get_results(run_id=summary.run_id)
results_df.show()
```

## Dimensions and Rule Types

> **How rules work:** You always define what **valid (good) data** looks like. The framework then reports any rows that **don't** match as failures. For example, `allowed_values=["Male", "Female"]` means rows with those values **pass** — anything else **fails**. This applies to all rule types: regex patterns describe valid formats, range bounds describe acceptable values, business rules describe conditions that should be true, etc.

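The pass/fail convention can be illustrated in plain Python. This is only a sketch of the semantics, not the framework's Spark implementation; `split_by_rule` and the sample rows are hypothetical names introduced for illustration.

```python
from typing import Callable, Iterable


def split_by_rule(
    rows: Iterable[dict], is_valid: Callable[[dict], bool]
) -> tuple[list[dict], list[dict]]:
    """Partition rows into (passed, failed) using a predicate that describes VALID data."""
    passed, failed = [], []
    for row in rows:
        (passed if is_valid(row) else failed).append(row)
    return passed, failed


rows = [{"gender": "Male"}, {"gender": "Female"}, {"gender": "unknown"}]
allowed = {"Male", "Female"}

# The rule states what good data looks like; everything else is reported as a failure.
passed, failed = split_by_rule(rows, lambda r: r["gender"] in allowed)
print(len(passed), len(failed))  # 2 1
```
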
### Accuracy
| Rule Type | Description | Key Parameters |
|---|---|---|
| `reference_match` | Joins target with a reference table and reports mismatched rows | `reference_table`, `key_column`, `comparison_column` |
| `tolerance_match` | Like `reference_match`, but allows a numeric tolerance | `reference_table`, `key_column`, `comparison_column`, `tolerance` |
| `reconciliation` | Compares aggregate values (sum/count) between target and reference | `reference_table`, `aggregate_column`, `aggregate_function` |

### Completeness
| Rule Type | Description | Key Parameters |
|---|---|---|
| `null_check` | Counts rows where the target column is null or empty | — |
| `missing_partition` | Compares expected date partitions against actual values | `expected_partitions`, `partition_column` |
| `volume_check` | Checks row count against a baseline threshold | `baseline`, `threshold_percentage` |

### Consistency
| Rule Type | Description | Key Parameters |
|---|---|---|
| `referential_integrity` | Left anti-join to find orphan rows missing from a reference table | `reference_table`, `key_column` |
| `cross_system_match` | Joins on key columns and reports rows where compared columns differ | `reference_table`, `key_columns`, `comparison_columns` |
| `business_rule` | Evaluates a SQL boolean expression and reports rows where it's false | Uses `rule_expression` |

### Timeliness
| Rule Type | Description | Key Parameters |
|---|---|---|
| `freshness_check` | Computes delay in hours since the latest timestamp value | — |
| `sla_check` | Fails when freshness delay exceeds a threshold | `sla_threshold_hours` |

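The arithmetic behind the two timeliness checks can be sketched in plain Python. The function names are hypothetical, and treating a delay exactly equal to the threshold as a pass is an assumption drawn from the word "exceeds"; the framework computes this in Spark and may differ in detail.

```python
from datetime import datetime, timezone


def freshness_delay_hours(latest: datetime, now: datetime) -> float:
    """Hours elapsed between the latest timestamp in the column and `now`."""
    return (now - latest).total_seconds() / 3600


def sla_passes(delay_hours: float, sla_threshold_hours: float) -> bool:
    """The SLA check fails only when the delay exceeds the threshold."""
    return delay_hours <= sla_threshold_hours


now = datetime(2024, 1, 2, 12, 0, tzinfo=timezone.utc)
latest = datetime(2024, 1, 2, 6, 0, tzinfo=timezone.utc)

delay = freshness_delay_hours(latest, now)
print(delay)                  # 6.0
print(sla_passes(delay, 4))   # False
print(sla_passes(delay, 24))  # True
```
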
### Validity
| Rule Type | Description | Key Parameters |
|---|---|---|
| `regex_check` | Reports non-null rows not matching a regex pattern (nulls are excluded; column is auto-cast to string) | Uses `rule_expression` |
| `allowed_values` | Reports rows with values not in an allowed list | `allowed_values` |
| `range_check` | Reports rows outside min/max bounds | `min`, `max` |
| `type_check` | Reports rows that can't be cast to a target type | `target_type` |

### Uniqueness
| Rule Type | Description | Key Parameters |
|---|---|---|
| `duplicate_check` | Groups by key columns and identifies duplicate groups | `key_columns` |

## API Reference

### DQEngine

```python
engine = DQEngine(spark, catalog="my_catalog", schema="my_schema")
```

| Parameter | Type | Default | Description |
|---|---|---|---|
| `spark` | `SparkSession` | — | Active Spark session |
| `catalog` | `str` | — | Unity Catalog name |
| `schema` | `str` | `"data_quality"` | Schema that holds the DQ tables |
| `table_prefix` | `str` | `"dq_"` | Prefix for all Delta table names |
| `max_failed_samples` | `int` | `100` | Max failing rows to sample per rule |

| Method | Description |
|---|---|
| `add_rule(...)` | Register a rule. Returns the `rule_id`. Deduplicates identical configs automatically. See [add_rule Parameters](#add_rule-parameters) below. |
| `deactivate_rule(rule_id)` | Soft-delete a rule by setting `is_active = False`. |
| `run_checks(target_table, df=None)` | Execute all active rules for a table. Returns a `RunSummary` (includes `critical_failed_rows_df` — see [Filtering Critical Failures](#filtering-critical-failures)). |
| `get_results(run_id=None, rule_id=None, dimension=None, target_table=None, date_range=None)` | Query the results table with optional filters. Returns a Spark DataFrame. |
| `daily_summary(run_date=None)` | Aggregate run log data by date and target table. |
| `dimension_summary(run_date=None)` | Aggregate results by date, dimension, and target table. |
| `rule_trend(rule_id=None, days=30)` | Result data grouped by rule and date over a time window. |
| `top_offenders(run_date=None, top_n=10)` | Top N rules with the lowest pass rate. |

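The query and monitoring methods can be combined into one helper for a dashboard or notebook. This is a sketch that uses only the keyword arguments listed in the table above; `collect_monitoring_views` is a hypothetical name, and the return types of the summary methods are not stated here, so the helper simply passes through whatever each call returns.

```python
def collect_monitoring_views(engine, run_id, target_table, top_n=5, days=7):
    """Gather the per-run results and the aggregate views in one dict.

    `engine` is expected to be a DQEngine instance; the values are returned as-is.
    """
    return {
        "results": engine.get_results(run_id=run_id, target_table=target_table),
        "daily": engine.daily_summary(),
        "by_dimension": engine.dimension_summary(),
        "trend": engine.rule_trend(days=days),
        "offenders": engine.top_offenders(top_n=top_n),
    }
```

With an engine and a `RunSummary` in scope, a call might look like `collect_monitoring_views(engine, summary.run_id, "my_catalog.my_schema.customers")`.
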
### add_rule Parameters

```python
engine.add_rule(
    rule_name="...",
    dimension="...",
    target_table="...",
    rule_type="...",
    rule_expression="",        # SQL expression or regex pattern (for regex_check, business_rule)
    target_column=None,        # Column to check
    parameters=None,           # Dict of rule-type-specific parameters (see below)
    severity="warning",        # "critical" or "warning"
    owner="",
)
```

| Parameter | Type | Default | Description |
|---|---|---|---|
| `rule_name` | `str` | — | Human-readable name for the rule |
| `dimension` | `str` | — | One of `Accuracy`, `Completeness`, `Consistency`, `Timeliness`, `Validity`, `Uniqueness` |
| `target_table` | `str` | — | Fully qualified table name (e.g., `catalog.schema.table`) |
| `rule_type` | `str` | — | Check type (see [Dimensions and Rule Types](#dimensions-and-rule-types)) |
| `rule_expression` | `str` | `""` | SQL expression or regex pattern (used by `regex_check`, `business_rule`) |
| `target_column` | `str \| None` | `None` | Column to evaluate |
| `parameters` | `dict \| None` | `None` | Rule-type-specific parameters (see Key Parameters in each dimension table) |
| `severity` | `str` | `"warning"` | `"critical"` or `"warning"` |
| `owner` | `str` | `""` | Owner or team responsible for this rule |

> **Note:** Rule-type-specific options like `allowed_values`, `min`, `max`, `key_columns`, etc. must be passed inside the `parameters` dict — not as top-level keyword arguments.

**Examples:**

```python
# Validity — allowed_values
engine.add_rule(
    rule_name="valid_gender",
    dimension="Validity",
    target_table="my_catalog.my_schema.customers",
    target_column="gender",
    rule_type="allowed_values",
    severity="warning",
    owner="data-team",
    parameters={"allowed_values": ["Male", "Female"]},
)

# Validity — range_check
engine.add_rule(
    rule_name="valid_age",
    dimension="Validity",
    target_table="my_catalog.my_schema.customers",
    target_column="age",
    rule_type="range_check",
    parameters={"min": 0, "max": 120},
)

# Uniqueness — duplicate_check
engine.add_rule(
    rule_name="unique_order",
    dimension="Uniqueness",
    target_table="my_catalog.my_schema.orders",
    rule_type="duplicate_check",
    parameters={"key_columns": ["order_id"]},
)
```
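
Rules for the other dimensions follow the same shape. The sketch below builds the keyword arguments as plain dicts so they can be reviewed or version-controlled before registration. The parameter keys come from the rule type tables; the table names, column names, and values are placeholders, and using `target_column` as the timestamp column for `sla_check` is an assumption.

```python
rule_specs = [
    {   # Consistency: orders whose customer is missing from the reference table
        "rule_name": "orders_have_customer",
        "dimension": "Consistency",
        "target_table": "my_catalog.my_schema.orders",
        "rule_type": "referential_integrity",
        "severity": "critical",
        "parameters": {
            "reference_table": "my_catalog.my_schema.customers",
            "key_column": "customer_id",
        },
    },
    {   # Consistency: a SQL boolean expression that must hold for every row
        "rule_name": "end_after_start",
        "dimension": "Consistency",
        "target_table": "my_catalog.my_schema.contracts",
        "rule_type": "business_rule",
        "rule_expression": "end_date >= start_date",
    },
    {   # Timeliness: fail when the newest timestamp is older than 24 hours
        "rule_name": "orders_fresh_within_24h",
        "dimension": "Timeliness",
        "target_table": "my_catalog.my_schema.orders",
        "target_column": "updated_at",  # assumed timestamp column
        "rule_type": "sla_check",
        "parameters": {"sla_threshold_hours": 24},
    },
]

# Quick sanity check that every spec carries the required arguments.
REQUIRED = {"rule_name", "dimension", "target_table", "rule_type"}
missing = [s.get("rule_name") for s in rule_specs if not REQUIRED <= s.keys()]
print(missing)  # []

# With a DQEngine instance in scope, each spec could then be registered:
# rule_ids = [engine.add_rule(**spec) for spec in rule_specs]
```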

### Severity Levels

Rules support two severity levels: `critical` and `warning`.

- `critical` — **all** failed rows are saved to `dq_failed_records` (no sampling cap) and collected into `RunSummary.critical_failed_rows_df` so callers can filter them out before writing.
- `warning` — only a sample of failed rows (up to `max_failed_samples`) is saved to `dq_failed_records`.

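The difference between the two levels comes down to how many failed rows are kept. A minimal sketch in plain Python, assuming the sample is simply the first `max_failed_samples` rows (how the framework actually picks its sample is not specified here); `rows_to_persist` is a hypothetical name:

```python
def rows_to_persist(failed_rows: list, severity: str, max_failed_samples: int = 100) -> list:
    """Return the failed rows that would be written for a rule of the given severity."""
    if severity == "critical":
        return list(failed_rows)  # no cap
    if severity == "warning":
        return list(failed_rows[:max_failed_samples])  # capped sample
    raise ValueError(f"unknown severity: {severity!r}")


failed = list(range(250))
print(len(rows_to_persist(failed, "critical")))  # 250
print(len(rows_to_persist(failed, "warning")))   # 100
```
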
### Filtering Critical Failures

`run_checks()` returns a `RunSummary` with `critical_failed_rows_df` — the full (uncapped) union of all failed rows from critical-severity rules. Use it to filter out bad records before upserting:

```python
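# `df` is the DataFrame about to be written, e.g. an incoming batch loaded upstream.
# It is passed through the optional `df` argument of run_checks.
summary = engine.run_checks("my_catalog.my_schema.customers", df=df)

if summary.critical_failed_rows_df is not None:
    # The anti-join needs an explicit key. `customer_id` is a placeholder for your
    # table's key column(s), assuming the failed-rows DataFrame carries the source columns.
    clean_df = df.join(summary.critical_failed_rows_df, on="customer_id", how="left_anti")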
    clean_df.write.mode("overwrite").saveAsTable("my_catalog.my_schema.customers")
else:
    df.write.mode("overwrite").saveAsTable("my_catalog.my_schema.customers")
```

### Notes on `rule_expression`

When `rule_expression` contains backslashes (a regex pattern for `regex_check`, or a SQL expression with an embedded regex for `business_rule`), write it as a Python raw string so the backslashes are passed through unchanged:

```python
engine.add_rule(
    rule_name="13_digit_id",
    dimension="Validity",
    target_table="my_catalog.my_schema.employees",
    target_column="id_card",
    rule_type="regex_check",
    rule_expression=r"^\d{13}$",  # raw string — backslashes are preserved
    severity="warning",
    owner="data-team",
)
```
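
The effect of the `r` prefix can be checked locally. The snippet uses Python's `re` module only to illustrate string escaping; the framework evaluates patterns in Spark, whose regex dialect can differ from Python's in places.

```python
import re

pattern = r"^\d{13}$"
print(bool(re.fullmatch(pattern, "1234567890123")))  # True
print(bool(re.fullmatch(pattern, "12345")))          # False

# Without the r prefix, Python interprets some escapes before the pattern is stored:
# "\b" becomes a single backspace character instead of backslash + "b".
print(len("\b"), len(r"\b"))  # 1 2
```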

### SQL Guard — Blocked Patterns

The framework includes a built-in SQL guard that validates all user-supplied expressions and identifiers before they reach Spark SQL. If a blocked pattern is detected, a `DQValidationError` is raised.

**Blocked DDL/DML statements:**

`DROP`, `ALTER`, `CREATE`, `INSERT`, `UPDATE`, `DELETE`, `TRUNCATE`, `MERGE`, `GRANT`, `REVOKE`

**Blocked injection patterns:**

| Pattern | Example |
|---|---|
| SQL comments | `--`, `/* */` |
| `UNION SELECT` | `UNION ALL SELECT ...` |
| `INTO OUTFILE/DUMPFILE` | File exfiltration |
| `EXEC()` / `EXECUTE()` | Stored procedure execution |
| `xp_*` / `sp_*` | SQL Server system procedures |

**Allowed — normal SQL boolean expressions:**

```python
"total_amount > 0"
"end_date >= start_date"
"status IN ('active', 'pending')"
"price * quantity = total AND discount >= 0"
"COALESCE(email, '') != ''"
```
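
To make the blocked patterns concrete, here is a rough sketch of how such a guard could be written with regular expressions. It is not the framework's implementation: `check_expression` is a hypothetical helper, and `DQValidationError` is redefined locally as a stand-in so the snippet runs on its own.

```python
import re


class DQValidationError(ValueError):
    """Local stand-in for the framework's exception of the same name."""


_BLOCKED = [
    r"\b(DROP|ALTER|CREATE|INSERT|UPDATE|DELETE|TRUNCATE|MERGE|GRANT|REVOKE)\b",
    r"--|/\*",                         # SQL comments
    r"\bUNION\s+(ALL\s+)?SELECT\b",
    r"\bINTO\s+(OUTFILE|DUMPFILE)\b",
    r"\bEXEC(UTE)?\s*\(",
    r"\b(xp|sp)_\w+",                  # SQL Server system procedures
]
_BLOCKED_RE = re.compile("|".join(f"(?:{p})" for p in _BLOCKED), re.IGNORECASE)


def check_expression(expr: str) -> str:
    """Return the expression unchanged, or raise if it contains a blocked pattern."""
    match = _BLOCKED_RE.search(expr)
    if match:
        raise DQValidationError(f"blocked pattern {match.group(0)!r} in expression")
    return expr


print(check_expression("price * quantity = total AND discount >= 0"))
try:
    check_expression("1 = 1; DROP TABLE customers")
except DQValidationError as exc:
    print("rejected:", exc)
```

A keyword-based sketch like this one also rejects blocked words inside string literals (for example `status = 'delete'`); whether the real guard behaves the same way is not stated here.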

## Project Structure

```
ntb_dq_framework/
├── __init__.py            # Public API — exports DQEngine
├── engine.py              # DQEngine entry point
├── models.py              # RuleConfig, CheckResult, RunSummary dataclasses
├── rule_manager.py        # CRUD operations on the rule registry
├── run_executor.py        # Orchestrates check execution
├── result_writer.py       # Persists results to Delta tables
├── table_initializer.py   # Creates Delta tables on first run
├── monitoring.py          # Aggregation views for dashboards
├── exceptions.py          # DQValidationError
└── checks/
    ├── base.py            # BaseCheck abstract class
    ├── accuracy.py
    ├── completeness.py
    ├── consistency.py
    ├── timeliness.py
    ├── validity.py
    └── uniqueness.py
```

## Delta Tables

On initialization, the framework creates four Delta tables (prefixed with `dq_` by default):

| Table | Purpose |
|---|---|
| `dq_rule_registry` | Stores rule definitions with activation status |
| `dq_run_log` | Tracks each run execution with summary stats (partitioned by `target_table`) |
| `dq_result_table` | Per-rule results for every run (partitioned by `target_table`) |
| `dq_failed_records` | Failing rows — all rows for critical rules, sampled up to `max_failed_samples` for warning rules (partitioned by `target_table`) |

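The table names follow from the `DQEngine` constructor arguments. The helper below is a sketch that assumes each table is addressed as `catalog.schema.<table_prefix><suffix>`, with the suffixes read off the default names above; `dq_table_names` is a hypothetical function, not part of the package.

```python
TABLE_SUFFIXES = ("rule_registry", "run_log", "result_table", "failed_records")


def dq_table_names(
    catalog: str, schema: str = "data_quality", table_prefix: str = "dq_"
) -> list[str]:
    """Build the fully qualified names of the four DQ tables."""
    return [f"{catalog}.{schema}.{table_prefix}{suffix}" for suffix in TABLE_SUFFIXES]


for name in dq_table_names("my_catalog"):
    print(name)
# my_catalog.data_quality.dq_rule_registry
# my_catalog.data_quality.dq_run_log
# my_catalog.data_quality.dq_result_table
# my_catalog.data_quality.dq_failed_records
```
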
## Changelog

### 0.2.1

- `run_checks` now returns `critical_failed_rows_df` on `RunSummary` — the full (uncapped) union of all failed rows from critical-severity rules, so callers can filter bad records before upserting.
- Critical-severity rules now save **all** failed rows to `dq_failed_records` (no sampling cap). Warning rules still save up to `max_failed_samples`.
- Delta tables (`dq_run_log`, `dq_result_table`, `dq_failed_records`) are now partitioned by `target_table` with `WriteSerializable` isolation level, enabling parallel pipeline execution without `MetadataChangedException`.

### 0.2.0

- `run_checks` no longer raises `DQRunError` when critical-severity rules fail or error. The method now returns a `RunSummary` containing all results (including critical failures), allowing the pipeline to continue. Results are still persisted to Delta tables as before.

### 0.1.5

- Initial release with six-dimension rule-based checks and Delta table persistence.
