Skip to content

Testing Framework

pycharter.etl_generator.testing provides mock components, assertion helpers, fixture loading, and a test harness for writing isolated unit and integration tests of ETL pipelines.

See Testing Pipelines for the full guide.

Classes

MockExtractor

MockExtractor(
    data: list[dict[str, Any]] | list[list[dict[str, Any]]],
    batch_size: int = 1000,
)

Mock extractor that yields pre-configured fixture data.

Implements the Extractor protocol for testing pipelines without real data sources.

Parameters:

Name Type Description Default
data list[dict[str, Any]] | list[list[dict[str, Any]]]

Fixture records. Either a flat list of dicts (auto-batched by batch_size) or a pre-batched list of lists.

required
batch_size int

Number of records per batch when data is flat.

1000
Source code in src/pycharter/etl_generator/testing.py
def __init__(
    self,
    data: list[dict[str, Any]] | list[list[dict[str, Any]]],
    batch_size: int = 1000,
) -> None:
    self._data = data
    self._batch_size = batch_size

extract async

extract(
    **params: Any,
) -> AsyncIterator[list[dict[str, Any]]]

Yield fixture data as batches.

Yields:

Type Description
AsyncIterator[list[dict[str, Any]]]

Batches of records from the configured fixture data.

Source code in src/pycharter/etl_generator/testing.py
async def extract(self, **params: Any) -> AsyncIterator[list[dict[str, Any]]]:
    """Yield fixture data as batches.

    Pre-batched data (a list whose first element is a list) is yielded
    batch by batch exactly as configured. Flat data (a list of dicts) is
    sliced into chunks of ``batch_size`` records; the final chunk may be
    shorter. Empty data yields nothing.

    Args:
        **params: Not used by the mock.

    Yields:
        Batches of records from the configured fixture data.

    Raises:
        ValueError: If flat data has to be chunked and ``batch_size`` is
            smaller than 1.
    """
    fixture = self._data
    if not fixture:
        return

    # Only the first element is inspected to tell pre-batched from flat data.
    if isinstance(fixture[0], list):
        for batch in fixture:
            yield batch
        return

    size = self._batch_size
    if size < 1:
        # range() rejects a zero step with an unrelated message and produces
        # nothing for a negative step, which would silently drop every record.
        raise ValueError(f"batch_size must be a positive integer, got {size!r}")

    flat: list[dict[str, Any]] = fixture  # type: ignore[assignment]
    for start in range(0, len(flat), size):
        yield flat[start : start + size]

MockLoader dataclass

MockLoader(
    simulate_failure: bool = False,
    failure_error: str = "Simulated load failure",
    loaded_records: list[dict[str, Any]] = list(),
    load_calls: list[list[dict[str, Any]]] = list(),
)

Mock loader that captures loaded records for assertion.

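Implements the Loader protocol. Records are accumulated in loaded_records and each call's batch is stored in load_calls. When simulate_failure is True, the batch is still recorded in load_calls but its records are not added to loaded_records.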

Parameters:

Name Type Description Default
simulate_failure bool

If True, load() returns a failure result.

False
failure_error str

Error message used when simulating failure.

'Simulated load failure'

load async

load(
    data: list[dict[str, Any]], **params: Any
) -> LoadResult

Capture loaded data and return a result.

Parameters:

Name Type Description Default
data list[dict[str, Any]]

Batch of records to load.

required
**params Any

Ignored.

{}

Returns:

Type Description
LoadResult

LoadResult indicating success or simulated failure.

Source code in src/pycharter/etl_generator/testing.py
async def load(self, data: list[dict[str, Any]], **params: Any) -> LoadResult:
    """Record one load call and report its outcome.

    Every call appends a shallow copy of the batch to ``load_calls``. The
    records are added to ``loaded_records`` only when no failure is being
    simulated.

    Args:
        data: Batch of records to load.
        **params: Ignored.

    Returns:
        LoadResult indicating success or simulated failure.
    """
    # The call history is updated first, so failed calls are recorded too.
    self.load_calls.append(list(data))

    if not self.simulate_failure:
        self.loaded_records.extend(data)
        return LoadResult(success=True, rows_loaded=len(data))

    # Simulated failure: the whole batch is reported as failed.
    return LoadResult(
        success=False,
        rows_loaded=0,
        rows_failed=len(data),
        error=self.failure_error,
    )

reset

reset() -> None

Clear all captured records and call history.

Source code in src/pycharter/etl_generator/testing.py
def reset(self) -> None:
    """Empty ``loaded_records`` and ``load_calls`` in place."""
    for captured in (self.loaded_records, self.load_calls):
        captured.clear()

PipelineTestHarness

PipelineTestHarness(
    pipeline: Any,
    fixture_data: (
        list[dict[str, Any]] | list[list[dict[str, Any]]]
    ),
    batch_size: int = 1000,
)

Run a pipeline with mock I/O injected.

Works with both programmatic and config-driven pipelines by replacing the extractor and loader with mock implementations.

Parameters:

Name Type Description Default
pipeline Any

The pipeline to test.

required
fixture_data list[dict[str, Any]] | list[list[dict[str, Any]]]

Test data — flat list of dicts or pre-batched list of lists.

required
batch_size int

Batch size for the mock extractor.

1000
Example

>>> harness = PipelineTestHarness(
...     pipeline, fixture_data=[{"id": 1, "name": "Alice"}]
... )
>>> result = await harness.run()
>>> assert result.success
>>> assert harness.loaded_records == [{"id": 1, "name": "Alice"}]

Source code in src/pycharter/etl_generator/testing.py
def __init__(
    self,
    pipeline: Any,
    fixture_data: list[dict[str, Any]] | list[list[dict[str, Any]]],
    batch_size: int = 1000,
) -> None:
    """Pair a pipeline with a freshly built mock extractor and mock loader.

    Args:
        pipeline: The pipeline to test.
        fixture_data: Test data, either a flat list of dicts or a
            pre-batched list of lists.
        batch_size: Batch size handed to the mock extractor.
    """
    self._pipeline = pipeline
    # The mocks are only created here; nothing is assigned onto the pipeline yet.
    extractor = MockExtractor(fixture_data, batch_size=batch_size)
    loader = MockLoader()
    self._mock_extractor = extractor
    self._mock_loader = loader

loaded_records property

loaded_records: list[dict[str, Any]]

Records captured by the mock loader.

load_calls property

load_calls: list[list[dict[str, Any]]]

Individual load call batches.

run async

run(**params: Any) -> PipelineResult

Run the pipeline with mock extractor and loader.

Parameters:

Name Type Description Default
**params Any

Passed through to pipeline.run().

{}

Returns:

Type Description
PipelineResult

PipelineResult from the pipeline execution.

Source code in src/pycharter/etl_generator/testing.py
async def run(self, **params: Any) -> PipelineResult:
    """Execute the wrapped pipeline against the mock extractor and loader.

    The pipeline's ``extractor`` and ``loader`` attributes are overwritten
    with the mocks before the pipeline is run; they are not restored
    afterwards.

    Args:
        **params: Passed through to ``pipeline.run()``.

    Returns:
        PipelineResult from the pipeline execution.
    """
    target = self._pipeline
    target.extractor, target.loader = self._mock_extractor, self._mock_loader
    outcome = await target.run(**params)
    return outcome

TestFixture dataclass

TestFixture(
    name: str = "",
    records: tuple[dict[str, Any], ...] = (),
    batches: tuple[tuple[dict[str, Any], ...], ...] = (),
    metadata: dict[str, Any] = dict(),
)

Container for loaded test fixture data.

Attributes:

Name Type Description
name str

Optional fixture name from metadata.

records tuple[dict[str, Any], ...]

Flat tuple of records.

batches tuple[tuple[dict[str, Any], ...], ...]

Pre-batched records (empty if data was flat).

metadata dict[str, Any]

Additional metadata from the fixture file.

Functions

load_fixture

load_fixture(path: str | Path) -> list[dict[str, Any]]

Load fixture data from a YAML or JSON file.

Supported formats
  • Top-level list: [{id: 1}, ...]
  • Dict with records key: {records: [{id: 1}, ...]}
  • Dict with batches key: flattened into a single list.

Parameters:

Name Type Description Default
path str | Path

Path to the fixture file (.yaml, .yml, or .json).

required

Returns:

Type Description
list[dict[str, Any]]

Flat list of records.

Raises:

Type Description
FileNotFoundError

If the file does not exist.

ValueError

If the format is unrecognized.

Source code in src/pycharter/etl_generator/testing.py
def load_fixture(path: str | Path) -> list[dict[str, Any]]:
    """Read a YAML or JSON fixture file and return its records as a list.

    This is a thin convenience wrapper: the file is loaded with
    ``load_test_fixture`` and only the ``records`` of the result are kept.

    Supported formats:
        - Top-level list: ``[{id: 1}, ...]``
        - Dict with ``records`` key: ``{records: [{id: 1}, ...]}``
        - Dict with ``batches`` key: flattened into a single list.

    Args:
        path: Path to the fixture file (.yaml, .yml, or .json).

    Returns:
        Flat list of records.

    Raises:
        FileNotFoundError: If the file does not exist.
        ValueError: If the format is unrecognized.
    """
    records = load_test_fixture(path).records
    # Hand back a new list rather than the fixture's own records container.
    return [*records]

load_test_fixture

load_test_fixture(path: str | Path) -> TestFixture

Load a test fixture with full metadata.

Parameters:

Name Type Description Default
path str | Path

Path to the fixture file (.yaml, .yml, or .json).

required

Returns:

Type Description
TestFixture

TestFixture with records, batches, and metadata.

Raises:

Type Description
FileNotFoundError

If the file does not exist.

ValueError

If the format is unrecognized.

Source code in src/pycharter/etl_generator/testing.py
def load_test_fixture(path: str | Path) -> TestFixture:
    """Read a fixture file and return it as a ``TestFixture``.

    The file content is read by ``_read_fixture_file`` and converted by
    ``_parse_fixture_data``.

    Args:
        path: Path to the fixture file (.yaml, .yml, or .json).

    Returns:
        TestFixture with records, batches, and metadata.

    Raises:
        FileNotFoundError: If the file does not exist.
        ValueError: If the format is unrecognized.
    """
    fixture_path = Path(path)
    if fixture_path.exists():
        return _parse_fixture_data(_read_fixture_file(fixture_path))
    raise FileNotFoundError(f"Fixture file not found: {fixture_path}")

validate_pipeline_config

validate_pipeline_config(
    config: dict[str, Any] | str | Path,
    *,
    variables: dict[str, str] | None = None
) -> tuple[bool, list[dict[str, Any]]]

Validate a pipeline configuration for correctness.

Wraps the existing ConfigValidator for a simpler testing API.

Parameters:

Name Type Description Default
config dict[str, Any] | str | Path

Pipeline config as a dict, file path string, or Path object.

required
variables dict[str, str] | None

Optional variables for ${VAR} substitution.

None

Returns:

Type Description
tuple[bool, list[dict[str, Any]]]

Tuple of (is_valid, errors) where errors is a list of dicts with 'section' and 'message' keys.

Source code in src/pycharter/etl_generator/testing.py
def validate_pipeline_config(
    config: dict[str, Any] | str | Path,
    *,
    variables: dict[str, str] | None = None,
) -> tuple[bool, list[dict[str, Any]]]:
    """Validate a pipeline configuration for correctness.

    Wraps the existing ``ConfigValidator`` for a simpler testing API.

    Args:
        config: Pipeline config as a dict, file path string, or Path object.
        variables: Optional variables for ``${VAR}`` substitution.

    Returns:
        Tuple of (is_valid, errors) where errors is a list of dicts
        with 'section' and 'message' keys.
    """
    from pycharter.etl_generator.config_validator import ConfigValidator
    from pycharter.etl_generator.context import PipelineContext

    errors: list[dict[str, Any]] = []

    # Load config from file if needed
    if isinstance(config, (str, Path)):
        path = Path(config)
        if not path.exists():
            return False, [{"section": "file", "message": f"File not found: {path}"}]
        if path.is_dir():
            return _validate_config_dir(path, variables)
        with open(path) as f:
            raw = f.read()
        if variables:
            context = PipelineContext(variables=variables)
            raw = context.resolve(raw)
        config = yaml.safe_load(raw) or {}

    if not isinstance(config, dict):
        return False, [{"section": "config", "message": "Config must be a dict"}]

    validator = ConfigValidator(strict=True)

    # Validate extract
    extract_config = config.get("extract", {})
    if extract_config:
        is_valid, extract_errors = validator.check_extract(extract_config)
        if not is_valid:
            for err in extract_errors:
                errors.append({"section": "extract", "message": str(err)})

    # Validate transform
    transform_config = config.get("transform")
    if transform_config:
        if isinstance(transform_config, list):
            is_valid, transform_errors = validator.check_transform(
                {"transform": transform_config}
            )
        else:
            is_valid, transform_errors = validator.check_transform(transform_config)
        if not is_valid:
            for err in transform_errors:
                errors.append({"section": "transform", "message": str(err)})

    # Validate load
    load_config = config.get("load", {})
    if load_config:
        is_valid, load_errors = validator.check_load(load_config)
        if not is_valid:
            for err in load_errors:
                errors.append({"section": "load", "message": str(err)})

    return len(errors) == 0, errors

assert_records_match

assert_records_match(
    actual: list[dict[str, Any]],
    expected: list[dict[str, Any]],
    *,
    order_matters: bool = True,
    subset: bool = False
) -> None

Assert that actual records match expected records.

Parameters:

Name Type Description Default
actual list[dict[str, Any]]

Records produced by the pipeline.

required
expected list[dict[str, Any]]

Expected records.

required
order_matters bool

If True, records must be in the same order.

True
subset bool

If True, every expected record must appear somewhere in actual; extra records in actual are allowed and order_matters is ignored.

False

Raises:

Type Description
AssertionError

If records do not match.

Source code in src/pycharter/etl_generator/testing.py
def assert_records_match(
    actual: list[dict[str, Any]],
    expected: list[dict[str, Any]],
    *,
    order_matters: bool = True,
    subset: bool = False,
) -> None:
    """Compare pipeline output against the expected records.

    The modes are tried in this order: ``subset`` first (``order_matters``
    is then ignored), then the unordered comparison delegated to
    ``_assert_unordered``, and finally a strict position-by-position
    comparison.

    Args:
        actual: Records produced by the pipeline.
        expected: Expected records.
        order_matters: If True, records must be in the same order.
        subset: If True, actual must contain expected as a subset.

    Raises:
        AssertionError: If records do not match.
    """
    if subset:
        # Report only the first expected record that is absent from actual.
        absent = next(
            ((pos, rec) for pos, rec in enumerate(expected) if rec not in actual),
            None,
        )
        if absent is not None:
            pos, rec = absent
            raise AssertionError(
                f"Expected record at index {pos} not found in actual: {rec}"
            )
        return

    if not order_matters:
        _assert_unordered(actual, expected)
        return

    got, want = len(actual), len(expected)
    if got != want:
        raise AssertionError(
            f"Record count mismatch: got {got}, expected {want}"
        )

    # Lengths are equal here, so zip() cannot hide trailing records.
    for pos, pair in enumerate(zip(actual, expected)):
        left, right = pair
        if left != right:
            raise AssertionError(
                f"Record mismatch at index {pos}:\n  actual:   {left}\n  expected: {right}"
            )

assert_record_count

assert_record_count(
    result: PipelineResult, expected: int
) -> None

Assert the number of loaded rows in a pipeline result.

Parameters:

Name Type Description Default
result PipelineResult

PipelineResult from a pipeline run.

required
expected int

Expected number of loaded rows.

required

Raises:

Type Description
AssertionError

If the count does not match.

Source code in src/pycharter/etl_generator/testing.py
def assert_record_count(result: PipelineResult, expected: int) -> None:
    """Check ``result.rows_loaded`` against an expected row count.

    Args:
        result: PipelineResult from a pipeline run.
        expected: Expected number of loaded rows.

    Raises:
        AssertionError: If the count does not match.
    """
    loaded = result.rows_loaded
    mismatch = loaded != expected
    if mismatch:
        raise AssertionError(
            f"Row count mismatch: got {loaded}, expected {expected}"
        )

assert_fields_present

assert_fields_present(
    records: list[dict[str, Any]], fields: list[str]
) -> None

Assert that all specified fields exist in every record.

Parameters:

Name Type Description Default
records list[dict[str, Any]]

List of records to check.

required
fields list[str]

Field names that must be present.

required

Raises:

Type Description
AssertionError

If any field is missing from any record.

Source code in src/pycharter/etl_generator/testing.py
def assert_fields_present(records: list[dict[str, Any]], fields: list[str]) -> None:
    """Require every listed field to be a key of every record.

    Records are scanned in order and, within a record, fields in the order
    given; the first missing field is the one reported.

    Args:
        records: List of records to check.
        fields: Field names that must be present.

    Raises:
        AssertionError: If any field is missing from any record.
    """
    gaps = (
        (idx, name, row)
        for idx, row in enumerate(records)
        for name in fields
        if name not in row
    )
    first_gap = next(gaps, None)
    if first_gap is None:
        return

    idx, name, row = first_gap
    raise AssertionError(
        f"Field '{name}' missing from record at index {idx}: {row}"
    )

assert_no_field

assert_no_field(
    records: list[dict[str, Any]], field_name: str
) -> None

Assert that a field does not exist in any record.

Parameters:

Name Type Description Default
records list[dict[str, Any]]

List of records to check.

required
field_name str

Field name that must be absent.

required

Raises:

Type Description
AssertionError

If the field is found in any record.

Source code in src/pycharter/etl_generator/testing.py
def assert_no_field(records: list[dict[str, Any]], field_name: str) -> None:
    """Require that no record has ``field_name`` as a key.

    Only key presence is checked, so a field whose value is ``None`` still
    counts as present. The first offending record is the one reported.

    Args:
        records: List of records to check.
        field_name: Field name that must be absent.

    Raises:
        AssertionError: If the field is found in any record.
    """
    offender = next(
        ((idx, row) for idx, row in enumerate(records) if field_name in row),
        None,
    )
    if offender is None:
        return

    idx, row = offender
    raise AssertionError(
        f"Field '{field_name}' found in record at index {idx}: {row}"
    )

assert_field_values

assert_field_values(
    records: list[dict[str, Any]],
    field_name: str,
    expected_values: list[Any],
) -> None

Assert that a specific field has the expected values across records.

Parameters:

Name Type Description Default
records list[dict[str, Any]]

List of records to check.

required
field_name str

Field to extract values from.

required
expected_values list[Any]

Expected values in order.

required

Raises:

Type Description
AssertionError

If values do not match.

Source code in src/pycharter/etl_generator/testing.py
def assert_field_values(
    records: list[dict[str, Any]],
    field_name: str,
    expected_values: list[Any],
) -> None:
    """Compare one field's values, record by record, with an expected list.

    The value is read with ``dict.get``, so a record without the field
    contributes ``None`` to the comparison.

    Args:
        records: List of records to check.
        field_name: Field to extract values from.
        expected_values: Expected values in order.

    Raises:
        AssertionError: If values do not match.
    """
    observed = [row.get(field_name) for row in records]
    if observed != expected_values:
        details = (
            f"Field '{field_name}' values mismatch:\n"
            f"  actual:   {observed}\n  expected: {expected_values}"
        )
        raise AssertionError(details)

assert_schema_shape

assert_schema_shape(
    records: list[dict[str, Any]], schema: dict[str, type]
) -> None

Assert that field values match expected types.

None values are allowed for any field type.

Parameters:

Name Type Description Default
records list[dict[str, Any]]

List of records to check.

required
schema dict[str, type]

Mapping of field name to expected Python type.

required

Raises:

Type Description
AssertionError

If any field has an unexpected type.

Source code in src/pycharter/etl_generator/testing.py
def _type_label(expected_type: Any) -> str:
    """Return a readable name for a type or a tuple of types."""
    if isinstance(expected_type, tuple):
        return " | ".join(_type_label(member) for member in expected_type)
    # Fall back to repr() for type specifications without a __name__.
    return getattr(expected_type, "__name__", None) or repr(expected_type)


def assert_schema_shape(records: list[dict[str, Any]], schema: dict[str, type]) -> None:
    """Assert that field values match expected types.

    Every field named in ``schema`` must be present in every record and its
    value must pass ``isinstance`` against the expected type. None values are
    allowed for any field type. Fields not listed in ``schema`` are ignored.

    Args:
        records: List of records to check.
        schema: Mapping of field name to expected Python type. A tuple of
            types is also handled, as ``isinstance`` accepts one.

    Raises:
        AssertionError: If a field is missing or has an unexpected type.
    """
    for i, record in enumerate(records):
        for field_name, expected_type in schema.items():
            if field_name not in record:
                raise AssertionError(
                    f"Field '{field_name}' missing from record at index {i}"
                )
            value = record[field_name]
            if value is not None and not isinstance(value, expected_type):
                # Build the label defensively: a tuple of types has no
                # __name__, and the failure must surface as AssertionError.
                raise AssertionError(
                    f"Field '{field_name}' at index {i}: expected "
                    f"{_type_label(expected_type)}, got {type(value).__name__} "
                    f"(value={value!r})"
                )

Import

from pycharter.etl_generator.testing import (
    # Mock components
    MockExtractor,
    MockLoader,
    # Test harness
    PipelineTestHarness,
    # Fixtures
    load_fixture,
    load_test_fixture,
    TestFixture,
    # Config validation
    validate_pipeline_config,
    # Assertion helpers
    assert_records_match,
    assert_record_count,
    assert_fields_present,
    assert_no_field,
    assert_field_values,
    assert_schema_shape,
)

See also