# Validation Worker
Run validation jobs asynchronously in a separate process. The worker polls the database for pending jobs, claims them atomically, runs data quality validation, and updates job status—no message broker required.
## Overview

- Database-backed queue: Jobs live in the `validation_jobs` table. No Redis or other broker.
- Multiple workers: You can run several worker processes; each job is claimed by exactly one worker.
- Same database: The API (or any process with DB access) enqueues jobs; workers use the same database URL.
## When to use the worker

- Non-blocking validation: The API returns immediately with a `job_id`; clients poll for status.
- Heavy or slow checks: Offload validation to background processes so the API stays responsive.
- Scaling: Add more worker processes to increase throughput.
## Prerequisites

- A database (PostgreSQL or SQLite) with the PyCharter schema applied (`pycharter db init` or `pycharter db upgrade`).
- The `validation_jobs` table exists (created by migrations).
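If you use SQLite, one way to confirm that the migrations created the table is to look it up in `sqlite_master`. The sketch below uses only Python's standard library and is not a PyCharter command; on PostgreSQL you would query `information_schema.tables` instead.

```python
import sqlite3


def has_validation_jobs_table(conn: sqlite3.Connection) -> bool:
    """Return True if a table named validation_jobs exists in this SQLite database."""
    row = conn.execute(
        "SELECT 1 FROM sqlite_master WHERE type = 'table' AND name = ?",
        ("validation_jobs",),
    ).fetchone()
    return row is not None
```

Usage: `has_validation_jobs_table(sqlite3.connect("pycharter.db"))`, where `pycharter.db` is a placeholder for the file named in your SQLite database URL.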
## Running the worker

### 1. Set the database URL
The worker uses the same configuration as the rest of PyCharter. Set one of:
- Environment: `PYCHARTER_DATABASE_URL` or `PYCHARTER_WORKER_DB_URL`
- Config file: `pycharter.cfg` or project config with a `[database]` section
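Beyond `PYCHARTER_WORKER_DB_URL` overriding the shared URL for the worker, the exact precedence between these sources is not spelled out here. A minimal sketch of such a lookup, assuming environment variables win over the config file and assuming the `[database]` section stores the URL under a key named `url` (a hypothetical key name), could look like this:

```python
import configparser
import os
from typing import Mapping, Optional


def resolve_worker_db_url(
    env: Mapping[str, str] = os.environ,
    cfg_path: str = "pycharter.cfg",
) -> Optional[str]:
    """Pick a database URL for the worker, most specific source first.

    Assumed order: worker-only env var, shared env var, then the config file.
    The "url" key inside [database] is a hypothetical name, not a documented one.
    """
    for var in ("PYCHARTER_WORKER_DB_URL", "PYCHARTER_DATABASE_URL"):
        value = env.get(var)
        if value:
            return value
    parser = configparser.ConfigParser()
    parser.read(cfg_path)  # a missing file is silently skipped
    return parser.get("database", "url", fallback=None)
```

A `None` result corresponds to the "No database URL" case described under Troubleshooting.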
Verify connectivity with `pycharter db current`.
### 2. Start the worker process

Run `pycharter worker start`. The command accepts these options:
| Option | Description | Default |
|---|---|---|
| `--concurrency` | Number of concurrent poller tasks | 5 |
| `--poll-interval` | Poll interval in milliseconds | 500 |
Examples:

```bash
# Fewer pollers, slower polling
pycharter worker start --concurrency 2 --poll-interval 1000

# More pollers for high throughput
pycharter worker start --concurrency 10
```
Stop the worker with Ctrl+C. It finishes in-flight jobs, waiting up to the drain timeout (`drain_timeout_s`, 30 seconds by default), and then exits.
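The concurrency, poll interval, and drain timeout fit a common asyncio pattern: N poller tasks that sleep only when nothing is claimable, plus a bounded wait for in-flight work on shutdown. The sketch below illustrates that pattern under stated assumptions and is not PyCharter's worker code; `claim` and `run` are hypothetical callables standing in for the database claim and the quality check, and the `stop` event stands in for Ctrl+C.

```python
import asyncio
from typing import Awaitable, Callable, List, Optional

# Hypothetical stand-ins: claim() atomically claims a job ID (or returns None);
# run(job_id) performs the validation and records the outcome.
ClaimFn = Callable[[], Awaitable[Optional[str]]]
RunFn = Callable[[str], Awaitable[None]]


async def poller(stop: asyncio.Event, claim: ClaimFn, run: RunFn, poll_interval_ms: int) -> None:
    """Claim and run jobs until stop is set; idle only when nothing is claimable."""
    while not stop.is_set():
        job_id = await claim()
        if job_id is None:
            try:
                # Sleep for one poll interval, but wake early if shutdown starts.
                await asyncio.wait_for(stop.wait(), poll_interval_ms / 1000)
            except asyncio.TimeoutError:
                pass
            continue
        await run(job_id)  # an in-flight job completes before stop is re-checked


async def run_worker(
    stop: asyncio.Event,
    claim: ClaimFn,
    run: RunFn,
    concurrency: int = 5,
    poll_interval_ms: int = 500,
    drain_timeout_s: float = 30.0,
) -> None:
    """Start `concurrency` pollers, then drain them for up to drain_timeout_s after stop."""
    tasks = [
        asyncio.create_task(poller(stop, claim, run, poll_interval_ms))
        for _ in range(concurrency)
    ]
    await stop.wait()
    _, still_running = await asyncio.wait(tasks, timeout=drain_timeout_s)
    for task in still_running:
        task.cancel()  # drain timeout exceeded: abandon these jobs
    await asyncio.gather(*still_running, return_exceptions=True)


async def demo() -> List[str]:
    """Process three fake jobs with two pollers, then shut down."""
    queue, processed = ["job-1", "job-2", "job-3"], []
    stop = asyncio.Event()

    async def claim() -> Optional[str]:
        return queue.pop(0) if queue else None

    async def run(job_id: str) -> None:
        processed.append(job_id)
        if not queue:
            stop.set()  # stands in for Ctrl+C once the queue is empty

    await run_worker(stop, claim, run, concurrency=2, poll_interval_ms=10, drain_timeout_s=1.0)
    return processed
```

The `wait_for(stop.wait(), ...)` idle wait means a stop request does not have to sit through a full poll interval before the poller notices it.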
## Submitting jobs

### From the API
If the API is running and the database is shared with the worker:
- Submit: `POST /api/v1/validation/jobs` with a JSON body (contract name/version, data source, options).
- Status: `GET /api/v1/validation/jobs/{job_id}`.
- Cancel: `POST /api/v1/validation/jobs/{job_id}/cancel` (if supported).
See the REST API or Swagger UI for the exact request/response shapes.
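A standard-library client for the submit and status endpoints might look like the sketch below. The base URL `http://localhost:8000` is an assumption, and the JSON field names simply mirror the Python arguments; they are hypothetical, so check Swagger UI for the real request schema.

```python
import json
import urllib.request

BASE_URL = "http://localhost:8000"  # assumption: wherever your API is served


def build_submit_request(body: dict, base_url: str = BASE_URL) -> urllib.request.Request:
    """Prepare POST /api/v1/validation/jobs with a JSON body."""
    return urllib.request.Request(
        f"{base_url}/api/v1/validation/jobs",
        data=json.dumps(body).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def build_status_request(job_id: str, base_url: str = BASE_URL) -> urllib.request.Request:
    """Prepare GET /api/v1/validation/jobs/{job_id}."""
    return urllib.request.Request(f"{base_url}/api/v1/validation/jobs/{job_id}", method="GET")


def send(req: urllib.request.Request) -> dict:
    """Send a prepared request and decode the JSON response."""
    with urllib.request.urlopen(req, timeout=10) as resp:
        return json.loads(resp.read().decode("utf-8"))


# Hypothetical field names that mirror submit_job's arguments.
example_body = {
    "contract_name": "user_schema",
    "contract_version": "1.0",
    "data_source": "/path/to/data.json",
    "options": {"record_violations": True},
}
```

Usage against a running API: `send(build_submit_request(example_body))`, then `send(build_status_request(job_id))` with the returned job ID.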
### From Python
Use the worker package to enqueue jobs from any process that has database access:
```python
from pycharter.worker import submit_job_sync

job_id = submit_job_sync(
    contract_name="user_schema",
    contract_version="1.0",
    data_source="/path/to/data.json",
    options={"record_violations": True},  # optional
)
print(f"Job submitted: {job_id}")
```
Async variant:
```python
import asyncio

from pycharter.worker import get_job, submit_job


async def main() -> None:
    job_id = await submit_job(
        contract_name="user_schema",
        contract_version="1.0",
        data_source="s3://bucket/data/users.parquet",
        options={"include_profiling": True},
    )
    # Later: check status
    status = await get_job(job_id)
    print(status)


asyncio.run(main())
```
Optional arguments for `submit_job` / `submit_job_sync`:

- `options`: Dict passed to the validation run (e.g. `record_violations`, `calculate_metrics`).
- `job_id`: Custom job ID (otherwise auto-generated).
- `idempotency_key`: Deduplication key; duplicate submits return the existing job ID.
- `max_attempts`: Max processing attempts before the job is marked failed (default `5`).
- `db_url`: Override database URL (default: from config/env).
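The `idempotency_key` behavior can be pictured as a unique column plus an insert that ignores conflicts. This is a SQLite sketch over a hypothetical minimal schema, not PyCharter's implementation; on PostgreSQL the equivalent insert would use `ON CONFLICT (idempotency_key) DO NOTHING`.

```python
import sqlite3
import uuid
from typing import Optional

# Hypothetical minimal schema; the real validation_jobs table has more columns.
SCHEMA = """
CREATE TABLE IF NOT EXISTS validation_jobs (
    id TEXT PRIMARY KEY,
    status TEXT NOT NULL DEFAULT 'pending',
    idempotency_key TEXT UNIQUE
)
"""


def enqueue(conn: sqlite3.Connection, idempotency_key: Optional[str] = None) -> str:
    """Insert a pending job, or return the existing job ID for a repeated key."""
    job_id = str(uuid.uuid4())
    with conn:  # commits the insert
        # A repeated non-NULL key violates UNIQUE, so the row is silently skipped.
        conn.execute(
            "INSERT OR IGNORE INTO validation_jobs (id, idempotency_key) VALUES (?, ?)",
            (job_id, idempotency_key),
        )
    if idempotency_key is None:
        return job_id  # NULL keys never collide, so every call creates a job
    row = conn.execute(
        "SELECT id FROM validation_jobs WHERE idempotency_key = ?", (idempotency_key,)
    ).fetchone()
    return row[0]
```

Because SQLite treats NULLs as distinct under `UNIQUE`, submissions without a key are never deduplicated.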
## Architecture

```mermaid
flowchart LR
    API[API / Client] --> DB[(validation_jobs)]
    DB --> W1[Worker 1]
    DB --> W2[Worker 2]
    W1 --> DB
    W2 --> DB
```
- Producer: The API, or any code that calls `submit_job` (or the REST endpoint), inserts a row into `validation_jobs` with status `pending`.
- Worker: Each worker process runs several poller tasks. A poller selects a `pending` job and claims it with an atomic update that sets the status to `claimed` and records `claimed_by` and `claimed_at`. It then runs validation through PyCharter’s quality check.
- Result: On success, the worker marks the job `completed` and stores the result. On failure, the job may be retried until it reaches `max_attempts`, after which it is marked `failed`.
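An atomic claim is often written as a compare-and-set: pick a candidate, then update it only if it is still `pending`, and treat "zero rows updated" as losing the race to another poller. The SQLite sketch below shows that technique over a hypothetical minimal schema (`created_at` is an illustrative column); it is not PyCharter's claim query. On PostgreSQL, `SELECT ... FOR UPDATE SKIP LOCKED` is the usual way to keep concurrent pollers from contending for the same row.

```python
import sqlite3
from datetime import datetime, timezone
from typing import Optional

# Hypothetical minimal schema; created_at is illustrative, used only for ordering.
SCHEMA = """
CREATE TABLE IF NOT EXISTS validation_jobs (
    id TEXT PRIMARY KEY,
    status TEXT NOT NULL DEFAULT 'pending',
    claimed_by TEXT,
    claimed_at TEXT,
    created_at TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP
)
"""


def claim_next(conn: sqlite3.Connection, worker_id: str) -> Optional[str]:
    """Claim the oldest pending job for worker_id; return its ID, or None if none are pending."""
    while True:
        row = conn.execute(
            "SELECT id FROM validation_jobs WHERE status = 'pending' "
            "ORDER BY created_at, id LIMIT 1"
        ).fetchone()
        if row is None:
            return None  # queue is empty
        with conn:
            # The status = 'pending' guard makes this a compare-and-set.
            cur = conn.execute(
                "UPDATE validation_jobs SET status = 'claimed', claimed_by = ?, claimed_at = ? "
                "WHERE id = ? AND status = 'pending'",
                (worker_id, datetime.now(timezone.utc).isoformat(), row[0]),
            )
        if cur.rowcount == 1:
            return row[0]  # this poller won the claim
        # Another poller claimed it first; loop and try the next candidate.
```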
## Configuration summary

| Source | Purpose |
|---|---|
| `PYCHARTER_DATABASE_URL` | Default database URL (worker and API). |
| `PYCHARTER_WORKER_DB_URL` | Override database URL for the worker only. |
| `--concurrency` | Number of poller tasks per process. |
| `--poll-interval` | Milliseconds between polls when no job is available. |
Internal defaults (in `WorkerConfig`): `poll_interval_ms=500`, `concurrency=5`, `drain_timeout_s=30`, `max_attempts=5`, `claim_lease_timeout_s=300`.
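The dataclass below is a hypothetical mirror of those defaults (it is not the real `WorkerConfig`). It also sketches what `claim_lease_timeout_s` presumably means: a claim older than the lease timeout is treated as abandoned. That interpretation is an assumption based on the name, not documented behavior.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta


@dataclass(frozen=True)
class WorkerSettings:
    """Hypothetical mirror of the documented WorkerConfig defaults."""

    poll_interval_ms: int = 500  # --poll-interval
    concurrency: int = 5  # --concurrency
    drain_timeout_s: int = 30
    max_attempts: int = 5
    claim_lease_timeout_s: int = 300

    def lease_expired(self, claimed_at: datetime, now: datetime) -> bool:
        """True if a claim is older than the lease timeout (assumed meaning of the setting)."""
        return now - claimed_at > timedelta(seconds=self.claim_lease_timeout_s)
```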
## Data sources

Validation runs use the same data source support as `QualityCheck`: local files, S3-style URLs, or other backends your quality check is configured for. Pass the same `data_source` string you would use for a synchronous quality check.
## Troubleshooting

### Worker won’t start
- "No database URL": Set `PYCHARTER_DATABASE_URL` or `PYCHARTER_WORKER_DB_URL`, or configure the URL in `pycharter.cfg`.
- Connection errors: Ensure the database is reachable and migrations are applied. Run `pycharter db current` to check.
### Jobs stay pending

- Confirm at least one worker is running (`pycharter worker start`).
- Check worker logs for errors (e.g. validation failures, a missing contract, or data source issues).
- Verify that the process submitting jobs and the worker use the same database URL.
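A quick queue summary helps tell these cases apart: many `pending` rows and no `claimed` rows usually means no worker is reading this database. The sketch below assumes SQLite and the `status` column and values described in the Architecture section; for PostgreSQL, run the same `GROUP BY` query through your usual client.

```python
import sqlite3
from typing import Dict


def job_counts_by_status(conn: sqlite3.Connection) -> Dict[str, int]:
    """Summarize the queue: how many validation_jobs rows are in each status."""
    rows = conn.execute(
        "SELECT status, COUNT(*) FROM validation_jobs GROUP BY status ORDER BY status"
    ).fetchall()
    return {status: count for status, count in rows}
```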
### Retries and failures

- Each job has `max_attempts` (default 5). Once a job has used all its attempts, it is marked failed.
- Inspect the job row, or use `get_job(job_id)` or the REST status endpoint, to see the error message and result.
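The attempt accounting can be sketched as a single `UPDATE` that increments a counter and chooses between requeueing and failing. The `attempts` and `error` columns are hypothetical names used only for illustration, and requeueing to `pending` is an assumption about how retries work; this is not PyCharter's schema or code.

```python
import sqlite3

# Hypothetical schema; "attempts" and "error" are illustrative column names.
SCHEMA = """
CREATE TABLE IF NOT EXISTS validation_jobs (
    id TEXT PRIMARY KEY,
    status TEXT NOT NULL DEFAULT 'pending',
    attempts INTEGER NOT NULL DEFAULT 0,
    max_attempts INTEGER NOT NULL DEFAULT 5,
    error TEXT
)
"""


def record_failure(conn: sqlite3.Connection, job_id: str, error: str) -> str:
    """Count one failed attempt; requeue the job or mark it failed. Returns the new status."""
    with conn:
        # Right-hand sides see the pre-update values, so attempts + 1 is the new count.
        conn.execute(
            "UPDATE validation_jobs SET attempts = attempts + 1, error = ?, "
            "status = CASE WHEN attempts + 1 >= max_attempts THEN 'failed' ELSE 'pending' END "
            "WHERE id = ?",
            (error, job_id),
        )
    return conn.execute("SELECT status FROM validation_jobs WHERE id = ?", (job_id,)).fetchone()[0]
```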
## See also
- Quality Check API — synchronous validation (same logic the worker runs).
- REST API — validation job endpoints.
- Production Deployment — running the worker in production.