#!/usr/bin/env python3
"""
Comprehensive Security Module
Unified security checks, input sanitization, and threat detection
"""
import json
import re
import hashlib
import secrets
import datetime
import urllib.parse
import html
from typing import Dict, List, Optional, Any, Tuple, Union
from dataclasses import dataclass
from enum import Enum
class ThreatLevel(Enum):
    """Security threat levels"""
    # Ordered most to least severe; CRITICAL findings trigger source
    # auto-blocking and add the largest risk-score weight downstream.
    CRITICAL = "critical"
    HIGH = "high"
    MEDIUM = "medium"
    LOW = "low"
    INFO = "info"
class AttackVector(Enum):
    """Common attack vectors"""
    # Category tags attached to SecurityAlert instances for reporting.
    INJECTION = "injection"                        # SQL/command/code injection
    XSS = "xss"                                    # cross-site scripting
    CSRF = "csrf"                                  # cross-site request forgery
    PRIVILEGE_ESCALATION = "privilege_escalation"
    RECONNAISSANCE = "reconnaissance"              # scanning / probing
    DOS = "dos"                                    # denial of service (e.g. oversized requests)
    BRUTE_FORCE = "brute_force"
    TIMING = "timing"                              # timing side-channel attacks
@dataclass
class SecurityAlert:
    """Security alert data structure"""
    threat_level: ThreatLevel      # severity classification of the finding
    attack_vector: AttackVector    # attack category this alert falls under
    description: str               # human-readable summary of what was detected
    source: str                    # origin of the input, e.g. an IP address ("unknown" when unattributable)
    timestamp: str                 # ISO-8601 string from datetime.isoformat()
    evidence: Dict[str, Any]       # supporting details (matched pattern, sizes, errors, ...)
    recommendation: str            # suggested remediation text
    blocked: bool                  # True once the source has been blocked for this alert
@dataclass
class SanitizationResult:
    """Input sanitization result"""
    original_input: str                      # input exactly as received
    sanitized_input: str                     # input after all sanitization rules ran
    threats_detected: List[SecurityAlert]    # alerts raised by rules and pattern scans
    safe: bool                               # no critical/high threats and risk score under threshold
    modifications_made: bool                 # True if any rule altered the input
    risk_score: float                        # cumulative risk, capped at 100.0
@dataclass
class SecurityPolicy:
    """Security policy configuration"""
    max_input_length: int          # per-input cap; requests are allowed 10x this size
    allowed_characters: str        # character allow-list (enforcement not shown in this chunk — verify)
    forbidden_patterns: List[str]  # regex patterns treated as forbidden input
    rate_limit_enabled: bool       # when True, _apply_rate_limiting runs per sanitized input
    request_timeout: int           # seconds; presumably enforced by the request layer — TODO confirm
    audit_logging: bool            # audit-log toggle (not read in this chunk — verify usage)
class ComprehensiveSecurityModule:
"""Enterprise-grade comprehensive security module"""
def __init__(self, policy: Optional[SecurityPolicy] = None):
    """Set up runtime state, attack signatures, and the sanitization pipeline.

    Args:
        policy: Security policy to enforce; when omitted, the built-in
            default policy is used.
    """
    # Mutable runtime state.
    self.alert_log = []
    self.blocked_ips = set()
    self.rate_limiter = {}
    self.policy = policy if policy else self._default_security_policy()
    # Precomputed attack signatures and patterns.
    self.injection_patterns = self._initialize_injection_patterns()
    self.xss_patterns = self._initialize_xss_patterns()
    self.malicious_payloads = self._initialize_malicious_payloads()
    self.suspicious_user_agents = self._initialize_suspicious_user_agents()
    # Sanitization pipeline: rule name -> callable, applied in insertion order.
    self.sanitization_rules = dict(
        html_escape=self._html_escape_input,
        url_encode=self._url_encode_input,
        remove_control_chars=self._remove_control_characters,
        normalize_whitespace=self._normalize_whitespace,
        validate_encoding=self._validate_encoding,
        check_length=self._check_input_length,
    )
