#!/usr/bin/env python3
"""
Enhanced Security Validator

Hardened security validation with comprehensive private key handling.
"""

import base64
import datetime
import hashlib
import json
import re
import secrets
import string
from dataclasses import dataclass
from enum import Enum
from typing import Any, Dict, List, Optional, Tuple, Union

# Uncompressed mainnet WIF: '5' followed by 50 Base58 characters (51 total).
_WIF_UNCOMPRESSED_RE = re.compile(r"^5[1-9A-HJ-NP-Za-km-z]{50}$")
# Compressed mainnet WIF: 'K' or 'L' followed by 51 Base58 characters (52 total).
_WIF_COMPRESSED_RE = re.compile(r"^[KL][1-9A-HJ-NP-Za-km-z]{51}$")
# Raw 256-bit key written as 64 hexadecimal digits.
_HEX_RAW_RE = re.compile(r"^[0-9a-fA-F]{64}$")
# Hex key whose 256 bits are all 0 or all 1.
_HEX_UNIFORM_RE = re.compile(r"^(?:0{64}|f{64})$")

# Patterns that indicate a hand-made (non-random) key.  The first one also
# covers the "all 5s / all Ks / all Ls" cases, since any such key contains a
# run of more than ten identical characters.
_WEAK_ENTROPY_PATTERNS = tuple(
    re.compile(pattern, re.IGNORECASE)
    for pattern in (
        r"(.)\1{10,}",   # One character repeated 11+ times in a row
        r"1234567890",   # Sequential digits
        r"abcdefghij",   # Sequential letters
    )
)

# Keys that must never be used in production.
_KNOWN_TEST_KEYS = frozenset({
    "5HueCGU8rMjxEXxiPuD5BDku4MkFqeZyd4dZ1jvhTVqvbTLvyTJ",  # Common test key
    "5HpHagT65TZzG1PH3CSu63k8DbpvD8s5ip4nEB3kEsreAnchuDf",
})

# Shortest length of any full-size key format accepted here (uncompressed WIF).
_MIN_STANDARD_KEY_LENGTH = 51

_SUPPORTED_NETWORKS = ("mainnet", "testnet", "regtest")


class SecurityLevel(Enum):
    """Security validation levels, ordered from most to least severe."""

    CRITICAL = "critical"
    HIGH = "high"
    MEDIUM = "medium"
    LOW = "low"
    INFO = "info"


@dataclass
class SecurityIssue:
    """A single finding produced by a validation or audit step."""

    level: SecurityLevel
    category: str
    description: str
    recommendation: str
    score_impact: float  # 0.0 to 100.0, subtracted from the audit score
    cve_reference: Optional[str] = None


@dataclass
class PrivateKeyValidation:
    """Result of validating one private key."""

    is_valid: bool
    key_type: str
    network_compatible: bool
    security_issues: List[SecurityIssue]
    entropy_score: float  # 0.0 to 1.0
    warnings: List[str]


@dataclass
class SecurityAuditResult:
    """Complete security audit result."""

    overall_score: float  # 0.0 to 100.0
    critical_issues: List[SecurityIssue]
    high_issues: List[SecurityIssue]
    medium_issues: List[SecurityIssue]
    low_issues: List[SecurityIssue]
    recommendations: List[str]
    audit_timestamp: str
    compliance_status: Dict[str, bool]


