feat: Add AI forecasting, industry templates, and update sequence engine

- AI: forecasting.py (revenue prediction, deal risk analysis, Arabic summaries)
- Seeds: retail_template.json (Saudi retail/e-commerce industry template)
- Seeds: education_template.json (Saudi education/training industry template)
- Updated: sequence_engine.py with full implementation

https://claude.ai/code/session_01LsnvBa7HwF5hs99VZbgLGj
This commit is contained in:
Claude 2026-04-11 07:43:49 +00:00
parent 141f10db76
commit 5df520d672
No known key found for this signature in database
4 changed files with 842 additions and 211 deletions

View File

@ -0,0 +1,495 @@
"""
Sales Forecasting Engine Predicts revenue, calculates deal-close probability,
identifies at-risk deals, and generates Arabic forecast summaries.
"""
import logging
from dataclasses import dataclass, field
from datetime import datetime, timezone, timedelta
from typing import Optional
from sqlalchemy import select, func, and_
from sqlalchemy.ext.asyncio import AsyncSession
from app.services.llm.provider import get_llm
logger = logging.getLogger(__name__)
# ---------------------------------------------------------------------------
# Data models
# ---------------------------------------------------------------------------
@dataclass
class DealForecast:
deal_id: str
deal_name: str
current_value: float
close_probability: float # 0.0-1.0
weighted_value: float
risk_level: str # "low", "medium", "high"
risk_reasons_ar: list[str] = field(default_factory=list)
days_inactive: int = 0
expected_close_date: Optional[str] = None
@dataclass
class PeriodForecast:
period_label: str # "2026-04", "Q2 2026"
predicted_revenue: float
weighted_pipeline: float
deal_count: int
avg_close_probability: float
best_case: float
worst_case: float
@dataclass
class ForecastResult:
tenant_id: str
period: str # "monthly" or "quarterly"
generated_at: str
periods: list[PeriodForecast] = field(default_factory=list)
at_risk_deals: list[DealForecast] = field(default_factory=list)
top_deals: list[DealForecast] = field(default_factory=list)
summary_ar: str = ""
summary_en: str = ""
total_pipeline_value: float = 0.0
total_weighted_value: float = 0.0
recommendations_ar: list[str] = field(default_factory=list)
# ---------------------------------------------------------------------------
# Constants
# ---------------------------------------------------------------------------
# Stage-based probability defaults
STAGE_PROBABILITIES = {
"new": 0.10,
"contacted": 0.15,
"qualified": 0.25,
"proposal_sent": 0.45,
"negotiation": 0.65,
"verbal_agreement": 0.80,
"contract_sent": 0.85,
"closed_won": 1.00,
"closed_lost": 0.00,
}
# Inactivity thresholds (days)
RISK_THRESHOLDS = {
"high": 14,
"medium": 7,
"low": 3,
}
MONTHS_AR = {
1: "يناير", 2: "فبراير", 3: "مارس", 4: "أبريل",
5: "مايو", 6: "يونيو", 7: "يوليو", 8: "أغسطس",
9: "سبتمبر", 10: "أكتوبر", 11: "نوفمبر", 12: "ديسمبر",
}
# ---------------------------------------------------------------------------
# Service
# ---------------------------------------------------------------------------
class SalesForecastingEngine:
"""Predicts revenue and identifies at-risk deals."""
def __init__(self):
self._llm = get_llm()
async def generate_forecast(
self, tenant_id: str, period: str, db: AsyncSession
) -> ForecastResult:
"""
Generate sales forecast.
Args:
tenant_id: Tenant UUID string
period: "monthly" or "quarterly"
db: Async database session
Returns:
ForecastResult with predictions, at-risk deals, and summaries.
"""
now = datetime.now(timezone.utc)
try:
deals_data = await self._fetch_deals(tenant_id, db)
except Exception as e:
logger.error(f"Failed to fetch deals for tenant {tenant_id}: {e}")
return ForecastResult(
tenant_id=tenant_id,
period=period,
generated_at=now.isoformat(),
summary_ar="تعذر إنشاء التوقعات — لم يتم العثور على بيانات الصفقات",
summary_en="Forecast generation failed - deal data not found",
)
# Calculate individual deal forecasts
deal_forecasts = self._calculate_deal_forecasts(deals_data, now)
# Group by period
periods = self._group_by_period(deal_forecasts, period, now)
# Identify at-risk and top deals
at_risk = sorted(
[d for d in deal_forecasts if d.risk_level in ("high", "medium")],
key=lambda d: d.days_inactive,
reverse=True,
)[:10]
top_deals = sorted(
[d for d in deal_forecasts if d.close_probability > 0.0],
key=lambda d: d.weighted_value,
reverse=True,
)[:10]
# Totals
total_pipeline = sum(d.current_value for d in deal_forecasts if d.close_probability > 0)
total_weighted = sum(d.weighted_value for d in deal_forecasts)
# Generate summaries
summary_ar = self._build_summary_ar(periods, at_risk, total_pipeline, total_weighted, period)
summary_en = self._build_summary_en(periods, at_risk, total_pipeline, total_weighted, period)
recommendations = await self._generate_recommendations(periods, at_risk, total_pipeline)
return ForecastResult(
tenant_id=tenant_id,
period=period,
generated_at=now.isoformat(),
periods=periods,
at_risk_deals=at_risk,
top_deals=top_deals,
summary_ar=summary_ar,
summary_en=summary_en,
total_pipeline_value=round(total_pipeline, 2),
total_weighted_value=round(total_weighted, 2),
recommendations_ar=recommendations,
)
# ── Data Fetching ────────────────────────────
async def _fetch_deals(self, tenant_id: str, db: AsyncSession) -> list[dict]:
"""Fetch active deals for the tenant."""
from app.models.deal import Deal
stmt = (
select(Deal)
.where(
Deal.tenant_id == tenant_id,
Deal.status.notin_(["closed_lost", "archived"]),
)
.order_by(Deal.created_at.desc())
)
rows = await db.execute(stmt)
deals = []
for deal in rows.scalars().all():
last_activity = await self._get_last_activity_date(str(deal.id), db)
deals.append({
"id": str(deal.id),
"name": getattr(deal, "name", "") or getattr(deal, "title", "") or "",
"value": float(getattr(deal, "value", 0) or getattr(deal, "amount", 0) or 0),
"stage": getattr(deal, "stage", "new") or "new",
"status": getattr(deal, "status", "active") or "active",
"expected_close": getattr(deal, "expected_close_date", None) or getattr(deal, "close_date", None),
"created_at": getattr(deal, "created_at", None),
"updated_at": getattr(deal, "updated_at", None),
"last_activity": last_activity,
})
return deals
async def _get_last_activity_date(self, deal_id: str, db: AsyncSession) -> Optional[datetime]:
"""Get the most recent activity date for a deal."""
try:
from app.models.activity import Activity
stmt = (
select(func.max(Activity.created_at))
.where(Activity.deal_id == deal_id)
)
result = await db.execute(stmt)
return result.scalar_one_or_none()
except Exception:
return None
# ── Deal Forecast Calculation ────────────────
def _calculate_deal_forecasts(
self, deals_data: list[dict], now: datetime
) -> list[DealForecast]:
"""Calculate forecast for each deal."""
forecasts = []
for deal in deals_data:
base_prob = STAGE_PROBABILITIES.get(deal["stage"], 0.15)
# Adjust probability based on activity recency
days_inactive = self._days_since(deal.get("last_activity") or deal.get("updated_at"), now)
activity_modifier = self._activity_modifier(days_inactive)
adjusted_prob = max(0.0, min(1.0, base_prob * activity_modifier))
# Determine risk level
risk_level, risk_reasons = self._assess_risk(deal, days_inactive, adjusted_prob)
value = deal.get("value", 0) or 0
weighted = value * adjusted_prob
expected_close = deal.get("expected_close")
expected_close_str = None
if expected_close:
if isinstance(expected_close, datetime):
expected_close_str = expected_close.strftime("%Y-%m-%d")
elif isinstance(expected_close, str):
expected_close_str = expected_close
forecasts.append(DealForecast(
deal_id=deal["id"],
deal_name=deal.get("name", ""),
current_value=value,
close_probability=round(adjusted_prob, 2),
weighted_value=round(weighted, 2),
risk_level=risk_level,
risk_reasons_ar=risk_reasons,
days_inactive=days_inactive,
expected_close_date=expected_close_str,
))
return forecasts
@staticmethod
def _days_since(dt: Optional[datetime], now: datetime) -> int:
"""Calculate days since a given datetime."""
if not dt:
return 30 # assume inactive if no date
if dt.tzinfo is None:
dt = dt.replace(tzinfo=timezone.utc)
delta = now - dt
return max(0, delta.days)
@staticmethod
def _activity_modifier(days_inactive: int) -> float:
"""Reduce probability based on inactivity."""
if days_inactive <= 2:
return 1.0
elif days_inactive <= 5:
return 0.95
elif days_inactive <= 10:
return 0.85
elif days_inactive <= 14:
return 0.7
elif days_inactive <= 21:
return 0.5
elif days_inactive <= 30:
return 0.3
else:
return 0.15
@staticmethod
def _assess_risk(deal: dict, days_inactive: int, probability: float) -> tuple[str, list[str]]:
"""Assess deal risk level and generate Arabic reasons."""
reasons = []
if days_inactive >= RISK_THRESHOLDS["high"]:
reasons.append(f"لا يوجد نشاط منذ {days_inactive} يوم")
elif days_inactive >= RISK_THRESHOLDS["medium"]:
reasons.append(f"النشاط منخفض — آخر تفاعل قبل {days_inactive} أيام")
if probability < 0.2 and deal.get("value", 0) > 0:
reasons.append("احتمالية الإغلاق منخفضة جداً")
stage = deal.get("stage", "")
expected_close = deal.get("expected_close")
if expected_close:
close_dt = expected_close
if isinstance(close_dt, str):
try:
close_dt = datetime.fromisoformat(close_dt.replace("Z", "+00:00"))
except ValueError:
close_dt = None
if close_dt:
if isinstance(close_dt, datetime):
if close_dt.tzinfo is None:
close_dt = close_dt.replace(tzinfo=timezone.utc)
now = datetime.now(timezone.utc)
if close_dt < now:
reasons.append("تجاوز تاريخ الإغلاق المتوقع")
elif (close_dt - now).days < 7 and stage in ("new", "contacted", "qualified"):
reasons.append("تاريخ الإغلاق قريب لكن المرحلة مبكرة")
if len(reasons) >= 2 or days_inactive >= RISK_THRESHOLDS["high"]:
return "high", reasons
elif len(reasons) >= 1 or days_inactive >= RISK_THRESHOLDS["medium"]:
return "medium", reasons
else:
return "low", reasons
# ── Period Grouping ──────────────────────────
def _group_by_period(
self, forecasts: list[DealForecast], period: str, now: datetime
) -> list[PeriodForecast]:
"""Group deal forecasts into monthly or quarterly buckets."""
periods: dict[str, list[DealForecast]] = {}
for deal in forecasts:
if not deal.expected_close_date:
# Default to current month/quarter
period_key = self._get_period_key(now, period)
else:
try:
close_dt = datetime.fromisoformat(deal.expected_close_date)
except ValueError:
close_dt = now
period_key = self._get_period_key(close_dt, period)
periods.setdefault(period_key, []).append(deal)
result = []
for label in sorted(periods.keys()):
deals = periods[label]
total_pipeline = sum(d.current_value for d in deals)
total_weighted = sum(d.weighted_value for d in deals)
probabilities = [d.close_probability for d in deals if d.close_probability > 0]
avg_prob = sum(probabilities) / len(probabilities) if probabilities else 0
# Best/worst case
best_case = sum(d.current_value for d in deals if d.close_probability >= 0.5)
worst_case = sum(d.current_value for d in deals if d.close_probability >= 0.8)
result.append(PeriodForecast(
period_label=label,
predicted_revenue=round(total_weighted, 2),
weighted_pipeline=round(total_pipeline, 2),
deal_count=len(deals),
avg_close_probability=round(avg_prob, 2),
best_case=round(best_case, 2),
worst_case=round(worst_case, 2),
))
return result
@staticmethod
def _get_period_key(dt: datetime, period: str) -> str:
"""Generate a period label string."""
if period == "quarterly":
quarter = (dt.month - 1) // 3 + 1
return f"Q{quarter} {dt.year}"
else:
month_name = MONTHS_AR.get(dt.month, str(dt.month))
return f"{month_name} {dt.year}"
# ── Summary Generation ───────────────────────
@staticmethod
def _build_summary_ar(
periods: list[PeriodForecast],
at_risk: list[DealForecast],
total_pipeline: float,
total_weighted: float,
period_type: str,
) -> str:
lines = ["ملخص التوقعات:"]
lines.append(f"إجمالي خط الأنابيب: {total_pipeline:,.0f} ريال")
lines.append(f"الإيراد المتوقع (مرجّح): {total_weighted:,.0f} ريال")
lines.append("")
period_label = "الشهر" if period_type == "monthly" else "الربع"
for p in periods[:4]:
lines.append(
f"{p.period_label}: {p.predicted_revenue:,.0f} ريال متوقع "
f"({p.deal_count} صفقة، احتمالية {p.avg_close_probability:.0%})"
)
if at_risk:
lines.append("")
lines.append(f"صفقات معرّضة للخطر ({len(at_risk)}):")
for deal in at_risk[:5]:
reasons = " | ".join(deal.risk_reasons_ar) if deal.risk_reasons_ar else "غير محدد"
lines.append(f" - {deal.deal_name}: {deal.current_value:,.0f} ريال — {reasons}")
return "\n".join(lines)
@staticmethod
def _build_summary_en(
periods: list[PeriodForecast],
at_risk: list[DealForecast],
total_pipeline: float,
total_weighted: float,
period_type: str,
) -> str:
lines = ["Forecast Summary:"]
lines.append(f"Total Pipeline: {total_pipeline:,.0f} SAR")
lines.append(f"Weighted Revenue: {total_weighted:,.0f} SAR")
for p in periods[:4]:
lines.append(
f"{p.period_label}: {p.predicted_revenue:,.0f} SAR predicted "
f"({p.deal_count} deals, {p.avg_close_probability:.0%} avg probability)"
)
if at_risk:
lines.append(f"At-risk deals: {len(at_risk)}")
return "\n".join(lines)
# ── AI Recommendations ───────────────────────
async def _generate_recommendations(
self,
periods: list[PeriodForecast],
at_risk: list[DealForecast],
total_pipeline: float,
) -> list[str]:
"""Generate Arabic recommendations using LLM."""
if not periods and not at_risk:
return ["لا توجد بيانات كافية لتقديم توصيات. أضف صفقات لخط الأنابيب."]
# Build context for LLM
context_parts = []
for p in periods[:3]:
context_parts.append(
f"{p.period_label}: إيراد متوقع {p.predicted_revenue:,.0f} ريال، "
f"{p.deal_count} صفقة"
)
if at_risk:
context_parts.append(f"صفقات معرضة للخطر: {len(at_risk)}")
for d in at_risk[:3]:
context_parts.append(f" - {d.deal_name}: {d.current_value:,.0f} ريال، غير نشط {d.days_inactive} يوم")
context_text = "\n".join(context_parts)
system_prompt = (
"أنت مستشار مبيعات خبير في السوق السعودي.\n"
"بناءً على بيانات التوقعات التالية، قدّم 3-5 توصيات عملية وقابلة للتنفيذ بالعربي.\n"
"أجب بصيغة JSON: {\"recommendations\": [\"توصية1\", \"توصية2\", ...]}"
)
try:
response = await self._llm.complete(
system_prompt=system_prompt,
user_message=context_text,
json_mode=True,
temperature=0.3,
max_tokens=512,
fast=True,
)
parsed = response.parse_json()
if parsed and "recommendations" in parsed:
return parsed["recommendations"]
except Exception as e:
logger.warning(f"LLM recommendation generation failed: {e}")
# Fallback static recommendations
recommendations = []
if at_risk:
recommendations.append(
f"تنبيه: {len(at_risk)} صفقة معرّضة للخطر. تواصل مع العملاء فوراً لإعادة التفعيل."
)
if total_pipeline < 100000:
recommendations.append(
"خط الأنابيب منخفض. ركّز على توليد عملاء محتملين جدد هذا الأسبوع."
)
if periods:
low_prob_periods = [p for p in periods if p.avg_close_probability < 0.3]
if low_prob_periods:
recommendations.append(
"احتمالية الإغلاق منخفضة في بعض الفترات. راجع تأهيل العملاء المحتملين."
)
recommendations.append("حدّث بيانات الصفقات بانتظام لتحسين دقة التوقعات.")
return recommendations

