"""Real-time threat detection and alerting.

Events (plain dictionaries) are fed to ``RealTimeThreatDetector.process_event``.
Every enabled ``DetectionRule`` is evaluated against the event, and each rule
that fires produces a ``ThreatAlert`` which is stored on the detector and
passed to the registered alert handlers.
"""

import collections
import dataclasses
import datetime
import hashlib
import json
import math
from typing import Any, Callable, Dict, List, Optional


@dataclasses.dataclass
class ThreatAlert:
    """Real-time threat alert.

    Alerts are created by ``RealTimeThreatDetector.create_threat_alert`` with
    status ``"active"``; the detector's acknowledge / resolve /
    false-positive methods later mutate ``status``, ``assigned_to`` and
    ``resolution_time`` in place.
    """

    alert_id: str
    threat_type: str
    severity: str  # low, medium, high, critical
    source: str
    target: str
    description: str
    timestamp: datetime.datetime
    confidence: float
    status: str  # active, acknowledged, resolved, false_positive
    assigned_to: Optional[str] = None
    resolution_time: Optional[datetime.datetime] = None


@dataclasses.dataclass
class DetectionRule:
    """Threat detection rule.

    ``detection_function`` is called with the event dictionary and its result
    is interpreted by truthiness: a truthy result raises an alert whose
    ``threat_type`` is this rule's ``threat_pattern``.
    """

    rule_id: str
    name: str
    description: str
    threat_pattern: str
    severity: str
    enabled: bool
    detection_function: Callable
    false_positive_rate: float = 0.05


