Saltar al contenido principal

402. Microservicios

Índice

Introducción

Los microservicios representan un enfoque arquitectónico que estructura una aplicación como una colección de servicios débilmente acoplados.

¿Qué son los Microservicios?

Ventajas y Desventajas

Ventajas:

  • Escalabilidad independiente
  • Tecnologías heterogéneas
  • Desarrollo independiente por equipos
  • Resiliencia y tolerancia a fallos
  • Deployment independiente

Desventajas:

  • Complejidad operacional
  • Latencia de red
  • Consistencia de datos
  • Testing complejo
  • Monitoreo distribuido

Arquitectura

Principios de Diseño

# Ejemplo de definición de servicios
services:
user-service:
domain: user-management
responsibilities:
- user-authentication
- user-profile-management
- user-preferences
data-ownership:
- users
- profiles

order-service:
domain: order-management
responsibilities:
- order-creation
- order-tracking
- order-fulfillment
data-ownership:
- orders
- order-items

Domain-Driven Design (DDD)

# Ejemplo de agregado en Python
class Order:
def __init__(self, customer_id: str):
self.id = str(uuid.uuid4())
self.customer_id = customer_id
self.items = []
self.status = OrderStatus.PENDING
self.created_at = datetime.utcnow()

def add_item(self, product_id: str, quantity: int, price: float):
"""Añadir item al pedido"""
if self.status != OrderStatus.PENDING:
raise InvalidOperationError("Cannot modify confirmed order")

item = OrderItem(product_id, quantity, price)
self.items.append(item)
self._publish_event(ItemAddedEvent(self.id, item))

def confirm(self):
"""Confirmar pedido"""
if not self.items:
raise InvalidOperationError("Cannot confirm empty order")

self.status = OrderStatus.CONFIRMED
self._publish_event(OrderConfirmedEvent(self.id))

Patrones de Diseño

1. Database per Service

version: '3.8'
services:
user-service:
image: user-service:latest
environment:
- DATABASE_URL=postgres://user:pass@user-db:5432/userdb
depends_on:
- user-db

user-db:
image: postgres:13
environment:
POSTGRES_DB: userdb
POSTGRES_USER: user
POSTGRES_PASSWORD: pass

order-service:
image: order-service:latest
environment:
- DATABASE_URL=postgres://order:pass@order-db:5432/orderdb
depends_on:
- order-db

order-db:
image: postgres:13
environment:
POSTGRES_DB: orderdb
POSTGRES_USER: order
POSTGRES_PASSWORD: pass

2. Saga Pattern

# Saga para procesamiento de pedidos
class OrderSaga:
def __init__(self):
self.steps = [
self.validate_order,
self.reserve_inventory,
self.process_payment,
self.ship_order
]
self.compensation_steps = [
self.cancel_shipping,
self.refund_payment,
self.release_inventory,
self.cancel_order
]

async def execute(self, order_id: str):
completed_steps = []

try:
for step in self.steps:
await step(order_id)
completed_steps.append(step)

except Exception as e:
# Ejecutar compensación
for step in reversed(completed_steps):
compensation_index = self.steps.index(step)
await self.compensation_steps[compensation_index](order_id)
raise e

3. Circuit Breaker

import asyncio
from enum import Enum
from datetime import datetime, timedelta

class CircuitState(Enum):
CLOSED = "closed"
OPEN = "open"
HALF_OPEN = "half_open"

class CircuitBreaker:
def __init__(self, failure_threshold=5, timeout=60):
self.failure_threshold = failure_threshold
self.timeout = timeout
self.failure_count = 0
self.last_failure_time = None
self.state = CircuitState.CLOSED

async def call(self, func, *args, **kwargs):
if self.state == CircuitState.OPEN:
if self._should_attempt_reset():
self.state = CircuitState.HALF_OPEN
else:
raise CircuitBreakerOpenError("Circuit breaker is open")

try:
result = await func(*args, **kwargs)
self._on_success()
return result
except Exception as e:
self._on_failure()
raise e

def _on_success(self):
self.failure_count = 0
self.state = CircuitState.CLOSED

def _on_failure(self):
self.failure_count += 1
self.last_failure_time = datetime.now()

if self.failure_count >= self.failure_threshold:
self.state = CircuitState.OPEN

Comunicación entre Servicios

Comunicación Síncrona (HTTP/REST)

# Cliente HTTP con retry y circuit breaker
import aiohttp
import asyncio

class ServiceClient:
def __init__(self, base_url: str):
self.base_url = base_url
self.circuit_breaker = CircuitBreaker()

async def get_user(self, user_id: str):
async def _call():
async with aiohttp.ClientSession() as session:
async with session.get(
f"{self.base_url}/users/{user_id}",
timeout=aiohttp.ClientTimeout(total=5)
) as response:
response.raise_for_status()
return await response.json()