def sanitize_and_validate_input(self,
                                input_data: str,
                                input_type: str = "general",
                                source: str = "unknown") -> SanitizationResult:
    """Comprehensive input sanitization and validation.

    Runs the configured sanitization rules over ``input_data``, scans the
    sanitized text for injection/XSS/payload signatures, scores the combined
    risk, logs every detected threat, and auto-blocks ``source`` on critical
    findings.

    Args:
        input_data: Raw input text to sanitize and analyze.
        input_type: Caller-supplied input category (currently not consulted
            by any check in this method).
        source: Identifier of the input's origin, e.g. an IP address;
            "unknown" sources are never auto-blocked.

    Returns:
        SanitizationResult with the sanitized text, detected threats, and a
        risk score capped at 100.0.
    """
    if not input_data:
        # Empty input is trivially safe; nothing to sanitize or scan.
        return SanitizationResult(
            original_input="",
            sanitized_input="",
            threats_detected=[],
            safe=True,
            modifications_made=False,
            risk_score=0.0
        )
    # Step 1: apply sanitization rules in their configured order.
    current_input, threats_detected, risk_score, modifications_made = \
        self._run_sanitization_rules(input_data, source)
    # Step 2: detect specific attack patterns on the sanitized text.
    threats_detected.extend(self._detect_injection_attacks(current_input, source))
    threats_detected.extend(self._detect_xss_attacks(current_input, source))
    threats_detected.extend(self._detect_malicious_payloads(current_input, source))
    # Step 3: add per-threat severity weights on top of the rule-reported
    # scores (rule-raised threats are intentionally weighted here as well).
    risk_score += self._severity_risk_score(threats_detected)
    # Step 4: safe only without critical/high findings and a modest score.
    has_severe = any(
        t.threat_level in (ThreatLevel.CRITICAL, ThreatLevel.HIGH)
        for t in threats_detected
    )
    safe = not has_severe and risk_score < 30.0
    # Step 5: log everything; auto-block attributable critical sources.
    if threats_detected:
        self.alert_log.extend(threats_detected)
        for threat in threats_detected:
            if threat.threat_level == ThreatLevel.CRITICAL and source != "unknown":
                self.blocked_ips.add(source)
                threat.blocked = True
    # Step 6: apply rate limiting when the policy enables it.
    if self.policy.rate_limit_enabled:
        self._apply_rate_limiting(source)
    return SanitizationResult(
        original_input=input_data,
        sanitized_input=current_input,
        threats_detected=threats_detected,
        safe=safe,
        modifications_made=modifications_made,
        risk_score=min(risk_score, 100.0)
    )

def _run_sanitization_rules(self, input_data: str,
                            source: str) -> Tuple[str, List[SecurityAlert], float, bool]:
    """Apply every configured sanitization rule to ``input_data`` in order.

    A rule may return either a plain string (sanitized text) or a
    ``(text, threats, score)`` tuple. A rule that raises is recorded as a
    LOW-severity alert rather than aborting the pipeline.

    Returns:
        Tuple of (sanitized_text, threats, rule_risk_score, modified).
    """
    current_input = input_data
    threats: List[SecurityAlert] = []
    risk_score = 0.0
    modified = False
    for rule_name, rule_func in self.sanitization_rules.items():
        try:
            result = rule_func(current_input)
            if isinstance(result, tuple):
                new_input, rule_threats, rule_score = result
                if new_input != current_input:
                    modified = True
                current_input = new_input
                threats.extend(rule_threats)
                risk_score += rule_score
            elif result != current_input:
                modified = True
                current_input = result
        except Exception as e:
            # Best effort: a broken rule must not take validation down.
            threats.append(SecurityAlert(
                threat_level=ThreatLevel.LOW,
                attack_vector=AttackVector.INJECTION,
                description=f"Sanitization rule {rule_name} failed: {e}",
                source=source,
                timestamp=datetime.datetime.now().isoformat(),
                evidence={"rule": rule_name, "error": str(e)},
                recommendation="Review sanitization rule implementation",
                blocked=False
            ))
    return current_input, threats, risk_score, modified

def _severity_risk_score(self, threats: List[SecurityAlert]) -> float:
    """Sum the additive risk weight of each threat's severity (INFO adds 0)."""
    weights = {
        ThreatLevel.CRITICAL: 50.0,
        ThreatLevel.HIGH: 25.0,
        ThreatLevel.MEDIUM: 10.0,
        ThreatLevel.LOW: 5.0,
    }
    return sum(weights.get(t.threat_level, 0.0) for t in threats)
