import json
import math
import re
import hashlib
import datetime
import base64
import html
from typing import Dict, List, Optional, Any, Union


class AuthorizationControlTesting:
    """Simulated authorization-control checks driven by configuration dicts.

    Each ``test_*`` method inspects a plain ``dict`` describing how an
    application is configured and returns a ``dict`` of findings. No network
    requests are made; "access attempts" are evaluated purely from the
    supplied configuration.

    Attributes:
        vulnerabilities: Findings collected by the caller; consumed by
            :meth:`generate_authorization_report`.
        test_results: Raw per-test result dicts stored by the caller.
        user_roles: Role names known to this tester.
    """

    # Permissions each known role is expected to hold (and nothing more).
    _EXPECTED_ACCESS = {
        "admin": ["read", "write", "delete", "manage_users", "system_config"],
        "moderator": ["read", "write", "moderate_content"],
        "developer": ["read", "write", "deploy", "debug"],
        "user": ["read", "write_own"],
        "guest": ["read"],
    }

    # Relative privilege level of each role; used to grade escalations.
    _ROLE_HIERARCHY = {
        "guest": 0,
        "user": 1,
        "moderator": 2,
        "developer": 3,
        "admin": 4,
    }

    # Matches a "{name}" placeholder inside an endpoint template.
    _PLACEHOLDER = re.compile(r"\{\w+\}")

    def __init__(self):
        self.vulnerabilities = []
        self.test_results = {}
        self.user_roles = ["admin", "user", "guest", "moderator", "developer"]

    def test_role_based_access_control(self, rbac_config: Dict) -> Dict:
        """Compare configured role permissions against the expected matrix.

        Args:
            rbac_config: Dict with an optional ``"role_permissions"`` mapping
                of role name to a list of permission names.

        Returns:
            Dict with ``"role_hierarchy_violations"`` (always empty here),
            ``"broken_access_rules"`` (unknown roles as strings, roles with
            missing permissions as dicts) and
            ``"privilege_escalation_vectors"`` (roles with extra permissions).
        """
        results = {
            "role_hierarchy_violations": [],
            "broken_access_rules": [],
            "privilege_escalation_vectors": [],
        }

        actual_access = rbac_config.get("role_permissions", {})

        for role, permissions in actual_access.items():
            if role not in self._EXPECTED_ACCESS:
                results["broken_access_rules"].append(f"Unexpected role: {role}")
                continue

            expected_perms = set(self._EXPECTED_ACCESS[role])
            actual_perms = set(permissions)

            # Over-privileged role: holds permissions it should not have.
            extra_perms = actual_perms - expected_perms
            if extra_perms:
                results["privilege_escalation_vectors"].append({
                    "role": role,
                    # Sorted so the report does not depend on set ordering.
                    "extra_permissions": sorted(extra_perms),
                    "risk": "high" if "manage_users" in extra_perms else "medium",
                })

            # Under-privileged role: lacks permissions it should have.
            missing_perms = expected_perms - actual_perms
            if missing_perms:
                results["broken_access_rules"].append({
                    "role": role,
                    "missing_permissions": sorted(missing_perms),
                })

        return results

    def test_horizontal_privilege_escalation(self, user_config: Dict) -> Dict:
        """Check whether a user can reach resources of a same-role peer.

        Args:
            user_config: Configuration forwarded to
                :meth:`_test_user_resource_access`.

        Returns:
            Dict with ``"test_cases"`` (one entry per scenario) and
            ``"vulnerabilities"`` (one entry per scenario where access was
            granted).
        """
        results = {"vulnerabilities": [], "test_cases": []}

        # Fixed attacker/target pairs sharing the same role.
        test_scenarios = [
            {
                "attacker": {"id": 1, "role": "user", "username": "user1"},
                "target": {"id": 2, "role": "user", "username": "user2"},
                "resource": "/api/users/2/profile",
            },
            {
                "attacker": {"id": 3, "role": "moderator", "username": "mod1"},
                "target": {"id": 4, "role": "moderator", "username": "mod2"},
                "resource": "/api/moderators/4/settings",
            },
        ]

        for scenario in test_scenarios:
            attacker = scenario["attacker"]
            target = scenario["target"]
            resource = scenario["resource"]

            can_access = self._test_user_resource_access(
                attacker, target, resource, user_config
            )

            results["test_cases"].append({
                "attacker": attacker["username"],
                "target": target["username"],
                "resource": resource,
                "access_granted": can_access,
            })

            if can_access:
                results["vulnerabilities"].append({
                    "type": "horizontal_privilege_escalation",
                    "description": f"{attacker['username']} can access {target['username']}'s resources",
                    "severity": "high",
                })

        return results

    def test_vertical_privilege_escalation(self, access_config: Dict) -> Dict:
        """Check whether lower roles can reach higher-role resources.

        Args:
            access_config: Configuration forwarded to
                :meth:`_test_role_resource_access`.

        Returns:
            Dict with ``"escalation_attempts"`` (every attempt),
            ``"vulnerabilities"`` (successful attempts only) and an overall
            ``"risk_level"`` of ``"low"``, ``"medium"``, ``"high"`` or
            ``"critical"``.
        """
        results = {
            "escalation_attempts": [],
            "vulnerabilities": [],
            "risk_level": "low",
        }

        # Lower role attempting to reach a higher role's resource.
        escalation_tests = [
            {"from": "guest", "to": "user", "resource": "/api/user/profile"},
            {"from": "user", "to": "moderator", "resource": "/api/moderate/content"},
            {"from": "user", "to": "admin", "resource": "/api/admin/dashboard"},
            {"from": "moderator", "to": "admin", "resource": "/api/admin/users"},
            {"from": "developer", "to": "admin", "resource": "/api/admin/system"},
        ]

        for test in escalation_tests:
            from_role = test["from"]
            to_role = test["to"]
            resource = test["resource"]

            can_access = self._test_role_resource_access(
                from_role, resource, access_config
            )
            severity = self._calculate_severity(from_role, to_role)

            results["escalation_attempts"].append({
                "from_role": from_role,
                "to_role": to_role,
                "resource": resource,
                "escalation_successful": can_access,
                "severity": severity,
            })

            if can_access:
                results["vulnerabilities"].append({
                    "type": "vertical_privilege_escalation",
                    "from_role": from_role,
                    "to_role": to_role,
                    "resource": resource,
                    "severity": severity,
                })

        # Overall risk is driven by the worst successful escalation.
        severities = {v["severity"] for v in results["vulnerabilities"]}
        if "critical" in severities:
            results["risk_level"] = "critical"
        elif "high" in severities:
            results["risk_level"] = "high"
        elif results["vulnerabilities"]:
            results["risk_level"] = "medium"

        return results

    def test_insecure_direct_object_references(self, idor_config: Dict) -> Dict:
        """Probe common endpoint templates for IDOR weaknesses.

        Every template is filled with each probe ID and evaluated by
        :meth:`_test_idor_endpoint`.

        Args:
            idor_config: Configuration forwarded to
                :meth:`_test_idor_endpoint`.

        Returns:
            Dict with ``"idor_vulnerabilities"``, ``"tested_endpoints"`` (one
            entry per template) and ``"risk_assessment"`` (``"low"``,
            ``"medium"`` when at least one finding, ``"high"`` when more
            than three).
        """
        results = {
            "idor_vulnerabilities": [],
            "tested_endpoints": [],
            "risk_assessment": "low",
        }

        # Endpoint shapes that commonly expose object identifiers.
        test_endpoints = [
            "/api/user/{id}/profile",
            "/api/documents/{doc_id}",
            "/api/orders/{order_id}",
            "/api/messages/{message_id}",
            "/api/invoices/{invoice_id}",
        ]
        test_ids = [1, 2, 100, 999, "admin", "self", "../../etc/passwd"]

        for endpoint_template in test_endpoints:
            for test_id in test_ids:
                # A callable replacement keeps the ID text literal (no
                # backslash/group-reference interpretation by re.sub).
                endpoint = self._PLACEHOLDER.sub(
                    lambda _match, value=str(test_id): value, endpoint_template
                )

                if self._test_idor_endpoint(endpoint, test_id, idor_config):
                    results["idor_vulnerabilities"].append({
                        "endpoint": endpoint,
                        "tested_id": test_id,
                        "vulnerability_type": "insecure_object_reference",
                        "severity": "high",
                    })

            results["tested_endpoints"].append(endpoint_template)

        finding_count = len(results["idor_vulnerabilities"])
        if finding_count > 3:
            results["risk_assessment"] = "high"
        elif finding_count > 0:
            results["risk_assessment"] = "medium"

        return results

    def _test_user_resource_access(self, attacker: Dict, target: Dict,
                                   resource: str, config: Dict) -> bool:
        """Return True if the configuration lets *attacker* reach *resource*.

        Only ``config`` and the resource type influence the outcome;
        ``attacker`` and ``target`` are accepted for interface symmetry.

        Args:
            attacker: Dict describing the requesting user.
            target: Dict describing the resource owner.
            resource: Endpoint path being requested.
            config: May contain ``"validate_user_ownership"`` (default True)
                and ``"strict_profile_access"`` (default True).
        """
        # Without ownership validation every cross-user request succeeds.
        if not config.get("validate_user_ownership", True):
            return True

        if self._extract_resource_type(resource) == "profile":
            # Profiles are protected only when strict access is enabled.
            return not config.get("strict_profile_access", True)

        return False

    def _test_role_resource_access(self, role: str, resource: str,
                                   config: Dict) -> bool:
        """Return True if *role* can access *resource* under *config*.

        Args:
            role: Role name making the request.
            resource: Endpoint path being requested.
            config: May contain ``"authorization_bypass_enabled"`` (default
                False) and ``"role_permissions"`` (role -> permission list).
        """
        # A global bypass flag grants access regardless of permissions.
        if config.get("authorization_bypass_enabled", False):
            return True

        role_permissions = config.get("role_permissions", {}).get(role, [])
        return self._get_required_permission(resource) in role_permissions

    def _test_idor_endpoint(self, endpoint: str, test_id: Any,
                            config: Dict) -> bool:
        """Return True if probing with *test_id* is considered vulnerable.

        Args:
            endpoint: Fully substituted endpoint path (not inspected here).
            test_id: The identifier used for the probe.
            config: May contain ``"object_ownership_validation"`` (default
                False) and ``"allow_path_traversal"`` (default True).
        """
        if not config.get("object_ownership_validation", False):
            # Boundary integer IDs stand in for sequential-ID exposure.
            if isinstance(test_id, int) and test_id in (1, 999):
                return True

            # Path-traversal probe: outcome follows the traversal setting.
            if isinstance(test_id, str) and "../" in test_id:
                return config.get("allow_path_traversal", True)

        return False

    def _extract_resource_type(self, resource: str) -> str:
        """Classify an endpoint path by the first keyword it contains.

        Returns ``"profile"``, ``"settings"``, ``"content"`` or
        ``"general"`` when no keyword matches.
        """
        for resource_type in ("profile", "settings", "content"):
            if resource_type in resource:
                return resource_type
        return "general"

    def _get_required_permission(self, resource: str) -> str:
        """Map an endpoint path to the permission name it requires.

        Returns ``"read"`` when the path matches no known prefix.
        """
        if "/api/user/" in resource:
            return "user_access"
        if "/api/moderate/" in resource:
            return "moderate_access"
        if "/api/admin/" in resource:
            return "admin_access"
        return "read"

    def _calculate_severity(self, from_role: str, to_role: str) -> str:
        """Grade an escalation by the gap between two roles' levels.

        Unknown roles are treated as level 0.

        Returns:
            ``"critical"`` (gap >= 3), ``"high"`` (>= 2), ``"medium"``
            (>= 1) or ``"low"``.
        """
        from_level = self._ROLE_HIERARCHY.get(from_role, 0)
        to_level = self._ROLE_HIERARCHY.get(to_role, 0)
        difference = to_level - from_level

        if difference >= 3:
            return "critical"
        if difference >= 2:
            return "high"
        if difference >= 1:
            return "medium"
        return "low"

    def generate_authorization_report(self) -> Dict:
        """Assemble the final report from collected findings and results.

        Returns:
            Dict containing the assessment timestamp (local time, ISO 8601),
            the collected vulnerabilities and test results, a risk summary
            and a list of recommendations.
        """
        return {
            "assessment_date": datetime.datetime.now().isoformat(),
            "assessment_type": "authorization_security",
            "vulnerabilities": self.vulnerabilities,
            "test_results": self.test_results,
            "risk_summary": self._calculate_risk_summary(),
            "recommendations": self._generate_recommendations(),
        }

    def _calculate_risk_summary(self) -> Dict:
        """Summarize ``self.vulnerabilities`` into counts and a risk level.

        The level is ``"critical"`` if any finding is critical, ``"high"``
        if more than two are high, ``"medium"`` if there is any finding at
        all, otherwise ``"low"``.
        """
        total_vulnerabilities = len(self.vulnerabilities)
        critical_count = sum(
            1 for v in self.vulnerabilities if v.get("severity") == "critical"
        )
        high_count = sum(
            1 for v in self.vulnerabilities if v.get("severity") == "high"
        )

        if critical_count > 0:
            risk_level = "critical"
        elif high_count > 2:
            risk_level = "high"
        elif total_vulnerabilities > 0:
            risk_level = "medium"
        else:
            risk_level = "low"

        return {
            "risk_level": risk_level,
            "vulnerability_count": total_vulnerabilities,
            "critical_vulnerabilities": critical_count,
            "high_vulnerabilities": high_count,
        }

    def _generate_recommendations(self) -> List[str]:
        """Build recommendations matching the kinds of findings collected.

        Returns:
            One recommendation per finding kind present, or a single
            "appear secure" message when nothing matched.
        """
        # IDOR findings label their kind under "vulnerability_type" rather
        # than "type", so both keys must be consulted.
        vuln_types = {
            v.get("type") or v.get("vulnerability_type", "")
            for v in self.vulnerabilities
        }

        advice_by_type = [
            ("privilege_escalation",
             "Implement strict role-based access controls with proper permission validation"),
            ("horizontal_privilege_escalation",
             "Add user ownership validation for all user-specific resources"),
            ("vertical_privilege_escalation",
             "Implement proper authorization checks before resource access"),
            ("insecure_object_reference",
             "Validate object ownership and implement indirect object references"),
        ]

        recommendations = [
            advice for vuln_type, advice in advice_by_type
            if vuln_type in vuln_types
        ]

        if not recommendations:
            recommendations.append(
                "Authorization controls appear secure - continue regular security reviews"
            )

        return recommendations


