Metadata-Version: 2.4
Name: moltres
Version: 0.8.0
Summary: DataFrame API with SQL pushdown execution and real SQL CRUD - the missing layer for SQL in Python
Author-email: Odos Matthews <odosmatthews@gmail.com>
License-Expression: MIT
Project-URL: Homepage, https://github.com/eddiethedean/moltres
Project-URL: Repository, https://github.com/eddiethedean/moltres
Project-URL: Issues, https://github.com/eddiethedean/moltres/issues
Keywords: dataframe,sql,crud,pushdown,etl,data-engineering,sqlalchemy,pandas,polars,spark,database,query-builder
Requires-Python: >=3.9
Description-Content-Type: text/markdown
Requires-Dist: SQLAlchemy>=2.0
Requires-Dist: typing-extensions>=4.5
Provides-Extra: polars
Requires-Dist: polars>=1.0; extra == "polars"
Provides-Extra: pandas
Requires-Dist: pandas>=2.1; extra == "pandas"
Provides-Extra: async
Requires-Dist: aiofiles>=23.0; extra == "async"
Requires-Dist: greenlet>=3.0.0; extra == "async"
Provides-Extra: async-postgresql
Requires-Dist: aiofiles>=23.0; extra == "async-postgresql"
Requires-Dist: asyncpg>=0.29.0; extra == "async-postgresql"
Requires-Dist: greenlet>=3.0.0; extra == "async-postgresql"
Provides-Extra: async-mysql
Requires-Dist: aiofiles>=23.0; extra == "async-mysql"
Requires-Dist: aiomysql>=0.2.0; extra == "async-mysql"
Requires-Dist: greenlet>=3.0.0; extra == "async-mysql"
Provides-Extra: async-sqlite
Requires-Dist: aiofiles>=23.0; extra == "async-sqlite"
Requires-Dist: aiosqlite>=0.19.0; extra == "async-sqlite"
Requires-Dist: greenlet>=3.0.0; extra == "async-sqlite"
Provides-Extra: dev
Requires-Dist: pytest>=8.0; extra == "dev"
Requires-Dist: pytest-asyncio>=0.21.0; extra == "dev"
Requires-Dist: pytest-cov>=4.1; extra == "dev"
Requires-Dist: pytest-xdist>=3.5; extra == "dev"
Requires-Dist: mypy>=1.8; extra == "dev"
Requires-Dist: sphinx>=7.0; extra == "dev"
Requires-Dist: sphinx-rtd-theme>=2.0; extra == "dev"
Requires-Dist: ruff>=0.6; extra == "dev"
Requires-Dist: pre-commit>=3.5; extra == "dev"
Requires-Dist: pandas>=2.1; extra == "dev"
Requires-Dist: pandas-stubs>=2.1; extra == "dev"
Requires-Dist: polars>=1.0; extra == "dev"
Requires-Dist: pyarrow>=10.0; extra == "dev"
Requires-Dist: aiofiles>=23.0; extra == "dev"
Requires-Dist: aiosqlite>=0.19.0; extra == "dev"
Requires-Dist: greenlet>=3.0.0; extra == "dev"
Requires-Dist: testing.postgresql>=1.3.0; extra == "dev"
Requires-Dist: testing.mysqld>=1.4.0; extra == "dev"
Requires-Dist: psycopg2-binary>=2.9.0; extra == "dev"
Requires-Dist: pymysql>=1.0.0; extra == "dev"

# Moltres

<div align="center">

