"""AI-Powered Core Service.

Mock LLM integration with context-aware static analysis. Exposes a small
FastAPI app with single-file, batch, and model-listing endpoints.
"""
import datetime
import hashlib
import json
import math
import re
from dataclasses import dataclass
from enum import Enum
from typing import Any, Dict, List, Optional, Union

from fastapi import FastAPI, HTTPException
from pydantic import BaseModel

app = FastAPI(title="AI Core Service", version="1.0.0")


class AnalysisType(Enum):
    """Categories of analysis a caller may request."""

    SECURITY = "security"
    PERFORMANCE = "performance"
    ARCHITECTURE = "architecture"
    CODE_QUALITY = "code_quality"
    BEST_PRACTICES = "best_practices"


class InsightType(Enum):
    """Kinds of findings an analysis can produce."""

    VULNERABILITY = "vulnerability"
    ANTI_PATTERN = "anti_pattern"
    OPTIMIZATION = "optimization"
    SUGGESTION = "suggestion"
    QUESTION = "question"


@dataclass
class AIInsight:
    """A single finding: what was detected, where, and how to fix it."""

    type: InsightType
    severity: str            # "low" | "medium" | "high"
    confidence: float        # 0.0 - 1.0
    message: str
    file_path: str
    line_number: int
    context: str             # the stripped source line that matched
    suggestion: Optional[str] = None
    code_example: Optional[str] = None


@dataclass
class ContextData:
    """Repository-level context supplied by the caller alongside the code."""

    repository_info: Dict[str, Any]
    file_history: List[Dict[str, Any]]
    related_files: List[str]
    dependencies: List[str]
    coding_standards: Dict[str, Any]


# Pattern tables are compiled once at import time instead of being re-compiled
# for every line of every request (the original passed raw strings to
# re.search inside a nested loop).
_SECURITY_PATTERNS = [
    {
        "pattern": re.compile(r"eval\s*\(", re.IGNORECASE),
        "type": InsightType.VULNERABILITY,
        "severity": "high",
        "message": "Use of eval() can lead to code injection",
        "suggestion": "Use safer alternatives like ast.literal_eval() or JSON parsing",
        "example": "# Instead of:\n# result = eval(user_input)\n# Use:\n# result = ast.literal_eval(user_input)",
    },
    {
        "pattern": re.compile(r"exec\s*\(", re.IGNORECASE),
        "type": InsightType.VULNERABILITY,
        "severity": "high",
        "message": "Use of exec() is dangerous and can execute arbitrary code",
        "suggestion": "Avoid exec() entirely. Use function calls or configuration files instead",
        "example": "# Instead of:\n# exec(user_code)\n# Use:\n# result = safe_function(user_input)",
    },
    {
        "pattern": re.compile(r"sql.*\+.*%|format.*sql|f\".*sql", re.IGNORECASE),
        "type": InsightType.VULNERABILITY,
        "severity": "high",
        "message": "Potential SQL injection vulnerability",
        "suggestion": "Use parameterized queries or ORM methods",
        "example": "# Instead of:\n# cursor.execute(f\"SELECT * FROM users WHERE id = {user_id}\")\n# Use:\n# cursor.execute(\"SELECT * FROM users WHERE id = %s\", (user_id,))",
    },
    {
        "pattern": re.compile(r"password.*=.*['\"][^'\"]{0,8}['\"]", re.IGNORECASE),
        "type": InsightType.VULNERABILITY,
        "severity": "high",
        "message": "Hardcoded password detected",
        "suggestion": "Use environment variables or secure configuration management",
        "example": "# Instead of:\n# password = \"secret123\"\n# Use:\n# password = os.getenv('DB_PASSWORD')",
    },
]

_PERFORMANCE_PATTERNS = [
    {
        "pattern": re.compile(r"for.*in.*range\(.*\):.*\.append\("),
        "type": InsightType.OPTIMIZATION,
        "severity": "medium",
        "message": "Inefficient list building in loop",
        "suggestion": "Use list comprehensions or map() for better performance",
        "example": "# Instead of:\n# result = []\n# for i in range(n):\n# result.append(i * 2)\n# Use:\n# result = [i * 2 for i in range(n)]",
    },
    {
        "pattern": re.compile(r"\.find\(.*\)\s*!=\s*-1"),
        "type": InsightType.OPTIMIZATION,
        "severity": "low",
        "message": "Use 'in' operator instead of find() for membership testing",
        "suggestion": "Replace find() with 'in' for cleaner and faster code",
        "example": "# Instead of:\n# if text.find(substring) != -1:\n# Use:\n# if substring in text:",
    },
]

