Skip to content
English
On this page

Ejercicio: Extracción de Texto con Textract y Lambda

Arquitectura

S3 -> Lambda -> Textract -> Lambda -> DynamoDB
                    |
                    v
                  SNS (notificaciones)

Parte 1: Configuración Inicial

1. Crear Buckets S3

  1. Ve a S3 en la consola AWS:
Crear dos buckets:
input-documents-{random-suffix}
processed-documents-{random-suffix}

Configuración:
- Región: tu-región
- Acceso público: Bloqueado
- Versionado: Habilitado

2. Crear Tabla DynamoDB

  1. Ve a DynamoDB:
Crear tabla:
Nombre: DocumentProcessing
Clave de partición: document_id (String)
Clave de ordenación: timestamp (Number)

3. Crear Tema SNS

  1. Ve a SNS:
Crear tema:
Nombre: document-processing-notifications
Tipo: Standard

Crear suscripción:
- Protocolo: Email
- Endpoint: tu-email@dominio.com

Parte 2: Configurar IAM

1. Crear Rol para Lambda

  1. Ve a IAM:
json
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "textract:*",
                "s3:GetObject",
                "s3:PutObject",
                "s3:ListBucket",
                "dynamodb:PutItem",
                "sns:Publish",
                "logs:CreateLogGroup",
                "logs:CreateLogStream",
                "logs:PutLogEvents"
            ],
            "Resource": "*"
        }
    ]
}

Parte 3: Crear Funciones Lambda

1. Lambda para Iniciar Procesamiento

python
import boto3
import json
import os
import uuid
from datetime import datetime

def lambda_handler(event, context):
    # Inicializar clientes
    textract = boto3.client('textract')
    dynamodb = boto3.client('dynamodb')
    sns = boto3.client('sns')
    
    # Obtener información del archivo S3
    bucket = event['Records'][0]['s3']['bucket']['name']
    key = event['Records'][0]['s3']['object']['key']
    
    try:
        # Iniciar procesamiento asíncrono con Textract
        response = textract.start_document_text_detection(
            DocumentLocation={
                'S3Object': {
                    'Bucket': bucket,
                    'Name': key
                }
            },
            NotificationChannel={
                'SNSTopicArn': os.environ['SNS_TOPIC_ARN'],
                'RoleArn': os.environ['TEXTRACT_ROLE_ARN']
            }
        )
        
        # Guardar información del trabajo en DynamoDB
        document_id = str(uuid.uuid4())
        timestamp = int(datetime.now().timestamp())
        
        dynamodb.put_item(
            TableName=os.environ['DYNAMODB_TABLE'],
            Item={
                'document_id': {'S': document_id},
                'timestamp': {'N': str(timestamp)},
                'job_id': {'S': response['JobId']},
                'status': {'S': 'IN_PROGRESS'},
                'source_bucket': {'S': bucket},
                'source_key': {'S': key}
            }
        )
        
        return {
            'statusCode': 200,
            'body': json.dumps({
                'message': 'Document processing started',
                'job_id': response['JobId'],
                'document_id': document_id
            })
        }
        
    except Exception as e:
        print(e)
        sns.publish(
            TopicArn=os.environ['SNS_TOPIC_ARN'],
            Message=f'Error processing document {key}: {str(e)}',
            Subject='Document Processing Error'
        )
        raise e

2. Lambda para Procesar Resultados

python
import boto3
import json
import os
from datetime import datetime

