"""
AI Analysis Engine with LLM Integration
Provides intelligent code analysis using language models
"""

import json
import hashlib
import datetime
from typing import Dict, List, Optional, Any, Tuple, Callable
from dataclasses import dataclass
from enum import Enum


class AnalysisType(Enum):
    """Categories an insight can be filed under."""

    SECURITY = "security"
    PERFORMANCE = "performance"
    CODE_QUALITY = "code_quality"
    ARCHITECTURE = "architecture"
    # Declared for completeness; the engine in this module never emits it.
    DOCUMENTATION = "documentation"


class ConfidenceLevel(Enum):
    """How much context backed an insight (see ``_calculate_confidence``)."""

    LOW = "low"
    MEDIUM = "medium"
    HIGH = "high"
    # Declared but never produced by the heuristic in this module.
    VERY_HIGH = "very_high"


@dataclass
class AIInsight:
    """Structure for AI-generated insights"""

    category: AnalysisType
    confidence: ConfidenceLevel
    message: str
    suggestion: str
    code_snippet: Optional[str]
    line_range: Optional[Tuple[int, int]]
    impact_score: float  # 0.0 to 1.0


@dataclass
class AnalysisContext:
    """Context for AI analysis"""

    file_path: str
    language: str
    code_content: str
    pr_title: str
    pr_description: str
    changed_lines: List[int]
    repository_context: Dict[str, Any]


class MockLLMProvider:
    """Mock LLM provider for demonstration.

    Responses are canned strings chosen deterministically from a hash of the
    prompt and the code under analysis, so identical inputs always yield the
    same "insight".
    """

    def __init__(self):
        self.response_patterns = {
            "security": [
                "Potential security vulnerability detected in authentication logic",
                "Input validation missing for user-provided data",
                "Sensitive information exposed in error messages",
            ],
            "performance": [
                "Inefficient algorithm detected - consider optimizing",
                "Database query could be optimized with proper indexing",
                "Memory leak potential in resource management",
            ],
            "code_quality": [
                "Code duplication detected - consider refactoring",
                "Function complexity exceeds recommended limits",
                "Missing error handling for edge cases",
            ],
            "architecture": [
                "Tight coupling between components detected",
                "Single responsibility principle violation",
                "Missing abstraction layer for database operations",
            ],
        }

    def generate_insight(self, prompt: str, context: AnalysisContext) -> str:
        """Generate AI insight based on prompt and context.

        Args:
            prompt: Instruction text; its wording selects the response family.
            context: Analysis context; only ``code_content`` is read here.

        Returns:
            One canned response string from the selected family.
        """
        # Simulate LLM processing. MD5 is used purely as a cheap, stable
        # selector here, not for anything security-sensitive.
        content_hash = hashlib.md5(
            (prompt + context.code_content).encode()
        ).hexdigest()

        # Select appropriate response pattern
        prompt_lower = prompt.lower()
        if "security" in prompt_lower:
            pattern_list = self.response_patterns["security"]
        elif "performance" in prompt_lower:
            pattern_list = self.response_patterns["performance"]
        elif "architect" in prompt_lower:
            # Stem match: "architecture" alone misses "architectural", which
            # is the wording the engine's own architecture prompt uses.
            pattern_list = self.response_patterns["architecture"]
        else:
            pattern_list = self.response_patterns["code_quality"]

        # Select response based on content hash
        selected_index = int(content_hash[:8], 16) % len(pattern_list)
        return pattern_list[selected_index]


