Metadata-Version: 2.1
Name: stirfried
Version: 0.3.0
Summary: Socket.IO server to schedule Celery tasks from clients in real-time.
License: MIT
Author: Korijn van Golen
Author-email: korijn@gmail.com
Requires-Python: >=3.6,<4.0
Classifier: License :: OSI Approved :: MIT License
Classifier: Programming Language :: Python :: 3
Classifier: Programming Language :: Python :: 3.6
Classifier: Programming Language :: Python :: 3.7
Classifier: Programming Language :: Python :: 3.8
Requires-Dist: aioredis (>=1.3.1,<2.0.0)
Requires-Dist: celery (>=4.4.0,<5.0.0)
Requires-Dist: python-socketio (>=4.4.0,<5.0.0)
Requires-Dist: redis (>=3.4.1,<4.0.0)
Requires-Dist: uvicorn (>=0.11.3,<0.12.0)
Description-Content-Type: text/markdown

[![PyPI version](https://badge.fury.io/py/stirfried.svg)](https://badge.fury.io/py/stirfried)
[![Docker Image Version (latest semver)](https://img.shields.io/docker/v/korijn/stirfried?label=docker%20image)](https://hub.docker.com/r/korijn/stirfried)

# Stirfried 🥡

Socket.IO server to schedule Celery tasks from clients in real-time.

## Getting started

Stirfried has a three layered architecture:

1. [Socket.IO clients](#socketio-clients)
3. [Socket.IO server](#socketio-server)
2. [Celery workers](#celery-workers)

The design allows you to independently scale the number of servers when
server-client communication workload increases and the number of workers
when the task processing workload increases.

By leveraging Celery's task routing ([explained below](#task-routing)) you can
also divide workers into groups and scale groups independently.

### Socket.IO clients

Clients can connect using standard [Socket.IO](https://socket.io/) libraries.

The server is listening for clients to emit any of the following events:

| Event | Description |
| ----- | ----------- |
| `send_task({task_name, args, kwargs})` | Emit to schedule a task. |
| `revoke_task(task_id)` | Emit to cancel a task. |

Clients can subscribe to the following events emitted by the server:

| Event | Description |
| ----- | ----------- |
| `on_return({status, retval, task_id, task_name})` | Emitted on task success and failure. |
| `on_progress({current, total, task_id, task_name})` | Emitted on task progress updates. |
| `on_retry({task_id, task_name})` | Emitted on task retries. |

### Socket.IO server

For the Socket.IO server component you can pull the prebuilt docker image:

`docker pull korijn/stirfried`

or you can copy the project and customize it to your liking.

You are required to provide a `settings.py` file with the configuration
for the server.

It requires at a minimum:

* `socketio_redis` - Redis connection string for the [Socket.IO](https://github.com/miguelgrinberg/python-socketio) server.
* `broker_url` - Connection string for the [Celery](https://github.com/celery/celery/) [broker](http://docs.celeryproject.org/en/latest/getting-started/brokers/index.html).

#### Task routing

The server sends tasks to the Celery broker
[by name](https://docs.celeryproject.org/en/latest/reference/celery.html#celery.Celery.send_task),
so it can act as a gateway to many different Celery workers with
different tasks. You can leverage Celery's
[task routing configuration](http://docs.celeryproject.org/en/latest/userguide/routing.html)
for this purpose.

#### Example

Let's say you have two workers, one listening on the `feeds` queue and
another on the `web` queue. This is how you would configure the 
server accordingly with `settings.py`:

```python
socketio_redis = "redis://localhost:6379/0"
broker_url = "redis://localhost:6379/1"
task_routes = {
    "feed.tasks.*": {"queue": "feeds"},
    "web.tasks.*": {"queue": "web"},
}
```

You can then run the server as follows:

```bash
docker run --rm -ti -v `pwd`/settings.py:/app/settings.py:ro -p 8000:8000 stirfried
```

### Celery workers

You need to install Stirfried in your Celery workers via pip:

 `pip install stirfried`

In your Celery workers, import the `StirfriedTask`:

```python
from stirfried.celery import StirfriedTask
```

Configure `StirfriedTask` as the base class globally:

```python
app = Celery(..., task_cls=StirfriedTask)
```

...or per task:

```python
@app.task(base=StirfriedTask)
def add(x, y, room=None):
    return x + y
```

#### Rooms

The server injects the client's `sid` into the keyword argument `room`.

The `StirfriedTask` base class depends on the presence of this keyword argument.

This means you are required to add the keyword argument `room=None` to your
task definitions in order to receive it.

#### Progress

You can emit progress from tasks by calling `self.emit_progress(current, total)`.

Note that you are required to pass `bind=True` to the `celery.task` decorator
in order to get access to the `self` instance variable.

```python
@celery.task(bind=True)
def add(self, x, y, room=None):
    s = x
    self.emit_progress(50, 100)  # 50%
    s += y
    self.emit_progress(100, 100)  # 100%
    return s
```

