"""
Multi-Language Analysis Engine

Regex-driven static analysis rules and heuristic metrics for multiple
programming languages, exposed as a FastAPI service.
"""

from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from typing import Dict, List, Optional, Any, Union
import json
import datetime
import re
import math
import hashlib
from enum import Enum
from dataclasses import dataclass

app = FastAPI(title="Analysis Engine", version="1.0.0")


class Language(Enum):
    """Programming languages the engine recognizes."""
    PYTHON = "python"
    JAVASCRIPT = "javascript"
    TYPESCRIPT = "typescript"
    JAVA = "java"
    GO = "go"
    RUST = "rust"
    CPP = "cpp"
    CSHARP = "csharp"


class Severity(Enum):
    """Severity level attached to a reported issue."""
    ERROR = "error"
    WARNING = "warning"
    INFO = "info"


class IssueType(Enum):
    """Broad category of a static-analysis finding."""
    SYNTAX = "syntax"
    SECURITY = "security"
    PERFORMANCE = "performance"
    STYLE = "style"
    COMPLEXITY = "complexity"
    MAINTAINABILITY = "maintainability"


@dataclass
class CodeIssue:
    """A single finding produced by one rule match on one source line."""
    file_path: str
    line_number: int   # 1-based line of the match
    column: int        # 1-based column of the match
    issue_type: IssueType
    severity: Severity
    message: str
    rule_id: str
    suggestion: Optional[str] = None


@dataclass
class CodeMetrics:
    """Aggregate, heuristic quality metrics for one file."""
    lines_of_code: int
    cyclomatic_complexity: float
    maintainability_index: float
    technical_debt_ratio: float   # percentage of lines with debt markers
    test_coverage: float          # rough estimate, 0-100


