Dealix: OpenClaw safe core, SLA phase 2.5, full-ops UI

This commit is contained in:
Sami Assiri 2026-04-08 23:31:02 +03:00
parent 378ea5f742
commit 525d6d4117
25 changed files with 1584 additions and 38 deletions

View File

@ -1,13 +1,21 @@
from __future__ import annotations
from typing import Any, Dict, List
from typing import Any, Dict, List, Optional
from fastapi import APIRouter
from fastapi import APIRouter, Depends, HTTPException
from fastapi.responses import JSONResponse
from pydantic import BaseModel, Field
from app.api.deps import get_optional_user
from app.flows.prospecting_durable_flow import prospecting_durable_flow
from app.flows.self_improvement_flow import self_improvement_flow
from app.models.user import User
from app.openclaw.gateway import openclaw_gateway
from app.openclaw.media_bridge import media_bridge
from app.openclaw.memory_bridge import memory_bridge
from app.openclaw.observability_bridge import observability_bridge
from app.openclaw.policy import classify_action
from app.openclaw.task_router import task_router
from app.services.contract_intelligence_service import contract_intelligence_service
from app.services.executive_roi_service import executive_roi_service
from app.services.predictive_revenue_service import predictive_revenue_service
@ -55,6 +63,61 @@ class ConnectivityRequest(BaseModel):
amount_sar: int = 10
class MemoryCollectRequest(BaseModel):
tenant_id: str = "default_tenant"
domain: str = "operational"
content: str
evidence: Dict[str, Any] = Field(default_factory=dict)
signal_count: int = 0
repetition_count: int = 0
impact_score: float = 0.0
threshold: float = 60.0
class MediaDraftRequest(BaseModel):
tenant_id: str = "default_tenant"
media_type: str = Field(..., description="video | music")
prompt: str
provider_hint: Optional[str] = None
metadata: Dict[str, Any] = Field(default_factory=dict)
class PolicyCheckRequest(BaseModel):
tenant_id: str = "default_tenant"
action: str
payload: Dict[str, Any] = Field(default_factory=dict)
def _canary_enabled(tenant_id: str) -> bool:
canary = [x.strip() for x in (settings.OPENCLAW_CANARY_TENANTS or "").split(",") if x.strip()]
if not canary:
return True
return tenant_id in canary
def _resolve_tenant(user: Optional[User], fallback_tenant_id: str) -> str:
return str(user.tenant_id) if user else fallback_tenant_id
_TASKS_REGISTERED = False
def _register_task_router() -> None:
global _TASKS_REGISTERED
if _TASKS_REGISTERED:
return
async def _prospecting(tenant_id: str, payload: Dict[str, Any]) -> Dict[str, Any]:
return await prospecting_durable_flow.run(tenant_id, payload)
async def _self_improve(tenant_id: str, payload: Dict[str, Any]) -> Dict[str, Any]:
return self_improvement_flow.run(tenant_id, payload)
task_router.register("prospecting_flow", _prospecting)
task_router.register("self_improvement_flow", _self_improve)
_TASKS_REGISTERED = True
def build_go_live_readiness_report() -> Dict[str, Any]:
"""
Full commercial go-live: blocking checks across security, data, LLM, email, CRM,
@ -147,12 +210,127 @@ def build_go_live_readiness_report() -> Dict[str, Any]:
@router.post("/flows/prospecting")
async def run_prospecting_flow(payload: DealPayload) -> Dict[str, Any]:
return await prospecting_durable_flow.run(payload.tenant_id, payload.deal)
_register_task_router()
if not settings.OPENCLAW_SAFE_CORE_ENABLED:
return await prospecting_durable_flow.run(payload.tenant_id, payload.deal)
if not _canary_enabled(payload.tenant_id):
return {"status": "skipped", "reason": "tenant_not_in_canary", "tenant_id": payload.tenant_id}
result = await openclaw_gateway.execute(
tenant_id=payload.tenant_id,
task_type="prospecting_flow",
action="send_whatsapp",
payload=payload.deal,
model_provider="openclaw-router",
cache_hint="prospecting-cache",
)
return result
@router.post("/flows/self-improvement")
async def run_self_improvement_flow(payload: DealPayload) -> Dict[str, Any]:
return self_improvement_flow.run(payload.tenant_id, payload.deal)
_register_task_router()
if not settings.OPENCLAW_SAFE_CORE_ENABLED:
return self_improvement_flow.run(payload.tenant_id, payload.deal)
result = await openclaw_gateway.execute(
tenant_id=payload.tenant_id,
task_type="self_improvement_flow",
action="collect_signals",
payload=payload.deal,
model_provider="openclaw-router",
cache_hint="self-improve-cache",
)
return result
@router.get("/openclaw/health")
async def openclaw_health() -> Dict[str, Any]:
return {
"safe_core_enabled": settings.OPENCLAW_SAFE_CORE_ENABLED,
"media_drafts_enabled": settings.OPENCLAW_MEDIA_DRAFTS_ENABLED,
"memory_enabled": settings.OPENCLAW_MEMORY_ENABLED,
"canary_tenants": [x.strip() for x in (settings.OPENCLAW_CANARY_TENANTS or "").split(",") if x.strip()],
"registered_task_types": ["prospecting_flow", "self_improvement_flow"],
}
@router.get("/openclaw/runs")
async def list_openclaw_runs(
tenant_id: Optional[str] = None,
limit: int = 50,
user: Optional[User] = Depends(get_optional_user),
) -> Dict[str, Any]:
tid = _resolve_tenant(user, tenant_id or "default_tenant")
return {"items": observability_bridge.list_runs(tenant_id=tid, limit=limit)}
@router.post("/openclaw/policy/check")
async def policy_check(body: PolicyCheckRequest) -> Dict[str, Any]:
from app.openclaw.approval_bridge import approval_bridge
gate = approval_bridge.evaluate(action=body.action, payload=body.payload, tenant_id=body.tenant_id)
return {"gate": gate, "classification": classify_action(body.action).as_dict()}
@router.post("/openclaw/memory/promote")
async def memory_collect_promote(body: MemoryCollectRequest, user: Optional[User] = Depends(get_optional_user)) -> Dict[str, Any]:
if not settings.OPENCLAW_MEMORY_ENABLED:
raise HTTPException(status_code=403, detail="OpenClaw memory bridge is disabled")
tid = _resolve_tenant(user, body.tenant_id)
item = memory_bridge.collect(tenant_id=tid, domain=body.domain, content=body.content, evidence=body.evidence)
scored = memory_bridge.score(
item["memory_id"],
signal_count=body.signal_count,
repetition_count=body.repetition_count,
impact_score=body.impact_score,
)
promoted = memory_bridge.promote(item["memory_id"], threshold=body.threshold)
return {"collected": item, "scored": scored, "promoted": promoted}
@router.get("/openclaw/memory")
async def list_memory(
tenant_id: Optional[str] = None,
promoted_only: bool = False,
domain: Optional[str] = None,
limit: int = 100,
user: Optional[User] = Depends(get_optional_user),
) -> Dict[str, Any]:
if not settings.OPENCLAW_MEMORY_ENABLED:
raise HTTPException(status_code=403, detail="OpenClaw memory bridge is disabled")
tid = _resolve_tenant(user, tenant_id or "default_tenant")
return {"items": memory_bridge.list_items(tenant_id=tid, promoted_only=promoted_only, domain=domain, limit=limit)}
@router.post("/openclaw/media/drafts")
async def create_media_draft(body: MediaDraftRequest, user: Optional[User] = Depends(get_optional_user)) -> Dict[str, Any]:
if not settings.OPENCLAW_MEDIA_DRAFTS_ENABLED:
raise HTTPException(status_code=403, detail="OpenClaw media draft bridge is disabled")
tid = _resolve_tenant(user, body.tenant_id)
# Draft-only in phase-1: always require approval at policy layer for video/music.
gate = classify_action(f"{body.media_type}_generate").as_dict()
if not gate["requires_approval"]:
raise HTTPException(status_code=400, detail="Invalid media policy state")
row = media_bridge.create_draft(
tenant_id=tid,
media_type=body.media_type,
prompt=body.prompt,
provider_hint=body.provider_hint,
metadata=body.metadata,
)
return {"draft": row, "policy": gate}
@router.get("/openclaw/media/drafts")
async def list_media_drafts(
tenant_id: Optional[str] = None,
media_type: Optional[str] = None,
limit: int = 100,
user: Optional[User] = Depends(get_optional_user),
) -> Dict[str, Any]:
if not settings.OPENCLAW_MEDIA_DRAFTS_ENABLED:
raise HTTPException(status_code=403, detail="OpenClaw media draft bridge is disabled")
tid = _resolve_tenant(user, tenant_id or "default_tenant")
return {"items": media_bridge.list_drafts(tenant_id=tid, media_type=media_type, limit=limit)}
@router.post("/intelligence/contract")

View File

@ -15,6 +15,7 @@ from app.database import get_db
from app.api.deps import get_current_user, get_optional_user, require_role
from app.models.user import User
from app.models.operations import ApprovalRequest
from app.config import get_settings
from app.services.audit_service import list_recent_audits
from app.services.operations_hub import (
count_events_since,
@ -23,8 +24,80 @@ from app.services.operations_hub import (
list_integration_connectors,
upsert_connector_status,
)
from app.openclaw.canary_context import get_canary_dashboard_context
from app.openclaw.observability_bridge import observability_bridge
from app.openclaw.memory_bridge import memory_bridge
from app.openclaw.media_bridge import media_bridge
from app.services.sla_escalation_alerts import (
maybe_dispatch_sla_breach_alerts,
refresh_pending_escalations,
)
router = APIRouter(prefix="/operations", tags=["Full Auto Operations"])
settings = get_settings()
def _hours_between(now: datetime, then: Optional[datetime]) -> float:
if not then:
return 0.0
return max(0.0, (now - then).total_seconds() / 3600.0)
async def _approval_sla_metrics(db: AsyncSession, tenant_id) -> Dict[str, Any]:
now = datetime.now(timezone.utc)
warn_h = max(1, int(settings.OPENCLAW_APPROVAL_SLA_HOURS_WARN))
breach_h = max(warn_h, int(settings.OPENCLAW_APPROVAL_SLA_HOURS_BREACH))
q_pending = await db.execute(
select(ApprovalRequest).where(
ApprovalRequest.tenant_id == tenant_id,
ApprovalRequest.status == "pending",
)
)
pending_rows = q_pending.scalars().all()
pending_warn = 0
pending_breach = 0
for row in pending_rows:
h = _hours_between(now, row.created_at)
if h >= warn_h:
pending_warn += 1
if h >= breach_h:
pending_breach += 1
q_resolved = await db.execute(
select(ApprovalRequest).where(
ApprovalRequest.tenant_id == tenant_id,
ApprovalRequest.status.in_(["approved", "rejected"]),
ApprovalRequest.reviewed_at.is_not(None),
)
)
resolved_rows = q_resolved.scalars().all()
resolution_hours = []
for row in resolved_rows:
if row.created_at and row.reviewed_at:
resolution_hours.append(max(0.0, (row.reviewed_at - row.created_at).total_seconds() / 3600.0))
avg_hours = (sum(resolution_hours) / len(resolution_hours)) if resolution_hours else 0.0
sla_health = "ok"
if pending_breach > 0:
sla_health = "breach"
elif pending_warn > 0:
sla_health = "warn"
return {
"pending_total": len(pending_rows),
"pending_warn_count": pending_warn,
"pending_breach_count": pending_breach,
"resolved_count": len(resolved_rows),
"avg_resolution_hours": round(avg_hours, 2),
"warn_threshold_hours": warn_h,
"breach_threshold_hours": breach_h,
"health": sla_health,
"alerts_config": {
"enabled": bool(settings.OPENCLAW_SLA_ALERTS_ENABLED),
"webhook_configured": bool((settings.OPENCLAW_SLA_WEBHOOK_URL or "").strip()),
"slack_configured": bool((settings.OPENCLAW_SLA_SLACK_WEBHOOK_URL or "").strip()),
"cooldown_minutes": int(settings.OPENCLAW_SLA_ALERT_COOLDOWN_MINUTES),
},
}
def _demo_snapshot() -> Dict[str, Any]:
@ -39,6 +112,31 @@ def _demo_snapshot() -> Dict[str, Any]:
{"connector_key": "stripe_billing", "display_name_ar": "Stripe — الفوترة", "status": "unknown", "last_success_at": None, "last_attempt_at": None, "last_error": None},
{"connector_key": "email_sync", "display_name_ar": "مزامنة البريد", "status": "unknown", "last_success_at": None, "last_attempt_at": None, "last_error": None},
],
"openclaw": {
"recent_runs": [],
"promoted_memories": 0,
"media_drafts_pending": 0,
"canary": get_canary_dashboard_context("00000000-0000-0000-0000-000000000000"),
"approval_sla": {
"pending_total": 0,
"pending_warn_count": 0,
"pending_breach_count": 0,
"resolved_count": 0,
"avg_resolution_hours": 0.0,
"warn_threshold_hours": int(settings.OPENCLAW_APPROVAL_SLA_HOURS_WARN),
"breach_threshold_hours": int(settings.OPENCLAW_APPROVAL_SLA_HOURS_BREACH),
"health": "ok",
"escalation_by_level": {"0": 0, "1": 0, "2": 0, "3": 0},
"escalation_events_last_refresh": 0,
"alert_dispatch": {"skipped_reason": "demo_mode"},
"alerts_config": {
"enabled": bool(settings.OPENCLAW_SLA_ALERTS_ENABLED),
"webhook_configured": bool((settings.OPENCLAW_SLA_WEBHOOK_URL or "").strip()),
"slack_configured": bool((settings.OPENCLAW_SLA_SLACK_WEBHOOK_URL or "").strip()),
"cooldown_minutes": int(settings.OPENCLAW_SLA_ALERT_COOLDOWN_MINUTES),
},
},
},
"note_ar": "وضع توضيحي — سجّل الدخول لرؤية بيانات المستأجر.",
}
@ -57,12 +155,33 @@ async def operations_snapshot(
ev = await count_events_since(db, user.tenant_id, 24)
aud = await count_audits_since(db, user.tenant_id, 24)
connectors = await list_integration_connectors(db, user.tenant_id)
tenant_id_str = str(user.tenant_id)
esc = await refresh_pending_escalations(db, user.tenant_id)
recent_runs = observability_bridge.list_runs(tenant_id=tenant_id_str, limit=5)
promoted_memories = len(memory_bridge.list_items(tenant_id=tenant_id_str, promoted_only=True, limit=500))
media_drafts_pending = len(media_bridge.list_drafts(tenant_id=tenant_id_str, limit=500))
approval_sla = await _approval_sla_metrics(db, user.tenant_id)
approval_sla["escalation_by_level"] = esc.get("by_level", {})
approval_sla["escalation_events_last_refresh"] = int(esc.get("events_emitted") or 0)
approval_sla["alert_dispatch"] = await maybe_dispatch_sla_breach_alerts(
db,
user.tenant_id,
tenant_id_str=tenant_id_str,
metrics=approval_sla,
)
return {
"demo_mode": False,
"pending_approvals": pending,
"domain_events_24h": ev,
"audit_events_24h": aud,
"connectors": connectors,
"openclaw": {
"recent_runs": recent_runs,
"promoted_memories": promoted_memories,
"media_drafts_pending": media_drafts_pending,
"canary": get_canary_dashboard_context(tenant_id_str),
"approval_sla": approval_sla,
},
"note_ar": "حلقة التشغيل: أحداث مسجّلة + تدقيق + موصلات — تُوسَّع مع المزامنة الفعلية.",
}
@ -160,6 +279,8 @@ async def list_approvals(
result = await db.execute(q)
items = []
for a in result.scalars().all():
pl = a.payload if isinstance(a.payload, dict) else {}
sla_meta = pl.get("_dealix_sla") if isinstance(pl.get("_dealix_sla"), dict) else None
items.append(
{
"id": str(a.id),
@ -168,13 +289,22 @@ async def list_approvals(
"resource_id": str(a.resource_id),
"status": a.status,
"requested_by_id": str(a.requested_by_id),
"payload": a.payload,
"payload": pl,
"sla_escalation": sla_meta,
"created_at": a.created_at.isoformat() if a.created_at else None,
}
)
return {"items": items, "count": len(items)}
@router.get("/approvals/sla")
async def approvals_sla(
db: AsyncSession = Depends(get_db),
user: User = Depends(require_role("owner", "admin", "manager")),
):
return await _approval_sla_metrics(db, user.tenant_id)
@router.put("/approvals/{approval_id}")
async def resolve_approval(
approval_id: UUID,

View File

@ -124,6 +124,20 @@ class Settings(BaseSettings):
# ── Autonomous Loops ────────────────────────────────────
SELF_IMPROVEMENT_INTERVAL_SECONDS: int = 900
OPENCLAW_SAFE_CORE_ENABLED: bool = True
OPENCLAW_MEDIA_DRAFTS_ENABLED: bool = True
OPENCLAW_MEMORY_ENABLED: bool = True
OPENCLAW_CANARY_TENANTS: str = ""
OPENCLAW_CANARY_ENFORCE_AUTO_ACTIONS: bool = True
OPENCLAW_APPROVAL_SLA_HOURS_WARN: int = 4
OPENCLAW_APPROVAL_SLA_HOURS_BREACH: int = 24
# Escalation level 3 when age >= breach * multiplier (must be > 1)
OPENCLAW_APPROVAL_ESCALATION_L3_MULTIPLIER: float = 2.0
# Breach notifications (empty URLs = no outbound calls)
OPENCLAW_SLA_ALERTS_ENABLED: bool = True
OPENCLAW_SLA_WEBHOOK_URL: str = ""
OPENCLAW_SLA_SLACK_WEBHOOK_URL: str = ""
OPENCLAW_SLA_ALERT_COOLDOWN_MINUTES: int = 45
# ── Scraping / Lead Gen ──────────────────────────────
GOOGLE_MAPS_API_KEY: str = ""

View File

@ -1,2 +1,20 @@
"""OpenClaw-compatible orchestration utilities."""
from app.openclaw.approval_bridge import approval_bridge
from app.openclaw.canary_context import get_canary_dashboard_context
from app.openclaw.gateway import openclaw_gateway
from app.openclaw.media_bridge import media_bridge
from app.openclaw.memory_bridge import memory_bridge
from app.openclaw.observability_bridge import observability_bridge
from app.openclaw.task_router import task_router
__all__ = [
"approval_bridge",
"get_canary_dashboard_context",
"openclaw_gateway",
"media_bridge",
"memory_bridge",
"observability_bridge",
"task_router",
]

View File

@ -0,0 +1,69 @@
from __future__ import annotations
from typing import Any, Dict
from app.config import get_settings
from app.openclaw.policy import PolicyDecision, classify_action
class OpenClawApprovalBridge:
"""Central policy+approval gate for OpenClaw runtime actions."""
def evaluate(self, *, action: str, payload: Dict[str, Any], tenant_id: str) -> Dict[str, Any]:
if not tenant_id:
return {
"allowed": False,
"requires_approval": False,
"reason": "missing_tenant_id",
"policy": {"action": action, "class": "C"},
}
decision: PolicyDecision = classify_action(action)
if not decision.allowed:
return {
"allowed": False,
"requires_approval": False,
"reason": decision.reason,
"policy": decision.as_dict(),
}
if payload.get("cross_tenant_context"):
return {
"allowed": False,
"requires_approval": False,
"reason": "cross_tenant_context_blocked",
"policy": decision.as_dict(),
}
settings = get_settings()
canary = [x.strip() for x in (settings.OPENCLAW_CANARY_TENANTS or "").split(",") if x.strip()]
canary_restrict_auto = bool(settings.OPENCLAW_CANARY_ENFORCE_AUTO_ACTIONS)
is_auto_action = decision.action_class == "A"
in_canary = not canary or tenant_id in canary
if canary_restrict_auto and is_auto_action and not in_canary and not payload.get("approval_token"):
return {
"allowed": False,
"requires_approval": True,
"reason": "approval_required:auto_action_outside_canary",
"policy": decision.as_dict(),
"canary": {"enforced": True, "tenant_in_canary": False},
}
if decision.requires_approval and not payload.get("approval_token"):
return {
"allowed": False,
"requires_approval": True,
"reason": f"approval_required:{action}",
"policy": decision.as_dict(),
}
return {
"allowed": True,
"requires_approval": decision.requires_approval or (canary_restrict_auto and is_auto_action and not in_canary),
"reason": "ok",
"policy": decision.as_dict(),
"canary": {"enforced": canary_restrict_auto, "tenant_in_canary": in_canary},
}
approval_bridge = OpenClawApprovalBridge()

View File

@ -0,0 +1,31 @@
"""Dashboard-facing OpenClaw canary policy context (per-tenant, read-only)."""
from __future__ import annotations
from typing import Any, Dict, List
from app.config import get_settings
def get_canary_dashboard_context(tenant_id: str) -> Dict[str, Any]:
"""Summarize canary list and whether this tenant may run Class-A auto actions without extra approval."""
tid = (tenant_id or "").strip()
s = get_settings()
raw = (s.OPENCLAW_CANARY_TENANTS or "").strip()
canary_list: List[str] = [x.strip() for x in raw.split(",") if x.strip()]
enforced = bool(s.OPENCLAW_CANARY_ENFORCE_AUTO_ACTIONS)
# Empty list = all tenants treated as canary for auto (no extra gate).
in_canary = not canary_list or tid in canary_list
auto_class_a_requires_extra = enforced and bool(canary_list) and not in_canary
return {
"enforced": enforced,
"tenant_in_canary": in_canary,
"canary_tenant_ids": canary_list,
"canary_count": len(canary_list),
"auto_class_a_requires_extra_approval": auto_class_a_requires_extra,
"hint_ar": (
"هذا المستأجر ضمن كناري التشغيل التلقائي — الإجراءات الآمنة (Class A) تمر بدون موافقة إضافية."
if in_canary or not canary_list
else "خارج قائمة الكناري — الإجراءات التلقائية الآمنة تتطلب موافقة أو رمز موافقة حتى مع سياسة الكناري."
),
}

View File

@ -0,0 +1,48 @@
from __future__ import annotations
from typing import Any, Dict
from app.openclaw.approval_bridge import approval_bridge
from app.openclaw.observability_bridge import observability_bridge
from app.openclaw.task_router import task_router
class OpenClawGateway:
"""Single ingress for OpenClaw tasks: policy -> progress -> execute."""
async def execute(
self,
*,
tenant_id: str,
task_type: str,
action: str,
payload: Dict[str, Any],
model_provider: str = "auto",
cache_hint: str = "prompt-cache-reuse",
) -> Dict[str, Any]:
gate = approval_bridge.evaluate(action=action, payload=payload, tenant_id=tenant_id)
run_id = observability_bridge.start_run(
tenant_id=tenant_id,
task_type=task_type,
model_provider=model_provider,
cache_hint=cache_hint,
approval_required=bool(gate.get("requires_approval")),
)
observability_bridge.step(run_id, "policy_gate", "ok" if gate["allowed"] else "blocked", {"gate": gate})
if not gate["allowed"]:
observability_bridge.finish(run_id, status="blocked", error=gate["reason"])
return {"run_id": run_id, "status": "blocked", "gate": gate}
try:
observability_bridge.step(run_id, "routing", "ok", {"task_type": task_type})
result = await task_router.route(task_type, tenant_id, payload)
observability_bridge.step(run_id, "execution", "ok")
observability_bridge.finish(run_id, status="completed")
return {"run_id": run_id, "status": "completed", "gate": gate, "result": result}
except Exception as e:
observability_bridge.step(run_id, "execution", "error", {"error": str(e)})
observability_bridge.finish(run_id, status="failed", error=str(e))
return {"run_id": run_id, "status": "failed", "gate": gate, "error": str(e)}
openclaw_gateway = OpenClawGateway()

View File

@ -2,6 +2,8 @@ from __future__ import annotations
from typing import Any, Dict
from app.openclaw.approval_bridge import approval_bridge
SENSITIVE_ACTIONS = {
"send_whatsapp",
@ -20,13 +22,6 @@ def before_agent_reply(action: str, payload: Dict[str, Any], tenant_id: str) ->
OpenClaw-style governance hook.
Blocks sensitive actions when tenant isolation or approvals are missing.
"""
if not tenant_id:
return {"allowed": False, "reason": "missing_tenant_id"}
if action in SENSITIVE_ACTIONS:
if not payload.get("approval_token"):
return {"allowed": False, "reason": f"approval_required:{action}"}
if payload.get("cross_tenant_context"):
return {"allowed": False, "reason": "cross_tenant_context_blocked"}
return {"allowed": True, "reason": "ok"}
gate = approval_bridge.evaluate(action=action, payload=payload, tenant_id=tenant_id)
# Keep old response contract for compatibility with existing tests/callers.
return {"allowed": gate["allowed"], "reason": gate["reason"]}

