#!/usr/bin/env python3
"""
AI Agent Identity & Wallet Management System
Task 1: Core Infrastructure - Identity Management

Provides wallet creation, signature verification, and identity linking
across sessions for AI agent persistence in Starlight.

All key material produced here is mock data: addresses and private keys are
random hex strings, the "public key" is a hash of the private key, and a
"signature" is a SHA-256 digest over the challenge and the private key.
Nothing in this module performs real Bitcoin or elliptic-curve cryptography.
"""

import os
import json
import hashlib
import hmac
import secrets
import base64
from datetime import datetime
from typing import Optional, Dict, Tuple

# NOTE(review): the status prefixes inside the print() strings below look like
# mis-encoded emoji. They are runtime output and are reproduced as found.


class AgentIdentity:
    """Manages AI agent identity through (mock) Bitcoin wallet operations.

    The identity is a JSON object persisted at ``identity_file``. It holds
    the keys ``address``, ``private_key_wif``, ``public_key``, ``created_at``,
    ``agent_id``, ``version``, ``session_count`` and ``continuity_hash``.
    """

    def __init__(self, identity_file: str = "agent_identity.json"):
        """Load the identity stored at ``identity_file`` or create a new one.

        Args:
            identity_file: Path of the JSON file holding the identity.
        """
        self.identity_file = identity_file
        self.identity_data: Optional[Dict] = None
        self.load_or_create_identity()

    def load_or_create_identity(self) -> None:
        """Load existing identity or create new one."""
        if os.path.exists(self.identity_file):
            self.load_identity()
        else:
            self.create_new_identity()

    def load_identity(self) -> None:
        """Load identity from file.

        If the file cannot be read or does not contain a JSON object, the
        failure is printed and a fresh identity is created, which overwrites
        the unreadable file.
        """
        try:
            with open(self.identity_file, 'r', encoding='utf-8') as f:
                loaded = json.load(f)
            # Every accessor relies on dict semantics; reject other JSON types
            # here instead of failing later with an AttributeError.
            if not isinstance(loaded, dict):
                raise ValueError("identity file does not contain a JSON object")
            self.identity_data = loaded
            print(f"āœ… Loaded existing identity: {self.identity_data.get('address', 'unknown')}")
        except (OSError, ValueError) as e:
            # json.JSONDecodeError and UnicodeDecodeError are ValueError subclasses.
            print(f"āš ļø Failed to load identity: {e}")
            self.create_new_identity()

    def create_new_identity(self) -> None:
        """Create new AI agent identity with wallet and persist it."""
        print("šŸ” Creating new AI agent identity...")

        address = self._generate_bitcoin_address()
        private_key_wif = self._generate_private_key()
        public_key = self._derive_public_key(private_key_wif)

        self.identity_data = {
            "address": address,
            "private_key_wif": private_key_wif,
            "public_key": public_key,
            "created_at": datetime.now().isoformat(),
            "agent_id": self._generate_agent_id(address),
            "version": "1.0.0",
            "session_count": 0,
            # The address is passed explicitly: self.identity_data is not yet
            # assigned while this dict literal is being evaluated.
            "continuity_hash": self._generate_continuity_hash(address),
        }

        self.save_identity()
        print(f"āœ… Created new identity: {address}")

    def save_identity(self) -> None:
        """Persist identity data to ``identity_file``.

        The file is created with mode 0o600 because it contains the private
        key. The mode only applies when the file is newly created; the
        permissions of an already existing file are left unchanged. Failures
        are printed, not raised.
        """
        try:
            fd = os.open(
                self.identity_file,
                os.O_WRONLY | os.O_CREAT | os.O_TRUNC,
                0o600,
            )
            with os.fdopen(fd, 'w', encoding='utf-8') as f:
                json.dump(self.identity_data, f, indent=2)
        except (OSError, TypeError, ValueError) as e:
            print(f"āŒ Failed to save identity: {e}")

    def _generate_bitcoin_address(self) -> str:
        """Generate a mock Bitcoin address for AI agent.

        Returns:
            The prefix ``"mh"`` followed by 34 random hex characters.
        """
        prefix = "mh"
        key_part = secrets.token_hex(17)
        return prefix + key_part

    def _generate_private_key(self) -> str:
        """Generate a mock private key.

        Returns:
            64 random hex characters. Despite the ``wif`` naming used
            elsewhere, this is plain hex, not Wallet Import Format.
        """
        return secrets.token_hex(32)

    def _derive_public_key(self, private_key_wif: str) -> str:
        """Derive a mock public key from the private key.

        The result is deterministic: the same private key always yields the
        same public key.

        Args:
            private_key_wif: Private key as a hex string.

        Returns:
            ``"04"`` followed by 40 and then 64 hex characters (106 total).

        Raises:
            ValueError: If ``private_key_wif`` is not valid hex.
        """
        key_bytes = bytes.fromhex(private_key_wif)
        sha = hashlib.sha256(key_bytes).digest()
        try:
            ripemd160 = hashlib.new('ripemd160')
            ripemd160.update(sha)
            head = ripemd160.hexdigest()
        except ValueError:
            # ripemd160 is not guaranteed to be available (it depends on the
            # OpenSSL build); fall back to a 160-bit truncated SHA-256.
            head = hashlib.sha256(sha).hexdigest()[:40]
        tail = hashlib.sha256(sha + key_bytes).hexdigest()
        return "04" + head + tail

    def _generate_agent_id(self, address: str) -> str:
        """Generate agent ID from the address prefix and today's date."""
        return f"agent_{address[:16]}_{datetime.now().strftime('%Y%m%d')}"

    def _generate_continuity_hash(self, address: Optional[str] = None) -> str:
        """Generate initial continuity hash for session verification.

        Args:
            address: Address to seed the hash with. When omitted, the address
                of the currently loaded identity is used (empty string if no
                identity is loaded).

        Returns:
            SHA-256 hex digest of the address concatenated with the current
            timestamp.
        """
        if address is None:
            address = (self.identity_data or {}).get('address', '')
        seed = f"{address}{datetime.now().isoformat()}"
        return hashlib.sha256(seed.encode()).hexdigest()

    def get_address(self) -> str:
        """Get agent's Bitcoin address."""
        return self.identity_data.get("address", "")

    def get_agent_id(self) -> str:
        """Get unique agent identifier."""
        return self.identity_data.get("agent_id", "")

    def _signature_for(self, challenge: str) -> str:
        """Return the base64 SHA-256 digest of challenge + private key."""
        private_key = self.identity_data.get("private_key_wif", "")
        digest = hashlib.sha256(f"{challenge}{private_key}".encode()).digest()
        return base64.b64encode(digest).decode()

    def sign_challenge(self, challenge: str) -> str:
        """Sign a challenge using the agent's private key.

        Args:
            challenge: Challenge text to sign.

        Returns:
            Base64-encoded SHA-256 digest of the challenge concatenated with
            the stored private key.

        Raises:
            ValueError: If the stored private key is not valid hex.
        """
        # Refuse to sign with a private key that is not valid hex.
        bytes.fromhex(self.identity_data.get("private_key_wif", ""))
        return self._signature_for(challenge)

    def verify_signature(self, signature: str, challenge: str) -> bool:
        """Verify a signature against a challenge.

        The expected signature is recomputed from this agent's own private
        key and compared in constant time.

        Args:
            signature: Base64 signature to check.
            challenge: Challenge the signature should cover.

        Returns:
            True if the signature matches, False otherwise (including when
            ``signature`` is not a string).
        """
        if not isinstance(signature, str):
            return False
        try:
            expected_sig = self._signature_for(challenge)
            # Compare as bytes so non-ASCII input is handled, and in constant
            # time so the comparison does not leak how many characters match.
            return hmac.compare_digest(signature.encode(), expected_sig.encode())
        except Exception as e:
            print(f"āš ļø Verification error: {e}")
            return False

    def update_continuity_hash(self, new_hash: str) -> None:
        """Update continuity hash after successful session.

        Stores ``new_hash``, increments the session counter and saves the
        identity file.
        """
        self.identity_data["continuity_hash"] = new_hash
        self.identity_data["session_count"] = self.identity_data.get("session_count", 0) + 1
        self.save_identity()

    def get_session_count(self) -> int:
        """Get number of sessions."""
        return self.identity_data.get("session_count", 0)

    def get_continuity_hash(self) -> str:
        """Get current continuity hash."""
        return self.identity_data.get("continuity_hash", "")


