diff --git a/salesflow-saas/backend/app/services/ai/forecasting.py b/salesflow-saas/backend/app/services/ai/forecasting.py new file mode 100644 index 00000000..55bf28a3 --- /dev/null +++ b/salesflow-saas/backend/app/services/ai/forecasting.py @@ -0,0 +1,495 @@ +""" +Sales Forecasting Engine — Predicts revenue, calculates deal-close probability, +identifies at-risk deals, and generates Arabic forecast summaries. +""" + +import logging +from dataclasses import dataclass, field +from datetime import datetime, timezone, timedelta +from typing import Optional + +from sqlalchemy import select, func, and_ +from sqlalchemy.ext.asyncio import AsyncSession + +from app.services.llm.provider import get_llm + +logger = logging.getLogger(__name__) + +# --------------------------------------------------------------------------- +# Data models +# --------------------------------------------------------------------------- + +@dataclass +class DealForecast: + deal_id: str + deal_name: str + current_value: float + close_probability: float # 0.0-1.0 + weighted_value: float + risk_level: str # "low", "medium", "high" + risk_reasons_ar: list[str] = field(default_factory=list) + days_inactive: int = 0 + expected_close_date: Optional[str] = None + + +@dataclass +class PeriodForecast: + period_label: str # "2026-04", "Q2 2026" + predicted_revenue: float + weighted_pipeline: float + deal_count: int + avg_close_probability: float + best_case: float + worst_case: float + + +@dataclass +class ForecastResult: + tenant_id: str + period: str # "monthly" or "quarterly" + generated_at: str + periods: list[PeriodForecast] = field(default_factory=list) + at_risk_deals: list[DealForecast] = field(default_factory=list) + top_deals: list[DealForecast] = field(default_factory=list) + summary_ar: str = "" + summary_en: str = "" + total_pipeline_value: float = 0.0 + total_weighted_value: float = 0.0 + recommendations_ar: list[str] = field(default_factory=list) + + +# --------------------------------------------------------------------------- +# Constants +# --------------------------------------------------------------------------- + +# Stage-based probability defaults +STAGE_PROBABILITIES = { + "new": 0.10, + "contacted": 0.15, + "qualified": 0.25, + "proposal_sent": 0.45, + "negotiation": 0.65, + "verbal_agreement": 0.80, + "contract_sent": 0.85, + "closed_won": 1.00, + "closed_lost": 0.00, +} + +# Inactivity thresholds (days) +RISK_THRESHOLDS = { + "high": 14, + "medium": 7, + "low": 3, +} + +MONTHS_AR = { + 1: "يناير", 2: "فبراير", 3: "مارس", 4: "أبريل", + 5: "مايو", 6: "يونيو", 7: "يوليو", 8: "أغسطس", + 9: "سبتمبر", 10: "أكتوبر", 11: "نوفمبر", 12: "ديسمبر", +} + + +# --------------------------------------------------------------------------- +# Service +# --------------------------------------------------------------------------- + +class SalesForecastingEngine: + """Predicts revenue and identifies at-risk deals.""" + + def __init__(self): + self._llm = get_llm() + + async def generate_forecast( + self, tenant_id: str, period: str, db: AsyncSession + ) -> ForecastResult: + """ + Generate sales forecast. + + Args: + tenant_id: Tenant UUID string + period: "monthly" or "quarterly" + db: Async database session + + Returns: + ForecastResult with predictions, at-risk deals, and summaries. + """ + now = datetime.now(timezone.utc) + + try: + deals_data = await self._fetch_deals(tenant_id, db) + except Exception as e: + logger.error(f"Failed to fetch deals for tenant {tenant_id}: {e}") + return ForecastResult( + tenant_id=tenant_id, + period=period, + generated_at=now.isoformat(), + summary_ar="تعذر إنشاء التوقعات — لم يتم العثور على بيانات الصفقات", + summary_en="Forecast generation failed - deal data not found", + ) + + # Calculate individual deal forecasts + deal_forecasts = self._calculate_deal_forecasts(deals_data, now) + + # Group by period + periods = self._group_by_period(deal_forecasts, period, now) + + # Identify at-risk and top deals + at_risk = sorted( + [d for d in deal_forecasts if d.risk_level in ("high", "medium")], + key=lambda d: d.days_inactive, + reverse=True, + )[:10] + + top_deals = sorted( + [d for d in deal_forecasts if d.close_probability > 0.0], + key=lambda d: d.weighted_value, + reverse=True, + )[:10] + + # Totals + total_pipeline = sum(d.current_value for d in deal_forecasts if d.close_probability > 0) + total_weighted = sum(d.weighted_value for d in deal_forecasts) + + # Generate summaries + summary_ar = self._build_summary_ar(periods, at_risk, total_pipeline, total_weighted, period) + summary_en = self._build_summary_en(periods, at_risk, total_pipeline, total_weighted, period) + recommendations = await self._generate_recommendations(periods, at_risk, total_pipeline) + + return ForecastResult( + tenant_id=tenant_id, + period=period, + generated_at=now.isoformat(), + periods=periods, + at_risk_deals=at_risk, + top_deals=top_deals, + summary_ar=summary_ar, + summary_en=summary_en, + total_pipeline_value=round(total_pipeline, 2), + total_weighted_value=round(total_weighted, 2), + recommendations_ar=recommendations, + ) + + # ── Data Fetching ──────────────────────────── + + async def _fetch_deals(self, tenant_id: str, db: AsyncSession) -> list[dict]: + """Fetch active deals for the tenant.""" + from app.models.deal import Deal + + stmt = ( + select(Deal) + .where( + Deal.tenant_id == tenant_id, + Deal.status.notin_(["closed_lost", "archived"]), + ) + .order_by(Deal.created_at.desc()) + ) + rows = await db.execute(stmt) + deals = [] + for deal in rows.scalars().all(): + last_activity = await self._get_last_activity_date(str(deal.id), db) + deals.append({ + "id": str(deal.id), + "name": getattr(deal, "name", "") or getattr(deal, "title", "") or "", + "value": float(getattr(deal, "value", 0) or getattr(deal, "amount", 0) or 0), + "stage": getattr(deal, "stage", "new") or "new", + "status": getattr(deal, "status", "active") or "active", + "expected_close": getattr(deal, "expected_close_date", None) or getattr(deal, "close_date", None), + "created_at": getattr(deal, "created_at", None), + "updated_at": getattr(deal, "updated_at", None), + "last_activity": last_activity, + }) + return deals + + async def _get_last_activity_date(self, deal_id: str, db: AsyncSession) -> Optional[datetime]: + """Get the most recent activity date for a deal.""" + try: + from app.models.activity import Activity + stmt = ( + select(func.max(Activity.created_at)) + .where(Activity.deal_id == deal_id) + ) + result = await db.execute(stmt) + return result.scalar_one_or_none() + except Exception: + return None + + # ── Deal Forecast Calculation ──────────────── + + def _calculate_deal_forecasts( + self, deals_data: list[dict], now: datetime + ) -> list[DealForecast]: + """Calculate forecast for each deal.""" + forecasts = [] + for deal in deals_data: + base_prob = STAGE_PROBABILITIES.get(deal["stage"], 0.15) + + # Adjust probability based on activity recency + days_inactive = self._days_since(deal.get("last_activity") or deal.get("updated_at"), now) + activity_modifier = self._activity_modifier(days_inactive) + adjusted_prob = max(0.0, min(1.0, base_prob * activity_modifier)) + + # Determine risk level + risk_level, risk_reasons = self._assess_risk(deal, days_inactive, adjusted_prob) + + value = deal.get("value", 0) or 0 + weighted = value * adjusted_prob + + expected_close = deal.get("expected_close") + expected_close_str = None + if expected_close: + if isinstance(expected_close, datetime): + expected_close_str = expected_close.strftime("%Y-%m-%d") + elif isinstance(expected_close, str): + expected_close_str = expected_close + + forecasts.append(DealForecast( + deal_id=deal["id"], + deal_name=deal.get("name", ""), + current_value=value, + close_probability=round(adjusted_prob, 2), + weighted_value=round(weighted, 2), + risk_level=risk_level, + risk_reasons_ar=risk_reasons, + days_inactive=days_inactive, + expected_close_date=expected_close_str, + )) + return forecasts + + @staticmethod + def _days_since(dt: Optional[datetime], now: datetime) -> int: + """Calculate days since a given datetime.""" + if not dt: + return 30 # assume inactive if no date + if dt.tzinfo is None: + dt = dt.replace(tzinfo=timezone.utc) + delta = now - dt + return max(0, delta.days) + + @staticmethod + def _activity_modifier(days_inactive: int) -> float: + """Reduce probability based on inactivity.""" + if days_inactive <= 2: + return 1.0 + elif days_inactive <= 5: + return 0.95 + elif days_inactive <= 10: + return 0.85 + elif days_inactive <= 14: + return 0.7 + elif days_inactive <= 21: + return 0.5 + elif days_inactive <= 30: + return 0.3 + else: + return 0.15 + + @staticmethod + def _assess_risk(deal: dict, days_inactive: int, probability: float) -> tuple[str, list[str]]: + """Assess deal risk level and generate Arabic reasons.""" + reasons = [] + + if days_inactive >= RISK_THRESHOLDS["high"]: + reasons.append(f"لا يوجد نشاط منذ {days_inactive} يوم") + elif days_inactive >= RISK_THRESHOLDS["medium"]: + reasons.append(f"النشاط منخفض — آخر تفاعل قبل {days_inactive} أيام") + + if probability < 0.2 and deal.get("value", 0) > 0: + reasons.append("احتمالية الإغلاق منخفضة جداً") + + stage = deal.get("stage", "") + expected_close = deal.get("expected_close") + if expected_close: + close_dt = expected_close + if isinstance(close_dt, str): + try: + close_dt = datetime.fromisoformat(close_dt.replace("Z", "+00:00")) + except ValueError: + close_dt = None + if close_dt: + if isinstance(close_dt, datetime): + if close_dt.tzinfo is None: + close_dt = close_dt.replace(tzinfo=timezone.utc) + now = datetime.now(timezone.utc) + if close_dt < now: + reasons.append("تجاوز تاريخ الإغلاق المتوقع") + elif (close_dt - now).days < 7 and stage in ("new", "contacted", "qualified"): + reasons.append("تاريخ الإغلاق قريب لكن المرحلة مبكرة") + + if len(reasons) >= 2 or days_inactive >= RISK_THRESHOLDS["high"]: + return "high", reasons + elif len(reasons) >= 1 or days_inactive >= RISK_THRESHOLDS["medium"]: + return "medium", reasons + else: + return "low", reasons + + # ── Period Grouping ────────────────────────── + + def _group_by_period( + self, forecasts: list[DealForecast], period: str, now: datetime + ) -> list[PeriodForecast]: + """Group deal forecasts into monthly or quarterly buckets.""" + periods: dict[str, list[DealForecast]] = {} + + for deal in forecasts: + if not deal.expected_close_date: + # Default to current month/quarter + period_key = self._get_period_key(now, period) + else: + try: + close_dt = datetime.fromisoformat(deal.expected_close_date) + except ValueError: + close_dt = now + period_key = self._get_period_key(close_dt, period) + + periods.setdefault(period_key, []).append(deal) + + result = [] + for label in sorted(periods.keys()): + deals = periods[label] + total_pipeline = sum(d.current_value for d in deals) + total_weighted = sum(d.weighted_value for d in deals) + probabilities = [d.close_probability for d in deals if d.close_probability > 0] + avg_prob = sum(probabilities) / len(probabilities) if probabilities else 0 + + # Best/worst case + best_case = sum(d.current_value for d in deals if d.close_probability >= 0.5) + worst_case = sum(d.current_value for d in deals if d.close_probability >= 0.8) + + result.append(PeriodForecast( + period_label=label, + predicted_revenue=round(total_weighted, 2), + weighted_pipeline=round(total_pipeline, 2), + deal_count=len(deals), + avg_close_probability=round(avg_prob, 2), + best_case=round(best_case, 2), + worst_case=round(worst_case, 2), + )) + + return result + + @staticmethod + def _get_period_key(dt: datetime, period: str) -> str: + """Generate a period label string.""" + if period == "quarterly": + quarter = (dt.month - 1) // 3 + 1 + return f"Q{quarter} {dt.year}" + else: + month_name = MONTHS_AR.get(dt.month, str(dt.month)) + return f"{month_name} {dt.year}" + + # ── Summary Generation ─────────────────────── + + @staticmethod + def _build_summary_ar( + periods: list[PeriodForecast], + at_risk: list[DealForecast], + total_pipeline: float, + total_weighted: float, + period_type: str, + ) -> str: + lines = ["ملخص التوقعات:"] + lines.append(f"إجمالي خط الأنابيب: {total_pipeline:,.0f} ريال") + lines.append(f"الإيراد المتوقع (مرجّح): {total_weighted:,.0f} ريال") + lines.append("") + + period_label = "الشهر" if period_type == "monthly" else "الربع" + for p in periods[:4]: + lines.append( + f"{p.period_label}: {p.predicted_revenue:,.0f} ريال متوقع " + f"({p.deal_count} صفقة، احتمالية {p.avg_close_probability:.0%})" + ) + + if at_risk: + lines.append("") + lines.append(f"صفقات معرّضة للخطر ({len(at_risk)}):") + for deal in at_risk[:5]: + reasons = " | ".join(deal.risk_reasons_ar) if deal.risk_reasons_ar else "غير محدد" + lines.append(f" - {deal.deal_name}: {deal.current_value:,.0f} ريال — {reasons}") + + return "\n".join(lines) + + @staticmethod + def _build_summary_en( + periods: list[PeriodForecast], + at_risk: list[DealForecast], + total_pipeline: float, + total_weighted: float, + period_type: str, + ) -> str: + lines = ["Forecast Summary:"] + lines.append(f"Total Pipeline: {total_pipeline:,.0f} SAR") + lines.append(f"Weighted Revenue: {total_weighted:,.0f} SAR") + for p in periods[:4]: + lines.append( + f"{p.period_label}: {p.predicted_revenue:,.0f} SAR predicted " + f"({p.deal_count} deals, {p.avg_close_probability:.0%} avg probability)" + ) + if at_risk: + lines.append(f"At-risk deals: {len(at_risk)}") + return "\n".join(lines) + + # ── AI Recommendations ─────────────────────── + + async def _generate_recommendations( + self, + periods: list[PeriodForecast], + at_risk: list[DealForecast], + total_pipeline: float, + ) -> list[str]: + """Generate Arabic recommendations using LLM.""" + if not periods and not at_risk: + return ["لا توجد بيانات كافية لتقديم توصيات. أضف صفقات لخط الأنابيب."] + + # Build context for LLM + context_parts = [] + for p in periods[:3]: + context_parts.append( + f"{p.period_label}: إيراد متوقع {p.predicted_revenue:,.0f} ريال، " + f"{p.deal_count} صفقة" + ) + if at_risk: + context_parts.append(f"صفقات معرضة للخطر: {len(at_risk)}") + for d in at_risk[:3]: + context_parts.append(f" - {d.deal_name}: {d.current_value:,.0f} ريال، غير نشط {d.days_inactive} يوم") + + context_text = "\n".join(context_parts) + + system_prompt = ( + "أنت مستشار مبيعات خبير في السوق السعودي.\n" + "بناءً على بيانات التوقعات التالية، قدّم 3-5 توصيات عملية وقابلة للتنفيذ بالعربي.\n" + "أجب بصيغة JSON: {\"recommendations\": [\"توصية1\", \"توصية2\", ...]}" + ) + + try: + response = await self._llm.complete( + system_prompt=system_prompt, + user_message=context_text, + json_mode=True, + temperature=0.3, + max_tokens=512, + fast=True, + ) + parsed = response.parse_json() + if parsed and "recommendations" in parsed: + return parsed["recommendations"] + except Exception as e: + logger.warning(f"LLM recommendation generation failed: {e}") + + # Fallback static recommendations + recommendations = [] + if at_risk: + recommendations.append( + f"تنبيه: {len(at_risk)} صفقة معرّضة للخطر. تواصل مع العملاء فوراً لإعادة التفعيل." + ) + if total_pipeline < 100000: + recommendations.append( + "خط الأنابيب منخفض. ركّز على توليد عملاء محتملين جدد هذا الأسبوع." + ) + if periods: + low_prob_periods = [p for p in periods if p.avg_close_probability < 0.3] + if low_prob_periods: + recommendations.append( + "احتمالية الإغلاق منخفضة في بعض الفترات. راجع تأهيل العملاء المحتملين." + ) + recommendations.append("حدّث بيانات الصفقات بانتظام لتحسين دقة التوقعات.") + return recommendations diff --git a/salesflow-saas/backend/app/services/sequence_engine.py b/salesflow-saas/backend/app/services/sequence_engine.py index 4b7aa28e..798a5583 100644 --- a/salesflow-saas/backend/app/services/sequence_engine.py +++ b/salesflow-saas/backend/app/services/sequence_engine.py @@ -1,9 +1,6 @@ """Multi-channel sequence engine for Dealix CRM. - -Orchestrates ordered outreach steps across WhatsApp, email, and SMS -with PDPL consent checks, A/B testing, and analytics. +Orchestrates outreach steps across WhatsApp, email, SMS with PDPL consent checks and A/B testing. """ - import logging import random from datetime import datetime, timedelta, timezone @@ -11,7 +8,7 @@ from typing import Optional from uuid import UUID from pydantic import BaseModel as Schema -from sqlalchemy import select, func, and_ +from sqlalchemy import select, func from sqlalchemy.ext.asyncio import AsyncSession from app.models.sequence import ( @@ -23,10 +20,6 @@ from app.services.pdpl.consent_manager import ConsentManager logger = logging.getLogger(__name__) -# --------------------------------------------------------------------------- -# Pydantic schemas -# --------------------------------------------------------------------------- - class SequenceCreateInput(Schema): tenant_id: UUID name: str @@ -67,83 +60,54 @@ class SequenceAnalytics(Schema): conversion_rate: float -# --------------------------------------------------------------------------- -# SequenceEngine -# --------------------------------------------------------------------------- - class SequenceEngine: """Manages multi-channel outreach sequences.""" def __init__(self, db: AsyncSession): self.db = db - # -- create sequence ----------------------------------------------------- - async def create_sequence(self, data: SequenceCreateInput) -> Sequence: """Create a new sequence with optional steps.""" - seq = Sequence( - tenant_id=data.tenant_id, - name=data.name, - name_ar=data.name_ar, - description=data.description, - trigger_event=data.trigger_event, - is_active=True, - created_by=data.created_by, + tenant_id=data.tenant_id, name=data.name, name_ar=data.name_ar, + description=data.description, trigger_event=data.trigger_event, + is_active=True, created_by=data.created_by, ) self.db.add(seq) await self.db.flush() - - for i, step_data in enumerate(data.steps): - step = SequenceStep( - sequence_id=seq.id, - step_order=i + 1, - channel=step_data.get("channel", "email"), - delay_minutes=step_data.get("delay_minutes", 0), - template_content=step_data.get("template_content", ""), - template_content_ar=step_data.get("template_content_ar"), - variant=step_data.get("variant"), - conditions=step_data.get("conditions", {}), - ) - self.db.add(step) - + for i, sd in enumerate(data.steps): + self.db.add(SequenceStep( + sequence_id=seq.id, step_order=i + 1, + channel=sd.get("channel", "email"), delay_minutes=sd.get("delay_minutes", 0), + template_content=sd.get("template_content", ""), + template_content_ar=sd.get("template_content_ar"), + variant=sd.get("variant"), conditions=sd.get("conditions", {}), + )) await self.db.flush() await self.db.refresh(seq) logger.info("Sequence created: id=%s name=%s", seq.id, seq.name) return seq - # -- enroll lead --------------------------------------------------------- - async def enroll_lead(self, data: EnrollInput) -> SequenceEnrollment: - """Enroll a lead into a sequence. Starts at step 0.""" - - # Prevent duplicate active enrollments - existing = await self.db.execute( + """Enroll a lead into a sequence.""" + existing = (await self.db.execute( select(SequenceEnrollment).where( SequenceEnrollment.sequence_id == data.sequence_id, SequenceEnrollment.lead_id == data.lead_id, SequenceEnrollment.status == SequenceStatus.ACTIVE.value, ) - ) - if existing.scalar_one_or_none(): - raise ValueError("العميل المحتمل مسجل بالفعل في هذا التسلسل") # Lead already enrolled - - # Fetch first step to calculate next_step_at - first_step = await self.db.execute( + )).scalar_one_or_none() + if existing: + raise ValueError("العميل المحتمل مسجل بالفعل في هذا التسلسل") + first_step = (await self.db.execute( select(SequenceStep).where(SequenceStep.sequence_id == data.sequence_id) .order_by(SequenceStep.step_order).limit(1) - ) - step = first_step.scalar_one_or_none() + )).scalar_one_or_none() now = datetime.now(timezone.utc) - next_at = now + timedelta(minutes=step.delay_minutes) if step else None - enrollment = SequenceEnrollment( - sequence_id=data.sequence_id, - lead_id=data.lead_id, - current_step=0, - status=SequenceStatus.ACTIVE.value, - enrolled_at=now, - next_step_at=next_at, + sequence_id=data.sequence_id, lead_id=data.lead_id, current_step=0, + status=SequenceStatus.ACTIVE.value, enrolled_at=now, + next_step_at=now + timedelta(minutes=first_step.delay_minutes) if first_step else None, ) self.db.add(enrollment) await self.db.flush() @@ -151,207 +115,135 @@ class SequenceEngine: logger.info("Lead enrolled: lead=%s sequence=%s", data.lead_id, data.sequence_id) return enrollment - # -- process pending steps ----------------------------------------------- - async def process_pending_steps(self, tenant_id: UUID) -> list[StepProcessResult]: - """Process all enrollments whose next step is due. Checks PDPL consent.""" - + """Process enrollments whose next step is due. Checks PDPL consent.""" now = datetime.now(timezone.utc) consent_mgr = ConsentManager(self.db) results: list[StepProcessResult] = [] - - # Find active enrollments that are due - query = ( + rows = await self.db.execute( select(SequenceEnrollment) .join(Sequence, Sequence.id == SequenceEnrollment.sequence_id) - .where( - Sequence.tenant_id == tenant_id, - Sequence.is_active == True, - SequenceEnrollment.status == SequenceStatus.ACTIVE.value, - SequenceEnrollment.next_step_at <= now, - ) + .where(Sequence.tenant_id == tenant_id, Sequence.is_active == True, + SequenceEnrollment.status == SequenceStatus.ACTIVE.value, + SequenceEnrollment.next_step_at <= now) .limit(200) ) - rows = await self.db.execute(query) - enrollments = rows.scalars().all() - - for enrollment in enrollments: - result = await self._execute_next_step(enrollment, consent_mgr) - if result: - results.append(result) - + for enrollment in rows.scalars().all(): + r = await self._execute_next_step(enrollment, consent_mgr) + if r: + results.append(r) logger.info("Processed %d pending steps for tenant=%s", len(results), tenant_id) return results - # -- pause / resume / stop ----------------------------------------------- - async def pause_enrollment(self, enrollment_id: UUID) -> SequenceEnrollment: - enrollment = await self._get_enrollment(enrollment_id) - enrollment.status = SequenceStatus.PAUSED.value + e = await self._get_enrollment(enrollment_id) + e.status = SequenceStatus.PAUSED.value await self.db.flush() - logger.info("Enrollment paused: %s", enrollment_id) - return enrollment + return e async def resume_enrollment(self, enrollment_id: UUID) -> SequenceEnrollment: - enrollment = await self._get_enrollment(enrollment_id) - enrollment.status = SequenceStatus.ACTIVE.value - enrollment.next_step_at = datetime.now(timezone.utc) + e = await self._get_enrollment(enrollment_id) + e.status = SequenceStatus.ACTIVE.value + e.next_step_at = datetime.now(timezone.utc) await self.db.flush() - logger.info("Enrollment resumed: %s", enrollment_id) - return enrollment + return e async def stop_enrollment(self, enrollment_id: UUID) -> SequenceEnrollment: - enrollment = await self._get_enrollment(enrollment_id) - enrollment.status = SequenceStatus.STOPPED.value - enrollment.completed_at = datetime.now(timezone.utc) + e = await self._get_enrollment(enrollment_id) + e.status = SequenceStatus.STOPPED.value + e.completed_at = datetime.now(timezone.utc) await self.db.flush() - logger.info("Enrollment stopped: %s", enrollment_id) - return enrollment - - # -- analytics ----------------------------------------------------------- + return e async def get_sequence_analytics(self, sequence_id: UUID) -> SequenceAnalytics: """Compute open/response/conversion rates for a sequence.""" - seq = (await self.db.execute( - select(Sequence).where(Sequence.id == sequence_id) - )).scalar_one_or_none() + select(Sequence).where(Sequence.id == sequence_id))).scalar_one_or_none() if not seq: raise ValueError("التسلسل غير موجود") - # Enrollment counts - def _count_enrollments(status: str): - return select(func.count()).where( - SequenceEnrollment.sequence_id == sequence_id, - SequenceEnrollment.status == status, - ) + async def _enroll_count(st: str) -> int: + return (await self.db.execute( + select(func.count()).where(SequenceEnrollment.sequence_id == sequence_id, + SequenceEnrollment.status == st))).scalar() or 0 total = (await self.db.execute( - select(func.count()).where(SequenceEnrollment.sequence_id == sequence_id) - )).scalar() or 0 - active = (await self.db.execute(_count_enrollments(SequenceStatus.ACTIVE.value))).scalar() or 0 - completed = (await self.db.execute(_count_enrollments(SequenceStatus.COMPLETED.value))).scalar() or 0 - stopped = (await self.db.execute(_count_enrollments(SequenceStatus.STOPPED.value))).scalar() or 0 + select(func.count()).where(SequenceEnrollment.sequence_id == sequence_id))).scalar() or 0 + active = await _enroll_count(SequenceStatus.ACTIVE.value) + completed = await _enroll_count(SequenceStatus.COMPLETED.value) + stopped = await _enroll_count(SequenceStatus.STOPPED.value) - # Event counts - base_event = ( - select(func.count()) - .select_from(SequenceEvent) - .join(SequenceEnrollment, SequenceEnrollment.id == SequenceEvent.enrollment_id) - .where(SequenceEnrollment.sequence_id == sequence_id) - ) - - total_sent = (await self.db.execute(base_event)).scalar() or 0 + base = (select(func.count()).select_from(SequenceEvent) + .join(SequenceEnrollment, SequenceEnrollment.id == SequenceEvent.enrollment_id) + .where(SequenceEnrollment.sequence_id == sequence_id)) + total_sent = (await self.db.execute(base)).scalar() or 0 delivered = (await self.db.execute( - base_event.where(SequenceEvent.status.in_(["delivered", "opened", "replied"])) - )).scalar() or 0 + base.where(SequenceEvent.status.in_(["delivered", "opened", "replied"])))).scalar() or 0 opened = (await self.db.execute( - base_event.where(SequenceEvent.status.in_(["opened", "replied"])) - )).scalar() or 0 + base.where(SequenceEvent.status.in_(["opened", "replied"])))).scalar() or 0 replied = (await self.db.execute( - base_event.where(SequenceEvent.status == "replied") - )).scalar() or 0 + base.where(SequenceEvent.status == "replied"))).scalar() or 0 failed = (await self.db.execute( - base_event.where(SequenceEvent.status == "failed") - )).scalar() or 0 - - safe_div = lambda n, d: round(n / d * 100, 2) if d else 0.0 + base.where(SequenceEvent.status == "failed"))).scalar() or 0 + safe = lambda n, d: round(n / d * 100, 2) if d else 0.0 return SequenceAnalytics( - sequence_id=sequence_id, - name=seq.name, - total_enrolled=total, - active=active, - completed=completed, - stopped=stopped, - total_sent=total_sent, - delivered=delivered, - opened=opened, - replied=replied, - failed=failed, - open_rate=safe_div(opened, total_sent), - reply_rate=safe_div(replied, total_sent), - conversion_rate=safe_div(completed, total) if total else 0.0, + sequence_id=sequence_id, name=seq.name, total_enrolled=total, + active=active, completed=completed, stopped=stopped, + total_sent=total_sent, delivered=delivered, opened=opened, + replied=replied, failed=failed, + open_rate=safe(opened, total_sent), reply_rate=safe(replied, total_sent), + conversion_rate=safe(completed, total), ) - # -- private helpers ----------------------------------------------------- - - async def _execute_next_step( - self, enrollment: SequenceEnrollment, consent_mgr: ConsentManager, - ) -> Optional[StepProcessResult]: - """Execute the next step for an enrollment.""" - - steps_q = await self.db.execute( - select(SequenceStep) - .where(SequenceStep.sequence_id == enrollment.sequence_id) - .order_by(SequenceStep.step_order) - ) - steps = steps_q.scalars().all() - next_idx = enrollment.current_step - if next_idx >= len(steps): + async def _execute_next_step(self, enrollment: SequenceEnrollment, + consent_mgr: ConsentManager) -> Optional[StepProcessResult]: + steps = (await self.db.execute( + select(SequenceStep).where(SequenceStep.sequence_id == enrollment.sequence_id) + .order_by(SequenceStep.step_order))).scalars().all() + idx = enrollment.current_step + if idx >= len(steps): enrollment.status = SequenceStatus.COMPLETED.value enrollment.completed_at = datetime.now(timezone.utc) await self.db.flush() return None - - # A/B test: pick variant randomly if multiple exist for same order - candidates = [s for s in steps if s.step_order == steps[next_idx].step_order] - step = random.choice(candidates) if len(candidates) > 1 else steps[next_idx] - - # PDPL consent check - seq = (await self.db.execute( - select(Sequence).where(Sequence.id == enrollment.sequence_id) - )).scalar_one() - consent_result = await consent_mgr.check_consent( - contact_id=enrollment.lead_id, - purpose="marketing", - channel=step.channel, - ) - if not consent_result.allowed: - event = SequenceEvent( - enrollment_id=enrollment.id, step_id=step.id, - channel=step.channel, status=SequenceEventStatus.FAILED.value, - metadata={"reason": "no_consent", "message": consent_result.message}, - ) - self.db.add(event) + # A/B variant selection + candidates = [s for s in steps if s.step_order == steps[idx].step_order] + step = random.choice(candidates) if len(candidates) > 1 else steps[idx] + # PDPL consent gate + cr = await consent_mgr.check_consent(enrollment.lead_id, "marketing", step.channel) + if not cr.allowed: + self.db.add(SequenceEvent( + enrollment_id=enrollment.id, step_id=step.id, channel=step.channel, + status=SequenceEventStatus.FAILED.value, + metadata={"reason": "no_consent", "detail": cr.message}, + )) enrollment.status = SequenceStatus.STOPPED.value await self.db.flush() - return StepProcessResult( - enrollment_id=enrollment.id, step_id=step.id, - channel=step.channel, status="failed", - message=f"PDPL consent denied: {consent_result.message}", - ) - - # Record send event - event = SequenceEvent( - enrollment_id=enrollment.id, step_id=step.id, - channel=step.channel, status=SequenceEventStatus.SENT.value, - metadata={"variant": step.variant, "template_preview": step.template_content[:100]}, - ) - self.db.add(event) - - # Advance enrollment - enrollment.current_step = next_idx + 1 + return StepProcessResult(enrollment_id=enrollment.id, step_id=step.id, + channel=step.channel, status="failed", + message=f"PDPL consent denied: {cr.message}") + self.db.add(SequenceEvent( + enrollment_id=enrollment.id, step_id=step.id, channel=step.channel, + status=SequenceEventStatus.SENT.value, + metadata={"variant": step.variant, "preview": step.template_content[:80]}, + )) + enrollment.current_step = idx + 1 if enrollment.current_step >= len(steps): enrollment.status = SequenceStatus.COMPLETED.value enrollment.completed_at = datetime.now(timezone.utc) enrollment.next_step_at = None else: - next_step = steps[enrollment.current_step] - enrollment.next_step_at = datetime.now(timezone.utc) + timedelta(minutes=next_step.delay_minutes) - + enrollment.next_step_at = datetime.now(timezone.utc) + timedelta(minutes=steps[enrollment.current_step].delay_minutes) await self.db.flush() - return StepProcessResult( - enrollment_id=enrollment.id, step_id=step.id, - channel=step.channel, status="sent", - message=f"Step {next_idx + 1} sent via {step.channel}", - ) + return StepProcessResult(enrollment_id=enrollment.id, step_id=step.id, + channel=step.channel, status="sent", + message=f"Step {idx + 1} sent via {step.channel}") async def _get_enrollment(self, enrollment_id: UUID) -> SequenceEnrollment: result = await self.db.execute( - select(SequenceEnrollment).where(SequenceEnrollment.id == enrollment_id) - ) - enrollment = result.scalar_one_or_none() - if not enrollment: - raise ValueError("التسجيل غير موجود") # Enrollment not found - return enrollment + select(SequenceEnrollment).where(SequenceEnrollment.id == enrollment_id)) + e = result.scalar_one_or_none() + if not e: + raise ValueError("التسجيل غير موجود") + return e diff --git a/salesflow-saas/seeds/education_template.json b/salesflow-saas/seeds/education_template.json new file mode 100644 index 00000000..a059c823 --- /dev/null +++ b/salesflow-saas/seeds/education_template.json @@ -0,0 +1,127 @@ +{ + "industry": "education", + "name": "Education & Training", + "name_ar": "تعليم وتدريب", + "pipeline_stages": [ + {"key": "inquiry", "name_en": "Inquiry", "name_ar": "استفسار", "order": 1, "probability": 10}, + {"key": "consultation", "name_en": "Consultation", "name_ar": "استشارة", "order": 2, "probability": 25}, + {"key": "registration", "name_en": "Registration", "name_ar": "تسجيل", "order": 3, "probability": 50}, + {"key": "payment", "name_en": "Payment", "name_ar": "دفع", "order": 4, "probability": 75}, + {"key": "enrolled", "name_en": "Enrolled", "name_ar": "مسجّل", "order": 5, "probability": 90}, + {"key": "completed", "name_en": "Completed", "name_ar": "مكتمل", "order": 6, "probability": 100}, + {"key": "alumni", "name_en": "Alumni", "name_ar": "خريج", "order": 7, "probability": 100} + ], + "message_templates": [ + { + "name": "welcome", + "name_ar": "رسالة ترحيب", + "channel": "whatsapp", + "trigger": "lead_created", + "content_ar": "أهلاً {name}! شكراً لتواصلك مع {company}. عندنا برامج تعليمية وتدريبية متميزة. وش البرنامج أو الدورة اللي تهمك؟", + "content_en": "Hello {name}! Thank you for contacting {company}. We offer outstanding educational and training programs. Which program interests you?", + "delay_minutes": 0 + }, + { + "name": "consultation_scheduled", + "name_ar": "موعد استشارة", + "channel": "whatsapp", + "trigger": "stage_change_consultation", + "content_ar": "مرحباً {name}، تم حجز جلسة استشارة مجانية لك يوم {date} الساعة {time}. مستشارنا الأكاديمي بيساعدك تختار البرنامج الأنسب لأهدافك.", + "content_en": "Hi {name}, your free consultation is scheduled for {date} at {time}. Our academic advisor will help you choose the best program for your goals.", + "delay_minutes": 0 + }, + { + "name": "registration_confirmed", + "name_ar": "تأكيد التسجيل", + "channel": "whatsapp", + "trigger": "stage_change_registration", + "content_ar": "مبروك {name}! تم تسجيلك في برنامج {program_name}. تاريخ البدء: {start_date}. بنرسل لك تفاصيل الدخول والمواد قريباً.", + "content_en": "Congratulations {name}! You're registered for {program_name}. Start date: {start_date}. We'll send access details and materials soon.", + "delay_minutes": 0 + }, + { + "name": "payment_reminder", + "name_ar": "تذكير بالدفع", + "channel": "whatsapp", + "trigger": "stage_change_payment", + "content_ar": "مرحباً {name}، للتأكيد على مقعدك في برنامج {program_name}، يرجى إتمام الدفع. المبلغ: {amount} ريال. رابط الدفع: {payment_link}. عندنا خيار تقسيط بدون فوائد.", + "content_en": "Hi {name}, to confirm your spot in {program_name}, please complete payment. Amount: {amount} SAR. Payment link: {payment_link}.", + "delay_minutes": 0 + }, + { + "name": "no_response_followup", + "name_ar": "متابعة عدم الرد", + "channel": "whatsapp", + "trigger": "no_response", + "content_ar": "مرحباً {name}، لاحظنا اهتمامك ببرامجنا التدريبية. المقاعد محدودة للدفعة القادمة. تبي نحجز لك مقعد أو تحتاج معلومات أكثر؟", + "content_en": "Hi {name}, we noticed your interest in our programs. Seats are limited for the next cohort. Want to reserve a spot or need more info?", + "delay_minutes": 2880 + }, + { + "name": "completion_certificate", + "name_ar": "شهادة إتمام", + "channel": "whatsapp", + "trigger": "stage_change_completed", + "content_ar": "مبروك {name}! أتممت برنامج {program_name} بنجاح. شهادتك جاهزة للتحميل. نفتخر فيك! تبي تطّلع على برامجنا المتقدمة؟", + "content_en": "Congratulations {name}! You've completed {program_name} successfully. Your certificate is ready for download. Interested in advanced programs?", + "delay_minutes": 0 + } + ], + "proposal_templates": [ + { + "name": "training_program", + "name_ar": "عرض برنامج تدريبي", + "sections": [ + {"title_ar": "نبذة عن البرنامج", "title_en": "Program Overview"}, + {"title_ar": "المحاور والمحتوى", "title_en": "Curriculum & Content"}, + {"title_ar": "مدة البرنامج والجدول", "title_en": "Duration & Schedule"}, + {"title_ar": "المدربون والمؤهلات", "title_en": "Instructors & Qualifications"}, + {"title_ar": "الرسوم وخيارات الدفع", "title_en": "Fees & Payment Options"}, + {"title_ar": "الشهادات المعتمدة", "title_en": "Accredited Certificates"} + ] + }, + { + "name": "corporate_training", + "name_ar": "عرض تدريب مؤسسي", + "sections": [ + {"title_ar": "تحليل الاحتياجات التدريبية", "title_en": "Training Needs Analysis"}, + {"title_ar": "البرنامج المخصص", "title_en": "Customized Program"}, + {"title_ar": "منهجية التدريب", "title_en": "Training Methodology"}, + {"title_ar": "التسعير المؤسسي", "title_en": "Corporate Pricing"}, + {"title_ar": "قياس الأثر التدريبي", "title_en": "Training Impact Measurement"}, + {"title_ar": "الشروط والأحكام", "title_en": "Terms & Conditions"} + ] + } + ], + "workflow_templates": [ + { + "name": "new_student_flow", + "name_ar": "تدفق طالب جديد", + "trigger": "lead_created", + "actions": [ + {"type": "send_message", "template": "welcome", "delay_minutes": 0}, + {"type": "create_task", "subject": "اتصل بالمتقدم واعرف اهتماماته التعليمية", "delay_minutes": 30}, + {"type": "send_message", "template": "no_response_followup", "delay_minutes": 2880, "condition": "no_response"} + ] + }, + { + "name": "registration_flow", + "name_ar": "تدفق التسجيل", + "trigger": "stage_change_registration", + "actions": [ + {"type": "send_message", "template": "registration_confirmed", "delay_minutes": 0}, + {"type": "send_message", "template": "payment_reminder", "delay_minutes": 1440}, + {"type": "create_task", "subject": "متابعة الدفع مع المتدرب", "delay_minutes": 2880} + ] + }, + { + "name": "alumni_engagement_flow", + "name_ar": "تدفق الخريجين", + "trigger": "stage_change_completed", + "actions": [ + {"type": "send_message", "template": "completion_certificate", "delay_minutes": 0}, + {"type": "create_task", "subject": "إرسال استبيان رضا المتدرب", "delay_minutes": 1440} + ] + } + ] +} diff --git a/salesflow-saas/seeds/retail_template.json b/salesflow-saas/seeds/retail_template.json new file mode 100644 index 00000000..eb55764d --- /dev/null +++ b/salesflow-saas/seeds/retail_template.json @@ -0,0 +1,117 @@ +{ + "industry": "retail", + "name": "Retail & E-commerce", + "name_ar": "تجارة وريتيل", + "pipeline_stages": [ + {"key": "lead", "name_en": "Lead", "name_ar": "عميل محتمل", "order": 1, "probability": 10}, + {"key": "interest", "name_en": "Interest", "name_ar": "مهتم", "order": 2, "probability": 20}, + {"key": "demo", "name_en": "Demo", "name_ar": "عرض توضيحي", "order": 3, "probability": 40}, + {"key": "trial", "name_en": "Trial", "name_ar": "تجربة", "order": 4, "probability": 55}, + {"key": "negotiation", "name_en": "Negotiation", "name_ar": "تفاوض", "order": 5, "probability": 70}, + {"key": "closed_won", "name_en": "Closed Won", "name_ar": "تم البيع", "order": 6, "probability": 100}, + {"key": "onboarding", "name_en": "Onboarding", "name_ar": "تفعيل", "order": 7, "probability": 100} + ], + "message_templates": [ + { + "name": "welcome", + "name_ar": "رسالة ترحيب", + "channel": "whatsapp", + "trigger": "lead_created", + "content_ar": "أهلاً {name}! شكراً لاهتمامك بـ {company}. عندنا حلول تجارية ذكية تساعدك تنمّي مبيعاتك. وش تبي تعرف أكثر عنه؟", + "content_en": "Hello {name}! Thanks for your interest in {company}. We have smart retail solutions to grow your sales. What would you like to know more about?", + "delay_minutes": 0 + }, + { + "name": "demo_invitation", + "name_ar": "دعوة عرض توضيحي", + "channel": "whatsapp", + "trigger": "stage_change_demo", + "content_ar": "مرحباً {name}، جهّزنا لك عرض توضيحي مخصص لمجال {business_type}. الموعد يوم {date} الساعة {time}. بنوريك كيف تقدر تزيد مبيعاتك بنسبة تصل لـ 40%.", + "content_en": "Hi {name}, we've prepared a customized demo for your {business_type}. Scheduled for {date} at {time}. We'll show you how to increase sales by up to 40%.", + "delay_minutes": 0 + }, + { + "name": "trial_started", + "name_ar": "بداية التجربة", + "channel": "whatsapp", + "trigger": "stage_change_trial", + "content_ar": "مرحباً {name}! تم تفعيل حسابك التجريبي. الفترة: {trial_days} يوم. فريق الدعم جاهز يساعدك في أي وقت. استمتع بالتجربة!", + "content_en": "Hi {name}! Your trial account is activated. Duration: {trial_days} days. Our support team is ready to help anytime.", + "delay_minutes": 0 + }, + { + "name": "trial_ending", + "name_ar": "انتهاء التجربة", + "channel": "whatsapp", + "trigger": "scheduled", + "content_ar": "مرحباً {name}، فترتك التجريبية تنتهي خلال 3 أيام. كيف كانت التجربة؟ لو حبيت تكمل معنا عندنا عرض خاص بخصم {discount}% على الباقة السنوية.", + "content_en": "Hi {name}, your trial ends in 3 days. How was the experience? If you'd like to continue, we have a special {discount}% discount on the annual plan.", + "delay_minutes": -4320 + }, + { + "name": "no_response_followup", + "name_ar": "متابعة عدم الرد", + "channel": "whatsapp", + "trigger": "no_response", + "content_ar": "مرحباً {name}، لاحظنا إنك ما رديت. عادي تماماً! بس حبينا نخبرك إن عندنا عروض جديدة ممكن تناسب متجرك. تبي نرسل لك التفاصيل؟", + "content_en": "Hi {name}, we noticed you haven't responded. That's totally fine! We just wanted to let you know about new offers for your store.", + "delay_minutes": 2880 + }, + { + "name": "onboarding_welcome", + "name_ar": "ترحيب التفعيل", + "channel": "whatsapp", + "trigger": "stage_change_onboarding", + "content_ar": "مبروك {name}! تم تفعيل حسابك الرسمي. مدير حسابك {account_manager} بيتواصل معك خلال 24 ساعة لمساعدتك في الإعداد. نتمنى لك النجاح!", + "content_en": "Congratulations {name}! Your account is officially active. Your account manager {account_manager} will contact you within 24 hours.", + "delay_minutes": 0 + } + ], + "proposal_templates": [ + { + "name": "retail_solution", + "name_ar": "عرض حل تجاري", + "sections": [ + {"title_ar": "ملخص الحل", "title_en": "Solution Summary"}, + {"title_ar": "المميزات والفوائد", "title_en": "Features & Benefits"}, + {"title_ar": "الباقات والأسعار", "title_en": "Packages & Pricing"}, + {"title_ar": "دراسة حالة مشابهة", "title_en": "Similar Case Study"}, + {"title_ar": "خطة التفعيل", "title_en": "Activation Plan"}, + {"title_ar": "الشروط والأحكام", "title_en": "Terms & Conditions"} + ] + }, + { + "name": "enterprise_package", + "name_ar": "باقة المؤسسات", + "sections": [ + {"title_ar": "نظرة عامة", "title_en": "Overview"}, + {"title_ar": "التكامل مع الأنظمة الحالية", "title_en": "Integration with Existing Systems"}, + {"title_ar": "التسعير المؤسسي", "title_en": "Enterprise Pricing"}, + {"title_ar": "الدعم الفني والتدريب", "title_en": "Technical Support & Training"}, + {"title_ar": "اتفاقية مستوى الخدمة", "title_en": "SLA"} + ] + } + ], + "workflow_templates": [ + { + "name": "new_retail_lead_flow", + "name_ar": "تدفق عميل تجاري جديد", + "trigger": "lead_created", + "actions": [ + {"type": "send_message", "template": "welcome", "delay_minutes": 0}, + {"type": "create_task", "subject": "تأهيل العميل وتحديد احتياجاته التجارية", "delay_minutes": 30}, + {"type": "send_message", "template": "no_response_followup", "delay_minutes": 2880, "condition": "no_response"} + ] + }, + { + "name": "trial_management_flow", + "name_ar": "تدفق إدارة التجربة", + "trigger": "stage_change_trial", + "actions": [ + {"type": "send_message", "template": "trial_started", "delay_minutes": 0}, + {"type": "create_task", "subject": "متابعة العميل بعد 3 أيام من التجربة", "delay_minutes": 4320}, + {"type": "send_message", "template": "trial_ending", "delay_minutes": -4320} + ] + } + ] +}