Metadata-Version: 2.4
Name: emergent-client
Version: 0.10.0
Summary: Python SDK for the Emergent event-driven workflow platform
Project-URL: Homepage, https://github.com/govcraft/emergent
Project-URL: Documentation, https://github.com/govcraft/emergent/tree/main/sdks/py
Project-URL: Repository, https://github.com/govcraft/emergent
Author-email: Govcraft <info@govcraft.ai>
License: MIT OR Apache-2.0
Keywords: async,emergent,event-driven,message-bus,workflow
Classifier: Development Status :: 4 - Beta
Classifier: Framework :: AsyncIO
Classifier: Intended Audience :: Developers
Classifier: License :: OSI Approved :: Apache Software License
Classifier: License :: OSI Approved :: MIT License
Classifier: Programming Language :: Python :: 3
Classifier: Programming Language :: Python :: 3.12
Classifier: Programming Language :: Python :: 3.13
Classifier: Topic :: Software Development :: Libraries :: Python Modules
Classifier: Typing :: Typed
Requires-Python: >=3.12
Requires-Dist: msgpack>=1.0
Requires-Dist: pydantic>=2.0
Requires-Dist: typeid-python>=0.3.9
Provides-Extra: dev
Requires-Dist: mypy>=1.13; extra == 'dev'
Requires-Dist: pytest-asyncio>=0.24; extra == 'dev'
Requires-Dist: pytest-cov>=5.0; extra == 'dev'
Requires-Dist: pytest>=8.0; extra == 'dev'
Requires-Dist: ruff>=0.8; extra == 'dev'
Description-Content-Type: text/markdown

# emergent-client

