Skip to content

Pipeline

The Pipeline class is the main entry point for building and running ETL pipelines.

Overview

from pycharter import Pipeline, HTTPExtractor, FileLoader, Rename

# Build with pipe operator
pipeline = (
    Pipeline(HTTPExtractor(url="https://api.example.com/data"))
    | Rename({"old": "new"})
    | FileLoader(path="output.json")
)

# Run
result = await pipeline.run()

API Reference

Pipeline

Pipeline(
    extractor: Extractor | None = None,
    transformers: list[Transformer] | None = None,
    loader: Loader | None = None,
    context: PipelineContext | None = None,
    name: str | None = None,
    transform_config: (
        dict[str, Any] | list[dict[str, Any]] | None
    ) = None,
    extract_validation_config: dict[str, Any] | None = None,
    load_validation_config: dict[str, Any] | None = None,
    quality_checks_config: (
        list[dict[str, Any]] | None
    ) = None,
    settings: dict[str, Any] | None = None,
    base_dir: Path | None = None,
    extract_config_raw: dict[str, Any] | None = None,
    load_config_raw: dict[str, Any] | None = None,
)

ETL Pipeline with | operator for chaining transformers.

Programmatic usage

>>> pipeline = (
...     Pipeline(HTTPExtractor(url="..."))
...     | Rename({"old": "new"})
...     | PostgresLoader(...)
... )
>>> result = await pipeline.run()

Config-driven usage (variables and settings are first-class in all methods):

>>> # From explicit files (extract, load, transform, variables, settings)
>>> pipeline = Pipeline.from_config_files(
...     "configs/extract.yaml", "configs/load.yaml",
...     variables={"API_KEY": "secret"}
... )
>>>
>>> # From directory (extract.yaml, load.yaml, optional transform/settings/variables.yaml)
>>> pipeline = Pipeline.from_config_dir("pipelines/users/")
>>>
>>> # From single file (all sections in one YAML)
>>> pipeline = Pipeline.from_config_file("pipelines/users/pipeline.yaml")
>>>
>>> result = await pipeline.run()

Async execution

run() is async. From a script, use asyncio.run():

    asyncio.run(pipeline.run())

From an async context (FastAPI, Jupyter), await it directly:

    result = await pipeline.run()

See pycharter/etl_generator/ASYNC_AND_EXECUTION.md for details.

Source code in src/pycharter/etl_generator/pipeline.py
def __init__(
    self,
    extractor: Extractor | None = None,
    transformers: list[Transformer] | None = None,
    loader: Loader | None = None,
    context: PipelineContext | None = None,
    name: str | None = None,
    transform_config: dict[str, Any] | list[dict[str, Any]] | None = None,
    # --- validation ---
    extract_validation_config: dict[str, Any] | None = None,
    load_validation_config: dict[str, Any] | None = None,
    # --- post-load quality checks ---
    quality_checks_config: list[dict[str, Any]] | None = None,
    # --- shared settings ---
    settings: dict[str, Any] | None = None,
    # --- anchor for relative paths ---
    base_dir: Path | None = None,
    # --- raw configs kept for lineage dataset descriptors
    #     (set by _build_from_configs) ---
    extract_config_raw: dict[str, Any] | None = None,
    load_config_raw: dict[str, Any] | None = None,
):
    """
    Store the pipeline's stages and configuration on the instance.

    Args:
        extractor: Source stage; may be None.
        transformers: Transformers to apply; copied into a new list so the
            caller's sequence is never shared with the pipeline.
        loader: Sink stage; may be None.
        context: Pipeline context; a fresh PipelineContext is created when
            a falsy value is given.
        name: Optional pipeline name.
        transform_config: Raw transform config for config-driven runs.
        extract_validation_config: Validation config for the extract side.
        load_validation_config: Validation config for the load side.
        quality_checks_config: Post-load quality checks config.
        settings: Shared settings; falls back to an empty dict when falsy.
        base_dir: Base directory for resolving relative paths.
        extract_config_raw: Raw extract config (lineage descriptors).
        load_config_raw: Raw load config (lineage descriptors).
    """
    # The three ETL stages.
    self.extractor = extractor
    self._transformers: list[Transformer] = (
        [*transformers] if transformers else []
    )
    self.loader = loader

    # Identity and shared runtime context.
    self.context = context if context else PipelineContext()
    self.name = name

    # Config-driven runs keep the raw transform config alongside the
    # transformer list.
    self._transform_config = transform_config

    # Validation on either side of the transform step.
    self._extract_validation_config = extract_validation_config
    self._load_validation_config = load_validation_config

    # Checks executed once loading has finished.
    self._quality_checks_config = quality_checks_config

    # Shared settings (settings.yaml); never None on the instance.
    self._settings = settings if settings else {}

    # Anchor used when resolving file paths.
    self._base_dir = base_dir

    # Populated by run(); None until the first run.
    self._run_id: str | None = None

    # Untouched extract/load configs for OpenLineage dataset descriptors.
    self._extract_config_raw = extract_config_raw
    self._load_config_raw = load_config_raw