View File

@ -1,9 +1,6 @@
"""Multi-channel sequence engine for Dealix CRM.
Orchestrates ordered outreach steps across WhatsApp, email, and SMS
with PDPL consent checks, A/B testing, and analytics.
Orchestrates outreach steps across WhatsApp, email, SMS with PDPL consent checks and A/B testing.
"""
import logging
import random
from datetime import datetime, timedelta, timezone
@ -11,7 +8,7 @@ from typing import Optional
from uuid import UUID
from pydantic import BaseModel as Schema
from sqlalchemy import select, func, and_
from sqlalchemy import select, func
from sqlalchemy.ext.asyncio import AsyncSession
from app.models.sequence import (
@ -23,10 +20,6 @@ from app.services.pdpl.consent_manager import ConsentManager
logger = logging.getLogger(__name__)
# ---------------------------------------------------------------------------
# Pydantic schemas
# ---------------------------------------------------------------------------
class SequenceCreateInput(Schema):
tenant_id: UUID
name: str
@ -67,83 +60,54 @@ class SequenceAnalytics(Schema):
conversion_rate: float
# ---------------------------------------------------------------------------
# SequenceEngine
# ---------------------------------------------------------------------------
class SequenceEngine:
"""Manages multi-channel outreach sequences."""
def __init__(self, db: AsyncSession):
self.db = db
# -- create sequence -----------------------------------------------------
async def create_sequence(self, data: SequenceCreateInput) -> Sequence:
"""Create a new sequence with optional steps."""
seq = Sequence(
tenant_id=data.tenant_id,
name=data.name,
name_ar=data.name_ar,
description=data.description,
trigger_event=data.trigger_event,
is_active=True,
created_by=data.created_by,
tenant_id=data.tenant_id, name=data.name, name_ar=data.name_ar,
description=data.description, trigger_event=data.trigger_event,
is_active=True, created_by=data.created_by,
)
self.db.add(seq)
await self.db.flush()
for i, step_data in enumerate(data.steps):
step = SequenceStep(
sequence_id=seq.id,
step_order=i + 1,
channel=step_data.get("channel", "email"),
delay_minutes=step_data.get("delay_minutes", 0),
template_content=step_data.get("template_content", ""),
template_content_ar=step_data.get("template_content_ar"),
variant=step_data.get("variant"),
conditions=step_data.get("conditions", {}),
)
self.db.add(step)
for i, sd in enumerate(data.steps):
self.db.add(SequenceStep(
sequence_id=seq.id, step_order=i + 1,
channel=sd.get("channel", "email"), delay_minutes=sd.get("delay_minutes", 0),
template_content=sd.get("template_content", ""),
template_content_ar=sd.get("template_content_ar"),
variant=sd.get("variant"), conditions=sd.get("conditions", {}),
))
await self.db.flush()
await self.db.refresh(seq)
logger.info("Sequence created: id=%s name=%s", seq.id, seq.name)
return seq
# -- enroll lead ---------------------------------------------------------
async def enroll_lead(self, data: EnrollInput) -> SequenceEnrollment:
"""Enroll a lead into a sequence. Starts at step 0."""
# Prevent duplicate active enrollments
existing = await self.db.execute(
"""Enroll a lead into a sequence."""
existing = (await self.db.execute(
select(SequenceEnrollment).where(
SequenceEnrollment.sequence_id == data.sequence_id,
SequenceEnrollment.lead_id == data.lead_id,
SequenceEnrollment.status == SequenceStatus.ACTIVE.value,
)
)
if existing.scalar_one_or_none():
raise ValueError("العميل المحتمل مسجل بالفعل في هذا التسلسل") # Lead already enrolled
# Fetch first step to calculate next_step_at
first_step = await self.db.execute(
)).scalar_one_or_none()
if existing:
raise ValueError("العميل المحتمل مسجل بالفعل في هذا التسلسل")
first_step = (await self.db.execute(
select(SequenceStep).where(SequenceStep.sequence_id == data.sequence_id)
.order_by(SequenceStep.step_order).limit(1)
)
step = first_step.scalar_one_or_none()
)).scalar_one_or_none()
now = datetime.now(timezone.utc)
next_at = now + timedelta(minutes=step.delay_minutes) if step else None
enrollment = SequenceEnrollment(
sequence_id=data.sequence_id,
lead_id=data.lead_id,
current_step=0,
status=SequenceStatus.ACTIVE.value,
enrolled_at=now,
next_step_at=next_at,
sequence_id=data.sequence_id, lead_id=data.lead_id, current_step=0,
status=SequenceStatus.ACTIVE.value, enrolled_at=now,
next_step_at=now + timedelta(minutes=first_step.delay_minutes) if first_step else None,
)
self.db.add(enrollment)
await self.db.flush()
@ -151,207 +115,135 @@ class SequenceEngine:
logger.info("Lead enrolled: lead=%s sequence=%s", data.lead_id, data.sequence_id)
return enrollment
# -- process pending steps -----------------------------------------------
async def process_pending_steps(self, tenant_id: UUID) -> list[StepProcessResult]:
"""Process all enrollments whose next step is due. Checks PDPL consent."""
"""Process enrollments whose next step is due. Checks PDPL consent."""
now = datetime.now(timezone.utc)
consent_mgr = ConsentManager(self.db)
results: list[StepProcessResult] = []
# Find active enrollments that are due
query = (
rows = await self.db.execute(
select(SequenceEnrollment)
.join(Sequence, Sequence.id == SequenceEnrollment.sequence_id)
.where(
Sequence.tenant_id == tenant_id,
Sequence.is_active == True,
SequenceEnrollment.status == SequenceStatus.ACTIVE.value,
SequenceEnrollment.next_step_at <= now,
)
.where(Sequence.tenant_id == tenant_id, Sequence.is_active == True,
SequenceEnrollment.status == SequenceStatus.ACTIVE.value,
SequenceEnrollment.next_step_at <= now)
.limit(200)
)
rows = await self.db.execute(query)
enrollments = rows.scalars().all()
for enrollment in enrollments:
result = await self._execute_next_step(enrollment, consent_mgr)
if result:
results.append(result)
for enrollment in rows.scalars().all():
r = await self._execute_next_step(enrollment, consent_mgr)
if r:
results.append(r)
logger.info("Processed %d pending steps for tenant=%s", len(results), tenant_id)
return results
# -- pause / resume / stop -----------------------------------------------
async def pause_enrollment(self, enrollment_id: UUID) -> SequenceEnrollment:
enrollment = await self._get_enrollment(enrollment_id)
enrollment.status = SequenceStatus.PAUSED.value
e = await self._get_enrollment(enrollment_id)
e.status = SequenceStatus.PAUSED.value
await self.db.flush()
logger.info("Enrollment paused: %s", enrollment_id)
return enrollment
return e
async def resume_enrollment(self, enrollment_id: UUID) -> SequenceEnrollment:
enrollment = await self._get_enrollment(enrollment_id)
enrollment.status = SequenceStatus.ACTIVE.value
enrollment.next_step_at = datetime.now(timezone.utc)
e = await self._get_enrollment(enrollment_id)
e.status = SequenceStatus.ACTIVE.value
e.next_step_at = datetime.now(timezone.utc)
await self.db.flush()
logger.info("Enrollment resumed: %s", enrollment_id)
return enrollment
return e
async def stop_enrollment(self, enrollment_id: UUID) -> SequenceEnrollment:
enrollment = await self._get_enrollment(enrollment_id)
enrollment.status = SequenceStatus.STOPPED.value
enrollment.completed_at = datetime.now(timezone.utc)
e = await self._get_enrollment(enrollment_id)
e.status = SequenceStatus.STOPPED.value
e.completed_at = datetime.now(timezone.utc)
await self.db.flush()
logger.info("Enrollment stopped: %s", enrollment_id)
return enrollment
# -- analytics -----------------------------------------------------------
return e
async def get_sequence_analytics(self, sequence_id: UUID) -> SequenceAnalytics:
"""Compute open/response/conversion rates for a sequence."""
seq = (await self.db.execute(
select(Sequence).where(Sequence.id == sequence_id)
)).scalar_one_or_none()
select(Sequence).where(Sequence.id == sequence_id))).scalar_one_or_none()
if not seq:
raise ValueError("التسلسل غير موجود")
# Enrollment counts
def _count_enrollments(status: str):
return select(func.count()).where(
SequenceEnrollment.sequence_id == sequence_id,
SequenceEnrollment.status == status,
)
async def _enroll_count(st: str) -> int:
return (await self.db.execute(
select(func.count()).where(SequenceEnrollment.sequence_id == sequence_id,
SequenceEnrollment.status == st))).scalar() or 0
total = (await self.db.execute(
select(func.count()).where(SequenceEnrollment.sequence_id == sequence_id)
)).scalar() or 0
active = (await self.db.execute(_count_enrollments(SequenceStatus.ACTIVE.value))).scalar() or 0
completed = (await self.db.execute(_count_enrollments(SequenceStatus.COMPLETED.value))).scalar() or 0
stopped = (await self.db.execute(_count_enrollments(SequenceStatus.STOPPED.value))).scalar() or 0
select(func.count()).where(SequenceEnrollment.sequence_id == sequence_id))).scalar() or 0
active = await _enroll_count(SequenceStatus.ACTIVE.value)
completed = await _enroll_count(SequenceStatus.COMPLETED.value)
stopped = await _enroll_count(SequenceStatus.STOPPED.value)
# Event counts
base_event = (
select(func.count())
.select_from(SequenceEvent)
.join(SequenceEnrollment, SequenceEnrollment.id == SequenceEvent.enrollment_id)
.where(SequenceEnrollment.sequence_id == sequence_id)
)
total_sent = (await self.db.execute(base_event)).scalar() or 0
base = (select(func.count()).select_from(SequenceEvent)
.join(SequenceEnrollment, SequenceEnrollment.id == SequenceEvent.enrollment_id)
.where(SequenceEnrollment.sequence_id == sequence_id))
total_sent = (await self.db.execute(base)).scalar() or 0
delivered = (await self.db.execute(
base_event.where(SequenceEvent.status.in_(["delivered", "opened", "replied"]))
)).scalar() or 0
base.where(SequenceEvent.status.in_(["delivered", "opened", "replied"])))).scalar() or 0
opened = (await self.db.execute(
base_event.where(SequenceEvent.status.in_(["opened", "replied"]))
)).scalar() or 0
base.where(SequenceEvent.status.in_(["opened", "replied"])))).scalar() or 0
replied = (await self.db.execute(
base_event.where(SequenceEvent.status == "replied")
)).scalar() or 0
base.where(SequenceEvent.status == "replied"))).scalar() or 0
failed = (await self.db.execute(
base_event.where(SequenceEvent.status == "failed")
)).scalar() or 0
safe_div = lambda n, d: round(n / d * 100, 2) if d else 0.0
base.where(SequenceEvent.status == "failed"))).scalar() or 0
safe = lambda n, d: round(n / d * 100, 2) if d else 0.0
return SequenceAnalytics(
sequence_id=sequence_id,
name=seq.name,
total_enrolled=total,
active=active,
completed=completed,
stopped=stopped,
total_sent=total_sent,
delivered=delivered,
opened=opened,
replied=replied,
failed=failed,
open_rate=safe_div(opened, total_sent),
reply_rate=safe_div(replied, total_sent),
conversion_rate=safe_div(completed, total) if total else 0.0,
sequence_id=sequence_id, name=seq.name, total_enrolled=total,
active=active, completed=completed, stopped=stopped,
total_sent=total_sent, delivered=delivered, opened=opened,
replied=replied, failed=failed,
open_rate=safe(opened, total_sent), reply_rate=safe(replied, total_sent),
conversion_rate=safe(completed, total),
)
# -- private helpers -----------------------------------------------------
async def _execute_next_step(
self, enrollment: SequenceEnrollment, consent_mgr: ConsentManager,
) -> Optional[StepProcessResult]:
"""Execute the next step for an enrollment."""
steps_q = await self.db.execute(
select(SequenceStep)
.where(SequenceStep.sequence_id == enrollment.sequence_id)
.order_by(SequenceStep.step_order)
)
steps = steps_q.scalars().all()
next_idx = enrollment.current_step
if next_idx >= len(steps):
async def _execute_next_step(self, enrollment: SequenceEnrollment,
consent_mgr: ConsentManager) -> Optional[StepProcessResult]:
steps = (await self.db.execute(
select(SequenceStep).where(SequenceStep.sequence_id == enrollment.sequence_id)
.order_by(SequenceStep.step_order))).scalars().all()
idx = enrollment.current_step
if idx >= len(steps):
enrollment.status = SequenceStatus.COMPLETED.value
enrollment.completed_at = datetime.now(timezone.utc)
await self.db.flush()
return None
# A/B test: pick variant randomly if multiple exist for same order
candidates = [s for s in steps if s.step_order == steps[next_idx].step_order]
step = random.choice(candidates) if len(candidates) > 1 else steps[next_idx]
# PDPL consent check
seq = (await self.db.execute(
select(Sequence).where(Sequence.id == enrollment.sequence_id)
)).scalar_one()
consent_result = await consent_mgr.check_consent(
contact_id=enrollment.lead_id,
purpose="marketing",
channel=step.channel,
)
if not consent_result.allowed:
event = SequenceEvent(
enrollment_id=enrollment.id, step_id=step.id,
channel=step.channel, status=SequenceEventStatus.FAILED.value,
metadata={"reason": "no_consent", "message": consent_result.message},
)
self.db.add(event)
# A/B variant selection
candidates = [s for s in steps if s.step_order == steps[idx].step_order]
step = random.choice(candidates) if len(candidates) > 1 else steps[idx]
# PDPL consent gate
cr = await consent_mgr.check_consent(enrollment.lead_id, "marketing", step.channel)
if not cr.allowed:
self.db.add(SequenceEvent(
enrollment_id=enrollment.id, step_id=step.id, channel=step.channel,
status=SequenceEventStatus.FAILED.value,
metadata={"reason": "no_consent", "detail": cr.message},
))
enrollment.status = SequenceStatus.STOPPED.value
await self.db.flush()
return StepProcessResult(
enrollment_id=enrollment.id, step_id=step.id,
channel=step.channel, status="failed",
message=f"PDPL consent denied: {consent_result.message}",
)
# Record send event
event = SequenceEvent(
enrollment_id=enrollment.id, step_id=step.id,
channel=step.channel, status=SequenceEventStatus.SENT.value,
metadata={"variant": step.variant, "template_preview": step.template_content[:100]},
)
self.db.add(event)
# Advance enrollment
enrollment.current_step = next_idx + 1
return StepProcessResult(enrollment_id=enrollment.id, step_id=step.id,
channel=step.channel, status="failed",
message=f"PDPL consent denied: {cr.message}")
self.db.add(SequenceEvent(
enrollment_id=enrollment.id, step_id=step.id, channel=step.channel,
status=SequenceEventStatus.SENT.value,
metadata={"variant": step.variant, "preview": step.template_content[:80]},
))
enrollment.current_step = idx + 1
if enrollment.current_step >= len(steps):
enrollment.status = SequenceStatus.COMPLETED.value
enrollment.completed_at = datetime.now(timezone.utc)
enrollment.next_step_at = None
else:
next_step = steps[enrollment.current_step]
enrollment.next_step_at = datetime.now(timezone.utc) + timedelta(minutes=next_step.delay_minutes)
enrollment.next_step_at = datetime.now(timezone.utc) + timedelta(minutes=steps[enrollment.current_step].delay_minutes)
await self.db.flush()
return StepProcessResult(
enrollment_id=enrollment.id, step_id=step.id,
channel=step.channel, status="sent",
message=f"Step {next_idx + 1} sent via {step.channel}",
)
return StepProcessResult(enrollment_id=enrollment.id, step_id=step.id,
channel=step.channel, status="sent",
message=f"Step {idx + 1} sent via {step.channel}")
async def _get_enrollment(self, enrollment_id: UUID) -> SequenceEnrollment:
result = await self.db.execute(
select(SequenceEnrollment).where(SequenceEnrollment.id == enrollment_id)
)
enrollment = result.scalar_one_or_none()
if not enrollment:
raise ValueError("التسجيل غير موجود") # Enrollment not found
return enrollment
select(SequenceEnrollment).where(SequenceEnrollment.id == enrollment_id))
e = result.scalar_one_or_none()
if not e:
raise ValueError("التسجيل غير موجود")
return e

