#!/usr/bin/env python3 """ Concept Structure and Repertoire Models for IIT This module implements sophisticated concept structure modeling and repertoire calculation algorithms for Integrated Information Theory. Author: IIT Implementation Team Version: 1.0 """ import math import itertools import collections from typing import Dict, List, Tuple, Optional, Set, Any, Callable, Union from dataclasses import dataclass, field import json import datetime from iit_core import ( IITCalculator, SystemState, Concept, CauseEffectStructure, ProbabilityDistribution, TransitionProbabilityMatrix ) @dataclass class RepertoireProfile: """Detailed profile of a probability repertoire.""" distribution: Dict[Tuple[int, ...], float] entropy: float information_content: float specificity: float determinism: float degeneracy: float def __post_init__(self): """Calculate derived properties after initialization.""" self._validate_distribution() self._compute_statistics() def _validate_distribution(self): """Validate probability distribution.""" total = sum(self.distribution.values()) if not math.isclose(total, 1.0, rel_tol=1e-10): # Normalize if needed self.distribution = {k: v/total for k, v in self.distribution.items()} def _compute_statistics(self): """Compute statistical properties of the distribution.""" # Entropy self.entropy = 0.0 for prob in self.distribution.values(): if prob > 0: self.entropy -= prob * math.log2(prob) # Information content (max entropy - actual entropy) max_entropy = math.log2(len(self.distribution)) if self.distribution else 0 self.information_content = max_entropy - self.entropy # Specificity (inverse of entropy) self.specificity = 1.0 / (1.0 + self.entropy) if self.entropy > 0 else 1.0 # Determinism (max probability) self.determinism = max(self.distribution.values()) if self.distribution else 0 # Degeneracy (number of non-zero probabilities) self.degeneracy = sum(1 for prob in self.distribution.values() if prob > 0) @dataclass class ConceptCluster: """Cluster of related concepts with similar properties.""" concepts: Set[Concept] cluster_id: int centroid_mechanism: Set[int] centroid_purview: Set[int] avg_phi: float similarity_threshold: float def __len__(self) -> int: return len(self.concepts) def get_representative_concept(self) -> Optional[Concept]: """Get the most representative concept in the cluster.""" if not self.concepts: return None return max(self.concepts, key=lambda c: c.phi) @dataclass class ConceptStructure: """Hierarchical structure of concepts and their relationships.""" root_concepts: Set[Concept] concept_hierarchy: Dict[int, List[int]] # parent_id -> [child_ids] concept_dependencies: Dict[int, Set[int]] # concept_id -> [dependent_concept_ids] integration_metrics: Dict[str, float] structural_complexity: float def __len__(self) -> int: return len(self.root_concepts) def get_all_concepts(self) -> Set[Concept]: """Get all concepts in the structure.""" all_concepts = set(self.root_concepts) # Add concepts from hierarchy for children in self.concept_hierarchy.values(): # Note: In full implementation, would need concept mapping pass return all_concepts class RepertoireCalculator: """Advanced repertoire calculation with multiple methods.""" def __init__(self, tpm: TransitionProbabilityMatrix): self.tpm = tpm self.repertoire_cache = {} self.method_registry = { 'standard': self._compute_standard_repertoire, 'bayesian': self._compute_bayesian_repertoire, 'maximum_likelihood': self._compute_ml_repertoire, 'entropy_minimization': self._compute_entropy_min_repertoire } def compute_cause_repertoire(self, system_state: SystemState, mechanism: Set[int], purview: Set[int], method: str = 'standard') -> RepertoireProfile: """Compute cause repertoire with specified method.""" cache_key = ('cause', system_state, frozenset(mechanism), frozenset(purview), method) if cache_key in self.repertoire_cache: return self.repertoire_cache[cache_key] if method not in self.method_registry: raise ValueError(f"Unknown method: {method}") distribution = self.method_registry[method](system_state, mechanism, purview, 'cause') profile = RepertoireProfile( distribution=distribution, entropy=0, information_content=0, specificity=0, determinism=0, degeneracy=0 ) self.repertoire_cache[cache_key] = profile return profile def compute_effect_repertoire(self, system_state: SystemState, mechanism: Set[int], purview: Set[int], method: str = 'standard') -> RepertoireProfile: """Compute effect repertoire with specified method.""" cache_key = ('effect', system_state, frozenset(mechanism), frozenset(purview), method) if cache_key in self.repertoire_cache: return self.repertoire_cache[cache_key] if method not in self.method_registry: raise ValueError(f"Unknown method: {method}") distribution = self.method_registry[method](system_state, mechanism, purview, 'effect') profile = RepertoireProfile( distribution=distribution, entropy=0, information_content=0, specificity=0, determinism=0, degeneracy=0 ) self.repertoire_cache[cache_key] = profile return profile def _compute_standard_repertoire(self, system_state: SystemState, mechanism: Set[int], purview: Set[int], repertoire_type: str) -> Dict[Tuple[int, ...], float]: """Standard repertoire computation using basic conditional probabilities.""" if repertoire_type == 'cause': base_dist = self.tpm.get_cause_repertoire(system_state, mechanism, purview) else: base_dist = self.tpm.get_effect_repertoire(system_state, mechanism, purview) return base_dist.distribution def _compute_bayesian_repertoire(self, system_state: SystemState, mechanism: Set[int], purview: Set[int], repertoire_type: str) -> Dict[Tuple[int, ...], float]: """Bayesian repertoire with prior incorporation.""" # Get standard repertoire as likelihood standard_dist = self._compute_standard_repertoire(system_state, mechanism, purview, repertoire_type) # Apply uniform prior prior_prob = 1.0 / len(standard_dist) if standard_dist else 1.0 # Bayesian update: posterior ∝ likelihood × prior posterior_dist = {} for state, likelihood in standard_dist.items(): posterior_dist[state] = likelihood * prior_prob # Normalize total = sum(posterior_dist.values()) if total > 0: posterior_dist = {k: v/total for k, v in posterior_dist.items()} return posterior_dist def _compute_ml_repertoire(self, system_state: SystemState, mechanism: Set[int], purview: Set[int], repertoire_type: str) -> Dict[Tuple[int, ...], float]: """Maximum likelihood repertoire.""" standard_dist = self._compute_standard_repertoire(system_state, mechanism, purview, repertoire_type) if not standard_dist: return {} # Find maximum likelihood state max_prob = max(standard_dist.values()) ml_states = [state for state, prob in standard_dist.items() if math.isclose(prob, max_prob, rel_tol=1e-10)] # Create ML repertoire (concentrated on most likely states) ml_dist = {} ml_prob = 1.0 / len(ml_states) for state in ml_states: ml_dist[state] = ml_prob return ml_dist def _compute_entropy_min_repertoire(self, system_state: SystemState, mechanism: Set[int], purview: Set[int], repertoire_type: str) -> Dict[Tuple[int, ...], float]: """Entropy-minimized repertoire.""" standard_dist = self._compute_standard_repertoire(system_state, mechanism, purview, repertoire_type) if not standard_dist: return {} # Sort by probability sorted_states = sorted(standard_dist.items(), key=lambda x: x[1], reverse=True) # Build minimum entropy distribution keeping top probabilities entropy_dist = {} total_prob = 0.0 for state, prob in sorted_states: if total_prob < 0.8: # Keep states until 80% probability mass entropy_dist[state] = prob total_prob += prob else: break # Normalize total = sum(entropy_dist.values()) if total > 0: entropy_dist = {k: v/total for k, v in entropy_dist.items()} return entropy_dist class ConceptAnalyzer: """Advanced analysis of concept structures and properties.""" def __init__(self, calculator: IITCalculator, repertoire_calc: RepertoireCalculator): self.calculator = calculator self.repertoire_calc = repertoire_calc self.analysis_cache = {} def analyze_concept_properties(self, concept: Concept, system_state: SystemState) -> Dict[str, float]: """Analyze detailed properties of a concept.""" # Get detailed repertoires cause_profile = self.repertoire_calc.compute_cause_repertoire( system_state, concept.mechanism, concept.purview ) effect_profile = self.repertoire_calc.compute_effect_repertoire( system_state, concept.mechanism, concept.purview ) properties = { 'phi': concept.phi, 'mechanism_size': len(concept.mechanism), 'purview_size': len(concept.purview), 'cause_entropy': cause_profile.entropy, 'effect_entropy': effect_profile.entropy, 'cause_specificity': cause_profile.specificity, 'effect_specificity': effect_profile.specificity, 'cause_determinism': cause_profile.determinism, 'effect_determinism': effect_profile.determinism, 'integration_ratio': self._compute_integration_ratio(concept), 'information_efficiency': self._compute_information_efficiency(concept, cause_profile, effect_profile), 'concept_purity': self._compute_concept_purity(concept), 'causal_relevance': self._compute_causal_relevance(concept, system_state) } return properties def _compute_integration_ratio(self, concept: Concept) -> float: """Compute ratio of integration to maximum possible.""" max_phi = min(len(concept.mechanism), len(concept.purview)) * math.log2(2) return concept.phi / max_phi if max_phi > 0 else 0 def _compute_information_efficiency(self, concept: Concept, cause_profile: RepertoireProfile, effect_profile: RepertoireProfile) -> float: """Compute information efficiency of the concept.""" # Efficiency = φ / (entropy_cost) entropy_cost = cause_profile.entropy + effect_profile.entropy return concept.phi / entropy_cost if entropy_cost > 0 else 0 def _compute_concept_purity(self, concept: Concept) -> float: """Compute concept purity based on repertoire structure.""" # Purity based on how concentrated the repertoires are total_states = len(concept.cause_repertoire) + len(concept.effect_repertoire) if total_states == 0: return 0 # Count high-probability states high_prob_count = 0 threshold = 0.1 # States with >10% probability for prob in concept.cause_repertoire.values(): if prob > threshold: high_prob_count += 1 for prob in concept.effect_repertoire.values(): if prob > threshold: high_prob_count += 1 purity = high_prob_count / total_states return purity def _compute_causal_relevance(self, concept: Concept, system_state: SystemState) -> float: """Compute causal relevance of the concept in current state.""" # Check if mechanism elements are active in current state mechanism_activity = 0 for elem in concept.mechanism: if elem < len(system_state.elements) and system_state.elements[elem] == 1: mechanism_activity += 1 activity_ratio = mechanism_activity / len(concept.mechanism) if concept.mechanism else 0 return activity_ratio * concept.phi def cluster_concepts(self, concepts: Set[Concept], similarity_threshold: float = 0.7) -> List[ConceptCluster]: """Cluster concepts based on similarity.""" if not concepts: return [] # Convert to list for indexing concept_list = list(concepts) n = len(concept_list) # Compute similarity matrix similarity_matrix = [[0.0] * n for _ in range(n)] for i in range(n): for j in range(i, n): similarity = self._compute_concept_similarity(concept_list[i], concept_list[j]) similarity_matrix[i][j] = similarity similarity_matrix[j][i] = similarity # Perform clustering (simplified hierarchical clustering) clusters = self._hierarchical_clustering(concept_list, similarity_matrix, similarity_threshold) return clusters def _compute_concept_similarity(self, concept1: Concept, concept2: Concept) -> float: """Compute similarity between two concepts.""" # Mechanism similarity mech_overlap = len(concept1.mechanism & concept2.mechanism) mech_union = len(concept1.mechanism | concept2.mechanism) mech_similarity = mech_overlap / mech_union if mech_union > 0 else 0 # Purview similarity purv_overlap = len(concept1.purview & concept2.purview) purv_union = len(concept1.purview | concept2.purview) purv_similarity = purv_overlap / purv_union if purv_union > 0 else 0 # Φ similarity phi_diff = abs(concept1.phi - concept2.phi) max_phi = max(concept1.phi, concept2.phi) phi_similarity = 1.0 - (phi_diff / max_phi) if max_phi > 0 else 1.0 # Weighted combination overall_similarity = (0.4 * mech_similarity + 0.4 * purv_similarity + 0.2 * phi_similarity) return overall_similarity def _hierarchical_clustering(self, concepts: List[Concept], similarity_matrix: List[List[float]], threshold: float) -> List[ConceptCluster]: """Perform hierarchical clustering.""" n = len(concepts) clusters = [] used_indices = set() cluster_id = 0 for i in range(n): if i in used_indices: continue # Start new cluster cluster_concepts = {concepts[i]} cluster_mechs = set(concepts[i].mechanism) cluster_purvs = set(concepts[i].purview) used_indices.add(i) # Find similar concepts for j in range(i + 1, n): if j in used_indices: continue if similarity_matrix[i][j] >= threshold: cluster_concepts.add(concepts[j]) cluster_mechs.update(concepts[j].mechanism) cluster_purvs.update(concepts[j].purview) used_indices.add(j) # Create cluster avg_phi = sum(c.phi for c in cluster_concepts) / len(cluster_concepts) cluster = ConceptCluster( concepts=cluster_concepts, cluster_id=cluster_id, centroid_mechanism=cluster_mechs, centroid_purview=cluster_purvs, avg_phi=avg_phi, similarity_threshold=threshold ) clusters.append(cluster) cluster_id += 1 return clusters def build_concept_hierarchy(self, concepts: Set[Concept]) -> ConceptStructure: """Build hierarchical structure of concepts.""" if not concepts: return ConceptStructure(set(), {}, {}, {}, 0.0) # Sort concepts by phi (descending) sorted_concepts = sorted(concepts, key=lambda c: c.phi, reverse=True) # Build hierarchy based on inclusion relationships hierarchy = {} dependencies = {} concept_id_map = {id(concept): i for i, concept in enumerate(sorted_concepts)} for i, concept in enumerate(sorted_concepts): hierarchy[i] = [] dependencies[i] = set() # Find parent concepts (those that contain this concept) for j, parent_concept in enumerate(sorted_concepts): if i == j: continue # Check if parent is more general if (len(parent_concept.mechanism) >= len(concept.mechanism) and len(parent_concept.purview) >= len(concept.purview) and concept.mechanism.issubset(parent_concept.mechanism) and concept.purview.issubset(parent_concept.purview) and parent_concept.phi > concept.phi): hierarchy[j].append(i) dependencies[i].add(j) # Identify root concepts (those with no parents) root_ids = [i for i in range(len(sorted_concepts)) if not dependencies[i]] root_concepts = {sorted_concepts[i] for i in root_ids} # Compute integration metrics total_phi = sum(c.phi for c in concepts) avg_phi = total_phi / len(concepts) if concepts else 0 integration_metrics = { 'total_phi': total_phi, 'avg_phi': avg_phi, 'max_phi': max(c.phi for c in concepts) if concepts else 0, 'min_phi': min(c.phi for c in concepts) if concepts else 0, 'concept_density': len(concepts) / (2 ** self.calculator.num_elements), 'hierarchy_depth': self._compute_hierarchy_depth(hierarchy) } # Structural complexity (simplified) structural_complexity = total_phi * integration_metrics['hierarchy_depth'] return ConceptStructure( root_concepts=root_concepts, concept_hierarchy=hierarchy, concept_dependencies=dependencies, integration_metrics=integration_metrics, structural_complexity=structural_complexity ) def _compute_hierarchy_depth(self, hierarchy: Dict[int, List[int]]) -> int: """Compute maximum depth of concept hierarchy.""" if not hierarchy: return 0 max_depth = 0 for parent_id, children in hierarchy.items(): if children: child_depth = 1 + self._compute_hierarchy_depth_from_node(parent_id, hierarchy) max_depth = max(max_depth, child_depth) return max_depth def _compute_hierarchy_depth_from_node(self, node_id: int, hierarchy: Dict[int, List[int]]) -> int: """Compute depth from a specific node.""" if node_id not in hierarchy or not hierarchy[node_id]: return 0 max_child_depth = 0 for child_id in hierarchy[node_id]: child_depth = 1 + self._compute_hierarchy_depth_from_node(child_id, hierarchy) max_child_depth = max(max_child_depth, child_depth) return max_child_depth class ConceptVisualizer: """Visualization tools for concept structures.""" def __init__(self, analyzer: ConceptAnalyzer): self.analyzer = analyzer def generate_concept_network_data(self, structure: ConceptStructure) -> Dict[str, Any]: """Generate data for concept network visualization.""" nodes = [] edges = [] # Create nodes concept_id_map = {} node_id = 0 for concept in structure.get_all_concepts(): concept_id_map[id(concept)] = node_id properties = self.analyzer.analyze_concept_properties( concept, SystemState((1, 1, 1), 1.0) # Use default test state ) node_data = { 'id': node_id, 'label': f"M{len(concept.mechanism)}→P{len(concept.purview)}", 'phi': concept.phi, 'mechanism_size': len(concept.mechanism), 'purview_size': len(concept.purview), 'integration_ratio': properties['integration_ratio'], 'information_efficiency': properties['information_efficiency'] } nodes.append(node_data) node_id += 1 # Create edges based on hierarchy for parent_id, child_ids in structure.concept_hierarchy.items(): for child_id in child_ids: edge_data = { 'source': parent_id, 'target': child_id, 'type': 'hierarchy' } edges.append(edge_data) return { 'nodes': nodes, 'edges': edges, 'integration_metrics': structure.integration_metrics } def generate_cluster_visualization_data(self, clusters: List[ConceptCluster]) -> Dict[str, Any]: """Generate data for cluster visualization.""" cluster_data = [] for cluster in clusters: cluster_info = { 'cluster_id': cluster.cluster_id, 'size': len(cluster.concepts), 'avg_phi': cluster.avg_phi, 'centroid_mechanism_size': len(cluster.centroid_mechanism), 'centroid_purview_size': len(cluster.centroid_purview), 'concepts': [] } for concept in cluster.concepts: concept_info = { 'phi': concept.phi, 'mechanism': list(concept.mechanism), 'purview': list(concept.purview) } cluster_info['concepts'].append(concept_info) cluster_data.append(cluster_info) return {'clusters': cluster_data} def benchmark_concept_analysis(num_elements: int = 3) -> Dict[str, Any]: """Benchmark concept analysis performance.""" print(f"Benchmarking Concept Analysis for {num_elements} elements") # Setup test system calculator = IITCalculator(num_elements) setup_test_system(calculator) repertoire_calc = RepertoireCalculator(calculator.tpm) analyzer = ConceptAnalyzer(calculator, repertoire_calc) # Test state test_state = SystemState(tuple([1] * num_elements), 1.0) import time results = {} # Generate concepts start_time = time.time() concepts = calculator.compute_concepts(test_state) results['concept_generation_time'] = time.time() - start_time results['num_concepts'] = len(concepts.concepts) if concepts.concepts: # Analyze concept properties start_time = time.time() sample_concept = list(concepts.concepts)[0] properties = analyzer.analyze_concept_properties(sample_concept, test_state) results['property_analysis_time'] = time.time() - start_time results['sample_properties'] = properties # Cluster concepts start_time = time.time() clusters = analyzer.cluster_concepts(concepts.concepts, similarity_threshold=0.5) results['clustering_time'] = time.time() - start_time results['num_clusters'] = len(clusters) # Build hierarchy start_time = time.time() structure = analyzer.build_concept_hierarchy(concepts.concepts) results['hierarchy_time'] = time.time() - start_time results['hierarchy_depth'] = structure.integration_metrics.get('hierarchy_depth', 0) # Generate visualization data start_time = time.time() visualizer = ConceptVisualizer(analyzer) network_data = visualizer.generate_concept_network_data(structure) results['visualization_time'] = time.time() - start_time results['network_nodes'] = len(network_data['nodes']) return results def setup_test_system(calculator: IITCalculator): """Setup test system for concept analysis.""" # Create deterministic and stochastic transitions states = [] for bits in itertools.product([0, 1], repeat=calculator.num_elements): prob = 1.0 / (2 ** calculator.num_elements) states.append(SystemState(bits, prob)) # Add transitions with varying complexity for i, from_state in enumerate(states): # Primary deterministic transition to_state = states[(i + 1) % len(states)] calculator.tpm.add_transition(from_state, to_state, 0.6) # Secondary stochastic transitions for j, other_state in enumerate(states): if i != j: # Vary probability based on Hamming distance hamming_dist = sum(a != b for a, b in zip(from_state.elements, other_state.elements)) prob = 0.4 / (len(states) - 1) * (1.0 / (1 + hamming_dist)) calculator.tpm.add_transition(from_state, other_state, prob) if __name__ == "__main__": print("Concept Structure and Repertoire Models") print("=" * 45) # Test with 3-element system calculator = IITCalculator(num_elements=3) setup_test_system(calculator) repertoire_calc = RepertoireCalculator(calculator.tpm) analyzer = ConceptAnalyzer(calculator, repertoire_calc) # Test state test_state = SystemState((1, 0, 1), 1.0) print(f"Analyzing concepts for state: {test_state.elements}") # Generate concepts concepts = calculator.compute_concepts(test_state) print(f"Number of concepts found: {len(concepts.concepts)}") print(f"Total Φ: {concepts.total_phi:.6f}") print(f"Normalized Φ: {concepts.normalized_phi:.6f}") if concepts.concepts: # Analyze sample concept sample_concept = list(concepts.concepts)[0] properties = analyzer.analyze_concept_properties(sample_concept, test_state) print(f"\nSample Concept Analysis:") print(f" Mechanism: {sample_concept.mechanism}") print(f" Purview: {sample_concept.purview}") print(f" Φ: {sample_concept.phi:.6f}") print(f" Integration Ratio: {properties['integration_ratio']:.6f}") print(f" Information Efficiency: {properties['information_efficiency']:.6f}") print(f" Concept Purity: {properties['concept_purity']:.6f}") # Cluster concepts clusters = analyzer.cluster_concepts(concepts.concepts, similarity_threshold=0.3) print(f"\nConcept Clustering:") print(f" Number of clusters: {len(clusters)}") for i, cluster in enumerate(clusters): print(f" Cluster {i}: {len(cluster.concepts)} concepts, avg Φ: {cluster.avg_phi:.6f}") # Build hierarchy structure = analyzer.build_concept_hierarchy(concepts.concepts) print(f"\nConcept Hierarchy:") print(f" Root concepts: {len(structure.root_concepts)}") print(f" Hierarchy depth: {structure.integration_metrics.get('hierarchy_depth', 0)}") print(f" Structural complexity: {structure.structural_complexity:.6f}") # Test different repertoire computation methods print(f"\nRepertoire Computation Methods:") methods = ['standard', 'bayesian', 'maximum_likelihood', 'entropy_minimization'] for method in methods: cause_profile = repertoire_calc.compute_cause_repertoire( test_state, sample_concept.mechanism, sample_concept.purview, method ) print(f" {method}:") print(f" Entropy: {cause_profile.entropy:.6f}") print(f" Specificity: {cause_profile.specificity:.6f}") print(f" Determinism: {cause_profile.determinism:.6f}") # Benchmark print(f"\nBenchmarking...") benchmark_results = benchmark_concept_analysis(3) for metric, value in benchmark_results.items(): if 'time' in metric: print(f"{metric}: {value:.6f}s") else: print(f"{metric}: {value}") print("\nConcept structure analysis initialized successfully!")