Metadata-Version: 2.3
Name: worker_dispatcher
Version: 1.0.2
Summary: A flexible task dispatcher for Python with multiple threading or processing control
Project-URL: Homepage, https://github.com/yidas/python-worker-dispatcher
Project-URL: Issues, https://github.com/yidas/python-worker-dispatcher/issues
Author-email: Nick Tsai <myintaer@gmail.com>
License-File: LICENSE
Classifier: License :: OSI Approved :: MIT License
Classifier: Operating System :: OS Independent
Classifier: Programming Language :: Python :: 3
Requires-Python: >=3.0
Description-Content-Type: text/markdown

<p align="center">
    <a href="https://www.python.org/psf-landing/" target="_blank">
        <img src="https://www.python.org/static/community_logos/python-logo.png" height="60px">
    </a>
    <h1 align="center">Python Worker Dispatcher</h1>
    <br>
</p>

A flexible task dispatcher for Python with multi-threading or multi-processing control

[![PyPI](https://img.shields.io/pypi/v/worker-dispatcher)](https://pypi.org/project/worker-dispatcher/)
![](https://img.shields.io/pypi/implementation/worker-dispatcher)


Features
--------

- ***Tasks Dispatching** to managed worker*

- ***Elegant Interface** for setup and use*

---

OUTLINE
-------

- [Demonstration](#demonstration)
- [Introduction](#introduction)
- [Installation](#installation)
- [Usage](#usage)
    - [Options](#options)
        - [task.callback](#taskcallback)
        - [task.result_callback](#taskresult_callback)
    - [Other Methods](#other-methods)
        - [get_results()](#get_results)
        - [get_logs()](#get_logs)
        - [get_result_info()](#get_result_info)
        - [get_tps()](#get_tps)
    - [Scenarios](#scenarios)
        - [Stress Test](#stress-test)

---

DEMONSTRATION
-------------

Use 20 threads concurrently to dispatch tasks for HTTP requests:

```python
import worker_dispatcher
import requests

def each_task(id: int, config, task):
    response = requests.get(config['my_endpoint'] + task)
    return response

responses = worker_dispatcher.start({
    'task': {
        'list': ['ORD_AH001', 'ORD_KL502', '...' , 'ORD_GR393'],
        'callback': each_task,
        'config': {
            'my_endpoint': 'https://your.name/order-handler/'
        },
    },
    'worker': {
        'number': 20,
    }
})
```

Use multiple processes to compute tasks across all CPU cores of the machine:

```python
import worker_dispatcher

def each_task(id: int, config=None, task=None):
    result = sum(id * i for i in range(10**9))
    return result

if __name__ == '__main__':
    results = worker_dispatcher.start({
        'task': {
            'list': 10,
            'callback': each_task,
        },
        'worker': {   
            'multiprocessing': True
        }
    })
```

---

INTRODUCTION
------------

This library consumes tasks efficiently using multiple threads or processes, and returns all of the results together once every task has finished.

![Introduction](https://www.plantuml.com/plantuml/proxy?cache=no&src=https://raw.githubusercontent.com/yidas/python-worker-dispatcher/main/img/introduction.planuml&v=1)
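
The dispatch-and-collect flow described above can be sketched with the standard library's `concurrent.futures`. This is a minimal illustration under assumptions, not the package's implementation: the `dispatch` helper is hypothetical, and keeping results in task order is a choice made here that this README does not confirm for `start()`.

```python
from concurrent.futures import ThreadPoolExecutor


def dispatch(tasks, callback, config=None, number=4):
    """Run callback(id, config, task) for every task on a thread pool.

    Hypothetical helper: ids start at 1 and results keep the order of `tasks`.
    """
    config = {} if config is None else config
    with ThreadPoolExecutor(max_workers=number) as pool:
        # Submit every task up front; the pool runs at most `number` at once
        futures = [
            pool.submit(callback, task_id, config, task)
            for task_id, task in enumerate(tasks, start=1)
        ]
        # Collect results in submission order, not completion order
        return [future.result() for future in futures]


def each_task(id, config, task):
    return f"{config['prefix']}{task}"


print(dispatch(["A", "B", "C"], each_task, {"prefix": "ORD_"}, number=2))
# ['ORD_A', 'ORD_B', 'ORD_C']
```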

---

INSTALLATION
------------

To install the current release:

```shell
$ pip install worker-dispatcher
```

---

USAGE
-----

By calling the `start()` method with a configuration parameter, the package begins dispatching tasks while managing threads or processes according to the provided settings. Once all tasks are completed, it returns all of the results.

An example configuration setting with all options is as follows:

```python
import worker_dispatcher

# Placeholder callbacks: replace them with your own functions
def your_function_name_for_each_task(id=None, config=None, task=None):
    return task

def your_function_name_for_each_result(id=None, config=None, result=None, log=None):
    print(id, result)

results = worker_dispatcher.start({
    'debug': False,
    'task': {
        'list': [],                     # A list of tasks, or an integer giving the number of tasks to generate.
        'callback': your_function_name_for_each_task,
        'config': {'your_config': 'your_value'},
        'result_callback': your_function_name_for_each_result,
    },
    'worker': {
        'number': 8,
        'per_second': 0,                # If greater than 0, the specified number of workers are started at fixed intervals.
        'cumulative': False,            # Cumulative mode for the per_second method.
        'multiprocessing': False
    }
})
```
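
The `per_second` option is only briefly described. In the stress-test scenario further down, `number: 10` with `per_second: 1` yields 10 requests per second, which suggests that `worker.number` tasks are started at each interval. The thread-based sketch below assumes the interval is `per_second` seconds (with a value of 1, "every 1 second" and "once per second" coincide, so the README does not settle this). `paced_dispatch` is hypothetical, and `cumulative` mode is not modelled.

```python
import threading
import time


def paced_dispatch(tasks, callback, number, interval):
    """Start up to `number` tasks every `interval` seconds without waiting
    for earlier tasks to finish (hypothetical sketch, not the library's code)."""
    results = [None] * len(tasks)
    threads = []

    def run(index, task):
        # Each thread stores its result in the slot of its task (ids start at 1)
        results[index] = callback(index + 1, task)

    for start in range(0, len(tasks), number):
        batch_started = time.monotonic()
        # Launch one batch of up to `number` tasks at once
        for index in range(start, min(start + number, len(tasks))):
            thread = threading.Thread(target=run, args=(index, tasks[index]))
            thread.start()
            threads.append(thread)
        # Wait out the rest of the interval before launching the next batch
        if start + number < len(tasks):
            time.sleep(max(0.0, interval - (time.monotonic() - batch_started)))

    # Wait for every task, including the last batch, before returning
    for thread in threads:
        thread.join()
    return results


started = time.monotonic()
out = paced_dispatch(list(range(6)), lambda id, task: task * 10, number=2, interval=0.1)
elapsed = time.monotonic() - started
print(out)
# [0, 10, 20, 30, 40, 50]
```

Six tasks in batches of two form three batches, so the run lasts at least two intervals (about 0.2 s here). Starting a fresh thread per task keeps the sketch short and lets batches overlap when tasks take longer than the interval.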

### Options

|Option                                       |Type     |Default      |Description|
|:--                                          |:--      |:--          |:--        |
|debug                                        |bool     |False        |Debug mode |
|task.list                                    |multitype|[]           |The tasks to dispatch to the workers.<br>- List: each value is passed as the `task` argument to your callback function.<br>- Integer: the number of tasks to generate.|
|[task.callback](#taskcallback)               |callable |(sample)     |The callback function that each worker runs for every task|
|task.config                                  |multitype|{}           |A custom variable passed to the callback functions as `config`|
|[task.result_callback](#taskresult_callback) |callable |None         |The callback function called with the result of each completed task|
|worker.number                                |int      |(auto)       |The number of workers to fork.<br>(Defaults to the number of local CPU cores)|
|worker.per_second                            |float    |0            |If greater than 0, the specified number of workers are started at fixed intervals, regardless of whether earlier tasks have finished.|
|worker.cumulative                            |bool     |False        |Cumulative mode for the `per_second` method|
|worker.multiprocessing                       |bool     |False        |Use multi-processing instead of the default multi-threading|
|verbose                                      |bool     |True         |Enables or disables verbose mode for detailed output|
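
The table implies that any option left out of the configuration falls back to its default. One way to picture this is a recursive merge of a partial configuration over a dict of defaults. `merge_config` and `DEFAULTS` are hypothetical names written for illustration; the package may resolve defaults differently.

```python
import copy
import os

# Defaults taken from the Options table. Assumptions: "(auto)" for
# worker.number means os.cpu_count(), task.config defaults to {} as in the
# callback argument table, and no sample callback is provided here.
DEFAULTS = {
    'debug': False,
    'verbose': True,
    'task': {'list': [], 'callback': None, 'config': {}, 'result_callback': None},
    'worker': {
        'number': os.cpu_count() or 1,
        'per_second': 0,
        'cumulative': False,
        'multiprocessing': False,
    },
}


def merge_config(user, defaults=DEFAULTS):
    """Recursively overlay a partial user config on the defaults (hypothetical helper)."""
    merged = copy.deepcopy(defaults)
    for key, value in user.items():
        if isinstance(value, dict) and isinstance(merged.get(key), dict):
            # Nested option groups (task, worker) are merged key by key
            merged[key] = merge_config(value, merged[key])
        else:
            merged[key] = value
    return merged


config = merge_config({'task': {'list': 3}, 'worker': {'number': 20}})
print(config['task']['list'], config['task']['config'], config['worker']['number'])
# 3 {} 20
```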


#### task.callback

The callback function that each worker runs for every task.

```python
def callback_function(id: int = None, config=None, task=None):
    ...
```

|Argument          |Type     |Default      |Description|
|:--               |:--      |:--          |:--        |
|id                |int      |(auto)       |The sequence number of the task, starting from 1|
|config            |multitype|{}           |The custom variable from `task.config`|
|task              |multitype|(custom)     |The current value from `task.list`|
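
Because the callback receives the arguments listed above, it can be written and tested on its own before it is passed to `start()`. In this small sketch the URL-building logic is illustrative, and falling back to the `id` when `task` is `None` (assumed to happen when `task.list` is an integer) is an assumption.

```python
def each_task(id: int = None, config=None, task=None):
    """Example task callback: builds a URL for one task (no request is sent)."""
    config = config or {}
    # `task` is the current item of task.list; fall back to the id when it is
    # None (assumed to be the case when task.list is an integer)
    key = task if task is not None else id
    return f"{config.get('my_endpoint', '')}{key}"


print(each_task(1, {'my_endpoint': 'https://your.name/order-handler/'}, 'ORD_AH001'))
# https://your.name/order-handler/ORD_AH001
print(each_task(7))
# 7
```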


#### task.result_callback

The callback function called with the result of each task once the task completes.

```python
def result_callback_function(id: int = None, config=None, result=None, log: dict = None):
    ...
```

|Argument          |Type     |Default      |Description|
|:--               |:--      |:--          |:--        |
|id                |int      |(auto)       |The sequence number of the task, starting from 1|
|config            |multitype|{}           |The custom variable from `task.config`|
|result            |multitype|(custom)     |The value returned by `task.callback` for this task|
|log               |dict     |(auto)       |Reference: [get_logs()](#get_logs)|
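
A result callback can use the `log` dict to react to individual tasks as they finish. The sketch below records tasks slower than a threshold; the `slow_seconds` config key is hypothetical, `duration` is assumed to be in seconds, and the logs are hand-made values rather than real output.

```python
slow_tasks = []


def on_result(id: int = None, config=None, result=None, log: dict = None):
    """Example result callback: records tasks slower than a configured threshold."""
    threshold = (config or {}).get('slow_seconds', 1.0)  # hypothetical config key
    if log and log.get('duration', 0) > threshold:
        slow_tasks.append((id, log['duration']))


# Simulated calls with hand-made logs (illustrative values, not real output)
on_result(1, {'slow_seconds': 0.5}, 'ok', {'task_id': 1, 'duration': 0.2})
on_result(2, {'slow_seconds': 0.5}, 'ok', {'task_id': 2, 'duration': 0.9})
print(slow_tasks)
# [(2, 0.9)]
```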


### Other Methods

- #### get_results()
    Returns all results as a list after `start()` has completed

- #### get_logs()
    Returns all logs as a list after `start()` has completed

    Each log is a dict describing one task processed by a worker, with the following keys:
    - task_id 
    - started_at 
    - ended_at 
    - duration 
    - result

- #### get_result_info()
    Returns a dict with the total elapsed time and the start and end timestamps of the whole run after `start()` has completed

- #### get_tps()
    Returns a TPS (transactions per second) report as a dict, either for the last `start()` call or for a list of logs passed in.
    ```python
    def get_tps(logs: dict=None, debug: bool=False, peak_duration: float=0, peak_logs: bool=False) -> dict:
        ...
    ```
    The `logs` argument uses the same format as [get_logs()](#get_logs) and defaults to the logs of the last `start()` call. The `result` of each log is counted as valid if it meets one of the following conditions:
    - It is a `requests.Response` object with a status code of 200
    - It is any other valid value that is not a `requests.Response` object
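
The validity rule, the whole-run timing and the TPS figure can be approximated from a list of logs. This is an assumption-laden illustration, not `get_tps()` itself: it defines TPS as valid results divided by the span from the earliest `started_at` to the latest `ended_at`, treats timestamps as seconds, checks for a `status_code` attribute instead of importing `requests`, and treats `None`/`False` as invalid. It ignores `peak_duration` and `peak_logs`, whose behaviour is not documented here.

```python
def is_valid(result):
    """Validity rule described above. Duck-types requests.Response via
    `status_code`; treating None/False as invalid is an assumption."""
    if hasattr(result, 'status_code'):
        return result.status_code == 200
    return result is not None and result is not False


def simple_tps(logs):
    """Valid results per second over the whole run (one plausible definition)."""
    if not logs:
        return {'count': 0, 'success': 0, 'duration': 0.0, 'tps': 0.0}
    # Whole-run span: earliest start to latest end across all task logs
    started_at = min(log['started_at'] for log in logs)
    ended_at = max(log['ended_at'] for log in logs)
    duration = ended_at - started_at
    success = sum(1 for log in logs if is_valid(log['result']))
    return {
        'count': len(logs),
        'success': success,
        'duration': duration,
        'tps': success / duration if duration > 0 else 0.0,
    }


class FakeResponse:
    """Minimal stand-in for requests.Response in this example."""

    def __init__(self, status_code):
        self.status_code = status_code


logs = [
    {'task_id': 1, 'started_at': 0.0, 'ended_at': 1.0, 'duration': 1.0, 'result': FakeResponse(200)},
    {'task_id': 2, 'started_at': 0.5, 'ended_at': 1.5, 'duration': 1.0, 'result': FakeResponse(500)},
    {'task_id': 3, 'started_at': 1.0, 'ended_at': 2.0, 'duration': 1.0, 'result': 'done'},
]
print(simple_tps(logs))
# {'count': 3, 'success': 2, 'duration': 2.0, 'tps': 1.0}
```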

### Scenarios

#### Stress Test

Perform a stress test that sends 5000 requests in total at a rate of 10 requests per second.

```python
import requests
import worker_dispatcher

def each_task(id, config, task):
    response = None
    try:
        response = requests.get(config['my_endpoint'], timeout=(5, 10))
    except requests.exceptions.RequestException as e:
        print("An error occurred:", e)
    return response

responses = worker_dispatcher.start({
    'task': {
        'list': 5000,
        'callback': each_task,
        'config': {
            'my_endpoint': 'https://your.name/api'
        },
    },
    'worker': {
        'number': 10,
        'per_second': 1
    }
})

print(worker_dispatcher.get_logs())
print(worker_dispatcher.get_tps())
```
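
Since `each_task` returns `None` when a request raises an exception, the list returned by `start()` can be summarized by status code. This is a minimal sketch; `status_breakdown` is a hypothetical helper, and `SimpleNamespace` objects stand in for `requests.Response` so it runs without network access.

```python
from collections import Counter
from types import SimpleNamespace


def status_breakdown(responses):
    """Count responses by status code; None means the request raised an error."""
    return Counter(
        'error' if response is None else response.status_code
        for response in responses
    )


# Stand-ins for requests.Response objects returned by each_task
sample = [SimpleNamespace(status_code=200), None,
          SimpleNamespace(status_code=200), SimpleNamespace(status_code=503)]
print(status_breakdown(sample))
# Counter({200: 2, 'error': 1, 503: 1})
```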









