"""
Autonomous Workflow Module

Implements the complete self-improvement loop for the Starlight protocol.
"""

import datetime
import hashlib
import json
import time
from dataclasses import dataclass, field, asdict
from enum import Enum
from typing import Dict, List, Optional, Any, Callable

from api_client import StarlightAPIClient, WakeLoop
from proposal_system import ProposalSystem, WishBuilder, SelfImprovementWish, ImprovementProposal
from task_system import TaskClaimingSystem, TaskDefinition, TaskMarketplace, create_task_from_wish


class WorkflowState(Enum):
    """Autonomous workflow states.

    Every state except ``IDLE`` has a handler registered in
    ``AutonomousWorkflow._cycle_handlers``; ``IDLE`` falls back to the
    wait handler.
    """

    IDLE = "idle"
    DISCOVERING = "discovering"
    EVALUATING = "evaluating"
    CLAIMING = "claiming"
    PROCESSING = "processing"
    SUBMITTING = "submitting"
    SELF_IMPROVING = "self_improving"
    WAITING = "waiting"


@dataclass
class WorkflowMetrics:
    """Workflow performance metrics (plain counters, all starting at zero)."""

    tasks_discovered: int = 0
    tasks_claimed: int = 0
    tasks_completed: int = 0
    tasks_approved: int = 0
    tasks_rejected: int = 0
    wishes_created: int = 0
    proposals_submitted: int = 0
    self_improvements: int = 0
    cycle_count: int = 0
    total_runtime_seconds: float = 0.0


@dataclass
class AgentConfig:
    """Agent configuration for autonomous operation."""

    agent_id: str
    api_key: Optional[str] = None
    # Seconds slept by the wait state (and after a failed cycle).
    poll_interval: int = 60
    # Upper bound on simultaneously claimed tasks.
    max_concurrent_tasks: int = 3
    self_improvement_enabled: bool = True
    # NOTE(review): not read anywhere in this module - self-improvement
    # currently runs whenever the state machine reaches SELF_IMPROVING.
    self_improvement_interval: int = 10
    # Passed as ``min_priority`` when querying the task marketplace.
    min_task_priority: int = 2
    auto_submit: bool = True


