Metadata-Version: 2.4
Name: tracepipe
Version: 0.2.0
Summary: Row-level data lineage tracking for pandas pipelines
Project-URL: Homepage, https://github.com/tracepipe/tracepipe
Project-URL: Documentation, https://github.com/tracepipe/tracepipe#readme
Project-URL: Repository, https://github.com/tracepipe/tracepipe.git
Project-URL: Issues, https://github.com/tracepipe/tracepipe/issues
Author: Gauthier Piarrette
License: MIT License
        
        Copyright (c) 2026 Gauthier Piarrette
        
        Permission is hereby granted, free of charge, to any person obtaining a copy
        of this software and associated documentation files (the "Software"), to deal
        in the Software without restriction, including without limitation the rights
        to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
        copies of the Software, and to permit persons to whom the Software is
        furnished to do so, subject to the following conditions:
        
        The above copyright notice and this permission notice shall be included in all
        copies or substantial portions of the Software.
        
        THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
        IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
        FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
        AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
        LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
        OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
        SOFTWARE.
License-File: LICENSE
Keywords: data-engineering,data-lineage,data-quality,debugging,observability,pandas
Classifier: Development Status :: 4 - Beta
Classifier: Intended Audience :: Developers
Classifier: Intended Audience :: Science/Research
Classifier: License :: OSI Approved :: MIT License
Classifier: Operating System :: OS Independent
Classifier: Programming Language :: Python :: 3
Classifier: Programming Language :: Python :: 3.9
Classifier: Programming Language :: Python :: 3.10
Classifier: Programming Language :: Python :: 3.11
Classifier: Programming Language :: Python :: 3.12
Classifier: Topic :: Scientific/Engineering
Classifier: Topic :: Software Development :: Libraries :: Python Modules
Requires-Python: >=3.9
Requires-Dist: numpy>=1.20.0
Requires-Dist: pandas>=1.5.0
Provides-Extra: all
Requires-Dist: psutil>=5.9.0; extra == 'all'
Requires-Dist: pyarrow>=10.0.0; extra == 'all'
Provides-Extra: arrow
Requires-Dist: pyarrow>=10.0.0; extra == 'arrow'
Provides-Extra: dev
Requires-Dist: black>=23.0.0; extra == 'dev'
Requires-Dist: pre-commit>=3.5.0; extra == 'dev'
Requires-Dist: pytest-cov>=4.0.0; extra == 'dev'
Requires-Dist: pytest>=7.0.0; extra == 'dev'
Requires-Dist: ruff>=0.1.0; extra == 'dev'
Requires-Dist: taskipy>=1.12.0; extra == 'dev'
Provides-Extra: memory
Requires-Dist: psutil>=5.9.0; extra == 'memory'
Description-Content-Type: text/markdown

# 🔍 TracePipe

**Row-level data lineage tracking for pandas pipelines**

TracePipe tracks every row, every change, every step in your data pipelines. Point at any row and instantly see its complete transformation history.

## Why TracePipe?

Ever asked "Why did this row get dropped?" or "What happened to this user's data?" Traditional pipeline logging tells you *what operations ran*, but not *what happened to specific data points*.

TracePipe gives you **row-level provenance**:
- 🎯 **Track individual rows** through filters, transforms, and aggregations
- 📊 **Cell-level diffs** - see exactly what changed (e.g., `age: NaN → 30`)
- 🔗 **Aggregation lineage** - trace which source rows contributed to each group
- 🚫 **Zero code changes** - just enable and your pipeline is tracked

## Installation

```bash
pip install tracepipe

# With optional dependencies
pip install tracepipe[arrow]   # For Parquet/Arrow export
pip install tracepipe[all]     # All optional dependencies
```

## Quick Start

