Metadata-Version: 2.4
Name: netaio
Version: 0.0.8
Summary: Simple asyncio TCP client and server and UDP node library inspired by fastapi
Project-URL: Homepage, https://github.com/k98kurz/netaio
Project-URL: Repository, https://github.com/k98kurz/netaio
Project-URL: Bug Tracker, https://github.com/k98kurz/netaio/issues
Author-email: k98kurz <k98kurz@gmail.com>
Classifier: Development Status :: 2 - Pre-Alpha
Classifier: License :: OSI Approved :: ISC License (ISCL)
Classifier: Operating System :: OS Independent
Classifier: Programming Language :: Python :: 3
Classifier: Topic :: Internet
Classifier: Topic :: Software Development :: Libraries :: Application Frameworks
Requires-Python: >=3.10
Requires-Dist: packify>=0.3.1
Provides-Extra: asymmetric
Requires-Dist: pynacl>=1.5.0; extra == 'asymmetric'
Requires-Dist: tapescript>=0.7.2; extra == 'asymmetric'
Description-Content-Type: text/markdown

# netaio

netaio is a simple, easy-to-use asyncio-based TCP client and server and UDP node
library, inspired by fastapi but intended for non-HTTP use cases.

## Status

This is currently a work-in-progress. Remaining work before the v0.1.0 release:

- [x] Authorization plugin system
- [x] Cipher plugin system
- [x] Optional authorization plugin using HMAC
- [x] Optional cipher plugin using simple symmetric stream cipher
- [x] UDP node with multicast
- [x] Automatic peer advertisement/discovery/management for UDP node
- [x] Error/errored message handling system
- [x] Optional authorization plugin using tapescript
- [x] Optional cipher plugin using Curve25519 asymmetric encryption
- [ ] Optional authorization plugin using Hashcash/PoW for anti-spam DoS protection
- [ ] Ephemeral handlers (i.e. handlers that are removed after first use)
- [ ] IPv6 support
- [ ] Core daemon to proxy traffic for local apps
- [ ] E2e encrypted chat app example

Issues are tracked [here](https://github.com/k98kurz/netaio/issues).

## Usage

Install with `pip install netaio`. To use the optional asymmetric cryptography
plugins, install with `pip install netaio[asymmetric]`.

Brief examples are shown below. For more documentation, see the
[dox.md](https://github.com/k98kurz/netaio/blob/master/dox.md) file generated by
[autodox](https://pypi.org/project/autodox/).

### TCPServer

```python
from netaio import TCPServer, Body, Message, MessageType, HMACAuthPlugin
import asyncio


server = TCPServer(port=8888, auth_plugin=HMACAuthPlugin(config={"secret": "test"}))

@server.on((MessageType.REQUEST_URI, b'something'))
async def something(msg: Message, writer: asyncio.StreamWriter):
    body = Body.prepare(b'This is it.', uri=b'something')
    return Message.prepare(body, MessageType.RESPOND_URI)

@server.on(MessageType.SUBSCRIBE_URI)
async def subscribe(msg: Message, writer: asyncio.StreamWriter):
    server.subscribe(msg.body.uri, writer)
    return Message.prepare(Body.prepare(b'', uri=msg.body.uri), MessageType.CONFIRM_SUBSCRIBE)

@server.on(MessageType.UNSUBSCRIBE_URI)
async def unsubscribe(msg: Message, writer: asyncio.StreamWriter):
    server.unsubscribe(msg.body.uri, writer)
    return Message.prepare(Body.prepare(b'', uri=msg.body.uri), MessageType.CONFIRM_UNSUBSCRIBE)

asyncio.run(server.start())
```

### TCPClient

```python
from netaio import TCPClient, Body, Message, MessageType, HMACAuthPlugin
import asyncio


client = TCPClient("127.0.0.1", 8888, auth_plugin=HMACAuthPlugin(config={"secret": "test"}))
received_resources = {}

@client.on(MessageType.RESPOND_URI)
def echo(msg: Message, writer: asyncio.StreamWriter):
    received_resources[msg.body.uri] = msg.body.content

async def run_client():
    request_body = Body.prepare(b'pls gibs me dat', uri=b'something')
    request_message = Message.prepare(request_body, MessageType.REQUEST_URI)
    await client.connect()
    await client.send(request_message)
    await client.receive_once()

asyncio.run(run_client())

print(received_resources)
```

### UDPNode

```python
from netaio import UDPNode, Peer, Body, Message, MessageType, HMACAuthPlugin
from os import urandom
import asyncio

local_peer = Peer(
    addrs={('127.0.0.1', 8888)},
    id=urandom(16),
    data=b''
)

echo_node = UDPNode(
    local_peer=local_peer,
    auth_plugin=HMACAuthPlugin(config={"secret": "test"})
)

@echo_node.on(MessageType.REQUEST_URI)
def request_uri(msg: Message, addr: tuple[str, int]):
    echo_node.logger.info("Sending echo to %s...", addr)
    return Message.prepare(msg.body, MessageType.OK)

@echo_node.on(MessageType.OK)
def echo(msg: Message, addr: tuple[str, int]):
    echo_node.logger.info("Received echo from %s.", addr)

echo_msg = Message.prepare(Body.prepare(b'echo'), MessageType.REQUEST_URI)

async def main(local_addr: tuple[str, int], remote_addr: tuple[str, int]|None = None):
    echo_node.interface = local_addr[0]
    echo_node.port = local_addr[1]
    await echo_node.start()
    await echo_node.manage_peers_automatically(advertise_every=1, peer_timeout=3)
    while True:
        await asyncio.sleep(1)
        if remote_addr:
            echo_node.logger.info("Sending message to %s...", remote_addr)
            echo_node.send(echo_msg, remote_addr)
        else:
            if len(echo_node.peers) > 0:
                echo_node.logger.info("Broadcasting message to all known peers...")
                echo_node.broadcast(echo_msg)
            else:
                echo_node.logger.info("No peers known, waiting to discover peers...")

local_addr = ("0.0.0.0", 8888)
remote_addr = None
asyncio.run(main(local_addr, remote_addr))
```

Note that to run this example on a single machine, the second node instance
must use a different port, e.g. `local_addr = ("127.0.0.1", 8889)`, and its
remote address must be set to the first node's address, e.g.
`remote_addr = ("127.0.0.1", 8888)`. Multicast will not work locally because of
the different ports. If the interface is set to "0.0.0.0", multicast will work
across the LAN, but the node will also hear its own multicast messages, so a
handler may need to ignore messages that originate from the local machine (the
`request_uri` handler above does not do this).

(It is technically possible to get multicast to work in one direction on a
single machine by changing the `.port` property after one has started.)

Note also that when a peer is removed from the node's peer list, it is also
unsubscribed from all URIs.

### Plugin System

The plugin system is designed to be simple and easy to understand. Each plugin
implements a specific protocol, and the `TCPServer`, `TCPClient`, and
`UDPNode` classes automatically apply plugins to messages in the correct order
to preserve the encapsulation model (see below).

There are three types of plugins defined with `typing.Protocol` interfaces:

- `AuthPluginProtocol`
- `CipherPluginProtocol`
- `PeerPluginProtocol`
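
As a rough illustration of what a `typing.Protocol` interface means in practice,
the sketch below defines a simplified, hypothetical cipher protocol and a class
that satisfies it structurally, without inheriting from it. The names
`ToyCipherProtocol` and `XorCipher` and the bytes-in/bytes-out signatures are
assumptions made for this example; the real protocol definitions are in dox.md.

```python
from typing import Protocol, runtime_checkable


@runtime_checkable
class ToyCipherProtocol(Protocol):
    """Hypothetical, simplified stand-in for a cipher plugin interface."""
    def encrypt(self, data: bytes) -> bytes: ...
    def decrypt(self, data: bytes) -> bytes: ...


class XorCipher:
    """Toy cipher (NOT secure): XORs every byte with a single-byte key."""
    def __init__(self, config: dict):
        self.key = config["key"]

    def encrypt(self, data: bytes) -> bytes:
        return bytes(b ^ self.key for b in data)

    def decrypt(self, data: bytes) -> bytes:
        return self.encrypt(data)  # XOR is its own inverse


plugin = XorCipher({"key": 0x2A})
# Structural check: XorCipher never subclasses ToyCipherProtocol.
is_plugin = isinstance(plugin, ToyCipherProtocol)
round_trip = plugin.decrypt(plugin.encrypt(b"hello"))
```

Any class with matching methods passes the check, which is why a custom plugin
does not need to subclass anything.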

#### Authentication/Authorization

`TCPServer`, `TCPClient`, and `UDPNode` support an optional
authentication/authorization plugin.
Each plugin is instantiated with a dict of configuration parameters, and it must
implement the `AuthPluginProtocol`. Once the plugin has been instantiated, it
can be passed to the `TCPServer`, `TCPClient`, and `UDPNode` constructors or set
on the instances themselves. An auth plugin can also be set on a per-handler
basis by passing the plugin as a second argument (or as the keyword argument
`auth_plugin`) to the `on` or `add_handler` method. Currently, if an auth plugin
is set both on the instance and per-handler, both will be checked before the
handler function is called, and both will be applied to the response body. The
per-handler plugin can overwrite any auth fields set by the instance plugin,
which may break communication, so each plugin instance should be configured to
use its own writable auth data fields.

Currently, netaio includes an `HMACAuthPlugin` that can be used by the server
and client to authenticate and authorize requests. This uses a shared secret to
generate and check HMACs over message bodies.
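
To make the idea of "HMACs over message bodies" concrete, here is a minimal
sketch using only the standard library. It is not the `HMACAuthPlugin`
implementation: the function names, the preimage layout (nonce, then timestamp,
then body), and the `max_age` check are assumptions for illustration. Only the
field names `hmac`, `nonce`, and `ts` are taken from the plugin's default
config.

```python
import hashlib
import hmac
import os
import time


def make_auth_fields(secret: bytes, body: bytes) -> dict:
    """Build nonce, timestamp, and HMAC fields for a message body."""
    nonce = os.urandom(16)
    ts = int(time.time())
    # Assumed preimage layout: nonce || 8-byte timestamp || body
    preimage = nonce + ts.to_bytes(8, "big") + body
    tag = hmac.new(secret, preimage, hashlib.sha256).digest()
    return {"nonce": nonce, "ts": ts, "hmac": tag}


def check_auth_fields(secret: bytes, body: bytes, fields: dict,
                      max_age: int = 60) -> bool:
    """Recompute the HMAC and compare in constant time; reject stale timestamps."""
    if abs(int(time.time()) - fields["ts"]) > max_age:
        return False
    preimage = fields["nonce"] + fields["ts"].to_bytes(8, "big") + body
    expected = hmac.new(secret, preimage, hashlib.sha256).digest()
    return hmac.compare_digest(expected, fields["hmac"])


fields = make_auth_fields(b"test", b"This is it.")
ok = check_auth_fields(b"test", b"This is it.", fields)
wrong_secret = check_auth_fields(b"tset", b"This is it.", fields)
tampered = check_auth_fields(b"test", b"This is not it.", fields)
```

Both peers must hold the same secret; a check fails if either the secret or the
body differs from what was used to make the fields.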

The `TapescriptAuthPlugin` is included in the optional `netaio.asymmetric`
submodule, which requires the [tapescript](https://pypi.org/project/tapescript)
package as a dependency and allows for customizable authentication/authorization
models using the tapescript DSL for access controls.

<details>
<summary>Example of additional auth layer</summary>

```python
from netaio import TCPServer, TCPClient, HMACAuthPlugin, MessageType, Body, Message
import asyncio

outer_auth_plugin = HMACAuthPlugin(config={"secret": "test"})
inner_auth_plugin = HMACAuthPlugin(config={"secret": "tset", "hmac_field": "camh"})
server = TCPServer(port=8888, auth_plugin=outer_auth_plugin)
client = TCPClient(host="127.0.0.1", port=8888, auth_plugin=outer_auth_plugin)

@server.on(MessageType.CREATE_URI, auth_plugin=inner_auth_plugin)
async def put_uri(msg: Message, writer: asyncio.StreamWriter):
    body = Body.prepare(b'Resource saved.', uri=msg.body.uri)
    return Message.prepare(body, MessageType.OK)

async def main():
    task = asyncio.create_task(server.start())
    await asyncio.sleep(0.1)
    await client.connect()
    await client.send(
        Message.prepare(Body.prepare(b'test'), MessageType.CREATE_URI),
        auth_plugin=inner_auth_plugin
    )
    result = await client.receive_once()
    await client.close()
    task.cancel()
    return result

response = asyncio.run(main())
print(response)
```
</details>

#### Cipher (encryption/decryption)

`TCPServer`, `TCPClient`, and `UDPNode` support an optional cipher plugin. Each
plugin is instantiated with a dict of configuration parameters, and it must
implement the `CipherPluginProtocol`. Once the plugin has been instantiated, it
can be passed to the `TCPServer`, `TCPClient`, and `UDPNode` constructors or set
on the instances themselves. A cipher plugin can also be set on a per-handler
basis by passing the plugin as a third argument (or as the keyword argument
`cipher_plugin`) to the `on` or `add_handler` method. If a cipher plugin is set
both on the instance and per-handler, both will be applied to the message.

Currently, netaio includes a `Sha256StreamCipherPlugin` that can be used by
the server and client to encrypt and decrypt messages using a simple symmetric
stream cipher. This uses a shared secret key and per-message IVs. Note that the
`encrypt_uri` config option should be `False` to prevent the URI from being
encrypted when using this as an additional, inner layer of encryption, else the
URI will not be usable for routing requests/determining responses when the
default key extractor is used (i.e. handlers set on the tuple of MessageType
and URI).
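
For readers unfamiliar with hash-based stream ciphers, the following sketch
shows the general technique of deriving a keystream from SHA-256 with a shared
key and a fresh per-message IV. It is an illustration only and is not the
`Sha256StreamCipherPlugin` implementation: the counter-mode construction, the
16-byte IV, and the function names are all assumptions.

```python
import hashlib
import os


def keystream(key: bytes, iv: bytes, length: int) -> bytes:
    """Generate `length` bytes from SHA-256(key || iv || counter) blocks."""
    out = b""
    counter = 0
    while len(out) < length:
        out += hashlib.sha256(key + iv + counter.to_bytes(8, "big")).digest()
        counter += 1
    return out[:length]


def encrypt(key: bytes, plaintext: bytes) -> tuple[bytes, bytes]:
    """Return (iv, ciphertext); a fresh random IV is drawn for every message."""
    iv = os.urandom(16)
    ks = keystream(key, iv, len(plaintext))
    return iv, bytes(p ^ k for p, k in zip(plaintext, ks))


def decrypt(key: bytes, iv: bytes, ciphertext: bytes) -> bytes:
    """XOR the ciphertext with the same keystream to recover the plaintext."""
    ks = keystream(key, iv, len(ciphertext))
    return bytes(c ^ k for c, k in zip(ciphertext, ks))


iv, ciphertext = encrypt(b"test", b"Super secret data.")
recovered = decrypt(b"test", iv, ciphertext)
```

A construction like this provides confidentiality only; it does not detect
tampering, which is one reason to pair a cipher plugin with an auth plugin.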

The `X25519CipherPlugin` is included in the optional `netaio.asymmetric`
submodule, which requires [PyNaCl](https://pypi.org/project/PyNaCl) as a
dependency and allows for asymmetric encryption using ECDHE (Elliptic Curve
Diffie-Hellman Ephemeral) key exchange. Note that this plugin should be used as an inner
layer of encryption with automatic peer management and local peer data including
`{'pubkey': SigningKey(seed).verify_key}`.

<details>
<summary>Example of additional encryption layer</summary>

```python
from netaio import TCPServer, TCPClient, Sha256StreamCipherPlugin, MessageType, Body, Message
import asyncio

outer_cipher_plugin = Sha256StreamCipherPlugin(config={"key": "test"})
inner_cipher_plugin = Sha256StreamCipherPlugin(config={"key": "tset", "iv_field": "iv2"})
server = TCPServer(port=8888, cipher_plugin=outer_cipher_plugin)
client = TCPClient(host="127.0.0.1", port=8888, cipher_plugin=outer_cipher_plugin)

@server.on(MessageType.REQUEST_URI, cipher_plugin=inner_cipher_plugin)
async def request_uri(msg: Message, writer: asyncio.StreamWriter):
    body = Body.prepare(b'Super secret data.', uri=msg.body.uri)
    return Message.prepare(body, MessageType.RESPOND_URI)

async def main():
    task = asyncio.create_task(server.start())
    await asyncio.sleep(0.1)
    await client.connect()
    await client.send(
        Message.prepare(
            Body.prepare(b'psst gimme the secret', uri=b'something'),
            MessageType.REQUEST_URI
        ),
        cipher_plugin=inner_cipher_plugin
    )
    result = await client.receive_once(cipher_plugin=inner_cipher_plugin)
    await client.close()
    task.cancel()
    return result

response = asyncio.run(main())
print(response)
```
</details>

#### Peer Data Encoding/Decoding

To use the automatic peer management system, nodes accept a peer plugin. Each
plugin is instantiated with a dict of configuration parameters, and it must
implement the `PeerPluginProtocol`. Once the plugin has been instantiated, it
can be passed to the `TCPServer`, `TCPClient`, and `UDPNode` constructors or set
on the instances themselves. Unlike the auth and cipher plugins, this is used
only for encoding and decoding the data stored in `Peer.data`, and it is not
settable on a per-handler basis.

Currently, netaio includes a `DefaultPeerPlugin` that is used to encode and
decode peer data using the [packify](https://pypi.org/project/packify) library
to encode/decode dicts.

#### Included Plugins

The following plugins are included for convenience and as references for making
custom plugins:

- `HMACAuthPlugin`
- `Sha256StreamCipherPlugin`
- `DefaultPeerPlugin`
- `TapescriptAuthPlugin`
- `X25519CipherPlugin`

<details>
<summary>Example instantiation of HMACAuthPlugin</summary>

```python
from netaio import HMACAuthPlugin
auth_plugin = HMACAuthPlugin({
    "secret": "we attack at dawn",
    "hmac_field": "hmac", # default
    "nonce_field": "nonce", # default
    "ts_field": "ts", # default
})
```
</details>

<details>
<summary>Example instantiation of Sha256StreamCipherPlugin</summary>

```python
from netaio import Sha256StreamCipherPlugin
cipher_plugin = Sha256StreamCipherPlugin({
    "key": "the key to success is held by people better than you",
    "iv_field": "iv", # default
    "encrypt_uri": True, # default; should be False for inner cipher plugins
})
```
</details>

<details>
<summary>Example instantiation of TapescriptAuthPlugin</summary>

```python
from nacl.signing import SigningKey
from netaio.asymmetric import TapescriptAuthPlugin
import os
import tapescript

seed = os.urandom(32)
vkey = SigningKey(seed).verify_key

# pretending there is an admin key to allow admin auth bypass
if os.path.exists('admin.vkey.hex'):
    with open('admin.vkey.hex', 'r') as f:
        admin_vkey = bytes.fromhex(f.read())
else:
    admin_vkey = vkey

inner_script = tapescript.make_single_sig_lock(admin_vkey)

lock = tapescript.make_taproot_lock(vkey, script=inner_script)
witness_func = lambda seed, sigfields: tapescript.make_taproot_witness_keyspend(
    seed, sigfields, committed_script=inner_script
)
auth_plugin = TapescriptAuthPlugin({
    'seed': seed,
    'lock': lock,
    'nonce_field': 'nonce', # default
    'ts_field': 'ts', # default
    'witness_field': 'witness', # default
    'witness_func': witness_func,
    'contracts': {}, # default
    'plugins': {}, # default
})
```
</details>

<details>
<summary>Example instantiation of X25519CipherPlugin</summary>

```python
from netaio.asymmetric import X25519CipherPlugin, Peer, DefaultPeerPlugin
import os

seed = os.urandom(32)
cipher_plugin = X25519CipherPlugin({
    'seed': seed,
    'encrypt_uri': False, # default
    # should not be True unless peers are set manually and this is the outer cipher
})

# for use with peer management, the pubkey should be sent to peers
local_peer = Peer(
    addrs={('0.0.0.0', 8888)},
    id=bytes(cipher_plugin.pubk),
    data=DefaultPeerPlugin().encode_data({
        "pubkey": bytes(cipher_plugin.pubk),
        "vkey": bytes(cipher_plugin.vkey), # optional
    }),
)
```
</details>

More documentation on the asymmetric plugins can be found in
[asymmetric.md](https://github.com/k98kurz/netaio/blob/master/asymmetric.md).

#### Encapsulation Model

The encapsulation model for plugin interactions with messages is as follows:

##### Send

1. Per-handler/injected `cipher_plugin.encrypt`
2. Per-handler/injected `auth_plugin.make`
3. Instance `self.cipher_plugin.encrypt`
4. Instance `self.auth_plugin.make`

##### Receive

1. Instance `self.auth_plugin.check`
2. Instance `self.cipher_plugin.decrypt`
3. Per-handler/injected `auth_plugin.check`
4. Per-handler/injected `cipher_plugin.decrypt`
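
The ordering above can be sketched with toy layers that wrap and unwrap a
string, which makes the nesting visible. The `TagLayer` class and the `send`
and `receive` helpers are hypothetical and exist only for this example; netaio's
plugins operate on messages and auth fields rather than strings.

```python
class TagLayer:
    """Toy layer: applying wraps the payload in a named tag; removing unwraps it."""
    def __init__(self, name: str):
        self.name = name

    def apply(self, payload: str) -> str:
        return f"{self.name}({payload})"

    def remove(self, payload: str) -> str:
        prefix = f"{self.name}("
        if not (payload.startswith(prefix) and payload.endswith(")")):
            raise ValueError(f"expected outermost layer {self.name}")
        return payload[len(prefix):-1]


# Send order from the list above: innermost layer first.
send_order = [
    TagLayer("handler_cipher"),
    TagLayer("handler_auth"),
    TagLayer("instance_cipher"),
    TagLayer("instance_auth"),
]


def send(payload: str) -> str:
    for layer in send_order:
        payload = layer.apply(payload)
    return payload


def receive(wire: str) -> str:
    # Receiving peels the layers off in exactly the reverse order.
    for layer in reversed(send_order):
        wire = layer.remove(wire)
    return wire


wire = send("body")
unwrapped = receive(wire)
```

Here `wire` is `instance_auth(instance_cipher(handler_auth(handler_cipher(body))))`,
so the instance auth check is the first thing a receiver performs and the
per-handler decryption is the last.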

### Peer Management

This package includes an optional automatic peer management system, which can be
enabled on `TCPServer`, `TCPClient`, and `UDPNode` by awaiting a call to the
`manage_peers_automatically()` method. For TCP, this will cause clients and
servers to send `ADVERTISE_PEER`/`PEER_DISCOVERED` messages to each other upon
connection or upon enabling of peer management for existing connections. For UDP,
this will cause nodes to multicast `ADVERTISE_PEER` messages every 20 seconds by
default, and any node that receives such a message will respond to that peer
with a `PEER_DISCOVERED` message that includes its own information. This will
populate the local peer lists on each server/client/node, enabling the
`broadcast` method to send messages to all known peers.

The `UDPNode.manage_peers_automatically` method can accept optional arguments
`advertise_every: int = 20` and `peer_timeout: int = 60`. All three node types
will accept the following optional arguments:

- `app_id: bytes = b'netaio'`
- `auth_plugin: AuthPluginProtocol|None = None`
- `cipher_plugin: CipherPluginProtocol|None = None`

The `auth_plugin` and `cipher_plugin` provided here will be used only for the
peer advertisement and response traffic.

Upon calling `await client_or_server_or_node.stop_peer_management()`, a
`DISCONNECT` message will be sent by TCP nodes or multicast by UDP nodes; any
node that receives a `DISCONNECT` message will remove that peer from the local
peer lists and all subscriptions.

## Testing

To test, clone the repo, install the dependencies (preferably within a virtual
environment) using `pip install -r requirements.txt`, and run
`python -m unittest discover -s tests`. Alternatively, to run each test file
individually and see the output separated by file, run
`find tests/ -name 'test_*.py' -print -exec python {} \;`.

Currently, there are 13 unit tests and 12 e2e tests. The unit tests cover the
bundled plugins and miscellaneous features. The e2e tests start a server and
client (or 2 clients), then send messages from the client to the server and
receive responses; the UDP e2e test suite starts 2 nodes and treats them like a
server and client to make testing a bit simpler and easier to follow. The
automatic peer management system is also tested in both TCP and UDP. The bundled
plugins are used for the e2e tests, and authentication failure cases are also
tested.

## License

Copyright (c) 2025 Jonathan Voss (k98kurz)

Permission to use, copy, modify, and/or distribute this software
for any purpose with or without fee is hereby granted, provided
that the above copyright notice and this permission notice appear in
all copies.

THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL
WARRANTIES WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED
WARRANTIES OF MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE
AUTHOR BE LIABLE FOR ANY SPECIAL, DIRECT, INDIRECT, OR
CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM LOSS
OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT,
NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN
CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
