feat: Add conversation intelligence, message writer, sales agent, APIs, and templates

Continuing Phase 3-6 implementation:

- AI: conversation_intelligence.py (Arabic dialogue analysis, buying signals)
- AI: message_writer.py (Arabic/English multi-channel message generation)
- AI: sales_agent.py (autonomous WhatsApp qualification bot)
- API: compliance.py (PDPL consent & data rights endpoints)
- API: inbox.py (unified multi-channel inbox)
- API: proposals.py (CPQ quote management endpoints)
- API: sequences.py (multi-channel sequence management)
- Services: territory_manager.py (Saudi region-based lead routing)
- Seeds: contracting_template.json (Saudi contracting industry template)
- Updated: router.py, consent_manager.py, data_rights.py

https://claude.ai/code/session_01LsnvBa7HwF5hs99VZbgLGj
This commit is contained in:
Claude 2026-04-11 07:43:11 +00:00
parent a329957a3b
commit 141f10db76
No known key found for this signature in database
12 changed files with 2814 additions and 352 deletions

View File

@ -0,0 +1,245 @@
"""PDPL Compliance API -- consent management, data subject requests, and SDAIA audit reports."""
import logging
from datetime import datetime, timezone
from typing import Optional
from uuid import UUID
from fastapi import APIRouter, Depends, HTTPException, Query, status
from pydantic import BaseModel as Schema
from sqlalchemy import select, func
from sqlalchemy.ext.asyncio import AsyncSession
from app.database import get_db
from app.api.deps import get_current_user
from app.models.user import User
from app.models.consent import (
PDPLConsent, PDPLConsentAudit, DataRequest,
ConsentStatusEnum, DataRequestStatus,
)
from app.services.pdpl.consent_manager import (
ConsentManager, ConsentGrantInput, ConsentRevokeInput,
ConsentCheckResult, DataRequestInput, AuditEntry,
)
from app.services.pdpl.data_rights import DataRightsHandler, ComplianceReport
logger = logging.getLogger(__name__)
router = APIRouter(prefix="/compliance", tags=["PDPL Compliance"])
# ---------------------------------------------------------------------------
# Schemas
# ---------------------------------------------------------------------------
class ConsentCreateRequest(Schema):
contact_id: UUID
purpose: str # marketing, sales, service, analytics
channel: str # whatsapp, email, sms, phone
consent_text: Optional[str] = None
ip_address: Optional[str] = None
expiry_months: int = 12
class ConsentResponse(Schema):
id: UUID
contact_id: UUID
tenant_id: UUID
purpose: str
channel: str
status: str
granted_at: Optional[datetime] = None
revoked_at: Optional[datetime] = None
expires_at: Optional[datetime] = None
created_at: datetime
model_config = {"from_attributes": True}
class ConsentListResponse(Schema):
items: list[ConsentResponse]
total: int
page: int
per_page: int
class DataRequestCreateRequest(Schema):
contact_id: UUID
request_type: str # access, correction, deletion, restriction
notes: Optional[str] = None
class DataRequestResponse(Schema):
id: UUID
contact_id: UUID
request_type: str
status: str
requested_at: Optional[datetime] = None
completed_at: Optional[datetime] = None
response_data: Optional[dict] = None
created_at: datetime
model_config = {"from_attributes": True}
class AuditListResponse(Schema):
items: list[AuditEntry]
total: int
page: int
per_page: int
# ---------------------------------------------------------------------------
# Endpoints
# ---------------------------------------------------------------------------
@router.get("/consents", response_model=ConsentListResponse)
async def list_consents(
purpose: Optional[str] = Query(None),
channel: Optional[str] = Query(None),
consent_status: Optional[str] = Query(None, alias="status"),
contact_id: Optional[UUID] = Query(None),
page: int = Query(1, ge=1),
per_page: int = Query(20, ge=1, le=100),
current_user: User = Depends(get_current_user),
db: AsyncSession = Depends(get_db),
):
"""List PDPL consents with filters."""
query = select(PDPLConsent).where(PDPLConsent.tenant_id == current_user.tenant_id)
if purpose:
query = query.where(PDPLConsent.purpose == purpose)
if channel:
query = query.where(PDPLConsent.channel == channel)
if consent_status:
query = query.where(PDPLConsent.status == consent_status)
if contact_id:
query = query.where(PDPLConsent.contact_id == contact_id)
total = (await db.execute(select(func.count()).select_from(query.subquery()))).scalar() or 0
rows = await db.execute(
query.order_by(PDPLConsent.created_at.desc()).offset((page - 1) * per_page).limit(per_page)
)
items = [ConsentResponse.model_validate(c) for c in rows.scalars().all()]
return ConsentListResponse(items=items, total=total, page=page, per_page=per_page)
@router.post("/consent", response_model=ConsentResponse, status_code=status.HTTP_201_CREATED)
async def record_consent(
data: ConsentCreateRequest,
current_user: User = Depends(get_current_user),
db: AsyncSession = Depends(get_db),
):
"""Record a new PDPL consent."""
mgr = ConsentManager(db)
consent = await mgr.grant_consent(ConsentGrantInput(
contact_id=data.contact_id,
tenant_id=current_user.tenant_id,
purpose=data.purpose,
channel=data.channel,
consent_text=data.consent_text,
ip_address=data.ip_address,
actor_id=current_user.id,
expiry_months=data.expiry_months,
))
return ConsentResponse.model_validate(consent)
@router.delete("/consent/{consent_id}", status_code=status.HTTP_200_OK)
async def revoke_consent(
consent_id: UUID,
reason: Optional[str] = Query(None),
current_user: User = Depends(get_current_user),
db: AsyncSession = Depends(get_db),
):
"""Revoke a PDPL consent."""
mgr = ConsentManager(db)
try:
consent = await mgr.revoke_consent(ConsentRevokeInput(
consent_id=consent_id,
actor_id=current_user.id,
reason=reason,
))
except ValueError as exc:
raise HTTPException(status_code=404, detail=str(exc))
return {"detail": "تم إلغاء الموافقة بنجاح", "consent_id": str(consent.id)}
@router.post("/data-request", response_model=DataRequestResponse, status_code=status.HTTP_201_CREATED)
async def submit_data_request(
data: DataRequestCreateRequest,
current_user: User = Depends(get_current_user),
db: AsyncSession = Depends(get_db),
):
"""Submit a data subject rights request (access, correction, deletion, restriction)."""
valid_types = {"access", "correction", "deletion", "restriction"}
if data.request_type not in valid_types:
raise HTTPException(status_code=400, detail=f"نوع الطلب غير صالح. الأنواع المسموحة: {', '.join(valid_types)}")
mgr = ConsentManager(db)
request = await mgr.process_data_request(DataRequestInput(
contact_id=data.contact_id,
tenant_id=current_user.tenant_id,
request_type=data.request_type,
notes=data.notes,
actor_id=current_user.id,
))
return DataRequestResponse.model_validate(request)
@router.get("/data-request/{request_id}", response_model=DataRequestResponse)
async def get_data_request(
request_id: UUID,
current_user: User = Depends(get_current_user),
db: AsyncSession = Depends(get_db),
):
"""Check data request status."""
result = await db.execute(
select(DataRequest).where(
DataRequest.id == request_id,
DataRequest.tenant_id == current_user.tenant_id,
)
)
req = result.scalar_one_or_none()
if not req:
raise HTTPException(status_code=404, detail="طلب البيانات غير موجود")
return DataRequestResponse.model_validate(req)
@router.get("/audit", response_model=AuditListResponse)
async def get_audit_trail(
contact_id: Optional[UUID] = Query(None),
page: int = Query(1, ge=1),
per_page: int = Query(50, ge=1, le=200),
current_user: User = Depends(get_current_user),
db: AsyncSession = Depends(get_db),
):
"""Consent audit trail for compliance review."""
query = select(PDPLConsentAudit).where(PDPLConsentAudit.tenant_id == current_user.tenant_id)
if contact_id:
query = query.where(PDPLConsentAudit.contact_id == contact_id)
total = (await db.execute(select(func.count()).select_from(query.subquery()))).scalar() or 0
mgr = ConsentManager(db)
items = await mgr.get_consent_audit(
tenant_id=current_user.tenant_id,
contact_id=contact_id,
limit=per_page,
offset=(page - 1) * per_page,
)
return AuditListResponse(items=items, total=total, page=page, per_page=per_page)
@router.get("/report", response_model=ComplianceReport)
async def generate_compliance_report(
current_user: User = Depends(get_current_user),
db: AsyncSession = Depends(get_db),
):
"""Generate SDAIA-ready PDPL compliance report."""
handler = DataRightsHandler(db)
return await handler.generate_compliance_report(current_user.tenant_id)

View File

@ -0,0 +1,258 @@
"""Unified inbox API -- aggregate messages from WhatsApp, Email, SMS."""
import logging
from datetime import datetime, timezone
from typing import Optional
from uuid import UUID
from fastapi import APIRouter, Depends, HTTPException, Query, status
from pydantic import BaseModel as Schema
from sqlalchemy import select, func, and_, or_
from sqlalchemy.ext.asyncio import AsyncSession
from app.database import get_db
from app.api.deps import get_current_user
from app.models.user import User
from app.models.message import Message
from app.models.lead import Lead
logger = logging.getLogger(__name__)
router = APIRouter(prefix="/inbox", tags=["Inbox"])
# ---------------------------------------------------------------------------
# Schemas
# ---------------------------------------------------------------------------
class MessageResponse(Schema):
id: UUID
lead_id: Optional[UUID] = None
channel: str
direction: str
content: Optional[str] = None
status: str
sent_at: Optional[datetime] = None
created_at: datetime
extra_metadata: Optional[dict] = None
lead_name: Optional[str] = None
model_config = {"from_attributes": True}
class MessageListResponse(Schema):
items: list[MessageResponse]
total: int
page: int
per_page: int
class ThreadResponse(Schema):
lead_id: UUID
lead_name: Optional[str] = None
messages: list[MessageResponse]
class ReplyInput(Schema):
lead_id: UUID
channel: str
content: str
class AssignInput(Schema):
lead_id: UUID
assigned_to: UUID
class InboxStats(Schema):
total_unread: int
whatsapp_unread: int
email_unread: int
sms_unread: int
avg_response_minutes: Optional[float] = None
# ---------------------------------------------------------------------------
# Endpoints
# ---------------------------------------------------------------------------
@router.get("", response_model=MessageListResponse)
async def list_inbox(
channel: Optional[str] = Query(None),
status_filter: Optional[str] = Query(None, alias="status"),
assigned_to: Optional[UUID] = Query(None),
search: Optional[str] = Query(None),
page: int = Query(1, ge=1),
per_page: int = Query(20, ge=1, le=100),
current_user: User = Depends(get_current_user),
db: AsyncSession = Depends(get_db),
):
"""List all messages across channels with filters."""
query = select(Message).where(Message.tenant_id == current_user.tenant_id)
if channel:
query = query.where(Message.channel == channel)
if status_filter:
query = query.where(Message.status == status_filter)
if assigned_to:
query = query.join(Lead, Lead.id == Message.lead_id).where(Lead.assigned_to == assigned_to)
if search:
query = query.where(Message.content.ilike(f"%{search}%"))
total = (await db.execute(select(func.count()).select_from(query.subquery()))).scalar() or 0
rows = await db.execute(
query.order_by(Message.created_at.desc()).offset((page - 1) * per_page).limit(per_page)
)
messages = rows.scalars().all()
# Batch-load lead names
lead_ids = {m.lead_id for m in messages if m.lead_id}
lead_map: dict[UUID, str] = {}
if lead_ids:
leads_q = await db.execute(select(Lead.id, Lead.name).where(Lead.id.in_(lead_ids)))
lead_map = {row[0]: row[1] for row in leads_q.all()}
items = []
for m in messages:
resp = MessageResponse.model_validate(m)
resp.lead_name = lead_map.get(m.lead_id)
items.append(resp)
return MessageListResponse(items=items, total=total, page=page, per_page=per_page)
@router.get("/stats", response_model=InboxStats)
async def inbox_stats(
current_user: User = Depends(get_current_user),
db: AsyncSession = Depends(get_db),
):
"""Unread counts per channel and response-time metrics."""
tid = current_user.tenant_id
base = and_(Message.tenant_id == tid, Message.direction == "inbound", Message.status != "read")
total = (await db.execute(select(func.count()).where(base))).scalar() or 0
wa = (await db.execute(
select(func.count()).where(base, Message.channel == "whatsapp")
)).scalar() or 0
em = (await db.execute(
select(func.count()).where(base, Message.channel == "email")
)).scalar() or 0
sm = (await db.execute(
select(func.count()).where(base, Message.channel == "sms")
)).scalar() or 0
return InboxStats(
total_unread=total,
whatsapp_unread=wa,
email_unread=em,
sms_unread=sm,
avg_response_minutes=None,
)
@router.get("/{message_id}", response_model=ThreadResponse)
async def get_thread(
message_id: UUID,
current_user: User = Depends(get_current_user),
db: AsyncSession = Depends(get_db),
):
"""Get full conversation thread for a message."""
msg = (await db.execute(
select(Message).where(Message.id == message_id, Message.tenant_id == current_user.tenant_id)
)).scalar_one_or_none()
if not msg:
raise HTTPException(status_code=404, detail="الرسالة غير موجودة")
if not msg.lead_id:
return ThreadResponse(lead_id=message_id, messages=[MessageResponse.model_validate(msg)])
lead = (await db.execute(select(Lead).where(Lead.id == msg.lead_id))).scalar_one_or_none()
thread_q = await db.execute(
select(Message)
.where(Message.lead_id == msg.lead_id, Message.tenant_id == current_user.tenant_id)
.order_by(Message.created_at.asc())
)
messages = [MessageResponse.model_validate(m) for m in thread_q.scalars().all()]
return ThreadResponse(
lead_id=msg.lead_id,
lead_name=lead.name if lead else None,
messages=messages,
)
@router.post("/reply", response_model=MessageResponse, status_code=status.HTTP_201_CREATED)
async def reply_message(
data: ReplyInput,
current_user: User = Depends(get_current_user),
db: AsyncSession = Depends(get_db),
):
"""Reply to a conversation -- auto-routes to the correct channel."""
lead = (await db.execute(
select(Lead).where(Lead.id == data.lead_id, Lead.tenant_id == current_user.tenant_id)
)).scalar_one_or_none()
if not lead:
raise HTTPException(status_code=404, detail="العميل المحتمل غير موجود")
if data.channel not in ("whatsapp", "email", "sms"):
raise HTTPException(status_code=400, detail="قناة غير مدعومة")
now = datetime.now(timezone.utc)
message = Message(
tenant_id=current_user.tenant_id,
lead_id=data.lead_id,
channel=data.channel,
direction="outbound",
content=data.content,
status="pending",
sent_at=now,
extra_metadata={"sent_by": str(current_user.id)},
)
db.add(message)
await db.flush()
await db.refresh(message)
logger.info("Inbox reply sent: lead=%s channel=%s by=%s", data.lead_id, data.channel, current_user.id)
return MessageResponse.model_validate(message)
@router.post("/assign", status_code=status.HTTP_200_OK)
async def assign_conversation(
data: AssignInput,
current_user: User = Depends(get_current_user),
db: AsyncSession = Depends(get_db),
):
"""Assign a conversation (lead) to a team member."""
lead = (await db.execute(
select(Lead).where(Lead.id == data.lead_id, Lead.tenant_id == current_user.tenant_id)
)).scalar_one_or_none()
if not lead:
raise HTTPException(status_code=404, detail="العميل المحتمل غير موجود")
lead.assigned_to = data.assigned_to
await db.flush()
logger.info("Conversation assigned: lead=%s to=%s", data.lead_id, data.assigned_to)
return {"detail": "تم تعيين المحادثة بنجاح", "lead_id": str(data.lead_id), "assigned_to": str(data.assigned_to)}
@router.put("/{message_id}/read", status_code=status.HTTP_200_OK)
async def mark_read(
message_id: UUID,
current_user: User = Depends(get_current_user),
db: AsyncSession = Depends(get_db),
):
"""Mark a message as read."""
msg = (await db.execute(
select(Message).where(Message.id == message_id, Message.tenant_id == current_user.tenant_id)
)).scalar_one_or_none()
if not msg:
raise HTTPException(status_code=404, detail="الرسالة غير موجودة")
msg.status = "read"
await db.flush()
return {"detail": "تم تحديد الرسالة كمقروءة", "message_id": str(message_id)}