__or__

__or__(other: Transformer | Loader) -> 'Pipeline'

Chain transformer or set loader using | operator.

Source code in src/pycharter/etl_generator/pipeline.py
def __or__(self, other: Transformer | Loader) -> "Pipeline":
    """Chain transformer or set loader using | operator.

    A new Pipeline is returned; ``self`` is not modified. The new pipeline
    shares this pipeline's extractor, context, settings and configs, and
    gets its own copy of the transformer list.

    Args:
        other: A Loader replaces the current loader; anything else is
            appended to the transformer chain.

    Returns:
        The new Pipeline.
    """
    transformers = self._transformers.copy()
    loader = self.loader
    # The raw load config describes the loader it was built with, so it is
    # only carried forward while that loader stays in place.
    load_config_raw = self._load_config_raw

    if isinstance(other, Loader):
        loader = other
        load_config_raw = None
    else:
        transformers.append(other)

    return Pipeline(
        extractor=self.extractor,
        transformers=transformers,
        loader=loader,
        context=self.context,
        name=self.name,
        # Previously dropped on chaining, which silently discarded the
        # configured transforms of a config-built pipeline.
        # NOTE(review): confirm how _apply_transforms combines the raw
        # config with the transformer list.
        transform_config=self._transform_config,
        extract_validation_config=self._extract_validation_config,
        load_validation_config=self._load_validation_config,
        quality_checks_config=self._quality_checks_config,
        settings=self._settings,
        base_dir=self._base_dir,
        # run() reads the "incremental" section from the raw extract
        # config; dropping it here disabled incremental mode after any `|`.
        extract_config_raw=self._extract_config_raw,
        load_config_raw=load_config_raw,
    )

run async

run(
    dry_run: bool = False,
    error_context: ErrorContext | None = None,
    **params: Any
) -> PipelineResult

Run the ETL pipeline.

Parameters:

Name Type Description Default
dry_run bool

If True, extract and transform but do not load.

False
error_context ErrorContext | None

Optional error context for handling failures. If not set, uses the default from get_error_context(). In STRICT mode, extraction or load failures raise. In LENIENT/COLLECT mode, errors are logged and appended to result.errors.

None
**params Any

Passed to extractor.extract() and loader.load().

{}

Returns:

Type Description
PipelineResult

PipelineResult with counts and any errors.

