
Transformers

Transformers modify data between extraction and loading.

Overview

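The built-in transformers described on this page are imported from the top-level pycharter package:

from pycharter import Rename, Filter, AddField, Drop, Select, Convert, Default, Map, FlatMap, CustomFunction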

Built-in Transformers

Rename

Rename fields in records:

from pycharter import Rename

# Single field
transform = Rename({"old_name": "new_name"})

# Multiple fields
transform = Rename({
    "userName": "user_name",
    "userEmail": "email",
    "createdAt": "created_at"
})
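To make the effect concrete, here is a minimal plain-Python sketch of what a rename step does to a single record. It does not use pycharter: `rename_fields` is a hypothetical helper, and passing unlisted keys through unchanged is an assumption, not documented pycharter behavior.

```python
from typing import Any, Dict


def rename_fields(record: Dict[str, Any], mapping: Dict[str, str]) -> Dict[str, Any]:
    """Return a copy of `record` with keys renamed according to `mapping`."""
    # Keys that are not listed in `mapping` keep their original name.
    return {mapping.get(key, key): value for key, value in record.items()}


record = {"userName": "ada", "userEmail": "ada@example.com", "id": 1}
renamed = rename_fields(record, {"userName": "user_name", "userEmail": "email"})
print(renamed)
```

The sketch returns a new dict and leaves the input record untouched, which makes a step like this easier to test in isolation.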

Filter

Filter records based on conditions:

from pycharter import Filter

# Lambda function
transform = Filter(lambda r: r.get("active", False))

# Multiple conditions
transform = Filter(lambda r: r["age"] >= 18 and r["status"] == "active")

# Using expression string
transform = Filter("active == true and age >= 18")
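A predicate that indexes with `r["age"]` raises `KeyError` on any record that lacks the field. The sketch below shows a more defensive predicate in plain Python. `filter_records` and `is_active_adult` are hypothetical names used for illustration only, and the sketch makes no claim about how pycharter's Filter handles missing fields.

```python
from typing import Any, Callable, Dict, Iterable, List

Record = Dict[str, Any]


def filter_records(records: Iterable[Record], predicate: Callable[[Record], bool]) -> List[Record]:
    """Keep only the records for which `predicate` returns a truthy value."""
    return [r for r in records if predicate(r)]


def is_active_adult(record: Record) -> bool:
    # .get() avoids a KeyError on records that lack "age" or "status".
    return record.get("age", 0) >= 18 and record.get("status") == "active"


records = [
    {"name": "a", "age": 30, "status": "active"},
    {"name": "b", "age": 16, "status": "active"},
    {"name": "c", "status": "inactive"},  # no "age" key at all
]
kept = filter_records(records, is_active_adult)
print(kept)
```

A named function like `is_active_adult` can be passed wherever the examples above pass a lambda, and it is easier to unit test.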

AddField

Add new fields to records:

from pycharter import AddField

# Static value
transform = AddField("source", "api")

# Current timestamp
transform = AddField("processed_at", "now()")

# Expression
transform = AddField("full_name", "concat(first_name, ' ', last_name)")
transform = AddField("is_adult", "age >= 18")

# Multiple fields
transform = AddField({
    "source": "api",
    "processed_at": "now()"
})

Drop

Remove fields from records:

from pycharter import Drop

# Single field
transform = Drop(["password"])

# Multiple fields
transform = Drop(["password", "ssn", "internal_id", "temp_field"])

Select

Keep only specified fields:

from pycharter import Select

# Whitelist fields
transform = Select(["id", "name", "email", "created_at"])
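Drop and Select are complementary: one is a blocklist, the other an allowlist. The plain-Python sketch below illustrates the difference. `drop_fields` and `select_fields` are hypothetical helpers, and silently skipping fields that are absent from the record is an assumption rather than documented pycharter behavior.

```python
from typing import Any, Dict, Iterable


def drop_fields(record: Dict[str, Any], fields: Iterable[str]) -> Dict[str, Any]:
    """Blocklist: remove the named fields, keep everything else."""
    excluded = set(fields)
    return {k: v for k, v in record.items() if k not in excluded}


def select_fields(record: Dict[str, Any], fields: Iterable[str]) -> Dict[str, Any]:
    """Allowlist: keep only the named fields that are present."""
    return {k: record[k] for k in fields if k in record}


record = {"id": 1, "name": "Ada", "password": "s3cret", "debug": True}
dropped = drop_fields(record, ["password"])
selected = select_fields(record, ["id", "name", "email"])
print(dropped)
print(selected)
```

With a blocklist, a field that appears upstream later (such as `debug` here) flows through to the loader. With an allowlist it does not, which is usually the safer choice for sensitive data.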

Convert

Convert field types:

from pycharter import Convert

transform = Convert({
    "age": "int",
    "price": "float",
    "active": "bool",
    "created_at": "datetime",
    "tags": "list"
})
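Type conversion has a few well-known pitfalls in Python, most notably that `bool("false")` is `True`. The following sketch shows one way a name-to-converter mapping could work for a subset of the types above. It is an illustration only: `CONVERTERS`, `to_bool`, and `convert_fields` are hypothetical, and pycharter's actual parsing rules (accepted boolean strings, datetime formats) are not specified here.

```python
from datetime import datetime
from typing import Any, Callable, Dict


def to_bool(value: Any) -> bool:
    # bool("false") is True in Python, so strings need explicit parsing.
    if isinstance(value, str):
        return value.strip().lower() in {"true", "1", "yes"}
    return bool(value)


# Hypothetical mapping from type names to converter callables.
CONVERTERS: Dict[str, Callable[[Any], Any]] = {
    "int": int,
    "float": float,
    "bool": to_bool,
    "datetime": datetime.fromisoformat,  # assumes ISO 8601 input strings
}


def convert_fields(record: Dict[str, Any], types: Dict[str, str]) -> Dict[str, Any]:
    """Return a copy of `record` with the listed fields converted."""
    out = dict(record)
    for field, type_name in types.items():
        # Missing fields and None values are left alone in this sketch.
        if out.get(field) is not None:
            out[field] = CONVERTERS[type_name](out[field])
    return out


raw = {"age": "42", "price": "9.50", "active": "false", "created_at": "2024-01-15T10:30:00"}
typed = convert_fields(
    raw, {"age": "int", "price": "float", "active": "bool", "created_at": "datetime"}
)
print(typed)
```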

Default

Set default values for missing fields:

from pycharter import Default

transform = Default({
    "status": "pending",
    "priority": 0,
    "tags": []
})
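A mutable default such as `[]` deserves care: if the same list object were shared across records, appending to one record's tags would change all of them. The sketch below shows a defensive approach in plain Python. `apply_defaults` is a hypothetical helper, and whether pycharter copies default values or treats `None` as missing is not stated here.

```python
import copy
from typing import Any, Dict


def apply_defaults(record: Dict[str, Any], defaults: Dict[str, Any]) -> Dict[str, Any]:
    """Return a copy of `record` with missing fields filled from `defaults`."""
    out = dict(record)
    for field, value in defaults.items():
        if field not in out:
            # Deep-copy so mutable defaults are never shared between records.
            out[field] = copy.deepcopy(value)
    return out


defaults = {"status": "pending", "priority": 0, "tags": []}
first = apply_defaults({"id": 1}, defaults)
second = apply_defaults({"id": 2, "status": "done"}, defaults)

first["tags"].append("urgent")  # must not leak into `second` or `defaults`
print(first)
print(second)
```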

Map

Apply a function to each record:

from pycharter import Map

def process_record(record):
    record["name"] = record["name"].title()
    return record

transform = Map(process_record)

FlatMap

Apply a function that yields zero or more records for each input record, then flatten the results into a single sequence:

from pycharter import FlatMap

def explode_tags(record):
    # Return multiple records from one
    for tag in record.get("tags", []):
        yield {**record, "tag": tag}

transform = FlatMap(explode_tags)
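One consequence of this pattern is easy to miss: a record with an empty or missing `tags` field yields nothing, so it disappears from the output. The plain-Python sketch below demonstrates this using `itertools.chain.from_iterable`. `flat_map` is a hypothetical stand-in and is not pycharter's implementation.

```python
from itertools import chain
from typing import Any, Callable, Dict, Iterable, Iterator, List

Record = Dict[str, Any]


def explode_tags(record: Record) -> Iterator[Record]:
    # Yields one output record per tag; yields nothing when there are no tags.
    for tag in record.get("tags", []):
        yield {**record, "tag": tag}


def flat_map(records: Iterable[Record], fn: Callable[[Record], Iterable[Record]]) -> List[Record]:
    """Apply `fn` to each record and flatten the resulting iterables."""
    return list(chain.from_iterable(fn(r) for r in records))


records = [{"id": 1, "tags": ["a", "b"]}, {"id": 2, "tags": []}, {"id": 3}]
exploded = flat_map(records, explode_tags)
print(exploded)
```

If untagged records should survive, the function can yield the original record unchanged when the tag list is empty.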

CustomFunction

Apply custom logic to the whole list of records at once:

from pycharter import CustomFunction

def enrich_user(records):
    for record in records:
        record["display_name"] = f"{record['first_name']} {record['last_name']}"
        record["email_domain"] = record["email"].split("@")[1]
    return records

transform = CustomFunction(enrich_user)
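Judging from the examples on this page, the function given to Map receives one record, while the function given to CustomFunction receives the full list. The plain-Python sketch below illustrates why that distinction matters: ranking needs to see every record, whereas title-casing does not. `title_name` and `add_rank` are hypothetical functions, and nothing here calls pycharter.

```python
from typing import Any, Dict, List

Record = Dict[str, Any]


def title_name(record: Record) -> Record:
    """Per-record logic: needs only the record itself."""
    return {**record, "name": record["name"].title()}


def add_rank(records: List[Record]) -> List[Record]:
    """Batch logic: the rank depends on all records, not just one."""
    ordered = sorted(records, key=lambda r: r["score"], reverse=True)
    return [{**r, "rank": i} for i, r in enumerate(ordered, start=1)]


records = [{"name": "ada lovelace", "score": 7}, {"name": "alan turing", "score": 9}]
mapped = [title_name(r) for r in records]  # what a per-record step would do
ranked = add_rank(mapped)                  # what a batch step would do
print(ranked)
```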

Chaining Transformers

Use the | operator to chain transformers:

from pycharter import Pipeline, Rename, Filter, AddField, Drop

# `extractor` and `loader` are assumed to be defined elsewhere.
pipeline = (
    Pipeline(extractor)
    | Rename({"userName": "user_name"})
    | Filter(lambda r: r.get("active"))
    | AddField("processed_at", "now()")
    | Drop(["password", "internal_id"])
    | loader
)
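For readers curious how `|` chaining can work in Python, the sketch below overloads `__or__` on a tiny pipeline class. This is a generic illustration of the technique, not pycharter's implementation: `Step`, `MiniPipeline`, and `run` are hypothetical names.

```python
from typing import Any, Callable, Dict, List, Optional

Records = List[Dict[str, Any]]


class Step:
    """Hypothetical stand-in for a transformer: wraps a records -> records function."""

    def __init__(self, fn: Callable[[Records], Records]) -> None:
        self.fn = fn

    def __call__(self, records: Records) -> Records:
        return self.fn(records)


class MiniPipeline:
    def __init__(self, steps: Optional[List[Step]] = None) -> None:
        self.steps = list(steps or [])

    def __or__(self, step: Step) -> "MiniPipeline":
        # Return a new pipeline so a partial chain can be reused safely.
        return MiniPipeline(self.steps + [step])

    def run(self, records: Records) -> Records:
        # Steps are applied left to right, in the order they were chained.
        for step in self.steps:
            records = step(records)
        return records


only_active = Step(lambda rs: [r for r in rs if r.get("active")])
drop_password = Step(lambda rs: [{k: v for k, v in r.items() if k != "password"} for r in rs])

pipeline = MiniPipeline() | only_active | drop_password
result = pipeline.run([
    {"id": 1, "active": True, "password": "x"},
    {"id": 2, "active": False, "password": "y"},
])
print(result)
```

Because each `|` returns a new object in this sketch, a shared prefix of steps can be extended in different ways without the variants interfering with one another.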

TransformerChain

Create reusable transformer chains:

from pycharter import Pipeline, TransformerChain, Rename, Filter, AddField

# Create reusable chain
user_transforms = TransformerChain([
    Rename({"userName": "user_name"}),
    Filter(lambda r: r.get("active")),
    AddField("processed_at", "now()")
])

# Use in pipeline
pipeline = (
    Pipeline(extractor)
    | user_transforms
    | loader
)

Config-Driven Transformers

Define transformers in YAML:

# transform.yaml
rename:
  userName: user_name
  userEmail: email

filter:
  expression: "active == true"

add:
  processed_at: "now()"
  source: "api"

drop:
  - password
  - internal_id

convert:
  age: int
  price: float

Expression Syntax

Supported expressions for AddField and Filter:

| Expression | Description | Example |
| --- | --- | --- |
| now() | Current timestamp | "processed_at": "now()" |
| concat(...) | String concatenation | "full_name": "concat(first, ' ', last)" |
| field >= value | Comparison | "is_adult": "age >= 18" |
| field == value | Equality | "is_active": "status == 'active'" |
| field and field | Logical AND | "active and verified" |
| field or field | Logical OR | "admin or moderator" |
| not field | Logical NOT | "not deleted" |

Custom Transformers

Create custom transformers by extending BaseTransformer:

from typing import Any, Dict, List

from pycharter import Pipeline
from pycharter.etl_generator.transformers import BaseTransformer

class UppercaseNames(BaseTransformer):
    def __init__(self, fields: List[str]):
        self.fields = fields

    def transform(self, records: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
        for record in records:
            for field in self.fields:
                if field in record and isinstance(record[field], str):
                    record[field] = record[field].upper()
        return records

# Use in pipeline
pipeline = (
    Pipeline(extractor)
    | UppercaseNames(["name", "city"])
    | loader
)
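A custom transformer can be exercised on its own before it is wired into a pipeline. The sketch below repeats the class with a stand-in `BaseTransformer` so that it runs without pycharter installed. The stand-in is an assumption for illustration; the real base class may define additional methods or constructor arguments.

```python
from typing import Any, Dict, List


class BaseTransformer:
    """Stand-in for pycharter's BaseTransformer so this sketch runs on its own."""

    def transform(self, records: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
        raise NotImplementedError


class UppercaseNames(BaseTransformer):
    def __init__(self, fields: List[str]):
        self.fields = fields

    def transform(self, records: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
        for record in records:
            for field in self.fields:
                # Non-string and missing values are skipped, not coerced.
                if field in record and isinstance(record[field], str):
                    record[field] = record[field].upper()
        return records


records = [
    {"name": "ada", "city": "london", "age": 36},
    {"name": None, "city": "paris"},
]
result = UppercaseNames(["name", "city"]).transform(records)
print(result)
```

Note that `transform` modifies the input dicts in place and returns the same list, so callers that need the original values should copy the records first.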

See Also