#!/usr/bin/env python3
"""
Information Integration Theory (IIT) Core Library

This module implements the mathematical foundation for computing integrated
information Φ, causal structure analysis, and Minimum Information Partition
(MIP) optimization.

Author: IIT Implementation Team
Version: 1.0
"""

import math
import itertools
import collections
import dataclasses
from typing import Dict, List, Tuple, Optional, Any, Set, Callable
import json
import hashlib
import datetime


@dataclasses.dataclass
class SystemState:
    """A state in a dynamical system.

    Attributes:
        elements: per-element binary values, e.g. ``(0, 1)``.
        probability: probability mass assigned to this state.
        timestamp: optional observation time.
    """
    elements: Tuple[int, ...]
    probability: float
    timestamp: Optional[datetime.datetime] = None

    def __hash__(self) -> int:
        # Hash only the element configuration so states with equal
        # configurations collide regardless of probability/timestamp.
        # NOTE(review): the generated dataclass __eq__ still compares all
        # fields, so hash-equal states may compare unequal — confirm intended.
        return hash(self.elements)

    def __len__(self) -> int:
        return len(self.elements)


@dataclasses.dataclass
class Transition:
    """A transition between two system states with its probability."""
    from_state: SystemState
    to_state: SystemState
    probability: float
    mechanism: Optional[Set[int]] = None  # elements mediating the transition

    def __hash__(self) -> int:
        # Identity is the (from, to) state pair.
        return hash((self.from_state, self.to_state))


@dataclasses.dataclass
class Concept:
    """A concept: a mechanism with its cause/effect repertoires and φ."""
    cause_repertoire: Dict[Tuple[int, ...], float]
    effect_repertoire: Dict[Tuple[int, ...], float]
    phi: float
    mechanism: Set[int]
    purview: Set[int]

    def __hash__(self) -> int:
        # Mechanism/purview identify a concept; frozenset makes the
        # (unordered) sets hashable.
        return hash((frozenset(self.mechanism), frozenset(self.purview)))


@dataclasses.dataclass
class CauseEffectStructure:
    """The complete cause-effect structure (set of concepts) of a system."""
    concepts: Set[Concept]
    total_phi: float
    normalized_phi: float

    def __len__(self) -> int:
        return len(self.concepts)


class ProbabilityDistribution:
    """A discrete probability distribution with information-theoretic helpers."""

    def __init__(self, distribution: Dict[Any, float]):
        """Store and validate a mapping of event -> probability.

        Raises:
            ValueError: if probabilities do not sum to 1, or any single
                probability lies outside [0, 1].
        """
        self.distribution = distribution
        self._validate()

    def _validate(self):
        """Validate that the stored mapping is a probability distribution."""
        total_prob = sum(self.distribution.values())
        if not math.isclose(total_prob, 1.0, rel_tol=1e-10):
            raise ValueError(f"Probabilities sum to {total_prob}, not 1.0")
        for event, prob in self.distribution.items():
            if prob < 0 or prob > 1:
                raise ValueError(f"Invalid probability {prob} for event {event}")

    def entropy(self) -> float:
        """Shannon entropy H(X) = -Σ p(x) log₂ p(x) (0·log 0 taken as 0)."""
        h = 0.0
        for prob in self.distribution.values():
            if prob > 0:
                h -= prob * math.log2(prob)
        return h

    def kullback_leibler_divergence(self, other: 'ProbabilityDistribution') -> float:
        """KL divergence D_KL(P||Q) = Σ p(x) log₂(p(x)/q(x)).

        Fix: if P assigns mass to an event where Q assigns none, the
        divergence is +inf by definition.  Previously such terms were
        silently skipped, under-reporting the divergence (possibly as 0).
        Terms with p(x) = 0 contribute 0 by the usual convention.
        """
        kl_div = 0.0
        for event, p in self.distribution.items():
            if p <= 0:
                continue  # 0 · log(0/q) = 0 by convention
            q = other.distribution.get(event, 0)
            if q <= 0:
                return math.inf  # P has support where Q does not
            kl_div += p * math.log2(p / q)
        return kl_div

    def variation_distance(self, other: 'ProbabilityDistribution') -> float:
        """Total variation distance: ½ · Σ |p(x) − q(x)| over the joint support."""
        distance = 0.0
        all_events = set(self.distribution.keys()) | set(other.distribution.keys())
        for event in all_events:
            p = self.distribution.get(event, 0)
            q = other.distribution.get(event, 0)
            distance += abs(p - q)
        return distance / 2
