""" CORS and CSP Security Testing Framework Author: Security Testing Framework Version: 1.0 """ import json import datetime import re from typing import Dict, List, Optional, Any, Union class CORSCSPTester: """Test framework for CORS and CSP security configurations""" def __init__(self): self.vulnerabilities = [] self.test_results = [] def test_cors_configuration(self, endpoints: List[Dict]) -> Dict[str, Any]: """Test CORS configuration for security misconfigurations""" cors_issues = [] for endpoint in endpoints: url = endpoint.get('url', '') headers = endpoint.get('headers', {}) # Test for wildcard origin wildcard_result = self._test_wildcard_origin(url, headers) cors_issues.append(wildcard_result) # Test for trusted origin validation trusted_result = self._test_trusted_origin_validation(url, headers) cors_issues.append(trusted_result) # Test for credential exposure credential_result = self._test_credential_exposure(url, headers) cors_issues.append(credential_result) # Test for method validation method_result = self._test_method_validation(url, headers) cors_issues.append(method_result) # Test for header validation header_result = self._test_header_validation(url, headers) cors_issues.append(header_result) vulnerable_endpoints = [r for r in cors_issues if r.get('vulnerable', False)] return { 'endpoints_tested': len(endpoints), 'vulnerable_endpoints': len(vulnerable_endpoints), 'vulnerabilities': vulnerable_endpoints, 'all_results': cors_issues } def test_csp_configuration(self, endpoints: List[Dict]) -> Dict[str, Any]: """Test CSP configuration for security bypasses""" csp_issues = [] for endpoint in endpoints: url = endpoint.get('url', '') headers = endpoint.get('headers', {}) html_content = endpoint.get('html_content', '') # Test for CSP header presence header_result = self._test_csp_header_presence(url, headers) csp_issues.append(header_result) # Test for unsafe inline scripts inline_result = self._test_unsafe_inline_scripts(url, headers) csp_issues.append(inline_result) # Test for unsafe eval eval_result = self._test_unsafe_eval(url, headers) csp_issues.append(eval_result) # Test for data URLs data_result = self._test_data_urls(url, headers) csp_issues.append(data_result) # Test for wildcard sources wildcard_result = self._test_wildcard_sources(url, headers) csp_issues.append(wildcard_result) # Test for CSP bypass techniques bypass_result = self._test_csp_bypass_techniques(url, headers, html_content) csp_issues.append(bypass_result) vulnerable_endpoints = [r for r in csp_issues if r.get('vulnerable', False)] return { 'endpoints_tested': len(endpoints), 'vulnerable_endpoints': len(vulnerable_endpoints), 'vulnerabilities': vulnerable_endpoints } def _test_wildcard_origin(self, url: str, headers: Dict) -> Dict[str, Any]: """Test for wildcard Access-Control-Allow-Origin""" acao_header = headers.get('Access-Control-Allow-Origin', '') vulnerable = acao_header == '*' or acao_header == 'null' return { 'endpoint': url, 'test_type': 'Wildcard CORS Origin', 'vulnerable': vulnerable, 'severity': 'High' if vulnerable else 'Low', 'description': f'Allows any origin: {acao_header}' if vulnerable else 'Origin properly restricted', 'recommendation': 'Specify trusted origins instead of wildcard', 'header_value': acao_header } def _test_trusted_origin_validation(self, url: str, headers: Dict) -> Dict[str, Any]: """Test for proper trusted origin validation""" acao_header = headers.get('Access-Control-Allow-Origin', '') acac_header = headers.get('Access-Control-Allow-Credentials', '') # Check if credentials are allowed with non-exact origin matching vulnerable = False if acac_header.lower() == 'true' and acao_header != '*': # Check for partial or regex-based origin matching if '*' in acao_header or '.' in acao_header: vulnerable = True return { 'endpoint': url, 'test_type': 'Trusted Origin Validation', 'vulnerable': vulnerable, 'severity': 'High' if vulnerable else 'Low', 'description': 'Credentials allowed with non-exact origin matching' if vulnerable else 'Origin validation properly configured', 'recommendation': 'Use exact origin matching when credentials are allowed', 'headers': { 'Access-Control-Allow-Origin': acao_header, 'Access-Control-Allow-Credentials': acac_header } } def _test_credential_exposure(self, url: str, headers: Dict) -> Dict[str, Any]: """Test for credential exposure through CORS""" acao_header = headers.get('Access-Control-Allow-Origin', '') acac_header = headers.get('Access-Control-Allow-Credentials', '') acah_header = headers.get('Access-Control-Allow-Headers', '') vulnerable = False issues = [] # Check if credentials are exposed to all origins if acao_header == '*' and acac_header.lower() == 'true': vulnerable = True issues.append('Credentials exposed to all origins') # Check if sensitive headers are exposed sensitive_headers = ['authorization', 'cookie', 'set-cookie'] for sensitive_header in sensitive_headers: if sensitive_header in acah_header.lower(): issues.append(f'Sensitive header exposed: {sensitive_header}') vulnerable = True return { 'endpoint': url, 'test_type': 'Credential Exposure', 'vulnerable': vulnerable, 'severity': 'Critical' if vulnerable else 'Low', 'description': f'Credential exposure issues: {", ".join(issues)}' if issues else 'No credential exposure detected', 'recommendation': 'Restrict credential exposure to trusted origins only', 'issues': issues } def _test_method_validation(self, url: str, headers: Dict) -> Dict[str, Any]: """Test for proper HTTP method validation in CORS""" acam_header = headers.get('Access-Control-Allow-Methods', '') # Check if dangerous methods are allowed dangerous_methods = ['DELETE', 'PUT', 'PATCH'] allowed_methods = [m.strip().upper() for m in acam_header.split(',')] exposed_dangerous_methods = [m for m in dangerous_methods if m in allowed_methods] vulnerable = len(exposed_dangerous_methods) > 0 return { 'endpoint': url, 'test_type': 'CORS Method Validation', 'vulnerable': vulnerable, 'severity': 'Medium' if vulnerable else 'Low', 'description': f'Dangerous methods allowed: {", ".join(exposed_dangerous_methods)}' if exposed_dangerous_methods else 'Methods properly restricted', 'recommendation': 'Only allow necessary HTTP methods via CORS', 'allowed_methods': allowed_methods, 'dangerous_methods': exposed_dangerous_methods } def _test_header_validation(self, url: str, headers: Dict) -> Dict[str, Any]: """Test for proper header validation in CORS""" acah_header = headers.get('Access-Control-Allow-Headers', '') # Check if overly permissive header wildcard is used vulnerable = acah_header == '*' or '*' in acah_header # Check for sensitive headers sensitive_headers = ['authorization', 'cookie', 'x-api-key'] exposed_sensitive = [h for h in sensitive_headers if h in acah_header.lower()] if not vulnerable and exposed_sensitive: vulnerable = True return { 'endpoint': url, 'test_type': 'CORS Header Validation', 'vulnerable': vulnerable, 'severity': 'Medium' if vulnerable else 'Low', 'description': f'Overly permissive headers or sensitive headers exposed' if vulnerable else 'Headers properly restricted', 'recommendation': 'Specify exact allowed headers instead of wildcards', 'allowed_headers': acah_header, 'sensitive_headers': exposed_sensitive } def _test_csp_header_presence(self, url: str, headers: Dict) -> Dict[str, Any]: """Test for CSP header presence""" csp_header = headers.get('Content-Security-Policy', '') csp_report_only = headers.get('Content-Security-Policy-Report-Only', '') has_csp = bool(csp_header) has_report_only = bool(csp_report_only) vulnerable = not has_csp if has_report_only and not has_csp: vulnerable = True # Report-only doesn't enforce policies return { 'endpoint': url, 'test_type': 'CSP Header Presence', 'vulnerable': vulnerable, 'severity': 'High' if vulnerable else 'Low', 'description': 'No CSP header found' if vulnerable else 'CSP header properly configured', 'recommendation': 'Implement strong Content-Security-Policy header', 'has_csp': has_csp, 'has_report_only': has_report_only, 'csp_value': csp_header[:200] + '...' if len(csp_header) > 200 else csp_header } def _test_unsafe_inline_scripts(self, url: str, headers: Dict) -> Dict[str, Any]: """Test for unsafe inline scripts in CSP""" csp_header = headers.get('Content-Security-Policy', '') unsafe_directives = [ 'unsafe-inline', 'unsafe-hashes', 'unsafe-eval' ] found_unsafe = [] for directive in unsafe_directives: if directive in csp_header: found_unsafe.append(directive) vulnerable = len(found_unsafe) > 0 return { 'endpoint': url, 'test_type': 'Unsafe Inline Scripts', 'vulnerable': vulnerable, 'severity': 'High' if vulnerable else 'Low', 'description': f'Unsafe directives found: {", ".join(found_unsafe)}' if found_unsafe else 'No unsafe directives detected', 'recommendation': 'Remove unsafe-* directives and use nonces or hashes', 'unsafe_directives': found_unsafe } def _test_unsafe_eval(self, url: str, headers: Dict) -> Dict[str, Any]: """Test for unsafe eval in CSP""" csp_header = headers.get('Content-Security-Policy', '') vulnerable = 'unsafe-eval' in csp_header return { 'endpoint': url, 'test_type': 'Unsafe Eval', 'vulnerable': vulnerable, 'severity': 'High' if vulnerable else 'Low', 'description': 'unsafe-eval allowed in CSP' if vulnerable else 'eval properly blocked', 'recommendation': 'Remove unsafe-eval from CSP policy', 'has_unsafe_eval': vulnerable } def _test_data_urls(self, url: str, headers: Dict) -> Dict[str, Any]: """Test for data URLs in CSP""" csp_header = headers.get('Content-Security-Policy', '') vulnerable = 'data:' in csp_header return { 'endpoint': url, 'test_type': 'Data URLs', 'vulnerable': vulnerable, 'severity': 'Medium' if vulnerable else 'Low', 'description': 'data: URLs allowed in CSP' if vulnerable else 'data: URLs properly blocked', 'recommendation': 'Remove data: from CSP unless absolutely necessary', 'allows_data_urls': vulnerable } def _test_wildcard_sources(self, url: str, headers: Dict) -> Dict[str, Any]: """Test for wildcard sources in CSP""" csp_header = headers.get('Content-Security-Policy', '') # Parse CSP to find sources with wildcards wildcard_sources = [] # Common CSP directives directives = ['default-src', 'script-src', 'style-src', 'img-src', 'font-src', 'connect-src'] for directive in directives: if directive in csp_header: # Extract the value after the directive pattern = f'{directive}\\s+([^;]*)' match = re.search(pattern, csp_header) if match: sources = match.group(1).strip() if '*' in sources: wildcard_sources.append(f'{directive}: *') vulnerable = len(wildcard_sources) > 0 return { 'endpoint': url, 'test_type': 'Wildcard Sources', 'vulnerable': vulnerable, 'severity': 'Medium' if vulnerable else 'Low', 'description': f'Wildcard sources found: {", ".join(wildcard_sources)}' if wildcard_sources else 'No wildcard sources detected', 'recommendation': 'Replace wildcard sources with specific trusted domains', 'wildcard_sources': wildcard_sources } def _test_csp_bypass_techniques(self, url: str, headers: Dict, html_content: str) -> Dict[str, Any]: """Test for common CSP bypass techniques""" csp_header = headers.get('Content-Security-Policy', '') bypass_techniques = [] # Check for potential JSONP bypass if 'script-src' in csp_header and 'self' not in csp_header: bypass_techniques.append('JSONP endpoint injection') # Check for CSS expression bypass if 'style-src' in csp_header and 'unsafe-inline' in csp_header: bypass_techniques.append('CSS expression injection') # Check for iframe sandbox bypass if 'frame-src' in csp_header or 'child-src' in csp_header: bypass_techniques.append('iframe sandbox bypass') # Check for meta tag bypass if html_content and ' 0 return { 'endpoint': url, 'test_type': 'CSP Bypass Techniques', 'vulnerable': vulnerable, 'severity': 'High' if vulnerable else 'Low', 'description': f'Potential bypass techniques: {", ".join(bypass_techniques)}' if bypass_techniques else 'No obvious bypass vectors detected', 'recommendation': 'Review CSP policy for potential bypass vectors', 'bypass_techniques': bypass_techniques } def generate_cors_csp_report(self) -> Dict[str, Any]: """Generate comprehensive CORS and CSP security report""" return { 'test_summary': { 'total_tests': len(self.test_results), 'vulnerabilities_found': len(self.vulnerabilities), 'test_completion_time': datetime.datetime.now().isoformat() }, 'vulnerability_breakdown': { 'critical': len([v for v in self.vulnerabilities if v.get('severity') == 'Critical']), 'high': len([v for v in self.vulnerabilities if v.get('severity') == 'High']), 'medium': len([v for v in self.vulnerabilities if v.get('severity') == 'Medium']), 'low': len([v for v in self.vulnerabilities if v.get('severity') == 'Low']) }, 'vulnerabilities': self.vulnerabilities, 'recommendations': self._generate_cors_csp_recommendations() } def _generate_cors_csp_recommendations(self) -> List[str]: """Generate CORS and CSP security recommendations""" recommendations = [] vuln_types = [v.get('test_type', '') for v in self.vulnerabilities] if 'Wildcard CORS Origin' in vuln_types: recommendations.append('Configure CORS to allow only specific trusted origins') if 'Trusted Origin Validation' in vuln_types: recommendations.append('Use exact origin matching when allowing credentials') if 'Credential Exposure' in vuln_types: recommendations.append('Restrict Access-Control-Allow-Credentials to trusted origins only') if 'CORS Method Validation' in vuln_types: recommendations.append('Limit Access-Control-Allow-Methods to only necessary HTTP methods') if 'CORS Header Validation' in vuln_types: recommendations.append('Specify exact allowed headers instead of using wildcards') if 'CSP Header Presence' in vuln_types: recommendations.append('Implement Content-Security-Policy header on all endpoints') if 'Unsafe Inline Scripts' in vuln_types: recommendations.append('Replace unsafe-inline with nonce or hash-based CSP') if 'Unsafe Eval' in vuln_types: recommendations.append('Remove unsafe-eval from CSP policy') if 'Data URLs' in vuln_types: recommendations.append('Remove data: URLs from CSP unless absolutely necessary') if 'Wildcard Sources' in vuln_types: recommendations.append('Replace wildcard sources with specific trusted domains') if 'CSP Bypass Techniques' in vuln_types: recommendations.append('Review CSP policy for potential bypass vectors and implement strict policies') return recommendations # Sample test data for CORS and CSP CORS_CSP_ENDPOINTS = [ { 'url': '/api/v1/data', 'headers': { 'Access-Control-Allow-Origin': '*', 'Access-Control-Allow-Credentials': 'true', 'Access-Control-Allow-Methods': 'GET, POST, PUT, DELETE, OPTIONS', 'Access-Control-Allow-Headers': '*' } }, { 'url': '/api/v1/users', 'headers': { 'Access-Control-Allow-Origin': 'https://trusted-site.com', 'Access-Control-Allow-Credentials': 'true', 'Access-Control-Allow-Methods': 'GET, POST', 'Access-Control-Allow-Headers': 'Content-Type, Authorization' } }, { 'url': '/admin/dashboard', 'headers': { 'Content-Security-Policy': 'default-src *; script-src * unsafe-inline; style-src * unsafe-inline', 'Access-Control-Allow-Origin': 'null' } }, { 'url': '/public/home', 'headers': { 'Content-Security-Policy': 'default-src \'self\'; script-src \'self\' unsafe-eval; style-src \'self\' unsafe-inline', 'Access-Control-Allow-Origin': 'https://example.com' }, 'html_content': '' } ] if __name__ == "__main__": # Run CORS and CSP tests tester = CORSCSPTester() # Test CORS configuration cors_results = tester.test_cors_configuration(CORS_CSP_ENDPOINTS) tester.vulnerabilities.extend(cors_results['vulnerabilities']) # Test CSP configuration csp_results = tester.test_csp_configuration(CORS_CSP_ENDPOINTS) tester.vulnerabilities.extend(csp_results['vulnerabilities']) # Generate report report = tester.generate_cors_csp_report() print("CORS and CSP Security Testing Complete") print(f"Vulnerabilities Found: {len(report['vulnerabilities'])}") print(f"Critical: {report['vulnerability_breakdown']['critical']}") print(f"High: {report['vulnerability_breakdown']['high']}") print(json.dumps(report, indent=2))