"""
Real-time Fraud Detection System with Machine Learning
Advanced fraud detection for secure ticketing platform
"""

import json
import hashlib
import datetime
import math
import random
from typing import Dict, List, Optional, Any, Union, Tuple
from dataclasses import dataclass, asdict
from collections import defaultdict, deque
import itertools

# Public API.  The smoke test ``test_fraud_detection`` is deliberately left
# out so that ``from <module> import *`` does not leak a ``test_``-prefixed
# function into test modules that star-import this file.
__all__ = [
    'FraudFeature',
    'FraudPrediction',
    'FraudDetectionML',
    'fraud_detector',
]

# Placeholder stored when a transaction carries no IP address / device id.
_UNKNOWN = 'unknown'


@dataclass
class FraudFeature:
    """Feature record extracted from one raw transaction."""

    user_id: str
    transaction_id: str
    transaction_type: str
    amount: float
    # ISO-8601 string; parsed on demand with FraudDetectionML._parse_timestamp
    timestamp: str
    ip_address: str
    device_fingerprint: str
    # Higher means more trustworthy (see _calculate_user_behavior_score)
    user_behavior_score: float
    # Share of the user's last five transactions that fell in the last hour
    velocity_score: float
    anomaly_score: float


@dataclass
class FraudPrediction:
    """Outcome of analysing one transaction."""

    transaction_id: str
    fraud_probability: float
    # One of 'low', 'medium', 'high'
    risk_level: str
    # Per-model scores keyed by model name
    factors: Dict[str, float]
    recommendation: str
    confidence: float


