#!/usr/bin/env python3
"""
Denial of Service Vulnerability Tester
Tests for DoS vulnerabilities and resilience capabilities
"""

import json
import base64
import hashlib
import datetime
import re
import math
from typing import Dict, List, Optional, Any


class DOSVulnerabilityTester:
    """Test for Denial of Service vulnerabilities.

    The tester does not send any traffic. It scores a configuration
    dictionary describing an application (limits, timeouts, ...) and
    reports which DoS protections are missing or too permissive.
    """

    def __init__(self):
        # Names of the DoS attack classes this tool is concerned with.
        self.dos_vectors = [
            'resource_exhaustion',
            'slowloris_attack',
            'amplification_attack',
            'hash_collision_attack',
            'regex_dos',
            'memory_bomb',
            'cpu_intensive',
            'connection_exhaustion',
            'bandwidth_exhaustion',
            'disk_exhaustion'
        ]

        # Sample payloads, built eagerly at construction time.
        self.test_payloads = {
            'large_request': 'A' * 1000000,  # 1MB payload
            'deep_json': '{"a":' * 10000 + 'null' + '}' * 10000,
            'complex_regex': 'a' * 10000 + '.*.*.*.*.*.*',
            'nested_objects': '{"a":' * 5000 + '{}' + '}' * 5000,
            'unicode_bomb': '\U0001F4A9' * 10000,  # U+1F4A9 (astral-plane emoji) repeated
            'gzip_bomb': None,  # Will be generated
            'xml_bomb': None,  # Will be generated
            'sql_injection_heavy': "SELECT " + ",".join([f"col{i}" for i in range(1000)]) + " FROM table"
        }

        # Reference limits kept on the instance.
        self.rate_limits = {
            'requests_per_minute': 1000,
            'concurrent_connections': 100,
            'bandwidth_mbps': 100,
            'request_size_kb': 10240
        }

    @staticmethod
    def _new_result(severity: str, score: int) -> Dict[str, Any]:
        """Build the result skeleton shared by every ``_test_*`` method.

        ``score`` is only a placeholder: the first check of every test
        assigns the score outright, later checks adjust it.
        """
        return {
            'vulnerable': False,
            'severity': severity,
            'score': score,
            'description': '',
            'findings': []
        }

    def assess_dos_resilience(self, application_info: Dict[str, Any]) -> Dict[str, Any]:
        """Comprehensive DoS resilience assessment.

        Args:
            application_info: Configuration of the application under
                review. Each test reads its own sub-dictionary (for
                example ``'rate_limits'`` or ``'timeouts'``); missing
                sections are treated as "not configured".

        Returns:
            A dictionary with the per-test results, the list of
            vulnerabilities found, the mitigations detected, the
            recommendations, the overall score clamped to 0-100 and the
            derived risk level.
        """
        assessment = {
            'timestamp': datetime.datetime.now().isoformat(),
            'assessment_type': 'dos_vulnerability_assessment',
            'overall_score': 0,
            'risk_level': 'low',
            'vulnerabilities_found': [],
            'mitigations_assessed': [],
            'recommendations': [],
            'test_results': {}
        }

        # Test different DoS vectors
        dos_tests = [
            ('rate_limiting', self._test_rate_limiting),
            ('resource_limits', self._test_resource_limits),
            ('input_validation', self._test_input_validation),
            ('timeout_handling', self._test_timeout_handling),
            ('memory_management', self._test_memory_management),
            ('connection_handling', self._test_connection_handling),
            ('file_upload_limits', self._test_file_upload_limits),
            ('regex_safety', self._test_regex_safety)
        ]

        for test_name, test_function in dos_tests:
            test_result = test_function(application_info)
            assessment['test_results'][test_name] = test_result

            if test_result.get('vulnerable', False):
                assessment['vulnerabilities_found'].append({
                    'test_type': test_name,
                    'severity': test_result.get('severity', 'medium'),
                    'description': test_result.get('description', ''),
                    'impact': test_result.get('impact', '')
                })

            assessment['overall_score'] += test_result.get('score', 0)

        # Assess existing mitigations
        assessment['mitigations_assessed'] = self._assess_mitigations(application_info)

        # Calculate risk level (from the raw, not yet clamped, score)
        assessment['risk_level'] = self._calculate_risk_level(assessment['overall_score'])

        # Generate recommendations
        assessment['recommendations'] = self._generate_recommendations(assessment)

        # Normalize score (0-100)
        assessment['overall_score'] = min(100, max(0, assessment['overall_score']))

        return assessment

    def _test_rate_limiting(self, app_info: Dict[str, Any]) -> Dict[str, Any]:
        """Test rate limiting effectiveness.

        Reads ``app_info['rate_limits']`` (``requests_per_minute`` and
        ``concurrent_connections``); a value of 0 means "not configured".
        """
        rate_limits = app_info.get('rate_limits', {})
        requests_per_minute = rate_limits.get('requests_per_minute', 0)
        connection_limits = rate_limits.get('concurrent_connections', 0)

        result = self._new_result('high', 20)

        if requests_per_minute == 0:
            result['vulnerable'] = True
            result['score'] = -20
            result['description'] = "No request rate limiting implemented"
            result['findings'].append("Application vulnerable to request flooding")
        elif requests_per_minute > 10000:
            result['score'] = 5
            result['description'] = "Rate limit too permissive"
            result['findings'].append("Consider reducing rate limit threshold")
        else:
            result['score'] = 15
            result['description'] = "Rate limiting implemented"
            result['findings'].append("Adequate rate limiting in place")

        if connection_limits == 0:
            result['vulnerable'] = True
            result['score'] -= 10
            result['findings'].append("No connection limits implemented")
        elif connection_limits > 1000:
            # Downgrade to partial credit, but never raise the score: a
            # plain assignment here used to erase the -20 penalty for a
            # missing request rate limit.
            result['score'] = min(result['score'], 5)
            result['findings'].append("Connection limit might be too high")

        return result

    def _test_resource_limits(self, app_info: Dict[str, Any]) -> Dict[str, Any]:
        """Test resource consumption limits.

        Reads ``app_info['resource_limits']`` (``max_memory_mb``,
        ``max_cpu_time_seconds``, ``max_file_size_mb``). The memory check
        sets the base score; the CPU and file-size checks add to or
        subtract from it.
        """
        resources = app_info.get('resource_limits', {})
        max_memory = resources.get('max_memory_mb', 0)
        max_cpu_time = resources.get('max_cpu_time_seconds', 0)
        max_file_size = resources.get('max_file_size_mb', 0)

        result = self._new_result('high', 20)

        if max_memory == 0:
            result['vulnerable'] = True
            result['score'] = -15
            result['description'] = "No memory limits configured"
            result['findings'].append("Application vulnerable to memory exhaustion")
        elif max_memory > 1024:  # > 1GB
            result['score'] = 5
            result['description'] = "Memory limit too generous"
            result['findings'].append("Consider reducing memory allocation")
        else:
            result['score'] = 10
            result['findings'].append("Memory limits configured")

        # The remaining checks accumulate so that an earlier penalty is
        # not discarded by a later, passing check.
        if max_cpu_time == 0:
            result['vulnerable'] = True
            result['score'] -= 10
            result['findings'].append("No CPU time limits configured")
        elif max_cpu_time > 30:
            result['score'] += 5
            result['findings'].append("CPU time limit might be too high")
        else:
            result['score'] += 10
            result['findings'].append("CPU time limits configured")

        if max_file_size == 0:
            result['vulnerable'] = True
            result['score'] -= 10
            result['findings'].append("No file upload size limits")
        elif max_file_size > 100:  # > 100MB
            result['score'] += 5
            result['findings'].append("File upload limit might be too high")
        else:
            result['score'] += 10
            result['findings'].append("File upload limits configured")

        return result

    def _test_input_validation(self, app_info: Dict[str, Any]) -> Dict[str, Any]:
        """Test input validation against DoS.

        Reads ``app_info['input_validation']`` (``max_request_size_mb``,
        ``max_field_length``, ``json_depth_limit``). The request-size
        check sets the base score; the other two adjust it.
        """
        validation = app_info.get('input_validation', {})
        max_request_size = validation.get('max_request_size_mb', 0)
        max_field_length = validation.get('max_field_length', 0)
        json_depth_limit = validation.get('json_depth_limit', 0)

        result = self._new_result('medium', 15)

        if max_request_size == 0:
            result['vulnerable'] = True
            result['score'] = -10
            result['description'] = "No request size limits"
            result['findings'].append("Vulnerable to large request attacks")
        elif max_request_size > 50:
            result['score'] = 5
            result['description'] = "Request size limit too high"
        else:
            result['score'] = 10
            result['findings'].append("Request size limits configured")

        if max_field_length == 0:
            result['vulnerable'] = True
            result['score'] -= 5
            result['findings'].append("No field length limits")
        else:
            result['score'] += 5
            result['findings'].append("Field length limits configured")

        if json_depth_limit == 0:
            result['vulnerable'] = True
            result['score'] -= 5
            result['findings'].append("No JSON depth limits - vulnerable to nested object attacks")
        else:
            result['score'] += 5
            result['findings'].append("JSON depth limits configured")

        return result

    def _test_timeout_handling(self, app_info: Dict[str, Any]) -> Dict[str, Any]:
        """Test timeout configuration.

        Reads ``app_info['timeouts']`` (``request_timeout_seconds``,
        ``connection_timeout_seconds``, ``read_timeout_seconds``). The
        request-timeout check sets the base score; the other two adjust
        it.
        """
        timeouts = app_info.get('timeouts', {})
        request_timeout = timeouts.get('request_timeout_seconds', 0)
        connection_timeout = timeouts.get('connection_timeout_seconds', 0)
        read_timeout = timeouts.get('read_timeout_seconds', 0)

        result = self._new_result('high', 15)

        if request_timeout == 0:
            result['vulnerable'] = True
            result['score'] = -15
            result['description'] = "No request timeout configured"
            result['findings'].append("Vulnerable to slowloris attacks")
        elif request_timeout > 120:
            result['score'] = 5
            result['description'] = "Request timeout too long"
        else:
            result['score'] = 10
            result['findings'].append("Request timeout configured")

        if connection_timeout == 0:
            result['vulnerable'] = True
            result['score'] -= 10
            result['findings'].append("No connection timeout configured")
        else:
            result['score'] += 5
            result['findings'].append("Connection timeout configured")

        if read_timeout == 0:
            result['vulnerable'] = True
            result['score'] -= 5
            result['findings'].append("No read timeout configured")
        else:
            result['score'] += 5
            result['findings'].append("Read timeout configured")

        return result

    def _test_memory_management(self, app_info: Dict[str, Any]) -> Dict[str, Any]:
        """Test memory management against DoS.

        Reads the boolean flags ``streaming_enabled``,
        ``buffering_limited`` and ``garbage_collection_optimized`` from
        ``app_info['memory_management']``. The streaming check sets the
        base score; buffering and GC adjust it (5 + 3 + 2 = 10 at best).
        """
        memory_config = app_info.get('memory_management', {})
        streaming_enabled = memory_config.get('streaming_enabled', False)
        buffering_limited = memory_config.get('buffering_limited', False)
        garbage_collection = memory_config.get('garbage_collection_optimized', False)

        result = self._new_result('medium', 10)

        if not streaming_enabled:
            result['vulnerable'] = True
            result['score'] = -10
            result['description'] = "Streaming not enabled for large data processing"
            result['findings'].append("Vulnerable to memory exhaustion from large payloads")
        else:
            result['score'] = 5
            result['findings'].append("Streaming enabled for large data")

        if not buffering_limited:
            result['vulnerable'] = True
            result['score'] -= 5
            result['findings'].append("Buffer size not limited")
        else:
            result['score'] += 3
            result['findings'].append("Buffer sizes limited")

        if garbage_collection:
            result['score'] += 2
            result['findings'].append("Garbage collection optimized")

        return result

    def _test_connection_handling(self, app_info: Dict[str, Any]) -> Dict[str, Any]:
        """Test connection handling limits.

        Reads ``app_info['connection_handling']`` (``keep_alive_enabled``,
        ``max_connections``, ``connection_timeout_seconds``). The
        connection-limit check sets the base score; the keep-alive check
        adjusts it.
        """
        connections = app_info.get('connection_handling', {})
        keep_alive_enabled = connections.get('keep_alive_enabled', False)
        max_connections = connections.get('max_connections', 0)
        connection_timeout = connections.get('connection_timeout_seconds', 0)

        result = self._new_result('high', 15)

        if max_connections == 0:
            result['vulnerable'] = True
            result['score'] = -15
            result['description'] = "No maximum connection limit configured"
            result['findings'].append("Vulnerable to connection exhaustion attacks")
        elif max_connections > 10000:
            result['score'] = 5
            result['description'] = "Connection limit might be too high"
        else:
            result['score'] = 10
            result['findings'].append("Connection limits configured")

        if keep_alive_enabled and connection_timeout > 30:
            result['vulnerable'] = True
            result['score'] -= 5
            result['findings'].append("Keep-alive timeout too long - vulnerable to slowloris")
        elif keep_alive_enabled:
            result['score'] += 5
            result['findings'].append("Keep-alive with reasonable timeout")
        else:
            result['score'] += 5
            result['findings'].append("Keep-alive disabled or properly configured")

        return result

    def _test_file_upload_limits(self, app_info: Dict[str, Any]) -> Dict[str, Any]:
        """Test file upload DoS protection.

        Reads ``app_info['file_uploads']`` (``total_size_limit_mb``,
        ``concurrent_uploads_limit``, ``storage_quota_gb``). The
        total-size check sets the base score; the other two adjust it
        (5 + 3 + 2 = 10 at best).
        """
        file_uploads = app_info.get('file_uploads', {})
        total_size_limit = file_uploads.get('total_size_limit_mb', 0)
        concurrent_uploads = file_uploads.get('concurrent_uploads_limit', 0)
        storage_quota = file_uploads.get('storage_quota_gb', 0)

        result = self._new_result('medium', 10)

        if total_size_limit == 0:
            result['vulnerable'] = True
            result['score'] = -10
            result['description'] = "No total upload size limit"
            result['findings'].append("Vulnerable to disk exhaustion from uploads")
        else:
            result['score'] = 5
            result['findings'].append("Total upload size limited")

        if concurrent_uploads == 0:
            result['vulnerable'] = True
            result['score'] -= 5
            result['findings'].append("No concurrent upload limits")
        else:
            result['score'] += 3
            result['findings'].append("Concurrent uploads limited")

        if storage_quota == 0:
            result['vulnerable'] = True
            result['score'] -= 5
            result['findings'].append("No storage quota configured")
        else:
            result['score'] += 2
            result['findings'].append("Storage quota configured")

        return result

    def _test_regex_safety(self, app_info: Dict[str, Any]) -> Dict[str, Any]:
        """Test regex against ReDoS attacks.

        Reads the boolean flags ``safe_engine``, ``timeout_enabled`` and
        ``complexity_limited`` from ``app_info['regex_config']``. The
        engine check sets the base score; the other two add to it
        (2 + 2 + 1 = 5 at best).
        """
        regex_config = app_info.get('regex_config', {})
        safe_regex_engine = regex_config.get('safe_engine', False)
        timeout_enabled = regex_config.get('timeout_enabled', False)
        complexity_limited = regex_config.get('complexity_limited', False)

        result = self._new_result('medium', 5)

        if not safe_regex_engine:
            result['vulnerable'] = True
            result['score'] = -5
            result['description'] = "Regex engine vulnerable to ReDoS"
            result['findings'].append("Vulnerable to regex denial of service attacks")
        else:
            result['score'] = 2
            result['findings'].append("Safe regex engine used")

        if timeout_enabled:
            result['score'] += 2
            result['findings'].append("Regex timeout configured")

        if complexity_limited:
            result['score'] += 1
            result['findings'].append("Regex complexity limited")

        return result

    def _assess_mitigations(self, app_info: Dict[str, Any]) -> List[Dict[str, Any]]:
        """Assess existing DoS mitigations.

        Returns one entry per top-level flag of ``app_info`` that is
        truthy, in the fixed order CDN, load balancer, firewall, caching.
        """
        # (config flag, mitigation type, effectiveness, description)
        known_mitigations = [
            ('cdn_enabled', 'cdn', 'high', 'CDN can absorb DDoS attacks'),
            ('load_balancer', 'load_balancer', 'medium', 'Load balancer distributes traffic'),
            ('firewall', 'firewall', 'medium', 'Firewall can block malicious traffic'),
            ('caching_enabled', 'caching', 'low', 'Caching reduces server load'),
        ]

        return [
            {
                'type': mitigation_type,
                'effectiveness': effectiveness,
                'description': description
            }
            for flag, mitigation_type, effectiveness, description in known_mitigations
            if app_info.get(flag, False)
        ]

    def _calculate_risk_level(self, score: int) -> str:
        """Calculate DoS risk level.

        Higher scores mean better protection: >= 80 is ``'low'`` risk,
        >= 60 ``'medium'``, >= 40 ``'high'``, anything else
        ``'critical'``.
        """
        if score >= 80:
            return 'low'
        elif score >= 60:
            return 'medium'
        elif score >= 40:
            return 'high'
        else:
            return 'critical'

    def _generate_recommendations(self, assessment: Dict[str, Any]) -> List[str]:
        """Generate DoS mitigation recommendations.

        Args:
            assessment: Assessment dictionary; only its
                ``'vulnerabilities_found'`` list (entries with a
                ``'test_type'`` key) is read.

        Returns:
            De-duplicated recommendations in a stable order: the advice
            for each failed test (in the order below), followed by the
            general recommendations.
        """
        recommendations = []
        vulnerabilities = assessment.get('vulnerabilities_found', [])

        vulnerability_types = set(vuln['test_type'] for vuln in vulnerabilities)

        if 'rate_limiting' in vulnerability_types:
            recommendations.extend([
                "Implement request rate limiting per IP and per user",
                "Set reasonable connection limits",
                "Use progressive rate limiting (gradual increase in restrictions)",
                "Implement IP reputation-based filtering"
            ])

        if 'resource_limits' in vulnerability_types:
            recommendations.extend([
                "Configure memory limits for application processes",
                "Set CPU time limits for requests",
                "Implement file upload size restrictions",
                "Use containerization to limit resource usage"
            ])

        if 'input_validation' in vulnerability_types:
            recommendations.extend([
                "Set maximum request size limits",
                "Limit input field lengths",
                "Configure JSON depth and complexity limits",
                "Validate and sanitize all user input"
            ])

        if 'timeout_handling' in vulnerability_types:
            recommendations.extend([
                "Configure request timeouts (ideally under 30 seconds)",
                "Set connection and read timeouts",
                "Implement progressive timeouts",
                "Use circuit breakers for slow operations"
            ])

        if 'memory_management' in vulnerability_types:
            recommendations.extend([
                "Enable streaming for large data processing",
                "Limit buffer sizes",
                "Optimize garbage collection",
                "Use memory-efficient algorithms"
            ])

        if 'connection_handling' in vulnerability_types:
            recommendations.extend([
                "Set maximum connection limits",
                "Configure keep-alive timeouts",
                "Use connection pooling",
                "Implement connection rate limiting"
            ])

        if 'file_upload_limits' in vulnerability_types:
            recommendations.extend([
                "Set total upload size limits per session",
                "Limit concurrent file uploads",
                "Configure storage quotas per user",
                "Implement file type and content validation"
            ])

        if 'regex_safety' in vulnerability_types:
            recommendations.extend([
                "Use ReDoS-safe regex engines",
                "Implement regex execution timeouts",
                "Limit regex complexity",
                "Pre-validate regex patterns for safety"
            ])

        # General recommendations
        recommendations.extend([
            "Deploy behind a CDN for DDoS protection",
            "Use web application firewalls (WAF)",
            "Implement proper load balancing",
            "Enable application caching",
            "Set up monitoring and alerting for resource usage",
            "Conduct regular DoS testing",
            "Create incident response plans for DoS attacks",
            "Consider cloud-based DDoS protection services"
        ])

        # Remove duplicates while keeping insertion order; a plain set()
        # would make the order (and any "first N" slice) vary between runs.
        return list(dict.fromkeys(recommendations))

    def generate_dos_test_scenarios(self) -> List[Dict[str, Any]]:
        """Generate test scenarios for DoS testing.

        Returns a fixed list of scenario descriptions; nothing is
        executed.
        """
        scenarios = [
            {
                'name': 'Large Request Payload',
                'description': 'Test handling of extremely large HTTP requests',
                'method': 'POST',
                'payload_type': 'large_request',
                'expected_behavior': 'Request should be rejected or size-limited',
                'risk_level': 'medium'
            },
            {
                'name': 'Slowloris Attack Simulation',
                'description': 'Test slow HTTP header sending',
                'method': 'GET',
                'payload_type': 'slow_headers',
                'expected_behavior': 'Connection should timeout',
                'risk_level': 'high'
            },
            {
                'name': 'JSON Depth Bomb',
                'description': 'Test deeply nested JSON objects',
                'method': 'POST',
                'payload_type': 'deep_json',
                'expected_behavior': 'Should reject overly complex JSON',
                'risk_level': 'medium'
            },
            {
                'name': 'Regex Denial of Service',
                'description': 'Test complex regex patterns',
                'method': 'POST',
                'payload_type': 'complex_regex',
                'expected_behavior': 'Should timeout or reject quickly',
                'risk_level': 'medium'
            },
            {
                'name': 'Connection Flooding',
                'description': 'Test maximum connection handling',
                'method': 'CONNECTION_FLOOD',
                'payload_type': 'connection_flood',
                'expected_behavior': 'Should limit concurrent connections',
                'risk_level': 'high'
            },
            {
                'name': 'File Upload Bomb',
                'description': 'Test large file upload handling',
                'method': 'POST',
                'payload_type': 'file_upload',
                'expected_behavior': 'Should limit file size and count',
                'risk_level': 'medium'
            }
        ]

        return scenarios

    def simulate_dos_attack(self, attack_type: str, target_config: Dict[str, Any]) -> Dict[str, Any]:
        """Simulate a DoS attack for testing purposes.

        No traffic is generated; the outcome is derived from flags in
        ``target_config`` only.

        Args:
            attack_type: ``'large_request'``, ``'connection_flood'`` or
                ``'slowloris'``.
            target_config: Flat dictionary read directly for
                ``max_request_size_mb`` / ``size_validation_enabled``,
                ``max_connections`` / ``connection_limiting_enabled`` or
                ``connection_timeout_seconds`` depending on the attack.

        Returns:
            The simulation record, including ``vulnerability_detected``
            and an ``impact_assessment``.
        """
        simulation = {
            'attack_type': attack_type,
            'timestamp': datetime.datetime.now().isoformat(),
            'target_config': target_config,
            'simulation_results': {},
            'vulnerability_detected': False,
            'impact_assessment': {}
        }

        # Simulate different attack types
        # NOTE(review): any other attack_type falls through with empty
        # results and is reported as not vulnerable / "protected".
        if attack_type == 'large_request':
            max_size = target_config.get('max_request_size_mb', 10) * 1024 * 1024
            attack_size = max_size * 2  # Double the limit

            simulation['simulation_results'] = {
                'attack_payload_size': attack_size,
                'max_allowed_size': max_size,
                'should_be_blocked': attack_size > max_size,
                'simulation_blocked': target_config.get('size_validation_enabled', False)
            }

            simulation['vulnerability_detected'] = not simulation['simulation_results']['simulation_blocked']

        elif attack_type == 'connection_flood':
            max_connections = target_config.get('max_connections', 100)
            attack_connections = max_connections * 2

            simulation['simulation_results'] = {
                'attack_connections': attack_connections,
                'max_allowed_connections': max_connections,
                'should_be_limited': attack_connections > max_connections,
                'simulation_limited': target_config.get('connection_limiting_enabled', False)
            }

            simulation['vulnerability_detected'] = not simulation['simulation_results']['simulation_limited']

        elif attack_type == 'slowloris':
            timeout = target_config.get('connection_timeout_seconds', 30)

            simulation['simulation_results'] = {
                'attack_method': 'slow_header_sending',
                'timeout_threshold': timeout,
                'should_timeout': True,
                'timeout_configured': timeout > 0
            }

            simulation['vulnerability_detected'] = not simulation['simulation_results']['timeout_configured']

        # Generate impact assessment
        if simulation['vulnerability_detected']:
            simulation['impact_assessment'] = {
                'service_availability': 'high_risk',
                'resource_consumption': 'high',
                'user_experience': 'severely_degraded',
                'recovery_time': 'manual_intervention_required'
            }
        else:
            simulation['impact_assessment'] = {
                'service_availability': 'protected',
                'resource_consumption': 'controlled',
                'user_experience': 'minimal_impact',
                'recovery_time': 'automatic'
            }

        return simulation


