Metadata-Version: 2.4
Name: cognite-data-quality
Version: 0.1.0
Summary: Data quality validation runner for SHACL rules in Cognite Data Fusion.
Project-URL: Homepage, https://github.com/cognitedata/data-quality-validation
Project-URL: Repository, https://github.com/cognitedata/data-quality-validation
Author-email: Cognite <support@cognite.com>
License: Apache-2.0
License-File: LICENSE
Requires-Python: >=3.10
Requires-Dist: cognite-sdk
Requires-Dist: pydantic>=2.0.0
Requires-Dist: pyshacl>=0.25.0
Requires-Dist: pyyaml
Requires-Dist: rdflib
Requires-Dist: thisisneat==0.2.4
Requires-Dist: toml; python_version >= '3.11'
Requires-Dist: tomli; python_version < '3.11'
Provides-Extra: dev
Requires-Dist: marko<3.0.0,>=2.1.0; extra == 'dev'
Requires-Dist: mypy; extra == 'dev'
Requires-Dist: packaging>=21.3; extra == 'dev'
Requires-Dist: pre-commit; extra == 'dev'
Requires-Dist: pytest; extra == 'dev'
Requires-Dist: pytest-cov; extra == 'dev'
Requires-Dist: ruff; extra == 'dev'
Requires-Dist: twine>=6.0.0; extra == 'dev'
Requires-Dist: typer>=0.9.0; extra == 'dev'
Description-Content-Type: text/markdown

# Data Quality Validation

SHACL-based data quality validation for Cognite Data Fusion (CDF) using Cognite Functions and Workflows.

## Overview

This repository provides automated data quality validation for both **DMS instances** and **time series data** using SHACL (Shapes Constraint Language) rules. Validation results are posted to the CDF Records API for tracking and analysis.

**Key Features:**
- 🔍 **Instance Validation**: Validates DMS instances on data changes with automatic reference loading
- 📊 **Time Series Validation**: Quality checks using CDF SDK and INDSL functions (outliers, gaps, density)
- 🎯 **Multi-Environment**: Separate configurations for different environments (e.g., `cog-ai`, `abp-dev`)
- 🔄 **Automated Deployment**: CI/CD pipeline with intelligent change detection
- 📈 **Records API Integration**: All validation results stored for analysis and monitoring

**Python package** (`cognite-data-quality`):
- `run_validation`: Run validation directly from DMS (no workflows). Supports TTL, JSON, or YAML view config. Ideal for development.
- `deploy_validation_infrastructure`: Unified deployment of functions, workflows, and triggers.
- `deploy_incremental`: Smart incremental deployment with hash-based change detection.
- `call_validation`: Invoke the unified validation function with type-based dispatch (instance, timeseries, orchestrator, etc.).

## Architecture

### Instance Validation Flow

```mermaid
graph LR
    A[DMS Instance Change] --> B[Workflow Trigger]
    B --> C[Cognite Function:<br/>data-quality-validation<br/>type: instance]
    C --> D[Load SHACL Rules<br/>from CDF Files]
    C --> E[Fetch Instances<br/>from DMS]
    E --> F[Auto-load References<br/>configurable depth]
    D --> G[NEAT Validation]
    F --> G
    G --> H{Conforms?}
    H -->|Yes| I[Records API:<br/>PASSED]
    H -->|No| J[Records API:<br/>FAILED + Violations]
    I --> K[DataQualityValidationRecord]
    J --> K
```

### Time Series Validation Flow

```mermaid
graph LR
    A[Scheduled Trigger<br/>cron] --> B[Cognite Function:<br/>data-quality-validation<br/>type: timeseries]
    B --> C[Load SHACL Rules<br/>from CDF Files]
    B --> D{Selection Method}
    D -->|DMS Filter| E[Filter Time Series<br/>by prefix/properties]
    D -->|Instance IDs| F[Explicit List<br/>of Time Series]
    E --> G[Fetch Time Series<br/>from DMS]
    F --> G
    C --> H[NEAT Validation<br/>with CDF/INDSL Functions]
    G --> H
    H --> I{Backfill Mode?}
    I -->|Yes| J[Process Historic Data<br/>in Chunks]
    I -->|No| K[Validate Last<br/>Complete Hour]
    J --> L[Save Cursor State<br/>for Resume]
    K --> M[Records API:<br/>Per Time Series Result]
    L --> M
    M --> N[DataQualityValidationRecord]
```

## Repository Structure

```
.
├── cognite_data_quality/                # Python package
│   ├── _function_code/                  # Unified SHACL validation function code
│   │   ├── handler.py                   # Main dispatcher (routes by validation_type)
│   │   ├── handlers/                    # Type-specific handlers
│   │   │   ├── instance_validation.py   # Real-time instance validation
│   │   │   ├── partitioned_validation.py # Historic partition validation
│   │   │   ├── timeseries_validation.py # Time series validation with backfill
│   │   │   ├── orchestrator.py          # Historic validation orchestrator
│   │   │   └── test_rule.py             # Test SHACL rules without writing
│   │   └── common/                      # Shared utilities
│   │       ├── cursor_state.py          # Cursor state management
│   │       ├── records_api.py           # Records API helpers
│   │       ├── shacl_utils.py           # SHACL utilities
│   │       └── time_window.py           # Time window parsing
│   ├── deploy.py                        # Unified deployment API
│   ├── invoke.py                        # Function invocation API
│   ├── runner.py                        # Direct validation runner (no workflows)
│   └── template_generator.py            # Generate config templates
│
├── config/                              # Configuration by environment
│   └── environments/
│       ├── cog-ai/                      # Example environment
│       │   ├── settings.yaml            # Environment settings
│       │   ├── shacl_rules/             # SHACL rules (TTL files)
│       │   ├── views/                   # Instance validation configs
│       │   └── timeseries/              # Time series validation configs
│       └── abp-dev/                     # Another environment
│           ├── settings.yaml
│           ├── shacl_rules/
│           └── views/
│
├── generate_demo_data/                  # Demo data generator
│   ├── handler.py
│   └── requirements.txt
│
├── scripts/                             # Deployment & utility scripts
│   ├── shared.py                        # Common utilities
│   ├── deploy_infrastructure.py         # Unified deployment script (replaces 5 scripts)
│   └── validate_config.py               # Validate YAML/SHACL syntax
│
└── test_and_deploy/                     # Notebooks & dashboard
    ├── historic_data_validation.ipynb   # Historic validation notebook
    ├── validation_dashboard.py          # Streamlit dashboard
    └── batch_dashboard.py               # Batch validation dashboard
```
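
The tree above notes that `handler.py` routes requests by `validation_type`. The following is a minimal sketch of that kind of dispatch, not the packaged implementation: the handler functions are hypothetical stubs standing in for the modules under `handlers/`, and the payload shape is an assumption.

```python
from typing import Any, Callable

Handler = Callable[[dict[str, Any]], dict[str, Any]]


def _stub(name: str) -> Handler:
    """Build a placeholder handler that only reports which type it serves."""
    def handler(data: dict[str, Any]) -> dict[str, Any]:
        return {"handled_by": name}
    return handler


# Hypothetical registry: one entry per module listed under handlers/.
HANDLERS: dict[str, Handler] = {
    name: _stub(name)
    for name in ("instance", "partitioned", "timeseries", "orchestrator", "test_rule")
}


def dispatch(data: dict[str, Any]) -> dict[str, Any]:
    """Route a request payload to the handler registered for its validation_type."""
    validation_type = data.get("validation_type")
    handler = HANDLERS.get(validation_type)
    if handler is None:
        raise ValueError(
            f"Unknown validation_type {validation_type!r}; expected one of {sorted(HANDLERS)}"
        )
    return handler(data)


print(dispatch({"validation_type": "timeseries"}))
```

A registry keyed by type keeps the dispatcher unchanged when a new handler is added, and an unknown type fails immediately with a clear message.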

## Configuration Structure

### Environment Settings (`settings.yaml`)

Each environment has its own `settings.yaml` defining:

```yaml
function:                              # Unified validation function
  external_id: "data-quality-validation"
  name: "Data Quality Validation"
  description: "Unified function for all SHACL validation types (instance, partitioned, timeseries, orchestrator)"
  runtime: "py311"
  source_dir: "function"
  include_files:
    - "handler.py"
    - "handlers/**/*.py"
    - "common/**/*.py"
    - "requirements.txt"

workflow:                              # Instance workflow settings
  external_id_prefix: "dq-shacl"
  version: "1"
  task:
    retries: 3
    timeout: 600

timeseries_workflow:                   # Time series workflow settings
  external_id_prefix: "dq-ts-shacl"
  version: "1"
  task:
    retries: 3
    timeout: 1800

trigger:                               # Instance trigger settings
  external_id_prefix: "dq-shacl-trigger"
  batch_size: 10
  batch_timeout: 60

records:                               # Records API settings
  space: "dataQuality"
  container: "DataQualityValidationRecord"
  stream_id: "dq_validation_stream"

shacl_rules:
  source_dir: "config/environments/cog-ai/shacl_rules"
  mime_type: "text/turtle"

timeseries:
  config_dir: "config/environments/cog-ai/timeseries"
```

### Instance Validation Config (`views/*.yaml`)

```yaml
view:
  space: "sp_enterprise_process_industry"
  external_id: "Equipment"
  version: "v1"

instance_space: "sp_enterprise_process_industry"

shacl_rules:
  file: "equipment_shacl_rules.ttl"
  external_id: "equipment_shacl_rules"

datamodel:
  space: "sp_enterprise_process_industry"
  external_id: "EnterpriseProcessIndustry"
  version: "v1"

validation:
  auto_load_depth: 2                   # Auto-load referenced instances
  verbose: true

records:
  rule_set_id: "EquipmentSHACLv1"
  rule_set_version: "1.0"
```
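
To illustrate what `auto_load_depth` controls, here is a rough sketch of depth-limited reference loading as a breadth-first traversal. It is an illustration under assumptions, not the package's loader: `collect_with_references` and `get_references` are hypothetical names, and the in-memory `graph` stands in for lookups against DMS.

```python
from collections import deque


def collect_with_references(start_ids, get_references, max_depth):
    """Return start_ids plus every ID reachable within max_depth reference hops."""
    seen = set(start_ids)
    queue = deque((node_id, 0) for node_id in start_ids)
    while queue:
        node_id, depth = queue.popleft()
        if depth >= max_depth:
            continue  # do not follow references beyond the configured depth
        for ref in get_references(node_id):
            if ref not in seen:
                seen.add(ref)
                queue.append((ref, depth + 1))
    return seen


# Toy reference chain: pump -> motor -> vendor -> country
graph = {"pump_1": ["motor_1"], "motor_1": ["vendor_1"], "vendor_1": ["country_1"]}
loaded = collect_with_references(["pump_1"], lambda i: graph.get(i, []), max_depth=2)
print(sorted(loaded))  # ['motor_1', 'pump_1', 'vendor_1']
```

With a depth of 2 the traversal stops before `country_1`. A depth of 0, as used in the time series configs, loads only the selected instances.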

### Time Series Validation Config (`timeseries/*.yaml`)

```yaml
name: "demo-timeseries-quality"
description: "Quality checks for demo time series"

# Selection method 1: DMS Filter (server-side filtering)
filter:
  instance_space: "ts_dq_test"
  view:
    space: "cdf_cdm"
    external_id: "CogniteTimeSeries"
    version: "v1"
  expression:
    type: "prefix"
    property: ["node", "externalId"]
    value: "dq_test_"
  limit: 1000

# OR Selection method 2: Explicit instance IDs
# instance_ids:
#   - space: "ts_dq_test"
#     external_id: "sensor_001"
#   - space: "ts_dq_test"
#     external_id: "sensor_002"

shacl_rules:
  file: "timeseries_quality_rules.ttl"
  external_id: "demo_timeseries_shacl_rules"

datamodel:
  space: "cdf_cdm"
  external_id: "CogniteCore"
  version: "v1"

validation:
  auto_load_depth: 0
  verbose: true

records:
  rule_set_id: "DemoTimeSeriesQuality"
  rule_set_version: "1.0"

schedule:
  cron: "0 * * * *"                    # Every hour

# Optional: Backfill historic data
backfill:
  enabled: false
  start_time: "30d-ago"
  end_time: "now"
  window_minutes: 60
```

## Deployment

### CI/CD Deployment (Recommended)

The repository uses GitHub Actions for automated deployment:

1. **Push to `main`**: Automatically deploys all changes
2. **Pull Request**: Validates configurations without deploying
3. **Manual Trigger**: Supports dry-run, force redeploy, and specific view/config selection

#### Deployment Steps

The CI/CD pipeline uses a **unified deployment script**:

```bash
python scripts/deploy_infrastructure.py --env cog-ai
```

This single script handles:
1. **Records Container**: Ensures Records API container exists
2. **Functions**: Deploys all Cognite Functions with thisisneat 0.2.4
3. **Instance Workflows**: Deploys workflows & triggers for real-time validation
4. **Time Series Workflows**: Deploys scheduled workflows for time series validation
5. **Data Models** (optional): Can deploy data models if needed

**Smart Deployment**: Only redeploys when:
- Function code or dependencies change (tracked via hash in function metadata)
- View/timeseries configuration changes (tracked via hash in workflow description)
- SHACL rules content changes
- Function secrets change
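
The hash-based change detection described above can be sketched as follows. This is a simplified illustration, not the deployment script's actual logic: `content_hash` and `needs_redeploy` are hypothetical names, and where the previously deployed hash is stored (function metadata or workflow description) is left to the caller.

```python
import hashlib
import tempfile
from pathlib import Path


def content_hash(paths):
    """Return a SHA-256 digest over the names and contents of the given files."""
    digest = hashlib.sha256()
    for path in sorted(Path(p) for p in paths):
        digest.update(path.name.encode("utf-8"))
        digest.update(b"\0")  # separator so name and content cannot blur together
        digest.update(path.read_bytes())
        digest.update(b"\0")
    return digest.hexdigest()


def needs_redeploy(paths, deployed_hash, force=False):
    """Redeploy when forced, when nothing was deployed yet, or when content changed."""
    return force or deployed_hash != content_hash(paths)


with tempfile.TemporaryDirectory() as tmp:
    rules = Path(tmp) / "equipment_shacl_rules.ttl"
    rules.write_text("# version 1")
    deployed = content_hash([rules])
    print(needs_redeploy([rules], deployed))  # False: nothing changed
    rules.write_text("# version 2")
    print(needs_redeploy([rules], deployed))  # True: content changed
```

Sorting the paths makes the digest independent of the order in which files are discovered.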

### Manual Deployment

**Option 1: Unified Script (Recommended)**

```bash
# Set environment variables
export COGNITE_CLUSTER=api
export COGNITE_PROJECT=my-project
export COGNITE_CLIENT_ID=your-client-id
export COGNITE_CLIENT_SECRET=your-client-secret
export AZURE_TENANT_ID=your-tenant-id

# Install package
pip install cognite-data-quality

# Deploy everything
python scripts/deploy_infrastructure.py --env cog-ai

# Deployment options
python scripts/deploy_infrastructure.py --env cog-ai --dry-run         # Preview changes
python scripts/deploy_infrastructure.py --env cog-ai --force           # Force redeployment
python scripts/deploy_infrastructure.py --env cog-ai --views equipment # Deploy specific views
```

**Option 2: Python API**

```python
from cognite_data_quality import (
    deploy_validation_infrastructure,
    deploy_incremental,
    load_cognite_client_from_toml,
)

client = load_cognite_client_from_toml()

# Full deployment
deploy_validation_infrastructure(
    client=client,
    env_name="cog-ai",
    config_dir="config/environments/cog-ai",
    force=False,
)

# Or incremental deployment (only changed resources)
deploy_incremental(
    client=client,
    env_name="cog-ai",
    config_dir="config/environments/cog-ai",
)
```

### GitHub Environment Setup

Create a GitHub environment (e.g., `cog-ai-arn`) with:

**Secrets:**
- `COGNITE_CLIENT_ID`: OAuth client ID
- `COGNITE_CLIENT_SECRET`: OAuth client secret

**Variables:**
- `COGNITE_CLUSTER`: CDF cluster (e.g., `api`, `westeurope-1`)
- `COGNITE_PROJECT`: CDF project name
- `AZURE_TENANT_ID`: Azure AD tenant ID

## Adding New Validations

### Adding Instance Validation

1. **Create SHACL rules** in `config/environments/{env}/shacl_rules/my_view_shacl_rules.ttl`:

```turtle
@prefix sh: <http://www.w3.org/ns/shacl#> .
@prefix xsd: <http://www.w3.org/2001/XMLSchema#> .
@prefix : <http://example.org/shapes#> .

:MyViewShape a sh:NodeShape ;
    sh:targetClass <http://example.org/MyView> ;
    sh:property [
        sh:path <http://example.org/name> ;
        sh:minCount 1 ;
        sh:datatype xsd:string ;
        sh:message "Name is required and must be a string" ;
    ] .
```

2. **Create view config** in `config/environments/{env}/views/my_view.yaml`:

```yaml
view:
  space: "my_space"
  external_id: "MyView"
  version: "v1"

instance_space: "my_instances"

shacl_rules:
  file: "my_view_shacl_rules.ttl"
  external_id: "my_view_shacl_rules"

datamodel:
  space: "my_space"
  external_id: "MyDataModel"
  version: "v1"

validation:
  auto_load_depth: 2
  verbose: true

records:
  rule_set_id: "MyViewSHACLv1"
  rule_set_version: "1.0"
```

3. **Deploy**: Commit and push to `main`, or run manually:

```bash
python scripts/deploy_infrastructure.py --env my_env --views my_view
```

### Adding Time Series Validation

1. **Create SHACL rules** in `config/environments/{env}/shacl_rules/my_timeseries_quality_rules.ttl`:

```turtle
@prefix sh: <http://www.w3.org/ns/shacl#> .
@prefix cdf_sdk: <http://purl.org/cognite/cdf_sdk#> .
@prefix : <http://example.org/shapes#> .

:DataDensityShape a sh:NodeShape ;
    sh:targetClass <http://purl.org/cognite/cdf_cdm/v1/CogniteTimeSeries> ;
    sh:sparql [
        sh:message "Time series must have at least 50 datapoints in the last hour" ;
        sh:select """
            SELECT $this ?count
            WHERE {
                BIND(cdf_sdk:datapoints_count($this, "60m-ago", "now") AS ?count)
                FILTER (?count < 50)
            }
        """ ;
    ] .
```

2. **Create config** in `config/environments/{env}/timeseries/my_sensors_quality.yaml`:

```yaml
name: "my-sensors-quality"
description: "Quality checks for my sensors"

filter:
  instance_space: "my_space"
  view:
    space: "cdf_cdm"
    external_id: "CogniteTimeSeries"
    version: "v1"
  expression:
    type: "prefix"
    property: ["node", "externalId"]
    value: "sensor_"
  limit: 500

shacl_rules:
  file: "my_timeseries_quality_rules.ttl"
  external_id: "my_sensors_shacl_rules"

datamodel:
  space: "cdf_cdm"
  external_id: "CogniteCore"
  version: "v1"

validation:
  auto_load_depth: 0
  verbose: true

records:
  rule_set_id: "MySensorsQuality"
  rule_set_version: "1.0"

schedule:
  cron: "0 6 * * *"  # Daily at 6 AM UTC
```

3. **Deploy**: Commit and push to `main`, or run manually:

```bash
python scripts/deploy_infrastructure.py --env my_env --timeseries my_sensors_quality
```

## Time Series Quality Functions

The time series validation supports these CDF SPARQL functions:

### CDF SDK Functions

| Function | Description |
|----------|-------------|
| `cdf_sdk:datapoints_count(?ts, start, end)` | Count datapoints in time range |
| `cdf_sdk:datapoints_average(?ts, start, end)` | Average value in time range |
| `cdf_sdk:datapoints_min(?ts, start, end)` | Minimum value in time range |
| `cdf_sdk:datapoints_max(?ts, start, end)` | Maximum value in time range |
| `cdf_sdk:timeseries_exists(?ts)` | Check if time series exists |

### INDSL Functions

| Function | Description |
|----------|-------------|
| `cdf_indsl:extreme_outliers(?ts, alpha)` | Detect extreme outliers using modified Z-score |
| `cdf_indsl:gaps_identification(?ts, threshold)` | Detect gaps exceeding threshold |
| `cdf_indsl:value_decrease_check(?ts)` | Check for value decreases (for counters) |
| `cdf_indsl:out_of_range(?ts)` | Detect out-of-range values |

### Time Windows in SHACL Rules

Use placeholders that get replaced at runtime:

- `"60m-ago"`, `"90m-ago"`, `"24h-ago"`, `"7d-ago"`: Relative time (replaced with actual timestamp)
- `"now"`: Current time (replaced with actual timestamp)

**Example:**

```turtle
BIND(cdf_sdk:datapoints_count($this, "60m-ago", "now") AS ?count)
```

For hourly schedules, use a 90-minute window (`"90m-ago"`) so that consecutive runs overlap by 30 minutes and datapoints near the hour boundary are not missed.
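
As a rough illustration of how such placeholders can be turned into timestamps, the sketch below resolves `"now"` and `"<N>m-ago"`, `"<N>h-ago"`, `"<N>d-ago"` against a reference time. It is not the code in `common/time_window.py`: `resolve_time` is a hypothetical name, and the set of supported units is an assumption based on the examples above.

```python
import re
from datetime import datetime, timedelta, timezone

_UNITS = {"m": "minutes", "h": "hours", "d": "days"}
_RELATIVE = re.compile(r"^(\d+)([mhd])-ago$")


def resolve_time(expr, now):
    """Resolve 'now' or a relative placeholder such as '90m-ago' to a datetime."""
    if expr == "now":
        return now
    match = _RELATIVE.match(expr)
    if match is None:
        raise ValueError(f"Unsupported time expression: {expr!r}")
    amount, unit = match.groups()
    return now - timedelta(**{_UNITS[unit]: int(amount)})


now = datetime(2024, 1, 1, 12, 0, tzinfo=timezone.utc)
print(resolve_time("90m-ago", now).isoformat())  # 2024-01-01T10:30:00+00:00
print(resolve_time("7d-ago", now).isoformat())   # 2023-12-25T12:00:00+00:00
```

Passing `now` explicitly keeps both ends of a window anchored to the same instant and makes the function easy to test.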

## Backfill Mode

Time series validation supports backfilling historic data:

```yaml
backfill:
  enabled: true
  start_time: "30d-ago"         # Or ISO timestamp: "2024-01-01T00:00:00Z"
  end_time: "now"               # Or ISO timestamp: "2024-01-31T23:59:59Z"
  window_minutes: 60            # Chunk size (should match SHACL rule window)
```

**Features:**
- Processes data in chunks aligned to hour boundaries
- Saves cursor state for resume on failure
- Automatically switches to normal mode when complete
- Each chunk posts separate records to Records API
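
The chunking and resume behaviour listed above might look roughly like this. It is a sketch under assumptions rather than the handler's real logic: `backfill_windows` is a hypothetical name, the start is floored to the hour, only complete chunks are emitted, and the cursor is taken to be the end of the last completed chunk.

```python
from datetime import datetime, timedelta, timezone


def backfill_windows(start, end, window_minutes=60, cursor=None):
    """Yield (window_start, window_end) chunks between start and end.

    `cursor` is the end of the last chunk that completed; when given,
    chunking resumes there instead of at `start`.
    """
    step = timedelta(minutes=window_minutes)
    # Floor the starting point to the hour so chunks align to hour boundaries.
    current = (cursor or start).replace(minute=0, second=0, microsecond=0)
    # Only emit chunks that fit completely before `end`.
    while current + step <= end:
        yield current, current + step
        current += step


start = datetime(2024, 1, 1, 10, 20, tzinfo=timezone.utc)
end = datetime(2024, 1, 1, 13, 5, tzinfo=timezone.utc)

for window_start, window_end in backfill_windows(start, end):
    print(window_start.isoformat(), "->", window_end.isoformat())
    cursor = window_end  # a real run would persist this after each chunk
```

If a run fails partway, calling `backfill_windows(start, end, cursor=saved_cursor)` continues with the first chunk that has not yet been processed.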

## Validation Results

All validation results are posted to the CDF Records API:

```json
{
  "space": "dataQuality",
  "externalId": "dq_{ruleSetId}_{instanceId}_{jobRunId}",
  "sources": [{
    "source": {
      "type": "container",
      "space": "dataQuality",
      "externalId": "DataQualityValidationRecord"
    },
    "properties": {
      "ruleSetId": "EquipmentSHACLv1",
      "ruleSetVersion": "1.0",
      "jobRunId": "validation_1706234567",
      "passedValidation": false,
      "resultSeverity": ["Violation"],
      "failedConstraints": ["MinCountConstraintComponent::name"],
      "focusNode": "http://purl.org/cognite/sp_enterprise/equipment_001",
      "focusNodeInstance": {
        "space": "sp_enterprise",
        "externalId": "equipment_001"
      },
      "validationReport": {
        "violationCount": 1,
        "violations": [{
          "sourceConstraintComponent": "MinCountConstraintComponent",
          "resultMessage": "Name is required",
          "resultSeverity": "Violation",
          "resultPath": "name",
          "value": null
        }],
        "summary": "1 violation(s)"
      },
      "dataDomainExternalId": "Equipment"
    }
  }]
}
```

## Validation Dashboard

Visualize validation results with the Streamlit dashboard:

```bash
cd test_and_deploy
pip install -r dashboard_requirements.txt
streamlit run validation_dashboard.py
```

The dashboard shows:
- Validation pass/fail rates over time
- Failed constraints breakdown
- Per-instance validation history
- Severity distribution
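
The aggregates listed above can be derived directly from the record properties shown in the previous section. The sketch below is illustrative and not taken from the dashboard code: `summarize` is a hypothetical helper, and the input is assumed to be a list of `properties` dictionaries already fetched from the Records API.

```python
from collections import Counter


def summarize(records):
    """Aggregate pass rate, failed constraints, and severities from record properties."""
    total = len(records)
    passed = sum(1 for r in records if r["passedValidation"])
    failed_constraints = Counter(
        c for r in records for c in r.get("failedConstraints", [])
    )
    severities = Counter(s for r in records for s in r.get("resultSeverity", []))
    return {
        "total": total,
        "pass_rate": passed / total if total else None,
        "failed_constraints": dict(failed_constraints),
        "severities": dict(severities),
    }


records = [
    {"passedValidation": True},
    {"passedValidation": True},
    {"passedValidation": True},
    {
        "passedValidation": False,
        "resultSeverity": ["Violation"],
        "failedConstraints": ["MinCountConstraintComponent::name"],
    },
]
print(summarize(records))
```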

## Demo Data Generator

To test time series validation, enable the demo data generator. It creates **25 test time series** covering the following quality patterns:

| Pattern | Count | Description | Expected Validation |
|---------|-------|-------------|---------------------|
| `good_*` | 5 | Regular sensor data | ✅ Pass all checks |
| `outlier_*` | 5 | Data with extreme outliers | ❌ Fail outlier detection |
| `gap_*` | 5 | Random gaps (2-6 hours) | ❌ Fail gap detection |
| `decrease_*` | 3 | Counter resets | ❌ Fail decrease checks |
| `sparse_*` | 3 | Low data density (10%) | ❌ Fail density checks |
| `noisy_*` | 2 | High variance (50% std dev) | ❌ Fail range checks |
| `empty_*` | 2 | No datapoints | ❌ Fail presence checks |

### Enable Demo Data

1. Set `demo_data.enabled: true` in your environment's `settings.yaml`
2. Deploy:

```bash
python scripts/deploy_demo_data.py
```

The generator runs every minute, creating realistic test data patterns for validation testing.

## Local Development

### Setup

1. **Create `.env` file** with CDF credentials:

```bash
COGNITE_CLUSTER=api
COGNITE_PROJECT=my-project
COGNITE_CLIENT_ID=your-client-id
COGNITE_CLIENT_SECRET=your-client-secret
AZURE_TENANT_ID=your-tenant-id
```

2. **Install dependencies:**

```bash
pip install cognite-data-quality
# Or install with development dependencies
pip install "cognite-data-quality[dev]"
```

3. **Use the notebooks and dashboard** in `test_and_deploy/`:
   - `testing_and_exploration.ipynb`: Test validation logic
   - `deploy.ipynb`: Manual deployment
   - `validation_dashboard.py`: Visualize results

### Validate Configurations

Before deploying, validate your configs:

```bash
python scripts/validate_config.py
```

This checks:
- YAML syntax
- SHACL Turtle syntax
- Required fields
- File references
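
To give an idea of what the required-field and file-reference checks involve, here is a minimal sketch operating on an already parsed view config. It is not the logic of `scripts/validate_config.py`: `find_config_problems` is a hypothetical name, and the list of required fields is an assumption drawn from the example configs in this README.

```python
import tempfile
from pathlib import Path

# Assumed required fields, as (section, key) pairs.
REQUIRED_KEYS = [
    ("view", "space"),
    ("view", "external_id"),
    ("view", "version"),
    ("shacl_rules", "file"),
    ("shacl_rules", "external_id"),
    ("records", "rule_set_id"),
]


def find_config_problems(config, rules_dir):
    """Return human-readable problems found in a parsed view config."""
    problems = []
    for section, key in REQUIRED_KEYS:
        values = config.get(section)
        if not isinstance(values, dict) or key not in values:
            problems.append(f"missing required field: {section}.{key}")
    rules = config.get("shacl_rules")
    rules_file = rules.get("file") if isinstance(rules, dict) else None
    if rules_file and not (Path(rules_dir) / rules_file).is_file():
        problems.append(f"SHACL rules file not found: {rules_file}")
    return problems


with tempfile.TemporaryDirectory() as rules_dir:
    config = {
        "view": {"space": "my_space", "external_id": "MyView", "version": "v1"},
        "shacl_rules": {"file": "my_view_shacl_rules.ttl", "external_id": "my_view_shacl_rules"},
        "records": {"rule_set_id": "MyViewSHACLv1"},
    }
    # The rules file has not been created, so one problem is reported.
    print(find_config_problems(config, rules_dir))
```

Collecting all problems before reporting lets a single run surface every issue in a config instead of stopping at the first.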

## Troubleshooting

### Function Logs

View function execution logs:

```python
from cognite.client import CogniteClient
client = CogniteClient()

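# Get recent function calls
calls = client.functions.calls.list(
    function_external_id="data-quality-validation",
    limit=10,
)

# Get logs for the first returned call (get_logs also needs the function identifier)
if calls:
    logs = client.functions.calls.get_logs(
        call_id=calls[0].id,
        function_external_id="data-quality-validation",
    )
    print(logs)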
```

### Workflow Executions

Check workflow execution status:

```python
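# `client` is the CogniteClient created in the previous example.
# Executions are filtered by (workflow external ID, version) pairs.
executions = client.workflows.executions.list(
    workflow_version_ids=("dq-shacl-equipment", "1"),
    limit=10,
)

for execution in executions:
    print(f"Status: {execution.status}, Started: {execution.start_time}")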
```

### Common Issues

**Issue**: Workflow not triggering on data changes
- Check that the trigger exists and is configured as expected, e.g. by listing triggers with `client.workflows.triggers.list()` and looking for its external ID
- Verify instance space matches trigger filter
- Check workflow version matches trigger version

**Issue**: "Failed to load SHACL rules"
- Ensure SHACL file exists in CDF Files
- Check file external_id matches config
- Verify file content is valid Turtle syntax

**Issue**: "No instances to validate"
- Check DMS filter in time series config
- Verify time series exist in specified space
- Ensure view and version match

## Prerequisites

- Python 3.10+
- Cognite Python SDK
- thisisneat 0.2.4 (automatically installed with package)
- Access to CDF project with:
  - Cognite Functions
  - Workflows
  - Records API (alpha)
  - Data Modeling Services (DMS)

## License

Apache-2.0. See the LICENSE file for details.