def demo_authorization_testing():
    """Run all four checks against deliberately weak configs.

    Returns:
        The report dict produced by
        :meth:`AuthorizationControlTesting.generate_authorization_report`.
    """
    tester = AuthorizationControlTesting()

    # Test 1: Role-based access control
    rbac_config = {
        "role_permissions": {
            "admin": ["read", "write", "delete", "manage_users", "system_config"],
            "moderator": ["read", "write", "moderate_content", "manage_users"],  # Over-privileged
            "developer": ["read", "write", "deploy", "debug", "system_config"],  # Over-privileged
            "user": ["read", "write_own"],
            "guest": ["read"],
        }
    }
    rbac_results = tester.test_role_based_access_control(rbac_config)
    tester.test_results["rbac_testing"] = rbac_results

    # Test 2: Horizontal privilege escalation
    user_config = {
        "validate_user_ownership": False,  # Vulnerable
        "strict_profile_access": False,
    }
    horizontal_results = tester.test_horizontal_privilege_escalation(user_config)
    tester.test_results["horizontal_privilege_escalation"] = horizontal_results

    # Test 3: Vertical privilege escalation
    access_config = {
        "authorization_bypass_enabled": True,  # Vulnerable
        "role_permissions": {
            "user": ["read"],
            "moderator": ["read", "write"],
            "admin": ["read", "write", "delete", "manage_users"],
        },
    }
    vertical_results = tester.test_vertical_privilege_escalation(access_config)
    tester.test_results["vertical_privilege_escalation"] = vertical_results

    # Test 4: IDOR vulnerabilities
    idor_config = {
        "object_ownership_validation": False,  # Vulnerable
        "allow_path_traversal": True,
    }
    idor_results = tester.test_insecure_direct_object_references(idor_config)
    tester.test_results["idor_testing"] = idor_results

    # Collect vulnerabilities from every test into the tester.
    for vector in rbac_results["privilege_escalation_vectors"]:
        tester.vulnerabilities.append({
            "type": "privilege_escalation",
            "severity": vector.get("risk", "medium"),
            "description": f"Role {vector['role']} has excessive permissions: {vector['extra_permissions']}",
        })
    tester.vulnerabilities.extend(horizontal_results["vulnerabilities"])
    tester.vulnerabilities.extend(vertical_results["vulnerabilities"])
    tester.vulnerabilities.extend(idor_results["idor_vulnerabilities"])

    return tester.generate_authorization_report()


if __name__ == "__main__":
    # Run demonstration
    authz_report = demo_authorization_testing()
    print("Authorization Control Testing Results:")
    print(json.dumps(authz_report, indent=2))