Metadata-Version: 2.4
Name: wynd-dataingest
Version: 1.0.4
Summary: High-performance Python client for wynd data ingest.
Author: Veeresh Kumar
License: MIT
Keywords: data,ingestion,wynd
Requires-Python: >=3.9
Requires-Dist: httpx<1.0,>=0.27
Provides-Extra: test
Requires-Dist: pytest>=8.0; extra == 'test'
Description-Content-Type: text/markdown

# wynd-dataingest

High-performance Python client for wynd data ingest.

It is built for services that want:

- HTTP connection reuse through `httpx`
- explicit sync and async NDJSON clients
- producer-style internal batching with linger-based flushing
- retries with exponential backoff and jitter
- optional gzip compression based on raw payload size
- publish-ready packaging for `pip install`

## Install

```bash
pip install wynd-dataingest
```

## Quick Start

### Sync client

```python
from wynd_dataingest import CompressionOptions, WyndDataIngestClient

client = WyndDataIngestClient(
    url="http://127.0.0.1:8686/events",
    connections=16,
    table_name="orders",
    table_operation="insert",
    compression=CompressionOptions(gzip=True),
    headers={
        "authorization": "Basic my-token",
    },
)

result = client.send(
    [
        {"service": "api", "level": "info", "message": "order accepted"},
        {"service": "api", "message": "batch item 1"},
        {"service": "api", "message": "batch item 2"},
    ]
)

print(result.status_code)
client.close()
```

`send()` takes a sequence of JSON-serializable records and sends them as newline-delimited JSON (NDJSON), one record per line.

### Async client

```python
import asyncio

from wynd_dataingest import AsyncWyndDataIngestClient, CompressionOptions


async def ingest() -> None:
    client = AsyncWyndDataIngestClient(
        url="http://127.0.0.1:8686/events",
        compression=CompressionOptions(gzip=True),
    )
    try:
        result = await client.send(
            [
                {"service": "api", "message": "batch item 1"},
                {"service": "api", "message": "batch item 2"},
            ]
        )

        print(result.status_code)
    finally:
        # Always close the client, even if send() raises.
        await client.aclose()


asyncio.run(ingest())
```

## API

### `WyndDataIngestClient(...)`

Supported constructor options:

- `url`: full endpoint URL
- `connections`: max pooled HTTP connections; defaults to `16`
- `table_name`: base table name for the `table-name` header
- `table_operation`: `"insert" | "new" | "update" | "delete"`; defaults to `"insert"` when `table_name` is set
- `headers`: default request headers
- `timeout_ms`: per-request timeout in milliseconds; defaults to `30000` (30 seconds)
- `user_agent`: override the default user agent
- `retry`: `RetryOptions`
- `compression`: `CompressionOptions`
- `batching`: `BatchingOptions`
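The fields of `RetryOptions` are not listed here. As a rough illustration of the "exponential backoff with jitter" behaviour the client advertises, the sketch below computes a retry delay schedule using the common "full jitter" variant. The function `backoff_delays` and its parameters (`base_ms`, `max_ms`) are hypothetical and are not part of this package's API; the client's actual formula may differ.

```python
import random
from typing import List, Optional


def backoff_delays(
    retries: int,
    base_ms: float = 100.0,
    max_ms: float = 10_000.0,
    rng: Optional[random.Random] = None,
) -> List[float]:
    """Hypothetical helper: sleep time (ms) before each retry, using full jitter.

    Retry n waits a uniformly random time in [0, min(max_ms, base_ms * 2**n)].
    """
    rng = rng if rng is not None else random.Random()
    delays = []
    for attempt in range(retries):
        # Exponential ceiling, capped so late retries never wait longer than max_ms.
        ceiling = min(max_ms, base_ms * (2 ** attempt))
        # Full jitter: a random point below the ceiling spreads out clients that failed together.
        delays.append(rng.uniform(0, ceiling))
    return delays


print(backoff_delays(4, rng=random.Random(42)))
```

Randomizing the whole interval, rather than adding a small jitter to a fixed delay, helps keep many producers from retrying in lockstep after a shared outage.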

### Sending data

The sync client exposes:

- `send(payload, options=None)`
- `close()`

The async client exposes the same operations as coroutines:

- `await AsyncWyndDataIngestClient.send(payload, options=None)`
- `await AsyncWyndDataIngestClient.aclose()`

`send()` accepts:

- a sequence of records, which is serialized as newline-delimited JSON
- `bytes` or `bytearray` that is already NDJSON-encoded, which is sent as-is
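If you want to pre-encode records yourself and pass the result to `send()` as `bytes`, a minimal encoder looks like this. It is a sketch, not the package's own serializer: the compact separators and the trailing newline after the last record are assumptions, and the helper name `encode_ndjson` is hypothetical.

```python
import json
from typing import Any, Iterable, Mapping


def encode_ndjson(records: Iterable[Mapping[str, Any]]) -> bytes:
    """Hypothetical helper: one compact JSON object per line, UTF-8 encoded."""
    lines = [json.dumps(record, separators=(",", ":")) for record in records]
    # Assumption: every record, including the last, is terminated by "\n".
    return "".join(line + "\n" for line in lines).encode("utf-8")


body = encode_ndjson(
    [
        {"service": "api", "message": "batch item 1"},
        {"service": "api", "message": "batch item 2"},
    ]
)
# body can then be passed to client.send(body) as pre-encoded NDJSON.
```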

When `batching` is enabled, multiple `send()` calls can be coalesced into one HTTP request.

### Internal Batching

Both sync and async clients support producer-style internal batching:

```python
from wynd_dataingest import BatchingOptions, WyndDataIngestClient

client = WyndDataIngestClient(
    url="http://127.0.0.1:8686/events",
    batching=BatchingOptions(
        linger_ms=1000,
        max_records=500,
        max_bytes=5 * 1024 * 1024,
        max_inflight_batches=4,
    ),
)
```

A buffered batch is flushed as soon as any one of these limits is reached:

- `linger_ms`: maximum time, in milliseconds, to wait before sending buffered records
- `max_records`: maximum number of records in one request
- `max_bytes`: maximum raw NDJSON size of one request, measured before compression

Separately, `max_inflight_batches` caps how many batch requests can be in flight at the same time. It limits concurrency rather than triggering a flush.
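The size and time triggers can be illustrated with a small accumulator. This is a hypothetical sketch, not the package's implementation: `LingerBatcher` is an illustrative name, it checks the linger deadline when `add()` or `poll()` is called instead of running a background timer, and the clock is injectable so the behaviour is easy to test.

```python
import json
import time
from typing import Any, Callable, Dict, List


class LingerBatcher:
    """Hypothetical batcher: flushes on max_records, max_bytes, or linger_ms."""

    def __init__(
        self,
        flush: Callable[[bytes, int], None],
        linger_ms: float = 1000,
        max_records: int = 500,
        max_bytes: int = 5 * 1024 * 1024,
        clock: Callable[[], float] = time.monotonic,
    ) -> None:
        self._flush = flush
        self._linger_s = linger_ms / 1000
        self._max_records = max_records
        self._max_bytes = max_bytes
        self._clock = clock
        self._lines: List[bytes] = []
        self._size = 0  # raw NDJSON bytes buffered (before any compression)
        self._first_at = 0.0  # when the oldest buffered record arrived

    def add(self, record: Dict[str, Any]) -> None:
        line = json.dumps(record, separators=(",", ":")).encode() + b"\n"
        # Send what we have first if this record would push the batch past max_bytes.
        if self._lines and self._size + len(line) > self._max_bytes:
            self.flush()
        if not self._lines:
            self._first_at = self._clock()
        self._lines.append(line)
        self._size += len(line)
        if len(self._lines) >= self._max_records or self._size >= self._max_bytes:
            self.flush()
        else:
            self.poll()

    def poll(self) -> None:
        """Flush if the oldest buffered record has waited at least linger_ms."""
        if self._lines and self._clock() - self._first_at >= self._linger_s:
            self.flush()

    def flush(self) -> None:
        if not self._lines:
            return
        body, count = b"".join(self._lines), len(self._lines)
        self._lines, self._size = [], 0
        self._flush(body, count)  # stand-in for the HTTP request


now = [0.0]  # fake clock, in seconds
sent: List[int] = []
batcher = LingerBatcher(lambda body, n: sent.append(n), max_records=3, clock=lambda: now[0])
for i in range(7):
    batcher.add({"i": i})  # flushes after records 3 and 6; record 7 stays buffered
now[0] = 1.0
batcher.poll()  # linger_ms (1000) has elapsed, so the last record is flushed
```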

Default batching values:

- `linger_ms`: `1000`
- `max_records`: `500`
- `max_bytes`: `5 * 1024 * 1024` bytes (5 MiB)
- `max_inflight_batches`: `4`

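For the sync client, batching also attempts a best-effort flush of buffered records on common Python shutdown paths: `atexit`, `SIGINT`, `SIGTERM`, and uncaught exceptions. This cannot cover every case (for example, a process killed with `SIGKILL` runs no Python code), so call `client.close()` explicitly when you are done. For the async client, always call `await client.aclose()`; it gives the strongest shutdown guarantee.

When `max_inflight_batches` is greater than `1`, a batch can be sent before earlier batches have finished, so strict batch ordering is no longer guaranteed. Use `1` if you need batches delivered in order.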
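A common way to enforce a cap like `max_inflight_batches` is a counting semaphore held for the duration of each request. The sketch below is hypothetical (the `InflightLimiter` class is illustrative, not this package's implementation) and uses `time.sleep` in place of the HTTP call. Because batches sleep for different lengths of time, it also shows why ordering is lost: a batch that started later can finish earlier.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor
from typing import List


class InflightLimiter:
    """Hypothetical limiter: at most `max_inflight` batch sends run at once."""

    def __init__(self, max_inflight: int = 4) -> None:
        self._slots = threading.BoundedSemaphore(max_inflight)
        self._lock = threading.Lock()
        self.active = 0
        self.peak = 0  # highest number of concurrent sends observed

    def send(self, batch_id: int, completed: List[int]) -> None:
        with self._slots:  # blocks while max_inflight sends are already running
            with self._lock:
                self.active += 1
                self.peak = max(self.peak, self.active)
            # Stand-in for the HTTP request; durations vary, so completion order varies.
            time.sleep(0.01 * (batch_id % 3))
            with self._lock:
                self.active -= 1
                completed.append(batch_id)


limiter = InflightLimiter(max_inflight=4)
completed: List[int] = []
with ThreadPoolExecutor(max_workers=10) as pool:
    for batch_id in range(12):
        pool.submit(limiter.send, batch_id, completed)
print(limiter.peak, completed)  # completion order is not guaranteed to be 0..11
```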

### Table Routing Header

When `table_name` is configured, the client automatically adds a `table-name` header:

- `table_operation="insert"` or `"new"` -> `table-name: <table_name>`
- `table_operation="update"` -> `table-name: <table_name>-updates`
- `table_operation="delete"` -> `table-name: <table_name>-deletes`
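The mapping above can be written as a small lookup. This is a sketch of the documented rules only; the helper `table_name_header` is hypothetical, and raising `ValueError` for an unknown operation is this sketch's choice, not necessarily what the client does.

```python
from typing import Dict, Optional

# Suffix appended to the base table name for each operation, per the rules above.
_SUFFIXES = {"insert": "", "new": "", "update": "-updates", "delete": "-deletes"}


def table_name_header(table_name: Optional[str], operation: str = "insert") -> Dict[str, str]:
    """Hypothetical helper: routing header for a table, or {} when none is configured."""
    if not table_name:
        return {}
    if operation not in _SUFFIXES:
        raise ValueError(f"unsupported table_operation: {operation!r}")
    return {"table-name": table_name + _SUFFIXES[operation]}


print(table_name_header("orders", "update"))
```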

### Compression

Enable gzip with built-in defaults:

```python
from wynd_dataingest import CompressionOptions, WyndDataIngestClient

client = WyndDataIngestClient(
    url="http://127.0.0.1:8686/events",
    compression=CompressionOptions(gzip=True),
)
```

With `gzip=True`, a request body is compressed only when its raw size reaches the default threshold of `100KB`; smaller bodies are sent uncompressed.

To customize the threshold (`min_bytes`) and the gzip compression level (`level`), pass `GzipCompressionOptions`:

```python
from wynd_dataingest import CompressionOptions, GzipCompressionOptions, WyndDataIngestClient

client = WyndDataIngestClient(
    url="http://127.0.0.1:8686/events",
    compression=CompressionOptions(
        gzip=GzipCompressionOptions(min_bytes=4096, level=6)
    ),
)
```
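The size-based decision can be sketched with the standard library's `gzip` module. This is an illustration under assumptions, not the client's code: the helper `maybe_gzip` is hypothetical, "100KB" is read here as 100 KiB, and the `content-encoding: gzip` header is what an HTTP server would typically expect for a gzip body, not something this document specifies.

```python
import gzip
from typing import Dict, Tuple

DEFAULT_MIN_BYTES = 100 * 1024  # assumption: "100KB" interpreted as 100 KiB


def maybe_gzip(
    body: bytes, min_bytes: int = DEFAULT_MIN_BYTES, level: int = 6
) -> Tuple[bytes, Dict[str, str]]:
    """Hypothetical helper: gzip `body` only when its raw size reaches `min_bytes`.

    Returns the (possibly compressed) body plus any extra headers to send with it.
    """
    if len(body) < min_bytes:
        return body, {}  # small payloads: compression overhead is not worth it
    return gzip.compress(body, compresslevel=level), {"content-encoding": "gzip"}


payload = b'{"service":"api","message":"x"}\n' * 1000
compressed, extra_headers = maybe_gzip(payload, min_bytes=4096, level=6)
```

A threshold avoids spending CPU on small bodies, where gzip's fixed header overhead can make the output barely smaller, or even larger, than the input.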

## Run Tests

Create a virtual environment and install the package in editable mode with its test dependencies:

```bash
python3 -m venv .venv
./.venv/bin/python -m pip install -e ".[test]"
```

Run the full test suite:

```bash
./.venv/bin/python -m pytest
```

Run only the client tests:

```bash
./.venv/bin/python -m pytest tests/test_wynd_dataingest_client.py
```

## Publish To PyPI

Build locally:

```bash
python3 -m pip install --upgrade build
python3 -m build
```

Upload to PyPI:

```bash
python3 -m pip install --upgrade twine
python3 -m twine upload dist/*
```

Then users can install it with:

```bash
pip install wynd-dataingest
```
