"""
Advanced Φ (Phi) Calculation Algorithms for Integrated Information Theory

This module implements sophisticated algorithms for computing integrated
information, including optimized MIP (minimum-information partition) search,
simulated parallel computation, and approximation methods.

Author: IIT Implementation Team
Version: 1.0
"""

import math
import itertools
import json
import hashlib
import datetime
from typing import Dict, List, Tuple, Optional, Any, Set
from dataclasses import dataclass
from collections import defaultdict

from iit_core import (
    SystemState, Transition, Concept, CauseEffectStructure,
    ProbabilityDistribution, TransitionProbabilityMatrix, Partition
)


@dataclass
class PhiResult:
    """Result of a Φ calculation with detailed metrics."""

    phi_value: float                 # integrated information: min total φ over partitions
    mip: Optional[Partition]         # minimum-information partition; None if none evaluated
    cause_integration: float         # cause-side φ at the MIP
    effect_integration: float        # effect-side φ at the MIP
    computation_time: float          # wall-clock seconds for this computation
    partitions_evaluated: int        # number of candidate partitions scored
    is_approximate: bool = False     # True when a heuristic partition subset was used
    confidence: float = 1.0          # heuristic confidence for approximate results


class AdvancedPhiCalculator:
    """Advanced calculator for Φ with caching, approximation and MIP search.

    All compute_* entry points return a :class:`PhiResult`. Exhaustive and
    approximate results are memoized in ``self.cache`` keyed by mechanism,
    purview, state and method.
    """

    def __init__(self, tpm: TransitionProbabilityMatrix, num_elements: int):
        self.tpm = tpm
        self.num_elements = num_elements
        self.cache: Dict[str, PhiResult] = {}
        self.computation_stats = {
            'total_calculations': 0,
            'cache_hits': 0,
            'partitions_evaluated': 0,
            'approximations_used': 0
        }

    def compute_phi_exhaustive(self, mechanism: Set[int], purview: Set[int],
                               system_state: SystemState) -> PhiResult:
        """
        Exhaustive computation of Φ by evaluating all possible partitions.

        Time Complexity: O(2^(|mechanism| + |purview|))

        Returns a cached PhiResult when the same query was computed before.
        """
        start_time = datetime.datetime.now()

        cache_key = self._get_cache_key(mechanism, purview, system_state, "exhaustive")
        if cache_key in self.cache:
            self.computation_stats['cache_hits'] += 1
            return self.cache[cache_key]

        # Unpartitioned repertoires against which partitioned ones are compared.
        cause_repertoire = self.tpm.get_cause_repertoire(system_state, mechanism, purview)
        effect_repertoire = self.tpm.get_effect_repertoire(system_state, mechanism, purview)

        partitions = self._generate_all_bipartitions(mechanism, purview)
        best_mip, min_total_phi, best_cause_phi, best_effect_phi = \
            self._find_minimum_partition(cause_repertoire, effect_repertoire, partitions)

        computation_time = (datetime.datetime.now() - start_time).total_seconds()
        result = PhiResult(
            phi_value=min_total_phi,
            mip=best_mip,
            cause_integration=best_cause_phi,
            effect_integration=best_effect_phi,
            computation_time=computation_time,
            partitions_evaluated=len(partitions),
            is_approximate=False
        )

        self.cache[cache_key] = result
        self.computation_stats['total_calculations'] += 1
        self.computation_stats['partitions_evaluated'] += len(partitions)
        return result

    def compute_phi_approximate(self, mechanism: Set[int], purview: Set[int],
                                system_state: SystemState,
                                approximation_level: str = "medium") -> PhiResult:
        """
        Approximate Φ computation using heuristics and sampling.

        approximation_level: "low", "medium", "high" - higher means more accurate.
        Any unrecognized level is treated as "high" (same as the original
        if/elif/else fall-through).
        """
        start_time = datetime.datetime.now()

        cache_key = self._get_cache_key(mechanism, purview, system_state,
                                        f"approx_{approximation_level}")
        if cache_key in self.cache:
            self.computation_stats['cache_hits'] += 1
            return self.cache[cache_key]

        cause_repertoire = self.tpm.get_cause_repertoire(system_state, mechanism, purview)
        effect_repertoire = self.tpm.get_effect_repertoire(system_state, mechanism, purview)

        # Heuristic partition set: coarser levels score fewer candidates.
        if approximation_level == "low":
            partitions = self._generate_coarse_partitions(mechanism, purview)
            confidence = 0.7
        elif approximation_level == "medium":
            partitions = self._generate_balanced_partitions(mechanism, purview)
            confidence = 0.85
        else:  # high
            partitions = self._generate_fine_partitions(mechanism, purview)
            confidence = 0.95

        best_mip, min_total_phi, best_cause_phi, best_effect_phi = \
            self._find_minimum_partition(cause_repertoire, effect_repertoire,
                                         partitions, exact=False)

        computation_time = (datetime.datetime.now() - start_time).total_seconds()
        result = PhiResult(
            phi_value=min_total_phi,
            mip=best_mip,
            cause_integration=best_cause_phi,
            effect_integration=best_effect_phi,
            computation_time=computation_time,
            partitions_evaluated=len(partitions),
            is_approximate=True,
            confidence=confidence
        )

        self.cache[cache_key] = result
        self.computation_stats['total_calculations'] += 1
        self.computation_stats['partitions_evaluated'] += len(partitions)
        self.computation_stats['approximations_used'] += 1
        return result

    def compute_phi_parallel(self, mechanism: Set[int], purview: Set[int],
                             system_state: SystemState) -> PhiResult:
        """
        Parallel computation of Φ (simulated for this implementation).

        In a real implementation, this would use multiprocessing or GPU
        acceleration; here the partitions are simply scored sequentially
        and reduced to the minimum. Results are not cached.
        """
        start_time = datetime.datetime.now()

        cause_repertoire = self.tpm.get_cause_repertoire(system_state, mechanism, purview)
        effect_repertoire = self.tpm.get_effect_repertoire(system_state, mechanism, purview)

        partitions = self._generate_all_bipartitions(mechanism, purview)
        # Simulated map + reduce over the partition candidates.
        best_mip, min_total_phi, best_cause_phi, best_effect_phi = \
            self._find_minimum_partition(cause_repertoire, effect_repertoire, partitions)

        computation_time = (datetime.datetime.now() - start_time).total_seconds()
        return PhiResult(
            phi_value=min_total_phi,
            mip=best_mip,
            cause_integration=best_cause_phi,
            effect_integration=best_effect_phi,
            computation_time=computation_time,
            partitions_evaluated=len(partitions),
            is_approximate=False
        )

    def _find_minimum_partition(self, cause_repertoire: ProbabilityDistribution,
                                effect_repertoire: ProbabilityDistribution,
                                partitions: List[Partition],
                                exact: bool = True
                                ) -> Tuple[Optional[Partition], float, float, float]:
        """Score every candidate partition; return the minimum-information one.

        Shared by the exhaustive, approximate and parallel entry points
        (previously this loop was triplicated).

        Returns (best_mip, min_total_phi, best_cause_phi, best_effect_phi).
        When ``partitions`` is empty (e.g. singleton mechanism and purview),
        returns (None, 0.0, 0.0, 0.0) instead of leaking +inf as phi.
        """
        partitioner = (self._compute_partitioned_repertoire if exact
                       else self._compute_approximate_partitioned_repertoire)

        best_mip: Optional[Partition] = None
        min_total_phi = float('inf')
        best_cause_phi = 0.0
        best_effect_phi = 0.0

        for partition in partitions:
            partitioned_cause = partitioner(cause_repertoire, partition, "cause")
            partitioned_effect = partitioner(effect_repertoire, partition, "effect")

            # Integrated information = divergence of the intact repertoire
            # from the partitioned one, summed over cause and effect sides.
            cause_phi = cause_repertoire.kullback_leibler_divergence(partitioned_cause)
            effect_phi = effect_repertoire.kullback_leibler_divergence(partitioned_effect)
            total_phi = cause_phi + effect_phi

            if total_phi < min_total_phi:
                min_total_phi = total_phi
                best_mip = partition
                best_cause_phi = cause_phi
                best_effect_phi = effect_phi

        if best_mip is None:
            min_total_phi = 0.0  # nothing to cut: no measurable integration here

        return best_mip, min_total_phi, best_cause_phi, best_effect_phi

    def _get_cache_key(self, mechanism: Set[int], purview: Set[int],
                       system_state: SystemState, method: str) -> str:
        """Generate an unambiguous cache key for the computation parameters.

        Element ids are sorted numerically and joined with separators: the
        previous separator-less string join mapped e.g. {1, 23} and {12, 3}
        both to "123", so distinct queries could collide in the cache.
        """
        mech_str = ",".join(map(str, sorted(mechanism)))
        pur_str = ",".join(map(str, sorted(purview)))
        state_str = ",".join(map(str, system_state.elements))
        return f"{method}|{mech_str}|{pur_str}|{state_str}"

    def _generate_all_bipartitions(self, mechanism: Set[int],
                                   purview: Set[int]) -> List[Partition]:
        """Generate all candidate bipartitions of mechanism and purview."""
        partitions: List[Partition] = []

        mechanism_partitions = self._bipartition_set(mechanism)
        purview_partitions = self._bipartition_set(purview)

        # Cross-product: cut both the mechanism and the purview.
        for mech_part in mechanism_partitions:
            for pur_part in purview_partitions:
                partitions.append(Partition(
                    mech_part[0], mech_part[1],
                    pur_part[0], pur_part[1]
                ))

        # Single-sided cuts: partition only the mechanism...
        if len(mechanism) > 1:
            for mech_part in mechanism_partitions:
                partitions.append(Partition(
                    mech_part[0], mech_part[1],
                    purview, set()
                ))

        # ...or only the purview.
        if len(purview) > 1:
            for pur_part in purview_partitions:
                partitions.append(Partition(
                    mechanism, set(),
                    pur_part[0], pur_part[1]
                ))

        return partitions

    def _bipartition_set(self, element_set: Set[int]) -> List[Tuple[Set[int], Set[int]]]:
        """Generate all non-trivial bipartitions of a set, each exactly once.

        For even-sized sets the previous version emitted every half-half
        split twice — once as (A, B) and once as (B, A); those mirror
        duplicates are now skipped by keeping only the split whose first
        part contains the smallest element.
        """
        bipartitions: List[Tuple[Set[int], Set[int]]] = []
        element_list = list(element_set)
        if not element_list:
            return bipartitions
        anchor = min(element_list)
        half = len(element_list) // 2

        for k in range(1, half + 1):
            exact_half = (2 * k == len(element_list))
            for subset in itertools.combinations(element_list, k):
                part1 = set(subset)
                # Canonicalize half-half splits to avoid mirror duplicates.
                if exact_half and anchor not in part1:
                    continue
                bipartitions.append((part1, element_set - part1))

        return bipartitions

    def _generate_coarse_partitions(self, mechanism: Set[int],
                                    purview: Set[int]) -> List[Partition]:
        """Generate coarse partitions for low-accuracy approximation.

        Only tries one balanced split of the mechanism and one of the purview.
        """
        partitions: List[Partition] = []

        if len(mechanism) > 1:
            mid = len(mechanism) // 2
            mech_list = list(mechanism)
            partitions.append(Partition(
                set(mech_list[:mid]), set(mech_list[mid:]),
                purview, set()
            ))

        if len(purview) > 1:
            mid = len(purview) // 2
            pur_list = list(purview)
            partitions.append(Partition(
                mechanism, set(),
                set(pur_list[:mid]), set(pur_list[mid:])
            ))

        return partitions

    def _generate_balanced_partitions(self, mechanism: Set[int],
                                      purview: Set[int]) -> List[Partition]:
        """Generate balanced partitions for medium-accuracy approximation.

        Tries splits at 30/50/70% plus every single-element mechanism cut.
        Duplicate split indices (which arise for small sets) are skipped.
        """
        partitions: List[Partition] = []

        if len(mechanism) > 1:
            mech_list = list(mechanism)
            seen_mech_splits = set()
            for split_ratio in (0.3, 0.5, 0.7):
                split_idx = int(len(mech_list) * split_ratio)
                if 0 < split_idx < len(mech_list) and split_idx not in seen_mech_splits:
                    seen_mech_splits.add(split_idx)
                    partitions.append(Partition(
                        set(mech_list[:split_idx]), set(mech_list[split_idx:]),
                        purview, set()
                    ))

        if len(purview) > 1:
            pur_list = list(purview)
            seen_pur_splits = set()
            for split_ratio in (0.3, 0.5, 0.7):
                split_idx = int(len(pur_list) * split_ratio)
                if 0 < split_idx < len(pur_list) and split_idx not in seen_pur_splits:
                    seen_pur_splits.add(split_idx)
                    partitions.append(Partition(
                        mechanism, set(),
                        set(pur_list[:split_idx]), set(pur_list[split_idx:])
                    ))

        # Single-element mechanism cuts (kept unconditional, as before).
        for elem in mechanism:
            partitions.append(Partition(
                {elem}, mechanism - {elem},
                purview, set()
            ))

        return partitions

    def _generate_fine_partitions(self, mechanism: Set[int],
                                  purview: Set[int]) -> List[Partition]:
        """Generate fine partitions for high-accuracy approximation.

        Filters the exhaustive candidate set down to near-balanced
        partitions; falls back to the balanced heuristic when the
        filter removes everything.
        """
        partitions: List[Partition] = []
        all_partitions = self._generate_all_bipartitions(mechanism, purview)

        for partition in all_partitions:
            # Heuristic: prefer partitions whose two halves are close in size.
            mech_balance = abs(len(partition.mechanism_1) - len(partition.mechanism_2))
            pur_balance = abs(len(partition.purview_1) - len(partition.purview_2))
            if mech_balance <= 2 and pur_balance <= 2:
                partitions.append(partition)

        if not partitions:
            partitions = self._generate_balanced_partitions(mechanism, purview)

        return partitions

    def _compute_partitioned_repertoire(self, original: ProbabilityDistribution,
                                        partition: Partition,
                                        repertoire_type: str) -> ProbabilityDistribution:
        """Compute the repertoire under a partition with the exact method.

        Simplified implementation: when the relevant side (mechanism for
        cause, purview for effect) is actually cut in two, every event's
        probability is scaled by 0.8 and the distribution renormalized.
        """
        if repertoire_type == "cause":
            is_cut = bool(partition.mechanism_1) and bool(partition.mechanism_2)
        else:  # effect
            is_cut = bool(partition.purview_1) and bool(partition.purview_2)

        scale = 0.8 if is_cut else 1.0  # reduction due to the partition
        partitioned_dist = {event: prob * scale
                            for event, prob in original.distribution.items()}

        total = sum(partitioned_dist.values())
        if total > 0:
            partitioned_dist = {k: v / total for k, v in partitioned_dist.items()}

        return ProbabilityDistribution(partitioned_dist)

    def _compute_approximate_partitioned_repertoire(
            self, original: ProbabilityDistribution,
            partition: Partition,
            repertoire_type: str) -> ProbabilityDistribution:
        """Compute the repertoire under a partition with the fast approximation.

        Applies a uniform 0.9 scale (approximate independence) and
        renormalizes; a uniform scale followed by normalization leaves the
        distribution unchanged, matching the original implementation.
        """
        scale_factor = 0.9
        partitioned_dist = {event: prob * scale_factor
                            for event, prob in original.distribution.items()}

        total = sum(partitioned_dist.values())
        if total > 0:
            partitioned_dist = {k: v / total for k, v in partitioned_dist.items()}

        return ProbabilityDistribution(partitioned_dist)

    def get_computation_stats(self) -> Dict[str, Any]:
        """Get statistics about computations performed.

        ``cache_hit_rate`` is hits / (hits + misses). The previous
        hits / total_calculations ratio could exceed 1.0 because cache
        hits do not increment total_calculations.
        """
        lookups = (self.computation_stats['cache_hits']
                   + self.computation_stats['total_calculations'])
        return {
            **self.computation_stats,
            'cache_size': len(self.cache),
            'cache_hit_rate': self.computation_stats['cache_hits'] / max(lookups, 1)
        }

    def clear_cache(self):
        """Clear the computation cache and reset all counters."""
        self.cache.clear()
        self.computation_stats = {
            'total_calculations': 0,
            'cache_hits': 0,
            'partitions_evaluated': 0,
            'approximations_used': 0
        }
