Loaders
Loaders write transformed data to destinations.
Overview
PostgresLoader
Load data to PostgreSQL databases.
PostgresLoader
PostgresLoader(
connection_string: str,
table: str,
schema: str = "public",
write_method: str = "upsert",
primary_key: str | list[str] | None = None,
batch_size: int = 1000,
ssh_tunnel: dict[str, Any] | None = None,
update_columns: list[str] | None = None,
)
Bases: BaseLoader
Loader for PostgreSQL databases.
Supports:
- Insert, upsert, replace, update, delete, and truncate_and_load write methods
- Bulk operations for efficiency
- SSH tunneling
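"Bulk operations" together with the `batch_size` parameter suggest that rows are written in chunks rather than one statement per row. The sketch below shows one way to split incoming records into chunks of at most `batch_size`; `iter_batches` is a hypothetical helper, not part of pycharter, and the loader's actual batching code may differ.

```python
from __future__ import annotations

from itertools import islice
from typing import Any, Iterable, Iterator


def iter_batches(
    records: Iterable[dict[str, Any]], batch_size: int = 1000
) -> Iterator[list[dict[str, Any]]]:
    """Yield consecutive lists of at most batch_size records (hypothetical helper)."""
    if batch_size < 1:
        raise ValueError("batch_size must be at least 1")
    it = iter(records)
    # islice takes up to batch_size items per pass; an empty chunk ends the loop
    while chunk := list(islice(it, batch_size)):
        yield chunk


rows = [{"id": i} for i in range(2500)]
sizes = [len(batch) for batch in iter_batches(rows, batch_size=1000)]
print(sizes)  # [1000, 1000, 500]
```

Because the helper consumes any iterable lazily, the full dataset never has to be copied just to be sliced.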
Example
loader = PostgresLoader(
    connection_string="postgresql://user:pass@localhost/db",
    table="users",
    write_method="upsert",
    primary_key="id",
)
result = await loader.load(data)
from_config (classmethod)
Create loader from configuration dict.
Load Modes
| Mode | Description |
|---|---|
| insert | Insert all records (fails on duplicate keys) |
| upsert | Insert new records; update existing ones on primary-key conflict |
| replace | Truncate the table, then insert all records |
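In PostgreSQL these three modes map naturally onto plain `INSERT`, `INSERT ... ON CONFLICT ... DO UPDATE`, and `TRUNCATE` followed by `INSERT`. The sketch below builds such statements to make the difference concrete. `build_load_sql` and `quote_ident` are hypothetical helpers, the `%s` placeholders assume a psycopg-style driver, and the way `update_columns` narrows the `SET` list is an assumption; pycharter's generated SQL may differ. Unlike `PostgresLoader`, the sketch only accepts `primary_key` as a list.

```python
from __future__ import annotations


def quote_ident(name: str) -> str:
    # Double-quote an identifier, doubling embedded quotes as PostgreSQL requires
    return '"' + name.replace('"', '""') + '"'


def build_load_sql(
    mode: str,
    table: str,
    columns: list[str],
    schema: str = "public",
    primary_key: list[str] | None = None,
    update_columns: list[str] | None = None,
) -> list[str]:
    """Return the statements one batch might need for each mode (hypothetical)."""
    target = f"{quote_ident(schema)}.{quote_ident(table)}"
    cols = ", ".join(quote_ident(c) for c in columns)
    placeholders = ", ".join(["%s"] * len(columns))  # psycopg-style parameters
    insert = f"INSERT INTO {target} ({cols}) VALUES ({placeholders})"

    if mode == "insert":
        # A duplicate key raises a unique-violation error
        return [insert]
    if mode == "replace":
        # Empty the table first, then insert the new rows
        return [f"TRUNCATE TABLE {target}", insert]
    if mode == "upsert":
        if not primary_key:
            raise ValueError("upsert requires primary_key")
        keys = ", ".join(quote_ident(k) for k in primary_key)
        # Assumption: update_columns narrows the SET list; default is every non-key column
        to_update = update_columns or [c for c in columns if c not in primary_key]
        if not to_update:
            # Nothing to update: keep the existing row
            return [f"{insert} ON CONFLICT ({keys}) DO NOTHING"]
        assignments = ", ".join(
            f"{quote_ident(c)} = EXCLUDED.{quote_ident(c)}" for c in to_update
        )
        return [f"{insert} ON CONFLICT ({keys}) DO UPDATE SET {assignments}"]
    raise ValueError(f"unsupported mode: {mode!r}")


for stmt in build_load_sql("upsert", "users", ["id", "name"], primary_key=["id"]):
    print(stmt)
# INSERT INTO "public"."users" ("id", "name") VALUES (%s, %s) ON CONFLICT ("id") DO UPDATE SET "name" = EXCLUDED."name"
```

`ON CONFLICT` needs a unique index or constraint on the listed columns, which is why upsert cannot work without a primary key.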
Examples
from pycharter import PostgresLoader
# Basic insert (write_method defaults to "upsert", so set it explicitly)
loader = PostgresLoader(
    connection_string="postgresql://user:pass@localhost/db",
    table="users",
    write_method="insert",
)
# Upsert (update on conflict)
loader = PostgresLoader(
    connection_string="postgresql://user:pass@localhost/db",
    table="users",
    schema="public",
    write_method="upsert",
    primary_key=["id"],
)
# Replace (truncate and insert)
loader = PostgresLoader(
    connection_string="postgresql://user:pass@localhost/db",
    table="users",
    write_method="replace",
)
# With SSH tunnel
loader = PostgresLoader(
connection_string="postgresql://user:pass@localhost/db",
table="users",
ssh_tunnel={
"host": "bastion.example.com",
"port": 22,
"username": "deploy",
"key_file": "~/.ssh/id_rsa"
}
)
DatabaseLoader
Generic database loader supporting multiple databases.
from pycharter import DatabaseLoader
# MySQL
loader = DatabaseLoader(
connection_string="mysql://user:pass@localhost/db",
table="users"
)
# SQLite
loader = DatabaseLoader(
connection_string="sqlite:///data.db",
table="users"
)
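A generic loader has to decide which database it is talking to, and the connection URL's scheme is the obvious signal. This page does not document how `DatabaseLoader` makes that choice, so the following is only a sketch of scheme-based detection: `detect_backend` and `KNOWN_DIALECTS` are hypothetical, and only the schemes used in the examples on this page are accepted.

```python
from urllib.parse import urlparse

# Hypothetical set of dialects; limited to the schemes used in the examples above
KNOWN_DIALECTS = {"postgresql", "mysql", "sqlite"}


def detect_backend(connection_string: str) -> str:
    """Return the database dialect named by a connection URL's scheme."""
    scheme = urlparse(connection_string).scheme.lower()
    # SQLAlchemy-style URLs can append a driver, e.g. "mysql+pymysql"
    dialect = scheme.split("+", 1)[0]
    if dialect not in KNOWN_DIALECTS:
        raise ValueError(f"unsupported database URL scheme: {scheme!r}")
    return dialect


print(detect_backend("mysql://user:pass@localhost/db"))  # mysql
print(detect_backend("sqlite:///data.db"))  # sqlite
```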
FileLoader
Load data to local files.
FileLoader
Bases: BaseLoader
Loader for local files.
Supports JSON, CSV, Parquet, and JSONL formats.
Example
loader = FileLoader(path="output/data.json", file_format="json")
result = await loader.load(data)
from_config (classmethod)
Create loader from configuration dict.
Supported Formats
| Format | Extension | Notes |
|---|---|---|
| JSON | .json | Pretty-printed array |
| JSON Lines | .jsonl | One JSON object per line |
| CSV | .csv | With header row |
| Parquet | .parquet | Columnar, compressed |
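To show what the three text formats look like on disk, here is a standard-library sketch that renders the same records as JSON, JSON Lines, and CSV. `serialize` is a hypothetical helper, not FileLoader's implementation; it assumes every record has the same keys for CSV. Parquet is binary and needs a third-party library such as pyarrow, so it is left out.

```python
import csv
import io
import json
from typing import Any


def serialize(records: list[dict[str, Any]], file_format: str) -> str:
    """Render records as JSON, JSON Lines, or CSV text (illustrative only)."""
    if file_format == "json":
        # A single pretty-printed JSON array
        return json.dumps(records, indent=2)
    if file_format == "jsonl":
        # One compact JSON object per line, each newline-terminated
        return "".join(json.dumps(record) + "\n" for record in records)
    if file_format == "csv":
        if not records:
            return ""
        buf = io.StringIO()
        # Header row from the first record's keys (assumes uniform records)
        writer = csv.DictWriter(buf, fieldnames=list(records[0]))
        writer.writeheader()
        writer.writerows(records)
        return buf.getvalue()
    raise ValueError(f"unsupported format: {file_format!r}")


rows = [{"id": 1, "name": "Ada"}, {"id": 2, "name": "Linus"}]
print(serialize(rows, "jsonl"), end="")
# {"id": 1, "name": "Ada"}
# {"id": 2, "name": "Linus"}
```

Note that Python's csv module terminates rows with `\r\n` by default.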
Examples
from pycharter import FileLoader
# JSON output
loader = FileLoader(path="output/users.json", file_format="json")
# JSON Lines (streaming-friendly)
loader = FileLoader(path="output/users.jsonl", file_format="jsonl")
# CSV output
loader = FileLoader(path="output/users.csv", file_format="csv")
# Parquet (compressed, columnar)
loader = FileLoader(path="output/users.parquet", file_format="parquet")
# Append mode
loader = FileLoader(
path="output/users.jsonl",
file_format="jsonl",
write_mode="append"
)
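JSON Lines pairs well with append mode because each record is a self-contained line: new records can be added to the end without rewriting the file, whereas a JSON array would have to be reopened and its closing bracket moved. The sketch below illustrates that property; `append_jsonl` is a hypothetical helper and is not how FileLoader's `write_mode="append"` is necessarily implemented.

```python
from __future__ import annotations

import json
import tempfile
from pathlib import Path
from typing import Any


def append_jsonl(path: str | Path, records: list[dict[str, Any]]) -> int:
    """Append records to a JSON Lines file, creating parent dirs as needed (sketch)."""
    p = Path(path)
    p.parent.mkdir(parents=True, exist_ok=True)
    # Mode "a" writes at the end of the file; existing lines are never rewritten
    with p.open("a", encoding="utf-8") as f:
        for record in records:
            f.write(json.dumps(record) + "\n")
    return len(records)


with tempfile.TemporaryDirectory() as tmp:
    target = Path(tmp) / "output" / "users.jsonl"
    append_jsonl(target, [{"id": 1}])
    append_jsonl(target, [{"id": 2}])
    lines = target.read_text(encoding="utf-8").splitlines()

print([json.loads(line) for line in lines])  # [{'id': 1}, {'id': 2}]
```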
CloudStorageLoader
Load data to cloud storage (S3, GCS, Azure Blob).
CloudStorageLoader
CloudStorageLoader(
provider: str,
bucket: str,
path: str,
credentials: dict[str, Any] | None = None,
file_format: str = "json",
)
Bases: BaseLoader
Loader for cloud storage (S3, GCS, Azure).
Supports JSON, CSV, Parquet, and JSONL formats.
Example
loader = CloudStorageLoader(
    provider="s3",
    bucket="my-bucket",
    path="output/data.json",
    file_format="json",
)
result = await loader.load(data)
from_config (classmethod)
Create loader from configuration dict.
Examples
from pycharter import CloudStorageLoader
# AWS S3
loader = CloudStorageLoader(
provider="s3",
bucket="my-bucket",
path="output/users.json",
file_format="json",
credentials={
"aws_access_key_id": "${AWS_KEY}",
"aws_secret_access_key": "${AWS_SECRET}"
}
)
# Google Cloud Storage
loader = CloudStorageLoader(
provider="gcs",
bucket="my-bucket",
path="output/users.json",
file_format="json"
)
# Azure Blob Storage
loader = CloudStorageLoader(
provider="azure",
bucket="my-container",  # Azure container name (the signature has no container parameter)
path="output/users.json",
file_format="json",
credentials={
"connection_string": "${AZURE_STORAGE_CONNECTION_STRING}"
}
)
# Parquet to S3
loader = CloudStorageLoader(
provider="s3",
bucket="data-lake",
path="users/date=2024-01-01/users.parquet",
file_format="parquet"
)
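The S3 path above follows the Hive-style `key=value` partition convention, which query engines such as Spark and Athena can use to skip partitions that a query does not need. A small helper like the hypothetical `partitioned_path` below can build such paths; its result is simply passed as `path`.

```python
from datetime import date
from typing import Any


def partitioned_path(prefix: str, partitions: dict[str, Any], filename: str) -> str:
    """Build a Hive-style "key=value" object path (hypothetical helper)."""
    # One "key=value" segment per partition, in dict insertion order
    segments = [f"{key}={value}" for key, value in partitions.items()]
    parts = [prefix.strip("/"), *segments, filename]
    # Drop empty parts so an empty prefix does not produce a leading "/"
    return "/".join(part for part in parts if part)


path = partitioned_path("users", {"date": date(2024, 1, 1)}, "users.parquet")
print(path)  # users/date=2024-01-01/users.parquet
```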
LoaderFactory
Create loaders from configuration:
from pycharter.etl_generator.loaders import LoaderFactory
# Create from config
loader = LoaderFactory.create({
"type": "postgres",
"connection_string": "postgresql://...",
"table": "users"
})
# Register a custom loader class (e.g. BigQueryLoader, defined under Custom Loaders)
LoaderFactory.register("bigquery", BigQueryLoader)
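The factory pattern itself is simple: a registry maps the config's `type` string to a class, and `create` instantiates it. The stand-in below, `MiniLoaderFactory`, is a hypothetical illustration that passes the remaining keys as constructor keyword arguments; the real `LoaderFactory` may instead route the config through each loader's `from_config`.

```python
from __future__ import annotations

from typing import Any, Callable


class MiniLoaderFactory:
    """Registry keyed by a config's "type" field (illustrative stand-in)."""

    _registry: dict[str, Callable[..., Any]] = {}

    @classmethod
    def register(cls, name: str, loader_cls: Callable[..., Any]) -> None:
        cls._registry[name] = loader_cls

    @classmethod
    def create(cls, config: dict[str, Any]) -> Any:
        options = dict(config)  # copy so the caller's dict is left untouched
        loader_type = options.pop("type")
        try:
            loader_cls = cls._registry[loader_type]
        except KeyError:
            raise ValueError(f"unknown loader type: {loader_type!r}") from None
        # Every remaining key is passed as a constructor keyword argument
        return loader_cls(**options)


class EchoLoader:
    # Toy loader used only to exercise the registry
    def __init__(self, table: str) -> None:
        self.table = table


MiniLoaderFactory.register("echo", EchoLoader)
loader = MiniLoaderFactory.create({"type": "echo", "table": "users"})
print(type(loader).__name__, loader.table)  # EchoLoader users
```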
LoadResult
LoadResult (dataclass)
LoadResult(
success: bool = True,
rows_loaded: int = 0,
rows_failed: int = 0,
error: str | None = None,
duration_seconds: float | None = None,
inserted: int | None = None,
updated: int | None = None,
)
Result from a load operation.
rows_loaded is the total number of affected rows (inserted + updated when both are provided). inserted and updated are optional; loaders that support upsert (e.g. Postgres) set them. When they are omitted, consumers can treat rows_loaded as both the total and the inserted count, and treat updated as 0.
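A consumer that reports inserted and updated counts can apply that fallback in one place. In this sketch `LoadResult` is a local mirror of the documented fields (so it runs without pycharter), `insert_update_counts` is a hypothetical helper, and the handling of the case where only one of the two counts is set is an assumption.

```python
from __future__ import annotations

from dataclasses import dataclass


@dataclass
class LoadResult:
    # Local mirror of the documented fields so this sketch runs without pycharter
    success: bool = True
    rows_loaded: int = 0
    rows_failed: int = 0
    error: str | None = None
    duration_seconds: float | None = None
    inserted: int | None = None
    updated: int | None = None


def insert_update_counts(result: LoadResult) -> tuple[int, int]:
    """Return (inserted, updated), falling back to rows_loaded when both are unset."""
    if result.inserted is None and result.updated is None:
        # Neither count reported: treat every loaded row as inserted
        return result.rows_loaded, 0
    # Assumption: if only one count is reported, the other is taken as 0
    return result.inserted or 0, result.updated or 0


print(insert_update_counts(LoadResult(rows_loaded=5)))  # (5, 0)
print(insert_update_counts(LoadResult(rows_loaded=5, inserted=3, updated=2)))  # (3, 2)
```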
Config-Driven Loaders
Define loaders in YAML:
# Postgres loader
type: postgres
connection_string: "${DATABASE_URL}"
table: users
schema: public
mode: upsert
conflict_columns:
  - id
---
# Cloud storage loader
type: cloud_storage
storage:
  provider: s3
  bucket: my-bucket
  path: output/users.json
  format: json
  credentials:
    aws_access_key_id: "${AWS_KEY}"
    aws_secret_access_key: "${AWS_SECRET}"
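The `${VAR}` placeholders keep secrets out of the config file. How pycharter resolves them is not described here; as a sketch, Python's `string.Template` already understands the `${VAR}` syntax, and `safe_substitute` leaves unknown variables untouched instead of raising. `expand_env` is a hypothetical helper that walks an already-parsed config (for example the dict produced by a YAML parser).

```python
from __future__ import annotations

import os
from string import Template
from typing import Any, Mapping


def expand_env(value: Any, env: Mapping[str, str] | None = None) -> Any:
    """Recursively replace ${VAR} in strings; unknown variables are left as-is."""
    env = os.environ if env is None else env
    if isinstance(value, str):
        # safe_substitute leaves unresolved placeholders in place instead of raising
        return Template(value).safe_substitute(env)
    if isinstance(value, dict):
        return {key: expand_env(item, env) for key, item in value.items()}
    if isinstance(value, list):
        return [expand_env(item, env) for item in value]
    return value


config = {"type": "postgres", "connection_string": "${DATABASE_URL}", "conflict_columns": ["id"]}
resolved = expand_env(config, {"DATABASE_URL": "postgresql://user:pass@localhost/db"})
print(resolved["connection_string"])  # postgresql://user:pass@localhost/db
```

One caveat of this approach: `Template` also treats a bare `$VAR` as a placeholder and `$$` as an escaped `$`, which matters if a literal value (such as a password) contains `$`.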
Custom Loaders
Create custom loaders by extending BaseLoader:
import asyncio
from typing import Any, Dict, List

from google.cloud import bigquery  # pip install google-cloud-bigquery
from pycharter.etl_generator.loaders import BaseLoader, LoaderFactory
from pycharter.etl_generator.result import LoadResult


class BigQueryLoader(BaseLoader):
    def __init__(self, project: str, dataset: str, table: str):
        self.project = project
        self.dataset = dataset
        self.table = table

    def _load_blocking(self, data: List[Dict[str, Any]]) -> None:
        # The BigQuery client is synchronous
        client = bigquery.Client(project=self.project)
        table_ref = f"{self.dataset}.{self.table}"
        job = client.load_table_from_json(data, table_ref)
        job.result()  # Wait for completion; raises if the load job failed

    async def load(self, data: List[Dict[str, Any]], **params) -> LoadResult:
        try:
            # Run the blocking client calls in a worker thread so the
            # event loop is not stalled while the job runs
            await asyncio.to_thread(self._load_blocking, data)
            return LoadResult(success=True, rows_loaded=len(data))
        except Exception as e:
            return LoadResult(success=False, error=str(e))


# Register
LoaderFactory.register("bigquery", BigQueryLoader)
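Custom loaders are easier to test when the `async load(data, **params) -> LoadResult` contract can be exercised without an external service. The sketch below is a hypothetical in-memory loader that follows the same contract and is driven with `asyncio.run`. So that it runs on its own, it uses a minimal local `LoadResult` stand-in (a subset of the documented fields) and does not subclass `BaseLoader`; in a real project it would import both from pycharter.

```python
from __future__ import annotations

import asyncio
import time
from dataclasses import dataclass
from typing import Any


@dataclass
class LoadResult:
    # Minimal local stand-in for pycharter's LoadResult (subset of its fields)
    success: bool = True
    rows_loaded: int = 0
    error: str | None = None
    duration_seconds: float | None = None


class InMemoryLoader:
    """Collects rows in a list; in pycharter it would subclass BaseLoader."""

    def __init__(self, table: str) -> None:
        self.table = table
        self.rows: list[dict[str, Any]] = []

    async def load(self, data: list[dict[str, Any]], **params: Any) -> LoadResult:
        start = time.perf_counter()
        try:
            self.rows.extend(dict(row) for row in data)  # copy rows to avoid aliasing
            return LoadResult(
                success=True,
                rows_loaded=len(data),
                duration_seconds=time.perf_counter() - start,
            )
        except Exception as exc:  # same error handling as the BigQuery example
            return LoadResult(success=False, error=str(exc))


loader = InMemoryLoader(table="users")
result = asyncio.run(loader.load([{"id": 1}, {"id": 2}]))
print(result.success, result.rows_loaded, len(loader.rows))  # True 2 2
```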