Data Quality Monitoring
Learn to monitor contract (row-based) data quality, track violations, and set up alerts. For pipeline (column/dataset-based) quality checks on ETL runs (e.g. row count, null rate), see Pipeline quality and Data quality: contract vs pipeline.
What You'll Learn
- Run quality checks on your data
- Calculate quality metrics
- Set up threshold alerts
- Track and query violations
- Generate quality reports
- Integrate with pipelines
Prerequisites
Part 1: Quick Start with Convenience Functions
The fastest way to run a quality check is the check_quality() function — pass a contract and data, get a report:
from pycharter import check_quality
contract = {
    "schema": {
        "version": "1.0.0",
        "properties": {
            "name": {"type": "string", "minLength": 1},
            "email": {"type": "string", "format": "email"},
            "age": {"type": "integer", "minimum": 0, "maximum": 150}
        },
        "required": ["name", "email"]
    }
}
data = [
    {"name": "Alice", "email": "alice@example.com", "age": 30},
    {"name": "Bob", "email": "invalid-email", "age": 25},
    {"name": "", "email": "charlie@example.com", "age": -5},
]
report = check_quality(contract=contract, data=data)
print(f"Quality Score: {report.quality_score.overall_score:.1f}/100")
print(f"Records: {report.record_count}")
print(f"Valid: {report.valid_count}")
print(f"Invalid: {report.invalid_count}")
Presets
QualityCheckOptions offers three presets for common scenarios:
from pycharter import check_quality, QualityCheckOptions
# basic() — metrics + violations, no profiling or thresholds (default)
report = check_quality(contract, data)
# strict() — everything on, including thresholds with defaults
report = check_quality(contract, data, options=QualityCheckOptions.strict())
if not report.passed:
    print(f"Threshold breaches: {report.threshold_breaches}")
# monitoring() — strict + skip-if-unchanged + dedup violations
report = check_quality(contract, data, options=QualityCheckOptions.monitoring())
| Preset | Metrics | Violations | Profiling | Thresholds | Skip unchanged | Dedup |
|---|---|---|---|---|---|---|
| basic() | Yes | Yes | No | No | No | Yes |
| strict() | Yes | Yes | Yes | Yes (defaults) | No | Yes |
| monitoring() | Yes | Yes | Yes | Yes (defaults) | Yes | Yes |
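The "Skip unchanged" column implies comparing each batch with the previous one. The source does not say how pycharter detects unchanged data, so the sketch below only illustrates one common approach, a content fingerprint, using the standard library. `fingerprint` and `should_skip` are hypothetical names, not pycharter functions.

```python
import hashlib
import json


def fingerprint(records):
    """Return a stable SHA-256 digest for a list of JSON-serializable records."""
    # sort_keys makes the digest independent of dict key order.
    payload = json.dumps(records, sort_keys=True, default=str)
    return hashlib.sha256(payload.encode("utf-8")).hexdigest()


def should_skip(records, last_fingerprint):
    """True when the batch has the same content as the previously seen batch."""
    return last_fingerprint is not None and fingerprint(records) == last_fingerprint


batch = [{"name": "Alice", "age": 30}]
previous = fingerprint(batch)

same_batch = [{"age": 30, "name": "Alice"}]     # same content, different key order
changed_batch = [{"name": "Alice", "age": 31}]  # one value differs

print(should_skip(same_batch, previous))
print(should_skip(changed_batch, previous))
```

A scheduler would keep the last fingerprint per contract and run the full check only when `should_skip` returns False.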
Standalone Data Profiling
Profile a dataset without a contract using profile_data():
from pycharter import profile_data
profile = profile_data(data)
print(f"Records: {profile['record_count']}")
print(f"Completeness: {profile['overall_stats']['completeness']:.1f}%")
for field, stats in profile["field_profiles"].items():
    print(f" {field}: {stats['null_count']} nulls, {stats['distinct_count']} distinct")
Using the QualityCheck Class
For store-backed schemas, database persistence, or advanced options, use the QualityCheck class directly:
from pycharter import QualityCheck, SQLiteMetadataStore
store = SQLiteMetadataStore("quality.db")
store.connect()
schema = {
    "type": "object",
    "version": "1.0.0",
    "properties": {
        "name": {"type": "string", "minLength": 1},
        "email": {"type": "string", "format": "email"},
        "age": {"type": "integer", "minimum": 0, "maximum": 150}
    },
    "required": ["name", "email"]
}
schema_id = store.store_schema("user", schema, version="1.0.0")
check = QualityCheck(store=store)
report = check.run(
    schema_id=schema_id,
    data=[
        {"name": "Alice", "email": "alice@example.com", "age": 30},
        {"name": "Bob", "email": "invalid-email", "age": 25},
        {"name": "", "email": "charlie@example.com", "age": -5},
    ]
)
print(f"Quality Score: {report.quality_score.overall_score:.1f}/100")
There is also a convenience function for store-based checks:
from pycharter import check_quality_with_store
report = check_quality_with_store(
    store=store,
    contract_name="user",
    contract_version="1.0.0",
    data=data,
)
Part 2: Quality Metrics
Understanding Metrics
| Metric | Description | Formula |
|---|---|---|
| overall_score | Overall quality (0-100) | Weighted combination |
| violation_rate | % of records with errors | invalid / total |
| completeness | % of non-null required fields | filled / required |
| accuracy | % of valid values | valid_values / total_values |
| field_scores | Per-field quality scores | Varies by field |
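To make the ratio formulas in the table concrete, here is a minimal sketch that applies them by hand to the three-record sample from Part 1. The counts are assumptions made for illustration (for example, an empty string is treated as a filled value), not output from pycharter, and `ratio` is a hypothetical helper. The weights behind overall_score are not given in this tutorial, so it is left out.

```python
def ratio(numerator, denominator):
    """Safe division that returns 0.0 when the denominator is zero."""
    return numerator / denominator if denominator else 0.0


# Hand counts for the Part 1 sample (assumed, not library output).
total_records = 3
invalid_records = 2       # Bob (bad email) and the third record (empty name, negative age)
required_slots = 3 * 2    # 3 records x 2 required fields (name, email)
filled_required = 6       # no required field is null; "" is treated as filled here
total_values = 3 * 3      # 3 records x 3 fields
valid_values = 6          # three values fail: one email, one name, one age

violation_rate = ratio(invalid_records, total_records)  # invalid / total
completeness = ratio(filled_required, required_slots)   # filled / required
accuracy = ratio(valid_values, total_values)            # valid_values / total_values

print(f"violation_rate={violation_rate:.2%}")
print(f"completeness={completeness:.2%}")
print(f"accuracy={accuracy:.2%}")
```

Comparing hand counts like these with the values in a real report is a quick way to confirm how the library treats edge cases such as empty strings.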
Accessing Metrics
report = check.run(schema_id=schema_id, data=data)
# Overall metrics
score = report.quality_score
print(f"Overall Score: {score.overall_score:.1f}")
print(f"Violation Rate: {score.violation_rate:.2%}")
print(f"Completeness: {score.completeness:.2%}")
print(f"Accuracy: {score.accuracy:.2%}")
# Record counts
print(f"Total Records: {report.record_count}")
print(f"Valid Records: {report.valid_count}")
print(f"Invalid Records: {report.invalid_count}")
print(f"Violations: {report.violation_count}")
# Field-level scores
for field, field_score in score.field_scores.items():
    print(f" {field}: {field_score:.1f}")
Field Metrics
# Get detailed field metrics
for field_name, metrics in report.field_metrics.items():
    print(f"\n{field_name}:")
    print(f" Null Count: {metrics.null_count}")
    print(f" Violation Count: {metrics.violation_count}")
    print(f" Total Count: {metrics.total_count}")
    print(f" Violation Rate: {metrics.violation_rate:.2%}")
    print(f" Completeness: {metrics.completeness:.2%}")
    if metrics.error_types:
        print(f" Error Types: {metrics.error_types}")
Part 3: Quality Thresholds
Set alerts when quality drops below acceptable levels:
Defining Thresholds
from pycharter import QualityCheck, QualityThresholds
# Define thresholds
thresholds = QualityThresholds(
    min_overall_score=95.0,   # Alert if score < 95
    max_violation_rate=0.05,  # Alert if violations > 5%
    min_completeness=0.99,    # Alert if completeness < 99%
    min_accuracy=0.98,        # Alert if accuracy < 98%
)
# Run check with thresholds
report = check.run(
    schema_id=schema_id,
    data=data,
    thresholds=thresholds
)
# Check if thresholds passed
print(f"Passed: {report.passed}")
print(f"Threshold Breaches: {report.threshold_breaches}")
Handling Threshold Breaches
report = check.run(
    schema_id=schema_id,
    data=data,
    thresholds=thresholds
)
if not report.passed:
    print("⚠️ Quality thresholds breached!")
    for breach in report.threshold_breaches:
        print(f" - {breach}")
    # Take action (send_alert is a placeholder for your own notification function)
    send_alert(
        message=f"Data quality dropped to {report.quality_score.overall_score:.1f}",
        breaches=report.threshold_breaches
    )
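The `send_alert` call above is not defined anywhere in this tutorial. As a minimal sketch, assuming you only need a log entry to start with, a stand-in could look like the following. The signature (`message`, `breaches`, `channel`) is a guess based on how the function is called in this document, and the breach string is made up for the example.

```python
import logging

logger = logging.getLogger("data_quality")


def send_alert(message, breaches=None, channel=None):
    """Hypothetical stand-in: log the alert and return the text that was logged."""
    lines = [message]
    if channel:
        lines.insert(0, f"[{channel}]")  # prefix with the target channel, if any
    for breach in breaches or []:
        lines.append(f"  - {breach}")
    text = "\n".join(lines)
    logger.warning(text)
    return text


alert_text = send_alert(
    "Data quality dropped to 82.5",
    breaches=["overall_score 82.5 < 95.0"],  # example breach text, not real output
)
```

Swapping the `logger.warning` call for an email, chat, or paging client keeps the calling code unchanged.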
Per-Field Thresholds
from pycharter import QualityCheckOptions
options = QualityCheckOptions(
    calculate_metrics=True,
    record_violations=True,
    check_thresholds=True,
    thresholds=thresholds,
    field_thresholds={
        "email": {"max_violation_rate": 0.01},  # Strict for email
        "age": {"max_violation_rate": 0.10},    # Lenient for age
    }
)
report = check.run(
    schema_id=schema_id,
    data=data,
    options=options
)
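Conceptually, a per-field threshold compares each field's observed violation rate with its configured limit. The sketch below illustrates that comparison in plain Python. It is not pycharter's implementation, `find_field_breaches` is a hypothetical name, and the observed rates are invented for the example.

```python
def find_field_breaches(field_violation_rates, field_thresholds):
    """Return a message for each field whose violation rate exceeds its limit."""
    breaches = []
    for field, limits in field_thresholds.items():
        limit = limits.get("max_violation_rate")
        rate = field_violation_rates.get(field)
        if limit is None or rate is None:
            continue  # nothing to compare for this field
        if rate > limit:
            breaches.append(f"{field}: violation rate {rate:.2%} exceeds {limit:.2%}")
    return breaches


field_thresholds = {
    "email": {"max_violation_rate": 0.01},
    "age": {"max_violation_rate": 0.10},
}
# Example rates, chosen for illustration only.
observed_rates = {"email": 0.05, "age": 0.02}

breaches = find_field_breaches(observed_rates, field_thresholds)
for breach in breaches:
    print(breach)
```

Here only the email field breaches its limit; a rate exactly equal to the limit is treated as passing.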
Part 4: Violation Tracking
Track individual validation errors for analysis and remediation:
Recording Violations
from pycharter import QualityCheckOptions
options = QualityCheckOptions(
    record_violations=True,  # Enable violation tracking
    calculate_metrics=True
)
report = check.run(
    schema_id=schema_id,
    data=data,
    options=options
)
print(f"Violations recorded: {report.violation_count}")
Querying Violations
# Get violations from the store
violations = store.query_violations(
    schema_id=schema_id,
    status="open",    # open, resolved, ignored
    severity="high",  # low, medium, high, critical
    limit=100
)
for violation in violations:
    print(f"Record ID: {violation.record_id}")
    print(f"Field: {violation.field_violations}")
    print(f"Severity: {violation.severity}")
    print(f"Timestamp: {violation.timestamp}")
    print("---")
Violation Analysis
# Aggregate violations by field
from collections import Counter
violations = store.query_violations(schema_id=schema_id)
field_counts = Counter()
error_types = Counter()
for v in violations:
    for field_error in v.field_violations:
        field_counts[field_error["field"]] += 1
        error_types[field_error["type"]] += 1
print("Violations by field:")
for field, count in field_counts.most_common():
    print(f" {field}: {count}")
print("\nViolations by error type:")
for error_type, count in error_types.most_common():
    print(f" {error_type}: {count}")
Resolving Violations
# Mark violation as resolved
store.resolve_violation(
    violation_id=violation.id,
    resolved_by="alice@example.com",
    resolution_note="Fixed upstream data source"
)
# Ignore false positive
store.update_violation_status(
    violation_id=violation.id,
    status="ignored",
    note="Known exception for legacy records"
)
Part 5: Quality Reports
Basic Report
report = check.run(schema_id=schema_id, data=data)
# Access report data
print(f"Schema: {report.schema_id}")
print(f"Version: {report.schema_version}")
print(f"Timestamp: {report.check_timestamp}")
print(f"Duration: {report.metadata.get('duration_seconds', 'N/A')}s")
Exporting Reports
import json
# To dictionary
report_dict = {
    "schema_id": report.schema_id,
    "timestamp": report.check_timestamp,
    "quality_score": {
        "overall": report.quality_score.overall_score,
        "violation_rate": report.quality_score.violation_rate,
        "completeness": report.quality_score.completeness,
        "accuracy": report.quality_score.accuracy,
    },
    "record_counts": {
        "total": report.record_count,
        "valid": report.valid_count,
        "invalid": report.invalid_count,
    },
    "passed": report.passed,
    "threshold_breaches": report.threshold_breaches,
}
# To JSON
with open("quality_report.json", "w") as f:
    json.dump(report_dict, f, indent=2, default=str)
# To CSV (metrics history)
import csv
with open("quality_history.csv", "a", newline="") as f:
    writer = csv.writer(f)
    writer.writerow([
        report.check_timestamp,
        report.quality_score.overall_score,
        report.quality_score.violation_rate,
        report.record_count,
        report.valid_count,
    ])
Historical Tracking
# Store quality metrics
metrics_id = store.store_quality_metrics(
    schema_id=schema_id,
    quality_score=report.quality_score,
    record_count=report.record_count,
    valid_count=report.valid_count,
    invalid_count=report.invalid_count,
    violation_count=report.violation_count,
)
# Query historical metrics
history = store.get_quality_metrics_history(
    schema_id=schema_id,
    limit=30  # Last 30 checks
)
# Analyze trends
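import statistics
scores = [m.overall_score for m in history]
if scores:  # mean, min, and max fail on an empty history
    print(f"Average Score: {statistics.mean(scores):.1f}")
    print(f"Min Score: {min(scores):.1f}")
    print(f"Max Score: {max(scores):.1f}")
if len(scores) >= 2:  # statistics.stdev needs at least two data points
    print(f"Std Dev: {statistics.stdev(scores):.2f}")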
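Summary statistics describe the history as a whole but not its direction. One simple way to spot a decline is to compare the average of the most recent checks with the average of the checks before them. The sketch below assumes the scores are ordered oldest to newest, which this tutorial does not state for `get_quality_metrics_history`; `score_trend` is a hypothetical helper and the sample values are made up.

```python
import statistics


def score_trend(scores, window=3):
    """Mean of the latest `window` scores minus the mean of the `window` before them.

    Assumes `scores` is ordered oldest to newest. Returns None when there are
    fewer than 2 * window points to compare.
    """
    if window < 1 or len(scores) < 2 * window:
        return None
    recent = scores[-window:]
    earlier = scores[-2 * window:-window]
    return statistics.mean(recent) - statistics.mean(earlier)


history_scores = [97.0, 96.0, 98.0, 93.0, 92.0, 91.0]  # made-up values, oldest first
delta = score_trend(history_scores)
if delta is not None and delta < 0:
    print(f"Average score fell by {abs(delta):.1f} points over the last 3 checks")
```

A negative result means quality is trending down even if every individual check still passes its thresholds.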
Part 6: Pipeline Integration
Quality Gate Pattern
import asyncio
from pycharter import Pipeline, QualityCheck, QualityThresholds
async def run_pipeline_with_quality_gate():
    # Run ETL pipeline
    pipeline = Pipeline.from_config_dir("pipelines/users/")
    result = await pipeline.run()
    if not result.success:
        raise RuntimeError(f"Pipeline failed: {result.errors}")
    # Run quality check on output
    # (store is the metadata store from Part 1; read_output is a placeholder loader)
    check = QualityCheck(store=store)
    report = check.run(
        schema_id="user_schema",
        data=read_output("output/users.json"),
        thresholds=QualityThresholds(min_overall_score=95.0)
    )
    if not report.passed:
        # Quality gate failed - don't proceed
        raise RuntimeError(f"Quality gate failed: {report.threshold_breaches}")
    # Quality passed - continue to next stage
    print(f"Quality gate passed with score {report.quality_score.overall_score:.1f}")
asyncio.run(run_pipeline_with_quality_gate())
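The quality gate calls `read_output`, which this tutorial does not define. A minimal sketch of such a loader, assuming the pipeline writes a JSON array (or a single JSON object) to disk, might look like this. The function body is an assumption for illustration, not part of pycharter.

```python
import json
from pathlib import Path


def read_output(path):
    """Hypothetical helper: load a JSON file and return a list of records."""
    with Path(path).open(encoding="utf-8") as f:
        payload = json.load(f)
    if isinstance(payload, dict):
        return [payload]  # a single object becomes a one-record batch
    if not isinstance(payload, list):
        raise ValueError(f"Expected a JSON array or object in {path}")
    return payload
```

Returning a list in every case keeps the call site in the gate identical whether the pipeline produced one record or many.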
Inline Validation with Metrics
import asyncio
from pycharter import Pipeline, CustomFunction, Validator
validator = Validator.from_file("contracts/user.yaml")
metrics = {"valid": 0, "invalid": 0, "errors": []}
def validate_with_metrics(records):
    valid_records = []
    for record in records:
        result = validator.validate(record)
        if result.is_valid:
            metrics["valid"] += 1
            valid_records.append(result.data.model_dump())
        else:
            metrics["invalid"] += 1
            metrics["errors"].extend(result.errors)
    return valid_records
# extractor and loader stand for your own pipeline stages
pipeline = (
    Pipeline(extractor)
    | CustomFunction(validate_with_metrics)
    | loader
)
result = asyncio.run(pipeline.run())
# Access metrics after run
total = metrics["valid"] + metrics["invalid"]
print(f"Valid: {metrics['valid']}")
print(f"Invalid: {metrics['invalid']}")
if total:  # avoid dividing by zero when no records were processed
    print(f"Quality: {metrics['valid'] / total:.2%}")
Scheduled Quality Checks
Use the monitoring() preset for recurring checks — it enables deduplication and skip-if-unchanged to avoid redundant work:
import schedule
import time
from pycharter import check_quality, QualityCheckOptions
def run_quality_check():
    """Run quality check on production data."""
    data = fetch_production_data()
    # monitoring() preset: thresholds + dedup + skip-if-unchanged
    report = check_quality(
        contract="contracts/user.yaml",
        data=data,
        options=QualityCheckOptions.monitoring(),
    )
    if not report.passed:
        send_alert(
            channel="#data-quality",
            message=f"Quality alert: {report.threshold_breaches}"
        )
# Schedule hourly checks
schedule.every().hour.do(run_quality_check)
while True:
    schedule.run_pending()
    time.sleep(60)
Part 7: Best Practices
1. Set Realistic Thresholds
# Start with lenient thresholds
initial_thresholds = QualityThresholds(
    min_overall_score=80.0,
    max_violation_rate=0.20
)
# Tighten over time as data quality improves
production_thresholds = QualityThresholds(
    min_overall_score=95.0,
    max_violation_rate=0.05
)
2. Track Metrics Over Time
# Always store metrics for trend analysis
report = check.run(schema_id=schema_id, data=data)
store.store_quality_metrics(
    schema_id=schema_id,
    quality_score=report.quality_score,
    # ... other fields
)
3. Categorize Violations
# Use severity levels
def classify_violation(violation):
    critical_fields = ["email", "payment_id"]
    high_fields = ["name", "phone"]
    field = violation["field"]
    if field in critical_fields:
        return "critical"
    elif field in high_fields:
        return "high"
    else:
        return "medium"
4. Automate Remediation
def auto_remediate(violation):
    """Attempt automatic fixes for known issues."""
    if violation.error_type == "string_pattern_mismatch":
        if violation.field == "phone":
            # Auto-fix: strip non-digits
            return fix_phone_number(violation.value)
    return None  # Cannot auto-fix
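`fix_phone_number` is referenced above but not defined in this tutorial. Following the "strip non-digits" comment, a minimal sketch could be the following. It is a hypothetical helper, and it deliberately ignores concerns such as country codes or a leading "+", which a production fix would need to handle.

```python
import re


def fix_phone_number(value):
    """Hypothetical helper: keep digits only; return None if nothing usable remains."""
    if not isinstance(value, str):
        return None
    digits = re.sub(r"\D", "", value)  # drop every non-digit character
    return digits or None


cleaned = fix_phone_number("(555) 123-4567")
print(cleaned)
```

Returning None for unusable input matches the "cannot auto-fix" convention used by `auto_remediate`.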
Exercises
- Basic Check: Run a quality check on a dataset and print all metrics.
- Thresholds: Set up thresholds and implement an alert system for breaches.
- Violation Analysis: Query violations and create a summary report by field and error type.
- Pipeline Integration: Add a quality gate to an ETL pipeline that stops if quality drops below 90%.
- Historical Tracking: Store quality metrics over time and plot a trend chart.
Next Steps
- Metadata Store Tutorial - Store quality metrics
- REST API Tutorial - Access quality via API
- API Reference: QualityCheck - Complete documentation