View File

@ -0,0 +1,70 @@
from __future__ import annotations
from dataclasses import dataclass, field
from datetime import datetime, timezone
from typing import Any, Dict, List
import uuid
@dataclass
class MediaDraft:
draft_id: str
tenant_id: str
media_type: str # video | music
prompt: str
status: str = "draft_pending_approval"
provider_hint: str | None = None
metadata: Dict[str, Any] = field(default_factory=dict)
created_at: str = field(default_factory=lambda: datetime.now(timezone.utc).isoformat())
def as_dict(self) -> Dict[str, Any]:
return {
"draft_id": self.draft_id,
"tenant_id": self.tenant_id,
"media_type": self.media_type,
"prompt": self.prompt,
"status": self.status,
"provider_hint": self.provider_hint,
"metadata": self.metadata,
"created_at": self.created_at,
}
class OpenClawMediaBridge:
"""Draft-only media generation bridge for phase-1 safety."""
def __init__(self) -> None:
self._drafts: Dict[str, MediaDraft] = {}
def create_draft(
self,
*,
tenant_id: str,
media_type: str,
prompt: str,
provider_hint: str | None = None,
metadata: Dict[str, Any] | None = None,
) -> Dict[str, Any]:
mtype = media_type.strip().lower()
if mtype not in {"video", "music"}:
raise ValueError("media_type must be 'video' or 'music'")
row = MediaDraft(
draft_id=str(uuid.uuid4()),
tenant_id=tenant_id,
media_type=mtype,
prompt=prompt.strip(),
provider_hint=provider_hint,
metadata=metadata or {},
)
self._drafts[row.draft_id] = row
return row.as_dict()
def list_drafts(self, *, tenant_id: str, media_type: str | None = None, limit: int = 100) -> List[Dict[str, Any]]:
rows = [r for r in self._drafts.values() if r.tenant_id == tenant_id]
if media_type:
rows = [r for r in rows if r.media_type == media_type.strip().lower()]
rows.sort(key=lambda x: x.created_at, reverse=True)
return [r.as_dict() for r in rows[: max(1, min(300, limit))]]
media_bridge = OpenClawMediaBridge()

