import json
import math
import datetime
from typing import Dict, List, Optional, Any, Tuple
import dataclasses
import collections
import hashlib


def _stable_hash(text: str) -> int:
    """Return a non-negative integer derived from *text*, stable across runs.

    The built-in ``hash()`` of a ``str`` is salted per interpreter process,
    so anything derived from it (a location's simulated area, for example)
    would differ from one run to the next. A SHA-256 digest does not.
    """
    digest = hashlib.sha256(text.encode("utf-8")).digest()
    return int.from_bytes(digest[:8], "big")


@dataclasses.dataclass
class FlowData:
    """Crowd flow data point."""

    timestamp: datetime.datetime
    location: str
    people_count: int
    flow_rate: float  # people per minute
    direction: str  # north, south, east, west, mixed
    density: float  # people per square meter


@dataclasses.dataclass
class FlowPrediction:
    """Crowd flow prediction."""

    timestamp: datetime.datetime
    location: str
    predicted_count: int
    confidence: float
    prediction_horizon: int  # minutes into future
    risk_level: str


class CrowdFlowAnalyzer:
    """AI-powered crowd flow analysis and prediction system.

    Keeps a bounded history of ``FlowData`` samples and derives statistics,
    short-term predictions, anomalies and alerts from it.
    """

    def __init__(self):
        # Bounded so that long-running monitoring cannot grow without limit.
        self.flow_history = collections.deque(maxlen=50000)
        self.predictions = []
        self.thresholds = {
            "normal_density": 2.0,  # people/m²
            "high_density": 4.0,  # people/m²
            "critical_density": 6.0,  # people/m²
            "max_capacity": 1000,  # people
        }
        self.zones = {}

    def add_zone(self, zone_id: str, name: str, capacity: int, area: float) -> bool:
        """Add a monitoring zone.

        An existing zone with the same ``zone_id`` is replaced.

        Returns:
            Always ``True``.
        """
        self.zones[zone_id] = {
            "zone_id": zone_id,
            "name": name,
            "capacity": capacity,
            "area": area,
            "current_count": 0,
            "status": "normal",
        }
        return True

    def simulate_crowd_data(self, location: str, base_count: int = 50) -> FlowData:
        """Generate simulated crowd flow data.

        The head count is ``base_count`` scaled by a time-of-day factor and a
        pseudo-random factor in [0.80, 1.19] that changes every minute. The
        simulated area, flow factor and direction are derived from stable
        digests of the location name, so a given location keeps the same
        area on every run.

        Args:
            location: Name of the location to simulate.
            base_count: Baseline head count before scaling.

        Returns:
            A ``FlowData`` sample stamped with the current local time.
        """
        current_time = datetime.datetime.now()

        # Time-based variations (rush hours, etc.)
        hour = current_time.hour
        if 8 <= hour <= 10 or 17 <= hour <= 19:  # Rush hours
            hour_factor = 2.0
        elif 12 <= hour <= 13:  # Lunch
            hour_factor = 1.5
        elif hour >= 22 or hour <= 6:  # Night
            hour_factor = 0.3
        else:
            hour_factor = 1.0

        location_seed = _stable_hash(location)
        minute_seed = _stable_hash(
            f"{location}_{current_time.strftime('%Y%m%d%H%M')}"
        )
        hour_seed = _stable_hash(f"{location}_{current_time.hour}")

        # Pseudo-random variation, constant within a given minute.
        random_factor = 0.8 + (minute_seed % 40) / 100
        people_count = int(base_count * hour_factor * random_factor)
        flow_rate = people_count * (0.5 + (location_seed % 10) / 20)

        # Flow direction changes at most once per hour for a location.
        directions = ["north", "south", "east", "west", "mixed"]
        direction = directions[hour_seed % len(directions)]

        # Density (people/m²) over a per-location area of 100-299 m².
        area = 100.0 + (location_seed % 200)
        density = people_count / area

        return FlowData(
            timestamp=current_time,
            location=location,
            people_count=people_count,
            flow_rate=flow_rate,
            direction=direction,
            density=density,
        )

    def analyze_flow_patterns(self, location: str, hours_back: int = 24) -> Dict[str, Any]:
        """Analyze historical flow patterns for a location.

        Args:
            location: Location whose samples are analysed.
            hours_back: Only samples newer than this many hours are used.

        Returns:
            A dict of summary statistics and the three busiest hours of day,
            or ``{"error": ...}`` when no sample falls inside the window.
        """
        cutoff_time = datetime.datetime.now() - datetime.timedelta(hours=hours_back)
        location_data = [
            sample
            for sample in self.flow_history
            if sample.location == location and sample.timestamp > cutoff_time
        ]
        if not location_data:
            return {"error": "No data available for analysis"}

        counts = [sample.people_count for sample in location_data]
        densities = [sample.density for sample in location_data]
        flow_rates = [sample.flow_rate for sample in location_data]

        # Group head counts by hour of day, then average each group.
        counts_by_hour = collections.defaultdict(list)
        for sample in location_data:
            counts_by_hour[sample.timestamp.hour].append(sample.people_count)
        hourly_average = {
            hour: sum(hour_counts) / len(hour_counts)
            for hour, hour_counts in counts_by_hour.items()
        }
        top_peak_hours = sorted(
            hourly_average.items(), key=lambda item: item[1], reverse=True
        )[:3]

        return {
            "location": location,
            "analysis_period_hours": hours_back,
            "total_data_points": len(location_data),
            "avg_people_count": sum(counts) / len(counts),
            "max_people_count": max(counts),
            "min_people_count": min(counts),
            "avg_density": sum(densities) / len(densities),
            "max_density": max(densities),
            "avg_flow_rate": sum(flow_rates) / len(flow_rates),
            "peak_hours": [
                {"hour": hour, "avg_count": avg} for hour, avg in top_peak_hours
            ],
            "analysis_complete": True,
        }

    def predict_crowd_flow(self, location: str, prediction_horizon: int = 60) -> List[FlowPrediction]:
        """Predict crowd flow for the future.

        Uses a linear trend over the ten most recent samples of ``location``.
        With fewer than five samples a fixed low-confidence fallback is
        returned and nothing is recorded in ``self.predictions``.

        Args:
            location: Location to predict for.
            prediction_horizon: Minutes into the future.

        Returns:
            A one-element list containing the prediction.
        """
        target_time = datetime.datetime.now() + datetime.timedelta(
            minutes=prediction_horizon
        )

        recent_data = sorted(
            (sample for sample in self.flow_history if sample.location == location),
            key=lambda sample: sample.timestamp,
        )
        if len(recent_data) < 5:
            return [
                FlowPrediction(
                    timestamp=target_time,
                    location=location,
                    predicted_count=50,
                    confidence=0.3,
                    prediction_horizon=prediction_horizon,
                    risk_level="low",
                )
            ]

        # Simple linear trend prediction.
        # NOTE(review): the slope is divided by the number of samples (not
        # intervals) and the "/ 5" assumes samples about 5 minutes apart;
        # both are kept as in the original model.
        recent_counts = [sample.people_count for sample in recent_data[-10:]]
        current_count = recent_counts[-1]
        trend = (current_count - recent_counts[0]) / len(recent_counts)

        # A head count cannot be negative, so a steep decline bottoms out at 0.
        predicted_count = max(
            0, int(current_count + (trend * prediction_horizon / 5))
        )

        # Confidence falls as the counts get less consistent
        # (1 - coefficient of variation, clamped to [0.3, 0.95]).
        mean_count = sum(recent_counts) / len(recent_counts)
        count_std = math.sqrt(
            sum((count - mean_count) ** 2 for count in recent_counts)
            / len(recent_counts)
        )
        if mean_count > 0:
            relative_spread = count_std / mean_count
        elif count_std == 0:
            # Every recent count is zero: perfectly consistent data.
            relative_spread = 0.0
        else:
            # Non-positive mean with spread: treat as maximally inconsistent.
            relative_spread = math.inf
        confidence = max(0.3, min(0.95, 1.0 - relative_spread))

        max_capacity = self.thresholds["max_capacity"]
        if predicted_count > max_capacity * 0.8:
            risk_level = "high"
        elif predicted_count > max_capacity * 0.6:
            risk_level = "medium"
        else:
            risk_level = "low"

        prediction = FlowPrediction(
            timestamp=target_time,
            location=location,
            predicted_count=predicted_count,
            confidence=confidence,
            prediction_horizon=prediction_horizon,
            risk_level=risk_level,
        )
        self.predictions.append(prediction)
        return [prediction]

    def detect_anomalies(self, location: str) -> List[Dict[str, Any]]:
        """Detect anomalies in crowd flow.

        Checks the five most recently stored samples of ``location`` against
        the mean and population standard deviation of all its stored samples,
        and against the density thresholds. A density finding replaces a
        count-based finding (type and severity) for the same sample.

        Returns:
            One dict per anomalous sample; empty with fewer than ten samples.
        """
        location_data = [
            sample for sample in self.flow_history if sample.location == location
        ]
        anomalies = []
        if len(location_data) < 10:
            return anomalies

        # Baseline for head counts.
        counts = [sample.people_count for sample in location_data]
        avg_count = sum(counts) / len(counts)
        std_count = math.sqrt(
            sum((count - avg_count) ** 2 for count in counts) / len(counts)
        )

        critical_density = self.thresholds["critical_density"]
        high_density = self.thresholds["high_density"]

        for data in location_data[-5:]:
            anomaly_type = None
            severity = "low"

            # Count anomalies: beyond 2 sigma is unusual, beyond 3 is severe.
            deviation = abs(data.people_count - avg_count)
            if deviation > 2 * std_count:
                anomaly_type = "unusual_count"
                if deviation > 3 * std_count:
                    severity = "high"

            # Density anomalies take precedence over count anomalies.
            if data.density > critical_density:
                anomaly_type = "critical_density"
                severity = "critical"
            elif data.density > high_density:
                anomaly_type = "high_density"
                severity = "medium"

            if anomaly_type:
                anomalies.append({
                    "anomaly_type": anomaly_type,
                    "severity": severity,
                    "timestamp": data.timestamp.isoformat(),
                    "location": location,
                    "people_count": data.people_count,
                    "density": data.density,
                    "flow_rate": data.flow_rate,
                })
        return anomalies

    def process_real_time_data(self, location: str) -> Dict[str, Any]:
        """Process real-time crowd data and return insights.

        Simulates one new sample for ``location``, stores it in the history,
        then runs a 2-hour pattern analysis, a 30-minute prediction and
        anomaly detection, and raises a density alert for the new sample
        when a threshold is exceeded.

        Returns:
            A dict with the new sample, analysis, predictions, anomalies
            and alerts.
        """
        # Generate and record new data.
        flow_data = self.simulate_crowd_data(location)
        self.flow_history.append(flow_data)

        analysis = self.analyze_flow_patterns(location, hours_back=2)
        predictions = self.predict_crowd_flow(location, prediction_horizon=30)
        anomalies = self.detect_anomalies(location)

        # At most one alert per sample: the highest threshold exceeded.
        if flow_data.density > self.thresholds["critical_density"]:
            alert_level = ("critical_density", "critical")
        elif flow_data.density > self.thresholds["high_density"]:
            alert_level = ("high_density", "medium")
        else:
            alert_level = None

        alerts = []
        if alert_level is not None:
            alert_type, severity = alert_level
            alerts.append({
                "alert_type": alert_type,
                "severity": severity,
                "location": location,
                "density": flow_data.density,
                "people_count": flow_data.people_count,
                "timestamp": flow_data.timestamp.isoformat(),
            })

        return {
            "current_data": {
                "timestamp": flow_data.timestamp.isoformat(),
                "location": location,
                "people_count": flow_data.people_count,
                "density": flow_data.density,
                "flow_rate": flow_data.flow_rate,
                "direction": flow_data.direction,
            },
            "analysis": analysis,
            "predictions": [
                {
                    "predicted_count": p.predicted_count,
                    "confidence": p.confidence,
                    "risk_level": p.risk_level,
                    "horizon_minutes": p.prediction_horizon,
                }
                for p in predictions
            ],
            "anomalies": anomalies,
            "alerts": alerts,
            "processing_complete": True,
        }


