
Exercise: Credit Risk Assessment System with Bedrock

Scenario Description

We will build a credit risk assessment system that uses Amazon Bedrock to analyze applicant profiles and determine their risk level for loans. The system consists of:

  • A Vue.js frontend for data entry and visualization
  • A Python (FastAPI) backend for processing
  • Integration with Amazon Bedrock for risk analysis
  • A custom scoring system
  • A dashboard for metric visualization
  • A RESTful API for integrations
  • An alerting and notification system

Project Structure

loan-risk-analyzer/
├── frontend/
│   ├── src/
│   │   ├── components/
│   │   │   ├── ApplicationForm/
│   │   │   ├── RiskDashboard/
│   │   │   └── common/
│   │   ├── views/
│   │   ├── store/
│   │   ├── router/
│   │   └── utils/
│   └── tests/

├── backend/
│   ├── app/
│   │   ├── api/
│   │   │   ├── routes/
│   │   │   └── endpoints/
│   │   ├── core/
│   │   │   ├── config.py
│   │   │   └── security.py
│   │   ├── models/
│   │   ├── schemas/
│   │   ├── services/
│   │   │   ├── bedrock_service.py
│   │   │   ├── risk_analyzer.py
│   │   │   └── notification_service.py
│   │   └── utils/
│   └── tests/

├── infrastructure/
│   ├── terraform/
│   ├── docker/
│   └── scripts/

└── docs/

Exercise Stages

Stage 1: Base Configuration and Data Models

  • Set up the Python project with FastAPI
  • Set up the Vue.js project
  • Define models and schemas
  • Configure the database
  • Implement basic validations

Stage 2: Integration with Amazon Bedrock

  • Configure the Bedrock client
  • Implement prompt engineering
  • Create the analysis service
  • Define the evaluation logic
  • Implement result caching

Stage 3: Frontend - Forms and Visualization

  • Create the application form
  • Implement validations
  • Build the dashboard
  • Create reusable components
  • Implement the notification system

Stage 4: Backend - Business Logic

  • Implement the DTI calculation
  • Create the scoring system
  • Develop business rules
  • Implement the evaluation history
  • Create API endpoints

Stage 5: Evaluation System

  • Implement the rules engine
  • Create the decision matrix
  • Develop predictive analysis
  • Implement dynamic thresholds
  • Create the recommendation system

Stage 6: Analysis and Reporting

  • Implement performance metrics
  • Create automated reports
  • Develop the alerting system
  • Implement data export
  • Create the admin dashboard

Stage 7: Testing and Deployment

  • Implement unit tests
  • Create integration tests
  • Set up CI/CD
  • Implement monitoring
  • Document APIs and systems

Key Technical Details

DTI Calculation

python
def calculate_dti(monthly_income: float, total_debt: float) -> float:
    """
    Calcula el ratio Deuda/Ingreso (DTI)
    """
    if monthly_income <= 0:
        raise ValueError("El ingreso mensual debe ser mayor a 0")
    return (total_debt / monthly_income) * 100

Risk Evaluation Matrix

Factor             | Weight | Low Risk | Medium Risk   | High Risk
Credit Score       | 30%    | >750     | 650-750       | <650
DTI                | 25%    | <30%     | 30-40%        | >40%
Employment Tenure  | 20%    | >5 years | 2-5 years     | <2 years
Income             | 15%    | >$5,000  | $3,000-$5,000 | <$3,000
Housing            | 10%    | Owned    | Family        | Rented
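
To make the matrix concrete, the sketch below shows how the weights combine into a single 0-1 score. The 1.0 / 0.6 / 0.2 sub-scores per band are an assumption for illustration; the project's actual per-factor scores are defined later in RiskAnalyzer.

python
# Illustrative only: combining the matrix weights into one composite score.
WEIGHTS = {"credit_score": 0.30, "dti": 0.25, "tenure": 0.20, "income": 0.15, "housing": 0.10}
BAND_SCORES = {"low": 1.0, "medium": 0.6, "high": 0.2}  # assumed sub-scores per band

def composite_score(bands: dict) -> float:
    """bands maps each factor name to the risk band ('low'/'medium'/'high') it falls into."""
    return sum(WEIGHTS[factor] * BAND_SCORES[band] for factor, band in bands.items())

# Example: strong credit, medium DTI, long tenure, medium income, rented housing
print(composite_score({
    "credit_score": "low", "dti": "medium", "tenure": "low",
    "income": "medium", "housing": "high",
}))  # -> 0.76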

Prompt for Bedrock

python
def create_risk_assessment_prompt(applicant_data: dict) -> str:
    return f"""Analiza el siguiente perfil para un préstamo:
    - Nombre: {applicant_data['nombre']}
    - Score Crediticio: {applicant_data['score']}
    - DTI: {applicant_data['dti']}%
    - Antigüedad Laboral: {applicant_data['antiguedad']}
    - Ingreso Mensual: ${applicant_data['ingreso']}
    - Vivienda: {applicant_data['vivienda']}
    
    Considera los siguientes factores para el análisis:
    1. Historial crediticio y capacidad de pago
    2. Estabilidad laboral y nivel de ingresos
    3. Ratio de endeudamiento actual
    4. Perfil de riesgo general
    
    Proporciona una evaluación detallada del riesgo y una recomendación."""

Example Bedrock Response

json
{
  "risk_assessment": {
    "risk_level": "MEDIO",
    "confidence_score": 0.75,
    "factors": {
      "credit_history": {
        "score": 0.8,
        "details": "Buen historial crediticio con score por encima del promedio"
      },
      "income_stability": {
        "score": 0.7,
        "details": "Empleo estable con ingresos moderados"
      },
      "debt_burden": {
        "score": 0.6,
        "details": "DTI en rango medio, indica capacidad de pago ajustada"
      }
    },
    "recommendation": "Aprobación condicional con monitoreo de DTI",
    "suggested_terms": {
      "max_amount": 25000,
      "interest_rate": "12.5%",
      "term_months": 36
    }
  }
}
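
The downstream services rely on this exact structure, so it is worth validating the model output before it reaches any business logic. Below is a minimal sketch using Pydantic; the model names are illustrative and not part of the project code.

python
# Hedged sketch: validate the parsed Bedrock JSON before using it.
from typing import Dict
from pydantic import BaseModel

class SuggestedTerms(BaseModel):
    max_amount: float
    interest_rate: str
    term_months: int

class FactorDetail(BaseModel):
    score: float
    details: str

class RiskAssessment(BaseModel):
    risk_level: str
    confidence_score: float
    factors: Dict[str, FactorDetail]
    recommendation: str
    suggested_terms: SuggestedTerms

# Usage (illustrative): assessment = RiskAssessment(**parsed_output["risk_assessment"])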

Implementation Considerations

  1. Security (see the JWT sketch after this list)

    • Implement JWT authentication
    • Validate inputs
    • Encrypt sensitive data
    • Apply rate limiting
  2. Performance

    • Implement caching
    • Optimize queries
    • Use lazy loading
    • Implement pagination
  3. Scalability

    • Design for variable load
    • Implement microservices
    • Use containers
    • Configure auto-scaling
  4. Monitoring

    • Centralized logging
    • Performance metrics
    • Automatic alerts
    • Operational dashboard
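
As a concrete starting point for the security item above, here is a minimal JWT verification dependency for FastAPI. It assumes python-jose is installed and that SECRET_KEY / ALGORITHM would live in Settings; the names are illustrative, not part of the project code shown elsewhere.

python
# Hedged sketch: JWT bearer-token verification as a FastAPI dependency.
from fastapi import Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer
from jose import JWTError, jwt  # assumption: python-jose is installed

oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/api/v1/auth/token")

SECRET_KEY = "change-me"  # assumption: would come from Settings in practice
ALGORITHM = "HS256"

def get_current_user(token: str = Depends(oauth2_scheme)) -> str:
    """Decode the bearer token and return its subject, or raise 401."""
    try:
        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
        return payload["sub"]
    except (JWTError, KeyError):
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Invalid or expired token"
        )

# Protected endpoints would then declare: user: str = Depends(get_current_user)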

Expected Deliverables

  1. Frontend

    • Responsive user interface
    • Interactive dashboard
    • Validated forms
    • Notification system
  2. Backend

    • Documented RESTful API
    • Evaluation system
    • Bedrock integration
    • Error handling
  3. Documentation

    • API docs
    • Implementation guide
    • User manual
    • Technical documentation
  4. Testing

    • Unit test suite
    • Integration tests
    • Load tests
    • QA plan

Stage 1: Base Configuration and Data Models

python
# Backend: FastAPI models and schemas

# backend/app/models/applicant.py
from sqlalchemy import Column, Integer, String, Float, Date, Enum
from sqlalchemy.sql import func
from enum import Enum as PyEnum
from .base import Base

class MaritalStatus(PyEnum):
    SINGLE = "SINGLE"
    MARRIED = "MARRIED"
    DIVORCED = "DIVORCED"
    WIDOWED = "WIDOWED"

class HomeStatus(PyEnum):
    OWNED = "OWNED"
    RENTED = "RENTED"
    FAMILY = "FAMILY"

class Applicant(Base):
    __tablename__ = "applicants"

    id = Column(Integer, primary_key=True, index=True)
    full_name = Column(String(100), nullable=False)
    identification = Column(String(20), unique=True, nullable=False)
    age = Column(Integer, nullable=False)
    monthly_income = Column(Float, nullable=False)
    marital_status = Column(Enum(MaritalStatus), nullable=False)
    occupation = Column(String(100), nullable=False)
    work_experience_years = Column(Float, nullable=False)
    credit_score = Column(Integer, nullable=False)
    home_status = Column(Enum(HomeStatus), nullable=False)
    dti = Column(Float, nullable=True)  # Calculated automatically
    created_at = Column(Date, server_default=func.now())
    updated_at = Column(Date, onupdate=func.now())

# backend/app/schemas/applicant.py
from pydantic import BaseModel, Field, validator
from datetime import date
from typing import Optional
from enum import Enum

class MaritalStatus(str, Enum):
    SINGLE = "SINGLE"
    MARRIED = "MARRIED"
    DIVORCED = "DIVORCED"
    WIDOWED = "WIDOWED"

class HomeStatus(str, Enum):
    OWNED = "OWNED"
    RENTED = "RENTED"
    FAMILY = "FAMILY"

class ApplicantBase(BaseModel):
    full_name: str = Field(..., min_length=3, max_length=100)
    identification: str = Field(..., pattern="^DNI-[0-9]{8}$")  # Pydantic v2 uses "pattern" instead of the removed "regex"
    age: int = Field(..., ge=18, le=80)
    monthly_income: float = Field(..., gt=0)
    marital_status: MaritalStatus
    occupation: str = Field(..., min_length=3, max_length=100)
    work_experience_years: float = Field(..., ge=0)
    credit_score: int = Field(..., ge=300, le=850)
    home_status: HomeStatus

class ApplicantCreate(ApplicantBase):
    pass

class ApplicantUpdate(ApplicantBase):
    pass

class ApplicantInDB(ApplicantBase):
    id: int
    dti: Optional[float]
    created_at: date
    updated_at: Optional[date]

    class Config:
        from_attributes = True  # Pydantic v2 name for the former orm_mode

# backend/app/core/config.py
from pydantic_settings import BaseSettings
from pydantic import Field, field_validator
from typing import Optional

class Settings(BaseSettings):
    PROJECT_NAME: str = "Loan Risk Analyzer"
    VERSION: str = "1.0.0"
    API_V1_STR: str = "/api/v1"
    
    POSTGRES_SERVER: str
    POSTGRES_USER: str
    POSTGRES_PASSWORD: str
    POSTGRES_DB: str
    SQLALCHEMY_DATABASE_URI: Optional[str] = Field(default=None, validate_default=True)

    AWS_ACCESS_KEY_ID: str
    AWS_SECRET_ACCESS_KEY: str
    AWS_REGION: str = "us-east-1"
    
    BEDROCK_MODEL_ID: str = "anthropic.claude-v2"
    
    @field_validator("SQLALCHEMY_DATABASE_URI", mode="before")
    @classmethod
    def assemble_db_connection(cls, v: Optional[str], info) -> str:
        if v:
            return v
        values = info.data
        return f"postgresql://{values['POSTGRES_USER']}:{values['POSTGRES_PASSWORD']}@{values['POSTGRES_SERVER']}/{values['POSTGRES_DB']}"

    class Config:
        env_file = ".env"

settings = Settings()

# backend/app/db/session.py
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from app.core.config import settings

engine = create_engine(settings.SQLALCHEMY_DATABASE_URI, pool_pre_ping=True)
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)

# backend/app/api/deps.py
from typing import Generator
from app.db.session import SessionLocal

def get_db() -> Generator:
    db = SessionLocal()
    try:
        yield db
    finally:
        db.close()

# Frontend: Vue.js base components

# frontend/src/components/ApplicationForm/index.vue
<template>
  <div class="application-form">
    <v-form @submit.prevent="handleSubmit" v-model="valid">
      <v-container>
        <v-row>
          <v-col cols="12" md="6">
            <v-text-field
              v-model="formData.full_name"
              :rules="nameRules"
              label="Nombre Completo"
              required
            ></v-text-field>
          </v-col>
          <v-col cols="12" md="6">
            <v-text-field
              v-model="formData.identification"
              :rules="idRules"
              label="Identificación"
              hint="Formato: DNI-12345678"
              required
            ></v-text-field>
          </v-col>
        </v-row>
        <v-row>
          <v-col cols="12" md="4">
            <v-text-field
              v-model.number="formData.age"
              type="number"
              :rules="ageRules"
              label="Edad"
              required
            ></v-text-field>
          </v-col>
          <v-col cols="12" md="4">
            <v-text-field
              v-model.number="formData.monthly_income"
              type="number"
              :rules="incomeRules"
              label="Ingreso Mensual"
              prefix="$"
              required
            ></v-text-field>
          </v-col>
          <v-col cols="12" md="4">
            <v-select
              v-model="formData.marital_status"
              :items="maritalStatusOptions"
              label="Estado Civil"
              required
            ></v-select>
          </v-col>
        </v-row>
        <!-- More fields following the model -->
        <v-btn
          :disabled="!valid"
          color="primary"
          class="mr-4"
          type="submit"
        >
          Enviar Solicitud
        </v-btn>
      </v-container>
    </v-form>
  </div>
</template>

<script>
export default {
  name: 'ApplicationForm',
  data: () => ({
    valid: false,
    formData: {
      full_name: '',
      identification: '',
      age: null,
      monthly_income: null,
      marital_status: '',
      occupation: '',
      work_experience_years: null,
      credit_score: null,
      home_status: ''
    },
    nameRules: [
      v => !!v || 'Nombre es requerido',
      v => v.length >= 3 || 'Nombre debe tener al menos 3 caracteres'
    ],
    idRules: [
      v => !!v || 'Identificación es requerida',
      v => /^DNI-[0-9]{8}$/.test(v) || 'Formato inválido (DNI-12345678)'
    ],
    ageRules: [
      v => !!v || 'Edad es requerida',
      v => v >= 18 || 'Debe ser mayor de 18 años',
      v => v <= 80 || 'Edad máxima 80 años'
    ],
    incomeRules: [
      v => !!v || 'Ingreso es requerido',
      v => v > 0 || 'Ingreso debe ser mayor a 0'
    ],
    maritalStatusOptions: [
      { text: 'Soltero/a', value: 'SINGLE' },
      { text: 'Casado/a', value: 'MARRIED' },
      { text: 'Divorciado/a', value: 'DIVORCED' },
      { text: 'Viudo/a', value: 'WIDOWED' }
    ]
  }),
  methods: {
    async handleSubmit() {
      if (!this.valid) return;
      
      try {
        // Submit the application through the API client
        const response = await this.$api.submitApplication(this.formData);
        this.$emit('submission-success', response.data);
      } catch (error) {
        this.$emit('submission-error', error);
      }
    }
  }
}
</script>

# Database configuration

# backend/alembic/env.py
from logging.config import fileConfig
from sqlalchemy import engine_from_config
from sqlalchemy import pool
from alembic import context
from app.core.config import settings
from app.models import Base

config = context.config
fileConfig(config.config_file_name)
target_metadata = Base.metadata

def run_migrations_offline() -> None:
    url = settings.SQLALCHEMY_DATABASE_URI
    context.configure(
        url=url,
        target_metadata=target_metadata,
        literal_binds=True,
        dialect_opts={"paramstyle": "named"},
    )

    with context.begin_transaction():
        context.run_migrations()

def run_migrations_online() -> None:
    configuration = config.get_section(config.config_ini_section)
    configuration["sqlalchemy.url"] = settings.SQLALCHEMY_DATABASE_URI
    connectable = engine_from_config(
        configuration,
        prefix="sqlalchemy.",
        poolclass=pool.NullPool,
    )

    with connectable.connect() as connection:
        context.configure(
            connection=connection,
            target_metadata=target_metadata
        )

        with context.begin_transaction():
            context.run_migrations()

if context.is_offline_mode():
    run_migrations_offline()
else:
    run_migrations_online()

This first stage lays the foundation of the project with:

  1. Complete data models for applicants
  2. Validation schemas with Pydantic
  3. Base application configuration
  4. Initial Vue.js form
  5. Migration setup with Alembic

The next steps would be:

  1. Set up the development environment
  2. Initialize the database
  3. Test the validations (see the sketch below)
  4. Verify the base form
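
As a quick way to cover step 3 above, the Pydantic validations can be exercised with pytest. A minimal sketch; the file location and sample values are assumptions.

python
# Hedged sketch: backend/tests/test_applicant_schema.py (illustrative path)
import pytest
from pydantic import ValidationError
from app.schemas.applicant import ApplicantCreate

VALID_DATA = {
    "full_name": "Ana Pérez",
    "identification": "DNI-12345678",
    "age": 35,
    "monthly_income": 4200.0,
    "marital_status": "MARRIED",
    "occupation": "Ingeniera",
    "work_experience_years": 6,
    "credit_score": 710,
    "home_status": "OWNED",
}

def test_valid_applicant_passes():
    applicant = ApplicantCreate(**VALID_DATA)
    assert applicant.credit_score == 710

def test_malformed_identification_is_rejected():
    with pytest.raises(ValidationError):
        ApplicantCreate(**{**VALID_DATA, "identification": "12345678"})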

Stage 2: Integration with Amazon Bedrock

python
# Bedrock integration service

# backend/app/services/bedrock_service.py
from typing import Dict, Any
import boto3
import json
from app.core.config import settings
from app.schemas.applicant import ApplicantInDB
import logging

logger = logging.getLogger(__name__)

class BedrockService:
    def __init__(self):
        self.client = boto3.client(
            service_name='bedrock-runtime',
            region_name=settings.AWS_REGION,
            aws_access_key_id=settings.AWS_ACCESS_KEY_ID,
            aws_secret_access_key=settings.AWS_SECRET_ACCESS_KEY
        )
        self.model_id = settings.BEDROCK_MODEL_ID

    def _create_risk_assessment_prompt(self, applicant: ApplicantInDB) -> str:
        return f"""Analiza el siguiente perfil para un préstamo y proporciona una evaluación detallada del riesgo:

Información del Aplicante:
- Nombre: {applicant.full_name}
- Edad: {applicant.age}
- Score Crediticio: {applicant.credit_score}
- DTI: {applicant.dti}%
- Ingreso Mensual: ${applicant.monthly_income}
- Estado Civil: {applicant.marital_status}
- Ocupación: {applicant.occupation}
- Experiencia Laboral: {applicant.work_experience_years} años
- Vivienda: {applicant.home_status}

Factores a considerar:
1. Analiza el historial crediticio y capacidad de pago
2. Evalúa la estabilidad laboral y nivel de ingresos
3. Considera el ratio de endeudamiento (DTI)
4. Analiza el perfil de riesgo general

Proporciona:
1. Nivel de riesgo (BAJO, MEDIO, ALTO)
2. Puntaje de confianza (0-1)
3. Análisis detallado de factores de riesgo
4. Recomendación específica
5. Términos sugeridos para el préstamo si aplica

Formato JSON requerido para la respuesta."""

    async def analyze_risk(self, applicant: ApplicantInDB) -> Dict[str, Any]:
        try:
            prompt = self._create_risk_assessment_prompt(applicant)
            
            response = self.client.invoke_model(
                modelId=self.model_id,
                body=json.dumps({
                    # Claude v2 text-completion format: the prompt is wrapped in
                    # Human/Assistant turns and uses max_tokens_to_sample
                    "prompt": f"\n\nHuman: {prompt}\n\nAssistant:",
                    "max_tokens_to_sample": 1000,
                    "temperature": 0.1
                })
            )

            response_body = json.loads(response['body'].read())

            # The generated text is returned under "completion"; since the prompt
            # asks for JSON, parse it before validating the structure
            model_output = json.loads(response_body.get("completion", "{}"))

            # Process and validate the response
            risk_assessment = self._process_bedrock_response(model_output)

            # Cache the result if needed
            await self._cache_risk_assessment(applicant.id, risk_assessment)
            
            return risk_assessment

        except Exception as e:
            logger.error(f"Error in Bedrock risk analysis: {str(e)}")
            raise

    def _process_bedrock_response(self, response: Dict) -> Dict[str, Any]:
        try:
            # Expected response structure
            return {
                "risk_level": response["risk_assessment"]["risk_level"],
                "confidence_score": float(response["risk_assessment"]["confidence_score"]),
                "factors": response["risk_assessment"]["factors"],
                "recommendation": response["risk_assessment"]["recommendation"],
                "suggested_terms": response["risk_assessment"]["suggested_terms"]
            }
        except KeyError as e:
            logger.error(f"Invalid Bedrock response structure: {str(e)}")
            raise ValueError("Invalid response structure from Bedrock")

    async def _cache_risk_assessment(self, applicant_id: int, assessment: Dict) -> None:
        # Caching hook; a minimal in-memory sketch is shown at the end of this stage
        pass

# backend/app/services/risk_analyzer.py
from typing import Dict, Any, List
from app.schemas.applicant import ApplicantInDB
from .bedrock_service import BedrockService

class RiskAnalyzer:
    def __init__(self):
        self.bedrock_service = BedrockService()
        self.risk_weights = {
            "credit_score": 0.30,
            "dti": 0.25,
            "work_experience": 0.20,
            "income": 0.15,
            "home_status": 0.10
        }

    async def analyze_applicant(self, applicant: ApplicantInDB) -> Dict[str, Any]:
        # Get the Bedrock analysis
        bedrock_analysis = await self.bedrock_service.analyze_risk(applicant)
        
        # Compute the internal score
        internal_score = self._calculate_internal_score(applicant)
        
        # Combine both results
        return self._combine_analysis(bedrock_analysis, internal_score)

    def _calculate_internal_score(self, applicant: ApplicantInDB) -> float:
        scores = {
            "credit_score": self._score_credit(applicant.credit_score),
            "dti": self._score_dti(applicant.dti),
            "work_experience": self._score_experience(applicant.work_experience_years),
            "income": self._score_income(applicant.monthly_income),
            "home_status": self._score_home_status(applicant.home_status)
        }
        
        return sum(scores[k] * self.risk_weights[k] for k in scores)

    def _score_credit(self, score: int) -> float:
        if score >= 750: return 1.0
        if score >= 650: return 0.7
        return 0.3

    def _score_dti(self, dti: float) -> float:
        if dti <= 30: return 1.0
        if dti <= 40: return 0.6
        return 0.2

    def _score_experience(self, years: float) -> float:
        if years >= 5: return 1.0
        if years >= 2: return 0.7
        return 0.3

    def _score_income(self, income: float) -> float:
        if income >= 5000: return 1.0
        if income >= 3000: return 0.7
        return 0.4

    def _score_home_status(self, status: str) -> float:
        scores = {"OWNED": 1.0, "FAMILY": 0.7, "RENTED": 0.5}
        return scores.get(status, 0.5)

    def _combine_analysis(
        self, 
        bedrock_analysis: Dict[str, Any], 
        internal_score: float
    ) -> Dict[str, Any]:
        risk_levels = {
            "BAJO": 1.0,
            "MEDIO": 0.5,
            "ALTO": 0.2
        }
        
        bedrock_score = risk_levels.get(bedrock_analysis["risk_level"], 0.5)
        
        # Combine scores with a 60/40 weighting
        final_score = (bedrock_score * 0.6) + (internal_score * 0.4)
        
        return {
            "final_score": final_score,
            "risk_level": self._get_risk_level(final_score),
            "bedrock_analysis": bedrock_analysis,
            "internal_score": internal_score,
            "details": {
                "confidence": bedrock_analysis["confidence_score"],
                "factors": bedrock_analysis["factors"],
                "recommendation": bedrock_analysis["recommendation"],
                "suggested_terms": bedrock_analysis["suggested_terms"]
            }
        }

    def _get_risk_level(self, score: float) -> str:
        if score >= 0.7: return "BAJO"
        if score >= 0.4: return "MEDIO"
        return "ALTO"

# backend/app/api/endpoints/risk_assessment.py
from fastapi import APIRouter, Depends, HTTPException
from sqlalchemy.orm import Session
from typing import Dict, Any
from app.api import deps
from app.services.risk_analyzer import RiskAnalyzer
from app.models.applicant import Applicant

router = APIRouter()

@router.post("/analyze/{applicant_id}", response_model=Dict[str, Any])
async def analyze_risk(
    applicant_id: int,
    db: Session = Depends(deps.get_db)
) -> Dict[str, Any]:
    # Fetch the applicant from the database (query the ORM model, not the Pydantic schema)
    applicant = db.query(Applicant).filter(Applicant.id == applicant_id).first()
    if not applicant:
        raise HTTPException(status_code=404, detail="Applicant not found")

    # Run the risk analysis
    analyzer = RiskAnalyzer()
    try:
        result = await analyzer.analyze_applicant(applicant)
        return result
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))

This second stage implements:

  1. Full integration with Amazon Bedrock
  2. A custom risk analysis system
  3. Combination of the Bedrock analysis with internal scoring
  4. API endpoints for risk evaluation
  5. Error handling and logging

Key aspects of this implementation:

  1. Multi-factor Analysis: Combines AI analysis with traditional business rules
  2. Weighted Scoring: Uses different weights for each risk factor
  3. Cache: Ready for a results cache to be plugged in
  4. Error Handling: Robust error handling and logging
  5. Configurability: Adjustable parameters for the analysis

The next steps would be:

  1. Configure AWS credentials
  2. Test the Bedrock integration
  3. Tune the weights and thresholds
  4. Implement the caching layer (see the sketch below)
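
For step 4, the `_cache_risk_assessment` stub in BedrockService could delegate to something as simple as an in-memory TTL cache. A minimal sketch under that assumption (in production this would more likely be Redis; all names here are illustrative):

python
# Hedged sketch: a tiny TTL cache that BedrockService could delegate to.
import time
from typing import Any, Dict, Optional, Tuple

class AssessmentCache:
    def __init__(self, ttl_seconds: int = 3600):
        self.ttl = ttl_seconds
        # applicant_id -> (expiry timestamp, cached assessment)
        self._store: Dict[int, Tuple[float, Dict[str, Any]]] = {}

    def set(self, applicant_id: int, assessment: Dict[str, Any]) -> None:
        self._store[applicant_id] = (time.time() + self.ttl, assessment)

    def get(self, applicant_id: int) -> Optional[Dict[str, Any]]:
        entry = self._store.get(applicant_id)
        if entry is None:
            return None
        expires_at, assessment = entry
        if time.time() > expires_at:
            del self._store[applicant_id]
            return None
        return assessment

# Inside BedrockService (illustrative):
#   self.cache = AssessmentCache(ttl_seconds=1800)
#   async def _cache_risk_assessment(self, applicant_id, assessment):
#       self.cache.set(applicant_id, assessment)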

Stage 3: Frontend - Forms and Visualization

javascript
// frontend/src/components/RiskDashboard/index.vue
<template>
  <div class="risk-dashboard">
    <v-container>
      <!-- Summary header -->
      <v-row class="mb-6">
        <v-col cols="12" md="4">
          <v-card :color="riskLevelColor" dark>
            <v-card-title>
              Nivel de Riesgo
              <v-spacer></v-spacer>
              <risk-badge :level="assessment.risk_level" />
            </v-card-title>
            <v-card-text>
              <div class="text-h3">{{ assessment.final_score * 100 }}%</div>
              <div>Score Final</div>
            </v-card-text>
          </v-card>
        </v-col>
        
        <v-col cols="12" md="8">
          <v-card>
            <v-card-title>Resumen de Evaluación</v-card-title>
            <v-card-text>
              <v-row>
                <v-col cols="6">
                  <div class="text-subtitle-1">Score Interno</div>
                  <div class="text-h5">{{ assessment.internal_score * 100 }}%</div>
                </v-col>
                <v-col cols="6">
                  <div class="text-subtitle-1">Confianza IA</div>
                  <div class="text-h5">{{ assessment.details.confidence * 100 }}%</div>
                </v-col>
              </v-row>
            </v-card-text>
          </v-card>
        </v-col>
      </v-row>

      <!-- Analysis details -->
      <v-row>
        <v-col cols="12" md="6">
          <v-card>
            <v-card-title>Factores de Riesgo</v-card-title>
            <v-card-text>
              <risk-factors-chart :factors="assessment.details.factors" />
              <v-list>
                <v-list-item v-for="(value, key) in assessment.details.factors" :key="key">
                  <v-list-item-content>
                    <v-list-item-title>{{ formatFactorName(key) }}</v-list-item-title>
                    <v-list-item-subtitle>{{ value.details }}</v-list-item-subtitle>
                  </v-list-item-content>
                  <v-list-item-action>
                    <v-chip :color="getScoreColor(value.score)">
                      {{ (value.score * 100).toFixed(1) }}%
                    </v-chip>
                  </v-list-item-action>
                </v-list-item>
              </v-list>
            </v-card-text>
          </v-card>
        </v-col>
        
        <v-col cols="12" md="6">
          <v-card>
            <v-card-title>Recomendación</v-card-title>
            <v-card-text>
              <div class="text-body-1 mb-4">
                {{ assessment.details.recommendation }}
              </div>
              
              <v-divider class="my-4"></v-divider>
              
              <div class="text-h6 mb-2">Términos Sugeridos</div>
              <v-simple-table>
                <template v-slot:default>
                  <tbody>
                    <tr v-for="(value, key) in assessment.details.suggested_terms" :key="key">
                      <td>{{ formatTermName(key) }}</td>
                      <td class="text-right">{{ formatTermValue(key, value) }}</td>
                    </tr>
                  </tbody>
                </template>
              </v-simple-table>
            </v-card-text>
          </v-card>
        </v-col>
      </v-row>
    </v-container>
  </div>
</template>

<script>
import RiskBadge from './RiskBadge.vue'
import RiskFactorsChart from './RiskFactorsChart.vue'

export default {
  name: 'RiskDashboard',
  
  components: {
    RiskBadge,
    RiskFactorsChart
  },
  
  props: {
    assessment: {
      type: Object,
      required: true
    }
  },
  
  computed: {
    riskLevelColor() {
      const colors = {
        BAJO: 'success',
        MEDIO: 'warning',
        ALTO: 'error'
      }
      return colors[this.assessment.risk_level] || 'info'
    }
  },
  
  methods: {
    formatFactorName(key) {
      const names = {
        credit_history: 'Historial Crediticio',
        income_stability: 'Estabilidad de Ingresos',
        debt_burden: 'Carga de Deuda'
      }
      return names[key] || key
    },
    
    getScoreColor(score) {
      if (score >= 0.7) return 'success'
      if (score >= 0.4) return 'warning'
      return 'error'
    },
    
    formatTermName(key) {
      const names = {
        max_amount: 'Monto Máximo',
        interest_rate: 'Tasa de Interés',
        term_months: 'Plazo (meses)'
      }
      return names[key] || key
    },
    
    formatTermValue(key, value) {
      switch (key) {
        case 'max_amount':
          return new Intl.NumberFormat('es-ES', {
            style: 'currency',
            currency: 'USD'
          }).format(value)
        case 'interest_rate':
          return `${value}%`
        default:
          return value
      }
    }
  }
}
</script>

// frontend/src/components/RiskDashboard/RiskFactorsChart.vue
<template>
  <div class="risk-factors-chart">
    <apexchart
      type="radar"
      height="300"
      :options="chartOptions"
      :series="chartSeries"
    ></apexchart>
  </div>
</template>

<script>
import VueApexCharts from 'vue-apexcharts'

export default {
  name: 'RiskFactorsChart',
  
  components: {
    apexchart: VueApexCharts
  },
  
  props: {
    factors: {
      type: Object,
      required: true
    }
  },
  
  computed: {
    chartOptions() {
      return {
        chart: {
          toolbar: {
            show: false
          }
        },
        xaxis: {
          categories: Object.keys(this.factors).map(this.formatFactorName)
        },
        yaxis: {
          max: 1,
          min: 0,
          tickAmount: 4
        },
        markers: {
          size: 4,
          hover: {
            size: 6
          }
        },
        theme: {
          palette: 'palette2'
        }
      }
    },
    
    chartSeries() {
      return [{
        name: 'Score',
        data: Object.values(this.factors).map(f => f.score)
      }]
    }
  },
  
  methods: {
    formatFactorName(key) {
      return key.split('_').map(word => 
        word.charAt(0).toUpperCase() + word.slice(1)
      ).join(' ')
    }
  }
}
</script>

// frontend/src/components/RiskDashboard/RiskBadge.vue
<template>
  <v-chip
    :color="color"
    text-color="white"
    label
    large
  >
    {{ level }}
  </v-chip>
</template>

<script>
export default {
  name: 'RiskBadge',
  
  props: {
    level: {
      type: String,
      required: true,
      validator: value => ['BAJO', 'MEDIO', 'ALTO'].includes(value)
    }
  },
  
  computed: {
    color() {
      const colors = {
        BAJO: 'success',
        MEDIO: 'warning',
        ALTO: 'error'
      }
      return colors[this.level]
    }
  }
}
</script>

// frontend/src/store/modules/riskAssessment.js
import api from '@/api'

export default {
  namespaced: true,
  
  state: {
    currentAssessment: null,
    loading: false,
    error: null,
    history: []
  },
  
  mutations: {
    SET_ASSESSMENT(state, assessment) {
      state.currentAssessment = assessment
    },
    SET_LOADING(state, loading) {
      state.loading = loading
    },
    SET_ERROR(state, error) {
      state.error = error
    },
    ADD_TO_HISTORY(state, assessment) {
      state.history.push({
        ...assessment,
        timestamp: new Date()
      })
    }
  },
  
  actions: {
    async analyzeRisk({ commit }, applicantId) {
      commit('SET_LOADING', true)
      commit('SET_ERROR', null)
      
      try {
        const response = await api.post(`/risk/analyze/${applicantId}`)
        commit('SET_ASSESSMENT', response.data)
        commit('ADD_TO_HISTORY', response.data)
        return response.data
      } catch (error) {
        commit('SET_ERROR', error.message)
        throw error
      } finally {
        commit('SET_LOADING', false)
      }
    },
    
    clearAssessment({ commit }) {
      commit('SET_ASSESSMENT', null)
      commit('SET_ERROR', null)
    }
  },
  
  getters: {
    hasActiveAssessment: state => !!state.currentAssessment,
    riskLevel: state => state.currentAssessment?.risk_level || null,
    factors: state => state.currentAssessment?.details?.factors || {},
    recommendation: state => state.currentAssessment?.details?.recommendation || '',
    recentAssessments: state => state.history.slice(-5).reverse()
  }
}

// frontend/src/api/index.js
import axios from 'axios'

const api = axios.create({
  baseURL: process.env.VUE_APP_API_URL,
  headers: {
    'Content-Type': 'application/json'
  }
})

api.interceptors.request.use(config => {
  // Attach the auth token if present
  const token = localStorage.getItem('token')
  if (token) {
    config.headers.Authorization = `Bearer ${token}`
  }
  return config
})

api.interceptors.response.use(
  response => response,
  error => {
    if (error.response?.status === 401) {
      // Handle authentication errors
      localStorage.removeItem('token')
      window.location.href = '/login'
    }
    return Promise.reject(error)
  }
)

export default api

This third stage implements:

  1. A complete dashboard for risk visualization
  2. Interactive charts for risk factors
  3. State management with Vuex
  4. API integration (see the CORS sketch after the next steps)
  5. Reusable components

Main features:

  1. Intuitive Visualization: Colors and badges indicate risk levels
  2. Interactive Charts: Radar chart for risk factors
  3. Centralized State: State handled with Vuex
  4. Modular Components: Modular, reusable structure
  5. Error Handling: Robust handling of errors and loading states

The next steps would be:

  1. Integrate the components into the main views
  2. Configure the router
  3. Implement component tests
  4. Add animations and transitions
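
For the frontend's API integration to work against the FastAPI backend during development, the backend must allow cross-origin requests from the Vue dev server. The exercise does not show a main.py, so the wiring below is a sketch under that assumption; the router prefix and allowed origin are illustrative.

python
# Hedged sketch: backend/app/main.py (not shown elsewhere in this exercise)
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from app.core.config import settings
from app.api.endpoints import risk_assessment

app = FastAPI(title=settings.PROJECT_NAME, version=settings.VERSION)

app.add_middleware(
    CORSMiddleware,
    allow_origins=["http://localhost:8080"],  # assumption: Vue CLI dev server
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

app.include_router(
    risk_assessment.router,
    prefix=f"{settings.API_V1_STR}/risk",
    tags=["risk"]
)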

Stage 4: Backend - Business Logic, focusing on processing and the business rules.

python
# backend/app/services/loan_rules_engine.py
from dataclasses import dataclass
from typing import List, Dict, Any
from decimal import Decimal
import logging

logger = logging.getLogger(__name__)

@dataclass
class LoanTerms:
    max_amount: Decimal
    interest_rate: Decimal
    term_months: int
    monthly_payment: Decimal
    required_documents: List[str]

class LoanRulesEngine:
    def __init__(self):
        self.base_interest_rate = Decimal('0.10')  # 10% base annual rate
        self.max_dti_ratio = Decimal('43')  # 43% max DTI (DTI is handled as a percentage, matching calculate_dti)
        self.min_credit_score = 600
        
        # Multipliers per risk level
        self.risk_multipliers = {
            "BAJO": Decimal('1.0'),
            "MEDIO": Decimal('1.25'),
            "ALTO": Decimal('1.5')
        }

    def calculate_loan_terms(
        self,
        monthly_income: Decimal,
        credit_score: int,
        dti: Decimal,
        risk_level: str,
        work_experience_years: float
    ) -> LoanTerms:
        try:
            # Basic eligibility check
            if not self._is_eligible(credit_score, dti):
                raise ValueError("No elegible para préstamo")

            # Maximum amount
            max_amount = self._calculate_max_amount(monthly_income, dti, risk_level)
            
            # Interest rate
            interest_rate = self._calculate_interest_rate(credit_score, risk_level, work_experience_years)
            
            # Maximum term
            term_months = self._determine_term_months(max_amount, risk_level)
            
            # Monthly payment
            monthly_payment = self._calculate_monthly_payment(max_amount, interest_rate, term_months)
            
            # Required documents
            required_docs = self._determine_required_documents(max_amount, risk_level)

            return LoanTerms(
                max_amount=max_amount,
                interest_rate=interest_rate,
                term_months=term_months,
                monthly_payment=monthly_payment,
                required_documents=required_docs
            )

        except Exception as e:
            logger.error(f"Error calculating loan terms: {str(e)}")
            raise

    def _is_eligible(self, credit_score: int, dti: Decimal) -> bool:
        return credit_score >= self.min_credit_score and dti <= self.max_dti_ratio

    def _calculate_max_amount(
        self,
        monthly_income: Decimal,
        dti: Decimal,
        risk_level: str
    ) -> Decimal:
        # Base: 36 months of income
        base_amount = monthly_income * 36
        
        # Adjust by DTI
        dti_factor = (self.max_dti_ratio - dti) / self.max_dti_ratio
        
        # Adjust by risk level
        risk_factor = Decimal('1.0') / self.risk_multipliers[risk_level]
        
        max_amount = base_amount * dti_factor * risk_factor
        
        # Round down to the nearest thousand
        return (max_amount // 1000) * 1000

    def _calculate_interest_rate(
        self,
        credit_score: int,
        risk_level: str,
        work_experience_years: float
    ) -> Decimal:
        # Base rate
        rate = self.base_interest_rate
        
        # Adjust by credit score
        if credit_score >= 800:
            rate -= Decimal('0.02')
        elif credit_score >= 750:
            rate -= Decimal('0.01')
        elif credit_score < 650:
            rate += Decimal('0.02')
            
        # Adjust by work experience
        if work_experience_years >= 5:
            rate -= Decimal('0.005')
        elif work_experience_years < 2:
            rate += Decimal('0.01')
            
        # Adjust by risk level
        rate *= self.risk_multipliers[risk_level]
        
        return rate.quantize(Decimal('0.0001'))

    def _determine_term_months(self, amount: Decimal, risk_level: str) -> int:
        base_term = 36  # Base: 36 months
        
        # Check the larger threshold first so the 84-month tier is reachable
        if amount > Decimal('100000'):
            base_term = 84
        elif amount > Decimal('50000'):
            base_term = 60
            
        # Cap the term by risk level
        if risk_level == "ALTO":
            base_term = min(base_term, 48)
            
        return base_term

    def _calculate_monthly_payment(
        self,
        amount: Decimal,
        annual_rate: Decimal,
        term_months: int
    ) -> Decimal:
        monthly_rate = annual_rate / 12
        payment = amount * (
            monthly_rate * (1 + monthly_rate) ** term_months
        ) / ((1 + monthly_rate) ** term_months - 1)
        
        return payment.quantize(Decimal('0.01'))

    def _determine_required_documents(
        self,
        amount: Decimal,
        risk_level: str
    ) -> List[str]:
        docs = [
            "Identificación oficial",
            "Comprobante de ingresos",
            "Comprobante de domicilio"
        ]
        
        if amount > Decimal('50000'):
            docs.extend([
                "Declaración de impuestos",
                "Estados de cuenta bancarios"
            ])
            
        if risk_level == "ALTO":
            docs.extend([
                "Aval o garantía",
                "Historial laboral detallado"
            ])
            
        return docs
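
# Illustrative usage sketch (not one of the project files): a quick local sanity
# check of the rules engine with assumed input values. DTI is passed as a
# percentage, matching the convention used elsewhere in the exercise.
if __name__ == "__main__":
    _engine = LoanRulesEngine()
    _terms = _engine.calculate_loan_terms(
        monthly_income=Decimal("4000"),
        credit_score=720,
        dti=Decimal("28"),
        risk_level="MEDIO",
        work_experience_years=4,
    )
    print(_terms)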

# backend/app/services/loan_processor.py
import logging
from decimal import Decimal
from typing import Dict, Any
from .loan_rules_engine import LoanRulesEngine, LoanTerms
from .risk_analyzer import RiskAnalyzer
from app.schemas.applicant import ApplicantInDB
from app.models.loan import LoanApplication, LoanStatus

logger = logging.getLogger(__name__)

class LoanProcessor:
    def __init__(self):
        self.rules_engine = LoanRulesEngine()
        self.risk_analyzer = RiskAnalyzer()

    async def process_application(
        self,
        applicant: ApplicantInDB
    ) -> Dict[str, Any]:
        try:
            # Analyze risk
            risk_assessment = await self.risk_analyzer.analyze_applicant(applicant)
            
            # Automatically reject when the risk is too high
            if risk_assessment["risk_level"] == "ALTO" and risk_assessment["final_score"] < 0.3:
                return self._create_rejection_response(
                    "Riesgo crediticio demasiado alto",
                    risk_assessment
                )

            # Calculate the loan terms
            loan_terms = self.rules_engine.calculate_loan_terms(
                monthly_income=Decimal(str(applicant.monthly_income)),
                credit_score=applicant.credit_score,
                dti=Decimal(str(applicant.dti)),
                risk_level=risk_assessment["risk_level"],
                work_experience_years=applicant.work_experience_years
            )

            # Check whether the terms are viable
            if not self._validate_terms(loan_terms, applicant):
                return self._create_rejection_response(
                    "Términos del préstamo no viables",
                    risk_assessment,
                    loan_terms
                )

            return {
                "status": "APPROVED",
                "risk_assessment": risk_assessment,
                "loan_terms": {
                    "max_amount": float(loan_terms.max_amount),
                    "interest_rate": float(loan_terms.interest_rate),
                    "term_months": loan_terms.term_months,
                    "monthly_payment": float(loan_terms.monthly_payment),
                    "required_documents": loan_terms.required_documents
                },
                "applicant_id": applicant.id
            }

        except Exception as e:
            logger.error(f"Error processing loan application: {str(e)}")
            return self._create_error_response(str(e))

    def _validate_terms(self, terms: LoanTerms, applicant: ApplicantInDB) -> bool:
        # Ensure the monthly payment does not exceed a set share of income
        max_payment_ratio = Decimal('0.35')  # 35% of monthly income
        max_allowed_payment = Decimal(str(applicant.monthly_income)) * max_payment_ratio
        
        return terms.monthly_payment <= max_allowed_payment

    def _create_rejection_response(
        self,
        reason: str,
        risk_assessment: Dict[str, Any],
        loan_terms: LoanTerms = None
    ) -> Dict[str, Any]:
        response = {
            "status": "REJECTED",
            "reason": reason,
            "risk_assessment": risk_assessment
        }
        
        if loan_terms:
            response["loan_terms"] = {
                "max_amount": float(loan_terms.max_amount),
                "interest_rate": float(loan_terms.interest_rate),
                "term_months": loan_terms.term_months,
                "monthly_payment": float(loan_terms.monthly_payment)
            }
            
        return response

    def _create_error_response(self, error_message: str) -> Dict[str, Any]:
        return {
            "status": "ERROR",
            "error_message": error_message
        }

# backend/app/api/endpoints/loan_application.py
from fastapi import APIRouter, Depends, HTTPException
from sqlalchemy.orm import Session
from typing import Dict, Any
from app.api import deps
from app.services.loan_processor import LoanProcessor
from app.models.applicant import Applicant
from app.models.loan import LoanApplication

router = APIRouter()

@router.post("/apply/{applicant_id}", response_model=Dict[str, Any])
async def process_loan_application(
    applicant_id: int,
    db: Session = Depends(deps.get_db)
) -> Dict[str, Any]:
    # Fetch the applicant (query the ORM model, not the Pydantic schema)
    applicant = db.query(Applicant).filter(
        Applicant.id == applicant_id
    ).first()
    
    if not applicant:
        raise HTTPException(status_code=404, detail="Aplicante no encontrado")
        
    # Process the application
    processor = LoanProcessor()
    result = await processor.process_application(applicant)
    
    # Persist the result
    loan_application = LoanApplication(
        applicant_id=applicant_id,
        status=result["status"],
        max_amount=result.get("loan_terms", {}).get("max_amount"),
        interest_rate=result.get("loan_terms", {}).get("interest_rate"),
        term_months=result.get("loan_terms", {}).get("term_months"),
        monthly_payment=result.get("loan_terms", {}).get("monthly_payment"),
        risk_level=result.get("risk_assessment", {}).get("risk_level"),
        risk_score=result.get("risk_assessment", {}).get("final_score")
    )
    
    db.add(loan_application)
    db.commit()
    db.refresh(loan_application)
    
    return result

This fourth stage implements:

  1. A complete loan rules engine
  2. A loan application processor
  3. Precise financial calculations
  4. Business validations
  5. API endpoints for processing

Main features:

  1. Precise Calculations: Decimal is used for financial math
  2. Configurable Rules: Flexible business-rule system
  3. Complete Validations: Multiple layers of validation
  4. Error Handling: Robust error-handling system
  5. Documentation: Clear documentation of requirements

The next steps would be:

  1. Implement unit tests for the business logic (see the sketch below)
  2. Configure parameters for different products
  3. Add more specific validations
  4. Implement detailed logging
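
For step 1 above, the rules engine lends itself to straightforward unit tests. A minimal pytest sketch; the file location is an assumption and the asserted figures depend on the configured rates and multipliers.

python
# Hedged sketch: backend/tests/test_loan_rules_engine.py (illustrative path)
from decimal import Decimal
import pytest
from app.services.loan_rules_engine import LoanRulesEngine

@pytest.fixture
def engine():
    return LoanRulesEngine()

def test_low_credit_score_is_rejected(engine):
    with pytest.raises(ValueError):
        engine.calculate_loan_terms(
            monthly_income=Decimal("3000"),
            credit_score=550,              # below min_credit_score
            dti=Decimal("25"),
            risk_level="MEDIO",
            work_experience_years=3,
        )

def test_terms_are_consistent_for_a_good_profile(engine):
    terms = engine.calculate_loan_terms(
        monthly_income=Decimal("4000"),
        credit_score=760,
        dti=Decimal("20"),
        risk_level="BAJO",
        work_experience_years=6,
    )
    assert terms.term_months in (36, 60, 84)
    assert terms.monthly_payment > 0
    assert terms.max_amount > 0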

Stage 5: Evaluation System, which implements a more sophisticated evaluation and scoring system.

python
# backend/app/services/scoring/score_engine.py
import logging
from dataclasses import dataclass
from typing import Dict, List, Any, Optional
from datetime import datetime
import numpy as np
from app.schemas.applicant import ApplicantInDB

logger = logging.getLogger(__name__)

@dataclass
class ScoringFactor:
    name: str
    weight: float
    thresholds: Dict[str, float]
    description: str

@dataclass
class ScoreResult:
    total_score: float
    factor_scores: Dict[str, float]
    risk_level: str
    recommendations: List[str]
    details: Dict[str, Any]
    confidence: float

class ScoringEngine:
    def __init__(self):
        self.scoring_factors = {
            "credit_score": ScoringFactor(
                name="Credit Score",
                weight=0.30,
                thresholds={
                    "excellent": 800,
                    "good": 720,
                    "fair": 640,
                    "poor": 580
                },
                description="Historial crediticio del aplicante"
            ),
            "dti_ratio": ScoringFactor(
                name="DTI Ratio",
                weight=0.25,
                thresholds={
                    "excellent": 20,
                    "good": 30,
                    "fair": 40,
                    "poor": 45
                },
                description="Ratio de deuda sobre ingreso"
            ),
            "employment": ScoringFactor(
                name="Employment",
                weight=0.20,
                thresholds={
                    "excellent": 5,
                    "good": 3,
                    "fair": 1,
                    "poor": 0.5
                },
                description="Estabilidad laboral"
            ),
            "income": ScoringFactor(
                name="Income",
                weight=0.15,
                thresholds={
                    "excellent": 5000,
                    "good": 3500,
                    "fair": 2500,
                    "poor": 1500
                },
                description="Nivel de ingresos mensuales"
            ),
            "housing": ScoringFactor(
                name="Housing",
                weight=0.10,
                thresholds={
                    "owned": 1.0,
                    "family": 0.7,
                    "rented": 0.5
                },
                description="Situación de vivienda"
            )
        }

    def evaluate_applicant(self, applicant: ApplicantInDB) -> ScoreResult:
        try:
            # Compute individual factor scores
            factor_scores = {}
            weighted_scores = []
            
            # Credit Score
            credit_score = self._evaluate_credit_score(applicant.credit_score)
            factor_scores["credit_score"] = credit_score
            weighted_scores.append(credit_score * self.scoring_factors["credit_score"].weight)
            
            # DTI Ratio
            dti_score = self._evaluate_dti(applicant.dti)
            factor_scores["dti_ratio"] = dti_score
            weighted_scores.append(dti_score * self.scoring_factors["dti_ratio"].weight)
            
            # Employment
            employment_score = self._evaluate_employment(applicant.work_experience_years)
            factor_scores["employment"] = employment_score
            weighted_scores.append(employment_score * self.scoring_factors["employment"].weight)
            
            # Income
            income_score = self._evaluate_income(applicant.monthly_income)
            factor_scores["income"] = income_score
            weighted_scores.append(income_score * self.scoring_factors["income"].weight)
            
            # Housing
            housing_score = self._evaluate_housing(applicant.home_status)
            factor_scores["housing"] = housing_score
            weighted_scores.append(housing_score * self.scoring_factors["housing"].weight)
            
            # Total weighted score
            total_score = sum(weighted_scores)
            
            # Risk level
            risk_level = self._determine_risk_level(total_score)
            
            # Recommendations
            recommendations = self._generate_recommendations(factor_scores)
            
            # Confidence
            confidence = self._calculate_confidence(factor_scores)
            
            return ScoreResult(
                total_score=total_score,
                factor_scores=factor_scores,
                risk_level=risk_level,
                recommendations=recommendations,
                details=self._generate_details(factor_scores, applicant),
                confidence=confidence
            )

        except Exception as e:
            logger.error(f"Error in scoring evaluation: {str(e)}")
            raise

    def _evaluate_credit_score(self, score: int) -> float:
        thresholds = self.scoring_factors["credit_score"].thresholds
        
        if score >= thresholds["excellent"]:
            return 1.0
        elif score >= thresholds["good"]:
            return 0.8
        elif score >= thresholds["fair"]:
            return 0.6
        elif score >= thresholds["poor"]:
            return 0.4
        return 0.2

    def _evaluate_dti(self, dti: float) -> float:
        thresholds = self.scoring_factors["dti_ratio"].thresholds
        
        if dti <= thresholds["excellent"]:
            return 1.0
        elif dti <= thresholds["good"]:
            return 0.8
        elif dti <= thresholds["fair"]:
            return 0.6
        elif dti <= thresholds["poor"]:
            return 0.4
        return 0.2

    def _evaluate_employment(self, years: float) -> float:
        thresholds = self.scoring_factors["employment"].thresholds
        
        if years >= thresholds["excellent"]:
            return 1.0
        elif years >= thresholds["good"]:
            return 0.8
        elif years >= thresholds["fair"]:
            return 0.6
        elif years >= thresholds["poor"]:
            return 0.4
        return 0.2

    def _evaluate_income(self, income: float) -> float:
        thresholds = self.scoring_factors["income"].thresholds
        
        if income >= thresholds["excellent"]:
            return 1.0
        elif income >= thresholds["good"]:
            return 0.8
        elif income >= thresholds["fair"]:
            return 0.6
        elif income >= thresholds["poor"]:
            return 0.4
        return 0.2

    def _evaluate_housing(self, status: str) -> float:
        thresholds = self.scoring_factors["housing"].thresholds
        return thresholds.get(status.lower(), 0.3)

    def _determine_risk_level(self, total_score: float) -> str:
        if total_score >= 0.8:
            return "BAJO"
        elif total_score >= 0.6:
            return "MEDIO"
        return "ALTO"

    def _generate_recommendations(self, factor_scores: Dict[str, float]) -> List[str]:
        recommendations = []
        
        if factor_scores["credit_score"] < 0.6:
            recommendations.append(
                "Mejorar historial crediticio y puntaje de crédito"
            )
            
        if factor_scores["dti_ratio"] < 0.6:
            recommendations.append(
                "Reducir nivel de endeudamiento actual"
            )
            
        if factor_scores["employment"] < 0.6:
            recommendations.append(
                "Demostrar mayor estabilidad laboral"
            )
            
        if factor_scores["income"] < 0.6:
            recommendations.append(
                "Buscar fuentes adicionales de ingreso"
            )
            
        return recommendations

    def _calculate_confidence(self, factor_scores: Dict[str, float]) -> float:
        # Standard deviation of the factor scores
        scores_array = np.array(list(factor_scores.values()))
        std_dev = np.std(scores_array)
        
        # Greater dispersion = lower confidence
        base_confidence = 1.0 - (std_dev * 0.5)
        
        # Adjust for data completeness
        completeness = 1.0  # completeness logic to be implemented
        
        return min(max(base_confidence * completeness, 0.0), 1.0)

    def _generate_details(
        self,
        factor_scores: Dict[str, float],
        applicant: ApplicantInDB
    ) -> Dict[str, Any]:
        return {
            "evaluation_date": datetime.utcnow().isoformat(),
            "factors": {
                name: {
                    "score": score,
                    "weight": self.scoring_factors[name].weight,
                    "description": self.scoring_factors[name].description
                }
                for name, score in factor_scores.items()
            },
            "applicant_snapshot": {
                "credit_score": applicant.credit_score,
                "dti": applicant.dti,
                "monthly_income": applicant.monthly_income,
                "work_experience": applicant.work_experience_years,
                "home_status": applicant.home_status
            }
        }

# backend/app/services/scoring/matrix_evaluator.py
from typing import Dict, Any, List, Tuple
import numpy as np
from sklearn.preprocessing import MinMaxScaler

class RiskMatrix:
    def __init__(self):
        # Weight matrix for different factor combinations
        self.weight_matrix = {
            ("credit_score", "dti_ratio"): 1.5,  # Strong correlation
            ("credit_score", "income"): 1.2,
            ("dti_ratio", "income"): 1.3,
            ("employment", "income"): 1.4,
            ("employment", "housing"): 1.1
        }
        
        # Adjustment factors for specific combinations
        self.adjustment_rules = [
            {
                "factors": ["credit_score", "dti_ratio"],
                "condition": lambda x, y: x > 0.8 and y > 0.7,
                "adjustment": 1.1
            },
            {
                "factors": ["income", "employment"],
                "condition": lambda x, y: x > 0.7 and y > 0.8,
                "adjustment": 1.15
            }
        ]

    def evaluate_combinations(
        self,
        factor_scores: Dict[str, float]
    ) -> Tuple[float, List[Dict[str, Any]]]:
        adjustments = []
        total_adjustment = 1.0
        
        # Evaluate factor combinations
        for (factor1, factor2), weight in self.weight_matrix.items():
            if factor1 in factor_scores and factor2 in factor_scores:
                score1 = factor_scores[factor1]
                score2 = factor_scores[factor2]
                
                # Combined score for the pair
                combined_score = np.mean([score1, score2]) * weight
                
                # Apply adjustment rules
                for rule in self.adjustment_rules:
                    if set(rule["factors"]) == {factor1, factor2}:
                        if rule["condition"](score1, score2):
                            total_adjustment *= rule["adjustment"]
                            adjustments.append({
                                "factors": [factor1, factor2],
                                "reason": "Combinación favorable",
                                "adjustment": rule["adjustment"]
                            })
                
                # Record the interaction
                adjustments.append({
                    "factors": [factor1, factor2],
                    "combined_score": combined_score,
                    "weight": weight
                })
        
        return total_adjustment, adjustments

# backend/app/services/scoring/trend_analyzer.py
from typing import List, Dict, Any
import pandas as pd
from datetime import datetime, timedelta
from sqlalchemy import text

class TrendAnalyzer:
    def __init__(self, db_session):
        self.db_session = db_session

    async def analyze_trends(
        self,
        applicant_id: int,
        lookback_days: int = 90
    ) -> Dict[str, Any]:
        # Fetch the evaluation history
        cutoff_date = datetime.utcnow() - timedelta(days=lookback_days)
        historical_scores = await self._get_historical_scores(applicant_id, cutoff_date)
        
        if not historical_scores:
            return {"trend_data": None, "analysis": None}
        
        # Convert to a DataFrame
        df = pd.DataFrame(historical_scores)
        df['evaluation_date'] = pd.to_datetime(df['evaluation_date'])
        
        # Trend analysis
        trends = {
            "score_trend": self._analyze_score_trend(df),
            "factor_trends": self._analyze_factor_trends(df),
            "volatility": self._calculate_volatility(df),
            "improvement_areas": self._identify_improvement_areas(df)
        }
        
        return trends

    async def _get_historical_scores(
        self,
        applicant_id: int,
        cutoff_date: datetime
    ) -> List[Dict[str, Any]]:
        # Query the database for historical scores
        query = """
        SELECT 
            total_score,
            factor_scores,
            evaluation_date
        FROM risk_evaluations
        WHERE applicant_id = :applicant_id
        AND evaluation_date >= :cutoff_date
        ORDER BY evaluation_date ASC
        """
        
        result = await self.db_session.execute(
            text(query),
            {"applicant_id": applicant_id, "cutoff_date": cutoff_date}
        )
        
        return [dict(row._mapping) for row in result]

    def _analyze_score_trend(self, df: pd.DataFrame) -> Dict[str, Any]:
        # Compute the overall trend
        score_series = df['total_score']
        trend = {
            "start_score": score_series.iloc[0],
            "end_score": score_series.iloc[-1],
            "min_score": score_series.min(),
            "max_score": score_series.max(),
            "avg_score": score_series.mean(),
            "trend_direction": "up" if score_series.iloc[-1] > score_series.iloc[0] else "down",
            "improvement_rate": ((score_series.iloc[-1] - score_series.iloc[0]) / 
                               score_series.iloc[0] * 100)
        }
        
        return trend

    def _analyze_factor_trends(self, df: pd.DataFrame) -> Dict[str, Dict[str, float]]:
        factor_trends = {}
        
        # Analyze each factor
        for factor in df['factor_scores'].iloc[0].keys():
            factor_series = df['factor_scores'].apply(lambda x: x[factor])
            
            factor_trends[factor] = {
                "start_value": factor_series.iloc[0],
                "end_value": factor_series.iloc[-1],
                "improvement_rate": ((factor_series.iloc[-1] - factor_series.iloc[0]) / 
                                   factor_series.iloc[0] * 100),
                "volatility": factor_series.std()
            }
            
        return factor_trends

    def _calculate_volatility(self, df: pd.DataFrame) -> Dict[str, float]:
        # Compute overall and per-factor volatility
        volatility = {
            "total_score": df['total_score'].std(),
            "factors": {
                factor: df['factor_scores'].apply(lambda x: x[factor]).std()
                for factor in df['factor_scores'].iloc[0].keys()
            },
            "stability_score": self._calculate_stability_score(df),
            "confidence_interval": self._calculate_confidence_interval(df)
        }
        
        return volatility

    def _identify_improvement_areas(self, df: pd.DataFrame) -> List[Dict[str, Any]]:
        improvements = []
        latest_scores = df['factor_scores'].iloc[-1]
        
        for factor, score in latest_scores.items():
            historical_scores = df['factor_scores'].apply(lambda x: x[factor])
            
            if score < 0.6:  # Factor needs improvement
                improvements.append({
                    "factor": factor,
                    "current_score": score,
                    "potential_improvement": 1.0 - score,
                    "historical_best": historical_scores.max(),
                    "recommendation": self._get_factor_recommendation(factor, score)
                })
        
        return sorted(improvements, key=lambda x: x['potential_improvement'], reverse=True)

    def _calculate_stability_score(self, df: pd.DataFrame) -> float:
        # Measure how stable the scores have been over time
        score_std = df['total_score'].std()
        max_variation = df['total_score'].max() - df['total_score'].min()
        
        stability = 1.0 - (score_std / max_variation if max_variation > 0 else 0)
        return round(stability, 2)

    def _calculate_confidence_interval(self, df: pd.DataFrame) -> Dict[str, float]:
        # Compute a confidence interval for the score
        scores = df['total_score']
        std_dev = scores.std()
        mean = scores.mean()
        
        return {
            "lower_bound": max(0, mean - (2 * std_dev)),
            "upper_bound": min(1, mean + (2 * std_dev)),
            "mean": mean
        }

    def _get_factor_recommendation(self, factor: str, score: float) -> str:
        recommendations = {
            "credit_score": {
                "low": "Improve payment history and reduce credit utilization",
                "medium": "Keep payments on time and reduce existing debt",
                "high": "Maintain an excellent credit history"
            },
            "dti_ratio": {
                "low": "Reduce current debt or increase income",
                "medium": "Avoid new debt and consider consolidation",
                "high": "Maintain a low level of indebtedness"
            },
            "employment": {
                "low": "Seek greater job stability",
                "medium": "Keep the current position and look for growth",
                "high": "Continue professional development"
            },
            "income": {
                "low": "Explore additional income opportunities",
                "medium": "Pursue a raise or promotion",
                "high": "Maintain the current income level"
            },
            "housing": {
                "low": "Consider more stable housing options",
                "medium": "Maintain the current housing situation",
                "high": "Optimal housing situation"
            }
        }
        
        if score < 0.4:
            level = "low"
        elif score < 0.7:
            level = "medium"
        else:
            level = "high"
            
        return recommendations.get(factor, {}).get(level, "No specific recommendation")

# backend/app/services/scoring/predictive_evaluator.py
from typing import Dict, Any, List
import logging
import numpy as np
from sklearn.ensemble import IsolationForest
from datetime import datetime

logger = logging.getLogger(__name__)

class PredictiveEvaluator:
    def __init__(self):
        self.anomaly_detector = IsolationForest(
            contamination=0.1,
            random_state=42
        )
        
    def evaluate_risk_patterns(
        self,
        current_evaluation: Dict[str, Any],
        historical_data: List[Dict[str, Any]]
    ) -> Dict[str, Any]:
        try:
            # Prepare data for analysis
            features = self._prepare_features(current_evaluation, historical_data)
            
            # Detect anomalies
            anomaly_score = self._detect_anomalies(features)
            
            # Analyze risk patterns
            risk_patterns = self._analyze_risk_patterns(
                current_evaluation,
                historical_data
            )
            
            # Predict the trend
            trend_prediction = self._predict_trend(historical_data)
            
            return {
                "anomaly_score": anomaly_score,
                "risk_patterns": risk_patterns,
                "trend_prediction": trend_prediction,
                "evaluation_timestamp": datetime.utcnow().isoformat(),
                "confidence_score": self._calculate_prediction_confidence(
                    anomaly_score,
                    risk_patterns
                )
            }
            
        except Exception as e:
            logger.error(f"Error in predictive evaluation: {str(e)}")
            raise

    def _prepare_features(
        self,
        current: Dict[str, Any],
        historical: List[Dict[str, Any]]
    ) -> np.ndarray:
        # Extract the relevant features
        features = []
        
        for evaluation in [current] + historical:
            feature_vector = [
                evaluation["factor_scores"].get("credit_score", 0),
                evaluation["factor_scores"].get("dti_ratio", 0),
                evaluation["factor_scores"].get("employment", 0),
                evaluation["factor_scores"].get("income", 0),
                evaluation["total_score"]
            ]
            features.append(feature_vector)
            
        return np.array(features)

    def _detect_anomalies(self, features: np.ndarray) -> float:
        # Fit the detector and obtain anomaly scores
        self.anomaly_detector.fit(features)
        
        # score_samples returns higher values for normal points and lower values for anomalies
        scores = self.anomaly_detector.score_samples(features)
        
        # Normalize scores to [0, 1], where 1 is the most anomalous
        score_range = scores.max() - scores.min()
        if score_range == 0:
            return 0.0
        normalized_score = 1 - ((scores - scores.min()) / score_range)
        
        return float(normalized_score[0])  # Score for the current evaluation

    def _analyze_risk_patterns(
        self,
        current: Dict[str, Any],
        historical: List[Dict[str, Any]]
    ) -> Dict[str, Any]:
        patterns = {
            "factor_stability": {},
            "risk_transitions": {},
            "correlation_patterns": {}
        }
        
        # Analyze factor stability
        for factor in current["factor_scores"]:
            historical_scores = [h["factor_scores"].get(factor, 0) for h in historical]
            current_score = current["factor_scores"][factor]
            
            patterns["factor_stability"][factor] = {
                "mean": np.mean(historical_scores),
                "std": np.std(historical_scores),
                "current_deviation": abs(current_score - np.mean(historical_scores)),
                "trend": self._calculate_trend(historical_scores + [current_score])
            }
        
        # Analyze risk-level transitions
        risk_levels = [h.get("risk_level") for h in historical]
        if risk_levels:
            patterns["risk_transitions"] = {
                "most_common": max(set(risk_levels), key=risk_levels.count),
                "current": current.get("risk_level"),
                "stability": len(set(risk_levels[-3:])) if len(risk_levels) >= 3 else None
            }
        
        # Analyze correlations between factors
        factor_scores = np.array([[s["factor_scores"].get(f, 0) 
                                 for f in current["factor_scores"]]
                                for s in historical + [current]])
        if len(factor_scores) > 1:
            corr_matrix = np.corrcoef(factor_scores.T)
            patterns["correlation_patterns"] = {
                "strongest_correlation": float(np.max(np.abs(corr_matrix - np.eye(len(corr_matrix))))),
                "factor_pairs": self._get_significant_correlations(corr_matrix, list(current["factor_scores"].keys()))
            }
            
        return patterns

    def _predict_trend(
        self,
        historical_data: List[Dict[str, Any]]
    ) -> Dict[str, Any]:
        if len(historical_data) < 3:
            return {"confidence": 0, "direction": None}
            
        recent_scores = [h["total_score"] for h in historical_data[-3:]]
        
        trend = {
            "direction": "improving" if recent_scores[-1] > recent_scores[0] else "declining",
            "rate": (recent_scores[-1] - recent_scores[0]) / len(recent_scores),
            "volatility": np.std(recent_scores),
            "confidence": self._calculate_trend_confidence(recent_scores)
        }
        
        return trend

    def _calculate_prediction_confidence(
        self,
        anomaly_score: float,
        patterns: Dict[str, Any]
    ) -> float:
        # Factors that affect confidence
        stability_scores = [p["std"] for p in patterns["factor_stability"].values()]
        factor_stability = 1 - np.mean(stability_scores) if stability_scores else 0
        
        # Penalize anomalies and low stability
        confidence = 1.0
        confidence *= (1 - anomaly_score * 0.5)  # Anomalies reduce confidence
        confidence *= (0.5 + factor_stability * 0.5)  # Stability increases confidence
        
        return max(min(confidence, 1.0), 0.0)

    def _get_significant_correlations(
        self,
        corr_matrix: np.ndarray,
        factor_names: List[str]
    ) -> List[Dict[str, Any]]:
        significant_pairs = []
        threshold = 0.7  # Significant correlation
        
        for i in range(len(corr_matrix)):
            for j in range(i + 1, len(corr_matrix)):
                if abs(corr_matrix[i, j]) >= threshold:
                    significant_pairs.append({
                        "factors": (factor_names[i], factor_names[j]),
                        "correlation": float(corr_matrix[i, j])
                    })
                    
        return significant_pairs

    def _calculate_trend(self, values: List[float]) -> str:
        if len(values) < 2:
            return "insufficient_data"
            
        slope = np.polyfit(range(len(values)), values, 1)[0]
        
        if abs(slope) < 0.05:
            return "stable"
        return "increasing" if slope > 0 else "decreasing"

    def _calculate_trend_confidence(self, values: List[float]) -> float:
        if len(values) < 3:
            return 0.0
            
        # Compute R² of the linear regression
        x = np.arange(len(values))
        slope, intercept = np.polyfit(x, values, 1)
        r_squared = np.corrcoef(values, slope * x + intercept)[0, 1] ** 2
        
        return float(r_squared)

This extension of Stage 5 adds:

  1. Advanced Trend Analysis:

    • Stability and volatility calculation
    • Identification of improvement areas
    • Factor-specific recommendations
  2. Predictive Evaluation:

    • Anomaly detection
    • Risk pattern analysis
    • Trend prediction
    • Correlations between factors
  3. Key Features:

    • Scoring system based on multiple factors
    • Detection of anomalous patterns
    • Predictive risk analysis
    • Confidence calculation for predictions
  4. Analytical Capabilities:

    • Historical behavior analysis
    • Correlation identification
    • Prediction of future trends
    • Stability assessment

A sketch of how these services might be wired together is shown below.
Stage 6: Analysis and Reporting, focusing on generating detailed reports and analysis.

python
# backend/app/services/reporting/report_generator.py
from typing import Dict, Any, List
import logging
import pandas as pd
import numpy as np
import plotly.graph_objects as go
from datetime import datetime, timedelta
import jinja2
import pdfkit
from io import BytesIO
import json

logger = logging.getLogger(__name__)

class ReportGenerator:
    def __init__(self, template_dir="templates/reports"):
        self.template_loader = jinja2.FileSystemLoader(searchpath=template_dir)
        self.template_env = jinja2.Environment(loader=self.template_loader)
        self.pdf_options = {
            'page-size': 'A4',
            'margin-top': '0.75in',
            'margin-right': '0.75in',
            'margin-bottom': '0.75in',
            'margin-left': '0.75in',
            'encoding': 'UTF-8'
        }

    async def generate_risk_report(
        self,
        assessment_data: Dict[str, Any],
        historical_data: List[Dict[str, Any]]
    ) -> BytesIO:
        try:
            # Prepare the report data
            report_data = self._prepare_report_data(assessment_data, historical_data)
            
            # Generate charts
            charts = self._generate_charts(report_data)
            
            # Render the template
            template = self.template_env.get_template("risk_report.html")
            html_content = template.render(
                report_data=report_data,
                charts=charts,
                generation_date=datetime.now().strftime("%Y-%m-%d %H:%M:%S")
            )
            
            # Generate the PDF
            pdf_content = pdfkit.from_string(html_content, False, options=self.pdf_options)
            
            # Return as BytesIO
            pdf_buffer = BytesIO(pdf_content)
            pdf_buffer.seek(0)
            
            return pdf_buffer
            
        except Exception as e:
            logger.error(f"Error generating risk report: {str(e)}")
            raise

    def _prepare_report_data(
        self,
        assessment: Dict[str, Any],
        historical: List[Dict[str, Any]]
    ) -> Dict[str, Any]:
        return {
            "summary": {
                "risk_level": assessment["risk_level"],
                "total_score": assessment["final_score"],
                "confidence": assessment["details"]["confidence"],
                "evaluation_date": datetime.now().isoformat()
            },
            "factors": self._analyze_factors(assessment["details"]["factors"]),
            "trends": self._analyze_trends(historical),
            "recommendations": self._prepare_recommendations(assessment),
            "risk_metrics": self._calculate_risk_metrics(assessment, historical),
            "comparative_analysis": self._perform_comparative_analysis(assessment, historical)
        }

    def _analyze_factors(self, factors: Dict[str, Any]) -> List[Dict[str, Any]]:
        analyzed_factors = []
        
        for name, data in factors.items():
            analyzed_factors.append({
                "name": name,
                "score": data["score"],
                "weight": data["weight"],
                "weighted_score": data["score"] * data["weight"],
                "impact": self._calculate_factor_impact(data),
                "details": data["description"]
            })
            
        return sorted(
            analyzed_factors,
            key=lambda x: x["weighted_score"],
            reverse=True
        )

    def _analyze_trends(
        self,
        historical: List[Dict[str, Any]]
    ) -> Dict[str, Any]:
        if not historical:
            return {"status": "insufficient_data"}
            
        df = pd.DataFrame(historical)
        df['evaluation_date'] = pd.to_datetime(df['evaluation_date'])
        
        return {
            "score_evolution": {
                "start": float(df['total_score'].iloc[0]),
                "end": float(df['total_score'].iloc[-1]),
                "change": float(df['total_score'].iloc[-1] - df['total_score'].iloc[0]),
                "change_percentage": float((df['total_score'].iloc[-1] / df['total_score'].iloc[0] - 1) * 100)
            },
            "volatility": float(df['total_score'].std()),
            "trend_direction": "positive" if df['total_score'].iloc[-1] > df['total_score'].iloc[0] else "negative",
            "stability_score": float(1 - df['total_score'].std() / df['total_score'].mean())
        }

    def _prepare_recommendations(
        self,
        assessment: Dict[str, Any]
    ) -> List[Dict[str, Any]]:
        recommendations = []
        
        # Factor-based recommendations
        for factor, data in assessment["details"]["factors"].items():
            if data["score"] < 0.7:
                recommendations.append({
                    "factor": factor,
                    "priority": "high" if data["score"] < 0.5 else "medium",
                    "action": self._get_factor_recommendation(factor, data["score"]),
                    "potential_impact": (0.7 - data["score"]) * data["weight"]
                })
        
        # Sort by potential impact
        return sorted(
            recommendations,
            key=lambda x: x["potential_impact"],
            reverse=True
        )

    def _calculate_risk_metrics(
        self,
        assessment: Dict[str, Any],
        historical: List[Dict[str, Any]]
    ) -> Dict[str, Any]:
        return {
            "current_risk_score": assessment["final_score"],
            "risk_trajectory": self._calculate_risk_trajectory(historical),
            "risk_components": {
                "inherent_risk": self._calculate_inherent_risk(assessment),
                "mitigating_factors": self._identify_mitigating_factors(assessment),
                "aggravating_factors": self._identify_aggravating_factors(assessment)
            },
            "comparative_metrics": self._calculate_comparative_metrics(assessment)
        }

    def _perform_comparative_analysis(
        self,
        assessment: Dict[str, Any],
        historical: List[Dict[str, Any]]
    ) -> Dict[str, Any]:
        return {
            "percentile_ranking": self._calculate_percentile(assessment, historical),
            "peer_comparison": self._calculate_peer_comparison(assessment),
            "trend_analysis": self._analyze_historical_trends(historical),
            "risk_distribution": self._calculate_risk_distribution(historical)
        }

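    # NOTE: the helpers referenced above (_calculate_inherent_risk, _identify_mitigating_factors,
    # _identify_aggravating_factors, _calculate_comparative_metrics, _calculate_peer_comparison,
    # _analyze_historical_trends) are not shown in this excerpt. The two sketches below are
    # illustrative assumptions, kept minimal so that _generate_charts has data to plot.
    def _calculate_percentile(
        self,
        assessment: Dict[str, Any],
        historical: List[Dict[str, Any]]
    ) -> float:
        # Percentage of historical evaluations scoring below the current one
        scores = [h["total_score"] for h in historical]
        if not scores:
            return 50.0
        below = sum(1 for s in scores if s < assessment["final_score"])
        return round(below / len(scores) * 100, 2)

    def _calculate_risk_distribution(
        self,
        historical: List[Dict[str, Any]]
    ) -> List[float]:
        # Plain list of historical scores, consumed by the histogram in _generate_charts
        return [h["total_score"] for h in historical]
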
    def _generate_charts(self, report_data: Dict[str, Any]) -> Dict[str, Any]:
        charts = {}
        
        # Radar chart for factors
        factors_fig = go.Figure()
        factors = report_data["factors"]
        
        factors_fig.add_trace(go.Scatterpolar(
            r=[f["score"] for f in factors],
            theta=[f["name"] for f in factors],
            fill='toself',
            name='Factor Scores'
        ))
        
        factors_fig.update_layout(
            polar=dict(
                radialaxis=dict(
                    visible=True,
                    range=[0, 1]
                )
            ),
            showlegend=False
        )
        
        charts["factors_radar"] = factors_fig.to_json()
        
        # Trend chart (the trends section only stores summary values, so plot start vs. latest)
        if report_data["trends"].get("score_evolution"):
            evolution = report_data["trends"]["score_evolution"]
            trend_fig = go.Figure()
            
            trend_fig.add_trace(go.Scatter(
                x=["Initial", "Latest"],
                y=[evolution["start"], evolution["end"]],
                mode='lines+markers',
                name='Score Evolution'
            ))
            
            trend_fig.update_layout(
                title='Risk Score Evolution',
                xaxis_title='Evaluation',
                yaxis_title='Risk Score'
            )
            
            charts["trend_line"] = trend_fig.to_json()
        
        # Risk distribution chart
        risk_dist = report_data["comparative_analysis"]["risk_distribution"]
        dist_fig = go.Figure()
        
        dist_fig.add_trace(go.Histogram(
            x=risk_dist,
            nbinsx=20,
            name='Risk Distribution'
        ))
        
        dist_fig.update_layout(
            title='Risk Score Distribution',
            xaxis_title='Risk Score',
            yaxis_title='Frequency'
        )
        
        charts["risk_distribution"] = dist_fig.to_json()
        
        return charts

    def _calculate_factor_impact(self, factor_data: Dict[str, Any]) -> str:
        score = factor_data["score"]
        weight = factor_data.get("weight", 1.0)
        
        impact = score * weight
        
        if impact >= 0.8:
            return "Very Positive"
        elif impact >= 0.6:
            return "Positive"
        elif impact >= 0.4:
            return "Neutral"
        elif impact >= 0.2:
            return "Negative"
        else:
            return "Very Negative"

    def _get_factor_recommendation(
        self,
        factor: str,
        score: float
    ) -> str:
        recommendations = {
            "credit_score": {
                "low": "Improve credit history through on-time payments and debt reduction",
                "medium": "Keep good payment habits and monitor credit utilization"
            },
            "dti_ratio": {
                "low": "Reduce indebtedness and/or increase income",
                "medium": "Avoid taking on new debt and consider consolidation"
            },
            "employment": {
                "low": "Seek greater job stability or additional income sources",
                "medium": "Keep the current position and document work history"
            }
        }
        
        level = "low" if score < 0.5 else "medium"
        return recommendations.get(factor, {}).get(level, "Improve overall indicators")

    def _calculate_risk_trajectory(
        self,
        historical: List[Dict[str, Any]]
    ) -> Dict[str, Any]:
        if len(historical) < 2:
            return {"status": "insufficient_data"}
            
        scores = [h["total_score"] for h in historical]
        
        return {
            "direction": "improving" if scores[-1] > scores[0] else "deteriorating",
            "magnitude": abs(scores[-1] - scores[0]),
            "volatility": np.std(scores),
            "trend_strength": self._calculate_trend_strength(scores)
        }

    def _calculate_trend_strength(self, values: List[float]) -> float:
        if len(values) < 2:
            return 0.0
        
        x = np.arange(len(values))
        slope, _ = np.polyfit(x, values, 1)
        
        # Normalize the slope to a value between 0 and 1
        return float(abs(slope) / (max(values) - min(values)) if max(values) != min(values) else 0)

    def export_to_json(self, report_data: Dict[str, Any]) -> str:
        """Export the report to JSON for integration with other systems."""
        return json.dumps(report_data, indent=2, default=str)

    def export_to_excel(self, report_data: Dict[str, Any]) -> BytesIO:
        """Export the report to Excel format."""
        output = BytesIO()
        
        with pd.ExcelWriter(output, engine='xlsxwriter') as writer:
            # Summary sheet
            pd.DataFrame([report_data["summary"]]).to_excel(
                writer,
                sheet_name='Summary',
                index=False
            )
            
            # Factors sheet
            pd.DataFrame(report_data["factors"]).to_excel(
                writer,
                sheet_name='Factors',
                index=False
            )
            
            # Recommendations sheet
            pd.DataFrame(report_data["recommendations"]).to_excel(
                writer,
                sheet_name='Recommendations',
                index=False
            )
            
            # Formatting
            workbook = writer.book
            
            # Number formats
            num_format = workbook.add_format({'num_format': '0.00'})
            percent_format = workbook.add_format({'num_format': '0.00%'})
            
            for sheet_name in writer.sheets:
                worksheet = writer.sheets[sheet_name]
                worksheet.set_column('A:Z', 15, num_format)
        
        output.seek(0)
        return output

# backend/app/api/endpoints/reporting.py
from fastapi import APIRouter, Depends, HTTPException
from sqlalchemy.orm import Session
from typing import Dict, Any
from app.api import deps
from app.services.reporting.report_generator import ReportGenerator
from app.schemas.applicant import ApplicantInDB
from fastapi.responses import StreamingResponse

router = APIRouter()

@router.get("/report/{applicant_id}")
async def generate_report(
    applicant_id: int,
    format: str = "pdf",
    db: Session = Depends(deps.get_db)
) -> StreamingResponse:
    # Fetch the applicant's data
    applicant = db.query(ApplicantInDB).filter(
        ApplicantInDB.id == applicant_id
    ).first()
    
    if not applicant:
        raise HTTPException(status_code=404, detail="Applicant not found")
    
    # Fetch the evaluation history
    historical_data = await get_historical_evaluations(applicant_id, db)
    
    # Generate the report
    generator = ReportGenerator()
    
    try:
        if format == "pdf":
            report = await generator.generate_risk_report(
                applicant.dict(),
                historical_data
            )
            return StreamingResponse(
                report,
                media_type="application/pdf",
                headers={
                    "Content-Disposition": f"attachment; filename=risk_report_{applicant_id}.pdf"
                }
            )
        elif format == "excel":
            report = generator.export_to_excel({
                "applicant": applicant.dict(),
                "historical": historical_data
            })
            return StreamingResponse(
                report,
                media_type="application/vnd.openxmlformats-officedocument.spreadsheetml.sheet",
                headers={
                    "Content-Disposition": f"attachment; filename=risk_report_{applicant_id}.xlsx"
                }
            )
        else:
            raise HTTPException(
                status_code=400,
                detail="Unsupported format. Use 'pdf' or 'excel'"
            )
    except Exception as e:
        raise HTTPException(
            status_code=500,
            detail=f"Error generating report: {str(e)}"
        )

This stage implements:

  1. Detailed Report Generation:

    • PDF and Excel reports
    • Interactive charts
    • Historical analysis
    • Comparative metrics
  2. Advanced Analysis:

    • Trend analysis
    • Risk metrics
    • Comparative analysis
    • Risk distribution
  3. Visualizations:

    • Radar chart for risk factors
    • Time-series trend lines
    • Distribution histograms
    • Peer comparisons
  4. Recommendation System:

    • Factor-based recommendations
    • Action prioritization
    • Potential impact
    • Improvement plans
  5. Data Export:

    • Multiple formats (PDF, Excel, JSON)
    • Customizable reports
    • Structured data
    • Integration with other systems (a small client sketch for the reporting endpoint follows this list)
  6. Statistical Analysis:

    • Percentile calculation
    • Volatility analysis
    • Historical trends
    • Correlations between factors

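A minimal client-side sketch for consuming the reporting endpoint; the base URL, port, and the `/api/v1` prefix are assumptions:

python
# Hypothetical client for the reporting endpoint (base URL and API prefix are assumptions)
import httpx

def download_report(applicant_id: int, fmt: str = "pdf") -> str:
    response = httpx.get(
        f"http://localhost:8000/api/v1/report/{applicant_id}",
        params={"format": fmt},
        timeout=60.0,
    )
    response.raise_for_status()
    extension = "pdf" if fmt == "pdf" else "xlsx"
    filename = f"risk_report_{applicant_id}.{extension}"
    with open(filename, "wb") as f:
        f.write(response.content)
    return filename
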
The next steps would be:

  1. Test Implementation:

    • Unit tests for report generation
    • Validation of output formats
    • Performance tests
    • Verification of calculation accuracy
  2. Optimizations:

    • Caching of frequently requested reports (see the sketch after this list)
    • Asynchronous generation
    • Data compression
    • Chart rendering performance
  3. Documentation:

    • User guide
    • Technical documentation
    • Usage examples
    • Troubleshooting guide

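For the report-caching idea above, a minimal sketch of an in-memory TTL cache keyed by applicant and format (the class name and TTL are assumptions; a shared store such as Redis would be needed with multiple workers):

python
# Illustrative in-memory TTL cache for generated reports (not part of the project code)
import time
from io import BytesIO
from typing import Dict, Optional, Tuple

class ReportCache:
    def __init__(self, ttl_seconds: int = 300):
        self.ttl = ttl_seconds
        self._store: Dict[Tuple[int, str], Tuple[float, bytes]] = {}

    def get(self, applicant_id: int, fmt: str) -> Optional[BytesIO]:
        entry = self._store.get((applicant_id, fmt))
        if entry and time.time() - entry[0] < self.ttl:
            return BytesIO(entry[1])
        return None

    def set(self, applicant_id: int, fmt: str, report: BytesIO) -> None:
        # Store raw bytes so each cache hit gets a fresh buffer
        self._store[(applicant_id, fmt)] = (time.time(), report.getvalue())
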
Stage 7: Testing and Deployment, focusing on thorough testing and safe deployment.

python
# tests/unit/test_risk_analyzer.py
import pytest
from datetime import datetime
from decimal import Decimal
from app.services.risk_analyzer import RiskAnalyzer
from app.services.bedrock_service import BedrockService
from app.schemas.applicant import ApplicantInDB
from unittest.mock import Mock, patch

@pytest.fixture
def mock_bedrock_service():
    service = Mock(spec=BedrockService)
    service.analyze_risk.return_value = {
        "risk_level": "MEDIO",
        "confidence_score": 0.75,
        "factors": {
            "credit_history": {"score": 0.8, "details": "Good credit history"},
            "income_stability": {"score": 0.7, "details": "Stable income"},
            "debt_burden": {"score": 0.6, "details": "Manageable debt"}
        }
    }
    return service

@pytest.fixture
def sample_applicant():
    return ApplicantInDB(
        id=1,
        full_name="Test User",
        identification="DNI-12345678",
        age=35,
        monthly_income=4500.0,
        marital_status="MARRIED",
        occupation="Software Engineer",
        work_experience_years=6.0,
        credit_score=780,
        home_status="OWNED",
        dti=28.0,
        created_at=datetime.now(),
        updated_at=datetime.now()
    )

class TestRiskAnalyzer:
    @pytest.mark.asyncio
    async def test_analyze_applicant(self, mock_bedrock_service, sample_applicant):
        analyzer = RiskAnalyzer()
        analyzer.bedrock_service = mock_bedrock_service

        result = await analyzer.analyze_applicant(sample_applicant)

        assert "final_score" in result
        assert "risk_level" in result
        assert isinstance(result["final_score"], float)
        assert result["risk_level"] in ["BAJO", "MEDIO", "ALTO"]
        assert 0 <= result["final_score"] <= 1

    def test_calculate_internal_score(self, sample_applicant):
        analyzer = RiskAnalyzer()
        score = analyzer._calculate_internal_score(sample_applicant)

        assert 0 <= score <= 1
        assert isinstance(score, float)

    @pytest.mark.parametrize("credit_score,expected", [
        (800, 1.0),
        (700, 0.7),
        (600, 0.3)
    ])
    def test_score_credit(self, credit_score, expected):
        analyzer = RiskAnalyzer()
        score = analyzer._score_credit(credit_score)
        assert score == expected

    @pytest.mark.parametrize("dti,expected", [
        (25, 1.0),
        (35, 0.6),
        (45, 0.2)
    ])
    def test_score_dti(self, dti, expected):
        analyzer = RiskAnalyzer()
        score = analyzer._score_dti(dti)
        assert score == expected

# tests/integration/test_loan_processor.py
import pytest
from app.services.loan_processor import LoanProcessor
from app.services.risk_analyzer import RiskAnalyzer
from app.schemas.applicant import ApplicantInDB
from app.models.loan import LoanApplication
from sqlalchemy.orm import Session
from decimal import Decimal

@pytest.mark.integration
class TestLoanProcessor:
    @pytest.mark.asyncio
    async def test_full_loan_process(
        self,
        db_session: Session,
        sample_applicant: ApplicantInDB
    ):
        processor = LoanProcessor()
        result = await processor.process_application(sample_applicant)

        assert "status" in result
        assert result["status"] in ["APPROVED", "REJECTED"]
        assert "risk_assessment" in result
        
        if result["status"] == "APPROVED":
            assert "loan_terms" in result
            assert all(k in result["loan_terms"] for k in [
                "max_amount",
                "interest_rate",
                "term_months"
            ])

    @pytest.mark.asyncio
    async def test_high_risk_rejection(
        self,
        db_session: Session
    ):
        # Create a high-risk applicant
        high_risk_applicant = ApplicantInDB(
            full_name="High Risk User",
            identification="DNI-99999999",
            age=25,
            monthly_income=1500.0,
            marital_status="SINGLE",
            occupation="Freelancer",
            work_experience_years=0.5,
            credit_score=580,
            home_status="RENTED",
            dti=48.0
        )

        processor = LoanProcessor()
        result = await processor.process_application(high_risk_applicant)

        assert result["status"] == "REJECTED"
        assert "risk_assessment" in result
        assert result["risk_assessment"]["risk_level"] == "ALTO"

    def test_loan_terms_calculation(self, sample_applicant):
        processor = LoanProcessor()
        terms = processor.rules_engine.calculate_loan_terms(
            monthly_income=Decimal(str(sample_applicant.monthly_income)),
            credit_score=sample_applicant.credit_score,
            dti=Decimal(str(sample_applicant.dti)),
            risk_level="BAJO",
            work_experience_years=sample_applicant.work_experience_years
        )

        assert terms.max_amount > 0
        assert 0 < terms.interest_rate < 1
        assert terms.term_months > 0
        assert terms.monthly_payment > 0

# tests/e2e/test_api_endpoints.py
import pytest
from fastapi.testclient import TestClient
from app.main import app
from app.core.config import settings

@pytest.mark.e2e
class TestAPIEndpoints:
    @pytest.fixture
    def client(self):
        return TestClient(app)

    def test_risk_assessment_endpoint(self, client):
        # Create test data
        test_data = {
            "full_name": "Test User",
            "identification": "DNI-12345678",
            "age": 35,
            "monthly_income": 4500.0,
            "marital_status": "MARRIED",
            "occupation": "Software Engineer",
            "work_experience_years": 6.0,
            "credit_score": 780,
            "home_status": "OWNED"
        }

        # Send the request
        response = client.post(
            f"{settings.API_V1_STR}/risk/analyze",
            json=test_data
        )

        assert response.status_code == 200
        result = response.json()
        assert "risk_level" in result
        assert "final_score" in result

    def test_loan_application_endpoint(self, client):
        # Create a loan application
        application_data = {
            "applicant_id": 1,
            "requested_amount": 50000,
            "term_months": 36
        }

        response = client.post(
            f"{settings.API_V1_STR}/loan/apply",
            json=application_data
        )

        assert response.status_code == 200
        result = response.json()
        assert "status" in result
        assert "loan_terms" in result

# deployment/docker/Dockerfile
FROM python:3.9-slim

# Install system dependencies
RUN apt-get update && apt-get install -y \
    build-essential \
    libpq-dev \
    && rm -rf /var/lib/apt/lists/*

WORKDIR /app

# Copy requirements and install dependencies
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

# Copy application code
COPY . .

# Set environment variables
ENV PYTHONPATH=/app
ENV PYTHONUNBUFFERED=1

# Expose the port
EXPOSE 8000

# Default command
CMD ["uvicorn", "app.main:app", "--host", "0.0.0.0", "--port", "8000"]

# deployment/terraform/main.tf
terraform {
  required_providers {
    aws = {
      source  = "hashicorp/aws"
      version = "~> 4.0"
    }
  }
}

provider "aws" {
  region = var.aws_region
}

# VPC and networking
resource "aws_vpc" "main" {
  cidr_block           = var.vpc_cidr
  enable_dns_hostnames = true
  enable_dns_support   = true

  tags = {
    Name = "${var.project_name}-vpc"
    Environment = var.environment
  }
}

# ECS Cluster
resource "aws_ecs_cluster" "main" {
  name = "${var.project_name}-cluster"
  
  setting {
    name  = "containerInsights"
    value = "enabled"
  }
}

# ECS Service
resource "aws_ecs_service" "main" {
  name            = "${var.project_name}-service"
  cluster         = aws_ecs_cluster.main.id
  task_definition = aws_ecs_task_definition.main.arn
  desired_count   = var.service_desired_count
  launch_type     = "FARGATE"

  network_configuration {
    subnets         = aws_subnet.private.*.id
    security_groups = [aws_security_group.ecs_tasks.id]
  }
}

# RDS Instance
resource "aws_db_instance" "main" {
  identifier        = "${var.project_name}-db"
  allocated_storage = 20
  storage_type      = "gp2"
  engine           = "postgres"
  engine_version   = "13.7"
  instance_class   = var.db_instance_class
  name             = var.database_name
  username         = var.database_username
  password         = var.database_password

  vpc_security_group_ids = [aws_security_group.rds.id]
  db_subnet_group_name   = aws_db_subnet_group.main.name

  backup_retention_period = 7
  multi_az               = var.environment == "prod"
  skip_final_snapshot    = true

  tags = {
    Environment = var.environment
  }
}

# Cloudwatch Logs
resource "aws_cloudwatch_log_group" "main" {
  name              = "/ecs/${var.project_name}"
  retention_in_days = 30

  tags = {
    Environment = var.environment
    Application = var.project_name
  }
}

# deployment/scripts/deploy.sh
#!/bin/bash
set -e

# Variables
ENVIRONMENT=$1
VERSION=$2
ECR_REPO="loan-risk-analyzer"
AWS_REGION="us-east-1"

# Validate arguments
if [ -z "$ENVIRONMENT" ] || [ -z "$VERSION" ]; then
    echo "Usage: $0 <environment> <version>"
    exit 1
fi

if [ -z "$AWS_ACCOUNT_ID" ]; then
    echo "The AWS_ACCOUNT_ID environment variable must be set"
    exit 1
fi

# Validate environment
if [[ ! "$ENVIRONMENT" =~ ^(dev|stg|prod)$ ]]; then
    echo "Environment must be dev, stg, or prod"
    exit 1
fi

echo "Deploying version $VERSION to $ENVIRONMENT..."

# Build the image
docker build -t $ECR_REPO:$VERSION .

# Log in to ECR
aws ecr get-login-password --region $AWS_REGION | \
    docker login --username AWS --password-stdin $AWS_ACCOUNT_ID.dkr.ecr.$AWS_REGION.amazonaws.com

# Tag and push the image
docker tag $ECR_REPO:$VERSION $AWS_ACCOUNT_ID.dkr.ecr.$AWS_REGION.amazonaws.com/$ECR_REPO:$VERSION
docker push $AWS_ACCOUNT_ID.dkr.ecr.$AWS_REGION.amazonaws.com/$ECR_REPO:$VERSION

# Point the ECS service at the new task definition revision
aws ecs update-service \
    --cluster loan-risk-analyzer-$ENVIRONMENT \
    --service loan-risk-analyzer-service \
    --task-definition loan-risk-analyzer-$ENVIRONMENT:$VERSION \
    --force-new-deployment

echo "Deployment initiated. Waiting for service to stabilize..."

# Wait for the deployment to stabilize
aws ecs wait services-stable \
    --cluster loan-risk-analyzer-$ENVIRONMENT \
    --services loan-risk-analyzer-service

echo "Deployment completed successfully!"

# Verify the application
./scripts/verify_deployment.sh $ENVIRONMENT

This seventh and final stage implements:

  1. Thorough Testing:

    • Unit tests for all components
    • End-to-end integration tests
    • API tests
    • Performance testing
  2. Deployment Configuration:

    • Optimized Dockerfile
    • Infrastructure as code (Terraform)
    • Automated deployment scripts
    • CI/CD pipeline
  3. Monitoring and Verification:

    • Configured logging
    • CloudWatch metrics
    • Health checks (a minimal health-check endpoint sketch follows this list)
    • Automatic alerts
  4. Security:

    • VPC configuration
    • Security groups
    • Secrets management
    • SSL/TLS
  5. Scalability:

    • Configured auto-scaling
    • Load balancing
    • Optimized database
    • Caching in place

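For the health checks mentioned in point 3, a minimal FastAPI endpoint along these lines could back the load-balancer checks; the module path, route, and database ping are assumptions:

python
# backend/app/api/endpoints/health.py (hypothetical module; route and checks are assumptions)
from fastapi import APIRouter, Depends
from sqlalchemy import text
from sqlalchemy.orm import Session
from app.api import deps

router = APIRouter()

@router.get("/health")
def health_check(db: Session = Depends(deps.get_db)) -> dict:
    # A trivial query confirms the database connection is alive
    db.execute(text("SELECT 1"))
    return {"status": "ok"}
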
Recommended final steps:

  1. Documentation:

    • Development guide
    • Operations manual
    • API documentation
    • Troubleshooting guide
  2. Monitoring:

    • Set up dashboards
    • Configure alerts (an alarm sketch follows this list)
    • Define KPIs
    • Implement logging
  3. Training:

    • User manual
    • Administration guide
    • Emergency procedures
    • Best practices
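
As one way to set up the alerts mentioned above, a CloudWatch alarm on load-balancer 5xx responses could be created with boto3; the alarm name, the omitted metric dimensions, the threshold, and the SNS topic ARN are all assumptions:

python
# Hypothetical CloudWatch alarm for API errors (names, threshold and SNS topic are assumptions)
import boto3

cloudwatch = boto3.client("cloudwatch", region_name="us-east-1")

cloudwatch.put_metric_alarm(
    AlarmName="loan-risk-analyzer-5xx-errors",
    Namespace="AWS/ApplicationELB",
    MetricName="HTTPCode_Target_5XX_Count",
    Statistic="Sum",
    Period=300,
    EvaluationPeriods=1,
    Threshold=10,
    ComparisonOperator="GreaterThanThreshold",
    TreatMissingData="notBreaching",
    AlarmActions=["arn:aws:sns:us-east-1:123456789012:loan-risk-alerts"],
)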