class LanguageAnalyzer:
    """Line-oriented, regex-based static analyzer with per-language rules."""

    def __init__(self) -> None:
        self.rules = self._load_rules()

    def _load_rules(self) -> Dict[Language, List[Dict]]:
        """Return the built-in rule tables keyed by language.

        Rules are kept as plain dicts with regex *source strings* (not
        compiled patterns) because the /rules endpoint serializes them
        back to JSON verbatim.
        """
        return {
            Language.PYTHON: [
                {
                    "id": "PY001",
                    "type": IssueType.STYLE,
                    "severity": Severity.WARNING,
                    "pattern": r"^\s*print\s*\(",
                    "message": "Use logging instead of print",
                    "suggestion": "Replace print() with logger.info() or appropriate logging level",
                },
                {
                    "id": "PY002",
                    "type": IssueType.SECURITY,
                    "severity": Severity.ERROR,
                    "pattern": r"eval\s*\(",
                    "message": "Use of eval() is dangerous",
                    "suggestion": "Use ast.literal_eval() or safer alternatives",
                },
                {
                    "id": "PY003",
                    "type": IssueType.COMPLEXITY,
                    "severity": Severity.WARNING,
                    "pattern": r"def\s+\w+\([^)]*\):",
                    "message": "Function complexity detected",
                    "suggestion": "Consider breaking this function into smaller functions",
                },
            ],
            Language.JAVASCRIPT: [
                {
                    "id": "JS001",
                    "type": IssueType.STYLE,
                    "severity": Severity.WARNING,
                    "pattern": r"console\.log",
                    "message": "Console.log found in production code",
                    "suggestion": "Remove or replace with proper logging",
                },
                {
                    "id": "JS002",
                    "type": IssueType.SECURITY,
                    "severity": Severity.ERROR,
                    "pattern": r"innerHTML\s*=",
                    "message": "Direct innerHTML assignment can lead to XSS",
                    "suggestion": "Use textContent or sanitize HTML first",
                },
                {
                    "id": "JS003",
                    "type": IssueType.PERFORMANCE,
                    "severity": Severity.WARNING,
                    "pattern": r"for\s*\(\s*let\s+\w+\s*=\s*0\s*;",
                    "message": "Consider using array methods instead of manual loops",
                    "suggestion": "Use map(), filter(), or forEach() when appropriate",
                },
            ],
            Language.JAVA: [
                {
                    "id": "JV001",
                    "type": IssueType.SECURITY,
                    "severity": Severity.ERROR,
                    "pattern": r"Runtime\.getRuntime\(\)\.exec",
                    "message": "Direct command execution is dangerous",
                    "suggestion": "Use ProcessBuilder with proper input validation",
                },
                {
                    "id": "JV002",
                    "type": IssueType.PERFORMANCE,
                    "severity": Severity.WARNING,
                    "pattern": r"String\s+\w+\s*=\s*\w+\s*\+",
                    "message": "String concatenation in loop is inefficient",
                    "suggestion": "Use StringBuilder for string concatenation",
                },
            ],
        }

    def detect_language(self, file_path: str, content: str) -> Language:
        """Detect the language, first by file extension, then by content.

        Falls back to Python when neither the extension nor any content
        indicator matches.
        """
        extension = file_path.split('.')[-1].lower()
        extension_map = {
            'py': Language.PYTHON,
            'js': Language.JAVASCRIPT,
            'jsx': Language.JAVASCRIPT,
            'ts': Language.TYPESCRIPT,
            'tsx': Language.TYPESCRIPT,
            'java': Language.JAVA,
            'go': Language.GO,
            'rs': Language.RUST,
            'cpp': Language.CPP,
            'cc': Language.CPP,
            'cxx': Language.CPP,
            'cs': Language.CSHARP,
            'c': Language.CPP,
        }
        if extension in extension_map:
            return extension_map[extension]

        # Extension unknown: look for characteristic constructs in the body.
        content_indicators = {
            Language.PYTHON: [r"import\s+\w+", r"def\s+\w+\s*\(", r"class\s+\w+\s*:"],
            Language.JAVASCRIPT: [r"function\s+\w+", r"const\s+\w+\s*=", r"let\s+\w+\s*="],
            Language.JAVA: [r"public\s+class", r"public\s+static\s+void\s+main"],
            Language.GO: [r"func\s+\w+", r"package\s+\w+"],
        }
        for lang, patterns in content_indicators.items():
            for pattern in patterns:
                if re.search(pattern, content, re.IGNORECASE):
                    return lang
        return Language.PYTHON

    def analyze_code(self, file_path: str, content: str) -> tuple[List[CodeIssue], CodeMetrics]:
        """Run rule matching and metric computation for one file."""
        language = self.detect_language(file_path, content)
        issues = self._apply_rules(file_path, content, language)
        metrics = self._calculate_metrics(content, language)
        return issues, metrics

    def _apply_rules(self, file_path: str, content: str, language: Language) -> List[CodeIssue]:
        """Match every rule for *language* against each line of *content*.

        Reports at most one issue per (rule, line) pair — only the first
        match on a line is recorded.
        """
        issues: List[CodeIssue] = []
        rules = self.rules.get(language, [])
        lines = content.split('\n')

        for rule in rules:
            pattern = rule["pattern"]
            for line_num, line in enumerate(lines, 1):
                match = re.search(pattern, line, re.IGNORECASE)
                if match:
                    # match.start() is the exact 0-based offset of the hit;
                    # no need to re-locate the text with str.find().
                    issues.append(CodeIssue(
                        file_path=file_path,
                        line_number=line_num,
                        column=match.start() + 1,
                        issue_type=rule["type"],
                        severity=rule["severity"],
                        message=rule["message"],
                        rule_id=rule["id"],
                        suggestion=rule.get("suggestion"),
                    ))
        return issues

    def _calculate_metrics(self, content: str, language: Language) -> CodeMetrics:
        """Compute all heuristic metrics for one file."""
        non_empty_lines = [line for line in content.split('\n') if line.strip()]
        loc = len(non_empty_lines)

        complexity = self._calculate_cyclomatic_complexity(content, language)
        maintainability = self._calculate_maintainability_index(loc, complexity)
        tech_debt = self._calculate_technical_debt_ratio(content, language)
        test_coverage = self._estimate_test_coverage(content, language)

        return CodeMetrics(
            lines_of_code=loc,
            cyclomatic_complexity=complexity,
            maintainability_index=maintainability,
            technical_debt_ratio=tech_debt,
            test_coverage=test_coverage,
        )

    def _calculate_cyclomatic_complexity(self, content: str, language: Language) -> float:
        """Approximate cyclomatic complexity: 1 + count of branch keywords."""
        complexity_patterns = {
            Language.PYTHON: [r"if\s+", r"elif\s+", r"for\s+", r"while\s+", r"except\s+", r"with\s+"],
            Language.JAVASCRIPT: [r"if\s*\(", r"for\s*\(", r"while\s*\(", r"catch\s*\(", r"case\s+"],
            Language.JAVA: [r"if\s*\(", r"for\s*\(", r"while\s*\(", r"catch\s*\(", r"case\s+"],
            Language.GO: [r"if\s+", r"for\s+", r"switch\s+", r"select\s+"],
        }
        # Languages without their own table borrow the Python patterns.
        patterns = complexity_patterns.get(language, complexity_patterns[Language.PYTHON])

        complexity = 1.0
        for pattern in patterns:
            complexity += len(re.findall(pattern, content, re.IGNORECASE))
        return complexity

    def _calculate_maintainability_index(self, loc: int, complexity: float) -> float:
        """Maintainability index (simplified SEI formula), clamped to 0-100."""
        if loc == 0:
            return 100.0
        maintainability = max(
            0,
            171 - 5.2 * math.log(complexity) - 0.23 * complexity - 16.2 * math.log(loc),
        )
        return min(100, maintainability)

    def _calculate_technical_debt_ratio(self, content: str, language: Language) -> float:
        """Percentage of non-empty lines carrying a debt marker (TODO etc.)."""
        debt_indicators = ["TODO", "FIXME", "HACK", "XXX", "BUG"]
        total_lines = len([line for line in content.split('\n') if line.strip()])
        if total_lines == 0:
            return 0.0

        debt_lines = 0
        for line in content.split('\n'):
            for indicator in debt_indicators:
                if indicator in line.upper():
                    debt_lines += 1
                    break  # count each line at most once
        return (debt_lines / total_lines) * 100

    def _estimate_test_coverage(self, content: str, language: Language) -> float:
        """Very rough coverage proxy: density of test constructs, 0-100.

        Ten or more test-pattern matches saturates the estimate at 100.
        """
        test_patterns = {
            Language.PYTHON: [r"def\s+test_", r"class\s+Test", r"import\s+unittest", r"import\s+pytest"],
            Language.JAVASCRIPT: [r"describe\s*\(", r"it\s*\(", r"test\s*\(", r"expect\s*\("],
            Language.JAVA: [r"@Test", r"extends\s+TestCase", r"import\s+org\.junit"],
        }
        patterns = test_patterns.get(language, [])

        test_matches = 0
        for pattern in patterns:
            test_matches += len(re.findall(pattern, content, re.IGNORECASE))

        if test_matches == 0:
            return 0.0
        return min(100.0, (test_matches / 10) * 100)


