Módulo 6: Taller Práctico
Del Pipeline de Datos al Modelo de Machine Learning
Agenda del Taller Práctico
- Fase 1: Construcción del Pipeline Modularizado
- Configuración del entorno y carga de datos reales (Olist).
- Ingesta a la capa Bronce.
- Validación de Calidad de Datos.
- Transformación a la capa Silver (Modelo Dimensional).
- Agregación de negocio en la capa Gold.
- Orquestación del pipeline con Databricks Jobs.
- Fase 2: Del Dato al Insight con Machine Learning
- Ingeniería de Características para segmentación de clientes (RFM).
- Entrenamiento de un modelo K-Means con Scikit-learn.
- Seguimiento de experimentos con MLflow.
- Fase 3: Monitoreo y Visualización
- Creación de tableros para monitorear la salud del pipeline y los KPIs de negocio.
Fase 1: Pipeline Modularizado
Se transformará el proceso de ETL en una serie de notebooks, simulando un job de producción.
Notebook 0: `00_Setup_Environment`
Esta tarea se ejecuta una sola vez para preparar la infraestructura en Unity Catalog. Un catálogo dedicado aísla el proyecto y facilita la gobernanza.
# Celda 1: Definir la configuración del catálogo y esquemas
catalog_name = "sesion_5"
bronze_schema = "bronze"
# ... (y así sucesivamente para silver, gold, auditoria)
# Celda 2: Crear la infraestructura en Unity Catalog
spark.sql(f"CREATE CATALOG IF NOT EXISTS {catalog_name}")
# ... (creación de esquemas)
# Celda 3: Crear el volumen para los archivos crudos
bronze_volume_path = f"/Volumes/{catalog_name}/{bronze_schema}/raw_files"
spark.sql(f"CREATE VOLUME IF NOT EXISTS {catalog_name}.{bronze_schema}.raw_files")
print(f"Por favor, cargue los archivos CSV del dataset Olist en: {bronze_volume_path}")
Acción del Participante: Cargar los archivos del dataset Olist de Kaggle en el volumen especificado.
Notebook 1: `01_Bronze_Ingestion`
Esta tarea lee los archivos CSV del volumen y los persiste como tablas Delta en la capa Bronce, manteniendo los datos en su estado crudo original.
# Celda 2: Función de ayuda para cargar y escribir tablas
def ingest_csv_to_bronze(file_name, table_name):
df = spark.read.option("header", "true").option("inferSchema", "true").csv(f"{bronze_volume_path}/{file_name}")
df.write.format("delta").mode("overwrite").saveAsTable(f"{catalog_name}.{bronze_schema}.{table_name}")
print(f"Tabla '{table_name}' creada en la capa Bronce.")
# Celda 3: Ejecutar la ingesta para cada archivo
ingest_csv_to_bronze("olist_customers_dataset.csv", "customers_bronze")
# ... (y así para el resto de archivos)
🏆 Desafío: Completar el notebook para ingestar los archivos `olist_sellers_dataset.csv` y `product_category_name_translation.csv`.
Notebook 2: `02_DataQuality_Validation`
Este notebook actúa como un "guardián de calidad". Utiliza una función para ejecutar reglas de negocio. Si una regla se rompe (la consulta devuelve más de 0 filas), el pipeline se detiene con una excepción.
# Celda 1: Función de ayuda para validaciones de calidad
def ejecutar_validacion_calidad(query, descripcion_validacion):
print(f"Ejecutando validación: '{descripcion_validacion}'...")
df_resultado = spark.sql(query)
conteo_filas_malas = df_resultado.first()[0]
if conteo_filas_malas > 0:
raise Exception(f"VALIDACIÓN FALLIDA: Se encontraron {conteo_filas_malas} filas que violan la regla.")
else:
print(f"VALIDACIÓN EXITOSA ✔️")
# Celda 3: Ejecutar las reglas de negocio
# REGLA 1: La llave primaria de un cliente NUNCA debe ser nula.
query_pk_nula_cliente = "SELECT COUNT(*) FROM customers_bronze_vw WHERE customer_id IS NULL"
ejecutar_validacion_calidad(query_pk_nula_cliente, "La columna 'customer_id' no debe contener nulos.")
# REGLA 3: No debe haber pedidos sin al menos un item.
query_pedidos_sin_items = """
SELECT COUNT(o.order_id)
FROM orders_bronze_vw o
LEFT JOIN order_items_bronze_vw i ON o.order_id = i.order_id
WHERE i.order_id IS NULL
"""
ejecutar_validacion_calidad(query_pedidos_sin_items, "Existen pedidos que no tienen ningún item asociado.")
🤔 Punto de Análisis: La REGLA 3 fallará. ¿Qué implicaciones tiene esto para el `JOIN` en la capa Silver? ¿Por qué un `INNER JOIN` es una estrategia efectiva para manejar implícitamente este problema de calidad?
Notebook 3: `03_Silver_Transformation`
El corazón del ETL. Se transforman los datos validados de Bronce en un modelo dimensional limpio (Esquema en Estrella) en la capa Silver. Esta capa se convierte en la "fuente de la verdad" para el análisis.
-- Celda 2: Crear Dimensiones (dim_customers, dim_products)
CREATE OR REPLACE TABLE dim_products AS
SELECT
p.product_id,
p.product_category_name,
t.product_category_name_english AS product_category_name_en,
...
FROM sesion_5.bronze.products_bronze p
LEFT JOIN sesion_5.bronze.category_translation_bronze t ON p.product_category_name = t.product_category_name;
-- Celda 3: Crear Tabla de Hechos (fact_orders) particionada
CREATE OR REPLACE TABLE fact_orders
PARTITIONED BY (order_purchase_year)
AS
SELECT ...
FROM sesion_5.bronze.orders_bronze o
JOIN sesion_5.bronze.order_items_bronze i ON o.order_id = i.order_id
...
🏆 Desafío: Modificar la consulta de creación de `fact_orders` para añadir una nueva columna llamada `total_value` que sea la suma de `price` y `freight_value`.
Notebook 4: `04_Gold_Aggregation`
Se construyen tablas agregadas y pre-calculadas en la capa Gold. Estas tablas están diseñadas para responder preguntas de negocio específicas de manera muy rápida, alimentando directamente los tableros de BI.
-- Celda 2: Crear tabla de ventas mensuales por categoría
CREATE OR REPLACE TABLE monthly_sales_by_category_gold AS
SELECT
YEAR(fo.order_purchase_date) AS anio,
MONTH(fo.order_purchase_date) AS mes,
dp.product_category_name_en AS categoria_producto,
COUNT(DISTINCT fo.order_id) AS numero_pedidos,
SUM(fo.payment_value) AS ingresos_totales
FROM silver.fact_orders fo
JOIN silver.dim_products dp ON fo.product_id = dp.product_id
WHERE fo.order_status = 'delivered'
GROUP BY anio, mes, categoria_producto;
🏆 Desafío: Crear una nueva tabla Gold llamada `customer_lifetime_value_gold` que calcule, para cada `customer_unique_id`, el gasto total y el número total de pedidos que ha realizado.
Fase 2: Del Dato al Insight con ML
Se utilizarán los datos limpios de la capa Silver para segmentar clientes.
Notebook 5: Feature Engineering
Se crea una tabla `gold` con características específicas para el modelo, calculando métricas RFM (Recencia, Frecuencia, Monetario) con Spark SQL.
CREATE OR REPLACE TABLE
customer_features_gold AS
WITH rfm_metrics AS (
SELECT
c.customer_unique_id,
DATEDIFF(..., MAX(fo.order_purchase_date)) AS recency,
COUNT(DISTINCT fo.order_id) AS frequency,
SUM(fo.payment_value) AS monetary
FROM silver.fact_orders fo ...
) ...
Notebook 6: Model Training
Se entrena un modelo K-Means con `sklearn` y se registra la ejecución como un experimento de MLflow para su seguimiento y reproducibilidad.
# Configurar y ejecutar el experimento
user_email = spark.sql("SELECT current_user()").first()[0]
mlflow.set_experiment(f"/Users/{user_email}/...")
with mlflow.start_run() as run:
mlflow.autolog()
kmeans = KMeans(n_clusters=4, ...)
kmeans.fit(features_scaled)
silhouette = silhouette_score(...)
mlflow.log_metric("silhouette_score", silhouette)
Fase 3: Creación de Tableros
Se crearán tableros para monitorear la salud del pipeline y los KPIs de negocio.
Paso A: Crear Datasets
En la pestaña **Data** del Dashboard, se crean los datasets a partir de consultas SQL guardadas en el notebook `05_Dashboard_Queries`.
-- Ejemplo de consulta para un dataset
SELECT
categoria_producto,
SUM(ingresos_totales) AS ingresos
FROM gold.monthly_sales_by_category_gold
GROUP BY categoria_producto
ORDER BY ingresos DESC LIMIT 10;
Paso B: Construir Visualizaciones
En la pestaña **Canvas**, se añaden widgets de visualización y se conectan a los datasets creados en el paso anterior para construir los tableros.
Conclusión del Taller
Al finalizar esta sesión, los participantes habrán completado el ciclo de vida de un proyecto de datos de extremo a extremo:
- ✔️ Construido un pipeline de datos modular y automatizable.
- ✔️ Implementado un framework de validación de calidad de datos.
- ✔️ Aplicado la arquitectura Medallion con datos reales.
- ✔️ Realizado ingeniería de características para un caso de uso de ML.
- ✔️ Entrenado y evaluado un modelo de clustering con seguimiento de experimentos.
- ✔️ Creado tableros para el monitoreo operacional y de negocio.