"""Configuration-driven checks for common session management weaknesses."""

import base64
import datetime
import hashlib
import html
import json
import math
import re
from collections import Counter
from typing import Any, Dict, List, Optional, Union


class SessionManagementSecurity:
    """
    Session management vulnerability detection
    Tests session hijacking, fixation, timeout, and secure handling

    Every ``test_*`` method inspects a plain configuration dictionary and
    returns a result dictionary; none of them mutate the instance. Callers
    are expected to store results in ``test_results`` and findings in
    ``vulnerabilities`` before calling ``generate_session_management_report``.
    """

    # Points subtracted from the storage security score per finding severity.
    _STORAGE_SEVERITY_PENALTY = {
        "critical": 40,
        "high": 25,
        "medium": 15,
        "low": 5,
    }

    # Run of 10-13 digits, i.e. something shaped like a Unix timestamp.
    _TIMESTAMP_PATTERN = re.compile(r'\d{10,13}')

    def __init__(self):
        # Flat list of finding dicts (each with "type", "severity", "description").
        self.vulnerabilities = []
        # Raw per-test result dicts keyed by a test name chosen by the caller.
        self.test_results = {}

    def test_session_token_generation(self, session_config: Dict) -> Dict:
        """Test session token generation for randomness and predictability.

        Reads ``token_length`` (default 32), ``token_source`` (default
        "random") and ``use_crypto_random`` (default True) from
        ``session_config``.

        Returns a dict with ``token_entropy`` (heuristic estimate),
        ``token_predictability`` and a ``vulnerabilities`` list.
        """
        results = {
            "token_entropy": 0,
            "token_predictability": "unknown",
            "vulnerabilities": [],
        }

        token_length = session_config.get("token_length", 32)
        token_source = session_config.get("token_source", "random")

        # Heuristic estimate: short tokens are credited 4 per character,
        # tokens of 16+ characters are credited 8 per character.
        if token_length < 16:
            results["vulnerabilities"].append({
                "type": "short_token",
                "severity": "medium",
                "description": f"Token length too short: {token_length} characters",
            })
            results["token_entropy"] = token_length * 4
        else:
            results["token_entropy"] = token_length * 8

        if token_source == "timestamp":
            results["vulnerabilities"].append({
                "type": "predictable_token",
                "severity": "high",
                "description": "Token based on timestamp is predictable",
            })
            results["token_predictability"] = "high"
        elif token_source == "sequential":
            results["vulnerabilities"].append({
                "type": "sequential_token",
                "severity": "critical",
                "description": "Sequential tokens are easily predictable",
            })
            results["token_predictability"] = "critical"
        else:
            results["token_predictability"] = "low"

        if not session_config.get("use_crypto_random", True):
            results["vulnerabilities"].append({
                "type": "weak_randomness",
                "severity": "high",
                "description": "Not using cryptographically secure random number generator",
            })

        return results

    def test_session_hijacking_vulnerabilities(self, hijack_config: Dict) -> Dict:
        """Test for session hijacking vulnerabilities.

        Reads ``allow_external_session``, ``regenerate_on_login``,
        ``regenerate_on_privilege_change``, ``use_https``, ``secure_cookie``
        and ``http_only_cookie`` from ``hijack_config``.

        Returns a dict with ``session_fixation_vulnerable``, the list of
        ``session_hijacking_vectors`` found and the ``security_measures``
        that are in place.
        """
        results = {
            "session_fixation_vulnerable": False,
            "session_hijacking_vectors": [],
            "security_measures": [],
        }
        vectors = results["session_hijacking_vectors"]
        measures = results["security_measures"]

        # Session fixation checks.
        if hijack_config.get("allow_external_session", False):
            results["session_fixation_vulnerable"] = True
            vectors.append({
                "type": "session_fixation",
                "severity": "high",
                "description": "External session IDs are accepted",
            })

        if not hijack_config.get("regenerate_on_login", True):
            vectors.append({
                "type": "no_regeneration_on_login",
                "severity": "medium",
                "description": "Session not regenerated after login",
            })

        if not hijack_config.get("regenerate_on_privilege_change", True):
            vectors.append({
                "type": "no_regeneration_on_privilege_change",
                "severity": "high",
                "description": "Session not regenerated after privilege escalation",
            })

        # Transport and cookie flag checks. Note the differing defaults:
        # HTTPS and Secure are assumed absent, HttpOnly is assumed present.
        if hijack_config.get("use_https", False):
            measures.append("HTTPS enforced")
        else:
            vectors.append({
                "type": "missing_https",
                "severity": "critical",
                "description": "Session tokens transmitted over HTTP",
            })

        if hijack_config.get("secure_cookie", False):
            measures.append("Secure cookie flag set")
        else:
            vectors.append({
                "type": "insecure_cookie",
                "severity": "high",
                "description": "Session cookie not marked as Secure",
            })

        if hijack_config.get("http_only_cookie", True):
            measures.append("HttpOnly cookie flag set")
        else:
            vectors.append({
                "type": "accessible_via_javascript",
                "severity": "medium",
                "description": "Session cookie accessible via JavaScript",
            })

        return results

    def test_session_timeout_configuration(self, timeout_config: Dict) -> Dict:
        """Test session timeout and expiration configuration.

        Reads ``idle_timeout``, ``absolute_timeout`` and
        ``remember_me_timeout`` (all in seconds, default 0) from
        ``timeout_config``. Zero or negative idle/absolute timeouts are
        reported as "not configured".

        Returns a dict with ``timeout_vulnerabilities`` and
        ``recommendations``.
        """
        results = {
            "timeout_vulnerabilities": [],
            "recommendations": [],
        }
        findings = results["timeout_vulnerabilities"]

        idle_timeout = timeout_config.get("idle_timeout", 0)
        absolute_timeout = timeout_config.get("absolute_timeout", 0)
        remember_me_timeout = timeout_config.get("remember_me_timeout", 0)

        # A non-positive timeout cannot expire a session, so treat it the
        # same as a missing one instead of silently accepting it.
        if idle_timeout <= 0:
            findings.append({
                "type": "no_idle_timeout",
                "severity": "high",
                "description": "No idle timeout configured",
            })
            results["recommendations"].append(
                "Implement idle timeout (recommended: 15-30 minutes)"
            )
        elif idle_timeout > 7200:  # 2 hours
            findings.append({
                "type": "excessive_idle_timeout",
                "severity": "medium",
                "description": f"Idle timeout too long: {idle_timeout/3600:.1f} hours",
            })

        if absolute_timeout <= 0:
            findings.append({
                "type": "no_absolute_timeout",
                "severity": "medium",
                "description": "No absolute session timeout configured",
            })
            results["recommendations"].append(
                "Implement absolute timeout (recommended: 8-24 hours)"
            )
        elif absolute_timeout > 86400:  # 24 hours
            findings.append({
                "type": "excessive_absolute_timeout",
                "severity": "low",
                "description": f"Absolute timeout very long: {absolute_timeout/86400:.1f} days",
            })

        if remember_me_timeout > 2592000:  # 30 days
            findings.append({
                "type": "excessive_remember_me",
                "severity": "medium",
                "description": f"Remember me timeout too long: {remember_me_timeout/86400:.1f} days",
            })
            results["recommendations"].append(
                "Limit remember me duration to 30 days or less"
            )

        return results

    def test_session_storage_security(self, storage_config: Dict) -> Dict:
        """Test session storage security.

        Reads ``storage_type`` (default "file"), ``encrypt_session_data``
        (default False), ``validate_session_data`` (default True) and, for
        file storage, ``secure_file_permissions`` / ``world_readable``.

        Returns a dict with ``storage_vulnerabilities`` and a
        ``security_score`` that starts at 100, loses a severity-based
        penalty per finding and is clamped at 0.
        """
        results = {
            "storage_vulnerabilities": [],
            "security_score": 0,
        }
        findings = results["storage_vulnerabilities"]

        storage_type = storage_config.get("storage_type", "file")
        encrypt_data = storage_config.get("encrypt_session_data", False)
        validate_data = storage_config.get("validate_session_data", True)

        if storage_type == "file":
            if not storage_config.get("secure_file_permissions", True):
                findings.append({
                    "type": "insecure_file_permissions",
                    "severity": "high",
                    "description": "Session files with insecure permissions",
                })
            if storage_config.get("world_readable", False):
                findings.append({
                    "type": "world_readable_sessions",
                    "severity": "critical",
                    "description": "Session files readable by all users",
                })
        elif storage_type == "database":
            if not encrypt_data:
                findings.append({
                    "type": "unencrypted_database_storage",
                    "severity": "medium",
                    "description": "Session data stored in clear text in database",
                })
        elif storage_type == "client_side":
            findings.append({
                "type": "client_side_storage",
                "severity": "high",
                "description": "Session data stored on client side",
            })

        # Reported for every storage type, so unencrypted database storage
        # yields two findings (the database-specific one plus this one).
        if not encrypt_data:
            findings.append({
                "type": "unencrypted_sessions",
                "severity": "medium",
                "description": "Session data not encrypted",
            })

        if not validate_data:
            findings.append({
                "type": "unvalidated_session_data",
                "severity": "medium",
                "description": "Session data not validated on retrieval",
            })

        # Unknown severities cost 10 points.
        total_penalty = sum(
            self._STORAGE_SEVERITY_PENALTY.get(finding["severity"], 10)
            for finding in findings
        )
        results["security_score"] = max(0, 100 - total_penalty)

        return results

    def test_session_concurrency_and_locking(self, concurrency_config: Dict) -> Dict:
        """Test session concurrency and race conditions.

        Reads ``allow_concurrent_sessions`` (default True),
        ``max_concurrent_sessions`` (default 0, meaning unlimited),
        ``session_locking`` (default False) and ``atomic_operations``
        (default False) from ``concurrency_config``.

        Returns a dict with ``concurrency_issues`` and ``race_conditions``.
        """
        results = {
            "concurrency_issues": [],
            "race_conditions": [],
        }

        allow_concurrent_sessions = concurrency_config.get("allow_concurrent_sessions", True)
        max_concurrent_sessions = concurrency_config.get("max_concurrent_sessions", 0)

        if allow_concurrent_sessions and max_concurrent_sessions == 0:
            results["concurrency_issues"].append({
                "type": "unlimited_concurrent_sessions",
                "severity": "medium",
                "description": "Unlimited concurrent sessions allowed",
            })

        if allow_concurrent_sessions and max_concurrent_sessions > 5:
            results["concurrency_issues"].append({
                "type": "excessive_concurrent_sessions",
                "severity": "low",
                "description": f"Too many concurrent sessions allowed: {max_concurrent_sessions}",
            })

        if not concurrency_config.get("session_locking", False):
            results["race_conditions"].append({
                "type": "no_session_locking",
                "severity": "medium",
                "description": "No session locking implemented",
            })

        # Truthiness test (rather than ``== False``) so an explicit None is
        # also treated as "not atomic", matching the session_locking check.
        if not concurrency_config.get("atomic_operations", False):
            results["race_conditions"].append({
                "type": "non_atomic_operations",
                "severity": "low",
                "description": "Session operations not atomic",
            })

        return results

    def _calculate_token_entropy(self, token: str) -> float:
        """Calculate Shannon entropy of a token.

        Returns the per-character Shannon entropy (base 2) of the token's
        character distribution multiplied by the token length, or 0.0 for
        an empty token.
        """
        if not token:
            return 0.0

        token_len = len(token)
        entropy = 0.0
        for count in Counter(token).values():
            probability = count / token_len
            entropy -= probability * math.log2(probability)

        return entropy * token_len  # Total entropy

    def _test_session_predictability(self, tokens: List[str]) -> str:
        """Test if session tokens are predictable.

        Compares each token with its successor. Returns
        "insufficient_data" for fewer than two tokens, "high" if any
        adjacent pair looks sequential or time-based, otherwise "low".
        """
        if len(tokens) < 2:
            return "insufficient_data"

        has_pattern = any(
            self._is_sequential(previous, current)
            or self._is_time_based(previous, current)
            for previous, current in zip(tokens, tokens[1:])
        )
        return "high" if has_pattern else "low"

    def _is_sequential(self, token1: str, token2: str) -> bool:
        """Check if tokens are sequential.

        If both tokens parse as hexadecimal, they are sequential when the
        second is exactly one greater than the first. Otherwise tokens of
        equal length that differ in at most two positions (identical
        tokens included) are treated as sequential.
        """
        try:
            return int(token2, 16) == int(token1, 16) + 1
        except ValueError:
            # Not hexadecimal: fall back to a positional comparison.
            if len(token1) != len(token2):
                return False
            differences = sum(c1 != c2 for c1, c2 in zip(token1, token2))
            return differences <= 2  # Small differences suggest sequential pattern

    def _is_time_based(self, token1: str, token2: str) -> bool:
        """Check if tokens show time-based patterns.

        Simple heuristic: True when both tokens contain a run of 10-13
        digits, i.e. a substring shaped like a Unix timestamp.
        """
        return bool(
            self._TIMESTAMP_PATTERN.search(token1)
            and self._TIMESTAMP_PATTERN.search(token2)
        )

    def generate_session_management_report(self) -> Dict:
        """Generate comprehensive session management security report.

        Combines the current ``vulnerabilities`` and ``test_results`` with a
        computed risk summary and recommendations. ``assessment_date`` is
        the local time in ISO 8601 format.
        """
        return {
            "assessment_date": datetime.datetime.now().isoformat(),
            "assessment_type": "session_management_security",
            "vulnerabilities": self.vulnerabilities,
            "test_results": self.test_results,
            "risk_summary": self._calculate_risk_summary(),
            "recommendations": self._generate_recommendations(),
        }

    def _calculate_risk_summary(self) -> Dict:
        """Calculate overall risk summary.

        The risk level is "critical" with any critical finding or more than
        three high findings, "high" with any high finding, "medium" with
        more than two findings, "low" with at least one, else "minimal".
        """
        total_vulnerabilities = len(self.vulnerabilities)
        critical_count = sum(
            1 for v in self.vulnerabilities if v.get("severity") == "critical"
        )
        high_count = sum(
            1 for v in self.vulnerabilities if v.get("severity") == "high"
        )

        if critical_count > 0 or high_count > 3:
            risk_level = "critical"
        elif high_count > 0:
            risk_level = "high"
        elif total_vulnerabilities > 2:
            risk_level = "medium"
        elif total_vulnerabilities > 0:
            risk_level = "low"
        else:
            risk_level = "minimal"

        return {
            "risk_level": risk_level,
            "vulnerability_count": total_vulnerabilities,
            "critical_vulnerabilities": critical_count,
            "high_vulnerabilities": high_count,
            "security_score": max(
                0,
                100
                - (critical_count * 40)
                - (high_count * 25)
                - (total_vulnerabilities * 5),
            ),
        }

    def _generate_recommendations(self) -> List[str]:
        """Generate security recommendations based on findings.

        Maps known vulnerability types to targeted advice. The "appears
        secure" message is only returned when there are no findings at
        all; findings without a targeted mapping yield a generic
        remediation message instead.
        """
        recommendations = []
        vuln_types = {v.get("type", "") for v in self.vulnerabilities}

        randomness_issues = {"predictable_token", "sequential_token", "weak_randomness"}
        if vuln_types & randomness_issues:
            recommendations.append(
                "Use cryptographically secure random number generators for session tokens"
            )
        if vuln_types & (randomness_issues | {"short_token"}):
            recommendations.append(
                "Implement minimum 32-character session tokens with high entropy"
            )

        if "session_fixation" in vuln_types:
            recommendations.append(
                "Regenerate session IDs on authentication and privilege changes"
            )
            recommendations.append("Reject external session ID values")

        if "missing_https" in vuln_types or "insecure_cookie" in vuln_types:
            recommendations.append("Enforce HTTPS for all session-related communications")
            recommendations.append(
                "Set Secure, HttpOnly, and SameSite flags on session cookies"
            )

        if "no_idle_timeout" in vuln_types:
            recommendations.append(
                "Implement appropriate session timeouts (15-30 minutes idle, 8-24 hours absolute)"
            )

        if "unencrypted_sessions" in vuln_types:
            recommendations.append("Encrypt session data at rest")

        if not recommendations:
            if self.vulnerabilities:
                # Never claim the system is secure while findings exist.
                recommendations.append(
                    "Review and remediate the reported session management vulnerabilities"
                )
            else:
                recommendations.append(
                    "Session management appears secure - continue regular security audits"
                )

        return recommendations


