#!/usr/bin/env python3
"""
Blockchain Transaction Anchoring System
Task 3: Core Infrastructure - Blockchain Anchoring

Provides Bitcoin transaction creation, hash-locking, and blockchain data
retrieval for AI agent memory anchoring.

Everything in this module is simulated locally: transaction and inscription
IDs are random tokens and "confirmation" is immediate. State is persisted as
JSON files on disk.
"""

import hashlib
import json
import os
import secrets
import time
from datetime import datetime
from typing import Dict, List, Optional, Tuple

# Fallback path used when the config has no "anchor_index_file" entry.
_DEFAULT_ANCHOR_INDEX_FILE = "blockchain_anchors.json"


def _read_json_object(path: str, label: str) -> Optional[Dict]:
    """Return the JSON object stored at *path*, or ``None`` if unavailable.

    A missing file is not an error. An unreadable, malformed, or non-object
    file is reported on stdout and treated as absent, so callers can fall
    back to their defaults instead of crashing.
    """
    if not os.path.exists(path):
        return None
    try:
        with open(path, "r", encoding="utf-8") as f:
            loaded = json.load(f)
    except (OSError, ValueError) as e:
        # json.JSONDecodeError and UnicodeDecodeError are both ValueErrors.
        print(f"❌ Failed to load {label}: {e}")
        return None
    if not isinstance(loaded, dict):
        print(f"❌ Failed to load {label}: expected a JSON object")
        return None
    return loaded


def _write_json_atomic(path: str, payload: Dict) -> None:
    """Write *payload* as JSON to *path* without ever leaving it truncated.

    The data is written to a sibling temp file first and then moved over the
    target, so a failure mid-write keeps the previous file intact.
    """
    tmp_path = f"{path}.tmp"
    try:
        with open(tmp_path, "w", encoding="utf-8") as f:
            json.dump(payload, f, indent=2)
        os.replace(tmp_path, path)
    finally:
        # Only present if the dump or the replace failed.
        if os.path.exists(tmp_path):
            os.remove(tmp_path)