def main():
    """Test the DoS vulnerability tester.

    Runs the assessment, scenario generation and three attack
    simulations against a sample configuration, prints a summary and
    returns the collected results.
    """
    tester = DOSVulnerabilityTester()

    # Sample application configuration
    app_config = {
        'rate_limits': {
            'requests_per_minute': 5000,
            'concurrent_connections': 1500
        },
        'resource_limits': {
            'max_memory_mb': 2048,
            'max_cpu_time_seconds': 60,
            'max_file_size_mb': 200
        },
        'input_validation': {
            'max_request_size_mb': 100,
            'max_field_length': 10000,
            'json_depth_limit': 0  # Not configured
        },
        'timeouts': {
            'request_timeout_seconds': 180,  # Too long
            'connection_timeout_seconds': 60,
            'read_timeout_seconds': 45
        },
        'memory_management': {
            'streaming_enabled': False,  # Not configured
            'buffering_limited': True,
            'garbage_collection_optimized': True
        },
        'connection_handling': {
            'keep_alive_enabled': True,
            'max_connections': 1500,
            'connection_timeout_seconds': 60
        },
        'file_uploads': {
            'total_size_limit_mb': 500,
            'concurrent_uploads_limit': 0,  # Not configured
            'storage_quota_gb': 50
        },
        'regex_config': {
            'safe_engine': False,  # Vulnerable
            'timeout_enabled': False,
            'complexity_limited': False
        },
        'cdn_enabled': False,
        'load_balancer': True,
        'firewall': False,
        'caching_enabled': True
    }

    # Run DoS resilience assessment
    assessment = tester.assess_dos_resilience(app_config)

    # Generate test scenarios
    test_scenarios = tester.generate_dos_test_scenarios()

    # Simulate some attacks
    # NOTE(review): simulate_dos_attack reads its keys from the top level
    # of the config, while app_config nests them, so these runs use the
    # simulator's defaults.
    attack_simulations = [
        tester.simulate_dos_attack('large_request', app_config),
        tester.simulate_dos_attack('connection_flood', app_config),
        tester.simulate_dos_attack('slowloris', app_config)
    ]

    results = {
        'assessment_completed': True,
        'dos_assessment': assessment,
        'test_scenarios': test_scenarios,
        'attack_simulations': attack_simulations,
        'overall_summary': {
            'overall_score': assessment['overall_score'],
            'risk_level': assessment['risk_level'],
            'vulnerabilities_count': len(assessment['vulnerabilities_found']),
            'mitigations_count': len(assessment['mitigations_assessed']),
            'test_scenarios_count': len(test_scenarios)
        }
    }

    print("DoS Vulnerability Assessment Results:")
    print("=" * 50)
    print(f"Overall Score: {assessment['overall_score']}")
    print(f"Risk Level: {assessment['risk_level']}")
    print(f"Vulnerabilities Found: {len(assessment['vulnerabilities_found'])}")
    print(f"Mitigations Assessed: {len(assessment['mitigations_assessed'])}")
    print(f"Test Scenarios Generated: {len(test_scenarios)}")

    print("\nVulnerabilities:")
    for vuln in assessment['vulnerabilities_found']:
        print(f"  - {vuln['severity']}: {vuln['description']}")

    print("\nTop Recommendations (first 5):")
    for rec in assessment['recommendations'][:5]:
        print(f"  - {rec}")

    return {
        'test_completed': True,
        'results': results
    }


if __name__ == "__main__":
    main()