"""
Comprehensive Test Suite
Unit tests, integration tests, and security testing
"""

import unittest
import json
import datetime
import hashlib
from unittest.mock import Mock, patch, MagicMock
import sys
import os

# Add the current directory to Python path for imports
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))


class TestCoreArchitecture(unittest.TestCase):
    """Test the main FastAPI application and service registry"""

    def setUp(self):
        # Import here to avoid circular imports
        from main import ServiceRegistry, ServiceType, CodeRequest

        self.registry = ServiceRegistry()
        self.ServiceType = ServiceType
        self.CodeRequest = CodeRequest

    def test_service_registry_initialization(self):
        """Test that service registry initializes correctly"""
        self.assertEqual(len(self.registry.services), 5)
        self.assertIn(self.ServiceType.GITHUB, self.registry.services)
        self.assertIn(self.ServiceType.ANALYSIS, self.registry.services)

    def test_get_service_url(self):
        """Test service URL generation"""
        github_url = self.registry.get_service_url(self.ServiceType.GITHUB)
        self.assertEqual(github_url, "http://localhost:8001")

        # NOTE(review): the concrete exception type raised for an unknown
        # service is not visible from this file, hence the broad Exception.
        with self.assertRaises(Exception):
            self.registry.get_service_url("invalid_service")

    def test_register_health(self):
        """Test health registration"""
        self.registry.register_health(self.ServiceType.GITHUB, True)

        health_key = self.ServiceType.GITHUB.value
        self.assertIn(health_key, self.registry.service_health)
        self.assertTrue(self.registry.service_health[health_key]["healthy"])

    def test_code_request_validation(self):
        """Test CodeRequest model validation"""
        valid_request = self.CodeRequest(
            repository_url="https://github.com/example/repo",
            pull_request_id=123,
            files=[{"path": "test.py", "content": "print('hello')"}],
            user_token="token123",
        )

        self.assertEqual(
            valid_request.repository_url, "https://github.com/example/repo"
        )
        self.assertEqual(valid_request.pull_request_id, 123)


class TestGitHubIntegration(unittest.TestCase):
    """Test GitHub service functionality"""

    def setUp(self):
        from github_service import GitHubWebhookService

        self.github_service = GitHubWebhookService()

    def test_webhook_signature_verification(self):
        """Test webhook signature verification"""
        payload = '{"test": "data"}'
        signature = "sha256=invalid_signature"

        self.assertFalse(
            self.github_service.verify_webhook_signature(payload, signature)
        )

    def test_access_token_generation(self):
        """Test access token generation"""
        token = self.github_service.get_access_token("test_code")

        self.assertTrue(token.startswith("ghp_"))
        self.assertEqual(len(token), 20)  # ghp_ + 16 chars

    def test_get_pr_files(self):
        """Test PR files retrieval"""
        files = self.github_service.get_pr_files("owner", "repo", 123, "token")

        self.assertIsInstance(files, list)
        self.assertGreater(len(files), 0)

        # Check file structure
        file_data = files[0]
        self.assertIn("filename", file_data)
        self.assertIn("status", file_data)
        self.assertIn("additions", file_data)


class TestAnalysisEngine(unittest.TestCase):
    """Test multi-language analysis engine"""

    def setUp(self):
        from analysis_engine import LanguageAnalyzer, Language, IssueType, Severity

        self.analyzer = LanguageAnalyzer()
        self.Language = Language
        self.IssueType = IssueType
        self.Severity = Severity

    def test_language_detection(self):
        """Test programming language detection"""
        # Python detection
        python_code = "import os\nprint('hello')"
        detected = self.analyzer.detect_language("test.py", python_code)
        self.assertEqual(detected, self.Language.PYTHON)

        # JavaScript detection
        js_code = "function test() { console.log('hello'); }"
        detected = self.analyzer.detect_language("test.js", js_code)
        self.assertEqual(detected, self.Language.JAVASCRIPT)

    def test_code_analysis(self):
        """Test code analysis functionality"""
        # Written flush-left on purpose so the sample is valid Python
        # regardless of how deeply this test method is indented.
        code_with_issues = (
            "\n"
            "import os\n"
            "print('debug message')\n"
            "eval(user_input)\n"
        )

        issues, metrics = self.analyzer.analyze_code("test.py", code_with_issues)

        self.assertIsInstance(issues, list)
        # isinstance(x, object) is true for every value, so it verified
        # nothing; at least require that a metrics object was returned.
        self.assertIsNotNone(metrics)

        # Should detect print statement and eval usage
        self.assertGreater(len(issues), 0)

    def test_metrics_calculation(self):
        """Test code metrics calculation"""
        simple_code = "def test():\n return 42"

        _issues, metrics = self.analyzer.analyze_code("test.py", simple_code)

        self.assertGreater(metrics.lines_of_code, 0)
        self.assertGreater(metrics.cyclomatic_complexity, 0)
        self.assertLessEqual(metrics.maintainability_index, 100)