Source code in src/pycharter/etl_generator/pipeline.py
async def run(
    self,
    dry_run: bool = False,
    error_context: ErrorContext | None = None,
    **params: Any,
) -> PipelineResult:
    """
    Run the ETL pipeline.

    Each batch yielded by the extractor goes through: optional extract
    validation, transforms, optional load validation, load, and (when the
    extractor exposes ``acknowledge``) acknowledgement. After the last
    batch the method runs post-load quality checks, persists the
    incremental watermark on success, and emits the lineage
    COMPLETE/FAIL event.

    Args:
        dry_run: If True, extract and transform but do not load.
        error_context: Optional error context for handling failures.
            If not set, uses the default from get_error_context().
            In STRICT mode, extraction or load failures raise.
            In LENIENT/COLLECT mode, errors are logged and appended to result.errors.
        **params: Passed to extractor.extract() and loader.load().

    Returns:
        PipelineResult with counts and any errors. When no extractor is
        configured, a failed result is returned instead of raising.
    """
    run_id = str(uuid.uuid4())[:8]
    self._run_id = run_id
    start_time = datetime.now(timezone.utc)
    ctx = error_context or get_error_context()

    result = PipelineResult(
        pipeline_name=self.name,
        run_id=run_id,
        start_time=start_time,
    )

    if not self.extractor:
        result.success = False
        result.errors.append("No extractor configured")
        return result

    logger.info("[%s] Starting pipeline: %s", run_id, self.name or "unnamed")

    # Incremental extraction: inject watermark into context
    state_store = None
    existing_state = None
    inc_config = None
    max_watermark: str | None = None

    if self._extract_config_raw and isinstance(self._extract_config_raw, dict):
        inc_raw = self._extract_config_raw.get("incremental")
        if inc_raw and inc_raw.get("enabled", True):
            inc_config = inc_raw
            state_store = self._create_state_store()
            existing_state = state_store.get(self.name or "unnamed")
            # Stored watermark wins; otherwise fall back to the configured
            # initial value (empty string when neither is available).
            current_wm = (
                existing_state.watermark if existing_state else None
            ) or inc_config.get("initial_value", "")
            max_watermark = current_wm
            self.context.set("__watermark__", current_wm)
            logger.info("[%s] Incremental mode: watermark=%s", run_id, current_wm)

    # Lineage: build emitter and dataset descriptors (no-op when disabled)
    emitter, input_ds, output_ds = self._prepare_lineage()
    if emitter:
        # Lineage runs use a full UUID rather than the 8-character id.
        run_id = str(uuid.uuid4())
        result.run_id = run_id
        # Keep the instance in sync so self._run_id and result.run_id
        # never disagree for the same run.
        self._run_id = run_id

    # Create validators if configured
    extract_validator = self._create_extract_validator()
    load_validator = self._create_load_validator()

    # Accumulate loaded records for quality checks
    all_loaded_records: list[dict[str, Any]] = []
    quality_checker = self._create_quality_checker()

    # Lineage: emit START and capture payload for response
    if emitter:
        try:
            payload = emitter.start(self.name or "unnamed", run_id, input_ds)
            if payload:
                result.lineage_events.append(payload)
        except Exception:
            # Lineage is best-effort: never let it fail the pipeline.
            logger.debug("Lineage START emission failed", exc_info=True)

    try:
        batch_index = 0
        async for batch in self.extractor.extract(**params):
            batch_result = BatchResult(batch_index=batch_index, rows_in=len(batch))
            result.rows_extracted += len(batch)

            # Source validation (after extract)
            if extract_validator:
                try:
                    batch, quarantined, error_count = (
                        await extract_validator.validate(batch, run_id=run_id)
                    )
                    result.rows_quarantined_extract += len(quarantined)
                    if error_count > 0:
                        batch_result.errors.append(
                            f"Extract validation: {error_count} record(s) invalid"
                        )
                except Exception as e:
                    # Validation error with on_error=fail
                    ctx.handle_error(str(e), e, category="extract_validation")
                    batch_result.errors.append(f"Extract validation failed: {e}")
                    result.success = False
                    result.errors.append(str(e))
                    break

            # Transform
            transformed = self._apply_transforms(batch)
            batch_result.rows_out = len(transformed)
            result.rows_transformed += len(transformed)

            # Track max watermark across batches
            if inc_config and transformed:
                wm_field = inc_config["watermark_field"]
                # Skip records whose watermark is missing or None:
                # str(None) == "None" would compare greater than date or
                # numeric strings and be persisted as a bogus watermark.
                batch_max = max(
                    (
                        str(r[wm_field])
                        for r in transformed
                        if r.get(wm_field) is not None
                    ),
                    default="",
                )
                if batch_max > (max_watermark or ""):
                    max_watermark = batch_max

            # Target validation (before load)
            if load_validator and transformed:
                try:
                    transformed, quarantined, error_count = (
                        await load_validator.validate(transformed, run_id=run_id)
                    )
                    result.rows_quarantined_load += len(quarantined)
                    if error_count > 0:
                        batch_result.errors.append(
                            f"Load validation: {error_count} record(s) invalid"
                        )
                except Exception as e:
                    # Validation error with on_error=fail
                    ctx.handle_error(str(e), e, category="load_validation")
                    batch_result.errors.append(f"Load validation failed: {e}")
                    result.success = False
                    result.errors.append(str(e))
                    break

            # Load
            # load_success stays True when nothing is loaded (dry run, no
            # loader, empty batch) so the batch is still acknowledged.
            load_success = True
            if not dry_run and self.loader and transformed:
                try:
                    load_result = await self.loader.load(transformed, **params)
                    load_success = load_result.success
                    if load_result.success:
                        result.rows_loaded += load_result.rows_loaded
                        if quality_checker:
                            all_loaded_records.extend(transformed)
                    else:
                        msg = load_result.error or "Load failed"
                        ctx.handle_error(msg, category="load")
                        batch_result.errors.append(msg)
                        batch_result.rows_failed += len(transformed)
                except Exception as e:
                    load_success = False
                    ctx.handle_error(str(e), e, category="load")
                    batch_result.errors.append(str(e))
                    batch_result.rows_failed += len(transformed)
            elif dry_run:
                # Dry run reports what would have been loaded.
                result.rows_loaded += len(transformed)

            # Acknowledge batch for message queue extractors
            if hasattr(self.extractor, "acknowledge"):
                try:
                    await self.extractor.acknowledge(batch_index, load_success)
                except Exception as ack_err:
                    logger.warning(
                        "[%s] Batch %d ack failed: %s",
                        run_id,
                        batch_index,
                        ack_err,
                    )

            result.batches_processed += 1
            result.batch_results.append(batch_result)
            batch_index += 1

    except Exception as e:
        result.success = False
        result.errors.append(str(e))
        ctx.handle_error(str(e), e, category="pipeline")
        logger.error("[%s] Pipeline error: %s", run_id, e)

    result.end_time = datetime.now(timezone.utc)
    result.duration_seconds = (result.end_time - start_time).total_seconds()
    result.rows_failed = sum(br.rows_failed for br in result.batch_results)

    # Post-load quality checks
    if quality_checker and result.success:
        last_load_result = LoadResult(
            success=True,
            rows_loaded=result.rows_loaded,
            rows_failed=result.rows_failed,
        )
        quality_report = quality_checker.run(all_loaded_records, last_load_result)
        result.quality_report = quality_report
        if not quality_report.passed:
            result.success = False
            for failure in quality_report.hard_failures:
                result.errors.append(f"Quality check failed: {failure.message}")
        for warning in quality_report.warnings:
            logger.warning("[%s] Quality warning: %s", run_id, warning.message)

    if result.errors:
        result.success = False

    # Incremental: save updated watermark on success
    # NOTE(review): batch load failures only reach batch_result.errors, so
    # result.success can still be True here with rows_failed > 0 - confirm
    # that advancing the watermark in that case is intended.
    if state_store and result.success and max_watermark:
        from pycharter.etl_generator.state import PipelineState

        state_store.save(
            PipelineState(
                pipeline_name=self.name or "unnamed",
                watermark=max_watermark,
                last_run_id=run_id,
                last_run_at=datetime.now(timezone.utc).isoformat(),
                run_count=((existing_state.run_count + 1) if existing_state else 1),
            )
        )

    # Lineage: emit COMPLETE or FAIL and capture payload for response
    if emitter:
        try:
            if result.success:
                payload = emitter.complete(
                    self.name or "unnamed",
                    run_id,
                    input_ds,
                    output_ds,
                    result,
                )
            else:
                payload = emitter.fail(
                    self.name or "unnamed", run_id, input_ds, result
                )
            if payload:
                result.lineage_events.append(payload)
        except Exception:
            # Lineage is best-effort: never let it fail the pipeline.
            logger.debug("Lineage COMPLETE/FAIL emission failed", exc_info=True)

    logger.info(
        "[%s] Complete: extracted=%s, loaded=%s, "
        "quarantined_extract=%s, quarantined_load=%s",
        run_id,
        result.rows_extracted,
        result.rows_loaded,
        result.rows_quarantined_extract,
        result.rows_quarantined_load,
    )
    return result

