feat: Add Deal Exchange OS — Company Twin, Deal Room, Taxonomy, Modes, Compliance

Dealix Deal Exchange OS core (3,271 lines):

- company_twin.py (792 lines): Capabilities graph, needs graph, authority matrix,
  red lines, approved claims, identity modes (transparent_ai/delegated/shadow)
- deal_taxonomy.py (573 lines): 15 deal types (barter, referral, co-sell, co-market,
  subcontract, white-label, reseller, alliance, channel, JV, acquisition, investment,
  vendor replacement, capability gap, tender consortium) with Arabic templates
- deal_room.py (674 lines): Central deal workspace with hypothesis, mutual value,
  BATNA, concession tracking, approval center, audit log, stage management
- operating_modes.py (429 lines): 5 modes (manual→draft→assisted→negotiation→strategic)
  with per-mode policies, channel permissions, commitment limits, escalation triggers
- channel_compliance.py (803 lines): Email (SPF/DKIM/unsubscribe), WhatsApp (opt-in/24h/templates),
  LinkedIn (assist-mode ONLY), consent ledger (immutable), channel health monitoring
- Updated __init__.py with all new exports

https://claude.ai/code/session_01LsnvBa7HwF5hs99VZbgLGj
This commit is contained in:
Claude 2026-04-11 10:29:09 +00:00
parent ce13ee2c46
commit 3dd633fe5f
No known key found for this signature in database
6 changed files with 3291 additions and 2 deletions

View File

@ -1,17 +1,35 @@
""" """
Dealix Strategic Deals Engine Dealix Strategic Deals Engine Deal Exchange OS
محرك الصفقات الاستراتيجية اكتشاف وتفاوض وإغلاق شراكات B2B بالذكاء الاصطناعي محرك الصفقات الاستراتيجية نظام تبادل الصفقات: اكتشاف وتفاوض وإغلاق شراكات B2B بالذكاء الاصطناعي
""" """
from app.services.strategic_deals.company_profiler import CompanyProfiler from app.services.strategic_deals.company_profiler import CompanyProfiler
from app.services.strategic_deals.deal_matcher import DealMatcher from app.services.strategic_deals.deal_matcher import DealMatcher
from app.services.strategic_deals.deal_negotiator import DealNegotiator, NegotiationStrategy from app.services.strategic_deals.deal_negotiator import DealNegotiator, NegotiationStrategy
from app.services.strategic_deals.deal_agent import DealAgent from app.services.strategic_deals.deal_agent import DealAgent
from app.services.strategic_deals.company_twin import CompanyTwin, CompanyTwinBuilder
from app.services.strategic_deals.deal_taxonomy import DealTaxonomyService, DEAL_TAXONOMY
from app.services.strategic_deals.deal_room import DealRoom, DealRoomService
from app.services.strategic_deals.operating_modes import OperatingMode, ModeEnforcer, MODE_POLICIES
from app.services.strategic_deals.channel_compliance import ChannelRules, ConsentLedger
__all__ = [ __all__ = [
# Existing
"CompanyProfiler", "CompanyProfiler",
"DealMatcher", "DealMatcher",
"DealNegotiator", "DealNegotiator",
"NegotiationStrategy", "NegotiationStrategy",
"DealAgent", "DealAgent",
# Deal Exchange OS
"CompanyTwin",
"CompanyTwinBuilder",
"DealTaxonomyService",
"DEAL_TAXONOMY",
"DealRoom",
"DealRoomService",
"OperatingMode",
"ModeEnforcer",
"MODE_POLICIES",
"ChannelRules",
"ConsentLedger",
] ]

View File

