#!/usr/bin/env python3
"""
Post-Launch Monitoring System Implementation
Real-time telemetry processing and automated alerting
"""

import json
import datetime
import math
from typing import Dict, List, Optional, Tuple


class PostLaunchMonitor:
    """Evaluate telemetry samples, record anomalies and recommend aborts.

    Instance state:
        config: monitoring configuration (see ``default_config``).
        start_time: naive local ``datetime`` captured at construction.
        alert_log: one ``{"timestamp": <ISO string>, "anomaly": <code>}``
            entry per anomaly found by ``process_telemetry``.
        system_status: ``"NOMINAL"`` or ``"ANOMALY_DETECTED"``, reflecting
            the most recently processed sample.
    """

    # Abort levels in priority order (dicts preserve insertion order), each
    # mapped to the trigger names that raise it.
    _ABORT_TRIGGERS = {
        "IMMEDIATE_ABORT": [
            "explosion_detected",
            "catastrophic_structural_failure",
            "loss_of_vehicle_control",
        ],
        "CONTROLLED_ABORT": [
            "engine_failure",
            "POSITION_DEVIATION_EXCEEDED",
            "VELOCITY_DEVIATION_EXCEEDED",
            "BATTERY_VOLTAGE_CRITICAL",
        ],
    }

    # Space-separated procedure steps for each abort level.
    _ABORT_PROCEDURES = {
        "IMMEDIATE_ABORT": "execute_emergency_shutdown activate_recovery_beacons",
        "CONTROLLED_ABORT": "initiate_safe_mode engage_backup_systems notify_ground_control",
    }

    # Health scoring: alerts younger than the window each cost a fixed penalty.
    _RECENT_WINDOW_SECONDS = 3600
    _ANOMALY_PENALTY = 5.0

    def __init__(self, config_file: str = "monitor_config.json"):
        """Load configuration and reset the monitor's runtime state.

        Args:
            config_file: path of a JSON configuration file; built-in
                defaults are used when the file does not exist.
        """
        self.config = self.load_config(config_file)
        self.start_time = datetime.datetime.now()
        self.alert_log: List[Dict[str, str]] = []
        self.system_status = "NOMINAL"

    def load_config(self, config_file: str) -> Dict:
        """Load monitoring configuration, filling gaps from the defaults.

        Args:
            config_file: path of the JSON configuration file.

        Returns:
            The default configuration overlaid with the file's values.
            ``trajectory_tolerance`` is merged key by key so that a file
            overriding a single tolerance keeps the other defaults.

        Raises:
            ValueError: the file holds invalid JSON
                (``json.JSONDecodeError`` is a ``ValueError``) or its root
                is not a JSON object.
        """
        defaults = self.default_config()
        try:
            with open(config_file, 'r', encoding='utf-8') as f:
                loaded = json.load(f)
        except FileNotFoundError:
            return defaults

        if not isinstance(loaded, dict):
            raise ValueError(
                f"{config_file}: configuration root must be a JSON object"
            )

        merged = {**defaults, **loaded}
        tolerance = loaded.get("trajectory_tolerance")
        if isinstance(tolerance, dict):
            merged["trajectory_tolerance"] = {
                **defaults["trajectory_tolerance"],
                **tolerance,
            }
        return merged

    def default_config(self) -> Dict:
        """Return a fresh copy of the built-in configuration."""
        return {
            "monitoring_interval": 1.0,
            "alert_thresholds": {
                "critical": 90,
                "warning": 75
            },
            "log_retention_days": 30,
            "trajectory_tolerance": {
                "position": 100,  # meters
                "velocity": 1.0,  # percentage
                "attitude": 0.1   # degrees
            }
        }

    def process_telemetry(self, data: Dict) -> Dict:
        """Process one telemetry sample and record any anomalies.

        The caller's dict is left untouched; derived metrics are added to
        a copy, which is what the result carries under ``"telemetry"``.
        Each detected anomaly is appended to ``alert_log`` and
        ``system_status`` is updated to reflect this sample.

        Args:
            data: telemetry sample keyed by measurement name.

        Returns:
            Dict with ``timestamp`` (ISO string), ``telemetry``,
            ``anomalies`` (list of codes), ``system_status`` and
            ``mission_elapsed_time`` (seconds since construction).
        """
        now = datetime.datetime.now()
        timestamp = now.isoformat()
        telemetry = dict(data)

        # Calculate derived metrics
        if 'velocity' in telemetry and 'altitude' in telemetry:
            # Mass falls back to 1000 when the sample does not carry one.
            # NOTE(review): units are not stated here; 9.81 suggests SI
            # (kg, m, m/s) - confirm against the telemetry schema.
            mass = telemetry.get('mass', 1000)
            telemetry['kinetic_energy'] = 0.5 * mass * telemetry['velocity'] ** 2
            telemetry['potential_energy'] = mass * 9.81 * telemetry['altitude']

        # Check for anomalies
        anomalies = self.detect_anomalies(telemetry)

        # Record every anomaly so the status report and health score see it.
        self.alert_log.extend(
            {"timestamp": timestamp, "anomaly": code} for code in anomalies
        )

        # Update system status
        self.system_status = "ANOMALY_DETECTED" if anomalies else "NOMINAL"

        return {
            "timestamp": timestamp,
            "telemetry": telemetry,
            "anomalies": anomalies,
            "system_status": self.system_status,
            "mission_elapsed_time": (now - self.start_time).total_seconds()
        }

    def detect_anomalies(self, data: Dict) -> List[str]:
        """Return the anomaly codes raised by one telemetry sample.

        This is a pure check: it neither logs nor changes monitor state.

        Args:
            data: telemetry sample keyed by measurement name.

        Returns:
            Anomaly codes in check order; empty when nothing is wrong.
        """
        anomalies = []
        tolerance = self.config['trajectory_tolerance']

        # Trajectory verification: only fields present in the sample are
        # compared against their configured tolerance.
        trajectory_checks = (
            ('position_deviation', 'position', "POSITION_DEVIATION_EXCEEDED"),
            ('velocity_deviation', 'velocity', "VELOCITY_DEVIATION_EXCEEDED"),
            ('attitude_error', 'attitude', "ATTITUDE_ERROR_EXCEEDED"),
        )
        for field, limit_key, code in trajectory_checks:
            if field in data and data[field] > tolerance[limit_key]:
                anomalies.append(code)

        # System health checks
        if data.get('velocity', 0) < 0:
            anomalies.append("NEGATIVE_VELOCITY")
        if data.get('altitude', 0) < 0:
            anomalies.append("NEGATIVE_ALTITUDE")

        # Temperature monitoring (a missing reading is treated as 20)
        temp = data.get('temperature', 20)
        if temp < -40 or temp > 85:
            anomalies.append("TEMPERATURE_OUT_OF_RANGE")

        # Power systems (a missing reading is treated as 3.7)
        battery = data.get('battery_voltage', 3.7)
        if battery < 3.0:
            anomalies.append("BATTERY_VOLTAGE_CRITICAL")
        elif battery < 3.2:
            anomalies.append("BATTERY_VOLTAGE_LOW")

        # Communication: a missing reading counts as 0 and is reported weak.
        signal = data.get('signal_strength', 0)
        if signal < 20:
            anomalies.append("COMMUNICATION_SIGNAL_WEAK")

        return anomalies

    def check_abort_conditions(self, data: Dict) -> Tuple[str, Optional[str]]:
        """Check if abort conditions are met.

        A trigger fires when it is one of the sample's anomaly codes or
        when the sample carries a truthy value under the trigger's name.
        NOTE(review): the second rule assumes event triggers such as
        ``explosion_detected`` arrive as telemetry flags; without it no
        IMMEDIATE_ABORT trigger could ever fire - confirm against the
        telemetry schema.

        Args:
            data: telemetry sample keyed by measurement name.

        Returns:
            ``(level, procedure)`` for the highest-priority abort level
            with a firing trigger, or ``("CONTINUE_MISSION", None)``.
        """
        anomalies = set(self.detect_anomalies(data))

        for level, triggers in self._ABORT_TRIGGERS.items():
            if any(t in anomalies or bool(data.get(t)) for t in triggers):
                return level, self.get_abort_procedure(level)

        return "CONTINUE_MISSION", None

    def get_abort_procedure(self, level: str) -> str:
        """Get abort procedure for given level ("" for an unknown level)."""
        return self._ABORT_PROCEDURES.get(level, "")

    def generate_status_report(self) -> Dict:
        """Generate comprehensive status report.

        Returns:
            Dict with ``report_timestamp``, ``mission_duration`` (string
            form of the elapsed ``timedelta``), ``system_status``,
            ``total_anomalies``, ``recent_anomalies`` (the last ten
            ``alert_log`` entries) and ``health_score``.
        """
        now = datetime.datetime.now()
        return {
            "report_timestamp": now.isoformat(),
            "mission_duration": str(now - self.start_time),
            "system_status": self.system_status,
            "total_anomalies": len(self.alert_log),
            "recent_anomalies": self.alert_log[-10:],
            "health_score": self.calculate_health_score()
        }

    def calculate_health_score(self) -> float:
        """Calculate overall system health score (0-100).

        Starts at 100 and subtracts 5 for every ``alert_log`` entry less
        than one hour old, never going below 0.
        """
        now = datetime.datetime.now()
        # total_seconds() is required here: timedelta.seconds drops whole
        # days, which would make day-old alerts look recent.
        recent_count = sum(
            1
            for entry in self.alert_log
            if (now - datetime.datetime.fromisoformat(entry['timestamp'])).total_seconds()
            < self._RECENT_WINDOW_SECONDS
        )
        return max(0.0, 100.0 - recent_count * self._ANOMALY_PENALTY)


