Validation Worker

Run validation jobs asynchronously in a separate process. The worker polls the database for pending jobs, claims them atomically, runs data quality validation, and updates job status. No message broker is required.

Overview

  • Database-backed queue: Jobs live in the validation_jobs table. No Redis or other broker.
  • Multiple workers: You can run several worker processes; each job is claimed by exactly one worker.
  • Same database: The API (or any process with DB access) enqueues jobs; workers use the same database URL.

When to use the worker

  • Non-blocking validation: API returns immediately with a job_id; clients poll for status.
  • Heavy or slow checks: Offload validation to background processes so the API stays responsive.
  • Scaling: Add more worker processes to increase throughput.

Prerequisites

  • Database configured (PostgreSQL or SQLite) with PyCharter schema applied (pycharter db init or pycharter db upgrade).
  • The validation_jobs table exists (created by migrations).

Running the worker

1. Set the database URL

The worker uses the same configuration as the rest of PyCharter. Set one of:

  • Environment: PYCHARTER_DATABASE_URL or PYCHARTER_WORKER_DB_URL
  • Config file: pycharter.cfg or project config with a [database] section

Verify connectivity:

pycharter db current

2. Start the worker process

# Default: 5 concurrent pollers, 500 ms poll interval
pycharter worker start

CLI options:

Option           Description                         Default
--concurrency    Number of concurrent poller tasks   5
--poll-interval  Poll interval in milliseconds       500

Examples:

# Fewer pollers, slower polling
pycharter worker start --concurrency 2 --poll-interval 1000

# More pollers for high throughput
pycharter worker start --concurrency 10

Stop the worker with Ctrl+C. It finishes in-flight jobs, waiting up to the drain timeout (drain_timeout_s, 30 seconds by default), and then exits.
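
To make the roles of --concurrency, --poll-interval, and the drain timeout concrete, here is a minimal asyncio sketch of a polling worker. It is not PyCharter's implementation: run_worker, poller, fetch_job, and handle_job are hypothetical names, and the stop event stands in for whatever the real worker does when it receives Ctrl+C.

```python
import asyncio
from typing import Any, Awaitable, Callable, List, Optional

FetchJob = Callable[[], Awaitable[Optional[Any]]]
HandleJob = Callable[[Any], Awaitable[None]]


async def poller(fetch_job: FetchJob, handle_job: HandleJob,
                 stop: asyncio.Event, poll_interval_ms: int) -> None:
    """One poller task: take a job if available, otherwise sleep and retry."""
    while not stop.is_set():
        job = await fetch_job()
        if job is None:
            # Queue is empty: wait one poll interval, waking early on shutdown.
            try:
                await asyncio.wait_for(stop.wait(), timeout=poll_interval_ms / 1000)
            except asyncio.TimeoutError:
                pass
            continue
        await handle_job(job)


async def run_worker(fetch_job: FetchJob, handle_job: HandleJob, stop: asyncio.Event,
                     concurrency: int = 5, poll_interval_ms: int = 500,
                     drain_timeout_s: float = 30.0) -> None:
    """Run `concurrency` pollers until `stop` is set, then drain in-flight jobs."""
    tasks = [
        asyncio.create_task(poller(fetch_job, handle_job, stop, poll_interval_ms))
        for _ in range(concurrency)
    ]
    await stop.wait()
    # Drain: give in-flight jobs up to drain_timeout_s, then cancel stragglers.
    _, pending = await asyncio.wait(tasks, timeout=drain_timeout_s)
    for task in pending:
        task.cancel()
    if pending:
        await asyncio.gather(*pending, return_exceptions=True)


async def demo() -> List[str]:
    queue = ["job-1", "job-2", "job-3"]  # in-memory stand-in for validation_jobs
    done: List[str] = []
    stop = asyncio.Event()

    async def fetch_job() -> Optional[str]:
        return queue.pop(0) if queue else None

    async def handle_job(job: str) -> None:
        await asyncio.sleep(0.01)  # pretend to validate
        done.append(job)
        if len(done) == 3:
            stop.set()  # simulate Ctrl+C once all jobs are processed

    await run_worker(fetch_job, handle_job, stop,
                     concurrency=2, poll_interval_ms=10, drain_timeout_s=1.0)
    return sorted(done)


processed = asyncio.run(demo())
```

In this sketch a lower poll interval reduces pickup latency at the cost of more frequent queries, while higher concurrency lets one process work on several jobs at once.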

Submitting jobs

From the API

If the API is running and the database is shared with the worker:

  • Submit: POST /api/v1/validation/jobs with JSON body (contract name/version, data source, options).
  • Status: GET /api/v1/validation/jobs/{job_id}.
  • Cancel: POST /api/v1/validation/jobs/{job_id}/cancel (if supported).

See the REST API or Swagger UI for the exact request/response shapes.

From Python

Use the worker package to enqueue jobs from any process that has database access:

from pycharter.worker import submit_job_sync

job_id = submit_job_sync(
    contract_name="user_schema",
    contract_version="1.0",
    data_source="/path/to/data.json",
    options={"record_violations": True},  # optional
)
print(f"Job submitted: {job_id}")

Async variant:

import asyncio

from pycharter.worker import submit_job, get_job


async def main():
    job_id = await submit_job(
        contract_name="user_schema",
        contract_version="1.0",
        data_source="s3://bucket/data/users.parquet",
        options={"include_profiling": True},
    )

    # Later: check status
    status = await get_job(job_id)
    print(status)


asyncio.run(main())
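
Clients usually need to check status repeatedly until the job finishes. The helper below is a sketch of such a loop. It assumes only the status names used in this document (pending, claimed, completed, failed). wait_for_terminal_status and fetch_status are hypothetical: the return shape of get_job is not shown here, so the caller supplies a function that extracts the status string. If your deployment supports cancellation, the set of terminal statuses may need extending.

```python
import asyncio
import time
from typing import Awaitable, Callable

# Statuses after which polling stops; extend if your deployment has others.
TERMINAL_STATUSES = {"completed", "failed"}


async def wait_for_terminal_status(
    fetch_status: Callable[[], Awaitable[str]],
    poll_interval_s: float = 1.0,
    timeout_s: float = 60.0,
) -> str:
    """Poll fetch_status() until it returns a terminal status or time runs out."""
    deadline = time.monotonic() + timeout_s
    while True:
        status = await fetch_status()
        if status in TERMINAL_STATUSES:
            return status
        if time.monotonic() >= deadline:
            raise TimeoutError(f"job still {status!r} after {timeout_s} s")
        await asyncio.sleep(poll_interval_s)


async def demo() -> str:
    # Fake status source standing in for get_job(job_id) or the REST endpoint.
    statuses = iter(["pending", "claimed", "completed"])

    async def fake_fetch_status() -> str:
        return next(statuses)

    return await wait_for_terminal_status(
        fake_fetch_status, poll_interval_s=0.01, timeout_s=1.0
    )


final_status = asyncio.run(demo())
```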

Optional arguments for submit_job / submit_job_sync:

  • options: Dict passed to the validation run (e.g. record_violations, calculate_metrics).
  • job_id: Custom job ID (otherwise auto-generated).
  • idempotency_key: Deduplication key; duplicate submits return the existing job ID.
  • max_attempts: Max processing attempts before the job is marked failed (default 5).
  • db_url: Override database URL (default: from config/env).
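
One way to use idempotency_key is to derive it deterministically from the job's inputs, so that a retried submit of the same work maps to the same key. The sketch below shows one possible derivation. make_idempotency_key is a hypothetical helper, not part of PyCharter, and hashing these three fields is only one reasonable choice.

```python
import hashlib


def make_idempotency_key(contract_name: str, contract_version: str, data_source: str) -> str:
    """Derive a stable key from the inputs that define 'the same job'."""
    # Join with the ASCII unit separator so ("a", "bc") and ("ab", "c") differ.
    payload = "\x1f".join([contract_name, contract_version, data_source])
    return hashlib.sha256(payload.encode("utf-8")).hexdigest()


key_a = make_idempotency_key("user_schema", "1.0", "/path/to/data.json")
key_b = make_idempotency_key("user_schema", "1.0", "/path/to/data.json")
key_c = make_idempotency_key("user_schema", "1.1", "/path/to/data.json")

# Hypothetical usage with the documented argument:
# job_id = submit_job_sync(
#     contract_name="user_schema",
#     contract_version="1.0",
#     data_source="/path/to/data.json",
#     idempotency_key=key_a,
# )
```

With this derivation, identical inputs always produce the same key. If the same file should be validated again later, add something that changes between runs (for example a batch date) to the hashed fields.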

Architecture

flowchart LR
    API[API / Client] --> DB[(validation_jobs)]
    DB --> W1[Worker 1]
    DB --> W2[Worker 2]
    W1 --> DB
    W2 --> DB

  1. Producer: API or any code calls submit_job (or the REST endpoint), which inserts a row into validation_jobs with status pending.
  2. Worker: Each worker process runs several poller tasks. A poller selects a pending job, claims it (atomic update to claimed with claimed_by and claimed_at), then runs validation via PyCharter’s quality check.
  3. Result: On success, the worker marks the job completed and stores the result. On failure, the job may be retried; once max_attempts is exhausted, it is marked failed.
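
The claim in step 2 is what guarantees that each job runs on exactly one worker. The sketch below illustrates the general technique, a guarded UPDATE, using SQLite from the standard library. It is not PyCharter's code: claim_next_job is a hypothetical function, and the table here is a simplified stand-in that assumes only the columns named above plus a hypothetical created_at column for ordering.

```python
import sqlite3
from datetime import datetime, timezone
from typing import Optional


def claim_next_job(conn: sqlite3.Connection, worker_id: str) -> Optional[str]:
    """Claim the oldest pending job for worker_id; return its id, or None."""
    now = datetime.now(timezone.utc).isoformat()
    with conn:  # commit on success, roll back on error
        row = conn.execute(
            "SELECT id FROM validation_jobs WHERE status = 'pending' "
            "ORDER BY created_at, id LIMIT 1"
        ).fetchone()
        if row is None:
            return None
        # The status guard makes the claim atomic: if another worker claimed
        # this row after our SELECT, rowcount is 0 and we report no claim.
        cur = conn.execute(
            "UPDATE validation_jobs "
            "SET status = 'claimed', claimed_by = ?, claimed_at = ? "
            "WHERE id = ? AND status = 'pending'",
            (worker_id, now, row[0]),
        )
        return row[0] if cur.rowcount == 1 else None


# Simplified stand-in schema (hypothetical; the real table has more columns).
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE validation_jobs ("
    "id TEXT PRIMARY KEY, status TEXT NOT NULL, created_at TEXT NOT NULL, "
    "claimed_by TEXT, claimed_at TEXT)"
)
conn.executemany(
    "INSERT INTO validation_jobs (id, status, created_at) VALUES (?, 'pending', ?)",
    [("job-1", "2024-01-01T00:00:00"), ("job-2", "2024-01-01T00:00:01")],
)
conn.commit()

first = claim_next_job(conn, "worker-1")
second = claim_next_job(conn, "worker-2")
third = claim_next_job(conn, "worker-1")  # nothing left to claim
owner_of_first = conn.execute(
    "SELECT claimed_by FROM validation_jobs WHERE id = 'job-1'"
).fetchone()[0]
```

A poller that loses the race simply gets None and tries again on its next poll. On PostgreSQL the same effect is often achieved with SELECT ... FOR UPDATE SKIP LOCKED; which approach PyCharter uses is not stated here.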

Configuration summary

Source                    Purpose
PYCHARTER_DATABASE_URL    Default database URL (worker and API).
PYCHARTER_WORKER_DB_URL   Override database URL for the worker only.
--concurrency             Number of poller tasks per process.
--poll-interval           Milliseconds between polls when no job is available.

Internal defaults (in WorkerConfig): poll_interval_ms=500, concurrency=5, drain_timeout_s=30, max_attempts=5, claim_lease_timeout_s=300.
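
The settings above can be pictured as a small configuration object plus a lookup for the database URL. The sketch below is illustrative only. WorkerSettings is a hypothetical stand-in that mirrors the documented defaults, not the real WorkerConfig class. resolve_worker_db_url assumes the worker-specific variable takes precedence over the general one, and it does not model the config-file fallback.

```python
import os
from dataclasses import dataclass
from typing import Mapping, Optional


@dataclass(frozen=True)
class WorkerSettings:
    """Hypothetical mirror of the documented WorkerConfig defaults."""
    poll_interval_ms: int = 500
    concurrency: int = 5
    drain_timeout_s: int = 30
    max_attempts: int = 5
    claim_lease_timeout_s: int = 300


def resolve_worker_db_url(env: Mapping[str, str] = os.environ) -> Optional[str]:
    """Pick the worker's database URL from environment variables.

    Assumption: PYCHARTER_WORKER_DB_URL wins over PYCHARTER_DATABASE_URL.
    Returns None if neither is set (a config file might still provide one).
    """
    return env.get("PYCHARTER_WORKER_DB_URL") or env.get("PYCHARTER_DATABASE_URL")


defaults = WorkerSettings()
tuned = WorkerSettings(concurrency=10, poll_interval_ms=1000)

shared_only = resolve_worker_db_url({"PYCHARTER_DATABASE_URL": "sqlite:///shared.db"})
with_override = resolve_worker_db_url({
    "PYCHARTER_DATABASE_URL": "sqlite:///shared.db",
    "PYCHARTER_WORKER_DB_URL": "sqlite:///worker.db",
})
unset = resolve_worker_db_url({})
```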

Data sources

Validation runs use the same data source support as QualityCheck: local files, S3-style URLs, or other backends your quality check is configured for. Pass the same data_source string you would use for a synchronous quality check.

Troubleshooting

Worker won’t start

  • "No database URL": Set PYCHARTER_DATABASE_URL or PYCHARTER_WORKER_DB_URL (or configure in pycharter.cfg).
  • Connection errors: Ensure the database is reachable and migrations are applied. Run pycharter db current to check both.

Jobs stay pending

  • Confirm at least one worker is running (pycharter worker start).
  • Check worker logs for errors (e.g. validation failures, missing contract, or data source issues).
  • Verify the same database URL is used by the process that submits jobs and the worker.

Retries and failures

  • Each job has max_attempts (default 5). Once that many attempts have failed, the job is marked failed.
  • Inspect the job row or use get_job(job_id) / the REST endpoint to see error message and result.

See also