import json import math import datetime from typing import Dict, List, Optional, Any, Tuple import dataclasses import collections import hashlib import re import base64 @dataclasses.dataclass class SecurityThreat: """Security threat information.""" threat_id: str threat_type: str # malware, phishing, ddos, intrusion, data_breach, etc. severity: str # low, medium, high, critical source_ip: str target_system: str description: str timestamp: datetime.datetime confidence: float status: str # active, mitigated, investigating, resolved @dataclasses.dataclass class SecurityIncident: """Security incident record.""" incident_id: str incident_type: str affected_systems: List[str] impact_level: str # low, medium, high, critical detection_method: str containment_actions: List[str] recovery_actions: List[str] timestamp: datetime.datetime resolved: bool = False resolution_time: Optional[datetime.timedelta] = None class CybersecurityInfrastructure: """Comprehensive cybersecurity infrastructure for digital systems.""" def __init__(self): self.monitored_systems = {} self.threat_intelligence = {} self.security_policies = {} self.threats = [] self.incidents = [] self.firewall_rules = [] self.vulnerability_scans = [] self.access_logs = collections.deque(maxlen=10000) def register_system(self, system_id: str, system_type: str, ip_address: str, criticality: str, owner: str) -> bool: """Register a system for security monitoring.""" system_info = { "system_id": system_id, "system_type": system_type, "ip_address": ip_address, "criticality": criticality, # low, medium, high, critical "owner": owner, "status": "active", "last_scan": None, "vulnerabilities": 0, "security_score": 100, "registered_at": datetime.datetime.now().isoformat() } self.monitored_systems[system_id] = system_info return True def scan_system_vulnerabilities(self, system_id: str) -> Dict[str, Any]: """Perform vulnerability scan on a system.""" if system_id not in self.monitored_systems: return {"error": "System not found"} system = self.monitored_systems[system_id] # Simulate vulnerability scan vulnerabilities = [] vuln_types = ["SQL Injection", "XSS", "Outdated Software", "Weak Passwords", "Open Ports", "SSL Issues"] # Generate random vulnerabilities vuln_count = (hash(f"{system_id}_{datetime.datetime.now().strftime('%Y%m%d')}") % 8) for i in range(vuln_count): vuln_type = vuln_types[hash(f"{system_id}_vuln_{i}") % len(vuln_types)] severity = ["low", "medium", "high"][hash(f"{system_id}_sev_{i}") % 3] vulnerabilities.append({ "vulnerability_id": hashlib.sha256(f"{system_id}_{vuln_type}_{i}".encode()).hexdigest()[:12], "type": vuln_type, "severity": severity, "description": f"Potential {vuln_type} vulnerability detected", "cve_id": f"CVE-2024-{hash(f'{system_id}_{i}') % 10000:04d}", "recommendation": f"Update {vuln_type} protection" }) # Calculate security score severity_weights = {"low": 1, "medium": 3, "high": 5} risk_score = sum(severity_weights.get(v["severity"], 1) for v in vulnerabilities) security_score = max(0, 100 - (risk_score * 2)) scan_result = { "system_id": system_id, "scan_timestamp": datetime.datetime.now().isoformat(), "vulnerabilities_found": len(vulnerabilities), "vulnerabilities": vulnerabilities, "security_score": security_score, "risk_level": self.calculate_risk_level(security_score), "scan_complete": True } self.vulnerability_scans.append(scan_result) system["last_scan"] = scan_result["scan_timestamp"] system["vulnerabilities"] = len(vulnerabilities) system["security_score"] = security_score return scan_result def calculate_risk_level(self, security_score: int) -> str: """Calculate risk level based on security score.""" if security_score >= 90: return "low" elif security_score >= 70: return "medium" elif security_score >= 50: return "high" else: return "critical" def detect_intrusion_attempts(self, system_id: str) -> List[SecurityThreat]: """Detect intrusion attempts against a system.""" if system_id not in self.monitored_systems: return [] system = self.monitored_systems[system_id] threats = [] # Simulate intrusion detection threat_types = ["brute_force", "port_scan", "sql_injection", "xss", "ddos", "malware"] # Generate potential threats based on system type and security score if system["security_score"] < 80: threat_count = (hash(f"{system_id}_{datetime.datetime.now().strftime('%H')}") % 3) + 1 for i in range(threat_count): threat_type = threat_types[hash(f"{system_id}_threat_{i}") % len(threat_types)] severity = ["low", "medium", "high", "critical"][hash(f"{system_id}_severity_{i}") % 4] # Generate fake source IP source_ip = f"{hash(f'src_{i}') % 256}.{hash(f'src_{i}_2') % 256}.{hash(f'src_{i}_3') % 256}.{hash(f'src_{i}_4') % 256}" threat = SecurityThreat( threat_id=hashlib.sha256(f"{system_id}_{threat_type}_{datetime.datetime.now()}".encode()).hexdigest()[:16], threat_type=threat_type, severity=severity, source_ip=source_ip, target_system=system_id, description=f"Detected {threat_type} activity from {source_ip}", timestamp=datetime.datetime.now(), confidence=0.7 + (hash(f"{system_id}_conf_{i}") % 25) / 100, status="active" ) threats.append(threat) self.threats.append(threat) return threats def analyze_network_traffic(self, traffic_data: Dict[str, Any]) -> List[SecurityThreat]: """Analyze network traffic for security threats.""" threats = [] # Simulate traffic analysis source_ip = traffic_data.get("source_ip", "unknown") destination = traffic_data.get("destination", "unknown") protocol = traffic_data.get("protocol", "TCP") data_size = traffic_data.get("data_size", 0) # Check for suspicious patterns if data_size > 10000000: # Large data transfer threat = SecurityThreat( threat_id=hashlib.sha256(f"traffic_{source_ip}_{datetime.datetime.now()}".encode()).hexdigest()[:16], threat_type="data_exfiltration", severity="high", source_ip=source_ip, target_system=destination, description=f"Large data transfer detected: {data_size} bytes", timestamp=datetime.datetime.now(), confidence=0.8, status="active" ) threats.append(threat) # Check for frequent connections (potential DDoS) connection_count = traffic_data.get("connection_count", 1) if connection_count > 1000: threat = SecurityThreat( threat_id=hashlib.sha256(f"ddos_{source_ip}_{datetime.datetime.now()}".encode()).hexdigest()[:16], threat_type="ddos", severity="critical", source_ip=source_ip, target_system=destination, description=f"High connection rate detected: {connection_count} connections", timestamp=datetime.datetime.now(), confidence=0.9, status="active" ) threats.append(threat) for threat in threats: self.threats.append(threat) return threats def create_security_incident(self, incident_type: str, affected_systems: List[str], description: str, detection_method: str) -> str: """Create a security incident record.""" incident_id = hashlib.sha256(f"{incident_type}_{datetime.datetime.now()}".encode()).hexdigest()[:16] # Determine impact level based on affected systems impact_level = "low" for system_id in affected_systems: if system_id in self.monitored_systems: system_criticality = self.monitored_systems[system_id]["criticality"] if system_criticality == "critical": impact_level = "critical" break elif system_criticality == "high" and impact_level != "critical": impact_level = "high" elif system_criticality == "medium" and impact_level not in ["high", "critical"]: impact_level = "medium" incident = SecurityIncident( incident_id=incident_id, incident_type=incident_type, affected_systems=affected_systems, impact_level=impact_level, detection_method=detection_method, containment_actions=[], recovery_actions=[], timestamp=datetime.datetime.now() ) self.incidents.append(incident) return incident_id def implement_firewall_rule(self, rule_name: str, source: str, destination: str, port: int, action: str, description: str) -> bool: """Implement firewall rule.""" rule = { "rule_id": hashlib.sha256(f"{rule_name}_{datetime.datetime.now()}".encode()).hexdigest()[:12], "name": rule_name, "source": source, "destination": destination, "port": port, "action": action, # allow, deny, log "description": description, "created_at": datetime.datetime.now().isoformat(), "status": "active" } self.firewall_rules.append(rule) return True def log_access_event(self, system_id: str, user_id: str, action: str, ip_address: str, success: bool) -> bool: """Log system access event.""" log_entry = { "timestamp": datetime.datetime.now().isoformat(), "system_id": system_id, "user_id": user_id, "action": action, "ip_address": ip_address, "success": success, "session_id": hashlib.sha256(f"{user_id}_{datetime.datetime.now()}".encode()).hexdigest()[:16] } self.access_logs.append(log_entry) return True def generate_security_dashboard(self) -> Dict[str, Any]: """Generate comprehensive security dashboard data.""" current_time = datetime.datetime.now() # Calculate recent threats (last 24 hours) recent_threats = [t for t in self.threats if (current_time - t.timestamp).total_seconds() < 86400] # Calculate active incidents active_incidents = [i for i in self.incidents if not i.resolved] # System security overview total_systems = len(self.monitored_systems) secure_systems = len([s for s in self.monitored_systems.values() if s.get("security_score", 0) >= 70]) # Threat breakdown threat_breakdown = collections.Counter(t.threat_type for t in recent_threats) severity_breakdown = collections.Counter(t.severity for t in recent_threats) # Recent incidents recent_incidents = [i for i in self.incidents if (current_time - i.timestamp).total_seconds() < 86400] return { "dashboard_timestamp": current_time.isoformat(), "systems_overview": { "total_monitored": total_systems, "secure_systems": secure_systems, "at_risk_systems": total_systems - secure_systems, "average_security_score": sum(s.get("security_score", 0) for s in self.monitored_systems.values()) / max(1, total_systems) }, "threat_summary": { "total_threats_24h": len(recent_threats), "active_threats": len([t for t in recent_threats if t.status == "active"]), "threat_types": dict(threat_breakdown), "severity_breakdown": dict(severity_breakdown), "threats": [ { "threat_id": t.threat_id, "type": t.threat_type, "severity": t.severity, "source_ip": t.source_ip, "target": t.target_system, "confidence": t.confidence, "timestamp": t.timestamp.isoformat() } for t in recent_threats[-10:] # Last 10 threats ] }, "incidents_summary": { "active_incidents": len(active_incidents), "total_incidents_24h": len(recent_incidents), "incidents_by_impact": collections.Counter(i.impact_level for i in recent_incidents), "recent_incidents": [ { "incident_id": i.incident_id, "type": i.incident_type, "impact_level": i.impact_level, "affected_systems": i.affected_systems, "timestamp": i.timestamp.isoformat() } for i in recent_incidents[-5:] # Last 5 incidents ] }, "security_metrics": { "firewall_rules": len([r for r in self.firewall_rules if r["status"] == "active"]), "vulnerability_scans_24h": len([s for s in self.vulnerability_scans if (current_time - datetime.datetime.fromisoformat(s["scan_timestamp"])).total_seconds() < 86400]), "access_logs_24h": len([log for log in self.access_logs if (current_time - datetime.datetime.fromisoformat(log["timestamp"])).total_seconds() < 86400]) }, "recommendations": self.generate_security_recommendations() } def generate_security_recommendations(self) -> List[str]: """Generate security recommendations based on current state.""" recommendations = [] # Check for vulnerable systems vulnerable_systems = [s for s in self.monitored_systems.values() if s.get("security_score", 100) < 70] if vulnerable_systems: recommendations.append(f"Update patches for {len(vulnerable_systems)} vulnerable systems") # Check for active threats active_threats = [t for t in self.threats if t.status == "active"] if len(active_threats) > 5: recommendations.append("Implement additional intrusion detection measures") # Check for recent incidents recent_incidents = [i for i in self.incidents if (datetime.datetime.now() - i.timestamp).total_seconds() < 86400] if len(recent_incidents) > 3: recommendations.append("Review and strengthen security policies") if not recommendations: recommendations.append("Security posture is optimal - continue monitoring") return recommendations def test_cybersecurity_system(): """Test the cybersecurity infrastructure.""" cs_system = CybersecurityInfrastructure() # Register systems for monitoring cs_system.register_system("WEB_SERVER_01", "web_server", "192.168.1.10", "critical", "IT Department") cs_system.register_system("DB_SERVER_01", "database", "192.168.1.20", "high", "DB Team") cs_system.register_system("APP_SERVER_01", "application", "192.168.1.30", "medium", "Dev Team") cs_system.register_system("FILE_SERVER_01", "file_server", "192.168.1.40", "low", "Operations") print("✅ Systems registered for security monitoring") # Perform vulnerability scans for system_id in ["WEB_SERVER_01", "DB_SERVER_01", "APP_SERVER_01"]: scan_result = cs_system.scan_system_vulnerabilities(system_id) print(f"✅ {system_id}: {scan_result['vulnerabilities_found']} vulnerabilities found, security score: {scan_result['security_score']}") # Detect intrusion attempts for system_id in ["WEB_SERVER_01", "DB_SERVER_01"]: threats = cs_system.detect_intrusion_attempts(system_id) print(f"✅ {system_id}: {len(threats)} intrusion attempts detected") # Analyze network traffic traffic_data = { "source_ip": "203.0.113.1", "destination": "WEB_SERVER_01", "protocol": "TCP", "data_size": 15000000, # 15MB - suspicious "connection_count": 1500 # High frequency } traffic_threats = cs_system.analyze_network_traffic(traffic_data) print(f"✅ Network traffic analysis: {len(traffic_threats)} threats detected") # Create security incident incident_id = cs_system.create_security_incident( "data_breach", ["WEB_SERVER_01", "DB_SERVER_01"], "Unauthorized access attempt detected and blocked", "intrusion_detection_system" ) print(f"✅ Security incident created: {incident_id}") # Implement firewall rules cs_system.implement_firewall_rule( "BLOCK_SUSPICIOUS_IP", "203.0.113.1", "any", 0, "deny", "Block known malicious IP address" ) cs_system.implement_firewall_rule( "RATE_LIMIT", "any", "WEB_SERVER_01", 80, "log", "Log and rate limit web traffic" ) print("✅ Firewall rules implemented") # Log access events cs_system.log_access_event("WEB_SERVER_01", "admin_user", "login", "192.168.1.100", True) cs_system.log_access_event("DB_SERVER_01", "app_user", "query", "192.168.1.30", True) cs_system.log_access_event("WEB_SERVER_01", "unknown", "login", "203.0.113.1", False) print("✅ Access events logged") # Generate dashboard dashboard = cs_system.generate_security_dashboard() print(f"✅ Security dashboard generated") print(f" - Systems monitored: {dashboard['systems_overview']['total_monitored']}") print(f" - Active threats: {dashboard['threat_summary']['active_threats']}") print(f" - Active incidents: {dashboard['incidents_summary']['active_incidents']}") print(f" - Recommendations: {len(dashboard['recommendations'])}") return cs_system if __name__ == "__main__": test_cybersecurity_system()