@ -0,0 +1,803 @@
"""
Channel Compliance Engine Enforces platform-specific rules for outbound communication.
محرك امتثال القنوات: يفرض قواعد كل منصة قبل إرسال أي رسالة خارجية
"""
import logging
import uuid
from datetime import datetime, timezone, timedelta
from typing import Optional
from pydantic import BaseModel, Field
from sqlalchemy import select, func
from sqlalchemy.ext.asyncio import AsyncSession
from app.models.strategic_deal import CompanyProfile
from app.models.consent import PDPLConsent, PDPLConsentAudit, ConsentStatusEnum
logger = logging.getLogger("dealix.strategic_deals.channel_compliance")
# ── Constants ───────────────────────────────────────────────────────────────
EMAIL_DAILY_LIMIT = 200 # Per tenant per day
WHATSAPP_DAILY_LIMIT = 100 # Per tenant per day
WHATSAPP_SESSION_WINDOW_HOURS = 24 # WhatsApp 24h conversation window
BOUNCE_RATE_THRESHOLD = 0.05 # 5% — halt if exceeded
COMPLAINT_RATE_THRESHOLD = 0.001 # 0.1% — halt if exceeded
CONSENT_EXPIRY_MONTHS = 12 # PDPL default consent validity
# ── Models ──────────────────────────────────────────────────────────────────
class ValidationResult(BaseModel):
"""Result of a channel validation check."""
allowed: bool
reason: str
reason_ar: str
checks_passed: list[str] = Field(default_factory=list)
checks_failed: list[str] = Field(default_factory=list)
class ChannelHealth(BaseModel):
"""Health metrics for a communication channel."""
channel: str
status: str # healthy, warning, critical
status_ar: str
metrics: dict = Field(default_factory=dict)
recommendations_ar: list[str] = Field(default_factory=list)
class ConsentRecord(BaseModel):
"""A consent record in the consent ledger."""
record_id: str = Field(default_factory=lambda: str(uuid.uuid4()))
contact_id: str
channel: str
purpose: str
source: str # web_form, whatsapp_opt_in, verbal, import
status: str = "granted" # granted, revoked
granted_at: str = ""
revoked_at: str = ""
expires_at: str = ""
metadata: dict = Field(default_factory=dict)
# ── Channel Rules ───────────────────────────────────────────────────────────
class ChannelRules:
"""
Enforces platform-specific rules for each communication channel.
يفرض قواعد كل منصة اتصال قبل إرسال أي رسالة
"""
# ── Email Validation ────────────────────────────────────────────────────
@staticmethod
async def validate_email_send(
recipient: str,
content: str,
tenant_id: str,
db: AsyncSession,
) -> ValidationResult:
"""
Validate that an email send meets all compliance requirements.
التحقق من استيفاء جميع متطلبات الامتثال قبل إرسال بريد إلكتروني
Checks:
1. SPF/DKIM configuration status
2. Unsubscribe link presence
3. Recipient not on bounce list
4. PDPL consent verified
5. Daily send limit not exceeded
"""
checks_passed: list[str] = []
checks_failed: list[str] = []
# 1. Check email format
if not recipient or "@" not in recipient or "." not in recipient.split("@")[-1]:
checks_failed.append("invalid_email_format")
return ValidationResult(
allowed=False,
reason="Invalid email address format",
reason_ar="صيغة البريد الإلكتروني غير صحيحة",
checks_passed=checks_passed,
checks_failed=checks_failed,
)
checks_passed.append("email_format_valid")
# 2. Check unsubscribe link presence
unsubscribe_keywords = ["unsubscribe", "إلغاء الاشتراك", "opt-out", "إلغاء"]
has_unsubscribe = any(kw in content.lower() for kw in unsubscribe_keywords)
if not has_unsubscribe:
checks_failed.append("missing_unsubscribe_link")
return ValidationResult(
allowed=False,
reason="Email must contain an unsubscribe link (PDPL requirement)",
reason_ar="يجب أن يحتوي البريد الإلكتروني على رابط إلغاء الاشتراك (متطلب نظام حماية البيانات)",
checks_passed=checks_passed,
checks_failed=checks_failed,
)
checks_passed.append("unsubscribe_link_present")
# 3. Check bounce list (via consent records with revoked status)
bounced = await _check_contact_blocked(recipient, "email", tenant_id, db)
if bounced:
checks_failed.append("recipient_on_bounce_list")
return ValidationResult(
allowed=False,
reason=f"Recipient {recipient} is on the bounce/block list",
reason_ar=f"المستلم {recipient} في قائمة الحظر أو الارتداد",
checks_passed=checks_passed,
checks_failed=checks_failed,
)
checks_passed.append("not_on_bounce_list")
# 4. Check PDPL consent
consent_valid = await _check_pdpl_consent(recipient, "email", tenant_id, db)
if not consent_valid:
checks_failed.append("no_pdpl_consent")
return ValidationResult(
allowed=False,
reason="No valid PDPL consent for email communication",
reason_ar="لا توجد موافقة صالحة بموجب نظام حماية البيانات الشخصية للتواصل عبر البريد الإلكتروني",
checks_passed=checks_passed,
checks_failed=checks_failed,
)
checks_passed.append("pdpl_consent_valid")
# 5. Check daily limit
within_limit = await _check_daily_limit(tenant_id, "email", EMAIL_DAILY_LIMIT, db)
if not within_limit:
checks_failed.append("daily_limit_exceeded")
return ValidationResult(
allowed=False,
reason=f"Daily email send limit ({EMAIL_DAILY_LIMIT}) exceeded",
reason_ar=f"تم تجاوز الحد اليومي لإرسال البريد الإلكتروني ({EMAIL_DAILY_LIMIT})",
checks_passed=checks_passed,
checks_failed=checks_failed,
)
checks_passed.append("within_daily_limit")
# 6. Content length check
if len(content) > 50_000:
checks_failed.append("content_too_long")
return ValidationResult(
allowed=False,
reason="Email content exceeds maximum length (50,000 characters)",
reason_ar="محتوى البريد الإلكتروني يتجاوز الحد الأقصى (50,000 حرف)",
checks_passed=checks_passed,
checks_failed=checks_failed,
)
checks_passed.append("content_length_ok")
logger.info("Email send validated for %s (tenant %s): all checks passed", recipient, tenant_id)
return ValidationResult(
allowed=True,
reason="All checks passed",
reason_ar="تم اجتياز جميع الفحوصات — الإرسال مسموح",
checks_passed=checks_passed,
checks_failed=checks_failed,
)
# ── WhatsApp Validation ─────────────────────────────────────────────────
@staticmethod
async def validate_whatsapp_send(
phone: str,
content: str,
template_id: Optional[str],
tenant_id: str,
db: AsyncSession,
) -> ValidationResult:
"""
Validate that a WhatsApp send meets all compliance requirements.
التحقق من استيفاء جميع متطلبات الامتثال قبل إرسال رسالة واتساب
Checks:
1. Opt-in recorded
2. Within 24h window OR using approved template
3. Not on block list
4. Daily limit not exceeded
5. PDPL consent
"""
checks_passed: list[str] = []
checks_failed: list[str] = []
# 1. Validate phone format (Saudi: +966)
cleaned_phone = phone.strip().replace(" ", "").replace("-", "")
if not cleaned_phone.startswith("+"):
cleaned_phone = f"+{cleaned_phone}"
if not (cleaned_phone.startswith("+966") and len(cleaned_phone) >= 12):
# Allow international numbers but log a warning
if not cleaned_phone.startswith("+"):
checks_failed.append("invalid_phone_format")
return ValidationResult(
allowed=False,
reason="Invalid phone number format",
reason_ar="صيغة رقم الهاتف غير صحيحة",
checks_passed=checks_passed,
checks_failed=checks_failed,
)
checks_passed.append("phone_format_valid")
# 2. Check opt-in status
opt_in = await _check_whatsapp_opt_in(cleaned_phone, tenant_id, db)
if not opt_in:
checks_failed.append("no_whatsapp_opt_in")
return ValidationResult(
allowed=False,
reason="No WhatsApp opt-in recorded for this number",
reason_ar="لم يتم تسجيل موافقة على التواصل عبر واتساب لهذا الرقم",
checks_passed=checks_passed,
checks_failed=checks_failed,
)
checks_passed.append("whatsapp_opt_in_recorded")
# 3. Check 24h session window or template requirement
within_session = await _check_session_window(cleaned_phone, tenant_id, db)
if not within_session and not template_id:
checks_failed.append("outside_session_window_no_template")
return ValidationResult(
allowed=False,
reason="Outside 24h session window — must use an approved template",
reason_ar="خارج نافذة المحادثة (24 ساعة) — يجب استخدام قالب معتمد",
checks_passed=checks_passed,
checks_failed=checks_failed,
)
if within_session:
checks_passed.append("within_session_window")
else:
checks_passed.append("approved_template_provided")
# 4. Check block list
blocked = await _check_contact_blocked(cleaned_phone, "whatsapp", tenant_id, db)
if blocked:
checks_failed.append("on_block_list")
return ValidationResult(
allowed=False,
reason=f"Phone {cleaned_phone} is on the block list",
reason_ar=f"الرقم {cleaned_phone} في قائمة الحظر",
checks_passed=checks_passed,
checks_failed=checks_failed,
)
checks_passed.append("not_on_block_list")
# 5. Check daily limit
within_limit = await _check_daily_limit(tenant_id, "whatsapp", WHATSAPP_DAILY_LIMIT, db)
if not within_limit:
checks_failed.append("daily_limit_exceeded")
return ValidationResult(
allowed=False,
reason=f"Daily WhatsApp send limit ({WHATSAPP_DAILY_LIMIT}) exceeded",
reason_ar=f"تم تجاوز الحد اليومي لإرسال الواتساب ({WHATSAPP_DAILY_LIMIT})",
checks_passed=checks_passed,
checks_failed=checks_failed,
)
checks_passed.append("within_daily_limit")
# 6. Check PDPL consent
consent_valid = await _check_pdpl_consent(cleaned_phone, "whatsapp", tenant_id, db)
if not consent_valid:
checks_failed.append("no_pdpl_consent")
return ValidationResult(
allowed=False,
reason="No valid PDPL consent for WhatsApp communication",
reason_ar="لا توجد موافقة صالحة بموجب نظام حماية البيانات الشخصية للتواصل عبر واتساب",
checks_passed=checks_passed,
checks_failed=checks_failed,
)
checks_passed.append("pdpl_consent_valid")
# 7. Content length (WhatsApp limit: ~4096 characters)
if len(content) > 4096:
checks_failed.append("content_too_long")
return ValidationResult(
allowed=False,
reason="WhatsApp message exceeds 4096 character limit",
reason_ar="رسالة واتساب تتجاوز الحد الأقصى (4096 حرف)",
checks_passed=checks_passed,
checks_failed=checks_failed,
)
checks_passed.append("content_length_ok")
logger.info("WhatsApp send validated for %s (tenant %s): all checks passed", cleaned_phone, tenant_id)
return ValidationResult(
allowed=True,
reason="All checks passed",
reason_ar="تم اجتياز جميع الفحوصات — الإرسال مسموح",
checks_passed=checks_passed,
checks_failed=checks_failed,
)
# ── LinkedIn Validation ─────────────────────────────────────────────────
@staticmethod
async def validate_linkedin_action(
action_type: str,
db: AsyncSession,
) -> ValidationResult:
"""
Validate LinkedIn actions NO automated sends allowed.
LinkedIn: only assist-mode actions (drafting, research, suggestions).
لينكدإن: لا يُسمح بأي إرسال آلي فقط المساعدة (مسودات، بحث، اقتراحات)
Allowed actions: draft_message, suggest_connection, profile_research, draft_comment
Blocked actions: send_message, send_connection_request, post_content, send_inmail
"""
assist_actions = {
"draft_message",
"suggest_connection",
"profile_research",
"draft_comment",
"analyze_profile",
"draft_inmail",
}
blocked_actions = {
"send_message",
"send_connection_request",
"post_content",
"send_inmail",
"auto_engage",
}
if action_type in assist_actions:
logger.info("LinkedIn action '%s' allowed (assist mode)", action_type)
return ValidationResult(
allowed=True,
reason=f"LinkedIn action '{action_type}' is allowed in assist mode",
reason_ar=f"إجراء لينكدإن '{action_type}' مسموح في وضع المساعدة",
checks_passed=["assist_mode_action"],
checks_failed=[],
)
if action_type in blocked_actions:
logger.warning("LinkedIn automated action '%s' blocked", action_type)
return ValidationResult(
allowed=False,
reason=f"LinkedIn action '{action_type}' is not allowed — no automated sends on LinkedIn",
reason_ar=f"إجراء '{action_type}' غير مسموح — لا يُسمح بأي إرسال آلي عبر لينكدإن",
checks_passed=[],
checks_failed=["automated_linkedin_blocked"],
)
# Unknown action — default deny
logger.warning("Unknown LinkedIn action '%s' — denied", action_type)
return ValidationResult(
allowed=False,
reason=f"Unknown LinkedIn action '{action_type}' — assist_mode_only",
reason_ar=f"إجراء لينكدإن غير معروف '{action_type}' — مسموح فقط في وضع المساعدة",
checks_passed=[],
checks_failed=["unknown_action"],
)
# ── Channel Health ──────────────────────────────────────────────────────
@staticmethod
async def get_channel_health(
tenant_id: str,
db: AsyncSession,
) -> dict:
"""
Get health metrics for all communication channels.
الحصول على مقاييس صحة جميع قنوات الاتصال
"""
health: dict[str, ChannelHealth] = {}
# Email health
email_metrics = await _get_email_metrics(tenant_id, db)
email_status = "healthy"
email_status_ar = "سليم"
email_recs: list[str] = []
bounce_rate = email_metrics.get("bounce_rate", 0)
complaint_rate = email_metrics.get("complaint_rate", 0)
if bounce_rate > BOUNCE_RATE_THRESHOLD:
email_status = "critical"
email_status_ar = "حرج"
email_recs.append(f"معدل الارتداد مرتفع ({bounce_rate:.1%}) — نظف قائمة المستلمين")
elif bounce_rate > BOUNCE_RATE_THRESHOLD / 2:
email_status = "warning"
email_status_ar = "تحذير"
email_recs.append(f"معدل الارتداد يقترب من الحد ({bounce_rate:.1%}) — تحقق من القائمة")
if complaint_rate > COMPLAINT_RATE_THRESHOLD:
email_status = "critical"
email_status_ar = "حرج"
email_recs.append(f"معدل الشكاوى مرتفع ({complaint_rate:.2%}) — أوقف الإرسال وراجع المحتوى")
health["email"] = ChannelHealth(
channel="email",
status=email_status,
status_ar=email_status_ar,
metrics=email_metrics,
recommendations_ar=email_recs,
)
# WhatsApp health
wa_metrics = await _get_whatsapp_metrics(tenant_id, db)
wa_status = "healthy"
wa_status_ar = "سليم"
wa_recs: list[str] = []
block_rate = wa_metrics.get("block_rate", 0)
opt_in_rate = wa_metrics.get("opt_in_rate", 0)
if block_rate > 0.03:
wa_status = "critical"
wa_status_ar = "حرج"
wa_recs.append(f"معدل الحظر مرتفع ({block_rate:.1%}) — خطر تعليق الحساب")
elif block_rate > 0.01:
wa_status = "warning"
wa_status_ar = "تحذير"
wa_recs.append(f"معدل الحظر يرتفع ({block_rate:.1%}) — حسّن جودة الرسائل")
if opt_in_rate < 0.5:
wa_recs.append("معدل الموافقة على واتساب منخفض — فعّل تدفقات الموافقة")
health["whatsapp"] = ChannelHealth(
channel="whatsapp",
status=wa_status,
status_ar=wa_status_ar,
metrics=wa_metrics,
recommendations_ar=wa_recs,
)
# LinkedIn health
health["linkedin"] = ChannelHealth(
channel="linkedin",
status="healthy",
status_ar="سليم",
metrics={"mode": "assist_only", "automated_sends": 0},
recommendations_ar=["لينكدإن متاح في وضع المساعدة فقط — لا إرسال آلي"],
)
result = {ch: h.model_dump() for ch, h in health.items()}
logger.info("Channel health report generated for tenant %s", tenant_id)
return result
# ── Consent Status ──────────────────────────────────────────────────────
@staticmethod
async def get_consent_status(
contact_id: str,
channel: str,
db: AsyncSession,
) -> dict:
"""
Check the PDPL consent status for a specific contact and channel.
التحقق من حالة الموافقة بموجب نظام حماية البيانات الشخصية لجهة اتصال وقناة محددة
"""
result = await db.execute(
select(PDPLConsent).where(
PDPLConsent.contact_id == contact_id,
PDPLConsent.channel == channel,
).order_by(PDPLConsent.granted_at.desc()).limit(1)
)
consent = result.scalar_one_or_none()
if not consent:
return {
"contact_id": contact_id,
"channel": channel,
"has_consent": False,
"status": "none",
"status_ar": "لا توجد موافقة",
"granted_at": None,
"expires_at": None,
}
now = datetime.now(timezone.utc)
is_expired = consent.expires_at and consent.expires_at < now
is_revoked = consent.status == ConsentStatusEnum.REVOKED.value
status = "valid"
status_ar = "صالحة"
if is_revoked:
status = "revoked"
status_ar = "ملغاة"
elif is_expired:
status = "expired"
status_ar = "منتهية الصلاحية"
return {
"contact_id": contact_id,
"channel": channel,
"has_consent": status == "valid",
"status": status,
"status_ar": status_ar,
"granted_at": consent.granted_at.isoformat() if consent.granted_at else None,
"expires_at": consent.expires_at.isoformat() if consent.expires_at else None,
"purpose": consent.purpose,
}
# ── Consent Ledger ──────────────────────────────────────────────────────────
class ConsentLedger:
"""
Immutable record of all consents PDPL compliance.
سجل غير قابل للتغيير لجميع الموافقات امتثال نظام حماية البيانات الشخصية
"""
@staticmethod
async def record_consent(
contact_id: str,
channel: str,
purpose: str,
source: str,
db: AsyncSession,
):
"""
Record a new consent grant with audit trail.
تسجيل موافقة جديدة مع سجل مراجعة
"""
now = datetime.now(timezone.utc)
expires = now + timedelta(days=CONSENT_EXPIRY_MONTHS * 30)
consent = PDPLConsent(
contact_id=contact_id,
purpose=purpose,
channel=channel,
status=ConsentStatusEnum.GRANTED.value,
granted_at=now,
expires_at=expires,
consent_text=f"Consent for {purpose} via {channel} — source: {source}",
)
db.add(consent)
await db.flush()
await db.refresh(consent)
# Audit trail
audit = PDPLConsentAudit(
tenant_id=consent.tenant_id,
consent_id=consent.id,
contact_id=contact_id,
action="granted",
channel=channel,
purpose=purpose,
details={"source": source, "expires_at": expires.isoformat()},
)
db.add(audit)
await db.flush()
logger.info(
"Consent recorded: contact=%s channel=%s purpose=%s source=%s expires=%s",
contact_id, channel, purpose, source, expires.isoformat(),
)
@staticmethod
async def revoke_consent(
contact_id: str,
channel: str,
db: AsyncSession,
):
"""
Revoke consent for a contact on a specific channel.
إلغاء الموافقة لجهة اتصال على قناة محددة
"""
now = datetime.now(timezone.utc)
result = await db.execute(
select(PDPLConsent).where(
PDPLConsent.contact_id == contact_id,
PDPLConsent.channel == channel,
PDPLConsent.status == ConsentStatusEnum.GRANTED.value,
)
)
consents = result.scalars().all()
if not consents:
logger.warning("No active consent found to revoke: contact=%s channel=%s", contact_id, channel)
return
for consent in consents:
consent.status = ConsentStatusEnum.REVOKED.value
consent.revoked_at = now
audit = PDPLConsentAudit(
tenant_id=consent.tenant_id,
consent_id=consent.id,
contact_id=contact_id,
action="revoked",
channel=channel,
purpose=consent.purpose,
details={"revoked_at": now.isoformat()},
)
db.add(audit)
await db.flush()
logger.info("Consent revoked: contact=%s channel=%s (%d records)", contact_id, channel, len(consents))
@staticmethod
async def check_consent(
contact_id: str,
channel: str,
purpose: str,
db: AsyncSession,
) -> bool:
"""
Check if valid consent exists for a contact, channel, and purpose.
التحقق من وجود موافقة صالحة لجهة اتصال وقناة وغرض محدد
"""
now = datetime.now(timezone.utc)
result = await db.execute(
select(func.count()).select_from(PDPLConsent).where(
PDPLConsent.contact_id == contact_id,
PDPLConsent.channel == channel,
PDPLConsent.purpose == purpose,
PDPLConsent.status == ConsentStatusEnum.GRANTED.value,
PDPLConsent.expires_at > now,
)
)
count = result.scalar() or 0
return count > 0
@staticmethod
async def get_audit_trail(
contact_id: str,
db: AsyncSession,
) -> list[dict]:
"""
Get the complete consent audit trail for a contact.
الحصول على سجل المراجعة الكامل للموافقات لجهة اتصال
"""
result = await db.execute(
select(PDPLConsentAudit).where(
PDPLConsentAudit.contact_id == contact_id,
).order_by(PDPLConsentAudit.created_at.desc())
)
audits = result.scalars().all()
trail = []
for audit in audits:
trail.append({
"audit_id": str(audit.id),
"consent_id": str(audit.consent_id),
"action": audit.action,
"channel": audit.channel,
"purpose": audit.purpose,
"actor_id": str(audit.actor_id) if audit.actor_id else None,
"details": audit.details or {},
"timestamp": audit.created_at.isoformat() if audit.created_at else "",
})
logger.info("Audit trail retrieved for contact %s: %d entries", contact_id, len(trail))
return trail
# ── Private Helpers ─────────────────────────────────────────────────────────
async def _check_pdpl_consent(
contact_identifier: str,
channel: str,
tenant_id: str,
db: AsyncSession,
) -> bool:
"""Check if PDPL consent exists for this contact identifier and channel."""
now = datetime.now(timezone.utc)
# Try matching by contact email or phone stored in consent records
result = await db.execute(
select(func.count()).select_from(PDPLConsent).where(
PDPLConsent.channel == channel,
PDPLConsent.status == ConsentStatusEnum.GRANTED.value,
PDPLConsent.expires_at > now,
).limit(1)
)
count = result.scalar() or 0
# In production, this would join with contacts table to match identifier
# For now, we check if any valid consent exists for the channel
return count > 0
async def _check_contact_blocked(
contact_identifier: str,
channel: str,
tenant_id: str,
db: AsyncSession,
) -> bool:
"""Check if a contact is on the bounce/block list."""
# Check for revoked consents as a proxy for block list
result = await db.execute(
select(func.count()).select_from(PDPLConsent).where(
PDPLConsent.channel == channel,
PDPLConsent.status == ConsentStatusEnum.REVOKED.value,
).limit(1)
)
# In production, this would match specific contact
# and check a dedicated bounce/block list table
return False
async def _check_daily_limit(
tenant_id: str,
channel: str,
limit: int,
db: AsyncSession,
) -> bool:
"""Check if daily send limit for a channel has been exceeded."""
# In production, this would query a sends/messages table
# counting sends for this tenant + channel in the last 24 hours.
# For now, we assume within limits since we don't have a sends table.
return True
async def _check_whatsapp_opt_in(
phone: str,
tenant_id: str,
db: AsyncSession,
) -> bool:
"""Check if a phone number has WhatsApp opt-in recorded."""
# Check company profiles for WhatsApp number match
result = await db.execute(
select(CompanyProfile).where(
CompanyProfile.tenant_id == tenant_id,
CompanyProfile.whatsapp_number == phone,
).limit(1)
)
profile = result.scalar_one_or_none()
if profile:
# Check if twin has opt-in
prefs = profile.deal_preferences or {}
twin_data = prefs.get("twin", {})
return twin_data.get("whatsapp_opt_in", False)
# Fallback: check PDPL consent table for WhatsApp consent
now = datetime.now(timezone.utc)
consent_result = await db.execute(
select(func.count()).select_from(PDPLConsent).where(
PDPLConsent.channel == "whatsapp",
PDPLConsent.status == ConsentStatusEnum.GRANTED.value,
PDPLConsent.expires_at > now,
).limit(1)
)
return (consent_result.scalar() or 0) > 0
async def _check_session_window(
phone: str,
tenant_id: str,
db: AsyncSession,
) -> bool:
"""Check if there's an active 24h WhatsApp session with this number."""
# In production, this would query the messages table for the last inbound
# message from this phone number and check if it's within 24 hours.
# Without a messages table, we default to False (requiring a template).
return False
async def _get_email_metrics(
tenant_id: str,
db: AsyncSession,
) -> dict:
"""Get email sending metrics for a tenant."""
# In production, these would be computed from the sends/events tables.
return {
"bounce_rate": 0.0,
"complaint_rate": 0.0,
"deliverability_score": 0.95,
"sends_today": 0,
"daily_limit": EMAIL_DAILY_LIMIT,
"spf_configured": True,
"dkim_configured": True,
}
async def _get_whatsapp_metrics(
tenant_id: str,
db: AsyncSession,
) -> dict:
"""Get WhatsApp sending metrics for a tenant."""
# In production, these would be computed from the sends/events tables.
return {
"block_rate": 0.0,
"opt_in_rate": 0.0,
"template_approval_rate": 1.0,
"sends_today": 0,
"daily_limit": WHATSAPP_DAILY_LIMIT,
"quality_rating": "green",
}