return await self.circuit_breaker.call(_call)

Comunicación Asíncrona (Message Queues)

# Productor de eventos
import aiormq
import json

class EventPublisher:
def __init__(self, rabbitmq_url: str):
self.rabbitmq_url = rabbitmq_url
self.connection = None
self.channel = None

async def connect(self):
self.connection = await aiormq.connect(self.rabbitmq_url)
self.channel = await self.connection.channel()

async def publish_event(self, event_type: str, data: dict):
message = {
"event_type": event_type,
"data": data,
"timestamp": datetime.utcnow().isoformat()
}

await self.channel.basic_publish(
json.dumps(message).encode(),
routing_key=event_type,
exchange="events"
)

# Consumidor de eventos
class EventConsumer:
def __init__(self, rabbitmq_url: str):
self.rabbitmq_url = rabbitmq_url
self.handlers = {}

def register_handler(self, event_type: str, handler):
self.handlers[event_type] = handler

async def start_consuming(self):
connection = await aiormq.connect(self.rabbitmq_url)
channel = await connection.channel()

queue = await channel.queue_declare("order_events", durable=True)

async def on_message(message):
try:
data = json.loads(message.body.decode())
event_type = data["event_type"]

if event_type in self.handlers:
await self.handlers[event_type](data["data"])

await message.ack()
except Exception as e:
await message.nack(requeue=False)

await channel.basic_consume(queue.queue, on_message)

Gestión de Datos

Event Sourcing

# Implementación de Event Store
class Event:
def __init__(self, aggregate_id: str, event_type: str, data: dict):
self.id = str(uuid.uuid4())
self.aggregate_id = aggregate_id
self.event_type = event_type
self.data = data
self.timestamp = datetime.utcnow()
self.version = 1

class EventStore:
def __init__(self):
self.events = []

async def save_events(self, aggregate_id: str, events: List[Event], expected_version: int):
# Verificar versión para evitar conflictos
current_version = await self.get_current_version(aggregate_id)
if current_version != expected_version:
raise ConcurrencyError(f"Expected version {expected_version}, got {current_version}")

# Guardar eventos
for i, event in enumerate(events):
event.version = expected_version + i + 1
self.events.append(event)

async def get_events(self, aggregate_id: str, from_version: int = 0) -> List[Event]:
return [e for e in self.events
if e.aggregate_id == aggregate_id and e.version > from_version]

async def get_current_version(self, aggregate_id: str) -> int:
events = await self.get_events(aggregate_id)
return max([e.version for e in events], default=0)

CQRS (Command Query Responsibility Segregation)

# Separación de comandos y consultas
class OrderCommand:
pass

class CreateOrderCommand(OrderCommand):
def __init__(self, customer_id: str, items: List[dict]):
self.customer_id = customer_id
self.items = items

class OrderCommandHandler:
def __init__(self, event_store: EventStore):
self.event_store = event_store

async def handle(self, command: CreateOrderCommand):
# Crear eventos
events = [
OrderCreatedEvent(
order_id=str(uuid.uuid4()),
customer_id=command.customer_id
)
]

for item in command.items:
events.append(
ItemAddedEvent(
order_id=events[0].order_id,
product_id=item["product_id"],
quantity=item["quantity"]
)
)

await self.event_store.save_events(events[0].order_id, events, 0)

# Read Model para consultas
class OrderReadModel:
def __init__(self):
self.orders = {}

async def handle_order_created(self, event: OrderCreatedEvent):
self.orders[event.order_id] = {
"id": event.order_id,
"customer_id": event.customer_id,
"items": [],
"status": "created"
}

async def handle_item_added(self, event: ItemAddedEvent):
order = self.orders[event.order_id]
order["items"].append({
"product_id": event.product_id,
"quantity": event.quantity
})

async def get_order(self, order_id: str):
return self.orders.get(order_id)

Service Discovery

Consul con Python

import consul

class ServiceRegistry:
def __init__(self, consul_host='localhost', consul_port=8500):
self.consul = consul.Consul(host=consul_host, port=consul_port)

def register_service(self, name: str, service_id: str, address: str, port: int):
"""Registrar servicio en Consul"""
self.consul.agent.service.register(
name=name,
service_id=service_id,
address=address,
port=port,
check=consul.Check.http(
url=f"http://{address}:{port}/health",
interval="10s"
)
)

def discover_service(self, service_name: str):
"""Descubrir instancias de un servicio"""
_, services = self.consul.health.service(service_name, passing=True)

instances = []
for service in services:
instances.append({
'address': service['Service']['Address'],
'port': service['Service']['Port']
})

return instances

def deregister_service(self, service_id: str):
"""Desregistrar servicio"""
self.consul.agent.service.deregister(service_id)

