
Performance Testing & Load Testing: Benchmarking Cloud Services with Grafana

Posted on May 20, 2024  •  6 minutes  •  1192 words

Introduction

Performance testing isn’t optional for production systems. At IBM, we used JMeter, Locust, and Grafana to ensure our microservices could handle peak loads. This article shares practical approaches to load testing cloud infrastructure.

Performance Testing Fundamentals

Types of Testing

Load Testing:      Normal load + expected concurrent users
Stress Testing:    Push system beyond expected capacity
Soak Testing:      Run at expected load for extended period
Spike Testing:     Sudden surge in load
Breakpoint Testing: Gradually increase load until failure
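
These profiles differ mainly in how many virtual users you simulate, how fast you ramp them up, and how long you hold the load. The sketch below is purely illustrative (the numbers are placeholders, not recommendations):

# load_profiles.py (illustrative values only)
LOAD_PROFILES = {
    "load":       {"users": 500,  "ramp_up_s": 300, "duration_min": 30},    # expected peak traffic
    "stress":     {"users": 2000, "ramp_up_s": 300, "duration_min": 30},    # well beyond expected capacity
    "soak":       {"users": 500,  "ramp_up_s": 300, "duration_min": 480},   # normal load, long duration
    "spike":      {"users": 3000, "ramp_up_s": 10,  "duration_min": 5},     # sudden short burst
    "breakpoint": {"users": None, "ramp_up_s": None, "duration_min": None}, # keep increasing until failure
}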

JMeter for Load Testing

Basic Test Plan Configuration

#!/bin/bash
# jmeter-test.sh

# Variables
THREADS=100          # Concurrent users
RAMP_TIME=60         # Ramp up time (seconds)
DURATION=300         # Test duration (seconds)
TARGET_HOST="api.example.com"
TARGET_PORT="443"

# Run the test plan in non-GUI mode
jmeter -n \
  -t load-test.jmx \
  -l results.jtl \
  -j jmeter.log \
  -Jthreads=$THREADS \
  -Jramp_time=$RAMP_TIME \
  -Jduration=$DURATION \
  -Jhostname=$TARGET_HOST \
  -Jport=$TARGET_PORT
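
Properties passed with -J are read inside the .jmx plan via JMeter's __P function, e.g. ${__P(threads,10)} for the thread count and ${__P(duration,300)} for the scheduler duration.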

JMeter Test Plan Template

JMeter test plans are stored as .jmx XML files; the YAML below is a readable summary of the plan's structure rather than a format JMeter consumes directly.

# jmeter-config.yaml
---
threadGroup:
  numberOfThreads: 100
  rampUpTime: 60  # seconds
  duration: 300   # seconds

samplers:
  - name: "GET /api/users"
    protocol: "https"
    domain: "api.example.com"
    method: "GET"
    path: "/api/users?page=1&limit=20"
    assertions:
      - responseCode: "200"
      - responseTime:
          max: 1000  # milliseconds

  - name: "POST /api/orders"
    protocol: "https"
    domain: "api.example.com"
    method: "POST"
    path: "/api/orders"
    body: |
      {
        "product_id": 123,
        "quantity": 5
      }
    assertions:
      - responseCode: "201"

timers:
  - type: "Constant"
    delay: 100  # milliseconds between requests

listeners:
  - type: "Simple Data Writer"
    filename: "results.csv"
  - type: "Aggregate Report"
  - type: "Response Time Graph"

Python JMeter Script

# jmeter_runner.py
import subprocess
import time
from datetime import datetime
import json

class JMeterRunner:
    def __init__(self, test_plan, output_dir):
        self.test_plan = test_plan
        self.output_dir = output_dir
        self.results_file = f"{output_dir}/results-{datetime.now().strftime('%Y%m%d-%H%M%S')}.jtl"
    
    def run_test(self, config):
        """Execute JMeter load test"""
        
        cmd = [
            'jmeter',
            '-n',  # Non-GUI mode
            '-t', self.test_plan,
            '-l', self.results_file,
            '-j', f"{self.output_dir}/jmeter.log",
            '-Jthreads=' + str(config['threads']),
            '-Jramp_time=' + str(config['ramp_up']),
            '-Jduration=' + str(config['duration']),
            '-Jhostname=' + config['target_host'],
            '-Jport=' + str(config['target_port']),
        ]
        
        print(f"๐Ÿš€ Starting JMeter test: {self.test_plan}")
        print(f"   Threads: {config['threads']}")
        print(f"   Ramp-up: {config['ramp_up']}s")
        print(f"   Duration: {config['duration']}s")
        
        result = subprocess.run(cmd, capture_output=True, text=True)
        
        if result.returncode != 0:
            print(f"โŒ JMeter failed: {result.stderr}")
            return False
        
        print("โœ… Test completed")
        self.analyze_results()
        return True
    
    def analyze_results(self):
        """Parse and analyze JMeter results"""
        
        metrics = {
            'total_requests': 0,
            'successful': 0,
            'failed': 0,
            'response_times': []
        }
        
        with open(self.results_file, 'r') as f:
            for line in f:
                if line.startswith('timeStamp'):
                    continue
                
                parts = line.strip().split(',')
                if len(parts) >= 8:
                    # Default JTL CSV columns: timeStamp,elapsed,label,responseCode,
                    # responseMessage,threadName,dataType,success,...
                    metrics['total_requests'] += 1
                    success = parts[7].strip().lower() == 'true'  # Success flag is "true"/"false"
                    metrics['successful'] += success
                    metrics['failed'] += not success
                    metrics['response_times'].append(int(parts[1]))  # Elapsed time in ms
        
        # Calculate statistics
        if metrics['response_times']:
            sorted_times = sorted(metrics['response_times'])
            avg_response = sum(sorted_times) / len(sorted_times)
            p95_response = sorted_times[min(int(len(sorted_times) * 0.95), len(sorted_times) - 1)]
            p99_response = sorted_times[min(int(len(sorted_times) * 0.99), len(sorted_times) - 1)]
            
            print("\n๐Ÿ“Š Load Test Results:")
            print(f"   Total Requests: {metrics['total_requests']}")
            print(f"   Successful: {metrics['successful']} ({100*metrics['successful']/metrics['total_requests']:.1f}%)")
            print(f"   Failed: {metrics['failed']}")
            print(f"   Avg Response Time: {avg_response:.0f}ms")
            print(f"   P95 Response Time: {p95_response}ms")
            print(f"   P99 Response Time: {p99_response}ms")
            
            return {
                'success_rate': metrics['successful'] / metrics['total_requests'],
                'avg_response': avg_response,
                'p95_response': p95_response,
                'p99_response': p99_response
            }
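
A minimal way to drive the class above (paths and numbers are placeholders, and the output directory is assumed to exist):

# Example usage of JMeterRunner
runner = JMeterRunner(test_plan='load-test.jmx', output_dir='./results')
runner.run_test({
    'threads': 100,
    'ramp_up': 60,
    'duration': 300,
    'target_host': 'api.example.com',
    'target_port': 443,
})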

Locust for Load Testing

Python-based Load Testing

# locustfile.py
from locust import HttpUser, task, between
import random

class UserBehavior(HttpUser):
    """Define user behavior patterns"""
    
    wait_time = between(1, 3)  # Wait 1-3 seconds between requests
    
    def on_start(self):
        """Initialize user session"""
        self.client.headers = {
            'User-Agent': 'LoadTestClient/1.0',
            'Authorization': f'Bearer {self.get_auth_token()}'
        }
    
    def get_auth_token(self):
        """Get authentication token"""
        response = self.client.post(
            '/api/auth/login',
            json={'username': 'testuser', 'password': 'password'}
        )
        return response.json().get('token')
    
    @task(3)
    def browse_products(self):
        """Browse product catalog (3x more likely)"""
        page = random.randint(1, 10)
        self.client.get(f'/api/products?page={page}&limit=20')
    
    @task(2)
    def view_product(self):
        """View product details (2x more likely)"""
        product_id = random.randint(1, 1000)
        self.client.get(f'/api/products/{product_id}')
    
    @task(1)
    def add_to_cart(self):
        """Add to cart (1x probability)"""
        self.client.post(
            '/api/cart',
            json={
                'product_id': random.randint(1, 1000),
                'quantity': random.randint(1, 5)
            }
        )
    
    @task(1)
    def checkout(self):
        """Proceed to checkout"""
        self.client.post(
            '/api/orders',
            json={'payment_method': 'credit_card'}
        )
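
Locust can also encode the ramp profiles from the test-type list at the top of this post via a custom LoadTestShape. A minimal spike-shaped sketch (stage values are illustrative); put a class like this in the same locustfile and Locust picks it up automatically:

# load_shape.py
from locust import LoadTestShape

class SpikeShape(LoadTestShape):
    """Hold a baseline, spike briefly, then return to baseline"""
    
    # (end_time_s, users, spawn_rate) -- illustrative values
    stages = [
        (60, 100, 10),    # baseline for the first minute
        (90, 1000, 200),  # 30-second spike
        (300, 100, 10),   # back to baseline until the 5-minute mark
    ]
    
    def tick(self):
        run_time = self.get_run_time()
        for end_time, users, spawn_rate in self.stages:
            if run_time < end_time:
                return (users, spawn_rate)
        return None  # Returning None stops the test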

Running Locust

#!/bin/bash
# run-locust.sh

locust \
  -f locustfile.py \
  --host=https://api.example.com \
  --users=1000 \
  --spawn-rate=50 \
  --run-time=10m \
  --csv=results \
  --headless
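
For loads a single machine can't generate, Locust also runs distributed: start one process with --master and any number of load generators with --worker --master-host=<master-ip>.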

AWS ALB Metrics & CloudWatch

Monitoring ALB Performance

# alb_monitoring.py
import boto3
from datetime import datetime, timedelta

class ALBMonitor:
    def __init__(self, alb_name):
        self.cloudwatch = boto3.client('cloudwatch')
        self.elb = boto3.client('elbv2')
        self.alb_name = alb_name
        self.alb_arn = self._get_alb_arn()
    
    def _get_alb_arn(self):
        """Get ALB ARN by name"""
        response = self.elb.describe_load_balancers(Names=[self.alb_name])
        return response['LoadBalancers'][0]['LoadBalancerArn']
    
    def get_metrics(self, metric_name, start_time, end_time, period=60):
        """Fetch CloudWatch metrics"""
        
        response = self.cloudwatch.get_metric_statistics(
            Namespace='AWS/ApplicationELB',
            MetricName=metric_name,
            Dimensions=[
                {
                    'Name': 'LoadBalancer',
                    # Dimension value is the ARN suffix, e.g. app/my-alb/1234567890abcdef
                    'Value': self.alb_arn.split('loadbalancer/')[-1]
                }
            ],
            StartTime=start_time,
            EndTime=end_time,
            Period=period,
            Statistics=['Average', 'Maximum', 'Sum']
        )
        
        return response['Datapoints']
    
    def analyze_load_test(self, duration_minutes=10):
        """Analyze ALB metrics during load test"""
        
        end_time = datetime.utcnow()
        start_time = end_time - timedelta(minutes=duration_minutes)
        
        metrics_to_check = [
            'TargetResponseTime',
            'RequestCount',
            'HTTPCode_Target_5XX_Count',
            'HTTPCode_Target_4XX_Count',
            'UnHealthyHostCount',
            'HealthyHostCount',
            'ActiveConnectionCount'
        ]
        
        results = {}
        
        for metric in metrics_to_check:
            print(f"\n📊 Analyzing {metric}...")
            data = self.get_metrics(metric, start_time, end_time)
            
            if data:
                # Prefer Average; fall back to Sum/Maximum for count-style metrics
                values = [d.get('Average') or d.get('Sum') or d.get('Maximum') or 0 for d in data]
                results[metric] = {
                    'avg': sum(values) / len(values),
                    'max': max(values),
                    'min': min(values),
                    'data_points': len(values)
                }
                
                print(f"   Avg: {results[metric]['avg']:.2f}")
                print(f"   Max: {results[metric]['max']:.2f}")
                print(f"   Min: {results[metric]['min']:.2f}")
        
        return results
    
    def _get_target_group_arn(self):
        """Get the ARN of the first target group attached to this ALB"""
        response = self.elb.describe_target_groups(LoadBalancerArn=self.alb_arn)
        return response['TargetGroups'][0]['TargetGroupArn']
    
    def check_alb_health(self):
        """Check ALB target health"""
        
        response = self.elb.describe_target_health(
            TargetGroupArn=self._get_target_group_arn()
        )
        
        healthy = sum(1 for t in response['TargetHealthDescriptions'] 
                     if t['TargetHealth']['State'] == 'healthy')
        unhealthy = len(response['TargetHealthDescriptions']) - healthy
        
        print(f"\n๐Ÿฅ ALB Health Status:")
        print(f"   Healthy: {healthy}")
        print(f"   Unhealthy: {unhealthy}")
        
        for target in response['TargetHealthDescriptions']:
            if target['TargetHealth']['State'] != 'healthy':
                print(f"   โš ๏ธ  {target['Target']['Id']}: {target['TargetHealth']['Reason']}")

Grafana Dashboards

Prometheus Queries for Load Testing

# prometheus-rules.yaml
groups:
  - name: performance
    rules:
      - alert: HighResponseTime
        expr: histogram_quantile(0.95, sum(rate(http_request_duration_seconds_bucket[5m])) by (le)) > 1
        for: 5m
        annotations:
          summary: "P95 response time above 1s"
      
      - alert: HighErrorRate
        expr: sum(rate(http_requests_total{status=~"5.."}[5m])) / sum(rate(http_requests_total[5m])) > 0.05
        for: 5m
        annotations:
          summary: "Error rate > 5%"

Grafana Dashboard JSON

{
  "dashboard": {
    "title": "Load Test Performance",
    "panels": [
      {
        "title": "Request Rate",
        "targets": [
          {
            "expr": "rate(http_requests_total[1m])"
          }
        ]
      },
      {
        "title": "Response Time (P95)",
        "targets": [
          {
            "expr": "histogram_quantile(0.95, http_request_duration_seconds_bucket)"
          }
        ]
      },
      {
        "title": "Error Rate",
        "targets": [
          {
            "expr": "rate(http_requests_total{status=~\"5..\"}[1m])"
          }
        ]
      },
      {
        "title": "CPU Usage",
        "targets": [
          {
            "expr": "rate(container_cpu_usage_seconds_total[1m])"
          }
        ]
      },
      {
        "title": "Memory Usage",
        "targets": [
          {
            "expr": "container_memory_usage_bytes"
          }
        ]
      }
    ]
  }
}
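
Rather than clicking dashboards together by hand, JSON like the above can be pushed through Grafana's HTTP API. A minimal sketch, assuming the JSON is saved as dashboard.json and that GRAFANA_URL and GRAFANA_API_KEY are placeholders for your own values (requires the requests package):

# upload_dashboard.py
import json
import requests

GRAFANA_URL = "https://grafana.example.com"
GRAFANA_API_KEY = "your-api-key"

with open("dashboard.json") as f:
    payload = json.load(f)  # expects {"dashboard": {...}}

payload["overwrite"] = True  # replace an existing dashboard with the same title/UID

response = requests.post(
    f"{GRAFANA_URL}/api/dashboards/db",
    headers={
        "Authorization": f"Bearer {GRAFANA_API_KEY}",
        "Content-Type": "application/json",
    },
    json=payload,
    timeout=30,
)
response.raise_for_status()
print("Dashboard uploaded:", response.json().get("url"))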

Network Load Balancer Testing

NLB Performance Analysis

# nlb_performance_test.py
import socket
import time
import threading
from statistics import mean, stdev

class NLBTester:
    def __init__(self, host, port, num_connections=1000):
        self.host = host
        self.port = port
        self.num_connections = num_connections
        self.latencies = []
    
    def test_connection(self, connection_id):
        """Test single connection latency"""
        try:
            start = time.time()
            
            sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            sock.settimeout(10)  # Avoid threads hanging on unresponsive targets
            sock.connect((self.host, self.port))
            
            # Send a minimal HTTP request (assumes a plain TCP/HTTP listener, not TLS)
            request = f"GET / HTTP/1.1\r\nHost: {self.host}\r\nConnection: close\r\n\r\n"
            sock.sendall(request.encode())
            
            # Receive response
            response = sock.recv(1024)
            sock.close()
            
            latency = (time.time() - start) * 1000  # ms
            self.latencies.append(latency)
            
            return True
        except Exception as e:
            print(f"Connection {connection_id} failed: {e}")
            return False
    
    def run_load_test(self):
        """Run parallel connections"""
        threads = []
        
        for i in range(self.num_connections):
            t = threading.Thread(target=self.test_connection, args=(i,))
            threads.append(t)
            t.start()
        
        for t in threads:
            t.join()
        
        # Analyze results
        if self.latencies:
            print("\n๐Ÿ“ˆ NLB Performance Results:")
            print(f"   Successful Connections: {len(self.latencies)}/{self.num_connections}")
            print(f"   Avg Latency: {mean(self.latencies):.2f}ms")
            print(f"   Min Latency: {min(self.latencies):.2f}ms")
            print(f"   Max Latency: {max(self.latencies):.2f}ms")
            if len(self.latencies) > 1:
                print(f"   StdDev: {stdev(self.latencies):.2f}ms")

Performance Testing Best Practices

1. Test in Production-like Environment

# Test Environment Configuration
test_env:
  instance_types:
    - t3.medium  # Same as production
  network:
    - Enhanced networking enabled
  storage:
    - Same EBS volume types
    - Same IOPS configuration

2. Gradual Load Increase

import time

def gradual_load_increase(target_users, duration_minutes=10):
    """Gradually increase load so the system can stabilize at each step"""
    
    ramp_up_per_minute = target_users / duration_minutes
    
    for minute in range(1, duration_minutes + 1):
        current_users = int(ramp_up_per_minute * minute)
        print(f"Minute {minute}: {current_users} users")
        # Adjust the load generator (JMeter/Locust) to current_users here
        time.sleep(60)

3. Monitor Infrastructure During Testing

Watch the target infrastructure (CPU, memory, connection counts, error rates, target health) while the load is applied, not only after it finishes; client-side numbers alone can hide a saturated backend. A simple approach is to sample CloudWatch or Prometheus metrics on a fixed interval for the duration of the run.
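
A minimal sketch, reusing the JMeterRunner and ALBMonitor classes from earlier in this post (assumes both are importable; names and values are placeholders):

# monitor_during_test.py
import threading
import time

runner = JMeterRunner('load-test.jmx', './results')
monitor = ALBMonitor('my-load-test-alb')

# Run the load test in a background thread...
test = threading.Thread(target=runner.run_test, args=({
    'threads': 100, 'ramp_up': 60, 'duration': 300,
    'target_host': 'api.example.com', 'target_port': 443,
},))
test.start()

# ...and sample ALB metrics once a minute while it runs
while test.is_alive():
    time.sleep(60)
    monitor.analyze_load_test(duration_minutes=1)

test.join()
monitor.check_alb_health()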

Conclusion

Performance testing is essential for production readiness. By combining JMeter/Locust load tests with CloudWatch/Grafana monitoring and ALB analytics, you can identify bottlenecks before they impact users.


What tools do you use for performance testing? Share your experience in the comments!
