""" Rate Limiting and Throttling Security Testing Framework Author: Security Testing Framework Version: 1.0 """ import json import datetime import time import math import re from typing import Dict, List, Optional, Any, Union class RateLimitTester: """Test framework for rate limiting and throttling mechanisms""" def __init__(self): self.vulnerabilities = [] self.test_results = [] def test_rate_limiting(self, endpoints: List[Dict]) -> Dict[str, Any]: """Test rate limiting effectiveness on API endpoints""" rate_limit_issues = [] for endpoint in endpoints: url = endpoint.get('url', '') method = endpoint.get('method', 'GET') # Test basic rate limiting basic_result = self._test_basic_rate_limit(endpoint) rate_limit_issues.append(basic_result) # Test burst handling burst_result = self._test_burst_handling(endpoint) rate_limit_issues.append(burst_result) # Test distributed rate limiting distributed_result = self._test_distributed_rate_limit(endpoint) rate_limit_issues.append(distributed_result) # Test rate limit bypass techniques bypass_result = self._test_rate_limit_bypass(endpoint) rate_limit_issues.append(bypass_result) vulnerable_endpoints = [r for r in rate_limit_issues if r.get('vulnerable', False)] return { 'endpoints_tested': len(endpoints), 'vulnerable_endpoints': len(vulnerable_endpoints), 'vulnerabilities': vulnerable_endpoints, 'all_results': rate_limit_issues } def test_api_throttling(self, endpoints: List[Dict]) -> Dict[str, Any]: """Test API throttling mechanisms""" throttling_issues = [] for endpoint in endpoints: # Test response time degradation time_result = self._test_response_time_degradation(endpoint) throttling_issues.append(time_result) # Test resource-based throttling resource_result = self._test_resource_throttling(endpoint) throttling_issues.append(resource_result) # Test user-based throttling user_result = self._test_user_throttling(endpoint) throttling_issues.append(user_result) vulnerable_endpoints = [r for r in throttling_issues if r.get('vulnerable', False)] return { 'endpoints_tested': len(endpoints), 'vulnerable_endpoints': len(vulnerable_endpoints), 'vulnerabilities': vulnerable_endpoints } def _test_basic_rate_limit(self, endpoint: Dict) -> Dict[str, Any]: """Test basic rate limiting functionality""" url = endpoint.get('url', '') method = endpoint.get('method', 'GET') # Simulate rapid requests request_count = endpoint.get('rate_limit_requests', 100) time_window = endpoint.get('rate_limit_window', 60) # seconds # Check if rate limiting is configured has_rate_limit = endpoint.get('has_rate_limit', False) vulnerable = not has_rate_limit return { 'endpoint': url, 'method': method, 'test_type': 'Basic Rate Limit', 'vulnerable': vulnerable, 'severity': 'High' if vulnerable else 'Low', 'description': f'Endpoint allows {request_count} requests without rate limiting', 'recommendation': 'Implement rate limiting (e.g., 100 requests/minute)', 'test_details': { 'requests_tested': request_count, 'time_window': time_window, 'rate_limit_detected': has_rate_limit } } def _test_burst_handling(self, endpoint: Dict) -> Dict[str, Any]: """Test burst request handling""" url = endpoint.get('url', '') # Simulate burst requests burst_size = endpoint.get('burst_size', 50) burst_duration = endpoint.get('burst_duration', 5) # seconds # Check if burst protection exists has_burst_protection = endpoint.get('has_burst_protection', False) vulnerable = not has_burst_protection return { 'endpoint': url, 'test_type': 'Burst Handling', 'vulnerable': vulnerable, 'severity': 'Medium' if vulnerable else 'Low', 'description': f'Endpoint allows burst of {burst_size} requests in {burst_duration}s', 'recommendation': 'Implement burst protection with token bucket algorithm', 'test_details': { 'burst_size': burst_size, 'burst_duration': burst_duration, 'protection_detected': has_burst_protection } } def _test_distributed_rate_limit(self, endpoint: Dict) -> Dict[str, Any]: """Test distributed rate limiting across multiple sources""" url = endpoint.get('url', '') # Simulate requests from different IP addresses ip_count = endpoint.get('distributed_ips', 10) requests_per_ip = endpoint.get('requests_per_ip', 20) # Check if distributed rate limiting exists has_distributed_limit = endpoint.get('has_distributed_limit', False) vulnerable = not has_distributed_limit total_requests = ip_count * requests_per_ip return { 'endpoint': url, 'test_type': 'Distributed Rate Limit', 'vulnerable': vulnerable, 'severity': 'High' if vulnerable else 'Low', 'description': f'Endpoint allows {total_requests} total requests from {ip_count} different IPs', 'recommendation': 'Implement distributed rate limiting with IP tracking', 'test_details': { 'ip_count': ip_count, 'requests_per_ip': requests_per_ip, 'total_requests': total_requests, 'distributed_limit_detected': has_distributed_limit } } def _test_rate_limit_bypass(self, endpoint: Dict) -> Dict[str, Any]: """Test common rate limit bypass techniques""" url = endpoint.get('url', '') bypass_techniques = [] vulnerable = False # Test IP rotation bypass if not endpoint.get('ip_rotation_protection', True): bypass_techniques.append('IP Rotation') vulnerable = True # Test user-agent rotation bypass if not endpoint.get('ua_rotation_protection', True): bypass_techniques.append('User-Agent Rotation') vulnerable = True # Test header manipulation bypass if not endpoint.get('header_protection', True): bypass_techniques.append('Header Manipulation') vulnerable = True # Test session token bypass if not endpoint.get('session_protection', True): bypass_techniques.append('Session Token Bypass') vulnerable = True return { 'endpoint': url, 'test_type': 'Rate Limit Bypass', 'vulnerable': vulnerable, 'severity': 'High' if vulnerable else 'Low', 'description': f'Rate limit can be bypassed using: {", ".join(bypass_techniques)}' if bypass_techniques else 'No bypass techniques found', 'recommendation': 'Implement robust rate limiting with multiple tracking mechanisms', 'bypass_techniques': bypass_techniques } def _test_response_time_degradation(self, endpoint: Dict) -> Dict[str, Any]: """Test response time degradation under load""" url = endpoint.get('url', '') # Simulate increasing load load_levels = [10, 50, 100, 200, 500] response_times = [] for load in load_levels: # Simulate response time based on load base_time = endpoint.get('base_response_time', 100) # ms degradation_factor = endpoint.get('degradation_factor', 0.5) simulated_time = base_time + (load * degradation_factor) response_times.append(simulated_time) # Check if response time degrades significantly time_increase = response_times[-1] / response_times[0] vulnerable = time_increase > 10 # 10x increase is problematic return { 'endpoint': url, 'test_type': 'Response Time Degradation', 'vulnerable': vulnerable, 'severity': 'Medium' if vulnerable else 'Low', 'description': f'Response time increases {time_increase:.1f}x under load', 'recommendation': 'Implement proper throttling and load balancing', 'test_details': { 'load_levels': load_levels, 'response_times': response_times, 'time_increase_factor': time_increase } } def _test_resource_throttling(self, endpoint: Dict) -> Dict[str, Any]: """Test resource-based throttling""" url = endpoint.get('url', '') # Test resource consumption resource_types = ['CPU', 'Memory', 'Database Connections', 'File Handles'] throttling_issues = [] for resource in resource_types: if not endpoint.get(f'{resource.lower().replace(" ", "_")}_throttling', True): throttling_issues.append(resource) vulnerable = len(throttling_issues) > 0 return { 'endpoint': url, 'test_type': 'Resource Throttling', 'vulnerable': vulnerable, 'severity': 'Medium' if vulnerable else 'Low', 'description': f'No throttling for resources: {", ".join(throttling_issues)}' if throttling_issues else 'Resource throttling properly configured', 'recommendation': 'Implement resource-based throttling to prevent DoS', 'affected_resources': throttling_issues } def _test_user_throttling(self, endpoint: Dict) -> Dict[str, Any]: """Test user-based throttling""" url = endpoint.get('url', '') # Test user-specific rate limits has_user_throttling = endpoint.get('has_user_throttling', False) user_rate_limit = endpoint.get('user_rate_limit', 1000) # requests per hour vulnerable = not has_user_throttling return { 'endpoint': url, 'test_type': 'User Throttling', 'vulnerable': vulnerable, 'severity': 'Medium' if vulnerable else 'Low', 'description': f'No user-specific throttling (limit: {user_rate_limit}/hour)', 'recommendation': 'Implement per-user rate limiting to prevent abuse', 'test_details': { 'user_throttling_detected': has_user_throttling, 'user_rate_limit': user_rate_limit } } def generate_rate_limit_report(self) -> Dict[str, Any]: """Generate comprehensive rate limiting security report""" return { 'test_summary': { 'total_endpoints_tested': len(self.test_results), 'vulnerabilities_found': len(self.vulnerabilities), 'test_completion_time': datetime.datetime.now().isoformat() }, 'vulnerability_breakdown': { 'critical': len([v for v in self.vulnerabilities if v.get('severity') == 'Critical']), 'high': len([v for v in self.vulnerabilities if v.get('severity') == 'High']), 'medium': len([v for v in self.vulnerabilities if v.get('severity') == 'Medium']), 'low': len([v for v in self.vulnerabilities if v.get('severity') == 'Low']) }, 'vulnerabilities': self.vulnerabilities, 'recommendations': self._generate_rate_limit_recommendations() } def _generate_rate_limit_recommendations(self) -> List[str]: """Generate rate limiting security recommendations""" recommendations = [] vuln_types = [v.get('test_type', '') for v in self.vulnerabilities] if 'Basic Rate Limit' in vuln_types: recommendations.append('Implement basic rate limiting using algorithms like token bucket or sliding window') if 'Burst Handling' in vuln_types: recommendations.append('Configure burst protection to handle traffic spikes') if 'Distributed Rate Limit' in vuln_types: recommendations.append('Implement distributed rate limiting with Redis or similar caching layer') if 'Rate Limit Bypass' in vuln_types: recommendations.append('Use multiple identifiers for rate limiting (IP, user ID, API key)') if 'Response Time Degradation' in vuln_types: recommendations.append('Implement automatic throttling based on response times') if 'Resource Throttling' in vuln_types: recommendations.append('Monitor and throttle based on resource consumption') if 'User Throttling' in vuln_types: recommendations.append('Implement per-user rate limiting with different tiers') return recommendations # Sample endpoints for rate limiting testing RATE_LIMIT_ENDPOINTS = [ { 'url': '/api/v1/login', 'method': 'POST', 'has_rate_limit': False, 'rate_limit_requests': 100, 'rate_limit_window': 60 }, { 'url': '/api/v1/search', 'method': 'GET', 'has_rate_limit': True, 'has_burst_protection': False, 'burst_size': 50, 'burst_duration': 5 }, { 'url': '/api/v1/upload', 'method': 'POST', 'has_distributed_limit': False, 'distributed_ips': 10, 'requests_per_ip': 20 }, { 'url': '/api/v1/data/export', 'method': 'GET', 'ip_rotation_protection': False, 'ua_rotation_protection': False, 'header_protection': True }, { 'url': '/api/v1/heavy/computation', 'method': 'POST', 'base_response_time': 100, 'degradation_factor': 2.0, 'cpu_throttling': False, 'memory_throttling': False }, { 'url': '/api/v1/user/profile', 'method': 'GET', 'has_user_throttling': False, 'user_rate_limit': 1000 } ] if __name__ == "__main__": # Run rate limiting tests tester = RateLimitTester() # Test rate limiting rate_limit_results = tester.test_rate_limiting(RATE_LIMIT_ENDPOINTS) tester.vulnerabilities.extend(rate_limit_results['vulnerabilities']) # Test throttling throttling_results = tester.test_api_throttling(RATE_LIMIT_ENDPOINTS) tester.vulnerabilities.extend(throttling_results['vulnerabilities']) # Generate report report = tester.generate_rate_limit_report() print("Rate Limiting Security Testing Complete") print(f"Vulnerabilities Found: {len(report['vulnerabilities'])}") print(f"Critical: {report['vulnerability_breakdown']['critical']}") print(f"High: {report['vulnerability_breakdown']['high']}") print(f"Medium: {report['vulnerability_breakdown']['medium']}") print(json.dumps(report, indent=2))