Data Quality Monitoring

Learn to monitor contract (row-based) data quality, track violations, and set up alerts. For pipeline (column/dataset-based) quality checks on ETL runs (e.g. row count, null rate), see Pipeline quality and Data quality: contract vs pipeline.

What You'll Learn

  • Run quality checks on your data
  • Calculate quality metrics
  • Set up threshold alerts
  • Track and query violations
  • Generate quality reports
  • Integrate with pipelines

Prerequisites

pip install pycharter

Part 1: Quick Start with Convenience Functions

The fastest way to run a quality check is the check_quality() function. Pass it a contract and data, and it returns a report:

from pycharter import check_quality

contract = {
    "schema": {
        "version": "1.0.0",
        "properties": {
            "name": {"type": "string", "minLength": 1},
            "email": {"type": "string", "format": "email"},
            "age": {"type": "integer", "minimum": 0, "maximum": 150}
        },
        "required": ["name", "email"]
    }
}

data = [
    {"name": "Alice", "email": "alice@example.com", "age": 30},
    {"name": "Bob", "email": "invalid-email", "age": 25},
    {"name": "", "email": "charlie@example.com", "age": -5},
]

report = check_quality(contract=contract, data=data)

print(f"Quality Score: {report.quality_score.overall_score:.1f}/100")
print(f"Records: {report.record_count}")
print(f"Valid: {report.valid_count}")
print(f"Invalid: {report.invalid_count}")

Presets

QualityCheckOptions offers three presets for common scenarios:

from pycharter import check_quality, QualityCheckOptions

# basic() — metrics + violations, no profiling or thresholds (default)
report = check_quality(contract, data)

# strict() — everything on, including thresholds with defaults
report = check_quality(contract, data, options=QualityCheckOptions.strict())
if not report.passed:
    print(f"Threshold breaches: {report.threshold_breaches}")

# monitoring() — strict + skip-if-unchanged + dedup violations
report = check_quality(contract, data, options=QualityCheckOptions.monitoring())
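
| Preset | Metrics | Violations | Profiling | Thresholds | Skip unchanged | Dedup |
| --- | --- | --- | --- | --- | --- | --- |
| basic() | Yes | Yes | No | No | No | Yes |
| strict() | Yes | Yes | Yes | Yes (defaults) | No | Yes |
| monitoring() | Yes | Yes | Yes | Yes (defaults) | Yes | Yes |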
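
To make "skip unchanged" and "dedup" concrete, here is a minimal standard-library sketch of one way such behavior could work. It is an illustration only, not pycharter's implementation; the names fingerprint, UnchangedSkipper, and dedup_violations are hypothetical.

```python
import hashlib
import json


def fingerprint(obj) -> str:
    """Stable SHA-256 hash of any JSON-serializable object."""
    payload = json.dumps(obj, sort_keys=True, default=str).encode("utf-8")
    return hashlib.sha256(payload).hexdigest()


class UnchangedSkipper:
    """Remembers the last dataset fingerprint so identical data can be skipped."""

    def __init__(self):
        self._last = None

    def should_run(self, data) -> bool:
        current = fingerprint(data)
        if current == self._last:
            return False  # same data as the previous call: nothing new to check
        self._last = current
        return True


def dedup_violations(violations):
    """Keep only the first occurrence of each identical violation."""
    seen = set()
    unique = []
    for violation in violations:
        key = fingerprint(violation)
        if key not in seen:
            seen.add(key)
            unique.append(violation)
    return unique


skipper = UnchangedSkipper()
batch = [{"name": "Bob", "email": "invalid-email"}]
first_run = skipper.should_run(batch)   # first time this data is seen
second_run = skipper.should_run(batch)  # identical data on the next call

unique = dedup_violations([
    {"field": "email", "type": "format"},
    {"field": "email", "type": "format"},
    {"field": "age", "type": "minimum"},
])
print(first_run, second_run, len(unique))
```

Hashing a sorted JSON dump keeps the fingerprint independent of key order, which is why two dictionaries with the same content compare as unchanged.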

Standalone Data Profiling

Profile a dataset without a contract using profile_data():

from pycharter import profile_data

profile = profile_data(data)
print(f"Records: {profile['record_count']}")
print(f"Completeness: {profile['overall_stats']['completeness']:.1f}%")

for field, stats in profile["field_profiles"].items():
    print(f"  {field}: {stats['null_count']} nulls, {stats['distinct_count']} distinct")

Using the QualityCheck Class

For store-backed schemas, database persistence, or advanced options, use the QualityCheck class directly:

from pycharter import QualityCheck, SQLiteMetadataStore

store = SQLiteMetadataStore("quality.db")
store.connect()

schema = {
    "type": "object",
    "version": "1.0.0",
    "properties": {
        "name": {"type": "string", "minLength": 1},
        "email": {"type": "string", "format": "email"},
        "age": {"type": "integer", "minimum": 0, "maximum": 150}
    },
    "required": ["name", "email"]
}
schema_id = store.store_schema("user", schema, version="1.0.0")

check = QualityCheck(store=store)
report = check.run(
    schema_id=schema_id,
    data=[
        {"name": "Alice", "email": "alice@example.com", "age": 30},
        {"name": "Bob", "email": "invalid-email", "age": 25},
        {"name": "", "email": "charlie@example.com", "age": -5},
    ]
)

print(f"Quality Score: {report.quality_score.overall_score:.1f}/100")

There is also a convenience function for store-based checks:

from pycharter import check_quality_with_store

report = check_quality_with_store(
    store=store,
    contract_name="user",
    contract_version="1.0.0",
    data=data,
)

Part 2: Quality Metrics

Understanding Metrics

| Metric | Description | Formula |
| --- | --- | --- |
| overall_score | Overall quality (0-100) | Weighted combination |
| violation_rate | Share of records with errors (0-1) | invalid / total |
| completeness | Share of required fields that are non-null (0-1) | filled / required |
| accuracy | Share of values that are valid (0-1) | valid_values / total_values |
| field_scores | Per-field quality scores | Varies by field |
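
The formulas above can be worked through by hand on the three sample records from Part 1. The sketch below uses simplified stand-in rules (for example, an email only needs to contain "@") rather than pycharter's validator, and it assumes that "filled" means a required field is not None. The library's exact definitions may differ.

```python
REQUIRED = ["name", "email"]

# Simplified stand-ins for the contract rules (assumption, not pycharter's validator)
RULES = {
    "name": lambda v: isinstance(v, str) and len(v) >= 1,
    "email": lambda v: isinstance(v, str) and "@" in v,
    "age": lambda v: isinstance(v, int) and 0 <= v <= 150,
}

data = [
    {"name": "Alice", "email": "alice@example.com", "age": 30},
    {"name": "Bob", "email": "invalid-email", "age": 25},
    {"name": "", "email": "charlie@example.com", "age": -5},
]


def compute_metrics(records):
    invalid_records = 0
    required_slots = filled_slots = 0
    total_values = valid_values = 0

    for record in records:
        record_ok = True
        for field, rule in RULES.items():
            value = record.get(field)
            if field in REQUIRED:
                required_slots += 1
                if value is None:
                    record_ok = False  # a missing required field fails the record
                else:
                    filled_slots += 1
            if value is None:
                continue  # absent value: nothing to validate
            total_values += 1
            if rule(value):
                valid_values += 1
            else:
                record_ok = False
        if not record_ok:
            invalid_records += 1

    def ratio(numerator, denominator):
        return numerator / denominator if denominator else 0.0

    return {
        "violation_rate": ratio(invalid_records, len(records)),  # invalid / total
        "completeness": ratio(filled_slots, required_slots),     # filled / required
        "accuracy": ratio(valid_values, total_values),           # valid_values / total_values
    }


metrics = compute_metrics(data)
for name, value in metrics.items():
    print(f"{name}: {value:.2%}")
```

Under these assumptions, two of the three records are invalid, all six required slots are filled, and six of the nine values pass their rule.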

Accessing Metrics

report = check.run(schema_id=schema_id, data=data)

# Overall metrics
score = report.quality_score
print(f"Overall Score: {score.overall_score:.1f}")
print(f"Violation Rate: {score.violation_rate:.2%}")
print(f"Completeness: {score.completeness:.2%}")
print(f"Accuracy: {score.accuracy:.2%}")

# Record counts
print(f"Total Records: {report.record_count}")
print(f"Valid Records: {report.valid_count}")
print(f"Invalid Records: {report.invalid_count}")
print(f"Violations: {report.violation_count}")

# Field-level scores
for field, field_score in score.field_scores.items():
    print(f"  {field}: {field_score:.1f}")

Field Metrics

# Get detailed field metrics
for field_name, metrics in report.field_metrics.items():
    print(f"\n{field_name}:")
    print(f"  Null Count: {metrics.null_count}")
    print(f"  Violation Count: {metrics.violation_count}")
    print(f"  Total Count: {metrics.total_count}")
    print(f"  Violation Rate: {metrics.violation_rate:.2%}")
    print(f"  Completeness: {metrics.completeness:.2%}")

    if metrics.error_types:
        print(f"  Error Types: {metrics.error_types}")

Part 3: Quality Thresholds

Set alerts when quality drops below acceptable levels.

Defining Thresholds

from pycharter import QualityCheck, QualityThresholds

# Define thresholds
thresholds = QualityThresholds(
    min_overall_score=95.0,      # Alert if score < 95
    max_violation_rate=0.05,     # Alert if violations > 5%
    min_completeness=0.99,       # Alert if completeness < 99%
    min_accuracy=0.98,           # Alert if accuracy < 98%
)

# Run check with thresholds
report = check.run(
    schema_id=schema_id,
    data=data,
    thresholds=thresholds
)

# Check if thresholds passed
print(f"Passed: {report.passed}")
print(f"Threshold Breaches: {report.threshold_breaches}")

Handling Threshold Breaches

report = check.run(
    schema_id=schema_id,
    data=data,
    thresholds=thresholds
)

if not report.passed:
    print("⚠️ Quality thresholds breached!")

    for breach in report.threshold_breaches:
        print(f"  - {breach}")

    # Take action (send_alert is a placeholder for your own notification hook)
    send_alert(
        message=f"Data quality dropped to {report.quality_score.overall_score:.1f}",
        breaches=report.threshold_breaches
    )
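
The send_alert call above is not defined in this tutorial, so you need to supply your own. A minimal sketch, assuming you only want the alert written to the application log, might look like the following. The function name matches the call above, but its signature and behavior are assumptions for illustration.

```python
import logging

logger = logging.getLogger("data_quality")


def send_alert(message, breaches=None, channel=None):
    """Hypothetical alert hook: log a warning and return the text that was sent."""
    lines = [f"[{channel}] {message}" if channel else message]
    for breach in breaches or []:
        lines.append(f"  - {breach}")
    text = "\n".join(lines)
    logger.warning(text)
    return text


sent = send_alert(
    message="Data quality dropped to 66.7",
    breaches=["overall_score 66.7 < 95.0"],
)
print(sent)
```

Returning the formatted text makes the hook easy to test, and the body can later be swapped for an email, chat, or paging integration without touching the calling code.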

Per-Field Thresholds

from pycharter import QualityCheckOptions

options = QualityCheckOptions(
    calculate_metrics=True,
    record_violations=True,
    check_thresholds=True,
    thresholds=thresholds,
    field_thresholds={
        "email": {"max_violation_rate": 0.01},  # Strict for email
        "age": {"max_violation_rate": 0.10},    # Lenient for age
    }
)

report = check.run(
    schema_id=schema_id,
    data=data,
    options=options
)
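
Conceptually, a per-field threshold compares each field's observed violation rate against its own limit. The sketch below shows that comparison in plain Python. The helper find_field_breaches, the fallback to a default limit for unlisted fields, and the observed rates are all hypothetical, and pycharter's own evaluation may differ.

```python
def find_field_breaches(violation_rates, field_thresholds, default_max_rate):
    """Return a message for every field whose violation rate exceeds its limit."""
    breaches = []
    for field, rate in sorted(violation_rates.items()):
        # Fields without an explicit entry fall back to the default limit
        limit = field_thresholds.get(field, {}).get("max_violation_rate", default_max_rate)
        if rate > limit:
            breaches.append(f"{field}: violation rate {rate:.2%} exceeds {limit:.2%}")
    return breaches


# Made-up observed rates for illustration
observed = {"email": 0.03, "age": 0.08, "name": 0.06}
field_thresholds = {
    "email": {"max_violation_rate": 0.01},  # Strict for email
    "age": {"max_violation_rate": 0.10},    # Lenient for age
}

breaches = find_field_breaches(observed, field_thresholds, default_max_rate=0.05)
for breach in breaches:
    print(breach)
```

Here email breaches its strict 1% limit, age stays within its lenient 10% limit, and name breaches the 5% default.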

Part 4: Violation Tracking

Track individual validation errors for analysis and remediation.

Recording Violations

from pycharter import QualityCheckOptions

options = QualityCheckOptions(
    record_violations=True,  # Enable violation tracking
    calculate_metrics=True
)

report = check.run(
    schema_id=schema_id,
    data=data,
    options=options
)

print(f"Violations recorded: {report.violation_count}")

Querying Violations

# Get violations from the store
violations = store.query_violations(
    schema_id=schema_id,
    status="open",          # open, resolved, ignored
    severity="high",        # low, medium, high, critical
    limit=100
)

for violation in violations:
    print(f"Record ID: {violation.record_id}")
    print(f"Field: {violation.field_violations}")
    print(f"Severity: {violation.severity}")
    print(f"Timestamp: {violation.timestamp}")
    print("---")

Violation Analysis

# Aggregate violations by field
from collections import Counter

violations = store.query_violations(schema_id=schema_id)

field_counts = Counter()
error_types = Counter()

for v in violations:
    for field_error in v.field_violations:
        field_counts[field_error["field"]] += 1
        error_types[field_error["type"]] += 1

print("Violations by field:")
for field, count in field_counts.most_common():
    print(f"  {field}: {count}")

print("\nViolations by error type:")
for error_type, count in error_types.most_common():
    print(f"  {error_type}: {count}")

Resolving Violations

# Mark violation as resolved (`violation` is one item returned by store.query_violations)
store.resolve_violation(
    violation_id=violation.id,
    resolved_by="alice@example.com",
    resolution_note="Fixed upstream data source"
)

# Ignore false positive
store.update_violation_status(
    violation_id=violation.id,
    status="ignored",
    note="Known exception for legacy records"
)

Part 5: Quality Reports

Basic Report

report = check.run(schema_id=schema_id, data=data)

# Access report data
print(f"Schema: {report.schema_id}")
print(f"Version: {report.schema_version}")
print(f"Timestamp: {report.check_timestamp}")
print(f"Duration: {report.metadata.get('duration_seconds', 'N/A')}s")

Exporting Reports

import json

# To dictionary
report_dict = {
    "schema_id": report.schema_id,
    "timestamp": report.check_timestamp,
    "quality_score": {
        "overall": report.quality_score.overall_score,
        "violation_rate": report.quality_score.violation_rate,
        "completeness": report.quality_score.completeness,
        "accuracy": report.quality_score.accuracy,
    },
    "record_counts": {
        "total": report.record_count,
        "valid": report.valid_count,
        "invalid": report.invalid_count,
    },
    "passed": report.passed,
    "threshold_breaches": report.threshold_breaches,
}

# To JSON
with open("quality_report.json", "w") as f:
    json.dump(report_dict, f, indent=2, default=str)

# To CSV (metrics history)
import csv

with open("quality_history.csv", "a", newline="") as f:
    writer = csv.writer(f)
    writer.writerow([
        report.check_timestamp,
        report.quality_score.overall_score,
        report.quality_score.violation_rate,
        report.record_count,
        report.valid_count,
    ])

Historical Tracking

# Store quality metrics
metrics_id = store.store_quality_metrics(
    schema_id=schema_id,
    quality_score=report.quality_score,
    record_count=report.record_count,
    valid_count=report.valid_count,
    invalid_count=report.invalid_count,
    violation_count=report.violation_count,
)

# Query historical metrics
history = store.get_quality_metrics_history(
    schema_id=schema_id,
    limit=30  # Last 30 checks
)

# Analyze trends
import statistics

scores = [m.overall_score for m in history]
if scores:
    print(f"Average Score: {statistics.mean(scores):.1f}")
    print(f"Min Score: {min(scores):.1f}")
    print(f"Max Score: {max(scores):.1f}")
if len(scores) >= 2:  # statistics.stdev needs at least two data points
    print(f"Std Dev: {statistics.stdev(scores):.2f}")
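
Summary statistics describe the history but do not flag a decline. One simple approach, sketched below, compares the mean of the most recent checks against the mean of the earlier ones. The function detect_drop and its parameters are hypothetical, the scores are made up, and the list is assumed to be ordered from oldest to newest.

```python
import statistics


def detect_drop(scores, window=3, tolerance=2.0):
    """Return True if the mean of the last `window` scores fell more than
    `tolerance` points below the mean of the earlier scores."""
    if len(scores) < 2 * window:
        return False  # not enough history to compare
    baseline = statistics.mean(scores[:-window])
    recent = statistics.mean(scores[-window:])
    return baseline - recent > tolerance


# Made-up history, oldest first
history_scores = [97.0, 96.5, 97.2, 96.8, 92.0, 91.5, 90.8]
dropped = detect_drop(history_scores)
print(f"Quality drop detected: {dropped}")
```

If your history is returned newest first, reverse the list before passing it in.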

Part 6: Pipeline Integration

Quality Gate Pattern

import asyncio
from pycharter import Pipeline, QualityCheck, QualityThresholds

async def run_pipeline_with_quality_gate():
    # Run ETL pipeline
    pipeline = Pipeline.from_config_dir("pipelines/users/")
    result = await pipeline.run()

    if not result.success:
        raise RuntimeError(f"Pipeline failed: {result.errors}")

    # Run quality check on output.
    # `store` is the connected metadata store from Part 1; `read_output` is a
    # placeholder for your own loader that returns a list of records.
    check = QualityCheck(store=store)
    report = check.run(
        schema_id="user_schema",
        data=read_output("output/users.json"),
        thresholds=QualityThresholds(min_overall_score=95.0)
    )

    if not report.passed:
        # Quality gate failed - don't proceed
        raise RuntimeError(f"Quality gate failed: {report.threshold_breaches}")

    # Quality passed - continue to next stage
    print(f"Quality gate passed with score {report.quality_score.overall_score:.1f}")

asyncio.run(run_pipeline_with_quality_gate())
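
The quality gate relies on a read_output helper that this tutorial does not define. A minimal sketch, assuming the pipeline writes a JSON array of records, could be written as follows. The helper's behavior is an assumption, and the temporary file only stands in for output/users.json.

```python
import json
import tempfile
from pathlib import Path


def read_output(path):
    """Hypothetical helper: load a JSON array of records from disk."""
    with Path(path).open(encoding="utf-8") as f:
        records = json.load(f)
    if not isinstance(records, list):
        raise ValueError(f"{path} must contain a JSON array of records")
    return records


# Demo with a temporary file standing in for output/users.json
with tempfile.TemporaryDirectory() as tmp:
    sample = Path(tmp) / "users.json"
    sample.write_text(
        json.dumps([{"name": "Alice", "email": "alice@example.com", "age": 30}]),
        encoding="utf-8",
    )
    loaded = read_output(sample)

print(f"Loaded {len(loaded)} record(s)")
```

Failing fast on a non-list payload keeps a malformed output file from reaching the quality check as something other than records.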

Inline Validation with Metrics

import asyncio
from pycharter import Pipeline, CustomFunction, Validator

validator = Validator.from_file("contracts/user.yaml")
metrics = {"valid": 0, "invalid": 0, "errors": []}

def validate_with_metrics(records):
    valid_records = []
    for record in records:
        result = validator.validate(record)
        if result.is_valid:
            metrics["valid"] += 1
            valid_records.append(result.data.model_dump())
        else:
            metrics["invalid"] += 1
            metrics["errors"].extend(result.errors)
    return valid_records

# extractor and loader are placeholders for your configured pipeline stages
pipeline = (
    Pipeline(extractor)
    | CustomFunction(validate_with_metrics)
    | loader
)

result = asyncio.run(pipeline.run())

# Access metrics after run
total = metrics["valid"] + metrics["invalid"]
print(f"Valid: {metrics['valid']}")
print(f"Invalid: {metrics['invalid']}")
if total:  # avoid dividing by zero when no records were processed
    print(f"Quality: {metrics['valid'] / total:.2%}")

Scheduled Quality Checks

Use the monitoring() preset for recurring checks. It enables deduplication and skip-if-unchanged to avoid redundant work:

import schedule  # third-party package: pip install schedule
import time
from pycharter import check_quality, QualityCheckOptions

def run_quality_check():
    """Run quality check on production data."""
    data = fetch_production_data()  # placeholder: load the records to check

    # monitoring() preset: thresholds + dedup + skip-if-unchanged
    report = check_quality(
        contract="contracts/user.yaml",
        data=data,
        options=QualityCheckOptions.monitoring(),
    )

    if not report.passed:
        send_alert(
            channel="#data-quality",
            message=f"Quality alert: {report.threshold_breaches}"
        )

# Schedule hourly checks
schedule.every().hour.do(run_quality_check)

while True:
    schedule.run_pending()
    time.sleep(60)

Part 7: Best Practices

1. Set Realistic Thresholds

# Start with lenient thresholds
initial_thresholds = QualityThresholds(
    min_overall_score=80.0,
    max_violation_rate=0.20
)

# Tighten over time as data quality improves
production_thresholds = QualityThresholds(
    min_overall_score=95.0,
    max_violation_rate=0.05
)
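
One way to tighten thresholds gradually is to raise the minimum score in small steps, and only when recent checks clear the next level with some headroom. The sketch below illustrates that idea. The function next_min_score, its step and margin values, and the sample scores are assumptions, not a pycharter feature.

```python
def next_min_score(current_min, recent_scores, target=95.0, step=2.5, margin=1.0):
    """Raise the minimum score by one step if every recent score clears the
    next level with `margin` to spare; otherwise keep the current value."""
    if recent_scores and min(recent_scores) >= current_min + step + margin:
        return min(target, current_min + step)
    return current_min


# Made-up recent scores for illustration
raised = next_min_score(80.0, [90.1, 88.4, 91.0])  # all comfortably above 83.5
held = next_min_score(80.0, [81.0, 90.0])          # one score too close to the limit
capped = next_min_score(94.0, [99.0, 98.5])        # never exceeds the target
print(raised, held, capped)
```

The returned value can then be passed as min_overall_score when building the next QualityThresholds.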

2. Track Metrics Over Time

# Always store metrics for trend analysis
report = check.run(schema_id=schema_id, data=data)

store.store_quality_metrics(
    schema_id=schema_id,
    quality_score=report.quality_score,
    # ... other fields
)

3. Categorize Violations

# Use severity levels
def classify_violation(violation):
    """Map a field-level violation dict (with a "field" key) to a severity."""
    critical_fields = ["email", "payment_id"]
    high_fields = ["name", "phone"]

    field = violation["field"]

    if field in critical_fields:
        return "critical"
    elif field in high_fields:
        return "high"
    else:
        return "medium"

4. Automate Remediation

def auto_remediate(violation):
    """Attempt automatic fixes for known issues."""

    if violation.error_type == "string_pattern_mismatch":
        if violation.field == "phone":
            # Auto-fix: strip non-digits (fix_phone_number is your own helper)
            return fix_phone_number(violation.value)

    return None  # Cannot auto-fix
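
The fix_phone_number helper used above is left to the reader. A minimal sketch that follows the "strip non-digits" comment is shown below. Returning None when no digits remain is an added assumption, chosen to match the "cannot auto-fix" convention of auto_remediate.

```python
import re


def fix_phone_number(value):
    """Hypothetical helper: keep only the digits of a phone number.

    Returns None when nothing usable remains, so the caller can treat the
    value as not auto-fixable.
    """
    digits = re.sub(r"\D", "", str(value))
    return digits or None


print(fix_phone_number("(555) 123-4567"))
print(fix_phone_number("n/a"))
```

Real phone data usually needs more care (country codes, extensions), so treat this as a starting point rather than a complete normalizer.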

Exercises

  1. Basic Check: Run a quality check on a dataset and print all metrics.

  2. Thresholds: Set up thresholds and implement an alert system for breaches.

  3. Violation Analysis: Query violations and create a summary report by field and error type.

  4. Pipeline Integration: Add a quality gate to an ETL pipeline that stops if quality drops below 90%.

  5. Historical Tracking: Store quality metrics over time and plot a trend chart.

Next Steps