diff --git a/salesflow-saas/backend/app/services/strategic_deals/__init__.py b/salesflow-saas/backend/app/services/strategic_deals/__init__.py index 1483032b..8e25ede5 100644 --- a/salesflow-saas/backend/app/services/strategic_deals/__init__.py +++ b/salesflow-saas/backend/app/services/strategic_deals/__init__.py @@ -1,6 +1,7 @@ """ -Dealix Strategic Deals Engine — Deal Exchange OS -محرك الصفقات الاستراتيجية — نظام تبادل الصفقات: اكتشاف وتفاوض وإغلاق شراكات B2B بالذكاء الاصطناعي +Dealix Strategic Deals Engine — Deal Exchange OS + Strategic Growth OS +محرك الصفقات الاستراتيجية — نظام تبادل الصفقات + نظام النمو الاستراتيجي +اكتشاف وتفاوض وإغلاق شراكات B2B بالذكاء الاصطناعي """ from app.services.strategic_deals.company_profiler import CompanyProfiler @@ -13,6 +14,21 @@ from app.services.strategic_deals.deal_room import DealRoom, DealRoomService from app.services.strategic_deals.operating_modes import OperatingMode, ModeEnforcer, MODE_POLICIES from app.services.strategic_deals.channel_compliance import ChannelRules, ConsentLedger +# Strategic Growth OS +from app.services.strategic_deals.acquisition_scouting import ( + AcquisitionTarget, AcquisitionCriteria, AcquisitionScoutingEngine, +) +from app.services.strategic_deals.ecosystem_mapper import ( + EcosystemEntity, EcosystemLink, EcosystemMapper, +) +from app.services.strategic_deals.strategic_simulator import ( + StrategicScenario, StrategicSimulator, +) +from app.services.strategic_deals.roi_engine import ROICalculation, ROIEngine +from app.services.strategic_deals.portfolio_intelligence import ( + PortfolioInsight, PortfolioIntelligence, +) + __all__ = [ # Existing "CompanyProfiler", @@ -32,4 +48,17 @@ __all__ = [ "MODE_POLICIES", "ChannelRules", "ConsentLedger", + # Strategic Growth OS + "AcquisitionTarget", + "AcquisitionCriteria", + "AcquisitionScoutingEngine", + "EcosystemEntity", + "EcosystemLink", + "EcosystemMapper", + "StrategicScenario", + "StrategicSimulator", + "ROICalculation", + "ROIEngine", + "PortfolioInsight", + "PortfolioIntelligence", ] diff --git a/salesflow-saas/backend/app/services/strategic_deals/acquisition_scouting.py b/salesflow-saas/backend/app/services/strategic_deals/acquisition_scouting.py new file mode 100644 index 00000000..86b5b86c --- /dev/null +++ b/salesflow-saas/backend/app/services/strategic_deals/acquisition_scouting.py @@ -0,0 +1,494 @@ +""" +Acquisition Scouting Engine — AI-powered M&A target identification for Saudi B2B. +محرك استكشاف الاستحواذ: تحديد أهداف الاندماج والاستحواذ بالذكاء الاصطناعي للسوق السعودي +""" + +import json +import logging +import uuid +from datetime import datetime, timezone +from typing import Optional + +from pydantic import BaseModel, Field +from sqlalchemy import select, and_, update +from sqlalchemy.ext.asyncio import AsyncSession + +from app.models.strategic_deal import CompanyProfile +from app.services.llm.provider import get_llm + +logger = logging.getLogger("dealix.strategic_deals.acquisition_scouting") + +# ── Saudi sector synergy map ──────────────────────────────────────────────── + +SECTOR_SYNERGY = { + "technology": ["consulting", "telecom", "media", "education"], + "construction": ["real_estate", "manufacturing", "energy", "logistics"], + "real_estate": ["construction", "finance", "tourism"], + "retail": ["wholesale", "logistics", "food_beverage", "marketing"], + "healthcare": ["technology", "manufacturing", "consulting"], + "finance": ["technology", "real_estate", "consulting"], + "logistics": ["retail", "wholesale", "manufacturing", "food_beverage"], + "energy": ["construction", "manufacturing", "technology"], + "food_beverage": ["logistics", "retail", "agriculture", "tourism"], + "consulting": ["technology", "finance", "healthcare", "education"], + "manufacturing": ["construction", "wholesale", "logistics", "energy"], + "marketing": ["technology", "media", "retail", "telecom"], + "telecom": ["technology", "media", "consulting"], + "education": ["technology", "consulting", "media"], + "tourism": ["food_beverage", "real_estate", "marketing"], + "media": ["marketing", "technology", "telecom", "tourism"], + "agriculture": ["food_beverage", "logistics", "manufacturing"], + "automotive": ["manufacturing", "logistics", "finance"], + "government": ["technology", "consulting", "construction"], + "wholesale": ["retail", "manufacturing", "logistics"], +} + +# ── Valid status transitions ──────────────────────────────────────────────── + +VALID_STATUSES = ("scouted", "qualified", "briefed", "intro_sent", "in_discussion") + +STATUS_TRANSITIONS = { + "scouted": ["qualified", "briefed"], + "qualified": ["briefed", "intro_sent"], + "briefed": ["intro_sent", "in_discussion"], + "intro_sent": ["in_discussion"], + "in_discussion": [], +} + + +# ── Models ────────────────────────────────────────────────────────────────── + + +class AcquisitionTarget(BaseModel): + """Represents a scouted M&A target with strategic scoring.""" + id: str = Field(default_factory=lambda: str(uuid.uuid4())) + company_name: str + company_name_ar: str = "" + industry: str = "" + city: str = "" + strategic_fit_score: float = Field(0.0, ge=0.0, le=1.0) + market_adjacency: float = Field(0.0, ge=0.0, le=1.0) + size_fit: float = Field(0.0, ge=0.0, le=1.0) + estimated_value_sar: float = 0.0 + growth_signals: list[str] = Field(default_factory=list) + risk_factors: list[str] = Field(default_factory=list) + brief: str = "" + status: str = "scouted" + tenant_id: Optional[str] = None + scouted_at: str = Field(default_factory=lambda: datetime.now(timezone.utc).isoformat()) + + class Config: + json_schema_extra = { + "example": { + "company_name": "TechVision Co", + "company_name_ar": "شركة تك فيجن", + "industry": "technology", + "city": "الرياض", + "strategic_fit_score": 0.85, + "market_adjacency": 0.7, + "size_fit": 0.6, + "estimated_value_sar": 5_000_000.0, + "growth_signals": ["نمو الإيرادات ٣٠٪ سنوياً", "توسع في ٣ مدن جديدة"], + "risk_factors": ["اعتماد كبير على عميل واحد"], + "status": "scouted", + } + } + + +class AcquisitionCriteria(BaseModel): + """Filter criteria for scouting acquisition targets.""" + industries: list[str] = Field(default_factory=list) + cities: list[str] = Field(default_factory=list) + min_revenue_sar: float = 0.0 + max_revenue_sar: float = 0.0 + min_employees: int = 0 + max_employees: int = 0 + required_capabilities: list[str] = Field(default_factory=list) + exclude_ids: list[str] = Field(default_factory=list) + min_strategic_fit: float = 0.3 + + +# ── Engine ────────────────────────────────────────────────────────────────── + + +class AcquisitionScoutingEngine: + """ + AI-powered acquisition target scouting engine. + Identifies, scores, and briefs potential M&A targets in the Saudi market. + محرك استكشاف أهداف الاستحواذ بالذكاء الاصطناعي — يحدد ويقيّم ويلخص أهداف الاندماج والاستحواذ + """ + + def __init__(self): + self.llm = get_llm() + self._watchlists: dict[str, list[AcquisitionTarget]] = {} + + # ── Scout ─────────────────────────────────────────────────────────────── + + async def scout( + self, + criteria: AcquisitionCriteria, + tenant_id: str, + db: AsyncSession, + ) -> list[AcquisitionTarget]: + """ + Scout potential acquisition targets matching criteria from the company pool. + استكشاف أهداف الاستحواذ المحتملة التي تطابق المعايير من قاعدة الشركات + """ + query = select(CompanyProfile).where( + CompanyProfile.tenant_id == tenant_id, + CompanyProfile.is_verified == True, # noqa: E712 + ) + + result = await db.execute(query) + all_profiles = result.scalars().all() + + if not all_profiles: + logger.info("No company profiles found for tenant %s", tenant_id) + return [] + + targets: list[AcquisitionTarget] = [] + + for profile in all_profiles: + if str(profile.id) in criteria.exclude_ids: + continue + + # Industry filter + if criteria.industries and (profile.industry or "") not in criteria.industries: + adjacent = set() + for ind in criteria.industries: + adjacent.update(SECTOR_SYNERGY.get(ind, [])) + if (profile.industry or "") not in adjacent: + continue + + # City filter + if criteria.cities and (profile.region or "") not in criteria.cities: + continue + + # Revenue filter + revenue = float(profile.annual_revenue_sar or 0) + if criteria.min_revenue_sar > 0 and revenue < criteria.min_revenue_sar: + continue + if criteria.max_revenue_sar > 0 and revenue > criteria.max_revenue_sar: + continue + + # Employee count filter + emp = int(profile.employee_count or 0) + if criteria.min_employees > 0 and emp < criteria.min_employees: + continue + if criteria.max_employees > 0 and emp > criteria.max_employees: + continue + + # Capability filter + if criteria.required_capabilities: + profile_caps = {c.lower() for c in (profile.capabilities or [])} + required = {c.lower() for c in criteria.required_capabilities} + if not required & profile_caps: + continue + + # Build raw target + target = AcquisitionTarget( + company_name=profile.company_name or "", + company_name_ar=profile.company_name_ar if hasattr(profile, "company_name_ar") else "", + industry=profile.industry or "", + city=profile.region or "", + estimated_value_sar=self._estimate_value(profile), + status="scouted", + tenant_id=tenant_id, + ) + targets.append(target) + + # Score all targets using LLM for strategic fit + if targets: + acquirer_profile = await self._get_acquirer_profile(tenant_id, db) + scored = [] + for target in targets: + scored_target = await self.score_target(target, acquirer_profile, db) + if scored_target.strategic_fit_score >= criteria.min_strategic_fit: + scored.append(scored_target) + targets = sorted(scored, key=lambda t: t.strategic_fit_score, reverse=True) + + # Persist to watchlist + self._watchlists.setdefault(tenant_id, []).extend(targets) + + logger.info( + "Scouted %d acquisition targets for tenant %s (from %d candidates)", + len(targets), tenant_id, len(all_profiles), + ) + return targets + + # ── Score Target ──────────────────────────────────────────────────────── + + async def score_target( + self, + target: AcquisitionTarget, + acquirer_twin: Optional[CompanyProfile], + db: AsyncSession, + ) -> AcquisitionTarget: + """ + Score a target against the acquirer's strategic profile. + تقييم هدف الاستحواذ مقابل الملف الاستراتيجي للمستحوذ + """ + acquirer_industry = acquirer_twin.industry if acquirer_twin else "unknown" + acquirer_caps = acquirer_twin.capabilities if acquirer_twin else [] + acquirer_revenue = float(acquirer_twin.annual_revenue_sar or 0) if acquirer_twin else 0 + acquirer_name = acquirer_twin.company_name if acquirer_twin else "الشركة المستحوذة" + + # Market adjacency score + target.market_adjacency = self._compute_adjacency(acquirer_industry, target.industry) + + # Size fit — ideal ratio between 0.05 and 0.5 of acquirer + if acquirer_revenue > 0 and target.estimated_value_sar > 0: + ratio = target.estimated_value_sar / acquirer_revenue + if 0.05 <= ratio <= 0.5: + target.size_fit = 1.0 + elif 0.01 <= ratio < 0.05 or 0.5 < ratio <= 1.0: + target.size_fit = 0.6 + else: + target.size_fit = 0.3 + else: + target.size_fit = 0.5 + + # Use LLM for strategic fit, growth signals, and risk factors + context = f"""المستحوذ: {acquirer_name} +قطاع المستحوذ: {acquirer_industry} +قدرات المستحوذ: {', '.join(acquirer_caps or ['غير محدد'])} +إيرادات المستحوذ: {acquirer_revenue:,.0f} ريال + +الهدف: {target.company_name} +قطاع الهدف: {target.industry} +مدينة الهدف: {target.city} +القيمة التقديرية: {target.estimated_value_sar:,.0f} ريال""" + + system_prompt = """أنت مستشار اندماج واستحواذ سعودي خبير. قيّم هذا الهدف الاستحواذي. + +Return JSON: +{ + "strategic_fit_score": 0.0 to 1.0, + "growth_signals": ["إشارة نمو ١ بالعربي", "إشارة نمو ٢"], + "risk_factors": ["عامل خطر ١ بالعربي", "عامل خطر ٢"], + "rationale_ar": "سبب التوصية بالعربي" +}""" + + try: + llm_response = await self.llm.complete( + system_prompt=system_prompt, + user_message=context, + json_mode=True, + temperature=0.3, + ) + result = llm_response.parse_json() or {} + + target.strategic_fit_score = min(1.0, max(0.0, float(result.get("strategic_fit_score", 0.5)))) + target.growth_signals = result.get("growth_signals", []) + target.risk_factors = result.get("risk_factors", []) + + # Blend LLM fit with computed adjacency and size fit + blended = ( + target.strategic_fit_score * 0.5 + + target.market_adjacency * 0.3 + + target.size_fit * 0.2 + ) + target.strategic_fit_score = round(min(1.0, blended), 4) + + except Exception as exc: + logger.warning("LLM scoring failed for target %s: %s", target.company_name, exc) + target.strategic_fit_score = round( + target.market_adjacency * 0.6 + target.size_fit * 0.4, 4 + ) + target.growth_signals = ["لم يتم التحليل — يتطلب مراجعة يدوية"] + target.risk_factors = ["لم يتم التحليل — يتطلب مراجعة يدوية"] + + logger.info( + "Scored target %s: fit=%.2f adjacency=%.2f size=%.2f", + target.company_name, target.strategic_fit_score, + target.market_adjacency, target.size_fit, + ) + return target + + # ── Generate Brief ────────────────────────────────────────────────────── + + async def generate_brief( + self, + target_id: str, + db: AsyncSession, + ) -> str: + """ + Generate a detailed Arabic acquisition brief for a scouted target. + إنشاء ملخص استحواذ تفصيلي بالعربي لهدف مُستكشَف + """ + target = self._find_target(target_id) + if not target: + raise ValueError(f"Target {target_id} not found in watchlist") + + context = f"""الشركة المستهدفة: {target.company_name} ({target.company_name_ar}) +القطاع: {target.industry} +المدينة: {target.city} +القيمة التقديرية: {target.estimated_value_sar:,.0f} ريال سعودي +درجة الملاءمة الاستراتيجية: {target.strategic_fit_score:.0%} +درجة القرب السوقي: {target.market_adjacency:.0%} +ملاءمة الحجم: {target.size_fit:.0%} +إشارات النمو: {', '.join(target.growth_signals)} +عوامل الخطر: {', '.join(target.risk_factors)}""" + + system_prompt = """أنت مستشار اندماج واستحواذ سعودي. اكتب ملخص استحواذ تنفيذي شامل بالعربي. + +يجب أن يشمل الملخص: +١. نظرة عامة على الشركة المستهدفة +٢. المبرر الاستراتيجي للاستحواذ +٣. تحليل نقاط القوة والفرص +٤. المخاطر الرئيسية واستراتيجيات التخفيف +٥. التقييم المبدئي والهيكل المقترح +٦. الخطوات التالية الموصى بها +٧. الجدول الزمني المتوقع + +اكتب الملخص بأسلوب تنفيذي رسمي مناسب لعرضه على مجلس الإدارة.""" + + try: + llm_response = await self.llm.complete( + system_prompt=system_prompt, + user_message=context, + temperature=0.3, + ) + brief_text = llm_response.content.strip() + except Exception as exc: + logger.error("Brief generation failed for target %s: %s", target_id, exc) + brief_text = ( + f"ملخص استحواذ — {target.company_name}\n" + f"القطاع: {target.industry} | المدينة: {target.city}\n" + f"القيمة التقديرية: {target.estimated_value_sar:,.0f} ريال\n" + f"درجة الملاءمة: {target.strategic_fit_score:.0%}\n" + f"إشارات النمو: {', '.join(target.growth_signals)}\n" + f"عوامل الخطر: {', '.join(target.risk_factors)}\n" + f"الحالة: يتطلب تحليل يدوي إضافي" + ) + + target.brief = brief_text + target.status = "briefed" + + logger.info("Generated acquisition brief for target %s", target_id) + return brief_text + + # ── Get Watchlist ─────────────────────────────────────────────────────── + + async def get_watchlist( + self, + tenant_id: str, + db: AsyncSession, + ) -> list[AcquisitionTarget]: + """ + Retrieve the current acquisition watchlist for a tenant. + استرجاع قائمة مراقبة الاستحواذ الحالية للمستأجر + """ + watchlist = self._watchlists.get(tenant_id, []) + logger.info("Retrieved watchlist for tenant %s: %d targets", tenant_id, len(watchlist)) + return sorted(watchlist, key=lambda t: t.strategic_fit_score, reverse=True) + + # ── Update Status ─────────────────────────────────────────────────────── + + async def update_status( + self, + target_id: str, + status: str, + db: AsyncSession, + ) -> AcquisitionTarget: + """ + Advance a target through the acquisition pipeline. + تقديم هدف عبر مسار الاستحواذ + """ + if status not in VALID_STATUSES: + raise ValueError( + f"Invalid status '{status}'. Must be one of: {', '.join(VALID_STATUSES)}" + ) + + target = self._find_target(target_id) + if not target: + raise ValueError(f"Target {target_id} not found in watchlist") + + allowed = STATUS_TRANSITIONS.get(target.status, []) + if status != target.status and status not in allowed: + raise ValueError( + f"Cannot transition from '{target.status}' to '{status}'. " + f"Allowed transitions: {', '.join(allowed) if allowed else 'none (terminal state)'}" + ) + + old_status = target.status + target.status = status + + logger.info( + "Updated target %s status: %s -> %s", + target_id, old_status, status, + ) + return target + + # ── Private Helpers ───────────────────────────────────────────────────── + + def _compute_adjacency(self, acquirer_industry: str, target_industry: str) -> float: + """Compute market adjacency between two industries.""" + if not acquirer_industry or not target_industry: + return 0.3 + if acquirer_industry == target_industry: + return 1.0 + synergies = SECTOR_SYNERGY.get(acquirer_industry, []) + if target_industry in synergies: + return 0.7 + # Check reverse + reverse = SECTOR_SYNERGY.get(target_industry, []) + if acquirer_industry in reverse: + return 0.6 + return 0.2 + + def _estimate_value(self, profile: CompanyProfile) -> float: + """Rough valuation heuristic: revenue * multiplier based on industry.""" + revenue = float(profile.annual_revenue_sar or 0) + if revenue <= 0: + emp = int(profile.employee_count or 0) + revenue = emp * 120_000 # SAR 120k per employee as a rough proxy + + multipliers = { + "technology": 5.0, + "healthcare": 4.0, + "finance": 3.5, + "consulting": 3.0, + "education": 3.0, + "media": 3.0, + "telecom": 3.5, + "retail": 2.0, + "wholesale": 1.5, + "construction": 2.0, + "real_estate": 2.5, + "manufacturing": 2.0, + "logistics": 2.5, + "food_beverage": 2.0, + "energy": 3.0, + "marketing": 2.5, + "tourism": 2.0, + "agriculture": 1.5, + "automotive": 2.0, + "government": 1.0, + } + industry = profile.industry or "" + mult = multipliers.get(industry, 2.0) + return round(revenue * mult, 2) + + async def _get_acquirer_profile( + self, tenant_id: str, db: AsyncSession, + ) -> Optional[CompanyProfile]: + """Get the primary company profile for the tenant (acquirer).""" + result = await db.execute( + select(CompanyProfile) + .where( + CompanyProfile.tenant_id == tenant_id, + CompanyProfile.is_verified == True, # noqa: E712 + ) + .order_by(CompanyProfile.created_at) + .limit(1) + ) + return result.scalar_one_or_none() + + def _find_target(self, target_id: str) -> Optional[AcquisitionTarget]: + """Search all watchlists for a target by ID.""" + for targets in self._watchlists.values(): + for t in targets: + if t.id == target_id: + return t + return None diff --git a/salesflow-saas/backend/app/services/strategic_deals/ecosystem_mapper.py b/salesflow-saas/backend/app/services/strategic_deals/ecosystem_mapper.py new file mode 100644 index 00000000..8e48ac55 --- /dev/null +++ b/salesflow-saas/backend/app/services/strategic_deals/ecosystem_mapper.py @@ -0,0 +1,568 @@ +""" +Ecosystem Mapper — Maps and analyzes B2B partner ecosystems in the Saudi market. +خريطة المنظومة: رسم وتحليل منظومة الشركاء في السوق السعودي +""" + +import json +import logging +import uuid +from collections import defaultdict +from typing import Optional + +from pydantic import BaseModel, Field +from sqlalchemy import select, func +from sqlalchemy.ext.asyncio import AsyncSession + +from app.models.strategic_deal import CompanyProfile +from app.services.llm.provider import get_llm + +logger = logging.getLogger("dealix.strategic_deals.ecosystem_mapper") + +# ── Entity type definitions ───────────────────────────────────────────────── + +ENTITY_TYPES = { + "agency": "وكالة", + "integrator": "مُدمج أنظمة", + "reseller": "موزع معتمد", + "consultant": "مستشار", + "distributor": "موزع", + "supplier": "مورد", + "customer": "عميل", + "competitor": "منافس", +} + +LINK_TYPES = ("partner", "competitor", "vendor", "client", "referral", "subsidiary") + +# ── Capability clusters for gap analysis ──────────────────────────────────── + +CAPABILITY_CLUSTERS = { + "تقنية": ["تطوير برمجيات", "حوسبة سحابية", "أمن سيبراني", "ذكاء اصطناعي", "تحليل بيانات"], + "تسويق": ["تسويق رقمي", "إعلان", "علاقات عامة", "إدارة محتوى", "سوشل ميديا"], + "عمليات": ["لوجستيات", "سلسلة إمداد", "إدارة مخازن", "نقل", "توزيع"], + "مالية": ["محاسبة", "تدقيق", "استشارات مالية", "تمويل", "إدارة مخاطر"], + "موارد بشرية": ["توظيف", "تدريب", "تطوير مهني", "رواتب", "شؤون موظفين"], + "قانونية": ["استشارات قانونية", "عقود", "ملكية فكرية", "امتثال", "تراخيص"], + "مبيعات": ["مبيعات مباشرة", "مبيعات قنوات", "تطوير أعمال", "إدارة حسابات", "عروض أسعار"], +} + + +# ── Models ────────────────────────────────────────────────────────────────── + + +class EcosystemEntity(BaseModel): + """A node in the ecosystem graph representing a company or organization.""" + id: str = Field(default_factory=lambda: str(uuid.uuid4())) + name: str + name_ar: str = "" + entity_type: str = "partner" # agency, integrator, reseller, consultant, distributor + industry: str = "" + city: str = "" + capabilities: list[str] = Field(default_factory=list) + relationship_strength: float = Field(0.0, ge=0.0, le=1.0) + partner_potential: float = Field(0.0, ge=0.0, le=1.0) + profile_id: Optional[str] = None + + class Config: + json_schema_extra = { + "example": { + "name": "DataSphere Solutions", + "name_ar": "حلول داتا سفير", + "entity_type": "integrator", + "industry": "technology", + "city": "الرياض", + "capabilities": ["حوسبة سحابية", "أمن سيبراني"], + "relationship_strength": 0.8, + "partner_potential": 0.75, + } + } + + +class EcosystemLink(BaseModel): + """An edge in the ecosystem graph representing a relationship.""" + source_id: str + target_id: str + link_type: str = "partner" # partner, competitor, vendor, client + strength: float = Field(0.5, ge=0.0, le=1.0) + description_ar: str = "" + + +# ── Ecosystem Mapper Engine ───────────────────────────────────────────────── + + +class EcosystemMapper: + """ + Builds, analyzes, and visualizes B2B ecosystem maps. + Identifies gaps, suggests partners, and monitors ecosystem health. + بناء وتحليل وعرض خرائط منظومة الأعمال — تحديد الفجوات واقتراح الشركاء + """ + + def __init__(self): + self.llm = get_llm() + + # ── Build Map ─────────────────────────────────────────────────────────── + + async def build_map( + self, + tenant_id: str, + db: AsyncSession, + ) -> dict: + """ + Build a complete ecosystem map from company profiles and deal history. + بناء خريطة منظومة كاملة من ملفات الشركات وتاريخ الصفقات + """ + result = await db.execute( + select(CompanyProfile).where(CompanyProfile.tenant_id == tenant_id) + ) + profiles = result.scalars().all() + + if not profiles: + logger.info("No profiles found for tenant %s", tenant_id) + return {"entities": [], "links": [], "stats": {}} + + entities: list[EcosystemEntity] = [] + links: list[EcosystemLink] = [] + + # Build entity nodes from profiles + entity_map: dict[str, EcosystemEntity] = {} + for profile in profiles: + entity_type = self._infer_entity_type(profile) + entity = EcosystemEntity( + name=profile.company_name or "", + name_ar=profile.company_name_ar if hasattr(profile, "company_name_ar") else "", + entity_type=entity_type, + industry=profile.industry or "", + city=profile.region or "", + capabilities=[c for c in (profile.capabilities or [])], + relationship_strength=float(profile.trust_score or 0.5), + partner_potential=0.0, + profile_id=str(profile.id), + ) + entities.append(entity) + entity_map[str(profile.id)] = entity + + # Infer links based on capability/need overlap and industry relationships + profile_list = list(profiles) + for i, prof_a in enumerate(profile_list): + for prof_b in profile_list[i + 1:]: + link_type, strength = self._infer_link(prof_a, prof_b) + if strength >= 0.2: + entity_a = entity_map.get(str(prof_a.id)) + entity_b = entity_map.get(str(prof_b.id)) + if entity_a and entity_b: + link = EcosystemLink( + source_id=entity_a.id, + target_id=entity_b.id, + link_type=link_type, + strength=round(strength, 4), + description_ar=self._link_description( + prof_a.company_name, prof_b.company_name, link_type + ), + ) + links.append(link) + + # Compute partner potential for each entity + for entity in entities: + incoming = [lk for lk in links if lk.target_id == entity.id] + outgoing = [lk for lk in links if lk.source_id == entity.id] + partner_links = [ + lk for lk in incoming + outgoing + if lk.link_type in ("partner", "referral") + ] + if partner_links: + entity.partner_potential = round( + sum(lk.strength for lk in partner_links) / len(partner_links), 4 + ) + + stats = { + "total_entities": len(entities), + "total_links": len(links), + "entity_types": defaultdict(int), + "link_types": defaultdict(int), + "avg_relationship_strength": 0.0, + } + for e in entities: + stats["entity_types"][e.entity_type] += 1 + for lk in links: + stats["link_types"][lk.link_type] += 1 + if entities: + stats["avg_relationship_strength"] = round( + sum(e.relationship_strength for e in entities) / len(entities), 4 + ) + stats["entity_types"] = dict(stats["entity_types"]) + stats["link_types"] = dict(stats["link_types"]) + + logger.info( + "Built ecosystem map for tenant %s: %d entities, %d links", + tenant_id, len(entities), len(links), + ) + + return { + "entities": [e.model_dump() for e in entities], + "links": [lk.model_dump() for lk in links], + "stats": stats, + } + + # ── Find Gaps ─────────────────────────────────────────────────────────── + + async def find_gaps( + self, + tenant_id: str, + db: AsyncSession, + ) -> list[dict]: + """ + Identify underserved areas in the ecosystem where partners are missing. + تحديد المناطق غير المخدومة في المنظومة حيث ينقص الشركاء + """ + result = await db.execute( + select(CompanyProfile).where(CompanyProfile.tenant_id == tenant_id) + ) + profiles = result.scalars().all() + + if not profiles: + return [] + + # Collect all capabilities and all needs across the ecosystem + all_capabilities: set[str] = set() + all_needs: set[str] = set() + for profile in profiles: + for cap in (profile.capabilities or []): + all_capabilities.add(cap.lower().strip()) + for need in (profile.needs or []): + all_needs.add(need.lower().strip()) + + # Gaps: needs that no one in the ecosystem can fulfill + unmet_needs = all_needs - all_capabilities + + # Cluster-level gaps: entire capability clusters with low coverage + cluster_gaps: list[dict] = [] + for cluster_name, cluster_caps in CAPABILITY_CLUSTERS.items(): + cluster_lower = {c.lower() for c in cluster_caps} + covered = cluster_lower & all_capabilities + coverage = len(covered) / len(cluster_lower) if cluster_lower else 0 + if coverage < 0.3: + cluster_gaps.append({ + "gap_type": "cluster", + "cluster_name_ar": cluster_name, + "coverage": round(coverage, 4), + "missing_capabilities": list(cluster_lower - all_capabilities), + "recommendation_ar": f"المنظومة تفتقر لشركاء في مجال {cluster_name} — التغطية {coverage:.0%} فقط", + }) + + # Individual unmet needs + individual_gaps = [ + { + "gap_type": "unmet_need", + "need": need, + "recommendation_ar": f"لا يوجد شريك يقدم: {need}", + } + for need in sorted(unmet_needs)[:20] + ] + + gaps = cluster_gaps + individual_gaps + + logger.info( + "Found %d ecosystem gaps for tenant %s (%d cluster, %d individual)", + len(gaps), tenant_id, len(cluster_gaps), len(individual_gaps), + ) + return gaps + + # ── Suggest Partners ──────────────────────────────────────────────────── + + async def suggest_partners( + self, + gap_type: str, + tenant_id: str, + db: AsyncSession, + ) -> list[EcosystemEntity]: + """ + Suggest potential partners to fill an ecosystem gap. + اقتراح شركاء محتملين لسد فجوة في المنظومة + """ + gaps = await self.find_gaps(tenant_id, db) + + matching_gaps = [g for g in gaps if g.get("gap_type") == gap_type] + if not matching_gaps: + matching_gaps = gaps[:3] + + gap_summary = json.dumps(matching_gaps[:5], ensure_ascii=False) + + context = f"""فجوات المنظومة: +{gap_summary} + +نوع الفجوة المطلوب: {gap_type}""" + + system_prompt = """أنت مستشار تطوير أعمال سعودي. بناءً على فجوات المنظومة، اقترح شركاء محتملين. + +Return JSON: +{ + "suggestions": [ + { + "name": "اسم النوع المقترح بالإنجليزي", + "name_ar": "اسم النوع المقترح بالعربي", + "entity_type": "agency/integrator/reseller/consultant/distributor", + "industry": "القطاع", + "capabilities": ["قدرة ١", "قدرة ٢"], + "rationale_ar": "سبب الاقتراح بالعربي", + "partner_potential": 0.0 to 1.0 + } + ] +}""" + + try: + llm_response = await self.llm.complete( + system_prompt=system_prompt, + user_message=context, + json_mode=True, + temperature=0.4, + ) + result = llm_response.parse_json() or {} + suggestions_data = result.get("suggestions", []) + except Exception as exc: + logger.warning("LLM partner suggestion failed: %s", exc) + suggestions_data = [ + { + "name": f"Partner for {gap_type}", + "name_ar": f"شريك لسد فجوة {gap_type}", + "entity_type": "consultant", + "industry": "consulting", + "capabilities": [g.get("need", "") for g in matching_gaps if g.get("need")], + "rationale_ar": "اقتراح تلقائي بناءً على الفجوات المكتشفة", + "partner_potential": 0.5, + } + ] + + entities: list[EcosystemEntity] = [] + for s in suggestions_data: + entity = EcosystemEntity( + name=s.get("name", ""), + name_ar=s.get("name_ar", ""), + entity_type=s.get("entity_type", "consultant"), + industry=s.get("industry", ""), + capabilities=s.get("capabilities", []), + relationship_strength=0.0, + partner_potential=min(1.0, max(0.0, float(s.get("partner_potential", 0.5)))), + ) + entities.append(entity) + + logger.info( + "Suggested %d partners for gap '%s' in tenant %s", + len(entities), gap_type, tenant_id, + ) + return entities + + # ── Get Clusters ──────────────────────────────────────────────────────── + + async def get_clusters( + self, + tenant_id: str, + db: AsyncSession, + ) -> list[dict]: + """ + Identify clusters of related entities in the ecosystem. + تحديد تجمعات الكيانات المترابطة في المنظومة + """ + eco_map = await self.build_map(tenant_id, db) + entities = eco_map.get("entities", []) + links = eco_map.get("links", []) + + if not entities: + return [] + + # Group entities by industry + industry_groups: dict[str, list[dict]] = defaultdict(list) + for entity in entities: + industry_groups[entity.get("industry", "other")].append(entity) + + clusters: list[dict] = [] + for industry, members in industry_groups.items(): + if not members: + continue + + member_ids = {m["id"] for m in members} + internal_links = [ + lk for lk in links + if lk.get("source_id") in member_ids and lk.get("target_id") in member_ids + ] + external_links = [ + lk for lk in links + if (lk.get("source_id") in member_ids) != (lk.get("target_id") in member_ids) + ] + + avg_strength = 0.0 + if internal_links: + avg_strength = sum(lk.get("strength", 0) for lk in internal_links) / len(internal_links) + + all_caps: set[str] = set() + for m in members: + all_caps.update(m.get("capabilities", [])) + + clusters.append({ + "cluster_name": industry, + "cluster_name_ar": ENTITY_TYPES.get(industry, industry), + "member_count": len(members), + "internal_links": len(internal_links), + "external_links": len(external_links), + "avg_internal_strength": round(avg_strength, 4), + "capabilities": sorted(all_caps), + "members": [{"id": m["id"], "name": m["name"]} for m in members], + }) + + clusters.sort(key=lambda c: c["member_count"], reverse=True) + + logger.info("Identified %d clusters for tenant %s", len(clusters), tenant_id) + return clusters + + # ── Ecosystem Health ──────────────────────────────────────────────────── + + async def get_ecosystem_health( + self, + tenant_id: str, + db: AsyncSession, + ) -> dict: + """ + Calculate ecosystem health metrics: coverage, concentration, resilience. + حساب مؤشرات صحة المنظومة: التغطية والتركيز والمرونة + """ + eco_map = await self.build_map(tenant_id, db) + entities = eco_map.get("entities", []) + links = eco_map.get("links", []) + gaps = await self.find_gaps(tenant_id, db) + + total_entities = len(entities) + total_links = len(links) + total_gaps = len(gaps) + + if total_entities == 0: + return { + "overall_score": 0.0, + "coverage": 0.0, + "concentration_risk": 1.0, + "resilience": 0.0, + "diversity": 0.0, + "gap_count": 0, + "recommendations_ar": ["لا توجد بيانات كافية لتحليل صحة المنظومة"], + } + + # Coverage: ratio of cluster gaps (lower = better coverage) + cluster_gaps = [g for g in gaps if g.get("gap_type") == "cluster"] + total_clusters = len(CAPABILITY_CLUSTERS) + coverage = 1.0 - (len(cluster_gaps) / total_clusters) if total_clusters > 0 else 0.0 + + # Concentration risk: how dependent the ecosystem is on few entities + type_counts = defaultdict(int) + for e in entities: + type_counts[e.get("entity_type", "unknown")] += 1 + max_type_share = max(type_counts.values()) / total_entities if total_entities > 0 else 1.0 + concentration_risk = max_type_share + + # Diversity: number of distinct entity types / total possible + diversity = len(type_counts) / len(ENTITY_TYPES) if ENTITY_TYPES else 0.0 + + # Resilience: avg links per entity (more links = more resilient) + avg_links = total_links / total_entities if total_entities > 0 else 0.0 + resilience = min(1.0, avg_links / 3.0) # 3+ links per entity = max resilience + + # Overall health score + overall = round( + coverage * 0.35 + + (1.0 - concentration_risk) * 0.25 + + resilience * 0.25 + + diversity * 0.15, + 4, + ) + + # Generate recommendations + recommendations_ar: list[str] = [] + if coverage < 0.5: + recommendations_ar.append("تغطية المنظومة ضعيفة — يُنصح بإضافة شركاء في القطاعات الناقصة") + if concentration_risk > 0.6: + recommendations_ar.append("تركيز عالٍ على نوع واحد من الشركاء — يُنصح بالتنويع") + if resilience < 0.4: + recommendations_ar.append("مرونة المنظومة منخفضة — يُنصح بتعزيز الروابط بين الشركاء") + if diversity < 0.5: + recommendations_ar.append("تنوع أنواع الشركاء محدود — يُنصح بإضافة أنواع جديدة") + if total_gaps > 10: + recommendations_ar.append(f"يوجد {total_gaps} فجوة في المنظومة — يُنصح بمعالجة الفجوات الحرجة أولاً") + if not recommendations_ar: + recommendations_ar.append("المنظومة في حالة صحية جيدة — استمر في المراقبة الدورية") + + health = { + "overall_score": overall, + "coverage": round(coverage, 4), + "concentration_risk": round(concentration_risk, 4), + "resilience": round(resilience, 4), + "diversity": round(diversity, 4), + "gap_count": total_gaps, + "total_entities": total_entities, + "total_links": total_links, + "entity_type_distribution": dict(type_counts), + "recommendations_ar": recommendations_ar, + } + + logger.info( + "Ecosystem health for tenant %s: overall=%.2f coverage=%.2f risk=%.2f", + tenant_id, overall, coverage, concentration_risk, + ) + return health + + # ── Private Helpers ───────────────────────────────────────────────────── + + def _infer_entity_type(self, profile: CompanyProfile) -> str: + """Infer entity type from company profile characteristics.""" + caps = {c.lower() for c in (profile.capabilities or [])} + industry = (profile.industry or "").lower() + + if industry == "consulting" or "استشارات" in caps: + return "consultant" + if "توزيع" in caps or "distribution" in industry: + return "distributor" + if "تكامل" in caps or "integration" in industry or "تكامل أنظمة" in caps: + return "integrator" + if "إعادة بيع" in caps or "reselling" in industry: + return "reseller" + if industry in ("marketing", "media") or "تسويق" in caps: + return "agency" + return "partner" + + def _infer_link( + self, prof_a: CompanyProfile, prof_b: CompanyProfile, + ) -> tuple[str, float]: + """Infer the link type and strength between two profiles.""" + caps_a = {c.lower() for c in (prof_a.capabilities or [])} + caps_b = {c.lower() for c in (prof_b.capabilities or [])} + needs_a = {n.lower() for n in (prof_a.needs or [])} + needs_b = {n.lower() for n in (prof_b.needs or [])} + + # Check if they are in the same industry (potential competitors) + same_industry = (prof_a.industry or "") == (prof_b.industry or "") and prof_a.industry + + # Check vendor/client: A offers what B needs + a_serves_b = len(caps_a & needs_b) + b_serves_a = len(caps_b & needs_a) + + if a_serves_b > 0 and b_serves_a > 0: + # Mutual exchange = partnership + strength = min(1.0, (a_serves_b + b_serves_a) / max(len(needs_a | needs_b), 1) * 2) + return "partner", round(strength, 4) + elif a_serves_b > 0: + strength = min(1.0, a_serves_b / max(len(needs_b), 1)) + return "vendor", round(strength, 4) + elif b_serves_a > 0: + strength = min(1.0, b_serves_a / max(len(needs_a), 1)) + return "client", round(strength, 4) + elif same_industry and caps_a & caps_b: + overlap = len(caps_a & caps_b) / max(len(caps_a | caps_b), 1) + return "competitor", round(overlap, 4) + else: + return "partner", 0.1 + + def _link_description(self, name_a: str, name_b: str, link_type: str) -> str: + """Generate Arabic description for a link.""" + descriptions = { + "partner": f"{name_a} و{name_b} شركاء محتملون", + "competitor": f"{name_a} و{name_b} في نفس المجال التنافسي", + "vendor": f"{name_a} مورد محتمل لـ{name_b}", + "client": f"{name_a} عميل محتمل لـ{name_b}", + "referral": f"{name_a} و{name_b} في شبكة إحالات مشتركة", + } + return descriptions.get(link_type, f"علاقة بين {name_a} و{name_b}") diff --git a/salesflow-saas/backend/app/services/strategic_deals/portfolio_intelligence.py b/salesflow-saas/backend/app/services/strategic_deals/portfolio_intelligence.py new file mode 100644 index 00000000..4d04c0a1 --- /dev/null +++ b/salesflow-saas/backend/app/services/strategic_deals/portfolio_intelligence.py @@ -0,0 +1,573 @@ +""" +Portfolio Intelligence — AI-driven insights across the deal portfolio. +ذكاء المحفظة: رؤى مدعومة بالذكاء الاصطناعي عبر محفظة الصفقات +""" + +import json +import logging +from collections import defaultdict +from typing import Optional + +from pydantic import BaseModel, Field +from sqlalchemy import select, func +from sqlalchemy.ext.asyncio import AsyncSession + +from app.models.strategic_deal import CompanyProfile, DealMatch, StrategicDeal +from app.services.llm.provider import get_llm + +logger = logging.getLogger("dealix.strategic_deals.portfolio_intelligence") + +# ── Vertical definitions (Saudi market) ───────────────────────────────────── + +VERTICALS = { + "technology": "تقنية المعلومات", + "construction": "مقاولات وبناء", + "real_estate": "عقارات", + "retail": "تجارة تجزئة", + "wholesale": "تجارة جملة", + "healthcare": "رعاية صحية", + "education": "تعليم وتدريب", + "food_beverage": "أغذية ومشروبات", + "logistics": "نقل ولوجستيات", + "finance": "خدمات مالية", + "energy": "طاقة", + "tourism": "سياحة وضيافة", + "consulting": "استشارات", + "marketing": "تسويق وإعلان", + "manufacturing": "صناعة", + "telecom": "اتصالات", + "media": "إعلام وترفيه", + "agriculture": "زراعة", + "automotive": "سيارات", + "government": "قطاع حكومي", +} + +DEAL_TYPE_LABELS = { + "partnership": "شراكة", + "distribution": "توزيع", + "franchise": "امتياز", + "jv": "مشروع مشترك", + "referral": "إحالة", + "acquisition": "استحواذ", + "barter": "مقايضة", + "reseller": "إعادة بيع", +} + + +# ── Models ────────────────────────────────────────────────────────────────── + + +class PortfolioInsight(BaseModel): + """A single intelligence insight derived from portfolio analysis.""" + insight_type: str # top_vertical, best_deal_type, best_partner_archetype, gap, productization + title: str = "" + title_ar: str = "" + data: dict = Field(default_factory=dict) + confidence: float = Field(0.5, ge=0.0, le=1.0) + recommendation: str = "" + recommendation_ar: str = "" + + class Config: + json_schema_extra = { + "example": { + "insight_type": "top_vertical", + "title": "Technology is the best-performing vertical", + "title_ar": "قطاع التقنية هو الأفضل أداءً", + "data": {"vertical": "technology", "deal_count": 15, "avg_score": 0.82}, + "confidence": 0.85, + "recommendation_ar": "زيادة التركيز على صفقات قطاع التقنية", + } + } + + +# ── Portfolio Intelligence Engine ─────────────────────────────────────────── + + +class PortfolioIntelligence: + """ + Analyzes the entire deal portfolio to surface actionable insights. + Identifies top verticals, best deal structures, gaps, and productization opportunities. + يحلل محفظة الصفقات بالكامل لاستخراج رؤى قابلة للتنفيذ + """ + + def __init__(self): + self.llm = get_llm() + + # ── Full Analysis ─────────────────────────────────────────────────────── + + async def analyze( + self, + tenant_id: str, + period: str = "quarterly", + db: AsyncSession = None, + ) -> list[PortfolioInsight]: + """ + Run a complete portfolio analysis and return all insights. + تحليل شامل للمحفظة واستخراج جميع الرؤى + """ + if db is None: + raise ValueError("Database session is required") + + insights: list[PortfolioInsight] = [] + + # Run all analysis types in sequence + verticals = await self.get_top_verticals(tenant_id, db) + if verticals: + top = verticals[0] + insights.append(PortfolioInsight( + insight_type="top_vertical", + title=f"Top vertical: {top.get('vertical', 'unknown')}", + title_ar=f"القطاع الأفضل: {top.get('vertical_ar', 'غير محدد')}", + data=top, + confidence=min(0.95, top.get("deal_count", 0) / 20), + recommendation=f"Increase focus on {top.get('vertical', '')} deals", + recommendation_ar=f"زيادة التركيز على صفقات قطاع {top.get('vertical_ar', '')}", + )) + + deal_types = await self.get_best_deal_types(tenant_id, db) + if deal_types: + best = deal_types[0] + insights.append(PortfolioInsight( + insight_type="best_deal_type", + title=f"Best deal type: {best.get('deal_type', 'unknown')}", + title_ar=f"أفضل نوع صفقة: {best.get('deal_type_ar', 'غير محدد')}", + data=best, + confidence=min(0.90, best.get("count", 0) / 15), + recommendation=f"Prioritize {best.get('deal_type', '')} deals", + recommendation_ar=f"إعطاء الأولوية لصفقات {best.get('deal_type_ar', '')}", + )) + + archetypes = await self.get_best_partner_archetypes(tenant_id, db) + if archetypes: + best_arch = archetypes[0] + insights.append(PortfolioInsight( + insight_type="best_partner_archetype", + title=f"Best partner type: {best_arch.get('archetype', 'unknown')}", + title_ar=f"أفضل نوع شريك: {best_arch.get('archetype_ar', 'غير محدد')}", + data=best_arch, + confidence=min(0.85, best_arch.get("count", 0) / 10), + recommendation_ar=f"البحث عن شركاء من نوع {best_arch.get('archetype_ar', '')}", + )) + + gaps = await self.get_repeated_gaps(tenant_id, db) + for gap in gaps[:3]: + insights.append(PortfolioInsight( + insight_type="repeated_gap", + title=f"Repeated gap: {gap.get('gap', '')}", + title_ar=f"فجوة متكررة: {gap.get('gap', '')}", + data=gap, + confidence=min(0.80, gap.get("frequency", 0) / 5), + recommendation_ar=f"سد فجوة: {gap.get('gap', '')} — تكررت {gap.get('frequency', 0)} مرات", + )) + + products = await self.get_productization_candidates(tenant_id, db) + for prod in products[:2]: + insights.append(PortfolioInsight( + insight_type="productization", + title=f"Productization candidate: {prod.get('capability', '')}", + title_ar=f"فرصة تحويل لمنتج: {prod.get('capability', '')}", + data=prod, + confidence=min(0.75, prod.get("demand_count", 0) / 8), + recommendation_ar=f"تحويل «{prod.get('capability', '')}» إلى منتج قابل للبيع", + )) + + # Sort by confidence descending + insights.sort(key=lambda i: i.confidence, reverse=True) + + logger.info( + "Portfolio analysis for tenant %s (%s): %d insights", + tenant_id, period, len(insights), + ) + return insights + + # ── Top Verticals ─────────────────────────────────────────────────────── + + async def get_top_verticals( + self, + tenant_id: str, + db: AsyncSession, + ) -> list[dict]: + """ + Identify the highest-performing industry verticals by deal volume and score. + تحديد القطاعات الصناعية الأفضل أداءً حسب حجم الصفقات والتقييم + """ + result = await db.execute( + select(CompanyProfile).where(CompanyProfile.tenant_id == tenant_id) + ) + profiles = result.scalars().all() + + # Count deals and avg scores per industry + industry_stats: dict[str, dict] = defaultdict( + lambda: {"deal_count": 0, "total_score": 0.0, "total_revenue": 0.0, "companies": 0} + ) + + for profile in profiles: + industry = profile.industry or "other" + industry_stats[industry]["companies"] += 1 + industry_stats[industry]["total_revenue"] += float(profile.annual_revenue_sar or 0) + industry_stats[industry]["total_score"] += float(profile.trust_score or 0) + + # Get match counts per industry + matches_result = await db.execute( + select(DealMatch).where(DealMatch.tenant_id == tenant_id) + ) + matches = matches_result.scalars().all() + + profile_industry: dict[str, str] = {} + for p in profiles: + profile_industry[str(p.id)] = p.industry or "other" + + for match in matches: + industry_a = profile_industry.get(str(match.company_a_id), "other") + industry_stats[industry_a]["deal_count"] += 1 + + # Build ranked list + verticals: list[dict] = [] + for industry, stats in industry_stats.items(): + companies = stats["companies"] + avg_score = stats["total_score"] / companies if companies > 0 else 0 + verticals.append({ + "vertical": industry, + "vertical_ar": VERTICALS.get(industry, industry), + "deal_count": stats["deal_count"], + "company_count": companies, + "avg_trust_score": round(avg_score, 4), + "total_revenue_sar": round(stats["total_revenue"], 2), + "performance_score": round( + stats["deal_count"] * 0.4 + avg_score * 0.3 + min(companies / 10, 1) * 0.3, 4 + ), + }) + + verticals.sort(key=lambda v: v["performance_score"], reverse=True) + + logger.info("Top verticals for tenant %s: %d industries analyzed", tenant_id, len(verticals)) + return verticals + + # ── Best Deal Types ───────────────────────────────────────────────────── + + async def get_best_deal_types( + self, + tenant_id: str, + db: AsyncSession, + ) -> list[dict]: + """ + Determine which deal types yield the best results. + تحديد أنواع الصفقات الأكثر نجاحاً + """ + matches_result = await db.execute( + select(DealMatch).where(DealMatch.tenant_id == tenant_id) + ) + matches = matches_result.scalars().all() + + type_stats: dict[str, dict] = defaultdict( + lambda: {"count": 0, "total_score": 0.0, "accepted": 0} + ) + + for match in matches: + deal_type = match.deal_type_suggested or "unknown" + type_stats[deal_type]["count"] += 1 + type_stats[deal_type]["total_score"] += float(match.match_score or 0) + if match.status in ("accepted", "signed", "active"): + type_stats[deal_type]["accepted"] += 1 + + deal_types: list[dict] = [] + for dt, stats in type_stats.items(): + count = stats["count"] + avg_score = stats["total_score"] / count if count > 0 else 0 + acceptance_rate = stats["accepted"] / count if count > 0 else 0 + + deal_types.append({ + "deal_type": dt, + "deal_type_ar": DEAL_TYPE_LABELS.get(dt, dt), + "count": count, + "avg_match_score": round(avg_score, 4), + "acceptance_rate": round(acceptance_rate, 4), + "effectiveness_score": round( + avg_score * 0.4 + acceptance_rate * 0.4 + min(count / 20, 1) * 0.2, 4 + ), + }) + + deal_types.sort(key=lambda d: d["effectiveness_score"], reverse=True) + + logger.info("Best deal types for tenant %s: %d types analyzed", tenant_id, len(deal_types)) + return deal_types + + # ── Best Partner Archetypes ───────────────────────────────────────────── + + async def get_best_partner_archetypes( + self, + tenant_id: str, + db: AsyncSession, + ) -> list[dict]: + """ + Identify the most successful partner archetypes (size, industry, type). + تحديد أنماط الشركاء الأكثر نجاحاً (الحجم، القطاع، النوع) + """ + result = await db.execute( + select(CompanyProfile).where(CompanyProfile.tenant_id == tenant_id) + ) + profiles = result.scalars().all() + + matches_result = await db.execute( + select(DealMatch).where(DealMatch.tenant_id == tenant_id) + ) + matches = matches_result.scalars().all() + + # Build profile lookup + profile_map: dict[str, CompanyProfile] = {} + for p in profiles: + profile_map[str(p.id)] = p + + # Analyze successful matches to derive archetypes + archetype_stats: dict[str, dict] = defaultdict( + lambda: {"count": 0, "total_score": 0.0, "examples": []} + ) + + for match in matches: + partner_id = str(match.company_b_id) if match.company_b_id else None + if not partner_id or partner_id not in profile_map: + continue + + partner = profile_map[partner_id] + emp_count = int(partner.employee_count or 0) + + if emp_count > 500: + size_bucket = "enterprise" + size_ar = "مؤسسة كبيرة" + elif emp_count > 50: + size_bucket = "mid_market" + size_ar = "سوق متوسط" + elif emp_count > 10: + size_bucket = "smb" + size_ar = "أعمال صغيرة ومتوسطة" + else: + size_bucket = "startup" + size_ar = "شركة ناشئة" + + archetype_key = f"{partner.industry or 'unknown'}_{size_bucket}" + archetype_stats[archetype_key]["count"] += 1 + archetype_stats[archetype_key]["total_score"] += float(match.match_score or 0) + archetype_stats[archetype_key]["industry"] = partner.industry or "unknown" + archetype_stats[archetype_key]["size"] = size_bucket + archetype_stats[archetype_key]["size_ar"] = size_ar + archetype_stats[archetype_key]["industry_ar"] = VERTICALS.get(partner.industry or "", partner.industry or "") + if len(archetype_stats[archetype_key]["examples"]) < 3: + archetype_stats[archetype_key]["examples"].append(partner.company_name) + + archetypes: list[dict] = [] + for key, stats in archetype_stats.items(): + count = stats["count"] + avg_score = stats["total_score"] / count if count > 0 else 0 + archetype_label = f"{stats.get('industry_ar', '')} - {stats.get('size_ar', '')}" + + archetypes.append({ + "archetype": key, + "archetype_ar": archetype_label, + "industry": stats.get("industry", ""), + "size": stats.get("size", ""), + "count": count, + "avg_match_score": round(avg_score, 4), + "examples": stats.get("examples", []), + "score": round(avg_score * 0.6 + min(count / 10, 1) * 0.4, 4), + }) + + archetypes.sort(key=lambda a: a["score"], reverse=True) + + logger.info("Partner archetypes for tenant %s: %d archetypes", tenant_id, len(archetypes)) + return archetypes + + # ── Repeated Gaps ─────────────────────────────────────────────────────── + + async def get_repeated_gaps( + self, + tenant_id: str, + db: AsyncSession, + ) -> list[dict]: + """ + Find needs that repeatedly appear but are never fulfilled in the portfolio. + اكتشاف الاحتياجات التي تتكرر ولا يتم تلبيتها في المحفظة + """ + result = await db.execute( + select(CompanyProfile).where(CompanyProfile.tenant_id == tenant_id) + ) + profiles = result.scalars().all() + + all_needs: dict[str, int] = defaultdict(int) + all_caps: set[str] = set() + + for profile in profiles: + for need in (profile.needs or []): + all_needs[need.lower().strip()] += 1 + for cap in (profile.capabilities or []): + all_caps.add(cap.lower().strip()) + + # Gaps: needs that appear multiple times but nobody offers + gaps: list[dict] = [] + for need, frequency in sorted(all_needs.items(), key=lambda x: x[1], reverse=True): + if need not in all_caps and frequency >= 2: + gaps.append({ + "gap": need, + "frequency": frequency, + "severity": "high" if frequency >= 5 else ("medium" if frequency >= 3 else "low"), + "severity_ar": "عالية" if frequency >= 5 else ("متوسطة" if frequency >= 3 else "منخفضة"), + "recommendation_ar": f"البحث عن شريك يقدم «{need}» — مطلوب من {frequency} شركة", + }) + + logger.info("Repeated gaps for tenant %s: %d gaps found", tenant_id, len(gaps)) + return gaps + + # ── Productization Candidates ─────────────────────────────────────────── + + async def get_productization_candidates( + self, + tenant_id: str, + db: AsyncSession, + ) -> list[dict]: + """ + Identify capabilities with high demand that could become standalone products. + تحديد القدرات ذات الطلب العالي التي يمكن تحويلها لمنتجات مستقلة + """ + result = await db.execute( + select(CompanyProfile).where(CompanyProfile.tenant_id == tenant_id) + ) + profiles = result.scalars().all() + + # Count how many companies need each capability vs how many offer it + cap_supply: dict[str, int] = defaultdict(int) + cap_demand: dict[str, int] = defaultdict(int) + + for profile in profiles: + for cap in (profile.capabilities or []): + cap_supply[cap.lower().strip()] += 1 + for need in (profile.needs or []): + cap_demand[need.lower().strip()] += 1 + + candidates: list[dict] = [] + for capability, demand_count in cap_demand.items(): + supply_count = cap_supply.get(capability, 0) + if demand_count >= 3 and supply_count <= 1: + demand_supply_ratio = demand_count / max(supply_count, 1) + candidates.append({ + "capability": capability, + "demand_count": demand_count, + "supply_count": supply_count, + "demand_supply_ratio": round(demand_supply_ratio, 2), + "market_potential": "عالي" if demand_supply_ratio > 5 else ("متوسط" if demand_supply_ratio > 2 else "منخفض"), + "recommendation_ar": ( + f"فرصة لتحويل «{capability}» إلى منتج — " + f"مطلوب من {demand_count} شركة ومتوفر عند {supply_count} فقط" + ), + }) + + candidates.sort(key=lambda c: c["demand_supply_ratio"], reverse=True) + + logger.info( + "Productization candidates for tenant %s: %d candidates", + tenant_id, len(candidates), + ) + return candidates + + # ── Quarterly Report ──────────────────────────────────────────────────── + + async def generate_quarterly_report( + self, + tenant_id: str, + db: AsyncSession, + ) -> str: + """ + Generate a comprehensive Arabic quarterly portfolio intelligence report. + إنشاء تقرير ذكاء محفظة ربع سنوي شامل بالعربي + """ + insights = await self.analyze(tenant_id, period="quarterly", db=db) + verticals = await self.get_top_verticals(tenant_id, db) + deal_types = await self.get_best_deal_types(tenant_id, db) + gaps = await self.get_repeated_gaps(tenant_id, db) + products = await self.get_productization_candidates(tenant_id, db) + + # Build context for LLM + context_parts = [ + f"عدد الرؤى المستخرجة: {len(insights)}", + f"القطاعات الأفضل أداءً: {json.dumps(verticals[:5], ensure_ascii=False)}", + f"أنواع الصفقات الأنجح: {json.dumps(deal_types[:5], ensure_ascii=False)}", + f"الفجوات المتكررة: {json.dumps(gaps[:5], ensure_ascii=False)}", + f"فرص التحويل لمنتجات: {json.dumps(products[:5], ensure_ascii=False)}", + ] + + top_insights = [] + for ins in insights[:5]: + top_insights.append(f"- {ins.title_ar} (ثقة: {ins.confidence:.0%}): {ins.recommendation_ar}") + + context_parts.append(f"أبرز الرؤى:\n" + "\n".join(top_insights)) + + context = "\n\n".join(context_parts) + + system_prompt = """أنت محلل استراتيجي سعودي خبير. اكتب تقرير ذكاء محفظة ربع سنوي شامل بالعربي. + +يجب أن يشمل التقرير: +١. ملخص تنفيذي +٢. أداء القطاعات — أي القطاعات تحقق أفضل النتائج +٣. تحليل أنواع الصفقات — أي الهياكل أنجح +٤. الفجوات الاستراتيجية — ما ينقص المنظومة +٥. فرص التحويل لمنتجات — خدمات يمكن تعبئتها كمنتجات +٦. التوصيات الاستراتيجية — ٣-٥ توصيات محددة +٧. خطة العمل للربع القادم + +اكتب بأسلوب تنفيذي رسمي مناسب لمجلس الإدارة. استخدم الأرقام والنسب.""" + + try: + llm_response = await self.llm.complete( + system_prompt=system_prompt, + user_message=context, + temperature=0.3, + ) + report = llm_response.content.strip() + except Exception as exc: + logger.error("Quarterly report generation failed: %s", exc) + # Build a structured fallback report + report_parts = [ + "تقرير ذكاء المحفظة — الربع الحالي", + "=" * 40, + "", + "ملخص تنفيذي:", + f"تم تحليل المحفظة واستخراج {len(insights)} رؤية استراتيجية.", + "", + ] + + if verticals: + report_parts.append("القطاعات الأفضل أداءً:") + for v in verticals[:3]: + report_parts.append( + f" - {v.get('vertical_ar', '')}: " + f"{v.get('deal_count', 0)} صفقة، " + f"تقييم {v.get('avg_trust_score', 0):.2f}" + ) + report_parts.append("") + + if deal_types: + report_parts.append("أنواع الصفقات الأنجح:") + for dt in deal_types[:3]: + report_parts.append( + f" - {dt.get('deal_type_ar', '')}: " + f"{dt.get('count', 0)} صفقة، " + f"فعالية {dt.get('effectiveness_score', 0):.2f}" + ) + report_parts.append("") + + if gaps: + report_parts.append("الفجوات المتكررة:") + for g in gaps[:3]: + report_parts.append(f" - {g.get('gap', '')}: تكررت {g.get('frequency', 0)} مرات") + report_parts.append("") + + if products: + report_parts.append("فرص التحويل لمنتجات:") + for p in products[:3]: + report_parts.append( + f" - {p.get('capability', '')}: " + f"الطلب {p.get('demand_count', 0)} / العرض {p.get('supply_count', 0)}" + ) + + report = "\n".join(report_parts) + + logger.info("Generated quarterly report for tenant %s", tenant_id) + return report diff --git a/salesflow-saas/backend/app/services/strategic_deals/roi_engine.py b/salesflow-saas/backend/app/services/strategic_deals/roi_engine.py new file mode 100644 index 00000000..c5bb9f01 --- /dev/null +++ b/salesflow-saas/backend/app/services/strategic_deals/roi_engine.py @@ -0,0 +1,484 @@ +""" +ROI Engine — Return on Investment calculator for strategic B2B initiatives. +محرك العائد على الاستثمار: حاسبة العائد على الاستثمار للمبادرات الاستراتيجية +""" + +import json +import logging +import math +from typing import Optional + +from pydantic import BaseModel, Field +from sqlalchemy import select +from sqlalchemy.ext.asyncio import AsyncSession + +from app.models.strategic_deal import CompanyProfile +from app.services.llm.provider import get_llm + +logger = logging.getLogger("dealix.strategic_deals.roi_engine") + +# ── Initiative type benchmarks (Saudi market) ─────────────────────────────── + +INITIATIVE_BENCHMARKS = { + "partnership": { + "avg_roi_pct": 0.45, + "avg_payback_months": 8, + "cac_reduction_range": (0.10, 0.30), + "margin_impact_range": (0.02, 0.08), + }, + "acquisition": { + "avg_roi_pct": 0.25, + "avg_payback_months": 24, + "cac_reduction_range": (0.15, 0.40), + "margin_impact_range": (0.05, 0.15), + }, + "channel_expansion": { + "avg_roi_pct": 0.60, + "avg_payback_months": 6, + "cac_reduction_range": (0.05, 0.20), + "margin_impact_range": (0.01, 0.05), + }, + "market_entry": { + "avg_roi_pct": 0.30, + "avg_payback_months": 18, + "cac_reduction_range": (0.00, 0.10), + "margin_impact_range": (0.03, 0.10), + }, + "digital_transformation": { + "avg_roi_pct": 0.55, + "avg_payback_months": 12, + "cac_reduction_range": (0.20, 0.50), + "margin_impact_range": (0.05, 0.12), + }, + "product_launch": { + "avg_roi_pct": 0.40, + "avg_payback_months": 10, + "cac_reduction_range": (0.00, 0.15), + "margin_impact_range": (0.05, 0.20), + }, + "referral_program": { + "avg_roi_pct": 0.80, + "avg_payback_months": 3, + "cac_reduction_range": (0.30, 0.60), + "margin_impact_range": (0.01, 0.03), + }, +} + + +# ── Models ────────────────────────────────────────────────────────────────── + + +class ROICalculation(BaseModel): + """Complete ROI analysis for a strategic initiative.""" + initiative_type: str + investment_sar: float = 0.0 + projected_return_sar: float = 0.0 + roi_percentage: float = 0.0 + payback_months: int = 0 + cac_reduction: float = Field(0.0, ge=0.0, le=1.0) + distribution_value: float = 0.0 + margin_impact: float = 0.0 + risk_adjusted_roi: float = 0.0 + confidence: float = Field(0.5, ge=0.0, le=1.0) + breakdown: dict = Field(default_factory=dict) + summary_ar: str = "" + + class Config: + json_schema_extra = { + "example": { + "initiative_type": "partnership", + "investment_sar": 100_000, + "projected_return_sar": 250_000, + "roi_percentage": 150.0, + "payback_months": 6, + "cac_reduction": 0.20, + "risk_adjusted_roi": 97.5, + "confidence": 0.75, + "summary_ar": "شراكة مع عائد متوقع ٢٥٠ ألف ريال واسترداد خلال ٦ أشهر", + } + } + + +# ── ROI Engine ────────────────────────────────────────────────────────────── + + +class ROIEngine: + """ + Calculates, compares, and projects ROI for strategic B2B initiatives. + يحسب ويقارن ويتوقع العائد على الاستثمار للمبادرات الاستراتيجية + """ + + def __init__(self): + self.llm = get_llm() + self._active_calculations: dict[str, list[ROICalculation]] = {} + + # ── Calculate ROI ─────────────────────────────────────────────────────── + + async def calculate( + self, + initiative_type: str, + params: dict, + db: AsyncSession, + ) -> ROICalculation: + """ + Calculate detailed ROI for a strategic initiative. + حساب العائد على الاستثمار التفصيلي لمبادرة استراتيجية + """ + benchmark = INITIATIVE_BENCHMARKS.get(initiative_type, {}) + + investment = float(params.get("investment_sar", 0)) + if investment <= 0: + raise ValueError("investment_sar must be positive") + + projected_return = float(params.get("projected_return_sar", 0)) + monthly_return = float(params.get("monthly_return_sar", 0)) + duration_months = int(params.get("duration_months", 12)) + risk_factor = min(1.0, max(0.0, float(params.get("risk_factor", 0.3)))) + discount_rate = float(params.get("annual_discount_rate", 0.08)) + + # If projected_return not given, estimate from monthly + if projected_return <= 0 and monthly_return > 0: + projected_return = monthly_return * duration_months + + # If still zero, use benchmark + if projected_return <= 0: + avg_roi = benchmark.get("avg_roi_pct", 0.30) + projected_return = investment * (1 + avg_roi) + + # Core ROI + roi_percentage = ((projected_return - investment) / investment) * 100 if investment > 0 else 0.0 + + # Payback period + if monthly_return > 0: + payback_months = max(1, math.ceil(investment / monthly_return)) + elif projected_return > investment and duration_months > 0: + monthly_est = (projected_return - investment) / duration_months + payback_months = max(1, math.ceil(investment / monthly_est)) if monthly_est > 0 else duration_months + else: + payback_months = benchmark.get("avg_payback_months", 12) + + # CAC reduction estimate + cac_range = benchmark.get("cac_reduction_range", (0.0, 0.15)) + cac_reduction = float(params.get("cac_reduction", (cac_range[0] + cac_range[1]) / 2)) + cac_reduction = min(1.0, max(0.0, cac_reduction)) + + # Margin impact + margin_range = benchmark.get("margin_impact_range", (0.01, 0.05)) + margin_impact = float(params.get("margin_impact", (margin_range[0] + margin_range[1]) / 2)) + + # Distribution / channel value + distribution_value = float(params.get("distribution_value_sar", 0)) + if distribution_value <= 0 and initiative_type in ("channel_expansion", "partnership", "referral_program"): + distribution_value = projected_return * 0.2 + + # Risk-adjusted ROI — discount by risk factor + risk_adjusted_roi = roi_percentage * (1 - risk_factor) + + # NPV-based confidence: higher NPV relative to investment = higher confidence + monthly_discount = discount_rate / 12 + npv = 0.0 + if monthly_return > 0: + for month in range(1, duration_months + 1): + npv += monthly_return / ((1 + monthly_discount) ** month) + else: + monthly_est = projected_return / max(duration_months, 1) + for month in range(1, duration_months + 1): + npv += monthly_est / ((1 + monthly_discount) ** month) + + npv -= investment + npv_ratio = npv / investment if investment > 0 else 0 + confidence = min(0.95, max(0.1, 0.5 + npv_ratio * 0.3)) + + # Detailed breakdown + breakdown = { + "gross_return_sar": round(projected_return, 2), + "net_return_sar": round(projected_return - investment, 2), + "npv_sar": round(npv, 2), + "monthly_return_sar": round(monthly_return or projected_return / max(duration_months, 1), 2), + "duration_months": duration_months, + "risk_factor": risk_factor, + "discount_rate": discount_rate, + "cac_savings_sar": round(investment * cac_reduction, 2), + "distribution_value_sar": round(distribution_value, 2), + "benchmark_avg_roi_pct": benchmark.get("avg_roi_pct", 0) * 100, + "vs_benchmark": "أعلى من المتوسط" if roi_percentage > benchmark.get("avg_roi_pct", 0) * 100 else "أقل من المتوسط", + } + + # Generate Arabic summary + summary_ar = await self._generate_summary( + initiative_type, investment, projected_return, + roi_percentage, payback_months, risk_adjusted_roi, confidence, + ) + + calc = ROICalculation( + initiative_type=initiative_type, + investment_sar=round(investment, 2), + projected_return_sar=round(projected_return, 2), + roi_percentage=round(roi_percentage, 2), + payback_months=payback_months, + cac_reduction=round(cac_reduction, 4), + distribution_value=round(distribution_value, 2), + margin_impact=round(margin_impact, 4), + risk_adjusted_roi=round(risk_adjusted_roi, 2), + confidence=round(confidence, 4), + breakdown=breakdown, + summary_ar=summary_ar, + ) + + # Store for tenant dashboard + tenant_id = params.get("tenant_id", "default") + self._active_calculations.setdefault(tenant_id, []).append(calc) + + logger.info( + "ROI calculated: type=%s investment=%.0f return=%.0f roi=%.1f%% payback=%dm", + initiative_type, investment, projected_return, roi_percentage, payback_months, + ) + return calc + + # ── Compare Initiatives ───────────────────────────────────────────────── + + async def compare_initiatives( + self, + calculations: list[ROICalculation], + ) -> dict: + """ + Rank and compare multiple initiatives by risk-adjusted ROI. + ترتيب ومقارنة عدة مبادرات حسب العائد المعدل بالمخاطر + """ + if not calculations: + return {"ranked": [], "summary_ar": "لا توجد مبادرات للمقارنة"} + + ranked = [] + for calc in calculations: + ranked.append({ + "initiative_type": calc.initiative_type, + "investment_sar": calc.investment_sar, + "projected_return_sar": calc.projected_return_sar, + "roi_percentage": calc.roi_percentage, + "risk_adjusted_roi": calc.risk_adjusted_roi, + "payback_months": calc.payback_months, + "confidence": calc.confidence, + "cac_reduction": calc.cac_reduction, + "margin_impact": calc.margin_impact, + "npv_sar": calc.breakdown.get("npv_sar", 0), + }) + + # Sort by risk-adjusted ROI descending + ranked.sort(key=lambda x: x["risk_adjusted_roi"], reverse=True) + + for i, item in enumerate(ranked): + item["rank"] = i + 1 + + best = ranked[0] + summary_ar = ( + f"تم مقارنة {len(ranked)} مبادرة. " + f"الأفضل: {best['initiative_type']} بعائد معدل {best['risk_adjusted_roi']:.1f}% " + f"واسترداد خلال {best['payback_months']} شهر " + f"بدرجة ثقة {best['confidence']:.0%}." + ) + + total_investment = sum(c.investment_sar for c in calculations) + total_return = sum(c.projected_return_sar for c in calculations) + portfolio_roi = ((total_return - total_investment) / total_investment * 100) if total_investment > 0 else 0 + + logger.info( + "Compared %d initiatives. Best: %s (adj ROI=%.1f%%)", + len(ranked), best["initiative_type"], best["risk_adjusted_roi"], + ) + + return { + "ranked": ranked, + "portfolio_investment_sar": round(total_investment, 2), + "portfolio_return_sar": round(total_return, 2), + "portfolio_roi_pct": round(portfolio_roi, 2), + "summary_ar": summary_ar, + } + + # ── Annual Projection ─────────────────────────────────────────────────── + + async def project_annual( + self, + tenant_id: str, + db: AsyncSession, + ) -> dict: + """ + Project annual returns across all active initiatives for a tenant. + إسقاط العوائد السنوية لجميع المبادرات النشطة للمستأجر + """ + calculations = self._active_calculations.get(tenant_id, []) + + if not calculations: + return { + "tenant_id": tenant_id, + "total_investment_sar": 0, + "projected_annual_return_sar": 0, + "weighted_roi_pct": 0, + "avg_payback_months": 0, + "monthly_projections": [], + "summary_ar": "لا توجد مبادرات نشطة لهذا المستأجر", + } + + total_investment = sum(c.investment_sar for c in calculations) + total_return = sum(c.projected_return_sar for c in calculations) + weighted_roi = ((total_return - total_investment) / total_investment * 100) if total_investment > 0 else 0 + avg_payback = sum(c.payback_months for c in calculations) / len(calculations) + total_cac_savings = sum(c.investment_sar * c.cac_reduction for c in calculations) + + # Monthly projection across all initiatives + monthly_projections = [] + for month in range(1, 13): + month_return = 0.0 + for calc in calculations: + if month >= calc.payback_months: + monthly_est = calc.breakdown.get("monthly_return_sar", 0) + month_return += monthly_est + else: + ramp_ratio = month / max(calc.payback_months, 1) + monthly_est = calc.breakdown.get("monthly_return_sar", 0) * ramp_ratio + month_return += monthly_est + + monthly_projections.append({ + "month": month, + "projected_return_sar": round(month_return, 2), + "cumulative_sar": round( + sum(p["projected_return_sar"] for p in monthly_projections) + month_return, 2 + ), + }) + + summary_ar = ( + f"إجمالي الاستثمار: {total_investment:,.0f} ريال | " + f"العائد السنوي المتوقع: {total_return:,.0f} ريال | " + f"العائد على الاستثمار: {weighted_roi:.1f}% | " + f"متوسط فترة الاسترداد: {avg_payback:.0f} شهر | " + f"وفورات تكلفة الاستحواذ: {total_cac_savings:,.0f} ريال" + ) + + logger.info( + "Annual projection for tenant %s: investment=%.0f return=%.0f roi=%.1f%%", + tenant_id, total_investment, total_return, weighted_roi, + ) + + return { + "tenant_id": tenant_id, + "total_investment_sar": round(total_investment, 2), + "projected_annual_return_sar": round(total_return, 2), + "weighted_roi_pct": round(weighted_roi, 2), + "avg_payback_months": round(avg_payback, 1), + "total_cac_savings_sar": round(total_cac_savings, 2), + "initiative_count": len(calculations), + "monthly_projections": monthly_projections, + "summary_ar": summary_ar, + } + + # ── ROI Dashboard ─────────────────────────────────────────────────────── + + async def get_roi_dashboard( + self, + tenant_id: str, + db: AsyncSession, + ) -> dict: + """ + Get a comprehensive ROI dashboard for all active initiatives. + الحصول على لوحة معلومات شاملة للعائد على الاستثمار لجميع المبادرات النشطة + """ + calculations = self._active_calculations.get(tenant_id, []) + projection = await self.project_annual(tenant_id, db) + + # Group by initiative type + by_type: dict[str, list[ROICalculation]] = {} + for calc in calculations: + by_type.setdefault(calc.initiative_type, []).append(calc) + + type_summaries = [] + for init_type, calcs in by_type.items(): + total_inv = sum(c.investment_sar for c in calcs) + total_ret = sum(c.projected_return_sar for c in calcs) + avg_roi = sum(c.roi_percentage for c in calcs) / len(calcs) + avg_conf = sum(c.confidence for c in calcs) / len(calcs) + + type_summaries.append({ + "initiative_type": init_type, + "count": len(calcs), + "total_investment_sar": round(total_inv, 2), + "total_return_sar": round(total_ret, 2), + "avg_roi_pct": round(avg_roi, 2), + "avg_confidence": round(avg_conf, 4), + }) + + type_summaries.sort(key=lambda x: x["avg_roi_pct"], reverse=True) + + # Top performers + top_performers = sorted(calculations, key=lambda c: c.risk_adjusted_roi, reverse=True)[:5] + top_list = [ + { + "initiative_type": c.initiative_type, + "investment_sar": c.investment_sar, + "roi_pct": c.roi_percentage, + "risk_adjusted_roi": c.risk_adjusted_roi, + "payback_months": c.payback_months, + } + for c in top_performers + ] + + dashboard = { + "tenant_id": tenant_id, + "initiative_count": len(calculations), + "projection": projection, + "by_type": type_summaries, + "top_performers": top_list, + "health": { + "avg_roi_pct": round( + sum(c.roi_percentage for c in calculations) / max(len(calculations), 1), 2 + ), + "avg_confidence": round( + sum(c.confidence for c in calculations) / max(len(calculations), 1), 4 + ), + "total_at_risk_sar": round( + sum(c.investment_sar * (1 - c.confidence) for c in calculations), 2 + ), + }, + } + + logger.info("ROI dashboard for tenant %s: %d initiatives", tenant_id, len(calculations)) + return dashboard + + # ── Private Helpers ───────────────────────────────────────────────────── + + async def _generate_summary( + self, + initiative_type: str, + investment: float, + projected_return: float, + roi_pct: float, + payback_months: int, + risk_adjusted_roi: float, + confidence: float, + ) -> str: + """Generate an Arabic summary for an ROI calculation.""" + context = f"""نوع المبادرة: {initiative_type} +الاستثمار: {investment:,.0f} ريال +العائد المتوقع: {projected_return:,.0f} ريال +العائد على الاستثمار: {roi_pct:.1f}% +فترة الاسترداد: {payback_months} شهر +العائد المعدل بالمخاطر: {risk_adjusted_roi:.1f}% +درجة الثقة: {confidence:.0%}""" + + system_prompt = """أنت محلل مالي سعودي. اكتب ملخصاً موجزاً بالعربي (٢-٣ جمل) يشرح العائد على الاستثمار لهذه المبادرة. +اذكر إذا كان العائد جيداً أو ضعيفاً مقارنة بالسوق وأعطِ توصية مختصرة. +اكتب الملخص مباشرة بدون JSON.""" + + try: + llm_response = await self.llm.complete( + system_prompt=system_prompt, + user_message=context, + temperature=0.3, + ) + return llm_response.content.strip() + except Exception as exc: + logger.warning("LLM summary generation failed: %s", exc) + verdict = "عائد جيد" if roi_pct > 30 else ("عائد متوسط" if roi_pct > 10 else "عائد ضعيف") + return ( + f"مبادرة {initiative_type}: استثمار {investment:,.0f} ريال " + f"بعائد متوقع {projected_return:,.0f} ريال ({roi_pct:.1f}%). " + f"فترة الاسترداد {payback_months} شهر. التقييم: {verdict}." + ) diff --git a/salesflow-saas/backend/app/services/strategic_deals/strategic_simulator.py b/salesflow-saas/backend/app/services/strategic_deals/strategic_simulator.py new file mode 100644 index 00000000..7cf89c07 --- /dev/null +++ b/salesflow-saas/backend/app/services/strategic_deals/strategic_simulator.py @@ -0,0 +1,596 @@ +""" +Strategic Simulator — Monte Carlo-style scenario modeling for B2B deals. +المحاكي الاستراتيجي: نمذجة سيناريوهات بأسلوب مونت كارلو للصفقات بين الشركات +""" + +import json +import logging +import uuid +from datetime import datetime, timezone +from typing import Optional + +from pydantic import BaseModel, Field +from sqlalchemy import select +from sqlalchemy.ext.asyncio import AsyncSession + +from app.models.strategic_deal import CompanyProfile +from app.services.llm.provider import get_llm + +logger = logging.getLogger("dealix.strategic_deals.strategic_simulator") + +# ── Scenario type definitions ─────────────────────────────────────────────── + +SCENARIO_TYPES = { + "partnership": "شراكة استراتيجية", + "acquisition": "استحواذ", + "channel_expansion": "توسع قنوات التوزيع", + "market_entry": "دخول سوق جديد", + "joint_venture": "مشروع مشترك", + "franchise": "امتياز تجاري", + "divestiture": "تصفية أصول", +} + +# ── Default assumptions by scenario type ──────────────────────────────────── + +DEFAULT_ASSUMPTIONS = { + "partnership": { + "revenue_share_pct": 0.15, + "setup_cost_sar": 50_000, + "ramp_months": 3, + "success_probability": 0.65, + "annual_growth_pct": 0.10, + }, + "acquisition": { + "premium_pct": 0.25, + "integration_cost_pct": 0.15, + "synergy_savings_pct": 0.10, + "ramp_months": 12, + "success_probability": 0.50, + "annual_growth_pct": 0.15, + }, + "channel_expansion": { + "channel_setup_sar": 100_000, + "per_channel_cost_sar": 25_000, + "channels_count": 3, + "revenue_per_channel_sar": 200_000, + "ramp_months": 6, + "success_probability": 0.70, + }, + "market_entry": { + "entry_cost_sar": 500_000, + "first_year_revenue_sar": 300_000, + "market_share_target": 0.05, + "ramp_months": 12, + "success_probability": 0.45, + "annual_growth_pct": 0.20, + }, + "joint_venture": { + "equity_split": 0.50, + "total_investment_sar": 1_000_000, + "projected_revenue_sar": 2_000_000, + "ramp_months": 9, + "success_probability": 0.55, + "annual_growth_pct": 0.12, + }, + "franchise": { + "franchise_fee_sar": 200_000, + "royalty_pct": 0.06, + "unit_revenue_sar": 500_000, + "units_count": 2, + "ramp_months": 6, + "success_probability": 0.60, + }, + "divestiture": { + "asset_value_sar": 1_000_000, + "discount_pct": 0.10, + "transaction_cost_pct": 0.05, + "timeline_months": 6, + "success_probability": 0.75, + }, +} + + +# ── Models ────────────────────────────────────────────────────────────────── + + +class StrategicScenario(BaseModel): + """A fully modeled strategic scenario with financial projections.""" + id: str = Field(default_factory=lambda: str(uuid.uuid4())) + name: str = "" + name_ar: str = "" + scenario_type: str = "partnership" + parties: list[str] = Field(default_factory=list) + assumptions: dict = Field(default_factory=dict) + upside: dict = Field(default_factory=dict) + downside: dict = Field(default_factory=dict) + timeline_months: int = 12 + probability: float = Field(0.5, ge=0.0, le=1.0) + net_value_sar: float = 0.0 + recommendation: str = "" + recommendation_ar: str = "" + created_at: str = Field(default_factory=lambda: datetime.now(timezone.utc).isoformat()) + + class Config: + json_schema_extra = { + "example": { + "name": "Partnership with LogiPrime", + "name_ar": "شراكة مع لوجي برايم", + "scenario_type": "partnership", + "parties": ["شركتنا", "لوجي برايم"], + "probability": 0.65, + "net_value_sar": 750_000, + } + } + + +# ── Strategic Simulator Engine ────────────────────────────────────────────── + + +class StrategicSimulator: + """ + Simulates strategic scenarios, comparing outcomes and generating + Arabic-language recommendations for Saudi B2B decision-makers. + يحاكي السيناريوهات الاستراتيجية ويقارن النتائج ويولد توصيات بالعربي + """ + + def __init__(self): + self.llm = get_llm() + self._scenarios: dict[str, StrategicScenario] = {} + + # ── Simulate ──────────────────────────────────────────────────────────── + + async def simulate( + self, + scenario_type: str, + params: dict, + twin_id: Optional[str], + db: AsyncSession, + ) -> StrategicScenario: + """ + Run a full strategic simulation for a given scenario type. + تشغيل محاكاة استراتيجية كاملة لنوع سيناريو معين + """ + if scenario_type not in SCENARIO_TYPES: + raise ValueError( + f"Unknown scenario type '{scenario_type}'. " + f"Valid types: {', '.join(SCENARIO_TYPES.keys())}" + ) + + # Load acquirer profile if twin_id provided + acquirer_name = params.get("acquirer_name", "الشركة") + acquirer_revenue = float(params.get("acquirer_revenue_sar", 0)) + if twin_id: + result = await db.execute( + select(CompanyProfile).where(CompanyProfile.id == twin_id) + ) + twin = result.scalar_one_or_none() + if twin: + acquirer_name = twin.company_name or acquirer_name + acquirer_revenue = float(twin.annual_revenue_sar or acquirer_revenue) + + # Merge defaults with user-provided params + defaults = DEFAULT_ASSUMPTIONS.get(scenario_type, {}).copy() + assumptions = {**defaults, **params.get("assumptions", {})} + + # Compute financials based on scenario type + upside, downside, net_value, timeline = self._compute_financials( + scenario_type, assumptions, acquirer_revenue, + ) + + probability = min(1.0, max(0.0, float( + assumptions.get("success_probability", + defaults.get("success_probability", 0.5)) + ))) + + # Build scenario + parties = params.get("parties", [acquirer_name]) + scenario = StrategicScenario( + name=params.get("name", f"{scenario_type} scenario"), + name_ar=params.get("name_ar", SCENARIO_TYPES.get(scenario_type, scenario_type)), + scenario_type=scenario_type, + parties=parties, + assumptions=assumptions, + upside=upside, + downside=downside, + timeline_months=timeline, + probability=probability, + net_value_sar=round(net_value, 2), + ) + + # Generate Arabic recommendation via LLM + recommendation = await self._generate_scenario_recommendation(scenario) + scenario.recommendation = recommendation + scenario.recommendation_ar = recommendation + + self._scenarios[scenario.id] = scenario + + logger.info( + "Simulated scenario '%s' (type=%s): net_value=%.0f SAR, probability=%.0%%", + scenario.name, scenario_type, net_value, probability * 100, + ) + return scenario + + # ── Compare Scenarios ─────────────────────────────────────────────────── + + async def compare_scenarios( + self, + scenarios: list[StrategicScenario], + db: AsyncSession, + ) -> dict: + """ + Rank and compare multiple scenarios by expected value and risk. + ترتيب ومقارنة عدة سيناريوهات حسب القيمة المتوقعة والمخاطر + """ + if not scenarios: + return {"ranked": [], "summary_ar": "لا توجد سيناريوهات للمقارنة"} + + ranked = [] + for s in scenarios: + expected_value = s.net_value_sar * s.probability + risk_adjusted = expected_value * (1.0 - (1.0 - s.probability) * 0.5) + ranked.append({ + "id": s.id, + "name": s.name, + "name_ar": s.name_ar, + "scenario_type": s.scenario_type, + "net_value_sar": s.net_value_sar, + "probability": s.probability, + "expected_value_sar": round(expected_value, 2), + "risk_adjusted_value_sar": round(risk_adjusted, 2), + "timeline_months": s.timeline_months, + "upside_total": sum( + float(v) for v in s.upside.values() if isinstance(v, (int, float)) + ), + "downside_total": sum( + float(v) for v in s.downside.values() if isinstance(v, (int, float)) + ), + }) + + ranked.sort(key=lambda x: x["risk_adjusted_value_sar"], reverse=True) + + # Add rank + for i, item in enumerate(ranked): + item["rank"] = i + 1 + + # Generate comparison summary + best = ranked[0] + worst = ranked[-1] + + summary_ar = ( + f"تم مقارنة {len(ranked)} سيناريو. " + f"الأفضل: {best['name_ar']} بقيمة متوقعة {best['expected_value_sar']:,.0f} ريال " + f"واحتمالية نجاح {best['probability']:.0%}. " + ) + if len(ranked) > 1: + summary_ar += ( + f"الأقل جاذبية: {worst['name_ar']} بقيمة متوقعة " + f"{worst['expected_value_sar']:,.0f} ريال." + ) + + logger.info("Compared %d scenarios. Best: %s", len(ranked), best["name"]) + + return { + "ranked": ranked, + "best_scenario_id": best["id"], + "summary_ar": summary_ar, + } + + # ── Sensitivity Analysis ──────────────────────────────────────────────── + + async def sensitivity_analysis( + self, + scenario_id: str, + variable: str, + value_range: list[float], + db: AsyncSession, + ) -> list[dict]: + """ + Run sensitivity analysis on a single variable across a range of values. + تحليل الحساسية لمتغير واحد عبر نطاق من القيم + """ + base_scenario = self._scenarios.get(scenario_id) + if not base_scenario: + raise ValueError(f"Scenario {scenario_id} not found") + + if not value_range: + base_val = float(base_scenario.assumptions.get(variable, 1.0)) + value_range = [ + round(base_val * 0.5, 4), + round(base_val * 0.75, 4), + round(base_val, 4), + round(base_val * 1.25, 4), + round(base_val * 1.5, 4), + ] + + results: list[dict] = [] + for val in value_range: + modified_assumptions = base_scenario.assumptions.copy() + modified_assumptions[variable] = val + + upside, downside, net_value, timeline = self._compute_financials( + base_scenario.scenario_type, modified_assumptions, 0, + ) + + expected = net_value * base_scenario.probability + results.append({ + "variable": variable, + "value": val, + "net_value_sar": round(net_value, 2), + "expected_value_sar": round(expected, 2), + "upside_revenue": upside.get("revenue_gain_sar", 0), + "downside_cost": downside.get("total_cost_sar", 0), + "delta_from_base": round(net_value - base_scenario.net_value_sar, 2), + }) + + logger.info( + "Sensitivity analysis for scenario %s on '%s': %d data points", + scenario_id, variable, len(results), + ) + return results + + # ── Generate Recommendation ───────────────────────────────────────────── + + async def generate_recommendation( + self, + scenario_id: str, + db: AsyncSession, + ) -> str: + """ + Generate a detailed Arabic strategic recommendation for a scenario. + إنشاء توصية استراتيجية تفصيلية بالعربي لسيناريو محدد + """ + scenario = self._scenarios.get(scenario_id) + if not scenario: + raise ValueError(f"Scenario {scenario_id} not found") + + recommendation = await self._generate_scenario_recommendation(scenario) + scenario.recommendation = recommendation + scenario.recommendation_ar = recommendation + + logger.info("Generated recommendation for scenario %s", scenario_id) + return recommendation + + # ── Private: Compute Financials ───────────────────────────────────────── + + def _compute_financials( + self, + scenario_type: str, + assumptions: dict, + acquirer_revenue: float, + ) -> tuple[dict, dict, float, int]: + """Compute upside, downside, net value, and timeline from assumptions.""" + + if scenario_type == "partnership": + rev_share = float(assumptions.get("revenue_share_pct", 0.15)) + setup = float(assumptions.get("setup_cost_sar", 50_000)) + ramp = int(assumptions.get("ramp_months", 3)) + growth = float(assumptions.get("annual_growth_pct", 0.10)) + base_rev = acquirer_revenue if acquirer_revenue > 0 else 1_000_000 + + annual_gain = base_rev * rev_share + three_year = annual_gain * (1 + growth) + annual_gain * (1 + growth) ** 2 + annual_gain * (1 + growth) ** 3 + + upside = { + "revenue_gain_sar": round(annual_gain, 2), + "three_year_revenue_sar": round(three_year, 2), + "reach_expansion_pct": round(rev_share * 100, 1), + "capacity_gain_pct": round(rev_share * 50, 1), + } + downside = { + "setup_cost_sar": setup, + "annual_management_sar": round(setup * 0.3, 2), + "total_cost_sar": round(setup + setup * 0.3 * 3, 2), + "operational_burden": "متوسط", + "risk_level": "منخفض", + } + net_value = three_year - downside["total_cost_sar"] + timeline = ramp + 12 + + elif scenario_type == "acquisition": + premium = float(assumptions.get("premium_pct", 0.25)) + integration_cost = float(assumptions.get("integration_cost_pct", 0.15)) + synergy = float(assumptions.get("synergy_savings_pct", 0.10)) + target_value = float(assumptions.get("target_value_sar", acquirer_revenue * 0.3)) + ramp = int(assumptions.get("ramp_months", 12)) + + acquisition_price = target_value * (1 + premium) + integration = target_value * integration_cost + annual_synergy = target_value * synergy + + upside = { + "revenue_gain_sar": round(target_value, 2), + "annual_synergy_sar": round(annual_synergy, 2), + "three_year_synergy_sar": round(annual_synergy * 3, 2), + "market_share_gain_pct": round(target_value / max(acquirer_revenue, 1) * 100, 1), + } + downside = { + "acquisition_price_sar": round(acquisition_price, 2), + "integration_cost_sar": round(integration, 2), + "total_cost_sar": round(acquisition_price + integration, 2), + "operational_burden": "عالي", + "risk_level": "عالي", + } + net_value = upside["three_year_synergy_sar"] + target_value - downside["total_cost_sar"] + timeline = ramp + 24 + + elif scenario_type == "channel_expansion": + channel_setup = float(assumptions.get("channel_setup_sar", 100_000)) + per_channel = float(assumptions.get("per_channel_cost_sar", 25_000)) + channels = int(assumptions.get("channels_count", 3)) + rev_per_channel = float(assumptions.get("revenue_per_channel_sar", 200_000)) + ramp = int(assumptions.get("ramp_months", 6)) + + total_setup = channel_setup + per_channel * channels + annual_rev = rev_per_channel * channels + + upside = { + "revenue_gain_sar": round(annual_rev, 2), + "reach_expansion_pct": round(channels * 15, 1), + "channels_added": channels, + } + downside = { + "setup_cost_sar": round(total_setup, 2), + "annual_ops_sar": round(per_channel * channels * 0.5, 2), + "total_cost_sar": round(total_setup + per_channel * channels * 0.5, 2), + "operational_burden": "متوسط", + "risk_level": "منخفض", + } + net_value = annual_rev * 2 - downside["total_cost_sar"] + timeline = ramp + 12 + + elif scenario_type == "market_entry": + entry_cost = float(assumptions.get("entry_cost_sar", 500_000)) + first_year = float(assumptions.get("first_year_revenue_sar", 300_000)) + growth = float(assumptions.get("annual_growth_pct", 0.20)) + ramp = int(assumptions.get("ramp_months", 12)) + + three_year_rev = first_year + first_year * (1 + growth) + first_year * (1 + growth) ** 2 + + upside = { + "revenue_gain_sar": round(first_year, 2), + "three_year_revenue_sar": round(three_year_rev, 2), + "market_share_target_pct": float(assumptions.get("market_share_target", 0.05)) * 100, + } + downside = { + "entry_cost_sar": round(entry_cost, 2), + "annual_ops_sar": round(entry_cost * 0.2, 2), + "total_cost_sar": round(entry_cost + entry_cost * 0.2 * 2, 2), + "operational_burden": "عالي", + "risk_level": "عالي", + } + net_value = three_year_rev - downside["total_cost_sar"] + timeline = ramp + 24 + + elif scenario_type == "joint_venture": + equity = float(assumptions.get("equity_split", 0.50)) + investment = float(assumptions.get("total_investment_sar", 1_000_000)) + projected = float(assumptions.get("projected_revenue_sar", 2_000_000)) + ramp = int(assumptions.get("ramp_months", 9)) + + our_share = projected * equity + our_cost = investment * equity + + upside = { + "revenue_gain_sar": round(our_share, 2), + "equity_value_sar": round(our_share * 3, 2), + "reach_expansion_pct": round(equity * 100, 1), + } + downside = { + "investment_sar": round(our_cost, 2), + "annual_ops_sar": round(our_cost * 0.1, 2), + "total_cost_sar": round(our_cost + our_cost * 0.1 * 2, 2), + "operational_burden": "عالي", + "risk_level": "متوسط", + } + net_value = our_share * 2 - downside["total_cost_sar"] + timeline = ramp + 18 + + elif scenario_type == "franchise": + fee = float(assumptions.get("franchise_fee_sar", 200_000)) + royalty = float(assumptions.get("royalty_pct", 0.06)) + unit_rev = float(assumptions.get("unit_revenue_sar", 500_000)) + units = int(assumptions.get("units_count", 2)) + ramp = int(assumptions.get("ramp_months", 6)) + + annual_royalty = unit_rev * units * royalty + total_fees = fee * units + + upside = { + "revenue_gain_sar": round(annual_royalty + total_fees, 2), + "annual_royalty_sar": round(annual_royalty, 2), + "franchise_fees_sar": round(total_fees, 2), + "units_count": units, + } + downside = { + "setup_cost_sar": round(fee * 0.3 * units, 2), + "support_cost_sar": round(unit_rev * 0.02 * units, 2), + "total_cost_sar": round(fee * 0.3 * units + unit_rev * 0.02 * units * 3, 2), + "operational_burden": "متوسط", + "risk_level": "منخفض", + } + net_value = (annual_royalty * 3 + total_fees) - downside["total_cost_sar"] + timeline = ramp + 12 + + elif scenario_type == "divestiture": + asset_val = float(assumptions.get("asset_value_sar", 1_000_000)) + discount = float(assumptions.get("discount_pct", 0.10)) + tx_cost = float(assumptions.get("transaction_cost_pct", 0.05)) + ramp = int(assumptions.get("timeline_months", 6)) + + proceeds = asset_val * (1 - discount) + costs = asset_val * tx_cost + + upside = { + "proceeds_sar": round(proceeds, 2), + "cash_freed_sar": round(proceeds - costs, 2), + "operational_relief": "تخفيف عبء تشغيلي", + } + downside = { + "transaction_cost_sar": round(costs, 2), + "discount_loss_sar": round(asset_val * discount, 2), + "total_cost_sar": round(costs + asset_val * discount, 2), + "operational_burden": "منخفض", + "risk_level": "منخفض", + } + net_value = proceeds - costs + timeline = ramp + + else: + upside = {"revenue_gain_sar": 0} + downside = {"total_cost_sar": 0} + net_value = 0 + timeline = 12 + + return upside, downside, round(net_value, 2), timeline + + # ── Private: Generate Recommendation ──────────────────────────────────── + + async def _generate_scenario_recommendation( + self, scenario: StrategicScenario, + ) -> str: + """Generate an Arabic recommendation for a scenario using LLM.""" + type_ar = SCENARIO_TYPES.get(scenario.scenario_type, scenario.scenario_type) + + context = f"""نوع السيناريو: {type_ar} +الأطراف: {', '.join(scenario.parties)} +الافتراضات: {json.dumps(scenario.assumptions, ensure_ascii=False)} +الجانب الإيجابي: {json.dumps(scenario.upside, ensure_ascii=False)} +الجانب السلبي: {json.dumps(scenario.downside, ensure_ascii=False)} +المدة الزمنية: {scenario.timeline_months} شهر +احتمالية النجاح: {scenario.probability:.0%} +صافي القيمة: {scenario.net_value_sar:,.0f} ريال سعودي""" + + system_prompt = """أنت مستشار استراتيجي سعودي خبير. اكتب توصية تنفيذية واضحة بالعربي. + +يجب أن تشمل: +١. ملخص تنفيذي في سطرين +٢. المبرر الاستراتيجي +٣. المخاطر الرئيسية وطرق التخفيف +٤. التوصية النهائية (تنفيذ / تأجيل / رفض) مع المبررات +٥. الخطوات التالية إذا كانت التوصية بالتنفيذ + +اكتب بأسلوب مهني رسمي مناسب لعرضه على الإدارة التنفيذية.""" + + try: + llm_response = await self.llm.complete( + system_prompt=system_prompt, + user_message=context, + temperature=0.3, + ) + return llm_response.content.strip() + except Exception as exc: + logger.warning("LLM recommendation generation failed: %s", exc) + if scenario.net_value_sar > 0 and scenario.probability >= 0.5: + verdict = "يُنصح بالتنفيذ" + elif scenario.net_value_sar > 0: + verdict = "يُنصح بمزيد من الدراسة قبل التنفيذ" + else: + verdict = "لا يُنصح بالتنفيذ في الوقت الحالي" + + return ( + f"توصية — {type_ar}\n" + f"صافي القيمة المتوقعة: {scenario.net_value_sar:,.0f} ريال\n" + f"احتمالية النجاح: {scenario.probability:.0%}\n" + f"المدة الزمنية: {scenario.timeline_months} شهر\n" + f"القرار: {verdict}" + )