Metadata-Version: 2.4
Name: eventix
Version: 4.3.3
Author-email: Marco Bartel <bsimpson888@gmail.com>
Requires-Python: <4.0,>=3.13
Requires-Dist: click<9.0.0,>=8.1.7
Requires-Dist: croniter<3.0.0,>=2.0.1
Requires-Dist: lsidentity<2.0.0,>=1.0.2
Requires-Dist: lsrestclient<4.0.0,>=3.0.1
Requires-Dist: pydantic-db-backend-common<3.0.0,>=2.3.1
Requires-Dist: pydantic<3.0.0,>=2.12.0
Requires-Dist: pydash
Requires-Dist: python-dotenv<2.0.0,>=1.0.0
Requires-Dist: toml<0.11.0,>=0.10.2
Requires-Dist: webexception<2.0.0,>=1.0.2
Description-Content-Type: text/markdown

# Eventix

**Eventix** is a distributed, event-driven task scheduling and execution system built with Python and FastAPI. It lets you register events, define triggers that respond to those events, and schedule asynchronous tasks with priorities, retry logic, and delayed (ETA-based) execution.

## Table of Contents

- [Features](#features)
- [Architecture](#architecture)
- [Installation](#installation)
- [Quick Start](#quick-start)
- [Configuration](#configuration)
- [Core Concepts](#core-concepts)
  - [Events](#events)
  - [Triggers](#triggers)
  - [Tasks](#tasks)
  - [Namespaces](#namespaces)
- [Usage Examples](#usage-examples)
- [API Documentation](#api-documentation)
- [Worker Setup](#worker-setup)
- [Development](#development)
- [Database](#database)
- [License](#license)
- [Contributing](#contributing)
- [Support](#support)

## Features

- **Event-Driven Architecture**: Post events and automatically trigger tasks based on registered triggers
- **Priority-Based Task Scheduling**: Tasks are executed based on priority and ETA (Estimated Time of Arrival)
- **Retry Logic**: Automatic retry with configurable backoff for failed tasks
- **Task Uniqueness**: Prevent duplicate tasks with unique keys
- **Namespace Support**: Isolate tasks and events across different namespaces
- **MongoDB Backend**: Built on MongoDB for persistent storage
- **RESTful API**: FastAPI-based HTTP interface for event and task management
- **Worker System**: Distributed worker support for task execution
- **Scheduled Tasks**: Support for recurring tasks via cron-like scheduling
- **Task Relay**: Forward tasks between different Eventix instances
- **Metrics & Monitoring**: Built-in metrics endpoints for observability

## Architecture

Eventix consists of three main components:

1. **API Server** (`main.py`): FastAPI application that receives events, registers triggers, and manages tasks
2. **Worker** (`worker.py`): Processes scheduled tasks from the queue
3. **Client**: Applications that post events or schedule tasks

```
┌─────────────┐         ┌──────────────┐         ┌─────────────┐
│   Client    │────────▶│  API Server  │────────▶│  MongoDB    │
│ Application │         │  (FastAPI)   │         │  Backend    │
└─────────────┘         └──────────────┘         └─────────────┘
                               │                         ▲
                               │                         │
                               ▼                         │
                        ┌──────────────┐                 │
                        │   Worker(s)  │─────────────────┘
                        │  Task Exec   │
                        └──────────────┘
```

## Installation

### Prerequisites

- Python 3.13+
- MongoDB instance (local or remote)

### Install with uv (recommended)

```bash
# Clone the repository
git clone https://github.com/yourusername/eventix.git
cd eventix

# Install dependencies with uv
uv sync
```

### Install with pip

```bash
pip install eventix
```

## Quick Start

### 1. Start MongoDB

```bash
docker run -d -p 27017:27017 mongo:latest
```

### 2. Configure Environment

Create a `.env.local` file based on `.env.local.example`:

```bash
cp .env.local.example .env.local
```

Edit `.env.local`:

```env
MONGODB_URI=mongodb://localhost:27017/eventix
EVENTIX_URL=http://localhost:8000
EVENTIX_NAMESPACE=default
EVENTIX_API_PORT=8000
```

### 3. Start the API Server

```bash
python main.py
```

The API will be available at `http://localhost:8000`. Visit `http://localhost:8000/docs` for interactive API documentation.

### 4. Start a Worker

```bash
python worker.py
```

### 5. Schedule Your First Task

Create a simple task scheduler (example from `demo_app/demo_scheduler.py`):

```python
from eventix.functions.core import task
from eventix.functions.task_scheduler import TaskScheduler

@task()
def mytask(data: str):
    print(f"Task executed with data: {data}")

if __name__ == "__main__":
    TaskScheduler.config({})
    mytask.delay(data="Hello, Eventix!")
```

## Configuration

### Environment Variables

| Variable | Description | Default |
|----------|-------------|---------|
| `MONGODB_URI` | MongoDB connection string | - |
| `EVENTIX_URL` | URL of the Eventix API server | - |
| `EVENTIX_NAMESPACE` | Default namespace for tasks/events | `default` |
| `EVENTIX_API_PORT` | Port for the API server | `8000` |
| `EVENTIX_BACKEND` | Backend type (mongodb/couchdb) | `mongodb` |
| `EVENTIX_DELAY_TASKS` | Enable delayed task execution | `true` |
| `EVENTIX_SCHEDULE_ENABLED` | Enable scheduled/recurring tasks | `false` |
| `EVENTIX_TRIGGER_CONFIG_DIRECTORY` | Directory for trigger YAML configs | - |
| `EVENTIX_RELAY_CONFIG` | Path to relay configuration YAML | `relay.yaml` |
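
As a rough illustration of how the table above could be applied, the sketch below merges an environment mapping over the documented defaults. The helper `load_settings` is hypothetical and not part of Eventix, and treating `MONGODB_URI` and `EVENTIX_URL` as required is an assumption based only on their missing defaults.

```python
import os
from collections.abc import Mapping

# Defaults copied from the table above.
DEFAULTS = {
    "EVENTIX_NAMESPACE": "default",
    "EVENTIX_API_PORT": "8000",
    "EVENTIX_BACKEND": "mongodb",
    "EVENTIX_DELAY_TASKS": "true",
    "EVENTIX_SCHEDULE_ENABLED": "false",
    "EVENTIX_RELAY_CONFIG": "relay.yaml",
}
# Assumption: variables without a default must be supplied by the user.
REQUIRED = ("MONGODB_URI", "EVENTIX_URL")
OPTIONAL = ("EVENTIX_TRIGGER_CONFIG_DIRECTORY",)


def load_settings(env: Mapping[str, str] = os.environ) -> dict[str, str]:
    """Hypothetical helper: overlay the environment on the documented defaults."""
    missing = [name for name in REQUIRED if not env.get(name)]
    if missing:
        raise RuntimeError(f"missing required variables: {', '.join(missing)}")
    known = set(DEFAULTS) | set(REQUIRED) | set(OPTIONAL)
    settings = dict(DEFAULTS)
    settings.update({key: value for key, value in env.items() if key in known})
    return settings


settings = load_settings(
    {
        "MONGODB_URI": "mongodb://localhost:27017/eventix",
        "EVENTIX_URL": "http://localhost:8000",
    }
)
print(settings["EVENTIX_NAMESPACE"], settings["EVENTIX_API_PORT"])
```

Values stay as strings here; converting the port or the boolean flags is left to the caller.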

## Core Concepts

### Events

Events are notifications that something has occurred in your system. Events have:

- **name**: Unique identifier for the event type
- **namespace**: Logical grouping (default: "default")
- **payload**: Data associated with the event
- **priority**: Execution priority (lower value = higher priority)
- **timestamp**: When the event occurred
- **operator**: Who/what triggered the event

Example event definition:

```python
from pydantic import BaseModel

from eventix.pydantic.event import TEventixEvent

class TransactionStatusChangeEvent(TEventixEvent):
    class Payload(BaseModel):
        full_tr_number: str
        status: str

    payload: Payload
```

### Triggers

Triggers are event handlers that execute when specific events occur. They convert events into tasks.

Example trigger:

```python
from eventix.pydantic.event import TEventixEventTrigger, TEventixEvent
from eventix.pydantic.task import TEventixTask

class AnnouncementEmailTrigger(TEventixEventTrigger):
    event_name: str = "TransactionStatusChangeEvent"
    namespace: str = "default"

    def execute(self, event: TEventixEvent) -> TEventixTask:
        # Check if event matches criteria
        if event.payload.get("status") != "ANNOUNCED":
            self.ignore()  # Skip this event

        # Return a task to be scheduled
        return TEventixTask(
            task="send_email",
            kwargs={"to": "user@example.com", "subject": "Announced!"}
        )
```

Register triggers:

```python
from eventix.functions.event import event_register_trigger

event_register_trigger(AnnouncementEmailTrigger())
```

Alternatively, triggers can be loaded from YAML configuration files in the directory set by `EVENTIX_TRIGGER_CONFIG_DIRECTORY`.

### Tasks

Tasks are units of work to be executed by workers. Tasks have:

- **task**: Function name to execute
- **args/kwargs**: Function arguments
- **eta**: Estimated time of arrival (when to execute)
- **priority**: Execution order (lower value = higher priority)
- **status**: scheduled, processing, done, error, retry
- **unique_key**: Optional key to prevent duplicate tasks
- **retry**: Enable automatic retry on failure
- **max_retries**: Maximum retry attempts

Example task creation:

```python
from eventix.functions.core import task

@task()
def send_email(to: str, subject: str, body: str):
    # Send email logic
    print(f"Sending email to {to}: {subject}")

# Schedule immediately
send_email.delay(to="user@example.com", subject="Hello", body="World")

# Schedule for later
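from datetime import datetime, timedelta, timezone

# Naive UTC timestamp, equivalent to datetime.utcnow() (deprecated since Python 3.12)
one_hour_from_now = datetime.now(timezone.utc).replace(tzinfo=None) + timedelta(hours=1)
send_email.apply_async(
    kwargs={"to": "user@example.com", "subject": "Reminder", "body": "..."},
    eta=one_hour_from_now,
)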
```

### Namespaces

Namespaces provide logical isolation for tasks and events, allowing you to:

- Separate environments (dev, staging, production)
- Isolate different applications or tenants
- Apply different processing rules per namespace

## Usage Examples

### Example 1: Post an Event

```python
from eventix.functions.event import event_post
from eventix.pydantic.event import TEventixEvent

event = TEventixEvent(
    name="UserRegistered",
    namespace="production",
    payload={"user_id": "12345", "email": "user@example.com"}
)

tasks = event_post(event)  # Returns list of scheduled tasks
```

### Example 2: Define and Register a Trigger

```python
from eventix.pydantic.event import TEventixEventTrigger
from eventix.pydantic.task import TEventixTask
from eventix.functions.event import event_register_trigger

class WelcomeEmailTrigger(TEventixEventTrigger):
    event_name: str = "UserRegistered"

    def execute(self, event):
        return TEventixTask(
            task="send_welcome_email",
            kwargs={"email": event.payload["email"]}
        )

event_register_trigger(WelcomeEmailTrigger())
```

### Example 3: Define a Task with Retry Logic

```python
from eventix.functions.core import task

@task(retry=True, max_retries=3)
def process_payment(order_id: str, amount: float):
    # Payment processing logic
    pass

process_payment.delay(order_id="ORD-123", amount=99.99)
```

### Example 4: Task with Unique Key (Prevents Duplicates)

```python
from eventix.functions.core import task

@task()
def generate_report(report_id: str):
    # Report generation logic
    pass

generate_report.apply_async(
    kwargs={"report_id": "RPT-001"},
    unique_key="report_RPT-001"  # Only one task with this key will be scheduled
)
```

## API Documentation

Once the server is running, visit:

- **Swagger UI**: `http://localhost:8000/docs`
- **ReDoc**: `http://localhost:8000/redoc`

### Key Endpoints

- `POST /event` - Post a new event
- `GET /tasks` - List tasks (with filtering)
- `GET /task/{task_id}` - Get task details
- `POST /task` - Create a new task
- `GET /metrics` - System metrics
- `GET /namespaces` - List available namespaces
- `GET /healthz` - Health check endpoint
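
For clients that do not use the Python helpers, an event can in principle be posted over plain HTTP. The sketch below only builds the request with the standard library. The JSON body mirrors the `name`, `namespace`, and `payload` fields described under Events, but the exact request schema is an assumption; check the Swagger UI for the authoritative shape. `build_event_request` is a hypothetical helper.

```python
import json
import urllib.request


def build_event_request(
    base_url: str, name: str, namespace: str, payload: dict
) -> urllib.request.Request:
    """Build (but do not send) a POST /event request with a JSON body."""
    # Assumed body shape, modeled on the event fields listed in this README.
    body = json.dumps({"name": name, "namespace": namespace, "payload": payload})
    return urllib.request.Request(
        url=f"{base_url.rstrip('/')}/event",
        data=body.encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )


request = build_event_request(
    "http://localhost:8000", "UserRegistered", "production", {"user_id": "12345"}
)
print(request.get_method(), request.full_url)  # prints "POST http://localhost:8000/event"

# To send it against a running server:
# with urllib.request.urlopen(request) as response:
#     print(response.status, response.read().decode())
```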

## Worker Setup

Workers are responsible for executing scheduled tasks. You can run multiple workers for horizontal scaling.

### Basic Worker (`worker.py`)

```python
from eventix.functions.fastapi import init_backend
from eventix.functions.task_worker import TaskWorker
from eventixconfig import config

worker = TaskWorker(config)

if __name__ == "__main__":
    init_backend()
    worker.start(endless=True)
```

### Worker Configuration (`eventixconfig.py`)

```python
# Import or define your task functions, then map each task name to its function
config = {
    "send_email": send_email_function,
    "process_payment": process_payment_function,
    # ... more tasks
}
```

### Running Multiple Workers

```bash
# Terminal 1
python worker.py

# Terminal 2
python worker.py

# Terminal 3
python worker.py
```

Workers will automatically coordinate via MongoDB to prevent duplicate task execution.

## Development

### Running Tests

```bash
pytest
```

### Code Formatting

```bash
ruff format .
```

### Linting

```bash
ruff check .
```

### Docker Setup

Build and run with Docker:

```bash
docker-compose up
```

Or use the helper scripts:

```bash
python docker_build.py
python docker_run.py
```

## Database

Eventix uses MongoDB as its primary backend for storing events, tasks, and triggers. The system automatically creates indexes for optimal query performance.

### Collections

- **task**: Stores all task records with status, priority, and execution details
- **event**: Stores event history (if enabled)

## License

[Add your license information here]

## Contributing

Contributions are welcome! Please open an issue or submit a pull request.

## Support

For questions or issues, please open an issue on GitHub.
