Skip to content

Metadata Store & Schema Registry

Learn to store, version, and manage your data contracts in a centralized registry.

What You'll Learn

  • Set up different metadata store backends
  • Store and version schemas
  • Manage coercion and validation rules
  • Track ownership and governance
  • Query and retrieve metadata
  • Handle schema evolution

Prerequisites

pip install pycharter

Some backends need an additional driver package; install only the one you plan to use:

pip install psycopg2-binary  # PostgreSQL
pip install pymongo          # MongoDB
pip install redis            # Redis

Part 1: Store Backends

PyCharter supports multiple storage backends:

| Backend | Best For |
| --- | --- |
| InMemoryMetadataStore | Testing, development |
| SQLiteMetadataStore | Single-user, local dev |
| PostgresMetadataStore | Production, multi-user |
| MongoDBMetadataStore | Document-oriented |
| RedisMetadataStore | High-performance caching |

In-Memory Store

from pycharter import InMemoryMetadataStore

store = InMemoryMetadataStore()
store.connect()

# Data is lost when the program exits

SQLite Store

from pycharter import SQLiteMetadataStore

store = SQLiteMetadataStore("metadata.db")
store.connect()

# Database file persists between runs

PostgreSQL Store

from pycharter import PostgresMetadataStore

store = PostgresMetadataStore(
    "postgresql://user:password@localhost:5432/pycharter"
)
store.connect()

# Production-ready with full ACID compliance

MongoDB Store

from pycharter import MongoDBMetadataStore

store = MongoDBMetadataStore(
    "mongodb://localhost:27017/pycharter"
)
store.connect()

# Document-oriented storage

Redis Store

from pycharter import RedisMetadataStore

store = RedisMetadataStore("redis://localhost:6379/0")
store.connect()

# High-performance key-value storage

Part 2: Storing Schemas

Basic Schema Storage

from pycharter import SQLiteMetadataStore

store = SQLiteMetadataStore("metadata.db")
store.connect()

# Define a schema
schema = {
    "type": "object",
    "version": "1.0.0",
    "properties": {
        "id": {"type": "integer"},
        "name": {"type": "string", "minLength": 1},
        "email": {"type": "string", "format": "email"},
        "age": {"type": "integer", "minimum": 0}
    },
    "required": ["id", "name", "email"]
}

# Store the schema
schema_id = store.store_schema(
    name="user",
    schema=schema,
    version="1.0.0"
)

print(f"Stored schema with ID: {schema_id}")

Schema Versioning

# Store version 1.0.0
schema_v1 = {
    "type": "object",
    "version": "1.0.0",
    "properties": {
        "name": {"type": "string"},
        "email": {"type": "string"}
    }
}
store.store_schema("user", schema_v1, version="1.0.0")

# Store version 2.0.0 (added age field)
schema_v2 = {
    "type": "object",
    "version": "2.0.0",
    "properties": {
        "name": {"type": "string"},
        "email": {"type": "string"},
        "age": {"type": "integer"}  # New field
    }
}
store.store_schema("user", schema_v2, version="2.0.0")

# Get specific version
schema = store.get_schema("user", version="1.0.0")

# Get latest version
schema = store.get_schema("user")  # Returns 2.0.0

Listing Schemas

# List all schemas
schemas = store.list_schemas()
for s in schemas:
    print(f"{s['name']} v{s['version']}")

# List versions of a schema
versions = store.list_schema_versions("user")
for v in versions:
    print(f"Version: {v['version']}, Created: {v['created_at']}")

Part 3: Coercion and Validation Rules

Storing Rules

# Store coercion rules
coercion_rules = {
    "version": "1.0.0",
    "rules": {
        "age": "coerce_to_integer",
        "email": "coerce_to_lowercase",
        "created_at": "coerce_to_datetime"
    }
}

store.store_coercion_rules(
    schema_id="user",
    rules=coercion_rules,
    version="1.0.0"
)

# Store validation rules
validation_rules = {
    "version": "1.0.0",
    "rules": {
        "age": {
            "is_positive": {},
            "less_than_or_equal_to": {"threshold": 150}
        },
        "email": {
            "is_email": {}
        },
        "name": {
            "min_length": {"threshold": 1},
            "max_length": {"threshold": 100}
        }
    }
}

store.store_validation_rules(
    schema_id="user",
    rules=validation_rules,
    version="1.0.0"
)

