#!/usr/bin/env python3 """ SSRF Testing Utilities for Project Starlight Tests for Server-Side Request Forgery vulnerabilities """ import json import re import base64 import hashlib import datetime import urllib.parse from typing import Dict, List, Optional, Any class SSRFTester: """Tests for Server-Side Request Forgery vulnerabilities.""" def __init__(self): self.test_payloads = self._generate_ssrf_payloads() self.malicious_schemes = ['file://', 'ftp://', 'dict://', 'gopher://', 'ldap://'] self.local_networks = [ '127.0.0.1', 'localhost', '0.0.0.0', '169.254.169.254', # AWS metadata 'metadata.google.internal', # GCP metadata '192.168.', # Private network '10.', # Private network '172.16.', # Private network '172.17.', # Private network '172.18.', # Private network '172.19.', # Private network '172.20.', # Private network '172.21.', # Private network '172.22.', # Private network '172.23.', # Private network '172.24.', # Private network '172.25.', # Private network '172.26.', # Private network '172.27.', # Private network '172.28.', # Private network '172.29.', # Private network '172.30.', # Private network '172.31.', # Private network ] def _generate_ssrf_payloads(self) -> List[Dict[str, Any]]: """Generate SSRF test payloads.""" payloads = [] # Basic localhost variants localhost_variants = [ 'http://127.0.0.1', 'http://localhost', 'http://0.0.0.0', 'http://127.0.0.1:80', 'http://127.0.0.1:443', 'http://127.0.0.1:8080', ] # URL encoding bypasses encoding_variants = [ 'http://%31%32%37%2E%30%2E%30%2E%31', # 127.0.0.1 'http://127.0.0.1%2F', 'http://127.0.0.1%3A80', ] # Decimal and octal formats format_variants = [ 'http://2130706433', # 127.0.0.1 in decimal 'http://0177.0.0.1', # 127.0.0.1 in octal 'http://0x7F000001', # 127.0.0.1 in hex ] # Cloud metadata services cloud_metadata = [ 'http://169.254.169.254/latest/meta-data/', 'http://metadata.google.internal/computeMetadata/v1/', 'http://169.254.169.254/metadata/identity/oauth2/token', ] # Domain redirection redirect_variants = [ 'http://localtest.me', 'http://127-0-0-1.sslip.io', 'http://nip.io', ] # Combine all variants for variant in localhost_variants: payloads.append({ 'type': 'localhost', 'payload': variant, 'description': f'Basic localhost test: {variant}' }) for variant in encoding_variants: payloads.append({ 'type': 'encoding', 'payload': variant, 'description': f'URL encoding bypass: {variant}' }) for variant in format_variants: payloads.append({ 'type': 'format', 'payload': variant, 'description': f'IP format bypass: {variant}' }) for variant in cloud_metadata: payloads.append({ 'type': 'cloud_metadata', 'payload': variant, 'description': f'Cloud metadata test: {variant}' }) for variant in redirect_variants: payloads.append({ 'type': 'redirect', 'payload': variant, 'description': f'Domain redirect test: {variant}' }) return payloads def analyze_input_validation(self, user_input: str) -> Dict[str, Any]: """Analyze input for SSRF vulnerabilities.""" analysis = { 'input': user_input, 'is_suspicious': False, 'risks': [], 'recommendations': [], 'validation_score': 100, 'timestamp': datetime.datetime.now().isoformat() } # Check for malicious schemes for scheme in self.malicious_schemes: if user_input.lower().startswith(scheme): analysis['risks'].append({ 'type': 'malicious_scheme', 'severity': 'critical', 'description': f'Dangerous URL scheme detected: {scheme}', 'payload': user_input }) analysis['is_suspicious'] = True analysis['validation_score'] -= 30 # Check for localhost/network references decoded_input = urllib.parse.unquote(user_input) for network_ref in self.local_networks: if network_ref in decoded_input: analysis['risks'].append({ 'type': 'internal_network', 'severity': 'high', 'description': f'Internal network reference detected: {network_ref}', 'payload': user_input }) analysis['is_suspicious'] = True analysis['validation_score'] -= 25 # Check for URL encoding attempts if user_input != urllib.parse.unquote(user_input): if analysis['validation_score'] == 100: # No other risks found analysis['risks'].append({ 'type': 'url_encoding', 'severity': 'medium', 'description': 'URL encoding detected - possible bypass attempt', 'payload': user_input }) analysis['is_suspicious'] = True analysis['validation_score'] -= 10 # Check for IP format variations ip_patterns = [ r'\b\d+\.\d+\.\d+\.\d+\b', # Standard IP r'\b\d{10}\b', # Decimal IP r'\b0x[0-9a-fA-F]+\b', # Hex IP r'\b0[0-7]+\.[0-7]+\.[0-7]+\.[0-7]+\b' # Octal IP ] for pattern in ip_patterns: if re.search(pattern, decoded_input): if not analysis['risks'] or not any(r['type'] == 'internal_network' for r in analysis['risks']): analysis['risks'].append({ 'type': 'ip_variation', 'severity': 'medium', 'description': f'IP format variation detected: {pattern}', 'payload': user_input }) analysis['is_suspicious'] = True analysis['validation_score'] -= 15 break # Generate recommendations if analysis['risks']: analysis['recommendations'] = self._generate_ssrf_recommendations(analysis['risks']) return analysis def _generate_ssrf_recommendations(self, risks: List[Dict]) -> List[str]: """Generate SSRF mitigation recommendations.""" recommendations = [] risk_types = set(risk['type'] for risk in risks) if 'malicious_scheme' in risk_types: recommendations.append("Implement strict URL scheme whitelist (allow only http/https)") if 'internal_network' in risk_types: recommendations.append("Implement IP address blacklist for private networks") recommendations.append("Use DNS resolution to validate hostnames") if 'url_encoding' in risk_types: recommendations.append("Normalize and decode URLs before validation") if 'ip_variation' in risk_types: recommendations.append("Convert all IP formats to standard format for validation") # General recommendations recommendations.extend([ "Implement server-side request timeout limits", "Use allow-list approach for permitted domains", "Validate and sanitize all user-provided URLs", "Implement response size limits", "Consider using a dedicated URL validation library" ]) return list(set(recommendations)) # Remove duplicates def generate_test_report(self) -> Dict[str, Any]: """Generate comprehensive SSRF testing report.""" report = { 'timestamp': datetime.datetime.now().isoformat(), 'test_type': 'SSRF Vulnerability Assessment', 'total_payloads': len(self.test_payloads), 'payload_categories': {}, 'recommendations': self._get_general_recommendations(), 'test_payloads': self.test_payloads } # Categorize payloads categories = {} for payload in self.test_payloads: category = payload['type'] if category not in categories: categories[category] = [] categories[category].append(payload) report['payload_categories'] = categories return report def _get_general_recommendations(self) -> List[str]: """Get general SSRF mitigation recommendations.""" return [ "Implement strict input validation for all URL parameters", "Use an allow-list approach for permitted domains and IP ranges", "Never send user-controlled URLs directly to backend services", "Implement proper URL parsing and validation before making requests", "Use network segmentation to limit server outbound access", "Implement request timeout and response size limits", "Monitor and log all outbound requests for anomaly detection", "Consider using a dedicated URL sanitization library", "Test regularly with automated SSRF scanning tools", "Educate developers about SSRF risks and prevention techniques" ] def test_endpoint_simulation(self, endpoint_url: str, param_name: str = 'url') -> Dict[str, Any]: """Simulate testing an endpoint for SSRF vulnerabilities.""" test_results = { 'endpoint': endpoint_url, 'parameter': param_name, 'timestamp': datetime.datetime.now().isoformat(), 'vulnerabilities_found': 0, 'test_results': [] } # Simulate testing with different payloads for payload in self.test_payloads[:5]: # Test first 5 payloads for demo test_result = { 'payload': payload['payload'], 'type': payload['type'], 'description': payload['description'], 'simulated_response': self._simulate_response(payload), 'vulnerable': False } # Simulate vulnerability detection if payload['type'] in ['localhost', 'cloud_metadata', 'internal_network']: test_result['vulnerable'] = True test_results['vulnerabilities_found'] += 1 test_results['test_results'].append(test_result) return test_results def _simulate_response(self, payload: Dict[str, Any]) -> Dict[str, Any]: """Simulate server response for SSRF testing.""" payload_str = payload['payload'] payload_type = payload['type'] if payload_type == 'localhost': return { 'status_code': 200, 'content': 'Apache2 Ubuntu Default Page', 'response_time': 0.05, 'indicates_vulnerability': True } elif payload_type == 'cloud_metadata': return { 'status_code': 200, 'content': '{"instance-id": "i-1234567890abcdef0"}', 'response_time': 0.15, 'indicates_vulnerability': True } else: return { 'status_code': 400, 'content': 'Bad Request - Invalid URL', 'response_time': 0.01, 'indicates_vulnerability': False } def main(): """Test the SSRF tester.""" tester = SSRFTester() # Test with sample inputs test_inputs = [ 'http://example.com/image.jpg', 'http://127.0.0.1/admin', 'http://169.254.169.254/latest/meta-data/', 'http://%31%32%37%2E%30%2E%30%2E%31', 'file:///etc/passwd' ] print("SSRF Testing Results:") print("=" * 50) test_results = [] for test_input in test_inputs: analysis = tester.analyze_input_validation(test_input) test_results.append(analysis) print(f"\nTesting: {test_input}") print(f"Suspicious: {analysis['is_suspicious']}") print(f"Score: {analysis['validation_score']}") if analysis['risks']: for risk in analysis['risks']: print(f" - {risk['severity']}: {risk['description']}") # Generate test report report = tester.generate_test_report() print(f"\nTotal test payloads: {report['total_payloads']}") print("Payload categories:", list(report['payload_categories'].keys())) # Test endpoint simulation endpoint_test = tester.test_endpoint_simulation('/api/fetch', 'url') print(f"\nEndpoint test vulnerabilities found: {endpoint_test['vulnerabilities_found']}") return { 'report': report, 'test_results': test_results, 'endpoint_test': endpoint_test, 'test_completed': True } if __name__ == "__main__": main()