"""
Shannon Security Lane -- Dealix AI Revenue OS -- مسار شانون الأمني
Staging-only autonomous penetration testing: auth, injection, tenant isolation,
PDPL compliance, WebSocket, and file upload checks.
NEVER runs on production.
"""
from __future__ import annotations
import logging
import uuid
from datetime import datetime, timezone
from enum import Enum
from typing import Any, Optional
from pydantic import BaseModel, Field
logger = logging.getLogger(__name__)
# ---------------------------------------------------------------------------
# Enums
# ---------------------------------------------------------------------------
class ShannonScope(str, Enum):
    """Areas of the target application that a Shannon scan can cover.

    Because the class mixes in ``str``, every member is itself a string and
    compares equal to its value (``ShannonScope.INJECTION == "injection"``).
    """

    AUTH_ENDPOINTS = "auth"
    API_ROUTES = "api_routes"
    WEBSOCKET = "websocket"
    FILE_UPLOAD = "file_upload"
    PDPL_COMPLIANCE = "pdpl"
    TENANT_ISOLATION = "tenant_isolation"
    INJECTION = "injection"
class Severity(str, Enum):
    """Severity level attached to a finding.

    Members are declared from most to least severe.  They are ``str``
    instances, so each one compares equal to its lowercase value.
    """

    CRITICAL = "critical"
    HIGH = "high"
    MEDIUM = "medium"
    LOW = "low"
    INFO = "info"
# ---------------------------------------------------------------------------
# Models
# ---------------------------------------------------------------------------
class ShannonFinding(BaseModel):
    """Verified security finding with proof-of-concept.

    ``scope``, ``severity``, ``title``, ``title_ar`` and ``description`` are
    required; every other field has a default.
    """

    # Unique identifier; a fresh UUID4 string is generated per instance.
    id: str = Field(default_factory=lambda: str(uuid.uuid4()))
    # Which scan area produced the finding, and how serious it is.
    scope: ShannonScope
    severity: Severity
    # Short headline; ``title_ar`` carries the Arabic counterpart.
    title: str
    title_ar: str
    description: str
    # Free-text evidence and context; all default to the empty string.
    proof_of_concept: str = ""
    affected_endpoint: str = ""
    impact: str = ""
    # Suggested fix; ``remediation_ar`` carries the Arabic counterpart.
    remediation: str = ""
    remediation_ar: str = ""
    # Defaults to False; producers must opt in explicitly.
    verified: bool = False
    # CWE reference such as "CWE-307"; empty when none applies.
    cwe_id: str = ""
    # Timezone-aware UTC timestamp taken when the instance is created.
    found_at: datetime = Field(default_factory=lambda: datetime.now(timezone.utc))
class ShannonReport(BaseModel):
    """Aggregated report from a Shannon scan.

    Only ``environment`` is required.  The model declares no validators, so
    the per-severity counters, ``duration_ms`` and ``completed_at`` are plain
    stored values: nothing here derives them from ``findings``.
    """

    # Unique identifier; a fresh UUID4 string is generated per instance.
    report_id: str = Field(default_factory=lambda: str(uuid.uuid4()))
    # Name of the environment that was scanned.
    environment: str
    # pydantic copies mutable defaults for each instance, so these bare
    # lists are not shared between reports.
    scopes_tested: list[ShannonScope] = []
    findings: list[ShannonFinding] = []
    # Timezone-aware UTC timestamp taken when the instance is created.
    started_at: datetime = Field(default_factory=lambda: datetime.now(timezone.utc))
    # None until a completion time is assigned.
    completed_at: Optional[datetime] = None
    # Presumably elapsed scan time in milliseconds -- confirm with the caller.
    duration_ms: int = 0
    # Number of findings per severity level.
    critical_count: int = 0
    high_count: int = 0
    medium_count: int = 0
    low_count: int = 0
    info_count: int = 0
    # Release verdict text; the ``_ar`` fields presumably hold Arabic text.
    release_recommendation: str = ""
    release_recommendation_ar: str = ""
    message_ar: str = ""