Retrieving Rules

# Get coercion rules
coercion = store.get_coercion_rules("user")
print(f"Coercion rules: {coercion}")

# Get validation rules
validation = store.get_validation_rules("user")
print(f"Validation rules: {validation}")

# Get specific version
coercion_v1 = store.get_coercion_rules("user", version="1.0.0")

Part 4: Metadata and Ownership

Storing Metadata

# Store metadata for a schema
metadata = {
    "title": "User Schema",
    "description": "Defines the structure of user records",
    "domain": "customer",
    "data_classification": "pii",
    "retention_period": "7 years",
    "business_owners": ["product-team"],
    "technical_owners": ["data-engineering"],
    "steward": "alice@example.com",
    "tags": ["user", "customer", "pii"]
}

store.store_metadata(
    resource_id="user",
    resource_type="schema",
    metadata=metadata
)

Retrieving Metadata

# Get metadata
metadata = store.get_metadata(
    resource_id="user",
    resource_type="schema"
)

print(f"Title: {metadata.get('title')}")
print(f"Owner: {metadata.get('business_owners')}")
print(f"Tags: {metadata.get('tags')}")

Searching by Metadata

# Search schemas by tag
results = store.search_schemas(tags=["pii"])

# Search by owner
results = store.search_schemas(owner="data-engineering")

# Search by domain
results = store.search_schemas(domain="customer")

Part 5: Using with Validator

Creating Validators from Store

from pycharter import Validator, SQLiteMetadataStore

# Open the same "metadata.db" file used by the earlier storage examples.
store = SQLiteMetadataStore("metadata.db")
store.connect()

# Create a validator bound to the schema stored under the ID "user"
validator = Validator(store=store, schema_id="user")

# Validate a single record; string values stand in for un-coerced input.
# NOTE(review): the coercion rules stored in Part 3 cover only "age", "email"
# and "created_at" -- confirm that "id" is really coerced without its own rule.
result = validator.validate({
    "id": "123",      # Will be coerced
    "name": "Alice",
    "email": "ALICE@EXAMPLE.COM",  # Will be lowercased
    "age": "30"       # Will be coerced
})

# Nothing is printed for an invalid record in this example.
if result.is_valid:
    # presumably result.data is the record after coercion -- verify
    print(f"Valid: {result.data}")

Building Contracts from Store

from pycharter import build_contract_from_store

# Build a consolidated contract
# Note: the contract keeps the raw schema and the coercion/validation rules
# as separate entries; the rules are not merged into the schema.
contract = build_contract_from_store(store, schema_id="user")

print(f"Schema (raw): {contract['schema']}")
print(f"Coercion rules (separate): {contract['coercion_rules']}")
print(f"Validation rules (separate): {contract['validation_rules']}")
print(f"Versions: {contract['versions']}")

Part 6: Schema Evolution

Checking Compatibility

from pycharter.schema_evolution import check_compatibility, CompatibilityMode

# Check if new schema is backward compatible
old_schema = store.get_schema("user", version="1.0.0")
new_schema = store.get_schema("user", version="2.0.0")

result = check_compatibility(
    old_schema=old_schema,
    new_schema=new_schema,
    mode=CompatibilityMode.BACKWARD
)

print(f"Compatible: {result.is_compatible}")
if not result.is_compatible:
    for issue in result.issues:
        print(f"  - {issue}")

Computing Schema Diff

from pycharter.schema_evolution import compute_diff

diff = compute_diff(old_schema, new_schema)

print("Added fields:")
for field in diff.added_fields:
    print(f"  + {field}")

print("Removed fields:")
for field in diff.removed_fields:
    print(f"  - {field}")

print("Modified fields:")
for field, changes in diff.modified_fields.items():
    print(f"  ~ {field}: {changes}")

Migration Strategies

