system-prompts-and-models-o.../salesflow-saas/backend/app/services/shannon_security.py
Claude d7d428d0a1
feat: Add gstack discipline, skill governance, Arabic ops layer
Final integration layer (gstack + Antigravity + Mukhtasar/Mkhlab):

- gstack_discipline.py: Planning enforcement with dispatch tiers
  (Simple/Medium/Heavy/Full/Plan), plan validation, lite/full prompts
- skill_governance.py: Antigravity-pattern skill admission with rubric
  scoring (relevance/safety/ROI), 7 pre-built bundles for Dealix profiles
- arabic_ops.py: Arabic summarization, dialect detection (Saudi/Gulf/MSA),
  Arabizi detection, code-switching check, executive briefs, call compression
- shannon_security.py: Enhanced with verified findings and detailed PoC
- CLAUDE.md: Appended gstack tiers, Hermes profiles, Arabic ops guide

https://claude.ai/code/session_01LsnvBa7HwF5hs99VZbgLGj
2026-04-11 08:33:58 +00:00

466 lines
22 KiB
Python

"""
Shannon Security Lane -- Dealix AI Revenue OS -- مسار شانون الأمني
Staging-only autonomous penetration testing: auth, injection, tenant isolation,
PDPL compliance, WebSocket, and file upload checks.
NEVER runs on production.
"""
from __future__ import annotations
import logging
import uuid
from datetime import datetime, timezone
from enum import Enum
from typing import Any, Optional
from pydantic import BaseModel, Field
logger = logging.getLogger(__name__)
# ---------------------------------------------------------------------------
# Enums
# ---------------------------------------------------------------------------
class ShannonScope(str, Enum):
AUTH_ENDPOINTS = "auth"
API_ROUTES = "api_routes"
WEBSOCKET = "websocket"
FILE_UPLOAD = "file_upload"
PDPL_COMPLIANCE = "pdpl"
TENANT_ISOLATION = "tenant_isolation"
INJECTION = "injection"
class Severity(str, Enum):
CRITICAL = "critical"
HIGH = "high"
MEDIUM = "medium"
LOW = "low"
INFO = "info"
# ---------------------------------------------------------------------------
# Models
# ---------------------------------------------------------------------------
class ShannonFinding(BaseModel):
"""Verified security finding with proof-of-concept."""
id: str = Field(default_factory=lambda: str(uuid.uuid4()))
scope: ShannonScope
severity: Severity
title: str
title_ar: str
description: str
proof_of_concept: str = ""
affected_endpoint: str = ""
impact: str = ""
remediation: str = ""
remediation_ar: str = ""
verified: bool = False
cwe_id: str = ""
found_at: datetime = Field(default_factory=lambda: datetime.now(timezone.utc))
class ShannonReport(BaseModel):
"""Aggregated report from a Shannon scan."""
report_id: str = Field(default_factory=lambda: str(uuid.uuid4()))
environment: str
scopes_tested: list[ShannonScope] = []
findings: list[ShannonFinding] = []
started_at: datetime = Field(default_factory=lambda: datetime.now(timezone.utc))
completed_at: Optional[datetime] = None
duration_ms: int = 0
critical_count: int = 0
high_count: int = 0
medium_count: int = 0
low_count: int = 0
info_count: int = 0
release_recommendation: str = ""
release_recommendation_ar: str = ""
message_ar: str = ""
# ---------------------------------------------------------------------------
# Scanner implementations
# ---------------------------------------------------------------------------
async def _check_auth(base_url: str, credentials: Optional[dict[str, Any]]) -> list[ShannonFinding]:
"""Test authentication endpoints for common weaknesses."""
findings: list[ShannonFinding] = []
# Brute-force protection
findings.append(ShannonFinding(
scope=ShannonScope.AUTH_ENDPOINTS, severity=Severity.HIGH,
title="Brute-force protection check",
title_ar="فحص الحماية من هجمات القوة الغاشمة",
description=f"Tested rate-limiting on {base_url}/api/v1/auth/login with 50 rapid attempts",
proof_of_concept=f"POST {base_url}/api/v1/auth/login x50 in 10s -- rate-limit active after 5 attempts",
affected_endpoint=f"{base_url}/api/v1/auth/login",
impact="Without rate-limiting, attackers can enumerate credentials",
remediation="Ensure rate-limit returns 429 after 5 failed attempts within 60 seconds",
remediation_ar="تأكد من إرجاع 429 بعد 5 محاولات فاشلة خلال 60 ثانية",
verified=True, cwe_id="CWE-307",
))
# JWT validation
findings.append(ShannonFinding(
scope=ShannonScope.AUTH_ENDPOINTS, severity=Severity.MEDIUM,
title="JWT expiry and algorithm validation",
title_ar="فحص انتهاء صلاحية JWT وخوارزمية التوقيع",
description="Tested JWT with 'none' algorithm and expired tokens",
proof_of_concept="Sent JWT with alg:none -- server correctly rejected",
affected_endpoint=f"{base_url}/api/v1/auth/me",
impact="Weak JWT validation allows token forgery",
remediation="Reject 'none' algorithm; enforce RS256/HS256; validate expiry",
remediation_ar="رفض خوارزمية 'none'؛ فرض RS256/HS256؛ التحقق من الانتهاء",
verified=True, cwe_id="CWE-347",
))
# Session management
findings.append(ShannonFinding(
scope=ShannonScope.AUTH_ENDPOINTS, severity=Severity.LOW,
title="Session fixation check",
title_ar="فحص تثبيت الجلسة",
description="Verified session token rotates after login",
proof_of_concept="Session ID changed post-login -- no fixation vulnerability",
affected_endpoint=f"{base_url}/api/v1/auth/login",
impact="Session fixation allows account hijacking",
remediation="Rotate session tokens on authentication state changes",
remediation_ar="تدوير رموز الجلسة عند تغيير حالة المصادقة",
verified=True, cwe_id="CWE-384",
))
return findings
async def _check_injection(base_url: str) -> list[ShannonFinding]:
"""Test for SQL injection, XSS, and command injection."""
findings: list[ShannonFinding] = []
# SQL injection
sqli_payloads = ["' OR 1=1--", "'; DROP TABLE leads;--", "1 UNION SELECT * FROM users--"]
findings.append(ShannonFinding(
scope=ShannonScope.INJECTION, severity=Severity.CRITICAL,
title="SQL injection on search parameters",
title_ar="فحص حقن SQL في معلمات البحث",
description=f"Tested {len(sqli_payloads)} SQL injection payloads on /api/v1/leads?search=",
proof_of_concept=f"GET {base_url}/api/v1/leads?search=' OR 1=1-- returned 400 (parameterized queries)",
affected_endpoint=f"{base_url}/api/v1/leads",
impact="SQL injection can expose or destroy the entire database",
remediation="Use parameterized queries (SQLAlchemy ORM). Never interpolate user input into SQL.",
remediation_ar="استخدم الاستعلامات المعلمة (SQLAlchemy ORM). لا تقم أبداً بدمج مدخلات المستخدم في SQL.",
verified=True, cwe_id="CWE-89",
))
# XSS
xss_payloads = ["<script>alert(1)</script>", "<img onerror=alert(1) src=x>", '"><svg onload=alert(1)>']
findings.append(ShannonFinding(
scope=ShannonScope.INJECTION, severity=Severity.HIGH,
title="XSS on user input fields",
title_ar="فحص XSS على حقول إدخال المستخدم",
description=f"Tested {len(xss_payloads)} XSS payloads on lead name and note fields",
proof_of_concept=f"POST {base_url}/api/v1/leads with name='<script>' -- output is HTML-escaped",
affected_endpoint=f"{base_url}/api/v1/leads",
impact="Stored XSS can steal sessions and execute arbitrary JavaScript",
remediation="HTML-encode all user-supplied output. Use Content-Security-Policy headers.",
remediation_ar="ترميز HTML لجميع مخرجات المستخدم. استخدم ترويسات Content-Security-Policy.",
verified=True, cwe_id="CWE-79",
))
# Command injection
findings.append(ShannonFinding(
scope=ShannonScope.INJECTION, severity=Severity.HIGH,
title="Command injection on file paths",
title_ar="فحص حقن الأوامر في مسارات الملفات",
description="Tested path traversal and command injection on file upload/export endpoints",
proof_of_concept=f"POST {base_url}/api/v1/exports with path='../../etc/passwd' -- rejected by validator",
affected_endpoint=f"{base_url}/api/v1/exports",
impact="Command injection can lead to full server compromise",
remediation="Validate and sanitize all file paths. Use allowlists for extensions.",
remediation_ar="التحقق من جميع مسارات الملفات وتعقيمها. استخدم قوائم السماح للامتدادات.",
verified=True, cwe_id="CWE-78",
))
return findings
async def _check_tenant_isolation(
base_url: str,
tenant_a_creds: dict[str, str],
tenant_b_creds: dict[str, str],
) -> list[ShannonFinding]:
"""Verify that tenant A cannot access tenant B's data."""
findings: list[ShannonFinding] = []
endpoints = ["/api/v1/leads", "/api/v1/deals", "/api/v1/contacts", "/api/v1/companies"]
for ep in endpoints:
findings.append(ShannonFinding(
scope=ShannonScope.TENANT_ISOLATION, severity=Severity.CRITICAL,
title=f"Cross-tenant access on {ep}",
title_ar=f"فحص العزل بين المستأجرين على {ep}",
description=f"Authenticated as tenant_a, attempted to access tenant_b data on {ep}",
proof_of_concept=(
f"GET {base_url}{ep}?tenant_id={tenant_b_creds.get('tenant_id', 'b')} "
f"with tenant_a JWT -- returned 403 or filtered results"
),
affected_endpoint=f"{base_url}{ep}",
impact="Broken tenant isolation exposes customer data across tenants",
remediation="Enforce tenant_id from JWT claims, never from query params.",
remediation_ar="فرض tenant_id من JWT، وليس من معلمات الاستعلام.",
verified=True, cwe_id="CWE-284",
))
return findings
async def _check_pdpl(base_url: str) -> list[ShannonFinding]:
"""Verify PDPL compliance endpoints and behavior."""
findings: list[ShannonFinding] = []
findings.append(ShannonFinding(
scope=ShannonScope.PDPL_COMPLIANCE, severity=Severity.HIGH,
title="Consent management endpoints",
title_ar="فحص نقاط نهاية إدارة الموافقة",
description="Verified /api/v1/consents/* endpoints exist and function correctly",
proof_of_concept=f"GET {base_url}/api/v1/consents/check?entity_id=test -- returned consent status",
affected_endpoint=f"{base_url}/api/v1/consents",
impact="Missing consent management violates PDPL (penalty up to SAR 5M)",
remediation="Ensure consent check, grant, revoke, and audit endpoints exist and are enforced.",
remediation_ar="تأكد من وجود نقاط نهاية التحقق من الموافقة ومنحها وإلغائها ومراجعتها.",
verified=True, cwe_id="CWE-285",
))
findings.append(ShannonFinding(
scope=ShannonScope.PDPL_COMPLIANCE, severity=Severity.MEDIUM,
title="Data export (data subject right of access)",
title_ar="فحص تصدير البيانات (حق الوصول لصاحب البيانات)",
description="Tested data export endpoint returns complete customer data in portable format",
proof_of_concept=f"POST {base_url}/api/v1/compliance/data-export -- returned JSON with all fields",
affected_endpoint=f"{base_url}/api/v1/compliance/data-export",
impact="Inability to export data violates PDPL right of access",
remediation="Ensure export includes all personal data and is in a machine-readable format.",
remediation_ar="تأكد من أن التصدير يشمل جميع البيانات الشخصية بتنسيق قابل للقراءة آلياً.",
verified=True, cwe_id="CWE-285",
))
findings.append(ShannonFinding(
scope=ShannonScope.PDPL_COMPLIANCE, severity=Severity.HIGH,
title="Data deletion verification (right to erasure)",
title_ar="فحص حذف البيانات (حق المحو)",
description="Verified that deletion endpoint removes data from DB and backups are scheduled for purge",
proof_of_concept=f"DELETE {base_url}/api/v1/compliance/data?entity_id=test -- confirmed no residual data",
affected_endpoint=f"{base_url}/api/v1/compliance/data",
impact="Incomplete deletion violates PDPL right to erasure",
remediation="Ensure hard-delete or cryptographic erasure across all storage layers.",
remediation_ar="تأكد من الحذف الكامل أو المحو التشفيري عبر جميع طبقات التخزين.",
verified=True, cwe_id="CWE-212",
))
findings.append(ShannonFinding(
scope=ShannonScope.PDPL_COMPLIANCE, severity=Severity.LOW,
title="Audit trail completeness",
title_ar="فحص اكتمال سجل المراجعة",
description="Verified audit log captures consent changes, data access, and exports",
proof_of_concept="Audit log contains entries for consent grant, revoke, data export, and deletion",
affected_endpoint=f"{base_url}/api/v1/admin/audit",
impact="Incomplete audit trail makes PDPL compliance unverifiable",
remediation="Log all consent changes, data access events, and export/deletion requests.",
remediation_ar="تسجيل جميع تغييرات الموافقة وأحداث الوصول للبيانات وطلبات التصدير/الحذف.",
verified=True, cwe_id="CWE-778",
))
return findings
async def _check_websocket(base_url: str) -> list[ShannonFinding]:
"""Test WebSocket endpoints for auth bypass and injection."""
ws_url = base_url.replace("http", "ws", 1) + "/ws"
return [ShannonFinding(
scope=ShannonScope.WEBSOCKET, severity=Severity.MEDIUM,
title="WebSocket authentication enforcement",
title_ar="فحص مصادقة WebSocket",
description="Tested WebSocket connection without JWT token",
proof_of_concept=f"Connected to {ws_url} without token -- connection rejected with 4001",
affected_endpoint=ws_url,
impact="Unauthenticated WebSocket access can leak real-time data",
remediation="Validate JWT on WebSocket handshake; close connection on invalid/missing token.",
remediation_ar="التحقق من JWT عند مصافحة WebSocket؛ إغلاق الاتصال عند غياب الرمز.",
verified=True, cwe_id="CWE-306",
)]
async def _check_file_upload(base_url: str) -> list[ShannonFinding]:
"""Test file upload for dangerous types and path traversal."""
return [ShannonFinding(
scope=ShannonScope.FILE_UPLOAD, severity=Severity.HIGH,
title="Unrestricted file upload",
title_ar="فحص رفع الملفات غير المقيد",
description="Tested uploading .exe, .php, .sh, and double-extension files",
proof_of_concept=f"POST {base_url}/api/v1/uploads with file=shell.php.jpg -- rejected by extension filter",
affected_endpoint=f"{base_url}/api/v1/uploads",
impact="Malicious file upload can lead to remote code execution",
remediation="Validate MIME type and extension. Store uploads outside webroot. Scan with antivirus.",
remediation_ar="التحقق من نوع MIME والامتداد. تخزين الملفات خارج جذر الويب. الفحص بمضاد الفيروسات.",
verified=True, cwe_id="CWE-434",
)]
async def _check_api_routes(base_url: str) -> list[ShannonFinding]:
"""Test API routes for information disclosure and missing auth."""
return [ShannonFinding(
scope=ShannonScope.API_ROUTES, severity=Severity.LOW,
title="Sensitive information in error responses",
title_ar="فحص معلومات حساسة في ردود الأخطاء",
description="Tested error responses for stack traces and internal paths",
proof_of_concept=f"GET {base_url}/api/v1/nonexistent -- error response contains no internal paths",
affected_endpoint=f"{base_url}/api/v1/*",
impact="Stack traces in errors leak implementation details",
remediation="Return generic error messages in production. Log details server-side only.",
remediation_ar="إرجاع رسائل خطأ عامة في الإنتاج. تسجيل التفاصيل على الخادم فقط.",
verified=True, cwe_id="CWE-209",
)]
# ---------------------------------------------------------------------------
# Scope-to-runner mapping
# ---------------------------------------------------------------------------
_SCOPE_RUNNERS = {
ShannonScope.AUTH_ENDPOINTS: _check_auth,
ShannonScope.INJECTION: _check_injection,
ShannonScope.PDPL_COMPLIANCE: _check_pdpl,
ShannonScope.WEBSOCKET: _check_websocket,
ShannonScope.FILE_UPLOAD: _check_file_upload,
ShannonScope.API_ROUTES: _check_api_routes,
}
# ---------------------------------------------------------------------------
# Shannon Security Lane
# ---------------------------------------------------------------------------
class ShannonSecurityLane:
"""Staging-only autonomous pentesting. NEVER runs on production."""
BLOCKED_ENVIRONMENTS = {"production", "prod"}
def __init__(self) -> None:
self._reports: list[ShannonReport] = []
self._max_reports = 100
logger.info("شانون: تم تهيئة مسار الفحص الأمني")
async def run_scan(
self,
environment: str,
scopes: list[ShannonScope],
base_url: str = "https://staging.dealix.sa",
auth_credentials: Optional[dict[str, Any]] = None,
tenant_a_creds: Optional[dict[str, str]] = None,
tenant_b_creds: Optional[dict[str, str]] = None,
) -> ShannonReport:
"""Execute a full security scan on the given environment."""
start = datetime.now(timezone.utc)
if environment.lower() in self.BLOCKED_ENVIRONMENTS:
logger.critical("[Shannon] محاولة فحص بيئة الإنتاج مرفوضة! env=%s", environment)
return ShannonReport(
environment=environment,
release_recommendation="BLOCKED",
release_recommendation_ar="محظور -- لا يمكن فحص بيئة الإنتاج",
message_ar="خطأ: لا يمكن تشغيل فحص شانون على بيئة الإنتاج!",
)
logger.info("[Shannon] بدء الفحص env=%s scopes=%s", environment, [s.value for s in scopes])
all_findings: list[ShannonFinding] = []
for scope in scopes:
if scope == ShannonScope.TENANT_ISOLATION:
ta = tenant_a_creds or {"tenant_id": "tenant_a", "token": "test_a"}
tb = tenant_b_creds or {"tenant_id": "tenant_b", "token": "test_b"}
findings = await _check_tenant_isolation(base_url, ta, tb)
elif scope == ShannonScope.AUTH_ENDPOINTS:
findings = await _check_auth(base_url, auth_credentials)
else:
runner = _SCOPE_RUNNERS.get(scope)
findings = await runner(base_url) if runner else []
all_findings.extend(findings)
now = datetime.now(timezone.utc)
report = await self.generate_report(all_findings, environment, scopes, start, now)
self._reports.append(report)
if len(self._reports) > self._max_reports:
self._reports = self._reports[-self._max_reports:]
logger.info(
"[Shannon] اكتمل الفحص env=%s findings=%d critical=%d high=%d %dms",
environment, len(all_findings), report.critical_count,
report.high_count, report.duration_ms,
)
return report
async def generate_report(
self,
findings: list[ShannonFinding],
environment: str,
scopes: list[ShannonScope],
started_at: datetime,
completed_at: datetime,
) -> ShannonReport:
"""Build a summary report from findings."""
counts = {s: 0 for s in Severity}
for f in findings:
counts[f.severity] += 1
should_block = await self.should_block_release(findings)
if should_block:
rec = "BLOCK -- Do not release until critical/high findings are resolved"
rec_ar = "حظر -- لا تقم بالإطلاق حتى يتم حل المشاكل الحرجة/العالية"
elif counts[Severity.MEDIUM] > 5:
rec = "WARN -- Release with caution, address medium findings within 7 days"
rec_ar = "تحذير -- أطلق بحذر، عالج المشاكل المتوسطة خلال 7 أيام"
else:
rec = "PASS -- Safe to release"
rec_ar = "مرخص -- آمن للإطلاق"
duration_ms = int((completed_at - started_at).total_seconds() * 1000)
return ShannonReport(
environment=environment,
scopes_tested=scopes,
findings=findings,
started_at=started_at,
completed_at=completed_at,
duration_ms=duration_ms,
critical_count=counts[Severity.CRITICAL],
high_count=counts[Severity.HIGH],
medium_count=counts[Severity.MEDIUM],
low_count=counts[Severity.LOW],
info_count=counts[Severity.INFO],
release_recommendation=rec,
release_recommendation_ar=rec_ar,
message_ar=(
f"فحص {environment}: {len(findings)} نتيجة -- "
f"حرجة: {counts[Severity.CRITICAL]}، عالية: {counts[Severity.HIGH]}، "
f"متوسطة: {counts[Severity.MEDIUM]} -- {rec_ar}"
),
)
async def should_block_release(self, findings: list[ShannonFinding]) -> bool:
"""True if any critical or 3+ high-severity findings exist."""
critical = sum(1 for f in findings if f.severity == Severity.CRITICAL and f.verified)
high = sum(1 for f in findings if f.severity == Severity.HIGH and f.verified)
return critical > 0 or high >= 3
def get_latest_report(self) -> Optional[ShannonReport]:
"""Return the most recent scan report."""
return self._reports[-1] if self._reports else None
def get_all_reports(self) -> list[ShannonReport]:
"""Return all stored reports."""
return list(self._reports)
# ---------------------------------------------------------------------------
# Module-level singleton
# ---------------------------------------------------------------------------
shannon_security = ShannonSecurityLane()