==== examples/datasets/getting_started/01_duckdb_basics.py
"""
DuckDB Basics - Getting Started with Datasets

This beginner-friendly example introduces the fundamental concepts of using
DuckDBParquetHandler for efficient parquet dataset operations.

The example covers:
1. Basic setup and configuration
2. Creating and reading parquet files
3. Simple queries and filtering
4. Writing data back to parquet
5. Context manager usage for resource management

This is your starting point for learning how to use fsspeckit's
dataset capabilities with DuckDB for high-performance data operations.
"""

from __future__ import annotations

import tempfile
import time
from datetime import datetime
from pathlib import Path

import pyarrow as pa

from fsspeckit.datasets import DuckDBParquetHandler


def create_sample_sales_data() -> pa.Table:
    """Return a fixed eight-order sales table used throughout this example.

    The table has the columns ``order_id``, ``customer``, ``product``,
    ``quantity``, ``price`` and ``date`` (``YYYY-MM-DD`` strings), followed by
    a derived float64 ``total`` column equal to ``quantity * price``.
    """

    print("📊 Creating sample sales data...")

    # Each tuple is one order, in the column order given by ``field_names``.
    field_names = ("order_id", "customer", "product", "quantity", "price", "date")
    rows = (
        (1001, "Alice", "Laptop", 1, 999.99, "2024-01-15"),
        (1002, "Bob", "Mouse", 2, 25.50, "2024-01-16"),
        (1003, "Charlie", "Keyboard", 1, 79.99, "2024-01-17"),
        (1004, "Diana", "Monitor", 1, 299.99, "2024-01-18"),
        (1005, "Eve", "Headphones", 1, 149.99, "2024-01-19"),
        (1006, "Frank", "Webcam", 1, 89.99, "2024-01-20"),
        (1007, "Grace", "USB Hub", 3, 15.99, "2024-01-21"),
        (1008, "Henry", "External SSD", 1, 129.99, "2024-01-22"),
    )
    records = [dict(zip(field_names, row)) for row in rows]

    # PyArrow infers the base schema from the row dicts.
    table = pa.Table.from_pylist(records)

    # Append the derived order value as the last column.
    totals = pa.array(
        [rec["quantity"] * rec["price"] for rec in records], type=pa.float64()
    )
    table = table.add_column(table.num_columns, "total", totals)

    print(f"Created {len(table)} sales records")
    return table


def demonstrate_basic_duckdb_usage():
    """Demonstrate a write/read round trip with DuckDBParquetHandler.

    Writes the sample sales table to a Parquet file inside a temporary
    directory, reads it back with the same handler, prints the rows and
    checks that the row count survived the round trip.

    Raises:
        Exception: Any error raised while writing or reading is printed and
            re-raised.
    """

    print("\n🚀 Basic DuckDBParquetHandler Usage")

    # TemporaryDirectory removes the directory on every exit path. A bare
    # mkdtemp() followed by try/finally leaks the directory if building the
    # sample data fails before the try block is entered.
    with tempfile.TemporaryDirectory() as tmp:
        sales_data = create_sample_sales_data()
        data_file = Path(tmp) / "sales.parquet"

        try:
            # Write data using DuckDBParquetHandler
            print(f"💾 Writing data to {data_file}")

            with DuckDBParquetHandler() as handler:
                handler.write_parquet(sales_data, str(data_file))

                print(f"✅ Successfully wrote {len(sales_data)} records")

                # Read the file back through the same handler.
                print(f"📖 Reading data from {data_file}")
                read_data = handler.read_parquet(str(data_file))

                print(f"✅ Successfully read {len(read_data)} records")

                # Display the data
                print("\n📋 Sales Data:")
                print(read_data.to_pandas())

                # Minimal integrity check: the row count must survive the round trip.
                original_count = len(sales_data)
                read_count = len(read_data)

                if original_count == read_count:
                    print(f"✅ Data integrity verified: {original_count} records")
                else:
                    print(
                        f"❌ Data integrity issue: {original_count} -> {read_count} records"
                    )

        except Exception as e:
            print(f"❌ Error: {e}")
            raise