from_config_dir classmethod

from_config_dir(
    directory: str | Path,
    variables: str | Path | dict[str, str] | None = None,
    settings: str | Path | dict[str, Any] | None = None,
    *,
    validate: bool = True,
    name: str | None = None,
    load_defaults: dict[str, Any] | None = None,
    base_dir: Path | None = None
) -> "Pipeline"

Create pipeline from a directory (extract.yaml, load.yaml, etc.).

Config at same level: directory supplies extract, load, transform, variables, settings. Optional variables/settings args override or supplement directory files.

Parameters:

Name Type Description Default
directory str | Path

Path to directory with extract.yaml, load.yaml, etc.

required
variables str | Path | dict[str, str] | None

Optional path or dict; merged with directory variables.yaml (caller wins).

None
settings str | Path | dict[str, Any] | None

Optional path or dict; merged with directory settings.yaml (caller wins).

None
validate bool

If True, validate configs.

True
name str | None

Optional pipeline name (default: directory name).

None
load_defaults dict[str, Any] | None

Merged under load config (load wins).

None
base_dir Path | None

Base for relative paths (default: directory).

None
Source code in src/pycharter/etl_generator/pipeline.py
@classmethod
def from_config_dir(
    cls,
    directory: str | Path,
    variables: str | Path | dict[str, str] | None = None,
    settings: str | Path | dict[str, Any] | None = None,
    *,
    validate: bool = True,
    name: str | None = None,
    load_defaults: dict[str, Any] | None = None,
    base_dir: Path | None = None,
) -> "Pipeline":
    """
    Create pipeline from a directory (extract.yaml, load.yaml, etc.).

    Config at same level: directory supplies extract, load, transform, variables, settings.
    Optional variables/settings args override or supplement directory files.

    Args:
        directory: Path to directory with extract.yaml, load.yaml, etc.
        variables: Optional path or dict; merged with directory variables.yaml (caller wins).
        settings: Optional path or dict; merged with directory settings.yaml (caller wins).
        validate: If True, validate configs.
        name: Optional pipeline name (default: directory name).
        load_defaults: Merged under load config (load wins).
        base_dir: Base for relative paths (default: directory).

    Raises:
        NotADirectoryError: If ``directory`` is not a directory.
        FileNotFoundError: If extract.yaml or load.yaml is missing.
    """
    config_dir = Path(directory)
    if not config_dir.is_dir():
        raise NotADirectoryError(f"Not a directory: {config_dir}")

    # Directory variables form the base layer; caller-supplied values are
    # applied on top so they win on conflicts.
    dir_vars = _load_variables_file(config_dir / "variables.yaml", {})
    caller_vars = _resolve_variables(variables)
    merged_vars = {**dir_vars, **caller_vars}

    # extract.yaml and load.yaml are mandatory, checked in that order.
    extract_path = config_dir / "extract.yaml"
    load_path = config_dir / "load.yaml"
    for required_path in (extract_path, load_path):
        if not required_path.exists():
            raise FileNotFoundError(f"Required file not found: {required_path}")

    extract_config = _load_config_input(extract_path, merged_vars)
    load_config = _load_config_input(load_path, merged_vars)
    if load_defaults:
        # Defaults sit underneath: keys present in load.yaml take priority.
        load_config = {**load_defaults, **load_config}

    # transform.yaml is optional.
    transform_path = config_dir / "transform.yaml"
    if transform_path.exists():
        transform_config = _load_config_input(transform_path, merged_vars)
    else:
        transform_config = {}

    # settings.yaml is optional; anything that is not a mapping is ignored.
    settings_path = config_dir / "settings.yaml"
    dir_settings = {}
    if settings_path.exists():
        loaded_settings = _load_config_input(settings_path, merged_vars)
        if isinstance(loaded_settings, dict):
            dir_settings = loaded_settings
    settings_config = {**dir_settings, **_resolve_settings(settings, merged_vars)}

    # Lift validation / quality-check sections out of the extract and load
    # configs so they are handed to the builder separately.
    extract_validation_config = None
    if isinstance(extract_config, dict):
        extract_validation_config = extract_config.pop("validation", None)

    load_validation_config = None
    quality_checks_config = None
    if isinstance(load_config, dict):
        load_validation_config = load_config.pop("validation", None)
        quality_checks_config = load_config.pop("quality_checks", None)

    return cls._build_from_configs(
        extract_config=extract_config,
        transform_config=transform_config,
        load_config=load_config,
        variables=merged_vars,
        validate=validate,
        name=name or config_dir.name,
        extract_validation_config=extract_validation_config,
        load_validation_config=load_validation_config,
        quality_checks_config=quality_checks_config,
        settings=settings_config,
        base_dir=(base_dir or config_dir).resolve(),
    )

