""" API Security Testing Framework - Authentication & Authorization Author: Security Testing Framework Version: 1.0 """ import json import hashlib import datetime import re import base64 import math from typing import Dict, List, Optional, Any, Union class APISecurityTester: """Test framework for API authentication and authorization vulnerabilities""" def __init__(self): self.vulnerabilities = [] self.auth_test_results = [] def test_authentication_bypass(self, endpoints: List[Dict]) -> Dict[str, Any]: """Test for authentication bypass vulnerabilities""" auth_bypasses = [] for endpoint in endpoints: url = endpoint.get('url', '') method = endpoint.get('method', 'GET') # Test missing authentication bypass_result = self._test_missing_auth(endpoint) if bypass_result['vulnerable']: auth_bypasses.append(bypass_result) # Test weak token validation weak_token_result = self._test_weak_tokens(endpoint) if weak_token_result['vulnerable']: auth_bypasses.append(weak_token_result) # Test hardcoded credentials hardcoded_result = self._test_hardcoded_credentials(endpoint) if hardcoded_result['vulnerable']: auth_bypasses.append(hardcoded_result) return { 'authentication_bypasses': len(auth_bypasses), 'vulnerabilities': auth_bypasses } def test_authorization_flaws(self, endpoints: List[Dict]) -> Dict[str, Any]: """Test for authorization and access control flaws""" auth_flaws = [] for endpoint in endpoints: # Test privilege escalation priv_result = self._test_privilege_escalation(endpoint) if priv_result['vulnerable']: auth_flaws.append(priv_result) # Test direct object reference dor_result = self._test_direct_object_reference(endpoint) if dor_result['vulnerable']: auth_flaws.append(dor_result) # Test endpoint enumeration enum_result = self._test_endpoint_enumeration(endpoint) if enum_result['vulnerable']: auth_flaws.append(enum_result) return { 'authorization_flaws': len(auth_flaws), 'vulnerabilities': auth_flaws } def _test_missing_auth(self, endpoint: Dict) -> Dict[str, Any]: """Test if endpoint works without authentication""" url = endpoint.get('url', '') method = endpoint.get('method', 'GET') # Check for sensitive endpoints sensitive_patterns = [ r'/admin', r'/api/user', r'/api/transfer', r'/api/delete', r'/api/update' ] vulnerable = False for pattern in sensitive_patterns: if re.search(pattern, url, re.IGNORECASE): # Simulate unauthenticated access if endpoint.get('requires_auth', True): vulnerable = True break return { 'endpoint': url, 'method': method, 'vulnerability_type': 'Missing Authentication', 'vulnerable': vulnerable, 'severity': 'Critical' if vulnerable else 'Low', 'description': f'Endpoint {url} may be accessible without authentication', 'recommendation': 'Implement proper authentication middleware' } def _test_weak_tokens(self, endpoint: Dict) -> Dict[str, Any]: """Test for weak JWT token validation""" url = endpoint.get('url', '') method = endpoint.get('method', 'GET') # Simulate weak token scenarios weak_token_scenarios = [ 'none', # No algorithm '', # Empty token '12345', # Simple numeric token 'admin', # Admin as token 'eyJ0eXAiOiJKV1QiLCJhbGciOiJub25lIn0.eyJzdWIiOiIxMjM0NTY3ODkwIn0.' # alg=none ] vulnerable = False for token in weak_token_scenarios: if endpoint.get('token_validation') == 'weak': vulnerable = True break return { 'endpoint': url, 'method': method, 'vulnerability_type': 'Weak Token Validation', 'vulnerable': vulnerable, 'severity': 'High' if vulnerable else 'Low', 'description': 'API accepts weak or malformed tokens', 'recommendation': 'Implement proper JWT validation with strong algorithms' } def _test_hardcoded_credentials(self, endpoint: Dict) -> Dict[str, Any]: """Test for hardcoded credentials in API""" url = endpoint.get('url', '') # Common hardcoded credential patterns hardcoded_patterns = [ r'api_key\s*=\s*["\'][^"\']+["\']', r'password\s*=\s*["\'][^"\']+["\']', r'secret\s*=\s*["\'][^"\']+["\']', r'token\s*=\s*["\'][^"\']+["\']' ] vulnerable = False if endpoint.get('has_hardcoded_creds', False): vulnerable = True return { 'endpoint': url, 'vulnerability_type': 'Hardcoded Credentials', 'vulnerable': vulnerable, 'severity': 'Critical' if vulnerable else 'Low', 'description': 'Hardcoded credentials found in endpoint implementation', 'recommendation': 'Remove hardcoded credentials and use environment variables' } def _test_privilege_escalation(self, endpoint: Dict) -> Dict[str, Any]: """Test for privilege escalation vulnerabilities""" url = endpoint.get('url', '') # Test privilege escalation scenarios escalation_payloads = [ {'role': 'admin'}, {'role': 'administrator'}, {'role': 'root'}, {'privileges': 'all'}, {'is_admin': True} ] vulnerable = False if endpoint.get('privilege_escalation_possible', False): vulnerable = True return { 'endpoint': url, 'vulnerability_type': 'Privilege Escalation', 'vulnerable': vulnerable, 'severity': 'High' if vulnerable else 'Low', 'description': 'User can escalate privileges through API parameters', 'recommendation': 'Implement proper role-based access control' } def _test_direct_object_reference(self, endpoint: Dict) -> Dict[str, Any]: """Test for insecure direct object references""" url = endpoint.get('url', '') # Test IDOR patterns idor_patterns = [ r'/user/(\d+)', r'/file/(\d+)', r'/order/(\d+)', r'/account/(\d+)' ] vulnerable = False for pattern in idor_patterns: if re.search(pattern, url): if endpoint.get('idor_vulnerable', False): vulnerable = True break return { 'endpoint': url, 'vulnerability_type': 'Insecure Direct Object Reference', 'vulnerable': vulnerable, 'severity': 'High' if vulnerable else 'Low', 'description': 'Users can access resources they don\'t own', 'recommendation': 'Implement proper object-level authorization' } def _test_endpoint_enumeration(self, endpoint: Dict) -> Dict[str, Any]: """Test for endpoint enumeration vulnerabilities""" url = endpoint.get('url', '') # Common API endpoints to enumerate enum_endpoints = [ '/api/v1/users', '/api/v1/admin', '/api/v1/config', '/api/v1/secrets', '/api/docs', '/api/swagger' ] vulnerable = False if endpoint.get('enumeration_possible', False): vulnerable = True return { 'endpoint': url, 'vulnerability_type': 'Endpoint Enumeration', 'vulnerable': vulnerable, 'severity': 'Medium' if vulnerable else 'Low', 'description': 'API endpoints can be discovered and accessed', 'recommendation': 'Implement API discovery protection and authentication' } def generate_security_report(self) -> Dict[str, Any]: """Generate API security testing report""" return { 'test_summary': { 'total_endpoints_tested': len(self.auth_test_results), 'vulnerabilities_found': len(self.vulnerabilities), 'test_completion_time': datetime.datetime.now().isoformat() }, 'vulnerability_breakdown': { 'critical': len([v for v in self.vulnerabilities if v.get('severity') == 'Critical']), 'high': len([v for v in self.vulnerabilities if v.get('severity') == 'High']), 'medium': len([v for v in self.vulnerabilities if v.get('severity') == 'Medium']), 'low': len([v for v in self.vulnerabilities if v.get('severity') == 'Low']) }, 'vulnerabilities': self.vulnerabilities, 'recommendations': self._generate_recommendations() } def _generate_recommendations(self) -> List[str]: """Generate security recommendations based on findings""" recommendations = [] vuln_types = [v.get('vulnerability_type', '') for v in self.vulnerabilities] if 'Missing Authentication' in vuln_types: recommendations.append('Implement authentication middleware for all sensitive endpoints') if 'Weak Token Validation' in vuln_types: recommendations.append('Use strong JWT algorithms (RS256, ES256) and proper validation') if 'Hardcoded Credentials' in vuln_types: recommendations.append('Remove hardcoded credentials and use secure key management') if 'Privilege Escalation' in vuln_types: recommendations.append('Implement strict role-based access control with least privilege') if 'Insecure Direct Object Reference' in vuln_types: recommendations.append('Add object-level authorization checks') if 'Endpoint Enumeration' in vuln_types: recommendations.append('Implement API discovery protection and authentication') return recommendations # Sample API endpoints for security testing TEST_ENDPOINTS = [ { 'url': '/api/v1/user/profile', 'method': 'GET', 'requires_auth': True, 'token_validation': 'weak', 'has_hardcoded_creds': False }, { 'url': '/api/v1/admin/users', 'method': 'GET', 'requires_auth': True, 'privilege_escalation_possible': True, 'has_hardcoded_creds': True }, { 'url': '/api/v1/transfer/funds', 'method': 'POST', 'requires_auth': True, 'idor_vulnerable': True }, { 'url': '/api/v1/documents/123', 'method': 'GET', 'requires_auth': True, 'idor_vulnerable': True }, { 'url': '/api/v1/config/settings', 'method': 'GET', 'enumeration_possible': True, 'has_hardcoded_creds': True } ] if __name__ == "__main__": # Run API security tests tester = APISecurityTester() # Test authentication bypasses auth_results = tester.test_authentication_bypass(TEST_ENDPOINTS) tester.vulnerabilities.extend(auth_results['vulnerabilities']) # Test authorization flaws authz_results = tester.test_authorization_flaws(TEST_ENDPOINTS) tester.vulnerabilities.extend(authz_results['vulnerabilities']) # Generate report report = tester.generate_security_report() print("API Security Testing Complete") print(f"Vulnerabilities Found: {len(report['vulnerabilities'])}") print(f"Critical: {report['vulnerability_breakdown']['critical']}") print(f"High: {report['vulnerability_breakdown']['high']}") print(json.dumps(report, indent=2))