#!/usr/bin/env python3
"""
Intelligent Configuration Manager
Advanced network configuration with intelligent defaults and adaptive management
"""

import json
import os
import re
import datetime
from typing import Dict, List, Optional, Any, Union, Callable
from dataclasses import dataclass, asdict, replace
from pathlib import Path

from multi_network_wallet import BitcoinNetwork, NetworkConfig, NETWORK_CONFIGS
from network_detection import AutoConfigurator, NetworkDetectionResult


@dataclass
class ConfigurationProfile:
    """Configuration profile for different use cases"""
    name: str
    description: str
    network: str  # Stored as a string (not the enum) for JSON serialization
    priority_security: bool
    performance_optimized: bool
    development_mode: bool
    custom_overrides: Dict[str, Any]


@dataclass
class ConfigurationHistory:
    """Configuration change tracking"""
    timestamp: str
    change_type: str
    previous_config: Dict[str, Any]
    new_config: Dict[str, Any]
    reason: str
    auto_applied: bool


@dataclass
class IntelligentConfig:
    """Enhanced configuration with metadata"""
    base_config: NetworkConfig
    profile: ConfigurationProfile
    auto_detected: bool
    confidence_score: float
    security_level: str
    performance_tier: str
    last_updated: str
    change_history: List[ConfigurationHistory]


