==== examples/datasets/getting_started/03_simple_merges.py
"""
Simple Dataset Merges - Getting Started

This beginner-friendly example introduces fundamental dataset merging concepts
using fsspeckit's dataset utilities.

The example covers:
1. Basic dataset merging concepts
2. Simple append operations
3. Schema handling during merges
4. Duplicate detection and handling
5. Performance considerations for merging

This example helps you understand how to combine multiple datasets
efficiently and safely.
"""

from __future__ import annotations

import tempfile
import time
from datetime import datetime, timedelta
from pathlib import Path

import pyarrow as pa
import pyarrow.compute as pc
import pyarrow.dataset as ds
import pyarrow.parquet as pq

from fsspeckit.datasets import DuckDBParquetHandler


def create_sample_sales_data(batch_id: str, num_records: int = 50) -> pa.Table:
    """Build a table of randomly generated sales orders for one batch.

    Args:
        batch_id: Label stored in every row's ``batch_id`` column and used as
            the prefix of each ``order_id``.
        num_records: Number of order rows to generate.

    Returns:
        A table with one row per generated order. ``total`` is
        ``quantity * unit_price`` and ``order_date`` is a ``YYYY-MM-DD``
        string at most 90 days after 2024-01-01.
    """

    import random

    product_names = ["Laptop", "Mouse", "Keyboard", "Monitor", "Headphones"]
    customer_names = [
        "Alice",
        "Bob",
        "Charlie",
        "Diana",
        "Eve",
        "Frank",
        "Grace",
        "Henry",
    ]
    region_names = ["North", "South", "East", "West", "Central"]
    first_day = datetime(2024, 1, 1)

    def build_order(sequence_number: int) -> dict:
        # The dict literal evaluates top to bottom, so the random draws happen
        # in a fixed order: customer, product, quantity, price, region, date.
        order = {
            "order_id": f"{batch_id}-{sequence_number:04d}",
            "batch_id": batch_id,
            "customer": random.choice(customer_names),
            "product": random.choice(product_names),
            "quantity": random.randint(1, 10),
            "unit_price": round(random.uniform(10.0, 500.0), 2),
            "region": random.choice(region_names),
            "order_date": (
                first_day + timedelta(days=random.randint(0, 90))
            ).strftime("%Y-%m-%d"),
        }
        order["total"] = order["quantity"] * order["unit_price"]
        return order

    # Order numbers are 1-based.
    rows = [build_order(number) for number in range(1, num_records + 1)]
    return pa.Table.from_pylist(rows)


def demonstrate_basic_append():
    """Demonstrate basic dataset append operations.

    Writes an initial batch to a Parquet file in a temporary directory, reads
    it back, concatenates two further batches onto it, writes the combined
    table to a second file, and prints the total and per-batch row counts.

    Raises:
        Exception: Any failure is reported on stdout and then re-raised.
    """

    print("\n📋 Basic Dataset Append")

    try:
        # The context manager removes the scratch directory on every exit path.
        with tempfile.TemporaryDirectory() as scratch:
            temp_dir = Path(scratch)

            # Create initial dataset
            print("Creating initial dataset (Batch 1)...")
            batch1_data = create_sample_sales_data("B001", 30)

            # Save initial batch
            initial_file = temp_dir / "sales.parquet"
            pq.write_table(batch1_data, initial_file)

            print(f"✅ Created initial dataset: {len(batch1_data)} records")

            # Create additional batches
            print("\nAdding more batches...")

            batch2_data = create_sample_sales_data("B002", 25)
            batch3_data = create_sample_sales_data("B003", 35)

            # Simple append using PyArrow concat
            print("Performing simple append operation...")
            # perf_counter is a monotonic, high-resolution clock; time.time()
            # is wall-clock time and can be too coarse for such a short step.
            start_time = time.perf_counter()

            # Read existing data
            existing_data = pq.read_table(initial_file)

            # Append new data
            combined_data = pa.concat_tables(
                [existing_data, batch2_data, batch3_data]
            )

            append_time = time.perf_counter() - start_time

            # Save combined data
            combined_file = temp_dir / "sales_combined.parquet"
            pq.write_table(combined_data, combined_file)

            print(f"✅ Append completed in {append_time:.4f} seconds")
            print(f"   Combined dataset: {len(combined_data)} records")

            # Verify data integrity: an append must neither drop nor add rows.
            expected_total = len(batch1_data) + len(batch2_data) + len(batch3_data)
            if len(combined_data) == expected_total:
                print(f"   ✅ Data integrity verified: {expected_total} records")
            else:
                print(
                    f"   ❌ Data mismatch: expected {expected_total}, got {len(combined_data)}"
                )

            # Show batch distribution
            print("\n📊 Batch Distribution:")
            for batch_id in ["B001", "B002", "B003"]:
                batch_filter = pc.equal(combined_data.column("batch_id"), batch_id)
                # Summing a boolean mask counts its true entries.
                batch_count = pc.sum(batch_filter).as_py()
                print(f"   {batch_id}: {batch_count} records")

    except Exception as e:
        print(f"❌ Basic append failed: {e}")
        raise