View File

@ -0,0 +1,346 @@
"""
Dealix Proposals & Quotes API
إدارة عروض الأسعار والعروض التجارية
"""
from datetime import datetime, timezone
from typing import Optional
from uuid import UUID
from fastapi import APIRouter, Depends, HTTPException, Query
from pydantic import BaseModel, Field
from sqlalchemy import select, func, and_
from sqlalchemy.ext.asyncio import AsyncSession
from app.database import get_db
from app.api.deps import get_current_user
from app.models.user import User
from app.models.proposal import Proposal
from app.services.cpq.quote_engine import (
QuoteEngine, QuoteCreate, LineItemInput, DiscountInput, QuoteStatus,
)
from app.services.cpq.proposal_generator import (
ProposalGenerator, ProposalInput,
)
router = APIRouter(prefix="/proposals", tags=["Proposals & Quotes"])
# ── Request / Response Models ────────────────────
class ProposalCreateRequest(BaseModel):
deal_id: Optional[str] = None
lead_id: Optional[str] = None
title: str
currency: str = "SAR"
industry: str = "services"
validity_days: int = 30
vat_registration_number: Optional[str] = None
client_name: str = ""
client_company: str = ""
notes_ar: str = ""
class ProposalUpdateRequest(BaseModel):
title: Optional[str] = None
notes_ar: Optional[str] = None
validity_days: Optional[int] = None
vat_registration_number: Optional[str] = None
class SendRequest(BaseModel):
channel: str = Field(pattern=r"^(whatsapp|email)$", default="whatsapp")
recipient: str
class AcceptRequest(BaseModel):
client_signature: str = ""
notes: str = ""
class AIProposalRequest(BaseModel):
deal_title: str
client_name: str
client_company: str = ""
industry: str = "services"
deal_value: float = 0.0
currency: str = "SAR"
requirements: str = ""
language: str = "ar"
extra_context: str = ""
# ── Endpoints ────────────────────────────────────
@router.get("")
async def list_proposals(
status: Optional[str] = Query(None),
deal_id: Optional[UUID] = Query(None),
date_from: Optional[str] = Query(None),
date_to: Optional[str] = Query(None),
page: int = Query(1, ge=1),
per_page: int = Query(25, ge=1, le=100),
current_user: User = Depends(get_current_user),
db: AsyncSession = Depends(get_db),
):
"""List proposals with filters."""
query = select(Proposal).where(Proposal.tenant_id == current_user.tenant_id)
if status:
query = query.where(Proposal.status == status)
if deal_id:
query = query.where(Proposal.deal_id == deal_id)
if date_from:
query = query.where(Proposal.created_at >= datetime.fromisoformat(date_from))
if date_to:
query = query.where(Proposal.created_at <= datetime.fromisoformat(date_to))
count_q = select(func.count()).select_from(query.subquery())
total = (await db.execute(count_q)).scalar() or 0
query = query.order_by(Proposal.created_at.desc())
query = query.offset((page - 1) * per_page).limit(per_page)
result = await db.execute(query)
items = [_proposal_dict(p) for p in result.scalars().all()]
return {"items": items, "total": total, "page": page, "per_page": per_page}
@router.post("", status_code=201)
async def create_proposal(
data: ProposalCreateRequest,
current_user: User = Depends(get_current_user),
db: AsyncSession = Depends(get_db),
):
"""Create a new proposal/quote."""
engine = QuoteEngine(db)
quote_data = QuoteCreate(
tenant_id=str(current_user.tenant_id),
deal_id=data.deal_id,
lead_id=data.lead_id,
title=data.title,
currency=data.currency,
industry=data.industry,
validity_days=data.validity_days,
vat_registration_number=data.vat_registration_number,
client_name=data.client_name,
client_company=data.client_company,
notes_ar=data.notes_ar,
)
return await engine.create_quote(quote_data)
@router.get("/analytics")
async def proposal_analytics(
current_user: User = Depends(get_current_user),
db: AsyncSession = Depends(get_db),
):
"""Win rate, average deal size, time-to-close analytics."""
tid = current_user.tenant_id
total_q = select(func.count()).where(Proposal.tenant_id == tid)
total = (await db.execute(total_q)).scalar() or 0
accepted_q = select(func.count()).where(
Proposal.tenant_id == tid, Proposal.status == QuoteStatus.ACCEPTED.value,
)
accepted = (await db.execute(accepted_q)).scalar() or 0
rejected_q = select(func.count()).where(
Proposal.tenant_id == tid, Proposal.status == QuoteStatus.REJECTED.value,
)
rejected = (await db.execute(rejected_q)).scalar() or 0
avg_value_q = select(func.avg(Proposal.total_amount)).where(
Proposal.tenant_id == tid, Proposal.status == QuoteStatus.ACCEPTED.value,
)
avg_value = (await db.execute(avg_value_q)).scalar()
decided = accepted + rejected
win_rate = round((accepted / decided) * 100, 1) if decided > 0 else 0.0
return {
"total_proposals": total,
"accepted": accepted,
"rejected": rejected,
"win_rate_percent": win_rate,
"average_deal_value": float(avg_value) if avg_value else 0.0,
"currency": "SAR",
}
@router.get("/{proposal_id}")
async def get_proposal(
proposal_id: UUID,
current_user: User = Depends(get_current_user),
db: AsyncSession = Depends(get_db),
):
"""Get full proposal details."""
result = await db.execute(
select(Proposal).where(
Proposal.id == proposal_id,
Proposal.tenant_id == current_user.tenant_id,
)
)
proposal = result.scalar_one_or_none()
if not proposal:
raise HTTPException(status_code=404, detail="عرض السعر غير موجود")
return _proposal_dict(proposal)
@router.put("/{proposal_id}")
async def update_proposal(
proposal_id: UUID,
data: ProposalUpdateRequest,
current_user: User = Depends(get_current_user),
db: AsyncSession = Depends(get_db),
):
"""Update proposal metadata."""
result = await db.execute(
select(Proposal).where(
Proposal.id == proposal_id,
Proposal.tenant_id == current_user.tenant_id,
)
)
proposal = result.scalar_one_or_none()
if not proposal:
raise HTTPException(status_code=404, detail="عرض السعر غير موجود")
if data.title is not None:
proposal.title = data.title
if data.notes_ar is not None:
content = dict(proposal.content)
content["notes_ar"] = data.notes_ar
proposal.content = content
if data.vat_registration_number is not None:
content = dict(proposal.content)
content["vat_registration_number"] = data.vat_registration_number
proposal.content = content
await db.flush()
await db.refresh(proposal)
return _proposal_dict(proposal)
@router.post("/{proposal_id}/items")
async def add_line_item(
proposal_id: UUID,
item: LineItemInput,
current_user: User = Depends(get_current_user),
db: AsyncSession = Depends(get_db),
):
"""Add a line item to the quote."""
engine = QuoteEngine(db)
return await engine.add_line_item(str(current_user.tenant_id), str(proposal_id), item)
@router.post("/{proposal_id}/discount")
async def apply_discount(
proposal_id: UUID,
discount: DiscountInput,
current_user: User = Depends(get_current_user),
db: AsyncSession = Depends(get_db),
):
"""Apply a discount to the quote."""
engine = QuoteEngine(db)
return await engine.apply_discount(str(current_user.tenant_id), str(proposal_id), discount)
@router.post("/{proposal_id}/send")
async def send_proposal(
proposal_id: UUID,
data: SendRequest,
current_user: User = Depends(get_current_user),
db: AsyncSession = Depends(get_db),
):
"""Send proposal via WhatsApp or Email."""
engine = QuoteEngine(db)
return await engine.send_quote(
str(current_user.tenant_id), str(proposal_id), data.channel, data.recipient,
)
@router.post("/{proposal_id}/accept")
async def accept_proposal(
proposal_id: UUID,
data: AcceptRequest,
current_user: User = Depends(get_current_user),
db: AsyncSession = Depends(get_db),
):
"""Client acceptance endpoint."""
result = await db.execute(
select(Proposal).where(
Proposal.id == proposal_id,
Proposal.tenant_id == current_user.tenant_id,
)
)
proposal = result.scalar_one_or_none()
if not proposal:
raise HTTPException(status_code=404, detail="عرض السعر غير موجود")
if proposal.status == QuoteStatus.EXPIRED.value:
raise HTTPException(status_code=400, detail="عرض السعر منتهي الصلاحية")
proposal.status = QuoteStatus.ACCEPTED.value
content = dict(proposal.content)
content["acceptance"] = {
"signature": data.client_signature,
"notes": data.notes,
"accepted_at": datetime.now(timezone.utc).isoformat(),
}
proposal.content = content
await db.flush()
await db.refresh(proposal)
return _proposal_dict(proposal)
@router.get("/{proposal_id}/pdf")
async def generate_pdf_data(
proposal_id: UUID,
current_user: User = Depends(get_current_user),
db: AsyncSession = Depends(get_db),
):
"""Generate PDF-ready data for a proposal."""
result = await db.execute(
select(Proposal).where(
Proposal.id == proposal_id,
Proposal.tenant_id == current_user.tenant_id,
)
)
proposal = result.scalar_one_or_none()
if not proposal:
raise HTTPException(status_code=404, detail="عرض السعر غير موجود")
generator = ProposalGenerator()
ai_req = ProposalInput(
deal_title=proposal.title,
client_name=proposal.content.get("client_name", ""),
client_company=proposal.content.get("client_company", ""),
industry=proposal.content.get("industry", "services"),
deal_value=float(proposal.total_amount) if proposal.total_amount else 0.0,
currency=proposal.currency or "SAR",
requirements=proposal.content.get("notes_ar", ""),
language="both",
)
ai_proposal = await generator.generate_proposal(ai_req)
return await generator.export_pdf_data(ai_proposal)
# ── Helpers ──────────────────────────────────────
def _proposal_dict(p: Proposal) -> dict:
return {
"id": str(p.id),
"tenant_id": str(p.tenant_id),
"deal_id": str(p.deal_id) if p.deal_id else None,
"lead_id": str(p.lead_id) if p.lead_id else None,
"title": p.title,
"content": p.content,
"total_amount": str(p.total_amount) if p.total_amount else "0",
"currency": p.currency,
"status": p.status,
"valid_until": p.valid_until.isoformat() if p.valid_until else None,
"sent_at": p.sent_at.isoformat() if p.sent_at else None,
"viewed_at": p.viewed_at.isoformat() if p.viewed_at else None,
"created_at": p.created_at.isoformat() if p.created_at else None,
"updated_at": p.updated_at.isoformat() if p.updated_at else None,
}

View File

@ -4,7 +4,9 @@ from app.api.v1 import (
companies, contacts, calls, meetings, commissions, payouts, disputes,
guarantees, consents, complaints, knowledge, sectors, presentations,
supervisor, admin, health, analytics, webhooks, prospecting,
inbox, sequences,
)
from app.api.v1 import compliance as compliance_router
from app.api.v1 import agents as agents_router
from app.api.v1 import intelligence as intelligence_router
from app.api.v1 import master as master_router
@ -56,6 +58,9 @@ api_router.include_router(operations_router.router)
api_router.include_router(analytics.router, tags=["Analytics & AI"])
api_router.include_router(webhooks.router, tags=["Webhooks"])
api_router.include_router(prospecting.router, prefix="/prospecting", tags=["Prospecting"])
api_router.include_router(inbox.router)
api_router.include_router(sequences.router)
api_router.include_router(compliance_router.router)
# ── Manus Multi-Agent + Autonomous Intelligence ─────────────
api_router.include_router(agents_router.router)

View File

