#!/usr/bin/env python3
"""
Concert Data Lake Architecture
Design and implementation of a comprehensive data collection and storage framework
"""

import json
import datetime
import random
from typing import Dict, List, Any
from dataclasses import dataclass, field
from collections import defaultdict
from enum import Enum


class DataType(Enum):
    TICKETING = "ticketing"
    ATTENDANCE = "attendance"
    PERFORMANCE = "performance"
    FINANCIAL = "financial"
    SOCIAL_MEDIA = "social_media"
    VENUE_SENSOR = "venue_sensor"
    AUDIO_VISUAL = "audio_visual"
    LOGISTICS = "logistics"
    STAFF = "staff"
    WEATHER = "weather"


class DataQuality(Enum):
    HIGH = "high"
    MEDIUM = "medium"
    LOW = "low"
    UNKNOWN = "unknown"


@dataclass
class DataStream:
    """Individual data stream configuration"""
    stream_id: str
    name: str
    data_type: DataType
    source: str
    schema: Dict[str, Any]
    frequency: str  # real-time, hourly, daily, etc.
    quality: DataQuality
    retention_days: int = 365
    encrypted: bool = False
    compressed: bool = True


@dataclass
class DataPartition:
    """Data partitioning strategy"""
    partition_key: str
    partition_type: str  # time, event, venue, etc.
    partition_value: str
    size_mb: float
    record_count: int
    created_at: datetime.datetime = field(default_factory=datetime.datetime.now)
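

# Illustrative sketch, not part of the original design: one way a
# DataPartition could map onto a Hive-style directory layout in object
# storage. The bucket name and key layout here are assumptions made for the
# example, not a committed storage contract.
def hive_partition_path(stream_id: str, partition: DataPartition,
                        bucket: str = "s3://concert-data-lake") -> str:
    """Build a Hive-style partition path, e.g.
    s3://concert-data-lake/ticket_sales_001/time=2024-01-15/"""
    prefix = partition.partition_key.split("_", 1)[0]  # "time", "event", ...
    return f"{bucket}/{stream_id}/{prefix}={partition.partition_value}/"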


class DataLakeArchitecture:
    """
    Comprehensive data lake architecture for concert data.
    Implements storage, partitioning, and retrieval strategies.
    """

    def __init__(self):
        self.streams: Dict[str, DataStream] = {}
        self.partitions: List[DataPartition] = []
        self.metadata = {
            "total_storage_gb": 0.0,
            "total_records": 0,
            "active_streams": 0,
            "data_quality_score": 0.0
        }
        self.partition_strategy = {
            "time_based": ["hour", "day", "week", "month"],
            "event_based": ["event_id", "venue_id", "artist_id"],
            "data_type_based": ["data_type", "source_system"]
        }

    def register_data_stream(self, stream: DataStream) -> bool:
        """Register a new data stream; rejects duplicate stream IDs."""
        if stream.stream_id in self.streams:
            return False  # avoid double-counting an already registered stream
        self.streams[stream.stream_id] = stream
        self.metadata["active_streams"] += 1
        return True

    def create_partition(self, stream_id: str, partition_key: str,
                         partition_value: str, data_size: float,
                         record_count: int) -> bool:
        """Create a new data partition for a registered stream."""
        if stream_id not in self.streams:
            return False
        partition = DataPartition(
            partition_key=partition_key,
            # Derive the type (time, event, venue, ...) from the key prefix,
            # matching how get_storage_summary() categorizes partitions.
            partition_type=partition_key.split("_", 1)[0],
            partition_value=partition_value,
            size_mb=data_size,
            record_count=record_count
        )
        self.partitions.append(partition)

        # Update metadata (data_size is in MB; metadata tracks GB)
        self.metadata["total_storage_gb"] += data_size / 1024
        self.metadata["total_records"] += record_count
        return True

    def optimize_partitioning(self, stream_id: str) -> Dict[str, Any]:
        """Calculate optimal partitioning strategy for a stream"""
        stream = self.streams.get(stream_id)
        if not stream:
            return {"error": "Stream not found"}

        # Analyze data characteristics
        optimization = {
            "recommended_partitioning": [],
            "estimated_storage_savings": 0.0,
            "query_performance_improvement": 0.0
        }

        # Time-based partitioning for streaming data
        if stream.frequency in ["real-time", "hourly"]:
            optimization["recommended_partitioning"].append({
                "type": "time_based",
                "granularity": "hour",
                "benefit": "Optimal for real-time queries"
            })

        # Event-based partitioning for event-centric data
        if stream.data_type in [DataType.TICKETING, DataType.ATTENDANCE, DataType.PERFORMANCE]:
            optimization["recommended_partitioning"].append({
                "type": "event_based",
                "granularity": "event_id",
                "benefit": "Efficient event-specific analysis"
            })

        # Calculate estimated benefits
        optimization["estimated_storage_savings"] = self._calculate_storage_savings(stream)
        optimization["query_performance_improvement"] = self._calculate_performance_gain(stream)

        return optimization

    def _calculate_storage_savings(self, stream: DataStream) -> float:
        """Calculate estimated storage savings from compression"""
        base_size = 100.0  # MB
        if stream.compressed:
            return base_size * 0.7  # 70% savings on the 100 MB baseline
        return 0.0

    def _calculate_performance_gain(self, stream: DataStream) -> float:
        """Calculate estimated query performance improvement"""
        base_improvement = 0.0
        if stream.frequency == "real-time":
            base_improvement += 40.0  # 40% faster for real-time
        if stream.quality == DataQuality.HIGH:
            base_improvement += 20.0  # 20% faster for high quality
        if stream.encrypted:
            base_improvement -= 10.0  # 10% slower for encrypted
        return max(0.0, base_improvement)

    def get_storage_summary(self) -> Dict[str, Any]:
        """Get comprehensive storage summary"""
        summary = {
            "metadata": self.metadata,
            "streams_by_type": defaultdict(int),
            "partitions_by_strategy": defaultdict(int),
            "growth_trend": []
        }

        # Categorize streams
        for stream in self.streams.values():
            summary["streams_by_type"][stream.data_type.value] += 1

        # Categorize partitions by the prefix of the partition key
        for partition in self.partitions:
            strategy_type = partition.partition_key.split("_")[0]
            summary["partitions_by_strategy"][strategy_type] += 1

        # Calculate data quality score (HIGH=3, MEDIUM=2, LOW=1, UNKNOWN=0,
        # normalized to a 0-100 scale)
        total_quality = sum(
            3 if s.quality == DataQuality.HIGH else
            2 if s.quality == DataQuality.MEDIUM else
            1 if s.quality == DataQuality.LOW else 0
            for s in self.streams.values()
        )
        max_quality = len(self.streams) * 3
        self.metadata["data_quality_score"] = (total_quality / max_quality * 100) if max_quality > 0 else 0

        # Convert to plain dicts so the summary serializes cleanly
        summary["streams_by_type"] = dict(summary["streams_by_type"])
        summary["partitions_by_strategy"] = dict(summary["partitions_by_strategy"])
        return summary

    def simulate_data_ingestion(self, hours: int = 24) -> Dict[str, Any]:
        """Simulate data ingestion for testing"""
        simulation = {
            "duration_hours": hours,
            "ingested_data": {},
            "performance_metrics": {
                "avg_ingestion_rate": 0.0,
                "peak_rate": 0.0,
                "total_records": 0
            }
        }

        hourly_rates = {}
        for stream_id, stream in self.streams.items():
            # Simulate different ingestion rates based on stream frequency
            if stream.frequency == "real-time":
                base_rate = random.random() * 1000 + 500  # 500-1500 records/hour
            elif stream.frequency == "hourly":
                base_rate = random.random() * 500 + 100  # 100-600 records/hour
            else:
                base_rate = random.random() * 100 + 10  # 10-110 records/hour

            hourly_rates[stream_id] = base_rate
            simulation["ingested_data"][stream_id] = base_rate * hours

        # Calculate performance metrics with +/-20% hourly jitter
        all_rates = []
        for hour in range(hours):
            hour_total = sum(rate * (0.8 + random.random() * 0.4) for rate in hourly_rates.values())
            all_rates.append(hour_total)

        if all_rates:  # guard against hours=0
            simulation["performance_metrics"]["avg_ingestion_rate"] = sum(all_rates) / len(all_rates)
            simulation["performance_metrics"]["peak_rate"] = max(all_rates)
        simulation["performance_metrics"]["total_records"] = sum(simulation["ingested_data"].values())

        return simulation

    def export_configuration(self) -> Dict[str, Any]:
        """Export data lake configuration"""
        config = {
            "streams": {},
            "partitions": [],
            "metadata": self.metadata,
            "partition_strategy": self.partition_strategy,
            "export_timestamp": datetime.datetime.now().isoformat()
        }

        for stream_id, stream in self.streams.items():
            config["streams"][stream_id] = {
                "stream_id": stream.stream_id,
                "name": stream.name,
                "data_type": stream.data_type.value,
                "source": stream.source,
                "frequency": stream.frequency,
                "quality": stream.quality.value,
                "retention_days": stream.retention_days,
                "encrypted": stream.encrypted,
                "compressed": stream.compressed
            }

        for partition in self.partitions:
            config["partitions"].append({
                "partition_key": partition.partition_key,
                "partition_type": partition.partition_type,
                "partition_value": partition.partition_value,
                "size_mb": partition.size_mb,
                "record_count": partition.record_count,
                "created_at": partition.created_at.isoformat()
            })

        return config
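

# Worked example of the data-quality score used in get_storage_summary():
# each stream is weighted HIGH=3, MEDIUM=2, LOW=1, UNKNOWN=0 and the total is
# normalized to a 0-100 scale. This sketch is illustrative and is not called
# by the demo below; the stream mix (2 HIGH, 1 MEDIUM, 1 LOW) is a
# hypothetical input.
def example_quality_score() -> float:
    weights = {DataQuality.HIGH: 3, DataQuality.MEDIUM: 2,
               DataQuality.LOW: 1, DataQuality.UNKNOWN: 0}
    qualities = [DataQuality.HIGH, DataQuality.HIGH,
                 DataQuality.MEDIUM, DataQuality.LOW]
    total = sum(weights[q] for q in qualities)  # 3 + 3 + 2 + 1 = 9
    max_total = len(qualities) * 3              # 4 streams * 3 = 12
    return total / max_total * 100              # 9 / 12 * 100 = 75.0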


def initialize_sample_streams() -> List[DataStream]:
    """Initialize sample data streams for demonstration"""
    streams = [
        DataStream(
            stream_id="ticket_sales_001",
            name="Ticket Sales Real-time",
            data_type=DataType.TICKETING,
            source="ticketing_system",
            schema={"ticket_id": "string", "price": "float", "timestamp": "datetime"},
            frequency="real-time",
            quality=DataQuality.HIGH,
            retention_days=1825  # 5 years
        ),
        DataStream(
            stream_id="attendance_001",
            name="Venue Attendance",
            data_type=DataType.ATTENDANCE,
            source="entrance_gates",
            schema={"gate_id": "string", "timestamp": "datetime", "count": "integer"},
            frequency="real-time",
            quality=DataQuality.HIGH,
            retention_days=1095  # 3 years
        ),
        DataStream(
            stream_id="social_media_001",
            name="Social Media Mentions",
            data_type=DataType.SOCIAL_MEDIA,
            source="twitter_api",
            schema={"post_id": "string", "sentiment": "string", "timestamp": "datetime"},
            frequency="hourly",
            quality=DataQuality.MEDIUM,
            retention_days=365
        ),
        DataStream(
            stream_id="audio_system_001",
            name="Audio Performance Metrics",
            data_type=DataType.AUDIO_VISUAL,
            source="soundboard_system",
            schema={"timestamp": "datetime", "db_level": "float", "frequency": "float"},
            frequency="real-time",
            quality=DataQuality.HIGH,
            retention_days=90
        )
    ]
    return streams


if __name__ == "__main__":
    # Initialize data lake
    data_lake = DataLakeArchitecture()

    # Register sample streams
    sample_streams = initialize_sample_streams()
    for stream in sample_streams:
        data_lake.register_data_stream(stream)

    # Create sample partitions
    data_lake.create_partition("ticket_sales_001", "time_2024_01_15", "2024-01-15", 250.5, 50000)
    data_lake.create_partition("attendance_001", "time_2024_01_15", "2024-01-15", 120.3, 25000)
    data_lake.create_partition("social_media_001", "event_concert_123", "concert_123", 85.7, 15000)

    # Generate analytics
    print("=== Data Lake Architecture Summary ===")
    summary = data_lake.get_storage_summary()
    print(json.dumps(summary, indent=2, default=str))

    print("\n=== Partitioning Optimization ===")
    for stream_id in data_lake.streams.keys():
        optimization = data_lake.optimize_partitioning(stream_id)
        print(f"\nStream: {stream_id}")
        print(json.dumps(optimization, indent=2))

    print("\n=== Ingestion Simulation ===")
    simulation = data_lake.simulate_data_ingestion(24)
    print(json.dumps(simulation, indent=2, default=str))

    print("\n=== Export Configuration ===")
    config = data_lake.export_configuration()
    print(json.dumps(config, indent=2, default=str))
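
# Note: the ingestion simulation above draws from random.random(), so exact
# figures differ between runs; call random.seed(<value>) before
# simulate_data_ingestion() if repeatable output is needed.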