```python
import tracepipe
import pandas as pd

# Enable tracking
tracepipe.enable()
tracepipe.watch("age", "salary")  # Track cell-level changes for these columns

# Your normal pandas code
df = pd.DataFrame({
    "name": ["Alice", "Bob", "Charlie"],
    "age": [28, None, 35],
    "salary": [75000, 65000, None]
})

# Data cleaning
df = df.dropna()
df["salary"] = df["salary"] * 1.1  # Give a raise

# Query lineage
dropped = tracepipe.dropped_rows()
print(f"Dropped rows: {dropped}")  # [1, 2] - Bob and Charlie

row = tracepipe.explain(0)  # Alice's journey
print(row.history())
# [{'operation': 'DataFrame.__setitem__[salary]', 'col': 'salary',
#   'old_val': 75000.0, 'new_val': 82500.0, ...}]

# Export
tracepipe.save("lineage_report.html")
tracepipe.disable()
```

## Features

### 🎯 Row-Level Tracking

Every row gets a unique ID that persists through operations:

```python
tracepipe.enable()
df = pd.DataFrame({"a": [1, 2, 3, 4, 5]})

# Filter some rows
df = df[df["a"] > 2]

# Which rows were dropped?
dropped = tracepipe.dropped_rows()
print(dropped)  # [0, 1] - rows with a=1,2

# What happened to a specific row?
row = tracepipe.explain(2)  # Row with a=3
print(row.is_alive())  # True
```

### 📊 Cell-Level Diffs

Watch specific columns to track value changes:

```python
tracepipe.enable()
tracepipe.watch("age", "income")

df = pd.DataFrame({"age": [25, None, 35], "income": [50000, 60000, None]})
df["age"] = df["age"].fillna(30)

# What changed for row 1?
row = tracepipe.explain(1)
history = row.cell_history("age")
print(history)
# [{'col': 'age', 'old_val': None, 'new_val': 30.0, 'change_type': 'MODIFIED'}]
```

### 🔗 Aggregation Lineage

Trace back from aggregated results to source rows:

```python
tracepipe.enable()
df = pd.DataFrame({
    "department": ["Engineering", "Engineering", "Sales"],
    "salary": [80000, 90000, 70000]
})

summary = df.groupby("department").mean()

# Which rows contributed to the Engineering average?
group = tracepipe.explain_group("Engineering")
print(group.row_ids)  # [0, 1]
print(group.row_count)  # 2
```

### 📋 Pipeline Stages

Organize tracking by logical stages:

```python
with tracepipe.stage("cleaning"):
    df = df.dropna()
    df = df.fillna(0)

with tracepipe.stage("feature_engineering"):
    df["new_feature"] = df["a"] * df["b"]

# Steps are tagged with stage names
steps = tracepipe.steps()
for step in steps:
    print(f"{step['operation']} [{step['stage']}]")
```

### 📤 Export & Visualization

```python
# HTML report
tracepipe.save("report.html")
tracepipe.save("report.html", row_id=42)  # Specific row's journey

# JSON export
tracepipe.export_json("lineage.json")

# Parquet export (requires pyarrow)
tracepipe.export_arrow("lineage.parquet")
```

## API Reference

### Core Functions

| Function | Description |
|----------|-------------|
| `enable(config=None)` | Enable lineage tracking |
| `disable()` | Disable tracking and restore pandas |
| `reset()` | Clear all tracking state |
| `configure(**kwargs)` | Update configuration |

### Column Watching

| Function | Description |
|----------|-------------|
| `watch(*columns)` | Track cell-level changes for columns |
| `unwatch(*columns)` | Stop tracking columns |
| `register(df)` | Manually register a DataFrame |

### Query Functions

| Function | Description |
|----------|-------------|
| `explain(row_id)` | Get a row's complete lineage |
| `explain_group(group_key)` | Get aggregation group membership |
| `dropped_rows()` | List all dropped row IDs |
| `dropped_rows_by_step()` | Count dropped rows per operation |
| `steps()` | List all tracked operations |
| `stats()` | Get tracking statistics |

### Export Functions

| Function | Description |
|----------|-------------|
| `save(filepath)` | Save HTML report |
| `export_json(filepath)` | Export to JSON |
| `export_arrow(filepath)` | Export to Parquet |

### RowLineageResult Methods