from_config_files classmethod

from_config_files(
    extract: str | Path | dict[str, Any],
    load: str | Path | dict[str, Any],
    transform: (
        str
        | Path
        | dict[str, Any]
        | list[dict[str, Any]]
        | None
    ) = None,
    variables: str | Path | dict[str, str] | None = None,
    settings: str | Path | dict[str, Any] | None = None,
    *,
    validate: bool = True,
    name: str | None = None,
    load_defaults: dict[str, Any] | None = None,
    base_dir: Path | None = None
) -> "Pipeline"

Create pipeline from explicit file paths or dictionaries.

Config inputs (same level): extract, load, transform, variables, settings. Each can be a path or dict; variables are used to resolve ${VAR} in others.

Parameters:

Name Type Description Default
extract str | Path | dict[str, Any]

Path or dict for extract config.

required
load str | Path | dict[str, Any]

Path or dict for load config.

required
transform str | Path | dict[str, Any] | list[dict[str, Any]] | None

Optional path or dict/list for transform config.

None
variables str | Path | dict[str, str] | None

Optional path or dict for ${VAR} substitution.

None
settings str | Path | dict[str, Any] | None

Optional path or dict for shared settings (DLQ, metadata_store, etc.).

None
validate bool

If True, validate configs against schemas.

True
name str | None

