diff --git a/salesflow-saas/backend/app/api/v1/hermes.py b/salesflow-saas/backend/app/api/v1/hermes.py index c75e9e05..2de00ff1 100644 --- a/salesflow-saas/backend/app/api/v1/hermes.py +++ b/salesflow-saas/backend/app/api/v1/hermes.py @@ -1,214 +1,285 @@ """ -Hermes API — Dealix AI Revenue OS -Top-level orchestration, profiles, cost, health, improvements, security. +Hermes API -- Dealix AI Revenue OS -- واجهة هيرمس البرمجية +Orchestrator endpoints: execute tasks, manage profiles, view costs, +run security scans, approve improvements, and generate executive summaries. """ -import logging -from typing import Optional +from __future__ import annotations -from fastapi import APIRouter, Depends, HTTPException, status -from pydantic import BaseModel +import logging +from typing import Any, Dict, List, Optional + +from fastapi import APIRouter, HTTPException +from fastapi.responses import JSONResponse +from pydantic import BaseModel, Field + +from app.services.hermes_orchestrator import hermes_orchestrator, HermesProfile +from app.services.execution_router import execution_router, TaskClass +from app.services.shannon_security import shannon_security, ShannonScope +from app.services.self_improvement import self_improvement_engine +from app.services.observability import observability_service logger = logging.getLogger(__name__) -router = APIRouter(prefix="/hermes", tags=["hermes"]) +router = APIRouter(prefix="/hermes", tags=["Hermes Orchestrator"]) + + +# --------------------------------------------------------------------------- +# Request / response schemas +# --------------------------------------------------------------------------- class ExecuteRequest(BaseModel): + profile_id: str = "founder" + task: str + params: Dict[str, Any] = Field(default_factory=dict) + user_context: Dict[str, Any] = Field(default_factory=lambda: { + "user_id": "api_user", "tenant_id": "default", "role": "owner", + }) + + +class ExecuteResponse(BaseModel): + run_id: str profile_id: str task: str - params: dict = {} - - -class ApproveRequest(BaseModel): - approved_by: str + status: str + backend: str = "" + data: Dict[str, Any] = {} + evidence: List[str] = [] + receipt_id: Optional[str] = None + cost_usd: float = 0.0 + duration_ms: int = 0 + error: Optional[str] = None + message_ar: str = "" class ScanRequest(BaseModel): - environment: str - base_url: str - scopes: list[str] = [] + environment: str = "staging" + scopes: List[str] = Field( + default_factory=lambda: ["auth", "injection", "pdpl"], + ) + base_url: str = "https://staging.dealix.sa" -# ── Execute ──────────────────────────────────────── +class ApproveRequest(BaseModel): + user_id: str = "founder" -@router.post("/execute") -async def execute_task(req: ExecuteRequest): - from app.services.hermes_orchestrator import hermes - try: - result = await hermes.execute( - profile_id=req.profile_id, - task=req.task, - params=req.params, - ) - return result - except ValueError as e: - raise HTTPException(status_code=400, detail=str(e)) - except PermissionError as e: - raise HTTPException(status_code=403, detail=str(e)) +# --------------------------------------------------------------------------- +# Execute +# --------------------------------------------------------------------------- + +@router.post("/execute", response_model=ExecuteResponse) +async def execute_task(req: ExecuteRequest) -> ExecuteResponse: + """Execute a task via the Hermes orchestrator.""" + result = await hermes_orchestrator.execute( + profile_id=req.profile_id, + task=req.task, + params=req.params, + user_context=req.user_context, + ) + return ExecuteResponse(**result.model_dump()) -# ── Profiles ─────────────────────────────────────── - +# --------------------------------------------------------------------------- +# Profiles +# --------------------------------------------------------------------------- @router.get("/profiles") -async def list_profiles(): - from app.services.hermes_orchestrator import hermes - profiles = hermes.list_profiles() - return {"profiles": [p.model_dump() if hasattr(p, 'model_dump') else p.__dict__ for p in profiles]} +async def list_profiles() -> JSONResponse: + """List all available Hermes profiles.""" + profiles = await hermes_orchestrator.list_profiles() + return JSONResponse(content={ + "profiles": [p.model_dump() for p in profiles], + "count": len(profiles), + "message_ar": f"{len(profiles)} ملفات شخصية متاحة", + }) @router.get("/profiles/{profile_id}") -async def get_profile(profile_id: str): - from app.services.hermes_orchestrator import hermes - profile = hermes.get_profile(profile_id) +async def get_profile(profile_id: str) -> JSONResponse: + """Get details for a specific profile.""" + profile = await hermes_orchestrator.get_profile(profile_id) if not profile: - raise HTTPException(status_code=404, detail="الملف الشخصي غير موجود") - return profile.model_dump() if hasattr(profile, 'model_dump') else profile.__dict__ + raise HTTPException(status_code=404, detail=f"الملف الشخصي غير موجود: {profile_id}") + return JSONResponse(content=profile.model_dump()) -# ── Cost & Health ────────────────────────────────── - +# --------------------------------------------------------------------------- +# Cost +# --------------------------------------------------------------------------- @router.get("/cost") -async def cost_report(period: str = "daily", profile: Optional[str] = None): - from app.services.observability import observability - return await observability.get_cost_report(period, profile) +async def cost_report(period: str = "daily") -> JSONResponse: + """Get cost report from the orchestrator and observability service.""" + hermes_cost = await hermes_orchestrator.get_cost_report(period) + obs_cost = await observability_service.get_cost_report(period) + return JSONResponse(content={ + "hermes": hermes_cost, + "observability": obs_cost, + }) +# --------------------------------------------------------------------------- +# Health +# --------------------------------------------------------------------------- + @router.get("/health") -async def health_report(): - from app.services.observability import observability - return await observability.get_health_report() +async def health_report() -> JSONResponse: + """System health report across all backends and workflows.""" + obs_health = await observability_service.get_health_report() + backend_health = await execution_router.get_backend_health() + anomalies = await observability_service.detect_anomalies() + return JSONResponse(content={ + "system": obs_health, + "backends": {k: v.model_dump() for k, v in backend_health.items()}, + "anomalies": [a.model_dump(mode="json") for a in anomalies], + "anomaly_count": len(anomalies), + }) -@router.get("/performance") -async def performance_report(period: str = "daily"): - from app.services.observability import observability - return await observability.get_performance_report(period) - - -@router.get("/anomalies") -async def detect_anomalies(): - from app.services.observability import observability - return {"anomalies": await observability.detect_anomalies()} - - -@router.get("/executive-summary") -async def executive_summary(period: str = "weekly"): - from app.services.observability import observability - summary = await observability.get_executive_summary(period) - return {"summary": summary} - - -# ── Runs ─────────────────────────────────────────── - +# --------------------------------------------------------------------------- +# Runs +# --------------------------------------------------------------------------- @router.get("/runs") -async def list_active_runs(): - from app.services.hermes_orchestrator import hermes - return {"runs": hermes.get_active_runs()} +async def list_active_runs() -> JSONResponse: + """List currently active runs.""" + runs = await hermes_orchestrator.get_active_runs() + return JSONResponse(content={ + "active_runs": [r.model_dump(mode="json") for r in runs], + "count": len(runs), + "message_ar": f"{len(runs)} عمليات نشطة حالياً", + }) @router.post("/runs/{run_id}/abort") -async def abort_run(run_id: str): - from app.services.hermes_orchestrator import hermes - success = hermes.abort_run(run_id) +async def abort_run(run_id: str) -> JSONResponse: + """Abort an active run.""" + success = await hermes_orchestrator.abort_run(run_id) if not success: - raise HTTPException(status_code=404, detail="التشغيل غير موجود") - return {"status": "aborted", "run_id": run_id} + raise HTTPException(status_code=404, detail=f"التشغيل غير موجود أو اكتمل بالفعل: {run_id}") + return JSONResponse(content={ + "run_id": run_id, + "status": "aborted", + "message_ar": f"تم إلغاء التشغيل: {run_id}", + }) -# ── Self-Improvement ────────────────────────────── - +# --------------------------------------------------------------------------- +# Self-improvement +# --------------------------------------------------------------------------- @router.get("/improvements") -async def list_improvements(status_filter: Optional[str] = None): - from app.services.self_improvement import self_improvement, ImprovementStatus - if status_filter: - try: - s = ImprovementStatus(status_filter) - except ValueError: - raise HTTPException(status_code=400, detail="حالة غير صالحة") - proposals = await self_improvement.get_proposals(s) - else: - proposals = await self_improvement.get_proposals() - return {"proposals": [p.model_dump() for p in proposals]} - - -@router.post("/improvements/cycle") -async def run_improvement_cycle(): - from app.services.self_improvement import self_improvement - result = await self_improvement.run_cycle() - return result.model_dump() +async def list_improvements(status: Optional[str] = None) -> JSONResponse: + """List self-improvement proposals.""" + proposals = self_improvement_engine.list_proposals(status) + report = await self_improvement_engine.report() + return JSONResponse(content={ + "proposals": [p.model_dump(mode="json") for p in proposals], + "count": len(proposals), + "summary": report, + }) @router.post("/improvements/{proposal_id}/approve") -async def approve_improvement(proposal_id: str, req: ApproveRequest): - from app.services.self_improvement import self_improvement - success = await self_improvement.apply(proposal_id, req.approved_by) - if not success: - raise HTTPException(status_code=404, detail="المقترح غير موجود أو يحتاج موافقة") - return {"status": "approved", "proposal_id": proposal_id} +async def approve_improvement(proposal_id: str, req: ApproveRequest) -> JSONResponse: + """Approve a self-improvement proposal.""" + proposal = await self_improvement_engine.approve(proposal_id, req.user_id) + if not proposal: + raise HTTPException( + status_code=404, + detail=f"المقترح غير موجود أو ليس في حالة 'مقترح': {proposal_id}", + ) + return JSONResponse(content={ + "proposal_id": proposal.id, + "status": proposal.status, + "approved_by": proposal.approved_by, + "message_ar": f"تمت الموافقة على المقترح: {proposal.title_ar}", + }) -@router.post("/improvements/{proposal_id}/reject") -async def reject_improvement(proposal_id: str): - from app.services.self_improvement import self_improvement - success = await self_improvement.reject(proposal_id) - if not success: - raise HTTPException(status_code=404, detail="المقترح غير موجود") - return {"status": "rejected", "proposal_id": proposal_id} +@router.post("/improvements/cycle") +async def run_improvement_cycle(tenant_id: Optional[str] = None) -> JSONResponse: + """Trigger a self-improvement cycle.""" + cycle = await self_improvement_engine.run_cycle(tenant_id) + return JSONResponse(content=cycle.model_dump(mode="json")) -# ── Security (Shannon) ──────────────────────────── - +# --------------------------------------------------------------------------- +# Shannon security +# --------------------------------------------------------------------------- @router.get("/security/report") -async def security_report(severity: Optional[str] = None): - from app.services.shannon_security import shannon - findings = shannon.get_all_findings(severity) - should_block = await shannon.should_block_release() - return { - "findings": [f.model_dump() for f in findings], - "total": len(findings), - "should_block_release": should_block, - } +async def latest_security_report() -> JSONResponse: + """Get the latest Shannon security scan report.""" + report = shannon_security.get_latest_report() + if not report: + return JSONResponse(content={ + "report": None, + "message_ar": "لا يوجد تقرير أمني بعد. قم بتشغيل فحص أولاً.", + }) + return JSONResponse(content=report.model_dump(mode="json")) @router.post("/security/scan") -async def trigger_security_scan(req: ScanRequest): - from app.services.shannon_security import shannon, ShannonScope - try: - scopes = [ShannonScope(s) for s in req.scopes] if req.scopes else None - report = await shannon.run_scan( - environment=req.environment, - base_url=req.base_url, - scopes=scopes, +async def trigger_security_scan(req: ScanRequest) -> JSONResponse: + """Trigger a Shannon security scan (staging only).""" + scopes = [] + for s in req.scopes: + try: + scopes.append(ShannonScope(s)) + except ValueError: + raise HTTPException( + status_code=400, + detail=f"نطاق غير صالح: {s}. المتاحة: {[x.value for x in ShannonScope]}", + ) + + report = await shannon_security.run_scan( + environment=req.environment, + scopes=scopes, + base_url=req.base_url, + ) + return JSONResponse(content=report.model_dump(mode="json")) + + +# --------------------------------------------------------------------------- +# Executive summary +# --------------------------------------------------------------------------- + +@router.get("/executive-summary") +async def executive_summary(period: str = "weekly") -> JSONResponse: + """Arabic executive summary for leadership.""" + summary = await observability_service.get_executive_summary(period) + improvement_report = await self_improvement_engine.report() + security_report = shannon_security.get_latest_report() + + full_summary = summary + if improvement_report.get("total_proposals", 0) > 0: + applied = improvement_report.get("by_status", {}).get("applied", 0) + pending = improvement_report.get("by_status", {}).get("proposed", 0) + full_summary += f"، {applied} تحسين مطبق، {pending} تحسين معلق" + + if security_report: + full_summary += ( + f"، آخر فحص أمني: {security_report.critical_count} حرجة، " + f"{security_report.high_count} عالية" ) - return report.model_dump() - except PermissionError as e: - raise HTTPException( - status_code=403, - detail=f"محظور: {str(e)}" - ) - except ValueError as e: - raise HTTPException(status_code=400, detail=str(e)) + else: + full_summary += "، لا يوجد فحص أمني بعد" + + return JSONResponse(content={ + "summary_ar": full_summary, + "period": period, + "improvements": improvement_report, + "security_status": security_report.model_dump(mode="json") if security_report else None, + }) -# ── Session Continuity ──────────────────────────── +# --------------------------------------------------------------------------- +# Routing stats +# --------------------------------------------------------------------------- - -@router.get("/session/restore") -async def restore_session(): - from app.services.session_continuity import session_continuity - prompt = await session_continuity.get_restore_prompt() - return {"restore_prompt": prompt} - - -@router.get("/session/state") -async def get_session_state(): - from app.services.session_continuity import session_continuity - state = await session_continuity.restore_state() - if not state: - return {"state": None} - return {"state": state.model_dump() if hasattr(state, 'model_dump') else state.__dict__} +@router.get("/routing-stats") +async def routing_stats() -> JSONResponse: + """Execution routing statistics.""" + stats = await execution_router.get_routing_stats() + return JSONResponse(content=stats) diff --git a/salesflow-saas/backend/app/api/v1/router.py b/salesflow-saas/backend/app/api/v1/router.py index 847c2b73..4ce42b05 100644 --- a/salesflow-saas/backend/app/api/v1/router.py +++ b/salesflow-saas/backend/app/api/v1/router.py @@ -16,6 +16,7 @@ from app.api.v1 import lead_prospector as prospector_router from app.api.v1 import pipeline as pipeline_router from app.api.v1 import agent_system as agent_system_router from app.api.v1 import autonomous_foundation as autonomous_foundation_router +from app.api.v1 import hermes as hermes_router from app.api.v1 import marketing_hub as marketing_hub_router from app.api.v1 import strategy_summary as strategy_summary_router from app.api.v1 import value_proposition as value_proposition_router @@ -86,3 +87,6 @@ api_router.include_router(pipeline_router.router) # ── 22-Agent AI System — Full Empire Control ───────────────── api_router.include_router(agent_system_router.router) api_router.include_router(autonomous_foundation_router.router) + +# ── Hermes Fusion — Orchestration Layer ────────────────────── +api_router.include_router(hermes_router.router) diff --git a/salesflow-saas/backend/app/services/observability.py b/salesflow-saas/backend/app/services/observability.py index 4c0946dc..175dfd09 100644 --- a/salesflow-saas/backend/app/services/observability.py +++ b/salesflow-saas/backend/app/services/observability.py @@ -1,17 +1,27 @@ """ -Observability Service — Dealix AI Revenue OS -Cost tracking, workflow metrics, health monitoring, and Arabic executive summaries. +Observability Service -- Dealix AI Revenue OS -- خدمة المراقبة +Track cost, performance, and health across all agent workflows. +Anomaly detection, executive summaries in Arabic. """ +from __future__ import annotations + import logging -from datetime import datetime, timezone, timedelta -from typing import Optional +import statistics +from collections import defaultdict +from datetime import datetime, timedelta, timezone +from typing import Any, Optional from pydantic import BaseModel, Field logger = logging.getLogger(__name__) +# --------------------------------------------------------------------------- +# Models +# --------------------------------------------------------------------------- + class WorkflowMetric(BaseModel): + """Single workflow execution metric.""" workflow_name: str profile_id: str backend: str @@ -23,170 +33,313 @@ class WorkflowMetric(BaseModel): error: Optional[str] = None +class AnomalyAlert(BaseModel): + """Detected anomaly.""" + id: str = "" + anomaly_type: str # cost_spike, failure_spike, latency_spike, regression + description: str + description_ar: str + severity: str = "medium" # critical, high, medium, low + metric_name: str = "" + current_value: float = 0.0 + baseline_value: float = 0.0 + deviation_pct: float = 0.0 + detected_at: datetime = Field(default_factory=lambda: datetime.now(timezone.utc)) + + +# --------------------------------------------------------------------------- +# Service +# --------------------------------------------------------------------------- + class ObservabilityService: """Track cost, performance, and health across all agent workflows.""" - def __init__(self): + def __init__(self) -> None: self._metrics: list[WorkflowMetric] = [] - self._max_metrics = 50000 + self._max_metrics = 50_000 + self._anomalies: list[AnomalyAlert] = [] + self._max_anomalies = 1_000 + logger.info("خدمة المراقبة: تم التهيئة") + + # -- Recording --------------------------------------------------------- async def record_workflow(self, metric: WorkflowMetric) -> None: + """Store a workflow execution metric.""" self._metrics.append(metric) if len(self._metrics) > self._max_metrics: self._metrics = self._metrics[-self._max_metrics:] logger.debug( - f"Recorded: {metric.workflow_name} " - f"cost=${metric.estimated_cost_usd:.4f} " - f"{'OK' if metric.success else 'FAIL'}" + "[Obs] سجل: %s profile=%s backend=%s %dms $%.4f %s", + metric.workflow_name, metric.profile_id, metric.backend, + metric.duration_ms, metric.estimated_cost_usd, + "OK" if metric.success else "FAIL", ) - def _filter_by_period( - self, period: str, metrics: list[WorkflowMetric] = None - ) -> list[WorkflowMetric]: - source = metrics or self._metrics - now = datetime.now(timezone.utc) - if period == "hourly": - cutoff = now - timedelta(hours=1) - elif period == "daily": - cutoff = now - timedelta(days=1) - elif period == "weekly": - cutoff = now - timedelta(weeks=1) - elif period == "monthly": - cutoff = now - timedelta(days=30) - else: - cutoff = now - timedelta(days=1) - return [m for m in source if m.started_at >= cutoff] + # -- Cost report ------------------------------------------------------- async def get_cost_report( - self, period: str = "daily", profile: str = None - ) -> dict: - filtered = self._filter_by_period(period) - if profile: - filtered = [m for m in filtered if m.profile_id == profile] + self, period: str = "daily", profile: Optional[str] = None, + ) -> dict[str, Any]: + """Total cost, cost by profile, cost by backend, cost by workflow.""" + cutoff = self._period_cutoff(period) + filtered = [ + m for m in self._metrics + if m.started_at >= cutoff and (profile is None or m.profile_id == profile) + ] - total_cost = sum(m.estimated_cost_usd for m in filtered) - by_profile: dict[str, float] = {} - by_backend: dict[str, float] = {} - by_workflow: dict[str, float] = {} + total = sum(m.estimated_cost_usd for m in filtered) + by_profile: dict[str, float] = defaultdict(float) + by_backend: dict[str, float] = defaultdict(float) + by_workflow: dict[str, float] = defaultdict(float) for m in filtered: - by_profile[m.profile_id] = by_profile.get(m.profile_id, 0) + m.estimated_cost_usd - by_backend[m.backend] = by_backend.get(m.backend, 0) + m.estimated_cost_usd - by_workflow[m.workflow_name] = by_workflow.get(m.workflow_name, 0) + m.estimated_cost_usd + by_profile[m.profile_id] += m.estimated_cost_usd + by_backend[m.backend] += m.estimated_cost_usd + by_workflow[m.workflow_name] += m.estimated_cost_usd - top_expensive = sorted(by_workflow.items(), key=lambda x: x[1], reverse=True)[:5] + # Sort by cost descending + top_workflows = sorted(by_workflow.items(), key=lambda x: -x[1])[:10] return { "period": period, - "total_cost_usd": round(total_cost, 4), - "total_workflows": len(filtered), + "total_usd": round(total, 4), "by_profile": {k: round(v, 4) for k, v in by_profile.items()}, "by_backend": {k: round(v, 4) for k, v in by_backend.items()}, - "top_expensive": [{"name": k, "cost": round(v, 4)} for k, v in top_expensive], + "top_workflows": [{"name": n, "cost_usd": round(c, 4)} for n, c in top_workflows], + "total_executions": len(filtered), + "message_ar": f"التكلفة الإجمالية ({period}): ${total:.2f} عبر {len(filtered)} عملية", } - async def get_performance_report(self, period: str = "daily") -> dict: - filtered = self._filter_by_period(period) + # -- Performance report ------------------------------------------------ + + async def get_performance_report(self, period: str = "daily") -> dict[str, Any]: + """Average duration, P95 duration, success rate, error rate.""" + cutoff = self._period_cutoff(period) + filtered = [m for m in self._metrics if m.started_at >= cutoff] + if not filtered: - return {"period": period, "total": 0} + return { + "period": period, "total_executions": 0, + "message_ar": "لا توجد بيانات لهذه الفترة", + } durations = [m.duration_ms for m in filtered] - durations.sort() - total = len(durations) - success_count = sum(1 for m in filtered if m.success) + successes = sum(1 for m in filtered if m.success) + failures = len(filtered) - successes - p95_idx = min(int(total * 0.95), total - 1) - errors = [m for m in filtered if not m.success] + avg_ms = statistics.mean(durations) + p95_ms = sorted(durations)[int(len(durations) * 0.95)] if len(durations) >= 20 else max(durations) + success_rate = successes / len(filtered) + + # Slowest workflows + by_wf: dict[str, list[int]] = defaultdict(list) + for m in filtered: + by_wf[m.workflow_name].append(m.duration_ms) + slowest = sorted( + [(wf, statistics.mean(ds)) for wf, ds in by_wf.items()], + key=lambda x: -x[1], + )[:5] + + # Most expensive + cost_wf: dict[str, float] = defaultdict(float) + for m in filtered: + cost_wf[m.workflow_name] += m.estimated_cost_usd + most_expensive = sorted(cost_wf.items(), key=lambda x: -x[1])[:5] return { "period": period, - "total_workflows": total, - "success_rate": round(success_count / total * 100, 1) if total else 0, - "avg_duration_ms": round(sum(durations) / total) if total else 0, - "p95_duration_ms": durations[p95_idx] if durations else 0, - "error_count": len(errors), - "error_rate": round(len(errors) / total * 100, 1) if total else 0, - "recent_errors": [ - {"workflow": e.workflow_name, "error": e.error, "at": e.started_at.isoformat()} - for e in errors[-5:] - ], + "total_executions": len(filtered), + "avg_duration_ms": round(avg_ms, 2), + "p95_duration_ms": p95_ms, + "success_rate": round(success_rate, 4), + "error_rate": round(1 - success_rate, 4), + "total_successes": successes, + "total_failures": failures, + "slowest_workflows": [{"name": n, "avg_ms": round(d, 2)} for n, d in slowest], + "most_expensive": [{"name": n, "cost_usd": round(c, 4)} for n, c in most_expensive], + "message_ar": ( + f"أداء ({period}): {len(filtered)} عملية، " + f"متوسط {avg_ms:.0f}ms، نجاح {success_rate:.0%}، فشل {failures}" + ), } - async def get_health_report(self) -> dict: - daily = self._filter_by_period("daily") - total = len(daily) - success = sum(1 for m in daily if m.success) + # -- Health report ----------------------------------------------------- - backends_used = set(m.backend for m in daily) + async def get_health_report(self) -> dict[str, Any]: + """Backend health, skill health, memory health, knowledge health, trust health.""" + recent = [m for m in self._metrics if m.started_at >= self._period_cutoff("daily")] + + # Backend health + backend_calls: dict[str, dict[str, int]] = defaultdict(lambda: {"ok": 0, "fail": 0}) + for m in recent: + backend_calls[m.backend]["ok" if m.success else "fail"] += 1 backend_health = {} - for b in backends_used: - b_metrics = [m for m in daily if m.backend == b] - b_success = sum(1 for m in b_metrics if m.success) + for b, counts in backend_calls.items(): + total = counts["ok"] + counts["fail"] + rate = counts["ok"] / total if total else 1.0 backend_health[b] = { - "total": len(b_metrics), - "success_rate": round(b_success / len(b_metrics) * 100, 1) if b_metrics else 0, - "avg_duration_ms": round( - sum(m.duration_ms for m in b_metrics) / len(b_metrics) - ) if b_metrics else 0, + "total": total, "success_rate": round(rate, 4), + "healthy": rate >= 0.7 or total < 5, } + # Skill health (by workflow) + skill_calls: dict[str, dict[str, int]] = defaultdict(lambda: {"ok": 0, "fail": 0}) + for m in recent: + skill_calls[m.workflow_name]["ok" if m.success else "fail"] += 1 + skill_health = {} + for s, counts in skill_calls.items(): + total = counts["ok"] + counts["fail"] + rate = counts["ok"] / total if total else 1.0 + skill_health[s] = {"total": total, "success_rate": round(rate, 4)} + + # Overall + total = len(recent) + overall_ok = sum(1 for m in recent if m.success) + overall_rate = overall_ok / total if total else 1.0 + healthy = overall_rate >= 0.8 + return { - "overall_health": "healthy" if (total == 0 or success / total > 0.9) else "degraded", - "workflows_today": total, - "success_rate": round(success / total * 100, 1) if total else 100, - "total_cost_today_usd": round(sum(m.estimated_cost_usd for m in daily), 4), - "backends": backend_health, + "overall_healthy": healthy, + "overall_success_rate": round(overall_rate, 4), + "total_today": total, + "backend_health": backend_health, + "skill_health": skill_health, + "anomalies_today": len([ + a for a in self._anomalies + if a.detected_at >= self._period_cutoff("daily") + ]), + "message_ar": ( + f"صحة النظام: {'سليم' if healthy else 'يحتاج انتباه'} -- " + f"نجاح {overall_rate:.0%}، {total} عملية اليوم" + ), } - async def get_executive_summary(self, period: str = "weekly") -> str: - filtered = self._filter_by_period(period) - total = len(filtered) - success = sum(1 for m in filtered if m.success) - cost = sum(m.estimated_cost_usd for m in filtered) - success_rate = round(success / total * 100) if total else 100 + # -- Executive summary ------------------------------------------------- - period_ar = {"daily": "اليوم", "weekly": "هذا الأسبوع", "monthly": "هذا الشهر"}.get(period, period) + async def get_executive_summary(self, period: str = "weekly") -> str: + """Arabic executive summary for leadership.""" + cutoff = self._period_cutoff(period) + filtered = [m for m in self._metrics if m.started_at >= cutoff] + + total = len(filtered) + successes = sum(1 for m in filtered if m.success) + rate = successes / total if total else 0 + cost = sum(m.estimated_cost_usd for m in filtered) + anomalies = [a for a in self._anomalies if a.detected_at >= cutoff] + critical = sum(1 for a in anomalies if a.severity == "critical") + + period_ar = { + "daily": "اليوم", "weekly": "هذا الأسبوع", + "monthly": "هذا الشهر", + }.get(period, period) summary = ( - f"📊 ملخص {period_ar}:\n" - f"• {total} مهمة منفذة\n" - f"• {success_rate}% نسبة النجاح\n" - f"• ${cost:.2f} التكلفة الإجمالية\n" + f"{period_ar}: {total} مهمة منفذة، " + f"{rate:.0%} نجاح، " + f"تكلفة ${cost:.2f}، " + f"{len(anomalies)} تنبيه" ) - - errors = [m for m in filtered if not m.success] - if errors: - summary += f"• {len(errors)} خطأ يحتاج مراجعة\n" + if critical: + summary += f"، {critical} مشاكل حرجة تحتاج تدخل فوري" else: - summary += "• لا أخطاء حرجة ✅\n" + summary += "، 0 مشاكل حرجة" return summary - async def detect_anomalies(self) -> list[dict]: - anomalies = [] - hourly = self._filter_by_period("hourly") - daily = self._filter_by_period("daily") + # -- Anomaly detection ------------------------------------------------- - if hourly: - hourly_cost = sum(m.estimated_cost_usd for m in hourly) - if hourly_cost > 5.0: - anomalies.append({ - "type": "cost_spike", - "severity": "high", - "message": f"تكلفة الساعة الأخيرة ${hourly_cost:.2f} — أعلى من الحد الطبيعي", - "value": hourly_cost, - }) + async def detect_anomalies(self) -> list[AnomalyAlert]: + """Sudden cost spikes, unusual failure patterns, backend degradation.""" + alerts: list[AnomalyAlert] = [] + now = datetime.now(timezone.utc) + today = [m for m in self._metrics if m.started_at >= now - timedelta(hours=24)] + prev_week = [ + m for m in self._metrics + if now - timedelta(days=8) <= m.started_at < now - timedelta(days=1) + ] - hourly_errors = sum(1 for m in hourly if not m.success) - if len(hourly) > 5 and hourly_errors / len(hourly) > 0.3: - anomalies.append({ - "type": "error_spike", - "severity": "critical", - "message": f"معدل أخطاء مرتفع: {hourly_errors}/{len(hourly)} في الساعة الأخيرة", - "value": hourly_errors, - }) + if not today or not prev_week: + return alerts - return anomalies + # Cost spike detection + today_cost = sum(m.estimated_cost_usd for m in today) + avg_daily_cost = sum(m.estimated_cost_usd for m in prev_week) / 7 + if avg_daily_cost > 0 and today_cost > avg_daily_cost * 2: + deviation = ((today_cost - avg_daily_cost) / avg_daily_cost) * 100 + alerts.append(AnomalyAlert( + id=f"cost-{now.strftime('%Y%m%d')}", + anomaly_type="cost_spike", + description=f"Today's cost ${today_cost:.2f} is {deviation:.0f}% above daily avg ${avg_daily_cost:.2f}", + description_ar=f"تكلفة اليوم ${today_cost:.2f} أعلى بنسبة {deviation:.0f}% من المتوسط ${avg_daily_cost:.2f}", + severity="high" if deviation > 200 else "medium", + metric_name="daily_cost_usd", + current_value=today_cost, + baseline_value=avg_daily_cost, + deviation_pct=round(deviation, 2), + )) + + # Failure rate spike + today_fail_rate = sum(1 for m in today if not m.success) / max(len(today), 1) + prev_fail_rate = sum(1 for m in prev_week if not m.success) / max(len(prev_week), 1) + if today_fail_rate > 0.15 and today_fail_rate > prev_fail_rate * 2: + alerts.append(AnomalyAlert( + id=f"fail-{now.strftime('%Y%m%d')}", + anomaly_type="failure_spike", + description=f"Failure rate {today_fail_rate:.1%} vs baseline {prev_fail_rate:.1%}", + description_ar=f"معدل الفشل {today_fail_rate:.1%} مقابل الخط الأساسي {prev_fail_rate:.1%}", + severity="critical" if today_fail_rate > 0.3 else "high", + metric_name="failure_rate", + current_value=today_fail_rate, + baseline_value=prev_fail_rate, + deviation_pct=round(((today_fail_rate - prev_fail_rate) / max(prev_fail_rate, 0.01)) * 100, 2), + )) + + # Latency spike (per backend) + for backend in {"claude", "openclaude", "goose", "internal"}: + today_b = [m.duration_ms for m in today if m.backend == backend] + prev_b = [m.duration_ms for m in prev_week if m.backend == backend] + if len(today_b) >= 5 and len(prev_b) >= 5: + today_avg = statistics.mean(today_b) + prev_avg = statistics.mean(prev_b) + if prev_avg > 0 and today_avg > prev_avg * 3: + deviation = ((today_avg - prev_avg) / prev_avg) * 100 + alerts.append(AnomalyAlert( + id=f"latency-{backend}-{now.strftime('%Y%m%d')}", + anomaly_type="latency_spike", + description=f"{backend} latency {today_avg:.0f}ms vs baseline {prev_avg:.0f}ms", + description_ar=f"زمن استجابة {backend}: {today_avg:.0f}ms مقابل {prev_avg:.0f}ms", + severity="high", + metric_name=f"latency_{backend}", + current_value=today_avg, + baseline_value=prev_avg, + deviation_pct=round(deviation, 2), + )) + + self._anomalies.extend(alerts) + if len(self._anomalies) > self._max_anomalies: + self._anomalies = self._anomalies[-self._max_anomalies:] + + if alerts: + logger.warning("[Obs] %d anomalies detected", len(alerts)) + + return alerts + + # -- Helpers ----------------------------------------------------------- + + @staticmethod + def _period_cutoff(period: str) -> datetime: + now = datetime.now(timezone.utc) + if period == "daily": + return now - timedelta(hours=24) + if period == "weekly": + return now - timedelta(days=7) + if period == "monthly": + return now - timedelta(days=30) + return now - timedelta(hours=24) -observability = ObservabilityService() +# --------------------------------------------------------------------------- +# Module-level singleton +# --------------------------------------------------------------------------- + +observability_service = ObservabilityService()