class AIAnalysisEngine:
    """Main AI analysis engine.

    Runs four prompt families (security, performance, code quality,
    architecture) through the provider, keeps responses that look relevant to
    the family, and wraps them as ``AIInsight`` objects. Results are cached
    per context and each uncached run is summarised in ``insight_history``.
    """

    # Lower-case stems a response must contain to count as relevant to a
    # category. Stems ("duplicat", "expos", "optimiz") are deliberate so that
    # inflected forms such as "duplication" or "exposed" match, and every
    # keyword the _generate_*_suggestion helpers branch on is accepted here.
    _RELEVANCE_KEYWORDS = {
        AnalysisType.SECURITY: (
            "security", "vulnerab", "authentication", "authorization",
            "validation", "sensitive", "expos",
        ),
        AnalysisType.PERFORMANCE: (
            "performance", "efficient", "optimiz", "bottleneck", "memory",
        ),
        AnalysisType.CODE_QUALITY: (
            "quality", "refactor", "duplicat", "readability", "complexity",
            "error handling", "naming",
        ),
        AnalysisType.ARCHITECTURE: (
            "architecture", "coupling", "design", "pattern", "responsibility",
            "abstraction", "separation",
        ),
    }

    def __init__(self, llm_provider: MockLLMProvider):
        self.llm_provider = llm_provider
        self.analysis_cache = {}
        self.insight_history = []

    def analyze_code(self, context: AnalysisContext) -> List[AIInsight]:
        """Perform comprehensive AI analysis on code.

        Args:
            context: The file, PR and repository information to analyse.

        Returns:
            A new list of insights. The list is independent of the cache, so
            callers may reorder or trim it freely; the ``AIInsight`` objects
            themselves are shared with the cache.
        """
        # Check cache first
        cache_key = self._generate_cache_key(context)
        cached = self.analysis_cache.get(cache_key)
        if cached is not None:
            return list(cached)

        insights: List[AIInsight] = []
        insights.extend(self._analyze_security(context))
        insights.extend(self._analyze_performance(context))
        insights.extend(self._analyze_code_quality(context))
        insights.extend(self._analyze_architecture(context))

        # Cache a copy so later mutation of the returned list cannot leak in.
        self.analysis_cache[cache_key] = list(insights)

        # Track insights
        self.insight_history.append({
            "timestamp": datetime.datetime.now().isoformat(),
            "file_path": context.file_path,
            "insights_count": len(insights),
            # Sorted so the recorded order is stable between runs.
            "categories": sorted(
                {insight.category for insight in insights},
                key=lambda category: category.value,
            ),
        })

        return insights

    def _collect_insights(
        self,
        context: AnalysisContext,
        prompts: List[str],
        category: AnalysisType,
        impact_key: str,
        suggest: Callable[[str], str],
    ) -> List[AIInsight]:
        """Run ``prompts`` through the provider and build insights.

        A response is kept when it is non-empty, contains at least one
        relevance keyword for ``category``, and has not already been kept for
        this category (different prompts can yield the same response, which
        would otherwise produce identical duplicate insights).

        Args:
            context: Analysis context forwarded to the provider.
            prompts: Prompts to issue, in order.
            category: Category assigned to every resulting insight.
            impact_key: Key passed to ``_calculate_impact_score``.
            suggest: Maps a response message to a remediation suggestion.

        Returns:
            Insights in prompt order.
        """
        keywords = self._RELEVANCE_KEYWORDS[category]
        seen_messages = set()
        insights = []

        for prompt in prompts:
            response = self.llm_provider.generate_insight(prompt, context)
            if not response or response in seen_messages:
                continue

            lowered = response.lower()
            if not any(keyword in lowered for keyword in keywords):
                continue

            seen_messages.add(response)
            insights.append(AIInsight(
                category=category,
                confidence=self._calculate_confidence(prompt, context),
                message=response,
                suggestion=suggest(response),
                code_snippet=self._extract_relevant_code(context, response),
                line_range=self._estimate_line_range(context, response),
                impact_score=self._calculate_impact_score(impact_key, response),
            ))

        return insights

    def _analyze_security(self, context: AnalysisContext) -> List[AIInsight]:
        """AI-powered security analysis"""
        security_prompts = [
            f"Analyze this {context.language} code for security vulnerabilities",
            "Check for authentication and authorization issues",
            "Look for input validation problems",
            "Identify potential data exposure risks",
        ]
        return self._collect_insights(
            context,
            security_prompts,
            AnalysisType.SECURITY,
            "security",
            self._generate_security_suggestion,
        )

    def _analyze_performance(self, context: AnalysisContext) -> List[AIInsight]:
        """AI-powered performance analysis"""
        performance_prompts = [
            f"Identify performance bottlenecks in this {context.language} code",
            "Look for inefficient algorithms or data structures",
            "Check for resource management issues",
            "Analyze database query efficiency",
        ]
        return self._collect_insights(
            context,
            performance_prompts,
            AnalysisType.PERFORMANCE,
            "performance",
            self._generate_performance_suggestion,
        )

    def _analyze_code_quality(self, context: AnalysisContext) -> List[AIInsight]:
        """AI-powered code quality analysis"""
        quality_prompts = [
            f"Assess code quality and maintainability in this {context.language} code",
            "Look for code duplication and refactoring opportunities",
            "Check for proper error handling and edge cases",
            "Evaluate naming conventions and code readability",
        ]
        return self._collect_insights(
            context,
            quality_prompts,
            AnalysisType.CODE_QUALITY,
            "quality",
            self._generate_quality_suggestion,
        )

    def _analyze_architecture(self, context: AnalysisContext) -> List[AIInsight]:
        """AI-powered architecture analysis"""
        architecture_prompts = [
            f"Evaluate architectural patterns in this {context.language} code",
            "Check for SOLID principles compliance",
            "Look for tight coupling or low cohesion issues",
            "Assess separation of concerns",
        ]
        return self._collect_insights(
            context,
            architecture_prompts,
            AnalysisType.ARCHITECTURE,
            "architecture",
            self._generate_architecture_suggestion,
        )

    def _calculate_confidence(self, prompt: str, context: AnalysisContext) -> ConfidenceLevel:
        """Calculate confidence level for AI insight.

        One point each for: a PR title longer than 10 characters, a PR
        description longer than 50, code longer than 100, and a non-empty
        repository context. ``prompt`` is accepted but not consulted.

        Returns:
            HIGH for 3-4 points, MEDIUM for 2, LOW otherwise.
        """
        # Simple confidence calculation based on context richness
        score = 0

        if context.pr_title and len(context.pr_title) > 10:
            score += 1
        if context.pr_description and len(context.pr_description) > 50:
            score += 1
        if len(context.code_content) > 100:
            score += 1
        if context.repository_context:
            score += 1

        if score >= 3:
            return ConfidenceLevel.HIGH
        if score >= 2:
            return ConfidenceLevel.MEDIUM
        return ConfidenceLevel.LOW

    def _calculate_impact_score(self, category: str, message: str) -> float:
        """Calculate impact score (0.0 to 1.0).

        Starts from a per-category base (0.5 for unknown categories), adds 0.2
        when the message contains a high-impact keyword, leaves the base
        unchanged for a medium-impact keyword, and subtracts 0.1 otherwise.
        The result is clamped to at most 1.0 and at least 0.1.
        """
        base_scores = {
            "security": 0.8,
            "performance": 0.6,
            "quality": 0.4,
            "architecture": 0.5,
        }
        base_score = base_scores.get(category, 0.5)

        # Adjust based on message content
        high_impact_keywords = ["critical", "vulnerability", "security", "major", "severe"]
        medium_impact_keywords = ["improve", "optimize", "consider", "recommend"]

        lowered = message.lower()
        if any(keyword in lowered for keyword in high_impact_keywords):
            return min(1.0, base_score + 0.2)
        if any(keyword in lowered for keyword in medium_impact_keywords):
            return base_score
        return max(0.1, base_score - 0.1)

    def _generate_cache_key(self, context: AnalysisContext) -> str:
        """Generate cache key for analysis context.

        Every field of the context goes into the key: the language is
        interpolated into prompts, the PR text and repository context drive
        the confidence level, and the provider is handed the whole context, so
        a change in any of them can change the result.

        Returns:
            Hex SHA-256 digest of a canonical JSON rendering of the context.
        """
        payload = json.dumps(
            [
                context.file_path,
                context.language,
                context.code_content,
                context.pr_title,
                context.pr_description,
                context.changed_lines,
                context.repository_context,
            ],
            sort_keys=True,  # dict ordering must not affect the key
            default=str,     # tolerate non-JSON values in repository_context
        )
        return hashlib.sha256(payload.encode()).hexdigest()

    def _extract_relevant_code(self, context: AnalysisContext, message: str) -> Optional[str]:
        """Extract relevant code snippet based on message.

        Currently a placeholder: returns the first five lines of the code (or
        fewer if the code is shorter) regardless of ``message``; ``None`` when
        the code has no lines.
        """
        lines = context.code_content.splitlines()

        # Simple extraction - return first few lines
        if lines:
            return "\n".join(lines[:5])
        return None

    def _estimate_line_range(self, context: AnalysisContext, message: str) -> Optional[Tuple[int, int]]:
        """Estimate line range for the insight.

        Currently a placeholder: returns ``(1, min(10, line_count))``
        regardless of ``message``; ``None`` when the code has no lines.
        """
        lines = context.code_content.splitlines()

        if lines:
            return (1, min(10, len(lines)))
        return None

    def _generate_security_suggestion(self, message: str) -> str:
        """Generate security improvement suggestion"""
        lowered = message.lower()
        if "authentication" in lowered:
            return "Implement proper authentication with secure password handling"
        if "validation" in lowered:
            return "Add input validation and sanitization for all user inputs"
        # Stem so that "exposed" / "exposure" / "exposes" all match.
        if "expos" in lowered:
            return "Remove sensitive information from error messages and logs"
        return "Review security best practices and implement proper safeguards"

    def _generate_performance_suggestion(self, message: str) -> str:
        """Generate performance improvement suggestion"""
        lowered = message.lower()
        if "algorithm" in lowered:
            return "Consider using more efficient algorithms or data structures"
        if "database" in lowered:
            return "Optimize database queries and add proper indexing"
        if "memory" in lowered:
            return "Review memory management and implement proper resource cleanup"
        return "Profile the code and identify performance bottlenecks"

    def _generate_quality_suggestion(self, message: str) -> str:
        """Generate code quality improvement suggestion"""
        lowered = message.lower()
        # Stem so that "duplicate" / "duplicated" / "duplication" all match.
        if "duplicat" in lowered:
            return "Extract common code into reusable functions or classes"
        if "error" in lowered:
            return "Add proper error handling and edge case coverage"
        if "naming" in lowered:
            return "Improve variable and function naming for better readability"
        return "Refactor code to improve maintainability and readability"

    def _generate_architecture_suggestion(self, message: str) -> str:
        """Generate architecture improvement suggestion"""
        lowered = message.lower()
        if "coupling" in lowered:
            return "Reduce coupling by implementing dependency injection or interfaces"
        if "design" in lowered:
            return "Apply appropriate design patterns for better code organization"
        if "separation" in lowered:
            return "Improve separation of concerns by splitting responsibilities"
        return "Review architectural principles and refactor accordingly"


