""" Real-time Fraud Detection System with Machine Learning Author: Starlight Agent Version: 1.0 """ import json import math import datetime import hashlib from typing import Dict, List, Optional, Any, Tuple from dataclasses import dataclass, asdict from collections import defaultdict @dataclass class TransactionFeature: """Transaction features for fraud detection""" user_id: str transaction_id: str amount: float timestamp: str ip_address: str device_id: str location: str payment_method: str ticket_count: int user_age_days: int previous_transactions: int time_since_last_transaction: int velocity_score: float anomaly_score: float @dataclass class FraudAlert: """Fraud alert structure""" alert_id: str transaction_id: str user_id: str fraud_score: float risk_level: str reasons: List[str] timestamp: str status: str class FraudDetectionModel: """Simplified ML model for fraud detection""" def __init__(self): self.weights = { "amount": 0.15, "velocity": 0.20, "location": 0.15, "device": 0.10, "time_pattern": 0.15, "user_history": 0.15, "payment_method": 0.10 } self.thresholds = { "low": 0.3, "medium": 0.6, "high": 0.8 } def predict_fraud_probability(self, features: TransactionFeature) -> float: """Predict fraud probability using simplified ML model""" # Calculate individual risk scores amount_score = self._calculate_amount_risk(features.amount) velocity_score = self._calculate_velocity_risk(features.velocity_score) location_score = self._calculate_location_risk(features.location) device_score = self._calculate_device_risk(features.device_id, features.user_id) time_score = self._calculate_time_pattern_risk(features.time_since_last_transaction) history_score = self._calculate_user_history_risk(features.previous_transactions, features.user_age_days) payment_score = self._calculate_payment_method_risk(features.payment_method) # Weighted combination fraud_score = ( amount_score * self.weights["amount"] + velocity_score * self.weights["velocity"] + location_score * self.weights["location"] + device_score * self.weights["device"] + time_score * self.weights["time_pattern"] + history_score * self.weights["user_history"] + payment_score * self.weights["payment_method"] ) return min(1.0, fraud_score) def _calculate_amount_risk(self, amount: float) -> float: """Calculate amount-based risk score""" if amount > 1000: return 0.8 elif amount > 500: return 0.5 elif amount > 200: return 0.3 else: return 0.1 def _calculate_velocity_risk(self, velocity_score: float) -> float: """Calculate velocity-based risk score""" return min(1.0, velocity_score / 100) # Normalize velocity score def _calculate_location_risk(self, location: str) -> float: """Calculate location-based risk score""" high_risk_countries = ["XX", "YY", "ZZ"] # Placeholder if any(loc in location for loc in high_risk_countries): return 0.7 return 0.2 def _calculate_device_risk(self, device_id: str, user_id: str) -> float: """Calculate device-based risk score""" # Simplified device risk calculation if len(device_id) < 10: return 0.6 # Suspicious device ID return 0.1 def _calculate_time_pattern_risk(self, time_since_last: int) -> float: """Calculate time pattern risk score""" if time_since_last < 60: # Less than 1 minute return 0.9 elif time_since_last < 300: # Less than 5 minutes return 0.6 elif time_since_last < 3600: # Less than 1 hour return 0.3 else: return 0.1 def _calculate_user_history_risk(self, prev_transactions: int, user_age_days: int) -> float: """Calculate user history-based risk score""" if prev_transactions == 0 and user_age_days < 7: return 0.8 # New user with no history elif prev_transactions < 5: return 0.4 elif prev_transactions < 20: return 0.2 else: return 0.05 def _calculate_payment_method_risk(self, payment_method: str) -> float: """Calculate payment method risk score""" high_risk_methods = ["crypto", "prepaid", "gift_card"] if payment_method.lower() in high_risk_methods: return 0.6 return 0.2 class RealTimeFraudDetector: """Real-time fraud detection system""" def __init__(self): self.model = FraudDetectionModel() self.user_profiles = defaultdict(dict) self.transaction_history = defaultdict(list) self.active_alerts = {} self.blocked_users = set() self.suspicious_patterns = {} def analyze_transaction(self, features: TransactionFeature) -> Dict[str, Any]: """Analyze transaction for fraud in real-time""" # Update user profile self._update_user_profile(features) # Calculate fraud probability fraud_score = self.model.predict_fraud_probability(features) # Determine risk level risk_level = self._determine_risk_level(fraud_score) # Generate fraud reasons reasons = self._generate_fraud_reasons(features, fraud_score) # Create alert if high risk alert = None if risk_level in ["medium", "high"]: alert = self._create_fraud_alert(features, fraud_score, risk_level, reasons) # Apply risk-based actions action = self._determine_action(risk_level, fraud_score) # Update detection statistics self._update_detection_stats(features.user_id, fraud_score, risk_level) return { "transaction_id": features.transaction_id, "user_id": features.user_id, "fraud_score": round(fraud_score, 4), "risk_level": risk_level, "action": action, "reasons": reasons, "alert_id": alert.alert_id if alert else None, "timestamp": datetime.datetime.now().isoformat(), "recommendations": self._generate_recommendations(risk_level, features) } def detect_suspicious_patterns(self, user_id: str) -> List[Dict[str, Any]]: """Detect suspicious patterns in user behavior""" patterns = [] user_transactions = self.transaction_history[user_id] if len(user_transactions) < 3: return patterns # Pattern 1: Rapid consecutive transactions recent_transactions = [t for t in user_transactions if (datetime.datetime.now() - datetime.datetime.fromisoformat(t['timestamp'])).seconds < 3600] if len(recent_transactions) > 5: patterns.append({ "pattern_type": "rapid_transactions", "severity": "high", "description": f"{len(recent_transactions)} transactions in last hour", "transaction_ids": [t['transaction_id'] for t in recent_transactions] }) # Pattern 2: Amount escalation amounts = [t['amount'] for t in recent_transactions[-5:]] if len(amounts) >= 3 and all(amounts[i] < amounts[i+1] for i in range(len(amounts)-1)): patterns.append({ "pattern_type": "amount_escalation", "severity": "medium", "description": "Increasing transaction amounts detected", "amounts": amounts }) # Pattern 3: Location hopping locations = [t['location'] for t in recent_transactions[-10:]] unique_locations = len(set(locations)) if unique_locations > 3: patterns.append({ "pattern_type": "location_hopping", "severity": "high", "description": f"Transactions from {unique_locations} different locations", "locations": locations }) return patterns def update_fraud_patterns(self, feedback: Dict[str, Any]) -> Dict[str, Any]: """Update fraud detection model based on feedback""" transaction_id = feedback.get("transaction_id") actual_fraud = feedback.get("actual_fraud", False) # Find the transaction transaction = None for user_trans in self.transaction_history.values(): for trans in user_trans: if trans['transaction_id'] == transaction_id: transaction = trans break if transaction: break if not transaction: return {"error": "Transaction not found"} # Update model weights (simplified) predicted_score = transaction.get('fraud_score', 0) if actual_fraud and predicted_score < 0.5: # False negative - increase sensitivity self._increase_model_sensitivity() elif not actual_fraud and predicted_score > 0.5: # False positive - decrease sensitivity self._decrease_model_sensitivity() return { "transaction_id": transaction_id, "model_updated": True, "new_weights": self.model.weights, "feedback_processed": True } def _update_user_profile(self, features: TransactionFeature): """Update user profile with new transaction""" user_id = features.user_id # Update basic profile if user_id not in self.user_profiles: self.user_profiles[user_id] = { "first_transaction": features.timestamp, "total_transactions": 0, "total_amount": 0.0, "average_amount": 0.0, "locations": set(), "devices": set(), "payment_methods": set() } profile = self.user_profiles[user_id] profile["total_transactions"] += 1 profile["total_amount"] += features.amount profile["average_amount"] = profile["total_amount"] / profile["total_transactions"] profile["locations"].add(features.location) profile["devices"].add(features.device_id) profile["payment_methods"].add(features.payment_method) # Store transaction self.transaction_history[user_id].append({ "transaction_id": features.transaction_id, "amount": features.amount, "timestamp": features.timestamp, "location": features.location, "device_id": features.device_id, "payment_method": features.payment_method }) # Keep only last 100 transactions if len(self.transaction_history[user_id]) > 100: self.transaction_history[user_id] = self.transaction_history[user_id][-100:] def _determine_risk_level(self, fraud_score: float) -> str: """Determine risk level based on fraud score""" if fraud_score >= self.model.thresholds["high"]: return "high" elif fraud_score >= self.model.thresholds["medium"]: return "medium" elif fraud_score >= self.model.thresholds["low"]: return "low" else: return "minimal" def _generate_fraud_reasons(self, features: TransactionFeature, fraud_score: float) -> List[str]: """Generate reasons for fraud detection""" reasons = [] if features.amount > 500: reasons.append("High transaction amount") if features.velocity_score > 50: reasons.append("High transaction velocity") if features.time_since_last_transaction < 300: reasons.append("Rapid consecutive transactions") if features.previous_transactions < 5: reasons.append("New user with limited history") if features.user_age_days < 7: reasons.append("Recently created account") if len(self.user_profiles[features.user_id].get("locations", set())) > 3: reasons.append("Multiple geographic locations") return reasons def _create_fraud_alert(self, features: TransactionFeature, fraud_score: float, risk_level: str, reasons: List[str]) -> FraudAlert: """Create fraud alert""" alert_id = hashlib.sha256(f"{features.transaction_id}{datetime.datetime.now().isoformat()}".encode()).hexdigest()[:16] alert = FraudAlert( alert_id=alert_id, transaction_id=features.transaction_id, user_id=features.user_id, fraud_score=fraud_score, risk_level=risk_level, reasons=reasons, timestamp=datetime.datetime.now().isoformat(), status="active" ) self.active_alerts[alert_id] = alert return alert def _determine_action(self, risk_level: str, fraud_score: float) -> str: """Determine action based on risk level""" if risk_level == "high": return "block_transaction" elif risk_level == "medium": return "manual_review" elif risk_level == "low": return "additional_verification" else: return "approve" def _update_detection_stats(self, user_id: str, fraud_score: float, risk_level: str): """Update detection statistics""" if user_id not in self.suspicious_patterns: self.suspicious_patterns[user_id] = { "high_risk_count": 0, "total_risk_score": 0.0, "last_updated": datetime.datetime.now().isoformat() } patterns = self.suspicious_patterns[user_id] patterns["high_risk_count"] += 1 if risk_level == "high" else 0 patterns["total_risk_score"] += fraud_score patterns["last_updated"] = datetime.datetime.now().isoformat() def _generate_recommendations(self, risk_level: str, features: TransactionFeature) -> List[str]: """Generate recommendations based on risk level""" recommendations = [] if risk_level == "high": recommendations.append("Block transaction immediately") recommendations.append("Flag user account for review") recommendations.append("Require additional ID verification") elif risk_level == "medium": recommendations.append("Request additional payment verification") recommendations.append("Limit transaction amount") recommendations.append("Monitor future transactions closely") elif risk_level == "low": recommendations.append("Proceed with standard verification") recommendations.append("Log for pattern analysis") return recommendations def _increase_model_sensitivity(self): """Increase model sensitivity for fraud detection""" for key in self.model.weights: self.model.weights[key] *= 1.1 def _decrease_model_sensitivity(self): """Decrease model sensitivity for fraud detection""" for key in self.model.weights: self.model.weights[key] *= 0.9 def test_fraud_detection(): """Test fraud detection system""" detector = RealTimeFraudDetector() # Create test transaction with high fraud risk features = TransactionFeature( user_id="user123", transaction_id="tx123", amount=800.0, timestamp=datetime.datetime.now().isoformat(), ip_address="192.168.1.1", device_id="device123", location="US", payment_method="credit_card", ticket_count=5, user_age_days=2, previous_transactions=1, time_since_last_transaction=30, velocity_score=75.0, anomaly_score=0.8 ) # Analyze transaction result = detector.analyze_transaction(features) # Verify results assert result["fraud_score"] > 0 assert "risk_level" in result assert "action" in result assert "reasons" in result # Test suspicious patterns patterns = detector.detect_suspicious_patterns(features.user_id) assert isinstance(patterns, list) # Test model update feedback = {"transaction_id": "tx123", "actual_fraud": True} update_result = detector.update_fraud_patterns(feedback) assert update_result["model_updated"] == True print("✅ Fraud detection system working correctly") return result if __name__ == "__main__": result = test_fraud_detection() print(f"Fraud detection completed with score: {result['fraud_score']:.4f} ({result['risk_level']} risk)")