Metadata-Version: 2.4
Name: sasy-observability
Version: 0.0.1
Summary: Observability graph server and API for LLM agent policy enforcement
Project-URL: Homepage, https://github.com/nilspalumbo/observability
Author: Nils Palumbo
License: MIT
Classifier: License :: OSI Approved :: MIT License
Classifier: Programming Language :: Python :: 3
Requires-Python: >=3.10
Requires-Dist: grpcio<2.0,>=1.76.0
Requires-Dist: httpx>=0.28.0
Requires-Dist: pydantic-settings>=2.0.0
Requires-Dist: sasy-common
Provides-Extra: server
Requires-Dist: cryptography>=44.0.0; extra == 'server'
Requires-Dist: fastapi[standard]>=0.124.0; extra == 'server'
Requires-Dist: fire>=0.5.0; extra == 'server'
Requires-Dist: neo4j<5.29,>=5.28.1; extra == 'server'
Requires-Dist: networkx[default]>=3.4.2; extra == 'server'
Description-Content-Type: text/markdown

# Observability Server

The observability server maintains a dependency graph of all agent messages in Neo4j, enabling graph-based policy evaluation, auditing, and analysis.

## Design

The primary function of the server is event recording: it accepts message and OpenTelemetry events from instrumented applications and stores them with their dependencies in Neo4j.

It also exposes graph-query APIs, in particular forward and backward slices.

The observability server is accompanied by an updates server that streams graph changes in real time to downstream consumers (e.g., the policy engine) over Server-Sent Events (SSE).

```
┌─────────────────┐     gRPC      ┌─────────────────────┐
│  Instrumented   │──────────────▶│  Observability      │
│  Application    │               │  Server (10085)     │
└─────────────────┘               └──────────┬──────────┘
                                             │
                                             ▼
                                  ┌─────────────────────┐
                                  │      Neo4j          │
                                  │  (Message Graph)    │
                                  └──────────┬──────────┘
                                             │ APOC Triggers
                                             ▼
                                  ┌─────────────────────┐     SSE
                                  │   Updates Server    │────────────▶ Policy Engine
                                  │      (10086)        │
                                  └─────────────────────┘
```

## Running

The server is usually started with `docker compose`. To run it manually:

```bash
# Set required environment variables (exported so the server process inherits them)
export NEO4J_URI=bolt://localhost:7687
export NEO4J_USERNAME=neo4j
export NEO4J_PASSWORD=your-password

# Start the observability server
uv run python -m observability.server
```

## Python API

```python
import observability

# Configure client
observability.configure(
    server_url="localhost:10085",
    auth_hook=my_auth_hook,  # your authentication hook (definition not shown)
)

# Record an event
observability.record_event(
    id="msg-123",
    content="Hello, world!",
    role="user",
    agent="MyAgent",
    parent_ids=["msg-122"],
)

# Register additional dependencies
observability.register_dependencies(
    id="msg-124",
    parent_ids=["msg-120", "msg-121"],
)

# Query the graph
backward = observability.backward_slice("msg-124")  # Returns NetworkX DiGraph
forward = observability.forward_slice("msg-100")
```

## Updates Server

The updates server streams graph changes to consumers via SSE. Changes are captured by Neo4j APOC triggers and pushed to connected clients.

### Endpoints

| Endpoint | Method | Description |
|----------|--------|-------------|
| `/state` | GET | Current graph state (nodes, edges, sequence) |
| `/updates/{from_position}` | GET | SSE stream of graph changes |
| `/ack/{client_id}/{position}` | POST | Acknowledge receipt of updates |
| `/notify` | POST | Push notification from APOC triggers |
| `/health` | GET | Health check |
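Clients of `/updates/{from_position}` receive a standard SSE stream: each event is a block of `event:` and `data:` lines terminated by a blank line. A minimal stdlib parser could look like the following. It assumes every `data:` payload is JSON, and `parse_sse` is a hypothetical helper, not part of this package.

```python
import json
from typing import Iterable, Iterator


def parse_sse(lines: Iterable[str]) -> Iterator[tuple[str, dict]]:
    """Yield (event_type, payload) pairs from SSE lines (hypothetical helper).

    Handles the subset of SSE used here: `event:` and `data:` fields,
    comment lines starting with ':', and blank-line dispatch.
    Assumption: every payload is a JSON document.
    """
    event_type = "message"  # SSE default when no `event:` field is sent
    data_lines: list[str] = []
    for raw in lines:
        line = raw.rstrip("\r\n")
        if not line:
            # A blank line ends the current event.
            if data_lines:
                yield event_type, json.loads("\n".join(data_lines))
            event_type, data_lines = "message", []
            continue
        if line.startswith(":"):
            continue  # comment or keep-alive line
        field, _, value = line.partition(":")
        if value.startswith(" "):
            value = value[1:]  # SSE strips a single leading space
        if field == "event":
            event_type = value
        elif field == "data":
            data_lines.append(value)
    # An event not terminated by a blank line is discarded, as in the SSE spec.
```

With `httpx` (already a dependency of this package), the input lines could come from `response.iter_lines()` inside `with httpx.stream("GET", "http://localhost:10086/updates/0", timeout=None) as response:`; the host, port, and starting position in that URL are assumptions.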

### Event Types

```
event: node_created
data: {"id": "msg-123", "content": "...", "role": "user", ...}

event: edge_created
data: {"source": "msg-123", "destination": "msg-122"}

event: node_deleted
data: {"id": "msg-123"}

event: edge_deleted
data: {"source": "msg-123", "destination": "msg-122"}

event: full_state
data: {"nodes": [...], "edges": [...], "sequence": 42}

event: heartbeat
data: {"sequence": 42}
```

## Neo4j Schema

The graph contains two node types, `Message` and `Computation`, connected by `DEPENDS_ON`, `CHILD_OF`, `PRODUCES`, and `CONSUMES` relationships:

```cypher
// Message node - represents agent/user/system messages
(:Message {
    id: "msg-123",
    content: "Hello, world!",
    role: "user",
    agent: "MyAgent",
    timestamp: datetime(),
    tools: [...]
})

// Computation node - represents OTel spans (LLM calls, tool executions, etc.)
(:Computation {
    id: "span-456",
    trace_id: "abc123",
    span_id: "def456",
    name: "llm_response",
    start_time_ns: ...,
    end_time_ns: ...,
    attributes: {...}
})

// Message dependencies
(:Message)-[:DEPENDS_ON]->(:Message)

// Computation relationships
(:Computation)-[:CHILD_OF]->(:Computation)   // Span hierarchy (child -> parent)
(:Computation)-[:PRODUCES]->(:Message)       // Span outputs a message
(:Computation)-[:CONSUMES]->(:Message)       // Span uses a message as input
```
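Because `DEPENDS_ON` points from a message to the messages it depends on, a backward slice follows those relationships outward and a forward slice follows them inward. A minimal sketch with the official `neo4j` Python driver (part of the `server` extra) follows; the query text and the `slice_query`/`run_slice` helpers are hypothetical illustrations, not the queries used in `observability/graph.py`.

```python
def slice_query(direction: str, max_depth: int) -> str:
    """Build Cypher for a slice over DEPENDS_ON (hypothetical helper).

    Cypher does not accept parameters in variable-length bounds, so
    max_depth is validated and formatted into the query text instead.
    The *0.. lower bound makes the starting message part of its own slice.
    """
    if isinstance(max_depth, bool) or not isinstance(max_depth, int) or max_depth < 0:
        raise ValueError("max_depth must be a non-negative integer")
    if direction == "backward":   # provenance: follow DEPENDS_ON outward
        pattern = f"(start)-[:DEPENDS_ON*0..{max_depth}]->(m:Message)"
    elif direction == "forward":  # impact: follow DEPENDS_ON inward
        pattern = f"(start)<-[:DEPENDS_ON*0..{max_depth}]-(m:Message)"
    else:
        raise ValueError("direction must be 'backward' or 'forward'")
    return (
        "MATCH (start:Message {id: $id}) "
        f"MATCH {pattern} "
        "RETURN DISTINCT m.id AS id"
    )


def run_slice(uri: str, auth: tuple[str, str], message_id: str,
              direction: str = "backward", max_depth: int = 10) -> list[str]:
    """Return the IDs of messages in the slice (requires a running Neo4j)."""
    from neo4j import GraphDatabase  # imported lazily; needs the `server` extra

    with GraphDatabase.driver(uri, auth=auth) as driver:
        records, _, _ = driver.execute_query(
            slice_query(direction, max_depth), id=message_id
        )
    return [record["id"] for record in records]
```

Against a local instance, `run_slice("bolt://localhost:7687", ("neo4j", "your-password"), "msg-124")` would return the IDs in the provenance of `msg-124`, including `msg-124` itself.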

## gRPC API

The API is defined in `proto/observability.proto`.

### Recording messages and dependencies

The RPCs in this subsection require `observability-writer` privileges. To record message events in the graph:

```protobuf
rpc RegisterEvents (Events) returns (IDs) {}

message Events {
    repeated Event events = 1;
}

message Event {
    optional string text = 1;
    optional string agent = 2;
    optional Role role = 3;
    optional string id = 4;
    repeated Tool tools = 5;
    // If this message is a tool result, the tool it was derived from
    optional Tool derived_from = 6;
}

message IDs {
    repeated string ids = 1;
}
```

To register additional dependencies between existing messages:

```protobuf
rpc RegisterDependencies (Dependencies) returns (Response) {}

message Dependencies {
    repeated Edge edges = 1;
}

message Edge {
    string source = 1;
    string destination = 2;
    optional uint32 message_index = 3;
    optional bool proximal = 4;
}
```
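The examples in this README suggest that `source` is the dependent message and `destination` is the message it depends on: `record_event(id="msg-123", parent_ids=["msg-122"])` lines up with the `edge_created` payload `{"source": "msg-123", "destination": "msg-122"}`. Assuming the Python client's `parent_ids` map onto `Edge` messages in that orientation, the conversion might look like this; `edges_from_parents` is hypothetical, the self-dependency guard is this sketch's own addition, and `message_index` and `proximal` are left unset because their semantics are not documented here.

```python
def edges_from_parents(child_id: str, parent_ids: list[str]) -> list[dict]:
    """Hypothetical helper: turn a message's parent IDs into Edge-shaped dicts.

    Orientation assumption: source = dependent (child) message,
    destination = the message it depends on.
    """
    seen: set[str] = set()
    edges = []
    for parent in parent_ids:
        if parent == child_id:
            raise ValueError(f"{child_id} cannot depend on itself")
        if parent not in seen:  # drop duplicate parents, keep first-seen order
            seen.add(parent)
            edges.append({"source": child_id, "destination": parent})
    return edges
```

Wrapping the result as `{"edges": edges_from_parents("msg-124", ["msg-120", "msg-121"])}` mirrors the shape of the `Dependencies` message.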

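### BackwardSlice / ForwardSlice

Query the dependency graph for a message's provenance (backward slice: the messages it depends on) or its impact (forward slice: the messages that depend on it).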

```protobuf
rpc BackwardSlice (SliceRequest) returns (Graph) {}
rpc ForwardSlice (SliceRequest) returns (Graph) {}

message SliceRequest {
    optional string event_id = 1;   // Starting message ID
    optional uint32 max_depth = 2;  // Maximum traversal depth
}

message Graph {
    repeated Event nodes = 1;
    repeated Edge edges = 2;
}
```

## Code Structure

| File | Description |
|------|-------------|
| `observability/server.py` | gRPC server implementation |
| `observability/updates_server.py` | SSE updates server |
| `observability/api.py` | Python client API |
| `observability/graph.py` | Neo4j graph operations |
| `observability/triggers.py` | APOC trigger management |
| `observability/utils.py` | Type conversions |
