""" Advanced Abort Procedures System with Real-time Decision Logic """ import json import math import datetime import dataclasses import collections from typing import Dict, List, Optional, Any, Tuple @dataclasses.dataclass class AbortCondition: name: str description: str threshold: float current_value: float severity: str time_window: int # seconds consecutive_readings: int auto_abort_enabled: bool @dataclasses.dataclass class AbortDecision: timestamp: datetime.datetime abort_required: bool abort_mode: Optional[str] primary_trigger: Optional[str] all_conditions: List[AbortCondition] time_to_abort: int # seconds remaining confidence: float # 0.0 to 1.0 class AbortSystemController: """Intelligent abort system with multi-factor decision making""" def __init__(self): self.abort_modes = { "LAUNCH_ESCAPE": { "name": "Launch Escape System", "time_window": 180, # 0-3 minutes after launch "description": "Activate launch escape tower, separate capsule", "success_probability": 0.95, "crew_survival_probability": 0.98, "recovery_time": "2-6 hours", "actions": [ "Confirm abort command authority", "Activate LES motors", "Separate capsule from vehicle", "Deploy drogue chutes", "Deploy main chutes", "Activate beacon systems" ] }, "ORBITAL_ABORT": { "name": "Orbital Abort/Deorbit", "time_window": None, # Any time after orbital insertion "description": "Perform deorbit burn and emergency reentry", "success_probability": 0.85, "crew_survival_probability": 0.92, "recovery_time": "4-12 hours", "actions": [ "Verify deorbit trajectory", "Configure reentry attitude", "Execute deorbit burn", "Monitor reentry profile", "Deploy landing systems", "Initiate recovery protocols" ] }, "CONTINGENCY_ABORT": { "name": "Contingency/Safe Mode", "time_window": None, "description": "Enter safe mode, minimal operations", "success_probability": 0.70, "crew_survival_probability": 0.95, "recovery_time": "12-48 hours", "actions": [ "Shut down non-essential systems", "Configure safe mode attitude", "Activate backup systems", "Establish emergency communications", "Await ground control instructions" ] }, "IMMEDIATE_ABORT": { "name": "Immediate Emergency Abort", "time_window": None, "description": "Catastrophic failure - immediate response", "success_probability": 0.60, "crew_survival_probability": 0.75, "recovery_time": "Unknown", "actions": [ "Execute emergency separation", "Activate all survival systems", "Broadcast emergency beacon", "Prepare for worst-case scenario" ] } } self.abort_triggers = { "TRAJECTORY_DEVIATION": { "description": "Significant deviation from planned trajectory", "threshold_type": "position_error", "threshold_value": 500.0, # meters "severity": "CRITICAL", "abort_mode": "IMMEDIATE_ABORT", "auto_abort": True, "verification_required": False }, "VELOCITY_ERROR": { "description": "Critical velocity deviation", "threshold_type": "velocity_error", "threshold_value": 10.0, # m/s "severity": "CRITICAL", "abort_mode": "ORBITAL_ABORT", "auto_abort": False, "verification_required": True }, "MULTIPLE_ENGINE_FAILURE": { "description": "Failure of multiple engines", "threshold_type": "engine_count", "threshold_value": 2, # number of engines "severity": "CRITICAL", "abort_mode": "IMMEDIATE_ABORT", "auto_abort": True, "verification_required": False }, "STRUCTURAL_INTEGRITY": { "description": "Structural integrity breach detected", "threshold_type": "pressure_leak", "threshold_value": 0.1, # pressure loss rate "severity": "CRITICAL", "abort_mode": "IMMEDIATE_ABORT", "auto_abort": True, "verification_required": False }, "POWER_SYSTEM_FAILURE": { "description": "Critical power system failure", "threshold_type": "power_level", "threshold_value": 20.0, # percentage "severity": "CRITICAL", "abort_mode": "CONTINGENCY_ABORT", "auto_abort": False, "verification_required": True }, "COMMUNICATION_LOSS": { "description": "Extended communication loss", "threshold_type": "time_without_comms", "threshold_value": 300, # seconds "severity": "WARNING", "abort_mode": "CONTINGENCY_ABORT", "auto_abort": False, "verification_required": True }, "LIFE_SUPPORT_FAILURE": { "description": "Life support system failure", "threshold_type": "system_health", "threshold_value": 50.0, # percentage "severity": "CRITICAL", "abort_mode": "IMMEDIATE_ABORT", "auto_abort": True, "verification_required": False }, "THERMAL_CONTROL_FAILURE": { "description": "Critical thermal control failure", "threshold_type": "temperature_deviation", "threshold_value": 30.0, # degrees from nominal "severity": "WARNING", "abort_mode": "CONTINGENCY_ABORT", "auto_abort": False, "verification_required": True } } self.active_conditions = [] self.abort_history = [] self.system_status = {} self.launch_time = None self.abort_authority_required = True self.abort_countdown = {} def set_launch_time(self, launch_timestamp: datetime.datetime) -> None: """Set the launch time for time-based abort decisions""" self.launch_time = launch_timestamp def update_system_status(self, system_name: str, status_data: Dict[str, Any]) -> None: """Update system status data for abort evaluation""" self.system_status[system_name] = { **status_data, "last_updated": datetime.datetime.now().isoformat() } def evaluate_abort_conditions(self, current_trajectory: Dict[str, Any], system_health: Dict[str, float]) -> AbortDecision: """Evaluate all abort conditions and make abort decision""" current_time = datetime.datetime.now() triggered_conditions = [] # Evaluate each abort trigger for trigger_name, trigger_config in self.abort_triggers.items(): condition = self._evaluate_single_trigger(trigger_name, trigger_config, current_trajectory, system_health) if condition: triggered_conditions.append(condition) # Determine if abort is required abort_required = len(triggered_conditions) > 0 if abort_required: # Select appropriate abort mode abort_mode = self._select_abort_mode(triggered_conditions) # Calculate confidence based on number and severity of conditions confidence = self._calculate_abort_confidence(triggered_conditions) # Calculate time to abort time_to_abort = self._calculate_time_to_abort(triggered_conditions, current_time) # Create abort decision decision = AbortDecision( timestamp=current_time, abort_required=True, abort_mode=abort_mode, primary_trigger=max(triggered_conditions, key=lambda x: x.severity).name, all_conditions=triggered_conditions, time_to_abort=time_to_abort, confidence=confidence ) else: decision = AbortDecision( timestamp=current_time, abort_required=False, abort_mode=None, primary_trigger=None, all_conditions=[], time_to_abort=0, confidence=0.0 ) self.abort_history.append(decision) return decision def _evaluate_single_trigger(self, trigger_name: str, trigger_config: Dict[str, Any], current_trajectory: Dict[str, Any], system_health: Dict[str, float]) -> Optional[AbortCondition]: """Evaluate a single abort trigger""" threshold_type = trigger_config["threshold_type"] threshold_value = trigger_config["threshold_value"] current_value = None # Get current value based on threshold type if threshold_type == "position_error": current_value = current_trajectory.get("position_error", 0) elif threshold_type == "velocity_error": current_value = current_trajectory.get("velocity_error", 0) elif threshold_type == "engine_count": current_value = current_trajectory.get("failed_engines", 0) elif threshold_type == "pressure_leak": current_value = current_trajectory.get("pressure_loss_rate", 0) elif threshold_type == "power_level": current_value = system_health.get("power_system", 100) elif threshold_type == "time_without_comms": if "communications" in self.system_status: last_comm = datetime.datetime.fromisoformat( self.system_status["communications"].get("last_contact", "") ) current_value = (datetime.datetime.now() - last_comm).total_seconds() elif threshold_type == "system_health": # Check worst-case system health current_value = min(system_health.values()) if system_health else 100 elif threshold_type == "temperature_deviation": if "thermal_control" in self.system_status: current_value = self.system_status["thermal_control"].get("max_deviation", 0) if current_value is None: return None # Check if threshold is exceeded threshold_exceeded = False if threshold_type in ["position_error", "velocity_error", "engine_count", "pressure_leak", "time_without_comms", "temperature_deviation"]: threshold_exceeded = current_value >= threshold_value elif threshold_type in ["power_level", "system_health"]: threshold_exceeded = current_value <= threshold_value if threshold_exceeded: return AbortCondition( name=trigger_name, description=trigger_config["description"], threshold=threshold_value, current_value=current_value, severity=trigger_config["severity"], time_window=0, # Immediate for now consecutive_readings=1, auto_abort_enabled=trigger_config["auto_abort"] ) return None def _select_abort_mode(self, conditions: List[AbortCondition]) -> str: """Select appropriate abort mode based on conditions""" # Check for immediate abort conditions first immediate_triggers = [c for c in conditions if c.severity == "CRITICAL" and c.auto_abort_enabled] if immediate_triggers: return "IMMEDIATE_ABORT" # Check for launch window (first 3 minutes) if self.launch_time: time_since_launch = (datetime.datetime.now() - self.launch_time).total_seconds() if time_since_launch <= 180: return "LAUNCH_ESCAPE" # Check for orbital conditions critical_conditions = [c for c in conditions if c.severity == "CRITICAL"] if critical_conditions: return "ORBITAL_ABORT" # Default to contingency abort return "CONTINGENCY_ABORT" def _calculate_abort_confidence(self, conditions: List[AbortCondition]) -> float: """Calculate confidence in abort decision (0.0 to 1.0)""" if not conditions: return 0.0 # Base confidence on severity and number of conditions severity_weights = {"WARNING": 0.3, "CRITICAL": 0.8} total_weight = 0 for condition in conditions: weight = severity_weights.get(condition.severity, 0.5) # Add bonus for conditions that exceed thresholds significantly excess_ratio = condition.current_value / condition.threshold if condition.threshold > 0 else 1 if excess_ratio > 1.5: weight += 0.2 total_weight += weight # Normalize to 0-1 range confidence = min(1.0, total_weight / len(conditions)) return confidence def _calculate_time_to_abort(self, conditions: List[AbortCondition], current_time: datetime.datetime) -> int: """Calculate time remaining before abort must occur""" if not conditions: return 0 # For immediate abort triggers, time is critical critical_conditions = [c for c in conditions if c.severity == "CRITICAL" and c.auto_abort_enabled] if critical_conditions: return 30 # 30 seconds to execute immediate abort # For less critical conditions, allow more time warning_conditions = [c for c in conditions if c.severity == "WARNING"] if warning_conditions: return 300 # 5 minutes for contingency abort return 120 # 2 minutes default def execute_abort_sequence(self, abort_mode: str) -> Dict[str, Any]: """Execute the abort sequence for the specified mode""" if abort_mode not in self.abort_modes: return {"error": f"Unknown abort mode: {abort_mode}"} mode_config = self.abort_modes[abort_mode] abort_execution = { "abort_mode": abort_mode, "timestamp": datetime.datetime.now().isoformat(), "success_probability": mode_config["success_probability"], "crew_survival_probability": mode_config["crew_survival_probability"], "actions_executed": [], "status": "INITIATED" } # Simulate executing abort actions for i, action in enumerate(mode_config["actions"]): action_result = { "step": i + 1, "action": action, "timestamp": (datetime.datetime.now() + datetime.timedelta(seconds=i*5)).isoformat(), "status": "COMPLETED" } abort_execution["actions_executed"].append(action_result) abort_execution["status"] = "COMPLETED" return abort_execution def get_abort_readiness_status(self) -> Dict[str, Any]: """Get current abort system readiness status""" return { "timestamp": datetime.datetime.now().isoformat(), "abort_system_status": "READY", "available_abort_modes": list(self.abort_modes.keys()), "active_abort_conditions": len(self.active_conditions), "abort_history_count": len(self.abort_history), "abort_authority_required": self.abort_authority_required, "last_system_update": max([s.get("last_updated", "") for s in self.system_status.values()], default="Never"), "system_health": { system: health for system, health in self.system_status.items() if isinstance(health, (int, float)) } } # Test the abort system def test_abort_system(): """Test the advanced abort system""" abort_system = AbortSystemController() abort_system.set_launch_time(datetime.datetime.now() - datetime.timedelta(minutes=2)) # Sample trajectory data with issues sample_trajectory = { "position_error": 600.0, # Exceeds threshold "velocity_error": 5.0, "failed_engines": 1 } # Sample system health sample_health = { "propulsion": 70.0, "power_system": 85.0, "life_support": 95.0 } # Update system status abort_system.update_system_status("communications", { "last_contact": (datetime.datetime.now() - datetime.timedelta(minutes=2)).isoformat() }) # Evaluate abort conditions decision = abort_system.evaluate_abort_conditions(sample_trajectory, sample_health) print("Abort Decision:") print(f"Abort Required: {decision.abort_required}") print(f"Abort Mode: {decision.abort_mode}") print(f"Confidence: {decision.confidence:.2f}") print(f"Time to Abort: {decision.time_to_abort} seconds") if decision.abort_required and decision.abort_mode: execution = abort_system.execute_abort_sequence(decision.abort_mode) print(f"\nExecuting {execution['abort_mode']} sequence...") print(f"Actions completed: {len(execution['actions_executed'])}") return True if __name__ == "__main__": test_abort_system()