def demonstrate_sql_queries():
    """Demonstrate basic SQL queries with DuckDBParquetHandler."""

    print("\n🔍 SQL Queries with DuckDBParquetHandler")

    temp_dir = Path(tempfile.mkdtemp())
    sales_data = create_sample_sales_data()
    data_file = temp_dir / "sales.parquet"

    try:
        # First, write the data
        with DuckDBParquetHandler() as handler:
            handler.write_parquet(sales_data, str(data_file))

            print("📊 Running SQL queries on sales data:")

            # Query 1: Basic SELECT using direct file access
            print("\n1. All sales records:")
            result1 = handler.execute_sql(
                f"SELECT * FROM read_parquet('{data_file}') ORDER BY order_id"
            )
            print(result1.fetchdf().to_string())

            # Query 2: Filter with WHERE clause
            print("\n2. High-value orders (total > $200):")
            result2 = handler.execute_sql(f"""
                SELECT order_id, customer, product, total
                FROM read_parquet('{data_file}')
                WHERE total > 200
                ORDER BY total DESC
            """)
            print(result2.fetchdf().to_string())

            # Query 3: Aggregate functions
            print("\n3. Sales summary by customer:")
            result3 = handler.execute_sql(f"""
                SELECT
                    customer,
                    COUNT(*) as order_count,
                    SUM(quantity) as total_quantity,
                    SUM(total) as total_sales,
                    AVG(total) as avg_order_value
                FROM read_parquet('{data_file}')
                GROUP BY customer
                ORDER BY total_sales DESC
            """)
            print(result3.fetchdf().to_string())

            # Query 4: Calculated fields
            print("\n4. Orders with tax calculation (10% tax):")
            result4 = handler.execute_sql(f"""
                SELECT
                    order_id,
                    product,
                    price,
==== examples/datasets/getting_started/02_pyarrow_basics.py
"""
PyArrow Basics - Dataset Optimization

This beginner-friendly example introduces PyArrow dataset optimization techniques
using fsspeckit's PyArrow utilities for high-performance data operations.

The example covers:
1. Basic PyArrow table creation and manipulation
2. Dataset optimization with PyArrow
3. Data compaction strategies
4. Performance comparison between optimized and unoptimized data
5. Memory-efficient data processing patterns

This example complements the DuckDB basics by showing an alternative
approach that doesn't require a database engine.
"""

from __future__ import annotations

import tempfile
import time
from pathlib import Path

import pyarrow as pa
import pyarrow.compute as pc
import pyarrow.dataset as ds
import pyarrow.parquet as pq

from fsspeckit.datasets import (
    optimize_parquet_dataset_pyarrow,
    compact_parquet_dataset_pyarrow,
)


def create_sample_inventory_data(
    num_records: int = 200, seed: int | None = None
) -> pa.Table:
    """Create sample inventory data for optimization demonstration.

    Args:
        num_records: Number of inventory rows to generate (default 200).
            Zero or a negative value yields an empty table.
        seed: Optional seed for a private ``random.Random`` instance, so runs
            are reproducible without touching global random state. When
            ``None``, the module-level ``random`` functions are used, as
            before. In that case a caller-side ``random.seed()`` still
            applies.

    Returns:
        Table with one row per product id (``PROD-0000``, ``PROD-0001``, ...)
        and randomly chosen name, category, supplier, quantity, price,
        reorder level, stock date, status, weight, location and creation
        timestamp.
    """

    print("📦 Creating sample inventory data...")

    # Generate realistic inventory data with patterns that benefit from optimization
    import random
    from datetime import datetime, timedelta

    rng = random if seed is None else random.Random(seed)

    products = [
        "Laptop",
        "Mouse",
        "Keyboard",
        "Monitor",
        "Headphones",
        "Webcam",
        "USB Hub",
        "External SSD",
        "Docking Station",
        "Cable Kit",
        "Router",
        "Switch",
        "Access Point",
        "Network Cable",
        "Power Strip",
    ]

    categories = ["Electronics", "Accessories", "Peripherals", "Networking", "Power"]
    suppliers = ["TechCorp", "InnoTech", "DataSystems", "NetPro", "PowerHouse"]
    statuses = ["in_stock", "out_of_stock", "discontinued", "backorder"]

    records = []
    base_date = datetime(2024, 1, 1)

    for i in range(num_records):
        # Keep the order of rng calls stable so a given seed always yields
        # the same table.
        record = {
            "product_id": f"PROD-{i:04d}",
            "product_name": rng.choice(products),
            "category": rng.choice(categories),
            "supplier": rng.choice(suppliers),
            "quantity": rng.randint(0, 1000),
            "unit_price": round(rng.uniform(10.0, 500.0), 2),
            "reorder_level": rng.randint(10, 100),
            "last_stock_date": (
                base_date + timedelta(days=rng.randint(0, 180))
            ).strftime("%Y-%m-%d"),
            "status": rng.choice(statuses),
            "weight": round(rng.uniform(0.1, 10.0), 2),
            "location": f"WH-{rng.choice(['A', 'B', 'C'])}-{rng.randint(1, 20):02d}",
            # Up to 4320 hours (180 days) after base_date.
            "created_timestamp": (
                base_date + timedelta(hours=rng.randint(0, 4320))
            ).isoformat(),
        }
        records.append(record)

    table = pa.Table.from_pylist(records)
    print(f"Created inventory dataset with {len(table)} records")
    return table


def demonstrate_basic_pyarrow_operations():
    """Walk through core PyArrow table operations on the inventory sample.

    Prints a dataset overview, the schema, a few filter/projection examples
    and simple quantity aggregates. Returns the generated table so later
    demos can reuse it.
    """

    print("\n🔹 Basic PyArrow Table Operations")

    table = create_sample_inventory_data()
    schema = table.schema

    print("\n📊 Dataset Overview:")
    print(f"  Records: {len(table):,}")
    print(f"  Columns: {len(schema)}")
    print(f"  Memory: {table.nbytes / 1024 / 1024:.2f} MB")

    print("\n📋 Table Schema:")
    for position, field in enumerate(schema, start=1):
        print(f"  {position:2d}. {field.name:<20} {field.type}")

    print("\n🔍 Basic Filtering Examples:")

    # Row filter driven by a single numeric predicate.
    low_stock_mask = pc.less(table.column("quantity"), 50)
    low_stock = table.filter(low_stock_mask)
    print(f"  Low stock items (< 50): {len(low_stock)} records")

    # Row filter combining two equality predicates with a logical AND.
    is_electronics = pc.equal(table.column("category"), "Electronics")
    is_in_stock = pc.equal(table.column("status"), "in_stock")
    electronics_in_stock = table.filter(pc.and_(is_electronics, is_in_stock))
    print(f"  Electronics in stock: {len(electronics_in_stock)} records")

    # Column projection: keep only a few fields and report the smaller footprint.
    basic_info = table.select(["product_id", "product_name", "quantity", "unit_price"])
    print(
        f"  Basic info columns: {len(basic_info.schema)} fields, {basic_info.nbytes / 1024:.1f} KB"
    )

    print("\n📈 Basic Aggregations:")

    # Number of distinct category values.
    distinct_categories = pc.unique(table.column("category"))
    print(f"  Unique categories: {len(distinct_categories)}")

    # Summary statistics over the quantity column.
    quantities = table.column("quantity")
    stats = {
        "total_items": pc.sum(quantities).as_py(),
        "avg_quantity": pc.mean(quantities).as_py(),
        "max_quantity": pc.max(quantities).as_py(),
        "min_quantity": pc.min(quantities).as_py(),
    }

    print("  Quantity statistics:")
    for name, value in stats.items():
        if isinstance(value, float):
            print(f"    {name}: {value:.2f}")
        else:
            print(f"    {name}: {value}")

    return table


def create_unoptimized_dataset(
    data: pa.Table, output_path: Path, chunk_size: int = 50
) -> Path:
    """Write *data* as many small Parquet files to simulate a fragmented dataset.

    Args:
        data: Table to split into chunks.
        output_path: Directory that receives ``chunk_NNN.parquet`` files. It
            must already exist, because this function does not create it.
        chunk_size: Maximum number of rows per file (default 50). Smaller
            values produce more, smaller files.

    Returns:
        ``output_path``, unchanged.

    Raises:
        ValueError: If ``chunk_size`` is not positive. A zero step would make
            ``range`` raise, and a negative one would silently write nothing.
    """

    if chunk_size <= 0:
        raise ValueError(f"chunk_size must be positive, got {chunk_size}")

    print("\n📝 Creating unoptimized dataset...")

    # Simulate poor organization by creating many small files
    chunk_files = []
    for index, offset in enumerate(range(0, len(data), chunk_size)):
        chunk = data.slice(offset, min(chunk_size, len(data) - offset))
        chunk_file = output_path / f"chunk_{index:03d}.parquet"
        pq.write_table(chunk, chunk_file)
        chunk_files.append(chunk_file)

    print(f"  Created {len(chunk_files)} small files")
    total_size = sum(f.stat().st_size for f in chunk_files)
    print(f"  Total size: {total_size / 1024 / 1024:.2f} MB")

    return output_path


def demonstrate_dataset_optimization():
    """Demonstrate PyArrow dataset optimization."""

    print("\n⚡ Dataset Optimization with PyArrow")

    temp_dir = Path(tempfile.mkdtemp())
    dataset_path = temp_dir / "inventory_dataset"
    dataset_path.mkdir(parents=True)

    try:
        # Create unoptimized dataset
        inventory_data = create_sample_inventory_data()
        unoptimized_path = dataset_path / "unoptimized"
        unoptimized_path.mkdir()
        create_unoptimized_dataset(inventory_data, unoptimized_path)

        # Create well-organized dataset for comparison
        optimized_path = dataset_path / "well_organized"
        optimized_path.mkdir()
        pq.write_table(inventory_data, optimized_path / "inventory.parquet")

        print(f"\n📊 Dataset Comparison:")
        print(f"  Unoptimized: {len(list(unoptimized_path.glob('*.parquet')))} files")
        print(f"  Organized:   {len(list(optimized_path.glob('*.parquet')))} files")

        # Measure read performance
        print(f"\n⏱️  Performance Comparison:")

        # Read unoptimized dataset
        start_time = time.time()
        unoptimized_files = list(unoptimized_path.glob("*.parquet"))
        unoptimized_tables = [pq.read_table(f) for f in unoptimized_files]
        unoptimized_combined = pa.concat_tables(unoptimized_tables)
        unoptimized_time = time.time() - start_time
==== examples/datasets/getting_started/03_simple_merges.py
"""
Simple Dataset Merges - Getting Started

This beginner-friendly example introduces fundamental dataset merging concepts
using fsspeckit's dataset utilities.

The example covers:
1. Basic dataset merging concepts
2. Simple append operations
3. Schema handling during merges
4. Duplicate detection and handling
5. Performance considerations for merging

This example helps you understand how to combine multiple datasets
efficiently and safely.
"""

from __future__ import annotations

import tempfile
import time
from datetime import datetime, timedelta
from pathlib import Path

import pyarrow as pa
import pyarrow.compute as pc
import pyarrow.dataset as ds
import pyarrow.parquet as pq

from fsspeckit.datasets import DuckDBParquetHandler


def create_sample_sales_data(
    batch_id: str, num_records: int = 50, seed: int | None = None
) -> pa.Table:
    """Create random sales orders tagged with *batch_id*.

    Args:
        batch_id: Value stored in the ``batch_id`` column and used as the
            ``order_id`` prefix (``"<batch_id>-0001"``, ...).
        num_records: Number of orders to generate.
        seed: Optional seed for a private ``random.Random`` instance, making
            the output reproducible without touching global random state.
            When ``None``, the module-level ``random`` functions are used, as
            before.

    Returns:
        Table with columns order_id, batch_id, customer, product, quantity,
        unit_price, region, order_date and total. ``total`` is
        quantity * unit_price rounded to 2 decimal places.
    """

    import random

    rng = random if seed is None else random.Random(seed)

    products = ["Laptop", "Mouse", "Keyboard", "Monitor", "Headphones"]
    customers = ["Alice", "Bob", "Charlie", "Diana", "Eve", "Frank", "Grace", "Henry"]
    regions = ["North", "South", "East", "West", "Central"]

    records = []
    base_date = datetime(2024, 1, 1)

    for i in range(num_records):
        record = {
            "order_id": f"{batch_id}-{i + 1:04d}",
            "batch_id": batch_id,
            "customer": rng.choice(customers),
            "product": rng.choice(products),
            "quantity": rng.randint(1, 10),
            "unit_price": round(rng.uniform(10.0, 500.0), 2),
            "region": rng.choice(regions),
            "order_date": (base_date + timedelta(days=rng.randint(0, 90))).strftime(
                "%Y-%m-%d"
            ),
        }
        # Round like unit_price. This avoids binary-float artifacts such as
        # 59.970000000000006 in the stored monetary total.
        record["total"] = round(record["quantity"] * record["unit_price"], 2)
        records.append(record)

    return pa.Table.from_pylist(records)


def demonstrate_basic_append():
    """Demonstrate appending new batches to an existing Parquet file.

    Writes batch B001 to ``sales.parquet`` and reads it back. Batches B002
    and B003 are concatenated in memory and the result is written to
    ``sales_combined.parquet``. Prints the append time, a row-count
    integrity check and the per-batch row distribution. All files live in a
    temporary directory that is removed on exit.

    Raises:
        Exception: Any failure is printed and re-raised.
    """

    print("\n📋 Basic Dataset Append")

    with tempfile.TemporaryDirectory() as tmp:
        temp_dir = Path(tmp)

        try:
            # Create initial dataset
            print("Creating initial dataset (Batch 1)...")
            batch1_data = create_sample_sales_data("B001", 30)

            # Save initial batch
            initial_file = temp_dir / "sales.parquet"
            pq.write_table(batch1_data, initial_file)

            print(f"✅ Created initial dataset: {len(batch1_data)} records")

            # Create additional batches
            print("\nAdding more batches...")

            batch2_data = create_sample_sales_data("B002", 25)
            batch3_data = create_sample_sales_data("B003", 35)

            # Simple append using PyArrow concat
            print("Performing simple append operation...")
            # perf_counter is monotonic and high-resolution, unlike time.time(),
            # which can jump and is too coarse on some platforms for sub-ms work.
            start_time = time.perf_counter()

            existing_data = pq.read_table(initial_file)
            combined_data = pa.concat_tables([existing_data, batch2_data, batch3_data])

            append_time = time.perf_counter() - start_time

            # Save combined data
            combined_file = temp_dir / "sales_combined.parquet"
            pq.write_table(combined_data, combined_file)

            print(f"✅ Append completed in {append_time:.4f} seconds")
            print(f"   Combined dataset: {len(combined_data)} records")

            # Verify data integrity
            expected_total = len(batch1_data) + len(batch2_data) + len(batch3_data)
            if len(combined_data) == expected_total:
                print(f"   ✅ Data integrity verified: {expected_total} records")
            else:
                print(
                    f"   ❌ Data mismatch: expected {expected_total}, got {len(combined_data)}"
                )

            # Summing a boolean mask counts the rows belonging to each batch.
            print("\n📊 Batch Distribution:")
            for batch_id in ["B001", "B002", "B003"]:
                batch_filter = pc.equal(combined_data.column("batch_id"), batch_id)
                batch_count = pc.sum(batch_filter).as_py()
                print(f"   {batch_id}: {batch_count} records")

        except Exception as e:
            print(f"❌ Basic append failed: {e}")
            raise


def demonstrate_schema_consistency():
    """Demonstrate merging tables that share an identical schema.

    Generates three batches (S001-S003), confirms their schemas are equal,
    concatenates them and prints simple totals plus the per-batch row
    distribution. Everything happens in memory. The previous version created
    a temporary directory that was never used, so it is no longer created.

    Raises:
        Exception: Any failure is printed and re-raised. ``pa.concat_tables``
            itself raises if the schemas turn out to differ.
    """

    print("\n🔧 Schema Consistency in Merges")

    try:
        print("Creating datasets with consistent schemas...")

        sales1 = create_sample_sales_data("S001", 20)
        sales2 = create_sample_sales_data("S002", 20)
        sales3 = create_sample_sales_data("S003", 20)

        print(f"✅ Created 3 datasets with consistent schemas")
        print(f"   Schema: {[field.name for field in sales1.schema]}")

        # Every table must match the first one for a plain concat to succeed.
        schemas_identical = all(
            table.schema.equals(sales1.schema) for table in (sales2, sales3)
        )

        if schemas_identical:
            print("✅ All schemas are identical - safe to merge")
        else:
            print("❌ Schema differences detected - need alignment")

        print("\nPerforming merge with consistent schemas...")
        # perf_counter: monotonic, high-resolution clock for interval timing.
        start_time = time.perf_counter()

        merged_data = pa.concat_tables([sales1, sales2, sales3])

        merge_time = time.perf_counter() - start_time

        print(f"✅ Merge completed in {merge_time:.4f} seconds")
        print(f"   Merged dataset: {len(merged_data)} records")

        print(f"\n📈 Merged Data Analysis:")
        total_sales = pc.sum(merged_data.column("total")).as_py()
        avg_order_value = pc.mean(merged_data.column("total")).as_py()

        print(f"   Total sales value: ${total_sales:,.2f}")
        print(f"   Average order value: ${avg_order_value:,.2f}")

        # Summing a boolean mask counts the rows belonging to each batch.
        print(f"\n📋 Distribution by Batch:")
        for batch_id in ["S001", "S002", "S003"]:
            batch_count = pc.sum(
                pc.equal(merged_data.column("batch_id"), batch_id)
            ).as_py()
            print(f"   {batch_id}: {batch_count} records")

    except Exception as e:
        print(f"❌ Schema consistency demo failed: {e}")
        raise


def demonstrate_schema_alignment():
    """Demonstrate aligning different schemas for merging."""

    print("\n🔄 Schema Alignment for Merging")

    temp_dir = Path(tempfile.mkdtemp())

    try:
        # Create datasets with different schemas
        print("Creating datasets with different schemas...")

        # Dataset 1: Basic sales data
        sales_basic = create_sample_sales_data("X001", 20)
        # Remove some columns to create schema differences
        sales_basic = sales_basic.drop(["batch_id", "total"])

        # Dataset 2: Extended sales data with additional columns
        import random
==== examples/datasets/getting_started/04_pyarrow_merges.py
"""
PyArrow Merge-Aware Writes - Getting Started

This example introduces merge-aware writes for PyArrow datasets, which combine writing data and merging it into an existing dataset in a single step.

The example covers:
1. Basic merge-aware write concepts
2. Strategy selection (insert, upsert, update, etc.)
3. Key column configuration
4. Convenience helper functions
5. Performance benefits over traditional approaches
"""

import argparse
import tempfile
from pathlib import Path
from typing import Dict, Any

import pyarrow as pa
import pyarrow.dataset as pds
from fsspec.implementations.local import LocalFileSystem


def create_simple_customer_data() -> Dict[str, pa.Table]:
    """Build the small in-memory tables used by the merge-strategy demos.

    Despite the name, the result includes product and log tables as well as
    customer data.

    Returns:
        Mapping of demo name to table:
        - ``existing_customers``: baseline customers with ids 1-3.
        - ``customer_updates``: ids 2, 4 and 5. Id 2 overlaps the baseline;
          4 and 5 are new.
        - ``price_updates``: new prices for product ids 101-103.
        - ``new_products``: product ids 201 and 202.
        - ``duplicate_log_entries``: five log rows in which ``LOG001`` and
          ``LOG002`` each appear twice.
    """
    electronics = "Electronics"

    return {
        # Baseline customer dataset.
        "existing_customers": pa.Table.from_pydict(
            {
                "customer_id": [1, 2, 3],
                "name": ["Alice Johnson", "Bob Smith", "Carol Davis"],
                "email": ["alice@example.com", "bob@example.com", "carol@example.com"],
                "segment": ["premium", "standard", "premium"],
                "last_purchase": ["2024-01-01", "2024-01-05", "2024-01-10"],
            }
        ),
        # Id 2 (Bob) already exists; ids 4 and 5 (Diana, Eve) are new.
        "customer_updates": pa.Table.from_pydict(
            {
                "customer_id": [2, 4, 5],
                "name": ["Robert Smith", "Diana Prince", "Eve Wilson"],
                "email": ["robert@example.com", "diana@example.com", "eve@example.com"],
                "segment": ["premium", "premium", "standard"],
                "last_purchase": ["2024-01-15", "2024-01-12", "2024-01-08"],
            }
        ),
        # Updated prices for products that already exist.
        "price_updates": pa.Table.from_pydict(
            {
                "product_id": [101, 102, 103],
                "name": ["Laptop Pro", "Wireless Mouse", "Mechanical Keyboard"],
                "price": [1299.99, 79.99, 149.99],
                "category": [electronics] * 3,
            }
        ),
        # Brand-new products to add.
        "new_products": pa.Table.from_pydict(
            {
                "product_id": [201, 202],
                "name": ["USB-C Hub", "Webcam HD"],
                "price": [49.99, 89.99],
                "category": [electronics] * 2,
            }
        ),
        # Repeated log_id values for the deduplication demo.
        "duplicate_log_entries": pa.Table.from_pydict(
            {
                "log_id": ["LOG001", "LOG001", "LOG002", "LOG002", "LOG003"],
                "event_type": ["login", "login", "purchase", "purchase", "logout"],
                "user_id": [1, 1, 2, 2, 3],
                "timestamp": [
                    "2024-01-15T09:00:00Z",
                    "2024-01-15T09:01:00Z",
                    "2024-01-15T10:00:00Z",
                    "2024-01-15T10:01:00Z",
                    "2024-01-15T11:00:00Z",
                ],
                "details": [
                    "User 1 login",
                    "User 1 login",
                    "User 2 purchase",
                    "User 2 purchase",
                    "User 3 logout",
                ],
            }
        ),
    }


def explain_merge_concepts(interactive: bool = False):
    """Print a short primer on merge-aware writes.

    Args:
        interactive: When True, wait for the user to press Enter after the
            primer has been printed.
    """
    # Each section is a heading followed by its indented bullet lines.
    sections = (
        (
            "📝 What are Merge-Aware Writes?",
            (
                "Instead of: 1) Write data → 2) Run separate merge operation",
                "You can:     1) Write data WITH merge strategy in one step",
            ),
        ),
        (
            "🎯 Why Use Merge-Aware Writes?",
            (
                "✅ Fewer steps - No separate staging and merge needed",
                "✅ Less error-prone - Single operation instead of multiple",
                "✅ Better performance - Optimized merge operations",
                "✅ Simpler code - One function call instead of many",
            ),
        ),
        (
            "🔑 Key Concepts:",
            (
                "• STRATEGY: How to handle new vs existing data",
                "• KEY_COLUMNS: Which columns identify unique records",
                "• CONVENIENCE HELPERS: Shortcut functions for common strategies",
            ),
        ),
    )

    print("🎓 Understanding Merge-Aware Writes")
    print("=" * 50)

    for heading, bullets in sections:
        print(f"\n{heading}")
        for bullet in bullets:
            print(f"   {bullet}")

    if interactive:
        input("\nPress Enter to continue...")


def demonstrate_upsert_basics():
    """Demonstrate UPSERT (insert-or-update) semantics on a Parquet dataset.

    Writes the baseline customers, then applies ``customer_updates`` keyed on
    ``customer_id``. Rows whose key already exists are replaced and rows with
    new keys are appended.

    This example does not call a merge-aware write API. The merge is instead
    emulated with plain PyArrow: drop existing rows whose key appears in the
    updates, concatenate the updates, and rewrite the single data file. The
    previous version wrote the updates to a second file, so the verified
    dataset contained both the old and new row for customer 2.
    """
    import pyarrow.compute as pc
    import pyarrow.parquet as pq

    print("\n🔄 UPSERT Strategy - Insert or Update")
    print("=" * 50)

    print("\n📋 Use Case: Customer data synchronization")
    print("   • New customers get added")
    print("   • Existing customers get updated")
    print("   • Most common CDC (Change Data Capture) pattern")

    fs = LocalFileSystem()
    data = create_simple_customer_data()
    key_column = "customer_id"

    with tempfile.TemporaryDirectory() as temp_dir:
        customer_path = Path(temp_dir) / "customers"
        customer_file = customer_path / "customers.parquet"

        # Step 1: Create initial customer dataset
        print("\n📥 Step 1: Creating initial customer dataset...")
        customer_path.mkdir(parents=True, exist_ok=True)
        pq.write_table(data["existing_customers"], str(customer_file))

        initial_dataset = pds.dataset(str(customer_path), filesystem=fs)
        print(f"   Created dataset with {initial_dataset.count_rows()} customers")

        # Step 2: Apply UPSERT with updates
        print("\n📝 Step 2: Applying UPSERT with customer updates...")
        print("   Customer 2 (Bob) exists → will be updated")
        print("   Customers 4,5 are new → will be inserted")
        print("   (Emulated with PyArrow: replace matching keys, append new ones)")

        existing = initial_dataset.to_table()
        # Align column order and types with the stored data so concat succeeds.
        updates = (
            data["customer_updates"]
            .select(existing.column_names)
            .cast(existing.schema)
        )

        # Keep only existing rows whose key is NOT being updated; the new
        # version of those rows comes from ``updates``.
        is_updated = pc.is_in(
            existing[key_column], value_set=updates[key_column].combine_chunks()
        )
        untouched = existing.filter(pc.invert(is_updated))
        merged = pa.concat_tables([untouched, updates])

        # Overwrite the single data file rather than adding a second one,
        # which would leave both old and new rows for every updated key.
        pq.write_table(merged, str(customer_file))

        # Step 3: Verify results
        print("\n🔍 Step 3: Verifying UPSERT results...")
        final_dataset = pds.dataset(str(customer_path), filesystem=fs)
        final_customers = final_dataset.to_table().sort_by(key_column)

        print(f"   Final dataset has {final_dataset.count_rows()} customers")
        print("\n   Customer changes:")
        for customer in final_customers.to_pylist():
            customer_id = customer["customer_id"]
            name = customer["name"]
            email = customer["email"]
            print(f"      📇 Customer {customer_id}: {name} ({email})")


def demonstrate_convenience_helpers():
    """Demonstrate convenience helper functions."""
    print("\n🛠️  Convenience Helper Functions")
    print("=" * 50)

    print("\n📋 Why Use Convenience Helpers?")
    print(
        "   • More readable code - `fs.upsert_dataset()` vs `fs.write_pyarrow_dataset(..., strategy='upsert')`"
    )
    print("   • Less error-prone - Can't forget strategy name")
    print("   • Better IDE support - Function signatures are specific")

    import pyarrow.parquet as pq

    fs = LocalFileSystem()
    data = create_simple_customer_data()

    with tempfile.TemporaryDirectory() as temp_dir:
        # Create separate datasets for each helper
        datasets = {
            "insert_demo": ("INSERT", data["new_products"], "product_id"),
            "update_demo": ("UPDATE", data["price_updates"], "product_id"),
            "dedup_demo": ("DEDUPLICATE", data["duplicate_log_entries"], "log_id"),
        }

        for demo_name, (strategy_name, demo_data, key_col) in datasets.items():
            demo_path = Path(temp_dir) / demo_name
            demo_path.mkdir(parents=True, exist_ok=True)

            print(f"\n📝 {strategy_name} Helper Demo:")
            print(f"   Using: fs.{strategy_name.lower()}_dataset()")
            print(f"   Key column: {key_col}")
            print(f"   Records: {len(demo_data)}")

            # Create initial dataset if needed for UPDATE demo
            if strategy_name == "UPDATE":
                initial_products = pa.Table.from_pydict(
                    {
                        "product_id": [101, 102, 103],
                        "name": ["Laptop Pro", "Wireless Mouse", "Mechanical Keyboard"],
                        "price": [999.99, 29.99, 89.99],  # Original prices
                        "category": ["Electronics"] * 3,
==== examples/datasets/getting_started/05_duckdb_upserts.py
"""
DuckDB Upsert Operations - Getting Started

This example demonstrates how to perform UPSERT (insert or update) operations
using DuckDB with PyArrow tables.

The example covers:
1. Basic UPSERT concepts with DuckDB
2. Creating tables from PyArrow data
3. INSERT ... ON CONFLICT DO UPDATE syntax
4. Batched upserts for large datasets
5. Comparison with PyArrow merge-aware writes
"""

from __future__ import annotations

import tempfile
from datetime import datetime
from pathlib import Path
from typing import Optional

import pyarrow as pa
import pyarrow.parquet as pq

from fsspeckit.datasets import DuckDBParquetHandler


def create_sample_data():
    """Build the customer and product tables used by the upsert demos.

    Returns:
        dict with four PyArrow tables:
        - ``initial_customers``: customer ids 1-5, stamped ``2024-01-01``.
        - ``customer_updates``: ids 2 and 3 (already present) plus new ids 6
          and 7, stamped ``2024-01-20``. The new customers have
          ``total_spend`` 0.
        - ``initial_products``: product ids 101-105, stamped ``2024-01-01``.
        - ``product_updates``: ids 101 and 102 (already present) plus new
          ids 106 and 107, stamped ``2024-01-20``.
    """
    baseline_stamp = "2024-01-01"
    update_stamp = "2024-01-20"
    electronics = "Electronics"

    initial_customers = pa.Table.from_pydict(
        {
            "customer_id": [1, 2, 3, 4, 5],
            "name": [
                "Alice Johnson",
                "Bob Smith",
                "Carol Davis",
                "David Lee",
                "Eve Wilson",
            ],
            "email": [
                "alice@example.com",
                "bob@example.com",
                "carol@example.com",
                "david@example.com",
                "eve@example.com",
            ],
            "segment": ["premium", "standard", "premium", "standard", "premium"],
            "total_spend": [1500.00, 250.00, 3200.00, 180.00, 950.00],
            "last_purchase": [
                "2024-01-15",
                "2024-01-10",
                "2024-01-18",
                "2024-01-05",
                "2024-01-12",
            ],
            "updated_at": [baseline_stamp] * 5,
        }
    )

    # Ids 2 and 3 (Bob, Carol) get updated; 6 and 7 (Diana, Frank) are inserts.
    customer_updates = pa.Table.from_pydict(
        {
            "customer_id": [2, 3, 6, 7],
            "name": ["Robert Smith", "Carol Williams", "Diana Prince", "Frank Miller"],
            "email": [
                "robert@example.com",
                "carol.w@example.com",
                "diana@example.com",
                "frank@example.com",
            ],
            "segment": ["premium", "premium", "standard", "standard"],
            "total_spend": [450.00, 3800.00, 0.00, 0.00],
            "last_purchase": ["2024-01-20", "2024-01-22", "2024-01-20", "2024-01-21"],
            "updated_at": [update_stamp] * 4,
        }
    )

    initial_products = pa.Table.from_pydict(
        {
            "product_id": [101, 102, 103, 104, 105],
            "name": [
                "Laptop Pro",
                "Wireless Mouse",
                "Mechanical Keyboard",
                "USB-C Hub",
                "Webcam HD",
            ],
            "category": [electronics] * 5,
            "price": [1299.99, 29.99, 149.99, 49.99, 89.99],
            "stock": [50, 200, 75, 150, 100],
            "updated_at": [baseline_stamp] * 5,
        }
    )

    # Ids 101 and 102 already exist; 106 and 107 are new products.
    product_updates = pa.Table.from_pydict(
        {
            "product_id": [101, 102, 106, 107],
            "name": ["Laptop Pro Max", "Ultra Mouse", "Monitor 4K", "Headphones Pro"],
            "category": [electronics] * 4,
            "price": [1599.99, 49.99, 499.99, 299.99],
            "stock": [30, 180, 25, 60],
            "updated_at": [update_stamp] * 4,
        }
    )

    return dict(
        initial_customers=initial_customers,
        customer_updates=customer_updates,
        initial_products=initial_products,
        product_updates=product_updates,
    )


def demonstrate_basic_upsert():
    """Demonstrate basic UPSERT functionality with DuckDB.

    Loads the sample ``initial_customers`` table into a DuckDB table keyed on
    ``customer_id``, then merges ``customer_updates`` into it with
    ``INSERT ... ON CONFLICT (customer_id) DO UPDATE``: rows whose
    ``customer_id`` already exists are overwritten with the incoming values,
    all other rows are inserted.

    Prints the table before and after the merge, followed by a summary of how
    many rows were updated vs. inserted. The summary is computed from the data
    (pre-merge key set and row-count delta) instead of hard-coded IDs.
    """
    print("\n🔄 Basic DuckDB UPSERT")
    print("=" * 50)

    data = create_sample_data()

    with DuckDBParquetHandler() as handler:
        # Step 1: Create initial table from PyArrow table using DuckDB's native registration
        print("\n📥 Step 1: Creating initial customer table...")
        # NOTE: uses the handler's private connection so PyArrow tables can be
        # registered as queryable views via DuckDB's ``register``.
        conn = handler._connection.connection
        conn.register("initial_customers", data["initial_customers"])
        # Create table with PRIMARY KEY constraint for ON CONFLICT to work
        conn.execute("""
            CREATE OR REPLACE TABLE customers (
                customer_id INTEGER PRIMARY KEY,
                name VARCHAR,
                email VARCHAR,
                segment VARCHAR,
                total_spend DECIMAL(10, 2),
                last_purchase VARCHAR,
                updated_at VARCHAR
            )
        """)
        # Positional insert: relies on initial_customers having the same
        # column order as the table definition above.
        conn.execute("INSERT INTO customers SELECT * FROM initial_customers")

        # Verify initial data and remember which keys exist before the merge,
        # so the summary below can tell updates from inserts.
        result = handler.execute_sql("SELECT * FROM customers ORDER BY customer_id").fetchdf()
        initial_count = len(result)
        existing_ids = set(result["customer_id"].tolist())
        print(f"   Initial customers: {initial_count}")
        print(result.to_string(index=False))

        # Step 2: Perform UPSERT
        print("\n📝 Step 2: Performing UPSERT with customer updates...")
        print("   Customer 2 (Bob) → Update (new name, segment, spend)")
        print("   Customer 3 (Carol) → Update (new name, email)")
        print("   Customers 6,7 (Diana, Frank) → Insert")

        # DuckDB UPSERT syntax: INSERT ... ON CONFLICT DO UPDATE.
        # The registered Arrow table is fed through INSERT ... SELECT; rows that
        # collide on the primary key take the incoming (EXCLUDED) values.
        conn.register("customer_updates", data["customer_updates"])
        conn.execute("""
            INSERT INTO customers
                (customer_id, name, email, segment, total_spend, last_purchase, updated_at)
            SELECT customer_id, name, email, segment, total_spend, last_purchase, updated_at
            FROM customer_updates
            ON CONFLICT (customer_id) DO UPDATE SET
                name = EXCLUDED.name,
                email = EXCLUDED.email,
                segment = EXCLUDED.segment,
                total_spend = EXCLUDED.total_spend,
                last_purchase = EXCLUDED.last_purchase,
                updated_at = EXCLUDED.updated_at
        """)

        # Step 3: Verify results
        print("\n🔍 Step 3: Verifying UPSERT results...")
        result = handler.execute_sql("SELECT * FROM customers ORDER BY customer_id").fetchdf()
        print(f"   Final customers: {len(result)}")
        print(result.to_string(index=False))

        # Show changes: incoming keys that already existed were updated;
        # growth in row count is the number of newly inserted rows.
        print("\n📊 Changes Summary:")
        update_ids = data["customer_updates"].column("customer_id").to_pylist()
        updated_count = sum(1 for cid in update_ids if cid in existing_ids)
        inserted_count = len(result) - initial_count
        print(f"   Updated records: {updated_count}")
        print(f"   New records inserted: {inserted_count}")


def demonstrate_upsert_with_parquet():
    """Demonstrate UPSERT using Parquet files as source."""
    print("\n📁 UPSERT with Parquet File Sources")
    print("=" * 50)

    temp_dir = Path(tempfile.mkdtemp())
    data = create_sample_data()

    try:
        # Save data to Parquet files
        customers_file = temp_dir / "customers.parquet"
        updates_file = temp_dir / "customer_updates.parquet"
        pq.write_table(data["initial_customers"], customers_file)
        pq.write_table(data["customer_updates"], updates_file)

        with DuckDBParquetHandler() as handler:
            # Step 1: Create table from Parquet with PRIMARY KEY
            print("\n📥 Step 1: Creating table from Parquet file...")
            conn = handler._connection.connection
            conn.execute(f"""
                CREATE OR REPLACE TABLE customers (
                    customer_id INTEGER PRIMARY KEY,
                    name VARCHAR,
                    email VARCHAR,
                    segment VARCHAR,
                    total_spend DECIMAL(10, 2),
                    last_purchase VARCHAR,
                    updated_at VARCHAR
                )
            """)
            conn.execute(f"""
                INSERT INTO customers
                SELECT * FROM read_parquet('{customers_file}')
            """)

            # Step 2: UPSERT using Parquet file directly