def test_crowd_flow_system():
    """Test the crowd flow analysis system.

    Registers three zones, processes five rounds of simulated data for each
    location, prints a summary per sample and returns the analyzer.
    """
    analyzer = CrowdFlowAnalyzer()

    # Add monitoring zones
    analyzer.add_zone("ZONE_001", "Main Plaza", 500, 200.0)
    analyzer.add_zone("ZONE_002", "Entrance Area", 200, 50.0)
    analyzer.add_zone("ZONE_003", "Food Court", 300, 150.0)
    print("✅ Monitoring zones added successfully")

    # Process real-time data for multiple locations
    locations = ["Main Plaza", "Entrance Area", "Food Court"]
    for _ in range(5):
        for location in locations:
            result = analyzer.process_real_time_data(location)
            current = result["current_data"]
            alerts_count = len(result["alerts"])
            anomalies_count = len(result["anomalies"])

            print(f"✅ {location}: {current['people_count']} people, density: {current['density']:.2f}")
            if alerts_count > 0:
                print(f" ⚠️ {alerts_count} alerts generated")
            if anomalies_count > 0:
                print(f" 🔍 {anomalies_count} anomalies detected")

    print("✅ Crowd flow analysis system operational")
    return analyzer


if __name__ == "__main__":
    test_crowd_flow_system()