Metadata-Version: 2.4
Name: cpg-cdm-agg
Version: 1.0.0
Summary: A Python package for creating Gold layer Iceberg tables from Silver layer data in Snowflake using Snowpark Python
Author-email: Your Name <your.email@example.com>
Maintainer-email: Your Name <your.email@example.com>
License: MIT
Project-URL: Homepage, https://github.com/yourusername/cpg-cdm-agg
Project-URL: Documentation, https://github.com/yourusername/cpg-cdm-agg#readme
Project-URL: Repository, https://github.com/yourusername/cpg-cdm-agg
Project-URL: Issues, https://github.com/yourusername/cpg-cdm-agg/issues
Keywords: snowflake,snowpark,data-engineering,etl,gold-layer,iceberg,data-warehouse,aggregations,cpg
Classifier: Development Status :: 4 - Beta
Classifier: Intended Audience :: Developers
Classifier: Intended Audience :: Science/Research
Classifier: License :: OSI Approved :: MIT License
Classifier: Operating System :: OS Independent
Classifier: Programming Language :: Python :: 3
Classifier: Programming Language :: Python :: 3.8
Classifier: Programming Language :: Python :: 3.9
Classifier: Programming Language :: Python :: 3.10
Classifier: Programming Language :: Python :: 3.11
Classifier: Topic :: Database
Classifier: Topic :: Scientific/Engineering
Requires-Python: >=3.8
Description-Content-Type: text/markdown
License-File: LICENSE
Requires-Dist: snowflake-snowpark-python>=1.0.0
Provides-Extra: dev
Requires-Dist: pytest>=7.0.0; extra == "dev"
Requires-Dist: pytest-cov>=4.0.0; extra == "dev"
Requires-Dist: black>=23.0.0; extra == "dev"
Requires-Dist: isort>=5.12.0; extra == "dev"
Requires-Dist: mypy>=1.0.0; extra == "dev"
Dynamic: license-file

# CPG CDM Aggregations (cpg-cdm-agg)

A Python package for creating Gold layer Iceberg tables from Silver layer data in Snowflake using Snowpark Python.

## Features

- **31 Aggregation Functions** covering various business domains:
  - Dimension Tables (1 function)
  - RMS - Raw Material Sourcing (4 functions)
  - PM - Production Management (4 functions)
  - DSC - Distribution & Supply Chain (6 functions)
  - REUS - Retail & End User Sales (6 functions)
  - CFM - Consumer & Field Marketing (6 functions)
  - FC - Finance & Controlling (3 functions)
  - SESG - Sustainability & ESG (1 function)

- **Flexible Execution**: Run individual functions, multiple functions, or all 31 at once
- **Configurable**: Customize database and schema names for your environment
- **Snowflake Native**: Built on Snowflake Snowpark Python; the aggregation functions operate on Snowpark DataFrames

## Installation

### From PyPI

```bash
pip install cpg-cdm-agg
```

### From Source

```bash
git clone https://github.com/yourusername/cpg-cdm-agg.git
cd cpg-cdm-agg
pip install -e .
```

## Quick Start

### Basic Usage with Executor

```python
from snowflake.snowpark import Session
from cpg_cdm_agg import GoldLayerExecutor, GoldLayerConfig

# Create Snowpark session (placeholder values shown; do not commit real credentials)
connection_parameters = {
    "account": "your_account",
    "user": "your_user",
    "password": "your_password",
    "role": "your_role",
    "warehouse": "your_warehouse",
    "database": "your_database",
    "schema": "your_schema"
}
session = Session.builder.configs(connection_parameters).create()

# Create configuration with your database/schema names
config = GoldLayerConfig(
    gold_db="MY_GOLD_DB",
    gold_schema="MY_GOLD_SCHEMA",
    silver_db="MY_SILVER_DB",
    silver_schema="MY_SILVER_SCHEMA"
)

# Create executor
executor = GoldLayerExecutor(session, config)

# Run all 31 aggregations
result = executor.run()
print(f"Successful: {result['successful']}, Failed: {result['failed']}")
```

### Run Specific Aggregations

```python
# Run only specific aggregations
result = executor.run([
    "dim_dates",
    "rms_fact_sustainability",
    "fc_budget"
])

# Run a single aggregation
result = executor.run_single("dim_dates")
```
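
This README does not say how `run()` treats a name it does not recognize, so it may be worth checking names before submitting them. The sketch below is one way to do that; `split_known_unknown` is a hypothetical helper, and the `available` list is a stand-in for the value returned by `executor.get_available_aggregations()`.

```python
from typing import Iterable, List, Tuple


def split_known_unknown(
    requested: Iterable[str], available: Iterable[str]
) -> Tuple[List[str], List[str]]:
    """Partition requested names into (known, unknown), keeping request order."""
    available_set = set(available)
    known: List[str] = []
    unknown: List[str] = []
    for name in requested:
        (known if name in available_set else unknown).append(name)
    return known, unknown


# Stand-in for executor.get_available_aggregations(); with a live executor,
# pass that method's return value instead.
available = ["dim_dates", "rms_fact_sustainability", "fc_budget"]
known, unknown = split_known_unknown(["dim_dates", "fc_budgets"], available)
print(known)    # ['dim_dates']
print(unknown)  # ['fc_budgets']
```

With a live executor, `executor.run(known)` would then receive only names the package reports as available, and `unknown` can be logged or raised as an error.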

### Using Individual Functions Directly

```python
from cpg_cdm_agg import (
    dim_dates,
    rms_delivery_delay,
    fc_budget,
    write_to_gold_iceberg
)

# Load the Silver layer table the function needs (uses the `session` created above)
dim_date_df = session.table("SILVER_LAYER.MAPPED_DATA.dim_date")

# Call the function
result_df = dim_dates(dim_date_df)

# Write to gold layer manually
result_df.write.mode("overwrite").save_as_table("GOLD_LAYER.AGGREGATED_DATA.dim_dates")

# Or use the helper function
row_count = write_to_gold_iceberg(result_df, "dim_dates", "GOLD_LAYER", "AGGREGATED_DATA")
```

## Using in Snowflake Stored Procedure

The package can run inside a Snowflake Python stored procedure. Upload the built wheel to a stage, then reference it in the procedure's `IMPORTS` clause.

### Step 1: Upload Package to Snowflake Stage

```sql
-- Create a stage for Python packages
CREATE OR REPLACE STAGE my_python_packages;

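-- Upload the wheel file (after building).
-- AUTO_COMPRESS = FALSE keeps the file name unchanged; by default PUT gzips
-- the upload and appends .gz, so the IMPORTS path below would not match.
PUT file:///path/to/cpg_cdm_agg-1.0.0-py3-none-any.whl @my_python_packages AUTO_COMPRESS = FALSE;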
```

### Step 2: Create Stored Procedure

```sql
CREATE OR REPLACE PROCEDURE run_gold_aggregations(
    GOLD_DB STRING,
    GOLD_SCHEMA STRING,
    SILVER_DB STRING,
    SILVER_SCHEMA STRING
)
RETURNS STRING
LANGUAGE PYTHON
RUNTIME_VERSION = '3.11'
PACKAGES = ('snowflake-snowpark-python')
IMPORTS = ('@my_python_packages/cpg_cdm_agg-1.0.0-py3-none-any.whl')
HANDLER = 'main'
AS
$$
from cpg_cdm_agg import GoldLayerExecutor, GoldLayerConfig

def main(session, gold_db, gold_schema, silver_db, silver_schema):
    config = GoldLayerConfig(
        gold_db=gold_db,
        gold_schema=gold_schema,
        silver_db=silver_db,
        silver_schema=silver_schema
    )
    
    executor = GoldLayerExecutor(session, config)
    result = executor.run()
    
    return f"Successful: {result['successful']}, Failed: {result['failed']}"
$$;

-- Execute the procedure
CALL run_gold_aggregations('GOLD_LAYER', 'AGGREGATED_DATA', 'SILVER_LAYER', 'MAPPED_DATA');
```

### Alternative: Run Specific Aggregations

```sql
CREATE OR REPLACE PROCEDURE run_selected_aggregations(
    GOLD_DB STRING,
    GOLD_SCHEMA STRING,
    SILVER_DB STRING,
    SILVER_SCHEMA STRING,
    AGGREGATION_NAMES ARRAY
)
RETURNS STRING
LANGUAGE PYTHON
RUNTIME_VERSION = '3.11'
PACKAGES = ('snowflake-snowpark-python')
IMPORTS = ('@my_python_packages/cpg_cdm_agg-1.0.0-py3-none-any.whl')
HANDLER = 'main'
AS
$$
from cpg_cdm_agg import GoldLayerExecutor, GoldLayerConfig

def main(session, gold_db, gold_schema, silver_db, silver_schema, aggregation_names):
    config = GoldLayerConfig(
        gold_db=gold_db,
        gold_schema=gold_schema,
        silver_db=silver_db,
        silver_schema=silver_schema
    )
    
    executor = GoldLayerExecutor(session, config)
    result = executor.run(list(aggregation_names))
    
    return f"Successful: {result['successful']}, Failed: {result['failed']}"
$$;

-- Run specific aggregations
CALL run_selected_aggregations(
    'GOLD_LAYER',
    'AGGREGATED_DATA',
    'SILVER_LAYER',
    'MAPPED_DATA',
    ARRAY_CONSTRUCT('dim_dates', 'fc_budget', 'rms_fact_sustainability')
);
```

## Available Aggregation Functions

### Complete List of 31 Functions

| # | Function Name | Domain | Description |
|---|--------------|--------|-------------|
| 1 | `dim_dates` | Dimension | Date dimension table |
| 2 | `rms_fact_sustainability` | RMS | Supplier sustainability metrics |
| 3 | `rms_delivery_delay` | RMS | Delivery delay analysis |
| 4 | `rms_procurement_lead_time` | RMS | Procurement lead time analysis |
| 5 | `rms_grn_to_po` | RMS | GRN to PO aggregated metrics |
| 6 | `pm_scrap_and_raw` | PM | Production scrap and raw material metrics |
| 7 | `pm_plan_adherence` | PM | Production plan adherence |
| 8 | `pm_downtime_per_shift_aggregated` | PM | Downtime aggregated metrics |
| 9 | `pm_downtime_per_shift_oeee` | PM | OEEE percentage metrics |
| 10 | `dsc_cogs` | DSC | Cost of goods sold metrics |
| 11 | `dsc_inventory_days` | DSC | Inventory days calculation |
| 12 | `dsc_damaged_return` | DSC | Damaged goods and returns |
| 13 | `dsc_transport_cost_per_unit` | DSC | Transportation cost per unit |
| 14 | `dsc_on_time_shipment_rate` | DSC | On-time shipment rate |
| 15 | `dsc_inbound_delivery_accuracy` | DSC | Inbound delivery accuracy |
| 16 | `reus_customer_retention_monthly_totals` | REUS | Monthly customer totals |
| 17 | `reus_customer_retention_monthly_retained` | REUS | Retained customers |
| 18 | `reus_customer_retention_monthly_new` | REUS | New customers |
| 19 | `reus_product_return_analysis` | REUS | Product return analysis |
| 20 | `reus_average_order_value_analysis` | REUS | Average order value |
| 21 | `reus_sales_per_outlet_analysis` | REUS | Sales per outlet |
| 22 | `cfm_conversion_rate` | CFM | Marketing conversion rate |
| 23 | `cfm_cost_per_conversion_spend` | CFM | Cost per conversion (spend) |
| 24 | `cfm_cost_per_conversion_engagement` | CFM | Cost per conversion (engagement) |
| 25 | `cfm_survey_response_rate` | CFM | Survey response rate |
| 26 | `cfm_customer_satisfaction_score` | CFM | Customer satisfaction score |
| 27 | `cfm_trade_promotion_effectiveness` | CFM | Trade promotion effectiveness |
| 28 | `fc_invoice_accuracy_analysis` | FC | Invoice accuracy analysis |
| 29 | `fc_budget` | FC | Budget vs actual spend |
| 30 | `fc_payment_timeliness_analysis` | FC | Payment timeliness |
| 31 | `sesg_emission_production_analysis` | SESG | CO2e emissions analysis |
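
Every function name in the table starts with its lowercase domain abbreviation (`dim`, `rms`, `pm`, `dsc`, `reus`, `cfm`, `fc`, `sesg`), which makes it easy to select one domain at a time. A minimal sketch, assuming that naming pattern holds; `group_by_prefix` is a hypothetical helper, not part of the package.

```python
from collections import defaultdict
from typing import Dict, Iterable, List


def group_by_prefix(names: Iterable[str]) -> Dict[str, List[str]]:
    """Group function names by the text before the first underscore."""
    groups: Dict[str, List[str]] = defaultdict(list)
    for name in names:
        prefix = name.split("_", 1)[0]
        groups[prefix].append(name)
    return dict(groups)


# A few names taken from the table above.
names = [
    "dim_dates",
    "rms_delivery_delay",
    "rms_grn_to_po",
    "fc_budget",
    "fc_invoice_accuracy_analysis",
]
groups = group_by_prefix(names)
print(groups["fc"])  # ['fc_budget', 'fc_invoice_accuracy_analysis']
```

Feeding it the list from `executor.get_available_aggregations()` and passing `groups["fc"]` to `executor.run(...)` would run only the Finance & Controlling aggregations.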

## Configuration Options

### GoldLayerConfig

```python
from cpg_cdm_agg import GoldLayerConfig

config = GoldLayerConfig(
    gold_db="GOLD_LAYER",         # Default: "GOLD_LAYER"
    gold_schema="AGGREGATED_DATA", # Default: "AGGREGATED_DATA"
    silver_db="SILVER_LAYER",     # Default: "SILVER_LAYER"
    silver_schema="MAPPED_DATA"   # Default: "MAPPED_DATA"
)
```
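
The examples elsewhere in this README address tables by three-part names such as `SILVER_LAYER.MAPPED_DATA.dim_date`. The sketch below shows how the four configuration values could map to such names. `ConfigSketch` is a stand-in that mirrors the documented fields and defaults; it is not the package's `GoldLayerConfig`, and the `silver_table` and `gold_table` helpers are hypothetical.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class ConfigSketch:
    """Stand-in with the four documented fields and defaults (illustration only)."""

    gold_db: str = "GOLD_LAYER"
    gold_schema: str = "AGGREGATED_DATA"
    silver_db: str = "SILVER_LAYER"
    silver_schema: str = "MAPPED_DATA"

    def silver_table(self, name: str) -> str:
        """Fully qualified name of a Silver layer source table."""
        return f"{self.silver_db}.{self.silver_schema}.{name}"

    def gold_table(self, name: str) -> str:
        """Fully qualified name of a Gold layer target table."""
        return f"{self.gold_db}.{self.gold_schema}.{name}"


default_cfg = ConfigSketch()
print(default_cfg.silver_table("dim_date"))  # SILVER_LAYER.MAPPED_DATA.dim_date

custom_cfg = ConfigSketch(gold_db="MY_GOLD_DB", gold_schema="MY_GOLD_SCHEMA")
print(custom_cfg.gold_table("dim_dates"))    # MY_GOLD_DB.MY_GOLD_SCHEMA.dim_dates
```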

## API Reference

### GoldLayerExecutor

```python
class GoldLayerExecutor:
    def __init__(self, session: Session, config: Optional[GoldLayerConfig] = None):
        """Initialize the executor with a Snowpark session and optional config."""
    
    def run(self, aggregations: Optional[List[str]] = None) -> Dict[str, Any]:
        """Run specified aggregations or all if none specified."""
    
    def run_single(self, aggregation_name: str) -> Dict[str, Any]:
        """Run a single aggregation by name."""
    
    def get_available_aggregations(self) -> List[str]:
        """Get list of all available aggregation function names."""
    
    def get_aggregation_info(self, aggregation_name: str) -> Optional[Dict[str, Any]]:
        """Get information about a specific aggregation."""
```

### Return Values

The `run()` method returns:

```python
{
    "total": 31,           # Total aggregations attempted
    "successful": 28,      # Successfully completed
    "failed": 3,           # Failed aggregations
    "results": [...],      # List of successful results
    "errors": [...]        # List of errors
}
```

Each entry in `results` and `errors` contains:

```python
{
    "name": "dim_dates",
    "status": "success",   # or "error"
    "message": "Created dim_dates",
    "rows": 1000
}
```
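
Given the two shapes above, a caller can report failures and collect the failed names for a retry. The following is a minimal sketch that assumes error entries carry at least the `name` and `message` keys shown; `summarize` and `failed_names` are hypothetical helpers, and the sample dictionary is made-up illustrative input, not real output.

```python
from typing import Any, Dict, List


def failed_names(result: Dict[str, Any]) -> List[str]:
    """Names of the aggregations listed under 'errors'."""
    return [entry["name"] for entry in result.get("errors", [])]


def summarize(result: Dict[str, Any]) -> str:
    """One headline line plus one line per failed aggregation."""
    lines = [f"{result['successful']}/{result['total']} aggregations succeeded"]
    for entry in result.get("errors", []):
        lines.append(f"  FAILED {entry['name']}: {entry.get('message', '')}")
    return "\n".join(lines)


# Illustrative input only; the error message text is invented.
sample = {
    "total": 3,
    "successful": 2,
    "failed": 1,
    "results": [
        {"name": "dim_dates", "status": "success",
         "message": "Created dim_dates", "rows": 1000},
        {"name": "fc_budget", "status": "success",
         "message": "Created fc_budget", "rows": 120},
    ],
    "errors": [
        {"name": "rms_grn_to_po", "status": "error",
         "message": "illustrative error text"},
    ],
}
print(summarize(sample))
print(failed_names(sample))
```

Passing `failed_names(result)` back to `executor.run(...)` would retry only the aggregations that failed.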

## Building the Package

### Build Wheel

```bash
cd cpg-cdm-agg
pip install build
python -m build
```

This creates:
- `dist/cpg_cdm_agg-1.0.0-py3-none-any.whl`
- `dist/cpg_cdm_agg-1.0.0.tar.gz`
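
The wheel file name follows the standard wheel naming layout: distribution, version, an optional build tag, then Python, ABI, and platform tags. Note that the distribution `cpg-cdm-agg` appears with underscores in the file name. A small sketch that splits a file name into those parts; `parse_wheel_filename` is a hypothetical helper written for illustration.

```python
from typing import Dict, Optional


def parse_wheel_filename(filename: str) -> Dict[str, Optional[str]]:
    """Split a wheel file name into its dash-separated components."""
    suffix = ".whl"
    if not filename.endswith(suffix):
        raise ValueError(f"Not a wheel file name: {filename}")
    parts = filename[: -len(suffix)].split("-")
    if len(parts) == 5:
        distribution, version, python_tag, abi_tag, platform_tag = parts
        build: Optional[str] = None
    elif len(parts) == 6:
        distribution, version, build, python_tag, abi_tag, platform_tag = parts
    else:
        raise ValueError(f"Unexpected number of components: {filename}")
    return {
        "distribution": distribution,
        "version": version,
        "build": build,
        "python_tag": python_tag,
        "abi_tag": abi_tag,
        "platform_tag": platform_tag,
    }


info = parse_wheel_filename("cpg_cdm_agg-1.0.0-py3-none-any.whl")
print(info["python_tag"], info["abi_tag"], info["platform_tag"])  # py3 none any
```

The `py3-none-any` tags indicate a pure-Python wheel with no compiled extension and no platform restriction.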

### Upload to PyPI

```bash
pip install twine
twine upload dist/*
```

## Required Silver Layer Tables

The package expects the following tables in your Silver layer:

- `dim_date`
- `dim_supplier_master`
- `dim_material_master`
- `dim_product_master`
- `dim_production_line`
- `dim_facility_master`
- `dim_warehouse_master`
- `dim_carrier_master`
- `dim_distribution_center`
- `dim_truck_route`
- `dim_channel`
- `dim_material_category`
- `dim_retail_outlet`
- `dim_customer_master`
- `dim_region_hierarchy`
- `dim_campaign_master`
- `dim_promotion_master`
- `dim_account_master`
- `fact_goods_receipt`
- `fact_purchase_order_line`
- `fact_purchase_order_header`
- `fact_production_batch`
- `fact_shift_log`
- `fact_production_schedule_line`
- `fact_production_schedule_header`
- `fact_line_efficiency_log`
- `fact_equipment_downtime_log`
- `fact_product_inventory`
- `fact_sales_order_line`
- `fact_outbound_shipment`
- `fact_shipment_line`
- `fact_return_order`
- `fact_transportation_cost`
- `fact_inbound_shipment`
- `fact_sales_order`
- `fact_customer_visit`
- `fact_ad_impression`
- `fact_marketing_spend`
- `fact_consumer_survey`
- `fact_promotion_plan`
- `fact_trade_promotion`
- `fact_invoice`
- `fact_payment`
- `fact_budget`
- `fact_general_ledger`
- `fact_emission_record`
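
Before a full run it can help to confirm that these tables exist. The sketch below compares a required list against the table names found in the Silver schema. How you obtain the existing names is up to you (for example, from the `name` column of a `SHOW TABLES` query); `missing_tables` is a hypothetical helper, and the comparison is case-insensitive because Snowflake stores unquoted identifiers in uppercase.

```python
from typing import Iterable, List


def missing_tables(required: Iterable[str], existing: Iterable[str]) -> List[str]:
    """Return required table names absent from `existing`, ignoring case."""
    existing_upper = {name.upper() for name in existing}
    return [name for name in required if name.upper() not in existing_upper]


# A few names from the list above; `existing` is illustrative input.
required = ["dim_date", "fact_budget", "fact_emission_record"]
existing = ["DIM_DATE", "FACT_BUDGET"]
print(missing_tables(required, existing))  # ['fact_emission_record']
```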

## License

MIT License

## Contributing

Contributions are welcome. Please submit a pull request.
