mirror of
https://github.com/x1xhlol/system-prompts-and-models-of-ai-tools.git
synced 2026-06-18 15:29:36 +00:00
Phase 1-6 implementation for Dealix AI Revenue OS: - AI Arabic Engine: NLP (arabic_nlp.py), lead scoring (lead_scoring.py) - PDPL Compliance: consent manager, data rights handler, consent model - Sequence Engine: multi-channel sequences with WhatsApp/Email/SMS - CPQ System: quote engine, AI proposal generator - Security Gate: pre-release checks, PDPL message validation - Tool Verification: agent action audit trail - Project Operating Files: AGENTS.md, CLAUDE.md - Project Memory: architecture, ADRs, provider routing, PDPL checklist - Design System: IBM Plex Sans Arabic tokens, RTL-safe components - Sequence/Consent models for database https://claude.ai/code/session_01LsnvBa7HwF5hs99VZbgLGj
209 lines
7.6 KiB
Python
209 lines
7.6 KiB
Python
"""
|
|
Security Gate — Dealix AI Revenue OS
|
|
Pre-release and runtime security verification.
|
|
Blocks risky actions and enforces compliance checks.
|
|
"""
|
|
import logging
|
|
from datetime import datetime, timezone
|
|
from enum import Enum
|
|
from typing import Any, Optional
|
|
|
|
from pydantic import BaseModel, Field
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class Severity(str, Enum):
|
|
CRITICAL = "critical"
|
|
HIGH = "high"
|
|
MEDIUM = "medium"
|
|
LOW = "low"
|
|
INFO = "info"
|
|
|
|
|
|
class GateDecision(str, Enum):
|
|
PASS = "pass"
|
|
WARN = "warn"
|
|
BLOCK = "block"
|
|
|
|
|
|
class SecurityFinding(BaseModel):
|
|
id: str
|
|
category: str
|
|
severity: Severity
|
|
title: str
|
|
description: str
|
|
affected_file: Optional[str] = None
|
|
affected_endpoint: Optional[str] = None
|
|
recommendation: str
|
|
found_at: datetime = Field(default_factory=lambda: datetime.now(timezone.utc))
|
|
resolved: bool = False
|
|
|
|
|
|
class GateResult(BaseModel):
|
|
decision: GateDecision
|
|
findings: list[SecurityFinding] = []
|
|
checked_at: datetime = Field(default_factory=lambda: datetime.now(timezone.utc))
|
|
summary: str = ""
|
|
|
|
|
|
class SecurityGate:
|
|
"""
|
|
Security verification gate for Dealix.
|
|
Checks auth, PDPL, messaging, and API security.
|
|
"""
|
|
|
|
def __init__(self):
|
|
self._findings: list[SecurityFinding] = []
|
|
|
|
def check_outbound_message(
|
|
self,
|
|
channel: str,
|
|
recipient: str,
|
|
tenant_id: str,
|
|
has_consent: bool,
|
|
consent_purpose: str = None,
|
|
) -> GateResult:
|
|
findings = []
|
|
if not has_consent:
|
|
findings.append(SecurityFinding(
|
|
id=f"MSG-{len(self._findings)+1}",
|
|
category="pdpl",
|
|
severity=Severity.CRITICAL,
|
|
title="رسالة بدون موافقة PDPL",
|
|
description=f"محاولة إرسال رسالة {channel} بدون موافقة مسجلة للعميل",
|
|
recommendation="يجب الحصول على موافقة العميل قبل الإرسال",
|
|
))
|
|
if channel == "whatsapp" and not recipient.startswith("+"):
|
|
findings.append(SecurityFinding(
|
|
id=f"MSG-{len(self._findings)+2}",
|
|
category="validation",
|
|
severity=Severity.MEDIUM,
|
|
title="رقم واتساب غير صالح",
|
|
description=f"الرقم {recipient} لا يبدأ بـ +",
|
|
recommendation="استخدم التنسيق الدولي +966XXXXXXXXX",
|
|
))
|
|
self._findings.extend(findings)
|
|
critical = any(f.severity == Severity.CRITICAL for f in findings)
|
|
return GateResult(
|
|
decision=GateDecision.BLOCK if critical else GateDecision.PASS,
|
|
findings=findings,
|
|
summary=f"فحص الرسالة: {'محظور - بدون موافقة' if critical else 'مرخص'}",
|
|
)
|
|
|
|
def check_data_access(
|
|
self,
|
|
user_id: str,
|
|
user_role: str,
|
|
target_tenant_id: str,
|
|
user_tenant_id: str,
|
|
action: str,
|
|
) -> GateResult:
|
|
findings = []
|
|
if target_tenant_id != user_tenant_id and user_role != "admin":
|
|
findings.append(SecurityFinding(
|
|
id=f"ACC-{len(self._findings)+1}",
|
|
category="authorization",
|
|
severity=Severity.CRITICAL,
|
|
title="محاولة وصول عبر المستأجرين",
|
|
description=f"المستخدم {user_id} حاول الوصول لبيانات مستأجر آخر",
|
|
recommendation="رفض الطلب وتسجيل المحاولة",
|
|
))
|
|
if action in ("delete", "export") and user_role not in ("owner", "admin"):
|
|
findings.append(SecurityFinding(
|
|
id=f"ACC-{len(self._findings)+2}",
|
|
category="authorization",
|
|
severity=Severity.HIGH,
|
|
title="إجراء محظور للدور الحالي",
|
|
description=f"الدور {user_role} لا يملك صلاحية {action}",
|
|
recommendation="ترقية الصلاحيات أو استخدام حساب مخول",
|
|
))
|
|
self._findings.extend(findings)
|
|
critical = any(f.severity == Severity.CRITICAL for f in findings)
|
|
return GateResult(
|
|
decision=GateDecision.BLOCK if critical else GateDecision.PASS,
|
|
findings=findings,
|
|
summary=f"فحص الوصول: {action} - {'محظور' if critical else 'مرخص'}",
|
|
)
|
|
|
|
def check_api_request(
|
|
self,
|
|
endpoint: str,
|
|
method: str,
|
|
has_auth: bool,
|
|
user_role: str = None,
|
|
) -> GateResult:
|
|
findings = []
|
|
sensitive_patterns = ["/compliance", "/admin", "/tenant", "/users"]
|
|
is_sensitive = any(p in endpoint for p in sensitive_patterns)
|
|
if is_sensitive and not has_auth:
|
|
findings.append(SecurityFinding(
|
|
id=f"API-{len(self._findings)+1}",
|
|
category="authentication",
|
|
severity=Severity.CRITICAL,
|
|
title="وصول لنقطة حساسة بدون مصادقة",
|
|
description=f"{method} {endpoint} بدون JWT token",
|
|
recommendation="إضافة مصادقة إلزامية",
|
|
))
|
|
if is_sensitive and user_role and user_role not in ("owner", "admin"):
|
|
findings.append(SecurityFinding(
|
|
id=f"API-{len(self._findings)+2}",
|
|
category="authorization",
|
|
severity=Severity.HIGH,
|
|
title="صلاحيات غير كافية",
|
|
description=f"الدور {user_role} لا يملك وصول لـ {endpoint}",
|
|
recommendation="تحقق من صلاحيات المستخدم",
|
|
))
|
|
self._findings.extend(findings)
|
|
critical = any(f.severity == Severity.CRITICAL for f in findings)
|
|
return GateResult(
|
|
decision=GateDecision.BLOCK if critical else GateDecision.PASS,
|
|
findings=findings,
|
|
summary=f"فحص API: {method} {endpoint}",
|
|
)
|
|
|
|
def check_release(self) -> GateResult:
|
|
unresolved_critical = [
|
|
f for f in self._findings
|
|
if f.severity == Severity.CRITICAL and not f.resolved
|
|
]
|
|
unresolved_high = [
|
|
f for f in self._findings
|
|
if f.severity == Severity.HIGH and not f.resolved
|
|
]
|
|
if unresolved_critical:
|
|
decision = GateDecision.BLOCK
|
|
summary = f"محظور: {len(unresolved_critical)} مشاكل حرجة غير محلولة"
|
|
elif unresolved_high:
|
|
decision = GateDecision.WARN
|
|
summary = f"تحذير: {len(unresolved_high)} مشاكل عالية غير محلولة"
|
|
else:
|
|
decision = GateDecision.PASS
|
|
summary = "مرخص للإطلاق"
|
|
return GateResult(
|
|
decision=decision,
|
|
findings=unresolved_critical + unresolved_high,
|
|
summary=summary,
|
|
)
|
|
|
|
def get_all_findings(
|
|
self, severity: Severity = None, resolved: bool = None
|
|
) -> list[SecurityFinding]:
|
|
results = self._findings
|
|
if severity:
|
|
results = [f for f in results if f.severity == severity]
|
|
if resolved is not None:
|
|
results = [f for f in results if f.resolved == resolved]
|
|
return results
|
|
|
|
def resolve_finding(self, finding_id: str) -> bool:
|
|
for f in self._findings:
|
|
if f.id == finding_id:
|
|
f.resolved = True
|
|
return True
|
|
return False
|
|
|
|
|
|
# Global singleton
|
|
security_gate = SecurityGate()
|