""" Production Monitoring & Alerting System Enterprise-grade monitoring and alerting for Project Starlight steganography detection """ import json import math import base64 import hashlib import datetime import re import string import itertools import collections import dataclasses from typing import Dict, List, Optional, Any, Union, Tuple from dataclasses import dataclass from enum import Enum class MetricType(Enum): COUNTER = "counter" GAUGE = "gauge" HISTOGRAM = "histogram" TIMER = "timer" class AlertSeverity(Enum): INFO = "info" WARNING = "warning" CRITICAL = "critical" EMERGENCY = "emergency" class ServiceHealth(Enum): HEALTHY = "healthy" DEGRADED = "degraded" UNHEALTHY = "unhealthy" UNKNOWN = "unknown" @dataclass class Metric: name: str metric_type: MetricType value: float labels: Dict[str, str] timestamp: datetime.datetime @dataclass class Alert: alert_id: str metric_name: str severity: AlertSeverity message: str current_value: float threshold: float service: str timestamp: datetime.datetime resolved: bool = False resolved_timestamp: Optional[datetime.datetime] = None @dataclass class HealthCheck: service_name: str status: ServiceHealth response_time: float last_check: datetime.datetime error_message: Optional[str] = None @dataclass class MonitoringDashboard: dashboard_id: str name: str metrics: List[str] alerts: List[str] refresh_interval_seconds: int class MetricsCollector: """Enterprise metrics collection system""" def __init__(self): self.metrics: List[Metric] = [] self.counters: Dict[str, float] = {} self.gauges: Dict[str, float] = {} def increment_counter(self, name: str, labels: Dict[str, str] = None, value: float = 1.0): """Increment counter metric""" labels = labels or {} key = f"{name}:{hash(json.dumps(labels, sort_keys=True))}" self.counters[key] = self.counters.get(key, 0) + value metric = Metric( name=name, metric_type=MetricType.COUNTER, value=self.counters[key], labels=labels, timestamp=datetime.datetime.now() ) self.metrics.append(metric) def set_gauge(self, name: str, value: float, labels: Dict[str, str] = None): """Set gauge metric value""" labels = labels or {} key = f"{name}:{hash(json.dumps(labels, sort_keys=True))}" self.gauges[key] = value metric = Metric( name=name, metric_type=MetricType.GAUGE, value=value, labels=labels, timestamp=datetime.datetime.now() ) self.metrics.append(metric) def record_timer(self, name: str, duration_seconds: float, labels: Dict[str, str] = None): """Record timing metric""" labels = labels or {} metric = Metric( name=name, metric_type=MetricType.TIMER, value=duration_seconds, labels=labels, timestamp=datetime.datetime.now() ) self.metrics.append(metric) def get_metrics_summary(self) -> Dict[str, Any]: """Get metrics summary""" if not self.metrics: return {"message": "No metrics collected"} # Group metrics by name metric_groups = collections.defaultdict(list) for metric in self.metrics: metric_groups[metric.name].append(metric) summary = { "total_metrics": len(self.metrics), "unique_metric_names": len(metric_groups), "latest_timestamp": max(m.timestamp for m in self.metrics).isoformat(), "metrics_by_type": {}, "metrics_summary": {} } # Count by type for metric_type in MetricType: count = sum(1 for m in self.metrics if m.metric_type == metric_type) summary["metrics_by_type"][metric_type.value] = count # Summarize each metric for name, metric_list in metric_groups.items(): values = [m.value for m in metric_list] summary["metrics_summary"][name] = { "count": len(values), "latest": values[-1] if values else 0, "average": sum(values) / len(values) if values else 0, "min": min(values) if values else 0, "max": max(values) if values else 0 } return summary class AlertManager: """Enterprise alert management system""" def __init__(self): self.alert_rules: List[Dict[str, Any]] = [] self.active_alerts: List[Alert] = [] self.alert_history: List[Alert] = [] def add_alert_rule(self, metric_name: str, threshold: float, operator: str, severity: AlertSeverity, service: str, message_template: str): """Add alert rule""" rule = { "metric_name": metric_name, "threshold": threshold, "operator": operator, # ">", "<", ">=", "<=", "==" "severity": severity, "service": service, "message_template": message_template } self.alert_rules.append(rule) def evaluate_alerts(self, metrics: List[Metric]): """Evaluate metrics against alert rules""" # Get latest metric values latest_metrics = {} for metric in metrics: key = f"{metric.name}:{json.dumps(metric.labels, sort_keys=True)}" latest_metrics[key] = metric for rule in self.alert_rules: # Find matching metrics matching_metrics = [] for key, metric in latest_metrics.items(): if metric.name == rule["metric_name"]: matching_metrics.append(metric) # Evaluate each matching metric for metric in matching_metrics: alert_triggered = self._evaluate_condition( metric.value, rule["threshold"], rule["operator"] ) if alert_triggered: # Check if alert already exists existing_alert = self._find_existing_alert(rule, metric) if not existing_alert: # Create new alert alert = Alert( alert_id=f"ALERT_{datetime.datetime.now().strftime('%Y%m%d_%H%M%S')}_{hash(metric.name) % 10000}", metric_name=metric.name, severity=rule["severity"], message=rule["message_template"].format( metric_name=metric.name, current_value=metric.value, threshold=rule["threshold"] ), current_value=metric.value, threshold=rule["threshold"], service=rule["service"], timestamp=datetime.datetime.now() ) self.active_alerts.append(alert) self.alert_history.append(alert) print(f"🚨 ALERT: [{rule['severity'].value.upper()}] {alert.message}") def _evaluate_condition(self, value: float, threshold: float, operator: str) -> bool: """Evaluate alert condition""" if operator == ">": return value > threshold elif operator == "<": return value < threshold elif operator == ">=": return value >= threshold elif operator == "<=": return value <= threshold elif operator == "==": return value == threshold else: return False def _find_existing_alert(self, rule: Dict[str, Any], metric: Metric) -> Optional[Alert]: """Find existing unresolved alert""" for alert in self.active_alerts: if (alert.metric_name == rule["metric_name"] and alert.service == rule["service"] and not alert.resolved): return alert return None def resolve_alert(self, alert_id: str): """Resolve an alert""" for alert in self.active_alerts: if alert.alert_id == alert_id: alert.resolved = True alert.resolved_timestamp = datetime.datetime.now() break class HealthChecker: """Enterprise health checking system""" def __init__(self): self.health_checks: Dict[str, HealthCheck] = {} self.health_history: List[HealthCheck] = [] def check_service_health(self, service_name: str, check_endpoint: str = None) -> HealthCheck: """Check health of a service""" start_time = datetime.datetime.now() try: # Mock health check # Simulate different service behaviors based on service name if "critical" in service_name: # Critical services have 99% uptime is_healthy = (hash(service_name + datetime.datetime.now().strftime('%Y%m%d%H%M')) % 100) < 99 response_time = 0.05 + ((hash(service_name) % 100) / 2000) elif "secondary" in service_name: # Secondary services have 95% uptime is_healthy = (hash(service_name + datetime.datetime.now().strftime('%Y%m%d%H%M')) % 100) < 95 response_time = 0.1 + ((hash(service_name) % 100) / 1000) else: # Regular services have 98% uptime is_healthy = (hash(service_name + datetime.datetime.now().strftime('%Y%m%d%H%M')) % 100) < 98 response_time = 0.08 + ((hash(service_name) % 100) / 1500) end_time = datetime.datetime.now() check_duration = (end_time - start_time).total_seconds() if is_healthy and check_duration < 1.0: status = ServiceHealth.HEALTHY error_message = None elif check_duration >= 1.0: status = ServiceHealth.DEGRADED error_message = "Response time above threshold" else: status = ServiceHealth.UNHEALTHY error_message = "Service check failed" except Exception as e: end_time = datetime.datetime.now() check_duration = (end_time - start_time).total_seconds() status = ServiceHealth.UNHEALTHY error_message = str(e) health_check = HealthCheck( service_name=service_name, status=status, response_time=check_duration, last_check=end_time, error_message=error_message ) self.health_checks[service_name] = health_check self.health_history.append(health_check) return health_check def get_system_health(self) -> Dict[str, Any]: """Get overall system health""" if not self.health_checks: return {"status": ServiceHealth.UNKNOWN.value, "services": 0} service_count = len(self.health_checks) healthy_count = sum(1 for hc in self.health_checks.values() if hc.status == ServiceHealth.HEALTHY) degraded_count = sum(1 for hc in self.health_checks.values() if hc.status == ServiceHealth.DEGRADED) unhealthy_count = sum(1 for hc in self.health_checks.values() if hc.status == ServiceHealth.UNHEALTHY) if healthy_count == service_count: overall_status = ServiceHealth.HEALTHY elif unhealthy_count == 0: overall_status = ServiceHealth.DEGRADED else: overall_status = ServiceHealth.UNHEALTHY return { "overall_status": overall_status.value, "total_services": service_count, "healthy_services": healthy_count, "degraded_services": degraded_count, "unhealthy_services": unhealthy_count, "health_percentage": (healthy_count / service_count) * 100 if service_count > 0 else 0 } class MonitoringSystem: """Enterprise monitoring and alerting system""" def __init__(self): self.metrics_collector = MetricsCollector() self.alert_manager = AlertManager() self.health_checker = HealthChecker() self.dashboards: List[MonitoringDashboard] = [] def setup_monitoring(self): """Setup monitoring with default rules and checks""" print("šŸ”§ Setting up Enterprise Monitoring System...") # Setup alert rules self._setup_default_alert_rules() # Setup health checks self._setup_health_checks() # Setup dashboards self._setup_dashboards() print("āœ… Monitoring System Ready") def _setup_default_alert_rules(self): """Setup default alert rules""" # Steganography detection performance alerts self.alert_manager.add_alert_rule( metric_name="steganography_detection_duration", threshold=1.0, operator=">", severity=AlertSeverity.WARNING, service="steganography_service", message_template="Steganography detection slow: {current_value:.2f}s > {threshold}s" ) self.alert_manager.add_alert_rule( metric_name="steganography_detection_duration", threshold=2.0, operator=">", severity=AlertSeverity.CRITICAL, service="steganography_service", message_template="Steganography detection critical: {current_value:.2f}s > {threshold}s" ) # Error rate alerts self.alert_manager.add_alert_rule( metric_name="error_rate", threshold=5.0, operator=">", severity=AlertSeverity.WARNING, service="api_gateway", message_template="High error rate: {current_value:.1f}% > {threshold}%" ) self.alert_manager.add_alert_rule( metric_name="error_rate", threshold=10.0, operator=">", severity=AlertSeverity.CRITICAL, service="api_gateway", message_template="Critical error rate: {current_value:.1f}% > {threshold}%" ) # CPU utilization alerts self.alert_manager.add_alert_rule( metric_name="cpu_utilization", threshold=80.0, operator=">", severity=AlertSeverity.WARNING, service="ml_infrastructure", message_template="High CPU usage: {current_value:.1f}% > {threshold}%" ) self.alert_manager.add_alert_rule( metric_name="cpu_utilization", threshold=95.0, operator=">", severity=AlertSeverity.CRITICAL, service="ml_infrastructure", message_template="Critical CPU usage: {current_value:.1f}% > {threshold}%" ) # Memory utilization alerts self.alert_manager.add_alert_rule( metric_name="memory_utilization", threshold=85.0, operator=">", severity=AlertSeverity.WARNING, service="ml_infrastructure", message_template="High memory usage: {current_value:.1f}% > {threshold}%" ) # Request rate alerts self.alert_manager.add_alert_rule( metric_name="requests_per_second", threshold=0.0, operator="==", severity=AlertSeverity.CRITICAL, service="api_gateway", message_template="No requests detected - possible service outage" ) def _setup_health_checks(self): """Setup health checks for core services""" services = [ "steganography_detection_service", "critical_ml_model_service", "api_gateway", "database_primary", "database_replica", "redis_cache", "object_storage", "monitoring_system" ] for service in services: self.health_checker.check_service_health(service) def _setup_dashboards(self): """Setup monitoring dashboards""" dashboards = [ MonitoringDashboard( dashboard_id="system_overview", name="System Overview", metrics=["cpu_utilization", "memory_utilization", "error_rate", "requests_per_second"], alerts=["cpu_utilization", "memory_utilization", "error_rate", "requests_per_second"], refresh_interval_seconds=30 ), MonitoringDashboard( dashboard_id="steganography_performance", name="Steganography Detection Performance", metrics=["steganography_detection_duration", "detection_accuracy", "detections_per_second"], alerts=["steganography_detection_duration"], refresh_interval_seconds=15 ), MonitoringDashboard( dashboard_id="infrastructure_health", name="Infrastructure Health", metrics=["disk_usage", "network_throughput", "database_connections"], alerts=["disk_usage"], refresh_interval_seconds=60 ) ] self.dashboards = dashboards def simulate_monitoring_cycle(self, duration_minutes: int = 5) -> Dict[str, Any]: """Simulate monitoring cycle""" print(f"šŸ“Š Running Monitoring Simulation for {duration_minutes} minutes...") start_time = datetime.datetime.now() cycle_count = 0 while (datetime.datetime.now() - start_time).total_seconds() < (duration_minutes * 60): cycle_count += 1 cycle_start = datetime.datetime.now() print(f"šŸ”„ Monitoring Cycle {cycle_count}") # Generate mock metrics self._generate_mock_metrics() # Evaluate alerts self.alert_manager.evaluate_alerts(self.metrics_collector.metrics) # Check service health for service_name in self.health_checker.health_checks.keys(): self.health_checker.check_service_health(service_name) # Sleep for 10 seconds between cycles cycle_duration = (datetime.datetime.now() - cycle_start).total_seconds() if cycle_duration < 10: remaining = 10 - cycle_duration # Simulate waiting without actual sleep wait_end = datetime.datetime.now() + datetime.timedelta(seconds=remaining) while datetime.datetime.now() < wait_end: pass # Generate monitoring report report = self._generate_monitoring_report(cycle_count) return report def _generate_mock_metrics(self): """Generate mock monitoring metrics""" # Steganography detection metrics detection_duration = 0.1 + ((hash(str(datetime.datetime.now().microsecond)) % 100) / 100) self.metrics_collector.record_timer("steganography_detection_duration", detection_duration) detection_accuracy = 0.85 + ((hash(str(datetime.datetime.now().microsecond)) % 20) / 100) self.metrics_collector.set_gauge("detection_accuracy", detection_accuracy) # Infrastructure metrics cpu_usage = 30 + ((hash(str(datetime.datetime.now().microsecond)) % 60)) self.metrics_collector.set_gauge("cpu_utilization", cpu_usage) memory_usage = 40 + ((hash(str(datetime.datetime.now().microsecond + 1000)) % 50)) self.metrics_collector.set_gauge("memory_utilization", memory_usage) # API metrics requests_per_second = 50 + ((hash(str(datetime.datetime.now().microsecond + 2000)) % 200)) self.metrics_collector.set_gauge("requests_per_second", requests_per_second) error_rate = max(0, ((hash(str(datetime.datetime.now().microsecond + 3000)) % 20)) - 5) self.metrics_collector.set_gauge("error_rate", error_rate) # Increment counters self.metrics_collector.increment_counter("total_requests", {"endpoint": "/detect"}) self.metrics_collector.increment_counter("total_detections", {"service": "steganography"}) def _generate_monitoring_report(self, cycles: int) -> Dict[str, Any]: """Generate comprehensive monitoring report""" metrics_summary = self.metrics_collector.get_metrics_summary() system_health = self.health_checker.get_system_health() report = { "monitoring_summary": { "simulation_cycles": cycles, "total_metrics_collected": len(self.metrics_collector.metrics), "active_alerts": len(self.alert_manager.active_alerts), "alert_history_size": len(self.alert_manager.alert_history), "monitored_services": len(self.health_checker.health_checks), "generated_at": datetime.datetime.now().isoformat() }, "system_health": system_health, "metrics_summary": metrics_summary, "active_alerts": [], "service_health_details": [] } # Active alerts details for alert in self.alert_manager.active_alerts: alert_detail = { "alert_id": alert.alert_id, "metric_name": alert.metric_name, "severity": alert.severity.value, "message": alert.message, "current_value": alert.current_value, "threshold": alert.threshold, "service": alert.service, "timestamp": alert.timestamp.isoformat() } report["active_alerts"].append(alert_detail) # Service health details for service_name, health_check in self.health_checker.health_checks.items(): service_detail = { "service_name": service_name, "status": health_check.status.value, "response_time": health_check.response_time, "last_check": health_check.last_check.isoformat(), "error_message": health_check.error_message } report["service_health_details"].append(service_detail) return report def main(): """Main monitoring system runner""" monitoring = MonitoringSystem() # Setup monitoring monitoring.setup_monitoring() # Run monitoring simulation report = monitoring.simulate_monitoring_cycle(duration_minutes=2) # Print summary print(f"\nšŸ“Š Monitoring Summary:") print(f"Metrics Collected: {report['monitoring_summary']['total_metrics_collected']}") print(f"Active Alerts: {report['monitoring_summary']['active_alerts']}") print(f"System Health: {report['system_health']['overall_status']}") print(f"Health Percentage: {report['system_health']['health_percentage']:.1f}%") return report if __name__ == "__main__": main()