class IntelligentConfigurationManager:
    """Advanced configuration management with intelligent defaults"""

    def __init__(self, config_dir: str = "./configs"):
        """Set up the manager and make sure the configuration directory exists.

        Args:
            config_dir: Directory where ``current_config.json`` is written.
                Missing parent directories are created as well.
        """
        self.config_dir = Path(config_dir)
        # parents=True so a nested config_dir does not raise FileNotFoundError.
        self.config_dir.mkdir(parents=True, exist_ok=True)

        self.auto_configurator = AutoConfigurator()
        self.profiles = self._initialize_profiles()
        self.adaptive_rules = self._initialize_adaptive_rules()

        # Configuration state
        self.current_config: Optional[IntelligentConfig] = None
        self.fallback_configs = self._load_fallback_configs()
        self.performance_metrics = []

    def create_intelligent_configuration(self,
                                         rpc_responses: Optional[Dict[str, Any]] = None,
                                         profile_name: Optional[str] = None,
                                         custom_settings: Optional[Dict[str, Any]] = None) -> IntelligentConfig:
        """Create intelligent configuration with adaptive defaults

        Args:
            rpc_responses: RPC responses handed to the auto-configurator for
                network detection. When empty or None, mainnet is assumed
                with a confidence of 0.3.
            profile_name: Name of a predefined profile. Unknown or missing
                names fall back to a profile chosen from the network.
            custom_settings: Overrides merged into the resulting config;
                keys that are not config fields are ignored.

        Returns:
            The new configuration, which is also stored as the current
            configuration and written to disk.
        """
        # Step 1: Auto-detect network if RPC responses provided
        detection_result = None
        if rpc_responses:
            auto_config = self.auto_configurator.auto_configure(rpc_responses, custom_settings)
            network = BitcoinNetwork(auto_config["network"])
            confidence = auto_config["confidence"]
            detection_result = auto_config
        else:
            # Default to mainnet with low confidence
            network = BitcoinNetwork.MAINNET
            confidence = 0.3

        # Step 2: Select configuration profile
        profile = self._select_profile(profile_name, network)

        # Step 3: Apply intelligent defaults based on profile and detection
        base_config = self._apply_intelligent_defaults(network, profile, detection_result)

        # Step 4: Apply custom overrides
        if custom_settings:
            base_config = self._merge_custom_settings(base_config, custom_settings)

        # Step 5: Determine security and performance levels
        security_level = self._calculate_security_level(base_config, profile)
        performance_tier = self._calculate_performance_tier(base_config, profile)

        # Step 6: Create intelligent config object
        intelligent_config = IntelligentConfig(
            base_config=base_config,
            profile=profile,
            auto_detected=detection_result is not None,
            confidence_score=confidence,
            security_level=security_level,
            performance_tier=performance_tier,
            last_updated=datetime.datetime.now().isoformat(),
            change_history=[]
        )

        # Step 7: Store current config
        self.current_config = intelligent_config
        self._save_configuration(intelligent_config)

        return intelligent_config

    def adaptive_optimization(self, performance_data: Dict[str, Any]) -> IntelligentConfig:
        """Adaptively optimize configuration based on performance data

        Args:
            performance_data: Metrics dict; the keys read are
                ``avg_response_time``, ``error_rate`` and
                ``max_connections_reached``.

        Returns:
            The optimized configuration, with one history entry appended.

        Raises:
            ValueError: If no configuration has been created yet.
        """
        if not self.current_config:
            raise ValueError("No current configuration to optimize")

        # Analyze performance data
        recommendations = self._analyze_performance(performance_data)

        # Apply adaptive rules
        optimized_config = self._apply_adaptive_rules(
            self.current_config.base_config,
            recommendations
        )

        # Create new configuration with history
        history_entry = ConfigurationHistory(
            timestamp=datetime.datetime.now().isoformat(),
            change_type="adaptive_optimization",
            previous_config=asdict(self.current_config.base_config),
            new_config=asdict(optimized_config),
            reason="Performance-based adaptive optimization",
            auto_applied=True
        )

        # Update current config
        new_intelligent_config = IntelligentConfig(
            base_config=optimized_config,
            profile=self.current_config.profile,
            auto_detected=self.current_config.auto_detected,
            confidence_score=self.current_config.confidence_score,
            security_level=self._calculate_security_level(optimized_config, self.current_config.profile),
            performance_tier=self._calculate_performance_tier(optimized_config, self.current_config.profile),
            last_updated=datetime.datetime.now().isoformat(),
            change_history=self.current_config.change_history + [history_entry]
        )

        self.current_config = new_intelligent_config
        self._save_configuration(new_intelligent_config)

        return new_intelligent_config

    def rollback_configuration(self, steps: int = 1) -> IntelligentConfig:
        """Rollback configuration by specified number of changes

        Args:
            steps: Number of history entries to undo. Values larger than the
                history length are clamped to the history length.

        Returns:
            The restored configuration. Its history is the retained entries
            plus one new ``rollback`` entry.

        Raises:
            ValueError: If there is no history to roll back, or if ``steps``
                is less than 1.
        """
        if not self.current_config or not self.current_config.change_history:
            raise ValueError("No configuration history available for rollback")

        # A non-positive step count used to wipe the whole history silently;
        # reject it instead.
        if steps < 1:
            raise ValueError("Rollback steps must be a positive integer")

        history = self.current_config.change_history
        steps = min(steps, len(history))

        # Entries that remain in effect after the rollback
        target_history = history[:-steps]

        # Restore the configuration from the history
        if target_history:
            restored_config_dict = target_history[-1].new_config
        else:
            # Rollback to initial configuration
            restored_config_dict = history[0].previous_config

        # Recreate NetworkConfig from dictionary, the same way the other
        # methods of this class rebuild it from asdict() output.
        restored_config = NetworkConfig(**restored_config_dict)

        # Create rollback entry
        rollback_entry = ConfigurationHistory(
            timestamp=datetime.datetime.now().isoformat(),
            change_type="rollback",
            previous_config=asdict(self.current_config.base_config),
            new_config=asdict(restored_config),
            reason=f"Rollback {steps} steps",
            auto_applied=True
        )

        # Update current config
        rolled_back_config = IntelligentConfig(
            base_config=restored_config,
            profile=self.current_config.profile,
            auto_detected=self.current_config.auto_detected,
            confidence_score=self.current_config.confidence_score,
            security_level=self._calculate_security_level(restored_config, self.current_config.profile),
            performance_tier=self._calculate_performance_tier(restored_config, self.current_config.profile),
            last_updated=datetime.datetime.now().isoformat(),
            change_history=target_history + [rollback_entry]
        )

        self.current_config = rolled_back_config
        self._save_configuration(rolled_back_config)

        return rolled_back_config

    def get_configuration_recommendations(self) -> List[str]:
        """Get intelligent configuration recommendations

        Returns:
            Human-readable suggestions derived from the current
            configuration's security level, performance tier, detection
            confidence and age. A single hint is returned when no
            configuration exists yet.
        """
        if not self.current_config:
            return ["Create a configuration first to get recommendations"]

        recommendations = []

        # Security recommendations
        if self.current_config.security_level == "low":
            recommendations.append("Consider increasing security settings for production use")

        # Performance recommendations
        if self.current_config.performance_tier == "basic":
            recommendations.append("Performance settings could be optimized for better throughput")

        # Network-specific recommendations
        if self.current_config.auto_detected and self.current_config.confidence_score < 0.7:
            recommendations.append("Low confidence in network detection - consider manual verification")

        # Configuration age recommendations
        config_age = self._get_configuration_age()
        if config_age > 30:  # days
            recommendations.append("Configuration is over 30 days old - consider reviewing and updating")

        return recommendations

    def _initialize_profiles(self) -> Dict[str, ConfigurationProfile]:
        """Initialize predefined configuration profiles"""
        return {
            "production_high_security": ConfigurationProfile(
                name="production_high_security",
                description="Production environment with maximum security",
                network="mainnet",
                priority_security=True,
                performance_optimized=False,
                development_mode=False,
                custom_overrides={
                    "rpc_timeout": 30,
                    "max_connections": 8,
                    "enable_whitelist": True,
                    "log_level": "warn"
                }
            ),
            "development": ConfigurationProfile(
                name="development",
                description="Development environment with debugging enabled",
                network="regtest",
                priority_security=False,
                performance_optimized=False,
                development_mode=True,
                custom_overrides={
                    "rpc_timeout": 60,
                    "max_connections": 20,
                    "enable_whitelist": False,
                    "log_level": "debug"
                }
            ),
            "testing": ConfigurationProfile(
                name="testing",
                description="Testing environment with balanced settings",
                network="testnet",
                priority_security=False,
                performance_optimized=True,
                development_mode=False,
                custom_overrides={
                    "rpc_timeout": 15,
                    "max_connections": 15,
                    "enable_whitelist": False,
                    "log_level": "info"
                }
            ),
            "production_balanced": ConfigurationProfile(
                name="production_balanced",
                description="Production with balanced security and performance",
                network="mainnet",
                priority_security=True,
                performance_optimized=True,
                development_mode=False,
                custom_overrides={
                    "rpc_timeout": 20,
                    "max_connections": 12,
                    "enable_whitelist": True,
                    "log_level": "info"
                }
            )
        }

    def _initialize_adaptive_rules(self) -> List[Callable]:
        """Initialize adaptive optimization rules (applied in list order)"""
        return [
            self._rule_connection_timeout,
            self._rule_connection_pool_size,
            self._rule_retry_mechanism,
            self._rule_memory_usage
        ]

    def _select_profile(self, profile_name: Optional[str], network: BitcoinNetwork) -> ConfigurationProfile:
        """Select appropriate configuration profile

        An explicitly named, known profile wins. Otherwise the profile is
        chosen from the network, defaulting to ``production_balanced``.

        Returns:
            A copy of the selected profile with ``network`` set to
            ``network.value``. The templates in ``self.profiles`` are never
            modified, so one call cannot change what another call (or a
            previously returned configuration) sees.
        """
        if profile_name and profile_name in self.profiles:
            selected = self.profiles[profile_name]
        else:
            # Auto-select based on network
            default_profile_by_network = {
                BitcoinNetwork.MAINNET: "production_balanced",
                BitcoinNetwork.TESTNET: "testing",
                BitcoinNetwork.REGTEST: "development",
            }
            selected = self.profiles[
                default_profile_by_network.get(network, "production_balanced")
            ]

        # Network is kept as a string for JSON compatibility.
        return replace(selected, network=network.value)

    def _apply_intelligent_defaults(self,
                                    network: BitcoinNetwork,
                                    profile: ConfigurationProfile,
                                    detection_result: Optional[Dict[str, Any]]) -> NetworkConfig:
        """Apply intelligent defaults based on network and profile

        Starts from ``NETWORK_CONFIGS[network]``, applies the profile
        overrides, then (only when detection confidence is above 0.8) the
        detected settings. Keys that are not config fields are ignored.

        Returns:
            A new ``NetworkConfig``; the shared entry in ``NETWORK_CONFIGS``
            is never mutated.
        """
        # Always work on a copy so the module-level defaults stay pristine.
        config_dict = asdict(NETWORK_CONFIGS[network])

        # Apply profile-specific overrides
        for key, value in (profile.custom_overrides or {}).items():
            if key in config_dict:
                config_dict[key] = value

        # Apply detection-based adjustments
        if detection_result and detection_result.get("confidence", 0) > 0.8:
            # High confidence - use detected settings
            detected_config = detection_result.get("config") or {}
            for key, value in detected_config.items():
                if key in config_dict:
                    config_dict[key] = value

        return NetworkConfig(**config_dict)

    def _merge_custom_settings(self, base_config: NetworkConfig, custom_settings: Dict[str, Any]) -> NetworkConfig:
        """Merge custom settings with base configuration

        Only keys that already exist as config fields are applied; a new
        ``NetworkConfig`` is returned.
        """
        config_dict = asdict(base_config)

        for key, value in custom_settings.items():
            if key in config_dict:
                config_dict[key] = value

        return NetworkConfig(**config_dict)

    def _calculate_security_level(self, config: NetworkConfig, profile: ConfigurationProfile) -> str:
        """Calculate security level based on configuration

        Returns:
            ``"high"`` (score >= 80), ``"medium"`` (>= 60), ``"low"`` (>= 40)
            or ``"minimal"``.
        """
        security_score = 0

        # Network-based security
        if config.rpc_port >= 8332:  # Standard port ranges
            security_score += 20

        # Profile-based security
        if profile.priority_security:
            security_score += 40

        if not profile.development_mode:
            security_score += 20

        # Port-specific security
        if config.rpc_port > 1024:  # Non-privileged port
            security_score += 10

        # Seed node security
        if config.seed_nodes and len(config.seed_nodes) >= 3:
            security_score += 10

        if security_score >= 80:
            return "high"
        elif security_score >= 60:
            return "medium"
        elif security_score >= 40:
            return "low"
        else:
            return "minimal"

    def _calculate_performance_tier(self, config: NetworkConfig, profile: ConfigurationProfile) -> str:
        """Calculate performance tier based on configuration

        Returns:
            ``"high"`` (score >= 80), ``"medium"`` (>= 60), ``"basic"``
            (>= 40) or ``"minimal"``.
        """
        performance_score = 0

        # Profile-based performance
        if profile.performance_optimized:
            performance_score += 40

        # Network-based performance considerations
        if config.rpc_port not in [8332, 18332, 18443]:  # Non-standard port
            performance_score += 10

        # Seed node count affects performance
        if not config.seed_nodes or len(config.seed_nodes) <= 2:
            performance_score += 20

        # Development mode affects performance
        if not profile.development_mode:
            performance_score += 30

        if performance_score >= 80:
            return "high"
        elif performance_score >= 60:
            return "medium"
        elif performance_score >= 40:
            return "basic"
        else:
            return "minimal"

    def _analyze_performance(self, performance_data: Dict[str, Any]) -> Dict[str, Any]:
        """Analyze performance data and generate recommendations

        Returns:
            A dict of recommendation flags (each set to True when triggered).
        """
        recommendations = {}

        # Analyze response times
        avg_response_time = performance_data.get("avg_response_time", 0)
        if avg_response_time > 5000:  # 5 seconds
            recommendations["increase_timeout"] = True
            recommendations["reduce_connections"] = True

        # Analyze error rates
        error_rate = performance_data.get("error_rate", 0)
        if error_rate > 0.05:  # 5%
            recommendations["increase_retries"] = True
            recommendations["add_fallback_nodes"] = True

        # Analyze connection usage
        max_connections = performance_data.get("max_connections_reached", False)
        if max_connections:
            recommendations["increase_connection_pool"] = True

        return recommendations

    def _apply_adaptive_rules(self, config: NetworkConfig, recommendations: Dict[str, Any]) -> NetworkConfig:
        """Apply adaptive rules based on recommendations

        Each rule receives the config as a dict and returns the (possibly
        updated) dict; the result is rebuilt into a ``NetworkConfig``.
        """
        config_dict = asdict(config)

        for rule in self.adaptive_rules:
            config_dict = rule(config_dict, recommendations)

        return NetworkConfig(**config_dict)

    def _rule_connection_timeout(self, config_dict: Dict[str, Any], recommendations: Dict[str, Any]) -> Dict[str, Any]:
        """Adaptive rule for connection timeout (currently a no-op)"""
        if recommendations.get("increase_timeout"):
            # This would need to be added to NetworkConfig - for now, return unchanged
            pass
        return config_dict

    def _rule_connection_pool_size(self, config_dict: Dict[str, Any], recommendations: Dict[str, Any]) -> Dict[str, Any]:
        """Adaptive rule for connection pool size

        When a larger pool is recommended and fewer than 8 seed nodes are
        configured, extra seed nodes are appended. Existing nodes keep their
        position and duplicates are dropped.
        """
        if recommendations.get("increase_connection_pool"):
            # Add more seed nodes if needed
            current_nodes = list(config_dict.get("seed_nodes") or [])
            if len(current_nodes) < 8:
                additional_nodes = [
                    "seed.bitcoin.sipa.be",
                    "dnsseed.bluematt.me",
                    "dnsseed.bitcoin.dashjr.org"
                ]
                # dict.fromkeys de-duplicates while preserving order; a set
                # would reorder nodes differently on every run, which makes
                # the later "keep the first three" truncation arbitrary.
                config_dict["seed_nodes"] = list(dict.fromkeys(current_nodes + additional_nodes))

        return config_dict

    def _rule_retry_mechanism(self, config_dict: Dict[str, Any], recommendations: Dict[str, Any]) -> Dict[str, Any]:
        """Adaptive rule for retry mechanisms (currently a no-op)"""
        if recommendations.get("increase_retries"):
            # This would need to be added to NetworkConfig
            pass
        return config_dict

    def _rule_memory_usage(self, config_dict: Dict[str, Any], recommendations: Dict[str, Any]) -> Dict[str, Any]:
        """Adaptive rule for memory usage

        When fewer connections are recommended, only the first three seed
        nodes are kept.
        """
        if recommendations.get("reduce_connections"):
            # Reduce seed nodes for lower memory usage
            current_nodes = config_dict.get("seed_nodes") or []
            if len(current_nodes) > 3:
                config_dict["seed_nodes"] = current_nodes[:3]

        return config_dict

    def _get_configuration_age(self) -> int:
        """Get age of current configuration in days

        Returns:
            Whole days since ``last_updated``; 0 when there is no current
            configuration or the timestamp cannot be parsed.
        """
        if not self.current_config:
            return 0

        try:
            last_updated = datetime.datetime.fromisoformat(self.current_config.last_updated)
        except (TypeError, ValueError):
            # Missing or malformed timestamp: treat the config as fresh
            # rather than failing the recommendation run.
            return 0

        try:
            return (datetime.datetime.now() - last_updated).days
        except TypeError:
            # Stored timestamp is timezone-aware and cannot be subtracted
            # from the naive "now"; keep the original best-effort result.
            return 0

    def _save_configuration(self, config: IntelligentConfig) -> None:
        """Save configuration to ``current_config.json`` in the config directory"""
        config_file = self.config_dir / "current_config.json"

        config_data = {
            "base_config": asdict(config.base_config),
            "profile": asdict(config.profile),
            "profile_network": config.profile.network,
            "auto_detected": config.auto_detected,
            "confidence_score": config.confidence_score,
            "security_level": config.security_level,
            "performance_tier": config.performance_tier,
            "last_updated": config.last_updated,
            "change_history": [asdict(h) for h in config.change_history]
        }

        with open(config_file, 'w', encoding="utf-8") as f:
            json.dump(config_data, f, indent=2)

    def _load_fallback_configs(self) -> Dict[str, NetworkConfig]:
        """Load fallback configurations for emergencies

        Returns:
            ``NETWORK_CONFIGS`` re-keyed by each network's string value.
        """
        return {network.value: config for network, config in NETWORK_CONFIGS.items()}


