#!/usr/bin/env python3
"""
Causal Power Analysis with Perturbation Methods

This module implements causal power calculations using perturbation
theory and intervention analysis for IIT systems.

Author: IIT Implementation Team
Version: 1.0
"""

import math
import itertools
import collections
import time
import zlib
from typing import Dict, List, Tuple, Optional, Set, Any, Callable
from dataclasses import dataclass
import json
import datetime

from iit_core import (
    IITCalculator, SystemState, Concept, CauseEffectStructure,
    ProbabilityDistribution, TransitionProbabilityMatrix
)


@dataclass
class PerturbationResult:
    """Results from perturbation analysis."""

    perturbation_type: str
    perturbed_system: 'PerturbedSystem'
    causal_power: float
    intervention_strength: float
    recovery_time: float
    stability_metric: float


@dataclass
class Intervention:
    """Represents an intervention in the system."""

    target_elements: Set[int]
    intervention_type: str  # 'clamp', 'noise', 'lesion'
    intervention_strength: float
    duration: int


class PerturbedSystem:
    """System under perturbation with modified dynamics.

    On construction the original TPM is copied transition by transition
    into ``perturbed_tpm``, with each probability rewritten according to
    the intervention type.
    """

    def __init__(self, original_tpm: TransitionProbabilityMatrix,
                 perturbation: Intervention):
        self.original_tpm = original_tpm
        self.perturbation = perturbation
        self.perturbed_tpm = self._apply_perturbation(original_tpm, perturbation)
        # Accumulates every trajectory produced by simulate_dynamics().
        self.history = []

    def _apply_perturbation(self, tpm: TransitionProbabilityMatrix,
                            intervention: Intervention) -> TransitionProbabilityMatrix:
        """Build a new TPM with every transition probability perturbed.

        Args:
            tpm: Source matrix; its ``transitions`` mapping is iterated.
            intervention: Selects the perturbation rule via
                ``intervention_type``. Unknown types copy probabilities
                through unchanged.

        Returns:
            A fresh ``TransitionProbabilityMatrix`` with the same
            ``num_elements`` and the rewritten probabilities.
        """
        handlers = {
            'clamp': self._apply_clamp_perturbation,
            'noise': self._apply_noise_perturbation,
            'lesion': self._apply_lesion_perturbation,
        }
        handler = handlers.get(intervention.intervention_type)

        perturbed_tpm = TransitionProbabilityMatrix(tpm.num_elements)
        for (from_state, to_state), prob in tpm.transitions.items():
            if handler is None:
                new_prob = prob
            else:
                new_prob = handler(from_state, to_state, prob, intervention)
            perturbed_tpm.add_transition(from_state, to_state, new_prob)
        return perturbed_tpm

    def _apply_clamp_perturbation(self, from_state: SystemState,
                                  to_state: SystemState,
                                  original_prob: float,
                                  intervention: Intervention) -> float:
        """Apply a clamping intervention to one transition probability.

        Target elements are clamped to 0: any transition whose destination
        has a non-zero value at a (valid) target index gets probability
        0.0; all other transitions keep ``original_prob``.
        """
        for elem in intervention.target_elements:
            # Targets beyond the state's width are ignored.
            if elem < len(to_state.elements) and to_state.elements[elem] != 0:
                return 0.0
        return original_prob

    def _apply_noise_perturbation(self, from_state: SystemState,
                                  to_state: SystemState,
                                  original_prob: float,
                                  intervention: Intervention) -> float:
        """Apply a noise perturbation to one transition probability.

        For each valid target element a pseudo-random bucket in [0, 100)
        is derived from the transition's element tuples and the element
        index. If any bucket falls below ``intervention_strength * 100``
        the probability is flipped to ``1 - original_prob``; otherwise the
        probability is blended towards 0.5 by the noise level.

        The bucket uses CRC32 rather than the built-in ``hash()``: string
        hashes are salted per interpreter process, which would make the
        perturbed TPM differ from run to run.
        """
        noise_level = intervention.intervention_strength
        threshold = noise_level * 100

        for elem in intervention.target_elements:
            if elem < len(to_state.elements):
                key = repr((from_state.elements, to_state.elements, elem))
                bucket = zlib.crc32(key.encode("utf-8")) % 100
                if bucket < threshold:
                    return 1.0 - original_prob

        return original_prob * (1 - noise_level) + noise_level * 0.5

    def _apply_lesion_perturbation(self, from_state: SystemState,
                                   to_state: SystemState,
                                   original_prob: float,
                                   intervention: Intervention) -> float:
        """Apply a lesion (disconnection) perturbation.

        Returns 0.0 as soon as any target index is within the width of
        either state; otherwise returns ``original_prob``.
        """
        # NOTE(review): as written this zeroes EVERY transition whenever at
        # least one target index is valid, not only the transitions that
        # involve the lesioned element. Confirm this is the intended model.
        for elem in intervention.target_elements:
            if elem < len(from_state.elements) or elem < len(to_state.elements):
                return 0.0  # Remove connection
        return original_prob

    def simulate_dynamics(self, initial_state: SystemState,
                          steps: int) -> List[SystemState]:
        """Simulate system dynamics under the perturbed TPM.

        Args:
            initial_state: Starting state; always the first trajectory entry.
            steps: Maximum number of transitions to follow.

        Returns:
            The visited states, starting with ``initial_state``. The
            trajectory ends early if no outgoing transition exists. The
            trajectory is also appended to ``self.history``.
        """
        current_state = initial_state
        trajectory = [current_state]

        for _ in range(steps):
            next_state = self._get_next_state(current_state)
            # Compare against None explicitly so that a state object which
            # happens to be falsy does not terminate the simulation.
            if next_state is None:
                break
            trajectory.append(next_state)
            current_state = next_state

        self.history.extend(trajectory)
        return trajectory

    def _get_next_state(self, current_state: SystemState) -> Optional[SystemState]:
        """Return the most probable successor of ``current_state``.

        Selection is deterministic: the highest-probability outgoing
        transition in the perturbed TPM wins, with ties resolved in favour
        of the transition encountered first. Returns ``None`` when the
        state has no outgoing transitions.
        """
        possible_transitions = [
            (to_state, prob)
            for (from_state, to_state), prob in self.perturbed_tpm.transitions.items()
            if from_state.elements == current_state.elements
        ]
        if not possible_transitions:
            return None

        best_state, _ = max(possible_transitions, key=lambda pair: pair[1])
        return best_state


