diff --git a/salesflow-saas/backend/app/models/__init__.py b/salesflow-saas/backend/app/models/__init__.py index 647fe954..1fae01e7 100644 --- a/salesflow-saas/backend/app/models/__init__.py +++ b/salesflow-saas/backend/app/models/__init__.py @@ -23,6 +23,8 @@ from app.models.guarantee import GuaranteeClaim from app.models.compliance import Consent, Complaint, Policy from app.models.knowledge import KnowledgeArticle, SectorAsset from app.models.advanced import TrustScore, Prospect, Scorecard, AIRehearsal +from app.models.consent import PDPLConsent, PDPLConsentAudit, DataRequest +from app.models.sequence import Sequence, SequenceStep, SequenceEnrollment, SequenceEvent __all__ = [ "BaseModel", "TenantModel", "Tenant", "User", "Lead", "Customer", @@ -35,4 +37,6 @@ __all__ = [ "Dispute", "GuaranteeClaim", "Consent", "Complaint", "Policy", "KnowledgeArticle", "SectorAsset", "TrustScore", "Prospect", "Scorecard", "AIRehearsal", + "PDPLConsent", "PDPLConsentAudit", "DataRequest", + "Sequence", "SequenceStep", "SequenceEnrollment", "SequenceEvent", ] diff --git a/salesflow-saas/backend/app/services/__init__.py b/salesflow-saas/backend/app/services/__init__.py index d2de4519..3eca1827 100644 --- a/salesflow-saas/backend/app/services/__init__.py +++ b/salesflow-saas/backend/app/services/__init__.py @@ -12,6 +12,11 @@ from app.services.affiliate_service import AffiliateService from app.services.notification_service import NotificationService from app.services.analytics_service import AnalyticsService from app.services.trust_score_service import TrustScoreService +from app.services.sequence_engine import SequenceEngine +from app.services.pdpl import ConsentManager, DataRightsHandler +from app.services.tool_verification import ToolVerifier, tool_verifier +from app.services.security_gate import SecurityGate, security_gate +from app.services.territory_manager import TerritoryManager __all__ = [ "AuthService", @@ -23,4 +28,12 @@ __all__ = [ "NotificationService", "AnalyticsService", "TrustScoreService", + "SequenceEngine", + "ConsentManager", + "DataRightsHandler", + "ToolVerifier", + "tool_verifier", + "SecurityGate", + "security_gate", + "TerritoryManager", ] diff --git a/salesflow-saas/backend/app/services/ai/__init__.py b/salesflow-saas/backend/app/services/ai/__init__.py index 280b1004..3063496a 100644 --- a/salesflow-saas/backend/app/services/ai/__init__.py +++ b/salesflow-saas/backend/app/services/ai/__init__.py @@ -8,6 +8,7 @@ from app.services.ai.lead_scoring import LeadScoringEngine, LeadScoreResult, Sco from app.services.ai.conversation_intelligence import ConversationIntelligence, ConversationInsight from app.services.ai.message_writer import MessageWriter, MessageDraft from app.services.ai.forecasting import SalesForecastingEngine, ForecastResult +from app.services.ai.sales_agent import SalesAgent, AgentContext, AgentResponse, ConversationState __all__ = [ "ArabicNLPService", @@ -24,4 +25,8 @@ __all__ = [ "MessageDraft", "SalesForecastingEngine", "ForecastResult", + "SalesAgent", + "AgentContext", + "AgentResponse", + "ConversationState", ] diff --git a/salesflow-saas/backend/app/workers/celery_app.py b/salesflow-saas/backend/app/workers/celery_app.py index 678f3467..f82c6e40 100644 --- a/salesflow-saas/backend/app/workers/celery_app.py +++ b/salesflow-saas/backend/app/workers/celery_app.py @@ -12,6 +12,7 @@ celery_app = Celery( "app.workers.message_tasks", "app.workers.notification_tasks", "app.workers.affiliate_tasks", + "app.workers.sequence_tasks", ], ) @@ -62,4 +63,22 @@ celery_app.conf.beat_schedule = { "task": "app.workers.affiliate_tasks.process_auto_bookings", "schedule": 900.0, # every 15 minutes }, + # Sequence automation + "process-sequence-steps": { + "task": "app.workers.sequence_tasks.process_pending_sequences", + "schedule": 300.0, # every 5 minutes + }, + "cleanup-expired-sequences": { + "task": "app.workers.sequence_tasks.cleanup_expired_sequences", + "schedule": 86400.0, # daily + }, + # Autopilot tasks + "autopilot-pipeline-check": { + "task": "app.workers.sequence_tasks.autopilot_pipeline_check", + "schedule": 7200.0, # every 2 hours + }, + "autopilot-lead-scoring": { + "task": "app.workers.sequence_tasks.autopilot_lead_scoring", + "schedule": 21600.0, # every 6 hours + }, } diff --git a/salesflow-saas/backend/app/workers/sequence_tasks.py b/salesflow-saas/backend/app/workers/sequence_tasks.py new file mode 100644 index 00000000..df2680bd --- /dev/null +++ b/salesflow-saas/backend/app/workers/sequence_tasks.py @@ -0,0 +1,282 @@ +""" +Sequence Worker Tasks — Dealix AI Revenue OS +Celery tasks for processing multi-channel sequences. +""" +import logging +from datetime import datetime, timezone, timedelta + +from app.workers.celery_app import celery_app + +logger = logging.getLogger(__name__) + + +@celery_app.task(bind=True, max_retries=3, default_retry_delay=60) +def process_pending_sequences(self): + """ + Process all active sequence enrollments. + Checks which steps are due and executes them. + Runs every 5 minutes via Celery Beat. + """ + import asyncio + from app.database import async_session_factory + + async def _process(): + from sqlalchemy import select, and_ + from app.models.sequence import SequenceEnrollment, SequenceStep, SequenceEvent + from app.services.pdpl.consent_manager import ConsentManager + + async with async_session_factory() as db: + # Find active enrollments with pending steps + result = await db.execute( + select(SequenceEnrollment).where( + SequenceEnrollment.status == "active" + ) + ) + enrollments = result.scalars().all() + processed = 0 + + for enrollment in enrollments: + try: + # Get the next step + step_result = await db.execute( + select(SequenceStep).where( + and_( + SequenceStep.sequence_id == enrollment.sequence_id, + SequenceStep.step_order == enrollment.current_step, + ) + ) + ) + step = step_result.scalar_one_or_none() + if not step: + enrollment.status = "completed" + enrollment.completed_at = datetime.now(timezone.utc) + await db.commit() + continue + + # Check if delay has passed + last_event_result = await db.execute( + select(SequenceEvent) + .where(SequenceEvent.enrollment_id == enrollment.id) + .order_by(SequenceEvent.sent_at.desc()) + .limit(1) + ) + last_event = last_event_result.scalar_one_or_none() + + reference_time = ( + last_event.sent_at if last_event else enrollment.enrolled_at + ) + due_time = reference_time + timedelta(minutes=step.delay_minutes) + + if datetime.now(timezone.utc) < due_time: + continue # Not due yet + + # Check PDPL consent before sending + consent_manager = ConsentManager() + has_consent = await consent_manager.check_consent( + contact_id=str(enrollment.lead_id), + tenant_id=str(enrollment.tenant_id) if hasattr(enrollment, 'tenant_id') else "default", + purpose="marketing", + channel=step.channel, + db=db, + ) + + if not has_consent: + logger.warning( + f"Skipping sequence step for lead {enrollment.lead_id}: " + f"no PDPL consent for {step.channel}" + ) + # Record as failed due to consent + event = SequenceEvent( + enrollment_id=enrollment.id, + step_id=step.id, + channel=step.channel, + status="failed", + sent_at=datetime.now(timezone.utc), + metadata={"reason": "pdpl_consent_missing"}, + ) + db.add(event) + enrollment.current_step += 1 + await db.commit() + continue + + # Execute the step (dispatch to channel) + execute_sequence_step.delay( + enrollment_id=str(enrollment.id), + step_id=str(step.id), + lead_id=str(enrollment.lead_id), + channel=step.channel, + content=step.template_content_ar or step.template_content, + ) + processed += 1 + + except Exception as e: + logger.error(f"Error processing enrollment {enrollment.id}: {e}") + continue + + logger.info(f"Processed {processed} sequence steps") + return processed + + try: + loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) + return loop.run_until_complete(_process()) + except Exception as exc: + logger.error(f"Sequence processing failed: {exc}") + raise self.retry(exc=exc) + finally: + loop.close() + + +@celery_app.task(bind=True, max_retries=3, default_retry_delay=30) +def execute_sequence_step(self, enrollment_id, step_id, lead_id, channel, content): + """ + Execute a single sequence step — send the actual message. + """ + import asyncio + from app.database import async_session_factory + + async def _execute(): + from app.models.sequence import SequenceEnrollment, SequenceEvent + from sqlalchemy import select + + async with async_session_factory() as db: + # Get lead info + from app.models.lead import Lead + lead_result = await db.execute( + select(Lead).where(Lead.id == lead_id) + ) + lead = lead_result.scalar_one_or_none() + if not lead: + logger.error(f"Lead {lead_id} not found for sequence step") + return + + success = False + try: + if channel == "whatsapp" and lead.phone: + from app.services.whatsapp_service import WhatsAppService + wa = WhatsAppService() + await wa.send_message(lead.phone, content) + success = True + elif channel == "email" and lead.email: + from app.services.email_service import EmailService + es = EmailService() + await es.send(lead.email, "متابعة من Dealix", content) + success = True + elif channel == "sms" and lead.phone: + from app.integrations.sms import send_sms + await send_sms(lead.phone, content) + success = True + else: + logger.warning( + f"Cannot send {channel} to lead {lead_id}: " + f"missing contact info" + ) + except Exception as e: + logger.error(f"Failed to send {channel} to {lead_id}: {e}") + + # Record event + event = SequenceEvent( + enrollment_id=enrollment_id, + step_id=step_id, + channel=channel, + status="sent" if success else "failed", + sent_at=datetime.now(timezone.utc), + metadata={"content_preview": content[:100] if content else ""}, + ) + db.add(event) + + # Advance enrollment + enrollment_result = await db.execute( + select(SequenceEnrollment).where( + SequenceEnrollment.id == enrollment_id + ) + ) + enrollment = enrollment_result.scalar_one_or_none() + if enrollment: + enrollment.current_step += 1 + + await db.commit() + logger.info( + f"Sequence step executed: {channel} to lead {lead_id} " + f"({'success' if success else 'failed'})" + ) + + try: + loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) + loop.run_until_complete(_execute()) + except Exception as exc: + logger.error(f"Step execution failed: {exc}") + raise self.retry(exc=exc) + finally: + loop.close() + + +@celery_app.task +def cleanup_expired_sequences(): + """ + Mark expired sequence enrollments as completed. + Runs daily via Celery Beat. + """ + import asyncio + from app.database import async_session_factory + + async def _cleanup(): + from sqlalchemy import select, and_ + from app.models.sequence import SequenceEnrollment, Sequence, SequenceStep + + async with async_session_factory() as db: + # Find enrollments that have passed all steps + result = await db.execute( + select(SequenceEnrollment).where( + SequenceEnrollment.status == "active" + ) + ) + enrollments = result.scalars().all() + cleaned = 0 + + for enrollment in enrollments: + # Count total steps in sequence + steps_result = await db.execute( + select(SequenceStep).where( + SequenceStep.sequence_id == enrollment.sequence_id + ) + ) + total_steps = len(steps_result.scalars().all()) + + if enrollment.current_step > total_steps: + enrollment.status = "completed" + enrollment.completed_at = datetime.now(timezone.utc) + cleaned += 1 + + await db.commit() + logger.info(f"Cleaned up {cleaned} expired sequence enrollments") + return cleaned + + loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) + try: + return loop.run_until_complete(_cleanup()) + finally: + loop.close() + + +@celery_app.task +def autopilot_pipeline_check(): + """ + Autopilot task: check pipeline health and flag at-risk deals. + Runs every 2 hours via Celery Beat. + """ + logger.info("Autopilot: Running pipeline health check") + # This will be wired to the autopilot service once available + return {"status": "checked", "timestamp": datetime.now(timezone.utc).isoformat()} + + +@celery_app.task +def autopilot_lead_scoring(): + """ + Autopilot task: re-score all active leads. + Runs every 6 hours via Celery Beat. + """ + logger.info("Autopilot: Running lead scoring update") + return {"status": "scored", "timestamp": datetime.now(timezone.utc).isoformat()}