def demo_session_management_testing():
    """Demonstration of session management security testing.

    Runs every check against a deliberately weak configuration, records the
    results and all findings (race conditions included) on the tester, and
    returns the generated report.
    """
    tester = SessionManagementSecurity()

    # Test 1: Session token generation
    session_config = {
        "token_length": 8,  # Too short
        "token_source": "sequential",  # Predictable
        "use_crypto_random": False,
    }
    token_results = tester.test_session_token_generation(session_config)
    tester.test_results["token_generation"] = token_results

    # Test 2: Session hijacking vulnerabilities
    hijack_config = {
        "allow_external_session": True,  # Vulnerable
        "regenerate_on_login": False,
        "use_https": False,  # Vulnerable
        "secure_cookie": False,  # Vulnerable
        "http_only_cookie": False,
    }
    hijack_results = tester.test_session_hijacking_vulnerabilities(hijack_config)
    tester.test_results["session_hijacking"] = hijack_results

    # Test 3: Session timeout configuration
    timeout_config = {
        "idle_timeout": 0,  # No timeout
        "absolute_timeout": 0,  # No timeout
        "remember_me_timeout": 7776000,  # 90 days, too long
    }
    timeout_results = tester.test_session_timeout_configuration(timeout_config)
    tester.test_results["session_timeout"] = timeout_results

    # Test 4: Session storage security
    storage_config = {
        "storage_type": "file",
        "encrypt_session_data": False,
        "secure_file_permissions": False,
        "world_readable": True,  # Vulnerable
    }
    storage_results = tester.test_session_storage_security(storage_config)
    tester.test_results["session_storage"] = storage_results

    # Test 5: Session concurrency
    concurrency_config = {
        "allow_concurrent_sessions": True,
        "max_concurrent_sessions": 0,  # Unlimited
        "session_locking": False,
    }
    concurrency_results = tester.test_session_concurrency_and_locking(concurrency_config)
    tester.test_results["session_concurrency"] = concurrency_results

    # Collect every finding list so the risk summary reflects all tests.
    for findings in (
        token_results["vulnerabilities"],
        hijack_results["session_hijacking_vectors"],
        timeout_results["timeout_vulnerabilities"],
        storage_results["storage_vulnerabilities"],
        concurrency_results["concurrency_issues"],
        concurrency_results["race_conditions"],
    ):
        tester.vulnerabilities.extend(findings)

    return tester.generate_session_management_report()


if __name__ == "__main__":
    # Run demonstration
    session_report = demo_session_management_testing()
    print("Session Management Security Testing Results:")
    print(json.dumps(session_report, indent=2))