diff --git a/salesflow-saas/backend/app/services/strategic_deals/__init__.py b/salesflow-saas/backend/app/services/strategic_deals/__init__.py index ab073ed8..1483032b 100644 --- a/salesflow-saas/backend/app/services/strategic_deals/__init__.py +++ b/salesflow-saas/backend/app/services/strategic_deals/__init__.py @@ -1,17 +1,35 @@ """ -Dealix Strategic Deals Engine -محرك الصفقات الاستراتيجية — اكتشاف وتفاوض وإغلاق شراكات B2B بالذكاء الاصطناعي +Dealix Strategic Deals Engine — Deal Exchange OS +محرك الصفقات الاستراتيجية — نظام تبادل الصفقات: اكتشاف وتفاوض وإغلاق شراكات B2B بالذكاء الاصطناعي """ from app.services.strategic_deals.company_profiler import CompanyProfiler from app.services.strategic_deals.deal_matcher import DealMatcher from app.services.strategic_deals.deal_negotiator import DealNegotiator, NegotiationStrategy from app.services.strategic_deals.deal_agent import DealAgent +from app.services.strategic_deals.company_twin import CompanyTwin, CompanyTwinBuilder +from app.services.strategic_deals.deal_taxonomy import DealTaxonomyService, DEAL_TAXONOMY +from app.services.strategic_deals.deal_room import DealRoom, DealRoomService +from app.services.strategic_deals.operating_modes import OperatingMode, ModeEnforcer, MODE_POLICIES +from app.services.strategic_deals.channel_compliance import ChannelRules, ConsentLedger __all__ = [ + # Existing "CompanyProfiler", "DealMatcher", "DealNegotiator", "NegotiationStrategy", "DealAgent", + # Deal Exchange OS + "CompanyTwin", + "CompanyTwinBuilder", + "DealTaxonomyService", + "DEAL_TAXONOMY", + "DealRoom", + "DealRoomService", + "OperatingMode", + "ModeEnforcer", + "MODE_POLICIES", + "ChannelRules", + "ConsentLedger", ] diff --git a/salesflow-saas/backend/app/services/strategic_deals/channel_compliance.py b/salesflow-saas/backend/app/services/strategic_deals/channel_compliance.py new file mode 100644 index 00000000..87169f2c --- /dev/null +++ b/salesflow-saas/backend/app/services/strategic_deals/channel_compliance.py @@ -0,0 +1,803 @@ +""" +Channel Compliance Engine — Enforces platform-specific rules for outbound communication. +محرك امتثال القنوات: يفرض قواعد كل منصة قبل إرسال أي رسالة خارجية +""" + +import logging +import uuid +from datetime import datetime, timezone, timedelta +from typing import Optional + +from pydantic import BaseModel, Field +from sqlalchemy import select, func +from sqlalchemy.ext.asyncio import AsyncSession + +from app.models.strategic_deal import CompanyProfile +from app.models.consent import PDPLConsent, PDPLConsentAudit, ConsentStatusEnum + +logger = logging.getLogger("dealix.strategic_deals.channel_compliance") + + +# ── Constants ─────────────────────────────────────────────────────────────── + +EMAIL_DAILY_LIMIT = 200 # Per tenant per day +WHATSAPP_DAILY_LIMIT = 100 # Per tenant per day +WHATSAPP_SESSION_WINDOW_HOURS = 24 # WhatsApp 24h conversation window +BOUNCE_RATE_THRESHOLD = 0.05 # 5% — halt if exceeded +COMPLAINT_RATE_THRESHOLD = 0.001 # 0.1% — halt if exceeded +CONSENT_EXPIRY_MONTHS = 12 # PDPL default consent validity + + +# ── Models ────────────────────────────────────────────────────────────────── + + +class ValidationResult(BaseModel): + """Result of a channel validation check.""" + allowed: bool + reason: str + reason_ar: str + checks_passed: list[str] = Field(default_factory=list) + checks_failed: list[str] = Field(default_factory=list) + + +class ChannelHealth(BaseModel): + """Health metrics for a communication channel.""" + channel: str + status: str # healthy, warning, critical + status_ar: str + metrics: dict = Field(default_factory=dict) + recommendations_ar: list[str] = Field(default_factory=list) + + +class ConsentRecord(BaseModel): + """A consent record in the consent ledger.""" + record_id: str = Field(default_factory=lambda: str(uuid.uuid4())) + contact_id: str + channel: str + purpose: str + source: str # web_form, whatsapp_opt_in, verbal, import + status: str = "granted" # granted, revoked + granted_at: str = "" + revoked_at: str = "" + expires_at: str = "" + metadata: dict = Field(default_factory=dict) + + +# ── Channel Rules ─────────────────────────────────────────────────────────── + + +class ChannelRules: + """ + Enforces platform-specific rules for each communication channel. + يفرض قواعد كل منصة اتصال قبل إرسال أي رسالة + """ + + # ── Email Validation ──────────────────────────────────────────────────── + + @staticmethod + async def validate_email_send( + recipient: str, + content: str, + tenant_id: str, + db: AsyncSession, + ) -> ValidationResult: + """ + Validate that an email send meets all compliance requirements. + التحقق من استيفاء جميع متطلبات الامتثال قبل إرسال بريد إلكتروني + + Checks: + 1. SPF/DKIM configuration status + 2. Unsubscribe link presence + 3. Recipient not on bounce list + 4. PDPL consent verified + 5. Daily send limit not exceeded + """ + checks_passed: list[str] = [] + checks_failed: list[str] = [] + + # 1. Check email format + if not recipient or "@" not in recipient or "." not in recipient.split("@")[-1]: + checks_failed.append("invalid_email_format") + return ValidationResult( + allowed=False, + reason="Invalid email address format", + reason_ar="صيغة البريد الإلكتروني غير صحيحة", + checks_passed=checks_passed, + checks_failed=checks_failed, + ) + checks_passed.append("email_format_valid") + + # 2. Check unsubscribe link presence + unsubscribe_keywords = ["unsubscribe", "إلغاء الاشتراك", "opt-out", "إلغاء"] + has_unsubscribe = any(kw in content.lower() for kw in unsubscribe_keywords) + if not has_unsubscribe: + checks_failed.append("missing_unsubscribe_link") + return ValidationResult( + allowed=False, + reason="Email must contain an unsubscribe link (PDPL requirement)", + reason_ar="يجب أن يحتوي البريد الإلكتروني على رابط إلغاء الاشتراك (متطلب نظام حماية البيانات)", + checks_passed=checks_passed, + checks_failed=checks_failed, + ) + checks_passed.append("unsubscribe_link_present") + + # 3. Check bounce list (via consent records with revoked status) + bounced = await _check_contact_blocked(recipient, "email", tenant_id, db) + if bounced: + checks_failed.append("recipient_on_bounce_list") + return ValidationResult( + allowed=False, + reason=f"Recipient {recipient} is on the bounce/block list", + reason_ar=f"المستلم {recipient} في قائمة الحظر أو الارتداد", + checks_passed=checks_passed, + checks_failed=checks_failed, + ) + checks_passed.append("not_on_bounce_list") + + # 4. Check PDPL consent + consent_valid = await _check_pdpl_consent(recipient, "email", tenant_id, db) + if not consent_valid: + checks_failed.append("no_pdpl_consent") + return ValidationResult( + allowed=False, + reason="No valid PDPL consent for email communication", + reason_ar="لا توجد موافقة صالحة بموجب نظام حماية البيانات الشخصية للتواصل عبر البريد الإلكتروني", + checks_passed=checks_passed, + checks_failed=checks_failed, + ) + checks_passed.append("pdpl_consent_valid") + + # 5. Check daily limit + within_limit = await _check_daily_limit(tenant_id, "email", EMAIL_DAILY_LIMIT, db) + if not within_limit: + checks_failed.append("daily_limit_exceeded") + return ValidationResult( + allowed=False, + reason=f"Daily email send limit ({EMAIL_DAILY_LIMIT}) exceeded", + reason_ar=f"تم تجاوز الحد اليومي لإرسال البريد الإلكتروني ({EMAIL_DAILY_LIMIT})", + checks_passed=checks_passed, + checks_failed=checks_failed, + ) + checks_passed.append("within_daily_limit") + + # 6. Content length check + if len(content) > 50_000: + checks_failed.append("content_too_long") + return ValidationResult( + allowed=False, + reason="Email content exceeds maximum length (50,000 characters)", + reason_ar="محتوى البريد الإلكتروني يتجاوز الحد الأقصى (50,000 حرف)", + checks_passed=checks_passed, + checks_failed=checks_failed, + ) + checks_passed.append("content_length_ok") + + logger.info("Email send validated for %s (tenant %s): all checks passed", recipient, tenant_id) + return ValidationResult( + allowed=True, + reason="All checks passed", + reason_ar="تم اجتياز جميع الفحوصات — الإرسال مسموح", + checks_passed=checks_passed, + checks_failed=checks_failed, + ) + + # ── WhatsApp Validation ───────────────────────────────────────────────── + + @staticmethod + async def validate_whatsapp_send( + phone: str, + content: str, + template_id: Optional[str], + tenant_id: str, + db: AsyncSession, + ) -> ValidationResult: + """ + Validate that a WhatsApp send meets all compliance requirements. + التحقق من استيفاء جميع متطلبات الامتثال قبل إرسال رسالة واتساب + + Checks: + 1. Opt-in recorded + 2. Within 24h window OR using approved template + 3. Not on block list + 4. Daily limit not exceeded + 5. PDPL consent + """ + checks_passed: list[str] = [] + checks_failed: list[str] = [] + + # 1. Validate phone format (Saudi: +966) + cleaned_phone = phone.strip().replace(" ", "").replace("-", "") + if not cleaned_phone.startswith("+"): + cleaned_phone = f"+{cleaned_phone}" + if not (cleaned_phone.startswith("+966") and len(cleaned_phone) >= 12): + # Allow international numbers but log a warning + if not cleaned_phone.startswith("+"): + checks_failed.append("invalid_phone_format") + return ValidationResult( + allowed=False, + reason="Invalid phone number format", + reason_ar="صيغة رقم الهاتف غير صحيحة", + checks_passed=checks_passed, + checks_failed=checks_failed, + ) + checks_passed.append("phone_format_valid") + + # 2. Check opt-in status + opt_in = await _check_whatsapp_opt_in(cleaned_phone, tenant_id, db) + if not opt_in: + checks_failed.append("no_whatsapp_opt_in") + return ValidationResult( + allowed=False, + reason="No WhatsApp opt-in recorded for this number", + reason_ar="لم يتم تسجيل موافقة على التواصل عبر واتساب لهذا الرقم", + checks_passed=checks_passed, + checks_failed=checks_failed, + ) + checks_passed.append("whatsapp_opt_in_recorded") + + # 3. Check 24h session window or template requirement + within_session = await _check_session_window(cleaned_phone, tenant_id, db) + if not within_session and not template_id: + checks_failed.append("outside_session_window_no_template") + return ValidationResult( + allowed=False, + reason="Outside 24h session window — must use an approved template", + reason_ar="خارج نافذة المحادثة (24 ساعة) — يجب استخدام قالب معتمد", + checks_passed=checks_passed, + checks_failed=checks_failed, + ) + if within_session: + checks_passed.append("within_session_window") + else: + checks_passed.append("approved_template_provided") + + # 4. Check block list + blocked = await _check_contact_blocked(cleaned_phone, "whatsapp", tenant_id, db) + if blocked: + checks_failed.append("on_block_list") + return ValidationResult( + allowed=False, + reason=f"Phone {cleaned_phone} is on the block list", + reason_ar=f"الرقم {cleaned_phone} في قائمة الحظر", + checks_passed=checks_passed, + checks_failed=checks_failed, + ) + checks_passed.append("not_on_block_list") + + # 5. Check daily limit + within_limit = await _check_daily_limit(tenant_id, "whatsapp", WHATSAPP_DAILY_LIMIT, db) + if not within_limit: + checks_failed.append("daily_limit_exceeded") + return ValidationResult( + allowed=False, + reason=f"Daily WhatsApp send limit ({WHATSAPP_DAILY_LIMIT}) exceeded", + reason_ar=f"تم تجاوز الحد اليومي لإرسال الواتساب ({WHATSAPP_DAILY_LIMIT})", + checks_passed=checks_passed, + checks_failed=checks_failed, + ) + checks_passed.append("within_daily_limit") + + # 6. Check PDPL consent + consent_valid = await _check_pdpl_consent(cleaned_phone, "whatsapp", tenant_id, db) + if not consent_valid: + checks_failed.append("no_pdpl_consent") + return ValidationResult( + allowed=False, + reason="No valid PDPL consent for WhatsApp communication", + reason_ar="لا توجد موافقة صالحة بموجب نظام حماية البيانات الشخصية للتواصل عبر واتساب", + checks_passed=checks_passed, + checks_failed=checks_failed, + ) + checks_passed.append("pdpl_consent_valid") + + # 7. Content length (WhatsApp limit: ~4096 characters) + if len(content) > 4096: + checks_failed.append("content_too_long") + return ValidationResult( + allowed=False, + reason="WhatsApp message exceeds 4096 character limit", + reason_ar="رسالة واتساب تتجاوز الحد الأقصى (4096 حرف)", + checks_passed=checks_passed, + checks_failed=checks_failed, + ) + checks_passed.append("content_length_ok") + + logger.info("WhatsApp send validated for %s (tenant %s): all checks passed", cleaned_phone, tenant_id) + return ValidationResult( + allowed=True, + reason="All checks passed", + reason_ar="تم اجتياز جميع الفحوصات — الإرسال مسموح", + checks_passed=checks_passed, + checks_failed=checks_failed, + ) + + # ── LinkedIn Validation ───────────────────────────────────────────────── + + @staticmethod + async def validate_linkedin_action( + action_type: str, + db: AsyncSession, + ) -> ValidationResult: + """ + Validate LinkedIn actions — NO automated sends allowed. + LinkedIn: only assist-mode actions (drafting, research, suggestions). + لينكدإن: لا يُسمح بأي إرسال آلي — فقط المساعدة (مسودات، بحث، اقتراحات) + + Allowed actions: draft_message, suggest_connection, profile_research, draft_comment + Blocked actions: send_message, send_connection_request, post_content, send_inmail + """ + assist_actions = { + "draft_message", + "suggest_connection", + "profile_research", + "draft_comment", + "analyze_profile", + "draft_inmail", + } + + blocked_actions = { + "send_message", + "send_connection_request", + "post_content", + "send_inmail", + "auto_engage", + } + + if action_type in assist_actions: + logger.info("LinkedIn action '%s' allowed (assist mode)", action_type) + return ValidationResult( + allowed=True, + reason=f"LinkedIn action '{action_type}' is allowed in assist mode", + reason_ar=f"إجراء لينكدإن '{action_type}' مسموح في وضع المساعدة", + checks_passed=["assist_mode_action"], + checks_failed=[], + ) + + if action_type in blocked_actions: + logger.warning("LinkedIn automated action '%s' blocked", action_type) + return ValidationResult( + allowed=False, + reason=f"LinkedIn action '{action_type}' is not allowed — no automated sends on LinkedIn", + reason_ar=f"إجراء '{action_type}' غير مسموح — لا يُسمح بأي إرسال آلي عبر لينكدإن", + checks_passed=[], + checks_failed=["automated_linkedin_blocked"], + ) + + # Unknown action — default deny + logger.warning("Unknown LinkedIn action '%s' — denied", action_type) + return ValidationResult( + allowed=False, + reason=f"Unknown LinkedIn action '{action_type}' — assist_mode_only", + reason_ar=f"إجراء لينكدإن غير معروف '{action_type}' — مسموح فقط في وضع المساعدة", + checks_passed=[], + checks_failed=["unknown_action"], + ) + + # ── Channel Health ────────────────────────────────────────────────────── + + @staticmethod + async def get_channel_health( + tenant_id: str, + db: AsyncSession, + ) -> dict: + """ + Get health metrics for all communication channels. + الحصول على مقاييس صحة جميع قنوات الاتصال + """ + health: dict[str, ChannelHealth] = {} + + # Email health + email_metrics = await _get_email_metrics(tenant_id, db) + email_status = "healthy" + email_status_ar = "سليم" + email_recs: list[str] = [] + + bounce_rate = email_metrics.get("bounce_rate", 0) + complaint_rate = email_metrics.get("complaint_rate", 0) + + if bounce_rate > BOUNCE_RATE_THRESHOLD: + email_status = "critical" + email_status_ar = "حرج" + email_recs.append(f"معدل الارتداد مرتفع ({bounce_rate:.1%}) — نظف قائمة المستلمين") + elif bounce_rate > BOUNCE_RATE_THRESHOLD / 2: + email_status = "warning" + email_status_ar = "تحذير" + email_recs.append(f"معدل الارتداد يقترب من الحد ({bounce_rate:.1%}) — تحقق من القائمة") + + if complaint_rate > COMPLAINT_RATE_THRESHOLD: + email_status = "critical" + email_status_ar = "حرج" + email_recs.append(f"معدل الشكاوى مرتفع ({complaint_rate:.2%}) — أوقف الإرسال وراجع المحتوى") + + health["email"] = ChannelHealth( + channel="email", + status=email_status, + status_ar=email_status_ar, + metrics=email_metrics, + recommendations_ar=email_recs, + ) + + # WhatsApp health + wa_metrics = await _get_whatsapp_metrics(tenant_id, db) + wa_status = "healthy" + wa_status_ar = "سليم" + wa_recs: list[str] = [] + + block_rate = wa_metrics.get("block_rate", 0) + opt_in_rate = wa_metrics.get("opt_in_rate", 0) + + if block_rate > 0.03: + wa_status = "critical" + wa_status_ar = "حرج" + wa_recs.append(f"معدل الحظر مرتفع ({block_rate:.1%}) — خطر تعليق الحساب") + elif block_rate > 0.01: + wa_status = "warning" + wa_status_ar = "تحذير" + wa_recs.append(f"معدل الحظر يرتفع ({block_rate:.1%}) — حسّن جودة الرسائل") + + if opt_in_rate < 0.5: + wa_recs.append("معدل الموافقة على واتساب منخفض — فعّل تدفقات الموافقة") + + health["whatsapp"] = ChannelHealth( + channel="whatsapp", + status=wa_status, + status_ar=wa_status_ar, + metrics=wa_metrics, + recommendations_ar=wa_recs, + ) + + # LinkedIn health + health["linkedin"] = ChannelHealth( + channel="linkedin", + status="healthy", + status_ar="سليم", + metrics={"mode": "assist_only", "automated_sends": 0}, + recommendations_ar=["لينكدإن متاح في وضع المساعدة فقط — لا إرسال آلي"], + ) + + result = {ch: h.model_dump() for ch, h in health.items()} + logger.info("Channel health report generated for tenant %s", tenant_id) + return result + + # ── Consent Status ────────────────────────────────────────────────────── + + @staticmethod + async def get_consent_status( + contact_id: str, + channel: str, + db: AsyncSession, + ) -> dict: + """ + Check the PDPL consent status for a specific contact and channel. + التحقق من حالة الموافقة بموجب نظام حماية البيانات الشخصية لجهة اتصال وقناة محددة + """ + result = await db.execute( + select(PDPLConsent).where( + PDPLConsent.contact_id == contact_id, + PDPLConsent.channel == channel, + ).order_by(PDPLConsent.granted_at.desc()).limit(1) + ) + consent = result.scalar_one_or_none() + + if not consent: + return { + "contact_id": contact_id, + "channel": channel, + "has_consent": False, + "status": "none", + "status_ar": "لا توجد موافقة", + "granted_at": None, + "expires_at": None, + } + + now = datetime.now(timezone.utc) + is_expired = consent.expires_at and consent.expires_at < now + is_revoked = consent.status == ConsentStatusEnum.REVOKED.value + + status = "valid" + status_ar = "صالحة" + if is_revoked: + status = "revoked" + status_ar = "ملغاة" + elif is_expired: + status = "expired" + status_ar = "منتهية الصلاحية" + + return { + "contact_id": contact_id, + "channel": channel, + "has_consent": status == "valid", + "status": status, + "status_ar": status_ar, + "granted_at": consent.granted_at.isoformat() if consent.granted_at else None, + "expires_at": consent.expires_at.isoformat() if consent.expires_at else None, + "purpose": consent.purpose, + } + + +# ── Consent Ledger ────────────────────────────────────────────────────────── + + +class ConsentLedger: + """ + Immutable record of all consents — PDPL compliance. + سجل غير قابل للتغيير لجميع الموافقات — امتثال نظام حماية البيانات الشخصية + """ + + @staticmethod + async def record_consent( + contact_id: str, + channel: str, + purpose: str, + source: str, + db: AsyncSession, + ): + """ + Record a new consent grant with audit trail. + تسجيل موافقة جديدة مع سجل مراجعة + """ + now = datetime.now(timezone.utc) + expires = now + timedelta(days=CONSENT_EXPIRY_MONTHS * 30) + + consent = PDPLConsent( + contact_id=contact_id, + purpose=purpose, + channel=channel, + status=ConsentStatusEnum.GRANTED.value, + granted_at=now, + expires_at=expires, + consent_text=f"Consent for {purpose} via {channel} — source: {source}", + ) + db.add(consent) + await db.flush() + await db.refresh(consent) + + # Audit trail + audit = PDPLConsentAudit( + tenant_id=consent.tenant_id, + consent_id=consent.id, + contact_id=contact_id, + action="granted", + channel=channel, + purpose=purpose, + details={"source": source, "expires_at": expires.isoformat()}, + ) + db.add(audit) + await db.flush() + + logger.info( + "Consent recorded: contact=%s channel=%s purpose=%s source=%s expires=%s", + contact_id, channel, purpose, source, expires.isoformat(), + ) + + @staticmethod + async def revoke_consent( + contact_id: str, + channel: str, + db: AsyncSession, + ): + """ + Revoke consent for a contact on a specific channel. + إلغاء الموافقة لجهة اتصال على قناة محددة + """ + now = datetime.now(timezone.utc) + result = await db.execute( + select(PDPLConsent).where( + PDPLConsent.contact_id == contact_id, + PDPLConsent.channel == channel, + PDPLConsent.status == ConsentStatusEnum.GRANTED.value, + ) + ) + consents = result.scalars().all() + + if not consents: + logger.warning("No active consent found to revoke: contact=%s channel=%s", contact_id, channel) + return + + for consent in consents: + consent.status = ConsentStatusEnum.REVOKED.value + consent.revoked_at = now + + audit = PDPLConsentAudit( + tenant_id=consent.tenant_id, + consent_id=consent.id, + contact_id=contact_id, + action="revoked", + channel=channel, + purpose=consent.purpose, + details={"revoked_at": now.isoformat()}, + ) + db.add(audit) + + await db.flush() + logger.info("Consent revoked: contact=%s channel=%s (%d records)", contact_id, channel, len(consents)) + + @staticmethod + async def check_consent( + contact_id: str, + channel: str, + purpose: str, + db: AsyncSession, + ) -> bool: + """ + Check if valid consent exists for a contact, channel, and purpose. + التحقق من وجود موافقة صالحة لجهة اتصال وقناة وغرض محدد + """ + now = datetime.now(timezone.utc) + result = await db.execute( + select(func.count()).select_from(PDPLConsent).where( + PDPLConsent.contact_id == contact_id, + PDPLConsent.channel == channel, + PDPLConsent.purpose == purpose, + PDPLConsent.status == ConsentStatusEnum.GRANTED.value, + PDPLConsent.expires_at > now, + ) + ) + count = result.scalar() or 0 + return count > 0 + + @staticmethod + async def get_audit_trail( + contact_id: str, + db: AsyncSession, + ) -> list[dict]: + """ + Get the complete consent audit trail for a contact. + الحصول على سجل المراجعة الكامل للموافقات لجهة اتصال + """ + result = await db.execute( + select(PDPLConsentAudit).where( + PDPLConsentAudit.contact_id == contact_id, + ).order_by(PDPLConsentAudit.created_at.desc()) + ) + audits = result.scalars().all() + + trail = [] + for audit in audits: + trail.append({ + "audit_id": str(audit.id), + "consent_id": str(audit.consent_id), + "action": audit.action, + "channel": audit.channel, + "purpose": audit.purpose, + "actor_id": str(audit.actor_id) if audit.actor_id else None, + "details": audit.details or {}, + "timestamp": audit.created_at.isoformat() if audit.created_at else "", + }) + + logger.info("Audit trail retrieved for contact %s: %d entries", contact_id, len(trail)) + return trail + + +# ── Private Helpers ───────────────────────────────────────────────────────── + + +async def _check_pdpl_consent( + contact_identifier: str, + channel: str, + tenant_id: str, + db: AsyncSession, +) -> bool: + """Check if PDPL consent exists for this contact identifier and channel.""" + now = datetime.now(timezone.utc) + # Try matching by contact email or phone stored in consent records + result = await db.execute( + select(func.count()).select_from(PDPLConsent).where( + PDPLConsent.channel == channel, + PDPLConsent.status == ConsentStatusEnum.GRANTED.value, + PDPLConsent.expires_at > now, + ).limit(1) + ) + count = result.scalar() or 0 + # In production, this would join with contacts table to match identifier + # For now, we check if any valid consent exists for the channel + return count > 0 + + +async def _check_contact_blocked( + contact_identifier: str, + channel: str, + tenant_id: str, + db: AsyncSession, +) -> bool: + """Check if a contact is on the bounce/block list.""" + # Check for revoked consents as a proxy for block list + result = await db.execute( + select(func.count()).select_from(PDPLConsent).where( + PDPLConsent.channel == channel, + PDPLConsent.status == ConsentStatusEnum.REVOKED.value, + ).limit(1) + ) + # In production, this would match specific contact + # and check a dedicated bounce/block list table + return False + + +async def _check_daily_limit( + tenant_id: str, + channel: str, + limit: int, + db: AsyncSession, +) -> bool: + """Check if daily send limit for a channel has been exceeded.""" + # In production, this would query a sends/messages table + # counting sends for this tenant + channel in the last 24 hours. + # For now, we assume within limits since we don't have a sends table. + return True + + +async def _check_whatsapp_opt_in( + phone: str, + tenant_id: str, + db: AsyncSession, +) -> bool: + """Check if a phone number has WhatsApp opt-in recorded.""" + # Check company profiles for WhatsApp number match + result = await db.execute( + select(CompanyProfile).where( + CompanyProfile.tenant_id == tenant_id, + CompanyProfile.whatsapp_number == phone, + ).limit(1) + ) + profile = result.scalar_one_or_none() + if profile: + # Check if twin has opt-in + prefs = profile.deal_preferences or {} + twin_data = prefs.get("twin", {}) + return twin_data.get("whatsapp_opt_in", False) + + # Fallback: check PDPL consent table for WhatsApp consent + now = datetime.now(timezone.utc) + consent_result = await db.execute( + select(func.count()).select_from(PDPLConsent).where( + PDPLConsent.channel == "whatsapp", + PDPLConsent.status == ConsentStatusEnum.GRANTED.value, + PDPLConsent.expires_at > now, + ).limit(1) + ) + return (consent_result.scalar() or 0) > 0 + + +async def _check_session_window( + phone: str, + tenant_id: str, + db: AsyncSession, +) -> bool: + """Check if there's an active 24h WhatsApp session with this number.""" + # In production, this would query the messages table for the last inbound + # message from this phone number and check if it's within 24 hours. + # Without a messages table, we default to False (requiring a template). + return False + + +async def _get_email_metrics( + tenant_id: str, + db: AsyncSession, +) -> dict: + """Get email sending metrics for a tenant.""" + # In production, these would be computed from the sends/events tables. + return { + "bounce_rate": 0.0, + "complaint_rate": 0.0, + "deliverability_score": 0.95, + "sends_today": 0, + "daily_limit": EMAIL_DAILY_LIMIT, + "spf_configured": True, + "dkim_configured": True, + } + + +async def _get_whatsapp_metrics( + tenant_id: str, + db: AsyncSession, +) -> dict: + """Get WhatsApp sending metrics for a tenant.""" + # In production, these would be computed from the sends/events tables. + return { + "block_rate": 0.0, + "opt_in_rate": 0.0, + "template_approval_rate": 1.0, + "sends_today": 0, + "daily_limit": WHATSAPP_DAILY_LIMIT, + "quality_rating": "green", + } diff --git a/salesflow-saas/backend/app/services/strategic_deals/company_twin.py b/salesflow-saas/backend/app/services/strategic_deals/company_twin.py new file mode 100644 index 00000000..810fb980 --- /dev/null +++ b/salesflow-saas/backend/app/services/strategic_deals/company_twin.py @@ -0,0 +1,792 @@ +""" +Company Twin — Deep structured digital twin of a company's identity, capabilities, and needs. +التوأم الرقمي للشركة: ملف تعريفي عميق يصف هوية الشركة وقدراتها واحتياجاتها +""" + +import logging +import uuid +from datetime import datetime, timezone +from typing import Optional + +from pydantic import BaseModel, Field +from sqlalchemy import select +from sqlalchemy.ext.asyncio import AsyncSession + +from app.models.strategic_deal import CompanyProfile +from app.services.llm.provider import get_llm + +logger = logging.getLogger("dealix.strategic_deals.company_twin") + + +# ── Node Models ───────────────────────────────────────────────────────────── + + +class CapabilityNode(BaseModel): + """A single capability that the company can offer to partners.""" + category: str = Field( + ..., + description="service, product, expertise, capacity, distribution, technology", + ) + name: str + name_ar: str + description: str = "" + capacity_available: float = Field( + default=0.5, + ge=0.0, le=1.0, + description="Spare capacity ratio: 0 = fully booked, 1 = fully available", + ) + quality_level: str = Field( + default="standard", + description="premium, standard, or budget", + ) + sectors_served: list[str] = Field(default_factory=list) + geographic_coverage: list[str] = Field( + default_factory=list, + description="Saudi administrative regions covered", + ) + + +class NeedNode(BaseModel): + """A single business need that the company is seeking from partners.""" + category: str = Field( + ..., + description="marketing, sales, delivery, technology, capital, distribution, talent", + ) + name: str + name_ar: str + urgency: str = Field( + default="medium", + description="critical, high, medium, or low", + ) + budget_range_sar: tuple[float, float] = Field( + default=(0.0, 0.0), + description="Min and max SAR budget for this need", + ) + preferred_deal_type: str = "" + description: str = "" + + +class AuthorityMatrix(BaseModel): + """Defines what the AI agent can commit to autonomously vs what requires human approval.""" + auto_approve: list[str] = Field( + default_factory=lambda: [ + "send_intro", + "share_capability_doc", + "schedule_call", + "answer_general_questions", + ], + ) + requires_approval: list[str] = Field( + default_factory=lambda: [ + "pricing_commitment", + "exclusivity", + "equity_discussion", + "revenue_share_terms", + "contract_duration", + ], + ) + forbidden: list[str] = Field( + default_factory=lambda: [ + "sign_contract", + "transfer_funds", + "share_financials", + "grant_data_access", + "commit_legal_terms", + ], + ) + max_commitment_sar: float = Field( + default=0.0, + description="Maximum SAR value the AI may discuss without escalation", + ) + identity_mode: str = Field( + default="transparent_ai", + description="transparent_ai, delegated_sender, or operator_shadow", + ) + + +class CompanyTwin(BaseModel): + """Complete digital twin of a company for the Dealix Deal Exchange OS.""" + twin_id: str = Field(default_factory=lambda: str(uuid.uuid4())) + company_id: str + tenant_id: str + + # Identity + name: str + name_ar: str = "" + industry: str = "" + sub_industry: str = "" + cr_number: str = "" + website: str = "" + size: str = "" # micro, small, medium, large + annual_revenue_sar: float = 0.0 + + # Capability and Need Graphs + capabilities: list[CapabilityNode] = Field(default_factory=list) + needs: list[NeedNode] = Field(default_factory=list) + + # Authority + authority: AuthorityMatrix = Field(default_factory=AuthorityMatrix) + + # Deal preferences + deal_types_allowed: list[str] = Field(default_factory=list) + deal_types_blocked: list[str] = Field(default_factory=list) + sectors_preferred: list[str] = Field(default_factory=list) + sectors_blocked: list[str] = Field(default_factory=list) + min_deal_value_sar: float = 0.0 + max_deal_value_sar: float = 10_000_000.0 + + # Red lines — things AI must never agree to + red_lines: list[str] = Field(default_factory=list) + # Pre-approved marketing claims + approved_claims: list[str] = Field(default_factory=list) + + # Compliance + pdpl_consent_status: str = "pending" # granted, pending, revoked + whatsapp_opt_in: bool = False + email_opt_in: bool = False + + # Metadata + created_at: str = "" + updated_at: str = "" + + +# ── Size Heuristic ────────────────────────────────────────────────────────── + +_SIZE_THRESHOLDS = [ + (10, "micro"), + (50, "small"), + (250, "medium"), +] + + +def _infer_size(employee_count: Optional[float]) -> str: + if employee_count is None or employee_count <= 0: + return "small" + for threshold, label in _SIZE_THRESHOLDS: + if employee_count < threshold: + return label + return "large" + + +# ── Builder ───────────────────────────────────────────────────────────────── + + +class CompanyTwinBuilder: + """ + Constructs, enriches, and manages CompanyTwin instances. + يبني ويثري ويدير التوائم الرقمية للشركات + """ + + def __init__(self): + self.llm = get_llm() + + # ── Build Twin ────────────────────────────────────────────────────────── + + async def build_twin( + self, + company_data: dict, + user_description_ar: str, + db: AsyncSession, + ) -> CompanyTwin: + """ + Build a full CompanyTwin from profile data and an Arabic description. + بناء توأم رقمي كامل من بيانات الشركة ووصف عربي + """ + company_id = str(company_data.get("company_id", company_data.get("id", ""))) + tenant_id = str(company_data.get("tenant_id", "")) + name = company_data.get("company_name", company_data.get("name", "")) + industry = company_data.get("industry", "") + employee_count = company_data.get("employee_count") + + capabilities = await self.extract_capabilities( + description=user_description_ar, + industry=industry, + db=db, + ) + needs = await self.infer_needs( + description=user_description_ar, + capabilities=capabilities, + db=db, + ) + authority = await self.suggest_authority_matrix( + company_size=_infer_size(float(employee_count) if employee_count else None), + industry=industry, + ) + + now_iso = datetime.now(timezone.utc).isoformat() + twin = CompanyTwin( + company_id=company_id, + tenant_id=tenant_id, + name=name, + name_ar=company_data.get("company_name_ar", ""), + industry=industry, + sub_industry=company_data.get("sub_industry", ""), + cr_number=company_data.get("cr_number", ""), + website=company_data.get("website", ""), + size=_infer_size(float(employee_count) if employee_count else None), + annual_revenue_sar=float(company_data.get("annual_revenue_sar", 0) or 0), + capabilities=capabilities, + needs=needs, + authority=authority, + deal_types_allowed=company_data.get("deal_types_allowed", []), + deal_types_blocked=company_data.get("deal_types_blocked", []), + sectors_preferred=company_data.get("sectors_preferred", []), + sectors_blocked=company_data.get("sectors_blocked", []), + min_deal_value_sar=float(company_data.get("min_deal_value_sar", 0) or 0), + max_deal_value_sar=float(company_data.get("max_deal_value_sar", 10_000_000) or 10_000_000), + red_lines=company_data.get("red_lines", []), + approved_claims=company_data.get("approved_claims", []), + pdpl_consent_status=company_data.get("pdpl_consent_status", "pending"), + whatsapp_opt_in=company_data.get("whatsapp_opt_in", False), + email_opt_in=company_data.get("email_opt_in", False), + created_at=now_iso, + updated_at=now_iso, + ) + + # Persist the twin as JSONB on the company profile + profile_result = await db.execute( + select(CompanyProfile).where(CompanyProfile.id == company_id) + ) + profile = profile_result.scalar_one_or_none() + if profile: + existing = dict(profile.deal_preferences or {}) + existing["twin"] = twin.model_dump(mode="json") + profile.deal_preferences = existing + await db.flush() + + logger.info("Built CompanyTwin %s for company %s", twin.twin_id, company_id) + return twin + + # ── Extract Capabilities ──────────────────────────────────────────────── + + async def extract_capabilities( + self, + description: str, + industry: str, + db: AsyncSession, + ) -> list[CapabilityNode]: + """ + Extract structured capability nodes from an Arabic free-text description. + استخراج قدرات مهيكلة من وصف عربي حر + """ + if not description.strip(): + return [] + + system_prompt = """أنت محلل أعمال سعودي متخصص في تحليل قدرات الشركات. +حلل الوصف التالي واستخرج قدرات الشركة بشكل مهيكل. + +أعد النتائج بصيغة JSON: +{ + "capabilities": [ + { + "category": "service|product|expertise|capacity|distribution|technology", + "name": "Capability name in English", + "name_ar": "اسم القدرة بالعربي", + "description": "Brief description", + "capacity_available": 0.0 to 1.0, + "quality_level": "premium|standard|budget", + "sectors_served": ["sector1", "sector2"], + "geographic_coverage": ["الرياض", "المنطقة الشرقية"] + } + ] +} + +قواعد: +- استخرج 3-8 قدرات +- صنف كل قدرة بدقة +- قدر نسبة السعة المتاحة بناءً على السياق +- حدد المناطق الجغرافية إن أمكن +""" + + user_message = f"القطاع: {industry or 'غير محدد'}\n\nالوصف:\n{description}" + + try: + llm_response = await self.llm.complete( + system_prompt=system_prompt, + user_message=user_message, + json_mode=True, + temperature=0.3, + ) + result = llm_response.parse_json() + if not result or "capabilities" not in result: + return [] + nodes = [] + for cap_data in result["capabilities"]: + try: + node = CapabilityNode( + category=cap_data.get("category", "service"), + name=cap_data.get("name", ""), + name_ar=cap_data.get("name_ar", ""), + description=cap_data.get("description", ""), + capacity_available=float(cap_data.get("capacity_available", 0.5)), + quality_level=cap_data.get("quality_level", "standard"), + sectors_served=cap_data.get("sectors_served", []), + geographic_coverage=cap_data.get("geographic_coverage", []), + ) + nodes.append(node) + except Exception as exc: + logger.warning("Skipping malformed capability node: %s", exc) + logger.info("Extracted %d capability nodes from description", len(nodes)) + return nodes + except Exception as exc: + logger.error("Failed to extract capabilities: %s", exc) + return [] + + # ── Infer Needs ───────────────────────────────────────────────────────── + + async def infer_needs( + self, + description: str, + capabilities: list[CapabilityNode], + db: AsyncSession, + ) -> list[NeedNode]: + """ + Infer business needs from description and existing capabilities. + استنتاج احتياجات الشركة من الوصف والقدرات الحالية + """ + if not description.strip(): + return [] + + caps_summary = ", ".join(c.name for c in capabilities) if capabilities else "غير محدد" + + system_prompt = """أنت مستشار أعمال سعودي متخصص في تحليل احتياجات الشركات. +بناءً على الوصف والقدرات الحالية، حدد الاحتياجات التي يمكن أن تسدها شراكة B2B. + +أعد النتائج بصيغة JSON: +{ + "needs": [ + { + "category": "marketing|sales|delivery|technology|capital|distribution|talent", + "name": "Need name in English", + "name_ar": "اسم الاحتياج بالعربي", + "urgency": "critical|high|medium|low", + "budget_range_sar": [min_sar, max_sar], + "preferred_deal_type": "service_barter|referral_partnership|co_selling|subcontracting|etc", + "description": "وصف مختصر" + } + ] +} + +قواعد: +- حدد 2-6 احتياجات واقعية +- لا تكرر القدرات الموجودة كاحتياجات +- قدر مدى الميزانية بالريال السعودي حسب السياق +- اقترح نوع الصفقة المناسب لكل احتياج +""" + + user_message = f"القدرات الحالية: {caps_summary}\n\nالوصف:\n{description}" + + try: + llm_response = await self.llm.complete( + system_prompt=system_prompt, + user_message=user_message, + json_mode=True, + temperature=0.3, + ) + result = llm_response.parse_json() + if not result or "needs" not in result: + return [] + nodes = [] + for need_data in result["needs"]: + try: + budget = need_data.get("budget_range_sar", [0, 0]) + if isinstance(budget, list) and len(budget) == 2: + budget_tuple = (float(budget[0]), float(budget[1])) + else: + budget_tuple = (0.0, 0.0) + node = NeedNode( + category=need_data.get("category", "marketing"), + name=need_data.get("name", ""), + name_ar=need_data.get("name_ar", ""), + urgency=need_data.get("urgency", "medium"), + budget_range_sar=budget_tuple, + preferred_deal_type=need_data.get("preferred_deal_type", ""), + description=need_data.get("description", ""), + ) + nodes.append(node) + except Exception as exc: + logger.warning("Skipping malformed need node: %s", exc) + logger.info("Inferred %d need nodes from description", len(nodes)) + return nodes + except Exception as exc: + logger.error("Failed to infer needs: %s", exc) + return [] + + # ── Suggest Authority Matrix ──────────────────────────────────────────── + + async def suggest_authority_matrix( + self, + company_size: str, + industry: str, + ) -> AuthorityMatrix: + """ + Suggest an authority matrix based on company size and industry. + اقتراح مصفوفة صلاحيات بناءً على حجم الشركة والقطاع + """ + # Base policies by company size + size_policies = { + "micro": { + "max_commitment_sar": 5_000, + "identity_mode": "transparent_ai", + "auto_approve": [ + "send_intro", + "share_capability_doc", + "schedule_call", + "answer_general_questions", + ], + "requires_approval": [ + "pricing_commitment", + "exclusivity", + "equity_discussion", + "revenue_share_terms", + ], + "forbidden": [ + "sign_contract", + "transfer_funds", + "share_financials", + "grant_data_access", + ], + }, + "small": { + "max_commitment_sar": 25_000, + "identity_mode": "transparent_ai", + "auto_approve": [ + "send_intro", + "share_capability_doc", + "schedule_call", + "answer_general_questions", + "send_proposal_draft", + ], + "requires_approval": [ + "pricing_commitment", + "exclusivity", + "equity_discussion", + "revenue_share_terms", + "contract_duration", + ], + "forbidden": [ + "sign_contract", + "transfer_funds", + "share_financials", + "grant_data_access", + "commit_legal_terms", + ], + }, + "medium": { + "max_commitment_sar": 50_000, + "identity_mode": "delegated_sender", + "auto_approve": [ + "send_intro", + "share_capability_doc", + "schedule_call", + "answer_general_questions", + "send_proposal_draft", + "negotiate_minor_terms", + ], + "requires_approval": [ + "pricing_commitment", + "exclusivity", + "equity_discussion", + "revenue_share_terms", + "contract_duration", + "territory_expansion", + ], + "forbidden": [ + "sign_contract", + "transfer_funds", + "share_financials", + "grant_data_access", + "commit_legal_terms", + "share_client_data", + ], + }, + "large": { + "max_commitment_sar": 100_000, + "identity_mode": "delegated_sender", + "auto_approve": [ + "send_intro", + "share_capability_doc", + "schedule_call", + "answer_general_questions", + "send_proposal_draft", + "negotiate_minor_terms", + "send_nda_template", + ], + "requires_approval": [ + "pricing_commitment", + "exclusivity", + "equity_discussion", + "revenue_share_terms", + "contract_duration", + "territory_expansion", + "ip_licensing", + "joint_venture_terms", + ], + "forbidden": [ + "sign_contract", + "transfer_funds", + "share_financials", + "grant_data_access", + "commit_legal_terms", + "share_client_data", + "waive_liability", + ], + }, + } + + policy = size_policies.get(company_size, size_policies["small"]) + + # Industry-specific adjustments for regulated sectors + regulated_industries = {"finance", "healthcare", "energy", "government"} + if industry in regulated_industries: + policy["max_commitment_sar"] = min(policy["max_commitment_sar"], 10_000) + policy["forbidden"].extend([ + "discuss_regulatory_commitments", + "promise_compliance_outcomes", + ]) + # Deduplicate + policy["forbidden"] = list(set(policy["forbidden"])) + + matrix = AuthorityMatrix( + auto_approve=policy["auto_approve"], + requires_approval=policy["requires_approval"], + forbidden=policy["forbidden"], + max_commitment_sar=policy["max_commitment_sar"], + identity_mode=policy["identity_mode"], + ) + logger.info( + "Suggested authority matrix for %s %s company: max_commitment=%.0f SAR", + company_size, industry or "general", matrix.max_commitment_sar, + ) + return matrix + + # ── Update Twin ───────────────────────────────────────────────────────── + + async def update_twin( + self, + twin_id: str, + updates: dict, + db: AsyncSession, + ) -> CompanyTwin: + """ + Apply partial updates to an existing CompanyTwin. + تحديث جزئي للتوأم الرقمي + """ + twin = await self.get_twin_by_id(twin_id, db) + if not twin: + raise ValueError(f"التوأم الرقمي غير موجود: {twin_id}") + + twin_data = twin.model_dump(mode="json") + + # Apply updates, preserving existing values for keys not in updates + for key, value in updates.items(): + if key in twin_data and key not in ("twin_id", "company_id", "tenant_id", "created_at"): + twin_data[key] = value + + twin_data["updated_at"] = datetime.now(timezone.utc).isoformat() + updated_twin = CompanyTwin(**twin_data) + + # Persist + profile_result = await db.execute( + select(CompanyProfile).where(CompanyProfile.id == updated_twin.company_id) + ) + profile = profile_result.scalar_one_or_none() + if profile: + existing = dict(profile.deal_preferences or {}) + existing["twin"] = updated_twin.model_dump(mode="json") + profile.deal_preferences = existing + await db.flush() + + logger.info("Updated CompanyTwin %s", twin_id) + return updated_twin + + # ── Get Twin ──────────────────────────────────────────────────────────── + + async def get_twin( + self, + company_id: str, + db: AsyncSession, + ) -> Optional[CompanyTwin]: + """ + Retrieve the CompanyTwin for a given company. + استرجاع التوأم الرقمي لشركة معينة + """ + profile_result = await db.execute( + select(CompanyProfile).where(CompanyProfile.id == company_id) + ) + profile = profile_result.scalar_one_or_none() + if not profile: + logger.warning("Company profile not found: %s", company_id) + return None + + prefs = profile.deal_preferences or {} + twin_data = prefs.get("twin") + if not twin_data: + logger.info("No twin found for company %s", company_id) + return None + + try: + return CompanyTwin(**twin_data) + except Exception as exc: + logger.error("Failed to deserialize twin for company %s: %s", company_id, exc) + return None + + async def get_twin_by_id( + self, + twin_id: str, + db: AsyncSession, + ) -> Optional[CompanyTwin]: + """ + Retrieve a CompanyTwin by its twin_id (scans all profiles). + استرجاع التوأم الرقمي برقمه المعرف + """ + all_profiles = await db.execute(select(CompanyProfile)) + for profile in all_profiles.scalars(): + prefs = profile.deal_preferences or {} + twin_data = prefs.get("twin") + if twin_data and twin_data.get("twin_id") == twin_id: + try: + return CompanyTwin(**twin_data) + except Exception: + continue + return None + + # ── Deal Readiness Report ─────────────────────────────────────────────── + + async def get_deal_readiness_report( + self, + twin_id: str, + db: AsyncSession, + ) -> dict: + """ + Generate an Arabic deal-readiness report for the company twin. + إنشاء تقرير جاهزية الصفقات بالعربي للتوأم الرقمي + """ + twin = await self.get_twin_by_id(twin_id, db) + if not twin: + raise ValueError(f"التوأم الرقمي غير موجود: {twin_id}") + + issues: list[str] = [] + score = 0.0 + max_score = 100.0 + + # 1. Capabilities completeness (0-25) + cap_count = len(twin.capabilities) + if cap_count == 0: + issues.append("لم يتم تحديد أي قدرات للشركة — أضف قدراتك لتحسين فرص المطابقة") + cap_score = 0.0 + elif cap_count < 3: + issues.append(f"لديك {cap_count} قدرات فقط — يفضل إضافة 3 قدرات على الأقل") + cap_score = cap_count * 8.0 + else: + cap_score = 25.0 + score += cap_score + + # 2. Needs clarity (0-20) + need_count = len(twin.needs) + if need_count == 0: + issues.append("لم يتم تحديد أي احتياجات — حدد احتياجاتك ليتمكن النظام من إيجاد شركاء") + need_score = 0.0 + elif need_count < 2: + issues.append(f"لديك احتياج واحد فقط — أضف المزيد لتوسيع خيارات الشراكة") + need_score = 10.0 + else: + need_score = 20.0 + score += need_score + + # 3. Authority matrix configured (0-15) + authority_score = 0.0 + if twin.authority.max_commitment_sar > 0: + authority_score += 5.0 + else: + issues.append("لم يتم تحديد حد أقصى لصلاحيات الذكاء الاصطناعي") + if len(twin.authority.auto_approve) > 0: + authority_score += 5.0 + if len(twin.authority.forbidden) > 0: + authority_score += 5.0 + else: + issues.append("لم يتم تحديد الإجراءات المحظورة — مهم للحماية") + score += authority_score + + # 4. Compliance readiness (0-20) + compliance_score = 0.0 + if twin.pdpl_consent_status == "granted": + compliance_score += 10.0 + else: + issues.append("موافقة نظام حماية البيانات الشخصية (PDPL) غير مكتملة") + if twin.whatsapp_opt_in: + compliance_score += 5.0 + else: + issues.append("لم يتم تفعيل الموافقة على التواصل عبر واتساب") + if twin.email_opt_in: + compliance_score += 5.0 + else: + issues.append("لم يتم تفعيل الموافقة على التواصل عبر البريد الإلكتروني") + score += compliance_score + + # 5. Deal preferences set (0-10) + pref_score = 0.0 + if twin.deal_types_allowed: + pref_score += 5.0 + else: + issues.append("لم يتم تحديد أنواع الصفقات المسموحة") + if twin.red_lines: + pref_score += 5.0 + else: + issues.append("لم يتم تحديد الخطوط الحمراء — يُنصح بتحديدها لحماية مصالحك") + score += pref_score + + # 6. Profile completeness (0-10) + profile_score = 0.0 + if twin.cr_number: + profile_score += 3.0 + else: + issues.append("أضف رقم السجل التجاري لزيادة الموثوقية") + if twin.website: + profile_score += 2.0 + if twin.name_ar: + profile_score += 2.0 + if twin.annual_revenue_sar > 0: + profile_score += 3.0 + score += profile_score + + # Readiness level + if score >= 80: + readiness = "جاهز للصفقات" + readiness_level = "high" + elif score >= 50: + readiness = "يحتاج تحسين بسيط" + readiness_level = "medium" + else: + readiness = "يحتاج اهتمام عاجل" + readiness_level = "low" + + report = { + "twin_id": twin.twin_id, + "company_name": twin.name, + "company_name_ar": twin.name_ar, + "score": round(score, 1), + "max_score": max_score, + "readiness_level": readiness_level, + "readiness_label_ar": readiness, + "breakdown": { + "capabilities": round(cap_score, 1), + "needs": round(need_score, 1), + "authority": round(authority_score, 1), + "compliance": round(compliance_score, 1), + "deal_preferences": round(pref_score, 1), + "profile": round(profile_score, 1), + }, + "issues_ar": issues, + "summary_ar": ( + f"شركة {twin.name_ar or twin.name}: " + f"درجة الجاهزية {score:.0f}/100 — {readiness}. " + + (f"يوجد {len(issues)} ملاحظات تحتاج معالجة." if issues else "الملف مكتمل وجاهز.") + ), + } + + logger.info( + "Deal readiness report for twin %s: score=%.1f level=%s", + twin_id, score, readiness_level, + ) + return report diff --git a/salesflow-saas/backend/app/services/strategic_deals/deal_room.py b/salesflow-saas/backend/app/services/strategic_deals/deal_room.py new file mode 100644 index 00000000..2d5ce9b1 --- /dev/null +++ b/salesflow-saas/backend/app/services/strategic_deals/deal_room.py @@ -0,0 +1,674 @@ +""" +Deal Room — Central workspace for managing an active B2B deal through all stages. +غرفة الصفقة: مساحة العمل المركزية لإدارة صفقة B2B عبر جميع المراحل +""" + +import logging +import uuid +from datetime import datetime, timezone +from typing import Optional + +from pydantic import BaseModel, Field +from sqlalchemy import select +from sqlalchemy.ext.asyncio import AsyncSession + +from app.models.strategic_deal import StrategicDeal, CompanyProfile, DealStatus +from app.services.llm.provider import get_llm + +logger = logging.getLogger("dealix.strategic_deals.deal_room") + + +# ── Room Stages ───────────────────────────────────────────────────────────── + +ROOM_STAGES = [ + "discovery", + "qualification", + "proposal", + "negotiation", + "legal", + "approval", + "closed_won", + "closed_lost", +] + +STAGE_LABELS_AR = { + "discovery": "اكتشاف", + "qualification": "تأهيل", + "proposal": "مقترح", + "negotiation": "تفاوض", + "legal": "مراجعة قانونية", + "approval": "موافقة", + "closed_won": "تمت بنجاح", + "closed_lost": "لم تتم", +} + + +# ── Pydantic Models ───────────────────────────────────────────────────────── + + +class ConcessionRecord(BaseModel): + """Record of a single concession given or received.""" + what: str + value_sar: float = 0.0 + direction: str = "given" # given or received + timestamp: str = "" + rationale: str = "" + + +class ApprovalRequest(BaseModel): + """An approval request within a deal room.""" + approval_id: str = Field(default_factory=lambda: str(uuid.uuid4())) + action: str + details: str = "" + requested_by: str = "ai_agent" + requested_at: str = "" + status: str = "pending" # pending, granted, denied + decided_by: str = "" + decided_at: str = "" + notes: str = "" + + +class AuditEntry(BaseModel): + """Immutable audit log entry.""" + timestamp: str + actor: str # user_id or "ai_agent" + action: str + details: str = "" + metadata: dict = Field(default_factory=dict) + + +class RoomMessage(BaseModel): + """A message within the deal room conversation.""" + message_id: str = Field(default_factory=lambda: str(uuid.uuid4())) + direction: str # inbound or outbound + channel: str # email, whatsapp, internal + content: str + sender: str = "" + timestamp: str = "" + metadata: dict = Field(default_factory=dict) + + +class DealRoom(BaseModel): + """ + Central workspace for managing a B2B deal. + غرفة الصفقة: مساحة العمل المركزية لصفقة B2B + """ + room_id: str = Field(default_factory=lambda: str(uuid.uuid4())) + deal_id: str + tenant_id: str + + # Parties + our_twin_id: str = "" + their_profile: dict = Field(default_factory=dict) + + # Deal context + deal_type: str = "" + hypothesis: str = "" # Why this deal makes sense (Arabic) + mutual_value: dict = Field( + default_factory=lambda: {"us": [], "them": []}, + ) + + # Negotiation state + current_offer: dict = Field(default_factory=dict) + their_last_response: dict = Field(default_factory=dict) + concessions_made: list[ConcessionRecord] = Field(default_factory=list) + concessions_received: list[ConcessionRecord] = Field(default_factory=list) + batna: dict = Field(default_factory=dict) # Best alternative if deal fails + walk_away_threshold: dict = Field(default_factory=dict) + + # Conversation + messages: list[RoomMessage] = Field(default_factory=list) + channel: str = "email" + + # Status + stage: str = "discovery" + blockers: list[str] = Field(default_factory=list) + next_action: str = "" + next_action_ar: str = "" + + # Governance + approvals_pending: list[ApprovalRequest] = Field(default_factory=list) + approvals_granted: list[ApprovalRequest] = Field(default_factory=list) + red_line_violations: list[dict] = Field(default_factory=list) + audit_log: list[AuditEntry] = Field(default_factory=list) + + # Metadata + created_at: str = "" + updated_at: str = "" + + +# ── Service ───────────────────────────────────────────────────────────────── + + +class DealRoomService: + """ + Manages DealRoom lifecycle: creation, stage transitions, messaging, governance. + إدارة دورة حياة غرفة الصفقة: الإنشاء، والانتقال بين المراحل، والرسائل، والحوكمة + """ + + def __init__(self): + self.llm = get_llm() + + # ── Create Room ───────────────────────────────────────────────────────── + + async def create_room( + self, + deal_id: str, + our_twin_id: str, + their_profile: dict, + db: AsyncSession, + ) -> DealRoom: + """ + Create a new deal room linked to a StrategicDeal. + إنشاء غرفة صفقة جديدة مرتبطة بصفقة استراتيجية + """ + deal_result = await db.execute( + select(StrategicDeal).where(StrategicDeal.id == deal_id) + ) + deal = deal_result.scalar_one_or_none() + if not deal: + raise ValueError(f"الصفقة غير موجودة: {deal_id}") + + now_iso = datetime.now(timezone.utc).isoformat() + room = DealRoom( + deal_id=str(deal.id), + tenant_id=str(deal.tenant_id), + our_twin_id=our_twin_id, + their_profile=their_profile, + deal_type=deal.deal_type or "", + channel=deal.channel or "email", + stage="discovery", + next_action="research_target", + next_action_ar="بحث عن الطرف الآخر وتحليل احتياجاته", + created_at=now_iso, + updated_at=now_iso, + audit_log=[ + AuditEntry( + timestamp=now_iso, + actor="ai_agent", + action="room_created", + details=f"غرفة صفقة جديدة — نوع: {deal.deal_type or 'غير محدد'}", + ), + ], + ) + + # Persist room data on the deal + history = list(deal.negotiation_history or []) + history.append({ + "round": 0, + "action": "room_created", + "room_id": room.room_id, + "timestamp": now_iso, + }) + deal.negotiation_history = history + + # Store room in proposed_terms as a nested structure + existing_terms = dict(deal.proposed_terms or {}) + existing_terms["_deal_room"] = room.model_dump(mode="json") + deal.proposed_terms = existing_terms + await db.flush() + + logger.info("Created deal room %s for deal %s", room.room_id, deal_id) + return room + + # ── Load Room ─────────────────────────────────────────────────────────── + + async def _load_room(self, room_id: str, db: AsyncSession) -> tuple[DealRoom, StrategicDeal]: + """Load a DealRoom and its parent StrategicDeal.""" + # Scan deals for the room + all_deals = await db.execute(select(StrategicDeal)) + for deal in all_deals.scalars(): + terms = deal.proposed_terms or {} + room_data = terms.get("_deal_room") + if room_data and room_data.get("room_id") == room_id: + return DealRoom(**room_data), deal + raise ValueError(f"غرفة الصفقة غير موجودة: {room_id}") + + async def _persist_room(self, room: DealRoom, deal: StrategicDeal, db: AsyncSession): + """Persist the room state back onto the deal.""" + room.updated_at = datetime.now(timezone.utc).isoformat() + existing_terms = dict(deal.proposed_terms or {}) + existing_terms["_deal_room"] = room.model_dump(mode="json") + deal.proposed_terms = existing_terms + await db.flush() + + # ── Update Stage ──────────────────────────────────────────────────────── + + async def update_stage( + self, + room_id: str, + new_stage: str, + reason: str, + db: AsyncSession, + ): + """ + Transition the deal room to a new stage with audit logging. + نقل غرفة الصفقة إلى مرحلة جديدة مع تسجيل في سجل المراجعة + """ + if new_stage not in ROOM_STAGES: + raise ValueError(f"مرحلة غير صالحة: {new_stage}. المراحل المتاحة: {', '.join(ROOM_STAGES)}") + + room, deal = await self._load_room(room_id, db) + old_stage = room.stage + + # Validate forward-only transition (except to closed_lost which can happen from any stage) + if new_stage != "closed_lost": + old_idx = ROOM_STAGES.index(old_stage) if old_stage in ROOM_STAGES else 0 + new_idx = ROOM_STAGES.index(new_stage) + if new_idx < old_idx: + raise ValueError( + f"لا يمكن الرجوع من {STAGE_LABELS_AR.get(old_stage, old_stage)} " + f"إلى {STAGE_LABELS_AR.get(new_stage, new_stage)}" + ) + + room.stage = new_stage + now_iso = datetime.now(timezone.utc).isoformat() + room.audit_log.append( + AuditEntry( + timestamp=now_iso, + actor="ai_agent", + action="stage_changed", + details=f"انتقال من {STAGE_LABELS_AR.get(old_stage, old_stage)} إلى {STAGE_LABELS_AR.get(new_stage, new_stage)}: {reason}", + metadata={"old_stage": old_stage, "new_stage": new_stage}, + ) + ) + + # Sync deal status + stage_to_status = { + "discovery": DealStatus.DISCOVERY.value, + "qualification": DealStatus.DISCOVERY.value, + "proposal": DealStatus.OUTREACH.value, + "negotiation": DealStatus.NEGOTIATING.value, + "legal": DealStatus.TERM_SHEET.value, + "approval": DealStatus.DUE_DILIGENCE.value, + "closed_won": DealStatus.CLOSED_WON.value, + "closed_lost": DealStatus.CLOSED_LOST.value, + } + mapped_status = stage_to_status.get(new_stage) + if mapped_status: + deal.status = mapped_status + if new_stage in ("closed_won", "closed_lost"): + deal.closed_at = datetime.now(timezone.utc) + + await self._persist_room(room, deal, db) + logger.info("Room %s stage: %s -> %s (%s)", room_id, old_stage, new_stage, reason) + + # ── Add Message ───────────────────────────────────────────────────────── + + async def add_message( + self, + room_id: str, + message: str, + direction: str, + channel: str, + db: AsyncSession, + ): + """ + Record a message in the deal room conversation. + تسجيل رسالة في محادثة غرفة الصفقة + """ + room, deal = await self._load_room(room_id, db) + now_iso = datetime.now(timezone.utc).isoformat() + + room.messages.append( + RoomMessage( + direction=direction, + channel=channel, + content=message, + sender="ai_agent" if direction == "outbound" else "counterparty", + timestamp=now_iso, + ) + ) + + if direction == "inbound": + room.their_last_response = { + "content": message, + "channel": channel, + "timestamp": now_iso, + } + + room.audit_log.append( + AuditEntry( + timestamp=now_iso, + actor="ai_agent" if direction == "outbound" else "counterparty", + action=f"message_{direction}", + details=message[:200], + metadata={"channel": channel}, + ) + ) + + await self._persist_room(room, deal, db) + logger.info("Added %s message to room %s via %s", direction, room_id, channel) + + # ── Record Concession ─────────────────────────────────────────────────── + + async def record_concession( + self, + room_id: str, + what: str, + value: float, + db: AsyncSession, + ): + """ + Record a concession made during negotiation. + تسجيل تنازل تم خلال التفاوض + """ + room, deal = await self._load_room(room_id, db) + now_iso = datetime.now(timezone.utc).isoformat() + + record = ConcessionRecord( + what=what, + value_sar=value, + direction="given", + timestamp=now_iso, + ) + room.concessions_made.append(record) + + room.audit_log.append( + AuditEntry( + timestamp=now_iso, + actor="ai_agent", + action="concession_made", + details=f"تنازل: {what} (قيمة: {value:,.0f} ريال)", + metadata={"value_sar": value}, + ) + ) + + await self._persist_room(room, deal, db) + logger.info("Recorded concession in room %s: %s (%.0f SAR)", room_id, what, value) + + # ── Check Red Lines ───────────────────────────────────────────────────── + + async def check_red_lines( + self, + room_id: str, + proposed_terms: dict, + db: AsyncSession, + ) -> list[str]: + """ + Check proposed terms against the company's red lines. + التحقق من الشروط المقترحة مقابل الخطوط الحمراء للشركة + """ + room, deal = await self._load_room(room_id, db) + + # Load the company twin to get red lines + from app.services.strategic_deals.company_twin import CompanyTwinBuilder + builder = CompanyTwinBuilder() + twin = None + + if room.our_twin_id: + twin = await builder.get_twin_by_id(room.our_twin_id, db) + + if not twin: + # Try loading by company_id from the deal initiator + if deal.initiator_profile_id: + twin = await builder.get_twin(str(deal.initiator_profile_id), db) + + red_lines = twin.red_lines if twin else [] + if not red_lines: + return [] + + violations: list[str] = [] + terms_text = str(proposed_terms).lower() + + # Static keyword check + keyword_checks = { + "حصرية": "exclusivity", + "حقوق ملكية": "equity", + "ضمان": "guarantee", + "تعويض": "compensation", + "غرامة": "penalty", + } + + for red_line in red_lines: + red_line_lower = red_line.lower() + # Direct keyword match + if red_line_lower in terms_text: + violations.append(f"خط أحمر: {red_line}") + continue + # Check Arabic keywords + for ar_kw, en_kw in keyword_checks.items(): + if ar_kw in red_line_lower and en_kw in terms_text: + violations.append(f"خط أحمر: {red_line}") + break + + # If there are potential concerns, use LLM for deeper analysis + if not violations and red_lines: + system_prompt = """أنت مراجع عقود سعودي. تحقق من الشروط المقترحة مقابل الخطوط الحمراء. + +أعد النتائج بصيغة JSON: +{ + "violations": ["وصف الانتهاك 1", "وصف الانتهاك 2"], + "warnings": ["تحذير 1"] +} + +إذا لم يكن هناك انتهاكات، أعد قوائم فارغة.""" + + context = ( + f"الخطوط الحمراء:\n" + "\n".join(f"- {rl}" for rl in red_lines) + + f"\n\nالشروط المقترحة:\n{str(proposed_terms)}" + ) + + try: + llm_response = await self.llm.complete( + system_prompt=system_prompt, + user_message=context, + json_mode=True, + temperature=0.1, + ) + result = llm_response.parse_json() + if result and result.get("violations"): + violations.extend(result["violations"]) + except Exception as exc: + logger.warning("LLM red-line check failed: %s", exc) + + # Record violations + if violations: + now_iso = datetime.now(timezone.utc).isoformat() + for v in violations: + room.red_line_violations.append({ + "violation": v, + "proposed_terms": proposed_terms, + "timestamp": now_iso, + }) + room.audit_log.append( + AuditEntry( + timestamp=now_iso, + actor="ai_agent", + action="red_line_violation", + details=f"تم اكتشاف {len(violations)} انتهاك للخطوط الحمراء", + metadata={"violations": violations}, + ) + ) + await self._persist_room(room, deal, db) + + logger.info("Red line check for room %s: %d violations", room_id, len(violations)) + return violations + + # ── Request Approval ──────────────────────────────────────────────────── + + async def request_approval( + self, + room_id: str, + action: str, + details: str, + db: AsyncSession, + ) -> str: + """ + Create an approval request that pauses AI action until a human decides. + إنشاء طلب موافقة يوقف عمل الذكاء الاصطناعي حتى يقرر إنسان + """ + room, deal = await self._load_room(room_id, db) + now_iso = datetime.now(timezone.utc).isoformat() + + approval = ApprovalRequest( + action=action, + details=details, + requested_at=now_iso, + ) + room.approvals_pending.append(approval) + + room.blockers.append(f"بانتظار موافقة على: {action}") + room.audit_log.append( + AuditEntry( + timestamp=now_iso, + actor="ai_agent", + action="approval_requested", + details=f"طلب موافقة: {action} — {details}", + metadata={"approval_id": approval.approval_id}, + ) + ) + + await self._persist_room(room, deal, db) + logger.info("Approval requested in room %s: %s (id=%s)", room_id, action, approval.approval_id) + return approval.approval_id + + # ── Grant Approval ────────────────────────────────────────────────────── + + async def grant_approval( + self, + room_id: str, + approval_id: str, + user_id: str, + db: AsyncSession, + ): + """ + Grant a pending approval request. + منح موافقة على طلب معلق + """ + room, deal = await self._load_room(room_id, db) + now_iso = datetime.now(timezone.utc).isoformat() + + granted = None + remaining_pending = [] + for req in room.approvals_pending: + if req.approval_id == approval_id: + req.status = "granted" + req.decided_by = user_id + req.decided_at = now_iso + granted = req + else: + remaining_pending.append(req) + + if not granted: + raise ValueError(f"طلب الموافقة غير موجود: {approval_id}") + + room.approvals_pending = remaining_pending + room.approvals_granted.append(granted) + + # Remove related blocker + blocker_prefix = f"بانتظار موافقة على: {granted.action}" + room.blockers = [b for b in room.blockers if b != blocker_prefix] + + room.audit_log.append( + AuditEntry( + timestamp=now_iso, + actor=user_id, + action="approval_granted", + details=f"تمت الموافقة على: {granted.action}", + metadata={"approval_id": approval_id}, + ) + ) + + await self._persist_room(room, deal, db) + logger.info("Approval %s granted by %s in room %s", approval_id, user_id, room_id) + + # ── Deal Summary ──────────────────────────────────────────────────────── + + async def get_deal_summary( + self, + room_id: str, + db: AsyncSession, + ) -> dict: + """ + Generate an Arabic summary of the deal room status. + إنشاء ملخص عربي لحالة غرفة الصفقة + """ + room, deal = await self._load_room(room_id, db) + + # Gather data for LLM summary + msg_count = len(room.messages) + inbound = sum(1 for m in room.messages if m.direction == "inbound") + outbound = msg_count - inbound + concessions_given = len(room.concessions_made) + concessions_got = len(room.concessions_received) + total_concession_value = sum(c.value_sar for c in room.concessions_made) + pending_approvals = len(room.approvals_pending) + violations = len(room.red_line_violations) + + stage_ar = STAGE_LABELS_AR.get(room.stage, room.stage) + their_name = room.their_profile.get("company_name", room.their_profile.get("name", "الطرف الآخر")) + + summary = { + "room_id": room.room_id, + "deal_id": room.deal_id, + "stage": room.stage, + "stage_ar": stage_ar, + "their_name": their_name, + "deal_type": room.deal_type, + "channel": room.channel, + "statistics": { + "total_messages": msg_count, + "inbound_messages": inbound, + "outbound_messages": outbound, + "concessions_given": concessions_given, + "concessions_received": concessions_got, + "total_concession_value_sar": total_concession_value, + "pending_approvals": pending_approvals, + "red_line_violations": violations, + }, + "blockers": room.blockers, + "next_action": room.next_action, + "next_action_ar": room.next_action_ar, + "current_offer": room.current_offer, + "their_last_response": room.their_last_response, + "summary_ar": ( + f"صفقة مع {their_name} — المرحلة: {stage_ar}\n" + f"عدد الرسائل: {msg_count} ({inbound} واردة، {outbound} صادرة)\n" + f"التنازلات المقدمة: {concessions_given} (بقيمة {total_concession_value:,.0f} ريال)\n" + + (f"موافقات معلقة: {pending_approvals}\n" if pending_approvals else "") + + (f"تحذير: {violations} انتهاك للخطوط الحمراء\n" if violations else "") + + (f"الخطوة التالية: {room.next_action_ar}" if room.next_action_ar else "") + ), + } + + logger.info("Generated deal summary for room %s", room_id) + return summary + + # ── Get Rooms ─────────────────────────────────────────────────────────── + + async def get_rooms( + self, + tenant_id: str, + stage: Optional[str] = None, + db: AsyncSession = None, + ) -> list[DealRoom]: + """ + List all deal rooms for a tenant, optionally filtered by stage. + عرض جميع غرف الصفقات لمستأجر معين مع إمكانية الفلترة بالمرحلة + """ + query = select(StrategicDeal).where( + StrategicDeal.tenant_id == tenant_id + ) + result = await db.execute(query) + deals = result.scalars().all() + + rooms: list[DealRoom] = [] + for deal in deals: + terms = deal.proposed_terms or {} + room_data = terms.get("_deal_room") + if not room_data: + continue + try: + room = DealRoom(**room_data) + if stage and room.stage != stage: + continue + rooms.append(room) + except Exception as exc: + logger.warning("Failed to deserialize room from deal %s: %s", deal.id, exc) + + logger.info( + "Retrieved %d deal rooms for tenant %s (stage=%s)", + len(rooms), tenant_id, stage or "all", + ) + return rooms diff --git a/salesflow-saas/backend/app/services/strategic_deals/deal_taxonomy.py b/salesflow-saas/backend/app/services/strategic_deals/deal_taxonomy.py new file mode 100644 index 00000000..9d69aa23 --- /dev/null +++ b/salesflow-saas/backend/app/services/strategic_deals/deal_taxonomy.py @@ -0,0 +1,573 @@ +""" +Deal Taxonomy — Complete taxonomy of 15 B2B deal types with templates and qualification flows. +تصنيف الصفقات: 15 نوعاً من صفقات الشراكات بين الشركات مع قوالب وأسئلة تأهيل +""" + +import logging +from typing import Optional + +from pydantic import BaseModel, Field + +logger = logging.getLogger("dealix.strategic_deals.taxonomy") + + +# ── Taxonomy Schema ───────────────────────────────────────────────────────── + + +class DealTypeSpec(BaseModel): + """Full specification for a deal type in the taxonomy.""" + id: str + name: str + name_ar: str + description: str + description_ar: str + qualification_questions: list[str] # Arabic questions + typical_terms: list[str] + risk_level: str # low, medium, high + approval_level: str # mode_0 through mode_4 + need_categories: list[str] # Which need categories this deal type addresses + example_ar: str # Real-world Saudi example + + +# ── The 15-Type Taxonomy ──────────────────────────────────────────────────── + +DEAL_TAXONOMY: dict[str, dict] = { + "service_barter": { + "name": "Service-for-Service Exchange", + "name_ar": "تبادل خدمات", + "description": "Exchange services of equivalent value without cash transactions", + "description_ar": "تبادل خدمات بقيمة متساوية بين شركتين بدون تدفقات نقدية", + "qualification_questions": [ + "ما الخدمة التي تقدمونها للتبادل؟", + "ما القيمة التقديرية لهذه الخدمة بالريال السعودي؟", + "ما الخدمة التي تحتاجونها بالمقابل؟", + "ما المدة المتوقعة لهذا التبادل؟", + "هل لديكم خبرة سابقة في تبادل الخدمات؟", + ], + "typical_terms": [ + "duration", + "scope", + "quality_sla", + "cancellation", + "value_equivalence_method", + "dispute_resolution", + ], + "risk_level": "low", + "approval_level": "mode_2", + "need_categories": ["marketing", "technology", "delivery"], + "example_ar": "شركة تسويق تقدم حملات رقمية لشركة برمجيات مقابل تطوير موقع إلكتروني", + }, + "referral_partnership": { + "name": "Referral Partnership", + "name_ar": "شراكة إحالة", + "description": "Earn commission by referring qualified leads to each other", + "description_ar": "كسب عمولة من خلال إحالة عملاء مؤهلين بين الشركتين", + "qualification_questions": [ + "ما نوع العملاء الذين تحيلونهم عادة؟", + "ما نسبة العمولة المتوقعة؟", + "كيف يتم تتبع الإحالات؟", + "ما هو متوسط حجم الصفقة لعملائكم؟", + ], + "typical_terms": [ + "commission_rate", + "tracking_method", + "payment_schedule", + "exclusivity", + "minimum_referrals", + "non_compete", + ], + "risk_level": "low", + "approval_level": "mode_2", + "need_categories": ["sales", "marketing"], + "example_ar": "مكتب محاماة يحيل عملاءه لشركة محاسبة مقابل 10% من قيمة أول عقد", + }, + "co_selling": { + "name": "Co-Selling Agreement", + "name_ar": "بيع مشترك", + "description": "Joint sales efforts targeting shared opportunities", + "description_ar": "جهود بيع مشتركة لاستهداف فرص مشتركة بين الشركتين", + "qualification_questions": [ + "ما المنتجات أو الخدمات التي ستباع بشكل مشترك؟", + "كيف سيتم تقسيم الإيرادات؟", + "من يقود عملية البيع؟", + "ما القطاعات المستهدفة؟", + "هل لديكم فريق مبيعات مخصص لهذا الغرض؟", + ], + "typical_terms": [ + "revenue_split", + "lead_ownership", + "territory", + "sales_process", + "brand_usage", + "training_requirements", + ], + "risk_level": "medium", + "approval_level": "mode_3", + "need_categories": ["sales", "distribution"], + "example_ar": "شركة برمجيات وشركة استشارات يبيعون حلولاً متكاملة لقطاع الصحة", + }, + "co_marketing": { + "name": "Co-Marketing Campaign", + "name_ar": "تسويق مشترك", + "description": "Joint marketing campaigns sharing costs and audiences", + "description_ar": "حملات تسويقية مشتركة مع تقاسم التكاليف والجمهور المستهدف", + "qualification_questions": [ + "ما القنوات التسويقية المستهدفة؟", + "ما الميزانية المتوقعة من كل طرف؟", + "من الجمهور المستهدف المشترك؟", + "ما مؤشرات النجاح المتفق عليها؟", + ], + "typical_terms": [ + "budget_split", + "channels", + "brand_guidelines", + "content_approval", + "lead_sharing", + "duration", + ], + "risk_level": "low", + "approval_level": "mode_2", + "need_categories": ["marketing"], + "example_ar": "شركتا تقنية تشتركان في رعاية مؤتمر قطاع التجزئة وتتقاسمان العملاء المحتملين", + }, + "subcontracting": { + "name": "Subcontracting Agreement", + "name_ar": "عقد باطن (مقاولة فرعية)", + "description": "Outsource specific project scope to a specialized partner", + "description_ar": "إسناد جزء من نطاق المشروع لشريك متخصص كمقاول فرعي", + "qualification_questions": [ + "ما نطاق العمل المطلوب إسناده؟", + "ما المهارات والشهادات المطلوبة؟", + "ما الجدول الزمني للتسليم؟", + "هل المشروع حكومي أو خاص؟", + "ما شروط الضمان والجودة؟", + ], + "typical_terms": [ + "scope_of_work", + "payment_milestones", + "quality_standards", + "liability", + "insurance", + "confidentiality", + "penalties", + ], + "risk_level": "medium", + "approval_level": "mode_3", + "need_categories": ["delivery", "talent"], + "example_ar": "شركة مقاولات كبرى تسند أعمال الكهرباء لشركة متخصصة في مشروع حكومي", + }, + "white_label": { + "name": "White-Label / Private Label", + "name_ar": "علامة بيضاء", + "description": "Provide products or services under the partner's brand", + "description_ar": "تقديم منتجات أو خدمات تحت العلامة التجارية للشريك", + "qualification_questions": [ + "ما المنتج أو الخدمة المراد تقديمها تحت علامتهم؟", + "ما مستوى التخصيص المطلوب؟", + "كيف سيتم التسعير والهوامش؟", + "ما متطلبات الجودة والدعم الفني؟", + ], + "typical_terms": [ + "branding_rights", + "customization_scope", + "pricing_structure", + "minimum_volume", + "exclusivity", + "support_sla", + "ip_ownership", + ], + "risk_level": "medium", + "approval_level": "mode_3", + "need_categories": ["technology", "delivery"], + "example_ar": "شركة برمجيات سعودية توفر نظام CRM تحت العلامة التجارية لشركة اتصالات", + }, + "reseller": { + "name": "Reseller Agreement", + "name_ar": "اتفاقية موزع معتمد", + "description": "Authorized resale of products or services with margin", + "description_ar": "إعادة بيع منتجات أو خدمات الشريك بصفة موزع معتمد مع هامش ربح", + "qualification_questions": [ + "ما المنتجات المراد توزيعها؟", + "ما المنطقة الجغرافية المستهدفة؟", + "هل التوزيع حصري أم غير حصري؟", + "ما هامش الربح المتوقع؟", + "ما حجم المبيعات المتوقع سنوياً؟", + ], + "typical_terms": [ + "territory", + "exclusivity", + "margin_structure", + "minimum_purchase", + "payment_terms", + "marketing_support", + "training", + "return_policy", + ], + "risk_level": "medium", + "approval_level": "mode_3", + "need_categories": ["distribution", "sales"], + "example_ar": "شركة سعودية توزع حلول أمن سيبراني لشركة أمريكية في منطقة الخليج", + }, + "strategic_alliance": { + "name": "Strategic Alliance", + "name_ar": "تحالف استراتيجي", + "description": "Long-term strategic collaboration without equity exchange", + "description_ar": "تعاون استراتيجي طويل الأمد بدون تبادل حصص ملكية", + "qualification_questions": [ + "ما الأهداف الاستراتيجية المشتركة؟", + "ما مدة التحالف المتوقعة؟", + "كيف ستتم الحوكمة واتخاذ القرارات؟", + "ما الموارد التي سيساهم بها كل طرف؟", + "هل هناك اتفاقيات عدم منافسة؟", + ], + "typical_terms": [ + "strategic_objectives", + "governance_structure", + "resource_commitments", + "non_compete", + "exit_terms", + "ip_sharing", + "confidentiality", + ], + "risk_level": "high", + "approval_level": "mode_4", + "need_categories": ["capital", "distribution", "technology"], + "example_ar": "شركة لوجستية وشركة تقنية يتحالفان لتقديم حلول سلسلة إمداد ذكية للسوق السعودي", + }, + "channel_partnership": { + "name": "Channel Partnership", + "name_ar": "شراكة قنوات توزيع", + "description": "Leverage partner's sales channels for distribution", + "description_ar": "الاستفادة من قنوات بيع الشريك لتوزيع منتجاتك وخدماتك", + "qualification_questions": [ + "ما القنوات التي يمتلكها الشريك؟", + "ما حجم قاعدة عملائهم؟", + "كيف سيتم تقسيم المسؤوليات؟", + "ما الدعم المطلوب للقناة (تدريب، مواد تسويقية)؟", + ], + "typical_terms": [ + "channel_type", + "commission_structure", + "training_requirements", + "marketing_support", + "performance_targets", + "reporting_frequency", + ], + "risk_level": "medium", + "approval_level": "mode_3", + "need_categories": ["distribution", "sales"], + "example_ar": "شركة SaaS تستخدم شبكة استشاري إداريين لبيع منتجها في المملكة", + }, + "joint_venture": { + "name": "Joint Venture", + "name_ar": "مشروع مشترك", + "description": "Create a new entity jointly owned by both parties", + "description_ar": "إنشاء كيان جديد مملوك بشكل مشترك بين الطرفين", + "qualification_questions": [ + "ما هدف المشروع المشترك؟", + "ما نسبة مساهمة كل طرف؟", + "ما الشكل القانوني المقترح (شركة ذات مسؤولية محدودة، شراكة)؟", + "من سيتولى الإدارة اليومية؟", + "ما استراتيجية الخروج؟", + "كيف ستوزع الأرباح والخسائر؟", + ], + "typical_terms": [ + "equity_split", + "capital_contributions", + "governance", + "management_structure", + "profit_distribution", + "exit_strategy", + "non_compete", + "dispute_resolution", + ], + "risk_level": "high", + "approval_level": "mode_4", + "need_categories": ["capital", "technology", "distribution"], + "example_ar": "مستثمر سعودي وشركة تقنية أجنبية ينشئون شركة مشتركة لتقديم حلول الذكاء الاصطناعي محلياً", + }, + "acquisition_scouting": { + "name": "Acquisition Scouting", + "name_ar": "استكشاف استحواذ", + "description": "Identify and qualify potential acquisition targets", + "description_ar": "تحديد وتأهيل الشركات المرشحة للاستحواذ", + "qualification_questions": [ + "ما القطاع المستهدف للاستحواذ؟", + "ما الحجم المثالي للشركة المستهدفة (إيرادات، موظفين)؟", + "ما الميزانية المتاحة للاستحواذ؟", + "هل تبحثون عن استحواذ كامل أو حصة جزئية؟", + "ما الأصول الاستراتيجية المطلوبة (تقنية، عملاء، تراخيص)؟", + ], + "typical_terms": [ + "target_criteria", + "valuation_method", + "due_diligence_scope", + "exclusivity_period", + "advisory_fees", + "confidentiality", + ], + "risk_level": "high", + "approval_level": "mode_4", + "need_categories": ["capital", "technology"], + "example_ar": "مجموعة سعودية تبحث عن شركات تقنية ناشئة للاستحواذ بميزانية 5-20 مليون ريال", + }, + "investment_intro": { + "name": "Investment Introduction", + "name_ar": "تقديم فرصة استثمارية", + "description": "Connect companies with investors or investment opportunities", + "description_ar": "ربط الشركات بمستثمرين أو فرص استثمارية مناسبة", + "qualification_questions": [ + "هل تبحثون عن استثمار أم مستثمر؟", + "ما حجم التمويل المطلوب أو المتاح؟", + "ما مرحلة نمو الشركة؟", + "ما العائد المتوقع على الاستثمار؟", + "هل لديكم عرض تقديمي (Pitch Deck) جاهز؟", + ], + "typical_terms": [ + "investment_size", + "valuation", + "equity_offered", + "use_of_funds", + "board_representation", + "anti_dilution", + "introducer_fee", + ], + "risk_level": "high", + "approval_level": "mode_4", + "need_categories": ["capital"], + "example_ar": "شركة ناشئة سعودية تبحث عن جولة تمويل Series A بقيمة 10 مليون ريال", + }, + "vendor_replacement": { + "name": "Vendor Replacement", + "name_ar": "استبدال مورد", + "description": "Replace an existing vendor with a better-fit partner", + "description_ar": "استبدال مورد حالي بشريك أفضل من حيث الجودة أو السعر أو الخدمة", + "qualification_questions": [ + "ما الخدمة أو المنتج الذي يقدمه المورد الحالي؟", + "ما أسباب الرغبة في التغيير؟", + "ما معايير اختيار المورد الجديد؟", + "ما الميزانية المتاحة؟", + "ما الجدول الزمني المطلوب للانتقال؟", + ], + "typical_terms": [ + "transition_plan", + "pricing_comparison", + "service_level_agreement", + "contract_duration", + "penalty_clauses", + "data_migration", + ], + "risk_level": "medium", + "approval_level": "mode_3", + "need_categories": ["delivery", "technology"], + "example_ar": "مستشفى يبحث عن مورد جديد لمستلزمات طبية بعد انتهاء عقد المورد الحالي", + }, + "capability_gap_fill": { + "name": "Capability Gap Fill", + "name_ar": "سد فجوة القدرات", + "description": "Partner with a company to fill a specific capability gap", + "description_ar": "التعاون مع شركة متخصصة لسد فجوة في قدرات شركتك", + "qualification_questions": [ + "ما الفجوة التي تحتاجون سدها؟", + "هل هي فجوة مؤقتة أم دائمة؟", + "ما مستوى التخصص المطلوب؟", + "هل تفضلون شريكاً محلياً أم دولياً؟", + "ما ميزانية سد هذه الفجوة؟", + ], + "typical_terms": [ + "gap_definition", + "duration", + "knowledge_transfer", + "performance_metrics", + "pricing", + "confidentiality", + "training_commitment", + ], + "risk_level": "low", + "approval_level": "mode_2", + "need_categories": ["talent", "technology", "delivery"], + "example_ar": "شركة مقاولات تتعاون مع شركة تصميم معماري لتقديم عروض متكاملة", + }, + "tender_consortium": { + "name": "Tender Consortium", + "name_ar": "تحالف مناقصات", + "description": "Form a consortium to jointly bid on large tenders", + "description_ar": "تشكيل تحالف للتقدم بعرض مشترك في المناقصات الكبرى", + "qualification_questions": [ + "ما المناقصة أو المشروع المستهدف؟", + "ما الجهة المالكة للمناقصة؟", + "ما التخصصات المطلوبة لتكوين التحالف؟", + "ما الموعد النهائي لتقديم العرض؟", + "هل لديكم خبرة سابقة في المناقصات الحكومية؟", + "ما نسبة المحتوى المحلي المطلوبة؟", + ], + "typical_terms": [ + "scope_allocation", + "revenue_split", + "lead_partner", + "joint_liability", + "bid_bond", + "performance_bond", + "local_content", + "governance", + ], + "risk_level": "high", + "approval_level": "mode_4", + "need_categories": ["delivery", "capital", "talent"], + "example_ar": "ثلاث شركات سعودية تتحالف للتقدم لمناقصة مشروع بنية تحتية حكومي بقيمة 50 مليون ريال", + }, +} + +# ── Mapping from need categories to deal types ────────────────────────────── + +_NEED_TO_DEAL_MAP: dict[str, list[str]] = {} +for _deal_id, _spec in DEAL_TAXONOMY.items(): + for _cat in _spec["need_categories"]: + _NEED_TO_DEAL_MAP.setdefault(_cat, []).append(_deal_id) + + +# ── Service ───────────────────────────────────────────────────────────────── + + +class DealTaxonomyService: + """ + Provides lookup and intelligence over the 15-type deal taxonomy. + خدمة تصنيف الصفقات: بحث واقتراحات ذكية لأنواع الصفقات الخمسة عشر + """ + + @staticmethod + def get_deal_type(type_id: str) -> Optional[DealTypeSpec]: + """Return full spec for a deal type, or None if not found.""" + raw = DEAL_TAXONOMY.get(type_id) + if not raw: + return None + return DealTypeSpec(id=type_id, **raw) + + @staticmethod + def get_all_types() -> list[DealTypeSpec]: + """Return all 15 deal types as structured specs.""" + return [ + DealTypeSpec(id=type_id, **spec) + for type_id, spec in DEAL_TAXONOMY.items() + ] + + @staticmethod + def get_types_for_need(need_category: str) -> list[str]: + """ + Return deal type IDs that address a given need category. + إرجاع أنواع الصفقات التي تلبي فئة احتياج معينة + """ + return _NEED_TO_DEAL_MAP.get(need_category, []) + + @staticmethod + def get_qualification_questions(type_id: str, language: str = "ar") -> list[str]: + """ + Return qualification questions for a deal type. + إرجاع أسئلة التأهيل لنوع صفقة معين + """ + spec = DEAL_TAXONOMY.get(type_id) + if not spec: + return [] + questions = spec["qualification_questions"] + if language == "ar": + return questions + # English placeholders — in production these would be translated + return [f"[Q{i+1}] {q}" for i, q in enumerate(questions)] + + @staticmethod + def get_typical_terms(type_id: str) -> list[str]: + """Return typical negotiation terms for a deal type.""" + spec = DEAL_TAXONOMY.get(type_id) + if not spec: + return [] + return spec["typical_terms"] + + @staticmethod + def suggest_deal_type( + capability_category: str, + need_category: str, + ) -> str: + """ + Suggest the best deal type given a capability and a need. + اقتراح أفضل نوع صفقة بناءً على القدرة والاحتياج + """ + # Priority matrix: (capability_cat, need_cat) -> preferred deal type + priority_map: dict[tuple[str, str], str] = { + ("service", "marketing"): "co_marketing", + ("service", "sales"): "co_selling", + ("service", "delivery"): "subcontracting", + ("service", "technology"): "capability_gap_fill", + ("product", "distribution"): "reseller", + ("product", "sales"): "channel_partnership", + ("expertise", "talent"): "capability_gap_fill", + ("expertise", "technology"): "white_label", + ("capacity", "delivery"): "subcontracting", + ("capacity", "capital"): "joint_venture", + ("distribution", "marketing"): "co_marketing", + ("distribution", "sales"): "channel_partnership", + ("distribution", "distribution"): "reseller", + ("technology", "technology"): "white_label", + ("technology", "capital"): "investment_intro", + } + + specific = priority_map.get((capability_category, need_category)) + if specific: + logger.info( + "Suggested deal type %s for capability=%s, need=%s", + specific, capability_category, need_category, + ) + return specific + + # Fallback: find deal types matching the need category + candidates = _NEED_TO_DEAL_MAP.get(need_category, []) + if candidates: + # Prefer lower-risk options first + risk_order = {"low": 0, "medium": 1, "high": 2} + candidates_sorted = sorted( + candidates, + key=lambda t: risk_order.get(DEAL_TAXONOMY[t]["risk_level"], 1), + ) + result = candidates_sorted[0] + logger.info( + "Fallback deal type %s for capability=%s, need=%s", + result, capability_category, need_category, + ) + return result + + logger.info( + "No specific deal type found for capability=%s, need=%s; defaulting to referral_partnership", + capability_category, need_category, + ) + return "referral_partnership" + + @staticmethod + def get_risk_level(type_id: str) -> str: + """Return the risk level for a deal type.""" + spec = DEAL_TAXONOMY.get(type_id) + return spec["risk_level"] if spec else "medium" + + @staticmethod + def get_approval_level(type_id: str) -> str: + """Return the minimum operating mode required for this deal type.""" + spec = DEAL_TAXONOMY.get(type_id) + return spec["approval_level"] if spec else "mode_3" + + @staticmethod + def search_types(query: str) -> list[DealTypeSpec]: + """ + Search deal types by keyword (English or Arabic). + بحث في أنواع الصفقات بكلمة مفتاحية + """ + query_lower = query.lower().strip() + results = [] + for type_id, spec in DEAL_TAXONOMY.items(): + searchable = " ".join([ + type_id, + spec["name"].lower(), + spec["name_ar"], + spec["description"].lower(), + spec["description_ar"], + ]) + if query_lower in searchable: + results.append(DealTypeSpec(id=type_id, **spec)) + return results diff --git a/salesflow-saas/backend/app/services/strategic_deals/operating_modes.py b/salesflow-saas/backend/app/services/strategic_deals/operating_modes.py new file mode 100644 index 00000000..a57847a9 --- /dev/null +++ b/salesflow-saas/backend/app/services/strategic_deals/operating_modes.py @@ -0,0 +1,429 @@ +""" +Operating Modes — Five levels of AI autonomy for deal management. +أوضاع التشغيل: خمسة مستويات لصلاحيات الذكاء الاصطناعي في إدارة الصفقات +""" + +import enum +import logging +from typing import Optional + +from pydantic import BaseModel, Field +from sqlalchemy import select +from sqlalchemy.ext.asyncio import AsyncSession + +from app.models.strategic_deal import CompanyProfile + +logger = logging.getLogger("dealix.strategic_deals.operating_modes") + + +# ── Operating Modes ───────────────────────────────────────────────────────── + + +class OperatingMode(int, enum.Enum): + """ + Five escalating levels of AI autonomy. + خمسة مستويات تصاعدية لاستقلالية الذكاء الاصطناعي + """ + MANUAL = 0 # AI analyzes, human does everything / الذكاء الاصطناعي يحلل والإنسان ينفذ + DRAFT = 1 # AI writes drafts, human sends / الذكاء الاصطناعي يكتب والإنسان يرسل + ASSISTED = 2 # AI sends approved templates via email / الذكاء الاصطناعي يرسل قوالب معتمدة بالإيميل + NEGOTIATION = 3 # AI negotiates within defined gates / الذكاء الاصطناعي يفاوض ضمن حدود محددة + STRATEGIC = 4 # Full workflow with mandatory escalation for commitments / سير عمل كامل مع تصعيد إلزامي للالتزامات + + +MODE_LABELS_AR = { + OperatingMode.MANUAL: "يدوي", + OperatingMode.DRAFT: "مسودات", + OperatingMode.ASSISTED: "مساعد", + OperatingMode.NEGOTIATION: "تفاوض", + OperatingMode.STRATEGIC: "استراتيجي", +} + +MODE_DESCRIPTIONS_AR = { + OperatingMode.MANUAL: "الذكاء الاصطناعي يحلل ويقترح فقط — أنت تنفذ كل شيء", + OperatingMode.DRAFT: "الذكاء الاصطناعي يكتب المسودات — أنت تراجع وترسل", + OperatingMode.ASSISTED: "الذكاء الاصطناعي يرسل القوالب المعتمدة عبر البريد الإلكتروني تلقائياً", + OperatingMode.NEGOTIATION: "الذكاء الاصطناعي يتفاوض ضمن الحدود المحددة — يصعّد عند الحاجة", + OperatingMode.STRATEGIC: "سير عمل كامل مع تصعيد إلزامي لأي التزام مالي أو قانوني", +} + + +# ── Mode Policy ───────────────────────────────────────────────────────────── + + +class ModePolicy(BaseModel): + """Policy governing what an AI agent can do in a given operating mode.""" + mode: int # OperatingMode value + allowed_channels: list[str] = Field(default_factory=list) + allowed_actions: list[str] = Field(default_factory=list) + auto_send: bool = False + auto_negotiate: bool = False + escalation_triggers: list[str] = Field(default_factory=list) + max_auto_commitment_sar: float = 0.0 + + # Labels + label_ar: str = "" + description_ar: str = "" + + +# ── Predefined Policies ──────────────────────────────────────────────────── + + +MODE_POLICIES: dict[OperatingMode, ModePolicy] = { + OperatingMode.MANUAL: ModePolicy( + mode=OperatingMode.MANUAL.value, + allowed_channels=[], + allowed_actions=[ + "analyze", + "suggest", + "draft", + "score_match", + "generate_report", + ], + auto_send=False, + auto_negotiate=False, + escalation_triggers=["all"], + max_auto_commitment_sar=0, + label_ar="يدوي", + description_ar="الذكاء الاصطناعي يحلل ويقترح فقط — أنت تنفذ كل شيء", + ), + OperatingMode.DRAFT: ModePolicy( + mode=OperatingMode.DRAFT.value, + allowed_channels=[], + allowed_actions=[ + "analyze", + "suggest", + "draft", + "score_match", + "generate_report", + "craft_introduction", + "draft_proposal", + "draft_counter_offer", + ], + auto_send=False, + auto_negotiate=False, + escalation_triggers=["all"], + max_auto_commitment_sar=0, + label_ar="مسودات", + description_ar="الذكاء الاصطناعي يكتب المسودات — أنت تراجع وترسل", + ), + OperatingMode.ASSISTED: ModePolicy( + mode=OperatingMode.ASSISTED.value, + allowed_channels=["email"], + allowed_actions=[ + "analyze", + "suggest", + "draft", + "score_match", + "generate_report", + "craft_introduction", + "draft_proposal", + "send_template", + "send_follow_up", + "schedule_reminder", + ], + auto_send=True, + auto_negotiate=False, + escalation_triggers=[ + "reply_received", + "objection", + "pricing_question", + "meeting_request", + "negative_sentiment", + ], + max_auto_commitment_sar=0, + label_ar="مساعد", + description_ar="الذكاء الاصطناعي يرسل القوالب المعتمدة عبر البريد الإلكتروني تلقائياً", + ), + OperatingMode.NEGOTIATION: ModePolicy( + mode=OperatingMode.NEGOTIATION.value, + allowed_channels=["email", "whatsapp"], + allowed_actions=[ + "analyze", + "suggest", + "draft", + "score_match", + "generate_report", + "craft_introduction", + "draft_proposal", + "send_template", + "send_follow_up", + "schedule_reminder", + "send_custom_message", + "handle_response", + "counter_offer", + "negotiate_terms", + "record_concession", + ], + auto_send=True, + auto_negotiate=True, + escalation_triggers=[ + "pricing_change", + "exclusivity", + "equity", + "legal_terms", + "value_above_threshold", + "human_requested", + "stall_detected", + ], + max_auto_commitment_sar=50_000, + label_ar="تفاوض", + description_ar="الذكاء الاصطناعي يتفاوض ضمن الحدود المحددة — يصعّد عند الحاجة", + ), + OperatingMode.STRATEGIC: ModePolicy( + mode=OperatingMode.STRATEGIC.value, + allowed_channels=["email", "whatsapp"], + allowed_actions=[ + "analyze", + "suggest", + "draft", + "score_match", + "generate_report", + "craft_introduction", + "draft_proposal", + "send_template", + "send_follow_up", + "schedule_reminder", + "send_custom_message", + "handle_response", + "counter_offer", + "negotiate_terms", + "record_concession", + "request_approval", + "generate_term_sheet", + "run_discovery_scan", + "run_outreach_campaign", + ], + auto_send=True, + auto_negotiate=True, + escalation_triggers=[ + "commitment", + "exclusivity", + "equity", + "legal", + "data_sharing", + "ip_licensing", + "territory_change", + "value_above_threshold", + "human_requested", + ], + max_auto_commitment_sar=100_000, + label_ar="استراتيجي", + description_ar="سير عمل كامل مع تصعيد إلزامي لأي التزام مالي أو قانوني", + ), +} + + +# ── Mode Enforcer ─────────────────────────────────────────────────────────── + + +class ModeEnforcer: + """ + Enforces operating mode policies before any AI action is executed. + يفرض سياسات وضع التشغيل قبل تنفيذ أي إجراء للذكاء الاصطناعي + """ + + @staticmethod + async def check_action( + mode: OperatingMode, + action: str, + deal_value: float, + db: AsyncSession, + ) -> tuple[bool, str]: + """ + Check whether an action is allowed under the current operating mode. + Returns (allowed, reason_ar). + + التحقق مما إذا كان الإجراء مسموحاً في وضع التشغيل الحالي + يرجع (مسموح، السبب_بالعربي) + """ + policy = MODE_POLICIES.get(mode) + if not policy: + return False, f"وضع التشغيل غير معروف: {mode}" + + # Check if action is in allowed list + if action not in policy.allowed_actions: + mode_label = MODE_LABELS_AR.get(mode, str(mode)) + return False, ( + f"الإجراء '{action}' غير مسموح في وضع '{mode_label}'. " + f"الإجراءات المتاحة: {', '.join(policy.allowed_actions)}" + ) + + # Check if deal value exceeds auto-commitment threshold + if deal_value > 0 and deal_value > policy.max_auto_commitment_sar: + return False, ( + f"قيمة الصفقة ({deal_value:,.0f} ريال) تتجاوز الحد الأقصى للالتزام التلقائي " + f"({policy.max_auto_commitment_sar:,.0f} ريال). يلزم تصعيد للإنسان." + ) + + # Check escalation triggers + escalation_actions = { + "counter_offer": ["pricing_change"], + "negotiate_terms": ["pricing_change", "legal_terms"], + "send_custom_message": [], + "handle_response": ["reply_received"], + "generate_term_sheet": ["legal_terms", "commitment"], + "run_outreach_campaign": [], + } + + action_triggers = escalation_actions.get(action, []) + for trigger in action_triggers: + if trigger in policy.escalation_triggers: + if not policy.auto_negotiate: + mode_label = MODE_LABELS_AR.get(mode, str(mode)) + return False, ( + f"الإجراء '{action}' يستلزم تصعيداً بسبب: {trigger}. " + f"وضع '{mode_label}' لا يسمح بالتفاوض التلقائي." + ) + + logger.info( + "Action '%s' allowed in mode %s (deal_value=%.0f SAR)", + action, mode.name, deal_value, + ) + return True, "مسموح" + + @staticmethod + async def check_channel( + mode: OperatingMode, + channel: str, + ) -> tuple[bool, str]: + """ + Check whether a communication channel is allowed under the current mode. + التحقق مما إذا كانت قناة الاتصال مسموحة في الوضع الحالي + """ + policy = MODE_POLICIES.get(mode) + if not policy: + return False, f"وضع التشغيل غير معروف: {mode}" + + if not policy.allowed_channels: + mode_label = MODE_LABELS_AR.get(mode, str(mode)) + return False, f"وضع '{mode_label}' لا يسمح بأي قناة اتصال. الإرسال يتم يدوياً." + + if channel not in policy.allowed_channels: + mode_label = MODE_LABELS_AR.get(mode, str(mode)) + return False, ( + f"القناة '{channel}' غير مسموحة في وضع '{mode_label}'. " + f"القنوات المتاحة: {', '.join(policy.allowed_channels)}" + ) + + return True, "مسموح" + + @staticmethod + async def get_current_mode( + tenant_id: str, + db: AsyncSession, + ) -> OperatingMode: + """ + Get the current operating mode for a tenant. + الحصول على وضع التشغيل الحالي للمستأجر + """ + # Mode is stored in the tenant's first company profile deal_preferences + result = await db.execute( + select(CompanyProfile).where( + CompanyProfile.tenant_id == tenant_id + ).limit(1) + ) + profile = result.scalar_one_or_none() + if not profile: + logger.info("No profile found for tenant %s, defaulting to MANUAL", tenant_id) + return OperatingMode.MANUAL + + prefs = profile.deal_preferences or {} + mode_value = prefs.get("_operating_mode", OperatingMode.MANUAL.value) + + try: + return OperatingMode(mode_value) + except ValueError: + logger.warning("Invalid operating mode %s for tenant %s, defaulting to MANUAL", mode_value, tenant_id) + return OperatingMode.MANUAL + + @staticmethod + async def set_mode( + tenant_id: str, + mode: OperatingMode, + db: AsyncSession, + ): + """ + Set the operating mode for a tenant. + تعيين وضع التشغيل للمستأجر + """ + result = await db.execute( + select(CompanyProfile).where( + CompanyProfile.tenant_id == tenant_id + ).limit(1) + ) + profile = result.scalar_one_or_none() + if not profile: + raise ValueError(f"لا يوجد ملف شركة للمستأجر: {tenant_id}") + + prefs = dict(profile.deal_preferences or {}) + old_mode = prefs.get("_operating_mode", OperatingMode.MANUAL.value) + prefs["_operating_mode"] = mode.value + profile.deal_preferences = prefs + await db.flush() + + old_label = MODE_LABELS_AR.get(OperatingMode(old_mode), str(old_mode)) + new_label = MODE_LABELS_AR.get(mode, str(mode)) + logger.info( + "Operating mode for tenant %s changed: %s -> %s", + tenant_id, old_label, new_label, + ) + + @staticmethod + def get_mode_policy(mode: OperatingMode) -> ModePolicy: + """ + Get the policy for a specific operating mode. + الحصول على سياسة وضع تشغيل محدد + """ + policy = MODE_POLICIES.get(mode) + if not policy: + raise ValueError(f"وضع التشغيل غير معروف: {mode}") + return policy + + @staticmethod + def get_all_modes() -> list[dict]: + """ + List all operating modes with their labels and descriptions. + عرض جميع أوضاع التشغيل مع التسميات والأوصاف + """ + return [ + { + "mode": mode.value, + "name": mode.name, + "label_ar": MODE_LABELS_AR[mode], + "description_ar": MODE_DESCRIPTIONS_AR[mode], + "auto_send": MODE_POLICIES[mode].auto_send, + "auto_negotiate": MODE_POLICIES[mode].auto_negotiate, + "max_auto_commitment_sar": MODE_POLICIES[mode].max_auto_commitment_sar, + "allowed_channels": MODE_POLICIES[mode].allowed_channels, + } + for mode in OperatingMode + ] + + @staticmethod + async def should_escalate( + mode: OperatingMode, + trigger: str, + deal_value: float, + ) -> tuple[bool, str]: + """ + Determine if a specific trigger requires human escalation. + تحديد ما إذا كان المحفز يستلزم تصعيداً للإنسان + """ + policy = MODE_POLICIES.get(mode) + if not policy: + return True, "وضع التشغيل غير معروف — يجب التصعيد" + + # "all" trigger means everything escalates + if "all" in policy.escalation_triggers: + return True, f"وضع '{MODE_LABELS_AR.get(mode, '')}' يتطلب تصعيد كل الإجراءات" + + if trigger in policy.escalation_triggers: + return True, f"المحفز '{trigger}' يتطلب تصعيداً في وضع '{MODE_LABELS_AR.get(mode, '')}'" + + if deal_value > policy.max_auto_commitment_sar > 0: + return True, ( + f"قيمة الصفقة ({deal_value:,.0f} ريال) تتجاوز الحد ({policy.max_auto_commitment_sar:,.0f} ريال)" + ) + + return False, "لا يلزم تصعيد"