class CausalPowerAnalyzer:
    """Advanced causal power analysis using perturbation methods."""

    def __init__(self, calculator: IITCalculator):
        self.calculator = calculator
        self.perturbation_results = {}
        self.intervention_history = []

    def compute_causal_power_matrix(self, system_state: SystemState) -> Dict[Tuple[int, int], float]:
        """Compute the pairwise causal power matrix.

        Returns:
            Mapping where entry ``(i, j)`` (``i != j``) is the causal power
            of element ``i`` on element ``j``.
        """
        num_elements = self.calculator.num_elements
        return {
            (source, target): self._compute_pairwise_causal_power(
                source, target, system_state
            )
            for source, target in itertools.permutations(range(num_elements), 2)
        }

    def _compute_pairwise_causal_power(self, source: int, target: int,
                                       system_state: SystemState) -> float:
        """Compute causal power of a specific source-target pair.

        The source element is clamped, and the causal power is the KL
        divergence between the effect repertoire of the original TPM and
        that of the clamped TPM.
        """
        intervention = Intervention(
            target_elements={source},
            intervention_type='clamp',
            intervention_strength=1.0,
            duration=1
        )
        perturbed_system = PerturbedSystem(self.calculator.tpm, intervention)

        baseline_repertoire = self.calculator.tpm.get_effect_repertoire(
            system_state, {source}, {target}
        )
        perturbed_repertoire = perturbed_system.perturbed_tpm.get_effect_repertoire(
            system_state, {source}, {target}
        )

        return baseline_repertoire.kullback_leibler_divergence(perturbed_repertoire)

    def analyze_system_resilience(self, system_state: SystemState,
                                  perturbation_types: Optional[List[str]] = None) -> Dict[str, float]:
        """Analyze system resilience to different types of perturbations.

        Args:
            system_state: State the simulations start from.
            perturbation_types: Intervention types to test. Defaults to
                ``['clamp', 'noise', 'lesion']`` when ``None``.

        Returns:
            Mapping from perturbation type to the resilience score
            averaged over the intensities 0.1, 0.3, 0.5, 0.7 and 0.9.
        """
        # Only fall back to the defaults when the caller passed nothing;
        # an explicit list must be honoured.
        if perturbation_types is None:
            perturbation_types = ['clamp', 'noise', 'lesion']

        intensities = [0.1, 0.3, 0.5, 0.7, 0.9]
        resilience_scores = {}

        for pert_type in perturbation_types:
            intensity_scores = []
            for intensity in intensities:
                intervention = Intervention(
                    target_elements=set(range(self.calculator.num_elements)),
                    intervention_type=pert_type,
                    intervention_strength=intensity,
                    duration=5
                )
                perturbed_system = PerturbedSystem(self.calculator.tpm, intervention)

                # Simulate recovery from a copy of the state with weight 1.0.
                initial_perturbed = SystemState(system_state.elements, 1.0)
                trajectory = perturbed_system.simulate_dynamics(initial_perturbed, 10)

                intensity_scores.append(
                    self._calculate_resilience_metric(system_state, trajectory)
                )

            resilience_scores[pert_type] = sum(intensity_scores) / len(intensity_scores)

        return resilience_scores

    def _calculate_resilience_metric(self, original_state: SystemState,
                                     trajectory: List[SystemState]) -> float:
        """Calculate a resilience score from a trajectory.

        The score is ``1 / (1 + recovery_time)``, where the recovery time
        comes from ``_compute_recovery_time`` (index of the first state
        matching ``original_state``, or the trajectory length when it
        never matches). An empty trajectory scores 0.0.

        Sharing the helper keeps the two recovery definitions consistent
        and avoids treating a match at index 0 as "never recovered".
        """
        if not trajectory:
            return 0.0

        recovery_time = self._compute_recovery_time(original_state, trajectory)
        return 1.0 / (1.0 + recovery_time)

    def find_critical_elements(self, system_state: SystemState,
                               threshold: float = 0.5) -> List[int]:
        """Identify elements critical for system integration.

        Each element is lesioned in turn; an element is critical when the
        relative drop in total phi exceeds ``threshold``. When the
        unperturbed total phi is not positive the relative drop is taken
        to be 0.
        """
        critical_elements = []
        original_concepts = self.calculator.compute_concepts(system_state)

        for element in range(self.calculator.num_elements):
            intervention = Intervention(
                target_elements={element},
                intervention_type='lesion',
                intervention_strength=1.0,
                duration=10
            )
            perturbed_system = PerturbedSystem(self.calculator.tpm, intervention)
            perturbed_concepts = self._compute_perturbed_concepts(
                perturbed_system, system_state
            )

            baseline_phi = original_concepts.total_phi
            if baseline_phi > 0:
                phi_reduction = (baseline_phi - perturbed_concepts.total_phi) / baseline_phi
            else:
                phi_reduction = 0

            if phi_reduction > threshold:
                critical_elements.append(element)

        return critical_elements

    def _compute_perturbed_concepts(self, perturbed_system: PerturbedSystem,
                                    system_state: SystemState) -> CauseEffectStructure:
        """Compute concepts using the perturbed system's TPM.

        A temporary calculator of the same size is created and its TPM is
        replaced with the perturbed one.
        """
        temp_calculator = IITCalculator(self.calculator.num_elements)
        temp_calculator.tpm = perturbed_system.perturbed_tpm
        return temp_calculator.compute_concepts(system_state)

    def compute_intervention_effects(self, system_state: SystemState,
                                     interventions: List[Intervention]) -> List[PerturbationResult]:
        """Analyze the effects of multiple interventions.

        For each intervention the perturbed dynamics are simulated for
        ``intervention.duration`` steps and summarised as a
        ``PerturbationResult`` (causal power, stability, recovery time).
        """
        results = []

        for intervention in interventions:
            perturbed_system = PerturbedSystem(self.calculator.tpm, intervention)
            trajectory = perturbed_system.simulate_dynamics(
                system_state, intervention.duration
            )

            causal_power = self._compute_intervention_causal_power(
                system_state, perturbed_system, intervention
            )
            stability = self._compute_stability_metric(trajectory)
            recovery_time = self._compute_recovery_time(system_state, trajectory)

            results.append(PerturbationResult(
                perturbation_type=intervention.intervention_type,
                perturbed_system=perturbed_system,
                causal_power=causal_power,
                intervention_strength=intervention.intervention_strength,
                recovery_time=recovery_time,
                stability_metric=stability
            ))

        return results

    def _compute_intervention_causal_power(self, system_state: SystemState,
                                           perturbed_system: PerturbedSystem,
                                           intervention: Intervention) -> float:
        """Compute causal power of an intervention.

        Defined as the absolute difference in total phi between the
        original and the perturbed system.
        """
        original_concepts = self.calculator.compute_concepts(system_state)
        perturbed_concepts = self._compute_perturbed_concepts(perturbed_system, system_state)
        return abs(original_concepts.total_phi - perturbed_concepts.total_phi)

    def _compute_stability_metric(self, trajectory: List[SystemState]) -> float:
        """Compute the fraction of steps in which the state did not change.

        Trajectories with fewer than two states are fully stable (1.0).
        """
        if len(trajectory) < 2:
            return 1.0

        state_changes = sum(
            1
            for previous, current in zip(trajectory, trajectory[1:])
            if current.elements != previous.elements
        )
        return 1.0 - (state_changes / (len(trajectory) - 1))

    def _compute_recovery_time(self, original_state: SystemState,
                               trajectory: List[SystemState]) -> float:
        """Return the index of the first state equal to ``original_state``.

        If no state matches, the trajectory length is returned to signal
        that no recovery was observed.
        """
        for i, state in enumerate(trajectory):
            if state.elements == original_state.elements:
                return float(i)
        return float(len(trajectory))  # No recovery observed