class BlockchainAnchor:
    """Handles blockchain transaction anchoring for memory persistence.

    Anchor records live in ``self.anchors["anchors"]`` and are persisted to
    the JSON file named by the ``anchor_index_file`` config key.
    """

    def __init__(self, config_file: str = "blockchain_config.json"):
        self.config_file = config_file
        self.config = self.load_config()
        self.anchors = self.load_anchors()

    def _anchor_index_path(self) -> str:
        """Return the path of the anchor index file from the config."""
        return self.config.get("anchor_index_file", _DEFAULT_ANCHOR_INDEX_FILE)

    def load_config(self) -> Dict:
        """Load blockchain configuration.

        Returns:
            The built-in defaults, overridden by any keys found in
            ``self.config_file``. A missing or unreadable file yields the
            defaults alone.
        """
        default_config = {
            "network": "testnet",
            "min_confirmations": 1,
            "fee_per_byte": 1,
            "max_retries": 3,
            "anchor_index_file": _DEFAULT_ANCHOR_INDEX_FILE,
        }
        loaded = _read_json_object(self.config_file, "config")
        if loaded is None:
            return default_config
        # Merge so a partial config file does not drop the other defaults.
        return {**default_config, **loaded}

    def load_anchors(self) -> Dict:
        """Load existing anchor records.

        Returns:
            A dict that always contains an ``"anchors"`` list, so callers can
            append to it without checking.
        """
        loaded = _read_json_object(self._anchor_index_path(), "anchors")
        if loaded is None:
            return {"anchors": [], "last_update": datetime.now().isoformat()}
        if not isinstance(loaded.get("anchors"), list):
            loaded["anchors"] = []
        return loaded

    def save_anchors(self):
        """Persist anchor records (best effort; failures are printed)."""
        try:
            self.anchors["last_update"] = datetime.now().isoformat()
            _write_json_atomic(self._anchor_index_path(), self.anchors)
        except (OSError, TypeError, ValueError) as e:
            print(f"❌ Failed to save anchors: {e}")

    def create_transaction(self, sender: str, recipient: str, amount: int,
                           data: str = "") -> Dict:
        """Create a Bitcoin transaction with optional data.

        Args:
            sender: Sender identifier; its first 40 characters go into the
                input script.
            recipient: Recipient identifier; its first 40 characters go into
                the output script.
            amount: Value placed on the single output.
            data: Optional payload; its SHA-256 hex digest is stored as
                ``data_hash`` (empty string when no data is given).

        Returns:
            A transaction dict with status ``"pending"``.
        """
        return {
            "tx_id": self._generate_tx_id(),
            "version": 1,
            "vin": [
                {
                    "txid": secrets.token_hex(32),
                    "vout": 0,
                    "scriptSig": f"OP_DUP OP_HASH160 {sender[:40]} OP_EQUALVERIFY OP_CHECKSIG",
                }
            ],
            "vout": [
                {
                    "value": amount,
                    "scriptPubKey": f"OP_DUP OP_HASH160 {recipient[:40]} OP_EQUALVERIFY OP_CHECKSIG",
                }
            ],
            "data": data,
            "data_hash": hashlib.sha256(data.encode()).hexdigest() if data else "",
            "timestamp": datetime.now().isoformat(),
            "status": "pending",
        }

    def create_hash_locked_transaction(self, data_hash: str, lock_condition: str) -> Dict:
        """Create a hash-locked transaction.

        Args:
            data_hash: Hash being locked; its first 40 characters go into the
                input script.
            lock_condition: Text whose SHA-256 hex digest (first 40
                characters) goes into the output script.

        Returns:
            A transaction dict with status ``"pending"`` and a ``hash_lock``
            entry recording both arguments.
        """
        condition_digest = hashlib.sha256(lock_condition.encode()).hexdigest()
        return {
            "tx_id": self._generate_tx_id(),
            "version": 1,
            "vin": [
                {
                    "txid": secrets.token_hex(32),
                    "vout": 0,
                    "scriptSig": f"OP_HASH160 {data_hash[:40]} OP_EQUAL",
                }
            ],
            "vout": [
                {
                    "value": 1,
                    "scriptPubKey": f"OP_HASH160 {condition_digest[:40]} OP_EQUAL",
                }
            ],
            "hash_lock": {
                "data_hash": data_hash,
                "lock_condition": lock_condition,
            },
            "timestamp": datetime.now().isoformat(),
            "status": "pending",
        }

    def create_ordinal_inscription(self, content: str, content_type: str = "text/plain") -> Dict:
        """Create a Bitcoin ordinal inscription.

        Returns:
            An inscription dict holding the content, its SHA-256 hex digest,
            and a ``sequence`` equal to the current anchor count plus one.
        """
        return {
            "inscription_id": self._generate_inscription_id(),
            "content": content,
            "content_type": content_type,
            "content_hash": hashlib.sha256(content.encode()).hexdigest(),
            "timestamp": datetime.now().isoformat(),
            "sequence": len(self.anchors.get("anchors", [])) + 1,
        }

    def anchor_memory(self, memory_data: Dict, memory_hash: str, tx_type: str = "ordinal") -> Dict:
        """Anchor memory to blockchain.

        Args:
            memory_data: The memory payload. Its ``agent_id`` and
                ``session_id`` entries, when present, are copied onto the
                anchor so ``find_anchored_memories`` can filter on them.
            memory_hash: Hash of the memory, stored for later verification.
            tx_type: ``"ordinal"`` attaches an inscription, ``"hash_lock"``
                attaches a hash-locked transaction ID. Any other value
                produces an anchor with neither.

        Returns:
            The stored anchor record, with status ``"confirmed"``.
        """
        anchor = {
            "anchor_id": self._generate_anchor_id(),
            "memory_hash": memory_hash,
            "timestamp": datetime.now().isoformat(),
            "tx_type": tx_type,
            "status": "pending",
        }
        for key in ("agent_id", "session_id"):
            if key in memory_data:
                anchor[key] = memory_data[key]

        if tx_type == "ordinal":
            inscription = self.create_ordinal_inscription(json.dumps(memory_data, indent=2))
            anchor["inscription_id"] = inscription["inscription_id"]
            anchor["content_hash"] = inscription["content_hash"]
        elif tx_type == "hash_lock":
            # str() so a non-string agent_id cannot break the hashing step.
            lock_condition = str(memory_data.get("agent_id", ""))
            tx = self.create_hash_locked_transaction(memory_hash, lock_condition)
            anchor["tx_id"] = tx["tx_id"]

        # Confirmation is simulated; set it before saving so the persisted
        # record matches what the caller receives.
        anchor["status"] = "confirmed"
        self.anchors.setdefault("anchors", []).append(anchor)
        self.save_anchors()
        return anchor

    def retrieve_anchored_data(self, anchor_id: str) -> Optional[Dict]:
        """Retrieve data from blockchain anchor.

        Matches *anchor_id* against both the ``anchor_id`` and the
        ``inscription_id`` of each record; returns ``None`` if none match.
        """
        for anchor in self.anchors.get("anchors", []):
            if anchor_id in (anchor.get("anchor_id"), anchor.get("inscription_id")):
                return anchor
        return None

    def find_anchored_memories(self, agent_id: Optional[str] = None,
                               session_id: Optional[str] = None) -> List[Dict]:
        """Find anchored memories by agent or session.

        A falsy filter (``None`` or empty string) is ignored; with no filters
        every anchor is returned.
        """
        return [
            anchor
            for anchor in self.anchors.get("anchors", [])
            if not (agent_id and anchor.get("agent_id") != agent_id)
            and not (session_id and anchor.get("session_id") != session_id)
        ]

    def verify_anchor(self, anchor_id: str, expected_hash: str) -> Tuple[bool, str]:
        """Verify an anchor's integrity.

        Returns:
            ``(ok, message)`` where *message* is one of ``"Anchor not
            found"``, ``"Hash mismatch"``, or ``"Anchor verified"``.
        """
        anchor = self.retrieve_anchored_data(anchor_id)
        if not anchor:
            return False, "Anchor not found"
        if anchor.get("memory_hash") != expected_hash:
            return False, "Hash mismatch"
        return True, "Anchor verified"

    def get_anchor_index(self) -> List[Dict]:
        """Get all anchors."""
        return self.anchors.get("anchors", [])

    def _generate_tx_id(self) -> str:
        """Generate transaction ID (64 random hex characters)."""
        return secrets.token_hex(32)

    def _generate_anchor_id(self) -> str:
        """Generate unique anchor ID (``anchor_`` plus 16 random hex characters)."""
        return f"anchor_{secrets.token_hex(8)}"

    def _generate_inscription_id(self) -> str:
        """Generate inscription ID (32 random hex characters)."""
        return secrets.token_hex(16)

    def broadcast_transaction(self, tx: Dict) -> Dict:
        """Broadcast transaction to network (simulated).

        Mutates and returns *tx*: records a broadcast time, waits briefly to
        mimic network latency, then marks it confirmed with the configured
        ``min_confirmations``.
        """
        tx["status"] = "broadcasted"
        tx["broadcast_time"] = datetime.now().isoformat()
        time.sleep(0.1)
        tx["status"] = "confirmed"
        tx["confirmations"] = self.config.get("min_confirmations", 1)
        return tx

    def get_transaction(self, tx_id: str) -> Optional[Dict]:
        """Get the anchor record whose ``tx_id`` matches, or ``None``."""
        for anchor in self.anchors.get("anchors", []):
            if anchor.get("tx_id") == tx_id:
                return anchor
        return None


