
Incremental Extraction

Incremental extraction lets a pipeline fetch only the records that were created or updated since the last run, rather than re-extracting the full dataset every time. PyCharter uses a watermark, a persisted high-water-mark value (timestamp, sequence ID, cursor, etc.), to track where each pipeline left off.

How it works

Run N
  ├─ Load last watermark from state store (e.g. "2024-06-01T00:00:00")
  ├─ Pass watermark to extractor as a parameter
  ├─ Extract only records where updated_at > watermark
  ├─ Load records
  └─ Save new watermark to state store (e.g. "2024-06-02T00:00:00")

Run N+1
  ├─ Load watermark: "2024-06-02T00:00:00"
  └─ Extract only records since that timestamp
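
The run cycle above can be sketched in plain Python. This is a minimal illustration rather than PyCharter's implementation: the in-memory `SOURCE` rows, the dict standing in for a state store, and the `run_once` helper are all hypothetical.

```python
# Hypothetical in-memory source table; ISO 8601 strings sort chronologically.
SOURCE = [
    {"id": 1, "updated_at": "2024-05-31T09:00:00"},
    {"id": 2, "updated_at": "2024-06-01T12:00:00"},
    {"id": 3, "updated_at": "2024-06-01T18:30:00"},
]

state = {}  # stand-in for a state store: pipeline name -> watermark


def run_once(pipeline, default="1970-01-01T00:00:00"):
    """Extract rows newer than the stored watermark, then advance it."""
    watermark = state.get(pipeline, default)
    rows = [r for r in SOURCE if r["updated_at"] > watermark]
    if rows:
        # Advance the watermark to the newest row seen in this run.
        state[pipeline] = max(r["updated_at"] for r in rows)
    return rows


first = run_once("orders")   # no watermark yet, so every row qualifies
second = run_once("orders")  # nothing is newer than the saved watermark
```

The second call returns an empty list because the watermark already points at the newest row.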

State persistence is handled by a StateStore. Two implementations ship with PyCharter:

| Store | Best for |
|---|---|
| FileStateStore | Single-process scripts, local development |
| SqliteStateStore | Scheduled jobs, lightweight production, multi-pipeline tracking |

Quick start

1 · Choose a state store

# Option A: file-based store
from pycharter.etl_generator.state import FileStateStore

store = FileStateStore(path="./.state")
# Creates .state/<pipeline_name>.json per pipeline

# Option B: SQLite-based store
from pycharter.etl_generator.state import SqliteStateStore

store = SqliteStateStore(path="./.state/state.db")
# Creates a pipeline_state table in the SQLite file

2 · Read and update the watermark

import asyncio
from pycharter import Pipeline
from pycharter.etl_generator.state import SqliteStateStore, PipelineState

PIPELINE = "orders"

async def run():
    store = SqliteStateStore("./.state/state.db")

    # Load last state (None on first run)
    state = store.get(PIPELINE)
    last_watermark = state.watermark if state else None

    pipeline = Pipeline.from_config_files(
        extract="pipelines/orders/extract.yaml",
        transform="pipelines/orders/transform.yaml",
        load="pipelines/orders/load.yaml",
        variables={
            # Inject watermark as a variable — use it in extract.yaml
            "SINCE": last_watermark or "1970-01-01T00:00:00",
        },
    )

    result = await pipeline.run()

    if result.success:
        # Persist the new watermark: take it from the result metadata, or keep
        # the previous watermark so the next run's SINCE stays a valid timestamp
        new_watermark = result.metadata.get("max_updated_at") or last_watermark
        store.save(PipelineState(
            pipeline_name=PIPELINE,
            watermark=new_watermark,
            last_run_id=result.run_id,
            last_run_at=result.started_at,
            run_count=(state.run_count if state else 0) + 1,
        ))
        print(f"Loaded {result.rows_loaded} rows. Next watermark: {new_watermark}")
    else:
        print("Pipeline failed; watermark NOT updated")
        for err in result.errors:
            print(f"  {err}")

asyncio.run(run())

3 · Use the watermark in extract.yaml

# extract.yaml (HTTP example)
source_type: http
base_url: "https://api.example.com"
api_endpoint: "/orders"
params:
  updated_after: "${SINCE}"
  page_size: 500

# extract.yaml (Database example)
source_type: database
connection: "${DATABASE_URL}"
query: |
  SELECT * FROM orders
  WHERE updated_at > '${SINCE}'
  ORDER BY updated_at
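
To illustrate how a `${SINCE}` placeholder turns into a concrete query, here is a small sketch using the standard library's `string.Template`. PyCharter's own substitution mechanism is not shown in this guide, so `QUERY` and `render_query` are hypothetical and only demonstrate the idea.

```python
from string import Template

# Hypothetical query template mirroring the database example above.
QUERY = Template(
    "SELECT * FROM orders WHERE updated_at > '${SINCE}' ORDER BY updated_at"
)


def render_query(last_watermark):
    """Fill in SINCE, falling back to the epoch when no watermark exists."""
    since = last_watermark or "1970-01-01T00:00:00"
    return QUERY.substitute(SINCE=since)


first_run = render_query(None)
later_run = render_query("2024-06-02T00:00:00")
```

Because the value is interpolated into the SQL text, it should only ever come from the trusted state store, never from user input.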

State model

PipelineState is a frozen dataclass:

from dataclasses import dataclass, field
from typing import Any

@dataclass(frozen=True, slots=True)
class PipelineState:
    pipeline_name: str
    watermark: str | None = None    # High-water mark (ISO 8601, sequence ID, etc.)
    last_run_id: str | None = None  # UUID of the last successful run
    last_run_at: str | None = None  # ISO 8601 timestamp of last run
    run_count: int = 0              # Total number of successful runs
    metadata: dict[str, Any] = field(default_factory=dict)  # Arbitrary extras

Serialize/deserialize with .to_dict() / PipelineState.from_dict(data).
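
Because the dataclass is frozen, fields cannot be reassigned in place; an updated state is a new instance. The sketch below shows that pattern with `dataclasses.replace` and a JSON round trip. `StateSketch` is a hypothetical stand-in with a subset of the fields, not PyCharter's class, and `asdict` is used here only as an approximation of what `.to_dict()` might return.

```python
import json
from dataclasses import FrozenInstanceError, asdict, dataclass, field, replace
from typing import Any, Optional


@dataclass(frozen=True)
class StateSketch:
    """Hypothetical stand-in with a subset of the PipelineState fields."""
    pipeline_name: str
    watermark: Optional[str] = None
    run_count: int = 0
    metadata: dict[str, Any] = field(default_factory=dict)


old = StateSketch(pipeline_name="orders", watermark="2024-06-01T00:00:00")

# Assigning to a field of a frozen instance raises FrozenInstanceError.
try:
    old.watermark = "2024-06-02T00:00:00"
    mutated = True
except FrozenInstanceError:
    mutated = False

# Build an updated copy instead of mutating the original.
new = replace(old, watermark="2024-06-02T00:00:00", run_count=old.run_count + 1)

# Round trip through JSON, similar in spirit to to_dict() / from_dict().
restored = StateSketch(**json.loads(json.dumps(asdict(new))))
```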


Managing state manually

from pycharter.etl_generator.state import FileStateStore, PipelineState

store = FileStateStore()

# Inspect state (get() returns None if the pipeline has no saved state)
state = store.get("orders")
if state is not None:
    print(state.watermark, state.run_count)

# List all tracked pipelines
print(store.list_pipelines())

# Reset state (re-process from the beginning)
store.delete("orders")

# Backfill from a specific point
store.save(PipelineState(
    pipeline_name="orders",
    watermark="2024-01-01T00:00:00",
))

Creating a store from config

If you drive pipeline configuration from a YAML/dict config, use the factory helper:

from pycharter.etl_generator.state import create_state_store

store = create_state_store({
    "backend": "sqlite",       # "file" or "sqlite"
    "path": "./.state/state.db",
})

Choosing a watermark strategy

| Strategy | When to use |
|---|---|
| Timestamp (updated_at) | Source has a reliable modification timestamp |
| Sequence / auto-increment ID | Source has a monotonically increasing integer key |
| Cursor / page token | Pagination API returns a cursor for the next page |
| Run UUID | No natural watermark; track which run loaded what |
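
As an example of the sequence-ID strategy, the sketch below stores the watermark as a string (matching the `watermark: str | None` field) but compares it as an integer. The `ROWS` data and the `extract_after` helper are hypothetical.

```python
# Hypothetical source rows keyed by an auto-increment ID.
ROWS = [{"id": i} for i in (8, 9, 10, 11)]


def extract_after(watermark):
    """Return rows whose ID exceeds the stored watermark, plus the new watermark."""
    last_id = int(watermark) if watermark is not None else 0
    new_rows = [r for r in ROWS if r["id"] > last_id]
    # Keep the old watermark when nothing new was found.
    new_watermark = str(max(r["id"] for r in new_rows)) if new_rows else watermark
    return new_rows, new_watermark


rows, wm = extract_after("9")

# Comparing the IDs as strings would skip row 10: "10" sorts before "9".
string_compare_misses_10 = "10" < "9"
```

Converting to `int` before comparing avoids the lexicographic ordering trap that string watermarks introduce for numeric keys.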

Watermark reliability

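Timestamps can be unreliable if clock skew exists between producer and consumer. Add a small overlap (updated_at >= watermark - 5 minutes) to avoid missing recently written records. Rows inside the overlap window are extracted again on the next run, so the load step should be idempotent (for example, an upsert on the primary key).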
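
One way to apply such an overlap is to shift the watermark back before querying and then collapse any rows that were fetched twice. This is a sketch under assumptions: the five-minute `OVERLAP`, the `since_with_overlap` and `dedupe_latest` helpers, and the `id` / `updated_at` row keys are all illustrative.

```python
from datetime import datetime, timedelta

OVERLAP = timedelta(minutes=5)  # assumed window; tune to the expected clock skew


def since_with_overlap(watermark):
    """Shift an ISO 8601 watermark back by OVERLAP before querying."""
    return (datetime.fromisoformat(watermark) - OVERLAP).isoformat()


def dedupe_latest(rows):
    """Keep one row per id, preferring the most recent updated_at."""
    latest = {}
    for row in rows:
        kept = latest.get(row["id"])
        if kept is None or row["updated_at"] > kept["updated_at"]:
            latest[row["id"]] = row
    return list(latest.values())


since = since_with_overlap("2024-06-02T00:00:00")
```

The shifted value is what would be injected as `SINCE`; the stored watermark itself stays at the true high-water mark.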


FileStateStore vs SqliteStateStore

| Attribute | FileStateStore | SqliteStateStore |
|---|---|---|
| Storage | JSON files | SQLite table |
| Concurrent access | Not safe (no file locking) | Safe (SQLite WAL mode) |
| Multiple pipelines | One file each | One table, one row each |
| Portability | Copy/move files | Copy SQLite file |
| Dependencies | None | sqlite3 (stdlib) |
| Best for | Local dev, single process | Scheduled jobs, multi-pipeline |
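
To make the SQLite column more concrete, here is a rough sketch of a one-row-per-pipeline table in WAL mode using only the standard library. The schema and the `save_watermark` / `load_watermark` helpers are assumptions for illustration; PyCharter's actual `pipeline_state` table may have different columns.

```python
import os
import sqlite3
import tempfile

# Hypothetical schema; PyCharter's actual pipeline_state table may differ.
SCHEMA = """
CREATE TABLE IF NOT EXISTS pipeline_state (
    pipeline_name TEXT PRIMARY KEY,
    watermark     TEXT
)
"""


def save_watermark(db_path, pipeline, watermark):
    """Write one row per pipeline, replacing any previous row."""
    conn = sqlite3.connect(db_path)
    try:
        conn.execute("PRAGMA journal_mode=WAL")  # readers do not block the writer
        conn.execute(SCHEMA)
        with conn:  # commits on success, rolls back on error
            conn.execute(
                "INSERT OR REPLACE INTO pipeline_state (pipeline_name, watermark) "
                "VALUES (?, ?)",
                (pipeline, watermark),
            )
    finally:
        conn.close()


def load_watermark(db_path, pipeline):
    """Return the stored watermark, or None if the pipeline has no row."""
    conn = sqlite3.connect(db_path)
    try:
        row = conn.execute(
            "SELECT watermark FROM pipeline_state WHERE pipeline_name = ?",
            (pipeline,),
        ).fetchone()
        return row[0] if row else None
    finally:
        conn.close()


db = os.path.join(tempfile.mkdtemp(), "state.db")
save_watermark(db, "orders", "2024-06-01T00:00:00")
save_watermark(db, "orders", "2024-06-02T00:00:00")
```

The primary key on `pipeline_name` is what keeps the table at one row per pipeline, and the transaction ensures a failed write leaves the previous watermark in place.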

See also