Python SDK for building event-driven workflows on the
[Emergent](https://github.com/govcraft/emergent) engine. Connect to a running
engine over Unix IPC and publish or consume messages through three typed
primitives: **Source**, **Handler**, and **Sink**.

```python
from emergent import EmergentSink

async for msg in EmergentSink.messages("my_sink", ["timer.tick"]):
    print(msg.payload)
```

## Install

```bash
pip install emergent-client
```

Or with [uv](https://docs.astral.sh/uv/):

```bash
uv add emergent-client
```

Then import:

```python
from emergent import EmergentSource, EmergentHandler, EmergentSink
```

## Three Primitives

Every Emergent workflow is composed of Sources, Handlers, and Sinks. Each
primitive has a single, well-defined role:

| Primitive     | Subscribe | Publish | Role                                        |
| ------------- | --------- | ------- | ------------------------------------------- |
| **Source**    | --        | Yes     | Ingress -- bring data into the system       |
| **Handler**   | Yes       | Yes     | Processing -- transform, enrich, or route   |
| **Sink**      | Yes       | --      | Egress -- persist, display, or forward data |

## Quick Start

### Sink -- consume messages

A Sink subscribes to message types and processes each one as it arrives.
`EmergentSink.messages` is a convenience method that connects, subscribes, and
yields messages in a single call:

```python
from emergent import EmergentSink

async for msg in EmergentSink.messages("my_sink", ["timer.tick"]):
    data = msg.payload_as(dict)
    print(f"Tick #{data['sequence']}")
```

For explicit lifecycle control, connect and subscribe separately:

```python
async with await EmergentSink.connect("my_sink") as sink:
    async with await sink.subscribe(["timer.tick", "timer.filtered"]) as stream:
        async for msg in stream:
            print(msg.message_type, msg.payload)
```

### Source -- publish messages

A Source publishes messages into the engine. It cannot subscribe:

```python
from emergent import EmergentSource

async with await EmergentSource.connect("my_source") as source:
    await source.publish("sensor.reading", {"value": 42.5, "unit": "celsius"})
```

### Handler -- subscribe and publish

A Handler subscribes to incoming messages and publishes new ones. Use
`caused_by` to link output messages to the input that triggered them:

```python
from emergent import EmergentHandler, create_message

async with await EmergentHandler.connect("order_processor") as handler:
    async with await handler.subscribe(["order.created"]) as stream:
        async for msg in stream:
            await handler.publish(
                create_message("order.processed")
                .caused_by(msg.id)
                .payload({"status": "ok"})
            )
```

## Publishing Messages

Every primitive that can publish supports three calling styles. All three
produce the same result:

```python
# Shorthand -- type string and payload dict
await source.publish("timer.tick", {"count": 1})

# MessageBuilder -- fluent API with auto-build
await source.publish(
    create_message("timer.tick").payload({"count": 1})
)

# Full EmergentMessage object
await source.publish(message)
```

## Building Messages

`create_message` returns a fluent builder for constructing immutable
`EmergentMessage` instances:

```python
from emergent import create_message

msg = (
    create_message("sensor.reading")
    .payload({"value": 42.5, "unit": "celsius"})
    .metadata({"sensor_id": "temp-01", "location": "room-a"})
    .build()
)
```

Link messages into traceable chains with `caused_by` and `correlated_with`:

```python
reply = (
    create_message("order.confirmed")
    .caused_by(original_msg.id)
    .correlated_with(request_id)
    .payload({"confirmed": True})
    .build()
)
```

The builder sets `id` (TypeID format) and `timestamp_ms` automatically. Call
`.build()` explicitly when you need the message object, or pass the builder
directly to `publish()`, which calls `.build()` for you.

## Subscribing to Messages

`subscribe` accepts a list or variadic arguments:

```python
# List form
stream = await sink.subscribe(["timer.tick", "timer.filtered"])

# Variadic form
stream = await sink.subscribe("timer.tick", "timer.filtered")
```

Iterate over the returned `MessageStream` with `async for`:

```python
async for msg in stream:
    data = msg.payload_as(dict)
    print(data["count"])
```

`MessageStream` implements `AsyncIterator` and the async context manager
protocol, so you can use `async with` for automatic cleanup:

```python
async with await sink.subscribe(["timer.tick"]) as stream:
    async for msg in stream:
        print(msg.payload)
```

### Typed payloads with Pydantic

`payload_as` validates dict payloads against Pydantic models automatically:

```python
from pydantic import BaseModel

class SensorReading(BaseModel):
    value: float
    unit: str

async for msg in EmergentSink.messages("my_sink", ["sensor.reading"]):
    reading = msg.payload_as(SensorReading)
    print(f"{reading.value} {reading.unit}")
```

## Resource Cleanup

All primitives implement the async context manager protocol. Use `async with`
for automatic cleanup (recommended), or call `close()` / `disconnect()`
manually:

```python
# Automatic cleanup (recommended)
async with await EmergentSink.connect("my_sink") as sink:
    ...

# Manual cleanup
sink = await EmergentSink.connect("my_sink")
# ... use sink ...
await sink.disconnect()
```

The SDK subscribes to `system.shutdown` internally. When the Emergent engine
signals a graceful shutdown, active message streams close automatically.

## Helper Functions

`run_source`, `run_handler`, and `run_sink` eliminate connection and
signal-handling boilerplate. Each helper connects, sets up SIGTERM/SIGINT
handlers, runs your callback, and disconnects on completion:

```python
import asyncio
from emergent import run_source, run_handler, run_sink, create_message

# Source -- custom event loop with shutdown signal
async def timer_logic(source, shutdown_event):
    count = 0
    while not shutdown_event.is_set():
        try:
            await asyncio.wait_for(shutdown_event.wait(), timeout=3.0)
            break
        except asyncio.TimeoutError:
            count += 1
            await source.publish(
                create_message("timer.tick").payload({"count": count})
            )

await run_source("my_timer", timer_logic)

# Handler -- called once per message
async def process(msg, handler):
    await handler.publish(
        create_message("processed").caused_by(msg.id).payload({"done": True})
    )

await run_handler("my_handler", ["raw.event"], process)

# Sink -- called once per message
async def consume(msg):
    print(msg.payload)

await run_sink("my_sink", ["timer.tick"], consume)
```

The name argument is optional. When omitted or set to `None`, the helper reads
from the `EMERGENT_NAME` environment variable.

## Error Handling

All SDK errors extend `EmergentError` and include a machine-readable `code`
property. Catch specific error types for precise control:

```python
from emergent import (
    EmergentSource,
    SocketNotFoundError,
    ConnectionError,
    TimeoutError,
)

try:
    source = await EmergentSource.connect("my_source")
except SocketNotFoundError as e:
    print(f"Engine not running at: {e.socket_path}")
except TimeoutError as e:
    print(f"Timed out after {e.timeout}s")
except ConnectionError as e:
    print(f"Connection failed: {e}")
```

### Error Types

| Error                 | Code                  | Extra Fields     |
| --------------------- | --------------------- | ---------------- |
| `ConnectionError`     | `CONNECTION_FAILED`   |                  |
| `SocketNotFoundError` | `SOCKET_NOT_FOUND`    | `socket_path`    |
| `TimeoutError`        | `TIMEOUT`             | `timeout`        |
| `ProtocolError`       | `PROTOCOL_ERROR`      |                  |
| `SubscriptionError`   | `SUBSCRIPTION_FAILED` | `message_types`  |
| `PublishError`        | `PUBLISH_FAILED`      | `message_type`   |
| `DiscoveryError`      | `DISCOVERY_FAILED`    |                  |
| `DisposedError`       | `DISPOSED`            |                  |
| `ValidationError`     | `VALIDATION_ERROR`    | `field`          |

## Message Shape

Every message flowing through Emergent follows the same envelope:

| Field            | Type                 | Description                          |
| ---------------- | -------------------- | ------------------------------------ |
| `id`             | `str`                | Unique TypeID (`msg_<uuidv7>`)       |
| `message_type`   | `str`                | Routing key (e.g., `"timer.tick"`)   |
| `source`         | `str`                | Name of the publishing primitive     |
| `correlation_id` | `str \| None`        | Links related messages               |
| `causation_id`   | `str \| None`        | ID of the triggering message         |
| `timestamp_ms`   | `int`                | Creation time (Unix ms)              |
| `payload`        | `Any`                | User-defined data                    |
| `metadata`       | `dict[str, Any] \| None` | Optional tracing/debug data     |

Use `msg.payload_as(MyModel)` to validate and convert the payload to a typed
Pydantic model or any other type.

## System Events

The Emergent engine broadcasts lifecycle events that your primitives can
subscribe to:

| Event Pattern              | Payload Type           | Fired When                    |
| -------------------------- | ---------------------- | ----------------------------- |
| `system.started.<name>`    | `SystemEventPayload`   | Primitive started             |
| `system.stopped.<name>`    | `SystemEventPayload`   | Primitive stopped             |
| `system.error.<name>`      | `SystemEventPayload`   | Primitive failed              |
| `system.shutdown`          | `SystemShutdownPayload`| Engine shutting down          |

Use the typed payload classes for safe access:

```python
from emergent import SystemEventPayload

if msg.message_type.startswith("system.started."):
    event = msg.payload_as(SystemEventPayload)
    print(f"{event.name} ({event.kind}) started with PID {event.pid}")

if msg.message_type.startswith("system.error."):
    event = msg.payload_as(SystemEventPayload)
    if event.is_error():
        print(f"{event.name} failed: {event.error}")
```

## Requirements

- Python 3.12 or later
- A running Emergent engine with the `EMERGENT_SOCKET` environment variable set

## License

MIT OR Apache-2.0
