Metadata-Version: 2.4
Name: dfchain
Version: 0.1.2
Summary: Lightweight library to stream Pandas workflows
Author-email: Alexander Wei <alexander-wei@users.noreply.github.com>
License: MIT
Project-URL: Homepage, https://alexander-wei.github.io/dfchain/
Project-URL: Repository, https://github.com/alexander-wei/dfchain
Project-URL: Documentation, https://alexander-wei.github.io/dfchain/
Project-URL: Bug Tracker, https://github.com/alexander-wei/dfchain/issues
Keywords: pandas,etl,dataframe,pipeline,sqlite
Classifier: Development Status :: 2 - Pre-Alpha
Classifier: Intended Audience :: Developers
Classifier: License :: OSI Approved :: MIT License
Classifier: Programming Language :: Python :: 3
Classifier: Programming Language :: Python :: 3.13
Classifier: Programming Language :: Python :: 3.14
Classifier: Topic :: Software Development :: Libraries
Requires-Python: >=3.12
Description-Content-Type: text/markdown
Requires-Dist: pandas>=2
Requires-Dist: SQLAlchemy>=2

# dfchain

[![PyPI - Version](https://img.shields.io/pypi/v/dfchain?style=for-the-badge)](https://pypi.org/project/dfchain/)
&emsp;[![Static Badge](https://img.shields.io/badge/docs-html-6082B6?style=for-the-badge)](https://alexander-wei.github.io/dfchain/)
&emsp;[![Static Badge](https://img.shields.io/badge/github-src-8A9A5B?style=for-the-badge&logo=github)](https://github.com/alexander-wei/dfchain)

A small library for composing Pandas ETL task functions that operate on a DataFrame.

## Overview

- `dfchain` provides lightweight building blocks for implementing ETL operations as Python functions and running them in a sequential pipeline.
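
To illustrate the idea, here is a minimal plain-pandas sketch of a sequential pipeline and a `@task`-style adapter. It does not import `dfchain`. The names `toy_task`, `run_steps`, `add_total`, and `flag_large` are hypothetical, and the adapter only mimics the behavior described for `@task` below (accepting either a single DataFrame or an iterator of chunks); it is not the library's implementation.

```python
from collections.abc import Callable
from functools import wraps
from typing import Any

import pandas as pd


def toy_task(fn: Callable[..., pd.DataFrame]) -> Callable[..., Any]:
    """Hypothetical stand-in for dfchain's @task, for illustration only."""

    @wraps(fn)
    def step(df, executor: Any = None, **kwargs):
        # `executor` is accepted to match the (df, executor, **kwargs) shape
        # but is unused in this sketch.
        if isinstance(df, pd.DataFrame):
            # A single DataFrame is transformed directly.
            return fn(df, **kwargs)
        # Otherwise treat the input as an iterator of chunks, transformed lazily.
        return (fn(chunk, **kwargs) for chunk in df)

    return step


def run_steps(df, steps, executor: Any = None):
    # Sequential pipeline: each step receives the previous step's output.
    for step in steps:
        df = step(df, executor)
    return df


@toy_task
def add_total(df: pd.DataFrame) -> pd.DataFrame:
    return df.assign(total=df["a"] + df["b"])


@toy_task
def flag_large(df: pd.DataFrame, threshold: int = 5) -> pd.DataFrame:
    return df.assign(is_large=df["total"] > threshold)


df = pd.DataFrame({"a": [1, 4, 2], "b": [1, 3, 5]})
print(run_steps(df, [add_total, flag_large]))

# The same steps applied to an iterator of two chunks.
for chunk in run_steps(iter([df.iloc[:2], df.iloc[2:]]), [add_total, flag_large]):
    print(chunk)
```

The generator-based branch keeps chunked input lazy, so no step needs more than one chunk in memory at a time.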

## Requirements

- Python 3.12 or newer
  - pandas >= 2
  - SQLAlchemy >= 2
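
To confirm that an environment meets these requirements, a small check using only the standard library could look like the following. The helper name `check_requirements` is hypothetical and not part of `dfchain`.

```python
import sys
from importlib.metadata import PackageNotFoundError, version

# Minimum versions, taken from the requirements list above.
MIN_PYTHON = (3, 12)
MIN_MAJOR = {"pandas": 2, "SQLAlchemy": 2}


def check_requirements() -> list[str]:
    """Return a list of human-readable problems; an empty list means all good."""
    problems = []
    if sys.version_info < MIN_PYTHON:
        problems.append(f"Python {sys.version.split()[0]} found, 3.12+ required")
    for dist, major in MIN_MAJOR.items():
        try:
            installed = version(dist)
        except PackageNotFoundError:
            problems.append(f"{dist} is not installed")
            continue
        # Compare only the leading (major) component, e.g. "2.2.3" -> 2.
        if int(installed.split(".")[0]) < major:
            problems.append(f"{dist} {installed} found, >= {major} required")
    return problems


print(check_requirements() or "All requirements satisfied")
```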

## Installation

From the project root:

0.  Install Python 3.12 or newer. For example, with [Anaconda](https://www.anaconda.com/download):

        conda create --name [env-name] python=3.12
        conda activate [env-name]

1.  Create and activate a virtual environment (recommended if you are not using a conda environment):

        python -m venv .venv
        source .venv/bin/activate

2.  Install the package:

        python -m pip install .

## High-level API (overview)

The library provides a few simple moving parts to compose ETL logic as reusable functions.

- Pipeline
  - Class: `dfchain.api.pipeline.Pipeline`
  - Purpose: Compose and execute an ordered sequence of Transformations.
  - Usage: `pipeline = Pipeline([step1, step2]); pipeline.run(executor)`
  - Steps: callables following the Transformation protocol (see below).

- Transformation
  - Protocol: any callable matching `(df: pd.DataFrame, executor: Executor, **kwargs) -> pd.DataFrame`
  - Note: The `@task` decorator adapts plain functions to this protocol.

- task decorator
  - Function: `dfchain.api.task.task`
  - Purpose: Adapt a simple function that operates on a `pandas.DataFrame` so
    it can be used as a Pipeline step. Supports both single-DataFrame and
    iterator-of-chunks input shapes.

- Executor (core abstraction)
  - Interface: `dfchain.core.executor.Executor` (abstract base)
  - Responsibilities:
    - Provide chunk iteration via `iter_chunks() -> Iterator[(chunk_id, DataFrame)]`
    - Optionally provide group semantics (`groupkey`, `rebuild_groups`, `iter_groups`)
    - Control eager vs. non-eager update semantics via the `is_eager` / `is_inplace` flags
    - Persist transformed chunks via `write_chunk` / `update_group`

- Provided executors
  - `PandasExecutor`: In-memory executor backed by a `pandas.DataFrame`.
  - `SqliteExecutor`: SQLite-backed executor that stores pickled chunk blobs
    and per-group aggregated blobs to enable chunked processing with a small
    on-disk footprint.

- Writing a pipeline
  - Define a list of step callables (functions decorated with `@task` or compatible with the Transformation protocol).
  - Construct a `Pipeline` and run it with an executor such as `PandasExecutor` or `SqliteExecutor`.

  Example:

  ```python
  import pandas as pd

  from dfchain import Pipeline, SqliteExecutor, task


  @task
  def add_dark_border_flag(
      df: pd.DataFrame,
      score_col: str = "border_darkness_score",
      threshold: float = 0.8,
      output_col: str = "has_dark_border",
  ) -> pd.DataFrame:
      """Adds a boolean column indicating whether the border is very dark.

      Args:
          df: DataFrame containing a border darkness score column.
          score_col: Name of the column with border darkness scores in [0, 1].
          threshold: Threshold above which the border is considered dark.
          output_col: Name of the output boolean column.

      Returns:
          pandas.DataFrame: Copy of df with the new boolean column.
      """
      df = df.copy()
      df[output_col] = df[score_col] > threshold
      return df


  # Stream the CSV into a SQLite-backed executor, 1,000 rows per chunk.
  executor = (
      SqliteExecutor(chunksize=1_000)
      .session()
      .textFileReader(pd.read_csv("api_rawitem.csv", chunksize=1_000))
  )

  # `add_border_darkness_score` is assumed to be another @task, defined
  # elsewhere, that writes the `border_darkness_score` column.
  pipeline = Pipeline([add_border_darkness_score, add_dark_border_flag])
  pipeline.run(executor)

  # Optional: this concatenates every chunk in memory. It is not needed
  # to save the result as a CSV.
  merged_df = pd.concat(
      (df for _, df in executor.iter_chunks()),
      ignore_index=True,
  )
  ```

  The resulting DataFrame can be streamed by iterating over `executor.iter_chunks()`, or saved to CSV with:

  ```python
  from dfchain import SqliteCsvWriter
  SqliteCsvWriter(executor).to_csv("api_rawitem_flags.csv", index=False)
  ```
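
The executor idea can be pictured with a small standard-library sketch: chunks are pickled into a SQLite table, `iter_chunks()` yields `(chunk_id, DataFrame)` pairs one at a time, transformed chunks are written back, and the result is streamed to CSV without concatenating. `ToySqliteChunkStore`, `run_chunked`, `chunks_to_csv`, and `flag_dark` are hypothetical names, only loosely modeled on the `SqliteExecutor`, `Pipeline`, and `SqliteCsvWriter` roles above; the real classes may store and iterate chunks differently.

```python
import io
import pickle
import sqlite3
from collections.abc import Callable, Iterable, Iterator
from typing import TextIO

import pandas as pd

Step = Callable[[pd.DataFrame], pd.DataFrame]


class ToySqliteChunkStore:
    """Hypothetical SQLite chunk store (an illustration, not dfchain code)."""

    def __init__(self, path: str = ":memory:") -> None:
        self.conn = sqlite3.connect(path)
        self.conn.execute(
            "CREATE TABLE IF NOT EXISTS chunks (chunk_id INTEGER PRIMARY KEY, data BLOB)"
        )

    def load(self, chunks: Iterable[pd.DataFrame]) -> "ToySqliteChunkStore":
        # Persist each incoming chunk, e.g. from pd.read_csv(..., chunksize=N).
        for chunk_id, chunk in enumerate(chunks):
            self.write_chunk(chunk_id, chunk)
        return self

    def write_chunk(self, chunk_id: int, df: pd.DataFrame) -> None:
        # Store the pickled chunk as a BLOB, replacing any earlier version.
        self.conn.execute(
            "INSERT OR REPLACE INTO chunks (chunk_id, data) VALUES (?, ?)",
            (chunk_id, pickle.dumps(df)),
        )
        self.conn.commit()

    def iter_chunks(self) -> Iterator[tuple[int, pd.DataFrame]]:
        # Read the ids up front, then unpickle one chunk per iteration.
        ids = [row[0] for row in self.conn.execute("SELECT chunk_id FROM chunks ORDER BY chunk_id")]
        for chunk_id in ids:
            (blob,) = self.conn.execute(
                "SELECT data FROM chunks WHERE chunk_id = ?", (chunk_id,)
            ).fetchone()
            yield chunk_id, pickle.loads(blob)


def run_chunked(steps: list[Step], store: ToySqliteChunkStore) -> None:
    # Apply every step to one chunk at a time and write the result back.
    for chunk_id, df in store.iter_chunks():
        for step in steps:
            df = step(df)
        store.write_chunk(chunk_id, df)


def chunks_to_csv(store: ToySqliteChunkStore, fh: TextIO) -> None:
    # Append chunks to the open stream; only the first chunk writes a header.
    for i, (_, df) in enumerate(store.iter_chunks()):
        df.to_csv(fh, header=(i == 0), index=False)


def flag_dark(df: pd.DataFrame, threshold: float = 0.8) -> pd.DataFrame:
    # Hypothetical step mirroring add_dark_border_flag above.
    return df.assign(is_dark=df["score"] > threshold)


csv_text = "score\n0.1\n0.9\n0.85\n0.3\n0.95\n"
store = ToySqliteChunkStore().load(pd.read_csv(io.StringIO(csv_text), chunksize=2))
run_chunked([flag_dark], store)
out = io.StringIO()
chunks_to_csv(store, out)
print(out.getvalue())
```

Passing a file path instead of `":memory:"` keeps the chunks on disk, so only the chunk currently being transformed is unpickled into RAM.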

## Examples

Operating on a CSV file containing base64-encoded images is memory-intensive. Processing the file in chunks with `dfchain` reduces active RAM usage, because only one chunk is loaded at a time. Here is the before and after of an image manipulation task that uses OpenCV to remove dark borders from the base64 images in a CSV file.

<div style="display: flex; flex-direction: row; column-gap: 20px; row-gap: 20px; flex-wrap: wrap; margin-bottom: 20px;">
<img src="https://alexander-wei.github.io/blottertools/dist/remove_borders_before.png" width="600px"/>
<img src="https://alexander-wei.github.io/blottertools/dist/remove_borders_after.png" width="600px" />
</div>
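
The chunking pattern behind this example can be sketched with pandas and the standard library alone. The column names `image_b64` and `image_nbytes` and both function names are hypothetical. The actual border-removal step, which uses OpenCV, is not shown; this sketch only decodes each image and records its size in bytes.

```python
import base64
import io
from collections.abc import Iterator

import pandas as pd


def add_decoded_size(
    df: pd.DataFrame,
    image_col: str = "image_b64",
    output_col: str = "image_nbytes",
) -> pd.DataFrame:
    # Decode each base64 string in this chunk. A real image step would decode
    # the bytes with OpenCV here (e.g. cv2.imdecode) and re-encode the result.
    sizes = df[image_col].map(lambda s: len(base64.b64decode(s)))
    return df.assign(**{output_col: sizes})


def process_in_chunks(source, chunksize: int) -> Iterator[pd.DataFrame]:
    # Only one chunk of rows (and its decoded images) is held at a time.
    for chunk in pd.read_csv(source, chunksize=chunksize):
        yield add_decoded_size(chunk)


images = [base64.b64encode(b).decode("ascii") for b in (b"abc", b"hello", b"pixels")]
csv_text = pd.DataFrame({"image_b64": images}).to_csv(index=False)
for chunk in process_in_chunks(io.StringIO(csv_text), chunksize=2):
    print(chunk)
```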

## RAM Usage

By processing a CSV chunk by chunk, only one chunk needs to be held in memory at a time. The graph below shows the maximum RAM usage of dfchain on a 4 GB CSV file as a function of chunk size. The orange line shows the usage with a chunk size of `100`.

<div style="display: flex; flex-direction: row; column-gap: 20px; row-gap: 20px; flex-wrap: wrap; margin-bottom: 20px;">
<img src="https://alexander-wei.github.io/blottertools/dist/tracemalloc_csv_chunksize.png" width="600px"/>
</div>