class PerturbationOptimizer:
    """Optimization of perturbation strategies for maximum effect."""

    def __init__(self, analyzer: CausalPowerAnalyzer):
        self.analyzer = analyzer
        self.optimization_cache = {}

    def find_optimal_intervention(self, system_state: SystemState,
                                  objective: str = 'maximize_causal_power',
                                  constraints: Optional[Dict[str, Any]] = None) -> Optional[Intervention]:
        """Find the best intervention for an objective under constraints.

        Args:
            system_state: State the candidates are evaluated against.
            objective: ``'maximize_causal_power'`` or
                ``'minimize_disruption'``. Any other value selects nothing.
            constraints: Optional ``max_elements`` / ``max_strength``
                limits. Defaults to ``{'max_elements': 3,
                'max_strength': 0.8}``.

        Returns:
            The first candidate with the best score, or ``None`` when no
            candidate qualifies.
        """
        if constraints is None:
            constraints = {'max_elements': 3, 'max_strength': 0.8}

        maximizing = objective == 'maximize_causal_power'
        minimizing = objective == 'minimize_disruption'

        best_intervention = None
        best_score = float('-inf') if maximizing else float('inf')

        num_elements = self.analyzer.calculator.num_elements
        candidates = self._generate_candidate_interventions(num_elements, constraints)

        for candidate in candidates:
            score = self._evaluate_intervention(candidate, system_state, objective)
            # Strict comparisons keep the earliest candidate on ties.
            if (maximizing and score > best_score) or (minimizing and score < best_score):
                best_score = score
                best_intervention = candidate

        return best_intervention

    def _generate_candidate_interventions(self, num_elements: int,
                                          constraints: Dict[str, Any]) -> List[Intervention]:
        """Generate candidate interventions within constraints.

        Candidates cover every combination of up to ``max_elements``
        targets, each intervention type, and each allowed strength. The
        strengths are the grid points 0.2, 0.5 and 0.8 that do not exceed
        ``max_strength``, plus ``max_strength`` itself, without duplicates.
        """
        intervention_types = ['clamp', 'noise', 'lesion']
        max_elements = constraints.get('max_elements', num_elements)
        max_strength = constraints.get('max_strength', 1.0)

        # Respect the cap for the fixed grid and avoid evaluating the same
        # strength twice when the cap coincides with a grid point.
        strengths = sorted(
            {s for s in (0.2, 0.5, 0.8) if s <= max_strength} | {max_strength}
        )

        candidates = []
        largest_target_size = min(max_elements, num_elements)
        for target_size in range(1, largest_target_size + 1):
            for targets in itertools.combinations(range(num_elements), target_size):
                for int_type in intervention_types:
                    for strength in strengths:
                        candidates.append(Intervention(
                            # Fresh set per candidate so that mutating one
                            # intervention cannot affect the others.
                            target_elements=set(targets),
                            intervention_type=int_type,
                            intervention_strength=strength,
                            duration=5
                        ))

        return candidates

    def _evaluate_intervention(self, intervention: Intervention,
                               system_state: SystemState,
                               objective: str) -> float:
        """Score an intervention for the given objective.

        Both supported objectives use the same score, the absolute change
        in total phi caused by the intervention; they differ only in
        whether the caller maximizes or minimizes it. Unknown objectives
        score 0.0.
        """
        if objective not in ('maximize_causal_power', 'minimize_disruption'):
            return 0.0

        calculator = self.analyzer.calculator
        perturbed_system = PerturbedSystem(calculator.tpm, intervention)

        original_concepts = calculator.compute_concepts(system_state)
        perturbed_concepts = self.analyzer._compute_perturbed_concepts(
            perturbed_system, system_state
        )
        return abs(original_concepts.total_phi - perturbed_concepts.total_phi)