_QUALITY_PATTERNS = [
    {
        # BUG FIX: the original pattern (r"def\s+\w+\([^)]*\):") matched every
        # function definition, flagging all code as "complex". Now only defs
        # with at least 6 parameters (5+ commas in the parameter list) match.
        "pattern": re.compile(r"def\s+\w+\((?:[^)]*,){5}[^)]*\):"),
        "type": InsightType.ANTI_PATTERN,
        "severity": "medium",
        "message": "Function complexity detected",
        "suggestion": "Break this function into smaller, more focused functions",
    },
    {
        "pattern": re.compile(r"except\s*:"),
        "type": InsightType.ANTI_PATTERN,
        "severity": "medium",
        "message": "Bare except clause catches all exceptions",
        "suggestion": "Specify the exception types you want to catch",
        "example": "# Instead of:\n# except:\n# Use:\n# except (ValueError, TypeError):",
    },
]

_BEST_PRACTICE_PATTERNS = [
    {
        "pattern": re.compile(r"TODO|FIXME|HACK|XXX", re.IGNORECASE),
        "type": InsightType.QUESTION,
        "severity": "low",
        "message": "Technical debt marker found",
        "suggestion": "Address the marked issue or create a ticket to track it",
    },
    {
        "pattern": re.compile(r"print\s*\(", re.IGNORECASE),
        "type": InsightType.SUGGESTION,
        "severity": "low",
        "message": "Print statement found in production code",
        "suggestion": "Use proper logging instead of print statements",
        "example": "# Instead of:\n# print(f\"Processing {item}\")\n# Use:\n# logger.info(f\"Processing {item}\")",
    },
]


class MockLLMService:
    """Stand-in for a real LLM backend; runs regex heuristics over code."""

    def __init__(self):
        self.model_name = "gpt-4-code-review"
        self.context_window = 8192
        self.temperature = 0.3

    def analyze_code(self, code: str, context: ContextData,
                     analysis_types: List[AnalysisType]) -> List[AIInsight]:
        """Run each requested analysis over *code* and collect the findings."""
        # Dispatch table keeps analyze_code flat instead of an if/elif chain.
        dispatch = {
            AnalysisType.SECURITY: self._security_analysis,
            AnalysisType.PERFORMANCE: self._performance_analysis,
            AnalysisType.ARCHITECTURE: self._architecture_analysis,
            AnalysisType.CODE_QUALITY: self._code_quality_analysis,
            AnalysisType.BEST_PRACTICES: self._best_practices_analysis,
        }
        insights: List[AIInsight] = []
        for analysis_type in analysis_types:
            handler = dispatch.get(analysis_type)
            if handler is not None:
                insights.extend(handler(code, context))
        return insights

    def _scan_lines(self, code: str, context: ContextData,
                    patterns: List[Dict[str, Any]],
                    confidence: float) -> List[AIInsight]:
        """Shared line scanner: one AIInsight per (line, pattern) match.

        Replaces four copy-pasted scan loops from the original implementation.
        """
        file_path = context.repository_info.get("file_path", "unknown")
        insights: List[AIInsight] = []
        for line_num, line in enumerate(code.split("\n"), 1):
            for info in patterns:
                if info["pattern"].search(line):
                    insights.append(AIInsight(
                        type=info["type"],
                        severity=info["severity"],
                        confidence=confidence,
                        message=info["message"],
                        file_path=file_path,
                        line_number=line_num,
                        context=line.strip(),
                        suggestion=info.get("suggestion"),
                        code_example=info.get("example"),
                    ))
        return insights

    def _security_analysis(self, code: str, context: ContextData) -> List[AIInsight]:
        """Flag dangerous constructs: eval/exec, SQL string building, hardcoded passwords."""
        return self._scan_lines(code, context, _SECURITY_PATTERNS, confidence=0.85)

    def _performance_analysis(self, code: str, context: ContextData) -> List[AIInsight]:
        """Flag simple performance smells (append-in-loop, find() != -1)."""
        return self._scan_lines(code, context, _PERFORMANCE_PATTERNS, confidence=0.75)

    def _architecture_analysis(self, code: str, context: ContextData) -> List[AIInsight]:
        """Whole-file heuristic: suggest splitting files longer than 100 lines."""
        insights: List[AIInsight] = []
        if len(code.split("\n")) > 100:
            insights.append(AIInsight(
                type=InsightType.SUGGESTION,
                severity="medium",
                confidence=0.8,
                message="Large file detected. Consider splitting into smaller modules",
                file_path=context.repository_info.get("file_path", "unknown"),
                line_number=1,
                context="File analysis",
                suggestion="Break this file into logical components with single responsibilities",
                code_example="# Split into:\n# - models.py\n# - services.py\n# - utils.py\n# - main.py",
            ))
        return insights

    def _code_quality_analysis(self, code: str, context: ContextData) -> List[AIInsight]:
        """Flag quality anti-patterns (very long parameter lists, bare except)."""
        return self._scan_lines(code, context, _QUALITY_PATTERNS, confidence=0.7)

    def _best_practices_analysis(self, code: str, context: ContextData) -> List[AIInsight]:
        """Flag technical-debt markers and print() in production code."""
        return self._scan_lines(code, context, _BEST_PRACTICE_PATTERNS, confidence=0.6)


