#!/usr/bin/env python3
"""
Logging and Monitoring Evaluation for Project Starlight
Assesses logging and monitoring security capabilities
"""

# NOTE(review): json, base64 and hashlib are currently unused but kept in case
# other tooling imports them transitively from this module.
import json
import re
import base64
import hashlib
import datetime
import math
from collections import Counter
from typing import Dict, List, Optional, Any, Set


class LoggingMonitoringEvaluator:
    """Evaluates logging and monitoring security capabilities.

    Three independent entry points:
      * evaluate_logging_configuration  -- scores a logging config dict.
      * evaluate_monitoring_setup       -- scores a monitoring config dict.
      * analyze_log_entries             -- scans concrete log records for
                                           security events and anomalies.
    """

    def __init__(self):
        # Event types that MUST be logged for a compliant setup.
        self.critical_log_events = [
            'authentication_failure', 'authentication_success',
            'authorization_failure', 'privilege_escalation', 'admin_access',
            'config_change', 'data_access', 'file_upload', 'file_download',
            'api_access', 'database_access', 'error_5xx',
            'suspicious_activity', 'security_violation', 'system_change'
        ]
        # Fields every log record should carry.
        self.required_log_fields = [
            'timestamp', 'event_type', 'user_id', 'source_ip', 'user_agent',
            'resource_accessed', 'action_result', 'session_id'
        ]
        # Metrics a monitoring stack is expected to track.
        self.monitoring_metrics = [
            'request_rate', 'error_rate', 'response_time', 'cpu_usage',
            'memory_usage', 'disk_usage', 'network_io', 'active_sessions',
            'failed_logins', 'bandwidth_usage'
        ]
        # Pre-compiled "suspicious" source-IP patterns (private / link-local /
        # loopback ranges).  Compiled once here instead of on every call.
        self._suspicious_ip_patterns = [
            re.compile(p) for p in (
                r'^10\.',                            # Private network
                r'^172\.(1[6-9]|2[0-9]|3[0-1])\.',   # Private network
                r'^192\.168\.',                      # Private network
                r'^169\.254\.',                      # Link-local
                r'^127\.',                           # Loopback
            )
        ]

    def evaluate_logging_configuration(self, log_config: Dict[str, Any]) -> Dict[str, Any]:
        """Evaluate logging configuration for security compliance.

        Returns a dict with a 0-100 ``compliance_score`` (40% event coverage,
        30% field coverage, 30% config hygiene), plus the gaps found and
        textual recommendations.
        """
        evaluation = {
            # NOTE(review): naive local timestamp; consider
            # datetime.datetime.now(datetime.timezone.utc) -- kept as-is to
            # preserve the existing output format.
            'timestamp': datetime.datetime.now().isoformat(),
            'configuration': log_config,
            'compliance_score': 0,
            'missing_events': [],
            'missing_fields': [],
            'security_issues': [],
            'recommendations': []
        }

        # Check critical events logging
        logged_events = set(log_config.get('logged_events', []))
        missing_events = set(self.critical_log_events) - logged_events
        evaluation['missing_events'] = list(missing_events)

        # Check required fields
        logged_fields = set(log_config.get('log_fields', []))
        missing_fields = set(self.required_log_fields) - logged_fields
        evaluation['missing_fields'] = list(missing_fields)

        # Configuration hygiene checks (each hit costs 5 score points below).
        if not log_config.get('log_rotation_enabled', False):
            evaluation['security_issues'].append({
                'severity': 'medium',
                'issue': 'Log rotation not enabled - may lead to disk exhaustion'
            })
        if not log_config.get('log_encryption_enabled', False):
            evaluation['security_issues'].append({
                'severity': 'high',
                'issue': 'Log encryption not enabled - sensitive data exposed'
            })
        if not log_config.get('access_controls', False):
            evaluation['security_issues'].append({
                'severity': 'high',
                'issue': 'Log access controls not implemented'
            })
        if log_config.get('log_level', 'INFO') == 'DEBUG':
            evaluation['security_issues'].append({
                'severity': 'medium',
                'issue': 'DEBUG logging enabled - potential information disclosure risk'
            })

        # Weighted score: events 40, fields 30, hygiene 30 (floored at 0).
        events_score = ((len(self.critical_log_events) - len(missing_events))
                        / len(self.critical_log_events)) * 40
        fields_score = ((len(self.required_log_fields) - len(missing_fields))
                        / len(self.required_log_fields)) * 30
        security_score = max(0, 30 - len(evaluation['security_issues']) * 5)
        evaluation['compliance_score'] = int(events_score + fields_score + security_score)

        evaluation['recommendations'] = self._generate_logging_recommendations(evaluation)
        return evaluation

    def evaluate_monitoring_setup(self, monitoring_config: Dict[str, Any]) -> Dict[str, Any]:
        """Evaluate monitoring configuration for security effectiveness.

        Returns a dict with a 0-100 ``monitoring_score`` (40% metric coverage,
        30% alert coverage, 30% config hygiene), plus gaps and recommendations.
        """
        evaluation = {
            'timestamp': datetime.datetime.now().isoformat(),
            'configuration': monitoring_config,
            'monitoring_score': 0,
            'missing_metrics': [],
            'alerting_gaps': [],
            'monitoring_issues': [],
            'recommendations': []
        }

        # Check monitoring metrics
        monitored_metrics = set(monitoring_config.get('metrics', []))
        missing_metrics = set(self.monitoring_metrics) - monitored_metrics
        evaluation['missing_metrics'] = list(missing_metrics)

        # Check alerting rules: an alert counts as covered if its name appears
        # (case-insensitively) as a substring of any configured rule.
        alert_rules = monitoring_config.get('alert_rules', [])
        critical_alerts = [
            'high_error_rate', 'unusual_traffic_patterns',
            'failed_login_threshold', 'privilege_escalation_attempts',
            'file_upload_anomalies', 'database_access_anomalies',
            'resource_exhaustion', 'security_violation_events'
        ]
        missing_alerts = [
            alert for alert in critical_alerts
            if not any(alert in str(rule).lower() for rule in alert_rules)
        ]
        evaluation['alerting_gaps'] = missing_alerts

        # Configuration hygiene checks.
        if not monitoring_config.get('real_time_monitoring', False):
            evaluation['monitoring_issues'].append({
                'severity': 'high',
                'issue': 'Real-time monitoring not enabled'
            })
        # Fixed: was `not ... >= 90`, which relied on operator precedence;
        # the direct `< 90` comparison is equivalent and unambiguous.
        if monitoring_config.get('retention_period', 0) < 90:
            evaluation['monitoring_issues'].append({
                'severity': 'medium',
                'issue': 'Monitoring retention period less than 90 days'
            })
        if not monitoring_config.get('correlation_enabled', False):
            evaluation['monitoring_issues'].append({
                'severity': 'medium',
                'issue': 'Security event correlation not enabled'
            })

        # Weighted score: metrics 40, alerts 30, hygiene 30 (floored at 0).
        metrics_score = ((len(self.monitoring_metrics) - len(missing_metrics))
                         / len(self.monitoring_metrics)) * 40
        alerts_score = ((len(critical_alerts) - len(missing_alerts))
                        / len(critical_alerts)) * 30
        config_score = max(0, 30 - len(evaluation['monitoring_issues']) * 5)
        evaluation['monitoring_score'] = int(metrics_score + alerts_score + config_score)

        evaluation['recommendations'] = self._generate_monitoring_recommendations(evaluation)
        return evaluation

    def analyze_log_entries(self, log_entries: List[Dict[str, Any]]) -> Dict[str, Any]:
        """Analyze log entries for security patterns and anomalies.

        Returns per-entry security findings, traffic/auth anomalies, aggregate
        usage patterns, and an overall 0-100 risk score.  Safe on an empty
        list (returns the skeleton with zero counts).
        """
        analysis = {
            'timestamp': datetime.datetime.now().isoformat(),
            'total_entries': len(log_entries),
            'time_period': self._get_time_period(log_entries),
            'security_events': [],
            'anomalies': [],
            'patterns': {},
            'risk_score': 0
        }
        if not log_entries:
            return analysis

        # Per-entry security findings.
        for entry in log_entries:
            analysis['security_events'].extend(self._analyze_security_event(entry))

        analysis['anomalies'] = self._detect_anomalies(log_entries)
        analysis['patterns'] = self._analyze_patterns(log_entries)
        analysis['risk_score'] = self._calculate_risk_score(analysis)
        return analysis

    def _analyze_security_event(self, log_entry: Dict[str, Any]) -> List[Dict[str, Any]]:
        """Return security findings for a single log entry.

        Flags failed authentications, privilege escalations, admin access and
        activity from suspicious (private/reserved) source IPs.
        """
        findings = []
        event_type = log_entry.get('event_type')

        # (finding type, severity, description) per recognised event type.
        event_findings = {
            'authentication_failure': (
                'failed_authentication', 'medium', 'Failed authentication attempt'),
            'privilege_escalation': (
                'privilege_escalation', 'high', 'Privilege escalation attempt'),
            'admin_access': (
                'admin_access', 'medium', 'Administrative access'),
        }
        if event_type in event_findings:
            ftype, severity, description = event_findings[event_type]
            findings.append({
                'type': ftype,
                'severity': severity,
                'description': description,
                'timestamp': log_entry.get('timestamp'),
                'source_ip': log_entry.get('source_ip'),
                'user_id': log_entry.get('user_id')
            })

        # Check for suspicious IP addresses
        source_ip = log_entry.get('source_ip', '')
        if self._is_suspicious_ip(source_ip):
            findings.append({
                'type': 'suspicious_ip',
                'severity': 'medium',
                'description': f'Activity from suspicious IP: {source_ip}',
                'timestamp': log_entry.get('timestamp'),
                'source_ip': source_ip
            })
        return findings

    def _detect_anomalies(self, log_entries: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
        """Detect anomalies in log patterns.

        Flags hourly traffic spikes beyond 2 standard deviations of the mean,
        and per-IP failed-login counts above a brute-force threshold.
        """
        anomalies = []

        # Bucket entries per hour using the YYYY-MM-DDTHH timestamp prefix.
        hourly_counts = Counter(
            entry['timestamp'][:13]
            for entry in log_entries
            if entry.get('timestamp')
        )
        if hourly_counts:
            avg_requests = sum(hourly_counts.values()) / len(hourly_counts)
            # Population standard deviation of the hourly request counts.
            std_dev = math.sqrt(
                sum((count - avg_requests) ** 2 for count in hourly_counts.values())
                / len(hourly_counts)
            )
            for hour, count in hourly_counts.items():
                if count > avg_requests + 2 * std_dev:
                    anomalies.append({
                        'type': 'traffic_spike',
                        'severity': 'medium',
                        'description': f'Unusual traffic spike at {hour}: {count} requests (avg: {avg_requests:.1f})'
                    })

        # Check for unusual authentication patterns
        failed_logins_by_ip = Counter(
            entry['source_ip']
            for entry in log_entries
            if entry.get('event_type') == 'authentication_failure'
            and entry.get('source_ip')
        )
        for ip, count in failed_logins_by_ip.items():
            if count > 10:  # Threshold for failed login attempts
                anomalies.append({
                    'type': 'brute_force_attempt',
                    'severity': 'high',
                    'description': f'Possible brute force attack from {ip}: {count} failed logins'
                })
        return anomalies

    def _analyze_patterns(self, log_entries: List[Dict[str, Any]]) -> Dict[str, Any]:
        """Summarize top source IPs, users and event types in the log data."""

        def _count(field: str) -> Counter:
            # Count non-empty values of `field` across all entries.
            return Counter(
                entry[field] for entry in log_entries if entry.get(field)
            )

        return {
            'top_ips': dict(_count('source_ip').most_common(10)),
            'top_users': dict(_count('user_id').most_common(10)),
            'top_events': dict(_count('event_type').most_common()),
            'time_distribution': {}
        }

    def _is_suspicious_ip(self, ip: str) -> bool:
        """Return True if `ip` falls in a private/link-local/loopback range.

        NOTE(review): treating RFC1918 addresses as "suspicious" is unusual
        for internal services -- confirm this is the intended policy.
        """
        return any(pattern.match(ip) for pattern in self._suspicious_ip_patterns)

    def _get_time_period(self, log_entries: List[Dict[str, Any]]) -> Dict[str, str]:
        """Return the min/max timestamps covered by the entries, or {}."""
        timestamps = [
            entry.get('timestamp', '')
            for entry in log_entries
            if entry.get('timestamp')
        ]
        if timestamps:
            return {'start': min(timestamps), 'end': max(timestamps)}
        return {}

    def _calculate_risk_score(self, analysis: Dict[str, Any]) -> int:
        """Calculate an overall 0-100 risk score from the analysis results."""
        # Severity weights; unknown severities contribute 0.
        event_weights = {'critical': 20, 'high': 15, 'medium': 10, 'low': 5}
        anomaly_weights = {'high': 15, 'medium': 10, 'low': 5}

        score = sum(event_weights.get(e['severity'], 0)
                    for e in analysis['security_events'])
        score += sum(anomaly_weights.get(a['severity'], 0)
                     for a in analysis['anomalies'])
        return min(100, score)

    def _generate_logging_recommendations(self, evaluation: Dict[str, Any]) -> List[str]:
        """Turn logging evaluation gaps into actionable recommendation strings."""
        recommendations = []
        if evaluation['missing_events']:
            recommendations.append(
                f"Enable logging for missing events: {', '.join(evaluation['missing_events'])}")
        if evaluation['missing_fields']:
            recommendations.append(
                f"Add missing log fields: {', '.join(evaluation['missing_fields'])}")
        for issue in evaluation['security_issues']:
            recommendations.append(f"Fix: {issue['issue']}")
        return recommendations

    def _generate_monitoring_recommendations(self, evaluation: Dict[str, Any]) -> List[str]:
        """Turn monitoring evaluation gaps into actionable recommendation strings."""
        recommendations = []
        if evaluation['missing_metrics']:
            recommendations.append(
                f"Monitor missing metrics: {', '.join(evaluation['missing_metrics'])}")
        if evaluation['alerting_gaps']:
            recommendations.append(
                f"Add missing alerts: {', '.join(evaluation['alerting_gaps'])}")
        for issue in evaluation['monitoring_issues']:
            recommendations.append(f"Fix: {issue['issue']}")
        return recommendations

    def generate_comprehensive_report(self) -> Dict[str, Any]:
        """Generate comprehensive logging and monitoring evaluation report."""
        return {
            'timestamp': datetime.datetime.now().isoformat(),
            'evaluation_type': 'Logging and Monitoring Security Assessment',
            'critical_log_events': self.critical_log_events,
            'required_log_fields': self.required_log_fields,
            'monitoring_metrics': self.monitoring_metrics,
            'best_practices': self._get_best_practices(),
            'compliance_requirements': self._get_compliance_requirements()
        }

    def _get_best_practices(self) -> List[str]:
        """Get logging and monitoring best practices."""
        return [
            "Enable comprehensive logging for all security-relevant events",
            "Use structured logging format (JSON) for better analysis",
            "Implement log rotation and retention policies",
            "Encrypt logs containing sensitive information",
            "Implement strict access controls for log data",
            "Use centralized logging solution for correlation",
            "Enable real-time monitoring and alerting",
            "Implement automated threat detection rules",
            "Regular security log reviews and audits",
            "Maintain logs for compliance requirements (minimum 90 days)",
            "Monitor for unusual patterns and anomalies",
            "Implement log tampering detection"
        ]

    def _get_compliance_requirements(self) -> Dict[str, List[str]]:
        """Get compliance requirements for logging and monitoring."""
        return {
            'GDPR': [
                "Log all personal data processing activities",
                "Maintain logs for data subject requests",
                "Ensure data protection impact assessments"
            ],
            'SOC2': [
                "Comprehensive logging of security events",
                "Monitoring of system access and changes",
                "Regular log reviews and analysis"
            ],
            'PCI_DSS': [
                "Log all access to cardholder data",
                "Maintain audit trail for all actions",
                "Daily log review and monitoring"
            ],
            'ISO27001': [
                "Comprehensive logging policy and procedures",
                "Monitoring of information security events",
                "Regular testing of logging systems"
            ]
        }


def main():
    """Test the logging and monitoring evaluator."""
    evaluator = LoggingMonitoringEvaluator()

    # Test logging configuration
    log_config = {
        'logged_events': ['authentication_success', 'authentication_failure', 'file_upload'],
        'log_fields': ['timestamp', 'event_type', 'user_id', 'source_ip'],
        'log_rotation_enabled': True,
        'log_encryption_enabled': False,
        'access_controls': True,
        'log_level': 'INFO'
    }
    logging_eval = evaluator.evaluate_logging_configuration(log_config)
    print("Logging Configuration Evaluation:")
    print(f"Compliance Score: {logging_eval['compliance_score']}")
    print(f"Missing Events: {len(logging_eval['missing_events'])}")
    print(f"Security Issues: {len(logging_eval['security_issues'])}")

    # Test monitoring configuration
    monitoring_config = {
        'metrics': ['request_rate', 'error_rate', 'cpu_usage'],
        'alert_rules': ['high_error_rate', 'failed_login_threshold'],
        'real_time_monitoring': True,
        'retention_period': 60,
        'correlation_enabled': False
    }
    monitoring_eval = evaluator.evaluate_monitoring_setup(monitoring_config)
    print(f"\nMonitoring Setup Evaluation:")
    print(f"Monitoring Score: {monitoring_eval['monitoring_score']}")
    print(f"Missing Metrics: {len(monitoring_eval['missing_metrics'])}")
    print(f"Alerting Gaps: {len(monitoring_eval['alerting_gaps'])}")

    # Test log analysis
    sample_logs = [
        {
            'timestamp': '2026-01-31T10:00:00Z',
            'event_type': 'authentication_failure',
            'user_id': 'user1',
            'source_ip': '192.168.1.100',
            'user_agent': 'Mozilla/5.0'
        },
        {
            'timestamp': '2026-01-31T10:05:00Z',
            'event_type': 'admin_access',
            'user_id': 'admin1',
            'source_ip': '10.0.0.50',
            'user_agent': 'Mozilla/5.0'
        }
    ]
    log_analysis = evaluator.analyze_log_entries(sample_logs)
    print(f"\nLog Analysis Results:")
    print(f"Security Events: {len(log_analysis['security_events'])}")
    print(f"Anomalies: {len(log_analysis['anomalies'])}")
    print(f"Risk Score: {log_analysis['risk_score']}")

    return {
        'logging_evaluation': logging_eval,
        'monitoring_evaluation': monitoring_eval,
        'log_analysis': log_analysis
    }


if __name__ == "__main__":
    main()