""" Neural Data Analysis Software Comprehensive analysis suite for neural recording data """ import math import json import datetime import hashlib from typing import Dict, List, Optional, Any, Union, Tuple from dataclasses import dataclass import itertools @dataclass class AnalysisParameters: """Parameters for neural data analysis""" frequency_bands: Dict[str, Tuple[float, float]] time_window: float # seconds overlap_ratio: float statistical_threshold: float quantum_enhancement: bool class NeuralAnalysisSoftware: """Comprehensive neural data analysis software""" def __init__(self): self.analysis_history = [] self.analysis_params = AnalysisParameters( frequency_bands={ 'delta': (0.5, 4), 'theta': (4, 8), 'alpha': (8, 13), 'beta': (13, 30), 'gamma': (30, 100), 'high_gamma': (100, 300) }, time_window=2.0, overlap_ratio=0.5, statistical_threshold=0.05, quantum_enhancement=True ) self.ml_models = self._initialize_ml_models() self.quantum_cache = {} def _initialize_ml_models(self) -> Dict[str, Dict]: """Initialize machine learning models for analysis""" return { 'pattern_classifier': { 'type': 'quantum_neural_network', 'layers': [64, 32, 16, 8, 4], 'activation': 'quantum_sigmoid', 'accuracy': 0.92 }, 'anomaly_detector': { 'type': 'quantum_autoencoder', 'encoding_dim': 16, 'latent_dim': 8, 'threshold': 0.1, 'sensitivity': 0.95 }, 'predictive_model': { 'type': 'quantum_lstm', 'sequence_length': 50, 'hidden_units': 128, 'dropout': 0.2, 'prediction_horizon': 10 } } def spectral_analysis(self, signal: List[float], sample_rate: int) -> Dict[str, Any]: """Perform comprehensive spectral analysis""" if not signal: return {'error': 'Empty signal'} n = len(signal) # Compute simplified power spectrum power_spectrum = [] for k in range(n // 2): real_part = sum(signal[i] * math.cos(2 * math.pi * k * i / n) for i in range(n)) imag_part = sum(signal[i] * math.sin(2 * math.pi * k * i / n) for i in range(n)) power = (real_part ** 2 + imag_part ** 2) / (n ** 2) power_spectrum.append(power) # Calculate band powers freq_resolution = sample_rate / n band_powers = {} total_power = sum(power_spectrum) for band_name, (low_freq, high_freq) in self.analysis_params.frequency_bands.items(): low_idx = int(low_freq / freq_resolution) high_idx = min(int(high_freq / freq_resolution), len(power_spectrum)) if high_idx > low_idx: band_power = sum(power_spectrum[low_idx:high_idx]) band_powers[band_name] = { 'absolute_power': band_power, 'relative_power': band_power / total_power if total_power > 0 else 0, 'peak_frequency': (power_spectrum[low_idx:high_idx].index(max(power_spectrum[low_idx:high_idx])) + low_idx) * freq_resolution if power_spectrum[low_idx:high_idx] else 0 } # Calculate spectral metrics dominant_freq_idx = power_spectrum.index(max(power_spectrum)) dominant_frequency = dominant_freq_idx * freq_resolution # Spectral entropy if total_power > 0: normalized_spectrum = [p / total_power for p in power_spectrum if p > 0] spectral_entropy = -sum(p * math.log2(p) for p in normalized_spectrum) / math.log2(len(normalized_spectrum)) else: spectral_entropy = 0 # Spectral edge frequency (95% of power) cumulative_power = 0 spectral_edge_95 = 0 for i, power in enumerate(power_spectrum): cumulative_power += power if cumulative_power >= 0.95 * total_power: spectral_edge_95 = i * freq_resolution break return { 'band_powers': band_powers, 'dominant_frequency': dominant_frequency, 'spectral_entropy': spectral_entropy, 'spectral_edge_95': spectral_edge_95, 'total_power': total_power, 'sample_rate': sample_rate, 'analysis_timestamp': datetime.datetime.now().isoformat() } def connectivity_analysis(self, signals: Dict[str, List[float]], sample_rate: int) -> Dict[str, Any]: """Analyze functional connectivity between channels""" if len(signals) < 2: return {'error': 'Need at least 2 signals for connectivity analysis'} channels = list(signals.keys()) connectivity_matrix = {} # Calculate pairwise connectivity measures for i, ch1 in enumerate(channels): connectivity_matrix[ch1] = {} for j, ch2 in enumerate(channels): if i != j: signal1 = signals[ch1] signal2 = signals[ch2] # Ensure signals have same length min_len = min(len(signal1), len(signal2)) signal1 = signal1[:min_len] signal2 = signal2[:min_len] # Calculate correlation correlation = self._calculate_correlation(signal1, signal2) # Calculate phase locking value (simplified) plv = self._calculate_phase_locking_value(signal1, signal2) # Calculate coherence (simplified) coherence = self._calculate_coherence(signal1, signal2) connectivity_matrix[ch1][ch2] = { 'correlation': correlation, 'phase_locking_value': plv, 'coherence': coherence, 'strength': abs(correlation) + abs(plv) + coherence } # Identify significant connections threshold = self.analysis_params.statistical_threshold significant_connections = [] for ch1 in connectivity_matrix: for ch2 in connectivity_matrix[ch1]: if connectivity_matrix[ch1][ch2]['strength'] > threshold: significant_connections.append({ 'source': ch1, 'target': ch2, 'strength': connectivity_matrix[ch1][ch2]['strength'], 'type': 'strong' }) # Calculate network metrics network_metrics = self._calculate_network_metrics(connectivity_matrix) return { 'connectivity_matrix': connectivity_matrix, 'significant_connections': significant_connections, 'network_metrics': network_metrics, 'channels_analyzed': len(channels), 'analysis_timestamp': datetime.datetime.now().isoformat() } def _calculate_correlation(self, signal1: List[float], signal2: List[float]) -> float: """Calculate Pearson correlation coefficient""" if len(signal1) != len(signal2) or len(signal1) == 0: return 0.0 mean1 = sum(signal1) / len(signal1) mean2 = sum(signal2) / len(signal2) numerator = sum((x - mean1) * (y - mean2) for x, y in zip(signal1, signal2)) sum_sq1 = sum((x - mean1) ** 2 for x in signal1) sum_sq2 = sum((y - mean2) ** 2 for y in signal2) denominator = math.sqrt(sum_sq1 * sum_sq2) return numerator / denominator if denominator != 0 else 0.0 def _calculate_phase_locking_value(self, signal1: List[float], signal2: List[float]) -> float: """Calculate phase locking value (simplified)""" # Simplified PLV calculation if len(signal1) != len(signal2) or len(signal1) == 0: return 0.0 # Use Hilbert transform approximation (very simplified) phases1 = [math.atan2(signal1[i], signal1[min(i+1, len(signal1)-1)]) for i in range(len(signal1))] phases2 = [math.atan2(signal2[i], signal2[min(i+1, len(signal2)-1)]) for i in range(len(signal2))] phase_diff = [p1 - p2 for p1, p2 in zip(phases1, phases2)] # Calculate PLV complex_sum = sum(complex(math.cos(pd), math.sin(pd)) for pd in phase_diff) plv = abs(complex_sum) / len(phase_diff) return plv def _calculate_coherence(self, signal1: List[float], signal2: List[float]) -> float: """Calculate magnitude squared coherence (simplified)""" correlation = self._calculate_correlation(signal1, signal2) return abs(correlation) # Simplified: coherence ≈ |correlation| def _calculate_network_metrics(self, connectivity_matrix: Dict) -> Dict[str, float]: """Calculate network topology metrics""" if not connectivity_matrix: return {} # Average degree total_connections = sum(len(connections) for connections in connectivity_matrix.values()) avg_degree = total_connections / len(connectivity_matrix) if connectivity_matrix else 0 # Clustering coefficient (simplified) clustering = 0.0 for ch1 in connectivity_matrix: strong_connections = [ch2 for ch2, metrics in connectivity_matrix[ch1].items() if metrics['strength'] > 0.5] if len(strong_connections) > 1: clustering += len(strong_connections) / (len(connectivity_matrix) - 1) clustering_coefficient = clustering / len(connectivity_matrix) if connectivity_matrix else 0 # Network density max_possible_connections = len(connectivity_matrix) * (len(connectivity_matrix) - 1) density = total_connections / max_possible_connections if max_possible_connections > 0 else 0 return { 'average_degree': avg_degree, 'clustering_coefficient': clustering_coefficient, 'network_density': density, 'total_nodes': len(connectivity_matrix), 'total_edges': total_connections } def spike_detection(self, signal: List[float], sample_rate: int, threshold_multiplier: float = 4.0) -> Dict[str, Any]: """Detect action potentials in electrophysiology signals""" if not signal: return {'error': 'Empty signal'} # Calculate noise level (using median absolute deviation) signal_diff = [abs(signal[i+1] - signal[i]) for i in range(len(signal)-1)] noise_level = sorted(signal_diff)[len(signal_diff) // 2] * 1.4826 # MAD to std conversion # Set detection threshold threshold = threshold_multiplier * noise_level # Detect spikes spike_times = [] spike_amplitudes = [] for i in range(1, len(signal)-1): if signal[i] > signal[i-1] and signal[i] > signal[i+1] and signal[i] > threshold: spike_times.append(i / sample_rate) spike_amplitudes.append(signal[i]) # Calculate spike waveform properties spike_waveforms = [] for spike_time_idx in [int(t * sample_rate) for t in spike_times]: if 2 <= spike_time_idx < len(signal) - 3: # Extract 5-sample window around spike waveform = signal[spike_time_idx-2:spike_time_idx+3] spike_waveforms.append(waveform) # Calculate firing rate if spike_times: recording_duration = len(signal) / sample_rate firing_rate = len(spike_times) / recording_duration else: firing_rate = 0.0 # Interspike intervals isi_values = [] if len(spike_times) > 1: isi_values = [spike_times[i+1] - spike_times[i] for i in range(len(spike_times)-1)] return { 'spike_count': len(spike_times), 'spike_times': spike_times, 'spike_amplitudes': spike_amplitudes, 'firing_rate': firing_rate, # Hz 'average_isi': sum(isi_values) / len(isi_values) if isi_values else 0, 'isi_cv': math.stdev(isi_values) / (sum(isi_values) / len(isi_values)) if len(isi_values) > 1 else 0, 'noise_level': noise_level, 'threshold': threshold, 'detection_timestamp': datetime.datetime.now().isoformat() } def apply_ml_classification(self, features: Dict[str, float]) -> Dict[str, Any]: """Apply machine learning classification to neural features""" model = self.ml_models['pattern_classifier'] # Prepare feature vector feature_vector = list(features.values()) # Simulate quantum neural network classification # (In real implementation, this would use the actual trained model) # Create mock classification result based on feature patterns feature_sum = sum(abs(f) for f in feature_vector) # Simulate different neural states if feature_sum < 1.0: predicted_state = 'rest' confidence = 0.85 elif feature_sum < 3.0: predicted_state = 'active' confidence = 0.78 elif feature_sum < 5.0: predicted_state = 'stimulated' confidence = 0.82 else: predicted_state = 'seizure' confidence = 0.91 # Quantum enhancement if self.analysis_params.quantum_enhancement: quantum_factor = 0.95 + 0.05 * math.sin(feature_sum) confidence *= quantum_factor confidence = min(confidence, 1.0) return { 'predicted_state': predicted_state, 'confidence': confidence, 'feature_vector_size': len(feature_vector), 'model_type': model['type'], 'quantum_enhanced': self.analysis_params.quantum_enhancement, 'classification_timestamp': datetime.datetime.now().isoformat() } def detect_anomalies(self, data: List[Dict], channels: List[str]) -> Dict[str, Any]: """Detect anomalies in neural data patterns""" if not data or not channels: return {'error': 'No data or channels provided'} anomalies = [] channel_anomalies = {ch: [] for ch in channels} # Extract time series for each channel for channel in channels: if channel in data[0]: time_series = [sample[channel] for sample in data if channel in sample] # Statistical anomaly detection mean_val = sum(time_series) / len(time_series) std_val = math.sqrt(sum((x - mean_val) ** 2 for x in time_series) / len(time_series)) # Find outliers (beyond 3 standard deviations) outlier_threshold = 3.0 for i, value in enumerate(time_series): if abs(value - mean_val) > outlier_threshold * std_val: anomaly = { 'timestamp': data[i].get('timestamp', i), 'channel': channel, 'value': value, 'z_score': (value - mean_val) / std_val if std_val > 0 else 0, 'type': 'statistical_outlier' } anomalies.append(anomaly) channel_anomalies[channel].append(anomaly) # Pattern anomaly detection (simplified) # Detect sudden changes in signal properties window_size = min(50, len(time_series) // 10) if window_size > 1: for i in range(window_size, len(time_series)): prev_window = time_series[i-window_size:i] curr_value = time_series[i] prev_mean = sum(prev_window) / len(prev_window) if abs(curr_value - prev_mean) > 2 * std_val: anomaly = { 'timestamp': data[i].get('timestamp', i), 'channel': channel, 'value': curr_value, 'expected_range': (prev_mean - 2*std_val, prev_mean + 2*std_val), 'type': 'pattern_anomaly' } anomalies.append(anomaly) channel_anomalies[channel].append(anomaly) # Calculate anomaly statistics total_anomalies = len(anomalies) anomaly_rate = total_anomalies / (len(data) * len(channels)) if data and channels else 0 # Quantum-enhanced anomaly detection quantum_anomalies = [] if self.analysis_params.quantum_enhancement: # Apply quantum coherence analysis for channel in channels: if channel in data[0]: time_series = [sample[channel] for sample in data if channel in sample] coherence_anomaly = self._detect_quantum_coherence_anomaly(time_series) if coherence_anomaly: quantum_anomalies.append({ 'channel': channel, 'coherence_drop': coherence_anomaly, 'type': 'quantum_coherence' }) return { 'total_anomalies': total_anomalies, 'anomaly_rate': anomaly_rate, 'channel_anomalies': channel_anomalies, 'quantum_anomalies': quantum_anomalies, 'anomaly_summary': { 'statistical_outliers': len([a for a in anomalies if a['type'] == 'statistical_outlier']), 'pattern_anomalies': len([a for a in anomalies if a['type'] == 'pattern_anomaly']), 'quantum_anomalies': len(quantum_anomalies) }, 'detection_timestamp': datetime.datetime.now().isoformat() } def _detect_quantum_coherence_anomaly(self, time_series: List[float]) -> Optional[float]: """Detect quantum coherence anomalies (simplified)""" if len(time_series) < 10: return None # Calculate local coherence window_size = 10 coherence_values = [] for i in range(len(time_series) - window_size + 1): window = time_series[i:i+window_size] # Simplified coherence measure coherence = abs(sum(window[j] * window[j+1] for j in range(window_size-1))) coherence_values.append(coherence) # Detect significant coherence drops if len(coherence_values) > 1: mean_coherence = sum(coherence_values) / len(coherence_values) min_coherence = min(coherence_values) if min_coherence < 0.5 * mean_coherence: return mean_coherence - min_coherence return None def generate_analysis_report(self, data: Dict[str, Any], analysis_type: str) -> Dict[str, Any]: """Generate comprehensive analysis report""" report = { 'report_id': f"ANALYSIS_{datetime.datetime.now().strftime('%Y%m%d_%H%M%S')}", 'analysis_type': analysis_type, 'generation_timestamp': datetime.datetime.now().isoformat(), 'data_summary': {}, 'results': data, 'interpretation': {}, 'recommendations': [] } # Add data summary if 'channels_analyzed' in data: report['data_summary']['channels'] = data['channels_analyzed'] if 'total_samples' in data: report['data_summary']['samples'] = data['total_samples'] # Add interpretation based on analysis type if analysis_type == 'spectral': if 'band_powers' in data: dominant_band = max(data['band_pounds'].items(), key=lambda x: x[1]['relative_power'])[0] if 'band_pounds' in data else 'unknown' report['interpretation']['dominant_activity'] = dominant_band report['recommendations'].append(f"Focus on {dominant_band} band activity in further analysis") elif analysis_type == 'connectivity': if 'network_metrics' in data: density = data['network_metrics'].get('network_density', 0) if density > 0.3: report['interpretation']['connectivity_level'] = 'highly_connected' elif density > 0.1: report['interpretation']['connectivity_level'] = 'moderately_connected' else: report['interpretation']['connectivity_level'] = 'sparsely_connected' elif analysis_type == 'anomalies': if 'anomaly_rate' in data: anomaly_rate = data['anomaly_rate'] if anomaly_rate > 0.1: report['interpretation']['data_quality'] = 'poor' report['recommendations'].append("High anomaly rate detected - review data quality") elif anomaly_rate > 0.05: report['interpretation']['data_quality'] = 'moderate' report['recommendations'].append("Some anomalies detected - consider filtering") else: report['interpretation']['data_quality'] = 'good' # Calculate report hash report_string = json.dumps(report, sort_keys=True) report['report_hash'] = hashlib.sha256(report_string.encode()).hexdigest()[:16] return report def test_neural_analysis(): """Test the neural analysis software""" print("Testing Neural Data Analysis Software...") analyzer = NeuralAnalysisSoftware() # Generate test signals print("\n1. Generating test signals...") import neural_recording_system recorder = neural_recording_system.MultiModalRecorder() eeg_data = recorder.record_eeg(recorder.configs['eeg']) # Extract EEG channel data eeg_channels = {} for channel in ['Fz', 'Cz', 'Pz']: if channel in eeg_data[0]: eeg_channels[channel] = [sample[channel] for sample in eeg_data if channel in sample] # Test spectral analysis print("\n2. Testing spectral analysis...") if eeg_channels: channel = list(eeg_channels.keys())[0] signal = eeg_channels[channel] spectral_result = analyzer.spectral_analysis(signal[:1000], 1000) print(f"Spectral analysis for {channel}: {len(spectral_result.get('band_powers', {}))} bands analyzed") print(f"Dominant frequency: {spectral_result.get('dominant_frequency', 0):.1f} Hz") # Test connectivity analysis print("\n3. Testing connectivity analysis...") if len(eeg_channels) >= 2: connectivity_result = analyzer.connectivity_analysis(eeg_channels, 1000) print(f"Connectivity analysis: {connectivity_result.get('channels_analyzed', 0)} channels") print(f"Significant connections: {len(connectivity_result.get('significant_connections', []))}") # Test spike detection print("\n4. Testing spike detection...") electrophys_data = recorder.record_electrophys(recorder.configs['electrophys']) if 'ch1' in electrophys_data[0]: spike_signal = [sample['ch1'] for sample in electrophys_data[:10000]] spike_result = analyzer.spike_detection(spike_signal, 30000) print(f"Spike detection: {spike_result.get('spike_count', 0)} spikes detected") print(f"Firing rate: {spike_result.get('firing_rate', 0):.2f} Hz") # Test ML classification print("\n5. Testing ML classification...") if eeg_channels: # Extract features from spectral analysis channel = list(eeg_channels.keys())[0] signal = eeg_channels[channel] features = analyzer.spectral_analysis(signal[:1000], 1000) # Convert to feature dict for classification feature_dict = {} for band, band_data in features.get('band_powers', {}).items(): feature_dict[f'{band}_power'] = band_data.get('relative_power', 0) feature_dict['spectral_entropy'] = features.get('spectral_entropy', 0) classification = analyzer.apply_ml_classification(feature_dict) print(f"ML classification: {classification.get('predicted_state', 'unknown')} (confidence: {classification.get('confidence', 0):.2f})") # Test anomaly detection print("\n6. Testing anomaly detection...") if eeg_channels: channels = list(eeg_channels.keys())[:2] # Create sample data structure sample_data = [] for i in range(100): sample = {'timestamp': i / 1000.0} for ch in channels: sample[ch] = eeg_channels[ch][i] if i < len(eeg_channels[ch]) else 0 sample_data.append(sample) anomaly_result = analyzer.detect_anomalies(sample_data, channels) print(f"Anomaly detection: {anomaly_result.get('total_anomalies', 0)} anomalies found") print(f"Anomaly rate: {anomaly_result.get('anomaly_rate', 0):.4f}") # Test report generation print("\n7. Testing report generation...") if 'spectral_result' in locals(): report = analyzer.generate_analysis_report(spectral_result, 'spectral') print(f"Report generated: {report.get('report_id', 'unknown')}") print(f"Data quality: {report.get('interpretation', {}).get('data_quality', 'unknown')}") return True if __name__ == "__main__": test_neural_analysis()