def demonstrate_schema_consistency():
    """Demonstrate handling schema consistency during merges.

    Builds three in-memory sales tables, checks that their schemas are equal,
    concatenates them, and prints simple aggregates plus per-batch row counts.
    Everything happens in memory, so no temporary files are created.

    Raises:
        Exception: Any failure is reported on stdout and then re-raised.
    """

    print("\n🔧 Schema Consistency in Merges")

    try:
        # Create datasets with compatible schemas
        print("Creating datasets with consistent schemas...")

        # All three tables come from the same generator.
        sales1 = create_sample_sales_data("S001", 20)
        sales2 = create_sample_sales_data("S002", 20)
        sales3 = create_sample_sales_data("S003", 20)

        print("✅ Created 3 datasets with consistent schemas")
        print(f"   Schema: {[field.name for field in sales1.schema]}")

        # Verify schemas are identical
        schemas_identical = sales1.schema.equals(
            sales2.schema
        ) and sales2.schema.equals(sales3.schema)

        if schemas_identical:
            print("✅ All schemas are identical - safe to merge")
        else:
            print("❌ Schema differences detected - need alignment")

        # Perform merge
        print("\nPerforming merge with consistent schemas...")
        # perf_counter is a monotonic, high-resolution clock; time.time() is
        # wall-clock time and can be too coarse for such a short step.
        start_time = time.perf_counter()

        merged_data = pa.concat_tables([sales1, sales2, sales3])

        merge_time = time.perf_counter() - start_time

        print(f"✅ Merge completed in {merge_time:.4f} seconds")
        print(f"   Merged dataset: {len(merged_data)} records")

        # Analyze merged data
        print("\n📈 Merged Data Analysis:")
        total_sales = pc.sum(merged_data.column("total")).as_py()
        avg_order_value = pc.mean(merged_data.column("total")).as_py()

        print(f"   Total sales value: ${total_sales:,.2f}")
        print(f"   Average order value: ${avg_order_value:,.2f}")

        # Show distribution by batch
        print("\n📋 Distribution by Batch:")
        for batch_id in ["S001", "S002", "S003"]:
            # Summing a boolean mask counts its true entries.
            batch_count = pc.sum(
                pc.equal(merged_data.column("batch_id"), batch_id)
            ).as_py()
            print(f"   {batch_id}: {batch_count} records")

    except Exception as e:
        print(f"❌ Schema consistency demo failed: {e}")
        raise


def demonstrate_schema_alignment():
    """Demonstrate aligning different schemas for merging."""

    print("\n🔄 Schema Alignment for Merging")

    temp_dir = Path(tempfile.mkdtemp())

    try:
        # Create datasets with different schemas
        print("Creating datasets with different schemas...")

        # Dataset 1: Basic sales data
        sales_basic = create_sample_sales_data("X001", 20)
        # Remove some columns to create schema differences
        sales_basic = sales_basic.drop(["batch_id", "total"])

        # Dataset 2: Extended sales data with additional columns
        import random

        extended_records = []
        for i in range(20):
            record = {
                "order_id": f"X002-{i + 1:04d}",
                "customer": random.choice(["Alice", "Bob", "Charlie"]),
                "product": random.choice(["Laptop", "Mouse"]),
                "quantity": random.randint(1, 5),
                "unit_price": round(random.uniform(50.0, 200.0), 2),
                "region": random.choice(["North", "South"]),
                "order_date": "2024-02-01",
                "priority": random.choice(["high", "medium", "low"]),
                "sales_rep": random.choice(["John", "Jane", "Mike"]),
            }
            record["total"] = record["quantity"] * record["unit_price"]
            extended_records.append(record)

        sales_extended = pa.Table.from_pylist(extended_records)

        print(f"\n📋 Schema Comparison:")
        print(f"   Basic schema: {[field.name for field in sales_basic.schema]}")
        print(f"   Extended schema: {[field.name for field in sales_extended.schema]}")

        # Align schemas by adding missing columns with nulls
        print(f"\n🔧 Aligning schemas...")

        # Find all unique columns across both datasets
        all_columns = set()
        for schema in [sales_basic.schema, sales_extended.schema]:
            all_columns.update(field.name for field in schema)

        # Create aligned tables
        aligned_tables = []

        for table, name in [(sales_basic, "Basic"), (sales_extended, "Extended")]:
            aligned_data = {}

            for column_name in all_columns:
                if column_name in table.column_names:
                    aligned_data[column_name] = table.column(column_name)
---- tail

        # Compare results
        print(f"\n📊 Performance Comparison:")
        print(f"   Simple concat:   {concat_time:.4f}s")
        print(f"   Incremental:    {incremental_time:.4f}s")
        print(f"   DuckDB:        {duckdb_time:.4f}s")

        # Verify all results are equivalent
        results_match = len(merged_all) == len(incremental_result) == len(duckdb_result)

        if results_match:
            print(
                f"   ✅ All strategies produced equivalent results: {len(merged_all)} records"
            )
        else:
            print(
                f"   ❌ Results differ: {len(merged_all)}, {len(incremental_result)}, {len(duckdb_result)}"
            )

        # Memory usage considerations
        print(f"\n💾 Memory Usage Considerations:")
        print(f"   Simple concat:   Holds all datasets in memory simultaneously")
        print(f"   Incremental:    Holds at most 2 datasets at a time")
        print(f"   DuckDB:        Streams data with controlled memory usage")

        print(f"\n💡 Recommendations:")
        print(f"   • Small datasets (< 10K records): Simple concat is fine")
        print(f"   • Medium datasets (10K-100K): Consider incremental approach")
        print(f"   • Large datasets (> 100K): Use DuckDB for memory efficiency")

    except Exception as e:
        print(f"❌ Performance demo failed: {e}")
        raise

    finally:
        import shutil

        shutil.rmtree(temp_dir)


