""" Logistics Optimization Platform - Core Infrastructure Real-time tracking, inventory management, scheduling, and predictive maintenance """ import json import math import base64 import hashlib import datetime import re import string import itertools import collections import dataclasses from typing import Dict, List, Optional, Any, Union, Tuple @dataclasses.dataclass class IoTDevice: device_id: str device_type: str location: Dict[str, float] status: str last_update: datetime.datetime metadata: Dict[str, Any] @dataclasses.dataclass class FleetVehicle: vehicle_id: str driver_id: str location: Dict[str, float] destination: Optional[Dict[str, float]] cargo: List[str] fuel_level: float maintenance_due: datetime.datetime status: str @dataclasses.dataclass class InventoryItem: item_id: str name: str category: str quantity: int location: str min_threshold: int last_updated: datetime.datetime unit_cost: float @dataclasses.dataclass class StaffMember: staff_id: str name: str role: str skills: List[str] availability: Dict[str, List[Tuple[datetime.datetime, datetime.datetime]]] current_assignment: Optional[str] hourly_rate: float @dataclasses.dataclass class MaintenanceAlert: equipment_id: str equipment_type: str alert_type: str priority: str predicted_failure_date: datetime.datetime recommended_action: str cost_estimate: float class SupplyChainPlatform: """Core logistics optimization platform with IoT integration""" def __init__(self): self.iot_devices: Dict[str, IoTDevice] = {} self.fleet: Dict[str, FleetVehicle] = {} self.inventory: Dict[str, InventoryItem] = {} self.staff: Dict[str, StaffMember] = {} self.maintenance_alerts: List[MaintenanceAlert] = [] self.shipments: Dict[str, Dict] = {} def register_iot_device(self, device_id: str, device_type: str, location: Dict[str, float], metadata: Dict = None) -> bool: """Register new IoT device in the system""" try: device = IoTDevice( device_id=device_id, device_type=device_type, location=location, status="active", last_update=datetime.datetime.now(), metadata=metadata or {} ) self.iot_devices[device_id] = device return True except Exception as e: print(f"Error registering IoT device: {e}") return False def process_iot_data(self, device_id: str, data: Dict) -> Dict: """Process incoming IoT data and trigger appropriate actions""" if device_id not in self.iot_devices: return {"error": "Device not found"} device = self.iot_devices[device_id] device.last_update = datetime.datetime.now() device.metadata.update(data) # Trigger alerts based on data alerts = [] if "temperature" in data and data["temperature"] > 30: alerts.append({"type": "temperature_alert", "device": device_id}) if "vibration" in data and data["vibration"] > 0.8: alerts.append({"type": "vibration_alert", "device": device_id}) return { "status": "processed", "device": device_id, "alerts": alerts, "timestamp": device.last_update.isoformat() } def get_active_devices_by_type(self, device_type: str) -> List[IoTDevice]: """Get all active devices of a specific type""" return [device for device in self.iot_devices.values() if device.device_type == device_type and device.status == "active"] def generate_supply_chain_report(self) -> Dict: """Generate comprehensive supply chain status report""" active_devices = len([d for d in self.iot_devices.values() if d.status == "active"]) active_vehicles = len([v for v in self.fleet.values() if v.status == "active"]) low_stock_items = len([i for i in self.inventory.values() if i.quantity < i.min_threshold]) return { "timestamp": datetime.datetime.now().isoformat(), "iot_devices": { "total": len(self.iot_devices), "active": active_devices, "inactive": len(self.iot_devices) - active_devices }, "fleet": { "total_vehicles": len(self.fleet), "active": active_vehicles, "in_maintenance": len([v for v in self.fleet.values() if v.status == "maintenance"]) }, "inventory": { "total_items": len(self.inventory), "low_stock": low_stock_items, "total_value": sum(item.quantity * item.unit_cost for item in self.inventory.values()) }, "maintenance_alerts": len(self.maintenance_alerts) }