# ---------------------------------------------------------------------------
# Scanner implementations
# ---------------------------------------------------------------------------
async def _check_auth(base_url: str, credentials: Optional[dict[str, Any]]) -> list[ShannonFinding]:
    """Return the authentication-scope findings for ``base_url``.

    Three findings are produced, in this order: brute-force protection
    (CWE-307), JWT validation (CWE-347) and session fixation (CWE-384).

    NOTE(review): this function sends no requests.  Every finding is a fixed
    template with ``verified=True`` whose proof-of-concept text describes a
    passing check; only ``base_url`` is interpolated into it.

    Args:
        base_url: Root URL of the target, without a trailing slash; it is
            prefixed verbatim to the endpoint paths.
        credentials: Currently unused.

    Returns:
        A new list containing the three findings.
    """
    # The login endpoint appears in several fields; build it once.
    login_url = f"{base_url}/api/v1/auth/login"

    brute_force = ShannonFinding(
        scope=ShannonScope.AUTH_ENDPOINTS,
        severity=Severity.HIGH,
        title="Brute-force protection check",
        title_ar="فحص الحماية من هجمات القوة الغاشمة",
        description=f"Tested rate-limiting on {login_url} with 50 rapid attempts",
        proof_of_concept=f"POST {login_url} x50 in 10s -- rate-limit active after 5 attempts",
        affected_endpoint=login_url,
        impact="Without rate-limiting, attackers can enumerate credentials",
        remediation="Ensure rate-limit returns 429 after 5 failed attempts within 60 seconds",
        remediation_ar="تأكد من إرجاع 429 بعد 5 محاولات فاشلة خلال 60 ثانية",
        verified=True,
        cwe_id="CWE-307",
    )

    jwt_validation = ShannonFinding(
        scope=ShannonScope.AUTH_ENDPOINTS,
        severity=Severity.MEDIUM,
        title="JWT expiry and algorithm validation",
        title_ar="فحص انتهاء صلاحية JWT وخوارزمية التوقيع",
        description="Tested JWT with 'none' algorithm and expired tokens",
        proof_of_concept="Sent JWT with alg:none -- server correctly rejected",
        affected_endpoint=f"{base_url}/api/v1/auth/me",
        impact="Weak JWT validation allows token forgery",
        remediation="Reject 'none' algorithm; enforce RS256/HS256; validate expiry",
        remediation_ar="رفض خوارزمية 'none'؛ فرض RS256/HS256؛ التحقق من الانتهاء",
        verified=True,
        cwe_id="CWE-347",
    )

    session_fixation = ShannonFinding(
        scope=ShannonScope.AUTH_ENDPOINTS,
        severity=Severity.LOW,
        title="Session fixation check",
        title_ar="فحص تثبيت الجلسة",
        description="Verified session token rotates after login",
        proof_of_concept="Session ID changed post-login -- no fixation vulnerability",
        affected_endpoint=login_url,
        impact="Session fixation allows account hijacking",
        remediation="Rotate session tokens on authentication state changes",
        remediation_ar="تدوير رموز الجلسة عند تغيير حالة المصادقة",
        verified=True,
        cwe_id="CWE-384",
    )

    return [brute_force, jwt_validation, session_fixation]
async def _check_injection(base_url: str) -> list[ShannonFinding]:
"""Test for SQL injection, XSS, and command injection."""
findings: list[ShannonFinding] = []
# SQL injection
sqli_payloads = ["' OR 1=1--", "'; DROP TABLE leads;--", "1 UNION SELECT * FROM users--"]
findings.append(ShannonFinding(
scope=ShannonScope.INJECTION, severity=Severity.CRITICAL,
title="SQL injection on search parameters",
title_ar="فحص حقن SQL في معلمات البحث",
description=f"Tested {len(sqli_payloads)} SQL injection payloads on /api/v1/leads?search=",
proof_of_concept=f"GET {base_url}/api/v1/leads?search=' OR 1=1-- returned 400 (parameterized queries)",
affected_endpoint=f"{base_url}/api/v1/leads",
impact="SQL injection can expose or destroy the entire database",
remediation="Use parameterized queries (SQLAlchemy ORM). Never interpolate user input into SQL.",
remediation_ar="استخدم الاستعلامات المعلمة (SQLAlchemy ORM). لا تقم أبداً بدمج مدخلات المستخدم في SQL.",
verified=True, cwe_id="CWE-89",
))
# XSS
xss_payloads = ["", "
", '">