analyzer = LanguageAnalyzer()


class AnalysisRequest(BaseModel):
    """Request body for /analyze and /analyze/batch."""
    file_path: str
    content: str
    language: Optional[str] = None  # currently unused; detection is automatic


class AnalysisResponse(BaseModel):
    """Response body for /analyze."""
    file_path: str
    language: str
    issues: List[Dict[str, Any]]
    metrics: Dict[str, float]
    summary: Dict[str, Any]


@app.get("/health")
async def health():
    """Liveness probe."""
    return {"status": "healthy", "service": "analysis-engine"}


@app.post("/analyze")
async def analyze_file(request: AnalysisRequest):
    """Analyze a single file and return issues, metrics and a summary."""
    try:
        issues, metrics = analyzer.analyze_code(request.file_path, request.content)
        detected_language = analyzer.detect_language(request.file_path, request.content)

        return AnalysisResponse(
            file_path=request.file_path,
            language=detected_language.value,
            issues=[
                {
                    "line_number": issue.line_number,
                    "column": issue.column,
                    "issue_type": issue.issue_type.value,
                    "severity": issue.severity.value,
                    "message": issue.message,
                    "rule_id": issue.rule_id,
                    "suggestion": issue.suggestion,
                }
                for issue in issues
            ],
            metrics={
                "lines_of_code": metrics.lines_of_code,
                "cyclomatic_complexity": metrics.cyclomatic_complexity,
                "maintainability_index": metrics.maintainability_index,
                "technical_debt_ratio": metrics.technical_debt_ratio,
                "test_coverage": metrics.test_coverage,
            },
            summary={
                "total_issues": len(issues),
                "error_count": len([i for i in issues if i.severity == Severity.ERROR]),
                "warning_count": len([i for i in issues if i.severity == Severity.WARNING]),
                "info_count": len([i for i in issues if i.severity == Severity.INFO]),
                # 5 points per issue plus the debt ratio, floored at 0.
                "quality_score": max(0, 100 - len(issues) * 5 - metrics.technical_debt_ratio),
            },
        )
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Analysis failed: {str(e)}")


@app.post("/analyze/batch")
async def analyze_batch(requests: List[AnalysisRequest]):
    """Analyze several files; returns per-file metrics and issue counts."""
    try:
        results = []
        for req in requests:
            issues, metrics = analyzer.analyze_code(req.file_path, req.content)
            detected_language = analyzer.detect_language(req.file_path, req.content)
            results.append({
                "file_path": req.file_path,
                "language": detected_language.value,
                "issues": len(issues),
                "metrics": {
                    "lines_of_code": metrics.lines_of_code,
                    "cyclomatic_complexity": metrics.cyclomatic_complexity,
                    "maintainability_index": metrics.maintainability_index,
                    "technical_debt_ratio": metrics.technical_debt_ratio,
                    "test_coverage": metrics.test_coverage,
                },
            })
        return {
            "results": results,
            "total_files": len(requests),
            # Timezone-aware UTC timestamp (naive now() is ambiguous).
            "timestamp": datetime.datetime.now(datetime.timezone.utc).isoformat(),
        }
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Batch analysis failed: {str(e)}")


@app.get("/rules")
async def get_rules(language: Optional[str] = None):
    """List rules for one language, or summarize rules for all languages."""
    if language:
        # Only the enum lookup can raise ValueError; keep the try minimal.
        try:
            lang_enum = Language(language.lower())
        except ValueError:
            raise HTTPException(status_code=400, detail=f"Unsupported language: {language}")
        return {"language": language, "rules": analyzer.rules.get(lang_enum, [])}

    return {
        "languages": [lang.value for lang in Language],
        "rules": {
            lang.value: [
                {
                    "id": rule["id"],
                    "type": rule["type"].value,
                    "severity": rule["severity"].value,
                    "message": rule["message"],
                }
                for rule in rules
            ]
            for lang, rules in analyzer.rules.items()
        },
    }


if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8002)