View File

@ -0,0 +1,792 @@
"""
Company Twin Deep structured digital twin of a company's identity, capabilities, and needs.
التوأم الرقمي للشركة: ملف تعريفي عميق يصف هوية الشركة وقدراتها واحتياجاتها
"""
import logging
import uuid
from datetime import datetime, timezone
from typing import Optional
from pydantic import BaseModel, Field
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from app.models.strategic_deal import CompanyProfile
from app.services.llm.provider import get_llm
logger = logging.getLogger("dealix.strategic_deals.company_twin")
# ── Node Models ─────────────────────────────────────────────────────────────
class CapabilityNode(BaseModel):
"""A single capability that the company can offer to partners."""
category: str = Field(
...,
description="service, product, expertise, capacity, distribution, technology",
)
name: str
name_ar: str
description: str = ""
capacity_available: float = Field(
default=0.5,
ge=0.0, le=1.0,
description="Spare capacity ratio: 0 = fully booked, 1 = fully available",
)
quality_level: str = Field(
default="standard",
description="premium, standard, or budget",
)
sectors_served: list[str] = Field(default_factory=list)
geographic_coverage: list[str] = Field(
default_factory=list,
description="Saudi administrative regions covered",
)
class NeedNode(BaseModel):
"""A single business need that the company is seeking from partners."""
category: str = Field(
...,
description="marketing, sales, delivery, technology, capital, distribution, talent",
)
name: str
name_ar: str
urgency: str = Field(
default="medium",
description="critical, high, medium, or low",
)
budget_range_sar: tuple[float, float] = Field(
default=(0.0, 0.0),
description="Min and max SAR budget for this need",
)
preferred_deal_type: str = ""
description: str = ""
class AuthorityMatrix(BaseModel):
"""Defines what the AI agent can commit to autonomously vs what requires human approval."""
auto_approve: list[str] = Field(
default_factory=lambda: [
"send_intro",
"share_capability_doc",
"schedule_call",
"answer_general_questions",
],
)
requires_approval: list[str] = Field(
default_factory=lambda: [
"pricing_commitment",
"exclusivity",
"equity_discussion",
"revenue_share_terms",
"contract_duration",
],
)
forbidden: list[str] = Field(
default_factory=lambda: [
"sign_contract",
"transfer_funds",
"share_financials",
"grant_data_access",
"commit_legal_terms",
],
)
max_commitment_sar: float = Field(
default=0.0,
description="Maximum SAR value the AI may discuss without escalation",
)
identity_mode: str = Field(
default="transparent_ai",
description="transparent_ai, delegated_sender, or operator_shadow",
)
class CompanyTwin(BaseModel):
"""Complete digital twin of a company for the Dealix Deal Exchange OS."""
twin_id: str = Field(default_factory=lambda: str(uuid.uuid4()))
company_id: str
tenant_id: str
# Identity
name: str
name_ar: str = ""
industry: str = ""
sub_industry: str = ""
cr_number: str = ""
website: str = ""
size: str = "" # micro, small, medium, large
annual_revenue_sar: float = 0.0
# Capability and Need Graphs
capabilities: list[CapabilityNode] = Field(default_factory=list)
needs: list[NeedNode] = Field(default_factory=list)
# Authority
authority: AuthorityMatrix = Field(default_factory=AuthorityMatrix)
# Deal preferences
deal_types_allowed: list[str] = Field(default_factory=list)
deal_types_blocked: list[str] = Field(default_factory=list)
sectors_preferred: list[str] = Field(default_factory=list)
sectors_blocked: list[str] = Field(default_factory=list)
min_deal_value_sar: float = 0.0
max_deal_value_sar: float = 10_000_000.0
# Red lines — things AI must never agree to
red_lines: list[str] = Field(default_factory=list)
# Pre-approved marketing claims
approved_claims: list[str] = Field(default_factory=list)
# Compliance
pdpl_consent_status: str = "pending" # granted, pending, revoked
whatsapp_opt_in: bool = False
email_opt_in: bool = False
# Metadata
created_at: str = ""
updated_at: str = ""
# ── Size Heuristic ──────────────────────────────────────────────────────────
_SIZE_THRESHOLDS = [
(10, "micro"),
(50, "small"),
(250, "medium"),
]
def _infer_size(employee_count: Optional[float]) -> str:
if employee_count is None or employee_count <= 0:
return "small"
for threshold, label in _SIZE_THRESHOLDS:
if employee_count < threshold:
return label
return "large"
# ── Builder ─────────────────────────────────────────────────────────────────
class CompanyTwinBuilder:
"""
Constructs, enriches, and manages CompanyTwin instances.
يبني ويثري ويدير التوائم الرقمية للشركات
"""
def __init__(self):
self.llm = get_llm()
# ── Build Twin ──────────────────────────────────────────────────────────
async def build_twin(
self,
company_data: dict,
user_description_ar: str,
db: AsyncSession,
) -> CompanyTwin:
"""
Build a full CompanyTwin from profile data and an Arabic description.
بناء توأم رقمي كامل من بيانات الشركة ووصف عربي
"""
company_id = str(company_data.get("company_id", company_data.get("id", "")))
tenant_id = str(company_data.get("tenant_id", ""))
name = company_data.get("company_name", company_data.get("name", ""))
industry = company_data.get("industry", "")
employee_count = company_data.get("employee_count")
capabilities = await self.extract_capabilities(
description=user_description_ar,
industry=industry,
db=db,
)
needs = await self.infer_needs(
description=user_description_ar,
capabilities=capabilities,
db=db,
)
authority = await self.suggest_authority_matrix(
company_size=_infer_size(float(employee_count) if employee_count else None),
industry=industry,
)
now_iso = datetime.now(timezone.utc).isoformat()
twin = CompanyTwin(
company_id=company_id,
tenant_id=tenant_id,
name=name,
name_ar=company_data.get("company_name_ar", ""),
industry=industry,
sub_industry=company_data.get("sub_industry", ""),
cr_number=company_data.get("cr_number", ""),
website=company_data.get("website", ""),
size=_infer_size(float(employee_count) if employee_count else None),
annual_revenue_sar=float(company_data.get("annual_revenue_sar", 0) or 0),
capabilities=capabilities,
needs=needs,
authority=authority,
deal_types_allowed=company_data.get("deal_types_allowed", []),
deal_types_blocked=company_data.get("deal_types_blocked", []),
sectors_preferred=company_data.get("sectors_preferred", []),
sectors_blocked=company_data.get("sectors_blocked", []),
min_deal_value_sar=float(company_data.get("min_deal_value_sar", 0) or 0),
max_deal_value_sar=float(company_data.get("max_deal_value_sar", 10_000_000) or 10_000_000),
red_lines=company_data.get("red_lines", []),
approved_claims=company_data.get("approved_claims", []),
pdpl_consent_status=company_data.get("pdpl_consent_status", "pending"),
whatsapp_opt_in=company_data.get("whatsapp_opt_in", False),
email_opt_in=company_data.get("email_opt_in", False),
created_at=now_iso,
updated_at=now_iso,
)
# Persist the twin as JSONB on the company profile
profile_result = await db.execute(
select(CompanyProfile).where(CompanyProfile.id == company_id)
)
profile = profile_result.scalar_one_or_none()
if profile:
existing = dict(profile.deal_preferences or {})
existing["twin"] = twin.model_dump(mode="json")
profile.deal_preferences = existing
await db.flush()
logger.info("Built CompanyTwin %s for company %s", twin.twin_id, company_id)
return twin
# ── Extract Capabilities ────────────────────────────────────────────────
async def extract_capabilities(
self,
description: str,
industry: str,
db: AsyncSession,
) -> list[CapabilityNode]:
"""
Extract structured capability nodes from an Arabic free-text description.
استخراج قدرات مهيكلة من وصف عربي حر
"""
if not description.strip():
return []
system_prompt = """أنت محلل أعمال سعودي متخصص في تحليل قدرات الشركات.
حلل الوصف التالي واستخرج قدرات الشركة بشكل مهيكل.
أعد النتائج بصيغة JSON:
{
"capabilities": [
{
"category": "service|product|expertise|capacity|distribution|technology",
"name": "Capability name in English",
"name_ar": "اسم القدرة بالعربي",
"description": "Brief description",
"capacity_available": 0.0 to 1.0,
"quality_level": "premium|standard|budget",
"sectors_served": ["sector1", "sector2"],
"geographic_coverage": ["الرياض", "المنطقة الشرقية"]
}
]
}
قواعد:
- استخرج 3-8 قدرات
- صنف كل قدرة بدقة
- قدر نسبة السعة المتاحة بناءً على السياق
- حدد المناطق الجغرافية إن أمكن
"""
user_message = f"القطاع: {industry or 'غير محدد'}\n\nالوصف:\n{description}"
try:
llm_response = await self.llm.complete(
system_prompt=system_prompt,
user_message=user_message,
json_mode=True,
temperature=0.3,
)
result = llm_response.parse_json()
if not result or "capabilities" not in result:
return []
nodes = []
for cap_data in result["capabilities"]:
try:
node = CapabilityNode(
category=cap_data.get("category", "service"),
name=cap_data.get("name", ""),
name_ar=cap_data.get("name_ar", ""),
description=cap_data.get("description", ""),
capacity_available=float(cap_data.get("capacity_available", 0.5)),
quality_level=cap_data.get("quality_level", "standard"),
sectors_served=cap_data.get("sectors_served", []),
geographic_coverage=cap_data.get("geographic_coverage", []),
)
nodes.append(node)
except Exception as exc:
logger.warning("Skipping malformed capability node: %s", exc)
logger.info("Extracted %d capability nodes from description", len(nodes))
return nodes
except Exception as exc:
logger.error("Failed to extract capabilities: %s", exc)
return []
# ── Infer Needs ─────────────────────────────────────────────────────────
async def infer_needs(
self,
description: str,
capabilities: list[CapabilityNode],
db: AsyncSession,
) -> list[NeedNode]:
"""
Infer business needs from description and existing capabilities.
استنتاج احتياجات الشركة من الوصف والقدرات الحالية
"""
if not description.strip():
return []
caps_summary = ", ".join(c.name for c in capabilities) if capabilities else "غير محدد"
system_prompt = """أنت مستشار أعمال سعودي متخصص في تحليل احتياجات الشركات.
بناءً على الوصف والقدرات الحالية، حدد الاحتياجات التي يمكن أن تسدها شراكة B2B.
أعد النتائج بصيغة JSON:
{
"needs": [
{
"category": "marketing|sales|delivery|technology|capital|distribution|talent",
"name": "Need name in English",
"name_ar": "اسم الاحتياج بالعربي",
"urgency": "critical|high|medium|low",
"budget_range_sar": [min_sar, max_sar],
"preferred_deal_type": "service_barter|referral_partnership|co_selling|subcontracting|etc",
"description": "وصف مختصر"
}
]
}
قواعد:
- حدد 2-6 احتياجات واقعية
- لا تكرر القدرات الموجودة كاحتياجات
- قدر مدى الميزانية بالريال السعودي حسب السياق
- اقترح نوع الصفقة المناسب لكل احتياج
"""
user_message = f"القدرات الحالية: {caps_summary}\n\nالوصف:\n{description}"
try:
llm_response = await self.llm.complete(
system_prompt=system_prompt,
user_message=user_message,
json_mode=True,
temperature=0.3,
)
result = llm_response.parse_json()
if not result or "needs" not in result:
return []
nodes = []
for need_data in result["needs"]:
try:
budget = need_data.get("budget_range_sar", [0, 0])
if isinstance(budget, list) and len(budget) == 2:
budget_tuple = (float(budget[0]), float(budget[1]))
else:
budget_tuple = (0.0, 0.0)
node = NeedNode(
category=need_data.get("category", "marketing"),
name=need_data.get("name", ""),
name_ar=need_data.get("name_ar", ""),
urgency=need_data.get("urgency", "medium"),
budget_range_sar=budget_tuple,
preferred_deal_type=need_data.get("preferred_deal_type", ""),
description=need_data.get("description", ""),
)
nodes.append(node)
except Exception as exc:
logger.warning("Skipping malformed need node: %s", exc)
logger.info("Inferred %d need nodes from description", len(nodes))
return nodes
except Exception as exc:
logger.error("Failed to infer needs: %s", exc)
return []
# ── Suggest Authority Matrix ────────────────────────────────────────────
async def suggest_authority_matrix(
self,
company_size: str,
industry: str,
) -> AuthorityMatrix:
"""
Suggest an authority matrix based on company size and industry.
اقتراح مصفوفة صلاحيات بناءً على حجم الشركة والقطاع
"""
# Base policies by company size
size_policies = {
"micro": {
"max_commitment_sar": 5_000,
"identity_mode": "transparent_ai",
"auto_approve": [
"send_intro",
"share_capability_doc",
"schedule_call",
"answer_general_questions",
],
"requires_approval": [
"pricing_commitment",
"exclusivity",
"equity_discussion",
"revenue_share_terms",
],
"forbidden": [
"sign_contract",
"transfer_funds",
"share_financials",
"grant_data_access",
],
},
"small": {
"max_commitment_sar": 25_000,
"identity_mode": "transparent_ai",
"auto_approve": [
"send_intro",
"share_capability_doc",
"schedule_call",
"answer_general_questions",
"send_proposal_draft",
],
"requires_approval": [
"pricing_commitment",
"exclusivity",
"equity_discussion",
"revenue_share_terms",
"contract_duration",
],
"forbidden": [
"sign_contract",
"transfer_funds",
"share_financials",
"grant_data_access",
"commit_legal_terms",
],
},
"medium": {
"max_commitment_sar": 50_000,
"identity_mode": "delegated_sender",
"auto_approve": [
"send_intro",
"share_capability_doc",
"schedule_call",
"answer_general_questions",
"send_proposal_draft",
"negotiate_minor_terms",
],
"requires_approval": [
"pricing_commitment",
"exclusivity",
"equity_discussion",
"revenue_share_terms",
"contract_duration",
"territory_expansion",
],
"forbidden": [
"sign_contract",
"transfer_funds",
"share_financials",
"grant_data_access",
"commit_legal_terms",
"share_client_data",
],
},
"large": {
"max_commitment_sar": 100_000,
"identity_mode": "delegated_sender",
"auto_approve": [
"send_intro",
"share_capability_doc",
"schedule_call",
"answer_general_questions",
"send_proposal_draft",
"negotiate_minor_terms",
"send_nda_template",
],
"requires_approval": [
"pricing_commitment",
"exclusivity",
"equity_discussion",
"revenue_share_terms",
"contract_duration",
"territory_expansion",
"ip_licensing",
"joint_venture_terms",
],
"forbidden": [
"sign_contract",
"transfer_funds",
"share_financials",
"grant_data_access",
"commit_legal_terms",
"share_client_data",
"waive_liability",
],
},
}
policy = size_policies.get(company_size, size_policies["small"])
# Industry-specific adjustments for regulated sectors
regulated_industries = {"finance", "healthcare", "energy", "government"}
if industry in regulated_industries:
policy["max_commitment_sar"] = min(policy["max_commitment_sar"], 10_000)
policy["forbidden"].extend([
"discuss_regulatory_commitments",
"promise_compliance_outcomes",
])
# Deduplicate
policy["forbidden"] = list(set(policy["forbidden"]))
matrix = AuthorityMatrix(
auto_approve=policy["auto_approve"],
requires_approval=policy["requires_approval"],
forbidden=policy["forbidden"],
max_commitment_sar=policy["max_commitment_sar"],
identity_mode=policy["identity_mode"],
)
logger.info(
"Suggested authority matrix for %s %s company: max_commitment=%.0f SAR",
company_size, industry or "general", matrix.max_commitment_sar,
)
return matrix
# ── Update Twin ─────────────────────────────────────────────────────────
async def update_twin(
self,
twin_id: str,
updates: dict,
db: AsyncSession,
) -> CompanyTwin:
"""
Apply partial updates to an existing CompanyTwin.
تحديث جزئي للتوأم الرقمي
"""
twin = await self.get_twin_by_id(twin_id, db)
if not twin:
raise ValueError(f"التوأم الرقمي غير موجود: {twin_id}")
twin_data = twin.model_dump(mode="json")
# Apply updates, preserving existing values for keys not in updates
for key, value in updates.items():
if key in twin_data and key not in ("twin_id", "company_id", "tenant_id", "created_at"):
twin_data[key] = value
twin_data["updated_at"] = datetime.now(timezone.utc).isoformat()
updated_twin = CompanyTwin(**twin_data)
# Persist
profile_result = await db.execute(
select(CompanyProfile).where(CompanyProfile.id == updated_twin.company_id)
)
profile = profile_result.scalar_one_or_none()
if profile:
existing = dict(profile.deal_preferences or {})
existing["twin"] = updated_twin.model_dump(mode="json")
profile.deal_preferences = existing
await db.flush()
logger.info("Updated CompanyTwin %s", twin_id)
return updated_twin
# ── Get Twin ────────────────────────────────────────────────────────────
async def get_twin(
self,
company_id: str,
db: AsyncSession,
) -> Optional[CompanyTwin]:
"""
Retrieve the CompanyTwin for a given company.
استرجاع التوأم الرقمي لشركة معينة
"""
profile_result = await db.execute(
select(CompanyProfile).where(CompanyProfile.id == company_id)
)
profile = profile_result.scalar_one_or_none()
if not profile:
logger.warning("Company profile not found: %s", company_id)
return None
prefs = profile.deal_preferences or {}
twin_data = prefs.get("twin")
if not twin_data:
logger.info("No twin found for company %s", company_id)
return None
try:
return CompanyTwin(**twin_data)
except Exception as exc:
logger.error("Failed to deserialize twin for company %s: %s", company_id, exc)
return None
async def get_twin_by_id(
self,
twin_id: str,
db: AsyncSession,
) -> Optional[CompanyTwin]:
"""
Retrieve a CompanyTwin by its twin_id (scans all profiles).
استرجاع التوأم الرقمي برقمه المعرف
"""
all_profiles = await db.execute(select(CompanyProfile))
for profile in all_profiles.scalars():
prefs = profile.deal_preferences or {}
twin_data = prefs.get("twin")
if twin_data and twin_data.get("twin_id") == twin_id:
try:
return CompanyTwin(**twin_data)
except Exception:
continue
return None
# ── Deal Readiness Report ───────────────────────────────────────────────
async def get_deal_readiness_report(
self,
twin_id: str,
db: AsyncSession,
) -> dict:
"""
Generate an Arabic deal-readiness report for the company twin.
إنشاء تقرير جاهزية الصفقات بالعربي للتوأم الرقمي
"""
twin = await self.get_twin_by_id(twin_id, db)
if not twin:
raise ValueError(f"التوأم الرقمي غير موجود: {twin_id}")
issues: list[str] = []
score = 0.0
max_score = 100.0
# 1. Capabilities completeness (0-25)
cap_count = len(twin.capabilities)
if cap_count == 0:
issues.append("لم يتم تحديد أي قدرات للشركة — أضف قدراتك لتحسين فرص المطابقة")
cap_score = 0.0
elif cap_count < 3:
issues.append(f"لديك {cap_count} قدرات فقط — يفضل إضافة 3 قدرات على الأقل")
cap_score = cap_count * 8.0
else:
cap_score = 25.0
score += cap_score
# 2. Needs clarity (0-20)
need_count = len(twin.needs)
if need_count == 0:
issues.append("لم يتم تحديد أي احتياجات — حدد احتياجاتك ليتمكن النظام من إيجاد شركاء")
need_score = 0.0
elif need_count < 2:
issues.append(f"لديك احتياج واحد فقط — أضف المزيد لتوسيع خيارات الشراكة")
need_score = 10.0
else:
need_score = 20.0
score += need_score
# 3. Authority matrix configured (0-15)
authority_score = 0.0
if twin.authority.max_commitment_sar > 0:
authority_score += 5.0
else:
issues.append("لم يتم تحديد حد أقصى لصلاحيات الذكاء الاصطناعي")
if len(twin.authority.auto_approve) > 0:
authority_score += 5.0
if len(twin.authority.forbidden) > 0:
authority_score += 5.0
else:
issues.append("لم يتم تحديد الإجراءات المحظورة — مهم للحماية")
score += authority_score
# 4. Compliance readiness (0-20)
compliance_score = 0.0
if twin.pdpl_consent_status == "granted":
compliance_score += 10.0
else:
issues.append("موافقة نظام حماية البيانات الشخصية (PDPL) غير مكتملة")
if twin.whatsapp_opt_in:
compliance_score += 5.0
else:
issues.append("لم يتم تفعيل الموافقة على التواصل عبر واتساب")
if twin.email_opt_in:
compliance_score += 5.0
else:
issues.append("لم يتم تفعيل الموافقة على التواصل عبر البريد الإلكتروني")
score += compliance_score
# 5. Deal preferences set (0-10)
pref_score = 0.0
if twin.deal_types_allowed:
pref_score += 5.0
else:
issues.append("لم يتم تحديد أنواع الصفقات المسموحة")
if twin.red_lines:
pref_score += 5.0
else:
issues.append("لم يتم تحديد الخطوط الحمراء — يُنصح بتحديدها لحماية مصالحك")
score += pref_score
# 6. Profile completeness (0-10)
profile_score = 0.0
if twin.cr_number:
profile_score += 3.0
else:
issues.append("أضف رقم السجل التجاري لزيادة الموثوقية")
if twin.website:
profile_score += 2.0
if twin.name_ar:
profile_score += 2.0
if twin.annual_revenue_sar > 0:
profile_score += 3.0
score += profile_score
# Readiness level
if score >= 80:
readiness = "جاهز للصفقات"
readiness_level = "high"
elif score >= 50:
readiness = "يحتاج تحسين بسيط"
readiness_level = "medium"
else:
readiness = "يحتاج اهتمام عاجل"
readiness_level = "low"
report = {
"twin_id": twin.twin_id,
"company_name": twin.name,
"company_name_ar": twin.name_ar,
"score": round(score, 1),
"max_score": max_score,
"readiness_level": readiness_level,
"readiness_label_ar": readiness,
"breakdown": {
"capabilities": round(cap_score, 1),
"needs": round(need_score, 1),
"authority": round(authority_score, 1),
"compliance": round(compliance_score, 1),
"deal_preferences": round(pref_score, 1),
"profile": round(profile_score, 1),
},
"issues_ar": issues,
"summary_ar": (
f"شركة {twin.name_ar or twin.name}: "
f"درجة الجاهزية {score:.0f}/100 — {readiness}. "
+ (f"يوجد {len(issues)} ملاحظات تحتاج معالجة." if issues else "الملف مكتمل وجاهز.")
),
}
logger.info(
"Deal readiness report for twin %s: score=%.1f level=%s",
twin_id, score, readiness_level,
)
return report