def benchmark_causal_power_analysis(num_elements: int = 3) -> Dict[str, Any]:
    """Benchmark causal power analysis performance.

    Builds a test system of ``num_elements`` elements, runs each analysis
    once on the all-ones state and records its elapsed time.

    Returns:
        Dict with timing entries (``*_time``, in seconds) and the
        corresponding results or result counts.
    """
    print(f"Benchmarking Causal Power Analysis for {num_elements} elements")

    calculator = IITCalculator(num_elements)
    setup_test_tpm(calculator)
    analyzer = CausalPowerAnalyzer(calculator)

    test_state = SystemState(tuple([1] * num_elements), 1.0)

    results = {}

    # perf_counter is monotonic and high-resolution, unlike wall-clock time.
    start_time = time.perf_counter()
    causal_matrix = analyzer.compute_causal_power_matrix(test_state)
    results['causal_matrix_time'] = time.perf_counter() - start_time
    results['causal_matrix_size'] = len(causal_matrix)

    start_time = time.perf_counter()
    resilience = analyzer.analyze_system_resilience(test_state)
    results['resilience_time'] = time.perf_counter() - start_time
    results['resilience_scores'] = resilience

    start_time = time.perf_counter()
    critical = analyzer.find_critical_elements(test_state)
    results['critical_elements_time'] = time.perf_counter() - start_time
    results['critical_elements'] = critical

    test_interventions = [
        Intervention({0}, 'clamp', 0.5, 3),
        Intervention({1, 2}, 'noise', 0.3, 5),
        Intervention({0, 1, 2}, 'lesion', 1.0, 2)
    ]

    start_time = time.perf_counter()
    intervention_results = analyzer.compute_intervention_effects(
        test_state, test_interventions
    )
    results['intervention_time'] = time.perf_counter() - start_time
    results['intervention_count'] = len(intervention_results)

    return results


