Exercise: Image Processing with AWS Batch and Step Functions

Part 1: Infrastructure and Job Setup

Scenario

We will build a batch processing system that:

  • Processes images in parallel using AWS Batch
  • Orchestrates workflows with Step Functions
  • Stores results in S3
  • Sends completion notifications via SNS

Project Structure

image-batch-processor/
├── infrastructure/
│   ├── batch/
│   │   ├── compute_environment.json
│   │   ├── job_queue.json
│   │   └── job_definition.json
│   │
│   ├── containers/
│   │   ├── image_processor/
│   │   │   ├── Dockerfile
│   │   │   ├── requirements.txt
│   │   │   └── process_image.py
│   │   └── build_and_push.sh
│   │
│   └── vpc/
│       ├── vpc_setup.py
│       └── security_groups.json

├── src/
│   ├── lambdas/
│   │   ├── job_submitter/
│   │   │   └── handler.py
│   │   └── job_monitor/
│   │       └── handler.py
│   │
│   └── batch_jobs/
│       ├── image_resize.py
│       ├── image_watermark.py
│       └── utils.py

├── tests/
│   ├── unit/
│   │   └── test_image_processor.py
│   └── integration/
│       └── test_batch_job.py

├── scripts/
│   ├── deploy.sh
│   └── cleanup.sh

└── config/
    ├── batch_config.json
    └── job_parameters.json

1. AWS Batch Setup

1.1 Configure the Compute Environment

json
// infrastructure/batch/compute_environment.json
{
    "computeEnvironmentName": "ImageProcessingEnv",
    "type": "MANAGED",
    "state": "ENABLED",
    "computeResources": {
        "type": "SPOT",
        "maxvCpus": 16,
        "minvCpus": 0,
        "desiredvCpus": 0,
        "instanceTypes": [
            "optimal"
        ],
        "subnets": ["subnet-xxx", "subnet-yyy"],
        "securityGroupIds": ["sg-xxx"],
        "instanceRole": "ecsInstanceRole",
        "bidPercentage": 60,
        "spotIamFleetRole": "arn:aws:iam::account:role/AWSServiceRoleForEC2SpotFleet"
    },
    "serviceRole": "arn:aws:iam::account:role/service-role/AWSBatchServiceRole"
}
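
This JSON mirrors the shape of the CreateComputeEnvironment API request, so it can be applied with the AWS CLI (aws batch create-compute-environment --cli-input-json file://infrastructure/batch/compute_environment.json) or with a small boto3 helper. A minimal sketch follows (the helper is not part of the original project, and the placeholder subnet, security group, and role values above must be replaced with real ones first):

python
# Hypothetical helper: create the compute environment from the JSON file above
import json
import boto3

def create_compute_environment(config_path="infrastructure/batch/compute_environment.json"):
    batch = boto3.client("batch")
    with open(config_path) as f:
        config = json.load(f)
    # The JSON keys map directly onto the CreateComputeEnvironment parameters
    return batch.create_compute_environment(**config)

if __name__ == "__main__":
    print(create_compute_environment())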

1.2 Create the Job Queue

json
// infrastructure/batch/job_queue.json
{
    "jobQueueName": "ImageProcessingQueue",
    "state": "ENABLED",
    "priority": 1,
    "computeEnvironmentOrder": [
        {
            "order": 1,
            "computeEnvironment": "ImageProcessingEnv"
        }
    ]
}

1.3 Define the Job

json
// infrastructure/batch/job_definition.json
{
    "jobDefinitionName": "ImageProcessor",
    "type": "container",
    "containerProperties": {
        "image": "account.dkr.ecr.region.amazonaws.com/image-processor:latest",
        "vcpus": 2,
        "memory": 2048,
        "command": ["python", "process_image.py", "Ref::input_bucket", "Ref::output_bucket"],
        "jobRoleArn": "arn:aws:iam::account:role/BatchJobRole",
        "volumes": [
            {
                "host": {
                    "sourcePath": "/tmp"
                },
                "name": "temp"
            }
        ],
        "mountPoints": [
            {
                "containerPath": "/tmp",
                "readOnly": false,
                "sourceVolume": "temp"
            }
        ],
        "environment": [
            {
                "name": "AWS_DEFAULT_REGION",
                "value": "us-east-1"
            }
        ]
    }
}
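
With the compute environment and queue in place, the job definition can be registered and a single test job submitted with boto3. A minimal sketch, using made-up bucket names; note that INPUT_KEY is passed as a container environment override because the processing script in section 2.2 reads the object key from the environment:

python
# Hypothetical helper: register the job definition and submit one test job
import json
import boto3

batch = boto3.client("batch")

# Register the job definition from infrastructure/batch/job_definition.json
with open("infrastructure/batch/job_definition.json") as f:
    batch.register_job_definition(**json.load(f))

# Submit one test job; the parameters fill the Ref:: placeholders in the command
response = batch.submit_job(
    jobName="test-image-job",
    jobQueue="ImageProcessingQueue",
    jobDefinition="ImageProcessor",
    parameters={
        "input_bucket": "my-input-bucket",    # assumed bucket names
        "output_bucket": "my-output-bucket",
    },
    containerOverrides={
        "environment": [{"name": "INPUT_KEY", "value": "test/image.jpg"}]
    },
)
print(response["jobId"])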

2. Processing Container

2.1 Dockerfile

dockerfile
# infrastructure/containers/image_processor/Dockerfile
FROM python:3.9-slim

WORKDIR /app

# Install the system libraries OpenCV needs
RUN apt-get update && apt-get install -y \
    ffmpeg \
    libsm6 \
    libxext6 \
    && rm -rf /var/lib/apt/lists/*

# Copy and install Python dependencies
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

# Copy the application code
COPY process_image.py .

# Entry point
ENTRYPOINT ["python", "process_image.py"]
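
The requirements.txt referenced by the Dockerfile is not shown in this exercise. Based on the imports in process_image.py, it would need at least the packages below (the exact contents are an assumption; opencv-python-headless avoids the GUI portions of the full OpenCV build):

text
# infrastructure/containers/image_processor/requirements.txt (assumed contents)
boto3
opencv-python-headless
numpy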

2.2 Processing Script

python
# infrastructure/containers/image_processor/process_image.py
import os
import sys

import boto3
import cv2

def process_image(input_path, output_path):
    """
    Process one image: resize it and apply a Gaussian blur.
    """
    # Read the image from local disk
    image = cv2.imread(input_path)
    if image is None:
        raise ValueError(f"Could not read image at {input_path}")

    # Resize to a fixed output resolution (width x height)
    resized = cv2.resize(image, (800, 600))

    # Apply a Gaussian blur filter
    filtered = cv2.GaussianBlur(resized, (5, 5), 0)

    # Write the result back to disk
    cv2.imwrite(output_path, filtered)

def main():
    # The buckets arrive as command-line arguments (the Ref:: parameters in the
    # job definition); the object key is read from the INPUT_KEY environment
    # variable, which must be set when the job is submitted (for example via
    # containerOverrides).
    input_bucket = sys.argv[1]
    output_bucket = sys.argv[2]
    input_key = os.environ['INPUT_KEY']

    s3 = boto3.client('s3')

    # Download the source image
    input_path = '/tmp/input.jpg'
    s3.download_file(input_bucket, input_key, input_path)

    # Process it
    output_path = '/tmp/output.jpg'
    process_image(input_path, output_path)

    # Upload the result
    s3.upload_file(output_path, output_bucket, f"processed/{input_key}")

if __name__ == "__main__":
    main()
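
The project tree lists tests/unit/test_image_processor.py, which is not shown. A minimal sketch of what that unit test could look like, assuming process_image.py is importable and using pytest's tmp_path fixture with a synthetic image:

python
# tests/unit/test_image_processor.py (assumed contents)
import cv2
import numpy as np

from process_image import process_image

def test_process_image_resizes_to_800x600(tmp_path):
    # Create and save a synthetic 100x100 colour image
    input_path = str(tmp_path / "input.jpg")
    output_path = str(tmp_path / "output.jpg")
    cv2.imwrite(input_path, np.zeros((100, 100, 3), dtype=np.uint8))

    process_image(input_path, output_path)

    # The output should exist and have the fixed 800x600 resolution
    result = cv2.imread(output_path)
    assert result is not None
    assert result.shape[:2] == (600, 800)  # (height, width)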

2.3 Build Script

bash
#!/bin/bash
# infrastructure/containers/build_and_push.sh

set -euo pipefail

# Variables
IMAGE_NAME="image-processor"
AWS_ACCOUNT_ID="your-account-id"
AWS_REGION="us-east-1"

# Log in to ECR
aws ecr get-login-password --region $AWS_REGION | docker login --username AWS --password-stdin $AWS_ACCOUNT_ID.dkr.ecr.$AWS_REGION.amazonaws.com

# Build the image
docker build -t $IMAGE_NAME .

# Tag the image
docker tag $IMAGE_NAME:latest $AWS_ACCOUNT_ID.dkr.ecr.$AWS_REGION.amazonaws.com/$IMAGE_NAME:latest

# Push to ECR
docker push $AWS_ACCOUNT_ID.dkr.ecr.$AWS_REGION.amazonaws.com/$IMAGE_NAME:latest

3. VPC Configuration

3.1 VPC Setup

python
# infrastructure/vpc/vpc_setup.py
import boto3

def create_vpc():
    ec2 = boto3.client('ec2')
    
    # Create the VPC
    vpc = ec2.create_vpc(
        CidrBlock='10.0.0.0/16',
        TagSpecifications=[
            {
                'ResourceType': 'vpc',
                'Tags': [
                    {
                        'Key': 'Name',
                        'Value': 'BatchVPC'
                    }
                ]
            }
        ]
    )
    
    # Create two subnets in different Availability Zones
    subnet1 = ec2.create_subnet(
        VpcId=vpc['Vpc']['VpcId'],
        CidrBlock='10.0.1.0/24',
        AvailabilityZone='us-east-1a'
    )
    
    subnet2 = ec2.create_subnet(
        VpcId=vpc['Vpc']['VpcId'],
        CidrBlock='10.0.2.0/24',
        AvailabilityZone='us-east-1b'
    )
    
    return {
        'vpc_id': vpc['Vpc']['VpcId'],
        'subnet_ids': [
            subnet1['Subnet']['SubnetId'],
            subnet2['Subnet']['SubnetId']
        ]
    }
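
The project tree also references infrastructure/vpc/security_groups.json, and the compute environment above expects a security group ID. A minimal boto3 sketch for creating one (the group name and description are assumptions; internet gateways, route tables, and VPC endpoints are not covered here, but the instances do need outbound access to ECR and S3):

python
# infrastructure/vpc/create_security_group.py (hypothetical helper)
import boto3

def create_batch_security_group(vpc_id):
    ec2 = boto3.client('ec2')

    # New security groups allow all outbound traffic by default, which is
    # enough for the Batch instances to reach ECR and S3
    sg = ec2.create_security_group(
        GroupName='batch-image-processing',
        Description='AWS Batch image processing instances',
        VpcId=vpc_id
    )
    return sg['GroupId']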

Part 1 Verification

1. Verify AWS Batch

  • [ ] Compute environment created
  • [ ] Job queue configured
  • [ ] Job definition created
  • [ ] IAM roles configured

2. Verify the Container

  • [ ] Image built
  • [ ] Pushed to ECR successfully
  • [ ] Local tests passing
  • [ ] Dependencies installed

3. Verify the VPC

  • [ ] VPC created
  • [ ] Subnets configured
  • [ ] Security groups active
  • [ ] Connectivity tested

Common Troubleshooting

AWS Batch Errors

  1. Check the IAM roles
  2. Review the compute environment configuration
  3. Check the job queue

Container Errors

  1. Check the Dockerfile
  2. Review the dependencies
  3. Test locally

Network Errors

  1. Check the VPC
  2. Review the security groups
  3. Verify access to S3

Part 2: Orchestration and Monitoring

1. Step Functions Setup

1.1 State Machine Definition

json
// src/state_machine/workflow.json
{
  "Comment": "Image Processing Workflow",
  "StartAt": "ValidateInput",
  "States": {
    "ValidateInput": {
      "Type": "Task",
      "Resource": "arn:aws:lambda:region:account:function:validate-input",
      "Next": "SubmitBatchJob",
      "Catch": [{
        "ErrorEquals": ["ValidationError"],
        "Next": "NotifyError"
      }]
    },
    "SubmitBatchJob": {
      "Type": "Task",
      "Resource": "arn:aws:lambda:region:account:function:submit-batch-job",
      "Next": "WaitForBatchJob",
      "Parameters": {
        "jobName.$": "$.jobName",
        "jobQueue": "ImageProcessingQueue",
        "jobDefinition": "ImageProcessor",
        "parameters": {
          "input_bucket.$": "$.inputBucket",
          "output_bucket.$": "$.outputBucket",
          "image_key.$": "$.imageKey"
        }
      }
    },
    "WaitForBatchJob": {
      "Type": "Wait",
      "Seconds": 30,
      "Next": "CheckBatchJob"
    },
    "CheckBatchJob": {
      "Type": "Task",
      "Resource": "arn:aws:lambda:region:account:function:check-batch-job",
      "Next": "JobComplete?",
      "Parameters": {
        "jobId.$": "$.jobId"
      }
    },
    "JobComplete?": {
      "Type": "Choice",
      "Choices": [
        {
          "Variable": "$.status",
          "StringEquals": "SUCCEEDED",
          "Next": "ProcessResults"
        },
        {
          "Variable": "$.status",
          "StringEquals": "FAILED",
          "Next": "NotifyError"
        }
      ],
      "Default": "WaitForBatchJob"
    },
    "ProcessResults": {
      "Type": "Task",
      "Resource": "arn:aws:lambda:region:account:function:process-results",
      "Next": "NotifySuccess"
    },
    "NotifySuccess": {
      "Type": "Task",
      "Resource": "arn:aws:lambda:region:account:function:notify-success",
      "End": true
    },
    "NotifyError": {
      "Type": "Task",
      "Resource": "arn:aws:lambda:region:account:function:notify-error",
      "End": true
    }
  }
}
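
This definition can be registered with Step Functions once the referenced Lambdas exist. A minimal boto3 sketch (the state machine name and execution role ARN are placeholders, and the role must allow invoking those Lambdas):

python
# src/state_machine/create_state_machine.py (hypothetical helper)
import boto3

def create_state_machine():
    sfn = boto3.client('stepfunctions')

    # Load the workflow definition shown above
    with open('src/state_machine/workflow.json') as f:
        definition = f.read()

    response = sfn.create_state_machine(
        name='ImageProcessing',
        definition=definition,
        roleArn='arn:aws:iam::account:role/StepFunctionsExecutionRole'
    )
    return response['stateMachineArn']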

1.2 Validation Lambda

python
# src/lambdas/job_submitter/handler.py
import boto3
from botocore.exceptions import ClientError

class ValidationError(Exception):
    """Raised on invalid input; matched by the ErrorEquals ["ValidationError"] catch."""

def validate_input(event, context):
    required_params = ['inputBucket', 'outputBucket', 'imageKey']

    # Validate that all required parameters are present
    for param in required_params:
        if param not in event:
            raise ValidationError(f"Missing required parameter: {param}")

    # Validate that the input image actually exists in S3
    s3 = boto3.client('s3')
    try:
        s3.head_object(
            Bucket=event['inputBucket'],
            Key=event['imageKey']
        )
    except ClientError:
        raise ValidationError("Input image does not exist")

    return event
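
The SubmitBatchJob state also needs a submit-batch-job Lambda, which is not listed in the project tree. A minimal sketch of such a handler, assuming it receives the parameters the state machine passes and returns the jobId that CheckBatchJob polls; INPUT_KEY is forwarded as a container environment override for process_image.py:

python
# src/lambdas/job_submitter/submit_job.py (hypothetical; not shown in the exercise)
import boto3

def submit_batch_job(event, context):
    batch = boto3.client('batch')

    # Submit the Batch job using the values forwarded by the state machine
    response = batch.submit_job(
        jobName=event['jobName'],
        jobQueue=event['jobQueue'],
        jobDefinition=event['jobDefinition'],
        parameters={
            'input_bucket': event['parameters']['input_bucket'],
            'output_bucket': event['parameters']['output_bucket'],
        },
        containerOverrides={
            'environment': [
                {'name': 'INPUT_KEY', 'value': event['parameters']['image_key']}
            ]
        },
    )

    # CheckBatchJob reads this jobId on the next loop iteration
    return {'jobId': response['jobId']}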

1.3 Job Monitoring Lambda

python
# src/lambdas/job_monitor/handler.py
import boto3

def check_batch_job(event, context):
    batch = boto3.client('batch')
    
    # Look up the current status of the submitted Batch job
    response = batch.describe_jobs(
        jobs=[event['jobId']]
    )
    
    job = response['jobs'][0]
    
    return {
        'jobId': job['jobId'],
        'status': job['status'],
        'statusReason': job.get('statusReason', ''),
        'exitCode': job.get('container', {}).get('exitCode')
    }

2. Notifications

2.1 Configure SNS

python
# src/notifications/setup_sns.py
import boto3

def setup_notifications():
    sns = boto3.client('sns')
    
    # Create the SNS topic
    topic = sns.create_topic(
        Name='ImageProcessingNotifications'
    )
    
    # Create an email subscription (must be confirmed by the recipient)
    sns.subscribe(
        TopicArn=topic['TopicArn'],
        Protocol='email',
        Endpoint='admin@example.com'
    )
    
    return topic['TopicArn']

2.2 Notification Lambda

python
# src/lambdas/notifier/handler.py
import boto3
import json

def notify_success(event, context):
    sns = boto3.client('sns')
    
    message = {
        'status': 'SUCCESS',
        'jobId': event['jobId'],
        'outputLocation': f"s3://{event['outputBucket']}/processed/{event['imageKey']}"
    }
    
    sns.publish(
        TopicArn='arn:aws:sns:region:account:ImageProcessingNotifications',
        Message=json.dumps(message),
        Subject='Image Processing Complete'
    )
    
    return event
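
The NotifyError state references a notify-error Lambda that is not shown. A minimal sketch, mirroring notify_success (the topic ARN placeholder follows the same convention as above):

python
# src/lambdas/notifier/handler.py (hypothetical notify_error counterpart)
import boto3
import json

def notify_error(event, context):
    sns = boto3.client('sns')

    # Include whatever failure detail the previous state produced
    message = {
        'status': 'FAILED',
        'jobId': event.get('jobId', 'unknown'),
        'statusReason': event.get('statusReason', '')
    }

    sns.publish(
        TopicArn='arn:aws:sns:region:account:ImageProcessingNotifications',
        Message=json.dumps(message),
        Subject='Image Processing Failed'
    )

    return event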

3. Monitoring and Metrics

3.1 CloudWatch Metrics

python
# src/monitoring/metrics.py
import boto3

def publish_metrics(job_status, processing_time):
    cloudwatch = boto3.client('cloudwatch')
    
    # Publish custom metrics to the ImageProcessing namespace
    cloudwatch.put_metric_data(
        Namespace='ImageProcessing',
        MetricData=[
            {
                'MetricName': 'JobStatus',
                'Value': 1 if job_status == 'SUCCEEDED' else 0,
                'Unit': 'Count'
            },
            {
                'MetricName': 'ProcessingTime',
                'Value': processing_time,
                'Unit': 'Seconds'
            }
        ]
    )
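
A natural place to call publish_metrics is the job monitoring Lambda, once a job reaches a terminal state. A small sketch (the helper name and import path are assumptions; AWS Batch reports startedAt and stoppedAt as epoch milliseconds):

python
# src/monitoring/report_job_metrics.py (hypothetical helper)
from metrics import publish_metrics  # assumes metrics.py is importable

def report_job_metrics(job):
    """Publish metrics for a Batch job description returned by describe_jobs."""
    if job['status'] not in ('SUCCEEDED', 'FAILED'):
        return

    # startedAt / stoppedAt are epoch milliseconds reported by AWS Batch
    started = job.get('startedAt')
    stopped = job.get('stoppedAt')
    seconds = (stopped - started) / 1000.0 if started and stopped else 0.0

    publish_metrics(job['status'], seconds)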

3.2 Monitoring Dashboard

json
// src/monitoring/dashboard.json
{
    "widgets": [
        {
            "type": "metric",
            "properties": {
                "metrics": [
                    ["ImageProcessing", "JobStatus", {"stat": "Sum"}],
                    ["ImageProcessing", "ProcessingTime", {"stat": "Average"}]
                ],
                "period": 300,
                "region": "us-east-1",
                "title": "Image Processing Metrics"
            }
        },
        {
            "type": "metric",
            "properties": {
                "metrics": [
                    ["AWS/Batch", "JobsSubmitted", "JobQueue", "ImageProcessingQueue"],
                    ["AWS/Batch", "JobsFailed", "JobQueue", "ImageProcessingQueue"]
                ],
                "period": 300,
                "region": "us-east-1",
                "title": "Batch Job Metrics"
            }
        }
    ]
}
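
The dashboard JSON can be created or updated with the PutDashboard API. A minimal boto3 sketch (the dashboard name is an assumption):

python
# src/monitoring/create_dashboard.py (hypothetical helper)
import boto3

def create_dashboard():
    cloudwatch = boto3.client('cloudwatch')

    # PutDashboard expects the dashboard body as a JSON string
    with open('src/monitoring/dashboard.json') as f:
        cloudwatch.put_dashboard(
            DashboardName='ImageProcessing',
            DashboardBody=f.read()
        )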

4. Automation Scripts

4.1 Deployment Script

bash
#!/bin/bash
# scripts/deploy.sh

# Variables
STACK_NAME="image-processing-stack"
TEMPLATE_FILE="template.yaml"
S3_BUCKET="deployment-artifacts"

# Package and deploy
aws cloudformation package \
    --template-file $TEMPLATE_FILE \
    --s3-bucket $S3_BUCKET \
    --output-template-file packaged.yaml

aws cloudformation deploy \
    --template-file packaged.yaml \
    --stack-name $STACK_NAME \
    --capabilities CAPABILITY_IAM \
    --parameter-overrides \
        Environment=prod \
        NotificationEmail=admin@example.com

4.2 Test Script

python
# scripts/test_workflow.py
import boto3
import json
import time

def test_workflow():
    # Start the Step Functions execution
    sfn = boto3.client('stepfunctions')

    input_data = {
        "inputBucket": "input-bucket",
        "outputBucket": "output-bucket",
        "imageKey": "test/image.jpg"
    }

    execution = sfn.start_execution(
        stateMachineArn='arn:aws:states:region:account:stateMachine:ImageProcessing',
        input=json.dumps(input_data)
    )

    # Poll the execution until it reaches a terminal state
    while True:
        status = sfn.describe_execution(
            executionArn=execution['executionArn']
        )

        if status['status'] in ['SUCCEEDED', 'FAILED', 'TIMED_OUT', 'ABORTED']:
            print(f"Workflow completed with status: {status['status']}")
            break

        time.sleep(30)

if __name__ == "__main__":
    test_workflow()

Final Verification

1. Verify Step Functions

  • [ ] State machine created
  • [ ] Transitions correct
  • [ ] Error handling in place
  • [ ] Notifications working

2. Verify Monitoring

  • [ ] Metrics recorded
  • [ ] Dashboard created
  • [ ] Alerts configured
  • [ ] Logs complete

3. Verify Integration

  • [ ] End-to-end workflow working
  • [ ] Batch jobs running
  • [ ] Results processed
  • [ ] Notifications sent

Final Troubleshooting

Step Functions Errors

  1. Check the state machine definition
  2. Review IAM permissions
  3. Verify inputs and outputs

Monitoring Errors

  1. Verify that metrics are being published
  2. Review the dashboard configuration
  3. Verify alert delivery

Integration Errors

  1. Verify the data flow
  2. Review the service configurations
  3. Verify the connections between services

Key Points

  1. Step Functions handles the orchestration
  2. Monitoring is crucial
  3. Notifications keep users informed
  4. Automation reduces manual errors

This complete exercise lets you:

  1. Orchestrate complex workflows
  2. Integrate multiple AWS services
  3. Implement robust monitoring
  4. Automate deployment processes

Key takeaways:

  • Step Functions manages the workflow
  • AWS Batch processes the jobs
  • Monitoring is essential
  • Automation is crucial