class AIAnalysisService:
    """Caches analysis results and shapes them into the API response dict."""

    # Upper bound on cached entries; the original cache grew without limit.
    _MAX_CACHE_ENTRIES = 1024

    def __init__(self):
        self.llm_service = MockLLMService()
        self.cache: Dict[str, Dict[str, Any]] = {}

    def analyze_with_context(self, code: str, context: ContextData,
                             analysis_types: List[AnalysisType]) -> Dict[str, Any]:
        """Analyze *code*, serving repeated identical requests from the cache."""
        cache_key = self._generate_cache_key(code, context, analysis_types)
        if cache_key in self.cache:
            return self.cache[cache_key]

        insights = self.llm_service.analyze_code(code, context, analysis_types)
        result = {
            "insights": [
                {
                    "type": insight.type.value,
                    "severity": insight.severity,
                    "confidence": insight.confidence,
                    "message": insight.message,
                    "file_path": insight.file_path,
                    "line_number": insight.line_number,
                    "context": insight.context,
                    "suggestion": insight.suggestion,
                    "code_example": insight.code_example,
                }
                for insight in insights
            ],
            "summary": self._generate_summary(insights),
            "recommendations": self._generate_recommendations(insights),
            "confidence_score": self._calculate_overall_confidence(insights),
            # Timezone-aware UTC; the original used a naive local timestamp.
            "analysis_timestamp": datetime.datetime.now(datetime.timezone.utc).isoformat(),
        }

        # FIFO eviction keeps memory bounded under many unique requests.
        if len(self.cache) >= self._MAX_CACHE_ENTRIES:
            self.cache.pop(next(iter(self.cache)))
        self.cache[cache_key] = result
        return result

    def _generate_cache_key(self, code: str, context: ContextData,
                            analysis_types: List[AnalysisType]) -> str:
        """Derive a stable key from the FULL code, path, and requested analyses.

        BUG FIX: the original hashed only code[:100], so two different files
        sharing a path and first 100 characters would collide and return
        stale results for one of them.
        """
        content = "|".join((
            code,
            context.repository_info.get("file_path", ""),
            ",".join(t.value for t in analysis_types),
        ))
        return hashlib.sha256(content.encode()).hexdigest()[:16]

    def _generate_summary(self, insights: List[AIInsight]) -> Dict[str, Any]:
        """Aggregate counts by severity and insight type."""
        severity_counts: Dict[str, int] = {}
        type_counts: Dict[str, int] = {}
        for insight in insights:
            severity_counts[insight.severity] = severity_counts.get(insight.severity, 0) + 1
            type_counts[insight.type.value] = type_counts.get(insight.type.value, 0) + 1
        return {
            "total_insights": len(insights),
            "severity_breakdown": severity_counts,
            "type_breakdown": type_counts,
            "high_priority_issues": severity_counts.get("high", 0),
            "critical_vulnerabilities": len([
                i for i in insights
                if i.type == InsightType.VULNERABILITY and i.severity == "high"
            ]),
        }

    def _generate_recommendations(self, insights: List[AIInsight]) -> List[str]:
        """Derive short, actionable next steps from the insight mix."""
        recommendations: List[str] = []
        high_severity_count = len([i for i in insights if i.severity == "high"])
        if high_severity_count > 0:
            recommendations.append(
                f"Address {high_severity_count} high-severity issues immediately")
        if any(i.type == InsightType.VULNERABILITY for i in insights):
            recommendations.append(
                "Review and fix all security vulnerabilities before deployment")
        performance_issues = [i for i in insights if i.type == InsightType.OPTIMIZATION]
        if len(performance_issues) > 3:
            recommendations.append(
                "Consider performance optimization to improve application efficiency")
        return recommendations

    def _calculate_overall_confidence(self, insights: List[AIInsight]) -> float:
        """Mean confidence across insights; 0.0 when there are none."""
        if not insights:
            return 0.0
        return sum(insight.confidence for insight in insights) / len(insights)


