Metadata-Version: 2.4
Name: sandai-operator-client
Version: 0.4.6
Summary: Client library for the Sandai Operator SDK
Home-page: https://github.com/world-sim-dev/sandai-data-project
Author: Sandai Team
Author-email: dev@sand.ai
Classifier: Development Status :: 3 - Alpha
Classifier: Intended Audience :: Developers
Classifier: License :: OSI Approved :: MIT License
Classifier: Programming Language :: Python :: 3
Classifier: Programming Language :: Python :: 3.8
Classifier: Programming Language :: Python :: 3.9
Classifier: Programming Language :: Python :: 3.10
Classifier: Programming Language :: Python :: 3.11
Classifier: Programming Language :: Python :: 3.12
Classifier: Programming Language :: Python :: 3.13
Classifier: Programming Language :: Python :: 3.14
Classifier: Topic :: Software Development :: Libraries :: Python Modules
Classifier: Topic :: System :: Distributed Computing
Requires-Python: >=3.8
Description-Content-Type: text/markdown
Requires-Dist: celery>=5.2.0
Requires-Dist: redis>=4.0.0
Requires-Dist: kombu>=5.2.0
Requires-Dist: msgpack>=1.0.0
Provides-Extra: dev
Requires-Dist: black>=24.0.0; extra == "dev"
Requires-Dist: flake8>=7.0.0; extra == "dev"
Requires-Dist: isort>=5.13.0; extra == "dev"
Requires-Dist: mypy>=1.10.0; extra == "dev"
Dynamic: author
Dynamic: author-email
Dynamic: classifier
Dynamic: description
Dynamic: description-content-type
Dynamic: home-page
Dynamic: provides-extra
Dynamic: requires-dist
Dynamic: requires-python
Dynamic: summary

# Sandai Operator Client

Lightweight Python client for submitting tasks to a Sandai operator over Celery.

## Installation

```bash
pip install sandai-operator-client
```

For local development:

```bash
pip install -e ".[dev]"
```

## Quick Start

```python
from sandai.operator_client import create_client

client = create_client("video-clipper", "v1")

artifacts, results = client.sync(
    input_artifacts=[{"uri": "file:///tmp/input.mp4", "slot": "video"}],
    options={"clip_duration": 10},
)
```

Async usage:

```python
import asyncio

from sandai.operator_client import create_async_client


async def main() -> None:
    async with create_async_client("video-clipper", "v1") as client:
        artifacts, results = await client.sync(
            input_artifacts=[{"uri": "file:///tmp/input.mp4", "slot": "video"}],
            options={"clip_duration": 10},
        )
        print(artifacts, results)


asyncio.run(main())
```

## Core API

- Sync client: `create_client(operator_name, operator_version, **kwargs)` returns `OperatorClient`.
- Async client: `create_async_client(operator_name, operator_version, **kwargs)` returns `AsyncOperatorClient`.
- Sync submit: `client.async_call(...)` returns Celery `AsyncResult`.
- Async submit: `await async_client.async_call(...)` returns `AsyncTaskHandle`.
- Sync request/response: `client.sync(...)` returns `(artifacts, results)`.
- Async request/response: `await async_client.sync(...)` returns `(artifacts, results)`.
- Sync description: `client.desc` is a cached property.
- Async description: `await async_client.desc()` is an async cached method.
- Sync legacy batch APIs: `batch_async(...)` and `batch_sync(...)` keep the existing best-effort contract.
- Async legacy-shaped batch APIs: `await async_client.batch_async(...)` and `await async_client.batch_sync(...)` return the same result shapes as their sync counterparts.
- Structured batch APIs: `batch_submit(...)`, `batch_collect(...)`, and `batch_sync_detailed(...)` are available on both clients.
- Health APIs: `probe()` / `ping()` exist on both clients, with `await` required on the async client.
- `get_queue_info()` returns the queue names used for each priority.

## Task Controls

The client supports the same task-control fields used by the operator runtime:

- `timeout`: converted into an absolute `deadline` before sending the task.
- `cancel_key`: propagated to the operator so tasks can be skipped before compute starts.
- `persistent_output`: stored under `meta["persistent-output"]` for runtimes that preserve uploaded outputs.
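
As an illustration of the `timeout` conversion described above, the sketch below turns a relative timeout into an absolute deadline. The helper names `to_deadline` and `is_expired`, and the choice of Unix epoch seconds, are assumptions made for this example; the client's actual deadline representation is not specified here.

```python
import time
from typing import Optional


def to_deadline(timeout: Optional[float], now: Optional[float] = None) -> Optional[float]:
    """Convert a relative timeout in seconds into an absolute deadline.

    Hypothetical helper: assumes the deadline is a Unix timestamp in seconds.
    """
    if timeout is None:
        return None  # no timeout means no deadline
    if timeout < 0:
        raise ValueError("timeout must be non-negative")
    current = time.time() if now is None else now
    return current + timeout


def is_expired(deadline: Optional[float], now: Optional[float] = None) -> bool:
    """Return True once the absolute deadline has been reached."""
    if deadline is None:
        return False
    current = time.time() if now is None else now
    return current >= deadline
```

An absolute deadline stays meaningful after a task has waited in a queue, whereas a relative timeout would restart from whenever the worker picks the task up.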

## Batch APIs

The package exposes two batch styles:

- Legacy compatibility APIs: `batch_async(...)` and `batch_sync(...)` keep the previous best-effort semantics.
- Structured APIs: `batch_submit(...)`, `batch_collect(...)`, and `batch_sync_detailed(...)` expose explicit per-task submission/execution status instead of collapsing failures into empty results.

Async batch methods follow the same contract, but are awaited:

- `await async_client.batch_submit(...)` returns `AsyncSubmittedTask` objects.
- `await async_client.batch_async(...)` returns `AsyncTaskHandle | None` per task.
- `await async_client.batch_collect(...)` returns `BatchTaskResult` objects.
- `await async_client.batch_sync_detailed(...)` returns `BatchTaskResult` objects.
- `await async_client.batch_sync(...)` returns legacy `(artifacts, results)` tuples.

Prefer the structured APIs for new integrations.
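
To make the difference between the two styles concrete, here is a minimal sketch that does not use the library. `StubBatchResult` and its fields (`ok`, `artifacts`, `results`, `error`) are hypothetical stand-ins, not the real `BatchTaskResult`, and the sketch assumes that a failed task collapses to `([], {})` in the legacy shape.

```python
from dataclasses import dataclass, field
from typing import Any, Dict, List, Optional, Tuple


@dataclass
class StubBatchResult:
    """Stand-in for a structured per-task result (field names are hypothetical)."""
    ok: bool
    artifacts: List[Any] = field(default_factory=list)
    results: Dict[str, Any] = field(default_factory=dict)
    error: Optional[str] = None


def to_legacy(item: StubBatchResult) -> Tuple[List[Any], Dict[str, Any]]:
    """Collapse a structured result into a legacy-shaped tuple (assumed empty on failure)."""
    if not item.ok:
        return [], {}
    return item.artifacts, item.results


structured = [
    StubBatchResult(ok=True, artifacts=["clip-0.mp4"], results={"clips": 1}),
    StubBatchResult(ok=False, error="deadline exceeded"),
    StubBatchResult(ok=True),  # succeeded but produced nothing
]
legacy = [to_legacy(item) for item in structured]

# In the legacy shape, the failed task and the empty success are indistinguishable.
failed = [index for index, item in enumerate(structured) if not item.ok]
```

With the structured shape, the caller can still tell that only the second task failed and why.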

## Async API

`AsyncOperatorClient` is an additive API layer. It does not remove or replace `OperatorClient`.

`AsyncTaskHandle` exposes:

- `task_id` / `id`
- `await handle.get(timeout=...)`
- `await handle.ready()`
- `await handle.state()`
- `await handle.forget()`

Implementation notes:

- Task submission remains compatible with the current Celery-based operator side.
- When `result_backend` is Redis, the async client uses `redis.asyncio` plus a shared polling hub for async result waiting.
- The async client is intended for async application integration and lower blocking overhead on result collection.
- The sync client remains the compatibility/default path for existing callers.
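
The handle methods listed above are awaitable, so several results can be collected concurrently. The sketch below shows that pattern with `asyncio.gather`. `StubHandle` is a stand-in that only borrows the `task_id` attribute and the `get(timeout=...)` method name from `AsyncTaskHandle`; it simulates work with `asyncio.sleep` and does not talk to Celery or Redis.

```python
import asyncio
from typing import Any, List


class StubHandle:
    """Stand-in with the same method name as AsyncTaskHandle.get; not the real class."""

    def __init__(self, task_id: str, value: Any, delay: float) -> None:
        self.task_id = task_id
        self._value = value
        self._delay = delay

    async def _produce(self) -> Any:
        # Simulate the time a worker needs to finish the task.
        await asyncio.sleep(self._delay)
        return self._value

    async def get(self, timeout: float = 5.0) -> Any:
        # Bound the wait; asyncio.wait_for raises a timeout error when it expires.
        return await asyncio.wait_for(self._produce(), timeout=timeout)


async def collect(handles: List[StubHandle], timeout: float) -> List[Any]:
    """Await every handle concurrently; gather returns values in input order."""
    return await asyncio.gather(*(handle.get(timeout=timeout) for handle in handles))


handles = [StubHandle(f"task-{i}", i * 10, delay=0.01) for i in range(3)]
values = asyncio.run(collect(handles, timeout=1.0))
```

Because the waits overlap, the total time is close to the slowest task and not the sum of all tasks.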

## Health Checks

`ping()` and `probe()` are intentionally different:

- `ping()` returns `True` when task submission succeeds. It checks submission only.
- `probe()` returns a structured `HealthCheckResult` with `submit_ok`, `retrieval_ok`, `response_ok`, and optional error details.

Use `probe()` when you need a meaningful health signal.
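
One way to act on a `probe()` result is to report the first stage that failed. The sketch below assumes a stand-in dataclass, `StubHealthCheckResult`, that mirrors the documented field names; the real `HealthCheckResult` may differ, and the `error` attribute name is hypothetical.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class StubHealthCheckResult:
    """Stand-in mirroring the documented field names; the real class may differ."""
    submit_ok: bool
    retrieval_ok: bool
    response_ok: bool
    error: Optional[str] = None  # hypothetical name for the optional error details


def summarize(result: StubHealthCheckResult) -> str:
    """Name the first stage that failed, or 'healthy' if every stage passed."""
    if not result.submit_ok:
        return "submit failed"
    if not result.retrieval_ok:
        return "retrieval failed"
    if not result.response_ok:
        return "bad response"
    return "healthy"
```

For example, `summarize(StubHealthCheckResult(True, False, False, "timeout"))` reports a retrieval failure even though submission succeeded, which is the case a submission-only check cannot detect.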

## Priority Model

Supported priorities are:

- `high`
- `normal`
- `low`
- `very_low`

Queue names follow this pattern:

```text
<priority>.<operator_name>.<operator_version>
```
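
The naming pattern above can be sketched as a small helper. `queue_name` is a hypothetical function written for this example and is not part of the client; it raises `ValueError` where the client documents `InvalidPriorityError`.

```python
PRIORITIES = ("high", "normal", "low", "very_low")


def queue_name(priority: str, operator_name: str, operator_version: str) -> str:
    """Build '<priority>.<operator_name>.<operator_version>' (hypothetical helper)."""
    if priority not in PRIORITIES:
        # The client documents InvalidPriorityError; ValueError stands in here.
        raise ValueError(f"unsupported priority: {priority!r}")
    return f"{priority}.{operator_name}.{operator_version}"


# One queue per supported priority for a given operator and version.
queues = {p: queue_name(p, "video-clipper", "v1") for p in PRIORITIES}
```

Under this pattern, the `video-clipper` operator at version `v1` would use `high.video-clipper.v1` for high-priority tasks.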

## Environment Variables

The client reads these settings when explicit connection arguments are not provided:

- `SANDAI_OPERATOR_CELERY_BROKER_URL`
- `SANDAI_OPERATOR_CELERY_RESULT_BACKEND`
- `SANDAI_OPERATOR_CELERY_TASK_SERIALIZER`
- `SANDAI_OPERATOR_CELERY_RESULT_SERIALIZER`
- `SANDAI_OPERATOR_AUTO_FORGET_RESULT`

If the broker or result backend is not configured, the client falls back to a local Redis default:

```text
redis://localhost:6379/0
```

By default, the client submits Celery task payloads with `msgpack`, keeps Celery results on `json`, and advertises both `json` and `msgpack` in `accept_content` so that it can interoperate with operators during rollout.
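
The lookup order described in this section can be sketched as follows. `resolve_settings` is a hypothetical helper, not the client's own code. It assumes that an explicit argument wins over the environment variable, that the environment variable wins over the default, and that the same Redis URL is the default for both broker and result backend.

```python
import os
from typing import Dict, Mapping, Optional

DEFAULT_REDIS_URL = "redis://localhost:6379/0"


def resolve_settings(
    broker_url: Optional[str] = None,
    result_backend: Optional[str] = None,
    env: Optional[Mapping[str, str]] = None,
) -> Dict[str, object]:
    """Hypothetical resolver: explicit argument, then env var, then default."""
    env = os.environ if env is None else env
    return {
        "broker_url": broker_url
        or env.get("SANDAI_OPERATOR_CELERY_BROKER_URL")
        or DEFAULT_REDIS_URL,
        "result_backend": result_backend
        or env.get("SANDAI_OPERATOR_CELERY_RESULT_BACKEND")
        or DEFAULT_REDIS_URL,
        # Serializer defaults as documented: msgpack payloads, json results.
        "task_serializer": env.get("SANDAI_OPERATOR_CELERY_TASK_SERIALIZER") or "msgpack",
        "result_serializer": env.get("SANDAI_OPERATOR_CELERY_RESULT_SERIALIZER") or "json",
        "accept_content": ["json", "msgpack"],
    }
```

The dictionary keys match Celery's lowercase setting names, so under these assumptions the result could be passed to a Celery app's configuration.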

## Error Model

`sync()` and `desc` normalize failures into typed exceptions where possible:

- `TaskTimeoutError`
- `TaskExecutionError`
- `TaskDeadlineExceededError`
- `TaskCancelledError`
- `InvalidTaskStatusError`
- `InvalidResultFormatError`
- `InvalidPriorityError`

Structured batch APIs surface task errors directly on each result object instead of converting them into empty legacy tuples.

## Testing

Run the client regression suite from the repository root:

```bash
python operator-client/tests/run_all_tests.py
```

The suite is mock-based and does not require a live Celery or Redis deployment.