# Uso en el servicio
async def startup_event():
registry = ServiceRegistry()
registry.register_service(
name="order-service",
service_id=f"order-service-{os.getenv('INSTANCE_ID')}",
address=os.getenv('HOST', 'localhost'),
port=int(os.getenv('PORT', 8000))
)

API Gateway

Configuración con Kong

# kong.yml
_format_version: "2.1"

services:
- name: user-service
url: http://user-service:8000
routes:
- name: user-routes
paths:
- /api/users
methods:
- GET
- POST
- PUT
- DELETE
plugins:
- name: rate-limiting
config:
minute: 100
hour: 1000
- name: jwt
config:
secret_is_base64: false

- name: order-service
url: http://order-service:8000
routes:
- name: order-routes
paths:
- /api/orders
methods:
- GET
- POST
- PUT
plugins:
- name: rate-limiting
config:
minute: 50
hour: 500
- name: jwt
config:
secret_is_base64: false

# Global plugins
plugins:
- name: prometheus
config:
per_consumer: true
- name: cors
config:
origins:
- "*"
methods:
- GET
- POST
- PUT
- DELETE
headers:
- Accept
- Authorization
- Content-Type

Observabilidad

Distributed Tracing con OpenTelemetry

from opentelemetry import trace
from opentelemetry.exporter.jaeger.thrift import JaegerExporter
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.instrumentation.requests import RequestsInstrumentor

# Configuración de tracing
trace.set_tracer_provider(TracerProvider())
tracer = trace.get_tracer(__name__)

jaeger_exporter = JaegerExporter(
agent_host_name="jaeger",
agent_port=6831,
)

span_processor = BatchSpanProcessor(jaeger_exporter)
trace.get_tracer_provider().add_span_processor(span_processor)

# Instrumentación automática de requests
RequestsInstrumentor().instrument()

# Uso en el código
async def process_order(order_data: dict):
with tracer.start_as_current_span("process_order") as span:
span.set_attribute("order.id", order_data["id"])
span.set_attribute("order.customer_id", order_data["customer_id"])

# Validar pedido
with tracer.start_as_current_span("validate_order"):
await validate_order(order_data)

# Reservar inventario
with tracer.start_as_current_span("reserve_inventory"):
await reserve_inventory(order_data["items"])

# Procesar pago
with tracer.start_as_current_span("process_payment"):
await process_payment(order_data["payment"])

span.set_attribute("order.status", "completed")

Métricas con Prometheus

from prometheus_client import Counter, Histogram, Gauge, start_http_server

# Definir métricas
REQUEST_COUNT = Counter(
'http_requests_total',
'Total HTTP requests',
['method', 'endpoint', 'status_code']
)

REQUEST_DURATION = Histogram(
'http_request_duration_seconds',
'HTTP request duration',
['method', 'endpoint']
)

ACTIVE_CONNECTIONS = Gauge(
'active_connections',
'Active database connections'
)

# Middleware para métricas
class MetricsMiddleware:
def __init__(self, app):
self.app = app

async def __call__(self, scope, receive, send):
if scope["type"] == "http":
method = scope["method"]
path = scope["path"]

start_time = time.time()

async def send_wrapper(message):
if message["type"] == "http.response.start":
status_code = message["status"]

# Incrementar contador
REQUEST_COUNT.labels(
method=method,
endpoint=path,
status_code=status_code
).inc()

# Registrar duración
duration = time.time() - start_time
REQUEST_DURATION.labels(
method=method,
endpoint=path
).observe(duration)

await send(message)

await self.app(scope, receive, send_wrapper)
else:
await self.app(scope, receive, send)

# Iniciar servidor de métricas
start_http_server(8001)

DevOps para Microservicios

CI/CD Pipeline

# .github/workflows/microservices.yml
name: Microservices CI/CD

on:
push:
branches: [main]
pull_request:
branches: [main]

jobs:
detect-changes:
runs-on: ubuntu-latest
outputs:
user-service: ${{ steps.changes.outputs.user-service }}
order-service: ${{ steps.changes.outputs.order-service }}
steps:
- uses: actions/checkout@v3
- uses: dorny/paths-filter@v2
id: changes
with:
filters: |
user-service:
- 'services/user-service/**'
order-service:
- 'services/order-service/**'

test-user-service:
needs: detect-changes
if: needs.detect-changes.outputs.user-service == 'true'
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v3
- name: Test User Service
run: |
cd services/user-service
docker-compose -f docker-compose.test.yml up --build --abort-on-container-exit

- name: Security Scan
run: |
docker run --rm -v "$(pwd)/services/user-service:/app" \
securecodewarrior/docker-image-scanner:latest /app