def validate_request_security(self,
                              request_data: Dict[str, Any],
                              headers: Dict[str, str],
                              ip_address: str) -> List[SecurityAlert]:
    """Validate overall request security.

    Checks the source IP against the blocklist, inspects the User-Agent
    and other headers for attack signatures, and flags oversized requests.

    Returns:
        List of SecurityAlert findings for this request (possibly empty).
    """
    findings: List[SecurityAlert] = []
    # A blocked source short-circuits all further checks.
    if ip_address in self.blocked_ips:
        findings.append(SecurityAlert(
            threat_level=ThreatLevel.CRITICAL,
            attack_vector=AttackVector.BRUTE_FORCE,
            description="Request from blocked IP address",
            source=ip_address,
            timestamp=datetime.datetime.now().isoformat(),
            evidence={"ip": ip_address},
            recommendation="IP is permanently blocked",
            blocked=True
        ))
        return findings
    # User-Agent heuristics.
    agent = headers.get("User-Agent", "")
    findings.extend(self._detect_suspicious_user_agent(agent, ip_address))
    # Attack patterns carried in other headers.
    findings.extend(self._detect_header_attacks(headers, ip_address))
    # Oversized requests may indicate a denial-of-service attempt.
    header_bytes = sum(len(name) + len(value) for name, value in headers.items())
    request_size = len(json.dumps(request_data)) + header_bytes
    size_limit = self.policy.max_input_length * 10  # 10x multiplier for requests
    if request_size > size_limit:
        findings.append(SecurityAlert(
            threat_level=ThreatLevel.MEDIUM,
            attack_vector=AttackVector.DOS,
            description="Oversized request detected",
            source=ip_address,
            timestamp=datetime.datetime.now().isoformat(),
            evidence={"size": request_size, "max_allowed": size_limit},
            recommendation="Reduce request size or increase limits",
            blocked=False
        ))
    return findings
def get_security_report(self, hours: int = 24) -> Dict[str, Any]:
    """Generate comprehensive security report.

    Args:
        hours: Size of the reporting window, counted back from now.

    Returns:
        Dict with alert tallies by severity and vector, the top ten noisiest
        sources, a 0-100 security score, and generated recommendations.
    """
    window_start = datetime.datetime.now() - datetime.timedelta(hours=hours)
    recent = [
        alert for alert in self.alert_log
        if datetime.datetime.fromisoformat(alert.timestamp) > window_start
    ]
    # Tally alerts by severity, by attack vector, and by source in one pass.
    threat_counts = {level.value: 0 for level in ThreatLevel}
    vector_counts = {vector.value: 0 for vector in AttackVector}
    source_counts: Dict[str, int] = {}
    for alert in recent:
        threat_counts[alert.threat_level.value] += 1
        vector_counts[alert.attack_vector.value] += 1
        source_counts[alert.source] = source_counts.get(alert.source, 0) + 1
    # Score: -20 per critical, -10 per high, -1 for each other alert, floor 0.
    total = len(recent)
    n_critical = threat_counts["critical"]
    n_high = threat_counts["high"]
    score = max(0, 100 - n_critical * 20 - n_high * 10 - (total - n_critical - n_high))
    busiest = sorted(source_counts.items(), key=lambda item: item[1], reverse=True)
    return {
        "timeframe_hours": hours,
        "total_alerts": total,
        "security_score": score,
        "threat_levels": threat_counts,
        "attack_vectors": vector_counts,
        "top_sources": dict(busiest[:10]),
        "blocked_ips_count": len(self.blocked_ips),
        "recommendations": self._generate_security_recommendations(recent)
    }
def _default_security_policy(self) -> SecurityPolicy:
    """Build the default security policy.

    Returns:
        SecurityPolicy with a 10k-character input cap, a conservative
        character allow-list, regex patterns for common script/SQL/command
        injection payloads, rate limiting, and audit logging enabled.
    """
    return SecurityPolicy(
        max_input_length=10000,
        allowed_characters="abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789 .,!?-_@#$%^&*()[]{}|:;<>",
        forbidden_patterns=[
            # FIX: this entry was the empty pattern r'', which matches
            # every string and would flag all input. Restored to the
            # literal <script tag prefix — presumably stripped by an
            # HTML-unaware tool; verify against upstream history.
            r'<script',
            r'javascript:',
            r'eval\s*\(',
            r'exec\s*\(',
            r'system\s*\(',
            r'passthru\s*\(',
            r'shell_exec\s*\(',
            # Basic SQL injection signatures.
            r'union.*select',
            r'drop.*table',
            r'insert.*into',
            r'delete.*from',
            r'update.*set',
            # JavaScript dialog calls commonly used in XSS proofs.
            r'alert\s*\(',
            r'confirm\s*\(',
            r'prompt\s*\('
        ],
        rate_limit_enabled=True,
        request_timeout=30,
        audit_logging=True
    )
def _initialize_injection_patterns(self) -> List[str]:
"""Initialize SQL injection and command injection patterns"""
return [
r"['\"]\s*or\s*['\"][\w]*['\"]\s*=\s*['\"]",
r"union\s+select",
r"insert\s+into",
r"delete\s+from",
r"update\s+.*\s+set",
r"drop\s+table",
r"create\s+table",
r"alter\s+table",
r"exec\s*\(",
r"system\s*\(",
r"eval\s*\(",
r"passthru\s*\(",
r"shell_exec\s*\(",
r"`[^`]*`",
r"\$\([^)]*\)",
r";\s*(rm|del|format|mkfs)",
r"[|&]\s*(rm|del|format|mkfs)",
r"<\?php",
r"",
r"javascript:",
r"on\w+\s*=",
r"