class TransitionProbabilityMatrix:
    """Stores state-to-state transition probabilities for a binary system."""

    def __init__(self, num_elements: int):
        """Create an empty TPM for a system of `num_elements` binary elements."""
        self.num_elements = num_elements
        # Keyed by (from_state, to_state); SystemState hashes on .elements.
        self.transitions: Dict[Tuple["SystemState", "SystemState"], float] = {}
        self.state_space_size = 2 ** num_elements

    def add_transition(self, from_state: "SystemState", to_state: "SystemState",
                       probability: float):
        """Record (or overwrite) the probability of from_state → to_state."""
        self.transitions[(from_state, to_state)] = probability

    def get_transition_probability(self, from_state: "SystemState",
                                   to_state: "SystemState") -> float:
        """Return the stored transition probability, or 0.0 if none recorded."""
        return self.transitions.get((from_state, to_state), 0.0)

    def get_cause_repertoire(self, to_state: "SystemState", mechanism: Set[int],
                             purview: Set[int]) -> "ProbabilityDistribution":
        """Compute cause repertoire P(mechanism_t-1 | purview_t).

        For each candidate past mechanism state, accumulates the probability
        of all stored transitions consistent with it and the observed purview
        state, then normalizes.  Falls back to a uniform distribution when no
        consistent transition exists.

        Fix: removed a dead `matching_transitions` accumulator that was built
        but never read, and an unreachable second uniform-fallback branch —
        once the fallback fills the dict it already sums to 1, so the final
        total is always positive.
        """
        cause_dist: Dict[Tuple[int, ...], float] = {}
        for from_state in self._generate_possible_states(mechanism):
            total_prob = 0.0
            for (fs, ts), prob in self.transitions.items():
                if (self._matches_purview(ts, purview, to_state)
                        and self._matches_mechanism(fs, mechanism, from_state)):
                    total_prob += prob
            if total_prob > 0:
                cause_dist[from_state.elements] = total_prob

        if not cause_dist:
            # No consistent causes found: maximum-entropy (uniform) prior.
            combos = self._generate_state_combinations(mechanism)
            return ProbabilityDistribution({c: 1.0 / len(combos) for c in combos})

        total = sum(cause_dist.values())
        return ProbabilityDistribution({k: v / total for k, v in cause_dist.items()})

    def get_effect_repertoire(self, from_state: "SystemState", mechanism: Set[int],
                              purview: Set[int]) -> "ProbabilityDistribution":
        """Compute effect repertoire P(purview_t+1 | mechanism_t).

        Accumulates, for each candidate future purview state, the probability
        of all stored transitions consistent with it and the current mechanism
        state, then normalizes; uniform fallback when nothing matches.
        """
        effect_dist: Dict[Tuple[int, ...], float] = {}
        for to_state in self._generate_possible_states(purview):
            effect_dist[to_state.elements] = sum(
                prob for (fs, ts), prob in self.transitions.items()
                if self._matches_mechanism(fs, mechanism, from_state)
                and self._matches_purview(ts, purview, to_state)
            )

        total = sum(effect_dist.values())
        if total > 0:
            return ProbabilityDistribution({k: v / total for k, v in effect_dist.items()})
        # No consistent effects found: maximum-entropy (uniform) prediction.
        combos = self._generate_state_combinations(purview)
        return ProbabilityDistribution({c: 1.0 / len(combos) for c in combos})

    def _generate_possible_states(self, element_set: Set[int]) -> List["SystemState"]:
        """Enumerate all 2^|element_set| binary states over the given elements."""
        return [SystemState(bits, 0.0)
                for bits in itertools.product([0, 1], repeat=len(element_set))]

    def _generate_state_combinations(self, element_set: Set[int]) -> List[Tuple[int, ...]]:
        """Enumerate all binary tuples of length |element_set|."""
        return list(itertools.product([0, 1], repeat=len(element_set)))

    def _matches_purview(self, transition_state: "SystemState", purview: Set[int],
                         target_state: "SystemState") -> bool:
        """Placeholder: always True.

        A full implementation would map purview indices onto tuple positions
        and compare the projected element values.
        """
        return True

    def _matches_mechanism(self, transition_state: "SystemState", mechanism: Set[int],
                           source_state: "SystemState") -> bool:
        """Placeholder: always True.

        A full implementation would map mechanism indices onto tuple positions
        and compare the projected element values.
        """
        return True