class RealTimeThreatDetector:
    """Real-time threat detection and alerting system."""

    # Statuses under which an alert still counts as open.
    _OPEN_STATUSES = ("active", "acknowledged")

    # Per-detector counter mixed into alert IDs. Declared on the class so the
    # first ``+= 1`` works even on an instance whose ``__init__`` was bypassed.
    _alert_sequence = 0

    def __init__(self):
        """Create an empty detector and register the default rules."""
        self.alerts: List[ThreatAlert] = []
        self.detection_rules: Dict[str, DetectionRule] = {}
        self.alert_handlers: Dict[str, Callable] = {}
        self.alert_escalation_policies: Dict[str, Dict[str, Any]] = {}
        # Bounded buffers: once full, the oldest entries are dropped.
        self.detection_history = collections.deque(maxlen=100000)
        self.false_positive_feedback = collections.deque(maxlen=10000)

        # Initialize default detection rules
        self.initialize_default_rules()

    def initialize_default_rules(self):
        """Initialize default threat detection rules.

        Registers four rules: ``ANOMALOUS_LOGIN``, ``SUSPICIOUS_NETWORK``,
        ``UNAUTHORIZED_ACCESS`` and ``DATA_EXFILTRATION``. The detection
        functions are closures, so the login rule can read this detector's
        ``detection_history``.
        """

        def detect_anomalous_login(data: Dict[str, Any]) -> bool:
            """Detect anomalous login patterns.

            Fires when the user has more than three recent events carrying an
            IP address and the current IP is not among the last ten of them,
            or when the event hour is before 06:00 or after 22:59.
            """
            user = data.get("user_id", "")
            ip = data.get("ip_address", "")
            event_time = data.get("timestamp", datetime.datetime.now())

            # Check for login from unusual location.
            if user:
                recent_ips = []
                # Walk newest-to-oldest so we can stop after ten matches
                # instead of filtering the whole history buffer.
                for past in reversed(self.detection_history):
                    # process_event() stores the event before running the
                    # rules; skip it, otherwise the current IP would always
                    # look "known".
                    if past is data or past.get("user_id") != user:
                        continue
                    past_ip = past.get("ip_address")
                    if past_ip is None:
                        # Events without an IP say nothing about location.
                        continue
                    recent_ips.append(past_ip)
                    if len(recent_ips) == 10:
                        break
                if len(recent_ips) > 3 and ip not in recent_ips:
                    return True

            # Check for unusual time. Non-datetime timestamps fall back to the
            # current wall-clock hour.
            if isinstance(event_time, datetime.datetime):
                hour = event_time.hour
            else:
                hour = datetime.datetime.now().hour
            return hour < 6 or hour > 22  # Unusual hours

        def detect_suspicious_network_activity(data: Dict[str, Any]) -> bool:
            """Detect suspicious network activity."""
            connections = data.get("connection_count", 0)
            data_size = data.get("data_size", 0)
            port_scan = data.get("port_scan_detected", False)
            return bool(connections > 1000 or data_size > 100000000 or port_scan)

        def detect_unauthorized_access(data: Dict[str, Any]) -> bool:
            """Detect unauthorized access attempts.

            Fires when any entry of ``required_permissions`` is missing from
            ``user_permissions``.
            """
            permissions = data.get("user_permissions", [])
            required_permissions = data.get("required_permissions", [])
            return not all(perm in permissions for perm in required_permissions)

        def detect_data_exfiltration(data: Dict[str, Any]) -> bool:
            """Detect potential data exfiltration."""
            data_accessed = data.get("data_accessed", [])
            access_volume = data.get("access_volume", 0)
            unusual_pattern = data.get("unusual_pattern", False)
            return bool(
                len(data_accessed) > 1000
                or access_volume > 1000000
                or unusual_pattern
            )

        # Register default rules
        self.register_detection_rule(
            "ANOMALOUS_LOGIN",
            "Anomalous Login Detection",
            "Detects unusual login patterns and potential account compromise",
            "login_anomaly",
            "high",
            detect_anomalous_login,
        )
        self.register_detection_rule(
            "SUSPICIOUS_NETWORK",
            "Suspicious Network Activity",
            "Detects abnormal network behavior and potential attacks",
            "network_anomaly",
            "medium",
            detect_suspicious_network_activity,
        )
        self.register_detection_rule(
            "UNAUTHORIZED_ACCESS",
            "Unauthorized Access Attempt",
            "Detects attempts to access resources without proper permissions",
            "access_violation",
            "high",
            detect_unauthorized_access,
        )
        self.register_detection_rule(
            "DATA_EXFILTRATION",
            "Data Exfiltration Detection",
            "Detects patterns consistent with data theft or exfiltration",
            "data_theft",
            "critical",
            detect_data_exfiltration,
        )

    def register_detection_rule(self, rule_id: str, name: str, description: str,
                                threat_pattern: str, severity: str,
                                detection_function: Callable) -> bool:
        """Register a new threat detection rule.

        The rule is stored enabled under ``rule_id``; an existing rule with
        the same ID is replaced. Always returns True.
        """
        self.detection_rules[rule_id] = DetectionRule(
            rule_id=rule_id,
            name=name,
            description=description,
            threat_pattern=threat_pattern,
            severity=severity,
            enabled=True,
            detection_function=detection_function,
        )
        return True

    def register_alert_handler(self, handler_id: str, handler_function: Callable) -> bool:
        """Register an alert handler function.

        The handler is called with each new ``ThreatAlert``. Registering an
        existing ``handler_id`` replaces the handler. Always returns True.
        """
        self.alert_handlers[handler_id] = handler_function
        return True

    def set_escalation_policy(self, severity: str, timeout_minutes: int,
                              escalation_action: str) -> bool:
        """Set alert escalation policy.

        One policy is kept per severity; setting it again overwrites the
        previous one. Always returns True.
        """
        self.alert_escalation_policies[severity] = {
            "timeout_minutes": timeout_minutes,
            "escalation_action": escalation_action,
        }
        return True

    @staticmethod
    def _stable_confidence(rule_id: str, timestamp: Any) -> float:
        """Return a pseudo-confidence between 0.80 and 0.99 for a rule/timestamp pair.

        A SHA-256 digest is used rather than the built-in ``hash()`` because
        string hashing is salted per interpreter process, which made the
        confidence of an identical event differ from run to run.
        """
        digest = hashlib.sha256(f"{rule_id}_{timestamp}".encode()).hexdigest()
        return 0.8 + (int(digest, 16) % 20) / 100

    def process_event(self, event_data: Dict[str, Any]) -> List[ThreatAlert]:
        """Process an event and generate threat alerts.

        The event is appended to ``detection_history`` and then evaluated by
        every enabled rule. Each firing rule yields one alert, which is added
        to ``self.alerts`` and dispatched to the alert handlers. An exception
        raised while handling one rule is printed and does not stop the
        remaining rules.

        Returns:
            The alerts created for this event, in rule registration order.
        """
        alerts = []

        # Store event in detection history
        self.detection_history.append(event_data)

        # Apply all enabled detection rules
        for rule_id, rule in self.detection_rules.items():
            if not rule.enabled:
                continue
            try:
                if not rule.detection_function(event_data):
                    continue
                alert = self.create_threat_alert(
                    rule_id=rule_id,
                    threat_type=rule.threat_pattern,
                    severity=rule.severity,
                    source=event_data.get("source", "unknown"),
                    target=event_data.get("target", "unknown"),
                    description=f"{rule.name}: {rule.description}",
                    confidence=self._stable_confidence(
                        rule_id, event_data.get("timestamp", "")
                    ),
                )
                alerts.append(alert)
                self.alerts.append(alert)

                # Trigger alert handlers
                self.trigger_alert_handlers(alert)
            except Exception as e:
                # Log error but continue processing
                print(f"Error in detection rule {rule_id}: {e}")

        return alerts

    def create_threat_alert(self, rule_id: str, threat_type: str, severity: str,
                            source: str, target: str, description: str,
                            confidence: float) -> ThreatAlert:
        """Create a new threat alert.

        The alert is returned with status ``"active"``; it is NOT added to
        ``self.alerts`` here (``process_event`` does that).

        The 16-hex-character ``alert_id`` is derived from the rule, the threat
        type, the creation time and a per-detector sequence number, so two
        alerts created within the same clock tick still get distinct IDs.
        """
        # Read the clock once so the ID seed and the stored timestamp agree.
        created_at = datetime.datetime.now()
        self._alert_sequence += 1
        id_seed = f"{rule_id}_{threat_type}_{created_at}_{self._alert_sequence}"
        alert_id = hashlib.sha256(id_seed.encode()).hexdigest()[:16]

        return ThreatAlert(
            alert_id=alert_id,
            threat_type=threat_type,
            severity=severity,
            source=source,
            target=target,
            description=description,
            timestamp=created_at,
            confidence=confidence,
            status="active",
        )

    def trigger_alert_handlers(self, alert: ThreatAlert) -> None:
        """Trigger all registered alert handlers for an alert.

        A handler that raises is reported via ``print`` and does not prevent
        the remaining handlers from running.
        """
        for handler_id, handler_function in self.alert_handlers.items():
            try:
                handler_function(alert)
            except Exception as e:
                print(f"Error in alert handler {handler_id}: {e}")

    def _find_alert(self, alert_id: str,
                    status: Optional[str] = None) -> Optional[ThreatAlert]:
        """Return the first stored alert with ``alert_id`` (and ``status``, if given)."""
        for alert in self.alerts:
            if alert.alert_id == alert_id and (status is None or alert.status == status):
                return alert
        return None

    def acknowledge_alert(self, alert_id: str, assigned_to: str) -> bool:
        """Acknowledge a threat alert.

        Only alerts whose status is ``"active"`` can be acknowledged.

        Returns:
            True if a matching active alert was updated, otherwise False.
        """
        alert = self._find_alert(alert_id, status="active")
        if alert is None:
            return False
        alert.status = "acknowledged"
        alert.assigned_to = assigned_to
        return True

    def resolve_alert(self, alert_id: str, resolution: str = "resolved") -> bool:
        """Resolve a threat alert.

        Sets the alert's status to ``resolution`` and stamps
        ``resolution_time`` with the current time, whatever the prior status.

        Returns:
            True if the alert was found, otherwise False.
        """
        alert = self._find_alert(alert_id)
        if alert is None:
            return False
        alert.status = resolution
        alert.resolution_time = datetime.datetime.now()
        return True

    def mark_false_positive(self, alert_id: str, feedback_notes: str = "") -> bool:
        """Mark an alert as false positive.

        Besides closing the alert, this raises the ``false_positive_rate`` of
        every rule whose ``threat_pattern`` equals the alert's threat type by
        10% (capped at 0.5) and appends a record to
        ``false_positive_feedback``.

        Returns:
            True if the alert was found, otherwise False.
        """
        alert = self._find_alert(alert_id)
        if alert is None:
            return False

        now = datetime.datetime.now()
        alert.status = "false_positive"
        alert.resolution_time = now

        # Adjust false positive rate for the rule(s) behind this threat type.
        matched_rule_ids = []
        for rule in self.detection_rules.values():
            if rule.threat_pattern == alert.threat_type:
                rule.false_positive_rate = min(0.5, rule.false_positive_rate * 1.1)
                matched_rule_ids.append(rule.rule_id)

        # Store feedback for improving detection. Alerts do not carry their
        # rule ID, so it is recovered from the matching rule; the threat type
        # is only a fallback for when no registered rule matches any more.
        self.false_positive_feedback.append({
            "alert_id": alert_id,
            "rule_id": matched_rule_ids[0] if matched_rule_ids else alert.threat_type,
            "threat_type": alert.threat_type,
            "feedback_time": now.isoformat(),
            "notes": feedback_notes,
        })
        return True

    def check_escalation_policies(self) -> List[str]:
        """Check for alerts that need escalation.

        An open (active or acknowledged) alert needs escalation when a policy
        exists for its severity and the alert is older than that policy's
        ``timeout_minutes``.

        Returns:
            One human-readable message per alert requiring escalation.
        """
        escalation_actions = []
        current_time = datetime.datetime.now()

        for alert in self.alerts:
            if alert.status not in self._OPEN_STATUSES:
                continue
            policy = self.alert_escalation_policies.get(alert.severity)
            if policy is None:
                continue
            # Check if alert has been open longer than timeout
            minutes_open = (current_time - alert.timestamp).total_seconds() / 60
            if minutes_open > policy["timeout_minutes"]:
                escalation_actions.append(
                    f"Alert {alert.alert_id} ({alert.severity}) requires escalation: "
                    f"{policy['escalation_action']}"
                )

        return escalation_actions

    def get_active_alerts(self) -> List[Dict[str, Any]]:
        """Get all active alerts.

        Returns:
            Dictionaries describing every active or acknowledged alert,
            newest first. ``age_minutes`` is the alert age truncated to whole
            minutes.
        """
        now = datetime.datetime.now()
        open_alerts = [a for a in self.alerts if a.status in self._OPEN_STATUSES]
        open_alerts.sort(key=lambda a: a.timestamp, reverse=True)
        return [
            {
                "alert_id": a.alert_id,
                "threat_type": a.threat_type,
                "severity": a.severity,
                "source": a.source,
                "target": a.target,
                "description": a.description,
                "timestamp": a.timestamp.isoformat(),
                "confidence": a.confidence,
                "status": a.status,
                "assigned_to": a.assigned_to,
                "age_minutes": int((now - a.timestamp).total_seconds() / 60),
            }
            for a in open_alerts
        ]

    def get_threat_summary(self, hours_back: int = 24) -> Dict[str, Any]:
        """Get threat detection summary.

        Only alerts created within the last ``hours_back`` hours are counted.
        ``accuracy_rate`` is the percentage of those alerts not marked as
        false positives (100 when there are no alerts), rounded to two
        decimals; ``avg_confidence`` is 0 when there are no alerts.
        """
        cutoff_time = datetime.datetime.now() - datetime.timedelta(hours=hours_back)
        recent_alerts = [a for a in self.alerts if a.timestamp > cutoff_time]

        threat_breakdown = collections.Counter(a.threat_type for a in recent_alerts)
        severity_breakdown = collections.Counter(a.severity for a in recent_alerts)
        status_breakdown = collections.Counter(a.status for a in recent_alerts)

        # Calculate detection accuracy
        total_alerts = len(recent_alerts)
        false_positives = status_breakdown["false_positive"]
        if total_alerts > 0:
            accuracy_rate = (total_alerts - false_positives) / total_alerts * 100
        else:
            accuracy_rate = 100

        return {
            "summary_period_hours": hours_back,
            "total_alerts": total_alerts,
            "active_alerts": sum(
                1 for a in recent_alerts if a.status in self._OPEN_STATUSES
            ),
            "resolved_alerts": status_breakdown["resolved"],
            "false_positives": false_positives,
            "accuracy_rate": round(accuracy_rate, 2),
            "threat_types": dict(threat_breakdown),
            "severity_breakdown": dict(severity_breakdown),
            "status_breakdown": dict(status_breakdown),
            "avg_confidence": sum(a.confidence for a in recent_alerts) / max(1, total_alerts),
        }


