"""
Data Exposure and Information Leakage Security Testing Framework

Author: Security Testing Framework
Version: 1.0
"""

import json
import datetime
import re
import hashlib
import base64
import math
from collections import Counter
from typing import Dict, List, Optional, Any, Union


class DataExposureTester:
    """Test framework for data exposure and information leakage vulnerabilities.

    The tester works on already-captured data (endpoint descriptions,
    response dicts, lists of accessible files); it performs no network I/O.

    Attributes:
        vulnerabilities: Findings the caller has chosen to include in the
            report. The ``test_*`` entry points do NOT fill this list; the
            caller extends it with the ``'vulnerabilities'`` value they return.
        test_results: Every individual check result produced by
            ``test_data_exposure`` / ``test_information_leakage`` (vulnerable
            or not). Used for the ``total_tests`` figure of the report.
        sensitive_patterns: Mapping of pattern name -> regex source string.
        sensitive_extensions: File extensions regarded as sensitive.
    """

    # Response headers that reveal implementation details (lower-case names).
    _LEAKY_HEADERS = {
        'server': 'Server version information',
        'x-powered-by': 'Technology stack information',
        'x-aspnet-version': 'ASP.NET version',
        'x-generator': 'Generator information',
        'x-debug-token': 'Debug information',
        'set-cookie': 'Session/configuration data'
    }

    # File-system paths that commonly show up in leaked error messages.
    _ERROR_PATH_PATTERNS = (
        r'/[a-zA-Z0-9_\-/\.]+\.py',
        r'/[a-zA-Z0-9_\-/\.]+\.js',
        r'/[a-zA-Z0-9_\-/\.]+\.php',
        r'C:\\[a-zA-Z0-9_\-\\\s\.]+'
    )

    # Database product names / phrases that indicate a leaked DB error.
    _ERROR_DB_PATTERNS = (
        r'mysql',
        r'postgresql',
        r'oracle',
        r'sql server',
        r'database connection'
    )

    # Markers of debug output in a response body.
    _DEBUG_PATTERNS = (
        r'debug.*?true',
        r'debug.*?on',
        r'console\.log',
        r'print_r',
        r'var_dump',
        r'debugger',
        r'devtools',
        r'show_errors.*?true'
    )

    # URL fragments that suggest a development-only endpoint.
    _DEV_URL_FRAGMENTS = (
        '/debug', '/dev', '/test', '/staging', '/phpinfo', '/info'
    )

    # Configuration file names that must never be web-accessible.
    _CONFIG_FILE_PATTERNS = (
        r'\.env',
        r'config\.php',
        r'web\.config',
        r'application\.yml',
        r'database\.yml',
        r'settings\.json'
    )

    _SOURCE_EXTENSIONS = ('.py', '.js', '.php', '.java', '.rb', '.go', '.cs')

    _BACKUP_FILE_PATTERNS = (
        r'\.bak$',
        r'\.backup$',
        r'\.old$',
        r'\.tmp$',
        r'\.orig$',
        r'~$',
        r'\.swp$'
    )

    # (test_type, recommendation) pairs, in the order they are reported.
    _RECOMMENDATIONS = (
        ('Response Body Data Exposure',
         'Implement data filtering to remove sensitive information from API responses'),
        ('Response Header Information Leakage',
         'Configure web server to remove or anonymize sensitive headers'),
        ('Error Message Information Leakage',
         'Implement generic error messages and detailed error logging'),
        ('Debug Information Exposure',
         'Ensure debug mode is disabled in production environments'),
        ('Configuration File Leakage',
         'Restrict access to configuration files using file permissions'),
        ('Source Code Leakage',
         'Configure web server to deny access to source code files'),
        ('Backup File Exposure',
         'Remove backup files from production or restrict access'),
        ('Directory Listing',
         'Disable directory listing in web server configuration'),
        ('Technology Stack Information Leakage',
         'Remove or obscure technology-revealing HTTP headers'),
    )

    def __init__(self):
        self.vulnerabilities = []
        self.test_results = []

        # Sensitive data patterns (all are applied with re.IGNORECASE).
        self.sensitive_patterns = {
            # NOTE: the TLD class is [A-Za-z]; a '|' inside a character class
            # is a literal pipe, not alternation.
            'email': r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b',
            'phone': r'\b(?:\+?1[-.\s]?)?\(?([0-9]{3})\)?[-.\s]?([0-9]{3})[-.\s]?([0-9]{4})\b',
            'ssn': r'\b\d{3}-\d{2}-\d{4}\b',
            'credit_card': r'\b(?:\d[ -]*?){13,16}\b',
            'api_key': r'(?:api[_-]?key|apikey)[\'"\s]*[:=][\'"\s]*[A-Za-z0-9_\-]{16,}',
            'password': r'(?:password|pwd)[\'"\s]*[:=][\'"\s]*[^\s\'"]{6,}',
            'token': r'(?:token|jwt)[\'"\s]*[:=][\'"\s]*[A-Za-z0-9._-]{20,}',
            'ip_address': r'\b(?:[0-9]{1,3}\.){3}[0-9]{1,3}\b',
            # The algorithm word is optional so PKCS#8 headers
            # ("-----BEGIN PRIVATE KEY-----") are detected too.
            'private_key': r'-----BEGIN (?:[A-Z]+ )?PRIVATE KEY-----',
            # Optional "_url"/"-uri" suffix so keys such as "database_url"
            # or "DB_URI" are recognised, not only bare "database"/"db".
            'database_url': r'(?:database|db)(?:[_-]?(?:url|uri))?[\'"\s]*[:=][\'"\s]*[^\s\'"]+',
            'aws_key': r'AKIA[0-9A-Z]{16}'
        }

        # File extension patterns for sensitive files
        self.sensitive_extensions = [
            '.env', '.pem', '.key', '.p12', '.pfx',
            '.bak', '.backup', '.old', '.tmp',
            '.sql', '.dump', '.log'
        ]

    # ------------------------------------------------------------------
    # Input normalisation helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _to_text(value: Any) -> str:
        """Coerce a response field to ``str`` so regex checks never raise.

        ``None`` becomes ``''``, bytes are decoded as UTF-8 (undecodable bytes
        replaced), dicts/lists are JSON-encoded when possible, and anything
        else goes through ``str()``.
        """
        if value is None:
            return ''
        if isinstance(value, str):
            return value
        if isinstance(value, (bytes, bytearray)):
            return bytes(value).decode('utf-8', errors='replace')
        if isinstance(value, (dict, list)):
            try:
                return json.dumps(value)
            except (TypeError, ValueError):
                return str(value)
        return str(value)

    @staticmethod
    def _normalize_headers(headers: Optional[Dict]) -> Dict[str, Any]:
        """Return ``headers`` keyed by lower-cased name (HTTP names are case-insensitive)."""
        return {str(name).lower(): value for name, value in (headers or {}).items()}

    @staticmethod
    def _accessible_files(application_data: Dict) -> List[str]:
        """Return the string entries of ``application_data['accessible_files']``."""
        files = application_data.get('accessible_files') or []
        return [path for path in files if isinstance(path, str)]

    # ------------------------------------------------------------------
    # Public entry points
    # ------------------------------------------------------------------

    def test_data_exposure(self, endpoints: List[Dict], responses: List[Dict]) -> Dict[str, Any]:
        """Test for data exposure in API responses.

        ``responses[i]`` is treated as the response of ``endpoints[i]``; an
        endpoint without a corresponding response is checked against an empty
        response. Four checks run per endpoint (body, headers, error message,
        debug information). Every result is also appended to
        ``self.test_results``.

        Returns:
            A dict with:
              * ``endpoints_tested`` - number of endpoints,
              * ``vulnerable_endpoints`` - number of endpoints with at least
                one vulnerable result,
              * ``vulnerabilities_found`` - number of vulnerable results,
              * ``vulnerabilities`` - the vulnerable results,
              * ``all_results`` - every result.
        """
        exposure_issues = []
        vulnerable_endpoint_count = 0

        for i, endpoint in enumerate(endpoints):
            url = endpoint.get('url', '')

            # Get corresponding response
            response = (responses[i] if i < len(responses) else None) or {}

            endpoint_results = [
                self._test_response_body_data(url, response.get('data', '')),
                self._test_response_headers_data(url, response.get('headers', {})),
                self._test_error_message_leakage(endpoint, response),
                self._test_debug_information(endpoint, response),
            ]
            exposure_issues.extend(endpoint_results)

            # Count the endpoint once, however many of its checks failed.
            if any(r.get('vulnerable', False) for r in endpoint_results):
                vulnerable_endpoint_count += 1

        self.test_results.extend(exposure_issues)
        vulnerable_results = [r for r in exposure_issues if r.get('vulnerable', False)]

        return {
            'endpoints_tested': len(endpoints),
            'vulnerable_endpoints': vulnerable_endpoint_count,
            'vulnerabilities_found': len(vulnerable_results),
            'vulnerabilities': vulnerable_results,
            'all_results': exposure_issues
        }

    def test_information_leakage(self, application_data: Dict) -> Dict[str, Any]:
        """Test for various types of information leakage.

        Runs the configuration, source code, backup file, directory listing
        and technology stack checks against ``application_data`` and appends
        every result to ``self.test_results``.

        Returns:
            A dict with ``total_tests``, ``vulnerabilities_found`` and
            ``vulnerabilities`` (the vulnerable results).
        """
        checks = (
            self._test_configuration_leakage,
            self._test_source_code_leakage,
            self._test_backup_file_exposure,
            self._test_directory_listing,
            self._test_technology_leakage,
        )
        leakage_issues = [check(application_data) for check in checks]
        self.test_results.extend(leakage_issues)

        vulnerable_issues = [r for r in leakage_issues if r.get('vulnerable', False)]

        return {
            'total_tests': len(leakage_issues),
            'vulnerabilities_found': len(vulnerable_issues),
            'vulnerabilities': vulnerable_issues
        }

    # ------------------------------------------------------------------
    # Per-response checks
    # ------------------------------------------------------------------

    def _test_response_body_data(self, url: str, response_data: str) -> Dict[str, Any]:
        """Test response body for sensitive data exposure.

        ``found_patterns`` maps each matching pattern name to its match count.
        ``data_sample`` holds at most 200 characters of the masked body, or
        ``None`` when nothing was found.
        """
        text = self._to_text(response_data)

        found_patterns = {}
        for pattern_name, pattern in self.sensitive_patterns.items():
            matches = re.findall(pattern, text, re.IGNORECASE)
            if matches:
                found_patterns[pattern_name] = len(matches)

        vulnerable = bool(found_patterns)

        # Mask first, truncate second: truncating first can cut a secret in
        # half so that it no longer matches its pattern and leaks unmasked.
        data_sample = self._sanitize_sample(text)[:200] if vulnerable else None

        return {
            'endpoint': url,
            'test_type': 'Response Body Data Exposure',
            'vulnerable': vulnerable,
            'severity': self._calculate_severity(found_patterns),
            'description': f'Sensitive data found: {", ".join(found_patterns.keys())}' if found_patterns else 'No sensitive data detected',
            'recommendation': 'Remove sensitive data from API responses',
            'found_patterns': found_patterns,
            'data_sample': data_sample
        }

    def _test_response_headers_data(self, url: str, headers: Dict) -> Dict[str, Any]:
        """Test response headers for information leakage.

        Header names are compared case-insensitively; each reported entry
        carries the lower-case header name, its value and a description.
        """
        normalized = self._normalize_headers(headers)

        sensitive_headers = [
            {'header': header, 'value': normalized[header], 'description': description}
            for header, description in self._LEAKY_HEADERS.items()
            if header in normalized
        ]

        vulnerable = bool(sensitive_headers)

        return {
            'endpoint': url,
            'test_type': 'Response Header Information Leakage',
            'vulnerable': vulnerable,
            'severity': 'Medium' if vulnerable else 'Low',
            'description': f'Information leak in headers: {", ".join([h["header"] for h in sensitive_headers])}' if sensitive_headers else 'No header leakage detected',
            'recommendation': 'Remove or anonymize sensitive headers',
            'sensitive_headers': sensitive_headers
        }

    def _test_error_message_leakage(self, endpoint: Dict, response: Dict) -> Dict[str, Any]:
        """Test error messages for information leakage.

        Looks in ``response['error']`` for stack traces, file-system paths and
        database-related terms. A missing or ``None`` error is treated as empty.
        """
        error_data = self._to_text(response.get('error', ''))
        status_code = response.get('status_code', 200)

        leaky_info = []

        # Check for stack traces
        lowered = error_data.lower()
        if 'stack trace' in lowered or 'at line' in lowered:
            leaky_info.append('Stack trace exposure')

        # Check for file paths (case-sensitive, as the patterns spell out case)
        if any(re.search(pattern, error_data) for pattern in self._ERROR_PATH_PATTERNS):
            leaky_info.append('File path exposure')

        # Check for database errors
        if any(re.search(pattern, error_data, re.IGNORECASE) for pattern in self._ERROR_DB_PATTERNS):
            leaky_info.append('Database information exposure')

        vulnerable = bool(leaky_info)

        return {
            'endpoint': endpoint.get('url', ''),
            'test_type': 'Error Message Information Leakage',
            'vulnerable': vulnerable,
            'severity': 'High' if vulnerable else 'Low',
            'description': f'Error messages leak: {", ".join(leaky_info)}' if leaky_info else 'No error message leakage detected',
            'recommendation': 'Implement generic error messages and logging',
            'leaked_info': leaky_info,
            'status_code': status_code
        }

    def _test_debug_information(self, endpoint: Dict, response: Dict) -> Dict[str, Any]:
        """Test for debug information exposure.

        Flags debug markers in ``response['data']`` (reported as the regex that
        matched) and development-style fragments in the endpoint URL.
        """
        response_data = self._to_text(response.get('data', ''))

        # Check for debug mode indicators
        debug_indicators = [
            pattern for pattern in self._DEBUG_PATTERNS
            if re.search(pattern, response_data, re.IGNORECASE)
        ]

        # Check for development endpoints (plain substring match on the URL)
        url = endpoint.get('url', '')
        lowered_url = self._to_text(url).lower()
        debug_indicators.extend(
            f'Development endpoint: {fragment}'
            for fragment in self._DEV_URL_FRAGMENTS
            if fragment in lowered_url
        )

        vulnerable = bool(debug_indicators)

        return {
            'endpoint': url,
            'test_type': 'Debug Information Exposure',
            'vulnerable': vulnerable,
            'severity': 'High' if vulnerable else 'Low',
            'description': f'Debug information exposed: {", ".join(debug_indicators)}' if debug_indicators else 'No debug information detected',
            'recommendation': 'Disable debug mode in production',
            'debug_indicators': debug_indicators
        }

    # ------------------------------------------------------------------
    # Application-level checks
    # ------------------------------------------------------------------

    def _test_configuration_leakage(self, application_data: Dict) -> Dict[str, Any]:
        """Test for configuration file leakage.

        Each accessible file is reported at most once, even when its path
        matches several configuration patterns.
        """
        sensitive_configs = [
            file_path for file_path in self._accessible_files(application_data)
            if any(re.search(pattern, file_path, re.IGNORECASE)
                   for pattern in self._CONFIG_FILE_PATTERNS)
        ]

        vulnerable = bool(sensitive_configs)

        return {
            'test_type': 'Configuration File Leakage',
            'vulnerable': vulnerable,
            'severity': 'Critical' if vulnerable else 'Low',
            'description': f'Configuration files accessible: {", ".join(sensitive_configs)}' if sensitive_configs else 'No configuration files exposed',
            'recommendation': 'Restrict access to configuration files',
            'sensitive_files': sensitive_configs
        }

    def _test_source_code_leakage(self, application_data: Dict) -> Dict[str, Any]:
        """Test for source code leakage.

        Files with a source extension are reported unless their path starts
        with ``/public/``.
        """
        source_files = [
            file_path for file_path in self._accessible_files(application_data)
            if file_path.endswith(self._SOURCE_EXTENSIONS)
            and not file_path.startswith('/public/')
        ]

        vulnerable = bool(source_files)

        return {
            'test_type': 'Source Code Leakage',
            'vulnerable': vulnerable,
            'severity': 'High' if vulnerable else 'Low',
            'description': f'Source files accessible: {", ".join(source_files)}' if source_files else 'No source code exposed',
            'recommendation': 'Restrict access to source code files',
            'source_files': source_files
        }

    def _test_backup_file_exposure(self, application_data: Dict) -> Dict[str, Any]:
        """Test for backup file exposure (``.bak``, ``.old``, ``~``, ``.swp``, ...)."""
        backup_files = [
            file_path for file_path in self._accessible_files(application_data)
            if any(re.search(pattern, file_path, re.IGNORECASE)
                   for pattern in self._BACKUP_FILE_PATTERNS)
        ]

        vulnerable = bool(backup_files)

        return {
            'test_type': 'Backup File Exposure',
            'vulnerable': vulnerable,
            'severity': 'High' if vulnerable else 'Low',
            'description': f'Backup files accessible: {", ".join(backup_files)}' if backup_files else 'No backup files exposed',
            'recommendation': 'Remove or restrict access to backup files',
            'backup_files': backup_files
        }

    def _test_directory_listing(self, application_data: Dict) -> Dict[str, Any]:
        """Test for directory listing vulnerabilities.

        A directory from ``accessible_directories`` is vulnerable when
        ``directory_listing_enabled`` maps it to a truthy value.
        """
        accessible_dirs = application_data.get('accessible_directories') or []
        listing_enabled = application_data.get('directory_listing_enabled') or {}

        vulnerable_dirs = [
            dir_path for dir_path in accessible_dirs
            if listing_enabled.get(dir_path, False)
        ]

        vulnerable = bool(vulnerable_dirs)

        return {
            'test_type': 'Directory Listing',
            'vulnerable': vulnerable,
            'severity': 'Medium' if vulnerable else 'Low',
            'description': f'Directory listing enabled: {", ".join(vulnerable_dirs)}' if vulnerable_dirs else 'Directory listing properly disabled',
            'recommendation': 'Disable directory listing in web server configuration',
            'vulnerable_directories': vulnerable_dirs
        }

    def _test_technology_leakage(self, application_data: Dict) -> Dict[str, Any]:
        """Test for technology stack information leakage.

        Header names in ``application_data['response_headers']`` are compared
        case-insensitively. Severity is ``'Low'`` whether or not anything is
        found.
        """
        headers = self._normalize_headers(application_data.get('response_headers'))
        tech_info = []

        # Check Server header
        if 'server' in headers:
            tech_info.append(f"Server: {headers['server']}")

        # Check X-Powered-By header
        if 'x-powered-by' in headers:
            tech_info.append(f"Powered By: {headers['x-powered-by']}")

        # Check other technology revealing headers
        tech_headers = [
            'x-aspnet-version', 'x-generator', 'x-drupal-cache',
            'x-via', 'x-variant'
        ]
        tech_info.extend(
            f"{header}: {headers[header]}" for header in tech_headers if header in headers
        )

        vulnerable = bool(tech_info)

        return {
            'test_type': 'Technology Stack Information Leakage',
            'vulnerable': vulnerable,
            'severity': 'Low',
            'description': f'Technology information leaked: {", ".join(tech_info)}' if tech_info else 'No technology information leaked',
            'recommendation': 'Remove or obscure technology-revealing headers',
            'tech_information': tech_info
        }

    # ------------------------------------------------------------------
    # Scoring, sanitising and reporting
    # ------------------------------------------------------------------

    def _calculate_severity(self, found_patterns: Dict) -> str:
        """Calculate severity based on found sensitive patterns.

        Returns ``'Critical'`` if any high-risk pattern was found, ``'High'``
        if any medium-risk pattern was found, ``'Medium'`` for any other
        finding and ``'Low'`` when nothing was found.
        """
        high_risk_patterns = {'ssn', 'credit_card', 'private_key', 'password', 'api_key'}
        medium_risk_patterns = {'email', 'phone', 'database_url', 'aws_key'}

        found = set(found_patterns)
        if found & high_risk_patterns:
            return 'Critical'
        if found & medium_risk_patterns:
            return 'High'
        if found:
            return 'Medium'
        return 'Low'

    def _sanitize_sample(self, sample: str) -> str:
        """Sanitize sensitive data for reporting.

        Every match of every sensitive pattern is replaced by
        ``[MASKED_<PATTERN NAME>]``.
        """
        sanitized = sample
        for pattern_name, pattern in self.sensitive_patterns.items():
            sanitized = re.sub(pattern, f'[MASKED_{pattern_name.upper()}]', sanitized, flags=re.IGNORECASE)
        return sanitized

    def generate_exposure_report(self) -> Dict[str, Any]:
        """Generate comprehensive data exposure security report.

        The report is built from ``self.test_results`` and
        ``self.vulnerabilities`` as they stand when the method is called.
        """
        severity_counts = Counter(v.get('severity') for v in self.vulnerabilities)

        return {
            'test_summary': {
                'total_tests': len(self.test_results),
                'vulnerabilities_found': len(self.vulnerabilities),
                'test_completion_time': datetime.datetime.now().isoformat()
            },
            'vulnerability_breakdown': {
                'critical': severity_counts['Critical'],
                'high': severity_counts['High'],
                'medium': severity_counts['Medium'],
                'low': severity_counts['Low']
            },
            'vulnerabilities': self.vulnerabilities,
            'recommendations': self._generate_exposure_recommendations()
        }

    def _generate_exposure_recommendations(self) -> List[str]:
        """Generate data exposure security recommendations.

        One recommendation is emitted per distinct ``test_type`` present in
        ``self.vulnerabilities``, in a fixed order.
        """
        vuln_types = {v.get('test_type', '') for v in self.vulnerabilities}
        return [
            recommendation
            for test_type, recommendation in self._RECOMMENDATIONS
            if test_type in vuln_types
        ]