class HardenedSecurityValidator:
    """Validate private keys and audit wallet / network configuration."""

    def __init__(self):
        # Security validation patterns will be defined as needed
        self.security_thresholds = {
            "min_entropy_bits": 128,
            "max_key_age_hours": 24,
            "min_password_length": 12,
            "max_failed_attempts": 3,
            "session_timeout_minutes": 30,
        }

        # Framework name -> callable(wallet_data, network_info) -> bool
        self.compliance_frameworks = {
            "nist_800_63": self._check_nist_compliance,
            "sox_controls": self._check_sox_compliance,
            "pci_dss": self._check_pci_compliance,
            "gdpr_data": self._check_gdpr_compliance,
        }

    def comprehensive_security_audit(
        self, wallet_data: Dict[str, Any], network_info: Dict[str, Any]
    ) -> SecurityAuditResult:
        """Perform a comprehensive security audit.

        Args:
            wallet_data: Wallet description. Reads the optional keys
                ``private_keys`` (list of dicts with ``key`` / ``network``),
                ``encrypted`` and ``has_backup``.
            network_info: Node/RPC configuration flags.

        Returns:
            A SecurityAuditResult whose ``overall_score`` is 100 minus the
            summed ``score_impact`` of every issue, clamped at 0.
        """
        all_issues: List[SecurityIssue] = []

        # Validate private keys; a missing or None list means "no keys".
        for key_data in wallet_data.get("private_keys") or []:
            key_validation = self.validate_private_key_hardened(
                key_data.get("key", ""),
                key_data.get("network", "mainnet"),
            )
            all_issues.extend(key_validation.security_issues)

        all_issues.extend(self._audit_wallet_security(wallet_data, network_info))
        all_issues.extend(self._audit_network_security(network_info))

        compliance_results = {
            framework_name: check_func(wallet_data, network_info)
            for framework_name, check_func in self.compliance_frameworks.items()
        }

        def issues_at(level: SecurityLevel) -> List[SecurityIssue]:
            return [issue for issue in all_issues if issue.level == level]

        score_deduction = sum(issue.score_impact for issue in all_issues)
        # Clamp with a float literal so the score is always a float.
        overall_score = max(0.0, 100.0 - score_deduction)

        return SecurityAuditResult(
            overall_score=overall_score,
            critical_issues=issues_at(SecurityLevel.CRITICAL),
            high_issues=issues_at(SecurityLevel.HIGH),
            medium_issues=issues_at(SecurityLevel.MEDIUM),
            low_issues=issues_at(SecurityLevel.LOW),
            recommendations=self._generate_security_recommendations(all_issues),
            audit_timestamp=datetime.datetime.now().isoformat(),
            compliance_status=compliance_results,
        )

    def validate_private_key_hardened(
        self, private_key: str, network: str = "mainnet"
    ) -> PrivateKeyValidation:
        """Validate a private key's format, entropy, network and exposure.

        Args:
            private_key: The key text as supplied by the caller.
            network: Target network name; defaults to ``"mainnet"``.

        Returns:
            A PrivateKeyValidation. ``is_valid`` is True only when no check
            reported an issue.
        """
        sanitized_key = self._sanitize_input(private_key)

        if not sanitized_key:
            return PrivateKeyValidation(
                is_valid=False,
                key_type="unknown",
                network_compatible=False,
                security_issues=[
                    SecurityIssue(
                        level=SecurityLevel.CRITICAL,
                        category="key_validation",
                        description="Empty or invalid private key",
                        recommendation="Provide a valid private key",
                        score_impact=100.0,
                    )
                ],
                entropy_score=0.0,
                warnings=[],
            )

        issues: List[SecurityIssue] = []

        key_type, format_issues = self._determine_key_type(sanitized_key)
        issues.extend(format_issues)

        entropy_score, entropy_issues = self._assess_key_entropy(sanitized_key, key_type)
        issues.extend(entropy_issues)

        network_compatible, network_issues = self._verify_network_compatibility(
            sanitized_key, key_type, network
        )
        issues.extend(network_issues)

        issues.extend(self._assess_exposure_risk(sanitized_key))

        warnings: List[str] = []
        if entropy_score < 0.8:
            warnings.append("Low entropy detected - consider using a stronger key")

        return PrivateKeyValidation(
            is_valid=not issues,
            key_type=key_type,
            network_compatible=network_compatible,
            security_issues=issues,
            entropy_score=entropy_score,
            warnings=warnings,
        )

    def _sanitize_input(self, input_data: str) -> str:
        """Strip non-printable characters and collapse whitespace runs.

        Returns an empty string for empty or non-string input so callers can
        treat both uniformly as "no key supplied".
        """
        if not isinstance(input_data, str) or not input_data:
            return ""

        # Remove control characters except newlines and tabs
        printable = "".join(
            char for char in input_data if char.isprintable() or char in "\n\t"
        )

        # Collapse every whitespace run (including the kept \n and \t) to one space
        return re.sub(r"\s+", " ", printable.strip())

    @staticmethod
    def _looks_like_mini_key(key: str) -> bool:
        """Return True when *key* has the shape accepted as a mini private key."""
        return key.startswith("S") and 22 <= len(key) <= 30

    def _determine_key_type(self, key: str) -> Tuple[str, List[SecurityIssue]]:
        """Classify *key* by format.

        Returns:
            ``(key_type, issues)`` where ``key_type`` is one of
            ``wif_uncompressed``, ``wif_compressed``, ``hex_raw``,
            ``mini_private``, ``bip38_encrypted`` or ``unknown``.
        """
        issues: List[SecurityIssue] = []

        if _WIF_UNCOMPRESSED_RE.match(key):
            return "wif_uncompressed", issues

        if _WIF_COMPRESSED_RE.match(key):
            return "wif_compressed", issues

        if _HEX_RAW_RE.match(key):
            return "hex_raw", issues

        if self._looks_like_mini_key(key):
            return "mini_private", issues

        # BIP38 encrypted format
        if key.startswith("6P") and len(key) == 58:
            issues.append(SecurityIssue(
                level=SecurityLevel.HIGH,
                category="encryption",
                description="BIP38 encrypted key detected",
                recommendation="Ensure you have the passphrase to decrypt this key",
                score_impact=20.0,
            ))
            return "bip38_encrypted", issues

        # Report only the length: echoing any part of the key would leak
        # secret material into reports and logs.
        issues.append(SecurityIssue(
            level=SecurityLevel.CRITICAL,
            category="format_validation",
            description=f"Unrecognized private key format (length {len(key)})",
            recommendation="Provide a valid WIF, hex, or mini private key",
            score_impact=80.0,
        ))
        return "unknown", issues

    def _assess_key_entropy(
        self, key: str, key_type: str
    ) -> Tuple[float, List[SecurityIssue]]:
        """Score the key's apparent randomness using pattern heuristics.

        Returns:
            ``(entropy_score, issues)``. The score is 0.0 for unknown formats,
            0.1 for an all-0 / all-f hex key, 0.2 when a weak pattern is found
            and 0.95 otherwise. This is a heuristic, not a measurement.
        """
        issues: List[SecurityIssue] = []

        if key_type == "unknown":
            return 0.0, [SecurityIssue(
                level=SecurityLevel.CRITICAL,
                category="entropy",
                description="Cannot assess entropy for unknown key format",
                recommendation="Use a standard key format with proper entropy",
                score_impact=90.0,
            )]

        # A hex key whose bits are all identical is the worst case; report it
        # before the generic pattern check so it gets the CRITICAL finding.
        if key_type == "hex_raw" and _HEX_UNIFORM_RE.match(key.lower()):
            issues.append(SecurityIssue(
                level=SecurityLevel.CRITICAL,
                category="entropy",
                description="Extremely low entropy detected - all bits identical",
                recommendation="Generate a new private key with proper randomness",
                score_impact=95.0,
            ))
            return 0.1, issues

        # Repeated / sequential character patterns apply to WIF and hex alike.
        if key_type in ("wif_compressed", "wif_uncompressed", "hex_raw"):
            if any(pattern.search(key) for pattern in _WEAK_ENTROPY_PATTERNS):
                issues.append(SecurityIssue(
                    level=SecurityLevel.HIGH,
                    category="entropy",
                    description="Weak entropy pattern detected in private key",
                    recommendation="Generate a new private key using a cryptographically secure random generator",
                    score_impact=60.0,
                ))
                return 0.2, issues

        # If no issues found, assume good entropy
        return 0.95, issues

    def _verify_network_compatibility(
        self, key: str, key_type: str, network: str
    ) -> Tuple[bool, List[SecurityIssue]]:
        """Check whether the key type is acceptable for *network*.

        Returns:
            ``(compatible, issues)``.
        """
        issues: List[SecurityIssue] = []

        # This is a simplified check: the WIF payload is not decoded, only the
        # requested network name is validated.
        if key_type in ("wif_uncompressed", "wif_compressed"):
            if network not in _SUPPORTED_NETWORKS:
                issues.append(SecurityIssue(
                    level=SecurityLevel.MEDIUM,
                    category="network_compatibility",
                    description=f"Unknown network specified: {network}",
                    recommendation="Use mainnet, testnet, or regtest",
                    score_impact=15.0,
                ))
                return False, issues

        # NOTE(review): the mini key format is not inherently testnet-only;
        # this policy is kept from the original implementation - confirm it
        # is intended.
        if key_type == "mini_private" and network == "mainnet":
            issues.append(SecurityIssue(
                level=SecurityLevel.HIGH,
                category="network_compatibility",
                description="Mini private key format typically used for testnet",
                recommendation="Use standard WIF format for mainnet",
                score_impact=30.0,
            ))
            return False, issues

        return True, issues

    def _assess_exposure_risk(self, key: str) -> List[SecurityIssue]:
        """Flag keys that are publicly known or look truncated."""
        issues: List[SecurityIssue] = []

        if key in _KNOWN_TEST_KEYS:
            issues.append(SecurityIssue(
                level=SecurityLevel.CRITICAL,
                category="exposure_risk",
                description="Using a well-known test private key",
                recommendation="Generate a unique private key for production use",
                score_impact=85.0,
            ))

        # Anything shorter than an uncompressed WIF key (51 chars) is probably
        # truncated - except mini keys, which are short by design.
        too_short = len(key) < _MIN_STANDARD_KEY_LENGTH
        if too_short and not self._looks_like_mini_key(key):
            issues.append(SecurityIssue(
                level=SecurityLevel.HIGH,
                category="exposure_risk",
                description="Unusual key length detected",
                recommendation="Ensure the private key is complete and unmodified",
                score_impact=40.0,
            ))

        return issues

    def _audit_wallet_security(
        self, wallet_data: Dict[str, Any], network_info: Dict[str, Any]
    ) -> List[SecurityIssue]:
        """Audit wallet-level settings (encryption and backup flags)."""
        issues: List[SecurityIssue] = []

        if not wallet_data.get("encrypted", False):
            issues.append(SecurityIssue(
                level=SecurityLevel.HIGH,
                category="wallet_security",
                description="Wallet is not encrypted",
                recommendation="Enable wallet encryption with a strong passphrase",
                score_impact=25.0,
            ))

        if not wallet_data.get("has_backup", False):
            issues.append(SecurityIssue(
                level=SecurityLevel.MEDIUM,
                category="wallet_security",
                description="No wallet backup detected",
                recommendation="Create and securely store a wallet backup",
                score_impact=15.0,
            ))

        return issues

    def _audit_network_security(self, network_info: Dict[str, Any]) -> List[SecurityIssue]:
        """Audit the RPC configuration flags in *network_info*.

        Missing flags are treated as the insecure value, so an incomplete
        configuration never passes the audit silently.
        """
        issues: List[SecurityIssue] = []

        # Defaults to False (fail closed), matching _check_nist_compliance.
        if not network_info.get("rpc_auth_enabled", False):
            issues.append(SecurityIssue(
                level=SecurityLevel.CRITICAL,
                category="network_security",
                description="RPC authentication is disabled",
                recommendation="Enable RPC authentication with strong credentials",
                score_impact=75.0,
            ))

        if not network_info.get("rpc_ssl_enabled", False):
            issues.append(SecurityIssue(
                level=SecurityLevel.HIGH,
                category="network_security",
                description="RPC communication is not encrypted",
                recommendation="Enable SSL/TLS for RPC connections",
                score_impact=35.0,
            ))

        if network_info.get("rpc_port_open_internet", False):
            issues.append(SecurityIssue(
                level=SecurityLevel.CRITICAL,
                category="network_security",
                description="RPC port exposed to internet",
                recommendation="Restrict RPC access to local networks only",
                score_impact=80.0,
            ))

        return issues

    def _check_nist_compliance(
        self, wallet_data: Dict[str, Any], network_info: Dict[str, Any]
    ) -> bool:
        """Simplified NIST 800-63 check: RPC auth on and wallet encrypted."""
        entropy_ok = True  # Would check actual entropy
        auth_ok = network_info.get("rpc_auth_enabled", False)
        encryption_ok = wallet_data.get("encrypted", False)
        return bool(entropy_ok and auth_ok and encryption_ok)

    def _check_sox_compliance(
        self, wallet_data: Dict[str, Any], network_info: Dict[str, Any]
    ) -> bool:
        """Simplified SOX check: a backup exists and audit logging is on."""
        has_backup = wallet_data.get("has_backup", False)
        has_audit_log = network_info.get("audit_logging_enabled", False)
        return bool(has_backup and has_audit_log)

    def _check_pci_compliance(
        self, wallet_data: Dict[str, Any], network_info: Dict[str, Any]
    ) -> bool:
        """Placeholder PCI DSS check; always reports compliant."""
        # PCI not typically applicable to Bitcoin wallets
        return True

    def _check_gdpr_compliance(
        self, wallet_data: Dict[str, Any], network_info: Dict[str, Any]
    ) -> bool:
        """Placeholder GDPR check; always reports compliant."""
        # GDPR focuses on personal data - less relevant for pure crypto
        return True

    def _generate_security_recommendations(self, issues: List[SecurityIssue]) -> List[str]:
        """Turn the categories present in *issues* into advice strings."""
        categories = {issue.category for issue in issues}

        # Ordered so the output order is stable regardless of issue order.
        category_advice = (
            ("encryption",
             "Enable comprehensive encryption for all sensitive data"),
            ("network_security",
             "Implement network security best practices including authentication and encryption"),
            ("entropy",
             "Use cryptographically secure random number generators for key creation"),
            ("exposure_risk",
             "Implement proper key management and exposure prevention measures"),
        )
        recommendations = [
            advice for category, advice in category_advice if category in categories
        ]

        # General recommendations
        if len(issues) > 5:
            recommendations.append("Consider a comprehensive security audit and remediation plan")

        return recommendations


