Custom Transformers

Create reusable transformation functions for your ETL pipelines.

Basic Custom Transformer

from pycharter.etl_generator.transformers import BaseTransformer
from typing import List, Dict, Any

class Deduplicate(BaseTransformer):
    """Remove duplicate records based on a key field."""

    def __init__(self, key_field: str):
        self.key_field = key_field

    def transform(self, records: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
        seen = set()
        unique = []

        for record in records:
            key = record.get(self.key_field)
            if key not in seen:
                seen.add(key)
                unique.append(record)

        return unique

# Use in a pipeline (Pipeline, extractor, and loader are assumed to be defined elsewhere)
pipeline = (
    Pipeline(extractor)
    | Deduplicate("id")
    | loader
)
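
To see what Deduplicate does independently of any pipeline, the sketch below calls transform directly on a small list. It assumes a stand-in BaseTransformer (hypothetical, defined here only so the snippet runs without pycharter installed); with the real library, import BaseTransformer as shown above instead.

```python
from typing import Any, Dict, List


class BaseTransformer:
    """Stand-in base class (assumption: only `transform` is needed here)."""

    def transform(self, records: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
        raise NotImplementedError


class Deduplicate(BaseTransformer):
    """Keep the first record seen for each value of `key_field`."""

    def __init__(self, key_field: str):
        self.key_field = key_field

    def transform(self, records: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
        seen = set()
        unique = []
        for record in records:
            key = record.get(self.key_field)
            if key not in seen:
                seen.add(key)
                unique.append(record)
        return unique


sample = [
    {"id": 1, "name": "a"},
    {"id": 2, "name": "b"},
    {"id": 1, "name": "c"},  # duplicate id, dropped
    {"name": "d"},           # missing id, so the key is None
    {"name": "e"},           # second missing id, treated as a duplicate of None
]
deduped = Deduplicate("id").transform(sample)
print([r["name"] for r in deduped])  # ['a', 'b', 'd']
```

Note that records without the key field all share the key None, so only the first of them survives. If that is not the behavior you want, check for a missing key before deduplicating.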

Using CustomFunction

For simple transformations, wrap a plain function in CustomFunction instead of writing a class:

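from datetime import datetime

from pycharter import CustomFunction

def add_metadata(records):
    """Stamp each record with a processing time and source (mutates records in place)."""
    for record in records:
        record["processed_at"] = datetime.now().isoformat()
        record["source"] = "etl"
    return records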

pipeline = (
    Pipeline(extractor)
    | CustomFunction(add_metadata)
    | loader
)
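
The add_metadata function above modifies each record in place. If the input records are reused elsewhere, a copying variant may be safer. The following is a minimal sketch, not part of pycharter: the name add_metadata_copy is hypothetical, and using a single UTC timestamp for the whole batch is a design choice made here.

```python
from datetime import datetime, timezone
from typing import Any, Dict, List


def add_metadata_copy(records: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """Return new records with metadata added; the input list is not modified."""
    # One timestamp for the whole batch, in UTC (an assumption of this sketch).
    processed_at = datetime.now(timezone.utc).isoformat()
    return [
        {**record, "processed_at": processed_at, "source": "etl"}
        for record in records
    ]


original = [{"id": 1}]
result = add_metadata_copy(original)
print(original)             # the input is left untouched
print(result[0]["source"])
```

Since it is a plain function that takes and returns a list of records, it should be usable with CustomFunction in the same way as add_metadata.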

Stateful Transformers

from typing import Any, Dict, List, Optional

class RunningAverage(BaseTransformer):
    """Calculate running average of a numeric field."""

    def __init__(self, field: str, output_field: Optional[str] = None):
        self.field = field
        self.output_field = output_field or f"{field}_avg"
        # State lives on the instance, so it carries over between transform() calls
        self.sum = 0
        self.count = 0

    def transform(self, records: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
        for record in records:
            # Missing or None values are treated as 0
            value = record.get(self.field) or 0
            self.sum += value
            self.count += 1
            record[self.output_field] = self.sum / self.count
        return records
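
Because sum and count are stored on the instance, the average continues across calls to transform rather than restarting for each batch. The sketch below illustrates this with a simplified standalone version of the class (BaseTransformer is omitted here only so the snippet runs without pycharter installed).

```python
from typing import Any, Dict, List, Optional


class RunningAverage:
    """Simplified standalone version of the transformer above."""

    def __init__(self, field: str, output_field: Optional[str] = None):
        self.field = field
        self.output_field = output_field or f"{field}_avg"
        self.sum = 0
        self.count = 0

    def transform(self, records: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
        for record in records:
            value = record.get(self.field, 0)
            self.sum += value
            self.count += 1
            record[self.output_field] = self.sum / self.count
        return records


avg = RunningAverage("price")
first = avg.transform([{"price": 10}, {"price": 20}])
second = avg.transform([{"price": 30}])  # continues from the first batch

print([r["price_avg"] for r in first])  # [10.0, 15.0]
print(second[0]["price_avg"])           # 20.0
```

If each pipeline run should start from zero, create a new instance per run instead of reusing one.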

Async Transformers

class EnrichFromAPI(BaseTransformer):
    """Enrich records with data from an external API."""

    def __init__(self, api_url: str, key_field: str):
        self.api_url = api_url
        self.key_field = key_field

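    async def transform_async(self, records: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
        # Local import: httpx is only needed when this transformer is used
        import httpx

        # One client is shared across all requests; requests are awaited one at a time
        async with httpx.AsyncClient() as client:
            for record in records:
                key = record.get(self.key_field)
                if key is None:
                    # Nothing to look up; leave the record unchanged
                    continue
                response = await client.get(f"{self.api_url}/{key}")
                if response.status_code == 200:
                    record["enrichment"] = response.json()

        return records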
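
The transformer above awaits each request before starting the next. For larger batches, one option is to run lookups concurrently with a cap on how many are in flight. The following sketch shows that pattern with asyncio.gather and asyncio.Semaphore. The names fake_lookup, enrich_concurrently, and max_concurrency are hypothetical, and fake_lookup stands in for the HTTP call so the snippet runs without network access.

```python
import asyncio
from typing import Any, Dict, List, Optional


async def fake_lookup(key: Any) -> Optional[Dict[str, Any]]:
    """Hypothetical stand-in for an HTTP request; returns None for unknown keys."""
    await asyncio.sleep(0)  # yield control, as a real request would
    known = {1: {"tier": "gold"}, 2: {"tier": "silver"}}
    return known.get(key)


async def enrich_concurrently(
    records: List[Dict[str, Any]], key_field: str, max_concurrency: int = 5
) -> List[Dict[str, Any]]:
    # The semaphore limits how many lookups run at the same time.
    semaphore = asyncio.Semaphore(max_concurrency)

    async def enrich_one(record: Dict[str, Any]) -> Dict[str, Any]:
        async with semaphore:
            data = await fake_lookup(record.get(key_field))
        if data is not None:
            record["enrichment"] = data
        return record

    # gather returns results in the same order as the input records.
    return list(await asyncio.gather(*(enrich_one(r) for r in records)))


enriched = asyncio.run(enrich_concurrently([{"id": 1}, {"id": 3}, {"id": 2}], "id"))
print(enriched)
```

In a real transformer, the body of fake_lookup would be replaced by the client.get call, with the shared client created once outside enrich_one.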

Best Practices

  1. Keep transformers focused - One transformation per class
  2. Make them reusable - Use parameters for configuration
  3. Handle edge cases - Check for missing fields, None values
  4. Document behavior - Add docstrings and examples
  5. Test thoroughly - Unit test each transformer
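
As an illustration of points 3 and 5, the sketch below pairs a small transformer with a unit test that covers missing fields, None values, and non-string values. StripWhitespace is a hypothetical example written as a plain class so it runs without pycharter; a real transformer would subclass BaseTransformer as shown earlier.

```python
from typing import Any, Dict, List


class StripWhitespace:
    """Hypothetical transformer: trims one string field, leaving other values untouched."""

    def __init__(self, field: str):
        self.field = field

    def transform(self, records: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
        for record in records:
            value = record.get(self.field)
            if isinstance(value, str):  # skips missing fields, None, and non-strings
                record[self.field] = value.strip()
        return records


def test_strip_whitespace_handles_edge_cases():
    transformer = StripWhitespace("name")
    records = [{"name": "  Ada "}, {"name": None}, {}, {"name": 42}]
    result = transformer.transform(records)
    assert result == [{"name": "Ada"}, {"name": None}, {}, {"name": 42}]


# Runs directly here; a test runner such as pytest would also collect this function.
test_strip_whitespace_handles_edge_cases()
```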

See Also