"""
Business Logic Flaw and Race Condition Testing Framework

Author: Security Testing Framework
Version: 1.0
"""

import json
import math
import hashlib
import datetime
import re
import itertools
from typing import Dict, List, Optional, Any, Union


def _to_number(value: Any) -> Optional[Union[int, float]]:
    """Return *value* as a number, or ``None`` if it cannot be read as one.

    ``int``/``float`` values are returned unchanged (so arbitrarily large
    ints keep their exact value); anything else goes through ``float()``,
    which accepts numeric strings as well as types such as ``Decimal``.
    """
    if isinstance(value, (int, float)):
        return value
    try:
        return float(value)
    except (TypeError, ValueError, OverflowError):
        return None


class BusinessLogicTester:
    """Test framework for business logic flaws and race conditions.

    The checks are static heuristics over endpoint paths and scenario
    dictionaries; no request is sent anywhere.

    Attributes:
        vulnerabilities: Findings collected by the caller. No method of this
            class appends to it; ``generate_test_report`` only reads it.
        test_results: Per-test records collected by the caller. No method of
            this class appends to it; ``generate_test_report`` only reads it.
    """

    # Path fragments of operations flagged as race-prone. Compiled once
    # instead of being rebuilt for every endpoint. ``/vot(?:e|ing)`` covers
    # both ``/vote`` and ``/voting`` so that ``/api/voting/submit`` is caught.
    RACE_PRONE_PATTERNS = tuple(
        re.compile(pattern, re.IGNORECASE)
        for pattern in (
            r'/transfer',
            r'/purchase',
            r'/withdraw',
            r'/bid',
            r'/vot(?:e|ing)',
            r'/update',
        )
    )

    # Prices above this value (or below zero) are reported as manipulated.
    MAX_REALISTIC_PRICE = 999999
    # Role names (lower-cased) that are reported as an escalation attempt.
    PRIVILEGED_ROLES = frozenset({'admin', 'administrator', 'root'})
    # A ``privilege`` parameter above this level is reported.
    MAX_PRIVILEGE_LEVEL = 5
    # A ``limit`` above this value is reported as effectively unlimited.
    MAX_REQUEST_LIMIT = 100
    # A ``file_size`` above this many bytes is reported (100MB).
    MAX_FILE_SIZE_BYTES = 100 * 1024 * 1024

    def __init__(self):
        self.vulnerabilities = []
        self.test_results = []

    @staticmethod
    def _params_of(scenario: Dict) -> Dict:
        """Return the scenario's ``params`` dict, or ``{}`` if absent/invalid."""
        params = scenario.get('params')
        return params if isinstance(params, dict) else {}

    def analyze_race_conditions(self, endpoints: List[str]) -> Dict[str, Any]:
        """Analyze potential race conditions in API endpoints.

        An endpoint is flagged when its path contains one of the race-prone
        fragments (case-insensitive). Each endpoint is reported at most once,
        even if it matches several fragments; non-string entries are skipped.

        Args:
            endpoints: Endpoint paths to inspect.

        Returns:
            A dict with ``race_conditions_found`` (the number of findings) and
            ``vulnerabilities`` (one finding dict per flagged endpoint).
        """
        race_conditions = []

        for endpoint in endpoints:
            if not isinstance(endpoint, str):
                continue
            # any() stops at the first matching pattern, so an endpoint such
            # as '/transfer/update' yields a single finding, not one per match.
            if any(p.search(endpoint) for p in self.RACE_PRONE_PATTERNS):
                race_conditions.append({
                    'endpoint': endpoint,
                    'vulnerability': 'Race Condition',
                    'severity': 'High',
                    'description': f'Endpoint {endpoint} may be vulnerable to race conditions',
                    'recommendation': 'Implement atomic operations and proper locking',
                    'timestamp': datetime.datetime.now().isoformat()
                })

        return {
            'race_conditions_found': len(race_conditions),
            'vulnerabilities': race_conditions
        }

    def test_business_logic_flaws(self, scenarios: List[Dict]) -> Dict[str, Any]:
        """Test for business logic flaws in various scenarios.

        Each scenario is run through up to three checks, selected by the keys
        it carries: ``params['price']`` (price manipulation),
        ``params['role']`` (authorization bypass) and a top-level
        ``resource`` key (resource exhaustion).

        Args:
            scenarios: Scenario dicts. A missing or non-dict ``params`` entry
                is treated as empty.

        Returns:
            A dict with ``flaws_found`` (the number of findings) and
            ``vulnerabilities`` (one finding dict per failed check, each
            embedding the scenario that triggered it).
        """
        flaws_found = []

        for scenario in scenarios:
            params = self._params_of(scenario)

            # Price manipulation tests
            if 'price' in params and self._test_price_manipulation(scenario):
                flaws_found.append({
                    'type': 'Price Manipulation',
                    'scenario': scenario,
                    'severity': 'Critical',
                    'description': 'Price can be manipulated through API calls'
                })

            # Authorization bypass tests
            if 'role' in params and self._test_authorization_bypass(scenario):
                flaws_found.append({
                    'type': 'Authorization Bypass',
                    'scenario': scenario,
                    'severity': 'High',
                    'description': 'Role-based access control can be bypassed'
                })

            # Resource exhaustion tests
            if 'resource' in scenario and self._test_resource_exhaustion(scenario):
                flaws_found.append({
                    'type': 'Resource Exhaustion',
                    'scenario': scenario,
                    'severity': 'Medium',
                    'description': 'Unlimited resource consumption possible'
                })

        return {
            'flaws_found': len(flaws_found),
            'vulnerabilities': flaws_found
        }

    def _test_price_manipulation(self, scenario: Dict) -> bool:
        """Test if price can be manipulated.

        Returns ``True`` when ``params['price']`` is negative, above
        ``MAX_REALISTIC_PRICE`` or NaN, or when the scenario has a truthy
        ``concurrent`` flag. A price that cannot be read as a number skips
        the range check instead of raising.
        """
        price = _to_number(self._params_of(scenario).get('price', 0))

        if price is not None:
            # Check for negative prices or unrealistic values
            if price < 0 or price > self.MAX_REALISTIC_PRICE:
                return True
            # NaN compares False against both bounds, so test it explicitly.
            if isinstance(price, float) and math.isnan(price):
                return True

        # Check for price manipulation through concurrent requests
        return bool(scenario.get('concurrent'))

    def _test_authorization_bypass(self, scenario: Dict) -> bool:
        """Test if authorization can be bypassed.

        Returns ``True`` when ``params['role']`` is one of
        ``PRIVILEGED_ROLES`` (case-insensitive) or ``params['privilege']`` is
        a number above ``MAX_PRIVILEGE_LEVEL``. Non-string roles and
        non-numeric privileges are ignored instead of raising.
        """
        params = self._params_of(scenario)

        # Check for admin role escalation
        role = params.get('role', '')
        if isinstance(role, str) and role.lower() in self.PRIVILEGED_ROLES:
            return True

        # Check for privilege escalation through parameters
        privilege = _to_number(params.get('privilege'))
        return privilege is not None and privilege > self.MAX_PRIVILEGE_LEVEL

    def _test_resource_exhaustion(self, scenario: Dict) -> bool:
        """Test for resource exhaustion vulnerabilities.

        Returns ``True`` when the scenario has no usable ``limit``, when the
        limit exceeds ``MAX_REQUEST_LIMIT``, or when ``file_size`` exceeds
        ``MAX_FILE_SIZE_BYTES``.
        """
        # Check for unlimited requests; a missing or non-numeric limit is
        # treated the same as no limit at all.
        limit = _to_number(scenario.get('limit'))
        if limit is None or limit > self.MAX_REQUEST_LIMIT:
            return True

        # Check for large file uploads
        file_size = _to_number(scenario.get('file_size'))
        return file_size is not None and file_size > self.MAX_FILE_SIZE_BYTES

    def generate_test_report(self) -> Dict[str, Any]:
        """Generate comprehensive test report.

        Returns:
            A dict with a ``test_summary`` (counts of ``self.test_results``
            and ``self.vulnerabilities`` plus the current local time in ISO
            format) and the two lists themselves.
        """
        return {
            'test_summary': {
                'total_tests': len(self.test_results),
                'vulnerabilities_found': len(self.vulnerabilities),
                'test_completion_time': datetime.datetime.now().isoformat()
            },
            'vulnerabilities': self.vulnerabilities,
            'test_results': self.test_results
        }


