
Async Execution Model

PyCharter's ETL pipeline is fully async: Pipeline.run() returns a coroutine, extractors yield batches asynchronously, and loaders perform I/O asynchronously. This page explains how to run pipelines from scripts, async frameworks, notebooks, and background workers.

Why async?

Async execution lets PyCharter overlap I/O — waiting for an API response while writing the previous batch to the database, or consuming a streaming source while accumulating a batch for transformation. All built-in extractors and loaders are async-native.
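The sketch below is a rough illustration of that overlap. It is a toy model, not PyCharter's internals. A hypothetical async extractor feeds a hypothetical async loader through a bounded asyncio.Queue. Both sides only simulate I/O with asyncio.sleep(), and the names extract_batches, load_batch and run_overlapped are invented for this example.

```python
import asyncio
import time

async def extract_batches(n_batches: int):
    """Hypothetical extractor: each batch simulates a 0.1 s API call."""
    for i in range(n_batches):
        await asyncio.sleep(0.1)  # stand-in for waiting on an HTTP response
        yield [f"row-{i}-{j}" for j in range(3)]

async def load_batch(batch: list, sink: list) -> None:
    """Hypothetical loader: simulates a 0.1 s database write."""
    await asyncio.sleep(0.1)  # stand-in for a database round trip
    sink.append(batch)

async def run_overlapped(n_batches: int):
    # Bounded queue: extraction can get at most 2 batches ahead of loading
    queue: asyncio.Queue = asyncio.Queue(maxsize=2)
    sink: list = []

    async def producer():
        async for batch in extract_batches(n_batches):
            await queue.put(batch)
        await queue.put(None)  # sentinel: extraction finished

    async def consumer():
        # Writes batch N while the producer is already fetching batch N+1
        while (batch := await queue.get()) is not None:
            await load_batch(batch, sink)

    start = time.perf_counter()
    await asyncio.gather(producer(), consumer())
    return sink, time.perf_counter() - start

if __name__ == "__main__":
    sink, elapsed = asyncio.run(run_overlapped(5))
    print(f"loaded {len(sink)} batches in {elapsed:.2f}s")
```

Suppose there are five batches, each needing a 0.1 s fetch and a 0.1 s write. Running each step in sequence would take about 1.0 s. The overlapped version finishes in roughly 0.6 s because each write runs while the next fetch is in flight. The queue's maxsize also bounds memory, since extraction cannot run far ahead of loading.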


Running a pipeline

From a script

Use asyncio.run() to create an event loop, run the pipeline, and clean up:

import asyncio
from pycharter import Pipeline

async def main():
    pipeline = Pipeline.from_config_dir("pipelines/users/")
    result = await pipeline.run()
    print(f"Loaded {result.rows_loaded} rows in {result.duration_seconds:.2f}s")
    if not result.success:
        for err in result.errors:
            print(f"  error: {err}")

if __name__ == "__main__":
    asyncio.run(main())

Do not nest asyncio.run()

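Use asyncio.run() as the entry point from synchronous code. Never call it while an event loop is already running in the same thread, for example inside an async def, a FastAPI handler, or a Jupyter cell. Doing so raises RuntimeError: asyncio.run() cannot be called from a running event loop.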
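Here is a minimal reproduction. A hypothetical fake_pipeline_run() coroutine stands in for pipeline.run(). The nested asyncio.run() call fails, while awaiting the coroutine directly works.

```python
import asyncio

async def fake_pipeline_run() -> str:
    """Hypothetical stand-in for pipeline.run()."""
    await asyncio.sleep(0)
    return "done"

async def handler():
    error = ""
    coro = fake_pipeline_run()
    try:
        asyncio.run(coro)  # wrong: handler() is already running inside an event loop
    except RuntimeError as exc:
        error = str(exc)
        coro.close()  # never started; close it to avoid a "never awaited" warning
    result = await fake_pipeline_run()  # right: await it directly
    return error, result

error, result = asyncio.run(handler())  # fine: called from synchronous top-level code
print(error)   # asyncio.run() cannot be called from a running event loop
print(result)  # done
```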

From an async framework (FastAPI, Starlette, etc.)

Inside an existing async context, await the pipeline directly — the framework already manages the event loop:

from fastapi import APIRouter
from pycharter import Pipeline

router = APIRouter()

@router.post("/run-etl")
async def run_etl_endpoint():
    pipeline = Pipeline.from_config_dir("pipelines/users/")
    result = await pipeline.run()
    return {
        "rows_loaded": result.rows_loaded,
        "success": result.success,
        "errors": result.errors,
    }

From Jupyter / IPython

The Jupyter kernel already runs an event loop and supports top-level await. Await the pipeline directly in a notebook cell instead of calling asyncio.run():

from pycharter import Pipeline

pipeline = Pipeline.from_config_dir("pipelines/users/")
result = await pipeline.run()
print(result.rows_loaded)

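From a Celery task

Celery (as of 5.x) executes task functions synchronously. An async def decorated with @app.task would therefore return an un-awaited coroutine instead of running the pipeline. Put the pipeline in a coroutine and start it with asyncio.run() inside a regular task:

import asyncio
from celery import Celery
from pycharter import Pipeline

app = Celery("tasks")

async def _run_pipeline(pipeline_dir: str):
    pipeline = Pipeline.from_config_dir(pipeline_dir)
    result = await pipeline.run()
    return {"rows_loaded": result.rows_loaded, "success": result.success}

@app.task
def run_pipeline_task(pipeline_dir: str):
    # Celery calls this synchronously; asyncio.run() creates and closes a fresh event loop per task
    return asyncio.run(_run_pipeline(pipeline_dir))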

Quick reference

| Context | Event loop | How to run |
|---|---|---|
| Script (python run.py) | Created by asyncio.run() | asyncio.run(main()) |
| Jupyter / IPython | Built-in notebook loop | await pipeline.run() |
| FastAPI / Starlette | Uvicorn's loop | await pipeline.run() |
| Celery task | None (task functions are synchronous) | asyncio.run(...) inside the task |
| Sync code (no loop) | None | asyncio.run(main()) |
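Every row in the table comes down to one question: is an event loop already running in the current thread? A small helper can answer it. The helper is hypothetical and not part of PyCharter. It could help choose between await pipeline.run() and asyncio.run(main()) in code shared between notebooks and scripts. In a Jupyter notebook cell it typically returns True.

```python
import asyncio

def in_running_loop() -> bool:
    """True if called from code running inside an event loop in this thread."""
    try:
        asyncio.get_running_loop()  # raises RuntimeError when no loop is running
    except RuntimeError:
        return False
    return True

async def check_from_coroutine() -> bool:
    return in_running_loop()

print(in_running_loop())                    # False: plain script, no loop yet
print(asyncio.run(check_from_coroutine()))  # True: inside the loop asyncio.run() created
```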

Error handling and error modes

Pipeline.run() accepts an optional error_context that controls failure behaviour.

Error modes

| Mode | Behaviour |
|---|---|
| STRICT (default for most paths) | Extraction or load failures raise immediately |
| LENIENT | Failures are logged and appended to result.errors; the pipeline continues |
| COLLECT | Same as LENIENT; errors are also collected on the error context for later inspection |
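The sketch below is a toy, synchronous model of how the three modes differ when one batch fails. The names ToyErrorMode, ToyErrorContext, ToyResult and load_batches are hypothetical and only mirror the behaviour described in the table. This is not PyCharter's implementation. How the real result.success is computed is also an assumption here.

```python
from dataclasses import dataclass, field
from enum import Enum

class ToyErrorMode(Enum):
    STRICT = "strict"
    LENIENT = "lenient"
    COLLECT = "collect"

@dataclass
class ToyErrorContext:
    mode: ToyErrorMode = ToyErrorMode.STRICT
    collected: list = field(default_factory=list)  # only filled in COLLECT mode

@dataclass
class ToyResult:
    rows_loaded: int = 0
    errors: list = field(default_factory=list)

    @property
    def success(self) -> bool:
        return not self.errors  # assumption: any recorded error means failure

def load_batches(batches: list, ctx: ToyErrorContext) -> ToyResult:
    """Load each batch; a None entry simulates a batch that fails to load."""
    result = ToyResult()
    for index, batch in enumerate(batches):
        try:
            if batch is None:
                raise ValueError(f"batch {index} could not be loaded")
            result.rows_loaded += len(batch)
        except ValueError as exc:
            if ctx.mode is ToyErrorMode.STRICT:
                raise  # STRICT: stop at the first failure
            result.errors.append(str(exc))  # LENIENT and COLLECT: record and keep going
            if ctx.mode is ToyErrorMode.COLLECT:
                ctx.collected.append(exc)  # COLLECT: also keep the exception on the context
    return result
```

For example, given batches [[1, 2], None, [3, 4]]:

- STRICT raises at the second batch.
- LENIENT loads 4 rows and records one error.
- COLLECT does the same as LENIENT and also leaves the exception in ctx.collected.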

Setting the global mode

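import asyncio
from pycharter import Pipeline
from pycharter.shared.errors import set_error_mode, ErrorMode

# Sets the global default used by subsequent runs
set_error_mode(ErrorMode.LENIENT)

async def main():
    pipeline = Pipeline.from_config_dir("pipelines/users/")
    result = await pipeline.run()
    if not result.success:
        for err in result.errors:
            print(f"  {err}")

asyncio.run(main())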

Per-run error context

Pass a specific ErrorContext to run() without changing the global default:

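from pycharter import Pipeline
from pycharter.shared.errors import ErrorContext, ErrorMode

pipeline = Pipeline.from_config_dir("pipelines/users/")
ctx = ErrorContext(mode=ErrorMode.LENIENT)  # applies to this run only
result = await pipeline.run(error_context=ctx)  # inside an async function or notebook cell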

Running multiple pipelines concurrently

Use asyncio.gather() to run independent pipelines concurrently. Their I/O overlaps on a single event loop; they do not run in parallel on separate CPU cores:

import asyncio
from pycharter import Pipeline

async def main():
    pipelines = [
        Pipeline.from_config_dir("pipelines/orders/"),
        Pipeline.from_config_dir("pipelines/products/"),
        Pipeline.from_config_dir("pipelines/users/"),
    ]
    results = await asyncio.gather(*[p.run() for p in pipelines])
    for r in results:
        print(f"loaded {r.rows_loaded} rows — success={r.success}")

asyncio.run(main())
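By default, asyncio.gather() propagates the first exception raised by any run to the caller, for example a STRICT-mode failure. The caller then never sees the other results, although the remaining runs are not cancelled. Passing return_exceptions=True instead returns each failure as an exception object in its own slot, so successful pipelines still report. The sketch uses a hypothetical fake_run() in place of pipeline.run().

```python
import asyncio

async def fake_run(name: str, fail: bool = False) -> str:
    """Hypothetical stand-in for pipeline.run()."""
    await asyncio.sleep(0.01)
    if fail:
        raise RuntimeError(f"{name}: load failed")
    return f"{name}: ok"

async def run_all() -> list:
    results = await asyncio.gather(
        fake_run("orders"),
        fake_run("products", fail=True),
        fake_run("users"),
        return_exceptions=True,  # failures come back as exception objects, in order
    )
    for r in results:
        if isinstance(r, Exception):
            print(f"failed: {r}")
        else:
            print(r)
    return results

if __name__ == "__main__":
    asyncio.run(run_all())
```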

Shared database connections

If multiple pipelines write to the same database table, choose each loader's write mode (upsert, insert, etc.) deliberately. For example, concurrent inserts of overlapping keys can fail on a unique constraint where an upsert would not.
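When many pipelines target the same database, you may also want to cap how many run at once so they don't exhaust its connection limit. One option is an asyncio.Semaphore around each run. In the sketch below, fake_run() is a hypothetical stand-in for pipeline.run(). It records how many runs are active at the same time.

```python
import asyncio

async def fake_run(name: str, state: dict) -> str:
    """Hypothetical stand-in for pipeline.run() that tracks concurrency."""
    state["active"] += 1
    state["peak"] = max(state["peak"], state["active"])
    await asyncio.sleep(0.01)  # simulated I/O
    state["active"] -= 1
    return name

async def run_bounded(names: list, limit: int):
    sem = asyncio.Semaphore(limit)  # created inside the running loop
    state = {"active": 0, "peak": 0}

    async def guarded(name: str) -> str:
        async with sem:  # at most `limit` runs hold the semaphore at once
            return await fake_run(name, state)

    results = await asyncio.gather(*(guarded(n) for n in names))
    return results, state["peak"]
```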


Resource management

Connections (HTTP sessions, database connections) are managed inside each extractor and loader and are released when the pipeline run completes. There is no async with context manager on Pipeline itself.

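In a long-lived process that runs pipelines repeatedly, wrap each pipeline.run() call in try/except/finally. Failures are then logged and your own cleanup always runs:

import logging

logger = logging.getLogger(__name__)

async def safe_run(pipeline):
    try:
        return await pipeline.run()
    except Exception:
        # Log the traceback (or send an alert), then re-raise for the caller
        logger.exception("pipeline run failed")
        raise
    finally:
        # Release anything your own code acquired around the run
        pass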
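In a long-lived process, you can also bound a run that hangs (a stalled API, a locked table) with asyncio.wait_for(). On timeout, the awaited coroutine is cancelled. CancelledError is then raised inside it, so its finally blocks still execute. The sketch uses a hypothetical slow stand-in for pipeline.run(). It does not show whether PyCharter's own extractors and loaders release connections on cancellation; that is an assumption to verify.

```python
import asyncio

async def fake_pipeline_run(events: list) -> str:
    """Hypothetical stand-in for a pipeline.run() call that takes too long."""
    try:
        await asyncio.sleep(1.0)  # simulated stalled I/O
        return "done"
    finally:
        events.append("cleanup")  # runs on normal exit and on cancellation

async def run_with_timeout(timeout: float):
    events: list = []
    try:
        result = await asyncio.wait_for(fake_pipeline_run(events), timeout=timeout)
    except asyncio.TimeoutError:
        result = None  # the run was cancelled; its finally block has already run
    return result, events

if __name__ == "__main__":
    print(asyncio.run(run_with_timeout(0.05)))
```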

See also