Metadata-Version: 2.4
Name: universe-reads
Version: 0.2.5
Summary: Fast parallel extraction for UniVerse databases using uopy
Author-email: Yanga Nkohla <yanga.nkohla@yahoo.com>
License-Expression: MIT
Classifier: Programming Language :: Python :: 3
Classifier: Operating System :: OS Independent
Requires-Python: >=3.10
Description-Content-Type: text/markdown
License-File: LICENSE
Provides-Extra: uopy
Requires-Dist: uopy; extra == "uopy"
Provides-Extra: parquet
Requires-Dist: pyarrow>=12; extra == "parquet"
Provides-Extra: all
Requires-Dist: uopy; extra == "all"
Requires-Dist: pyarrow>=12; extra == "all"
Dynamic: license-file

# universe-reads

Fast parallel extraction for UniVerse databases using `uopy`. It reads large volumes of records over multiple concurrent sessions (**5 by default**, configurable) and supports multiple output formats, streaming, record filtering, key-range partitioning, and file introspection.

## Features

- **Parallel reads** — configurable number of concurrent `uopy` sessions via multiprocessing (`spawn`)
- **Output formats** — NDJSON (default), CSV, and Parquet (via optional `pyarrow`)
- **Streaming** — pipe records directly to stdout (`--stream`) for Unix pipeline integration
- **Record filtering** — server-side `WHERE` / `SELECT` clauses pushed to UniVerse
- **Key-range partitioning** — `prefix` (ordinal) or `hash` strategies for balanced worker loads
- **File inspection** — `universe-reads inspect FILE` for quick metadata, field counts, and sample records
- **Enhanced metrics** — per-worker tabular summary with peak/average rates after every run
- **Retry with backoff** — configurable exponential backoff for transient RPC failures
- **Dry-run mode** — validate parallelism and sinks without a UniVerse connection

## Quick start

### 1) Install

```bash
python -m venv .venv
source .venv/bin/activate
pip install -e .
```

With optional dependencies:

```bash
# uopy (if available on PyPI)
pip install -e ".[uopy]"

# Parquet output support
pip install -e ".[parquet]"

# Everything
pip install -e ".[all]"
```

If `uopy` is *not* available on PyPI in your environment, install it using your internal method and keep the import name `uopy`.

For a quick run without installing the package:

```bash
PYTHONPATH=src python -m universe_reads --dry-run 10000 --count-only
```

### 2) Configure

`universe-reads` itself only needs:

- `UV_FILE` (optional if you pass `--file`)

You provide the UniVerse session creation via `--session-factory`.

## Session factory (important)

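This project uses `multiprocessing` with the **spawn** start method. Each worker starts in a fresh interpreter, so it must be able to **import** everything it needs, including the session factory. For the same reason, a script that calls `run()` should do so under an `if __name__ == "__main__":` guard.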

The recommended way to specify a session factory is a string import spec:

- `"module.submodule:callable_name"`

This is robust across processes and lets you reuse the same factory reference in many modules.
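
As a rough illustration of how a spec of this shape can be resolved, the sketch below splits on the colon, imports the module, and looks up the attribute. `load_factory` is a hypothetical helper name, and this is not necessarily how `universe-reads` resolves specs internally.

```python
import importlib
from typing import Callable


def load_factory(spec: str) -> Callable:
    """Resolve a "module.submodule:callable_name" string to the callable it names."""
    module_name, sep, attr = spec.partition(":")
    if not sep or not module_name or not attr:
        raise ValueError(f"expected 'module:callable', got {spec!r}")
    module = importlib.import_module(module_name)
    obj = getattr(module, attr)
    if not callable(obj):
        raise TypeError(f"{spec!r} does not name a callable")
    return obj


# Any importable callable works; a stdlib function stands in for a real factory here.
dumps = load_factory("json:dumps")
print(dumps([1, 2]))  # [1, 2]
```

Because only the string crosses the process boundary, each worker can repeat this lookup on its own.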

### A) Minimal factory module

Example factory module `my_uv.py`:

```python
import uopy


def make_session():
  return uopy.connect(host="...", account="...", user="...", password="...")
```

Then use it from the CLI:

```bash
UV_FILE=READS universe-reads --session-factory my_uv:make_session --count-only
```

Or programmatically:

```python
from universe_reads import run


results = run(
  file_name="READS",
  session_factory="my_uv:make_session",
  sessions=5,
  count_only=True,
)
```

### B) Why not pass a lambda/closure?

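Avoid nested functions and lambdas for `session_factory`. Under spawn, anything handed to a worker process is pickled first, and `pickle` can only serialize a function that is importable by name from the top level of a module.

Do not do this: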

```python
from universe_reads import run


def make_factory():
  def factory():
    ...
  return factory


run(file_name="READS", session_factory=make_factory())
```

Prefer a top-level function in an importable module and pass it as a string spec.
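
A quick standalone check makes the difference visible. The snippet is independent of `universe-reads`: `is_picklable` is a hypothetical helper, and `json.dumps` stands in for any top-level function in an importable module.

```python
import json
import pickle


def is_picklable(obj) -> bool:
    """Return True if the standard pickle module can serialize obj."""
    try:
        pickle.dumps(obj)
    except (pickle.PicklingError, AttributeError, TypeError):
        return False
    return True


def make_factory():
    def factory():  # nested: only reachable through make_factory's local scope
        ...
    return factory


print(is_picklable(json.dumps))      # True: top-level function in an importable module
print(is_picklable(make_factory()))  # False: nested function
print(is_picklable(lambda: None))    # False: lambda
```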

### C) Reuse across modules

If you use the factory in several places, define the spec string once:

```python
# settings.py
SESSION_FACTORY = "my_uv:make_session"
```

Then:

```python
from settings import SESSION_FACTORY
from universe_reads import run


run(file_name="READS", session_factory=SESSION_FACTORY)
```

Or use the built-in env-based factory:

```bash
universe-reads --session-factory universe_reads.session_factory:make_session --count-only
```

### D) Pooling kwargs are optional (factory-friendly)

The CLI/library can pass pooling hints as keyword args:

- `pooling_on`
- `min_pool_size`
- `max_pool_size`

If your factory does not accept them, they are safely ignored.

Example of a kw-accepting factory:

```python
import uopy


def make_session(*, pooling_on=None, min_pool_size=None, max_pool_size=None):
  kwargs = {"user": "...", "password": "..."}
  if pooling_on:
    kwargs["min_pool_size"] = min_pool_size or 1
    kwargs["max_pool_size"] = max_pool_size or 10
  return uopy.connect(**kwargs)
```
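
One way a caller could make unsupported hints harmless is to inspect the factory's signature and forward only the keyword arguments it declares. The sketch below illustrates that idea under that assumption; it does not describe the library's internals, and `call_with_supported_kwargs` is a hypothetical name.

```python
import inspect
from typing import Any, Callable


def call_with_supported_kwargs(factory: Callable[..., Any], **hints: Any) -> Any:
    """Call factory, passing only the hints its signature can accept."""
    params = inspect.signature(factory).parameters
    accepts_any = any(p.kind is inspect.Parameter.VAR_KEYWORD for p in params.values())
    if accepts_any:
        supported = hints
    else:
        by_keyword = (inspect.Parameter.KEYWORD_ONLY, inspect.Parameter.POSITIONAL_OR_KEYWORD)
        supported = {
            name: value
            for name, value in hints.items()
            if name in params and params[name].kind in by_keyword
        }
    return factory(**supported)


def plain_factory():
    return "session"


def pooled_factory(*, pooling_on=None, max_pool_size=None):
    return ("session", pooling_on, max_pool_size)


hints = {"pooling_on": True, "min_pool_size": 1, "max_pool_size": 10}
print(call_with_supported_kwargs(plain_factory, **hints))   # session
print(call_with_supported_kwargs(pooled_factory, **hints))  # ('session', True, 10)
```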

### 3) Run

Count-only (fastest):

```bash
UV_FILE=READS universe-reads \
  --session-factory my_uv:make_session \
  --count-only
```

Write NDJSON files (one per worker):

```bash
UV_FILE=READS universe-reads \
  --session-factory my_uv:make_session \
  --output-dir out
```

Write CSV files:

```bash
UV_FILE=READS universe-reads \
  --session-factory my_uv:make_session \
  --output-dir out \
  --format csv
```

Write Parquet files (requires `pip install "universe-reads[parquet]"`):

```bash
UV_FILE=READS universe-reads \
  --session-factory my_uv:make_session \
  --output-dir out \
  --format parquet
```

Stream records to stdout as NDJSON (for piping):

```bash
UV_FILE=READS universe-reads \
  --session-factory my_uv:make_session \
  --stream | jq .
```

Stream as CSV:

```bash
UV_FILE=READS universe-reads \
  --session-factory my_uv:make_session \
  --stream --format csv | head -100
```

More sessions (if you have licenses/CPU to spare):

```bash
UV_FILE=READS universe-reads \
  --session-factory my_uv:make_session \
  --sessions 10 \
  --count-only
```

Dry run (no UniVerse connection) to validate parallelism:

```bash
universe-reads --dry-run 300000 --count-only
```

Smoke-test the session factory mechanism without needing UniVerse:

```bash
PYTHONPATH=src:. python -m tests.test_dry_run --smoke-session-factory
```

## Output formats

| Format  | Flag               | Extension  | Notes                             |
| ------- | ------------------ | ---------- | --------------------------------- |
| NDJSON  | `--format ndjson`  | `.ndjson`  | Default. One JSON object per line |
| JSONL   | `--format jsonl`   | `.ndjson`  | Alias for ndjson                  |
| CSV     | `--format csv`     | `.csv`     | Header row + data rows            |
| Parquet | `--format parquet` | `.parquet` | Requires `pyarrow>=12`            |

When `--output-dir` is set, one file per worker is written, named by worker index with the extension of the chosen format, for example `worker-0.csv` and `worker-1.csv` for `--format csv`.

When `--stream` is set, records are written to stdout in real time (NDJSON or CSV only). This is ideal for piping into `jq`, `grep`, `wc -l`, or another process.
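
On the receiving end of such a pipe, a Python consumer only needs to parse one JSON object per line. The sketch below counts records from any iterable of lines; `count_ndjson` is a hypothetical helper, and the sample records are made up for the demo.

```python
import io
import json
from typing import Iterable


def count_ndjson(lines: Iterable[str]) -> int:
    """Parse each non-blank line as JSON and return how many records were seen."""
    count = 0
    for line in lines:
        line = line.strip()
        if not line:
            continue
        json.loads(line)  # raises json.JSONDecodeError on a malformed line
        count += 1
    return count


# Made-up sample input; pass sys.stdin instead to consume a real pipe.
sample = io.StringIO('{"id": "1"}\n\n{"id": "2"}\n')
print(count_ndjson(sample))  # 2
```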

## Record filtering

Push filtering to the UniVerse server to reduce the number of records transferred:

```bash
# WHERE clause appended to SELECT <file>
universe-reads --session-factory my_uv:make_session \
  --where "WITH STATUS = 'ACTIVE'" \
  --output-dir out

# Full SELECT override
universe-reads --session-factory my_uv:make_session \
  --select "SELECT STUDENTS WITH GPA >= '3.0'" \
  --output-dir out
```

Library equivalent:

```python
from universe_reads import run

results = run(
  file_name="STUDENTS",
  session_factory="my_uv:make_session",
  where_clause="WITH STATUS = 'ACTIVE'",
)

# Or with a full SELECT override
results = run(
  file_name="STUDENTS",
  session_factory="my_uv:make_session",
  select_override="SELECT STUDENTS WITH GPA >= '3.0'",
)
```
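
The relationship between the two options can be sketched as follows: a WHERE clause is appended to `SELECT <file>`, while an override replaces the statement entirely. `build_select` is a hypothetical helper written for illustration, and the precedence when both are given is an assumption.

```python
from typing import Optional


def build_select(
    file_name: str,
    where_clause: Optional[str] = None,
    select_override: Optional[str] = None,
) -> str:
    """Return the SELECT statement that would be sent to the server."""
    if select_override:
        # Assumption: a full override wins over where_clause.
        return select_override
    if where_clause:
        return f"SELECT {file_name} {where_clause}"
    return f"SELECT {file_name}"


print(build_select("STUDENTS", where_clause="WITH STATUS = 'ACTIVE'"))
# SELECT STUDENTS WITH STATUS = 'ACTIVE'
```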

## Key-range partitioning

By default, keys are distributed round-robin across workers. For skewed data, choose a partitioning strategy:

| Strategy | Flag                 | Description                                                   |
| -------- | -------------------- | ------------------------------------------------------------- |
| prefix   | `--partition prefix` | Assigns keys by the ordinal value of their first character    |
| hash     | `--partition hash`   | Assigns keys by a deterministic hash for even distribution    |

```bash
universe-reads --session-factory my_uv:make_session \
  --partition hash \
  --sessions 5 \
  --output-dir out
```

Library equivalent:

```python
from universe_reads import run

results = run(
  file_name="READS",
  session_factory="my_uv:make_session",
  partition="hash",
  sessions=5,
)
```
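
To make the three assignment rules concrete, here is a minimal sketch. The function names are hypothetical, and `zlib.crc32` is only a stand-in for whatever deterministic hash the package actually uses.

```python
import zlib
from typing import Iterable


def round_robin(keys: Iterable[str], workers: int) -> list[list[str]]:
    """Deal keys out in arrival order: key i goes to worker i % workers."""
    buckets: list[list[str]] = [[] for _ in range(workers)]
    for i, key in enumerate(keys):
        buckets[i % workers].append(key)
    return buckets


def prefix_worker(key: str, workers: int) -> int:
    """Pick a worker from the ordinal value of the key's first character."""
    return ord(key[0]) % workers if key else 0


def hash_worker(key: str, workers: int) -> int:
    """Pick a worker from a hash that is stable across processes and runs."""
    return zlib.crc32(key.encode("utf-8")) % workers


keys = ["A100", "A101", "A102", "B200"]
print(round_robin(keys, 2))                 # [['A100', 'A102'], ['A101', 'B200']]
print([prefix_worker(k, 2) for k in keys])  # [1, 1, 1, 0]
print([hash_worker(k, 2) for k in keys])
```

The demo shows why `prefix` can be uneven when many keys share a first character. A checksum is used for the hash variant because Python's built-in `hash()` is salted per process for strings, so it would not give the same answer in every spawned worker.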

## File inspection

Quickly inspect a UniVerse file without doing a full read — see record count, field counts, and sample records:

```bash
# Human-readable summary to stderr
universe-reads inspect ATTENDANCE \
  --session-factory my_uv:make_session

# Machine-readable JSON to stdout
universe-reads inspect ATTENDANCE \
  --session-factory my_uv:make_session \
  --json

# Control the number of samples
universe-reads inspect ATTENDANCE \
  --session-factory my_uv:make_session \
  --samples 10
```

Library equivalent:

```python
from universe_reads import inspect

info = inspect(
  file_name="ATTENDANCE",
  session_factory_spec="my_uv:make_session",
  sample_count=5,
)
print(info)
# {'file': 'ATTENDANCE', 'record_count': 312000, 'sample_keys': [...], ...}
```

## Enhanced metrics

After every run, a tabular summary is printed showing per-worker stats, peak rate, and average rate:

```
==============================================================
  Worker Stats
--------------------------------------------------------------
    worker     records    errors          rate     elapsed
--------------------------------------------------------------
       w-0     60,000         0    780,000/s       0.1s
       w-1     60,000         0    750,000/s       0.1s
       w-2     60,000         0    810,000/s       0.1s
--------------------------------------------------------------
     TOTAL    180,000         0    900,000/s       0.2s
  Peak worker rate: 810,000 rec/s
  Avg  worker rate: 780,000 rec/s
==============================================================
```

Access the summary programmatically:

```python
from universe_reads.metrics import format_summary

# `results` is the value returned by run(); `elapsed` is the wall-clock duration in seconds.
summary = format_summary(results, wall_seconds=elapsed)
print(summary)
```
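
The peak and average figures follow directly from per-worker record counts and elapsed times. Here is a minimal sketch of that arithmetic, using a hypothetical `WorkerStat` tuple and made-up numbers rather than the package's own result type:

```python
from typing import NamedTuple


class WorkerStat(NamedTuple):
    records: int
    seconds: float


def worker_rates(stats: list[WorkerStat]) -> list[float]:
    """Records per second for each worker (0.0 when no time elapsed)."""
    return [s.records / s.seconds if s.seconds > 0 else 0.0 for s in stats]


def summarize(stats: list[WorkerStat], wall_seconds: float) -> dict[str, float]:
    """Aggregate totals plus peak and mean per-worker rates."""
    rates = worker_rates(stats)
    total = sum(s.records for s in stats)
    return {
        "total_records": total,
        "overall_rate": total / wall_seconds if wall_seconds > 0 else 0.0,
        "peak_worker_rate": max(rates, default=0.0),
        "avg_worker_rate": sum(rates) / len(rates) if rates else 0.0,
    }


stats = [WorkerStat(60_000, 0.5), WorkerStat(60_000, 0.25), WorkerStat(60_000, 1.0)]
print(summarize(stats, wall_seconds=1.0))
```

The overall rate is computed against wall-clock time, not the sum of worker times, which is why it can exceed any single worker's rate when workers run concurrently.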

## Connection pooling (optional)

If your `uopy` environment has pooling enabled (often via `uopy.ini`), you can keep it optional by enabling it only in your session factory.

Important caveats:

- Pooling is **per Python process**. Since this project uses multiprocessing for parallel sessions, each worker process has its **own pool**.
- Pooling is **not "multi-user" by itself**. A pool is typically keyed by connection parameters (host/account/user/password). Different users generally mean **separate pools**.

With the built-in factory you can enable pooling hints via CLI flags:

```bash
universe-reads \
  --session-factory universe_reads.session_factory:make_session \
  --pooling \
  --min-pool-size 1 \
  --max-pool-size 10 \
  --count-only
```

Or via env vars:

```bash
export UV_POOLING_ON=1
export UV_MIN_POOL_SIZE=1
export UV_MAX_POOL_SIZE=10

universe-reads --session-factory universe_reads.session_factory:make_session --count-only
```

## Use as a library

```python
from universe_reads import run


# Basic parallel read
results = run(
  file_name="READS",
  session_factory="my_uv:make_session",
  sessions=5,
  count_only=True,
)

# With output format, filtering, and partitioning
results = run(
  file_name="ATTENDANCE",
  session_factory="my_uv:make_session",
  sessions=5,
  output_dir="out",
  output_format="csv",
  where_clause="WITH STATUS = 'ACTIVE'",
  partition="hash",
)

# Stream to stdout
results = run(
  file_name="READS",
  session_factory="my_uv:make_session",
  stream=True,
  output_format="ndjson",
)

# Dry-run (no UniVerse needed)
results = run(file_name="", dry_run=300000)
```

## Arguments

### CLI (`universe-reads`)

#### Read mode (default)

| Flag | Default | Description |
| ---- | ------- | ----------- |
| `--file` | `UV_FILE` env | UniVerse file name to read |
| `--session-factory` | *(required unless `--dry-run`)* | Session factory in `module.sub:callable` form |
| `--sessions` | `5` | Number of concurrent sessions/processes |
| `--count-only` | off (implied when no `--output-dir` and no `--stream`) | Count records only |
| `--output-dir` | unset | Write one file per worker into this directory |
| `--format` | `ndjson` | Output format: `ndjson`, `jsonl`, `csv`, `parquet` |
| `--stream` | off | Stream records to stdout (NDJSON or CSV) |
| `--where` | unset | UniVerse WHERE/WITH clause, e.g. `"WITH STATUS = 'ACTIVE'"` |
| `--select` | unset | Full UniVerse SELECT statement override |
| `--partition` | unset (round-robin) | Key partitioning strategy: `prefix` or `hash` |
| `--batch-size` | `500` | Keys per worker IPC batch |
| `--progress-every` | `5.0` | Seconds between throughput logs |
| `--max-records` | unset | Stop after this many records (debugging aid) |
| `--dry-run N` | unset | Generate N fake keys/records instead of connecting |
| `--retries` | `3` | Retries per batch/key on RPC failure (`0` disables) |
| `--retry-backoff` | `0.5` | Base delay in seconds between retries (doubles each attempt) |
| `--pooling` / `--no-pooling` | neither | Enable/disable pooling hints |
| `--min-pool-size` | unset | Minimum pool size hint |
| `--max-pool-size` | unset | Maximum pool size hint |
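
Read together, `--retries` and `--retry-backoff` describe an exponential schedule: with the defaults, a persistently failing call would wait roughly 0.5 s, 1 s, then 2 s before giving up. The sketch below shows one way to write such a loop. `call_with_retries` is a hypothetical helper, and catching every `Exception` is a simplification, since the package may retry only specific RPC errors.

```python
import time
from typing import Callable, TypeVar

T = TypeVar("T")


def call_with_retries(
    fn: Callable[[], T],
    retries: int = 3,
    backoff: float = 0.5,
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Call fn, retrying up to `retries` times with a delay that doubles each time."""
    attempt = 0
    while True:
        try:
            return fn()
        except Exception:
            if attempt >= retries:
                raise  # retries exhausted (or disabled with retries=0)
            sleep(backoff * (2 ** attempt))
            attempt += 1


# Demo with a fake sleep so the delays are recorded instead of waited out.
delays: list[float] = []
calls = {"n": 0}


def flaky() -> str:
    calls["n"] += 1
    if calls["n"] < 3:
        raise ConnectionError("transient failure")
    return "ok"


print(call_with_retries(flaky, sleep=delays.append), delays)  # ok [0.5, 1.0]
```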

#### Inspect sub-command

```bash
universe-reads inspect FILE --session-factory SPEC [--samples N] [--json]
```

| Flag | Default | Description |
| ---- | ------- | ----------- |
| `FILE` | *(required)* | UniVerse file name to inspect |
| `--session-factory` | *(required)* | Session factory spec |
| `--samples` | `3` | Number of sample records to read |
| `--json` | off | Output as JSON to stdout |

### Library (`universe_reads.run`)

`run()` accepts the same knobs as the CLI, as keyword args:

- `file_name`
- `session_factory` (spec string or top-level callable; optional when `dry_run` is set)
- `sessions=5`
- `count_only=True`
- `output_dir=None`
- `output_format="ndjson"` — `"ndjson"` | `"jsonl"` | `"csv"` | `"parquet"`
- `stream=False`
- `where_clause=None`
- `select_override=None`
- `partition=None` — `"prefix"` | `"hash"` | `None`
- `batch_size=500`
- `progress_every_seconds=5.0`
- `max_records=None`
- `dry_run=None`
- `dry_run_data=None` — `{key: record}` fixtures for testing
- `retries=3`, `retry_backoff=0.5`
- `pooling_on=None`, `min_pool_size=None`, `max_pool_size=None`

### Library (`universe_reads.inspect`)

```python
from universe_reads import inspect

info = inspect(
  file_name="FILE",
  session_factory_spec="module:factory",
  sample_count=3,
)
```

Returns a dict with keys: `file`, `record_count`, `sample_keys`, `field_counts`, `sample_records`, `elapsed_seconds`.

### Built-in env-based session factory

If you pass `--session-factory universe_reads.session_factory:make_session`, the built-in factory reads these environment variables:

- Required: `UV_USER`, `UV_PASSWORD`
- Optional: `UV_HOST`, `UV_ACCOUNT`, `UV_PORT`, `UV_TIMEOUT`, `UV_SERVICE`, `UV_ENCODING`, `UV_SSL`
- Optional pooling hints: `UV_POOLING_ON`, `UV_MIN_POOL_SIZE`, `UV_MAX_POOL_SIZE`
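
A factory of this kind boils down to translating environment variables into `uopy.connect` keyword arguments. The sketch below shows only that translation step, so it runs without `uopy`. The helper name `connect_kwargs_from_env`, the lowercase keyword names, and the integer conversion of port and timeout are assumptions rather than the built-in factory's actual behavior; `UV_SSL` and the pooling hints are left out for brevity.

```python
import os
from typing import Any, Mapping

REQUIRED = ("UV_USER", "UV_PASSWORD")
OPTIONAL = ("UV_HOST", "UV_ACCOUNT", "UV_PORT", "UV_TIMEOUT", "UV_SERVICE", "UV_ENCODING")
INT_KEYS = {"port", "timeout"}  # assumption: these settings are numeric


def connect_kwargs_from_env(env: Mapping[str, str] = os.environ) -> dict[str, Any]:
    """Build connection keyword arguments from UV_* variables, failing fast if required ones are absent."""
    missing = [name for name in REQUIRED if not env.get(name)]
    if missing:
        raise RuntimeError(f"missing required environment variables: {', '.join(missing)}")
    kwargs: dict[str, Any] = {}
    for name in REQUIRED + OPTIONAL:
        value = env.get(name)
        if not value:
            continue  # unset or empty: leave it to the connection defaults
        key = name.removeprefix("UV_").lower()
        kwargs[key] = int(value) if key in INT_KEYS else value
    return kwargs


# Placeholder values for the demo only.
demo_env = {"UV_USER": "demo", "UV_PASSWORD": "secret", "UV_PORT": "31438"}
print(connect_kwargs_from_env(demo_env))
# {'user': 'demo', 'password': 'secret', 'port': 31438}
```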

## Testing

Run the test suite without a UniVerse connection:

```bash
PYTHONPATH=src:. python -m pytest tests/ -v
```

Individual test scripts can also be run directly:

```bash
# Dry-run harness (300K records, 5 workers)
PYTHONPATH=src:. python -m tests.test_dry_run

# Session factory smoke test
PYTHONPATH=src:. python -m tests.test_dry_run --smoke-session-factory

# Collector test (1M fixture records with attendance data)
PYTHONPATH=src:. python -m tests.test_collector

# Unit tests for individual modules
PYTHONPATH=src:. python tests/test_sinks.py
PYTHONPATH=src:. python tests/test_partitioning.py
PYTHONPATH=src:. python tests/test_streaming.py
PYTHONPATH=src:. python tests/test_metrics.py
PYTHONPATH=src:. python tests/test_inspect.py
PYTHONPATH=src:. python tests/test_cli_args.py
```

## Where to plug in real `uopy` calls

All UniVerse file I/O calls are isolated in:

- `src/universe_reads/uopy_adapter.py`

You manage session creation via `--session-factory`. The adapter implements:

- `iter_keys()` — select list key streaming
- `iter_keys_filtered()` — filtered key streaming with `WHERE` / `SELECT` clauses
- `read_record()` / `read_records()` — single and batch record reads
