import json import math import datetime from typing import Dict, List, Optional, Any, Tuple class TrajectoryValidator: """ Advanced trajectory verification system for post-launch monitoring. """ def __init__(self): self.orbital_parameters = {} self.trajectory_history = [] self.validation_log = [] def calculate_orbital_elements(self, position: List[float], velocity: List[float]) -> Dict[str, float]: """Calculate orbital elements from position and velocity.""" # Simplified orbital mechanics calculations r = math.sqrt(sum(p**2 for p in position)) v = math.sqrt(sum(v**2 for v in velocity)) # Standard gravitational parameter for Earth (km^3/s^2) mu = 398600.4418 # Specific orbital energy energy = (v**2 / 2) - (mu / r) # Semi-major axis a = -mu / (2 * energy) if energy != 0 else float('inf') # Eccentricity (simplified) h = self.calculate_angular_momentum(position, velocity) e = math.sqrt(1 + (2 * energy * h**2) / (mu**2)) if a > 0 else 1.0 # Period (for elliptical orbits) period = 2 * math.pi * math.sqrt(a**3 / mu) if a > 0 else 0 return { "semi_major_axis": a, "eccentricity": e, "period": period, "altitude": r - 6371.0, # Earth radius = 6371 km "velocity": v, "specific_energy": energy } def calculate_angular_momentum(self, position: List[float], velocity: List[float]) -> float: """Calculate specific angular momentum magnitude.""" # Cross product of position and velocity h_x = position[1] * velocity[2] - position[2] * velocity[1] h_y = position[2] * velocity[0] - position[0] * velocity[2] h_z = position[0] * velocity[1] - position[1] * velocity[0] return math.sqrt(h_x**2 + h_y**2 + h_z**2) def predict_trajectory(self, current_state: Dict[str, Any], time_ahead: float) -> Dict[str, Any]: """Predict trajectory position after time_ahead seconds.""" position = current_state["position"] velocity = current_state["velocity"] # Simplified prediction using orbital mechanics elements = self.calculate_orbital_elements(position, velocity) # Mean motion n = 2 * math.pi / elements["period"] if elements["period"] > 0 else 0 # Mean anomaly at future time M_future = n * time_ahead # True anomaly (simplified Kepler equation solution) E = M_future # Eccentric anomaly approximation nu = 2 * math.atan(math.sqrt((1 + elements["eccentricity"]) / (1 - elements["eccentricity"])) * math.tan(E / 2)) # Future position (simplified 2D projection) r = elements["semi_major_axis"] * (1 - elements["eccentricity"]**2) / \ (1 + elements["eccentricity"] * math.cos(nu)) predicted_position = [ r * math.cos(nu), r * math.sin(nu), position[2] # Assume minimal z-component change ] return { "time_ahead": time_ahead, "predicted_position": predicted_position, "orbital_elements": elements, "confidence": 0.95 # Simplified confidence score } def verify_trajectory_integrity(self, observed_positions: List[List[float]], predicted_positions: List[List[float]], tolerance: float = 0.01) -> Dict[str, Any]: """Verify trajectory integrity against predictions.""" total_deviation = 0 deviations = [] for obs, pred in zip(observed_positions, predicted_positions): deviation = math.sqrt(sum((o - p)**2 for o, p in zip(obs, pred))) deviations.append(deviation) total_deviation += deviation avg_deviation = total_deviation / len(deviations) max_deviation = max(deviations) integrity_score = max(0, 1 - (avg_deviation / tolerance)) status = "NOMINAL" if integrity_score < 0.7: status = "DEGRADED" if integrity_score < 0.5: status = "CRITICAL" return { "integrity_score": integrity_score, "average_deviation": avg_deviation, "maximum_deviation": max_deviation, "status": status, "tolerance": tolerance, "deviation_history": deviations } def detect_anomalies(self, trajectory_data: List[Dict[str, Any]]) -> List[Dict[str, Any]]: """Detect trajectory anomalies using statistical analysis.""" anomalies = [] if len(trajectory_data) < 3: return anomalies # Analyze position variations positions = [data["position"] for data in trajectory_data] velocities = [data["velocity"] for data in trajectory_data] # Calculate statistics pos_std_dev = self.calculate_std_deviation(positions) vel_std_dev = self.calculate_std_deviation(velocities) # Detect outliers (simplified z-score method) for i, data in enumerate(trajectory_data): pos_z_score = self.calculate_z_score(data["position"], positions, pos_std_dev) vel_z_score = self.calculate_z_score(data["velocity"], velocities, vel_std_dev) if abs(pos_z_score) > 3 or abs(vel_z_score) > 3: anomalies.append({ "timestamp": data.get("timestamp", f"T{i}"), "type": "TRAJECTORY_ANOMALY", "position_z_score": pos_z_score, "velocity_z_score": vel_z_score, "severity": "HIGH" if abs(pos_z_score) > 5 else "MEDIUM" }) return anomalies def calculate_std_deviation(self, data_points: List[List[float]]) -> List[float]: """Calculate standard deviation for each coordinate.""" if not data_points: return [0.0, 0.0, 0.0] dimensions = len(data_points[0]) std_devs = [] for dim in range(dimensions): values = [point[dim] for point in data_points] mean = sum(values) / len(values) variance = sum((x - mean)**2 for x in values) / len(values) std_devs.append(math.sqrt(variance)) return std_devs def calculate_z_score(self, point: List[float], data_points: List[List[float]], std_dev: List[float]) -> float: """Calculate z-score for anomaly detection.""" if not data_points or not any(std_dev): return 0.0 dimensions = len(point) z_scores = [] for dim in range(dimensions): values = [p[dim] for p in data_points] mean = sum(values) / len(values) if std_dev[dim] != 0: z_scores.append((point[dim] - mean) / std_dev[dim]) return max(abs(z) for z in z_scores) if z_scores else 0.0 def generate_trajectory_report(self, current_trajectory: Dict[str, Any], prediction_data: Dict[str, Any], anomaly_data: List[Dict[str, Any]]) -> Dict[str, Any]: """Generate comprehensive trajectory verification report.""" report = { "timestamp": datetime.datetime.now().isoformat(), "verification_status": "PASS", "alert_level": "GREEN", "recommendations": [], "trajectory_data": current_trajectory, "prediction_confidence": prediction_data.get("confidence", 0), "anomalies_detected": len(anomaly_data) } # Evaluate overall status if len(anomaly_data) > 0: high_severity = [a for a in anomaly_data if a["severity"] == "HIGH"] if high_severity: report["verification_status"] = "FAIL" report["alert_level"] = "RED" report["recommendations"].append("IMMEDIATE TRAJECTORY CORRECTION REQUIRED") else: report["verification_status"] = "WARNING" report["alert_level"] = "YELLOW" report["recommendations"].append("INCREASE MONITORING FREQUENCY") if prediction_data.get("confidence", 1.0) < 0.8: report["verification_status"] = "CAUTION" report["alert_level"] = "YELLOW" report["recommendations"].append("VERIFY PREDICTION MODEL ACCURACY") return report def demonstrate_trajectory_verification(): """Demonstrate trajectory verification capabilities.""" validator = TrajectoryValidator() # Sample trajectory data sample_trajectory = { "position": [407.5, 28.6, 408.2], # km "velocity": [7.8, -0.2, 0.1], # km/s "timestamp": datetime.datetime.now().isoformat() } # Predict future trajectory prediction = validator.predict_trajectory(sample_trajectory, 300) # 5 minutes ahead # Generate some sample historical data for anomaly detection historical_data = [ {"position": [407.0, 28.5, 408.0], "velocity": [7.8, -0.2, 0.1]}, {"position": [408.0, 28.7, 408.1], "velocity": [7.79, -0.21, 0.09]}, {"position": [407.5, 28.6, 408.2], "velocity": [7.8, -0.2, 0.1]}, {"position": [450.0, 35.0, 410.0], "velocity": [8.0, -0.5, 0.3]}, # Anomaly ] # Detect anomalies anomalies = validator.detect_anomalies(historical_data) # Generate comprehensive report report = validator.generate_trajectory_report(sample_trajectory, prediction, anomalies) return { "trajectory_validation": True, "prediction": prediction, "anomalies": anomalies, "report": report } if __name__ == "__main__": result = demonstrate_trajectory_verification() print("Trajectory Verification System Operational") print(f"Alert Level: {result['report']['alert_level']}") print(f"Anomalies Detected: {result['report']['anomalies_detected']}") print(f"Verification Status: {result['report']['verification_status']}")