Skip to content

Loaders

Loaders write transformed data to destinations.

Overview

from pycharter import PostgresLoader, FileLoader, CloudStorageLoader

PostgresLoader

Load data to PostgreSQL databases.

PostgresLoader

PostgresLoader(
    connection_string: str,
    table: str,
    schema: str = "public",
    write_method: str = "upsert",
    primary_key: str | list[str] | None = None,
    batch_size: int = 1000,
    ssh_tunnel: dict[str, Any] | None = None,
    update_columns: list[str] | None = None,
)

Bases: BaseLoader

Loader for PostgreSQL databases.

Supports:

- Insert, upsert, replace, update, delete, and truncate_and_load write methods
- Bulk operations for efficiency
- SSH tunneling

Example

loader = PostgresLoader(
    connection_string="postgresql://user:pass@localhost/db",
    table="users",
    write_method="upsert",
    primary_key="id",
)
result = await loader.load(data)

from_config classmethod

from_config(config: dict[str, Any]) -> 'PostgresLoader'

Create loader from configuration dict.

load async

load(data: list[dict[str, Any]], **params) -> LoadResult

Load data to PostgreSQL.

Load Modes

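The mode is selected with the write_method constructor argument (default: upsert).

| Mode | Description |
|------|-------------|
| insert | Insert all records (fails on duplicates) |
| upsert | Insert or update on conflict (default) |
| replace | Truncate table and insert |

The loader also accepts update, delete, and truncate_and_load as write_method values.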

Examples

from pycharter import PostgresLoader

# Basic insert. write_method defaults to "upsert", so request "insert" explicitly.
loader = PostgresLoader(
    connection_string="postgresql://user:pass@localhost/db",
    table="users",
    write_method="insert",
)

# Upsert (update on conflict with the primary key)
loader = PostgresLoader(
    connection_string="postgresql://user:pass@localhost/db",
    table="users",
    schema="public",
    write_method="upsert",
    primary_key=["id"],
)

# Replace (truncate and insert)
loader = PostgresLoader(
    connection_string="postgresql://user:pass@localhost/db",
    table="users",
    write_method="replace",
)

# With SSH tunnel
loader = PostgresLoader(
    connection_string="postgresql://user:pass@localhost/db",
    table="users",
    ssh_tunnel={
        "host": "bastion.example.com",
        "port": 22,
        "username": "deploy",
        "key_file": "~/.ssh/id_rsa",
    },
)

DatabaseLoader

Generic database loader supporting multiple databases.

from pycharter import DatabaseLoader

# MySQL target
loader = DatabaseLoader(connection_string="mysql://user:pass@localhost/db", table="users")

# SQLite target
loader = DatabaseLoader(connection_string="sqlite:///data.db", table="users")

FileLoader

Load data to local files.

FileLoader

FileLoader(
    path: str,
    file_format: str = "json",
    write_mode: str = "overwrite",
)

Bases: BaseLoader

Loader for local files.

Supports JSON, CSV, Parquet, and JSONL formats.

Example

loader = FileLoader(path="output/data.json", file_format="json")
result = await loader.load(data)

from_config classmethod

from_config(config: dict[str, Any]) -> 'FileLoader'

Create loader from configuration dict.

load async

load(data: list[dict[str, Any]], **params) -> LoadResult

Load data to file.

Supported Formats

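| Format | file_format | Extension | Notes |
|--------|-------------|-----------|-------|
| JSON | json | .json | Pretty-printed array |
| JSON Lines | jsonl | .jsonl | One JSON object per line |
| CSV | csv | .csv | With header row |
| Parquet | parquet | .parquet | Columnar, compressed |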

Examples

from pycharter import FileLoader

# Pretty-printed JSON array
loader = FileLoader(path="output/users.json", file_format="json")

# Newline-delimited JSON: one object per line, streaming-friendly
loader = FileLoader(path="output/users.jsonl", file_format="jsonl")

# CSV with a header row
loader = FileLoader(path="output/users.csv", file_format="csv")

# Columnar, compressed Parquet
loader = FileLoader(path="output/users.parquet", file_format="parquet")

# Append instead of the default write_mode ("overwrite")
loader = FileLoader(path="output/users.jsonl", file_format="jsonl", write_mode="append")

CloudStorageLoader

Load data to cloud storage (S3, GCS, Azure Blob).

CloudStorageLoader

CloudStorageLoader(
    provider: str,
    bucket: str,
    path: str,
    credentials: dict[str, Any] | None = None,
    file_format: str = "json",
)

Bases: BaseLoader

Loader for cloud storage (S3, GCS, Azure).

Supports JSON, CSV, Parquet, and JSONL formats.

Example

loader = CloudStorageLoader(
    provider="s3",
    bucket="my-bucket",
    path="output/data.json",
    file_format="json",
)
result = await loader.load(data)

from_config classmethod

from_config(config: dict[str, Any]) -> 'CloudStorageLoader'

Create loader from configuration dict.

load async

load(data: list[dict[str, Any]], **params) -> LoadResult

Load data to cloud storage.

Examples

import os

from pycharter import CloudStorageLoader

# AWS S3. Secrets are read from environment variables; a literal "${AWS_KEY}"
# string in Python code is not interpolated.
loader = CloudStorageLoader(
    provider="s3",
    bucket="my-bucket",
    path="output/users.json",
    file_format="json",
    credentials={
        "aws_access_key_id": os.environ["AWS_KEY"],
        "aws_secret_access_key": os.environ["AWS_SECRET"],
    },
)

# Google Cloud Storage (no explicit credentials passed)
loader = CloudStorageLoader(
    provider="gcs",
    bucket="my-bucket",
    path="output/users.json",
    file_format="json",
)

# Azure Blob Storage. There is no separate "container" argument;
# pass the container name as `bucket`.
loader = CloudStorageLoader(
    provider="azure",
    bucket="my-container",
    path="output/users.json",
    file_format="json",
    credentials={
        "connection_string": os.environ["AZURE_STORAGE_CONNECTION_STRING"],
    },
)

# Parquet to S3
loader = CloudStorageLoader(
    provider="s3",
    bucket="data-lake",
    path="users/date=2024-01-01/users.parquet",
    file_format="parquet",
)

LoaderFactory

Create loaders from configuration:

from pycharter.etl_generator.loaders import LoaderFactory

# Create from config; the "type" key selects which registered loader is built
loader = LoaderFactory.create({
    "type": "postgres",
    "connection_string": "postgresql://...",
    "table": "users",
})

# Register a custom loader under a new type name.
# BigQueryLoader must already be defined or imported here;
# see the "Custom Loaders" section for an example implementation.
LoaderFactory.register("bigquery", BigQueryLoader)

LoadResult

LoadResult dataclass

LoadResult(
    success: bool = True,
    rows_loaded: int = 0,
    rows_failed: int = 0,
    error: str | None = None,
    duration_seconds: float | None = None,
    inserted: int | None = None,
    updated: int | None = None,
)

Result from a load operation.

rows_loaded is the total number of affected rows (inserted + updated when both are provided). inserted and updated are optional; loaders that support upsert (e.g. Postgres) set them. When they are omitted, consumers can treat rows_loaded as both the total and the inserted count, and treat updated as 0.

success class-attribute instance-attribute

success: bool = True

rows_loaded class-attribute instance-attribute

rows_loaded: int = 0

duration_seconds class-attribute instance-attribute

duration_seconds: float | None = None

error class-attribute instance-attribute

error: str | None = None

Config-Driven Loaders

Define loaders in YAML:

load.yaml (PostgreSQL)
# NOTE(review): the PostgresLoader constructor names these options
# write_method / primary_key; confirm from_config also accepts mode / conflict_columns.
type: postgres
connection_string: "${DATABASE_URL}"  # presumably expanded from the environment; confirm
table: users
schema: public
mode: upsert
conflict_columns:
  - id
load.yaml (File)
# NOTE(review): the FileLoader constructor names these options path / file_format;
# confirm from_config accepts file_path / format as written here.
type: file
file_path: output/users.json
format: json
write_mode: overwrite  # constructor default; "append" is the other mode shown above
load.yaml (S3)
# NOTE(review): the CloudStorageLoader constructor takes provider / bucket / path /
# file_format as flat arguments; confirm from_config reads them from the nested
# "storage" mapping and the "format" key used here.
type: cloud_storage
storage:
  provider: s3
  bucket: my-bucket
  path: output/users.json
format: json
credentials:
  aws_access_key_id: "${AWS_KEY}"  # presumably expanded from the environment; confirm
  aws_secret_access_key: "${AWS_SECRET}"

Custom Loaders

Create custom loaders by extending BaseLoader:

import asyncio
from typing import Any

from google.cloud import bigquery

from pycharter.etl_generator.loaders import BaseLoader, LoaderFactory
from pycharter.etl_generator.result import LoadResult


class BigQueryLoader(BaseLoader):
    """Example custom loader that writes records to a BigQuery table.

    Args:
        project: GCP project used to create the BigQuery client.
        dataset: Target dataset name.
        table: Target table name.
    """

    def __init__(self, project: str, dataset: str, table: str):
        self.project = project
        self.dataset = dataset
        self.table = table

    def _load_blocking(self, data: list[dict[str, Any]]) -> None:
        """Run the BigQuery load job synchronously and wait for it to finish."""
        client = bigquery.Client(project=self.project)
        table_ref = f"{self.dataset}.{self.table}"

        job = client.load_table_from_json(data, table_ref)
        job.result()  # Wait for completion

    async def load(self, data: list[dict[str, Any]], **params) -> LoadResult:
        """Load ``data`` into BigQuery and report the outcome as a LoadResult.

        Errors are returned as ``LoadResult(success=False, ...)`` rather than
        raised, so callers always receive a result object.
        """
        if not data:
            # Nothing to write: avoid creating a client and an empty load job.
            return LoadResult(success=True, rows_loaded=0)
        try:
            # The BigQuery client is synchronous; run it in a worker thread so
            # the event loop is not blocked while the job runs.
            await asyncio.to_thread(self._load_blocking, data)
        except Exception as e:
            return LoadResult(success=False, rows_failed=len(data), error=str(e))
        return LoadResult(success=True, rows_loaded=len(data))


# Register so factory configs with type "bigquery" build this loader
LoaderFactory.register("bigquery", BigQueryLoader)

See Also