#!/usr/bin/env python3
"""
OP_CAT-IPFS Integration Code Examples

Illustrative implementations for Bitcoin OP_CAT operations combined with
IPFS content addressing. OP_CAT is currently disabled on Bitcoin mainnet;
reactivation has been proposed (e.g. the BIP-347 proposal). Scripts are
modeled here as JSON structures for clarity rather than consensus-serialized
Bitcoin Script.
"""

import base64
import datetime
import hashlib
import json
from typing import Dict, List

# ============================================================================
# CORE OP_CAT-IPFS INTEGRATION IMPLEMENTATIONS
# ============================================================================


class OPCATIPFSCore:
    """Core implementation for OP_CAT-IPFS integration."""

    def __init__(self, network: str = "mainnet"):
        self.network = network
        self.max_script_size = 520   # mirrors Bitcoin's 520-byte stack element limit
        self.max_stack_depth = 10    # conservative demo limit (consensus allows 1000)

    def generate_ipfs_cid(self, content: bytes) -> str:
        """
        Generate an IPFS CIDv1 for raw content.

        Args:
            content: Raw content bytes

        Returns:
            str: base32 CIDv1 string (raw codec; these begin with "bafk")
        """
        # Multihash: sha2-256 code (0x12) + digest length (0x20) + digest
        multihash = b'\x12\x20' + hashlib.sha256(content).digest()
        # CIDv1 bytes: version (0x01) + raw codec (0x55) + multihash
        cid_bytes = b'\x01\x55' + multihash
        # Multibase base32: lowercase, unpadded, prefixed with 'b'
        encoded = base64.b32encode(cid_bytes).decode('ascii').lower().rstrip('=')
        return f"b{encoded}"

    def create_basic_script(self, content: bytes) -> Dict:
        """
        Create a basic OP_CAT script for IPFS content.

        Args:
            content: Content to integrate

        Returns:
            Dict: Script structure
        """
        content_hash = hashlib.sha256(content).hexdigest()
        ipfs_cid = self.generate_ipfs_cid(content)
        combined_hash = hashlib.sha256(f"{content_hash}{ipfs_cid}".encode()).hexdigest()

        script = {
            "version": "OP_CAT_IPFS_v1.0",
            "network": self.network,
            "operations": [
                {
                    "op": "OP_PUSHBYTES_32",
                    "data": content_hash,
                    "description": "Push content hash"
                },
                {
                    "op": "OP_PUSHBYTES_59",
                    "data": ipfs_cid,
                    "description": "Push IPFS CID"
                },
                {
                    "op": "OP_CAT",
                    "result": f"{content_hash}{ipfs_cid}",
                    "description": "Concatenate hash and CID"
                },
                {
                    # OP_SHA256 (single SHA-256) matches the expected hash
                    # below; OP_HASH256 would be a double SHA-256.
                    "op": "OP_SHA256",
                    "description": "Hash concatenated result"
                },
                {
                    "op": "OP_PUSHBYTES_32",
                    "data": combined_hash,
                    "description": "Expected final hash"
                },
                {
                    "op": "OP_EQUALVERIFY",
                    "description": "Verify integrity"
                },
                {
                    "op": "OP_1",
                    "description": "Return true"
                }
            ],
            "metadata": {
                "content_size": len(content),
                "script_size": 130,  # 33 + 60 + 33 data pushes + 4 single-byte opcodes
                "timestamp": datetime.datetime.now().isoformat(),
                "estimated_vbytes": 200  # rough size estimate; Bitcoin has no gas
            },
            "validation": {
                "content_hash": content_hash,
                "ipfs_cid": ipfs_cid,
                "combined_hash": combined_hash
            }
        }

        return script
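    # Illustrative helper (an assumption, not part of the original API):
    # round-trip check that bytes fetched out-of-band (e.g. from an IPFS
    # gateway) match the hash and CID recorded by create_basic_script.
    def verify_fetched_content(self, script: Dict, fetched: bytes) -> bool:
        """Recompute hash and CID for fetched bytes and compare to the script."""
        expected = script.get("validation", {})
        return (hashlib.sha256(fetched).hexdigest() == expected.get("content_hash")
                and self.generate_ipfs_cid(fetched) == expected.get("ipfs_cid"))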
    def create_multi_content_script(self, contents: List[bytes]) -> Dict:
        """
        Create a script covering multiple content pieces.

        Args:
            contents: List of content pieces

        Returns:
            Dict: Multi-content script structure
        """
        if len(contents) > 4:
            raise ValueError("Maximum 4 content pieces supported")

        # Generate hashes and CIDs
        content_hashes = []
        ipfs_cids = []
        for content in contents:
            content_hashes.append(hashlib.sha256(content).hexdigest())
            ipfs_cids.append(self.generate_ipfs_cid(content))

        # Build operations: push each hash/CID pair and concatenate it
        # immediately, so every OP_CAT pairs a hash with its own CID.
        operations = []
        for i, (hash_val, cid) in enumerate(zip(content_hashes, ipfs_cids)):
            operations.append({
                "op": "OP_PUSHBYTES_32",
                "data": hash_val,
                "description": f"Content hash {i+1}"
            })
            operations.append({
                "op": "OP_PUSHBYTES_59",
                "data": cid,
                "description": f"IPFS CID {i+1}"
            })
            operations.append({
                "op": "OP_CAT",
                "step": f"pair_{i+1}",
                "description": f"Concatenate hash and CID {i+1}"
            })

        # Concatenate all pairs together
        for i in range(len(contents) - 1):
            operations.append({
                "op": "OP_CAT",
                "step": f"aggregate_{i+1}",
                "description": f"Aggregate pairs step {i+1}"
            })

        # Final verification
        combined_data = "".join(f"{h}{c}" for h, c in zip(content_hashes, ipfs_cids))
        final_hash = hashlib.sha256(combined_data.encode()).hexdigest()

        operations.extend([
            {
                "op": "OP_SHA256",
                "description": "Hash final aggregation"
            },
            {
                "op": "OP_PUSHBYTES_32",
                "data": final_hash,
                "description": "Expected final hash"
            },
            {
                "op": "OP_EQUAL",
                "description": "Verify aggregation"
            }
        ])

        return {
            "version": "OP_CAT_MULTI_v1.0",
            "network": self.network,
            "operations": operations,
            "metadata": {
                "content_count": len(contents),
                "total_size": sum(len(c) for c in contents),
                "content_hashes": content_hashes,
                "ipfs_cids": ipfs_cids,
                "final_hash": final_hash
            }
        }
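# A minimal off-chain evaluator for the push/OP_CAT/OP_SHA256 subset used in
# the scripts above. It is a sketch for sanity-checking the JSON structures
# built here (stack items are the strings stored in the "data" fields), not
# an implementation of Bitcoin Script semantics.
def simulate_cat_pipeline(operations: List[Dict]) -> List[str]:
    """Replay a script's operations on a Python list acting as the stack."""
    stack: List[str] = []
    for op in operations:
        name = op["op"]
        if name.startswith("OP_PUSHBYTES"):
            stack.append(op["data"])
        elif name == "OP_CAT":
            top, below = stack.pop(), stack.pop()
            stack.append(below + top)  # OP_CAT appends the top item to the one beneath it
        elif name == "OP_SHA256":
            stack.append(hashlib.sha256(stack.pop().encode()).hexdigest())
        elif name in ("OP_EQUAL", "OP_EQUALVERIFY"):
            top, below = stack.pop(), stack.pop()
            if name == "OP_EQUAL":
                stack.append("01" if top == below else "00")
            elif top != below:
                raise ValueError("OP_EQUALVERIFY failed")
        elif name == "OP_1":
            stack.append("01")
    return stack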
# ============================================================================
# ADVANCED IMPLEMENTATIONS
# ============================================================================


class AdvancedOPCATIntegration:
    """Advanced integration patterns for OP_CAT-IPFS."""

    def __init__(self):
        self.chunk_size = 1024 * 1024  # 1MB chunks
        self.max_chunks = 8
        self._core = OPCATIPFSCore()   # reused for CID generation

    def create_chunked_script(self, large_content: bytes) -> Dict:
        """
        Create a script for large content through chunking.

        Args:
            large_content: Large content (>1MB)

        Returns:
            Dict: Chunked script structure
        """
        # Split into chunks
        chunks = self._split_content(large_content)
        if len(chunks) > self.max_chunks:
            raise ValueError(f"Content too large, max {self.max_chunks} chunks")

        # Process each chunk
        chunk_data = []
        for i, chunk in enumerate(chunks):
            chunk_data.append({
                "index": i,
                "size": len(chunk),
                "hash": hashlib.sha256(chunk).hexdigest(),
                "cid": self._generate_chunk_cid(chunk)
            })

        # Create chunk aggregation script
        return {
            "version": "OP_CAT_CHUNKED_v1.0",
            "type": "chunked_content",
            "operations": self._build_chunk_operations(chunk_data),
            "metadata": {
                "total_size": len(large_content),
                "chunk_count": len(chunks),
                "chunk_size": self.chunk_size,
                "chunks": chunk_data
            },
            "reconstruction": {
                "method": "sequential_concatenation",
                "order": list(range(len(chunks))),
                "verification": "sha256_chain"
            }
        }

    def _split_content(self, content: bytes) -> List[bytes]:
        """Split content into fixed-size chunks."""
        return [content[i:i + self.chunk_size]
                for i in range(0, len(content), self.chunk_size)]

    def _generate_chunk_cid(self, chunk: bytes) -> str:
        """Generate a CID for a content chunk.

        IPFS leaf chunks are raw blocks, so they use the same raw-codec
        CIDv1 derivation as whole raw content.
        """
        return self._core.generate_ipfs_cid(chunk)

    def _build_chunk_operations(self, chunk_data: List[Dict]) -> List[Dict]:
        """Build operations for chunk processing."""
        operations = []

        # Push all chunk CIDs
        for chunk in chunk_data:
            operations.append({
                "op": "OP_PUSHBYTES_59",
                "data": chunk["cid"],
                "description": f"Chunk {chunk['index']} CID"
            })

        # Concatenate all CIDs
        for i in range(len(chunk_data) - 1):
            operations.append({
                "op": "OP_CAT",
                "step": f"concat_{i+1}",
                "description": f"Concatenate chunk CIDs step {i+1}"
            })

        # Hash the concatenated CID list
        operations.append({
            "op": "OP_SHA256",
            "description": "Hash concatenated CIDs"
        })

        # Push expected hash and verify
        all_cids = "".join(chunk["cid"] for chunk in chunk_data)
        expected_hash = hashlib.sha256(all_cids.encode()).hexdigest()
        operations.append({
            "op": "OP_PUSHBYTES_32",
            "data": expected_hash,
            "description": "Expected CID list hash"
        })
        operations.append({
            "op": "OP_EQUALVERIFY",
            "description": "Verify CID list integrity"
        })

        return operations
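    # Illustrative helper (an assumption, not part of the original API):
    # reassemble chunks fetched individually by CID and check them against
    # the hashes recorded in a chunked script's metadata.
    def reconstruct_from_chunks(self, script: Dict, chunks: List[bytes]) -> bytes:
        """Rebuild the original content and verify each chunk hash."""
        meta = script["metadata"]
        if len(chunks) != meta["chunk_count"]:
            raise ValueError("Chunk count mismatch")
        for info, chunk in zip(meta["chunks"], chunks):
            if hashlib.sha256(chunk).hexdigest() != info["hash"]:
                raise ValueError(f"Chunk {info['index']} failed its hash check")
        # Sequential concatenation, as declared under script["reconstruction"]
        data = b"".join(chunks[i] for i in script["reconstruction"]["order"])
        if len(data) != meta["total_size"]:
            raise ValueError("Reconstructed size mismatch")
        return data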
    def create_conditional_script(self, conditions: Dict[str, str],
                                  default_cid: str) -> Dict:
        """
        Create a conditional content script.

        Args:
            conditions: Mapping of condition value -> target CID
            default_cid: Default CID if no condition matches

        Returns:
            Dict: Conditional script structure
        """
        if len(conditions) > 3:
            raise ValueError("Maximum 3 conditions supported")

        # Build a nested if/else chain. Each branch duplicates the
        # witness-supplied condition hash, compares it against one expected
        # hash, and selects that branch's CID on a match. (A deployed script
        # would also drop the leftover witness copy, e.g. with OP_NIP.)
        operations = []
        for i, (condition_value, target_cid) in enumerate(conditions.items()):
            condition_hash = hashlib.sha256(condition_value.encode()).hexdigest()
            operations.append({
                "op": "OP_DUP",
                "description": f"Duplicate witness hash for branch {i+1}"
            })
            operations.append({
                "op": "OP_PUSHBYTES_32",
                "data": condition_hash,
                "condition": condition_value,
                "description": f"Expected hash for condition: {condition_value}"
            })
            operations.append({
                "op": "OP_EQUAL",
                "description": f"Check condition {condition_value}"
            })
            operations.append({
                "op": "OP_IF",
                "description": f"If {condition_value} matches"
            })
            operations.append({
                "op": "OP_PUSHBYTES_59",
                "data": target_cid,
                "description": f"Target CID for {condition_value}"
            })
            operations.append({
                "op": "OP_ELSE",
                "description": "Otherwise try the next branch"
            })

        # Default case (innermost OP_ELSE)
        operations.append({
            "op": "OP_PUSHBYTES_59",
            "data": default_cid,
            "description": "Default CID"
        })

        # Close the IF blocks: one OP_ENDIF per OP_IF
        for _ in range(len(conditions)):
            operations.append({
                "op": "OP_ENDIF",
                "description": "End conditional block"
            })

        return {
            "version": "OP_CAT_CONDITIONAL_v1.0",
            "type": "conditional_content",
            "operations": operations,
            "metadata": {
                "condition_count": len(conditions),
                "conditions": list(conditions.keys()),
                "default_cid": default_cid
            }
        }
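# A minimal sketch of the spend-path selection mirrored off-chain. It assumes
# the spender reveals SHA-256(condition value) as witness data, matching the
# comparisons the conditional script performs on the stack.
def resolve_conditional_cid(conditions: Dict[str, str], default_cid: str,
                            witness_value: str) -> str:
    """Predict which CID the conditional script would leave on the stack."""
    witness_hash = hashlib.sha256(witness_value.encode()).hexdigest()
    for condition_value, target_cid in conditions.items():
        if hashlib.sha256(condition_value.encode()).hexdigest() == witness_hash:
            return target_cid
    return default_cid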
# ============================================================================
# VALIDATION AND TESTING
# ============================================================================


class OPCATValidator:
    """Validation utilities for OP_CAT-IPFS scripts."""

    def __init__(self):
        self.max_script_size = 520   # mirrors Bitcoin's 520-byte element limit
        self.max_stack_depth = 10    # conservative demo limit

    def validate_script(self, script: Dict) -> Dict:
        """
        Comprehensive script validation.

        Args:
            script: Script structure to validate

        Returns:
            Dict: Validation results
        """
        result = {
            "valid": True,
            "errors": [],
            "warnings": [],
            "metrics": {}
        }

        # Check required fields
        for field in ("version", "operations", "metadata"):
            if field not in script:
                result["valid"] = False
                result["errors"].append(f"Missing required field: {field}")

        # Validate operations
        if "operations" in script:
            ops_validation = self._validate_operations(script["operations"])
            result["metrics"].update(ops_validation["metrics"])
            result["errors"].extend(ops_validation["errors"])
            result["warnings"].extend(ops_validation["warnings"])
            if not ops_validation["valid"]:
                result["valid"] = False

        # Validate metadata
        if "metadata" in script:
            metadata_validation = self._validate_metadata(script["metadata"])
            result["metrics"].update(metadata_validation["metrics"])
            result["errors"].extend(metadata_validation["errors"])
            if not metadata_validation["valid"]:
                result["valid"] = False

        return result

    def _validate_operations(self, operations: List[Dict]) -> Dict:
        """Validate script operations."""
        result = {
            "valid": True,
            "errors": [],
            "warnings": [],
            "metrics": {}
        }

        if not isinstance(operations, list):
            result["valid"] = False
            result["errors"].append("Operations must be a list")
            return result

        # Count operation types and model net stack effects:
        # pushes +1; OP_CAT and OP_EQUAL pop 2, push 1 (-1);
        # OP_EQUALVERIFY pops 2 (-2); OP_SHA256/OP_HASH256 pop 1, push 1 (0).
        op_counts = {}
        stack_depth = 0
        max_stack_depth = 0
        script_size = 0

        for op in operations:
            op_name = op.get("op", "")
            op_counts[op_name] = op_counts.get(op_name, 0) + 1

            # Accumulate script size
            if op_name.startswith("OP_PUSHBYTES"):
                script_size += int(op_name.split("_")[2]) + 1  # data + opcode
            else:
                script_size += 1  # opcode only

            # Track stack depth
            if op_name.startswith("OP_PUSH") or op_name in ("OP_0", "OP_1"):
                stack_depth += 1
            elif op_name in ("OP_CAT", "OP_EQUAL"):
                if stack_depth >= 2:
                    stack_depth -= 1
                else:
                    result["errors"].append(f"Stack underflow at {op_name}")
                    result["valid"] = False
            elif op_name == "OP_EQUALVERIFY":
                if stack_depth >= 2:
                    stack_depth -= 2
                else:
                    result["errors"].append(f"Stack underflow at {op_name}")
                    result["valid"] = False
            elif op_name in ("OP_SHA256", "OP_HASH256"):
                if stack_depth < 1:
                    result["errors"].append(f"Stack underflow at {op_name}")
                    result["valid"] = False
            max_stack_depth = max(max_stack_depth, stack_depth)

        result["metrics"] = {
            "operation_count": len(operations),
            "script_size": script_size,
            "max_stack_depth": max_stack_depth,
            "op_counts": op_counts
        }

        # Check limits
        if script_size > self.max_script_size:
            result["valid"] = False
            result["errors"].append(
                f"Script size {script_size} exceeds limit {self.max_script_size}")
        if max_stack_depth > self.max_stack_depth:
            result["valid"] = False
            result["errors"].append(
                f"Stack depth {max_stack_depth} exceeds limit {self.max_stack_depth}")

        # Check for OP_CAT operations
        if op_counts.get("OP_CAT", 0) == 0:
            result["warnings"].append("No OP_CAT operations found")

        return result

    def _validate_metadata(self, metadata: Dict) -> Dict:
        """Validate script metadata."""
        result = {
            "valid": True,
            "errors": [],
            "metrics": {}
        }

        if not isinstance(metadata, dict):
            result["valid"] = False
            result["errors"].append("Metadata must be a dictionary")
            return result

        # Check content size if present
        if "content_size" in metadata:
            size = metadata["content_size"]
            if not isinstance(size, int) or size < 0:
                result["valid"] = False
                result["errors"].append("Invalid content_size")

        result["metrics"] = {
            "metadata_fields": len(metadata),
            "field_names": list(metadata.keys())
        }

        return result
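# A small sketch tying the evaluator and validator together: replay a script
# with simulate_cat_pipeline (defined above) and report whether it runs
# without underflow or a failed OP_EQUALVERIFY and ends with a truthy item on
# top of the stack. The basic and multi-content scripts are built to do so.
def check_script_executes(script: Dict) -> bool:
    """Return True if the script replays cleanly to a truthy result."""
    try:
        final_stack = simulate_cat_pipeline(script["operations"])
    except (ValueError, IndexError):
        return False
    return bool(final_stack) and final_stack[-1] != "00"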
print("=== Basic OP_CAT-IPFS Integration Example ===\n") # Initialize core integration core = OPCATIPFSCore("testnet") # Sample content content = b"Hello, Starlight OP_CAT-IPFS integration!" print(f"Content: {content}") print(f"Content size: {len(content)} bytes\n") # Create script script = core.create_basic_script(content) print(f"Script version: {script['version']}") print(f"Operations: {len(script['operations'])}") print(f"Script size: {script['metadata']['script_size']} bytes") print(f"IPFS CID: {script['validation']['ipfs_cid']}") print(f"Content hash: {script['validation']['content_hash']}\n") # Validate script validator = OPCATValidator() validation = validator.validate_script(script) print(f"Script valid: {validation['valid']}") print(f"Validation errors: {validation['errors']}") print(f"Validation warnings: {validation['warnings']}") print(f"Script metrics: {validation['metrics']}\n") return script def example_multi_content(): """Demonstrate multi-content integration.""" print("=== Multi-Content Integration Example ===\n") core = OPCATIPFSCore() # Multiple content pieces contents = [ b"First part of content", b"Second part of content", b"Third part of content" ] print(f"Content pieces: {len(contents)}") for i, content in enumerate(contents): print(f" Part {i+1}: {content} ({len(content)} bytes)") print() # Create multi-content script script = core.create_multi_content_script(contents) print(f"Script version: {script['version']}") print(f"Operations: {len(script['operations'])}") print(f"Total content size: {script['metadata']['total_size']} bytes") print(f"Content hashes: {len(script['metadata']['content_hashes'])}") print(f"IPFS CIDs: {len(script['metadata']['ipfs_cids'])}\n") # Show operation sequence print("Operation sequence:") for i, op in enumerate(script['operations']): print(f" {i+1}. 
def example_chunked_content():
    """Demonstrate chunked content integration."""
    print("=== Chunked Content Integration Example ===\n")

    advanced = AdvancedOPCATIntegration()

    # Generate large content (3MB)
    large_content = b"A" * (3 * 1024 * 1024)
    print(f"Large content size: {len(large_content)} bytes "
          f"({len(large_content) / (1024*1024):.1f} MB)")

    # Create chunked script
    script = advanced.create_chunked_script(large_content)
    print(f"Script version: {script['version']}")
    print(f"Chunk count: {script['metadata']['chunk_count']}")
    print(f"Chunk size: {script['metadata']['chunk_size']} bytes")
    print(f"Operations: {len(script['operations'])}\n")

    # Show chunk information
    print("Chunk information:")
    for chunk in script['metadata']['chunks']:
        print(f"  Chunk {chunk['index']}: {chunk['size']} bytes, "
              f"CID: {chunk['cid'][:20]}...")
    print()

    return script


def example_conditional_content():
    """Demonstrate conditional content integration."""
    print("=== Conditional Content Integration Example ===\n")

    advanced = AdvancedOPCATIntegration()
    core = OPCATIPFSCore()

    # Map condition values to target CIDs (CIDs derived here from
    # placeholder content for demonstration purposes)
    conditions = {
        "premium_user_2024": core.generate_ipfs_cid(b"premium content"),
        "beta_tester_active": core.generate_ipfs_cid(b"beta content")
    }
    default_cid = "bafybeigdyrzt5sfp7udm7hu76uh7y26nf3efuylqabf3oclgtqy55fbzdi"

    print("Conditions:")
    for condition_value, cid in conditions.items():
        print(f"  {condition_value}: {cid}")
    print(f"Default CID: {default_cid}\n")

    # Create conditional script
    script = advanced.create_conditional_script(conditions, default_cid)
    print(f"Script version: {script['version']}")
    print(f"Condition count: {script['metadata']['condition_count']}")
    print(f"Operations: {len(script['operations'])}\n")

    return script


def run_all_examples():
    """Run all integration examples."""
    print("OP_CAT-IPFS Integration Code Examples")
    print("=" * 50)
    print()

    examples = [
        example_basic_integration,
        example_multi_content,
        example_chunked_content,
        example_conditional_content
    ]

    results = []
    for example in examples:
        try:
            result = example()
            results.append({"example": example.__name__,
                            "success": True, "result": result})
        except Exception as e:
            results.append({"example": example.__name__,
                            "success": False, "error": str(e)})
        print("-" * 50)
        print()

    # Summary
    print("=== Execution Summary ===")
    for result in results:
        status = "✅ SUCCESS" if result["success"] else "❌ FAILED"
        print(f"{result['example']}: {status}")
        if not result["success"]:
            print(f"  Error: {result['error']}")

    return results


# ============================================================================
# MAIN EXECUTION
# ============================================================================

if __name__ == "__main__":
    # Run all examples
    results = run_all_examples()

    # Export results to JSON
    with open("op_cat_ipfs_examples_results.json", "w") as f:
        json.dump(results, f, indent=2, default=str)

    print("\nResults exported to: op_cat_ipfs_examples_results.json")