class AutonomousWorkflow:
    """
    Complete autonomous workflow that handles:
    - Task discovery and claiming
    - Task processing and submission
    - Self-improvement wish creation and proposal

    The workflow is a state machine: each call to ``_cycle`` runs the
    handler for the current ``WorkflowState`` and that handler selects
    the next state.
    """

    def __init__(self, config: AgentConfig):
        """Build the API client and task/proposal subsystems from ``config``."""
        self.config = config
        self.client = StarlightAPIClient(config.agent_id, config.api_key)
        # Alias of ``client`` kept for callers that use the longer name.
        self.api_client = self.client
        self.task_system = TaskClaimingSystem(config.agent_id)
        self.proposal_system = ProposalSystem(config.agent_id)
        self.task_marketplace = TaskMarketplace()

        self.state = WorkflowState.IDLE
        self.metrics = WorkflowMetrics()
        self.running = False
        self.start_time = None

        self._cycle_handlers = {
            WorkflowState.DISCOVERING: self._discover_tasks,
            WorkflowState.EVALUATING: self._evaluate_tasks,
            WorkflowState.CLAIMING: self._claim_tasks,
            WorkflowState.PROCESSING: self._process_tasks,
            WorkflowState.SUBMITTING: self._submit_tasks,
            WorkflowState.SELF_IMPROVING: self._self_improve,
            WorkflowState.WAITING: self._wait,
        }

    def start(self):
        """Start the autonomous workflow and block until it stops.

        Authenticates the client, then runs cycles until ``stop()`` is
        called or the user interrupts with Ctrl-C. Whatever the exit path
        (including an exception from authentication), ``running`` is
        reset to ``False`` and the runtime metric is refreshed.
        """
        self.running = True
        self.start_time = datetime.datetime.now()
        try:
            self.client.authenticate()
            print(f"Starting autonomous workflow for agent: {self.config.agent_id}")
            try:
                self._run_loop()
            except KeyboardInterrupt:
                # Covers Ctrl-C both inside a cycle and during the error
                # back-off sleep.
                print("Stopping workflow...")
        finally:
            self.running = False
            self._record_runtime()

    def _run_loop(self):
        """Run cycles while ``running`` is set, backing off after failures."""
        while self.running:
            try:
                self._cycle()
                self.metrics.cycle_count += 1
            except Exception as e:
                print(f"Error in workflow cycle: {e}")
                # The next cycle runs ``_wait`` (another poll_interval sleep),
                # so a failed cycle effectively backs off for two intervals.
                self.state = WorkflowState.WAITING
                time.sleep(self.config.poll_interval)

    def stop(self):
        """Stop the autonomous workflow.

        Only clears the ``running`` flag; the loop checks it between
        cycles, so a cycle (or sleep) already in progress finishes first.
        """
        self.running = False

    def _cycle(self):
        """Execute one workflow cycle.

        States without a registered handler (e.g. ``IDLE``) run ``_wait``.
        """
        handler = self._cycle_handlers.get(self.state, self._wait)
        handler()

    def _record_runtime(self):
        """Store the seconds elapsed since ``start_time`` in the metrics."""
        if self.start_time is not None:
            elapsed = (datetime.datetime.now() - self.start_time).total_seconds()
            self.metrics.total_runtime_seconds = elapsed

    def _discover_tasks(self):
        """Discover available tasks from the protocol.

        Fetches tasks with status ``"available"`` from the client and
        registers each one with the local marketplace, then moves to
        EVALUATING.
        """
        self.state = WorkflowState.DISCOVERING

        # Treat a missing/None response as "no tasks" rather than crashing.
        tasks = self.client.get_tasks(status="available") or []
        self.metrics.tasks_discovered += len(tasks)

        for task_data in tasks:
            task_def = TaskDefinition(
                id=task_data.id,
                wish_id=task_data.wish_id,
                title=task_data.title,
                description=task_data.description,
                requirements=task_data.requirements,
                deliverables=[],
                reward=task_data.reward,
            )
            self.task_marketplace.register_task(task_def)

        self.state = WorkflowState.EVALUATING

    def _evaluate_tasks(self):
        """Evaluate discovered tasks for eligibility.

        A task is eligible when it meets the configured minimum priority
        and is not already claimed. Moves to CLAIMING if any task is
        eligible, otherwise to SELF_IMPROVING (when enabled) or WAITING.
        """
        self.state = WorkflowState.EVALUATING

        available_tasks = self.task_marketplace.find_tasks(
            min_priority=self.config.min_task_priority
        )
        already_claimed = self.task_system.claimed_tasks
        has_eligible = any(
            task.id not in already_claimed for task in available_tasks
        )

        if has_eligible:
            self.state = WorkflowState.CLAIMING
        elif self.config.self_improvement_enabled:
            self.state = WorkflowState.SELF_IMPROVING
        else:
            self.state = WorkflowState.WAITING

    def _claim_tasks(self):
        """Claim eligible tasks for processing.

        Claims unclaimed marketplace tasks until ``max_concurrent_tasks``
        is reached. A failed claim is reported and skipped; it does not
        use up a slot. Always moves to PROCESSING afterwards.
        """
        self.state = WorkflowState.CLAIMING

        current_claims = len(self.task_system.claimed_tasks)
        slots_available = self.config.max_concurrent_tasks - current_claims

        if slots_available > 0:
            available_tasks = self.task_marketplace.find_tasks(
                min_priority=self.config.min_task_priority
            )
            claimed = 0
            for task in available_tasks:
                if claimed >= slots_available:
                    break
                if task.id in self.task_system.claimed_tasks:
                    continue
                try:
                    self.task_system.claim_task(task)
                except Exception as e:
                    print(f"Failed to claim task {task.id}: {e}")
                else:
                    self.metrics.tasks_claimed += 1
                    claimed += 1

        self.state = WorkflowState.PROCESSING

    def _process_tasks(self):
        """Process claimed tasks.

        Runs every claim whose status is ``"claimed"`` or
        ``"in_progress"``, then moves to SUBMITTING if there are pending
        submissions, otherwise to SELF_IMPROVING (when enabled) or WAITING.
        """
        self.state = WorkflowState.PROCESSING

        for claim in self.task_system.get_claimed_tasks():
            if claim.status in ("claimed", "in_progress"):
                self._process_single_task(claim.task_id)

        if self.task_system.get_pending_submissions():
            self.state = WorkflowState.SUBMITTING
        elif self.config.self_improvement_enabled:
            self.state = WorkflowState.SELF_IMPROVING
        else:
            self.state = WorkflowState.WAITING

    def _process_single_task(self, task_id: str):
        """Process a single task (stub for actual implementation).

        Currently only reports progress at 50% and then 100%.
        """
        self.task_system.update_progress(task_id, 0.5, "Processing...")
        self.task_system.update_progress(task_id, 1.0, "Completed")

    def _submit_tasks(self):
        """Submit completed tasks for review.

        Adds the number of pending submissions to ``tasks_completed`` and
        moves to SELF_IMPROVING (when enabled) or WAITING.
        """
        self.state = WorkflowState.SUBMITTING

        pending = self.task_system.get_pending_submissions()
        for submission in pending:
            if self.config.auto_submit:
                # TODO: the actual submission call is not implemented yet.
                pass

        self.metrics.tasks_completed += len(pending)

        if self.config.self_improvement_enabled:
            self.state = WorkflowState.SELF_IMPROVING
        else:
            self.state = WorkflowState.WAITING

    def _self_improve(self):
        """Execute self-improvement cycle.

        Creates a wish, turns it into a proposal and submits it. The
        ``self_improvements`` metric counts every pass through this
        method, whether or not a proposal was submitted. Always moves to
        WAITING afterwards.
        """
        self.state = WorkflowState.SELF_IMPROVING

        wish = self._identify_improvement_area()
        if wish:
            proposal = self._create_improvement_proposal(wish)
            if proposal:
                self.proposal_system.submit_proposal(proposal.id)
                self.metrics.proposals_submitted += 1

        self.metrics.self_improvements += 1
        self.state = WorkflowState.WAITING

    def _identify_improvement_area(self) -> Optional[SelfImprovementWish]:
        """Identify an area for self-improvement.

        Returns:
            The wish created by the proposal system. The wish content is
            currently fixed (API integration capability).
        """
        wish = self.proposal_system.create_wish(
            title="Add API Integration Capability",
            description="Add better API integration capabilities for improved reliability",
            category="capability",
            requirements=["Implement API integration", "Add retry logic", "Add circuit breaker"],
            success_criteria=["API integration functional", "Retry logic working", "Circuit breaker tested"],
        )
        # Count the wish only once it exists, so a failed or empty creation
        # does not inflate the metric.
        if wish is not None:
            self.metrics.wishes_created += 1
        return wish

    def _create_improvement_proposal(self, wish: SelfImprovementWish) -> Optional[ImprovementProposal]:
        """Create a proposal for the improvement wish.

        Args:
            wish: The wish to implement; its id, title and description
                are copied into the proposal.

        Returns:
            The proposal created by the proposal system.
        """
        proposal = self.proposal_system.create_proposal(
            wish_id=wish.id,
            title=f"Implement: {wish.title}",
            summary=f"Proposal to implement {wish.title}",
            motivation=wish.description,
            implementation_plan="Step 1: Analyze requirements\nStep 2: Implement\nStep 3: Test",
            timeline_days=7,
            resources_required=["time", "compute"],
            expected_improvements={"capability": 25.0},
            risks=["scope creep"],
            mitigation=["clear requirements"],
        )
        return proposal

    def _wait(self):
        """Wait before next cycle.

        Refreshes the runtime metric, sleeps for ``poll_interval``
        seconds, then moves to DISCOVERING.
        """
        self.state = WorkflowState.WAITING
        self._record_runtime()
        time.sleep(self.config.poll_interval)
        self.state = WorkflowState.DISCOVERING

    def get_status(self) -> Dict[str, Any]:
        """Get current workflow status.

        Returns:
            A dict with the state value, metrics and config as dicts, and
            the counts of claimed tasks and pending submissions.
        """
        return {
            "state": self.state.value,
            "metrics": asdict(self.metrics),
            "config": asdict(self.config),
            "claimed_tasks": len(self.task_system.claimed_tasks),
            "pending_submissions": len(self.task_system.get_pending_submissions()),
        }


def create_autonomous_agent(agent_id: str, api_key: Optional[str] = None) -> AutonomousWorkflow:
    """
    Factory function to create a configured autonomous agent.

    Args:
        agent_id: Unique agent identifier
        api_key: Optional API key for authentication

    Returns:
        AutonomousWorkflow: Configured workflow
    """
    config = AgentConfig(
        agent_id=agent_id,
        api_key=api_key,
        poll_interval=60,
        max_concurrent_tasks=3,
        self_improvement_enabled=True,
        self_improvement_interval=10,
        min_task_priority=2,
        auto_submit=True,
    )
    return AutonomousWorkflow(config)