@ -0,0 +1,239 @@
"""Sequences API -- create, manage, and analyze multi-channel outreach sequences."""
import logging
from typing import Optional
from uuid import UUID
from fastapi import APIRouter, Depends, HTTPException, Query, status
from pydantic import BaseModel as Schema
from sqlalchemy import select, func
from sqlalchemy.ext.asyncio import AsyncSession
from app.database import get_db
from app.api.deps import get_current_user
from app.models.user import User
from app.models.sequence import Sequence, SequenceStep, SequenceEnrollment
from app.services.sequence_engine import (
SequenceEngine, SequenceCreateInput, EnrollInput, SequenceAnalytics,
)
logger = logging.getLogger(__name__)
router = APIRouter(prefix="/sequences", tags=["Sequences"])
# ---------------------------------------------------------------------------
# Schemas
# ---------------------------------------------------------------------------
class StepSchema(Schema):
channel: str
delay_minutes: int = 0
template_content: str
template_content_ar: Optional[str] = None
variant: Optional[str] = None
conditions: dict = {}
class SequenceCreateRequest(Schema):
name: str
name_ar: Optional[str] = None
description: Optional[str] = None
trigger_event: Optional[str] = None
steps: list[StepSchema] = []
class SequenceUpdateRequest(Schema):
name: Optional[str] = None
name_ar: Optional[str] = None
description: Optional[str] = None
trigger_event: Optional[str] = None
is_active: Optional[bool] = None
class SequenceResponse(Schema):
id: UUID
name: str
name_ar: Optional[str] = None
description: Optional[str] = None
trigger_event: Optional[str] = None
is_active: bool
created_at: object
step_count: int = 0
enrollment_count: int = 0
model_config = {"from_attributes": True}
class SequenceListResponse(Schema):
items: list[SequenceResponse]
total: int
class EnrollRequest(Schema):
lead_id: UUID
class EnrollmentResponse(Schema):
id: UUID
sequence_id: UUID
lead_id: UUID
current_step: int
status: str
enrolled_at: object
model_config = {"from_attributes": True}
# ---------------------------------------------------------------------------
# Endpoints
# ---------------------------------------------------------------------------
@router.get("", response_model=SequenceListResponse)
async def list_sequences(
page: int = Query(1, ge=1),
per_page: int = Query(20, ge=1, le=100),
current_user: User = Depends(get_current_user),
db: AsyncSession = Depends(get_db),
):
"""List sequences with analytics summary."""
tid = current_user.tenant_id
total = (await db.execute(
select(func.count()).where(Sequence.tenant_id == tid)
)).scalar() or 0
rows = await db.execute(
select(Sequence)
.where(Sequence.tenant_id == tid)
.order_by(Sequence.created_at.desc())
.offset((page - 1) * per_page).limit(per_page)
)
sequences = rows.scalars().all()
items = []
for seq in sequences:
step_count = (await db.execute(
select(func.count()).where(SequenceStep.sequence_id == seq.id)
)).scalar() or 0
enroll_count = (await db.execute(
select(func.count()).where(SequenceEnrollment.sequence_id == seq.id)
)).scalar() or 0
resp = SequenceResponse.model_validate(seq)
resp.step_count = step_count
resp.enrollment_count = enroll_count
items.append(resp)
return SequenceListResponse(items=items, total=total)
@router.post("", response_model=SequenceResponse, status_code=status.HTTP_201_CREATED)
async def create_sequence(
data: SequenceCreateRequest,
current_user: User = Depends(get_current_user),
db: AsyncSession = Depends(get_db),
):
"""Create a new sequence with steps."""
engine = SequenceEngine(db)
seq = await engine.create_sequence(SequenceCreateInput(
tenant_id=current_user.tenant_id,
name=data.name,
name_ar=data.name_ar,
description=data.description,
trigger_event=data.trigger_event,
created_by=current_user.id,
steps=[s.model_dump() for s in data.steps],
))
resp = SequenceResponse.model_validate(seq)
resp.step_count = len(data.steps)
return resp
@router.put("/{sequence_id}", response_model=SequenceResponse)
async def update_sequence(
sequence_id: UUID,
data: SequenceUpdateRequest,
current_user: User = Depends(get_current_user),
db: AsyncSession = Depends(get_db),
):
"""Update sequence metadata."""
seq = (await db.execute(
select(Sequence).where(Sequence.id == sequence_id, Sequence.tenant_id == current_user.tenant_id)
)).scalar_one_or_none()
if not seq:
raise HTTPException(status_code=404, detail="التسلسل غير موجود")
for field, val in data.model_dump(exclude_none=True).items():
setattr(seq, field, val)
await db.flush()
await db.refresh(seq)
logger.info("Sequence updated: id=%s", sequence_id)
return SequenceResponse.model_validate(seq)
@router.post("/{sequence_id}/enroll", response_model=EnrollmentResponse, status_code=status.HTTP_201_CREATED)
async def enroll_lead(
sequence_id: UUID,
data: EnrollRequest,
current_user: User = Depends(get_current_user),
db: AsyncSession = Depends(get_db),
):
"""Enroll a lead into a sequence."""
seq = (await db.execute(
select(Sequence).where(Sequence.id == sequence_id, Sequence.tenant_id == current_user.tenant_id)
)).scalar_one_or_none()
if not seq:
raise HTTPException(status_code=404, detail="التسلسل غير موجود")
if not seq.is_active:
raise HTTPException(status_code=400, detail="التسلسل غير نشط")
engine = SequenceEngine(db)
try:
enrollment = await engine.enroll_lead(EnrollInput(sequence_id=sequence_id, lead_id=data.lead_id))
except ValueError as exc:
raise HTTPException(status_code=409, detail=str(exc))
return EnrollmentResponse.model_validate(enrollment)
@router.delete("/{sequence_id}/enrollments/{enrollment_id}", status_code=status.HTTP_200_OK)
async def stop_enrollment(
sequence_id: UUID,
enrollment_id: UUID,
current_user: User = Depends(get_current_user),
db: AsyncSession = Depends(get_db),
):
"""Stop an active enrollment."""
enrollment = (await db.execute(
select(SequenceEnrollment).where(
SequenceEnrollment.id == enrollment_id,
SequenceEnrollment.sequence_id == sequence_id,
)
)).scalar_one_or_none()
if not enrollment:
raise HTTPException(status_code=404, detail="التسجيل غير موجود")
engine = SequenceEngine(db)
await engine.stop_enrollment(enrollment_id)
return {"detail": "تم إيقاف التسجيل بنجاح", "enrollment_id": str(enrollment_id)}
@router.get("/{sequence_id}/analytics", response_model=SequenceAnalytics)
async def get_analytics(
sequence_id: UUID,
current_user: User = Depends(get_current_user),
db: AsyncSession = Depends(get_db),
):
"""Detailed analytics for a sequence."""
seq = (await db.execute(
select(Sequence).where(Sequence.id == sequence_id, Sequence.tenant_id == current_user.tenant_id)
)).scalar_one_or_none()
if not seq:
raise HTTPException(status_code=404, detail="التسلسل غير موجود")
engine = SequenceEngine(db)
return await engine.get_sequence_analytics(sequence_id)

View File

@ -0,0 +1,403 @@
"""
Arabic Conversation Intelligence Analyzes WhatsApp/email threads
to extract insights, buying/risk signals, and next-best-action recommendations.
"""
import json
import logging
import re
from dataclasses import dataclass, field
from typing import Optional
from app.services.llm.provider import get_llm
logger = logging.getLogger(__name__)
# ---------------------------------------------------------------------------
# Data models
# ---------------------------------------------------------------------------
@dataclass
class BuyingSignal:
phrase: str
confidence: float
signal_type: str # "explicit", "implicit"
@dataclass
class RiskSignal:
phrase: str
risk_type: str # "price_objection", "competitor", "hesitation", "delay"
severity: str # "low", "medium", "high"
@dataclass
class ActionItem:
description_ar: str
description_en: str
priority: str # "high", "medium", "low"
due_hint: Optional[str] = None # "today", "this_week", "next_week"
@dataclass
class ConversationInsight:
summary_ar: str
summary_en: str
key_topics: list[str] = field(default_factory=list)
buying_signals: list[BuyingSignal] = field(default_factory=list)
risk_signals: list[RiskSignal] = field(default_factory=list)
objections: list[str] = field(default_factory=list)
action_items: list[ActionItem] = field(default_factory=list)
next_best_action_ar: str = ""
next_best_action_en: str = ""
quality_score: float = 0.0 # 0.0 - 10.0
message_count: int = 0
dominant_language: str = "ar"
# ---------------------------------------------------------------------------
# Pattern constants
# ---------------------------------------------------------------------------
BUYING_SIGNAL_PATTERNS = [
(r"أبي\s*عرض\s*سعر", "explicit", 0.9),
(r"كم\s*السعر", "explicit", 0.85),
(r"متى\s*تقدرون\s*تبد[وأ]ون", "explicit", 0.9),
(r"أبي\s*أشتري", "explicit", 0.95),
(r"نبي\s*ن[شس]تري", "explicit", 0.95),
(r"عطوني\s*عرض", "explicit", 0.85),
(r"ودي\s*آخذ", "explicit", 0.8),
(r"أبغى\s*أطلب", "explicit", 0.9),
(r"جاهز[ية]?\s*نبدأ", "explicit", 0.95),
(r"كيف\s*طريقة\s*الدفع", "implicit", 0.8),
(r"فيه\s*ضمان", "implicit", 0.7),
(r"عندكم\s*تجربة\s*مجانية", "implicit", 0.6),
(r"متى\s*يوصل", "implicit", 0.7),
(r"أبي\s*أعرف\s*أكثر", "implicit", 0.5),
(r"send\s*(?:me\s*)?(?:a\s*)?quot(?:e|ation)", "explicit", 0.85),
(r"how\s*(?:much|soon)", "implicit", 0.7),
(r"ready\s*to\s*(?:start|buy|proceed)", "explicit", 0.95),
]
RISK_SIGNAL_PATTERNS = [
(r"غالي", "price_objection", "medium", 0.8),
(r"فيه\s*أرخص", "competitor", "high", 0.85),
(r"بفكر", "hesitation", "medium", 0.7),
(r"مو\s*متأكد", "hesitation", "high", 0.8),
(r"خلني\s*أستشير", "delay", "medium", 0.6),
(r"أرجع\s*لك", "delay", "medium", 0.5),
(r"مو\s*الحين", "delay", "medium", 0.7),
(r"ما\s*عندي\s*ميزانية", "price_objection", "high", 0.9),
(r"المنافس\s*يعطينا\s*أحسن", "competitor", "high", 0.9),
(r"نستخدم\s*نظام\s*ثاني", "competitor", "medium", 0.7),
(r"ما\s*شفت\s*فايدة", "hesitation", "high", 0.85),
(r"too\s*expensive", "price_objection", "medium", 0.8),
(r"not\s*sure", "hesitation", "medium", 0.7),
(r"(?:need|let)\s*(?:me\s*)?think", "delay", "medium", 0.6),
]
# ---------------------------------------------------------------------------
# Service
# ---------------------------------------------------------------------------
class ConversationIntelligence:
"""Analyzes Arabic/English conversation threads for sales intelligence."""
def __init__(self):
self._llm = get_llm()
async def analyze_conversation(
self, messages: list[dict], context: Optional[dict] = None
) -> ConversationInsight:
"""
Analyze a conversation thread.
Args:
messages: List of {"role": "lead"|"agent", "content": str, "timestamp": str, "channel": str}
context: Optional lead/deal context {"lead_name", "company", "industry", "stage"}
Returns:
ConversationInsight with full analysis.
"""
context = context or {}
if not messages:
return ConversationInsight(
summary_ar="لا توجد رسائل للتحليل",
summary_en="No messages to analyze",
message_count=0,
)
# Regex-based signal extraction (fast)
buying_signals = self._extract_buying_signals(messages)
risk_signals = self._extract_risk_signals(messages)
# LLM-based deep analysis
try:
llm_insight = await self._llm_analyze(messages, context)
except Exception as e:
logger.warning(f"LLM conversation analysis failed: {e}")
llm_insight = {}
# Merge regex and LLM results
quality_score = self._calculate_quality_score(messages, buying_signals, risk_signals)
# Combine LLM action items with defaults
action_items = self._parse_action_items(llm_insight.get("action_items", []))
if not action_items:
action_items = self._generate_default_actions(buying_signals, risk_signals)
return ConversationInsight(
summary_ar=llm_insight.get("summary_ar", self._build_fallback_summary_ar(messages)),
summary_en=llm_insight.get("summary_en", self._build_fallback_summary_en(messages)),
key_topics=llm_insight.get("key_topics", []),
buying_signals=buying_signals,
risk_signals=risk_signals,
objections=llm_insight.get("objections", []),
action_items=action_items,
next_best_action_ar=llm_insight.get(
"next_best_action_ar",
self._default_next_action_ar(buying_signals, risk_signals),
),
next_best_action_en=llm_insight.get("next_best_action_en", "Follow up with the lead"),
quality_score=round(quality_score, 1),
message_count=len(messages),
dominant_language=self._detect_dominant_language(messages),
)
# ── Regex Signal Extraction ──────────────────
def _extract_buying_signals(self, messages: list[dict]) -> list[BuyingSignal]:
"""Extract buying signals from conversation using regex patterns."""
signals = []
lead_texts = " ".join(
m.get("content", "") for m in messages if m.get("role") == "lead"
)
seen_phrases = set()
for pattern, signal_type, confidence in BUYING_SIGNAL_PATTERNS:
for match in re.finditer(pattern, lead_texts, re.IGNORECASE):
phrase = match.group(0).strip()
if phrase not in seen_phrases:
seen_phrases.add(phrase)
signals.append(BuyingSignal(
phrase=phrase,
confidence=confidence,
signal_type=signal_type,
))
return signals
def _extract_risk_signals(self, messages: list[dict]) -> list[RiskSignal]:
"""Extract risk signals from conversation using regex patterns."""
signals = []
lead_texts = " ".join(
m.get("content", "") for m in messages if m.get("role") == "lead"
)
seen_phrases = set()
for pattern, risk_type, severity, _confidence in RISK_SIGNAL_PATTERNS:
for match in re.finditer(pattern, lead_texts, re.IGNORECASE):
phrase = match.group(0).strip()
if phrase not in seen_phrases:
seen_phrases.add(phrase)
signals.append(RiskSignal(
phrase=phrase,
risk_type=risk_type,
severity=severity,
))
return signals
# ── LLM Deep Analysis ────────────────────────
async def _llm_analyze(self, messages: list[dict], context: dict) -> dict:
"""Use LLM for deep conversation analysis."""
thread_text = self._format_thread(messages)
context_str = ""
if context:
context_str = (
f"معلومات العميل: {context.get('lead_name', 'غير معروف')}, "
f"الشركة: {context.get('company', 'غير معروف')}, "
f"القطاع: {context.get('industry', 'غير محدد')}, "
f"المرحلة: {context.get('stage', 'غير محدد')}"
)
system_prompt = (
"أنت محلل محادثات مبيعات خبير في السوق السعودي.\n"
"حلل المحادثة التالية واستخرج:\n"
"1. ملخص المحادثة بالعربي والإنجليزي\n"
"2. المواضيع الرئيسية\n"
"3. الاعتراضات التي طرحها العميل\n"
"4. المهام والإجراءات المطلوبة\n"
"5. أفضل إجراء تالي\n\n"
f"{context_str}\n\n"
"أجب بصيغة JSON بالضبط:\n"
"{\n"
' "summary_ar": "ملخص بالعربي",\n'
' "summary_en": "English summary",\n'
' "key_topics": ["موضوع1", "موضوع2"],\n'
' "objections": ["اعتراض1", "اعتراض2"],\n'
' "action_items": [\n'
' {"description_ar": "وصف", "description_en": "desc", "priority": "high|medium|low", "due_hint": "today|this_week|next_week"}\n'
" ],\n"
' "next_best_action_ar": "الإجراء التالي بالعربي",\n'
' "next_best_action_en": "Next action in English"\n'
"}"
)
response = await self._llm.complete(
system_prompt=system_prompt,
user_message=thread_text,
json_mode=True,
temperature=0.2,
max_tokens=1024,
)
parsed = response.parse_json()
return parsed or {}
# ── Quality Scoring ──────────────────────────
def _calculate_quality_score(
self,
messages: list[dict],
buying_signals: list[BuyingSignal],
risk_signals: list[RiskSignal],
) -> float:
"""Calculate conversation quality score (0-10)."""
score = 5.0 # baseline
# Message volume factor
msg_count = len(messages)
if msg_count >= 10:
score += 1.0
elif msg_count >= 5:
score += 0.5
# Two-way engagement
lead_msgs = sum(1 for m in messages if m.get("role") == "lead")
agent_msgs = sum(1 for m in messages if m.get("role") == "agent")
if lead_msgs > 0 and agent_msgs > 0:
ratio = min(lead_msgs, agent_msgs) / max(lead_msgs, agent_msgs)
score += ratio * 1.5 # balanced conversation = higher quality
# Buying signals boost
score += min(len(buying_signals) * 0.5, 2.0)
# Risk signals penalty
high_risks = sum(1 for r in risk_signals if r.severity == "high")
score -= min(high_risks * 0.5, 2.0)
# Average message length (longer = more engaged)
avg_len = sum(len(m.get("content", "")) for m in messages) / max(msg_count, 1)
if avg_len > 100:
score += 0.5
return max(0.0, min(10.0, score))
# ── Helpers ──────────────────────────────────
@staticmethod
def _format_thread(messages: list[dict]) -> str:
"""Format messages into a readable thread for LLM."""
lines = []
for m in messages[-30:]: # limit to last 30 messages
role = "العميل" if m.get("role") == "lead" else "المندوب"
timestamp = m.get("timestamp", "")
channel = m.get("channel", "")
prefix = f"[{timestamp}] [{channel}] {role}" if timestamp else f"[{channel}] {role}"
lines.append(f"{prefix}: {m.get('content', '')}")
return "\n".join(lines)
@staticmethod
def _detect_dominant_language(messages: list[dict]) -> str:
"""Quick check on whether the conversation is mostly Arabic or English."""
arabic_re = re.compile(r"[\u0600-\u06FF]")
arabic_chars = 0
total_chars = 0
for m in messages:
content = m.get("content", "")
arabic_chars += len(arabic_re.findall(content))
total_chars += len(content)
if total_chars == 0:
return "ar"
return "ar" if (arabic_chars / total_chars) > 0.3 else "en"
@staticmethod
def _parse_action_items(raw_items: list) -> list[ActionItem]:
"""Parse LLM action items into ActionItem objects."""
items = []
for item in raw_items:
if isinstance(item, dict):
items.append(ActionItem(
description_ar=item.get("description_ar", ""),
description_en=item.get("description_en", ""),
priority=item.get("priority", "medium"),
due_hint=item.get("due_hint"),
))
return items
@staticmethod
def _generate_default_actions(
buying_signals: list[BuyingSignal], risk_signals: list[RiskSignal]
) -> list[ActionItem]:
"""Generate default actions when LLM is unavailable."""
actions = []
if buying_signals:
explicit = [s for s in buying_signals if s.signal_type == "explicit"]
if explicit:
actions.append(ActionItem(
description_ar="العميل أبدى رغبة شرائية واضحة — أرسل عرض سعر فوراً",
description_en="Lead showed explicit buying intent - send proposal immediately",
priority="high",
due_hint="today",
))
high_risks = [r for r in risk_signals if r.severity == "high"]
if high_risks:
risk_types = set(r.risk_type for r in high_risks)
if "price_objection" in risk_types:
actions.append(ActionItem(
description_ar="العميل يشوف السعر غالي — جهّز مقارنة قيمة وعرض خاص",
description_en="Price objection detected - prepare value comparison and discount offer",
priority="high",
due_hint="today",
))
if "competitor" in risk_types:
actions.append(ActionItem(
description_ar="العميل يقارن بالمنافسين — جهّز مقارنة تنافسية",
description_en="Competitor comparison detected - prepare competitive analysis",
priority="high",
due_hint="today",
))
if not actions:
actions.append(ActionItem(
description_ar="تابع مع العميل برسالة واتساب ودية",
description_en="Follow up with a friendly WhatsApp message",
priority="medium",
due_hint="this_week",
))
return actions
@staticmethod
def _default_next_action_ar(
buying_signals: list[BuyingSignal], risk_signals: list[RiskSignal]
) -> str:
if any(s.signal_type == "explicit" for s in buying_signals):
return "العميل جاهز! أرسل عرض سعر مخصص واتصل خلال ساعة."
high_risks = [r for r in risk_signals if r.severity == "high"]
if high_risks:
return "انتبه — فيه إشارات خطر. عالج الاعتراضات قبل المتابعة."
if buying_signals:
return "فيه اهتمام. أرسل معلومات إضافية وحدد موعد عرض."
return "تابع المحادثة واسأل عن احتياجات العميل."
@staticmethod
def _build_fallback_summary_ar(messages: list[dict]) -> str:
count = len(messages)
lead_count = sum(1 for m in messages if m.get("role") == "lead")
return f"محادثة مكونة من {count} رسالة ({lead_count} من العميل). لم يتم تحليل المحتوى بالتفصيل."
@staticmethod
def _build_fallback_summary_en(messages: list[dict]) -> str:
count = len(messages)
lead_count = sum(1 for m in messages if m.get("role") == "lead")
return f"Conversation with {count} messages ({lead_count} from lead). Detailed analysis unavailable."