# Test implementation
def test_ai_analysis_engine() -> None:
    """Test the AI analysis engine.

    Returns nothing: test runners such as pytest treat a non-None return from
    a test function as a mistake, and failures are signalled by the asserts.
    """
    llm_provider = MockLLMProvider()
    engine = AIAnalysisEngine(llm_provider)

    # Create test context
    context = AnalysisContext(
        file_path="test.py",
        language="python",
        code_content="""
def authenticate_user(username, password):
    secret_key = "hardcoded_secret"
    if username == "admin":
        return True
    return False
""",
        pr_title="Add authentication feature",
        pr_description="This PR adds user authentication with hardcoded secrets",
        changed_lines=[2, 3, 4],
        repository_context={"main_branch": "main", "language": "python"},
    )

    # Test analysis
    insights = engine.analyze_code(context)
    assert len(insights) > 0

    # Check insight structure
    for insight in insights:
        assert insight.category in AnalysisType
        assert insight.confidence in ConfidenceLevel
        assert insight.message
        assert insight.suggestion
        assert 0.0 <= insight.impact_score <= 1.0

    # Test cache functionality
    insights2 = engine.analyze_code(context)
    assert len(insights2) == len(insights)

    print("✅ AI analysis engine tests passed")


if __name__ == "__main__":
    test_ai_analysis_engine()