def setup_test_tpm(calculator: IITCalculator):
    """Populate ``calculator.tpm`` with a test TPM for benchmarking.

    Every binary state gets a primary transition (probability 0.7) to the
    next state in enumeration order, plus secondary transitions to every
    other even-indexed state with probability ``0.3 / (num_states - 1)``.
    """
    num_states = 2 ** calculator.num_elements
    state_prob = 1.0 / num_states
    states = [
        SystemState(bits, state_prob)
        for bits in itertools.product([0, 1], repeat=calculator.num_elements)
    ]

    secondary_prob = 0.3 / (len(states) - 1)

    # NOTE(review): outgoing probabilities of a state do not sum to 1, and
    # for odd i the primary target is even-indexed, so add_transition is
    # called twice for that pair. Whether the second call overwrites or
    # accumulates depends on add_transition - confirm this is intended.
    for i, from_state in enumerate(states):
        # Primary transition
        to_state = states[(i + 1) % len(states)]
        calculator.tpm.add_transition(from_state, to_state, 0.7)

        # Secondary transitions
        for j, other_state in enumerate(states):
            if i != j and j % 2 == 0:
                calculator.tpm.add_transition(from_state, other_state, secondary_prob)


def main():
    """Run a demonstration of the analyses on a 3-element test system."""
    print("Causal Power Analysis with Perturbation Methods")
    print("=" * 50)

    # Test with 3-element system
    calculator = IITCalculator(num_elements=3)
    setup_test_tpm(calculator)
    analyzer = CausalPowerAnalyzer(calculator)

    test_state = SystemState((1, 0, 1), 1.0)
    print(f"Analyzing causal power for state: {test_state.elements}")

    causal_matrix = analyzer.compute_causal_power_matrix(test_state)
    print("\nCausal Power Matrix:")
    for (source, target), power in causal_matrix.items():
        print(f" Element {source} → Element {target}: {power:.6f}")

    resilience = analyzer.analyze_system_resilience(test_state)
    print("\nResilience Scores:")
    for pert_type, score in resilience.items():
        print(f" {pert_type}: {score:.6f}")

    critical = analyzer.find_critical_elements(test_state)
    print(f"\nCritical Elements: {critical}")

    test_interventions = [
        Intervention({0}, 'clamp', 0.5, 3),
        Intervention({1}, 'noise', 0.3, 5)
    ]
    intervention_results = analyzer.compute_intervention_effects(
        test_state, test_interventions
    )
    print("\nIntervention Effects:")
    for i, result in enumerate(intervention_results):
        print(f" Intervention {i+1} ({result.perturbation_type}):")
        print(f" Causal Power: {result.causal_power:.6f}")
        print(f" Stability: {result.stability_metric:.6f}")
        print(f" Recovery Time: {result.recovery_time:.1f}")

    print("\nBenchmarking...")
    benchmark_results = benchmark_causal_power_analysis(3)
    print(f"Causal matrix computation: {benchmark_results['causal_matrix_time']:.6f}s")
    print(f"Resilience analysis: {benchmark_results['resilience_time']:.6f}s")
    print(f"Critical elements analysis: {benchmark_results['critical_elements_time']:.6f}s")

    print("\nCausal power analysis initialized successfully!")


if __name__ == "__main__":
    main()