# Check compatibility before storing new version
def store_schema_safe(store, name, schema, version):
    """Store a new schema version only if it is backward compatible.

    The latest stored version of ``name`` (if there is one) is compared with
    ``schema`` using ``check_compatibility`` in
    ``CompatibilityMode.BACKWARD``. When no earlier version exists the schema
    is stored without a check.

    Args:
        store: Metadata store exposing ``get_schema`` and ``store_schema``.
        name: Name the schema is looked up and stored under.
        schema: The new schema definition.
        version: Version string to store the new schema as.

    Returns:
        The value returned by ``store.store_schema``.

    Raises:
        ValueError: If ``schema`` is not backward compatible with the
            current latest version.
    """
    # Only the lookup is guarded: a KeyError raised while *checking*
    # compatibility must not be mistaken for "no existing schema", or an
    # unchecked schema would be stored silently.
    try:
        current = store.get_schema(name)
    except KeyError:
        current = None

    # NOTE(review): a store may also signal "not found" by returning None
    # instead of raising -- treated the same way here; confirm per backend.
    if current is not None:
        result = check_compatibility(
            old_schema=current,
            new_schema=schema,
            mode=CompatibilityMode.BACKWARD
        )

        if not result.is_compatible:
            raise ValueError(
                f"Schema not backward compatible: {result.issues}"
            )

    return store.store_schema(name, schema, version)

Part 7: Database Initialization

CLI Commands

# Initialize database (create tables)
pycharter db init

# Run migrations
pycharter db migrate

# Check migration status
pycharter db status

# Rollback last migration
pycharter db rollback

Programmatic Initialization

from pycharter.db import init_database, run_migrations

# Initialize database
init_database("postgresql://user:pass@localhost/pycharter")

# Run migrations
run_migrations("postgresql://user:pass@localhost/pycharter")

Part 8: Best Practices

1. Use Semantic Versioning

# Follow semver: MAJOR.MINOR.PATCH
# MAJOR: Breaking changes
# MINOR: Backward-compatible additions
# PATCH: Backward-compatible fixes

store.store_schema("user", schema, version="1.0.0")  # Initial
store.store_schema("user", schema, version="1.1.0")  # Added optional field
store.store_schema("user", schema, version="2.0.0")  # Breaking change

2. Document Ownership

# Always include ownership metadata
metadata = {
    "business_owners": ["product-team"],
    "technical_owners": ["data-engineering"],
    "steward": "alice@example.com",
    "slack_channel": "#data-contracts",
    "oncall_rotation": "data-team"
}

3. Tag for Discovery

# Use consistent tags
metadata = {
    "tags": [
        "customer",           # Domain
        "pii",               # Data classification
        "production",        # Environment
        "v2"                 # Major version
    ]
}

4. Track Lineage

# Include lineage in metadata
metadata = {
    "source_systems": ["crm", "billing"],
    "downstream_consumers": ["analytics", "marketing"],
    "data_product": "customer-360"
}

5. Automate Sync

# Sync schemas from contract files on deploy
from pathlib import Path

def sync_contracts(store, contracts_dir):
    """Sync all contract files to metadata store.

    Every ``*.yaml`` file below ``contracts_dir`` (searched recursively) is
    loaded and its parts are written to ``store``. The schema name is the
    file name without its extension, so two files with the same stem in
    different sub-directories are stored under the same name.

    Args:
        store: Metadata store exposing ``store_schema``,
            ``store_coercion_rules``, ``store_validation_rules`` and
            ``store_metadata``.
        contracts_dir: Directory containing the contract files.

    Raises:
        KeyError: If a contract has no ``schema`` entry or the schema has
            no ``version`` entry.
    """
    # PyYAML is imported here so the example runs on its own; the
    # surrounding snippet only imports ``Path``.
    import yaml

    # Sorted so contracts are synced in a reproducible order on every run.
    for contract_file in sorted(Path(contracts_dir).glob("**/*.yaml")):
        # Explicit encoding: the platform default is not UTF-8 everywhere.
        with open(contract_file, encoding="utf-8") as f:
            contract = yaml.safe_load(f)

        name = contract_file.stem
        version = contract["schema"]["version"]

        store.store_schema(name, contract["schema"], version)

        # Rules and metadata are optional sections of a contract file.
        if "coercion_rules" in contract:
            store.store_coercion_rules(name, contract["coercion_rules"], version)

        if "validation_rules" in contract:
            store.store_validation_rules(name, contract["validation_rules"], version)

        if "metadata" in contract:
            store.store_metadata(name, "schema", contract["metadata"])

        print(f"Synced {name} v{version}")

Exercises

  1. Setup: Create a SQLite store and store a schema with coercion/validation rules.

  2. Versioning: Store multiple versions of a schema and retrieve specific versions.

  3. Metadata: Add ownership and governance metadata, then search by tags.

  4. Evolution: Check compatibility between two schema versions and compute the diff.

  5. Integration: Create a validator from the store and validate a batch of records.

Next Steps