deploy-user-service:
needs: [detect-changes, test-user-service]
if: needs.detect-changes.outputs.user-service == 'true'
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v3
- name: Deploy to Kubernetes
run: |
cd services/user-service
kubectl apply -f k8s/
kubectl rollout status deployment/user-service

Kubernetes Deployment

# user-service-deployment.yml
apiVersion: apps/v1
kind: Deployment
metadata:
name: user-service
labels:
app: user-service
spec:
replicas: 3
selector:
matchLabels:
app: user-service
template:
metadata:
labels:
app: user-service
spec:
containers:
- name: user-service
image: user-service:latest
ports:
- containerPort: 8000
env:
- name: DATABASE_URL
valueFrom:
secretKeyRef:
name: user-service-secrets
key: database-url
livenessProbe:
httpGet:
path: /health
port: 8000
initialDelaySeconds: 30
periodSeconds: 10
readinessProbe:
httpGet:
path: /ready
port: 8000
initialDelaySeconds: 5
periodSeconds: 5
resources:
requests:
memory: "256Mi"
cpu: "250m"
limits:
memory: "512Mi"
cpu: "500m"

---
apiVersion: v1
kind: Service
metadata:
name: user-service
spec:
selector:
app: user-service
ports:
- port: 80
targetPort: 8000
type: ClusterIP

Ejercicios Prácticos

Ejercicio 1: Implementar Circuit Breaker

# Tarea: Completar la implementación del Circuit Breaker
class CircuitBreaker:
def __init__(self, failure_threshold=5, timeout=60, success_threshold=3):
# TODO: Implementar atributos necesarios
pass

async def call(self, func, *args, **kwargs):
# TODO: Implementar lógica del circuit breaker
pass

def _should_attempt_reset(self):
# TODO: Determinar si se debe intentar resetear el circuit
pass

# Test del circuit breaker
async def unreliable_service():
if random.random() < 0.7: # 70% de fallos
raise Exception("Service unavailable")
return "Success"

# Probar con diferentes escenarios

Ejercicio 2: Event Sourcing

# Tarea: Implementar un agregado con Event Sourcing
class BankAccount:
def __init__(self, account_id: str):
self.id = account_id
self.balance = 0
self.version = 0
self.pending_events = []

def deposit(self, amount: float):
# TODO: Implementar depósito con eventos
pass

def withdraw(self, amount: float):
# TODO: Implementar retiro con validación y eventos
pass

@classmethod
def from_events(cls, account_id: str, events: List[Event]):
# TODO: Reconstruir estado desde eventos
pass

def get_pending_events(self):
# TODO: Retornar eventos pendientes de guardar
pass

# Eventos a implementar:
# - AccountCreatedEvent
# - MoneyDepositedEvent
# - MoneyWithdrawnEvent

Ejercicio 3: Service Mesh con Istio

# Tarea: Configurar políticas de tráfico
# destination-rule.yml
apiVersion: networking.istio.io/v1beta1
kind: DestinationRule
metadata:
name: user-service
spec:
host: user-service
trafficPolicy:
# TODO: Configurar circuit breaker
connectionPool:
tcp:
maxConnections: 10
http:
http1MaxPendingRequests: 10
maxRequestsPerConnection: 2
outlierDetection:
# TODO: Configurar detección de outliers
pass

---
# virtual-service.yml
apiVersion: networking.istio.io/v1beta1
kind: VirtualService
metadata:
name: user-service
spec:
http:
- match:
- headers:
canary:
exact: "true"
route:
- destination:
host: user-service
subset: v2
weight: 100
- route:
- destination:
host: user-service
subset: v1
weight: 90
- destination:
host: user-service
subset: v2
weight: 10

Checklist de Microservicios

Diseño

  • Servicios tienen una responsabilidad única
  • Boundaries del dominio bien definidos
  • Base de datos por servicio
  • Comunicación asíncrona para operaciones no críticas
  • Idempotencia en operaciones críticas

Observabilidad

  • Distributed tracing implementado
  • Métricas de negocio y técnicas
  • Logging estructurado con correlation IDs
  • Health checks configurados
  • Dashboard de monitoreo

Resilencia

  • Circuit breakers implementados
  • Timeouts configurados apropiadamente
  • Retry con backoff exponencial
  • Bulkhead pattern para aislamiento
  • Graceful degradation

Seguridad

  • Autenticación y autorización
  • Comunicación encriptada (TLS)
  • Secrets management
  • Input validation
  • Rate limiting

DevOps

  • CI/CD por servicio
  • Deployment automatizado
  • Infrastructure as Code
  • Testing automatizado (unit, integration, contract)
  • Rollback automatizado

Datos

  • Eventual consistency manejada
  • Saga pattern para transacciones distribuidas
  • Event sourcing donde sea apropiado
  • Data synchronization strategy
  • Backup y recovery procedures

Recursos Adicionales