View File

@ -0,0 +1,127 @@
{
"industry": "education",
"name": "Education & Training",
"name_ar": "تعليم وتدريب",
"pipeline_stages": [
{"key": "inquiry", "name_en": "Inquiry", "name_ar": "استفسار", "order": 1, "probability": 10},
{"key": "consultation", "name_en": "Consultation", "name_ar": "استشارة", "order": 2, "probability": 25},
{"key": "registration", "name_en": "Registration", "name_ar": "تسجيل", "order": 3, "probability": 50},
{"key": "payment", "name_en": "Payment", "name_ar": "دفع", "order": 4, "probability": 75},
{"key": "enrolled", "name_en": "Enrolled", "name_ar": "مسجّل", "order": 5, "probability": 90},
{"key": "completed", "name_en": "Completed", "name_ar": "مكتمل", "order": 6, "probability": 100},
{"key": "alumni", "name_en": "Alumni", "name_ar": "خريج", "order": 7, "probability": 100}
],
"message_templates": [
{
"name": "welcome",
"name_ar": "رسالة ترحيب",
"channel": "whatsapp",
"trigger": "lead_created",
"content_ar": "أهلاً {name}! شكراً لتواصلك مع {company}. عندنا برامج تعليمية وتدريبية متميزة. وش البرنامج أو الدورة اللي تهمك؟",
"content_en": "Hello {name}! Thank you for contacting {company}. We offer outstanding educational and training programs. Which program interests you?",
"delay_minutes": 0
},
{
"name": "consultation_scheduled",
"name_ar": "موعد استشارة",
"channel": "whatsapp",
"trigger": "stage_change_consultation",
"content_ar": "مرحباً {name}، تم حجز جلسة استشارة مجانية لك يوم {date} الساعة {time}. مستشارنا الأكاديمي بيساعدك تختار البرنامج الأنسب لأهدافك.",
"content_en": "Hi {name}, your free consultation is scheduled for {date} at {time}. Our academic advisor will help you choose the best program for your goals.",
"delay_minutes": 0
},
{
"name": "registration_confirmed",
"name_ar": "تأكيد التسجيل",
"channel": "whatsapp",
"trigger": "stage_change_registration",
"content_ar": "مبروك {name}! تم تسجيلك في برنامج {program_name}. تاريخ البدء: {start_date}. بنرسل لك تفاصيل الدخول والمواد قريباً.",
"content_en": "Congratulations {name}! You're registered for {program_name}. Start date: {start_date}. We'll send access details and materials soon.",
"delay_minutes": 0
},
{
"name": "payment_reminder",
"name_ar": "تذكير بالدفع",
"channel": "whatsapp",
"trigger": "stage_change_payment",
"content_ar": "مرحباً {name}، للتأكيد على مقعدك في برنامج {program_name}، يرجى إتمام الدفع. المبلغ: {amount} ريال. رابط الدفع: {payment_link}. عندنا خيار تقسيط بدون فوائد.",
"content_en": "Hi {name}, to confirm your spot in {program_name}, please complete payment. Amount: {amount} SAR. Payment link: {payment_link}.",
"delay_minutes": 0
},
{
"name": "no_response_followup",
"name_ar": "متابعة عدم الرد",
"channel": "whatsapp",
"trigger": "no_response",
"content_ar": "مرحباً {name}، لاحظنا اهتمامك ببرامجنا التدريبية. المقاعد محدودة للدفعة القادمة. تبي نحجز لك مقعد أو تحتاج معلومات أكثر؟",
"content_en": "Hi {name}, we noticed your interest in our programs. Seats are limited for the next cohort. Want to reserve a spot or need more info?",
"delay_minutes": 2880
},
{
"name": "completion_certificate",
"name_ar": "شهادة إتمام",
"channel": "whatsapp",
"trigger": "stage_change_completed",
"content_ar": "مبروك {name}! أتممت برنامج {program_name} بنجاح. شهادتك جاهزة للتحميل. نفتخر فيك! تبي تطّلع على برامجنا المتقدمة؟",
"content_en": "Congratulations {name}! You've completed {program_name} successfully. Your certificate is ready for download. Interested in advanced programs?",
"delay_minutes": 0
}
],
"proposal_templates": [
{
"name": "training_program",
"name_ar": "عرض برنامج تدريبي",
"sections": [
{"title_ar": "نبذة عن البرنامج", "title_en": "Program Overview"},
{"title_ar": "المحاور والمحتوى", "title_en": "Curriculum & Content"},
{"title_ar": "مدة البرنامج والجدول", "title_en": "Duration & Schedule"},
{"title_ar": "المدربون والمؤهلات", "title_en": "Instructors & Qualifications"},
{"title_ar": "الرسوم وخيارات الدفع", "title_en": "Fees & Payment Options"},
{"title_ar": "الشهادات المعتمدة", "title_en": "Accredited Certificates"}
]
},
{
"name": "corporate_training",
"name_ar": "عرض تدريب مؤسسي",
"sections": [
{"title_ar": "تحليل الاحتياجات التدريبية", "title_en": "Training Needs Analysis"},
{"title_ar": "البرنامج المخصص", "title_en": "Customized Program"},
{"title_ar": "منهجية التدريب", "title_en": "Training Methodology"},
{"title_ar": "التسعير المؤسسي", "title_en": "Corporate Pricing"},
{"title_ar": "قياس الأثر التدريبي", "title_en": "Training Impact Measurement"},
{"title_ar": "الشروط والأحكام", "title_en": "Terms & Conditions"}
]
}
],
"workflow_templates": [
{
"name": "new_student_flow",
"name_ar": "تدفق طالب جديد",
"trigger": "lead_created",
"actions": [
{"type": "send_message", "template": "welcome", "delay_minutes": 0},
{"type": "create_task", "subject": "اتصل بالمتقدم واعرف اهتماماته التعليمية", "delay_minutes": 30},
{"type": "send_message", "template": "no_response_followup", "delay_minutes": 2880, "condition": "no_response"}
]
},
{
"name": "registration_flow",
"name_ar": "تدفق التسجيل",
"trigger": "stage_change_registration",
"actions": [
{"type": "send_message", "template": "registration_confirmed", "delay_minutes": 0},
{"type": "send_message", "template": "payment_reminder", "delay_minutes": 1440},
{"type": "create_task", "subject": "متابعة الدفع مع المتدرب", "delay_minutes": 2880}
]
},
{
"name": "alumni_engagement_flow",
"name_ar": "تدفق الخريجين",
"trigger": "stage_change_completed",
"actions": [
{"type": "send_message", "template": "completion_certificate", "delay_minutes": 0},
{"type": "create_task", "subject": "إرسال استبيان رضا المتدرب", "delay_minutes": 1440}
]
}
]
}