Optional pipeline name.

None
load_defaults dict[str, Any] | None

Merged under load config (load wins).

None
base_dir Path | None

Base for relative paths; default from extract path when possible.

None
Source code in src/pycharter/etl_generator/pipeline.py
@classmethod
def from_config_files(
    cls,
    extract: str | Path | dict[str, Any],
    load: str | Path | dict[str, Any],
    transform: str | Path | dict[str, Any] | list[dict[str, Any]] | None = None,
    variables: str | Path | dict[str, str] | None = None,
    settings: str | Path | dict[str, Any] | None = None,
    *,
    validate: bool = True,
    name: str | None = None,
    load_defaults: dict[str, Any] | None = None,
    base_dir: Path | None = None,
) -> "Pipeline":
    """
    Create pipeline from explicit file paths or dictionaries.

    Config inputs (same level): extract, load, transform, variables, settings.
    Each can be a path or dict; variables are used to resolve ${VAR} in others.

    Args:
        extract: Path or dict for extract config.
        load: Path or dict for load config.
        transform: Optional path or dict/list for transform config.
        variables: Optional path or dict for ${VAR} substitution.
        settings: Optional path or dict for shared settings (DLQ, metadata_store, etc.).
        validate: If True, validate configs against schemas.
        name: Optional pipeline name.
        load_defaults: Merged under load config (load wins).
        base_dir: Base for relative paths; default from extract path when possible.
    """
    variables = _resolve_variables(variables)
    if base_dir is not None:
        # Resolve caller-supplied directories too, matching
        # from_config_dir() and from_config_file().
        base_dir = Path(base_dir).resolve()
    elif isinstance(extract, (str, Path)):
        base_dir = Path(extract).parent.resolve()

    extract_config = _load_config_input(extract, variables)
    load_config = _load_config_input(load, variables)

    # The "validation" / "quality_checks" sections are popped below. Work
    # on shallow copies so a dict passed in by the caller is never mutated
    # (the loader helper may hand back the caller's own object).
    if isinstance(extract_config, dict):
        extract_config = dict(extract_config)
    if load_defaults:
        # Defaults sit underneath; this already builds a fresh dict.
        load_config = {**load_defaults, **load_config}
    elif isinstance(load_config, dict):
        load_config = dict(load_config)

    transform_config = (
        _load_config_input(transform, variables) if transform is not None else {}
    )
    settings_config = _resolve_settings(settings, variables)

    # Extract validation configs from extract/load
    extract_validation_config = None
    if isinstance(extract_config, dict):
        extract_validation_config = extract_config.pop("validation", None)

    load_validation_config = None
    quality_checks_config = None
    if isinstance(load_config, dict):
        load_validation_config = load_config.pop("validation", None)
        quality_checks_config = load_config.pop("quality_checks", None)

    return cls._build_from_configs(
        extract_config=extract_config,
        transform_config=transform_config,
        load_config=load_config,
        variables=variables,
        validate=validate,
        name=name,
        extract_validation_config=extract_validation_config,
        load_validation_config=load_validation_config,
        quality_checks_config=quality_checks_config,
        settings=settings_config,
        base_dir=base_dir,
    )

from_config_file classmethod

from_config_file(
    path: str | Path,
    *,
    variables: dict[str, str] | None = None,
    validate: bool = True,
    name: str | None = None,
    load_defaults: dict[str, Any] | None = None,
    base_dir: Path | None = None
) -> "Pipeline"

Create pipeline from a single config file (extract, load, transform, variables, settings as keys).

Parameters:

Name Type Description Default
path str | Path

Path to pipeline YAML.

required
variables dict[str, str] | None

Optional overlay; merged with file variables (caller wins).

None
validate bool

If True, validate config.

True
name str | None

Override name from file.

None
load_defaults dict[str, Any] | None

Merged under load config (load wins).

None
base_dir Path | None

Base for relative paths (default: path.parent).

