Metadata-Version: 2.4
Name: django-vtasks
Version: 0.1.1
Summary: A very fast valkey/postgres django tasks backend.
Project-URL: Source, https://gitlab.com/glitchtip/django-vtasks
Project-URL: Tracker, https://gitlab.com/glitchtip/django-vtasks/-/issues
License-Expression: MIT
License-File: LICENSE
Keywords: background,django,postgres,tasks,valkey
Classifier: Framework :: Django
Classifier: License :: OSI Approved :: MIT License
Classifier: Operating System :: OS Independent
Classifier: Programming Language :: Python :: 3
Classifier: Programming Language :: Python :: 3.12
Classifier: Programming Language :: Python :: 3.13
Classifier: Programming Language :: Python :: 3.14
Requires-Python: >=3.12
Requires-Dist: croniter>=2.0.4
Requires-Dist: django==6.0rc1
Requires-Dist: orjson>=3.11.4
Requires-Dist: zstandard>=0.15.0; python_version < '3.14'
Provides-Extra: dev
Requires-Dist: celery; extra == 'dev'
Requires-Dist: dj-database-url; extra == 'dev'
Requires-Dist: django-redis; extra == 'dev'
Requires-Dist: django-valkey; extra == 'dev'
Requires-Dist: granian[reload,uvloop]>=2.6.0; extra == 'dev'
Requires-Dist: hiredis; extra == 'dev'
Requires-Dist: memray>=1.19.1; extra == 'dev'
Requires-Dist: mypy; extra == 'dev'
Requires-Dist: psutil; extra == 'dev'
Requires-Dist: redis; extra == 'dev'
Requires-Dist: rich; extra == 'dev'
Requires-Dist: ruff; extra == 'dev'
Provides-Extra: postgres
Requires-Dist: psycopg; extra == 'postgres'
Provides-Extra: valkey
Requires-Dist: valkey[libvalkey]>=6.1.1; extra == 'valkey'
Description-Content-Type: text/markdown

# django-vtasks

**Valkey Tasks. Very Fast Tasks.**

From the team at [GlitchTip](https://glitchtip.com), `django-vtasks` is a lightweight, async-first task queue for Django 6.0+, designed to bridge the gap between simplicity (Postgres-backed) and raw performance (Valkey-backed).

**Status**: Alpha quality. Don't even think about using it in production.

## When to use django-vtasks

Adopting `django-vtasks` is a good choice when:

*   You are using Django 6.0+ and want to leverage modern `asyncio`.
*   You want a "Hybrid" strategy: Start with Postgres (Less-Infra) and switch to Valkey (High-Throughput) without rewriting task code.
*   You care about efficiency: You want to handle thousands of concurrent I/O-bound tasks in a single process/container.

It might **not** be the ideal option if:

*   You need complex workflow primitives (Chains, Chords, Groups) -> Use [Celery](https://docs.celeryq.dev/en/stable/).
*   You need to store Task Results (this library is strictly Fire-and-Forget).
*   You rely on brokers other than Valkey or Postgres (e.g., SQS, RabbitMQ).

## Features

*   **Async First:** Native `asyncio` worker for high-performance I/O.
*   **Sync Compatibility:** `enqueue()` works in standard Django views and sync code, with `transaction.on_commit` safety for database-backed tasks.
*   **Dual Backends:**
    *   **Postgres:** Simple, zero-infrastructure setup using `SKIP LOCKED`.
    *   **Valkey:** High-throughput, low-latency using atomic `BLMOVE`.
*   **Batch Processing:** Optional, explicit batching for high-throughput queues.
*   **Compression:** Automatic [Zstandard](https://github.com/indygreg/python-zstandard) compression for large payloads (>1KB).
*   **Reliability:** Fail-fast DLQ with capped history for failed tasks.
*   **Lightweight:** Minimal dependencies and a simple, modern codebase.
*   **Admin:** View in-progress and failed tasks via the Django admin interface, even for the Valkey backend.

## Requirements

*   Python 3.12+
*   Django 6.0+
*   Valkey backend requires Valkey 7+ (Redis 7+ is likely to work)
*   Postgres backend requires Postgres 12+

## Installation

```bash
# For Postgres backend
pip install "django-vtasks[postgres]"

# For Valkey backend
pip install "django-vtasks[valkey]"

# For both
pip install "django-vtasks[postgres,valkey]"
```
`orjson` is installed as a core dependency. `zstandard` is also installed as a core dependency on Python versions below 3.14.

## Configuration

### Global Settings

These settings apply to all backends.

-   `VTASKS_VALKEY_PREFIX`: A string prefix for all Valkey keys, to provide namespace isolation. Defaults to `"vt"`. If set, it is automatically normalized to end with a colon (e.g., `"myapp"` becomes `"myapp:"`).
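
The normalization rule can be pictured with a small sketch. This is not the library's code: `normalize_prefix` and `queue_key` are hypothetical helpers, and the `<prefix>q:<queue>` key layout is an assumption based on the `q:default` name used later in this document.

```python
def normalize_prefix(prefix: str) -> str:
    """Return the prefix with exactly one trailing colon (hypothetical helper)."""
    return prefix if prefix.endswith(":") else f"{prefix}:"


def queue_key(prefix: str, queue_name: str) -> str:
    """Build a namespaced queue key. The key layout is an assumption."""
    return f"{normalize_prefix(prefix)}q:{queue_name}"


print(normalize_prefix("myapp"))     # myapp:
print(queue_key("vt", "default"))    # vt:q:default
```

Two applications sharing one Valkey instance would then use different prefixes so their keys never collide.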

### Postgres Backend

```python
# settings.py

INSTALLED_APPS = [
    # ...
    "django_vtasks",
    "django_vtasks.postgres", # Required for the Postgres backend
]

TASKS = {
    "default": {
        "BACKEND": "django_vtasks.backends.postgres.PostgresTaskBackend",
    }
}

# Ensure your DATABASE_URL is configured for Django
```

### Valkey Backend (Standard)

This is the simplest way to configure the Valkey backend.

```python
# settings.py

INSTALLED_APPS = [
    # ...
    "django_vtasks",
]

TASKS = {
    "default": {
        "BACKEND": "django_vtasks.backends.valkey.ValkeyTaskBackend",
        "OPTIONS": {
            "BROKER_URL": "valkey://localhost:6379/0",
        }
    }
}
```

### Valkey Backend (Advanced: Shared Connection Pool)

For applications that already use `valkey-py`, you can share an existing `valkey.asyncio.ConnectionPool` to minimize TCP connections.

When using a shared pool, you **must still provide a `BROKER_URL`**. This is required for synchronous operations like `task.enqueue()` which cannot use an asynchronous connection pool.

```python
# settings.py
import valkey.asyncio as valkey

# Create a shared connection pool
MY_APP_VALKEY_POOL = valkey.ConnectionPool.from_url("valkey://localhost:6379/0")


INSTALLED_APPS = [
    # ...
    "django_vtasks",
]

TASKS = {
    "default": {
        "BACKEND": "django_vtasks.backends.valkey.ValkeyTaskBackend",
        "OPTIONS": {
            # Pass the shared async pool
            "CONNECTION_POOL": MY_APP_VALKEY_POOL,
            # BROKER_URL is still required for sync `enqueue`
            "BROKER_URL": "valkey://localhost:6379/0",
        }
    }
}
```

## Usage

### Defining Tasks
Create a `tasks.py` in your Django app.

```python
# my_app/tasks.py
from django.tasks import task
from my_app.models import User

@task
def send_welcome_email(user_id):
    user = User.objects.get(id=user_id)
    # Your email logic here
    print(f"Sent welcome email to {user.email}")
```

### Enqueueing Tasks

The library bridges sync and async contexts automatically.

```python
# In a sync view
from django.http import HttpResponse
from .models import User
from .tasks import send_welcome_email

def register_sync(request):
    user = User.objects.create(...)
    send_welcome_email.enqueue(user.id) # Safe to use in sync code
    return HttpResponse("User created.")

# In an async view
async def register_async(request):
    user = await User.objects.acreate(...)
    await send_welcome_email.aenqueue(user.id)
    return HttpResponse("User created.")
```

### Bulk Enqueueing

For high-performance scenarios where you need to dispatch a large number of tasks at once, `enqueue_many` and `aenqueue_many` provide an optimized way to send tasks in a single network request per queue.

This is significantly more performant than enqueueing tasks in a loop. The implementation uses Valkey's variadic `LPUSH` and Postgres's `bulk_create` for maximum efficiency.

The methods accept a list of tuples, where each tuple contains `(task_function, args, kwargs)`. The `queue_name` can be specified in the `kwargs` for each task.

```python
# In an async view
from django.http import HttpResponse
from django.tasks import task_backends
from .tasks import process_user, cleanup_job

async def dispatch_many_tasks(request):
    tasks_to_send = [
        # (task, args, kwargs)
        (process_user, (user1.id,), {}),
        (process_user, (user2.id,), {}),
        (cleanup_job, (), {"queue_name": "low_priority"}),
    ]
    
    # Use aenqueue_many in async contexts
    await task_backends["default"].aenqueue_many(tasks_to_send)

    # Use enqueue_many in sync contexts, usually within a transaction
    # from django.db import transaction
    # with transaction.atomic():
    #     task_backends["default"].enqueue_many(tasks_to_send)

    return HttpResponse("Dispatched many tasks.")
```
The tasks are automatically grouped by queue name (`default` and `low_priority` in this example) and sent in optimized batches.
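
To make the grouping step concrete, here is a minimal sketch of how `(task, args, kwargs)` tuples could be bucketed by queue before each bucket is sent in one request. `group_by_queue` is a hypothetical helper, and removing `queue_name` from the kwargs is an assumption about how the real backend treats it.

```python
from collections import defaultdict


def group_by_queue(tasks, default_queue="default"):
    """Bucket (task, args, kwargs) tuples by their target queue name."""
    grouped = defaultdict(list)
    for func, args, kwargs in tasks:
        kwargs = dict(kwargs)  # copy so the caller's dict is not mutated
        # Assumption: queue_name is routing metadata, not a task argument.
        queue_name = kwargs.pop("queue_name", default_queue)
        grouped[queue_name].append((func, args, kwargs))
    return dict(grouped)


def process_user(user_id):
    """Stand-in task function for the sketch."""


def cleanup_job():
    """Stand-in task function for the sketch."""


batches = group_by_queue([
    (process_user, (1,), {}),
    (process_user, (2,), {}),
    (cleanup_job, (), {"queue_name": "low_priority"}),
])
for queue_name, items in batches.items():
    print(queue_name, len(items))
```

Each bucket then maps to a single variadic `LPUSH` (Valkey) or a single `bulk_create` (Postgres).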

### Periodic Tasks

`django-vtasks` supports periodic tasks using cron-style schedules.

```python
# settings.py
from django_vtasks.scheduler import crontab

VTASKS_SCHEDULE = {
    "daily_report": {
        "task": "myapp.tasks.report",
        "schedule": crontab(hour=5, minute=0),  # Runs at 5:00 AM
    },
    "cleanup": {
        "task": "myapp.tasks.cleanup",
        "schedule": 3600,  # Runs every hour
    },
}
```

To run the scheduler, use the `runworker` command with the `--scheduler` flag.

```bash
python manage.py runworker --scheduler
```
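
The two schedule styles above differ in how the next run time is derived. The sketch below uses only the standard library to illustrate the idea. It is not the library's scheduler, and `next_interval_run` and `next_daily_run` are hypothetical names.

```python
from datetime import datetime, timedelta


def next_interval_run(last_run: datetime, seconds: int) -> datetime:
    """Interval schedule: the next run is a fixed offset after the last one."""
    return last_run + timedelta(seconds=seconds)


def next_daily_run(now: datetime, hour: int, minute: int) -> datetime:
    """Daily cron-style schedule: the next occurrence of hour:minute after now."""
    candidate = now.replace(hour=hour, minute=minute, second=0, microsecond=0)
    if candidate <= now:
        # Today's slot has already passed, so use tomorrow's.
        candidate += timedelta(days=1)
    return candidate


now = datetime(2025, 1, 1, 6, 30)
print(next_interval_run(now, 3600))           # 2025-01-01 07:30:00
print(next_daily_run(now, hour=5, minute=0))  # 2025-01-02 05:00:00
```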

## Batch Processing

For high-throughput scenarios, you can enable native batching on a per-queue basis. This allows the worker to fetch multiple tasks at once and process them in a single function call.

### 1. Configure a Batch Queue

In your `settings.py`, define a batch queue using `VTASKS_BATCH_QUEUES`.

```python
# settings.py

VTASKS_BATCH_QUEUES = {
    "batch_queue": {
        "count": 100,  # Max number of tasks to fetch at once
        "timeout": 5.0,  # Max time to wait for tasks
        "task": "sample.tasks.process_widgets_batch",  # The function to process the batch
    }
}
```

### 2. Create a Batch Processing Task

The task function specified in the configuration must accept a single argument: a list of task dictionaries.

```python
# sample/tasks.py
from django.tasks import task

@task
def process_widgets_batch(tasks: list):
    """Processes a batch of widgets."""
    print(f"Processing a batch of {len(tasks)} widgets.")
    widget_ids = [task["kwargs"]["widget_id"] for task in tasks]
    # Your batch processing logic here
    print(f"  - Processing widget IDs: {widget_ids}")

@task
def single_widget_task(widget_id: int):
    """A dummy task to be processed in a batch."""
    pass
```

### 3. Enqueue Tasks to the Batch Queue

Enqueue individual tasks to the batch queue as you normally would. The worker will automatically group them into batches.

```python
# Enqueueing multiple tasks to the batch_queue
for i in range(10):
    single_widget_task.enqueue(widget_id=i, queue_name="batch_queue")
```

The `batch_worker` will fetch up to `100` tasks from `batch_queue`, and then call `process_widgets_batch` with the list of these tasks.
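
The interaction between `count` and `timeout` can be sketched with a plain `asyncio.Queue` standing in for the backend. This is an illustration of the "fetch up to N tasks or until the timeout expires" idea, not the worker's actual code. `collect_batch` is a hypothetical name.

```python
import asyncio


async def collect_batch(queue: asyncio.Queue, count: int, timeout: float) -> list:
    """Collect up to `count` items, giving up once `timeout` seconds have passed."""
    loop = asyncio.get_running_loop()
    deadline = loop.time() + timeout
    batch = []
    while len(batch) < count:
        remaining = deadline - loop.time()
        if remaining <= 0:
            break
        try:
            item = await asyncio.wait_for(queue.get(), timeout=remaining)
        except asyncio.TimeoutError:
            break  # no more tasks arrived in time; return what we have
        batch.append(item)
    return batch


async def demo():
    queue = asyncio.Queue()
    for i in range(10):
        queue.put_nowait({"kwargs": {"widget_id": i}})
    full = await collect_batch(queue, count=3, timeout=0.05)       # stops at count
    partial = await collect_batch(queue, count=100, timeout=0.05)  # stops at timeout
    return full, partial


full, partial = asyncio.run(demo())
print(len(full), len(partial))
```

A small `count` bounds the size of each batch, while `timeout` bounds how long a partially filled batch waits before it is processed.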

## Deployment

### Standalone Worker

For traditional deployments, you can run one or more standalone worker processes. This is the most robust and scalable option.

The worker uses a bounded semaphore to handle concurrency safely.

```bash
# Run a worker with default settings
python manage.py runworker

# Run with specific concurrency and queues
python manage.py runworker --concurrency 100 --queue high_priority --queue default

# Run a worker for a batch queue
python manage.py runworker --queue=batch_queue
```
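
As a rough picture of what a bounded semaphore does for `--concurrency`, the sketch below caps how many coroutines run at once and records the peak. It is a generic `asyncio` pattern, not the worker's implementation. `run_limited` is a hypothetical name.

```python
import asyncio


async def run_limited(jobs, concurrency: int):
    """Run zero-argument coroutine factories with at most `concurrency` in flight."""
    semaphore = asyncio.BoundedSemaphore(concurrency)
    running = 0
    peak = 0

    async def guarded(job):
        nonlocal running, peak
        async with semaphore:  # waits here when the limit is reached
            running += 1
            peak = max(peak, running)
            try:
                return await job()
            finally:
                running -= 1

    results = await asyncio.gather(*(guarded(job) for job in jobs))
    return results, peak


async def fake_task(i: int) -> int:
    await asyncio.sleep(0.01)  # simulated I/O
    return i * 2


results, peak = asyncio.run(
    run_limited([lambda i=i: fake_task(i) for i in range(10)], concurrency=3)
)
print(results, peak)
```

Because I/O-bound tasks spend most of their time awaiting, a single process can keep many of them in flight without extra threads.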

### Optimization

You can reduce worker memory usage by removing unneeded `INSTALLED_APPS` entries in the worker process. In `settings.py`:

```python
if os.environ.get("VTASKS_IS_WORKER") == "true":
    INSTALLED_APPS = prune_installed_apps(INSTALLED_APPS)
    ROOT_URLCONF = "django_vtasks.empty_urls"  # Omit if tasks require "reverse"
```

Ensure you set `VTASKS_IS_WORKER` to `"true"` in the worker's environment variables.

**Scaling and Reliability:**

It is safe to run multiple standalone worker instances. Both the Postgres (`SKIP LOCKED`) and Valkey (`BLMOVE`) backends use atomic operations to prevent multiple workers from picking up the same task.

Each worker has a unique ID. If a worker process is terminated uncleanly, its in-progress tasks will be abandoned. On startup, a worker will attempt to rescue any tasks that were previously abandoned *by a worker with the same ID*.

### Embedded Worker (All-in-One)

For simpler deployments or "light scaling" needs, you can run the worker inside your ASGI web server's event loop. This "All-in-One" setup is efficient and reduces the number of processes you need to manage.

This is achieved by wrapping your main Django ASGI application.

**1. Create an embedded ASGI entrypoint:**

```python
# sample/asgi_embedded.py
import os
from django.core.asgi import get_asgi_application
from django_vtasks.asgi import get_worker_application

os.environ.setdefault("DJANGO_SETTINGS_MODULE", "sample.settings")

# Get the standard Django ASGI application
django_asgi_app = get_asgi_application()

# Wrap it with the worker application
application = get_worker_application(django_asgi_app)
```

**2. Run with an ASGI Server (e.g., Granian):**

You can then run this application with any ASGI-compliant server that supports the `lifespan` protocol.

```bash
# Example with Granian
granian --interface asgi sample.asgi_embedded:application --host 0.0.0.0 --port 8000
```

**Scaling:**

You can run multiple instances of this "All-in-One" configuration. Each web server process will have its own embedded worker, and they will safely coordinate through the shared backend (Postgres or Valkey). This is a simple way to achieve horizontal scaling for both web and task processing.


## Technical Details

### Why not Celery?

Celery is powerful but heavy. At GlitchTip, we needed a simpler, `asyncio`-native task runner for our own Django backend, and `django-vtasks` is the result. It's opinionated: it only supports the two best options for modern Django (Valkey & Postgres) and strips away the rest to maximize `asyncio` throughput without the overhead of AMQP or legacy support.

A Celery web app typically requires 3 services (django, worker, beat) which can be a high burden for smaller/hobby level apps. VTasks can run embedded in one asyncio loop (web, worker, scheduler) or as separate services for high throughput.

Celery is awkward to use in async views, requiring an inefficient `sync_to_async` wrapper.

### Reliability (Valkey)

When using Valkey, `django-vtasks` implements the Reliable Queue Pattern:

1.  Worker waits for a task.
2.  Task is atomically moved from `q:default` to `processing:<worker_id>` via `BLMOVE`.
3.  Task is processed and then acknowledged (removed from the processing list).

If a worker crashes hard (e.g., OOM kill, power failure), the task remains in its `processing:` list. On the next startup, the same worker (or a new one with the same ID logic) can rescue the task and move it back to the main queue.
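
The pattern above can be simulated in memory to show the state transitions. The sketch below uses Python deques and lists in place of Valkey lists, so it has none of the atomicity or blocking that `BLMOVE` provides. `InMemoryReliableQueue` and its method names are hypothetical, and re-queueing rescued tasks ahead of newer work is a choice made for this sketch.

```python
from collections import deque


class InMemoryReliableQueue:
    """Toy model of the reliable queue pattern (no Valkey involved)."""

    def __init__(self):
        self.pending = deque()  # stands in for the main queue list
        self.processing = {}    # worker_id -> list, stands in for processing:<worker_id>

    def enqueue(self, task):
        self.pending.appendleft(task)

    def claim(self, worker_id):
        """Move one task from pending to the worker's processing list."""
        if not self.pending:
            return None
        task = self.pending.pop()
        self.processing.setdefault(worker_id, []).append(task)
        return task

    def ack(self, worker_id, task):
        """Acknowledge a finished task by removing it from the processing list."""
        self.processing[worker_id].remove(task)

    def rescue(self, worker_id):
        """Return tasks abandoned by `worker_id` to the pending queue."""
        abandoned = self.processing.pop(worker_id, [])
        for task in reversed(abandoned):
            self.pending.append(task)  # rescued tasks are claimed next
        return len(abandoned)


queue = InMemoryReliableQueue()
queue.enqueue("a")
queue.enqueue("b")

first = queue.claim("worker-1")
queue.ack("worker-1", first)       # "a" finished normally
second = queue.claim("worker-1")   # "b" is claimed, then the worker "crashes"

rescued = queue.rescue("worker-1")  # on restart, "b" goes back to pending
retried = queue.claim("worker-1")
print(first, second, rescued, retried)
```

The key property is that a claimed task always lives in exactly one list, so a crash between claim and ack leaves it recoverable rather than lost.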

## Queue Management

### Clearing Queues

For debugging, maintenance, or emergency situations, you can clear tasks from queues using the `clear_queue` management command.

```bash
# Clear a specific queue (with confirmation prompt)
python manage.py clear_queue --backend-alias=default --queue=default

# Clear a specific queue without confirmation
python manage.py clear_queue --backend-alias=default --queue=default --force

# Clear all queues for a backend
python manage.py clear_queue --backend-alias=default --all-queues --force

# Clear failed tasks (DLQ)
python manage.py clear_queue --backend-alias=default --failed --force

# Examples with different backends
python manage.py clear_queue --backend-alias=benchmark_postgres --force
python manage.py clear_queue --backend-alias=benchmark_valkey --force
```

## Benchmarking

`django-vtasks` is designed to be lighter and faster than Celery for I/O-bound workloads.

Benchmarks run on identical hardware (1000 tasks, concurrency 50):

### Speed (Operations Per Second)

| Metric | Celery (Prefork) | django-vtasks (Async) | Improvement |
| :--- | :--- | :--- | :--- |
| **Enqueue Rate** (API Speed) | 637 ops/s | **1,695 ops/s** | **2.6x Faster** |
| **Process Rate** (Worker Speed) | 166 ops/s | **250 ops/s** | **1.5x Faster** |

* **Enqueue Rate:** Measures how quickly your web views return after scheduling a task.
* **Process Rate:** Measures raw machinery overhead processing no-op tasks.

### Efficiency (Resource Usage)

| Metric | Celery (Standard) | django-vtasks | Savings |
| :--- | :--- | :--- | :--- |
| **Memory per Worker** | 72 MB | **48 MB** | **33% Less RAM** |
| **Concurrency Model** | Threads/Processes | `asyncio` | High Concurrency "for free" |

* **Memory:** Measured with a standard Django installation including common apps.
* **Scalability:** VTasks maintains stable throughput even at 500+ concurrent tasks per worker, whereas thread-based workers often degrade due to context switching.

### Running Performance Tests

`django-vtasks` includes benchmarking tools to measure performance across different backends and task types.

```bash
# Basic benchmark
python manage.py benchmark_vtasks --count=1000 --concurrency=50

# Test specific backend and task type
python manage.py benchmark_vtasks \
    --backend-alias=benchmark_valkey \
    --task-type=sleep \
    --count=1000 \
    --concurrency=50

# Test with large payloads
python manage.py benchmark_vtasks \
    --task-type=noop \
    --payload-size=1024 \
    --count=500
```

### Benchmark Suite

Run the complete benchmark suite to compare Postgres vs Valkey performance:

```bash
python benchmarks/run_suite.py
```

This will run a matrix of tests covering:
- **NoOp tasks**: Measure raw serialization/transport overhead
- **Sleep tasks**: Measure I/O concurrency handling
- **Both backends**: Direct performance comparison

## Contributing

We welcome contributions! Please feel free to open an issue or submit a pull request.

### Development Environment

The recommended way to set up a development environment is using `docker compose` and `uv`.

1.  **Start the services:**
    ```bash
    docker compose up
    ```
    This will start the Postgres database, Valkey cache, and a development web server.

2.  **Run commands:**
    You can run commands inside the `web` container:
    ```bash
    docker compose run --rm web python manage.py <command>
    ```

### Running Tests

To run the test suite, use the following command after starting the services with `docker compose up`:

```bash
docker compose run --rm web python manage.py test --settings=tests.settings
```

### Local Development without Docker

If you prefer not to use Docker, you can use `uv` to create a virtual environment and install the dependencies.

1.  **Create a virtual environment:**
    ```bash
    uv venv
    ```

2.  **Activate the virtual environment:**
    ```bash
    source .venv/bin/activate
    ```

3.  **Install dependencies:**
    ```bash
    uv pip install -e ".[postgres,valkey,dev]"
    ```
