Testing Automatizado en DevOps 🧪
¿Por qué el Testing es Crítico en DevOps?
En DevOps, la velocidad sin calidad es caos. El testing automatizado es lo que nos permite desplegar rápidamente sin comprometer la calidad, proporcionando la confianza necesaria para la entrega continua.
🎯 Objetivos del Capítulo
- Implementar estrategia completa de testing automatizado
- Dominar diferentes tipos de tests y cuándo usarlos
- Integrar testing en pipelines CI/CD
- Crear cultura de quality assurance compartida
- Optimizar velocidad vs cobertura de testing
🏗️ La Pirámide de Testing
Principios de Testing en DevOps
- Shift-Left: Testing temprano en el desarrollo
- Fast Feedback: Resultados en minutos, no horas
- Fail Fast: Detectar problemas rápidamente
- Test in Production: Monitoring y canary testing
- Shared Responsibility: Todos son responsables de la calidad
🧩 Tipos de Testing Automatizado
1. Unit Testing - La Base
Python con pytest
# tests/test_user_service.py
import pytest
from unittest.mock import Mock, patch
from services.user_service import UserService
from models.user import User
class TestUserService:
def setup_method(self):
"""Setup antes de cada test"""
self.mock_db = Mock()
self.user_service = UserService(self.mock_db)
def test_create_user_success(self):
"""Test exitoso de creación de usuario"""
# Arrange
user_data = {
'email': 'test@example.com',
'password': 'secure_password123',
'name': 'Test User'
}
self.mock_db.find_by_email.return_value = None
self.mock_db.save.return_value = User(id=1, **user_data)
# Act
result = self.user_service.create_user(user_data)
# Assert
assert result.id == 1
assert result.email == 'test@example.com'
self.mock_db.save.assert_called_once()
def test_create_user_duplicate_email(self):
"""Test de usuario duplicado"""
# Arrange
user_data = {'email': 'existing@example.com'}
self.mock_db.find_by_email.return_value = User(id=1, email='existing@example.com')
# Act & Assert
with pytest.raises(ValueError, match="Email already exists"):
self.user_service.create_user(user_data)
@pytest.mark.parametrize("invalid_email", [
"invalid-email",
"@example.com",
"test@",
""
])
def test_create_user_invalid_email(self, invalid_email):
"""Test con emails inválidos"""
user_data = {'email': invalid_email, 'password': 'pass123'}
with pytest.raises(ValueError, match="Invalid email format"):
self.user_service.create_user(user_data)
@patch('services.user_service.send_welcome_email')
def test_create_user_sends_welcome_email(self, mock_send_email):
"""Test que se envía email de bienvenida"""
# Arrange
user_data = {'email': 'test@example.com', 'password': 'pass123'}
self.mock_db.find_by_email.return_value = None
created_user = User(id=1, **user_data)
self.mock_db.save.return_value = created_user
# Act
self.user_service.create_user(user_data)
# Assert
mock_send_email.assert_called_once_with(created_user)
# Configuración de pytest
# pytest.ini
[tool:pytest]
testpaths = tests
python_files = test_*.py
python_classes = Test*
python_functions = test_*
addopts =
--verbose
--cov=src
--cov-report=html
--cov-report=xml
--cov-fail-under=80
--tb=short
markers =
slow: marks tests as slow
integration: marks tests as integration tests
unit: marks tests as unit tests
JavaScript con Jest
// tests/userService.test.js
const { UserService } = require('../src/services/userService');
const { User } = require('../src/models/user');
// Mock de la base de datos
const mockDb = {
findByEmail: jest.fn(),
save: jest.fn(),
findById: jest.fn()
};
// Mock de servicios externos
jest.mock('../src/services/emailService', () => ({
sendWelcomeEmail: jest.fn()
}));
describe('UserService', () => {
let userService;
beforeEach(() => {
// Reset mocks antes de cada test
jest.clearAllMocks();
userService = new UserService(mockDb);
});
describe('createUser', () => {
const validUserData = {
email: 'test@example.com',
password: 'securePassword123',
name: 'Test User'
};
it('should create user successfully', async () => {
// Arrange
mockDb.findByEmail.mockResolvedValue(null);
const savedUser = new User({ id: 1, ...validUserData });
mockDb.save.mockResolvedValue(savedUser);
// Act
const result = await userService.createUser(validUserData);
// Assert
expect(result).toEqual(savedUser);
expect(mockDb.findByEmail).toHaveBeenCalledWith(validUserData.email);
expect(mockDb.save).toHaveBeenCalledWith(expect.objectContaining({
email: validUserData.email,
name: validUserData.name
}));
});
it('should throw error for duplicate email', async () => {
// Arrange
const existingUser = new User({ id: 1, email: validUserData.email });
mockDb.findByEmail.mockResolvedValue(existingUser);
// Act & Assert
await expect(userService.createUser(validUserData))
.rejects.toThrow('Email already exists');
});
it.each([
['invalid-email'],
['@example.com'],
['test@'],
['']
])('should throw error for invalid email: %s', async (invalidEmail) => {
const userData = { ...validUserData, email: invalidEmail };
await expect(userService.createUser(userData))
.rejects.toThrow('Invalid email format');
});
});
describe('updateUser', () => {
it('should update user successfully', async () => {
// Arrange
const userId = 1;
const updateData = { name: 'Updated Name' };
const existingUser = new User({ id: userId, email: 'test@example.com' });
const updatedUser = new User({ ...existingUser, ...updateData });
mockDb.findById.mockResolvedValue(existingUser);
mockDb.save.mockResolvedValue(updatedUser);
// Act
const result = await userService.updateUser(userId, updateData);
// Assert
expect(result.name).toBe(updateData.name);
expect(mockDb.findById).toHaveBeenCalledWith(userId);
});
});
});
// Performance testing
describe('UserService Performance', () => {
it('should create user within acceptable time', async () => {
const start = Date.now();
await userService.createUser(validUserData);
const duration = Date.now() - start;
expect(duration).toBeLessThan(100); // < 100ms
});
});
// package.json scripts
{
"scripts": {
"test": "jest",
"test:watch": "jest --watch",
"test:coverage": "jest --coverage",
"test:ci": "jest --ci --coverage --watchAll=false"
},
"jest": {
"testEnvironment": "node",
"coverageDirectory": "coverage",
"collectCoverageFrom": [
"src/**/*.js",
"!src/**/*.test.js",
"!src/index.js"
],
"coverageThreshold": {
"global": {
"branches": 80,
"functions": 80,
"lines": 80,
"statements": 80
}
}
}
}
2. Integration Testing
Testing de APIs con Newman/Postman
{
"info": {
"name": "User API Integration Tests"
},
"item": [
{
"name": "Create User Flow",
"item": [
{
"name": "Create New User",
"request": {
"method": "POST",
"header": [
{
"key": "Content-Type",
"value": "application/json"
}
],
"body": {
"mode": "raw",
"raw": "{\n \"email\": \"{{$randomEmail}}\",\n \"password\": \"securePassword123\",\n \"name\": \"{{$randomFullName}}\"\n}"
},
"url": {
"raw": "{{base_url}}/api/users",
"host": ["{{base_url}}"],
"path": ["api", "users"]
}
},
"event": [
{
"listen": "test",
"script": {
"exec": [
"pm.test('Status code is 201', function () {",
" pm.response.to.have.status(201);",
"});",
"",
"pm.test('Response has user data', function () {",
" const responseJson = pm.response.json();",
" pm.expect(responseJson).to.have.property('id');",
" pm.expect(responseJson).to.have.property('email');",
" pm.expect(responseJson).to.not.have.property('password');",
"});",
"",
"pm.test('Response time is less than 1000ms', function () {",
" pm.expect(pm.response.responseTime).to.be.below(1000);",
"});",
"",
"// Guardar user ID para tests siguientes",
"const responseJson = pm.response.json();",
"pm.collectionVariables.set('user_id', responseJson.id);"
]
}
}
]
},
{
"name": "Get Created User",
"request": {
"method": "GET",
"url": {
"raw": "{{base_url}}/api/users/{{user_id}}",
"host": ["{{base_url}}"],
"path": ["api", "users", "{{user_id}}"]
}
},
"event": [
{
"listen": "test",
"script": {
"exec": [
"pm.test('User exists and data is correct', function () {",
" pm.response.to.have.status(200);",
" const user = pm.response.json();",
" pm.expect(user.id).to.eql(pm.collectionVariables.get('user_id'));",
"});"
]
}
}
]
}
]
}
]
}
Database Integration Testing
# tests/integration/test_database.py
import pytest
import os
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from models.base import Base
from models.user import User
from services.user_service import UserService
@pytest.fixture(scope="session")
def test_db():
"""Crear base de datos de testing"""
# Usar base de datos en memoria para tests
engine = create_engine("sqlite:///:memory:")
Base.metadata.create_all(engine)
Session = sessionmaker(bind=engine)
session = Session()
yield session
session.close()
@pytest.fixture
def user_service(test_db):
"""User service con base de datos real"""
return UserService(test_db)
class TestUserServiceIntegration:
def test_full_user_lifecycle(self, user_service, test_db):
"""Test completo del ciclo de vida del usuario"""
# Create
user_data = {
'email': 'integration@test.com',
'password': 'password123',
'name': 'Integration Test'
}
created_user = user_service.create_user(user_data)
assert created_user.id is not None
assert created_user.email == user_data['email']
# Read
retrieved_user = user_service.get_user(created_user.id)
assert retrieved_user.email == created_user.email
# Update
update_data = {'name': 'Updated Name'}
updated_user = user_service.update_user(created_user.id, update_data)
assert updated_user.name == 'Updated Name'
# Delete
user_service.delete_user(created_user.id)
deleted_user = user_service.get_user(created_user.id)
assert deleted_user is None
def test_database_constraints(self, user_service):
"""Test de constraints de base de datos"""
user_data = {'email': 'constraint@test.com', 'password': 'pass123'}
# Crear primer usuario
user1 = user_service.create_user(user_data)
# Intentar crear usuario con mismo email
with pytest.raises(Exception): # Debería fallar por constraint UNIQUE
user_service.create_user(user_data)
def test_transaction_rollback(self, user_service, test_db):
"""Test de rollback de transacciones"""
user_data = {'email': 'rollback@test.com', 'password': 'pass123'}
try:
with test_db.begin():
user = user_service.create_user(user_data)
# Simular error que causa rollback
raise Exception("Simulated error")
except:
pass
# Verificar que el usuario no fue guardado
users = test_db.query(User).filter_by(email='rollback@test.com').all()
assert len(users) == 0
3. End-to-End Testing
Playwright para E2E Testing
// tests/e2e/user-registration.spec.js
const { test, expect } = require('@playwright/test');
test.describe('User Registration Flow', () => {
test.beforeEach(async ({ page }) => {
// Setup test data
await page.goto('/');
});
test('complete user registration flow', async ({ page }) => {
// Navigate to registration
await page.click('text=Sign Up');
await expect(page).toHaveURL('/register');
// Fill registration form
const email = `test-${Date.now()}@example.com`;
await page.fill('[data-testid=email-input]', email);
await page.fill('[data-testid=password-input]', 'securePassword123');
await page.fill('[data-testid=confirm-password-input]', 'securePassword123');
await page.fill('[data-testid=name-input]', 'Test User');
// Submit form
await page.click('[data-testid=submit-button]');
// Verify success
await expect(page.locator('[data-testid=success-message]')).toBeVisible();
await expect(page).toHaveURL('/dashboard');
// Verify user data is displayed
await expect(page.locator('[data-testid=user-name]')).toContainText('Test User');
});
test('shows validation errors for invalid data', async ({ page }) => {
await page.click('text=Sign Up');
// Submit empty form
await page.click('[data-testid=submit-button]');
// Check validation errors
await expect(page.locator('[data-testid=email-error]')).toContainText('Email is required');
await expect(page.locator('[data-testid=password-error]')).toContainText('Password is required');
});
test('handles server errors gracefully', async ({ page }) => {
// Mock server error
await page.route('/api/users', route => {
route.fulfill({
status: 500,
contentType: 'application/json',
body: JSON.stringify({ error: 'Internal server error' })
});
});
await page.click('text=Sign Up');
await page.fill('[data-testid=email-input]', 'test@example.com');
await page.fill('[data-testid=password-input]', 'password123');
await page.click('[data-testid=submit-button]');
// Verify error handling
await expect(page.locator('[data-testid=error-message]')).toContainText('Registration failed');
});
});
// Visual regression testing
test('visual comparison of registration page', async ({ page }) => {
await page.goto('/register');
await expect(page).toHaveScreenshot('registration-page.png');
});
// Performance testing
test('registration page loads within acceptable time', async ({ page }) => {
const startTime = Date.now();
await page.goto('/register');
await page.waitForLoadState('networkidle');
const loadTime = Date.now() - startTime;
expect(loadTime).toBeLessThan(3000); // < 3 segundos
});
Configuración Playwright
// playwright.config.js
module.exports = {
testDir: './tests/e2e',
timeout: 30000,
retries: process.env.CI ? 2 : 0,
workers: process.env.CI ? 1 : undefined,
use: {
baseURL: process.env.BASE_URL || 'http://localhost:3000',
trace: 'on-first-retry',
screenshot: 'only-on-failure',
video: 'retain-on-failure'
},
projects: [
{
name: 'chromium',
use: { ...devices['Desktop Chrome'] }
},
{
name: 'firefox',
use: { ...devices['Desktop Firefox'] }
},
{
name: 'webkit',
use: { ...devices['Desktop Safari'] }
},
{
name: 'mobile-chrome',
use: { ...devices['Pixel 5'] }
}
],
webServer: {
command: 'npm run start:test',
port: 3000,
reuseExistingServer: !process.env.CI
}
};
🚀 Performance Testing
Load Testing con k6
// tests/performance/load-test.js
import http from 'k6/http';
import { check, sleep } from 'k6';
import { Rate } from 'k6/metrics';
// Custom metrics
const errorRate = new Rate('errors');
export const options = {
stages: [
{ duration: '2m', target: 10 }, // Ramp up
{ duration: '5m', target: 10 }, // Stay at 10 users
{ duration: '2m', target: 50 }, // Ramp up to 50
{ duration: '5m', target: 50 }, // Stay at 50
{ duration: '2m', target: 100 }, // Ramp up to 100
{ duration: '5m', target: 100 }, // Stay at 100
{ duration: '2m', target: 0 }, // Ramp down
],
thresholds: {
http_req_duration: ['p(95)<500'], // 95% of requests under 500ms
http_req_failed: ['rate<0.1'], // Error rate under 10%
errors: ['rate<0.1']
}
};
export default function() {
const BASE_URL = 'https://api.example.com';
// Test user registration
const registrationData = {
email: `user-${Math.random().toString(36).substring(7)}@example.com`,
password: 'testPassword123',
name: 'Load Test User'
};
const registrationResponse = http.post(
`${BASE_URL}/api/users`,
JSON.stringify(registrationData),
{
headers: { 'Content-Type': 'application/json' }
}
);
const registrationSuccess = check(registrationResponse, {
'registration status is 201': (r) => r.status === 201,
'registration response time < 1s': (r) => r.timings.duration < 1000,
});
errorRate.add(!registrationSuccess);
if (registrationSuccess) {
const userId = registrationResponse.json().id;
// Test user retrieval
const getUserResponse = http.get(`${BASE_URL}/api/users/${userId}`);
const getUserSuccess = check(getUserResponse, {
'get user status is 200': (r) => r.status === 200,
'get user response time < 500ms': (r) => r.timings.duration < 500,
});
errorRate.add(!getUserSuccess);
}
sleep(1);
}
// Stress testing scenario
export function stressTest() {
const response = http.get('https://api.example.com/health');
check(response, {
'status is 200': (r) => r.status === 200,
});
}
export const stressOptions = {
executor: 'ramping-arrival-rate',
startRate: 10,
timeUnit: '1s',
preAllocatedVUs: 50,
maxVUs: 200,
stages: [
{ target: 10, duration: '1m' },
{ target: 50, duration: '2m' },
{ target: 100, duration: '2m' },
{ target: 200, duration: '2m' },
{ target: 300, duration: '1m' }, // Beyond normal capacity
{ target: 0, duration: '1m' }
]
};
🔄 Testing en CI/CD Pipeline
GitHub Actions Testing Pipeline
# .github/workflows/test-pipeline.yml
name: Test Pipeline
on:
push:
branches: [main, develop]
pull_request:
branches: [main]
jobs:
unit-tests:
runs-on: ubuntu-latest
strategy:
matrix:
node-version: [16, 18, 20]
steps:
- uses: actions/checkout@v3
- name: Setup Node.js ${{ matrix.node-version }}
uses: actions/setup-node@v3
with:
node-version: ${{ matrix.node-version }}
cache: 'npm'
- name: Install dependencies
run: npm ci
- name: Run unit tests
run: npm run test:unit -- --coverage
- name: Upload coverage to Codecov
uses: codecov/codecov-action@v3
with:
file: ./coverage/lcov.info
flags: unit-tests
name: codecov-unit-${{ matrix.node-version }}
integration-tests:
runs-on: ubuntu-latest
services:
postgres:
image: postgres:13
env:
POSTGRES_PASSWORD: test_password
POSTGRES_DB: test_db
options: >-
--health-cmd pg_isready
--health-interval 10s
--health-timeout 5s
--health-retries 5
redis:
image: redis:7
options: >-
--health-cmd "redis-cli ping"
--health-interval 10s
--health-timeout 5s
--health-retries 5
steps:
- uses: actions/checkout@v3
- name: Setup Node.js
uses: actions/setup-node@v3
with:
node-version: 18
cache: 'npm'
- name: Install dependencies
run: npm ci
- name: Run database migrations
run: npm run migrate:test
env:
DATABASE_URL: postgresql://postgres:test_password@localhost:5432/test_db
- name: Run integration tests
run: npm run test:integration
env:
DATABASE_URL: postgresql://postgres:test_password@localhost:5432/test_db
REDIS_URL: redis://localhost:6379
e2e-tests:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v3
- name: Setup Node.js
uses: actions/setup-node@v3
with:
node-version: 18
cache: 'npm'
- name: Install dependencies
run: npm ci
- name: Install Playwright browsers
run: npx playwright install --with-deps
- name: Build application
run: npm run build
- name: Start application
run: npm run start:test &
- name: Wait for application
run: npx wait-on http://localhost:3000
- name: Run E2E tests
run: npx playwright test
- name: Upload test results
uses: actions/upload-artifact@v3
if: always()
with:
name: playwright-report
path: playwright-report/
performance-tests:
runs-on: ubuntu-latest
if: github.ref == 'refs/heads/main'
steps:
- uses: actions/checkout@v3
- name: Setup k6
run: |
sudo apt-key adv --keyserver hkp://keyserver.ubuntu.com:80 --recv-keys C5AD17C747E3415A3642D57D77C6C491D6AC1D69
echo "deb https://dl.k6.io/deb stable main" | sudo tee /etc/apt/sources.list.d/k6.list
sudo apt-get update
sudo apt-get install k6
- name: Run load tests
run: k6 run tests/performance/load-test.js
env:
BASE_URL: https://staging.example.com
security-tests:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v3
- name: Run dependency security audit
run: npm audit --audit-level=high
- name: OWASP ZAP Baseline Scan
uses: zaproxy/action-baseline@v0.7.0
with:
target: 'https://staging.example.com'
rules_file_name: '.zap/rules.tsv'
🎯 Test Data Management
Factory Pattern para Test Data
# tests/factories.py
import factory
from factory.alchemy import SQLAlchemyModelFactory
from models.user import User
from models.order import Order
class UserFactory(SQLAlchemyModelFactory):
class Meta:
model = User
sqlalchemy_session_persistence = "commit"
email = factory.Sequence(lambda n: f"user{n}@example.com")
name = factory.Faker('name')
password = factory.Faker('password')
is_active = True
created_at = factory.Faker('date_time_this_year')
class AdminUserFactory(UserFactory):
is_admin = True
email = factory.Sequence(lambda n: f"admin{n}@example.com")
class OrderFactory(SQLAlchemyModelFactory):
class Meta:
model = Order
sqlalchemy_session_persistence = "commit"
user = factory.SubFactory(UserFactory)
total_amount = factory.Faker('pydecimal', left_digits=3, right_digits=2, positive=True)
status = factory.Iterator(['pending', 'confirmed', 'shipped', 'delivered'])
created_at = factory.Faker('date_time_this_year')
# Uso en tests
def test_user_orders():
user = UserFactory()
orders = OrderFactory.create_batch(3, user=user)
assert len(user.orders) == 3
assert all(order.user_id == user.id for order in orders)
Database Seeding para Tests
# tests/seeders.py
from tests.factories import UserFactory, OrderFactory
class DatabaseSeeder:
def __init__(self, session):
self.session = session
def seed_basic_data(self):
"""Datos básicos para todos los tests"""
# Admin user
admin = UserFactory(
email="admin@test.com",
is_admin=True
)
# Regular users
users = UserFactory.create_batch(5)
# Some orders
for user in users:
OrderFactory.create_batch(
factory.random.randint(1, 3),
user=user
)
self.session.commit()
return {
'admin': admin,
'users': users
}
def seed_performance_data(self):
"""Datos para tests de performance"""
UserFactory.create_batch(1000)
OrderFactory.create_batch(5000)
self.session.commit()
# Fixture para seeders
@pytest.fixture
def seeded_db(test_db):
seeder = DatabaseSeeder(test_db)
data = seeder.seed_basic_data()
yield data
test_db.rollback()
📊 Test Metrics y Reporting
Coverage Analysis
# .coveragerc
[run]
source = src/
omit =
*/tests/*
*/venv/*
*/migrations/*
*/settings/*
manage.py
[report]
exclude_lines =
pragma: no cover
def __repr__
raise AssertionError
raise NotImplementedError
if __name__ == .__main__.:
[html]
directory = htmlcov
Custom Test Reporter
# tests/reporters/custom_reporter.py
import json
from datetime import datetime
class DevOpsTestReporter:
def __init__(self):
self.results = {
'timestamp': datetime.now().isoformat(),
'summary': {},
'details': [],
'coverage': {},
'performance': {}
}
def add_test_result(self, test_name, status, duration, error=None):
result = {
'name': test_name,
'status': status,
'duration': duration,
'timestamp': datetime.now().isoformat()
}
if error:
result['error'] = str(error)
self.results['details'].append(result)
def set_coverage(self, coverage_data):
self.results['coverage'] = coverage_data
def set_performance_metrics(self, metrics):
self.results['performance'] = metrics
def generate_summary(self):
details = self.results['details']
self.results['summary'] = {
'total': len(details),
'passed': len([t for t in details if t['status'] == 'passed']),
'failed': len([t for t in details if t['status'] == 'failed']),
'skipped': len([t for t in details if t['status'] == 'skipped']),
'total_duration': sum(t['duration'] for t in details),
'success_rate': len([t for t in details if t['status'] == 'passed']) / len(details) * 100
}
def save_report(self, filename='test-report.json'):
self.generate_summary()
with open(filename, 'w') as f:
json.dump(self.results, f, indent=2)
def send_to_metrics_system(self):
"""Enviar métricas a sistema de monitoreo"""
# Implementar integración con Prometheus, InfluxDB, etc.
pass
✅ Testing Best Practices Checklist
- Tests rápidos (unit tests < 100ms)
- Tests determinísticos (no flaky tests)
- Coverage mínimo del 80% en código crítico
- Tests independientes entre sí
- Datos de test aislados y limpiados
- Tests de regresión para bugs críticos
- Performance tests en staging
- Security tests automatizados
- Visual regression tests para UI
- Contract testing para APIs
🎯 Ejercicios Prácticos
Ejercicio 1: Implementar Testing Strategy
- Define testing strategy para tu aplicación
- Implementa unit tests con coverage > 80%
- Crea integration tests para APIs críticas
- Configura E2E tests para user journeys principales
Ejercicio 2: CI/CD Testing Pipeline
- Configura pipeline de testing automatizado
- Implementa parallel testing
- Añade performance testing gates
- Configura reporting automático
Ejercicio 3: Test Data Management
- Implementa factories para test data
- Crea database seeding para diferentes escenarios
- Configura test data cleanup
- Documenta test scenarios
🔗 Recursos Adicionales
- Testing Pyramid (Martin Fowler)
- Jest Documentation
- Playwright Testing
- k6 Performance Testing
- pytest Documentation
💡 Recuerda: Los tests no son una pérdida de tiempo, son una inversión en la confiabilidad y velocidad de desarrollo. Buenos tests = deployments confiables.