diff --git a/salesflow-saas/backend/app/services/self_improvement.py b/salesflow-saas/backend/app/services/self_improvement.py index fe283c3c..3cb1d8b3 100644 --- a/salesflow-saas/backend/app/services/self_improvement.py +++ b/salesflow-saas/backend/app/services/self_improvement.py @@ -1,251 +1,431 @@ """ -Self-Improvement Engine — Dealix AI Revenue OS -Bounded cycle: inspect → measure → propose → verify → apply → report. +Self-Improvement Engine -- Dealix AI Revenue OS -- محرك التحسين الذاتي +Bounded self-improvement loop: Inspect -> Measure -> Propose -> Verify -> Apply. Max 5 proposals per cycle, max 2 auto-applies (trivial only). """ +from __future__ import annotations + import logging -from datetime import datetime, timezone -from enum import Enum -from typing import Optional +import uuid +from collections import defaultdict +from datetime import datetime, timedelta, timezone +from typing import Any, Optional from pydantic import BaseModel, Field logger = logging.getLogger(__name__) -class ImprovementCategory(str, Enum): - SKILL_FIX = "skill_fix" - KNOWLEDGE_UPDATE = "knowledge_update" - COST_REDUCTION = "cost_reduction" - QUALITY = "quality" - PERFORMANCE = "performance" - SECURITY = "security" +# --------------------------------------------------------------------------- +# Models +# --------------------------------------------------------------------------- - -class ImprovementStatus(str, Enum): - PROPOSED = "proposed" - APPROVED = "approved" - APPLIED = "applied" - REJECTED = "rejected" - TESTED = "tested" - FAILED = "failed" +class Metric(BaseModel): + """Quantified measurement of a detected issue.""" + name: str + name_ar: str + value: float + unit: str = "" + threshold: float = 0.0 + exceeds_threshold: bool = False + frequency: int = 0 + estimated_cost_usd: float = 0.0 + impact: str = "low" class ImprovementProposal(BaseModel): - id: str - category: ImprovementCategory + """A single bounded improvement proposal.""" + id: str = Field(default_factory=lambda: str(uuid.uuid4())) + category: str = "quality" # skill_fix, knowledge_update, cost_reduction, quality title: str title_ar: str description: str evidence: list[str] = [] impact: str = "medium" # high, medium, low effort: str = "small" # trivial, small, medium, large - proposed_action: str + proposed_action: str = "" requires_approval: bool = True - status: ImprovementStatus = ImprovementStatus.PROPOSED + status: str = "proposed" # proposed, approved, applied, rejected, tested + approved_by: Optional[str] = None created_at: datetime = Field(default_factory=lambda: datetime.now(timezone.utc)) applied_at: Optional[datetime] = None - approved_by: Optional[str] = None - - -class Metric(BaseModel): - name: str - value: float - unit: str - trend: str = "stable" # improving, degrading, stable - severity: str = "info" # critical, warning, info + rollback_action: str = "" + test_result: Optional[str] = None class CycleResult(BaseModel): - cycle_id: str - inspected_areas: list[str] - metrics: list[Metric] - proposals: list[ImprovementProposal] - auto_applied: int - awaiting_approval: int - started_at: datetime - completed_at: datetime - summary: str - summary_ar: str + """Result of a full self-improvement cycle.""" + cycle_id: str = Field(default_factory=lambda: str(uuid.uuid4())) + tenant_id: str = "" + started_at: datetime = Field(default_factory=lambda: datetime.now(timezone.utc)) + completed_at: Optional[datetime] = None + duration_ms: int = 0 + issues_found: int = 0 + metrics_measured: int = 0 + proposals_generated: int = 0 + proposals_auto_applied: int = 0 + proposals_pending: int = 0 + summary: str = "" + summary_ar: str = "" +# --------------------------------------------------------------------------- +# Engine +# --------------------------------------------------------------------------- + class SelfImprovementEngine: + """Bounded self-improvement. Inspect -> Measure -> Propose -> Verify -> Apply.""" + MAX_PROPOSALS_PER_CYCLE = 5 - MAX_AUTO_APPLY = 2 + MAX_AUTO_APPLY = 2 # only trivial improvements auto-apply - def __init__(self): + def __init__(self) -> None: self._proposals: list[ImprovementProposal] = [] - self._cycle_count = 0 - self._metrics_history: list[dict] = [] + self._cycles: list[CycleResult] = [] + self._max_proposals = 5_000 + self._max_cycles = 500 + logger.info("محرك التحسين الذاتي: تم التهيئة") - async def inspect(self, tenant_id: str = None) -> dict: - issues = {} - issues["skill_failures"] = { - "check": "مهارات فاشلة", - "description": "Skills with >20% failure rate in last 7 days", - "action": "Review and fix or disable failing skills", - } - issues["expensive_workflows"] = { - "check": "سير عمل مكلف", - "description": "Workflows costing >$1/run", - "action": "Optimize prompts or switch to cheaper model", - } - issues["stale_knowledge"] = { - "check": "معرفة قديمة", - "description": "Wiki pages not updated in 30+ days", - "action": "Review and update or archive", - } - issues["repeated_escalations"] = { - "check": "تصعيدات متكررة", - "description": "Same escalation reason >5 times in 7 days", - "action": "Automate the resolution or improve the workflow", - } - issues["low_trust_calls"] = { - "check": "استدعاءات منخفضة الثقة", - "description": "Tool calls with <50% verification rate", - "action": "Add better verification or restrict the tool", - } - logger.info(f"Self-improvement inspection: {len(issues)} areas checked") - return issues + # -- Phase 1: Inspect -------------------------------------------------- - async def measure(self, inspection: dict) -> list[Metric]: - metrics = [ - Metric(name="skill_success_rate", value=87.5, unit="%", trend="stable"), - Metric(name="avg_workflow_cost", value=0.12, unit="USD", trend="improving"), - Metric(name="knowledge_freshness", value=72.0, unit="%", trend="degrading", severity="warning"), - Metric(name="escalation_rate", value=8.3, unit="%", trend="stable"), - Metric(name="tool_trust_score", value=91.0, unit="%", trend="improving"), - Metric(name="avg_response_time", value=1.2, unit="seconds", trend="stable"), - ] - self._metrics_history.append({ - "timestamp": datetime.now(timezone.utc).isoformat(), - "metrics": [m.model_dump() for m in metrics], + async def inspect(self, tenant_id: Optional[str] = None) -> dict[str, Any]: + """Check for issues: skill failures, expensive workflows, stale knowledge, + repeated escalations, low-trust tool calls, slow endpoints.""" + issues: list[dict[str, Any]] = [] + + # Simulated inspection checks (in production, these query real services) + issues.append({ + "type": "skill_failure_rate", + "description": "Some skills have failure rate above 10%", + "description_ar": "بعض المهارات لديها معدل فشل أعلى من 10%", + "data": {"skill_id": "messaging.whatsapp.send", "failure_rate": 0.12, "sample_size": 200}, }) + issues.append({ + "type": "expensive_workflow", + "description": "Content generation workflow uses 3x expected budget", + "description_ar": "سير عمل توليد المحتوى يستخدم 3 أضعاف الميزانية المتوقعة", + "data": {"workflow": "content_generation", "cost_usd": 0.15, "expected_usd": 0.05}, + }) + issues.append({ + "type": "stale_knowledge", + "description": "12 knowledge pages not updated in 30+ days", + "description_ar": "12 صفحة معرفة لم تُحدّث منذ أكثر من 30 يوماً", + "data": {"stale_count": 12, "threshold_days": 30}, + }) + issues.append({ + "type": "repeated_escalation", + "description": "Consent-expired escalations repeat 5+ times/week", + "description_ar": "تصعيدات انتهاء الموافقة تتكرر أكثر من 5 مرات أسبوعياً", + "data": {"escalation_type": "consent_expired", "weekly_count": 7}, + }) + issues.append({ + "type": "low_trust_calls", + "description": "Agent 'growth' has 15% unverified tool calls", + "description_ar": "وكيل 'النمو' لديه 15% مكالمات أدوات غير متحقق منها", + "data": {"agent_id": "growth", "unverified_rate": 0.15}, + }) + + logger.info("[SelfImprove] فحص: %d مشكلة اكتُشفت tenant=%s", len(issues), tenant_id or "global") + return {"tenant_id": tenant_id or "global", "issues": issues, "count": len(issues)} + + # -- Phase 2: Measure -------------------------------------------------- + + async def measure(self, inspection: dict[str, Any]) -> list[Metric]: + """Quantify each issue: frequency, cost, impact.""" + metrics: list[Metric] = [] + for issue in inspection.get("issues", []): + data = issue.get("data", {}) + itype = issue.get("type", "unknown") + + if itype == "skill_failure_rate": + metrics.append(Metric( + name=f"failure_rate:{data.get('skill_id', '?')}", + name_ar=f"معدل الفشل: {data.get('skill_id', '?')}", + value=data.get("failure_rate", 0), + unit="rate", threshold=0.10, + exceeds_threshold=data.get("failure_rate", 0) > 0.10, + frequency=data.get("sample_size", 0), + estimated_cost_usd=data.get("failure_rate", 0) * data.get("sample_size", 0) * 0.01, + impact="high" if data.get("failure_rate", 0) > 0.2 else "medium", + )) + elif itype == "expensive_workflow": + excess = data.get("cost_usd", 0) - data.get("expected_usd", 0) + metrics.append(Metric( + name=f"cost_excess:{data.get('workflow', '?')}", + name_ar=f"تكلفة زائدة: {data.get('workflow', '?')}", + value=excess, unit="usd", threshold=0.05, + exceeds_threshold=excess > 0.05, + estimated_cost_usd=excess * 100, # projected monthly + impact="high" if excess > 0.10 else "medium", + )) + elif itype == "stale_knowledge": + metrics.append(Metric( + name="stale_knowledge_pages", + name_ar="صفحات معرفة قديمة", + value=data.get("stale_count", 0), + unit="pages", threshold=5, + exceeds_threshold=data.get("stale_count", 0) > 5, + impact="low", + )) + elif itype == "repeated_escalation": + metrics.append(Metric( + name=f"repeated_escalation:{data.get('escalation_type', '?')}", + name_ar=f"تصعيد متكرر: {data.get('escalation_type', '?')}", + value=data.get("weekly_count", 0), + unit="per_week", threshold=3, + exceeds_threshold=data.get("weekly_count", 0) > 3, + impact="medium", + )) + elif itype == "low_trust_calls": + metrics.append(Metric( + name=f"unverified_rate:{data.get('agent_id', '?')}", + name_ar=f"معدل غير متحقق: {data.get('agent_id', '?')}", + value=data.get("unverified_rate", 0), + unit="rate", threshold=0.10, + exceeds_threshold=data.get("unverified_rate", 0) > 0.10, + impact="medium", + )) + + logger.info("[SelfImprove] قياس: %d مقاييس", len(metrics)) return metrics + # -- Phase 3: Propose -------------------------------------------------- + async def propose(self, metrics: list[Metric]) -> list[ImprovementProposal]: - proposals = [] - for metric in metrics: - if metric.severity == "warning" or metric.trend == "degrading": - proposal = self._create_proposal(metric) - if proposal: - proposals.append(proposal) - proposals = proposals[:self.MAX_PROPOSALS_PER_CYCLE] + """Generate max 5 proposals per cycle, prioritized by impact/effort.""" + proposals: list[ImprovementProposal] = [] + + for m in sorted(metrics, key=lambda x: -x.estimated_cost_usd): + if len(proposals) >= self.MAX_PROPOSALS_PER_CYCLE: + break + if not m.exceeds_threshold: + continue + + if "failure_rate" in m.name: + proposals.append(ImprovementProposal( + category="skill_fix", + title=f"Fix high failure rate on {m.name.split(':')[-1]}", + title_ar=f"إصلاح معدل الفشل المرتفع في {m.name.split(':')[-1]}", + description=f"Failure rate {m.value:.1%} exceeds {m.threshold:.1%} threshold.", + evidence=[f"Measured {m.frequency} calls, {m.value:.1%} failed"], + impact=m.impact, effort="small", + proposed_action="Add retry logic and improve error handling for the skill handler", + requires_approval=True, + rollback_action="Revert skill handler to previous version", + )) + elif "cost_excess" in m.name: + proposals.append(ImprovementProposal( + category="cost_reduction", + title=f"Reduce cost of {m.name.split(':')[-1]} workflow", + title_ar=f"تقليل تكلفة سير عمل {m.name.split(':')[-1]}", + description=f"Excess cost: ${m.value:.3f}/call (projected: ${m.estimated_cost_usd:.2f}/mo).", + evidence=[f"Current: ${m.value + m.threshold:.3f}, Expected: ${m.threshold:.3f}"], + impact=m.impact, effort="medium", + proposed_action="Switch to cheaper model for simple content or add caching layer", + requires_approval=True, + rollback_action="Revert model routing configuration", + )) + elif "stale_knowledge" in m.name: + proposals.append(ImprovementProposal( + category="knowledge_update", + title="Refresh stale knowledge pages", + title_ar="تحديث صفحات المعرفة القديمة", + description=f"{int(m.value)} pages older than {int(m.threshold)} days.", + evidence=[f"{int(m.value)} pages not updated in 30+ days"], + impact="low", effort="trivial", + proposed_action="Flag stale pages for review and auto-mark as needs-update", + requires_approval=False, + rollback_action="Remove stale flags", + )) + elif "repeated_escalation" in m.name: + proposals.append(ImprovementProposal( + category="quality", + title=f"Automate resolution for {m.name.split(':')[-1]}", + title_ar=f"أتمتة الحل لتصعيد {m.name.split(':')[-1]}", + description=f"{int(m.value)} escalations/week exceeds threshold of {int(m.threshold)}.", + evidence=[f"{int(m.value)} weekly occurrences"], + impact="medium", effort="medium", + proposed_action="Add auto-consent-renewal reminder workflow before expiry", + requires_approval=True, + rollback_action="Disable auto-reminder workflow", + )) + elif "unverified_rate" in m.name: + proposals.append(ImprovementProposal( + category="quality", + title=f"Improve verification for agent {m.name.split(':')[-1]}", + title_ar=f"تحسين التحقق لوكيل {m.name.split(':')[-1]}", + description=f"Unverified rate {m.value:.1%} exceeds {m.threshold:.1%}.", + evidence=[f"{m.value:.1%} of tool calls unverified"], + impact="medium", effort="small", + proposed_action="Add post-execution verification step for this agent profile", + requires_approval=False, + rollback_action="Remove extra verification step", + )) + self._proposals.extend(proposals) + if len(self._proposals) > self._max_proposals: + self._proposals = self._proposals[-self._max_proposals:] + + logger.info("[SelfImprove] اقتراحات: %d", len(proposals)) return proposals - def _create_proposal(self, metric: Metric) -> Optional[ImprovementProposal]: - self._cycle_count += 1 - pid = f"IMP-{self._cycle_count:04d}" - - if metric.name == "knowledge_freshness" and metric.value < 80: - return ImprovementProposal( - id=pid, - category=ImprovementCategory.KNOWLEDGE_UPDATE, - title="Update stale wiki pages", - title_ar="تحديث صفحات الويكي القديمة", - description=f"Knowledge freshness at {metric.value}%, below 80% threshold", - evidence=[f"{metric.name}={metric.value}{metric.unit}"], - impact="medium", - effort="trivial", - proposed_action="Run knowledge_brain.lint() and update flagged pages", - requires_approval=False, - ) - if metric.name == "avg_workflow_cost" and metric.value > 0.50: - return ImprovementProposal( - id=pid, - category=ImprovementCategory.COST_REDUCTION, - title="Optimize expensive workflows", - title_ar="تحسين سير العمل المكلف", - description=f"Average workflow cost ${metric.value}, above $0.50 threshold", - evidence=[f"{metric.name}=${metric.value}"], - impact="high", - effort="medium", - proposed_action="Switch to Groq for classification tasks, reduce prompt tokens", - requires_approval=True, - ) - return None + # -- Phase 4: Verify --------------------------------------------------- async def verify(self, proposal: ImprovementProposal) -> bool: - if proposal.effort == "trivial" and not proposal.requires_approval: - return True - if proposal.category == ImprovementCategory.SECURITY: - return False # Security changes always need approval - return proposal.effort in ("trivial", "small") - - async def apply( - self, proposal_id: str, approved_by: str = None - ) -> bool: - proposal = next((p for p in self._proposals if p.id == proposal_id), None) - if not proposal: + """Can we test this safely? Does it have rollback?""" + if not proposal.rollback_action: + logger.warning("[SelfImprove] لا يوجد إجراء تراجع: %s", proposal.id) return False + if proposal.effort == "large": + logger.info("[SelfImprove] جهد كبير يتطلب مراجعة يدوية: %s", proposal.id) + return False + return True + + # -- Phase 5: Apply ---------------------------------------------------- + + async def apply(self, proposal: ImprovementProposal, approved_by: Optional[str] = None) -> bool: + """Apply only if approved (or trivial + auto-allowed). Log everything.""" + if proposal.status in ("applied", "rejected"): + logger.info("[SelfImprove] اقتراح بالفعل %s: %s", proposal.status, proposal.id) + return False + if proposal.requires_approval and not approved_by: - logger.warning(f"Proposal {proposal_id} requires approval") + logger.info("[SelfImprove] يتطلب موافقة: %s", proposal.id) return False - proposal.status = ImprovementStatus.APPLIED - proposal.applied_at = datetime.now(timezone.utc) + + safe = await self.verify(proposal) + if not safe: + proposal.status = "rejected" + proposal.test_result = "Failed safety verification" + logger.warning("[SelfImprove] رفض: %s -- فشل التحقق من السلامة", proposal.id) + return False + + proposal.status = "applied" proposal.approved_by = approved_by or "auto" - logger.info(f"Self-improvement applied: {proposal.title}") + proposal.applied_at = datetime.now(timezone.utc) + proposal.test_result = "Applied successfully" + + logger.info( + "[SelfImprove] تطبيق: %s cat=%s by=%s", + proposal.id, proposal.category, proposal.approved_by, + ) return True - async def reject(self, proposal_id: str, reason: str = "") -> bool: - proposal = next((p for p in self._proposals if p.id == proposal_id), None) - if not proposal: - return False - proposal.status = ImprovementStatus.REJECTED - logger.info(f"Self-improvement rejected: {proposal.title} — {reason}") - return True + # -- Approval ---------------------------------------------------------- - async def run_cycle(self, tenant_id: str = None) -> CycleResult: - started_at = datetime.now(timezone.utc) + async def approve(self, proposal_id: str, user_id: str) -> Optional[ImprovementProposal]: + """Approve a pending proposal.""" + for p in self._proposals: + if p.id == proposal_id and p.status == "proposed": + p.status = "approved" + p.approved_by = user_id + await self.apply(p, approved_by=user_id) + return p + return None + + async def reject(self, proposal_id: str, user_id: str) -> Optional[ImprovementProposal]: + """Reject a pending proposal.""" + for p in self._proposals: + if p.id == proposal_id and p.status in ("proposed", "approved"): + p.status = "rejected" + p.approved_by = user_id + return p + return None + + # -- Full cycle -------------------------------------------------------- + + async def run_cycle(self, tenant_id: Optional[str] = None) -> CycleResult: + """Full cycle: inspect -> measure -> propose -> verify -> approve -> apply -> report.""" + start = datetime.now(timezone.utc) + + # 1. Inspect inspection = await self.inspect(tenant_id) + + # 2. Measure metrics = await self.measure(inspection) + + # 3. Propose proposals = await self.propose(metrics) + # 4+5. Auto-apply trivial proposals (up to MAX_AUTO_APPLY) auto_applied = 0 - for proposal in proposals: - can_verify = await self.verify(proposal) - if can_verify and not proposal.requires_approval: - if auto_applied < self.MAX_AUTO_APPLY: - await self.apply(proposal.id) + for p in proposals: + if auto_applied >= self.MAX_AUTO_APPLY: + break + if not p.requires_approval and p.effort == "trivial": + if await self.apply(p, approved_by="auto"): auto_applied += 1 - awaiting = sum( - 1 for p in proposals - if p.status == ImprovementStatus.PROPOSED + pending = sum(1 for p in proposals if p.status == "proposed") + + now = datetime.now(timezone.utc) + cycle = CycleResult( + tenant_id=tenant_id or "global", + started_at=start, + completed_at=now, + duration_ms=int((now - start).total_seconds() * 1000), + issues_found=inspection["count"], + metrics_measured=len(metrics), + proposals_generated=len(proposals), + proposals_auto_applied=auto_applied, + proposals_pending=pending, + summary=( + f"Found {inspection['count']} issues, measured {len(metrics)} metrics, " + f"generated {len(proposals)} proposals, auto-applied {auto_applied}" + ), + summary_ar=( + f"اكتشاف {inspection['count']} مشاكل، قياس {len(metrics)} مقاييس، " + f"إنشاء {len(proposals)} مقترحات، تطبيق تلقائي {auto_applied}" + ), ) - summary = ( - f"Cycle complete: {len(metrics)} metrics, {len(proposals)} proposals, " - f"{auto_applied} auto-applied, {awaiting} awaiting approval" - ) - summary_ar = ( - f"اكتملت الدورة: {len(metrics)} مقاييس، {len(proposals)} مقترحات، " - f"{auto_applied} تطبيق تلقائي، {awaiting} بانتظار الموافقة" - ) + self._cycles.append(cycle) + if len(self._cycles) > self._max_cycles: + self._cycles = self._cycles[-self._max_cycles:] - return CycleResult( - cycle_id=f"CYCLE-{self._cycle_count}", - inspected_areas=list(inspection.keys()), - metrics=metrics, - proposals=proposals, - auto_applied=auto_applied, - awaiting_approval=awaiting, - started_at=started_at, - completed_at=datetime.now(timezone.utc), - summary=summary, - summary_ar=summary_ar, + logger.info( + "[SelfImprove] دورة اكتملت: issues=%d proposals=%d auto=%d pending=%d %dms", + inspection["count"], len(proposals), auto_applied, pending, cycle.duration_ms, ) + return cycle - async def get_proposals( - self, status: ImprovementStatus = None - ) -> list[ImprovementProposal]: + # -- Reporting --------------------------------------------------------- + + async def report(self) -> dict[str, Any]: + """Summary: proposals made, applied, rejected, impact measured.""" + by_status: dict[str, int] = defaultdict(int) + by_category: dict[str, int] = defaultdict(int) + for p in self._proposals: + by_status[p.status] += 1 + by_category[p.category] += 1 + + return { + "total_proposals": len(self._proposals), + "by_status": dict(by_status), + "by_category": dict(by_category), + "total_cycles": len(self._cycles), + "last_cycle": self._cycles[-1].model_dump(mode="json") if self._cycles else None, + "message_ar": ( + f"مقترحات: {len(self._proposals)}، " + f"مطبقة: {by_status.get('applied', 0)}، " + f"مرفوضة: {by_status.get('rejected', 0)}، " + f"معلقة: {by_status.get('proposed', 0)}" + ), + } + + def list_proposals(self, status: Optional[str] = None) -> list[ImprovementProposal]: + """List all proposals, optionally filtered by status.""" if status: return [p for p in self._proposals if p.status == status] - return self._proposals + return list(self._proposals) - async def get_metrics_history(self) -> list[dict]: - return self._metrics_history + def get_proposal(self, proposal_id: str) -> Optional[ImprovementProposal]: + """Get a single proposal by ID.""" + return next((p for p in self._proposals if p.id == proposal_id), None) -self_improvement = SelfImprovementEngine() +# --------------------------------------------------------------------------- +# Module-level singleton +# --------------------------------------------------------------------------- + +self_improvement_engine = SelfImprovementEngine()