class PhiAnalyzer:
    """Analyzes Φ values across mechanisms and system states.

    Delegates all Φ computations to an :class:`AdvancedPhiCalculator`
    (exhaustive mode) and aggregates the results.
    """

    def __init__(self, calculator: "AdvancedPhiCalculator"):
        self.calculator = calculator

    def analyze_mechanism_power(self, mechanisms: List[Set[int]],
                                purview: Set[int],
                                system_state: "SystemState") -> Dict[str, Any]:
        """Analyze the causal power of different mechanisms.

        Returns per-mechanism Φ metrics (keyed by ``str(mechanism)``),
        the strongest mechanism, and the average Φ (0 for empty input).
        """
        results: Dict[str, Dict[str, float]] = {}
        for mechanism in mechanisms:
            phi_result = self.calculator.compute_phi_exhaustive(
                mechanism, purview, system_state)
            # NOTE(review): str(set) ordering is not guaranteed stable across
            # interpreter runs; kept as the key for backward compatibility.
            results[str(mechanism)] = {
                'phi': phi_result.phi_value,
                'cause_integration': phi_result.cause_integration,
                'effect_integration': phi_result.effect_integration,
                'computation_time': phi_result.computation_time
            }

        # Rank mechanisms by Φ, strongest first.
        sorted_mechanisms = sorted(results.items(),
                                   key=lambda item: item[1]['phi'],
                                   reverse=True)

        return {
            'mechanism_results': dict(sorted_mechanisms),
            'strongest_mechanism': sorted_mechanisms[0][0] if sorted_mechanisms else None,
            'average_phi': (sum(r['phi'] for r in results.values()) / len(results)
                            if results else 0)
        }

    def analyze_state_transitions(self, states: List["SystemState"],
                                  mechanism: Set[int],
                                  purview: Set[int]) -> Dict[str, Any]:
        """Analyze how Φ changes across a sequence of system states.

        Returns the per-state Φ values, the per-transition Φ deltas, and
        summary statistics (max/min/variance; all 0 for empty input).
        """
        phi_values: List[float] = []
        transition_analysis: List[Dict[str, Any]] = []

        for i, state in enumerate(states):
            phi_result = self.calculator.compute_phi_exhaustive(
                mechanism, purview, state)
            phi_values.append(phi_result.phi_value)

            if i > 0:
                # Delta of Φ for the transition from the previous state.
                phi_change = phi_values[-1] - phi_values[-2]
                transition_analysis.append({
                    'from_state': i - 1,
                    'to_state': i,
                    'phi_change': phi_change,
                    'abs_change': abs(phi_change)
                })

        return {
            'phi_values': phi_values,
            'state_analysis': transition_analysis,
            'max_phi': max(phi_values) if phi_values else 0,
            'min_phi': min(phi_values) if phi_values else 0,
            'phi_variance': self._compute_variance(phi_values) if phi_values else 0
        }

    def _compute_variance(self, values: List[float]) -> float:
        """Compute the population variance of *values* (0 for empty input)."""
        if not values:
            return 0
        mean = sum(values) / len(values)
        return sum((x - mean) ** 2 for x in values) / len(values)


