""" Dynamic Pricing Algorithm with Demand Analytics Author: Starlight Agent Version: 1.0 """ import json import math import datetime from typing import Dict, List, Optional, Any, Tuple from dataclasses import dataclass, asdict from collections import defaultdict @dataclass class DemandMetrics: """Demand metrics for pricing calculation""" current_demand: float historical_average: float demand_trend: float time_to_event: int inventory_remaining: int total_inventory: int competitor_prices: List[float] seasonality_factor: float @dataclass class PricingFactors: """Pricing factors configuration""" base_price: float demand_multiplier: float time_multiplier: float inventory_multiplier: float competitor_multiplier: float seasonality_multiplier: float max_price_increase: float min_price_decrease: float class DynamicPricingEngine: """Dynamic pricing engine with demand analytics""" def __init__(self): self.pricing_history = defaultdict(list) self.demand_patterns = {} self.competitor_data = {} self.pricing_factors = {} def calculate_optimal_price(self, event_id: str, metrics: DemandMetrics, factors: PricingFactors) -> Dict[str, Any]: """Calculate optimal price based on demand analytics""" # Calculate individual pricing components demand_price = self._calculate_demand_pricing(metrics, factors) time_price = self._calculate_time_pricing(metrics, factors) inventory_price = self._calculate_inventory_pricing(metrics, factors) competitor_price = self._calculate_competitor_pricing(metrics, factors) seasonality_price = self._calculate_seasonality_pricing(metrics, factors) # Combine pricing factors with weights weights = { "demand": 0.35, "time": 0.25, "inventory": 0.20, "competitor": 0.10, "seasonality": 0.10 } optimal_price = ( demand_price * weights["demand"] + time_price * weights["time"] + inventory_price * weights["inventory"] + competitor_price * weights["competitor"] + seasonality_price * weights["seasonality"] ) # Apply price constraints max_price = factors.base_price * (1 + factors.max_price_increase) min_price = factors.base_price * (1 - factors.min_price_decrease) optimal_price = max(min_price, min(max_price, optimal_price)) # Calculate confidence score confidence = self._calculate_pricing_confidence(metrics, factors) # Store pricing decision pricing_record = { "timestamp": datetime.datetime.now().isoformat(), "event_id": event_id, "base_price": factors.base_price, "optimal_price": optimal_price, "confidence": confidence, "factors": { "demand": demand_price, "time": time_price, "inventory": inventory_price, "competitor": competitor_price, "seasonality": seasonality_price }, "metrics": asdict(metrics) } self.pricing_history[event_id].append(pricing_record) return { "optimal_price": round(optimal_price, 2), "confidence": confidence, "price_change": round(((optimal_price - factors.base_price) / factors.base_price) * 100, 2), "recommendation": self._generate_pricing_recommendation(optimal_price, factors.base_price, confidence), "factors_breakdown": { "demand": round(demand_price, 2), "time": round(time_price, 2), "inventory": round(inventory_price, 2), "competitor": round(competitor_price, 2), "seasonality": round(seasonality_price, 2) } } def _calculate_demand_pricing(self, metrics: DemandMetrics, factors: PricingFactors) -> float: """Calculate demand-based pricing""" demand_ratio = metrics.current_demand / max(metrics.historical_average, 1) # Apply demand multiplier with elasticity if demand_ratio > 1.5: demand_adjustment = 1 + (factors.demand_multiplier * (demand_ratio - 1)) elif demand_ratio > 1.0: demand_adjustment = 1 + (factors.demand_multiplier * 0.5 * (demand_ratio - 1)) else: demand_adjustment = 1 - (factors.demand_multiplier * 0.3 * (1 - demand_ratio)) # Consider demand trend if metrics.demand_trend > 0.1: demand_adjustment *= 1.1 elif metrics.demand_trend < -0.1: demand_adjustment *= 0.95 return factors.base_price * demand_adjustment def _calculate_time_pricing(self, metrics: DemandMetrics, factors: PricingFactors) -> float: """Calculate time-based pricing""" days_to_event = metrics.time_to_event # Time-based pricing curve if days_to_event <= 1: time_multiplier = 1 + (factors.time_multiplier * 0.5) elif days_to_event <= 7: time_multiplier = 1 + (factors.time_multiplier * 0.3) elif days_to_event <= 30: time_multiplier = 1 + (factors.time_multiplier * 0.1) elif days_to_event <= 90: time_multiplier = 1.0 else: time_multiplier = 1 - (factors.time_multiplier * 0.1) return factors.base_price * time_multiplier def _calculate_inventory_pricing(self, metrics: DemandMetrics, factors: PricingFactors) -> float: """Calculate inventory-based pricing""" inventory_ratio = metrics.inventory_remaining / max(metrics.total_inventory, 1) # Scarcity pricing if inventory_ratio <= 0.1: inventory_multiplier = 1 + (factors.inventory_multiplier * 0.8) elif inventory_ratio <= 0.25: inventory_multiplier = 1 + (factors.inventory_multiplier * 0.4) elif inventory_ratio <= 0.5: inventory_multiplier = 1 + (factors.inventory_multiplier * 0.2) else: inventory_multiplier = 1 - (factors.inventory_multiplier * 0.1) return factors.base_price * inventory_multiplier def _calculate_competitor_pricing(self, metrics: DemandMetrics, factors: PricingFactors) -> float: """Calculate competitor-based pricing""" if not metrics.competitor_prices: return factors.base_price avg_competitor_price = sum(metrics.competitor_prices) / len(metrics.competitor_prices) # Position relative to competitors if factors.base_price < avg_competitor_price * 0.9: # We're significantly cheaper - can increase price competitor_multiplier = 1 + (factors.competitor_multiplier * 0.2) elif factors.base_price > avg_competitor_price * 1.1: # We're significantly more expensive - should decrease price competitor_multiplier = 1 - (factors.competitor_multiplier * 0.15) else: # Competitive pricing competitor_multiplier = 1.0 return factors.base_price * competitor_multiplier def _calculate_seasonality_pricing(self, metrics: DemandMetrics, factors: PricingFactors) -> float: """Calculate seasonality-based pricing""" seasonality_multiplier = 1 + (factors.seasonality_multiplier * metrics.seasonality_factor) return factors.base_price * seasonality_multiplier def _calculate_pricing_confidence(self, metrics: DemandMetrics, factors: PricingFactors) -> float: """Calculate confidence score for pricing decision""" confidence_factors = [] # Data availability confidence if metrics.historical_average > 0: confidence_factors.append(0.9) else: confidence_factors.append(0.5) # Competitor data confidence if metrics.competitor_prices: confidence_factors.append(0.8) else: confidence_factors.append(0.6) # Inventory confidence if metrics.total_inventory > 0: confidence_factors.append(0.9) else: confidence_factors.append(0.4) # Time confidence if metrics.time_to_event > 0: confidence_factors.append(0.8) else: confidence_factors.append(0.5) return sum(confidence_factors) / len(confidence_factors) def _generate_pricing_recommendation(self, optimal_price: float, base_price: float, confidence: float) -> str: """Generate pricing recommendation""" price_change = ((optimal_price - base_price) / base_price) * 100 if confidence < 0.6: return "LOW_CONFIDENCE: Collect more data before adjusting prices" if price_change > 20: return "SIGNIFICANT_INCREASE: High demand detected, consider gradual price increase" elif price_change > 10: return "MODERATE_INCREASE: Demand is strong, price adjustment recommended" elif price_change > -10: return "STABLE_PRICING: Current price is optimal" elif price_change > -20: return "MODERATE_DECREASE: Consider price reduction to stimulate demand" else: return "SIGNIFICANT_DECREASE: Low demand detected, aggressive pricing recommended" def analyze_demand_patterns(self, event_id: str, historical_data: List[Dict[str, Any]]) -> Dict[str, Any]: """Analyze demand patterns for better pricing""" if not historical_data: return {"patterns": [], "insights": []} # Extract demand trends demand_over_time = [] for data_point in historical_data: demand_over_time.append({ "timestamp": data_point.get("timestamp"), "demand": data_point.get("demand", 0), "price": data_point.get("price", 0) }) # Calculate demand elasticity elasticity = self._calculate_demand_elasticity(demand_over_time) # Identify peak demand periods peak_periods = self._identify_peak_demand_periods(demand_over_time) # Generate insights insights = [] if elasticity > 1.5: insights.append("High demand elasticity - price changes significantly impact demand") elif elasticity < 0.5: insights.append("Low demand elasticity - demand is price insensitive") if peak_periods: insights.append(f"Peak demand periods identified: {peak_periods}") return { "demand_elasticity": elasticity, "peak_periods": peak_periods, "insights": insights, "recommendations": self._generate_demand_insights(elasticity, peak_periods) } def _calculate_demand_elasticity(self, demand_data: List[Dict[str, Any]]) -> float: """Calculate price elasticity of demand""" if len(demand_data) < 2: return 1.0 # Calculate percentage changes elasticity_values = [] for i in range(1, len(demand_data)): prev_data = demand_data[i-1] curr_data = demand_data[i] if prev_data["price"] > 0 and prev_data["demand"] > 0: price_change = (curr_data["price"] - prev_data["price"]) / prev_data["price"] demand_change = (curr_data["demand"] - prev_data["demand"]) / prev_data["demand"] if price_change != 0: elasticity = abs(demand_change / price_change) elasticity_values.append(elasticity) return sum(elasticity_values) / len(elasticity_values) if elasticity_values else 1.0 def _identify_peak_demand_periods(self, demand_data: List[Dict[str, Any]]) -> List[str]: """Identify peak demand periods""" if not demand_data: return [] # Calculate average demand avg_demand = sum(d["demand"] for d in demand_data) / len(demand_data) # Find peak periods peak_periods = [] for data_point in demand_data: if data_point["demand"] > avg_demand * 1.5: timestamp = data_point.get("timestamp", "") peak_periods.append(timestamp) return peak_periods[:5] # Return top 5 peak periods def _generate_demand_insights(self, elasticity: float, peak_periods: List[str]) -> List[str]: """Generate demand-based insights""" insights = [] if elasticity > 2.0: insights.append("Implement dynamic pricing with frequent adjustments") elif elasticity < 0.5: insights.append("Focus on value-added services rather than price competition") if len(peak_periods) > 3: insights.append("Consider time-based pricing tiers for peak periods") return insights def test_dynamic_pricing(): """Test dynamic pricing engine""" engine = DynamicPricingEngine() # Create test metrics metrics = DemandMetrics( current_demand=150, historical_average=100, demand_trend=0.15, time_to_event=14, inventory_remaining=50, total_inventory=200, competitor_prices=[45.0, 55.0, 50.0], seasonality_factor=0.1 ) factors = PricingFactors( base_price=50.0, demand_multiplier=0.3, time_multiplier=0.2, inventory_multiplier=0.25, competitor_multiplier=0.15, seasonality_multiplier=0.1, max_price_increase=0.5, min_price_decrease=0.3 ) # Calculate optimal price result = engine.calculate_optimal_price("event123", metrics, factors) # Verify results assert result["optimal_price"] > 0 assert 0 <= result["confidence"] <= 1 assert "recommendation" in result print("✅ Dynamic pricing engine working correctly") return result if __name__ == "__main__": result = test_dynamic_pricing() print(f"Optimal price calculated: ${result['optimal_price']} with {result['confidence']:.2%} confidence")