[![CI](https://github.com/eddiethedean/moltres/actions/workflows/ci.yml/badge.svg)](https://github.com/eddiethedean/moltres/actions/workflows/ci.yml)
[![Python 3.9+](https://img.shields.io/badge/python-3.9+-blue.svg)](https://github.com/eddiethedean/moltres)
[![License: MIT](https://img.shields.io/badge/License-MIT-yellow.svg)](https://github.com/eddiethedean/moltres/blob/main/LICENSE)
[![Code style: ruff](https://img.shields.io/endpoint?url=https://raw.githubusercontent.com/astral-sh/ruff/main/assets/badge/v2.json)](https://github.com/astral-sh/ruff)

**The Missing DataFrame Layer for SQL in Python**

**MOLTRES**: **M**odern **O**perations **L**ayer for **T**ransformations, **R**elational **E**xecution, and **S**QL

[Installation](#-installation) • [Quick Start](#-quick-start) • [Documentation](#-documentation) • [Why Moltres?](#-why-moltres)

</div>

---

**Moltres** fills a major gap in the Python data ecosystem: it's the **only** library that combines a **DataFrame API** (like Pandas/Polars), **SQL pushdown execution** (no data loading into memory), and **real SQL CRUD operations** (INSERT, UPDATE, DELETE) in one unified interface.

Transform millions of rows using familiar DataFrame operations—all executed directly in SQL without materializing data. Update, insert, and delete with column-aware, type-safe operations. No juggling between Pandas, SQLAlchemy, and raw SQL. Just one library that does it all.

## 📑 Table of Contents

- [Features](#-features)
- [What Makes Moltres Unique](#-what-makes-moltres-unique)
- [Installation](#-installation)
- [Quick Start](#-quick-start)
- [Why Moltres?](#-why-moltres)
- [Core Concepts](#-core-concepts)
- [Reading Data](#-reading-data)
- [Writing Data](#-writing-data)
- [Streaming for Large Datasets](#-streaming-for-large-datasets)
- [Table Management](#️-table-management)
- [Data Mutations](#️-data-mutations)
- [Result Formats](#-result-formats)
- [Configuration](#️-configuration)
- [Performance Monitoring](#-performance-monitoring)
- [Security](#-security)
- [Advanced Examples](#-advanced-examples)
- [Supported Operations](#️-supported-operations)
- [Development](#-development)
- [Documentation](#-documentation)
- [Contributing](#-contributing)

## ✨ Features

- ✏️ **Real SQL CRUD** - INSERT, UPDATE, DELETE operations with DataFrame-style syntax
- 🚀 **DataFrame API** - Familiar operations (select, filter, join, groupBy, etc.) like Pandas/Polars
- 🗄️ **SQL Pushdown Execution** - All operations compile to SQL and run on your database—no data loading into memory
- 📊 **Operates Directly on SQL Tables** - Transform tables without materialization
- 🌊 **Streaming Support** - Handle datasets larger than memory with chunked processing
- 📊 **Multiple Formats** - Read/write CSV, JSON, JSONL, Parquet, and more
- 🔧 **Type Safe** - Full type hints with strict mypy checking and custom type stubs for dependencies
- 🎯 **Zero Dependencies** - Works with just SQLAlchemy (pandas/polars optional)
- 🔒 **Security First** - Built-in SQL injection prevention and validation
- ⚡ **Performance Monitoring** - Optional hooks for query performance tracking
- 🌍 **Environment Config** - Configure via environment variables for 12-factor apps
- ⚡ **Async Support** - Full async/await support for all operations (optional dependency)

## 🔥 What Makes Moltres Unique

Moltres is the **only** Python library that provides:

| Feature | Pandas/Polars | Ibis | SQLAlchemy | SQLModel | **Moltres** |
|--------|----------------|------|-------------|-----------|-------------|
| DataFrame API | ✔ | ✔ | ❌ | ❌ | **✔** |
| SQL Pushdown Execution | ❌ | ✔ | ✔ | ✔ | **✔** |
| **Row-Level INSERT/UPDATE/DELETE** | ❌ | ❌ | ✔ | ✔ | **✔** |
| Lazy query building | ✔ (Polars) | ✔ | ⚠️ | ⚠️ | **✔** |
| Operates directly on SQL tables | ⚠️ limited | ✔ | ✔ | ✔ | **✔** |
| Column-oriented transformations | ✔ | ✔ | ❌ | ❌ | **✔** |

**The combination of DataFrame API + SQL pushdown + CRUD does not exist anywhere else in Python.**

### Key Differentiators

- **Only library with DataFrame API + SQL pushdown + CRUD** - No other Python library offers this combination
- **No data loading into memory for transformations** - All DataFrame operations execute directly in SQL
- **Works with existing SQL infrastructure** - No cluster required, works with SQLite, PostgreSQL, MySQL, and more
- **Type-safe CRUD operations** - Validated, column-aware INSERT, UPDATE, DELETE with DataFrame-style syntax
- **SQL-first design** - Focuses on providing full SQL feature support through a DataFrame API, not replicating every PySpark feature. Features are included only if they map to SQL/SQLAlchemy capabilities and align with SQL pushdown execution.

## 🆕 What's New

### Version 0.8.0

- **Lazy CRUD and DDL Operations** - All DataFrame CRUD and DDL operations are now lazy, requiring an explicit `.collect()` call for execution. This improves composability and aligns with PySpark's lazy evaluation model.
- **Transaction Management** - All operations within a single `.collect()` call are part of a single session that rolls back all changes if any failure occurs.
- **Batch Operation API** - New `db.batch()` context manager to queue multiple lazy operations and execute them together within a single transaction.
- **Type Checking Improvements** - Added pandas-stubs for proper mypy type checking and fixed type compatibility issues.

### Version 0.7.0

- **PostgreSQL and MySQL Testing Infrastructure** - Comprehensive test support for multiple database backends with ephemeral database instances, database fixtures, and extensive test coverage
- **Enhanced Type Safety** - Type overloads for `collect()` methods with improved type inference and better IDE support
- **Improved Code Quality** - Fixed all mypy type checking errors and ruff linting issues, with comprehensive type annotations throughout
- **Database Connection Management** - Added `close()` methods to `Database` and `AsyncDatabase` classes for proper resource cleanup
- **Cross-Database Compatibility** - Fixed PostgreSQL and MySQL-specific issues with JSON extraction, array functions, and async DSN parsing
- **Test Coverage** - 301 passing tests across SQLite, PostgreSQL, and MySQL with async support

### Version 0.6.0

- **Null Handling Convenience Methods** - New `na` property on DataFrame: `df.na.drop()` and `df.na.fill(value)` for convenient null handling
- **Random Sampling** - New `sample(fraction, seed=None)` method for random row sampling with dialect-specific SQL compilation
- **Enhanced Type System** - New data types: `decimal()`, `uuid()`, `json()`, and `interval()` helpers with full SQL support and dialect-specific compilation
- **Interval Arithmetic** - New `date_add()` and `date_sub()` functions for date/time interval operations
- **Join Hints** - New `hints` parameter for `join()` method to provide query optimization hints
- **Complex Join Conditions** - Enhanced `join()` method to support arbitrary Column expressions in join conditions
- **Query Plan Analysis** - New `explain(analyze=False)` method to return query execution plans
- **Pivot Operations** - New `pivot()` method for data reshaping with cross-dialect compatibility

### Version 0.5.0

- **Compressed File Reading** - Automatic detection and support for gzip, bz2, and xz compression in CSV, JSON, JSONL, and text file readers (both sync and async)
- **Array/JSON Functions** - New functions for working with JSON and array data: `json_extract()`, `array()`, `array_length()`, `array_contains()`, `array_position()` with dialect-specific SQL compilation
- **Collect Aggregations** - New aggregation functions `collect_list()` and `collect_set()` for array aggregation (uses `ARRAY_AGG` in PostgreSQL, `group_concat` in SQLite/MySQL)
- **Semi-Join and Anti-Join** - New `semi_join()` and `anti_join()` methods that compile to efficient `EXISTS`/`NOT EXISTS` subqueries
- **MERGE/UPSERT Operations** - New `merge()` method on tables for upsert operations with dialect-specific support (SQLite `ON CONFLICT`, PostgreSQL `MERGE`, MySQL `ON DUPLICATE KEY`)
- **Comprehensive Test Coverage** - All new features include full test coverage with execution tests

<details>
<summary><b>Previous Releases</b></summary>

### Version 0.4.0
- **Strict Type Checking** - Full mypy strict mode compliance with comprehensive type annotations
- **Type Stubs for PyArrow** - Custom type stubs to provide type information for pyarrow library
- **PEP 561 Compliance** - Added `py.typed` marker file
- **Enhanced Type Safety** - Complete type annotations with improved type inference

### Version 0.3.0
- **Separation of File Reads and SQL Operations** - File readers return `Records` instead of `DataFrame`
- **Records Class** - New `Records` and `AsyncRecords` classes for file data
- **Full Async/Await Support** - Complete async API for all operations
- **Async Streaming** - Process large datasets asynchronously

### Version 0.2.0
- **Environment Variable Support** - Configure via environment variables
- **Performance Monitoring Hooks** - Track query execution time
- **Enhanced Security** - Comprehensive SQL injection prevention
- **Modular Architecture** - Refactored file readers

</details>

## 📦 Installation

### Requirements

- Python 3.9+
- SQLAlchemy 2.0+ (for database connectivity)
- A supported SQLAlchemy driver (SQLite, PostgreSQL, MySQL, etc.)

### Install Moltres

```bash
pip install moltres
```

For optional dependencies:

```bash
# For pandas support
pip install moltres[pandas]

# For polars support
pip install moltres[polars]

# For async support (requires async database drivers)
pip install moltres[async]  # Core async support (aiofiles)
pip install moltres[async-postgresql]  # PostgreSQL async (includes async + asyncpg)
pip install moltres[async-mysql]  # MySQL async (includes async + aiomysql)
pip install moltres[async-sqlite]  # SQLite async (includes async + aiosqlite)

# For both pandas and polars
pip install moltres[pandas,polars]
```

## 🚀 Quick Start

```python
from moltres import col, connect
from moltres.expressions.functions import sum

# Connect to your database
db = connect("sqlite:///example.db")

# DataFrame operations with SQL pushdown (no data loading into memory)
df = (
    db.table("orders")
    .select()
    .join(db.table("customers").select(), on=[("customer_id", "id")])
    .where(col("active") == True)  # noqa: E712
    .group_by("country")
    .agg(sum(col("amount")).alias("total_amount"))
)

# Execute and get results (SQL is compiled and executed here)
results = df.collect()  # Returns list of dicts by default

# CRUD operations with DataFrame-style syntax (lazy execution)
customers = db.table("customers")

# Insert rows (batch optimized) - requires .collect() to execute
customers.insert([
    {"id": 1, "name": "Alice", "email": "alice@example.com", "active": 1},
    {"id": 2, "name": "Bob", "email": "bob@example.com", "active": 0},
]).collect()

# Update rows - requires .collect() to execute
customers.update(
    where=col("active") == 0,
    set={"active": 1, "updated_at": "2024-01-01"}
).collect()

# Delete rows - requires .collect() to execute
customers.delete(where=col("email").is_null()).collect()
```

### Async Support

Moltres also supports async/await for all database operations:

```python
import asyncio
from moltres import async_connect, col

async def main():
    # Connect asynchronously
    db = async_connect("postgresql+asyncpg://user:pass@localhost/db")
    
    # All operations are async
    # For SQL operations, use db.table().select()
    df = db.table("orders").select()
    results = await df.collect()
    
    # Streaming support
    async for chunk in await df.collect(stream=True):
        process_chunk(chunk)
    
    await db.close()

asyncio.run(main())
```

**Note:** Async support requires async database drivers. Install with:
- `pip install moltres[async-postgresql]` for PostgreSQL (includes async + asyncpg)
- `pip install moltres[async-mysql]` for MySQL (includes async + aiomysql)
- `pip install moltres[async-sqlite]` for SQLite (includes async + aiosqlite)

## 💡 Why Moltres?

### The Gap in Python's Ecosystem

Python has powerful DataFrame tools (Pandas, Polars) and powerful SQL tools (SQLAlchemy, SQLModel), but **no library connects them in a unified, ergonomic way**.

**The problem:** Developers must juggle:
- Pandas or Polars for DataFrame transformations (but data must be loaded into memory)
- SQLAlchemy/ORMs for persistence (but not DataFrame-style)
- Raw SQL for updates/deletes (but not type-safe or composable)

**Moltres fixes this** by providing:
- ✅ **DataFrame API** - Transform data with familiar operations (select, filter, join, groupBy)
- ✅ **SQL Pushdown Execution** - All operations compile to SQL and run on your database—**no data loading into memory**
- ✅ **Real SQL CRUD** - INSERT, UPDATE, DELETE with DataFrame-style syntax
- ✅ **Works with Existing SQL Infrastructure** - No cluster required, works with SQLite, PostgreSQL, MySQL, and more
- ✅ **Type Safe** - Full type hints for better IDE support and fewer bugs
- ✅ **Production Ready** - Environment variables, connection pooling, monitoring hooks
- ✅ **Secure by Default** - SQL injection prevention built-in

### Who Needs Moltres?

- **Data Engineers** - Avoid loading millions of rows into memory just to update a subset
- **Backend Developers** - Replace many ORM operations with cleaner, column-aware DataFrame syntax
- **Analytics Engineers / dbt Users** - Express SQL models in Python code with DataFrame chaining
- **Product Engineers** - Validated, type-safe CRUD without hand-writing SQL
- **Teams migrating off Spark** - Familiar DataFrame API style for traditional SQL databases—no cluster required. Note: Moltres focuses on SQL features, not PySpark feature parity. Features are included only if they map to SQL capabilities.

## 📖 Core Concepts

> **Design Philosophy:** Moltres provides a DataFrame API that compiles to SQL. We focus on supporting SQL features (standard SQL and common dialect extensions) rather than replicating every PySpark feature. If a feature doesn't map to SQL/SQLAlchemy or doesn't align with SQL pushdown execution, it's not included.

### Lazy Evaluation

All DataFrame operations are lazy—they build a logical plan that only executes when you call `collect()`. CRUD operations (insert, update, delete, merge) and DDL operations (create_table, drop_table) are also lazy and require `.collect()` to execute. The plan is compiled to SQL and executed on your database:

```python
# This doesn't execute any SQL yet
df = db.table("users").select().where(col("age") > 18)

# SQL is compiled and executed here
results = df.collect()
```

### Column Expressions

Build complex expressions using column operations:

```python
from moltres.expressions.functions import sum, avg, count, concat

df = (
    db.table("sales")
    .select(
        col("product"),
        (col("price") * col("quantity")).alias("revenue"),
        concat(col("first_name"), lit(" "), col("last_name")).alias("full_name"),
    )
    .where(col("date") >= "2024-01-01")
    .group_by("product")
    .agg(
        sum(col("revenue")).alias("total_revenue"),
        avg(col("price")).alias("avg_price"),
        count("*").alias("order_count"),
    )
)
```

## 📥 Reading Data

### From Database Tables

```python
# For SQL operations, use db.table().select()
df = db.table("customers").select()
df = db.table("customers").select("id", "name", "email")
```

### From Files

Moltres supports loading data from various file formats. **File readers return `Records`, not `DataFrame`** - this makes it clear that file data is materialized and not suitable for SQL operations.

```python
# CSV files - returns Records
records = db.load.csv("data.csv")
records = db.load.option("delimiter", "|").csv("pipe_delimited.csv")
records = db.load.option("header", False).schema([...]).csv("no_header.csv")

# JSON files - returns Records
records = db.load.json("data.json")  # Array of objects
records = db.load.jsonl("data.jsonl")  # One JSON object per line

# Parquet files (requires pandas and pyarrow) - returns Records
records = db.load.parquet("data.parquet")

# Text files (one line per row) - returns Records
records = db.load.text("log.txt", column_name="line")

# Generic format reader - returns Records
records = db.load.format("csv").option("header", True).load("data.csv")

# Records can be used directly with insert operations (lazy operation)
table.insert(records).collect()  # Records implements Sequence protocol, requires .collect() to execute
# Or use the convenience method (calls .collect() internally)
records.insert_into("table_name")  # Executes immediately

# Access data
rows = records.rows()  # Get all rows as a list
for row in records:  # Iterate directly
    process(row)
```

### Schema Inference and Explicit Schemas

Schema is automatically inferred from data, but you can provide explicit schemas:

```python
from moltres.table.schema import ColumnDef

schema = [
    ColumnDef(name="id", type_name="INTEGER"),
    ColumnDef(name="name", type_name="TEXT"),
    ColumnDef(name="score", type_name="REAL"),
]

records = db.load.schema(schema).csv("data.csv")
```

**File Format Options:**
- **CSV**: `header` (default: True), `delimiter` (default: ","), `inferSchema` (default: True)
- **JSON**: `multiline` (default: False) - if True, reads as JSONL
- **Parquet**: Requires `pandas` and `pyarrow`

> **Important:** File readers (`db.load.*`) return `Records`, not `DataFrame`. Records are materialized data containers that can be:
> - Iterated directly: `for row in records: ...`
> - Converted to a list: `rows = records.rows()`
> - Used with insert operations: `table.insert(records).collect()` or `records.insert_into("table")` (insert_into executes immediately)
> 
> For SQL operations (select, filter, join, etc.), use `db.table(name).select()` to get a DataFrame.

## 📤 Writing Data

### To Database Tables

```python
# Write with automatic schema inference and table creation
df.write.save_as_table("target_table")

# Write modes
df.write.mode("append").save_as_table("target")  # Add to existing (default)
df.write.mode("overwrite").save_as_table("target")  # Replace contents
df.write.mode("error_if_exists").save_as_table("target")  # Fail if exists

# Insert into existing table (table must exist)
df.write.insertInto("existing_table")

# With explicit schema
from moltres.table.schema import ColumnDef
schema = [
    ColumnDef(name="id", type_name="INTEGER"),
    ColumnDef(name="name", type_name="TEXT"),
]
df.write.schema(schema).save_as_table("target")
```

### To Files

```python
# Various formats
df.write.csv("output.csv")
df.write.json("output.json")
df.write.jsonl("output.jsonl")
df.write.parquet("output.parquet")  # Requires pandas and pyarrow

# Generic save
df.write.save("output.csv")  # Infers format from extension
df.write.save("data.txt", format="csv")  # Explicit format

# With options
df.write.option("header", True).option("delimiter", "|").csv("output.csv")
df.write.option("compression", "gzip").parquet("output.parquet")

# Partitioned writes
df.write.partitionBy("country", "year").csv("partitioned_data")
```

**File Formats:**
- **CSV**: Standard comma-separated values (options: `header`, `delimiter`)
- **JSON**: Array of objects (options: `indent`)
- **JSONL**: One JSON object per line
- **Parquet**: Columnar format (requires `pandas` and `pyarrow`, options: `compression`)

## 🌊 Streaming for Large Datasets

Moltres supports streaming operations for datasets larger than available memory:

```python
# Enable streaming mode
records = db.load.stream().option("chunk_size", 10000).csv("large_file.csv")

# Process records (streaming Records iterate row-by-row)
for row in records:
    process(row)

# Or materialize all at once
all_rows = records.rows()  # Materializes all data

# Streaming writes
df.write.stream().mode("overwrite").save_as_table("large_table")
df.write.stream().csv("output.csv")

# Streaming SQL queries
df = db.table("large_table").select()
for chunk in df.collect(stream=True):
    process_chunk(chunk)
```

**Streaming Options:**
- `.stream()`: Enable streaming mode (default: False for backward compatibility)
- `.option("chunk_size", N)`: Set chunk size for reads (default: 10000)
- `.option("batch_size", N)`: Set batch size for SQL inserts (default: 10000)
- `collect(stream=True)`: Return iterator of row chunks instead of materializing

**When to Use Streaming:**
- Files or tables larger than available RAM
- Processing data in batches for memory efficiency
- Incremental processing pipelines
- Large data transformations

## 🗄️ Table Management

### Creating Tables

```python
from moltres import column, connect

db = connect("sqlite:///example.db")

# Create a table with schema definition (lazy operation)
customers = db.create_table(
    "customers",
    [
        column("id", "INTEGER", nullable=False, primary_key=True),
        column("name", "TEXT", nullable=False),
        column("email", "TEXT", nullable=True),
        column("active", "INTEGER", default=1),
    ],
).collect()  # Execute the create_table operation

# Insert data (lazy operation)
customers.insert([
    {"id": 1, "name": "Alice", "email": "alice@example.com"},
    {"id": 2, "name": "Bob", "email": "bob@example.com"},
]).collect()  # Execute the insert operation

# Drop tables (lazy operation)
db.drop_table("customers").collect()  # Execute the drop_table operation
```

The `column()` helper accepts:
- `name`: Column name
- `type_name`: SQL type (e.g., "INTEGER", "TEXT", "REAL", "VARCHAR(255)")
- `nullable`: Whether the column allows NULL (default: True)
- `default`: Default value for the column (optional)
- `primary_key`: Whether this is a primary key column (default: False)

## ✏️ Data Mutations

Insert, update, delete, and DDL operations are **lazy**—they build an operation plan that only executes when you call `.collect()`. This allows you to compose multiple operations and execute them together, optionally within a transaction:

```python
from moltres import col

customers = db.table("customers")

# Insert rows (batch optimized) - lazy operation
mutation = customers.insert([
    {"id": 1, "name": "Alice", "active": 1},
    {"id": 2, "name": "Bob", "active": 0},
])
# Execute the insert
rowcount = mutation.collect()

# Update rows - lazy operation
mutation = customers.update(where=col("id") == 2, set={"active": 1})
# Execute the update
rowcount = mutation.collect()

# Delete rows - lazy operation
mutation = customers.delete(where=col("active") == 0)
# Execute the delete
rowcount = mutation.collect()

# Merge/upsert operations - lazy operation
mutation = customers.merge(
    [{"email": "user@example.com", "name": "Updated Name"}],
    on=["email"],
    when_matched={"name": "Updated Name"}
)
# Execute the merge
rowcount = mutation.collect()
```

**Note:** DataFrame write operations (like `df.write.save_as_table()`) remain **eager** and execute immediately, matching PySpark's behavior.

### Transaction Support

All operations within a transaction context share the same database connection and transaction. If any operation fails, all changes are automatically rolled back:

```python
# Execute multiple operations in a single transaction
with db.transaction() as txn:
    # All operations share the same transaction
    customers.insert([{"id": 1, "name": "Alice"}]).collect()
    orders.insert([{"id": 1, "customer_id": 1, "amount": 100.0}]).collect()
    # If any operation fails, all changes are rolled back
    # If all succeed, transaction commits automatically
```

You can also explicitly control the transaction:

```python
with db.transaction() as txn:
    customers.insert([{"id": 1, "name": "Alice"}]).collect()
    txn.commit()  # Explicit commit
    # Or txn.rollback() to rollback
```

**Note:** By default, each `.collect()` call executes in its own auto-commit transaction. Use `db.transaction()` to group multiple operations into a single atomic transaction.

### Batch Operations

The batch API allows you to queue multiple lazy operations and execute them together in a single transaction:

```python
# Queue multiple operations and execute them atomically
with db.batch():
    # All operations are queued and executed together on exit
    db.create_table("users", [
        ColumnDef(name="id", type_name="INTEGER"),
        ColumnDef(name="name", type_name="TEXT"),
    ])
    users = db.table("users")
    users.insert([{"id": 1, "name": "Alice"}])
    users.insert([{"id": 2, "name": "Bob"}])
    # All operations execute together in a single transaction
    # If any operation fails, all changes are rolled back
```

The batch context manager automatically:
- Queues all lazy operations created within the context
- Executes them together in a single transaction when exiting the context
- Rolls back all changes if any operation fails
- Supports both synchronous and asynchronous operations

## 📊 Result Formats

By default, `collect()` returns a list of dictionaries (`fetch_format="records"`), so Moltres works even when pandas/polars are unavailable. You can configure the result format when connecting:

```python
# Default: list of dicts
db = connect("sqlite:///example.db")
results = df.collect()  # List[Dict[str, Any]]

# Pandas DataFrame (requires pandas)
db = connect("sqlite:///example.db", fetch_format="pandas")
results = df.collect()  # pandas.DataFrame

# Polars DataFrame (requires polars)
db = connect("sqlite:///example.db", fetch_format="polars")
results = df.collect()  # polars.DataFrame
```

## ⚙️ Configuration

### Programmatic Configuration

```python
db = connect(
    "postgresql://user:pass@host/dbname",
    echo=True,  # Enable SQL logging
    fetch_format="pandas",
    pool_size=10,
    max_overflow=5,
    pool_timeout=30,
    pool_recycle=3600,
    pool_pre_ping=True,  # Connection health checks
)
```

### Environment Variables

Moltres supports configuration via environment variables for easier deployment (12-factor app friendly):

```bash
export MOLTRES_DSN="postgresql://user:pass@host/dbname"
export MOLTRES_POOL_SIZE=10
export MOLTRES_POOL_PRE_PING=true
export MOLTRES_FETCH_FORMAT="pandas"
```

Then in your code:
```python
from moltres import connect

# Uses MOLTRES_DSN from environment
db = connect()
```

**Supported environment variables:**
- `MOLTRES_DSN`: Database connection string
- `MOLTRES_ECHO`: Enable SQL logging (true/false)
- `MOLTRES_FETCH_FORMAT`: "records", "pandas", or "polars"
- `MOLTRES_DIALECT`: Override SQL dialect
- `MOLTRES_POOL_SIZE`: Connection pool size
- `MOLTRES_MAX_OVERFLOW`: Maximum pool overflow
- `MOLTRES_POOL_TIMEOUT`: Pool timeout in seconds
- `MOLTRES_POOL_RECYCLE`: Connection recycle time
- `MOLTRES_POOL_PRE_PING`: Enable connection health checks (true/false)

**Configuration Precedence:** Programmatic arguments > Environment variables > Defaults

## 📈 Performance Monitoring

Moltres provides optional performance monitoring hooks for tracking query execution:

```python
from moltres.engine import register_performance_hook

def log_slow_queries(sql: str, elapsed: float, metadata: dict):
    if elapsed > 1.0:
        print(f"Slow query ({elapsed:.2f}s): {sql[:100]}")
        print(f"  Rows affected: {metadata.get('rowcount', 'N/A')}")

register_performance_hook("query_end", log_slow_queries)

# Now all queries will be monitored
db = connect("sqlite:///example.db")
df.collect()  # Slow queries will be logged

# Unregister when done
from moltres.engine import unregister_performance_hook
unregister_performance_hook("query_end", log_slow_queries)
```

**Available Events:**
- `query_start`: Fired when a query begins execution
- `query_end`: Fired when a query completes (includes elapsed time and metadata)

## 🔒 Security

Moltres includes built-in security features to prevent SQL injection:

- **SQL Identifier Validation** - All table and column names are validated
- **Parameterized Queries** - All user data is passed as parameters, never string concatenation
- **Input Sanitization** - Comprehensive validation of identifiers and inputs

See [`docs/SECURITY.md`](https://github.com/eddiethedean/moltres/blob/main/docs/SECURITY.md) for security best practices and guidelines.

## 📚 Advanced Examples

### Complex Joins and Aggregations

```python
from moltres import col
from moltres.expressions.functions import sum, avg, count

# Multi-table join with aggregations
df = (
    db.table("orders")
    .select()
    .join(db.table("customers").select(), on=[("customer_id", "id")])
    .join(db.table("products").select(), on=[("product_id", "id")])
    .where(col("date") >= "2024-01-01")
    .group_by("country", "category")
    .agg(
        sum(col("amount")).alias("total_revenue"),
        avg(col("amount")).alias("avg_order_value"),
        count("*").alias("order_count"),
    )
    .order_by(col("total_revenue").desc())
    .limit(10)
)

results = df.collect()
```

### Window Functions

```python
# Complex expressions with window functions
df = (
    db.table("sales")
    .select(
        col("product"),
        col("amount"),
        col("date"),
        (col("amount") - avg(col("amount")).over()).alias("deviation_from_avg"),
    )
    .where(col("date") >= "2024-01-01")
)
```

### Complete ETL Pipeline

```python
# Complete ETL pipeline
db = connect("postgresql://user:pass@localhost/warehouse")

# Extract: Load from CSV (returns Records)
raw_records = db.load.csv("raw_sales.csv")

# Load raw data into staging table (lazy operation)
db.create_table("staging_sales", [
    column("order_id", "INTEGER"),
    column("product", "TEXT"),
    column("amount", "REAL"),
    column("date", "DATE"),
]).collect()  # Execute the create_table operation
raw_records.insert_into("staging_sales")  # Executes immediately

# Transform: Clean and aggregate using SQL operations
cleaned = (
    db.table("staging_sales")
    .select(
        col("order_id"),
        col("product").upper().alias("product"),
        col("amount").cast("REAL"),
        col("date"),
    )
    .where(col("amount") > 0)
    .group_by("product", "date")
    .agg(sum(col("amount")).alias("daily_revenue"))
)

# Load: Write to database
cleaned.write.mode("overwrite").save_as_table("daily_sales_summary")
```

## 🛠️ Supported Operations

### DataFrame Operations
- `select()` - Project columns
- `where()` / `filter()` - Filter rows
- `join()` - Join with other DataFrames
- `group_by()` / `groupBy()` - Group rows
- `agg()` - Aggregate functions
- `order_by()` - Sort rows
- `limit()` - Limit number of rows

### Column Expressions
- **Arithmetic**: `+`, `-`, `*`, `/`, `%`
- **Comparisons**: `==`, `!=`, `<`, `>`, `<=`, `>=`
- **Boolean**: `&`, `|`, `~`
- **Functions**: `sum()`, `avg()`, `count()`, `concat()`, `coalesce()`, `upper()`, `lower()`, etc.

### Supported SQL Dialects
- ✅ SQLite
- ✅ PostgreSQL
- ✅ MySQL (basic support)
- ✅ Other SQLAlchemy-supported databases (with ANSI SQL fallback)

## 🧪 Development

### Setup

```bash
# Clone the repository
git clone https://github.com/eddiethedean/moltres.git
cd moltres

# Install in development mode
pip install -e ".[dev]"

# Install pre-commit hooks
pre-commit install
```

### Running Tests

```bash
# Run all tests
pytest

# Run tests in parallel
pytest -n 9

# Run with coverage
pytest --cov=src/moltres --cov-report=html
```

### Code Quality

```bash
# Linting
ruff check .

# Formatting
ruff format .

# Type checking (strict mode enabled)
mypy src
```

## 📖 Documentation

Additional documentation is available in the `docs/` directory:

- **[Why Moltres?](https://github.com/eddiethedean/moltres/blob/main/docs/WHY_MOLTRES.md)** - Understanding the gap Moltres fills and who needs it
- **[Examples](https://github.com/eddiethedean/moltres/blob/main/docs/EXAMPLES.md)** - Common patterns, use cases, and examples for each audience
- **[Security Guide](https://github.com/eddiethedean/moltres/blob/main/docs/SECURITY.md)** - Security best practices and SQL injection prevention
- **[Troubleshooting](https://github.com/eddiethedean/moltres/blob/main/docs/TROUBLESHOOTING.md)** - Common issues and solutions
- **[Design Notes](https://github.com/eddiethedean/moltres/blob/main/docs/moltres_plan.md)** - High-level architecture and design decisions
- **[Advocacy Document](https://github.com/eddiethedean/moltres/blob/main/docs/moltres_advocacy.md)** - Detailed positioning and comparison with alternatives

## 🤝 Contributing

Contributions are welcome! Please see [`CONTRIBUTING.md`](https://github.com/eddiethedean/moltres/blob/main/CONTRIBUTING.md) for guidelines.

**Quick Start:**
1. Fork the repository
2. Create a feature branch (`git checkout -b feature/amazing-feature`)
3. Commit your changes (`git commit -m 'Add some amazing feature'`)
4. Push to the branch (`git push origin feature/amazing-feature`)
5. Open a Pull Request

**Before submitting:**
- Run tests: `pytest`
- Check code quality: `ruff check . && mypy src`
- Update documentation if needed

## 👤 Author

**Odos Matthews**

- GitHub: [@eddiethedean](https://github.com/eddiethedean)
- Email: odosmatthews@gmail.com

## 🙏 Acknowledgments

- Inspired by PySpark's DataFrame API style, but focused on SQL feature support rather than PySpark feature parity
- Built on SQLAlchemy for database connectivity and SQL compilation
- Thanks to all contributors and users

## 📄 License

MIT License - see [LICENSE](https://github.com/eddiethedean/moltres/blob/main/LICENSE) file for details.

---

<div align="center">

**Made with ❤️ for the Python data community**

[⬆ Back to Top](#moltres)

</div>
