Metadata-Version: 2.4
Name: rembus
Version: 0.3.0
Summary: Rembus for python
Author-email: Attilio Donà <attilio.dona@gmail.com>
License-Expression: AGPL-3.0-only
Project-URL: Homepage, https://github.com/cardo-org/rembus.python
Project-URL: Bug Tracker, https://github.com/cardo-org/rembus.python/issues
Keywords: rembus,rpc,publish,subscribe,websocket,cbor
Classifier: Programming Language :: Python :: 3
Classifier: Operating System :: OS Independent
Classifier: Development Status :: 3 - Alpha
Requires-Python: >=3.7
Description-Content-Type: text/markdown
License-File: LICENSE
Requires-Dist: async-timeout
Requires-Dist: platformdirs
Requires-Dist: pandas
Requires-Dist: pyarrow
Requires-Dist: websockets
Requires-Dist: cbor2
Requires-Dist: cryptography
Requires-Dist: pydantic
Provides-Extra: dev
Requires-Dist: check-manifest; extra == "dev"
Requires-Dist: bumpver; extra == "dev"
Requires-Dist: twine; extra == "dev"
Provides-Extra: test
Requires-Dist: coverage; extra == "test"
Requires-Dist: mock; extra == "test"
Requires-Dist: pytest; extra == "test"
Requires-Dist: pytest-asyncio; extra == "test"
Requires-Dist: pytest-mock; extra == "test"
Requires-Dist: pytest-cov; extra == "test"
Dynamic: license-file

# Rembus for Python

[![Build Status](https://github.com/cardo-org/rembus.python/actions/workflows/python-app.yml/badge.svg?branch=main)](https://github.com/cardo-org/rembus.python/actions/workflows/CI.yml?query=branch%3Amain)
[![Coverage](https://codecov.io/gh/cardo-org/rembus.python/branch/main/graph/badge.svg)](https://codecov.io/gh/cardo-org/rembus.python)

Rembus is a Pub/Sub and RPC middleware.

There are few key concepts to get confident with Rembus:

- A Component is a distributed application that communicate with Pub/Sub or RPC styles;
- A Component connect to a Broker;
- A Broker dispatch messages between Components;
- A Component expose RPC services and/or subscribe to Pub/Sub topics;
- A Component make RPC requests and/or publish messages to Pub/Sub topics;

This API version supports only the WebSocket protocol.

## Getting Started

Start the [Rembus](https://cardo-org.github.io/Rembus.python/stable/) broker.

Install the package:

```shell
pip install rembus
```

```python
import rembus

rb = rembus.node()
rb.publish({'name': 'sensor_1', 'metric': 'T', 'value':21.6})
rb.close()
```

or call `component("myname")` for the asynchronous Python API:

```python
import asyncio
import rembus

async def main():
    rb = await rembus.component("myname")
    await rb.publish("mytopic", {'name': 'sensor_1','metric': 'T','value':21.6})
    await rb.close()


loop = asyncio.new_event_loop()
loop.run_until_complete(main())

```

## Initialize a Component

Currently the Python API provides the WebSocket protocol for connecting to the Rembus broker.

The url argument of the `component` function define the component identity and the broker endpoint to connect:

```python
import rembus

# Broker endpoint and named component
rb = await rembus.component('ws://hostname:port/component_name')

# Broker endpoint and anonymous component 
rb = await rembus.component('ws://hostname:port')

# Default broker and named component 
rb = await rembus.component('component_name')

# Default broker and anonymous component 
rb = await rembus.component()
```

The `component` builder function returns a Rembus handler that will be used for interacting with the components via Pub/Sub and RPC messages.

`component_name` is the unique name that assert the component identity between online sessions (connect/disconnect windows).

`component_name` is optional: if it is missing then a random identifier that changes at each connection event is used as the component identifier. In this case the broker is unable to bind the component to a persistent twin and messages published when the component is offline get not broadcasted to the component when it gets online again.

The default broker endpoint is set by `REMBUS_BASE_URL` environment variable and default to `ws://127.0.0.1:8000`.

## Pub/Sub example

A message is published with `publish` function.

```python
rb.publish('mytopic', arg_1, arg_2, ..., arg_N)
```

Where the arguments `arg_i` comprise the message data payload that gets received by the subscribed components.

A subscribed component interested to the topic `mytopic` have to define a function named as the topic of interest and with the same numbers of arguments:

```python
# do something each time a message published to topic mytopic is published
def mytopic(arg_1, arg_2, ..., arg_N):
    ...

rb.subscribe(mytopic)

rb.wait()
```

The first argument to `subscribe` is the function, named as the topic of interest, that will be called each time a message is published.

The optional second argument of `subscribe` define the "retroactive" feature of the
subscribed topic.

If the second argument is `True` then the messages published when the component is offline will be delivered as soon as the component will get online again, otherwise
the messages published before connecting will be lost.

> **NOTE**: To cache messages for an offline component the broker needs to know that such component has subscribed for a specific topic. This imply that messages published before the first subscribe happens will be lost. If you want all message will be delivered subscribe first and publish after.  

## RPC example

A RPC service is implemented with a function named as the exposed service.

```python
import rembus as rembus

def add(x,y):
    return x+y

rb = rembus.node('calculator')

rb.expose(add)

rb.wait()
```

The `calculator` component expose the `add` service, the RPC client will invoke as:

```python
import rembus as rembus

rb = rembus.node()
result = rb.rpc('add', 1, 2)
```

The asynchronous client and server implementations will be something like:

```python
#server.py
import asyncio
import rembus

async def add(x, y):
    return x+y

async def main():
    rb = await rembus.component()
    
    await rb.expose(add)
    await rb.wait()

loop = asyncio.new_event_loop()
loop.run_until_complete(main())
```

```python
# client.py
import asyncio
import rembus

async def main():
    rb = await rembus.component()
    result = await rb.rpc('add', 1, 2)
    print(f'result={result}')
    await rb.close()


loop = asyncio.new_event_loop()
loop.run_until_complete(main())
```

## Test

```shell
pytest --cov=rembus --cov-report=lcov:lcov.info
```
