#!/usr/bin/env python3
"""
Memory Serialization with Steganographic Encoding System
Task 2: Core Infrastructure - Memory Encoding

Provides memory serialization, compression, encryption, and steganographic
encoding in image files for AI agent persistence.

Pipeline (see ``MemoryEncoder.encode_memory``):
    dict -> JSON text -> zlib -> XOR obfuscation -> LSB of image pixels
                                                  (or base64 when no image)
"""

import os
import json
import zlib
import base64
import hashlib
import secrets
from datetime import datetime
from typing import Dict, Optional, Tuple
import io

try:
    from PIL import Image
except ImportError:
    # Pillow is only needed for the image paths; the JSON/base64 pipeline
    # keeps working without it.
    Image = None


class MemoryEncoder:
    """Handles memory serialization and steganographic encoding.

    The image paths (``encode_in_image`` / ``decode_from_image``) need
    Pillow. Without it ``encode_memory`` falls back to returning the payload
    as base64 text.
    """

    # Marker prepended to every embedded payload so the decoder can tell a
    # real payload from the random low bits of an ordinary image.
    _MAGIC = b'STLGHT MEMORY:'
    # Size in bytes of the random XOR key that ``encrypt`` prepends.
    _KEY_SIZE = 32
    # Size in bytes of the big-endian payload-length prefix used by the LSB
    # layout.
    _LENGTH_HEADER_BYTES = 4
    # Largest payload the decoder accepts; the encoder enforces the same
    # limit so it never writes something that cannot be read back.
    _MAX_PAYLOAD_BYTES = 10000000

    def __init__(self, config_file: str = "memory_encoder_config.json"):
        """Create an encoder, loading settings from ``config_file`` if present."""
        self.config_file = config_file
        self.config = self.load_config()
        self.suspicious_patterns = [
            "malware", "virus", "hack", "exploit", "phishing",
            "scam", "fraud", "illegal", "weapon", "drugs",
            "hate", "violence", "terrorism", "abuse"
        ]

    def load_config(self) -> Dict:
        """Load encoder configuration.

        Returns:
            The built-in defaults overridden by whatever keys the JSON file
            at ``self.config_file`` provides. A missing file yields the
            defaults silently; an unreadable or malformed file yields the
            defaults after printing a warning.
        """
        default_config = {
            "compression_level": 9,
            "encryption_enabled": True,
            "error_correction": True,
            "max_image_size_mb": 10,
            "supported_formats": ["PNG", "BMP", "TIFF"]
        }
        try:
            with open(self.config_file, 'r', encoding='utf-8') as f:
                loaded = json.load(f)
        except FileNotFoundError:
            # No config file is the normal case, not an error.
            return default_config
        except (OSError, ValueError) as e:
            # ValueError covers json.JSONDecodeError and UnicodeDecodeError.
            print(f"⚠️ Config load error ({self.config_file}): {e}")
            return default_config
        if not isinstance(loaded, dict):
            print(f"⚠️ Config load error ({self.config_file}): "
                  "top-level JSON value must be an object")
            return default_config
        # Merge so that a partial config file does not drop the other defaults.
        return {**default_config, **loaded}

    def serialize_memory(self, memory_data: Dict) -> str:
        """Serialize memory data to an indented JSON string."""
        return json.dumps(memory_data, indent=2)

    def deserialize_memory(self, serialized: str) -> Dict:
        """Deserialize memory from a JSON string."""
        return json.loads(serialized)

    def compress(self, data: str) -> bytes:
        """Compress text as UTF-8 with zlib at the configured level."""
        level = self.config.get("compression_level", 9)
        return zlib.compress(data.encode('utf-8'), level=level)

    def decompress(self, compressed: bytes) -> str:
        """Decompress zlib data and decode it as UTF-8 text."""
        return zlib.decompress(compressed).decode('utf-8')

    @staticmethod
    def _xor_with_key(data: bytes, key: bytes) -> bytes:
        """XOR ``data`` with ``key`` repeated to the length of ``data``."""
        repeats = len(data) // len(key) + 1
        keystream = (key * repeats)[:len(data)]
        return bytes(a ^ b for a, b in zip(data, keystream))

    def encrypt(self, data: bytes) -> bytes:
        """XOR-obfuscate ``data`` with a fresh random 32-byte key.

        Returns ``key + xored_data``, or ``data`` unchanged when
        ``encryption_enabled`` is false in the config.

        NOTE: the key travels with the ciphertext, so this only hides the
        payload from casual inspection. It is not confidentiality against
        anyone who knows the format.
        """
        if not self.config.get("encryption_enabled", True):
            return data
        key = secrets.token_bytes(self._KEY_SIZE)
        return key + self._xor_with_key(data, key)

    def decrypt(self, encrypted: bytes) -> bytes:
        """Reverse ``encrypt``.

        Raises:
            ValueError: if encryption is enabled and ``encrypted`` is too
                short to contain the 32-byte key prefix.
        """
        if not self.config.get("encryption_enabled", True):
            return encrypted
        if len(encrypted) < self._KEY_SIZE:
            raise ValueError("Encrypted data is shorter than the key prefix")
        key = encrypted[:self._KEY_SIZE]
        return self._xor_with_key(encrypted[self._KEY_SIZE:], key)

    def encode_in_image(self, data: bytes, image_path: str) -> str:
        """Encode data into an image using LSB steganography.

        The payload is written into the least significant bit of each RGB
        channel value, so the result stays a valid image. It is always saved
        as PNG (lossless) next to the source, named ``<stem>_encoded<ext>``.

        Returns:
            The path of the written image, or ``image_path`` unchanged if
            encoding failed (the error is printed).
        """
        try:
            if Image is None:
                raise RuntimeError("Pillow is required for image encoding")
            with Image.open(image_path) as source:
                rgb = source.convert('RGB')
            encoded_pixels = self._lsb_encode(rgb.tobytes(), self._MAGIC + data)
            # splitext touches only the final extension; a plain
            # str.replace('.') would also rewrite dots in directory names.
            root, ext = os.path.splitext(image_path)
            output_path = f"{root}_encoded{ext}"
            encoded_img = Image.frombytes('RGB', rgb.size, encoded_pixels)
            # Force PNG whatever the extension: a lossy format would destroy
            # the low bits carrying the payload.
            encoded_img.save(output_path, format='PNG')
            return output_path
        except Exception as e:
            print(f"⚠️ Encoding error: {e}")
            return image_path

    def _lsb_encode(self, img_bytes: bytes, data: bytes) -> bytes:
        """Write ``data`` into the low bit of successive carrier bytes.

        Layout: a 4-byte big-endian length of ``data``, then ``data``, one
        payload bit per carrier byte, most significant bit first.

        Raises:
            ValueError: if ``data`` exceeds the decoder's size limit or does
                not fit in ``img_bytes``.
        """
        data_len = len(data)
        if data_len > self._MAX_PAYLOAD_BYTES:
            raise ValueError("Data too large for image")
        full_data = data_len.to_bytes(self._LENGTH_HEADER_BYTES, 'big') + data
        # One carrier byte holds exactly one payload bit.
        if len(full_data) * 8 > len(img_bytes):
            raise ValueError("Data too large for image")
        result = bytearray(img_bytes)
        idx = 0
        for byte in full_data:
            for shift in range(7, -1, -1):
                result[idx] = (result[idx] & 0xFE) | ((byte >> shift) & 1)
                idx += 1
        return bytes(result)

    def _read_pixels(self, image_path: str) -> Optional[bytes]:
        """Return the RGB pixel buffer of an image, or None if unreadable."""
        if Image is None:
            return None
        try:
            with Image.open(image_path) as source:
                return source.convert('RGB').tobytes()
        except (OSError, ValueError):
            return None

    def decode_from_image(self, image_path: str) -> Optional[bytes]:
        """Decode data from a steganographic image.

        Tries the pixel-level layout written by ``encode_in_image`` first,
        then falls back to reading the low bits of the raw file bytes, which
        is the layout earlier versions of this module produced.

        Returns:
            The embedded payload, or None if none was found or the file
            could not be read (the error is printed).
        """
        try:
            pixels = self._read_pixels(image_path)
            if pixels is not None:
                decoded = self._lsb_decode(pixels)
                if decoded is not None:
                    return decoded
            with open(image_path, 'rb') as f:
                return self._lsb_decode(f.read())
        except Exception as e:
            print(f"⚠️ Decoding error: {e}")
            return None

    def _lsb_decode(self, img_bytes: bytes) -> Optional[bytes]:
        """Read back a payload written by ``_lsb_encode``.

        Returns:
            The payload with the magic marker stripped, or None when the
            carrier is too short, the declared length is implausible, or
            the marker is missing.
        """
        header_bits = self._LENGTH_HEADER_BYTES * 8
        header_bytes = self._extract_bits(img_bytes, 0, header_bits)
        if len(header_bytes) < self._LENGTH_HEADER_BYTES:
            return None
        data_len = int.from_bytes(header_bytes, 'big')
        # Each payload byte needs eight carrier bytes after the header.
        capacity = (len(img_bytes) - header_bits) // 8
        if data_len > capacity or data_len > self._MAX_PAYLOAD_BYTES:
            return None
        data_bytes = self._extract_bits(img_bytes, header_bits, data_len * 8)
        if not data_bytes.startswith(self._MAGIC):
            return None
        return data_bytes[len(self._MAGIC):]

    def _extract_bits(self, data: bytes, start: int, count: int) -> bytes:
        """Pack ``count`` low bits of ``data[start:]`` into bytes, MSB first.

        Stops early if ``data`` runs out; a trailing group of fewer than
        eight bits is dropped.
        """
        available = max(0, min(count, len(data) - start))
        whole_bits = available - available % 8
        result = bytearray()
        for offset in range(start, start + whole_bits, 8):
            value = 0
            for carrier in data[offset:offset + 8]:
                value = (value << 1) | (carrier & 1)
            result.append(value)
        return bytes(result)

    def check_content_safety(self, text: str) -> Tuple[bool, str]:
        """Check if content contains suspicious patterns.

        Matching is a case-insensitive substring search, so a pattern also
        matches inside longer words.

        Returns:
            ``(True, pattern)`` for the first pattern found, otherwise
            ``(False, "")``.
        """
        text_lower = text.lower()
        for pattern in self.suspicious_patterns:
            if pattern in text_lower:
                return True, pattern
        return False, ""

    def encode_memory(self, memory_data: Dict,
                      image_path: Optional[str] = None) -> Dict:
        """Full memory encoding pipeline.

        Args:
            memory_data: JSON-serializable memory to persist.
            image_path: optional carrier image. When given, existing, and
                successfully encoded, the payload is embedded in a copy of it.

        Returns:
            ``{"error": ..., "reason": ...}`` if the content is blocked.
            Otherwise size/hash/timestamp metadata plus either
            ``encoded_image`` (``encoding_success`` True) or base64 ``data``
            (``encoding_success`` False).
        """
        serialized = self.serialize_memory(memory_data)
        # The indented JSON holds the same keys and values as compact JSON,
        # so it can be scanned directly instead of serializing twice.
        is_suspicious, reason = self.check_content_safety(serialized)
        if is_suspicious:
            return {"error": "Content blocked", "reason": reason}

        compressed = self.compress(serialized)
        encrypted = self.encrypt(compressed)

        result = {
            "serialized_size": len(serialized),
            "compressed_size": len(compressed),
            "encrypted_size": len(encrypted),
            "data_hash": hashlib.sha256(encrypted).hexdigest(),
            "timestamp": datetime.now().isoformat()
        }

        if image_path and os.path.exists(image_path):
            encoded_path = self.encode_in_image(encrypted, image_path)
            # encode_in_image returns the input path unchanged on failure;
            # only report success when a new file was actually written.
            if encoded_path != image_path:
                result["encoded_image"] = encoded_path
                result["encoding_success"] = True
                return result

        result["data"] = base64.b64encode(encrypted).decode()
        result["encoding_success"] = False
        return result

    def decode_memory(self, encoded_data: Dict) -> Optional[Dict]:
        """Full memory decoding pipeline.

        Args:
            encoded_data: a result dict produced by ``encode_memory``.

        Returns:
            The original memory dict, or None if the payload is missing or
            cannot be decoded (the error is printed).
        """
        try:
            if "encoded_image" in encoded_data and encoded_data.get("encoding_success"):
                encrypted = self.decode_from_image(encoded_data["encoded_image"])
            elif "data" in encoded_data:
                encrypted = base64.b64decode(encoded_data["data"])
            else:
                return None

            if encrypted is None:
                return None

            decrypted = self.decrypt(encrypted)
            decompressed = self.decompress(decrypted)
            return self.deserialize_memory(decompressed)
        except Exception as e:
            print(f"⚠️ Memory decoding error: {e}")
            return None