def test_threat_detection_system():
    """Test the real-time threat detection system.

    This is a runnable demonstration rather than an assertion-based test: it
    configures a detector, feeds it four sample events, prints what happened
    and returns the detector for further inspection.
    """
    detector = RealTimeThreatDetector()

    # Set escalation policies
    detector.set_escalation_policy("critical", 5, "immediate_notification")
    detector.set_escalation_policy("high", 15, "escalate_to_manager")
    detector.set_escalation_policy("medium", 30, "email_notification")

    # Register alert handler
    def email_handler(alert: ThreatAlert):
        print(f"📧 EMAIL ALERT: {alert.severity.upper()} - {alert.threat_type}")

    def sms_handler(alert: ThreatAlert):
        if alert.severity in ["high", "critical"]:
            print(f"📱 SMS ALERT: {alert.severity.upper()} - {alert.threat_type}")

    detector.register_alert_handler("email", email_handler)
    detector.register_alert_handler("sms", sms_handler)

    print("✅ Alert handlers and escalation policies configured")

    # Simulate various security events
    events = [
        {
            "event_type": "login",
            "user_id": "john_doe",
            "ip_address": "203.0.113.100",  # Unusual IP
            "timestamp": datetime.datetime.now(),
            "source": "login_system",
            "target": "user_account",
        },
        {
            "event_type": "network_activity",
            "connection_count": 1500,  # High number
            "data_size": 50000000,
            "port_scan_detected": True,
            "source": "external_ip",
            "target": "web_server",
        },
        {
            "event_type": "resource_access",
            "user_id": "malicious_user",
            "resource": "admin_panel",
            "user_permissions": ["read"],
            "required_permissions": ["read", "write", "admin"],
            "source": "application",
            "target": "admin_system",
        },
        {
            "event_type": "data_access",
            "data_accessed": [f"record_{i}" for i in range(1500)],  # Large dataset
            "access_volume": 2000000,
            "unusual_pattern": True,
            "source": "database",
            "target": "sensitive_data",
        },
    ]

    # Process events
    total_alerts = 0
    for i, event in enumerate(events):
        alerts = detector.process_event(event)
        total_alerts += len(alerts)
        print(f"✅ Event {i+1} processed: {len(alerts)} alerts generated")

    print(f"✅ Total alerts generated: {total_alerts}")

    # Acknowledge and resolve some alerts
    active_alerts = detector.get_active_alerts()
    if active_alerts:
        first_alert_id = active_alerts[0]["alert_id"]
        detector.acknowledge_alert(first_alert_id, "security_analyst_01")
        print(f"✅ Alert {first_alert_id} acknowledged")

    # Check escalation policies
    escalations = detector.check_escalation_policies()
    if escalations:
        print(f"⚠️ Escalation needed: {len(escalations)} alerts")

    # Get threat summary
    summary = detector.get_threat_summary()
    print("✅ Threat Summary (24h):")
    print(f" - Total alerts: {summary['total_alerts']}")
    print(f" - Active alerts: {summary['active_alerts']}")
    print(f" - Accuracy rate: {summary['accuracy_rate']}%")
    print(f" - Average confidence: {summary['avg_confidence']:.2f}")

    return detector


# The demo above matches pytest's ``test_*`` naming pattern but returns a
# value and asserts nothing, so keep pytest from collecting it as a test case.
test_threat_detection_system.__test__ = False


if __name__ == "__main__":
    test_threat_detection_system()