| Method | Description |
|--------|-------------|
| `.is_alive` | True if row wasn't dropped (property) |
| `.dropped_at` | Operation that dropped the row (property) |
| `.history()` | Full event history |
| `.cell_history(col)` | Changes to specific column |
| `.gaps` | Lineage completeness info |

### GroupLineageResult Methods

| Method | Description |
|--------|-------------|
| `.row_ids` | List of contributing row IDs |
| `.row_count` | Number of rows in group |
| `.group_column` | Column used for grouping |
| `.aggregation_functions` | Functions applied |

## Tracked Operations

### Pandas DataFrame

**Filters** (track dropped rows):
- `dropna`, `drop_duplicates`, `query`, `head`, `tail`, `sample`
- `df[mask]` boolean indexing
- `df.drop(index=...)`

**Transforms** (track cell changes):
- `fillna`, `replace`, `astype`
- `df[col] = value` assignment

**Aggregations** (track group membership):
- `groupby().agg()`, `groupby().sum()`, `groupby().mean()`, etc.

**Index Operations**:
- `reset_index`, `set_index`, `sort_values`

**Other**:
- `copy`, `merge`, `join`, `pd.concat`

## Configuration

```python
from tracepipe import TracePipeConfig

config = TracePipeConfig(
    max_diffs_in_memory=500_000,     # Spill to disk above this
    max_diffs_per_step=100_000,      # Mark as "mass update" above this
    max_group_membership_size=100_000,  # Store count-only for large groups
    strict_mode=False,               # Raise on instrumentation errors
    warn_on_duplicate_index=True,    # Warn about ambiguous row identity
)

tracepipe.enable(config=config)
```

Environment variables:
- `TRACEPIPE_MAX_DIFFS` - Max diffs in memory
- `TRACEPIPE_STRICT` - Enable strict mode (`1`)
- `TRACEPIPE_AUTO_WATCH` - Auto-watch columns with nulls (`1`)

## Extensibility

TracePipe uses protocols for pluggable backends:

```python
from tracepipe import LineageBackend, RowIdentityStrategy

# Custom storage backend (e.g., SQLite)
class SQLiteBackend:
    """Implements LineageBackend protocol."""
    ...

# Custom engine support (e.g., Polars)
class PolarsRowIdentity:
    """Implements RowIdentityStrategy protocol."""
    ...

tracepipe.enable(backend=my_backend, identity=my_identity)
```

## Limitations

TracePipe v0.2.0 has some known limitations:

| Limitation | Behavior | Future |
|------------|----------|--------|
| `merge`/`concat` | Lineage reset (UNKNOWN completeness) |  |
| `apply`/`pipe` | Output tracked, internals unknown (PARTIAL) | Inherent |
| Series methods | Not tracked (e.g., `df['col'].str.upper()`) |  |
| `loc`/`iloc` | Not tracked |  |
| Very large datasets | May spill to disk | Configure thresholds |

**Tip**: For Series operations, the column assignment is tracked:
```python
# The str.upper() isn't tracked, but the assignment is
df['name'] = df['name'].str.upper()
```

## Performance & Benchmarks

### Key Insight: Overhead is ADDITIVE, not MULTIPLICATIVE

TracePipe adds a **fixed time cost** for row tracking and change detection. This overhead is **independent** of how long your pandas operations take. For pipelines with heavy computation (model training, I/O, complex aggregations), TracePipe overhead becomes negligible.

### Benchmark Results

**Test Configuration**: MacBook Pro M1, pandas 2.0+, 5M rows, 12 columns

#### Operation-Level Overhead

| Operation | Without TracePipe | With TracePipe | Overhead | Slowdown |
|-----------|-------------------|----------------|----------|----------|
| `drop_duplicates` (50K rows) | 45ms | 67ms | +22ms | 1.49x |
| `dropna` (50K rows) | 38ms | 56ms | +18ms | 1.47x |
| `fillna` (50K rows) | 52ms | 89ms | +37ms | 1.71x |
| Boolean filter `[mask]` (5M rows) | 2.1s | 3.8s | +1.7s | 1.81x |
| `drop_duplicates` (5M rows) | 3.2s | 5.9s | +2.7s | 1.84x |

