mirror of
https://github.com/x1xhlol/system-prompts-and-models-of-ai-tools.git
synced 2026-06-18 07:19:35 +00:00
113 lines
4.1 KiB
Python
113 lines
4.1 KiB
Python
"""
|
|
Pipeline Worker Tasks — Celery background tasks for the autonomous pipeline.
|
|
"""
|
|
|
|
import asyncio
|
|
import logging
|
|
from celery import shared_task
|
|
from celery.utils.log import get_task_logger
|
|
|
|
logger = get_task_logger(__name__)
|
|
|
|
|
|
@shared_task(bind=True, max_retries=3, default_retry_delay=120)
|
|
def run_pipeline_for_lead(self, tenant_id: str, lead_data: dict):
|
|
"""
|
|
Process a new lead through the full autonomous pipeline in the background.
|
|
This is the async version of pipeline.process_new_lead.
|
|
"""
|
|
from app.database import async_session
|
|
from app.services.agents.autonomous_pipeline import AutonomousPipeline
|
|
|
|
async def run():
|
|
async with async_session() as db:
|
|
pipeline = AutonomousPipeline(db)
|
|
result = await pipeline.process_new_lead(tenant_id, lead_data)
|
|
await db.commit()
|
|
return result
|
|
|
|
try:
|
|
logger.info(f"🚀 Pipeline task started for lead {lead_data.get('lead_id')} (tenant: {tenant_id})")
|
|
result = asyncio.run(run())
|
|
logger.info(f"✅ Pipeline completed: stage={result.get('final_stage')}, tokens={result.get('total_tokens_used')}")
|
|
return result
|
|
except Exception as exc:
|
|
logger.error(f"❌ Pipeline failed for lead {lead_data.get('lead_id')}: {exc}")
|
|
self.retry(exc=exc)
|
|
|
|
|
|
@shared_task(bind=True, max_retries=3)
|
|
def advance_pipeline_stage(self, tenant_id: str, lead_id: str, current_stage: str,
|
|
trigger: str, context: dict = None):
|
|
"""Advance a lead to the next pipeline stage in the background."""
|
|
from app.database import async_session
|
|
from app.services.agents.autonomous_pipeline import AutonomousPipeline
|
|
|
|
async def run():
|
|
async with async_session() as db:
|
|
pipeline = AutonomousPipeline(db)
|
|
result = await pipeline.advance_stage(tenant_id, lead_id, current_stage, trigger, context)
|
|
await db.commit()
|
|
return result
|
|
|
|
try:
|
|
logger.info(f"📈 Stage advance: {current_stage} → (trigger: {trigger}) for lead {lead_id}")
|
|
result = asyncio.run(run())
|
|
logger.info(f"✅ Stage advanced to: {result.get('new_stage')}")
|
|
return result
|
|
except Exception as exc:
|
|
logger.error(f"❌ Stage advance failed: {exc}")
|
|
self.retry(exc=exc)
|
|
|
|
|
|
@shared_task(bind=True, max_retries=2)
|
|
def dispatch_agent_actions(self, actions: list, tenant_id: str):
|
|
"""Dispatch agent-generated actions to external services."""
|
|
from app.database import async_session
|
|
from app.services.agents.action_dispatcher import ActionDispatcher
|
|
|
|
async def run():
|
|
async with async_session() as db:
|
|
dispatcher = ActionDispatcher(db)
|
|
results = await dispatcher.dispatch(actions, tenant_id)
|
|
await db.commit()
|
|
return results
|
|
|
|
try:
|
|
logger.info(f"📤 Dispatching {len(actions)} actions for tenant {tenant_id}")
|
|
results = asyncio.run(run())
|
|
success = sum(1 for r in results if r.get("status") == "success")
|
|
logger.info(f"✅ Dispatched: {success}/{len(actions)} successful")
|
|
return results
|
|
except Exception as exc:
|
|
logger.error(f"❌ Action dispatch failed: {exc}")
|
|
self.retry(exc=exc)
|
|
|
|
|
|
@shared_task(bind=True, max_retries=1)
|
|
def run_daily_pipeline_sweep(self, tenant_id: str):
|
|
"""
|
|
Daily sweep: find stale leads and advance or nurture them.
|
|
Runs as a scheduled task (every 24h).
|
|
"""
|
|
from app.database import async_session
|
|
from app.services.agents.autonomous_pipeline import AutonomousPipeline
|
|
|
|
async def run():
|
|
async with async_session() as db:
|
|
pipeline = AutonomousPipeline(db)
|
|
# TODO: Query stale leads from DB and advance them
|
|
summary = pipeline.get_pipeline_summary()
|
|
return {
|
|
"status": "sweep_completed",
|
|
"pipeline_summary": summary,
|
|
}
|
|
|
|
try:
|
|
logger.info(f"🧹 Daily pipeline sweep for tenant {tenant_id}")
|
|
result = asyncio.run(run())
|
|
return result
|
|
except Exception as exc:
|
|
logger.error(f"❌ Daily sweep failed: {exc}")
|
|
return {"status": "error", "detail": str(exc)}
|