class FraudDetectionML:
    """Machine learning based fraud detection system"""

    def __init__(self):
        # user_id -> list of FraudFeature, in arrival order
        self.transaction_history = defaultdict(list)
        self.user_profiles = {}
        self.risk_thresholds = {
            'low': 0.3,
            'medium': 0.6,
            'high': 0.8
        }
        self.fraud_patterns = self._initialize_fraud_patterns()
        self.detection_models = {
            'velocity': self._velocity_model,
            'behavior': self._behavior_model,
            'network': self._network_model,
            'anomaly': self._anomaly_model
        }
        self.blocked_users = set()
        # Bounded queue: only the 1000 most recent suspicious entries are kept
        self.suspicious_transactions = deque(maxlen=1000)

    # ------------------------------------------------------------------
    # Small shared helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _parse_timestamp(value: str) -> datetime.datetime:
        """Parse an ISO-8601 timestamp into a naive local datetime.

        Every comparison in this class is made against naive
        ``datetime.datetime.now()``; mixing naive and offset-aware values
        raises ``TypeError``.  Aware inputs (including a trailing ``Z``,
        which ``fromisoformat`` rejects before Python 3.11) are therefore
        converted to local time and stripped of their tzinfo.  Naive inputs
        are returned unchanged.

        Raises:
            ValueError: if ``value`` is not a valid ISO-8601 timestamp.
        """
        text = value
        if text.endswith('Z'):
            text = text[:-1] + '+00:00'
        parsed = datetime.datetime.fromisoformat(text)
        if parsed.tzinfo is not None:
            parsed = parsed.astimezone().replace(tzinfo=None)
        return parsed

    def _history(self, user_id: str) -> List[FraudFeature]:
        """Return the stored transactions of ``user_id`` without side effects.

        Indexing the ``defaultdict`` directly would insert an empty entry for
        every user that is merely looked up.
        """
        return self.transaction_history.get(user_id, [])

    # ------------------------------------------------------------------
    # Public entry point
    # ------------------------------------------------------------------

    def analyze_transaction(self, transaction_data: Dict[str, Any]) -> FraudPrediction:
        """Analyze transaction for fraud using ML models

        Args:
            transaction_data: raw transaction.  ``user_id``,
                ``transaction_id`` and ``amount`` are required; ``type``,
                ``timestamp``, ``ip_address`` and ``device_fingerprint`` are
                optional.

        Returns:
            The prediction for this transaction.  As a side effect the
            transaction is stored, the user profile is refreshed and, for
            medium/high risk, the transaction is queued as suspicious.

        Raises:
            KeyError: if a required key is missing.
        """
        features = self._extract_features(transaction_data)

        # Run every registered detection model on the same feature record
        model_scores = {
            model_name: model_func(features)
            for model_name, model_func in self.detection_models.items()
        }

        fraud_probability = self._calculate_fraud_probability(model_scores)
        risk_level = self._determine_risk_level(fraud_probability)
        recommendation = self._generate_recommendation(risk_level, model_scores)
        confidence = self._calculate_confidence(model_scores, features)

        prediction = FraudPrediction(
            transaction_id=features.transaction_id,
            fraud_probability=fraud_probability,
            risk_level=risk_level,
            factors=model_scores,
            recommendation=recommendation,
            confidence=confidence
        )

        # Store transaction and update models
        self._store_transaction(features, prediction)
        self._update_ml_models(features, prediction)

        # Take action if medium or high risk.  The owning user is passed
        # explicitly so a colliding transaction id cannot block someone else.
        if risk_level in ('medium', 'high'):
            self._handle_suspicious_transaction(prediction, features.user_id)

        return prediction

    def _extract_features(self, transaction_data: Dict[str, Any]) -> FraudFeature:
        """Extract features from transaction data

        Raises:
            KeyError: if ``user_id``, ``transaction_id`` or ``amount`` is
                missing from ``transaction_data``.
        """
        user_id = transaction_data['user_id']
        timestamp = transaction_data.get('timestamp', datetime.datetime.now().isoformat())

        return FraudFeature(
            user_id=user_id,
            transaction_id=transaction_data['transaction_id'],
            transaction_type=transaction_data.get('type', 'purchase'),
            amount=transaction_data['amount'],
            timestamp=timestamp,
            ip_address=transaction_data.get('ip_address', _UNKNOWN),
            device_fingerprint=transaction_data.get('device_fingerprint', _UNKNOWN),
            user_behavior_score=self._calculate_user_behavior_score(user_id, transaction_data),
            velocity_score=self._calculate_velocity_score(user_id, timestamp),
            anomaly_score=self._calculate_anomaly_score(transaction_data)
        )

    # ------------------------------------------------------------------
    # Detection models (each returns a score in [0, 1])
    # ------------------------------------------------------------------

    def _velocity_model(self, features: FraudFeature) -> float:
        """Velocity-based fraud detection model

        Looks at the user's last ten stored transactions that are less than
        an hour older than the current one and scores their count, their
        total amount and the smallest gap between consecutive ones.
        """
        current_time = self._parse_timestamp(features.timestamp)

        # (parsed time, amount) of each recent transaction; parse only once
        recent = []
        for past in self._history(features.user_id)[-10:]:
            past_time = self._parse_timestamp(past.timestamp)
            if (current_time - past_time).total_seconds() < 3600:
                recent.append((past_time, past.amount))

        transactions_per_hour = len(recent)
        total_amount_hour = sum(amount for _, amount in recent)

        high_velocity_score = 0.0

        if transactions_per_hour > 5:
            high_velocity_score += 0.4
        elif transactions_per_hour > 3:
            high_velocity_score += 0.2

        if total_amount_hour > 1000:
            high_velocity_score += 0.4
        elif total_amount_hour > 500:
            high_velocity_score += 0.2

        # Check for rapid consecutive transactions (less than 1 minute apart)
        gaps = [
            (later - earlier).total_seconds()
            for (earlier, _), (later, _) in zip(recent, recent[1:])
        ]
        if gaps and min(gaps) < 60:
            high_velocity_score += 0.3

        return min(1.0, high_velocity_score)

    def _behavior_model(self, features: FraudFeature) -> float:
        """Behavior-based fraud detection model

        Compares the transaction with the user's stored profile and history:
        amount deviation, hour of day never seen before, and a device or IP
        that differs from the last one recorded.
        """
        user_id = features.user_id
        if user_id not in self.user_profiles:
            return 0.5  # New user, medium risk

        user_profile = self.user_profiles[user_id]
        historical_transactions = self._history(user_id)
        behavior_score = 0.0

        if historical_transactions:
            # Amount deviation relative to the historical mean
            avg_amount = sum(t.amount for t in historical_transactions) / len(historical_transactions)
            amount_deviation = abs(features.amount - avg_amount) / max(avg_amount, 1)
            if amount_deviation > 2.0:
                behavior_score += 0.3
            elif amount_deviation > 1.0:
                behavior_score += 0.15

            # Time pattern deviation: the user never transacted at this hour
            current_hour = self._parse_timestamp(features.timestamp).hour
            seen_hours = {
                self._parse_timestamp(t.timestamp).hour for t in historical_transactions
            }
            if current_hour not in seen_hours:
                behavior_score += 0.2  # Unusual time

        # Device fingerprint consistency
        known_device = user_profile.get('device_fingerprint')
        if known_device and known_device != features.device_fingerprint:
            behavior_score += 0.25

        # IP address consistency
        known_ip = user_profile.get('ip_address')
        if known_ip and known_ip != features.ip_address:
            behavior_score += 0.2

        return min(1.0, behavior_score)

    def _network_model(self, features: FraudFeature) -> float:
        """Network-based fraud detection model

        Counts how many users recently (last five transactions each) used the
        same device fingerprint or IP address, then adds the severity of every
        matching fraud pattern.
        """
        network_score = 0.0

        # The 'unknown' placeholder means "no data"; treating it as a real
        # identifier would make every user without a fingerprint/IP look like
        # they share one device.
        track_device = features.device_fingerprint != _UNKNOWN
        track_ip = features.ip_address != _UNKNOWN

        device_users = set()
        ip_users = set()
        if track_device or track_ip:
            for user_id, transactions in self.transaction_history.items():
                for transaction in transactions[-5:]:  # Recent transactions
                    if track_device and transaction.device_fingerprint == features.device_fingerprint:
                        device_users.add(user_id)
                    if track_ip and transaction.ip_address == features.ip_address:
                        ip_users.add(user_id)

        # Multiple accounts on same device
        if len(device_users) > 3:
            network_score += 0.4
        elif len(device_users) > 1:
            network_score += 0.2

        # Multiple accounts from same IP
        if len(ip_users) > 5:
            network_score += 0.4
        elif len(ip_users) > 2:
            network_score += 0.2

        # Known fraud patterns
        for pattern in self.fraud_patterns:
            if self._matches_pattern(features, pattern):
                network_score += pattern['severity']

        return min(1.0, network_score)

    def _anomaly_model(self, features: FraudFeature) -> float:
        """Anomaly detection model

        Scores extreme amounts, unexpected transaction types, late-night
        timestamps, a low behavior score and a high velocity score.
        """
        anomaly_score = 0.0

        # Amount anomalies
        if features.amount > 10000:  # Very high amount
            anomaly_score += 0.3
        elif features.amount < 1.0:  # Very low amount (testing)
            anomaly_score += 0.2

        # Transaction type anomalies
        if features.transaction_type not in ('purchase', 'transfer', 'refund'):
            anomaly_score += 0.2

        # Timestamp anomalies: 23:00-05:59.  ``hour`` is 0..23, so the upper
        # bound has to be ``>= 23``; ``> 23`` can never be true.
        transaction_hour = self._parse_timestamp(features.timestamp).hour
        if transaction_hour < 6 or transaction_hour >= 23:
            anomaly_score += 0.15

        # User behavior anomaly (low score = little trust)
        if features.user_behavior_score < 0.3:
            anomaly_score += 0.25

        # Velocity anomaly
        if features.velocity_score > 0.8:
            anomaly_score += 0.3

        return min(1.0, anomaly_score)

    # ------------------------------------------------------------------
    # Score aggregation
    # ------------------------------------------------------------------

    def _calculate_fraud_probability(self, model_scores: Dict[str, float]) -> float:
        """Calculate overall fraud probability from model scores

        Raises:
            KeyError: if ``model_scores`` contains a model without a weight.
        """
        weights = {
            'velocity': 0.25,
            'behavior': 0.30,
            'network': 0.25,
            'anomaly': 0.20
        }

        weighted_score = 0.0
        for model, score in model_scores.items():
            weighted_score += score * weights[model]

        # Logistic squash centred on 0.5 for better separation
        transformed_score = 1 / (1 + math.exp(-5 * (weighted_score - 0.5)))
        return round(transformed_score, 3)

    def _determine_risk_level(self, fraud_probability: float) -> str:
        """Determine risk level from fraud probability"""
        if fraud_probability >= self.risk_thresholds['high']:
            return 'high'
        if fraud_probability >= self.risk_thresholds['medium']:
            return 'medium'
        return 'low'

    def _generate_recommendation(self, risk_level: str, model_scores: Dict[str, float]) -> str:
        """Generate fraud detection recommendation

        ``model_scores`` is accepted for interface stability but not used.
        """
        if risk_level == 'high':
            return 'BLOCK - Immediate action required'
        if risk_level == 'medium':
            return 'REVIEW - Manual verification needed'
        return 'APPROVE - Transaction appears safe'

    def _calculate_confidence(self, model_scores: Dict[str, float], features: FraudFeature) -> float:
        """Calculate confidence score for prediction

        Combines how much history exists for the user (saturating at 50
        transactions), how closely the models agree (1 - variance of their
        scores) and a constant feature-completeness term.
        """
        # Data availability
        user_history = len(self._history(features.user_id))
        data_score = min(1.0, user_history / 50)

        # Model agreement
        scores = list(model_scores.values())
        mean_score = sum(scores) / len(scores)
        variance = sum((x - mean_score) ** 2 for x in scores) / len(scores)
        agreement_score = max(0.0, 1.0 - variance)

        # Feature completeness
        feature_completeness = 1.0  # All features are present

        confidence = (data_score * 0.4) + (agreement_score * 0.4) + (feature_completeness * 0.2)
        return round(confidence, 3)

    # ------------------------------------------------------------------
    # State updates
    # ------------------------------------------------------------------

    def _store_transaction(self, features: FraudFeature, prediction: FraudPrediction):
        """Store transaction and refresh the user's profile

        ``prediction`` is accepted for interface stability but not stored.
        """
        history = self.transaction_history[features.user_id]
        history.append(features)

        profile = self.user_profiles.setdefault(features.user_id, {})
        # First-seen time.  _calculate_user_behavior_score reads this key to
        # derive the account age; without it the age was always zero.
        profile.setdefault('created_at', features.timestamp)
        profile.update({
            'last_transaction': features.timestamp,
            'transaction_count': len(history),
            'device_fingerprint': features.device_fingerprint,
            'ip_address': features.ip_address,
            'avg_amount': sum(t.amount for t in history) / len(history)
        })

    def _update_ml_models(self, features: FraudFeature, prediction: FraudPrediction):
        """Update ML models with new data

        In a real implementation this would update model parameters.  Here a
        confident high-risk prediction may contribute a new fraud pattern.
        """
        if prediction.risk_level == 'high' and prediction.confidence > 0.8:
            new_pattern = self._extract_fraud_pattern(features)
            if not new_pattern:
                return
            # Skip patterns whose conditions are already known; otherwise the
            # list grows by one identical entry per flagged transaction.
            already_known = any(
                existing.get('conditions') == new_pattern['conditions']
                for existing in self.fraud_patterns
            )
            if not already_known:
                self.fraud_patterns.append(new_pattern)

    def _handle_suspicious_transaction(self, prediction: FraudPrediction,
                                       user_id: Optional[str] = None):
        """Handle suspicious transactions

        Queues the prediction and, when it is high risk with a fraud
        probability above 0.9, blocks the owning user.

        Args:
            prediction: prediction to record.
            user_id: owner of the transaction.  When omitted, every user that
                has a stored transaction with the same id is blocked (the
                owner cannot be told apart in that case).
        """
        self.suspicious_transactions.append({
            'transaction_id': prediction.transaction_id,
            'risk_level': prediction.risk_level,
            'fraud_probability': prediction.fraud_probability,
            'timestamp': datetime.datetime.now().isoformat(),
            'handled': False
        })

        # Block user if very high risk
        if prediction.risk_level == 'high' and prediction.fraud_probability > 0.9:
            if user_id is not None:
                self.blocked_users.add(user_id)
                return
            owners = [
                owner
                for owner, transactions in self.transaction_history.items()
                if any(t.transaction_id == prediction.transaction_id for t in transactions)
            ]
            self.blocked_users.update(owners)

    # ------------------------------------------------------------------
    # Feature scores
    # ------------------------------------------------------------------

    def _calculate_user_behavior_score(self, user_id: str, transaction_data: Dict[str, Any]) -> float:
        """Calculate user behavior score (higher = more trusted)

        ``transaction_data`` is accepted for interface stability but not used.
        """
        if user_id not in self.user_profiles:
            return 0.5  # New user

        behavior_factors = 0.0

        # Account age, measured from the profile's 'created_at' (0 if absent)
        created_at = self.user_profiles[user_id].get('created_at')
        if created_at:
            account_age_days = (datetime.datetime.now() - self._parse_timestamp(created_at)).days
        else:
            account_age_days = 0
        if account_age_days > 365:
            behavior_factors += 0.3
        elif account_age_days > 30:
            behavior_factors += 0.2

        # Transaction history consistency
        history_length = len(self._history(user_id))
        if history_length > 10:
            behavior_factors += 0.3
        elif history_length > 3:
            behavior_factors += 0.15

        # No previous fraud
        if user_id not in self.blocked_users:
            behavior_factors += 0.4

        return min(1.0, behavior_factors)

    def _calculate_velocity_score(self, user_id: str, timestamp: str) -> float:
        """Calculate transaction velocity score

        Returns the share of the user's last five stored transactions that
        are less than one hour older than ``timestamp``.
        """
        current_time = self._parse_timestamp(timestamp)
        recent_count = sum(
            1
            for t in self._history(user_id)[-5:]
            if (current_time - self._parse_timestamp(t.timestamp)).total_seconds() < 3600
        )
        # Higher velocity = higher score
        return min(1.0, recent_count / 5.0)

    def _calculate_anomaly_score(self, transaction_data: Dict[str, Any]) -> float:
        """Calculate anomaly score for transaction"""
        anomaly_score = 0.0

        if transaction_data.get('amount', 0) > 5000:
            anomaly_score += 0.2
        # A missing key is treated like the explicit 'unknown' placeholder,
        # matching the defaults applied in _extract_features.
        if transaction_data.get('device_fingerprint', _UNKNOWN) == _UNKNOWN:
            anomaly_score += 0.1
        if transaction_data.get('ip_address', _UNKNOWN) == _UNKNOWN:
            anomaly_score += 0.1

        return min(1.0, anomaly_score)

    # ------------------------------------------------------------------
    # Fraud patterns
    # ------------------------------------------------------------------

    def _initialize_fraud_patterns(self) -> List[Dict[str, Any]]:
        """Initialize known fraud patterns"""
        return [
            {
                'name': 'rapid_purchases',
                'description': 'Multiple purchases in short time',
                'severity': 0.7,
                'conditions': {'transaction_count': 5, 'time_window': 300}  # 5 minutes
            },
            {
                'name': 'high_amount_testing',
                'description': 'Very small amounts followed by large amounts',
                'severity': 0.6,
                'conditions': {'min_amount': 0.01, 'max_amount': 10000}
            },
            {
                'name': 'multiple_devices',
                'description': 'Same user using multiple devices',
                'severity': 0.5,
                'conditions': {'device_count': 3}
            }
        ]

    def _matches_pattern(self, features: FraudFeature, pattern: Dict[str, Any]) -> bool:
        """Check if transaction matches fraud pattern

        The three built-in patterns are recognised by name.  Any other
        pattern is matched through its ``velocity_threshold`` condition, which
        is what _extract_fraud_pattern produces; patterns with neither never
        match.
        """
        name = pattern.get('name')
        conditions = pattern.get('conditions', {})

        if name == 'rapid_purchases':
            current_time = self._parse_timestamp(features.timestamp)
            recent_count = sum(
                1
                for t in self._history(features.user_id)
                if (current_time - self._parse_timestamp(t.timestamp)).total_seconds()
                < conditions['time_window']
            )
            return recent_count >= conditions['transaction_count']

        if name == 'high_amount_testing':
            return features.amount < conditions['min_amount'] or features.amount > conditions['max_amount']

        if name == 'multiple_devices':
            devices = {t.device_fingerprint for t in self._history(features.user_id)[-10:]}
            return len(devices) >= conditions['device_count']

        if 'velocity_threshold' in conditions:
            return features.velocity_score >= conditions['velocity_threshold']

        return False

    def _extract_fraud_pattern(self, features: FraudFeature) -> Optional[Dict[str, Any]]:
        """Extract new fraud pattern from confirmed fraud

        Returns a velocity pattern when the velocity score exceeds 0.8,
        otherwise ``None``.
        """
        if features.velocity_score > 0.8:
            return {
                'name': f'high_velocity_{datetime.datetime.now().strftime("%Y%m%d")}',
                'description': 'High transaction velocity detected',
                'severity': 0.6,
                'conditions': {'velocity_threshold': features.velocity_score}
            }
        return None

    # ------------------------------------------------------------------
    # Reporting and user management
    # ------------------------------------------------------------------

    def get_fraud_analytics(self, days: int = 30) -> Dict[str, Any]:
        """Get fraud detection analytics

        Args:
            days: size of the reporting window, counted back from now.

        Returns:
            Summary dict.  ``blocked_transactions`` counts in-window
            transactions that belong to currently blocked users.
        """
        cutoff_date = datetime.datetime.now() - datetime.timedelta(days=days)

        total_transactions = 0
        blocked_transactions = 0
        risk_distribution = defaultdict(int)

        for user_id, transactions in self.transaction_history.items():
            user_is_blocked = user_id in self.blocked_users
            for transaction in transactions:
                if self._parse_timestamp(transaction.timestamp) > cutoff_date:
                    total_transactions += 1
                    if user_is_blocked:
                        blocked_transactions += 1
                        risk_distribution['blocked'] += 1

        # Suspicious transactions
        recent_suspicious = [
            entry for entry in self.suspicious_transactions
            if self._parse_timestamp(entry['timestamp']) > cutoff_date
        ]

        return {
            'period_days': days,
            'total_transactions': total_transactions,
            'blocked_transactions': blocked_transactions,
            'block_rate': blocked_transactions / max(total_transactions, 1),
            'suspicious_transactions': len(recent_suspicious),
            'blocked_users': len(self.blocked_users),
            'fraud_patterns': len(self.fraud_patterns),
            'risk_distribution': dict(risk_distribution)
        }

    def is_user_blocked(self, user_id: str) -> bool:
        """Check if user is blocked"""
        return user_id in self.blocked_users

    def unblock_user(self, user_id: str) -> bool:
        """Unblock user account

        Returns:
            True if the user was blocked and has been unblocked, else False.
        """
        if user_id in self.blocked_users:
            self.blocked_users.remove(user_id)
            return True
        return False


