Exercise: Coin Counter with AWS Rekognition

Scenario Description

We will build a web application that lets users upload photos of coins and automatically calculates the total value, using AWS Rekognition for detection and classification. The application must:

  • Accept images via drag & drop or file selection
  • Process the image with AWS Rekognition
  • Detect and classify the coins
  • Compute the total value
  • Display a detailed visualization of the analysis
  • Store an analysis history
  • Provide usage statistics

Project Structure

coin-counter/
├── frontend/
│   ├── src/
│   │   ├── components/
│   │   │   ├── ImageUploader/
│   │   │   ├── Analysis/
│   │   │   │   ├── CoinDetection/
│   │   │   │   ├── ValueCalculator/
│   │   │   │   └── ResultsDisplay/
│   │   │   └── History/
│   │   ├── services/
│   │   │   ├── api.js
│   │   │   └── imageProcessor.js
│   │   ├── store/
│   │   └── utils/
│   └── tests/
├── backend/
│   ├── app/
│   │   ├── api/
│   │   │   ├── routes/
│   │   │   └── endpoints/
│   │   ├── core/
│   │   │   ├── config.py
│   │   │   └── rekognition.py
│   │   ├── models/
│   │   ├── services/
│   │   │   ├── coin_detector.py
│   │   │   ├── value_calculator.py
│   │   │   └── image_processor.py
│   │   └── utils/
│   ├── tests/
│   │   ├── unit/
│   │   └── integration/
│   └── requirements/
├── infrastructure/
│   ├── terraform/
│   └── docker/
└── docs/

Exercise Stages

Stage 1: Base Configuration and Setup

  • Set up the FastAPI project
  • Set up the Vue.js project
  • Define the directory structure
  • Configure AWS credentials
  • Implement basic image upload

Stage 2: Rekognition Integration

  • Configure the Rekognition client
  • Implement object detection
  • Process Rekognition responses
  • Implement coin classification
  • Handle AWS errors

Stage 3: Image Processing

  • Implement image preprocessing
  • Validate formats and sizes
  • Optimize images for Rekognition
  • Implement quality detection
  • Handle varied lighting conditions

Stage 4: Detection Algorithms

  • Implement circle detection
  • Classify coins by size
  • Compute values from dimensions
  • Implement perspective correction
  • Handle overlapping coins

Stage 5: Interactive Frontend

  • Implement drag & drop
  • Build the results visualization
  • Implement real-time feedback
  • Add an analysis history
  • Implement statistics

Stage 6: Testing and Optimization

  • Implement unit tests
  • Create integration tests
  • Optimize performance
  • Implement caching
  • Handle errors

Stage 7: Deployment and Monitoring

  • Set up CI/CD
  • Implement logging
  • Configure alerts
  • Document the API
  • Deploy to production

Main Technical Considerations

  1. Image Preprocessing:

    • Brightness and contrast normalization
    • Edge detection
    • Perspective correction
    • Scale calibration
  2. Detection Algorithm:

    • Circle detection (Hough Transform)
    • Classification by size
    • Pattern matching
    • Occlusion handling
  3. AWS Integration:

    • Rekognition Custom Labels
    • S3 for storage
    • CloudWatch for monitoring
    • Lambda for processing
  4. UI/UX Considerations:

    • Real-time visual feedback
    • Progress indicators
    • Detection visualization
    • Analysis history

Data Model

python
# backend/app/models/analysis.py
from sqlalchemy import Column, Integer, String, Float, DateTime
from sqlalchemy.dialects.postgresql import JSONB

from app.db.base import Base

class CoinAnalysis(Base):
    __tablename__ = 'coin_analyses'
    
    id = Column(Integer, primary_key=True)
    image_url = Column(String, nullable=False)
    total_value = Column(Float, nullable=False)
    coin_counts = Column(JSONB, nullable=False)
    confidence_score = Column(Float)
    processing_time = Column(Float)
    created_at = Column(DateTime, nullable=False)
    # "metadata" is reserved by SQLAlchemy's declarative base, so the
    # attribute gets a different name while the column keeps it
    analysis_metadata = Column("metadata", JSONB)

# backend/app/schemas/analysis.py
from datetime import datetime
from typing import Any, Dict, Optional

from pydantic import BaseModel

class CoinAnalysisCreate(BaseModel):
    image_url: str
    total_value: float
    coin_counts: Dict[str, int]
    confidence_score: Optional[float] = None
    processing_time: Optional[float] = None
    metadata: Optional[Dict[str, Any]] = None

class CoinAnalysisResponse(BaseModel):
    id: int
    image_url: str
    total_value: float
    coin_counts: Dict[str, int]
    confidence_score: float
    created_at: datetime
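
For reference, a response validated against these schemas might be built like this (all values are illustrative):

python
# Illustrative only: building a response object with sample values.
from datetime import datetime
from app.schemas.analysis import CoinAnalysisResponse

resp = CoinAnalysisResponse(
    id=1,
    image_url="s3://coin-counter/uploads/coins.jpg",
    total_value=1.40,
    coin_counts={"quarter": 4, "dime": 3, "nickel": 2},
    confidence_score=0.92,
    created_at=datetime.utcnow(),
)
print(resp.total_value)  # 1.4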

Test Examples

python
# backend/tests/unit/test_coin_detector.py
import pytest
from app.services.coin_detector import CoinDetector
from app.services.value_calculator import ValueCalculator

class TestCoinDetector:
    @pytest.fixture
    def detector(self):
        return CoinDetector()

    def test_detect_coins_in_clear_image(self, detector):
        image_path = "tests/fixtures/clear_coins.jpg"
        result = detector.detect_coins(image_path)
        
        assert result.coin_count > 0
        assert result.confidence_score > 0.8
        assert isinstance(result.coin_locations, list)

    def test_handle_poor_lighting(self, detector):
        image_path = "tests/fixtures/dark_coins.jpg"
        result = detector.detect_coins(image_path)
        
        assert result.confidence_score < 0.8
        assert result.warnings == ["Poor lighting detected"]

    def test_calculate_total_value(self):
        calculator = ValueCalculator()
        coins = {
            "25_cent": 4,
            "10_cent": 3,
            "5_cent": 2
        }
        
        # 4 * 0.25 + 3 * 0.10 + 2 * 0.05 = 1.40
        total = calculator.calculate_total(coins)
        assert total == pytest.approx(1.40)

# backend/tests/integration/test_rekognition.py
import pytest

from app.services.rekognition_service import RekognitionService
from tests.helpers import upload_to_s3  # hypothetical helper; adjust to the project layout

class TestRekognitionIntegration:
    @pytest.mark.integration
    def test_end_to_end_detection(self):
        image_path = "tests/fixtures/mixed_coins.jpg"
        
        # Upload the image to S3
        s3_url = upload_to_s3(image_path)
        
        # Process with Rekognition
        rekognition = RekognitionService()
        result = rekognition.analyze_image(s3_url)
        
        # Validate the results
        assert result.status == "success"
        assert result.total_value > 0
        assert len(result.detected_coins) > 0
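
The tests reference a ValueCalculator that is not shown. A minimal sketch that satisfies test_calculate_total_value, assuming the "<cents>_cent" key format used above:

python
# backend/app/services/value_calculator.py (minimal sketch; key format assumed)
from typing import Dict

class ValueCalculator:
    """Sums coin counts keyed as "<cents>_cent" into a dollar total."""

    def calculate_total(self, coins: Dict[str, int]) -> float:
        total_cents = 0
        for coin_type, count in coins.items():
            cents = int(coin_type.split("_")[0])  # "25_cent" -> 25
            total_cents += cents * count
        return total_cents / 100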

Key Frontend Aspects

vue
<!-- frontend/src/components/ImageUploader/index.vue -->
<template>
  <div 
    class="image-uploader"
    @dragover.prevent
    @drop.prevent="handleDrop"
  >
    <div v-if="!image" class="upload-prompt">
      <icon-upload />
      <p>Drag an image here or click to select one</p>
      <input 
        type="file"
        ref="fileInput"
        @change="handleFileSelect"
        accept="image/*"
        class="hidden"
      >
    </div>
    
    <div v-else class="preview-container">
      <img :src="imagePreview" class="preview-image">
      <coin-detection-overlay
        :detections="detections"
        :imageSize="imageSize"
      />
    </div>
    
    <analysis-results
      v-if="results"
      :results="results"
      @retry="resetUpload"
    />
  </div>
</template>

<script>
import api from '@/services/api'

export default {
  data() {
    return {
      image: null,
      imagePreview: null,
      detections: [],
      results: null,
      imageSize: { width: 0, height: 0 },
      loading: false,
      error: null
    }
  },
  
  methods: {
    async handleDrop(e) {
      const file = e.dataTransfer.files[0]
      if (file && file.type.startsWith('image/')) {
        await this.processImage(file)
      }
    },
    
    async handleFileSelect(e) {
      const file = e.target.files[0]
      if (file) {
        await this.processImage(file)
      }
    },
    
    async processImage(file) {
      try {
        this.loading = true
        const formData = new FormData()
        formData.append('image', file)
        
        const response = await api.analyzeCoinImage(formData)
        
        this.detections = response.detections
        this.results = response.results
      } catch (error) {
        this.error = error.message
      } finally {
        this.loading = false
      }
    }
  }
}
</script>

APIs and Endpoints

python
# backend/app/api/endpoints/analysis.py
from typing import Any, Dict

from fastapi import APIRouter, File, HTTPException, UploadFile

from app.services.coin_detector import CoinDetector
from app.services.value_calculator import ValueCalculator

router = APIRouter()

@router.post("/analyze")
async def analyze_image(
    image: UploadFile = File(...)
) -> Dict[str, Any]:
    # Validate that the upload is an image
    if not image.content_type.startswith("image/"):
        raise HTTPException(400, "File must be an image")
    
    try:
        # Detect coins in the image
        detector = CoinDetector()
        coins = await detector.detect_coins(image)
        
        # Compute the total value
        calculator = ValueCalculator()
        total_value = calculator.calculate_total(coins)
        
        return {
            "total_value": total_value,
            "coin_counts": coins,
            "confidence_score": detector.confidence_score
        }
    except Exception as e:
        raise HTTPException(500, str(e))
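
Once the service runs, the endpoint can be exercised from Python. A minimal client sketch, assuming the API is served at http://localhost:8000/api/v1 and the fixture path exists:

python
# Smoke test for POST /analyze (URL and fixture path are assumptions).
import requests

with open("tests/fixtures/clear_coins.jpg", "rb") as f:
    response = requests.post(
        "http://localhost:8000/api/v1/analyze",
        files={"image": ("clear_coins.jpg", f, "image/jpeg")},
    )

response.raise_for_status()
print(response.json()["total_value"])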

Next Steps

  1. Start with Stage 1, implementing the base structure
  2. Set up the development environment
  3. Implement the initial tests
  4. Begin the AWS integration

Stage 1: Base Configuration and Setup.

python
# Backend: FastAPI Setup

# backend/app/core/config.py
from pydantic_settings import BaseSettings
from typing import Optional
import os

class Settings(BaseSettings):
    PROJECT_NAME: str = "Coin Counter"
    API_V1_STR: str = "/api/v1"
    
    # AWS Configuration
    AWS_ACCESS_KEY_ID: str
    AWS_SECRET_ACCESS_KEY: str
    AWS_REGION: str = "us-east-1"
    S3_BUCKET: str
    
    # API security (consumed by verify_api_key in app/api/deps.py)
    API_KEY: str
    
    # Database Configuration
    POSTGRES_SERVER: str
    POSTGRES_USER: str
    POSTGRES_PASSWORD: str
    POSTGRES_DB: str
    SQLALCHEMY_DATABASE_URI: Optional[str] = None
    
    # CORS Configuration
    BACKEND_CORS_ORIGINS: list = ["http://localhost:3000"]
    
    # File Upload Settings
    MAX_FILE_SIZE: int = 5 * 1024 * 1024  # 5MB
    ALLOWED_EXTENSIONS: set = {"jpg", "jpeg", "png"}
    
    # Rekognition Configuration
    REKOGNITION_MIN_CONFIDENCE: float = 80.0
    CUSTOM_LABELS_PROJECT_ARN: Optional[str] = None
    
    class Config:
        case_sensitive = True
        env_file = ".env"

settings = Settings()

# backend/app/db/base.py
from sqlalchemy import create_engine
from sqlalchemy.orm import declarative_base, sessionmaker
from app.core.config import settings

# SQLALCHEMY_DATABASE_URI defaults to None in Settings, so assemble the URI
# from its parts when it is not set explicitly.
database_uri = settings.SQLALCHEMY_DATABASE_URI or (
    f"postgresql://{settings.POSTGRES_USER}:{settings.POSTGRES_PASSWORD}"
    f"@{settings.POSTGRES_SERVER}/{settings.POSTGRES_DB}"
)

engine = create_engine(database_uri, pool_pre_ping=True)
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)

Base = declarative_base()

# backend/app/models/analysis.py
from sqlalchemy import Column, Integer, String, Float, DateTime, JSON
from sqlalchemy.sql import func
from app.db.base import Base

class Analysis(Base):
    __tablename__ = "analyses"

    id = Column(Integer, primary_key=True, index=True)
    image_path = Column(String, nullable=False)
    total_value = Column(Float, nullable=False)
    coin_details = Column(JSON, nullable=False)
    confidence_score = Column(Float)
    processing_time = Column(Float)
    created_at = Column(DateTime(timezone=True), server_default=func.now())
    updated_at = Column(DateTime(timezone=True), onupdate=func.now())

# backend/app/api/deps.py
from typing import Generator
from app.db.base import SessionLocal
from app.core.config import settings
from fastapi import Depends, HTTPException, status
from fastapi.security import APIKeyHeader

api_key_header = APIKeyHeader(name="X-API-Key")

def get_db() -> Generator:
    db = SessionLocal()
    try:
        yield db
    finally:
        db.close()

async def verify_api_key(api_key: str = Depends(api_key_header)):
    if api_key != settings.API_KEY:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="Could not validate API key"
        )
    return api_key

# backend/main.py
from fastapi import FastAPI
from starlette.middleware.cors import CORSMiddleware
from app.core.config import settings
from app.api.api_v1.api import api_router

app = FastAPI(
    title=settings.PROJECT_NAME,
    openapi_url=f"{settings.API_V1_STR}/openapi.json"
)

# Set up CORS
if settings.BACKEND_CORS_ORIGINS:
    app.add_middleware(
        CORSMiddleware,
        allow_origins=[str(origin) for origin in settings.BACKEND_CORS_ORIGINS],
        allow_credentials=True,
        allow_methods=["*"],
        allow_headers=["*"],
    )

app.include_router(api_router, prefix=settings.API_V1_STR)
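
# main.py imports app.api.api_v1.api, which is not shown here. A minimal
# sketch of that module, wiring in the analysis router (module layout assumed):

# backend/app/api/api_v1/api.py
from fastapi import APIRouter
from app.api.endpoints import analysis

api_router = APIRouter()
api_router.include_router(analysis.router, tags=["analysis"])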

# Frontend: Vue 3 Setup

# frontend/src/App.vue
<template>
  <div class="min-h-screen bg-gray-100">
    <nav class="bg-white shadow-sm">
      <div class="max-w-7xl mx-auto px-4 sm:px-6 lg:px-8">
        <div class="flex justify-between h-16">
          <div class="flex">
            <div class="flex-shrink-0 flex items-center">
              <h1 class="text-xl font-bold text-gray-900">Coin Counter</h1>
            </div>
          </div>
        </div>
      </div>
    </nav>

    <main class="max-w-7xl mx-auto py-6 sm:px-6 lg:px-8">
      <div class="px-4 py-6 sm:px-0">
        <router-view></router-view>
      </div>
    </main>
  </div>
</template>

<script setup>
import { onMounted } from 'vue'
import { useStore } from 'vuex'

const store = useStore()

onMounted(() => {
  // Initialize any required store state
})
</script>

# frontend/src/store/index.js
import { createStore } from 'vuex'
import api from '@/api'

export default createStore({
  state: {
    analyses: [],
    currentAnalysis: null,
    loading: false,
    error: null
  },
  
  mutations: {
    SET_ANALYSES(state, analyses) {
      state.analyses = analyses
    },
    SET_CURRENT_ANALYSIS(state, analysis) {
      state.currentAnalysis = analysis
    },
    SET_LOADING(state, loading) {
      state.loading = loading
    },
    SET_ERROR(state, error) {
      state.error = error
    }
  },
  
  actions: {
    async fetchAnalyses({ commit }) {
      try {
        commit('SET_LOADING', true)
        const response = await api.getAnalyses()
        commit('SET_ANALYSES', response.data)
      } catch (error) {
        commit('SET_ERROR', error.message)
      } finally {
        commit('SET_LOADING', false)
      }
    },
    
    async createAnalysis({ commit }, imageData) {
      try {
        commit('SET_LOADING', true)
        const response = await api.analyzeImage(imageData)
        commit('SET_CURRENT_ANALYSIS', response.data)
        return response.data
      } catch (error) {
        commit('SET_ERROR', error.message)
        throw error
      } finally {
        commit('SET_LOADING', false)
      }
    }
  },
  
  getters: {
    recentAnalyses: state => {
      return state.analyses.slice(0, 5)
    },
    hasError: state => !!state.error
  }
})

# frontend/src/api/index.js
import axios from 'axios'

const api = axios.create({
  baseURL: import.meta.env.VITE_API_URL,
  headers: {
    'Content-Type': 'application/json'
  }
})

api.interceptors.request.use(config => {
  const apiKey = import.meta.env.VITE_API_KEY
  if (apiKey) {
    config.headers['X-API-Key'] = apiKey
  }
  return config
})

export default {
  async analyzeImage(formData) {
    return api.post('/analyze', formData, {
      headers: {
        'Content-Type': 'multipart/form-data'
      }
    })
  },
  
  async getAnalyses() {
    return api.get('/analyses')
  },
  
  async getAnalysis(id) {
    return api.get(`/analyses/${id}`)
  }
}

# docker-compose.yml
version: '3.8'

services:
  backend:
    build: ./backend
    ports:
      - "8000:8000"
    environment:
      - POSTGRES_SERVER=db
      - POSTGRES_USER=postgres
      - POSTGRES_PASSWORD=password
      - POSTGRES_DB=coin_counter
    depends_on:
      - db
    volumes:
      - ./backend:/app

  frontend:
    build: ./frontend
    ports:
      - "3000:3000"
    volumes:
      - ./frontend:/app
      - /app/node_modules
    environment:
      - VITE_API_URL=http://localhost:8000/api/v1

  db:
    image: postgres:13
    volumes:
      - postgres_data:/var/lib/postgresql/data
    environment:
      - POSTGRES_USER=postgres
      - POSTGRES_PASSWORD=password
      - POSTGRES_DB=coin_counter

volumes:
  postgres_data:

This first stage establishes:

  1. FastAPI Backend:

    • Project configuration
    • Database models
    • Basic dependencies
    • CORS configuration
  2. Vue 3 Frontend:

    • Base structure
    • Vuex store
    • API client
    • Initial components
  3. Docker Setup:

    • A container per service
    • Development configuration
    • Persistent volumes
    • Environment variables

The next steps would be:

  1. Configure the environment variables
  2. Initialize the database
  3. Test the API connection (see the sketch below)
  4. Verify CORS and security
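
A quick connection check for step 3, assuming the docker-compose setup above:

python
# API smoke test (URL matches the settings above; run once the containers are up).
import requests

r = requests.get("http://localhost:8000/api/v1/openapi.json")
print(r.status_code)  # expect 200 when the backend is healthy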

Stage 2: Rekognition Integration.

python
# backend/app/services/rekognition_service.py
import boto3
import logging
from typing import Dict, Any, List, Optional
from PIL import Image
import io
from botocore.exceptions import ClientError
from app.core.config import settings

logger = logging.getLogger(__name__)

class RekognitionService:
    def __init__(self):
        self.client = boto3.client(
            'rekognition',
            aws_access_key_id=settings.AWS_ACCESS_KEY_ID,
            aws_secret_access_key=settings.AWS_SECRET_ACCESS_KEY,
            region_name=settings.AWS_REGION
        )
        self.s3_client = boto3.client(
            's3',
            aws_access_key_id=settings.AWS_ACCESS_KEY_ID,
            aws_secret_access_key=settings.AWS_SECRET_ACCESS_KEY,
            region_name=settings.AWS_REGION
        )

    async def analyze_image(self, image_bytes: bytes) -> Dict[str, Any]:
        try:
            # Detect objects in the image
            response = self.client.detect_custom_labels(
                Image={'Bytes': image_bytes},
                ProjectVersionArn=settings.CUSTOM_LABELS_PROJECT_ARN,
                MinConfidence=settings.REKOGNITION_MIN_CONFIDENCE
            )

            # Process the detections
            coins = self._process_detections(response['CustomLabels'])
            
            # Compute image dimensions and relative positions
            image = Image.open(io.BytesIO(image_bytes))
            dimensions = {'width': image.width, 'height': image.height}
            
            return {
                'coins': coins,
                'dimensions': dimensions,
                'confidence_score': self._calculate_average_confidence(coins),
                'detection_metadata': self._generate_metadata(response, dimensions)
            }

        except ClientError as e:
            logger.error(f"AWS Rekognition error: {str(e)}")
            raise
        except Exception as e:
            logger.error(f"Error analyzing image: {str(e)}")
            raise

    def _process_detections(
        self,
        detections: List[Dict[str, Any]]
    ) -> List[Dict[str, Any]]:
        processed_coins = []
        
        for detection in detections:
            if self._is_coin(detection):
                coin_info = {
                    'type': self._determine_coin_type(detection),
                    'value': self._get_coin_value(detection['Name']),
                    'confidence': detection['Confidence'],
                    'location': self._extract_location(detection['Geometry']),
                    'size': self._calculate_size(detection['Geometry']),
                    'rotation': self._calculate_rotation(detection.get('Pose', {}))
                }
                processed_coins.append(coin_info)

        return processed_coins

    def _is_coin(self, detection: Dict[str, Any]) -> bool:
        coin_labels = {'penny', 'nickel', 'dime', 'quarter', 'half_dollar', 'dollar'}
        return detection['Name'].lower() in coin_labels

    def _determine_coin_type(self, detection: Dict[str, Any]) -> str:
        # Normalize the detected label to a known coin type; downstream code
        # keys size thresholds and counts off these names
        known_types = {'penny', 'nickel', 'dime', 'quarter', 'half_dollar', 'dollar'}
        name = detection['Name'].lower()
        return name if name in known_types else 'unknown'

    def _get_coin_value(self, coin_type: str) -> float:
        value_mapping = {
            'penny': 0.01,
            'nickel': 0.05,
            'dime': 0.10,
            'quarter': 0.25,
            'half_dollar': 0.50,
            'dollar': 1.00
        }
        return value_mapping.get(coin_type.lower(), 0.0)

    def _extract_location(self, geometry: Dict[str, Any]) -> Dict[str, float]:
        bbox = geometry['BoundingBox']
        return {
            'x': bbox['Left'],
            'y': bbox['Top'],
            'width': bbox['Width'],
            'height': bbox['Height']
        }

    def _calculate_size(self, geometry: Dict[str, Any]) -> float:
        # Relative area of the object's bounding box
        return geometry['BoundingBox']['Width'] * geometry['BoundingBox']['Height']

    def _calculate_rotation(self, pose: Dict[str, Any]) -> Optional[float]:
        return pose.get('Rotation', 0.0)

    def _calculate_average_confidence(
        self,
        coins: List[Dict[str, Any]]
    ) -> float:
        if not coins:
            return 0.0
        return sum(coin['confidence'] for coin in coins) / len(coins)

    def _generate_metadata(
        self,
        response: Dict[str, Any],
        dimensions: Dict[str, int]
    ) -> Dict[str, Any]:
        return {
            'image_dimensions': dimensions,
            'detection_count': len(response['CustomLabels']),
            'timestamp': response['ResponseMetadata']['HTTPHeaders']['date'],
            'request_id': response['ResponseMetadata']['RequestId']
        }
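
# A quick manual check of the service above (fixture path is an assumption;
# CUSTOM_LABELS_PROJECT_ARN must point at a trained, running model).
if __name__ == "__main__":
    import asyncio

    async def _demo():
        with open("tests/fixtures/mixed_coins.jpg", "rb") as f:
            result = await RekognitionService().analyze_image(f.read())
        print(result["coins"], result["confidence_score"])

    asyncio.run(_demo())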

# backend/app/services/coin_analyzer.py
from typing import Dict, Any, List
from dataclasses import dataclass

@dataclass
class CoinAnalysis:
    total_value: float
    coin_count: Dict[str, int]
    confidence_score: float
    dimensions: Dict[str, Any]
    detections: List[Dict[str, Any]]

class CoinAnalyzer:
    def __init__(self, rekognition_service):
        self.rekognition_service = rekognition_service
        self.size_thresholds = {
            'penny': {'min': 0.008, 'max': 0.012},
            'nickel': {'min': 0.012, 'max': 0.016},
            'dime': {'min': 0.007, 'max': 0.011},
            'quarter': {'min': 0.014, 'max': 0.018}
        }

    async def analyze_coins(self, image_bytes: bytes) -> CoinAnalysis:
        # Get detections from Rekognition
        rekognition_results = await self.rekognition_service.analyze_image(image_bytes)
        
        # Validate and refine the detections
        refined_detections = self._refine_detections(rekognition_results['coins'])
        
        # Compute the total value
        total_value = sum(coin['value'] for coin in refined_detections)
        
        # Count coins by type
        coin_counts = self._count_coins(refined_detections)
        
        return CoinAnalysis(
            total_value=total_value,
            coin_count=coin_counts,
            confidence_score=rekognition_results['confidence_score'],
            dimensions=rekognition_results['dimensions'],
            detections=refined_detections
        )

    def _refine_detections(
        self,
        detections: List[Dict[str, Any]]
    ) -> List[Dict[str, Any]]:
        refined = []
        
        for detection in detections:
            # Validate by size
            if self._validate_coin_size(detection):
                # Check for overlap
                if not self._check_overlap(detection, refined):
                    refined.append(detection)
                    
        return refined

    def _validate_coin_size(self, detection: Dict[str, Any]) -> bool:
        coin_type = detection['type']
        size = detection['size']
        
        thresholds = self.size_thresholds.get(coin_type)
        if not thresholds:
            return True
            
        return thresholds['min'] <= size <= thresholds['max']

    def _check_overlap(
        self,
        detection: Dict[str, Any],
        existing_detections: List[Dict[str, Any]]
    ) -> bool:
        for existing in existing_detections:
            if self._calculate_iou(
                detection['location'],
                existing['location']
            ) > 0.3:  # 30% overlap threshold
                return True
        return False

    def _calculate_iou(
        self,
        box1: Dict[str, float],
        box2: Dict[str, float]
    ) -> float:
        # Intersection coordinates
        x1 = max(box1['x'], box2['x'])
        y1 = max(box1['y'], box2['y'])
        x2 = min(box1['x'] + box1['width'], box2['x'] + box2['width'])
        y2 = min(box1['y'] + box1['height'], box2['y'] + box2['height'])
        
        # Intersection area
        if x2 <= x1 or y2 <= y1:
            return 0.0
            
        intersection = (x2 - x1) * (y2 - y1)
        
        # Individual box areas
        box1_area = box1['width'] * box1['height']
        box2_area = box2['width'] * box2['height']
        
        # IoU = intersection / union
        union = box1_area + box2_area - intersection
        return intersection / union if union > 0 else 0.0

    def _count_coins(self, detections: List[Dict[str, Any]]) -> Dict[str, int]:
        counts = {}
        for detection in detections:
            coin_type = detection['type']
            counts[coin_type] = counts.get(coin_type, 0) + 1
        return counts
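
# Sanity check for _calculate_iou with two half-overlapping boxes
# (illustrative values): intersection 0.02, union 0.06, IoU of about 0.333.
if __name__ == "__main__":
    _analyzer = CoinAnalyzer(rekognition_service=None)
    _a = {'x': 0.0, 'y': 0.0, 'width': 0.2, 'height': 0.2}
    _b = {'x': 0.1, 'y': 0.0, 'width': 0.2, 'height': 0.2}
    print(_analyzer._calculate_iou(_a, _b))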

# backend/app/api/endpoints/analysis.py
from fastapi import APIRouter, UploadFile, File, Depends, HTTPException
from sqlalchemy.orm import Session
from typing import Dict, Any
import logging

from app.services.rekognition_service import RekognitionService
from app.services.coin_analyzer import CoinAnalyzer
from app.api.deps import get_db
from app.models.analysis import Analysis
from app.core.config import settings

logger = logging.getLogger(__name__)

router = APIRouter()

@router.post("/analyze")
async def analyze_image(
    image: UploadFile = File(...),
    db: Session = Depends(get_db)
) -> Dict[str, Any]:
    # Validate the upload size
    contents = await image.read()
    if len(contents) > settings.MAX_FILE_SIZE:
        raise HTTPException(400, "File too large")
        
    # Analyze the image
    rekognition_service = RekognitionService()
    analyzer = CoinAnalyzer(rekognition_service)
    
    try:
        analysis = await analyzer.analyze_coins(contents)
        
        # Persist the results
        db_analysis = Analysis(
            image_path=f"uploads/{image.filename}",
            total_value=analysis.total_value,
            coin_details={
                'counts': analysis.coin_count,
                'detections': analysis.detections
            },
            confidence_score=analysis.confidence_score
        )
        
        db.add(db_analysis)
        db.commit()
        db.refresh(db_analysis)
        
        return {
            "id": db_analysis.id,
            "total_value": analysis.total_value,
            "coin_count": analysis.coin_count,
            "confidence_score": analysis.confidence_score,
            "detections": analysis.detections
        }
        
    except Exception as e:
        logger.error(f"Error analyzing image: {str(e)}")
        raise HTTPException(500, "Error analyzing image")

This second stage implements:

  1. Rekognition Service:

    • AWS connection
    • Image processing
    • Coin detection
    • Dimension calculation
  2. Coin Analyzer:

    • Detection validation
    • Value calculation
    • Overlap handling
    • Result refinement
  3. API Endpoints:

    • Image upload
    • Processing
    • Result storage
    • Error handling

The next steps would be:

  1. Configure the AWS credentials
  2. Train the Custom Labels model (see the sketch below)
  3. Test the detections
  4. Tune the thresholds
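
Custom Labels models must be started before detect_custom_labels will return results, and they bill while running. A minimal start/stop sketch with boto3, using the project version ARN from the settings:

python
# Start and stop the Rekognition Custom Labels model (ARN comes from settings).
import boto3
from app.core.config import settings

client = boto3.client("rekognition", region_name=settings.AWS_REGION)

# Start with the minimum inference capacity
client.start_project_version(
    ProjectVersionArn=settings.CUSTOM_LABELS_PROJECT_ARN,
    MinInferenceUnits=1,
)

# ... run detections ...

# Stop the model when done to avoid ongoing charges
client.stop_project_version(ProjectVersionArn=settings.CUSTOM_LABELS_PROJECT_ARN)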

Stage 3: Image Processing, focusing on optimizing and preparing images before analysis.

python
# backend/app/services/image_processor.py
import cv2
import numpy as np
from typing import Tuple, Dict, Any
from dataclasses import dataclass
import logging

logger = logging.getLogger(__name__)

@dataclass
class ProcessedImage:
    image: np.ndarray
    enhanced: bool
    metadata: Dict[str, Any]
    quality_score: float

class ImageProcessor:
    def __init__(self):
        self.target_size = (800, 800)
        self.min_brightness = 0.3
        self.max_brightness = 0.85
        self.min_contrast = 0.4
        self.quality_threshold = 0.6

    async def process_image(
        self,
        image_bytes: bytes
    ) -> Tuple[bytes, Dict[str, Any]]:
        try:
            # Convert the raw bytes to an OpenCV image
            image = self._bytes_to_opencv(image_bytes)
            
            # Assess image quality
            quality_score = self._assess_quality(image)
            
            # Preprocess the image
            processed = await self._preprocess_image(
                image,
                quality_score < self.quality_threshold
            )
            
            # Convert the result back to bytes
            output_bytes = self._opencv_to_bytes(processed.image)
            
            return output_bytes, {
                "quality_score": processed.quality_score,
                "enhanced": processed.enhanced,
                "metadata": processed.metadata
            }
            
        except Exception as e:
            logger.error(f"Error processing image: {str(e)}")
            raise

    def _bytes_to_opencv(self, image_bytes: bytes) -> np.ndarray:
        nparr = np.frombuffer(image_bytes, np.uint8)
        return cv2.imdecode(nparr, cv2.IMREAD_COLOR)

    def _opencv_to_bytes(self, image: np.ndarray) -> bytes:
        success, buffer = cv2.imencode('.jpg', image, [
            cv2.IMWRITE_JPEG_QUALITY, 95
        ])
        if not success:
            raise ValueError("Failed to encode image")
        return buffer.tobytes()

    async def _preprocess_image(
        self,
        image: np.ndarray,
        needs_enhancement: bool
    ) -> ProcessedImage:
        metadata = {}
        enhanced = False

        # Resize if needed
        if max(image.shape[:2]) > max(self.target_size):
            image = self._resize_image(image)
            metadata["resized"] = True

        if needs_enhancement:
            # Improve brightness and contrast
            image = self._enhance_brightness_contrast(image)
            
            # Reduce noise
            image = cv2.fastNlMeansDenoisingColored(image)
            
            # Sharpen
            image = self._sharpen_image(image)
            
            enhanced = True
            metadata["enhanced"] = True

        # Normalize
        image = self._normalize_image(image)
        
        # Assess final quality
        final_quality = self._assess_quality(image)

        return ProcessedImage(
            image=image,
            enhanced=enhanced,
            metadata=metadata,
            quality_score=final_quality
        )

    def _assess_quality(self, image: np.ndarray) -> float:
        scores = []
        
        # Brightness score
        brightness = np.mean(image) / 255
        brightness_score = self._score_in_range(
            brightness,
            self.min_brightness,
            self.max_brightness
        )
        scores.append(brightness_score)
        
        # Contrast score
        contrast = np.std(image) / 255
        contrast_score = self._score_in_range(
            contrast,
            self.min_contrast,
            1.0
        )
        scores.append(contrast_score)
        
        # Sharpness score
        sharpness = self._calculate_sharpness(image)
        scores.append(sharpness)
        
        return np.mean(scores)

    def _score_in_range(
        self,
        value: float,
        min_val: float,
        max_val: float
    ) -> float:
        if value < min_val:
            return value / min_val
        if value > max_val:
            return 1 - ((value - max_val) / (1 - max_val))
        return 1.0

    def _calculate_sharpness(self, image: np.ndarray) -> float:
        gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
        laplacian_var = cv2.Laplacian(gray, cv2.CV_64F).var()
        # Normalize to a 0-1 range
        return min(laplacian_var / 500, 1.0)

    def _resize_image(self, image: np.ndarray) -> np.ndarray:
        height, width = image.shape[:2]
        
        # Compute new dimensions, preserving the aspect ratio
        if height > width:
            new_height = self.target_size[0]
            new_width = int(width * (new_height / height))
        else:
            new_width = self.target_size[1]
            new_height = int(height * (new_width / width))
            
        return cv2.resize(
            image,
            (new_width, new_height),
            interpolation=cv2.INTER_AREA
        )

    def _enhance_brightness_contrast(self, image: np.ndarray) -> np.ndarray:
        # Convert to LAB color space
        lab = cv2.cvtColor(image, cv2.COLOR_BGR2LAB)
        l, a, b = cv2.split(lab)
        
        # Apply CLAHE to the L channel
        clahe = cv2.createCLAHE(clipLimit=3.0, tileGridSize=(8,8))
        l = clahe.apply(l)
        
        # Merge the channels
        enhanced_lab = cv2.merge([l, a, b])
        
        # Convert back to BGR
        return cv2.cvtColor(enhanced_lab, cv2.COLOR_LAB2BGR)

    def _sharpen_image(self, image: np.ndarray) -> np.ndarray:
        kernel = np.array([
            [-1,-1,-1],
            [-1, 9,-1],
            [-1,-1,-1]
        ])
        return cv2.filter2D(image, -1, kernel)

    def _normalize_image(self, image: np.ndarray) -> np.ndarray:
        return cv2.normalize(
            image,
            None,
            alpha=0,
            beta=255,
            norm_type=cv2.NORM_MINMAX
        )
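
# A quick manual run of the processor above (fixture path is an assumption).
if __name__ == "__main__":
    import asyncio

    async def _demo():
        with open("tests/fixtures/dark_coins.jpg", "rb") as f:
            output_bytes, info = await ImageProcessor().process_image(f.read())
        print(info["quality_score"], info["enhanced"])

    asyncio.run(_demo())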

# backend/app/services/perspective_corrector.py
import cv2
import numpy as np
import logging
from typing import List, Tuple, Optional

logger = logging.getLogger(__name__)

class PerspectiveCorrector:
    def __init__(self):
        self.reference_diameter = 19.05  # penny diameter in mm

    def correct_perspective(
        self,
        image: np.ndarray,
        coins: List[dict]
    ) -> Tuple[np.ndarray, List[dict]]:
        try:
            # Find circular coins
            circles = self._find_circles(image)
            if circles is None:
                return image, coins

            # Compute the perspective transform
            transform_matrix = self._calculate_perspective_transform(
                image,
                circles
            )
            
            if transform_matrix is None:
                return image, coins

            # Apply the transform
            height, width = image.shape[:2]
            corrected_image = cv2.warpPerspective(
                image,
                transform_matrix,
                (width, height)
            )

            # Update the coin coordinates
            corrected_coins = self._update_coin_coordinates(
                coins,
                transform_matrix
            )

            return corrected_image, corrected_coins

        except Exception as e:
            logger.error(f"Error correcting perspective: {str(e)}")
            return image, coins

    def _find_circles(
        self,
        image: np.ndarray
    ) -> Optional[np.ndarray]:
        gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
        blurred = cv2.GaussianBlur(gray, (9, 9), 2)
        
        circles = cv2.HoughCircles(
            blurred,
            cv2.HOUGH_GRADIENT,
            dp=1,
            minDist=50,
            param1=50,
            param2=30,
            minRadius=20,
            maxRadius=100
        )
        
        return circles[0] if circles is not None else None

    def _calculate_perspective_transform(
        self,
        image: np.ndarray,
        circles: np.ndarray
    ) -> Optional[np.ndarray]:
        # Select the four largest circles (ideally spread across the image)
        sorted_circles = sorted(
            circles,
            key=lambda x: x[2],
            reverse=True
        )[:4]
        
        if len(sorted_circles) < 4:
            return None

        # Convert centers to points
        points = np.float32([circle[:2] for circle in sorted_circles])
        
        # Compute the destination rectangle
        rect = cv2.minAreaRect(points)
        box = cv2.boxPoints(rect)
        box = box.astype(np.int32)  # np.int0 is removed in NumPy 2.x
        
        # Order the points
        src_points = self._order_points(points)
        dst_points = self._order_points(box.astype(np.float32))
        
        # Compute the transform matrix
        return cv2.getPerspectiveTransform(src_points, dst_points)

    def _order_points(self, points: np.ndarray) -> np.ndarray:
        # Order points as: top-left, top-right, bottom-right, bottom-left
        rect = np.zeros((4, 2), dtype=np.float32)
        
        s = points.sum(axis=1)
        rect[0] = points[np.argmin(s)]  # top-left
        rect[2] = points[np.argmax(s)]  # bottom-right
        
        d = np.diff(points, axis=1)
        rect[1] = points[np.argmin(d)]  # top-right
        rect[3] = points[np.argmax(d)]  # bottom-left
        
        return rect

    def _update_coin_coordinates(
        self,
        coins: List[dict],
        transform_matrix: np.ndarray
    ) -> List[dict]:
        updated_coins = []
        
        for coin in coins:
            # Original coordinates
            x = coin['location']['x']
            y = coin['location']['y']
            
            # Apply the transform
            point = np.array([[x, y]], dtype=np.float32)
            transformed_point = cv2.perspectiveTransform(
                point[None, :, :],
                transform_matrix
            )
            
            # Update the coordinates
            coin_copy = coin.copy()
            coin_copy['location']['x'] = float(transformed_point[0][0][0])
            coin_copy['location']['y'] = float(transformed_point[0][0][1])
            
            updated_coins.append(coin_copy)
            
        return updated_coins

# backend/app/api/endpoints/analysis.py
from fastapi import APIRouter, UploadFile, File, Depends, HTTPException
from sqlalchemy.orm import Session
from typing import Dict, Any
import logging

import cv2
import numpy as np

from app.api.deps import get_db
from app.services.image_processor import ImageProcessor
from app.services.perspective_corrector import PerspectiveCorrector
from app.services.rekognition_service import RekognitionService
from app.services.coin_analyzer import CoinAnalyzer

logger = logging.getLogger(__name__)

router = APIRouter()

@router.post("/analyze")
async def analyze_image(
    image: UploadFile = File(...),
    db: Session = Depends(get_db)
) -> Dict[str, Any]:
    try:
        contents = await image.read()
        
        # Build the processing pipeline
        image_processor = ImageProcessor()
        perspective_corrector = PerspectiveCorrector()
        analyzer = CoinAnalyzer(RekognitionService())
        
        # Preprocess the image
        processed_image, processing_metadata = await image_processor.process_image(
            contents
        )
        
        # Analyze the coins
        analysis = await analyzer.analyze_coins(processed_image)
        
        # Correct perspective if needed
        if processing_metadata['quality_score'] < 0.8:
            corrected_image, updated_detections = perspective_corrector.correct_perspective(
                cv2.imdecode(
                    np.frombuffer(processed_image, np.uint8),
                    cv2.IMREAD_COLOR
                ),
                analysis.detections
            )
            analysis.detections = updated_detections
        
        return {
            "total_value": analysis.total_value,
            "coin_count": analysis.coin_count,
            "confidence_score": analysis.confidence_score,
            "processing_metadata": processing_metadata,
            "detections": analysis.detections
        }
        
    except Exception as e:
        logger.error(f"Error processing image: {str(e)}")
        raise HTTPException(
            status_code=500,
            detail="Error processing image"
        )

This third stage implements:

  1. Image Processing:

    • Normalization and resizing
    • Brightness and contrast enhancement
    • Noise reduction
    • Quality assessment
  2. Perspective Correction:

    • Circle detection
    • Perspective transformation
    • Coordinate updates
    • Scale calibration
  3. Processing Pipeline:

    • Asynchronous processing
    • Error handling
    • Processing metadata
    • Resource optimization

Key features:

  1. Image Optimization:

    • Automatic parameter tuning
    • Handling of varied lighting conditions
    • Preservation of important detail
    • Quality control
  2. Geometric Correction:

    • Distortion detection
    • Angle correction
    • Proportion preservation
    • Automatic calibration
  3. Error Handling:

    • Robust validation
    • Graceful fallbacks
    • Detailed logging
    • Error recovery

Stage 4: Detection Algorithms, focusing on precise coin detection and classification.

python
# backend/app/services/detection/coin_detector.py
import cv2
import numpy as np
from typing import List, Dict, Any, Tuple
from dataclasses import dataclass
import logging
from scipy.spatial.distance import cdist

logger = logging.getLogger(__name__)

@dataclass
class CoinDetection:
    type: str
    value: float
    center: Tuple[int, int]
    radius: float
    confidence: float
    features: Dict[str, Any]

class CoinDetector:
    def __init__(self):
        self.coin_templates = {
            'penny': {
                'diameter_mm': 19.05,
                'color_range': ((0, 20, 40), (20, 50, 80)),  # copper
                'features': self._load_coin_features('penny')
            },
            'nickel': {
                'diameter_mm': 21.21,
                'color_range': ((90, 90, 90), (140, 140, 140)),  # silver
                'features': self._load_coin_features('nickel')
            },
            'dime': {
                'diameter_mm': 17.91,
                'color_range': ((90, 90, 90), (140, 140, 140)),  # silver
                'features': self._load_coin_features('dime')
            },
            'quarter': {
                'diameter_mm': 24.26,
                'color_range': ((90, 90, 90), (140, 140, 140)),  # silver
                'features': self._load_coin_features('quarter')
            }
        }
        
        # Initialize the feature detector and matcher
        self.feature_detector = cv2.SIFT_create()
        self.matcher = cv2.BFMatcher(cv2.NORM_L2, crossCheck=True)

    def detect_coins(self, image: np.ndarray) -> List[CoinDetection]:
        try:
            # Preprocess the image
            processed = self._preprocess_image(image)
            
            # Detect circles
            circles = self._detect_circles(processed)
            
            if circles is None:
                return []
                
            detections = []
            
            for circle in circles[0]:
                x, y, r = map(int, circle)
                
                # Extract the region of interest (ROI)
                roi = self._extract_roi(image, x, y, r)
                
                # Classify the coin
                coin_type, confidence = self._classify_coin(roi, r)
                
                if confidence > 0.6:  # confidence threshold
                    detection = CoinDetection(
                        type=coin_type,
                        value=self._get_coin_value(coin_type),
                        center=(x, y),
                        radius=r,
                        confidence=confidence,
                        features=self._extract_features(roi)
                    )
                    detections.append(detection)
            
            # Validate and refine the detections
            refined_detections = self._refine_detections(detections)
            
            return refined_detections
            
        except Exception as e:
            logger.error(f"Error detecting coins: {str(e)}")
            raise

    def _preprocess_image(self, image: np.ndarray) -> np.ndarray:
        # Convert to grayscale
        gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
        
        # Apply Gaussian smoothing
        blurred = cv2.GaussianBlur(gray, (9, 9), 2)
        
        # Adaptive histogram equalization
        clahe = cv2.createCLAHE(clipLimit=2.0, tileGridSize=(8,8))
        enhanced = clahe.apply(blurred)
        
        return enhanced

    def _detect_circles(self, image: np.ndarray) -> np.ndarray:
        circles = cv2.HoughCircles(
            image,
            cv2.HOUGH_GRADIENT,
            dp=1,
            minDist=50,
            param1=50,
            param2=30,
            minRadius=20,
            maxRadius=100
        )
        return circles

    def _extract_roi(
        self,
        image: np.ndarray,
        x: int,
        y: int,
        r: int
    ) -> np.ndarray:
        # Create a circular mask
        mask = np.zeros(image.shape[:2], dtype=np.uint8)
        cv2.circle(mask, (x, y), r, 255, -1)
        
        # Apply the mask
        roi = cv2.bitwise_and(image, image, mask=mask)
        
        # Crop the region
        x1, y1 = max(0, x-r), max(0, y-r)
        x2, y2 = min(image.shape[1], x+r), min(image.shape[0], y+r)
        
        return roi[y1:y2, x1:x2]

    def _classify_coin(
        self,
        roi: np.ndarray,
        radius: float
    ) -> Tuple[str, float]:
        scores = {}
        
        for coin_type, specs in self.coin_templates.items():
            # Size score
            size_score = self._calculate_size_score(radius, specs['diameter_mm'])
            
            # Color score
            color_score = self._calculate_color_score(
                roi,
                specs['color_range']
            )
            
            # Feature score
            feature_score = self._compare_features(
                roi,
                specs['features']
            )
            
            # Combine the weighted scores
            scores[coin_type] = (
                size_score * 0.4 +
                color_score * 0.3 +
                feature_score * 0.3
            )
        
        # Pick the best match
        best_match = max(scores.items(), key=lambda x: x[1])
        return best_match[0], best_match[1]

    def _calculate_size_score(
        self,
        detected_radius: float,
        template_diameter: float
    ) -> float:
        # Convert the detected radius to millimeters
        scale_factor = 19.05 / 50  # penny used as the reference scale
        detected_diameter = detected_radius * 2 * scale_factor
        
        # Relative difference against the template diameter
        difference = abs(detected_diameter - template_diameter)
        max_difference = template_diameter * 0.2  # 20% tolerance
        
        return max(0, 1 - (difference / max_difference))

    def _calculate_color_score(
        self,
        roi: np.ndarray,
        color_range: Tuple[Tuple[int, int, int], Tuple[int, int, int]]
    ) -> float:
        # Convert to HSV
        hsv = cv2.cvtColor(roi, cv2.COLOR_BGR2HSV)
        
        # Build the color mask
        mask = cv2.inRange(hsv, color_range[0], color_range[1])
        
        # Fraction of non-background pixels within the color range
        total_pixels = np.count_nonzero(roi[:,:,0] > 0)
        matched_pixels = np.count_nonzero(mask)
        
        return matched_pixels / total_pixels if total_pixels > 0 else 0

    def _compare_features(
        self,
        roi: np.ndarray,
        template_features: Dict[str, Any]
    ) -> float:
        # Detect features in the ROI
        keypoints, descriptors = self.feature_detector.detectAndCompute(
            roi,
            None
        )
        
        if descriptors is None or len(descriptors) == 0:
            return 0.0
            
        # Find matches
        matches = self.matcher.match(
            descriptors,
            template_features['descriptors']
        )
        
        # Score based on match count and quality
        if len(matches) == 0:
            return 0.0
            
        distances = [m.distance for m in matches]
        return 1.0 - (min(distances) / max(distances))

    def _extract_features(self, roi: np.ndarray) -> Dict[str, Any]:
        keypoints, descriptors = self.feature_detector.detectAndCompute(
            roi,
            None
        )
        
        return {
            'keypoints': keypoints,
            'descriptors': descriptors
        }

    def _refine_detections(
        self,
        detections: List[CoinDetection]
    ) -> List[CoinDetection]:
        if not detections:
            return []
            
        refined = []
        centers = np.array([d.center for d in detections])
        
        # Pairwise distance matrix between detection centers
        distances = cdist(centers, centers)
        
        for i, detection in enumerate(detections):
            # Check for overlap
            overlapping = False
            for j in range(len(detections)):
                if i != j and distances[i,j] < (detection.radius + detections[j].radius):
                    # On overlap, keep the higher-confidence detection
                    if detection.confidence < detections[j].confidence:
                        overlapping = True
                        break
                        
            if not overlapping:
                refined.append(detection)
        
        return refined

    def _load_coin_features(self, coin_type: str) -> Dict[str, Any]:
        # Load precomputed features; a real implementation would load
        # these from disk.
        return {
            'descriptors': np.array([]),  # placeholder
            'keypoints': []  # placeholder
        }

    def _get_coin_value(self, coin_type: str) -> float:
        values = {
            'penny': 0.01,
            'nickel': 0.05,
            'dime': 0.10,
            'quarter': 0.25
        }
        return values.get(coin_type, 0.0)

# backend/app/services/detection/feature_matcher.py
import cv2
from typing import List, Tuple

class FeatureMatcher:
    def __init__(self):
        self.orb = cv2.ORB_create(
            nfeatures=1000,
            scaleFactor=1.2,
            nlevels=8,
            edgeThreshold=31,
            firstLevel=0,
            WTA_K=2,
            patchSize=31,
            fastThreshold=20
        )
        
        self.bf = cv2.BFMatcher(cv2.NORM_HAMMING, crossCheck=True)

    def match_coin(
        self,
        roi: np.ndarray,
        template: np.ndarray,
        threshold: float = 0.7
    ) -> Tuple[float, List[cv2.DMatch]]:
        # Detect features in both images
        kp1, des1 = self.orb.detectAndCompute(roi, None)
        kp2, des2 = self.orb.detectAndCompute(template, None)
        
        if des1 is None or des2 is None:
            return 0.0, []
        
        # Find matches
        matches = self.bf.match(des1, des2)
        
        # Sort by distance
        matches = sorted(matches, key=lambda x: x.distance)
        
        # Score from the average match distance, clamped to [0, 1]
        if len(matches) > 0:
            avg_distance = sum(m.distance for m in matches) / len(matches)
            score = max(0.0, 1.0 - (avg_distance / 100))
            
            # Keep only matches well below the average distance
            good_matches = [m for m in matches if m.distance < threshold * avg_distance]
        else:
            # avg_distance is undefined with no matches, so bail out early
            score = 0.0
            good_matches = []
        
        return score, good_matches

# backend/app/services/detection/geometric_analyzer.py
import cv2
import numpy as np
from typing import Dict, Any, Tuple

class GeometricAnalyzer:
    def __init__(self):
        self.reference_diameters = {
            'penny': 19.05,
            'nickel': 21.21,
            'dime': 17.91,
            'quarter': 24.26
        }

    def analyze_geometry(
        self,
        contour: np.ndarray
    ) -> Dict[str, Any]:
        # Basic geometric properties
        area = cv2.contourArea(contour)
        perimeter = cv2.arcLength(contour, True)
        
        # Fit the minimum enclosing circle
        (x, y), radius = cv2.minEnclosingCircle(contour)
        
        # Circularity: 1.0 for a perfect circle
        circularity = 4 * np.pi * area / (perimeter * perimeter)
        
        # Aspect ratio
        _, (width, height), _ = cv2.minAreaRect(contour)
        aspect_ratio = min(width, height) / max(width, height)
        
        return {
            'center': (float(x), float(y)),
            'radius': float(radius),
            'area': float(area),
            'perimeter': float(perimeter),
            'circularity': float(circularity),
            'aspect_ratio': float(aspect_ratio)
        }

    def classify_by_size(
        self,
        radius: float,
        pixel_to_mm: float
    ) -> Tuple[str, float]:
        actual_diameter = 2 * radius * pixel_to_mm
        
        scores = {}
        for coin_type, ref_diameter in self.reference_diameters.items():
            difference = abs(actual_diameter - ref_diameter)
            scores[coin_type] = 1.0 - (difference / ref_diameter)
            
        best_match = max(scores.items(), key=lambda x: x[1])
        return best_match[0], best_match[1]
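
classify_by_size needs a pixels-to-millimeters factor. A minimal calibration sketch, assuming one detected coin has already been identified as a penny (19.05 mm in diameter):

python
# Derive pixel_to_mm from a known reference coin, then classify another radius.
PENNY_DIAMETER_MM = 19.05

def estimate_pixel_to_mm(reference_radius_px: float) -> float:
    """Millimeters per pixel, from a coin whose real diameter is known."""
    return PENNY_DIAMETER_MM / (2 * reference_radius_px)

analyzer = GeometricAnalyzer()
pixel_to_mm = estimate_pixel_to_mm(reference_radius_px=47.5)  # illustrative value
coin_type, score = analyzer.classify_by_size(radius=57.0, pixel_to_mm=pixel_to_mm)
print(coin_type, round(score, 2))  # best match by diameter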

This fourth stage implements:

  1. Coin Detector:

    • Precise circle detection
    • Feature-based classification
    • Overlap handling
    • Detection validation
  2. Feature Matching:

    • ORB feature extraction
    • Robust matching
    • Score computation
    • Match filtering
  3. Geometric Analysis:

    • Precise measurement
    • Shape validation
    • Classification by size
    • Automatic calibration

Main features:

  1. Robust Detection:

    • Multiple classification criteria
    • Occlusion handling
    • Cross-validation
    • Result refinement
  2. Improved Precision:

    • Automatic calibration
    • Multi-feature analysis
    • Geometric validation
    • Quality control
  3. Optimization:

    • Efficient processing
    • Feature caching
    • Parallelization
    • Memory management

Stage 5: Interactive Frontend, building the user interface with Vue 3 and Tailwind CSS.

vue
// frontend/src/components/ImageUploader/UploadZone.vue
<template>
  <div
    class="upload-zone relative"
    @dragover.prevent="isDragging = true"
    @dragleave.prevent="isDragging = false"
    @drop.prevent="handleDrop"
  >
    <div
      class="border-2 border-dashed rounded-lg p-8"
      :class="{
        'border-blue-400 bg-blue-50': isDragging,
        'border-gray-300': !isDragging
      }"
    >
      <div class="space-y-4 text-center">
        <svg
          class="mx-auto h-12 w-12 text-gray-400"
          stroke="currentColor"
          fill="none"
          viewBox="0 0 48 48"
        >
          <path
            d="M28 8H12a4 4 0 00-4 4v20m32-12v8m0 0v8a4 4 0 01-4 4H12a4 4 0 01-4-4v-4m32-4l-3.172-3.172a4 4 0 00-5.656 0L28 28M8 32l9.172-9.172a4 4 0 015.656 0L28 28m0 0l4 4m4-24h8m-4-4v8m-12 4h.02"
            stroke-width="2"
            stroke-linecap="round"
            stroke-linejoin="round"
          />
        </svg>

        <div class="flex text-sm text-gray-600">
          <label
            class="relative cursor-pointer rounded-md font-medium text-blue-600 hover:text-blue-500 focus-within:outline-none"
          >
            <span>Upload an image</span>
            <input
              type="file"
              class="sr-only"
              accept="image/*"
              @change="handleFileSelect"
            >
          </label>
          <p class="pl-1">o arrastra y suelta</p>
        </div>
        <p class="text-xs text-gray-500">PNG, JPG hasta 5MB</p>
      </div>
    </div>

    <!-- Loading Overlay -->
    <div
      v-if="uploading"
      class="absolute inset-0 bg-white bg-opacity-75 flex items-center justify-center rounded-lg"
    >
      <div class="text-center">
        <svg
          class="animate-spin h-8 w-8 text-blue-500 mx-auto"
          xmlns="http://www.w3.org/2000/svg"
          fill="none"
          viewBox="0 0 24 24"
        >
          <circle
            class="opacity-25"
            cx="12"
            cy="12"
            r="10"
            stroke="currentColor"
            stroke-width="4"
          />
          <path
            class="opacity-75"
            fill="currentColor"
            d="M4 12a8 8 0 018-8V0C5.373 0 0 5.373 0 12h4zm2 5.291A7.962 7.962 0 014 12H0c0 3.042 1.135 5.824 3 7.938l3-2.647z"
          />
        </svg>
        <p class="mt-2 text-sm text-gray-600">Procesando imagen...</p>
      </div>
    </div>
  </div>
</template>

<script setup>
import { ref } from 'vue'
import { useAnalysisStore } from '@/stores/analysis'

const analysisStore = useAnalysisStore()
const isDragging = ref(false)
const uploading = ref(false)

const handleDrop = async (e) => {
  isDragging.value = false
  const file = e.dataTransfer.files[0]
  if (file && file.type.startsWith('image/')) {
    await uploadFile(file)
  }
}

const handleFileSelect = async (e) => {
  const file = e.target.files[0]
  if (file) {
    await uploadFile(file)
  }
}

const MAX_FILE_SIZE = 5 * 1024 * 1024 // mirrors the 5MB limit shown in the UI

const uploadFile = async (file) => {
  if (file.size > MAX_FILE_SIZE) {
    console.error('File exceeds the 5MB limit')
    return
  }
  try {
    uploading.value = true
    await analysisStore.analyzeImage(file)
  } catch (error) {
    console.error('Upload error:', error)
  } finally {
    uploading.value = false
  }
}
</script>

// frontend/src/components/Analysis/ResultDisplay.vue
<template>
  <div class="analysis-result bg-white rounded-lg shadow-lg p-6">
    <div class="grid grid-cols-1 md:grid-cols-2 gap-6">
      <!-- Image Preview -->
      <div class="relative">
        <img
          :src="analysis.imageUrl"
          class="rounded-lg w-full"
          alt="Analyzed image"
        >
        <DetectionOverlay
          :detections="analysis.detections"
          :dimensions="analysis.dimensions"
        />
      </div>

      <!-- Results -->
      <div class="space-y-6">
        <!-- Total Value -->
        <div class="text-center p-4 bg-blue-50 rounded-lg">
          <h3 class="text-lg font-medium text-gray-900">Valor Total</h3>
          <p class="text-3xl font-bold text-blue-600">
            ${{ analysis.totalValue.toFixed(2) }}
          </p>
        </div>

        <!-- Coin Breakdown -->
        <div>
          <h3 class="text-lg font-medium text-gray-900 mb-4">
            Desglose de Monedas
          </h3>
          <div class="space-y-3">
            <div
              v-for="(count, type) in analysis.coinCount"
              :key="type"
              class="flex justify-between items-center"
            >
              <span class="text-gray-600">
                {{ formatCoinType(type) }}
              </span>
              <div class="flex items-center space-x-2">
                <span class="text-gray-900 font-medium">{{ count }}</span>
                <span class="text-sm text-gray-500">
                  (${{ (getCoinValue(type) * count).toFixed(2) }})
                </span>
              </div>
            </div>
          </div>
        </div>

        <!-- Confidence Score -->
        <div>
          <h3 class="text-lg font-medium text-gray-900 mb-2">
            Confianza de Detección
          </h3>
          <div class="relative pt-1">
            <div class="overflow-hidden h-2 text-xs flex rounded bg-blue-200">
              <div
                :style="`width: ${analysis.confidenceScore * 100}%`"
                class="shadow-none flex flex-col text-center whitespace-nowrap text-white justify-center bg-blue-500"
              />
            </div>
            <div class="flex justify-between text-sm mt-1">
              <span class="text-gray-600">
                {{ (analysis.confidenceScore * 100).toFixed(0) }}%
              </span>
              <span
                class="text-gray-500"
                :class="{
                  'text-green-500': analysis.confidenceScore > 0.8,
                  'text-yellow-500': analysis.confidenceScore > 0.6 && analysis.confidenceScore <= 0.8,
                  'text-red-500': analysis.confidenceScore <= 0.6
                }"
              >
                {{ getConfidenceLabel(analysis.confidenceScore) }}
              </span>
            </div>
          </div>
        </div>

        <!-- Actions -->
        <div class="flex space-x-4">
          <button
            @click="downloadReport"
            class="flex-1 bg-blue-600 text-white px-4 py-2 rounded-lg hover:bg-blue-700 transition-colors"
          >
            Descargar Reporte
          </button>
          <button
            @click="$emit('retry')"
            class="flex-1 bg-gray-100 text-gray-700 px-4 py-2 rounded-lg hover:bg-gray-200 transition-colors"
          >
            Nuevo Análisis
          </button>
        </div>
      </div>
    </div>
  </div>
</template>

<script setup>
import DetectionOverlay from './DetectionOverlay.vue'

const props = defineProps({
  analysis: {
    type: Object,
    required: true
  }
})

const emit = defineEmits(['retry'])

const coinValues = {
  'penny': 0.01,
  'nickel': 0.05,
  'dime': 0.10,
  'quarter': 0.25
}

const formatCoinType = (type) => {
  const names = {
    'penny': 'Penny (1¢)',
    'nickel': 'Nickel (5¢)',
    'dime': 'Dime (10¢)',
    'quarter': 'Quarter (25¢)'
  }
  return names[type] || type
}

const getCoinValue = (type) => coinValues[type] || 0

const getConfidenceLabel = (score) => {
  if (score > 0.8) return 'Alta Precisión'
  if (score > 0.6) return 'Precisión Moderada'
  return 'Baja Precisión'
}

const downloadReport = async () => {
  try {
    const response = await fetch(`/api/reports/${props.analysis.id}`)
    const blob = await response.blob()
    const url = window.URL.createObjectURL(blob)
    const a = document.createElement('a')
    a.href = url
    a.download = `coin-analysis-${props.analysis.id}.pdf`
    document.body.appendChild(a)
    a.click()
    window.URL.revokeObjectURL(url)
    document.body.removeChild(a)
  } catch (error) {
    console.error('Error downloading report:', error)
  }
}
</script>

// frontend/src/components/Analysis/DetectionOverlay.vue
<template>
  <div
    class="detection-overlay absolute inset-0"
    :style="overlayStyle"
  >
    <svg
      class="w-full h-full"
      :viewBox="`0 0 ${dimensions.width} ${dimensions.height}`"
    >
      <g
        v-for="detection in detections"
        :key="detection.id"
      >
        <!-- Detection circle -->
        <circle
          :cx="detection.center.x"
          :cy="detection.center.y"
          :r="detection.radius"
          :class="getDetectionClass(detection)"
          fill="none"
          stroke-width="2"
        />
        
        <!-- Value label -->
        <text
          :x="detection.center.x"
          :y="detection.center.y"
          text-anchor="middle"
          dominant-baseline="middle"
          class="text-xs fill-current font-bold"
          :class="getTextClass(detection)"
        >
          {{ formatValue(detection.value) }}
        </text>
      </g>
    </svg>
  </div>
</template>

<script setup>
import { computed } from 'vue'

const props = defineProps({
  detections: {
    type: Array,
    required: true
  },
  dimensions: {
    type: Object,
    required: true
  }
})

const overlayStyle = computed(() => ({
  'pointer-events': 'none'
}))

const getDetectionClass = (detection) => {
  const confidenceColor = detection.confidence > 0.8
    ? 'stroke-green-500'
    : detection.confidence > 0.6
      ? 'stroke-yellow-500'
      : 'stroke-red-500'
  
  return `${confidenceColor} opacity-70`
}

const getTextClass = (detection) => {
  return detection.confidence > 0.6
    ? 'text-white'
    : 'text-gray-900'
}

const formatValue = (value) => {
  return `${value}¢`
}
</script>

// frontend/src/stores/analysis.js
import { defineStore } from 'pinia'
import { ref } from 'vue'
import api from '@/services/api'

export const useAnalysisStore = defineStore('analysis', () => {
  const currentAnalysis = ref(null)
  const loading = ref(false)
  const error = ref(null)
  const history = ref([])

  const analyzeImage = async (file) => {
    try {
      loading.value = true
      error.value = null
      
      const formData = new FormData()
      formData.append('image', file)
      
      const response = await api.post('/analyze', formData)
      currentAnalysis.value = response.data
      history.value.unshift(response.data)
      
      return response.data
    } catch (err) {
      error.value = err.message
      throw err
    } finally {
      loading.value = false
    }
  }

  const clearAnalysis = () => {
    currentAnalysis.value = null
    error.value = null
  }

  return {
    currentAnalysis,
    loading,
    error,
    history,
    analyzeImage,
    clearAnalysis
  }
})

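The downloadReport action above fetches /api/reports/{id}; a minimal FastAPI sketch of such an endpoint follows (the route shape mirrors the frontend call, but report storage and the load_report_pdf helper are assumptions):

python
# Hedged sketch of the report endpoint the frontend calls
from fastapi import APIRouter, HTTPException
from fastapi.responses import Response

router = APIRouter()

async def load_report_pdf(analysis_id: str):
    """Placeholder: a real service would fetch the PDF from S3 or regenerate it."""
    return None

@router.get("/api/reports/{analysis_id}")
async def get_report(analysis_id: str):
    pdf_bytes = await load_report_pdf(analysis_id)
    if pdf_bytes is None:
        raise HTTPException(status_code=404, detail="Report not found")
    # Content-Disposition mirrors the filename the frontend assigns on download
    return Response(
        content=pdf_bytes,
        media_type="application/pdf",
        headers={
            "Content-Disposition": f'attachment; filename="coin-analysis-{analysis_id}.pdf"'
        },
    )
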
This fifth stage implements:

  1. Interactive Components:

    • Drag & drop upload zone
    • Results visualization
    • Detection overlay
    • Confidence indicators
  2. State Management:

    • Pinia store
    • Current-analysis handling
    • Analysis history
    • Loading state
  3. UI/UX:

    • Visual feedback
    • Animations
    • Responsive design
    • Accessibility

Key features:

  1. User Experience:

    • Intuitive interface
    • Real-time feedback
    • Clear visualizations
    • Contextual actions
  2. Data Visualization:

    • Detection overlay
    • Confidence metrics
    • Coin breakdown
    • Result export
  3. Frontend Optimization:

    • Modular components
    • Efficient state management
    • Lazy loading
    • Result caching

Stage 6: Testing and Optimization, focusing on thorough testing and performance improvements.

python
# backend/tests/unit/test_coin_detector.py
import pytest
import numpy as np
import cv2
from app.services.coin_detector import CoinDetector, CoinDetection

class TestCoinDetector:
    @pytest.fixture
    def detector(self):
        return CoinDetector()

    @pytest.fixture
    def sample_image(self):
        # Create a synthetic image for testing
        image = np.zeros((500, 500, 3), dtype=np.uint8)
        # Draw circles simulating coins
        cv2.circle(image, (100, 100), 50, (120, 120, 120), -1)  # Nickel
        cv2.circle(image, (300, 300), 40, (60, 40, 20), -1)     # Penny
        return image

    def test_detect_coins(self, detector, sample_image):
        detections = detector.detect_coins(sample_image)
        assert len(detections) > 0
        assert all(isinstance(d, CoinDetection) for d in detections)
        assert all(d.confidence > 0.5 for d in detections)

    def test_preprocess_image(self, detector, sample_image):
        processed = detector._preprocess_image(sample_image)
        assert isinstance(processed, np.ndarray)
        assert len(processed.shape) == 2  # Must be a grayscale image
        assert processed.dtype == np.uint8

    @pytest.mark.parametrize("coin_type,expected_value", [
        ("penny", 0.01),
        ("nickel", 0.05),
        ("dime", 0.10),
        ("quarter", 0.25)
    ])
    def test_get_coin_value(self, detector, coin_type, expected_value):
        value = detector._get_coin_value(coin_type)
        assert value == expected_value

    def test_handle_poor_lighting(self, detector):
        dark_image = np.ones((500, 500, 3), dtype=np.uint8) * 30
        detections = detector.detect_coins(dark_image)
        assert len(detections) == 0  # Should not detect coins in a dark image

    def test_overlapping_coins(self, detector):
        image = np.zeros((500, 500, 3), dtype=np.uint8)
        # Draw overlapping coins
        cv2.circle(image, (200, 200), 50, (120, 120, 120), -1)
        cv2.circle(image, (220, 220), 45, (60, 40, 20), -1)
        
        detections = detector.detect_coins(image)
        # Should detect the most reliable coin(s)
        assert len(detections) <= 2

# backend/tests/unit/test_image_processor.py
import pytest
from app.services.image_processor import ImageProcessor
import numpy as np
import cv2

class TestImageProcessor:
    @pytest.fixture
    def processor(self):
        return ImageProcessor()

    def test_image_quality_assessment(self, processor):
        # Create test images with different quality levels
        good_image = np.ones((100, 100, 3), dtype=np.uint8) * 128
        bad_image = np.ones((100, 100, 3), dtype=np.uint8) * 30

        good_score = processor._assess_quality(good_image)
        bad_score = processor._assess_quality(bad_image)

        assert good_score > bad_score
        assert 0 <= good_score <= 1
        assert 0 <= bad_score <= 1

    def test_image_enhancement(self, processor):
        dark_image = np.ones((100, 100, 3), dtype=np.uint8) * 50
        processed = processor._enhance_brightness_contrast(dark_image)
        
        assert processed.mean() > dark_image.mean()
        assert processed.shape == dark_image.shape

# backend/tests/integration/test_analysis_pipeline.py
import pytest
from app.services.coin_detector import CoinDetector
from app.services.image_processor import ImageProcessor
from app.services.perspective_corrector import PerspectiveCorrector
import cv2
import os

@pytest.mark.integration
class TestAnalysisPipeline:
    @pytest.fixture
    def setup_pipeline(self):
        return {
            'detector': CoinDetector(),
            'processor': ImageProcessor(),
            'corrector': PerspectiveCorrector()
        }

    def test_full_analysis_pipeline(self, setup_pipeline):
        # Load the test image
        image_path = os.path.join('tests', 'fixtures', 'sample_coins.jpg')
        image = cv2.imread(image_path)
        
        # Process the image
        processor = setup_pipeline['processor']
        processed_image, metadata = processor.process_image(image)
        
        # Correct perspective
        corrector = setup_pipeline['corrector']
        corrected_image, _ = corrector.correct_perspective(processed_image, [])
        
        # Detect coins
        detector = setup_pipeline['detector']
        detections = detector.detect_coins(corrected_image)
        
        assert len(detections) > 0
        assert metadata['quality_score'] > 0.5
        assert all(d.confidence > 0.6 for d in detections)

javascript
// frontend/tests/unit/components/ImageUploader.spec.js
import { mount } from '@vue/test-utils'
import { createTestingPinia } from '@pinia/testing'
import { useAnalysisStore } from '@/stores/analysis'
import ImageUploader from '@/components/ImageUploader/UploadZone.vue'

describe('ImageUploader.vue', () => {
    const createWrapper = () => {
        return mount(ImageUploader, {
            global: {
                plugins: [createTestingPinia()]
            }
        })
    }

    it('handles file drop correctly', async () => {
        const wrapper = createWrapper()
        const file = new File([''], 'test.jpg', { type: 'image/jpeg' })
        
        const dataTransfer = {
            files: [file]
        }
        
        await wrapper.trigger('drop', { dataTransfer })
        
        // Verify the store action was called
        const store = useAnalysisStore()
        expect(store.analyzeImage).toHaveBeenCalledWith(file)
    })

    it('shows loading state during upload', async () => {
        const wrapper = createWrapper()
        const store = useAnalysisStore()
        // Keep the promise pending so the overlay stays visible mid-upload
        store.analyzeImage.mockReturnValue(new Promise(() => {}))

        const file = new File([''], 'test.jpg', { type: 'image/jpeg' })
        await wrapper.trigger('drop', { dataTransfer: { files: [file] } })

        const loadingElement = wrapper.find('.loading-overlay')
        expect(loadingElement.exists()).toBe(true)
    })
})

// frontend/tests/unit/components/ResultDisplay.spec.js
import { mount } from '@vue/test-utils'
import ResultDisplay from '@/components/Analysis/ResultDisplay.vue'

describe('ResultDisplay.vue', () => {
    const mockAnalysis = {
        id: 'test-analysis',
        imageUrl: '',
        dimensions: { width: 500, height: 500 },
        totalValue: 1.70,
        coinCount: {
            'penny': 5,
            'nickel': 2,
            'dime': 3,
            'quarter': 5
        },
        confidenceScore: 0.85,
        detections: []
    }

    it('displays total value correctly', () => {
        const wrapper = mount(ResultDisplay, {
            props: {
                analysis: mockAnalysis
            }
        })

        const totalValue = wrapper.find('.total-value')
        expect(totalValue.text()).toContain('$1.70')
    })

    it('calculates coin breakdown correctly', () => {
        const wrapper = mount(ResultDisplay, {
            props: {
                analysis: mockAnalysis
            }
        })

        const breakdownItems = wrapper.findAll('.coin-breakdown-item')
        expect(breakdownItems).toHaveLength(4)
    })
})

// frontend/tests/e2e/specs/analysis.spec.js
describe('Coin Analysis', () => {
    beforeEach(() => {
        cy.visit('/')
    })

    it('completes full analysis flow', () => {
        // Upload the image
        cy.get('.upload-zone').attachFile('sample_coins.jpg')

        // Verify the loading state appears and then clears
        cy.get('.loading-overlay').should('be.visible')
        cy.get('.loading-overlay').should('not.exist')

        // Verify the results
        cy.get('.analysis-result').should('be.visible')
        cy.get('.total-value').should('exist')
        cy.get('.coin-breakdown').should('exist')

        // Verify the actions
        cy.get('.download-report').should('be.enabled')
        cy.get('.new-analysis').should('be.enabled')
    })

    it('handles errors gracefully', () => {
        // Try uploading an invalid file
        cy.get('.upload-zone').attachFile('invalid.txt')

        // Verify the error message
        cy.get('.error-message').should('be.visible')
        cy.contains('Formato de archivo no soportado')
    })
})

Performance Optimizations

python
# backend/app/services/optimization/caching.py
from typing import Dict, Any
import hashlib

class AnalysisCache:
    def __init__(self):
        self.cache = {}

    def get_cached_analysis(self, image_hash: str) -> Dict[str, Any]:
        # Plain dict lookup: lru_cache here would memoize misses and keep
        # returning None even after cache_analysis stores a result
        return self.cache.get(image_hash)

    def cache_analysis(
        self,
        image_bytes: bytes,
        analysis_result: Dict[str, Any]
    ) -> None:
        image_hash = self._calculate_hash(image_bytes)
        self.cache[image_hash] = analysis_result

    def _calculate_hash(self, image_bytes: bytes) -> str:
        return hashlib.md5(image_bytes).hexdigest()

# backend/app/services/optimization/batch_processor.py
import asyncio
from typing import Any, Dict, List

class BatchProcessor:
    def __init__(self, max_batch_size: int = 10):
        self.max_batch_size = max_batch_size
        self.current_batch = []
        self.results = []

    async def add_to_batch(
        self,
        image_bytes: bytes,
        metadata: Dict[str, Any]
    ) -> None:
        self.current_batch.append((image_bytes, metadata))
        
        if len(self.current_batch) >= self.max_batch_size:
            await self.process_batch()

    async def process_batch(self) -> List[Dict[str, Any]]:
        if not self.current_batch:
            return []

        tasks = []
        for image_bytes, metadata in self.current_batch:
            task = asyncio.create_task(
                self._process_single(image_bytes, metadata)
            )
            tasks.append(task)

        results = await asyncio.gather(*tasks)
        self.current_batch = []
        self.results.extend(results)
        return results

    async def _process_single(
        self,
        image_bytes: bytes,
        metadata: Dict[str, Any]
    ) -> Dict[str, Any]:
        # Implement per-image processing here
        pass

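A brief sketch of how the two pieces above could be wired together; analyze_with_cache is an illustrative helper, not an existing service method:

python
async def analyze_with_cache(image_bytes, cache: AnalysisCache, batch: BatchProcessor):
    image_hash = cache._calculate_hash(image_bytes)
    cached = cache.get_cached_analysis(image_hash)
    if cached is not None:
        return cached  # cache hit: skip detection entirely

    # Cache miss: queue the image; completed results accumulate in batch.results
    await batch.add_to_batch(image_bytes, {"hash": image_hash})
    return None
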
This sixth stage implements:

  1. Thorough Testing:

    • Unit tests
    • Integration tests
    • End-to-end tests
    • Performance tests (see the sketch after this list)
  2. Optimizations:

    • Caching system
    • Batch processing
    • Memory optimization
    • Performance improvements
  3. Frontend Testing:

    • Vue component tests
    • Integration tests
    • E2E tests with Cypress
    • Usability testing

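Since the performance tests mentioned in this list are not shown above, here is a minimal latency-budget sketch; the 2-second threshold is an assumed UX budget, not a measured baseline:

python
import time

import numpy as np
import pytest

from app.services.coin_detector import CoinDetector

@pytest.mark.performance
def test_detection_latency_budget():
    detector = CoinDetector()
    image = np.zeros((1080, 1920, 3), dtype=np.uint8)  # blank full-HD frame

    start = time.perf_counter()
    detector.detect_coins(image)
    elapsed = time.perf_counter() - start

    # Single-image detection should stay under the assumed interactive budget
    assert elapsed < 2.0
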
Key features:

  1. Test Coverage:

    • Edge cases
    • Error handling
    • Varied scenarios
    • Result validation
  2. Performance:

    • Smart caching
    • Asynchronous processing
    • Resource optimization
    • Performance monitoring
  3. Code Quality:

    • Clean code
    • Design patterns
    • Documentation
    • Maintainability

Stage 7: Deployment and Monitoring, focusing on the system's infrastructure and observability.

hcl
# infrastructure/terraform/main.tf
terraform {
  required_providers {
    aws = {
      source  = "hashicorp/aws"
      version = "~> 4.0"
    }
  }
}

# VPC Configuration
resource "aws_vpc" "main" {
  cidr_block           = "10.0.0.0/16"
  enable_dns_hostnames = true
  enable_dns_support   = true

  tags = {
    Name        = "${var.project_name}-vpc"
    Environment = var.environment
  }
}

# ECS Cluster
resource "aws_ecs_cluster" "main" {
  name = "${var.project_name}-cluster"

  setting {
    name  = "containerInsights"
    value = "enabled"
  }
}

# In AWS provider v4, capacity providers are attached via a dedicated resource
resource "aws_ecs_cluster_capacity_providers" "main" {
  cluster_name       = aws_ecs_cluster.main.name
  capacity_providers = ["FARGATE", "FARGATE_SPOT"]

  default_capacity_provider_strategy {
    capacity_provider = "FARGATE"
    weight            = 1
  }
}

# Task Definition
resource "aws_ecs_task_definition" "app" {
  family                   = "${var.project_name}-task"
  requires_compatibilities = ["FARGATE"]
  network_mode            = "awsvpc"
  cpu                     = var.task_cpu
  memory                  = var.task_memory

  container_definitions = jsonencode([
    {
      name  = "web"
      image = "${var.ecr_repository_url}:latest"
      portMappings = [
        {
          containerPort = 8000
          hostPort      = 8000
          protocol      = "tcp"
        }
      ]
      environment = [
        {
          name  = "ENVIRONMENT"
          value = var.environment
        }
      ]
      logConfiguration = {
        logDriver = "awslogs"
        options = {
          "awslogs-group"         = "/ecs/${var.project_name}"
          "awslogs-region"        = var.aws_region
          "awslogs-stream-prefix" = "ecs"
        }
      }
    }
  ])
}

# CloudWatch Dashboard
resource "aws_cloudwatch_dashboard" "main" {
  dashboard_name = "${var.project_name}-dashboard"

  dashboard_body = jsonencode({
    widgets = [
      {
        type   = "metric"
        x      = 0
        y      = 0
        width  = 12
        height = 6

        properties = {
          metrics = [
            ["AWS/ECS", "CPUUtilization", "ServiceName", "${var.project_name}-service", "ClusterName", "${var.project_name}-cluster"],
            [".", "MemoryUtilization", ".", ".", ".", "."]
          ]
          period = 300
          stat   = "Average"
          region = var.aws_region
          title  = "ECS Service Metrics"
        }
      },
      {
        type   = "metric"
        x      = 12
        y      = 0
        width  = 12
        height = 6

        properties = {
          metrics = [
            ["${var.project_name}/API", "ProcessingTime", "Environment", var.environment],
            [".", "SuccessRate", ".", "."]
          ]
          period = 300
          stat   = "Average"
          region = var.aws_region
          title  = "Application Metrics"
        }
      }
    ]
  })
}

# Alerting
resource "aws_cloudwatch_metric_alarm" "high_cpu" {
  alarm_name          = "${var.project_name}-high-cpu"
  comparison_operator = "GreaterThanThreshold"
  evaluation_periods  = "2"
  metric_name        = "CPUUtilization"
  namespace          = "AWS/ECS"
  period             = "300"
  statistic          = "Average"
  threshold          = "80"
  alarm_description  = "CPU utilization is too high"
  alarm_actions      = [aws_sns_topic.alerts.arn]

  dimensions = {
    ClusterName = aws_ecs_cluster.main.name
    ServiceName = "${var.project_name}-service"
  }
}

yaml
# monitoring/prometheus.yml
global:
  scrape_interval: 15s
  evaluation_interval: 15s

scrape_configs:
  - job_name: 'coin-counter'
    static_configs:
      - targets: ['localhost:8000']
    metrics_path: '/metrics'
    scheme: 'http'

  - job_name: 'node-exporter'
    static_configs:
      - targets: ['node-exporter:9100']

json
# monitoring/grafana/dashboards/application.json
{
  "annotations": {
    "list": []
  },
  "editable": true,
  "fiscalYearStartMonth": 0,
  "graphTooltip": 0,
  "links": [],
  "panels": [
    {
      "datasource": "Prometheus",
      "fieldConfig": {
        "defaults": {
          "color": {
            "mode": "palette-classic"
          },
          "custom": {
            "axisCenteredZero": false,
            "axisColorMode": "text",
            "axisLabel": "",
            "axisPlacement": "auto",
            "barAlignment": 0,
            "drawStyle": "line",
            "fillOpacity": 10,
            "gradientMode": "none",
            "hideFrom": {
              "legend": false,
              "tooltip": false,
              "viz": false
            },
            "lineInterpolation": "linear",
            "lineWidth": 1,
            "pointSize": 5,
            "scaleDistribution": {
              "type": "linear"
            },
            "showPoints": "never",
            "spanNulls": false,
            "stacking": {
              "group": "A",
              "mode": "none"
            },
            "thresholdsStyle": {
              "mode": "off"
            }
          },
          "thresholds": {
            "mode": "absolute",
            "steps": [
              {
                "color": "green",
                "value": null
              }
            ]
          },
          "unit": "short"
        },
        "overrides": []
      },
      "gridPos": {
        "h": 8,
        "w": 12,
        "x": 0,
        "y": 0
      },
      "id": 1,
      "options": {
        "legend": {
          "calcs": [],
          "displayMode": "list",
          "placement": "bottom",
          "showLegend": true
        },
        "tooltip": {
          "mode": "single",
          "sort": "none"
        }
      },
      "targets": [
        {
          "datasource": "Prometheus",
          "expr": "rate(coin_counter_processing_duration_seconds_sum[5m])",
          "legendFormat": "Processing Time",
          "refId": "A"
        }
      ],
      "title": "Image Processing Duration",
      "type": "timeseries"
    }
  ],
  "schemaVersion": 38,
  "style": "dark",
  "tags": [],
  "templating": {
    "list": []
  },
  "time": {
    "from": "now-6h",
    "to": "now"
  },
  "timepicker": {},
  "timezone": "",
  "title": "Coin Counter Application",
  "uid": "coin-counter",
  "version": 1,
  "weekStart": ""
}

yaml
# docker-compose.prod.yml
version: '3.8'

services:
  app:
    image: ${ECR_REPOSITORY_URL}:${IMAGE_TAG}
    ports:
      - "8000:8000"
    environment:
      - AWS_DEFAULT_REGION=${AWS_REGION}
      - ENVIRONMENT=production
    logging:
      driver: awslogs
      options:
        awslogs-group: /ecs/coin-counter
        awslogs-region: ${AWS_REGION}
        awslogs-stream-prefix: ecs

  prometheus:
    image: prom/prometheus
    volumes:
      - ./monitoring/prometheus.yml:/etc/prometheus/prometheus.yml
    command:
      - '--config.file=/etc/prometheus/prometheus.yml'
    ports:
      - "9090:9090"

  grafana:
    image: grafana/grafana
    volumes:
      - ./monitoring/grafana/dashboards:/var/lib/grafana/dashboards
      - ./monitoring/grafana/provisioning:/etc/grafana/provisioning
    environment:
      - GF_SECURITY_ADMIN_PASSWORD=${GRAFANA_PASSWORD}
    ports:
      - "3000:3000"
    depends_on:
      - prometheus

python
# backend/app/monitoring/metrics.py
from prometheus_client import Counter, Histogram, Gauge
import time

# Counters
image_processing_total = Counter(
    'coin_counter_images_processed_total',
    'Total number of images processed'
)

coin_detection_total = Counter(
    'coin_counter_coins_detected_total',
    'Total number of coins detected'
)

# Histograms
processing_duration = Histogram(
    'coin_counter_processing_duration_seconds',
    'Time spent processing images',
    buckets=[.1, .5, 1, 2.5, 5, 10]
)

detection_confidence = Histogram(
    'coin_counter_detection_confidence',
    'Confidence scores of coin detections',
    buckets=[.1, .2, .3, .4, .5, .6, .7, .8, .9, 1]
)

# Gauges
active_processing = Gauge(
    'coin_counter_active_processing',
    'Number of images currently being processed'
)

class MetricsMiddleware:
    async def __call__(self, request, call_next):
        active_processing.inc()
        start_time = time.time()
        
        try:
            response = await call_next(request)
            
            if request.url.path.startswith('/api/analyze'):
                image_processing_total.inc()
                processing_duration.observe(time.time() - start_time)
                
            return response
        finally:
            active_processing.dec()

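The middleware above only feeds the request-level metrics; here is a brief sketch of where the detection service could record the remaining ones (the detector API and helper name are assumptions):

python
# Hedged instrumentation sketch; instrumented_detect is illustrative
from app.monitoring.metrics import (
    coin_detection_total,
    detection_confidence,
    processing_duration,
)

def instrumented_detect(detector, image):
    # Histogram.time() times the block and records it in processing_duration
    with processing_duration.time():
        detections = detector.detect_coins(image)

    coin_detection_total.inc(len(detections))
    for detection in detections:
        detection_confidence.observe(detection.confidence)
    return detections
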
bash
# scripts/deploy_monitoring.sh
#!/bin/bash

# Verify required variables
if [ -z "$AWS_REGION" ] || [ -z "$CLUSTER_NAME" ]; then
    echo "AWS_REGION and CLUSTER_NAME must be set"
    exit 1
fi

# Deploy the CloudWatch dashboard
aws cloudformation deploy \
    --template-file infrastructure/cloudformation/monitoring.yml \
    --stack-name coin-counter-monitoring \
    --parameter-overrides \
        Environment=${ENVIRONMENT} \
        ClusterName=${CLUSTER_NAME} \
    --capabilities CAPABILITY_IAM \
    --region ${AWS_REGION}

# Configure alerts
aws cloudwatch put-metric-alarm \
    --alarm-name coin-counter-high-latency \
    --alarm-description "High processing latency detected" \
    --metric-name processing_duration_seconds \
    --namespace CoinCounter \
    --statistic Average \
    --period 300 \
    --threshold 5 \
    --comparison-operator GreaterThanThreshold \
    --evaluation-periods 2 \
    --alarm-actions ${SNS_TOPIC_ARN} \
    --region ${AWS_REGION}

echo "Monitoring configuration completed!"

This seventh stage implements:

  1. Infrastructure as Code:

    • Terraform configuration
    • ECS Fargate
    • Networking
    • Auto-scaling
  2. Monitoring:

    • CloudWatch dashboards
    • Prometheus/Grafana
    • Custom metrics
    • Alerts
  3. Deployment:

    • Docker Compose
    • ECS task definitions
    • CI/CD pipeline
    • Logging

Key features:

  1. Observability:

    • Performance metrics
    • Centralized logs
    • Real-time dashboards
    • Alerting system
  2. Scalability:

    • Auto-scaling
    • Load balancing
    • Redundancy
    • High availability
  3. Security:

    • Configured VPC
    • IAM roles
    • Secure secrets management
    • Security monitoring

This completes the project's seven stages. The application is now ready for:

  1. Development:

    • Structured code
    • Complete tests
    • Clear documentation
    • Quality standards
  2. Operations:

    • Automated deployment
    • Continuous monitoring
    • Configured alerts
    • Simplified maintenance
  3. Scaling:

    • Elastic infrastructure
    • Optimized performance
    • Balanced resources
    • Controlled costs