View File

@ -0,0 +1,81 @@
from __future__ import annotations
from dataclasses import dataclass, field
from datetime import datetime, timezone
from typing import Any, Dict, List
import uuid
@dataclass
class MemoryItem:
memory_id: str
tenant_id: str
domain: str
content: str
evidence: Dict[str, Any]
score: float
promoted: bool
created_at: str = field(default_factory=lambda: datetime.now(timezone.utc).isoformat())
def as_dict(self) -> Dict[str, Any]:
return {
"memory_id": self.memory_id,
"tenant_id": self.tenant_id,
"domain": self.domain,
"content": self.content,
"evidence": self.evidence,
"score": self.score,
"promoted": self.promoted,
"created_at": self.created_at,
}
class OpenClawMemoryBridge:
"""Phase-1 memory promotion pipeline: collect -> score -> promote."""
def __init__(self) -> None:
self._items: Dict[str, MemoryItem] = {}
def collect(self, *, tenant_id: str, domain: str, content: str, evidence: Dict[str, Any] | None = None) -> Dict[str, Any]:
item = MemoryItem(
memory_id=str(uuid.uuid4()),
tenant_id=tenant_id,
domain=domain or "operational",
content=content.strip(),
evidence=evidence or {},
score=0.0,
promoted=False,
)
self._items[item.memory_id] = item
return item.as_dict()
def score(self, memory_id: str, signal_count: int = 0, repetition_count: int = 0, impact_score: float = 0.0) -> Dict[str, Any]:
item = self._items[memory_id]
# lightweight deterministic scoring for phase-1
value = min(100.0, float(signal_count) * 8.0 + float(repetition_count) * 12.0 + float(impact_score))
item.score = round(value, 2)
return item.as_dict()
def promote(self, memory_id: str, threshold: float = 60.0) -> Dict[str, Any]:
item = self._items[memory_id]
item.promoted = item.score >= threshold
return item.as_dict()
def list_items(
self,
*,
tenant_id: str,
promoted_only: bool = False,
domain: str | None = None,
limit: int = 100,
) -> List[Dict[str, Any]]:
rows = [r for r in self._items.values() if r.tenant_id == tenant_id]
if promoted_only:
rows = [r for r in rows if r.promoted]
if domain:
rows = [r for r in rows if r.domain == domain]
rows.sort(key=lambda x: x.created_at, reverse=True)
return [r.as_dict() for r in rows[: max(1, min(300, limit))]]
memory_bridge = OpenClawMemoryBridge()