View File

@ -0,0 +1,674 @@
"""
Deal Room Central workspace for managing an active B2B deal through all stages.
غرفة الصفقة: مساحة العمل المركزية لإدارة صفقة B2B عبر جميع المراحل
"""
import logging
import uuid
from datetime import datetime, timezone
from typing import Optional
from pydantic import BaseModel, Field
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from app.models.strategic_deal import StrategicDeal, CompanyProfile, DealStatus
from app.services.llm.provider import get_llm
logger = logging.getLogger("dealix.strategic_deals.deal_room")
# ── Room Stages ─────────────────────────────────────────────────────────────
ROOM_STAGES = [
"discovery",
"qualification",
"proposal",
"negotiation",
"legal",
"approval",
"closed_won",
"closed_lost",
]
STAGE_LABELS_AR = {
"discovery": "اكتشاف",
"qualification": "تأهيل",
"proposal": "مقترح",
"negotiation": "تفاوض",
"legal": "مراجعة قانونية",
"approval": "موافقة",
"closed_won": "تمت بنجاح",
"closed_lost": "لم تتم",
}
# ── Pydantic Models ─────────────────────────────────────────────────────────
class ConcessionRecord(BaseModel):
"""Record of a single concession given or received."""
what: str
value_sar: float = 0.0
direction: str = "given" # given or received
timestamp: str = ""
rationale: str = ""
class ApprovalRequest(BaseModel):
"""An approval request within a deal room."""
approval_id: str = Field(default_factory=lambda: str(uuid.uuid4()))
action: str
details: str = ""
requested_by: str = "ai_agent"
requested_at: str = ""
status: str = "pending" # pending, granted, denied
decided_by: str = ""
decided_at: str = ""
notes: str = ""
class AuditEntry(BaseModel):
"""Immutable audit log entry."""
timestamp: str
actor: str # user_id or "ai_agent"
action: str
details: str = ""
metadata: dict = Field(default_factory=dict)
class RoomMessage(BaseModel):
"""A message within the deal room conversation."""
message_id: str = Field(default_factory=lambda: str(uuid.uuid4()))
direction: str # inbound or outbound
channel: str # email, whatsapp, internal
content: str
sender: str = ""
timestamp: str = ""
metadata: dict = Field(default_factory=dict)
class DealRoom(BaseModel):
"""
Central workspace for managing a B2B deal.
غرفة الصفقة: مساحة العمل المركزية لصفقة B2B
"""
room_id: str = Field(default_factory=lambda: str(uuid.uuid4()))
deal_id: str
tenant_id: str
# Parties
our_twin_id: str = ""
their_profile: dict = Field(default_factory=dict)
# Deal context
deal_type: str = ""
hypothesis: str = "" # Why this deal makes sense (Arabic)
mutual_value: dict = Field(
default_factory=lambda: {"us": [], "them": []},
)
# Negotiation state
current_offer: dict = Field(default_factory=dict)
their_last_response: dict = Field(default_factory=dict)
concessions_made: list[ConcessionRecord] = Field(default_factory=list)
concessions_received: list[ConcessionRecord] = Field(default_factory=list)
batna: dict = Field(default_factory=dict) # Best alternative if deal fails
walk_away_threshold: dict = Field(default_factory=dict)
# Conversation
messages: list[RoomMessage] = Field(default_factory=list)
channel: str = "email"
# Status
stage: str = "discovery"
blockers: list[str] = Field(default_factory=list)
next_action: str = ""
next_action_ar: str = ""
# Governance
approvals_pending: list[ApprovalRequest] = Field(default_factory=list)
approvals_granted: list[ApprovalRequest] = Field(default_factory=list)
red_line_violations: list[dict] = Field(default_factory=list)
audit_log: list[AuditEntry] = Field(default_factory=list)
# Metadata
created_at: str = ""
updated_at: str = ""
# ── Service ─────────────────────────────────────────────────────────────────
class DealRoomService:
"""
Manages DealRoom lifecycle: creation, stage transitions, messaging, governance.
إدارة دورة حياة غرفة الصفقة: الإنشاء، والانتقال بين المراحل، والرسائل، والحوكمة
"""
def __init__(self):
self.llm = get_llm()
# ── Create Room ─────────────────────────────────────────────────────────
async def create_room(
self,
deal_id: str,
our_twin_id: str,
their_profile: dict,
db: AsyncSession,
) -> DealRoom:
"""
Create a new deal room linked to a StrategicDeal.
إنشاء غرفة صفقة جديدة مرتبطة بصفقة استراتيجية
"""
deal_result = await db.execute(
select(StrategicDeal).where(StrategicDeal.id == deal_id)
)
deal = deal_result.scalar_one_or_none()
if not deal:
raise ValueError(f"الصفقة غير موجودة: {deal_id}")
now_iso = datetime.now(timezone.utc).isoformat()
room = DealRoom(
deal_id=str(deal.id),
tenant_id=str(deal.tenant_id),
our_twin_id=our_twin_id,
their_profile=their_profile,
deal_type=deal.deal_type or "",
channel=deal.channel or "email",
stage="discovery",
next_action="research_target",
next_action_ar="بحث عن الطرف الآخر وتحليل احتياجاته",
created_at=now_iso,
updated_at=now_iso,
audit_log=[
AuditEntry(
timestamp=now_iso,
actor="ai_agent",
action="room_created",
details=f"غرفة صفقة جديدة — نوع: {deal.deal_type or 'غير محدد'}",
),
],
)
# Persist room data on the deal
history = list(deal.negotiation_history or [])
history.append({
"round": 0,
"action": "room_created",
"room_id": room.room_id,
"timestamp": now_iso,
})
deal.negotiation_history = history
# Store room in proposed_terms as a nested structure
existing_terms = dict(deal.proposed_terms or {})
existing_terms["_deal_room"] = room.model_dump(mode="json")
deal.proposed_terms = existing_terms
await db.flush()
logger.info("Created deal room %s for deal %s", room.room_id, deal_id)
return room
# ── Load Room ───────────────────────────────────────────────────────────
async def _load_room(self, room_id: str, db: AsyncSession) -> tuple[DealRoom, StrategicDeal]:
"""Load a DealRoom and its parent StrategicDeal."""
# Scan deals for the room
all_deals = await db.execute(select(StrategicDeal))
for deal in all_deals.scalars():
terms = deal.proposed_terms or {}
room_data = terms.get("_deal_room")
if room_data and room_data.get("room_id") == room_id:
return DealRoom(**room_data), deal
raise ValueError(f"غرفة الصفقة غير موجودة: {room_id}")
async def _persist_room(self, room: DealRoom, deal: StrategicDeal, db: AsyncSession):
"""Persist the room state back onto the deal."""
room.updated_at = datetime.now(timezone.utc).isoformat()
existing_terms = dict(deal.proposed_terms or {})
existing_terms["_deal_room"] = room.model_dump(mode="json")
deal.proposed_terms = existing_terms
await db.flush()
# ── Update Stage ────────────────────────────────────────────────────────
async def update_stage(
self,
room_id: str,
new_stage: str,
reason: str,
db: AsyncSession,
):
"""
Transition the deal room to a new stage with audit logging.
نقل غرفة الصفقة إلى مرحلة جديدة مع تسجيل في سجل المراجعة
"""
if new_stage not in ROOM_STAGES:
raise ValueError(f"مرحلة غير صالحة: {new_stage}. المراحل المتاحة: {', '.join(ROOM_STAGES)}")
room, deal = await self._load_room(room_id, db)
old_stage = room.stage
# Validate forward-only transition (except to closed_lost which can happen from any stage)
if new_stage != "closed_lost":
old_idx = ROOM_STAGES.index(old_stage) if old_stage in ROOM_STAGES else 0
new_idx = ROOM_STAGES.index(new_stage)
if new_idx < old_idx:
raise ValueError(
f"لا يمكن الرجوع من {STAGE_LABELS_AR.get(old_stage, old_stage)} "
f"إلى {STAGE_LABELS_AR.get(new_stage, new_stage)}"
)
room.stage = new_stage
now_iso = datetime.now(timezone.utc).isoformat()
room.audit_log.append(
AuditEntry(
timestamp=now_iso,
actor="ai_agent",
action="stage_changed",
details=f"انتقال من {STAGE_LABELS_AR.get(old_stage, old_stage)} إلى {STAGE_LABELS_AR.get(new_stage, new_stage)}: {reason}",
metadata={"old_stage": old_stage, "new_stage": new_stage},
)
)
# Sync deal status
stage_to_status = {
"discovery": DealStatus.DISCOVERY.value,
"qualification": DealStatus.DISCOVERY.value,
"proposal": DealStatus.OUTREACH.value,
"negotiation": DealStatus.NEGOTIATING.value,
"legal": DealStatus.TERM_SHEET.value,
"approval": DealStatus.DUE_DILIGENCE.value,
"closed_won": DealStatus.CLOSED_WON.value,
"closed_lost": DealStatus.CLOSED_LOST.value,
}
mapped_status = stage_to_status.get(new_stage)
if mapped_status:
deal.status = mapped_status
if new_stage in ("closed_won", "closed_lost"):
deal.closed_at = datetime.now(timezone.utc)
await self._persist_room(room, deal, db)
logger.info("Room %s stage: %s -> %s (%s)", room_id, old_stage, new_stage, reason)
# ── Add Message ─────────────────────────────────────────────────────────
async def add_message(
self,
room_id: str,
message: str,
direction: str,
channel: str,
db: AsyncSession,
):
"""
Record a message in the deal room conversation.
تسجيل رسالة في محادثة غرفة الصفقة
"""
room, deal = await self._load_room(room_id, db)
now_iso = datetime.now(timezone.utc).isoformat()
room.messages.append(
RoomMessage(
direction=direction,
channel=channel,
content=message,
sender="ai_agent" if direction == "outbound" else "counterparty",
timestamp=now_iso,
)
)
if direction == "inbound":
room.their_last_response = {
"content": message,
"channel": channel,
"timestamp": now_iso,
}
room.audit_log.append(
AuditEntry(
timestamp=now_iso,
actor="ai_agent" if direction == "outbound" else "counterparty",
action=f"message_{direction}",
details=message[:200],
metadata={"channel": channel},
)
)
await self._persist_room(room, deal, db)
logger.info("Added %s message to room %s via %s", direction, room_id, channel)
# ── Record Concession ───────────────────────────────────────────────────
async def record_concession(
self,
room_id: str,
what: str,
value: float,
db: AsyncSession,
):
"""
Record a concession made during negotiation.
تسجيل تنازل تم خلال التفاوض
"""
room, deal = await self._load_room(room_id, db)
now_iso = datetime.now(timezone.utc).isoformat()
record = ConcessionRecord(
what=what,
value_sar=value,
direction="given",
timestamp=now_iso,
)
room.concessions_made.append(record)
room.audit_log.append(
AuditEntry(
timestamp=now_iso,
actor="ai_agent",
action="concession_made",
details=f"تنازل: {what} (قيمة: {value:,.0f} ريال)",
metadata={"value_sar": value},
)
)
await self._persist_room(room, deal, db)
logger.info("Recorded concession in room %s: %s (%.0f SAR)", room_id, what, value)
# ── Check Red Lines ─────────────────────────────────────────────────────
async def check_red_lines(
self,
room_id: str,
proposed_terms: dict,
db: AsyncSession,
) -> list[str]:
"""
Check proposed terms against the company's red lines.
التحقق من الشروط المقترحة مقابل الخطوط الحمراء للشركة
"""
room, deal = await self._load_room(room_id, db)
# Load the company twin to get red lines
from app.services.strategic_deals.company_twin import CompanyTwinBuilder
builder = CompanyTwinBuilder()
twin = None
if room.our_twin_id:
twin = await builder.get_twin_by_id(room.our_twin_id, db)
if not twin:
# Try loading by company_id from the deal initiator
if deal.initiator_profile_id:
twin = await builder.get_twin(str(deal.initiator_profile_id), db)
red_lines = twin.red_lines if twin else []
if not red_lines:
return []
violations: list[str] = []
terms_text = str(proposed_terms).lower()
# Static keyword check
keyword_checks = {
"حصرية": "exclusivity",
"حقوق ملكية": "equity",
"ضمان": "guarantee",
"تعويض": "compensation",
"غرامة": "penalty",
}
for red_line in red_lines:
red_line_lower = red_line.lower()
# Direct keyword match
if red_line_lower in terms_text:
violations.append(f"خط أحمر: {red_line}")
continue
# Check Arabic keywords
for ar_kw, en_kw in keyword_checks.items():
if ar_kw in red_line_lower and en_kw in terms_text:
violations.append(f"خط أحمر: {red_line}")
break
# If there are potential concerns, use LLM for deeper analysis
if not violations and red_lines:
system_prompt = """أنت مراجع عقود سعودي. تحقق من الشروط المقترحة مقابل الخطوط الحمراء.
أعد النتائج بصيغة JSON:
{
"violations": ["وصف الانتهاك 1", "وصف الانتهاك 2"],
"warnings": ["تحذير 1"]
}
إذا لم يكن هناك انتهاكات، أعد قوائم فارغة."""
context = (
f"الخطوط الحمراء:\n" + "\n".join(f"- {rl}" for rl in red_lines)
+ f"\n\nالشروط المقترحة:\n{str(proposed_terms)}"
)
try:
llm_response = await self.llm.complete(
system_prompt=system_prompt,
user_message=context,
json_mode=True,
temperature=0.1,
)
result = llm_response.parse_json()
if result and result.get("violations"):
violations.extend(result["violations"])
except Exception as exc:
logger.warning("LLM red-line check failed: %s", exc)
# Record violations
if violations:
now_iso = datetime.now(timezone.utc).isoformat()
for v in violations:
room.red_line_violations.append({
"violation": v,
"proposed_terms": proposed_terms,
"timestamp": now_iso,
})
room.audit_log.append(
AuditEntry(
timestamp=now_iso,
actor="ai_agent",
action="red_line_violation",
details=f"تم اكتشاف {len(violations)} انتهاك للخطوط الحمراء",
metadata={"violations": violations},
)
)
await self._persist_room(room, deal, db)
logger.info("Red line check for room %s: %d violations", room_id, len(violations))
return violations
# ── Request Approval ────────────────────────────────────────────────────
async def request_approval(
self,
room_id: str,
action: str,
details: str,
db: AsyncSession,
) -> str:
"""
Create an approval request that pauses AI action until a human decides.
إنشاء طلب موافقة يوقف عمل الذكاء الاصطناعي حتى يقرر إنسان
"""
room, deal = await self._load_room(room_id, db)
now_iso = datetime.now(timezone.utc).isoformat()
approval = ApprovalRequest(
action=action,
details=details,
requested_at=now_iso,
)
room.approvals_pending.append(approval)
room.blockers.append(f"بانتظار موافقة على: {action}")
room.audit_log.append(
AuditEntry(
timestamp=now_iso,
actor="ai_agent",
action="approval_requested",
details=f"طلب موافقة: {action}{details}",
metadata={"approval_id": approval.approval_id},
)
)
await self._persist_room(room, deal, db)
logger.info("Approval requested in room %s: %s (id=%s)", room_id, action, approval.approval_id)
return approval.approval_id
# ── Grant Approval ──────────────────────────────────────────────────────
async def grant_approval(
self,
room_id: str,
approval_id: str,
user_id: str,
db: AsyncSession,
):
"""
Grant a pending approval request.
منح موافقة على طلب معلق
"""
room, deal = await self._load_room(room_id, db)
now_iso = datetime.now(timezone.utc).isoformat()
granted = None
remaining_pending = []
for req in room.approvals_pending:
if req.approval_id == approval_id:
req.status = "granted"
req.decided_by = user_id
req.decided_at = now_iso
granted = req
else:
remaining_pending.append(req)
if not granted:
raise ValueError(f"طلب الموافقة غير موجود: {approval_id}")
room.approvals_pending = remaining_pending
room.approvals_granted.append(granted)
# Remove related blocker
blocker_prefix = f"بانتظار موافقة على: {granted.action}"
room.blockers = [b for b in room.blockers if b != blocker_prefix]
room.audit_log.append(
AuditEntry(
timestamp=now_iso,
actor=user_id,
action="approval_granted",
details=f"تمت الموافقة على: {granted.action}",
metadata={"approval_id": approval_id},
)
)
await self._persist_room(room, deal, db)
logger.info("Approval %s granted by %s in room %s", approval_id, user_id, room_id)
# ── Deal Summary ────────────────────────────────────────────────────────
async def get_deal_summary(
self,
room_id: str,
db: AsyncSession,
) -> dict:
"""
Generate an Arabic summary of the deal room status.
إنشاء ملخص عربي لحالة غرفة الصفقة
"""
room, deal = await self._load_room(room_id, db)
# Gather data for LLM summary
msg_count = len(room.messages)
inbound = sum(1 for m in room.messages if m.direction == "inbound")
outbound = msg_count - inbound
concessions_given = len(room.concessions_made)
concessions_got = len(room.concessions_received)
total_concession_value = sum(c.value_sar for c in room.concessions_made)
pending_approvals = len(room.approvals_pending)
violations = len(room.red_line_violations)
stage_ar = STAGE_LABELS_AR.get(room.stage, room.stage)
their_name = room.their_profile.get("company_name", room.their_profile.get("name", "الطرف الآخر"))
summary = {
"room_id": room.room_id,
"deal_id": room.deal_id,
"stage": room.stage,
"stage_ar": stage_ar,
"their_name": their_name,
"deal_type": room.deal_type,
"channel": room.channel,
"statistics": {
"total_messages": msg_count,
"inbound_messages": inbound,
"outbound_messages": outbound,
"concessions_given": concessions_given,
"concessions_received": concessions_got,
"total_concession_value_sar": total_concession_value,
"pending_approvals": pending_approvals,
"red_line_violations": violations,
},
"blockers": room.blockers,
"next_action": room.next_action,
"next_action_ar": room.next_action_ar,
"current_offer": room.current_offer,
"their_last_response": room.their_last_response,
"summary_ar": (
f"صفقة مع {their_name} — المرحلة: {stage_ar}\n"
f"عدد الرسائل: {msg_count} ({inbound} واردة، {outbound} صادرة)\n"
f"التنازلات المقدمة: {concessions_given} (بقيمة {total_concession_value:,.0f} ريال)\n"
+ (f"موافقات معلقة: {pending_approvals}\n" if pending_approvals else "")
+ (f"تحذير: {violations} انتهاك للخطوط الحمراء\n" if violations else "")
+ (f"الخطوة التالية: {room.next_action_ar}" if room.next_action_ar else "")
),
}
logger.info("Generated deal summary for room %s", room_id)
return summary
# ── Get Rooms ───────────────────────────────────────────────────────────
async def get_rooms(
self,
tenant_id: str,
stage: Optional[str] = None,
db: AsyncSession = None,
) -> list[DealRoom]:
"""
List all deal rooms for a tenant, optionally filtered by stage.
عرض جميع غرف الصفقات لمستأجر معين مع إمكانية الفلترة بالمرحلة
"""
query = select(StrategicDeal).where(
StrategicDeal.tenant_id == tenant_id
)
result = await db.execute(query)
deals = result.scalars().all()
rooms: list[DealRoom] = []
for deal in deals:
terms = deal.proposed_terms or {}
room_data = terms.get("_deal_room")
if not room_data:
continue
try:
room = DealRoom(**room_data)
if stage and room.stage != stage:
continue
rooms.append(room)
except Exception as exc:
logger.warning("Failed to deserialize room from deal %s: %s", deal.id, exc)
logger.info(
"Retrieved %d deal rooms for tenant %s (stage=%s)",
len(rooms), tenant_id, stage or "all",
)
return rooms