class IITCalculator:
    """Core calculator for Integrated Information Theory computations."""

    def __init__(self, num_elements: int):
        self.num_elements = num_elements
        self.tpm = TransitionProbabilityMatrix(num_elements)
        self.complexity_cache = {}  # reserved for memoized results

    def compute_phi(self, mechanism: Set[int], purview: Set[int],
                    system_state: "SystemState") -> float:
        """
        Compute integrated information Φ for a mechanism-purview pair.

        Φ = min_π D_KL(P_cause || P_cause^π) + D_KL(P_effect || P_effect^π)

        where π ranges over candidate partitions (the minimum information
        partition).  Returns 0.0 for an empty mechanism or purview, or when
        no proper partition exists.
        """
        if not mechanism or not purview:
            return 0.0

        cause_repertoire = self.tpm.get_cause_repertoire(system_state, mechanism, purview)
        effect_repertoire = self.tpm.get_effect_repertoire(system_state, mechanism, purview)

        mip = self._find_minimum_information_partition(
            mechanism, purview, system_state, cause_repertoire, effect_repertoire)
        return mip.phi_value if mip else 0.0

    def _find_minimum_information_partition(self, mechanism: Set[int], purview: Set[int],
                                            system_state: "SystemState",
                                            cause_repertoire: "ProbabilityDistribution",
                                            effect_repertoire: "ProbabilityDistribution"
                                            ) -> Optional["Partition"]:
        """
        Find the Minimum Information Partition (MIP) for a mechanism-purview
        pair — the partition whose factored repertoires diverge least from
        the intact ones.  This is the computational bottleneck of IIT.

        Returns None when `_generate_partitions` yields no candidates
        (singleton mechanism and purview).
        """
        best_partition = None
        min_phi = float('inf')

        for partition in self._generate_partitions(mechanism, purview):
            partitioned_cause = self._compute_partitioned_repertoire(
                cause_repertoire, partition, "cause")
            partitioned_effect = self._compute_partitioned_repertoire(
                effect_repertoire, partition, "effect")

            phi_cause = cause_repertoire.kullback_leibler_divergence(partitioned_cause)
            phi_effect = effect_repertoire.kullback_leibler_divergence(partitioned_effect)
            total_phi = phi_cause + phi_effect

            if total_phi < min_phi:
                min_phi = total_phi
                best_partition = partition
                best_partition.phi_value = total_phi

        return best_partition

    def _generate_partitions(self, mechanism: Set[int], purview: Set[int]) -> List["Partition"]:
        """Generate candidate partitions (simplified halving scheme).

        Fix: the unpartitioned system is no longer emitted as a candidate.
        In IIT the MIP is taken over *proper* partitions; including the
        trivial "partition" pinned φ at 0 whenever its factored repertoire
        matched the intact one.  Elements are sorted before splitting so the
        generated partitions are deterministic (set iteration order is not).
        A full implementation would enumerate all bipartitions.
        """
        partitions: List["Partition"] = []
        mech_list = sorted(mechanism)
        pur_list = sorted(purview)

        if len(mech_list) > 1:
            mid = len(mech_list) // 2
            partitions.append(Partition(
                set(mech_list[:mid]), set(mech_list[mid:]), set(purview), set()))
        if len(pur_list) > 1:
            mid = len(pur_list) // 2
            partitions.append(Partition(
                set(mechanism), set(), set(pur_list[:mid]), set(pur_list[mid:])))

        return partitions

    def _compute_partitioned_repertoire(self, original: "ProbabilityDistribution",
                                        partition: "Partition",
                                        repertoire_type: str) -> "ProbabilityDistribution":
        """Approximate the repertoire under `partition`.

        Fix: the previous placeholder multiplied every probability by 0.9
        and then renormalized — a no-op, since uniform scaling cancels under
        normalization — so D_KL was identically 0 and Φ was always 0.  The
        placeholder now blends the repertoire with the uniform distribution
        over its support (same support, still normalized), which perturbs
        non-uniform repertoires.  A full implementation would factor the
        distribution across the partition's parts (`partition` and
        `repertoire_type` are currently unused by this simplification).
        """
        support = original.distribution
        n = len(support)
        mixed = {event: 0.9 * prob + 0.1 / n for event, prob in support.items()}
        # Renormalize defensively (exact if `original` was normalized).
        total = sum(mixed.values())
        if total > 0:
            mixed = {k: v / total for k, v in mixed.items()}
        return ProbabilityDistribution(mixed)

    def compute_concept(self, mechanism: Set[int], purview: Set[int],
                        system_state: "SystemState") -> Optional["Concept"]:
        """Compute the concept for (mechanism, purview), or None if Φ ≤ 0."""
        phi = self.compute_phi(mechanism, purview, system_state)
        if phi <= 0:
            return None

        cause_repertoire = self.tpm.get_cause_repertoire(system_state, mechanism, purview)
        effect_repertoire = self.tpm.get_effect_repertoire(system_state, mechanism, purview)

        return Concept(
            cause_repertoire=cause_repertoire.distribution,
            effect_repertoire=effect_repertoire.distribution,
            phi=phi,
            mechanism=mechanism,
            purview=purview,
        )

    def compute_concepts(self, system_state: "SystemState") -> "CauseEffectStructure":
        """Compute all concepts for a system state.

        Exhaustively pairs every non-empty mechanism with every non-empty
        purview (exponential in the number of elements) and keeps those with
        Φ > 0.  Total Φ is normalized by a crude upper bound of
        num_elements · log₂(2) bits.
        """
        concepts = set()
        total_phi = 0.0
        elements = set(range(self.num_elements))

        for mechanism_size in range(1, self.num_elements + 1):
            for mechanism in itertools.combinations(elements, mechanism_size):
                mechanism_set = set(mechanism)
                for purview_size in range(1, self.num_elements + 1):
                    for purview in itertools.combinations(elements, purview_size):
                        purview_set = set(purview)
                        concept = self.compute_concept(mechanism_set, purview_set, system_state)
                        if concept is not None:
                            concepts.add(concept)
                            total_phi += concept.phi

        max_possible_phi = self.num_elements * math.log2(2)  # simplified normalization
        normalized_phi = (min(total_phi / max_possible_phi, 1.0)
                          if max_possible_phi > 0 else 0.0)

        return CauseEffectStructure(
            concepts=concepts,
            total_phi=total_phi,
            normalized_phi=normalized_phi,
        )


