
PyCharter Migration Guide

This guide helps you migrate from the legacy ETLOrchestrator API to the new Pipeline API.

Why Migrate?

The new API provides:

  • Pipeline with | operator for intuitive chaining
  • Chainable transformers that compose naturally
  • PipelineContext, which sets CONTRACT_DIR automatically
  • Fluent PipelineBuilder for step-by-step construction
  • Protocol-based design for extensibility
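As a rough illustration of how the | operator and a protocol-based design fit together, here is a toy sketch in plain Python. The names Chain, RenameKey and KeepIf are hypothetical and exist only in this example; this is not PyCharter's implementation, just one common way to get this style of chaining.

```python
from typing import Callable, Dict, Iterable, List, Protocol

Record = Dict[str, object]


class Transformer(Protocol):
    """Structural type: any object with a matching transform() qualifies."""

    def transform(self, records: Iterable[Record]) -> Iterable[Record]: ...


class Chain:
    """Toy chainable step: `a | b` returns a new Chain that runs a, then b."""

    def __init__(self, *steps: Transformer) -> None:
        self.steps: List[Transformer] = list(steps)

    def __or__(self, other: Transformer) -> "Chain":
        # Build a new chain instead of mutating, so partial chains stay reusable.
        return Chain(*self.steps, other)

    def transform(self, records: Iterable[Record]) -> List[Record]:
        for step in self.steps:
            records = step.transform(records)
        return list(records)


class RenameKey:
    """Hypothetical transformer: renames one key in every record."""

    def __init__(self, old: str, new: str) -> None:
        self.old, self.new = old, new

    def transform(self, records: Iterable[Record]) -> List[Record]:
        return [
            {(self.new if key == self.old else key): value for key, value in r.items()}
            for r in records
        ]


class KeepIf:
    """Hypothetical transformer: keeps records for which the predicate is truthy."""

    def __init__(self, predicate: Callable[[Record], object]) -> None:
        self.predicate = predicate

    def transform(self, records: Iterable[Record]) -> List[Record]:
        return [r for r in records if self.predicate(r)]


chain = Chain(RenameKey("user_name", "name")) | KeepIf(lambda r: r["active"])
rows = [
    {"user_name": "ada", "active": True},
    {"user_name": "bob", "active": False},
]
chained_output = chain.transform(rows)
```

RenameKey and KeepIf never inherit from Transformer; they satisfy it simply by having a transform method, which is what makes a protocol-based design easy to extend.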

Quick Comparison

Legacy API

from pycharter.etl_generator import ETLOrchestrator

orchestrator = ETLOrchestrator(
    contract_dir="data/contracts/users",
    config_context={"TARGET_DATABASE_URL": "postgresql://..."}
)
result = await orchestrator.run()

print(result.get("status"))
print(result.get("rows_loaded"))

New API

from pycharter import Pipeline
from pycharter import HTTPExtractor, PostgresLoader
from pycharter import Rename, Filter

# Build with | operator
pipeline = (
    Pipeline(HTTPExtractor(url="https://api.example.com/users"))
    | Rename({"user_name": "name"})
    | Filter(lambda r: r["active"])
    | PostgresLoader(connection_string="...", table="users")
)

result = await pipeline.run()

print(result.status)          # Enum, not string
print(result.rows_loaded)     # Direct attribute
print(result.duration_seconds)
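The snippets in this guide call await at the top level, which works in a notebook or an async REPL. In a regular script the call has to live inside a coroutine started with asyncio.run. A minimal sketch, using a hypothetical StubPipeline in place of a real pipeline so that it runs without PyCharter installed:

```python
import asyncio


class StubPipeline:
    """Hypothetical stand-in for a pipeline; only an async run() is modelled."""

    async def run(self) -> dict:
        await asyncio.sleep(0)  # placeholder for real extract/transform/load work
        return {"rows_loaded": 3}


async def main() -> dict:
    pipeline = StubPipeline()
    # `await` is only valid inside a coroutine such as this one.
    return await pipeline.run()


# asyncio.run() creates the event loop, runs main() to completion, and closes the loop.
result = asyncio.run(main())
```

With a real pipeline, only the body of main changes; the asyncio.run call stays the same.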

Migration Steps

1. Replace ETLOrchestrator with Pipeline

Before:

orchestrator = ETLOrchestrator(contract_dir="data/contracts/users")
result = await orchestrator.run()

After (from config):

pipeline = Pipeline.from_config("data/contracts/users")
result = await pipeline.run()

After (programmatic):

pipeline = (
    Pipeline(FileExtractor(path="data.json"))
    | Rename(...)
    | PostgresLoader(...)
)
result = await pipeline.run()

2. Use Chainable Transformers

Before (transform.yaml):

rename:
  old_name: new_name
  old_email: email

add:
  processed: true

filter:
  field: active
  operator: eq
  value: true

After (Python):

from pycharter import Rename, AddField, Filter

transforms = (
    Rename({"old_name": "new_name", "old_email": "email"})
    | AddField("processed", True)
    | Filter(lambda r: r["active"])
)
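If many legacy filter blocks need converting, the field/operator/value form can be turned into a predicate mechanically. The helper below is a hypothetical sketch in plain Python, not part of PyCharter; only the eq operator appears in this guide, so the other operator names are assumptions.

```python
import operator
from typing import Any, Callable, Dict

# Only "eq" appears in this guide; the remaining names are assumed for illustration.
_OPERATORS: Dict[str, Callable[[Any, Any], bool]] = {
    "eq": operator.eq,
    "ne": operator.ne,
    "gt": operator.gt,
    "lt": operator.lt,
}


def predicate_from_config(cfg: Dict[str, Any]) -> Callable[[Dict[str, Any]], bool]:
    """Build a record predicate from a legacy-style filter block."""
    field = cfg["field"]
    compare = _OPERATORS[cfg["operator"]]
    expected = cfg["value"]

    def predicate(record: Dict[str, Any]) -> bool:
        # Records missing the field are rejected rather than raising KeyError.
        return field in record and compare(record[field], expected)

    return predicate


is_active = predicate_from_config({"field": "active", "operator": "eq", "value": True})
sample = [{"id": 1, "active": True}, {"id": 2, "active": False}, {"id": 3}]
kept_ids = [r["id"] for r in sample if is_active(r)]
```

The resulting callable takes a single record, so it has the same shape as the lambda passed to Filter above. Note one difference: lambda r: r["active"] keeps any truthy value, while an eq comparison against True is stricter.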

3. Use PipelineContext

Before:

orchestrator = ETLOrchestrator(
    contract_dir="data/contracts",
    config_context={
        "TARGET_DATABASE_URL": "postgresql://...",
        "CONTRACT_DIR": "/path/to/contract",
    }
)

After:

from pycharter import PipelineContext

context = PipelineContext(
    contract_dir="/path/to/contract",  # Auto-sets CONTRACT_DIR
    TARGET_DATABASE_URL="postgresql://...",
)

# Variables resolve automatically
resolved = context.resolve("${CONTRACT_DIR}/data.json")
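The ${NAME} placeholder syntax is the same one Python's string.Template uses. The sketch below uses that standard-library class only to illustrate the substitution. It is not PyCharter's implementation, and how context.resolve treats unknown variables is not stated in this guide.

```python
from string import Template

# Values mirror the PipelineContext example above.
variables = {
    "CONTRACT_DIR": "/path/to/contract",
    "TARGET_DATABASE_URL": "postgresql://...",
}

# substitute() replaces each ${NAME} and raises KeyError if a name is missing.
resolved_path = Template("${CONTRACT_DIR}/data.json").substitute(variables)

# safe_substitute() leaves unknown placeholders untouched instead of raising.
unresolved = Template("${MISSING}/data.json").safe_substitute(variables)
```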

4. Use PipelineBuilder (Fluent API)

from pycharter import PipelineBuilder, Rename, Filter

# `context` is the PipelineContext created in step 3

pipeline = (
    PipelineBuilder()
    .name("user_pipeline")
    .with_context(context)
    .extract_from("http", url="https://api.example.com/users")
    .transform(Rename({"old": "new"}))
    .transform(Filter(lambda r: r["active"]))
    .load_to("postgres", table="users", connection_string="...")
    .build()
)

result = await pipeline.run()

Transformer Mapping

YAML Config                 New Transformer
rename: {old: new}          Rename({"old": "new"})
add: {field: value}         AddField("field", value)
drop: [field1, field2]      Drop(["field1", "field2"])
select: [field1, field2]    Select(["field1", "field2"])
convert: {field: type}      Convert({"field": int})
defaults: {field: value}    Default({"field": value})
filter: ...                 Filter(lambda r: ...)
custom_function: ...        CustomFunction(module="...", function="...")
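To make the mapping concrete, the sketch below applies the equivalent of each operation to a single record using plain dictionaries. The semantics are inferred from the transformer names and are assumptions, not confirmed PyCharter behavior; in particular, whether Default also replaces None values is not stated here.

```python
# Sample record; field names are made up for this illustration.
record = {"old": "Ada", "age": "36", "tmp": 1}

# rename: {old: new}  -> the key changes, the value is kept
renamed = {("new" if key == "old" else key): value for key, value in record.items()}

# add: {processed: true}  -> a constant field is attached
added = {**renamed, "processed": True}

# drop: [tmp]  -> listed fields are removed
dropped = {key: value for key, value in added.items() if key not in ["tmp"]}

# convert: {age: int}  -> the value is cast to the target type
converted = {**dropped, "age": int(dropped["age"])}

# defaults: {country: unknown}  -> filled only when the field is absent (assumed)
defaulted = {**{"country": "unknown"}, **converted}

# select: [new, age, country]  -> only the listed fields survive
selected = {key: defaulted[key] for key in ["new", "age", "country"]}
```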

Extractor Mapping

Extract Type    New Extractor
HTTP API        HTTPExtractor(url="...", headers={})
File            FileExtractor(path="...")
Database        DatabaseExtractor(connection_string="...", query="...")
S3/Cloud        CloudStorageExtractor(bucket="...", key="...")

Loader Mapping

Load Type     New Loader
PostgreSQL    PostgresLoader(connection_string="...", table="...")
File          FileLoader(path="...")
S3/Cloud      CloudStorageLoader(bucket="...", key="...")

Backward Compatibility

The legacy API is fully preserved:

# Still works
from pycharter.etl_generator import ETLOrchestrator

orchestrator = ETLOrchestrator(contract_dir="...")
result = await orchestrator.run()

You can:

  • Continue using YAML configuration files
  • Mix old and new APIs
  • Migrate gradually

Result Object Changes

Before:

result = await orchestrator.run()
status = result.get("status")
rows = result.get("rows_loaded")

After:

result = await pipeline.run()
status = result.status           # Direct attribute
is_ok = result.success           # Boolean
rows = result.rows_loaded        # Direct attribute
duration = result.duration_seconds   # Timing info (avoids shadowing the time module)
errors = result.errors           # List of errors

# Export
d = result.to_dict()
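During a gradual migration, some call sites may receive the legacy dict while others receive the new result object. A small helper can read a field from either shape. This is a hypothetical sketch: read_field and StubResult are illustration-only names, and StubResult models just two of the attributes listed above.

```python
from dataclasses import dataclass, field
from typing import Any, List, Mapping


@dataclass
class StubResult:
    """Hypothetical stand-in for the new result object (attributes only)."""

    rows_loaded: int = 0
    errors: List[str] = field(default_factory=list)


def read_field(result: Any, name: str, default: Any = None) -> Any:
    """Read `name` from a legacy dict result or a new attribute-style result."""
    if isinstance(result, Mapping):
        return result.get(name, default)
    return getattr(result, name, default)


legacy_rows = read_field({"rows_loaded": 10}, "rows_loaded")
new_rows = read_field(StubResult(rows_loaded=12), "rows_loaded")
missing = read_field({}, "errors", [])
```

Once every call site uses the new API, the helper can be dropped in favor of direct attribute access.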

Questions?

If you have questions about migration, please open an issue on GitHub.