PyCharter Migration Guide
This guide helps you migrate from the legacy ETLOrchestrator API to the new Pipeline API.
Why Migrate?
The new API provides:
- `Pipeline` with the `|` operator for intuitive chaining
- Chainable transformers that compose naturally
- `PipelineContext` with automatic `CONTRACT_DIR` injection
- Fluent `PipelineBuilder` for step-by-step construction
- Protocol-based design for extensibility
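The `|` chaining listed above relies on Python operator overloading (`__or__`). The following is a minimal sketch of the idea, not PyCharter's implementation: the hypothetical `Step` class overloads `__or__` so that `a | b` returns a new step that runs `a` and then feeds its output into `b`. The `rename` and `keep` helpers are also hypothetical.

```python
from typing import Callable, Dict, List

Record = Dict[str, object]


class Step:
    """Hypothetical transformer: wraps a function from records to records."""

    def __init__(self, fn: Callable[[List[Record]], List[Record]]) -> None:
        self.fn = fn

    def __call__(self, records: List[Record]) -> List[Record]:
        return self.fn(records)

    def __or__(self, other: "Step") -> "Step":
        # a | b builds a new Step that passes a's output to b.
        return Step(lambda records: other(self(records)))


def rename(mapping: Dict[str, str]) -> Step:
    # Rename keys found in mapping; leave all other keys untouched.
    return Step(lambda rs: [{mapping.get(k, k): v for k, v in r.items()} for r in rs])


def keep(predicate: Callable[[Record], bool]) -> Step:
    # Keep only records for which predicate returns a truthy value.
    return Step(lambda rs: [r for r in rs if predicate(r)])


chain = rename({"user_name": "name"}) | keep(lambda r: r["active"])
print(chain([{"user_name": "ada", "active": True}, {"user_name": "bob", "active": False}]))
# [{'name': 'ada', 'active': True}]
```

Because each `|` returns another `Step`, chains of any length compose left to right without a separate container class.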
Quick Comparison
Legacy API
```python
from pycharter.etl_generator import ETLOrchestrator

orchestrator = ETLOrchestrator(
    contract_dir="data/contracts/users",
    config_context={"TARGET_DATABASE_URL": "postgresql://..."},
)
result = await orchestrator.run()
print(result.get("status"))
print(result.get("rows_loaded"))
```
New API
```python
from pycharter import Pipeline, PipelineContext
from pycharter import HTTPExtractor, PostgresLoader
from pycharter import Rename, Filter

# Build with the | operator
pipeline = (
    Pipeline(HTTPExtractor(url="https://api.example.com/users"))
    | Rename({"user_name": "name"})
    | Filter(lambda r: r["active"])
    | PostgresLoader(connection_string="...", table="users")
)
result = await pipeline.run()
print(result.status)  # Enum, not string
print(result.rows_loaded)  # Direct attribute
print(result.duration_seconds)
```
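The snippets in this guide call `await` at the top level, which works in a Jupyter notebook or the `python -m asyncio` REPL. In a plain script, wrap the call in a coroutine and start it with `asyncio.run`. In this sketch, `StubPipeline` is a hypothetical stand-in for a real `Pipeline`, used only so the example runs without PyCharter installed.

```python
import asyncio
from types import SimpleNamespace


class StubPipeline:
    """Hypothetical stand-in exposing the same async run() shape as Pipeline."""

    async def run(self) -> SimpleNamespace:
        await asyncio.sleep(0)  # pretend to do some I/O
        return SimpleNamespace(rows_loaded=2, duration_seconds=0.0)


async def main() -> SimpleNamespace:
    pipeline = StubPipeline()  # in real code: Pipeline(...) | ... | ...
    result = await pipeline.run()
    print(result.rows_loaded)
    return result


if __name__ == "__main__":
    asyncio.run(main())
```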
Migration Steps
1. Replace ETLOrchestrator with Pipeline
Before:
```python
orchestrator = ETLOrchestrator(contract_dir="data/contracts/users")
result = await orchestrator.run()
```
After (from config):
After (programmatic):
```python
from pycharter import Pipeline, FileExtractor, PostgresLoader, Rename

pipeline = (
    Pipeline(FileExtractor(path="data.json"))
    | Rename(...)
    | PostgresLoader(...)
)
result = await pipeline.run()
```
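Conceptually, the programmatic pipeline above reads records, transforms them, and writes them to a table. The sketch below spells out that flow with the standard library only: a JSON file stands in for `FileExtractor`, a list comprehension that renames `user_name` to `name` stands in for `Rename`, and an in-memory SQLite database stands in for `PostgresLoader`. The file contents, the column name, and the `users` table are assumptions for illustration; this is not how PyCharter implements those classes.

```python
import json
import sqlite3
import tempfile
from pathlib import Path

# Extract: write and read a small JSON file (stand-in for FileExtractor).
tmp_dir = Path(tempfile.mkdtemp())
data_path = tmp_dir / "data.json"
data_path.write_text(json.dumps([{"user_name": "ada"}, {"user_name": "grace"}]))
records = json.loads(data_path.read_text())

# Transform: rename user_name -> name (stand-in for Rename).
records = [{"name": r["user_name"]} for r in records]

# Load: insert into an in-memory SQLite table (stand-in for PostgresLoader).
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE users (name TEXT)")
conn.executemany("INSERT INTO users (name) VALUES (:name)", records)
conn.commit()
rows_loaded = conn.execute("SELECT COUNT(*) FROM users").fetchone()[0]
print(rows_loaded)  # 2
```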
2. Use Chainable Transformers
Before (transform.yaml):
```yaml
rename:
  old_name: new_name
  old_email: email
add:
  processed: true
filter:
  field: active
  operator: eq
  value: true
```
After (Python):
```python
from pycharter import Rename, AddField, Filter

transforms = (
    Rename({"old_name": "new_name", "old_email": "email"})
    | AddField("processed", True)
    | Filter(lambda r: r["active"])
)
```
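Both forms above describe the same three operations. The sketch below applies them by hand to two sample records so the expected output is explicit. It uses plain dictionary code rather than PyCharter's transformers, and it assumes the operations run in the order written (rename, then add, then filter); the sample records are invented for illustration.

```python
from typing import Any, Dict, List

Record = Dict[str, Any]

records: List[Record] = [
    {"old_name": "Ada", "old_email": "ada@example.com", "active": True},
    {"old_name": "Bob", "old_email": "bob@example.com", "active": False},
]

# rename: old_name -> new_name, old_email -> email
mapping = {"old_name": "new_name", "old_email": "email"}
records = [{mapping.get(k, k): v for k, v in r.items()} for r in records]

# add: processed = true
records = [{**r, "processed": True} for r in records]

# filter: keep active records (same test as Filter(lambda r: r["active"]))
records = [r for r in records if r["active"]]

print(records)
# [{'new_name': 'Ada', 'email': 'ada@example.com', 'active': True, 'processed': True}]
```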
3. Use PipelineContext
Before:
```python
orchestrator = ETLOrchestrator(
    contract_dir="data/contracts",
    config_context={
        "TARGET_DATABASE_URL": "postgresql://...",
        "CONTRACT_DIR": "/path/to/contract",
    },
)
```
After:
```python
from pycharter import PipelineContext

context = PipelineContext(
    contract_dir="/path/to/contract",  # Auto-sets CONTRACT_DIR
    TARGET_DATABASE_URL="postgresql://...",
)
# Variables resolve automatically
resolved = context.resolve("${CONTRACT_DIR}/data.json")
```
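The `${CONTRACT_DIR}` syntax matches the placeholders of Python's `string.Template`, so a rough model of `context.resolve` can be built on it. The sketch below is an assumption about the behavior, not PyCharter's code: the hypothetical `SketchContext` stores keyword arguments as variables, derives `CONTRACT_DIR` from `contract_dir`, and raises `KeyError` for unknown names (how PyCharter treats unknown names is not stated in this guide).

```python
from string import Template
from typing import Dict, Optional


class SketchContext:
    """Hypothetical stand-in for PipelineContext's variable handling."""

    def __init__(self, contract_dir: Optional[str] = None, **variables: str) -> None:
        self.variables: Dict[str, str] = dict(variables)
        if contract_dir is not None:
            # Mirror the "auto-sets CONTRACT_DIR" behavior described above.
            self.variables["CONTRACT_DIR"] = contract_dir

    def resolve(self, text: str) -> str:
        # substitute() replaces $NAME and ${NAME}; unknown names raise KeyError.
        return Template(text).substitute(self.variables)


context = SketchContext(
    contract_dir="/path/to/contract",
    TARGET_DATABASE_URL="postgresql://...",
)
print(context.resolve("${CONTRACT_DIR}/data.json"))  # /path/to/contract/data.json
```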
4. Use PipelineBuilder (Fluent API)
```python
from pycharter import PipelineBuilder, Rename, Filter

# `context` is the PipelineContext created in step 3
pipeline = (
    PipelineBuilder()
    .name("user_pipeline")
    .with_context(context)
    .extract_from("http", url="https://api.example.com/users")
    .transform(Rename({"old": "new"}))
    .transform(Filter(lambda r: r["active"]))
    .load_to("postgres", table="users", connection_string="...")
    .build()
)
result = await pipeline.run()
```
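A fluent builder works because each configuration method records its argument and returns `self`, and `build()` assembles the final object once every step is known. The minimal sketch below shows that pattern with a hypothetical `SketchBuilder` that only collects step descriptions. The method names mirror the example above, but the internals, including the extractor check in `build()`, are illustrative assumptions.

```python
from dataclasses import dataclass, field
from typing import Any, List, Optional


@dataclass
class SketchPipeline:
    name: Optional[str]
    steps: List[str] = field(default_factory=list)


class SketchBuilder:
    """Hypothetical fluent builder: every setter returns self."""

    def __init__(self) -> None:
        self._name: Optional[str] = None
        self._steps: List[str] = []

    def name(self, value: str) -> "SketchBuilder":
        self._name = value
        return self

    def extract_from(self, kind: str, **options: Any) -> "SketchBuilder":
        self._steps.append(f"extract:{kind}")
        return self

    def transform(self, step: str) -> "SketchBuilder":
        self._steps.append(f"transform:{step}")
        return self

    def load_to(self, kind: str, **options: Any) -> "SketchBuilder":
        self._steps.append(f"load:{kind}")
        return self

    def build(self) -> SketchPipeline:
        # Validation can happen here because all steps are known by now.
        if not any(s.startswith("extract:") for s in self._steps):
            raise ValueError("a pipeline needs an extractor")
        return SketchPipeline(self._name, list(self._steps))


pipeline = (
    SketchBuilder()
    .name("user_pipeline")
    .extract_from("http", url="https://api.example.com/users")
    .transform("rename")
    .load_to("postgres", table="users")
    .build()
)
print(pipeline.steps)  # ['extract:http', 'transform:rename', 'load:postgres']
```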
Transformer Mapping
| YAML Config | New Transformer |
|---|---|
| `rename: {old: new}` | `Rename({"old": "new"})` |
| `add: {field: value}` | `AddField("field", value)` |
| `drop: [field1, field2]` | `Drop(["field1", "field2"])` |
| `select: [field1, field2]` | `Select(["field1", "field2"])` |
| `convert: {field: type}` | `Convert({"field": int})` |
| `defaults: {field: value}` | `Default({"field": value})` |
| `filter: ...` | `Filter(lambda r: ...)` |
| `custom_function: ...` | `CustomFunction(module="...", function="...")` |
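To make the mapping table concrete, the sketch below applies `drop`, `select`, `convert`, and `defaults` to one record with plain dictionary code. The exact semantics are assumptions, not confirmed PyCharter behavior: `Default` is taken to fill only missing or `None` fields, and `Convert` to call the given type on the existing value. The helper functions and sample record are hypothetical.

```python
from typing import Any, Callable, Dict, List

Record = Dict[str, Any]


def drop(record: Record, fields: List[str]) -> Record:
    # Remove the listed fields, keep everything else.
    return {k: v for k, v in record.items() if k not in fields}


def select(record: Record, fields: List[str]) -> Record:
    # Keep only the listed fields that are present, in the listed order.
    return {k: record[k] for k in fields if k in record}


def convert(record: Record, types: Dict[str, Callable[[Any], Any]]) -> Record:
    # Apply the conversion callable (e.g. int) to matching fields.
    return {k: types[k](v) if k in types else v for k, v in record.items()}


def default(record: Record, defaults: Record) -> Record:
    # Fill fields that are missing or None.
    filled = dict(record)
    for k, v in defaults.items():
        if filled.get(k) is None:
            filled[k] = v
    return filled


row = {"id": "42", "name": "Ada", "debug": "x", "country": None}
row = drop(row, ["debug"])
row = convert(row, {"id": int})
row = default(row, {"country": "unknown"})
row = select(row, ["id", "country"])
print(row)  # {'id': 42, 'country': 'unknown'}
```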
Extractor Mapping
| Extract Type | New Extractor |
|---|---|
| HTTP API | HTTPExtractor(url="...", headers={}) |
| File | FileExtractor(path="...") |
| Database | DatabaseExtractor(connection_string="...", query="...") |
| S3/Cloud | CloudStorageExtractor(bucket="...", key="...") |
Loader Mapping
| Load Type | New Loader |
|---|---|
| PostgreSQL | PostgresLoader(connection_string="...", table="...") |
| File | FileLoader(path="...") |
| S3/Cloud | CloudStorageLoader(bucket="...", key="...") |
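Because the new API is described as protocol-based, a custom source or sink presumably needs only the right method shape rather than a specific base class. The sketch below illustrates that idea with `typing.Protocol`. The protocol names (`ExtractorLike`, `LoaderLike`) and the `extract`/`load` signatures are hypothetical, not PyCharter's actual protocols, so check the library's own definitions before writing a real extractor or loader.

```python
import asyncio
from typing import Any, Dict, List, Protocol, runtime_checkable

Record = Dict[str, Any]


@runtime_checkable
class ExtractorLike(Protocol):
    async def extract(self) -> List[Record]: ...


@runtime_checkable
class LoaderLike(Protocol):
    async def load(self, records: List[Record]) -> int: ...


class InMemoryExtractor:
    """Satisfies ExtractorLike structurally; no inheritance needed."""

    def __init__(self, records: List[Record]) -> None:
        self.records = records

    async def extract(self) -> List[Record]:
        return list(self.records)


class ListLoader:
    """Satisfies LoaderLike by appending records to a Python list."""

    def __init__(self) -> None:
        self.rows: List[Record] = []

    async def load(self, records: List[Record]) -> int:
        self.rows.extend(records)
        return len(records)


async def copy(source: ExtractorLike, sink: LoaderLike) -> int:
    # Any objects with matching methods work here.
    return await sink.load(await source.extract())


extractor = InMemoryExtractor([{"id": 1}, {"id": 2}])
loader = ListLoader()
print(isinstance(extractor, ExtractorLike), isinstance(loader, LoaderLike))  # True True
print(asyncio.run(copy(extractor, loader)))  # 2
```

Note that `runtime_checkable` only checks that the methods exist, not their signatures; static type checkers such as mypy perform the fuller check.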
Backward Compatibility
The legacy API is fully preserved:
```python
# Still works
from pycharter.etl_generator import ETLOrchestrator

orchestrator = ETLOrchestrator(contract_dir="...")
result = await orchestrator.run()
```
You can:
- Continue using YAML configuration files
- Mix old and new APIs
- Migrate gradually
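While both APIs coexist, code that consumes results may receive either a legacy dict-style result (read with `.get("rows_loaded")`, as in the Quick Comparison) or a new result object with attributes. A small adapter can smooth that over during a gradual migration. The `rows_loaded_of` helper below is hypothetical and assumes only those two access patterns, with the legacy result behaving like a mapping.

```python
from collections.abc import Mapping
from types import SimpleNamespace
from typing import Any


def rows_loaded_of(result: Any) -> Any:
    """Read rows_loaded from a legacy mapping or a new-style result object."""
    if isinstance(result, Mapping):
        return result.get("rows_loaded")  # legacy ETLOrchestrator style
    return getattr(result, "rows_loaded", None)  # new Pipeline style


legacy_result = {"rows_loaded": 10}
new_result = SimpleNamespace(rows_loaded=10)
print(rows_loaded_of(legacy_result), rows_loaded_of(new_result))  # 10 10
```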
Result Object Changes
Before:
After:
```python
result = await pipeline.run()
status = result.status  # Direct attribute
is_ok = result.success  # Boolean
rows = result.rows_loaded  # Direct attribute
duration = result.duration_seconds  # Timing info
errors = result.errors  # List of errors

# Export
d = result.to_dict()
```
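The attributes listed above fit naturally into a dataclass. The sketch below models them to show how `status`, `success`, and `to_dict()` could relate. The `SketchStatus` members, the rule that `success` means status `SUCCESS` with no errors, and the shape of the exported dict are assumptions, not PyCharter's definitions.

```python
from dataclasses import asdict, dataclass, field
from enum import Enum
from typing import Any, Dict, List


class SketchStatus(Enum):
    SUCCESS = "success"
    FAILED = "failed"


@dataclass
class SketchResult:
    status: SketchStatus
    rows_loaded: int = 0
    duration_seconds: float = 0.0
    errors: List[str] = field(default_factory=list)

    @property
    def success(self) -> bool:
        # Assumed rule: successful status and no recorded errors.
        return self.status is SketchStatus.SUCCESS and not self.errors

    def to_dict(self) -> Dict[str, Any]:
        data = asdict(self)
        data["status"] = self.status.value  # make the Enum JSON-friendly
        data["success"] = self.success
        return data


result = SketchResult(SketchStatus.SUCCESS, rows_loaded=3, duration_seconds=1.5)
print(result.success)  # True
print(result.to_dict())
```

Keeping `status` as an Enum makes typos in comparisons fail loudly, while `to_dict()` converts it to a plain string for logging or JSON.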
Questions?
If you have questions about migration, please open an issue on GitHub.