def main():
    """Demo: Memory encoding system"""
    print("=" * 60)
    print("🧠 Memory Serialization & Steganographic Encoding System")
    print("=" * 60)

    encoder = MemoryEncoder()

    test_memory = {
        "agent_id": "test_agent_001",
        "session": 5,
        "timestamp": datetime.now().isoformat(),
        "knowledge": {
            "skills": ["python", "bitcoin", "starlight"],
            "completed_tasks": 12,
            "total_earnings_sats": 5000
        },
        "preferences": {
            "task_types": ["development", "testing"],
            "priority": "high"
        },
        "last_session_summary": "Successfully claimed and completed 3 tasks"
    }

    print("\n📝 Original Memory:")
    print(f" Size: {len(json.dumps(test_memory))} bytes")
    print(f" Agent: {test_memory['agent_id']}")

    encoded = encoder.encode_memory(test_memory)
    if "error" in encoded:
        # A blocked payload has no size/hash fields to report.
        print(f"\n❌ Encoding failed: {encoded['error']} ({encoded.get('reason', '')})")
        return

    print("\n🔐 Encoded Memory:")
    print(f" Serialized: {encoded['serialized_size']} bytes")
    print(f" Compressed: {encoded['compressed_size']} bytes")
    print(f" Encrypted: {encoded['encrypted_size']} bytes")
    print(f" Hash: {encoded['data_hash'][:32]}...")

    decoded = encoder.decode_memory(encoded)
    if decoded is None:
        print("\n❌ Decoding failed")
    else:
        print("\n🔓 Decoded Memory:")
        print(f" Agent: {decoded.get('agent_id')}")
        print(f" Session: {decoded.get('session')}")
        print(f" Skills: {decoded['knowledge']['skills']}")

    safety_check, reason = encoder.check_content_safety("Hello world")
    print("\n🛡️ Safety Check:")
    print(f" Safe text: {not safety_check}")

    safety_check2, reason2 = encoder.check_content_safety("This contains malware")
    print(f" Unsafe text: {safety_check2} ({reason2})")

    print("\n" + "=" * 60)
    print("Memory Encoding System - Test Complete")
    print("=" * 60)


if __name__ == "__main__":
    main()