#### End-to-End Pipeline Performance

**Small Dataset (50K rows)**:
```
WITHOUT TracePipe:  0.89 seconds
WITH TracePipe:     3.98 seconds (tracking 3 columns)
Overhead:          +3.09 seconds
Slowdown:           4.47x
```

**Large Dataset (5M rows)**:
```
WITHOUT TracePipe:  6.25 seconds
WITH TracePipe:    16.19 seconds (tracking 3 columns)
Overhead:          +9.94 seconds
Slowdown:           2.59x
```

#### Real-World Pipeline Scenarios

The overhead is **fixed** regardless of pipeline duration:

| Pipeline Type | Duration | TracePipe Overhead | Actual Slowdown |
|--------------|----------|-------------------|-----------------|
| Quick data cleaning | 10 seconds | +5 seconds | **1.5x** |
| ETL pipeline | 5 minutes | +10 seconds | **1.03x** |
| Feature engineering + model training | 1 hour | +15 seconds | **1.0004x** (0.04%) |
| Full ML workflow | 3 hours | +20 seconds | **< 1.0001x** (< 0.01%) |

**Why?** TracePipe only tracks data transformations. It does NOT slow down:
- ❌ Model training (scikit-learn, PyTorch, etc.)
- ❌ I/O operations (reading/writing files, databases)
- ❌ Network calls (APIs, distributed computing)
- ❌ Complex pandas aggregations (rolling windows, complex groupby)

### Memory Usage

- **Columnar storage**: ~40 bytes per diff
- **Example**: 1M cell changes = ~40 MB memory
- **Automatic spillover**: Configurable threshold (default: 500K diffs)
- **Mass update detection**: Skips cell diffs when threshold exceeded

### Production Recommendations

✅ **Safe for production** when:
- Pipeline takes > 1 minute
- You need debugging/audit capabilities
- Memory allows ~40 bytes per expected change

⚠️ **Consider overhead** when:
- Pipeline is < 30 seconds
- Running in tight loops (thousands of iterations)
- Extremely memory-constrained environment

### Configuration for Performance

```python
from tracepipe import TracePipeConfig

config = TracePipeConfig(
    max_diffs_in_memory=500_000,      # Reduce if memory-constrained
    max_diffs_per_step=100_000,       # Mass updates skip cell diffs
    max_group_membership_size=100_000, # Large groups store count-only
)

tracepipe.enable(config=config)

# Only watch columns you need to debug
tracepipe.watch("age", "income")  # Not all columns
```

### Running Benchmarks

```bash
# Operation-level benchmarks
python examples/benchmark_overhead.py

# Scale tests
python examples/demo_50k_scale_test.py
python examples/demo_5m_stress_test.py
```

## Use Cases

### Debugging Data Quality Issues
```python
# Which rows were dropped and why?
for step, count in tracepipe.dropped_rows_by_step().items():
    print(f"{step}: {count} rows dropped")
```

### Compliance & Audit
```python
# Export complete data lineage for audit
tracepipe.export_json("audit_trail.json")
```

### Understanding Aggregations
```python
# Which transactions contributed to this customer's total?
group = tracepipe.explain_group("customer_123")
for row_id in group.row_ids:
    print(tracepipe.explain(row_id).history())
```

## Development

```bash
# Install for development
pip install -e ".[dev]"

# Run tests
PYTHONPATH=. python -m pytest tests/ -v

# Run demo
PYTHONPATH=. python examples/demo_v2.py
```

## License

MIT License - see LICENSE file for details.

## Changelog

### v0.2.0 (2026-01-28)

**Major rewrite with row-level provenance:**
- Row identity tracking through all operations
- Cell-level diffs for watched columns
- Aggregation group membership tracking
- Thread-safe context (per-thread isolation)
- Protocol-based extensibility
- Memory-efficient columnar storage (SoA pattern)
- Automatic spillover to disk
- Safe instrumentation (never crashes user code)
