ETL Transformations¶
PyCharter supports three levels of transformation complexity that are applied in sequence within a single transform.yaml file. Start with simple operations for most cases, layer in JSONata for complex reshaping, and reach for custom Python functions only when declarative approaches fall short.
Transformation order¶
Extracted data
↓ Simple operations (rename, convert, defaults, add, select, drop)
↓ JSONata (query-language reshaping / aggregation)
↓ Custom function (arbitrary Python logic)
Loaded data
Each step is optional. You can use any combination.
1 · Simple operations¶
Declarative field-level operations that cover the vast majority of transformation needs.
Configuration format¶
# transform.yaml
transform:
rename:
oldName: new_name
camelCase: snake_case
convert:
price: float
quantity: integer
active: boolean
defaults:
status: "pending"
add:
full_name: "${first_name} ${last_name}"
created_at: "now()"
record_id: "uuid()"
select:
- user_id
- full_name
- created_at
drop:
- internal_debug
Available operations¶
rename — rename fields¶
convert — type coercion¶
| Target type | Accepted values |
|---|---|
string / str |
Any value |
integer / int |
Numeric strings, floats |
float / number / numeric |
Numeric strings |
boolean / bool |
"true", "1", "yes" → True |
datetime |
ISO 8601 or common date strings |
date |
ISO 8601 date strings |
Conversion errors
If a value cannot be converted, the original value is kept and a warning is logged. The pipeline does not fail.
defaults — fill missing / null fields¶
add — computed fields¶
Supported expressions:
| Expression | Example | Result |
|---|---|---|
| Field reference | "${field_name}" |
Value of field_name |
| String concatenation | "${first_name} ${last_name}" |
"John Doe" |
| Timestamp | "now()" |
Current UTC datetime string |
| UUID | "uuid()" |
Random UUID string |
| Literal | "production" |
The literal string |
transform:
add:
full_name: "${first_name} ${last_name}"
ingested_at: "now()"
record_id: "uuid()"
environment: "production"
select — keep only specified fields¶
All other fields are removed.
drop — remove specified fields¶
All other fields are kept.
Operation execution order¶
Within the transform: block, operations run in this fixed order regardless of their position in the YAML:
renameconvertdefaultsaddselectdrop
2 · JSONata¶
JSONata is a powerful query-and-transformation language for JSON data. Use it when you need aggregations, reshaping across nested structures, or conditional field derivation.
Configuration¶
jsonata:
expression: |
$.{
"ticker": symbol,
"price_change": price - previousClose,
"change_pct": $round(((price - previousClose) / previousClose) * 100, 2)
}
mode: "record" # "record" (default) or "batch"
| Mode | Description |
|---|---|
record |
Expression applied to each record independently |
batch |
Expression applied to the entire dataset (array) |
Examples¶
Rename and calculate (record mode):
jsonata:
expression: |
$.{
"ticker": symbol,
"price_change": price - previousClose,
"change_pct": $round(((price - previousClose) / previousClose) * 100, 2)
}
mode: "record"
Aggregation (batch mode):
jsonata:
expression: |
{
"total": $count($),
"avg_price": $average($.price),
"max_price": $max($.price),
"min_price": $min($.price)
}
mode: "batch"
Filtering (record mode):
Restructure nested objects (record mode):
jsonata:
expression: |
$.{
"user": { "id": userId, "name": userName },
"ts": { "created": created_at, "updated": updated_at }
}
mode: "record"
JSONata resources
JSONata is applied after simple operations and before the custom function.
3 · Custom Python function¶
For business logic that cannot be expressed declaratively: load an arbitrary Python function at runtime.
Configuration¶
custom_function:
module: "myproject.transforms"
function: "process_records"
mode: "batch" # "batch" (default) or "record"
kwargs:
threshold: 0.05
method: "zscore"
Alternatively, use a dotted callable path:
Function signatures¶
Class-based functions¶
If the target is a class, PyCharter calls the first available method in this order: optimize(), run(), __call__().
class PortfolioOptimizer:
def optimize(
self, data: list[dict[str, Any]], **kwargs: Any
) -> list[dict[str, Any]]:
# Optimization logic
return optimized_data
custom_function:
module: "myproject.transforms"
function: "PortfolioOptimizer"
mode: "batch"
kwargs:
method: "min_volatility"
Combining all three¶
# transform.yaml
# Step 1 — simple field ops
transform:
rename:
userId: user_id
convert:
price: float
defaults:
status: "pending"
add:
ingested_at: "now()"
# Step 2 — JSONata reshaping (applied after step 1)
jsonata:
expression: |
$.{
"user_id": user_id,
"price": price,
"value": price * quantity,
"ingested_at": ingested_at
}
mode: "record"
# Step 3 — custom Python logic (applied after step 2)
custom_function:
module: "myproject.transforms"
function: "flag_anomalies"
mode: "batch"
kwargs:
zscore_threshold: 3.0
Error handling summary¶
| Layer | Failure behaviour |
|---|---|
Simple convert |
Original value kept, warning logged |
Simple rename / add |
Missing source field logged, step skipped |
| JSONata | Pipeline run fails with expression error detail |
| Custom function | Pipeline run fails with exception traceback |
Best practices¶
- Start with simple operations — they are the fastest and easiest to read.
- Use JSONata for reshaping — aggregations, conditional logic, and nested-to-flat conversions.
- Use custom functions sparingly — reserve them for business logic that genuinely cannot be expressed declaratively.
- Test each layer independently — use the pipeline testing harness to feed fixture data through each step.
- Comment complex JSONata — add a YAML comment explaining what the expression does.
See also¶
- Building ETL Pipelines — end-to-end tutorial
- Testing Pipelines — mock data, assertions, and the test harness
- Custom Transformers — implementing the transformer protocol directly
- Streaming and Messaging — transform while streaming