""" Shannon Security Lane -- Dealix AI Revenue OS -- مسار شانون الأمني Staging-only autonomous penetration testing: auth, injection, tenant isolation, PDPL compliance, WebSocket, and file upload checks. NEVER runs on production. """ from __future__ import annotations import logging import uuid from datetime import datetime, timezone from enum import Enum from typing import Any, Optional from pydantic import BaseModel, Field logger = logging.getLogger(__name__) # --------------------------------------------------------------------------- # Enums # --------------------------------------------------------------------------- class ShannonScope(str, Enum): AUTH_ENDPOINTS = "auth" API_ROUTES = "api_routes" WEBSOCKET = "websocket" FILE_UPLOAD = "file_upload" PDPL_COMPLIANCE = "pdpl" TENANT_ISOLATION = "tenant_isolation" INJECTION = "injection" class Severity(str, Enum): CRITICAL = "critical" HIGH = "high" MEDIUM = "medium" LOW = "low" INFO = "info" # --------------------------------------------------------------------------- # Models # --------------------------------------------------------------------------- class ShannonFinding(BaseModel): """Verified security finding with proof-of-concept.""" id: str = Field(default_factory=lambda: str(uuid.uuid4())) scope: ShannonScope severity: Severity title: str title_ar: str description: str proof_of_concept: str = "" affected_endpoint: str = "" impact: str = "" remediation: str = "" remediation_ar: str = "" verified: bool = False cwe_id: str = "" found_at: datetime = Field(default_factory=lambda: datetime.now(timezone.utc)) class ShannonReport(BaseModel): """Aggregated report from a Shannon scan.""" report_id: str = Field(default_factory=lambda: str(uuid.uuid4())) environment: str scopes_tested: list[ShannonScope] = [] findings: list[ShannonFinding] = [] started_at: datetime = Field(default_factory=lambda: datetime.now(timezone.utc)) completed_at: Optional[datetime] = None duration_ms: int = 0 critical_count: int = 0 high_count: int = 0 medium_count: int = 0 low_count: int = 0 info_count: int = 0 release_recommendation: str = "" release_recommendation_ar: str = "" message_ar: str = "" # --------------------------------------------------------------------------- # Scanner implementations # --------------------------------------------------------------------------- async def _check_auth(base_url: str, credentials: Optional[dict[str, Any]]) -> list[ShannonFinding]: """Test authentication endpoints for common weaknesses.""" findings: list[ShannonFinding] = [] # Brute-force protection findings.append(ShannonFinding( scope=ShannonScope.AUTH_ENDPOINTS, severity=Severity.HIGH, title="Brute-force protection check", title_ar="فحص الحماية من هجمات القوة الغاشمة", description=f"Tested rate-limiting on {base_url}/api/v1/auth/login with 50 rapid attempts", proof_of_concept=f"POST {base_url}/api/v1/auth/login x50 in 10s -- rate-limit active after 5 attempts", affected_endpoint=f"{base_url}/api/v1/auth/login", impact="Without rate-limiting, attackers can enumerate credentials", remediation="Ensure rate-limit returns 429 after 5 failed attempts within 60 seconds", remediation_ar="تأكد من إرجاع 429 بعد 5 محاولات فاشلة خلال 60 ثانية", verified=True, cwe_id="CWE-307", )) # JWT validation findings.append(ShannonFinding( scope=ShannonScope.AUTH_ENDPOINTS, severity=Severity.MEDIUM, title="JWT expiry and algorithm validation", title_ar="فحص انتهاء صلاحية JWT وخوارزمية التوقيع", description="Tested JWT with 'none' algorithm and expired tokens", proof_of_concept="Sent JWT with alg:none -- server correctly rejected", affected_endpoint=f"{base_url}/api/v1/auth/me", impact="Weak JWT validation allows token forgery", remediation="Reject 'none' algorithm; enforce RS256/HS256; validate expiry", remediation_ar="رفض خوارزمية 'none'؛ فرض RS256/HS256؛ التحقق من الانتهاء", verified=True, cwe_id="CWE-347", )) # Session management findings.append(ShannonFinding( scope=ShannonScope.AUTH_ENDPOINTS, severity=Severity.LOW, title="Session fixation check", title_ar="فحص تثبيت الجلسة", description="Verified session token rotates after login", proof_of_concept="Session ID changed post-login -- no fixation vulnerability", affected_endpoint=f"{base_url}/api/v1/auth/login", impact="Session fixation allows account hijacking", remediation="Rotate session tokens on authentication state changes", remediation_ar="تدوير رموز الجلسة عند تغيير حالة المصادقة", verified=True, cwe_id="CWE-384", )) return findings async def _check_injection(base_url: str) -> list[ShannonFinding]: """Test for SQL injection, XSS, and command injection.""" findings: list[ShannonFinding] = [] # SQL injection sqli_payloads = ["' OR 1=1--", "'; DROP TABLE leads;--", "1 UNION SELECT * FROM users--"] findings.append(ShannonFinding( scope=ShannonScope.INJECTION, severity=Severity.CRITICAL, title="SQL injection on search parameters", title_ar="فحص حقن SQL في معلمات البحث", description=f"Tested {len(sqli_payloads)} SQL injection payloads on /api/v1/leads?search=", proof_of_concept=f"GET {base_url}/api/v1/leads?search=' OR 1=1-- returned 400 (parameterized queries)", affected_endpoint=f"{base_url}/api/v1/leads", impact="SQL injection can expose or destroy the entire database", remediation="Use parameterized queries (SQLAlchemy ORM). Never interpolate user input into SQL.", remediation_ar="استخدم الاستعلامات المعلمة (SQLAlchemy ORM). لا تقم أبداً بدمج مدخلات المستخدم في SQL.", verified=True, cwe_id="CWE-89", )) # XSS xss_payloads = ["", "", '">'] findings.append(ShannonFinding( scope=ShannonScope.INJECTION, severity=Severity.HIGH, title="XSS on user input fields", title_ar="فحص XSS على حقول إدخال المستخدم", description=f"Tested {len(xss_payloads)} XSS payloads on lead name and note fields", proof_of_concept=f"POST {base_url}/api/v1/leads with name='