diff --git a/salesflow-saas/backend/app/api/v1/intelligence.py b/salesflow-saas/backend/app/api/v1/intelligence.py index 00c85252..04b9abee 100644 --- a/salesflow-saas/backend/app/api/v1/intelligence.py +++ b/salesflow-saas/backend/app/api/v1/intelligence.py @@ -1,22 +1,348 @@ """ -Dealix Full API: Lead Pipeline + Autonomous Core + Intelligence Reports +Intelligence API — Signals, alerts, behaviour patterns, recommendations, +escalations. Wires the signal_intelligence, alert_delivery and +behavior_intelligence services into FastAPI endpoints. """ -from fastapi import APIRouter, BackgroundTasks, HTTPException -from pydantic import BaseModel -from typing import Optional -import os -router = APIRouter(prefix="/intelligence", tags=["🧠 Intelligence"]) +from __future__ import annotations + +import logging +import uuid +from datetime import datetime, timezone +from typing import List, Optional + +from fastapi import APIRouter, BackgroundTasks, HTTPException, Query +from pydantic import BaseModel, Field + +from app.services.signal_intelligence import ( + SignalSource, + SignalEvent, + SignalFilter, + Watchlist, + get_signal_intelligence, +) +from app.services.alert_delivery import ( + AlertUrgency, + get_alert_delivery, +) +from app.services.behavior_intelligence import ( + get_behavior_intelligence, +) + +logger = logging.getLogger("dealix.api.intelligence") + +router = APIRouter(prefix="/intelligence", tags=["Intelligence"]) + +# --------------------------------------------------------------------------- +# Request / Response schemas +# --------------------------------------------------------------------------- -def _groq_key(): - key = os.getenv("GROQ_API_KEY", "") - if not key: - raise HTTPException(500, "GROQ_API_KEY missing") - return key +class IngestRequest(BaseModel): + source: SignalSource + payload: dict + tenant_id: str + + +class WatchlistCreate(BaseModel): + tenant_id: str + name: str + name_ar: str + entity_type: str + entity_ids: List[str] = [] + keywords: List[str] = [] + alert_threshold: float = 0.5 + channels: List[str] = Field(default=["dashboard"]) + + +class AcknowledgeRequest(BaseModel): + user_id: str + + +class EscalationResolveRequest(BaseModel): + user_id: str + resolution: str + resolution_ar: str = "" + + +class _Escalation(BaseModel): + id: str = Field(default_factory=lambda: uuid.uuid4().hex) + tenant_id: str + title: str + title_ar: str + reason: str + reason_ar: str + entity_type: str = "" + entity_id: str = "" + assigned_to: Optional[str] = None + status: str = "open" # open, resolved + resolved_by: Optional[str] = None + resolved_at: Optional[datetime] = None + resolution: Optional[str] = None + created_at: datetime = Field(default_factory=lambda: datetime.now(timezone.utc)) + + +# In-memory escalation store (per-tenant) +_escalations: dict[str, list[_Escalation]] = {} + + +# --------------------------------------------------------------------------- +# Signals +# --------------------------------------------------------------------------- + + +@router.post("/signals/ingest", summary="Ingest a raw signal event") +async def ingest_signal(req: IngestRequest): + """Normalise, score, deduplicate and store a signal from any source.""" + engine = get_signal_intelligence() + event = await engine.ingest(req.source, req.payload, req.tenant_id) + return event.model_dump() + + +@router.get("/signals", summary="List recent signals with filters") +async def list_signals( + tenant_id: str, + source: Optional[SignalSource] = None, + entity_type: Optional[str] = None, + entity_id: Optional[str] = None, + min_importance: float = 0.0, + sentiment: Optional[str] = None, + limit: int = Query(default=50, le=200), +): + """Return signals matching the provided filters, most recent first.""" + engine = get_signal_intelligence() + filters = SignalFilter( + source=source, + entity_type=entity_type, + entity_id=entity_id, + min_importance=min_importance, + sentiment=sentiment, + limit=limit, + ) + events = await engine.get_signals(tenant_id, filters) + return { + "count": len(events), + "signals": [e.model_dump() for e in events], + } + + +@router.get( + "/signals/{entity_type}/{entity_id}", + summary="Signal summary for a specific entity", +) +async def entity_signal_summary( + entity_type: str, + entity_id: str, + tenant_id: str, + hours: int = Query(default=24, le=720), +): + """Summarise all signals for a given entity in the last N hours.""" + engine = get_signal_intelligence() + summary = await engine.get_entity_summary(entity_type, entity_id, tenant_id, hours) + return summary + + +# --------------------------------------------------------------------------- +# Watchlists +# --------------------------------------------------------------------------- + + +@router.post("/watchlists", summary="Create a signal watchlist") +async def create_watchlist(body: WatchlistCreate): + """Create a watchlist that triggers alerts when matching signals arrive.""" + engine = get_signal_intelligence() + wl = Watchlist( + tenant_id=body.tenant_id, + name=body.name, + name_ar=body.name_ar, + entity_type=body.entity_type, + entity_ids=body.entity_ids, + keywords=body.keywords, + alert_threshold=body.alert_threshold, + channels=body.channels, + ) + created = await engine.create_watchlist(wl) + return created.model_dump() + + +@router.get("/watchlists", summary="List active watchlists") +async def list_watchlists(tenant_id: str): + engine = get_signal_intelligence() + items = await engine.get_watchlists(tenant_id) + return {"count": len(items), "watchlists": [w.model_dump() for w in items]} + + +# --------------------------------------------------------------------------- +# Alerts +# --------------------------------------------------------------------------- + + +@router.get("/alerts", summary="Pending alerts for a user") +async def pending_alerts( + tenant_id: str, + user_id: Optional[str] = None, +): + """Return all unacknowledged alerts, optionally filtered by user.""" + delivery = get_alert_delivery() + alerts = await delivery.get_pending(tenant_id, user_id) + return { + "count": len(alerts), + "alerts": [a.model_dump() for a in alerts], + } + + +@router.post("/alerts/{alert_id}/acknowledge", summary="Acknowledge an alert") +async def acknowledge_alert(alert_id: str, body: AcknowledgeRequest): + delivery = get_alert_delivery() + ok = await delivery.acknowledge(alert_id, body.user_id) + if not ok: + raise HTTPException(404, "التنبيه غير موجود") + return {"acknowledged": True, "alert_id": alert_id} + + +@router.get("/alerts/stats", summary="Alert delivery statistics") +async def alert_stats(tenant_id: str): + delivery = get_alert_delivery() + return await delivery.get_delivery_stats(tenant_id) + + +# --------------------------------------------------------------------------- +# Digest +# --------------------------------------------------------------------------- + + +@router.get("/digest", summary="Generate Arabic alert digest") +async def generate_digest( + tenant_id: str, + user_id: Optional[str] = None, + period: str = Query(default="daily", regex="^(daily|weekly)$"), +): + """Compile unacknowledged alerts into an Arabic summary.""" + delivery = get_alert_delivery() + return await delivery.generate_digest(tenant_id, user_id, period) + + +# --------------------------------------------------------------------------- +# Behavior Patterns +# --------------------------------------------------------------------------- + + +@router.get("/patterns", summary="Detected behaviour patterns") +async def detected_patterns(tenant_id: str): + """Return all detected patterns: rep performance, sequences, risks.""" + bi = get_behavior_intelligence() + + rep = await bi.analyze_rep_performance(tenant_id) + seq = await bi.analyze_winning_sequences(tenant_id) + risk = await bi.detect_at_risk_patterns(tenant_id) + timing = await bi.analyze_best_contact_times(tenant_id) + + all_patterns = [p.model_dump() for p in rep + seq + risk] + + return { + "count": len(all_patterns), + "patterns": all_patterns, + "best_contact_times": timing, + } + + +@router.get("/recommendations", summary="AI recommendations in Arabic") +async def recommendations(tenant_id: str): + """Generate actionable Arabic recommendations based on detected patterns.""" + bi = get_behavior_intelligence() + recs = await bi.get_recommendations(tenant_id) + return { + "count": len(recs), + "recommendations": recs, + } + + +# --------------------------------------------------------------------------- +# Escalations +# --------------------------------------------------------------------------- + + +@router.get("/escalations", summary="Pending escalations") +async def pending_escalations(tenant_id: str): + """List all open escalations for a tenant.""" + items = [e for e in _escalations.get(tenant_id, []) if e.status == "open"] + return { + "count": len(items), + "escalations": [e.model_dump() for e in items], + } + + +@router.post("/escalations/{escalation_id}/resolve", summary="Resolve an escalation") +async def resolve_escalation(escalation_id: str, body: EscalationResolveRequest): + """Mark an escalation as resolved with a resolution note.""" + for esc_list in _escalations.values(): + for esc in esc_list: + if esc.id == escalation_id: + if esc.status == "resolved": + return {"already_resolved": True, "id": escalation_id} + esc.status = "resolved" + esc.resolved_by = body.user_id + esc.resolved_at = datetime.now(timezone.utc) + esc.resolution = body.resolution or body.resolution_ar + logger.info( + "Escalation %s resolved by %s", + escalation_id[:8], body.user_id[:8], + ) + return {"resolved": True, "id": escalation_id} + + raise HTTPException(404, "التصعيد غير موجود") + + +# --------------------------------------------------------------------------- +# Helper: create escalation (called internally by signal/alert pipeline) +# --------------------------------------------------------------------------- + + +async def create_escalation( + tenant_id: str, + title: str, + title_ar: str, + reason: str, + reason_ar: str, + entity_type: str = "", + entity_id: str = "", + assigned_to: Optional[str] = None, +) -> dict: + """Programmatic escalation creation (not exposed as public endpoint).""" + esc = _Escalation( + tenant_id=tenant_id, + title=title, + title_ar=title_ar, + reason=reason, + reason_ar=reason_ar, + entity_type=entity_type, + entity_id=entity_id, + assigned_to=assigned_to, + ) + _escalations.setdefault(tenant_id, []).insert(0, esc) + + # Fire an alert via the delivery service + delivery = get_alert_delivery() + await delivery.send_from_template( + template_key="escalation", + tenant_id=tenant_id, + urgency=AlertUrgency.HIGH, + category="compliance", + user_id=assigned_to, + requires_ack=True, + title=title_ar, + reason=reason_ar, + ) + + logger.info("Escalation created: %s for tenant %s", esc.id[:8], tenant_id[:8]) + return esc.model_dump() + + +# --------------------------------------------------------------------------- +# Legacy endpoints preserved for backward compatibility +# --------------------------------------------------------------------------- -# ── Lead Pipeline ───────────────────────────────────────────── class LeadInput(BaseModel): id: str = "lead_001" contact_name: str @@ -36,47 +362,46 @@ class MeetingReport(BaseModel): outcome: str = "follow_up_needed" +def _groq_key(): + import os + key = os.getenv("GROQ_API_KEY", "") + if not key: + raise HTTPException(500, "GROQ_API_KEY missing") + return key + + @router.post("/run-pipeline") async def run_lead_pipeline(lead_input: LeadInput): - """🎯 Complete Lead-to-Meeting pipeline in one API call.""" from app.services.lead_pipeline import DealixLeadPipeline, Lead, Company - pipeline = DealixLeadPipeline(_groq_key()) lead = Lead( id=lead_input.id, contact_name=lead_input.contact_name, contact_phone=lead_input.contact_phone, contact_title=lead_input.contact_title, - company=Company( - name=lead_input.company_name, - website=lead_input.company_website - ), - source=lead_input.source + company=Company(name=lead_input.company_name, website=lead_input.company_website), + source=lead_input.source, ) return await pipeline.run_full_pipeline(lead) @router.post("/executive-report") async def generate_executive_report(report_data: MeetingReport): - """📋 Generate post-meeting executive report with company analysis.""" from app.services.lead_pipeline import DealixLeadPipeline, Lead, Company - pipeline = DealixLeadPipeline(_groq_key()) lead = Lead( id=report_data.lead_id, contact_name=report_data.contact_name, contact_phone=report_data.contact_phone, - company=Company(name=report_data.company_name) + company=Company(name=report_data.company_name), ) return await pipeline.generate_executive_report( - lead, report_data.meeting_notes, report_data.outcome + lead, report_data.meeting_notes, report_data.outcome, ) -# ── Autonomous Intelligence ─────────────────────────────────── @router.get("/system-report") async def get_system_intelligence_report(): - """🔮 Full autonomous intelligence + financial + strategic report.""" from app.services.autonomous_core import get_autonomous_core core = get_autonomous_core(_groq_key()) return await core.get_full_intelligence_report() @@ -84,7 +409,6 @@ async def get_system_intelligence_report(): @router.post("/improve") async def trigger_self_improvement(background_tasks: BackgroundTasks): - """⚡ Trigger autonomous self-improvement cycle.""" from app.services.autonomous_core import get_autonomous_core core = get_autonomous_core(_groq_key()) @@ -97,45 +421,39 @@ async def trigger_self_improvement(background_tasks: BackgroundTasks): @router.get("/financial-forecast") async def get_financial_forecast(): - """💰 AI-powered financial forecast and pipeline valuation.""" from app.services.autonomous_core import get_autonomous_core core = get_autonomous_core(_groq_key()) return await core.financial.generate_financial_forecast({ - "timestamp": "now", - "pipeline": "active" + "timestamp": "now", "pipeline": "active", }) @router.get("/market-expansion") async def get_expansion_opportunities(): - """🌍 Strategic market expansion opportunities for Saudi Arabia.""" from app.services.autonomous_core import get_autonomous_core core = get_autonomous_core(_groq_key()) return await core.strategic.analyze_market_opportunity({ "market": "Saudi Arabia", - "current_sectors": ["عقارات", "تقنية", "صحة"] + "current_sectors": ["عقارات", "تقنية", "صحة"], }) @router.get("/growth-plan") async def get_90_day_growth_plan(): - """📈 Autonomous 90-day growth plan generation.""" from app.services.autonomous_core import get_autonomous_core core = get_autonomous_core(_groq_key()) return await core.strategic.generate_growth_plan({ - "current_stage": "early_growth", - "market": "KSA" + "current_stage": "early_growth", "market": "KSA", }) @router.get("/health") async def system_health(): - """❤️ System health and auto-healing status.""" from app.services.autonomous_core import get_autonomous_core core = get_autonomous_core(_groq_key()) return { "health": core.healer.get_system_health(), "autonomous_cycle": core._cycle_count, "improvements_applied": len(core.improver.improvements_log), - "status": "AUTONOMOUS_RUNNING — لا يتوقف أبداً" + "status": "AUTONOMOUS_RUNNING", } diff --git a/salesflow-saas/backend/app/services/alert_delivery.py b/salesflow-saas/backend/app/services/alert_delivery.py new file mode 100644 index 00000000..8ade7d27 --- /dev/null +++ b/salesflow-saas/backend/app/services/alert_delivery.py @@ -0,0 +1,423 @@ +""" +Alert Delivery Service — Multi-channel alert routing with urgency-based +channel selection, acknowledgement tracking, and Arabic digest generation. + +Channel routing matrix: + CRITICAL : dashboard + whatsapp + email + sms + HIGH : dashboard + whatsapp + MEDIUM : dashboard + email + LOW : dashboard (collected for daily digest) +""" + +from __future__ import annotations + +import logging +import uuid +from collections import defaultdict +from datetime import datetime, timedelta, timezone +from enum import Enum +from typing import Any, Dict, List, Optional + +from pydantic import BaseModel, Field + +logger = logging.getLogger("dealix.services.alert_delivery") + +# --------------------------------------------------------------------------- +# Enums & Models +# --------------------------------------------------------------------------- + + +class AlertUrgency(str, Enum): + CRITICAL = "critical" + HIGH = "high" + MEDIUM = "medium" + LOW = "low" + + +class AlertChannel(str, Enum): + DASHBOARD = "dashboard" + EMAIL = "email" + WHATSAPP = "whatsapp" + SMS = "sms" + TELEGRAM = "telegram" + + +class Alert(BaseModel): + id: str = Field(default_factory=lambda: uuid.uuid4().hex) + tenant_id: str + user_id: Optional[str] = None + title: str + title_ar: str + body: str + body_ar: str + urgency: AlertUrgency = AlertUrgency.MEDIUM + category: str = "system" # lead, deal, system, compliance, security + channels: List[AlertChannel] = [AlertChannel.DASHBOARD] + action_url: Optional[str] = None + action_label: Optional[str] = None + requires_acknowledgement: bool = False + acknowledged_at: Optional[datetime] = None + delivered_channels: List[str] = [] + created_at: datetime = Field(default_factory=lambda: datetime.now(timezone.utc)) + metadata: Dict[str, Any] = {} + + +# --------------------------------------------------------------------------- +# Arabic alert templates +# --------------------------------------------------------------------------- + +ALERT_TEMPLATES: Dict[str, Dict[str, str]] = { + "new_lead": { + "title_ar": "عميل محتمل جديد", + "body_ar": "عميل محتمل جديد: {name} من {source}", + }, + "deal_won": { + "title_ar": "صفقة ناجحة", + "body_ar": "تم إغلاق صفقة: {title} بقيمة {value} ر.س", + }, + "deal_at_risk": { + "title_ar": "صفقة معرضة للخطر", + "body_ar": "صفقة معرضة للخطر: {title} - لا نشاط منذ {days} أيام", + }, + "consent_expiring": { + "title_ar": "موافقة PDPL تنتهي قريبا", + "body_ar": "موافقة PDPL تنتهي خلال {days} أيام للعميل {name}", + }, + "escalation": { + "title_ar": "تصعيد يتطلب انتباهك", + "body_ar": "يحتاج تدخلك: {title} - {reason}", + }, + "sequence_complete": { + "title_ar": "تسلسل مكتمل", + "body_ar": "اكتمل تسلسل {name} للعميل {lead_name}", + }, + "meeting_booked": { + "title_ar": "موعد جديد", + "body_ar": "تم حجز موعد مع {name} في {time}", + }, + "competitor_alert": { + "title_ar": "تنبيه منافس", + "body_ar": "تغيير من المنافس {competitor}: {detail}", + }, +} + +# Channel routing per urgency +_CHANNEL_MATRIX: Dict[AlertUrgency, List[AlertChannel]] = { + AlertUrgency.CRITICAL: [ + AlertChannel.DASHBOARD, AlertChannel.WHATSAPP, + AlertChannel.EMAIL, AlertChannel.SMS, + ], + AlertUrgency.HIGH: [AlertChannel.DASHBOARD, AlertChannel.WHATSAPP], + AlertUrgency.MEDIUM: [AlertChannel.DASHBOARD, AlertChannel.EMAIL], + AlertUrgency.LOW: [AlertChannel.DASHBOARD], +} + + +# --------------------------------------------------------------------------- +# Channel dispatchers (thin wrappers — production would call real adapters) +# --------------------------------------------------------------------------- + +async def _dispatch_dashboard(alert: Alert) -> bool: + logger.info( + "[DASHBOARD] tenant=%s user=%s title=%s", + alert.tenant_id[:8], (alert.user_id or "broadcast")[:8], alert.title_ar, + ) + return True + + +async def _dispatch_email(alert: Alert) -> bool: + logger.info( + "[EMAIL] tenant=%s user=%s subject=%s", + alert.tenant_id[:8], (alert.user_id or "broadcast")[:8], alert.title_ar, + ) + return True + + +async def _dispatch_whatsapp(alert: Alert) -> bool: + logger.info( + "[WHATSAPP] tenant=%s user=%s body=%s", + alert.tenant_id[:8], (alert.user_id or "broadcast")[:8], alert.body_ar[:60], + ) + return True + + +async def _dispatch_sms(alert: Alert) -> bool: + logger.info( + "[SMS] tenant=%s user=%s body=%s", + alert.tenant_id[:8], (alert.user_id or "broadcast")[:8], alert.body_ar[:60], + ) + return True + + +async def _dispatch_telegram(alert: Alert) -> bool: + logger.info( + "[TELEGRAM] tenant=%s user=%s body=%s", + alert.tenant_id[:8], (alert.user_id or "broadcast")[:8], alert.body_ar[:60], + ) + return True + + +_DISPATCHERS = { + AlertChannel.DASHBOARD: _dispatch_dashboard, + AlertChannel.EMAIL: _dispatch_email, + AlertChannel.WHATSAPP: _dispatch_whatsapp, + AlertChannel.SMS: _dispatch_sms, + AlertChannel.TELEGRAM: _dispatch_telegram, +} + + +# --------------------------------------------------------------------------- +# Core Service +# --------------------------------------------------------------------------- + + +class AlertDelivery: + """ + Multi-channel alert delivery with urgency-based routing, acknowledgement + tracking, digest generation and delivery statistics. + """ + + def __init__(self) -> None: + # tenant_id -> list[Alert] (most recent first) + self._alerts: Dict[str, List[Alert]] = defaultdict(list) + # delivery stats counters + self._stats: Dict[str, Dict[str, int]] = defaultdict(lambda: defaultdict(int)) + + # ── Send ────────────────────────────────────────────────── + + async def send(self, alert: Alert) -> Dict[str, Any]: + """Route alert to channels based on urgency, deliver, and persist.""" + # Determine channels from urgency matrix, merged with explicit overrides + urgency_channels = _CHANNEL_MATRIX.get(alert.urgency, [AlertChannel.DASHBOARD]) + target_channels = list(set(urgency_channels) | set(alert.channels)) + + delivered: List[str] = [] + failed: List[str] = [] + + for ch in target_channels: + ok = await self.send_to_channel(alert, ch) + if ok: + delivered.append(ch.value) + self._stats[alert.tenant_id][ch.value] += 1 + else: + failed.append(ch.value) + + alert.delivered_channels = delivered + self._alerts[alert.tenant_id].insert(0, alert) + + # Cap buffer + if len(self._alerts[alert.tenant_id]) > 10_000: + self._alerts[alert.tenant_id] = self._alerts[alert.tenant_id][:10_000] + + self._stats[alert.tenant_id]["total"] += 1 + + logger.info( + "Alert %s [%s] delivered via %s for tenant %s", + alert.id[:8], alert.urgency.value, + ", ".join(delivered) or "none", alert.tenant_id[:8], + ) + + return { + "alert_id": alert.id, + "urgency": alert.urgency.value, + "delivered": delivered, + "failed": failed, + } + + async def send_to_channel(self, alert: Alert, channel: AlertChannel) -> bool: + """Dispatch to a single channel. Returns success bool.""" + dispatcher = _DISPATCHERS.get(channel) + if not dispatcher: + logger.warning("No dispatcher for channel %s", channel.value) + return False + try: + return await dispatcher(alert) + except Exception: + logger.exception("Channel %s dispatch failed for alert %s", channel.value, alert.id[:8]) + return False + + # ── Templates ───────────────────────────────────────────── + + async def send_from_template( + self, + template_key: str, + tenant_id: str, + urgency: AlertUrgency, + category: str = "system", + user_id: Optional[str] = None, + action_url: Optional[str] = None, + requires_ack: bool = False, + **kwargs: Any, + ) -> Dict[str, Any]: + """Build and send an alert from a named Arabic template.""" + tpl = ALERT_TEMPLATES.get(template_key) + if not tpl: + logger.error("Unknown alert template: %s", template_key) + return {"error": f"Unknown template: {template_key}"} + + title_ar = tpl["title_ar"] + body_ar = tpl["body_ar"].format_map(defaultdict(lambda: "—", **kwargs)) + + alert = Alert( + tenant_id=tenant_id, + user_id=user_id, + title=template_key.replace("_", " ").title(), + title_ar=title_ar, + body=body_ar, + body_ar=body_ar, + urgency=urgency, + category=category, + action_url=action_url, + requires_acknowledgement=requires_ack, + metadata=dict(kwargs), + ) + return await self.send(alert) + + # ── Acknowledgement ─────────────────────────────────────── + + async def acknowledge(self, alert_id: str, user_id: str) -> bool: + """Mark an alert as acknowledged by a user.""" + for alerts in self._alerts.values(): + for alert in alerts: + if alert.id == alert_id: + if alert.acknowledged_at: + return True # already acked + alert.acknowledged_at = datetime.now(timezone.utc) + logger.info( + "Alert %s acknowledged by user %s", + alert_id[:8], user_id[:8], + ) + return True + return False + + # ── Digest ──────────────────────────────────────────────── + + async def generate_digest( + self, + tenant_id: str, + user_id: Optional[str] = None, + period: str = "daily", + ) -> Dict[str, Any]: + """Compile unacknowledged alerts into an Arabic summary digest.""" + hours = 24 if period == "daily" else 168 # weekly + cutoff = datetime.now(timezone.utc) - timedelta(hours=hours) + + pending = [ + a for a in self._alerts.get(tenant_id, []) + if a.acknowledged_at is None + and a.created_at >= cutoff + and (user_id is None or a.user_id is None or a.user_id == user_id) + ] + + if not pending: + return { + "tenant_id": tenant_id, + "period": period, + "count": 0, + "digest_ar": "لا توجد تنبيهات جديدة", + "alerts": [], + } + + # Group by category + by_category: Dict[str, List[Alert]] = defaultdict(list) + for a in pending: + by_category[a.category].append(a) + + category_labels = { + "lead": "العملاء المحتملون", + "deal": "الصفقات", + "system": "النظام", + "compliance": "الامتثال", + "security": "الأمان", + } + + lines: List[str] = [] + lines.append(f"ملخص التنبيهات — {'يومي' if period == 'daily' else 'أسبوعي'}") + lines.append(f"إجمالي التنبيهات: {len(pending)}") + lines.append("") + + critical_count = sum(1 for a in pending if a.urgency == AlertUrgency.CRITICAL) + high_count = sum(1 for a in pending if a.urgency == AlertUrgency.HIGH) + if critical_count: + lines.append(f"تنبيهات حرجة: {critical_count}") + if high_count: + lines.append(f"تنبيهات عالية الأهمية: {high_count}") + lines.append("") + + for cat, cat_alerts in by_category.items(): + label = category_labels.get(cat, cat) + lines.append(f"— {label} ({len(cat_alerts)}):") + for a in cat_alerts[:10]: + urgency_marker = "" + if a.urgency == AlertUrgency.CRITICAL: + urgency_marker = " [حرج]" + elif a.urgency == AlertUrgency.HIGH: + urgency_marker = " [مهم]" + lines.append(f" - {a.title_ar}{urgency_marker}") + if len(cat_alerts) > 10: + lines.append(f" ... و {len(cat_alerts) - 10} تنبيهات أخرى") + + digest_text = "\n".join(lines) + + return { + "tenant_id": tenant_id, + "user_id": user_id, + "period": period, + "count": len(pending), + "critical": critical_count, + "high": high_count, + "digest_ar": digest_text, + "alerts": [a.model_dump() for a in pending[:50]], + } + + # ── Queries ─────────────────────────────────────────────── + + async def get_pending( + self, tenant_id: str, user_id: Optional[str] = None + ) -> List[Alert]: + """Return unacknowledged alerts for a user (or all if user_id is None).""" + return [ + a for a in self._alerts.get(tenant_id, []) + if a.acknowledged_at is None + and (user_id is None or a.user_id is None or a.user_id == user_id) + ] + + async def get_delivery_stats(self, tenant_id: str) -> Dict[str, Any]: + """Return delivery statistics for a tenant.""" + stats = dict(self._stats.get(tenant_id, {})) + total = stats.get("total", 0) + + alerts = self._alerts.get(tenant_id, []) + acked = sum(1 for a in alerts if a.acknowledged_at is not None) + pending = sum(1 for a in alerts if a.acknowledged_at is None) + + urgency_counts: Dict[str, int] = defaultdict(int) + category_counts: Dict[str, int] = defaultdict(int) + for a in alerts: + urgency_counts[a.urgency.value] += 1 + category_counts[a.category] += 1 + + return { + "tenant_id": tenant_id, + "total_sent": total, + "acknowledged": acked, + "pending": pending, + "ack_rate": round(acked / max(total, 1) * 100, 1), + "by_channel": {k: v for k, v in stats.items() if k != "total"}, + "by_urgency": dict(urgency_counts), + "by_category": dict(category_counts), + } + + +# --------------------------------------------------------------------------- +# Module-level singleton +# --------------------------------------------------------------------------- + +_instance: Optional[AlertDelivery] = None + + +def get_alert_delivery() -> AlertDelivery: + global _instance + if _instance is None: + _instance = AlertDelivery() + return _instance diff --git a/salesflow-saas/backend/app/services/autopilot.py b/salesflow-saas/backend/app/services/autopilot.py new file mode 100644 index 00000000..9f6256e4 --- /dev/null +++ b/salesflow-saas/backend/app/services/autopilot.py @@ -0,0 +1,545 @@ +""" +Autopilot Layer — Dealix AI Revenue OS +======================================== +نظام الطيار الآلي: تشغيل مهام CRM بشكل مستقل وآمن. +- أوضاع متعددة: محاكاة، توصية، مسودة، موافقة، مستقل +- حدود ميزانية وحماية من التجاوز +- نقاط تفتيش وإمكانية الإيقاف والاستئناف +""" +from __future__ import annotations + +import asyncio +import logging +import uuid +from datetime import datetime, timedelta, timezone +from enum import Enum +from typing import Any, Callable, Coroutine, Optional + +from pydantic import BaseModel, Field + +logger = logging.getLogger(__name__) + + +# ── Enums ─────────────────────────────────────────────────────────── + +class AutopilotMode(str, Enum): + SIMULATION = "simulation" + RECOMMENDATION = "recommendation" + DRAFT = "draft" + APPROVAL_GATED = "approval_gated" + AUTONOMOUS = "autonomous" + + +class RunStatus(str, Enum): + RUNNING = "running" + PAUSED = "paused" + COMPLETED = "completed" + FAILED = "failed" + ABORTED = "aborted" + AWAITING_APPROVAL = "awaiting_approval" + + +AUTOPILOT_STEPS = [ + "monitor", "detect", "classify", "decide", "propose", "approve", "execute", "verify", "log", +] + + +# ── Models ────────────────────────────────────────────────────────── + +class AutopilotBudget(BaseModel): + api_calls: int = 100 + messages: int = 50 + max_duration_minutes: int = 30 + api_calls_used: int = 0 + messages_used: int = 0 + + def consume_api_call(self) -> bool: + if self.api_calls_used >= self.api_calls: + return False + self.api_calls_used += 1 + return True + + def consume_message(self) -> bool: + if self.messages_used >= self.messages: + return False + self.messages_used += 1 + return True + + @property + def exhausted(self) -> bool: + return self.api_calls_used >= self.api_calls + + +class PendingApproval(BaseModel): + id: str = Field(default_factory=lambda: str(uuid.uuid4())) + action: str + description_ar: str + params: dict[str, Any] = {} + created_at: datetime = Field(default_factory=lambda: datetime.now(timezone.utc)) + approved: Optional[bool] = None + approved_by: Optional[str] = None + + +class SideEffect(BaseModel): + action: str + target: str + detail: str + occurred_at: datetime = Field(default_factory=lambda: datetime.now(timezone.utc)) + + +class AutopilotUnit(BaseModel): + run_id: str = Field(default_factory=lambda: str(uuid.uuid4())) + agent_id: str = "" + tenant_id: str = "" + task_type: str = "" + mode: AutopilotMode = AutopilotMode.SIMULATION + status: RunStatus = RunStatus.RUNNING + current_step: str = "monitor" + confidence: float = 0.0 + pending_approvals: list[PendingApproval] = [] + side_effects: list[SideEffect] = [] + checkpoint: dict[str, Any] = {} + budget: AutopilotBudget = Field(default_factory=AutopilotBudget) + result_data: dict[str, Any] = {} + error: Optional[str] = None + started_at: datetime = Field(default_factory=lambda: datetime.now(timezone.utc)) + completed_at: Optional[datetime] = None + + +class AutopilotPolicy(BaseModel): + max_api_calls: int = 100 + max_messages_per_hour: int = 50 + max_run_duration_minutes: int = 30 + require_approval_for: list[str] = Field(default_factory=lambda: [ + "send_message", "update_deal", "assign_lead", + ]) + forbidden_actions: list[str] = Field(default_factory=lambda: [ + "delete_data", "change_permissions", "bulk_send", + ]) + kill_switch_enabled: bool = True + + +class AutopilotResult(BaseModel): + run_id: str + task_type: str + mode: AutopilotMode + status: RunStatus + steps_completed: list[str] = [] + findings: list[dict[str, Any]] = [] + actions_taken: list[dict[str, Any]] = [] + actions_proposed: list[dict[str, Any]] = [] + side_effects: list[SideEffect] = [] + confidence: float = 0.0 + duration_ms: int = 0 + summary_ar: str = "" + + +# ── Task Handlers ─────────────────────────────────────────────────── + +async def _task_follow_up_dormant_leads( + unit: AutopilotUnit, policy: AutopilotPolicy, +) -> None: + unit.current_step = "monitor" + unit.checkpoint["step"] = "monitor" + unit.budget.consume_api_call() + + dormant = [ + {"lead_id": "L001", "name": "أحمد المطيري", "days_inactive": 5}, + {"lead_id": "L002", "name": "فاطمة العتيبي", "days_inactive": 4}, + {"lead_id": "L003", "name": "محمد القحطاني", "days_inactive": 3}, + ] + unit.result_data["dormant_leads"] = dormant + + unit.current_step = "detect" + unit.checkpoint["step"] = "detect" + unit.result_data["detected_count"] = len(dormant) + + unit.current_step = "classify" + unit.checkpoint["step"] = "classify" + for lead in dormant: + lead["urgency"] = "high" if lead["days_inactive"] >= 5 else "medium" + + unit.current_step = "decide" + unit.confidence = 0.78 + drafts = [] + for lead in dormant: + drafts.append({ + "lead_id": lead["lead_id"], + "action": "send_follow_up", + "message_ar": f"مرحباً {lead['name']}، نود متابعة محادثتنا السابقة. هل لديك أي أسئلة؟", + "channel": "whatsapp", + }) + + unit.current_step = "propose" + unit.result_data["proposed_actions"] = drafts + unit.checkpoint["step"] = "propose" + + if unit.mode in (AutopilotMode.SIMULATION, AutopilotMode.RECOMMENDATION): + return + + if unit.mode == AutopilotMode.DRAFT: + unit.result_data["drafts_created"] = len(drafts) + return + + if unit.mode == AutopilotMode.APPROVAL_GATED: + for draft in drafts: + if "send_message" in policy.require_approval_for: + unit.pending_approvals.append(PendingApproval( + action="send_follow_up", + description_ar=f"إرسال متابعة لـ {draft['lead_id']}", + params=draft, + )) + unit.status = RunStatus.AWAITING_APPROVAL + return + + unit.current_step = "execute" + for draft in drafts: + if not unit.budget.consume_message(): + unit.error = "تم تجاوز حد الرسائل المسموح" + break + unit.side_effects.append(SideEffect( + action="send_whatsapp", target=draft["lead_id"], + detail=draft["message_ar"][:100], + )) + unit.result_data["messages_sent"] = len(unit.side_effects) + + unit.current_step = "verify" + unit.checkpoint["step"] = "verify" + + +async def _task_qualify_new_leads( + unit: AutopilotUnit, policy: AutopilotPolicy, +) -> None: + unit.current_step = "monitor" + unit.budget.consume_api_call() + + new_leads = [ + {"lead_id": "L010", "name": "سارة الحربي", "source": "website"}, + {"lead_id": "L011", "name": "خالد الشمري", "source": "whatsapp"}, + ] + unit.result_data["new_leads"] = new_leads + + unit.current_step = "detect" + unit.result_data["detected_count"] = len(new_leads) + + unit.current_step = "classify" + scored = [] + for lead in new_leads: + unit.budget.consume_api_call() + scored.append({**lead, "score": 65, "qualified": True, "tier": "B"}) + unit.result_data["scored_leads"] = scored + + unit.current_step = "decide" + unit.confidence = 0.82 + + unit.current_step = "propose" + unit.result_data["proposed_actions"] = [ + {"lead_id": s["lead_id"], "action": "update_qualification", "score": s["score"]} + for s in scored + ] + unit.checkpoint["step"] = "propose" + + if unit.mode in (AutopilotMode.SIMULATION, AutopilotMode.RECOMMENDATION, AutopilotMode.DRAFT): + return + + if unit.mode == AutopilotMode.APPROVAL_GATED: + for s in scored: + unit.pending_approvals.append(PendingApproval( + action="update_qualification", + description_ar=f"تحديث تأهيل {s['name']} — درجة {s['score']}", + params={"lead_id": s["lead_id"], "score": s["score"]}, + )) + unit.status = RunStatus.AWAITING_APPROVAL + return + + unit.current_step = "execute" + for s in scored: + unit.side_effects.append(SideEffect( + action="qualify_lead", target=s["lead_id"], + detail=f"تأهيل: {s['score']} — فئة {s['tier']}", + )) + + unit.current_step = "verify" + + +async def _task_pipeline_health_check( + unit: AutopilotUnit, policy: AutopilotPolicy, +) -> None: + unit.current_step = "monitor" + unit.budget.consume_api_call() + + unit.current_step = "detect" + at_risk = [ + {"deal_id": "D100", "title": "مشروع تقنية المعلومات", "value": 250_000, "risk": "stalled"}, + {"deal_id": "D101", "title": "عقد صيانة سنوي", "value": 80_000, "risk": "competitor"}, + ] + unit.result_data["at_risk_deals"] = at_risk + + unit.current_step = "classify" + for deal in at_risk: + deal["urgency"] = "critical" if deal["value"] > 100_000 else "high" + + unit.current_step = "decide" + unit.confidence = 0.75 + unit.result_data["recommendations"] = [ + {"deal_id": d["deal_id"], "action_ar": "جدولة اجتماع عاجل مع العميل"} for d in at_risk + ] + + unit.current_step = "propose" + unit.checkpoint["step"] = "propose" + + +async def _task_daily_report( + unit: AutopilotUnit, policy: AutopilotPolicy, +) -> None: + unit.current_step = "monitor" + unit.budget.consume_api_call() + + unit.current_step = "detect" + unit.result_data["report"] = { + "date": datetime.now(timezone.utc).strftime("%Y-%m-%d"), + "new_leads": 12, "qualified": 5, "deals_won": 2, + "revenue_today": 180_000, "currency": "SAR", + "top_performer": "أحمد المطيري", + "at_risk_count": 3, + "summary_ar": "يوم إيجابي: صفقتان مغلقتان بقيمة 180 ألف ريال. 3 صفقات تحتاج متابعة.", + } + + unit.current_step = "classify" + unit.confidence = 0.95 + + unit.current_step = "propose" + unit.checkpoint["step"] = "propose" + + +async def _task_sequence_optimizer( + unit: AutopilotUnit, policy: AutopilotPolicy, +) -> None: + unit.current_step = "monitor" + unit.budget.consume_api_call() + + unit.current_step = "detect" + sequences = [ + {"id": "SEQ01", "name": "ترحيب عملاء جدد", "open_rate": 0.45, "reply_rate": 0.12}, + {"id": "SEQ02", "name": "متابعة بعد العرض", "open_rate": 0.62, "reply_rate": 0.25}, + ] + unit.result_data["sequences"] = sequences + + unit.current_step = "classify" + unit.current_step = "decide" + unit.confidence = 0.70 + suggestions = [] + for seq in sequences: + if seq["reply_rate"] < 0.15: + suggestions.append({ + "sequence_id": seq["id"], + "suggestion_ar": f"تحسين محتوى '{seq['name']}' — معدل الرد منخفض ({seq['reply_rate']:.0%})", + "proposed_change": "shorten_message", + }) + unit.result_data["suggestions"] = suggestions + + unit.current_step = "propose" + unit.checkpoint["step"] = "propose" + + +# ── Task Registry ────────────────────────────────────────────────── + +_TASK_HANDLERS: dict[str, Callable[[AutopilotUnit, AutopilotPolicy], Coroutine[Any, Any, None]]] = { + "follow_up_dormant_leads": _task_follow_up_dormant_leads, + "qualify_new_leads": _task_qualify_new_leads, + "pipeline_health_check": _task_pipeline_health_check, + "daily_report": _task_daily_report, + "sequence_optimizer": _task_sequence_optimizer, +} + + +# ── Autopilot Runner ─────────────────────────────────────────────── + +class AutopilotRunner: + """Runs autopilot tasks safely with budgets, policies, and checkpointing.""" + + def __init__(self, policy: Optional[AutopilotPolicy] = None) -> None: + self._policy = policy or AutopilotPolicy() + self._active_runs: dict[str, AutopilotUnit] = {} + + async def run( + self, + task_type: str, + mode: AutopilotMode, + params: dict[str, Any], + budget: Optional[AutopilotBudget] = None, + tenant_id: str = "", + agent_id: str = "", + ) -> AutopilotResult: + handler = _TASK_HANDLERS.get(task_type) + if not handler: + return AutopilotResult( + run_id=str(uuid.uuid4()), task_type=task_type, mode=mode, + status=RunStatus.FAILED, + summary_ar=f"مهمة غير معروفة: {task_type}", + ) + + unit = AutopilotUnit( + agent_id=agent_id, tenant_id=tenant_id, task_type=task_type, + mode=mode, + budget=budget or AutopilotBudget( + api_calls=self._policy.max_api_calls, + messages=self._policy.max_messages_per_hour, + max_duration_minutes=self._policy.max_run_duration_minutes, + ), + ) + self._active_runs[unit.run_id] = unit + start = datetime.now(timezone.utc) + deadline = start + timedelta(minutes=unit.budget.max_duration_minutes) + + logger.info( + "[Autopilot] بدء run=%s task=%s mode=%s tenant=%s", + unit.run_id, task_type, mode.value, tenant_id, + ) + + try: + if self._policy.kill_switch_enabled and datetime.now(timezone.utc) > deadline: + unit.status = RunStatus.FAILED + unit.error = "تم تجاوز الحد الزمني المسموح" + else: + await handler(unit, self._policy) + if unit.status == RunStatus.RUNNING: + unit.status = RunStatus.COMPLETED + except Exception as exc: + logger.exception("[Autopilot] فشل run=%s: %s", unit.run_id, exc) + unit.status = RunStatus.FAILED + unit.error = str(exc) + + end = datetime.now(timezone.utc) + unit.completed_at = end + duration_ms = int((end - start).total_seconds() * 1000) + + steps_done = [] + for step in AUTOPILOT_STEPS: + steps_done.append(step) + if step == unit.current_step: + break + + result = AutopilotResult( + run_id=unit.run_id, task_type=task_type, mode=mode, + status=unit.status, steps_completed=steps_done, + findings=unit.result_data.get("at_risk_deals", unit.result_data.get("dormant_leads", [])), + actions_taken=[se.model_dump() for se in unit.side_effects], + actions_proposed=unit.result_data.get("proposed_actions", []), + side_effects=unit.side_effects, + confidence=unit.confidence, duration_ms=duration_ms, + summary_ar=self._build_summary(unit), + ) + + logger.info( + "[Autopilot] انتهاء run=%s status=%s dur=%dms", + unit.run_id, unit.status.value, duration_ms, + ) + return result + + async def pause(self, run_id: str) -> bool: + unit = self._active_runs.get(run_id) + if not unit or unit.status != RunStatus.RUNNING: + return False + unit.status = RunStatus.PAUSED + logger.info("[Autopilot] إيقاف مؤقت run=%s at step=%s", run_id, unit.current_step) + return True + + async def resume(self, run_id: str) -> Optional[AutopilotResult]: + unit = self._active_runs.get(run_id) + if not unit or unit.status not in (RunStatus.PAUSED, RunStatus.AWAITING_APPROVAL): + return None + unit.status = RunStatus.RUNNING + logger.info("[Autopilot] استئناف run=%s from step=%s", run_id, unit.current_step) + handler = _TASK_HANDLERS.get(unit.task_type) + if handler: + try: + await handler(unit, self._policy) + if unit.status == RunStatus.RUNNING: + unit.status = RunStatus.COMPLETED + except Exception as exc: + unit.status = RunStatus.FAILED + unit.error = str(exc) + return AutopilotResult( + run_id=unit.run_id, task_type=unit.task_type, mode=unit.mode, + status=unit.status, confidence=unit.confidence, + summary_ar=self._build_summary(unit), + ) + + async def abort(self, run_id: str) -> bool: + unit = self._active_runs.get(run_id) + if not unit: + return False + unit.status = RunStatus.ABORTED + unit.completed_at = datetime.now(timezone.utc) + logger.info("[Autopilot] إلغاء run=%s", run_id) + return True + + async def approve_pending(self, run_id: str, approval_id: str, approved_by: str) -> bool: + unit = self._active_runs.get(run_id) + if not unit: + return False + for pa in unit.pending_approvals: + if pa.id == approval_id: + pa.approved = True + pa.approved_by = approved_by + logger.info("[Autopilot] تمت الموافقة approval=%s by=%s", approval_id, approved_by) + return True + return False + + async def get_status(self, run_id: str) -> Optional[AutopilotUnit]: + return self._active_runs.get(run_id) + + def list_active(self, tenant_id: Optional[str] = None) -> list[AutopilotUnit]: + runs = list(self._active_runs.values()) + if tenant_id: + runs = [r for r in runs if r.tenant_id == tenant_id] + return [r for r in runs if r.status in (RunStatus.RUNNING, RunStatus.PAUSED, RunStatus.AWAITING_APPROVAL)] + + def list_supported_tasks(self) -> list[dict[str, str]]: + _TASK_META = { + "follow_up_dormant_leads": { + "name_ar": "متابعة العملاء الخاملين", + "desc_ar": "البحث عن عملاء بدون نشاط لأكثر من 3 أيام وصياغة رسائل متابعة", + }, + "qualify_new_leads": { + "name_ar": "تأهيل العملاء الجدد", + "desc_ar": "تقييم وتأهيل العملاء المحتملين الجدد تلقائياً", + }, + "pipeline_health_check": { + "name_ar": "فحص صحة خط الأنابيب", + "desc_ar": "تحليل خط الأنابيب والكشف عن الصفقات المعرضة للخطر", + }, + "daily_report": { + "name_ar": "التقرير اليومي", + "desc_ar": "إنشاء ملخص يومي لأداء المبيعات", + }, + "sequence_optimizer": { + "name_ar": "تحسين التسلسلات", + "desc_ar": "تحليل أداء التسلسلات واقتراح تحسينات", + }, + } + return [ + {"task_type": k, **_TASK_META.get(k, {"name_ar": k, "desc_ar": ""})} + for k in _TASK_HANDLERS + ] + + @staticmethod + def _build_summary(unit: AutopilotUnit) -> str: + if unit.status == RunStatus.FAILED: + return f"فشل التنفيذ: {unit.error or 'خطأ غير محدد'}" + if unit.status == RunStatus.ABORTED: + return "تم إلغاء المهمة" + if unit.status == RunStatus.AWAITING_APPROVAL: + return f"بانتظار الموافقة على {len(unit.pending_approvals)} إجراء" + if unit.status == RunStatus.PAUSED: + return f"متوقف مؤقتاً عند الخطوة: {unit.current_step}" + + effects = len(unit.side_effects) + proposed = len(unit.result_data.get("proposed_actions", [])) + parts = [f"تم التنفيذ بنجاح (ثقة {unit.confidence:.0%})"] + if effects: + parts.append(f"— {effects} إجراء منفّذ") + if proposed: + parts.append(f"— {proposed} إجراء مقترح") + return " ".join(parts) diff --git a/salesflow-saas/backend/app/services/behavior_intelligence.py b/salesflow-saas/backend/app/services/behavior_intelligence.py new file mode 100644 index 00000000..496df013 --- /dev/null +++ b/salesflow-saas/backend/app/services/behavior_intelligence.py @@ -0,0 +1,488 @@ +""" +Behavior Intelligence — Pattern detection engine for Dealix CRM. + +Watch-mode analytics that detects winning sequences, top-rep behaviours, +optimal contact times, at-risk deals, and generates Arabic recommendations. +Operates on in-memory signal history; no autonomous actions taken. +""" + +from __future__ import annotations + +import logging +import uuid +from collections import defaultdict +from datetime import datetime, timedelta, timezone +from typing import Any, Dict, List, Optional + +from pydantic import BaseModel, Field + +logger = logging.getLogger("dealix.services.behavior_intelligence") + +# --------------------------------------------------------------------------- +# Models +# --------------------------------------------------------------------------- + + +class TrackedPattern(BaseModel): + id: str = Field(default_factory=lambda: uuid.uuid4().hex) + tenant_id: str + pattern_type: str # winning_sequence, fast_close, high_conversion_rep, at_risk, best_time + description: str + description_ar: str + confidence: float = 0.0 # 0-1 + frequency: int = 1 + entities_involved: List[str] = [] + first_seen: datetime = Field(default_factory=lambda: datetime.now(timezone.utc)) + last_seen: datetime = Field(default_factory=lambda: datetime.now(timezone.utc)) + suggested_action: str = "" + suggested_action_ar: str = "" + + +class Recommendation(BaseModel): + id: str = Field(default_factory=lambda: uuid.uuid4().hex) + tenant_id: str + category: str # performance, sequence, timing, risk + title_ar: str + detail_ar: str + impact: str # high, medium, low + confidence: float = 0.0 + source_patterns: List[str] = [] + created_at: datetime = Field(default_factory=lambda: datetime.now(timezone.utc)) + + +# --------------------------------------------------------------------------- +# Simulated data layer +# --------------------------------------------------------------------------- +# In production these would query PostgreSQL aggregates. Here we keep +# lightweight dicts that can be seeded or fed from the signal engine. + + +class _TenantData: + """Holds simulated analytics data for a single tenant.""" + + def __init__(self) -> None: + # rep_id -> stats + self.rep_stats: Dict[str, Dict[str, Any]] = {} + # sequence_id -> stats + self.sequence_stats: Dict[str, Dict[str, Any]] = {} + # deal_id -> stats + self.deal_stats: Dict[str, Dict[str, Any]] = {} + # hour -> response count + self.hourly_responses: Dict[int, int] = defaultdict(int) + # day-of-week (0=Mon) -> response count + self.daily_responses: Dict[int, int] = defaultdict(int) + + +_tenant_data: Dict[str, _TenantData] = defaultdict(_TenantData) + + +def seed_tenant_data(tenant_id: str, data: _TenantData) -> None: + """Allow external seeding (tests, signal ingest pipeline).""" + _tenant_data[tenant_id] = data + + +def _get_data(tenant_id: str) -> _TenantData: + return _tenant_data[tenant_id] + + +# --------------------------------------------------------------------------- +# Core Service +# --------------------------------------------------------------------------- + + +class BehaviorIntelligence: + """ + Detects behavioural patterns across reps, sequences, contact timing, + and deal health. All analysis is read-only (watch mode). + """ + + # ── Rep Performance ─────────────────────────────────────── + + async def analyze_rep_performance(self, tenant_id: str) -> List[TrackedPattern]: + """Find top-performing reps and what differentiates them.""" + data = _get_data(tenant_id) + patterns: List[TrackedPattern] = [] + + if not data.rep_stats: + # Generate representative sample when no data yet + data.rep_stats = _sample_rep_stats() + + reps = data.rep_stats + if not reps: + return patterns + + # Find top closer + by_close = sorted(reps.items(), key=lambda r: r[1].get("close_rate", 0), reverse=True) + if by_close: + top_id, top = by_close[0] + cr = top.get("close_rate", 0) + avg_resp = top.get("avg_response_min", 0) + name = top.get("name", top_id[:8]) + patterns.append(TrackedPattern( + tenant_id=tenant_id, + pattern_type="high_conversion_rep", + description=f"Rep {name} closes at {cr:.0%} with avg response {avg_resp}min", + description_ar=f"{name} يغلق بنسبة {cr:.0%} مع متوسط استجابة {avg_resp} دقيقة", + confidence=min(1.0, cr + 0.1), + frequency=top.get("deals_closed", 1), + entities_involved=[top_id], + suggested_action=f"Replicate {name}'s follow-up cadence across the team", + suggested_action_ar=f"انسخ نمط متابعة {name} لبقية الفريق", + )) + + # Find fastest closer + by_speed = sorted(reps.items(), key=lambda r: r[1].get("avg_days_to_close", 999)) + if by_speed: + fast_id, fast = by_speed[0] + days = fast.get("avg_days_to_close", 0) + name = fast.get("name", fast_id[:8]) + patterns.append(TrackedPattern( + tenant_id=tenant_id, + pattern_type="fast_close", + description=f"{name} avg close in {days} days", + description_ar=f"{name} يغلق الصفقات في متوسط {days} أيام", + confidence=0.75, + frequency=fast.get("deals_closed", 1), + entities_involved=[fast_id], + suggested_action=f"Study {name}'s discovery call technique", + suggested_action_ar=f"ادرس أسلوب {name} في مكالمة الاستكشاف", + )) + + # Find slow responder (coaching opportunity) + by_resp = sorted(reps.items(), key=lambda r: r[1].get("avg_response_min", 0), reverse=True) + if by_resp: + slow_id, slow = by_resp[0] + avg_r = slow.get("avg_response_min", 0) + if avg_r > 60: + name = slow.get("name", slow_id[:8]) + patterns.append(TrackedPattern( + tenant_id=tenant_id, + pattern_type="slow_responder", + description=f"{name} avg response {avg_r}min — above 60min threshold", + description_ar=f"{name} متوسط استجابة {avg_r} دقيقة — أعلى من الحد المقبول", + confidence=0.80, + frequency=1, + entities_involved=[slow_id], + suggested_action=f"Coach {name} on response time; set mobile alerts", + suggested_action_ar=f"درّب {name} على سرعة الاستجابة وفعّل التنبيهات", + )) + + return patterns + + # ── Winning Sequences ───────────────────────────────────── + + async def analyze_winning_sequences(self, tenant_id: str) -> List[TrackedPattern]: + """Identify sequence templates with highest conversion rates.""" + data = _get_data(tenant_id) + patterns: List[TrackedPattern] = [] + + if not data.sequence_stats: + data.sequence_stats = _sample_sequence_stats() + + seqs = data.sequence_stats + by_conv = sorted(seqs.items(), key=lambda s: s[1].get("conversion_rate", 0), reverse=True) + + for seq_id, stats in by_conv[:3]: + name = stats.get("name", seq_id[:8]) + name_ar = stats.get("name_ar", name) + cr = stats.get("conversion_rate", 0) + enrolled = stats.get("enrolled", 0) + + patterns.append(TrackedPattern( + tenant_id=tenant_id, + pattern_type="winning_sequence", + description=f"Sequence '{name}' converts at {cr:.0%} ({enrolled} enrolled)", + description_ar=f"تسلسل '{name_ar}' يحقق تحويل {cr:.0%} ({enrolled} مسجل)", + confidence=min(1.0, 0.5 + cr), + frequency=enrolled, + entities_involved=[seq_id], + suggested_action=f"Use '{name}' as default for similar leads", + suggested_action_ar=f"استخدم '{name_ar}' كتسلسل افتراضي للعملاء المشابهين", + )) + + # Compare top vs average + if len(by_conv) >= 2: + top_cr = by_conv[0][1].get("conversion_rate", 0) + avg_cr = sum(s.get("conversion_rate", 0) for _, s in by_conv) / len(by_conv) + if avg_cr > 0: + multiplier = round(top_cr / avg_cr, 1) + top_name_ar = by_conv[0][1].get("name_ar", by_conv[0][0][:8]) + patterns.append(TrackedPattern( + tenant_id=tenant_id, + pattern_type="winning_sequence", + description=f"Top sequence outperforms average by {multiplier}x", + description_ar=f"تسلسل '{top_name_ar}' يحقق {multiplier}x تحويل مقارنة بالمتوسط", + confidence=0.85, + frequency=1, + entities_involved=[by_conv[0][0]], + suggested_action="Migrate underperforming sequences to the top template", + suggested_action_ar="انقل التسلسلات الضعيفة إلى القالب الأفضل", + )) + + return patterns + + # ── Best Contact Times ──────────────────────────────────── + + async def analyze_best_contact_times(self, tenant_id: str) -> Dict[str, Any]: + """When do leads respond most? Returns hour/day heat map.""" + data = _get_data(tenant_id) + + if not data.hourly_responses: + # Seed typical Saudi business patterns + for h in range(24): + if 9 <= h <= 12: + data.hourly_responses[h] = 35 + (h - 9) * 5 + elif 16 <= h <= 20: + data.hourly_responses[h] = 40 + (20 - h) * 3 + elif 13 <= h <= 15: + data.hourly_responses[h] = 15 + else: + data.hourly_responses[h] = 5 + + if not data.daily_responses: + # Sunday-Thursday work week in KSA + data.daily_responses = { + 0: 30, 1: 25, 2: 35, 3: 20, 4: 15, # Mon-Fri + 5: 5, 6: 40, # Sat, Sun — Sun is work day in KSA + } + + hourly = dict(data.hourly_responses) + daily = dict(data.daily_responses) + + best_hour = max(hourly, key=hourly.get) # type: ignore[arg-type] + best_day = max(daily, key=daily.get) # type: ignore[arg-type] + + day_names_ar = { + 0: "الإثنين", 1: "الثلاثاء", 2: "الأربعاء", + 3: "الخميس", 4: "الجمعة", 5: "السبت", 6: "الأحد", + } + + period = "صباحا" if best_hour < 12 else "مساء" + display_hour = best_hour if best_hour <= 12 else best_hour - 12 + + return { + "tenant_id": tenant_id, + "best_hour": best_hour, + "best_hour_ar": f"{display_hour} {period}", + "best_day": best_day, + "best_day_ar": day_names_ar.get(best_day, ""), + "hourly_distribution": hourly, + "daily_distribution": {day_names_ar.get(d, str(d)): c for d, c in daily.items()}, + "recommendation_ar": ( + f"أفضل وقت للتواصل: {day_names_ar.get(best_day, '')} الساعة {display_hour} {period}" + ), + } + + # ── At-Risk Detection ───────────────────────────────────── + + async def detect_at_risk_patterns(self, tenant_id: str) -> List[TrackedPattern]: + """Find deals going cold or leads losing interest.""" + data = _get_data(tenant_id) + patterns: List[TrackedPattern] = [] + + if not data.deal_stats: + data.deal_stats = _sample_deal_stats() + + now = datetime.now(timezone.utc) + + for deal_id, stats in data.deal_stats.items(): + title = stats.get("title", deal_id[:8]) + last_activity_str = stats.get("last_activity") + stage = stats.get("stage", "") + + if stage in ("closed_won", "closed_lost"): + continue + + if last_activity_str: + try: + last_dt = datetime.fromisoformat(last_activity_str) + except (ValueError, TypeError): + last_dt = now - timedelta(days=3) + else: + last_dt = now - timedelta(days=5) + + days_idle = (now - last_dt).days + + if days_idle >= 7: + confidence = min(1.0, 0.5 + days_idle * 0.05) + patterns.append(TrackedPattern( + tenant_id=tenant_id, + pattern_type="at_risk_deal", + description=f"Deal '{title}' idle for {days_idle} days", + description_ar=f"صفقة '{title}' بدون نشاط منذ {days_idle} أيام", + confidence=confidence, + frequency=1, + entities_involved=[deal_id], + suggested_action=f"Re-engage on deal '{title}' immediately", + suggested_action_ar=f"أعد التواصل بخصوص صفقة '{title}' فورا", + )) + elif days_idle >= 3: + patterns.append(TrackedPattern( + tenant_id=tenant_id, + pattern_type="cooling_deal", + description=f"Deal '{title}' cooling — {days_idle} days since last touch", + description_ar=f"صفقة '{title}' تبرد — {days_idle} أيام منذ آخر تواصل", + confidence=0.55, + frequency=1, + entities_involved=[deal_id], + suggested_action=f"Schedule follow-up for deal '{title}'", + suggested_action_ar=f"جدول متابعة لصفقة '{title}'", + )) + + return patterns + + # ── Recommendations ─────────────────────────────────────── + + async def get_recommendations(self, tenant_id: str) -> List[Dict[str, Any]]: + """Generate Arabic recommendations from all detected patterns.""" + rep_patterns = await self.analyze_rep_performance(tenant_id) + seq_patterns = await self.analyze_winning_sequences(tenant_id) + time_analysis = await self.analyze_best_contact_times(tenant_id) + risk_patterns = await self.detect_at_risk_patterns(tenant_id) + + recommendations: List[Recommendation] = [] + + # From rep patterns + for p in rep_patterns: + if p.pattern_type == "high_conversion_rep": + recommendations.append(Recommendation( + tenant_id=tenant_id, + category="performance", + title_ar="نمط إغلاق ناجح", + detail_ar=p.description_ar + " — " + p.suggested_action_ar, + impact="high", + confidence=p.confidence, + source_patterns=[p.id], + )) + elif p.pattern_type == "slow_responder": + recommendations.append(Recommendation( + tenant_id=tenant_id, + category="performance", + title_ar="فرصة تحسين سرعة الاستجابة", + detail_ar=p.description_ar + " — " + p.suggested_action_ar, + impact="medium", + confidence=p.confidence, + source_patterns=[p.id], + )) + + # From sequence patterns + for p in seq_patterns: + recommendations.append(Recommendation( + tenant_id=tenant_id, + category="sequence", + title_ar="تسلسل عالي الأداء", + detail_ar=p.description_ar, + impact="high" if p.confidence > 0.7 else "medium", + confidence=p.confidence, + source_patterns=[p.id], + )) + + # From timing + if time_analysis.get("recommendation_ar"): + recommendations.append(Recommendation( + tenant_id=tenant_id, + category="timing", + title_ar="أفضل وقت للتواصل", + detail_ar=time_analysis["recommendation_ar"], + impact="medium", + confidence=0.80, + )) + + # From risk patterns + critical_risks = [p for p in risk_patterns if p.pattern_type == "at_risk_deal"] + if critical_risks: + names = ", ".join(p.entities_involved[0][:8] for p in critical_risks[:5]) + recommendations.append(Recommendation( + tenant_id=tenant_id, + category="risk", + title_ar="صفقات معرضة للخطر", + detail_ar=f"{len(critical_risks)} صفقات بدون نشاط لأكثر من أسبوع: {names}", + impact="high", + confidence=0.85, + source_patterns=[p.id for p in critical_risks[:5]], + )) + + return [r.model_dump() for r in recommendations] + + +# --------------------------------------------------------------------------- +# Sample data generators (used when no real data exists yet) +# --------------------------------------------------------------------------- + + +def _sample_rep_stats() -> Dict[str, Dict[str, Any]]: + return { + "rep_001": { + "name": "أحمد", "close_rate": 0.42, "avg_response_min": 18, + "avg_days_to_close": 12, "deals_closed": 28, "follow_ups_per_deal": 4.2, + }, + "rep_002": { + "name": "سارة", "close_rate": 0.35, "avg_response_min": 25, + "avg_days_to_close": 15, "deals_closed": 22, "follow_ups_per_deal": 3.1, + }, + "rep_003": { + "name": "خالد", "close_rate": 0.28, "avg_response_min": 75, + "avg_days_to_close": 22, "deals_closed": 15, "follow_ups_per_deal": 2.0, + }, + } + + +def _sample_sequence_stats() -> Dict[str, Dict[str, Any]]: + return { + "seq_001": { + "name": "VIP Real Estate", "name_ar": "عقارات VIP", + "conversion_rate": 0.38, "enrolled": 120, "avg_steps": 4, + }, + "seq_002": { + "name": "Tech Startup Outreach", "name_ar": "تواصل الشركات الناشئة", + "conversion_rate": 0.25, "enrolled": 85, "avg_steps": 5, + }, + "seq_003": { + "name": "Standard Follow-up", "name_ar": "المتابعة العادية", + "conversion_rate": 0.12, "enrolled": 200, "avg_steps": 6, + }, + } + + +def _sample_deal_stats() -> Dict[str, Dict[str, Any]]: + now = datetime.now(timezone.utc) + return { + "deal_001": { + "title": "عقد صيانة المبنى", + "stage": "negotiation", + "value": 250000, + "last_activity": (now - timedelta(days=10)).isoformat(), + }, + "deal_002": { + "title": "ترخيص برمجيات", + "stage": "proposal", + "value": 180000, + "last_activity": (now - timedelta(days=4)).isoformat(), + }, + "deal_003": { + "title": "خدمات استشارية", + "stage": "discovery", + "value": 95000, + "last_activity": (now - timedelta(days=1)).isoformat(), + }, + "deal_004": { + "title": "نظام ERP", + "stage": "negotiation", + "value": 500000, + "last_activity": (now - timedelta(days=8)).isoformat(), + }, + } + + +# --------------------------------------------------------------------------- +# Module-level singleton +# --------------------------------------------------------------------------- + +_instance: Optional[BehaviorIntelligence] = None + + +def get_behavior_intelligence() -> BehaviorIntelligence: + global _instance + if _instance is None: + _instance = BehaviorIntelligence() + return _instance diff --git a/salesflow-saas/backend/app/services/escalation.py b/salesflow-saas/backend/app/services/escalation.py new file mode 100644 index 00000000..3a8c6288 --- /dev/null +++ b/salesflow-saas/backend/app/services/escalation.py @@ -0,0 +1,443 @@ +""" +Escalation Service — Dealix AI Revenue OS +============================================ +نظام التصعيد: إدارة حلقة الإنسان في العملية (Human-in-the-Loop). +- إنشاء حزم تصعيد مع سياق كامل +- تعيين ومتابعة وحل التصعيدات +- قواعد تصعيد تلقائية لحالات محددة +- استئناف سير العمل بعد الحل +""" +from __future__ import annotations + +import logging +import uuid +from collections import defaultdict +from datetime import datetime, timedelta, timezone +from enum import Enum +from typing import Any, Optional + +from pydantic import BaseModel, Field + +logger = logging.getLogger(__name__) + + +# ── Enums ─────────────────────────────────────────────────────────── + +class EscalationReason(str, Enum): + VALIDATION_FAILURE = "validation_failure" + MISSING_DATA = "missing_data" + PERMISSION_ISSUE = "permission_issue" + TIMEOUT = "timeout" + AMBIGUOUS_DATA = "ambiguous_data" + LOW_CONFIDENCE = "low_confidence" + HIGH_VALUE_DEAL = "high_value_deal" + CUSTOMER_COMPLAINT = "customer_complaint" + CONSENT_EXPIRED = "consent_expired" + DELIVERY_FAILURE = "delivery_failure" + + +class EscalationPriority(str, Enum): + CRITICAL = "critical" + HIGH = "high" + MEDIUM = "medium" + LOW = "low" + + +class EscalationStatus(str, Enum): + PENDING = "pending" + IN_PROGRESS = "in_progress" + RESOLVED = "resolved" + EXPIRED = "expired" + + +# ── Arabic labels ─────────────────────────────────────────────────── + +_REASON_AR: dict[EscalationReason, str] = { + EscalationReason.VALIDATION_FAILURE: "فشل التحقق من البيانات", + EscalationReason.MISSING_DATA: "بيانات مفقودة", + EscalationReason.PERMISSION_ISSUE: "مشكلة في الصلاحيات", + EscalationReason.TIMEOUT: "انتهاء المهلة الزمنية", + EscalationReason.AMBIGUOUS_DATA: "بيانات غامضة تحتاج توضيح", + EscalationReason.LOW_CONFIDENCE: "ثقة منخفضة في النتيجة", + EscalationReason.HIGH_VALUE_DEAL: "صفقة عالية القيمة", + EscalationReason.CUSTOMER_COMPLAINT: "شكوى عميل", + EscalationReason.CONSENT_EXPIRED: "انتهاء صلاحية الموافقة (PDPL)", + EscalationReason.DELIVERY_FAILURE: "فشل متكرر في التوصيل", +} + +_PRIORITY_AR: dict[EscalationPriority, str] = { + EscalationPriority.CRITICAL: "حرج", + EscalationPriority.HIGH: "عالي", + EscalationPriority.MEDIUM: "متوسط", + EscalationPriority.LOW: "منخفض", +} + + +# ── Models ────────────────────────────────────────────────────────── + +class EscalationArtifact(BaseModel): + type: str = "text" + name: str = "" + content: str = "" + url: Optional[str] = None + + +class EscalationPacket(BaseModel): + id: str = Field(default_factory=lambda: str(uuid.uuid4())) + tenant_id: str = "" + title: str + title_ar: str + entity_type: str + entity_id: str + workflow_name: str = "" + failed_step: str = "" + reason: EscalationReason + missing_data: list[str] = [] + priority: EscalationPriority = EscalationPriority.MEDIUM + due_at: datetime = Field( + default_factory=lambda: datetime.now(timezone.utc) + timedelta(hours=4) + ) + risk_if_delayed: str = "" + risk_if_delayed_ar: str = "" + artifacts: list[EscalationArtifact] = [] + resume_token: str = Field(default_factory=lambda: str(uuid.uuid4())) + suggested_action: str = "" + suggested_action_ar: str = "" + confidence: float = 0.0 + status: EscalationStatus = EscalationStatus.PENDING + assigned_to: Optional[str] = None + created_at: datetime = Field(default_factory=lambda: datetime.now(timezone.utc)) + resolved_at: Optional[datetime] = None + resolution_data: dict[str, Any] = {} + resolution_notes: str = "" + + +class ResolutionInput(BaseModel): + action_taken: str + notes: str = "" + override_data: dict[str, Any] = {} + resume_workflow: bool = False + + +class EscalationStats(BaseModel): + total: int = 0 + by_priority: dict[str, int] = Field(default_factory=dict) + by_status: dict[str, int] = Field(default_factory=dict) + by_reason: dict[str, int] = Field(default_factory=dict) + avg_resolution_minutes: float = 0.0 + oldest_pending_hours: float = 0.0 + overdue_count: int = 0 + + +# ── Auto-escalation rules ────────────────────────────────────────── + +class AutoEscalationRule(BaseModel): + id: str + name_ar: str + condition: str + priority: EscalationPriority + target_role: str + reason: EscalationReason + suggested_action_ar: str + + +DEFAULT_RULES: list[AutoEscalationRule] = [ + AutoEscalationRule( + id="rule_high_value_deal", + name_ar="صفقة تتجاوز 100 ألف ريال", + condition="deal_value_sar > 100000", + priority=EscalationPriority.HIGH, + target_role="manager", + reason=EscalationReason.HIGH_VALUE_DEAL, + suggested_action_ar="مراجعة الصفقة والموافقة على استراتيجية التفاوض", + ), + AutoEscalationRule( + id="rule_no_response_5d", + name_ar="عدم رد لأكثر من 5 أيام", + condition="days_since_last_response > 5", + priority=EscalationPriority.MEDIUM, + target_role="assigned_rep", + reason=EscalationReason.TIMEOUT, + suggested_action_ar="الاتصال بالعميل عبر قناة بديلة أو تصعيد للمدير", + ), + AutoEscalationRule( + id="rule_low_confidence", + name_ar="ثقة ذكاء اصطناعي منخفضة", + condition="ai_confidence < 0.3", + priority=EscalationPriority.HIGH, + target_role="human_reviewer", + reason=EscalationReason.LOW_CONFIDENCE, + suggested_action_ar="مراجعة يدوية للقرار — الذكاء الاصطناعي غير واثق من النتيجة", + ), + AutoEscalationRule( + id="rule_consent_expired", + name_ar="انتهاء موافقة PDPL", + condition="consent_expired == true", + priority=EscalationPriority.CRITICAL, + target_role="compliance", + reason=EscalationReason.CONSENT_EXPIRED, + suggested_action_ar="إيقاف جميع الاتصالات فوراً وطلب تجديد الموافقة", + ), + AutoEscalationRule( + id="rule_delivery_failed_3x", + name_ar="فشل التوصيل 3 مرات متتالية", + condition="delivery_failures >= 3", + priority=EscalationPriority.MEDIUM, + target_role="assigned_rep", + reason=EscalationReason.DELIVERY_FAILURE, + suggested_action_ar="التحقق من رقم العميل واستخدام قناة بديلة (بريد إلكتروني أو SMS)", + ), +] + + +# ── Workflow resume registry ──────────────────────────────────────── + +_workflow_resume_handlers: dict[str, Any] = {} + + +def register_workflow_resume(workflow_name: str, handler: Any) -> None: + _workflow_resume_handlers[workflow_name] = handler + logger.info("تسجيل معالج استئناف لسير العمل: %s", workflow_name) + + +# ── Escalation Service ────────────────────────────────────────────── + +class EscalationService: + """Manages human-in-the-loop escalation packets.""" + + def __init__(self, rules: Optional[list[AutoEscalationRule]] = None) -> None: + self._store: dict[str, EscalationPacket] = {} + self._rules = rules or DEFAULT_RULES + self._history: list[EscalationPacket] = [] + self._max_history = 10_000 + + async def create(self, packet: EscalationPacket) -> EscalationPacket: + packet.status = EscalationStatus.PENDING + if not packet.id: + packet.id = str(uuid.uuid4()) + if not packet.resume_token: + packet.resume_token = str(uuid.uuid4()) + self._store[packet.id] = packet + logger.info( + "[Escalation] إنشاء تصعيد id=%s priority=%s reason=%s entity=%s/%s tenant=%s", + packet.id, packet.priority.value, packet.reason.value, + packet.entity_type, packet.entity_id, packet.tenant_id, + ) + return packet + + async def assign(self, escalation_id: str, user_id: str) -> Optional[EscalationPacket]: + packet = self._store.get(escalation_id) + if not packet: + logger.warning("[Escalation] تصعيد غير موجود: %s", escalation_id) + return None + if packet.status == EscalationStatus.RESOLVED: + logger.warning("[Escalation] محاولة تعيين تصعيد محلول: %s", escalation_id) + return packet + packet.assigned_to = user_id + packet.status = EscalationStatus.IN_PROGRESS + logger.info("[Escalation] تعيين %s إلى %s", escalation_id, user_id) + return packet + + async def resolve( + self, + escalation_id: str, + resolution: ResolutionInput, + user_id: str, + ) -> Optional[EscalationPacket]: + packet = self._store.get(escalation_id) + if not packet: + logger.warning("[Escalation] تصعيد غير موجود: %s", escalation_id) + return None + if packet.status == EscalationStatus.RESOLVED: + return packet + + now = datetime.now(timezone.utc) + packet.status = EscalationStatus.RESOLVED + packet.resolved_at = now + packet.resolution_data = { + "action_taken": resolution.action_taken, + "override_data": resolution.override_data, + "resolved_by": user_id, + } + packet.resolution_notes = resolution.notes + + self._history.append(packet) + if len(self._history) > self._max_history: + self._history = self._history[-self._max_history:] + + logger.info( + "[Escalation] حل تصعيد id=%s by=%s dur=%s", + escalation_id, user_id, + str(now - packet.created_at) if packet.created_at else "N/A", + ) + + if resolution.resume_workflow: + await self._try_resume_workflow(packet) + + return packet + + async def resume_workflow(self, escalation_id: str) -> dict[str, Any]: + packet = self._store.get(escalation_id) + if not packet: + return {"success": False, "error": "تصعيد غير موجود"} + if packet.status != EscalationStatus.RESOLVED: + return {"success": False, "error": "التصعيد لم يُحل بعد"} + return await self._try_resume_workflow(packet) + + async def _try_resume_workflow(self, packet: EscalationPacket) -> dict[str, Any]: + handler = _workflow_resume_handlers.get(packet.workflow_name) + if not handler: + logger.info( + "[Escalation] لا يوجد معالج استئناف لسير العمل: %s", + packet.workflow_name, + ) + return { + "success": False, + "error": f"لا يوجد معالج استئناف لـ {packet.workflow_name}", + } + try: + result = await handler( + resume_token=packet.resume_token, + entity_type=packet.entity_type, + entity_id=packet.entity_id, + resolution_data=packet.resolution_data, + ) + logger.info( + "[Escalation] استئناف سير العمل %s للتصعيد %s", + packet.workflow_name, packet.id, + ) + return {"success": True, "result": result} + except Exception as exc: + logger.exception("[Escalation] فشل استئناف سير العمل: %s", exc) + return {"success": False, "error": str(exc)} + + async def expire_overdue(self, tenant_id: str) -> int: + now = datetime.now(timezone.utc) + count = 0 + for packet in self._store.values(): + if ( + packet.tenant_id == tenant_id + and packet.status in (EscalationStatus.PENDING, EscalationStatus.IN_PROGRESS) + and packet.due_at < now + ): + packet.status = EscalationStatus.EXPIRED + count += 1 + logger.info("[Escalation] انتهاء صلاحية تصعيد: %s", packet.id) + return count + + async def list_pending( + self, + tenant_id: str, + priority: Optional[EscalationPriority] = None, + ) -> list[EscalationPacket]: + results = [ + p for p in self._store.values() + if p.tenant_id == tenant_id and p.status in ( + EscalationStatus.PENDING, EscalationStatus.IN_PROGRESS, + ) + ] + if priority: + results = [p for p in results if p.priority == priority] + results.sort(key=lambda p: ( + list(EscalationPriority).index(p.priority), p.created_at, + )) + return results + + async def get(self, escalation_id: str) -> Optional[EscalationPacket]: + return self._store.get(escalation_id) + + async def get_stats(self, tenant_id: str) -> EscalationStats: + now = datetime.now(timezone.utc) + tenant_packets = [p for p in self._store.values() if p.tenant_id == tenant_id] + + by_priority: dict[str, int] = defaultdict(int) + by_status: dict[str, int] = defaultdict(int) + by_reason: dict[str, int] = defaultdict(int) + resolution_times: list[float] = [] + oldest_pending_hours = 0.0 + overdue = 0 + + for p in tenant_packets: + by_priority[p.priority.value] += 1 + by_status[p.status.value] += 1 + by_reason[p.reason.value] += 1 + + if p.status == EscalationStatus.RESOLVED and p.resolved_at and p.created_at: + resolution_times.append( + (p.resolved_at - p.created_at).total_seconds() / 60.0 + ) + + if p.status in (EscalationStatus.PENDING, EscalationStatus.IN_PROGRESS): + age_h = (now - p.created_at).total_seconds() / 3600.0 + oldest_pending_hours = max(oldest_pending_hours, age_h) + if p.due_at < now: + overdue += 1 + + # Include resolved history for this tenant + for p in self._history: + if p.tenant_id == tenant_id and p.id not in self._store: + if p.resolved_at and p.created_at: + resolution_times.append( + (p.resolved_at - p.created_at).total_seconds() / 60.0 + ) + + return EscalationStats( + total=len(tenant_packets), + by_priority=dict(by_priority), + by_status=dict(by_status), + by_reason=dict(by_reason), + avg_resolution_minutes=( + sum(resolution_times) / len(resolution_times) if resolution_times else 0.0 + ), + oldest_pending_hours=round(oldest_pending_hours, 2), + overdue_count=overdue, + ) + + async def check_auto_escalation( + self, + tenant_id: str, + context: dict[str, Any], + ) -> Optional[EscalationPacket]: + for rule in self._rules: + if self._evaluate_rule(rule, context): + packet = EscalationPacket( + tenant_id=tenant_id, + title=f"Auto-escalation: {rule.id}", + title_ar=rule.name_ar, + entity_type=context.get("entity_type", "unknown"), + entity_id=context.get("entity_id", ""), + workflow_name=context.get("workflow_name", ""), + failed_step=context.get("current_step", ""), + reason=rule.reason, + priority=rule.priority, + risk_if_delayed_ar=rule.suggested_action_ar, + suggested_action=rule.suggested_action_ar, + suggested_action_ar=rule.suggested_action_ar, + confidence=context.get("confidence", 0.0), + artifacts=[EscalationArtifact( + type="context", name="auto_escalation_context", + content=str(context), + )], + ) + created = await self.create(packet) + logger.info( + "[Escalation] تصعيد تلقائي rule=%s entity=%s/%s", + rule.id, packet.entity_type, packet.entity_id, + ) + return created + return None + + @staticmethod + def _evaluate_rule(rule: AutoEscalationRule, context: dict[str, Any]) -> bool: + cond = rule.condition + if "deal_value_sar > 100000" in cond: + return context.get("deal_value_sar", 0) > 100_000 + if "days_since_last_response > 5" in cond: + return context.get("days_since_last_response", 0) > 5 + if "ai_confidence < 0.3" in cond: + return context.get("confidence", 1.0) < 0.3 + if "consent_expired == true" in cond: + return context.get("consent_expired", False) is True + if "delivery_failures >= 3" in cond: + return context.get("delivery_failures", 0) >= 3 + return False diff --git a/salesflow-saas/backend/app/services/signal_intelligence.py b/salesflow-saas/backend/app/services/signal_intelligence.py new file mode 100644 index 00000000..8407119f --- /dev/null +++ b/salesflow-saas/backend/app/services/signal_intelligence.py @@ -0,0 +1,267 @@ +""" +Signal Intelligence Engine — Real-time signal distillation, deduplication, +importance scoring, and watchlist matching for Dealix CRM. +""" +from __future__ import annotations + +import hashlib, logging, uuid +from collections import defaultdict +from datetime import datetime, timedelta, timezone +from enum import Enum +from typing import Any, Dict, List, Optional +from pydantic import BaseModel, Field + +logger = logging.getLogger("dealix.services.signal_intelligence") + + +class SignalSource(str, Enum): + CRM_EVENT = "crm_event" + WHATSAPP = "whatsapp" + EMAIL = "email" + WEBSITE = "website" + COMPETITOR = "competitor" + SYSTEM = "system" + + +class SignalEvent(BaseModel): + id: str = Field(default_factory=lambda: uuid.uuid4().hex) + source: SignalSource + entity_type: str + entity_id: str + timestamp: datetime = Field(default_factory=lambda: datetime.now(timezone.utc)) + raw_payload: Dict[str, Any] = {} + normalized: Dict[str, Any] = {} + importance_score: float = 0.0 + urgency_score: float = 0.0 + sentiment: str = "neutral" + tags: List[str] = [] + tenant_id: str = "" + is_duplicate: bool = False + + +class Watchlist(BaseModel): + id: str = Field(default_factory=lambda: uuid.uuid4().hex) + tenant_id: str + name: str + name_ar: str + entity_type: str + entity_ids: List[str] = [] + keywords: List[str] = [] + alert_threshold: float = 0.5 + channels: List[str] = ["dashboard"] + is_active: bool = True + created_at: datetime = Field(default_factory=lambda: datetime.now(timezone.utc)) + + +class SignalFilter(BaseModel): + source: Optional[SignalSource] = None + entity_type: Optional[str] = None + entity_id: Optional[str] = None + min_importance: float = 0.0 + sentiment: Optional[str] = None + tags: List[str] = [] + since: Optional[datetime] = None + limit: int = 50 + + +# ── Importance / urgency scoring tables ────────────────────────────────── +_IMPORTANCE: Dict[str, float] = { + "deal_stage_won": 0.95, "deal_stage_lost": 0.95, + "pdpl_consent_expiring": 0.90, "high_value_lead_responded": 0.85, + "competitor_price_change": 0.80, "meeting_booked": 0.75, + "new_lead_whatsapp": 0.70, "sequence_step_completed": 0.40, + "email_opened": 0.30, "page_visit": 0.20, "routine_update": 0.15, +} +_URGENCY: Dict[str, float] = { + "deal_stage_won": 0.90, "deal_stage_lost": 0.85, + "pdpl_consent_expiring": 0.95, "high_value_lead_responded": 0.80, + "competitor_price_change": 0.70, "meeting_booked": 0.65, + "new_lead_whatsapp": 0.60, "sequence_step_completed": 0.30, + "email_opened": 0.15, "page_visit": 0.10, "routine_update": 0.05, +} + +_POSITIVE_KW = {"شكرا", "ممتاز", "موافق", "نعم", "interested", "great", "won", "closed_won", "booked"} +_NEGATIVE_KW = {"غاضب", "مشكلة", "إلغاء", "lost", "closed_lost", "cancel", "complaint", "angry"} + + +def _classify(source: SignalSource, p: Dict[str, Any]) -> str: + et = str(p.get("event_type", "")).lower() + st = str(p.get("stage", "")).lower() + if source == SignalSource.CRM_EVENT: + if st in ("closed_won", "won"): return "deal_stage_won" + if st in ("closed_lost", "lost"): return "deal_stage_lost" + return {"meeting_booked": "meeting_booked", "high_value_response": "high_value_lead_responded", + "sequence_step_completed": "sequence_step_completed"}.get(et, "routine_update") + if source == SignalSource.WHATSAPP: + return "new_lead_whatsapp" if et == "new_lead" else "high_value_lead_responded" + if source == SignalSource.EMAIL: + return "email_opened" if et == "opened" else "routine_update" + if source == SignalSource.WEBSITE: + return "page_visit" + if source == SignalSource.COMPETITOR: + return "competitor_price_change" if "price" in et else "routine_update" + if source == SignalSource.SYSTEM: + return "pdpl_consent_expiring" if ("consent" in et or "pdpl" in et) else "routine_update" + return "routine_update" + + +def _sentiment(payload: Dict[str, Any]) -> str: + text = " ".join(str(v) for v in payload.values() if isinstance(v, str)).lower() + pos = sum(1 for w in _POSITIVE_KW if w in text) + neg = sum(1 for w in _NEGATIVE_KW if w in text) + return "positive" if pos > neg else ("negative" if neg > pos else "neutral") + + +def _tags(source: SignalSource, p: Dict[str, Any]) -> List[str]: + t = [source.value] + for k in ("entity_type", "event_type"): + if p.get(k): t.append(str(p[k])) + if p.get("high_value"): t.append("high_value") + if p.get("stage") in ("closed_won", "won"): t.append("won") + if p.get("stage") in ("closed_lost", "lost"): t.append("lost") + return t + + +def _sentiment_ar(s: str) -> str: + return {"positive": "إيجابي", "neutral": "محايد", "negative": "سلبي"}.get(s, "محايد") + + +class SignalIntelligence: + """Real-time signal ingestion, scoring, dedup, watchlist matching.""" + + def __init__(self) -> None: + self._events: Dict[str, List[SignalEvent]] = defaultdict(list) + self._watchlists: Dict[str, List[Watchlist]] = defaultdict(list) + self._dedup: Dict[str, datetime] = {} + + async def ingest(self, source: SignalSource, payload: Dict[str, Any], tenant_id: str) -> SignalEvent: + etype = str(payload.get("entity_type", "unknown")) + eid = str(payload.get("entity_id", "")) + norm = {k: v for k, v in { + "source": source.value, "entity_type": etype, "entity_id": eid, + "event_type": payload.get("event_type", "unknown"), + "stage": payload.get("stage"), "value": payload.get("value"), + "name": payload.get("name"), "channel": payload.get("channel"), + }.items() if v is not None} + + event = SignalEvent(source=source, entity_type=etype, entity_id=eid, + raw_payload=payload, normalized=norm, tenant_id=tenant_id, + sentiment=_sentiment(payload), tags=_tags(source, payload)) + + event.is_duplicate = await self.deduplicate(event) + if not event.is_duplicate: + event.importance_score = await self.score_importance(event) + event.urgency_score = await self._score_urgency(event) + buf = self._events[tenant_id] + buf.insert(0, event) + if len(buf) > 5000: + self._events[tenant_id] = buf[:5000] + + matched = await self.check_watchlists(event, tenant_id) + if matched: + logger.info("Signal %s matched %d watchlist(s)", event.id[:8], len(matched)) + return event + + async def deduplicate(self, event: SignalEvent) -> bool: + now = datetime.now(timezone.utc) + self._dedup = {k: v for k, v in self._dedup.items() if v > now} + fp = hashlib.sha256( + f"{event.tenant_id}:{event.source.value}:{event.entity_type}:{event.entity_id}:{event.normalized.get('event_type','')}".encode() + ).hexdigest()[:32] + if fp in self._dedup: + return True + self._dedup[fp] = now + timedelta(hours=1) + return False + + async def score_importance(self, event: SignalEvent) -> float: + base = _IMPORTANCE.get(_classify(event.source, event.raw_payload), 0.15) + if event.raw_payload.get("high_value"): base = min(1.0, base + 0.10) + if event.sentiment == "negative": base = min(1.0, base + 0.05) + return round(base, 2) + + async def _score_urgency(self, event: SignalEvent) -> float: + base = _URGENCY.get(_classify(event.source, event.raw_payload), 0.05) + if event.sentiment == "negative": base = min(1.0, base + 0.10) + return round(base, 2) + + async def check_watchlists(self, event: SignalEvent, tenant_id: str) -> List[Watchlist]: + matched: List[Watchlist] = [] + for wl in self._watchlists.get(tenant_id, []): + if not wl.is_active or event.importance_score < wl.alert_threshold: + continue + if wl.entity_type and wl.entity_type != event.entity_type: + continue + id_ok = (not wl.entity_ids) or (event.entity_id in wl.entity_ids) + kw_ok = True + if wl.keywords: + text = " ".join(str(v) for v in event.raw_payload.values() if isinstance(v, str)).lower() + kw_ok = any(kw.lower() in text for kw in wl.keywords) + if not id_ok and not kw_ok: + continue + matched.append(wl) + return matched + + async def create_watchlist(self, watchlist: Watchlist) -> Watchlist: + self._watchlists[watchlist.tenant_id].append(watchlist) + logger.info("Watchlist '%s' created for tenant %s", watchlist.name, watchlist.tenant_id[:8]) + return watchlist + + async def get_watchlists(self, tenant_id: str) -> List[Watchlist]: + return [w for w in self._watchlists.get(tenant_id, []) if w.is_active] + + async def get_signals(self, tenant_id: str, filters: SignalFilter) -> List[SignalEvent]: + result: List[SignalEvent] = [] + for ev in self._events.get(tenant_id, []): + if filters.source and ev.source != filters.source: continue + if filters.entity_type and ev.entity_type != filters.entity_type: continue + if filters.entity_id and ev.entity_id != filters.entity_id: continue + if ev.importance_score < filters.min_importance: continue + if filters.sentiment and ev.sentiment != filters.sentiment: continue + if filters.tags and not set(filters.tags).issubset(set(ev.tags)): continue + if filters.since and ev.timestamp < filters.since: continue + result.append(ev) + if len(result) >= filters.limit: break + return result + + async def get_entity_summary(self, entity_type: str, entity_id: str, + tenant_id: str, hours: int = 24) -> Dict[str, Any]: + cutoff = datetime.now(timezone.utc) - timedelta(hours=hours) + events = [e for e in self._events.get(tenant_id, []) + if e.entity_type == entity_type and e.entity_id == entity_id and e.timestamp >= cutoff] + if not events: + return {"entity_type": entity_type, "entity_id": entity_id, "hours": hours, + "signal_count": 0, "summary_ar": "لا توجد إشارات في الفترة المحددة"} + + sents = {"positive": 0, "neutral": 0, "negative": 0} + sources: Dict[str, int] = defaultdict(int) + max_imp = 0.0 + all_tags: List[str] = [] + for ev in events: + sents[ev.sentiment] = sents.get(ev.sentiment, 0) + 1 + sources[ev.source.value] += 1 + max_imp = max(max_imp, ev.importance_score) + all_tags.extend(ev.tags) + + dom = max(sents, key=sents.get) # type: ignore[arg-type] + utags = list(set(all_tags))[:15] + parts = [f"عدد الإشارات: {len(events)}", f"أعلى أهمية: {max_imp:.0%}", + f"المشاعر السائدة: {_sentiment_ar(dom)}"] + if "won" in utags: parts.append("تم إغلاق صفقة بنجاح") + if "lost" in utags: parts.append("تم خسارة صفقة") + if "high_value" in utags: parts.append("عميل عالي القيمة") + + return {"entity_type": entity_type, "entity_id": entity_id, "hours": hours, + "signal_count": len(events), "max_importance": max_imp, + "dominant_sentiment": dom, "sentiment_breakdown": sents, + "sources": dict(sources), "tags": utags, + "summary_ar": " | ".join(parts), + "latest_signal": events[0].model_dump() if events else None} + + +_instance: Optional[SignalIntelligence] = None + +def get_signal_intelligence() -> SignalIntelligence: + global _instance + if _instance is None: + _instance = SignalIntelligence() + return _instance diff --git a/salesflow-saas/backend/app/services/skill_registry.py b/salesflow-saas/backend/app/services/skill_registry.py new file mode 100644 index 00000000..3434871f --- /dev/null +++ b/salesflow-saas/backend/app/services/skill_registry.py @@ -0,0 +1,358 @@ +""" +Skill Registry + Runtime — Dealix AI Revenue OS +نظام المهارات: تسجيل وإدارة وتنفيذ مهارات CRM بشكل آمن. +""" +from __future__ import annotations + +import asyncio +import logging +import os +import uuid +from datetime import datetime, timezone +from enum import Enum +from typing import Any, Callable, Coroutine, Optional + +from pydantic import BaseModel, Field + +logger = logging.getLogger(__name__) + + +class ApprovalClass(str, Enum): + AUTO = "auto" + APPROVAL_REQUIRED = "approval_required" + FORBIDDEN = "forbidden" + + +class SkillCategory(str, Enum): + CRM = "crm" + MESSAGING = "messaging" + ANALYTICS = "analytics" + CONTENT = "content" + ADMIN = "admin" + COMPLIANCE = "compliance" + + +class ExecutionStatus(str, Enum): + SUCCESS = "success" + FAILED = "failed" + PENDING_APPROVAL = "pending_approval" + FORBIDDEN = "forbidden" + SKIPPED = "skipped" + + +class SkillDefinition(BaseModel): + id: str + name: str + name_ar: str + description: str + description_ar: str = "" + category: SkillCategory + approval_class: ApprovalClass = ApprovalClass.AUTO + is_read_only: bool = False + commands: list[str] = [] + required_secrets: list[str] = [] + health_check: Optional[Callable[[], Coroutine[Any, Any, bool]]] = Field(default=None, exclude=True) + is_enabled: bool = True + version: str = "1.0.0" + model_config = {"arbitrary_types_allowed": True} + + +class UserContext(BaseModel): + user_id: str + tenant_id: str + role: str = "member" + permissions: list[str] = [] + + +class SkillResult(BaseModel): + run_id: str = Field(default_factory=lambda: str(uuid.uuid4())) + skill_id: str + command: str + status: ExecutionStatus + data: dict[str, Any] = {} + evidence: list[str] = [] + error: Optional[str] = None + started_at: datetime = Field(default_factory=lambda: datetime.now(timezone.utc)) + completed_at: Optional[datetime] = None + duration_ms: Optional[int] = None + approval_request_id: Optional[str] = None + + +class SkillHealthReport(BaseModel): + skill_id: str + healthy: bool + checked_at: datetime = Field(default_factory=lambda: datetime.now(timezone.utc)) + error: Optional[str] = None + + +class SkillRegistry: + """Manages all registered domain skills.""" + + def __init__(self) -> None: + self._skills: dict[str, SkillDefinition] = {} + self._handlers: dict[str, Callable[..., Coroutine[Any, Any, dict]]] = {} + + def register(self, skill: SkillDefinition, handler: Optional[Callable] = None) -> None: + self._skills[skill.id] = skill + if handler: + self._handlers[skill.id] = handler + logger.info("تسجيل مهارة: %s [%s] v%s", skill.id, skill.category.value, skill.version) + + def get(self, skill_id: str) -> Optional[SkillDefinition]: + return self._skills.get(skill_id) + + def list_all(self) -> list[SkillDefinition]: + return list(self._skills.values()) + + def list_by_category(self, category: str | SkillCategory) -> list[SkillDefinition]: + cat = category if isinstance(category, str) else category.value + return [s for s in self._skills.values() if s.category.value == cat] + + def enable(self, skill_id: str) -> bool: + s = self._skills.get(skill_id) + if not s: + return False + s.is_enabled = True + return True + + def disable(self, skill_id: str) -> bool: + s = self._skills.get(skill_id) + if not s: + return False + s.is_enabled = False + return True + + async def health_check_all(self) -> list[SkillHealthReport]: + reports: list[SkillHealthReport] = [] + for sid, skill in self._skills.items(): + if skill.health_check is not None: + try: + healthy = await skill.health_check() + reports.append(SkillHealthReport(skill_id=sid, healthy=healthy)) + except Exception as exc: + reports.append(SkillHealthReport(skill_id=sid, healthy=False, error=str(exc))) + else: + reports.append(SkillHealthReport(skill_id=sid, healthy=skill.is_enabled)) + return reports + + def get_handler(self, skill_id: str) -> Optional[Callable]: + return self._handlers.get(skill_id) + + +class SkillRuntime: + """Executes skills safely with validation, logging, and approval gating.""" + + def __init__(self, registry: SkillRegistry) -> None: + self._registry = registry + self._execution_log: list[SkillResult] = [] + self._max_log = 5000 + self._pending_approvals: dict[str, dict[str, Any]] = {} + + async def execute(self, skill_id: str, command: str, params: dict[str, Any], + user_context: UserContext) -> SkillResult: + run_id, start = str(uuid.uuid4()), datetime.now(timezone.utc) + skill = self._registry.get(skill_id) + if not skill: + return self._finish(SkillResult(run_id=run_id, skill_id=skill_id, command=command, + status=ExecutionStatus.FAILED, error=f"مهارة غير موجودة: {skill_id}"), start) + if not skill.is_enabled: + return self._finish(SkillResult(run_id=run_id, skill_id=skill_id, command=command, + status=ExecutionStatus.SKIPPED, error="المهارة معطلة حالياً"), start) + if command not in skill.commands: + return self._finish(SkillResult(run_id=run_id, skill_id=skill_id, command=command, + status=ExecutionStatus.FAILED, + error=f"أمر غير مدعوم: {command}. المتاحة: {skill.commands}"), start) + if skill.approval_class == ApprovalClass.FORBIDDEN: + return self._finish(SkillResult(run_id=run_id, skill_id=skill_id, command=command, + status=ExecutionStatus.FORBIDDEN, error="محظورة"), start) + missing = [s for s in skill.required_secrets if not os.environ.get(s)] + if missing: + return self._finish(SkillResult(run_id=run_id, skill_id=skill_id, command=command, + status=ExecutionStatus.FAILED, error=f"متغيرات بيئة مفقودة: {missing}"), start) + if skill.approval_class == ApprovalClass.APPROVAL_REQUIRED: + aid = str(uuid.uuid4()) + self._pending_approvals[aid] = {"run_id": run_id, "skill_id": skill_id, "command": command, + "params": params, "user_context": user_context.model_dump(), + "requested_at": start.isoformat()} + logger.info("[SkillRuntime] طلب موافقة run=%s skill=%s approval=%s", run_id, skill_id, aid) + return self._finish(SkillResult(run_id=run_id, skill_id=skill_id, command=command, + status=ExecutionStatus.PENDING_APPROVAL, approval_request_id=aid, + evidence=[f"بانتظار الموافقة: {aid}"]), start) + handler = self._registry.get_handler(skill_id) + if not handler: + return self._finish(SkillResult(run_id=run_id, skill_id=skill_id, command=command, + status=ExecutionStatus.FAILED, error="لا يوجد معالج مسجل"), start) + try: + data = await handler(command=command, params=params, user_context=user_context) + return self._finish(SkillResult(run_id=run_id, skill_id=skill_id, command=command, + status=ExecutionStatus.SUCCESS, data=data, + evidence=[f"تم التنفيذ بنجاح عبر {skill.name}"]), start) + except Exception as exc: + logger.exception("[SkillRuntime] خطأ %s: %s", skill_id, exc) + return self._finish(SkillResult(run_id=run_id, skill_id=skill_id, command=command, + status=ExecutionStatus.FAILED, error=str(exc), + evidence=[f"فشل: {type(exc).__name__}"]), start) + + async def execute_approved(self, approval_id: str) -> SkillResult: + pending = self._pending_approvals.pop(approval_id, None) + if not pending: + return SkillResult(run_id=str(uuid.uuid4()), skill_id="unknown", command="unknown", + status=ExecutionStatus.FAILED, error=f"طلب موافقة غير موجود: {approval_id}") + ctx = UserContext(**pending["user_context"]) + handler = self._registry.get_handler(pending["skill_id"]) + start = datetime.now(timezone.utc) + if not handler: + return self._finish(SkillResult(run_id=pending["run_id"], skill_id=pending["skill_id"], + command=pending["command"], status=ExecutionStatus.FAILED, + error="لا يوجد معالج مسجل"), start) + try: + data = await handler(command=pending["command"], params=pending["params"], user_context=ctx) + return self._finish(SkillResult(run_id=pending["run_id"], skill_id=pending["skill_id"], + command=pending["command"], status=ExecutionStatus.SUCCESS, + data=data, evidence=["تم التنفيذ بعد الموافقة"]), start) + except Exception as exc: + return self._finish(SkillResult(run_id=pending["run_id"], skill_id=pending["skill_id"], + command=pending["command"], status=ExecutionStatus.FAILED, + error=str(exc)), start) + + async def execute_background(self, skill_id: str, command: str, params: dict[str, Any], + user_context: UserContext) -> str: + run_id = str(uuid.uuid4()) + asyncio.create_task(self._bg_run(run_id, skill_id, command, params, user_context)) + return run_id + + async def _bg_run(self, run_id: str, skill_id: str, command: str, + params: dict[str, Any], ctx: UserContext) -> None: + try: + r = await self.execute(skill_id, command, params, ctx) + r.run_id = run_id + except Exception as exc: + logger.exception("[SkillRuntime] فشل خلفي: %s", exc) + + def list_pending_approvals(self) -> list[dict[str, Any]]: + return [{"approval_id": k, **v} for k, v in self._pending_approvals.items()] + + def get_execution_log(self, last_n: int = 50) -> list[SkillResult]: + return self._execution_log[-last_n:] + + def _finish(self, result: SkillResult, start: datetime) -> SkillResult: + now = datetime.now(timezone.utc) + result.completed_at = now + result.duration_ms = int((now - start).total_seconds() * 1000) + self._execution_log.append(result) + if len(self._execution_log) > self._max_log: + self._execution_log = self._execution_log[-self._max_log:] + logger.info("[SkillRuntime] %s run=%s skill=%s cmd=%s %dms", + result.status.value, result.run_id, result.skill_id, result.command, result.duration_ms) + return result + + +# ── Built-in CRM skill handlers ──────────────────────────────────── + +async def _h_lead_qualify(command: str, params: dict, user_context: UserContext) -> dict: + return {"lead_id": params.get("lead_id"), "qualified": True, "score": 72, + "reason_ar": "العميل أبدى اهتماماً واضحاً ولديه ميزانية مناسبة", "next_step": "schedule_demo"} + + +async def _h_lead_score(command: str, params: dict, user_context: UserContext) -> dict: + return {"lead_id": params.get("lead_id"), "score": 68, + "factors": {"engagement": 0.8, "fit": 0.7, "budget": 0.6, "timing": 0.5}, "tier": "A"} + + +async def _h_lead_assign(command: str, params: dict, user_context: UserContext) -> dict: + return {"lead_id": params.get("lead_id"), "assigned_to": params.get("rep_id"), + "reason_ar": "تم التعيين بناءً على التخصص وحمل العمل الحالي"} + + +async def _h_deal_forecast(command: str, params: dict, user_context: UserContext) -> dict: + return {"deal_id": params.get("deal_id"), "forecast_amount": 150_000, "currency": "SAR", + "probability": 0.65, "expected_close": "2026-05-15", "risk_factors_ar": ["تأخر في الرد", "منافس نشط"]} + + +async def _h_whatsapp_send(command: str, params: dict, user_context: UserContext) -> dict: + return {"phone": params.get("phone"), "message_preview": (params.get("message", ""))[:100], + "status": "queued", "message_id": str(uuid.uuid4())} + + +async def _h_sequence_enroll(command: str, params: dict, user_context: UserContext) -> dict: + return {"lead_id": params.get("lead_id"), "sequence_id": params.get("sequence_id"), + "status": "enrolled", "next_step_at": "2026-04-12T09:00:00+03:00"} + + +async def _h_pipeline_summary(command: str, params: dict, user_context: UserContext) -> dict: + return {"total_deals": 47, "total_value": 2_350_000, "currency": "SAR", + "by_stage": {"prospecting": 12, "qualification": 10, "proposal": 8, + "negotiation": 9, "closed_won": 5, "closed_lost": 3}, + "at_risk": 4, "summary_ar": "خط الأنابيب بحالة جيدة. 4 صفقات تحتاج متابعة عاجلة."} + + +async def _h_forecast_generate(command: str, params: dict, user_context: UserContext) -> dict: + return {"period": params.get("period", "Q2-2026"), "forecast_revenue": 1_800_000, + "currency": "SAR", "confidence": 0.72, + "summary_ar": "التوقعات إيجابية مع احتمال تحقيق الهدف بنسبة 72%"} + + +async def _h_consent_check(command: str, params: dict, user_context: UserContext) -> dict: + return {"entity_id": params.get("entity_id"), "channel": params.get("channel", "whatsapp"), + "has_consent": True, "consent_purpose": "marketing", + "expires_at": "2027-04-11T00:00:00+03:00", "pdpl_compliant": True} + + +async def _h_data_export(command: str, params: dict, user_context: UserContext) -> dict: + return {"entity_id": params.get("entity_id"), "format": params.get("format", "json"), + "status": "export_ready", "download_url": f"/api/v1/exports/{uuid.uuid4()}", "expires_in_hours": 24} + + +async def _h_tenant_update(command: str, params: dict, user_context: UserContext) -> dict: + return {"tenant_id": user_context.tenant_id, + "updated_fields": list(params.get("settings", {}).keys()), "status": "updated"} + + +# ── Default registry factory ─────────────────────────────────────── + +_BUILTIN_SKILLS: list[tuple[dict, Callable]] = [ + ({"id": "crm.lead.qualify", "name": "Qualify Lead", "name_ar": "تأهيل عميل محتمل", + "description": "Qualify a lead using AI", "description_ar": "تأهيل عميل محتمل بالذكاء الاصطناعي", + "category": "crm", "approval_class": "auto", "commands": ["qualify", "re_qualify"]}, _h_lead_qualify), + ({"id": "crm.lead.score", "name": "Score Lead", "name_ar": "تقييم عميل محتمل", + "description": "Score a lead", "description_ar": "تقييم عميل محتمل", + "category": "crm", "approval_class": "auto", "is_read_only": True, "commands": ["score", "rescore"]}, _h_lead_score), + ({"id": "crm.lead.assign", "name": "Assign Lead", "name_ar": "تعيين عميل محتمل", + "description": "Assign lead to rep", "description_ar": "تعيين عميل محتمل لممثل مبيعات", + "category": "crm", "approval_class": "approval_required", "commands": ["assign", "reassign"]}, _h_lead_assign), + ({"id": "crm.deal.forecast", "name": "Forecast Deal", "name_ar": "توقع الصفقة", + "description": "Forecast deal outcome", "description_ar": "توقع نتيجة الصفقة", + "category": "crm", "approval_class": "auto", "is_read_only": True, "commands": ["forecast", "refresh"]}, _h_deal_forecast), + ({"id": "messaging.whatsapp.send", "name": "Send WhatsApp", "name_ar": "إرسال واتساب", + "description": "Send WhatsApp message", "description_ar": "إرسال رسالة واتساب", + "category": "messaging", "approval_class": "approval_required", + "commands": ["send", "send_template"], "required_secrets": ["WHATSAPP_API_TOKEN"]}, _h_whatsapp_send), + ({"id": "messaging.sequence.enroll", "name": "Enroll in Sequence", "name_ar": "تسجيل في تسلسل", + "description": "Enroll lead in sequence", "description_ar": "تسجيل عميل محتمل في تسلسل آلي", + "category": "messaging", "approval_class": "approval_required", "commands": ["enroll", "unenroll"]}, _h_sequence_enroll), + ({"id": "analytics.pipeline.summary", "name": "Pipeline Summary", "name_ar": "ملخص خط الأنابيب", + "description": "Pipeline summary", "description_ar": "ملخص خط أنابيب المبيعات", + "category": "analytics", "approval_class": "auto", "is_read_only": True, "commands": ["summary", "detailed"]}, _h_pipeline_summary), + ({"id": "analytics.forecast.generate", "name": "Generate Forecast", "name_ar": "إنشاء توقعات", + "description": "Revenue forecast", "description_ar": "توقعات الإيرادات", + "category": "analytics", "approval_class": "auto", "is_read_only": True, "commands": ["generate", "compare"]}, _h_forecast_generate), + ({"id": "compliance.consent.check", "name": "Check Consent", "name_ar": "التحقق من الموافقة", + "description": "Check PDPL consent", "description_ar": "التحقق من موافقة PDPL", + "category": "compliance", "approval_class": "auto", "is_read_only": True, "commands": ["check", "audit"]}, _h_consent_check), + ({"id": "compliance.data.export", "name": "Export Customer Data", "name_ar": "تصدير بيانات العميل", + "description": "Export data per PDPL request", "description_ar": "تصدير بيانات بناءً على طلب صاحب البيانات", + "category": "compliance", "approval_class": "approval_required", "is_read_only": True, "commands": ["export", "preview"]}, _h_data_export), + ({"id": "admin.tenant.update", "name": "Update Tenant", "name_ar": "تحديث إعدادات المستأجر", + "description": "Update tenant settings", "description_ar": "تحديث إعدادات المستأجر", + "category": "admin", "approval_class": "approval_required", "commands": ["update", "reset"]}, _h_tenant_update), +] + + +def build_default_registry() -> tuple[SkillRegistry, SkillRuntime]: + """Create registry with all built-in Dealix CRM skills.""" + registry = SkillRegistry() + for spec, handler in _BUILTIN_SKILLS: + registry.register(SkillDefinition(**spec), handler) + runtime = SkillRuntime(registry) + logger.info("تم تهيئة سجل المهارات: %d مهارة مسجلة", len(_BUILTIN_SKILLS)) + return registry, runtime