# Global fraud detection instance
fraud_detector = FraudDetectionML()


def test_fraud_detection():
    """Test fraud detection system

    Smoke test / demo: runs a few transactions through the global detector
    and prints the results.  Returns True when it completes.
    """
    print("Testing Fraud Detection System...")

    # Test normal transaction
    normal_transaction = {
        'user_id': 'user_001',
        'transaction_id': 'txn_001',
        'type': 'purchase',
        'amount': 150.00,
        'timestamp': datetime.datetime.now().isoformat(),
        'ip_address': '192.168.1.100',
        'device_fingerprint': 'device_001'
    }

    prediction1 = fraud_detector.analyze_transaction(normal_transaction)
    print(f"Normal Transaction Risk: {prediction1.risk_level}")
    print(f"Fraud Probability: {prediction1.fraud_probability}")

    # Test suspicious transaction
    suspicious_transaction = {
        'user_id': 'user_002',
        'transaction_id': 'txn_002',
        'type': 'purchase',
        'amount': 15000.00,  # High amount
        'timestamp': datetime.datetime.now().isoformat(),
        'ip_address': '192.168.1.200',
        'device_fingerprint': 'device_unknown'
    }

    prediction2 = fraud_detector.analyze_transaction(suspicious_transaction)
    print(f"Suspicious Transaction Risk: {prediction2.risk_level}")
    print(f"Fraud Probability: {prediction2.fraud_probability}")

    # Test multiple rapid transactions
    for i in range(6):
        rapid_transaction = {
            'user_id': 'user_003',
            'transaction_id': f'txn_rapid_{i}',
            'type': 'purchase',
            'amount': 50.00,
            'timestamp': datetime.datetime.now().isoformat(),
            'ip_address': '192.168.1.300',
            'device_fingerprint': 'device_003'
        }

        prediction = fraud_detector.analyze_transaction(rapid_transaction)
        print(f"Rapid Transaction {i+1} Risk: {prediction.risk_level}")

    # Get analytics
    analytics = fraud_detector.get_fraud_analytics(7)
    print(f"Analytics: {analytics}")

    return True


if __name__ == "__main__":
    test_fraud_detection()