Building ETL Pipelines¶
Learn to build production-ready data pipelines with PyCharter's ETL system.
What You'll Learn¶
- Create pipelines with the | operator
- Use built-in extractors (HTTP, file, database, cloud)
- Apply transformations (rename, filter, add fields)
- Load data to various destinations
- Handle errors and retries
- Use config-driven pipelines
Prerequisites¶
Part 1: Your First Pipeline¶
The Pipe Operator¶
PyCharter uses the | (pipe) operator to chain pipeline stages:
import asyncio
from pycharter import Pipeline, HTTPExtractor, FileLoader, Rename
# Build a simple pipeline
pipeline = (
    Pipeline(HTTPExtractor(url="https://jsonplaceholder.typicode.com/users"))
    | Rename({"username": "user_name"})
    | FileLoader(path="output/users.json", file_format="json")
)
# Run it
result = asyncio.run(pipeline.run())
print(f"Loaded {result.rows_loaded} rows in {result.duration_seconds:.2f}s")
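For readers curious how a pipe-style API can be built in Python, the sketch below shows the general technique: a class defines `__or__`, so `pipeline | stage` appends a stage and returns the pipeline for further chaining. `MiniPipeline` and `rename` are hypothetical stand-ins written for illustration, not PyCharter's implementation.

```python
from typing import Callable, Dict, Iterable, List

Record = Dict[str, object]
Stage = Callable[[List[Record]], List[Record]]


class MiniPipeline:
    """Hypothetical stand-in that shows how `|` chaining can work."""

    def __init__(self, source: Iterable[Record]):
        self.source = list(source)
        self.stages: List[Stage] = []

    def __or__(self, stage: Stage) -> "MiniPipeline":
        # `pipeline | stage` calls this method; returning self allows further chaining.
        self.stages.append(stage)
        return self

    def run(self) -> List[Record]:
        # Feed the output of each stage into the next one, in order.
        records = self.source
        for stage in self.stages:
            records = stage(records)
        return records


def rename(mapping: Dict[str, str]) -> Stage:
    """Build a stage that renames keys according to `mapping`."""
    def stage(records: List[Record]) -> List[Record]:
        return [{mapping.get(k, k): v for k, v in r.items()} for r in records]
    return stage


users = [{"id": 1, "username": "bret"}]
renamed = (MiniPipeline(users) | rename({"username": "user_name"})).run()
print(renamed)
```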
Understanding the Result¶
The PipelineResult contains execution metrics:
result = asyncio.run(pipeline.run())
print(f"Rows extracted: {result.rows_extracted}")
print(f"Rows transformed: {result.rows_transformed}")
print(f"Rows loaded: {result.rows_loaded}")
print(f"Rows failed: {result.rows_failed}")
print(f"Duration: {result.duration_seconds:.2f}s")
print(f"Success: {result.success}")
if result.errors:
    for error in result.errors:
        print(f"Error: {error}")
Part 2: Extractors¶
HTTP Extractor¶
Extract data from REST APIs:
from pycharter import HTTPExtractor
# Simple GET request
extractor = HTTPExtractor(
    url="https://api.example.com/users",
    method="GET",
    headers={"Authorization": "Bearer ${API_KEY}"}
)
# With pagination
extractor = HTTPExtractor(
    url="https://api.example.com/users",
    pagination={
        "type": "offset",
        "param": "offset",
        "limit": 100
    }
)
# POST with body
extractor = HTTPExtractor(
    url="https://api.example.com/search",
    method="POST",
    body={"query": "active users"}
)
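Offset pagination generally means requesting pages of `limit` records and advancing an offset until a short or empty page arrives. The sketch below illustrates that loop with a fake in-memory API; `paginate_offset` and `fake_fetch` are hypothetical names, and how PyCharter detects the last page is not specified here.

```python
from typing import Callable, Dict, Iterator, List


def paginate_offset(
    fetch_page: Callable[[int, int], List[Dict]], limit: int = 100
) -> Iterator[Dict]:
    """Yield records page by page until a short or empty page signals the end."""
    offset = 0
    while True:
        page = fetch_page(offset, limit)
        yield from page
        if len(page) < limit:
            break
        offset += limit


# Fake API backed by a list, standing in for a real HTTP call.
DATA = [{"id": i} for i in range(250)]


def fake_fetch(offset: int, limit: int) -> List[Dict]:
    return DATA[offset:offset + limit]


records = list(paginate_offset(fake_fetch, limit=100))
print(len(records))
```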
File Extractor¶
Extract from local files:
from pycharter import FileExtractor
# JSON file
extractor = FileExtractor(path="data/users.json", file_format="json")
# CSV file
extractor = FileExtractor(path="data/users.csv", file_format="csv")
# Parquet file
extractor = FileExtractor(path="data/users.parquet", file_format="parquet")
# Glob pattern (multiple files)
extractor = FileExtractor(path="data/*.json", file_format="json")
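As a rough picture of what reading a glob pattern involves, the standard-library sketch below expands a pattern, reads each matching JSON file, and concatenates the records. `read_json_files` is a hypothetical helper; the ordering and merge behavior of `FileExtractor` itself is an assumption not confirmed here.

```python
import glob
import json
import os
import tempfile


def read_json_files(pattern: str) -> list:
    """Read every JSON file matching `pattern` and merge the records into one list."""
    records = []
    for path in sorted(glob.glob(pattern)):  # sorted for a deterministic order
        with open(path, encoding="utf-8") as fh:
            data = json.load(fh)
        # A file may hold either a list of records or a single record.
        records.extend(data if isinstance(data, list) else [data])
    return records


with tempfile.TemporaryDirectory() as tmp:
    for name, rows in [("a.json", [{"id": 1}]), ("b.json", [{"id": 2}, {"id": 3}])]:
        with open(os.path.join(tmp, name), "w", encoding="utf-8") as fh:
            json.dump(rows, fh)
    combined = read_json_files(os.path.join(tmp, "*.json"))

print(combined)
```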
Database Extractor¶
Extract from SQL databases:
from pycharter import DatabaseExtractor
extractor = DatabaseExtractor(
    connection_string="postgresql://user:pass@localhost/db",
    query="SELECT * FROM users WHERE active = true"
)
# With parameters
extractor = DatabaseExtractor(
    connection_string="postgresql://user:pass@localhost/db",
    query="SELECT * FROM users WHERE created_at > %(start_date)s",
    params={"start_date": "2024-01-01"}
)
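Passing values through `params` rather than formatting them into the SQL string lets the database driver handle quoting. The sketch below shows the same idea with Python's built-in `sqlite3` module so it runs without a server. Note that `sqlite3` uses `:name` placeholders, whereas `%(name)s` in the example above is the DB-API "pyformat" style used by drivers such as psycopg2. The table and rows are made up for illustration.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT, created_at TEXT)")
conn.executemany(
    "INSERT INTO users (id, name, created_at) VALUES (?, ?, ?)",
    [(1, "Ann", "2023-12-15"), (2, "Bob", "2024-02-01")],
)

# The value is bound by the driver, never interpolated into the SQL text.
rows = conn.execute(
    "SELECT id, name FROM users WHERE created_at > :start_date",
    {"start_date": "2024-01-01"},
).fetchall()
conn.close()

print(rows)
```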
Cloud Storage Extractor¶
Extract from S3, GCS, or Azure:
from pycharter import CloudStorageExtractor
# AWS S3
extractor = CloudStorageExtractor(
    provider="s3",
    bucket="my-bucket",
    path="data/users.json",
    credentials={
        "aws_access_key_id": "${AWS_KEY}",
        "aws_secret_access_key": "${AWS_SECRET}"
    }
)
# Google Cloud Storage
extractor = CloudStorageExtractor(
    provider="gcs",
    bucket="my-bucket",
    path="data/users.json"
)
# Azure Blob Storage
extractor = CloudStorageExtractor(
    provider="azure",
    container="my-container",
    path="data/users.json"
)
Part 3: Transformers¶
Chain multiple transformers to process your data:
Rename Fields¶
from pycharter import Rename
# Rename single field
transform = Rename({"old_name": "new_name"})
# Rename multiple fields
transform = Rename({
    "userName": "user_name",
    "userEmail": "email",
    "createdAt": "created_at"
})
Filter Records¶
from pycharter import Filter
# Lambda filter
transform = Filter(lambda r: r.get("active", False))
# Multiple conditions
transform = Filter(lambda r: r["age"] >= 18 and r["status"] == "active")
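The multi-condition lambda above indexes `r["age"]` directly, which raises `KeyError` when a record lacks that field. A more defensive predicate might look like the sketch below. `is_active_adult` is a hypothetical helper; passing it as `Filter(is_active_adult)` assumes `Filter` accepts any callable, as the lambda examples suggest.

```python
def is_active_adult(record: dict) -> bool:
    """Return True only for records with a numeric age of 18+ and status 'active'."""
    age = record.get("age")
    if not isinstance(age, (int, float)):
        return False  # missing or non-numeric age: reject instead of raising
    return age >= 18 and record.get("status") == "active"


people = [
    {"age": 30, "status": "active"},
    {"age": 16, "status": "active"},
    {"status": "active"},               # no age field at all
    {"age": 45, "status": "inactive"},
]
adults = [p for p in people if is_active_adult(p)]
print(adults)
```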
Add Fields¶
from pycharter import AddField
# Static value
transform = AddField("source", "api")
# Dynamic expression
transform = AddField("processed_at", "now()") # Current timestamp
transform = AddField("full_name", "concat(first_name, ' ', last_name)")
# From other fields
transform = AddField("is_adult", "age >= 18")
Drop Fields¶
from pycharter import Drop
# Drop single field
transform = Drop(["password"])
# Drop multiple fields
transform = Drop(["password", "ssn", "internal_id"])
Select Fields¶
Convert Types¶
from pycharter import Convert
# Convert field types
transform = Convert({
    "age": "int",
    "price": "float",
    "active": "bool",
    "created_at": "datetime"
})
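Type conversion from strings has one common trap in Python: `bool("false")` is `True`, because any non-empty string is truthy. The sketch below shows one way a name-to-converter mapping could handle this; `CONVERTERS`, `to_bool`, and `convert` are hypothetical, and the exact parsing rules `Convert` applies are not documented here.

```python
from datetime import datetime


def to_bool(value) -> bool:
    """Parse common textual booleans; fall back to Python truthiness otherwise."""
    if isinstance(value, str):
        return value.strip().lower() in {"true", "1", "yes"}
    return bool(value)


# Map type names (as used in the config above) to converter callables.
CONVERTERS = {
    "int": int,
    "float": float,
    "bool": to_bool,
    "datetime": datetime.fromisoformat,
}


def convert(record: dict, types: dict) -> dict:
    """Return a copy of `record` with the listed fields converted."""
    out = dict(record)
    for field, type_name in types.items():
        if out.get(field) is not None:
            out[field] = CONVERTERS[type_name](out[field])
    return out


row = convert(
    {"age": "42", "price": "9.99", "active": "false", "created_at": "2024-01-01T12:00:00"},
    {"age": "int", "price": "float", "active": "bool", "created_at": "datetime"},
)
print(row)
```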
Custom Function¶
from pycharter import CustomFunction
# Apply custom logic
def enrich_user(record):
    record["display_name"] = f"{record['first_name']} {record['last_name']}"
    record["email_domain"] = record["email"].split("@")[1]
    return record

transform = CustomFunction(enrich_user)
Chaining Transformers¶
pipeline = (
    Pipeline(extractor)
    | Rename({"userName": "user_name"})
    | Filter(lambda r: r.get("active"))
    | AddField("processed_at", "now()")
    | Drop(["password", "ssn"])
    | Convert({"age": "int"})
    | loader
)
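To make the order of operations in the chain concrete, here is a plain-Python sketch of the same five steps applied to a list of dictionaries. It is an illustration of the intended effect only, written without PyCharter; the `transform` function and sample records are hypothetical.

```python
from datetime import datetime, timezone


def transform(records: list) -> list:
    """Apply rename -> filter -> add field -> drop -> convert, in that order."""
    out = []
    for original in records:
        r = dict(original)  # work on a copy so the input is not mutated
        if "userName" in r:
            r["user_name"] = r.pop("userName")                      # rename
        if not r.get("active"):
            continue                                                # filter
        r["processed_at"] = datetime.now(timezone.utc).isoformat()  # add field
        for field in ("password", "ssn"):
            r.pop(field, None)                                      # drop
        if "age" in r:
            r["age"] = int(r["age"])                                # convert
        out.append(r)
    return out


raw = [
    {"userName": "ann", "active": True, "age": "31", "password": "x"},
    {"userName": "bob", "active": False, "age": "17", "ssn": "y"},
]
cleaned = transform(raw)
print(cleaned)
```

Because the filter runs before the conversion, inactive records are discarded without ever being parsed, which is one reason to place filters early in a chain.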
Part 4: Loaders¶
PostgreSQL Loader¶
from pycharter import PostgresLoader
loader = PostgresLoader(
    connection_string="postgresql://user:pass@localhost/db",
    table="users",
    schema="public",
    mode="upsert",  # insert, upsert, replace
    conflict_columns=["id"]
)
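The upsert mode with `conflict_columns` corresponds to the SQL pattern "insert, or update the existing row when the key already exists". The sketch below demonstrates that pattern with the built-in `sqlite3` module, whose `ON CONFLICT ... DO UPDATE` syntax mirrors PostgreSQL's. The SQL that `PostgresLoader` actually generates is an assumption here, and the table is made up for illustration.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT)")

# On a key collision, overwrite the stored name with the incoming one.
upsert_sql = (
    "INSERT INTO users (id, name) VALUES (?, ?) "
    "ON CONFLICT(id) DO UPDATE SET name = excluded.name"
)

conn.executemany(upsert_sql, [(1, "Ann"), (2, "Bob")])    # first load
conn.executemany(upsert_sql, [(2, "Robert"), (3, "Cy")])  # second load: id 2 is updated

final_rows = conn.execute("SELECT id, name FROM users ORDER BY id").fetchall()
conn.close()
print(final_rows)
```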
File Loader¶
from pycharter import FileLoader
# JSON output
loader = FileLoader(path="output/users.json", file_format="json")
# CSV output
loader = FileLoader(path="output/users.csv", file_format="csv")
# JSONL (JSON Lines) - good for streaming
loader = FileLoader(path="output/users.jsonl", file_format="jsonl")
# Parquet (compressed, columnar)
loader = FileLoader(path="output/users.parquet", file_format="parquet")
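JSONL suits streaming because each line is a complete JSON document, so records can be written and read one at a time without holding the whole file in memory. A minimal standard-library sketch of the format follows; `write_jsonl` and `read_jsonl` are hypothetical helpers, not PyCharter functions.

```python
import io
import json


def write_jsonl(records, fh) -> None:
    """Write one JSON document per line."""
    for record in records:
        fh.write(json.dumps(record) + "\n")


def read_jsonl(fh):
    """Lazily yield one record per non-empty line."""
    for line in fh:
        line = line.strip()
        if line:
            yield json.loads(line)


buffer = io.StringIO()  # stands in for a file opened in text mode
write_jsonl([{"id": 1}, {"id": 2}], buffer)
buffer.seek(0)
round_tripped = list(read_jsonl(buffer))
print(round_tripped)
```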
Cloud Storage Loader¶
from pycharter import CloudStorageLoader
loader = CloudStorageLoader(
    provider="s3",
    bucket="my-bucket",
    path="output/users.json",
    file_format="json"
)
Part 5: Config-Driven Pipelines¶
Define pipelines in YAML for easier management:
Directory Structure¶
Extract Config¶
type: http
url: https://api.example.com/users
method: GET
headers:
  Authorization: "Bearer ${API_KEY}"
pagination:
  type: offset
  param: page
  limit: 100
Transform Config¶
rename:
  userName: user_name
  userEmail: email
filter:
  expression: "active == true"
add:
  processed_at: "now()"
  source: "api"
drop:
  - password
  - internal_id
Load Config¶
type: postgres
connection_string: "${DATABASE_URL}"
table: users
schema: public
mode: upsert
conflict_columns:
  - id
Run Config-Driven Pipeline¶
import asyncio
from pycharter import Pipeline
# From directory
pipeline = Pipeline.from_config_dir("pipelines/users/")
# With variables
pipeline = Pipeline.from_config_dir(
    "pipelines/users/",
    variables={
        "API_KEY": "your-api-key",
        "DATABASE_URL": "postgresql://..."
    }
)
result = asyncio.run(pipeline.run())
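The `${API_KEY}` and `${DATABASE_URL}` placeholders follow the same syntax as Python's `string.Template`. The sketch below shows how such placeholders could be resolved recursively across a nested config; `resolve` is a hypothetical helper, and whether PyCharter uses `string.Template` internally is not known. One useful property of `Template.substitute` is that it raises `KeyError` for a missing variable instead of silently leaving the placeholder in place.

```python
from string import Template


def resolve(value, variables: dict):
    """Recursively substitute ${NAME} placeholders in strings, dicts, and lists."""
    if isinstance(value, str):
        return Template(value).substitute(variables)  # KeyError if a variable is missing
    if isinstance(value, dict):
        return {key: resolve(item, variables) for key, item in value.items()}
    if isinstance(value, list):
        return [resolve(item, variables) for item in value]
    return value  # numbers, booleans, None pass through unchanged


config = {
    "url": "https://api.example.com/users",
    "headers": {"Authorization": "Bearer ${API_KEY}"},
    "pagination": {"type": "offset", "limit": 100},
}
resolved = resolve(config, {"API_KEY": "secret-token"})
print(resolved["headers"])
```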
Part 6: Error Handling¶
Error Modes¶
from pycharter.shared.errors import ErrorMode, ErrorContext
# The calls below use await, so run them inside an async function
# Strict: stop on first error (default)
result = await pipeline.run(
    error_context=ErrorContext(mode=ErrorMode.STRICT)
)
# Lenient: log errors and continue
result = await pipeline.run(
    error_context=ErrorContext(mode=ErrorMode.LENIENT)
)
# Collect: gather all errors without stopping
result = await pipeline.run(
    error_context=ErrorContext(mode=ErrorMode.COLLECT)
)
# Check collected errors
if result.errors:
    for error in result.errors:
        print(f"Error: {error}")
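The difference between stopping on the first error and collecting errors can be illustrated without PyCharter. In the sketch below, `process` is a hypothetical function with a `mode` argument: "strict" re-raises immediately, while "collect" records the failing input alongside its exception and keeps going. It does not model the logging behavior of the lenient mode.

```python
def process(records, handler, mode: str = "strict"):
    """Apply `handler` to each record; either fail fast or collect failures."""
    succeeded, errors = [], []
    for record in records:
        try:
            succeeded.append(handler(record))
        except Exception as exc:
            if mode == "strict":
                raise  # stop at the first failure
            errors.append((record, exc))  # remember what failed and why
    return succeeded, errors


def parse_age(record: dict) -> int:
    return int(record["age"])


rows = [{"age": "30"}, {"age": "n/a"}, {"age": "41"}]
ages, errors = process(rows, parse_age, mode="collect")
print(ages, [str(exc) for _, exc in errors])
```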
Try-Except Pattern¶
import asyncio
from pycharter import Pipeline
from pycharter.shared.errors import PyCharterError, ConfigError
try:
    pipeline = Pipeline.from_config_dir("pipelines/users/")
    result = asyncio.run(pipeline.run())
except ConfigError as e:
    print(f"Configuration error: {e}")
except PyCharterError as e:
    print(f"Pipeline error: {e}")
Part 7: Advanced Patterns¶
Validation During Load¶
Add contract validation before loading:
from pycharter import Pipeline, Validator, CustomFunction
validator = Validator.from_file("contracts/user.yaml")

def validate_and_filter(records):
    # Keep only records that pass contract validation
    valid_records = []
    for record in records:
        result = validator.validate(record)
        if result.is_valid:
            valid_records.append(result.data.model_dump())
    return valid_records

# extractor and loader are defined as in Parts 2 and 4
pipeline = (
    Pipeline(extractor)
    | CustomFunction(validate_and_filter)
    | loader
)
Bulk (multi-parameter) runs¶
When your API uses a path parameter (e.g. {symbol}) and you want to run the same pipeline for many values (e.g. many symbols), use the config-driven orchestrator’s run_bulk():
import asyncio
from pycharter import ETLOrchestrator
orchestrator = ETLOrchestrator(contract_dir="pipelines/barometer/")
# run_bulk() is awaited, so call it from inside an async function
# Option 1: Pass values explicitly
result = await orchestrator.run_bulk(
    param_name="symbol",
    param_values=["AAPL", "MSFT", "GOOGL"],
    concurrency=20,
    max_calls_per_minute=300,
    start_date="2024-01-01",
    end_date="2024-01-31",
)
# Option 2: Use default_param_values from extract.yaml (no param_values needed)
result = await orchestrator.run_bulk(
    start_date="2024-01-01",
    end_date="2024-01-31",
)
print(f"Extracted {result['extraction']['total_records']} records")
print(f"Loaded {result['loading']['total']} rows")
In extract.yaml you can define default lists so CLI or schedulers don’t need to pass symbols:
api_endpoint: /v2/aggs/ticker/{symbol}/range/1/day/{start_date}/{end_date}
default_param_values:
  symbol:
    - AAPL
    - MSFT
    - GOOGL
The HTTP extractor automatically injects each path parameter (e.g. symbol) into every extracted record, so transforms and loaders see a symbol field on each row without extra code.
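The two ideas in this section, bounded concurrency and tagging each row with its path parameter, can be sketched with plain `asyncio`. Here `fetch_symbol` is a fake stand-in for an HTTP call and `run_bulk_sketch` is a hypothetical function. It shows the general technique (a semaphore capping in-flight requests), not the orchestrator's implementation, and it omits rate limiting such as `max_calls_per_minute`.

```python
import asyncio


async def fetch_symbol(symbol: str) -> list:
    """Fake extractor: pretend to call the API and return two rows per symbol."""
    await asyncio.sleep(0)
    return [{"close": 100.0}, {"close": 101.5}]


async def run_bulk_sketch(symbols: list, concurrency: int = 2) -> list:
    semaphore = asyncio.Semaphore(concurrency)  # at most `concurrency` fetches at once

    async def run_one(symbol: str) -> list:
        async with semaphore:
            rows = await fetch_symbol(symbol)
        # Inject the path parameter so downstream stages see it on every row.
        return [{**row, "symbol": symbol} for row in rows]

    batches = await asyncio.gather(*(run_one(s) for s in symbols))
    return [row for batch in batches for row in batch]


bulk_rows = asyncio.run(run_bulk_sketch(["AAPL", "MSFT", "GOOGL"]))
print(len(bulk_rows), bulk_rows[0])
```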
Parallel Extraction¶
Extract from multiple sources:
import asyncio
from pycharter import Pipeline
# Create multiple pipelines
pipeline1 = Pipeline.from_config_dir("pipelines/source1/")
pipeline2 = Pipeline.from_config_dir("pipelines/source2/")
# Run in parallel
async def run_parallel():
    results = await asyncio.gather(
        pipeline1.run(),
        pipeline2.run()
    )
    return results

results = asyncio.run(run_parallel())
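By default, `asyncio.gather` propagates the first exception to the caller, so one failing source hides the results of the others. Passing `return_exceptions=True` returns exceptions as ordinary values instead, which lets each outcome be inspected separately. The sketch below uses two fake coroutines in place of real pipelines; their names and return values are made up for illustration.

```python
import asyncio


async def ok_pipeline() -> dict:
    await asyncio.sleep(0)
    return {"rows_loaded": 10}


async def failing_pipeline() -> dict:
    await asyncio.sleep(0)
    raise RuntimeError("source2 unavailable")


async def run_all() -> list:
    # Exceptions are returned in the result list rather than raised.
    return await asyncio.gather(
        ok_pipeline(), failing_pipeline(), return_exceptions=True
    )


outcomes = asyncio.run(run_all())
succeeded = [o for o in outcomes if not isinstance(o, Exception)]
failed = [o for o in outcomes if isinstance(o, Exception)]
print(succeeded, failed)
```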
Incremental Loading¶
Track progress for incremental loads:
from datetime import datetime
from pycharter import Pipeline
# Get last run timestamp (from your tracking system)
last_run = datetime(2024, 1, 1)
pipeline = Pipeline.from_config_files(
    extract="extract.yaml",
    load="load.yaml",
    variables={
        "LAST_RUN": last_run.isoformat(),
        "CURRENT_RUN": datetime.now().isoformat()
    }
)
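The "tracking system" mentioned above can be as simple as a small state file. The sketch below persists the last successful run time as JSON and reads it back on the next run. The file name, the `load_last_run` and `save_last_run` helpers, and the fallback date are all hypothetical. The key design point is to save the new timestamp only after the pipeline has succeeded, so a failed run is retried over the same window.

```python
import json
import os
import tempfile
from datetime import datetime, timezone


def load_last_run(path: str, default: datetime) -> datetime:
    """Return the stored timestamp, or `default` when no state file exists yet."""
    try:
        with open(path, encoding="utf-8") as fh:
            return datetime.fromisoformat(json.load(fh)["last_run"])
    except FileNotFoundError:
        return default


def save_last_run(path: str, when: datetime) -> None:
    with open(path, "w", encoding="utf-8") as fh:
        json.dump({"last_run": when.isoformat()}, fh)


with tempfile.TemporaryDirectory() as tmp:
    state_path = os.path.join(tmp, "state.json")
    fallback = datetime(2024, 1, 1, tzinfo=timezone.utc)

    first = load_last_run(state_path, fallback)   # no state yet: uses the fallback
    current_run = datetime(2024, 2, 1, tzinfo=timezone.utc)
    # ... run the pipeline for the window (first, current_run] here ...
    save_last_run(state_path, current_run)        # record progress only after success
    second = load_last_run(state_path, fallback)

print(first.isoformat(), second.isoformat())
```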
Exercises¶
- Basic Pipeline: Create a pipeline that extracts users from the JSONPlaceholder API, filters for users with .com emails, and saves to a JSON file.
- Transform Pipeline: Build a pipeline that reads a CSV file, converts all field names to snake_case, adds a timestamp, and writes to JSONL.
- Database Pipeline: Create a pipeline that extracts from one PostgreSQL table, transforms the data, and loads to another table.
- Config-Driven: Convert one of the above pipelines to use YAML configuration files.
Next Steps¶
- Data Contracts & Validation - Add validation to your pipelines
- Quality Monitoring - Monitor pipeline data quality
- API Reference: Extractors - Complete extractor documentation
- API Reference: Transformers - Complete transformer documentation