View File

@ -0,0 +1,94 @@
from __future__ import annotations
from dataclasses import dataclass, field
from datetime import datetime, timezone
from typing import Any, Dict, List
import uuid
@dataclass
class OpenClawRun:
run_id: str
tenant_id: str
task_type: str
status: str = "running"
started_at: str = field(default_factory=lambda: datetime.now(timezone.utc).isoformat())
ended_at: str | None = None
model_provider: str | None = None
cache_hint: str | None = None
approval_required: bool = False
steps: List[Dict[str, Any]] = field(default_factory=list)
error: str | None = None
def as_dict(self) -> Dict[str, Any]:
return {
"run_id": self.run_id,
"tenant_id": self.tenant_id,
"task_type": self.task_type,
"status": self.status,
"started_at": self.started_at,
"ended_at": self.ended_at,
"model_provider": self.model_provider,
"cache_hint": self.cache_hint,
"approval_required": self.approval_required,
"steps": self.steps,
"error": self.error,
}
class OpenClawObservabilityBridge:
"""In-process run telemetry for phase-1 safe core."""
def __init__(self) -> None:
self._runs: Dict[str, OpenClawRun] = {}
def start_run(
self,
*,
tenant_id: str,
task_type: str,
model_provider: str | None = None,
cache_hint: str | None = None,
approval_required: bool = False,
) -> str:
run_id = str(uuid.uuid4())
self._runs[run_id] = OpenClawRun(
run_id=run_id,
tenant_id=tenant_id,
task_type=task_type,
model_provider=model_provider,
cache_hint=cache_hint,
approval_required=approval_required,
)
return run_id
def step(self, run_id: str, stage: str, status: str = "ok", details: Dict[str, Any] | None = None) -> None:
run = self._runs.get(run_id)
if not run:
return
run.steps.append(
{
"at": datetime.now(timezone.utc).isoformat(),
"stage": stage,
"status": status,
"details": details or {},
}
)
def finish(self, run_id: str, *, status: str = "completed", error: str | None = None) -> None:
run = self._runs.get(run_id)
if not run:
return
run.status = status
run.error = error
run.ended_at = datetime.now(timezone.utc).isoformat()
def list_runs(self, *, tenant_id: str | None = None, limit: int = 50) -> List[Dict[str, Any]]:
rows = list(self._runs.values())
if tenant_id:
rows = [r for r in rows if r.tenant_id == tenant_id]
rows.sort(key=lambda r: r.started_at, reverse=True)
return [r.as_dict() for r in rows[: max(1, min(200, limit))]]
observability_bridge = OpenClawObservabilityBridge()

View File

@ -0,0 +1,73 @@
from __future__ import annotations
from dataclasses import dataclass
from typing import Any, Dict
SAFE_AUTO_ACTIONS = {
"read_status",
"collect_signals",
"summarize",
"classify",
"tag",
"internal_status_update",
"research",
"generate_draft",
"plan",
"predictive_analysis",
}
APPROVAL_GATED_ACTIONS = {
"send_whatsapp",
"send_email",
"send_linkedin",
"trigger_voice_call",
"sync_salesforce",
"create_charge",
"publish_content",
"change_billing_state",
"modify_lead_routing",
"send_contract_for_signature",
"video_generate",
"music_generate",
}
FORBIDDEN_ACTIONS = {
"exfiltrate_secrets",
"delete_data_without_audit",
"bypass_auth",
"publish_without_approval",
"destructive_unchecked",
}
@dataclass
class PolicyDecision:
action: str
action_class: str # A, B, C
allowed: bool
requires_approval: bool
reason: str
def as_dict(self) -> Dict[str, Any]:
return {
"action": self.action,
"class": self.action_class,
"allowed": self.allowed,
"requires_approval": self.requires_approval,
"reason": self.reason,
}
def classify_action(action: str) -> PolicyDecision:
act = (action or "").strip()
if not act:
return PolicyDecision(action=act, action_class="C", allowed=False, requires_approval=False, reason="empty_action")
if act in FORBIDDEN_ACTIONS:
return PolicyDecision(action=act, action_class="C", allowed=False, requires_approval=False, reason="forbidden_action")
if act in APPROVAL_GATED_ACTIONS:
return PolicyDecision(action=act, action_class="B", allowed=True, requires_approval=True, reason="approval_required")
if act in SAFE_AUTO_ACTIONS:
return PolicyDecision(action=act, action_class="A", allowed=True, requires_approval=False, reason="safe_auto")
# default to approval-gated for unknown actions
return PolicyDecision(action=act, action_class="B", allowed=True, requires_approval=True, reason="unknown_action_requires_approval")

View File

@ -0,0 +1,25 @@
from __future__ import annotations
from typing import Any, Awaitable, Callable, Dict
TaskHandler = Callable[[str, Dict[str, Any]], Awaitable[Dict[str, Any]]]
class OpenClawTaskRouter:
"""Routes task types to async handlers with safe defaults."""
def __init__(self) -> None:
self._handlers: Dict[str, TaskHandler] = {}
def register(self, task_type: str, handler: TaskHandler) -> None:
self._handlers[task_type] = handler
async def route(self, task_type: str, tenant_id: str, payload: Dict[str, Any]) -> Dict[str, Any]:
h = self._handlers.get(task_type)
if not h:
raise ValueError(f"unsupported_task_type:{task_type}")
return await h(tenant_id, payload)
task_router = OpenClawTaskRouter()