# Test scenarios for business logic flaws
TEST_SCENARIOS = [
    {
        'name': 'Price Manipulation Test',
        'endpoint': '/api/purchase',
        'params': {'price': -100, 'item_id': '123'},
        'method': 'POST'
    },
    {
        'name': 'Role Escalation Test',
        'endpoint': '/api/admin/users',
        'params': {'role': 'admin', 'user_id': '456'},
        'method': 'PUT'
    },
    {
        'name': 'Concurrent Transfer Test',
        'endpoint': '/api/transfer',
        'params': {'amount': 1000, 'to_account': '789'},
        'concurrent': True,
        'method': 'POST'
    },
    {
        'name': 'Resource Exhaustion Test',
        'endpoint': '/api/upload',
        'resource': 'file_upload',
        'file_size': 500 * 1024 * 1024,  # 500MB
        'limit': 10000,
        'method': 'POST'
    }
]

# Sample API endpoints for testing
API_ENDPOINTS = [
    '/api/transfer/funds',
    '/api/purchase/order',
    '/api/withdraw/balance',
    '/api/auction/bid',
    '/api/voting/submit',
    '/api/user/update/profile'
]


def main() -> None:
    """Run every check against the sample data and print a JSON report."""
    # Run business logic tests
    tester = BusinessLogicTester()

    # Test race conditions
    race_results = tester.analyze_race_conditions(API_ENDPOINTS)
    tester.vulnerabilities.extend(race_results['vulnerabilities'])

    # Test business logic flaws
    logic_results = tester.test_business_logic_flaws(TEST_SCENARIOS)
    tester.vulnerabilities.extend(logic_results['vulnerabilities'])

    # Generate report
    report = tester.generate_test_report()

    print("Business Logic Testing Complete")
    print(f"Vulnerabilities Found: {len(report['vulnerabilities'])}")
    print(json.dumps(report, indent=2))


if __name__ == "__main__":
    main()