#!/usr/bin/env python3 """ API Security Testing Framework Tests API endpoints for authentication, authorization, and input validation vulnerabilities Author: Security Engineer Purpose: Comprehensive API security assessment """ import json import math import hashlib import datetime import re import base64 import itertools import collections import html import urllib.parse from typing import Dict, List, Optional, Any, Union class APISecurityTestingFramework: """Comprehensive API security testing tool""" def __init__(self): self.vulnerabilities = [] self.test_results = {} self.assessment_id = hashlib.sha256( datetime.datetime.now().isoformat().encode() ).hexdigest()[:16] def test_authentication_endpoints(self, api_config: Dict) -> Dict: """Test API authentication endpoints for vulnerabilities""" results = { "authentication_tests": [], "vulnerabilities_found": [], "bypass_techniques": [] } # Test 1: Authentication bypass via SQL injection sql_bypass = self._test_auth_sql_injection(api_config.get("login_endpoint", {})) results["authentication_tests"].append(sql_bypass) # Test 2: Brute force protection brute_force = self._test_brute_force_protection(api_config.get("auth_protection", {})) results["authentication_tests"].append(brute_force) # Test 3: Token manipulation token_tests = self._test_token_manipulation(api_config.get("token_config", {})) results["authentication_tests"].append(token_tests) # Test 4: Password reset abuse password_reset = self._test_password_reset_abuse(api_config.get("password_reset", {})) results["authentication_tests"].append(password_reset) # Test 5: Session fixation session_tests = self._test_session_fixation(api_config.get("session_config", {})) results["authentication_tests"].append(session_tests) # Collect vulnerabilities for test in results["authentication_tests"]: if test.get("vulnerable", False): results["vulnerabilities_found"].append({ "type": test.get("type", "authentication"), "severity": test.get("severity", "high"), "description": test.get("description", "Authentication vulnerability"), "endpoint": test.get("endpoint", "/auth/login"), "bypass_method": test.get("bypass_method", "unknown") }) if test.get("bypass_method"): results["bypass_techniques"].append(test["bypass_method"]) return results def test_authorization_controls(self, authz_config: Dict) -> Dict: """Test API authorization controls""" results = { "authorization_tests": [], "vulnerabilities_found": [], "privilege_escalation_vectors": [] } # Test 1: Horizontal privilege escalation horizontal_tests = self._test_horizontal_privilege_escalation(authz_config.get("horizontal", {})) results["authorization_tests"].append(horizontal_tests) # Test 2: Vertical privilege escalation vertical_tests = self._test_vertical_privilege_escalation(authz_config.get("vertical", {})) results["authorization_tests"].append(vertical_tests) # Test 3: IDOR vulnerabilities idor_tests = self._test_idor_vulnerabilities(authz_config.get("idor", {})) results["authorization_tests"].append(idor_tests) # Test 4: Resource enumeration enumeration_tests = self._test_resource_enumeration(authz_config.get("enumeration", {})) results["authorization_tests"].append(enumeration_tests) # Test 5: Parameter pollution pollution_tests = self._test_parameter_pollution(authz_config.get("parameter_pollution", {})) results["authorization_tests"].append(pollution_tests) # Collect vulnerabilities for test in results["authorization_tests"]: if test.get("vulnerable", False): results["vulnerabilities_found"].append({ "type": test.get("type", "authorization"), "severity": test.get("severity", "high"), "description": test.get("description", "Authorization vulnerability"), "endpoint": test.get("endpoint", "/api/resource"), "escalation_type": test.get("escalation_type", "unknown") }) if test.get("escalation_type"): results["privilege_escalation_vectors"].append(test["escalation_type"]) return results def test_input_validation(self, validation_config: Dict) -> Dict: """Test API input validation""" results = { "validation_tests": [], "vulnerabilities_found": [], "injection_vectors": [] } # Test 1: SQL injection sql_tests = self._test_api_sql_injection(validation_config.get("sql_injection", {})) results["validation_tests"].append(sql_tests) # Test 2: NoSQL injection nosql_tests = self._test_nosql_injection(validation_config.get("nosql_injection", {})) results["validation_tests"].append(nosql_tests) # Test 3: Command injection command_tests = self._test_command_injection(validation_config.get("command_injection", {})) results["validation_tests"].append(command_tests) # Test 4: XSS in API responses xss_tests = self._test_api_xss(validation_config.get("xss", {})) results["validation_tests"].append(xss_tests) # Test 5: XML/XXE injection xxe_tests = self._test_xxe_injection(validation_config.get("xxe", {})) results["validation_tests"].append(xxe_tests) # Test 6: Deserialization attacks deserialization_tests = self._test_deserialization_attacks(validation_config.get("deserialization", {})) results["validation_tests"].append(deserialization_tests) # Collect vulnerabilities for test in results["validation_tests"]: if test.get("vulnerable", False): results["vulnerabilities_found"].append({ "type": test.get("type", "input_validation"), "severity": test.get("severity", "high"), "description": test.get("description", "Input validation vulnerability"), "endpoint": test.get("endpoint", "/api/endpoint"), "injection_type": test.get("injection_type", "unknown") }) if test.get("injection_type"): results["injection_vectors"].append(test["injection_type"]) return results def test_rate_limiting(self, rate_limit_config: Dict) -> Dict: """Test API rate limiting and throttling""" results = { "rate_limiting_tests": [], "vulnerabilities_found": [], "bypass_methods": [] } # Test 1: Request flooding flood_test = self._test_request_flooding(rate_limit_config.get("request_flooding", {})) results["rate_limiting_tests"].append(flood_test) # Test 2: Concurrent connections concurrent_test = self._test_concurrent_connections(rate_limit_config.get("concurrent_connections", {})) results["rate_limiting_tests"].append(concurrent_test) # Test 3: Distributed attacks distributed_test = self._test_distributed_attacks(rate_limit_config.get("distributed_attacks", {})) results["rate_limiting_tests"].append(distributed_test) # Test 4: Rate limit bypass bypass_test = self._test_rate_limit_bypass(rate_limit_config.get("bypass_protection", {})) results["rate_limiting_tests"].append(bypass_test) # Collect vulnerabilities for test in results["rate_limiting_tests"]: if test.get("vulnerable", False): results["vulnerabilities_found"].append({ "type": test.get("type", "rate_limiting"), "severity": test.get("severity", "medium"), "description": test.get("description", "Rate limiting vulnerability"), "endpoint": test.get("endpoint", "/api/endpoint") }) if test.get("bypass_method"): results["bypass_methods"].append(test["bypass_method"]) return results def test_data_exposure(self, exposure_config: Dict) -> Dict: """Test for data exposure and information leakage""" results = { "exposure_tests": [], "vulnerabilities_found": [], "sensitive_data_types": [] } # Test 1: Sensitive data in responses sensitive_test = self._test_sensitive_data_exposure(exposure_config.get("sensitive_data", {})) results["exposure_tests"].append(sensitive_test) # Test 2: Error message leakage error_test = self._test_error_message_leakage(exposure_config.get("error_handling", {})) results["exposure_tests"].append(error_test) # Test 3: Debug information debug_test = self._test_debug_information(exposure_config.get("debug_mode", {})) results["exposure_tests"].append(debug_test) # Test 4: API documentation exposure docs_test = self._test_api_documentation_exposure(exposure_config.get("api_docs", {})) results["exposure_tests"].append(docs_test) # Test 5: Backup file exposure backup_test = self._test_backup_file_exposure(exposure_config.get("backup_files", {})) results["exposure_tests"].append(backup_test) # Collect vulnerabilities for test in results["exposure_tests"]: if test.get("vulnerable", False): results["vulnerabilities_found"].append({ "type": test.get("type", "data_exposure"), "severity": test.get("severity", "medium"), "description": test.get("description", "Data exposure vulnerability"), "endpoint": test.get("endpoint", "/api/endpoint"), "data_type": test.get("data_type", "unknown") }) if test.get("data_type"): results["sensitive_data_types"].append(test["data_type"]) return results def test_cors_csp_configuration(self, security_config: Dict) -> Dict: """Test CORS and CSP security configurations""" results = { "security_tests": [], "vulnerabilities_found": [], "misconfigurations": [] } # Test 1: CORS policy cors_test = self._test_cors_policy(security_config.get("cors", {})) results["security_tests"].append(cors_test) # Test 2: CSP headers csp_test = self._test_csp_headers(security_config.get("csp", {})) results["security_tests"].append(csp_test) # Test 3: Security headers headers_test = self._test_security_headers(security_config.get("security_headers", {})) results["security_tests"].append(headers_test) # Test 4: HTTPS enforcement https_test = self._test_https_enforcement(security_config.get("https", {})) results["security_tests"].append(https_test) # Test 5: Cross-site request forgery csrf_test = self._test_csrf_protection(security_config.get("csrf", {})) results["security_tests"].append(csrf_test) # Collect vulnerabilities for test in results["security_tests"]: if test.get("vulnerable", False): results["vulnerabilities_found"].append({ "type": test.get("type", "security_misconfiguration"), "severity": test.get("severity", "medium"), "description": test.get("description", "Security misconfiguration"), "header": test.get("header", "unknown") }) if test.get("misconfiguration"): results["misconfigurations"].append(test["misconfiguration"]) return results def _test_auth_sql_injection(self, login_config: Dict) -> Dict: """Test SQL injection in authentication""" sql_payloads = [ "' OR '1'='1", "admin'--", "' OR 'x'='x", "1' UNION SELECT 'admin','password'--" ] parameterized_queries = login_config.get("parameterized_queries", False) input_validation = login_config.get("input_validation", True) vulnerable = not parameterized_queries or not input_validation return { "type": "authentication_sql_injection", "vulnerable": vulnerable, "severity": "critical" if vulnerable else "none", "description": "SQL injection in login endpoint allows authentication bypass", "endpoint": "/auth/login", "bypass_method": "sql_injection", "payload": sql_payloads[0] if vulnerable else "" } def _test_brute_force_protection(self, auth_protection: Dict) -> Dict: """Test brute force protection""" rate_limiting = auth_protection.get("rate_limiting", False) account_lockout = auth_protection.get("account_lockout", False) ip_blocking = auth_protection.get("ip_blocking", False) vulnerable = not (rate_limiting and account_lockout) return { "type": "brute_force_vulnerability", "vulnerable": vulnerable, "severity": "high" if vulnerable else "none", "description": "No protection against brute force attacks", "endpoint": "/auth/login", "bypass_method": "brute_force" } def _test_token_manipulation(self, token_config: Dict) -> Dict: """Test token manipulation vulnerabilities""" token_encryption = token_config.get("encryption", False) token_signature = token_config.get("signature_verification", True) token_expiry = token_config.get("expiry_check", True) vulnerable = not (token_encryption and token_signature and token_expiry) return { "type": "token_manipulation", "vulnerable": vulnerable, "severity": "high" if vulnerable else "none", "description": "Authentication tokens can be manipulated", "endpoint": "/auth/verify", "bypass_method": "token_manipulation" } def _test_password_reset_abuse(self, password_reset: Dict) -> Dict: """Test password reset abuse""" rate_limiting = password_reset.get("rate_limiting", False) token_validation = password_reset.get("token_validation", True) secure_token = password_reset.get("secure_token_generation", True) vulnerable = not (rate_limiting and token_validation and secure_token) return { "type": "password_reset_abuse", "vulnerable": vulnerable, "severity": "medium" if vulnerable else "none", "description": "Password reset functionality can be abused", "endpoint": "/auth/reset-password", "bypass_method": "password_reset_abuse" } def _test_session_fixation(self, session_config: Dict) -> Dict: """Test session fixation vulnerabilities""" session_regeneration = session_config.get("regeneration_on_login", True) secure_cookies = session_config.get("secure_cookies", False) http_only = session_config.get("http_only", False) vulnerable = not (session_regeneration and secure_cookies and http_only) return { "type": "session_fixation", "vulnerable": vulnerable, "severity": "medium" if vulnerable else "none", "description": "Session fixation vulnerabilities exist", "endpoint": "/auth/login", "bypass_method": "session_fixation" } def _test_horizontal_privilege_escalation(self, horizontal_config: Dict) -> Dict: """Test horizontal privilege escalation""" ownership_validation = horizontal_config.get("ownership_validation", True) user_isolation = horizontal_config.get("user_isolation", True) resource_access_control = horizontal_config.get("resource_access_control", True) vulnerable = not (ownership_validation and user_isolation and resource_access_control) return { "type": "horizontal_privilege_escalation", "vulnerable": vulnerable, "severity": "high" if vulnerable else "none", "description": "Users can access other users' resources", "endpoint": "/api/users/{id}/profile", "escalation_type": "horizontal" } def _test_vertical_privilege_escalation(self, vertical_config: Dict) -> Dict: """Test vertical privilege escalation""" role_validation = vertical_config.get("role_validation", True) permission_checking = vertical_config.get("permission_checking", True) admin_endpoint_protection = vertical_config.get("admin_endpoint_protection", True) vulnerable = not (role_validation and permission_checking and admin_endpoint_protection) return { "type": "vertical_privilege_escalation", "vulnerable": vulnerable, "severity": "critical" if vulnerable else "none", "description": "Regular users can access admin functionality", "endpoint": "/api/admin/users", "escalation_type": "vertical" } def _test_idor_vulnerabilities(self, idor_config: Dict) -> Dict: """Test Insecure Direct Object Reference vulnerabilities""" object_ownership_check = idor_config.get("object_ownership_check", True) id_validation = idor_config.get("id_validation", True) access_control = idor_config.get("access_control", True) vulnerable = not (object_ownership_check and id_validation and access_control) return { "type": "idor", "vulnerable": vulnerable, "severity": "high" if vulnerable else "none", "description": "Insecure direct object reference vulnerabilities", "endpoint": "/api/orders/{id}", "escalation_type": "idor" } def _test_resource_enumeration(self, enumeration_config: Dict) -> Dict: """Test resource enumeration vulnerabilities""" sequential_ids = enumeration_config.get("sequential_ids", True) resource_discovery_protection = enumeration_config.get("resource_discovery_protection", False) access_checking = enumeration_config.get("access_checking", True) vulnerable = sequential_ids and not resource_discovery_protection return { "type": "resource_enumeration", "vulnerable": vulnerable, "severity": "medium" if vulnerable else "none", "description": "Resources can be enumerated", "endpoint": "/api/resources/{id}", "escalation_type": "enumeration" } def _test_parameter_pollution(self, pollution_config: Dict) -> Dict: """Test parameter pollution vulnerabilities""" parameter_validation = pollution_config.get("parameter_validation", True) duplicate_parameter_handling = pollution_config.get("duplicate_parameter_handling", True) input_sanitization = pollution_config.get("input_sanitization", True) vulnerable = not (parameter_validation and duplicate_parameter_handling and input_sanitization) return { "type": "parameter_pollution", "vulnerable": vulnerable, "severity": "medium" if vulnerable else "none", "description": "Parameter pollution vulnerabilities", "endpoint": "/api/endpoint", "escalation_type": "parameter_pollution" } def _test_api_sql_injection(self, sql_config: Dict) -> Dict: """Test SQL injection in API endpoints""" parameterized_queries = sql_config.get("parameterized_queries", False) input_validation = sql_config.get("input_validation", True) orm_usage = sql_config.get("orm_usage", True) vulnerable = not (parameterized_queries and input_validation and orm_usage) return { "type": "sql_injection", "vulnerable": vulnerable, "severity": "critical" if vulnerable else "none", "description": "SQL injection vulnerability in API endpoint", "endpoint": "/api/data/search", "injection_type": "sql_injection" } def _test_nosql_injection(self, nosql_config: Dict) -> Dict: """Test NoSQL injection vulnerabilities""" input_validation = nosql_config.get("input_validation", True) query_sanitization = nosql_config.get("query_sanitization", True) object_deserialization = nosql_config.get("object_deserialization", False) vulnerable = not (input_validation and query_sanitization) or object_deserialization return { "type": "nosql_injection", "vulnerable": vulnerable, "severity": "high" if vulnerable else "none", "description": "NoSQL injection vulnerability", "endpoint": "/api/users/search", "injection_type": "nosql_injection" } def _test_command_injection(self, command_config: Dict) -> Dict: """Test command injection vulnerabilities""" input_validation = command_config.get("input_validation", True) shell_escape = command_config.get("shell_escape", True) whitelist_approach = command_config.get("whitelist_approach", True) vulnerable = not (input_validation and shell_escape and whitelist_approach) return { "type": "command_injection", "vulnerable": vulnerable, "severity": "critical" if vulnerable else "none", "description": "Command injection vulnerability", "endpoint": "/api/system/execute", "injection_type": "command_injection" } def _test_api_xss(self, xss_config: Dict) -> Dict: """Test XSS in API responses""" output_encoding = xss_config.get("output_encoding", True) input_sanitization = xss_config.get("input_sanitization", True) content_type_headers = xss_config.get("content_type_headers", True) vulnerable = not (output_encoding and input_sanitization and content_type_headers) return { "type": "xss", "vulnerable": vulnerable, "severity": "medium" if vulnerable else "none", "description": "XSS vulnerability in API responses", "endpoint": "/api/comments", "injection_type": "xss" } def _test_xxe_injection(self, xxe_config: Dict) -> Dict: """Test XML External Entity injection""" xml_parser_secure = xxe_config.get("xml_parser_secure", True) entity_validation = xxe_config.get("entity_validation", True) external_entity_resolution = xxe_config.get("external_entity_resolution", False) vulnerable = not xml_parser_secure or external_entity_resolution return { "type": "xxe", "vulnerable": vulnerable, "severity": "high" if vulnerable else "none", "description": "XXE injection vulnerability", "endpoint": "/api/xml/upload", "injection_type": "xxe" } def _test_deserialization_attacks(self, deserialization_config: Dict) -> Dict: """Test deserialization attacks""" input_validation = deserialization_config.get("input_validation", True) secure_deserialization = deserialization_config.get("secure_deserialization", True) type_safety = deserialization_config.get("type_safety", True) vulnerable = not (input_validation and secure_deserialization and type_safety) return { "type": "deserialization", "vulnerable": vulnerable, "severity": "critical" if vulnerable else "none", "description": "Insecure deserialization vulnerability", "endpoint": "/api/data/serialize", "injection_type": "deserialization" } def _test_request_flooding(self, flood_config: Dict) -> Dict: """Test request flooding protection""" rate_limiting = flood_config.get("rate_limiting", False) request_threshold = flood_config.get("request_threshold", 100) time_window = flood_config.get("time_window", 60) vulnerable = not rate_limiting or request_threshold > 1000 return { "type": "request_flooding", "vulnerable": vulnerable, "severity": "medium" if vulnerable else "none", "description": "No protection against request flooding", "endpoint": "/api/endpoint", "bypass_method": "high_volume_requests" } def _test_concurrent_connections(self, concurrent_config: Dict) -> Dict: """Test concurrent connection limits""" connection_limiting = concurrent_config.get("connection_limiting", False) max_connections = concurrent_config.get("max_connections", 1000) connection_timeout = concurrent_config.get("connection_timeout", 300) vulnerable = not connection_limiting or max_connections > 500 return { "type": "concurrent_connections", "vulnerable": vulnerable, "severity": "medium" if vulnerable else "none", "description": "No protection against excessive concurrent connections", "endpoint": "/api/endpoint", "bypass_method": "concurrent_connections" } def _test_distributed_attacks(self, distributed_config: Dict) -> Dict: """Test distributed attack protection""" ip_reputation = distributed_config.get("ip_reputation", False) geo_blocking = distributed_config.get("geo_blocking", False) behavior_analysis = distributed_config.get("behavior_analysis", False) vulnerable = not (ip_reputation or geo_blocking or behavior_analysis) return { "type": "distributed_attacks", "vulnerable": vulnerable, "severity": "low" if vulnerable else "none", "description": "No protection against distributed attacks", "endpoint": "/api/endpoint", "bypass_method": "distributed_attack" } def _test_rate_limit_bypass(self, bypass_config: Dict) -> Dict: """Test rate limit bypass protection""" header_validation = bypass_config.get("header_validation", False) ip_validation = bypass_config.get("ip_validation", True) user_identification = bypass_config.get("user_identification", False) vulnerable = not (header_validation and ip_validation and user_identification) return { "type": "rate_limit_bypass", "vulnerable": vulnerable, "severity": "medium" if vulnerable else "none", "description": "Rate limit can be bypassed", "endpoint": "/api/endpoint", "bypass_method": "header_manipulation" } def _test_sensitive_data_exposure(self, sensitive_config: Dict) -> Dict: """Test sensitive data exposure""" data_filtering = sensitive_config.get("data_filtering", True) pii_redaction = sensitive_config.get("pii_redaction", True) credit_card_masking = sensitive_config.get("credit_card_masking", True) vulnerable = not (data_filtering and pii_redaction and credit_card_masking) return { "type": "sensitive_data_exposure", "vulnerable": vulnerable, "severity": "high" if vulnerable else "none", "description": "Sensitive data exposed in API responses", "endpoint": "/api/users/profile", "data_type": "personal_information" } def _test_error_message_leakage(self, error_config: Dict) -> Dict: """Test error message information leakage""" generic_errors = error_config.get("generic_errors", True) stack_trace_suppression = error_config.get("stack_trace_suppression", True) debug_info_suppression = error_config.get("debug_info_suppression", True) vulnerable = not (generic_errors and stack_trace_suppression and debug_info_suppression) return { "type": "error_message_leakage", "vulnerable": vulnerable, "severity": "medium" if vulnerable else "none", "description": "Error messages leak sensitive information", "endpoint": "/api/endpoint", "data_type": "system_information" } def _test_debug_information(self, debug_config: Dict) -> Dict: """Test debug information exposure""" debug_mode = debug_config.get("debug_mode", False) error_details = debug_config.get("error_details", False) system_info = debug_config.get("system_info", False) vulnerable = debug_mode or error_details or system_info return { "type": "debug_information", "vulnerable": vulnerable, "severity": "medium" if vulnerable else "none", "description": "Debug information exposed", "endpoint": "/api/endpoint", "data_type": "debug_info" } def _test_api_documentation_exposure(self, docs_config: Dict) -> Dict: """Test API documentation exposure""" public_docs = docs_config.get("public_docs", True) auth_required = docs_config.get("auth_required", False) internal_docs_protection = docs_config.get("internal_docs_protection", True) vulnerable = public_docs and not auth_required return { "type": "api_documentation_exposure", "vulnerable": vulnerable, "severity": "low" if vulnerable else "none", "description": "API documentation exposed without authentication", "endpoint": "/api/docs", "data_type": "api_documentation" } def _test_backup_file_exposure(self, backup_config: Dict) -> Dict: """Test backup file exposure""" backup_protection = backup_config.get("backup_protection", True) access_restriction = backup_config.get("access_restriction", True) secure_location = backup_config.get("secure_location", True) vulnerable = not (backup_protection and access_restriction and secure_location) return { "type": "backup_file_exposure", "vulnerable": vulnerable, "severity": "medium" if vulnerable else "none", "description": "Backup files exposed", "endpoint": "/api/backup.sql", "data_type": "backup_files" } def _test_cors_policy(self, cors_config: Dict) -> Dict: """Test CORS policy configuration""" strict_origin = cors_config.get("strict_origin", False) credentials_allowed = cors_config.get("credentials_allowed", True) wildcard_origin = cors_config.get("wildcard_origin", True) vulnerable = wildcard_origin and credentials_allowed return { "type": "cors_misconfiguration", "vulnerable": vulnerable, "severity": "medium" if vulnerable else "none", "description": "CORS policy allows any origin with credentials", "header": "Access-Control-Allow-Origin", "misconfiguration": "permissive_cors" } def _test_csp_headers(self, csp_config: Dict) -> Dict: """Test Content Security Policy headers""" csp_enabled = csp_config.get("csp_enabled", False) strict_policy = csp_config.get("strict_policy", False) inline_scripts_blocked = csp_config.get("inline_scripts_blocked", True) vulnerable = not csp_enabled or not strict_policy return { "type": "csp_misconfiguration", "vulnerable": vulnerable, "severity": "medium" if vulnerable else "none", "description": "Content Security Policy missing or too permissive", "header": "Content-Security-Policy", "misconfiguration": "missing_csp" } def _test_security_headers(self, headers_config: Dict) -> Dict: """Test security headers configuration""" hsts_enabled = headers_config.get("hsts_enabled", False) x_frame_options = headers_config.get("x_frame_options", True) x_content_type_options = headers_config.get("x_content_type_options", True) vulnerable = not (hsts_enabled and x_frame_options and x_content_type_options) return { "type": "security_headers_missing", "vulnerable": vulnerable, "severity": "low" if vulnerable else "none", "description": "Important security headers missing", "header": "multiple", "misconfiguration": "missing_security_headers" } def _test_https_enforcement(self, https_config: Dict) -> Dict: """Test HTTPS enforcement""" https_only = https_config.get("https_only", False) hsts_enabled = https_config.get("hsts_enabled", False) mixed_content_blocked = https_config.get("mixed_content_blocked", True) vulnerable = not https_only return { "type": "https_not_enforced", "vulnerable": vulnerable, "severity": "medium" if vulnerable else "none", "description": "HTTPS not properly enforced", "header": "Strict-Transport-Security", "misconfiguration": "missing_https" } def _test_csrf_protection(self, csrf_config: Dict) -> Dict: """Test CSRF protection""" csrf_tokens = csrf_config.get("csrf_tokens", False) same_site_cookies = csrf_config.get("same_site_cookies", True) origin_validation = csrf_config.get("origin_validation", True) vulnerable = not (csrf_tokens or same_site_cookies or origin_validation) return { "type": "csrf_vulnerability", "vulnerable": vulnerable, "severity": "medium" if vulnerable else "none", "description": "CSRF protection missing", "header": "multiple", "misconfiguration": "missing_csrf_protection" } def generate_api_security_report(self, test_results: Dict) -> Dict: """Generate comprehensive API security report""" # Count vulnerabilities total_vulnerabilities = 0 critical_count = 0 high_count = 0 medium_count = 0 for test_category, results in test_results.items(): if "vulnerabilities_found" in results: total_vulnerabilities += len(results["vulnerabilities_found"]) for vuln in results["vulnerabilities_found"]: severity = vuln.get("severity", "low") if severity == "critical": critical_count += 1 elif severity == "high": high_count += 1 elif severity == "medium": medium_count += 1 # Calculate overall risk risk_level = "critical" if critical_count > 0 else "high" if high_count > 3 else "medium" if high_count > 0 or medium_count > 5 else "low" report = { "assessment_id": self.assessment_id, "assessment_date": datetime.datetime.now().isoformat(), "assessment_type": "api_security", "overall_risk_level": risk_level, "vulnerability_summary": { "total_vulnerabilities": total_vulnerabilities, "critical_vulnerabilities": critical_count, "high_vulnerabilities": high_count, "medium_vulnerabilities": medium_count, "vulnerability_categories": list(test_results.keys()) }, "detailed_results": test_results, "remediation_plan": self._generate_api_remediation_plan(test_results), "compliance_impact": self._assess_api_compliance(test_results) } return report def _generate_api_remediation_plan(self, test_results: Dict) -> List[Dict]: """Generate API-specific remediation plan""" priorities = [] for test_category, results in test_results.items(): if "vulnerabilities_found" in results: for vuln in results["vulnerabilities_found"]: priorities.append({ "category": test_category, "vulnerability_type": vuln.get("type", "unknown"), "severity": vuln.get("severity", "medium"), "endpoint": vuln.get("endpoint", "unknown"), "remediation": self._get_api_remediation(vuln.get("type", "unknown")), "timeline": self._get_remediation_timeline(vuln.get("severity", "medium")) }) # Sort by severity severity_order = {"critical": 0, "high": 1, "medium": 2, "low": 3} priorities.sort(key=lambda x: severity_order.get(x["severity"], 3)) return priorities[:15] # Top 15 priorities def _get_api_remediation(self, vuln_type: str) -> str: """Get API-specific remediation""" remediation_map = { "authentication_sql_injection": "Implement parameterized queries and input validation", "brute_force_vulnerability": "Implement rate limiting, account lockout, and IP blocking", "token_manipulation": "Use signed JWT tokens with proper validation and expiration", "horizontal_privilege_escalation": "Implement proper ownership validation and user isolation", "vertical_privilege_escalation": "Implement role-based access control with permission checking", "idor": "Validate object ownership and implement access controls", "sql_injection": "Use parameterized queries and ORM frameworks", "command_injection": "Avoid system calls with user input or use strict validation", "deserialization": "Use secure deserialization with type safety", "cors_misconfiguration": "Configure CORS with specific allowed origins", "csp_misconfiguration": "Implement strict Content Security Policy", "csrf_vulnerability": "Implement CSRF tokens and SameSite cookies" } return remediation_map.get(vuln_type, "Review and implement appropriate security controls") def _get_remediation_timeline(self, severity: str) -> str: """Get remediation timeline based on severity""" timelines = { "critical": "1-7 days", "high": "1-4 weeks", "medium": "1-3 months", "low": "3-6 months" } return timelines.get(severity, "1-3 months") def _assess_api_compliance(self, test_results: Dict) -> Dict: """Assess compliance impact""" total_vulns = sum(len(results.get("vulnerabilities_found", [])) for results in test_results.values()) critical_count = sum(1 for results in test_results.values() for vuln in results.get("vulnerabilities_found", []) if vuln.get("severity") == "critical") return { "owasp_api_security": "Non-compliant" if total_vulns > 0 else "Compliant", "pci_dss": "Non-compliant" if critical_count > 0 else "Potentially compliant", "gdpr": "Non-compliant" if critical_count > 2 else "Potentially compliant", "soc2": "Non-compliant" if total_vulns > 5 else "Potentially compliant" } def main(): """Main function to demonstrate API security testing""" print("šŸ” API Security Testing Framework") print("=" * 50) tester = APISecurityTestingFramework() # Test configuration (vulnerable for demonstration) api_config = { "login_endpoint": { "parameterized_queries": False, "input_validation": False }, "auth_protection": { "rate_limiting": False, "account_lockout": False, "ip_blocking": False }, "token_config": { "encryption": False, "signature_verification": False, "expiry_check": False }, "password_reset": { "rate_limiting": False, "token_validation": False, "secure_token_generation": False }, "session_config": { "regeneration_on_login": False, "secure_cookies": False, "http_only": False } } authz_config = { "horizontal": { "ownership_validation": False, "user_isolation": False, "resource_access_control": False }, "vertical": { "role_validation": False, "permission_checking": False, "admin_endpoint_protection": False }, "idor": { "object_ownership_check": False, "id_validation": False, "access_control": False }, "enumeration": { "sequential_ids": True, "resource_discovery_protection": False, "access_checking": False }, "parameter_pollution": { "parameter_validation": False, "duplicate_parameter_handling": False, "input_sanitization": False } } validation_config = { "sql_injection": { "parameterized_queries": False, "input_validation": False, "orm_usage": False }, "nosql_injection": { "input_validation": False, "query_sanitization": False, "object_deserialization": True }, "command_injection": { "input_validation": False, "shell_escape": False, "whitelist_approach": False }, "xss": { "output_encoding": False, "input_sanitization": False, "content_type_headers": False }, "xxe": { "xml_parser_secure": False, "entity_validation": False, "external_entity_resolution": True }, "deserialization": { "input_validation": False, "secure_deserialization": False, "type_safety": False } } rate_limit_config = { "request_flooding": { "rate_limiting": False, "request_threshold": 10000, "time_window": 60 }, "concurrent_connections": { "connection_limiting": False, "max_connections": 10000, "connection_timeout": 600 }, "distributed_attacks": { "ip_reputation": False, "geo_blocking": False, "behavior_analysis": False }, "bypass_protection": { "header_validation": False, "ip_validation": False, "user_identification": False } } exposure_config = { "sensitive_data": { "data_filtering": False, "pii_redaction": False, "credit_card_masking": False }, "error_handling": { "generic_errors": False, "stack_trace_suppression": False, "debug_info_suppression": False }, "debug_mode": { "debug_mode": True, "error_details": True, "system_info": True }, "api_docs": { "public_docs": True, "auth_required": False, "internal_docs_protection": False }, "backup_files": { "backup_protection": False, "access_restriction": False, "secure_location": False } } security_config = { "cors": { "strict_origin": False, "credentials_allowed": True, "wildcard_origin": True }, "csp": { "csp_enabled": False, "strict_policy": False, "inline_scripts_blocked": False }, "security_headers": { "hsts_enabled": False, "x_frame_options": False, "x_content_type_options": False }, "https": { "https_only": False, "hsts_enabled": False, "mixed_content_blocked": False }, "csrf": { "csrf_tokens": False, "same_site_cookies": False, "origin_validation": False } } # Run tests print("Running comprehensive API security tests...") # Test 1: Authentication auth_results = tester.test_authentication_endpoints(api_config) print(f"\nšŸ”‘ Authentication Tests: {len(auth_results['vulnerabilities_found'])} vulnerabilities found") # Test 2: Authorization authz_results = tester.test_authorization_controls(authz_config) print(f"šŸ›”ļø Authorization Tests: {len(authz_results['vulnerabilities_found'])} vulnerabilities found") # Test 3: Input validation validation_results = tester.test_input_validation(validation_config) print(f"āœ… Input Validation Tests: {len(validation_results['vulnerabilities_found'])} vulnerabilities found") # Test 4: Rate limiting rate_results = tester.test_rate_limiting(rate_limit_config) print(f"ā±ļø Rate Limiting Tests: {len(rate_results['vulnerabilities_found'])} vulnerabilities found") # Test 5: Data exposure exposure_results = tester.test_data_exposure(exposure_config) print(f"šŸ“Š Data Exposure Tests: {len(exposure_results['vulnerabilities_found'])} vulnerabilities found") # Test 6: CORS/CSP security_results = tester.test_cors_csp_configuration(security_config) print(f"🌐 CORS/CSP Tests: {len(security_results['vulnerabilities_found'])} vulnerabilities found") # Compile results test_results = { "authentication": auth_results, "authorization": authz_results, "input_validation": validation_results, "rate_limiting": rate_results, "data_exposure": exposure_results, "cors_csp": security_results } # Generate report report = tester.generate_api_security_report(test_results) print(f"\nšŸ“Š API SECURITY ASSESSMENT REPORT") print("-" * 40) print(f"Overall Risk Level: {report['overall_risk_level']}") print(f"Total Vulnerabilities: {report['vulnerability_summary']['total_vulnerabilities']}") print(f"Critical Issues: {report['vulnerability_summary']['critical_vulnerabilities']}") print(f"High Priority Issues: {report['vulnerability_summary']['high_vulnerabilities']}") print(f"Medium Priority Issues: {report['vulnerability_summary']['medium_vulnerabilities']}") # Show top critical vulnerabilities critical_vulns = [item for item in report['remediation_plan'] if item['severity'] == 'critical'] if critical_vulns: print(f"\n🚨 CRITICAL VULNERABILITIES") print("-" * 30) for i, vuln in enumerate(critical_vulns[:5], 1): print(f"{i}. [{vuln['endpoint']}] {vuln['remediation']}") print(f" Timeline: {vuln['timeline']}") # Show compliance impact print(f"\nšŸ“‹ COMPLIANCE IMPACT") print("-" * 25) for framework, status in report['compliance_impact'].items(): print(f"{framework}: {status}") # Save report report_filename = f"api_security_report_{tester.assessment_id}.json" with open(report_filename, 'w') as f: json.dump(report, f, indent=2) print(f"\nšŸ“„ Detailed report saved to: {report_filename}") print(f"āš ļø Immediate action required for {len(critical_vulns)} critical vulnerabilities") return report if __name__ == "__main__": api_security_report = main()