None
Source code in src/pycharter/etl_generator/pipeline.py
@classmethod
def from_config_file(
    cls,
    path: str | Path,
    *,
    variables: dict[str, str] | None = None,
    validate: bool = True,
    name: str | None = None,
    load_defaults: dict[str, Any] | None = None,
    base_dir: Path | None = None,
) -> "Pipeline":
    """
    Create pipeline from a single config file (extract, load, transform, variables, settings as keys).

    The file is parsed twice: once as-is to read its ``variables`` section,
    then again after variable references in the raw text have been resolved.

    Args:
        path: Path to pipeline YAML.
        variables: Optional overlay; merged with file variables (caller wins).
        validate: If True, validate config.
        name: Override name from file.
        load_defaults: Merged under load config (load wins).
        base_dir: Base for relative paths (default: path.parent).

    Raises:
        FileNotFoundError: If ``path`` does not exist.
        ValueError: If ``path`` is not a file, the YAML top level is not a
            mapping, or the 'extract' / 'load' section is missing.
    """
    path = Path(path)
    if not path.exists():
        raise FileNotFoundError(f"Config file not found: {path}")
    if not path.is_file():
        raise ValueError(
            f"Not a file: {path}. Use from_config_dir() for directories."
        )
    # Read as UTF-8 explicitly instead of relying on the platform default.
    raw_content = path.read_text(encoding="utf-8")

    # First pass: unresolved parse, only to discover the file's variables.
    raw_config = yaml.safe_load(raw_content) or {}
    if not isinstance(raw_config, dict):
        # A list or scalar at top level has no .get(); fail with the same
        # exception type used for other malformed-config cases.
        raise ValueError(f"Config file must contain a YAML mapping: {path}")
    file_vars = {}
    if isinstance(raw_config.get("variables"), dict):
        file_vars = {k: str(v) for k, v in raw_config["variables"].items()}
    variables = {**file_vars, **(variables or {})}

    # Second pass: substitute variables in the raw text, then re-parse.
    context = PipelineContext(variables=variables)
    resolved_content = context.resolve(raw_content)
    config = yaml.safe_load(resolved_content) or {}
    if not isinstance(config, dict):
        raise ValueError(f"Config file must contain a YAML mapping: {path}")
    if "extract" not in config:
        raise ValueError(f"Config file missing 'extract' section: {path}")
    if "load" not in config:
        raise ValueError(f"Config file missing 'load' section: {path}")

    extract_config = config["extract"]
    load_config = config["load"]
    if load_defaults:
        # Defaults sit underneath: keys present in the file take priority.
        load_config = {**load_defaults, **load_config}

    # Lift validation / quality-check sections out of extract and load.
    extract_validation_config = None
    if isinstance(extract_config, dict):
        extract_validation_config = extract_config.pop("validation", None)

    load_validation_config = None
    quality_checks_config = None
    if isinstance(load_config, dict):
        load_validation_config = load_config.pop("validation", None)
        quality_checks_config = load_config.pop("quality_checks", None)

    # Shared settings are collected from these top-level keys.
    # NOTE(review): a top-level "settings" or "state_store" key is not
    # picked up here - confirm whether that is intended.
    settings = {
        key: config[key]
        for key in ("metadata_store", "dlq", "contract", "lineage")
        if config.get(key)
    }

    return cls._build_from_configs(
        extract_config=extract_config,
        transform_config=config.get("transform", {}),
        load_config=load_config,
        variables=variables,
        validate=validate,
        name=name if name is not None else config.get("name"),
        extract_validation_config=extract_validation_config,
        load_validation_config=load_validation_config,
        quality_checks_config=quality_checks_config,
        settings=settings,
        base_dir=(base_dir or path.parent).resolve(),
    )

Factory Methods

from_config_dir

Load a pipeline from a directory containing extract.yaml and load.yaml (both required), plus optional transform.yaml, settings.yaml, and variables.yaml:

pipeline = Pipeline.from_config_dir("pipelines/users/")

from_config_files

Load from explicit file paths:

pipeline = Pipeline.from_config_files(
    extract="configs/extract.yaml",
    transform="configs/transform.yaml",  # Optional
    load="configs/load.yaml",
    variables={"API_KEY": "secret"}
)

from_config_file

Load from a single combined config file:

pipeline = Pipeline.from_config_file("pipeline.yaml")

PipelineResult

PipelineResult dataclass

