Metadata-Version: 2.4
Name: costflow
Version: 0.1.4
Summary: Cost estimation & scaling analysis for pandas pipelines
Author: Shradhit Subudhi
License: MIT
Project-URL: Homepage, https://pypi.org/project/costflow/
Keywords: pandas,performance,profiling,scaling,analytics
Classifier: Development Status :: 3 - Alpha
Classifier: Intended Audience :: Science/Research
Classifier: License :: OSI Approved :: MIT License
Classifier: Programming Language :: Python :: 3
Classifier: Programming Language :: Python :: 3 :: Only
Classifier: Programming Language :: Python :: 3.10
Classifier: Programming Language :: Python :: 3.11
Classifier: Programming Language :: Python :: 3.12
Classifier: Topic :: Scientific/Engineering :: Information Analysis
Requires-Python: >=3.10
Description-Content-Type: text/markdown
License-File: LICENSE
Requires-Dist: numpy>=1.24
Requires-Dist: pandas>=2.0
Provides-Extra: mem
Requires-Dist: psutil>=5.9; extra == "mem"
Provides-Extra: dev
Requires-Dist: pytest>=7; extra == "dev"
Requires-Dist: ruff>=0.4; extra == "dev"
Provides-Extra: docs
Requires-Dist: mkdocs>=1.5; extra == "docs"
Dynamic: license-file

## CostFlow
[![PyPI](https://img.shields.io/pypi/v/costflow.svg)](https://pypi.org/project/costflow/)

CostFlow measures how your pandas pipelines scale. It runs a pipeline across multiple data sizes, traces expensive DataFrame ops (merge, groupby, sort, pivot, etc.), and fits simple complexity models so you can spot bottlenecks before the code hits production.

### Why use it
- See how wall time changes as rows grow and get quick projections for larger sizes.
- Identify the pandas operations consuming most of the traced time.
- Estimate memory needs (when `psutil` is installed) to avoid surprises on shared machines.

### Install
```bash
# from PyPI
pip install costflow
# for memory tracking (optional)
pip install "costflow[mem]"
# for plotting (optional)
pip install matplotlib
```

### Quickstart (synthetic data)
```python
import pandas as pd
from costflow import analyze
from costflow.report import pretty_print

def make_df(n: int) -> pd.DataFrame:
    import numpy as np
    rng = np.random.default_rng(0)
    return pd.DataFrame(
        {
            "customer_id": rng.integers(0, 100, size=n),
            "region": rng.choice(["NA", "EU", "APAC", "LATAM"], size=n),
            "quantity": rng.integers(1, 10, size=n),
            "price": rng.normal(50, 10, size=n).clip(5),
            "discount": rng.uniform(0, 0.3, size=n),
        }
    )

def pipeline(df):
    # assign() returns a new DataFrame, so the caller's input is not mutated
    df = df.assign(revenue=df["quantity"] * df["price"] * (1 - df["discount"]))
    out = (
        df.groupby(["customer_id", "region"])
          .agg(total_revenue=("revenue", "sum"))
          .reset_index()
          .sort_values("total_revenue", ascending=False)
    )
    return out

report = analyze(
    pipeline_fn=pipeline,
    make_df=make_df,
    sizes=[1_000, 5_000, 10_000],
    trace_ops=True,
)
pretty_print(report)
```

### Quickstart (your real data)
If you already have real data, you can skip writing `make_df` and let CostFlow resample it for each size.

```python
import pandas as pd
from costflow import analyze_with_df
from costflow.report import pretty_print

# `pipeline` is the function defined in the synthetic-data quickstart above.
base_df = pd.read_parquet("your_data.parquet")  # or read_csv/DB pull

sizes = [
    len(base_df) // 2,              # smaller downsample
    len(base_df),                   # full dataset
    min(len(base_df) * 2, 200_000)  # scaled up (capped); drop if base_df already exceeds the cap
]

report = analyze_with_df(
    pipeline_fn=pipeline,
    base_df=base_df,
    sizes=sizes,
    trace_ops=True,
    warmup=False,
)
pretty_print(report)
```
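
For intuition, resampling a base DataFrame to a target row count can be sketched with plain pandas. This is an illustration of the general technique, not CostFlow's actual implementation; `resample_to_size` is a hypothetical helper name, and the seed handling is an assumption.

```python
import pandas as pd

def resample_to_size(base_df: pd.DataFrame, n: int, seed: int = 0) -> pd.DataFrame:
    """Hypothetical helper: return n rows drawn from base_df.

    Downsampling draws without replacement; upsampling (n > len(base_df))
    has to draw with replacement, so rows repeat.
    """
    replace = n > len(base_df)
    sampled = base_df.sample(n=n, replace=replace, random_state=seed)
    return sampled.reset_index(drop=True)

base = pd.DataFrame({"customer_id": range(100), "quantity": [1] * 100})
small = resample_to_size(base, 50)   # 50 distinct rows
large = resample_to_size(base, 250)  # 250 rows, some repeated
print(len(small), len(large))
```

One consequence of any replacement-based upsampling: the number of distinct key values cannot exceed what the base data contains, so groupby and merge timings on upsampled data may understate the cost of a genuinely larger dataset with more distinct keys.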

### Multiple DataFrames
If your pipeline takes multiple DataFrames, use `analyze_with_dfs` and keep your function signature unchanged:

```python
from costflow import analyze_with_dfs
from costflow.report import pretty_print

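# Example: a lookup table derived from `base_df` (loaded in the previous section)
other_df = base_df[["customer_id", "region"]].drop_duplicates()

def pipeline_multi(df_left, df_right):
    return (
        # Join on both shared columns; joining on customer_id alone would
        # produce region_x/region_y and break the groupby on "region".
        df_left.merge(df_right, on=["customer_id", "region"], how="left")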
               .groupby(["customer_id", "region"])
               .agg(total_qty=("quantity", "sum"))
               .reset_index()
    )

report = analyze_with_dfs(
    pipeline_fn=pipeline_multi,
    base_dfs=[base_df, other_df],
    sizes=[10_000, 50_000, 100_000],
    trace_ops=True,
)
pretty_print(report)
```

### Many parameters (DataFrames + config)
If your pipeline already accepts multiple parameters (DataFrames, dicts, lists, strings, etc.),
use `analyze_with_inputs` to avoid writing `make_df` yourself.

```python
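from costflow import analyze_with_inputs
from costflow.report import pretty_print

# Reuses `base_df` and `other_df` from the sections above.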

def pipeline_many(df, lookup_df, config, cols, mode):
    out = df.merge(lookup_df, on="customer_id", how="left")
    out["score"] = out[cols].sum(axis=1) * config.get("multiplier", 1.0)
    if mode == "top":
        out = out.sort_values("score", ascending=False).head(1000)
    return out

report = analyze_with_inputs(
    pipeline_fn=pipeline_many,
    base_args=(base_df, other_df, {"multiplier": 0.8}, ["quantity", "price"], "top"),
    sizes=[10_000, 50_000, 100_000],
    trace_ops=True,
    scale="first",  # scale only the first DataFrame (common when lookup_df is fixed)
)
pretty_print(report)
```

### Plotting
If you installed `matplotlib`, you can plot time/memory vs. size:

```python
from costflow.report import plot_report

plot_report(report)
```

Tip: start with 2–4 increasing `sizes` so CostFlow can distinguish linear from super-linear growth.

### Interpreting the report
- **Runs**: shows wall time and (when available) peak RSS for each dataset size.
- **Time model**: picks the candidate scaling model with the lowest RMSE; check `r2` to see how well it fits.
- **Memory model**: only populated when `psutil` is installed (`pip install "costflow[mem]"`). Without it, the memory fit is `N/A` and projections are omitted.
- **Dominant ops**: fraction of traced time spent in the most expensive pandas operations. Tracing is best-effort: common methods are wrapped explicitly, and other DataFrame/GroupBy methods are generically wrapped, but deep pandas internals (C/numba/numexpr) are not captured.
- **Projections**: extrapolations to common target sizes using the chosen model and the column count from your largest run. Treat these as coarse estimates: validate them with a real run when possible, and prefer models with a reasonable `r2`.
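
To make the "lowest RMSE" selection and the `r2` check concrete, here is a minimal pure-Python sketch. It is not CostFlow's implementation: the candidate set, the single-coefficient form `t ~ c * f(n)`, and the names `CANDIDATES` and `fit_scaling` are assumptions chosen for illustration, and the timings are made up.

```python
import math

# Candidate scaling shapes (an assumed set, for illustration only).
CANDIDATES = {
    "O(n)": lambda n: float(n),
    "O(n log n)": lambda n: n * math.log(n),
    "O(n^2)": lambda n: float(n) ** 2,
}

def fit_scaling(sizes, times):
    """Fit t ~ c * f(n) for each candidate and return the lowest-RMSE fit."""
    mean_t = sum(times) / len(times)
    ss_tot = sum((t - mean_t) ** 2 for t in times)
    best = None
    for name, f in CANDIDATES.items():
        xs = [f(n) for n in sizes]
        # Closed-form least squares for a single coefficient (no intercept).
        c = sum(x * t for x, t in zip(xs, times)) / sum(x * x for x in xs)
        ss_res = sum((t - c * x) ** 2 for x, t in zip(xs, times))
        rmse = math.sqrt(ss_res / len(times))
        r2 = 1.0 - ss_res / ss_tot if ss_tot > 0 else float("nan")
        if best is None or rmse < best["rmse"]:
            best = {"model": name, "coef": c, "rmse": rmse, "r2": r2, "f": f}
    return best

sizes = [1_000, 5_000, 10_000, 50_000]
times = [2e-6 * n * math.log(n) for n in sizes]  # synthetic n log n timings
fit = fit_scaling(sizes, times)
projected = fit["coef"] * fit["f"](1_000_000)  # extrapolate to 1M rows
print(fit["model"], fit["r2"], projected)
```

The projection is only as good as the chosen shape: if the real pipeline changes behavior beyond the measured range (for example, when data no longer fits in memory), the extrapolated value will be off regardless of how high `r2` is.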

### Tips for better signal
- Use at least two or three increasing sizes so the fit can distinguish linear from super-linear growth.
- Set `trace_ops=True` to find bottlenecks, then re-run with `trace_ops=False` if you want to remove tracing overhead from timings.
- Keep pipelines functional: return a DataFrame/Series; avoid mutating global state.

### CLI
Analyze a pipeline from the command line (the function names default to `pipeline` and `make_df`):
```bash
costflow --pipeline your_pipeline.py:pipeline --make-df your_pipeline.py:make_df --sizes 1000 5000 10000
```
- Add `--json` to emit machine-readable output.
- Use `--no-trace-ops` to remove tracing overhead when you just want wall-clock measurements.

### Limitations and guidance
- Tracing is shallow: pandas operations implemented in C/numexpr/numba are not visible, so dominant-op fractions reflect Python-level pandas calls.
- Tracing wraps most DataFrame and Series methods; common top-level pandas functions (`pd.concat`, `pd.merge`) are also traced.
- Projections assume the chosen model holds; verify with a real run near your target size before making infra decisions.
- Memory projections require `psutil`. Without it, memory fit is `N/A` and only wall-time projections are shown.
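
The "shallow tracing" limitation is easier to see with a toy example. The sketch below wraps the public methods of a stand-in class and accumulates wall time per method name. It illustrates the general wrapping technique only; `traced`, `op_times`, and `Table` are hypothetical names, and nothing here is taken from CostFlow's source.

```python
import time
from collections import defaultdict
from functools import wraps

op_times = defaultdict(float)  # op name -> accumulated seconds

def traced(name, fn):
    """Wrap fn so that each call adds its wall time to op_times[name]."""
    @wraps(fn)
    def wrapper(*args, **kwargs):
        start = time.perf_counter()
        try:
            return fn(*args, **kwargs)
        finally:
            op_times[name] += time.perf_counter() - start
    return wrapper

class Table:
    """Toy stand-in for a DataFrame; not a pandas class."""
    def __init__(self, rows):
        self.rows = list(rows)

    def sort(self):
        return Table(sorted(self.rows))

    def head(self, k):
        return Table(self.rows[:k])

# Patch the public methods. Whatever they call internally (here, the
# built-in sorted) only shows up as part of the wrapped method's total.
for method in ("sort", "head"):
    setattr(Table, method, traced(method, getattr(Table, method)))

result = Table(range(100_000, 0, -1)).sort().head(3)
total = sum(op_times.values())
fractions = {op: t / total for op, t in op_times.items()}
print(result.rows, fractions)
```

The fractions sum to 1 over the traced calls only, which mirrors how a dominant-op share describes traced time rather than total wall time.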

### Development
```bash
pip install ".[dev]"
pytest
```

### License
MIT (see LICENSE)
