system-prompts-and-models-o.../salesflow-saas/backend/app/workers/notification_tasks.py
Claude 84762f08ab
Add complete launch infrastructure: models, APIs, agents, compliance, docs, knowledge base
Phase 1 - Repo Hardening:
- README.md, LICENSE, SECURITY.md, CONTRIBUTING.md
- GitHub Actions repo-hygiene workflow
- docs/: ARCHITECTURE, DATA-MODEL, API-MAP, AGENT-MAP, DEPLOYMENT-NOTES

Phase 2 - Database Models (7 new):
- Company, Contact, Call, Commission, Payout, Dispute, GuaranteeClaim
- Consent, Complaint, Policy, KnowledgeArticle, SectorAsset
- Updated models/__init__.py with all 32+ models

Phase 3 - API Surfaces (16 new route files):
- companies, contacts, calls, meetings, commissions, payouts
- disputes, guarantees, consents, complaints, knowledge
- sectors, presentations, supervisor, admin, health
- Updated router.py with all 24 route groups

Phase 4 - AI Prompt Registry (18 agent contracts):
- Lead Qualification, Affiliate Recruitment Evaluator, Onboarding Coach
- Outreach Writer, Arabic WhatsApp, English Conversation, Voice Call
- Meeting Booking, Sector Strategist, Objection Handler
- Proposal Drafter, QA Reviewer, Compliance Reviewer
- Knowledge Retrieval, Revenue Attribution, Fraud Reviewer
- Guarantee Claim Reviewer, Management Summary

Phase 5 - Communication Templates:
- 15 production templates (WhatsApp, email, voice, internal)
- Arabic + English variants with variable interpolation

Phase 6 - Compliance Center (7 legal docs):
- Privacy policy, Terms of service, Refund policy
- Commission policy, Affiliate rules, Consent policy, Data protection
- All PDPL-compliant, Arabic

Phase 7 - Celery Workers (fully implemented):
- follow_up_tasks: automated lead follow-ups with workflow execution
- message_tasks: WhatsApp/email/SMS with retry logic
- notification_tasks: daily reports, meeting reminders, in-app notifications
- affiliate_tasks: target checking, commission calculation, weekly reports, AI outreach

Phase 8 - Knowledge Base OS (8 files):
- Services overview, Pricing policy, Channel policy, Meeting policy
- Identity rules, Escalation rules, Hiring path, Internal SOPs

https://claude.ai/code/session_01KnJgK7RwyeCvRZTRThHtfU
2026-03-31 07:57:48 +00:00

200 lines
6.8 KiB
Python

