402. Microservicios
Índice
- Introducción a los Microservicios
- Arquitectura de Microservicios
- Patrones de Diseño
- Comunicación entre Servicios
- Gestión de Datos
- Service Discovery
- API Gateway
- Observabilidad
- Seguridad
- Deployment y Orquestación
- Testing en Microservicios
- DevOps para Microservicios
- Ejercicios Prácticos
Introducción
Los microservicios representan un enfoque arquitectónico que estructura una aplicación como una colección de servicios débilmente acoplados.
¿Qué son los Microservicios?
Ventajas y Desventajas
Ventajas:
- Escalabilidad independiente
- Tecnologías heterogéneas
- Desarrollo independiente por equipos
- Resiliencia y tolerancia a fallos
- Deployment independiente
Desventajas:
- Complejidad operacional
- Latencia de red
- Consistencia de datos
- Testing complejo
- Monitoreo distribuido
Arquitectura
Principios de Diseño
# Ejemplo de definición de servicios
services:
user-service:
domain: user-management
responsibilities:
- user-authentication
- user-profile-management
- user-preferences
data-ownership:
- users
- profiles
order-service:
domain: order-management
responsibilities:
- order-creation
- order-tracking
- order-fulfillment
data-ownership:
- orders
- order-items
Domain-Driven Design (DDD)
# Ejemplo de agregado en Python
class Order:
def __init__(self, customer_id: str):
self.id = str(uuid.uuid4())
self.customer_id = customer_id
self.items = []
self.status = OrderStatus.PENDING
self.created_at = datetime.utcnow()
def add_item(self, product_id: str, quantity: int, price: float):
"""Añadir item al pedido"""
if self.status != OrderStatus.PENDING:
raise InvalidOperationError("Cannot modify confirmed order")
item = OrderItem(product_id, quantity, price)
self.items.append(item)
self._publish_event(ItemAddedEvent(self.id, item))
def confirm(self):
"""Confirmar pedido"""
if not self.items:
raise InvalidOperationError("Cannot confirm empty order")
self.status = OrderStatus.CONFIRMED
self._publish_event(OrderConfirmedEvent(self.id))
Patrones de Diseño
1. Database per Service
version: '3.8'
services:
user-service:
image: user-service:latest
environment:
- DATABASE_URL=postgres://user:pass@user-db:5432/userdb
depends_on:
- user-db
user-db:
image: postgres:13
environment:
POSTGRES_DB: userdb
POSTGRES_USER: user
POSTGRES_PASSWORD: pass
order-service:
image: order-service:latest
environment:
- DATABASE_URL=postgres://order:pass@order-db:5432/orderdb
depends_on:
- order-db
order-db:
image: postgres:13
environment:
POSTGRES_DB: orderdb
POSTGRES_USER: order
POSTGRES_PASSWORD: pass
2. Saga Pattern
# Saga para procesamiento de pedidos
class OrderSaga:
def __init__(self):
self.steps = [
self.validate_order,
self.reserve_inventory,
self.process_payment,
self.ship_order
]
self.compensation_steps = [
self.cancel_shipping,
self.refund_payment,
self.release_inventory,
self.cancel_order
]
async def execute(self, order_id: str):
completed_steps = []
try:
for step in self.steps:
await step(order_id)
completed_steps.append(step)
except Exception as e:
# Ejecutar compensación
for step in reversed(completed_steps):
compensation_index = self.steps.index(step)
await self.compensation_steps[compensation_index](order_id)
raise e
3. Circuit Breaker
import asyncio
from enum import Enum
from datetime import datetime, timedelta
class CircuitState(Enum):
CLOSED = "closed"
OPEN = "open"
HALF_OPEN = "half_open"
class CircuitBreaker:
def __init__(self, failure_threshold=5, timeout=60):
self.failure_threshold = failure_threshold
self.timeout = timeout
self.failure_count = 0
self.last_failure_time = None
self.state = CircuitState.CLOSED
async def call(self, func, *args, **kwargs):
if self.state == CircuitState.OPEN:
if self._should_attempt_reset():
self.state = CircuitState.HALF_OPEN
else:
raise CircuitBreakerOpenError("Circuit breaker is open")
try:
result = await func(*args, **kwargs)
self._on_success()
return result
except Exception as e:
self._on_failure()
raise e
def _on_success(self):
self.failure_count = 0
self.state = CircuitState.CLOSED
def _on_failure(self):
self.failure_count += 1
self.last_failure_time = datetime.now()
if self.failure_count >= self.failure_threshold:
self.state = CircuitState.OPEN
Comunicación entre Servicios
Comunicación Síncrona (HTTP/REST)
# Cliente HTTP con retry y circuit breaker
import aiohttp
import asyncio
class ServiceClient:
def __init__(self, base_url: str):
self.base_url = base_url
self.circuit_breaker = CircuitBreaker()
async def get_user(self, user_id: str):
async def _call():
async with aiohttp.ClientSession() as session:
async with session.get(
f"{self.base_url}/users/{user_id}",
timeout=aiohttp.ClientTimeout(total=5)
) as response:
response.raise_for_status()
return await response.json()
return await self.circuit_breaker.call(_call)
Comunicación Asíncrona (Message Queues)
# Productor de eventos
import aiormq
import json
class EventPublisher:
def __init__(self, rabbitmq_url: str):
self.rabbitmq_url = rabbitmq_url
self.connection = None
self.channel = None
async def connect(self):
self.connection = await aiormq.connect(self.rabbitmq_url)
self.channel = await self.connection.channel()
async def publish_event(self, event_type: str, data: dict):
message = {
"event_type": event_type,
"data": data,
"timestamp": datetime.utcnow().isoformat()
}
await self.channel.basic_publish(
json.dumps(message).encode(),
routing_key=event_type,
exchange="events"
)
# Consumidor de eventos
class EventConsumer:
def __init__(self, rabbitmq_url: str):
self.rabbitmq_url = rabbitmq_url
self.handlers = {}
def register_handler(self, event_type: str, handler):
self.handlers[event_type] = handler
async def start_consuming(self):
connection = await aiormq.connect(self.rabbitmq_url)
channel = await connection.channel()
queue = await channel.queue_declare("order_events", durable=True)
async def on_message(message):
try:
data = json.loads(message.body.decode())
event_type = data["event_type"]
if event_type in self.handlers:
await self.handlers[event_type](data["data"])
await message.ack()
except Exception as e:
await message.nack(requeue=False)
await channel.basic_consume(queue.queue, on_message)
Gestión de Datos
Event Sourcing
# Implementación de Event Store
class Event:
def __init__(self, aggregate_id: str, event_type: str, data: dict):
self.id = str(uuid.uuid4())
self.aggregate_id = aggregate_id
self.event_type = event_type
self.data = data
self.timestamp = datetime.utcnow()
self.version = 1
class EventStore:
def __init__(self):
self.events = []
async def save_events(self, aggregate_id: str, events: List[Event], expected_version: int):
# Verificar versión para evitar conflictos
current_version = await self.get_current_version(aggregate_id)
if current_version != expected_version:
raise ConcurrencyError(f"Expected version {expected_version}, got {current_version}")
# Guardar eventos
for i, event in enumerate(events):
event.version = expected_version + i + 1
self.events.append(event)
async def get_events(self, aggregate_id: str, from_version: int = 0) -> List[Event]:
return [e for e in self.events
if e.aggregate_id == aggregate_id and e.version > from_version]
async def get_current_version(self, aggregate_id: str) -> int:
events = await self.get_events(aggregate_id)
return max([e.version for e in events], default=0)
CQRS (Command Query Responsibility Segregation)
# Separación de comandos y consultas
class OrderCommand:
pass
class CreateOrderCommand(OrderCommand):
def __init__(self, customer_id: str, items: List[dict]):
self.customer_id = customer_id
self.items = items
class OrderCommandHandler:
def __init__(self, event_store: EventStore):
self.event_store = event_store
async def handle(self, command: CreateOrderCommand):
# Crear eventos
events = [
OrderCreatedEvent(
order_id=str(uuid.uuid4()),
customer_id=command.customer_id
)
]
for item in command.items:
events.append(
ItemAddedEvent(
order_id=events[0].order_id,
product_id=item["product_id"],
quantity=item["quantity"]
)
)
await self.event_store.save_events(events[0].order_id, events, 0)
# Read Model para consultas
class OrderReadModel:
def __init__(self):
self.orders = {}
async def handle_order_created(self, event: OrderCreatedEvent):
self.orders[event.order_id] = {
"id": event.order_id,
"customer_id": event.customer_id,
"items": [],
"status": "created"
}
async def handle_item_added(self, event: ItemAddedEvent):
order = self.orders[event.order_id]
order["items"].append({
"product_id": event.product_id,
"quantity": event.quantity
})
async def get_order(self, order_id: str):
return self.orders.get(order_id)
Service Discovery
Consul con Python
import consul
class ServiceRegistry:
def __init__(self, consul_host='localhost', consul_port=8500):
self.consul = consul.Consul(host=consul_host, port=consul_port)
def register_service(self, name: str, service_id: str, address: str, port: int):
"""Registrar servicio en Consul"""
self.consul.agent.service.register(
name=name,
service_id=service_id,
address=address,
port=port,
check=consul.Check.http(
url=f"http://{address}:{port}/health",
interval="10s"
)
)
def discover_service(self, service_name: str):
"""Descubrir instancias de un servicio"""
_, services = self.consul.health.service(service_name, passing=True)
instances = []
for service in services:
instances.append({
'address': service['Service']['Address'],
'port': service['Service']['Port']
})
return instances
def deregister_service(self, service_id: str):
"""Desregistrar servicio"""
self.consul.agent.service.deregister(service_id)
# Uso en el servicio
async def startup_event():
registry = ServiceRegistry()
registry.register_service(
name="order-service",
service_id=f"order-service-{os.getenv('INSTANCE_ID')}",
address=os.getenv('HOST', 'localhost'),
port=int(os.getenv('PORT', 8000))
)
API Gateway
Configuración con Kong
# kong.yml
_format_version: "2.1"
services:
- name: user-service
url: http://user-service:8000
routes:
- name: user-routes
paths:
- /api/users
methods:
- GET
- POST
- PUT
- DELETE
plugins:
- name: rate-limiting
config:
minute: 100
hour: 1000
- name: jwt
config:
secret_is_base64: false
- name: order-service
url: http://order-service:8000
routes:
- name: order-routes
paths:
- /api/orders
methods:
- GET
- POST
- PUT
plugins:
- name: rate-limiting
config:
minute: 50
hour: 500
- name: jwt
config:
secret_is_base64: false
# Global plugins
plugins:
- name: prometheus
config:
per_consumer: true
- name: cors
config:
origins:
- "*"
methods:
- GET
- POST
- PUT
- DELETE
headers:
- Accept
- Authorization
- Content-Type
Observabilidad
Distributed Tracing con OpenTelemetry
from opentelemetry import trace
from opentelemetry.exporter.jaeger.thrift import JaegerExporter
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.instrumentation.requests import RequestsInstrumentor
# Configuración de tracing
trace.set_tracer_provider(TracerProvider())
tracer = trace.get_tracer(__name__)
jaeger_exporter = JaegerExporter(
agent_host_name="jaeger",
agent_port=6831,
)
span_processor = BatchSpanProcessor(jaeger_exporter)
trace.get_tracer_provider().add_span_processor(span_processor)
# Instrumentación automática de requests
RequestsInstrumentor().instrument()
# Uso en el código
async def process_order(order_data: dict):
with tracer.start_as_current_span("process_order") as span:
span.set_attribute("order.id", order_data["id"])
span.set_attribute("order.customer_id", order_data["customer_id"])
# Validar pedido
with tracer.start_as_current_span("validate_order"):
await validate_order(order_data)
# Reservar inventario
with tracer.start_as_current_span("reserve_inventory"):
await reserve_inventory(order_data["items"])
# Procesar pago
with tracer.start_as_current_span("process_payment"):
await process_payment(order_data["payment"])
span.set_attribute("order.status", "completed")
Métricas con Prometheus
from prometheus_client import Counter, Histogram, Gauge, start_http_server
# Definir métricas
REQUEST_COUNT = Counter(
'http_requests_total',
'Total HTTP requests',
['method', 'endpoint', 'status_code']
)
REQUEST_DURATION = Histogram(
'http_request_duration_seconds',
'HTTP request duration',
['method', 'endpoint']
)
ACTIVE_CONNECTIONS = Gauge(
'active_connections',
'Active database connections'
)
# Middleware para métricas
class MetricsMiddleware:
def __init__(self, app):
self.app = app
async def __call__(self, scope, receive, send):
if scope["type"] == "http":
method = scope["method"]
path = scope["path"]
start_time = time.time()
async def send_wrapper(message):
if message["type"] == "http.response.start":
status_code = message["status"]
# Incrementar contador
REQUEST_COUNT.labels(
method=method,
endpoint=path,
status_code=status_code
).inc()
# Registrar duración
duration = time.time() - start_time
REQUEST_DURATION.labels(
method=method,
endpoint=path
).observe(duration)
await send(message)
await self.app(scope, receive, send_wrapper)
else:
await self.app(scope, receive, send)
# Iniciar servidor de métricas
start_http_server(8001)
DevOps para Microservicios
CI/CD Pipeline
# .github/workflows/microservices.yml
name: Microservices CI/CD
on:
push:
branches: [main]
pull_request:
branches: [main]
jobs:
detect-changes:
runs-on: ubuntu-latest
outputs:
user-service: ${{ steps.changes.outputs.user-service }}
order-service: ${{ steps.changes.outputs.order-service }}
steps:
- uses: actions/checkout@v3
- uses: dorny/paths-filter@v2
id: changes
with:
filters: |
user-service:
- 'services/user-service/**'
order-service:
- 'services/order-service/**'
test-user-service:
needs: detect-changes
if: needs.detect-changes.outputs.user-service == 'true'
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v3
- name: Test User Service
run: |
cd services/user-service
docker-compose -f docker-compose.test.yml up --build --abort-on-container-exit
- name: Security Scan
run: |
docker run --rm -v "$(pwd)/services/user-service:/app" \
securecodewarrior/docker-image-scanner:latest /app
deploy-user-service:
needs: [detect-changes, test-user-service]
if: needs.detect-changes.outputs.user-service == 'true'
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v3
- name: Deploy to Kubernetes
run: |
cd services/user-service
kubectl apply -f k8s/
kubectl rollout status deployment/user-service
Kubernetes Deployment
# user-service-deployment.yml
apiVersion: apps/v1
kind: Deployment
metadata:
name: user-service
labels:
app: user-service
spec:
replicas: 3
selector:
matchLabels:
app: user-service
template:
metadata:
labels:
app: user-service
spec:
containers:
- name: user-service
image: user-service:latest
ports:
- containerPort: 8000
env:
- name: DATABASE_URL
valueFrom:
secretKeyRef:
name: user-service-secrets
key: database-url
livenessProbe:
httpGet:
path: /health
port: 8000
initialDelaySeconds: 30
periodSeconds: 10
readinessProbe:
httpGet:
path: /ready
port: 8000
initialDelaySeconds: 5
periodSeconds: 5
resources:
requests:
memory: "256Mi"
cpu: "250m"
limits:
memory: "512Mi"
cpu: "500m"
---
apiVersion: v1
kind: Service
metadata:
name: user-service
spec:
selector:
app: user-service
ports:
- port: 80
targetPort: 8000
type: ClusterIP
Ejercicios Prácticos
Ejercicio 1: Implementar Circuit Breaker
# Tarea: Completar la implementación del Circuit Breaker
class CircuitBreaker:
def __init__(self, failure_threshold=5, timeout=60, success_threshold=3):
# TODO: Implementar atributos necesarios
pass
async def call(self, func, *args, **kwargs):
# TODO: Implementar lógica del circuit breaker
pass
def _should_attempt_reset(self):
# TODO: Determinar si se debe intentar resetear el circuit
pass
# Test del circuit breaker
async def unreliable_service():
if random.random() < 0.7: # 70% de fallos
raise Exception("Service unavailable")
return "Success"
# Probar con diferentes escenarios
Ejercicio 2: Event Sourcing
# Tarea: Implementar un agregado con Event Sourcing
class BankAccount:
def __init__(self, account_id: str):
self.id = account_id
self.balance = 0
self.version = 0
self.pending_events = []
def deposit(self, amount: float):
# TODO: Implementar depósito con eventos
pass
def withdraw(self, amount: float):
# TODO: Implementar retiro con validación y eventos
pass
@classmethod
def from_events(cls, account_id: str, events: List[Event]):
# TODO: Reconstruir estado desde eventos
pass
def get_pending_events(self):
# TODO: Retornar eventos pendientes de guardar
pass
# Eventos a implementar:
# - AccountCreatedEvent
# - MoneyDepositedEvent
# - MoneyWithdrawnEvent
Ejercicio 3: Service Mesh con Istio
# Tarea: Configurar políticas de tráfico
# destination-rule.yml
apiVersion: networking.istio.io/v1beta1
kind: DestinationRule
metadata:
name: user-service
spec:
host: user-service
trafficPolicy:
# TODO: Configurar circuit breaker
connectionPool:
tcp:
maxConnections: 10
http:
http1MaxPendingRequests: 10
maxRequestsPerConnection: 2
outlierDetection:
# TODO: Configurar detección de outliers
pass
---
# virtual-service.yml
apiVersion: networking.istio.io/v1beta1
kind: VirtualService
metadata:
name: user-service
spec:
http:
- match:
- headers:
canary:
exact: "true"
route:
- destination:
host: user-service
subset: v2
weight: 100
- route:
- destination:
host: user-service
subset: v1
weight: 90
- destination:
host: user-service
subset: v2
weight: 10
Checklist de Microservicios
Diseño
- Servicios tienen una responsabilidad única
- Boundaries del dominio bien definidos
- Base de datos por servicio
- Comunicación asíncrona para operaciones no críticas
- Idempotencia en operaciones críticas
Observabilidad
- Distributed tracing implementado
- Métricas de negocio y técnicas
- Logging estructurado con correlation IDs
- Health checks configurados
- Dashboard de monitoreo
Resilencia
- Circuit breakers implementados
- Timeouts configurados apropiadamente
- Retry con backoff exponencial
- Bulkhead pattern para aislamiento
- Graceful degradation
Seguridad
- Autenticación y autorización
- Comunicación encriptada (TLS)
- Secrets management
- Input validation
- Rate limiting
DevOps
- CI/CD por servicio
- Deployment automatizado
- Infrastructure as Code
- Testing automatizado (unit, integration, contract)
- Rollback automatizado
Datos
- Eventual consistency manejada
- Saga pattern para transacciones distribuidas
- Event sourcing donde sea apropiado
- Data synchronization strategy
- Backup y recovery procedures
Recursos Adicionales
- Microservices.io - Patrones de microservicios
- Building Microservices - Sam Newman
- Istio Documentation
- OpenTelemetry
- Event Sourcing