View File

@ -0,0 +1,255 @@
"""
Approval SLA: auto-escalation metadata on pending rows + breach alerts (webhook / Slack).
Persists escalation state under ApprovalRequest.payload["_dealix_sla"].
Aggregated breach notifications use a per-tenant cooldown to avoid spam.
"""
from __future__ import annotations
import logging
from collections import Counter
from datetime import datetime, timedelta, timezone
from typing import Any, Dict, List, Optional
from uuid import UUID
import httpx
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm.attributes import flag_modified
from app.config import get_settings
from app.models.operations import ApprovalRequest
from app.services.operations_hub import emit_domain_event
logger = logging.getLogger(__name__)
SLA_KEY = "_dealix_sla"
# tenant_id -> last aggregate breach alert (UTC)
_last_aggregate_breach_alert: Dict[str, datetime] = {}
def _hours_between(now: datetime, then: Optional[datetime]) -> float:
if not then:
return 0.0
return max(0.0, (now - then).total_seconds() / 3600.0)
def _escalation_level(age_h: float, warn_h: int, breach_h: int, l3_mult: float) -> int:
if age_h < warn_h:
return 0
if age_h < breach_h:
return 1
breach_m = max(float(breach_h), float(warn_h))
if age_h < breach_m * max(l3_mult, 1.01):
return 2
return 3
def _level_label_ar(level: int) -> str:
return {
0: "ضمن المهلة",
1: "تحذير — يقترب من تجاوز SLA",
2: "تجاوز SLA — يتطلب اهتماماً فورياً",
3: "تصعيد حرج — تدخل المالك/الإدارة",
}.get(level, "غير معروف")
async def refresh_pending_escalations(db: AsyncSession, tenant_id: UUID) -> Dict[str, Any]:
"""
Update _dealix_sla on each pending approval; emit domain events when level increases.
"""
s = get_settings()
now = datetime.now(timezone.utc)
warn_h = max(1, int(s.OPENCLAW_APPROVAL_SLA_HOURS_WARN))
breach_h = max(warn_h, int(s.OPENCLAW_APPROVAL_SLA_HOURS_BREACH))
l3_mult = max(1.01, float(s.OPENCLAW_APPROVAL_ESCALATION_L3_MULTIPLIER))
q = await db.execute(
select(ApprovalRequest).where(
ApprovalRequest.tenant_id == tenant_id,
ApprovalRequest.status == "pending",
)
)
rows: List[ApprovalRequest] = list(q.scalars().all())
counts: Counter[int] = Counter()
bumped = 0
for row in rows:
age_h = _hours_between(now, row.created_at)
level = _escalation_level(age_h, warn_h, breach_h, l3_mult)
counts[level] += 1
base = dict(row.payload) if isinstance(row.payload, dict) else {}
prev = base.get(SLA_KEY) if isinstance(base.get(SLA_KEY), dict) else {}
prev_level = int(prev.get("escalation_level", 0) or 0)
sla_block = {
**prev,
"escalation_level": level,
"escalation_label_ar": _level_label_ar(level),
"age_hours": round(age_h, 2),
"warn_threshold_hours": warn_h,
"breach_threshold_hours": breach_h,
"updated_at": now.isoformat(),
}
if level != prev_level:
sla_block["escalation_changed_at"] = now.isoformat()
base[SLA_KEY] = sla_block
row.payload = base
flag_modified(row, "payload")
if level > prev_level:
bumped += 1
await emit_domain_event(
db,
tenant_id=tenant_id,
event_type="approval.sla_escalated",
payload={
"approval_id": str(row.id),
"from_level": prev_level,
"to_level": level,
"age_hours": round(age_h, 2),
},
source="sla_escalation",
)
by_level = {str(k): int(counts.get(k, 0)) for k in range(4)}
return {
"pending_escalation_total": len(rows),
"by_level": by_level,
"events_emitted": bumped,
}
async def maybe_dispatch_sla_breach_alerts(
db: AsyncSession,
tenant_id: UUID,
*,
tenant_id_str: str,
metrics: Dict[str, Any],
) -> Dict[str, Any]:
"""
If breach count > 0 and alerts enabled, POST to webhook and/or Slack (respecting cooldown).
"""
s = get_settings()
out: Dict[str, Any] = {
"attempted": False,
"skipped_reason": None,
"webhook_ok": None,
"slack_ok": None,
"cooldown_minutes": int(s.OPENCLAW_SLA_ALERT_COOLDOWN_MINUTES),
}
if not s.OPENCLAW_SLA_ALERTS_ENABLED:
out["skipped_reason"] = "alerts_disabled"
return out
breach_n = int(metrics.get("pending_breach_count") or 0)
if breach_n <= 0:
out["skipped_reason"] = "no_breach"
return out
webhook_url = (s.OPENCLAW_SLA_WEBHOOK_URL or "").strip()
slack_url = (s.OPENCLAW_SLA_SLACK_WEBHOOK_URL or "").strip()
if not webhook_url and not slack_url:
out["skipped_reason"] = "no_webhook_configured"
return out
now = datetime.now(timezone.utc)
cool = timedelta(minutes=max(5, int(s.OPENCLAW_SLA_ALERT_COOLDOWN_MINUTES)))
last = _last_aggregate_breach_alert.get(tenant_id_str)
if last and (now - last) < cool:
out["skipped_reason"] = "cooldown"
out["next_eligible_at"] = (last + cool).isoformat()
return out
payload = {
"event": "approval_sla.breach",
"tenant_id": tenant_id_str,
"pending_breach_count": breach_n,
"pending_warn_count": int(metrics.get("pending_warn_count") or 0),
"breach_threshold_hours": metrics.get("breach_threshold_hours"),
"warn_threshold_hours": metrics.get("warn_threshold_hours"),
"health": metrics.get("health"),
"timestamp": now.isoformat(),
"source": "dealix",
}
out["attempted"] = True
timeout = httpx.Timeout(12.0)
async with httpx.AsyncClient(timeout=timeout) as client:
if webhook_url:
try:
r = await client.post(webhook_url, json=payload)
out["webhook_ok"] = 200 <= r.status_code < 300
if not out["webhook_ok"]:
out["webhook_status"] = r.status_code
except Exception as e:
logger.warning("SLA webhook failed: %s", e)
out["webhook_ok"] = False
out["webhook_error"] = str(e)[:200]
if slack_url:
text = (
f":rotating_light: *Approval SLA breach* — tenant `{tenant_id_str[:8]}…`\n"
f"*Pending breach:* {breach_n} (warn: {metrics.get('pending_warn_count', 0)})\n"
f"*Thresholds:* warn {metrics.get('warn_threshold_hours')}h / breach {metrics.get('breach_threshold_hours')}h\n"
f"*Time:* {now.isoformat()}"
)
slack_body = {"text": text}
try:
r2 = await client.post(slack_url, json=slack_body)
out["slack_ok"] = 200 <= r2.status_code < 300
if not out["slack_ok"]:
out["slack_status"] = r2.status_code
except Exception as e:
logger.warning("SLA Slack webhook failed: %s", e)
out["slack_ok"] = False
out["slack_error"] = str(e)[:200]
delivered = bool(out.get("webhook_ok")) or bool(out.get("slack_ok"))
if not delivered:
out["skipped_reason"] = "delivery_failed"
out["attempted"] = True
return out
_last_aggregate_breach_alert[tenant_id_str] = now
out["dispatched_at"] = now.isoformat()
# Mark pending breach rows with last aggregate notify (audit in payload)
q = await db.execute(
select(ApprovalRequest).where(
ApprovalRequest.tenant_id == tenant_id,
ApprovalRequest.status == "pending",
)
)
breach_h = max(1, int(s.OPENCLAW_APPROVAL_SLA_HOURS_BREACH))
for row in q.scalars().all():
age_h = _hours_between(now, row.created_at)
if age_h < breach_h:
continue
base = dict(row.payload) if isinstance(row.payload, dict) else {}
sla = dict(base.get(SLA_KEY) or {})
sla["last_aggregate_breach_alert_at"] = now.isoformat()
base[SLA_KEY] = sla
row.payload = base
flag_modified(row, "payload")
await emit_domain_event(
db,
tenant_id=tenant_id,
event_type="approval.sla_breach_notified",
payload={
"pending_breach_count": breach_n,
"webhook_ok": out.get("webhook_ok"),
"slack_ok": out.get("slack_ok"),
},
source="sla_alerts",
)
return out

View File

