Metadata-Version: 2.1
Name: sca-rhythm
Version: 0.1.2
Summary: Create and manage workflows using Celery tasks
Author: Deepak Duggirala
Author-email: deepakduggi@gmail.com
Requires-Python: >=3.11,<4.0
Classifier: Programming Language :: Python :: 3
Classifier: Programming Language :: Python :: 3.11
Requires-Dist: celery (>=5.2.7,<6.0.0)
Requires-Dist: pymongo (>=4.3.3,<5.0.0)
Description-Content-Type: text/markdown

# Rhythm
Create and manage workflows using Celery tasks.

### Prerequisites

The Celery app must be configured with a MongoDB result backend.
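
As a minimal sketch, a Celery configuration module that selects the MongoDB result backend might look like the following. The file name, broker URL, host, database name, and collection name are placeholder assumptions, not values required by Rhythm; `result_backend` and `mongodb_backend_settings` are standard Celery settings.

```python
# celeryconfig.py (hypothetical file name)
# All URLs and names below are placeholders; adjust them to your deployment.

# Message broker used to dispatch tasks (assumed: a local RabbitMQ instance).
broker_url = "amqp://guest:guest@localhost:5672//"

# Store task states and results in MongoDB.
result_backend = "mongodb://localhost:27017/"

# Optional MongoDB-specific settings for the result backend.
mongodb_backend_settings = {
    "database": "celery",
    "taskmeta_collection": "celery_taskmeta",
}
```

A module like this can be loaded with `app.config_from_object("celeryconfig")`.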

### Create Tasks with `WorkflowTask` class

```python
import os
import time

from celery import Celery

from sca_rhythm import WorkflowTask

app = Celery("tasks")

@app.task(base=WorkflowTask, bind=True)
def task1(self, batch_id, **kwargs):
    print(f'task1 (pid {os.getpid()}) starts with {batch_id}')
    # do work
    time.sleep(1)

    # update progress to result backend
    # sets the task's state as "PROGRESS"
    self.update_progress({
        'done': 2873,
        'total': 100000
    })

    # do some more work
    return batch_id, {'return_obj': 'foo'}
```
#### :warning: Task Constraints :warning:
1. The task signature must include `**kwargs` for workflow orchestration to function.
2. The return value must be a list or tuple. Its first element is passed to the next task as its argument.
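
To illustrate the second constraint, here is a plain-Python sketch of how the first element of each return value could flow into the next task. It is not Rhythm's implementation: `run_steps` and the `step` keyword argument are hypothetical names used only for this example, and the tasks are ordinary functions rather than Celery tasks.

```python
def inspect(batch_id, **kwargs):
    # Tuple return: the first element is forwarded to the next task.
    return batch_id, {"files": 3}


def archive(batch_id, **kwargs):
    # A list works as well; only the first element is forwarded.
    return [f"{batch_id}-archived", {"ok": True}]


def run_steps(tasks, first_arg):
    """Call each task in order, passing it the first element of the previous result."""
    arg = first_arg
    for task in tasks:
        # `step` is a hypothetical keyword argument; `**kwargs` absorbs it.
        result = task(arg, step=task.__name__)
        if not isinstance(result, (list, tuple)):
            raise TypeError(f"{task.__name__} must return a list or tuple")
        arg = result[0]
    return arg


final = run_steps([inspect, archive], "batch-1")
print(final)  # batch-1-archived
```

In this sketch the remaining elements of each return value are simply ignored by the runner.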

### Create Workflows with `Workflow` class

```python
from celery import Celery

from sca_rhythm import Workflow

# the Celery app on which the tasks are registered
app = Celery("tasks")

steps = [
    {
        'name': 'inspect',
        'task': 'tasks.inspect'
    },
    {
        'name': 'archive',
        'task': 'tasks.archive'
    },
    {
        'name': 'stage',
        'task': 'tasks.stage'
    }
]

wf = Workflow(app, steps=steps, name='archive_batch')
wf.start('batch-id-test')
```
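
When every step's `name` matches the last part of its task path, as in the list above, the `steps` list can be generated instead of written by hand. The helper below is a hypothetical convenience, not part of `sca_rhythm`; it only assumes the `name` / `task` dictionary shape shown in the example.

```python
def make_steps(module, names):
    """Build a steps list where each task is registered as '<module>.<name>'."""
    return [{"name": name, "task": f"{module}.{name}"} for name in names]


steps = make_steps("tasks", ["inspect", "archive", "stage"])
print(steps[0])  # {'name': 'inspect', 'task': 'tasks.inspect'}
```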

### Pause / Resume Workflows

```python
wf = Workflow(app, workflow_id='2f87decb-a431-472b-b26e-32c894993881')

wf.pause()

wf.resume()
```
