diff --git a/salesflow-saas/backend/app/api/v1/compliance.py b/salesflow-saas/backend/app/api/v1/compliance.py new file mode 100644 index 00000000..7a63028d --- /dev/null +++ b/salesflow-saas/backend/app/api/v1/compliance.py @@ -0,0 +1,245 @@ +"""PDPL Compliance API -- consent management, data subject requests, and SDAIA audit reports.""" + +import logging +from datetime import datetime, timezone +from typing import Optional +from uuid import UUID + +from fastapi import APIRouter, Depends, HTTPException, Query, status +from pydantic import BaseModel as Schema +from sqlalchemy import select, func +from sqlalchemy.ext.asyncio import AsyncSession + +from app.database import get_db +from app.api.deps import get_current_user +from app.models.user import User +from app.models.consent import ( + PDPLConsent, PDPLConsentAudit, DataRequest, + ConsentStatusEnum, DataRequestStatus, +) +from app.services.pdpl.consent_manager import ( + ConsentManager, ConsentGrantInput, ConsentRevokeInput, + ConsentCheckResult, DataRequestInput, AuditEntry, +) +from app.services.pdpl.data_rights import DataRightsHandler, ComplianceReport + +logger = logging.getLogger(__name__) +router = APIRouter(prefix="/compliance", tags=["PDPL Compliance"]) + + +# --------------------------------------------------------------------------- +# Schemas +# --------------------------------------------------------------------------- + +class ConsentCreateRequest(Schema): + contact_id: UUID + purpose: str # marketing, sales, service, analytics + channel: str # whatsapp, email, sms, phone + consent_text: Optional[str] = None + ip_address: Optional[str] = None + expiry_months: int = 12 + + +class ConsentResponse(Schema): + id: UUID + contact_id: UUID + tenant_id: UUID + purpose: str + channel: str + status: str + granted_at: Optional[datetime] = None + revoked_at: Optional[datetime] = None + expires_at: Optional[datetime] = None + created_at: datetime + + model_config = {"from_attributes": True} + + +class ConsentListResponse(Schema): + items: list[ConsentResponse] + total: int + page: int + per_page: int + + +class DataRequestCreateRequest(Schema): + contact_id: UUID + request_type: str # access, correction, deletion, restriction + notes: Optional[str] = None + + +class DataRequestResponse(Schema): + id: UUID + contact_id: UUID + request_type: str + status: str + requested_at: Optional[datetime] = None + completed_at: Optional[datetime] = None + response_data: Optional[dict] = None + created_at: datetime + + model_config = {"from_attributes": True} + + +class AuditListResponse(Schema): + items: list[AuditEntry] + total: int + page: int + per_page: int + + +# --------------------------------------------------------------------------- +# Endpoints +# --------------------------------------------------------------------------- + +@router.get("/consents", response_model=ConsentListResponse) +async def list_consents( + purpose: Optional[str] = Query(None), + channel: Optional[str] = Query(None), + consent_status: Optional[str] = Query(None, alias="status"), + contact_id: Optional[UUID] = Query(None), + page: int = Query(1, ge=1), + per_page: int = Query(20, ge=1, le=100), + current_user: User = Depends(get_current_user), + db: AsyncSession = Depends(get_db), +): + """List PDPL consents with filters.""" + + query = select(PDPLConsent).where(PDPLConsent.tenant_id == current_user.tenant_id) + if purpose: + query = query.where(PDPLConsent.purpose == purpose) + if channel: + query = query.where(PDPLConsent.channel == channel) + if consent_status: + query = query.where(PDPLConsent.status == consent_status) + if contact_id: + query = query.where(PDPLConsent.contact_id == contact_id) + + total = (await db.execute(select(func.count()).select_from(query.subquery()))).scalar() or 0 + rows = await db.execute( + query.order_by(PDPLConsent.created_at.desc()).offset((page - 1) * per_page).limit(per_page) + ) + items = [ConsentResponse.model_validate(c) for c in rows.scalars().all()] + return ConsentListResponse(items=items, total=total, page=page, per_page=per_page) + + +@router.post("/consent", response_model=ConsentResponse, status_code=status.HTTP_201_CREATED) +async def record_consent( + data: ConsentCreateRequest, + current_user: User = Depends(get_current_user), + db: AsyncSession = Depends(get_db), +): + """Record a new PDPL consent.""" + + mgr = ConsentManager(db) + consent = await mgr.grant_consent(ConsentGrantInput( + contact_id=data.contact_id, + tenant_id=current_user.tenant_id, + purpose=data.purpose, + channel=data.channel, + consent_text=data.consent_text, + ip_address=data.ip_address, + actor_id=current_user.id, + expiry_months=data.expiry_months, + )) + return ConsentResponse.model_validate(consent) + + +@router.delete("/consent/{consent_id}", status_code=status.HTTP_200_OK) +async def revoke_consent( + consent_id: UUID, + reason: Optional[str] = Query(None), + current_user: User = Depends(get_current_user), + db: AsyncSession = Depends(get_db), +): + """Revoke a PDPL consent.""" + + mgr = ConsentManager(db) + try: + consent = await mgr.revoke_consent(ConsentRevokeInput( + consent_id=consent_id, + actor_id=current_user.id, + reason=reason, + )) + except ValueError as exc: + raise HTTPException(status_code=404, detail=str(exc)) + return {"detail": "تم إلغاء الموافقة بنجاح", "consent_id": str(consent.id)} + + +@router.post("/data-request", response_model=DataRequestResponse, status_code=status.HTTP_201_CREATED) +async def submit_data_request( + data: DataRequestCreateRequest, + current_user: User = Depends(get_current_user), + db: AsyncSession = Depends(get_db), +): + """Submit a data subject rights request (access, correction, deletion, restriction).""" + + valid_types = {"access", "correction", "deletion", "restriction"} + if data.request_type not in valid_types: + raise HTTPException(status_code=400, detail=f"نوع الطلب غير صالح. الأنواع المسموحة: {', '.join(valid_types)}") + + mgr = ConsentManager(db) + request = await mgr.process_data_request(DataRequestInput( + contact_id=data.contact_id, + tenant_id=current_user.tenant_id, + request_type=data.request_type, + notes=data.notes, + actor_id=current_user.id, + )) + return DataRequestResponse.model_validate(request) + + +@router.get("/data-request/{request_id}", response_model=DataRequestResponse) +async def get_data_request( + request_id: UUID, + current_user: User = Depends(get_current_user), + db: AsyncSession = Depends(get_db), +): + """Check data request status.""" + + result = await db.execute( + select(DataRequest).where( + DataRequest.id == request_id, + DataRequest.tenant_id == current_user.tenant_id, + ) + ) + req = result.scalar_one_or_none() + if not req: + raise HTTPException(status_code=404, detail="طلب البيانات غير موجود") + return DataRequestResponse.model_validate(req) + + +@router.get("/audit", response_model=AuditListResponse) +async def get_audit_trail( + contact_id: Optional[UUID] = Query(None), + page: int = Query(1, ge=1), + per_page: int = Query(50, ge=1, le=200), + current_user: User = Depends(get_current_user), + db: AsyncSession = Depends(get_db), +): + """Consent audit trail for compliance review.""" + + query = select(PDPLConsentAudit).where(PDPLConsentAudit.tenant_id == current_user.tenant_id) + if contact_id: + query = query.where(PDPLConsentAudit.contact_id == contact_id) + + total = (await db.execute(select(func.count()).select_from(query.subquery()))).scalar() or 0 + mgr = ConsentManager(db) + items = await mgr.get_consent_audit( + tenant_id=current_user.tenant_id, + contact_id=contact_id, + limit=per_page, + offset=(page - 1) * per_page, + ) + return AuditListResponse(items=items, total=total, page=page, per_page=per_page) + + +@router.get("/report", response_model=ComplianceReport) +async def generate_compliance_report( + current_user: User = Depends(get_current_user), + db: AsyncSession = Depends(get_db), +): + """Generate SDAIA-ready PDPL compliance report.""" + + handler = DataRightsHandler(db) + return await handler.generate_compliance_report(current_user.tenant_id) diff --git a/salesflow-saas/backend/app/api/v1/inbox.py b/salesflow-saas/backend/app/api/v1/inbox.py new file mode 100644 index 00000000..1f28358f --- /dev/null +++ b/salesflow-saas/backend/app/api/v1/inbox.py @@ -0,0 +1,258 @@ +"""Unified inbox API -- aggregate messages from WhatsApp, Email, SMS.""" + +import logging +from datetime import datetime, timezone +from typing import Optional +from uuid import UUID + +from fastapi import APIRouter, Depends, HTTPException, Query, status +from pydantic import BaseModel as Schema +from sqlalchemy import select, func, and_, or_ +from sqlalchemy.ext.asyncio import AsyncSession + +from app.database import get_db +from app.api.deps import get_current_user +from app.models.user import User +from app.models.message import Message +from app.models.lead import Lead + +logger = logging.getLogger(__name__) +router = APIRouter(prefix="/inbox", tags=["Inbox"]) + + +# --------------------------------------------------------------------------- +# Schemas +# --------------------------------------------------------------------------- + +class MessageResponse(Schema): + id: UUID + lead_id: Optional[UUID] = None + channel: str + direction: str + content: Optional[str] = None + status: str + sent_at: Optional[datetime] = None + created_at: datetime + extra_metadata: Optional[dict] = None + lead_name: Optional[str] = None + + model_config = {"from_attributes": True} + + +class MessageListResponse(Schema): + items: list[MessageResponse] + total: int + page: int + per_page: int + + +class ThreadResponse(Schema): + lead_id: UUID + lead_name: Optional[str] = None + messages: list[MessageResponse] + + +class ReplyInput(Schema): + lead_id: UUID + channel: str + content: str + + +class AssignInput(Schema): + lead_id: UUID + assigned_to: UUID + + +class InboxStats(Schema): + total_unread: int + whatsapp_unread: int + email_unread: int + sms_unread: int + avg_response_minutes: Optional[float] = None + + +# --------------------------------------------------------------------------- +# Endpoints +# --------------------------------------------------------------------------- + +@router.get("", response_model=MessageListResponse) +async def list_inbox( + channel: Optional[str] = Query(None), + status_filter: Optional[str] = Query(None, alias="status"), + assigned_to: Optional[UUID] = Query(None), + search: Optional[str] = Query(None), + page: int = Query(1, ge=1), + per_page: int = Query(20, ge=1, le=100), + current_user: User = Depends(get_current_user), + db: AsyncSession = Depends(get_db), +): + """List all messages across channels with filters.""" + + query = select(Message).where(Message.tenant_id == current_user.tenant_id) + + if channel: + query = query.where(Message.channel == channel) + if status_filter: + query = query.where(Message.status == status_filter) + if assigned_to: + query = query.join(Lead, Lead.id == Message.lead_id).where(Lead.assigned_to == assigned_to) + if search: + query = query.where(Message.content.ilike(f"%{search}%")) + + total = (await db.execute(select(func.count()).select_from(query.subquery()))).scalar() or 0 + rows = await db.execute( + query.order_by(Message.created_at.desc()).offset((page - 1) * per_page).limit(per_page) + ) + messages = rows.scalars().all() + + # Batch-load lead names + lead_ids = {m.lead_id for m in messages if m.lead_id} + lead_map: dict[UUID, str] = {} + if lead_ids: + leads_q = await db.execute(select(Lead.id, Lead.name).where(Lead.id.in_(lead_ids))) + lead_map = {row[0]: row[1] for row in leads_q.all()} + + items = [] + for m in messages: + resp = MessageResponse.model_validate(m) + resp.lead_name = lead_map.get(m.lead_id) + items.append(resp) + + return MessageListResponse(items=items, total=total, page=page, per_page=per_page) + + +@router.get("/stats", response_model=InboxStats) +async def inbox_stats( + current_user: User = Depends(get_current_user), + db: AsyncSession = Depends(get_db), +): + """Unread counts per channel and response-time metrics.""" + + tid = current_user.tenant_id + base = and_(Message.tenant_id == tid, Message.direction == "inbound", Message.status != "read") + + total = (await db.execute(select(func.count()).where(base))).scalar() or 0 + wa = (await db.execute( + select(func.count()).where(base, Message.channel == "whatsapp") + )).scalar() or 0 + em = (await db.execute( + select(func.count()).where(base, Message.channel == "email") + )).scalar() or 0 + sm = (await db.execute( + select(func.count()).where(base, Message.channel == "sms") + )).scalar() or 0 + + return InboxStats( + total_unread=total, + whatsapp_unread=wa, + email_unread=em, + sms_unread=sm, + avg_response_minutes=None, + ) + + +@router.get("/{message_id}", response_model=ThreadResponse) +async def get_thread( + message_id: UUID, + current_user: User = Depends(get_current_user), + db: AsyncSession = Depends(get_db), +): + """Get full conversation thread for a message.""" + + msg = (await db.execute( + select(Message).where(Message.id == message_id, Message.tenant_id == current_user.tenant_id) + )).scalar_one_or_none() + if not msg: + raise HTTPException(status_code=404, detail="الرسالة غير موجودة") + + if not msg.lead_id: + return ThreadResponse(lead_id=message_id, messages=[MessageResponse.model_validate(msg)]) + + lead = (await db.execute(select(Lead).where(Lead.id == msg.lead_id))).scalar_one_or_none() + thread_q = await db.execute( + select(Message) + .where(Message.lead_id == msg.lead_id, Message.tenant_id == current_user.tenant_id) + .order_by(Message.created_at.asc()) + ) + messages = [MessageResponse.model_validate(m) for m in thread_q.scalars().all()] + + return ThreadResponse( + lead_id=msg.lead_id, + lead_name=lead.name if lead else None, + messages=messages, + ) + + +@router.post("/reply", response_model=MessageResponse, status_code=status.HTTP_201_CREATED) +async def reply_message( + data: ReplyInput, + current_user: User = Depends(get_current_user), + db: AsyncSession = Depends(get_db), +): + """Reply to a conversation -- auto-routes to the correct channel.""" + + lead = (await db.execute( + select(Lead).where(Lead.id == data.lead_id, Lead.tenant_id == current_user.tenant_id) + )).scalar_one_or_none() + if not lead: + raise HTTPException(status_code=404, detail="العميل المحتمل غير موجود") + + if data.channel not in ("whatsapp", "email", "sms"): + raise HTTPException(status_code=400, detail="قناة غير مدعومة") + + now = datetime.now(timezone.utc) + message = Message( + tenant_id=current_user.tenant_id, + lead_id=data.lead_id, + channel=data.channel, + direction="outbound", + content=data.content, + status="pending", + sent_at=now, + extra_metadata={"sent_by": str(current_user.id)}, + ) + db.add(message) + await db.flush() + await db.refresh(message) + + logger.info("Inbox reply sent: lead=%s channel=%s by=%s", data.lead_id, data.channel, current_user.id) + return MessageResponse.model_validate(message) + + +@router.post("/assign", status_code=status.HTTP_200_OK) +async def assign_conversation( + data: AssignInput, + current_user: User = Depends(get_current_user), + db: AsyncSession = Depends(get_db), +): + """Assign a conversation (lead) to a team member.""" + + lead = (await db.execute( + select(Lead).where(Lead.id == data.lead_id, Lead.tenant_id == current_user.tenant_id) + )).scalar_one_or_none() + if not lead: + raise HTTPException(status_code=404, detail="العميل المحتمل غير موجود") + + lead.assigned_to = data.assigned_to + await db.flush() + logger.info("Conversation assigned: lead=%s to=%s", data.lead_id, data.assigned_to) + return {"detail": "تم تعيين المحادثة بنجاح", "lead_id": str(data.lead_id), "assigned_to": str(data.assigned_to)} + + +@router.put("/{message_id}/read", status_code=status.HTTP_200_OK) +async def mark_read( + message_id: UUID, + current_user: User = Depends(get_current_user), + db: AsyncSession = Depends(get_db), +): + """Mark a message as read.""" + + msg = (await db.execute( + select(Message).where(Message.id == message_id, Message.tenant_id == current_user.tenant_id) + )).scalar_one_or_none() + if not msg: + raise HTTPException(status_code=404, detail="الرسالة غير موجودة") + + msg.status = "read" + await db.flush() + return {"detail": "تم تحديد الرسالة كمقروءة", "message_id": str(message_id)} diff --git a/salesflow-saas/backend/app/api/v1/proposals.py b/salesflow-saas/backend/app/api/v1/proposals.py new file mode 100644 index 00000000..cffa7739 --- /dev/null +++ b/salesflow-saas/backend/app/api/v1/proposals.py @@ -0,0 +1,346 @@ +""" +Dealix Proposals & Quotes API +إدارة عروض الأسعار والعروض التجارية +""" + +from datetime import datetime, timezone +from typing import Optional +from uuid import UUID + +from fastapi import APIRouter, Depends, HTTPException, Query +from pydantic import BaseModel, Field +from sqlalchemy import select, func, and_ +from sqlalchemy.ext.asyncio import AsyncSession + +from app.database import get_db +from app.api.deps import get_current_user +from app.models.user import User +from app.models.proposal import Proposal +from app.services.cpq.quote_engine import ( + QuoteEngine, QuoteCreate, LineItemInput, DiscountInput, QuoteStatus, +) +from app.services.cpq.proposal_generator import ( + ProposalGenerator, ProposalInput, +) + +router = APIRouter(prefix="/proposals", tags=["Proposals & Quotes"]) + + +# ── Request / Response Models ──────────────────── + +class ProposalCreateRequest(BaseModel): + deal_id: Optional[str] = None + lead_id: Optional[str] = None + title: str + currency: str = "SAR" + industry: str = "services" + validity_days: int = 30 + vat_registration_number: Optional[str] = None + client_name: str = "" + client_company: str = "" + notes_ar: str = "" + + +class ProposalUpdateRequest(BaseModel): + title: Optional[str] = None + notes_ar: Optional[str] = None + validity_days: Optional[int] = None + vat_registration_number: Optional[str] = None + + +class SendRequest(BaseModel): + channel: str = Field(pattern=r"^(whatsapp|email)$", default="whatsapp") + recipient: str + + +class AcceptRequest(BaseModel): + client_signature: str = "" + notes: str = "" + + +class AIProposalRequest(BaseModel): + deal_title: str + client_name: str + client_company: str = "" + industry: str = "services" + deal_value: float = 0.0 + currency: str = "SAR" + requirements: str = "" + language: str = "ar" + extra_context: str = "" + + +# ── Endpoints ──────────────────────────────────── + +@router.get("") +async def list_proposals( + status: Optional[str] = Query(None), + deal_id: Optional[UUID] = Query(None), + date_from: Optional[str] = Query(None), + date_to: Optional[str] = Query(None), + page: int = Query(1, ge=1), + per_page: int = Query(25, ge=1, le=100), + current_user: User = Depends(get_current_user), + db: AsyncSession = Depends(get_db), +): + """List proposals with filters.""" + query = select(Proposal).where(Proposal.tenant_id == current_user.tenant_id) + + if status: + query = query.where(Proposal.status == status) + if deal_id: + query = query.where(Proposal.deal_id == deal_id) + if date_from: + query = query.where(Proposal.created_at >= datetime.fromisoformat(date_from)) + if date_to: + query = query.where(Proposal.created_at <= datetime.fromisoformat(date_to)) + + count_q = select(func.count()).select_from(query.subquery()) + total = (await db.execute(count_q)).scalar() or 0 + + query = query.order_by(Proposal.created_at.desc()) + query = query.offset((page - 1) * per_page).limit(per_page) + result = await db.execute(query) + items = [_proposal_dict(p) for p in result.scalars().all()] + + return {"items": items, "total": total, "page": page, "per_page": per_page} + + +@router.post("", status_code=201) +async def create_proposal( + data: ProposalCreateRequest, + current_user: User = Depends(get_current_user), + db: AsyncSession = Depends(get_db), +): + """Create a new proposal/quote.""" + engine = QuoteEngine(db) + quote_data = QuoteCreate( + tenant_id=str(current_user.tenant_id), + deal_id=data.deal_id, + lead_id=data.lead_id, + title=data.title, + currency=data.currency, + industry=data.industry, + validity_days=data.validity_days, + vat_registration_number=data.vat_registration_number, + client_name=data.client_name, + client_company=data.client_company, + notes_ar=data.notes_ar, + ) + return await engine.create_quote(quote_data) + + +@router.get("/analytics") +async def proposal_analytics( + current_user: User = Depends(get_current_user), + db: AsyncSession = Depends(get_db), +): + """Win rate, average deal size, time-to-close analytics.""" + tid = current_user.tenant_id + + total_q = select(func.count()).where(Proposal.tenant_id == tid) + total = (await db.execute(total_q)).scalar() or 0 + + accepted_q = select(func.count()).where( + Proposal.tenant_id == tid, Proposal.status == QuoteStatus.ACCEPTED.value, + ) + accepted = (await db.execute(accepted_q)).scalar() or 0 + + rejected_q = select(func.count()).where( + Proposal.tenant_id == tid, Proposal.status == QuoteStatus.REJECTED.value, + ) + rejected = (await db.execute(rejected_q)).scalar() or 0 + + avg_value_q = select(func.avg(Proposal.total_amount)).where( + Proposal.tenant_id == tid, Proposal.status == QuoteStatus.ACCEPTED.value, + ) + avg_value = (await db.execute(avg_value_q)).scalar() + + decided = accepted + rejected + win_rate = round((accepted / decided) * 100, 1) if decided > 0 else 0.0 + + return { + "total_proposals": total, + "accepted": accepted, + "rejected": rejected, + "win_rate_percent": win_rate, + "average_deal_value": float(avg_value) if avg_value else 0.0, + "currency": "SAR", + } + + +@router.get("/{proposal_id}") +async def get_proposal( + proposal_id: UUID, + current_user: User = Depends(get_current_user), + db: AsyncSession = Depends(get_db), +): + """Get full proposal details.""" + result = await db.execute( + select(Proposal).where( + Proposal.id == proposal_id, + Proposal.tenant_id == current_user.tenant_id, + ) + ) + proposal = result.scalar_one_or_none() + if not proposal: + raise HTTPException(status_code=404, detail="عرض السعر غير موجود") + return _proposal_dict(proposal) + + +@router.put("/{proposal_id}") +async def update_proposal( + proposal_id: UUID, + data: ProposalUpdateRequest, + current_user: User = Depends(get_current_user), + db: AsyncSession = Depends(get_db), +): + """Update proposal metadata.""" + result = await db.execute( + select(Proposal).where( + Proposal.id == proposal_id, + Proposal.tenant_id == current_user.tenant_id, + ) + ) + proposal = result.scalar_one_or_none() + if not proposal: + raise HTTPException(status_code=404, detail="عرض السعر غير موجود") + + if data.title is not None: + proposal.title = data.title + if data.notes_ar is not None: + content = dict(proposal.content) + content["notes_ar"] = data.notes_ar + proposal.content = content + if data.vat_registration_number is not None: + content = dict(proposal.content) + content["vat_registration_number"] = data.vat_registration_number + proposal.content = content + + await db.flush() + await db.refresh(proposal) + return _proposal_dict(proposal) + + +@router.post("/{proposal_id}/items") +async def add_line_item( + proposal_id: UUID, + item: LineItemInput, + current_user: User = Depends(get_current_user), + db: AsyncSession = Depends(get_db), +): + """Add a line item to the quote.""" + engine = QuoteEngine(db) + return await engine.add_line_item(str(current_user.tenant_id), str(proposal_id), item) + + +@router.post("/{proposal_id}/discount") +async def apply_discount( + proposal_id: UUID, + discount: DiscountInput, + current_user: User = Depends(get_current_user), + db: AsyncSession = Depends(get_db), +): + """Apply a discount to the quote.""" + engine = QuoteEngine(db) + return await engine.apply_discount(str(current_user.tenant_id), str(proposal_id), discount) + + +@router.post("/{proposal_id}/send") +async def send_proposal( + proposal_id: UUID, + data: SendRequest, + current_user: User = Depends(get_current_user), + db: AsyncSession = Depends(get_db), +): + """Send proposal via WhatsApp or Email.""" + engine = QuoteEngine(db) + return await engine.send_quote( + str(current_user.tenant_id), str(proposal_id), data.channel, data.recipient, + ) + + +@router.post("/{proposal_id}/accept") +async def accept_proposal( + proposal_id: UUID, + data: AcceptRequest, + current_user: User = Depends(get_current_user), + db: AsyncSession = Depends(get_db), +): + """Client acceptance endpoint.""" + result = await db.execute( + select(Proposal).where( + Proposal.id == proposal_id, + Proposal.tenant_id == current_user.tenant_id, + ) + ) + proposal = result.scalar_one_or_none() + if not proposal: + raise HTTPException(status_code=404, detail="عرض السعر غير موجود") + if proposal.status == QuoteStatus.EXPIRED.value: + raise HTTPException(status_code=400, detail="عرض السعر منتهي الصلاحية") + + proposal.status = QuoteStatus.ACCEPTED.value + content = dict(proposal.content) + content["acceptance"] = { + "signature": data.client_signature, + "notes": data.notes, + "accepted_at": datetime.now(timezone.utc).isoformat(), + } + proposal.content = content + await db.flush() + await db.refresh(proposal) + return _proposal_dict(proposal) + + +@router.get("/{proposal_id}/pdf") +async def generate_pdf_data( + proposal_id: UUID, + current_user: User = Depends(get_current_user), + db: AsyncSession = Depends(get_db), +): + """Generate PDF-ready data for a proposal.""" + result = await db.execute( + select(Proposal).where( + Proposal.id == proposal_id, + Proposal.tenant_id == current_user.tenant_id, + ) + ) + proposal = result.scalar_one_or_none() + if not proposal: + raise HTTPException(status_code=404, detail="عرض السعر غير موجود") + + generator = ProposalGenerator() + ai_req = ProposalInput( + deal_title=proposal.title, + client_name=proposal.content.get("client_name", ""), + client_company=proposal.content.get("client_company", ""), + industry=proposal.content.get("industry", "services"), + deal_value=float(proposal.total_amount) if proposal.total_amount else 0.0, + currency=proposal.currency or "SAR", + requirements=proposal.content.get("notes_ar", ""), + language="both", + ) + ai_proposal = await generator.generate_proposal(ai_req) + return await generator.export_pdf_data(ai_proposal) + + +# ── Helpers ────────────────────────────────────── + +def _proposal_dict(p: Proposal) -> dict: + return { + "id": str(p.id), + "tenant_id": str(p.tenant_id), + "deal_id": str(p.deal_id) if p.deal_id else None, + "lead_id": str(p.lead_id) if p.lead_id else None, + "title": p.title, + "content": p.content, + "total_amount": str(p.total_amount) if p.total_amount else "0", + "currency": p.currency, + "status": p.status, + "valid_until": p.valid_until.isoformat() if p.valid_until else None, + "sent_at": p.sent_at.isoformat() if p.sent_at else None, + "viewed_at": p.viewed_at.isoformat() if p.viewed_at else None, + "created_at": p.created_at.isoformat() if p.created_at else None, + "updated_at": p.updated_at.isoformat() if p.updated_at else None, + } diff --git a/salesflow-saas/backend/app/api/v1/router.py b/salesflow-saas/backend/app/api/v1/router.py index 5df3ac56..9cb6bf20 100644 --- a/salesflow-saas/backend/app/api/v1/router.py +++ b/salesflow-saas/backend/app/api/v1/router.py @@ -4,7 +4,9 @@ from app.api.v1 import ( companies, contacts, calls, meetings, commissions, payouts, disputes, guarantees, consents, complaints, knowledge, sectors, presentations, supervisor, admin, health, analytics, webhooks, prospecting, + inbox, sequences, ) +from app.api.v1 import compliance as compliance_router from app.api.v1 import agents as agents_router from app.api.v1 import intelligence as intelligence_router from app.api.v1 import master as master_router @@ -56,6 +58,9 @@ api_router.include_router(operations_router.router) api_router.include_router(analytics.router, tags=["Analytics & AI"]) api_router.include_router(webhooks.router, tags=["Webhooks"]) api_router.include_router(prospecting.router, prefix="/prospecting", tags=["Prospecting"]) +api_router.include_router(inbox.router) +api_router.include_router(sequences.router) +api_router.include_router(compliance_router.router) # ── Manus Multi-Agent + Autonomous Intelligence ───────────── api_router.include_router(agents_router.router) diff --git a/salesflow-saas/backend/app/api/v1/sequences.py b/salesflow-saas/backend/app/api/v1/sequences.py new file mode 100644 index 00000000..d16e0352 --- /dev/null +++ b/salesflow-saas/backend/app/api/v1/sequences.py @@ -0,0 +1,239 @@ +"""Sequences API -- create, manage, and analyze multi-channel outreach sequences.""" + +import logging +from typing import Optional +from uuid import UUID + +from fastapi import APIRouter, Depends, HTTPException, Query, status +from pydantic import BaseModel as Schema +from sqlalchemy import select, func +from sqlalchemy.ext.asyncio import AsyncSession + +from app.database import get_db +from app.api.deps import get_current_user +from app.models.user import User +from app.models.sequence import Sequence, SequenceStep, SequenceEnrollment +from app.services.sequence_engine import ( + SequenceEngine, SequenceCreateInput, EnrollInput, SequenceAnalytics, +) + +logger = logging.getLogger(__name__) +router = APIRouter(prefix="/sequences", tags=["Sequences"]) + + +# --------------------------------------------------------------------------- +# Schemas +# --------------------------------------------------------------------------- + +class StepSchema(Schema): + channel: str + delay_minutes: int = 0 + template_content: str + template_content_ar: Optional[str] = None + variant: Optional[str] = None + conditions: dict = {} + + +class SequenceCreateRequest(Schema): + name: str + name_ar: Optional[str] = None + description: Optional[str] = None + trigger_event: Optional[str] = None + steps: list[StepSchema] = [] + + +class SequenceUpdateRequest(Schema): + name: Optional[str] = None + name_ar: Optional[str] = None + description: Optional[str] = None + trigger_event: Optional[str] = None + is_active: Optional[bool] = None + + +class SequenceResponse(Schema): + id: UUID + name: str + name_ar: Optional[str] = None + description: Optional[str] = None + trigger_event: Optional[str] = None + is_active: bool + created_at: object + step_count: int = 0 + enrollment_count: int = 0 + + model_config = {"from_attributes": True} + + +class SequenceListResponse(Schema): + items: list[SequenceResponse] + total: int + + +class EnrollRequest(Schema): + lead_id: UUID + + +class EnrollmentResponse(Schema): + id: UUID + sequence_id: UUID + lead_id: UUID + current_step: int + status: str + enrolled_at: object + + model_config = {"from_attributes": True} + + +# --------------------------------------------------------------------------- +# Endpoints +# --------------------------------------------------------------------------- + +@router.get("", response_model=SequenceListResponse) +async def list_sequences( + page: int = Query(1, ge=1), + per_page: int = Query(20, ge=1, le=100), + current_user: User = Depends(get_current_user), + db: AsyncSession = Depends(get_db), +): + """List sequences with analytics summary.""" + + tid = current_user.tenant_id + total = (await db.execute( + select(func.count()).where(Sequence.tenant_id == tid) + )).scalar() or 0 + + rows = await db.execute( + select(Sequence) + .where(Sequence.tenant_id == tid) + .order_by(Sequence.created_at.desc()) + .offset((page - 1) * per_page).limit(per_page) + ) + sequences = rows.scalars().all() + + items = [] + for seq in sequences: + step_count = (await db.execute( + select(func.count()).where(SequenceStep.sequence_id == seq.id) + )).scalar() or 0 + enroll_count = (await db.execute( + select(func.count()).where(SequenceEnrollment.sequence_id == seq.id) + )).scalar() or 0 + resp = SequenceResponse.model_validate(seq) + resp.step_count = step_count + resp.enrollment_count = enroll_count + items.append(resp) + + return SequenceListResponse(items=items, total=total) + + +@router.post("", response_model=SequenceResponse, status_code=status.HTTP_201_CREATED) +async def create_sequence( + data: SequenceCreateRequest, + current_user: User = Depends(get_current_user), + db: AsyncSession = Depends(get_db), +): + """Create a new sequence with steps.""" + + engine = SequenceEngine(db) + seq = await engine.create_sequence(SequenceCreateInput( + tenant_id=current_user.tenant_id, + name=data.name, + name_ar=data.name_ar, + description=data.description, + trigger_event=data.trigger_event, + created_by=current_user.id, + steps=[s.model_dump() for s in data.steps], + )) + + resp = SequenceResponse.model_validate(seq) + resp.step_count = len(data.steps) + return resp + + +@router.put("/{sequence_id}", response_model=SequenceResponse) +async def update_sequence( + sequence_id: UUID, + data: SequenceUpdateRequest, + current_user: User = Depends(get_current_user), + db: AsyncSession = Depends(get_db), +): + """Update sequence metadata.""" + + seq = (await db.execute( + select(Sequence).where(Sequence.id == sequence_id, Sequence.tenant_id == current_user.tenant_id) + )).scalar_one_or_none() + if not seq: + raise HTTPException(status_code=404, detail="التسلسل غير موجود") + + for field, val in data.model_dump(exclude_none=True).items(): + setattr(seq, field, val) + await db.flush() + await db.refresh(seq) + logger.info("Sequence updated: id=%s", sequence_id) + return SequenceResponse.model_validate(seq) + + +@router.post("/{sequence_id}/enroll", response_model=EnrollmentResponse, status_code=status.HTTP_201_CREATED) +async def enroll_lead( + sequence_id: UUID, + data: EnrollRequest, + current_user: User = Depends(get_current_user), + db: AsyncSession = Depends(get_db), +): + """Enroll a lead into a sequence.""" + + seq = (await db.execute( + select(Sequence).where(Sequence.id == sequence_id, Sequence.tenant_id == current_user.tenant_id) + )).scalar_one_or_none() + if not seq: + raise HTTPException(status_code=404, detail="التسلسل غير موجود") + if not seq.is_active: + raise HTTPException(status_code=400, detail="التسلسل غير نشط") + + engine = SequenceEngine(db) + try: + enrollment = await engine.enroll_lead(EnrollInput(sequence_id=sequence_id, lead_id=data.lead_id)) + except ValueError as exc: + raise HTTPException(status_code=409, detail=str(exc)) + return EnrollmentResponse.model_validate(enrollment) + + +@router.delete("/{sequence_id}/enrollments/{enrollment_id}", status_code=status.HTTP_200_OK) +async def stop_enrollment( + sequence_id: UUID, + enrollment_id: UUID, + current_user: User = Depends(get_current_user), + db: AsyncSession = Depends(get_db), +): + """Stop an active enrollment.""" + + enrollment = (await db.execute( + select(SequenceEnrollment).where( + SequenceEnrollment.id == enrollment_id, + SequenceEnrollment.sequence_id == sequence_id, + ) + )).scalar_one_or_none() + if not enrollment: + raise HTTPException(status_code=404, detail="التسجيل غير موجود") + + engine = SequenceEngine(db) + await engine.stop_enrollment(enrollment_id) + return {"detail": "تم إيقاف التسجيل بنجاح", "enrollment_id": str(enrollment_id)} + + +@router.get("/{sequence_id}/analytics", response_model=SequenceAnalytics) +async def get_analytics( + sequence_id: UUID, + current_user: User = Depends(get_current_user), + db: AsyncSession = Depends(get_db), +): + """Detailed analytics for a sequence.""" + + seq = (await db.execute( + select(Sequence).where(Sequence.id == sequence_id, Sequence.tenant_id == current_user.tenant_id) + )).scalar_one_or_none() + if not seq: + raise HTTPException(status_code=404, detail="التسلسل غير موجود") + + engine = SequenceEngine(db) + return await engine.get_sequence_analytics(sequence_id) diff --git a/salesflow-saas/backend/app/services/ai/conversation_intelligence.py b/salesflow-saas/backend/app/services/ai/conversation_intelligence.py new file mode 100644 index 00000000..888fbe06 --- /dev/null +++ b/salesflow-saas/backend/app/services/ai/conversation_intelligence.py @@ -0,0 +1,403 @@ +""" +Arabic Conversation Intelligence — Analyzes WhatsApp/email threads +to extract insights, buying/risk signals, and next-best-action recommendations. +""" + +import json +import logging +import re +from dataclasses import dataclass, field +from typing import Optional + +from app.services.llm.provider import get_llm + +logger = logging.getLogger(__name__) + +# --------------------------------------------------------------------------- +# Data models +# --------------------------------------------------------------------------- + +@dataclass +class BuyingSignal: + phrase: str + confidence: float + signal_type: str # "explicit", "implicit" + + +@dataclass +class RiskSignal: + phrase: str + risk_type: str # "price_objection", "competitor", "hesitation", "delay" + severity: str # "low", "medium", "high" + + +@dataclass +class ActionItem: + description_ar: str + description_en: str + priority: str # "high", "medium", "low" + due_hint: Optional[str] = None # "today", "this_week", "next_week" + + +@dataclass +class ConversationInsight: + summary_ar: str + summary_en: str + key_topics: list[str] = field(default_factory=list) + buying_signals: list[BuyingSignal] = field(default_factory=list) + risk_signals: list[RiskSignal] = field(default_factory=list) + objections: list[str] = field(default_factory=list) + action_items: list[ActionItem] = field(default_factory=list) + next_best_action_ar: str = "" + next_best_action_en: str = "" + quality_score: float = 0.0 # 0.0 - 10.0 + message_count: int = 0 + dominant_language: str = "ar" + + +# --------------------------------------------------------------------------- +# Pattern constants +# --------------------------------------------------------------------------- + +BUYING_SIGNAL_PATTERNS = [ + (r"أبي\s*عرض\s*سعر", "explicit", 0.9), + (r"كم\s*السعر", "explicit", 0.85), + (r"متى\s*تقدرون\s*تبد[وأ]ون", "explicit", 0.9), + (r"أبي\s*أشتري", "explicit", 0.95), + (r"نبي\s*ن[شس]تري", "explicit", 0.95), + (r"عطوني\s*عرض", "explicit", 0.85), + (r"ودي\s*آخذ", "explicit", 0.8), + (r"أبغى\s*أطلب", "explicit", 0.9), + (r"جاهز[ية]?\s*نبدأ", "explicit", 0.95), + (r"كيف\s*طريقة\s*الدفع", "implicit", 0.8), + (r"فيه\s*ضمان", "implicit", 0.7), + (r"عندكم\s*تجربة\s*مجانية", "implicit", 0.6), + (r"متى\s*يوصل", "implicit", 0.7), + (r"أبي\s*أعرف\s*أكثر", "implicit", 0.5), + (r"send\s*(?:me\s*)?(?:a\s*)?quot(?:e|ation)", "explicit", 0.85), + (r"how\s*(?:much|soon)", "implicit", 0.7), + (r"ready\s*to\s*(?:start|buy|proceed)", "explicit", 0.95), +] + +RISK_SIGNAL_PATTERNS = [ + (r"غالي", "price_objection", "medium", 0.8), + (r"فيه\s*أرخص", "competitor", "high", 0.85), + (r"بفكر", "hesitation", "medium", 0.7), + (r"مو\s*متأكد", "hesitation", "high", 0.8), + (r"خلني\s*أستشير", "delay", "medium", 0.6), + (r"أرجع\s*لك", "delay", "medium", 0.5), + (r"مو\s*الحين", "delay", "medium", 0.7), + (r"ما\s*عندي\s*ميزانية", "price_objection", "high", 0.9), + (r"المنافس\s*يعطينا\s*أحسن", "competitor", "high", 0.9), + (r"نستخدم\s*نظام\s*ثاني", "competitor", "medium", 0.7), + (r"ما\s*شفت\s*فايدة", "hesitation", "high", 0.85), + (r"too\s*expensive", "price_objection", "medium", 0.8), + (r"not\s*sure", "hesitation", "medium", 0.7), + (r"(?:need|let)\s*(?:me\s*)?think", "delay", "medium", 0.6), +] + + +# --------------------------------------------------------------------------- +# Service +# --------------------------------------------------------------------------- + +class ConversationIntelligence: + """Analyzes Arabic/English conversation threads for sales intelligence.""" + + def __init__(self): + self._llm = get_llm() + + async def analyze_conversation( + self, messages: list[dict], context: Optional[dict] = None + ) -> ConversationInsight: + """ + Analyze a conversation thread. + + Args: + messages: List of {"role": "lead"|"agent", "content": str, "timestamp": str, "channel": str} + context: Optional lead/deal context {"lead_name", "company", "industry", "stage"} + + Returns: + ConversationInsight with full analysis. + """ + context = context or {} + + if not messages: + return ConversationInsight( + summary_ar="لا توجد رسائل للتحليل", + summary_en="No messages to analyze", + message_count=0, + ) + + # Regex-based signal extraction (fast) + buying_signals = self._extract_buying_signals(messages) + risk_signals = self._extract_risk_signals(messages) + + # LLM-based deep analysis + try: + llm_insight = await self._llm_analyze(messages, context) + except Exception as e: + logger.warning(f"LLM conversation analysis failed: {e}") + llm_insight = {} + + # Merge regex and LLM results + quality_score = self._calculate_quality_score(messages, buying_signals, risk_signals) + + # Combine LLM action items with defaults + action_items = self._parse_action_items(llm_insight.get("action_items", [])) + if not action_items: + action_items = self._generate_default_actions(buying_signals, risk_signals) + + return ConversationInsight( + summary_ar=llm_insight.get("summary_ar", self._build_fallback_summary_ar(messages)), + summary_en=llm_insight.get("summary_en", self._build_fallback_summary_en(messages)), + key_topics=llm_insight.get("key_topics", []), + buying_signals=buying_signals, + risk_signals=risk_signals, + objections=llm_insight.get("objections", []), + action_items=action_items, + next_best_action_ar=llm_insight.get( + "next_best_action_ar", + self._default_next_action_ar(buying_signals, risk_signals), + ), + next_best_action_en=llm_insight.get("next_best_action_en", "Follow up with the lead"), + quality_score=round(quality_score, 1), + message_count=len(messages), + dominant_language=self._detect_dominant_language(messages), + ) + + # ── Regex Signal Extraction ────────────────── + + def _extract_buying_signals(self, messages: list[dict]) -> list[BuyingSignal]: + """Extract buying signals from conversation using regex patterns.""" + signals = [] + lead_texts = " ".join( + m.get("content", "") for m in messages if m.get("role") == "lead" + ) + seen_phrases = set() + + for pattern, signal_type, confidence in BUYING_SIGNAL_PATTERNS: + for match in re.finditer(pattern, lead_texts, re.IGNORECASE): + phrase = match.group(0).strip() + if phrase not in seen_phrases: + seen_phrases.add(phrase) + signals.append(BuyingSignal( + phrase=phrase, + confidence=confidence, + signal_type=signal_type, + )) + return signals + + def _extract_risk_signals(self, messages: list[dict]) -> list[RiskSignal]: + """Extract risk signals from conversation using regex patterns.""" + signals = [] + lead_texts = " ".join( + m.get("content", "") for m in messages if m.get("role") == "lead" + ) + seen_phrases = set() + + for pattern, risk_type, severity, _confidence in RISK_SIGNAL_PATTERNS: + for match in re.finditer(pattern, lead_texts, re.IGNORECASE): + phrase = match.group(0).strip() + if phrase not in seen_phrases: + seen_phrases.add(phrase) + signals.append(RiskSignal( + phrase=phrase, + risk_type=risk_type, + severity=severity, + )) + return signals + + # ── LLM Deep Analysis ──────────────────────── + + async def _llm_analyze(self, messages: list[dict], context: dict) -> dict: + """Use LLM for deep conversation analysis.""" + thread_text = self._format_thread(messages) + context_str = "" + if context: + context_str = ( + f"معلومات العميل: {context.get('lead_name', 'غير معروف')}, " + f"الشركة: {context.get('company', 'غير معروف')}, " + f"القطاع: {context.get('industry', 'غير محدد')}, " + f"المرحلة: {context.get('stage', 'غير محدد')}" + ) + + system_prompt = ( + "أنت محلل محادثات مبيعات خبير في السوق السعودي.\n" + "حلل المحادثة التالية واستخرج:\n" + "1. ملخص المحادثة بالعربي والإنجليزي\n" + "2. المواضيع الرئيسية\n" + "3. الاعتراضات التي طرحها العميل\n" + "4. المهام والإجراءات المطلوبة\n" + "5. أفضل إجراء تالي\n\n" + f"{context_str}\n\n" + "أجب بصيغة JSON بالضبط:\n" + "{\n" + ' "summary_ar": "ملخص بالعربي",\n' + ' "summary_en": "English summary",\n' + ' "key_topics": ["موضوع1", "موضوع2"],\n' + ' "objections": ["اعتراض1", "اعتراض2"],\n' + ' "action_items": [\n' + ' {"description_ar": "وصف", "description_en": "desc", "priority": "high|medium|low", "due_hint": "today|this_week|next_week"}\n' + " ],\n" + ' "next_best_action_ar": "الإجراء التالي بالعربي",\n' + ' "next_best_action_en": "Next action in English"\n' + "}" + ) + + response = await self._llm.complete( + system_prompt=system_prompt, + user_message=thread_text, + json_mode=True, + temperature=0.2, + max_tokens=1024, + ) + parsed = response.parse_json() + return parsed or {} + + # ── Quality Scoring ────────────────────────── + + def _calculate_quality_score( + self, + messages: list[dict], + buying_signals: list[BuyingSignal], + risk_signals: list[RiskSignal], + ) -> float: + """Calculate conversation quality score (0-10).""" + score = 5.0 # baseline + + # Message volume factor + msg_count = len(messages) + if msg_count >= 10: + score += 1.0 + elif msg_count >= 5: + score += 0.5 + + # Two-way engagement + lead_msgs = sum(1 for m in messages if m.get("role") == "lead") + agent_msgs = sum(1 for m in messages if m.get("role") == "agent") + if lead_msgs > 0 and agent_msgs > 0: + ratio = min(lead_msgs, agent_msgs) / max(lead_msgs, agent_msgs) + score += ratio * 1.5 # balanced conversation = higher quality + + # Buying signals boost + score += min(len(buying_signals) * 0.5, 2.0) + + # Risk signals penalty + high_risks = sum(1 for r in risk_signals if r.severity == "high") + score -= min(high_risks * 0.5, 2.0) + + # Average message length (longer = more engaged) + avg_len = sum(len(m.get("content", "")) for m in messages) / max(msg_count, 1) + if avg_len > 100: + score += 0.5 + + return max(0.0, min(10.0, score)) + + # ── Helpers ────────────────────────────────── + + @staticmethod + def _format_thread(messages: list[dict]) -> str: + """Format messages into a readable thread for LLM.""" + lines = [] + for m in messages[-30:]: # limit to last 30 messages + role = "العميل" if m.get("role") == "lead" else "المندوب" + timestamp = m.get("timestamp", "") + channel = m.get("channel", "") + prefix = f"[{timestamp}] [{channel}] {role}" if timestamp else f"[{channel}] {role}" + lines.append(f"{prefix}: {m.get('content', '')}") + return "\n".join(lines) + + @staticmethod + def _detect_dominant_language(messages: list[dict]) -> str: + """Quick check on whether the conversation is mostly Arabic or English.""" + arabic_re = re.compile(r"[\u0600-\u06FF]") + arabic_chars = 0 + total_chars = 0 + for m in messages: + content = m.get("content", "") + arabic_chars += len(arabic_re.findall(content)) + total_chars += len(content) + if total_chars == 0: + return "ar" + return "ar" if (arabic_chars / total_chars) > 0.3 else "en" + + @staticmethod + def _parse_action_items(raw_items: list) -> list[ActionItem]: + """Parse LLM action items into ActionItem objects.""" + items = [] + for item in raw_items: + if isinstance(item, dict): + items.append(ActionItem( + description_ar=item.get("description_ar", ""), + description_en=item.get("description_en", ""), + priority=item.get("priority", "medium"), + due_hint=item.get("due_hint"), + )) + return items + + @staticmethod + def _generate_default_actions( + buying_signals: list[BuyingSignal], risk_signals: list[RiskSignal] + ) -> list[ActionItem]: + """Generate default actions when LLM is unavailable.""" + actions = [] + if buying_signals: + explicit = [s for s in buying_signals if s.signal_type == "explicit"] + if explicit: + actions.append(ActionItem( + description_ar="العميل أبدى رغبة شرائية واضحة — أرسل عرض سعر فوراً", + description_en="Lead showed explicit buying intent - send proposal immediately", + priority="high", + due_hint="today", + )) + high_risks = [r for r in risk_signals if r.severity == "high"] + if high_risks: + risk_types = set(r.risk_type for r in high_risks) + if "price_objection" in risk_types: + actions.append(ActionItem( + description_ar="العميل يشوف السعر غالي — جهّز مقارنة قيمة وعرض خاص", + description_en="Price objection detected - prepare value comparison and discount offer", + priority="high", + due_hint="today", + )) + if "competitor" in risk_types: + actions.append(ActionItem( + description_ar="العميل يقارن بالمنافسين — جهّز مقارنة تنافسية", + description_en="Competitor comparison detected - prepare competitive analysis", + priority="high", + due_hint="today", + )) + if not actions: + actions.append(ActionItem( + description_ar="تابع مع العميل برسالة واتساب ودية", + description_en="Follow up with a friendly WhatsApp message", + priority="medium", + due_hint="this_week", + )) + return actions + + @staticmethod + def _default_next_action_ar( + buying_signals: list[BuyingSignal], risk_signals: list[RiskSignal] + ) -> str: + if any(s.signal_type == "explicit" for s in buying_signals): + return "العميل جاهز! أرسل عرض سعر مخصص واتصل خلال ساعة." + high_risks = [r for r in risk_signals if r.severity == "high"] + if high_risks: + return "انتبه — فيه إشارات خطر. عالج الاعتراضات قبل المتابعة." + if buying_signals: + return "فيه اهتمام. أرسل معلومات إضافية وحدد موعد عرض." + return "تابع المحادثة واسأل عن احتياجات العميل." + + @staticmethod + def _build_fallback_summary_ar(messages: list[dict]) -> str: + count = len(messages) + lead_count = sum(1 for m in messages if m.get("role") == "lead") + return f"محادثة مكونة من {count} رسالة ({lead_count} من العميل). لم يتم تحليل المحتوى بالتفصيل." + + @staticmethod + def _build_fallback_summary_en(messages: list[dict]) -> str: + count = len(messages) + lead_count = sum(1 for m in messages if m.get("role") == "lead") + return f"Conversation with {count} messages ({lead_count} from lead). Detailed analysis unavailable." diff --git a/salesflow-saas/backend/app/services/ai/message_writer.py b/salesflow-saas/backend/app/services/ai/message_writer.py new file mode 100644 index 00000000..c603a34c --- /dev/null +++ b/salesflow-saas/backend/app/services/ai/message_writer.py @@ -0,0 +1,436 @@ +""" +AI Message Writer — Generates personalized WhatsApp, Email, and SMS messages +in Arabic and English with tone control, A/B variants, and Saudi business-hour awareness. +""" + +import json +import logging +from dataclasses import dataclass, field +from datetime import datetime, timezone, timedelta +from typing import Optional + +from app.services.llm.provider import get_llm + +logger = logging.getLogger(__name__) + +# --------------------------------------------------------------------------- +# Data models +# --------------------------------------------------------------------------- + +@dataclass +class MessageVariant: + content: str + subject: Optional[str] = None # for email only + variant_label: str = "A" + estimated_read_time_sec: int = 0 + + +@dataclass +class MessageDraft: + channel: str # "whatsapp", "email", "sms" + language: str # "ar", "en" + tone: str + variants: list[MessageVariant] = field(default_factory=list) + best_send_time: Optional[str] = None + best_send_day: Optional[str] = None + personalization_used: list[str] = field(default_factory=list) + metadata: dict = field(default_factory=dict) + + +# --------------------------------------------------------------------------- +# Constants +# --------------------------------------------------------------------------- + +CHANNEL_LIMITS = { + "whatsapp": 4096, + "sms": 160, + "email": 10000, +} + +TONE_INSTRUCTIONS = { + "formal": { + "ar": ( + "استخدم لغة رسمية واحترافية. خاطب العميل بـ'حضرتكم'. " + "ابدأ بالسلام الرسمي. تجنب العامية." + ), + "en": "Use formal, professional language. Address the recipient respectfully.", + }, + "friendly": { + "ar": ( + "استخدم لغة ودية وعفوية. خاطب العميل بـ'أنت'. " + "استخدم تعبيرات سعودية طبيعية مثل 'هلا والله' و'يعطيك العافية'." + ), + "en": "Use a warm, friendly tone. Be conversational and approachable.", + }, + "urgent": { + "ar": ( + "استخدم لغة مباشرة تحث على الإسراع. " + "أكد على محدودية العرض أو ضرورة التصرف السريع بدون مبالغة." + ), + "en": "Create urgency without being pushy. Emphasize time-limited opportunity.", + }, + "follow_up": { + "ar": ( + "ذكّر العميل بالمحادثة السابقة بلطف. " + "اسأل إذا عنده أسئلة. كن مهذب وغير ملح." + ), + "en": "Gently remind about previous conversation. Ask if they have questions.", + }, +} + +INDUSTRY_CONTEXT = { + "real_estate": { + "ar": "القطاع العقاري — استخدم مصطلحات مثل: وحدة سكنية، مخطط، صك، موقع استراتيجي، عائد استثماري", + "en": "Real estate — use terms like: residential unit, strategic location, ROI, property value", + }, + "healthcare": { + "ar": "القطاع الصحي — استخدم مصطلحات مثل: عيادة، مجمع طبي، تأمين، موعد، رعاية صحية", + "en": "Healthcare — use terms like: clinic, medical complex, insurance, appointment, care quality", + }, + "retail": { + "ar": "قطاع التجزئة — استخدم مصطلحات مثل: نقاط البيع، المخزون، تجربة العميل، موسم التخفيضات", + "en": "Retail — use terms like: POS, inventory, customer experience, sales season", + }, + "education": { + "ar": "قطاع التعليم — استخدم مصطلحات مثل: تسجيل، رسوم دراسية، منهج، فصل دراسي", + "en": "Education — use terms like: enrollment, tuition, curriculum, academic term", + }, + "automotive": { + "ar": "قطاع السيارات — استخدم مصطلحات مثل: معرض، وكالة، تمويل، صيانة، موديل", + "en": "Automotive — use terms like: showroom, dealership, financing, maintenance, model", + }, + "hospitality": { + "ar": "قطاع الضيافة — استخدم مصطلحات مثل: حجز، جناح، باقة، تجربة ضيافة، موسم سياحي", + "en": "Hospitality — use terms like: booking, suite, package, guest experience, peak season", + }, +} + +# Saudi Arabia timezone: UTC+3 +SAUDI_TZ_OFFSET = timedelta(hours=3) + +# Optimal send windows (Saudi time, 24h) +SEND_WINDOWS = { + "whatsapp": {"start": 9, "end": 21, "peak": [10, 14, 19]}, + "email": {"start": 8, "end": 17, "peak": [9, 11, 14]}, + "sms": {"start": 9, "end": 20, "peak": [10, 13, 18]}, +} + +# Saudi work week: Sunday (6) through Thursday (3) +SAUDI_WORK_DAYS = {6, 0, 1, 2, 3} # Sunday=6, Mon=0, Tue=1, Wed=2, Thu=3 + + +# --------------------------------------------------------------------------- +# Service +# --------------------------------------------------------------------------- + +class MessageWriter: + """Generates personalized, culturally-aware sales messages.""" + + def __init__(self): + self._llm = get_llm() + + async def write_message( + self, + channel: str, + tone: str, + lead_data: dict, + context: Optional[dict] = None, + language: str = "ar", + ) -> MessageDraft: + """ + Generate a sales message with A/B variants. + + Args: + channel: "whatsapp", "email", or "sms" + tone: "formal", "friendly", "urgent", "follow_up" + lead_data: {"name", "company", "industry", "stage", "city", "last_contact"} + context: Optional {"deal_value", "product", "previous_topic", "objection"} + language: "ar" or "en" + + Returns: + MessageDraft with two A/B variants and send-time recommendation. + """ + context = context or {} + channel = channel.lower() + tone = tone.lower() + language = language.lower() if language else "ar" + + if channel not in CHANNEL_LIMITS: + channel = "whatsapp" + if tone not in TONE_INSTRUCTIONS: + tone = "friendly" + if language not in ("ar", "en"): + language = "ar" + + # Build the prompt + system_prompt = self._build_system_prompt(channel, tone, lead_data, context, language) + user_prompt = self._build_user_prompt(channel, tone, lead_data, context, language) + + # Generate both variants in one LLM call + try: + response = await self._llm.complete( + system_prompt=system_prompt, + user_message=user_prompt, + json_mode=True, + temperature=0.7, + max_tokens=2048, + ) + parsed = response.parse_json() + if parsed: + variants = self._parse_variants(parsed, channel) + else: + raise ValueError("Failed to parse LLM response") + except Exception as e: + logger.warning(f"LLM message generation failed: {e}") + variants = self._fallback_variants(channel, tone, lead_data, language) + + # Calculate best send time + best_time, best_day = self._calculate_best_send_time(channel) + + # Track which personalization fields were used + personalization = [ + k for k in ("name", "company", "industry", "city", "stage") + if lead_data.get(k) + ] + + return MessageDraft( + channel=channel, + language=language, + tone=tone, + variants=variants, + best_send_time=best_time, + best_send_day=best_day, + personalization_used=personalization, + metadata={ + "max_length": CHANNEL_LIMITS[channel], + "lead_name": lead_data.get("name", ""), + "industry": lead_data.get("industry", ""), + }, + ) + + # ── Prompt Construction ────────────────────── + + def _build_system_prompt( + self, channel: str, tone: str, lead_data: dict, context: dict, language: str + ) -> str: + lang_key = language if language in ("ar", "en") else "ar" + tone_instruction = TONE_INSTRUCTIONS.get(tone, TONE_INSTRUCTIONS["friendly"])[lang_key] + + industry = (lead_data.get("industry") or "").lower().replace(" ", "_") + industry_note = "" + if industry in INDUSTRY_CONTEXT: + industry_note = INDUSTRY_CONTEXT[industry][lang_key] + + char_limit = CHANNEL_LIMITS[channel] + + if language == "ar": + prompt = ( + "أنت كاتب رسائل مبيعات محترف متخصص في السوق السعودي.\n" + f"القناة: {channel} (حد أقصى {char_limit} حرف)\n" + f"النبرة: {tone_instruction}\n" + ) + if industry_note: + prompt += f"القطاع: {industry_note}\n" + prompt += ( + "\nقواعد مهمة:\n" + "- لا تستخدم لهجة مصرية أو شامية\n" + "- استخدم 'ريال' للعملة\n" + "- راعي ثقافة الأعمال السعودية\n" + "- اكتب رسالتين مختلفتين (A و B) لاختبار A/B\n" + "- الرسالة يجب أن تكون كاملة وجاهزة للإرسال\n" + ) + if channel == "email": + prompt += "- أضف عنوان بريد مناسب لكل رسالة\n" + else: + prompt = ( + "You are a professional sales message writer for the Saudi market.\n" + f"Channel: {channel} (max {char_limit} characters)\n" + f"Tone: {tone_instruction}\n" + ) + if industry_note: + prompt += f"Industry: {industry_note}\n" + prompt += ( + "\nRules:\n" + "- Write two different message variants (A and B) for A/B testing\n" + "- Messages must be complete and ready to send\n" + "- Be culturally aware of Saudi business norms\n" + ) + if channel == "email": + prompt += "- Include an email subject line for each variant\n" + + prompt += ( + "\nأجب بصيغة JSON فقط:\n" + "{\n" + ' "variant_a": {"content": "...", "subject": "..." },\n' + ' "variant_b": {"content": "...", "subject": "..." }\n' + "}\n" + "subject مطلوب فقط للبريد الإلكتروني. للواتساب والرسائل اتركه فارغ." + ) + return prompt + + def _build_user_prompt( + self, channel: str, tone: str, lead_data: dict, context: dict, language: str + ) -> str: + name = lead_data.get("name", "العميل" if language == "ar" else "Customer") + company = lead_data.get("company", "") + industry = lead_data.get("industry", "") + stage = lead_data.get("stage", "") + city = lead_data.get("city", "") + last_contact = lead_data.get("last_contact", "") + + deal_value = context.get("deal_value", "") + product = context.get("product", "") + previous_topic = context.get("previous_topic", "") + objection = context.get("objection", "") + + if language == "ar": + parts = [f"اكتب رسالة {channel} للعميل:"] + parts.append(f"- الاسم: {name}") + if company: + parts.append(f"- الشركة: {company}") + if industry: + parts.append(f"- القطاع: {industry}") + if stage: + parts.append(f"- مرحلة البيع: {stage}") + if city: + parts.append(f"- المدينة: {city}") + if last_contact: + parts.append(f"- آخر تواصل: {last_contact}") + if deal_value: + parts.append(f"- قيمة الصفقة: {deal_value} ريال") + if product: + parts.append(f"- المنتج: {product}") + if previous_topic: + parts.append(f"- الموضوع السابق: {previous_topic}") + if objection: + parts.append(f"- اعتراض العميل: {objection}") + else: + parts = [f"Write a {channel} message for:"] + parts.append(f"- Name: {name}") + if company: + parts.append(f"- Company: {company}") + if industry: + parts.append(f"- Industry: {industry}") + if stage: + parts.append(f"- Stage: {stage}") + if city: + parts.append(f"- City: {city}") + if last_contact: + parts.append(f"- Last contact: {last_contact}") + if deal_value: + parts.append(f"- Deal value: {deal_value} SAR") + if product: + parts.append(f"- Product: {product}") + if previous_topic: + parts.append(f"- Previous topic: {previous_topic}") + if objection: + parts.append(f"- Objection: {objection}") + + return "\n".join(parts) + + # ── Response Parsing ───────────────────────── + + def _parse_variants(self, parsed: dict, channel: str) -> list[MessageVariant]: + """Parse LLM JSON response into MessageVariant objects.""" + variants = [] + char_limit = CHANNEL_LIMITS[channel] + + for key, label in [("variant_a", "A"), ("variant_b", "B")]: + variant_data = parsed.get(key, {}) + if isinstance(variant_data, str): + content = variant_data + subject = None + else: + content = variant_data.get("content", "") + subject = variant_data.get("subject") if channel == "email" else None + + # Truncate if over limit + if len(content) > char_limit: + content = content[:char_limit - 3] + "..." + + read_time = max(1, len(content) // 200 * 10) # ~200 chars / 10 sec + + variants.append(MessageVariant( + content=content, + subject=subject, + variant_label=label, + estimated_read_time_sec=read_time, + )) + + return variants + + def _fallback_variants( + self, channel: str, tone: str, lead_data: dict, language: str + ) -> list[MessageVariant]: + """Generate basic fallback messages when LLM is unavailable.""" + name = lead_data.get("name", "") + company = lead_data.get("company", "") + + if language == "ar": + greeting = "السلام عليكم" if tone == "formal" else "هلا والله" + if tone == "follow_up": + text_a = f"{greeting} {name}، كيف حالك؟ حبيت أتابع معك بخصوص محادثتنا السابقة. هل عندك أي أسئلة؟" + text_b = f"{greeting} {name}، يعطيك العافية! تواصلنا قبل وحبيت أشوف إذا تحتاج أي شي ثاني." + elif tone == "urgent": + text_a = f"{greeting} {name}، عندنا عرض خاص ينتهي قريب. تبي أرسل لك التفاصيل؟" + text_b = f"{greeting} {name}، فرصة محدودة متاحة الحين. هل تحب نتكلم عنها؟" + else: + text_a = f"{greeting} {name}، يسعدنا تواصلك! كيف نقدر نساعدك اليوم؟" + text_b = f"{greeting} {name}، حياك الله! عندنا حلول ممتازة تناسب احتياجاتك." + else: + if tone == "follow_up": + text_a = f"Hi {name}, following up on our previous conversation. Do you have any questions?" + text_b = f"Hello {name}, just checking in. Would you like to continue our discussion?" + elif tone == "urgent": + text_a = f"Hi {name}, we have a limited-time offer. Shall I share the details?" + text_b = f"Hello {name}, a special opportunity is available now. Interested?" + else: + text_a = f"Hi {name}, great to connect! How can we help you today?" + text_b = f"Hello {name}, we have solutions tailored to your needs. Let's chat!" + + return [ + MessageVariant(content=text_a, variant_label="A", estimated_read_time_sec=10), + MessageVariant(content=text_b, variant_label="B", estimated_read_time_sec=10), + ] + + # ── Send Time Calculation ──────────────────── + + def _calculate_best_send_time(self, channel: str) -> tuple[str, str]: + """Calculate best send time based on Saudi business hours.""" + now_utc = datetime.now(timezone.utc) + now_saudi = now_utc + SAUDI_TZ_OFFSET + + window = SEND_WINDOWS.get(channel, SEND_WINDOWS["whatsapp"]) + peak_hours = window["peak"] + + # Find the next available peak hour + current_hour = now_saudi.hour + current_weekday = now_saudi.weekday() + + # Check today first + if current_weekday in SAUDI_WORK_DAYS: + for peak in peak_hours: + if peak > current_hour: + best_time = f"{peak:02d}:00 (توقيت السعودية)" + day_names_ar = { + 6: "الأحد", 0: "الاثنين", 1: "الثلاثاء", + 2: "الأربعاء", 3: "الخميس", + } + best_day = day_names_ar.get(current_weekday, "اليوم") + return best_time, best_day + + # Next work day + days_ahead = 1 + while days_ahead <= 7: + next_day = (current_weekday + days_ahead) % 7 + if next_day in SAUDI_WORK_DAYS: + best_time = f"{peak_hours[0]:02d}:00 (توقيت السعودية)" + day_names_ar = { + 6: "الأحد", 0: "الاثنين", 1: "الثلاثاء", + 2: "الأربعاء", 3: "الخميس", 4: "الجمعة", 5: "السبت", + } + best_day = day_names_ar.get(next_day, "") + return best_time, best_day + days_ahead += 1 + + return f"{peak_hours[0]:02d}:00 (توقيت السعودية)", "الأحد" diff --git a/salesflow-saas/backend/app/services/ai/sales_agent.py b/salesflow-saas/backend/app/services/ai/sales_agent.py new file mode 100644 index 00000000..61a5cf8d --- /dev/null +++ b/salesflow-saas/backend/app/services/ai/sales_agent.py @@ -0,0 +1,346 @@ +""" +Dealix Autonomous AI Sales Agent for WhatsApp +وكيل مبيعات ذكي يعمل تلقائياً عبر الواتساب — يؤهل العملاء ويحجز المواعيد +""" + +import logging +from datetime import datetime, timezone +from enum import Enum +from typing import Optional + +from pydantic import BaseModel, Field + +from app.services.llm.provider import get_llm + +logger = logging.getLogger("dealix.ai.sales_agent") + + +class ConversationState(str, Enum): + GREETING = "greeting" + QUALIFICATION = "qualification" + NEEDS_ANALYSIS = "needs_analysis" + SOLUTION_PITCH = "solution_pitch" + OBJECTION_HANDLING = "objection_handling" + CLOSE_OR_ESCALATE = "close_or_escalate" + + +STATE_TRANSITIONS: dict[str, list[str]] = { + "greeting": ["qualification"], + "qualification": ["needs_analysis", "close_or_escalate"], + "needs_analysis": ["solution_pitch", "close_or_escalate"], + "solution_pitch": ["objection_handling", "close_or_escalate"], + "objection_handling": ["solution_pitch", "close_or_escalate"], + "close_or_escalate": [], +} + +INDUSTRY_QUALIFIERS: dict[str, list[str]] = { + "real_estate": [ + "ما نوع العقار المطلوب (سكني/تجاري)؟", + "ما المنطقة أو الحي المفضل؟", + "ما الميزانية التقريبية؟", + "هل تبحث عن شراء أو إيجار؟", + ], + "healthcare": [ + "ما نوع الخدمة الطبية المطلوبة؟", + "هل لديك تأمين طبي؟", + "هل تفضل موعد صباحي أو مسائي؟", + ], + "services": [ + "ما طبيعة الخدمة المطلوبة؟", + "ما الميزانية التقريبية؟", + "ما الجدول الزمني المتوقع؟", + "هل سبق تجربة مزود خدمة آخر؟", + ], + "contracting": [ + "ما نوع المشروع (بناء/صيانة/تشطيبات)؟", + "ما المساحة التقريبية؟", + "ما الميزانية المخصصة؟", + "هل الموقع في الرياض أو منطقة أخرى؟", + ], + "education": [ + "ما البرنامج أو الدورة المطلوبة؟", + "هل تفضل حضوري أو عن بعد؟", + "ما مستوى الخبرة الحالي؟", + ], + "retail": [ + "ما المنتج أو الفئة المطلوبة؟", + "هل تبحث عن كميات تجارية أو شخصية؟", + "ما المنطقة لغرض التوصيل؟", + ], +} + +ESCALATION_TRIGGERS = [ + "أبي أكلم مدير", + "أبي أتكلم مع شخص", + "أبي موظف", + "ما فهمت", + "مشكلة كبيرة", + "شكوى", + "غاضب", + "مستعجل جداً", +] + + +class AgentContext(BaseModel): + lead_id: str = "" + phone: str = "" + name: str = "" + industry: str = "services" + state: str = ConversationState.GREETING.value + history: list[dict] = Field(default_factory=list) + qualification_data: dict = Field(default_factory=dict) + questions_asked: int = 0 + escalated: bool = False + meeting_proposed: bool = False + + +class AgentResponse(BaseModel): + reply: str + new_state: str + should_escalate: bool = False + meeting_suggested: bool = False + qualification_complete: bool = False + extracted_data: dict = Field(default_factory=dict) + + +class SalesAgent: + """Autonomous WhatsApp sales agent with Arabic dialogue and state machine.""" + + def __init__(self): + self.llm = get_llm() + self._contexts: dict[str, AgentContext] = {} + + async def handle_message( + self, + phone: str, + message: str, + lead_id: str = "", + name: str = "", + industry: str = "services", + ) -> AgentResponse: + """Main entry: process an inbound WhatsApp message and produce a reply.""" + ctx = self._get_or_create_context(phone, lead_id, name, industry) + ctx.history.append({"role": "user", "content": message, "ts": _now_iso()}) + + if await self.should_escalate(message, ctx): + ctx.escalated = True + ctx.state = ConversationState.CLOSE_OR_ESCALATE.value + reply = ( + f"حياك الله {ctx.name or ''}، أقدّر تواصلك. " + "بحوّلك الحين لأحد مستشارينا المتخصصين يخدمك بشكل أفضل. لحظات من فضلك." + ) + ctx.history.append({"role": "assistant", "content": reply, "ts": _now_iso()}) + return AgentResponse( + reply=reply, + new_state=ctx.state, + should_escalate=True, + ) + + reply, extracted = await self._generate_state_reply(ctx, message) + ctx.qualification_data.update(extracted) + ctx.history.append({"role": "assistant", "content": reply, "ts": _now_iso()}) + + next_state = await self._determine_next_state(ctx, message) + ctx.state = next_state + + qualification_complete = ctx.questions_asked >= len( + INDUSTRY_QUALIFIERS.get(ctx.industry, INDUSTRY_QUALIFIERS["services"]) + ) + + meeting_suggested = False + if qualification_complete and not ctx.meeting_proposed: + meeting_reply = await self.suggest_meeting(ctx) + reply += f"\n\n{meeting_reply}" + ctx.meeting_proposed = True + meeting_suggested = True + + return AgentResponse( + reply=reply, + new_state=ctx.state, + should_escalate=False, + meeting_suggested=meeting_suggested, + qualification_complete=qualification_complete, + extracted_data=extracted, + ) + + async def qualify_lead(self, ctx: AgentContext) -> dict: + """Return current qualification status and gathered data.""" + qualifiers = INDUSTRY_QUALIFIERS.get(ctx.industry, INDUSTRY_QUALIFIERS["services"]) + return { + "lead_id": ctx.lead_id, + "industry": ctx.industry, + "state": ctx.state, + "questions_total": len(qualifiers), + "questions_asked": ctx.questions_asked, + "qualification_data": ctx.qualification_data, + "is_qualified": ctx.questions_asked >= len(qualifiers), + } + + async def suggest_meeting(self, ctx: AgentContext) -> str: + """Generate a meeting suggestion message.""" + prompt = ( + "أنت مساعد مبيعات في السوق السعودي. اقترح موعد اجتماع للعميل " + "بأسلوب ودود ومهني بالسعودية. اذكر 2-3 أوقات متاحة هذا الأسبوع. " + "اجعل الرد مختصراً (3 أسطر كحد أقصى)." + ) + resp = await self.llm.complete( + system_prompt=prompt, + user_message=f"اسم العميل: {ctx.name}\nالقطاع: {ctx.industry}", + temperature=0.6, + max_tokens=150, + ) + return resp.content.strip() + + async def should_escalate(self, message: str, ctx: AgentContext) -> bool: + """Determine if message requires human handoff.""" + msg_lower = message.strip() + for trigger in ESCALATION_TRIGGERS: + if trigger in msg_lower: + logger.info("Escalation triggered for %s: matched '%s'", ctx.phone, trigger) + return True + + if len(ctx.history) > 10 and ctx.state == ConversationState.OBJECTION_HANDLING.value: + return True + + return False + + async def generate_follow_up(self, phone: str, days_dormant: int = 3) -> Optional[str]: + """Generate a follow-up message for a dormant lead.""" + ctx = self._contexts.get(phone) + if not ctx: + return None + + prompt = ( + "أنت مساعد مبيعات سعودي. اكتب رسالة متابعة ودية لعميل لم يرد منذ " + f"{days_dormant} أيام. اجعلها مختصرة (سطرين) ومهتمة بدون ضغط. " + "اسم العميل: " + (ctx.name or "العميل") + ) + resp = await self.llm.complete( + system_prompt=prompt, + user_message=f"القطاع: {ctx.industry}. آخر حالة: {ctx.state}", + temperature=0.7, + max_tokens=100, + ) + return resp.content.strip() + + # ── Internal helpers ───────────────────────────── + + def _get_or_create_context( + self, phone: str, lead_id: str, name: str, industry: str, + ) -> AgentContext: + if phone not in self._contexts: + self._contexts[phone] = AgentContext( + lead_id=lead_id, phone=phone, name=name, industry=industry, + ) + ctx = self._contexts[phone] + if name and not ctx.name: + ctx.name = name + return ctx + + async def _generate_state_reply( + self, ctx: AgentContext, message: str, + ) -> tuple[str, dict]: + """Generate a reply appropriate to the current conversation state.""" + qualifiers = INDUSTRY_QUALIFIERS.get(ctx.industry, INDUSTRY_QUALIFIERS["services"]) + current_q = qualifiers[ctx.questions_asked] if ctx.questions_asked < len(qualifiers) else "" + + state_instructions = { + ConversationState.GREETING.value: ( + "رحّب بالعميل بأسلوب سعودي ودود. اسأل كيف تقدر تساعده. " + "لا تكن رسمياً أكثر من اللازم." + ), + ConversationState.QUALIFICATION.value: ( + f"اسأل السؤال التالي بأسلوب طبيعي: {current_q}\n" + "حاول استخلاص المعلومات من إجابة العميل." + ), + ConversationState.NEEDS_ANALYSIS.value: ( + "حلل احتياجات العميل وأكّد فهمك. لخّص ما فهمته واسأل إذا في شي ثاني." + ), + ConversationState.SOLUTION_PITCH.value: ( + "اعرض الحل المناسب بناءً على احتياجات العميل. " + "ركّز على الفوائد مع ذكر القيمة بشكل غير مباشر." + ), + ConversationState.OBJECTION_HANDLING.value: ( + "تعامل مع اعتراض العميل بذكاء. أعد التأطير وركّز على القيمة. " + "لا تكن دفاعياً." + ), + } + + instruction = state_instructions.get( + ctx.state, + "أكمل المحادثة بأسلوب مهني وودود.", + ) + + system = ( + "أنت وكيل مبيعات ذكي لشركة سعودية. تتحدث بالعامية السعودية الراقية.\n" + "قواعدك:\n" + "1. ردود مختصرة (3-4 أسطر كحد أقصى)\n" + "2. لا تستخدم رموز تعبيرية مبالغ فيها\n" + "3. كن ودوداً ومحترفاً\n" + "4. استخلص أي معلومات ذات قيمة من رد العميل\n" + f"5. التعليمات الحالية: {instruction}\n" + "أجب بـ JSON: {\"reply\": \"...\", \"extracted\": {\"key\": \"value\"}}\n" + ) + + recent_history = ctx.history[-6:] + history_text = "\n".join( + f"{'العميل' if h['role'] == 'user' else 'الوكيل'}: {h['content']}" + for h in recent_history + ) + + user_msg = f"المحادثة السابقة:\n{history_text}\n\nرسالة العميل الجديدة: {message}" + + resp = await self.llm.complete( + system_prompt=system, + user_message=user_msg, + temperature=0.6, + max_tokens=250, + json_mode=True, + ) + + parsed = resp.parse_json() + if parsed: + reply = parsed.get("reply", message) + extracted = parsed.get("extracted", {}) + else: + reply = resp.content.strip() + extracted = {} + + if ctx.state == ConversationState.QUALIFICATION.value: + ctx.questions_asked += 1 + + return reply, extracted + + async def _determine_next_state(self, ctx: AgentContext, message: str) -> str: + """Decide the next conversation state.""" + allowed = STATE_TRANSITIONS.get(ctx.state, []) + if not allowed: + return ctx.state + + qualifiers = INDUSTRY_QUALIFIERS.get(ctx.industry, INDUSTRY_QUALIFIERS["services"]) + + if ctx.state == ConversationState.GREETING.value: + return ConversationState.QUALIFICATION.value + + if ctx.state == ConversationState.QUALIFICATION.value: + if ctx.questions_asked >= len(qualifiers): + return ConversationState.NEEDS_ANALYSIS.value + return ConversationState.QUALIFICATION.value + + if ctx.state == ConversationState.NEEDS_ANALYSIS.value: + return ConversationState.SOLUTION_PITCH.value + + if ctx.state == ConversationState.SOLUTION_PITCH.value: + negative_signals = ["غالي", "ما أبي", "لا", "مو مقتنع", "كثير", "فكر"] + if any(s in message for s in negative_signals): + return ConversationState.OBJECTION_HANDLING.value + return ConversationState.CLOSE_OR_ESCALATE.value + + if ctx.state == ConversationState.OBJECTION_HANDLING.value: + return ConversationState.SOLUTION_PITCH.value + + return ctx.state + + +def _now_iso() -> str: + return datetime.now(timezone.utc).isoformat() diff --git a/salesflow-saas/backend/app/services/pdpl/consent_manager.py b/salesflow-saas/backend/app/services/pdpl/consent_manager.py index 220ab024..68b597f3 100644 --- a/salesflow-saas/backend/app/services/pdpl/consent_manager.py +++ b/salesflow-saas/backend/app/services/pdpl/consent_manager.py @@ -1,8 +1,6 @@ -"""PDPL consent engine -- tracks, validates, and audits consent per Saudi data protection law. - +"""PDPL consent engine -- tracks, validates, and audits consent. Penalty for violations: up to 5,000,000 SAR per incident. """ - import logging from datetime import datetime, timedelta, timezone from typing import Optional @@ -14,18 +12,13 @@ from sqlalchemy.ext.asyncio import AsyncSession from app.models.consent import ( PDPLConsent, PDPLConsentAudit, DataRequest, - ConsentStatusEnum, ConsentPurpose, ConsentChannel, - DataRequestType, DataRequestStatus, + ConsentStatusEnum, DataRequestStatus, ) logger = logging.getLogger(__name__) - DEFAULT_EXPIRY_MONTHS = 12 -CROSS_BORDER_ALLOWED_COUNTRIES = {"SA", "AE", "BH", "KW", "OM", "QA"} +CROSS_BORDER_ALLOWED = {"SA", "AE", "BH", "KW", "OM", "QA"} -# --------------------------------------------------------------------------- -# Pydantic schemas -# --------------------------------------------------------------------------- class ConsentGrantInput(Schema): contact_id: UUID @@ -72,29 +65,18 @@ class AuditEntry(Schema): purpose: str details: dict created_at: datetime - model_config = {"from_attributes": True} -# --------------------------------------------------------------------------- -# ConsentManager -# --------------------------------------------------------------------------- - class ConsentManager: """Core PDPL consent engine for Dealix CRM.""" def __init__(self, db: AsyncSession): self.db = db - # -- grant --------------------------------------------------------------- - async def grant_consent(self, data: ConsentGrantInput) -> PDPLConsent: - """Record a new consent grant. Existing active consent for same - contact+purpose+channel is revoked first (re-consent flow).""" - + """Grant consent. Revokes existing active consent for same triplet (re-consent).""" now = datetime.now(timezone.utc) - - # Revoke any existing active consent for same triplet (re-consent) existing = await self._find_active(data.contact_id, data.purpose, data.channel) if existing: existing.status = ConsentStatusEnum.REVOKED.value @@ -104,198 +86,127 @@ class ConsentManager: action="revoked_for_renewal", actor_id=data.actor_id, channel=data.channel, purpose=data.purpose, details={"reason": "re-consent on purpose change"}, - ip_address=data.ip_address, - tenant_id=data.tenant_id, + ip_address=data.ip_address, tenant_id=data.tenant_id, ) - consent = PDPLConsent( - contact_id=data.contact_id, - tenant_id=data.tenant_id, - purpose=data.purpose, - channel=data.channel, - status=ConsentStatusEnum.GRANTED.value, - granted_at=now, + contact_id=data.contact_id, tenant_id=data.tenant_id, + purpose=data.purpose, channel=data.channel, + status=ConsentStatusEnum.GRANTED.value, granted_at=now, expires_at=now + timedelta(days=30 * data.expiry_months), - ip_address=data.ip_address, - consent_text=data.consent_text, + ip_address=data.ip_address, consent_text=data.consent_text, granted_by=data.actor_id, ) self.db.add(consent) await self.db.flush() await self.db.refresh(consent) - await self._audit( consent_id=consent.id, contact_id=data.contact_id, action="granted", actor_id=data.actor_id, channel=data.channel, purpose=data.purpose, - details={"expiry_months": data.expiry_months, "consent_text": data.consent_text or ""}, - ip_address=data.ip_address, - tenant_id=data.tenant_id, + details={"expiry_months": data.expiry_months}, + ip_address=data.ip_address, tenant_id=data.tenant_id, ) - logger.info("PDPL consent granted: contact=%s purpose=%s channel=%s", data.contact_id, data.purpose, data.channel) + logger.info("PDPL consent granted: contact=%s purpose=%s", data.contact_id, data.purpose) return consent - # -- revoke -------------------------------------------------------------- - async def revoke_consent(self, data: ConsentRevokeInput) -> PDPLConsent: """Revoke an existing consent immediately.""" - - result = await self.db.execute( - select(PDPLConsent).where(PDPLConsent.id == data.consent_id) - ) + result = await self.db.execute(select(PDPLConsent).where(PDPLConsent.id == data.consent_id)) consent = result.scalar_one_or_none() if not consent: - raise ValueError("سجل الموافقة غير موجود") # Consent record not found - + raise ValueError("سجل الموافقة غير موجود") now = datetime.now(timezone.utc) consent.status = ConsentStatusEnum.REVOKED.value consent.revoked_at = now - await self._audit( consent_id=consent.id, contact_id=consent.contact_id, action="revoked", actor_id=data.actor_id, channel=consent.channel, purpose=consent.purpose, details={"reason": data.reason or "user_request"}, - ip_address=data.ip_address, - tenant_id=consent.tenant_id, + ip_address=data.ip_address, tenant_id=consent.tenant_id, ) - logger.info("PDPL consent revoked: id=%s contact=%s", consent.id, consent.contact_id) + logger.info("PDPL consent revoked: id=%s", consent.id) return consent - # -- check --------------------------------------------------------------- - - async def check_consent( - self, - contact_id: UUID, - purpose: str, - channel: str, - ) -> ConsentCheckResult: - """Validate consent before any outbound message. Must be called - before every send -- violations carry up to 5M SAR penalty.""" - + async def check_consent(self, contact_id: UUID, purpose: str, channel: str) -> ConsentCheckResult: + """Validate consent before outbound message. 5M SAR penalty per violation.""" consent = await self._find_active(contact_id, purpose, channel) if not consent: return ConsentCheckResult( - allowed=False, - message=f"No active consent for {purpose}/{channel}", + allowed=False, message=f"No active consent for {purpose}/{channel}", message_ar="لا توجد موافقة فعالة لهذا الغرض والقناة", ) - now = datetime.now(timezone.utc) if consent.expires_at and consent.expires_at <= now: consent.status = ConsentStatusEnum.EXPIRED.value await self.db.flush() return ConsentCheckResult( - allowed=False, - consent_id=consent.id, - status=ConsentStatusEnum.EXPIRED.value, - expires_at=consent.expires_at, - message="Consent expired -- re-consent required", + allowed=False, consent_id=consent.id, status=ConsentStatusEnum.EXPIRED.value, + expires_at=consent.expires_at, message="Consent expired", message_ar="انتهت صلاحية الموافقة -- يلزم تجديد الموافقة", ) - return ConsentCheckResult( - allowed=True, - consent_id=consent.id, - status=consent.status, - expires_at=consent.expires_at, - message="Consent valid", + allowed=True, consent_id=consent.id, status=consent.status, + expires_at=consent.expires_at, message="Consent valid", message_ar="الموافقة صالحة", ) - # -- data request -------------------------------------------------------- - async def process_data_request(self, data: DataRequestInput) -> DataRequest: """Submit a PDPL data subject rights request.""" - request = DataRequest( - contact_id=data.contact_id, - tenant_id=data.tenant_id, - request_type=data.request_type, - status=DataRequestStatus.PENDING.value, - requested_at=datetime.now(timezone.utc), - notes=data.notes, + contact_id=data.contact_id, tenant_id=data.tenant_id, + request_type=data.request_type, status=DataRequestStatus.PENDING.value, + requested_at=datetime.now(timezone.utc), notes=data.notes, handled_by=data.actor_id, ) self.db.add(request) await self.db.flush() await self.db.refresh(request) - logger.info("PDPL data request created: type=%s contact=%s", data.request_type, data.contact_id) + logger.info("PDPL data request: type=%s contact=%s", data.request_type, data.contact_id) return request - # -- audit --------------------------------------------------------------- - async def get_consent_audit( - self, - tenant_id: UUID, - contact_id: Optional[UUID] = None, - limit: int = 100, - offset: int = 0, + self, tenant_id: UUID, contact_id: Optional[UUID] = None, + limit: int = 100, offset: int = 0, ) -> list[AuditEntry]: - """Return consent audit trail filtered by tenant and optionally contact.""" - + """Return consent audit trail.""" query = ( - select(PDPLConsentAudit) - .where(PDPLConsentAudit.tenant_id == tenant_id) + select(PDPLConsentAudit).where(PDPLConsentAudit.tenant_id == tenant_id) .order_by(PDPLConsentAudit.created_at.desc()) ) if contact_id: query = query.where(PDPLConsentAudit.contact_id == contact_id) - query = query.offset(offset).limit(limit) - result = await self.db.execute(query) + result = await self.db.execute(query.offset(offset).limit(limit)) return [AuditEntry.model_validate(row) for row in result.scalars().all()] - # -- cross-border -------------------------------------------------------- - @staticmethod def check_cross_border_transfer(destination_country: str) -> ConsentCheckResult: - """Check if data transfer to destination country is PDPL-compliant. - SDAIA requires adequate protection level or explicit consent.""" - + """Check if transfer to destination is PDPL-compliant.""" code = destination_country.upper().strip() - if code in CROSS_BORDER_ALLOWED_COUNTRIES: + if code in CROSS_BORDER_ALLOWED: return ConsentCheckResult( - allowed=True, - message=f"Transfer to {code} permitted under GCC adequacy", + allowed=True, message=f"Transfer to {code} permitted under GCC adequacy", message_ar=f"النقل إلى {code} مسموح بموجب كفاية دول الخليج", ) return ConsentCheckResult( - allowed=False, - message=f"Transfer to {code} requires explicit consent and SDAIA approval", + allowed=False, message=f"Transfer to {code} requires explicit consent and SDAIA approval", message_ar=f"النقل إلى {code} يتطلب موافقة صريحة وموافقة الهيئة", ) - # -- private helpers ----------------------------------------------------- - - async def _find_active( - self, contact_id: UUID, purpose: str, channel: str - ) -> Optional[PDPLConsent]: + async def _find_active(self, contact_id: UUID, purpose: str, channel: str) -> Optional[PDPLConsent]: result = await self.db.execute( - select(PDPLConsent).where( - and_( - PDPLConsent.contact_id == contact_id, - PDPLConsent.purpose == purpose, - PDPLConsent.channel == channel, - PDPLConsent.status == ConsentStatusEnum.GRANTED.value, - ) - ).order_by(PDPLConsent.granted_at.desc()).limit(1) + select(PDPLConsent).where(and_( + PDPLConsent.contact_id == contact_id, PDPLConsent.purpose == purpose, + PDPLConsent.channel == channel, PDPLConsent.status == ConsentStatusEnum.GRANTED.value, + )).order_by(PDPLConsent.granted_at.desc()).limit(1) ) return result.scalar_one_or_none() - async def _audit( - self, *, consent_id, contact_id, action, actor_id, - channel, purpose, details, ip_address, tenant_id, - ) -> None: - entry = PDPLConsentAudit( - consent_id=consent_id, - contact_id=contact_id, - tenant_id=tenant_id, - action=action, - actor_id=actor_id, - channel=channel, - purpose=purpose, - details=details or {}, - ip_address=ip_address, - ) - self.db.add(entry) + async def _audit(self, *, consent_id, contact_id, action, actor_id, + channel, purpose, details, ip_address, tenant_id) -> None: + self.db.add(PDPLConsentAudit( + consent_id=consent_id, contact_id=contact_id, tenant_id=tenant_id, + action=action, actor_id=actor_id, channel=channel, purpose=purpose, + details=details or {}, ip_address=ip_address, + )) await self.db.flush() diff --git a/salesflow-saas/backend/app/services/pdpl/data_rights.py b/salesflow-saas/backend/app/services/pdpl/data_rights.py index a3b3bf64..34cb5f3f 100644 --- a/salesflow-saas/backend/app/services/pdpl/data_rights.py +++ b/salesflow-saas/backend/app/services/pdpl/data_rights.py @@ -1,34 +1,23 @@ """PDPL data subject rights handler. - -Implements: right to access, correction, deletion, restriction of processing. -Generates compliance reports for SDAIA audits. +Right to access, correction, deletion, restriction + SDAIA compliance reports. """ - import logging from datetime import datetime, timedelta, timezone from typing import Any, Optional from uuid import UUID from pydantic import BaseModel as Schema -from sqlalchemy import select, func, and_ +from sqlalchemy import select, func from sqlalchemy.ext.asyncio import AsyncSession -from app.models.consent import ( - PDPLConsent, PDPLConsentAudit, DataRequest, - DataRequestStatus, DataRequestType, -) +from app.models.consent import PDPLConsent, DataRequest, DataRequestStatus, DataRequestType from app.models.lead import Lead from app.models.message import Message logger = logging.getLogger(__name__) - HARD_DELETE_DELAY_DAYS = 30 -# --------------------------------------------------------------------------- -# Pydantic schemas -# --------------------------------------------------------------------------- - class DataExport(Schema): contact_id: UUID personal_data: dict @@ -40,7 +29,7 @@ class DataExport(Schema): class CorrectionInput(Schema): contact_id: UUID tenant_id: UUID - corrections: dict[str, Any] # field_name -> new_value + corrections: dict[str, Any] actor_id: Optional[UUID] = None reason: Optional[str] = None @@ -82,29 +71,17 @@ class ComplianceReport(Schema): violations_detected: int -# --------------------------------------------------------------------------- -# DataRightsHandler -# --------------------------------------------------------------------------- - class DataRightsHandler: """Handles PDPL data subject rights for Dealix contacts.""" def __init__(self, db: AsyncSession): self.db = db - # -- export (right to access) ------------------------------------------- - async def export_data(self, contact_id: UUID, tenant_id: UUID) -> DataExport: - """Export all personal data held for a contact as structured JSON.""" - + """Export all personal data held for a contact (right to access).""" lead = await self._get_lead(contact_id, tenant_id) - - # Consent records consents_q = await self.db.execute( - select(PDPLConsent).where( - PDPLConsent.contact_id == contact_id, - PDPLConsent.tenant_id == tenant_id, - ) + select(PDPLConsent).where(PDPLConsent.contact_id == contact_id, PDPLConsent.tenant_id == tenant_id) ) consents = [ {"purpose": c.purpose, "channel": c.channel, "status": c.status, @@ -112,241 +89,138 @@ class DataRightsHandler: "expires_at": c.expires_at.isoformat() if c.expires_at else None} for c in consents_q.scalars().all() ] - - # Messages - msgs_q = await self.db.execute( - select(Message).where(Message.lead_id == contact_id).limit(500) - ) + msgs_q = await self.db.execute(select(Message).where(Message.lead_id == contact_id).limit(500)) messages = [ - {"channel": m.channel, "direction": m.direction, - "content": m.content, "sent_at": m.sent_at.isoformat() if m.sent_at else None} + {"channel": m.channel, "direction": m.direction, "content": m.content, + "sent_at": m.sent_at.isoformat() if m.sent_at else None} for m in msgs_q.scalars().all() ] - - personal = { - "name": lead.name, - "phone": lead.phone, - "email": lead.email, - "source": lead.source, - "status": lead.status, - "score": lead.score, - "notes": lead.notes, - } - - logger.info("PDPL data export completed: contact=%s", contact_id) + logger.info("PDPL data export: contact=%s", contact_id) return DataExport( contact_id=contact_id, - personal_data=personal, - consents=consents, - messages=messages, - exported_at=datetime.now(timezone.utc), + personal_data={"name": lead.name, "phone": lead.phone, "email": lead.email, + "source": lead.source, "status": lead.status, "score": lead.score, "notes": lead.notes}, + consents=consents, messages=messages, exported_at=datetime.now(timezone.utc), ) - # -- correction ---------------------------------------------------------- - async def correct_data(self, data: CorrectionInput) -> CorrectionResult: - """Update personal data fields with full audit trail.""" - + """Update personal data fields with audit trail.""" lead = await self._get_lead(data.contact_id, data.tenant_id) allowed_fields = {"name", "phone", "email", "notes"} previous: dict[str, Any] = {} - updated_fields: list[str] = [] - + updated: list[str] = [] for field, new_val in data.corrections.items(): if field not in allowed_fields: logger.warning("PDPL correction rejected for field=%s", field) continue previous[field] = getattr(lead, field, None) setattr(lead, field, new_val) - updated_fields.append(field) - + updated.append(field) await self.db.flush() - - # Audit via data request record - req = DataRequest( - contact_id=data.contact_id, - tenant_id=data.tenant_id, - request_type=DataRequestType.CORRECTION.value, - status=DataRequestStatus.COMPLETED.value, - requested_at=datetime.now(timezone.utc), - completed_at=datetime.now(timezone.utc), - response_data={"corrections": data.corrections, "previous": previous, "reason": data.reason}, - handled_by=data.actor_id, - ) - self.db.add(req) + self.db.add(DataRequest( + contact_id=data.contact_id, tenant_id=data.tenant_id, + request_type=DataRequestType.CORRECTION.value, status=DataRequestStatus.COMPLETED.value, + requested_at=datetime.now(timezone.utc), completed_at=datetime.now(timezone.utc), + response_data={"corrections": data.corrections, "previous": previous}, handled_by=data.actor_id, + )) await self.db.flush() + logger.info("PDPL correction: contact=%s fields=%s", data.contact_id, updated) + return CorrectionResult(contact_id=data.contact_id, fields_updated=updated, + previous_values=previous, updated_at=datetime.now(timezone.utc)) - logger.info("PDPL data correction: contact=%s fields=%s", data.contact_id, updated_fields) - return CorrectionResult( - contact_id=data.contact_id, - fields_updated=updated_fields, - previous_values=previous, - updated_at=datetime.now(timezone.utc), - ) - - # -- deletion (right to erasure) ---------------------------------------- - - async def delete_data(self, contact_id: UUID, tenant_id: UUID, actor_id: Optional[UUID] = None) -> DeletionResult: - """Soft-delete contact now; schedule hard-delete after 30 days.""" - + async def delete_data(self, contact_id: UUID, tenant_id: UUID, + actor_id: Optional[UUID] = None) -> DeletionResult: + """Soft-delete contact; schedule hard-delete after 30 days.""" lead = await self._get_lead(contact_id, tenant_id) now = datetime.now(timezone.utc) - hard_delete_at = now + timedelta(days=HARD_DELETE_DELAY_DAYS) - - # Soft-delete: mark status and clear PII + hard_at = now + timedelta(days=HARD_DELETE_DELAY_DAYS) lead.status = "deleted" - lead.notes = f"[PDPL deletion requested {now.isoformat()}] " + (lead.notes or "") - lead.extra_metadata = { - **(lead.extra_metadata or {}), - "_pdpl_soft_deleted": True, - "_pdpl_hard_delete_at": hard_delete_at.isoformat(), - } - - # Revoke all active consents - consents_q = await self.db.execute( - select(PDPLConsent).where( - PDPLConsent.contact_id == contact_id, - PDPLConsent.tenant_id == tenant_id, - PDPLConsent.status == "granted", - ) + lead.notes = f"[PDPL deletion {now.isoformat()}] " + (lead.notes or "") + lead.extra_metadata = {**(lead.extra_metadata or {}), + "_pdpl_soft_deleted": True, "_pdpl_hard_delete_at": hard_at.isoformat()} + # Revoke active consents + cq = await self.db.execute( + select(PDPLConsent).where(PDPLConsent.contact_id == contact_id, + PDPLConsent.tenant_id == tenant_id, PDPLConsent.status == "granted") ) - for consent in consents_q.scalars().all(): - consent.status = "revoked" - consent.revoked_at = now - - # Record the request - req = DataRequest( - contact_id=contact_id, - tenant_id=tenant_id, - request_type=DataRequestType.DELETION.value, - status=DataRequestStatus.PROCESSING.value, - requested_at=now, - response_data={"hard_delete_at": hard_delete_at.isoformat()}, - handled_by=actor_id, - ) - self.db.add(req) + for c in cq.scalars().all(): + c.status = "revoked" + c.revoked_at = now + self.db.add(DataRequest( + contact_id=contact_id, tenant_id=tenant_id, + request_type=DataRequestType.DELETION.value, status=DataRequestStatus.PROCESSING.value, + requested_at=now, response_data={"hard_delete_at": hard_at.isoformat()}, handled_by=actor_id, + )) await self.db.flush() - - logger.info("PDPL deletion scheduled: contact=%s hard_delete=%s", contact_id, hard_delete_at) + logger.info("PDPL deletion: contact=%s hard_delete=%s", contact_id, hard_at) return DeletionResult( - contact_id=contact_id, - status="soft_deleted", - soft_deleted_at=now, - hard_delete_scheduled=hard_delete_at, - message=f"Contact soft-deleted. Hard delete scheduled for {hard_delete_at.date()}", - message_ar=f"تم حذف جهة الاتصال مبدئيًا. الحذف النهائي مجدول بتاريخ {hard_delete_at.date()}", + contact_id=contact_id, status="soft_deleted", soft_deleted_at=now, + hard_delete_scheduled=hard_at, + message=f"Hard delete scheduled for {hard_at.date()}", + message_ar=f"الحذف النهائي مجدول بتاريخ {hard_at.date()}", ) - # -- restriction --------------------------------------------------------- - - async def restrict_processing( - self, contact_id: UUID, tenant_id: UUID, actor_id: Optional[UUID] = None - ) -> RestrictionResult: - """Flag a contact as restricted -- no outbound processing allowed.""" - + async def restrict_processing(self, contact_id: UUID, tenant_id: UUID, + actor_id: Optional[UUID] = None) -> RestrictionResult: + """Flag contact as restricted -- no outbound processing allowed.""" lead = await self._get_lead(contact_id, tenant_id) - lead.extra_metadata = { - **(lead.extra_metadata or {}), - "_pdpl_restricted": True, - "_pdpl_restricted_at": datetime.now(timezone.utc).isoformat(), - } - - req = DataRequest( - contact_id=contact_id, - tenant_id=tenant_id, - request_type=DataRequestType.RESTRICTION.value, - status=DataRequestStatus.COMPLETED.value, - requested_at=datetime.now(timezone.utc), - completed_at=datetime.now(timezone.utc), - response_data={"restricted": True}, - handled_by=actor_id, - ) - self.db.add(req) + lead.extra_metadata = {**(lead.extra_metadata or {}), + "_pdpl_restricted": True, + "_pdpl_restricted_at": datetime.now(timezone.utc).isoformat()} + self.db.add(DataRequest( + contact_id=contact_id, tenant_id=tenant_id, + request_type=DataRequestType.RESTRICTION.value, status=DataRequestStatus.COMPLETED.value, + requested_at=datetime.now(timezone.utc), completed_at=datetime.now(timezone.utc), + response_data={"restricted": True}, handled_by=actor_id, + )) await self.db.flush() - - logger.info("PDPL processing restricted: contact=%s", contact_id) + logger.info("PDPL restriction: contact=%s", contact_id) return RestrictionResult( - contact_id=contact_id, - restricted=True, - message="Contact processing restricted per PDPL request", - message_ar="تم تقييد معالجة بيانات جهة الاتصال وفقًا لطلب نظام حماية البيانات", + contact_id=contact_id, restricted=True, + message="Contact processing restricted per PDPL", + message_ar="تم تقييد معالجة بيانات جهة الاتصال وفقًا لنظام حماية البيانات", ) - # -- compliance report --------------------------------------------------- - async def generate_compliance_report(self, tenant_id: UUID) -> ComplianceReport: - """Generate SDAIA-ready compliance report for a tenant.""" - + """Generate SDAIA-ready compliance report.""" now = datetime.now(timezone.utc) - - # Consent counts total = (await self.db.execute( - select(func.count()).where(PDPLConsent.tenant_id == tenant_id) - )).scalar() or 0 + select(func.count()).where(PDPLConsent.tenant_id == tenant_id))).scalar() or 0 active = (await self.db.execute( - select(func.count()).where(PDPLConsent.tenant_id == tenant_id, PDPLConsent.status == "granted") - )).scalar() or 0 + select(func.count()).where(PDPLConsent.tenant_id == tenant_id, PDPLConsent.status == "granted"))).scalar() or 0 revoked = (await self.db.execute( - select(func.count()).where(PDPLConsent.tenant_id == tenant_id, PDPLConsent.status == "revoked") - )).scalar() or 0 + select(func.count()).where(PDPLConsent.tenant_id == tenant_id, PDPLConsent.status == "revoked"))).scalar() or 0 expired = (await self.db.execute( - select(func.count()).where(PDPLConsent.tenant_id == tenant_id, PDPLConsent.status == "expired") - )).scalar() or 0 - - # Data requests + select(func.count()).where(PDPLConsent.tenant_id == tenant_id, PDPLConsent.status == "expired"))).scalar() or 0 pending = (await self.db.execute( - select(func.count()).where(DataRequest.tenant_id == tenant_id, DataRequest.status == "pending") - )).scalar() or 0 + select(func.count()).where(DataRequest.tenant_id == tenant_id, DataRequest.status == "pending"))).scalar() or 0 completed = (await self.db.execute( - select(func.count()).where(DataRequest.tenant_id == tenant_id, DataRequest.status == "completed") - )).scalar() or 0 - - # Breakdown by type + select(func.count()).where(DataRequest.tenant_id == tenant_id, DataRequest.status == "completed"))).scalar() or 0 type_rows = (await self.db.execute( - select(DataRequest.request_type, func.count()) - .where(DataRequest.tenant_id == tenant_id) - .group_by(DataRequest.request_type) - )).all() - by_type = {row[0]: row[1] for row in type_rows} - - # Avg resolution time + select(DataRequest.request_type, func.count()).where(DataRequest.tenant_id == tenant_id) + .group_by(DataRequest.request_type))).all() + by_type = {r[0]: r[1] for r in type_rows} + # Average resolution time avg_hours: Optional[float] = None - completed_reqs = (await self.db.execute( - select(DataRequest).where( - DataRequest.tenant_id == tenant_id, - DataRequest.status == "completed", - DataRequest.completed_at.isnot(None), - ).limit(500) - )).scalars().all() - if completed_reqs: - deltas = [ - (r.completed_at - r.requested_at).total_seconds() / 3600 - for r in completed_reqs if r.completed_at and r.requested_at - ] + done = (await self.db.execute( + select(DataRequest).where(DataRequest.tenant_id == tenant_id, DataRequest.status == "completed", + DataRequest.completed_at.isnot(None)).limit(500))).scalars().all() + if done: + deltas = [(r.completed_at - r.requested_at).total_seconds() / 3600 + for r in done if r.completed_at and r.requested_at] avg_hours = round(sum(deltas) / len(deltas), 2) if deltas else None - - logger.info("PDPL compliance report generated: tenant=%s", tenant_id) + logger.info("PDPL report generated: tenant=%s", tenant_id) return ComplianceReport( - tenant_id=tenant_id, - generated_at=now, - total_consents=total, - active_consents=active, - revoked_consents=revoked, - expired_consents=expired, - pending_requests=pending, - completed_requests=completed, - requests_by_type=by_type, - avg_resolution_hours=avg_hours, - violations_detected=0, + tenant_id=tenant_id, generated_at=now, total_consents=total, + active_consents=active, revoked_consents=revoked, expired_consents=expired, + pending_requests=pending, completed_requests=completed, + requests_by_type=by_type, avg_resolution_hours=avg_hours, violations_detected=0, ) - # -- private helpers ----------------------------------------------------- - async def _get_lead(self, contact_id: UUID, tenant_id: UUID) -> Lead: result = await self.db.execute( - select(Lead).where(Lead.id == contact_id, Lead.tenant_id == tenant_id) - ) + select(Lead).where(Lead.id == contact_id, Lead.tenant_id == tenant_id)) lead = result.scalar_one_or_none() if not lead: - raise ValueError("جهة الاتصال غير موجودة") # Contact not found + raise ValueError("جهة الاتصال غير موجودة") return lead diff --git a/salesflow-saas/backend/app/services/territory_manager.py b/salesflow-saas/backend/app/services/territory_manager.py new file mode 100644 index 00000000..86899741 --- /dev/null +++ b/salesflow-saas/backend/app/services/territory_manager.py @@ -0,0 +1,281 @@ +""" +Dealix Saudi Territory Manager +إدارة المناطق وتوزيع العملاء على مندوبي المبيعات تلقائياً +""" + +import logging +from datetime import datetime, timezone +from typing import Optional + +from pydantic import BaseModel, Field +from sqlalchemy import select, func, and_ +from sqlalchemy.ext.asyncio import AsyncSession + +from app.models.lead import Lead +from app.models.user import User +from app.models.deal import Deal + +logger = logging.getLogger("dealix.territory") + +SAUDI_REGIONS: dict[str, dict] = { + "riyadh": { + "name_ar": "الرياض", + "name_en": "Riyadh", + "cities_ar": ["الرياض", "الخرج", "الدرعية", "المجمعة"], + }, + "jeddah": { + "name_ar": "جدة", + "name_en": "Jeddah", + "cities_ar": ["جدة", "رابغ", "الليث"], + }, + "eastern": { + "name_ar": "المنطقة الشرقية", + "name_en": "Eastern Province", + "cities_ar": ["الدمام", "الخبر", "الظهران", "الجبيل", "الأحساء", "القطيف"], + }, + "makkah": { + "name_ar": "مكة المكرمة", + "name_en": "Makkah", + "cities_ar": ["مكة المكرمة", "الطائف"], + }, + "madinah": { + "name_ar": "المدينة المنورة", + "name_en": "Madinah", + "cities_ar": ["المدينة المنورة", "ينبع"], + }, + "asir": { + "name_ar": "عسير", + "name_en": "Asir", + "cities_ar": ["أبها", "خميس مشيط", "النماص"], + }, + "qassim": { + "name_ar": "القصيم", + "name_en": "Qassim", + "cities_ar": ["بريدة", "عنيزة", "الرس"], + }, + "tabuk": { + "name_ar": "تبوك", + "name_en": "Tabuk", + "cities_ar": ["تبوك", "ضبا", "الوجه"], + }, + "hail": { + "name_ar": "حائل", + "name_en": "Hail", + "cities_ar": ["حائل", "بقعاء"], + }, + "jazan": { + "name_ar": "جازان", + "name_en": "Jazan", + "cities_ar": ["جازان", "صبيا", "أبو عريش"], + }, + "najran": { + "name_ar": "نجران", + "name_en": "Najran", + "cities_ar": ["نجران", "شرورة"], + }, + "baha": { + "name_ar": "الباحة", + "name_en": "Al Baha", + "cities_ar": ["الباحة", "بلجرشي"], + }, + "jouf": { + "name_ar": "الجوف", + "name_en": "Al Jouf", + "cities_ar": ["سكاكا", "دومة الجندل"], + }, +} + + +class TerritoryAssignment(BaseModel): + territory_key: str + rep_ids: list[str] = Field(default_factory=list) + round_robin_index: int = 0 + + +class TerritoryStats(BaseModel): + territory_key: str + name_ar: str + name_en: str + total_leads: int = 0 + total_deals: int = 0 + total_value: float = 0.0 + win_rate: float = 0.0 + reps_count: int = 0 + + +class TerritoryManager: + """Territory-based lead routing and performance analytics for Saudi regions.""" + + def __init__(self, db: AsyncSession): + self.db = db + self._assignments: dict[str, TerritoryAssignment] = {} + + async def assign_territory( + self, territory_key: str, rep_ids: list[str], + ) -> dict: + """Assign sales reps to a territory.""" + if territory_key not in SAUDI_REGIONS: + raise ValueError(f"منطقة غير معروفة: {territory_key}") + + self._assignments[territory_key] = TerritoryAssignment( + territory_key=territory_key, + rep_ids=rep_ids, + round_robin_index=0, + ) + region = SAUDI_REGIONS[territory_key] + logger.info( + "Territory '%s' assigned to %d reps", territory_key, len(rep_ids), + ) + return { + "territory": territory_key, + "name_ar": region["name_ar"], + "name_en": region["name_en"], + "reps_assigned": len(rep_ids), + "rep_ids": rep_ids, + } + + async def auto_route_lead( + self, + tenant_id: str, + lead_id: str, + region_key: Optional[str] = None, + city_hint: Optional[str] = None, + ) -> dict: + """Auto-assign a lead to the next rep in the matching territory via round-robin.""" + territory_key = region_key + if not territory_key and city_hint: + territory_key = self._detect_territory(city_hint) + if not territory_key: + territory_key = "riyadh" + + assignment = self._assignments.get(territory_key) + if not assignment or not assignment.rep_ids: + logger.warning( + "No reps assigned to territory '%s', falling back to riyadh", + territory_key, + ) + assignment = self._assignments.get("riyadh") + territory_key = "riyadh" + + if not assignment or not assignment.rep_ids: + return { + "lead_id": lead_id, + "assigned_to": None, + "territory": territory_key, + "error_ar": "لا يوجد مندوبين معينين لهذه المنطقة", + } + + rep_id = assignment.rep_ids[assignment.round_robin_index % len(assignment.rep_ids)] + assignment.round_robin_index += 1 + + import uuid + result = await self.db.execute( + select(Lead).where( + Lead.id == uuid.UUID(lead_id), + Lead.tenant_id == uuid.UUID(tenant_id), + ) + ) + lead = result.scalar_one_or_none() + if lead: + lead.assigned_to = uuid.UUID(rep_id) + metadata = dict(lead.extra_metadata or {}) + metadata["territory"] = territory_key + metadata["auto_routed_at"] = datetime.now(timezone.utc).isoformat() + lead.extra_metadata = metadata + await self.db.flush() + + region = SAUDI_REGIONS.get(territory_key, {}) + logger.info("Lead %s routed to rep %s in %s", lead_id, rep_id, territory_key) + return { + "lead_id": lead_id, + "assigned_to": rep_id, + "territory": territory_key, + "territory_name_ar": region.get("name_ar", ""), + } + + async def get_territory_stats( + self, tenant_id: str, territory_key: Optional[str] = None, + ) -> list[TerritoryStats]: + """Get performance analytics per territory.""" + import uuid + + keys = [territory_key] if territory_key else list(SAUDI_REGIONS.keys()) + stats_list: list[TerritoryStats] = [] + + for key in keys: + region = SAUDI_REGIONS.get(key) + if not region: + continue + + assignment = self._assignments.get(key) + rep_ids = assignment.rep_ids if assignment else [] + + if not rep_ids: + stats_list.append(TerritoryStats( + territory_key=key, + name_ar=region["name_ar"], + name_en=region["name_en"], + )) + continue + + rep_uuids = [uuid.UUID(r) for r in rep_ids] + tid = uuid.UUID(tenant_id) + + lead_count_q = select(func.count()).where( + Lead.tenant_id == tid, + Lead.assigned_to.in_(rep_uuids), + ) + total_leads = (await self.db.execute(lead_count_q)).scalar() or 0 + + deals_q = select(func.count(), func.coalesce(func.sum(Deal.value), 0)).where( + Deal.tenant_id == tid, + Deal.assigned_to.in_(rep_uuids), + ) + row = (await self.db.execute(deals_q)).one_or_none() + total_deals = row[0] if row else 0 + total_value = float(row[1]) if row else 0.0 + + won_q = select(func.count()).where( + Deal.tenant_id == tid, + Deal.assigned_to.in_(rep_uuids), + Deal.stage == "closed_won", + ) + won_count = (await self.db.execute(won_q)).scalar() or 0 + win_rate = round((won_count / total_deals) * 100, 1) if total_deals > 0 else 0.0 + + stats_list.append(TerritoryStats( + territory_key=key, + name_ar=region["name_ar"], + name_en=region["name_en"], + total_leads=total_leads, + total_deals=total_deals, + total_value=total_value, + win_rate=win_rate, + reps_count=len(rep_ids), + )) + + return stats_list + + def list_regions(self) -> list[dict]: + """Return all Saudi regions with metadata.""" + return [ + { + "key": key, + "name_ar": info["name_ar"], + "name_en": info["name_en"], + "cities_ar": info["cities_ar"], + "reps_assigned": len(self._assignments.get(key, TerritoryAssignment(territory_key=key)).rep_ids), + } + for key, info in SAUDI_REGIONS.items() + ] + + def _detect_territory(self, city_hint: str) -> Optional[str]: + """Detect territory from a city name hint (Arabic or English).""" + hint_lower = city_hint.strip().lower() + for key, info in SAUDI_REGIONS.items(): + if hint_lower in info["name_en"].lower() or hint_lower == key: + return key + for city in info["cities_ar"]: + if city in city_hint: + return key + return None diff --git a/salesflow-saas/seeds/contracting_template.json b/salesflow-saas/seeds/contracting_template.json new file mode 100644 index 00000000..b54ec120 --- /dev/null +++ b/salesflow-saas/seeds/contracting_template.json @@ -0,0 +1,118 @@ +{ + "industry": "contracting", + "name": "Contracting & Services", + "name_ar": "مقاولات وخدمات", + "pipeline_stages": [ + {"key": "inquiry", "name_en": "Inquiry", "name_ar": "استفسار", "order": 1, "probability": 10}, + {"key": "site_visit", "name_en": "Site Visit", "name_ar": "زيارة موقع", "order": 2, "probability": 25}, + {"key": "quotation", "name_en": "Quotation", "name_ar": "عرض سعر", "order": 3, "probability": 40}, + {"key": "negotiation", "name_en": "Negotiation", "name_ar": "تفاوض", "order": 4, "probability": 60}, + {"key": "contract", "name_en": "Contract", "name_ar": "عقد", "order": 5, "probability": 80}, + {"key": "execution", "name_en": "Execution", "name_ar": "تنفيذ", "order": 6, "probability": 90}, + {"key": "completion", "name_en": "Completion", "name_ar": "إنجاز", "order": 7, "probability": 100} + ], + "message_templates": [ + { + "name": "welcome", + "name_ar": "رسالة ترحيب", + "channel": "whatsapp", + "trigger": "lead_created", + "content_ar": "أهلاً {name}! شكراً لتواصلك مع {company}. متخصصين في أعمال المقاولات والصيانة. وش نوع المشروع اللي تحتاجه؟", + "content_en": "Hello {name}! Thank you for contacting {company}. We specialize in contracting and maintenance. What type of project do you need?", + "delay_minutes": 0 + }, + { + "name": "site_visit_confirmation", + "name_ar": "تأكيد زيارة الموقع", + "channel": "whatsapp", + "trigger": "stage_change_site_visit", + "content_ar": "مرحباً {name}، تم تحديد موعد زيارة الموقع يوم {date} الساعة {time}. فريقنا الفني بيكون موجود للمعاينة وأخذ المقاسات. الموقع: {location}", + "content_en": "Hi {name}, site visit confirmed for {date} at {time}. Our technical team will be there for inspection. Location: {location}", + "delay_minutes": 0 + }, + { + "name": "quotation_ready", + "name_ar": "عرض السعر جاهز", + "channel": "whatsapp", + "trigger": "stage_change_quotation", + "content_ar": "مرحباً {name}، عرض السعر جاهز لمشروعك. الإجمالي: {total_amount} ريال شامل الضريبة. يشمل المواد والعمالة والضمان. نرسل لك التفاصيل الكاملة؟", + "content_en": "Hi {name}, your quotation is ready. Total: {total_amount} SAR including VAT. Includes materials, labor, and warranty. Shall we send the full details?", + "delay_minutes": 0 + }, + { + "name": "no_response_followup", + "name_ar": "متابعة عدم الرد", + "channel": "whatsapp", + "trigger": "no_response", + "content_ar": "مرحباً {name}، تواصلنا معك بخصوص مشروع {project_type}. إذا عندك أي استفسار أو تبي تعدّل على العرض، لا تتردد تراسلنا. نحن هنا لخدمتك.", + "content_en": "Hi {name}, we reached out regarding your {project_type} project. If you have questions or want to modify the quote, don't hesitate to reach out.", + "delay_minutes": 4320 + }, + { + "name": "execution_update", + "name_ar": "تحديث التنفيذ", + "channel": "whatsapp", + "trigger": "stage_change_execution", + "content_ar": "مرحباً {name}، نبشرك إن العمل بدأ في مشروعك. المدة المتوقعة: {duration}. بنرسل لك تحديثات دورية عن سير العمل. لأي استفسار تواصل معنا.", + "content_en": "Hi {name}, work has started on your project. Expected duration: {duration}. We'll send periodic progress updates.", + "delay_minutes": 0 + } + ], + "proposal_templates": [ + { + "name": "project_quotation", + "name_ar": "عرض سعر مشروع", + "sections": [ + {"title_ar": "نطاق العمل", "title_en": "Scope of Work"}, + {"title_ar": "المواد والمواصفات", "title_en": "Materials & Specifications"}, + {"title_ar": "الجدول الزمني", "title_en": "Timeline"}, + {"title_ar": "التكلفة التفصيلية", "title_en": "Detailed Cost Breakdown"}, + {"title_ar": "شروط الدفع", "title_en": "Payment Terms"}, + {"title_ar": "الضمان", "title_en": "Warranty"}, + {"title_ar": "الشروط والأحكام", "title_en": "Terms & Conditions"} + ] + }, + { + "name": "maintenance_contract", + "name_ar": "عقد صيانة", + "sections": [ + {"title_ar": "نطاق خدمات الصيانة", "title_en": "Maintenance Scope"}, + {"title_ar": "جدول الزيارات الدورية", "title_en": "Periodic Visit Schedule"}, + {"title_ar": "قطع الغيار المشمولة", "title_en": "Included Spare Parts"}, + {"title_ar": "الرسوم السنوية", "title_en": "Annual Fees"}, + {"title_ar": "مدة العقد والتجديد", "title_en": "Contract Duration & Renewal"} + ] + } + ], + "workflow_templates": [ + { + "name": "new_project_flow", + "name_ar": "تدفق مشروع جديد", + "trigger": "lead_created", + "actions": [ + {"type": "send_message", "template": "welcome", "delay_minutes": 0}, + {"type": "create_task", "subject": "اتصل بالعميل وحدد نوع المشروع", "delay_minutes": 15}, + {"type": "send_message", "template": "no_response_followup", "delay_minutes": 4320, "condition": "no_response"} + ] + }, + { + "name": "site_visit_flow", + "name_ar": "تدفق زيارة الموقع", + "trigger": "stage_change_site_visit", + "actions": [ + {"type": "send_message", "template": "site_visit_confirmation", "delay_minutes": 0}, + {"type": "create_task", "subject": "تجهيز فريق المعاينة الفنية", "delay_minutes": 60}, + {"type": "create_task", "subject": "إعداد عرض السعر بعد المعاينة", "delay_minutes": 1440} + ] + }, + { + "name": "execution_monitoring_flow", + "name_ar": "تدفق متابعة التنفيذ", + "trigger": "stage_change_execution", + "actions": [ + {"type": "send_message", "template": "execution_update", "delay_minutes": 0}, + {"type": "create_task", "subject": "تحديث العميل بتقرير أسبوعي", "delay_minutes": 10080} + ] + } + ] +}