class TestAICore(unittest.TestCase):
    """Test AI-powered core service"""

    def setUp(self):
        from ai_core import AIAnalysisService, AnalysisType, ContextData

        self.ai_service = AIAnalysisService()
        self.AnalysisType = AnalysisType
        self.ContextData = ContextData

    def _make_context(self):
        """Build the minimal ContextData shared by every test in this class."""
        return self.ContextData(
            repository_info={"file_path": "test.py"},
            file_history=[],
            related_files=[],
            dependencies=[],
            coding_standards={},
        )

    def test_security_analysis(self):
        """Test security analysis patterns"""
        vulnerable_code = "password = 'secret123'\neval(user_input)"

        result = self.ai_service.analyze_with_context(
            vulnerable_code, self._make_context(), [self.AnalysisType.SECURITY]
        )

        self.assertIn("insights", result)
        self.assertGreater(len(result["insights"]), 0)

        # Should detect hardcoded password and eval usage
        security_insights = [
            i for i in result["insights"] if i["type"] == "vulnerability"
        ]
        self.assertGreater(len(security_insights), 0)

    def test_performance_analysis(self):
        """Test performance analysis patterns"""
        # Flush-left sample: an append-in-a-loop pattern the service is
        # expected to report as an optimization opportunity.
        inefficient_code = (
            "\n"
            "result = []\n"
            "for i in range(1000):\n"
            "    result.append(i * 2)\n"
        )

        result = self.ai_service.analyze_with_context(
            inefficient_code, self._make_context(), [self.AnalysisType.PERFORMANCE]
        )

        self.assertIn("insights", result)

        # Should detect inefficient loop pattern
        optimization_insights = [
            i for i in result["insights"] if i["type"] == "optimization"
        ]
        self.assertGreater(len(optimization_insights), 0)

    def test_cache_functionality(self):
        """Test AI service caching"""
        code = "print('test')"
        context = self._make_context()

        # First call should compute
        result1 = self.ai_service.analyze_with_context(
            code, context, [self.AnalysisType.CODE_QUALITY]
        )

        # Second call should use cache
        result2 = self.ai_service.analyze_with_context(
            code, context, [self.AnalysisType.CODE_QUALITY]
        )

        # Identical timestamps are taken as evidence that the second result
        # was not recomputed.
        self.assertEqual(
            result1["analysis_timestamp"], result2["analysis_timestamp"]
        )


class TestReviewGeneration(unittest.TestCase):
    """Test review generation service"""

    def setUp(self):
        from review_service import ReviewGenerator, CommentSeverity, CommentType

        self.generator = ReviewGenerator()
        self.CommentSeverity = CommentSeverity
        self.CommentType = CommentType

    def test_review_generation(self):
        """Test complete review generation"""
        analysis_results = [{
            "file_path": "test.py",
            "issues": [{
                "line_number": 5,
                "severity": "warning",
                "message": "Print statement found",
                "suggestion": "Use logging instead",
            }],
        }]

        ai_insights = [{
            "type": "vulnerability",
            "severity": "high",
            "message": "Security issue detected",
            "file_path": "test.py",
            "line_number": 10,
            "confidence": 0.9,
        }]

        file_context = {"test.py": "print('debug')\neval('test')"}

        review = self.generator.generate_review(
            analysis_results, ai_insights, file_context
        )

        self.assertIn("body", review)
        self.assertIn("comments", review)
        self.assertIn("summary", review)
        self.assertGreater(len(review["comments"]), 0)

    def test_severity_mapping(self):
        """Test severity level mapping"""
        mapped = self.generator._map_severity("error")
        self.assertEqual(mapped, self.CommentSeverity.CRITICAL)

        mapped = self.generator._map_severity("warning")
        self.assertEqual(mapped, self.CommentSeverity.MEDIUM)

    def test_summary_generation(self):
        """Test review summary generation"""
        from review_service import ReviewComment

        comments = [
            ReviewComment(
                file_path="test.py",
                line_number=1,
                comment_type=self.CommentType.ISSUE,
                severity=self.CommentSeverity.HIGH,
                message="Test issue",
                confidence=0.8,
            ),
            ReviewComment(
                file_path="test.py",
                line_number=2,
                comment_type=self.CommentType.SUGGESTION,
                severity=self.CommentSeverity.LOW,
                message="Test suggestion",
                confidence=0.7,
            ),
        ]

        summary = self.generator._generate_summary(comments)

        self.assertEqual(summary.total_comments, 2)
        self.assertEqual(summary.high_issues, 1)
        self.assertEqual(summary.suggestions, 1)
        self.assertGreater(summary.overall_score, 0)


