ETL Transformations

PyCharter supports three levels of transformation complexity that are applied in sequence within a single transform.yaml file. Start with simple operations for most cases, layer in JSONata for complex reshaping, and reach for custom Python functions only when declarative approaches fall short.

Transformation order

Extracted data
    ↓ Simple operations  (rename, convert, defaults, add, select, drop)
    ↓ JSONata            (query-language reshaping / aggregation)
    ↓ Custom function    (arbitrary Python logic)
Loaded data

Each step is optional. You can use any combination.
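
As a rough illustration of the ordering above, the three stages can be pictured as optional callables applied one after another. The names used here (run_stages, rename_stage, flag_stage) are hypothetical and are not part of PyCharter's API; this is a sketch of the sequencing only.

```python
from typing import Any, Callable, Optional

Records = list[dict[str, Any]]
Stage = Callable[[Records], Records]


def run_stages(
    data: Records,
    simple: Optional[Stage] = None,
    jsonata: Optional[Stage] = None,
    custom: Optional[Stage] = None,
) -> Records:
    """Apply whichever stages are configured, always in the same order."""
    for stage in (simple, jsonata, custom):
        if stage is not None:  # each stage is optional
            data = stage(data)
    return data


# Hypothetical stand-ins for the real stages.
def rename_stage(rows: Records) -> Records:
    return [{"user_id": r["userId"]} for r in rows]


def flag_stage(rows: Records) -> Records:
    return [{**r, "flagged": r["user_id"] > 1} for r in rows]


# The JSONata stage is left out; the remaining two still run in order.
result = run_stages(
    [{"userId": 1}, {"userId": 2}], simple=rename_stage, custom=flag_stage
)
print(result)
```

Skipping a stage simply passes the data through unchanged, which is why any combination of the three is valid.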


1 · Simple operations

Declarative field-level operations that cover the vast majority of transformation needs.

Configuration format

# transform.yaml
transform:
  rename:
    oldName: new_name
    camelCase: snake_case
  convert:
    price: float
    quantity: integer
    active: boolean
  defaults:
    status: "pending"
  add:
    full_name: "${first_name} ${last_name}"
    created_at: "now()"
    record_id: "uuid()"
  select:
    - user_id
    - full_name
    - created_at
  drop:
    - internal_debug

rename:
  oldName: new_name
convert:
  price: float
defaults:
  status: "pending"

Available operations

rename — rename fields

transform:
  rename:
    userId: user_id
    firstName: first_name
    lastName: last_name

convert — type coercion

Target type                 Accepted values
string / str                Any value
integer / int               Numeric strings, floats
float / number / numeric    Numeric strings
boolean / bool              "true", "1", "yes" → True
datetime                    ISO 8601 or common date strings
date                        ISO 8601 date strings

transform:
  convert:
    price: float
    quantity: integer
    active: boolean
    created_at: datetime

Conversion errors

If a value cannot be converted, the original value is kept and a warning is logged. The pipeline does not fail.
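
The keep-and-warn behaviour described above could look roughly like the following. This is a minimal sketch, not PyCharter's implementation: the CONVERTERS table, the convert_field helper, and the logger name are all assumptions made for illustration.

```python
import logging
from typing import Any

logger = logging.getLogger("transform_sketch")

# Hypothetical converter table; the real converters may differ.
CONVERTERS = {
    "integer": lambda v: int(float(v)),
    "float": float,
    "string": str,
}


def convert_field(record: dict[str, Any], field: str, target: str) -> dict[str, Any]:
    """Return a copy of record with field coerced, or unchanged on failure."""
    out = dict(record)
    if field not in out:
        return out
    try:
        out[field] = CONVERTERS[target](out[field])
    except (ValueError, TypeError):
        # Keep the original value and warn instead of failing the run.
        logger.warning("could not convert %s=%r to %s", field, out[field], target)
    return out


ok = convert_field({"price": "19.99"}, "price", "float")
bad = convert_field({"price": "n/a"}, "price", "float")
```

Here ok carries the float 19.99, while bad still carries the string "n/a" and a warning is emitted through the logger.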

defaults — fill missing / null fields

transform:
  defaults:
    status: "pending"
    priority: 0
    category: "uncategorized"

add — computed fields

Supported expressions:

Expression              Example                         Result
Field reference         "${field_name}"                 Value of field_name
String concatenation    "${first_name} ${last_name}"    "John Doe"
Timestamp               "now()"                         Current UTC datetime string
UUID                    "uuid()"                        Random UUID string
Literal                 "production"                    The literal string

transform:
  add:
    full_name: "${first_name} ${last_name}"
    ingested_at: "now()"
    record_id: "uuid()"
    environment: "production"
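
To make the expression forms concrete, here is one way such strings might be evaluated. It is a sketch under stated assumptions: the evaluate helper is hypothetical, and both the choice to return a lone field reference with its original type and the choice to substitute an empty string for a missing field are guesses rather than documented PyCharter behaviour.

```python
import re
import uuid
from datetime import datetime, timezone
from typing import Any

_FIELD_REF = re.compile(r"\$\{(\w+)\}")


def evaluate(expr: str, record: dict[str, Any]) -> Any:
    """Evaluate one add-expression against a record (illustrative only)."""
    if expr == "now()":
        return datetime.now(timezone.utc).isoformat()
    if expr == "uuid()":
        return str(uuid.uuid4())
    whole = _FIELD_REF.fullmatch(expr)
    if whole:
        # A lone "${field}" yields the field's value with its type intact.
        return record.get(whole.group(1))
    # Otherwise substitute every reference; a plain literal passes through.
    return _FIELD_REF.sub(lambda m: str(record.get(m.group(1), "")), expr)


record = {"first_name": "John", "last_name": "Doe", "age": 41}
full_name = evaluate("${first_name} ${last_name}", record)
age = evaluate("${age}", record)
env = evaluate("production", record)
```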

select — keep only specified fields

transform:
  select:
    - user_id
    - email
    - created_at

All other fields are removed.

drop — remove specified fields

transform:
  drop:
    - internal_id
    - debug_payload
    - temp_field

All other fields are kept.

Operation execution order

Within the transform: block, operations run in this fixed order regardless of their position in the YAML:

  1. rename
  2. convert
  3. defaults
  4. add
  5. select
  6. drop
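
The fixed ordering can be sketched by iterating over a hard-coded list of operations rather than over the configuration keys. The helper names below are hypothetical, and convert and add are omitted to keep the example short.

```python
from typing import Any, Callable

Record = dict[str, Any]


def op_rename(rec: Record, cfg: dict[str, str]) -> Record:
    return {cfg.get(k, k): v for k, v in rec.items()}


def op_defaults(rec: Record, cfg: dict[str, Any]) -> Record:
    out = dict(rec)
    for key, value in cfg.items():
        if out.get(key) is None:  # missing or null
            out[key] = value
    return out


def op_select(rec: Record, cfg: list[str]) -> Record:
    return {k: rec[k] for k in cfg if k in rec}


def op_drop(rec: Record, cfg: list[str]) -> Record:
    return {k: v for k, v in rec.items() if k not in cfg}


# Fixed order; convert and add would sit between rename and select.
ORDERED_OPS: list[tuple[str, Callable[[Record, Any], Record]]] = [
    ("rename", op_rename),
    ("defaults", op_defaults),
    ("select", op_select),
    ("drop", op_drop),
]


def apply_simple(rec: Record, config: dict[str, Any]) -> Record:
    for name, op in ORDERED_OPS:  # follows ORDERED_OPS, not the config order
        if name in config:
            rec = op(rec, config[name])
    return rec


# "select" is listed first, yet rename and defaults still run before it.
config = {
    "select": ["user_id", "status"],
    "defaults": {"status": "pending"},
    "rename": {"userId": "user_id"},
}
out = apply_simple({"userId": 7, "debug": True}, config)
```

Because select runs after rename, the select list must use the new field names (user_id here, not userId).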

2 · JSONata

JSONata is a query and transformation language for JSON data. Use it when you need aggregations, reshaping across nested structures, or conditional field derivation.

Configuration

jsonata:
  expression: |
    $.{
      "ticker": symbol,
      "price_change": price - previousClose,
      "change_pct": $round(((price - previousClose) / previousClose) * 100, 2)
    }
  mode: "record"   # "record" (default) or "batch"

Mode      Description
record    Expression applied to each record independently
batch     Expression applied to the entire dataset (array)
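
The difference between the two modes can be shown without a JSONata engine by letting a plain Python callable stand in for the expression. The apply_expression helper is hypothetical and only illustrates what the expression receives in each mode.

```python
from typing import Any, Callable

Record = dict[str, Any]


def apply_expression(
    expression: Callable[[Any], Any], data: list[Record], mode: str = "record"
) -> Any:
    """Apply expression per record, or once to the whole list in batch mode."""
    if mode == "record":
        return [expression(rec) for rec in data]
    if mode == "batch":
        return expression(data)
    raise ValueError(f"unknown mode: {mode!r}")


quotes = [
    {"symbol": "AAA", "price": 12.0, "previousClose": 10.0},
    {"symbol": "BBB", "price": 9.0, "previousClose": 10.0},
]

# Record mode: one output per input record.
per_record = apply_expression(
    lambda r: {"ticker": r["symbol"], "price_change": r["price"] - r["previousClose"]},
    quotes,
    mode="record",
)

# Batch mode: the callable sees the whole list and returns a single summary.
summary = apply_expression(
    lambda rows: {"total": len(rows), "max_price": max(r["price"] for r in rows)},
    quotes,
    mode="batch",
)
```

Aggregations such as counts or averages only make sense in batch mode, since a record-mode expression never sees more than one record at a time.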

Examples

Rename and calculate (record mode):

jsonata:
  expression: |
    $.{
      "ticker": symbol,
      "price_change": price - previousClose,
      "change_pct": $round(((price - previousClose) / previousClose) * 100, 2)
    }
  mode: "record"

Aggregation (batch mode):

jsonata:
  expression: |
    {
      "total": $count($),
      "avg_price": $average($.price),
      "max_price": $max($.price),
      "min_price": $min($.price)
    }
  mode: "batch"

Filtering (record mode):

jsonata:
  expression: "$[price > 100]"
  mode: "record"

Restructure nested objects (record mode):

jsonata:
  expression: |
    $.{
      "user": { "id": userId, "name": userName },
      "ts": { "created": created_at, "updated": updated_at }
    }
  mode: "record"

JSONata is applied after simple operations and before the custom function.


3 · Custom Python function

For business logic that cannot be expressed declaratively, load an arbitrary Python function at runtime.

Configuration

custom_function:
  module: "myproject.transforms"
  function: "process_records"
  mode: "batch"   # "batch" (default) or "record"
  kwargs:
    threshold: 0.05
    method: "zscore"

Alternatively, use a dotted callable path:

custom_function:
  callable: "myproject.transforms.process_records"
  mode: "batch"
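
A dotted path of this kind is conventionally resolved by importing everything before the last dot as a module and looking up the final segment as an attribute. The sketch below shows that general technique with importlib; resolve_callable is a hypothetical helper, and PyCharter's own loader may work differently.

```python
import importlib
from typing import Any, Callable


def resolve_callable(path: str) -> Callable[..., Any]:
    """Split 'package.module.attr' into module and attribute, then import it."""
    module_name, _, attr = path.rpartition(".")
    if not module_name:
        raise ValueError(f"expected a dotted path, got {path!r}")
    module = importlib.import_module(module_name)
    return getattr(module, attr)


# Demonstrated with a standard-library function so the snippet runs anywhere.
dumps = resolve_callable("json.dumps")
print(dumps({"ok": True}))
```

With this approach the module must be importable from the process running the pipeline, for example by installing the project or adding it to PYTHONPATH.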

Function signatures

from typing import Any


def enrich(record: dict[str, Any]) -> dict[str, Any]:
    """Placeholder for your own per-record logic."""
    return record


# Batch mode: the function receives the full dataset.
def process_records(
    data: list[dict[str, Any]], **kwargs: Any
) -> list[dict[str, Any]]:
    """Receive the full dataset, return the transformed dataset."""
    return [enrich(r) for r in data]


# Record mode: the function receives one record at a time.
def process_record(
    record: dict[str, Any], **kwargs: Any
) -> dict[str, Any] | None:
    """Receive one record, return it (or None to skip it)."""
    if record.get("active"):
        return record
    return None   # skipped
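
To show what returning None means in record mode, the following sketch drives a record-level function over a dataset and drops the skipped records. The apply_record_function driver and the keep_active function are hypothetical; they only mirror the contract described by the signatures above.

```python
from typing import Any, Callable, Optional

Record = dict[str, Any]


def apply_record_function(
    func: Callable[..., Optional[Record]], data: list[Record], **kwargs: Any
) -> list[Record]:
    """Call func on each record and drop records for which it returns None."""
    results = []
    for record in data:
        out = func(record, **kwargs)
        if out is not None:
            results.append(out)
    return results


def keep_active(record: Record, **kwargs: Any) -> Optional[Record]:
    return record if record.get("active") else None


rows = [{"id": 1, "active": True}, {"id": 2, "active": False}, {"id": 3}]
kept = apply_record_function(keep_active, rows)
```

Only the first record survives: the second is explicitly inactive and the third has no active field at all.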

Class-based functions

If the target is a class, PyCharter calls the first available method in this order: optimize(), run(), __call__().

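from typing import Any


class PortfolioOptimizer:
    def optimize(
        self, data: list[dict[str, Any]], **kwargs: Any
    ) -> list[dict[str, Any]]:
        # Optimization logic goes here; this placeholder returns the data unchanged.
        optimized_data = data
        return optimized_data
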
custom_function:
  module: "myproject.transforms"
  function: "PortfolioOptimizer"
  mode: "batch"
  kwargs:
    method: "min_volatility"
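
The method lookup order for class targets can be sketched as below. This is an illustration of the documented order only: resolve_entry_point is a hypothetical helper, and the assumption that the class is instantiated without arguments is not confirmed by the text above.

```python
from typing import Any, Callable

METHOD_ORDER = ("optimize", "run", "__call__")


def resolve_entry_point(target: Any) -> Callable[..., Any]:
    """Return a plain function as-is, or the first matching method of a class."""
    if not isinstance(target, type):
        return target
    instance = target()  # assumes a no-argument constructor
    for name in METHOD_ORDER:
        method = getattr(instance, name, None)
        if callable(method):
            return method
    raise TypeError(f"{target.__name__} defines none of {METHOD_ORDER}")


class Runner:
    def run(self, data: list[dict[str, Any]], **kwargs: Any) -> list[dict[str, Any]]:
        return [{**row, "method": kwargs.get("method")} for row in data]

    def __call__(self, data: list[dict[str, Any]], **kwargs: Any) -> list[dict[str, Any]]:
        return data


# Runner has no optimize(), so run() is chosen ahead of __call__().
entry = resolve_entry_point(Runner)
result = entry([{"id": 1}], method="min_volatility")
```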

Combining all three

# transform.yaml

# Step 1 — simple field ops
transform:
  rename:
    userId: user_id
  convert:
    price: float
  defaults:
    status: "pending"
  add:
    ingested_at: "now()"

# Step 2 — JSONata reshaping (applied after step 1)
jsonata:
  expression: |
    $.{
      "user_id": user_id,
      "price": price,
      "value": price * quantity,
      "ingested_at": ingested_at
    }
  mode: "record"

# Step 3 — custom Python logic (applied after step 2)
custom_function:
  module: "myproject.transforms"
  function: "flag_anomalies"
  mode: "batch"
  kwargs:
    zscore_threshold: 3.0
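
The flag_anomalies function referenced in step 3 is not shown in this guide. A minimal sketch of what it might contain, assuming it scores the value field produced in step 2 and writes a hypothetical is_anomaly flag, could be:

```python
from statistics import mean, pstdev
from typing import Any


def flag_anomalies(
    data: list[dict[str, Any]], zscore_threshold: float = 3.0, **kwargs: Any
) -> list[dict[str, Any]]:
    """Add an is_anomaly flag based on the z-score of each record's value."""
    values = [row["value"] for row in data]
    if not values:
        return []
    mu = mean(values)
    sigma = pstdev(values)
    flagged = []
    for row in data:
        # With zero spread no value can be an outlier.
        z = 0.0 if sigma == 0 else abs(row["value"] - mu) / sigma
        flagged.append({**row, "is_anomaly": z > zscore_threshold})
    return flagged


rows = [{"user_id": i, "value": 10.0} for i in range(20)]
rows.append({"user_id": 99, "value": 500.0})
result = flag_anomalies(rows, zscore_threshold=3.0)
```

The function runs in batch mode because a z-score needs the mean and standard deviation of the whole dataset, which a record-mode function never sees.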

Error handling summary

Layer                  Failure behaviour
Simple convert         Original value kept, warning logged
Simple rename / add    Missing source field logged, step skipped
JSONata                Pipeline run fails with expression error detail
Custom function        Pipeline run fails with exception traceback

Best practices

  • Start with simple operations — they are the fastest and easiest to read.
  • Use JSONata for reshaping — aggregations, conditional logic, and nested-to-flat conversions.
  • Use custom functions sparingly — reserve them for business logic that genuinely cannot be expressed declaratively.
  • Test each layer independently — use the pipeline testing harness to feed fixture data through each step.
  • Comment complex JSONata — add a YAML comment explaining what the expression does.

See also