Metadata-Version: 2.1
Name: stirfried
Version: 0.7.0
Summary: Socket.IO server to schedule Celery tasks from clients in real-time.
License: MIT
Author: Korijn van Golen
Author-email: korijn@gmail.com
Requires-Python: >=3.6,<4.0
Classifier: License :: OSI Approved :: MIT License
Classifier: Programming Language :: Python :: 3
Classifier: Programming Language :: Python :: 3.6
Classifier: Programming Language :: Python :: 3.7
Classifier: Programming Language :: Python :: 3.8
Provides-Extra: msgpack
Provides-Extra: server
Requires-Dist: aioredis (>=1.3.1,<2.0.0); extra == "server"
Requires-Dist: celery (>=4.4.0,<5.0.0)
Requires-Dist: msgpack-python (>=0.5.6,<0.6.0); extra == "msgpack"
Requires-Dist: python-socketio (>=4.4.0,<5.0.0)
Requires-Dist: redis (>=3.4.1,<4.0.0)
Requires-Dist: starlette (>=0.13.2,<0.14.0); extra == "server"
Requires-Dist: uvicorn (>=0.11.3,<0.12.0); extra == "server"
Description-Content-Type: text/markdown

[![PyPI version](https://badge.fury.io/py/stirfried.svg)](https://badge.fury.io/py/stirfried)
[![Docker Image Version (latest semver)](https://img.shields.io/docker/v/korijn/stirfried?label=docker%20image)](https://hub.docker.com/r/korijn/stirfried)

# Stirfried 🥡

Stirfried is an ASGI HTTP/Socket.IO server that provides both browser-based and regular clients with real-time control over Celery tasks.

Tasks are scheduled by name, so the server usually doesn't need to be updated when the workers, or the tasks they provide, change.

Stirfried implements a simple-to-scale, three-layer architecture: clients, servers and workers. Any layer can be scaled out by adding more instances.

Stirfried provides Socket.IO and HTTP APIs with three core functions:

* Schedule a task
* Revoke a task
* Query task info

Want to see Stirfried in action before digging through the README? Try running the [example](#example-code).

Built on:

* [starlette](https://www.starlette.io/) & [uvicorn](https://www.uvicorn.org/)
* [python-socketio](https://github.com/miguelgrinberg/python-socketio)
* [celery](http://www.celeryproject.org/)

## Workers

Install Stirfried in your Celery workers via pip/pipenv/poetry:

`pip install stirfried`

Import the `StirfriedTask`:

```python
from stirfried.celery import StirfriedTask
```

Configure the base class globally:

```python
app = Celery(..., task_cls=StirfriedTask)
```

...or per task:

```python
@app.task(base=StirfriedTask)
def add(x, y, room=None):
    return x + y
```

## Servers

You can run the server by starting the `korijn/stirfried` Docker image and exposing
port 8000, or by cloning this repo, installing the dependencies with poetry and
starting the `uvicorn` server as demonstrated in the example code.

You can configure both the servers and workers with a `settings.py` file, via the standard Celery configuration mechanism. In the docker deployment scenario, you can mount the settings file to the path `/app/settings.py`.
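
For illustration, a minimal `settings.py` might look like the sketch below. The Redis hostname and database numbers are placeholder assumptions. `broker_url` and `result_backend` are standard Celery settings, `socketio_cors_allowed_origins` is forwarded to python-socketio's `AsyncServer` as `cors_allowed_origins` (assuming your installed python-socketio/python-engineio versions support that option), and the `stirfried_*` keys are described under [Settings](#settings).

```python
# settings.py: a sketch, assuming a Redis instance reachable at "redis:6379".
# The hostname and database numbers are placeholders for your own deployment.

# Celery settings (used as-is, no prefix)
broker_url = "redis://redis:6379/0"
result_backend = "redis://redis:6379/1"  # needed for task_info / GET /task/{id}

# Socket.IO settings: passed to AsyncServer with the "socketio_" prefix removed
# (assumption: your python-engineio version supports cors_allowed_origins)
socketio_cors_allowed_origins = "*"

# Stirfried settings (prefixed with "stirfried_")
stirfried_redis_url = "redis://redis:6379/2"  # lets workers emit events
stirfried_available_tasks = ["add"]  # reject any other task name
stirfried_error_info = False  # keep tracebacks out of events and responses
```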

## Clients

Clients can connect to the Socket.IO API using standard [Socket.IO](https://socket.io/) libraries, and to the HTTP API using plain `window.fetch`.

## Task object schema

Tasks are scheduled by submitting the following task object to either of the APIs:

```javascript
{
    "task_name": "",  // (required) task name
    "args": [],       // (optional) task arguments
    "kwargs": {},     // (optional) task keyword arguments
    "room": "",       // (optional) custom room override, only processed if
                      //            `stirfried_custom_rooms` is enabled
                      //            NOTE: can also be used to disable server events
                      //                  for this task by passing the sentinel room
                      //                  ("NO_EMIT" by default)
    "chain": []       // (optional) array of task objects to chain onto the main task
                      //            task objects use the same schema, except for
                      //            the `chain` property which cannot be nested further
                      //            NOTE: chained tasks are applied in reverse order
}
```
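
To make the schema concrete, here is a sketch of a hypothetical client-side helper, `validate_task_object`, that checks a task object against the rules above before it is sent. It is not part of Stirfried (the server does its own validation), and the error messages are made up for illustration.

```python
from typing import Any, Dict, List


def validate_task_object(task: Dict[str, Any], allow_chain: bool = True) -> List[str]:
    """Return a list of problems with a task object (empty if it looks valid)."""
    errors: List[str] = []
    name = task.get("task_name")
    if not isinstance(name, str) or not name:
        errors.append("task_name is required and must be a non-empty string")
    if not isinstance(task.get("args", []), list):
        errors.append("args must be an array")
    if not isinstance(task.get("kwargs", {}), dict):
        errors.append("kwargs must be an object")
    if not isinstance(task.get("room", ""), str):
        errors.append("room must be a string")
    if "chain" in task:
        if not allow_chain:
            # Chained task objects may not carry a chain of their own.
            errors.append("chain cannot be nested inside a chained task")
        elif not isinstance(task["chain"], list):
            errors.append("chain must be an array")
        else:
            for i, sub in enumerate(task["chain"]):
                if not isinstance(sub, dict):
                    errors.append(f"chain[{i}] must be a task object")
                    continue
                # Validate each chained task with nesting disabled.
                for problem in validate_task_object(sub, allow_chain=False):
                    errors.append(f"chain[{i}]: {problem}")
    return errors


if __name__ == "__main__":
    task = {
        "task_name": "add",
        "args": [1, 2],
        "chain": [{"task_name": "add", "args": [3]}],
    }
    print(validate_task_object(task))  # prints []
```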

## Socket.IO API

Events are described in the following format: `name(args[, optional]) -> callback_args`

Clients can **emit** any of the following events that servers are listening for:

| Event | Description |
| ----- | ----------- |
| `send_task({task_name[, args][, kwargs][, room][, chain]}) -> {status, data}` | Schedule a task. Use a callback to receive the reply in the client. `status` indicates if scheduling succeeded and `data` contains the task id or error message in case of failure. The client can use the task id as a reference when processing subsequent server-emitted events. Reference the [task object schema](#task-object-schema) for more details. |
| `revoke_task(task_id)` | Revoke a task. Will not fail if the task does not exist, and won't do anything if the task is already running. |
| `task_info(task_id) -> {id, state, result}` | Query task info. Use a callback to receive the reply in the client. Only works if a Celery result backend is configured. |

Clients can **listen** for the following server-emitted events. All of them hook directly into the Celery `Task` class callbacks, except for `on_progress`, a Stirfried addition that tasks can use to report progress:

| Event | Description |
| ----- | ----------- |
| `on_progress({current, total, info, task_id, task_name})` | Emitted on task progress updates. This event will only be emitted if tasks call `emit_progress`. |
| `on_retry({task_id, task_name[, einfo]})` | Emitted automatically on task retries. `einfo` is only included if `stirfried_error_info=True`. |
| `on_failure({task_id, task_name[, einfo]})` | Emitted automatically on task failure. `einfo` is only included if `stirfried_error_info=True`. |
| `on_success({retval, task_id, task_name})` | Emitted automatically on task success. |
| `on_return({status, retval, task_id, task_name})` | Emitted automatically on task success and failure. |
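
On the client side, these payloads can be collected per task id. The sketch below is a hypothetical `TaskTracker` class (not part of Stirfried) whose methods take the event payloads documented above. With the python-socketio client you could register them with, for example, `sio.on("on_progress", tracker.on_progress)`. The state labels are the sketch's own and are not sent by the server, and a zero `total` simply leaves the percentage unset.

```python
from typing import Any, Dict


class TaskTracker:
    """Keeps the latest known state of each task, keyed by task id."""

    def __init__(self) -> None:
        self.tasks: Dict[str, Dict[str, Any]] = {}

    def _entry(self, data: Dict[str, Any]) -> Dict[str, Any]:
        # Every server-emitted event carries task_id and task_name.
        return self.tasks.setdefault(
            data["task_id"], {"task_name": data["task_name"], "state": "PENDING"}
        )

    def on_progress(self, data: Dict[str, Any]) -> None:
        entry = self._entry(data)
        entry["state"] = "PROGRESS"  # label chosen by this sketch
        total = data["total"]
        entry["percent"] = 100.0 * data["current"] / total if total else None
        entry["info"] = data.get("info")

    def on_retry(self, data: Dict[str, Any]) -> None:
        entry = self._entry(data)
        entry["state"] = "RETRY"
        # einfo is only present when stirfried_error_info=True
        entry["einfo"] = data.get("einfo")

    def on_success(self, data: Dict[str, Any]) -> None:
        entry = self._entry(data)
        entry["state"] = "SUCCESS"
        entry["retval"] = data["retval"]

    def on_failure(self, data: Dict[str, Any]) -> None:
        entry = self._entry(data)
        entry["state"] = "FAILURE"
        # einfo is only present when stirfried_error_info=True
        entry["einfo"] = data.get("einfo")
```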

## HTTP API

| Endpoint | Description |
| -------- | ----------- |
| `POST /task` | Schedule a task. Submit the task object as JSON in the body of the request. Reference the [task object schema](#task-object-schema) for more details. |
| `DELETE /task/{id}` | Revoke a task. Will not fail if the task does not exist, and won't do anything if the task is already running. |
| `GET /task/{id}` | Query task info. Only works if a Celery result backend is configured. |
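
Any HTTP client can call these endpoints. Here is a sketch using only Python's standard library. `BASE_URL` assumes a server listening on `localhost:8000` (the port the Docker image exposes), and the helper names are hypothetical. The functions only build `urllib.request.Request` objects, so nothing is sent until you pass one to `urllib.request.urlopen`.

```python
import json
import urllib.request
from urllib.parse import quote

BASE_URL = "http://localhost:8000"  # assumption: a local server on the default port


def schedule_task_request(task: dict) -> urllib.request.Request:
    """Build POST /task with the task object as the JSON request body."""
    return urllib.request.Request(
        BASE_URL + "/task",
        data=json.dumps(task).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def revoke_task_request(task_id: str) -> urllib.request.Request:
    """Build DELETE /task/{id}."""
    url = BASE_URL + "/task/" + quote(task_id, safe="")
    return urllib.request.Request(url, method="DELETE")


def task_info_request(task_id: str) -> urllib.request.Request:
    """Build GET /task/{id}; the server needs a Celery result backend for this."""
    url = BASE_URL + "/task/" + quote(task_id, safe="")
    return urllib.request.Request(url, method="GET")
```

For example, `urllib.request.urlopen(schedule_task_request({"task_name": "add", "args": [1, 2]}))` schedules the `add` task. The sketch deliberately assumes nothing about the shape of the response bodies.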

## Settings

You can configure Celery, Socket.IO and Stirfried all from the same `settings.py` file. Stirfried settings are prefixed with `stirfried_`, Socket.IO settings are prefixed with `socketio_`, and Celery settings are used as-is (not prefixed).

Socket.IO server settings are passed on directly (but without the prefix) to the `AsyncServer` constructor of the python-socketio library, see their [documentation](https://python-socketio.readthedocs.io/en/latest/api.html#asyncserver-class) for the options that are available. See the Celery [documentation](https://docs.celeryproject.org/en/stable/userguide/configuration.html) for the options there.

The following options are additionally available for configuring Stirfried **servers and workers**:

| Key | Type | Default | Description |
| --- | ---- | ------- | ----------- |
| `stirfried_enable_http` | `bool` | `True` | Set to `False` to disable the HTTP API. |
| `stirfried_enable_socketio` | `bool` | `True` | Set to `False` to disable the Socket.IO API. |
| `stirfried_enable_task_info` | `bool` | `True` | Set to `False` to disable the `task_info` event and `GET /task/{id}` endpoint. |
| `stirfried_enable_revoke_task` | `bool` | `True` | Set to `False` to disable the `revoke_task` event and `DELETE /task/{id}` endpoint. |
| `stirfried_redis_url` | `str` | `""` | Connection string for the Socket.IO API server-to-server communication over Redis pubsub. Required if you want workers to be able to emit events. |
| `stirfried_available_tasks` | `List[str]` | `[]` | If non-empty, `send_task` and `POST /task` will fail if a task name is not contained in the list. |
| `stirfried_error_info` | `bool` | `False` | Set to `True` to include error messages and tracebacks in events, event callbacks and HTTP responses. |
| `stirfried_sentinel_room` | `str` | `"NO_EMIT"` | A magic string value that can be passed to the `room` argument to prevent workers from emitting events for a task. This is the default `room` value for tasks sent to the HTTP API, since there is no Socket.IO client. |
| `stirfried_custom_rooms` | `bool` | `False` | Set to `True` to allow clients to override the default `room` for server-emitted events. |
| `stirfried_header_task_map` | `Dict[str, Dict[str, str]]` | `{}` | Configure to map headers to keyword arguments for specific tasks. For example, `{"send_email": {"Date": "date"}}` would cause the `Date` header's value to be injected into the keyword argument `date` whenever a `send_email` task is scheduled. This can be used in concert with Socket.IO's `extraHeaders` feature to implement authorization and validation. |
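
To illustrate what `stirfried_header_task_map` describes, here is the mapping written as a hypothetical pure function, `inject_header_kwargs`. It is not Stirfried's actual implementation. It assumes that header names are matched case-insensitively (as HTTP header names are), that a mapped header missing from the request is skipped, and that header values override any client-supplied keyword argument of the same name, which is what you would want for authorization. Check the server's real behavior before relying on any of these.

```python
from typing import Dict, Mapping


def inject_header_kwargs(
    task_name: str,
    kwargs: Dict[str, object],
    headers: Mapping[str, str],
    header_task_map: Dict[str, Dict[str, str]],
) -> Dict[str, object]:
    """Return a copy of kwargs with mapped header values injected (hypothetical)."""
    # Assumption: header names compare case-insensitively.
    lowered = {name.lower(): value for name, value in headers.items()}
    result = dict(kwargs)
    for header, kwarg in header_task_map.get(task_name, {}).items():
        value = lowered.get(header.lower())
        if value is not None:
            # Assumption: header values win over client-supplied kwargs.
            result[kwarg] = value
    return result
```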

## Rooms

For tasks scheduled through the Socket.IO API, server-emitted events are sent by default to the client that scheduled the task. For tasks scheduled through the HTTP API, no events are emitted by default. The server accomplishes this by injecting a value into
the `room` keyword argument of Stirfried Celery tasks.

The `StirfriedTask` base class depends on the presence of this keyword argument.

This means you are required to add the keyword argument `room=None` to your
task definitions in order to receive it.

If `stirfried_custom_rooms` is enabled, clients can override this value by sending a custom `room` value in the task object (not via the task's keyword arguments; see the [task object schema](#task-object-schema)).
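
The rules above can be summarized as a small decision function. This is a hypothetical sketch of the described behavior, not Stirfried's code. It assumes the default room is the scheduling client's Socket.IO session id (python-socketio places every client in a room named after its sid), with `sid` set to `None` for HTTP requests.

```python
from typing import Optional

SENTINEL_ROOM = "NO_EMIT"  # default value of stirfried_sentinel_room


def resolve_room(
    sid: Optional[str],
    requested_room: Optional[str],
    custom_rooms: bool = False,
    sentinel_room: str = SENTINEL_ROOM,
) -> str:
    """Pick the room that worker events for a task go to (hypothetical sketch)."""
    if custom_rooms and requested_room:
        # Only honored when stirfried_custom_rooms is enabled; passing the
        # sentinel room here disables events for the task.
        return requested_room
    if sid is None:
        # HTTP API: there is no Socket.IO client to notify.
        return sentinel_room
    # Socket.IO API (assumption): emit back to the scheduling client's sid room.
    return sid
```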

## Progress events

You can emit progress events from workers by calling `self.emit_progress(current, total, info=None)` in a task.

You can use the optional `info` keyword argument to send along arbitrary metadata, such as a progress message or early results.

Note that you must pass `bind=True` to the `celery.task` decorator so that
the task instance is passed to your function as `self`.

```python
@celery.task(bind=True)
def add(self, x, y, room=None):
    s = x
    self.emit_progress(50, 100)  # 50%
    s += y
    return s
```

## Binary/big data

Socket.IO clients, servers and Celery workers support the `msgpack`
transport, allowing you to use binary data directly (without needing to manually convert
to and from base64-encoded strings and suffering the associated performance penalty).

You should also be aware of [limitations in Redis](http://redis.io/topics/clients#output-buffers-limits)
on client output buffers: you cannot emit events larger than a certain size
(32mb by default for pubsub clients). You can override this limit in several ways; here's
how to do it from the Redis server command line:

```bash
# client-output-buffer-limit <class> <hard limit> <soft limit> <soft seconds>
redis-server --client-output-buffer-limit pubsub 256mb 128mb 30
```

## Testing

When unit testing a Stirfried Celery worker, the recommended approach is to disable the
Redis connection by simply leaving out `stirfried_redis_url` from your settings, and to
directly call the task functions in unit tests. The lack of a Redis connection will
short-circuit any events that would normally be emitted. This setup will allow you to
treat tasks as regular functions and perform unit testing as usual.

Optionally, you can patch/mock any calls to `self.emit_progress` using standard Python
testing utilities to test those too.
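
For example, the `add` task from [Progress events](#progress-events) can be tested with `unittest.mock`. To keep the sketch runnable without a Celery worker, it defines the task body as a plain function and passes a `Mock` in place of the bound task instance. In your worker module the same function would carry the `@celery.task(bind=True)` decorator, and one option is to patch `emit_progress` on the task object instead (for example with `mock.patch.object`).

```python
import unittest
from unittest import mock


def add(self, x, y, room=None):
    """Same body as the bound `add` task; `self` is the task instance."""
    s = x
    self.emit_progress(50, 100)  # 50%
    s += y
    return s


class AddTaskTest(unittest.TestCase):
    def test_add_reports_progress(self):
        task = mock.Mock()  # stands in for the bound StirfriedTask instance
        self.assertEqual(add(task, 2, 3), 5)
        task.emit_progress.assert_called_once_with(50, 100)
```

Run it with `python -m unittest` as usual.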

## Example code

The repo includes an example demonstrating all of the functionality provided by Stirfried.

You can run the example as follows:

* Clone the repository
* `cd` into the `example` directory
* Run `docker-compose build`
* Then `docker-compose up`
* Open your browser and go to `http://localhost:8080/`
* You should see the following interface and are ready to give Stirfried a try:

![Stirfried 🥡 test client](https://user-images.githubusercontent.com/1882046/76843175-b2c78200-683b-11ea-92df-b2169a7ce9ce.png)