@dataclasses.dataclass
class Partition:
    """A bipartition of a mechanism and its purview.

    (mechanism_1, purview_1) and (mechanism_2, purview_2) are the two parts
    of the cut; `phi_value` holds the Φ evaluated for this partition (filled
    in by the MIP search).
    """
    mechanism_1: Set[int]
    mechanism_2: Set[int]
    purview_1: Set[int]
    purview_2: Set[int]
    phi_value: float = 0.0
class CausalPowerCalculator:
    """Calculates causal power using perturbation analysis."""

    def __init__(self, tpm: "TransitionProbabilityMatrix"):
        self.tpm = tpm
        self.perturbation_cache = {}  # reserved for memoized perturbation runs

    def compute_causal_power(self, mechanism: Set[int], purview: Set[int]) -> float:
        """
        Compute causal power using perturbation methods.

        Causal power measures the ability of a mechanism to constrain its
        purview: here, the absolute change in integration between a baseline
        (all-zeros) state and a perturbed (all-ones) state.
        """
        base_causality = self._compute_base_causality(mechanism, purview)
        perturbed_causality = self._compute_perturbed_causality(mechanism, purview)
        return abs(base_causality - perturbed_causality)

    def _compute_base_causality(self, mechanism: Set[int], purview: Set[int]) -> float:
        """Integration measured from the all-zeros reference state."""
        return self._integration_for_state(0, mechanism, purview)

    def _compute_perturbed_causality(self, mechanism: Set[int], purview: Set[int]) -> float:
        """Integration measured from the all-ones (perturbed) state."""
        return self._integration_for_state(1, mechanism, purview)

    def _integration_for_state(self, fill: int, mechanism: Set[int],
                               purview: Set[int]) -> float:
        """Shared base/perturbed implementation (was duplicated verbatim).

        Integration is the summed KL divergence of the cause and effect
        repertoires from uniform.

        Fix: the uniform reference is now built separately per repertoire.
        Previously a single uniform built over the *cause* repertoire's
        support was reused for the effect repertoire, whose support generally
        differs when |mechanism| != |purview| — mismatched events were then
        silently mishandled by the divergence.
        """
        state = SystemState(tuple([fill] * self.tpm.num_elements), 1.0)
        cause = self.tpm.get_cause_repertoire(state, mechanism, purview)
        effect = self.tpm.get_effect_repertoire(state, mechanism, purview)
        return (cause.kullback_leibler_divergence(self._uniform_like(cause))
                + effect.kullback_leibler_divergence(self._uniform_like(effect)))

    @staticmethod
    def _uniform_like(dist: "ProbabilityDistribution") -> "ProbabilityDistribution":
        """Uniform distribution over the same support as `dist`."""
        n = len(dist.distribution)
        return ProbabilityDistribution({event: 1.0 / n for event in dist.distribution})