def main():
    """Run every merge demonstration in order, then print a closing summary.

    Raises:
        Exception: Any failure from a demonstration is reported on stdout and
            then re-raised.
    """

    rule = "=" * 60
    takeaways = (
        "• Always verify schema compatibility before merging",
        "• Handle duplicates appropriately for your use case",
        "• Consider performance impact for large datasets",
        "• Use DuckDB for memory-efficient merging of large data",
        "• Test merges with sample data before full production runs",
    )
    related_examples = (
        "• DuckDB basics: Database-based merging strategies",
        "• Schema management: Advanced schema handling",
        "• Advanced workflows: Complex real-world scenarios",
    )

    print("🔀 Simple Dataset Merges - Getting Started")
    print(rule)
    print("This example introduces fundamental dataset merging concepts")
    print("using fsspeckit's dataset utilities.")

    try:
        # Each demonstration is self-contained; they run in tutorial order.
        demonstrate_basic_append()
        demonstrate_schema_consistency()
        demonstrate_schema_alignment()
        demonstrate_duplicate_handling()
        demonstrate_performance_considerations()

        print("\n" + rule)
        print("✅ Simple merges completed successfully!")

        print("\n🎯 Key Takeaways:")
        for takeaway in takeaways:
            print(takeaway)

        print("\n🔗 Related Examples:")
        for related in related_examples:
            print(related)

    except Exception as e:
        print(f"\n❌ Example failed: {e}")
        raise


# Run the examples only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()
==== examples/datasets/getting_started/04_pyarrow_merges.py
"""
PyArrow Merge-Aware Writes - Getting Started

This example introduces PyArrow's merge-aware write functionality for efficient dataset operations.

The example covers:
1. Basic merge-aware write concepts
2. Strategy selection (insert, upsert, update, etc.)
3. Key column configuration
4. Convenience helper functions
5. Performance benefits over traditional approaches
"""

import argparse
import tempfile
from pathlib import Path
from typing import Dict, Any

import pyarrow as pa
import pyarrow.dataset as pds
from fsspec.implementations.local import LocalFileSystem


def create_simple_customer_data() -> Dict[str, pa.Table]:
    """Build the small fixture tables used by the merge demonstrations.

    Returns:
        A mapping from fixture name to table, in this order:
        ``existing_customers``, ``customer_updates``, ``price_updates``,
        ``new_products`` and ``duplicate_log_entries``.
    """

    fixture_columns = {
        # Customers already present in the target dataset.
        "existing_customers": {
            "customer_id": [1, 2, 3],
            "name": ["Alice Johnson", "Bob Smith", "Carol Davis"],
            "email": ["alice@example.com", "bob@example.com", "carol@example.com"],
            "segment": ["premium", "standard", "premium"],
            "last_purchase": ["2024-01-01", "2024-01-05", "2024-01-10"],
        },
        # Incoming changes: customer 2 (Bob) exists, customers 4 and 5 are new.
        "customer_updates": {
            "customer_id": [2, 4, 5],
            "name": ["Robert Smith", "Diana Prince", "Eve Wilson"],
            "email": ["robert@example.com", "diana@example.com", "eve@example.com"],
            "segment": ["premium", "premium", "standard"],
            "last_purchase": ["2024-01-15", "2024-01-12", "2024-01-08"],
        },
        # Updated prices for products 101-103.
        "price_updates": {
            "product_id": [101, 102, 103],
            "name": ["Laptop Pro", "Wireless Mouse", "Mechanical Keyboard"],
            "price": [1299.99, 79.99, 149.99],
            "category": ["Electronics", "Electronics", "Electronics"],
        },
        # Products to add.
        "new_products": {
            "product_id": [201, 202],
            "name": ["USB-C Hub", "Webcam HD"],
            "price": [49.99, 89.99],
            "category": ["Electronics", "Electronics"],
        },
        # Log rows in which LOG001 and LOG002 each appear twice.
        "duplicate_log_entries": {
            "log_id": ["LOG001", "LOG001", "LOG002", "LOG002", "LOG003"],
            "event_type": ["login", "login", "purchase", "purchase", "logout"],
            "user_id": [1, 1, 2, 2, 3],
            "timestamp": [
                "2024-01-15T09:00:00Z",
                "2024-01-15T09:01:00Z",
                "2024-01-15T10:00:00Z",
                "2024-01-15T10:01:00Z",
                "2024-01-15T11:00:00Z",
            ],
            "details": [
                "User 1 login",
                "User 1 login",
                "User 2 purchase",
                "User 2 purchase",
                "User 3 logout",
            ],
        },
    }

    return {
        fixture_name: pa.Table.from_pydict(columns)
        for fixture_name, columns in fixture_columns.items()
    }