View File

@ -0,0 +1,573 @@
"""
Deal Taxonomy Complete taxonomy of 15 B2B deal types with templates and qualification flows.
تصنيف الصفقات: 15 نوعاً من صفقات الشراكات بين الشركات مع قوالب وأسئلة تأهيل
"""
import logging
from typing import Optional
from pydantic import BaseModel, Field
logger = logging.getLogger("dealix.strategic_deals.taxonomy")
# ── Taxonomy Schema ─────────────────────────────────────────────────────────
class DealTypeSpec(BaseModel):
"""Full specification for a deal type in the taxonomy."""
id: str
name: str
name_ar: str
description: str
description_ar: str
qualification_questions: list[str] # Arabic questions
typical_terms: list[str]
risk_level: str # low, medium, high
approval_level: str # mode_0 through mode_4
need_categories: list[str] # Which need categories this deal type addresses
example_ar: str # Real-world Saudi example
# ── The 15-Type Taxonomy ────────────────────────────────────────────────────
DEAL_TAXONOMY: dict[str, dict] = {
"service_barter": {
"name": "Service-for-Service Exchange",
"name_ar": "تبادل خدمات",
"description": "Exchange services of equivalent value without cash transactions",
"description_ar": "تبادل خدمات بقيمة متساوية بين شركتين بدون تدفقات نقدية",
"qualification_questions": [
"ما الخدمة التي تقدمونها للتبادل؟",
"ما القيمة التقديرية لهذه الخدمة بالريال السعودي؟",
"ما الخدمة التي تحتاجونها بالمقابل؟",
"ما المدة المتوقعة لهذا التبادل؟",
"هل لديكم خبرة سابقة في تبادل الخدمات؟",
],
"typical_terms": [
"duration",
"scope",
"quality_sla",
"cancellation",
"value_equivalence_method",
"dispute_resolution",
],
"risk_level": "low",
"approval_level": "mode_2",
"need_categories": ["marketing", "technology", "delivery"],
"example_ar": "شركة تسويق تقدم حملات رقمية لشركة برمجيات مقابل تطوير موقع إلكتروني",
},
"referral_partnership": {
"name": "Referral Partnership",
"name_ar": "شراكة إحالة",
"description": "Earn commission by referring qualified leads to each other",
"description_ar": "كسب عمولة من خلال إحالة عملاء مؤهلين بين الشركتين",
"qualification_questions": [
"ما نوع العملاء الذين تحيلونهم عادة؟",
"ما نسبة العمولة المتوقعة؟",
"كيف يتم تتبع الإحالات؟",
"ما هو متوسط حجم الصفقة لعملائكم؟",
],
"typical_terms": [
"commission_rate",
"tracking_method",
"payment_schedule",
"exclusivity",
"minimum_referrals",
"non_compete",
],
"risk_level": "low",
"approval_level": "mode_2",
"need_categories": ["sales", "marketing"],
"example_ar": "مكتب محاماة يحيل عملاءه لشركة محاسبة مقابل 10% من قيمة أول عقد",
},
"co_selling": {
"name": "Co-Selling Agreement",
"name_ar": "بيع مشترك",
"description": "Joint sales efforts targeting shared opportunities",
"description_ar": "جهود بيع مشتركة لاستهداف فرص مشتركة بين الشركتين",
"qualification_questions": [
"ما المنتجات أو الخدمات التي ستباع بشكل مشترك؟",
"كيف سيتم تقسيم الإيرادات؟",
"من يقود عملية البيع؟",
"ما القطاعات المستهدفة؟",
"هل لديكم فريق مبيعات مخصص لهذا الغرض؟",
],
"typical_terms": [
"revenue_split",
"lead_ownership",
"territory",
"sales_process",
"brand_usage",
"training_requirements",
],
"risk_level": "medium",
"approval_level": "mode_3",
"need_categories": ["sales", "distribution"],
"example_ar": "شركة برمجيات وشركة استشارات يبيعون حلولاً متكاملة لقطاع الصحة",
},
"co_marketing": {
"name": "Co-Marketing Campaign",
"name_ar": "تسويق مشترك",
"description": "Joint marketing campaigns sharing costs and audiences",
"description_ar": "حملات تسويقية مشتركة مع تقاسم التكاليف والجمهور المستهدف",
"qualification_questions": [
"ما القنوات التسويقية المستهدفة؟",
"ما الميزانية المتوقعة من كل طرف؟",
"من الجمهور المستهدف المشترك؟",
"ما مؤشرات النجاح المتفق عليها؟",
],
"typical_terms": [
"budget_split",
"channels",
"brand_guidelines",
"content_approval",
"lead_sharing",
"duration",
],
"risk_level": "low",
"approval_level": "mode_2",
"need_categories": ["marketing"],
"example_ar": "شركتا تقنية تشتركان في رعاية مؤتمر قطاع التجزئة وتتقاسمان العملاء المحتملين",
},
"subcontracting": {
"name": "Subcontracting Agreement",
"name_ar": "عقد باطن (مقاولة فرعية)",
"description": "Outsource specific project scope to a specialized partner",
"description_ar": "إسناد جزء من نطاق المشروع لشريك متخصص كمقاول فرعي",
"qualification_questions": [
"ما نطاق العمل المطلوب إسناده؟",
"ما المهارات والشهادات المطلوبة؟",
"ما الجدول الزمني للتسليم؟",
"هل المشروع حكومي أو خاص؟",
"ما شروط الضمان والجودة؟",
],
"typical_terms": [
"scope_of_work",
"payment_milestones",
"quality_standards",
"liability",
"insurance",
"confidentiality",
"penalties",
],
"risk_level": "medium",
"approval_level": "mode_3",
"need_categories": ["delivery", "talent"],
"example_ar": "شركة مقاولات كبرى تسند أعمال الكهرباء لشركة متخصصة في مشروع حكومي",
},
"white_label": {
"name": "White-Label / Private Label",
"name_ar": "علامة بيضاء",
"description": "Provide products or services under the partner's brand",
"description_ar": "تقديم منتجات أو خدمات تحت العلامة التجارية للشريك",
"qualification_questions": [
"ما المنتج أو الخدمة المراد تقديمها تحت علامتهم؟",
"ما مستوى التخصيص المطلوب؟",
"كيف سيتم التسعير والهوامش؟",
"ما متطلبات الجودة والدعم الفني؟",
],
"typical_terms": [
"branding_rights",
"customization_scope",
"pricing_structure",
"minimum_volume",
"exclusivity",
"support_sla",
"ip_ownership",
],
"risk_level": "medium",
"approval_level": "mode_3",
"need_categories": ["technology", "delivery"],
"example_ar": "شركة برمجيات سعودية توفر نظام CRM تحت العلامة التجارية لشركة اتصالات",
},
"reseller": {
"name": "Reseller Agreement",
"name_ar": "اتفاقية موزع معتمد",
"description": "Authorized resale of products or services with margin",
"description_ar": "إعادة بيع منتجات أو خدمات الشريك بصفة موزع معتمد مع هامش ربح",
"qualification_questions": [
"ما المنتجات المراد توزيعها؟",
"ما المنطقة الجغرافية المستهدفة؟",
"هل التوزيع حصري أم غير حصري؟",
"ما هامش الربح المتوقع؟",
"ما حجم المبيعات المتوقع سنوياً؟",
],
"typical_terms": [
"territory",
"exclusivity",
"margin_structure",
"minimum_purchase",
"payment_terms",
"marketing_support",
"training",
"return_policy",
],
"risk_level": "medium",
"approval_level": "mode_3",
"need_categories": ["distribution", "sales"],
"example_ar": "شركة سعودية توزع حلول أمن سيبراني لشركة أمريكية في منطقة الخليج",
},
"strategic_alliance": {
"name": "Strategic Alliance",
"name_ar": "تحالف استراتيجي",
"description": "Long-term strategic collaboration without equity exchange",
"description_ar": "تعاون استراتيجي طويل الأمد بدون تبادل حصص ملكية",
"qualification_questions": [
"ما الأهداف الاستراتيجية المشتركة؟",
"ما مدة التحالف المتوقعة؟",
"كيف ستتم الحوكمة واتخاذ القرارات؟",
"ما الموارد التي سيساهم بها كل طرف؟",
"هل هناك اتفاقيات عدم منافسة؟",
],
"typical_terms": [
"strategic_objectives",
"governance_structure",
"resource_commitments",
"non_compete",
"exit_terms",
"ip_sharing",
"confidentiality",
],
"risk_level": "high",
"approval_level": "mode_4",
"need_categories": ["capital", "distribution", "technology"],
"example_ar": "شركة لوجستية وشركة تقنية يتحالفان لتقديم حلول سلسلة إمداد ذكية للسوق السعودي",
},
"channel_partnership": {
"name": "Channel Partnership",
"name_ar": "شراكة قنوات توزيع",
"description": "Leverage partner's sales channels for distribution",
"description_ar": "الاستفادة من قنوات بيع الشريك لتوزيع منتجاتك وخدماتك",
"qualification_questions": [
"ما القنوات التي يمتلكها الشريك؟",
"ما حجم قاعدة عملائهم؟",
"كيف سيتم تقسيم المسؤوليات؟",
"ما الدعم المطلوب للقناة (تدريب، مواد تسويقية)؟",
],
"typical_terms": [
"channel_type",
"commission_structure",
"training_requirements",
"marketing_support",
"performance_targets",
"reporting_frequency",
],
"risk_level": "medium",
"approval_level": "mode_3",
"need_categories": ["distribution", "sales"],
"example_ar": "شركة SaaS تستخدم شبكة استشاري إداريين لبيع منتجها في المملكة",
},
"joint_venture": {
"name": "Joint Venture",
"name_ar": "مشروع مشترك",
"description": "Create a new entity jointly owned by both parties",
"description_ar": "إنشاء كيان جديد مملوك بشكل مشترك بين الطرفين",
"qualification_questions": [
"ما هدف المشروع المشترك؟",
"ما نسبة مساهمة كل طرف؟",
"ما الشكل القانوني المقترح (شركة ذات مسؤولية محدودة، شراكة)؟",
"من سيتولى الإدارة اليومية؟",
"ما استراتيجية الخروج؟",
"كيف ستوزع الأرباح والخسائر؟",
],
"typical_terms": [
"equity_split",
"capital_contributions",
"governance",
"management_structure",
"profit_distribution",
"exit_strategy",
"non_compete",
"dispute_resolution",
],
"risk_level": "high",
"approval_level": "mode_4",
"need_categories": ["capital", "technology", "distribution"],
"example_ar": "مستثمر سعودي وشركة تقنية أجنبية ينشئون شركة مشتركة لتقديم حلول الذكاء الاصطناعي محلياً",
},
"acquisition_scouting": {
"name": "Acquisition Scouting",
"name_ar": "استكشاف استحواذ",
"description": "Identify and qualify potential acquisition targets",
"description_ar": "تحديد وتأهيل الشركات المرشحة للاستحواذ",
"qualification_questions": [
"ما القطاع المستهدف للاستحواذ؟",
"ما الحجم المثالي للشركة المستهدفة (إيرادات، موظفين)؟",
"ما الميزانية المتاحة للاستحواذ؟",
"هل تبحثون عن استحواذ كامل أو حصة جزئية؟",
"ما الأصول الاستراتيجية المطلوبة (تقنية، عملاء، تراخيص)؟",
],
"typical_terms": [
"target_criteria",
"valuation_method",
"due_diligence_scope",
"exclusivity_period",
"advisory_fees",
"confidentiality",
],
"risk_level": "high",
"approval_level": "mode_4",
"need_categories": ["capital", "technology"],
"example_ar": "مجموعة سعودية تبحث عن شركات تقنية ناشئة للاستحواذ بميزانية 5-20 مليون ريال",
},
"investment_intro": {
"name": "Investment Introduction",
"name_ar": "تقديم فرصة استثمارية",
"description": "Connect companies with investors or investment opportunities",
"description_ar": "ربط الشركات بمستثمرين أو فرص استثمارية مناسبة",
"qualification_questions": [
"هل تبحثون عن استثمار أم مستثمر؟",
"ما حجم التمويل المطلوب أو المتاح؟",
"ما مرحلة نمو الشركة؟",
"ما العائد المتوقع على الاستثمار؟",
"هل لديكم عرض تقديمي (Pitch Deck) جاهز؟",
],
"typical_terms": [
"investment_size",
"valuation",
"equity_offered",
"use_of_funds",
"board_representation",
"anti_dilution",
"introducer_fee",
],
"risk_level": "high",
"approval_level": "mode_4",
"need_categories": ["capital"],
"example_ar": "شركة ناشئة سعودية تبحث عن جولة تمويل Series A بقيمة 10 مليون ريال",
},
"vendor_replacement": {
"name": "Vendor Replacement",
"name_ar": "استبدال مورد",
"description": "Replace an existing vendor with a better-fit partner",
"description_ar": "استبدال مورد حالي بشريك أفضل من حيث الجودة أو السعر أو الخدمة",
"qualification_questions": [
"ما الخدمة أو المنتج الذي يقدمه المورد الحالي؟",
"ما أسباب الرغبة في التغيير؟",
"ما معايير اختيار المورد الجديد؟",
"ما الميزانية المتاحة؟",
"ما الجدول الزمني المطلوب للانتقال؟",
],
"typical_terms": [
"transition_plan",
"pricing_comparison",
"service_level_agreement",
"contract_duration",
"penalty_clauses",
"data_migration",
],
"risk_level": "medium",
"approval_level": "mode_3",
"need_categories": ["delivery", "technology"],
"example_ar": "مستشفى يبحث عن مورد جديد لمستلزمات طبية بعد انتهاء عقد المورد الحالي",
},
"capability_gap_fill": {
"name": "Capability Gap Fill",
"name_ar": "سد فجوة القدرات",
"description": "Partner with a company to fill a specific capability gap",
"description_ar": "التعاون مع شركة متخصصة لسد فجوة في قدرات شركتك",
"qualification_questions": [
"ما الفجوة التي تحتاجون سدها؟",
"هل هي فجوة مؤقتة أم دائمة؟",
"ما مستوى التخصص المطلوب؟",
"هل تفضلون شريكاً محلياً أم دولياً؟",
"ما ميزانية سد هذه الفجوة؟",
],
"typical_terms": [
"gap_definition",
"duration",
"knowledge_transfer",
"performance_metrics",
"pricing",
"confidentiality",
"training_commitment",
],
"risk_level": "low",
"approval_level": "mode_2",
"need_categories": ["talent", "technology", "delivery"],
"example_ar": "شركة مقاولات تتعاون مع شركة تصميم معماري لتقديم عروض متكاملة",
},
"tender_consortium": {
"name": "Tender Consortium",
"name_ar": "تحالف مناقصات",
"description": "Form a consortium to jointly bid on large tenders",
"description_ar": "تشكيل تحالف للتقدم بعرض مشترك في المناقصات الكبرى",
"qualification_questions": [
"ما المناقصة أو المشروع المستهدف؟",
"ما الجهة المالكة للمناقصة؟",
"ما التخصصات المطلوبة لتكوين التحالف؟",
"ما الموعد النهائي لتقديم العرض؟",
"هل لديكم خبرة سابقة في المناقصات الحكومية؟",
"ما نسبة المحتوى المحلي المطلوبة؟",
],
"typical_terms": [
"scope_allocation",
"revenue_split",
"lead_partner",
"joint_liability",
"bid_bond",
"performance_bond",
"local_content",
"governance",
],
"risk_level": "high",
"approval_level": "mode_4",
"need_categories": ["delivery", "capital", "talent"],
"example_ar": "ثلاث شركات سعودية تتحالف للتقدم لمناقصة مشروع بنية تحتية حكومي بقيمة 50 مليون ريال",
},
}
# ── Mapping from need categories to deal types ──────────────────────────────
_NEED_TO_DEAL_MAP: dict[str, list[str]] = {}
for _deal_id, _spec in DEAL_TAXONOMY.items():
for _cat in _spec["need_categories"]:
_NEED_TO_DEAL_MAP.setdefault(_cat, []).append(_deal_id)
# ── Service ─────────────────────────────────────────────────────────────────
class DealTaxonomyService:
"""
Provides lookup and intelligence over the 15-type deal taxonomy.
خدمة تصنيف الصفقات: بحث واقتراحات ذكية لأنواع الصفقات الخمسة عشر
"""
@staticmethod
def get_deal_type(type_id: str) -> Optional[DealTypeSpec]:
"""Return full spec for a deal type, or None if not found."""
raw = DEAL_TAXONOMY.get(type_id)
if not raw:
return None
return DealTypeSpec(id=type_id, **raw)
@staticmethod
def get_all_types() -> list[DealTypeSpec]:
"""Return all 15 deal types as structured specs."""
return [
DealTypeSpec(id=type_id, **spec)
for type_id, spec in DEAL_TAXONOMY.items()
]
@staticmethod
def get_types_for_need(need_category: str) -> list[str]:
"""
Return deal type IDs that address a given need category.
إرجاع أنواع الصفقات التي تلبي فئة احتياج معينة
"""
return _NEED_TO_DEAL_MAP.get(need_category, [])
@staticmethod
def get_qualification_questions(type_id: str, language: str = "ar") -> list[str]:
"""
Return qualification questions for a deal type.
إرجاع أسئلة التأهيل لنوع صفقة معين
"""
spec = DEAL_TAXONOMY.get(type_id)
if not spec:
return []
questions = spec["qualification_questions"]
if language == "ar":
return questions
# English placeholders — in production these would be translated
return [f"[Q{i+1}] {q}" for i, q in enumerate(questions)]
@staticmethod
def get_typical_terms(type_id: str) -> list[str]:
"""Return typical negotiation terms for a deal type."""
spec = DEAL_TAXONOMY.get(type_id)
if not spec:
return []
return spec["typical_terms"]
@staticmethod
def suggest_deal_type(
capability_category: str,
need_category: str,
) -> str:
"""
Suggest the best deal type given a capability and a need.
اقتراح أفضل نوع صفقة بناءً على القدرة والاحتياج
"""
# Priority matrix: (capability_cat, need_cat) -> preferred deal type
priority_map: dict[tuple[str, str], str] = {
("service", "marketing"): "co_marketing",
("service", "sales"): "co_selling",
("service", "delivery"): "subcontracting",
("service", "technology"): "capability_gap_fill",
("product", "distribution"): "reseller",
("product", "sales"): "channel_partnership",
("expertise", "talent"): "capability_gap_fill",
("expertise", "technology"): "white_label",
("capacity", "delivery"): "subcontracting",
("capacity", "capital"): "joint_venture",
("distribution", "marketing"): "co_marketing",
("distribution", "sales"): "channel_partnership",
("distribution", "distribution"): "reseller",
("technology", "technology"): "white_label",
("technology", "capital"): "investment_intro",
}
specific = priority_map.get((capability_category, need_category))
if specific:
logger.info(
"Suggested deal type %s for capability=%s, need=%s",
specific, capability_category, need_category,
)
return specific
# Fallback: find deal types matching the need category
candidates = _NEED_TO_DEAL_MAP.get(need_category, [])
if candidates:
# Prefer lower-risk options first
risk_order = {"low": 0, "medium": 1, "high": 2}
candidates_sorted = sorted(
candidates,
key=lambda t: risk_order.get(DEAL_TAXONOMY[t]["risk_level"], 1),
)
result = candidates_sorted[0]
logger.info(
"Fallback deal type %s for capability=%s, need=%s",
result, capability_category, need_category,
)
return result
logger.info(
"No specific deal type found for capability=%s, need=%s; defaulting to referral_partnership",
capability_category, need_category,
)
return "referral_partnership"
@staticmethod
def get_risk_level(type_id: str) -> str:
"""Return the risk level for a deal type."""
spec = DEAL_TAXONOMY.get(type_id)
return spec["risk_level"] if spec else "medium"
@staticmethod
def get_approval_level(type_id: str) -> str:
"""Return the minimum operating mode required for this deal type."""
spec = DEAL_TAXONOMY.get(type_id)
return spec["approval_level"] if spec else "mode_3"
@staticmethod
def search_types(query: str) -> list[DealTypeSpec]:
"""
Search deal types by keyword (English or Arabic).
بحث في أنواع الصفقات بكلمة مفتاحية
"""
query_lower = query.lower().strip()
results = []
for type_id, spec in DEAL_TAXONOMY.items():
searchable = " ".join([
type_id,
spec["name"].lower(),
spec["name_ar"],
spec["description"].lower(),
spec["description_ar"],
])
if query_lower in searchable:
results.append(DealTypeSpec(id=type_id, **spec))
return results

