""" Information Flow Tracking and State Transition Dynamics for IIT Analysis """ import math import json import hashlib from typing import Dict, List, Optional, Tuple, Set import itertools import collections from neural_engine import NeuralNetwork class StateTransition: """Tracks state transitions and information flow""" def __init__(self, network: NeuralNetwork): self.network = network self.transition_probabilities = {} self.information_flow = {} self.causality_matrix = None self.integrated_information = 0.0 def compute_transition_probabilities(self, n_samples: int = 1000) -> Dict: """Compute state transition probabilities through sampling""" transitions = collections.defaultdict(lambda: collections.defaultdict(int)) # Sample transitions for _ in range(n_samples): # Random input inputs = [random.uniform(-1, 1) for _ in range(self.network.n_nodes)] current_state = tuple(self.network.get_state_vector()) # Step network self.network.step(inputs) next_state = tuple(self.network.get_state_vector()) transitions[current_state][next_state] += 1 # Normalize to probabilities prob_matrix = {} for current_state, next_states in transitions.items(): total = sum(next_states.values()) prob_matrix[current_state] = { next_state: count / total for next_state, count in next_states.items() } self.transition_probabilities = prob_matrix return prob_matrix def compute_causality_matrix(self) -> List[List[float]]: """Compute causality matrix based on state transitions""" n = self.network.n_nodes causality = [[0.0] * n for _ in range(n)] if not self.transition_probabilities: self.compute_transition_probabilities() # Analyze causal influence between nodes for source in range(n): for target in range(n): influence = self._measure_causal_influence(source, target) causality[source][target] = influence self.causality_matrix = causality return causality def _measure_causal_influence(self, source: int, target: int) -> float: """Measure causal influence of source on target""" influence = 0.0 count = 0 for current_state, transitions in self.transition_probabilities.items(): if len(current_state) <= source or len(current_state) <= target: continue baseline_prob = 0.0 modified_prob = 0.0 for next_state, prob in transitions.items(): if len(next_state) > target: baseline_prob += prob * next_state[target] # Simulate source modification modified_state = list(current_state) modified_state[source] = 1.0 - modified_state[source] modified_tuple = tuple(modified_state) if modified_tuple in transitions: for next_state, prob in transitions[modified_tuple].items(): if len(next_state) > target: modified_prob += prob * next_state[target] if baseline_prob > 0: influence += abs(modified_prob - baseline_prob) / baseline_prob count += 1 return influence / count if count > 0 else 0.0 def track_information_flow(self, steps: int = 100) -> Dict: """Track information flow through network over time""" flow_data = { 'entropy_history': [], 'mutual_information': [], 'integration_history': [], 'causality_flow': [] } for step in range(steps): # Random input inputs = [random.uniform(-1, 1) for _ in range(self.network.n_nodes)] # Get current state current_entropy = self._compute_entropy() # Step network self.network.step(inputs) # Get new state new_entropy = self._compute_entropy() # Compute mutual information mi = self._compute_mutual_information() # Compute integrated information phi = self._compute_integrated_information() flow_data['entropy_history'].append(new_entropy) flow_data['mutual_information'].append(mi) flow_data['integration_history'].append(phi) self.information_flow = flow_data return flow_data def _compute_entropy(self) -> float: """Compute Shannon entropy of current state distribution""" states = self.network.get_state_vector() # Discretize states for entropy calculation discretized = [1 if s > 0.5 else 0 for s in states] # Count patterns pattern_counts = collections.Counter(tuple(discretized)) total = sum(pattern_counts.values()) entropy = 0.0 for count in pattern_counts.values(): if count > 0: p = count / total entropy -= p * math.log2(p) return entropy def _compute_mutual_information(self) -> float: """Compute average mutual information between node pairs""" n = self.network.n_nodes total_mi = 0.0 pair_count = 0 for i in range(n): for j in range(i + 1, n): mi = self._pairwise_mutual_info(i, j) total_mi += mi pair_count += 1 return total_mi / pair_count if pair_count > 0 else 0.0 def _pairwise_mutual_info(self, node1: int, node2: int) -> float: """Compute mutual information between two nodes""" # Use historical states if available if len(self.network.state_history) < 2: return 0.0 # Collect joint states joint_states = [] for state_vector in self.network.state_history[-50:]: # Last 50 steps if len(state_vector) > max(node1, node2): s1 = 1 if state_vector[node1] > 0.5 else 0 s2 = 1 if state_vector[node2] > 0.5 else 0 joint_states.append((s1, s2)) if not joint_states: return 0.0 # Compute joint and marginal distributions joint_counts = collections.Counter(joint_states) total = len(joint_states) # Marginals s1_counts = collections.Counter([s for s, _ in joint_states]) s2_counts = collections.Counter([s for _, s in joint_states]) # Compute mutual information mi = 0.0 for (s1, s2), count in joint_counts.items(): p_joint = count / total p_s1 = s1_counts[s1] / total p_s2 = s2_counts[s2] / total if p_joint > 0 and p_s1 > 0 and p_s2 > 0: mi += p_joint * math.log2(p_joint / (p_s1 * p_s2)) return mi def _compute_integrated_information(self) -> float: """Compute Φ (Phi) - integrated information measure""" if not self.causality_matrix: self.compute_causality_matrix() # Simplified Φ calculation based on information integration n = self.network.n_nodes # Compute system information system_info = self._compute_system_information() # Compute sum of parts information parts_info = 0.0 for i in range(n): parts_info += self._compute_node_information(i) # Φ is the difference phi = max(0, system_info - parts_info) self.integrated_information = phi return phi def _compute_system_information(self) -> float: """Compute information for the whole system""" if len(self.network.state_history) < 2: return 0.0 # Use current state current_state = tuple(self.network.get_state_vector()) # Compute probability distribution state_counts = collections.Counter() for state_vector in self.network.state_history[-100:]: discretized = tuple([1 if s > 0.5 else 0 for s in state_vector]) state_counts[discretized] += 1 total = sum(state_counts.values()) if total == 0 or current_state not in state_counts: return 0.0 # Information content p = state_counts[current_state] / total if p > 0: return -math.log2(p) return 0.0 def _compute_node_information(self, node_id: int) -> float: """Compute information for individual node""" if len(self.network.state_history) < 2 or node_id >= len(self.network.get_state_vector()): return 0.0 current_state = 1 if self.network.get_state_vector()[node_id] > 0.5 else 0 # Count node states state_counts = collections.Counter() for state_vector in self.network.state_history[-100:]: if len(state_vector) > node_id: node_state = 1 if state_vector[node_id] > 0.5 else 0 state_counts[node_state] += 1 total = sum(state_counts.values()) if total == 0 or current_state not in state_counts: return 0.0 p = state_counts[current_state] / total if p > 0: return -math.log2(p) return 0.0 # Import required for random sampling import random