#!/usr/bin/env python3
"""
Business Logic Vulnerability Scanner
Tests for race conditions, logical flaws, and improper validation

Author: Security Engineer
Purpose: Identify business logic vulnerabilities through simulation
"""

import json
import math
import hashlib
import datetime
import re
import base64
import itertools
import collections
import html
import urllib.parse
from typing import Dict, List, Optional, Any, Union


class BusinessLogicVulnerabilityScanner:
    """Business logic vulnerability assessment tool.

    Every check in this class is a simulation: it reads flags from a
    configuration dictionary that describes which protections are enabled
    and derives a verdict from those flags. No request is sent anywhere.
    """

    def __init__(self):
        self.vulnerabilities = []
        self.test_results = {}
        # Short identifier derived from the creation timestamp; it is used
        # in the report and in the report file name.
        self.assessment_id = hashlib.sha256(
            datetime.datetime.now().isoformat().encode()
        ).hexdigest()[:16]

    # ------------------------------------------------------------------
    # Public test suites
    # ------------------------------------------------------------------

    def test_race_conditions(self, api_config: Dict) -> Dict:
        """Test for race condition vulnerabilities in concurrent operations.

        Args:
            api_config: Configuration dictionary. Only the
                ``"concurrency_protection"`` sub-dictionary is read; a
                missing key is treated as "no protection configured".

        Returns:
            A dictionary with ``"race_condition_scenarios"`` (one entry per
            scenario), ``"vulnerabilities_found"`` and
            ``"race_vulnerable_endpoints"`` (vulnerable scenarios only).
        """
        results = {
            "race_condition_scenarios": [],
            "vulnerabilities_found": [],
            "race_vulnerable_endpoints": []
        }

        # Test concurrent transactions
        transaction_scenarios = [
            {
                "name": "Concurrent Fund Transfer",
                "scenario": "Two users transfer funds simultaneously from same account",
                "vulnerability": "double_spending",
                "test_payload": self._generate_concurrent_transfer_payload()
            },
            {
                "name": "Concurrent Inventory Update",
                "scenario": "Multiple users purchase last item simultaneously",
                "vulnerability": "overselling",
                "test_payload": self._generate_concurrent_inventory_payload()
            },
            {
                "name": "Concurrent Bid Placement",
                "scenario": "Multiple bids placed at same timestamp",
                "vulnerability": "bid_manipulation",
                "test_payload": self._generate_concurrent_bid_payload()
            },
            {
                "name": "Concurrent Registration",
                "scenario": "Multiple users register with same username",
                "vulnerability": "account_takeover",
                "test_payload": self._generate_concurrent_registration_payload()
            }
        ]

        protection = api_config.get("concurrency_protection", {})

        for scenario in transaction_scenarios:
            # Simulate race condition testing
            race_test_result = self._simulate_race_condition_test(scenario, protection)
            is_vulnerable = race_test_result.get("vulnerable", False)

            results["race_condition_scenarios"].append({
                "scenario": scenario["name"],
                "description": scenario["scenario"],
                "vulnerability_type": scenario["vulnerability"],
                "test_result": race_test_result,
                "exploitable": is_vulnerable
            })

            if not is_vulnerable:
                continue

            results["vulnerabilities_found"].append({
                "type": "race_condition",
                "scenario": scenario["name"],
                "vulnerability": scenario["vulnerability"],
                "severity": "high",
                "description": f"Race condition in {scenario['name']}: {scenario['scenario']}"
            })
            if "endpoint" in race_test_result:
                results["race_vulnerable_endpoints"].append(race_test_result["endpoint"])

        return results

    def test_logic_flaws(self, business_rules: Dict) -> Dict:
        """Test for business logic flaws.

        Args:
            business_rules: Dictionary whose ``"pricing"``,
                ``"authorization"``, ``"workflows"`` and ``"validation"``
                sub-dictionaries are passed to the individual checks.
                Missing sections default to ``{}``.

        Returns:
            A dictionary with ``"logic_flaw_tests"`` (all four check
            results), ``"vulnerabilities_found"`` and ``"bypassed_rules"``.
        """
        results = {
            "logic_flaw_tests": [],
            "vulnerabilities_found": [],
            "bypassed_rules": []
        }

        results["logic_flaw_tests"].extend([
            # Test price manipulation
            self._test_price_manipulation(business_rules.get("pricing", {})),
            # Test authorization bypass
            self._test_authorization_logic_flaws(business_rules.get("authorization", {})),
            # Test workflow bypass
            self._test_workflow_bypass(business_rules.get("workflows", {})),
            # Test validation bypass
            self._test_validation_logic(business_rules.get("validation", {})),
        ])

        # Collect vulnerabilities
        for test in results["logic_flaw_tests"]:
            if not test.get("vulnerable", False):
                continue
            results["vulnerabilities_found"].append({
                "type": "business_logic_flaw",
                "category": test.get("category", "unknown"),
                "severity": test.get("severity", "medium"),
                "description": test.get("description", "Logic flaw detected"),
                "bypass_method": test.get("bypass_method", "unknown")
            })
            if test.get("bypassed_rule"):
                results["bypassed_rules"].append(test["bypassed_rule"])

        return results

    def test_improper_validation(self, validation_config: Dict) -> Dict:
        """Test for improper input validation.

        Args:
            validation_config: Dictionary whose ``"sql"``, ``"xss"``,
                ``"file_upload"`` and ``"api_parameters"`` sub-dictionaries
                are passed to the individual checks. Missing sections
                default to ``{}``.

        Returns:
            A dictionary with ``"validation_tests"``,
            ``"bypassed_validations"``, ``"vulnerable_parameters"`` and
            ``"vulnerabilities_found"``. The last key mirrors every bypass
            in the shape the report methods consume, so these findings are
            counted in the generated report.
        """
        results = {
            "validation_tests": [],
            "bypassed_validations": [],
            "vulnerable_parameters": [],
            "vulnerabilities_found": []
        }

        results["validation_tests"].extend([
            # Test SQL injection bypasses
            self._test_sql_validation_bypasses(validation_config.get("sql", {})),
            # Test XSS bypasses
            self._test_xss_validation_bypasses(validation_config.get("xss", {})),
            # Test file upload validation
            self._test_file_upload_validation(validation_config.get("file_upload", {})),
            # Test API parameter validation
            self._test_api_parameter_validation(validation_config.get("api_parameters", {})),
        ])

        # Collect vulnerabilities
        for test in results["validation_tests"]:
            if not test.get("bypass_successful", False):
                continue

            validation_type = test.get("type", "unknown")
            severity = test.get("severity", "medium")
            parameter = test.get("parameter")

            results["bypassed_validations"].append({
                "validation_type": validation_type,
                "bypass_payload": test.get("payload", ""),
                "severity": severity
            })
            if parameter:
                results["vulnerable_parameters"].append(parameter)

            # Normalized record so the report aggregation sees this finding.
            results["vulnerabilities_found"].append({
                "type": "improper_validation",
                "category": validation_type,
                "severity": severity,
                "description": (
                    f"Improper validation ({validation_type}) "
                    f"on parameter '{parameter or 'unknown'}'"
                )
            })

        return results

    def test_state_manipulation(self, state_config: Dict) -> Dict:
        """Test for application state manipulation vulnerabilities.

        Args:
            state_config: Dictionary whose ``"shopping_cart"``,
                ``"user_profile"``, ``"session"`` and ``"orders"``
                sub-dictionaries are passed to the individual checks.
                Missing sections default to ``{}``.

        Returns:
            A dictionary with ``"state_tests"``, ``"manipulation_vectors"``,
            ``"vulnerable_states"`` and ``"vulnerabilities_found"``. The
            last key mirrors every manipulation vector in the shape the
            report methods consume (its ``"severity"`` is the check's
            ``"impact"`` value).
        """
        results = {
            "state_tests": [],
            "manipulation_vectors": [],
            "vulnerable_states": [],
            "vulnerabilities_found": []
        }

        results["state_tests"].extend([
            # Test shopping cart manipulation
            self._test_shopping_cart_manipulation(state_config.get("shopping_cart", {})),
            # Test user profile manipulation
            self._test_profile_manipulation(state_config.get("user_profile", {})),
            # Test session state manipulation
            self._test_session_state_manipulation(state_config.get("session", {})),
            # Test order state manipulation
            self._test_order_state_manipulation(state_config.get("orders", {})),
        ])

        # Collect manipulation vectors
        for test in results["state_tests"]:
            if not test.get("manipulable", False):
                continue

            state_type = test.get("state_type", "unknown")
            method = test.get("method", "unknown")
            impact = test.get("impact", "medium")

            results["manipulation_vectors"].append({
                "state_type": state_type,
                "manipulation_method": method,
                "impact": impact
            })
            if test.get("vulnerable_state"):
                results["vulnerable_states"].append(test["vulnerable_state"])

            # Normalized record so the report aggregation sees this finding.
            results["vulnerabilities_found"].append({
                "type": "state_manipulation",
                "category": state_type,
                "severity": impact,
                "description": f"{state_type} state can be manipulated via {method}"
            })

        return results

    # ------------------------------------------------------------------
    # Individual simulated checks
    # ------------------------------------------------------------------

    def _simulate_race_condition_test(self, scenario: Dict, protection: Dict) -> Dict:
        """Simulate race condition vulnerability test.

        The scenario is considered protected when pessimistic locking is
        on, when optimistic locking is on together with version checking
        (version checking defaults to enabled), or when the transaction
        isolation level is ``"SERIALIZABLE"``. Otherwise it is vulnerable.
        """
        has_pessimistic_locking = protection.get("pessimistic_locking", False)
        has_optimistic_locking = protection.get("optimistic_locking", False)
        isolation_level = protection.get("transaction_isolation", "READ_COMMITTED")

        protected = (
            has_pessimistic_locking
            or (has_optimistic_locking and protection.get("version_checking", True))
            or isolation_level == "SERIALIZABLE"
        )
        vulnerable = not protected  # Assume vulnerable unless a control applies

        return {
            "vulnerable": vulnerable,
            "endpoint": f"/api/{scenario['vulnerability']}",
            "protection_level": protection.get("level", "none"),
            "mitigation_required": vulnerable,
            "attack_vector": "concurrent_requests"
        }

    def _test_price_manipulation(self, pricing_config: Dict) -> Dict:
        """Test for price manipulation vulnerabilities.

        Both conditions require server-side validation to be off. When both
        match, the later one determines ``"bypass_method"``.
        """
        client_side_validation = pricing_config.get("client_side_validation", True)
        server_side_validation = pricing_config.get("server_side_validation", True)
        price_encryption = pricing_config.get("encrypt_price_data", False)

        vulnerable = False
        bypass_method = ""

        # Test if price can be modified client-side
        if client_side_validation and not server_side_validation:
            vulnerable = True
            bypass_method = "client_side_price_modification"

        # Test if price data is encrypted
        if not price_encryption and not server_side_validation:
            vulnerable = True
            bypass_method = "unencrypted_price_interception"

        return {
            "category": "price_manipulation",
            "vulnerable": vulnerable,
            "severity": "high" if vulnerable else "none",
            "description": "Price can be manipulated during checkout",
            "bypass_method": bypass_method
        }

    def _test_authorization_logic_flaws(self, auth_config: Dict) -> Dict:
        """Test authorization logic flaws.

        When several conditions match, the later one determines
        ``"bypass_method"``.
        """
        role_hierarchy = auth_config.get("role_hierarchy", {})
        implicit_permissions = auth_config.get("implicit_permissions", False)
        ownership_validation = auth_config.get("ownership_validation", True)

        vulnerable = False
        bypass_method = ""

        # Test for implicit permission escalation
        if implicit_permissions and not role_hierarchy:
            vulnerable = True
            bypass_method = "implicit_permission_escalation"

        # Test for ownership validation bypass
        if not ownership_validation:
            vulnerable = True
            bypass_method = "ownership_bypass"

        return {
            "category": "authorization_logic",
            "vulnerable": vulnerable,
            "severity": "critical" if vulnerable else "none",
            "description": "Authorization logic allows privilege escalation",
            "bypass_method": bypass_method
        }

    def _test_workflow_bypass(self, workflow_config: Dict) -> Dict:
        """Test workflow bypass vulnerabilities.

        Note that ``"temporal_constraints"`` defaults to ``False``, so an
        empty configuration is reported as vulnerable. When several
        conditions match, the later one determines ``"bypass_method"``.
        """
        step_validation = workflow_config.get("step_validation", True)
        state_verification = workflow_config.get("state_verification", True)
        temporal_constraints = workflow_config.get("temporal_constraints", False)

        vulnerable = False
        bypass_method = ""

        if not step_validation:
            vulnerable = True
            bypass_method = "step_skipping"

        if not state_verification:
            vulnerable = True
            bypass_method = "state_manipulation"

        if not temporal_constraints:
            vulnerable = True
            bypass_method = "temporal_bypass"

        return {
            "category": "workflow_bypass",
            "vulnerable": vulnerable,
            "severity": "medium" if vulnerable else "none",
            "description": "Workflow steps can be bypassed",
            "bypass_method": bypass_method,
            "bypassed_rule": "workflow_sequence_validation"
        }

    def _test_validation_logic(self, validation_config: Dict) -> Dict:
        """Test validation logic flaws.

        When several conditions match, the later one determines
        ``"bypass_method"``.
        """
        whitelist_approach = validation_config.get("whitelist_approach", False)
        blacklist_approach = validation_config.get("blacklist_approach", True)
        encoding_validation = validation_config.get("encoding_validation", True)

        vulnerable = False
        bypass_method = ""

        if blacklist_approach and not whitelist_approach:
            vulnerable = True
            bypass_method = "blacklist_bypass"

        if not encoding_validation:
            vulnerable = True
            bypass_method = "encoding_bypass"

        return {
            "category": "validation_logic",
            "vulnerable": vulnerable,
            "severity": "medium" if vulnerable else "none",
            "description": "Input validation can be bypassed",
            "bypass_method": bypass_method
        }

    def _test_sql_validation_bypasses(self, sql_config: Dict) -> Dict:
        """Test SQL injection validation bypasses.

        Reported as bypassed unless both parameterized queries and input
        sanitization are enabled. Only the first payload is reported.
        """
        bypass_payloads = [
            "' OR 1=1--",
            "1' UNION SELECT * FROM users--",
            "'; DROP TABLE users--",
            "1' OR (SELECT COUNT(*) FROM users) > 0--"
        ]

        parameterized_queries = sql_config.get("parameterized_queries", False)
        input_sanitization = sql_config.get("input_sanitization", True)

        vulnerable = not parameterized_queries or not input_sanitization

        return {
            "type": "sql_injection",
            "bypass_successful": vulnerable,
            "payload": bypass_payloads[0] if vulnerable else "",
            "parameter": "user_id",
            "severity": "critical" if vulnerable else "none"
        }

    def _test_xss_validation_bypasses(self, xss_config: Dict) -> Dict:
        """Test XSS validation bypasses.

        Reported as bypassed when output encoding or input filtering is
        disabled. The ``"csp_enabled"`` flag is not part of the verdict.
        The second payload in the list is the one reported.
        """
        # NOTE(review): the markup payloads were empty strings in the source
        # (they look stripped by an HTML sanitizer), so a successful bypass
        # reported an empty payload. These are representative replacements;
        # confirm against the intended list.
        bypass_payloads = [
            "<script>alert('XSS')</script>",
            "<img src=x onerror=alert('XSS')>",
            "javascript:alert('XSS')",
            "<svg onload=alert('XSS')>"
        ]

        output_encoding = xss_config.get("output_encoding", True)
        input_filtering = xss_config.get("input_filtering", True)

        vulnerable = not output_encoding or not input_filtering

        return {
            "type": "xss",
            "bypass_successful": vulnerable,
            "payload": bypass_payloads[1] if vulnerable else "",
            "parameter": "comment",
            "severity": "high" if vulnerable else "none"
        }

    def _test_file_upload_validation(self, file_config: Dict) -> Dict:
        """Test file upload validation bypasses.

        Only ``"content_validation"`` decides the verdict; the
        ``"extension_check"`` and ``"mime_type_check"`` flags are not
        consulted. The payload is returned regardless of the verdict.
        """
        content_validation = file_config.get("content_validation", False)

        vulnerable = not content_validation

        return {
            "type": "file_upload",
            "bypass_successful": vulnerable,
            "payload": "shell.php.jpeg",
            "parameter": "upload_file",
            "severity": "high" if vulnerable else "none"
        }

    def _test_api_parameter_validation(self, api_config: Dict) -> Dict:
        """Test API parameter validation.

        Reported as bypassed unless type, range and format validation are
        all enabled.
        """
        type_validation = api_config.get("type_validation", True)
        range_validation = api_config.get("range_validation", True)
        format_validation = api_config.get("format_validation", True)

        vulnerable = not (type_validation and range_validation and format_validation)

        return {
            "type": "api_parameter",
            "bypass_successful": vulnerable,
            "payload": '{"amount": -1000}',
            "parameter": "payment_amount",
            "severity": "medium" if vulnerable else "none"
        }

    def _test_shopping_cart_manipulation(self, cart_config: Dict) -> Dict:
        """Test shopping cart state manipulation.

        Manipulable unless server-side validation, price recalculation and
        tamper protection are all enabled. Tamper protection defaults to
        ``False``, so an empty configuration is reported as manipulable.
        """
        server_side_validation = cart_config.get("server_side_validation", True)
        price_recalculation = cart_config.get("price_recalculation", True)
        tamper_protection = cart_config.get("tamper_protection", False)

        manipulable = not (server_side_validation and price_recalculation and tamper_protection)

        return {
            "state_type": "shopping_cart",
            "manipulable": manipulable,
            "method": "price_modification" if not price_recalculation else "cart_tampering",
            "impact": "high" if manipulable else "none",
            "vulnerable_state": "cart_total"
        }

    def _test_profile_manipulation(self, profile_config: Dict) -> Dict:
        """Test user profile manipulation.

        Manipulable unless at least one read-only field is declared and
        both field validation and the privilege check are enabled.
        """
        readonly_fields = profile_config.get("readonly_fields", [])
        field_validation = profile_config.get("field_validation", True)
        privilege_check = profile_config.get("privilege_check", True)

        manipulable = not (len(readonly_fields) > 0 and field_validation and privilege_check)

        return {
            "state_type": "user_profile",
            "manipulable": manipulable,
            "method": "field_modification" if not field_validation else "privilege_escalation",
            "impact": "medium" if manipulable else "none",
            "vulnerable_state": "user_role"
        }

    def _test_session_state_manipulation(self, session_config: Dict) -> Dict:
        """Test session state manipulation.

        Manipulable unless server-side storage, session encryption and the
        integrity check are all enabled.
        """
        server_side_storage = session_config.get("server_side_storage", True)
        session_encryption = session_config.get("session_encryption", True)
        integrity_check = session_config.get("integrity_check", True)

        manipulable = not (server_side_storage and session_encryption and integrity_check)

        return {
            "state_type": "session",
            "manipulable": manipulable,
            "method": "session_hijacking" if not server_side_storage else "session_tampering",
            "impact": "critical" if manipulable else "none",
            "vulnerable_state": "user_session"
        }

    def _test_order_state_manipulation(self, order_config: Dict) -> Dict:
        """Test order state manipulation.

        Manipulable unless a state machine, transition validation and
        audit logging are all enabled. The state machine and audit logging
        default to ``False``, so an empty configuration is manipulable.
        """
        state_machine = order_config.get("state_machine", False)
        transition_validation = order_config.get("transition_validation", True)
        audit_logging = order_config.get("audit_logging", False)

        manipulable = not (state_machine and transition_validation and audit_logging)

        return {
            "state_type": "order",
            "manipulable": manipulable,
            "method": "state_transition_bypass" if not transition_validation else "order_tampering",
            "impact": "high" if manipulable else "none",
            "vulnerable_state": "order_status"
        }

    # ------------------------------------------------------------------
    # Static scenario payloads
    # ------------------------------------------------------------------

    def _generate_concurrent_transfer_payload(self) -> Dict:
        """Generate payload for concurrent transfer test"""
        return {
            "from_account": "12345",
            "to_account": "67890",
            "amount": 100.00,
            "concurrent_requests": 2
        }

    def _generate_concurrent_inventory_payload(self) -> Dict:
        """Generate payload for concurrent inventory test"""
        return {
            "product_id": "PROD_001",
            "quantity": 1,
            "concurrent_purchases": 3,
            "stock_level": 1
        }

    def _generate_concurrent_bid_payload(self) -> Dict:
        """Generate payload for concurrent bid test"""
        return {
            "auction_id": "AUCTION_001",
            "bid_amount": 1000.00,
            "timestamp": datetime.datetime.now().isoformat(),
            "concurrent_bids": 2
        }

    def _generate_concurrent_registration_payload(self) -> Dict:
        """Generate payload for concurrent registration test"""
        return {
            "username": "newuser",
            "email": "user@example.com",
            "password": "password123",
            "concurrent_registrations": 2
        }

    # ------------------------------------------------------------------
    # Reporting
    # ------------------------------------------------------------------

    @staticmethod
    def _iter_findings(test_results: Dict):
        """Yield ``(test_category, finding)`` for every reported vulnerability.

        Only result sets that contain a ``"vulnerabilities_found"`` key
        contribute; other result sets are skipped.
        """
        for test_category, results in test_results.items():
            if "vulnerabilities_found" in results:
                for vuln in results["vulnerabilities_found"]:
                    yield test_category, vuln

    def generate_business_logic_report(self, test_results: Dict) -> Dict:
        """Generate comprehensive business logic security report.

        Args:
            test_results: Mapping of test category name to the result
                dictionary returned by one of the ``test_*`` methods.

        Returns:
            A report dictionary containing the assessment id and date, the
            overall risk level, a vulnerability summary, the unmodified
            ``test_results``, remediation priorities and a business impact
            assessment.
        """
        severities = [vuln.get("severity") for _, vuln in self._iter_findings(test_results)]
        total_vulnerabilities = len(severities)
        critical_count = severities.count("critical")
        high_count = severities.count("high")

        return {
            "assessment_id": self.assessment_id,
            "assessment_date": datetime.datetime.now().isoformat(),
            "assessment_type": "business_logic_security",
            "overall_risk_level": self._overall_risk_level(critical_count, high_count),
            "vulnerability_summary": {
                "total_vulnerabilities": total_vulnerabilities,
                "critical_vulnerabilities": critical_count,
                "high_vulnerabilities": high_count,
                "vulnerability_types": self._get_vulnerability_types(test_results)
            },
            "detailed_results": test_results,
            "remediation_priorities": self._generate_remediation_priorities(test_results),
            "business_impact": self._assess_business_impact(test_results)
        }

    @staticmethod
    def _overall_risk_level(critical_count: int, high_count: int) -> str:
        """Map critical/high finding counts to an overall risk label."""
        if critical_count > 0:
            return "critical"
        if high_count > 2:
            return "high"
        if high_count > 0:
            return "medium"
        return "low"

    def _get_vulnerability_types(self, test_results: Dict) -> List[str]:
        """Get unique vulnerability types from test results.

        The result is sorted so that repeated runs produce the same report.
        """
        return sorted({
            vuln.get("type", "unknown")
            for _, vuln in self._iter_findings(test_results)
        })

    def _generate_remediation_priorities(self, test_results: Dict) -> List[Dict]:
        """Generate prioritized remediation recommendations.

        Returns at most ten entries ordered critical, high, medium, low.
        Unrecognized severities sort with "low". The sort is stable, so
        findings of equal severity keep their input order.
        """
        priorities = [
            {
                "category": test_category,
                "vulnerability_type": vuln.get("type", "unknown"),
                "severity": vuln.get("severity", "medium"),
                "description": vuln.get("description", ""),
                "remediation": self._get_remediation_for_type(
                    vuln.get("type", "unknown"), vuln.get("category")
                )
            }
            for test_category, vuln in self._iter_findings(test_results)
        ]

        # Sort by severity
        severity_order = {"critical": 0, "high": 1, "medium": 2, "low": 3}
        priorities.sort(key=lambda item: severity_order.get(item["severity"], 3))

        return priorities[:10]  # Top 10 priorities

    def _get_remediation_for_type(self, vuln_type: str, category: Optional[str] = None) -> str:
        """Get remediation recommendation for vulnerability type.

        Args:
            vuln_type: The finding's ``"type"`` value.
            category: Optional finer-grained category of the finding. When
                it has its own entry, that more specific recommendation is
                returned in preference to the one for ``vuln_type``.

        Returns:
            The matching recommendation, or a generic one when neither the
            category nor the type is known.
        """
        remediation_map = {
            "race_condition": "Implement proper concurrency controls with optimistic/pessimistic locking",
            "business_logic_flaw": "Implement server-side validation and business rule enforcement",
            "price_manipulation": "Encrypt price data and validate all pricing server-side",
            "authorization_logic": "Implement proper role-based access control with ownership validation",
            "workflow_bypass": "Enforce state machine validation for all workflow transitions",
            "validation_logic": "Use whitelist approach with comprehensive input validation",
            "state_manipulation": "Implement server-side state management with tamper protection",
            "improper_validation": "Enforce strict server-side input validation with parameterized queries and output encoding"
        }

        if category in remediation_map:
            return remediation_map[category]
        return remediation_map.get(vuln_type, "Review and implement appropriate security controls")

    def _assess_business_impact(self, test_results: Dict) -> Dict:
        """Assess business impact of vulnerabilities.

        The four ratings are derived purely from the total number of
        findings and the number of critical findings.
        """
        severities = [vuln.get("severity") for _, vuln in self._iter_findings(test_results)]
        total_vulns = len(severities)
        critical_count = severities.count("critical")

        financial_impact = "High" if critical_count > 0 else "Medium" if total_vulns > 3 else "Low"
        operational_impact = "Severe" if critical_count > 2 else "Significant" if critical_count > 0 else "Moderate"
        compliance_risk = "High" if total_vulns > 5 else "Medium" if total_vulns > 2 else "Low"

        return {
            "financial_impact": financial_impact,
            "operational_impact": operational_impact,
            "compliance_risk": compliance_risk,
            "reputation_risk": "High" if critical_count > 0 else "Medium" if total_vulns > 3 else "Low"
        }


