Metadata-Version: 2.4
Name: kanban-analytics
Version: 0.1.0
Summary: Server-side Python analytics SDK for Kanban
License-Expression: MIT
Requires-Python: >=3.9
Requires-Dist: httpx>=0.25.0
Requires-Dist: tenacity>=8.0.0
Requires-Dist: typing-extensions>=4.0.0
Provides-Extra: celery
Requires-Dist: celery>=5.0.0; extra == 'celery'
Provides-Extra: dev
Requires-Dist: celery>=5.0; extra == 'dev'
Requires-Dist: django>=4.0; extra == 'dev'
Requires-Dist: fastapi>=0.100; extra == 'dev'
Requires-Dist: httpx>=0.25; extra == 'dev'
Requires-Dist: mypy>=1.0; extra == 'dev'
Requires-Dist: pytest-asyncio>=0.21; extra == 'dev'
Requires-Dist: pytest-httpx>=0.21; extra == 'dev'
Requires-Dist: pytest>=7.0; extra == 'dev'
Provides-Extra: django
Requires-Dist: django>=3.2; extra == 'django'
Provides-Extra: fastapi
Requires-Dist: fastapi>=0.100.0; extra == 'fastapi'
Description-Content-Type: text/markdown

# kanban-analytics

Server-side Python analytics SDK for tracking events from Django, FastAPI, Flask, Celery, and plain Python applications.

- **Sync + Async first-class support** via `httpx`
- **Batch mode** with configurable size and flush interval
- **Retry with exponential backoff** via `tenacity`
- **Framework integrations** for Django, FastAPI, Flask, and Celery
- **Typed event catalog** for static type checking and runtime validation
- **Python 3.9+** compatible

## Installation

```bash
pip install kanban-analytics
```

With framework extras:

```bash
pip install "kanban-analytics[django]"
pip install "kanban-analytics[fastapi]"
pip install "kanban-analytics[celery]"
```

## Quick Start — Plain Python / Scripts

```python
from kanban_analytics import analytics

analytics.init(
    "your-api-key",
    endpoint="https://your-app.com/api/ingest",
)

analytics.identify("user_123", traits={"name": "Alice", "plan": "pro"})

analytics.track(
    "payment_completed",
    user_id="user_123",
    properties={"amount": 99.0, "currency": "USD"},
)

# Cleanup on exit
analytics.shutdown()
```

Or use the client as a context manager:

```python
from kanban_analytics import AnalyticsClient

with AnalyticsClient() as client:
    client.init("your-api-key", endpoint="https://your-app.com/api/ingest")
    client.track("server_started", anonymous_id="system")
```

## Quick Start — Django

**settings.py:**

```python
MIDDLEWARE = [
    # ...
    "kanban_analytics.integrations.django.AnalyticsMiddleware",
]
```

**apps.py:**

```python
from django.apps import AppConfig

class MyAppConfig(AppConfig):
    def ready(self):
        from kanban_analytics import analytics
        from kanban_analytics.integrations.django import connect_auth_signals

        analytics.init(
            "your-api-key",
            endpoint="https://your-app.com/api/ingest",
            service_name="my-django-app",
        )
        connect_auth_signals()
```

**views.py:**

```python
from django.http import HttpResponse


def my_view(request):
    # request.analytics is attached by AnalyticsMiddleware
    request.analytics.track("page_viewed", properties={"path": request.path})
    return HttpResponse("OK")
```

## Quick Start — FastAPI

```python
from contextlib import asynccontextmanager

from fastapi import FastAPI

from kanban_analytics import async_analytics
from kanban_analytics.integrations.fastapi import Analytics


@asynccontextmanager
async def lifespan(app: FastAPI):
    await async_analytics.init(
        "your-api-key",
        endpoint="https://your-app.com/api/ingest",
    )
    yield
    await async_analytics.shutdown()


app = FastAPI(lifespan=lifespan)


@app.post("/checkout")
async def checkout(analytics: Analytics):
    await analytics.track("checkout_initiated")
    return {"status": "ok"}
```

## Quick Start — Flask

```python
from flask import Flask, g
from kanban_analytics import analytics
from kanban_analytics.integrations.flask import AnalyticsExtension

app = Flask(__name__)

analytics.init("your-api-key", endpoint="https://your-app.com/api/ingest")

analytics_ext = AnalyticsExtension()
analytics_ext.init_app(app)


@app.route("/")
def index():
    g.analytics.track("page_viewed", properties={"path": "/"})
    return "OK"
```

## Quick Start — Celery

```python
from celery import Celery
from kanban_analytics import analytics
from kanban_analytics.integrations.celery import TrackedTask

app = Celery("myapp")

analytics.init("your-api-key", endpoint="https://your-app.com/api/ingest")


@app.task(base=TrackedTask, track=True)
def send_email(user_id: str, template: str):
    # Automatically tracks:
    # - celery_task_started
    # - celery_task_completed (with duration_ms)
    # - celery_task_failed (on exception, with error details)
    pass
```

## Fire and Forget vs `await_response`

By default, all tracking calls are **fire-and-forget** — they return immediately without waiting for server confirmation:

```python
# Returns immediately (sync)
analytics.track("event", user_id="u1")

# Returns immediately (async — uses asyncio.create_task internally)
await async_analytics.track("event", user_id="u1")
```

Pass `await_response=True` to wait for the server's response before returning. Use it for events where the caller must know whether delivery succeeded; on failure the call raises:

```python
# Blocks until confirmed or raises on failure
analytics.track("critical_event", user_id="u1", await_response=True)

# Awaits server response
await async_analytics.track("critical_event", user_id="u1", await_response=True)
```

## Batch Mode

Enable batch mode to accumulate events and send them in bulk:

```python
analytics.init(
    "your-api-key",
    endpoint="https://your-app.com/api/ingest",
    batch_mode=True,
    batch_size=50,         # flush every 50 events
    flush_interval=2.0,    # or every 2 seconds
)

# Events are queued, not sent immediately
analytics.track("event_1", user_id="u1")
analytics.track("event_2", user_id="u1")

# Manual flush
result = analytics.flush()
print(f"Sent: {result['sent']}, Failed: {result['failed']}")

# shutdown() automatically flushes
analytics.shutdown()
```
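
To make the flush rules concrete, here is a minimal sketch of a size-and-interval queue. It is an illustration only, not the SDK's code: `BatchQueue` and its `send` callback are hypothetical, and the interval is checked only when an event is added, whereas a real client would likely use a background timer.

```python
import time
from typing import Callable, Dict, List


class BatchQueue:
    """Hypothetical in-memory queue that flushes on size or elapsed time."""

    def __init__(self, send: Callable[[List[dict]], None],
                 batch_size: int = 50, flush_interval: float = 2.0) -> None:
        self._send = send
        self._batch_size = batch_size
        self._flush_interval = flush_interval
        self._events: List[dict] = []
        self._last_flush = time.monotonic()

    def add(self, event: dict) -> None:
        self._events.append(event)
        overdue = time.monotonic() - self._last_flush >= self._flush_interval
        if len(self._events) >= self._batch_size or overdue:
            self.flush()

    def flush(self) -> Dict[str, int]:
        # Swap the buffer out first so new events can queue during the send.
        batch, self._events = self._events, []
        self._last_flush = time.monotonic()
        if not batch:
            return {"sent": 0, "failed": 0}
        try:
            self._send(batch)
        except Exception:
            return {"sent": 0, "failed": len(batch)}
        return {"sent": len(batch), "failed": 0}


batches: List[List[dict]] = []   # each entry is one bulk request
queue = BatchQueue(batches.append, batch_size=2, flush_interval=60.0)
for name in ("event_1", "event_2", "event_3"):
    queue.add({"event": name})   # the second add triggers a size-based flush
result = queue.flush()           # manual flush sends the remaining event
```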

## Framework Integrations in Depth

### Django Middleware

The `AnalyticsMiddleware` attaches a `ScopedAnalyticsClient` to `request.analytics` and sets its `user_id` from the authenticated `request.user`:

```python
def my_view(request):
    # user_id is already set from request.user
    request.analytics.track("action_performed")
```

### Django Auth Signals

`connect_auth_signals()` hooks into Django's auth signals to automatically track `user_logged_in`, `user_logged_out`, and `user_login_failed` events.

### FastAPI Dependency

Declare a handler parameter of type `Analytics` to receive a request-scoped client from the `get_analytics` dependency. It extracts `user_id` from `request.state.user_id` (set by your auth middleware) and `request_id` from the `X-Request-ID` header:

```python
from kanban_analytics.integrations.fastapi import Analytics

@app.get("/items")
async def list_items(analytics: Analytics):
    await analytics.track("items_listed")
```

### Flask Extension

The `AnalyticsExtension` registers `before_request` hooks that attach a scoped client to `flask.g.analytics`. It auto-detects `flask-login`'s `current_user`.

### Celery TrackedTask

Pass `TrackedTask` as the task's `base` class and set `track=True` on each task you want tracked. `TrackedTask` wraps task execution to emit `celery_task_started`, `celery_task_completed` (with `duration_ms`), and `celery_task_failed` events.
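
The wrapping pattern can be sketched without Celery as a plain decorator. This is an assumption about the general technique, not `TrackedTask`'s actual code: `tracked` and `recorded` are hypothetical, and a list stands in for calls to `analytics.track`.

```python
import functools
import time
from typing import Any, Callable, Dict, List

recorded: List[Dict[str, Any]] = []   # stand-in for analytics.track(...)


def tracked(func: Callable[..., Any]) -> Callable[..., Any]:
    """Record start, completion (with duration), and failure around a call."""

    @functools.wraps(func)
    def wrapper(*args: Any, **kwargs: Any) -> Any:
        recorded.append({"event": "celery_task_started", "task": func.__name__})
        start = time.perf_counter()
        try:
            result = func(*args, **kwargs)
        except Exception as exc:
            recorded.append({"event": "celery_task_failed",
                             "task": func.__name__, "error": repr(exc)})
            raise   # tracking must not swallow the task's own error
        duration_ms = (time.perf_counter() - start) * 1000
        recorded.append({"event": "celery_task_completed",
                         "task": func.__name__, "duration_ms": duration_ms})
        return result

    return wrapper


@tracked
def send_email(user_id: str, template: str) -> str:
    return f"{template} -> {user_id}"


@tracked
def broken() -> None:
    raise ValueError("boom")


send_email("user_123", "welcome")
try:
    broken()
except ValueError:
    pass
```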

## Typed Event Catalog

Define your event schemas with `TypedDict` to get static type checking plus runtime validation:

```python
from typing import TypedDict, Literal
from kanban_analytics import create_catalog

class PaymentCompleted(TypedDict):
    amount: float
    currency: str
    plan_id: str

class SubscriptionCreated(TypedDict):
    plan_id: str
    billing_interval: Literal["monthly", "annual"]

catalog = create_catalog({
    "payment_completed": PaymentCompleted,
    "subscription_created": SubscriptionCreated,
})

# Validated at runtime — raises ValidationError if wrong event name
# or missing required properties:
catalog.track(
    "payment_completed",
    user_id="u1",
    properties={"amount": 99.0, "currency": "USD", "plan_id": "pro"},
)
```
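
For intuition, a runtime check like this can be built on the `__required_keys__` attribute that `TypedDict` classes expose. The sketch below is an assumption about how such validation might work, not `create_catalog`'s implementation: `SCHEMAS`, `missing_properties`, and `validate` are hypothetical, and it raises the built-in `ValueError` rather than the SDK's `ValidationError`.

```python
from typing import Any, Dict, List, Mapping, TypedDict


class PaymentCompleted(TypedDict):
    amount: float
    currency: str
    plan_id: str


# Hypothetical registry mapping event names to their schemas.
SCHEMAS: Dict[str, Any] = {"payment_completed": PaymentCompleted}


def missing_properties(schema: Any, properties: Mapping[str, Any]) -> List[str]:
    """Return the schema's required keys that are absent from properties."""
    return sorted(set(schema.__required_keys__) - set(properties))


def validate(event: str, properties: Mapping[str, Any]) -> None:
    if event not in SCHEMAS:
        raise ValueError(f"unknown event: {event}")
    missing = missing_properties(SCHEMAS[event], properties)
    if missing:
        raise ValueError(f"{event} is missing: {', '.join(missing)}")


# Passes: every required property is present.
validate("payment_completed", {"amount": 99.0, "currency": "USD", "plan_id": "pro"})
```

This only checks key presence. Checking value types at runtime would need extra work, for example walking `typing.get_type_hints(schema)`.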

## Async Usage Guide — Common Pitfalls

### DO: Use `AsyncAnalyticsClient` in async code

```python
from kanban_analytics import async_analytics

async def handler():
    await async_analytics.track("event", user_id="u1")
```

### DON'T: Use the sync client in async code

```python
# BAD — blocks the event loop!
from kanban_analytics import analytics

async def handler():
    analytics.track("event", user_id="u1")  # blocks!
```

### DON'T: Use `asyncio.run()` inside async functions

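`asyncio.run()` raises `RuntimeError` when called from a running event loop, so never use it to drive analytics calls from inside a coroutine. The async client schedules fire-and-forget sends with `asyncio.create_task()` and never calls `asyncio.run()` or `loop.run_until_complete()` internally.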
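
The failure mode is easy to reproduce with the standard library alone. In this sketch, `send_event` and `handler` are hypothetical stand-ins for an analytics call and a request handler:

```python
import asyncio
from typing import Tuple


async def send_event() -> str:
    return "sent"


async def handler() -> Tuple[str, str]:
    coro = send_event()
    try:
        asyncio.run(coro)   # rejected: this thread already has a running loop
        nested = "ran"
    except RuntimeError:
        coro.close()        # avoid a "coroutine was never awaited" warning
        nested = "RuntimeError"
    # Scheduling on the current loop is the supported alternative.
    task = asyncio.create_task(send_event())
    return nested, await task


result = asyncio.run(handler())
```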

### DO: Use the async context manager for lifecycle

```python
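# Assumed export, mirroring the sync AnalyticsClient import shown earlier
from kanban_analytics import AsyncAnalyticsClient


async def main():
    async with AsyncAnalyticsClient() as client:
        await client.init("key", endpoint="...")
        await client.track("event", user_id="u1")
    # shutdown() called automatically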
```

## Environment Variables Reference

The SDK reads the environment name for its server context from these variables, in order of precedence:

| Variable | Purpose |
|---|---|
| `ENV` | Environment name (e.g. `production`, `staging`) |
| `ENVIRONMENT` | Fallback for `ENV` |
| `DJANGO_ENV` | Fallback for `ENVIRONMENT` |
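
The fallback chain in the table can be sketched as a simple lookup. `resolve_environment` is a hypothetical helper, not part of the SDK, and treating an empty value as unset is an assumption:

```python
import os
from typing import Mapping, Optional


def resolve_environment(env: Optional[Mapping[str, str]] = None) -> Optional[str]:
    """Return the first non-empty value among ENV, ENVIRONMENT, DJANGO_ENV."""
    source = os.environ if env is None else env
    for name in ("ENV", "ENVIRONMENT", "DJANGO_ENV"):
        value = source.get(name)
        if value:   # assumption: empty strings count as unset
            return value
    return None


# ENV wins when several variables are set.
first = resolve_environment({"ENV": "production", "DJANGO_ENV": "staging"})
# DJANGO_ENV is used only when the other two are missing.
last = resolve_environment({"DJANGO_ENV": "staging"})
```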

## Error Handling Reference

All SDK exceptions inherit from `AnalyticsError`:

| Exception | Code | Retryable | When |
|---|---|---|---|
| `AuthenticationError` | `AUTHENTICATION_ERROR` | No | Invalid API key (401) |
| `ValidationError` | `VALIDATION_ERROR` | No | Bad request / missing identity (400) |
| `NetworkError` | `NETWORK_ERROR` | Yes | Server errors (5xx), connection issues |
| `RateLimitError` | `RATE_LIMIT_ERROR` | Yes | Rate limited (429) |
| `TimeoutError` | `TIMEOUT_ERROR` | Yes | Request timed out |

```python
from kanban_analytics import analytics
from kanban_analytics.errors import (
    AnalyticsError,
    AuthenticationError,
    ValidationError,
    NetworkError,
    RateLimitError,
    TimeoutError,  # shadows the built-in TimeoutError in this module
)

try:
    analytics.track("event", user_id="u1", await_response=True)
except RateLimitError as e:
    print(f"Rate limited, retry after {e.retry_after_ms}ms")
except NetworkError as e:
    print(f"Network error: {e} (status: {e.status_code})")
except AnalyticsError as e:
    print(f"Analytics error [{e.code}]: {e}")
```
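
The "Retryable" column suggests a retry policy along these lines. This is a stdlib-only sketch of exponential backoff, not the SDK's `tenacity` configuration: `RetryableError`, `FatalError`, and `call_with_backoff` are hypothetical, and the attempt count and base delay are illustrative values.

```python
import time
from typing import Callable, List, TypeVar

T = TypeVar("T")


class RetryableError(Exception):
    """Hypothetical stand-in for NetworkError, RateLimitError, TimeoutError."""


class FatalError(Exception):
    """Hypothetical stand-in for AuthenticationError, ValidationError."""


def call_with_backoff(func: Callable[[], T], attempts: int = 4,
                      base_delay: float = 0.5,
                      sleep: Callable[[float], None] = time.sleep) -> T:
    """Retry func on RetryableError, doubling the delay after each failure."""
    for attempt in range(attempts):
        try:
            return func()
        except RetryableError:
            if attempt == attempts - 1:
                raise                         # out of attempts: surface the error
            sleep(base_delay * 2 ** attempt)  # 0.5s, 1s, 2s, ...
    raise ValueError("attempts must be at least 1")


delays: List[float] = []   # records requested sleeps instead of waiting
outcomes = iter([RetryableError("503"), RetryableError("timeout"), "ok"])


def flaky_send() -> str:
    outcome = next(outcomes)
    if isinstance(outcome, Exception):
        raise outcome
    return outcome


result = call_with_backoff(flaky_send, sleep=delays.append)
```

Non-retryable errors such as `FatalError` are not caught, so they propagate on the first attempt.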