class TestPerformanceMonitoring(unittest.TestCase):
    """Test performance monitoring service"""

    def setUp(self):
        from performance_service import PerformanceMonitor, MetricType

        self.monitor = PerformanceMonitor()
        self.MetricType = MetricType

    def test_metric_recording(self):
        """Test metric recording functionality"""
        initial_count = len(self.monitor.metrics.get("test_metric", []))

        self.monitor.record_metric(
            name="test_metric",
            value=42.0,
            metric_type=self.MetricType.GAUGE,
        )

        self.assertEqual(
            len(self.monitor.metrics["test_metric"]), initial_count + 1
        )

        metric = self.monitor.metrics["test_metric"][-1]
        self.assertEqual(metric.value, 42.0)
        self.assertEqual(metric.metric_type, self.MetricType.GAUGE)

    def test_cache_functionality(self):
        """Test caching system"""
        # Test cache miss
        result = self.monitor.cache.get("test_key")
        self.assertIsNone(result)

        # Test cache set and get
        self.monitor.cache.set("test_key", {"data": "test"})
        result = self.monitor.cache.get("test_key")
        self.assertEqual(result["data"], "test")

        # Test cache stats
        stats = self.monitor.cache.get_stats()
        self.assertIn("hits", stats)
        self.assertIn("misses", stats)
        self.assertIn("hit_rate", stats)

    def test_health_check(self):
        """Test service health monitoring"""
        # Record some metrics
        self.monitor.record_metric("response_time", 100.0, self.MetricType.TIMER)
        self.monitor.record_metric("error_count", 1.0, self.MetricType.COUNTER)

        health = self.monitor.get_service_health("test_service")

        self.assertIn("service", health)
        self.assertIn("status", health)
        self.assertIn("last_check", health)


class TestSecurity(unittest.TestCase):
    """Security testing suite"""

    def test_input_validation(self):
        """Test input validation against malicious data"""
        from main import CodeRequest

        # Test SQL injection attempt
        malicious_input = "'; DROP TABLE users; --"

        # NOTE(review): several fields are invalid at once and the expected
        # exception is the broad Exception, so this test cannot tell which
        # field is actually rejected. Narrow once the validation error type
        # of CodeRequest is confirmed.
        with self.assertRaises(Exception):
            CodeRequest(
                repository_url=malicious_input,
                pull_request_id=-1,  # Invalid ID
                files=[{"path": malicious_input, "content": malicious_input}],
                user_token=malicious_input,
            )

    def test_code_injection_prevention(self):
        """Test prevention of code injection in analysis"""
        from analysis_engine import LanguageAnalyzer

        analyzer = LanguageAnalyzer()

        # Test dangerous code patterns are detected but not executed
        dangerous_code = "eval('__import__(\"os\").system(\"rm -rf /\")')"

        issues, _metrics = analyzer.analyze_code("test.py", dangerous_code)

        # Should detect the dangerous pattern
        eval_issues = [i for i in issues if "eval" in str(i.message).lower()]
        self.assertGreater(len(eval_issues), 0)

    def test_webhook_security(self):
        """Test webhook security measures"""
        from github_service import GitHubWebhookService

        service = GitHubWebhookService()

        # Test invalid signature rejection
        payload = '{"malicious": "content"}'
        invalid_signature = "sha256=invalid"

        self.assertFalse(
            service.verify_webhook_signature(payload, invalid_signature)
        )


class TestIntegration(unittest.TestCase):
    """Integration tests for the complete system"""

    def test_end_to_end_workflow(self):
        """Test complete code review workflow"""
        # This would test the entire flow from GitHub webhook to review generation
        # For now, we'll test the integration points

        # Test service communication
        from main import ServiceRegistry

        registry = ServiceRegistry()

        # Verify all services are registered
        self.assertEqual(len(registry.services), 5)

        # Test service URL generation
        for service_type in registry.services:
            url = registry.get_service_url(service_type)
            self.assertTrue(url.startswith("http://"))

    def test_error_handling(self):
        """Test system error handling and recovery"""
        from main import ServiceRegistry, ServiceType

        registry = ServiceRegistry()

        # Test handling of invalid service requests
        with self.assertRaises(Exception):
            registry.get_service_url("nonexistent_service")

        # Test health monitoring with invalid data
        registry.register_health(ServiceType.GITHUB, None)
        # Should handle gracefully
        self.assertIn(ServiceType.GITHUB.value, registry.service_health)


