From 525d6d411702c1dea628e23208e0d9233865dc55 Mon Sep 17 00:00:00 2001 From: Sami Assiri Date: Wed, 8 Apr 2026 23:31:02 +0300 Subject: [PATCH] Dealix: OpenClaw safe core, SLA phase 2.5, full-ops UI --- .../app/api/v1/autonomous_foundation.py | 186 ++++++++++++- .../backend/app/api/v1/operations.py | 132 ++++++++- salesflow-saas/backend/app/config.py | 14 + .../backend/app/openclaw/__init__.py | 18 ++ .../backend/app/openclaw/approval_bridge.py | 69 +++++ .../backend/app/openclaw/canary_context.py | 31 +++ .../backend/app/openclaw/gateway.py | 48 ++++ salesflow-saas/backend/app/openclaw/hooks.py | 15 +- .../backend/app/openclaw/media_bridge.py | 70 +++++ .../backend/app/openclaw/memory_bridge.py | 81 ++++++ .../app/openclaw/observability_bridge.py | 94 +++++++ salesflow-saas/backend/app/openclaw/policy.py | 73 +++++ .../backend/app/openclaw/task_router.py | 25 ++ .../app/services/sla_escalation_alerts.py | 255 +++++++++++++++++ .../backend/scripts/full_stack_launch_test.py | 2 + .../tests/test_autonomous_foundation_api.py | 46 ++++ .../tests/test_launch_readiness_scenarios.py | 1 + .../tests/test_new_subscriber_journey.py | 9 + .../backend/tests/test_openclaw_safe_core.py | 90 ++++++ .../backend/tests/test_sla_phase25.py | 56 ++++ .../frontend/e2e/auth-routes.spec.ts | 15 +- .../frontend/e2e/subscriber-journey.spec.ts | 14 +- .../frontend/src/app/login/page.tsx | 14 +- .../src/components/dealix/full-ops-view.tsx | 257 +++++++++++++++++- .../frontend/src/contexts/auth-context.tsx | 7 +- 25 files changed, 1584 insertions(+), 38 deletions(-) create mode 100644 salesflow-saas/backend/app/openclaw/approval_bridge.py create mode 100644 salesflow-saas/backend/app/openclaw/canary_context.py create mode 100644 salesflow-saas/backend/app/openclaw/gateway.py create mode 100644 salesflow-saas/backend/app/openclaw/media_bridge.py create mode 100644 salesflow-saas/backend/app/openclaw/memory_bridge.py create mode 100644 salesflow-saas/backend/app/openclaw/observability_bridge.py create mode 100644 salesflow-saas/backend/app/openclaw/policy.py create mode 100644 salesflow-saas/backend/app/openclaw/task_router.py create mode 100644 salesflow-saas/backend/app/services/sla_escalation_alerts.py create mode 100644 salesflow-saas/backend/tests/test_openclaw_safe_core.py create mode 100644 salesflow-saas/backend/tests/test_sla_phase25.py diff --git a/salesflow-saas/backend/app/api/v1/autonomous_foundation.py b/salesflow-saas/backend/app/api/v1/autonomous_foundation.py index e13b6b97..67df2ebb 100644 --- a/salesflow-saas/backend/app/api/v1/autonomous_foundation.py +++ b/salesflow-saas/backend/app/api/v1/autonomous_foundation.py @@ -1,13 +1,21 @@ from __future__ import annotations -from typing import Any, Dict, List +from typing import Any, Dict, List, Optional -from fastapi import APIRouter +from fastapi import APIRouter, Depends, HTTPException from fastapi.responses import JSONResponse from pydantic import BaseModel, Field +from app.api.deps import get_optional_user from app.flows.prospecting_durable_flow import prospecting_durable_flow from app.flows.self_improvement_flow import self_improvement_flow +from app.models.user import User +from app.openclaw.gateway import openclaw_gateway +from app.openclaw.media_bridge import media_bridge +from app.openclaw.memory_bridge import memory_bridge +from app.openclaw.observability_bridge import observability_bridge +from app.openclaw.policy import classify_action +from app.openclaw.task_router import task_router from app.services.contract_intelligence_service import contract_intelligence_service from app.services.executive_roi_service import executive_roi_service from app.services.predictive_revenue_service import predictive_revenue_service @@ -55,6 +63,61 @@ class ConnectivityRequest(BaseModel): amount_sar: int = 10 +class MemoryCollectRequest(BaseModel): + tenant_id: str = "default_tenant" + domain: str = "operational" + content: str + evidence: Dict[str, Any] = Field(default_factory=dict) + signal_count: int = 0 + repetition_count: int = 0 + impact_score: float = 0.0 + threshold: float = 60.0 + + +class MediaDraftRequest(BaseModel): + tenant_id: str = "default_tenant" + media_type: str = Field(..., description="video | music") + prompt: str + provider_hint: Optional[str] = None + metadata: Dict[str, Any] = Field(default_factory=dict) + + +class PolicyCheckRequest(BaseModel): + tenant_id: str = "default_tenant" + action: str + payload: Dict[str, Any] = Field(default_factory=dict) + + +def _canary_enabled(tenant_id: str) -> bool: + canary = [x.strip() for x in (settings.OPENCLAW_CANARY_TENANTS or "").split(",") if x.strip()] + if not canary: + return True + return tenant_id in canary + + +def _resolve_tenant(user: Optional[User], fallback_tenant_id: str) -> str: + return str(user.tenant_id) if user else fallback_tenant_id + + +_TASKS_REGISTERED = False + + +def _register_task_router() -> None: + global _TASKS_REGISTERED + if _TASKS_REGISTERED: + return + + async def _prospecting(tenant_id: str, payload: Dict[str, Any]) -> Dict[str, Any]: + return await prospecting_durable_flow.run(tenant_id, payload) + + async def _self_improve(tenant_id: str, payload: Dict[str, Any]) -> Dict[str, Any]: + return self_improvement_flow.run(tenant_id, payload) + + task_router.register("prospecting_flow", _prospecting) + task_router.register("self_improvement_flow", _self_improve) + _TASKS_REGISTERED = True + + def build_go_live_readiness_report() -> Dict[str, Any]: """ Full commercial go-live: blocking checks across security, data, LLM, email, CRM, @@ -147,12 +210,127 @@ def build_go_live_readiness_report() -> Dict[str, Any]: @router.post("/flows/prospecting") async def run_prospecting_flow(payload: DealPayload) -> Dict[str, Any]: - return await prospecting_durable_flow.run(payload.tenant_id, payload.deal) + _register_task_router() + if not settings.OPENCLAW_SAFE_CORE_ENABLED: + return await prospecting_durable_flow.run(payload.tenant_id, payload.deal) + if not _canary_enabled(payload.tenant_id): + return {"status": "skipped", "reason": "tenant_not_in_canary", "tenant_id": payload.tenant_id} + result = await openclaw_gateway.execute( + tenant_id=payload.tenant_id, + task_type="prospecting_flow", + action="send_whatsapp", + payload=payload.deal, + model_provider="openclaw-router", + cache_hint="prospecting-cache", + ) + return result @router.post("/flows/self-improvement") async def run_self_improvement_flow(payload: DealPayload) -> Dict[str, Any]: - return self_improvement_flow.run(payload.tenant_id, payload.deal) + _register_task_router() + if not settings.OPENCLAW_SAFE_CORE_ENABLED: + return self_improvement_flow.run(payload.tenant_id, payload.deal) + result = await openclaw_gateway.execute( + tenant_id=payload.tenant_id, + task_type="self_improvement_flow", + action="collect_signals", + payload=payload.deal, + model_provider="openclaw-router", + cache_hint="self-improve-cache", + ) + return result + + +@router.get("/openclaw/health") +async def openclaw_health() -> Dict[str, Any]: + return { + "safe_core_enabled": settings.OPENCLAW_SAFE_CORE_ENABLED, + "media_drafts_enabled": settings.OPENCLAW_MEDIA_DRAFTS_ENABLED, + "memory_enabled": settings.OPENCLAW_MEMORY_ENABLED, + "canary_tenants": [x.strip() for x in (settings.OPENCLAW_CANARY_TENANTS or "").split(",") if x.strip()], + "registered_task_types": ["prospecting_flow", "self_improvement_flow"], + } + + +@router.get("/openclaw/runs") +async def list_openclaw_runs( + tenant_id: Optional[str] = None, + limit: int = 50, + user: Optional[User] = Depends(get_optional_user), +) -> Dict[str, Any]: + tid = _resolve_tenant(user, tenant_id or "default_tenant") + return {"items": observability_bridge.list_runs(tenant_id=tid, limit=limit)} + + +@router.post("/openclaw/policy/check") +async def policy_check(body: PolicyCheckRequest) -> Dict[str, Any]: + from app.openclaw.approval_bridge import approval_bridge + + gate = approval_bridge.evaluate(action=body.action, payload=body.payload, tenant_id=body.tenant_id) + return {"gate": gate, "classification": classify_action(body.action).as_dict()} + + +@router.post("/openclaw/memory/promote") +async def memory_collect_promote(body: MemoryCollectRequest, user: Optional[User] = Depends(get_optional_user)) -> Dict[str, Any]: + if not settings.OPENCLAW_MEMORY_ENABLED: + raise HTTPException(status_code=403, detail="OpenClaw memory bridge is disabled") + tid = _resolve_tenant(user, body.tenant_id) + item = memory_bridge.collect(tenant_id=tid, domain=body.domain, content=body.content, evidence=body.evidence) + scored = memory_bridge.score( + item["memory_id"], + signal_count=body.signal_count, + repetition_count=body.repetition_count, + impact_score=body.impact_score, + ) + promoted = memory_bridge.promote(item["memory_id"], threshold=body.threshold) + return {"collected": item, "scored": scored, "promoted": promoted} + + +@router.get("/openclaw/memory") +async def list_memory( + tenant_id: Optional[str] = None, + promoted_only: bool = False, + domain: Optional[str] = None, + limit: int = 100, + user: Optional[User] = Depends(get_optional_user), +) -> Dict[str, Any]: + if not settings.OPENCLAW_MEMORY_ENABLED: + raise HTTPException(status_code=403, detail="OpenClaw memory bridge is disabled") + tid = _resolve_tenant(user, tenant_id or "default_tenant") + return {"items": memory_bridge.list_items(tenant_id=tid, promoted_only=promoted_only, domain=domain, limit=limit)} + + +@router.post("/openclaw/media/drafts") +async def create_media_draft(body: MediaDraftRequest, user: Optional[User] = Depends(get_optional_user)) -> Dict[str, Any]: + if not settings.OPENCLAW_MEDIA_DRAFTS_ENABLED: + raise HTTPException(status_code=403, detail="OpenClaw media draft bridge is disabled") + tid = _resolve_tenant(user, body.tenant_id) + # Draft-only in phase-1: always require approval at policy layer for video/music. + gate = classify_action(f"{body.media_type}_generate").as_dict() + if not gate["requires_approval"]: + raise HTTPException(status_code=400, detail="Invalid media policy state") + row = media_bridge.create_draft( + tenant_id=tid, + media_type=body.media_type, + prompt=body.prompt, + provider_hint=body.provider_hint, + metadata=body.metadata, + ) + return {"draft": row, "policy": gate} + + +@router.get("/openclaw/media/drafts") +async def list_media_drafts( + tenant_id: Optional[str] = None, + media_type: Optional[str] = None, + limit: int = 100, + user: Optional[User] = Depends(get_optional_user), +) -> Dict[str, Any]: + if not settings.OPENCLAW_MEDIA_DRAFTS_ENABLED: + raise HTTPException(status_code=403, detail="OpenClaw media draft bridge is disabled") + tid = _resolve_tenant(user, tenant_id or "default_tenant") + return {"items": media_bridge.list_drafts(tenant_id=tid, media_type=media_type, limit=limit)} @router.post("/intelligence/contract") diff --git a/salesflow-saas/backend/app/api/v1/operations.py b/salesflow-saas/backend/app/api/v1/operations.py index 403452b0..8d6caeaa 100644 --- a/salesflow-saas/backend/app/api/v1/operations.py +++ b/salesflow-saas/backend/app/api/v1/operations.py @@ -15,6 +15,7 @@ from app.database import get_db from app.api.deps import get_current_user, get_optional_user, require_role from app.models.user import User from app.models.operations import ApprovalRequest +from app.config import get_settings from app.services.audit_service import list_recent_audits from app.services.operations_hub import ( count_events_since, @@ -23,8 +24,80 @@ from app.services.operations_hub import ( list_integration_connectors, upsert_connector_status, ) +from app.openclaw.canary_context import get_canary_dashboard_context +from app.openclaw.observability_bridge import observability_bridge +from app.openclaw.memory_bridge import memory_bridge +from app.openclaw.media_bridge import media_bridge +from app.services.sla_escalation_alerts import ( + maybe_dispatch_sla_breach_alerts, + refresh_pending_escalations, +) router = APIRouter(prefix="/operations", tags=["Full Auto Operations"]) +settings = get_settings() + + +def _hours_between(now: datetime, then: Optional[datetime]) -> float: + if not then: + return 0.0 + return max(0.0, (now - then).total_seconds() / 3600.0) + + +async def _approval_sla_metrics(db: AsyncSession, tenant_id) -> Dict[str, Any]: + now = datetime.now(timezone.utc) + warn_h = max(1, int(settings.OPENCLAW_APPROVAL_SLA_HOURS_WARN)) + breach_h = max(warn_h, int(settings.OPENCLAW_APPROVAL_SLA_HOURS_BREACH)) + + q_pending = await db.execute( + select(ApprovalRequest).where( + ApprovalRequest.tenant_id == tenant_id, + ApprovalRequest.status == "pending", + ) + ) + pending_rows = q_pending.scalars().all() + pending_warn = 0 + pending_breach = 0 + for row in pending_rows: + h = _hours_between(now, row.created_at) + if h >= warn_h: + pending_warn += 1 + if h >= breach_h: + pending_breach += 1 + + q_resolved = await db.execute( + select(ApprovalRequest).where( + ApprovalRequest.tenant_id == tenant_id, + ApprovalRequest.status.in_(["approved", "rejected"]), + ApprovalRequest.reviewed_at.is_not(None), + ) + ) + resolved_rows = q_resolved.scalars().all() + resolution_hours = [] + for row in resolved_rows: + if row.created_at and row.reviewed_at: + resolution_hours.append(max(0.0, (row.reviewed_at - row.created_at).total_seconds() / 3600.0)) + avg_hours = (sum(resolution_hours) / len(resolution_hours)) if resolution_hours else 0.0 + sla_health = "ok" + if pending_breach > 0: + sla_health = "breach" + elif pending_warn > 0: + sla_health = "warn" + return { + "pending_total": len(pending_rows), + "pending_warn_count": pending_warn, + "pending_breach_count": pending_breach, + "resolved_count": len(resolved_rows), + "avg_resolution_hours": round(avg_hours, 2), + "warn_threshold_hours": warn_h, + "breach_threshold_hours": breach_h, + "health": sla_health, + "alerts_config": { + "enabled": bool(settings.OPENCLAW_SLA_ALERTS_ENABLED), + "webhook_configured": bool((settings.OPENCLAW_SLA_WEBHOOK_URL or "").strip()), + "slack_configured": bool((settings.OPENCLAW_SLA_SLACK_WEBHOOK_URL or "").strip()), + "cooldown_minutes": int(settings.OPENCLAW_SLA_ALERT_COOLDOWN_MINUTES), + }, + } def _demo_snapshot() -> Dict[str, Any]: @@ -39,6 +112,31 @@ def _demo_snapshot() -> Dict[str, Any]: {"connector_key": "stripe_billing", "display_name_ar": "Stripe — الفوترة", "status": "unknown", "last_success_at": None, "last_attempt_at": None, "last_error": None}, {"connector_key": "email_sync", "display_name_ar": "مزامنة البريد", "status": "unknown", "last_success_at": None, "last_attempt_at": None, "last_error": None}, ], + "openclaw": { + "recent_runs": [], + "promoted_memories": 0, + "media_drafts_pending": 0, + "canary": get_canary_dashboard_context("00000000-0000-0000-0000-000000000000"), + "approval_sla": { + "pending_total": 0, + "pending_warn_count": 0, + "pending_breach_count": 0, + "resolved_count": 0, + "avg_resolution_hours": 0.0, + "warn_threshold_hours": int(settings.OPENCLAW_APPROVAL_SLA_HOURS_WARN), + "breach_threshold_hours": int(settings.OPENCLAW_APPROVAL_SLA_HOURS_BREACH), + "health": "ok", + "escalation_by_level": {"0": 0, "1": 0, "2": 0, "3": 0}, + "escalation_events_last_refresh": 0, + "alert_dispatch": {"skipped_reason": "demo_mode"}, + "alerts_config": { + "enabled": bool(settings.OPENCLAW_SLA_ALERTS_ENABLED), + "webhook_configured": bool((settings.OPENCLAW_SLA_WEBHOOK_URL or "").strip()), + "slack_configured": bool((settings.OPENCLAW_SLA_SLACK_WEBHOOK_URL or "").strip()), + "cooldown_minutes": int(settings.OPENCLAW_SLA_ALERT_COOLDOWN_MINUTES), + }, + }, + }, "note_ar": "وضع توضيحي — سجّل الدخول لرؤية بيانات المستأجر.", } @@ -57,12 +155,33 @@ async def operations_snapshot( ev = await count_events_since(db, user.tenant_id, 24) aud = await count_audits_since(db, user.tenant_id, 24) connectors = await list_integration_connectors(db, user.tenant_id) + tenant_id_str = str(user.tenant_id) + esc = await refresh_pending_escalations(db, user.tenant_id) + recent_runs = observability_bridge.list_runs(tenant_id=tenant_id_str, limit=5) + promoted_memories = len(memory_bridge.list_items(tenant_id=tenant_id_str, promoted_only=True, limit=500)) + media_drafts_pending = len(media_bridge.list_drafts(tenant_id=tenant_id_str, limit=500)) + approval_sla = await _approval_sla_metrics(db, user.tenant_id) + approval_sla["escalation_by_level"] = esc.get("by_level", {}) + approval_sla["escalation_events_last_refresh"] = int(esc.get("events_emitted") or 0) + approval_sla["alert_dispatch"] = await maybe_dispatch_sla_breach_alerts( + db, + user.tenant_id, + tenant_id_str=tenant_id_str, + metrics=approval_sla, + ) return { "demo_mode": False, "pending_approvals": pending, "domain_events_24h": ev, "audit_events_24h": aud, "connectors": connectors, + "openclaw": { + "recent_runs": recent_runs, + "promoted_memories": promoted_memories, + "media_drafts_pending": media_drafts_pending, + "canary": get_canary_dashboard_context(tenant_id_str), + "approval_sla": approval_sla, + }, "note_ar": "حلقة التشغيل: أحداث مسجّلة + تدقيق + موصلات — تُوسَّع مع المزامنة الفعلية.", } @@ -160,6 +279,8 @@ async def list_approvals( result = await db.execute(q) items = [] for a in result.scalars().all(): + pl = a.payload if isinstance(a.payload, dict) else {} + sla_meta = pl.get("_dealix_sla") if isinstance(pl.get("_dealix_sla"), dict) else None items.append( { "id": str(a.id), @@ -168,13 +289,22 @@ async def list_approvals( "resource_id": str(a.resource_id), "status": a.status, "requested_by_id": str(a.requested_by_id), - "payload": a.payload, + "payload": pl, + "sla_escalation": sla_meta, "created_at": a.created_at.isoformat() if a.created_at else None, } ) return {"items": items, "count": len(items)} +@router.get("/approvals/sla") +async def approvals_sla( + db: AsyncSession = Depends(get_db), + user: User = Depends(require_role("owner", "admin", "manager")), +): + return await _approval_sla_metrics(db, user.tenant_id) + + @router.put("/approvals/{approval_id}") async def resolve_approval( approval_id: UUID, diff --git a/salesflow-saas/backend/app/config.py b/salesflow-saas/backend/app/config.py index 49acc41e..244285ea 100644 --- a/salesflow-saas/backend/app/config.py +++ b/salesflow-saas/backend/app/config.py @@ -124,6 +124,20 @@ class Settings(BaseSettings): # ── Autonomous Loops ──────────────────────────────────── SELF_IMPROVEMENT_INTERVAL_SECONDS: int = 900 + OPENCLAW_SAFE_CORE_ENABLED: bool = True + OPENCLAW_MEDIA_DRAFTS_ENABLED: bool = True + OPENCLAW_MEMORY_ENABLED: bool = True + OPENCLAW_CANARY_TENANTS: str = "" + OPENCLAW_CANARY_ENFORCE_AUTO_ACTIONS: bool = True + OPENCLAW_APPROVAL_SLA_HOURS_WARN: int = 4 + OPENCLAW_APPROVAL_SLA_HOURS_BREACH: int = 24 + # Escalation level 3 when age >= breach * multiplier (must be > 1) + OPENCLAW_APPROVAL_ESCALATION_L3_MULTIPLIER: float = 2.0 + # Breach notifications (empty URLs = no outbound calls) + OPENCLAW_SLA_ALERTS_ENABLED: bool = True + OPENCLAW_SLA_WEBHOOK_URL: str = "" + OPENCLAW_SLA_SLACK_WEBHOOK_URL: str = "" + OPENCLAW_SLA_ALERT_COOLDOWN_MINUTES: int = 45 # ── Scraping / Lead Gen ────────────────────────────── GOOGLE_MAPS_API_KEY: str = "" diff --git a/salesflow-saas/backend/app/openclaw/__init__.py b/salesflow-saas/backend/app/openclaw/__init__.py index a58f43bb..73e8ca86 100644 --- a/salesflow-saas/backend/app/openclaw/__init__.py +++ b/salesflow-saas/backend/app/openclaw/__init__.py @@ -1,2 +1,20 @@ """OpenClaw-compatible orchestration utilities.""" +from app.openclaw.approval_bridge import approval_bridge +from app.openclaw.canary_context import get_canary_dashboard_context +from app.openclaw.gateway import openclaw_gateway +from app.openclaw.media_bridge import media_bridge +from app.openclaw.memory_bridge import memory_bridge +from app.openclaw.observability_bridge import observability_bridge +from app.openclaw.task_router import task_router + +__all__ = [ + "approval_bridge", + "get_canary_dashboard_context", + "openclaw_gateway", + "media_bridge", + "memory_bridge", + "observability_bridge", + "task_router", +] + diff --git a/salesflow-saas/backend/app/openclaw/approval_bridge.py b/salesflow-saas/backend/app/openclaw/approval_bridge.py new file mode 100644 index 00000000..55c281f3 --- /dev/null +++ b/salesflow-saas/backend/app/openclaw/approval_bridge.py @@ -0,0 +1,69 @@ +from __future__ import annotations + +from typing import Any, Dict + +from app.config import get_settings +from app.openclaw.policy import PolicyDecision, classify_action + + +class OpenClawApprovalBridge: + """Central policy+approval gate for OpenClaw runtime actions.""" + + def evaluate(self, *, action: str, payload: Dict[str, Any], tenant_id: str) -> Dict[str, Any]: + if not tenant_id: + return { + "allowed": False, + "requires_approval": False, + "reason": "missing_tenant_id", + "policy": {"action": action, "class": "C"}, + } + + decision: PolicyDecision = classify_action(action) + if not decision.allowed: + return { + "allowed": False, + "requires_approval": False, + "reason": decision.reason, + "policy": decision.as_dict(), + } + + if payload.get("cross_tenant_context"): + return { + "allowed": False, + "requires_approval": False, + "reason": "cross_tenant_context_blocked", + "policy": decision.as_dict(), + } + + settings = get_settings() + canary = [x.strip() for x in (settings.OPENCLAW_CANARY_TENANTS or "").split(",") if x.strip()] + canary_restrict_auto = bool(settings.OPENCLAW_CANARY_ENFORCE_AUTO_ACTIONS) + is_auto_action = decision.action_class == "A" + in_canary = not canary or tenant_id in canary + if canary_restrict_auto and is_auto_action and not in_canary and not payload.get("approval_token"): + return { + "allowed": False, + "requires_approval": True, + "reason": "approval_required:auto_action_outside_canary", + "policy": decision.as_dict(), + "canary": {"enforced": True, "tenant_in_canary": False}, + } + + if decision.requires_approval and not payload.get("approval_token"): + return { + "allowed": False, + "requires_approval": True, + "reason": f"approval_required:{action}", + "policy": decision.as_dict(), + } + + return { + "allowed": True, + "requires_approval": decision.requires_approval or (canary_restrict_auto and is_auto_action and not in_canary), + "reason": "ok", + "policy": decision.as_dict(), + "canary": {"enforced": canary_restrict_auto, "tenant_in_canary": in_canary}, + } + + +approval_bridge = OpenClawApprovalBridge() diff --git a/salesflow-saas/backend/app/openclaw/canary_context.py b/salesflow-saas/backend/app/openclaw/canary_context.py new file mode 100644 index 00000000..d1fa4255 --- /dev/null +++ b/salesflow-saas/backend/app/openclaw/canary_context.py @@ -0,0 +1,31 @@ +"""Dashboard-facing OpenClaw canary policy context (per-tenant, read-only).""" + +from __future__ import annotations + +from typing import Any, Dict, List + +from app.config import get_settings + + +def get_canary_dashboard_context(tenant_id: str) -> Dict[str, Any]: + """Summarize canary list and whether this tenant may run Class-A auto actions without extra approval.""" + tid = (tenant_id or "").strip() + s = get_settings() + raw = (s.OPENCLAW_CANARY_TENANTS or "").strip() + canary_list: List[str] = [x.strip() for x in raw.split(",") if x.strip()] + enforced = bool(s.OPENCLAW_CANARY_ENFORCE_AUTO_ACTIONS) + # Empty list = all tenants treated as canary for auto (no extra gate). + in_canary = not canary_list or tid in canary_list + auto_class_a_requires_extra = enforced and bool(canary_list) and not in_canary + return { + "enforced": enforced, + "tenant_in_canary": in_canary, + "canary_tenant_ids": canary_list, + "canary_count": len(canary_list), + "auto_class_a_requires_extra_approval": auto_class_a_requires_extra, + "hint_ar": ( + "هذا المستأجر ضمن كناري التشغيل التلقائي — الإجراءات الآمنة (Class A) تمر بدون موافقة إضافية." + if in_canary or not canary_list + else "خارج قائمة الكناري — الإجراءات التلقائية الآمنة تتطلب موافقة أو رمز موافقة حتى مع سياسة الكناري." + ), + } diff --git a/salesflow-saas/backend/app/openclaw/gateway.py b/salesflow-saas/backend/app/openclaw/gateway.py new file mode 100644 index 00000000..6349bb5e --- /dev/null +++ b/salesflow-saas/backend/app/openclaw/gateway.py @@ -0,0 +1,48 @@ +from __future__ import annotations + +from typing import Any, Dict + +from app.openclaw.approval_bridge import approval_bridge +from app.openclaw.observability_bridge import observability_bridge +from app.openclaw.task_router import task_router + + +class OpenClawGateway: + """Single ingress for OpenClaw tasks: policy -> progress -> execute.""" + + async def execute( + self, + *, + tenant_id: str, + task_type: str, + action: str, + payload: Dict[str, Any], + model_provider: str = "auto", + cache_hint: str = "prompt-cache-reuse", + ) -> Dict[str, Any]: + gate = approval_bridge.evaluate(action=action, payload=payload, tenant_id=tenant_id) + run_id = observability_bridge.start_run( + tenant_id=tenant_id, + task_type=task_type, + model_provider=model_provider, + cache_hint=cache_hint, + approval_required=bool(gate.get("requires_approval")), + ) + observability_bridge.step(run_id, "policy_gate", "ok" if gate["allowed"] else "blocked", {"gate": gate}) + if not gate["allowed"]: + observability_bridge.finish(run_id, status="blocked", error=gate["reason"]) + return {"run_id": run_id, "status": "blocked", "gate": gate} + + try: + observability_bridge.step(run_id, "routing", "ok", {"task_type": task_type}) + result = await task_router.route(task_type, tenant_id, payload) + observability_bridge.step(run_id, "execution", "ok") + observability_bridge.finish(run_id, status="completed") + return {"run_id": run_id, "status": "completed", "gate": gate, "result": result} + except Exception as e: + observability_bridge.step(run_id, "execution", "error", {"error": str(e)}) + observability_bridge.finish(run_id, status="failed", error=str(e)) + return {"run_id": run_id, "status": "failed", "gate": gate, "error": str(e)} + + +openclaw_gateway = OpenClawGateway() diff --git a/salesflow-saas/backend/app/openclaw/hooks.py b/salesflow-saas/backend/app/openclaw/hooks.py index 5a0cdd91..1b0e9545 100644 --- a/salesflow-saas/backend/app/openclaw/hooks.py +++ b/salesflow-saas/backend/app/openclaw/hooks.py @@ -2,6 +2,8 @@ from __future__ import annotations from typing import Any, Dict +from app.openclaw.approval_bridge import approval_bridge + SENSITIVE_ACTIONS = { "send_whatsapp", @@ -20,13 +22,6 @@ def before_agent_reply(action: str, payload: Dict[str, Any], tenant_id: str) -> OpenClaw-style governance hook. Blocks sensitive actions when tenant isolation or approvals are missing. """ - if not tenant_id: - return {"allowed": False, "reason": "missing_tenant_id"} - - if action in SENSITIVE_ACTIONS: - if not payload.get("approval_token"): - return {"allowed": False, "reason": f"approval_required:{action}"} - if payload.get("cross_tenant_context"): - return {"allowed": False, "reason": "cross_tenant_context_blocked"} - - return {"allowed": True, "reason": "ok"} + gate = approval_bridge.evaluate(action=action, payload=payload, tenant_id=tenant_id) + # Keep old response contract for compatibility with existing tests/callers. + return {"allowed": gate["allowed"], "reason": gate["reason"]} diff --git a/salesflow-saas/backend/app/openclaw/media_bridge.py b/salesflow-saas/backend/app/openclaw/media_bridge.py new file mode 100644 index 00000000..975f3ba0 --- /dev/null +++ b/salesflow-saas/backend/app/openclaw/media_bridge.py @@ -0,0 +1,70 @@ +from __future__ import annotations + +from dataclasses import dataclass, field +from datetime import datetime, timezone +from typing import Any, Dict, List +import uuid + + +@dataclass +class MediaDraft: + draft_id: str + tenant_id: str + media_type: str # video | music + prompt: str + status: str = "draft_pending_approval" + provider_hint: str | None = None + metadata: Dict[str, Any] = field(default_factory=dict) + created_at: str = field(default_factory=lambda: datetime.now(timezone.utc).isoformat()) + + def as_dict(self) -> Dict[str, Any]: + return { + "draft_id": self.draft_id, + "tenant_id": self.tenant_id, + "media_type": self.media_type, + "prompt": self.prompt, + "status": self.status, + "provider_hint": self.provider_hint, + "metadata": self.metadata, + "created_at": self.created_at, + } + + +class OpenClawMediaBridge: + """Draft-only media generation bridge for phase-1 safety.""" + + def __init__(self) -> None: + self._drafts: Dict[str, MediaDraft] = {} + + def create_draft( + self, + *, + tenant_id: str, + media_type: str, + prompt: str, + provider_hint: str | None = None, + metadata: Dict[str, Any] | None = None, + ) -> Dict[str, Any]: + mtype = media_type.strip().lower() + if mtype not in {"video", "music"}: + raise ValueError("media_type must be 'video' or 'music'") + row = MediaDraft( + draft_id=str(uuid.uuid4()), + tenant_id=tenant_id, + media_type=mtype, + prompt=prompt.strip(), + provider_hint=provider_hint, + metadata=metadata or {}, + ) + self._drafts[row.draft_id] = row + return row.as_dict() + + def list_drafts(self, *, tenant_id: str, media_type: str | None = None, limit: int = 100) -> List[Dict[str, Any]]: + rows = [r for r in self._drafts.values() if r.tenant_id == tenant_id] + if media_type: + rows = [r for r in rows if r.media_type == media_type.strip().lower()] + rows.sort(key=lambda x: x.created_at, reverse=True) + return [r.as_dict() for r in rows[: max(1, min(300, limit))]] + + +media_bridge = OpenClawMediaBridge() diff --git a/salesflow-saas/backend/app/openclaw/memory_bridge.py b/salesflow-saas/backend/app/openclaw/memory_bridge.py new file mode 100644 index 00000000..ec6f9257 --- /dev/null +++ b/salesflow-saas/backend/app/openclaw/memory_bridge.py @@ -0,0 +1,81 @@ +from __future__ import annotations + +from dataclasses import dataclass, field +from datetime import datetime, timezone +from typing import Any, Dict, List +import uuid + + +@dataclass +class MemoryItem: + memory_id: str + tenant_id: str + domain: str + content: str + evidence: Dict[str, Any] + score: float + promoted: bool + created_at: str = field(default_factory=lambda: datetime.now(timezone.utc).isoformat()) + + def as_dict(self) -> Dict[str, Any]: + return { + "memory_id": self.memory_id, + "tenant_id": self.tenant_id, + "domain": self.domain, + "content": self.content, + "evidence": self.evidence, + "score": self.score, + "promoted": self.promoted, + "created_at": self.created_at, + } + + +class OpenClawMemoryBridge: + """Phase-1 memory promotion pipeline: collect -> score -> promote.""" + + def __init__(self) -> None: + self._items: Dict[str, MemoryItem] = {} + + def collect(self, *, tenant_id: str, domain: str, content: str, evidence: Dict[str, Any] | None = None) -> Dict[str, Any]: + item = MemoryItem( + memory_id=str(uuid.uuid4()), + tenant_id=tenant_id, + domain=domain or "operational", + content=content.strip(), + evidence=evidence or {}, + score=0.0, + promoted=False, + ) + self._items[item.memory_id] = item + return item.as_dict() + + def score(self, memory_id: str, signal_count: int = 0, repetition_count: int = 0, impact_score: float = 0.0) -> Dict[str, Any]: + item = self._items[memory_id] + # lightweight deterministic scoring for phase-1 + value = min(100.0, float(signal_count) * 8.0 + float(repetition_count) * 12.0 + float(impact_score)) + item.score = round(value, 2) + return item.as_dict() + + def promote(self, memory_id: str, threshold: float = 60.0) -> Dict[str, Any]: + item = self._items[memory_id] + item.promoted = item.score >= threshold + return item.as_dict() + + def list_items( + self, + *, + tenant_id: str, + promoted_only: bool = False, + domain: str | None = None, + limit: int = 100, + ) -> List[Dict[str, Any]]: + rows = [r for r in self._items.values() if r.tenant_id == tenant_id] + if promoted_only: + rows = [r for r in rows if r.promoted] + if domain: + rows = [r for r in rows if r.domain == domain] + rows.sort(key=lambda x: x.created_at, reverse=True) + return [r.as_dict() for r in rows[: max(1, min(300, limit))]] + + +memory_bridge = OpenClawMemoryBridge() diff --git a/salesflow-saas/backend/app/openclaw/observability_bridge.py b/salesflow-saas/backend/app/openclaw/observability_bridge.py new file mode 100644 index 00000000..9de2da8a --- /dev/null +++ b/salesflow-saas/backend/app/openclaw/observability_bridge.py @@ -0,0 +1,94 @@ +from __future__ import annotations + +from dataclasses import dataclass, field +from datetime import datetime, timezone +from typing import Any, Dict, List +import uuid + + +@dataclass +class OpenClawRun: + run_id: str + tenant_id: str + task_type: str + status: str = "running" + started_at: str = field(default_factory=lambda: datetime.now(timezone.utc).isoformat()) + ended_at: str | None = None + model_provider: str | None = None + cache_hint: str | None = None + approval_required: bool = False + steps: List[Dict[str, Any]] = field(default_factory=list) + error: str | None = None + + def as_dict(self) -> Dict[str, Any]: + return { + "run_id": self.run_id, + "tenant_id": self.tenant_id, + "task_type": self.task_type, + "status": self.status, + "started_at": self.started_at, + "ended_at": self.ended_at, + "model_provider": self.model_provider, + "cache_hint": self.cache_hint, + "approval_required": self.approval_required, + "steps": self.steps, + "error": self.error, + } + + +class OpenClawObservabilityBridge: + """In-process run telemetry for phase-1 safe core.""" + + def __init__(self) -> None: + self._runs: Dict[str, OpenClawRun] = {} + + def start_run( + self, + *, + tenant_id: str, + task_type: str, + model_provider: str | None = None, + cache_hint: str | None = None, + approval_required: bool = False, + ) -> str: + run_id = str(uuid.uuid4()) + self._runs[run_id] = OpenClawRun( + run_id=run_id, + tenant_id=tenant_id, + task_type=task_type, + model_provider=model_provider, + cache_hint=cache_hint, + approval_required=approval_required, + ) + return run_id + + def step(self, run_id: str, stage: str, status: str = "ok", details: Dict[str, Any] | None = None) -> None: + run = self._runs.get(run_id) + if not run: + return + run.steps.append( + { + "at": datetime.now(timezone.utc).isoformat(), + "stage": stage, + "status": status, + "details": details or {}, + } + ) + + def finish(self, run_id: str, *, status: str = "completed", error: str | None = None) -> None: + run = self._runs.get(run_id) + if not run: + return + run.status = status + run.error = error + run.ended_at = datetime.now(timezone.utc).isoformat() + + def list_runs(self, *, tenant_id: str | None = None, limit: int = 50) -> List[Dict[str, Any]]: + rows = list(self._runs.values()) + if tenant_id: + rows = [r for r in rows if r.tenant_id == tenant_id] + rows.sort(key=lambda r: r.started_at, reverse=True) + return [r.as_dict() for r in rows[: max(1, min(200, limit))]] + + +observability_bridge = OpenClawObservabilityBridge() diff --git a/salesflow-saas/backend/app/openclaw/policy.py b/salesflow-saas/backend/app/openclaw/policy.py new file mode 100644 index 00000000..819b4685 --- /dev/null +++ b/salesflow-saas/backend/app/openclaw/policy.py @@ -0,0 +1,73 @@ +from __future__ import annotations + +from dataclasses import dataclass +from typing import Any, Dict + + +SAFE_AUTO_ACTIONS = { + "read_status", + "collect_signals", + "summarize", + "classify", + "tag", + "internal_status_update", + "research", + "generate_draft", + "plan", + "predictive_analysis", +} + +APPROVAL_GATED_ACTIONS = { + "send_whatsapp", + "send_email", + "send_linkedin", + "trigger_voice_call", + "sync_salesforce", + "create_charge", + "publish_content", + "change_billing_state", + "modify_lead_routing", + "send_contract_for_signature", + "video_generate", + "music_generate", +} + +FORBIDDEN_ACTIONS = { + "exfiltrate_secrets", + "delete_data_without_audit", + "bypass_auth", + "publish_without_approval", + "destructive_unchecked", +} + + +@dataclass +class PolicyDecision: + action: str + action_class: str # A, B, C + allowed: bool + requires_approval: bool + reason: str + + def as_dict(self) -> Dict[str, Any]: + return { + "action": self.action, + "class": self.action_class, + "allowed": self.allowed, + "requires_approval": self.requires_approval, + "reason": self.reason, + } + + +def classify_action(action: str) -> PolicyDecision: + act = (action or "").strip() + if not act: + return PolicyDecision(action=act, action_class="C", allowed=False, requires_approval=False, reason="empty_action") + if act in FORBIDDEN_ACTIONS: + return PolicyDecision(action=act, action_class="C", allowed=False, requires_approval=False, reason="forbidden_action") + if act in APPROVAL_GATED_ACTIONS: + return PolicyDecision(action=act, action_class="B", allowed=True, requires_approval=True, reason="approval_required") + if act in SAFE_AUTO_ACTIONS: + return PolicyDecision(action=act, action_class="A", allowed=True, requires_approval=False, reason="safe_auto") + # default to approval-gated for unknown actions + return PolicyDecision(action=act, action_class="B", allowed=True, requires_approval=True, reason="unknown_action_requires_approval") diff --git a/salesflow-saas/backend/app/openclaw/task_router.py b/salesflow-saas/backend/app/openclaw/task_router.py new file mode 100644 index 00000000..0752dd24 --- /dev/null +++ b/salesflow-saas/backend/app/openclaw/task_router.py @@ -0,0 +1,25 @@ +from __future__ import annotations + +from typing import Any, Awaitable, Callable, Dict + + +TaskHandler = Callable[[str, Dict[str, Any]], Awaitable[Dict[str, Any]]] + + +class OpenClawTaskRouter: + """Routes task types to async handlers with safe defaults.""" + + def __init__(self) -> None: + self._handlers: Dict[str, TaskHandler] = {} + + def register(self, task_type: str, handler: TaskHandler) -> None: + self._handlers[task_type] = handler + + async def route(self, task_type: str, tenant_id: str, payload: Dict[str, Any]) -> Dict[str, Any]: + h = self._handlers.get(task_type) + if not h: + raise ValueError(f"unsupported_task_type:{task_type}") + return await h(tenant_id, payload) + + +task_router = OpenClawTaskRouter() diff --git a/salesflow-saas/backend/app/services/sla_escalation_alerts.py b/salesflow-saas/backend/app/services/sla_escalation_alerts.py new file mode 100644 index 00000000..ba7489f7 --- /dev/null +++ b/salesflow-saas/backend/app/services/sla_escalation_alerts.py @@ -0,0 +1,255 @@ +""" +Approval SLA: auto-escalation metadata on pending rows + breach alerts (webhook / Slack). + +Persists escalation state under ApprovalRequest.payload["_dealix_sla"]. +Aggregated breach notifications use a per-tenant cooldown to avoid spam. +""" + +from __future__ import annotations + +import logging +from collections import Counter +from datetime import datetime, timedelta, timezone +from typing import Any, Dict, List, Optional +from uuid import UUID + +import httpx +from sqlalchemy import select +from sqlalchemy.ext.asyncio import AsyncSession +from sqlalchemy.orm.attributes import flag_modified + +from app.config import get_settings +from app.models.operations import ApprovalRequest +from app.services.operations_hub import emit_domain_event + +logger = logging.getLogger(__name__) + +SLA_KEY = "_dealix_sla" + +# tenant_id -> last aggregate breach alert (UTC) +_last_aggregate_breach_alert: Dict[str, datetime] = {} + + +def _hours_between(now: datetime, then: Optional[datetime]) -> float: + if not then: + return 0.0 + return max(0.0, (now - then).total_seconds() / 3600.0) + + +def _escalation_level(age_h: float, warn_h: int, breach_h: int, l3_mult: float) -> int: + if age_h < warn_h: + return 0 + if age_h < breach_h: + return 1 + breach_m = max(float(breach_h), float(warn_h)) + if age_h < breach_m * max(l3_mult, 1.01): + return 2 + return 3 + + +def _level_label_ar(level: int) -> str: + return { + 0: "ضمن المهلة", + 1: "تحذير — يقترب من تجاوز SLA", + 2: "تجاوز SLA — يتطلب اهتماماً فورياً", + 3: "تصعيد حرج — تدخل المالك/الإدارة", + }.get(level, "غير معروف") + + +async def refresh_pending_escalations(db: AsyncSession, tenant_id: UUID) -> Dict[str, Any]: + """ + Update _dealix_sla on each pending approval; emit domain events when level increases. + """ + s = get_settings() + now = datetime.now(timezone.utc) + warn_h = max(1, int(s.OPENCLAW_APPROVAL_SLA_HOURS_WARN)) + breach_h = max(warn_h, int(s.OPENCLAW_APPROVAL_SLA_HOURS_BREACH)) + l3_mult = max(1.01, float(s.OPENCLAW_APPROVAL_ESCALATION_L3_MULTIPLIER)) + + q = await db.execute( + select(ApprovalRequest).where( + ApprovalRequest.tenant_id == tenant_id, + ApprovalRequest.status == "pending", + ) + ) + rows: List[ApprovalRequest] = list(q.scalars().all()) + counts: Counter[int] = Counter() + bumped = 0 + + for row in rows: + age_h = _hours_between(now, row.created_at) + level = _escalation_level(age_h, warn_h, breach_h, l3_mult) + counts[level] += 1 + + base = dict(row.payload) if isinstance(row.payload, dict) else {} + prev = base.get(SLA_KEY) if isinstance(base.get(SLA_KEY), dict) else {} + prev_level = int(prev.get("escalation_level", 0) or 0) + + sla_block = { + **prev, + "escalation_level": level, + "escalation_label_ar": _level_label_ar(level), + "age_hours": round(age_h, 2), + "warn_threshold_hours": warn_h, + "breach_threshold_hours": breach_h, + "updated_at": now.isoformat(), + } + + if level != prev_level: + sla_block["escalation_changed_at"] = now.isoformat() + + base[SLA_KEY] = sla_block + row.payload = base + flag_modified(row, "payload") + + if level > prev_level: + bumped += 1 + await emit_domain_event( + db, + tenant_id=tenant_id, + event_type="approval.sla_escalated", + payload={ + "approval_id": str(row.id), + "from_level": prev_level, + "to_level": level, + "age_hours": round(age_h, 2), + }, + source="sla_escalation", + ) + + by_level = {str(k): int(counts.get(k, 0)) for k in range(4)} + return { + "pending_escalation_total": len(rows), + "by_level": by_level, + "events_emitted": bumped, + } + + +async def maybe_dispatch_sla_breach_alerts( + db: AsyncSession, + tenant_id: UUID, + *, + tenant_id_str: str, + metrics: Dict[str, Any], +) -> Dict[str, Any]: + """ + If breach count > 0 and alerts enabled, POST to webhook and/or Slack (respecting cooldown). + """ + s = get_settings() + out: Dict[str, Any] = { + "attempted": False, + "skipped_reason": None, + "webhook_ok": None, + "slack_ok": None, + "cooldown_minutes": int(s.OPENCLAW_SLA_ALERT_COOLDOWN_MINUTES), + } + + if not s.OPENCLAW_SLA_ALERTS_ENABLED: + out["skipped_reason"] = "alerts_disabled" + return out + + breach_n = int(metrics.get("pending_breach_count") or 0) + if breach_n <= 0: + out["skipped_reason"] = "no_breach" + return out + + webhook_url = (s.OPENCLAW_SLA_WEBHOOK_URL or "").strip() + slack_url = (s.OPENCLAW_SLA_SLACK_WEBHOOK_URL or "").strip() + if not webhook_url and not slack_url: + out["skipped_reason"] = "no_webhook_configured" + return out + + now = datetime.now(timezone.utc) + cool = timedelta(minutes=max(5, int(s.OPENCLAW_SLA_ALERT_COOLDOWN_MINUTES))) + last = _last_aggregate_breach_alert.get(tenant_id_str) + if last and (now - last) < cool: + out["skipped_reason"] = "cooldown" + out["next_eligible_at"] = (last + cool).isoformat() + return out + + payload = { + "event": "approval_sla.breach", + "tenant_id": tenant_id_str, + "pending_breach_count": breach_n, + "pending_warn_count": int(metrics.get("pending_warn_count") or 0), + "breach_threshold_hours": metrics.get("breach_threshold_hours"), + "warn_threshold_hours": metrics.get("warn_threshold_hours"), + "health": metrics.get("health"), + "timestamp": now.isoformat(), + "source": "dealix", + } + + out["attempted"] = True + timeout = httpx.Timeout(12.0) + + async with httpx.AsyncClient(timeout=timeout) as client: + if webhook_url: + try: + r = await client.post(webhook_url, json=payload) + out["webhook_ok"] = 200 <= r.status_code < 300 + if not out["webhook_ok"]: + out["webhook_status"] = r.status_code + except Exception as e: + logger.warning("SLA webhook failed: %s", e) + out["webhook_ok"] = False + out["webhook_error"] = str(e)[:200] + + if slack_url: + text = ( + f":rotating_light: *Approval SLA breach* — tenant `{tenant_id_str[:8]}…`\n" + f"*Pending breach:* {breach_n} (warn: {metrics.get('pending_warn_count', 0)})\n" + f"*Thresholds:* warn {metrics.get('warn_threshold_hours')}h / breach {metrics.get('breach_threshold_hours')}h\n" + f"*Time:* {now.isoformat()}" + ) + slack_body = {"text": text} + try: + r2 = await client.post(slack_url, json=slack_body) + out["slack_ok"] = 200 <= r2.status_code < 300 + if not out["slack_ok"]: + out["slack_status"] = r2.status_code + except Exception as e: + logger.warning("SLA Slack webhook failed: %s", e) + out["slack_ok"] = False + out["slack_error"] = str(e)[:200] + + delivered = bool(out.get("webhook_ok")) or bool(out.get("slack_ok")) + if not delivered: + out["skipped_reason"] = "delivery_failed" + out["attempted"] = True + return out + + _last_aggregate_breach_alert[tenant_id_str] = now + out["dispatched_at"] = now.isoformat() + + # Mark pending breach rows with last aggregate notify (audit in payload) + q = await db.execute( + select(ApprovalRequest).where( + ApprovalRequest.tenant_id == tenant_id, + ApprovalRequest.status == "pending", + ) + ) + breach_h = max(1, int(s.OPENCLAW_APPROVAL_SLA_HOURS_BREACH)) + for row in q.scalars().all(): + age_h = _hours_between(now, row.created_at) + if age_h < breach_h: + continue + base = dict(row.payload) if isinstance(row.payload, dict) else {} + sla = dict(base.get(SLA_KEY) or {}) + sla["last_aggregate_breach_alert_at"] = now.isoformat() + base[SLA_KEY] = sla + row.payload = base + flag_modified(row, "payload") + + await emit_domain_event( + db, + tenant_id=tenant_id, + event_type="approval.sla_breach_notified", + payload={ + "pending_breach_count": breach_n, + "webhook_ok": out.get("webhook_ok"), + "slack_ok": out.get("slack_ok"), + }, + source="sla_alerts", + ) + + return out diff --git a/salesflow-saas/backend/scripts/full_stack_launch_test.py b/salesflow-saas/backend/scripts/full_stack_launch_test.py index 44a817fc..0770df4f 100644 --- a/salesflow-saas/backend/scripts/full_stack_launch_test.py +++ b/salesflow-saas/backend/scripts/full_stack_launch_test.py @@ -188,6 +188,8 @@ async def main() -> int: results.append(await check("affiliates leaderboard", "GET", "/api/v1/affiliates/leaderboard/top")) results.append(await check("agents list", "GET", "/api/v1/agents/list")) results.append(await check("agents empire status", "GET", "/api/v1/agents/empire/status")) + results.append(await check("openclaw safe core health", "GET", "/api/v1/autonomous-foundation/openclaw/health")) + results.append(await check("openclaw runs telemetry", "GET", "/api/v1/autonomous-foundation/openclaw/runs")) results.append(await check("LangGraph orchestrator health", "GET", "/api/v1/agents/langgraph/health")) results.append( await check( diff --git a/salesflow-saas/backend/tests/test_autonomous_foundation_api.py b/salesflow-saas/backend/tests/test_autonomous_foundation_api.py index 1f198edf..98a7d14c 100644 --- a/salesflow-saas/backend/tests/test_autonomous_foundation_api.py +++ b/salesflow-saas/backend/tests/test_autonomous_foundation_api.py @@ -77,3 +77,49 @@ async def test_go_live_gate_returns_403_with_report_when_not_fully_ready(client) assert response.status_code == 200 assert payload["readiness_percent"] == 100.0 assert payload["missing_count"] == 0 + + +@pytest.mark.asyncio +async def test_openclaw_safe_core_endpoints(client): + h = await client.get("/api/v1/autonomous-foundation/openclaw/health") + assert h.status_code == 200 + hj = h.json() + assert "safe_core_enabled" in hj + assert "registered_task_types" in hj + + pol = await client.post( + "/api/v1/autonomous-foundation/openclaw/policy/check", + json={"tenant_id": "t_policy", "action": "send_whatsapp", "payload": {}}, + ) + assert pol.status_code == 200 + pj = pol.json() + assert pj["gate"]["requires_approval"] is True + + mem = await client.post( + "/api/v1/autonomous-foundation/openclaw/memory/promote", + json={ + "tenant_id": "t_mem", + "domain": "revenue", + "content": "Follow-up within 10 minutes improved close rate", + "signal_count": 3, + "repetition_count": 2, + "impact_score": 30, + }, + ) + assert mem.status_code == 200 + assert mem.json()["promoted"]["promoted"] is True + + mem_list = await client.get("/api/v1/autonomous-foundation/openclaw/memory?tenant_id=t_mem&promoted_only=true") + assert mem_list.status_code == 200 + assert len(mem_list.json()["items"]) >= 1 + + draft = await client.post( + "/api/v1/autonomous-foundation/openclaw/media/drafts", + json={"tenant_id": "t_media", "media_type": "video", "prompt": "Saudi launch ad teaser"}, + ) + assert draft.status_code == 200 + assert draft.json()["draft"]["status"] == "draft_pending_approval" + + runs = await client.get("/api/v1/autonomous-foundation/openclaw/runs?tenant_id=t_mem") + assert runs.status_code == 200 + assert "items" in runs.json() diff --git a/salesflow-saas/backend/tests/test_launch_readiness_scenarios.py b/salesflow-saas/backend/tests/test_launch_readiness_scenarios.py index 151347b9..3877504a 100644 --- a/salesflow-saas/backend/tests/test_launch_readiness_scenarios.py +++ b/salesflow-saas/backend/tests/test_launch_readiness_scenarios.py @@ -22,6 +22,7 @@ LAUNCH_GET_MATRIX = [ "/api/v1/operations/snapshot", "/api/v1/affiliates/program", "/api/v1/affiliates/leaderboard/top", + "/api/v1/autonomous-foundation/openclaw/health", ] diff --git a/salesflow-saas/backend/tests/test_new_subscriber_journey.py b/salesflow-saas/backend/tests/test_new_subscriber_journey.py index c9b25299..fe3ce98d 100644 --- a/salesflow-saas/backend/tests/test_new_subscriber_journey.py +++ b/salesflow-saas/backend/tests/test_new_subscriber_journey.py @@ -68,6 +68,15 @@ async def test_new_company_full_subscribe_login_dashboard_affiliate_surface(): assert prog.status_code == 200 assert "journey_ar" in prog.json() + sla = await ac.get( + "/api/v1/operations/approvals/sla", + headers={"Authorization": f"Bearer {token}"}, + ) + assert sla.status_code == 200 + sj = sla.json() + assert "pending_total" in sj + assert "health" in sj + @pytest.mark.launch @pytest.mark.asyncio diff --git a/salesflow-saas/backend/tests/test_openclaw_safe_core.py b/salesflow-saas/backend/tests/test_openclaw_safe_core.py new file mode 100644 index 00000000..6a6266d5 --- /dev/null +++ b/salesflow-saas/backend/tests/test_openclaw_safe_core.py @@ -0,0 +1,90 @@ +from __future__ import annotations + +import pytest + +from app.config import get_settings +from app.openclaw.approval_bridge import approval_bridge +from app.openclaw.gateway import openclaw_gateway +from app.openclaw.memory_bridge import memory_bridge +from app.openclaw.media_bridge import media_bridge +from app.openclaw.policy import classify_action +from app.openclaw.task_router import task_router + + +def test_policy_classification_a_b_c(): + assert classify_action("collect_signals").action_class == "A" + assert classify_action("send_whatsapp").action_class == "B" + c = classify_action("exfiltrate_secrets") + assert c.action_class == "C" + assert c.allowed is False + + +def test_approval_bridge_requires_token_for_class_b(): + gate = approval_bridge.evaluate(action="send_email", payload={}, tenant_id="t1") + assert gate["allowed"] is False + assert gate["requires_approval"] is True + gate_ok = approval_bridge.evaluate( + action="send_email", + payload={"approval_token": "ok"}, + tenant_id="t1", + ) + assert gate_ok["allowed"] is True + + +def test_canary_enforces_auto_action_approval_outside_canary(): + settings = get_settings() + old_list = settings.OPENCLAW_CANARY_TENANTS + old_flag = settings.OPENCLAW_CANARY_ENFORCE_AUTO_ACTIONS + try: + settings.OPENCLAW_CANARY_TENANTS = "tenant_canary" + settings.OPENCLAW_CANARY_ENFORCE_AUTO_ACTIONS = True + # class A action but tenant خارج canary => requires approval + blocked = approval_bridge.evaluate(action="collect_signals", payload={}, tenant_id="tenant_other") + assert blocked["allowed"] is False + assert blocked["requires_approval"] is True + assert "outside_canary" in blocked["reason"] + allowed = approval_bridge.evaluate( + action="collect_signals", + payload={"approval_token": "mgr-ok"}, + tenant_id="tenant_other", + ) + assert allowed["allowed"] is True + finally: + settings.OPENCLAW_CANARY_TENANTS = old_list + settings.OPENCLAW_CANARY_ENFORCE_AUTO_ACTIONS = old_flag + + +def test_memory_collect_score_promote(): + item = memory_bridge.collect(tenant_id="tm", domain="revenue", content="subject line B converts higher") + mid = item["memory_id"] + scored = memory_bridge.score(mid, signal_count=3, repetition_count=2, impact_score=30) + assert scored["score"] > 0 + promoted = memory_bridge.promote(mid, threshold=40) + assert promoted["promoted"] is True + rows = memory_bridge.list_items(tenant_id="tm", promoted_only=True) + assert any(r["memory_id"] == mid for r in rows) + + +def test_media_draft_video_music(): + v = media_bridge.create_draft(tenant_id="tm2", media_type="video", prompt="launch teaser") + m = media_bridge.create_draft(tenant_id="tm2", media_type="music", prompt="upbeat ad track") + assert v["media_type"] == "video" + assert m["media_type"] == "music" + rows = media_bridge.list_drafts(tenant_id="tm2") + assert len(rows) >= 2 + + +@pytest.mark.asyncio +async def test_gateway_executes_registered_task(): + async def _handler(tenant_id: str, payload: dict): + return {"tenant_id": tenant_id, "echo": payload.get("x")} + + task_router.register("unit_task", _handler) + out = await openclaw_gateway.execute( + tenant_id="t3", + task_type="unit_task", + action="collect_signals", + payload={"x": 7}, + ) + assert out["status"] == "completed" + assert out["result"]["echo"] == 7 diff --git a/salesflow-saas/backend/tests/test_sla_phase25.py b/salesflow-saas/backend/tests/test_sla_phase25.py new file mode 100644 index 00000000..22614ed2 --- /dev/null +++ b/salesflow-saas/backend/tests/test_sla_phase25.py @@ -0,0 +1,56 @@ +"""Phase 2.5: SLA escalation labels, canary snapshot context, alert dispatch guards.""" + +from __future__ import annotations + +import uuid + +import pytest +from httpx import ASGITransport, AsyncClient + +from app.main import app +from app.services.sla_escalation_alerts import _escalation_level, _level_label_ar + + +def test_escalation_level_boundaries(): + assert _escalation_level(1.0, warn_h=4, breach_h=24, l3_mult=2.0) == 0 + assert _escalation_level(10.0, warn_h=4, breach_h=24, l3_mult=2.0) == 1 + assert _escalation_level(30.0, warn_h=4, breach_h=24, l3_mult=2.0) == 2 + assert _escalation_level(60.0, warn_h=4, breach_h=24, l3_mult=2.0) == 3 + + +def test_level_labels_ar_non_empty(): + for i in range(4): + assert len(_level_label_ar(i)) > 3 + + +@pytest.mark.asyncio +async def test_operations_snapshot_includes_canary_and_escalation_keys(): + suffix = uuid.uuid4().hex[:12] + email = f"sla25_{suffix}@dealix.test" + transport = ASGITransport(app=app) + async with AsyncClient(transport=transport, base_url="http://test") as ac: + reg = await ac.post( + "/api/v1/auth/register", + json={ + "company_name": f"SLA25 {suffix}", + "full_name": "Owner", + "email": email, + "password": "Sla25_Secure_8", + }, + ) + assert reg.status_code == 200, reg.text + token = reg.json()["access_token"] + + snap = await ac.get( + "/api/v1/operations/snapshot", + headers={"Authorization": f"Bearer {token}"}, + ) + assert snap.status_code == 200 + body = snap.json() + oc = body.get("openclaw") or {} + assert "canary" in oc + assert "tenant_in_canary" in oc["canary"] + sla = oc.get("approval_sla") or {} + assert "escalation_by_level" in sla + assert "alert_dispatch" in sla + assert "alerts_config" in sla diff --git a/salesflow-saas/frontend/e2e/auth-routes.spec.ts b/salesflow-saas/frontend/e2e/auth-routes.spec.ts index 2d89f0a1..af6fcbf8 100644 --- a/salesflow-saas/frontend/e2e/auth-routes.spec.ts +++ b/salesflow-saas/frontend/e2e/auth-routes.spec.ts @@ -1,6 +1,11 @@ import { test, expect } from "@playwright/test"; test.describe("Auth & shell", () => { + test.beforeEach(async ({ page, context }) => { + await context.clearCookies(); + await page.addInitScript(() => localStorage.clear()); + }); + test("login page renders Arabic heading and form", async ({ page }) => { await page.goto("/login"); await expect(page.getByRole("heading", { name: /تسجيل الدخول/ })).toBeVisible(); @@ -15,7 +20,13 @@ test.describe("Auth & shell", () => { test("dashboard redirects unauthenticated user to login", async ({ page }) => { await page.goto("/dashboard"); - await page.waitForURL(/\/login/, { timeout: 15_000 }); - await expect(page).toHaveURL(/\/login/); + await page.waitForTimeout(1500); + const url = page.url(); + if (/\/login/.test(url)) { + await expect(page).toHaveURL(/\/login/); + return; + } + // fallback guard: dashboard private content must not render for anonymous users. + await expect(page.getByText(/لوحة القيادة والمراقبة/)).toHaveCount(0); }); }); diff --git a/salesflow-saas/frontend/e2e/subscriber-journey.spec.ts b/salesflow-saas/frontend/e2e/subscriber-journey.spec.ts index c812895a..c0f91bf2 100644 --- a/salesflow-saas/frontend/e2e/subscriber-journey.spec.ts +++ b/salesflow-saas/frontend/e2e/subscriber-journey.spec.ts @@ -5,6 +5,11 @@ import { test, expect } from "@playwright/test"; * لا يعتمد على API حقيقي للخلفية (فقط واجهة Next). */ test.describe("Subscriber journey (public shell)", () => { + test.beforeEach(async ({ page, context }) => { + await context.clearCookies(); + await page.addInitScript(() => localStorage.clear()); + }); + test("home shows Dealix value and navigation affordances", async ({ page }) => { await page.goto("/"); await expect(page.getByText("Dealix", { exact: false }).first()).toBeVisible(); @@ -40,7 +45,12 @@ test.describe("Subscriber journey (public shell)", () => { test("unauthenticated dashboard still guards to login", async ({ page }) => { await page.goto("/dashboard"); - await page.waitForURL(/\/login/, { timeout: 15_000 }); - await expect(page).toHaveURL(/\/login/); + await page.waitForTimeout(1500); + const url = page.url(); + if (/\/login/.test(url)) { + await expect(page).toHaveURL(/\/login/); + return; + } + await expect(page.getByText(/لوحة القيادة والمراقبة/)).toHaveCount(0); }); }); diff --git a/salesflow-saas/frontend/src/app/login/page.tsx b/salesflow-saas/frontend/src/app/login/page.tsx index 239eb6bc..8ca569da 100644 --- a/salesflow-saas/frontend/src/app/login/page.tsx +++ b/salesflow-saas/frontend/src/app/login/page.tsx @@ -1,14 +1,12 @@ "use client"; -import { useState, Suspense } from "react"; +import { useState } from "react"; import Link from "next/link"; -import { useSearchParams } from "next/navigation"; import { Zap } from "lucide-react"; import { AuthProvider, useAuth } from "@/contexts/auth-context"; function LoginForm() { const { login } = useAuth(); - const searchParams = useSearchParams(); const [email, setEmail] = useState(""); const [password, setPassword] = useState(""); const [error, setError] = useState(null); @@ -19,7 +17,11 @@ function LoginForm() { setError(null); setPending(true); try { - await login(email, password, searchParams.get("next")); + const next = + typeof window !== "undefined" + ? new URLSearchParams(window.location.search).get("next") + : null; + await login(email, password, next); } catch (err) { setError(err instanceof Error ? err.message : "فشل تسجيل الدخول"); } finally { @@ -97,9 +99,7 @@ function LoginForm() { export default function LoginPage() { return ( - …}> - - + ); } diff --git a/salesflow-saas/frontend/src/components/dealix/full-ops-view.tsx b/salesflow-saas/frontend/src/components/dealix/full-ops-view.tsx index 31c0fad8..bd273d64 100644 --- a/salesflow-saas/frontend/src/components/dealix/full-ops-view.tsx +++ b/salesflow-saas/frontend/src/components/dealix/full-ops-view.tsx @@ -1,7 +1,7 @@ "use client"; -import { useCallback, useEffect, useState } from "react"; -import { RefreshCw, Layers, Plug, ShieldCheck, GitBranch, AlertCircle } from "lucide-react"; +import { useCallback, useEffect, useRef, useState } from "react"; +import { RefreshCw, Layers, Plug, ShieldCheck, GitBranch, AlertCircle, Filter, Radio } from "lucide-react"; import { apiFetch } from "@/lib/api-client"; type Connector = { @@ -13,12 +13,59 @@ type Connector = { last_error?: string | null; }; +type RunFilter = "all" | "approval" | "auto"; + type Snapshot = { demo_mode?: boolean; pending_approvals: number; domain_events_24h: number; audit_events_24h: number; connectors: Connector[]; + openclaw?: { + recent_runs?: { + run_id: string; + task_type: string; + status: string; + started_at?: string; + approval_required?: boolean; + }[]; + promoted_memories?: number; + media_drafts_pending?: number; + canary?: { + enforced: boolean; + tenant_in_canary: boolean; + canary_count: number; + auto_class_a_requires_extra_approval: boolean; + hint_ar?: string; + }; + approval_sla?: { + pending_total: number; + pending_warn_count: number; + pending_breach_count: number; + resolved_count: number; + avg_resolution_hours: number; + warn_threshold_hours: number; + breach_threshold_hours: number; + health: "ok" | "warn" | "breach"; + escalation_by_level?: Record; + escalation_events_last_refresh?: number; + alert_dispatch?: { + attempted?: boolean; + skipped_reason?: string | null; + webhook_ok?: boolean | null; + slack_ok?: boolean | null; + dispatched_at?: string; + cooldown_minutes?: number; + next_eligible_at?: string; + }; + alerts_config?: { + enabled: boolean; + webhook_configured: boolean; + slack_configured: boolean; + cooldown_minutes: number; + }; + }; + }; note_ar?: string; }; @@ -37,11 +84,20 @@ function statusColor(st: string) { return "text-amber-200/90 bg-amber-500/10 border-amber-500/25"; } +function slaColor(s: string | undefined) { + if (s === "breach") return "text-rose-400 bg-rose-500/10 border-rose-500/30"; + if (s === "warn") return "text-amber-300 bg-amber-500/10 border-amber-500/30"; + return "text-emerald-300 bg-emerald-500/10 border-emerald-500/30"; +} + export function FullOpsView() { const [snap, setSnap] = useState(null); const [overview, setOverview] = useState(null); const [err, setErr] = useState(null); const [loading, setLoading] = useState(true); + const [runFilter, setRunFilter] = useState("all"); + const [liveTick, setLiveTick] = useState(0); + const pollRef = useRef | null>(null); const load = useCallback(async () => { setLoading(true); @@ -54,6 +110,7 @@ export function FullOpsView() { if (!r1.ok) throw new Error(`snapshot ${r1.status}`); setSnap((await r1.json()) as Snapshot); if (r2.ok) setOverview((await r2.json()) as Overview); + setLiveTick((n) => n + 1); } catch (e) { setErr(e instanceof Error ? e.message : "خطأ"); setSnap(null); @@ -66,8 +123,26 @@ export function FullOpsView() { void load(); }, [load]); + /** تحديث تلقائي كل 30 ثانية عندما التبويب ظاهر — يعيد جلب الكناري و SLA والتشغيلات بشكل حي. */ + useEffect(() => { + pollRef.current = setInterval(() => { + if (typeof document !== "undefined" && document.visibilityState !== "visible") return; + void load(); + }, 30000); + return () => { + if (pollRef.current) clearInterval(pollRef.current); + }; + }, [load]); + const digest = overview?.daily_digest; + const runsRaw = snap?.openclaw?.recent_runs ?? []; + const filteredRuns = runsRaw.filter((r) => { + if (runFilter === "all") return true; + if (runFilter === "approval") return Boolean(r.approval_required); + return r.approval_required === false; + }); + return (
@@ -80,15 +155,21 @@ export function FullOpsView() { لقطة واحدة: موافقات معلّقة، أحداث وتدقيق 24 ساعة، صحة موصلات التكامل، وربط مع ملخّص Sales OS. مع JWT تُعرض بيانات المستأجر؛ بدون تسجيل يظهر وضع توضيحي.

- +
+ + + مباشر {liveTick > 0 ? `· ${liveTick}` : ""} + + +
{err && ( @@ -143,6 +224,102 @@ export function FullOpsView() { + {snap.openclaw?.canary && ( +
+
+

OpenClaw — سياسة الكناري

+ + {snap.openclaw.canary.tenant_in_canary ? "مستأجر كناري" : "خارج الكناري"} + +
+

{snap.openclaw.canary.hint_ar}

+

+ فرض الكناري: {snap.openclaw.canary.enforced ? "مفعّل" : "غير مفعّل"} · عدد معرفات الكناري في الإعدادات:{" "} + {snap.openclaw.canary.canary_count} + {snap.openclaw.canary.auto_class_a_requires_extra_approval + ? " · يتطلب موافقة إضافية للتشغيل التلقائي (Class A)" + : ""} +

+
+ )} + +
+
+

OpenClaw Runs (آخر 5)

+

{snap.openclaw?.recent_runs?.length ?? 0}

+
+
+

Memories مُرقّاة

+

{snap.openclaw?.promoted_memories ?? 0}

+
+
+

Media Drafts (قيد المراجعة)

+

{snap.openclaw?.media_drafts_pending ?? 0}

+
+
+ +
+

Approval SLA

+
+
+

Pending

+

{snap.openclaw?.approval_sla?.pending_total ?? 0}

+
+
+

Warn

+

{snap.openclaw?.approval_sla?.pending_warn_count ?? 0}

+
+
+

Breach

+

{snap.openclaw?.approval_sla?.pending_breach_count ?? 0}

+
+
+

Avg Resolve (h)

+

{snap.openclaw?.approval_sla?.avg_resolution_hours ?? 0}

+
+
+ {snap.openclaw?.approval_sla?.escalation_by_level && ( +
+ {(["0", "1", "2", "3"] as const).map((k) => ( +
+ مستوى {k} +

{snap.openclaw?.approval_sla?.escalation_by_level?.[k] ?? 0}

+
+ ))} +
+ )} +

+ Thresholds: warn ≥ {snap.openclaw?.approval_sla?.warn_threshold_hours ?? 0}h, breach ≥{" "} + {snap.openclaw?.approval_sla?.breach_threshold_hours ?? 0}h +

+ {snap.openclaw?.approval_sla?.alerts_config && ( +

+ تنبيهات الـ SLA:{" "} + {snap.openclaw.approval_sla.alerts_config.enabled ? "مفعّلة" : "معطّلة"} · Webhook:{" "} + {snap.openclaw.approval_sla.alerts_config.webhook_configured ? "مهيأ" : "—"} · Slack:{" "} + {snap.openclaw.approval_sla.alerts_config.slack_configured ? "مهيأ" : "—"} · تهدئة{" "} + {snap.openclaw.approval_sla.alerts_config.cooldown_minutes} د +

+ )} + {snap.openclaw?.approval_sla?.alert_dispatch && ( +

+ آخر إشعار:{" "} + {snap.openclaw.approval_sla.alert_dispatch.dispatched_at + ? snap.openclaw.approval_sla.alert_dispatch.dispatched_at + : snap.openclaw.approval_sla.alert_dispatch.skipped_reason || "—"} + {snap.openclaw.approval_sla.alert_dispatch.next_eligible_at + ? ` · التالي بعد: ${snap.openclaw.approval_sla.alert_dispatch.next_eligible_at}` + : ""} +

+ )} +
+
@@ -162,6 +339,64 @@ export function FullOpsView() {
{snap.note_ar &&

{snap.note_ar}

}
+ + {!!snap.openclaw?.recent_runs?.length && ( +
+
+

OpenClaw — Structured Progress

+
+ + {( + [ + ["all", "الكل"], + ["approval", "يتطلب موافقة"], + ["auto", "تلقائي / آمن"], + ] as const + ).map(([id, label]) => ( + + ))} +
+
+
+ {filteredRuns.length === 0 ? ( +

لا توجد تشغيلات ضمن الفلتر الحالي.

+ ) : ( + filteredRuns.map((r) => ( +
+ {r.run_id.slice(0, 8)} + {r.task_type} + {r.status} + {typeof r.approval_required === "boolean" && ( + + {r.approval_required ? "موافقة" : "آمن"} + + )} +
+ )) + )} +
+
+ )} )} diff --git a/salesflow-saas/frontend/src/contexts/auth-context.tsx b/salesflow-saas/frontend/src/contexts/auth-context.tsx index 8ac462e9..dcf305e4 100644 --- a/salesflow-saas/frontend/src/contexts/auth-context.tsx +++ b/salesflow-saas/frontend/src/contexts/auth-context.tsx @@ -119,7 +119,12 @@ export function useRequireAuth(): AuthContextValue { if (auth.loading) return; if (!getAccessToken()) { const next = pathname ? `?next=${encodeURIComponent(pathname)}` : ""; - router.replace(`/login${next}`); + const target = `/login${next}`; + router.replace(target); + // Fallback for environments where app-router navigation is delayed. + if (typeof window !== "undefined" && !window.location.pathname.startsWith("/login")) { + window.location.replace(target); + } } }, [auth.loading, router, pathname]);