""" Performance Monitoring & Caching Service System health, metrics, and intelligent caching """ from fastapi import FastAPI, HTTPException from pydantic import BaseModel from typing import Dict, List, Optional, Any import json import datetime import hashlib import time from enum import Enum from dataclasses import dataclass import collections app = FastAPI(title="Performance Service", version="1.0.0") class MetricType(Enum): COUNTER = "counter" GAUGE = "gauge" HISTOGRAM = "histogram" TIMER = "timer" class AlertLevel(Enum): INFO = "info" WARNING = "warning" CRITICAL = "critical" @dataclass class Metric: name: str metric_type: MetricType value: float timestamp: datetime.datetime labels: Dict[str, str] = None @dataclass class Alert: level: AlertLevel message: str service: str timestamp: datetime.datetime resolved: bool = False class CacheManager: def __init__(self, max_size: int = 1000, ttl_seconds: int = 3600): self.cache = {} self.access_times = {} self.max_size = max_size self.ttl_seconds = ttl_seconds self.hits = 0 self.misses = 0 def get(self, key: str) -> Optional[Any]: current_time = time.time() if key in self.cache: entry_time, value = self.cache[key] if current_time - entry_time < self.ttl_seconds: self.access_times[key] = current_time self.hits += 1 return value else: del self.cache[key] if key in self.access_times: del self.access_times[key] self.misses += 1 return None def set(self, key: str, value: Any) -> None: current_time = time.time() if len(self.cache) >= self.max_size: self._evict_lru() self.cache[key] = (current_time, value) self.access_times[key] = current_time def _evict_lru(self) -> None: if not self.access_times: return lru_key = min(self.access_times.keys(), key=lambda k: self.access_times[k]) del self.cache[lru_key] del self.access_times[lru_key] def clear(self) -> None: self.cache.clear() self.access_times.clear() def get_stats(self) -> Dict[str, Any]: total_requests = self.hits + self.misses hit_rate = self.hits / total_requests if total_requests > 0 else 0 return { "cache_size": len(self.cache), "max_size": self.max_size, "hits": self.hits, "misses": self.misses, "hit_rate": hit_rate, "ttl_seconds": self.ttl_seconds } class PerformanceMonitor: def __init__(self): self.metrics = collections.defaultdict(list) self.alerts = [] self.service_health = {} self.cache = CacheManager() self.start_time = datetime.datetime.now() def record_metric(self, name: str, value: float, metric_type: MetricType, labels: Dict[str, str] = None) -> None: metric = Metric( name=name, metric_type=metric_type, value=value, timestamp=datetime.datetime.now(), labels=labels or {} ) self.metrics[name].append(metric) if len(self.metrics[name]) > 1000: self.metrics[name] = self.metrics[name][-500:] self._check_thresholds(metric) def _check_thresholds(self, metric: Metric) -> None: thresholds = { "response_time": {"warning": 1000, "critical": 5000}, "error_rate": {"warning": 0.05, "critical": 0.10}, "cpu_usage": {"warning": 0.80, "critical": 0.95}, "memory_usage": {"warning": 0.85, "critical": 0.95} } if metric.name in thresholds: threshold = thresholds[metric.name] if metric.value >= threshold["critical"]: self._create_alert(AlertLevel.CRITICAL, f"{metric.name} is critical: {metric.value}", metric.name) elif metric.value >= threshold["warning"]: self._create_alert(AlertLevel.WARNING, f"{metric.name} is elevated: {metric.value}", metric.name) def _create_alert(self, level: AlertLevel, message: str, service: str) -> None: alert = Alert( level=level, message=message, service=service, timestamp=datetime.datetime.now() ) self.alerts.append(alert) if len(self.alerts) > 100: self.alerts = self.alerts[-50:] def get_service_health(self, service_name: str) -> Dict[str, Any]: health_key = f"health_{service_name}" cached_health = self.cache.get(health_key) if cached_health: return cached_health recent_metrics = [] for metric_name, metric_list in self.metrics.items(): if service_name in metric_name: recent_metrics.extend([m for m in metric_list if (datetime.datetime.now() - m.timestamp).seconds < 300]) if not recent_metrics: health_status = { "service": service_name, "status": "unknown", "last_check": datetime.datetime.now().isoformat(), "metrics": {} } else: avg_response_time = sum(m.value for m in recent_metrics if "response_time" in m.name) / max(1, len([m for m in recent_metrics if "response_time" in m.name])) error_count = len([m for m in recent_metrics if "error" in m.name]) status = "healthy" if avg_response_time > 5000 or error_count > 10: status = "unhealthy" elif avg_response_time > 1000 or error_count > 5: status = "degraded" health_status = { "service": service_name, "status": status, "last_check": datetime.datetime.now().isoformat(), "metrics": { "avg_response_time": avg_response_time, "error_count": error_count, "total_requests": len(recent_metrics) } } self.cache.set(health_key, health_status) return health_status def get_system_metrics(self) -> Dict[str, Any]: uptime = datetime.datetime.now() - self.start_time return { "uptime_seconds": uptime.total_seconds(), "total_metrics": sum(len(metric_list) for metric_list in self.metrics.values()), "active_alerts": len([a for a in self.alerts if not a.resolved]), "cache_stats": self.cache.get_stats(), "services": list(set(service.split("_")[0] for service in self.metrics.keys())) } monitor = PerformanceMonitor() class MetricRequest(BaseModel): name: str value: float metric_type: str labels: Optional[Dict[str, str]] = None class HealthResponse(BaseModel): status: str timestamp: str metrics: Dict[str, Any] uptime: float @app.get("/health") async def health(): return { "status": "healthy", "service": "performance-monitoring", "timestamp": datetime.datetime.now().isoformat(), "cache_stats": monitor.cache.get_stats() } @app.post("/metrics") async def record_metric(request: MetricRequest): try: metric_type = MetricType(request.metric_type) monitor.record_metric( name=request.name, value=request.value, metric_type=metric_type, labels=request.labels ) return {"status": "recorded", "timestamp": datetime.datetime.now().isoformat()} except ValueError as e: raise HTTPException(status_code=400, detail=f"Invalid metric type: {str(e)}") except Exception as e: raise HTTPException(status_code=500, detail=f"Failed to record metric: {str(e)}") @app.get("/metrics/{service_name}") async def get_service_metrics(service_name: str): try: service_metrics = {} for metric_name, metric_list in monitor.metrics.items(): if service_name in metric_name: recent_metrics = [m for m in metric_list if (datetime.datetime.now() - m.timestamp).seconds < 3600] if recent_metrics: values = [m.value for m in recent_metrics] service_metrics[metric_name] = { "count": len(values), "avg": sum(values) / len(values), "min": min(values), "max": max(values), "latest": values[-1] } return { "service": service_name, "metrics": service_metrics, "health": monitor.get_service_health(service_name), "timestamp": datetime.datetime.now().isoformat() } except Exception as e: raise HTTPException(status_code=500, detail=f"Failed to get metrics: {str(e)}") @app.get("/health/{service_name}") async def get_service_health(service_name: str): try: health = monitor.get_service_health(service_name) return health except Exception as e: raise HTTPException(status_code=500, detail=f"Failed to get health: {str(e)}") @app.get("/alerts") async def get_alerts(level: Optional[str] = None, service: Optional[str] = None): try: filtered_alerts = monitor.alerts if level: filtered_alerts = [a for a in filtered_alerts if a.level.value == level] if service: filtered_alerts = [a for a in filtered_alerts if a.service == service] return { "alerts": [ { "level": alert.level.value, "message": alert.message, "service": alert.service, "timestamp": alert.timestamp.isoformat(), "resolved": alert.resolved } for alert in filtered_alerts ], "total": len(filtered_alerts), "active": len([a for a in filtered_alerts if not a.resolved]) } except Exception as e: raise HTTPException(status_code=500, detail=f"Failed to get alerts: {str(e)}") @app.get("/system") async def get_system_metrics(): try: return monitor.get_system_metrics() except Exception as e: raise HTTPException(status_code=500, detail=f"Failed to get system metrics: {str(e)}") @app.post("/cache/clear") async def clear_cache(): try: monitor.cache.clear() return {"status": "cleared", "timestamp": datetime.datetime.now().isoformat()} except Exception as e: raise HTTPException(status_code=500, detail=f"Failed to clear cache: {str(e)}") @app.get("/cache/stats") async def get_cache_stats(): try: return monitor.cache.get_stats() except Exception as e: raise HTTPException(status_code=500, detail=f"Failed to get cache stats: {str(e)}") # Middleware for automatic metrics collection @app.middleware("http") async def metrics_middleware(request, call_next): start_time = time.time() response = await call_next(request) end_time = time.time() response_time = (end_time - start_time) * 1000 monitor.record_metric( name="response_time", value=response_time, metric_type=MetricType.TIMER, labels={"endpoint": request.url.path, "method": request.method} ) if response.status_code >= 400: monitor.record_metric( name="error_count", value=1, metric_type=MetricType.COUNTER, labels={"status_code": str(response.status_code)} ) return response if __name__ == "__main__": import uvicorn uvicorn.run(app, host="0.0.0.0", port=8006)