import json import math import hashlib import datetime from typing import Dict, List, Optional, Any, Tuple import dataclasses import re import base64 import itertools import collections @dataclasses.dataclass class FaceDetection: """Face detection result from video frame.""" face_id: str confidence: float bbox: List[int] # [x, y, width, height] timestamp: datetime.datetime camera_id: str @dataclasses.dataclass class PersonMatch: """Person match against watchlist database.""" face_id: str match_id: Optional[str] confidence: float risk_level: str # low, medium, high, critical alert_sent: bool = False class VideoSurveillanceSystem: """AI-powered video surveillance with facial recognition.""" def __init__(self): self.camera_feeds = {} self.face_database = {} self.watchlist = {} self.alerts = [] self.detection_history = collections.deque(maxlen=10000) def register_camera(self, camera_id: str, location: str, resolution: Tuple[int, int]) -> bool: """Register a new camera in the surveillance system.""" camera_config = { "camera_id": camera_id, "location": location, "resolution": resolution, "status": "active", "registered_at": datetime.datetime.now().isoformat() } self.camera_feeds[camera_id] = camera_config return True def simulate_face_detection(self, camera_id: str, frame_data: str) -> List[FaceDetection]: """Simulate face detection in video frame.""" faces = [] num_faces = hash(frame_data + camera_id) % 4 + 1 # 1-4 faces for i in range(num_faces): face_id = hashlib.sha256(f"{frame_data}_{camera_id}_{i}".encode()).hexdigest()[:16] confidence = 0.75 + (hash(face_id) % 25) / 100 # 75-100% confidence bbox = [ (hash(face_id) % 200) + 50, # x (hash(face_id + "y") % 200) + 50, # y (hash(face_id + "w") % 100) + 80, # width (hash(face_id + "h") % 100) + 100 # height ] detection = FaceDetection( face_id=face_id, confidence=confidence, bbox=bbox, timestamp=datetime.datetime.now(), camera_id=camera_id ) faces.append(detection) self.detection_history.append(detection) return faces def match_face_to_watchlist(self, face: FaceDetection) -> PersonMatch: """Match detected face against watchlist.""" match_id = None confidence = 0.0 risk_level = "low" # Simulate matching against watchlist face_hash = hashlib.md5(f"watchlist_{face.face_id}".encode()).hexdigest() if int(face_hash[:8], 16) % 10 < 3: # 30% chance of match match_id = f"person_{face_hash[:8]}" confidence = 0.6 + (int(face_hash[8:10], 16) % 40) / 100 # Determine risk level based on match characteristics risk_score = int(face_hash[10:12], 16) % 100 if risk_score > 80: risk_level = "critical" elif risk_score > 60: risk_level = "high" elif risk_score > 40: risk_level = "medium" return PersonMatch( face_id=face.face_id, match_id=match_id, confidence=confidence, risk_level=risk_level ) def process_video_frame(self, camera_id: str, frame_data: str) -> Dict[str, Any]: """Process a single video frame for security monitoring.""" if camera_id not in self.camera_feeds: return {"error": "Camera not registered"} # Detect faces in frame faces = self.simulate_face_detection(camera_id, frame_data) matches = [] alerts = [] for face in faces: match = self.match_face_to_watchlist(face) matches.append(match) # Generate alerts for high-risk matches if match.match_id and match.risk_level in ["high", "critical"]: alert = { "alert_id": hashlib.sha256(f"{match.face_id}_{datetime.datetime.now()}".encode()).hexdigest()[:16], "camera_id": camera_id, "face_id": face.face_id, "match_id": match.match_id, "risk_level": match.risk_level, "confidence": match.confidence, "timestamp": datetime.datetime.now().isoformat(), "location": self.camera_feeds[camera_id]["location"], "bbox": face.bbox } alerts.append(alert) self.alerts.append(alert) return { "camera_id": camera_id, "timestamp": datetime.datetime.now().isoformat(), "faces_detected": len(faces), "watchlist_matches": len([m for m in matches if m.match_id]), "high_risk_alerts": len(alerts), "alerts": alerts, "processing_complete": True } def get_surveillance_stats(self) -> Dict[str, Any]: """Get surveillance system statistics.""" total_detections = len(self.detection_history) total_alerts = len(self.alerts) active_cameras = len([c for c in self.camera_feeds.values() if c["status"] == "active"]) risk_breakdown = collections.Counter(alert["risk_level"] for alert in self.alerts) return { "active_cameras": active_cameras, "total_face_detections": total_detections, "total_security_alerts": total_alerts, "risk_level_breakdown": dict(risk_breakdown), "cameras": list(self.camera_feeds.keys()), "system_uptime": datetime.datetime.now().isoformat() } def test_surveillance_system(): """Test the video surveillance system.""" system = VideoSurveillanceSystem() # Register cameras system.register_camera("CAM_001", "Main Entrance", (1920, 1080)) system.register_camera("CAM_002", "Parking Lot", (1280, 720)) system.register_camera("CAM_003", "Corridor A", (1920, 1080)) print("✅ Cameras registered successfully") # Process some video frames test_frames = ["frame_data_1", "frame_data_2", "frame_data_3"] for i, frame_data in enumerate(test_frames): camera_id = f"CAM_{(i % 3) + 1:03d}" result = system.process_video_frame(camera_id, frame_data) print(f"✅ Frame {i+1} processed - {result['faces_detected']} faces, {result['high_risk_alerts']} alerts") # Get statistics stats = system.get_surveillance_stats() print(f"✅ System stats - {stats['active_cameras']} cameras, {stats['total_face_detections']} detections, {stats['total_security_alerts']} alerts") return system if __name__ == "__main__": test_surveillance_system()