View File

@ -0,0 +1,436 @@
"""
AI Message Writer Generates personalized WhatsApp, Email, and SMS messages
in Arabic and English with tone control, A/B variants, and Saudi business-hour awareness.
"""
import json
import logging
from dataclasses import dataclass, field
from datetime import datetime, timezone, timedelta
from typing import Optional
from app.services.llm.provider import get_llm
logger = logging.getLogger(__name__)
# ---------------------------------------------------------------------------
# Data models
# ---------------------------------------------------------------------------
@dataclass
class MessageVariant:
content: str
subject: Optional[str] = None # for email only
variant_label: str = "A"
estimated_read_time_sec: int = 0
@dataclass
class MessageDraft:
channel: str # "whatsapp", "email", "sms"
language: str # "ar", "en"
tone: str
variants: list[MessageVariant] = field(default_factory=list)
best_send_time: Optional[str] = None
best_send_day: Optional[str] = None
personalization_used: list[str] = field(default_factory=list)
metadata: dict = field(default_factory=dict)
# ---------------------------------------------------------------------------
# Constants
# ---------------------------------------------------------------------------
CHANNEL_LIMITS = {
"whatsapp": 4096,
"sms": 160,
"email": 10000,
}
TONE_INSTRUCTIONS = {
"formal": {
"ar": (
"استخدم لغة رسمية واحترافية. خاطب العميل بـ'حضرتكم'. "
"ابدأ بالسلام الرسمي. تجنب العامية."
),
"en": "Use formal, professional language. Address the recipient respectfully.",
},
"friendly": {
"ar": (
"استخدم لغة ودية وعفوية. خاطب العميل بـ'أنت'. "
"استخدم تعبيرات سعودية طبيعية مثل 'هلا والله' و'يعطيك العافية'."
),
"en": "Use a warm, friendly tone. Be conversational and approachable.",
},
"urgent": {
"ar": (
"استخدم لغة مباشرة تحث على الإسراع. "
"أكد على محدودية العرض أو ضرورة التصرف السريع بدون مبالغة."
),
"en": "Create urgency without being pushy. Emphasize time-limited opportunity.",
},
"follow_up": {
"ar": (
"ذكّر العميل بالمحادثة السابقة بلطف. "
"اسأل إذا عنده أسئلة. كن مهذب وغير ملح."
),
"en": "Gently remind about previous conversation. Ask if they have questions.",
},
}
INDUSTRY_CONTEXT = {
"real_estate": {
"ar": "القطاع العقاري — استخدم مصطلحات مثل: وحدة سكنية، مخطط، صك، موقع استراتيجي، عائد استثماري",
"en": "Real estate — use terms like: residential unit, strategic location, ROI, property value",
},
"healthcare": {
"ar": "القطاع الصحي — استخدم مصطلحات مثل: عيادة، مجمع طبي، تأمين، موعد، رعاية صحية",
"en": "Healthcare — use terms like: clinic, medical complex, insurance, appointment, care quality",
},
"retail": {
"ar": "قطاع التجزئة — استخدم مصطلحات مثل: نقاط البيع، المخزون، تجربة العميل، موسم التخفيضات",
"en": "Retail — use terms like: POS, inventory, customer experience, sales season",
},
"education": {
"ar": "قطاع التعليم — استخدم مصطلحات مثل: تسجيل، رسوم دراسية، منهج، فصل دراسي",
"en": "Education — use terms like: enrollment, tuition, curriculum, academic term",
},
"automotive": {
"ar": "قطاع السيارات — استخدم مصطلحات مثل: معرض، وكالة، تمويل، صيانة، موديل",
"en": "Automotive — use terms like: showroom, dealership, financing, maintenance, model",
},
"hospitality": {
"ar": "قطاع الضيافة — استخدم مصطلحات مثل: حجز، جناح، باقة، تجربة ضيافة، موسم سياحي",
"en": "Hospitality — use terms like: booking, suite, package, guest experience, peak season",
},
}
# Saudi Arabia timezone: UTC+3
SAUDI_TZ_OFFSET = timedelta(hours=3)
# Optimal send windows (Saudi time, 24h)
SEND_WINDOWS = {
"whatsapp": {"start": 9, "end": 21, "peak": [10, 14, 19]},
"email": {"start": 8, "end": 17, "peak": [9, 11, 14]},
"sms": {"start": 9, "end": 20, "peak": [10, 13, 18]},
}
# Saudi work week: Sunday (6) through Thursday (3)
SAUDI_WORK_DAYS = {6, 0, 1, 2, 3} # Sunday=6, Mon=0, Tue=1, Wed=2, Thu=3
# ---------------------------------------------------------------------------
# Service
# ---------------------------------------------------------------------------
class MessageWriter:
"""Generates personalized, culturally-aware sales messages."""
def __init__(self):
self._llm = get_llm()
async def write_message(
self,
channel: str,
tone: str,
lead_data: dict,
context: Optional[dict] = None,
language: str = "ar",
) -> MessageDraft:
"""
Generate a sales message with A/B variants.
Args:
channel: "whatsapp", "email", or "sms"
tone: "formal", "friendly", "urgent", "follow_up"
lead_data: {"name", "company", "industry", "stage", "city", "last_contact"}
context: Optional {"deal_value", "product", "previous_topic", "objection"}
language: "ar" or "en"
Returns:
MessageDraft with two A/B variants and send-time recommendation.
"""
context = context or {}
channel = channel.lower()
tone = tone.lower()
language = language.lower() if language else "ar"
if channel not in CHANNEL_LIMITS:
channel = "whatsapp"
if tone not in TONE_INSTRUCTIONS:
tone = "friendly"
if language not in ("ar", "en"):
language = "ar"
# Build the prompt
system_prompt = self._build_system_prompt(channel, tone, lead_data, context, language)
user_prompt = self._build_user_prompt(channel, tone, lead_data, context, language)
# Generate both variants in one LLM call
try:
response = await self._llm.complete(
system_prompt=system_prompt,
user_message=user_prompt,
json_mode=True,
temperature=0.7,
max_tokens=2048,
)
parsed = response.parse_json()
if parsed:
variants = self._parse_variants(parsed, channel)
else:
raise ValueError("Failed to parse LLM response")
except Exception as e:
logger.warning(f"LLM message generation failed: {e}")
variants = self._fallback_variants(channel, tone, lead_data, language)
# Calculate best send time
best_time, best_day = self._calculate_best_send_time(channel)
# Track which personalization fields were used
personalization = [
k for k in ("name", "company", "industry", "city", "stage")
if lead_data.get(k)
]
return MessageDraft(
channel=channel,
language=language,
tone=tone,
variants=variants,
best_send_time=best_time,
best_send_day=best_day,
personalization_used=personalization,
metadata={
"max_length": CHANNEL_LIMITS[channel],
"lead_name": lead_data.get("name", ""),
"industry": lead_data.get("industry", ""),
},
)
# ── Prompt Construction ──────────────────────
def _build_system_prompt(
self, channel: str, tone: str, lead_data: dict, context: dict, language: str
) -> str:
lang_key = language if language in ("ar", "en") else "ar"
tone_instruction = TONE_INSTRUCTIONS.get(tone, TONE_INSTRUCTIONS["friendly"])[lang_key]
industry = (lead_data.get("industry") or "").lower().replace(" ", "_")
industry_note = ""
if industry in INDUSTRY_CONTEXT:
industry_note = INDUSTRY_CONTEXT[industry][lang_key]
char_limit = CHANNEL_LIMITS[channel]
if language == "ar":
prompt = (
"أنت كاتب رسائل مبيعات محترف متخصص في السوق السعودي.\n"
f"القناة: {channel} (حد أقصى {char_limit} حرف)\n"
f"النبرة: {tone_instruction}\n"
)
if industry_note:
prompt += f"القطاع: {industry_note}\n"
prompt += (
"\nقواعد مهمة:\n"
"- لا تستخدم لهجة مصرية أو شامية\n"
"- استخدم 'ريال' للعملة\n"
"- راعي ثقافة الأعمال السعودية\n"
"- اكتب رسالتين مختلفتين (A و B) لاختبار A/B\n"
"- الرسالة يجب أن تكون كاملة وجاهزة للإرسال\n"
)
if channel == "email":
prompt += "- أضف عنوان بريد مناسب لكل رسالة\n"
else:
prompt = (
"You are a professional sales message writer for the Saudi market.\n"
f"Channel: {channel} (max {char_limit} characters)\n"
f"Tone: {tone_instruction}\n"
)
if industry_note:
prompt += f"Industry: {industry_note}\n"
prompt += (
"\nRules:\n"
"- Write two different message variants (A and B) for A/B testing\n"
"- Messages must be complete and ready to send\n"
"- Be culturally aware of Saudi business norms\n"
)
if channel == "email":
prompt += "- Include an email subject line for each variant\n"
prompt += (
"\nأجب بصيغة JSON فقط:\n"
"{\n"
' "variant_a": {"content": "...", "subject": "..." },\n'
' "variant_b": {"content": "...", "subject": "..." }\n'
"}\n"
"subject مطلوب فقط للبريد الإلكتروني. للواتساب والرسائل اتركه فارغ."
)
return prompt
def _build_user_prompt(
self, channel: str, tone: str, lead_data: dict, context: dict, language: str
) -> str:
name = lead_data.get("name", "العميل" if language == "ar" else "Customer")
company = lead_data.get("company", "")
industry = lead_data.get("industry", "")
stage = lead_data.get("stage", "")
city = lead_data.get("city", "")
last_contact = lead_data.get("last_contact", "")
deal_value = context.get("deal_value", "")
product = context.get("product", "")
previous_topic = context.get("previous_topic", "")
objection = context.get("objection", "")
if language == "ar":
parts = [f"اكتب رسالة {channel} للعميل:"]
parts.append(f"- الاسم: {name}")
if company:
parts.append(f"- الشركة: {company}")
if industry:
parts.append(f"- القطاع: {industry}")
if stage:
parts.append(f"- مرحلة البيع: {stage}")
if city:
parts.append(f"- المدينة: {city}")
if last_contact:
parts.append(f"- آخر تواصل: {last_contact}")
if deal_value:
parts.append(f"- قيمة الصفقة: {deal_value} ريال")
if product:
parts.append(f"- المنتج: {product}")
if previous_topic:
parts.append(f"- الموضوع السابق: {previous_topic}")
if objection:
parts.append(f"- اعتراض العميل: {objection}")
else:
parts = [f"Write a {channel} message for:"]
parts.append(f"- Name: {name}")
if company:
parts.append(f"- Company: {company}")
if industry:
parts.append(f"- Industry: {industry}")
if stage:
parts.append(f"- Stage: {stage}")
if city:
parts.append(f"- City: {city}")
if last_contact:
parts.append(f"- Last contact: {last_contact}")
if deal_value:
parts.append(f"- Deal value: {deal_value} SAR")
if product:
parts.append(f"- Product: {product}")
if previous_topic:
parts.append(f"- Previous topic: {previous_topic}")
if objection:
parts.append(f"- Objection: {objection}")
return "\n".join(parts)
# ── Response Parsing ─────────────────────────
def _parse_variants(self, parsed: dict, channel: str) -> list[MessageVariant]:
"""Parse LLM JSON response into MessageVariant objects."""
variants = []
char_limit = CHANNEL_LIMITS[channel]
for key, label in [("variant_a", "A"), ("variant_b", "B")]:
variant_data = parsed.get(key, {})
if isinstance(variant_data, str):
content = variant_data
subject = None
else:
content = variant_data.get("content", "")
subject = variant_data.get("subject") if channel == "email" else None
# Truncate if over limit
if len(content) > char_limit:
content = content[:char_limit - 3] + "..."
read_time = max(1, len(content) // 200 * 10) # ~200 chars / 10 sec
variants.append(MessageVariant(
content=content,
subject=subject,
variant_label=label,
estimated_read_time_sec=read_time,
))
return variants
def _fallback_variants(
self, channel: str, tone: str, lead_data: dict, language: str
) -> list[MessageVariant]:
"""Generate basic fallback messages when LLM is unavailable."""
name = lead_data.get("name", "")
company = lead_data.get("company", "")
if language == "ar":
greeting = "السلام عليكم" if tone == "formal" else "هلا والله"
if tone == "follow_up":
text_a = f"{greeting} {name}، كيف حالك؟ حبيت أتابع معك بخصوص محادثتنا السابقة. هل عندك أي أسئلة؟"
text_b = f"{greeting} {name}، يعطيك العافية! تواصلنا قبل وحبيت أشوف إذا تحتاج أي شي ثاني."
elif tone == "urgent":
text_a = f"{greeting} {name}، عندنا عرض خاص ينتهي قريب. تبي أرسل لك التفاصيل؟"
text_b = f"{greeting} {name}، فرصة محدودة متاحة الحين. هل تحب نتكلم عنها؟"
else:
text_a = f"{greeting} {name}، يسعدنا تواصلك! كيف نقدر نساعدك اليوم؟"
text_b = f"{greeting} {name}، حياك الله! عندنا حلول ممتازة تناسب احتياجاتك."
else:
if tone == "follow_up":
text_a = f"Hi {name}, following up on our previous conversation. Do you have any questions?"
text_b = f"Hello {name}, just checking in. Would you like to continue our discussion?"
elif tone == "urgent":
text_a = f"Hi {name}, we have a limited-time offer. Shall I share the details?"
text_b = f"Hello {name}, a special opportunity is available now. Interested?"
else:
text_a = f"Hi {name}, great to connect! How can we help you today?"
text_b = f"Hello {name}, we have solutions tailored to your needs. Let's chat!"
return [
MessageVariant(content=text_a, variant_label="A", estimated_read_time_sec=10),
MessageVariant(content=text_b, variant_label="B", estimated_read_time_sec=10),
]
# ── Send Time Calculation ────────────────────
def _calculate_best_send_time(self, channel: str) -> tuple[str, str]:
"""Calculate best send time based on Saudi business hours."""
now_utc = datetime.now(timezone.utc)
now_saudi = now_utc + SAUDI_TZ_OFFSET
window = SEND_WINDOWS.get(channel, SEND_WINDOWS["whatsapp"])
peak_hours = window["peak"]
# Find the next available peak hour
current_hour = now_saudi.hour
current_weekday = now_saudi.weekday()
# Check today first
if current_weekday in SAUDI_WORK_DAYS:
for peak in peak_hours:
if peak > current_hour:
best_time = f"{peak:02d}:00 (توقيت السعودية)"
day_names_ar = {
6: "الأحد", 0: "الاثنين", 1: "الثلاثاء",
2: "الأربعاء", 3: "الخميس",
}
best_day = day_names_ar.get(current_weekday, "اليوم")
return best_time, best_day
# Next work day
days_ahead = 1
while days_ahead <= 7:
next_day = (current_weekday + days_ahead) % 7
if next_day in SAUDI_WORK_DAYS:
best_time = f"{peak_hours[0]:02d}:00 (توقيت السعودية)"
day_names_ar = {
6: "الأحد", 0: "الاثنين", 1: "الثلاثاء",
2: "الأربعاء", 3: "الخميس", 4: "الجمعة", 5: "السبت",
}
best_day = day_names_ar.get(next_day, "")
return best_time, best_day
days_ahead += 1
return f"{peak_hours[0]:02d}:00 (توقيت السعودية)", "الأحد"

View File

@ -0,0 +1,346 @@
"""
Dealix Autonomous AI Sales Agent for WhatsApp
وكيل مبيعات ذكي يعمل تلقائياً عبر الواتساب يؤهل العملاء ويحجز المواعيد
"""
import logging
from datetime import datetime, timezone
from enum import Enum
from typing import Optional
from pydantic import BaseModel, Field
from app.services.llm.provider import get_llm
logger = logging.getLogger("dealix.ai.sales_agent")
class ConversationState(str, Enum):
GREETING = "greeting"
QUALIFICATION = "qualification"
NEEDS_ANALYSIS = "needs_analysis"
SOLUTION_PITCH = "solution_pitch"
OBJECTION_HANDLING = "objection_handling"
CLOSE_OR_ESCALATE = "close_or_escalate"
STATE_TRANSITIONS: dict[str, list[str]] = {
"greeting": ["qualification"],
"qualification": ["needs_analysis", "close_or_escalate"],
"needs_analysis": ["solution_pitch", "close_or_escalate"],
"solution_pitch": ["objection_handling", "close_or_escalate"],
"objection_handling": ["solution_pitch", "close_or_escalate"],
"close_or_escalate": [],
}
INDUSTRY_QUALIFIERS: dict[str, list[str]] = {
"real_estate": [
"ما نوع العقار المطلوب (سكني/تجاري)؟",
"ما المنطقة أو الحي المفضل؟",
"ما الميزانية التقريبية؟",
"هل تبحث عن شراء أو إيجار؟",
],
"healthcare": [
"ما نوع الخدمة الطبية المطلوبة؟",
"هل لديك تأمين طبي؟",
"هل تفضل موعد صباحي أو مسائي؟",
],
"services": [
"ما طبيعة الخدمة المطلوبة؟",
"ما الميزانية التقريبية؟",
"ما الجدول الزمني المتوقع؟",
"هل سبق تجربة مزود خدمة آخر؟",
],
"contracting": [
"ما نوع المشروع (بناء/صيانة/تشطيبات)؟",
"ما المساحة التقريبية؟",
"ما الميزانية المخصصة؟",
"هل الموقع في الرياض أو منطقة أخرى؟",
],
"education": [
"ما البرنامج أو الدورة المطلوبة؟",
"هل تفضل حضوري أو عن بعد؟",
"ما مستوى الخبرة الحالي؟",
],
"retail": [
"ما المنتج أو الفئة المطلوبة؟",
"هل تبحث عن كميات تجارية أو شخصية؟",
"ما المنطقة لغرض التوصيل؟",
],
}
ESCALATION_TRIGGERS = [
"أبي أكلم مدير",
"أبي أتكلم مع شخص",
"أبي موظف",
"ما فهمت",
"مشكلة كبيرة",
"شكوى",
"غاضب",
"مستعجل جداً",
]
class AgentContext(BaseModel):
lead_id: str = ""
phone: str = ""
name: str = ""
industry: str = "services"
state: str = ConversationState.GREETING.value
history: list[dict] = Field(default_factory=list)
qualification_data: dict = Field(default_factory=dict)
questions_asked: int = 0
escalated: bool = False
meeting_proposed: bool = False
class AgentResponse(BaseModel):
reply: str
new_state: str
should_escalate: bool = False
meeting_suggested: bool = False
qualification_complete: bool = False
extracted_data: dict = Field(default_factory=dict)
class SalesAgent:
"""Autonomous WhatsApp sales agent with Arabic dialogue and state machine."""
def __init__(self):
self.llm = get_llm()
self._contexts: dict[str, AgentContext] = {}
async def handle_message(
self,
phone: str,
message: str,
lead_id: str = "",
name: str = "",
industry: str = "services",
) -> AgentResponse:
"""Main entry: process an inbound WhatsApp message and produce a reply."""
ctx = self._get_or_create_context(phone, lead_id, name, industry)
ctx.history.append({"role": "user", "content": message, "ts": _now_iso()})
if await self.should_escalate(message, ctx):
ctx.escalated = True
ctx.state = ConversationState.CLOSE_OR_ESCALATE.value
reply = (
f"حياك الله {ctx.name or ''}، أقدّر تواصلك. "
"بحوّلك الحين لأحد مستشارينا المتخصصين يخدمك بشكل أفضل. لحظات من فضلك."
)
ctx.history.append({"role": "assistant", "content": reply, "ts": _now_iso()})
return AgentResponse(
reply=reply,
new_state=ctx.state,
should_escalate=True,
)
reply, extracted = await self._generate_state_reply(ctx, message)
ctx.qualification_data.update(extracted)
ctx.history.append({"role": "assistant", "content": reply, "ts": _now_iso()})
next_state = await self._determine_next_state(ctx, message)
ctx.state = next_state
qualification_complete = ctx.questions_asked >= len(
INDUSTRY_QUALIFIERS.get(ctx.industry, INDUSTRY_QUALIFIERS["services"])
)
meeting_suggested = False
if qualification_complete and not ctx.meeting_proposed:
meeting_reply = await self.suggest_meeting(ctx)
reply += f"\n\n{meeting_reply}"
ctx.meeting_proposed = True
meeting_suggested = True
return AgentResponse(
reply=reply,
new_state=ctx.state,
should_escalate=False,
meeting_suggested=meeting_suggested,
qualification_complete=qualification_complete,
extracted_data=extracted,
)
async def qualify_lead(self, ctx: AgentContext) -> dict:
"""Return current qualification status and gathered data."""
qualifiers = INDUSTRY_QUALIFIERS.get(ctx.industry, INDUSTRY_QUALIFIERS["services"])
return {
"lead_id": ctx.lead_id,
"industry": ctx.industry,
"state": ctx.state,
"questions_total": len(qualifiers),
"questions_asked": ctx.questions_asked,
"qualification_data": ctx.qualification_data,
"is_qualified": ctx.questions_asked >= len(qualifiers),
}
async def suggest_meeting(self, ctx: AgentContext) -> str:
"""Generate a meeting suggestion message."""
prompt = (
"أنت مساعد مبيعات في السوق السعودي. اقترح موعد اجتماع للعميل "
"بأسلوب ودود ومهني بالسعودية. اذكر 2-3 أوقات متاحة هذا الأسبوع. "
"اجعل الرد مختصراً (3 أسطر كحد أقصى)."
)
resp = await self.llm.complete(
system_prompt=prompt,
user_message=f"اسم العميل: {ctx.name}\nالقطاع: {ctx.industry}",
temperature=0.6,
max_tokens=150,
)
return resp.content.strip()
async def should_escalate(self, message: str, ctx: AgentContext) -> bool:
"""Determine if message requires human handoff."""
msg_lower = message.strip()
for trigger in ESCALATION_TRIGGERS:
if trigger in msg_lower:
logger.info("Escalation triggered for %s: matched '%s'", ctx.phone, trigger)
return True
if len(ctx.history) > 10 and ctx.state == ConversationState.OBJECTION_HANDLING.value:
return True
return False
async def generate_follow_up(self, phone: str, days_dormant: int = 3) -> Optional[str]:
"""Generate a follow-up message for a dormant lead."""
ctx = self._contexts.get(phone)
if not ctx:
return None
prompt = (
"أنت مساعد مبيعات سعودي. اكتب رسالة متابعة ودية لعميل لم يرد منذ "
f"{days_dormant} أيام. اجعلها مختصرة (سطرين) ومهتمة بدون ضغط. "
"اسم العميل: " + (ctx.name or "العميل")
)
resp = await self.llm.complete(
system_prompt=prompt,
user_message=f"القطاع: {ctx.industry}. آخر حالة: {ctx.state}",
temperature=0.7,
max_tokens=100,
)
return resp.content.strip()
# ── Internal helpers ─────────────────────────────
def _get_or_create_context(
self, phone: str, lead_id: str, name: str, industry: str,
) -> AgentContext:
if phone not in self._contexts:
self._contexts[phone] = AgentContext(
lead_id=lead_id, phone=phone, name=name, industry=industry,
)
ctx = self._contexts[phone]
if name and not ctx.name:
ctx.name = name
return ctx
async def _generate_state_reply(
self, ctx: AgentContext, message: str,
) -> tuple[str, dict]:
"""Generate a reply appropriate to the current conversation state."""
qualifiers = INDUSTRY_QUALIFIERS.get(ctx.industry, INDUSTRY_QUALIFIERS["services"])
current_q = qualifiers[ctx.questions_asked] if ctx.questions_asked < len(qualifiers) else ""
state_instructions = {
ConversationState.GREETING.value: (
"رحّب بالعميل بأسلوب سعودي ودود. اسأل كيف تقدر تساعده. "
"لا تكن رسمياً أكثر من اللازم."
),
ConversationState.QUALIFICATION.value: (
f"اسأل السؤال التالي بأسلوب طبيعي: {current_q}\n"
"حاول استخلاص المعلومات من إجابة العميل."
),
ConversationState.NEEDS_ANALYSIS.value: (
"حلل احتياجات العميل وأكّد فهمك. لخّص ما فهمته واسأل إذا في شي ثاني."
),
ConversationState.SOLUTION_PITCH.value: (
"اعرض الحل المناسب بناءً على احتياجات العميل. "
"ركّز على الفوائد مع ذكر القيمة بشكل غير مباشر."
),
ConversationState.OBJECTION_HANDLING.value: (
"تعامل مع اعتراض العميل بذكاء. أعد التأطير وركّز على القيمة. "
"لا تكن دفاعياً."
),
}
instruction = state_instructions.get(
ctx.state,
"أكمل المحادثة بأسلوب مهني وودود.",
)
system = (
"أنت وكيل مبيعات ذكي لشركة سعودية. تتحدث بالعامية السعودية الراقية.\n"
"قواعدك:\n"
"1. ردود مختصرة (3-4 أسطر كحد أقصى)\n"
"2. لا تستخدم رموز تعبيرية مبالغ فيها\n"
"3. كن ودوداً ومحترفاً\n"
"4. استخلص أي معلومات ذات قيمة من رد العميل\n"
f"5. التعليمات الحالية: {instruction}\n"
"أجب بـ JSON: {\"reply\": \"...\", \"extracted\": {\"key\": \"value\"}}\n"
)
recent_history = ctx.history[-6:]
history_text = "\n".join(
f"{'العميل' if h['role'] == 'user' else 'الوكيل'}: {h['content']}"
for h in recent_history
)
user_msg = f"المحادثة السابقة:\n{history_text}\n\nرسالة العميل الجديدة: {message}"
resp = await self.llm.complete(
system_prompt=system,
user_message=user_msg,
temperature=0.6,
max_tokens=250,
json_mode=True,
)
parsed = resp.parse_json()
if parsed:
reply = parsed.get("reply", message)
extracted = parsed.get("extracted", {})
else:
reply = resp.content.strip()
extracted = {}
if ctx.state == ConversationState.QUALIFICATION.value:
ctx.questions_asked += 1
return reply, extracted
async def _determine_next_state(self, ctx: AgentContext, message: str) -> str:
"""Decide the next conversation state."""
allowed = STATE_TRANSITIONS.get(ctx.state, [])
if not allowed:
return ctx.state
qualifiers = INDUSTRY_QUALIFIERS.get(ctx.industry, INDUSTRY_QUALIFIERS["services"])
if ctx.state == ConversationState.GREETING.value:
return ConversationState.QUALIFICATION.value
if ctx.state == ConversationState.QUALIFICATION.value:
if ctx.questions_asked >= len(qualifiers):
return ConversationState.NEEDS_ANALYSIS.value
return ConversationState.QUALIFICATION.value
if ctx.state == ConversationState.NEEDS_ANALYSIS.value:
return ConversationState.SOLUTION_PITCH.value
if ctx.state == ConversationState.SOLUTION_PITCH.value:
negative_signals = ["غالي", "ما أبي", "لا", "مو مقتنع", "كثير", "فكر"]
if any(s in message for s in negative_signals):
return ConversationState.OBJECTION_HANDLING.value
return ConversationState.CLOSE_OR_ESCALATE.value
if ctx.state == ConversationState.OBJECTION_HANDLING.value:
return ConversationState.SOLUTION_PITCH.value
return ctx.state
def _now_iso() -> str:
return datetime.now(timezone.utc).isoformat()

View File

@ -1,8 +1,6 @@
"""PDPL consent engine -- tracks, validates, and audits consent per Saudi data protection law.
"""PDPL consent engine -- tracks, validates, and audits consent.
Penalty for violations: up to 5,000,000 SAR per incident.
"""
import logging
from datetime import datetime, timedelta, timezone
from typing import Optional
@ -14,18 +12,13 @@ from sqlalchemy.ext.asyncio import AsyncSession
from app.models.consent import (
PDPLConsent, PDPLConsentAudit, DataRequest,
ConsentStatusEnum, ConsentPurpose, ConsentChannel,
DataRequestType, DataRequestStatus,
ConsentStatusEnum, DataRequestStatus,
)
logger = logging.getLogger(__name__)
DEFAULT_EXPIRY_MONTHS = 12
CROSS_BORDER_ALLOWED_COUNTRIES = {"SA", "AE", "BH", "KW", "OM", "QA"}
CROSS_BORDER_ALLOWED = {"SA", "AE", "BH", "KW", "OM", "QA"}
# ---------------------------------------------------------------------------
# Pydantic schemas
# ---------------------------------------------------------------------------
class ConsentGrantInput(Schema):
contact_id: UUID
@ -72,29 +65,18 @@ class AuditEntry(Schema):
purpose: str
details: dict
created_at: datetime
model_config = {"from_attributes": True}
# ---------------------------------------------------------------------------
# ConsentManager
# ---------------------------------------------------------------------------
class ConsentManager:
"""Core PDPL consent engine for Dealix CRM."""
def __init__(self, db: AsyncSession):
self.db = db
# -- grant ---------------------------------------------------------------
async def grant_consent(self, data: ConsentGrantInput) -> PDPLConsent:
"""Record a new consent grant. Existing active consent for same
contact+purpose+channel is revoked first (re-consent flow)."""
"""Grant consent. Revokes existing active consent for same triplet (re-consent)."""
now = datetime.now(timezone.utc)
# Revoke any existing active consent for same triplet (re-consent)
existing = await self._find_active(data.contact_id, data.purpose, data.channel)
if existing:
existing.status = ConsentStatusEnum.REVOKED.value
@ -104,198 +86,127 @@ class ConsentManager:
action="revoked_for_renewal", actor_id=data.actor_id,
channel=data.channel, purpose=data.purpose,
details={"reason": "re-consent on purpose change"},
ip_address=data.ip_address,
tenant_id=data.tenant_id,
ip_address=data.ip_address, tenant_id=data.tenant_id,
)
consent = PDPLConsent(
contact_id=data.contact_id,
tenant_id=data.tenant_id,
purpose=data.purpose,
channel=data.channel,
status=ConsentStatusEnum.GRANTED.value,
granted_at=now,
contact_id=data.contact_id, tenant_id=data.tenant_id,
purpose=data.purpose, channel=data.channel,
status=ConsentStatusEnum.GRANTED.value, granted_at=now,
expires_at=now + timedelta(days=30 * data.expiry_months),
ip_address=data.ip_address,
consent_text=data.consent_text,
ip_address=data.ip_address, consent_text=data.consent_text,
granted_by=data.actor_id,
)
self.db.add(consent)
await self.db.flush()
await self.db.refresh(consent)
await self._audit(
consent_id=consent.id, contact_id=data.contact_id,
action="granted", actor_id=data.actor_id,
channel=data.channel, purpose=data.purpose,
details={"expiry_months": data.expiry_months, "consent_text": data.consent_text or ""},
ip_address=data.ip_address,
tenant_id=data.tenant_id,
details={"expiry_months": data.expiry_months},
ip_address=data.ip_address, tenant_id=data.tenant_id,
)
logger.info("PDPL consent granted: contact=%s purpose=%s channel=%s", data.contact_id, data.purpose, data.channel)
logger.info("PDPL consent granted: contact=%s purpose=%s", data.contact_id, data.purpose)
return consent
# -- revoke --------------------------------------------------------------
async def revoke_consent(self, data: ConsentRevokeInput) -> PDPLConsent:
"""Revoke an existing consent immediately."""
result = await self.db.execute(
select(PDPLConsent).where(PDPLConsent.id == data.consent_id)
)
result = await self.db.execute(select(PDPLConsent).where(PDPLConsent.id == data.consent_id))
consent = result.scalar_one_or_none()
if not consent:
raise ValueError("سجل الموافقة غير موجود") # Consent record not found
raise ValueError("سجل الموافقة غير موجود")
now = datetime.now(timezone.utc)
consent.status = ConsentStatusEnum.REVOKED.value
consent.revoked_at = now
await self._audit(
consent_id=consent.id, contact_id=consent.contact_id,
action="revoked", actor_id=data.actor_id,
channel=consent.channel, purpose=consent.purpose,
details={"reason": data.reason or "user_request"},
ip_address=data.ip_address,
tenant_id=consent.tenant_id,
ip_address=data.ip_address, tenant_id=consent.tenant_id,
)
logger.info("PDPL consent revoked: id=%s contact=%s", consent.id, consent.contact_id)
logger.info("PDPL consent revoked: id=%s", consent.id)
return consent
# -- check ---------------------------------------------------------------
async def check_consent(
self,
contact_id: UUID,
purpose: str,
channel: str,
) -> ConsentCheckResult:
"""Validate consent before any outbound message. Must be called
before every send -- violations carry up to 5M SAR penalty."""
async def check_consent(self, contact_id: UUID, purpose: str, channel: str) -> ConsentCheckResult:
"""Validate consent before outbound message. 5M SAR penalty per violation."""
consent = await self._find_active(contact_id, purpose, channel)
if not consent:
return ConsentCheckResult(
allowed=False,
message=f"No active consent for {purpose}/{channel}",
allowed=False, message=f"No active consent for {purpose}/{channel}",
message_ar="لا توجد موافقة فعالة لهذا الغرض والقناة",
)
now = datetime.now(timezone.utc)
if consent.expires_at and consent.expires_at <= now:
consent.status = ConsentStatusEnum.EXPIRED.value
await self.db.flush()
return ConsentCheckResult(
allowed=False,
consent_id=consent.id,
status=ConsentStatusEnum.EXPIRED.value,
expires_at=consent.expires_at,
message="Consent expired -- re-consent required",
allowed=False, consent_id=consent.id, status=ConsentStatusEnum.EXPIRED.value,
expires_at=consent.expires_at, message="Consent expired",
message_ar="انتهت صلاحية الموافقة -- يلزم تجديد الموافقة",
)
return ConsentCheckResult(
allowed=True,
consent_id=consent.id,
status=consent.status,
expires_at=consent.expires_at,
message="Consent valid",
allowed=True, consent_id=consent.id, status=consent.status,
expires_at=consent.expires_at, message="Consent valid",
message_ar="الموافقة صالحة",
)
# -- data request --------------------------------------------------------
async def process_data_request(self, data: DataRequestInput) -> DataRequest:
"""Submit a PDPL data subject rights request."""
request = DataRequest(
contact_id=data.contact_id,
tenant_id=data.tenant_id,
request_type=data.request_type,
status=DataRequestStatus.PENDING.value,
requested_at=datetime.now(timezone.utc),
notes=data.notes,
contact_id=data.contact_id, tenant_id=data.tenant_id,
request_type=data.request_type, status=DataRequestStatus.PENDING.value,
requested_at=datetime.now(timezone.utc), notes=data.notes,
handled_by=data.actor_id,
)
self.db.add(request)
await self.db.flush()
await self.db.refresh(request)
logger.info("PDPL data request created: type=%s contact=%s", data.request_type, data.contact_id)
logger.info("PDPL data request: type=%s contact=%s", data.request_type, data.contact_id)
return request
# -- audit ---------------------------------------------------------------
async def get_consent_audit(
self,
tenant_id: UUID,
contact_id: Optional[UUID] = None,
limit: int = 100,
offset: int = 0,
self, tenant_id: UUID, contact_id: Optional[UUID] = None,
limit: int = 100, offset: int = 0,
) -> list[AuditEntry]:
"""Return consent audit trail filtered by tenant and optionally contact."""
"""Return consent audit trail."""
query = (
select(PDPLConsentAudit)
.where(PDPLConsentAudit.tenant_id == tenant_id)
select(PDPLConsentAudit).where(PDPLConsentAudit.tenant_id == tenant_id)
.order_by(PDPLConsentAudit.created_at.desc())
)
if contact_id:
query = query.where(PDPLConsentAudit.contact_id == contact_id)
query = query.offset(offset).limit(limit)
result = await self.db.execute(query)
result = await self.db.execute(query.offset(offset).limit(limit))
return [AuditEntry.model_validate(row) for row in result.scalars().all()]
# -- cross-border --------------------------------------------------------
@staticmethod
def check_cross_border_transfer(destination_country: str) -> ConsentCheckResult:
"""Check if data transfer to destination country is PDPL-compliant.
SDAIA requires adequate protection level or explicit consent."""
"""Check if transfer to destination is PDPL-compliant."""
code = destination_country.upper().strip()
if code in CROSS_BORDER_ALLOWED_COUNTRIES:
if code in CROSS_BORDER_ALLOWED:
return ConsentCheckResult(
allowed=True,
message=f"Transfer to {code} permitted under GCC adequacy",
allowed=True, message=f"Transfer to {code} permitted under GCC adequacy",
message_ar=f"النقل إلى {code} مسموح بموجب كفاية دول الخليج",
)
return ConsentCheckResult(
allowed=False,
message=f"Transfer to {code} requires explicit consent and SDAIA approval",
allowed=False, message=f"Transfer to {code} requires explicit consent and SDAIA approval",
message_ar=f"النقل إلى {code} يتطلب موافقة صريحة وموافقة الهيئة",
)
# -- private helpers -----------------------------------------------------
async def _find_active(
self, contact_id: UUID, purpose: str, channel: str
) -> Optional[PDPLConsent]:
async def _find_active(self, contact_id: UUID, purpose: str, channel: str) -> Optional[PDPLConsent]:
result = await self.db.execute(
select(PDPLConsent).where(
and_(
PDPLConsent.contact_id == contact_id,
PDPLConsent.purpose == purpose,
PDPLConsent.channel == channel,
PDPLConsent.status == ConsentStatusEnum.GRANTED.value,
)
).order_by(PDPLConsent.granted_at.desc()).limit(1)
select(PDPLConsent).where(and_(
PDPLConsent.contact_id == contact_id, PDPLConsent.purpose == purpose,
PDPLConsent.channel == channel, PDPLConsent.status == ConsentStatusEnum.GRANTED.value,
)).order_by(PDPLConsent.granted_at.desc()).limit(1)
)
return result.scalar_one_or_none()
async def _audit(
self, *, consent_id, contact_id, action, actor_id,
channel, purpose, details, ip_address, tenant_id,
) -> None:
entry = PDPLConsentAudit(
consent_id=consent_id,
contact_id=contact_id,
tenant_id=tenant_id,
action=action,
actor_id=actor_id,
channel=channel,
purpose=purpose,
details=details or {},
ip_address=ip_address,
)
self.db.add(entry)
async def _audit(self, *, consent_id, contact_id, action, actor_id,
channel, purpose, details, ip_address, tenant_id) -> None:
self.db.add(PDPLConsentAudit(
consent_id=consent_id, contact_id=contact_id, tenant_id=tenant_id,
action=action, actor_id=actor_id, channel=channel, purpose=purpose,
details=details or {}, ip_address=ip_address,
))
await self.db.flush()

View File

@ -1,34 +1,23 @@
"""PDPL data subject rights handler.
Implements: right to access, correction, deletion, restriction of processing.
Generates compliance reports for SDAIA audits.
Right to access, correction, deletion, restriction + SDAIA compliance reports.
"""
import logging
from datetime import datetime, timedelta, timezone
from typing import Any, Optional
from uuid import UUID
from pydantic import BaseModel as Schema
from sqlalchemy import select, func, and_
from sqlalchemy import select, func
from sqlalchemy.ext.asyncio import AsyncSession
from app.models.consent import (
PDPLConsent, PDPLConsentAudit, DataRequest,
DataRequestStatus, DataRequestType,
)
from app.models.consent import PDPLConsent, DataRequest, DataRequestStatus, DataRequestType
from app.models.lead import Lead
from app.models.message import Message
logger = logging.getLogger(__name__)
HARD_DELETE_DELAY_DAYS = 30
# ---------------------------------------------------------------------------
# Pydantic schemas
# ---------------------------------------------------------------------------
class DataExport(Schema):
contact_id: UUID
personal_data: dict
@ -40,7 +29,7 @@ class DataExport(Schema):
class CorrectionInput(Schema):
contact_id: UUID
tenant_id: UUID
corrections: dict[str, Any] # field_name -> new_value
corrections: dict[str, Any]
actor_id: Optional[UUID] = None
reason: Optional[str] = None
@ -82,29 +71,17 @@ class ComplianceReport(Schema):
violations_detected: int
# ---------------------------------------------------------------------------
# DataRightsHandler
# ---------------------------------------------------------------------------
class DataRightsHandler:
"""Handles PDPL data subject rights for Dealix contacts."""
def __init__(self, db: AsyncSession):
self.db = db
# -- export (right to access) -------------------------------------------
async def export_data(self, contact_id: UUID, tenant_id: UUID) -> DataExport:
"""Export all personal data held for a contact as structured JSON."""
"""Export all personal data held for a contact (right to access)."""
lead = await self._get_lead(contact_id, tenant_id)
# Consent records
consents_q = await self.db.execute(
select(PDPLConsent).where(
PDPLConsent.contact_id == contact_id,
PDPLConsent.tenant_id == tenant_id,
)
select(PDPLConsent).where(PDPLConsent.contact_id == contact_id, PDPLConsent.tenant_id == tenant_id)
)
consents = [
{"purpose": c.purpose, "channel": c.channel, "status": c.status,
@ -112,241 +89,138 @@ class DataRightsHandler:
"expires_at": c.expires_at.isoformat() if c.expires_at else None}
for c in consents_q.scalars().all()
]
# Messages
msgs_q = await self.db.execute(
select(Message).where(Message.lead_id == contact_id).limit(500)
)
msgs_q = await self.db.execute(select(Message).where(Message.lead_id == contact_id).limit(500))
messages = [
{"channel": m.channel, "direction": m.direction,
"content": m.content, "sent_at": m.sent_at.isoformat() if m.sent_at else None}
{"channel": m.channel, "direction": m.direction, "content": m.content,
"sent_at": m.sent_at.isoformat() if m.sent_at else None}
for m in msgs_q.scalars().all()
]
personal = {
"name": lead.name,
"phone": lead.phone,
"email": lead.email,
"source": lead.source,
"status": lead.status,
"score": lead.score,
"notes": lead.notes,
}
logger.info("PDPL data export completed: contact=%s", contact_id)
logger.info("PDPL data export: contact=%s", contact_id)
return DataExport(
contact_id=contact_id,
personal_data=personal,
consents=consents,
messages=messages,
exported_at=datetime.now(timezone.utc),
personal_data={"name": lead.name, "phone": lead.phone, "email": lead.email,
"source": lead.source, "status": lead.status, "score": lead.score, "notes": lead.notes},
consents=consents, messages=messages, exported_at=datetime.now(timezone.utc),
)
# -- correction ----------------------------------------------------------
async def correct_data(self, data: CorrectionInput) -> CorrectionResult:
"""Update personal data fields with full audit trail."""
"""Update personal data fields with audit trail."""
lead = await self._get_lead(data.contact_id, data.tenant_id)
allowed_fields = {"name", "phone", "email", "notes"}
previous: dict[str, Any] = {}
updated_fields: list[str] = []
updated: list[str] = []
for field, new_val in data.corrections.items():
if field not in allowed_fields:
logger.warning("PDPL correction rejected for field=%s", field)
continue
previous[field] = getattr(lead, field, None)
setattr(lead, field, new_val)
updated_fields.append(field)
updated.append(field)
await self.db.flush()
# Audit via data request record
req = DataRequest(
contact_id=data.contact_id,
tenant_id=data.tenant_id,
request_type=DataRequestType.CORRECTION.value,
status=DataRequestStatus.COMPLETED.value,
requested_at=datetime.now(timezone.utc),
completed_at=datetime.now(timezone.utc),
response_data={"corrections": data.corrections, "previous": previous, "reason": data.reason},
handled_by=data.actor_id,
)
self.db.add(req)
self.db.add(DataRequest(
contact_id=data.contact_id, tenant_id=data.tenant_id,
request_type=DataRequestType.CORRECTION.value, status=DataRequestStatus.COMPLETED.value,
requested_at=datetime.now(timezone.utc), completed_at=datetime.now(timezone.utc),
response_data={"corrections": data.corrections, "previous": previous}, handled_by=data.actor_id,
))
await self.db.flush()
logger.info("PDPL correction: contact=%s fields=%s", data.contact_id, updated)
return CorrectionResult(contact_id=data.contact_id, fields_updated=updated,
previous_values=previous, updated_at=datetime.now(timezone.utc))
logger.info("PDPL data correction: contact=%s fields=%s", data.contact_id, updated_fields)
return CorrectionResult(
contact_id=data.contact_id,
fields_updated=updated_fields,
previous_values=previous,
updated_at=datetime.now(timezone.utc),
)
# -- deletion (right to erasure) ----------------------------------------
async def delete_data(self, contact_id: UUID, tenant_id: UUID, actor_id: Optional[UUID] = None) -> DeletionResult:
"""Soft-delete contact now; schedule hard-delete after 30 days."""
async def delete_data(self, contact_id: UUID, tenant_id: UUID,
actor_id: Optional[UUID] = None) -> DeletionResult:
"""Soft-delete contact; schedule hard-delete after 30 days."""
lead = await self._get_lead(contact_id, tenant_id)
now = datetime.now(timezone.utc)
hard_delete_at = now + timedelta(days=HARD_DELETE_DELAY_DAYS)
# Soft-delete: mark status and clear PII
hard_at = now + timedelta(days=HARD_DELETE_DELAY_DAYS)
lead.status = "deleted"
lead.notes = f"[PDPL deletion requested {now.isoformat()}] " + (lead.notes or "")
lead.extra_metadata = {
**(lead.extra_metadata or {}),
"_pdpl_soft_deleted": True,
"_pdpl_hard_delete_at": hard_delete_at.isoformat(),
}
# Revoke all active consents
consents_q = await self.db.execute(
select(PDPLConsent).where(
PDPLConsent.contact_id == contact_id,
PDPLConsent.tenant_id == tenant_id,
PDPLConsent.status == "granted",
)
lead.notes = f"[PDPL deletion {now.isoformat()}] " + (lead.notes or "")
lead.extra_metadata = {**(lead.extra_metadata or {}),
"_pdpl_soft_deleted": True, "_pdpl_hard_delete_at": hard_at.isoformat()}
# Revoke active consents
cq = await self.db.execute(
select(PDPLConsent).where(PDPLConsent.contact_id == contact_id,
PDPLConsent.tenant_id == tenant_id, PDPLConsent.status == "granted")
)
for consent in consents_q.scalars().all():
consent.status = "revoked"
consent.revoked_at = now
# Record the request
req = DataRequest(
contact_id=contact_id,
tenant_id=tenant_id,
request_type=DataRequestType.DELETION.value,
status=DataRequestStatus.PROCESSING.value,
requested_at=now,
response_data={"hard_delete_at": hard_delete_at.isoformat()},
handled_by=actor_id,
)
self.db.add(req)
for c in cq.scalars().all():
c.status = "revoked"
c.revoked_at = now
self.db.add(DataRequest(
contact_id=contact_id, tenant_id=tenant_id,
request_type=DataRequestType.DELETION.value, status=DataRequestStatus.PROCESSING.value,
requested_at=now, response_data={"hard_delete_at": hard_at.isoformat()}, handled_by=actor_id,
))
await self.db.flush()
logger.info("PDPL deletion scheduled: contact=%s hard_delete=%s", contact_id, hard_delete_at)
logger.info("PDPL deletion: contact=%s hard_delete=%s", contact_id, hard_at)
return DeletionResult(
contact_id=contact_id,
status="soft_deleted",
soft_deleted_at=now,
hard_delete_scheduled=hard_delete_at,
message=f"Contact soft-deleted. Hard delete scheduled for {hard_delete_at.date()}",
message_ar=f"تم حذف جهة الاتصال مبدئيًا. الحذف النهائي مجدول بتاريخ {hard_delete_at.date()}",
contact_id=contact_id, status="soft_deleted", soft_deleted_at=now,
hard_delete_scheduled=hard_at,
message=f"Hard delete scheduled for {hard_at.date()}",
message_ar=f"الحذف النهائي مجدول بتاريخ {hard_at.date()}",
)
# -- restriction ---------------------------------------------------------
async def restrict_processing(
self, contact_id: UUID, tenant_id: UUID, actor_id: Optional[UUID] = None
) -> RestrictionResult:
"""Flag a contact as restricted -- no outbound processing allowed."""
async def restrict_processing(self, contact_id: UUID, tenant_id: UUID,
actor_id: Optional[UUID] = None) -> RestrictionResult:
"""Flag contact as restricted -- no outbound processing allowed."""
lead = await self._get_lead(contact_id, tenant_id)
lead.extra_metadata = {
**(lead.extra_metadata or {}),
"_pdpl_restricted": True,
"_pdpl_restricted_at": datetime.now(timezone.utc).isoformat(),
}
req = DataRequest(
contact_id=contact_id,
tenant_id=tenant_id,
request_type=DataRequestType.RESTRICTION.value,
status=DataRequestStatus.COMPLETED.value,
requested_at=datetime.now(timezone.utc),
completed_at=datetime.now(timezone.utc),
response_data={"restricted": True},
handled_by=actor_id,
)
self.db.add(req)
lead.extra_metadata = {**(lead.extra_metadata or {}),
"_pdpl_restricted": True,
"_pdpl_restricted_at": datetime.now(timezone.utc).isoformat()}
self.db.add(DataRequest(
contact_id=contact_id, tenant_id=tenant_id,
request_type=DataRequestType.RESTRICTION.value, status=DataRequestStatus.COMPLETED.value,
requested_at=datetime.now(timezone.utc), completed_at=datetime.now(timezone.utc),
response_data={"restricted": True}, handled_by=actor_id,
))
await self.db.flush()
logger.info("PDPL processing restricted: contact=%s", contact_id)
logger.info("PDPL restriction: contact=%s", contact_id)
return RestrictionResult(
contact_id=contact_id,
restricted=True,
message="Contact processing restricted per PDPL request",
message_ar="تم تقييد معالجة بيانات جهة الاتصال وفقًا لطلب نظام حماية البيانات",
contact_id=contact_id, restricted=True,
message="Contact processing restricted per PDPL",
message_ar="تم تقييد معالجة بيانات جهة الاتصال وفقًا لنظام حماية البيانات",
)
# -- compliance report ---------------------------------------------------
async def generate_compliance_report(self, tenant_id: UUID) -> ComplianceReport:
"""Generate SDAIA-ready compliance report for a tenant."""
"""Generate SDAIA-ready compliance report."""
now = datetime.now(timezone.utc)
# Consent counts
total = (await self.db.execute(
select(func.count()).where(PDPLConsent.tenant_id == tenant_id)
)).scalar() or 0
select(func.count()).where(PDPLConsent.tenant_id == tenant_id))).scalar() or 0
active = (await self.db.execute(
select(func.count()).where(PDPLConsent.tenant_id == tenant_id, PDPLConsent.status == "granted")
)).scalar() or 0
select(func.count()).where(PDPLConsent.tenant_id == tenant_id, PDPLConsent.status == "granted"))).scalar() or 0
revoked = (await self.db.execute(
select(func.count()).where(PDPLConsent.tenant_id == tenant_id, PDPLConsent.status == "revoked")
)).scalar() or 0
select(func.count()).where(PDPLConsent.tenant_id == tenant_id, PDPLConsent.status == "revoked"))).scalar() or 0
expired = (await self.db.execute(
select(func.count()).where(PDPLConsent.tenant_id == tenant_id, PDPLConsent.status == "expired")
)).scalar() or 0
# Data requests
select(func.count()).where(PDPLConsent.tenant_id == tenant_id, PDPLConsent.status == "expired"))).scalar() or 0
pending = (await self.db.execute(
select(func.count()).where(DataRequest.tenant_id == tenant_id, DataRequest.status == "pending")
)).scalar() or 0
select(func.count()).where(DataRequest.tenant_id == tenant_id, DataRequest.status == "pending"))).scalar() or 0
completed = (await self.db.execute(
select(func.count()).where(DataRequest.tenant_id == tenant_id, DataRequest.status == "completed")
)).scalar() or 0
# Breakdown by type
select(func.count()).where(DataRequest.tenant_id == tenant_id, DataRequest.status == "completed"))).scalar() or 0
type_rows = (await self.db.execute(
select(DataRequest.request_type, func.count())
.where(DataRequest.tenant_id == tenant_id)
.group_by(DataRequest.request_type)
)).all()
by_type = {row[0]: row[1] for row in type_rows}
# Avg resolution time
select(DataRequest.request_type, func.count()).where(DataRequest.tenant_id == tenant_id)
.group_by(DataRequest.request_type))).all()
by_type = {r[0]: r[1] for r in type_rows}
# Average resolution time
avg_hours: Optional[float] = None
completed_reqs = (await self.db.execute(
select(DataRequest).where(
DataRequest.tenant_id == tenant_id,
DataRequest.status == "completed",
DataRequest.completed_at.isnot(None),
).limit(500)
)).scalars().all()
if completed_reqs:
deltas = [
(r.completed_at - r.requested_at).total_seconds() / 3600
for r in completed_reqs if r.completed_at and r.requested_at
]
done = (await self.db.execute(
select(DataRequest).where(DataRequest.tenant_id == tenant_id, DataRequest.status == "completed",
DataRequest.completed_at.isnot(None)).limit(500))).scalars().all()
if done:
deltas = [(r.completed_at - r.requested_at).total_seconds() / 3600
for r in done if r.completed_at and r.requested_at]
avg_hours = round(sum(deltas) / len(deltas), 2) if deltas else None
logger.info("PDPL compliance report generated: tenant=%s", tenant_id)
logger.info("PDPL report generated: tenant=%s", tenant_id)
return ComplianceReport(
tenant_id=tenant_id,
generated_at=now,
total_consents=total,
active_consents=active,
revoked_consents=revoked,
expired_consents=expired,
pending_requests=pending,
completed_requests=completed,
requests_by_type=by_type,
avg_resolution_hours=avg_hours,
violations_detected=0,
tenant_id=tenant_id, generated_at=now, total_consents=total,
active_consents=active, revoked_consents=revoked, expired_consents=expired,
pending_requests=pending, completed_requests=completed,
requests_by_type=by_type, avg_resolution_hours=avg_hours, violations_detected=0,
)
# -- private helpers -----------------------------------------------------
async def _get_lead(self, contact_id: UUID, tenant_id: UUID) -> Lead:
result = await self.db.execute(
select(Lead).where(Lead.id == contact_id, Lead.tenant_id == tenant_id)
)
select(Lead).where(Lead.id == contact_id, Lead.tenant_id == tenant_id))
lead = result.scalar_one_or_none()
if not lead:
raise ValueError("جهة الاتصال غير موجودة") # Contact not found
raise ValueError("جهة الاتصال غير موجودة")
return lead

View File

@ -0,0 +1,281 @@
"""
Dealix Saudi Territory Manager
إدارة المناطق وتوزيع العملاء على مندوبي المبيعات تلقائياً
"""
import logging
from datetime import datetime, timezone
from typing import Optional
from pydantic import BaseModel, Field
from sqlalchemy import select, func, and_
from sqlalchemy.ext.asyncio import AsyncSession
from app.models.lead import Lead
from app.models.user import User
from app.models.deal import Deal
logger = logging.getLogger("dealix.territory")
SAUDI_REGIONS: dict[str, dict] = {
"riyadh": {
"name_ar": "الرياض",
"name_en": "Riyadh",
"cities_ar": ["الرياض", "الخرج", "الدرعية", "المجمعة"],
},
"jeddah": {
"name_ar": "جدة",
"name_en": "Jeddah",
"cities_ar": ["جدة", "رابغ", "الليث"],
},
"eastern": {
"name_ar": "المنطقة الشرقية",
"name_en": "Eastern Province",
"cities_ar": ["الدمام", "الخبر", "الظهران", "الجبيل", "الأحساء", "القطيف"],
},
"makkah": {
"name_ar": "مكة المكرمة",
"name_en": "Makkah",
"cities_ar": ["مكة المكرمة", "الطائف"],
},
"madinah": {
"name_ar": "المدينة المنورة",
"name_en": "Madinah",
"cities_ar": ["المدينة المنورة", "ينبع"],
},
"asir": {
"name_ar": "عسير",
"name_en": "Asir",
"cities_ar": ["أبها", "خميس مشيط", "النماص"],
},
"qassim": {
"name_ar": "القصيم",
"name_en": "Qassim",
"cities_ar": ["بريدة", "عنيزة", "الرس"],
},
"tabuk": {
"name_ar": "تبوك",
"name_en": "Tabuk",
"cities_ar": ["تبوك", "ضبا", "الوجه"],
},
"hail": {
"name_ar": "حائل",
"name_en": "Hail",
"cities_ar": ["حائل", "بقعاء"],
},
"jazan": {
"name_ar": "جازان",
"name_en": "Jazan",
"cities_ar": ["جازان", "صبيا", "أبو عريش"],
},
"najran": {
"name_ar": "نجران",
"name_en": "Najran",
"cities_ar": ["نجران", "شرورة"],
},
"baha": {
"name_ar": "الباحة",
"name_en": "Al Baha",
"cities_ar": ["الباحة", "بلجرشي"],
},
"jouf": {
"name_ar": "الجوف",
"name_en": "Al Jouf",
"cities_ar": ["سكاكا", "دومة الجندل"],
},
}
class TerritoryAssignment(BaseModel):
territory_key: str
rep_ids: list[str] = Field(default_factory=list)
round_robin_index: int = 0
class TerritoryStats(BaseModel):
territory_key: str
name_ar: str
name_en: str
total_leads: int = 0
total_deals: int = 0
total_value: float = 0.0
win_rate: float = 0.0
reps_count: int = 0
class TerritoryManager:
"""Territory-based lead routing and performance analytics for Saudi regions."""
def __init__(self, db: AsyncSession):
self.db = db
self._assignments: dict[str, TerritoryAssignment] = {}
async def assign_territory(
self, territory_key: str, rep_ids: list[str],
) -> dict:
"""Assign sales reps to a territory."""
if territory_key not in SAUDI_REGIONS:
raise ValueError(f"منطقة غير معروفة: {territory_key}")
self._assignments[territory_key] = TerritoryAssignment(
territory_key=territory_key,
rep_ids=rep_ids,
round_robin_index=0,
)
region = SAUDI_REGIONS[territory_key]
logger.info(
"Territory '%s' assigned to %d reps", territory_key, len(rep_ids),
)
return {
"territory": territory_key,
"name_ar": region["name_ar"],
"name_en": region["name_en"],
"reps_assigned": len(rep_ids),
"rep_ids": rep_ids,
}
async def auto_route_lead(
self,
tenant_id: str,
lead_id: str,
region_key: Optional[str] = None,
city_hint: Optional[str] = None,
) -> dict:
"""Auto-assign a lead to the next rep in the matching territory via round-robin."""
territory_key = region_key
if not territory_key and city_hint:
territory_key = self._detect_territory(city_hint)
if not territory_key:
territory_key = "riyadh"
assignment = self._assignments.get(territory_key)
if not assignment or not assignment.rep_ids:
logger.warning(
"No reps assigned to territory '%s', falling back to riyadh",
territory_key,
)
assignment = self._assignments.get("riyadh")
territory_key = "riyadh"
if not assignment or not assignment.rep_ids:
return {
"lead_id": lead_id,
"assigned_to": None,
"territory": territory_key,
"error_ar": "لا يوجد مندوبين معينين لهذه المنطقة",
}
rep_id = assignment.rep_ids[assignment.round_robin_index % len(assignment.rep_ids)]
assignment.round_robin_index += 1
import uuid
result = await self.db.execute(
select(Lead).where(
Lead.id == uuid.UUID(lead_id),
Lead.tenant_id == uuid.UUID(tenant_id),
)
)
lead = result.scalar_one_or_none()
if lead:
lead.assigned_to = uuid.UUID(rep_id)
metadata = dict(lead.extra_metadata or {})
metadata["territory"] = territory_key
metadata["auto_routed_at"] = datetime.now(timezone.utc).isoformat()
lead.extra_metadata = metadata
await self.db.flush()
region = SAUDI_REGIONS.get(territory_key, {})
logger.info("Lead %s routed to rep %s in %s", lead_id, rep_id, territory_key)
return {
"lead_id": lead_id,
"assigned_to": rep_id,
"territory": territory_key,
"territory_name_ar": region.get("name_ar", ""),
}
async def get_territory_stats(
self, tenant_id: str, territory_key: Optional[str] = None,
) -> list[TerritoryStats]:
"""Get performance analytics per territory."""
import uuid
keys = [territory_key] if territory_key else list(SAUDI_REGIONS.keys())
stats_list: list[TerritoryStats] = []
for key in keys:
region = SAUDI_REGIONS.get(key)
if not region:
continue
assignment = self._assignments.get(key)
rep_ids = assignment.rep_ids if assignment else []
if not rep_ids:
stats_list.append(TerritoryStats(
territory_key=key,
name_ar=region["name_ar"],
name_en=region["name_en"],
))
continue
rep_uuids = [uuid.UUID(r) for r in rep_ids]
tid = uuid.UUID(tenant_id)
lead_count_q = select(func.count()).where(
Lead.tenant_id == tid,
Lead.assigned_to.in_(rep_uuids),
)
total_leads = (await self.db.execute(lead_count_q)).scalar() or 0
deals_q = select(func.count(), func.coalesce(func.sum(Deal.value), 0)).where(
Deal.tenant_id == tid,
Deal.assigned_to.in_(rep_uuids),
)
row = (await self.db.execute(deals_q)).one_or_none()
total_deals = row[0] if row else 0
total_value = float(row[1]) if row else 0.0
won_q = select(func.count()).where(
Deal.tenant_id == tid,
Deal.assigned_to.in_(rep_uuids),
Deal.stage == "closed_won",
)
won_count = (await self.db.execute(won_q)).scalar() or 0
win_rate = round((won_count / total_deals) * 100, 1) if total_deals > 0 else 0.0
stats_list.append(TerritoryStats(
territory_key=key,
name_ar=region["name_ar"],
name_en=region["name_en"],
total_leads=total_leads,
total_deals=total_deals,
total_value=total_value,
win_rate=win_rate,
reps_count=len(rep_ids),
))
return stats_list
def list_regions(self) -> list[dict]:
"""Return all Saudi regions with metadata."""
return [
{
"key": key,
"name_ar": info["name_ar"],
"name_en": info["name_en"],
"cities_ar": info["cities_ar"],
"reps_assigned": len(self._assignments.get(key, TerritoryAssignment(territory_key=key)).rep_ids),
}
for key, info in SAUDI_REGIONS.items()
]
def _detect_territory(self, city_hint: str) -> Optional[str]:
"""Detect territory from a city name hint (Arabic or English)."""
hint_lower = city_hint.strip().lower()
for key, info in SAUDI_REGIONS.items():
if hint_lower in info["name_en"].lower() or hint_lower == key:
return key
for city in info["cities_ar"]:
if city in city_hint:
return key
return None

View File

@ -0,0 +1,118 @@
{
"industry": "contracting",
"name": "Contracting & Services",
"name_ar": "مقاولات وخدمات",
"pipeline_stages": [
{"key": "inquiry", "name_en": "Inquiry", "name_ar": "استفسار", "order": 1, "probability": 10},
{"key": "site_visit", "name_en": "Site Visit", "name_ar": "زيارة موقع", "order": 2, "probability": 25},
{"key": "quotation", "name_en": "Quotation", "name_ar": "عرض سعر", "order": 3, "probability": 40},
{"key": "negotiation", "name_en": "Negotiation", "name_ar": "تفاوض", "order": 4, "probability": 60},
{"key": "contract", "name_en": "Contract", "name_ar": "عقد", "order": 5, "probability": 80},
{"key": "execution", "name_en": "Execution", "name_ar": "تنفيذ", "order": 6, "probability": 90},
{"key": "completion", "name_en": "Completion", "name_ar": "إنجاز", "order": 7, "probability": 100}
],
"message_templates": [
{
"name": "welcome",
"name_ar": "رسالة ترحيب",
"channel": "whatsapp",
"trigger": "lead_created",
"content_ar": "أهلاً {name}! شكراً لتواصلك مع {company}. متخصصين في أعمال المقاولات والصيانة. وش نوع المشروع اللي تحتاجه؟",
"content_en": "Hello {name}! Thank you for contacting {company}. We specialize in contracting and maintenance. What type of project do you need?",
"delay_minutes": 0
},
{
"name": "site_visit_confirmation",
"name_ar": "تأكيد زيارة الموقع",
"channel": "whatsapp",
"trigger": "stage_change_site_visit",
"content_ar": "مرحباً {name}، تم تحديد موعد زيارة الموقع يوم {date} الساعة {time}. فريقنا الفني بيكون موجود للمعاينة وأخذ المقاسات. الموقع: {location}",
"content_en": "Hi {name}, site visit confirmed for {date} at {time}. Our technical team will be there for inspection. Location: {location}",
"delay_minutes": 0
},
{
"name": "quotation_ready",
"name_ar": "عرض السعر جاهز",
"channel": "whatsapp",
"trigger": "stage_change_quotation",
"content_ar": "مرحباً {name}، عرض السعر جاهز لمشروعك. الإجمالي: {total_amount} ريال شامل الضريبة. يشمل المواد والعمالة والضمان. نرسل لك التفاصيل الكاملة؟",
"content_en": "Hi {name}, your quotation is ready. Total: {total_amount} SAR including VAT. Includes materials, labor, and warranty. Shall we send the full details?",
"delay_minutes": 0
},
{
"name": "no_response_followup",
"name_ar": "متابعة عدم الرد",
"channel": "whatsapp",
"trigger": "no_response",
"content_ar": "مرحباً {name}، تواصلنا معك بخصوص مشروع {project_type}. إذا عندك أي استفسار أو تبي تعدّل على العرض، لا تتردد تراسلنا. نحن هنا لخدمتك.",
"content_en": "Hi {name}, we reached out regarding your {project_type} project. If you have questions or want to modify the quote, don't hesitate to reach out.",
"delay_minutes": 4320
},
{
"name": "execution_update",
"name_ar": "تحديث التنفيذ",
"channel": "whatsapp",
"trigger": "stage_change_execution",
"content_ar": "مرحباً {name}، نبشرك إن العمل بدأ في مشروعك. المدة المتوقعة: {duration}. بنرسل لك تحديثات دورية عن سير العمل. لأي استفسار تواصل معنا.",
"content_en": "Hi {name}, work has started on your project. Expected duration: {duration}. We'll send periodic progress updates.",
"delay_minutes": 0
}
],
"proposal_templates": [
{
"name": "project_quotation",
"name_ar": "عرض سعر مشروع",
"sections": [
{"title_ar": "نطاق العمل", "title_en": "Scope of Work"},
{"title_ar": "المواد والمواصفات", "title_en": "Materials & Specifications"},
{"title_ar": "الجدول الزمني", "title_en": "Timeline"},
{"title_ar": "التكلفة التفصيلية", "title_en": "Detailed Cost Breakdown"},
{"title_ar": "شروط الدفع", "title_en": "Payment Terms"},
{"title_ar": "الضمان", "title_en": "Warranty"},
{"title_ar": "الشروط والأحكام", "title_en": "Terms & Conditions"}
]
},
{
"name": "maintenance_contract",
"name_ar": "عقد صيانة",
"sections": [
{"title_ar": "نطاق خدمات الصيانة", "title_en": "Maintenance Scope"},
{"title_ar": "جدول الزيارات الدورية", "title_en": "Periodic Visit Schedule"},
{"title_ar": "قطع الغيار المشمولة", "title_en": "Included Spare Parts"},
{"title_ar": "الرسوم السنوية", "title_en": "Annual Fees"},
{"title_ar": "مدة العقد والتجديد", "title_en": "Contract Duration & Renewal"}
]
}
],
"workflow_templates": [
{
"name": "new_project_flow",
"name_ar": "تدفق مشروع جديد",
"trigger": "lead_created",
"actions": [
{"type": "send_message", "template": "welcome", "delay_minutes": 0},
{"type": "create_task", "subject": "اتصل بالعميل وحدد نوع المشروع", "delay_minutes": 15},
{"type": "send_message", "template": "no_response_followup", "delay_minutes": 4320, "condition": "no_response"}
]
},
{
"name": "site_visit_flow",
"name_ar": "تدفق زيارة الموقع",
"trigger": "stage_change_site_visit",
"actions": [
{"type": "send_message", "template": "site_visit_confirmation", "delay_minutes": 0},
{"type": "create_task", "subject": "تجهيز فريق المعاينة الفنية", "delay_minutes": 60},
{"type": "create_task", "subject": "إعداد عرض السعر بعد المعاينة", "delay_minutes": 1440}
]
},
{
"name": "execution_monitoring_flow",
"name_ar": "تدفق متابعة التنفيذ",
"trigger": "stage_change_execution",
"actions": [
{"type": "send_message", "template": "execution_update", "delay_minutes": 0},
{"type": "create_task", "subject": "تحديث العميل بتقرير أسبوعي", "delay_minutes": 10080}
]
}
]
}