Custom Transformers
Create reusable transformation functions for your ETL pipelines.
Basic Custom Transformer
from pycharter.etl_generator.transformers import BaseTransformer
from typing import List, Dict, Any
class Deduplicate(BaseTransformer):
    """Drop records whose key-field value has already been seen.

    The first record carrying a given key value is kept, in its original
    relative position; later records with an equal key value are discarded.
    Records that lack the key field are all keyed on ``None``.
    """

    def __init__(self, key_field: str):
        # Name of the record field whose value identifies a duplicate.
        self.key_field = key_field

    def transform(self, records: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
        """Return a new list holding the first record for each key value."""
        first_by_key: Dict[Any, Dict[str, Any]] = {}
        for item in records:
            # setdefault stores only when the key is new, so the earliest
            # record wins; dicts keep insertion order, preserving sequence.
            first_by_key.setdefault(item.get(self.key_field), item)
        return list(first_by_key.values())
# Use in pipeline
# NOTE(review): Pipeline, extractor and loader are not defined in this
# snippet; they are assumed to come from the surrounding project code.
# Stages are chained with "|"; presumably they run left to right
# (extract, deduplicate on "id", load) - confirm against the Pipeline docs.
pipeline = (
Pipeline(extractor)
| Deduplicate("id")
| loader
)
Using CustomFunction
For simple transformations, use CustomFunction:
from pycharter import CustomFunction
def add_metadata(records):
    """Stamp every record with a processing timestamp and a source tag.

    Each record is modified in place: ``processed_at`` is set to the current
    local time in ISO 8601 format and ``source`` is set to ``"etl"``.

    Args:
        records: Iterable of mutable mapping records (e.g. a list of dicts).

    Returns:
        The same ``records`` object that was passed in.
    """
    # Imported locally so the function is self-contained; without it the
    # call to datetime.now() raises NameError.
    from datetime import datetime

    for record in records:
        record["processed_at"] = datetime.now().isoformat()
        record["source"] = "etl"
    return records
# NOTE(review): Pipeline, extractor and loader are not defined in this
# snippet; they are assumed to come from the surrounding project code.
# CustomFunction wraps the plain add_metadata function so it can be chained
# with "|" like the other stages.
pipeline = (
Pipeline(extractor)
| CustomFunction(add_metadata)
| loader
)
Stateful Transformers
class RunningAverage(BaseTransformer):
    """Annotate each record with the running average of a numeric field.

    The running total and record count are stored on the instance, so the
    average keeps accumulating across successive ``transform`` calls on the
    same object. Create a new instance to start from zero.

    Args:
        field: Name of the numeric field to average.
        output_field: Name of the field that receives the running average.
            Defaults to ``"<field>_avg"`` when omitted or empty.
    """

    def __init__(self, field: str, output_field: str = None):
        self.field = field
        self.output_field = output_field or f"{field}_avg"
        # Accumulated state, shared by every transform() call.
        self.sum = 0
        self.count = 0

    def transform(self, records: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
        """Write the running average into each record, in place.

        A record whose field is missing or ``None`` contributes 0 to the
        total and is still counted.

        Returns:
            The same ``records`` list that was passed in.
        """
        for record in records:
            value = record.get(self.field)
            if value is None:
                # Treat an explicit None like a missing field instead of
                # failing with TypeError on the addition below.
                value = 0
            self.sum += value
            self.count += 1
            record[self.output_field] = self.sum / self.count
        return records
Async Transformers
class EnrichFromAPI(BaseTransformer):
    """Enrich records with data fetched from an external HTTP API.

    Args:
        api_url: Base URL; each lookup requests ``<api_url>/<key>``.
        key_field: Name of the record field whose value is used as the key.
    """

    def __init__(self, api_url: str, key_field: str):
        self.api_url = api_url
        self.key_field = key_field

    async def transform_async(self, records: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
        """Fetch enrichment data for each record and attach it in place.

        Requests are issued one at a time over a single client. A record is
        given an ``"enrichment"`` entry only when the API answers with
        status 200; other records are left untouched.

        Returns:
            The same ``records`` list that was passed in.
        """
        import httpx

        async with httpx.AsyncClient() as client:
            for record in records:
                key = record.get(self.key_field)
                if key is None:
                    # No key to look up; skip rather than request ".../None".
                    continue
                response = await client.get(f"{self.api_url}/{key}")
                if response.status_code == 200:
                    record["enrichment"] = response.json()
        return records
Best Practices
- Keep transformers focused - One transformation per class
- Make them reusable - Use parameters for configuration
- Handle edge cases - Check for missing fields, None values
- Document behavior - Add docstrings and examples
- Test thoroughly - Unit test each transformer