Metadata-Version: 2.1
Name: stateful_pool
Version: 0.1.0
Summary: A process pool that allows workers to maintain state across tasks.
Home-page: https://github.com/menxli/stateful_pool
Author: Li, Meng-Xun
Author-email: mengxunli@whu.edu.cn
Requires-Python: >=3.8,<4.0
Classifier: Programming Language :: Python :: 3
Classifier: Programming Language :: Python :: 3.8
Classifier: Programming Language :: Python :: 3.9
Classifier: Programming Language :: Python :: 3.10
Classifier: Programming Language :: Python :: 3.11
Classifier: Programming Language :: Python :: 3.12
Project-URL: Repository, https://github.com/menxli/stateful_pool
Description-Content-Type: text/markdown


The standard `concurrent.futures.ProcessPoolExecutor` makes it hard to maintain stateful workers (e.g., workers that each keep a model loaded in GPU memory).
This library provides a simple interface for creating a pool of stateful workers that execute tasks in parallel across multiple processes.

```txt
+-----------------------+              +----------------------+
|     Main Process      |              |     Process Pool     |
| +--------+ +--------+ |              | +------------------+ |
| | Thread | | Thread | | -- Task ---> | | Worker (Process) | |
| |   1    | |   2    | |              | +------------------+ |
| +--------+ +--------+ |              | +------------------+ |
|      ...     ...      | <--- Res --- | | Worker (Process) | |
| +--------+ +--------+ |              | +------------------+ |
| | Thread | | Thread | |              |         ...          |
| |  N-1   | |   N    | |              | +------------------+ |
| +--------+ +--------+ |              | | Worker (Process) | |
|                       |              | +------------------+ |
+-----------------------+              +----------------------+
```

The following example creates a pool of workers, each maintaining its own state (in this case, a GPU ID).
A task is submitted to the pool, and the result is retrieved with a blocking call.
In practice, you would likely submit tasks from separate threads so that the main thread is not blocked.

```python
import random
import time

from stateful_pool import SPool, SWorker

# optional type annotations are:
# [spawn return, execute argument, execute return]
class SquareWorker(SWorker[str, int, str]):
    gpu_id: int

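    # Called via pool.spawn(...): store per-worker state and return a status message.
    def spawn(self, gpu_id):
        self.gpu_id = gpu_id
        return f"Worker initialized on GPU {gpu_id}"

    # Called via pool.execute(task): simulate work, then report the state that was used.
    def execute(self, task):
        time.sleep(random.uniform(0.1, 1.0))
        return f"[Execute] Square of {task} is {task * task} (computed on GPU {self.gpu_id})"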

if __name__ == "__main__":

    with SPool(SquareWorker, queue_size=100) as pool:
        s1 = pool.spawn(gpu_id=1)
        s2 = pool.spawn(gpu_id=2)
        print(f"{s1}, {s2}")

        res = pool.execute(100)
        print(res)
```
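
The note about submitting tasks from separate threads can be sketched as follows. This is a minimal illustration, not part of the library: `execute_concurrently` is a hypothetical helper, and `fake_execute` is a stand-in for `pool.execute` so that the snippet runs without any worker processes.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Callable, Iterable, List, TypeVar

T = TypeVar("T")
R = TypeVar("R")


def execute_concurrently(
    execute: Callable[[T], R], tasks: Iterable[T], n_threads: int = 4
) -> List[R]:
    """Call a blocking `execute` from several threads (hypothetical helper).

    Results are returned in the same order as `tasks`.
    """
    with ThreadPoolExecutor(max_workers=n_threads) as threads:
        return list(threads.map(execute, tasks))


# Stand-in for `pool.execute`; an assumption made only to keep the sketch runnable.
def fake_execute(task: int) -> int:
    return task * task


results = execute_concurrently(fake_execute, range(5))
print(results)  # [0, 1, 4, 9, 16]
```

With a real pool, you would pass `pool.execute` in place of `fake_execute` inside the `with SPool(...)` block, after the workers have been spawned. This assumes `pool.execute` may be called from several threads at once, which the diagram above suggests but which you should verify against the library itself.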