PipelineResult(
    success: bool = True,
    rows_extracted: int = 0,
    rows_transformed: int = 0,
    rows_loaded: int = 0,
    rows_failed: int = 0,
    rows_quarantined_extract: int = 0,
    rows_quarantined_load: int = 0,
    validation_errors_extract: list[str] = list(),
    validation_errors_load: list[str] = list(),
    start_time: datetime | None = None,
    end_time: datetime | None = None,
    duration_seconds: float | None = None,
    batches_processed: int = 0,
    batch_results: list[BatchResult] = list(),
    errors: list[str] = list(),
    pipeline_name: str | None = None,
    run_id: str | None = None,
    quality_report: Any = None,
    lineage_events: list[dict[str, Any]] = list(),
)

Complete result from running an ETL pipeline.

success class-attribute instance-attribute

success: bool = True

rows_extracted class-attribute instance-attribute

rows_extracted: int = 0

rows_transformed class-attribute instance-attribute

rows_transformed: int = 0

rows_loaded class-attribute instance-attribute

rows_loaded: int = 0

rows_failed class-attribute instance-attribute

rows_failed: int = 0

duration_seconds class-attribute instance-attribute

duration_seconds: float | None = None

errors class-attribute instance-attribute

errors: list[str] = field(default_factory=list)

Examples

Basic Pipeline

import asyncio
from pycharter import Pipeline, FileExtractor, FileLoader

pipeline = (
    Pipeline(FileExtractor(path="input.json"))
    | FileLoader(path="output.json")
)

result = asyncio.run(pipeline.run())
print(f"Loaded {result.rows_loaded} rows")

With Transformations

from pycharter import Pipeline, HTTPExtractor, PostgresLoader, Rename, Filter, AddField

pipeline = (
    Pipeline(HTTPExtractor(url="https://api.example.com/users"))
    | Rename({"userName": "user_name"})
    | Filter(lambda r: r.get("active"))
    | AddField("processed_at", "now()")
    | PostgresLoader(connection_string="...", table="users")
)

Error Handling

from pycharter.shared.errors import ErrorMode, ErrorContext

# Collect errors instead of raising
result = await pipeline.run(
    error_context=ErrorContext(mode=ErrorMode.COLLECT)
)

if result.errors:
    for error in result.errors:
        print(f"Error: {error}")

With Variables

import os

from pycharter import Pipeline

# Values passed here override entries from the directory's variables.yaml.
pipeline = Pipeline.from_config_dir(
    "pipelines/users/",
    variables={
        "API_KEY": os.environ["API_KEY"],
        "OUTPUT_PATH": "/data/output.json"
    }
)

Message Queue Acknowledgment

When a pipeline uses a messaging extractor (Kafka, RabbitMQ, SQS), the pipeline automatically acknowledges each batch after loading:

from pycharter.etl_generator.extractors import KafkaExtractor

pipeline = (
    Pipeline(KafkaExtractor(topics=["orders"], consumer_group="etl"))
    | Rename({"orderId": "order_id"})
    | PostgresLoader(connection_string="...", table="orders")
)

# Pipeline calls extractor.acknowledge() after each batch load
result = await pipeline.run()

# Don't forget to close the consumer
await pipeline.extractor.close()

The pipeline determines success from the LoadResult:

| Scenario | Acknowledge call |
|---|---|
| Load succeeds | acknowledge(batch_index, success=True) |
| Load fails | acknowledge(batch_index, success=False) |
| Dry run | acknowledge(batch_index, success=True) |
| No loader set | acknowledge(batch_index, success=True) |

If acknowledge() raises an exception, it is logged as a warning and the pipeline continues.

Incremental Extraction

Track a watermark field across runs so only new/updated records are extracted:

# extract.yaml
type: database
query: "SELECT * FROM events WHERE updated_at > '${watermark}'"
connection_string: "${DATABASE_URL}"
incremental:
  enabled: true
  watermark_field: updated_at
  initial_value: "2024-01-01"
# settings.yaml
state_store:
  backend: file      # or "sqlite"
  path: ./.state

The pipeline persists the highest watermark value on success and injects it into the next run.

Testing Utilities

PyCharter provides mock classes and assertion helpers for testing pipelines without real I/O:

from pycharter import MockExtractor, MockLoader, PipelineTestHarness

# Mock extractor yields fixture data
extractor = MockExtractor(data=[
    [{"id": 1, "name": "Alice"}, {"id": 2, "name": "Bob"}]
])

# Mock loader captures what was loaded
loader = MockLoader()

pipeline = Pipeline(extractor) | Rename({"name": "full_name"}) | loader
result = await pipeline.run()

# Assert on captured data
from pycharter import assert_record_count, assert_fields_present
assert_record_count(loader.loaded_data, 2)
assert_fields_present(loader.loaded_data, ["id", "full_name"])

See Also