From 379dcf941ea24086f07a9d1e646f6daa48b0a2d7 Mon Sep 17 00:00:00 2001 From: Sami Assiri Date: Thu, 16 Apr 2026 07:18:14 +0300 Subject: [PATCH] feat(core-os): implement Sprint 3-4 Control Plane (Memory Store, Provider Router, Verification Ledger) --- .../backend/app/services/core_os/__init__.py | 3 + .../services/core_os/project_memory_store.py | 85 +++++++++++++++++++ .../app/services/core_os/provider_router.py | 67 +++++++++++++++ .../services/core_os/verification_ledger.py | 82 ++++++++++++++++++ 4 files changed, 237 insertions(+) create mode 100644 salesflow-saas/backend/app/services/core_os/__init__.py create mode 100644 salesflow-saas/backend/app/services/core_os/project_memory_store.py create mode 100644 salesflow-saas/backend/app/services/core_os/provider_router.py create mode 100644 salesflow-saas/backend/app/services/core_os/verification_ledger.py diff --git a/salesflow-saas/backend/app/services/core_os/__init__.py b/salesflow-saas/backend/app/services/core_os/__init__.py new file mode 100644 index 00000000..effa70bb --- /dev/null +++ b/salesflow-saas/backend/app/services/core_os/__init__.py @@ -0,0 +1,3 @@ +""" +Core OS Services - Dealix Sovereign Growth OS +""" diff --git a/salesflow-saas/backend/app/services/core_os/project_memory_store.py b/salesflow-saas/backend/app/services/core_os/project_memory_store.py new file mode 100644 index 00000000..df1b2660 --- /dev/null +++ b/salesflow-saas/backend/app/services/core_os/project_memory_store.py @@ -0,0 +1,85 @@ +import os +import json +import uuid +from typing import Dict, Any, List, Optional +from datetime import datetime +from pathlib import Path + +class ProjectMemoryStore: + """ + Second Brain & Project Memory Store for Sovereign OS. + File-based local memory with strict schemas to maintain institutional knowledge + without turning into an unstructured dump. + """ + + MEMORY_DOMAINS = [ + "architecture", "adr", "runbooks", "releases", "postmortems", + "growth", "partners", "ma", "seo", "security", "providers", + "benchmarks", "patterns", "prompts", "experiments", "customers" + ] + + def __init__(self, base_path: str = "memory"): + self.base_path = Path(base_path) + self._initialize_structure() + + def _initialize_structure(self): + """Creates the internal memory domain folders if they don't exist.""" + for domain in self.MEMORY_DOMAINS: + domain_path = self.base_path / domain + domain_path.mkdir(parents=True, exist_ok=True) + + def store_item(self, domain: str, title: str, memory_type: str, owner: str, + confidence: int, summary: str, links: List[str] = None, + tags: List[str] = None, review_date: str = None) -> str: + """ + Ingests a decision memo, realization, or learning into the Memory OS. + Returns the memory ID. + """ + if domain not in self.MEMORY_DOMAINS: + raise ValueError(f"Invalid memory domain: {domain}. Must be one of {self.MEMORY_DOMAINS}") + + if confidence < 0 or confidence > 100: + raise ValueError("Confidence must be between 0 and 100.") + + memory_id = f"mem_{uuid.uuid4().hex[:8]}" + timestamp = datetime.utcnow().isoformat() + + item = { + "id": memory_id, + "title": title, + "type": memory_type, + "owner": owner, + "date": timestamp, + "confidence": confidence, + "summary": summary, + "links": links or [], + "tags": tags or [], + "review_date": review_date, + "status": "active" + } + + file_path = self.base_path / domain / f"{memory_id}.json" + + with open(file_path, "w", encoding="utf-8") as f: + json.dump(item, f, ensure_ascii=False, indent=2) + + return memory_id + + def retrieve_by_tags(self, domain: str, tags: List[str]) -> List[Dict[str, Any]]: + """Retrieve memory items matching specific tags within a domain.""" + domain_path = self.base_path / domain + if not domain_path.exists(): + return [] + + results = [] + for file_path in domain_path.glob("*.json"): + try: + with open(file_path, "r", encoding="utf-8") as f: + data = json.load(f) + if any(tag in data.get("tags", []) for tag in tags): + results.append(data) + except Exception: + continue + + # Sort by confidence descending + return sorted(results, key=lambda x: x.get("confidence", 0), reverse=True) diff --git a/salesflow-saas/backend/app/services/core_os/provider_router.py b/salesflow-saas/backend/app/services/core_os/provider_router.py new file mode 100644 index 00000000..5d715a5f --- /dev/null +++ b/salesflow-saas/backend/app/services/core_os/provider_router.py @@ -0,0 +1,67 @@ +from typing import Dict, Any, Optional + +class ProviderRouter: + """ + Model & Provider Routing Layer for Sovereign OS. + Determines the appropriate execution environment (Cloud vs Local) based on task parameters. + """ + + # Priority list of models based on environments + # For Dealix Sovereign OS, maintaining PDPL (Saudi Data Privacy Law) is crucial. + PROVIDERS = { + "cloud_coding": ["claude-3-5-sonnet", "gpt-4o"], + "cloud_reasoning": ["claude-3-opus"], + "local_private": ["atomic-chat-local", "llama-3-8b"], # Example local inference + "ops_agent": ["goose-cli"] # Specialized terminal manipulation + } + + def __init__(self): + pass + + def route_task(self, task_type: str, privacy_sensitivity: str, + latency_budget_ms: int = 5000) -> Dict[str, Any]: + """ + Takes task requirements and outputs the selected provider, backup chain, + and routing rationale. + + Args: + task_type: "code", "research", "summarization", "financial_diligence" + privacy_sensitivity: "public", "internal", "highly_confidential" + latency_budget_ms: maximum allowed time for first byte or completion. + + Returns: Dict containing selected provider info. + """ + route_decision = { + "selected_provider": "", + "backup_chain": [], + "reason": "", + "retry_rules": {"max_retries": 3, "backoff": "exponential"} + } + + # Rule 1: Highly confidential tasks (M&A DD, financials) must be routed locally + if privacy_sensitivity == "highly_confidential": + route_decision["selected_provider"] = self.PROVIDERS["local_private"][0] + route_decision["backup_chain"] = [self.PROVIDERS["local_private"][1]] + route_decision["reason"] = "PDPL/High Confidentiality enforcement overrides cloud." + return route_decision + + # Rule 2: Operations and deployments route to specialized agent + if task_type == "deployment_ops": + route_decision["selected_provider"] = self.PROVIDERS["ops_agent"][0] + route_decision["backup_chain"] = [] + route_decision["reason"] = "Task requires direct terminal/OS manipulation." + return route_decision + + # Rule 3: Complex reasoning routes to heaviest cloud models (if allowed) + if task_type in ["financial_diligence", "alliance_structuring"]: + route_decision["selected_provider"] = self.PROVIDERS["cloud_reasoning"][0] + route_decision["backup_chain"] = self.PROVIDERS["cloud_coding"] + route_decision["reason"] = "Task demands extreme reasoning fidelity and is not tightly bound by latency." + return route_decision + + # Default fallback: Standard Cloud execution with latency awareness + route_decision["selected_provider"] = self.PROVIDERS["cloud_coding"][0] + route_decision["backup_chain"] = [self.PROVIDERS["cloud_coding"][1]] + route_decision["reason"] = "Default general purpose routing." + + return route_decision diff --git a/salesflow-saas/backend/app/services/core_os/verification_ledger.py b/salesflow-saas/backend/app/services/core_os/verification_ledger.py new file mode 100644 index 00000000..c8a7c21e --- /dev/null +++ b/salesflow-saas/backend/app/services/core_os/verification_ledger.py @@ -0,0 +1,82 @@ +import os +import json +import uuid +import hashlib +from typing import Dict, Any, List +from datetime import datetime +from pathlib import Path + +class VerificationLedger: + """ + Tool Verification Layer for Dealix Sovereign OS. + Prevents Agent Hallucinations and ensures all autonomous actions are verifiable. + Implements the ToolProof pattern: logging Intent, Claim, Actual Execution, and side-effects. + """ + + def __init__(self, ledger_path: str = "memory/verification_ledger"): + self.ledger_path = Path(ledger_path) + self.ledger_path.mkdir(parents=True, exist_ok=True) + + def hash_parameters(self, params: Dict[str, Any]) -> str: + """Create a deterministic hash of tool parameters for audit matching.""" + serialized = json.dumps(params, sort_keys=True) + return hashlib.sha256(serialized.encode("utf-8")).hexdigest() + + def create_proof(self, agent_id: str, task_id: str, intended_action: str, + claimed_action: str, current_tool_call: str, + parameters: Dict[str, Any]) -> str: + """ + Creates an unverified tool proof record BEFORE the tool executes. + """ + run_id = f"tx_{uuid.uuid4().hex[:10]}" + timestamp = datetime.utcnow().isoformat() + + proof = { + "run_id": run_id, + "agent_id": agent_id, + "task_id": task_id, + "intended_action": intended_action, + "claimed_action": claimed_action, + "actual_tool_call": current_tool_call, + "parameters_hash": self.hash_parameters(parameters), + "timestamps": {"started": timestamp}, + "side_effects": [], + "evidence_paths": [], + "verification_status": "unverified" + } + + self._write_proof(run_id, proof) + return run_id + + def resolve_proof(self, run_id: str, side_effects: List[str], + evidence_paths: List[str], status: str): + """ + Updates the proof AFTER execution with actual side effects and sets status to verified. + Status must be 'verified', 'partially_verified', 'unverified', or 'contradicted'. + """ + valid_statuses = ["verified", "partially_verified", "unverified", "contradicted"] + if status not in valid_statuses: + raise ValueError(f"Status must be one of {valid_statuses}") + + proof = self._read_proof(run_id) + if not proof: + raise KeyError(f"Run ID {run_id} not found in ledger.") + + proof["side_effects"] = side_effects + proof["evidence_paths"] = evidence_paths + proof["verification_status"] = status + proof["timestamps"]["resolved"] = datetime.utcnow().isoformat() + + self._write_proof(run_id, proof) + + def _write_proof(self, run_id: str, proof: Dict[str, Any]): + file_path = self.ledger_path / f"{run_id}.json" + with open(file_path, "w", encoding="utf-8") as f: + json.dump(proof, f, ensure_ascii=False, indent=2) + + def _read_proof(self, run_id: str) -> Dict[str, Any]: + file_path = self.ledger_path / f"{run_id}.json" + if not file_path.exists(): + return None + with open(file_path, "r", encoding="utf-8") as f: + return json.load(f)