Metadata-Version: 2.4
Name: dbx-elt-utils
Version: 0.1.3
Summary: Librería de utilidades comunes para pipelines ELT orientados a Databricks (DLT).
Author-email: DBX Analyst <developer@example.com>
Requires-Python: >=3.9
Provides-Extra: local
Requires-Dist: databricks-connect>=14.0.0; extra == 'local'
Description-Content-Type: text/markdown

# DBX ELT Utilities

`dbx-elt-utils` es una librería de utilidades diseñada para simplificar y unificar la creación de pipelines **Delta Live Tables (DLT)** en Databricks. Proporciona métodos estandarizados para ingesta híbrida, setup automático del entorno (Local vs Nube) y limpieza de datos estructurados.

## 🚀 Instalación

**Para uso en producción (Pipelines de Databricks):**
```bash
pip install dbx-elt-utils
```
*(Nota: La librería no fuerza la instalación interna de `pyspark`, previniendo conflictos con el Databricks Runtime).*

**Para desarrollo local (VS Code / Python Kernel):**
Incluye el SDK oficial para que tu PC pueda conectarse y simular ejecuciones.
```bash
pip install dbx-elt-utils[local]
```

---

## 🛠️ Cómo Inicializar un Notebook

Olvídate de configurar el espantoso `sys.path`. Solo necesitas estas 5 líneas al inicio de cualquier notebook (Bronze, Silver o Gold):

```python
from dbx_elt_utils.notebook_utils import init_notebook

# Detecta automáticamente si estás en la Nube de Databricks o probando en local.
# Si estás en local, activa un "Mock" de DLT para que no tire error de importación.
notebook = init_notebook()

# Variables listas para usar:
env = notebook.env       # Ej: '_dev' o '_prod'
spark = notebook.spark   # Tu sesión de PySpark
dlt = notebook.dlt       # El módulo delta live tables (o el mock si estás local)
```

---

## 📦 Módulos Principales y Funcionalidad

Esta librería está dividida en 3 módulos lógicos:

### 1. `ingest_utils.py` (Ingesta Inteligente)
Diseñado para la capa **Bronze**. Su función principal es abstraer la complejidad técnica de AutoLoader y lectura de Unity Catalog.

* `ingesta_hibrida(spark, origen, tipo="auto_detect", formato_archivo="json", env_suffix="_dev")`
  * **Qué hace:** Detecta si tu origen es una ruta RAW (como `abfss://...`) o una tabla externa de Databricks para crear un *Streaming DataFrame* perfecto para DLT.
  * **Súper poder:** Si es una ruta (AutoLoader), activa mágicamente la opción `schemaEvolutionMode="addNewColumns"`. Esto evita que el pipeline se rompa si mañana llega un CSV con más columnas de las esperadas.
  * **Uso típico:** `df = ingesta_hibrida(spark, "abfss://contenedor@storage.dfs.core.windows.net/data.csv", formato_archivo="csv")`

### 2. `clean_utils.py` (Limpieza y Estandarización)
Diseñado para la capa **Silver**.

* `extraer_valor_array_string(columna)`
  * **Qué hace:** Las APIs externas muchas veces entregan valores atrapados en arreglos de texto (ej. `["12345"]`). Esta función recibe la columna de Spark, elimina corchetes y comillas limpiamente usando expresiones regulares, convirtiéndolo a texto puro (`12345`).
  * **Súper poder:** Si el string viene vacío (`"[]"`), lo convierte formalmente a un valor `NULL` real de base de datos.
  * **Uso típico:** `df.withColumn("id_limpio", extraer_valor_array_string(col("id_crudo")))`

### 3. `notebook_utils.py` (Pruebas Locales Seguras)
Ideal para tus ciclos de prueba en VS Code (Mock Local). Además del `init_notebook()`, incluye herramientas de protección:

* `get_local_source_table(spark, source_official)`
  * **Qué hace:** Cuando pruebas la capa Silver/Gold localmente, no quieres arriesgarte a sobrescribir tablas oficiales en Unity Catalog (`table`). Esta función averigua si corriste un "Test Bronze Local" antes. Si sí, fuerza a tu notebook Silver a leer los datos de prueba (`temporary.table_tmp_sql`). Si no, lee la oficial.

* `clean_local_test_table(spark, source_table)`
  * **Qué hace:** Aplica el patrón *Read & Destroy*. Destruye las tablas temporales (`_tmp_sql`) después de que haces `df.show()`, manteniendo el catálogo limpio.

* `get_test_spark()`
  * **Qué hace:** Crea e invoca la sesión de Databricks Connect. 

* `stop_local_spark()`
  * **Qué hace:** El salvavidas principal. Al terminar una prueba local, mata el proceso persistente de Java/Databricks-Connect. Sin esta función, tu kernel de local de Python se quedaría colgado hasta que reinicies VS Code.
  * **Uso típico:** Poner siempre en la cláusula `finally:` de tus bloques de test.

---

##  Ejemplo de Uso: Capa Bronze Completa

Así luce un flujo típico de ingesta usando DBX ELT Utils, reduciendo toda la complejidad técnica a un par de funciones limpias:

`python
# 1. Setup del Entorno
from dbx_elt_utils.notebook_utils import init_notebook

# Detecta entorno (DBX vs Local) y prepara variables
notebook = init_notebook()
env = notebook.env
spark = notebook.spark
dlt = notebook.dlt

# 2. Configuración y Lógica
from dbx_elt_utils.ingest_utils import ingesta_hibrida

# 1. DEFINICIÓN DE ORIGEN Y DESTINO
SOURCE_LANDING = "landing_catalog.schema.table_name"   # <-- Poner origen real aquí

CATALOG_TARGET = f"bronze{env}"     # Normalmente no se cambia ('bronze_dev' o 'bronze_prod')
SCHEMA_TARGET  = "mi_esquema"       # <-- Tu esquema funcional (ej: 'ventas' o 'recursos_humanos')
TARGET_TABLE   = "mi_tabla_raw"     # <-- Nombre de la tabla destino

def transformar_bronze(spark_session, is_streaming=True):
    """
    Función pura que define la lógica de extracción y transformación inicial.
    Al estar en una función, podemos testearla localmente sin depender de DLT.
    """
    # 2. INGESTA HÍBRIDA
    # 'auto_detect' decide si leer Delta Table o Archivos (CloudFiles).
    # Ya maneja automáticamente si la lectura debe ser en formato continuado (Streaming) o como foto (Batch).
    df_source = ingesta_hibrida(
        spark_session, 
        SOURCE_LANDING, 
        tipo="auto_detect"
    )
    
    # 3. TRANSFORMAR CON SQL
    # Registramos una vista temporal. Esto es un puente mágico de PySpark a SQL.
    # Todo lo que tenga 'df_source' ahora se puede consultar vía SQL.
    df_source.createOrReplaceTempView("v_raw_source")
    
    return spark_session.sql("""
        SELECT 
            *,                                  -- Traer todo el origen inalterado
            current_timestamp() as _ingested_at -- Añadimos Metadata de auditoría obligatoria en Bronze
        FROM v_raw_source
    """)


# 3. Empaquetado DLT
# Este bloque solo se ejecuta realmente en el motor interno de Databricks DLT.
# En código local, el decorador es ignorado limpiamente.

@dlt.table(
    name=TARGET_TABLE,
    comment=f"Tabla Bronze cruda desde {SOURCE_LANDING}",
    table_properties={"quality": "bronze",
        "tags": "governance=demo"
    }
)
def dlt_wrapper():
    # DLT siempre funciona en Streaming de fondo para las cargas incrementales (Bronze).
    return transformar_bronze(spark, is_streaming=True)


`
