""" Block Scanner Module - Hash Lock Detection in Bitcoin Blocks Scans confirmed blocks for OP_RETURN transactions with hash lock commitments """ import json import hashlib import base64 import datetime import re import math from typing import Dict, List, Optional, Any, Union class BlockScanner: """ Bitcoin block scanner for hash lock detection. Identifies and analyzes OP_RETURN transactions with script hash commitments. """ def __init__(self): self.scanned_blocks = {} self.hash_lock_registry = {} self.scanning_stats = { "blocks_scanned": 0, "transactions_analyzed": 0, "hash_locks_found": 0, "total_outputs": 0 } def scan_block(self, block_data: Dict[str, Any]) -> Dict[str, Any]: """ Scan a single block for hash lock transactions. """ try: block_height = block_data.get("height", 0) block_hash = block_data.get("hash", "") if block_height in self.scanned_blocks: return { "success": False, "error": f"Block {block_height} already scanned", "timestamp": datetime.datetime.now().isoformat() } transactions = block_data.get("tx", []) hash_locks = [] analysis_results = { "block_height": block_height, "block_hash": block_hash, "timestamp": block_data.get("time", 0), "transactions_count": len(transactions), "hash_locks_found": [], "analysis_summary": {}, "scanned_at": datetime.datetime.now().isoformat() } for tx_index, tx in enumerate(transactions): tx_analysis = self._analyze_transaction(tx, tx_index, block_height) analysis_results["hash_locks_found"].extend(tx_analysis["hash_locks"]) self.scanning_stats["transactions_analyzed"] += 1 total_hash_locks = len(analysis_results["hash_locks_found"]) analysis_results["analysis_summary"] = { "total_hash_locks": total_hash_locks, "hash_locks_per_tx": total_hash_locks / len(transactions) if transactions else 0, "hash_lock_density": (total_hash_locks / len(transactions) * 100) if transactions else 0 } self.scanned_blocks[block_height] = analysis_results self.scanning_stats["blocks_scanned"] += 1 self.scanning_stats["hash_locks_found"] += total_hash_locks for hash_lock in analysis_results["hash_locks_found"]: lock_key = f"{block_height}_{hash_lock['txid']}_{hash_lock['vout']}" self.hash_lock_registry[lock_key] = hash_lock return { "success": True, "block_analysis": analysis_results, "hash_locks_found": total_hash_locks, "timestamp": datetime.datetime.now().isoformat() } except Exception as e: return { "success": False, "error": str(e), "timestamp": datetime.datetime.now().isoformat() } def _analyze_transaction(self, tx: Dict[str, Any], tx_index: int, block_height: int) -> Dict[str, Any]: """Analyze a single transaction for hash locks.""" hash_locks = [] txid = tx.get("txid", "") outputs = tx.get("vout", []) self.scanning_stats["total_outputs"] += len(outputs) for vout_index, output in enumerate(outputs): script_sig = output.get("scriptPubKey", {}) script_hex = script_sig.get("hex", "") script_asm = script_sig.get("asm", "") script_type = script_sig.get("type", "") hash_lock_analysis = self._detect_hash_lock( script_hex, script_asm, script_type, txid, vout_index, block_height ) if hash_lock_analysis["is_hash_lock"]: hash_lock_info = { "block_height": block_height, "tx_index": tx_index, "txid": txid, "vout": vout_index, "value": output.get("value", 0), "script_hex": script_hex, "script_asm": script_asm, "script_type": script_type, "hash_lock_data": hash_lock_analysis, "detected_at": datetime.datetime.now().isoformat() } hash_locks.append(hash_lock_info) return { "txid": txid, "tx_index": tx_index, "outputs_analyzed": len(outputs), "hash_locks": hash_locks } def _detect_hash_lock(self, script_hex: str, script_asm: str, script_type: str, txid: str, vout: int, block_height: int) -> Dict[str, Any]: """Detect if script contains hash lock pattern.""" hash_lock_patterns = [ r'OP_RETURN.*OP_HASH160.*[a-fA-F0-9]{40}', r'6a.*a9.*[a-fA-F0-9]{40}', r'OP_RETURN.*OP_SHA256.*[a-fA-F0-9]{64}', r'OP_RETURN.*OP_RIPEMD160.*[a-fA-F0-9]{40}' ] is_hash_lock = False matched_pattern = None confidence = 0.0 extracted_hash = "" for pattern in hash_lock_patterns: if re.search(pattern, script_asm, re.IGNORECASE) or re.search(pattern, script_hex, re.IGNORECASE): is_hash_lock = True matched_pattern = pattern confidence = self._calculate_pattern_confidence(pattern, script_asm, script_hex) extracted_hash = self._extract_hash_from_script(script_asm, script_hex) break if script_type == "nulldata" and not is_hash_lock: is_hash_lock = self._detect_hash_lock_in_nulldata(script_asm, script_hex) if is_hash_lock: confidence = 0.6 return { "is_hash_lock": is_hash_lock, "matched_pattern": matched_pattern, "confidence": confidence, "extracted_hash": extracted_hash, "script_type": script_type, "analysis_details": { "script_length": len(script_hex), "has_op_return": "OP_RETURN" in script_asm or "6a" in script_hex, "has_hash_op": any(op in script_asm for op in ["OP_HASH160", "OP_SHA256", "OP_RIPEMD160"]), "has_hex_hash": bool(re.search(r'[a-fA-F0-9]{40,64}', script_asm + script_hex)) } } def _calculate_pattern_confidence(self, pattern: str, script_asm: str, script_hex: str) -> float: """Calculate confidence score for hash lock detection.""" base_confidence = 0.5 if "OP_HASH160" in pattern: base_confidence += 0.3 elif "OP_SHA256" in pattern: base_confidence += 0.25 elif "OP_RIPEMD160" in pattern: base_confidence += 0.2 if script_asm and len(script_asm) > 10: base_confidence += 0.1 if re.search(r'[a-fA-F0-9]{64}', script_asm + script_hex): base_confidence += 0.1 return min(base_confidence, 1.0) def _extract_hash_from_script(self, script_asm: str, script_hex: str) -> str: """Extract hash from Bitcoin script.""" hash_patterns = [ r'([a-fA-F0-9]{64})', # SHA256 hash r'([a-fA-F0-9]{40})', # RIPEMD160/Hash160 hash ] combined_script = script_asm + script_hex for pattern in hash_patterns: matches = re.findall(pattern, combined_script) if matches: return matches[0] return "" def _detect_hash_lock_in_nulldata(self, script_asm: str, script_hex: str) -> bool: """Detect hash lock patterns in OP_RETURN data.""" nulldata_patterns = [ r'[a-fA-F0-9]{64}', # Potential SHA256 hash r'[a-fA-F0-9]{40}', # Potential Hash160 hash r'hash.*lock', r'commit.*hash' ] combined_data = script_asm + script_hex return any(re.search(pattern, combined_data, re.IGNORECASE) for pattern in nulldata_patterns) def scan_block_range(self, blocks: List[Dict[str, Any]]) -> Dict[str, Any]: """Scan multiple blocks for hash locks.""" range_results = [] total_hash_locks = 0 for block in blocks: scan_result = self.scan_block(block) range_results.append(scan_result) if scan_result["success"]: total_hash_locks += scan_result["hash_locks_found"] return { "success": True, "blocks_scanned": len(blocks), "total_hash_locks_found": total_hash_locks, "average_hash_locks_per_block": total_hash_locks / len(blocks) if blocks else 0, "individual_results": range_results, "timestamp": datetime.datetime.now().isoformat() } def get_hash_locks_by_block(self, block_height: int) -> Dict[str, Any]: """Get all hash locks found in a specific block.""" if block_height not in self.scanned_blocks: return { "success": False, "error": f"Block {block_height} not found in scanned blocks", "timestamp": datetime.datetime.now().isoformat() } block_analysis = self.scanned_blocks[block_height] return { "success": True, "block_height": block_height, "hash_locks": block_analysis["hash_locks_found"], "count": len(block_analysis["hash_locks_found"]), "block_summary": block_analysis["analysis_summary"], "timestamp": datetime.datetime.now().isoformat() } def search_hash_locks(self, query: str, search_type: str = "hash") -> Dict[str, Any]: """Search hash locks by various criteria.""" results = [] for lock_key, hash_lock in self.hash_lock_registry.items(): match = False if search_type == "hash": hash_data = hash_lock.get("hash_lock_data", {}).get("extracted_hash", "") match = query.lower() in hash_data.lower() elif search_type == "txid": match = query.lower() in hash_lock["txid"].lower() elif search_type == "block": match = str(query) == str(hash_lock["block_height"]) if match: results.append(hash_lock) return { "success": True, "query": query, "search_type": search_type, "result_count": len(results), "results": results, "timestamp": datetime.datetime.now().isoformat() } def generate_hash_lock_report(self, limit: int = 100) -> Dict[str, Any]: """Generate comprehensive report of hash lock findings.""" all_hash_locks = list(self.hash_lock_registry.values()) if limit and limit < len(all_hash_locks): all_hash_locks = all_hash_locks[-limit:] if not all_hash_locks: return { "success": True, "report_timestamp": datetime.datetime.now().isoformat(), "summary": { "total_hash_locks": 0, "blocks_with_hash_locks": 0, "average_confidence": 0.0 }, "hash_locks": [], "statistics": self.scanning_stats } total_value = sum(hl["value"] for hl in all_hash_locks) average_confidence = sum( hl["hash_lock_data"]["confidence"] for hl in all_hash_locks ) / len(all_hash_locks) blocks_with_locks = set(hl["block_height"] for hl in all_hash_locks) script_types = {} for hl in all_hash_locks: script_type = hl["script_type"] script_types[script_type] = script_types.get(script_type, 0) + 1 return { "success": True, "report_timestamp": datetime.datetime.now().isoformat(), "summary": { "total_hash_locks": len(all_hash_locks), "blocks_with_hash_locks": len(blocks_with_locks), "total_value_locked": total_value, "average_confidence": average_confidence, "script_type_distribution": script_types }, "hash_locks": all_hash_locks, "statistics": self.scanning_stats, "performance_metrics": { "hash_locks_per_block": len(all_hash_locks) / len(blocks_with_locks) if blocks_with_locks else 0, "average_value_per_lock": total_value / len(all_hash_locks) if all_hash_locks else 0, "detection_rate": self.scanning_stats["hash_locks_found"] / self.scanning_stats["total_outputs"] if self.scanning_stats["total_outputs"] > 0 else 0 } } def get_scanning_statistics(self) -> Dict[str, Any]: """Get detailed scanning statistics.""" return { **self.scanning_stats, "registry_size": len(self.hash_lock_registry), "blocks_scanned_count": len(self.scanned_blocks), "average_hash_locks_per_block": self.scanning_stats["hash_locks_found"] / self.scanning_stats["blocks_scanned"] if self.scanning_stats["blocks_scanned"] > 0 else 0, "hash_lock_detection_rate": self.scanning_stats["hash_locks_found"] / self.scanning_stats["total_outputs"] if self.scanning_stats["total_outputs"] > 0 else 0, "timestamp": datetime.datetime.now().isoformat() } def create_sample_block_data(height: int, tx_count: int = 5) -> Dict[str, Any]: """Create sample Bitcoin block data for testing.""" transactions = [] for i in range(tx_count): tx = { "txid": f"tx_{height}_{i:04d}_{hashlib.sha256(f'tx{height}{i}'.encode()).hexdigest()[:8]}", "vout": [ { "value": 0.001 * (i + 1), "n": 0, "scriptPubKey": { "asm": "OP_DUP OP_HASH160 abcd1234efgh5678ijkl9012mnop3456 OP_EQUALVERIFY OP_CHECKSIG", "hex": "76a914abcd1234efgh5678ijkl9012mnop345688ac", "type": "pubkeyhash" } } ] } if i % 2 == 0: # Add hash lock to some transactions hash_value = hashlib.sha256(f"hashlock_{height}_{i}".encode()).hexdigest() tx["vout"].append({ "value": 0.0001, "n": 1, "scriptPubKey": { "asm": f"OP_RETURN OP_HASH160 {hash_value[:40]}", "hex": f"6aa9{hash_value[:40]}", "type": "nulldata" } }) transactions.append(tx) return { "height": height, "hash": hashlib.sha256(f"block{height}".encode()).hexdigest(), "time": int(datetime.datetime.now().timestamp()), "tx": transactions } def test_block_scanner(): """Test the block scanner functionality.""" scanner = BlockScanner() sample_blocks = [ create_sample_block_data(800000, 3), create_sample_block_data(800001, 4), create_sample_block_data(800002, 2) ] print("✅ Block Scanner Test:") for block in sample_blocks: scan_result = scanner.scan_block(block) print(f"Block {block['height']}: {scan_result['success']}, Hash locks: {scan_result.get('hash_locks_found', 0)}") report = scanner.generate_hash_lock_report() print(f"Total hash locks found: {report['summary']['total_hash_locks']}") stats = scanner.get_scanning_statistics() print(f"Blocks scanned: {stats['blocks_scanned_count']}") return scanner if __name__ == "__main__": test_block_scanner()