Metadata-Version: 2.1
Name: faststan
Version: 0.20.0
Summary: Build data streaming pipelines using NATS or NATS streaming
Home-page: https://pypi.org/project/faststan/
License: MIT
Author: gcharbon
Author-email: guillaume.charbonnier@capgemini.com
Requires-Python: >=3.7,<4.0
Classifier: License :: OSI Approved :: MIT License
Classifier: Programming Language :: Python :: 3
Classifier: Programming Language :: Python :: 3.7
Classifier: Programming Language :: Python :: 3.8
Classifier: Programming Language :: Python :: 3.9
Requires-Dist: asyncio-nats-client (>=0.10.0,<0.11.0)
Requires-Dist: asyncio-nats-streaming (>=0.4.0,<0.5.0)
Requires-Dist: loguru (>=0.5.3,<0.6.0)
Requires-Dist: pydantic (>=1.6,<2.0)
Requires-Dist: returns (>=0.14.0,<0.15.0)
Requires-Dist: typer (>=0.3.2,<0.4.0)
Project-URL: Repository, https://gitlab.com/faststan/faststan.git
Description-Content-Type: text/markdown

# FastSTAN

<a href="https://gitlab.com/faststan/faststan/-/commits/next"><img alt="Pipeline status" src="https://gitlab.com/faststan/faststan/badges/next/pipeline.svg"></a>
<a href="https://gitlab.com/faststan/faststan/-/commits/next"><img alt="Coverage report" src="https://gitlab.com/faststan/faststan/badges/next/coverage.svg"></a>
<a href="https://python-poetry.org/docs/"><img alt="Packaging: poetry" src="https://img.shields.io/badge/packaging-poetry-blueviolet"></a>
<a href="https://flake8.pycqa.org/en/latest/"><img alt="Style: flake8" src="https://img.shields.io/badge/style-flake8-ff69b4"></a>
<a href="https://black.readthedocs.io/en/stable/"><img alt="Format: black" src="https://img.shields.io/badge/format-black-black"></a>
<a href="https://docs.pytest.org/en/stable/"><img alt="Packaging: pytest" src="https://img.shields.io/badge/tests-pytest-yellowgreen"></a>
<a href="https://pypi.org/project/faststan/"><img alt="PyPI" src="https://img.shields.io/pypi/v/faststan"></a>
<a href="https://faststan.gitlab.io/faststan/"><img alt="Documentation" src="https://img.shields.io/badge/docs-mkdocs-blue"></a>
<a href="https://opensource.org/licenses/MIT"><img alt="License: MIT" src="https://img.shields.io/badge/License-MIT-yellow.svg"></a>
[![Gitpod ready-to-code](https://img.shields.io/badge/Gitpod-ready--to--code-blue?logo=gitpod)](https://gitpod.io/#https://gitlab.com/faststan/faststan)

Easily deploy NATS and NATS Streaming subscribers using Python.

## Features

- Define subscribers using sync and async python functions
- Automatic data parsing and validation using type annotations and pydantic
- Support all subscription configuration available in stan.py and nats.py
- Start subscriptions or services from command line
- Publish messages from command line

## Quick start

- Install the package from pypi:

```bash
pip install faststan
```

### Using the command line

Create your first NATS subscriber:

- Create a file named `app.py` and write the following lines:

```python
from pydantic import BaseModel

class NewEvent(BaseModel):
    name: str
    datetime: int

def on_event(event: NewEvent):
    print(f"INFO :: Received new message: {event}")
```

- Start your subscriber:

```shell
nats sub start demo --function app:on_event
```

- Publish a message:

```shell
nats pub demo --name "John Doe" --datetime 1602661983
```

NATS Streaming behave the same way:

- Define your subscription:

```python
from pydantic import BaseModel


class Greetings(BaseModel):
    message: str

def on_event(event: NewEvent) -> Greetings:
    print(f"Info :: Received new request.")
    return Greetings(message=f"Welcome to {event.name}!"
```

- Start it using `stan sub start` command:

```shell
stan sub start demo --function app:on_event
```

- And publish message using `stan pub` command:

```shell
stan pub demo --name "John Doe"
```

### Using Python API

In this example, we will build a machine learning service that perform a prediction using an ONNX model. This service will be impletended using the [request/reply] pattern.

Before running the example, make sure you have the dependencies installed:

- `onnxruntime`
- `numpy`
- `httpx`

```python
import asyncio
from typing import List, Dict
from faststan.nats import FastNATS
from pydantic import BaseModel, validator

from httpx import AsyncClient
import numpy as np
import onnxruntime as rt


async def load_predictor(
    app: FastNATS,
    url: str = "https://s3-per-grenoble.ams3.digitaloceanspaces.com/models/rf_iris.onnx",
) -> None:
    """Load an ONNX model and return a predictor for this model."""

    async with AsyncClient() as http_client:
        http_response = await http_client.get(url)

    sess = rt.InferenceSession(http_response.content)

    input_name = sess.get_inputs()[0].name
    label_name = sess.get_outputs()[0].name
    proba_name = sess.get_outputs()[1].name

    def predict(data: np.ndarray):
        """Perform prediction for given data."""
        return sess.run([label_name, proba_name], {input_name: data})

    app.state["predictor"] = predict


class Event(BaseModel):
    """Incoming data expected by the predictor."""

    values: np.ndarray
    timestamp: int

    @validator("values", pre=True)
    def validate_array(cls, value):
        """Cast data to numpy array with float32 precision.

        A ValidationError will be raise if any error is raised in this function.
        """
        return np.array(value, dtype=np.float32)

    class Config:
        # This must be set to True in order to let pydantic handle numpy types
        arbitrary_types_allowed = True


class Result(BaseModel):
    """Result returned by the predictor."""

    probabilities: List[
        Dict[int, float]
    ]  # Example: [{ 0: 0.25, 1:0.75}, {0: 0.15, 1: 0.85}]
    labels: List[int]  # Example: [1, 1]


app = FastNATS()
app.state = {}

await load_predictor(app)
await app.connect()


@app.reply("predict")
def predict(event: Event) -> Result:
    print(f"{event.timestamp} :: Received new event data")
    labels, probas = app.state["predictor"](event.values)
    return {"probabilities": probas, "labels": labels.tolist()}


await app.start()
```

- You can now publish messages on the service:

```python
from faststan import FastNATS


async with FastNATS() as nats_client:
    reply_msg = await nats_client.request_json(
        "predict", {"values": [[0, 0, 0, 0]], "timestamp": 1602661983}
    )

print(f"Received a reply: {reply_msg}")
```

