Incremental Extraction
Incremental extraction lets a pipeline fetch only the records that are new since the last run, rather than re-extracting the full dataset every time. PyCharter uses a watermark — a persisted high-water-mark value (timestamp, sequence ID, cursor, etc.) — to track where each pipeline left off.
How it works
Run N
├─ Load last watermark from state store (e.g. "2024-06-01T00:00:00")
├─ Pass watermark to extractor as a parameter
├─ Extract only records where updated_at > watermark
├─ Load records
└─ Save new watermark to state store (e.g. "2024-06-02T00:00:00")
Run N+1
├─ Load watermark: "2024-06-02T00:00:00"
└─ Extract only records since that timestamp
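The run cycle above can be sketched in plain Python. This is a minimal illustration that uses only the standard library, not PyCharter's implementation: the `run_incremental` function, the dict used as a state store, and the row layout are all hypothetical stand-ins.

```python
from datetime import datetime


def run_incremental(source_rows, state, pipeline="orders"):
    """One incremental run: extract rows newer than the watermark, then advance it."""
    # Hypothetical state store: a plain dict mapping pipeline name -> watermark string.
    since = state.get(pipeline, "1970-01-01T00:00:00")
    cutoff = datetime.fromisoformat(since)

    # Extract only rows modified strictly after the stored watermark.
    new_rows = [
        row for row in source_rows
        if datetime.fromisoformat(row["updated_at"]) > cutoff
    ]

    # A real pipeline would load new_rows here and advance the watermark
    # only after that load succeeds.
    if new_rows:
        newest = max(new_rows, key=lambda r: datetime.fromisoformat(r["updated_at"]))
        state[pipeline] = newest["updated_at"]
    return new_rows


rows = [
    {"id": 1, "updated_at": "2024-06-01T08:00:00"},
    {"id": 2, "updated_at": "2024-06-01T12:30:00"},
]
state = {}
first = run_incremental(rows, state)   # first run: no watermark yet, so both rows

rows.append({"id": 3, "updated_at": "2024-06-02T09:00:00"})
second = run_incremental(rows, state)  # second run: only the row added since
```

If a run finds nothing new, the watermark stays where it was, so the next run starts from the same point.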
State persistence is handled by a StateStore. Two implementations ship with PyCharter:
| Store | Best for |
|---|---|
| FileStateStore | Single-process scripts, local development |
| SqliteStateStore | Scheduled jobs, lightweight production, multi-pipeline tracking |
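Quick start
1 · Choose a state store
Use FileStateStore for local, single-process work, or SqliteStateStore for scheduled jobs and multi-pipeline tracking. The example in step 2 uses SqliteStateStore.
2 · Read and update the watermark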
import asyncio

from pycharter import Pipeline
from pycharter.etl_generator.state import SqliteStateStore, PipelineState

PIPELINE = "orders"


async def run():
    store = SqliteStateStore("./.state/state.db")

    # Load last state (None on first run)
    state = store.get(PIPELINE)
    last_watermark = state.watermark if state else None

    pipeline = Pipeline.from_config_files(
        extract="pipelines/orders/extract.yaml",
        transform="pipelines/orders/transform.yaml",
        load="pipelines/orders/load.yaml",
        variables={
            # Inject the watermark as a variable; extract.yaml references it as ${SINCE}
            "SINCE": last_watermark or "1970-01-01T00:00:00",
        },
    )
    result = await pipeline.run()

    if result.success:
        # Persist the new watermark: use metadata from the result or compute it.
        # If the run reports no max_updated_at (e.g. no new rows), keep the previous
        # watermark so that SINCE always remains a timestamp.
        new_watermark = result.metadata.get("max_updated_at") or last_watermark
        store.save(PipelineState(
            pipeline_name=PIPELINE,
            watermark=new_watermark,
            last_run_id=result.run_id,
            last_run_at=result.started_at,
            run_count=(state.run_count if state else 0) + 1,
        ))
        print(f"Loaded {result.rows_loaded} rows. Next watermark: {new_watermark}")
    else:
        print("Pipeline failed; watermark NOT updated")
        for err in result.errors:
            print(f"  {err}")


asyncio.run(run())
3 · Use the watermark in extract.yaml
# extract.yaml (HTTP example)
source_type: http
base_url: "https://api.example.com"
api_endpoint: "/orders"
params:
  updated_after: "${SINCE}"
  page_size: 500

# extract.yaml (Database example)
source_type: database
connection: "${DATABASE_URL}"
query: |
  SELECT * FROM orders
  WHERE updated_at > '${SINCE}'
  ORDER BY updated_at
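To make the effect of the `${SINCE}` placeholder concrete, the sketch below renders a config dict with the standard library's `string.Template`, which happens to use the same `${NAME}` syntax. How PyCharter performs the substitution internally is not shown in this guide, so treat the `render` helper as a hypothetical illustration of the result rather than the library's mechanism.

```python
from string import Template


def render(value, variables):
    """Recursively substitute ${NAME} placeholders in strings, dicts, and lists."""
    if isinstance(value, str):
        # substitute() raises KeyError if a placeholder has no matching variable.
        return Template(value).substitute(variables)
    if isinstance(value, dict):
        return {key: render(item, variables) for key, item in value.items()}
    if isinstance(value, list):
        return [render(item, variables) for item in value]
    return value  # numbers, booleans, None pass through unchanged


extract_config = {
    "source_type": "http",
    "api_endpoint": "/orders",
    "params": {"updated_after": "${SINCE}", "page_size": 500},
}
rendered = render(extract_config, {"SINCE": "2024-06-01T00:00:00"})
```

Failing on a missing variable is a deliberate choice in this sketch: a silently empty `SINCE` could turn an incremental run into a full extraction.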
State model
PipelineState is a frozen dataclass:
from dataclasses import dataclass, field
from typing import Any


@dataclass(frozen=True, slots=True)
class PipelineState:
    pipeline_name: str
    watermark: str | None = None      # High-water mark (ISO 8601, sequence ID, etc.)
    last_run_id: str | None = None    # UUID of the last successful run
    last_run_at: str | None = None    # ISO 8601 timestamp of last run
    run_count: int = 0                # Total number of successful runs
    metadata: dict[str, Any] = field(default_factory=dict)  # Arbitrary extras
Serialize/deserialize with .to_dict() / PipelineState.from_dict(data).
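The round trip can be illustrated with a stand-in class. `StateSketch` below is hypothetical: it copies the field names listed above and implements `to_dict` / `from_dict` in the simplest way that works, which may differ from how PipelineState does it. It also shows that a frozen instance is updated by creating a modified copy.

```python
import json
from dataclasses import asdict, dataclass, field, replace
from typing import Any, Optional


@dataclass(frozen=True)
class StateSketch:
    """Stand-in with the same fields as PipelineState (not the library class)."""
    pipeline_name: str
    watermark: Optional[str] = None
    last_run_id: Optional[str] = None
    last_run_at: Optional[str] = None
    run_count: int = 0
    metadata: dict[str, Any] = field(default_factory=dict)

    def to_dict(self) -> dict[str, Any]:
        return asdict(self)

    @classmethod
    def from_dict(cls, data: dict[str, Any]) -> "StateSketch":
        return cls(**data)


original = StateSketch("orders", watermark="2024-06-02T00:00:00", run_count=3)

# Serialize to JSON text and back again.
payload = json.dumps(original.to_dict())
restored = StateSketch.from_dict(json.loads(payload))

# Frozen instances cannot be mutated; build an updated copy instead.
bumped = replace(restored, run_count=restored.run_count + 1)
```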
Managing state manually
from pycharter.etl_generator.state import FileStateStore, PipelineState

store = FileStateStore()

# Inspect state (get() returns None if the pipeline has no saved state)
state = store.get("orders")
if state is not None:
    print(state.watermark, state.run_count)

# List all tracked pipelines
print(store.list_pipelines())

# Reset state (re-process from the beginning)
store.delete("orders")

# Backfill from a specific point
store.save(PipelineState(
    pipeline_name="orders",
    watermark="2024-01-01T00:00:00",
))
Creating a store from config
If you drive pipeline configuration from a YAML/dict config, use the factory helper:
from pycharter.etl_generator.state import create_state_store

store = create_state_store({
    "backend": "sqlite",          # "file" or "sqlite"
    "path": "./.state/state.db",
})
Choosing a watermark strategy
| Strategy | When to use |
|---|---|
| Timestamp (updated_at) | Source has a reliable modification timestamp |
| Sequence / auto-increment ID | Source has a monotonically increasing integer key |
| Cursor / page token | Pagination API returns a cursor for the next page |
| Run UUID | No natural watermark — track which run loaded what |
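For the sequence-ID strategy, one detail is worth spelling out: the watermark field is typed as a string, so an integer key would be stored as text and should be converted back to an integer before comparing. The helper names below are hypothetical, and the sketch assumes positive integer IDs.

```python
from typing import Optional


def rows_after(rows: list[dict], watermark: Optional[str]) -> list[dict]:
    """Return rows whose integer id is greater than the stored watermark."""
    # Assumes ids are positive, so 0 means "nothing loaded yet".
    last_id = int(watermark) if watermark is not None else 0
    return [row for row in rows if row["id"] > last_id]


def next_watermark(rows: list[dict], previous: Optional[str]) -> Optional[str]:
    """Advance to the highest id seen, or keep the previous value if nothing is new."""
    if not rows:
        return previous
    return str(max(row["id"] for row in rows))


rows = [{"id": 9}, {"id": 10}, {"id": 11}]
batch = rows_after(rows, "9")
watermark = next_watermark(batch, "9")

# Comparing the stored strings directly would get the order wrong:
string_comparison_is_wrong = not ("10" > "9")
```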
Watermark reliability
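Timestamps can be unreliable when producer and consumer clocks are skewed, or when rows are committed some time after their updated_at value was assigned. Add a small overlap (updated_at >= watermark - 5 minutes) to avoid missing recently written records. Because the overlap re-extracts some rows, the load step should be idempotent (for example, an upsert on the primary key).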
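A minimal sketch of the overlap window, assuming ISO 8601 watermarks and rows that carry `id` and `updated_at` fields. The `extract_with_overlap` function and the `seen` set of already loaded (id, updated_at) pairs are hypothetical; in practice an upsert in the load step serves the same purpose as the in-memory de-duplication shown here.

```python
from datetime import datetime, timedelta

OVERLAP = timedelta(minutes=5)


def extract_with_overlap(rows, watermark, seen):
    """Select rows at or after (watermark - OVERLAP), skipping versions already loaded."""
    cutoff = datetime.fromisoformat(watermark) - OVERLAP
    picked = []
    for row in rows:
        version = (row["id"], row["updated_at"])
        if datetime.fromisoformat(row["updated_at"]) >= cutoff and version not in seen:
            picked.append(row)
    return picked


rows = [
    {"id": 1, "updated_at": "2024-06-01T23:50:00"},  # before the overlap window
    {"id": 2, "updated_at": "2024-06-01T23:57:00"},  # in the window, already loaded
    {"id": 3, "updated_at": "2024-06-01T23:58:00"},  # in the window, arrived late
    {"id": 4, "updated_at": "2024-06-02T00:10:00"},  # after the watermark
]
seen = {(2, "2024-06-01T23:57:00")}
picked = extract_with_overlap(rows, "2024-06-02T00:00:00", seen)
```

Keying the de-duplication on the (id, updated_at) pair rather than on the id alone means a row that was modified again after being loaded is still picked up.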
FileStateStore vs SqliteStateStore
| Attribute | FileStateStore | SqliteStateStore |
|---|---|---|
| Storage | JSON files | SQLite table |
| Concurrent access | Not safe (no file locking) | Safe (SQLite WAL mode) |
| Multiple pipelines | One file each | One table, one row each |
| Portability | Copy/move files | Copy SQLite file |
| Dependencies | None | sqlite3 (stdlib) |
| Best for | Local dev, single process | Scheduled jobs, multi-pipeline |
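The "one table, one row each" layout with WAL mode can be sketched using the standard library's `sqlite3` module. `SketchSqliteStore`, its table name, and its columns are hypothetical and simplified; the actual schema used by SqliteStateStore is not described in this guide.

```python
import os
import sqlite3
import tempfile
from typing import Optional


class SketchSqliteStore:
    """Illustrative store: one table, one row per pipeline (not PyCharter's class)."""

    def __init__(self, path: str) -> None:
        self.conn = sqlite3.connect(path)
        # WAL lets readers proceed while a writer is active (file-backed databases).
        self.conn.execute("PRAGMA journal_mode=WAL")
        self.conn.execute(
            "CREATE TABLE IF NOT EXISTS pipeline_state ("
            "pipeline_name TEXT PRIMARY KEY, watermark TEXT, run_count INTEGER NOT NULL)"
        )
        self.conn.commit()

    def save(self, pipeline_name: str, watermark: Optional[str], run_count: int) -> None:
        # Insert the row, or replace it if this pipeline is already tracked.
        with self.conn:  # commits on success, rolls back on error
            self.conn.execute(
                "INSERT OR REPLACE INTO pipeline_state "
                "(pipeline_name, watermark, run_count) VALUES (?, ?, ?)",
                (pipeline_name, watermark, run_count),
            )

    def get(self, pipeline_name: str):
        # Returns (watermark, run_count), or None if the pipeline is unknown.
        return self.conn.execute(
            "SELECT watermark, run_count FROM pipeline_state WHERE pipeline_name = ?",
            (pipeline_name,),
        ).fetchone()


path = os.path.join(tempfile.mkdtemp(), "state.db")
store = SketchSqliteStore(path)
store.save("orders", "2024-06-01T00:00:00", 1)
store.save("orders", "2024-06-02T00:00:00", 2)  # overwrites the existing row
row = store.get("orders")
```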
See also
- Building ETL Pipelines
- Async Execution Model
- Testing Pipelines: test incremental logic with MockExtractor
- PipelineState API reference