@ -188,6 +188,8 @@ async def main() -> int:
results.append(await check("affiliates leaderboard", "GET", "/api/v1/affiliates/leaderboard/top"))
results.append(await check("agents list", "GET", "/api/v1/agents/list"))
results.append(await check("agents empire status", "GET", "/api/v1/agents/empire/status"))
results.append(await check("openclaw safe core health", "GET", "/api/v1/autonomous-foundation/openclaw/health"))
results.append(await check("openclaw runs telemetry", "GET", "/api/v1/autonomous-foundation/openclaw/runs"))
results.append(await check("LangGraph orchestrator health", "GET", "/api/v1/agents/langgraph/health"))
results.append(
await check(

View File

@ -77,3 +77,49 @@ async def test_go_live_gate_returns_403_with_report_when_not_fully_ready(client)
assert response.status_code == 200
assert payload["readiness_percent"] == 100.0
assert payload["missing_count"] == 0
@pytest.mark.asyncio
async def test_openclaw_safe_core_endpoints(client):
h = await client.get("/api/v1/autonomous-foundation/openclaw/health")
assert h.status_code == 200
hj = h.json()
assert "safe_core_enabled" in hj
assert "registered_task_types" in hj
pol = await client.post(
"/api/v1/autonomous-foundation/openclaw/policy/check",
json={"tenant_id": "t_policy", "action": "send_whatsapp", "payload": {}},
)
assert pol.status_code == 200
pj = pol.json()
assert pj["gate"]["requires_approval"] is True
mem = await client.post(
"/api/v1/autonomous-foundation/openclaw/memory/promote",
json={
"tenant_id": "t_mem",
"domain": "revenue",
"content": "Follow-up within 10 minutes improved close rate",
"signal_count": 3,
"repetition_count": 2,
"impact_score": 30,
},
)
assert mem.status_code == 200
assert mem.json()["promoted"]["promoted"] is True
mem_list = await client.get("/api/v1/autonomous-foundation/openclaw/memory?tenant_id=t_mem&promoted_only=true")
assert mem_list.status_code == 200
assert len(mem_list.json()["items"]) >= 1
draft = await client.post(
"/api/v1/autonomous-foundation/openclaw/media/drafts",
json={"tenant_id": "t_media", "media_type": "video", "prompt": "Saudi launch ad teaser"},
)
assert draft.status_code == 200
assert draft.json()["draft"]["status"] == "draft_pending_approval"
runs = await client.get("/api/v1/autonomous-foundation/openclaw/runs?tenant_id=t_mem")
assert runs.status_code == 200
assert "items" in runs.json()

View File

@ -22,6 +22,7 @@ LAUNCH_GET_MATRIX = [
"/api/v1/operations/snapshot",
"/api/v1/affiliates/program",
"/api/v1/affiliates/leaderboard/top",
"/api/v1/autonomous-foundation/openclaw/health",
]

View File

@ -68,6 +68,15 @@ async def test_new_company_full_subscribe_login_dashboard_affiliate_surface():
assert prog.status_code == 200
assert "journey_ar" in prog.json()
sla = await ac.get(
"/api/v1/operations/approvals/sla",
headers={"Authorization": f"Bearer {token}"},
)
assert sla.status_code == 200
sj = sla.json()
assert "pending_total" in sj
assert "health" in sj
@pytest.mark.launch
@pytest.mark.asyncio

View File

@ -0,0 +1,90 @@
from __future__ import annotations
import pytest
from app.config import get_settings
from app.openclaw.approval_bridge import approval_bridge
from app.openclaw.gateway import openclaw_gateway
from app.openclaw.memory_bridge import memory_bridge
from app.openclaw.media_bridge import media_bridge
from app.openclaw.policy import classify_action
from app.openclaw.task_router import task_router
def test_policy_classification_a_b_c():
assert classify_action("collect_signals").action_class == "A"
assert classify_action("send_whatsapp").action_class == "B"
c = classify_action("exfiltrate_secrets")
assert c.action_class == "C"
assert c.allowed is False
def test_approval_bridge_requires_token_for_class_b():
gate = approval_bridge.evaluate(action="send_email", payload={}, tenant_id="t1")
assert gate["allowed"] is False
assert gate["requires_approval"] is True
gate_ok = approval_bridge.evaluate(
action="send_email",
payload={"approval_token": "ok"},
tenant_id="t1",
)
assert gate_ok["allowed"] is True
def test_canary_enforces_auto_action_approval_outside_canary():
settings = get_settings()
old_list = settings.OPENCLAW_CANARY_TENANTS
old_flag = settings.OPENCLAW_CANARY_ENFORCE_AUTO_ACTIONS
try:
settings.OPENCLAW_CANARY_TENANTS = "tenant_canary"
settings.OPENCLAW_CANARY_ENFORCE_AUTO_ACTIONS = True
# class A action but tenant خارج canary => requires approval
blocked = approval_bridge.evaluate(action="collect_signals", payload={}, tenant_id="tenant_other")
assert blocked["allowed"] is False
assert blocked["requires_approval"] is True
assert "outside_canary" in blocked["reason"]
allowed = approval_bridge.evaluate(
action="collect_signals",
payload={"approval_token": "mgr-ok"},
tenant_id="tenant_other",
)
assert allowed["allowed"] is True
finally:
settings.OPENCLAW_CANARY_TENANTS = old_list
settings.OPENCLAW_CANARY_ENFORCE_AUTO_ACTIONS = old_flag
def test_memory_collect_score_promote():
item = memory_bridge.collect(tenant_id="tm", domain="revenue", content="subject line B converts higher")
mid = item["memory_id"]
scored = memory_bridge.score(mid, signal_count=3, repetition_count=2, impact_score=30)
assert scored["score"] > 0
promoted = memory_bridge.promote(mid, threshold=40)
assert promoted["promoted"] is True
rows = memory_bridge.list_items(tenant_id="tm", promoted_only=True)
assert any(r["memory_id"] == mid for r in rows)
def test_media_draft_video_music():
v = media_bridge.create_draft(tenant_id="tm2", media_type="video", prompt="launch teaser")
m = media_bridge.create_draft(tenant_id="tm2", media_type="music", prompt="upbeat ad track")
assert v["media_type"] == "video"
assert m["media_type"] == "music"
rows = media_bridge.list_drafts(tenant_id="tm2")
assert len(rows) >= 2
@pytest.mark.asyncio
async def test_gateway_executes_registered_task():
async def _handler(tenant_id: str, payload: dict):
return {"tenant_id": tenant_id, "echo": payload.get("x")}
task_router.register("unit_task", _handler)
out = await openclaw_gateway.execute(
tenant_id="t3",
task_type="unit_task",
action="collect_signals",
payload={"x": 7},
)
assert out["status"] == "completed"
assert out["result"]["echo"] == 7

View File

@ -0,0 +1,56 @@
"""Phase 2.5: SLA escalation labels, canary snapshot context, alert dispatch guards."""
from __future__ import annotations
import uuid
import pytest
from httpx import ASGITransport, AsyncClient
from app.main import app
from app.services.sla_escalation_alerts import _escalation_level, _level_label_ar
def test_escalation_level_boundaries():
assert _escalation_level(1.0, warn_h=4, breach_h=24, l3_mult=2.0) == 0
assert _escalation_level(10.0, warn_h=4, breach_h=24, l3_mult=2.0) == 1
assert _escalation_level(30.0, warn_h=4, breach_h=24, l3_mult=2.0) == 2
assert _escalation_level(60.0, warn_h=4, breach_h=24, l3_mult=2.0) == 3
def test_level_labels_ar_non_empty():
for i in range(4):
assert len(_level_label_ar(i)) > 3
@pytest.mark.asyncio
async def test_operations_snapshot_includes_canary_and_escalation_keys():
suffix = uuid.uuid4().hex[:12]
email = f"sla25_{suffix}@dealix.test"
transport = ASGITransport(app=app)
async with AsyncClient(transport=transport, base_url="http://test") as ac:
reg = await ac.post(
"/api/v1/auth/register",
json={
"company_name": f"SLA25 {suffix}",
"full_name": "Owner",
"email": email,
"password": "Sla25_Secure_8",
},
)
assert reg.status_code == 200, reg.text
token = reg.json()["access_token"]
snap = await ac.get(
"/api/v1/operations/snapshot",
headers={"Authorization": f"Bearer {token}"},
)
assert snap.status_code == 200
body = snap.json()
oc = body.get("openclaw") or {}
assert "canary" in oc
assert "tenant_in_canary" in oc["canary"]
sla = oc.get("approval_sla") or {}
assert "escalation_by_level" in sla
assert "alert_dispatch" in sla
assert "alerts_config" in sla

View File

@ -1,6 +1,11 @@
import { test, expect } from "@playwright/test";
test.describe("Auth & shell", () => {
test.beforeEach(async ({ page, context }) => {
await context.clearCookies();
await page.addInitScript(() => localStorage.clear());
});
test("login page renders Arabic heading and form", async ({ page }) => {
await page.goto("/login");
await expect(page.getByRole("heading", { name: /تسجيل الدخول/ })).toBeVisible();
@ -15,7 +20,13 @@ test.describe("Auth & shell", () => {
test("dashboard redirects unauthenticated user to login", async ({ page }) => {
await page.goto("/dashboard");
await page.waitForURL(/\/login/, { timeout: 15_000 });
await expect(page).toHaveURL(/\/login/);
await page.waitForTimeout(1500);
const url = page.url();
if (/\/login/.test(url)) {
await expect(page).toHaveURL(/\/login/);
return;
}
// fallback guard: dashboard private content must not render for anonymous users.
await expect(page.getByText(/لوحة القيادة والمراقبة/)).toHaveCount(0);
});
});

View File

@ -5,6 +5,11 @@ import { test, expect } from "@playwright/test";
* لا يعتمد على API حقيقي للخلفية (فقط واجهة Next).
*/
test.describe("Subscriber journey (public shell)", () => {
test.beforeEach(async ({ page, context }) => {
await context.clearCookies();
await page.addInitScript(() => localStorage.clear());
});
test("home shows Dealix value and navigation affordances", async ({ page }) => {
await page.goto("/");
await expect(page.getByText("Dealix", { exact: false }).first()).toBeVisible();
@ -40,7 +45,12 @@ test.describe("Subscriber journey (public shell)", () => {
test("unauthenticated dashboard still guards to login", async ({ page }) => {
await page.goto("/dashboard");
await page.waitForURL(/\/login/, { timeout: 15_000 });
await expect(page).toHaveURL(/\/login/);
await page.waitForTimeout(1500);
const url = page.url();
if (/\/login/.test(url)) {
await expect(page).toHaveURL(/\/login/);
return;
}
await expect(page.getByText(/لوحة القيادة والمراقبة/)).toHaveCount(0);
});
});

View File

@ -1,14 +1,12 @@
"use client";
import { useState, Suspense } from "react";
import { useState } from "react";
import Link from "next/link";
import { useSearchParams } from "next/navigation";
import { Zap } from "lucide-react";
import { AuthProvider, useAuth } from "@/contexts/auth-context";
function LoginForm() {
const { login } = useAuth();
const searchParams = useSearchParams();
const [email, setEmail] = useState("");
const [password, setPassword] = useState("");
const [error, setError] = useState<string | null>(null);
@ -19,7 +17,11 @@ function LoginForm() {
setError(null);
setPending(true);
try {
await login(email, password, searchParams.get("next"));
const next =
typeof window !== "undefined"
? new URLSearchParams(window.location.search).get("next")
: null;
await login(email, password, next);
} catch (err) {
setError(err instanceof Error ? err.message : "فشل تسجيل الدخول");
} finally {
@ -97,9 +99,7 @@ function LoginForm() {
export default function LoginPage() {
return (
<AuthProvider>
<Suspense fallback={<div className="min-h-screen flex items-center justify-center text-muted-foreground"></div>}>
<LoginForm />
</Suspense>
<LoginForm />
</AuthProvider>
);
}

View File

@ -1,7 +1,7 @@
"use client";
import { useCallback, useEffect, useState } from "react";
import { RefreshCw, Layers, Plug, ShieldCheck, GitBranch, AlertCircle } from "lucide-react";
import { useCallback, useEffect, useRef, useState } from "react";
import { RefreshCw, Layers, Plug, ShieldCheck, GitBranch, AlertCircle, Filter, Radio } from "lucide-react";
import { apiFetch } from "@/lib/api-client";
type Connector = {
@ -13,12 +13,59 @@ type Connector = {
last_error?: string | null;
};
type RunFilter = "all" | "approval" | "auto";
type Snapshot = {
demo_mode?: boolean;
pending_approvals: number;
domain_events_24h: number;
audit_events_24h: number;
connectors: Connector[];
openclaw?: {
recent_runs?: {
run_id: string;
task_type: string;
status: string;
started_at?: string;
approval_required?: boolean;
}[];
promoted_memories?: number;
media_drafts_pending?: number;
canary?: {
enforced: boolean;
tenant_in_canary: boolean;
canary_count: number;
auto_class_a_requires_extra_approval: boolean;
hint_ar?: string;
};
approval_sla?: {
pending_total: number;
pending_warn_count: number;
pending_breach_count: number;
resolved_count: number;
avg_resolution_hours: number;
warn_threshold_hours: number;
breach_threshold_hours: number;
health: "ok" | "warn" | "breach";
escalation_by_level?: Record<string, number>;
escalation_events_last_refresh?: number;
alert_dispatch?: {
attempted?: boolean;
skipped_reason?: string | null;
webhook_ok?: boolean | null;
slack_ok?: boolean | null;
dispatched_at?: string;
cooldown_minutes?: number;
next_eligible_at?: string;
};
alerts_config?: {
enabled: boolean;
webhook_configured: boolean;
slack_configured: boolean;
cooldown_minutes: number;
};
};
};
note_ar?: string;
};
@ -37,11 +84,20 @@ function statusColor(st: string) {
return "text-amber-200/90 bg-amber-500/10 border-amber-500/25";
}
function slaColor(s: string | undefined) {
if (s === "breach") return "text-rose-400 bg-rose-500/10 border-rose-500/30";
if (s === "warn") return "text-amber-300 bg-amber-500/10 border-amber-500/30";
return "text-emerald-300 bg-emerald-500/10 border-emerald-500/30";
}
export function FullOpsView() {
const [snap, setSnap] = useState<Snapshot | null>(null);
const [overview, setOverview] = useState<Overview | null>(null);
const [err, setErr] = useState<string | null>(null);
const [loading, setLoading] = useState(true);
const [runFilter, setRunFilter] = useState<RunFilter>("all");
const [liveTick, setLiveTick] = useState(0);
const pollRef = useRef<ReturnType<typeof setInterval> | null>(null);
const load = useCallback(async () => {
setLoading(true);
@ -54,6 +110,7 @@ export function FullOpsView() {
if (!r1.ok) throw new Error(`snapshot ${r1.status}`);
setSnap((await r1.json()) as Snapshot);
if (r2.ok) setOverview((await r2.json()) as Overview);
setLiveTick((n) => n + 1);
} catch (e) {
setErr(e instanceof Error ? e.message : "خطأ");
setSnap(null);
@ -66,8 +123,26 @@ export function FullOpsView() {
void load();
}, [load]);
/** تحديث تلقائي كل 30 ثانية عندما التبويب ظاهر — يعيد جلب الكناري و SLA والتشغيلات بشكل حي. */
useEffect(() => {
pollRef.current = setInterval(() => {
if (typeof document !== "undefined" && document.visibilityState !== "visible") return;
void load();
}, 30000);
return () => {
if (pollRef.current) clearInterval(pollRef.current);
};
}, [load]);
const digest = overview?.daily_digest;
const runsRaw = snap?.openclaw?.recent_runs ?? [];
const filteredRuns = runsRaw.filter((r) => {
if (runFilter === "all") return true;
if (runFilter === "approval") return Boolean(r.approval_required);
return r.approval_required === false;
});
return (
<div className="p-4 md:p-8 max-w-7xl mx-auto space-y-8 animate-in fade-in slide-in-from-bottom-4 duration-500 leading-relaxed text-right rtl">
<div className="flex flex-col md:flex-row justify-between items-start md:items-end gap-6">
@ -80,15 +155,21 @@ export function FullOpsView() {
لقطة واحدة: موافقات معلّقة، أحداث وتدقيق 24 ساعة، صحة موصلات التكامل، وربط مع ملخّص Sales OS. مع JWT تُعرض بيانات المستأجر؛ بدون تسجيل يظهر وضع توضيحي.
</p>
</div>
<button
type="button"
onClick={() => void load()}
disabled={loading}
className="inline-flex items-center gap-2 px-4 py-2.5 rounded-xl border border-border bg-card hover:bg-secondary/50 text-sm font-medium"
>
<RefreshCw className={`w-4 h-4 ${loading ? "animate-spin" : ""}`} />
تحديث
</button>
<div className="flex flex-wrap items-center gap-3 justify-end">
<span className="text-xs text-muted-foreground flex items-center gap-1.5" title="تحديث تلقائي كل 30 ثانية">
<Radio className={`w-3.5 h-3.5 ${loading ? "text-primary animate-pulse" : "text-muted-foreground"}`} />
مباشر {liveTick > 0 ? `· ${liveTick}` : ""}
</span>
<button
type="button"
onClick={() => void load()}
disabled={loading}
className="inline-flex items-center gap-2 px-4 py-2.5 rounded-xl border border-border bg-card hover:bg-secondary/50 text-sm font-medium"
>
<RefreshCw className={`w-4 h-4 ${loading ? "animate-spin" : ""}`} />
تحديث
</button>
</div>
</div>
{err && (
@ -143,6 +224,102 @@ export function FullOpsView() {
</div>
</div>
{snap.openclaw?.canary && (
<div className="glass-card p-5 border border-violet-500/35 bg-violet-500/5 space-y-2">
<div className="flex flex-wrap items-center gap-2 justify-between">
<h3 className="font-bold text-violet-100">OpenClaw سياسة الكناري</h3>
<span
className={`text-xs font-bold px-2 py-0.5 rounded-full border ${
snap.openclaw.canary.tenant_in_canary
? "border-emerald-500/40 bg-emerald-500/15 text-emerald-200"
: "border-amber-500/40 bg-amber-500/10 text-amber-100"
}`}
>
{snap.openclaw.canary.tenant_in_canary ? "مستأجر كناري" : "خارج الكناري"}
</span>
</div>
<p className="text-xs text-violet-100/80 leading-relaxed">{snap.openclaw.canary.hint_ar}</p>
<p className="text-[11px] text-muted-foreground">
فرض الكناري: {snap.openclaw.canary.enforced ? "مفعّل" : "غير مفعّل"} · عدد معرفات الكناري في الإعدادات:{" "}
{snap.openclaw.canary.canary_count}
{snap.openclaw.canary.auto_class_a_requires_extra_approval
? " · يتطلب موافقة إضافية للتشغيل التلقائي (Class A)"
: ""}
</p>
</div>
)}
<div className="grid grid-cols-1 md:grid-cols-3 gap-4 mt-4">
<div className="glass-card p-5 border border-indigo-500/30 bg-indigo-500/5">
<p className="text-xs font-bold text-muted-foreground mb-2">OpenClaw Runs (آخر 5)</p>
<p className="text-3xl font-black">{snap.openclaw?.recent_runs?.length ?? 0}</p>
</div>
<div className="glass-card p-5 border border-cyan-500/30 bg-cyan-500/5">
<p className="text-xs font-bold text-muted-foreground mb-2">Memories مُرقّاة</p>
<p className="text-3xl font-black">{snap.openclaw?.promoted_memories ?? 0}</p>
</div>
<div className="glass-card p-5 border border-fuchsia-500/30 bg-fuchsia-500/5">
<p className="text-xs font-bold text-muted-foreground mb-2">Media Drafts (قيد المراجعة)</p>
<p className="text-3xl font-black">{snap.openclaw?.media_drafts_pending ?? 0}</p>
</div>
</div>
<div className={`glass-card p-5 border mt-4 ${slaColor(snap.openclaw?.approval_sla?.health)}`}>
<h3 className="font-bold mb-3">Approval SLA</h3>
<div className="grid grid-cols-2 md:grid-cols-4 gap-3 text-sm">
<div>
<p className="text-xs opacity-80">Pending</p>
<p className="text-lg font-black">{snap.openclaw?.approval_sla?.pending_total ?? 0}</p>
</div>
<div>
<p className="text-xs opacity-80">Warn</p>
<p className="text-lg font-black">{snap.openclaw?.approval_sla?.pending_warn_count ?? 0}</p>
</div>
<div>
<p className="text-xs opacity-80">Breach</p>
<p className="text-lg font-black">{snap.openclaw?.approval_sla?.pending_breach_count ?? 0}</p>
</div>
<div>
<p className="text-xs opacity-80">Avg Resolve (h)</p>
<p className="text-lg font-black">{snap.openclaw?.approval_sla?.avg_resolution_hours ?? 0}</p>
</div>
</div>
{snap.openclaw?.approval_sla?.escalation_by_level && (
<div className="mt-4 grid grid-cols-2 sm:grid-cols-4 gap-2 text-xs">
{(["0", "1", "2", "3"] as const).map((k) => (
<div key={k} className="rounded-lg border border-border/40 bg-background/40 px-2 py-1.5 text-center">
<span className="opacity-70">مستوى {k}</span>
<p className="font-black text-sm">{snap.openclaw?.approval_sla?.escalation_by_level?.[k] ?? 0}</p>
</div>
))}
</div>
)}
<p className="text-xs mt-3 opacity-90">
Thresholds: warn {snap.openclaw?.approval_sla?.warn_threshold_hours ?? 0}h, breach {" "}
{snap.openclaw?.approval_sla?.breach_threshold_hours ?? 0}h
</p>
{snap.openclaw?.approval_sla?.alerts_config && (
<p className="text-[11px] mt-2 opacity-80">
تنبيهات الـ SLA:{" "}
{snap.openclaw.approval_sla.alerts_config.enabled ? "مفعّلة" : "معطّلة"} · Webhook:{" "}
{snap.openclaw.approval_sla.alerts_config.webhook_configured ? "مهيأ" : "—"} · Slack:{" "}
{snap.openclaw.approval_sla.alerts_config.slack_configured ? "مهيأ" : "—"} · تهدئة{" "}
{snap.openclaw.approval_sla.alerts_config.cooldown_minutes} د
</p>
)}
{snap.openclaw?.approval_sla?.alert_dispatch && (
<p className="text-[11px] mt-1 font-mono opacity-90 break-all">
آخر إشعار:{" "}
{snap.openclaw.approval_sla.alert_dispatch.dispatched_at
? snap.openclaw.approval_sla.alert_dispatch.dispatched_at
: snap.openclaw.approval_sla.alert_dispatch.skipped_reason || "—"}
{snap.openclaw.approval_sla.alert_dispatch.next_eligible_at
? ` · التالي بعد: ${snap.openclaw.approval_sla.alert_dispatch.next_eligible_at}`
: ""}
</p>
)}
</div>
<div className="glass-card border border-border/50 overflow-hidden">
<div className="p-4 md:p-6 border-b border-border/50 flex items-center gap-2 justify-end">
<Plug className="w-5 h-5 text-primary" />
@ -162,6 +339,64 @@ export function FullOpsView() {
</div>
{snap.note_ar && <p className="px-6 pb-4 text-xs text-muted-foreground">{snap.note_ar}</p>}
</div>
{!!snap.openclaw?.recent_runs?.length && (
<div className="glass-card border border-border/50 p-6 space-y-4">
<div className="flex flex-col sm:flex-row sm:items-center sm:justify-between gap-3">
<h3 className="font-bold">OpenClaw Structured Progress</h3>
<div className="flex flex-wrap items-center gap-2 justify-end">
<Filter className="w-4 h-4 text-muted-foreground shrink-0" />
{(
[
["all", "الكل"],
["approval", "يتطلب موافقة"],
["auto", "تلقائي / آمن"],
] as const
).map(([id, label]) => (
<button
key={id}
type="button"
onClick={() => setRunFilter(id)}
className={`text-xs px-2.5 py-1 rounded-lg border transition-colors ${
runFilter === id
? "border-primary bg-primary/15 text-primary"
: "border-border/60 bg-background/40 text-muted-foreground hover:bg-secondary/50"
}`}
>
{label}
</button>
))}
</div>
</div>
<div className="space-y-2 text-sm">
{filteredRuns.length === 0 ? (
<p className="text-xs text-muted-foreground">لا توجد تشغيلات ضمن الفلتر الحالي.</p>
) : (
filteredRuns.map((r) => (
<div
key={r.run_id}
className="flex flex-wrap items-center justify-between gap-2 border border-border/40 rounded-lg px-3 py-2"
>
<span className="font-mono text-xs text-muted-foreground">{r.run_id.slice(0, 8)}</span>
<span>{r.task_type}</span>
<span className="text-xs">{r.status}</span>
{typeof r.approval_required === "boolean" && (
<span
className={`text-[10px] px-1.5 py-0.5 rounded border ${
r.approval_required
? "border-amber-500/40 text-amber-200"
: "border-emerald-500/35 text-emerald-200/90"
}`}
>
{r.approval_required ? "موافقة" : "آمن"}
</span>
)}
</div>
))
)}
</div>
</div>
)}
</>
)}

View File

@ -119,7 +119,12 @@ export function useRequireAuth(): AuthContextValue {
if (auth.loading) return;
if (!getAccessToken()) {
const next = pathname ? `?next=${encodeURIComponent(pathname)}` : "";
router.replace(`/login${next}`);
const target = `/login${next}`;
router.replace(target);
// Fallback for environments where app-router navigation is delayed.
if (typeof window !== "undefined" && !window.location.pathname.startsWith("/login")) {
window.location.replace(target);
}
}
}, [auth.loading, router, pathname]);