
Deployment Strategies 🚀

Why Do Deployment Strategies Matter?

Deployment is not just "pushing code to production". Modern DevOps calls for strategies that minimize risk, reduce downtime, and enable fast rollbacks while sustaining high delivery velocity.

🎯 Chapter Objectives

  • Master the main deployment strategies
  • Implement zero-downtime deployments
  • Configure automated rollbacks
  • Handle database deployments
  • Optimize for reliability and speed

📊 Strategy Overview
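
A quick comparison of the four strategies covered in this chapter:

  • Rolling: replaces instances gradually; needs no extra infrastructure; rollback means reversing the rollout.
  • Blue-Green: two full environments with an instant traffic switch; the fastest rollback, at double the infrastructure cost.
  • Canary: exposes a small, metric-gated share of traffic first; the finest risk control, with the most operational complexity.
  • A/B Testing: routes by user attributes to compare business metrics, not just deployment stability.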

🔄 1. Rolling Deployment

Concept

Replaces instances gradually, keeping the service available throughout the process.

Implementation with Kubernetes

# rolling-deployment.yml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: myapp-rolling
  labels:
    app: myapp
spec:
  replicas: 6
  strategy:
    type: RollingUpdate
    rollingUpdate:
      maxUnavailable: 1  # at most 1 pod below the desired count (min 5 of 6 available)
      maxSurge: 2        # up to 2 extra pods during the update (max 8 total)
  selector:
    matchLabels:
      app: myapp
  template:
    metadata:
      labels:
        app: myapp
    spec:
      containers:
        - name: myapp
          image: myapp:v2.0.0
          ports:
            - containerPort: 8080
          resources:
            requests:
              memory: "256Mi"
              cpu: "250m"
            limits:
              memory: "512Mi"
              cpu: "500m"
          # Health checks are critical for rolling updates
          livenessProbe:
            httpGet:
              path: /health
              port: 8080
            initialDelaySeconds: 30
            periodSeconds: 10
            timeoutSeconds: 5
            failureThreshold: 3
          readinessProbe:
            httpGet:
              path: /ready
              port: 8080
            initialDelaySeconds: 5
            periodSeconds: 5
            timeoutSeconds: 3
            failureThreshold: 2

---
apiVersion: v1
kind: Service
metadata:
  name: myapp-service
spec:
  selector:
    app: myapp
  ports:
    - port: 80
      targetPort: 8080
  type: LoadBalancer

Rolling Deployment Script with Validation

#!/bin/bash
# rolling-deploy.sh

set -e

APP_NAME="myapp"
NEW_VERSION=$1
NAMESPACE=${2:-default}

if [ -z "$NEW_VERSION" ]; then
  echo "Usage: $0 <new-version> [namespace]"
  exit 1
fi

echo "🚀 Starting rolling deployment of $APP_NAME:$NEW_VERSION"

# 1. Update deployment image
kubectl set image deployment/$APP_NAME \
  $APP_NAME=$APP_NAME:$NEW_VERSION \
  --namespace=$NAMESPACE

# 2. Monitor rollout status
echo "📊 Monitoring rollout status..."
kubectl rollout status deployment/$APP_NAME \
  --namespace=$NAMESPACE \
  --timeout=600s

# 3. Health check validation
echo "🏥 Running health checks..."
for i in {1..30}; do
  if kubectl run health-check-$RANDOM \
      --image=curlimages/curl \
      --rm -i --restart=Never \
      --namespace=$NAMESPACE \
      -- curl -f http://$APP_NAME-service/health; then
    echo "✅ Health check passed"
    break
  fi

  if [ $i -eq 30 ]; then
    echo "❌ Health checks failed, rolling back"
    kubectl rollout undo deployment/$APP_NAME --namespace=$NAMESPACE
    exit 1
  fi

  echo "⏳ Waiting for health check... ($i/30)"
  sleep 10
done

# 4. Smoke tests (tested directly: with `set -e`, inspecting $? afterwards
# would never run on failure because the script aborts first)
echo "🧪 Running smoke tests..."
if kubectl run smoke-test-$RANDOM \
    --image=$APP_NAME-tests:$NEW_VERSION \
    --rm -i --restart=Never \
    --namespace=$NAMESPACE \
    --env="TARGET_URL=http://$APP_NAME-service" \
    --command -- /run-smoke-tests.sh; then
  echo "✅ Rolling deployment completed successfully"

  # 5. Record the deployment for monitoring (a custom annotation;
  # deployment.kubernetes.io/revision is managed by Kubernetes itself)
  kubectl annotate deployment/$APP_NAME \
    myapp.io/deployed-at=$(date +%s) \
    --namespace=$NAMESPACE --overwrite
else
  echo "❌ Smoke tests failed, rolling back"
  kubectl rollout undo deployment/$APP_NAME --namespace=$NAMESPACE
  exit 1
fi

echo "🎉 Deployment of $APP_NAME:$NEW_VERSION completed!"

🔵🟢 2. Blue-Green Deployment

Concept

Maintains two identical environments and switches traffic instantly between "blue" (current) and "green" (new).

Implementation with Kubernetes Services

# blue-green-infrastructure.yml
apiVersion: v1
kind: ConfigMap
metadata:
  name: blue-green-config
data:
  CURRENT_COLOR: "blue"
  TRAFFIC_SPLIT: "100:0"  # blue:green

---
# Blue deployment
apiVersion: apps/v1
kind: Deployment
metadata:
  name: myapp-blue
  labels:
    app: myapp
    color: blue
spec:
  replicas: 3
  selector:
    matchLabels:
      app: myapp
      color: blue
  template:
    metadata:
      labels:
        app: myapp
        color: blue
    spec:
      containers:
        - name: myapp
          image: myapp:v1.9.0  # Current version
          ports:
            - containerPort: 8080

---
# Green deployment
apiVersion: apps/v1
kind: Deployment
metadata:
  name: myapp-green
  labels:
    app: myapp
    color: green
spec:
  replicas: 3
  selector:
    matchLabels:
      app: myapp
      color: green
  template:
    metadata:
      labels:
        app: myapp
        color: green
    spec:
      containers:
        - name: myapp
          image: myapp:v2.0.0  # New version
          ports:
            - containerPort: 8080

---
# Services for each color
apiVersion: v1
kind: Service
metadata:
  name: myapp-blue-service
spec:
  selector:
    app: myapp
    color: blue
  ports:
    - port: 80
      targetPort: 8080

---
apiVersion: v1
kind: Service
metadata:
  name: myapp-green-service
spec:
  selector:
    app: myapp
    color: green
  ports:
    - port: 80
      targetPort: 8080

---
# Active service (points to the current color)
apiVersion: v1
kind: Service
metadata:
  name: myapp-active-service
spec:
  selector:
    app: myapp
    color: blue  # Updated during deployment
  ports:
    - port: 80
      targetPort: 8080

Blue-Green Deployment Script

#!/bin/bash
# blue-green-deploy.sh

set -e

NEW_VERSION=$1
NAMESPACE=${2:-default}

if [ -z "$NEW_VERSION" ]; then
  echo "Usage: $0 <new-version> [namespace]"
  exit 1
fi

# Get current active color
CURRENT_COLOR=$(kubectl get service myapp-active-service \
  -o jsonpath='{.spec.selector.color}' \
  --namespace=$NAMESPACE)

if [ "$CURRENT_COLOR" = "blue" ]; then
  DEPLOY_COLOR="green"
  NEW_SERVICE="myapp-green-service"
else
  DEPLOY_COLOR="blue"
  NEW_SERVICE="myapp-blue-service"
fi

echo "🎨 Current active color: $CURRENT_COLOR"
echo "🚀 Deploying version $NEW_VERSION to $DEPLOY_COLOR environment"

# 1. Deploy to inactive color
kubectl set image deployment/myapp-$DEPLOY_COLOR \
  myapp=myapp:$NEW_VERSION \
  --namespace=$NAMESPACE

# 2. Wait for deployment
kubectl rollout status deployment/myapp-$DEPLOY_COLOR \
  --namespace=$NAMESPACE \
  --timeout=600s

# 3. Comprehensive health checks on new deployment
echo "🏥 Running comprehensive health checks on $DEPLOY_COLOR..."

for i in {1..60}; do
  if kubectl run health-check-$RANDOM \
      --image=curlimages/curl \
      --rm -i --restart=Never \
      --namespace=$NAMESPACE \
      -- curl -f http://$NEW_SERVICE/health; then
    echo "✅ Health check passed"
    break
  fi

  if [ $i -eq 60 ]; then
    echo "❌ Health checks failed"
    exit 1
  fi

  sleep 5
done

# 4. Database migration check (if needed)
echo "🗃️ Checking database compatibility..."
kubectl run db-check-$RANDOM \
  --image=myapp:$NEW_VERSION \
  --rm -i --restart=Never \
  --namespace=$NAMESPACE \
  --command -- python manage.py check --deploy

# 5. Performance baseline test
echo "⚡ Running performance baseline..."
kubectl run perf-test-$RANDOM \
  --image=k6-runner \
  --rm -i --restart=Never \
  --namespace=$NAMESPACE \
  --env="TARGET_URL=http://$NEW_SERVICE" \
  -- k6 run /tests/baseline.js

# 6. Full smoke test suite (tested directly: with `set -e`, checking $?
# afterwards would never run on failure)
echo "🧪 Running full smoke test suite..."
if ! kubectl run smoke-tests-$RANDOM \
    --image=myapp-tests:$NEW_VERSION \
    --rm -i --restart=Never \
    --namespace=$NAMESPACE \
    --env="TARGET_URL=http://$NEW_SERVICE" \
    --command -- /run-full-smoke-tests.sh; then
  echo "❌ Smoke tests failed, keeping $CURRENT_COLOR active"
  exit 1
fi

# 7. Switch traffic to new color
echo "🔄 Switching traffic from $CURRENT_COLOR to $DEPLOY_COLOR"

kubectl patch service myapp-active-service \
  -p '{"spec":{"selector":{"color":"'$DEPLOY_COLOR'"}}}' \
  --namespace=$NAMESPACE

# 8. Monitor metrics for 5 minutes
echo "📊 Monitoring metrics for 5 minutes..."
sleep 300

# 9. Check error rates and performance
ERROR_RATE=$(kubectl exec deployment/prometheus -- \
  curl -s "http://localhost:9090/api/v1/query?query=rate(http_requests_total{status=~'5..'}[5m])/rate(http_requests_total[5m])" | \
  jq -r '.data.result[0].value[1] // 0')

RESPONSE_TIME=$(kubectl exec deployment/prometheus -- \
  curl -s "http://localhost:9090/api/v1/query?query=histogram_quantile(0.95, rate(http_request_duration_seconds_bucket[5m]))" | \
  jq -r '.data.result[0].value[1] // 0')

echo "📈 Current error rate: $ERROR_RATE"
echo "⏱️ 95th percentile response time: ${RESPONSE_TIME}s"

# 10. Roll back if metrics degraded
if (( $(echo "$ERROR_RATE > 0.01" | bc -l) )) || (( $(echo "$RESPONSE_TIME > 1.0" | bc -l) )); then
  echo "❌ Metrics degraded, rolling back to $CURRENT_COLOR"
  kubectl patch service myapp-active-service \
    -p '{"spec":{"selector":{"color":"'$CURRENT_COLOR'"}}}' \
    --namespace=$NAMESPACE
  exit 1
fi

# 11. Update config map
kubectl patch configmap blue-green-config \
  --patch '{"data":{"CURRENT_COLOR":"'$DEPLOY_COLOR'","LAST_DEPLOYMENT":"'$(date -u +"%Y-%m-%dT%H:%M:%SZ")'"}}' \
  --namespace=$NAMESPACE

echo "✅ Blue-Green deployment completed successfully!"
echo "🎉 Version $NEW_VERSION is now live on $DEPLOY_COLOR environment"

# 12. Optional: scale down old environment after a delay
read -p "Scale down $CURRENT_COLOR environment? (y/N): " -n 1 -r
echo
if [[ $REPLY =~ ^[Yy]$ ]]; then
  kubectl scale deployment myapp-$CURRENT_COLOR --replicas=0 --namespace=$NAMESPACE
  echo "📉 Scaled down $CURRENT_COLOR environment"
fi

🐤 3. Canary Deployment

Concept

Deploys the new version to a small percentage of users and increases that share gradually based on metrics.

Implementation with Argo Rollouts

# canary-rollout.yml
apiVersion: argoproj.io/v1alpha1
kind: Rollout
metadata:
  name: myapp-canary
spec:
  replicas: 10
  strategy:
    canary:
      # Services
      canaryService: myapp-canary-service
      stableService: myapp-stable-service

      # Traffic routing
      trafficRouting:
        nginx:
          stableIngress: myapp-stable-ingress
          annotationPrefix: nginx.ingress.kubernetes.io
          additionalIngressAnnotations:
            canary-by-header: X-Canary
            canary-by-header-value: "true"

      # Analysis and steps
      steps:
        - setWeight: 5
        - pause: {duration: 2m}

        - analysis:
            templates:
              - templateName: success-rate-analysis
              - templateName: latency-analysis
            args:
              - name: service-name
                value: myapp-canary-service

        - setWeight: 10
        - pause: {duration: 5m}

        - analysis:
            templates:
              - templateName: success-rate-analysis
              - templateName: latency-analysis
              - templateName: cpu-usage-analysis
            args:
              - name: service-name
                value: myapp-canary-service

        - setWeight: 25
        - pause: {duration: 10m}

        - setWeight: 50
        - pause: {duration: 10m}

        - setWeight: 75
        - pause: {duration: 5m}

  selector:
    matchLabels:
      app: myapp
  template:
    metadata:
      labels:
        app: myapp
    spec:
      containers:
        - name: myapp
          image: myapp:v2.0.0
          ports:
            - containerPort: 8080
          resources:
            requests:
              memory: "256Mi"
              cpu: "250m"
            limits:
              memory: "512Mi"
              cpu: "500m"

---
# Analysis Templates
apiVersion: argoproj.io/v1alpha1
kind: AnalysisTemplate
metadata:
  name: success-rate-analysis
spec:
  args:
    - name: service-name
  metrics:
    - name: success-rate
      interval: 60s
      count: 5
      successCondition: result[0] >= 0.95
      failureLimit: 3
      provider:
        prometheus:
          address: http://prometheus:9090
          query: |
            sum(rate(http_requests_total{service="{{args.service-name}}",status!~"5.."}[2m])) /
            sum(rate(http_requests_total{service="{{args.service-name}}"}[2m]))

---
apiVersion: argoproj.io/v1alpha1
kind: AnalysisTemplate
metadata:
  name: latency-analysis
spec:
  args:
    - name: service-name
  metrics:
    - name: latency
      interval: 60s
      count: 5
      successCondition: result[0] <= 0.5
      failureLimit: 2
      provider:
        prometheus:
          address: http://prometheus:9090
          query: |
            histogram_quantile(0.95,
              sum(rate(http_request_duration_seconds_bucket{service="{{args.service-name}}"}[2m])) by (le)
            )

---
apiVersion: argoproj.io/v1alpha1
kind: AnalysisTemplate
metadata:
  name: cpu-usage-analysis
spec:
  args:
    - name: service-name
  metrics:
    - name: cpu-usage
      interval: 60s
      count: 5
      successCondition: result[0] <= 0.8
      failureLimit: 3
      provider:
        prometheus:
          address: http://prometheus:9090
          query: |
            sum(rate(container_cpu_usage_seconds_total{pod=~"{{args.service-name}}-.*"}[2m])) /
            sum(container_spec_cpu_quota{pod=~"{{args.service-name}}-.*"} / container_spec_cpu_period{pod=~"{{args.service-name}}-.*"})
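
The pause steps above can also be resumed or aborted by hand from a pipeline job. A minimal sketch, assuming the Argo Rollouts kubectl plugin is installed (the rollout name matches the manifest above):

# canary_control.py: manual gates for the paused canary steps
import subprocess

def promote_canary(rollout: str = "myapp-canary", namespace: str = "default") -> None:
    # Skips the current pause step and lets the rollout continue
    subprocess.run(
        ["kubectl", "argo", "rollouts", "promote", rollout, "-n", namespace],
        check=True,
    )

def abort_canary(rollout: str = "myapp-canary", namespace: str = "default") -> None:
    # Sends all traffic back to the stable ReplicaSet
    subprocess.run(
        ["kubectl", "argo", "rollouts", "abort", rollout, "-n", namespace],
        check=True,
    )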

Canary with Feature Flags

# feature_flags.py
import hashlib
from enum import Enum
from typing import Dict, Any

class RolloutStrategy(Enum):
    PERCENTAGE = "percentage"
    USER_ATTRIBUTE = "user_attribute"
    GEOGRAPHY = "geography"
    TIME_BASED = "time_based"

class FeatureFlagManager:
    def __init__(self):
        self.flags = {
            'new_payment_system': {
                'enabled': True,
                'strategy': RolloutStrategy.PERCENTAGE,
                'percentage': 10,  # 10% of users
                'rollout_start': '2024-01-15T00:00:00Z',
                'criteria': {
                    'user_segments': ['premium', 'beta_testers'],
                    'countries': ['US', 'CA', 'UK'],
                    'min_account_age_days': 30
                }
            },
            'enhanced_search': {
                'enabled': True,
                'strategy': RolloutStrategy.USER_ATTRIBUTE,
                'percentage': 25,
                'criteria': {
                    'user_segments': ['premium']
                }
            }
        }

    def is_feature_enabled(self, flag_name: str, user_context: Dict[str, Any]) -> bool:
        """Decide whether a feature is enabled for a specific user."""
        flag = self.flags.get(flag_name)
        if not flag or not flag['enabled']:
            return False

        strategy = flag['strategy']

        if strategy == RolloutStrategy.PERCENTAGE:
            return self._percentage_rollout(flag_name, flag, user_context)
        elif strategy == RolloutStrategy.USER_ATTRIBUTE:
            return self._user_attribute_rollout(flag_name, flag, user_context)
        elif strategy == RolloutStrategy.GEOGRAPHY:
            return self._geography_rollout(flag, user_context)

        return False

    def _percentage_rollout(self, flag_name: str, flag: Dict, user_context: Dict) -> bool:
        """Percentage-based rollout with a stable hash, so the same user
        always gets the same result (the built-in hash() is randomized per
        process, so hashlib is used instead)."""
        user_id = user_context.get('user_id', '')
        digest = hashlib.sha256(f"{flag_name}_{user_id}".encode()).hexdigest()
        user_bucket = int(digest, 16) % 100
        return user_bucket < flag['percentage']

    def _user_attribute_rollout(self, flag_name: str, flag: Dict, user_context: Dict) -> bool:
        """Rollout based on user attributes."""
        user_segment = user_context.get('segment')
        allowed_segments = flag['criteria'].get('user_segments', [])

        if user_segment in allowed_segments:
            return self._percentage_rollout(flag_name, flag, user_context)

        return False

    def _geography_rollout(self, flag: Dict, user_context: Dict) -> bool:
        """Rollout based on geography."""
        user_country = user_context.get('country')
        allowed_countries = flag['criteria'].get('countries', [])

        return user_country in allowed_countries

# Application usage
feature_flags = FeatureFlagManager()

def process_payment(user_context, payment_data):
    if feature_flags.is_feature_enabled('new_payment_system', user_context):
        return new_payment_processor.process(payment_data)
    else:
        return legacy_payment_processor.process(payment_data)

# Tracking middleware (WSGI)
class FeatureFlagMiddleware:
    def __init__(self, app):
        self.app = app

    def extract_user_context(self, environ) -> Dict[str, Any]:
        """Build the user context from request headers (assumes an upstream
        auth layer sets these headers)."""
        return {
            'user_id': environ.get('HTTP_X_USER_ID', ''),
            'segment': environ.get('HTTP_X_USER_SEGMENT'),
            'country': environ.get('HTTP_X_USER_COUNTRY'),
        }

    def __call__(self, environ, start_response):
        # Add headers identifying the active feature flags
        user_context = self.extract_user_context(environ)

        active_flags = []
        for flag_name in feature_flags.flags.keys():
            if feature_flags.is_feature_enabled(flag_name, user_context):
                active_flags.append(flag_name)

        # Expose the list for debugging/monitoring
        environ['HTTP_X_ACTIVE_FLAGS'] = ','.join(active_flags)

        return self.app(environ, start_response)
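
Wiring the middleware into an application is one line. A sketch assuming a Flask app (any WSGI framework works the same way):

# app_setup.py: attach the middleware to a WSGI application
from flask import Flask

app = Flask(__name__)
# Wrap the WSGI callable so every request gets the active-flags header
app.wsgi_app = FeatureFlagMiddleware(app.wsgi_app)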

🔍 4. A/B Testing Deployment

Implementation with Istio

# ab-testing-istio.yml
apiVersion: networking.istio.io/v1beta1
kind: VirtualService
metadata:
  name: myapp-ab-testing
spec:
  hosts:
    - myapp.company.com
  http:
    - match:
        - headers:
            x-user-segment:
              exact: premium
      fault:
        delay:
          percentage:
            value: 0.1
          fixedDelay: 100ms
      route:
        - destination:
            host: myapp-service
            subset: version-b
          weight: 50
        - destination:
            host: myapp-service
            subset: version-a
          weight: 50

    - match:
        - headers:
            x-user-segment:
              exact: beta
      route:
        - destination:
            host: myapp-service
            subset: version-b
          weight: 100

    - route:  # Default route
        - destination:
            host: myapp-service
            subset: version-a
          weight: 90
        - destination:
            host: myapp-service
            subset: version-b
          weight: 10

---
apiVersion: networking.istio.io/v1beta1
kind: DestinationRule
metadata:
  name: myapp-destination
spec:
  host: myapp-service
  subsets:
    - name: version-a
      labels:
        version: v1.9.0
    - name: version-b
      labels:
        version: v2.0.0
  trafficPolicy:
    connectionPool:
      tcp:
        maxConnections: 100
      http:
        http1MaxPendingRequests: 50
        maxRequestsPerConnection: 10
    outlierDetection:
      consecutive5xxErrors: 3
      interval: 30s
      baseEjectionTime: 30s
A/B Testing Analytics

# ab_testing_analytics.py
import hashlib
from datetime import datetime
from typing import Dict
import numpy as np
from scipy import stats

class ABTestAnalyzer:
    def __init__(self):
        self.experiments = {}

    def create_experiment(self, experiment_id: str, config: Dict):
        """Create a new A/B experiment."""
        self.experiments[experiment_id] = {
            'config': config,
            'start_date': datetime.now(),
            'metrics': {
                'control': {'conversions': 0, 'visitors': 0, 'revenue': 0},
                'treatment': {'conversions': 0, 'visitors': 0, 'revenue': 0}
            },
            'events': []
        }

    def track_event(self, experiment_id: str, variant: str, event_type: str, value: float = 1.0):
        """Track an event within an experiment."""
        if experiment_id not in self.experiments:
            return

        event = {
            'timestamp': datetime.now().isoformat(),
            'variant': variant,
            'event_type': event_type,
            'value': value
        }

        self.experiments[experiment_id]['events'].append(event)

        # Update aggregate metrics
        metrics = self.experiments[experiment_id]['metrics'][variant]

        if event_type == 'visitor':
            metrics['visitors'] += 1
        elif event_type == 'conversion':
            metrics['conversions'] += 1
        elif event_type == 'revenue':
            metrics['revenue'] += value

    def calculate_statistical_significance(self, experiment_id: str) -> Dict:
        """Compute statistical significance."""
        experiment = self.experiments.get(experiment_id)
        if not experiment:
            return {}

        control = experiment['metrics']['control']
        treatment = experiment['metrics']['treatment']

        # Conversion rates
        control_rate = control['conversions'] / max(control['visitors'], 1)
        treatment_rate = treatment['conversions'] / max(treatment['visitors'], 1)

        # Two-proportion z-test
        n1, n2 = control['visitors'], treatment['visitors']
        p1, p2 = control_rate, treatment_rate

        if n1 == 0 or n2 == 0:
            return {'significance': 'insufficient_data'}

        # Pooled proportion
        p_pool = (control['conversions'] + treatment['conversions']) / (n1 + n2)
        se = np.sqrt(p_pool * (1 - p_pool) * (1/n1 + 1/n2))

        if se == 0:
            return {'significance': 'no_variance'}

        z_score = (p2 - p1) / se
        p_value = 2 * (1 - stats.norm.cdf(abs(z_score)))

        # Revenue test (t-test)
        control_revenues = [e['value'] for e in experiment['events']
                            if e['variant'] == 'control' and e['event_type'] == 'revenue']
        treatment_revenues = [e['value'] for e in experiment['events']
                              if e['variant'] == 'treatment' and e['event_type'] == 'revenue']

        revenue_significance = None
        if len(control_revenues) > 1 and len(treatment_revenues) > 1:
            t_stat, revenue_p_value = stats.ttest_ind(control_revenues, treatment_revenues)
            revenue_significance = revenue_p_value < 0.05

        return {
            'conversion_rate': {
                'control': control_rate,
                'treatment': treatment_rate,
                'lift': ((treatment_rate - control_rate) / control_rate * 100) if control_rate > 0 else 0,
                'p_value': p_value,
                'significant': p_value < 0.05
            },
            'revenue': {
                'control_avg': np.mean(control_revenues) if control_revenues else 0,
                'treatment_avg': np.mean(treatment_revenues) if treatment_revenues else 0,
                'significant': revenue_significance
            },
            'sample_size': {
                'control': n1,
                'treatment': n2,
                'total': n1 + n2
            }
        }

    def get_experiment_report(self, experiment_id: str) -> Dict:
        """Generate the full experiment report."""
        significance = self.calculate_statistical_significance(experiment_id)
        experiment = self.experiments[experiment_id]

        duration = datetime.now() - experiment['start_date']

        return {
            'experiment_id': experiment_id,
            'duration_days': duration.days,
            'config': experiment['config'],
            'statistics': significance,
            'recommendation': self._get_recommendation(significance),
            'raw_metrics': experiment['metrics']
        }

    def _get_recommendation(self, significance: Dict) -> str:
        """Generate a recommendation based on the statistics."""
        if 'conversion_rate' not in significance:
            return "Insufficient data for recommendation"

        conv_stats = significance['conversion_rate']
        sample_size = significance['sample_size']['total']

        if sample_size < 1000:
            return "Continue test - need more sample size"

        if conv_stats['significant']:
            if conv_stats['lift'] > 5:
                return "SHIP - Treatment shows significant improvement"
            elif conv_stats['lift'] < -5:
                return "STOP - Treatment shows significant degradation"
            else:
                return "NEUTRAL - Significant but minimal impact"
        else:
            return "Continue test - no significant difference yet"

# Application usage
ab_analyzer = ABTestAnalyzer()

# Create an experiment
ab_analyzer.create_experiment('checkout_flow_v2', {
    'description': 'New checkout flow with one-click purchase',
    'allocation': {'control': 50, 'treatment': 50},
    'target_metric': 'conversion_rate'
})

# Track events
def track_user_interaction(user_id, experiment_id, action, value=1.0):
    # Assign the variant with a stable hash so the same user keeps the
    # same variant across requests and processes
    bucket = int(hashlib.sha256(str(user_id).encode()).hexdigest(), 16) % 2
    variant = 'treatment' if bucket else 'control'

    ab_analyzer.track_event(experiment_id, variant, action, value)

    # Log for later analysis
    print(f"User {user_id} in {variant} performed {action}")

🗃️ 5. Database Deployment Strategies

Blue-Green with a Database

-- migration-strategy.sql

-- 1. Backward compatible changes (safe for rolling/blue-green)
-- ✅ Add new columns with default values
ALTER TABLE users ADD COLUMN preferences JSONB DEFAULT '{}';

-- ✅ Add new tables
CREATE TABLE user_notifications (
  id SERIAL PRIMARY KEY,
  user_id INTEGER REFERENCES users(id),
  message TEXT NOT NULL,
  created_at TIMESTAMP DEFAULT NOW()
);

-- ✅ Add indexes
CREATE INDEX CONCURRENTLY idx_users_email ON users(email);

-- 2. Forward-only changes (require careful coordination)
-- ⚠️ Rename columns (requires app changes)
-- Step 1: Add new column
ALTER TABLE users ADD COLUMN full_name VARCHAR(255);

-- Step 2: Migrate data
UPDATE users SET full_name = first_name || ' ' || last_name;

-- Step 3: Update application to use new column
-- Step 4: Drop old columns (in next deployment)
-- ALTER TABLE users DROP COLUMN first_name, DROP COLUMN last_name;

-- 3. Breaking changes (require multi-phase deployment)
-- ❌ Drop columns/tables (data loss risk)
-- ❌ Change data types (compatibility issues)
-- ❌ Add NOT NULL constraints to existing columns
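
The last breaking change has a safe multi-phase workaround in PostgreSQL: add the constraint as NOT VALID first, backfill, then validate. A sketch in the migrations-dict format used by the deployment script below (table and column names are illustrative):

# not_null_phases.py: phased NOT NULL rollout for db_deployment.py below
not_null_migrations = {
    "backward_compatible": [
        # Phase 1: add the constraint without checking existing rows (fast)
        "ALTER TABLE users ADD CONSTRAINT users_email_not_null "
        "CHECK (email IS NOT NULL) NOT VALID",
    ],
    "data_migration": [
        # Phase 2: backfill NULLs, then validate (weaker lock than SET NOT NULL)
        "UPDATE users SET email = '' WHERE email IS NULL",
        "ALTER TABLE users VALIDATE CONSTRAINT users_email_not_null",
    ],
}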

Database Migration Script

#!/usr/bin/env python3
# db_deployment.py

import psycopg2
import time
import sys
from typing import List, Dict

class DatabaseDeployment:
    def __init__(self, connection_string: str):
        self.conn_string = connection_string
        self.connection = None

    def connect(self):
        """Open the database connection."""
        self.connection = psycopg2.connect(self.conn_string)
        self.connection.autocommit = False

    def execute_migration_phase(self, phase: str, migrations: List[str]) -> bool:
        """Execute one migration phase."""
        print(f"🗃️ Executing migration phase: {phase}")

        try:
            cursor = self.connection.cursor()

            for migration in migrations:
                print(f"  📝 Executing: {migration[:50]}...")
                if "CONCURRENTLY" in migration.upper():
                    # CREATE INDEX CONCURRENTLY cannot run inside a transaction
                    # block, so commit pending work and switch to autocommit
                    self.connection.commit()
                    self.connection.autocommit = True
                    cursor.execute(migration)
                    self.connection.autocommit = False
                else:
                    cursor.execute(migration)

            # Verify integrity after each phase
            self._verify_database_integrity(cursor)

            self.connection.commit()
            print(f"✅ Phase {phase} completed successfully")
            return True

        except Exception as e:
            print(f"❌ Phase {phase} failed: {e}")
            self.connection.rollback()
            return False

    def _verify_database_integrity(self, cursor):
        """Check database integrity."""
        # Check foreign key constraints
        cursor.execute("""
            SELECT conname, conrelid::regclass, confrelid::regclass
            FROM pg_constraint
            WHERE contype = 'f' AND NOT convalidated
        """)

        invalid_constraints = cursor.fetchall()
        if invalid_constraints:
            raise Exception(f"Invalid foreign key constraints: {invalid_constraints}")

        # Check for long-running queries that might be blocked
        cursor.execute("""
            SELECT query, state, now() - query_start as duration
            FROM pg_stat_activity
            WHERE state = 'active' AND now() - query_start > interval '30 seconds'
        """)

        long_queries = cursor.fetchall()
        if long_queries:
            print(f"⚠️ Warning: long-running queries detected: {len(long_queries)}")

    def blue_green_database_deployment(self, migrations: Dict[str, List[str]]) -> bool:
        """Database deployment compatible with blue-green."""

        # Phase 1: backward-compatible changes
        if not self.execute_migration_phase("backward_compatible",
                                            migrations.get("backward_compatible", [])):
            return False

        # Wait for the application deployment
        print("⏳ Waiting for application deployment...")
        time.sleep(60)  # In production this would poll health checks instead

        # Phase 2: data migration
        if not self.execute_migration_phase("data_migration",
                                            migrations.get("data_migration", [])):
            return False

        # Phase 3: cleanup (after confirming the new version works)
        print("🧹 Cleanup phase will be executed in the next deployment window")

        return True

    def rolling_database_deployment(self, migrations: List[str]) -> bool:
        """Database deployment compatible with rolling updates."""

        # Only backward-compatible changes are allowed
        safe_migrations = []
        risky_migrations = []

        for migration in migrations:
            if self._is_safe_for_rolling(migration):
                safe_migrations.append(migration)
            else:
                risky_migrations.append(migration)

        if risky_migrations:
            print(f"❌ Found {len(risky_migrations)} risky migrations for rolling deployment")
            for migration in risky_migrations:
                print(f"  - {migration[:100]}...")
            return False

        return self.execute_migration_phase("rolling_safe", safe_migrations)

    def _is_safe_for_rolling(self, migration: str) -> bool:
        """Heuristic check that a migration is safe for rolling deployment."""
        unsafe_patterns = [
            "DROP COLUMN",
            "DROP TABLE",
            "ALTER COLUMN",
            "ADD CONSTRAINT",
            "NOT NULL"
        ]

        migration_upper = migration.upper()
        return not any(pattern in migration_upper for pattern in unsafe_patterns)

# Example usage
if __name__ == "__main__":
    db_deployment = DatabaseDeployment("postgresql://user:pass@localhost/myapp")
    db_deployment.connect()

    migrations = {
        "backward_compatible": [
            "ALTER TABLE users ADD COLUMN preferences JSONB DEFAULT '{}'",
            "CREATE INDEX CONCURRENTLY idx_users_preferences ON users USING GIN (preferences)",
            """CREATE TABLE user_activities (
                id SERIAL PRIMARY KEY,
                user_id INTEGER REFERENCES users(id),
                activity_type VARCHAR(50) NOT NULL,
                created_at TIMESTAMP DEFAULT NOW()
            )"""
        ],
        "data_migration": [
            """UPDATE users SET preferences = '{"theme": "light", "notifications": true}'
               WHERE preferences = '{}'""",
        ]
    }

    if db_deployment.blue_green_database_deployment(migrations):
        print("🎉 Database deployment completed successfully!")
        sys.exit(0)
    else:
        print("💥 Database deployment failed!")
        sys.exit(1)

📊 Monitoring and Automated Rollback

Automated Rollback System

# automated_rollback.py
import time
import requests
from typing import Dict, List
import logging

class DeploymentMonitor:
    def __init__(self, config: Dict):
        self.config = config
        self.metrics_endpoint = config['metrics_endpoint']
        self.rollback_threshold = config['rollback_threshold']
        self.monitoring_duration = config['monitoring_duration']

    def monitor_deployment(self, deployment_id: str) -> bool:
        """Monitor a deployment and decide whether to roll back."""

        print(f"🔍 Monitoring deployment {deployment_id} for {self.monitoring_duration} seconds")

        start_time = time.time()
        samples = []

        while time.time() - start_time < self.monitoring_duration:
            metrics = self._collect_metrics()
            samples.append(metrics)

            # Check immediate failure conditions
            if self._should_immediate_rollback(metrics):
                print("🚨 Immediate rollback triggered!")
                return False

            time.sleep(30)  # Sample every 30 seconds

        # Analyze overall performance
        return self._analyze_deployment_success(samples)

    def _collect_metrics(self) -> Dict:
        """Collect system metrics."""
        try:
            # Error rate
            error_rate_query = """
                rate(http_requests_total{status=~"5.."}[5m]) /
                rate(http_requests_total[5m])
            """
            error_rate = self._query_prometheus(error_rate_query)

            # Response time
            latency_query = """
                histogram_quantile(0.95,
                  rate(http_request_duration_seconds_bucket[5m])
                )
            """
            latency = self._query_prometheus(latency_query)

            # CPU usage
            cpu_query = """
                avg(rate(container_cpu_usage_seconds_total[5m])) * 100
            """
            cpu_usage = self._query_prometheus(cpu_query)

            # Memory usage
            memory_query = """
                avg(container_memory_usage_bytes / container_spec_memory_limit_bytes) * 100
            """
            memory_usage = self._query_prometheus(memory_query)

            return {
                'timestamp': time.time(),
                'error_rate': error_rate,
                'latency_p95': latency,
                'cpu_usage': cpu_usage,
                'memory_usage': memory_usage
            }

        except Exception as e:
            logging.error(f"Failed to collect metrics: {e}")
            return {
                'timestamp': time.time(),
                'error_rate': float('inf'),  # Trigger rollback on metric failure
                'latency_p95': float('inf'),
                'cpu_usage': 0,
                'memory_usage': 0
            }

    def _query_prometheus(self, query: str) -> float:
        """Query Prometheus and return a numeric value."""
        response = requests.get(
            f"{self.metrics_endpoint}/api/v1/query",
            params={'query': query},
            timeout=10
        )
        response.raise_for_status()

        data = response.json()
        if data['data']['result']:
            return float(data['data']['result'][0]['value'][1])
        return 0.0

    def _should_immediate_rollback(self, metrics: Dict) -> bool:
        """Check immediate-rollback conditions."""
        conditions = [
            metrics['error_rate'] > 0.1,   # >10% error rate
            metrics['latency_p95'] > 5.0,  # >5 second latency
            metrics['cpu_usage'] > 90,     # >90% CPU
            metrics['memory_usage'] > 95   # >95% memory
        ]

        return any(conditions)

    def _analyze_deployment_success(self, samples: List[Dict]) -> bool:
        """Analyze the samples to decide whether the deployment succeeded."""
        if not samples:
            return False

        # Calculate averages
        avg_error_rate = sum(s['error_rate'] for s in samples) / len(samples)
        avg_latency = sum(s['latency_p95'] for s in samples) / len(samples)

        # Success criteria
        success_criteria = [
            avg_error_rate < self.rollback_threshold['error_rate'],
            avg_latency < self.rollback_threshold['latency'],
            len([s for s in samples if s['error_rate'] > 0.05]) < len(samples) * 0.2  # <20% of samples above 5% errors
        ]

        success = all(success_criteria)

        print("📊 Deployment analysis:")
        print(f"  Average error rate: {avg_error_rate:.3f}")
        print(f"  Average latency: {avg_latency:.3f}s")
        print(f"  Success: {success}")

        return success

# Integration with the deployment pipeline
# (execute_deployment and rollback_deployment are pipeline hooks assumed to
# exist elsewhere, e.g. wrappers around the scripts earlier in this chapter)
def deploy_with_monitoring(deployment_config: Dict) -> bool:
    """Deployment with automated monitoring and rollback."""

    # 1. Execute deployment
    deployment_id = execute_deployment(deployment_config)

    # 2. Monitor deployment
    monitor = DeploymentMonitor({
        'metrics_endpoint': 'http://prometheus:9090',
        'rollback_threshold': {
            'error_rate': 0.02,  # 2%
            'latency': 1.0       # 1 second
        },
        'monitoring_duration': 300  # 5 minutes
    })

    success = monitor.monitor_deployment(deployment_id)

    # 3. Roll back if needed
    if not success:
        print("🔄 Rolling back deployment...")
        rollback_deployment(deployment_id)
        return False

    print("✅ Deployment successful and stable!")
    return True

🎯 Hands-On Exercises

Exercise 1: Implement a Rolling Deployment

  1. Configure a rolling deployment in Kubernetes
  2. Implement appropriate health checks
  3. Create a post-deployment validation script
  4. Test a manual rollback (see the sketch below)
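
For step 4, a minimal manual-rollback sketch, assuming the myapp deployment from the rolling example:

# manual_rollback.py: inspect history, then undo the rollout
import subprocess
from typing import Optional

def rollback(deployment: str = "myapp", namespace: str = "default",
             revision: Optional[int] = None) -> None:
    # Show the recorded rollout history first
    subprocess.run(["kubectl", "rollout", "history", f"deployment/{deployment}",
                    "-n", namespace], check=True)

    # Undo to the previous revision, or to an explicit one
    cmd = ["kubectl", "rollout", "undo", f"deployment/{deployment}", "-n", namespace]
    if revision is not None:
        cmd.append(f"--to-revision={revision}")
    subprocess.run(cmd, check=True)

rollback()  # rolls myapp back to its previous revision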

Exercise 2: Blue-Green with a Database

  1. Design a blue-green strategy for an app with a DB
  2. Implement backward-compatible migrations
  3. Create a validation process for both environments
  4. Automate the traffic switch

Exercise 3: Canary with Metrics

  1. Configure a canary deployment with Argo Rollouts
  2. Define appropriate success metrics
  3. Implement metric-driven automated rollback
  4. Document the promotion criteria

✅ Deployment Strategy Checklist

  • Health checks configured appropriately
  • Rollback procedure documented and automated
  • Database migrations backward-compatible
  • Monitoring and alerts configured
  • Performance baselines established
  • Security scanning integrated
  • Smoke tests automated
  • Documentation up to date
  • Team training completed


💡 Remember: your deployment strategy must align with your SLAs and risk tolerance. No single strategy works for every case.