import json import math import datetime from typing import Dict, List, Optional, Any import dataclasses import collections import hashlib @dataclasses.dataclass class EmergencyEvent: """Emergency event details.""" event_id: str event_type: str # fire, medical, security, evacuation, etc. severity: str # low, medium, high, critical location: str description: str timestamp: datetime.datetime reporter_id: str coordinates: Optional[List[float]] = None # [lat, lng] affected_area: Optional[str] = None @dataclasses.dataclass class ResponseTeam: """Emergency response team details.""" team_id: str team_type: str # fire, medical, security, hazmat, etc. members: List[str] location: str status: str # available, deployed, responding, on_scene equipment: List[str] contact_info: Dict[str, str] @dataclasses.dataclass class ResponseAction: """Emergency response action.""" action_id: str event_id: str team_id: str action_type: str # dispatch, assess, evacuate, treat, secure, etc. status: str # pending, in_progress, completed, cancelled timestamp: datetime.datetime notes: str = "" class EmergencyResponseSystem: """AI-powered emergency response coordination system.""" def __init__(self): self.events = {} self.response_teams = {} self.response_actions = [] self.evacuation_zones = {} self.communication_logs = [] self.resource_inventory = { "medical_kits": 50, "fire_extinguishers": 20, "emergency_vehicles": 5, "personnel": 30 } def register_response_team(self, team_id: str, team_type: str, members: List[str], location: str, equipment: List[str], contact_info: Dict[str, str]) -> bool: """Register a new emergency response team.""" team = ResponseTeam( team_id=team_id, team_type=team_type, members=members, location=location, status="available", equipment=equipment, contact_info=contact_info ) self.response_teams[team_id] = team return True def create_emergency_event(self, event_type: str, severity: str, location: str, description: str, reporter_id: str, coordinates: Optional[List[float]] = None) -> str: """Create a new emergency event.""" event_id = hashlib.sha256(f"{event_type}_{location}_{datetime.datetime.now()}".encode()).hexdigest()[:16] event = EmergencyEvent( event_id=event_id, event_type=event_type, severity=severity, location=location, description=description, timestamp=datetime.datetime.now(), reporter_id=reporter_id, coordinates=coordinates, affected_area=self.determine_affected_area(event_type, location) ) self.events[event_id] = event # Auto-generate initial response plan self.generate_response_plan(event_id) return event_id def determine_affected_area(self, event_type: str, location: str) -> str: """Determine affected area based on event type and location.""" if event_type == "fire": return f"{location} and immediate surroundings" elif event_type == "evacuation": return f"Entire {location} building/area" elif event_type == "medical": return f"{location} - immediate vicinity" else: return f"{location} and adjacent zones" def generate_response_plan(self, event_id: str) -> Dict[str, Any]: """Generate AI-powered response plan for emergency.""" if event_id not in self.events: return {"error": "Event not found"} event = self.events[event_id] response_plan = { "event_id": event_id, "event_type": event.event_type, "severity": event.severity, "recommended_teams": [], "priority_actions": [], "estimated_response_time": 0, "resource_requirements": {}, "evacuation_needed": False } # Determine required teams based on event type if event.event_type == "fire": response_plan["recommended_teams"] = ["fire", "medical", "security"] response_plan["priority_actions"] = [ "dispatch_fire_team", "evacuate_area", "secure_perimeter", "provide_medical_support" ] response_plan["evacuation_needed"] = True response_plan["resource_requirements"] = {"fire_extinguishers": 3, "emergency_vehicles": 2} elif event.event_type == "medical": response_plan["recommended_teams"] = ["medical"] response_plan["priority_actions"] = [ "dispatch_medical_team", "assess_patients", "prepare_transport" ] response_plan["resource_requirements"] = {"medical_kits": 2} elif event.event_type == "security": response_plan["recommended_teams"] = ["security"] response_plan["priority_actions"] = [ "dispatch_security_team", "secure_area", "investigate_threat" ] elif event.event_type == "evacuation": response_plan["recommended_teams"] = ["security", "medical"] response_plan["priority_actions"] = [ "initiate_evacuation", "manage_crowds", "provide_medical_aid" ] response_plan["evacuation_needed"] = True # Calculate estimated response time based on severity severity_time_map = {"low": 15, "medium": 10, "high": 5, "critical": 2} response_plan["estimated_response_time"] = severity_time_map.get(event.severity, 10) # Auto-deploy teams if critical if event.severity == "critical": self.auto_deploy_teams(event_id, response_plan) return response_plan def auto_deploy_teams(self, event_id: str, response_plan: Dict[str, Any]) -> bool: """Automatically deploy teams for critical events.""" for team_type in response_plan["recommended_teams"]: available_teams = [t for t in self.response_teams.values() if t.team_type == team_type and t.status == "available"] if available_teams: team = available_teams[0] self.dispatch_team(team.team_id, event_id, f"Auto-deployed for {response_plan['event_type']} emergency") return True def dispatch_team(self, team_id: str, event_id: str, notes: str = "") -> bool: """Dispatch a response team to an emergency event.""" if team_id not in self.response_teams or event_id not in self.events: return False team = self.response_teams[team_id] event = self.events[event_id] # Create response action action_id = hashlib.sha256(f"{team_id}_{event_id}_{datetime.datetime.now()}".encode()).hexdigest()[:16] action = ResponseAction( action_id=action_id, event_id=event_id, team_id=team_id, action_type="dispatch", status="in_progress", timestamp=datetime.datetime.now(), notes=notes ) self.response_actions.append(action) team.status = "deployed" # Log communication self.log_communication("DISPATCH", team_id, event_id, f"Team dispatched to {event.location}") return True def update_event_status(self, event_id: str, status: str, notes: str = "") -> bool: """Update emergency event status.""" if event_id not in self.events: return False # Log status update self.log_communication("STATUS_UPDATE", "system", event_id, f"Status changed to: {status}") return True def get_active_emergencies(self) -> List[Dict[str, Any]]: """Get all active emergency events.""" current_time = datetime.datetime.now() active_events = [] for event in self.events.values(): # Consider events from last 24 hours as active if (current_time - event.timestamp).total_seconds() < 86400: active_events.append({ "event_id": event.event_id, "event_type": event.event_type, "severity": event.severity, "location": event.location, "timestamp": event.timestamp.isoformat(), "description": event.description, "age_minutes": int((current_time - event.timestamp).total_seconds() / 60) }) return sorted(active_events, key=lambda x: x["timestamp"], reverse=True) def get_team_status(self) -> Dict[str, Any]: """Get status of all response teams.""" team_stats = collections.Counter(team.status for team in self.response_teams.values()) return { "total_teams": len(self.response_teams), "available_teams": team_stats["available"], "deployed_teams": team_stats["deployed"], "responding_teams": team_stats["responding"], "on_scene_teams": team_stats["on_scene"], "teams": [ { "team_id": team.team_id, "team_type": team.team_type, "status": team.status, "location": team.location, "members_count": len(team.members) } for team in self.response_teams.values() ] } def log_communication(self, comm_type: str, source: str, target: str, message: str) -> bool: """Log communication entry.""" log_entry = { "timestamp": datetime.datetime.now().isoformat(), "type": comm_type, "source": source, "target": target, "message": message } self.communication_logs.append(log_entry) return True def generate_emergency_report(self, hours_back: int = 24) -> Dict[str, Any]: """Generate comprehensive emergency response report.""" cutoff_time = datetime.datetime.now() - datetime.timedelta(hours=hours_back) recent_events = [e for e in self.events.values() if e.timestamp > cutoff_time] recent_actions = [a for a in self.response_actions if a.timestamp > cutoff_time] event_stats = collections.Counter(e.event_type for e in recent_events) severity_stats = collections.Counter(e.severity for e in recent_events) action_stats = collections.Counter(a.action_type for a in recent_actions) # Calculate response times response_times = [] for event in recent_events: event_actions = [a for a in recent_actions if a.event_id == event.event_id and a.action_type == "dispatch"] if event_actions: dispatch_time = min(a.timestamp for a in event_actions) response_time = (dispatch_time - event.timestamp).total_seconds() / 60 response_times.append(response_time) avg_response_time = sum(response_times) / len(response_times) if response_times else 0 return { "report_period_hours": hours_back, "total_events": len(recent_events), "event_types": dict(event_stats), "severity_breakdown": dict(severity_stats), "total_actions": len(recent_actions), "action_types": dict(action_stats), "average_response_time_minutes": round(avg_response_time, 2), "resource_utilization": { "medical_kits_used": sum(1 for a in recent_actions if a.action_type == "provide_medical_aid") * 2, "vehicles_deployed": sum(1 for a in recent_actions if a.action_type == "dispatch" and "fire" in a.team_id) }, "team_utilization": self.get_team_status(), "report_generated": datetime.datetime.now().isoformat() } def test_emergency_system(): """Test the emergency response system.""" system = EmergencyResponseSystem() # Register response teams system.register_response_team( "TEAM_FIRE_01", "fire", ["John Smith", "Mike Johnson"], "Station A", ["fire_truck", "hoses", "oxygen_tanks"], {"radio": "CH-1", "phone": "555-0101"} ) system.register_response_team( "TEAM_MED_01", "medical", ["Dr. Sarah Wilson", "Paramedic Tom Brown"], "Medical Bay", ["medical_kit", "defibrillator", "stretcher"], {"radio": "CH-2", "phone": "555-0202"} ) system.register_response_team( "TEAM_SEC_01", "security", ["Officer Davis", "Officer Lee"], "Security Office", ["handcuffs", "radio", "flashlight"], {"radio": "CH-3", "phone": "555-0303"} ) print("✅ Response teams registered successfully") # Create test emergency events event1_id = system.create_emergency_event( "fire", "high", "Building A - Floor 3", "Smoke detected on third floor, possible electrical fire", "sensor_system" ) event2_id = system.create_emergency_event( "medical", "medium", "Main Lobby", "Person collapsed, requires immediate medical attention", "security_guard_01" ) print(f"✅ Emergency events created: {event1_id}, {event2_id}") # Manually dispatch additional teams system.dispatch_team("TEAM_MED_01", event2_id, "Medical emergency response") # Get system status active_events = system.get_active_emergencies() team_status = system.get_team_status() print(f"✅ Active emergencies: {len(active_events)}") print(f"✅ Team status: {team_status['available_teams']} available, {team_status['deployed_teams']} deployed") # Generate report report = system.generate_emergency_report() print(f"✅ Emergency report generated: {report['total_events']} events, {report['average_response_time_minutes']} min avg response time") return system if __name__ == "__main__": test_emergency_system()