""" Predictive Maintenance System for Technical Equipment """ import json import math import datetime import dataclasses import hashlib from typing import Dict, List, Optional, Any, Tuple @dataclasses.dataclass class Equipment: equipment_id: str name: str equipment_type: str location: str installation_date: datetime.date manufacturer: str model: str specifications: Dict[str, Any] maintenance_interval_days: int last_maintenance: Optional[datetime.date] operating_hours: float status: str @dataclasses.dataclass class MaintenanceRecord: record_id: str equipment_id: str maintenance_type: str # "preventive", "corrective", "predictive" performed_date: datetime.datetime technician: str cost: float downtime_hours: float parts_replaced: List[str] notes: str @dataclasses.dataclass class SensorReading: reading_id: str equipment_id: str sensor_type: str # "temperature", "vibration", "pressure", "current", "noise" value: float unit: str timestamp: datetime.datetime status: str # "normal", "warning", "critical" class PredictiveMaintenanceSystem: """Predictive maintenance system with ML-like algorithms""" def __init__(self, platform): self.platform = platform self.equipment: Dict[str, Equipment] = {} self.maintenance_records: Dict[str, MaintenanceRecord] = {} self.sensor_readings: Dict[str, List[SensorReading]] = {} self.failure_patterns: Dict[str, Dict] = {} self.maintenance_schedules: Dict[str, List[Dict]] = {} def add_equipment(self, equipment_id: str, name: str, equipment_type: str, location: str, manufacturer: str, model: str, specifications: Dict[str, Any], maintenance_interval_days: int = 90) -> Dict: """Add new equipment to maintenance system""" try: equipment = Equipment( equipment_id=equipment_id, name=name, equipment_type=equipment_type, location=location, installation_date=datetime.date.today(), manufacturer=manufacturer, model=model, specifications=specifications, maintenance_interval_days=maintenance_interval_days, last_maintenance=None, operating_hours=0.0, status="operational" ) self.equipment[equipment_id] = equipment self.sensor_readings[equipment_id] = [] # Initialize maintenance schedule self._generate_maintenance_schedule(equipment_id) return { "success": True, "equipment_id": equipment_id, "name": name, "equipment_type": equipment_type, "maintenance_interval": maintenance_interval_days, "next_maintenance": ( datetime.date.today() + datetime.timedelta(days=maintenance_interval_days) ).isoformat() } except Exception as e: return {"success": False, "error": str(e)} def record_sensor_data(self, equipment_id: str, sensor_type: str, value: float, unit: str) -> Dict: """Record sensor reading and analyze for anomalies""" if equipment_id not in self.equipment: return {"error": "Equipment not found"} # Determine reading status based on thresholds status = self._analyze_sensor_value(equipment_id, sensor_type, value) reading_id = self._generate_reading_id() reading = SensorReading( reading_id=reading_id, equipment_id=equipment_id, sensor_type=sensor_type, value=value, unit=unit, timestamp=datetime.datetime.now(), status=status ) self.sensor_readings[equipment_id].append(reading) # Check for immediate alerts alerts = [] if status == "critical": alert = self._create_maintenance_alert(equipment_id, sensor_type, value, "critical") alerts.append(alert) self.platform.maintenance_alerts.append(alert) elif status == "warning": # Trend analysis for warning readings trend_alert = self._analyze_sensor_trends(equipment_id, sensor_type) if trend_alert: alerts.append(trend_alert) self.platform.maintenance_alerts.append(trend_alert) # Keep only last 1000 readings per equipment to manage memory if len(self.sensor_readings[equipment_id]) > 1000: self.sensor_readings[equipment_id] = self.sensor_readings[equipment_id][-1000:] return { "success": True, "reading_id": reading_id, "equipment_id": equipment_id, "sensor_type": sensor_type, "value": value, "status": status, "alerts_generated": len(alerts), "timestamp": reading.timestamp.isoformat() } def predict_failure_probability(self, equipment_id: str, days_ahead: int = 30) -> Dict: """Predict equipment failure probability using historical patterns""" if equipment_id not in self.equipment: return {"error": "Equipment not found"} equipment = self.equipment[equipment_id] readings = self.sensor_readings[equipment_id] if not readings: return { "equipment_id": equipment_id, "failure_probability": 0.05, # Default low probability "confidence": 0.1, "factors": ["insufficient_data"], "recommendation": "Collect more sensor data" } # Calculate factors influencing failure probability factors = [] base_probability = 0.02 # 2% base probability for well-maintained equipment # Age factor age_days = (datetime.date.today() - equipment.installation_date).days age_factor = min(0.3, age_days / 3650) # Max 30% after 10 years if age_factor > 0.1: factors.append(f"age_risk_{age_factor:.3f}") # Maintenance factor days_since_maintenance = 0 if equipment.last_maintenance: days_since_maintenance = (datetime.date.today() - equipment.last_maintenance).days maintenance_factor = min(0.4, days_since_maintenance / equipment.maintenance_interval_days) if maintenance_factor > 0.2: factors.append(f"maintenance_overdue_{maintenance_factor:.3f}") # Sensor anomaly factor anomaly_score = self._calculate_anomaly_score(equipment_id) if anomaly_score > 0.2: factors.append(f"sensor_anomalies_{anomaly_score:.3f}") # Operating hours factor hours_factor = min(0.3, equipment.operating_hours / 10000) # Max 30% at 10k hours # Historical failure patterns pattern_factor = self._apply_failure_patterns(equipment_id) # Combine factors total_probability = base_probability + age_factor + maintenance_factor + anomaly_score + hours_factor + pattern_factor failure_probability = min(0.95, total_probability) # Cap at 95% # Confidence based on data quality confidence = min(0.9, len(readings) / 500) # Higher confidence with more data # Generate recommendation recommendation = self._generate_maintenance_recommendation( failure_probability, equipment, factors ) return { "equipment_id": equipment_id, "equipment_name": equipment.name, "prediction_date": datetime.date.today().isoformat(), "failure_probability": round(failure_probability, 4), "confidence": round(confidence, 3), "factors": factors, "recommendation": recommendation, "next_maintenance": ( equipment.last_maintenance + datetime.timedelta(days=equipment.maintenance_interval_days) ).isoformat() if equipment.last_maintenance else "no_maintenance_history" } def schedule_maintenance(self, equipment_id: str, maintenance_type: str, scheduled_date: datetime.date, technician: str = None) -> Dict: """Schedule maintenance for equipment""" if equipment_id not in self.equipment: return {"error": "Equipment not found"} equipment = self.equipment[equipment_id] # Estimate cost and duration based on equipment type and maintenance type cost_estimate = self._estimate_maintenance_cost(equipment, maintenance_type) duration_hours = self._estimate_maintenance_duration(equipment, maintenance_type) maintenance_id = hashlib.md5( f"{equipment_id}_{maintenance_type}_{datetime.datetime.now().isoformat()}".encode() ).hexdigest()[:12] # Add to maintenance schedule if equipment_id not in self.maintenance_schedules: self.maintenance_schedules[equipment_id] = [] self.maintenance_schedules[equipment_id].append({ "maintenance_id": maintenance_id, "type": maintenance_type, "scheduled_date": scheduled_date.isoformat(), "technician": technician, "estimated_cost": cost_estimate, "estimated_duration_hours": duration_hours, "status": "scheduled" }) # Update equipment status if it's corrective maintenance if maintenance_type == "corrective": equipment.status = "maintenance_required" return { "success": True, "maintenance_id": maintenance_id, "equipment_id": equipment_id, "equipment_name": equipment.name, "maintenance_type": maintenance_type, "scheduled_date": scheduled_date.isoformat(), "estimated_cost": cost_estimate, "estimated_duration_hours": duration_hours } def record_maintenance_performed(self, maintenance_id: str, equipment_id: str, technician: str, cost: float, downtime_hours: float, parts_replaced: List[str], notes: str = "") -> Dict: """Record completed maintenance""" if equipment_id not in self.equipment: return {"error": "Equipment not found"} # Update maintenance schedule for sched in self.maintenance_schedules.get(equipment_id, []): if sched["maintenance_id"] == maintenance_id: sched["status"] = "completed" sched["actual_cost"] = cost sched["actual_duration"] = downtime_hours break # Create maintenance record record = MaintenanceRecord( record_id=maintenance_id, equipment_id=equipment_id, maintenance_type="preventive", # Default, could be parameter performed_date=datetime.datetime.now(), technician=technician, cost=cost, downtime_hours=downtime_hours, parts_replaced=parts_replaced, notes=notes ) self.maintenance_records[maintenance_id] = record # Update equipment equipment = self.equipment[equipment_id] equipment.last_maintenance = datetime.date.today() equipment.status = "operational" # Regenerate maintenance schedule self._generate_maintenance_schedule(equipment_id) return { "success": True, "maintenance_id": maintenance_id, "equipment_id": equipment_id, "recorded_at": datetime.datetime.now().isoformat(), "cost": cost, "downtime_hours": downtime_hours } def get_maintenance_dashboard(self) -> Dict: """Generate comprehensive maintenance dashboard""" total_equipment = len(self.equipment) operational = len([e for e in self.equipment.values() if e.status == "operational"]) maintenance_required = len([e for e in self.equipment.values() if e.status == "maintenance_required"]) # Upcoming maintenance (next 30 days) upcoming_maintenance = [] today = datetime.date.today() thirty_days_later = today + datetime.timedelta(days=30) for equipment_id, schedules in self.maintenance_schedules.items(): for sched in schedules: if (sched["status"] == "scheduled" and today <= datetime.date.fromisoformat(sched["scheduled_date"]) <= thirty_days_later): upcoming_maintenance.append({ "equipment_id": equipment_id, "equipment_name": self.equipment[equipment_id].name, "maintenance_id": sched["maintenance_id"], "scheduled_date": sched["scheduled_date"], "type": sched["type"], "estimated_cost": sched["estimated_cost"] }) # Recent alerts recent_alerts = [ alert for alert in self.platform.maintenance_alerts[-10:] if alert.equipment_type in [e.equipment_type for e in self.equipment.values()] ] # Failure probability analysis high_risk_equipment = [] for equipment_id in self.equipment.keys(): prediction = self.predict_failure_probability(equipment_id) if prediction.get("failure_probability", 0) > 0.3: # >30% risk high_risk_equipment.append({ "equipment_id": equipment_id, "equipment_name": self.equipment[equipment_id].name, "failure_probability": prediction["failure_probability"], "recommendation": prediction["recommendation"] }) return { "timestamp": datetime.datetime.now().isoformat(), "equipment_summary": { "total": total_equipment, "operational": operational, "maintenance_required": maintenance_required, "operational_rate": operational / total_equipment if total_equipment > 0 else 0 }, "upcoming_maintenance": { "count": len(upcoming_maintenance), "total_estimated_cost": sum(m["estimated_cost"] for m in upcoming_maintenance), "items": upcoming_maintenance[:10] # Top 10 }, "alerts": { "recent_count": len(recent_alerts), "critical_alerts": len([a for a in recent_alerts if a.priority == "critical"]), "high_priority_alerts": len([a for a in recent_alerts if a.priority == "high"]) }, "risk_analysis": { "high_risk_equipment_count": len(high_risk_equipment), "high_risk_items": high_risk_equipment[:5] # Top 5 }, "maintenance_efficiency": self._calculate_maintenance_efficiency() } def _analyze_sensor_value(self, equipment_id: str, sensor_type: str, value: float) -> str: """Analyze sensor value against equipment-specific thresholds""" # Define baseline thresholds (could be equipment-specific) thresholds = { "temperature": {"normal": 60, "warning": 80, "critical": 95}, "vibration": {"normal": 2.0, "warning": 4.0, "critical": 6.0}, "pressure": {"normal": 100, "warning": 120, "critical": 140}, "current": {"normal": 10, "warning": 15, "critical": 20}, "noise": {"normal": 70, "warning": 85, "critical": 100} } if sensor_type not in thresholds: return "normal" threshold = thresholds[sensor_type] if value >= threshold["critical"]: return "critical" elif value >= threshold["warning"]: return "warning" else: return "normal" def _calculate_anomaly_score(self, equipment_id: str) -> float: """Calculate anomaly score based on recent sensor readings""" readings = self.sensor_readings[equipment_id] if len(readings) < 10: return 0.0 # Focus on last 50 readings recent_readings = readings[-50:] # Count anomalies anomaly_count = len([r for r in recent_readings if r.status in ["warning", "critical"]]) return anomaly_count / len(recent_readings) def _analyze_sensor_trends(self, equipment_id: str, sensor_type: str) -> Optional[MaintenanceAlert]: """Analyze trends in sensor readings for early warning""" readings = [r for r in self.sensor_readings[equipment_id] if r.sensor_type == sensor_type] if len(readings) < 20: return None # Simple trend analysis: compare recent average to historical average recent_readings = readings[-10:] historical_readings = readings[-20:-10] recent_avg = sum(r.value for r in recent_readings) / len(recent_readings) historical_avg = sum(r.value for r in historical_readings) / len(historical_readings) # Check for concerning trend if recent_avg > historical_avg * 1.2: # 20% increase alert_id = hashlib.md5( f"trend_{equipment_id}_{sensor_type}_{datetime.datetime.now().isoformat()}".encode() ).hexdigest()[:10] return MaintenanceAlert( equipment_id=equipment_id, equipment_type=self.equipment[equipment_id].equipment_type, alert_type="trend_warning", priority="medium", predicted_failure_date=datetime.date.today() + datetime.timedelta(days=30), recommended_action=f"Monitor {sensor_type} sensor, consider preventive maintenance", cost_estimate=200.0 ) return None def _create_maintenance_alert(self, equipment_id: str, sensor_type: str, value: float, severity: str) -> MaintenanceAlert: """Create maintenance alert for critical sensor reading""" alert_id = hashlib.md5( f"critical_{equipment_id}_{sensor_type}_{datetime.datetime.now().isoformat()}".encode() ).hexdigest()[:10] days_to_failure = { "temperature": 7, "vibration": 3, "pressure": 5, "current": 2, "noise": 10 } return MaintenanceAlert( equipment_id=equipment_id, equipment_type=self.equipment[equipment_id].equipment_type, alert_type=f"{sensor_type}_critical", priority=severity, predicted_failure_date=datetime.date.today() + datetime.timedelta(days=days_to_failure.get(sensor_type, 5)), recommended_action=f"Immediate inspection of {sensor_type} system", cost_estimate=500.0 ) def _apply_failure_patterns(self, equipment_id: str) -> float: """Apply historical failure patterns to prediction""" equipment_type = self.equipment[equipment_id].equipment_type # Equipment type failure patterns (simplified) patterns = { "pump": 0.1, "motor": 0.08, "compressor": 0.12, "conveyor": 0.06, "generator": 0.15 } return patterns.get(equipment_type, 0.05) def _generate_maintenance_recommendation(self, probability: float, equipment: Equipment, factors: List[str]) -> str: """Generate maintenance recommendation based on risk factors""" if probability > 0.7: return "Schedule immediate corrective maintenance" elif probability > 0.4: return "Schedule preventive maintenance within 7 days" elif probability > 0.2: return "Increase monitoring frequency, plan preventive maintenance" else: return "Continue normal monitoring" def _estimate_maintenance_cost(self, equipment: Equipment, maintenance_type: str) -> float: """Estimate maintenance cost based on equipment and type""" base_costs = { "preventive": 200, "corrective": 800, "predictive": 400 } equipment_multipliers = { "pump": 1.0, "motor": 1.2, "compressor": 1.5, "conveyor": 2.0, "generator": 3.0 } base_cost = base_costs.get(maintenance_type, 300) multiplier = equipment_multipliers.get(equipment.equipment_type, 1.0) return base_cost * multiplier def _estimate_maintenance_duration(self, equipment: Equipment, maintenance_type: str) -> float: """Estimate maintenance duration in hours""" base_durations = { "preventive": 2.0, "corrective": 6.0, "predictive": 3.0 } equipment_multipliers = { "pump": 1.0, "motor": 1.2, "compressor": 1.5, "conveyor": 2.0, "generator": 2.5 } base_duration = base_durations.get(maintenance_type, 3.0) multiplier = equipment_multipliers.get(equipment.equipment_type, 1.0) return base_duration * multiplier def _generate_maintenance_schedule(self, equipment_id: str): """Generate preventive maintenance schedule""" equipment = self.equipment[equipment_id] # Schedule next 12 months of preventive maintenance schedule = [] base_date = equipment.last_maintenance or datetime.date.today() for i in range(1, 13): # Next 12 maintenance intervals maintenance_date = base_date + datetime.timedelta(days=i * equipment.maintenance_interval_days) # Skip past dates if maintenance_date < datetime.date.today(): continue maintenance_id = hashlib.md5( f"scheduled_{equipment_id}_{maintenance_date.isoformat()}".encode() ).hexdigest()[:12] schedule.append({ "maintenance_id": maintenance_id, "type": "preventive", "scheduled_date": maintenance_date.isoformat(), "technician": None, "estimated_cost": self._estimate_maintenance_cost(equipment, "preventive"), "estimated_duration_hours": self._estimate_maintenance_duration(equipment, "preventive"), "status": "scheduled" }) self.maintenance_schedules[equipment_id] = schedule def _calculate_maintenance_efficiency(self) -> Dict: """Calculate maintenance efficiency metrics""" if not self.maintenance_records: return { "mtbf": 0, # Mean Time Between Failures "mttr": 0, # Mean Time To Repair "maintenance_cost_per_hour": 0, "preventive_ratio": 0 } # Calculate metrics (simplified) total_records = len(self.maintenance_records) preventive_count = len([r for r in self.maintenance_records.values() if r.maintenance_type == "preventive"]) total_cost = sum(r.cost for r in self.maintenance_records.values()) total_downtime = sum(r.downtime_hours for r in self.maintenance_records.values()) return { "total_maintenance_events": total_records, "preventive_maintenance_ratio": preventive_count / total_records, "average_cost_per_event": total_cost / total_records, "average_downtime_per_event": total_downtime / total_records, "total_maintenance_cost": total_cost, "total_downtime_hours": total_downtime } def _generate_reading_id(self) -> str: """Generate unique reading ID""" import hashlib return hashlib.md5( f"reading_{datetime.datetime.now().isoformat()}".encode() ).hexdigest()[:10] # Import required classes from supply_chain_platform import MaintenanceAlert