def lambda_handler(event, context):
    # Inicializar clientes
    textract = boto3.client('textract')
    s3 = boto3.client('s3')
    dynamodb = boto3.client('dynamodb')
    sns = boto3.client('sns')
    
    try:
        # Obtener mensaje SNS
        message = json.loads(event['Records'][0]['Sns']['Message'])
        job_id = message['JobId']
        status = message['Status']
        
        if status == 'SUCCEEDED':
            # Obtener resultados de Textract
            response = textract.get_document_text_detection(JobId=job_id)
            
            # Extraer texto
            text_blocks = []
            for block in response['Blocks']:
                if block['BlockType'] == 'LINE':
                    text_blocks.append(block['Text'])
            
            # Obtener información del documento de DynamoDB
            dynamo_response = dynamodb.query(
                TableName=os.environ['DYNAMODB_TABLE'],
                IndexName='JobIdIndex',
                KeyConditionExpression='job_id = :job_id',
                ExpressionAttributeValues={
                    ':job_id': {'S': job_id}
                }
            )
            
            document_info = dynamo_response['Items'][0]
            document_id = document_info['document_id']['S']
            source_bucket = document_info['source_bucket']['S']
            source_key = document_info['source_key']['S']
            
            # Guardar resultados en S3
            output_key = f'processed/{document_id}/extracted_text.txt'
            s3.put_object(
                Bucket=os.environ['OUTPUT_BUCKET'],
                Key=output_key,
                Body='\n'.join(text_blocks)
            )
            
            # Actualizar DynamoDB
            dynamodb.update_item(
                TableName=os.environ['DYNAMODB_TABLE'],
                Key={
                    'document_id': {'S': document_id},
                    'timestamp': {'N': document_info['timestamp']['N']}
                },
                UpdateExpression='SET #status = :status, output_key = :output_key',
                ExpressionAttributeNames={
                    '#status': 'status'
                },
                ExpressionAttributeValues={
                    ':status': {'S': 'COMPLETED'},
                    ':output_key': {'S': output_key}
                }
            )
            
            # Notificar completación
            sns.publish(
                TopicArn=os.environ['SNS_TOPIC_ARN'],
                Message=f'Document {source_key} processing completed. Results available at s3://{os.environ["OUTPUT_BUCKET"]}/{output_key}',
                Subject='Document Processing Completed'
            )
            
        else:
            # Manejar error
            sns.publish(
                TopicArn=os.environ['SNS_TOPIC_ARN'],
                Message=f'Document processing failed for job {job_id}',
                Subject='Document Processing Failed'
            )
            
        return {
            'statusCode': 200,
            'body': json.dumps({
                'message': f'Processing {status}',
                'job_id': job_id
            })
        }
        
    except Exception as e:
        print(e)
        sns.publish(
            TopicArn=os.environ['SNS_TOPIC_ARN'],
            Message=f'Error processing Textract results: {str(e)}',
            Subject='Document Processing Error'
        )
        raise e

Parte 4: Configurar Triggers

1. Trigger S3 para Primera Lambda

  1. En la función Lambda:
    Add trigger
    Trigger configuration:
    - Service: S3
    - Bucket: input-documents-xxx
    - Event type: All object create events

2. Trigger SNS para Segunda Lambda

  1. En la función Lambda:
    Add trigger
    Trigger configuration:
    - Service: SNS
    - Topic: document-processing-notifications

Parte 5: Pruebas y Monitoreo

1. Probar el Sistema

  1. Subir documento de prueba:
bash
# Usando AWS CLI
aws s3 cp test-document.pdf s3://input-documents-xxx/
  1. Verificar en CloudWatch:
Logs de Lambda:
/aws/lambda/textract-processor
/aws/lambda/textract-results-processor

2. Monitorear Procesamiento

  1. En DynamoDB:

    • Verificar registros
    • Monitorear estado
  2. En S3:

    • Verificar archivos procesados

Parte 6: Agregar Métricas Personalizadas

1. Crear Dashboard CloudWatch

  1. En CloudWatch:
Create dashboard:
Widgets:
- Número de documentos procesados
- Tiempo promedio de procesamiento
- Errores de procesamiento

2. Agregar Métricas Personalizadas

python
# En las funciones Lambda
cloudwatch = boto3.client('cloudwatch')

# Métrica de tiempo de procesamiento
cloudwatch.put_metric_data(
    Namespace='DocumentProcessing',
    MetricData=[
        {
            'MetricName': 'ProcessingTime',
            'Value': processing_time,
            'Unit': 'Seconds'
        }
    ]
)

Verificación

1. Funcionalidad

  • [ ] Carga de documentos funciona
  • [ ] Textract procesa correctamente
  • [ ] Resultados se guardan en S3
  • [ ] DynamoDB se actualiza
  • [ ] Notificaciones SNS funcionan

2. Monitoreo

  • [ ] Logs disponibles
  • [ ] Métricas visibles
  • [ ] Dashboard funcional

Solución de Problemas Comunes

Error de Permisos

  1. Verificar rol IAM
  2. Revisar políticas
  3. Verificar cross-account access

Error de Procesamiento

  1. Verificar formato de documento
  2. Revisar logs de Textract
  3. Verificar límites de servicio

Limpieza

  1. Eliminar funciones Lambda
  2. Eliminar buckets S3
  3. Eliminar tabla DynamoDB
  4. Eliminar tema SNS
  5. Eliminar roles IAM

Conceptos Clave para el Examen

  • Integración de servicios AWS
  • Procesamiento asíncrono
  • Manejo de eventos
  • Almacenamiento y base de datos

Puntos importantes a recordar:

  1. Textract es asíncrono para documentos grandes
  2. Lambda puede procesar eventos de múltiples fuentes
  3. SNS maneja notificaciones asíncronas
  4. El monitoreo es crucial para el sistema

Para el examen Cloud Practitioner, enfócate en:

  • Propósito de cada servicio
  • Integración entre servicios
  • Casos de uso comunes
  • Aspectos básicos de seguridad