if __name__ == "__main__":
    # Example usage and smoke test for the calculator.
    print("Advanced Φ Calculator - Example Usage")
    print("=" * 50)

    # Create a simple two-element test system.
    tpm = TransitionProbabilityMatrix(2)
    calculator = AdvancedPhiCalculator(tpm, 2)

    mechanism = {0, 1}
    purview = {0, 1}
    test_state = SystemState((0, 1), 0.5)

    # Exhaustive computation
    print("Exhaustive Φ computation:")
    result_exhaustive = calculator.compute_phi_exhaustive(mechanism, purview, test_state)
    print(f"  Φ: {result_exhaustive.phi_value:.4f}")
    print(f"  Partitions evaluated: {result_exhaustive.partitions_evaluated}")
    print(f"  Computation time: {result_exhaustive.computation_time:.4f}s")

    # Approximate computation
    print("\nApproximate Φ computation (medium):")
    result_approx = calculator.compute_phi_approximate(mechanism, purview, test_state, "medium")
    print(f"  Φ: {result_approx.phi_value:.4f}")
    print(f"  Confidence: {result_approx.confidence:.2f}")
    print(f"  Computation time: {result_approx.computation_time:.4f}s")

    # Computation statistics
    print("\nComputation Statistics:")
    stats = calculator.get_computation_stats()
    for key, value in stats.items():
        print(f"  {key}: {value}")

    print("\nAdvanced Φ calculator initialized successfully!")