ai_service = AIAnalysisService()


class AIAnalysisRequest(BaseModel):
    """Request body: code under review plus optional repository context."""

    code: str
    context: Dict[str, Any]
    analysis_types: List[str] = ["security", "performance", "code_quality"]


class AIAnalysisResponse(BaseModel):
    """Response body mirroring AIAnalysisService.analyze_with_context output."""

    insights: List[Dict[str, Any]]
    summary: Dict[str, Any]
    recommendations: List[str]
    confidence_score: float
    analysis_timestamp: str


def _build_context(raw: Dict[str, Any]) -> ContextData:
    """Build a ContextData from a request's context dict, defaulting missing keys."""
    return ContextData(
        repository_info=raw.get("repository_info", {}),
        file_history=raw.get("file_history", []),
        related_files=raw.get("related_files", []),
        dependencies=raw.get("dependencies", []),
        coding_standards=raw.get("coding_standards", {}),
    )


@app.get("/health")
async def health():
    """Liveness probe."""
    return {"status": "healthy", "service": "ai-core"}


@app.post("/analyze", response_model=AIAnalysisResponse)
async def analyze_code_with_ai(request: AIAnalysisRequest):
    """Analyze a single piece of code with the requested analysis types."""
    try:
        context_data = _build_context(request.context)
        analysis_types = [AnalysisType(t) for t in request.analysis_types]
        result = ai_service.analyze_with_context(request.code, context_data, analysis_types)
        return AIAnalysisResponse(**result)
    except ValueError as e:
        # AnalysisType(t) raises ValueError for unknown type strings.
        raise HTTPException(status_code=400, detail=f"Invalid analysis type: {str(e)}")
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"AI analysis failed: {str(e)}")


@app.post("/analyze/batch")
async def analyze_batch_with_ai(requests: List[AIAnalysisRequest]):
    """Analyze several files in one call; returns one result per request."""
    try:
        results = []
        for req in requests:
            context_data = _build_context(req.context)
            analysis_types = [AnalysisType(t) for t in req.analysis_types]
            results.append(
                ai_service.analyze_with_context(req.code, context_data, analysis_types))
        return {
            "results": results,
            "total_files": len(requests),
            "timestamp": datetime.datetime.now(datetime.timezone.utc).isoformat(),
        }
    except ValueError as e:
        # Consistent with /analyze: a bad analysis type is a client error (400),
        # not a server failure (the original returned 500 here).
        raise HTTPException(status_code=400, detail=f"Invalid analysis type: {str(e)}")
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Batch AI analysis failed: {str(e)}")


@app.get("/models")
async def get_available_models():
    """List the (mock) models this service can use."""
    return {
        "models": [
            {
                "name": "gpt-4-code-review",
                "description": "GPT-4 optimized for code review",
                "context_window": 8192,
                "capabilities": ["security", "performance", "architecture", "code_quality"],
            }
        ],
        "default_model": "gpt-4-code-review",
    }


if __name__ == "__main__":
    import uvicorn

    uvicorn.run(app, host="0.0.0.0", port=8003)