
Testing Pipelines

PyCharter ships a dedicated testing framework so you can write fast, isolated unit and integration tests for your ETL pipelines — no real databases, HTTP calls, or file I/O required.

Overview

The testing module is at pycharter.etl_generator.testing and provides:

| Component | Purpose |
|---|---|
| MockExtractor | Yields pre-configured fixture data instead of calling a real source |
| MockLoader | Captures loaded records for inspection instead of writing to a real destination |
| PipelineTestHarness | Wires mock extractor + loader into a pipeline and runs it |
| load_fixture | Loads fixture records from a YAML or JSON file |
| assert_records_match | Asserts loaded records equal expected records |
| assert_record_count | Asserts the number of loaded rows |
| assert_fields_present | Asserts every record contains required fields |
| assert_no_field | Asserts a field was removed by the pipeline |
| assert_field_values | Asserts a field's values across all records |
| assert_schema_shape | Asserts field values have the correct Python types |
| validate_pipeline_config | Validates a pipeline YAML config without running it |

Quick start

import pytest
from pycharter import Pipeline
from pycharter.etl_generator.testing import (
    PipelineTestHarness,
    assert_record_count,
    assert_records_match,
    assert_fields_present,
)

FIXTURE = [
    {"userId": "1", "firstName": "Alice", "lastName": "Doe", "price": "19.99"},
    {"userId": "2", "firstName": "Bob",   "lastName": "Smith", "price": "5.00"},
]

@pytest.mark.asyncio
async def test_transform_pipeline():
    pipeline = Pipeline.from_config_dir("pipelines/orders/")

    harness = PipelineTestHarness(pipeline, fixture_data=FIXTURE)
    result = await harness.run()

    assert result.success, result.errors

    assert_record_count(result, expected=2)
    assert_fields_present(harness.loaded_records, ["user_id", "full_name", "price"])
    assert_records_match(
        harness.loaded_records,
        [
            {"user_id": "1", "full_name": "Alice Doe", "price": 19.99},
            {"user_id": "2", "full_name": "Bob Smith",  "price": 5.00},
        ],
    )

MockExtractor

Implements the Extractor protocol — yields fixture data as async batches.

from pycharter.etl_generator.testing import MockExtractor

# Flat list — auto-batched by batch_size (default 1000)
extractor = MockExtractor(data=[{"id": 1}, {"id": 2}])

# Pre-batched (two batches of two)
extractor = MockExtractor(data=[[{"id": 1}, {"id": 2}], [{"id": 3}, {"id": 4}]])

# Small batch size for testing batching logic: 10 records -> batches of 3, 3, 3, 1
extractor = MockExtractor(data=[{"id": i} for i in range(10)], batch_size=3)

Assign it to a pipeline directly if you are not using PipelineTestHarness:

pipeline.extractor = MockExtractor(fixture_data)
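To make the flat versus pre-batched distinction concrete, here is a minimal, self-contained sketch of how a mock extractor could turn fixture data into async batches. The `iter_batches` and `collect` names, and the rule "a list whose first element is a list is already batched", are assumptions for illustration, not PyCharter's actual implementation.

```python
import asyncio
from typing import Any, AsyncIterator

Record = dict[str, Any]


async def iter_batches(data: list, batch_size: int = 1000) -> AsyncIterator[list[Record]]:
    """Hypothetical stand-in for MockExtractor's batching behaviour."""
    # Assumption: data counts as pre-batched when its first element is itself a list.
    if data and isinstance(data[0], list):
        for batch in data:
            yield batch
        return
    # Otherwise slice the flat list into chunks of at most batch_size records.
    for start in range(0, len(data), batch_size):
        yield data[start:start + batch_size]


async def collect(data: list, batch_size: int = 1000) -> list[list[Record]]:
    # Drain the async generator so the batches can be inspected synchronously.
    return [batch async for batch in iter_batches(data, batch_size)]


records = [{"id": i} for i in range(10)]
sizes = [len(batch) for batch in asyncio.run(collect(records, batch_size=3))]
print(sizes)  # [3, 3, 3, 1]
```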

MockLoader

Implements the Loader protocol — captures records instead of writing to a destination.

from pycharter.etl_generator.testing import MockLoader

loader = MockLoader()

# After running:
print(loader.loaded_records)  # All records across all batches (flat)
print(loader.load_calls)      # List of per-batch record lists

loader.reset()  # Clear for the next run
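If you need a custom test double, the capture-and-reset behaviour is easy to reproduce. The sketch below is a hypothetical `CapturingLoader`, not PyCharter's class; its `load(batch)` coroutine signature is an assumption, and it only mirrors the `loaded_records`, `load_calls`, and `reset()` semantics described above.

```python
import asyncio
from typing import Any

Record = dict[str, Any]


class CapturingLoader:
    """Hypothetical test double mirroring MockLoader's documented attributes."""

    def __init__(self) -> None:
        self.load_calls: list[list[Record]] = []

    async def load(self, batch: list[Record]) -> int:
        # Copy each record so later mutation by the caller cannot change what was captured.
        self.load_calls.append([dict(record) for record in batch])
        return len(batch)

    @property
    def loaded_records(self) -> list[Record]:
        # Flatten the per-batch lists in the order the batches arrived.
        return [record for batch in self.load_calls for record in batch]

    def reset(self) -> None:
        self.load_calls.clear()


async def load_two_batches(loader: CapturingLoader) -> None:
    await loader.load([{"id": 1}, {"id": 2}])
    await loader.load([{"id": 3}])


loader = CapturingLoader()
asyncio.run(load_two_batches(loader))
batch_count = len(loader.load_calls)
flat = loader.loaded_records
loader.reset()  # clear captured data before the next run
```

Exposing `loaded_records` as a derived property keeps a single source of truth (`load_calls`), so the two views can never disagree.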

Simulating a load failure

loader = MockLoader(simulate_failure=True, failure_error="DB connection refused")
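One way to picture `simulate_failure` is a loader that raises instead of storing the batch, with the runner turning that exception into a failed result. In the sketch below, `FailingLoader`, `RunResult`, and `run_once` are hypothetical names; how PyCharter itself surfaces the error is not shown here, so check `result.success` and `result.errors` against the real pipeline.

```python
import asyncio
from dataclasses import dataclass, field
from typing import Any

Record = dict[str, Any]


class FailingLoader:
    """Hypothetical loader that fails every load, like MockLoader(simulate_failure=True)."""

    def __init__(self, failure_error: str = "Simulated load failure") -> None:
        self.failure_error = failure_error

    async def load(self, batch: list[Record]) -> int:
        raise RuntimeError(self.failure_error)


@dataclass
class RunResult:
    success: bool
    errors: list[str] = field(default_factory=list)


async def run_once(loader: FailingLoader, batch: list[Record]) -> RunResult:
    # Convert a loader exception into a failed result instead of crashing the test.
    try:
        await loader.load(batch)
    except Exception as exc:
        return RunResult(success=False, errors=[str(exc)])
    return RunResult(success=True)


result = asyncio.run(run_once(FailingLoader("DB connection refused"), [{"id": 1}]))
print(result)  # RunResult(success=False, errors=['DB connection refused'])
```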

PipelineTestHarness

Combines MockExtractor and MockLoader and injects them into a pipeline before running it.

from pycharter.etl_generator.testing import PipelineTestHarness

harness = PipelineTestHarness(
    pipeline,
    fixture_data=[{"id": 1, "name": "Alice"}],
    batch_size=500,   # optional, default 1000
)

result = await harness.run()

# Convenient properties
harness.loaded_records  # Flat list of all loaded records
harness.load_calls      # List of per-batch record lists
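Conceptually, the harness feeds fixture batches through the pipeline's transform step into a capturing loader. The sketch below models that flow with plain functions; `run_harness` and the per-record `transform` callable are assumptions for illustration and say nothing about how PyCharter's `Pipeline` is structured internally. The example transform reproduces the quick-start expectations.

```python
from typing import Any, Callable

Record = dict[str, Any]


def run_harness(transform: Callable[[Record], Record], fixture_data: list[Record],
                batch_size: int = 1000) -> list[list[Record]]:
    """Hypothetical harness: extract -> transform -> capture, one batch at a time."""
    load_calls: list[list[Record]] = []
    for start in range(0, len(fixture_data), batch_size):
        batch = fixture_data[start:start + batch_size]  # "extract" a batch
        load_calls.append([transform(record) for record in batch])  # transform, then "load"
    return load_calls


def normalize_user(record: Record) -> Record:
    # Example transform producing the quick-start's expected output shape.
    return {
        "user_id": record["userId"],
        "full_name": f"{record['firstName']} {record['lastName']}",
        "price": float(record["price"]),
    }


fixture = [
    {"userId": "1", "firstName": "Alice", "lastName": "Doe", "price": "19.99"},
    {"userId": "2", "firstName": "Bob", "lastName": "Smith", "price": "5.00"},
]
load_calls = run_harness(normalize_user, fixture, batch_size=1)
loaded_records = [record for batch in load_calls for record in batch]
```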

Loading fixtures from files

Keep test data in version-controlled YAML or JSON files:

# Flat list format
- {order_id: "ORD-001", customer_id: "C1", amount: "99.50"}
- {order_id: "ORD-002", customer_id: "C2", amount: "14.99"}

# Batched format
name: "orders batched fixture"
batches:
  - [{order_id: "ORD-001", amount: "99.50"}]
  - [{order_id: "ORD-002", amount: "14.99"}]

Load a fixture file and pass the records to the harness:

from pycharter.etl_generator.testing import load_fixture

records = load_fixture("tests/fixtures/orders.yaml")
harness = PipelineTestHarness(pipeline, fixture_data=records)
result = await harness.run()

Assertion helpers

assert_records_match

from pycharter.etl_generator.testing import assert_records_match

# Ordered comparison (default)
assert_records_match(
    actual=harness.loaded_records,
    expected=[{"id": 1, "name": "Alice"}, {"id": 2, "name": "Bob"}],
)

# Order-insensitive
assert_records_match(
    actual=harness.loaded_records,
    expected=[{"id": 2, "name": "Bob"}, {"id": 1, "name": "Alice"}],
    order_matters=False,
)

# Subset check (actual must contain expected)
assert_records_match(
    actual=harness.loaded_records,
    expected=[{"id": 1}],
    subset=True,
)
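The three modes differ only in how expected and actual records are paired. The sketch below shows one plausible reading: ordered equality, order-insensitive comparison via a canonical sort key, and a subset check in which every expected record must be a sub-dictionary of some loaded record. `records_match` is hypothetical, and the subset interpretation in particular is an assumption; PyCharter's helper may define it differently.

```python
import json
from typing import Any

Record = dict[str, Any]


def _sort_key(record: Record) -> str:
    # Canonical JSON text gives records a stable, comparable ordering.
    return json.dumps(record, sort_keys=True, default=str)


def records_match(actual: list[Record], expected: list[Record],
                  order_matters: bool = True, subset: bool = False) -> bool:
    """Hypothetical comparison mirroring assert_records_match's options."""
    if subset:
        # Each expected record must be a sub-dictionary of at least one actual record.
        return all(
            any(all(k in a and a[k] == v for k, v in exp.items()) for a in actual)
            for exp in expected
        )
    if order_matters:
        return actual == expected
    return sorted(actual, key=_sort_key) == sorted(expected, key=_sort_key)


loaded = [{"id": 1, "name": "Alice"}, {"id": 2, "name": "Bob"}]
print(records_match(loaded, [{"id": 2, "name": "Bob"}, {"id": 1, "name": "Alice"}],
                    order_matters=False))  # True
print(records_match(loaded, [{"id": 1}], subset=True))  # True
```

In this sketch `subset=True` ignores ordering entirely, which is usually what you want when checking a few representative rows.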

assert_fields_present and assert_no_field

from pycharter.etl_generator.testing import assert_fields_present, assert_no_field

assert_fields_present(harness.loaded_records, ["user_id", "email", "created_at"])
assert_no_field(harness.loaded_records, "internal_debug_id")
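Both helpers boil down to per-record key checks. A minimal sketch with hypothetical function names, which reports the index of each offending record instead of only a pass/fail result, can make failures easier to diagnose when you write your own variants:

```python
from typing import Any

Record = dict[str, Any]


def missing_fields(records: list[Record], required: list[str]) -> dict[int, list[str]]:
    """Map record index -> required fields absent from that record (hypothetical helper)."""
    problems: dict[int, list[str]] = {}
    for i, record in enumerate(records):
        missing = [name for name in required if name not in record]
        if missing:
            problems[i] = missing
    return problems


def records_with_field(records: list[Record], field_name: str) -> list[int]:
    """Indexes of records that still contain field_name (hypothetical helper)."""
    return [i for i, record in enumerate(records) if field_name in record]


loaded = [
    {"user_id": "1", "email": "a@example.com", "created_at": "2024-01-01"},
    {"user_id": "2", "email": "b@example.com", "internal_debug_id": "x9"},
]
print(missing_fields(loaded, ["user_id", "email", "created_at"]))  # {1: ['created_at']}
print(records_with_field(loaded, "internal_debug_id"))  # [1]
```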

assert_field_values

from pycharter.etl_generator.testing import assert_field_values

assert_field_values(
    harness.loaded_records,
    field_name="status",
    expected_values=["pending", "pending"],  # e.g. a default status the pipeline filled in for every record
)
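Comparing a field across records is a projection followed by a position-by-position comparison. In the hypothetical sketch below, treating a missing field as `None` is an assumption made so that absent values show up as mismatches rather than raising `KeyError`:

```python
from typing import Any

Record = dict[str, Any]


def field_value_mismatches(records: list[Record], field_name: str,
                           expected_values: list[Any]) -> list[tuple[int, Any, Any]]:
    """Return (index, actual, expected) for each differing position (hypothetical helper)."""
    actual_values = [record.get(field_name) for record in records]
    if len(actual_values) != len(expected_values):
        raise AssertionError(
            f"{len(actual_values)} records but {len(expected_values)} expected values"
        )
    return [
        (i, got, want)
        for i, (got, want) in enumerate(zip(actual_values, expected_values))
        if got != want
    ]


loaded = [{"id": 1, "status": "pending"}, {"id": 2}]
print(field_value_mismatches(loaded, "status", ["pending", "pending"]))
# [(1, None, 'pending')]
```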

assert_schema_shape

from pycharter.etl_generator.testing import assert_schema_shape

assert_schema_shape(
    harness.loaded_records,
    schema={"user_id": str, "price": float, "quantity": int},
)
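A type check like this is essentially `isinstance` per field. One detail matters if you write your own: in Python `bool` is a subclass of `int`, so `isinstance(True, int)` is `True`. The hypothetical sketch below rejects booleans for `int` fields explicitly and, like plain `isinstance`, does not accept an `int` where a `float` is expected; whether PyCharter's `assert_schema_shape` behaves the same way is not stated here.

```python
from typing import Any

Record = dict[str, Any]


def schema_violations(records: list[Record], schema: dict[str, type]) -> list[str]:
    """Describe every field whose value has the wrong Python type (hypothetical helper)."""
    problems = []
    for i, record in enumerate(records):
        for field_name, expected_type in schema.items():
            if field_name not in record:
                problems.append(f"record {i}: missing {field_name!r}")
                continue
            value = record[field_name]
            # bool subclasses int, so reject it explicitly when an int is expected.
            wrong_bool = isinstance(value, bool) and expected_type is int
            if wrong_bool or not isinstance(value, expected_type):
                problems.append(
                    f"record {i}: {field_name!r} is {type(value).__name__}, "
                    f"expected {expected_type.__name__}"
                )
    return problems


schema = {"user_id": str, "price": float, "quantity": int}
loaded = [
    {"user_id": "1", "price": 19.99, "quantity": 2},
    {"user_id": "2", "price": 5, "quantity": True},
]
for problem in schema_violations(loaded, schema):
    print(problem)
```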

assert_record_count

from pycharter.etl_generator.testing import assert_record_count

assert_record_count(result, expected=100)

Validating pipeline config

Check that a YAML config file is syntactically and semantically valid without running the pipeline:

from pycharter.etl_generator.testing import validate_pipeline_config

is_valid, errors = validate_pipeline_config("pipelines/orders/")
if not is_valid:
    for err in errors:
        print(f"[{err['section']}] {err['message']}")

validate_pipeline_config also accepts a config dict or a path to a single YAML file instead of a config directory.
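The `(is_valid, errors)` return shape, with each error carrying a `section` and a `message`, is easy to mirror in your own pre-flight checks. The sketch below is hypothetical throughout: the required section names `extract` and `load` and the `batch_size` rule are illustrative assumptions, not PyCharter's actual config schema. Its main point is to collect every problem rather than stop at the first.

```python
from typing import Any

# Illustrative assumption: these section names are NOT taken from PyCharter's schema.
REQUIRED_SECTIONS = ("extract", "load")


def check_config(config: dict[str, Any]) -> tuple[bool, list[dict[str, str]]]:
    """Hypothetical validator that collects every problem instead of stopping at the first."""
    errors: list[dict[str, str]] = []
    for section in REQUIRED_SECTIONS:
        if section not in config:
            errors.append({"section": section, "message": "section is missing"})
    extract = config.get("extract")
    if isinstance(extract, dict) and "batch_size" in extract:
        batch_size = extract["batch_size"]
        if not isinstance(batch_size, int) or batch_size <= 0:
            errors.append({"section": "extract",
                           "message": "batch_size must be a positive integer"})
    return not errors, errors


is_valid, errors = check_config({"extract": {"batch_size": 0}})
if not is_valid:
    for err in errors:
        print(f"[{err['section']}] {err['message']}")
```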


Patterns

pytest-asyncio setup

Enable pytest-asyncio's auto mode in your pytest configuration, for example in pyproject.toml:

# pyproject.toml
[tool.pytest.ini_options]
asyncio_mode = "auto"

Or mark each async test with @pytest.mark.asyncio. The examples below omit the marker and assume auto mode.

Parametrized tests across fixtures

import pytest
from pathlib import Path
from pycharter.etl_generator.testing import load_fixture, PipelineTestHarness

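# Sorted so test IDs and collection order are stable across runs
FIXTURES = sorted(Path("tests/fixtures").glob("*.yaml"))

# `pipeline` is a fixture you define in conftest.py (e.g. with Pipeline.from_config_dir)
@pytest.mark.parametrize("fixture_path", FIXTURES, ids=[f.stem for f in FIXTURES])
async def test_pipeline_on_all_fixtures(pipeline, fixture_path):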
    records = load_fixture(fixture_path)
    harness = PipelineTestHarness(pipeline, fixture_data=records)
    result = await harness.run()
    assert result.success, result.errors

Testing incremental logic

from pycharter.etl_generator.testing import PipelineTestHarness, assert_record_count

async def test_incremental_skips_old_records(pipeline):
    all_records = [
        {"id": 1, "updated_at": "2024-01-01T00:00:00"},
        {"id": 2, "updated_at": "2024-06-01T00:00:00"},
    ]
    # Supply only the recent record (simulating watermark-filtered extraction)
    harness = PipelineTestHarness(
        pipeline, fixture_data=[all_records[1]]
    )
    result = await harness.run()
    assert_record_count(result, expected=1)
    assert harness.loaded_records[0]["id"] == 2
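Hand-picking `all_records[1]` works, but deriving the fixture from a watermark keeps the test honest if the sample records change. A minimal sketch, assuming ISO-8601 timestamps and a strictly-greater-than comparison (confirm whether your pipeline's incremental filter uses `>` or `>=`); `filter_after_watermark` is a hypothetical helper:

```python
from datetime import datetime
from typing import Any

Record = dict[str, Any]


def filter_after_watermark(records: list[Record], watermark: str,
                           field_name: str = "updated_at") -> list[Record]:
    """Keep records strictly newer than the watermark (hypothetical helper)."""
    cutoff = datetime.fromisoformat(watermark)
    return [r for r in records if datetime.fromisoformat(r[field_name]) > cutoff]


all_records = [
    {"id": 1, "updated_at": "2024-01-01T00:00:00"},
    {"id": 2, "updated_at": "2024-06-01T00:00:00"},
]
recent = filter_after_watermark(all_records, watermark="2024-03-01T00:00:00")
print([r["id"] for r in recent])  # [2]
```

Passing `recent` as `fixture_data` then lets the test assert that exactly those records were loaded.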

See also