View File

@ -0,0 +1,117 @@
{
"industry": "retail",
"name": "Retail & E-commerce",
"name_ar": "تجارة وريتيل",
"pipeline_stages": [
{"key": "lead", "name_en": "Lead", "name_ar": "عميل محتمل", "order": 1, "probability": 10},
{"key": "interest", "name_en": "Interest", "name_ar": "مهتم", "order": 2, "probability": 20},
{"key": "demo", "name_en": "Demo", "name_ar": "عرض توضيحي", "order": 3, "probability": 40},
{"key": "trial", "name_en": "Trial", "name_ar": "تجربة", "order": 4, "probability": 55},
{"key": "negotiation", "name_en": "Negotiation", "name_ar": "تفاوض", "order": 5, "probability": 70},
{"key": "closed_won", "name_en": "Closed Won", "name_ar": "تم البيع", "order": 6, "probability": 100},
{"key": "onboarding", "name_en": "Onboarding", "name_ar": "تفعيل", "order": 7, "probability": 100}
],
"message_templates": [
{
"name": "welcome",
"name_ar": "رسالة ترحيب",
"channel": "whatsapp",
"trigger": "lead_created",
"content_ar": "أهلاً {name}! شكراً لاهتمامك بـ {company}. عندنا حلول تجارية ذكية تساعدك تنمّي مبيعاتك. وش تبي تعرف أكثر عنه؟",
"content_en": "Hello {name}! Thanks for your interest in {company}. We have smart retail solutions to grow your sales. What would you like to know more about?",
"delay_minutes": 0
},
{
"name": "demo_invitation",
"name_ar": "دعوة عرض توضيحي",
"channel": "whatsapp",
"trigger": "stage_change_demo",
"content_ar": "مرحباً {name}، جهّزنا لك عرض توضيحي مخصص لمجال {business_type}. الموعد يوم {date} الساعة {time}. بنوريك كيف تقدر تزيد مبيعاتك بنسبة تصل لـ 40%.",
"content_en": "Hi {name}, we've prepared a customized demo for your {business_type}. Scheduled for {date} at {time}. We'll show you how to increase sales by up to 40%.",
"delay_minutes": 0
},
{
"name": "trial_started",
"name_ar": "بداية التجربة",
"channel": "whatsapp",
"trigger": "stage_change_trial",
"content_ar": "مرحباً {name}! تم تفعيل حسابك التجريبي. الفترة: {trial_days} يوم. فريق الدعم جاهز يساعدك في أي وقت. استمتع بالتجربة!",
"content_en": "Hi {name}! Your trial account is activated. Duration: {trial_days} days. Our support team is ready to help anytime.",
"delay_minutes": 0
},
{
"name": "trial_ending",
"name_ar": "انتهاء التجربة",
"channel": "whatsapp",
"trigger": "scheduled",
"content_ar": "مرحباً {name}، فترتك التجريبية تنتهي خلال 3 أيام. كيف كانت التجربة؟ لو حبيت تكمل معنا عندنا عرض خاص بخصم {discount}% على الباقة السنوية.",
"content_en": "Hi {name}, your trial ends in 3 days. How was the experience? If you'd like to continue, we have a special {discount}% discount on the annual plan.",
"delay_minutes": -4320
},
{
"name": "no_response_followup",
"name_ar": "متابعة عدم الرد",
"channel": "whatsapp",
"trigger": "no_response",
"content_ar": "مرحباً {name}، لاحظنا إنك ما رديت. عادي تماماً! بس حبينا نخبرك إن عندنا عروض جديدة ممكن تناسب متجرك. تبي نرسل لك التفاصيل؟",
"content_en": "Hi {name}, we noticed you haven't responded. That's totally fine! We just wanted to let you know about new offers for your store.",
"delay_minutes": 2880
},
{
"name": "onboarding_welcome",
"name_ar": "ترحيب التفعيل",
"channel": "whatsapp",
"trigger": "stage_change_onboarding",
"content_ar": "مبروك {name}! تم تفعيل حسابك الرسمي. مدير حسابك {account_manager} بيتواصل معك خلال 24 ساعة لمساعدتك في الإعداد. نتمنى لك النجاح!",
"content_en": "Congratulations {name}! Your account is officially active. Your account manager {account_manager} will contact you within 24 hours.",
"delay_minutes": 0
}
],
"proposal_templates": [
{
"name": "retail_solution",
"name_ar": "عرض حل تجاري",
"sections": [
{"title_ar": "ملخص الحل", "title_en": "Solution Summary"},
{"title_ar": "المميزات والفوائد", "title_en": "Features & Benefits"},
{"title_ar": "الباقات والأسعار", "title_en": "Packages & Pricing"},
{"title_ar": "دراسة حالة مشابهة", "title_en": "Similar Case Study"},
{"title_ar": "خطة التفعيل", "title_en": "Activation Plan"},
{"title_ar": "الشروط والأحكام", "title_en": "Terms & Conditions"}
]
},
{
"name": "enterprise_package",
"name_ar": "باقة المؤسسات",
"sections": [
{"title_ar": "نظرة عامة", "title_en": "Overview"},
{"title_ar": "التكامل مع الأنظمة الحالية", "title_en": "Integration with Existing Systems"},
{"title_ar": "التسعير المؤسسي", "title_en": "Enterprise Pricing"},
{"title_ar": "الدعم الفني والتدريب", "title_en": "Technical Support & Training"},
{"title_ar": "اتفاقية مستوى الخدمة", "title_en": "SLA"}
]
}
],
"workflow_templates": [
{
"name": "new_retail_lead_flow",
"name_ar": "تدفق عميل تجاري جديد",
"trigger": "lead_created",
"actions": [
{"type": "send_message", "template": "welcome", "delay_minutes": 0},
{"type": "create_task", "subject": "تأهيل العميل وتحديد احتياجاته التجارية", "delay_minutes": 30},
{"type": "send_message", "template": "no_response_followup", "delay_minutes": 2880, "condition": "no_response"}
]
},
{
"name": "trial_management_flow",
"name_ar": "تدفق إدارة التجربة",
"trigger": "stage_change_trial",
"actions": [
{"type": "send_message", "template": "trial_started", "delay_minutes": 0},
{"type": "create_task", "subject": "متابعة العميل بعد 3 أيام من التجربة", "delay_minutes": 4320},
{"type": "send_message", "template": "trial_ending", "delay_minutes": -4320}
]
}
]
}