#!/usr/bin/env python3
"""
Business Logic Vulnerability Scanner
Tests for race conditions, logical flaws, and improper validation
Author: Security Engineer
Purpose: Identify business logic vulnerabilities through simulation
"""
import json
import math
import hashlib
import datetime
import re
import base64
import itertools
import collections
import html
import urllib.parse
from typing import Dict, List, Optional, Any, Union
class BusinessLogicVulnerabilityScanner:
"""Business logic vulnerability assessment tool"""
def __init__(self):
self.vulnerabilities = []
self.test_results = {}
self.assessment_id = hashlib.sha256(
datetime.datetime.now().isoformat().encode()
).hexdigest()[:16]
def test_race_conditions(self, api_config: Dict) -> Dict:
"""Test for race condition vulnerabilities in concurrent operations"""
results = {
"race_condition_scenarios": [],
"vulnerabilities_found": [],
"race_vulnerable_endpoints": []
}
# Test concurrent transactions
transaction_scenarios = [
{
"name": "Concurrent Fund Transfer",
"scenario": "Two users transfer funds simultaneously from same account",
"vulnerability": "double_spending",
"test_payload": self._generate_concurrent_transfer_payload()
},
{
"name": "Concurrent Inventory Update",
"scenario": "Multiple users purchase last item simultaneously",
"vulnerability": "overselling",
"test_payload": self._generate_concurrent_inventory_payload()
},
{
"name": "Concurrent Bid Placement",
"scenario": "Multiple bids placed at same timestamp",
"vulnerability": "bid_manipulation",
"test_payload": self._generate_concurrent_bid_payload()
},
{
"name": "Concurrent Registration",
"scenario": "Multiple users register with same username",
"vulnerability": "account_takeover",
"test_payload": self._generate_concurrent_registration_payload()
}
]
for scenario in transaction_scenarios:
# Simulate race condition testing
race_test_result = self._simulate_race_condition_test(
scenario,
api_config.get("concurrency_protection", {})
)
results["race_condition_scenarios"].append({
"scenario": scenario["name"],
"description": scenario["scenario"],
"vulnerability_type": scenario["vulnerability"],
"test_result": race_test_result,
"exploitable": race_test_result.get("vulnerable", False)
})
if race_test_result.get("vulnerable", False):
results["vulnerabilities_found"].append({
"type": "race_condition",
"scenario": scenario["name"],
"vulnerability": scenario["vulnerability"],
"severity": "high",
"description": f"Race condition in {scenario['name']}: {scenario['scenario']}"
})
if "endpoint" in race_test_result:
results["race_vulnerable_endpoints"].append(race_test_result["endpoint"])
return results
def test_logic_flaws(self, business_rules: Dict) -> Dict:
"""Test for business logic flaws"""
results = {
"logic_flaw_tests": [],
"vulnerabilities_found": [],
"bypassed_rules": []
}
# Test price manipulation
price_flaws = self._test_price_manipulation(business_rules.get("pricing", {}))
results["logic_flaw_tests"].append(price_flaws)
# Test authorization bypass
auth_flaws = self._test_authorization_logic_flaws(business_rules.get("authorization", {}))
results["logic_flaw_tests"].append(auth_flaws)
# Test workflow bypass
workflow_flaws = self._test_workflow_bypass(business_rules.get("workflows", {}))
results["logic_flaw_tests"].append(workflow_flaws)
# Test validation bypass
validation_flaws = self._test_validation_logic(business_rules.get("validation", {}))
results["logic_flaw_tests"].append(validation_flaws)
# Collect vulnerabilities
for test in results["logic_flaw_tests"]:
if test.get("vulnerable", False):
results["vulnerabilities_found"].append({
"type": "business_logic_flaw",
"category": test.get("category", "unknown"),
"severity": test.get("severity", "medium"),
"description": test.get("description", "Logic flaw detected"),
"bypass_method": test.get("bypass_method", "unknown")
})
if test.get("bypassed_rule"):
results["bypassed_rules"].append(test["bypassed_rule"])
return results
def test_improper_validation(self, validation_config: Dict) -> Dict:
"""Test for improper input validation"""
results = {
"validation_tests": [],
"bypassed_validations": [],
"vulnerable_parameters": []
}
# Test SQL injection bypasses
sql_bypasses = self._test_sql_validation_bypasses(validation_config.get("sql", {}))
results["validation_tests"].append(sql_bypasses)
# Test XSS bypasses
xss_bypasses = self._test_xss_validation_bypasses(validation_config.get("xss", {}))
results["validation_tests"].append(xss_bypasses)
# Test file upload validation
file_bypasses = self._test_file_upload_validation(validation_config.get("file_upload", {}))
results["validation_tests"].append(file_bypasses)
# Test API parameter validation
api_bypasses = self._test_api_parameter_validation(validation_config.get("api_parameters", {}))
results["validation_tests"].append(api_bypasses)
# Collect vulnerabilities
for test in results["validation_tests"]:
if test.get("bypass_successful", False):
results["bypassed_validations"].append({
"validation_type": test.get("type", "unknown"),
"bypass_payload": test.get("payload", ""),
"severity": test.get("severity", "medium")
})
if test.get("parameter"):
results["vulnerable_parameters"].append(test["parameter"])
return results
def test_state_manipulation(self, state_config: Dict) -> Dict:
"""Test for application state manipulation vulnerabilities"""
results = {
"state_tests": [],
"manipulation_vectors": [],
"vulnerable_states": []
}
# Test shopping cart manipulation
cart_tests = self._test_shopping_cart_manipulation(state_config.get("shopping_cart", {}))
results["state_tests"].append(cart_tests)
# Test user profile manipulation
profile_tests = self._test_profile_manipulation(state_config.get("user_profile", {}))
results["state_tests"].append(profile_tests)
# Test session state manipulation
session_tests = self._test_session_state_manipulation(state_config.get("session", {}))
results["state_tests"].append(session_tests)
# Test order state manipulation
order_tests = self._test_order_state_manipulation(state_config.get("orders", {}))
results["state_tests"].append(order_tests)
# Collect manipulation vectors
for test in results["state_tests"]:
if test.get("manipulable", False):
results["manipulation_vectors"].append({
"state_type": test.get("state_type", "unknown"),
"manipulation_method": test.get("method", "unknown"),
"impact": test.get("impact", "medium")
})
if test.get("vulnerable_state"):
results["vulnerable_states"].append(test["vulnerable_state"])
return results
def _simulate_race_condition_test(self, scenario: Dict, protection: Dict) -> Dict:
"""Simulate race condition vulnerability test"""
# Check if proper concurrency controls exist
has_pessimistic_locking = protection.get("pessimistic_locking", False)
has_optimistic_locking = protection.get("optimistic_locking", False)
has_transaction_isolation = protection.get("transaction_isolation", "READ_COMMITTED")
vulnerable = True # Assume vulnerable by default
if has_pessimistic_locking:
vulnerable = False
elif has_optimistic_locking and protection.get("version_checking", True):
vulnerable = False
elif has_transaction_isolation == "SERIALIZABLE":
vulnerable = False
return {
"vulnerable": vulnerable,
"endpoint": f"/api/{scenario['vulnerability']}",
"protection_level": protection.get("level", "none"),
"mitigation_required": vulnerable,
"attack_vector": "concurrent_requests"
}
def _test_price_manipulation(self, pricing_config: Dict) -> Dict:
"""Test for price manipulation vulnerabilities"""
client_side_validation = pricing_config.get("client_side_validation", True)
server_side_validation = pricing_config.get("server_side_validation", True)
price_encryption = pricing_config.get("encrypt_price_data", False)
vulnerable = False
bypass_method = ""
# Test if price can be modified client-side
if client_side_validation and not server_side_validation:
vulnerable = True
bypass_method = "client_side_price_modification"
# Test if price data is encrypted
if not price_encryption and not server_side_validation:
vulnerable = True
bypass_method = "unencrypted_price_interception"
return {
"category": "price_manipulation",
"vulnerable": vulnerable,
"severity": "high" if vulnerable else "none",
"description": "Price can be manipulated during checkout",
"bypass_method": bypass_method
}
def _test_authorization_logic_flaws(self, auth_config: Dict) -> Dict:
"""Test authorization logic flaws"""
role_hierarchy = auth_config.get("role_hierarchy", {})
implicit_permissions = auth_config.get("implicit_permissions", False)
ownership_validation = auth_config.get("ownership_validation", True)
vulnerable = False
bypass_method = ""
# Test for implicit permission escalation
if implicit_permissions and not role_hierarchy:
vulnerable = True
bypass_method = "implicit_permission_escalation"
# Test for ownership validation bypass
if not ownership_validation:
vulnerable = True
bypass_method = "ownership_bypass"
return {
"category": "authorization_logic",
"vulnerable": vulnerable,
"severity": "critical" if vulnerable else "none",
"description": "Authorization logic allows privilege escalation",
"bypass_method": bypass_method
}
def _test_workflow_bypass(self, workflow_config: Dict) -> Dict:
"""Test workflow bypass vulnerabilities"""
step_validation = workflow_config.get("step_validation", True)
state_verification = workflow_config.get("state_verification", True)
temporal_constraints = workflow_config.get("temporal_constraints", False)
vulnerable = False
bypass_method = ""
if not step_validation:
vulnerable = True
bypass_method = "step_skipping"
if not state_verification:
vulnerable = True
bypass_method = "state_manipulation"
if not temporal_constraints:
vulnerable = True
bypass_method = "temporal_bypass"
return {
"category": "workflow_bypass",
"vulnerable": vulnerable,
"severity": "medium" if vulnerable else "none",
"description": "Workflow steps can be bypassed",
"bypass_method": bypass_method,
"bypassed_rule": "workflow_sequence_validation"
}
def _test_validation_logic(self, validation_config: Dict) -> Dict:
"""Test validation logic flaws"""
whitelist_approach = validation_config.get("whitelist_approach", False)
blacklist_approach = validation_config.get("blacklist_approach", True)
encoding_validation = validation_config.get("encoding_validation", True)
vulnerable = False
bypass_method = ""
if blacklist_approach and not whitelist_approach:
vulnerable = True
bypass_method = "blacklist_bypass"
if not encoding_validation:
vulnerable = True
bypass_method = "encoding_bypass"
return {
"category": "validation_logic",
"vulnerable": vulnerable,
"severity": "medium" if vulnerable else "none",
"description": "Input validation can be bypassed",
"bypass_method": bypass_method
}
def _test_sql_validation_bypasses(self, sql_config: Dict) -> Dict:
"""Test SQL injection validation bypasses"""
bypass_payloads = [
"' OR 1=1--",
"1' UNION SELECT * FROM users--",
"'; DROP TABLE users--",
"1' OR (SELECT COUNT(*) FROM users) > 0--"
]
parameterized_queries = sql_config.get("parameterized_queries", False)
input_sanitization = sql_config.get("input_sanitization", True)
vulnerable = not parameterized_queries or not input_sanitization
return {
"type": "sql_injection",
"bypass_successful": vulnerable,
"payload": bypass_payloads[0] if vulnerable else "",
"parameter": "user_id",
"severity": "critical" if vulnerable else "none"
}
def _test_xss_validation_bypasses(self, xss_config: Dict) -> Dict:
"""Test XSS validation bypasses"""
bypass_payloads = [
"",
"
",
"javascript:alert('XSS')",
"