""" Review Generation Service Comment formatting, severity classification, and fix suggestions """ from fastapi import FastAPI, HTTPException from pydantic import BaseModel from typing import Dict, List, Optional, Any import json import datetime import hashlib import re from enum import Enum from dataclasses import dataclass app = FastAPI(title="Review Generation Service", version="1.0.0") class CommentSeverity(Enum): CRITICAL = "critical" HIGH = "high" MEDIUM = "medium" LOW = "low" INFO = "info" class CommentType(Enum): ISSUE = "issue" SUGGESTION = "suggestion" COMPLIMENT = "compliment" QUESTION = "question" @dataclass class ReviewComment: file_path: str line_number: int comment_type: CommentType severity: CommentSeverity message: str suggestion: Optional[str] = None code_snippet: Optional[str] = None confidence: float = 0.8 @dataclass class ReviewSummary: total_comments: int critical_issues: int high_issues: int medium_issues: int low_issues: int suggestions: int compliments: int overall_score: float approval_status: str class ReviewGenerator: def __init__(self): self.templates = self._load_templates() self.severity_weights = { CommentSeverity.CRITICAL: 10, CommentSeverity.HIGH: 5, CommentSeverity.MEDIUM: 2, CommentSeverity.LOW: 1, CommentSeverity.INFO: 0.5 } def _load_templates(self) -> Dict[str, Dict]: return { "security": { "critical": { "message": "🚨 Critical security vulnerability detected", "suggestion": "This must be fixed before deployment to prevent security breaches" }, "high": { "message": "âš ī¸ Security issue found", "suggestion": "Address this security concern to protect your application" } }, "performance": { "high": { "message": "🐌 Performance bottleneck identified", "suggestion": "Optimize this code for better performance" }, "medium": { "message": "⚡ Performance improvement opportunity", "suggestion": "Consider optimizing this for better efficiency" } }, "code_quality": { "high": { "message": "🔧 Code quality issue", "suggestion": "Refactor this code for better maintainability" }, "medium": { "message": "💡 Code improvement suggestion", "suggestion": "Consider this improvement for cleaner code" } }, "best_practices": { "medium": { "message": "📚 Best practice recommendation", "suggestion": "Follow this best practice for better code" }, "low": { "message": "💭 Style suggestion", "suggestion": "Minor style improvement for consistency" } } } def generate_review(self, analysis_results: List[Dict], ai_insights: List[Dict], file_context: Dict) -> Dict[str, Any]: comments = [] for result in analysis_results: file_comments = self._process_analysis_result(result, file_context) comments.extend(file_comments) for insight in ai_insights: ai_comment = self._process_ai_insight(insight, file_context) comments.append(ai_comment) comments = self._deduplicate_comments(comments) comments = self._prioritize_comments(comments) summary = self._generate_summary(comments) formatted_review = self._format_review(comments, summary, file_context) return formatted_review def _process_analysis_result(self, result: Dict, file_context: Dict) -> List[ReviewComment]: comments = [] file_path = result.get("file_path", "unknown") for issue in result.get("issues", []): comment = ReviewComment( file_path=file_path, line_number=issue.get("line_number", 1), comment_type=CommentType.ISSUE, severity=self._map_severity(issue.get("severity", "info")), message=issue.get("message", "Code issue detected"), suggestion=issue.get("suggestion"), code_snippet=self._extract_code_snippet(file_context, file_path, issue.get("line_number", 1)), confidence=0.8 ) comments.append(comment) return comments def _process_ai_insight(self, insight: Dict, file_context: Dict) -> ReviewComment: insight_type = insight.get("type", "suggestion") comment_type = self._map_comment_type(insight_type) return ReviewComment( file_path=insight.get("file_path", "unknown"), line_number=insight.get("line_number", 1), comment_type=comment_type, severity=self._map_severity(insight.get("severity", "medium")), message=insight.get("message", "AI insight"), suggestion=insight.get("suggestion"), code_snippet=insight.get("code_example"), confidence=insight.get("confidence", 0.7) ) def _map_severity(self, severity_str: str) -> CommentSeverity: severity_map = { "error": CommentSeverity.CRITICAL, "critical": CommentSeverity.CRITICAL, "high": CommentSeverity.HIGH, "warning": CommentSeverity.MEDIUM, "medium": CommentSeverity.MEDIUM, "low": CommentSeverity.LOW, "info": CommentSeverity.INFO } return severity_map.get(severity_str.lower(), CommentSeverity.MEDIUM) def _map_comment_type(self, type_str: str) -> CommentType: type_map = { "vulnerability": CommentType.ISSUE, "anti_pattern": CommentType.ISSUE, "optimization": CommentType.SUGGESTION, "suggestion": CommentType.SUGGESTION, "question": CommentType.QUESTION } return type_map.get(type_str.lower(), CommentType.SUGGESTION) def _extract_code_snippet(self, file_context: Dict, file_path: str, line_number: int) -> str: file_content = file_context.get(file_path, "") lines = file_content.split('\n') start = max(0, line_number - 2) end = min(len(lines), line_number + 1) snippet_lines = [] for i in range(start, end): prefix = ">>> " if i == line_number - 1 else " " snippet_lines.append(f"{prefix}{lines[i]}") return '\n'.join(snippet_lines) def _deduplicate_comments(self, comments: List[ReviewComment]) -> List[ReviewComment]: seen = set() unique_comments = [] for comment in comments: key = f"{comment.file_path}:{comment.line_number}:{comment.message[:50]}" if key not in seen: seen.add(key) unique_comments.append(comment) return unique_comments def _prioritize_comments(self, comments: List[ReviewComment]) -> List[ReviewComment]: severity_order = { CommentSeverity.CRITICAL: 0, CommentSeverity.HIGH: 1, CommentSeverity.MEDIUM: 2, CommentSeverity.LOW: 3, CommentSeverity.INFO: 4 } return sorted(comments, key=lambda c: (severity_order[c.severity], c.line_number)) def _generate_summary(self, comments: List[ReviewComment]) -> ReviewSummary: severity_counts = {severity: 0 for severity in CommentSeverity} type_counts = {comment_type: 0 for comment_type in CommentType} total_score = 0 max_score = 0 for comment in comments: severity_counts[comment.severity] += 1 type_counts[comment.comment_type] += 1 weight = self.severity_weights[comment.severity] total_score += weight * (1 - comment.confidence) max_score += weight overall_score = max(0, 100 - (total_score / max_score * 100)) if max_score > 0 else 100 approval_status = self._determine_approval_status(severity_counts, overall_score) return ReviewSummary( total_comments=len(comments), critical_issues=severity_counts[CommentSeverity.CRITICAL], high_issues=severity_counts[CommentSeverity.HIGH], medium_issues=severity_counts[CommentSeverity.MEDIUM], low_issues=severity_counts[CommentSeverity.LOW], suggestions=type_counts[CommentType.SUGGESTION], compliments=type_counts[CommentType.COMPLIMENT], overall_score=overall_score, approval_status=approval_status ) def _determine_approval_status(self, severity_counts: Dict, overall_score: float) -> str: if severity_counts[CommentSeverity.CRITICAL] > 0: return "CHANGES_REQUESTED" elif severity_counts[CommentSeverity.HIGH] > 3: return "CHANGES_REQUESTED" elif overall_score < 70: return "CHANGES_REQUESTED" elif overall_score < 85: return "NEEDS_REVIEW" else: return "APPROVED" def _format_review(self, comments: List[ReviewComment], summary: ReviewSummary, file_context: Dict) -> Dict[str, Any]: formatted_comments = [] for comment in comments: formatted_comment = { "path": comment.file_path, "position": comment.line_number, "body": self._format_comment_body(comment), "line": comment.line_number } formatted_comments.append(formatted_comment) review_body = self._generate_review_body(summary, comments) return { "body": review_body, "comments": formatted_comments, "summary": { "total_comments": summary.total_comments, "critical_issues": summary.critical_issues, "high_issues": summary.high_issues, "medium_issues": summary.medium_issues, "low_issues": summary.low_issues, "suggestions": summary.suggestions, "overall_score": summary.overall_score, "approval_status": summary.approval_status }, "metadata": { "generated_by": "ai-code-review-assistant", "timestamp": datetime.datetime.now().isoformat(), "version": "1.0.0" } } def _format_comment_body(self, comment: ReviewComment) -> str: severity_emoji = { CommentSeverity.CRITICAL: "🚨", CommentSeverity.HIGH: "âš ī¸", CommentSeverity.MEDIUM: "💡", CommentSeverity.LOW: "💭", CommentSeverity.INFO: "â„šī¸" } body_parts = [ f"{severity_emoji[comment.severity]} **{comment.severity.value.title()}**", "", comment.message ] if comment.code_snippet: body_parts.extend([ "", "**Code:**", "```", comment.code_snippet, "```" ]) if comment.suggestion: body_parts.extend([ "", "**Suggestion:**", comment.suggestion ]) body_parts.extend([ "", f"*Confidence: {comment.confidence:.1%}*" ]) return "\n".join(body_parts) def _generate_review_body(self, summary: ReviewSummary, comments: List[ReviewComment]) -> str: status_emoji = { "APPROVED": "✅", "NEEDS_REVIEW": "🔍", "CHANGES_REQUESTED": "🔄" } body_parts = [ f"# 🤖 AI Code Review", "", f"**Status:** {status_emoji.get(summary.approval_status, '❓')} {summary.approval_status.replace('_', ' ').title()}", f"**Overall Score:** {summary.overall_score:.1f}/100", "" ] if summary.critical_issues > 0: body_parts.extend([ f"## 🚨 Critical Issues ({summary.critical_issues})", "These must be addressed before merge.", "" ]) if summary.high_issues > 0: body_parts.extend([ f"## âš ī¸ High Priority Issues ({summary.high_issues})", "These should be addressed soon.", "" ]) if summary.medium_issues > 0: body_parts.extend([ f"## 💡 Medium Priority Issues ({summary.medium_issues})", "Consider addressing these for better code quality.", "" ]) if summary.suggestions > 0: body_parts.extend([ f"## đŸŽ¯ Suggestions ({summary.suggestions})", "Optional improvements for better practices.", "" ]) body_parts.extend([ "---", "*This review was generated by AI Code Review Assistant v1.0.0*" ]) return "\n".join(body_parts) review_generator = ReviewGenerator() class ReviewRequest(BaseModel): analysis_results: List[Dict[str, Any]] ai_insights: List[Dict[str, Any]] file_context: Dict[str, str] repository_info: Dict[str, Any] class ReviewResponse(BaseModel): body: str comments: List[Dict[str, Any]] summary: Dict[str, Any] metadata: Dict[str, Any] @app.get("/health") async def health(): return {"status": "healthy", "service": "review-generation"} @app.post("/generate", response_model=ReviewResponse) async def generate_review(request: ReviewRequest): try: review = review_generator.generate_review( request.analysis_results, request.ai_insights, request.file_context ) return ReviewResponse(**review) except Exception as e: raise HTTPException(status_code=500, detail=f"Review generation failed: {str(e)}") @app.post("/generate/summary") async def generate_review_summary(comments: List[Dict[str, Any]]): try: review_comments = [] for comment_data in comments: comment = ReviewComment( file_path=comment_data.get("file_path", "unknown"), line_number=comment_data.get("line_number", 1), comment_type=CommentType(comment_data.get("comment_type", "suggestion")), severity=CommentSeverity(comment_data.get("severity", "medium")), message=comment_data.get("message", ""), suggestion=comment_data.get("suggestion"), confidence=comment_data.get("confidence", 0.8) ) review_comments.append(comment) summary = review_generator._generate_summary(review_comments) return { "summary": { "total_comments": summary.total_comments, "critical_issues": summary.critical_issues, "high_issues": summary.high_issues, "medium_issues": summary.medium_issues, "low_issues": summary.low_issues, "suggestions": summary.suggestions, "overall_score": summary.overall_score, "approval_status": summary.approval_status } } except Exception as e: raise HTTPException(status_code=500, detail=f"Summary generation failed: {str(e)}") @app.get("/templates") async def get_comment_templates(): return { "templates": review_generator.templates, "severity_weights": review_generator.severity_weights } if __name__ == "__main__": import uvicorn uvicorn.run(app, host="0.0.0.0", port=8004)