Testing Pipelines
PyCharter ships a dedicated testing framework so you can write fast, isolated unit and integration tests for your ETL pipelines without real databases, HTTP calls, or file I/O.
Overview
The testing module lives at pycharter.etl_generator.testing and provides:
| Component | Purpose |
|---|---|
| MockExtractor | Yields pre-configured fixture data instead of calling a real source |
| MockLoader | Captures loaded records for inspection instead of writing to a real destination |
| PipelineTestHarness | Wires mock extractor + loader into a pipeline and runs it |
| load_fixture | Loads fixture records from a YAML or JSON file |
| assert_records_match | Asserts loaded records equal expected records |
| assert_record_count | Asserts the number of loaded rows |
| assert_fields_present | Asserts every record contains required fields |
| assert_no_field | Asserts a field was removed by the pipeline |
| assert_field_values | Asserts a field's values across all records |
| assert_schema_shape | Asserts field values have the correct Python types |
| validate_pipeline_config | Validates a pipeline YAML config without running it |
Quick start
import pytest
from pycharter import Pipeline
from pycharter.etl_generator.testing import (
    PipelineTestHarness,
    assert_record_count,
    assert_records_match,
    assert_fields_present,
)

# Raw input records, as the real source would deliver them
FIXTURE = [
    {"userId": "1", "firstName": "Alice", "lastName": "Doe", "price": "19.99"},
    {"userId": "2", "firstName": "Bob", "lastName": "Smith", "price": "5.00"},
]

@pytest.mark.asyncio
async def test_transform_pipeline():
    pipeline = Pipeline.from_config_dir("pipelines/orders/")
    harness = PipelineTestHarness(pipeline, fixture_data=FIXTURE)
    result = await harness.run()

    assert result.success, result.errors
    assert_record_count(result, expected=2)
    assert_fields_present(harness.loaded_records, ["user_id", "full_name", "price"])
    assert_records_match(
        harness.loaded_records,
        [
            {"user_id": "1", "full_name": "Alice Doe", "price": 19.99},
            {"user_id": "2", "full_name": "Bob Smith", "price": 5.00},
        ],
    )
MockExtractor
Implements the Extractor protocol and yields fixture data as async batches.
from pycharter.etl_generator.testing import MockExtractor

# Flat list: auto-batched by batch_size (default 1000)
extractor = MockExtractor(data=[{"id": 1}, {"id": 2}])

# Pre-batched (two batches of two)
extractor = MockExtractor(data=[[{"id": 1}, {"id": 2}], [{"id": 3}, {"id": 4}]])

# Small batch size for testing batching logic (10 records, at most 3 per batch)
extractor = MockExtractor(data=[{"id": i} for i in range(10)], batch_size=3)
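To make the batching behaviour concrete, here is a minimal, self-contained sketch of how a flat list can be split into async batches. The batched_records function is a hypothetical stand-in written for illustration; it is not PyCharter's MockExtractor, whose internals may differ.

```python
import asyncio
from typing import Any, AsyncIterator


async def batched_records(
    data: list[Any], batch_size: int = 1000
) -> AsyncIterator[list[Any]]:
    """Hypothetical stand-in: yield `data` in chunks of at most `batch_size`."""
    for start in range(0, len(data), batch_size):
        yield data[start : start + batch_size]


async def collect_batch_sizes(data: list[Any], batch_size: int) -> list[int]:
    """Drain the async generator and report the size of each batch."""
    return [len(batch) async for batch in batched_records(data, batch_size)]


records = [{"id": i} for i in range(10)]
sizes = asyncio.run(collect_batch_sizes(records, batch_size=3))
print(sizes)  # [3, 3, 3, 1]
```

Ten records with a batch size of 3 produce a short final batch, which is the edge case worth covering when a transform or loader depends on batch boundaries.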
You can also assign it to a pipeline directly if you are not using PipelineTestHarness.
MockLoader
Implements the Loader protocol and captures records instead of writing to a destination.
from pycharter.etl_generator.testing import MockLoader

loader = MockLoader()

# After running:
print(loader.loaded_records)  # All records across all batches (flat)
print(loader.load_calls)      # List of per-batch record lists
loader.reset()                # Clear for the next run
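The relationship between loaded_records (flat) and load_calls (per batch) can be sketched with a small stand-in class. CapturingLoader and its load method are assumptions made for this illustration only; the real MockLoader may expose a different method signature.

```python
import asyncio
from typing import Any


class CapturingLoader:
    """Hypothetical stand-in for a capturing loader (not PyCharter's MockLoader)."""

    def __init__(self) -> None:
        self.load_calls: list[list[dict[str, Any]]] = []

    @property
    def loaded_records(self) -> list[dict[str, Any]]:
        # Flatten the per-batch lists into one list, preserving order.
        return [record for batch in self.load_calls for record in batch]

    async def load(self, batch: list[dict[str, Any]]) -> None:
        # Store a copy so later mutation of the batch does not affect the capture.
        self.load_calls.append(list(batch))

    def reset(self) -> None:
        # Forget everything captured so far.
        self.load_calls.clear()


async def demo() -> CapturingLoader:
    loader = CapturingLoader()
    await loader.load([{"id": 1}, {"id": 2}])
    await loader.load([{"id": 3}])
    return loader


loader = asyncio.run(demo())
print(len(loader.load_calls))  # 2
print(loader.loaded_records)   # [{'id': 1}, {'id': 2}, {'id': 3}]
```

Inspecting the per-batch view is useful when a test needs to confirm how many writes a pipeline would issue, while the flat view is enough for checking transformed values.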
Simulating a load failure
PipelineTestHarness
Combines MockExtractor and MockLoader and injects them into a pipeline before running it.
from pycharter.etl_generator.testing import PipelineTestHarness

harness = PipelineTestHarness(
    pipeline,
    fixture_data=[{"id": 1, "name": "Alice"}],
    batch_size=500,  # optional, default 1000
)
result = await harness.run()  # call from inside an async test function

# Convenient properties
harness.loaded_records  # Flat list of all loaded records
harness.load_calls      # List of per-batch record lists
Loading fixtures from files
Keep test data in version-controlled YAML or JSON files:
from pycharter.etl_generator.testing import load_fixture
records = load_fixture("tests/fixtures/orders.yaml")
harness = PipelineTestHarness(pipeline, fixture_data=records)
result = await harness.run()
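The layout of a fixture file is not spelled out here. A reasonable assumption is a top-level list of record mappings. The sketch below writes such a file as JSON and reads it back using only the standard library, so it runs without PyCharter; the file name orders.json and the helper read_records are illustrative, not part of the library.

```python
import json
import tempfile
from pathlib import Path


def read_records(path: Path) -> list[dict]:
    """Illustrative reader: expects a JSON file holding a top-level list of records."""
    records = json.loads(path.read_text(encoding="utf-8"))
    if not isinstance(records, list):
        raise ValueError(f"{path} must contain a top-level list of records")
    return records


with tempfile.TemporaryDirectory() as tmp:
    fixture_path = Path(tmp) / "orders.json"
    # Assumed fixture shape: one mapping per input record.
    fixture_path.write_text(
        json.dumps(
            [
                {"userId": "1", "price": "19.99"},
                {"userId": "2", "price": "5.00"},
            ]
        ),
        encoding="utf-8",
    )
    records = read_records(fixture_path)

print(len(records))  # 2
```

Keeping fixtures as raw input records, before any transformation, lets the same file serve several pipelines and makes the expected output the only thing a test has to spell out.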
Assertion helpers
assert_records_match
from pycharter.etl_generator.testing import assert_records_match

# Ordered comparison (default)
assert_records_match(
    actual=harness.loaded_records,
    expected=[{"id": 1, "name": "Alice"}, {"id": 2, "name": "Bob"}],
)

# Order-insensitive
assert_records_match(
    actual=harness.loaded_records,
    expected=[{"id": 2}, {"id": 1}],
    order_matters=False,
)

# Subset check (actual must contain expected)
assert_records_match(
    actual=harness.loaded_records,
    expected=[{"id": 1}],
    subset=True,
)
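The three comparison modes can be pictured with a small re-implementation. This is a sketch under stated assumptions, not the library's code: records_match is a hypothetical function, and the subset mode is read here as "every expected record appears, whole, somewhere in actual". Another plausible reading, matching on a subset of fields per record, is not covered.

```python
import json
from typing import Any

Record = dict[str, Any]


def _key(record: Record) -> str:
    # Dicts are unhashable, so use a canonical JSON string as a sortable key.
    return json.dumps(record, sort_keys=True, default=str)


def records_match(
    actual: list[Record],
    expected: list[Record],
    order_matters: bool = True,
    subset: bool = False,
) -> bool:
    """Hypothetical re-implementation of the three comparison modes."""
    if subset:
        # Assumed meaning: every expected record appears somewhere in actual.
        remaining = [_key(r) for r in actual]
        for rec in expected:
            key = _key(rec)
            if key not in remaining:
                return False
            remaining.remove(key)  # each actual record can satisfy one match
        return True
    if order_matters:
        return actual == expected
    # Order-insensitive: compare the sorted canonical keys.
    return sorted(map(_key, actual)) == sorted(map(_key, expected))


actual = [{"id": 1, "name": "Alice"}, {"id": 2, "name": "Bob"}]
swapped = list(reversed(actual))
print(records_match(actual, swapped))                                  # False
print(records_match(actual, swapped, order_matters=False))             # True
print(records_match(actual, [{"id": 2, "name": "Bob"}], subset=True))  # True
```

Order-insensitive comparison is the safer default for pipelines whose batch order is not guaranteed; ordered comparison is worth keeping for pipelines that sort or deduplicate.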
assert_fields_present and assert_no_field
from pycharter.etl_generator.testing import assert_fields_present, assert_no_field
assert_fields_present(harness.loaded_records, ["user_id", "email", "created_at"])
assert_no_field(harness.loaded_records, "internal_debug_id")
assert_field_values
from pycharter.etl_generator.testing import assert_field_values

assert_field_values(
    harness.loaded_records,
    field_name="status",
    expected_values=["pending", "pending"],  # e.g. a default filled in by the pipeline
)
assert_schema_shape
from pycharter.etl_generator.testing import assert_schema_shape

assert_schema_shape(
    harness.loaded_records,
    schema={"user_id": str, "price": float, "quantity": int},
)
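A type-shape check of this kind boils down to an isinstance test per field. The sketch below is an illustrative stand-in, with find_type_mismatches as a hypothetical helper; it shows the idea and the kind of failure it catches, not how assert_schema_shape is implemented.

```python
from typing import Any


def find_type_mismatches(
    records: list[dict[str, Any]], schema: dict[str, type]
) -> list[str]:
    """Illustrative checker: report fields that are missing or of the wrong type."""
    problems = []
    for index, record in enumerate(records):
        for field, expected_type in schema.items():
            if field not in record:
                problems.append(f"record {index}: missing field {field!r}")
            elif not isinstance(record[field], expected_type):
                actual = type(record[field]).__name__
                problems.append(
                    f"record {index}: {field!r} is {actual}, "
                    f"expected {expected_type.__name__}"
                )
    return problems


rows = [
    {"user_id": "1", "price": 19.99, "quantity": 2},
    {"user_id": "2", "price": "5.00", "quantity": 1},  # price left as a string
]
print(find_type_mismatches(rows, {"user_id": str, "price": float, "quantity": int}))
# ["record 1: 'price' is str, expected float"]
```

Note that isinstance(5, float) is False in Python, so an integer price would also be reported by this strict check. Whether assert_schema_shape is equally strict is not stated here.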
assert_record_count
from pycharter.etl_generator.testing import assert_record_count
assert_record_count(result, expected=100)
Validating pipeline config
Check that a YAML config file is syntactically and semantically valid without running the pipeline:
from pycharter.etl_generator.testing import validate_pipeline_config

is_valid, errors = validate_pipeline_config("pipelines/orders/")
if not is_valid:
    for err in errors:
        print(f"[{err['section']}] {err['message']}")
validate_pipeline_config also accepts a config dict or a path to a single YAML file.
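In a test, it is often handier to fold the errors into the assertion message than to print them. A minimal sketch, assuming each error is a mapping with the section and message keys used above; format_config_errors is a hypothetical helper and the sample errors are made up for the demo.

```python
def format_config_errors(errors: list[dict[str, str]]) -> str:
    """Join validation errors into one multi-line message (illustrative helper)."""
    return "\n".join(f"[{err['section']}] {err['message']}" for err in errors)


# Sample errors in the shape printed above; the values are invented for this demo.
sample_errors = [
    {"section": "extract", "message": "missing 'source' key"},
    {"section": "load", "message": "unknown write mode"},
]
print(format_config_errors(sample_errors))
```

A test could then read assert is_valid, format_config_errors(errors), so that a failing run shows every config problem at once.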
Patterns
pytest-asyncio setup
Configure pytest-asyncio in conftest.py or pyproject.toml, or use @pytest.mark.asyncio on each async test.
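With pytest-asyncio, the usual way to run async tests without per-test markers is its auto mode. A minimal pyproject.toml sketch, assuming pytest-asyncio is installed and pytest reads its settings from pyproject.toml:

```toml
[tool.pytest.ini_options]
asyncio_mode = "auto"
```

In auto mode, async test functions are collected as asyncio tests automatically. In the default strict mode, each one needs the @pytest.mark.asyncio marker.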
Parametrized tests across fixtures
import pytest
from pathlib import Path
from pycharter.etl_generator.testing import load_fixture, PipelineTestHarness

# Sorted so the test IDs come out in a stable order
FIXTURES = sorted(Path("tests/fixtures").glob("*.yaml"))

# `pipeline` is assumed to be a pytest fixture defined elsewhere (e.g. in conftest.py)
@pytest.mark.asyncio
@pytest.mark.parametrize("fixture_path", FIXTURES, ids=[f.stem for f in FIXTURES])
async def test_pipeline_on_all_fixtures(pipeline, fixture_path):
    records = load_fixture(fixture_path)
    harness = PipelineTestHarness(pipeline, fixture_data=records)
    result = await harness.run()
    assert result.success, result.errors
Testing incremental logic
import pytest
from pycharter.etl_generator.testing import PipelineTestHarness, assert_record_count

@pytest.mark.asyncio
async def test_incremental_skips_old_records(pipeline):
    all_records = [
        {"id": 1, "updated_at": "2024-01-01T00:00:00"},
        {"id": 2, "updated_at": "2024-06-01T00:00:00"},
    ]
    # Supply only the recent record (simulating watermark-filtered extraction)
    harness = PipelineTestHarness(pipeline, fixture_data=[all_records[1]])
    result = await harness.run()
    assert_record_count(result, expected=1)
    assert harness.loaded_records[0]["id"] == 2
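Rather than picking the recent record by index, the fixture can be filtered by an explicit watermark, which keeps the test readable as the fixture grows. A minimal sketch using only the standard library; newer_than is a hypothetical helper and the watermark value is chosen for the demo.

```python
from datetime import datetime


def newer_than(
    records: list[dict], watermark: str, field: str = "updated_at"
) -> list[dict]:
    """Illustrative filter: keep records whose timestamp is strictly after the watermark."""
    cutoff = datetime.fromisoformat(watermark)
    return [r for r in records if datetime.fromisoformat(r[field]) > cutoff]


all_records = [
    {"id": 1, "updated_at": "2024-01-01T00:00:00"},
    {"id": 2, "updated_at": "2024-06-01T00:00:00"},
]
recent = newer_than(all_records, watermark="2024-03-01T00:00:00")
print([r["id"] for r in recent])  # [2]
```

The filtered list can then be passed as fixture_data, mirroring what a watermark-aware extractor would hand to the pipeline.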