""" Task Claiming and Submission System Handles the complete lifecycle of task management in the Starlight protocol. """ import json import hashlib import datetime from typing import Dict, List, Optional, Any from dataclasses import dataclass, field, asdict from enum import Enum from collections import deque class TaskStatus(Enum): """Task lifecycle states""" AVAILABLE = "available" CLAIMED = "claimed" IN_PROGRESS = "in_progress" SUBMITTED = "submitted" UNDER_REVIEW = "under_review" APPROVED = "approved" REJECTED = "rejected" REVISION_REQUIRED = "revision_required" class TaskPriority(Enum): """Task priority levels""" LOW = 1 MEDIUM = 2 HIGH = 3 CRITICAL = 4 @dataclass class TaskDefinition: """Complete task definition from wish decomposition""" id: str wish_id: str title: str description: str requirements: List[str] deliverables: List[str] priority: int = TaskPriority.MEDIUM.value estimated_hours: float = 1.0 reward: float = 0.0 skills_required: List[str] = field(default_factory=list) dependencies: List[str] = field(default_factory=list) @dataclass class TaskClaim: """Active task claim by an agent""" task_id: str agent_id: str claimed_at: str status: str = TaskStatus.CLAIMED.value progress: float = 0.0 notes: str = "" @dataclass class TaskSubmission: """Task completion submission""" id: str task_id: str agent_id: str submitted_at: str deliverables: Dict[str, Any] notes: str artifacts: List[str] = field(default_factory=list) status: str = TaskStatus.SUBMITTED.value reviewer_id: Optional[str] = None reviewed_at: Optional[str] = None reviewer_notes: Optional[str] = None score: float = 0.0 @dataclass class TaskResult: """Final task result after approval""" task_id: str submission_id: str approved_at: str approved_by: str final_deliverables: Dict[str, Any] performance_metrics: Dict[str, float] quality_score: float class TaskClaimingSystem: """ System for claiming and managing tasks from the Starlight protocol. """ def __init__(self, agent_id: str): self.agent_id = agent_id self.claimed_tasks: Dict[str, TaskClaim] = {} self.submissions: Dict[str, TaskSubmission] = {} self.task_history: deque = deque(maxlen=100) def claim_task(self, task: TaskDefinition) -> TaskClaim: """ Claim a task for processing. Args: task: Task definition to claim Returns: TaskClaim: Active claim object """ if task.id in self.claimed_tasks: existing = self.claimed_tasks[task.id] if existing.agent_id == self.agent_id: return existing raise ValueError(f"Task {task.id} already claimed by {existing.agent_id}") claim = TaskClaim( task_id=task.id, agent_id=self.agent_id, claimed_at=datetime.datetime.now().isoformat(), status=TaskStatus.CLAIMED.value, progress=0.0 ) self.claimed_tasks[task.id] = claim self._add_to_history(task.id, "claimed") return claim def update_progress(self, task_id: str, progress: float, notes: str = "") -> Dict[str, Any]: """ Update progress on a claimed task. Args: task_id: Task to update progress: Progress percentage (0.0 - 1.0) notes: Optional progress notes Returns: dict: Update confirmation """ if task_id not in self.claimed_tasks: raise ValueError(f"Task {task_id} not claimed") claim = self.claimed_tasks[task_id] claim.progress = min(1.0, max(0.0, progress)) claim.notes = notes if claim.progress > 0.0: claim.status = TaskStatus.IN_PROGRESS.value return { "task_id": task_id, "progress": claim.progress, "status": claim.status, "updated_at": datetime.datetime.now().isoformat() } def submit_task(self, task_id: str, deliverables: Dict[str, Any], notes: str = "", artifacts: Optional[List[str]] = None) -> TaskSubmission: """ Submit completed task for review. Args: task_id: Completed task ID deliverables: Task deliverables notes: Submission notes artifacts: List of artifact references Returns: TaskSubmission: Submission object """ if task_id not in self.claimed_tasks: raise ValueError(f"Task {task_id} not claimed") claim = self.claimed_tasks[task_id] submission_id = self._generate_id(f"sub:{task_id}:{self.agent_id}") submission = TaskSubmission( id=submission_id, task_id=task_id, agent_id=self.agent_id, submitted_at=datetime.datetime.now().isoformat(), deliverables=deliverables, notes=notes, artifacts=artifacts or [], status=TaskStatus.SUBMITTED.value ) claim.status = TaskStatus.SUBMITTED.value self.submissions[submission_id] = submission self._add_to_history(task_id, "submitted") return submission def get_submission_status(self, submission_id: str) -> Optional[Dict[str, Any]]: """ Get status of a submission. Args: submission_id: Submission to check Returns: dict: Submission status or None """ if submission_id not in self.submissions: return None submission = self.submissions[submission_id] return asdict(submission) def get_claimed_tasks(self) -> List[TaskClaim]: """Get all tasks currently claimed by this agent.""" return list(self.claimed_tasks.values()) def get_pending_submissions(self) -> List[TaskSubmission]: """Get all submissions awaiting review.""" return [s for s in self.submissions.values() if s.status == TaskStatus.SUBMITTED.value] def _generate_id(self, seed: str) -> str: """Generate unique ID.""" return hashlib.sha256(seed.encode()).hexdigest()[:16] def _add_to_history(self, task_id: str, action: str): """Add action to task history.""" self.task_history.append({ "task_id": task_id, "agent_id": self.agent_id, "action": action, "timestamp": datetime.datetime.now().isoformat() }) class TaskMarketplace: """ Marketplace for discovering and filtering available tasks. """ def __init__(self): self.available_tasks: Dict[str, TaskDefinition] = {} def register_task(self, task: TaskDefinition): """Register a new task in the marketplace.""" self.available_tasks[task.id] = task def find_tasks(self, wish_id: Optional[str] = None, min_priority: int = TaskPriority.LOW.value, skills: Optional[List[str]] = None, max_estimated_hours: Optional[float] = None) -> List[TaskDefinition]: results = [] for task in self.available_tasks.values(): if task.id in ["available", "claimed"]: continue if wish_id and task.wish_id != wish_id: continue if task.priority < min_priority: continue if skills: if not any(s in task.skills_required for s in skills): continue if max_estimated_hours and task.estimated_hours > max_estimated_hours: continue results.append(task) return sorted(results, key=lambda t: (-t.priority, t.estimated_hours)) def get_task_by_id(self, task_id: str) -> Optional[TaskDefinition]: """Get task by ID.""" return self.available_tasks.get(task_id) def create_task_from_wish(wish_id: str, wish_title: str, wish_description: str, requirement: str, index: int) -> TaskDefinition: """ Factory function to create a task from a wish requirement. Args: wish_id: Parent wish ID wish_title: Wish title wish_description: Wish description requirement: Specific requirement index: Task index for ID generation Returns: TaskDefinition: New task """ task_id = hashlib.sha256(f"{wish_id}:{index}".encode()).hexdigest()[:12] return TaskDefinition( id=task_id, wish_id=wish_id, title=f"[{wish_title}] {requirement}", description=f"Task derived from wish: {wish_description}", requirements=[requirement], deliverables=[f"Implementation of: {requirement}"], priority=TaskPriority.MEDIUM.value, estimated_hours=2.0, reward=10.0, skills_required=["python", "development"] )