Performance Testing & Load Testing: Benchmarking Cloud Services with Grafana
Posted on May 20, 2024 • 6 minutes • 1192 words
Introduction
Performance testing isn’t optional for production systems. At IBM, we used JMeter, Locust, and Grafana to ensure our microservices could handle peak loads. This article shares practical approaches to load testing cloud infrastructure.
Performance Testing Fundamentals
Types of Testing
Load Testing: Run at normal load with the expected number of concurrent users
Stress Testing: Push the system beyond its expected capacity
Soak Testing: Run at the expected load for an extended period to surface leaks and slow degradation
Spike Testing: Apply a sudden surge in load
Breakpoint Testing: Gradually increase load until the system fails (see the Locust load-shape sketch after this list)
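The last two patterns are easy to express as a custom load shape in Locust (covered in more detail later in this article). Here is a minimal sketch, assuming Locust 2.x; the step size, step length, and user ceiling are illustrative values, not recommendations:

# breakpoint_shape.py — step the load up until a ceiling (lives in the same locustfile as your HttpUser classes)
from locust import LoadTestShape

class BreakpointShape(LoadTestShape):
    """Add users in steps until a ceiling is reached; watch server metrics to find the breaking point."""
    step_users = 100     # users added per step (illustrative)
    step_seconds = 60    # length of each step in seconds
    max_users = 2000     # stop the test once this ceiling is exceeded

    def tick(self):
        run_time = self.get_run_time()
        users = self.step_users * (int(run_time // self.step_seconds) + 1)
        if users > self.max_users:
            return None  # returning None ends the test
        return (users, self.step_users)  # (target user count, spawn rate)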
JMeter for Load Testing
Basic Test Plan Configuration
#!/bin/bash
# jmeter-test.sh
# Variables
THREADS=100 # Concurrent users
RAMP_TIME=60 # Ramp up time (seconds)
DURATION=300 # Test duration (seconds)
TARGET_HOST="api.example.com"
TARGET_PORT="443"
# Run the test plan in non-GUI mode
jmeter -n \
  -t load-test.jmx \
  -l results.jtl \
  -j jmeter.log \
  -Jthreads=$THREADS \
  -Jramp_time=$RAMP_TIME \
  -Jduration=$DURATION \
  -Jhostname=$TARGET_HOST \
  -Jport=$TARGET_PORT
JMeter Test Plan Template
JMeter's native test plans are .jmx (XML) files; the YAML below is a readable summary of the same structure, with the values that the .jmx picks up through the -J properties shown above.
# jmeter-config.yaml
---
threadGroup:
  numberOfThreads: 100
  rampUpTime: 60        # seconds
  duration: 300         # seconds

samplers:
  - name: "GET /api/users"
    protocol: "https"
    domain: "api.example.com"
    method: "GET"
    path: "/api/users?page=1&limit=20"
    assertions:
      - responseCode: "200"
      - responseTime:
          max: 1000     # milliseconds
  - name: "POST /api/orders"
    protocol: "https"
    domain: "api.example.com"
    method: "POST"
    path: "/api/orders"
    body: |
      {
        "product_id": 123,
        "quantity": 5
      }
    assertions:
      - responseCode: "201"

timers:
  - type: "Constant"
    delay: 100          # milliseconds between requests

listeners:
  - type: "Simple Data Writer"
    filename: "results.csv"
  - type: "Aggregate Report"
  - type: "Response Time Graph"
Python JMeter Script
# jmeter_runner.py
import subprocess
from datetime import datetime


class JMeterRunner:
    def __init__(self, test_plan, output_dir):
        self.test_plan = test_plan
        self.output_dir = output_dir
        self.results_file = f"{output_dir}/results-{datetime.now().strftime('%Y%m%d-%H%M%S')}.jtl"

    def run_test(self, config):
        """Execute a JMeter load test in non-GUI mode."""
        cmd = [
            'jmeter',
            '-n',  # Non-GUI mode
            '-t', self.test_plan,
            '-l', self.results_file,
            '-j', f"{self.output_dir}/jmeter.log",
            '-Jthreads=' + str(config['threads']),
            '-Jramp_time=' + str(config['ramp_up']),
            '-Jduration=' + str(config['duration']),
            '-Jhostname=' + config['target_host'],
            '-Jport=' + str(config['target_port']),
        ]

        print(f"🚀 Starting JMeter test: {self.test_plan}")
        print(f"   Threads: {config['threads']}")
        print(f"   Ramp-up: {config['ramp_up']}s")
        print(f"   Duration: {config['duration']}s")

        result = subprocess.run(cmd, capture_output=True, text=True)
        if result.returncode != 0:
            print(f"❌ JMeter failed: {result.stderr}")
            return False

        print("✅ Test completed")
        self.analyze_results()
        return True

    def analyze_results(self):
        """Parse the JTL (CSV) results file and report key statistics."""
        metrics = {
            'total_requests': 0,
            'successful': 0,
            'failed': 0,
            'response_times': []
        }

        with open(self.results_file, 'r') as f:
            for line in f:
                if line.startswith('timeStamp'):  # Skip the CSV header
                    continue
                parts = line.strip().split(',')
                if len(parts) >= 8:
                    metrics['total_requests'] += 1
                    success = parts[7].strip().lower() == 'true'  # "success" column
                    metrics['successful'] += success
                    metrics['failed'] += not success
                    metrics['response_times'].append(int(parts[1]))  # "elapsed" column (ms)

        if not metrics['response_times']:
            print("No samples found in results file")
            return None

        # Calculate statistics
        sorted_times = sorted(metrics['response_times'])
        avg_response = sum(sorted_times) / len(sorted_times)
        p95_response = sorted_times[int(len(sorted_times) * 0.95)]
        p99_response = sorted_times[int(len(sorted_times) * 0.99)]

        print("\n📊 Load Test Results:")
        print(f"   Total Requests: {metrics['total_requests']}")
        print(f"   Successful: {metrics['successful']} ({100 * metrics['successful'] / metrics['total_requests']:.1f}%)")
        print(f"   Failed: {metrics['failed']}")
        print(f"   Avg Response Time: {avg_response:.0f}ms")
        print(f"   P95 Response Time: {p95_response}ms")
        print(f"   P99 Response Time: {p99_response}ms")

        return {
            'success_rate': metrics['successful'] / metrics['total_requests'],
            'avg_response': avg_response,
            'p95_response': p95_response,
            'p99_response': p99_response
        }
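For completeness, a quick usage sketch of the runner above; the thread count, hostname, and output directory are illustrative:

# Example invocation (values are illustrative)
runner = JMeterRunner(test_plan='load-test.jmx', output_dir='./results')
runner.run_test({
    'threads': 100,
    'ramp_up': 60,          # seconds
    'duration': 300,        # seconds
    'target_host': 'api.example.com',
    'target_port': 443,
})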
Locust for Load Testing
Python-based Load Testing
# locustfile.py
from locust import HttpUser, task, between
import random


class UserBehavior(HttpUser):
    """Define user behavior patterns"""
    wait_time = between(1, 3)  # Wait 1-3 seconds between requests

    def on_start(self):
        """Initialize user session"""
        self.client.headers = {
            'User-Agent': 'LoadTestClient/1.0',
            'Authorization': f'Bearer {self.get_auth_token()}'
        }

    def get_auth_token(self):
        """Get authentication token"""
        response = self.client.post(
            '/api/auth/login',
            json={'username': 'testuser', 'password': 'password'}
        )
        return response.json().get('token')

    @task(3)
    def browse_products(self):
        """Browse product catalog (weight 3)"""
        page = random.randint(1, 10)
        self.client.get(f'/api/products?page={page}&limit=20')

    @task(2)
    def view_product(self):
        """View product details (weight 2)"""
        product_id = random.randint(1, 1000)
        self.client.get(f'/api/products/{product_id}')

    @task(1)
    def add_to_cart(self):
        """Add an item to the cart (weight 1)"""
        self.client.post(
            '/api/cart',
            json={
                'product_id': random.randint(1, 1000),
                'quantity': random.randint(1, 5)
            }
        )

    @task(1)
    def checkout(self):
        """Proceed to checkout (weight 1)"""
        self.client.post(
            '/api/orders',
            json={'payment_method': 'credit_card'}
        )
Running Locust
#!/bin/bash
# run-locust.sh
locust \
  -f locustfile.py \
  --host=https://api.example.com \
  --users=1000 \
  --spawn-rate=50 \
  --run-time=10m \
  --csv=results \
  --headless
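Because --csv=results writes aggregate statistics to CSV (results_stats.csv, results_failures.csv, and so on), you can gate a CI pipeline on the numbers. A minimal sketch, assuming a recent Locust release whose stats file contains an "Aggregated" row and a "95%" percentile column; check the header row for your version, and note the thresholds are illustrative SLOs:

# check_locust_results.py — CI gate sketch (thresholds and column names are assumptions)
import csv
import sys

P95_LIMIT_MS = 1000        # assumed SLO: p95 under 1 second
MAX_FAILURE_RATIO = 0.01   # assumed SLO: under 1% failed requests

with open('results_stats.csv') as f:
    for row in csv.DictReader(f):
        if row['Name'] != 'Aggregated':
            continue
        requests = int(row['Request Count'])
        failures = int(row['Failure Count'])
        p95 = float(row['95%'])
        failure_ratio = failures / max(requests, 1)
        print(f"p95={p95:.0f}ms  failures={failure_ratio:.2%}")
        if p95 > P95_LIMIT_MS or failure_ratio > MAX_FAILURE_RATIO:
            sys.exit(1)  # fail the pipeline if either SLO is breached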
AWS ALB Metrics & CloudWatch
Monitoring ALB Performance
# alb_monitoring.py
import boto3
from datetime import datetime, timedelta


class ALBMonitor:
    def __init__(self, alb_name):
        self.cloudwatch = boto3.client('cloudwatch')
        self.elb = boto3.client('elbv2')
        self.alb_name = alb_name
        self.alb_arn = self._get_alb_arn()

    def _get_alb_arn(self):
        """Get ALB ARN by name"""
        response = self.elb.describe_load_balancers(Names=[self.alb_name])
        return response['LoadBalancers'][0]['LoadBalancerArn']

    def _get_target_group_arn(self):
        """Get the first target group attached to the ALB"""
        response = self.elb.describe_target_groups(LoadBalancerArn=self.alb_arn)
        return response['TargetGroups'][0]['TargetGroupArn']

    def get_metrics(self, metric_name, start_time, end_time, period=60):
        """Fetch CloudWatch metrics for the ALB"""
        response = self.cloudwatch.get_metric_statistics(
            Namespace='AWS/ApplicationELB',
            MetricName=metric_name,
            Dimensions=[
                {
                    'Name': 'LoadBalancer',
                    # Dimension value is the ARN suffix, e.g. "app/my-alb/50dc6c495c0c9188"
                    'Value': self.alb_arn.split('loadbalancer/')[-1]
                }
            ],
            StartTime=start_time,
            EndTime=end_time,
            Period=period,
            Statistics=['Average', 'Maximum', 'Sum']
        )
        return response['Datapoints']

    def analyze_load_test(self, duration_minutes=10):
        """Analyze ALB metrics during a load test"""
        end_time = datetime.utcnow()
        start_time = end_time - timedelta(minutes=duration_minutes)

        metrics_to_check = [
            'TargetResponseTime',
            'RequestCount',
            'HTTPCode_Target_5XX_Count',
            'HTTPCode_Target_4XX_Count',
            'UnHealthyHostCount',
            'HealthyHostCount',
            'ActiveConnectionCount'
        ]

        results = {}
        for metric in metrics_to_check:
            print(f"\n📊 Analyzing {metric}...")
            data = self.get_metrics(metric, start_time, end_time)
            if data:
                values = [d.get('Average') or d.get('Sum') or d.get('Maximum') or 0 for d in data]
                results[metric] = {
                    'avg': sum(values) / len(values),
                    'max': max(values),
                    'min': min(values),
                    'data_points': len(values)
                }
                print(f"   Avg: {results[metric]['avg']:.2f}")
                print(f"   Max: {results[metric]['max']:.2f}")
                print(f"   Min: {results[metric]['min']:.2f}")
        return results

    def check_alb_health(self):
        """Check ALB target health"""
        response = self.elb.describe_target_health(
            TargetGroupArn=self._get_target_group_arn()
        )
        healthy = sum(1 for t in response['TargetHealthDescriptions']
                      if t['TargetHealth']['State'] == 'healthy')
        unhealthy = len(response['TargetHealthDescriptions']) - healthy

        print("\n🏥 ALB Health Status:")
        print(f"   Healthy: {healthy}")
        print(f"   Unhealthy: {unhealthy}")
        for target in response['TargetHealthDescriptions']:
            if target['TargetHealth']['State'] != 'healthy':
                reason = target['TargetHealth'].get('Reason', 'unknown')
                print(f"   ⚠️ {target['Target']['Id']}: {reason}")
Grafana Dashboards
Prometheus Queries for Load Testing
# prometheus-rules.yaml
groups:
  - name: performance
    rules:
      - alert: HighResponseTime
        expr: histogram_quantile(0.95, sum(rate(http_request_duration_seconds_bucket[5m])) by (le)) > 1
        for: 5m
        annotations:
          summary: "High response time detected (p95 > 1s)"
      - alert: HighErrorRate
        expr: sum(rate(http_requests_total{status=~"5.."}[5m])) / sum(rate(http_requests_total[5m])) > 0.05
        for: 5m
        annotations:
          summary: "Error rate > 5%"
Grafana Dashboard JSON
{
  "dashboard": {
    "title": "Load Test Performance",
    "panels": [
      {
        "title": "Request Rate",
        "targets": [
          {
            "expr": "rate(http_requests_total[1m])"
          }
        ]
      },
      {
        "title": "Response Time (P95)",
        "targets": [
          {
            "expr": "histogram_quantile(0.95, sum(rate(http_request_duration_seconds_bucket[1m])) by (le))"
          }
        ]
      },
      {
        "title": "Error Rate",
        "targets": [
          {
            "expr": "rate(http_requests_total{status=~\"5..\"}[1m])"
          }
        ]
      },
      {
        "title": "CPU Usage",
        "targets": [
          {
            "expr": "rate(container_cpu_usage_seconds_total[1m])"
          }
        ]
      },
      {
        "title": "Memory Usage",
        "targets": [
          {
            "expr": "container_memory_usage_bytes"
          }
        ]
      }
    ]
  }
}
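Dashboards like this can be version-controlled and pushed to Grafana through its HTTP API rather than clicked together by hand. A minimal sketch, assuming the JSON above is saved to a file (the filename, Grafana URL, and token are placeholders):

# push_dashboard.py — upload the dashboard via Grafana's HTTP API (URL, token, filename are placeholders)
import json
import requests

GRAFANA_URL = "https://grafana.example.com"   # assumed Grafana instance
API_TOKEN = "glsa_xxx"                        # service account / API token (placeholder)

with open("load-test-dashboard.json") as f:
    dashboard = json.load(f)["dashboard"]

resp = requests.post(
    f"{GRAFANA_URL}/api/dashboards/db",
    headers={"Authorization": f"Bearer {API_TOKEN}", "Content-Type": "application/json"},
    json={"dashboard": dashboard, "overwrite": True},
    timeout=10,
)
resp.raise_for_status()
print("Dashboard uploaded:", resp.json().get("url"))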
Network Load Balancer Testing
NLB Performance Analysis
# nlb_performance_test.py
import socket
import threading
import time
from statistics import mean, stdev


class NLBTester:
    def __init__(self, host, port, num_connections=1000):
        self.host = host
        self.port = port
        self.num_connections = num_connections
        self.latencies = []

    def test_connection(self, connection_id):
        """Open a single TCP connection and measure request/response latency"""
        try:
            start = time.time()
            sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            sock.settimeout(5)  # Avoid hung connections skewing the results
            sock.connect((self.host, self.port))

            # Send a minimal HTTP request
            sock.sendall(f"GET / HTTP/1.1\r\nHost: {self.host}\r\nConnection: close\r\n\r\n".encode())

            # Wait for the first chunk of the response
            sock.recv(1024)
            sock.close()

            latency = (time.time() - start) * 1000  # ms
            self.latencies.append(latency)  # list.append is thread-safe in CPython
            return True
        except Exception as e:
            print(f"Connection {connection_id} failed: {e}")
            return False

    def run_load_test(self):
        """Open connections in parallel and report latency statistics"""
        threads = []
        for i in range(self.num_connections):
            t = threading.Thread(target=self.test_connection, args=(i,))
            threads.append(t)
            t.start()

        for t in threads:
            t.join()

        # Analyze results
        if self.latencies:
            print("\n📊 NLB Performance Results:")
            print(f"   Successful Connections: {len(self.latencies)}/{self.num_connections}")
            print(f"   Avg Latency: {mean(self.latencies):.2f}ms")
            print(f"   Min Latency: {min(self.latencies):.2f}ms")
            print(f"   Max Latency: {max(self.latencies):.2f}ms")
            if len(self.latencies) > 1:
                print(f"   StdDev: {stdev(self.latencies):.2f}ms")
Performance Testing Best Practices
1. Test in Production-like Environment
# Test Environment Configuration
test_env:
  instance_types:
    - t3.medium            # Same as production
  network:
    - Enhanced networking enabled
  storage:
    - Same EBS volume types
    - Same IOPS configuration
2. Gradual Load Increase
import time

def gradual_load_increase(target_users, duration_minutes=10):
    """Ramp load up in steps instead of all at once."""
    ramp_up_per_minute = target_users / duration_minutes
    for minute in range(1, duration_minutes + 1):
        current_users = int(ramp_up_per_minute * minute)
        print(f"Minute {minute}: {current_users} users")
        time.sleep(60)
3. Monitor Infrastructure During Testing
- CPU usage
- Memory consumption
- Disk I/O
- Network throughput
- Database connection pool
- Cache hit rates
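Client-side numbers tell only half the story, so capture the host-level items in the list above during the run. A minimal sampler sketch using psutil (assumed installed on the target hosts; database connection pool and cache metrics come from their own exporters):

# resource_sampler.py — sample host metrics during a test run (psutil assumed installed)
import time
import psutil

def sample_resources(duration_seconds=300, interval_seconds=5):
    """Print CPU, memory, disk, and network counters at a fixed interval."""
    for _ in range(duration_seconds // interval_seconds):
        cpu = psutil.cpu_percent(interval=None)
        mem = psutil.virtual_memory().percent
        disk = psutil.disk_io_counters()
        net = psutil.net_io_counters()
        print(f"cpu={cpu:.1f}%  mem={mem:.1f}%  "
              f"disk_read={disk.read_bytes}  disk_write={disk.write_bytes}  "
              f"net_sent={net.bytes_sent}  net_recv={net.bytes_recv}")
        time.sleep(interval_seconds)

if __name__ == "__main__":
    sample_resources()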
Conclusion
Performance testing is essential for production readiness. By combining JMeter/Locust load tests with CloudWatch/Grafana monitoring and ALB analytics, you can identify bottlenecks before they impact users.
What tools do you use for performance testing? Share your experience in the comments!