def run_security_scan(base_dir=None):
    """Run security vulnerability scan

    Looks through a fixed list of service modules for three coarse textual
    red flags: the word "password" or "api_key" appearing in a file that
    also contains "=", and any occurrence of "eval(". Findings are printed.

    Args:
        base_dir: Directory holding the service modules. Defaults to the
            directory of this file, which is also the directory placed on
            sys.path above for importing those modules, so the scan no
            longer depends on the current working directory.

    Returns:
        True when nothing was flagged (including when none of the files
        exist), False otherwise.
    """
    print("🔒 Running Security Scan...")

    if base_dir is None:
        base_dir = os.path.dirname(os.path.abspath(__file__))

    # Check for common security issues
    security_issues = []

    # Test for hardcoded secrets
    test_files = [
        "main.py",
        "github_service.py",
        "analysis_engine.py",
        "ai_core.py",
        "review_service.py",
        "performance_service.py",
    ]

    for file_name in test_files:
        file_path = os.path.join(base_dir, file_name)
        if not os.path.isfile(file_path):
            continue

        # Explicit encoding: the platform default may not be UTF-8, and a
        # stray undecodable byte should not abort the whole scan.
        with open(file_path, "r", encoding="utf-8", errors="replace") as f:
            content = f.read()

        lowered = content.lower()
        # Deliberately coarse: "=" anywhere in the file counts as an
        # assignment, so these checks over-report rather than under-report.
        has_assignment = "=" in content

        # Check for potential secrets
        if "password" in lowered and has_assignment:
            security_issues.append(f"Potential hardcoded password in {file_name}")

        if "api_key" in lowered and has_assignment:
            security_issues.append(f"Potential hardcoded API key in {file_name}")

        if "eval(" in content:
            security_issues.append(f"Use of eval() in {file_name}")

    if security_issues:
        print("⚠️ Security Issues Found:")
        for issue in security_issues:
            print(f" - {issue}")
    else:
        print("✅ No security issues detected")

    return len(security_issues) == 0


def run_performance_tests():
    """Run performance benchmarks

    Times LanguageAnalyzer.analyze_code on three generated inputs of 1, 100
    and 1000 lines and prints one line per input.

    Returns:
        A dict keyed by input label ("Small", "Medium", "Large"); each value
        holds "duration" in seconds, "issues_found" and "lines_processed".
    """
    print("🚀 Running Performance Tests...")

    import time

    # Test analysis engine performance
    from analysis_engine import LanguageAnalyzer

    analyzer = LanguageAnalyzer()

    # Test with different code sizes
    test_cases = [
        ("Small", "print('hello')"),
        ("Medium", "\n".join(f"print('line {i}')" for i in range(100))),
        ("Large", "\n".join(f"print('line {i}')" for i in range(1000))),
    ]

    results = {}

    for size_name, code in test_cases:
        # perf_counter is monotonic and high resolution; time.time() can
        # jump when the system clock is adjusted, corrupting short timings.
        start_time = time.perf_counter()
        issues, _metrics = analyzer.analyze_code("test.py", code)
        duration = time.perf_counter() - start_time

        results[size_name] = {
            "duration": duration,
            "issues_found": len(issues),
            "lines_processed": len(code.split('\n')),
        }

        print(f" {size_name}: {duration:.3f}s, {len(issues)} issues")

    return results


if __name__ == "__main__":
    print("🧪 Running Comprehensive Test Suite")
    print("=" * 50)

    # Run unit tests
    print("\n📋 Running Unit Tests...")
    # exit=False keeps the interpreter alive for the extra checks below; the
    # outcome is kept so failures are still reported in the exit status.
    test_program = unittest.main(argv=[''], exit=False, verbosity=2)
    unit_tests_passed = test_program.result.wasSuccessful()

    # Run security scan
    print("\n" + "=" * 50)
    security_passed = run_security_scan()

    # Run performance tests
    print("\n" + "=" * 50)
    performance_results = run_performance_tests()

    # Summary
    print("\n" + "=" * 50)
    print("📊 Test Summary:")
    print(f" Unit Tests: {'✅ PASSED' if unit_tests_passed else '❌ FAILED'}")
    print(f" Security: {'✅ PASSED' if security_passed else '❌ FAILED'}")
    print(f" Performance: {'✅ COMPLETED' if performance_results else '❌ FAILED'}")

    print("\n🎉 Test Suite Complete!")

    # Only the unit tests gate the exit status: the security scan is a
    # keyword heuristic and is reported for information only.
    sys.exit(0 if unit_tests_passed else 1)