""" Real-time Fleet Tracking and Optimization System """ import json import math import datetime import dataclasses import hashlib from typing import Dict, List, Optional, Tuple, Any @dataclasses.dataclass class Route: route_id: str waypoints: List[Dict[str, float]] distance: float estimated_time: datetime.timedelta fuel_cost: float traffic_factor: float priority: str @dataclasses.dataclass class Delivery: delivery_id: str vehicle_id: str pickup_location: Dict[str, float] delivery_location: Dict[str, float] cargo: List[str] deadline: datetime.datetime status: str actual_arrival: Optional[datetime.datetime] class FleetTrackingSystem: """Real-time fleet tracking and optimization engine""" def __init__(self, platform): self.platform = platform self.routes: Dict[str, Route] = {} self.deliveries: Dict[str, Delivery] = {} self.traffic_data: Dict[str, Dict] = {} def add_vehicle(self, vehicle_id: str, driver_id: str, location: Dict[str, float], fuel_level: float = 100.0) -> bool: """Add new vehicle to fleet""" try: vehicle = FleetVehicle( vehicle_id=vehicle_id, driver_id=driver_id, location=location, destination=None, cargo=[], fuel_level=fuel_level, maintenance_due=datetime.datetime.now() + datetime.timedelta(days=30), status="available" ) self.platform.fleet[vehicle_id] = vehicle return True except Exception as e: print(f"Error adding vehicle: {e}") return False def update_vehicle_location(self, vehicle_id: str, location: Dict[str, float]) -> Dict: """Update vehicle location in real-time""" if vehicle_id not in self.platform.fleet: return {"error": "Vehicle not found"} vehicle = self.platform.fleet[vehicle_id] old_location = vehicle.location.copy() vehicle.location = location # Check if vehicle reached destination if vehicle.destination: distance = self._calculate_distance(location, vehicle.destination) if distance < 0.1: # Within 100m vehicle.destination = None vehicle.status = "available" return { "vehicle_id": vehicle_id, "old_location": old_location, "new_location": location, "timestamp": datetime.datetime.now().isoformat(), "status": vehicle.status } def calculate_optimal_route(self, start: Dict[str, float], end: Dict[str, float], constraints: Dict = None) -> Route: """Calculate optimal route considering traffic, fuel, and time""" # Basic distance calculation (Haversine formula) distance = self._calculate_distance(start, end) # Get traffic factor (mock implementation) traffic_factor = self._get_traffic_factor(start, end) # Calculate estimated time based on average speed and traffic avg_speed_kmh = 50.0 # Average speed adjusted_speed = avg_speed_kmh * (1.0 - traffic_factor * 0.5) estimated_time = datetime.timedelta(hours=distance / adjusted_speed) # Calculate fuel cost fuel_consumption = distance * 0.08 # 8L per 100km fuel_price = 1.5 # $1.5 per liter fuel_cost = fuel_consumption * fuel_price route_id = hashlib.md5( f"{start}_{end}_{datetime.datetime.now().isoformat()}".encode() ).hexdigest()[:8] route = Route( route_id=route_id, waypoints=[start, end], distance=distance, estimated_time=estimated_time, fuel_cost=fuel_cost, traffic_factor=traffic_factor, priority=constraints.get("priority", "normal") if constraints else "normal" ) self.routes[route_id] = route return route def assign_delivery(self, vehicle_id: str, delivery: Delivery) -> Dict: """Assign delivery to vehicle with optimization""" if vehicle_id not in self.platform.fleet: return {"error": "Vehicle not found"} vehicle = self.platform.fleet[vehicle_id] # Check vehicle availability if vehicle.status != "available": return {"error": "Vehicle not available"} # Calculate route route = self.calculate_optimal_route( vehicle.location, delivery.delivery_location, {"priority": "high"} ) # Assign delivery vehicle.destination = delivery.delivery_location vehicle.cargo.extend(delivery.cargo) vehicle.status = "in_transit" delivery.status = "assigned" delivery.vehicle_id = vehicle_id self.deliveries[delivery.delivery_id] = delivery return { "success": True, "vehicle_id": vehicle_id, "delivery_id": delivery.delivery_id, "route_id": route.route_id, "estimated_arrival": ( datetime.datetime.now() + route.estimated_time ).isoformat(), "fuel_cost": route.fuel_cost } def optimize_fleet_routes(self) -> Dict: """Optimize all active fleet routes""" optimizations = [] # Group deliveries by geographical area area_deliveries = self._group_deliveries_by_area() # Optimize each area for area, deliveries in area_deliveries.items(): optimized = self._optimize_area_deliveries(deliveries) optimizations.extend(optimized) return { "optimizations_count": len(optimizations), "estimated_savings": sum(opt["savings"] for opt in optimizations), "optimizations": optimizations[:5] # Return top 5 } def get_fleet_status(self) -> Dict: """Get real-time fleet status""" vehicles = list(self.platform.fleet.values()) return { "total_vehicles": len(vehicles), "available": len([v for v in vehicles if v.status == "available"]), "in_transit": len([v for v in vehicles if v.status == "in_transit"]), "maintenance": len([v for v in vehicles if v.status == "maintenance"]), "active_deliveries": len([d for d in self.deliveries.values() if d.status in ["assigned", "in_transit"]]), "average_fuel_level": sum(v.fuel_level for v in vehicles) / len(vehicles) if vehicles else 0, "vehicles_needing_fuel": len([v for v in vehicles if v.fuel_level < 20]) } def _calculate_distance(self, point1: Dict[str, float], point2: Dict[str, float]) -> float: """Calculate distance between two points using Haversine formula""" lat1, lon1 = math.radians(point1["lat"]), math.radians(point1["lng"]) lat2, lon2 = math.radians(point2["lat"]), math.radians(point2["lng"]) dlat = lat2 - lat1 dlon = lon2 - lon1 a = (math.sin(dlat/2)**2 + math.cos(lat1) * math.cos(lat2) * math.sin(dlon/2)**2) c = 2 * math.asin(math.sqrt(a)) r = 6371 # Earth radius in kilometers return c * r def _get_traffic_factor(self, start: Dict[str, float], end: Dict[str, float]) -> float: """Get traffic factor between two points (mock implementation)""" # In real implementation, this would fetch real-time traffic data hour = datetime.datetime.now().hour if 7 <= hour <= 9 or 17 <= hour <= 19: # Rush hours return 0.6 elif 10 <= hour <= 16: # Moderate traffic return 0.3 else: # Light traffic return 0.1 def _group_deliveries_by_area(self) -> Dict[str, List[Delivery]]: """Group deliveries by geographical area""" area_deliveries = {} for delivery in self.deliveries.values(): if delivery.status in ["pending", "assigned"]: # Simple area grouping (rounded coordinates) lat_key = round(delivery.delivery_location["lat"], 1) lng_key = round(delivery.delivery_location["lng"], 1) area_key = f"{lat_key}_{lng_key}" if area_key not in area_deliveries: area_deliveries[area_key] = [] area_deliveries[area_key].append(delivery) return area_deliveries def _optimize_area_deliveries(self, deliveries: List[Delivery]) -> List[Dict]: """Optimize deliveries within a specific area""" optimizations = [] # Simple nearest neighbor optimization if len(deliveries) > 1: # Calculate potential savings by combining deliveries total_individual_cost = sum( self._calculate_delivery_cost(d) for d in deliveries ) # Calculate combined delivery cost combined_cost = self._calculate_combined_delivery_cost(deliveries) if combined_cost < total_individual_cost: savings = total_individual_cost - combined_cost optimizations.append({ "type": "route_combination", "deliveries": [d.delivery_id for d in deliveries], "savings": savings, "efficiency_gain": (savings / total_individual_cost) * 100 }) return optimizations def _calculate_delivery_cost(self, delivery: Delivery) -> float: """Calculate individual delivery cost""" if not delivery.pickup_location or not delivery.delivery_location: return 100.0 # Default cost distance = self._calculate_distance( delivery.pickup_location, delivery.delivery_location ) # Base cost + fuel cost + time cost base_cost = 20.0 fuel_cost = distance * 0.08 * 1.5 # Fuel consumption * fuel price time_cost = (distance / 50.0) * 30.0 # Time * hourly rate return base_cost + fuel_cost + time_cost def _calculate_combined_delivery_cost(self, deliveries: List[Delivery]) -> float: """Calculate combined delivery cost for multiple deliveries""" if not deliveries: return 0.0 # Simplified combined cost calculation total_distance = 0.0 current_location = deliveries[0].pickup_location for delivery in deliveries: if current_location and delivery.pickup_location: total_distance += self._calculate_distance( current_location, delivery.pickup_location ) current_location = delivery.delivery_location base_cost = 30.0 # Higher base cost for combined delivery fuel_cost = total_distance * 0.08 * 1.5 time_cost = (total_distance / 50.0) * 35.0 # Higher time rate for combined return base_cost + fuel_cost + time_cost # Import required classes from main platform from supply_chain_platform import FleetVehicle, Delivery