Async Execution Model¶
PyCharter's ETL pipeline is fully async: Pipeline.run() returns a coroutine, extractors yield batches asynchronously, and loaders perform I/O asynchronously. This page explains how to run pipelines from scripts, async frameworks, notebooks, and background workers.
Why async?¶
Async execution lets PyCharter overlap I/O — waiting for an API response while writing the previous batch to the database, or consuming a streaming source while accumulating a batch for transformation. All built-in extractors and loaders are async-native.
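The overlap described above can be illustrated with plain asyncio, independent of PyCharter. This is a minimal sketch under stated assumptions: `extract`, `load`, and `run_overlapped` are hypothetical names, and the `asyncio.sleep()` calls stand in for real network and database waits.

```python
import asyncio
import time


async def extract(queue: asyncio.Queue, n_batches: int) -> None:
    """Simulated source: each batch takes 0.05s to arrive."""
    for i in range(n_batches):
        await asyncio.sleep(0.05)  # stand-in for waiting on an API response
        await queue.put([i * 10 + j for j in range(3)])
    await queue.put(None)  # sentinel: no more batches


async def load(queue: asyncio.Queue) -> int:
    """Simulated sink: each write takes 0.05s. Returns the rows written."""
    rows = 0
    while (batch := await queue.get()) is not None:
        await asyncio.sleep(0.05)  # stand-in for a database write
        rows += len(batch)
    return rows


async def run_overlapped(n_batches: int = 4):
    """Run extract and load concurrently; return (rows, elapsed_seconds)."""
    queue: asyncio.Queue = asyncio.Queue(maxsize=1)
    start = time.perf_counter()
    _, rows = await asyncio.gather(extract(queue, n_batches), load(queue))
    return rows, time.perf_counter() - start


if __name__ == "__main__":
    rows, elapsed = asyncio.run(run_overlapped())
    print(f"{rows} rows in {elapsed:.2f}s")
```

Because the loader writes one batch while the extractor waits for the next, the total time should come out closer to the longer of the two stages than to their sum.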
Running a pipeline¶
From a script¶
Use asyncio.run() to create an event loop, run the pipeline, and clean up:
import asyncio

from pycharter import Pipeline


async def main():
    pipeline = Pipeline.from_config_dir("pipelines/users/")
    result = await pipeline.run()
    print(f"Loaded {result.rows_loaded} rows in {result.duration_seconds:.2f}s")
    if not result.success:
        for err in result.errors:
            print(f"  error: {err}")


if __name__ == "__main__":
    asyncio.run(main())
Do not nest asyncio.run()
Call asyncio.run() from synchronous code only, ideally once at the program's entry point. Calling it from inside a running event loop raises RuntimeError: asyncio.run() cannot be called from a running event loop.
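The failure mode is easy to reproduce with the standard library alone. In this sketch, `inner`, `outer_wrong`, and `outer_right` are hypothetical names; `inner` stands in for a coroutine such as the one returned by pipeline.run().

```python
import asyncio


async def inner() -> str:
    return "done"


async def outer_wrong() -> str:
    # A loop is already running here, so asyncio.run() refuses to start another.
    coro = inner()
    try:
        return asyncio.run(coro)
    except RuntimeError as exc:
        coro.close()  # avoid a "coroutine was never awaited" warning
        return f"RuntimeError: {exc}"


async def outer_right() -> str:
    # Inside a running loop, await the coroutine directly.
    return await inner()


if __name__ == "__main__":
    print(asyncio.run(outer_wrong()))
    print(asyncio.run(outer_right()))
```

The two sequential asyncio.run() calls under the main guard are fine because each one starts from synchronous code after the previous loop has closed.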
From an async framework (FastAPI, Starlette, etc.)¶
Inside an existing async context, await the pipeline directly — the framework already manages the event loop:
from fastapi import APIRouter

from pycharter import Pipeline

router = APIRouter()


@router.post("/run-etl")
async def run_etl_endpoint():
    pipeline = Pipeline.from_config_dir("pipelines/users/")
    result = await pipeline.run()
    return {
        "rows_loaded": result.rows_loaded,
        "success": result.success,
        "errors": result.errors,
    }
From Jupyter / IPython¶
Jupyter notebooks run inside a built-in event loop. Use await directly in a notebook cell:
from pycharter import Pipeline
pipeline = Pipeline.from_config_dir("pipelines/users/")
result = await pipeline.run()
print(result.rows_loaded)
From an async Celery task¶
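Celery's built-in worker pools call task functions synchronously and do not await coroutines, so this example assumes a worker set up to run async def tasks:
from celery import Celery

from pycharter import Pipeline

app = Celery("tasks")


@app.task
async def run_pipeline_task(pipeline_dir: str):
    pipeline = Pipeline.from_config_dir(pipeline_dir)
    result = await pipeline.run()
    return {"rows_loaded": result.rows_loaded, "success": result.success}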
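When the worker executes tasks synchronously, one alternative is to keep the task body synchronous and let it own a short-lived event loop via asyncio.run(). The sketch below leaves out Celery so that it runs on its own. `StubPipeline`, `StubResult`, and `run_pipeline_sync` are hypothetical names, and `StubPipeline` only imitates the `run()` and result attributes used on this page.

```python
import asyncio
from dataclasses import dataclass


@dataclass
class StubResult:
    rows_loaded: int
    success: bool


class StubPipeline:
    """Hypothetical stand-in for pycharter.Pipeline so the sketch runs alone."""

    async def run(self) -> StubResult:
        await asyncio.sleep(0)  # stand-in for real extract/load I/O
        return StubResult(rows_loaded=3, success=True)


def run_pipeline_sync(pipeline) -> dict:
    """Synchronous task body: creates a fresh event loop for one pipeline run."""
    result = asyncio.run(pipeline.run())
    return {"rows_loaded": result.rows_loaded, "success": result.success}


if __name__ == "__main__":
    print(run_pipeline_sync(StubPipeline()))
```

In a real worker, a function like `run_pipeline_sync` would be the body of a regular (non-async) task, with the pipeline built from its config directory inside the task.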
Quick reference¶
| Context | Event loop | How to run |
|---|---|---|
| Script (python run.py) | Created by asyncio.run() | asyncio.run(main()) |
| Jupyter / IPython | Built-in notebook loop | await pipeline.run() |
| FastAPI / Starlette | Uvicorn's loop | await pipeline.run() |
| Async Celery task | Worker's loop | await pipeline.run() |
| Sync code (no loop) | None | asyncio.run(main()) |
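If it is unclear which row of the table applies, the standard library can report whether an event loop is already running in the current thread. This is a small sketch; `loop_is_running` and `check_inside` are hypothetical helper names.

```python
import asyncio


def loop_is_running() -> bool:
    """Return True if called from code running inside an event loop."""
    try:
        asyncio.get_running_loop()
    except RuntimeError:
        # No loop in this thread: asyncio.run(main()) is the right entry point.
        return False
    # A loop is running: use `await pipeline.run()` instead.
    return True


async def check_inside() -> bool:
    return loop_is_running()


if __name__ == "__main__":
    print(loop_is_running())            # called from synchronous code
    print(asyncio.run(check_inside()))  # called from inside a coroutine
```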
Error handling and error modes¶
Pipeline.run() accepts an optional error_context that controls failure behaviour.
Error modes¶
| Mode | Behaviour |
|---|---|
| STRICT (default for most paths) | Extraction or load failures raise immediately |
| LENIENT | Failures are logged and appended to result.errors; pipeline continues |
| COLLECT | Same as LENIENT; errors also collected on the context for later inspection |
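The difference between raising immediately and continuing can be pictured with a toy loader. This is an illustration only, not PyCharter's implementation: `Mode` and `load_rows` are hypothetical, and COLLECT is omitted because the table describes it as LENIENT plus extra bookkeeping on the context.

```python
from enum import Enum


class Mode(Enum):
    """Hypothetical mirror of two of the modes in the table above."""
    STRICT = "strict"
    LENIENT = "lenient"


def load_rows(rows, mode):
    """Load rows, treating negative values as failures.

    Returns (rows_loaded, errors). In STRICT mode the first failure raises.
    """
    loaded, errors = 0, []
    for row in rows:
        try:
            if row < 0:
                raise ValueError(f"bad row: {row}")
            loaded += 1
        except ValueError as exc:
            if mode is Mode.STRICT:
                raise  # fail fast: nothing after this row is processed
            errors.append(str(exc))  # record the failure and keep going
    return loaded, errors


if __name__ == "__main__":
    print(load_rows([1, -1, 2], Mode.LENIENT))
```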
Setting the global mode¶
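from pycharter import Pipeline
from pycharter.shared.errors import set_error_mode, ErrorMode

# Applies to every subsequent run in this process.
set_error_mode(ErrorMode.LENIENT)

# The await below must run inside an async function or a notebook cell.
pipeline = Pipeline.from_config_dir("pipelines/users/")
result = await pipeline.run()
if not result.success:
    for err in result.errors:
        print(f"  {err}")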
Per-run error context¶
Pass a specific ErrorContext to run() without changing the global default:
from pycharter.shared.errors import ErrorContext, ErrorMode
ctx = ErrorContext(mode=ErrorMode.LENIENT)
result = await pipeline.run(error_context=ctx)
Running multiple pipelines concurrently¶
Use asyncio.gather() to run independent pipelines concurrently on the same event loop:
import asyncio

from pycharter import Pipeline


async def main():
    pipelines = [
        Pipeline.from_config_dir("pipelines/orders/"),
        Pipeline.from_config_dir("pipelines/products/"),
        Pipeline.from_config_dir("pipelines/users/"),
    ]
    # gather() returns results in the same order as the pipelines list.
    results = await asyncio.gather(*[p.run() for p in pipelines])
    for r in results:
        print(f"loaded {r.rows_loaded} rows, success={r.success}")


if __name__ == "__main__":
    asyncio.run(main())
Shared database connections
If multiple pipelines write to the same database table, ensure your loader is configured for the correct write mode (upsert, insert, etc.) to avoid conflicts.
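With plain asyncio.gather(), the first exception raised by any run propagates to the caller while the remaining runs keep going in the background. Passing return_exceptions=True instead returns each exception as a value in that run's position, so one failing pipeline does not hide the others' results. The sketch below uses hypothetical stand-in coroutines (`run_ok`, `run_failing`) in place of real pipeline runs.

```python
import asyncio


async def run_ok(name: str) -> str:
    await asyncio.sleep(0)  # stand-in for a successful pipeline run
    return f"{name}: ok"


async def run_failing(name: str) -> str:
    await asyncio.sleep(0)  # stand-in for a run that raises
    raise RuntimeError(f"{name}: extraction failed")


async def run_all():
    """Run three stand-in pipelines and split results from failures."""
    outcomes = await asyncio.gather(
        run_ok("orders"),
        run_failing("products"),
        run_ok("users"),
        return_exceptions=True,  # exceptions come back as values, in order
    )
    succeeded = [o for o in outcomes if not isinstance(o, BaseException)]
    failed = [o for o in outcomes if isinstance(o, BaseException)]
    return succeeded, failed


if __name__ == "__main__":
    succeeded, failed = asyncio.run(run_all())
    print(succeeded)
    print([str(exc) for exc in failed])
```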
Resource management¶
Connections (HTTP sessions, database connections) are managed inside each extractor and loader and are released when the pipeline run completes. There is no async with context manager on Pipeline itself.
For cleanup across multiple runs in a long-lived process, wrap each pipeline.run() call in a try/finally:
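import logging

logger = logging.getLogger(__name__)


async def safe_run(pipeline):
    try:
        return await pipeline.run()
    except Exception:
        # Log with the traceback, then re-raise so the caller still sees the failure.
        logger.exception("pipeline run failed")
        raise
    finally:
        # Custom cleanup goes here (close shared clients, flush metrics, etc.).
        pass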
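Since Pipeline has no async with support of its own, the same cleanup guarantee can be packaged as a reusable wrapper with contextlib.asynccontextmanager. This is a sketch under stated assumptions: `with_cleanup`, `StubPipeline`, and `close_shared_client` are hypothetical names, and the stub only imitates an awaitable `run()`.

```python
import asyncio
from contextlib import asynccontextmanager


class StubPipeline:
    """Hypothetical stand-in for pycharter.Pipeline; run() can be made to fail."""

    def __init__(self, fail: bool = False) -> None:
        self.fail = fail

    async def run(self) -> str:
        await asyncio.sleep(0)
        if self.fail:
            raise RuntimeError("load failed")
        return "ok"


@asynccontextmanager
async def with_cleanup(pipeline, cleanup):
    """Yield the pipeline and always await cleanup() afterwards."""
    try:
        yield pipeline
    finally:
        await cleanup()


async def demo(fail: bool) -> list:
    events = []

    async def close_shared_client() -> None:  # hypothetical cleanup hook
        events.append("closed")

    try:
        async with with_cleanup(StubPipeline(fail), close_shared_client) as p:
            events.append(await p.run())
    except RuntimeError as exc:
        events.append(f"error: {exc}")
    return events


if __name__ == "__main__":
    print(asyncio.run(demo(fail=False)))
    print(asyncio.run(demo(fail=True)))
```

The cleanup hook runs whether the run succeeds or raises, and the exception still reaches the caller afterwards.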
See also¶
- Building ETL Pipelines
- Streaming and Messaging — async streaming extractors
- Testing Pipelines — run pipelines with mock data in tests
- ETL Transformations