"""
GitHub Webhook Processing Service

Parses GitHub pull request webhook events and decides whether, and with
what priority, a pull request should be sent to code analysis.
"""

import json
import hashlib
import hmac
import datetime
from typing import Dict, List, Optional, Any, Union
from dataclasses import dataclass
from enum import Enum

__all__ = ["WebhookEvent", "PullRequestData", "GitHubWebhookProcessor"]


class WebhookEvent(Enum):
    """Pull request event kinds distinguished by the processor."""

    PULL_REQUEST_OPENED = "pull_request_opened"
    PULL_REQUEST_SYNCHRONIZED = "pull_request_synchronized"
    PULL_REQUEST_CLOSED = "pull_request_closed"


# Webhook "action" values that map to a dedicated event type.  Any other
# action falls back to PULL_REQUEST_OPENED (see parse_webhook_payload).
_ACTION_TO_EVENT = {
    "opened": WebhookEvent.PULL_REQUEST_OPENED,
    "synchronize": WebhookEvent.PULL_REQUEST_SYNCHRONIZED,
    "closed": WebhookEvent.PULL_REQUEST_CLOSED,
}

# Substrings that mark a changed file path as security-sensitive.
_CRITICAL_PATTERNS = ("security", "auth", "payment", "crypto")

# Authors whose pull requests get a lower priority score (simplified
# reputation check).
_TRUSTED_AUTHORS = frozenset({"trusted-contributor", "senior-dev"})


@dataclass
class PullRequestData:
    """Structure for GitHub pull request information"""

    pr_number: int
    repository: str
    owner: str
    head_sha: str
    base_sha: str
    title: str
    description: str
    author: str
    files_changed: List[str]
    event_type: WebhookEvent
    # True when the payload's pull_request object carries a truthy "draft"
    # flag.  Defaults to False so existing positional/keyword construction
    # keeps working.
    is_draft: bool = False