class MIPOptimizer:
    """Optimization routines for finding Minimum Information Partitions."""

    def __init__(self, calculator: "IITCalculator"):
        self.calculator = calculator
        self.optimization_cache = {}  # reserved for memoized searches

    def find_mip_exhaustive(self, mechanism: Set[int], purview: Set[int],
                            system_state: "SystemState") -> Optional["Partition"]:
        """Exhaustive search for the MIP (computationally intensive).

        Fix: the original nested loops required cutting BOTH the mechanism
        and the purview, so a singleton purview (or mechanism) produced no
        candidates at all, and mechanism-only / purview-only cuts were never
        considered.  All three families of bipartitions are now searched.

        Returns None when no bipartition exists (singleton mechanism and
        purview).
        """
        best_partition = None
        min_phi = float('inf')

        for partition in self._generate_all_bipartitions(mechanism, purview):
            phi = self._evaluate_partition(partition, system_state)
            if phi < min_phi:
                min_phi = phi
                best_partition = partition
                best_partition.phi_value = phi

        return best_partition

    def _generate_all_bipartitions(self, mechanism: Set[int], purview: Set[int]):
        """Yield every bipartition cutting the mechanism, the purview, or both.

        Elements are sorted before enumeration so the order is deterministic.
        """
        mech_cuts = [(set(c), mechanism - set(c))
                     for size in range(1, len(mechanism))
                     for c in itertools.combinations(sorted(mechanism), size)]
        pur_cuts = [(set(c), purview - set(c))
                    for size in range(1, len(purview))
                    for c in itertools.combinations(sorted(purview), size)]

        for m1, m2 in mech_cuts:
            yield Partition(m1, m2, set(purview), set())   # mechanism-only cut
            for p1, p2 in pur_cuts:
                yield Partition(m1, m2, p1, p2)            # cut both sides
        for p1, p2 in pur_cuts:
            yield Partition(set(mechanism), set(), p1, p2)  # purview-only cut

    def find_mip_heuristic(self, mechanism: Set[int], purview: Set[int],
                           system_state: "SystemState") -> Optional["Partition"]:
        """Heuristic search for the MIP (computationally efficient).

        Evaluates only the balanced and single-element partitions produced by
        `_generate_heuristic_partitions` instead of the full bipartition space.
        """
        best_partition = None
        min_phi = float('inf')

        for partition in self._generate_heuristic_partitions(mechanism, purview):
            phi = self._evaluate_partition(partition, system_state)
            if phi < min_phi:
                min_phi = phi
                best_partition = partition
                best_partition.phi_value = phi

        return best_partition

    def _generate_heuristic_partitions(self, mechanism: Set[int],
                                       purview: Set[int]) -> List["Partition"]:
        """Generate a small, promising subset of partitions.

        Candidates: one balanced mechanism split, one balanced purview split,
        and every single-element cut of either side.
        """
        partitions: List["Partition"] = []

        if len(mechanism) > 1:
            mid = len(mechanism) // 2
            mech_list = list(mechanism)
            partitions.append(Partition(
                set(mech_list[:mid]), set(mech_list[mid:]), purview, set()))
        if len(purview) > 1:
            mid = len(purview) // 2
            pur_list = list(purview)
            partitions.append(Partition(
                mechanism, set(), set(pur_list[:mid]), set(pur_list[mid:])))

        for elem in mechanism:
            partitions.append(Partition({elem}, mechanism - {elem}, purview, set()))
        for elem in purview:
            partitions.append(Partition(mechanism, set(), {elem}, purview - {elem}))

        return partitions

    def _evaluate_partition(self, partition: "Partition",
                            system_state: "SystemState") -> float:
        """Evaluate the φ value of a partition.

        Placeholder: returns |mechanism_1| + |purview_1|.  A full
        implementation would delegate to the IITCalculator's partitioned
        repertoire machinery (`system_state` is unused until then).
        """
        return len(partition.mechanism_1) + len(partition.purview_1)
