diff --git a/salesflow-saas/backend/app/api/v1/intelligence.py b/salesflow-saas/backend/app/api/v1/intelligence.py index 04b9abee..1fed1941 100644 --- a/salesflow-saas/backend/app/api/v1/intelligence.py +++ b/salesflow-saas/backend/app/api/v1/intelligence.py @@ -1,13 +1,11 @@ """ Intelligence API — Signals, alerts, behaviour patterns, recommendations, -escalations. Wires the signal_intelligence, alert_delivery and +escalations. Wires signal_intelligence, alert_delivery and behavior_intelligence services into FastAPI endpoints. """ - from __future__ import annotations -import logging -import uuid +import logging, uuid from datetime import datetime, timezone from typing import List, Optional @@ -15,35 +13,22 @@ from fastapi import APIRouter, BackgroundTasks, HTTPException, Query from pydantic import BaseModel, Field from app.services.signal_intelligence import ( - SignalSource, - SignalEvent, - SignalFilter, - Watchlist, - get_signal_intelligence, -) -from app.services.alert_delivery import ( - AlertUrgency, - get_alert_delivery, -) -from app.services.behavior_intelligence import ( - get_behavior_intelligence, + SignalSource, SignalFilter, Watchlist, get_signal_intelligence, ) +from app.services.alert_delivery import AlertUrgency, get_alert_delivery +from app.services.behavior_intelligence import get_behavior_intelligence logger = logging.getLogger("dealix.api.intelligence") - router = APIRouter(prefix="/intelligence", tags=["Intelligence"]) -# --------------------------------------------------------------------------- -# Request / Response schemas -# --------------------------------------------------------------------------- +# ── Request / Response schemas ────────────────────────────────────────── class IngestRequest(BaseModel): source: SignalSource payload: dict tenant_id: str - class WatchlistCreate(BaseModel): tenant_id: str name: str @@ -54,17 +39,14 @@ class WatchlistCreate(BaseModel): alert_threshold: float = 0.5 channels: List[str] = Field(default=["dashboard"]) - -class AcknowledgeRequest(BaseModel): +class AckRequest(BaseModel): user_id: str - -class EscalationResolveRequest(BaseModel): +class EscalationResolve(BaseModel): user_id: str resolution: str resolution_ar: str = "" - class _Escalation(BaseModel): id: str = Field(default_factory=lambda: uuid.uuid4().hex) tenant_id: str @@ -75,208 +57,108 @@ class _Escalation(BaseModel): entity_type: str = "" entity_id: str = "" assigned_to: Optional[str] = None - status: str = "open" # open, resolved + status: str = "open" resolved_by: Optional[str] = None resolved_at: Optional[datetime] = None resolution: Optional[str] = None created_at: datetime = Field(default_factory=lambda: datetime.now(timezone.utc)) - -# In-memory escalation store (per-tenant) _escalations: dict[str, list[_Escalation]] = {} -# --------------------------------------------------------------------------- -# Signals -# --------------------------------------------------------------------------- - +# ── Signals ───────────────────────────────────────────────────────────── @router.post("/signals/ingest", summary="Ingest a raw signal event") async def ingest_signal(req: IngestRequest): - """Normalise, score, deduplicate and store a signal from any source.""" - engine = get_signal_intelligence() - event = await engine.ingest(req.source, req.payload, req.tenant_id) + event = await get_signal_intelligence().ingest(req.source, req.payload, req.tenant_id) return event.model_dump() - @router.get("/signals", summary="List recent signals with filters") -async def list_signals( - tenant_id: str, - source: Optional[SignalSource] = None, - entity_type: Optional[str] = None, - entity_id: Optional[str] = None, - min_importance: float = 0.0, - sentiment: Optional[str] = None, - limit: int = Query(default=50, le=200), -): - """Return signals matching the provided filters, most recent first.""" - engine = get_signal_intelligence() - filters = SignalFilter( - source=source, - entity_type=entity_type, - entity_id=entity_id, - min_importance=min_importance, - sentiment=sentiment, - limit=limit, - ) - events = await engine.get_signals(tenant_id, filters) - return { - "count": len(events), - "signals": [e.model_dump() for e in events], - } +async def list_signals(tenant_id: str, source: Optional[SignalSource] = None, + entity_type: Optional[str] = None, entity_id: Optional[str] = None, + min_importance: float = 0.0, sentiment: Optional[str] = None, + limit: int = Query(default=50, le=200)): + f = SignalFilter(source=source, entity_type=entity_type, entity_id=entity_id, + min_importance=min_importance, sentiment=sentiment, limit=limit) + events = await get_signal_intelligence().get_signals(tenant_id, f) + return {"count": len(events), "signals": [e.model_dump() for e in events]} + +@router.get("/signals/{entity_type}/{entity_id}", summary="Entity signal summary") +async def entity_signal_summary(entity_type: str, entity_id: str, tenant_id: str, + hours: int = Query(default=24, le=720)): + return await get_signal_intelligence().get_entity_summary(entity_type, entity_id, tenant_id, hours) -@router.get( - "/signals/{entity_type}/{entity_id}", - summary="Signal summary for a specific entity", -) -async def entity_signal_summary( - entity_type: str, - entity_id: str, - tenant_id: str, - hours: int = Query(default=24, le=720), -): - """Summarise all signals for a given entity in the last N hours.""" - engine = get_signal_intelligence() - summary = await engine.get_entity_summary(entity_type, entity_id, tenant_id, hours) - return summary - - -# --------------------------------------------------------------------------- -# Watchlists -# --------------------------------------------------------------------------- - +# ── Watchlists ────────────────────────────────────────────────────────── @router.post("/watchlists", summary="Create a signal watchlist") async def create_watchlist(body: WatchlistCreate): - """Create a watchlist that triggers alerts when matching signals arrive.""" - engine = get_signal_intelligence() - wl = Watchlist( - tenant_id=body.tenant_id, - name=body.name, - name_ar=body.name_ar, - entity_type=body.entity_type, - entity_ids=body.entity_ids, - keywords=body.keywords, - alert_threshold=body.alert_threshold, - channels=body.channels, - ) - created = await engine.create_watchlist(wl) - return created.model_dump() - + wl = Watchlist(tenant_id=body.tenant_id, name=body.name, name_ar=body.name_ar, + entity_type=body.entity_type, entity_ids=body.entity_ids, + keywords=body.keywords, alert_threshold=body.alert_threshold, channels=body.channels) + return (await get_signal_intelligence().create_watchlist(wl)).model_dump() @router.get("/watchlists", summary="List active watchlists") async def list_watchlists(tenant_id: str): - engine = get_signal_intelligence() - items = await engine.get_watchlists(tenant_id) + items = await get_signal_intelligence().get_watchlists(tenant_id) return {"count": len(items), "watchlists": [w.model_dump() for w in items]} -# --------------------------------------------------------------------------- -# Alerts -# --------------------------------------------------------------------------- - +# ── Alerts ────────────────────────────────────────────────────────────── @router.get("/alerts", summary="Pending alerts for a user") -async def pending_alerts( - tenant_id: str, - user_id: Optional[str] = None, -): - """Return all unacknowledged alerts, optionally filtered by user.""" - delivery = get_alert_delivery() - alerts = await delivery.get_pending(tenant_id, user_id) - return { - "count": len(alerts), - "alerts": [a.model_dump() for a in alerts], - } - +async def pending_alerts(tenant_id: str, user_id: Optional[str] = None): + alerts = await get_alert_delivery().get_pending(tenant_id, user_id) + return {"count": len(alerts), "alerts": [a.model_dump() for a in alerts]} @router.post("/alerts/{alert_id}/acknowledge", summary="Acknowledge an alert") -async def acknowledge_alert(alert_id: str, body: AcknowledgeRequest): - delivery = get_alert_delivery() - ok = await delivery.acknowledge(alert_id, body.user_id) - if not ok: +async def acknowledge_alert(alert_id: str, body: AckRequest): + if not await get_alert_delivery().acknowledge(alert_id, body.user_id): raise HTTPException(404, "التنبيه غير موجود") return {"acknowledged": True, "alert_id": alert_id} - @router.get("/alerts/stats", summary="Alert delivery statistics") async def alert_stats(tenant_id: str): - delivery = get_alert_delivery() - return await delivery.get_delivery_stats(tenant_id) + return await get_alert_delivery().get_delivery_stats(tenant_id) -# --------------------------------------------------------------------------- -# Digest -# --------------------------------------------------------------------------- - +# ── Digest ────────────────────────────────────────────────────────────── @router.get("/digest", summary="Generate Arabic alert digest") -async def generate_digest( - tenant_id: str, - user_id: Optional[str] = None, - period: str = Query(default="daily", regex="^(daily|weekly)$"), -): - """Compile unacknowledged alerts into an Arabic summary.""" - delivery = get_alert_delivery() - return await delivery.generate_digest(tenant_id, user_id, period) +async def generate_digest(tenant_id: str, user_id: Optional[str] = None, + period: str = Query(default="daily", regex="^(daily|weekly)$")): + return await get_alert_delivery().generate_digest(tenant_id, user_id, period) -# --------------------------------------------------------------------------- -# Behavior Patterns -# --------------------------------------------------------------------------- - +# ── Behavior Patterns ────────────────────────────────────────────────── @router.get("/patterns", summary="Detected behaviour patterns") async def detected_patterns(tenant_id: str): - """Return all detected patterns: rep performance, sequences, risks.""" bi = get_behavior_intelligence() - rep = await bi.analyze_rep_performance(tenant_id) seq = await bi.analyze_winning_sequences(tenant_id) risk = await bi.detect_at_risk_patterns(tenant_id) timing = await bi.analyze_best_contact_times(tenant_id) - - all_patterns = [p.model_dump() for p in rep + seq + risk] - - return { - "count": len(all_patterns), - "patterns": all_patterns, - "best_contact_times": timing, - } - + all_p = [p.model_dump() for p in rep + seq + risk] + return {"count": len(all_p), "patterns": all_p, "best_contact_times": timing} @router.get("/recommendations", summary="AI recommendations in Arabic") async def recommendations(tenant_id: str): - """Generate actionable Arabic recommendations based on detected patterns.""" - bi = get_behavior_intelligence() - recs = await bi.get_recommendations(tenant_id) - return { - "count": len(recs), - "recommendations": recs, - } + recs = await get_behavior_intelligence().get_recommendations(tenant_id) + return {"count": len(recs), "recommendations": recs} -# --------------------------------------------------------------------------- -# Escalations -# --------------------------------------------------------------------------- - +# ── Escalations ──────────────────────────────────────────────────────── @router.get("/escalations", summary="Pending escalations") async def pending_escalations(tenant_id: str): - """List all open escalations for a tenant.""" items = [e for e in _escalations.get(tenant_id, []) if e.status == "open"] - return { - "count": len(items), - "escalations": [e.model_dump() for e in items], - } - + return {"count": len(items), "escalations": [e.model_dump() for e in items]} @router.post("/escalations/{escalation_id}/resolve", summary="Resolve an escalation") -async def resolve_escalation(escalation_id: str, body: EscalationResolveRequest): - """Mark an escalation as resolved with a resolution note.""" - for esc_list in _escalations.values(): - for esc in esc_list: +async def resolve_escalation(escalation_id: str, body: EscalationResolve): + for lst in _escalations.values(): + for esc in lst: if esc.id == escalation_id: if esc.status == "resolved": return {"already_resolved": True, "id": escalation_id} @@ -284,64 +166,26 @@ async def resolve_escalation(escalation_id: str, body: EscalationResolveRequest) esc.resolved_by = body.user_id esc.resolved_at = datetime.now(timezone.utc) esc.resolution = body.resolution or body.resolution_ar - logger.info( - "Escalation %s resolved by %s", - escalation_id[:8], body.user_id[:8], - ) return {"resolved": True, "id": escalation_id} - raise HTTPException(404, "التصعيد غير موجود") -# --------------------------------------------------------------------------- -# Helper: create escalation (called internally by signal/alert pipeline) -# --------------------------------------------------------------------------- - - -async def create_escalation( - tenant_id: str, - title: str, - title_ar: str, - reason: str, - reason_ar: str, - entity_type: str = "", - entity_id: str = "", - assigned_to: Optional[str] = None, -) -> dict: - """Programmatic escalation creation (not exposed as public endpoint).""" - esc = _Escalation( - tenant_id=tenant_id, - title=title, - title_ar=title_ar, - reason=reason, - reason_ar=reason_ar, - entity_type=entity_type, - entity_id=entity_id, - assigned_to=assigned_to, - ) +async def create_escalation(tenant_id: str, title: str, title_ar: str, + reason: str, reason_ar: str, entity_type: str = "", + entity_id: str = "", assigned_to: Optional[str] = None) -> dict: + """Internal helper — creates an escalation and fires a HIGH alert.""" + esc = _Escalation(tenant_id=tenant_id, title=title, title_ar=title_ar, + reason=reason, reason_ar=reason_ar, entity_type=entity_type, + entity_id=entity_id, assigned_to=assigned_to) _escalations.setdefault(tenant_id, []).insert(0, esc) - - # Fire an alert via the delivery service - delivery = get_alert_delivery() - await delivery.send_from_template( - template_key="escalation", - tenant_id=tenant_id, - urgency=AlertUrgency.HIGH, - category="compliance", - user_id=assigned_to, - requires_ack=True, - title=title_ar, - reason=reason_ar, - ) - + await get_alert_delivery().send_from_template( + "escalation", tenant_id, AlertUrgency.HIGH, category="compliance", + user_id=assigned_to, requires_ack=True, title=title_ar, reason=reason_ar) logger.info("Escalation created: %s for tenant %s", esc.id[:8], tenant_id[:8]) return esc.model_dump() -# --------------------------------------------------------------------------- -# Legacy endpoints preserved for backward compatibility -# --------------------------------------------------------------------------- - +# ── Legacy endpoints (backward compat) ───────────────────────────────── class LeadInput(BaseModel): id: str = "lead_001" @@ -352,7 +196,6 @@ class LeadInput(BaseModel): company_website: Optional[str] = None source: str = "whatsapp" - class MeetingReport(BaseModel): lead_id: str contact_name: str @@ -361,99 +204,63 @@ class MeetingReport(BaseModel): meeting_notes: str outcome: str = "follow_up_needed" - def _groq_key(): import os key = os.getenv("GROQ_API_KEY", "") - if not key: - raise HTTPException(500, "GROQ_API_KEY missing") + if not key: raise HTTPException(500, "GROQ_API_KEY missing") return key - @router.post("/run-pipeline") async def run_lead_pipeline(lead_input: LeadInput): from app.services.lead_pipeline import DealixLeadPipeline, Lead, Company - pipeline = DealixLeadPipeline(_groq_key()) - lead = Lead( - id=lead_input.id, - contact_name=lead_input.contact_name, - contact_phone=lead_input.contact_phone, - contact_title=lead_input.contact_title, - company=Company(name=lead_input.company_name, website=lead_input.company_website), - source=lead_input.source, - ) - return await pipeline.run_full_pipeline(lead) - + p = DealixLeadPipeline(_groq_key()) + lead = Lead(id=lead_input.id, contact_name=lead_input.contact_name, + contact_phone=lead_input.contact_phone, contact_title=lead_input.contact_title, + company=Company(name=lead_input.company_name, website=lead_input.company_website), + source=lead_input.source) + return await p.run_full_pipeline(lead) @router.post("/executive-report") -async def generate_executive_report(report_data: MeetingReport): +async def generate_executive_report(r: MeetingReport): from app.services.lead_pipeline import DealixLeadPipeline, Lead, Company - pipeline = DealixLeadPipeline(_groq_key()) - lead = Lead( - id=report_data.lead_id, - contact_name=report_data.contact_name, - contact_phone=report_data.contact_phone, - company=Company(name=report_data.company_name), - ) - return await pipeline.generate_executive_report( - lead, report_data.meeting_notes, report_data.outcome, - ) - + p = DealixLeadPipeline(_groq_key()) + lead = Lead(id=r.lead_id, contact_name=r.contact_name, contact_phone=r.contact_phone, + company=Company(name=r.company_name)) + return await p.generate_executive_report(lead, r.meeting_notes, r.outcome) @router.get("/system-report") async def get_system_intelligence_report(): from app.services.autonomous_core import get_autonomous_core - core = get_autonomous_core(_groq_key()) - return await core.get_full_intelligence_report() - + return await get_autonomous_core(_groq_key()).get_full_intelligence_report() @router.post("/improve") async def trigger_self_improvement(background_tasks: BackgroundTasks): from app.services.autonomous_core import get_autonomous_core core = get_autonomous_core(_groq_key()) - - async def run_improvement(): - await core.improver.analyze_and_improve({"triggered": "manual"}) - - background_tasks.add_task(run_improvement) + background_tasks.add_task(core.improver.analyze_and_improve, {"triggered": "manual"}) return {"status": "improvement_cycle_started", "message": "النظام يحلل نفسه ويتحسن..."} - @router.get("/financial-forecast") async def get_financial_forecast(): from app.services.autonomous_core import get_autonomous_core - core = get_autonomous_core(_groq_key()) - return await core.financial.generate_financial_forecast({ - "timestamp": "now", "pipeline": "active", - }) - + return await get_autonomous_core(_groq_key()).financial.generate_financial_forecast( + {"timestamp": "now", "pipeline": "active"}) @router.get("/market-expansion") async def get_expansion_opportunities(): from app.services.autonomous_core import get_autonomous_core - core = get_autonomous_core(_groq_key()) - return await core.strategic.analyze_market_opportunity({ - "market": "Saudi Arabia", - "current_sectors": ["عقارات", "تقنية", "صحة"], - }) - + return await get_autonomous_core(_groq_key()).strategic.analyze_market_opportunity( + {"market": "Saudi Arabia", "current_sectors": ["عقارات", "تقنية", "صحة"]}) @router.get("/growth-plan") async def get_90_day_growth_plan(): from app.services.autonomous_core import get_autonomous_core - core = get_autonomous_core(_groq_key()) - return await core.strategic.generate_growth_plan({ - "current_stage": "early_growth", "market": "KSA", - }) - + return await get_autonomous_core(_groq_key()).strategic.generate_growth_plan( + {"current_stage": "early_growth", "market": "KSA"}) @router.get("/health") async def system_health(): from app.services.autonomous_core import get_autonomous_core - core = get_autonomous_core(_groq_key()) - return { - "health": core.healer.get_system_health(), - "autonomous_cycle": core._cycle_count, - "improvements_applied": len(core.improver.improvements_log), - "status": "AUTONOMOUS_RUNNING", - } + c = get_autonomous_core(_groq_key()) + return {"health": c.healer.get_system_health(), "autonomous_cycle": c._cycle_count, + "improvements_applied": len(c.improver.improvements_log), "status": "AUTONOMOUS_RUNNING"} diff --git a/salesflow-saas/backend/app/services/alert_delivery.py b/salesflow-saas/backend/app/services/alert_delivery.py index 8ade7d27..93741e97 100644 --- a/salesflow-saas/backend/app/services/alert_delivery.py +++ b/salesflow-saas/backend/app/services/alert_delivery.py @@ -1,31 +1,24 @@ """ -Alert Delivery Service — Multi-channel alert routing with urgency-based -channel selection, acknowledgement tracking, and Arabic digest generation. +Alert Delivery — Multi-channel routing with urgency-based channel selection, +acknowledgement tracking, and Arabic digest generation for Dealix CRM. -Channel routing matrix: +Channel matrix: CRITICAL : dashboard + whatsapp + email + sms HIGH : dashboard + whatsapp MEDIUM : dashboard + email - LOW : dashboard (collected for daily digest) + LOW : dashboard (daily digest) """ - from __future__ import annotations -import logging -import uuid +import logging, uuid from collections import defaultdict from datetime import datetime, timedelta, timezone from enum import Enum from typing import Any, Dict, List, Optional - from pydantic import BaseModel, Field logger = logging.getLogger("dealix.services.alert_delivery") -# --------------------------------------------------------------------------- -# Enums & Models -# --------------------------------------------------------------------------- - class AlertUrgency(str, Enum): CRITICAL = "critical" @@ -46,12 +39,12 @@ class Alert(BaseModel): id: str = Field(default_factory=lambda: uuid.uuid4().hex) tenant_id: str user_id: Optional[str] = None - title: str - title_ar: str - body: str - body_ar: str + title: str = "" + title_ar: str = "" + body: str = "" + body_ar: str = "" urgency: AlertUrgency = AlertUrgency.MEDIUM - category: str = "system" # lead, deal, system, compliance, security + category: str = "system" channels: List[AlertChannel] = [AlertChannel.DASHBOARD] action_url: Optional[str] = None action_label: Optional[str] = None @@ -62,360 +55,157 @@ class Alert(BaseModel): metadata: Dict[str, Any] = {} -# --------------------------------------------------------------------------- -# Arabic alert templates -# --------------------------------------------------------------------------- - -ALERT_TEMPLATES: Dict[str, Dict[str, str]] = { - "new_lead": { - "title_ar": "عميل محتمل جديد", - "body_ar": "عميل محتمل جديد: {name} من {source}", - }, - "deal_won": { - "title_ar": "صفقة ناجحة", - "body_ar": "تم إغلاق صفقة: {title} بقيمة {value} ر.س", - }, - "deal_at_risk": { - "title_ar": "صفقة معرضة للخطر", - "body_ar": "صفقة معرضة للخطر: {title} - لا نشاط منذ {days} أيام", - }, - "consent_expiring": { - "title_ar": "موافقة PDPL تنتهي قريبا", - "body_ar": "موافقة PDPL تنتهي خلال {days} أيام للعميل {name}", - }, - "escalation": { - "title_ar": "تصعيد يتطلب انتباهك", - "body_ar": "يحتاج تدخلك: {title} - {reason}", - }, - "sequence_complete": { - "title_ar": "تسلسل مكتمل", - "body_ar": "اكتمل تسلسل {name} للعميل {lead_name}", - }, - "meeting_booked": { - "title_ar": "موعد جديد", - "body_ar": "تم حجز موعد مع {name} في {time}", - }, - "competitor_alert": { - "title_ar": "تنبيه منافس", - "body_ar": "تغيير من المنافس {competitor}: {detail}", - }, +TEMPLATES: Dict[str, Dict[str, str]] = { + "new_lead": {"title_ar": "عميل محتمل جديد", + "body_ar": "عميل محتمل جديد: {name} من {source}"}, + "deal_won": {"title_ar": "صفقة ناجحة", + "body_ar": "تم إغلاق صفقة: {title} بقيمة {value} ر.س"}, + "deal_at_risk": {"title_ar": "صفقة معرضة للخطر", + "body_ar": "صفقة معرضة للخطر: {title} - لا نشاط منذ {days} أيام"}, + "consent_expiring": {"title_ar": "موافقة PDPL تنتهي قريبا", + "body_ar": "موافقة PDPL تنتهي خلال {days} أيام للعميل {name}"}, + "escalation": {"title_ar": "تصعيد يتطلب انتباهك", + "body_ar": "يحتاج تدخلك: {title} - {reason}"}, + "sequence_complete": {"title_ar": "تسلسل مكتمل", + "body_ar": "اكتمل تسلسل {name} للعميل {lead_name}"}, + "meeting_booked": {"title_ar": "موعد جديد", + "body_ar": "تم حجز موعد مع {name} في {time}"}, + "competitor_alert": {"title_ar": "تنبيه منافس", + "body_ar": "تغيير من المنافس {competitor}: {detail}"}, } -# Channel routing per urgency _CHANNEL_MATRIX: Dict[AlertUrgency, List[AlertChannel]] = { - AlertUrgency.CRITICAL: [ - AlertChannel.DASHBOARD, AlertChannel.WHATSAPP, - AlertChannel.EMAIL, AlertChannel.SMS, - ], + AlertUrgency.CRITICAL: [AlertChannel.DASHBOARD, AlertChannel.WHATSAPP, AlertChannel.EMAIL, AlertChannel.SMS], AlertUrgency.HIGH: [AlertChannel.DASHBOARD, AlertChannel.WHATSAPP], AlertUrgency.MEDIUM: [AlertChannel.DASHBOARD, AlertChannel.EMAIL], AlertUrgency.LOW: [AlertChannel.DASHBOARD], } +_CAT_AR = {"lead": "العملاء المحتملون", "deal": "الصفقات", "system": "النظام", + "compliance": "الامتثال", "security": "الأمان"} -# --------------------------------------------------------------------------- -# Channel dispatchers (thin wrappers — production would call real adapters) -# --------------------------------------------------------------------------- -async def _dispatch_dashboard(alert: Alert) -> bool: - logger.info( - "[DASHBOARD] tenant=%s user=%s title=%s", - alert.tenant_id[:8], (alert.user_id or "broadcast")[:8], alert.title_ar, - ) +async def _dispatch(alert: Alert, channel: AlertChannel) -> bool: + logger.info("[%s] tenant=%s user=%s title=%s", channel.value.upper(), + alert.tenant_id[:8], (alert.user_id or "broadcast")[:8], alert.title_ar[:40]) return True -async def _dispatch_email(alert: Alert) -> bool: - logger.info( - "[EMAIL] tenant=%s user=%s subject=%s", - alert.tenant_id[:8], (alert.user_id or "broadcast")[:8], alert.title_ar, - ) - return True - - -async def _dispatch_whatsapp(alert: Alert) -> bool: - logger.info( - "[WHATSAPP] tenant=%s user=%s body=%s", - alert.tenant_id[:8], (alert.user_id or "broadcast")[:8], alert.body_ar[:60], - ) - return True - - -async def _dispatch_sms(alert: Alert) -> bool: - logger.info( - "[SMS] tenant=%s user=%s body=%s", - alert.tenant_id[:8], (alert.user_id or "broadcast")[:8], alert.body_ar[:60], - ) - return True - - -async def _dispatch_telegram(alert: Alert) -> bool: - logger.info( - "[TELEGRAM] tenant=%s user=%s body=%s", - alert.tenant_id[:8], (alert.user_id or "broadcast")[:8], alert.body_ar[:60], - ) - return True - - -_DISPATCHERS = { - AlertChannel.DASHBOARD: _dispatch_dashboard, - AlertChannel.EMAIL: _dispatch_email, - AlertChannel.WHATSAPP: _dispatch_whatsapp, - AlertChannel.SMS: _dispatch_sms, - AlertChannel.TELEGRAM: _dispatch_telegram, -} - - -# --------------------------------------------------------------------------- -# Core Service -# --------------------------------------------------------------------------- - - class AlertDelivery: - """ - Multi-channel alert delivery with urgency-based routing, acknowledgement - tracking, digest generation and delivery statistics. - """ + """Multi-channel alert delivery with urgency routing and digest generation.""" def __init__(self) -> None: - # tenant_id -> list[Alert] (most recent first) self._alerts: Dict[str, List[Alert]] = defaultdict(list) - # delivery stats counters self._stats: Dict[str, Dict[str, int]] = defaultdict(lambda: defaultdict(int)) - # ── Send ────────────────────────────────────────────────── - async def send(self, alert: Alert) -> Dict[str, Any]: - """Route alert to channels based on urgency, deliver, and persist.""" - # Determine channels from urgency matrix, merged with explicit overrides - urgency_channels = _CHANNEL_MATRIX.get(alert.urgency, [AlertChannel.DASHBOARD]) - target_channels = list(set(urgency_channels) | set(alert.channels)) - - delivered: List[str] = [] - failed: List[str] = [] - - for ch in target_channels: + targets = list(set(_CHANNEL_MATRIX.get(alert.urgency, [AlertChannel.DASHBOARD]) + alert.channels)) + delivered, failed = [], [] + for ch in targets: ok = await self.send_to_channel(alert, ch) + (delivered if ok else failed).append(ch.value) if ok: - delivered.append(ch.value) self._stats[alert.tenant_id][ch.value] += 1 - else: - failed.append(ch.value) - alert.delivered_channels = delivered - self._alerts[alert.tenant_id].insert(0, alert) - - # Cap buffer - if len(self._alerts[alert.tenant_id]) > 10_000: - self._alerts[alert.tenant_id] = self._alerts[alert.tenant_id][:10_000] - + buf = self._alerts[alert.tenant_id] + buf.insert(0, alert) + if len(buf) > 10_000: + self._alerts[alert.tenant_id] = buf[:10_000] self._stats[alert.tenant_id]["total"] += 1 - - logger.info( - "Alert %s [%s] delivered via %s for tenant %s", - alert.id[:8], alert.urgency.value, - ", ".join(delivered) or "none", alert.tenant_id[:8], - ) - - return { - "alert_id": alert.id, - "urgency": alert.urgency.value, - "delivered": delivered, - "failed": failed, - } + logger.info("Alert %s [%s] delivered via %s", alert.id[:8], alert.urgency.value, ", ".join(delivered) or "none") + return {"alert_id": alert.id, "urgency": alert.urgency.value, "delivered": delivered, "failed": failed} async def send_to_channel(self, alert: Alert, channel: AlertChannel) -> bool: - """Dispatch to a single channel. Returns success bool.""" - dispatcher = _DISPATCHERS.get(channel) - if not dispatcher: - logger.warning("No dispatcher for channel %s", channel.value) - return False try: - return await dispatcher(alert) + return await _dispatch(alert, channel) except Exception: logger.exception("Channel %s dispatch failed for alert %s", channel.value, alert.id[:8]) return False - # ── Templates ───────────────────────────────────────────── - - async def send_from_template( - self, - template_key: str, - tenant_id: str, - urgency: AlertUrgency, - category: str = "system", - user_id: Optional[str] = None, - action_url: Optional[str] = None, - requires_ack: bool = False, - **kwargs: Any, - ) -> Dict[str, Any]: - """Build and send an alert from a named Arabic template.""" - tpl = ALERT_TEMPLATES.get(template_key) + async def send_from_template(self, template_key: str, tenant_id: str, urgency: AlertUrgency, + category: str = "system", user_id: Optional[str] = None, + action_url: Optional[str] = None, requires_ack: bool = False, + **kwargs: Any) -> Dict[str, Any]: + tpl = TEMPLATES.get(template_key) if not tpl: - logger.error("Unknown alert template: %s", template_key) return {"error": f"Unknown template: {template_key}"} - - title_ar = tpl["title_ar"] body_ar = tpl["body_ar"].format_map(defaultdict(lambda: "—", **kwargs)) - - alert = Alert( - tenant_id=tenant_id, - user_id=user_id, - title=template_key.replace("_", " ").title(), - title_ar=title_ar, - body=body_ar, - body_ar=body_ar, - urgency=urgency, - category=category, - action_url=action_url, - requires_acknowledgement=requires_ack, - metadata=dict(kwargs), - ) + alert = Alert(tenant_id=tenant_id, user_id=user_id, + title=template_key.replace("_", " ").title(), title_ar=tpl["title_ar"], + body=body_ar, body_ar=body_ar, urgency=urgency, category=category, + action_url=action_url, requires_acknowledgement=requires_ack, metadata=dict(kwargs)) return await self.send(alert) - # ── Acknowledgement ─────────────────────────────────────── - async def acknowledge(self, alert_id: str, user_id: str) -> bool: - """Mark an alert as acknowledged by a user.""" for alerts in self._alerts.values(): - for alert in alerts: - if alert.id == alert_id: - if alert.acknowledged_at: - return True # already acked - alert.acknowledged_at = datetime.now(timezone.utc) - logger.info( - "Alert %s acknowledged by user %s", - alert_id[:8], user_id[:8], - ) + for a in alerts: + if a.id == alert_id: + if a.acknowledged_at: + return True + a.acknowledged_at = datetime.now(timezone.utc) + logger.info("Alert %s acknowledged by %s", alert_id[:8], user_id[:8]) return True return False - # ── Digest ──────────────────────────────────────────────── - - async def generate_digest( - self, - tenant_id: str, - user_id: Optional[str] = None, - period: str = "daily", - ) -> Dict[str, Any]: - """Compile unacknowledged alerts into an Arabic summary digest.""" - hours = 24 if period == "daily" else 168 # weekly + async def generate_digest(self, tenant_id: str, user_id: Optional[str] = None, + period: str = "daily") -> Dict[str, Any]: + hours = 24 if period == "daily" else 168 cutoff = datetime.now(timezone.utc) - timedelta(hours=hours) - - pending = [ - a for a in self._alerts.get(tenant_id, []) - if a.acknowledged_at is None - and a.created_at >= cutoff - and (user_id is None or a.user_id is None or a.user_id == user_id) - ] - + pending = [a for a in self._alerts.get(tenant_id, []) + if a.acknowledged_at is None and a.created_at >= cutoff + and (user_id is None or a.user_id is None or a.user_id == user_id)] if not pending: - return { - "tenant_id": tenant_id, - "period": period, - "count": 0, - "digest_ar": "لا توجد تنبيهات جديدة", - "alerts": [], - } + return {"tenant_id": tenant_id, "period": period, "count": 0, + "digest_ar": "لا توجد تنبيهات جديدة", "alerts": []} - # Group by category - by_category: Dict[str, List[Alert]] = defaultdict(list) + by_cat: Dict[str, List[Alert]] = defaultdict(list) for a in pending: - by_category[a.category].append(a) + by_cat[a.category].append(a) - category_labels = { - "lead": "العملاء المحتملون", - "deal": "الصفقات", - "system": "النظام", - "compliance": "الامتثال", - "security": "الأمان", - } - - lines: List[str] = [] - lines.append(f"ملخص التنبيهات — {'يومي' if period == 'daily' else 'أسبوعي'}") - lines.append(f"إجمالي التنبيهات: {len(pending)}") + crit = sum(1 for a in pending if a.urgency == AlertUrgency.CRITICAL) + high = sum(1 for a in pending if a.urgency == AlertUrgency.HIGH) + lines = [f"ملخص التنبيهات — {'يومي' if period == 'daily' else 'أسبوعي'}", + f"إجمالي التنبيهات: {len(pending)}"] + if crit: lines.append(f"تنبيهات حرجة: {crit}") + if high: lines.append(f"تنبيهات عالية الأهمية: {high}") lines.append("") + for cat, items in by_cat.items(): + lines.append(f"— {_CAT_AR.get(cat, cat)} ({len(items)}):") + for a in items[:10]: + tag = " [حرج]" if a.urgency == AlertUrgency.CRITICAL else ( + " [مهم]" if a.urgency == AlertUrgency.HIGH else "") + lines.append(f" - {a.title_ar}{tag}") + if len(items) > 10: + lines.append(f" ... و {len(items) - 10} تنبيهات أخرى") - critical_count = sum(1 for a in pending if a.urgency == AlertUrgency.CRITICAL) - high_count = sum(1 for a in pending if a.urgency == AlertUrgency.HIGH) - if critical_count: - lines.append(f"تنبيهات حرجة: {critical_count}") - if high_count: - lines.append(f"تنبيهات عالية الأهمية: {high_count}") - lines.append("") + return {"tenant_id": tenant_id, "user_id": user_id, "period": period, + "count": len(pending), "critical": crit, "high": high, + "digest_ar": "\n".join(lines), "alerts": [a.model_dump() for a in pending[:50]]} - for cat, cat_alerts in by_category.items(): - label = category_labels.get(cat, cat) - lines.append(f"— {label} ({len(cat_alerts)}):") - for a in cat_alerts[:10]: - urgency_marker = "" - if a.urgency == AlertUrgency.CRITICAL: - urgency_marker = " [حرج]" - elif a.urgency == AlertUrgency.HIGH: - urgency_marker = " [مهم]" - lines.append(f" - {a.title_ar}{urgency_marker}") - if len(cat_alerts) > 10: - lines.append(f" ... و {len(cat_alerts) - 10} تنبيهات أخرى") - - digest_text = "\n".join(lines) - - return { - "tenant_id": tenant_id, - "user_id": user_id, - "period": period, - "count": len(pending), - "critical": critical_count, - "high": high_count, - "digest_ar": digest_text, - "alerts": [a.model_dump() for a in pending[:50]], - } - - # ── Queries ─────────────────────────────────────────────── - - async def get_pending( - self, tenant_id: str, user_id: Optional[str] = None - ) -> List[Alert]: - """Return unacknowledged alerts for a user (or all if user_id is None).""" - return [ - a for a in self._alerts.get(tenant_id, []) - if a.acknowledged_at is None - and (user_id is None or a.user_id is None or a.user_id == user_id) - ] + async def get_pending(self, tenant_id: str, user_id: Optional[str] = None) -> List[Alert]: + return [a for a in self._alerts.get(tenant_id, []) + if a.acknowledged_at is None + and (user_id is None or a.user_id is None or a.user_id == user_id)] async def get_delivery_stats(self, tenant_id: str) -> Dict[str, Any]: - """Return delivery statistics for a tenant.""" stats = dict(self._stats.get(tenant_id, {})) total = stats.get("total", 0) - alerts = self._alerts.get(tenant_id, []) acked = sum(1 for a in alerts if a.acknowledged_at is not None) - pending = sum(1 for a in alerts if a.acknowledged_at is None) - - urgency_counts: Dict[str, int] = defaultdict(int) - category_counts: Dict[str, int] = defaultdict(int) + urg: Dict[str, int] = defaultdict(int) + cat: Dict[str, int] = defaultdict(int) for a in alerts: - urgency_counts[a.urgency.value] += 1 - category_counts[a.category] += 1 + urg[a.urgency.value] += 1 + cat[a.category] += 1 + return {"tenant_id": tenant_id, "total_sent": total, "acknowledged": acked, + "pending": sum(1 for a in alerts if a.acknowledged_at is None), + "ack_rate": round(acked / max(total, 1) * 100, 1), + "by_channel": {k: v for k, v in stats.items() if k != "total"}, + "by_urgency": dict(urg), "by_category": dict(cat)} - return { - "tenant_id": tenant_id, - "total_sent": total, - "acknowledged": acked, - "pending": pending, - "ack_rate": round(acked / max(total, 1) * 100, 1), - "by_channel": {k: v for k, v in stats.items() if k != "total"}, - "by_urgency": dict(urgency_counts), - "by_category": dict(category_counts), - } - - -# --------------------------------------------------------------------------- -# Module-level singleton -# --------------------------------------------------------------------------- _instance: Optional[AlertDelivery] = None - def get_alert_delivery() -> AlertDelivery: global _instance if _instance is None: diff --git a/salesflow-saas/backend/app/services/autopilot.py b/salesflow-saas/backend/app/services/autopilot.py index 9f6256e4..eb43116d 100644 --- a/salesflow-saas/backend/app/services/autopilot.py +++ b/salesflow-saas/backend/app/services/autopilot.py @@ -1,27 +1,14 @@ -""" -Autopilot Layer — Dealix AI Revenue OS -======================================== -نظام الطيار الآلي: تشغيل مهام CRM بشكل مستقل وآمن. -- أوضاع متعددة: محاكاة، توصية، مسودة، موافقة، مستقل -- حدود ميزانية وحماية من التجاوز -- نقاط تفتيش وإمكانية الإيقاف والاستئناف -""" +"""Autopilot Layer — Dealix AI Revenue OS — نظام الطيار الآلي""" from __future__ import annotations -import asyncio -import logging -import uuid +import asyncio, logging, uuid from datetime import datetime, timedelta, timezone from enum import Enum from typing import Any, Callable, Coroutine, Optional - from pydantic import BaseModel, Field logger = logging.getLogger(__name__) - -# ── Enums ─────────────────────────────────────────────────────────── - class AutopilotMode(str, Enum): SIMULATION = "simulation" RECOMMENDATION = "recommendation" @@ -38,13 +25,7 @@ class RunStatus(str, Enum): ABORTED = "aborted" AWAITING_APPROVAL = "awaiting_approval" - -AUTOPILOT_STEPS = [ - "monitor", "detect", "classify", "decide", "propose", "approve", "execute", "verify", "log", -] - - -# ── Models ────────────────────────────────────────────────────────── +STEPS = ["monitor", "detect", "classify", "decide", "propose", "approve", "execute", "verify", "log"] class AutopilotBudget(BaseModel): api_calls: int = 100 @@ -69,7 +50,6 @@ class AutopilotBudget(BaseModel): def exhausted(self) -> bool: return self.api_calls_used >= self.api_calls - class PendingApproval(BaseModel): id: str = Field(default_factory=lambda: str(uuid.uuid4())) action: str @@ -79,14 +59,12 @@ class PendingApproval(BaseModel): approved: Optional[bool] = None approved_by: Optional[str] = None - class SideEffect(BaseModel): action: str target: str detail: str occurred_at: datetime = Field(default_factory=lambda: datetime.now(timezone.utc)) - class AutopilotUnit(BaseModel): run_id: str = Field(default_factory=lambda: str(uuid.uuid4())) agent_id: str = "" @@ -105,20 +83,16 @@ class AutopilotUnit(BaseModel): started_at: datetime = Field(default_factory=lambda: datetime.now(timezone.utc)) completed_at: Optional[datetime] = None - class AutopilotPolicy(BaseModel): max_api_calls: int = 100 max_messages_per_hour: int = 50 max_run_duration_minutes: int = 30 require_approval_for: list[str] = Field(default_factory=lambda: [ - "send_message", "update_deal", "assign_lead", - ]) + "send_message", "update_deal", "assign_lead"]) forbidden_actions: list[str] = Field(default_factory=lambda: [ - "delete_data", "change_permissions", "bulk_send", - ]) + "delete_data", "change_permissions", "bulk_send"]) kill_switch_enabled: bool = True - class AutopilotResult(BaseModel): run_id: str task_type: str @@ -133,218 +107,130 @@ class AutopilotResult(BaseModel): duration_ms: int = 0 summary_ar: str = "" +# ── Task handlers ────────────────────────────────────────────────── -# ── Task Handlers ─────────────────────────────────────────────────── +def _advance(unit: AutopilotUnit, step: str) -> None: + unit.current_step = step + unit.checkpoint["step"] = step -async def _task_follow_up_dormant_leads( - unit: AutopilotUnit, policy: AutopilotPolicy, -) -> None: - unit.current_step = "monitor" - unit.checkpoint["step"] = "monitor" - unit.budget.consume_api_call() - - dormant = [ - {"lead_id": "L001", "name": "أحمد المطيري", "days_inactive": 5}, - {"lead_id": "L002", "name": "فاطمة العتيبي", "days_inactive": 4}, - {"lead_id": "L003", "name": "محمد القحطاني", "days_inactive": 3}, - ] - unit.result_data["dormant_leads"] = dormant - - unit.current_step = "detect" - unit.checkpoint["step"] = "detect" - unit.result_data["detected_count"] = len(dormant) - - unit.current_step = "classify" - unit.checkpoint["step"] = "classify" - for lead in dormant: - lead["urgency"] = "high" if lead["days_inactive"] >= 5 else "medium" - - unit.current_step = "decide" - unit.confidence = 0.78 - drafts = [] - for lead in dormant: - drafts.append({ - "lead_id": lead["lead_id"], - "action": "send_follow_up", - "message_ar": f"مرحباً {lead['name']}، نود متابعة محادثتنا السابقة. هل لديك أي أسئلة؟", - "channel": "whatsapp", - }) - - unit.current_step = "propose" - unit.result_data["proposed_actions"] = drafts - unit.checkpoint["step"] = "propose" - - if unit.mode in (AutopilotMode.SIMULATION, AutopilotMode.RECOMMENDATION): +async def _task_follow_up_dormant_leads(u: AutopilotUnit, p: AutopilotPolicy) -> None: + _advance(u, "monitor") + u.budget.consume_api_call() + dormant = [{"lead_id": "L001", "name": "أحمد المطيري", "days_inactive": 5}, + {"lead_id": "L002", "name": "فاطمة العتيبي", "days_inactive": 4}, + {"lead_id": "L003", "name": "محمد القحطاني", "days_inactive": 3}] + u.result_data["dormant_leads"] = dormant + _advance(u, "detect") + u.result_data["detected_count"] = len(dormant) + _advance(u, "classify") + for ld in dormant: + ld["urgency"] = "high" if ld["days_inactive"] >= 5 else "medium" + _advance(u, "decide") + u.confidence = 0.78 + drafts = [{"lead_id": ld["lead_id"], "action": "send_follow_up", "channel": "whatsapp", + "message_ar": f"مرحباً {ld['name']}، نود متابعة محادثتنا السابقة. هل لديك أي أسئلة؟"} + for ld in dormant] + _advance(u, "propose") + u.result_data["proposed_actions"] = drafts + if u.mode in (AutopilotMode.SIMULATION, AutopilotMode.RECOMMENDATION): return - - if unit.mode == AutopilotMode.DRAFT: - unit.result_data["drafts_created"] = len(drafts) + if u.mode == AutopilotMode.DRAFT: + u.result_data["drafts_created"] = len(drafts) return - - if unit.mode == AutopilotMode.APPROVAL_GATED: - for draft in drafts: - if "send_message" in policy.require_approval_for: - unit.pending_approvals.append(PendingApproval( - action="send_follow_up", - description_ar=f"إرسال متابعة لـ {draft['lead_id']}", - params=draft, - )) - unit.status = RunStatus.AWAITING_APPROVAL + if u.mode == AutopilotMode.APPROVAL_GATED: + for d in drafts: + if "send_message" in p.require_approval_for: + u.pending_approvals.append(PendingApproval( + action="send_follow_up", description_ar=f"إرسال متابعة لـ {d['lead_id']}", params=d)) + u.status = RunStatus.AWAITING_APPROVAL return - - unit.current_step = "execute" - for draft in drafts: - if not unit.budget.consume_message(): - unit.error = "تم تجاوز حد الرسائل المسموح" + _advance(u, "execute") + for d in drafts: + if not u.budget.consume_message(): + u.error = "تم تجاوز حد الرسائل" break - unit.side_effects.append(SideEffect( - action="send_whatsapp", target=draft["lead_id"], - detail=draft["message_ar"][:100], - )) - unit.result_data["messages_sent"] = len(unit.side_effects) + u.side_effects.append(SideEffect(action="send_whatsapp", target=d["lead_id"], detail=d["message_ar"][:80])) + _advance(u, "verify") - unit.current_step = "verify" - unit.checkpoint["step"] = "verify" - - -async def _task_qualify_new_leads( - unit: AutopilotUnit, policy: AutopilotPolicy, -) -> None: - unit.current_step = "monitor" - unit.budget.consume_api_call() - - new_leads = [ - {"lead_id": "L010", "name": "سارة الحربي", "source": "website"}, - {"lead_id": "L011", "name": "خالد الشمري", "source": "whatsapp"}, - ] - unit.result_data["new_leads"] = new_leads - - unit.current_step = "detect" - unit.result_data["detected_count"] = len(new_leads) - - unit.current_step = "classify" +async def _task_qualify_new_leads(u: AutopilotUnit, p: AutopilotPolicy) -> None: + _advance(u, "monitor") + u.budget.consume_api_call() + leads = [{"lead_id": "L010", "name": "سارة الحربي", "source": "website"}, + {"lead_id": "L011", "name": "خالد الشمري", "source": "whatsapp"}] + u.result_data["new_leads"] = leads + _advance(u, "detect") + _advance(u, "classify") scored = [] - for lead in new_leads: - unit.budget.consume_api_call() - scored.append({**lead, "score": 65, "qualified": True, "tier": "B"}) - unit.result_data["scored_leads"] = scored - - unit.current_step = "decide" - unit.confidence = 0.82 - - unit.current_step = "propose" - unit.result_data["proposed_actions"] = [ - {"lead_id": s["lead_id"], "action": "update_qualification", "score": s["score"]} - for s in scored - ] - unit.checkpoint["step"] = "propose" - - if unit.mode in (AutopilotMode.SIMULATION, AutopilotMode.RECOMMENDATION, AutopilotMode.DRAFT): + for ld in leads: + u.budget.consume_api_call() + scored.append({**ld, "score": 65, "qualified": True, "tier": "B"}) + u.result_data["scored_leads"] = scored + _advance(u, "decide") + u.confidence = 0.82 + _advance(u, "propose") + u.result_data["proposed_actions"] = [{"lead_id": s["lead_id"], "action": "update_qualification", + "score": s["score"]} for s in scored] + if u.mode in (AutopilotMode.SIMULATION, AutopilotMode.RECOMMENDATION, AutopilotMode.DRAFT): return - - if unit.mode == AutopilotMode.APPROVAL_GATED: + if u.mode == AutopilotMode.APPROVAL_GATED: for s in scored: - unit.pending_approvals.append(PendingApproval( - action="update_qualification", - description_ar=f"تحديث تأهيل {s['name']} — درجة {s['score']}", - params={"lead_id": s["lead_id"], "score": s["score"]}, - )) - unit.status = RunStatus.AWAITING_APPROVAL + u.pending_approvals.append(PendingApproval( + action="update_qualification", description_ar=f"تأهيل {s['name']} — درجة {s['score']}", + params={"lead_id": s["lead_id"], "score": s["score"]})) + u.status = RunStatus.AWAITING_APPROVAL return - - unit.current_step = "execute" + _advance(u, "execute") for s in scored: - unit.side_effects.append(SideEffect( - action="qualify_lead", target=s["lead_id"], - detail=f"تأهيل: {s['score']} — فئة {s['tier']}", - )) + u.side_effects.append(SideEffect(action="qualify_lead", target=s["lead_id"], + detail=f"تأهيل: {s['score']} — فئة {s['tier']}")) + _advance(u, "verify") - unit.current_step = "verify" +async def _task_pipeline_health_check(u: AutopilotUnit, p: AutopilotPolicy) -> None: + _advance(u, "monitor") + u.budget.consume_api_call() + _advance(u, "detect") + at_risk = [{"deal_id": "D100", "title": "مشروع تقنية المعلومات", "value": 250_000, "risk": "stalled"}, + {"deal_id": "D101", "title": "عقد صيانة سنوي", "value": 80_000, "risk": "competitor"}] + u.result_data["at_risk_deals"] = at_risk + _advance(u, "classify") + for d in at_risk: + d["urgency"] = "critical" if d["value"] > 100_000 else "high" + _advance(u, "decide") + u.confidence = 0.75 + u.result_data["recommendations"] = [{"deal_id": d["deal_id"], + "action_ar": "جدولة اجتماع عاجل مع العميل"} for d in at_risk] + _advance(u, "propose") - -async def _task_pipeline_health_check( - unit: AutopilotUnit, policy: AutopilotPolicy, -) -> None: - unit.current_step = "monitor" - unit.budget.consume_api_call() - - unit.current_step = "detect" - at_risk = [ - {"deal_id": "D100", "title": "مشروع تقنية المعلومات", "value": 250_000, "risk": "stalled"}, - {"deal_id": "D101", "title": "عقد صيانة سنوي", "value": 80_000, "risk": "competitor"}, - ] - unit.result_data["at_risk_deals"] = at_risk - - unit.current_step = "classify" - for deal in at_risk: - deal["urgency"] = "critical" if deal["value"] > 100_000 else "high" - - unit.current_step = "decide" - unit.confidence = 0.75 - unit.result_data["recommendations"] = [ - {"deal_id": d["deal_id"], "action_ar": "جدولة اجتماع عاجل مع العميل"} for d in at_risk - ] - - unit.current_step = "propose" - unit.checkpoint["step"] = "propose" - - -async def _task_daily_report( - unit: AutopilotUnit, policy: AutopilotPolicy, -) -> None: - unit.current_step = "monitor" - unit.budget.consume_api_call() - - unit.current_step = "detect" - unit.result_data["report"] = { +async def _task_daily_report(u: AutopilotUnit, p: AutopilotPolicy) -> None: + _advance(u, "monitor") + u.budget.consume_api_call() + _advance(u, "detect") + u.result_data["report"] = { "date": datetime.now(timezone.utc).strftime("%Y-%m-%d"), - "new_leads": 12, "qualified": 5, "deals_won": 2, - "revenue_today": 180_000, "currency": "SAR", - "top_performer": "أحمد المطيري", - "at_risk_count": 3, - "summary_ar": "يوم إيجابي: صفقتان مغلقتان بقيمة 180 ألف ريال. 3 صفقات تحتاج متابعة.", - } + "new_leads": 12, "qualified": 5, "deals_won": 2, "revenue_today": 180_000, "currency": "SAR", + "top_performer": "أحمد المطيري", "at_risk_count": 3, + "summary_ar": "يوم إيجابي: صفقتان مغلقتان بقيمة 180 ألف ريال. 3 صفقات تحتاج متابعة."} + _advance(u, "classify") + u.confidence = 0.95 + _advance(u, "propose") - unit.current_step = "classify" - unit.confidence = 0.95 +async def _task_sequence_optimizer(u: AutopilotUnit, p: AutopilotPolicy) -> None: + _advance(u, "monitor") + u.budget.consume_api_call() + _advance(u, "detect") + seqs = [{"id": "SEQ01", "name": "ترحيب عملاء جدد", "open_rate": 0.45, "reply_rate": 0.12}, + {"id": "SEQ02", "name": "متابعة بعد العرض", "open_rate": 0.62, "reply_rate": 0.25}] + u.result_data["sequences"] = seqs + _advance(u, "classify") + _advance(u, "decide") + u.confidence = 0.70 + u.result_data["suggestions"] = [ + {"sequence_id": s["id"], "proposed_change": "shorten_message", + "suggestion_ar": f"تحسين '{s['name']}' — معدل الرد منخفض ({s['reply_rate']:.0%})"} + for s in seqs if s["reply_rate"] < 0.15] + _advance(u, "propose") - unit.current_step = "propose" - unit.checkpoint["step"] = "propose" - - -async def _task_sequence_optimizer( - unit: AutopilotUnit, policy: AutopilotPolicy, -) -> None: - unit.current_step = "monitor" - unit.budget.consume_api_call() - - unit.current_step = "detect" - sequences = [ - {"id": "SEQ01", "name": "ترحيب عملاء جدد", "open_rate": 0.45, "reply_rate": 0.12}, - {"id": "SEQ02", "name": "متابعة بعد العرض", "open_rate": 0.62, "reply_rate": 0.25}, - ] - unit.result_data["sequences"] = sequences - - unit.current_step = "classify" - unit.current_step = "decide" - unit.confidence = 0.70 - suggestions = [] - for seq in sequences: - if seq["reply_rate"] < 0.15: - suggestions.append({ - "sequence_id": seq["id"], - "suggestion_ar": f"تحسين محتوى '{seq['name']}' — معدل الرد منخفض ({seq['reply_rate']:.0%})", - "proposed_change": "shorten_message", - }) - unit.result_data["suggestions"] = suggestions - - unit.current_step = "propose" - unit.checkpoint["step"] = "propose" - - -# ── Task Registry ────────────────────────────────────────────────── - -_TASK_HANDLERS: dict[str, Callable[[AutopilotUnit, AutopilotPolicy], Coroutine[Any, Any, None]]] = { +_TASK_HANDLERS: dict[str, Callable] = { "follow_up_dormant_leads": _task_follow_up_dormant_leads, "qualify_new_leads": _task_qualify_new_leads, "pipeline_health_check": _task_pipeline_health_check, @@ -352,194 +238,137 @@ _TASK_HANDLERS: dict[str, Callable[[AutopilotUnit, AutopilotPolicy], Coroutine[A "sequence_optimizer": _task_sequence_optimizer, } +_TASK_META: dict[str, dict[str, str]] = { + "follow_up_dormant_leads": {"name_ar": "متابعة العملاء الخاملين", + "desc_ar": "البحث عن عملاء بدون نشاط 3+ أيام وصياغة رسائل متابعة"}, + "qualify_new_leads": {"name_ar": "تأهيل العملاء الجدد", + "desc_ar": "تقييم وتأهيل العملاء المحتملين الجدد تلقائياً"}, + "pipeline_health_check": {"name_ar": "فحص صحة خط الأنابيب", + "desc_ar": "تحليل خط الأنابيب والكشف عن صفقات معرضة للخطر"}, + "daily_report": {"name_ar": "التقرير اليومي", "desc_ar": "إنشاء ملخص يومي لأداء المبيعات"}, + "sequence_optimizer": {"name_ar": "تحسين التسلسلات", "desc_ar": "تحليل أداء التسلسلات واقتراح تحسينات"}, +} -# ── Autopilot Runner ─────────────────────────────────────────────── +# ── Runner ───────────────────────────────────────────────────────── class AutopilotRunner: """Runs autopilot tasks safely with budgets, policies, and checkpointing.""" def __init__(self, policy: Optional[AutopilotPolicy] = None) -> None: self._policy = policy or AutopilotPolicy() - self._active_runs: dict[str, AutopilotUnit] = {} + self._active: dict[str, AutopilotUnit] = {} - async def run( - self, - task_type: str, - mode: AutopilotMode, - params: dict[str, Any], - budget: Optional[AutopilotBudget] = None, - tenant_id: str = "", - agent_id: str = "", - ) -> AutopilotResult: + async def run(self, task_type: str, mode: AutopilotMode, params: dict[str, Any], + budget: Optional[AutopilotBudget] = None, tenant_id: str = "", + agent_id: str = "") -> AutopilotResult: handler = _TASK_HANDLERS.get(task_type) if not handler: - return AutopilotResult( - run_id=str(uuid.uuid4()), task_type=task_type, mode=mode, - status=RunStatus.FAILED, - summary_ar=f"مهمة غير معروفة: {task_type}", - ) - + return AutopilotResult(run_id=str(uuid.uuid4()), task_type=task_type, mode=mode, + status=RunStatus.FAILED, summary_ar=f"مهمة غير معروفة: {task_type}") unit = AutopilotUnit( - agent_id=agent_id, tenant_id=tenant_id, task_type=task_type, - mode=mode, - budget=budget or AutopilotBudget( - api_calls=self._policy.max_api_calls, - messages=self._policy.max_messages_per_hour, - max_duration_minutes=self._policy.max_run_duration_minutes, - ), - ) - self._active_runs[unit.run_id] = unit + agent_id=agent_id, tenant_id=tenant_id, task_type=task_type, mode=mode, + budget=budget or AutopilotBudget(api_calls=self._policy.max_api_calls, + messages=self._policy.max_messages_per_hour, + max_duration_minutes=self._policy.max_run_duration_minutes)) + self._active[unit.run_id] = unit start = datetime.now(timezone.utc) - deadline = start + timedelta(minutes=unit.budget.max_duration_minutes) - - logger.info( - "[Autopilot] بدء run=%s task=%s mode=%s tenant=%s", - unit.run_id, task_type, mode.value, tenant_id, - ) - + logger.info("[Autopilot] بدء run=%s task=%s mode=%s", unit.run_id, task_type, mode.value) try: - if self._policy.kill_switch_enabled and datetime.now(timezone.utc) > deadline: - unit.status = RunStatus.FAILED - unit.error = "تم تجاوز الحد الزمني المسموح" - else: - await handler(unit, self._policy) - if unit.status == RunStatus.RUNNING: - unit.status = RunStatus.COMPLETED + await handler(unit, self._policy) + if unit.status == RunStatus.RUNNING: + unit.status = RunStatus.COMPLETED except Exception as exc: logger.exception("[Autopilot] فشل run=%s: %s", unit.run_id, exc) unit.status = RunStatus.FAILED unit.error = str(exc) - end = datetime.now(timezone.utc) unit.completed_at = end - duration_ms = int((end - start).total_seconds() * 1000) - + dur = int((end - start).total_seconds() * 1000) steps_done = [] - for step in AUTOPILOT_STEPS: - steps_done.append(step) - if step == unit.current_step: + for s in STEPS: + steps_done.append(s) + if s == unit.current_step: break - result = AutopilotResult( - run_id=unit.run_id, task_type=task_type, mode=mode, - status=unit.status, steps_completed=steps_done, + run_id=unit.run_id, task_type=task_type, mode=mode, status=unit.status, + steps_completed=steps_done, findings=unit.result_data.get("at_risk_deals", unit.result_data.get("dormant_leads", [])), actions_taken=[se.model_dump() for se in unit.side_effects], actions_proposed=unit.result_data.get("proposed_actions", []), - side_effects=unit.side_effects, - confidence=unit.confidence, duration_ms=duration_ms, - summary_ar=self._build_summary(unit), - ) - - logger.info( - "[Autopilot] انتهاء run=%s status=%s dur=%dms", - unit.run_id, unit.status.value, duration_ms, - ) + side_effects=unit.side_effects, confidence=unit.confidence, duration_ms=dur, + summary_ar=self._summary(unit)) + logger.info("[Autopilot] نهاية run=%s status=%s %dms", unit.run_id, unit.status.value, dur) return result async def pause(self, run_id: str) -> bool: - unit = self._active_runs.get(run_id) - if not unit or unit.status != RunStatus.RUNNING: + u = self._active.get(run_id) + if not u or u.status != RunStatus.RUNNING: return False - unit.status = RunStatus.PAUSED - logger.info("[Autopilot] إيقاف مؤقت run=%s at step=%s", run_id, unit.current_step) + u.status = RunStatus.PAUSED return True async def resume(self, run_id: str) -> Optional[AutopilotResult]: - unit = self._active_runs.get(run_id) - if not unit or unit.status not in (RunStatus.PAUSED, RunStatus.AWAITING_APPROVAL): + u = self._active.get(run_id) + if not u or u.status not in (RunStatus.PAUSED, RunStatus.AWAITING_APPROVAL): return None - unit.status = RunStatus.RUNNING - logger.info("[Autopilot] استئناف run=%s from step=%s", run_id, unit.current_step) - handler = _TASK_HANDLERS.get(unit.task_type) + u.status = RunStatus.RUNNING + handler = _TASK_HANDLERS.get(u.task_type) if handler: try: - await handler(unit, self._policy) - if unit.status == RunStatus.RUNNING: - unit.status = RunStatus.COMPLETED + await handler(u, self._policy) + if u.status == RunStatus.RUNNING: + u.status = RunStatus.COMPLETED except Exception as exc: - unit.status = RunStatus.FAILED - unit.error = str(exc) - return AutopilotResult( - run_id=unit.run_id, task_type=unit.task_type, mode=unit.mode, - status=unit.status, confidence=unit.confidence, - summary_ar=self._build_summary(unit), - ) + u.status = RunStatus.FAILED + u.error = str(exc) + return AutopilotResult(run_id=u.run_id, task_type=u.task_type, mode=u.mode, + status=u.status, confidence=u.confidence, summary_ar=self._summary(u)) async def abort(self, run_id: str) -> bool: - unit = self._active_runs.get(run_id) - if not unit: + u = self._active.get(run_id) + if not u: return False - unit.status = RunStatus.ABORTED - unit.completed_at = datetime.now(timezone.utc) - logger.info("[Autopilot] إلغاء run=%s", run_id) + u.status = RunStatus.ABORTED + u.completed_at = datetime.now(timezone.utc) return True async def approve_pending(self, run_id: str, approval_id: str, approved_by: str) -> bool: - unit = self._active_runs.get(run_id) - if not unit: + u = self._active.get(run_id) + if not u: return False - for pa in unit.pending_approvals: + for pa in u.pending_approvals: if pa.id == approval_id: - pa.approved = True - pa.approved_by = approved_by - logger.info("[Autopilot] تمت الموافقة approval=%s by=%s", approval_id, approved_by) + pa.approved, pa.approved_by = True, approved_by return True return False async def get_status(self, run_id: str) -> Optional[AutopilotUnit]: - return self._active_runs.get(run_id) + return self._active.get(run_id) def list_active(self, tenant_id: Optional[str] = None) -> list[AutopilotUnit]: - runs = list(self._active_runs.values()) + runs = list(self._active.values()) if tenant_id: runs = [r for r in runs if r.tenant_id == tenant_id] return [r for r in runs if r.status in (RunStatus.RUNNING, RunStatus.PAUSED, RunStatus.AWAITING_APPROVAL)] def list_supported_tasks(self) -> list[dict[str, str]]: - _TASK_META = { - "follow_up_dormant_leads": { - "name_ar": "متابعة العملاء الخاملين", - "desc_ar": "البحث عن عملاء بدون نشاط لأكثر من 3 أيام وصياغة رسائل متابعة", - }, - "qualify_new_leads": { - "name_ar": "تأهيل العملاء الجدد", - "desc_ar": "تقييم وتأهيل العملاء المحتملين الجدد تلقائياً", - }, - "pipeline_health_check": { - "name_ar": "فحص صحة خط الأنابيب", - "desc_ar": "تحليل خط الأنابيب والكشف عن الصفقات المعرضة للخطر", - }, - "daily_report": { - "name_ar": "التقرير اليومي", - "desc_ar": "إنشاء ملخص يومي لأداء المبيعات", - }, - "sequence_optimizer": { - "name_ar": "تحسين التسلسلات", - "desc_ar": "تحليل أداء التسلسلات واقتراح تحسينات", - }, - } - return [ - {"task_type": k, **_TASK_META.get(k, {"name_ar": k, "desc_ar": ""})} - for k in _TASK_HANDLERS - ] + return [{"task_type": k, **_TASK_META.get(k, {})} for k in _TASK_HANDLERS] @staticmethod - def _build_summary(unit: AutopilotUnit) -> str: - if unit.status == RunStatus.FAILED: - return f"فشل التنفيذ: {unit.error or 'خطأ غير محدد'}" - if unit.status == RunStatus.ABORTED: + def _summary(u: AutopilotUnit) -> str: + if u.status == RunStatus.FAILED: + return f"فشل التنفيذ: {u.error or 'خطأ غير محدد'}" + if u.status == RunStatus.ABORTED: return "تم إلغاء المهمة" - if unit.status == RunStatus.AWAITING_APPROVAL: - return f"بانتظار الموافقة على {len(unit.pending_approvals)} إجراء" - if unit.status == RunStatus.PAUSED: - return f"متوقف مؤقتاً عند الخطوة: {unit.current_step}" - - effects = len(unit.side_effects) - proposed = len(unit.result_data.get("proposed_actions", [])) - parts = [f"تم التنفيذ بنجاح (ثقة {unit.confidence:.0%})"] + if u.status == RunStatus.AWAITING_APPROVAL: + return f"بانتظار الموافقة على {len(u.pending_approvals)} إجراء" + if u.status == RunStatus.PAUSED: + return f"متوقف مؤقتاً عند: {u.current_step}" + effects = len(u.side_effects) + proposed = len(u.result_data.get("proposed_actions", [])) + parts = [f"اكتمل (ثقة {u.confidence:.0%})"] if effects: parts.append(f"— {effects} إجراء منفّذ") if proposed: - parts.append(f"— {proposed} إجراء مقترح") + parts.append(f"— {proposed} مقترح") return " ".join(parts) diff --git a/salesflow-saas/backend/app/services/behavior_intelligence.py b/salesflow-saas/backend/app/services/behavior_intelligence.py index 496df013..0629bd3d 100644 --- a/salesflow-saas/backend/app/services/behavior_intelligence.py +++ b/salesflow-saas/backend/app/services/behavior_intelligence.py @@ -1,35 +1,27 @@ """ -Behavior Intelligence — Pattern detection engine for Dealix CRM. +Behavior Intelligence — Pattern detection for Dealix CRM (watch-mode only). -Watch-mode analytics that detects winning sequences, top-rep behaviours, -optimal contact times, at-risk deals, and generates Arabic recommendations. -Operates on in-memory signal history; no autonomous actions taken. +Detects winning sequences, top-rep behaviours, optimal contact times, +at-risk deals, and generates Arabic recommendations. """ - from __future__ import annotations -import logging -import uuid +import logging, uuid from collections import defaultdict from datetime import datetime, timedelta, timezone from typing import Any, Dict, List, Optional - from pydantic import BaseModel, Field logger = logging.getLogger("dealix.services.behavior_intelligence") -# --------------------------------------------------------------------------- -# Models -# --------------------------------------------------------------------------- - class TrackedPattern(BaseModel): id: str = Field(default_factory=lambda: uuid.uuid4().hex) tenant_id: str - pattern_type: str # winning_sequence, fast_close, high_conversion_rep, at_risk, best_time + pattern_type: str description: str description_ar: str - confidence: float = 0.0 # 0-1 + confidence: float = 0.0 frequency: int = 1 entities_involved: List[str] = [] first_seen: datetime = Field(default_factory=lambda: datetime.now(timezone.utc)) @@ -41,446 +33,248 @@ class TrackedPattern(BaseModel): class Recommendation(BaseModel): id: str = Field(default_factory=lambda: uuid.uuid4().hex) tenant_id: str - category: str # performance, sequence, timing, risk + category: str title_ar: str detail_ar: str - impact: str # high, medium, low + impact: str = "medium" confidence: float = 0.0 source_patterns: List[str] = [] created_at: datetime = Field(default_factory=lambda: datetime.now(timezone.utc)) -# --------------------------------------------------------------------------- -# Simulated data layer -# --------------------------------------------------------------------------- -# In production these would query PostgreSQL aggregates. Here we keep -# lightweight dicts that can be seeded or fed from the signal engine. - +# ── Simulated data layer ──────────────────────────────────────────────── class _TenantData: - """Holds simulated analytics data for a single tenant.""" - def __init__(self) -> None: - # rep_id -> stats self.rep_stats: Dict[str, Dict[str, Any]] = {} - # sequence_id -> stats self.sequence_stats: Dict[str, Dict[str, Any]] = {} - # deal_id -> stats self.deal_stats: Dict[str, Dict[str, Any]] = {} - # hour -> response count self.hourly_responses: Dict[int, int] = defaultdict(int) - # day-of-week (0=Mon) -> response count self.daily_responses: Dict[int, int] = defaultdict(int) - _tenant_data: Dict[str, _TenantData] = defaultdict(_TenantData) - def seed_tenant_data(tenant_id: str, data: _TenantData) -> None: - """Allow external seeding (tests, signal ingest pipeline).""" _tenant_data[tenant_id] = data +def _data(tid: str) -> _TenantData: + return _tenant_data[tid] -def _get_data(tenant_id: str) -> _TenantData: - return _tenant_data[tenant_id] +def _sample_reps() -> Dict[str, Dict[str, Any]]: + return { + "rep_001": {"name": "أحمد", "close_rate": 0.42, "avg_response_min": 18, + "avg_days_to_close": 12, "deals_closed": 28}, + "rep_002": {"name": "سارة", "close_rate": 0.35, "avg_response_min": 25, + "avg_days_to_close": 15, "deals_closed": 22}, + "rep_003": {"name": "خالد", "close_rate": 0.28, "avg_response_min": 75, + "avg_days_to_close": 22, "deals_closed": 15}, + } +def _sample_seqs() -> Dict[str, Dict[str, Any]]: + return { + "seq_001": {"name": "VIP Real Estate", "name_ar": "عقارات VIP", + "conversion_rate": 0.38, "enrolled": 120}, + "seq_002": {"name": "Tech Startup", "name_ar": "تواصل الشركات الناشئة", + "conversion_rate": 0.25, "enrolled": 85}, + "seq_003": {"name": "Standard", "name_ar": "المتابعة العادية", + "conversion_rate": 0.12, "enrolled": 200}, + } -# --------------------------------------------------------------------------- -# Core Service -# --------------------------------------------------------------------------- +def _sample_deals() -> Dict[str, Dict[str, Any]]: + now = datetime.now(timezone.utc) + return { + "deal_001": {"title": "عقد صيانة المبنى", "stage": "negotiation", "value": 250000, + "last_activity": (now - timedelta(days=10)).isoformat()}, + "deal_002": {"title": "ترخيص برمجيات", "stage": "proposal", "value": 180000, + "last_activity": (now - timedelta(days=4)).isoformat()}, + "deal_003": {"title": "خدمات استشارية", "stage": "discovery", "value": 95000, + "last_activity": (now - timedelta(days=1)).isoformat()}, + "deal_004": {"title": "نظام ERP", "stage": "negotiation", "value": 500000, + "last_activity": (now - timedelta(days=8)).isoformat()}, + } + +_DAY_AR = {0: "الإثنين", 1: "الثلاثاء", 2: "الأربعاء", 3: "الخميس", + 4: "الجمعة", 5: "السبت", 6: "الأحد"} class BehaviorIntelligence: - """ - Detects behavioural patterns across reps, sequences, contact timing, - and deal health. All analysis is read-only (watch mode). - """ - - # ── Rep Performance ─────────────────────────────────────── + """Detects behavioural patterns across reps, sequences, timing, and deal health.""" async def analyze_rep_performance(self, tenant_id: str) -> List[TrackedPattern]: - """Find top-performing reps and what differentiates them.""" - data = _get_data(tenant_id) + d = _data(tenant_id) + if not d.rep_stats: + d.rep_stats = _sample_reps() patterns: List[TrackedPattern] = [] - - if not data.rep_stats: - # Generate representative sample when no data yet - data.rep_stats = _sample_rep_stats() - - reps = data.rep_stats - if not reps: - return patterns - - # Find top closer + reps = d.rep_stats by_close = sorted(reps.items(), key=lambda r: r[1].get("close_rate", 0), reverse=True) if by_close: - top_id, top = by_close[0] - cr = top.get("close_rate", 0) - avg_resp = top.get("avg_response_min", 0) - name = top.get("name", top_id[:8]) + rid, top = by_close[0] + n = top.get("name", rid[:8]) + cr = top["close_rate"] + ar = top.get("avg_response_min", 0) patterns.append(TrackedPattern( - tenant_id=tenant_id, - pattern_type="high_conversion_rep", - description=f"Rep {name} closes at {cr:.0%} with avg response {avg_resp}min", - description_ar=f"{name} يغلق بنسبة {cr:.0%} مع متوسط استجابة {avg_resp} دقيقة", - confidence=min(1.0, cr + 0.1), - frequency=top.get("deals_closed", 1), - entities_involved=[top_id], - suggested_action=f"Replicate {name}'s follow-up cadence across the team", - suggested_action_ar=f"انسخ نمط متابعة {name} لبقية الفريق", - )) - - # Find fastest closer + tenant_id=tenant_id, pattern_type="high_conversion_rep", + description=f"Rep {n} closes at {cr:.0%} with avg response {ar}min", + description_ar=f"{n} يغلق بنسبة {cr:.0%} مع متوسط استجابة {ar} دقيقة", + confidence=min(1.0, cr + 0.1), frequency=top.get("deals_closed", 1), + entities_involved=[rid], + suggested_action=f"Replicate {n}'s cadence across team", + suggested_action_ar=f"انسخ نمط متابعة {n} لبقية الفريق")) by_speed = sorted(reps.items(), key=lambda r: r[1].get("avg_days_to_close", 999)) if by_speed: - fast_id, fast = by_speed[0] + rid, fast = by_speed[0] + n = fast.get("name", rid[:8]) days = fast.get("avg_days_to_close", 0) - name = fast.get("name", fast_id[:8]) patterns.append(TrackedPattern( - tenant_id=tenant_id, - pattern_type="fast_close", - description=f"{name} avg close in {days} days", - description_ar=f"{name} يغلق الصفقات في متوسط {days} أيام", - confidence=0.75, - frequency=fast.get("deals_closed", 1), - entities_involved=[fast_id], - suggested_action=f"Study {name}'s discovery call technique", - suggested_action_ar=f"ادرس أسلوب {name} في مكالمة الاستكشاف", - )) - - # Find slow responder (coaching opportunity) + tenant_id=tenant_id, pattern_type="fast_close", + description=f"{n} avg close in {days} days", + description_ar=f"{n} يغلق الصفقات في متوسط {days} أيام", + confidence=0.75, frequency=fast.get("deals_closed", 1), + entities_involved=[rid], + suggested_action=f"Study {n}'s discovery call technique", + suggested_action_ar=f"ادرس أسلوب {n} في مكالمة الاستكشاف")) by_resp = sorted(reps.items(), key=lambda r: r[1].get("avg_response_min", 0), reverse=True) if by_resp: - slow_id, slow = by_resp[0] - avg_r = slow.get("avg_response_min", 0) - if avg_r > 60: - name = slow.get("name", slow_id[:8]) + rid, slow = by_resp[0] + ar = slow.get("avg_response_min", 0) + if ar > 60: + n = slow.get("name", rid[:8]) patterns.append(TrackedPattern( - tenant_id=tenant_id, - pattern_type="slow_responder", - description=f"{name} avg response {avg_r}min — above 60min threshold", - description_ar=f"{name} متوسط استجابة {avg_r} دقيقة — أعلى من الحد المقبول", - confidence=0.80, - frequency=1, - entities_involved=[slow_id], - suggested_action=f"Coach {name} on response time; set mobile alerts", - suggested_action_ar=f"درّب {name} على سرعة الاستجابة وفعّل التنبيهات", - )) - + tenant_id=tenant_id, pattern_type="slow_responder", + description=f"{n} avg response {ar}min — above threshold", + description_ar=f"{n} متوسط استجابة {ar} دقيقة — أعلى من الحد المقبول", + confidence=0.80, entities_involved=[rid], + suggested_action=f"Coach {n} on response time", + suggested_action_ar=f"درّب {n} على سرعة الاستجابة وفعّل التنبيهات")) return patterns - # ── Winning Sequences ───────────────────────────────────── - async def analyze_winning_sequences(self, tenant_id: str) -> List[TrackedPattern]: - """Identify sequence templates with highest conversion rates.""" - data = _get_data(tenant_id) + d = _data(tenant_id) + if not d.sequence_stats: + d.sequence_stats = _sample_seqs() patterns: List[TrackedPattern] = [] - - if not data.sequence_stats: - data.sequence_stats = _sample_sequence_stats() - - seqs = data.sequence_stats - by_conv = sorted(seqs.items(), key=lambda s: s[1].get("conversion_rate", 0), reverse=True) - - for seq_id, stats in by_conv[:3]: - name = stats.get("name", seq_id[:8]) - name_ar = stats.get("name_ar", name) - cr = stats.get("conversion_rate", 0) - enrolled = stats.get("enrolled", 0) - + by_conv = sorted(d.sequence_stats.items(), key=lambda s: s[1].get("conversion_rate", 0), reverse=True) + for sid, st in by_conv[:3]: + n = st.get("name", sid[:8]) + nar = st.get("name_ar", n) + cr = st.get("conversion_rate", 0) + enr = st.get("enrolled", 0) patterns.append(TrackedPattern( - tenant_id=tenant_id, - pattern_type="winning_sequence", - description=f"Sequence '{name}' converts at {cr:.0%} ({enrolled} enrolled)", - description_ar=f"تسلسل '{name_ar}' يحقق تحويل {cr:.0%} ({enrolled} مسجل)", - confidence=min(1.0, 0.5 + cr), - frequency=enrolled, - entities_involved=[seq_id], - suggested_action=f"Use '{name}' as default for similar leads", - suggested_action_ar=f"استخدم '{name_ar}' كتسلسل افتراضي للعملاء المشابهين", - )) - - # Compare top vs average + tenant_id=tenant_id, pattern_type="winning_sequence", + description=f"Sequence '{n}' converts at {cr:.0%} ({enr} enrolled)", + description_ar=f"تسلسل '{nar}' يحقق تحويل {cr:.0%} ({enr} مسجل)", + confidence=min(1.0, 0.5 + cr), frequency=enr, entities_involved=[sid], + suggested_action=f"Use '{n}' as default for similar leads", + suggested_action_ar=f"استخدم '{nar}' كتسلسل افتراضي للعملاء المشابهين")) if len(by_conv) >= 2: top_cr = by_conv[0][1].get("conversion_rate", 0) avg_cr = sum(s.get("conversion_rate", 0) for _, s in by_conv) / len(by_conv) if avg_cr > 0: - multiplier = round(top_cr / avg_cr, 1) - top_name_ar = by_conv[0][1].get("name_ar", by_conv[0][0][:8]) + mult = round(top_cr / avg_cr, 1) + nar = by_conv[0][1].get("name_ar", by_conv[0][0][:8]) patterns.append(TrackedPattern( - tenant_id=tenant_id, - pattern_type="winning_sequence", - description=f"Top sequence outperforms average by {multiplier}x", - description_ar=f"تسلسل '{top_name_ar}' يحقق {multiplier}x تحويل مقارنة بالمتوسط", - confidence=0.85, - frequency=1, - entities_involved=[by_conv[0][0]], - suggested_action="Migrate underperforming sequences to the top template", - suggested_action_ar="انقل التسلسلات الضعيفة إلى القالب الأفضل", - )) - + tenant_id=tenant_id, pattern_type="winning_sequence", + description=f"Top sequence outperforms average by {mult}x", + description_ar=f"تسلسل '{nar}' يحقق {mult}x تحويل مقارنة بالمتوسط", + confidence=0.85, entities_involved=[by_conv[0][0]], + suggested_action="Migrate underperforming sequences to top template", + suggested_action_ar="انقل التسلسلات الضعيفة إلى القالب الأفضل")) return patterns - # ── Best Contact Times ──────────────────────────────────── - async def analyze_best_contact_times(self, tenant_id: str) -> Dict[str, Any]: - """When do leads respond most? Returns hour/day heat map.""" - data = _get_data(tenant_id) - - if not data.hourly_responses: - # Seed typical Saudi business patterns + d = _data(tenant_id) + if not d.hourly_responses: for h in range(24): - if 9 <= h <= 12: - data.hourly_responses[h] = 35 + (h - 9) * 5 - elif 16 <= h <= 20: - data.hourly_responses[h] = 40 + (20 - h) * 3 - elif 13 <= h <= 15: - data.hourly_responses[h] = 15 - else: - data.hourly_responses[h] = 5 - - if not data.daily_responses: - # Sunday-Thursday work week in KSA - data.daily_responses = { - 0: 30, 1: 25, 2: 35, 3: 20, 4: 15, # Mon-Fri - 5: 5, 6: 40, # Sat, Sun — Sun is work day in KSA - } - - hourly = dict(data.hourly_responses) - daily = dict(data.daily_responses) - - best_hour = max(hourly, key=hourly.get) # type: ignore[arg-type] - best_day = max(daily, key=daily.get) # type: ignore[arg-type] - - day_names_ar = { - 0: "الإثنين", 1: "الثلاثاء", 2: "الأربعاء", - 3: "الخميس", 4: "الجمعة", 5: "السبت", 6: "الأحد", - } - - period = "صباحا" if best_hour < 12 else "مساء" - display_hour = best_hour if best_hour <= 12 else best_hour - 12 - + if 9 <= h <= 12: d.hourly_responses[h] = 35 + (h - 9) * 5 + elif 16 <= h <= 20: d.hourly_responses[h] = 40 + (20 - h) * 3 + elif 13 <= h <= 15: d.hourly_responses[h] = 15 + else: d.hourly_responses[h] = 5 + if not d.daily_responses: + d.daily_responses = {0: 30, 1: 25, 2: 35, 3: 20, 4: 15, 5: 5, 6: 40} + bh = max(d.hourly_responses, key=d.hourly_responses.get) # type: ignore[arg-type] + bd = max(d.daily_responses, key=d.daily_responses.get) # type: ignore[arg-type] + period = "صباحا" if bh < 12 else "مساء" + dh = bh if bh <= 12 else bh - 12 return { - "tenant_id": tenant_id, - "best_hour": best_hour, - "best_hour_ar": f"{display_hour} {period}", - "best_day": best_day, - "best_day_ar": day_names_ar.get(best_day, ""), - "hourly_distribution": hourly, - "daily_distribution": {day_names_ar.get(d, str(d)): c for d, c in daily.items()}, - "recommendation_ar": ( - f"أفضل وقت للتواصل: {day_names_ar.get(best_day, '')} الساعة {display_hour} {period}" - ), + "tenant_id": tenant_id, "best_hour": bh, "best_hour_ar": f"{dh} {period}", + "best_day": bd, "best_day_ar": _DAY_AR.get(bd, ""), + "hourly_distribution": dict(d.hourly_responses), + "daily_distribution": {_DAY_AR.get(k, str(k)): v for k, v in d.daily_responses.items()}, + "recommendation_ar": f"أفضل وقت للتواصل: {_DAY_AR.get(bd, '')} الساعة {dh} {period}", } - # ── At-Risk Detection ───────────────────────────────────── - async def detect_at_risk_patterns(self, tenant_id: str) -> List[TrackedPattern]: - """Find deals going cold or leads losing interest.""" - data = _get_data(tenant_id) - patterns: List[TrackedPattern] = [] - - if not data.deal_stats: - data.deal_stats = _sample_deal_stats() - + d = _data(tenant_id) + if not d.deal_stats: + d.deal_stats = _sample_deals() now = datetime.now(timezone.utc) - - for deal_id, stats in data.deal_stats.items(): - title = stats.get("title", deal_id[:8]) - last_activity_str = stats.get("last_activity") - stage = stats.get("stage", "") - - if stage in ("closed_won", "closed_lost"): + patterns: List[TrackedPattern] = [] + for did, st in d.deal_stats.items(): + if st.get("stage") in ("closed_won", "closed_lost"): continue - - if last_activity_str: - try: - last_dt = datetime.fromisoformat(last_activity_str) - except (ValueError, TypeError): - last_dt = now - timedelta(days=3) - else: + title = st.get("title", did[:8]) + try: + last_dt = datetime.fromisoformat(st.get("last_activity", "")) + except (ValueError, TypeError): last_dt = now - timedelta(days=5) - - days_idle = (now - last_dt).days - - if days_idle >= 7: - confidence = min(1.0, 0.5 + days_idle * 0.05) + idle = (now - last_dt).days + if idle >= 7: patterns.append(TrackedPattern( - tenant_id=tenant_id, - pattern_type="at_risk_deal", - description=f"Deal '{title}' idle for {days_idle} days", - description_ar=f"صفقة '{title}' بدون نشاط منذ {days_idle} أيام", - confidence=confidence, - frequency=1, - entities_involved=[deal_id], - suggested_action=f"Re-engage on deal '{title}' immediately", - suggested_action_ar=f"أعد التواصل بخصوص صفقة '{title}' فورا", - )) - elif days_idle >= 3: + tenant_id=tenant_id, pattern_type="at_risk_deal", + description=f"Deal '{title}' idle for {idle} days", + description_ar=f"صفقة '{title}' بدون نشاط منذ {idle} أيام", + confidence=min(1.0, 0.5 + idle * 0.05), entities_involved=[did], + suggested_action=f"Re-engage on '{title}' immediately", + suggested_action_ar=f"أعد التواصل بخصوص صفقة '{title}' فورا")) + elif idle >= 3: patterns.append(TrackedPattern( - tenant_id=tenant_id, - pattern_type="cooling_deal", - description=f"Deal '{title}' cooling — {days_idle} days since last touch", - description_ar=f"صفقة '{title}' تبرد — {days_idle} أيام منذ آخر تواصل", - confidence=0.55, - frequency=1, - entities_involved=[deal_id], - suggested_action=f"Schedule follow-up for deal '{title}'", - suggested_action_ar=f"جدول متابعة لصفقة '{title}'", - )) - + tenant_id=tenant_id, pattern_type="cooling_deal", + description=f"Deal '{title}' cooling — {idle} days since last touch", + description_ar=f"صفقة '{title}' تبرد — {idle} أيام منذ آخر تواصل", + confidence=0.55, entities_involved=[did], + suggested_action=f"Schedule follow-up for '{title}'", + suggested_action_ar=f"جدول متابعة لصفقة '{title}'")) return patterns - # ── Recommendations ─────────────────────────────────────── - async def get_recommendations(self, tenant_id: str) -> List[Dict[str, Any]]: - """Generate Arabic recommendations from all detected patterns.""" - rep_patterns = await self.analyze_rep_performance(tenant_id) - seq_patterns = await self.analyze_winning_sequences(tenant_id) - time_analysis = await self.analyze_best_contact_times(tenant_id) - risk_patterns = await self.detect_at_risk_patterns(tenant_id) - - recommendations: List[Recommendation] = [] - - # From rep patterns - for p in rep_patterns: + reps = await self.analyze_rep_performance(tenant_id) + seqs = await self.analyze_winning_sequences(tenant_id) + timing = await self.analyze_best_contact_times(tenant_id) + risks = await self.detect_at_risk_patterns(tenant_id) + recs: List[Recommendation] = [] + for p in reps: if p.pattern_type == "high_conversion_rep": - recommendations.append(Recommendation( - tenant_id=tenant_id, - category="performance", + recs.append(Recommendation(tenant_id=tenant_id, category="performance", title_ar="نمط إغلاق ناجح", - detail_ar=p.description_ar + " — " + p.suggested_action_ar, - impact="high", - confidence=p.confidence, - source_patterns=[p.id], - )) + detail_ar=f"{p.description_ar} — {p.suggested_action_ar}", + impact="high", confidence=p.confidence, source_patterns=[p.id])) elif p.pattern_type == "slow_responder": - recommendations.append(Recommendation( - tenant_id=tenant_id, - category="performance", + recs.append(Recommendation(tenant_id=tenant_id, category="performance", title_ar="فرصة تحسين سرعة الاستجابة", - detail_ar=p.description_ar + " — " + p.suggested_action_ar, - impact="medium", - confidence=p.confidence, - source_patterns=[p.id], - )) - - # From sequence patterns - for p in seq_patterns: - recommendations.append(Recommendation( - tenant_id=tenant_id, - category="sequence", - title_ar="تسلسل عالي الأداء", - detail_ar=p.description_ar, + detail_ar=f"{p.description_ar} — {p.suggested_action_ar}", + impact="medium", confidence=p.confidence, source_patterns=[p.id])) + for p in seqs: + recs.append(Recommendation(tenant_id=tenant_id, category="sequence", + title_ar="تسلسل عالي الأداء", detail_ar=p.description_ar, impact="high" if p.confidence > 0.7 else "medium", - confidence=p.confidence, - source_patterns=[p.id], - )) - - # From timing - if time_analysis.get("recommendation_ar"): - recommendations.append(Recommendation( - tenant_id=tenant_id, - category="timing", - title_ar="أفضل وقت للتواصل", - detail_ar=time_analysis["recommendation_ar"], - impact="medium", - confidence=0.80, - )) - - # From risk patterns - critical_risks = [p for p in risk_patterns if p.pattern_type == "at_risk_deal"] - if critical_risks: - names = ", ".join(p.entities_involved[0][:8] for p in critical_risks[:5]) - recommendations.append(Recommendation( - tenant_id=tenant_id, - category="risk", + confidence=p.confidence, source_patterns=[p.id])) + if timing.get("recommendation_ar"): + recs.append(Recommendation(tenant_id=tenant_id, category="timing", + title_ar="أفضل وقت للتواصل", detail_ar=timing["recommendation_ar"], + impact="medium", confidence=0.80)) + crit = [p for p in risks if p.pattern_type == "at_risk_deal"] + if crit: + ids = ", ".join(p.entities_involved[0][:8] for p in crit[:5]) + recs.append(Recommendation(tenant_id=tenant_id, category="risk", title_ar="صفقات معرضة للخطر", - detail_ar=f"{len(critical_risks)} صفقات بدون نشاط لأكثر من أسبوع: {names}", - impact="high", - confidence=0.85, - source_patterns=[p.id for p in critical_risks[:5]], - )) + detail_ar=f"{len(crit)} صفقات بدون نشاط لأكثر من أسبوع: {ids}", + impact="high", confidence=0.85, + source_patterns=[p.id for p in crit[:5]])) + return [r.model_dump() for r in recs] - return [r.model_dump() for r in recommendations] - - -# --------------------------------------------------------------------------- -# Sample data generators (used when no real data exists yet) -# --------------------------------------------------------------------------- - - -def _sample_rep_stats() -> Dict[str, Dict[str, Any]]: - return { - "rep_001": { - "name": "أحمد", "close_rate": 0.42, "avg_response_min": 18, - "avg_days_to_close": 12, "deals_closed": 28, "follow_ups_per_deal": 4.2, - }, - "rep_002": { - "name": "سارة", "close_rate": 0.35, "avg_response_min": 25, - "avg_days_to_close": 15, "deals_closed": 22, "follow_ups_per_deal": 3.1, - }, - "rep_003": { - "name": "خالد", "close_rate": 0.28, "avg_response_min": 75, - "avg_days_to_close": 22, "deals_closed": 15, "follow_ups_per_deal": 2.0, - }, - } - - -def _sample_sequence_stats() -> Dict[str, Dict[str, Any]]: - return { - "seq_001": { - "name": "VIP Real Estate", "name_ar": "عقارات VIP", - "conversion_rate": 0.38, "enrolled": 120, "avg_steps": 4, - }, - "seq_002": { - "name": "Tech Startup Outreach", "name_ar": "تواصل الشركات الناشئة", - "conversion_rate": 0.25, "enrolled": 85, "avg_steps": 5, - }, - "seq_003": { - "name": "Standard Follow-up", "name_ar": "المتابعة العادية", - "conversion_rate": 0.12, "enrolled": 200, "avg_steps": 6, - }, - } - - -def _sample_deal_stats() -> Dict[str, Dict[str, Any]]: - now = datetime.now(timezone.utc) - return { - "deal_001": { - "title": "عقد صيانة المبنى", - "stage": "negotiation", - "value": 250000, - "last_activity": (now - timedelta(days=10)).isoformat(), - }, - "deal_002": { - "title": "ترخيص برمجيات", - "stage": "proposal", - "value": 180000, - "last_activity": (now - timedelta(days=4)).isoformat(), - }, - "deal_003": { - "title": "خدمات استشارية", - "stage": "discovery", - "value": 95000, - "last_activity": (now - timedelta(days=1)).isoformat(), - }, - "deal_004": { - "title": "نظام ERP", - "stage": "negotiation", - "value": 500000, - "last_activity": (now - timedelta(days=8)).isoformat(), - }, - } - - -# --------------------------------------------------------------------------- -# Module-level singleton -# --------------------------------------------------------------------------- _instance: Optional[BehaviorIntelligence] = None - def get_behavior_intelligence() -> BehaviorIntelligence: global _instance if _instance is None: diff --git a/salesflow-saas/backend/app/services/escalation.py b/salesflow-saas/backend/app/services/escalation.py index 3a8c6288..408f0fc8 100644 --- a/salesflow-saas/backend/app/services/escalation.py +++ b/salesflow-saas/backend/app/services/escalation.py @@ -1,11 +1,6 @@ """ Escalation Service — Dealix AI Revenue OS -============================================ نظام التصعيد: إدارة حلقة الإنسان في العملية (Human-in-the-Loop). -- إنشاء حزم تصعيد مع سياق كامل -- تعيين ومتابعة وحل التصعيدات -- قواعد تصعيد تلقائية لحالات محددة -- استئناف سير العمل بعد الحل """ from __future__ import annotations @@ -21,8 +16,6 @@ from pydantic import BaseModel, Field logger = logging.getLogger(__name__) -# ── Enums ─────────────────────────────────────────────────────────── - class EscalationReason(str, Enum): VALIDATION_FAILURE = "validation_failure" MISSING_DATA = "missing_data" @@ -50,8 +43,6 @@ class EscalationStatus(str, Enum): EXPIRED = "expired" -# ── Arabic labels ─────────────────────────────────────────────────── - _REASON_AR: dict[EscalationReason, str] = { EscalationReason.VALIDATION_FAILURE: "فشل التحقق من البيانات", EscalationReason.MISSING_DATA: "بيانات مفقودة", @@ -65,15 +56,6 @@ _REASON_AR: dict[EscalationReason, str] = { EscalationReason.DELIVERY_FAILURE: "فشل متكرر في التوصيل", } -_PRIORITY_AR: dict[EscalationPriority, str] = { - EscalationPriority.CRITICAL: "حرج", - EscalationPriority.HIGH: "عالي", - EscalationPriority.MEDIUM: "متوسط", - EscalationPriority.LOW: "منخفض", -} - - -# ── Models ────────────────────────────────────────────────────────── class EscalationArtifact(BaseModel): type: str = "text" @@ -94,9 +76,7 @@ class EscalationPacket(BaseModel): reason: EscalationReason missing_data: list[str] = [] priority: EscalationPriority = EscalationPriority.MEDIUM - due_at: datetime = Field( - default_factory=lambda: datetime.now(timezone.utc) + timedelta(hours=4) - ) + due_at: datetime = Field(default_factory=lambda: datetime.now(timezone.utc) + timedelta(hours=4)) risk_if_delayed: str = "" risk_if_delayed_ar: str = "" artifacts: list[EscalationArtifact] = [] @@ -129,8 +109,6 @@ class EscalationStats(BaseModel): overdue_count: int = 0 -# ── Auto-escalation rules ────────────────────────────────────────── - class AutoEscalationRule(BaseModel): id: str name_ar: str @@ -143,65 +121,40 @@ class AutoEscalationRule(BaseModel): DEFAULT_RULES: list[AutoEscalationRule] = [ AutoEscalationRule( - id="rule_high_value_deal", - name_ar="صفقة تتجاوز 100 ألف ريال", - condition="deal_value_sar > 100000", - priority=EscalationPriority.HIGH, - target_role="manager", - reason=EscalationReason.HIGH_VALUE_DEAL, - suggested_action_ar="مراجعة الصفقة والموافقة على استراتيجية التفاوض", - ), + id="rule_high_value_deal", name_ar="صفقة تتجاوز 100 ألف ريال", + condition="deal_value_sar > 100000", priority=EscalationPriority.HIGH, + target_role="manager", reason=EscalationReason.HIGH_VALUE_DEAL, + suggested_action_ar="مراجعة الصفقة والموافقة على استراتيجية التفاوض"), AutoEscalationRule( - id="rule_no_response_5d", - name_ar="عدم رد لأكثر من 5 أيام", - condition="days_since_last_response > 5", - priority=EscalationPriority.MEDIUM, - target_role="assigned_rep", - reason=EscalationReason.TIMEOUT, - suggested_action_ar="الاتصال بالعميل عبر قناة بديلة أو تصعيد للمدير", - ), + id="rule_no_response_5d", name_ar="عدم رد لأكثر من 5 أيام", + condition="days_since_last_response > 5", priority=EscalationPriority.MEDIUM, + target_role="assigned_rep", reason=EscalationReason.TIMEOUT, + suggested_action_ar="الاتصال بالعميل عبر قناة بديلة أو تصعيد للمدير"), AutoEscalationRule( - id="rule_low_confidence", - name_ar="ثقة ذكاء اصطناعي منخفضة", - condition="ai_confidence < 0.3", - priority=EscalationPriority.HIGH, - target_role="human_reviewer", - reason=EscalationReason.LOW_CONFIDENCE, - suggested_action_ar="مراجعة يدوية للقرار — الذكاء الاصطناعي غير واثق من النتيجة", - ), + id="rule_low_confidence", name_ar="ثقة ذكاء اصطناعي منخفضة", + condition="ai_confidence < 0.3", priority=EscalationPriority.HIGH, + target_role="human_reviewer", reason=EscalationReason.LOW_CONFIDENCE, + suggested_action_ar="مراجعة يدوية للقرار — الذكاء الاصطناعي غير واثق من النتيجة"), AutoEscalationRule( - id="rule_consent_expired", - name_ar="انتهاء موافقة PDPL", - condition="consent_expired == true", - priority=EscalationPriority.CRITICAL, - target_role="compliance", - reason=EscalationReason.CONSENT_EXPIRED, - suggested_action_ar="إيقاف جميع الاتصالات فوراً وطلب تجديد الموافقة", - ), + id="rule_consent_expired", name_ar="انتهاء موافقة PDPL", + condition="consent_expired == true", priority=EscalationPriority.CRITICAL, + target_role="compliance", reason=EscalationReason.CONSENT_EXPIRED, + suggested_action_ar="إيقاف جميع الاتصالات فوراً وطلب تجديد الموافقة"), AutoEscalationRule( - id="rule_delivery_failed_3x", - name_ar="فشل التوصيل 3 مرات متتالية", - condition="delivery_failures >= 3", - priority=EscalationPriority.MEDIUM, - target_role="assigned_rep", - reason=EscalationReason.DELIVERY_FAILURE, - suggested_action_ar="التحقق من رقم العميل واستخدام قناة بديلة (بريد إلكتروني أو SMS)", - ), + id="rule_delivery_failed_3x", name_ar="فشل التوصيل 3 مرات", + condition="delivery_failures >= 3", priority=EscalationPriority.MEDIUM, + target_role="assigned_rep", reason=EscalationReason.DELIVERY_FAILURE, + suggested_action_ar="التحقق من رقم العميل واستخدام قناة بديلة (بريد أو SMS)"), ] - -# ── Workflow resume registry ──────────────────────────────────────── - _workflow_resume_handlers: dict[str, Any] = {} def register_workflow_resume(workflow_name: str, handler: Any) -> None: _workflow_resume_handlers[workflow_name] = handler - logger.info("تسجيل معالج استئناف لسير العمل: %s", workflow_name) + logger.info("تسجيل معالج استئناف: %s", workflow_name) -# ── Escalation Service ────────────────────────────────────────────── - class EscalationService: """Manages human-in-the-loop escalation packets.""" @@ -218,129 +171,82 @@ class EscalationService: if not packet.resume_token: packet.resume_token = str(uuid.uuid4()) self._store[packet.id] = packet - logger.info( - "[Escalation] إنشاء تصعيد id=%s priority=%s reason=%s entity=%s/%s tenant=%s", - packet.id, packet.priority.value, packet.reason.value, - packet.entity_type, packet.entity_id, packet.tenant_id, - ) + logger.info("[Escalation] إنشاء id=%s priority=%s reason=%s entity=%s/%s", + packet.id, packet.priority.value, packet.reason.value, + packet.entity_type, packet.entity_id) return packet async def assign(self, escalation_id: str, user_id: str) -> Optional[EscalationPacket]: - packet = self._store.get(escalation_id) - if not packet: - logger.warning("[Escalation] تصعيد غير موجود: %s", escalation_id) + p = self._store.get(escalation_id) + if not p: return None - if packet.status == EscalationStatus.RESOLVED: - logger.warning("[Escalation] محاولة تعيين تصعيد محلول: %s", escalation_id) - return packet - packet.assigned_to = user_id - packet.status = EscalationStatus.IN_PROGRESS + if p.status == EscalationStatus.RESOLVED: + return p + p.assigned_to = user_id + p.status = EscalationStatus.IN_PROGRESS logger.info("[Escalation] تعيين %s إلى %s", escalation_id, user_id) - return packet - - async def resolve( - self, - escalation_id: str, - resolution: ResolutionInput, - user_id: str, - ) -> Optional[EscalationPacket]: - packet = self._store.get(escalation_id) - if not packet: - logger.warning("[Escalation] تصعيد غير موجود: %s", escalation_id) - return None - if packet.status == EscalationStatus.RESOLVED: - return packet + return p + async def resolve(self, escalation_id: str, resolution: ResolutionInput, + user_id: str) -> Optional[EscalationPacket]: + p = self._store.get(escalation_id) + if not p or p.status == EscalationStatus.RESOLVED: + return p now = datetime.now(timezone.utc) - packet.status = EscalationStatus.RESOLVED - packet.resolved_at = now - packet.resolution_data = { - "action_taken": resolution.action_taken, - "override_data": resolution.override_data, - "resolved_by": user_id, - } - packet.resolution_notes = resolution.notes - - self._history.append(packet) + p.status = EscalationStatus.RESOLVED + p.resolved_at = now + p.resolution_data = {"action_taken": resolution.action_taken, + "override_data": resolution.override_data, "resolved_by": user_id} + p.resolution_notes = resolution.notes + self._history.append(p) if len(self._history) > self._max_history: self._history = self._history[-self._max_history:] - - logger.info( - "[Escalation] حل تصعيد id=%s by=%s dur=%s", - escalation_id, user_id, - str(now - packet.created_at) if packet.created_at else "N/A", - ) - + logger.info("[Escalation] حل id=%s by=%s dur=%s", + escalation_id, user_id, str(now - p.created_at)) if resolution.resume_workflow: - await self._try_resume_workflow(packet) - - return packet + await self._try_resume(p) + return p async def resume_workflow(self, escalation_id: str) -> dict[str, Any]: - packet = self._store.get(escalation_id) - if not packet: + p = self._store.get(escalation_id) + if not p: return {"success": False, "error": "تصعيد غير موجود"} - if packet.status != EscalationStatus.RESOLVED: + if p.status != EscalationStatus.RESOLVED: return {"success": False, "error": "التصعيد لم يُحل بعد"} - return await self._try_resume_workflow(packet) + return await self._try_resume(p) - async def _try_resume_workflow(self, packet: EscalationPacket) -> dict[str, Any]: - handler = _workflow_resume_handlers.get(packet.workflow_name) + async def _try_resume(self, p: EscalationPacket) -> dict[str, Any]: + handler = _workflow_resume_handlers.get(p.workflow_name) if not handler: - logger.info( - "[Escalation] لا يوجد معالج استئناف لسير العمل: %s", - packet.workflow_name, - ) - return { - "success": False, - "error": f"لا يوجد معالج استئناف لـ {packet.workflow_name}", - } + return {"success": False, "error": f"لا يوجد معالج استئناف لـ {p.workflow_name}"} try: - result = await handler( - resume_token=packet.resume_token, - entity_type=packet.entity_type, - entity_id=packet.entity_id, - resolution_data=packet.resolution_data, - ) - logger.info( - "[Escalation] استئناف سير العمل %s للتصعيد %s", - packet.workflow_name, packet.id, - ) + result = await handler(resume_token=p.resume_token, entity_type=p.entity_type, + entity_id=p.entity_id, resolution_data=p.resolution_data) + logger.info("[Escalation] استئناف %s للتصعيد %s", p.workflow_name, p.id) return {"success": True, "result": result} except Exception as exc: - logger.exception("[Escalation] فشل استئناف سير العمل: %s", exc) + logger.exception("[Escalation] فشل استئناف: %s", exc) return {"success": False, "error": str(exc)} async def expire_overdue(self, tenant_id: str) -> int: - now = datetime.now(timezone.utc) - count = 0 - for packet in self._store.values(): - if ( - packet.tenant_id == tenant_id - and packet.status in (EscalationStatus.PENDING, EscalationStatus.IN_PROGRESS) - and packet.due_at < now - ): - packet.status = EscalationStatus.EXPIRED + now, count = datetime.now(timezone.utc), 0 + for p in self._store.values(): + if (p.tenant_id == tenant_id + and p.status in (EscalationStatus.PENDING, EscalationStatus.IN_PROGRESS) + and p.due_at < now): + p.status = EscalationStatus.EXPIRED count += 1 - logger.info("[Escalation] انتهاء صلاحية تصعيد: %s", packet.id) return count - async def list_pending( - self, - tenant_id: str, - priority: Optional[EscalationPriority] = None, - ) -> list[EscalationPacket]: - results = [ - p for p in self._store.values() - if p.tenant_id == tenant_id and p.status in ( - EscalationStatus.PENDING, EscalationStatus.IN_PROGRESS, - ) - ] + async def list_pending(self, tenant_id: str, + priority: Optional[EscalationPriority] = None) -> list[EscalationPacket]: + results = [p for p in self._store.values() + if p.tenant_id == tenant_id + and p.status in (EscalationStatus.PENDING, EscalationStatus.IN_PROGRESS)] if priority: results = [p for p in results if p.priority == priority] - results.sort(key=lambda p: ( - list(EscalationPriority).index(p.priority), p.created_at, - )) + prio_order = list(EscalationPriority) + results.sort(key=lambda p: (prio_order.index(p.priority), p.created_at)) return results async def get(self, escalation_id: str) -> Optional[EscalationPacket]: @@ -348,58 +254,36 @@ class EscalationService: async def get_stats(self, tenant_id: str) -> EscalationStats: now = datetime.now(timezone.utc) - tenant_packets = [p for p in self._store.values() if p.tenant_id == tenant_id] - - by_priority: dict[str, int] = defaultdict(int) + packets = [p for p in self._store.values() if p.tenant_id == tenant_id] + by_prio: dict[str, int] = defaultdict(int) by_status: dict[str, int] = defaultdict(int) by_reason: dict[str, int] = defaultdict(int) - resolution_times: list[float] = [] - oldest_pending_hours = 0.0 - overdue = 0 - - for p in tenant_packets: - by_priority[p.priority.value] += 1 + res_times: list[float] = [] + oldest_h, overdue = 0.0, 0 + for p in packets: + by_prio[p.priority.value] += 1 by_status[p.status.value] += 1 by_reason[p.reason.value] += 1 - - if p.status == EscalationStatus.RESOLVED and p.resolved_at and p.created_at: - resolution_times.append( - (p.resolved_at - p.created_at).total_seconds() / 60.0 - ) - + if p.status == EscalationStatus.RESOLVED and p.resolved_at: + res_times.append((p.resolved_at - p.created_at).total_seconds() / 60.0) if p.status in (EscalationStatus.PENDING, EscalationStatus.IN_PROGRESS): age_h = (now - p.created_at).total_seconds() / 3600.0 - oldest_pending_hours = max(oldest_pending_hours, age_h) + oldest_h = max(oldest_h, age_h) if p.due_at < now: overdue += 1 - - # Include resolved history for this tenant for p in self._history: - if p.tenant_id == tenant_id and p.id not in self._store: - if p.resolved_at and p.created_at: - resolution_times.append( - (p.resolved_at - p.created_at).total_seconds() / 60.0 - ) - + if p.tenant_id == tenant_id and p.id not in self._store and p.resolved_at: + res_times.append((p.resolved_at - p.created_at).total_seconds() / 60.0) return EscalationStats( - total=len(tenant_packets), - by_priority=dict(by_priority), - by_status=dict(by_status), + total=len(packets), by_priority=dict(by_prio), by_status=dict(by_status), by_reason=dict(by_reason), - avg_resolution_minutes=( - sum(resolution_times) / len(resolution_times) if resolution_times else 0.0 - ), - oldest_pending_hours=round(oldest_pending_hours, 2), - overdue_count=overdue, - ) + avg_resolution_minutes=sum(res_times) / len(res_times) if res_times else 0.0, + oldest_pending_hours=round(oldest_h, 2), overdue_count=overdue) - async def check_auto_escalation( - self, - tenant_id: str, - context: dict[str, Any], - ) -> Optional[EscalationPacket]: + async def check_auto_escalation(self, tenant_id: str, + context: dict[str, Any]) -> Optional[EscalationPacket]: for rule in self._rules: - if self._evaluate_rule(rule, context): + if self._eval(rule, context): packet = EscalationPacket( tenant_id=tenant_id, title=f"Auto-escalation: {rule.id}", @@ -408,36 +292,29 @@ class EscalationService: entity_id=context.get("entity_id", ""), workflow_name=context.get("workflow_name", ""), failed_step=context.get("current_step", ""), - reason=rule.reason, - priority=rule.priority, + reason=rule.reason, priority=rule.priority, risk_if_delayed_ar=rule.suggested_action_ar, suggested_action=rule.suggested_action_ar, suggested_action_ar=rule.suggested_action_ar, confidence=context.get("confidence", 0.0), - artifacts=[EscalationArtifact( - type="context", name="auto_escalation_context", - content=str(context), - )], - ) - created = await self.create(packet) - logger.info( - "[Escalation] تصعيد تلقائي rule=%s entity=%s/%s", - rule.id, packet.entity_type, packet.entity_id, - ) - return created + artifacts=[EscalationArtifact(type="context", name="auto_context", + content=str(context))]) + logger.info("[Escalation] تصعيد تلقائي rule=%s entity=%s/%s", + rule.id, packet.entity_type, packet.entity_id) + return await self.create(packet) return None @staticmethod - def _evaluate_rule(rule: AutoEscalationRule, context: dict[str, Any]) -> bool: - cond = rule.condition - if "deal_value_sar > 100000" in cond: - return context.get("deal_value_sar", 0) > 100_000 - if "days_since_last_response > 5" in cond: - return context.get("days_since_last_response", 0) > 5 - if "ai_confidence < 0.3" in cond: - return context.get("confidence", 1.0) < 0.3 - if "consent_expired == true" in cond: - return context.get("consent_expired", False) is True - if "delivery_failures >= 3" in cond: - return context.get("delivery_failures", 0) >= 3 + def _eval(rule: AutoEscalationRule, ctx: dict[str, Any]) -> bool: + c = rule.condition + if "deal_value_sar > 100000" in c: + return ctx.get("deal_value_sar", 0) > 100_000 + if "days_since_last_response > 5" in c: + return ctx.get("days_since_last_response", 0) > 5 + if "ai_confidence < 0.3" in c: + return ctx.get("confidence", 1.0) < 0.3 + if "consent_expired == true" in c: + return ctx.get("consent_expired", False) is True + if "delivery_failures >= 3" in c: + return ctx.get("delivery_failures", 0) >= 3 return False diff --git a/salesflow-saas/backend/app/services/skill_registry.py b/salesflow-saas/backend/app/services/skill_registry.py index 3434871f..f4e3aaa0 100644 --- a/salesflow-saas/backend/app/services/skill_registry.py +++ b/salesflow-saas/backend/app/services/skill_registry.py @@ -1,17 +1,10 @@ -""" -Skill Registry + Runtime — Dealix AI Revenue OS -نظام المهارات: تسجيل وإدارة وتنفيذ مهارات CRM بشكل آمن. -""" +"""Skill Registry + Runtime — Dealix AI Revenue OS — نظام المهارات""" from __future__ import annotations -import asyncio -import logging -import os -import uuid +import asyncio, logging, os, uuid from datetime import datetime, timezone from enum import Enum from typing import Any, Callable, Coroutine, Optional - from pydantic import BaseModel, Field logger = logging.getLogger(__name__) @@ -56,7 +49,6 @@ class SkillDefinition(BaseModel): version: str = "1.0.0" model_config = {"arbitrary_types_allowed": True} - class UserContext(BaseModel): user_id: str tenant_id: str @@ -77,14 +69,12 @@ class SkillResult(BaseModel): duration_ms: Optional[int] = None approval_request_id: Optional[str] = None - class SkillHealthReport(BaseModel): skill_id: str healthy: bool checked_at: datetime = Field(default_factory=lambda: datetime.now(timezone.utc)) error: Optional[str] = None - class SkillRegistry: """Manages all registered domain skills.""" @@ -138,7 +128,6 @@ class SkillRegistry: def get_handler(self, skill_id: str) -> Optional[Callable]: return self._handlers.get(skill_id) - class SkillRuntime: """Executes skills safely with validation, logging, and approval gating.""" diff --git a/salesflow-saas/backend/requirements.txt b/salesflow-saas/backend/requirements.txt index 801e1ecf..8b69be6d 100644 --- a/salesflow-saas/backend/requirements.txt +++ b/salesflow-saas/backend/requirements.txt @@ -1,37 +1,85 @@ +# === Core Framework === fastapi==0.115.5 uvicorn[standard]==0.32.1 pydantic==2.9.2 pydantic-settings==2.6.1 +pydantic-extra-types[phonenumbers]>=2.0.0 # Saudi phone validation (+966) python-multipart==0.0.12 + +# === Database === sqlalchemy==2.0.36 asyncpg==0.30.0 psycopg2-binary==2.9.10 alembic==1.14.0 pgvector==0.3.6 + +# === AI / LLM Providers === +litellm>=1.40.0 # Unified LLM provider (Groq/OpenAI/Claude/Gemini) with fallback +instructor>=1.14.0 # Structured LLM outputs via Pydantic models groq==0.12.0 openai==1.57.0 langchain==0.3.9 langchain-groq==0.2.1 langchain-community==0.3.9 +langchain-anthropic==0.2.0 langgraph==0.2.53 +crewai==0.80.0 +mem0ai==0.1.18 + +# === Arabic NLP === +camel-tools>=1.5.0 # Arabic morphology, NER, dialect detection (NYU Abu Dhabi) +pyarabic>=0.6.15 # Arabic text normalization, diacritics removal + +# === WhatsApp Business API === +pywa>=3.0.0 # Direct WhatsApp Cloud API (async, webhooks, templates) +twilio==9.3.7 # Twilio fallback + +# === Communication === httpx==0.27.2 +resend>=2.0.0 # Transactional email API (free tier, FastAPI-native) + +# === Saudi-specific === +hijridate>=2.4.0 # Hijri-Gregorian calendar (Umm al-Qura, official Saudi) +phonenumbers>=8.13.0 # Saudi phone number validation and formatting + +# === PDF Generation (Arabic RTL) === +weasyprint>=60.0 # HTML/CSS to PDF with Arabic RTL support + +# === Security === +PyJWT[crypto]>=2.8.0 # JWT (replaces abandoned python-jose) +passlib[bcrypt]==1.7.4 +bcrypt>=4.0.1,<5 +slowapi>=0.1.9 # API rate limiting with Redis backend + +# === Caching & Performance === +redis==5.2.0 +fastapi-cache2>=0.2.1 # Response caching with Redis backend +celery-redbeat>=2.2.0 # Dynamic Celery Beat scheduler (Redis-backed) + +# === Monitoring & Logging === +sentry-sdk[fastapi]>=2.0.0 # Error tracking + performance monitoring +prometheus-fastapi-instrumentator>=7.0.0 # Prometheus metrics +structlog>=24.0.0 # Structured JSON logging with tenant context + +# === Testing === +pytest>=8.0.0 +pytest-asyncio>=0.23.0 # Async test support +pytest-cov>=5.0.0 # Coverage reporting +factory-boy>=3.3.0 # Test data factories for SQLAlchemy models + +# === Forecasting === +statsforecast>=1.7.0 # Fast statistical time-series forecasting + +# === Data & Utilities === beautifulsoup4==4.12.3 lxml==5.3.0 -twilio==9.3.7 requests==2.32.3 python-dateutil==2.9.0 pandas==2.2.3 numpy==2.1.3 -python-jose[cryptography]==3.3.0 -passlib[bcrypt]==1.7.4 -bcrypt>=4.0.1,<5 python-decouple==3.8 -redis==5.2.0 paramiko==3.5.0 qrcode==8.0 Pillow==11.0.0 xmltodict==0.14.2 email-validator>=2.1.0 -crewai==0.80.0 -mem0ai==0.1.18 -langchain-anthropic==0.2.0 diff --git a/salesflow-saas/memory/patterns/library-decisions.md b/salesflow-saas/memory/patterns/library-decisions.md new file mode 100644 index 00000000..9519d1d3 --- /dev/null +++ b/salesflow-saas/memory/patterns/library-decisions.md @@ -0,0 +1,64 @@ +# Library Decisions — Dealix AI Revenue OS + +**Type**: pattern +**Date**: 2026-04-11 +**Status**: active + +## Added Libraries (Priority Order) + +### Immediate (Security + Core) +| Library | Why | Replaces | +|---------|-----|----------| +| `PyJWT[crypto]` | Active JWT library | `python-jose` (abandoned 3+ years) | +| `litellm` | Unified LLM provider with auto-fallback | Manual Groq→OpenAI switching | +| `sentry-sdk[fastapi]` | Production error tracking | None (was missing) | +| `slowapi` | API rate limiting | None (was missing) | +| `pydantic-extra-types[phonenumbers]` | Saudi +966 phone validation | None | + +### Arabic & Saudi +| Library | Why | +|---------|-----| +| `camel-tools` | Best Arabic NLP (NYU Abu Dhabi) — morphology, NER, dialect detection | +| `pyarabic` | Arabic text normalization before NLP processing | +| `hijridate` | Official Umm al-Qura Hijri calendar for Saudi UX | +| `phonenumbers` | Format/validate Saudi mobile numbers for WhatsApp | + +### Communication +| Library | Why | +|---------|-----| +| `pywa` | Direct WhatsApp Cloud API (cheaper than Twilio per-message) | +| `resend` | Transactional email with free tier | +| `weasyprint` | Arabic RTL PDF generation for invoices/quotes | + +### Performance & Monitoring +| Library | Why | +|---------|-----| +| `fastapi-cache2` | Redis-backed response caching (90% DB load reduction) | +| `celery-redbeat` | Dynamic Celery scheduling from Redis (no restart needed) | +| `prometheus-fastapi-instrumentator` | Prometheus metrics for Grafana dashboards | +| `structlog` | JSON structured logging with tenant_id context | + +### AI Enhancement +| Library | Why | +|---------|-----| +| `instructor` | Extract structured Pydantic models from LLM outputs | +| `statsforecast` | Fast time-series forecasting (500x faster than Prophet) | + +### Testing +| Library | Why | +|---------|-----| +| `pytest-asyncio` | Test async FastAPI endpoints | +| `pytest-cov` | Coverage reporting | +| `factory-boy` | Test data factories for SQLAlchemy models | + +## Rejected Libraries +| Library | Why Rejected | +|---------|-------------| +| `prophet` | Heavy dependencies (PyStan), statsforecast is faster | +| `elasticsearch` | Too heavy for current scale, use pg_trgm then Meilisearch | +| `apscheduler` | Already have Celery, celery-redbeat is better fit | +| `fatoora` | Abandoned (2022), built our own ZATCA QR in invoice_generator.py | + +## Migration Notes +- **python-jose → PyJWT**: Minor API change. `jose.jwt.decode()` → `jwt.decode()`. Same RSA/HS256 support. +- **Manual LLM fallback → litellm**: Replace `services/llm/provider.py` logic with `litellm.completion()` + fallback list.