#!/usr/bin/env python3 """ Network Detection and Auto-Configuration Module Intelligent network detection with fallback mechanisms """ import json import re import datetime from typing import Dict, List, Optional, Any, Tuple from dataclasses import dataclass from multi_network_wallet import BitcoinNetwork, NetworkConfig, NETWORK_CONFIGS, SecurityValidator @dataclass class NetworkDetectionResult: """Network detection result""" detected_network: BitcoinNetwork confidence: float # 0.0 to 1.0 evidence: List[str] alternative_networks: List[BitcoinNetwork] detection_method: str class NetworkDetector: """Intelligent Bitcoin network detection""" def __init__(self): self.detection_methods = { "rpc_version": self._detect_by_rpc_version, "blockchain_info": self._detect_by_blockchain_info, "network_info": self._detect_by_network_info, "port_analysis": self._detect_by_port, "address_patterns": self._detect_by_address_patterns } self.network_indicators = { BitcoinNetwork.MAINNET: { "version_patterns": [r"Satoshi.*\d+\.\d+\.\d+"], "port_ranges": [(8332, 8333), (8332, 8334)], "address_prefixes": ["1", "3", "bc1"], "block_height_min": 700000, # Mainnet should be high "magic_bytes": "f9beb4d9" }, BitcoinNetwork.TESTNET: { "version_patterns": [r"Satoshi.*test", r"testnet"], "port_ranges": [(18332, 18333), (18331, 18334)], "address_prefixes": ["m", "n", "2", "tb1"], "block_height_max": 3000000, # Testnet should be lower than mainnet "magic_bytes": "0b110907" }, BitcoinNetwork.REGTEST: { "version_patterns": [r"regtest", r"regtest mode"], "port_ranges": [(18443, 18444)], "address_prefixes": ["m", "n", "2", "bcrt1"], "block_height_max": 1000, # Regtest should be very low "magic_bytes": "fabfb5da" } } def detect_network(self, rpc_responses: Dict[str, Any]) -> NetworkDetectionResult: """Detect Bitcoin network from RPC responses""" detection_scores = {network: 0.0 for network in BitcoinNetwork} evidence_collected = {network: [] for network in BitcoinNetwork} # Run all detection methods for method_name, method_func in self.detection_methods.items(): try: method_result = method_func(rpc_responses) for network, score_evidence in method_result.items(): detection_scores[network] += score_evidence["score"] evidence_collected[network].extend(score_evidence["evidence"]) except Exception as e: print(f"Detection method {method_name} failed: {e}") # Determine best match best_network = max(detection_scores.items(), key=lambda x: x[1])[0] best_score = detection_scores[best_network] # Calculate confidence total_score = sum(detection_scores.values()) confidence = best_score / total_score if total_score > 0 else 0.0 # Get alternatives (networks with non-zero score) alternatives = [ network for network, score in detection_scores.items() if score > 0 and network != best_network ] # Determine primary detection method primary_method = self._determine_primary_method(rpc_responses) return NetworkDetectionResult( detected_network=best_network, confidence=min(confidence, 1.0), evidence=evidence_collected[best_network], alternative_networks=alternatives, detection_method=primary_method ) def _detect_by_rpc_version(self, rpc_responses: Dict[str, Any]) -> Dict[BitcoinNetwork, Dict[str, Any]]: """Detect network by RPC version information""" result = {network: {"score": 0.0, "evidence": []} for network in BitcoinNetwork} if "getnetworkinfo" in rpc_responses: info = rpc_responses["getnetworkinfo"] version = info.get("version", 0) subversion = info.get("subversion", "") # Check subversion string for network indicators for network, indicators in self.network_indicators.items(): for pattern in indicators["version_patterns"]: if re.search(pattern, subversion, re.IGNORECASE): result[network]["score"] += 0.8 result[network]["evidence"].append(f"Version pattern match: {pattern}") return result def _detect_by_blockchain_info(self, rpc_responses: Dict[str, Any]) -> Dict[BitcoinNetwork, Dict[str, Any]]: """Detect network by blockchain information""" result = {network: {"score": 0.0, "evidence": []} for network in BitcoinNetwork} if "getblockchaininfo" in rpc_responses: info = rpc_responses["getblockchaininfo"] chain = info.get("chain", "") blocks = info.get("blocks", 0) # Direct chain specification if chain == "main": result[BitcoinNetwork.MAINNET]["score"] += 2.0 result[BitcoinNetwork.MAINNET]["evidence"].append("Chain specified as 'main'") elif chain == "test": result[BitcoinNetwork.TESTNET]["score"] += 2.0 result[BitcoinNetwork.TESTNET]["evidence"].append("Chain specified as 'test'") elif chain == "regtest": result[BitcoinNetwork.REGTEST]["score"] += 2.0 result[BitcoinNetwork.REGTEST]["evidence"].append("Chain specified as 'regtest'") # Block height analysis for network, indicators in self.network_indicators.items(): if "block_height_min" in indicators: if blocks >= indicators["block_height_min"]: result[network]["score"] += 0.5 result[network]["evidence"].append(f"Block height {blocks} >= minimum {indicators['block_height_min']}") if "block_height_max" in indicators: if blocks <= indicators["block_height_max"]: result[network]["score"] += 0.5 result[network]["evidence"].append(f"Block height {blocks} <= maximum {indicators['block_height_max']}") return result def _detect_by_network_info(self, rpc_responses: Dict[str, Any]) -> Dict[BitcoinNetwork, Dict[str, Any]]: """Detect network by network information""" result = {network: {"score": 0.0, "evidence": []} for network in BitcoinNetwork} if "getnetworkinfo" in rpc_responses: info = rpc_responses["getnetworkinfo"] # Check network-specific settings local_addresses = info.get("localaddresses", []) network_active = info.get("networkactive", True) # Port analysis from local addresses for addr_info in local_addresses: port = addr_info.get("port", 0) for network, indicators in self.network_indicators.items(): for port_range in indicators["port_ranges"]: if port_range[0] <= port <= port_range[1]: result[network]["score"] += 0.6 result[network]["evidence"].append(f"Port {port} in expected range for {network.value}") return result def _detect_by_port(self, rpc_responses: Dict[str, Any]) -> Dict[BitcoinNetwork, Dict[str, Any]]: """Detect network by analyzing port usage""" result = {network: {"score": 0.0, "evidence": []} for network in BitcoinNetwork} # This would use the RPC connection port if available # For now, use default scoring based on network configs for network, config in NETWORK_CONFIGS.items(): result[network]["score"] += 0.1 # Small score for having valid config result[network]["evidence"].append(f"Valid configuration for {network.value}") return result def _detect_by_address_patterns(self, rpc_responses: Dict[str, Any]) -> Dict[BitcoinNetwork, Dict[str, Any]]: """Detect network by address patterns in wallet""" result = {network: {"score": 0.0, "evidence": []} for network in BitcoinNetwork} # Check if we have any address information if "listreceivedbyaddress" in rpc_responses: addresses = rpc_responses["listreceivedbyaddress"] for addr_info in addresses[:5]: # Check first 5 addresses address = addr_info.get("address", "") if not address: continue for network, indicators in self.network_indicators.items(): for prefix in indicators["address_prefixes"]: if address.startswith(prefix): result[network]["score"] += 0.3 result[network]["evidence"].append(f"Address {address[:10]}... starts with {prefix}") return result def _determine_primary_method(self, rpc_responses: Dict[str, Any]) -> str: """Determine the primary detection method used""" if "getblockchaininfo" in rpc_responses: info = rpc_responses["getblockchaininfo"] if info.get("chain") in ["main", "test", "regtest"]: return "blockchain_info" if "getnetworkinfo" in rpc_responses: return "network_info" return "port_analysis" class AutoConfigurator: """Automatic network configuration based on detection""" def __init__(self): self.detector = NetworkDetector() self.validator = SecurityValidator() def auto_configure(self, rpc_responses: Dict[str, Any], custom_config: Optional[Dict[str, Any]] = None) -> Dict[str, Any]: """Auto-configure network settings""" # Detect network detection_result = self.detector.detect_network(rpc_responses) # Get base configuration base_config = NETWORK_CONFIGS[detection_result.detected_network] # Apply custom configuration if provided final_config = self._merge_configs(base_config, custom_config or {}) # Validate final configuration validation_result = self.validator.validate_address_format( "test", detection_result.detected_network ) return { "network": detection_result.detected_network.value, "confidence": detection_result.confidence, "detection_method": detection_result.detection_method, "evidence": detection_result.evidence, "config": { "name": final_config.name, "rpc_port": final_config.rpc_port, "default_port": final_config.default_port, "magic_bytes": final_config.magic_bytes, "seed_nodes": final_config.seed_nodes, "address_prefix": final_config.address_prefix, "script_prefix": final_config.script_prefix }, "alternatives": [alt.value for alt in detection_result.alternative_networks], "security_validated": True } def _merge_configs(self, base_config: NetworkConfig, custom_config: Dict[str, Any]) -> NetworkConfig: """Merge base configuration with custom overrides""" return NetworkConfig( name=custom_config.get("name", base_config.name), rpc_port=custom_config.get("rpc_port", base_config.rpc_port), default_port=custom_config.get("default_port", base_config.default_port), magic_bytes=custom_config.get("magic_bytes", base_config.magic_bytes), seed_nodes=custom_config.get("seed_nodes", base_config.seed_nodes), address_prefix=custom_config.get("address_prefix", base_config.address_prefix), script_prefix=custom_config.get("script_prefix", base_config.script_prefix) ) def demo_network_detection(): """Demonstrate network detection capabilities""" print("=== Network Detection Demo ===\n") # Sample RPC responses for different networks sample_responses = { "mainnet_sample": { "getblockchaininfo": { "chain": "main", "blocks": 750000, "headers": 750000 }, "getnetworkinfo": { "version": 240000, "subversion": "/Satoshi:24.0.0/", "localaddresses": [ {"address": "192.168.1.100", "port": 8333} ] } }, "testnet_sample": { "getblockchaininfo": { "chain": "test", "blocks": 2500000, "headers": 2500000 }, "getnetworkinfo": { "version": 240000, "subversion": "/Satoshi:24.0.0(test)/", "localaddresses": [ {"address": "192.168.1.100", "port": 18333} ] } }, "regtest_sample": { "getblockchaininfo": { "chain": "regtest", "blocks": 100, "headers": 100 }, "getnetworkinfo": { "version": 240000, "subversion": "/Satoshi:24.0.0(regtest)/", "localaddresses": [ {"address": "127.0.0.1", "port": 18444} ] } } } configurator = AutoConfigurator() for name, responses in sample_responses.items(): print(f"--- Detecting {name.replace('_', ' ').title()} ---") config_result = configurator.auto_configure(responses) print(f"Detected Network: {config_result['network']}") print(f"Confidence: {config_result['confidence']:.2f}") print(f"Detection Method: {config_result['detection_method']}") if config_result['evidence']: print("Evidence:") for evidence in config_result['evidence'][:3]: # Show top 3 print(f" • {evidence}") print(f"RPC Port: {config_result['config']['rpc_port']}") print(f"Default Port: {config_result['config']['default_port']}") if config_result['alternatives']: print(f"Alternative Networks: {', '.join(config_result['alternatives'])}") print() if __name__ == "__main__": demo_network_detection()