class IdentityVerifier:
    """Handles verification of agent identities."""

    def __init__(self, identity: AgentIdentity):
        """Bind the verifier to the identity used for signing and checking."""
        self.identity = identity

    def generate_challenge(self) -> str:
        """Generate a random challenge (64 hex characters)."""
        return secrets.token_hex(32)

    def create_session_proof(self) -> Dict:
        """Create proof of session continuity.

        Returns:
            A dict with the agent id, address, a fresh challenge, its
            signature, the session count, the continuity hash and a timestamp.
        """
        challenge = self.generate_challenge()
        signature = self.identity.sign_challenge(challenge)

        return {
            "agent_id": self.identity.get_agent_id(),
            "address": self.identity.get_address(),
            "challenge": challenge,
            "signature": signature,
            "session_count": self.identity.get_session_count(),
            "continuity_hash": self.identity.get_continuity_hash(),
            "timestamp": datetime.now().isoformat(),
        }

    def verify_session_proof(self, proof: Dict) -> Tuple[bool, str]:
        """Verify a session proof.

        The proof's address must equal this verifier's own identity address,
        and the signature is recomputed with that identity's private key, so
        only proofs produced by the same identity can validate.

        Args:
            proof: Dict as produced by ``create_session_proof``.

        Returns:
            ``(True, "Valid session proof")`` on success, otherwise
            ``(False, reason)``.
        """
        try:
            address = proof.get("address", "")
            signature = proof.get("signature", "")
            challenge = proof.get("challenge", "")

            if address != self.identity.get_address():
                return False, "Address mismatch"

            if not self.identity.verify_signature(signature, challenge):
                return False, "Invalid signature"

            return True, "Valid session proof"
        except Exception as e:
            return False, f"Verification error: {str(e)}"


def main():
    """Demo: Identity management system.

    Loads or creates ``test_agent_identity.json`` in the working directory,
    builds and verifies a session proof, then records one session.
    """
    print("=" * 60)
    print("šŸ¤– AI Agent Identity & Wallet Management System")
    print("=" * 60)

    identity = AgentIdentity("test_agent_identity.json")
    verifier = IdentityVerifier(identity)

    print(f"\nšŸ“ Agent Address: {identity.get_address()}")
    print(f"šŸ†” Agent ID: {identity.get_agent_id()}")
    print(f"šŸ”¢ Session Count: {identity.get_session_count()}")

    proof = verifier.create_session_proof()
    print("\nšŸ“ Created Session Proof:")
    print(f" Challenge: {proof['challenge'][:32]}...")
    print(f" Signature: {proof['signature'][:32]}...")

    valid, msg = verifier.verify_session_proof(proof)
    print(f"\nāœ… Verification Result: {msg}")

    identity.update_continuity_hash(hashlib.sha256(b"new_state").hexdigest())
    print(f"šŸ”„ Updated Session Count: {identity.get_session_count()}")

    print("\n" + "=" * 60)
    print("Identity Management System - Test Complete")
    print("=" * 60)


if __name__ == "__main__":
    main()