class GitHubWebhookProcessor:
    """Processes GitHub webhooks and extracts PR data"""

    def __init__(self, secret_token: str):
        """Store the shared webhook secret and start an empty event log.

        Args:
            secret_token: Secret used as the HMAC key in verify_signature.
        """
        self.secret_token = secret_token
        # One summary dict per successfully parsed payload, in arrival order.
        self.processed_events: List[Dict[str, Any]] = []

    def verify_signature(
        self,
        payload: Union[str, bytes],
        signature: Optional[Union[str, bytes]],
    ) -> bool:
        """Verify GitHub webhook signature.

        Computes the HMAC-SHA256 of ``payload`` keyed with the secret token
        and compares ``"sha256=<hexdigest>"`` against ``signature`` in
        constant time.

        Args:
            payload: Request body.  ``str`` is encoded as UTF-8; ``bytes`` /
                ``bytearray`` are used as-is, which lets callers pass the raw
                request body without a decode/encode round trip.
            signature: Value to check, expected in the form
                ``"sha256=<hexdigest>"``.  A missing (``None``), empty or
                otherwise unusable value is treated as a mismatch.

        Returns:
            True only when the signature matches; False otherwise.
        """
        if isinstance(signature, str):
            provided = signature.encode()
        elif isinstance(signature, (bytes, bytearray)):
            provided = bytes(signature)
        else:
            # e.g. the signature header was absent: reject instead of
            # letting compare_digest raise TypeError.
            return False
        if not provided:
            return False

        body = payload if isinstance(payload, (bytes, bytearray)) else payload.encode()
        digest = hmac.new(
            self.secret_token.encode(), body, hashlib.sha256
        ).hexdigest()

        # Compare as bytes: compare_digest rejects non-ASCII *str* input
        # with TypeError, which an attacker-controlled header could trigger.
        return hmac.compare_digest(f"sha256={digest}".encode(), provided)

    def parse_webhook_payload(self, payload: str) -> Optional[PullRequestData]:
        """Parse GitHub webhook payload and extract PR data.

        On success a summary entry (timestamp, PR number, event type, file
        count) is appended to ``self.processed_events``.

        Args:
            payload: JSON text of the webhook request body.

        Returns:
            The extracted PullRequestData, or None when the payload is not
            a JSON object, has no ``"pull_request"`` key, is not valid JSON,
            or is missing / has mis-shaped required fields.
        """
        try:
            data = json.loads(payload)

            # Valid JSON that is not an object (null, list, number, ...)
            # cannot be a pull request event.
            if not isinstance(data, dict) or "pull_request" not in data:
                return None

            pr = data["pull_request"]
            repository = data["repository"]

            # Extract file changes; a missing or null "files" entry means
            # no files.
            # NOTE(review): the list is read from the pull_request object
            # itself - confirm the sender populates it, because PRs without
            # files are skipped by should_analyze.
            files_changed = [f["filename"] for f in pr.get("files") or []]

            # Determine event type; unrecognized actions default to OPENED.
            action = data.get("action", "")
            event_type = _ACTION_TO_EVENT.get(
                action, WebhookEvent.PULL_REQUEST_OPENED
            )

            pr_data = PullRequestData(
                pr_number=pr["number"],
                repository=repository["name"],
                owner=repository["owner"]["login"],
                head_sha=pr["head"]["sha"],
                base_sha=pr["base"]["sha"],
                title=pr["title"],
                # "body" may be present but null; keep description a str.
                description=pr.get("body") or "",
                author=pr["user"]["login"],
                files_changed=files_changed,
                event_type=event_type,
                is_draft=bool(pr.get("draft", False)),
            )

            self.processed_events.append({
                # Naive local time, as produced by datetime.now().
                "timestamp": datetime.datetime.now().isoformat(),
                "pr_number": pr_data.pr_number,
                "event_type": event_type.value,
                "files_count": len(files_changed),
            })

            return pr_data

        except (json.JSONDecodeError, KeyError, TypeError, AttributeError) as e:
            # TypeError / AttributeError cover JSON that is well-formed but
            # has the wrong shape (e.g. "pull_request": null).
            print(f"Error parsing webhook payload: {e}")
            return None

    def should_analyze(self, pr_data: PullRequestData) -> bool:
        """Determine if PR should be analyzed based on criteria.

        Args:
            pr_data: Parsed pull request information.

        Returns:
            False for closed PRs, PRs with no changed files, draft PRs
            (draft flag set or title starting with ``"[DRAFT]"``) and PRs
            whose author name ends with ``"[bot]"``; True otherwise.
        """
        # Skip closed PRs
        if pr_data.event_type == WebhookEvent.PULL_REQUEST_CLOSED:
            return False

        # Skip if no files changed
        if not pr_data.files_changed:
            return False

        # Skip draft PRs: either flagged as draft or marked in the title
        if pr_data.is_draft or pr_data.title.startswith("[DRAFT]"):
            return False

        # Skip if author is bot
        if pr_data.author.endswith("[bot]"):
            return False

        return True

    def get_analysis_priority(self, pr_data: PullRequestData) -> str:
        """Calculate analysis priority based on PR characteristics.

        Scoring: 10 points per changed file (capped at 100), +50 once if any
        changed path contains a critical pattern (case-insensitive), -20 for
        trusted authors.

        Args:
            pr_data: Parsed pull request information.

        Returns:
            ``"high"`` for a score of 80 or more, ``"medium"`` for 40 or
            more, otherwise ``"low"``.
        """
        files = pr_data.files_changed

        # More files changed = higher priority
        score = min(len(files) * 10, 100)

        # Critical file patterns add the bonus at most once
        if any(
            pattern in path.lower()
            for path in files
            for pattern in _CRITICAL_PATTERNS
        ):
            score += 50

        # Check author reputation (simplified)
        if pr_data.author in _TRUSTED_AUTHORS:
            score -= 20

        if score >= 80:
            return "high"
        if score >= 40:
            return "medium"
        return "low"


# Test implementation
def test_webhook_processor():
    """Test the webhook processor functionality"""
    processor = GitHubWebhookProcessor("test_secret")

    # Sample webhook payload
    sample_payload = """
    {
        "action": "opened",
        "pull_request": {
            "number": 123,
            "title": "Add new authentication feature",
            "body": "This PR adds OAuth2 support",
            "user": {"login": "contributor"},
            "head": {"sha": "abc123def456"},
            "base": {"sha": "def456abc123"},
            "files": [
                {"filename": "src/auth.py"},
                {"filename": "tests/test_auth.py"}
            ]
        },
        "repository": {
            "name": "code-review-assistant",
            "owner": {"login": "starlight"}
        }
    }
    """

    # Test parsing
    pr_data = processor.parse_webhook_payload(sample_payload)
    assert pr_data is not None
    assert pr_data.pr_number == 123
    assert pr_data.repository == "code-review-assistant"

    # Test analysis decision
    should_analyze = processor.should_analyze(pr_data)
    assert should_analyze

    # Test priority calculation: 2 files (20) + critical "auth" path (50)
    priority = processor.get_analysis_priority(pr_data)
    assert priority == "medium"

    print("✅ Webhook processor tests passed")
    return True


if __name__ == "__main__":
    test_webhook_processor()