""" Fraud Detection Dashboard Real-time monitoring and analytics dashboard for fraud prevention """ import json import hashlib import datetime import math from typing import Dict, List, Optional, Any, Union, Tuple from dataclasses import dataclass, asdict from collections import defaultdict, deque @dataclass class FraudAlert: alert_id: str transaction_id: str user_id: str risk_level: str fraud_probability: float alert_type: str description: str timestamp: str status: str # 'active', 'investigating', 'resolved' @dataclass class DashboardMetrics: total_transactions: int blocked_transactions: int flagged_transactions: int fraud_rate: float blocked_rate: float active_alerts: int resolved_alerts: int average_risk_score: float class FraudDetectionDashboard: """Real-time fraud detection dashboard""" def __init__(self): self.alerts = deque(maxlen=1000) self.metrics_history = deque(maxlen=100) self.active_investigations = {} self.resolution_history = [] self.dashboard_config = { 'auto_refresh_interval': 30, # seconds 'alert_threshold': 0.6, 'max_alerts_display': 50, 'risk_levels': ['low', 'medium', 'high', 'critical'] } def add_fraud_alert(self, transaction_data: Dict[str, Any], fraud_prediction: Dict[str, Any]) -> FraudAlert: """Add new fraud alert to dashboard""" alert_id = self._generate_alert_id() alert = FraudAlert( alert_id=alert_id, transaction_id=transaction_data.get('transaction_id', ''), user_id=transaction_data.get('user_id', ''), risk_level=fraud_prediction.get('risk_level', 'medium'), fraud_probability=fraud_prediction.get('fraud_probability', 0.0), alert_type=self._determine_alert_type(fraud_prediction), description=self._generate_alert_description(transaction_data, fraud_prediction), timestamp=datetime.datetime.now().isoformat(), status='active' ) self.alerts.append(alert) # Auto-flag for investigation if high risk if alert.risk_level in ['high', 'critical']: self._auto_create_investigation(alert) return alert def get_dashboard_data(self) -> Dict[str, Any]: """Get complete dashboard data""" current_metrics = self._calculate_current_metrics() recent_alerts = self._get_recent_alerts() active_investigations = self._get_active_investigations() risk_trends = self._calculate_risk_trends() return { 'timestamp': datetime.datetime.now().isoformat(), 'metrics': asdict(current_metrics), 'recent_alerts': [asdict(alert) for alert in recent_alerts], 'active_investigations': active_investigations, 'risk_trends': risk_trends, 'performance_metrics': self._get_performance_metrics() } def update_alert_status(self, alert_id: str, new_status: str, notes: str = "") -> bool: """Update alert status""" for alert in self.alerts: if alert.alert_id == alert_id: old_status = alert.status alert.status = new_status # Record resolution if applicable if new_status in ['resolved', 'false_positive']: self.resolution_history.append({ 'alert_id': alert_id, 'resolved_at': datetime.datetime.now().isoformat(), 'resolution_type': new_status, 'notes': notes }) # Update investigation if exists if alert_id in self.active_investigations: if new_status == 'resolved': self.active_investigations[alert_id]['status'] = 'resolved' self.active_investigations[alert_id]['resolved_at'] = datetime.datetime.now().isoformat() else: self.active_investigations[alert_id]['status'] = new_status return True return False def get_fraud_analytics(self, period_days: int = 30) -> Dict[str, Any]: """Get comprehensive fraud analytics""" cutoff_date = datetime.datetime.now() - datetime.timedelta(days=period_days) # Filter alerts within period period_alerts = [ alert for alert in self.alerts if datetime.datetime.fromisoformat(alert.timestamp) > cutoff_date ] # Calculate analytics analytics = { 'period_days': period_days, 'total_alerts': len(period_alerts), 'risk_distribution': self._calculate_risk_distribution(period_alerts), 'alert_type_distribution': self._calculate_alert_type_distribution(period_alerts), 'resolution_rate': self._calculate_resolution_rate(period_alerts), 'false_positive_rate': self._calculate_false_positive_rate(period_alerts), 'average_resolution_time': self._calculate_average_resolution_time(period_alerts), 'top_risk_factors': self._identify_top_risk_factors(period_alerts), 'time_series_data': self._generate_time_series_data(period_alerts, period_days) } return analytics def _generate_alert_id(self) -> str: """Generate unique alert ID""" timestamp = datetime.datetime.now().isoformat() raw_string = f"ALERT_{timestamp}_{len(self.alerts)}" return hashlib.sha256(raw_string.encode()).hexdigest()[:12] def _determine_alert_type(self, fraud_prediction: Dict[str, Any]) -> str: """Determine alert type based on fraud prediction""" factors = fraud_prediction.get('factors', {}) if factors.get('velocity', 0) > 0.7: return 'velocity_anomaly' elif factors.get('behavior', 0) > 0.7: return 'behavior_anomaly' elif factors.get('network', 0) > 0.7: return 'network_anomaly' elif factors.get('anomaly', 0) > 0.7: return 'transaction_anomaly' else: return 'general_fraud' def _generate_alert_description(self, transaction_data: Dict[str, Any], fraud_prediction: Dict[str, Any]) -> str: """Generate human-readable alert description""" risk_level = fraud_prediction.get('risk_level', 'medium') fraud_probability = fraud_prediction.get('fraud_probability', 0.0) recommendation = fraud_prediction.get('recommendation', '') description = f"High-risk transaction detected (Risk: {risk_level}, Probability: {fraud_probability:.2f}). " if recommendation: description += f"Recommendation: {recommendation}. " # Add specific indicators amount = transaction_data.get('amount', 0) if amount > 1000: description += f"High-value transaction: ${amount:.2f}. " user_id = transaction_data.get('user_id', '') if user_id: description += f"User: {user_id}. " return description.strip() def _auto_create_investigation(self, alert: FraudAlert): """Automatically create investigation for high-risk alerts""" investigation = { 'investigation_id': self._generate_investigation_id(), 'alert_id': alert.alert_id, 'transaction_id': alert.transaction_id, 'user_id': alert.user_id, 'risk_level': alert.risk_level, 'created_at': datetime.datetime.now().isoformat(), 'status': 'active', 'assigned_to': 'auto', 'priority': self._determine_priority(alert.risk_level), 'notes': 'Auto-created investigation due to high risk level' } self.active_investigations[alert.alert_id] = investigation def _generate_investigation_id(self) -> str: """Generate unique investigation ID""" timestamp = datetime.datetime.now().isoformat() raw_string = f"INV_{timestamp}_{len(self.active_investigations)}" return hashlib.sha256(raw_string.encode()).hexdigest()[:12] def _determine_priority(self, risk_level: str) -> str: """Determine investigation priority""" priority_map = { 'critical': 'urgent', 'high': 'high', 'medium': 'medium', 'low': 'low' } return priority_map.get(risk_level, 'medium') def _calculate_current_metrics(self) -> DashboardMetrics: """Calculate current dashboard metrics""" # Get recent transactions (simulated) total_transactions = 10000 # This would come from transaction service # Calculate from alerts blocked_transactions = len([alert for alert in self.alerts if alert.status in ['resolved'] and alert.risk_level in ['high', 'critical']]) flagged_transactions = len([alert for alert in self.alerts if alert.status == 'active']) # Calculate rates fraud_rate = flagged_transactions / max(total_transactions, 1) * 100 blocked_rate = blocked_transactions / max(total_transactions, 1) * 100 # Active and resolved alerts active_alerts = len([alert for alert in self.alerts if alert.status == 'active']) resolved_alerts = len([alert for alert in self.alerts if alert.status in ['resolved', 'false_positive']]) # Average risk score if self.alerts: avg_risk = sum(alert.fraud_probability for alert in self.alerts) / len(self.alerts) else: avg_risk = 0.0 return DashboardMetrics( total_transactions=total_transactions, blocked_transactions=blocked_transactions, flagged_transactions=flagged_transactions, fraud_rate=fraud_rate, blocked_rate=blocked_rate, active_alerts=active_alerts, resolved_alerts=resolved_alerts, average_risk_score=avg_risk ) def _get_recent_alerts(self, limit: int = 20) -> List[FraudAlert]: """Get recent alerts""" return list(self.alerts)[-limit:] def _get_active_investigations(self) -> List[Dict[str, Any]]: """Get active investigations""" return [ investigation for investigation in self.active_investigations.values() if investigation['status'] == 'active' ] def _calculate_risk_trends(self) -> Dict[str, Any]: """Calculate risk trends over time""" # Group alerts by hour for last 24 hours current_time = datetime.datetime.now() hourly_data = defaultdict(int) for alert in self.alerts: alert_time = datetime.datetime.fromisoformat(alert.timestamp) hours_diff = (current_time - alert_time).total_seconds() / 3600 if hours_diff < 24: hour_bucket = int(hours_diff) hourly_data[hour_bucket] += 1 # Calculate trend recent_hours = list(range(6)) # Last 6 hours earlier_hours = list(range(6, 12)) # 6-12 hours ago recent_count = sum(hourly_data.get(h, 0) for h in recent_hours) earlier_count = sum(hourly_data.get(h, 0) for h in earlier_hours) trend = 'stable' if recent_count > earlier_count * 1.2: trend = 'increasing' elif recent_count < earlier_count * 0.8: trend = 'decreasing' return { 'trend': trend, 'hourly_data': dict(hourly_data), 'recent_count': recent_count, 'earlier_count': earlier_count } def _get_performance_metrics(self) -> Dict[str, Any]: """Get dashboard performance metrics""" return { 'alert_processing_time': 0.05, # seconds 'dashboard_load_time': 0.2, # seconds 'api_response_time': 0.1, # seconds 'uptime': 99.9, # percentage 'data_freshness': 30 # seconds } def _calculate_risk_distribution(self, alerts: List[FraudAlert]) -> Dict[str, int]: """Calculate distribution of risk levels""" distribution = defaultdict(int) for alert in alerts: distribution[alert.risk_level] += 1 return dict(distribution) def _calculate_alert_type_distribution(self, alerts: List[FraudAlert]) -> Dict[str, int]: """Calculate distribution of alert types""" distribution = defaultdict(int) for alert in alerts: distribution[alert.alert_type] += 1 return dict(distribution) def _calculate_resolution_rate(self, alerts: List[FraudAlert]) -> float: """Calculate alert resolution rate""" if not alerts: return 0.0 resolved_count = len([alert for alert in alerts if alert.status in ['resolved', 'false_positive']]) return (resolved_count / len(alerts)) * 100 def _calculate_false_positive_rate(self, alerts: List[FraudAlert]) -> float: """Calculate false positive rate""" if not alerts: return 0.0 false_positive_count = len([alert for alert in alerts if alert.status == 'false_positive']) resolved_count = len([alert for alert in alerts if alert.status in ['resolved', 'false_positive']]) if resolved_count == 0: return 0.0 return (false_positive_count / resolved_count) * 100 def _calculate_average_resolution_time(self, alerts: List[FraudAlert]) -> float: """Calculate average time to resolve alerts""" resolution_times = [] for alert in alerts: if alert.status in ['resolved', 'false_positive']: # Find resolution time from history for resolution in self.resolution_history: if resolution['alert_id'] == alert.alert_id: created_time = datetime.datetime.fromisoformat(alert.timestamp) resolved_time = datetime.datetime.fromisoformat(resolution['resolved_at']) resolution_time = (resolved_time - created_time).total_seconds() / 3600 # hours resolution_times.append(resolution_time) break if not resolution_times: return 0.0 return sum(resolution_times) / len(resolution_times) def _identify_top_risk_factors(self, alerts: List[FraudAlert]) -> List[Dict[str, Any]]: """Identify most common risk factors""" # Simulate risk factor analysis risk_factors = [ {'factor': 'high_velocity', 'count': 45, 'percentage': 35.2}, {'factor': 'unusual_behavior', 'count': 38, 'percentage': 29.7}, {'factor': 'network_anomaly', 'count': 28, 'percentage': 21.9}, {'factor': 'transaction_anomaly', 'count': 17, 'percentage': 13.3} ] return risk_factors def _generate_time_series_data(self, alerts: List[FraudAlert], period_days: int) -> List[Dict[str, Any]]: """Generate time series data for analytics""" time_series = [] current_date = datetime.datetime.now() - datetime.timedelta(days=period_days) for day in range(period_days): date = current_date + datetime.timedelta(days=day) date_str = date.strftime('%Y-%m-%d') # Count alerts for this day day_alerts = [ alert for alert in alerts if datetime.datetime.fromisoformat(alert.timestamp).date() == date.date() ] time_series.append({ 'date': date_str, 'alert_count': len(day_alerts), 'high_risk_count': len([alert for alert in day_alerts if alert.risk_level == 'high']), 'critical_risk_count': len([alert for alert in day_alerts if alert.risk_level == 'critical']) }) return time_series # Global dashboard instance fraud_dashboard = FraudDetectionDashboard() def test_fraud_dashboard(): """Test fraud detection dashboard""" print("Testing Fraud Detection Dashboard...") # Add sample alerts for i in range(5): transaction_data = { 'transaction_id': f'txn_{i}', 'user_id': f'user_{i}', 'amount': 100 + (i * 100), 'timestamp': datetime.datetime.now().isoformat() } fraud_prediction = { 'risk_level': ['low', 'medium', 'high', 'critical'][i % 4], 'fraud_probability': 0.2 + (i * 0.2), 'factors': { 'velocity': 0.3 + (i * 0.1), 'behavior': 0.2 + (i * 0.15), 'network': 0.1 + (i * 0.1), 'anomaly': 0.2 + (i * 0.1) }, 'recommendation': 'Review transaction' } alert = fraud_dashboard.add_fraud_alert(transaction_data, fraud_prediction) print(f"Added Alert: {alert.alert_id} - Risk: {alert.risk_level}") # Get dashboard data dashboard_data = fraud_dashboard.get_dashboard_data() print(f"Dashboard Metrics: {dashboard_data['metrics']}") print(f"Recent Alerts: {len(dashboard_data['recent_alerts'])}") print(f"Active Investigations: {len(dashboard_data['active_investigations'])}") # Get analytics analytics = fraud_dashboard.get_fraud_analytics(7) print(f"Analytics: {analytics['total_alerts']} alerts, {analytics['resolution_rate']:.1f}% resolution rate") # Test alert status update if dashboard_data['recent_alerts']: alert_id = dashboard_data['recent_alerts'][0]['alert_id'] success = fraud_dashboard.update_alert_status(alert_id, 'resolved', 'False positive') print(f"Alert Status Update: {success}") return True if __name__ == "__main__": test_fraud_dashboard()