def explain_merge_concepts(interactive: bool = False):
    """Print a short primer on merge-aware writes.

    Args:
        interactive: When true, wait for the user to press Enter once the
            primer has been printed.
    """
    # One entry per output line; an empty string produces a blank line.
    primer_lines = (
        "🎓 Understanding Merge-Aware Writes",
        "=" * 50,
        "",
        "📝 What are Merge-Aware Writes?",
        "   Instead of: 1) Write data → 2) Run separate merge operation",
        "   You can:     1) Write data WITH merge strategy in one step",
        "",
        "🎯 Why Use Merge-Aware Writes?",
        "   ✅ Fewer steps - No separate staging and merge needed",
        "   ✅ Less error-prone - Single operation instead of multiple",
        "   ✅ Better performance - Optimized merge operations",
        "   ✅ Simpler code - One function call instead of many",
        "",
        "🔑 Key Concepts:",
        "   • STRATEGY: How to handle new vs existing data",
        "   • KEY_COLUMNS: Which columns identify unique records",
        "   • CONVENIENCE HELPERS: Shortcut functions for common strategies",
    )
    for primer_line in primer_lines:
        print(primer_line)

    if not interactive:
        return
    input("\nPress Enter to continue...")


def demonstrate_upsert_basics():
    """Demonstrate basic UPSERT functionality.

    Writes the existing customers to a temporary Parquet dataset, applies the
    customer updates with upsert semantics keyed on ``customer_id`` (rows whose
    key appears in the updates are replaced; remaining update rows are
    appended), then prints the resulting customers.

    The upsert is emulated with plain PyArrow operations so that the printed
    result matches the narrative: each customer appears exactly once.
    """
    import pyarrow.compute as pc
    import pyarrow.parquet as pq

    print("\n🔄 UPSERT Strategy - Insert or Update")
    print("=" * 50)

    print("\n📋 Use Case: Customer data synchronization")
    print("   • New customers get added")
    print("   • Existing customers get updated")
    print("   • Most common CDC (Change Data Capture) pattern")

    fs = LocalFileSystem()
    data = create_simple_customer_data()

    with tempfile.TemporaryDirectory() as temp_dir:
        customer_path = Path(temp_dir) / "customers"
        customer_file = customer_path / "customers.parquet"

        # Step 1: Create initial customer dataset
        print("\n📥 Step 1: Creating initial customer dataset...")
        customer_path.mkdir(parents=True, exist_ok=True)
        pq.write_table(data["existing_customers"], str(customer_file))

        # Show initial data
        initial_dataset = pds.dataset(str(customer_path), filesystem=fs)
        print(f"   Created dataset with {initial_dataset.count_rows()} customers")

        # Step 2: Apply UPSERT with updates
        print("\n📝 Step 2: Applying UPSERT with customer updates...")
        print("   Customer 2 (Bob) exists → will be updated")
        print("   Customers 4,5 are new → will be inserted")
        print("   (Emulated with plain PyArrow: replace matching keys, append new rows)")

        updates = data["customer_updates"]
        # Materialize the current rows before the file is overwritten below.
        existing = initial_dataset.to_table()

        # Keep only existing rows whose key is NOT being replaced by an update.
        superseded = pc.is_in(
            existing["customer_id"],
            value_set=updates["customer_id"].combine_chunks(),
        )
        untouched = existing.filter(pc.invert(superseded))

        # Untouched rows plus every update row gives one row per customer_id.
        merged = pa.concat_tables([untouched, updates])
        pq.write_table(merged, str(customer_file))

        # Step 3: Verify results
        print("\n🔍 Step 3: Verifying UPSERT results...")
        final_dataset = pds.dataset(str(customer_path), filesystem=fs)
        final_customers = final_dataset.to_table().sort_by("customer_id")

        print(f"   Final dataset has {final_dataset.count_rows()} customers")
        print("\n   Customer changes:")
        for customer in final_customers.to_pylist():
            customer_id = customer["customer_id"]
            name = customer["name"]
            email = customer["email"]
            print(f"      📇 Customer {customer_id}: {name} ({email})")


def demonstrate_convenience_helpers():
    """Walk through the INSERT, UPDATE and DEDUPLICATE helper demos.

    For each strategy a directory is created under a temporary root, the
    demo table is written there as Parquet, and the number of rows in the
    resulting dataset directory is printed. The UPDATE demo first writes an
    initial products file. The helpers themselves are not called; the write
    is a simplified stand-in.
    """
    import pyarrow.parquet as pq

    print("\n🛠️  Convenience Helper Functions")
    print("=" * 50)

    print("\n📋 Why Use Convenience Helpers?")
    print(
        "   • More readable code - `fs.upsert_dataset()` vs `fs.write_pyarrow_dataset(..., strategy='upsert')`"
    )
    print("   • Less error-prone - Can't forget strategy name")
    print("   • Better IDE support - Function signatures are specific")

    fs = LocalFileSystem()
    data = create_simple_customer_data()

    # (directory name, strategy label, demo table, key column)
    demos = (
        ("insert_demo", "INSERT", data["new_products"], "product_id"),
        ("update_demo", "UPDATE", data["price_updates"], "product_id"),
        ("dedup_demo", "DEDUPLICATE", data["duplicate_log_entries"], "log_id"),
    )

    with tempfile.TemporaryDirectory() as temp_dir:
        root = Path(temp_dir)

        for demo_name, strategy_name, demo_data, key_col in demos:
            helper = strategy_name.lower()
            record_count = len(demo_data)

            demo_path = root / demo_name
            demo_path.mkdir(parents=True, exist_ok=True)

            print(f"\n📝 {strategy_name} Helper Demo:")
            print(f"   Using: fs.{helper}_dataset()")
            print(f"   Key column: {key_col}")
            print(f"   Records: {record_count}")

            # The UPDATE demo needs something to update, so seed it first.
            if strategy_name == "UPDATE":
                original_prices = {
                    "product_id": [101, 102, 103],
                    "name": ["Laptop Pro", "Wireless Mouse", "Mechanical Keyboard"],
                    "price": [999.99, 29.99, 89.99],
                    "category": ["Electronics"] * 3,
                }
                pq.write_table(
                    pa.Table.from_pydict(original_prices),
                    str(demo_path / "products.parquet"),
                )
                print("   Created initial dataset for UPDATE demo")

            # Simplified stand-in for the convenience helper.
            print(f"   Using {helper} strategy for {record_count} records")
            result_file = demo_path / f"{helper}_result.parquet"
            pq.write_table(demo_data, str(result_file))

            # Report how many rows the dataset directory now holds.
            if not demo_path.exists():
                continue
            result_count = pds.dataset(str(demo_path), filesystem=fs).count_rows()
            print(f"   ✅ Result: {result_count} records")


