Building ETL Pipelines

Learn to build production-ready data pipelines with PyCharter's ETL system.

What You'll Learn

  • Create pipelines with the | operator
  • Use built-in extractors (HTTP, file, database, cloud)
  • Apply transformations (rename, filter, add fields)
  • Load data to various destinations
  • Handle errors and retries
  • Use config-driven pipelines

Prerequisites

pip install "pycharter[etl]"

Part 1: Your First Pipeline

The Pipe Operator

PyCharter uses the | (pipe) operator to chain pipeline stages:

import asyncio
from pycharter import Pipeline, HTTPExtractor, FileLoader, Rename

# Build a simple pipeline
pipeline = (
    Pipeline(HTTPExtractor(url="https://jsonplaceholder.typicode.com/users"))
    | Rename({"username": "user_name"})
    | FileLoader(path="output/users.json", file_format="json")
)

# Run it
result = asyncio.run(pipeline.run())
print(f"Loaded {result.rows_loaded} rows in {result.duration_seconds:.2f}s")
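
For readers curious how `|` chaining can work at all: Python lets a class overload the operator by defining `__or__`. The sketch below is a toy illustration of that mechanism, not PyCharter's implementation. `MiniPipeline`, `rename`, and `run` are hypothetical names invented for this example.

```python
from typing import Callable, Iterable, List

Record = dict
Stage = Callable[[List[Record]], List[Record]]


class MiniPipeline:
    """Toy pipeline (hypothetical, not PyCharter's Pipeline class)."""

    def __init__(self, source: Iterable[Record], stages: tuple = ()):
        self.source = source
        self.stages = stages

    def __or__(self, stage: Stage) -> "MiniPipeline":
        # `pipeline | stage` returns a new pipeline with the stage appended,
        # leaving the original pipeline unchanged.
        return MiniPipeline(self.source, self.stages + (stage,))

    def run(self) -> List[Record]:
        # Apply every stage in the order it was chained.
        records = list(self.source)
        for stage in self.stages:
            records = stage(records)
        return records


def rename(mapping: dict) -> Stage:
    """Build a stage that renames keys according to `mapping`."""
    def stage(records: List[Record]) -> List[Record]:
        return [{mapping.get(k, k): v for k, v in r.items()} for r in records]
    return stage


users = [{"id": 1, "username": "bret"}]
renamed = (MiniPipeline(users) | rename({"username": "user_name"})).run()
print(renamed)
```

Returning a new object from `__or__` keeps each partial pipeline reusable, which is one common design choice for this style of API.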

Understanding the Result

The PipelineResult contains execution metrics:

result = asyncio.run(pipeline.run())

print(f"Rows extracted: {result.rows_extracted}")
print(f"Rows transformed: {result.rows_transformed}")
print(f"Rows loaded: {result.rows_loaded}")
print(f"Rows failed: {result.rows_failed}")
print(f"Duration: {result.duration_seconds:.2f}s")
print(f"Success: {result.success}")

if result.errors:
    for error in result.errors:
        print(f"Error: {error}")

Part 2: Extractors

HTTP Extractor

Extract data from REST APIs:

from pycharter import HTTPExtractor

# Simple GET request
extractor = HTTPExtractor(
    url="https://api.example.com/users",
    method="GET",
    headers={"Authorization": "Bearer ${API_KEY}"}
)

# With pagination
extractor = HTTPExtractor(
    url="https://api.example.com/users",
    pagination={
        "type": "offset",
        "param": "offset",
        "limit": 100
    }
)

# POST with body
extractor = HTTPExtractor(
    url="https://api.example.com/search",
    method="POST",
    body={"query": "active users"}
)
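
To make the offset pagination settings above concrete, here is a minimal sketch of what an offset-based loop typically does: request a page, advance the offset by `limit`, and stop on a short page. It is a generic illustration, not PyCharter's internals. `paginate_offset` and `fake_fetch` are hypothetical names, and the in-memory list stands in for a real HTTP call.

```python
from typing import Callable, Iterator, List


def paginate_offset(
    fetch_page: Callable[[int, int], List[dict]], limit: int = 100
) -> Iterator[dict]:
    """Yield records page by page until a short (or empty) page is returned."""
    offset = 0
    while True:
        page = fetch_page(offset, limit)
        yield from page
        if len(page) < limit:
            break  # fewer rows than requested means this was the last page
        offset += limit


# Stand-in for an HTTP call such as GET /users?offset=<offset>&limit=<limit>.
FAKE_USERS = [{"id": i} for i in range(250)]


def fake_fetch(offset: int, limit: int) -> List[dict]:
    return FAKE_USERS[offset:offset + limit]


all_users = list(paginate_offset(fake_fetch, limit=100))
print(len(all_users))
```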

File Extractor

Extract from local files:

from pycharter import FileExtractor

# JSON file
extractor = FileExtractor(path="data/users.json", file_format="json")

# CSV file
extractor = FileExtractor(path="data/users.csv", file_format="csv")

# Parquet file
extractor = FileExtractor(path="data/users.parquet", file_format="parquet")

# Glob pattern (multiple files)
extractor = FileExtractor(path="data/*.json", file_format="json")

Database Extractor

Extract from SQL databases:

from pycharter import DatabaseExtractor

extractor = DatabaseExtractor(
    connection_string="postgresql://user:pass@localhost/db",
    query="SELECT * FROM users WHERE active = true"
)

# With parameters
extractor = DatabaseExtractor(
    connection_string="postgresql://user:pass@localhost/db",
    query="SELECT * FROM users WHERE created_at > %(start_date)s",
    params={"start_date": "2024-01-01"}
)
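
Passing values through `params` rather than formatting them into the SQL string lets the database driver handle quoting, which avoids SQL injection. The sketch below shows the same idea with the standard-library `sqlite3` module as a stand-in for PostgreSQL. Note that `sqlite3` uses the `:name` placeholder style, whereas the `%(name)s` style above is the one used by PostgreSQL drivers such as psycopg.

```python
import sqlite3

# In-memory database as a stand-in for a real PostgreSQL instance.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE users (id INTEGER, created_at TEXT)")
conn.executemany(
    "INSERT INTO users VALUES (?, ?)",
    [(1, "2023-12-31"), (2, "2024-02-01")],
)

# The driver binds the value; it is never spliced into the SQL text.
rows = conn.execute(
    "SELECT id FROM users WHERE created_at > :start_date",
    {"start_date": "2024-01-01"},
).fetchall()
print(rows)
conn.close()
```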

Cloud Storage Extractor

Extract from S3, GCS, or Azure:

from pycharter import CloudStorageExtractor

# AWS S3
extractor = CloudStorageExtractor(
    provider="s3",
    bucket="my-bucket",
    path="data/users.json",
    credentials={
        "aws_access_key_id": "${AWS_KEY}",
        "aws_secret_access_key": "${AWS_SECRET}"
    }
)

# Google Cloud Storage
extractor = CloudStorageExtractor(
    provider="gcs",
    bucket="my-bucket",
    path="data/users.json"
)

# Azure Blob Storage
extractor = CloudStorageExtractor(
    provider="azure",
    container="my-container",
    path="data/users.json"
)

Part 3: Transformers

Transformers process records between extraction and loading. Each one is shown on its own below, then chained together.

Rename Fields

from pycharter import Rename

# Rename single field
transform = Rename({"old_name": "new_name"})

# Rename multiple fields
transform = Rename({
    "userName": "user_name",
    "userEmail": "email",
    "createdAt": "created_at"
})

Filter Records

from pycharter import Filter

# Lambda filter
transform = Filter(lambda r: r.get("active", False))

# Multiple conditions
transform = Filter(lambda r: r["age"] >= 18 and r["status"] == "active")
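
One thing to watch with the second lambda: `r["age"]` raises `KeyError` when the field is missing, and comparing `None` with `18` raises `TypeError`. A named predicate that tolerates incomplete records is often easier to test. The sketch below is plain Python; `is_active_adult` is a hypothetical helper, and it should be usable anywhere a filter callable is accepted.

```python
def is_active_adult(record: dict) -> bool:
    """Return True only for records with a numeric age >= 18 and active status."""
    age = record.get("age")
    return (
        isinstance(age, (int, float))
        and age >= 18
        and record.get("status") == "active"
    )


records = [
    {"age": 30, "status": "active"},
    {"age": 16, "status": "active"},
    {"status": "active"},             # missing age: dropped, no exception
    {"age": None, "status": "active"},
    {"age": 45, "status": "inactive"},
]
kept = [r for r in records if is_active_adult(r)]
print(kept)
```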

Add Fields

from pycharter import AddField

# Static value
transform = AddField("source", "api")

# Dynamic expression
transform = AddField("processed_at", "now()")  # Current timestamp
transform = AddField("full_name", "concat(first_name, ' ', last_name)")

# From other fields
transform = AddField("is_adult", "age >= 18")

Drop Fields

from pycharter import Drop

# Drop single field
transform = Drop(["password"])

# Drop multiple fields
transform = Drop(["password", "ssn", "internal_id"])

Select Fields

from pycharter import Select

# Keep only these fields
transform = Select(["id", "name", "email"])

Convert Types

from pycharter import Convert

# Convert field types
transform = Convert({
    "age": "int",
    "price": "float",
    "active": "bool",
    "created_at": "datetime"
})
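
Type conversion has a classic pitfall worth knowing: in plain Python, `bool("false")` is `True`, because any non-empty string is truthy. The sketch below shows one way a converter table can handle that. It is an illustration only; how PyCharter's `Convert` parses booleans and datetimes is not specified here, and `CONVERTERS`, `to_bool`, and `convert` are hypothetical names.

```python
from datetime import datetime


def to_bool(value):
    """Parse common textual booleans instead of relying on truthiness."""
    if isinstance(value, str):
        return value.strip().lower() in {"1", "true", "yes", "y"}
    return bool(value)


CONVERTERS = {
    "int": int,
    "float": float,
    "bool": to_bool,
    "datetime": datetime.fromisoformat,  # expects ISO 8601 strings
}


def convert(record: dict, types: dict) -> dict:
    """Return a copy of `record` with the listed fields converted."""
    out = dict(record)
    for field, type_name in types.items():
        if out.get(field) is not None:
            out[field] = CONVERTERS[type_name](out[field])
    return out


row = convert(
    {"age": "42", "price": "9.50", "active": "false",
     "created_at": "2024-01-15T10:30:00"},
    {"age": "int", "price": "float", "active": "bool",
     "created_at": "datetime"},
)
print(row)
```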

Custom Function

from pycharter import CustomFunction

# Apply custom logic
def enrich_user(record):
    record["display_name"] = f"{record['first_name']} {record['last_name']}"
    record["email_domain"] = record["email"].split("@")[1]
    return record

transform = CustomFunction(enrich_user)

Chaining Transformers

pipeline = (
    Pipeline(extractor)
    | Rename({"userName": "user_name"})
    | Filter(lambda r: r.get("active"))
    | AddField("processed_at", "now()")
    | Drop(["password", "ssn"])
    | Convert({"age": "int"})
    | loader
)

Part 4: Loaders

PostgreSQL Loader

from pycharter import PostgresLoader

loader = PostgresLoader(
    connection_string="postgresql://user:pass@localhost/db",
    table="users",
    schema="public",
    mode="upsert",  # insert, upsert, replace
    conflict_columns=["id"]
)
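
An upsert inserts new rows and updates existing ones when the conflict columns match. In SQL this is usually expressed with `INSERT ... ON CONFLICT ... DO UPDATE`. The sketch below demonstrates the semantics with the standard-library `sqlite3` module (SQLite 3.24 or newer), which accepts the same clause as PostgreSQL. It illustrates the concept only; the statement PyCharter generates is not shown in this tutorial.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE users (id INTEGER PRIMARY KEY, email TEXT)")

# On a duplicate id, overwrite the email with the incoming ("excluded") value.
UPSERT = """
INSERT INTO users (id, email) VALUES (:id, :email)
ON CONFLICT (id) DO UPDATE SET email = excluded.email
"""

# First load: two new rows.
conn.executemany(UPSERT, [
    {"id": 1, "email": "a@example.com"},
    {"id": 2, "email": "b@example.com"},
])
# Second load: id 1 already exists, so it is updated instead of duplicated.
conn.executemany(UPSERT, [{"id": 1, "email": "new@example.com"}])

stored = conn.execute("SELECT id, email FROM users ORDER BY id").fetchall()
print(stored)
conn.close()
```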

File Loader

from pycharter import FileLoader

# JSON output
loader = FileLoader(path="output/users.json", file_format="json")

# CSV output
loader = FileLoader(path="output/users.csv", file_format="csv")

# JSONL (JSON Lines) - good for streaming
loader = FileLoader(path="output/users.jsonl", file_format="jsonl")

# Parquet (compressed, columnar)
loader = FileLoader(path="output/users.parquet", file_format="parquet")
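
JSONL suits streaming because each line is a complete JSON document, so a writer can append records one at a time and a reader never has to parse the whole file at once. A minimal standard-library sketch of that property follows. `write_jsonl` and `read_jsonl` are hypothetical helpers, and an in-memory buffer stands in for a file.

```python
import io
import json


def write_jsonl(records, fh) -> int:
    """Write one JSON document per line; returns the number of records written."""
    count = 0
    for record in records:
        fh.write(json.dumps(record) + "\n")
        count += 1
    return count


def read_jsonl(fh):
    """Lazily yield one record per non-empty line."""
    for line in fh:
        line = line.strip()
        if line:
            yield json.loads(line)


buffer = io.StringIO()
# A generator is enough: records are never held in memory all at once.
written = write_jsonl(({"id": i} for i in range(3)), buffer)
buffer.seek(0)
round_trip = list(read_jsonl(buffer))
print(written, round_trip)
```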

Cloud Storage Loader

from pycharter import CloudStorageLoader

loader = CloudStorageLoader(
    provider="s3",
    bucket="my-bucket",
    path="output/users.json",
    file_format="json"
)

Part 5: Config-Driven Pipelines

Define pipelines in YAML for easier management:

Directory Structure

pipelines/
└── users/
    ├── extract.yaml
    ├── transform.yaml
    └── load.yaml

Extract Config

# extract.yaml
type: http
url: https://api.example.com/users
method: GET
headers:
  Authorization: "Bearer ${API_KEY}"
pagination:
  type: offset
  param: offset
  limit: 100

Transform Config

# transform.yaml
rename:
  userName: user_name
  userEmail: email

filter:
  expression: "active == true"

add:
  processed_at: "now()"
  source: "api"

drop:
  - password
  - internal_id

Load Config

# load.yaml
type: postgres
connection_string: "${DATABASE_URL}"
table: users
schema: public
mode: upsert
conflict_columns:
  - id

Run Config-Driven Pipeline

import asyncio
from pycharter import Pipeline

# From directory
pipeline = Pipeline.from_config_dir("pipelines/users/")

# With variables
pipeline = Pipeline.from_config_dir(
    "pipelines/users/",
    variables={
        "API_KEY": "your-api-key",
        "DATABASE_URL": "postgresql://..."
    }
)

result = asyncio.run(pipeline.run())
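
The `${NAME}` placeholders in the YAML files follow the same syntax as Python's `string.Template`. The sketch below shows how such substitution can work, with explicit variables taking precedence over environment variables. This is an assumption made for illustration: the tutorial does not state how PyCharter resolves placeholders or what its precedence rules are, and `resolve` is a hypothetical helper.

```python
import os
from string import Template


def resolve(value: str, variables: dict) -> str:
    """Replace ${NAME} placeholders; explicit variables override the environment."""
    merged = {**os.environ, **variables}
    # substitute() raises KeyError for an undefined name, which surfaces
    # a missing secret early instead of sending a literal "${API_KEY}".
    return Template(value).substitute(merged)


header = resolve("Bearer ${API_KEY}", {"API_KEY": "your-api-key"})
print(header)
```

Failing loudly on an undefined variable is usually preferable for credentials; `Template.safe_substitute()` is the lenient alternative that leaves unknown placeholders untouched.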

Part 6: Error Handling

Error Modes

import asyncio
from pycharter import Pipeline
from pycharter.shared.errors import ErrorMode, ErrorContext

pipeline = Pipeline.from_config_dir("pipelines/users/")

# `await` is only valid inside a coroutine, so wrap the calls in one
async def main():
    # Strict: stop on first error (default)
    result = await pipeline.run(
        error_context=ErrorContext(mode=ErrorMode.STRICT)
    )

    # Lenient: log errors and continue
    result = await pipeline.run(
        error_context=ErrorContext(mode=ErrorMode.LENIENT)
    )

    # Collect: gather all errors without stopping
    result = await pipeline.run(
        error_context=ErrorContext(mode=ErrorMode.COLLECT)
    )

    # Check collected errors
    if result.errors:
        for error in result.errors:
            print(f"Error: {error}")

asyncio.run(main())
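
The difference between the three modes is easiest to see on a per-record loop. The sketch below is one plausible reading of the descriptions above (strict re-raises, lenient logs and continues, collect stores the exception for later inspection). It is not PyCharter's implementation, and `Mode` and `process` are hypothetical names.

```python
import logging
from enum import Enum

logger = logging.getLogger("etl_sketch")


class Mode(Enum):
    STRICT = "strict"
    LENIENT = "lenient"
    COLLECT = "collect"


def process(records, func, mode: Mode):
    """Apply `func` to each record, handling failures according to `mode`."""
    succeeded, errors = [], []
    for record in records:
        try:
            succeeded.append(func(record))
        except Exception as exc:
            if mode is Mode.STRICT:
                raise  # abort the whole run on the first failure
            if mode is Mode.LENIENT:
                logger.warning("skipping %r: %s", record, exc)
            else:
                errors.append(exc)  # keep going, report everything at the end
    return succeeded, errors


# The middle record triggers a ZeroDivisionError.
good, collected = process([1, 0, 2], lambda x: 10 // x, Mode.COLLECT)
print(good, collected)
```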

Try-Except Pattern

import asyncio
from pycharter import Pipeline
from pycharter.shared.errors import PyCharterError, ConfigError

try:
    pipeline = Pipeline.from_config_dir("pipelines/users/")
    result = asyncio.run(pipeline.run())
except ConfigError as e:
    # Catch the more specific error first
    print(f"Configuration error: {e}")
except PyCharterError as e:
    print(f"Pipeline error: {e}")
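
For transient failures such as a flaky network, a retry wrapper around the run is a common complement to the try-except pattern. The sketch below is a generic asyncio retry with exponential backoff, not a PyCharter feature; `run_with_retries` and `flaky` are hypothetical names. It takes a callable that returns a fresh coroutine on each attempt (for example `pipeline.run`), because a coroutine object can only be awaited once.

```python
import asyncio


async def run_with_retries(
    make_run,
    attempts: int = 3,
    base_delay: float = 0.01,
    retry_on: tuple = (ConnectionError, TimeoutError),
):
    """Await make_run() up to `attempts` times, doubling the delay each time."""
    for attempt in range(1, attempts + 1):
        try:
            return await make_run()
        except retry_on:
            if attempt == attempts:
                raise  # out of attempts: let the caller see the error
            await asyncio.sleep(base_delay * 2 ** (attempt - 1))


calls = {"n": 0}


async def flaky():
    """Stand-in for pipeline.run(): fails twice, then succeeds."""
    calls["n"] += 1
    if calls["n"] < 3:
        raise ConnectionError("temporary failure")
    return "ok"


outcome = asyncio.run(run_with_retries(flaky))
print(outcome, calls["n"])
```

Restricting `retry_on` to transient error types avoids retrying failures that will never succeed, such as a configuration error.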

Part 7: Advanced Patterns

Validation During Load

Add contract validation before loading:

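from pycharter import Pipeline, Validator, CustomFunction

validator = Validator.from_file("contracts/user.yaml")

# Note: this function receives a list of records, whereas the CustomFunction
# example in Part 3 receives a single record. Confirm which signature your
# PyCharter version expects.
def validate_and_filter(records):
    """Keep only the records that pass the contract."""
    valid_records = []
    for record in records:
        result = validator.validate(record)
        if result.is_valid:
            valid_records.append(result.data.model_dump())
    return valid_records

# extractor and loader are built as shown in Parts 2 and 4
pipeline = (
    Pipeline(extractor)
    | CustomFunction(validate_and_filter)
    | loader
)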

Bulk (Multi-Parameter) Runs

When your API uses a path parameter (e.g. {symbol}) and you want to run the same pipeline for many values (e.g. many symbols), use the config-driven orchestrator’s run_bulk():

import asyncio
from pycharter import ETLOrchestrator

orchestrator = ETLOrchestrator(contract_dir="pipelines/barometer/")

# `await` is only valid inside a coroutine, so wrap the calls in one
async def main():
    # Option 1: Pass values explicitly
    result = await orchestrator.run_bulk(
        param_name="symbol",
        param_values=["AAPL", "MSFT", "GOOGL"],
        concurrency=20,
        max_calls_per_minute=300,
        start_date="2024-01-01",
        end_date="2024-01-31",
    )

    # Option 2: Use default_param_values from extract.yaml (no param_values needed)
    result = await orchestrator.run_bulk(
        start_date="2024-01-01",
        end_date="2024-01-31",
    )

    print(f"Extracted {result['extraction']['total_records']} records")
    print(f"Loaded {result['loading']['total']} rows")

asyncio.run(main())

In extract.yaml you can define default value lists so that CLI invocations and schedulers don't need to pass symbols:

api_endpoint: /v2/aggs/ticker/{symbol}/range/1/day/{start_date}/{end_date}
default_param_values:
  symbol:
    - AAPL
    - MSFT
    - GOOGL

The HTTP extractor automatically injects each path parameter (e.g. symbol) into every extracted record, so transforms and loaders see a symbol field on each row without extra code.
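
The two ideas in this section, bounded concurrency and tagging each row with its path parameter, can be sketched with plain asyncio. This is a conceptual illustration, not how `run_bulk()` is implemented. `fetch_symbol` and `run_for_symbols` are hypothetical names, and the fetch is a stub rather than a real API call.

```python
import asyncio


async def fetch_symbol(symbol: str) -> list:
    """Stub for one API call; a real version would request /ticker/{symbol}/..."""
    await asyncio.sleep(0)
    return [{"close": 100.0}]


async def run_for_symbols(symbols, concurrency: int = 2) -> list:
    # The semaphore caps how many fetches are in flight at once.
    semaphore = asyncio.Semaphore(concurrency)

    async def one(symbol: str) -> list:
        async with semaphore:
            fetched = await fetch_symbol(symbol)
        # Inject the path parameter so downstream stages can see it.
        return [{**row, "symbol": symbol} for row in fetched]

    # gather() returns results in the same order as the inputs.
    batches = await asyncio.gather(*(one(s) for s in symbols))
    return [row for batch in batches for row in batch]


bulk_rows = asyncio.run(run_for_symbols(["AAPL", "MSFT", "GOOGL"]))
print(bulk_rows)
```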

Parallel Extraction

Extract from multiple sources:

import asyncio
from pycharter import Pipeline

# Create multiple pipelines
pipeline1 = Pipeline.from_config_dir("pipelines/source1/")
pipeline2 = Pipeline.from_config_dir("pipelines/source2/")

# Run in parallel
async def run_parallel():
    results = await asyncio.gather(
        pipeline1.run(),
        pipeline2.run()
    )
    return results

results = asyncio.run(run_parallel())
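
By default, `asyncio.gather()` propagates the first exception to the caller, so one failing source can hide the results of the others. Passing `return_exceptions=True` returns exceptions as values instead, letting you inspect each outcome. The sketch below uses stub coroutines in place of real pipelines; `ok_pipeline` and `failing_pipeline` are hypothetical names.

```python
import asyncio


async def ok_pipeline():
    """Stub for a pipeline run that succeeds."""
    return {"rows_loaded": 10}


async def failing_pipeline():
    """Stub for a pipeline run that fails."""
    raise RuntimeError("source2 unavailable")


async def run_all():
    # Exceptions are returned in place of results instead of being raised.
    return await asyncio.gather(
        ok_pipeline(),
        failing_pipeline(),
        return_exceptions=True,
    )


outcomes = asyncio.run(run_all())
failures = [o for o in outcomes if isinstance(o, Exception)]
print(outcomes[0], failures)
```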

Incremental Loading

Track progress for incremental loads:

from datetime import datetime
from pycharter import Pipeline

# Get last run timestamp (from your tracking system)
last_run = datetime(2024, 1, 1)

# The config files can then reference ${LAST_RUN} and ${CURRENT_RUN}
pipeline = Pipeline.from_config_files(
    extract="extract.yaml",
    load="load.yaml",
    variables={
        "LAST_RUN": last_run.isoformat(),
        "CURRENT_RUN": datetime.now().isoformat()
    }
)
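
The "tracking system" can be as simple as a small state file that stores the last successful run time. The sketch below persists that watermark as JSON using only the standard library. `read_watermark`, `write_watermark`, and the state file name are hypothetical, and a temporary directory stands in for a real state location.

```python
import json
import tempfile
from datetime import datetime, timezone
from pathlib import Path


def read_watermark(path: Path, default: datetime) -> datetime:
    """Return the stored timestamp, or `default` on the first run."""
    if path.exists():
        return datetime.fromisoformat(json.loads(path.read_text())["last_run"])
    return default


def write_watermark(path: Path, value: datetime) -> None:
    """Persist the timestamp of the run that just succeeded."""
    path.write_text(json.dumps({"last_run": value.isoformat()}))


state_file = Path(tempfile.mkdtemp()) / "users_state.json"
epoch = datetime(2024, 1, 1, tzinfo=timezone.utc)

first_watermark = read_watermark(state_file, epoch)   # no file yet: default
current_run = datetime.now(timezone.utc)
# ... run the pipeline here with LAST_RUN=first_watermark, CURRENT_RUN=current_run ...
write_watermark(state_file, current_run)               # only after success
next_watermark = read_watermark(state_file, epoch)
print(first_watermark, next_watermark)
```

Writing the watermark only after a successful run means a failed run is retried over the same window rather than silently skipped.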

Exercises

  1. Basic Pipeline: Create a pipeline that extracts users from the JSONPlaceholder API, keeps only users whose email address ends in .com, and saves the result to a JSON file.

  2. Transform Pipeline: Build a pipeline that reads a CSV file, converts all field names to snake_case, adds a timestamp, and writes to JSONL.

  3. Database Pipeline: Create a pipeline that extracts from one PostgreSQL table, transforms the data, and loads it into another table.

  4. Config-Driven: Convert one of the above pipelines to use YAML configuration files.

Next Steps