class MemoryIndex:
    """Manages indexed memory references for quick retrieval."""

    def __init__(self, index_file: str = "memory_index.json"):
        self.index_file = index_file
        self.index = self.load_index()

    def load_index(self) -> Dict:
        """Load memory index.

        Returns:
            A dict that always contains ``entries``, ``by_agent``,
            ``by_session`` and ``by_timestamp``, even when the file on disk
            is missing, unreadable, or lacks some of them.
        """
        index = _read_json_object(self.index_file, "index") or {}
        index.setdefault("entries", [])
        index.setdefault("by_agent", {})
        index.setdefault("by_session", {})
        index.setdefault("by_timestamp", [])
        return index

    def save_index(self):
        """Persist memory index (best effort; failures are printed)."""
        try:
            _write_json_atomic(self.index_file, self.index)
        except (OSError, TypeError, ValueError) as e:
            print(f"❌ Failed to save index: {e}")

    def add_entry(self, agent_id: str, session_id: str, anchor_id: str, memory_hash: str):
        """Add entry to index and save it.

        The anchor ID is recorded under its agent, its session, and the
        insertion-ordered ``by_timestamp`` list.
        """
        entry = {
            "agent_id": agent_id,
            "session_id": session_id,
            "anchor_id": anchor_id,
            "memory_hash": memory_hash,
            "timestamp": datetime.now().isoformat(),
        }
        self.index["entries"].append(entry)
        self.index["by_agent"].setdefault(agent_id, []).append(anchor_id)
        self.index["by_session"].setdefault(session_id, []).append(anchor_id)
        self.index["by_timestamp"].append(anchor_id)
        self.save_index()

    def get_by_agent(self, agent_id: str) -> List[str]:
        """Get anchor IDs by agent (empty list if unknown)."""
        return self.index["by_agent"].get(agent_id, [])

    def get_by_session(self, session_id: str) -> List[str]:
        """Get anchor IDs by session (empty list if unknown)."""
        return self.index["by_session"].get(session_id, [])