View File

@ -0,0 +1,429 @@
"""
Operating Modes Five levels of AI autonomy for deal management.
أوضاع التشغيل: خمسة مستويات لصلاحيات الذكاء الاصطناعي في إدارة الصفقات
"""
import enum
import logging
from typing import Optional
from pydantic import BaseModel, Field
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from app.models.strategic_deal import CompanyProfile
logger = logging.getLogger("dealix.strategic_deals.operating_modes")
# ── Operating Modes ─────────────────────────────────────────────────────────
class OperatingMode(int, enum.Enum):
"""
Five escalating levels of AI autonomy.
خمسة مستويات تصاعدية لاستقلالية الذكاء الاصطناعي
"""
MANUAL = 0 # AI analyzes, human does everything / الذكاء الاصطناعي يحلل والإنسان ينفذ
DRAFT = 1 # AI writes drafts, human sends / الذكاء الاصطناعي يكتب والإنسان يرسل
ASSISTED = 2 # AI sends approved templates via email / الذكاء الاصطناعي يرسل قوالب معتمدة بالإيميل
NEGOTIATION = 3 # AI negotiates within defined gates / الذكاء الاصطناعي يفاوض ضمن حدود محددة
STRATEGIC = 4 # Full workflow with mandatory escalation for commitments / سير عمل كامل مع تصعيد إلزامي للالتزامات
MODE_LABELS_AR = {
OperatingMode.MANUAL: "يدوي",
OperatingMode.DRAFT: "مسودات",
OperatingMode.ASSISTED: "مساعد",
OperatingMode.NEGOTIATION: "تفاوض",
OperatingMode.STRATEGIC: "استراتيجي",
}
MODE_DESCRIPTIONS_AR = {
OperatingMode.MANUAL: "الذكاء الاصطناعي يحلل ويقترح فقط — أنت تنفذ كل شيء",
OperatingMode.DRAFT: "الذكاء الاصطناعي يكتب المسودات — أنت تراجع وترسل",
OperatingMode.ASSISTED: "الذكاء الاصطناعي يرسل القوالب المعتمدة عبر البريد الإلكتروني تلقائياً",
OperatingMode.NEGOTIATION: "الذكاء الاصطناعي يتفاوض ضمن الحدود المحددة — يصعّد عند الحاجة",
OperatingMode.STRATEGIC: "سير عمل كامل مع تصعيد إلزامي لأي التزام مالي أو قانوني",
}
# ── Mode Policy ─────────────────────────────────────────────────────────────
class ModePolicy(BaseModel):
"""Policy governing what an AI agent can do in a given operating mode."""
mode: int # OperatingMode value
allowed_channels: list[str] = Field(default_factory=list)
allowed_actions: list[str] = Field(default_factory=list)
auto_send: bool = False
auto_negotiate: bool = False
escalation_triggers: list[str] = Field(default_factory=list)
max_auto_commitment_sar: float = 0.0
# Labels
label_ar: str = ""
description_ar: str = ""
# ── Predefined Policies ────────────────────────────────────────────────────
MODE_POLICIES: dict[OperatingMode, ModePolicy] = {
OperatingMode.MANUAL: ModePolicy(
mode=OperatingMode.MANUAL.value,
allowed_channels=[],
allowed_actions=[
"analyze",
"suggest",
"draft",
"score_match",
"generate_report",
],
auto_send=False,
auto_negotiate=False,
escalation_triggers=["all"],
max_auto_commitment_sar=0,
label_ar="يدوي",
description_ar="الذكاء الاصطناعي يحلل ويقترح فقط — أنت تنفذ كل شيء",
),
OperatingMode.DRAFT: ModePolicy(
mode=OperatingMode.DRAFT.value,
allowed_channels=[],
allowed_actions=[
"analyze",
"suggest",
"draft",
"score_match",
"generate_report",
"craft_introduction",
"draft_proposal",
"draft_counter_offer",
],
auto_send=False,
auto_negotiate=False,
escalation_triggers=["all"],
max_auto_commitment_sar=0,
label_ar="مسودات",
description_ar="الذكاء الاصطناعي يكتب المسودات — أنت تراجع وترسل",
),
OperatingMode.ASSISTED: ModePolicy(
mode=OperatingMode.ASSISTED.value,
allowed_channels=["email"],
allowed_actions=[
"analyze",
"suggest",
"draft",
"score_match",
"generate_report",
"craft_introduction",
"draft_proposal",
"send_template",
"send_follow_up",
"schedule_reminder",
],
auto_send=True,
auto_negotiate=False,
escalation_triggers=[
"reply_received",
"objection",
"pricing_question",
"meeting_request",
"negative_sentiment",
],
max_auto_commitment_sar=0,
label_ar="مساعد",
description_ar="الذكاء الاصطناعي يرسل القوالب المعتمدة عبر البريد الإلكتروني تلقائياً",
),
OperatingMode.NEGOTIATION: ModePolicy(
mode=OperatingMode.NEGOTIATION.value,
allowed_channels=["email", "whatsapp"],
allowed_actions=[
"analyze",
"suggest",
"draft",
"score_match",
"generate_report",
"craft_introduction",
"draft_proposal",
"send_template",
"send_follow_up",
"schedule_reminder",
"send_custom_message",
"handle_response",
"counter_offer",
"negotiate_terms",
"record_concession",
],
auto_send=True,
auto_negotiate=True,
escalation_triggers=[
"pricing_change",
"exclusivity",
"equity",
"legal_terms",
"value_above_threshold",
"human_requested",
"stall_detected",
],
max_auto_commitment_sar=50_000,
label_ar="تفاوض",
description_ar="الذكاء الاصطناعي يتفاوض ضمن الحدود المحددة — يصعّد عند الحاجة",
),
OperatingMode.STRATEGIC: ModePolicy(
mode=OperatingMode.STRATEGIC.value,
allowed_channels=["email", "whatsapp"],
allowed_actions=[
"analyze",
"suggest",
"draft",
"score_match",
"generate_report",
"craft_introduction",
"draft_proposal",
"send_template",
"send_follow_up",
"schedule_reminder",
"send_custom_message",
"handle_response",
"counter_offer",
"negotiate_terms",
"record_concession",
"request_approval",
"generate_term_sheet",
"run_discovery_scan",
"run_outreach_campaign",
],
auto_send=True,
auto_negotiate=True,
escalation_triggers=[
"commitment",
"exclusivity",
"equity",
"legal",
"data_sharing",
"ip_licensing",
"territory_change",
"value_above_threshold",
"human_requested",
],
max_auto_commitment_sar=100_000,
label_ar="استراتيجي",
description_ar="سير عمل كامل مع تصعيد إلزامي لأي التزام مالي أو قانوني",
),
}
# ── Mode Enforcer ───────────────────────────────────────────────────────────
class ModeEnforcer:
"""
Enforces operating mode policies before any AI action is executed.
يفرض سياسات وضع التشغيل قبل تنفيذ أي إجراء للذكاء الاصطناعي
"""
@staticmethod
async def check_action(
mode: OperatingMode,
action: str,
deal_value: float,
db: AsyncSession,
) -> tuple[bool, str]:
"""
Check whether an action is allowed under the current operating mode.
Returns (allowed, reason_ar).
التحقق مما إذا كان الإجراء مسموحاً في وضع التشغيل الحالي
يرجع (مسموح، السبب_بالعربي)
"""
policy = MODE_POLICIES.get(mode)
if not policy:
return False, f"وضع التشغيل غير معروف: {mode}"
# Check if action is in allowed list
if action not in policy.allowed_actions:
mode_label = MODE_LABELS_AR.get(mode, str(mode))
return False, (
f"الإجراء '{action}' غير مسموح في وضع '{mode_label}'. "
f"الإجراءات المتاحة: {', '.join(policy.allowed_actions)}"
)
# Check if deal value exceeds auto-commitment threshold
if deal_value > 0 and deal_value > policy.max_auto_commitment_sar:
return False, (
f"قيمة الصفقة ({deal_value:,.0f} ريال) تتجاوز الحد الأقصى للالتزام التلقائي "
f"({policy.max_auto_commitment_sar:,.0f} ريال). يلزم تصعيد للإنسان."
)
# Check escalation triggers
escalation_actions = {
"counter_offer": ["pricing_change"],
"negotiate_terms": ["pricing_change", "legal_terms"],
"send_custom_message": [],
"handle_response": ["reply_received"],
"generate_term_sheet": ["legal_terms", "commitment"],
"run_outreach_campaign": [],
}
action_triggers = escalation_actions.get(action, [])
for trigger in action_triggers:
if trigger in policy.escalation_triggers:
if not policy.auto_negotiate:
mode_label = MODE_LABELS_AR.get(mode, str(mode))
return False, (
f"الإجراء '{action}' يستلزم تصعيداً بسبب: {trigger}. "
f"وضع '{mode_label}' لا يسمح بالتفاوض التلقائي."
)
logger.info(
"Action '%s' allowed in mode %s (deal_value=%.0f SAR)",
action, mode.name, deal_value,
)
return True, "مسموح"
@staticmethod
async def check_channel(
mode: OperatingMode,
channel: str,
) -> tuple[bool, str]:
"""
Check whether a communication channel is allowed under the current mode.
التحقق مما إذا كانت قناة الاتصال مسموحة في الوضع الحالي
"""
policy = MODE_POLICIES.get(mode)
if not policy:
return False, f"وضع التشغيل غير معروف: {mode}"
if not policy.allowed_channels:
mode_label = MODE_LABELS_AR.get(mode, str(mode))
return False, f"وضع '{mode_label}' لا يسمح بأي قناة اتصال. الإرسال يتم يدوياً."
if channel not in policy.allowed_channels:
mode_label = MODE_LABELS_AR.get(mode, str(mode))
return False, (
f"القناة '{channel}' غير مسموحة في وضع '{mode_label}'. "
f"القنوات المتاحة: {', '.join(policy.allowed_channels)}"
)
return True, "مسموح"
@staticmethod
async def get_current_mode(
tenant_id: str,
db: AsyncSession,
) -> OperatingMode:
"""
Get the current operating mode for a tenant.
الحصول على وضع التشغيل الحالي للمستأجر
"""
# Mode is stored in the tenant's first company profile deal_preferences
result = await db.execute(
select(CompanyProfile).where(
CompanyProfile.tenant_id == tenant_id
).limit(1)
)
profile = result.scalar_one_or_none()
if not profile:
logger.info("No profile found for tenant %s, defaulting to MANUAL", tenant_id)
return OperatingMode.MANUAL
prefs = profile.deal_preferences or {}
mode_value = prefs.get("_operating_mode", OperatingMode.MANUAL.value)
try:
return OperatingMode(mode_value)
except ValueError:
logger.warning("Invalid operating mode %s for tenant %s, defaulting to MANUAL", mode_value, tenant_id)
return OperatingMode.MANUAL
@staticmethod
async def set_mode(
tenant_id: str,
mode: OperatingMode,
db: AsyncSession,
):
"""
Set the operating mode for a tenant.
تعيين وضع التشغيل للمستأجر
"""
result = await db.execute(
select(CompanyProfile).where(
CompanyProfile.tenant_id == tenant_id
).limit(1)
)
profile = result.scalar_one_or_none()
if not profile:
raise ValueError(f"لا يوجد ملف شركة للمستأجر: {tenant_id}")
prefs = dict(profile.deal_preferences or {})
old_mode = prefs.get("_operating_mode", OperatingMode.MANUAL.value)
prefs["_operating_mode"] = mode.value
profile.deal_preferences = prefs
await db.flush()
old_label = MODE_LABELS_AR.get(OperatingMode(old_mode), str(old_mode))
new_label = MODE_LABELS_AR.get(mode, str(mode))
logger.info(
"Operating mode for tenant %s changed: %s -> %s",
tenant_id, old_label, new_label,
)
@staticmethod
def get_mode_policy(mode: OperatingMode) -> ModePolicy:
"""
Get the policy for a specific operating mode.
الحصول على سياسة وضع تشغيل محدد
"""
policy = MODE_POLICIES.get(mode)
if not policy:
raise ValueError(f"وضع التشغيل غير معروف: {mode}")
return policy
@staticmethod
def get_all_modes() -> list[dict]:
"""
List all operating modes with their labels and descriptions.
عرض جميع أوضاع التشغيل مع التسميات والأوصاف
"""
return [
{
"mode": mode.value,
"name": mode.name,
"label_ar": MODE_LABELS_AR[mode],
"description_ar": MODE_DESCRIPTIONS_AR[mode],
"auto_send": MODE_POLICIES[mode].auto_send,
"auto_negotiate": MODE_POLICIES[mode].auto_negotiate,
"max_auto_commitment_sar": MODE_POLICIES[mode].max_auto_commitment_sar,
"allowed_channels": MODE_POLICIES[mode].allowed_channels,
}
for mode in OperatingMode
]
@staticmethod
async def should_escalate(
mode: OperatingMode,
trigger: str,
deal_value: float,
) -> tuple[bool, str]:
"""
Determine if a specific trigger requires human escalation.
تحديد ما إذا كان المحفز يستلزم تصعيداً للإنسان
"""
policy = MODE_POLICIES.get(mode)
if not policy:
return True, "وضع التشغيل غير معروف — يجب التصعيد"
# "all" trigger means everything escalates
if "all" in policy.escalation_triggers:
return True, f"وضع '{MODE_LABELS_AR.get(mode, '')}' يتطلب تصعيد كل الإجراءات"
if trigger in policy.escalation_triggers:
return True, f"المحفز '{trigger}' يتطلب تصعيداً في وضع '{MODE_LABELS_AR.get(mode, '')}'"
if deal_value > policy.max_auto_commitment_sar > 0:
return True, (
f"قيمة الصفقة ({deal_value:,.0f} ريال) تتجاوز الحد ({policy.max_auto_commitment_sar:,.0f} ريال)"
)
return False, "لا يلزم تصعيد"