# Sample test data
SAMPLE_ENDPOINTS = [
    {'url': '/api/v1/users', 'method': 'GET'},
    {'url': '/api/v1/orders', 'method': 'GET'},
    {'url': '/api/v1/config', 'method': 'GET'},
    {'url': '/api/v1/debug', 'method': 'GET'}
]

SAMPLE_RESPONSES = [
    {
        'data': '{"users": [{"email": "user@example.com", "phone": "555-123-4567"}]}',
        'headers': {'server': 'nginx/1.18.0', 'x-powered-by': 'Express.js'},
        'error': ''
    },
    {
        'data': '{"orders": [{"total": 99.99, "credit_card": "4111-1111-1111-1111"}]}',
        'headers': {'set-cookie': 'session=abc123'},
        'error': ''
    },
    {
        'data': 'Error: Database connection failed at /var/www/app/config.py line 42',
        'headers': {'server': 'Apache/2.4.41'},
        'error': 'Database connection failed at /var/www/app/config.py line 42',
        'status_code': 500
    },
    {
        'data': '{"debug": true, "database_url": "mysql://user:pass@localhost/db"}',
        'headers': {'x-debug-token': 'debug123'},
        'error': ''
    }
]

SAMPLE_APPLICATION_DATA = {
    'accessible_files': [
        '/var/www/.env',
        '/var/www/config.php',
        '/var/www/backup.sql',
        '/var/www/app.py'
    ],
    'accessible_directories': [
        '/uploads',
        '/backup'
    ],
    'directory_listing_enabled': {
        '/uploads': True,
        '/backup': True
    },
    'response_headers': {
        'server': 'nginx/1.18.0',
        'x-powered-by': 'PHP/7.4.3',
        'x-aspnet-version': '4.0.30319'
    }
}


def main() -> None:
    """Run every check against the bundled sample data and print the report."""
    # Run data exposure tests
    tester = DataExposureTester()

    # Test data exposure in responses
    exposure_results = tester.test_data_exposure(SAMPLE_ENDPOINTS, SAMPLE_RESPONSES)
    tester.vulnerabilities.extend(exposure_results['vulnerabilities'])

    # Test information leakage
    leakage_results = tester.test_information_leakage(SAMPLE_APPLICATION_DATA)
    tester.vulnerabilities.extend(leakage_results['vulnerabilities'])

    # Generate report
    report = tester.generate_exposure_report()

    print("Data Exposure Security Testing Complete")
    print(f"Vulnerabilities Found: {len(report['vulnerabilities'])}")
    print(f"Critical: {report['vulnerability_breakdown']['critical']}")
    print(f"High: {report['vulnerability_breakdown']['high']}")
    print(json.dumps(report, indent=2))


if __name__ == "__main__":
    main()