def demo_intelligent_configuration():
    """Demonstrate intelligent configuration management"""
    print("=== Intelligent Configuration Management Demo ===\n")

    manager = IntelligentConfigurationManager()

    # Demo 1: Auto-configuration
    print("--- Auto-Configuration Demo ---")
    sample_rpc = {
        "getblockchaininfo": {
            "chain": "main",
            "blocks": 750000
        },
        "getnetworkinfo": {
            "version": 240000,
            "subversion": "/Satoshi:24.0.0/"
        }
    }

    config = manager.create_intelligent_configuration(
        rpc_responses=sample_rpc,
        profile_name="production_balanced"
    )

    print(f"Network: {config.base_config.name}")
    print(f"Auto-Detected: {'✅' if config.auto_detected else '❌'}")
    print(f"Confidence: {config.confidence_score:.2f}")
    print(f"Security Level: {config.security_level}")
    print(f"Performance Tier: {config.performance_tier}")
    print(f"Profile: {config.profile.name}")
    print()

    # Demo 2: Get recommendations
    print("--- Configuration Recommendations ---")
    recommendations = manager.get_configuration_recommendations()
    for rec in recommendations:
        print(f"💡 {rec}")
    print()

    # Demo 3: Adaptive optimization
    print("--- Adaptive Optimization Demo ---")
    performance_data = {
        "avg_response_time": 6000,  # High latency
        "error_rate": 0.08,  # High error rate
        "max_connections_reached": True
    }

    optimized_config = manager.adaptive_optimization(performance_data)
    print(f"Optimized Performance Tier: {optimized_config.performance_tier}")
    print(f"Configuration Changes: {len(optimized_config.change_history)}")
    print()


if __name__ == "__main__":
    demo_intelligent_configuration()