def main():
    """Demo: Blockchain anchoring system.

    Anchors a sample memory, retrieves and verifies it, then indexes it.
    Writes the anchor and index JSON files to the working directory.
    """
    print("=" * 60)
    print("⛓️ Blockchain Transaction Anchoring System")
    print("=" * 60)

    anchor = BlockchainAnchor()

    test_memory = {
        "agent_id": "test_agent_001",
        "session": 5,
        "timestamp": datetime.now().isoformat(),
        "data": "Important memory to anchor",
    }
    memory_hash = hashlib.sha256(json.dumps(test_memory).encode()).hexdigest()
    print(f"\n📝 Memory Hash: {memory_hash[:32]}...")

    result = anchor.anchor_memory(test_memory, memory_hash, tx_type="ordinal")
    print("\n🔗 Anchor Created:")
    print(f" Anchor ID: {result['anchor_id']}")
    print(f" Inscription ID: {result.get('inscription_id', 'N/A')}")
    print(f" Status: {result['status']}")

    retrieved = anchor.retrieve_anchored_data(result['anchor_id'])
    print("\n🔍 Retrieved Anchor:")
    print(f" Found: {retrieved is not None}")
    if retrieved:
        print(f" Memory Hash: {retrieved['memory_hash'][:32]}...")

    _valid, msg = anchor.verify_anchor(result['anchor_id'], memory_hash)
    print(f"\n✅ Verification: {msg}")

    index = MemoryIndex()
    index.add_entry("test_agent_001", "session_5", result['anchor_id'], memory_hash)
    anchors = index.get_by_agent("test_agent_001")
    print("\n📂 Index Search:")
    print(f" Found {len(anchors)} anchor(s)")

    print("\n" + "=" * 60)
    print("Blockchain Anchoring System - Test Complete")
    print("=" * 60)


if __name__ == "__main__":
    main()