def main() -> None:
    """Run the built-in demonstration scenarios and print a final report."""
    monitor = PostLaunchMonitor()

    # Test scenarios
    test_cases = [
        {
            "name": "Nominal Flight",
            "data": {
                "altitude": 150000,
                "velocity": 7800,
                "temperature": 25,
                "battery_voltage": 3.8,
                "signal_strength": 85,
                "position_deviation": 50,
                "velocity_deviation": 0.5,
                "attitude_error": 0.05
            }
        },
        {
            "name": "Critical Battery",
            "data": {
                "altitude": 100000,
                "velocity": 7500,
                "temperature": 30,
                "battery_voltage": 2.8,
                "signal_strength": 70,
                "position_deviation": 80,
                "velocity_deviation": 0.8,
                "attitude_error": 0.08
            }
        },
        {
            "name": "Trajectory Deviation",
            "data": {
                "altitude": 120000,
                "velocity": 7000,
                "temperature": 35,
                "battery_voltage": 3.6,
                "signal_strength": 60,
                "position_deviation": 150,
                "velocity_deviation": 2.0,
                "attitude_error": 0.15
            }
        }
    ]

    print("=== POST-LAUNCH MONITORING TEST ===\n")

    for test in test_cases:
        print(f"Test Case: {test['name']}")
        result = monitor.process_telemetry(test['data'])
        abort_level, abort_proc = monitor.check_abort_conditions(test['data'])

        print(f" System Status: {result['system_status']}")
        print(f" Anomalies: {result['anomalies']}")
        print(f" Abort Level: {abort_level}")
        if abort_proc:
            print(f" Abort Procedure: {abort_proc}")
        print(f" Health Score: {monitor.calculate_health_score():.1f}")
        print()

    # Generate final report
    report = monitor.generate_status_report()
    print("=== FINAL STATUS REPORT ===")
    print(json.dumps(report, indent=2))


# Example usage and testing
if __name__ == "__main__":
    main()