def main():
    """Main function to demonstrate business logic vulnerability testing.

    Runs all four test suites against a deliberately vulnerable sample
    configuration, prints a summary, writes the full report as JSON to a
    file in the current working directory and returns the report.
    """
    print("🔍 Business Logic Vulnerability Assessment")
    print("=" * 50)

    scanner = BusinessLogicVulnerabilityScanner()

    # Test configuration (vulnerable for demonstration)
    test_config = {
        "concurrency_protection": {
            "pessimistic_locking": False,
            "optimistic_locking": False,
            "transaction_isolation": "READ_COMMITTED",
            "level": "none"
        },
        "business_rules": {
            "pricing": {
                "client_side_validation": True,
                "server_side_validation": False,
                "encrypt_price_data": False
            },
            "authorization": {
                "role_hierarchy": {},
                "implicit_permissions": True,
                "ownership_validation": False
            },
            "workflows": {
                "step_validation": False,
                "state_verification": False,
                "temporal_constraints": False
            },
            "validation": {
                "whitelist_approach": False,
                "blacklist_approach": True,
                "encoding_validation": False
            }
        },
        "validation_config": {
            "sql": {
                "parameterized_queries": False,
                "input_sanitization": False
            },
            "xss": {
                "output_encoding": False,
                "input_filtering": False,
                "csp_enabled": False
            },
            "file_upload": {
                "extension_check": True,
                "mime_type_check": True,
                "content_validation": False
            },
            "api_parameters": {
                "type_validation": False,
                "range_validation": False,
                "format_validation": False
            }
        },
        "state_config": {
            "shopping_cart": {
                "server_side_validation": False,
                "price_recalculation": False,
                "tamper_protection": False
            },
            "user_profile": {
                "readonly_fields": [],
                "field_validation": False,
                "privilege_check": False
            },
            "session": {
                "server_side_storage": False,
                "session_encryption": False,
                "integrity_check": False
            },
            "orders": {
                "state_machine": False,
                "transition_validation": False,
                "audit_logging": False
            }
        }
    }

    # Run tests
    print("Running business logic vulnerability tests...")

    # Test 1: Race conditions
    race_results = scanner.test_race_conditions(test_config)
    print(f"\n🏁 Race Condition Tests: {len(race_results['vulnerabilities_found'])} vulnerabilities found")

    # Test 2: Logic flaws
    logic_results = scanner.test_logic_flaws(test_config["business_rules"])
    print(f"🧠 Logic Flaw Tests: {len(logic_results['vulnerabilities_found'])} vulnerabilities found")

    # Test 3: Improper validation
    validation_results = scanner.test_improper_validation(test_config["validation_config"])
    print(f"✅ Validation Tests: {len(validation_results['bypassed_validations'])} bypasses found")

    # Test 4: State manipulation
    state_results = scanner.test_state_manipulation(test_config["state_config"])
    print(f"🔄 State Manipulation Tests: {len(state_results['manipulation_vectors'])} vectors found")

    # Compile results
    test_results = {
        "race_conditions": race_results,
        "logic_flaws": logic_results,
        "improper_validation": validation_results,
        "state_manipulation": state_results
    }

    # Generate report
    report = scanner.generate_business_logic_report(test_results)
    summary = report["vulnerability_summary"]

    print("\n📊 BUSINESS LOGIC SECURITY REPORT")
    print("-" * 40)
    print(f"Overall Risk Level: {report['overall_risk_level']}")
    print(f"Total Vulnerabilities: {summary['total_vulnerabilities']}")
    print(f"Critical Issues: {summary['critical_vulnerabilities']}")
    print(f"High Priority Issues: {summary['high_vulnerabilities']}")

    # Show top vulnerabilities
    if report["remediation_priorities"]:
        print("\n🚨 TOP CRITICAL/HIGH VULNERABILITIES")
        print("-" * 40)
        for i, priority in enumerate(report["remediation_priorities"][:5], 1):
            print(f"{i}. [{priority['severity'].upper()}] {priority['description']}")
            print(f"   → {priority['remediation']}")

    # Save report
    report_filename = f"business_logic_vulnerability_report_{scanner.assessment_id}.json"
    with open(report_filename, "w", encoding="utf-8") as f:
        json.dump(report, f, indent=2)

    impact = report["business_impact"]
    print(f"\n📄 Detailed report saved to: {report_filename}")
    print(f"💼 Business Impact: {impact['financial_impact']} financial, {impact['operational_impact']} operational")

    return report


if __name__ == "__main__":
    vulnerability_report = main()