system-prompts-and-models-o.../salesflow-saas/backend/app/services/security_gate.py
Claude a329957a3b
feat: Add AI engine, PDPL compliance, sequences, CPQ, and governance layers
Phase 1-6 implementation for Dealix AI Revenue OS:

- AI Arabic Engine: NLP (arabic_nlp.py), lead scoring (lead_scoring.py)
- PDPL Compliance: consent manager, data rights handler, consent model
- Sequence Engine: multi-channel sequences with WhatsApp/Email/SMS
- CPQ System: quote engine, AI proposal generator
- Security Gate: pre-release checks, PDPL message validation
- Tool Verification: agent action audit trail
- Project Operating Files: AGENTS.md, CLAUDE.md
- Project Memory: architecture, ADRs, provider routing, PDPL checklist
- Design System: IBM Plex Sans Arabic tokens, RTL-safe components
- Sequence/Consent models for database

https://claude.ai/code/session_01LsnvBa7HwF5hs99VZbgLGj
2026-04-11 07:40:39 +00:00

209 lines
7.6 KiB
Python

"""
Security Gate — Dealix AI Revenue OS
Pre-release and runtime security verification.
Blocks risky actions and enforces compliance checks.
"""
import logging
from datetime import datetime, timezone
from enum import Enum
from typing import Any, Optional
from pydantic import BaseModel, Field
logger = logging.getLogger(__name__)
class Severity(str, Enum):
CRITICAL = "critical"
HIGH = "high"
MEDIUM = "medium"
LOW = "low"
INFO = "info"
class GateDecision(str, Enum):
PASS = "pass"
WARN = "warn"
BLOCK = "block"
class SecurityFinding(BaseModel):
id: str
category: str
severity: Severity
title: str
description: str
affected_file: Optional[str] = None
affected_endpoint: Optional[str] = None
recommendation: str
found_at: datetime = Field(default_factory=lambda: datetime.now(timezone.utc))
resolved: bool = False
class GateResult(BaseModel):
decision: GateDecision
findings: list[SecurityFinding] = []
checked_at: datetime = Field(default_factory=lambda: datetime.now(timezone.utc))
summary: str = ""
class SecurityGate:
"""
Security verification gate for Dealix.
Checks auth, PDPL, messaging, and API security.
"""
def __init__(self):
self._findings: list[SecurityFinding] = []
def check_outbound_message(
self,
channel: str,
recipient: str,
tenant_id: str,
has_consent: bool,
consent_purpose: str = None,
) -> GateResult:
findings = []
if not has_consent:
findings.append(SecurityFinding(
id=f"MSG-{len(self._findings)+1}",
category="pdpl",
severity=Severity.CRITICAL,
title="رسالة بدون موافقة PDPL",
description=f"محاولة إرسال رسالة {channel} بدون موافقة مسجلة للعميل",
recommendation="يجب الحصول على موافقة العميل قبل الإرسال",
))
if channel == "whatsapp" and not recipient.startswith("+"):
findings.append(SecurityFinding(
id=f"MSG-{len(self._findings)+2}",
category="validation",
severity=Severity.MEDIUM,
title="رقم واتساب غير صالح",
description=f"الرقم {recipient} لا يبدأ بـ +",
recommendation="استخدم التنسيق الدولي +966XXXXXXXXX",
))
self._findings.extend(findings)
critical = any(f.severity == Severity.CRITICAL for f in findings)
return GateResult(
decision=GateDecision.BLOCK if critical else GateDecision.PASS,
findings=findings,
summary=f"فحص الرسالة: {'محظور - بدون موافقة' if critical else 'مرخص'}",
)
def check_data_access(
self,
user_id: str,
user_role: str,
target_tenant_id: str,
user_tenant_id: str,
action: str,
) -> GateResult:
findings = []
if target_tenant_id != user_tenant_id and user_role != "admin":
findings.append(SecurityFinding(
id=f"ACC-{len(self._findings)+1}",
category="authorization",
severity=Severity.CRITICAL,
title="محاولة وصول عبر المستأجرين",
description=f"المستخدم {user_id} حاول الوصول لبيانات مستأجر آخر",
recommendation="رفض الطلب وتسجيل المحاولة",
))
if action in ("delete", "export") and user_role not in ("owner", "admin"):
findings.append(SecurityFinding(
id=f"ACC-{len(self._findings)+2}",
category="authorization",
severity=Severity.HIGH,
title="إجراء محظور للدور الحالي",
description=f"الدور {user_role} لا يملك صلاحية {action}",
recommendation="ترقية الصلاحيات أو استخدام حساب مخول",
))
self._findings.extend(findings)
critical = any(f.severity == Severity.CRITICAL for f in findings)
return GateResult(
decision=GateDecision.BLOCK if critical else GateDecision.PASS,
findings=findings,
summary=f"فحص الوصول: {action} - {'محظور' if critical else 'مرخص'}",
)
def check_api_request(
self,
endpoint: str,
method: str,
has_auth: bool,
user_role: str = None,
) -> GateResult:
findings = []
sensitive_patterns = ["/compliance", "/admin", "/tenant", "/users"]
is_sensitive = any(p in endpoint for p in sensitive_patterns)
if is_sensitive and not has_auth:
findings.append(SecurityFinding(
id=f"API-{len(self._findings)+1}",
category="authentication",
severity=Severity.CRITICAL,
title="وصول لنقطة حساسة بدون مصادقة",
description=f"{method} {endpoint} بدون JWT token",
recommendation="إضافة مصادقة إلزامية",
))
if is_sensitive and user_role and user_role not in ("owner", "admin"):
findings.append(SecurityFinding(
id=f"API-{len(self._findings)+2}",
category="authorization",
severity=Severity.HIGH,
title="صلاحيات غير كافية",
description=f"الدور {user_role} لا يملك وصول لـ {endpoint}",
recommendation="تحقق من صلاحيات المستخدم",
))
self._findings.extend(findings)
critical = any(f.severity == Severity.CRITICAL for f in findings)
return GateResult(
decision=GateDecision.BLOCK if critical else GateDecision.PASS,
findings=findings,
summary=f"فحص API: {method} {endpoint}",
)
def check_release(self) -> GateResult:
unresolved_critical = [
f for f in self._findings
if f.severity == Severity.CRITICAL and not f.resolved
]
unresolved_high = [
f for f in self._findings
if f.severity == Severity.HIGH and not f.resolved
]
if unresolved_critical:
decision = GateDecision.BLOCK
summary = f"محظور: {len(unresolved_critical)} مشاكل حرجة غير محلولة"
elif unresolved_high:
decision = GateDecision.WARN
summary = f"تحذير: {len(unresolved_high)} مشاكل عالية غير محلولة"
else:
decision = GateDecision.PASS
summary = "مرخص للإطلاق"
return GateResult(
decision=decision,
findings=unresolved_critical + unresolved_high,
summary=summary,
)
def get_all_findings(
self, severity: Severity = None, resolved: bool = None
) -> list[SecurityFinding]:
results = self._findings
if severity:
results = [f for f in results if f.severity == severity]
if resolved is not None:
results = [f for f in results if f.resolved == resolved]
return results
def resolve_finding(self, finding_id: str) -> bool:
for f in self._findings:
if f.id == finding_id:
f.resolved = True
return True
return False
# Global singleton
security_gate = SecurityGate()