from app.workers.celery_app import celery_app
from app.config import get_settings
from app.database import SessionLocal
from datetime import datetime, timezone, timedelta
import logging
logger = logging.getLogger(__name__)
settings = get_settings()
@celery_app.task(name="app.workers.notification_tasks.send_daily_report")
def send_daily_report():
"""Generate and send daily sales report to all active tenants."""
from app.models.tenant import Tenant
from app.models.lead import Lead
from app.models.deal import Deal
from app.models.user import User
from sqlalchemy import select, func, and_
logger.info("Generating daily reports...")
with SessionLocal() as db:
now = datetime.now(timezone.utc)
today_start = now.replace(hour=0, minute=0, second=0, microsecond=0)
yesterday_start = today_start - timedelta(days=1)
tenants = db.execute(
select(Tenant).where(Tenant.is_active == True)
).scalars().all()
reports_sent = 0
for tenant in tenants:
# Gather daily stats
new_leads = db.execute(
select(func.count(Lead.id)).where(
and_(Lead.tenant_id == tenant.id, Lead.created_at >= today_start)
)
).scalar() or 0
deals_closed = db.execute(
select(func.count(Deal.id)).where(
and_(
Deal.tenant_id == tenant.id,
Deal.stage == "closed_won",
Deal.updated_at >= today_start,
)
)
).scalar() or 0
revenue_today = db.execute(
select(func.sum(Deal.value)).where(
and_(
Deal.tenant_id == tenant.id,
Deal.stage == "closed_won",
Deal.updated_at >= today_start,
)
)
).scalar() or 0
total_pipeline = db.execute(
select(func.sum(Deal.value)).where(
and_(
Deal.tenant_id == tenant.id,
Deal.stage.notin_(["closed_won", "closed_lost"]),
)
)
).scalar() or 0
# Build report
report = f"""📊 تقرير Dealix اليومي - {now.strftime('%Y-%m-%d')}
🆕 عملاء جدد: {new_leads}
✅ صفقات مغلقة: {deals_closed}
💰 إيرادات اليوم: {revenue_today:,.0f} ر.س
📈 إجمالي الفرص المفتوحة: {total_pipeline:,.0f} ر.س
Dealix - ديل اي اكس
مبيعاتك تشتغل وأنت ترتاح"""
# Send to tenant owner
owner = db.execute(
select(User).where(
and_(User.tenant_id == tenant.id, User.role == "owner")
)
).scalars().first()
if owner:
# Send via WhatsApp if phone available
if owner.phone:
from app.workers.message_tasks import send_whatsapp
send_whatsapp.delay(owner.phone, report, str(tenant.id))
# Send via email
if owner.email:
from app.workers.message_tasks import send_email
send_email.delay(
owner.email,
f"تقرير Dealix اليومي - {now.strftime('%Y-%m-%d')}",
report,
str(tenant.id),
)
# Create in-app notification
notify_user.delay(str(owner.id), "التقرير اليومي", report, "daily_report")
reports_sent += 1
logger.info(f"Daily reports sent to {reports_sent} tenants")
return {"reports_sent": reports_sent}
@celery_app.task(name="app.workers.notification_tasks.notify_user")
def notify_user(user_id: str, title: str, body: str, notification_type: str = "info"):
"""Create an in-app notification for a user."""
from app.models.notification import Notification
logger.info(f"Creating notification for user {user_id}: {title}")
with SessionLocal() as db:
notification = Notification(
user_id=user_id,
type=notification_type,
title=title,
body=body,
is_read=False,
metadata={"created_by": "system"},
)
db.add(notification)
db.commit()
return {"status": "created", "user_id": user_id}
@celery_app.task(name="app.workers.notification_tasks.send_meeting_reminder")
def send_meeting_reminder():
"""Send meeting reminders 1 hour before scheduled meetings."""
from app.models.ai_conversation import AutoBooking
from sqlalchemy import select, and_
logger.info("Checking for upcoming meetings...")
with SessionLocal() as db:
now = datetime.now(timezone.utc)
reminder_window_start = now + timedelta(minutes=55)
reminder_window_end = now + timedelta(minutes=65)
upcoming = db.execute(
select(AutoBooking).where(
and_(
AutoBooking.status == "scheduled",
AutoBooking.meeting_datetime >= reminder_window_start,
AutoBooking.meeting_datetime <= reminder_window_end,
)
)
).scalars().all()
reminders_sent = 0
for booking in upcoming:
meeting_time = booking.meeting_datetime.strftime("%H:%M")
meeting_date = booking.meeting_datetime.strftime("%Y-%m-%d")
reminder = f"""⏰ تذكير باجتماع Dealix
📅 التاريخ: {meeting_date}
⏰ الوقت: {meeting_time}
👤 العميل: {booking.client_name}
🏢 الشركة: {booking.client_company or '-'}
📱 الجوال: {booking.client_phone or '-'}
نتطلع لمقابلتك!
Dealix"""
# Notify assigned sales rep
if booking.assigned_sales_rep:
notify_user.delay(
str(booking.assigned_sales_rep),
f"تذكير: اجتماع مع {booking.client_name}",
reminder,
"meeting_reminder",
)
# Send WhatsApp reminder to client
if booking.client_phone:
from app.workers.message_tasks import send_whatsapp
send_whatsapp.delay(
booking.client_phone,
f"مرحباً {booking.client_name}! تذكير باجتماعك مع فريق Dealix اليوم الساعة {meeting_time}. نتطلع لمقابلتك!",
str(booking.tenant_id),
)
reminders_sent += 1
logger.info(f"Sent {reminders_sent} meeting reminders")
return {"reminders_sent": reminders_sent}