def demonstrate_strategy_selection():
    """Help users choose the right strategy."""
    print("\n🎯 Strategy Selection Guide")
    print("=" * 50)

    strategies = {
        "INSERT": {
            "description": "Add new records, ignore existing ones",
            "use_cases": ["Event logs", "Audit trails", "Incremental loads"],
            "key_required": True,
            "example": "fs.insert_dataset(data, 'events/', key_columns='event_id')",
        },
        "UPSERT": {
            "description": "Add new records, update existing ones",
            "use_cases": ["Customer sync", "CDC", "Data synchronization"],
            "key_required": True,
            "example": "fs.upsert_dataset(data, 'customers/', key_columns='customer_id')",
        },
        "UPDATE": {
            "description": "Update existing records only",
---- tail
    print("\n🎯 Key Column Best Practices:")
    print("   • Use stable identifiers that don't change")
    print("   • Ensure uniqueness across your dataset")
    print("   • Consider query patterns when choosing")
    print("   • For composite keys, ensure combination is unique")

    print("\n🔗 Composite Keys Example:")
    print("   Scenario: Order line items where (order_id, line_item_id) must be unique")
    print("   Key columns: ['order_id', 'line_item_id']")
    print("   Result: Can update specific line items without affecting others")


def performance_benefits():
    """Print a comparison of the traditional and merge-aware workflows."""
    # One entry per output line; an empty string produces a blank line.
    report_lines = (
        "",
        "⚡ Performance Benefits",
        "=" * 50,
        "",
        "🔄 Traditional Approach vs Merge-Aware:",
        "",
        "   Traditional Approach:",
        "   1. Write new data to temporary location",
        "   2. Load existing data",
        "   3. Perform merge operation in memory",
        "   4. Write merged result back",
        "   → Multiple I/O operations, more memory usage",
        "",
        "   🚀 Merge-Aware Approach:",
        "   1. Write data with merge strategy",
        "   → Single operation, optimized merge, less memory",
        "",
        "📊 Benefits:",
        "   ✅ 50-80% faster for large datasets",
        "   ✅ Lower memory usage",
        "   ✅ Fewer opportunities for errors",
        "   ✅ Simpler, more readable code",
        "   ✅ Better for production workflows",
    )
    for report_line in report_lines:
        print(report_line)


def main():
    """Parse the command line and run every tutorial section in order.

    The ``--interactive`` flag is forwarded to the concepts section, which
    then pauses for Enter after printing.
    """
    parser = argparse.ArgumentParser(description="PyArrow Merge-Aware Writes Tutorial")
    parser.add_argument(
        "--interactive",
        action="store_true",
        help="Enable interactive mode with pauses between sections",
    )
    args = parser.parse_args()

    intro_lines = (
        "🚀 PyArrow Merge-Aware Writes - Getting Started",
        "=" * 60,
        "Welcome to merge-aware writes! This tutorial will teach you",
        "how to efficiently manage dataset operations using PyArrow.",
    )
    for intro_line in intro_lines:
        print(intro_line)
    print()

    # Run all tutorial sections
    explain_merge_concepts(interactive=args.interactive)
    demonstrate_upsert_basics()
    demonstrate_convenience_helpers()
    demonstrate_strategy_selection()
    demonstrate_key_columns()
    performance_benefits()

    closing_lines = (
        "\n🎉 Tutorial Complete!",
        "\n📚 Next Steps:",
        "   • Try merge-aware writes with your own data",
        "   • Explore advanced features (composite keys, custom ordering)",
        "   • Check out the comprehensive merge guide: docs/how-to/merge-datasets.md",
        "   • See full examples: examples/pyarrow/pyarrow_merge_example.py",
        "\n🔗 Quick Reference:",
        "   fs.insert_dataset(data, path, key_columns)     # Insert only",
        "   fs.upsert_dataset(data, path, key_columns)      # Insert or update",
        "   fs.update_dataset(data, path, key_columns)     # Update only",
        "   fs.deduplicate_dataset(data, path, key_columns) # Remove duplicates",
        "   fs.write_pyarrow_dataset(data, path, strategy='full_merge') # Replace all",
    )
    for closing_line in closing_lines:
        print(closing_line)


# Run the tutorial only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()
==== examples/datasets/getting_started/05_duckdb_upserts.py
"""
DuckDB Upsert Operations - Getting Started

This example demonstrates how to perform UPSERT (insert or update) operations
using DuckDB with PyArrow tables.

The example covers:
1. Basic UPSERT concepts with DuckDB
2. Creating tables from PyArrow data
3. INSERT ... ON CONFLICT DO UPDATE syntax
4. Batched upserts for large datasets
5. Comparison with PyArrow merge-aware writes
"""

from __future__ import annotations

import tempfile
from datetime import datetime
from pathlib import Path
from typing import Optional

import pyarrow as pa
import pyarrow.parquet as pq

from fsspeckit.datasets import DuckDBParquetHandler


def create_sample_data():
    """Build the customer and product fixture tables for the upsert demos.

    Returns:
        A dict with the tables ``initial_customers``, ``customer_updates``,
        ``initial_products`` and ``product_updates``. Each ``*_updates`` table
        mixes keys that already exist in its ``initial_*`` table with new ones.
    """

    seeded_on = "2024-01-01"
    changed_on = "2024-01-20"
    electronics = "Electronics"

    return {
        # Starting customer records (ids 1-5).
        "initial_customers": pa.Table.from_pydict(
            {
                "customer_id": [1, 2, 3, 4, 5],
                "name": [
                    "Alice Johnson",
                    "Bob Smith",
                    "Carol Davis",
                    "David Lee",
                    "Eve Wilson",
                ],
                "email": [
                    "alice@example.com",
                    "bob@example.com",
                    "carol@example.com",
                    "david@example.com",
                    "eve@example.com",
                ],
                "segment": ["premium", "standard", "premium", "standard", "premium"],
                "total_spend": [1500.00, 250.00, 3200.00, 180.00, 950.00],
                "last_purchase": [
                    "2024-01-15",
                    "2024-01-10",
                    "2024-01-18",
                    "2024-01-05",
                    "2024-01-12",
                ],
                "updated_at": [seeded_on] * 5,
            }
        ),
        # Ids 2 and 3 already exist (renamed); ids 6 and 7 are new customers
        # whose spend starts at 0.
        "customer_updates": pa.Table.from_pydict(
            {
                "customer_id": [2, 3, 6, 7],
                "name": [
                    "Robert Smith",
                    "Carol Williams",
                    "Diana Prince",
                    "Frank Miller",
                ],
                "email": [
                    "robert@example.com",
                    "carol.w@example.com",
                    "diana@example.com",
                    "frank@example.com",
                ],
                "segment": ["premium", "premium", "standard", "standard"],
                "total_spend": [450.00, 3800.00, 0.00, 0.00],
                "last_purchase": [
                    "2024-01-20",
                    "2024-01-22",
                    "2024-01-20",
                    "2024-01-21",
                ],
                "updated_at": [changed_on] * 4,
            }
        ),
        # Starting product catalog (ids 101-105).
        "initial_products": pa.Table.from_pydict(
            {
                "product_id": [101, 102, 103, 104, 105],
                "name": [
                    "Laptop Pro",
                    "Wireless Mouse",
                    "Mechanical Keyboard",
                    "USB-C Hub",
                    "Webcam HD",
                ],
                "category": [electronics] * 5,
                "price": [1299.99, 29.99, 149.99, 49.99, 89.99],
                "stock": [50, 200, 75, 150, 100],
                "updated_at": [seeded_on] * 5,
            }
        ),
        # Ids 101 and 102 already exist; ids 106 and 107 are new products.
        "product_updates": pa.Table.from_pydict(
            {
                "product_id": [101, 102, 106, 107],
                "name": [
                    "Laptop Pro Max",
                    "Ultra Mouse",
                    "Monitor 4K",
                    "Headphones Pro",
                ],
                "category": [electronics] * 4,
                "price": [1599.99, 49.99, 499.99, 299.99],
                "stock": [30, 180, 25, 60],
                "updated_at": [changed_on] * 4,
            }
        ),
    }


def demonstrate_basic_upsert():
    """Demonstrate basic UPSERT functionality with DuckDB.

    Loads the initial customers into a DuckDB table that has a primary key,
    applies the customer updates with ``INSERT ... ON CONFLICT DO UPDATE``,
    and prints the table before and after plus a short change summary.
    Everything runs inside the DuckDB connection, so no files are written.
    """
    print("\n🔄 Basic DuckDB UPSERT")
    print("=" * 50)

    data = create_sample_data()

    with DuckDBParquetHandler() as handler:
        # Step 1: Create initial table from PyArrow table using DuckDB's native registration
        print("\n📥 Step 1: Creating initial customer table...")
        # NOTE(review): this reaches into a private attribute of the handler
        # to get the raw connection - confirm there is no public accessor.
        conn = handler._connection.connection
        conn.register("initial_customers", data["initial_customers"])
        # Create table with PRIMARY KEY constraint for ON CONFLICT to work
        conn.execute("""
            CREATE OR REPLACE TABLE customers (
                customer_id INTEGER PRIMARY KEY,
                name VARCHAR,
                email VARCHAR,
                segment VARCHAR,
                total_spend DECIMAL(10, 2),
                last_purchase VARCHAR,
                updated_at VARCHAR
            )
        """)
        conn.execute("INSERT INTO customers SELECT * FROM initial_customers")

        # Verify initial data
        result = handler.execute_sql(
            "SELECT * FROM customers ORDER BY customer_id"
        ).fetchdf()
        print(f"   Initial customers: {len(result)}")
        print(result.to_string(index=False))

        # Step 2: Perform UPSERT
        print("\n📝 Step 2: Performing UPSERT with customer updates...")
        print("   Customer 2 (Bob) → Update (new name, segment, spend)")
        print("   Customer 3 (Carol) → Update (new name, email)")
        print("   Customers 6,7 (Diana, Frank) → Insert")

        # DuckDB UPSERT syntax: INSERT ... ON CONFLICT DO UPDATE.
        # The update rows are selected from the registered Arrow table, so
        # the whole batch is applied in a single statement. EXCLUDED refers
        # to the incoming row that collided on the primary key.
        conn.register("customer_updates", data["customer_updates"])
        conn.execute("""
            INSERT INTO customers
                (customer_id, name, email, segment, total_spend, last_purchase, updated_at)
            SELECT customer_id, name, email, segment, total_spend, last_purchase, updated_at
            FROM customer_updates
            ON CONFLICT (customer_id) DO UPDATE SET
                name = EXCLUDED.name,
                email = EXCLUDED.email,
                segment = EXCLUDED.segment,
                total_spend = EXCLUDED.total_spend,
                last_purchase = EXCLUDED.last_purchase,
                updated_at = EXCLUDED.updated_at
        """)

        # Step 3: Verify results
        print("\n🔍 Step 3: Verifying UPSERT results...")
        result = handler.execute_sql(
            "SELECT * FROM customers ORDER BY customer_id"
        ).fetchdf()
        print(f"   Final customers: {len(result)}")
        print(result.to_string(index=False))

        # Show changes: ids 2 and 3 were updated, ids 6 and 7 were inserted.
        print("\n📊 Changes Summary:")
        updated = result[result["customer_id"].isin([2, 3])]
        new = result[result["customer_id"].isin([6, 7])]
        print(f"   Updated records: {len(updated)}")
        print(f"   New records inserted: {len(new)}")


def demonstrate_upsert_with_parquet():
    """Demonstrate UPSERT using Parquet files as source.

    Writes the ``initial_customers`` and ``customer_updates`` tables returned
    by ``create_sample_data()`` to Parquet files in a temporary directory,
    loads the first file into a ``customers`` table whose ``customer_id`` is
    the PRIMARY KEY, then upserts the second file directly through
    ``read_parquet()`` with ``INSERT ... ON CONFLICT (customer_id) DO UPDATE``.
    The resulting table is printed to stdout.

    The temporary directory is removed on every exit path, including when
    building the sample data or any SQL statement raises.
    """
    print("\n📁 UPSERT with Parquet File Sources")
    print("=" * 50)

    # TemporaryDirectory cleans up even if create_sample_data() fails, which
    # a mkdtemp() call followed by a later try/finally would not cover.
    with tempfile.TemporaryDirectory() as scratch_dir:
        temp_dir = Path(scratch_dir)
        data = create_sample_data()

        # Save data to Parquet files
        customers_file = temp_dir / "customers.parquet"
        updates_file = temp_dir / "customer_updates.parquet"
        pq.write_table(data["initial_customers"], customers_file)
        pq.write_table(data["customer_updates"], updates_file)

        # The paths are embedded in SQL string literals below; double any
        # single quote so a temp path containing an apostrophe stays valid SQL.
        customers_sql_path = str(customers_file).replace("'", "''")
        updates_sql_path = str(updates_file).replace("'", "''")

        with DuckDBParquetHandler() as handler:
            # Step 1: Create table from Parquet with PRIMARY KEY
            print("\n📥 Step 1: Creating table from Parquet file...")
            # NOTE(review): reaches into the handler's private attribute,
            # presumably to get the underlying DuckDB connection — confirm
            # whether a public accessor exists.
            conn = handler._connection.connection
            conn.execute("""
                CREATE OR REPLACE TABLE customers (
                    customer_id INTEGER PRIMARY KEY,
                    name VARCHAR,
                    email VARCHAR,
                    segment VARCHAR,
                    total_spend DECIMAL(10, 2),
                    last_purchase VARCHAR,
                    updated_at VARCHAR
                )
            """)
            # SELECT * maps columns by position, so the Parquet file's column
            # order must match the table definition above.
            conn.execute(f"""
                INSERT INTO customers
                SELECT * FROM read_parquet('{customers_sql_path}')
            """)

            # Step 2: UPSERT using Parquet file directly
            print("\n📝 Step 2: Performing UPSERT directly from Parquet...")
            print("   Using INSERT ... ON CONFLICT with read_parquet()")

            # EXCLUDED refers to the incoming row that hit the key conflict.
            handler.execute_sql(f"""
                INSERT INTO customers
                    (customer_id, name, email, segment, total_spend, last_purchase, updated_at)
                SELECT customer_id, name, email, segment, total_spend, last_purchase, updated_at
                FROM read_parquet('{updates_sql_path}')
                ON CONFLICT (customer_id) DO UPDATE SET
                    name = EXCLUDED.name,
                    email = EXCLUDED.email,
                    segment = EXCLUDED.segment,
                    total_spend = EXCLUDED.total_spend,
                    last_purchase = EXCLUDED.last_purchase,
                    updated_at = EXCLUDED.updated_at
            """)

            # Step 3: Verify results
            print("\n🔍 Step 3: Verifying results...")
            result = handler.execute_sql(
                "SELECT * FROM customers ORDER BY customer_id"
            ).fetchdf()
            print(f"   Total customers: {len(result)}")
            print(result.to_string(index=False))


def demonstrate_batched_upsert():
    """Demonstrate batched upserts for large datasets."""
    print("\n📦 Batched UPSERT for Large Datasets")
    print("=" * 50)

    temp_dir = Path(tempfile.mkdtemp())

    try:
        # Create larger dataset
        import random
---- tail

def demonstrate_upsert_vs_pyarrow_merge():
    """Print a side-by-side comparison of DuckDB UPSERT and PyArrow merges.

    Emits a feature table followed by short "when to use" bullet lists for
    each approach. The function only writes to stdout and returns ``None``.
    """
    # The table is kept as one literal so its box-drawing layout is untouched.
    feature_table = """
┌─────────────────────┬─────────────────────┬─────────────────────────┐
│ Feature             │ DuckDB UPSERT       │ PyArrow Merge-Aware     │
├─────────────────────┼─────────────────────┼─────────────────────────┤
│ SQL Syntax          │ INSERT ... ON       │ strategy='upsert'       │
│                     │ CONFLICT DO UPDATE  │                         │
├─────────────────────┼─────────────────────┼─────────────────────────┤
│ Complex Logic       │ ✅ Full SQL power   │ Limited to basic ops    │
├─────────────────────┼─────────────────────┼─────────────────────────┤
│ Large Datasets      │ ✅ Excellent        │ ✅ Good                 │
├─────────────────────┼─────────────────────┼─────────────────────────┤
│ In-Memory Data      │ ✅ Native           │ ✅ Native               │
├─────────────────────┼─────────────────────┼─────────────────────────┤
│ Parquet Integration │ ✅ Direct read/     │ ✅ Native parquet       │
│                     │ write               │ support                 │
├─────────────────────┼─────────────────────┼─────────────────────────┤
│ Memory Efficiency   │ ✅ Configurable     │ Streaming available     │
├─────────────────────┼─────────────────────┼─────────────────────────┤
│ Setup Required      │ DuckDB connection   │ fsspec filesystem       │
└─────────────────────┴─────────────────────┴─────────────────────────┘
    """

    # Each entry pairs a section heading with the bullets printed beneath it.
    guidance_sections = (
        (
            "   DuckDB UPSERT:",
            (
                "   • Complex conflict resolution logic",
                "   • Multiple data sources (join before upsert)",
                "   • Aggregating values during upsert",
                "   • Already using DuckDB for other operations",
            ),
        ),
        (
            "\n   PyArrow Merge-Aware:",
            (
                "   • Simple insert/update scenarios",
                "   • Integrating with fsspec ecosystem",
                "   • When you want a simpler API",
                "   • Streaming data processing",
            ),
        ),
    )

    print("\n⚖️  DuckDB UPSERT vs PyArrow Merge-Aware Writes")
    print("=" * 50)

    print("\n📋 Feature Comparison:")
    print(feature_table)

    print("\n💡 When to Use Each:")
    for heading, bullets in guidance_sections:
        print(heading)
        for bullet in bullets:
            print(bullet)


def main():
    """Run every DuckDB UPSERT demonstration in order, then print a summary.

    If any step raises, the error is reported on stdout and the same
    exception is re-raised to the caller.
    """
    divider = "=" * 60
    intro_lines = (
        "🚀 DuckDB UPSERT Operations - Getting Started",
        divider,
        "This example demonstrates UPSERT (insert or update) operations",
        "using DuckDB with PyArrow tables.",
    )
    for intro_line in intro_lines:
        print(intro_line)

    try:
        # Demonstrations run in a fixed order; the first failure stops the run.
        demonstrate_basic_upsert()
        demonstrate_upsert_with_parquet()
        demonstrate_batched_upsert()
        demonstrate_partial_upsert()
        demonstrate_conflict_resolution()
        demonstrate_upsert_vs_pyarrow_merge()

        print("\n" + divider)
        print("✅ DuckDB UPSERT examples completed successfully!")

        summary_sections = (
            (
                "\n🎯 Key Takeaways:",
                (
                    "• Use INSERT ... ON CONFLICT DO UPDATE for UPSERT operations",
                    "• Specify conflict columns (usually primary key) in ON CONFLICT",
                    "• Use EXCLUDED table reference to access proposed values",
                    "• Batch large operations for better performance",
                    "• DuckDB offers more flexibility than PyArrow merge-aware writes",
                ),
            ),
            (
                "\n🔗 Related Examples:",
                (
                    "• 03_simple_merges.py: Basic merge operations",
                    "• 04_pyarrow_merges.py: PyArrow merge-aware writes",
                    "• DuckDB basics: 01_duckdb_basics.py",
                ),
            ),
        )
        for heading, entries in summary_sections:
            print(heading)
            for entry in entries:
                print(entry)

    except Exception as exc:
        # Top-level boundary: report the failure, then let it propagate.
        print(f"\n❌ Example failed: {exc}")
        raise


# Run the demonstrations only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == "__main__":
    main()
