Metadata-Version: 2.4
Name: andor-data
Version: 0.1.0
Summary: A small pipeline-aware rate-limiting library for data engineering workflows
License: Copyright 2026 andor
        
        Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the “Software”), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions:
        
        The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software.
        
        THE SOFTWARE IS PROVIDED “AS IS”, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
Requires-Python: >=3.8
Description-Content-Type: text/markdown
License-File: LICENSE
Requires-Dist: requests
Dynamic: license-file

# andor

`andor` is a lightweight Python pipeline utility for running a function over a list of inputs, with optional batching and throttling.

It is designed for workflows where you need to process many records while controlling request pacing, such as:

- API ingestion jobs
- data enrichment pipelines
- bulk transformation scripts

## Features

- Chainable pipeline configuration
- Batch-based execution
- Per-item randomized delay (throttling)
- Per-batch randomized pause
- Tuple input unpacking for multi-argument functions
- Built-in error handling that skips failed items and continues

## Installation

Install from source in editable mode:

```bash
pip install -e .
```

Or install dependencies only:

```bash
pip install -r requirements.txt
```

## Quick Start

```python
from andor.pipeline import Pipeline

data = [1, 2, 3, 4]

def square(x):
	return x * x

result = Pipeline(source=data).run(square)
print(result)
# [1, 4, 9, 16]
```

## Core Concepts

### 1) Source Data

The pipeline takes a list-like `source` and iterates through it.

Each item can be:

- a single value (passed as one argument)
- a tuple (unpacked into multiple function arguments)

### 2) Batching

Use `.batch(size=...)` to process inputs in chunks.

```python
from andor.pipeline import Pipeline

data = ["a", "b", "c", "d", "e"]

def process(value):
	return value.upper()

result = (
	Pipeline(source=data)
	.batch(size=2)
	.run(process)
)

print(result)
# ['A', 'B', 'C', 'D', 'E']
```

When batching is enabled, progress output is printed for each batch.
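
As a rough illustration of what batching means here, the sketch below splits a list into consecutive chunks of at most `size` items. The helper name `chunked` is hypothetical and is not part of `andor`; the library's internals may differ.

```python
def chunked(items, size):
    """Split a sequence into consecutive batches of at most `size` items.

    Hypothetical helper for illustration; not part of andor's API.
    """
    if size < 1:
        raise ValueError("size must be at least 1")
    items = list(items)
    return [items[i:i + size] for i in range(0, len(items), size)]


batches = chunked(["a", "b", "c", "d", "e"], size=2)
print(batches)
# [['a', 'b'], ['c', 'd'], ['e']]
```

With five items and `size=2`, the last batch holds the single leftover item.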

### 3) Throttling

Use `.throttle(delay=(min_seconds, max_seconds), pause=(min_seconds, max_seconds))` to slow down processing. Each argument is a range from which a random wait is drawn.

- `delay`: sleep applied per processed item
- `pause`: sleep applied between batches

```python
from andor.pipeline import Pipeline

items = [1, 2, 3, 4, 5, 6]

def identity(x):
	return x

result = (
	Pipeline(source=items)
	.batch(size=3)
	.throttle(delay=(0.2, 0.5), pause=(1.0, 2.0))
	.run(identity)
)
```

In this example:

- each item waits a random delay between `0.2` and `0.5` seconds
- each completed batch waits a random pause between `1.0` and `2.0` seconds
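
To get a feel for how much waiting a configuration adds, the sketch below estimates the minimum and maximum total sleep time. The function `estimate_sleep_bounds` is hypothetical, and it assumes one delay after every item and one pause after every batch except the last; `andor` may count these differently.

```python
import math


def estimate_sleep_bounds(n_items, batch_size, delay, pause):
    """Return (min_seconds, max_seconds) of total sleep time.

    Assumptions (not confirmed by andor): one delay after every item,
    one pause after every batch except the final one.
    """
    n_batches = math.ceil(n_items / batch_size)
    n_pauses = max(n_batches - 1, 0)
    low = n_items * delay[0] + n_pauses * pause[0]
    high = n_items * delay[1] + n_pauses * pause[1]
    return low, high


low, high = estimate_sleep_bounds(6, 3, delay=(0.2, 0.5), pause=(1.0, 2.0))
print(f"{low:.1f}s to {high:.1f}s")
# 2.2s to 5.0s
```

Under these assumptions the six-item example above spends between roughly 2.2 and 5.0 seconds sleeping, in addition to the time the function itself takes.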

## Tuple Unpacking Example

If your source contains tuples, they are expanded into function arguments automatically.

```python
from andor.pipeline import Pipeline

pairs = [(2, 3), (4, 5), (10, 20)]

def add(a, b):
	return a + b

result = Pipeline(source=pairs).run(add)
print(result)
# [5, 9, 30]
```
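
The dispatch rule can be sketched as a small helper. `call_with_item` is a hypothetical name, and the `isinstance(item, tuple)` check is an assumption about how the unpacking decision is made, not a copy of the library's code.

```python
def call_with_item(func, item):
    """Call func with a tuple unpacked into arguments, or with a single value.

    Hypothetical helper; the tuple check is an assumption, not andor's code.
    """
    if isinstance(item, tuple):
        return func(*item)
    return func(item)


def add(a, b):
    return a + b


print(call_with_item(add, (2, 3)))  # 5
print(call_with_item(len, [2, 3]))  # 2
```

Under this assumption, a list item is passed through as one argument, while a tuple item is always spread across the function's parameters.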

## Error Handling Behavior

If a function call raises an exception, the pipeline:

1. prints an error message with the item index
2. skips the failed item
3. continues processing the rest of the data

This allows long-running jobs to complete even when some records fail.

```python
from andor.pipeline import Pipeline

data = [10, 5, 0, 2]

def divide_100_by(x):
	return 100 / x

result = Pipeline(source=data).run(divide_100_by)
print(result)
# [10.0, 20.0, 50.0]
# (division by zero is logged and skipped)
```
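
The three steps listed in this section amount to a `try`/`except` around each call. The following is a minimal sketch of that behavior, not the library's source; the name `run_skipping_errors` and the message format are assumptions.

```python
def run_skipping_errors(func, items):
    """Apply func to each item, reporting and skipping items that raise.

    Hypothetical stand-in for illustration; not andor's implementation.
    """
    results = []
    for index, item in enumerate(items):
        try:
            results.append(func(item))
        except Exception as exc:
            # Report the failure with its index, then move on to the next item.
            print(f"Error at item {index}: {exc}")
    return results


def divide_100_by(x):
    return 100 / x


results = run_skipping_errors(divide_100_by, [10, 5, 0, 2])
print(results)
# [10.0, 20.0, 50.0]
```

Because failed items are dropped, the result list can be shorter than the input, so results should not be matched back to inputs by position.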

## Example

### Before:

```python
import random
import time

# CATALOG_NAME, FUND_LIST, BATCH_SIZE, the REQUEST_DELAY_* and BATCH_PAUSE_*
# constants, and analyze_fund() are defined elsewhere in the original script.

def run_funds_data_pipeline():
    print(f"--- STARTING FUNDS ANALYSIS (Catalog: {CATALOG_NAME}) ---", flush=True)
    processed_data = []
    fund_items = list(FUND_LIST.items())

    for i, (ticker, name) in enumerate(fund_items):
        try:
            record = analyze_fund(ticker, name)
            if record:
                processed_data.append(record)
        except Exception as e:
            print(f"Error analyzing {ticker}: {e}")

        delay = random.uniform(REQUEST_DELAY_MIN, REQUEST_DELAY_MAX)
        print(f"  Sleeping {delay:.1f}s...", flush=True)
        time.sleep(delay)

        if (i + 1) % BATCH_SIZE == 0 and (i + 1) < len(fund_items):
            batch_pause = random.uniform(BATCH_PAUSE_MIN, BATCH_PAUSE_MAX)
            print(f"\n--- Batch of {BATCH_SIZE} complete. Pausing {batch_pause:.1f}s ---\n", flush=True)
            time.sleep(batch_pause)

    if not processed_data:
        print("No data collected.")
        return
```

### The workflow now looks like this:

```python
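from andor.pipeline import Pipeline

# FUND_LIST and analyze_fund() come from the original script.
fund_items = list(FUND_LIST.items())

pipeline = Pipeline(source=fund_items)
pipeline.batch(size=20)
pipeline.throttle(delay=(1.5, 4), pause=(1.5, 4))
# Each (ticker, name) tuple is unpacked into analyze_fund(ticker, name).
results = pipeline.run(analyze_fund)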
```


## API Reference

### `Pipeline(source)`

Creates a new pipeline instance.

- `source`: iterable collection of items to process

### `Pipeline.batch(size)`

Sets batch size and returns the same `Pipeline` instance.

- `size` (`int`): number of items per batch

### `Pipeline.throttle(delay=None, pause=None)`

Configures randomized wait times and returns the same `Pipeline` instance.

- `delay` (`tuple[float, float] | None`): per-item sleep range
- `pause` (`tuple[float, float] | None`): per-batch sleep range

### `Pipeline.run(func)`

Executes the pipeline and returns a list of successful results.

- `func`: callable to apply to each item
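
Because `batch` and `throttle` return the same instance, the chained style and the step-by-step style used elsewhere in this README configure a pipeline identically. The toy class below illustrates that pattern; `ChainableConfig` is hypothetical and only mimics the documented method signatures.

```python
class ChainableConfig:
    """Toy fluent-interface class; not andor's Pipeline."""

    def __init__(self, source):
        self.source = source
        self.size = None
        self.delay = None
        self.pause = None

    def batch(self, size):
        self.size = size
        return self  # returning self is what makes chaining possible

    def throttle(self, delay=None, pause=None):
        self.delay = delay
        self.pause = pause
        return self


chained = ChainableConfig([1, 2, 3]).batch(size=2).throttle(delay=(0.1, 0.2))

stepwise = ChainableConfig([1, 2, 3])
stepwise.batch(size=2)
stepwise.throttle(delay=(0.1, 0.2))

print(vars(chained) == vars(stepwise))
# True
```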

## Practical Pattern for API Workloads

```python
import requests
from andor.pipeline import Pipeline

ids = [101, 102, 103, 104, 105]

def fetch_record(record_id):
	response = requests.get(f"https://api.example.com/items/{record_id}", timeout=10)
	response.raise_for_status()
	return response.json()

records = (
	Pipeline(source=ids)
	.batch(size=2)
	.throttle(delay=(0.3, 0.7), pause=(1.5, 2.5))
	.run(fetch_record)
)
```

## Development

Run tests with:

```bash
pytest
```

## License

This project is licensed under the terms in [LICENSE](LICENSE).
