
Exercise: Data Warehouse with Amazon Redshift and AWS Glue

Part 1: Infrastructure Setup and ETL

Scenario

We will implement a data warehouse that includes:

  • Data loading with AWS Glue
  • Transformation with Glue ETL
  • Storage in Redshift
  • Analysis and optimized queries

Project Structure

data-warehouse/
├── data/
│   ├── raw/
│   │   ├── customers.csv
│   │   ├── orders.csv
│   │   └── products.csv
│   └── processed/
│       └── transformed/

├── glue/
│   ├── jobs/
│   │   ├── customer_transform.py
│   │   ├── order_transform.py
│   │   └── product_transform.py
│   └── scripts/
│       └── quality_check.py

├── redshift/
│   ├── schemas/
│   │   ├── staging/
│   │   │   └── tables.sql
│   │   └── warehouse/
│   │       ├── dimensions.sql
│   │       └── facts.sql
│   │
│   ├── scripts/
│   │   ├── create_cluster.py
│   │   ├── setup_users.sql
│   │   └── optimize_tables.sql
│   │
│   └── queries/
│       ├── analysis/
│       │   └── sales_analysis.sql
│       └── maintenance/
│           └── vacuum_analyze.sql

├── infrastructure/
│   ├── vpc/
│   │   └── setup_vpc.py
│   └── security/
│       ├── roles.json
│       └── policies.json

├── scripts/
│   ├── deploy.py
│   └── monitor.py

└── config/
    ├── redshift_config.json
    └── glue_config.json

1. Redshift Setup

1.1 Create the Redshift Cluster

python
# redshift/scripts/create_cluster.py
import boto3
import json

def create_redshift_cluster():
    redshift = boto3.client('redshift')
    
    with open('config/redshift_config.json', 'r') as f:
        config = json.load(f)
    
    try:
        response = redshift.create_cluster(
            ClusterIdentifier=config['cluster_identifier'],
            NodeType='ra3.xlplus',
            NumberOfNodes=2,
            MasterUsername=config['master_username'],
            MasterUserPassword=config['master_password'],
            DBName='analytics_db',
            VpcSecurityGroupIds=[config['security_group_id']],
            ClusterSubnetGroupName=config['subnet_group'],
            PubliclyAccessible=False,
            Port=5439
        )
        return response['Cluster']
    except Exception as e:
        print(f"Error creating cluster: {str(e)}")
        raise
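
The cluster takes a few minutes to become available. As a convenience you can wait for it before applying the schemas; the following is a minimal sketch (the file name is hypothetical and not part of the tree above):

python
# redshift/scripts/wait_for_cluster.py (hypothetical helper)
import boto3

def wait_for_cluster(cluster_identifier):
    redshift = boto3.client('redshift')

    # boto3 ships a waiter that polls DescribeClusters until the status is 'available'
    waiter = redshift.get_waiter('cluster_available')
    waiter.wait(ClusterIdentifier=cluster_identifier)

    # Return the endpoint so later scripts can connect
    cluster = redshift.describe_clusters(
        ClusterIdentifier=cluster_identifier
    )['Clusters'][0]
    return cluster['Endpoint']['Address'], cluster['Endpoint']['Port']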

1.2 Define the Schemas

sql
-- redshift/schemas/staging/tables.sql
-- Staging tables
CREATE SCHEMA staging;

CREATE TABLE staging.customers (
    customer_id VARCHAR(50),
    name VARCHAR(100),
    email VARCHAR(100),
    address VARCHAR(200),
    registration_date TIMESTAMP,
    last_purchase_date TIMESTAMP
);

CREATE TABLE staging.orders (
    order_id VARCHAR(50),
    customer_id VARCHAR(50),
    order_date TIMESTAMP,
    total_amount DECIMAL(10,2),
    status VARCHAR(20)
);

CREATE TABLE staging.products (
    product_id VARCHAR(50),
    name VARCHAR(100),
    category VARCHAR(50),
    price DECIMAL(10,2),
    stock INTEGER
);
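
-- Note: the fact load in Part 2 joins staging.order_items, which is not defined
-- anywhere else in this exercise; a minimal definition is assumed here so that
-- query can run.
CREATE TABLE staging.order_items (
    order_id VARCHAR(50),
    product_id VARCHAR(50),
    quantity INTEGER
);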

-- redshift/schemas/warehouse/dimensions.sql
-- Dimension tables
CREATE SCHEMA warehouse;

CREATE TABLE warehouse.dim_customer (
    customer_key INTEGER IDENTITY(1,1),
    customer_id VARCHAR(50),
    name VARCHAR(100),
    email VARCHAR(100),
    address VARCHAR(200),
    registration_date TIMESTAMP,
    last_purchase_date TIMESTAMP
)
DISTSTYLE KEY
DISTKEY (customer_id)
SORTKEY (customer_id);

CREATE TABLE warehouse.dim_product (
    product_key INTEGER IDENTITY(1,1),
    product_id VARCHAR(50),
    name VARCHAR(100),
    category VARCHAR(50),
    price DECIMAL(10,2)
)
DISTSTYLE KEY
DISTKEY (product_id)
SORTKEY (product_id);

-- redshift/schemas/warehouse/facts.sql
-- Fact table
CREATE TABLE warehouse.fact_sales (
    sale_key INTEGER IDENTITY(1,1),
    order_id VARCHAR(50),
    customer_key INTEGER,
    product_key INTEGER,
    order_date TIMESTAMP,
    quantity INTEGER,
    unit_price DECIMAL(10,2),
    total_amount DECIMAL(10,2)
)
DISTSTYLE KEY
DISTKEY (customer_key)
SORTKEY (order_date);

2. AWS Glue Setup

2.1 Transformation Job

python
# glue/jobs/customer_transform.py
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

def transform_customers(glueContext):
    # Read the raw customer data registered in the Glue Data Catalog
    customers_df = glueContext.create_dynamic_frame.from_catalog(
        database="raw_data",
        table_name="customers"
    )
    
    # Rename and cast the source columns to match staging.customers
    transformed_df = customers_df.apply_mapping([
        ("id", "string", "customer_id", "string"),
        ("full_name", "string", "name", "string"),
        ("email", "string", "email", "string"),
        ("address", "string", "address", "string"),
        ("reg_date", "timestamp", "registration_date", "timestamp"),
        ("last_purchase", "timestamp", "last_purchase_date", "timestamp")
    ])
    
    # Write to Redshift through the catalog connection, staging files in S3
    glueContext.write_dynamic_frame.from_jdbc_conf(
        frame=transformed_df,
        catalog_connection="redshift_connection",
        connection_options={
            "dbtable": "staging.customers",
            "database": "analytics_db"
        },
        redshift_tmp_dir="s3://bucket/temp/"
    )

# Glue executes the script top to bottom: initialize the job, run the transform, commit
args = getResolvedOptions(sys.argv, ['JOB_NAME'])
glueContext = GlueContext(SparkContext.getOrCreate())
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
transform_customers(glueContext)
job.commit()
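
The script above still has to be registered and launched as a Glue job. A minimal boto3 sketch of that step (the job name, script location, role ARN, and worker settings are placeholders):

python
# scripts/deploy_glue_job.py (hypothetical helper)
import boto3

def deploy_customer_job(script_s3_path, role_arn):
    glue = boto3.client('glue')

    # Register the uploaded customer_transform.py as a Glue job
    glue.create_job(
        Name='customer-transform-job',
        Role=role_arn,
        Command={
            'Name': 'glueetl',
            'ScriptLocation': script_s3_path,
            'PythonVersion': '3'
        },
        GlueVersion='4.0',
        WorkerType='G.1X',
        NumberOfWorkers=2,
        Connections={'Connections': ['redshift_connection']}
    )

    # Kick off a run and return its id for monitoring
    run = glue.start_job_run(JobName='customer-transform-job')
    return run['JobRunId']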

2.2 Quality Checks

python
# glue/scripts/quality_check.py
from pyspark.sql.functions import col, sum as spark_sum

def validate_data(glueContext, table_name):
    # Load the staging table and convert it to a Spark DataFrame
    df = glueContext.create_dynamic_frame.from_catalog(
        database="staging",
        table_name=table_name
    ).toDF()
    
    # Count nulls per column
    null_counts = df.select([
        spark_sum(col(c).isNull().cast("int")).alias(c)
        for c in df.columns
    ]).collect()[0]
    
    # Count fully duplicated rows
    duplicate_count = df.count() - df.dropDuplicates().count()
    
    return {
        "null_counts": null_counts,
        "duplicate_count": duplicate_count
    }
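
A sketch of how the check might gate a load, assuming a glueContext created as in the job above (the threshold and error handling are illustrative):

python
# Hypothetical usage inside a job: stop before loading if the staging data looks bad
metrics = validate_data(glueContext, "customers")

if metrics["duplicate_count"] > 0 or any(count > 0 for count in metrics["null_counts"]):
    raise ValueError(f"Quality check failed for staging customers: {metrics}")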

3. Infrastructure Scripts

3.1 VPC Setup

python
# infrastructure/vpc/setup_vpc.py
import boto3

def setup_vpc():
    ec2 = boto3.client('ec2')
    
    # Create the VPC
    vpc = ec2.create_vpc(
        CidrBlock='10.0.0.0/16',
        TagSpecifications=[
            {
                'ResourceType': 'vpc',
                'Tags': [
                    {
                        'Key': 'Name',
                        'Value': 'redshift-vpc'
                    }
                ]
            }
        ]
    )
    
    # Create two subnets in different Availability Zones
    subnet1 = ec2.create_subnet(
        VpcId=vpc['Vpc']['VpcId'],
        CidrBlock='10.0.1.0/24',
        AvailabilityZone='us-east-1a'
    )
    
    subnet2 = ec2.create_subnet(
        VpcId=vpc['Vpc']['VpcId'],
        CidrBlock='10.0.2.0/24',
        AvailabilityZone='us-east-1b'
    )
    
    return {
        'vpc_id': vpc['Vpc']['VpcId'],
        'subnet_ids': [
            subnet1['Subnet']['SubnetId'],
            subnet2['Subnet']['SubnetId']
        ]
    }
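
create_redshift_cluster() in section 1.1 expects a cluster subnet group (the subnet_group config key). A hedged sketch that builds one from the subnets returned by setup_vpc() (file and group names are placeholders):

python
# infrastructure/vpc/create_subnet_group.py (hypothetical helper)
import boto3

def create_redshift_subnet_group(subnet_ids):
    redshift = boto3.client('redshift')

    # Group the private subnets so the cluster's 'subnet_group' setting can reference them
    response = redshift.create_cluster_subnet_group(
        ClusterSubnetGroupName='redshift-subnet-group',
        Description='Subnets for the analytics cluster',
        SubnetIds=subnet_ids
    )
    return response['ClusterSubnetGroup']['ClusterSubnetGroupName']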

Part 1 Verification

1. Verify Redshift

  • [ ] Cluster created
  • [ ] Schemas created
  • [ ] Connectivity established (see the sketch below)
  • [ ] Permissions configured
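
The cluster and connectivity items can be scripted. A minimal sketch, assuming the cluster from section 1.1 and placeholder credentials:

python
# scripts/verify_redshift.py (hypothetical helper)
import boto3
import psycopg2

def verify_redshift(cluster_identifier, user, password):
    # Check that the cluster reports 'available'
    redshift = boto3.client('redshift')
    cluster = redshift.describe_clusters(
        ClusterIdentifier=cluster_identifier
    )['Clusters'][0]
    assert cluster['ClusterStatus'] == 'available', cluster['ClusterStatus']

    # Basic connectivity check: run a trivial query against the endpoint
    conn = psycopg2.connect(
        dbname='analytics_db',
        host=cluster['Endpoint']['Address'],
        port=cluster['Endpoint']['Port'],
        user=user,
        password=password
    )
    with conn.cursor() as cur:
        cur.execute("SELECT 1;")
        assert cur.fetchone()[0] == 1
    conn.close()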

2. Verify Glue

  • [ ] Crawlers configured
  • [ ] Jobs created
  • [ ] Connections tested
  • [ ] Transformations working

3. Verify Infrastructure

  • [ ] VPC configured
  • [ ] Subnets created
  • [ ] Security groups active
  • [ ] IAM roles configured

Common Troubleshooting

Redshift Errors

  1. Check the cluster status
  2. Review connection logs
  3. Check IAM permissions

Glue Errors

  1. Check the transformation scripts
  2. Review connections
  3. Check S3 permissions

Network Errors

  1. Check the VPC configuration
  2. Review security groups
  3. Check routes and gateways

Part 2: Data Loading, Optimization, and Analysis

1. Loading Data into Redshift

1.1 Load Data from S3

python
# redshift/scripts/load_data.py
import boto3
import psycopg2

def load_from_s3():
    # Connect to Redshift (host and credentials are placeholders)
    conn = psycopg2.connect(
        dbname='analytics_db',
        host='your-cluster.region.redshift.amazonaws.com',
        port=5439,
        user='admin',
        password='your-password'
    )
    
    cur = conn.cursor()
    
    try:
        # Load data with the COPY command
        copy_command = """
            COPY staging.customers
            FROM 's3://your-bucket/processed/customers/'
            IAM_ROLE 'arn:aws:iam::account:role/RedshiftLoadRole'
            FORMAT CSV
            IGNOREHEADER 1
            REGION 'us-east-1';
        """
        cur.execute(copy_command)
        
        conn.commit()
    except Exception as e:
        conn.rollback()
        raise e
    finally:
        cur.close()
        conn.close()
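
If the COPY fails or rejects rows, Redshift records the details in the stl_load_errors system table. A short helper to surface recent load errors (it reuses an open psycopg2 connection like the one above):

python
# redshift/scripts/check_load_errors.py (hypothetical helper)
def check_recent_load_errors(conn):
    # conn is an open psycopg2 connection like the one in load_from_s3()
    query = """
        SELECT starttime, filename, line_number, colname, err_reason
        FROM stl_load_errors
        ORDER BY starttime DESC
        LIMIT 10;
    """
    with conn.cursor() as cur:
        cur.execute(query)
        return cur.fetchall()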

1.2 Transform into the Final Tables

sql
-- redshift/scripts/transform_to_warehouse.sql
-- Load the customer dimension
INSERT INTO warehouse.dim_customer (
    customer_id,
    name,
    email,
    address,
    registration_date,
    last_purchase_date
)
SELECT DISTINCT
    customer_id,
    name,
    email,
    address,
    registration_date,
    last_purchase_date
FROM staging.customers;
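
-- Load the product dimension (mirrors the customer load above; without it the
-- fact query below cannot resolve product_key)
INSERT INTO warehouse.dim_product (
    product_id,
    name,
    category,
    price
)
SELECT DISTINCT
    product_id,
    name,
    category,
    price
FROM staging.products;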

-- Load the sales facts
INSERT INTO warehouse.fact_sales (
    order_id,
    customer_key,
    product_key,
    order_date,
    quantity,
    unit_price,
    total_amount
)
SELECT 
    o.order_id,
    dc.customer_key,
    dp.product_key,
    o.order_date,
    oi.quantity,
    p.price as unit_price,
    (oi.quantity * p.price) as total_amount
FROM staging.orders o
JOIN staging.order_items oi ON o.order_id = oi.order_id
JOIN warehouse.dim_customer dc ON o.customer_id = dc.customer_id
JOIN warehouse.dim_product dp ON oi.product_id = dp.product_id
JOIN staging.products p ON oi.product_id = p.product_id;

2. Performance Optimization

2.1 Distribution and Sort Key Configuration

sql
-- redshift/scripts/optimize_tables.sql
-- Review current distribution (svv_table_info has one row per table)
SELECT 
    "table",
    diststyle,
    tbl_rows
FROM svv_table_info
WHERE schema = 'warehouse'
ORDER BY tbl_rows DESC;

-- Adjust distribution and sort keys
ALTER TABLE warehouse.fact_sales ALTER DISTSTYLE KEY DISTKEY customer_key;
ALTER TABLE warehouse.fact_sales ALTER COMPOUND SORTKEY (order_date, customer_key);

-- Vacuum and analyze
VACUUM warehouse.fact_sales;
ANALYZE warehouse.fact_sales;

2.2 Maintenance Script

python
# redshift/scripts/maintenance.py
def perform_maintenance():
    maintenance_queries = [
        "VACUUM DELETE ONLY warehouse.fact_sales;",
        "ANALYZE warehouse.fact_sales;",
        "VACUUM SORT ONLY warehouse.dim_customer;",
        "ANALYZE warehouse.dim_customer;"
    ]
    
    # get_redshift_connection() is a project helper that returns a psycopg2 connection
    conn = get_redshift_connection()
    conn.autocommit = True  # VACUUM cannot run inside a transaction block
    cur = conn.cursor()
    
    try:
        for query in maintenance_queries:
            try:
                cur.execute(query)
            except Exception as e:
                print(f"Error executing {query}: {str(e)}")
    finally:
        cur.close()
        conn.close()

3. Analysis Queries

3.1 Sales Analysis

sql
-- redshift/queries/analysis/sales_analysis.sql
-- Sales by customer and category
WITH customer_category_sales AS (
    SELECT 
        dc.name as customer_name,
        dp.category,
        SUM(fs.total_amount) as total_sales,
        COUNT(DISTINCT fs.order_id) as order_count,
        AVG(fs.total_amount) as avg_order_value
    FROM warehouse.fact_sales fs
    JOIN warehouse.dim_customer dc ON fs.customer_key = dc.customer_key
    JOIN warehouse.dim_product dp ON fs.product_key = dp.product_key
    WHERE fs.order_date >= DATEADD(month, -12, GETDATE())
    GROUP BY dc.name, dp.category
)
SELECT 
    customer_name,
    category,
    total_sales,
    order_count,
    avg_order_value,
    RANK() OVER (PARTITION BY category ORDER BY total_sales DESC) as rank_in_category
FROM customer_category_sales
WHERE total_sales > 1000
ORDER BY category, total_sales DESC;

3.2 Time Trends

sql
-- redshift/queries/analysis/time_analysis.sql
SELECT 
    DATE_TRUNC('month', fs.order_date) as month,
    dp.category,
    COUNT(DISTINCT fs.order_id) as orders,
    SUM(fs.total_amount) as revenue,
    SUM(fs.total_amount) / COUNT(DISTINCT fs.order_id) as avg_order_value,
    LAG(SUM(fs.total_amount)) OVER (PARTITION BY dp.category ORDER BY DATE_TRUNC('month', fs.order_date)) as prev_month_revenue,
    SUM(fs.total_amount) - LAG(SUM(fs.total_amount)) OVER (PARTITION BY dp.category ORDER BY DATE_TRUNC('month', fs.order_date)) as revenue_change
FROM warehouse.fact_sales fs
JOIN warehouse.dim_product dp ON fs.product_key = dp.product_key
GROUP BY DATE_TRUNC('month', fs.order_date), dp.category
ORDER BY month DESC, revenue DESC;

4. Monitoring and Performance

4.1 Query Monitoring

python
# scripts/monitor.py
import boto3

def monitor_query_performance():
    # stl_query has no precomputed duration column, so derive it from start/end times
    monitoring_query = """
    SELECT 
        q.query,
        q.starttime,
        q.endtime,
        DATEDIFF(microseconds, q.starttime, q.endtime) / 1000000.0 AS elapsed_seconds,
        q.querytxt
    FROM stl_query q
    WHERE q.starttime >= DATEADD(hour, -24, GETDATE())
    AND DATEDIFF(microseconds, q.starttime, q.endtime) > 1000000  -- queries longer than 1 second
    ORDER BY elapsed_seconds DESC
    LIMIT 10;
    """
    
    # execute_query() is a project helper that returns the rows as dictionaries
    results = execute_query(monitoring_query)
    
    # Publish the results to CloudWatch as custom metrics
    cloudwatch = boto3.client('cloudwatch')
    for query in results:
        cloudwatch.put_metric_data(
            Namespace='Redshift/QueryPerformance',
            MetricData=[
                {
                    'MetricName': 'QueryDuration',
                    'Value': query['elapsed_seconds'],
                    'Unit': 'Seconds',
                    'Dimensions': [
                        {
                            'Name': 'QueryID',
                            'Value': str(query['query'])
                        }
                    ]
                }
            ]
        )

4.2 Metrics Dashboard

json
// config/dashboard_config.json
{
    "widgets": [
        {
            "type": "metric",
            "properties": {
                "metrics": [
                    ["AWS/Redshift", "CPUUtilization", "ClusterIdentifier", "analytics-cluster"],
                    ["AWS/Redshift", "DatabaseConnections", "ClusterIdentifier", "analytics-cluster"],
                    ["AWS/Redshift", "ReadIOPS", "ClusterIdentifier", "analytics-cluster"],
                    ["AWS/Redshift", "WriteIOPS", "ClusterIdentifier", "analytics-cluster"]
                ],
                "period": 300,
                "stat": "Average",
                "region": "us-east-1",
                "title": "Redshift Cluster Performance"
            }
        }
    ]
}
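
This file only defines the widgets; it still has to be published. One option is to push it as a CloudWatch dashboard, as in this sketch (the dashboard name is a placeholder):

python
# scripts/create_dashboard.py (hypothetical helper)
import boto3

def create_dashboard():
    # Read the widget definition above and publish it as a CloudWatch dashboard
    with open('config/dashboard_config.json', 'r') as f:
        dashboard_body = f.read()

    cloudwatch = boto3.client('cloudwatch')
    cloudwatch.put_dashboard(
        DashboardName='redshift-analytics',
        DashboardBody=dashboard_body
    )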

5. Process Automation

5.1 Automation Script

python
# scripts/automate.py
from datetime import datetime
import boto3

def automate_daily_process():
    # load_from_s3, perform_maintenance and monitor_query_performance come from
    # the other project scripts (load_data.py, maintenance.py, monitor.py)
    
    # 1. Run the ETL job in Glue
    glue = boto3.client('glue')
    job_response = glue.start_job_run(
        JobName='daily-etl-job'
    )
    
    # 2. Load data into Redshift
    load_from_s3()
    
    # 3. Run maintenance
    perform_maintenance()
    
    # 4. Generate reports
    generate_daily_reports()
    
    # 5. Monitor performance
    monitor_query_performance()

def generate_daily_reports():
    reports = [
        'daily_sales_summary.sql',
        'inventory_status.sql',
        'customer_activity.sql'
    ]
    
    for report in reports:
        with open(f'redshift/queries/reports/{report}', 'r') as f:
            query = f.read()
            results = execute_query(query)
            
            # Save the results to S3 (save_to_s3 is a project helper)
            save_to_s3(
                results,
                f'reports/{datetime.now().strftime("%Y-%m-%d")}/{report}.csv'
            )
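
automate_daily_process() still needs a trigger. One option is an EventBridge schedule that invokes it through, for example, a small Lambda wrapper; a hedged sketch (rule name, schedule, and target ARN are placeholders):

python
# scripts/schedule.py (hypothetical helper)
import boto3

def schedule_daily_run(target_arn):
    events = boto3.client('events')

    # Fire every day at 03:00 UTC
    events.put_rule(
        Name='daily-warehouse-refresh',
        ScheduleExpression='cron(0 3 * * ? *)',
        State='ENABLED'
    )

    # target_arn would be, for example, a Lambda that calls automate_daily_process()
    events.put_targets(
        Rule='daily-warehouse-refresh',
        Targets=[{
            'Id': 'daily-warehouse-refresh-target',
            'Arn': target_arn
        }]
    )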

Final Verification

1. Verify Data

  • [ ] Data loaded correctly
  • [ ] Referential integrity (see the sketch below)
  • [ ] Data quality
  • [ ] Updates working
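
The referential integrity item can be checked with a query like the one in this sketch (the helper name is hypothetical; it assumes an open psycopg2 connection):

python
# scripts/verify_integrity.py (hypothetical helper)
def count_orphan_facts(conn):
    # Fact rows whose customer_key has no matching dimension row; the result should be 0
    query = """
        SELECT COUNT(*)
        FROM warehouse.fact_sales fs
        LEFT JOIN warehouse.dim_customer dc ON fs.customer_key = dc.customer_key
        WHERE dc.customer_key IS NULL;
    """
    with conn.cursor() as cur:
        cur.execute(query)
        return cur.fetchone()[0]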

2. Verify Performance

  • [ ] Distribution optimized
  • [ ] Efficient queries
  • [ ] Maintenance scheduled
  • [ ] Metrics monitored

3. Verify Reports

  • [ ] Reports being generated
  • [ ] Data up to date
  • [ ] Correct format
  • [ ] Automatic distribution

Final Troubleshooting

Performance Problems

  1. Review query plans
  2. Check distribution
  3. Analyze metrics

Data Problems

  1. Check the load logs
  2. Review transformations
  3. Validate integrity

Automation Problems

  1. Check the schedule
  2. Review dependencies
  3. Monitor the processes

Key Points

  1. Data distribution is key
  2. Keep statistics up to date
  3. Monitor constantly
  4. Automate maintenance

This complete exercise lets you:

  1. Load and transform data efficiently
  2. Optimize Redshift performance
  3. Build advanced analyses
  4. Implement monitoring and automation

Key points to remember:

  • Correct distribution is crucial
  • Regular maintenance is necessary
  • Monitoring helps prevent problems
  • Automation reduces errors