# ---------------------------------------------------------------------------
# Utility functions for system analysis
# ---------------------------------------------------------------------------

def analyze_system_complexity(calculator: "IITCalculator",
                              states: List["SystemState"]) -> Dict[str, float]:
    """Aggregate Φ statistics over multiple system states.

    Returns a dict with the average/max/min total Φ, the number of distinct
    concepts seen across all states, and a crude integration score
    (average Φ per element).

    Fix: with an empty `states` list the result previously reported
    min_phi = inf; an empty sample now reports 0.0 throughout.
    """
    results = {
        'avg_phi': 0.0,
        'max_phi': 0.0,
        'min_phi': float('inf'),
        'num_concepts': 0,
        'integration': 0.0,
    }

    all_concepts = set()
    phi_values = []

    for state in states:
        ces = calculator.compute_concepts(state)
        phi_values.append(ces.total_phi)
        all_concepts.update(ces.concepts)
        results['max_phi'] = max(results['max_phi'], ces.total_phi)
        results['min_phi'] = min(results['min_phi'], ces.total_phi)

    if phi_values:
        results['avg_phi'] = sum(phi_values) / len(phi_values)
        results['num_concepts'] = len(all_concepts)
        results['integration'] = results['avg_phi'] / calculator.num_elements
    else:
        results['min_phi'] = 0.0  # no samples: avoid reporting +inf

    return results


def benchmark_performance(calculator: "IITCalculator", num_trials: int = 100) -> Dict[str, Any]:
    """Benchmark `compute_concepts` over `num_trials` random states.

    Fix: random states are now drawn with the `random` module — the previous
    scheme hashed `datetime.now()` per element, which is not uniform and,
    given clock resolution and hash behavior, frequently produced the same
    bit for every element.  Timing uses the monotonic, high-resolution
    `time.perf_counter` instead of `time.time`, and rate/average guards
    against zero divisors.
    """
    import time
    import random

    test_states = [
        SystemState(
            tuple(random.randint(0, 1) for _ in range(calculator.num_elements)), 1.0)
        for _ in range(num_trials)
    ]

    start_time = time.perf_counter()
    for state in test_states:
        calculator.compute_concepts(state)
    elapsed = time.perf_counter() - start_time

    return {
        'total_time': elapsed,
        'avg_time_per_state': elapsed / num_trials if num_trials else 0.0,
        'states_per_second': num_trials / elapsed if elapsed > 0 else float('inf'),
        'complexity_class': f'O(2^{calculator.num_elements})',  # exponential in system size
    }


if __name__ == "__main__":
    # Example usage and basic smoke test.
    print("IIT Core Library - Example Usage")
    print("=" * 40)

    # Create a simple 2-element system with hand-wired transitions.
    calculator = IITCalculator(num_elements=2)

    state_00 = SystemState((0, 0), 0.25)
    state_01 = SystemState((0, 1), 0.25)
    state_10 = SystemState((1, 0), 0.25)
    state_11 = SystemState((1, 1), 0.25)

    calculator.tpm.add_transition(state_00, state_01, 0.5)
    calculator.tpm.add_transition(state_00, state_10, 0.5)
    calculator.tpm.add_transition(state_01, state_11, 1.0)
    calculator.tpm.add_transition(state_10, state_11, 1.0)
    calculator.tpm.add_transition(state_11, state_00, 1.0)

    # Test basic functionality.
    test_state = state_00
    concepts = calculator.compute_concepts(test_state)

    print(f"System with {calculator.num_elements} elements")
    print(f"Number of concepts found: {len(concepts.concepts)}")
    print(f"Total Φ: {concepts.total_phi:.4f}")
    print(f"Normalized Φ: {concepts.normalized_phi:.4f}")

    # Test an individual mechanism → purview concept.
    mechanism = {0}
    purview = {1}
    phi = calculator.compute_phi(mechanism, purview, test_state)
    print(f"Φ for mechanism {mechanism} → purview {purview}: {phi:.4f}")

    print("\nCore library initialized successfully!")