def demo_security_validation():
    """Demonstrate security validation capabilities by printing sample results."""
    print("=== Hardened Security Validation Demo ===\n")

    validator = HardenedSecurityValidator()

    test_cases = [
        {
            "name": "Valid WIF Key",
            "key": "5KJvsngHeMpm884wtkJNzQGaCErckhHJBGFsvd3VyK5qMZXj3hS",
            "network": "mainnet",
        },
        {
            "name": "Weak Pattern Key",
            "key": "5555555555555555555555555555555555555555555555555555555555555555",
            "network": "mainnet",
        },
        {
            "name": "Invalid Format",
            "key": "invalid_key_format",
            "network": "mainnet",
        },
        {
            "name": "Known Test Key",
            "key": "5HueCGU8rMjxEXxiPuD5BDku4MkFqeZyd4dZ1jvhTVqvbTLvyTJ",
            "network": "testnet",
        },
    ]

    for test_case in test_cases:
        print(f"--- Testing: {test_case['name']} ---")

        validation_result = validator.validate_private_key_hardened(
            test_case["key"],
            test_case["network"],
        )

        print(f"Valid: {'✅' if validation_result.is_valid else '❌'}")
        print(f"Key Type: {validation_result.key_type}")
        print(f"Network Compatible: {'✅' if validation_result.network_compatible else '❌'}")
        print(f"Entropy Score: {validation_result.entropy_score:.2f}")

        if validation_result.security_issues:
            print("Security Issues:")
            for issue in validation_result.security_issues[:3]:  # Show top 3
                print(f" {issue.level.value.upper()}: {issue.description}")

        if validation_result.warnings:
            print("Warnings:")
            for warning in validation_result.warnings:
                print(f" ⚠️ {warning}")

        print()

    # Demo comprehensive audit
    print("--- Comprehensive Security Audit Demo ---")

    wallet_data = {
        "private_keys": [
            {"key": "5KJvsngHeMpm884wtkJNzQGaCErckhHJBGFsvd3VyK5qMZXj3hS", "network": "mainnet"}
        ],
        "encrypted": False,
        "has_backup": False,
    }

    network_info = {
        "rpc_auth_enabled": False,
        "rpc_ssl_enabled": False,
        "rpc_port_open_internet": True,
    }

    audit_result = validator.comprehensive_security_audit(wallet_data, network_info)

    print(f"Overall Security Score: {audit_result.overall_score:.1f}/100")
    print(f"Critical Issues: {len(audit_result.critical_issues)}")
    print(f"High Issues: {len(audit_result.high_issues)}")
    print(f"Medium Issues: {len(audit_result.medium_issues)}")

    if audit_result.recommendations:
        print("\nRecommendations:")
        for rec in audit_result.recommendations:
            print(f" 💡 {rec}")


if __name__ == "__main__":
    demo_security_validation()