import json import math import datetime from typing import Dict, List, Optional, Any, Union from collections import deque class SystemMonitor: """ Real-time system status monitoring with alerting capabilities. """ def __init__(self, mission_id: str): self.mission_id = mission_id self.monitoring_active = True self.alert_history = deque(maxlen=1000) self.system_status = { "overall": "NOMINAL", "subsystems": {}, "last_update": datetime.datetime.now().isoformat() } self.thresholds = self.initialize_thresholds() self.alert_levels = ["GREEN", "YELLOW", "RED"] def initialize_thresholds(self) -> Dict[str, Dict[str, Any]]: """Initialize monitoring thresholds for all subsystems.""" return { "power": { "voltage": {"normal": (28.0, 32.0), "warning": (26.0, 34.0), "critical": (24.0, 36.0)}, "current": {"normal": (5.0, 45.0), "warning": (3.0, 50.0), "critical": (2.0, 55.0)}, "battery_level": {"normal": (30, 100), "warning": (15, 30), "critical": (0, 15)}, "solar_array_efficiency": {"normal": (85, 100), "warning": (70, 85), "critical": (50, 70)} }, "thermal": { "component_temp": {"normal": (-40, 85), "warning": (-55, 95), "critical": (-70, 105)}, "radiator_temp": {"normal": (-60, 50), "warning": (-70, 60), "critical": (-80, 70)}, "heater_status": {"normal": "OFF", "warning": "INTERMITTENT", "critical": "FAIL"} }, "communication": { "signal_strength": {"normal": (-60, -30), "warning": (-75, -60), "critical": (-85, -75)}, "data_rate": {"normal": (10000, 1000000), "warning": (5000, 10000), "critical": (0, 5000)}, "link_quality": {"normal": (90, 100), "warning": (70, 90), "critical": (0, 70)}, "packet_loss": {"normal": (0, 1), "warning": (1, 5), "critical": (5, 100)} }, "propulsion": { "fuel_pressure": {"normal": (200, 400), "warning": (150, 450), "critical": (100, 500)}, "thruster_status": {"normal": "READY", "warning": "STANDBY", "critical": "FAIL"}, "propellant_level": {"normal": (20, 100), "warning": (10, 20), "critical": (0, 10)}, "valve_position": {"normal": "CLOSED", "warning": "PARTIAL", "critical": "STUCK"} }, "attitude_control": { "angular_rate": {"normal": (0, 0.1), "warning": (0.1, 0.5), "critical": (0.5, 10.0)}, "pointing_accuracy": {"normal": (0, 0.1), "warning": (0.1, 0.5), "critical": (0.5, 5.0)}, "reaction_wheel_speed": {"normal": (0, 3000), "warning": (3000, 4000), "critical": (4000, 6000)}, "magnetometer_health": {"normal": "OK", "warning": "DEGRADED", "critical": "FAIL"} }, "payload": { "instrument_temp": {"normal": (15, 25), "warning": (10, 30), "critical": (0, 40)}, "data_storage": {"normal": (20, 100), "warning": (10, 20), "critical": (0, 10)}, "instrument_health": {"normal": "OPERATIONAL", "warning": "DEGRADED", "critical": "FAIL"}, "calibration_status": {"normal": "VALID", "warning": "EXPIRED", "critical": "FAIL"} } } def evaluate_parameter(self, subsystem: str, parameter: str, value: Union[float, str]) -> Dict[str, Any]: """Evaluate a single parameter against thresholds.""" if subsystem not in self.thresholds or parameter not in self.thresholds[subsystem]: return {"status": "UNKNOWN", "level": "GREEN", "message": "Parameter not configured"} thresholds = self.thresholds[subsystem][parameter] if isinstance(value, (int, float)): normal_range = thresholds["normal"] warning_range = thresholds["warning"] critical_range = thresholds["critical"] if isinstance(normal_range, tuple): if normal_range[0] <= value <= normal_range[1]: status = "NOMINAL" level = "GREEN" elif isinstance(warning_range, tuple) and warning_range[0] <= value <= warning_range[1]: status = "WARNING" level = "YELLOW" elif isinstance(critical_range, tuple) and critical_range[0] <= value <= critical_range[1]: status = "CRITICAL" level = "RED" else: status = "OUT_OF_RANGE" level = "RED" else: status = "NOMINAL" level = "GREEN" else: # String values if value == thresholds["normal"]: status = "NOMINAL" level = "GREEN" elif value == thresholds["warning"]: status = "WARNING" level = "YELLOW" elif value == thresholds["critical"]: status = "CRITICAL" level = "RED" else: status = "UNKNOWN" level = "YELLOW" return { "status": status, "level": level, "value": value, "thresholds": thresholds, "timestamp": datetime.datetime.now().isoformat() } def monitor_telemetry(self, telemetry_data: Dict[str, Dict[str, Union[float, str]]]) -> Dict[str, Any]: """Monitor all telemetry data and generate alerts.""" monitoring_results = { "timestamp": datetime.datetime.now().isoformat(), "subsystem_status": {}, "alerts": [], "overall_status": "NOMINAL", "alert_level": "GREEN" } critical_alerts = 0 warning_alerts = 0 for subsystem, parameters in telemetry_data.items(): subsystem_results = { "status": "NOMINAL", "parameters": {}, "alerts": [] } for parameter, value in parameters.items(): evaluation = self.evaluate_parameter(subsystem, parameter, value) subsystem_results["parameters"][parameter] = evaluation if evaluation["level"] == "RED": critical_alerts += 1 alert = { "timestamp": evaluation["timestamp"], "subsystem": subsystem, "parameter": parameter, "level": "CRITICAL", "value": value, "message": f"{subsystem.upper()} {parameter} CRITICAL: {value}" } subsystem_results["alerts"].append(alert) monitoring_results["alerts"].append(alert) elif evaluation["level"] == "YELLOW": warning_alerts += 1 alert = { "timestamp": evaluation["timestamp"], "subsystem": subsystem, "parameter": parameter, "level": "WARNING", "value": value, "message": f"{subsystem.upper()} {parameter} WARNING: {value}" } subsystem_results["alerts"].append(alert) monitoring_results["alerts"].append(alert) # Determine subsystem status if critical_alerts > 0: subsystem_results["status"] = "CRITICAL" elif warning_alerts > 0: subsystem_results["status"] = "WARNING" monitoring_results["subsystem_status"][subsystem] = subsystem_results # Determine overall status if critical_alerts > 0: monitoring_results["overall_status"] = "CRITICAL" monitoring_results["alert_level"] = "RED" elif warning_alerts > 0: monitoring_results["overall_status"] = "WARNING" monitoring_results["alert_level"] = "YELLOW" # Store alerts in history for alert in monitoring_results["alerts"]: self.alert_history.append(alert) self.system_status = { "overall": monitoring_results["overall_status"], "subsystems": monitoring_results["subsystem_status"], "last_update": monitoring_results["timestamp"] } return monitoring_results def generate_alert_summary(self, time_window_hours: int = 24) -> Dict[str, Any]: """Generate summary of alerts within time window.""" cutoff_time = datetime.datetime.now() - datetime.timedelta(hours=time_window_hours) recent_alerts = [ alert for alert in self.alert_history if datetime.datetime.fromisoformat(alert["timestamp"]) > cutoff_time ] critical_count = len([a for a in recent_alerts if a["level"] == "CRITICAL"]) warning_count = len([a for a in recent_alerts if a["level"] == "WARNING"]) subsystem_alert_counts = {} for alert in recent_alerts: subsystem = alert["subsystem"] subsystem_alert_counts[subsystem] = subsystem_alert_counts.get(subsystem, 0) + 1 return { "time_window_hours": time_window_hours, "total_alerts": len(recent_alerts), "critical_alerts": critical_count, "warning_alerts": warning_count, "alert_rate": len(recent_alerts) / time_window_hours, "most_problematic_subsystem": max(subsystem_alert_counts.items(), key=lambda x: x[1])[0] if subsystem_alert_counts else None, "alert_trend": self.calculate_alert_trend(recent_alerts) } def calculate_alert_trend(self, alerts: List[Dict[str, Any]]) -> str: """Calculate alert trend (increasing, decreasing, stable).""" if len(alerts) < 2: return "INSUFFICIENT_DATA" # Compare alerts in first half vs second half midpoint = len(alerts) // 2 first_half = len(alerts[:midpoint]) second_half = len(alerts[midpoint:]) if second_half > first_half * 1.2: return "INCREASING" elif second_half < first_half * 0.8: return "DECREASING" else: return "STABLE" def recommend_actions(self, monitoring_results: Dict[str, Any]) -> List[str]: """Generate recommended actions based on monitoring results.""" recommendations = [] if monitoring_results["alert_level"] == "RED": recommendations.extend([ "IMMEDIATE: Evaluate abort conditions", "Execute emergency protocols for critical subsystems", "Notify mission control of critical status" ]) elif monitoring_results["alert_level"] == "YELLOW": recommendations.extend([ "Increase monitoring frequency to 30-second intervals", "Prepare contingency procedures for affected subsystems", "Verify redundant system functionality" ]) # Subsystem-specific recommendations for subsystem, status in monitoring_results["subsystem_status"].items(): if status["status"] == "CRITICAL": recommendations.append(f"CRITICAL: {subsystem.upper()} system failure detected") elif status["status"] == "WARNING": recommendations.append(f"Monitor {subsystem.upper()} system closely") return recommendations def export_monitoring_data(self) -> Dict[str, Any]: """Export all monitoring data for analysis.""" return { "mission_id": self.mission_id, "export_timestamp": datetime.datetime.now().isoformat(), "current_status": self.system_status, "alert_history": list(self.alert_history), "thresholds": self.thresholds, "alert_summary": self.generate_alert_summary() } def demonstrate_system_monitoring(): """Demonstrate system monitoring capabilities.""" monitor = SystemMonitor("DEMO-MISSION-001") # Sample telemetry data sample_telemetry = { "power": { "voltage": 29.5, "current": 25.3, "battery_level": 85, "solar_array_efficiency": 92 }, "thermal": { "component_temp": 35.2, "radiator_temp": -15.8, "heater_status": "OFF" }, "communication": { "signal_strength": -45, "data_rate": 50000, "link_quality": 95, "packet_loss": 0.2 }, "propulsion": { "fuel_pressure": 350, "thruster_status": "READY", "propellant_level": 88, "valve_position": "CLOSED" } } # Add some anomalies for demonstration sample_telemetry["power"]["battery_level"] = 8 # Critical level sample_telemetry["thermal"]["component_temp"] = 98 # Critical temperature # Monitor telemetry results = monitor.monitor_telemetry(sample_telemetry) # Generate recommendations recommendations = monitor.recommend_actions(results) # Generate alert summary alert_summary = monitor.generate_alert_summary(1) # Last hour return { "monitoring_results": results, "recommendations": recommendations, "alert_summary": alert_summary, "export_data": monitor.export_monitoring_data() } if __name__ == "__main__": demo = demonstrate_system_monitoring() print("System Monitoring Demonstration") print(f"Overall Status: {demo['monitoring_results']['overall_status']}") print(f"Alert Level: {demo['monitoring_results']['alert_level']}") print(f"Total Alerts: {len(demo['monitoring_results']['alerts'])}") print(f"Recommendations: {len(demo['recommendations'])}")