Transformers¶
Transformers modify data between extraction and loading.
Overview¶
Built-in Transformers¶
Rename¶
Rename fields in records:
from pycharter import Rename
# Single field
transform = Rename({"old_name": "new_name"})
# Multiple fields
transform = Rename({
    "userName": "user_name",
    "userEmail": "email",
    "createdAt": "created_at"
})
Filter¶
Filter records based on conditions:
from pycharter import Filter
# Lambda function
transform = Filter(lambda r: r.get("active", False))
# Multiple conditions
transform = Filter(lambda r: r["age"] >= 18 and r["status"] == "active")
# Using expression string
transform = Filter("active == true and age >= 18")
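The multi-condition lambda above indexes `r["age"]` and `r["status"]` directly, so it raises `KeyError` on any record that lacks either field. A minimal plain-Python sketch of a predicate that tolerates missing fields follows; `is_active_adult` and the sample records are hypothetical names used only for illustration, and nothing here calls pycharter itself.

```python
from typing import Any, Dict, List

Record = Dict[str, Any]


def is_active_adult(record: Record) -> bool:
    """True only for active records aged 18+; a missing field counts as a non-match."""
    age = record.get("age")
    # Guard the comparison so a missing or non-numeric age does not raise.
    return (
        isinstance(age, (int, float))
        and age >= 18
        and record.get("status") == "active"
    )


records: List[Record] = [
    {"name": "Ada", "age": 36, "status": "active"},
    {"name": "Bob", "age": 17, "status": "active"},
    {"name": "Cy", "status": "active"},  # no "age" field at all
]

# Plain list comprehension standing in for the filtering step.
kept = [r for r in records if is_active_adult(r)]
```

Since `Filter` is shown accepting a lambda, a named function such as this one could presumably be passed the same way, e.g. `Filter(is_active_adult)`.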
AddField¶
Add new fields to records:
from pycharter import AddField
# Static value
transform = AddField("source", "api")
# Current timestamp
transform = AddField("processed_at", "now()")
# Expression
transform = AddField("full_name", "concat(first_name, ' ', last_name)")
transform = AddField("is_adult", "age >= 18")
# Multiple fields
transform = AddField({
    "source": "api",
    "processed_at": "now()"
})
Drop¶
Remove fields from records:
from pycharter import Drop
# Single field
transform = Drop(["password"])
# Multiple fields
transform = Drop(["password", "ssn", "internal_id", "temp_field"])
Select¶
Keep only specified fields:
from pycharter import Select
# Whitelist fields
transform = Select(["id", "name", "email", "created_at"])
Convert¶
Convert field types:
from pycharter import Convert
transform = Convert({
    "age": "int",
    "price": "float",
    "active": "bool",
    "created_at": "datetime",
    "tags": "list"
})
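The exact parsing rules behind each type name are not spelled out here, so the following is only a plain-Python sketch of what such a conversion step might do. The `CONVERTERS` table, `to_bool`, `to_list`, and `convert_record` are hypothetical helpers, and the choices they make (which strings count as true, ISO 8601 timestamps, comma-separated lists) are assumptions for illustration rather than pycharter's behavior.

```python
from datetime import datetime
from typing import Any, Callable, Dict


def to_bool(value: Any) -> bool:
    # Assumption: treat common "truthy" strings as True. Note that plain
    # bool("false") would be True, which is rarely what is wanted.
    if isinstance(value, str):
        return value.strip().lower() in {"true", "1", "yes"}
    return bool(value)


def to_list(value: Any) -> list:
    # Assumption: strings are comma-separated; other iterables are copied.
    if isinstance(value, str):
        return [part.strip() for part in value.split(",") if part.strip()]
    return list(value)


CONVERTERS: Dict[str, Callable[[Any], Any]] = {
    "int": int,
    "float": float,
    "bool": to_bool,
    "datetime": datetime.fromisoformat,  # assumes ISO 8601 input
    "list": to_list,
}


def convert_record(record: Dict[str, Any], schema: Dict[str, str]) -> Dict[str, Any]:
    """Return a copy of record with the fields named in schema converted."""
    out = dict(record)
    for field, type_name in schema.items():
        if field in out:
            out[field] = CONVERTERS[type_name](out[field])
    return out


raw = {
    "age": "42",
    "price": "9.50",
    "active": "false",
    "created_at": "2024-01-15T10:30:00",
    "tags": "a, b",
}
schema = {
    "age": "int",
    "price": "float",
    "active": "bool",
    "created_at": "datetime",
    "tags": "list",
}
converted = convert_record(raw, schema)
```

The `"bool"` case is the one most worth checking against real data, since string inputs such as `"false"` are truthy under Python's built-in `bool`.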
Default¶
Set default values for missing fields:
from pycharter import Default
transform = Default({
    "status": "pending",
    "priority": 0,
    "tags": []
})
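A mutable default such as `"tags": []` raises a general Python concern: if the same list object were assigned to every record, appending to one record's tags would change all of them. Whether pycharter copies default values is not stated here. The sketch below, with the hypothetical helper `apply_defaults`, shows the copy-per-record approach in plain Python.

```python
import copy
from typing import Any, Dict


def apply_defaults(record: Dict[str, Any], defaults: Dict[str, Any]) -> Dict[str, Any]:
    """Return a copy of record with missing fields filled from defaults."""
    out = dict(record)
    for field, value in defaults.items():
        if field not in out:
            # Deep-copy so records never share one mutable default object.
            out[field] = copy.deepcopy(value)
    return out


defaults = {"status": "pending", "priority": 0, "tags": []}

first = apply_defaults({"id": 1}, defaults)
second = apply_defaults({"id": 2, "status": "done"}, defaults)

# Mutating one record's list must not leak into other records or the defaults.
first["tags"].append("urgent")
```

Existing values win over defaults, so `second` keeps its own `"status"`.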
Map¶
Apply a function to each record:
from pycharter import Map
def process_record(record):
    record["name"] = record["name"].title()
    return record
transform = Map(process_record)
FlatMap¶
Map and flatten results:
from pycharter import FlatMap
def explode_tags(record):
    # Return multiple records from one
    for tag in record.get("tags", []):
        yield {**record, "tag": tag}
transform = FlatMap(explode_tags)
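To make "map and flatten" concrete, here is a plain-Python sketch of the same operation using `itertools.chain.from_iterable`. The helper `flat_map` is a hypothetical stand-in, not pycharter's implementation; `explode_tags` is repeated so the snippet runs on its own.

```python
from itertools import chain
from typing import Any, Callable, Dict, Iterable, List

Record = Dict[str, Any]


def explode_tags(record: Record) -> Iterable[Record]:
    # One output record per tag; records with no tags yield nothing.
    for tag in record.get("tags", []):
        yield {**record, "tag": tag}


def flat_map(func: Callable[[Record], Iterable[Record]], records: Iterable[Record]) -> List[Record]:
    """Apply func to each record and flatten the results into one list."""
    return list(chain.from_iterable(func(r) for r in records))


records = [
    {"id": 1, "tags": ["a", "b"]},
    {"id": 2, "tags": []},  # empty list: produces no output records
    {"id": 3},              # missing field: also produces no output records
]

exploded = flat_map(explode_tags, records)
```

In this sketch, records with empty or missing `tags` disappear from the output, so a function like `explode_tags` also acts as an implicit filter.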
CustomFunction¶
Apply custom transformation logic:
from pycharter import CustomFunction
def enrich_user(records):
    for record in records:
        record["display_name"] = f"{record['first_name']} {record['last_name']}"
        record["email_domain"] = record["email"].split("@")[1]
    return records
transform = CustomFunction(enrich_user)
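The `Map` example receives one record at a time, while the `CustomFunction` example receives the whole list. Assuming that holds in general, a batch function can do work that depends on more than one record. The sketch below is plain Python with a hypothetical `dedupe_by_email` function, illustrating a transformation that a per-record function cannot express.

```python
from typing import Any, Dict, List

Record = Dict[str, Any]


def dedupe_by_email(records: List[Record]) -> List[Record]:
    """Keep the first record seen for each email address (case-insensitive)."""
    seen = set()
    unique = []
    for record in records:
        # str() guards against non-string values such as None.
        key = str(record.get("email", "")).lower()
        if key not in seen:
            seen.add(key)
            unique.append(record)
    return unique


records = [
    {"id": 1, "email": "ada@example.com"},
    {"id": 2, "email": "ADA@example.com"},  # duplicate, different case
    {"id": 3, "email": "bob@example.com"},
]

deduped = dedupe_by_email(records)
```

Under the same assumption, such a function would be wrapped as `CustomFunction(dedupe_by_email)`.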
Chaining Transformers¶
Use the | operator to chain transformers:
from pycharter import Pipeline, Rename, Filter, AddField, Drop
# extractor and loader are assumed to be defined elsewhere
pipeline = (
    Pipeline(extractor)
    | Rename({"userName": "user_name"})
    | Filter(lambda r: r.get("active"))
    | AddField("processed_at", "now()")
    | Drop(["password", "internal_id"])
    | loader
)
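For readers curious how `|` chaining can work in Python, the following sketch overloads `__or__` on a toy class. `MiniPipeline`, `rename`, `keep`, and `drop` are hypothetical stand-ins written for this illustration and do not reflect pycharter's internals; the loader step is omitted.

```python
from typing import Any, Callable, Dict, Iterable, List

Record = Dict[str, Any]
Step = Callable[[List[Record]], List[Record]]


class MiniPipeline:
    """Toy pipeline: `pipeline | step` returns a new pipeline with step appended."""

    def __init__(self, source: Iterable[Record], steps: Iterable[Step] = ()):
        self.source = source
        self.steps = list(steps)

    def __or__(self, step: Step) -> "MiniPipeline":
        # Build a new object so the left-hand pipeline stays reusable.
        return MiniPipeline(self.source, self.steps + [step])

    def run(self) -> List[Record]:
        records = list(self.source)
        for step in self.steps:
            records = step(records)
        return records


def rename(mapping: Dict[str, str]) -> Step:
    return lambda records: [{mapping.get(k, k): v for k, v in r.items()} for r in records]


def keep(predicate: Callable[[Record], Any]) -> Step:
    return lambda records: [r for r in records if predicate(r)]


def drop(fields: List[str]) -> Step:
    return lambda records: [{k: v for k, v in r.items() if k not in fields} for r in records]


source = [
    {"userName": "ada", "active": True, "password": "x"},
    {"userName": "bob", "active": False, "password": "y"},
]

pipeline = (
    MiniPipeline(source)
    | rename({"userName": "user_name"})
    | keep(lambda r: r.get("active"))
    | drop(["password"])
)
result = pipeline.run()
```

Steps run left to right, so order matters: in this sketch, dropping a field before a filter that reads it would change the result.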
TransformerChain¶
Create reusable transformer chains:
from pycharter import TransformerChain, Rename, Filter, AddField
# Create reusable chain
user_transforms = TransformerChain([
    Rename({"userName": "user_name"}),
    Filter(lambda r: r.get("active")),
    AddField("processed_at", "now()")
])
# Use in pipeline
pipeline = (
    Pipeline(extractor)
    | user_transforms
    | loader
)
Config-Driven Transformers¶
Define transformers in YAML:
transform.yaml
rename:
  userName: user_name
  userEmail: email
filter:
  expression: "active == true"
add:
  processed_at: "now()"
  source: "api"
drop:
  - password
  - internal_id
convert:
  age: int
  price: float
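As a rough illustration of how a config like this could drive transformations, the sketch below dispatches on the top-level keys of a dictionary. The dictionary mirrors what a YAML parser would return for the `rename`, `drop`, and `convert` sections; `filter` and `add` are left out because they depend on the expression evaluator. `HANDLERS`, `apply_config`, and the helper functions are hypothetical, and applying sections in file order is an assumption, not documented pycharter behavior.

```python
from typing import Any, Callable, Dict

Record = Dict[str, Any]

# Mirrors the rename/drop/convert sections of transform.yaml as parsed data.
config = {
    "rename": {"userName": "user_name", "userEmail": "email"},
    "drop": ["password", "internal_id"],
    "convert": {"age": "int", "price": "float"},
}

CASTS: Dict[str, Callable[[Any], Any]] = {"int": int, "float": float}


def apply_rename(record: Record, mapping: Dict[str, str]) -> Record:
    return {mapping.get(k, k): v for k, v in record.items()}


def apply_drop(record: Record, fields: list) -> Record:
    return {k: v for k, v in record.items() if k not in fields}


def apply_convert(record: Record, schema: Dict[str, str]) -> Record:
    return {k: CASTS[schema[k]](v) if k in schema else v for k, v in record.items()}


HANDLERS: Dict[str, Callable[[Record, Any], Record]] = {
    "rename": apply_rename,
    "drop": apply_drop,
    "convert": apply_convert,
}


def apply_config(record: Record, cfg: Dict[str, Any]) -> Record:
    # Assumption: sections are applied in the order they appear in the file.
    for section, options in cfg.items():
        record = HANDLERS[section](record, options)
    return record


raw = {
    "userName": "ada",
    "userEmail": "ada@example.com",
    "password": "x",
    "age": "36",
    "price": "9.5",
}
clean = apply_config(raw, config)
```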
Expression Syntax¶
Supported expressions for AddField and Filter:
| Expression | Description | Example |
|---|---|---|
| `now()` | Current timestamp | `"processed_at": "now()"` |
| `concat(...)` | String concatenation | `"full_name": "concat(first, ' ', last)"` |
| `field >= value` | Comparison | `"is_adult": "age >= 18"` |
| `field == value` | Equality | `"is_active": "status == 'active'"` |
| `field and field` | Logical AND | `"active and verified"` |
| `field or field` | Logical OR | `"admin or moderator"` |
| `not field` | Logical NOT | `"not deleted"` |
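One way to read the table is to pair each expression string with the Python callable it most plausibly corresponds to. The mapping below is an interpretation for illustration only: how pycharter parses these strings, and how it treats missing fields, is not described here. `now()` is left out because its result changes on every call. `PYTHON_EQUIVALENTS` and the sample record are hypothetical.

```python
from typing import Any, Callable, Dict

Record = Dict[str, Any]

# Expression string from the table -> a plain-Python reading of it.
PYTHON_EQUIVALENTS: Dict[str, Callable[[Record], Any]] = {
    "concat(first, ' ', last)": lambda r: f"{r['first']} {r['last']}",
    "age >= 18": lambda r: r["age"] >= 18,
    "status == 'active'": lambda r: r["status"] == "active",
    "active and verified": lambda r: bool(r["active"] and r["verified"]),
    "admin or moderator": lambda r: bool(r["admin"] or r["moderator"]),
    "not deleted": lambda r: not r["deleted"],
}

record = {
    "first": "Ada",
    "last": "Lovelace",
    "age": 36,
    "status": "active",
    "active": True,
    "verified": False,
    "admin": False,
    "moderator": True,
    "deleted": False,
}

# Evaluate every expression's Python reading against the sample record.
results = {expr: fn(record) for expr, fn in PYTHON_EQUIVALENTS.items()}
```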
Custom Transformers¶
Create custom transformers by extending BaseTransformer:
from typing import Any, Dict, List
from pycharter import Pipeline
from pycharter.etl_generator.transformers import BaseTransformer
class UppercaseNames(BaseTransformer):
    def __init__(self, fields: List[str]):
        self.fields = fields
    def transform(self, records: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
        # Uppercase each listed field in place; skip missing or non-string values
        for record in records:
            for field in self.fields:
                if field in record and isinstance(record[field], str):
                    record[field] = record[field].upper()
        return records
# Use in pipeline (extractor and loader are assumed to be defined elsewhere)
pipeline = (
    Pipeline(extractor)
    | UppercaseNames(["name", "city"])
    | loader
)
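A custom transformer's `transform` method can be exercised on its own before it is wired into a pipeline. The sketch below uses a stand-in `BaseTransformer` so that it runs without pycharter installed; the real base class may require more than this, which is an assumption to verify against the library.

```python
from typing import Any, Dict, List


class BaseTransformer:
    """Stand-in for pycharter's base class, for local testing only."""

    def transform(self, records: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
        raise NotImplementedError


class UppercaseNames(BaseTransformer):
    def __init__(self, fields: List[str]):
        self.fields = fields

    def transform(self, records: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
        for record in records:
            for field in self.fields:
                # Skip missing fields and non-string values such as None.
                if field in record and isinstance(record[field], str):
                    record[field] = record[field].upper()
        return records


records = [
    {"name": "ada", "city": "london", "age": 36},
    {"name": None},  # non-string value is left untouched
]

result = UppercaseNames(["name", "city"]).transform(records)
```

Note that this transformer mutates the input records and returns the same list object; a version that must leave its input untouched would copy each record first.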