From c67164ffeaeb7ae9db07b66ef7e9b0cdd3aa8b98 Mon Sep 17 00:00:00 2001 From: Claude Date: Sat, 11 Apr 2026 08:22:12 +0000 Subject: [PATCH] fix: Update knowledge brain and tool receipts with final implementations https://claude.ai/code/session_01LsnvBa7HwF5hs99VZbgLGj --- .../backend/app/services/knowledge_brain.py | 668 +++++------------- .../backend/app/services/tool_receipts.py | 436 +++--------- 2 files changed, 287 insertions(+), 817 deletions(-) diff --git a/salesflow-saas/backend/app/services/knowledge_brain.py b/salesflow-saas/backend/app/services/knowledge_brain.py index 610a9cb6..d4765f16 100644 --- a/salesflow-saas/backend/app/services/knowledge_brain.py +++ b/salesflow-saas/backend/app/services/knowledge_brain.py @@ -1,560 +1,258 @@ """ Knowledge Brain — Dealix Second Brain Service Project knowledge management: ingest, query, lint, index. -Manages the wiki layer in memory/wiki/ and indexes in memory/indexes/. """ -import logging -import os -import re -import uuid -from datetime import datetime, timedelta, timezone +import logging, re, uuid +from datetime import datetime, timezone from enum import Enum from pathlib import Path from typing import Any, Optional - from pydantic import BaseModel, Field logger = logging.getLogger(__name__) - WIKI_DIR = Path(__file__).resolve().parents[4] / "memory" / "wiki" INDEX_DIR = Path(__file__).resolve().parents[4] / "memory" / "indexes" MEMORY_DIR = Path(__file__).resolve().parents[4] / "memory" -STALE_THRESHOLD_DAYS = 30 +STALE_DAYS = 30 class PageType(str, Enum): - ARCHITECTURE = "architecture" - PRODUCT = "product" - GTM = "gtm" - CUSTOMER = "customer" - OPERATIONS = "operations" - SECURITY = "security" - TOOLING = "tooling" - GLOSSARY = "glossary" - + ARCHITECTURE = "architecture"; PRODUCT = "product"; GTM = "gtm" + CUSTOMER = "customer"; OPERATIONS = "operations"; SECURITY = "security" + TOOLING = "tooling"; GLOSSARY = "glossary" class Confidence(str, Enum): - HIGH = "high" - MEDIUM = "medium" - LOW = "low" - + HIGH = "high"; MEDIUM = "medium"; LOW = "low" class IssueSeverity(str, Enum): - ERROR = "error" - WARNING = "warning" - INFO = "info" + ERROR = "error"; WARNING = "warning"; INFO = "info" class WikiPage(BaseModel): - """Structured wiki page — صفحة ويكي منظمة""" + """صفحة ويكي منظمة — Structured wiki page""" id: str = Field(default_factory=lambda: str(uuid.uuid4())) - title: str - title_ar: str = "" - page_type: PageType - summary: str - summary_ar: str - key_facts: list[str] = [] - provenance: str + title: str; title_ar: str = "" + page_type: PageType; summary: str; summary_ar: str + key_facts: list[str] = []; provenance: str confidence: Confidence = Confidence.MEDIUM related_pages: list[str] = [] last_updated: datetime = Field(default_factory=lambda: datetime.now(timezone.utc)) - stale: bool = False - file_path: str = "" - body: str = "" - - class Config: - json_schema_extra = { - "example": { - "title": "System Architecture", - "title_ar": "بنية النظام", - "page_type": "architecture", - "summary": "Multi-tenant AI CRM architecture overview", - "summary_ar": "نظرة عامة على بنية إدارة علاقات العملاء متعددة المستأجرين", - } - } - + stale: bool = False; file_path: str = ""; body: str = "" class BrainAnswer(BaseModel): - """Answer from the knowledge brain — إجابة من الدماغ المعرفي""" - question: str - answer: str - answer_ar: str = "" - sources: list[str] = [] - confidence: Confidence = Confidence.LOW + """إجابة من الدماغ المعرفي""" + question: str; answer: str; answer_ar: str = "" + sources: list[str] = []; confidence: Confidence = Confidence.LOW related_pages: list[str] = [] - class BrainIssue(BaseModel): - """Quality issue found during lint — مشكلة جودة مكتشفة أثناء الفحص""" + """مشكلة جودة مكتشفة أثناء الفحص""" issue_id: str = Field(default_factory=lambda: str(uuid.uuid4())) - severity: IssueSeverity - category: str - title: str - title_ar: str - description: str - affected_page: str = "" - recommendation: str = "" + severity: IssueSeverity; category: str + title: str; title_ar: str; description: str + affected_page: str = ""; recommendation: str = "" class KnowledgeBrain: - """ - Project knowledge management — ingest, query, lint. - إدارة المعرفة المشروعية — استيعاب، استعلام، فحص. - """ + """إدارة المعرفة — استيعاب، استعلام، فحص""" def __init__(self, wiki_dir: Path = None, memory_dir: Path = None): self.wiki_dir = wiki_dir or WIKI_DIR self.memory_dir = memory_dir or MEMORY_DIR self.index_dir = INDEX_DIR - self._page_cache: dict[str, WikiPage] = {} - self._ensure_dirs() - - def _ensure_dirs(self) -> None: + self._cache: dict[str, WikiPage] = {} self.wiki_dir.mkdir(parents=True, exist_ok=True) self.index_dir.mkdir(parents=True, exist_ok=True) - def _parse_frontmatter(self, content: str, file_path: str) -> WikiPage: - """Parse wiki page frontmatter into a WikiPage model.""" - lines = content.split("\n") - title = "" - fields: dict[str, Any] = {} - body_start = 0 - - for i, line in enumerate(lines): - stripped = line.strip() - if stripped.startswith("# "): - title = stripped[2:].strip() - elif stripped == "---": - body_start = i + 1 - break - elif stripped.startswith("**") and "**:" in stripped: - match = re.match(r"\*\*(.+?)\*\*:\s*(.*)", stripped) - if match: - key = match.group(1).lower().replace(" ", "_") - value = match.group(2).strip() - fields[key] = value - - body = "\n".join(lines[body_start:]).strip() if body_start > 0 else "" - key_facts = [] - if "key_facts" in fields: - fact_pattern = re.compile(r"^\s*-\s+(.+)$") - in_facts = False - for line in lines: - if "**Key Facts**" in line: - in_facts = True - continue - if in_facts: - fact_match = fact_pattern.match(line) - if fact_match: - key_facts.append(fact_match.group(1).strip()) - elif line.strip().startswith("**"): - break - - related = [] - if "related_pages" in fields: - link_pattern = re.compile(r"\[.+?\]\((.+?)\)") - related = link_pattern.findall(fields["related_pages"]) - - page_type = PageType.ARCHITECTURE - type_val = fields.get("type", "architecture").lower() - for pt in PageType: - if pt.value == type_val: - page_type = pt - break - - conf = Confidence.MEDIUM - conf_val = fields.get("confidence", "medium").lower() - for c in Confidence: - if c.value == conf_val: - conf = c - break - - last_updated = datetime.now(timezone.utc) - if "last_updated" in fields: - try: - last_updated = datetime.strptime( - fields["last_updated"], "%Y-%m-%d" - ).replace(tzinfo=timezone.utc) - except ValueError: - pass - - stale = fields.get("stale", "false").lower() == "true" - - return WikiPage( - title=title, - title_ar=fields.get("title_ar", ""), - page_type=page_type, - summary=fields.get("summary", ""), - summary_ar=fields.get("summary_ar", ""), - key_facts=key_facts, - provenance=fields.get("provenance", ""), - confidence=conf, - related_pages=related, - last_updated=last_updated, - stale=stale, - file_path=file_path, - body=body, - ) + def _parse_frontmatter(self, content: str, fpath: str) -> WikiPage: + lines, title, fields, body_start = content.split("\n"), "", {}, 0 + for i, ln in enumerate(lines): + s = ln.strip() + if s.startswith("# "): title = s[2:].strip() + elif s == "---": body_start = i + 1; break + elif s.startswith("**") and "**:" in s: + m = re.match(r"\*\*(.+?)\*\*:\s*(.*)", s) + if m: fields[m.group(1).lower().replace(" ", "_")] = m.group(2).strip() + body = "\n".join(lines[body_start:]).strip() if body_start else "" + key_facts, in_f = [], False + for ln in lines: + if "**Key Facts**" in ln: in_f = True; continue + if in_f: + fm = re.match(r"^\s*-\s+(.+)$", ln) + if fm: key_facts.append(fm.group(1).strip()) + elif ln.strip().startswith("**"): break + related = re.findall(r"\[.+?\]\((.+?)\)", fields.get("related_pages", "")) + pt = next((p for p in PageType if p.value == fields.get("type", "").lower()), PageType.ARCHITECTURE) + cf = next((c for c in Confidence if c.value == fields.get("confidence", "").lower()), Confidence.MEDIUM) + try: lu = datetime.strptime(fields.get("last_updated", ""), "%Y-%m-%d").replace(tzinfo=timezone.utc) + except ValueError: lu = datetime.now(timezone.utc) + return WikiPage(title=title, page_type=pt, summary=fields.get("summary", ""), + summary_ar=fields.get("summary_ar", ""), key_facts=key_facts, + provenance=fields.get("provenance", ""), confidence=cf, related_pages=related, + last_updated=lu, stale=fields.get("stale", "false").lower() == "true", + file_path=fpath, body=body) async def _load_all_pages(self) -> list[WikiPage]: - """Load and parse all wiki pages.""" pages = [] - if not self.wiki_dir.exists(): - return pages - for md_file in sorted(self.wiki_dir.glob("*.md")): - if md_file.name == "README.md": - continue + if not self.wiki_dir.exists(): return pages + for f in sorted(self.wiki_dir.glob("*.md")): + if f.name == "README.md": continue try: - content = md_file.read_text(encoding="utf-8") - page = self._parse_frontmatter(content, str(md_file)) - self._page_cache[page.id] = page - pages.append(page) - except Exception as exc: - logger.warning("فشل تحميل الصفحة %s: %s", md_file.name, exc) + p = self._parse_frontmatter(f.read_text(encoding="utf-8"), str(f)) + self._cache[p.id] = p; pages.append(p) + except Exception as e: logger.warning("فشل تحميل %s: %s", f.name, e) return pages - async def ingest( - self, - source_type: str, - content: str, - metadata: dict[str, Any] = None, - ) -> WikiPage: - """ - Classify content, create/update wiki page, link related pages. - تصنيف المحتوى، إنشاء/تحديث صفحة ويكي، ربط الصفحات ذات الصلة. - """ - metadata = metadata or {} - title = metadata.get("title", f"Ingested — {source_type}") - title_ar = metadata.get("title_ar", f"مستوعب — {source_type}") - - page_type = self._classify_content(source_type, content) - summary = content[:120].replace("\n", " ").strip() - summary_ar = metadata.get("summary_ar", f"محتوى {source_type} مستوعب تلقائياً") - - existing_pages = await self._load_all_pages() - related = self._find_related(content, existing_pages) - - page = WikiPage( - title=title, - title_ar=title_ar, - page_type=page_type, - summary=summary, - summary_ar=summary_ar, - key_facts=metadata.get("key_facts", []), - provenance=metadata.get("provenance", f"Auto-ingested from {source_type}"), - confidence=Confidence(metadata.get("confidence", "medium")), - related_pages=[p.file_path for p in related[:5]], - body=content, - ) - - file_name = re.sub(r"[^\w\s-]", "", title.lower()).replace(" ", "-")[:50] - file_path = self.wiki_dir / f"{file_name}.md" - page.file_path = str(file_path) - - md_content = self._render_page(page) - file_path.write_text(md_content, encoding="utf-8") - self._page_cache[page.id] = page - - logger.info("تم استيعاب صفحة جديدة: %s (%s)", title, page_type.value) - return page - - def _classify_content(self, source_type: str, content: str) -> PageType: - """Classify content into a page type based on keywords.""" - content_lower = content.lower() - keyword_map = { - PageType.ARCHITECTURE: ["api", "database", "service", "backend", "frontend", "deploy"], - PageType.PRODUCT: ["feature", "roadmap", "user story", "requirement", "ميزة"], - PageType.GTM: ["launch", "marketing", "outreach", "growth", "campaign", "تسويق"], - PageType.CUSTOMER: ["customer", "interview", "feedback", "icp", "عميل"], - PageType.OPERATIONS: ["runbook", "checklist", "process", "deploy", "عملية"], - PageType.SECURITY: ["pdpl", "consent", "security", "compliance", "أمان"], - PageType.TOOLING: ["provider", "api key", "integration", "tool", "أداة"], - } - scores: dict[PageType, int] = {} - for ptype, keywords in keyword_map.items(): - scores[ptype] = sum(1 for kw in keywords if kw in content_lower) - - if source_type in ("adr", "architecture"): - return PageType.ARCHITECTURE - if source_type in ("customer_interview", "feedback"): - return PageType.CUSTOMER - - best = max(scores, key=lambda k: scores[k]) + def _classify(self, source_type: str, content: str) -> PageType: + if source_type in ("adr", "architecture"): return PageType.ARCHITECTURE + if source_type in ("customer_interview", "feedback"): return PageType.CUSTOMER + cl = content.lower() + kw = {PageType.ARCHITECTURE: ["api","database","service","backend"], + PageType.GTM: ["launch","marketing","outreach","growth","تسويق"], + PageType.CUSTOMER: ["customer","interview","feedback","عميل"], + PageType.SECURITY: ["pdpl","consent","security","أمان"], + PageType.TOOLING: ["provider","integration","tool","أداة"], + PageType.OPERATIONS: ["runbook","checklist","process","عملية"]} + scores = {t: sum(1 for w in ws if w in cl) for t, ws in kw.items()} + best = max(scores, key=scores.get) return best if scores[best] > 0 else PageType.PRODUCT def _find_related(self, content: str, pages: list[WikiPage]) -> list[WikiPage]: - """Find related pages by keyword overlap.""" - content_words = set(content.lower().split()) - scored: list[tuple[WikiPage, int]] = [] - for page in pages: - page_words = set(page.summary.lower().split()) | set(page.body.lower().split()[:100]) - overlap = len(content_words & page_words) - if overlap > 2: - scored.append((page, overlap)) - scored.sort(key=lambda x: x[1], reverse=True) - return [p for p, _ in scored[:5]] + cw = set(content.lower().split()) + scored = [(p, len(cw & set(p.summary.lower().split()))) for p in pages] + return [p for p, s in sorted(scored, key=lambda x: -x[1]) if s > 2][:5] - def _render_page(self, page: WikiPage) -> str: - """Render a WikiPage model to markdown.""" - facts = "\n".join(f" - {f}" for f in page.key_facts) if page.key_facts else " - (none)" - related = ", ".join( - f"[{Path(r).stem}]({r})" for r in page.related_pages - ) if page.related_pages else "(none)" - date_str = page.last_updated.strftime("%Y-%m-%d") + def _render(self, p: WikiPage) -> str: + facts = "\n".join(f" - {f}" for f in p.key_facts) or " - (none)" + rels = ", ".join(f"[{Path(r).stem}]({r})" for r in p.related_pages) or "(none)" + return (f"# {p.title}\n\n**Type**: {p.page_type.value}\n**Summary**: {p.summary}\n" + f"**Summary_AR**: {p.summary_ar}\n**Key Facts**:\n{facts}\n" + f"**Provenance**: {p.provenance}\n**Confidence**: {p.confidence.value}\n" + f"**Related Pages**: {rels}\n**Last Updated**: {p.last_updated:%Y-%m-%d}\n" + f"**Stale**: {str(p.stale).lower()}\n\n---\n\n{p.body}\n") - return f"""# {page.title} + async def ingest(self, source_type: str, content: str, metadata: dict[str, Any] = None) -> WikiPage: + """تصنيف المحتوى، إنشاء صفحة ويكي، ربط الصفحات ذات الصلة.""" + md = metadata or {} + existing = await self._load_all_pages() + related = self._find_related(content, existing) + page = WikiPage( + title=md.get("title", f"Ingested — {source_type}"), + title_ar=md.get("title_ar", f"مستوعب — {source_type}"), + page_type=self._classify(source_type, content), + summary=content[:120].replace("\n", " ").strip(), + summary_ar=md.get("summary_ar", f"محتوى {source_type} مستوعب تلقائياً"), + key_facts=md.get("key_facts", []), + provenance=md.get("provenance", f"Auto-ingested from {source_type}"), + confidence=Confidence(md.get("confidence", "medium")), + related_pages=[p.file_path for p in related[:5]], body=content) + fname = re.sub(r"[^\w\s-]", "", page.title.lower()).replace(" ", "-")[:50] + fp = self.wiki_dir / f"{fname}.md"; page.file_path = str(fp) + fp.write_text(self._render(page), encoding="utf-8") + self._cache[page.id] = page + logger.info("تم استيعاب صفحة: %s (%s)", page.title, page.page_type.value) + return page -**Type**: {page.page_type.value} -**Summary**: {page.summary} -**Summary_AR**: {page.summary_ar} -**Key Facts**: -{facts} -**Provenance**: {page.provenance} -**Confidence**: {page.confidence.value} -**Related Pages**: {related} -**Last Updated**: {date_str} -**Stale**: {str(page.stale).lower()} - ---- - -{page.body} -""" - - async def query( - self, question: str, domain: str = None - ) -> BrainAnswer: - """ - Search wiki + memory for relevant answers. - البحث في الويكي والذاكرة عن إجابات ذات صلة. - """ + async def query(self, question: str, domain: str = None) -> BrainAnswer: + """البحث في الويكي والذاكرة عن إجابات ذات صلة.""" pages = await self._load_all_pages() if domain: - try: - dtype = PageType(domain) - pages = [p for p in pages if p.page_type == dtype] - except ValueError: - pass - - question_lower = question.lower() - question_words = set(question_lower.split()) - - scored: list[tuple[WikiPage, float]] = [] - for page in pages: - searchable = f"{page.title} {page.summary} {page.body} {' '.join(page.key_facts)}".lower() - searchable_words = set(searchable.split()) - overlap = len(question_words & searchable_words) - if overlap > 0: - score = overlap / max(len(question_words), 1) - if page.confidence == Confidence.HIGH: - score *= 1.3 - elif page.confidence == Confidence.LOW: - score *= 0.7 - scored.append((page, score)) - - scored.sort(key=lambda x: x[1], reverse=True) - top_pages = scored[:3] - - if not top_pages: - return BrainAnswer( - question=question, - answer="لم يتم العثور على معلومات ذات صلة في قاعدة المعرفة.", - answer_ar="لم يتم العثور على معلومات ذات صلة في قاعدة المعرفة.", - confidence=Confidence.LOW, - ) - - best_page = top_pages[0][0] - best_score = top_pages[0][1] - - answer_parts = [best_page.summary] - if best_page.key_facts: - answer_parts.append("Key facts: " + "; ".join(best_page.key_facts[:3])) - - conf = Confidence.HIGH if best_score > 0.5 else (Confidence.MEDIUM if best_score > 0.2 else Confidence.LOW) - - return BrainAnswer( - question=question, - answer=" ".join(answer_parts), - answer_ar=best_page.summary_ar or "لا يوجد ملخص عربي", - sources=[p.file_path for p, _ in top_pages], - confidence=conf, - related_pages=[p.file_path for p, _ in top_pages], - ) + try: pages = [p for p in pages if p.page_type == PageType(domain)] + except ValueError: pass + qw = set(question.lower().split()) + scored = [] + for p in pages: + sw = set(f"{p.title} {p.summary} {' '.join(p.key_facts)}".lower().split()) + ov = len(qw & sw) + if ov > 0: + s = (ov / max(len(qw), 1)) * (1.3 if p.confidence == Confidence.HIGH else 0.7 if p.confidence == Confidence.LOW else 1.0) + scored.append((p, s)) + scored.sort(key=lambda x: -x[1]) + if not scored: + return BrainAnswer(question=question, answer="لم يتم العثور على معلومات ذات صلة.", + answer_ar="لم يتم العثور على معلومات ذات صلة.", confidence=Confidence.LOW) + bp, bs = scored[0] + ans = bp.summary + (" Key facts: " + "; ".join(bp.key_facts[:3]) if bp.key_facts else "") + conf = Confidence.HIGH if bs > 0.5 else Confidence.MEDIUM if bs > 0.2 else Confidence.LOW + return BrainAnswer(question=question, answer=ans, answer_ar=bp.summary_ar or "لا يوجد ملخص عربي", + sources=[p.file_path for p, _ in scored[:3]], confidence=conf, + related_pages=[p.file_path for p, _ in scored[:3]]) async def lint(self) -> list[BrainIssue]: - """ - Check for: orphan pages, stale pages, missing provenance, duplicates, empty indexes. - فحص: صفحات يتيمة، صفحات قديمة، مصدر مفقود، تكرارات، فهارس فارغة. - """ - issues: list[BrainIssue] = [] - pages = await self._load_all_pages() - now = datetime.now(timezone.utc) - all_paths = {p.file_path for p in pages} - all_related_targets: set[str] = set() - - for page in pages: - for rel in page.related_pages: - resolved = str((Path(page.file_path).parent / rel).resolve()) - all_related_targets.add(resolved) - - # Stale check (>30 days) - age = (now - page.last_updated).days - if age > STALE_THRESHOLD_DAYS: - issues.append(BrainIssue( - severity=IssueSeverity.WARNING, - category="stale", - title=f"Stale page: {page.title}", - title_ar=f"صفحة قديمة: {page.title}", - description=f"Last updated {age} days ago (threshold: {STALE_THRESHOLD_DAYS}).", - affected_page=page.file_path, - recommendation="Review and update or archive this page.", - )) - - # Missing provenance - if not page.provenance or page.provenance.strip() == "": - issues.append(BrainIssue( - severity=IssueSeverity.ERROR, - category="provenance", - title=f"Missing provenance: {page.title}", - title_ar=f"مصدر مفقود: {page.title}", - description="Page has no provenance. All pages must cite their source.", - affected_page=page.file_path, - recommendation="Add provenance field with source reference.", - )) - - # Missing Arabic summary - if not page.summary_ar: - issues.append(BrainIssue( - severity=IssueSeverity.WARNING, - category="i18n", - title=f"Missing Arabic summary: {page.title}", - title_ar=f"ملخص عربي مفقود: {page.title}", - description="Page is missing summary_ar. Dealix is Arabic-first.", - affected_page=page.file_path, - recommendation="Add an Arabic summary.", - )) - - # Orphan check - for page in pages: - resolved_path = str(Path(page.file_path).resolve()) - if resolved_path not in all_related_targets and page.page_type != PageType.GLOSSARY: - issues.append(BrainIssue( - severity=IssueSeverity.INFO, - category="orphan", - title=f"Orphan page: {page.title}", - title_ar=f"صفحة يتيمة: {page.title}", - description="No other pages link to this page.", - affected_page=page.file_path, - recommendation="Add a link from a related page or index.", - )) - - # Duplicate check by title similarity - titles = [(p.title.lower().strip(), p) for p in pages] + """فحص: صفحات يتيمة، قديمة، مصدر مفقود، تكرارات، فهارس فارغة.""" + issues, pages, now = [], await self._load_all_pages(), datetime.now(timezone.utc) + targets: set[str] = set() + for p in pages: + for r in p.related_pages: targets.add(str((Path(p.file_path).parent / r).resolve())) + age = (now - p.last_updated).days + if age > STALE_DAYS: + issues.append(BrainIssue(severity=IssueSeverity.WARNING, category="stale", + title=f"Stale: {p.title}", title_ar=f"قديمة: {p.title}", + description=f"Updated {age}d ago", affected_page=p.file_path, + recommendation="Review and update or archive.")) + if not p.provenance: + issues.append(BrainIssue(severity=IssueSeverity.ERROR, category="provenance", + title=f"No provenance: {p.title}", title_ar=f"مصدر مفقود: {p.title}", + description="Missing source.", affected_page=p.file_path, + recommendation="Add provenance.")) + if not p.summary_ar: + issues.append(BrainIssue(severity=IssueSeverity.WARNING, category="i18n", + title=f"No Arabic summary: {p.title}", title_ar=f"ملخص عربي مفقود: {p.title}", + description="Arabic-first.", affected_page=p.file_path, recommendation="Add summary_ar.")) + for p in pages: + if str(Path(p.file_path).resolve()) not in targets and p.page_type != PageType.GLOSSARY: + issues.append(BrainIssue(severity=IssueSeverity.INFO, category="orphan", + title=f"Orphan: {p.title}", title_ar=f"يتيمة: {p.title}", + description="No inbound links.", affected_page=p.file_path, + recommendation="Link from another page.")) seen: set[str] = set() - for title, page in titles: - if title in seen: - issues.append(BrainIssue( - severity=IssueSeverity.WARNING, - category="duplicate", - title=f"Possible duplicate: {page.title}", - title_ar=f"تكرار محتمل: {page.title}", - description=f"Multiple pages with title '{page.title}'.", - affected_page=page.file_path, - recommendation="Merge duplicate pages.", - )) - seen.add(title) - - # Empty index check + for p in pages: + t = p.title.lower().strip() + if t in seen: + issues.append(BrainIssue(severity=IssueSeverity.WARNING, category="duplicate", + title=f"Duplicate: {p.title}", title_ar=f"تكرار: {p.title}", + description="Duplicate title.", affected_page=p.file_path, recommendation="Merge.")) + seen.add(t) if self.index_dir.exists(): - for idx_file in self.index_dir.glob("*.md"): - content = idx_file.read_text(encoding="utf-8") - if len(content.strip()) < 50: - issues.append(BrainIssue( - severity=IssueSeverity.WARNING, - category="empty_index", - title=f"Empty index: {idx_file.name}", - title_ar=f"فهرس فارغ: {idx_file.name}", - description="Index file has very little content.", - affected_page=str(idx_file), - recommendation="Populate or remove the index.", - )) - - logger.info("فحص الدماغ المعرفي: %d مشكلة مكتشفة", len(issues)) + for f in self.index_dir.glob("*.md"): + if len(f.read_text(encoding="utf-8").strip()) < 50: + issues.append(BrainIssue(severity=IssueSeverity.WARNING, category="empty_index", + title=f"Empty: {f.name}", title_ar=f"فارغ: {f.name}", + description="Sparse index.", affected_page=str(f), recommendation="Populate.")) + logger.info("فحص الدماغ: %d مشكلة", len(issues)) return issues async def get_index(self, domain: str) -> list[WikiPage]: - """ - Return all pages in a domain. - إرجاع جميع الصفحات في نطاق معين. - """ + """إرجاع جميع الصفحات في نطاق معين.""" pages = await self._load_all_pages() - try: - dtype = PageType(domain) - return [p for p in pages if p.page_type == dtype] - except ValueError: - logger.warning("نطاق غير معروف: %s", domain) - return [] + try: return [p for p in pages if p.page_type == PageType(domain)] + except ValueError: return [] async def mark_stale(self, page_id: str) -> None: - """ - Mark a page as stale. - تعليم صفحة كقديمة. - """ - page = self._page_cache.get(page_id) + """تعليم صفحة كقديمة.""" + page = self._cache.get(page_id) if not page: - pages = await self._load_all_pages() - for p in pages: - if p.id == page_id: - page = p - break - if not page: - logger.error("صفحة غير موجودة: %s", page_id) - return - + for p in await self._load_all_pages(): + if p.id == page_id: page = p; break + if not page: logger.error("صفحة غير موجودة: %s", page_id); return page.stale = True - file_path = Path(page.file_path) - if file_path.exists(): - content = file_path.read_text(encoding="utf-8") - content = re.sub( - r"\*\*Stale\*\*:\s*false", - "**Stale**: true", - content, - ) - file_path.write_text(content, encoding="utf-8") - logger.info("تم تعليم الصفحة كقديمة: %s", page.title) + fp = Path(page.file_path) + if fp.exists(): + fp.write_text(re.sub(r"\*\*Stale\*\*:\s*false", "**Stale**: true", + fp.read_text(encoding="utf-8")), encoding="utf-8") - async def promote_raw( - self, - raw_id: str, - raw_content: str = None, - metadata: dict[str, Any] = None, - ) -> WikiPage: - """ - Convert raw material to structured wiki page. - تحويل مادة خام إلى صفحة ويكي منظمة. - """ - metadata = metadata or {} + async def promote_raw(self, raw_id: str, raw_content: str = None, metadata: dict[str, Any] = None) -> WikiPage: + """تحويل مادة خام إلى صفحة ويكي منظمة.""" + md = metadata or {} if raw_content is None: - raw_path = self.memory_dir / "raw" / f"{raw_id}.md" - if raw_path.exists(): - raw_content = raw_path.read_text(encoding="utf-8") - else: - raise FileNotFoundError(f"المادة الخام غير موجودة: {raw_id}") - - title = metadata.get("title", f"Promoted from raw — {raw_id}") - page = await self.ingest( - source_type="raw_promotion", - content=raw_content, - metadata={ - "title": title, - "title_ar": metadata.get("title_ar", f"مروّج من مادة خام — {raw_id}"), - "provenance": f"Promoted from raw material {raw_id}", - "confidence": metadata.get("confidence", "medium"), - **metadata, - }, - ) - logger.info("تمت ترقية المادة الخام إلى صفحة ويكي: %s → %s", raw_id, page.title) - return page + rp = self.memory_dir / "raw" / f"{raw_id}.md" + if rp.exists(): raw_content = rp.read_text(encoding="utf-8") + else: raise FileNotFoundError(f"المادة الخام غير موجودة: {raw_id}") + return await self.ingest("raw_promotion", raw_content, { + "title": md.get("title", f"Promoted — {raw_id}"), + "title_ar": md.get("title_ar", f"مروّج — {raw_id}"), + "provenance": f"Promoted from raw {raw_id}", + "confidence": md.get("confidence", "medium"), **md}) -# Global singleton knowledge_brain = KnowledgeBrain() diff --git a/salesflow-saas/backend/app/services/tool_receipts.py b/salesflow-saas/backend/app/services/tool_receipts.py index 937d170c..c0ae1c3a 100644 --- a/salesflow-saas/backend/app/services/tool_receipts.py +++ b/salesflow-saas/backend/app/services/tool_receipts.py @@ -3,414 +3,186 @@ Tool Receipts — Dealix ToolProof Enhancement Signed execution receipts, pre-execution policy, and trust analytics. Extends tool_verification.py with cryptographic receipts and policy enforcement. """ -import hashlib -import logging -import uuid +import hashlib, logging, uuid from collections import defaultdict from datetime import datetime, timedelta, timezone from enum import Enum from typing import Any, Optional - from pydantic import BaseModel, Field logger = logging.getLogger(__name__) -# --------------------------------------------------------------------------- -# Enums -# --------------------------------------------------------------------------- - class PolicyDecisionType(str, Enum): - ALLOW = "allow" - BLOCK = "block" - HOLD = "hold" - + ALLOW = "allow"; BLOCK = "block"; HOLD = "hold" class VerificationVerdict(str, Enum): - VERIFIED = "verified" - PARTIALLY_VERIFIED = "partially_verified" - UNVERIFIED = "unverified" - CONTRADICTED = "contradicted" - BLOCKED = "blocked" + VERIFIED = "verified"; PARTIALLY_VERIFIED = "partially_verified" + UNVERIFIED = "unverified"; CONTRADICTED = "contradicted"; BLOCKED = "blocked" -# --------------------------------------------------------------------------- -# Models — نماذج البيانات -# --------------------------------------------------------------------------- - class PolicyDecision(BaseModel): - """Pre-execution policy decision — قرار السياسة قبل التنفيذ""" - decision: PolicyDecisionType - reason: str - reason_ar: str - tool_name: str + """قرار السياسة قبل التنفيذ""" + decision: PolicyDecisionType; reason: str; reason_ar: str; tool_name: str requires_approval_from: Optional[str] = None pdpl_consent_required: bool = False budget_remaining: Optional[float] = None class ToolReceipt(BaseModel): - """Signed execution receipt — إيصال تنفيذ موقّع""" + """إيصال تنفيذ موقّع""" receipt_id: str = Field(default_factory=lambda: str(uuid.uuid4())) - run_id: str = "" - session_id: str = "" - agent_id: str = "" - tool_name: str - parameters: dict[str, Any] = {} + run_id: str = ""; session_id: str = ""; agent_id: str = "" + tool_name: str; parameters: dict[str, Any] = {} timestamp: datetime = Field(default_factory=lambda: datetime.now(timezone.utc)) - execution_result: str = "" - normalized_result: str = "" + execution_result: str = ""; normalized_result: str = "" hash_signature: str = "" policy_decision: PolicyDecisionType = PolicyDecisionType.ALLOW side_effects: list[str] = [] verification_verdict: VerificationVerdict = VerificationVerdict.UNVERIFIED - cost_estimate: float = 0.0 - tenant_id: str = "" + cost_estimate: float = 0.0; tenant_id: str = "" def compute_hash(self) -> str: - """Generate SHA-256 hash of (tool_name + params + result + timestamp).""" - payload = ( - f"{self.tool_name}|" - f"{_stable_dict_str(self.parameters)}|" - f"{self.execution_result}|" - f"{self.timestamp.isoformat()}" - ) - self.hash_signature = hashlib.sha256(payload.encode("utf-8")).hexdigest() + payload = f"{self.tool_name}|{'|'.join(f'{k}={v}' for k,v in sorted(self.parameters.items()))}|{self.execution_result}|{self.timestamp.isoformat()}" + self.hash_signature = hashlib.sha256(payload.encode()).hexdigest() return self.hash_signature def normalize_result(self) -> str: - """Normalize execution result for comparison.""" raw = self.execution_result.lower().strip() - for noise in ["ok", "success", "done", "completed", "تم", "نجح"]: - raw = raw.replace(noise, "SUCCESS") - for err in ["error", "fail", "exception", "خطأ", "فشل"]: - raw = raw.replace(err, "ERROR") - self.normalized_result = raw - return raw + for w in ["ok","success","done","completed","تم","نجح"]: raw = raw.replace(w, "SUCCESS") + for w in ["error","fail","exception","خطأ","فشل"]: raw = raw.replace(w, "ERROR") + self.normalized_result = raw; return raw -def _stable_dict_str(d: dict) -> str: - """Deterministic string representation of a dict for hashing.""" - return "|".join(f"{k}={v}" for k, v in sorted(d.items())) - - -# --------------------------------------------------------------------------- -# Pre-Execution Policy — سياسة ما قبل التنفيذ -# --------------------------------------------------------------------------- - class PreExecutionPolicy: - """ - Decide allow/block/hold before tool execution. - تحديد السماح/الحظر/التعليق قبل تنفيذ الأداة. - """ - - SAFE_TOOLS: set[str] = { - "read_file", "search", "query_db_readonly", "get_status", - "list_leads", "get_deal", "get_pipeline", "view_analytics", - "check_consent", "get_sequence_status", - } - - HOLD_TOOLS: set[str] = { - "send_message", "send_whatsapp", "send_email", "send_sms", - "update_deal", "assign_lead", "create_proposal", - "change_stage", "update_score", "create_sequence", - "schedule_meeting", "update_territory", - } - - BLOCK_TOOLS: set[str] = { - "delete_tenant", "drop_table", "bulk_delete", "export_all_data", - "reset_database", "delete_all_leads", "purge_audit_log", - "disable_pdpl", "bypass_consent", "modify_permissions_bulk", - } - - MESSAGING_TOOLS: set[str] = { - "send_message", "send_whatsapp", "send_email", "send_sms", - } - - ROLE_PERMISSIONS: dict[str, set[str]] = { - "owner": SAFE_TOOLS | HOLD_TOOLS, - "admin": SAFE_TOOLS | HOLD_TOOLS, - "manager": SAFE_TOOLS | {"update_deal", "assign_lead", "create_proposal", "change_stage"}, - "sales_rep": SAFE_TOOLS | {"send_message", "send_whatsapp", "send_email", "update_deal"}, - "viewer": SAFE_TOOLS, - } - - DEFAULT_BUDGET_LIMIT: float = 100.0 # SAR per session + """تحديد السماح/الحظر/التعليق قبل تنفيذ الأداة.""" + SAFE_TOOLS = {"read_file","search","query_db_readonly","get_status","list_leads", + "get_deal","get_pipeline","view_analytics","check_consent","get_sequence_status"} + HOLD_TOOLS = {"send_message","send_whatsapp","send_email","send_sms","update_deal", + "assign_lead","create_proposal","change_stage","update_score","create_sequence", + "schedule_meeting","update_territory"} + BLOCK_TOOLS = {"delete_tenant","drop_table","bulk_delete","export_all_data", + "reset_database","delete_all_leads","purge_audit_log","disable_pdpl", + "bypass_consent","modify_permissions_bulk"} + MSG_TOOLS = {"send_message","send_whatsapp","send_email","send_sms"} + ROLE_PERMS: dict[str, set[str]] = { + "owner": SAFE_TOOLS | HOLD_TOOLS, "admin": SAFE_TOOLS | HOLD_TOOLS, + "manager": SAFE_TOOLS | {"update_deal","assign_lead","create_proposal","change_stage"}, + "sales_rep": SAFE_TOOLS | {"send_message","send_whatsapp","send_email","update_deal"}, + "viewer": SAFE_TOOLS} + BUDGET_LIMIT = 100.0 # SAR per session + COST_MAP = {"send_whatsapp": 0.15, "send_sms": 0.08, "send_email": 0.02, + "send_message": 0.10, "create_proposal": 0.50, "query_db_readonly": 0.001} def __init__(self): - self._session_costs: dict[str, float] = defaultdict(float) + self._costs: dict[str, float] = defaultdict(float) - def evaluate( - self, - tool_name: str, - params: dict[str, Any], - user_context: dict[str, Any], - ) -> PolicyDecision: - """ - Check tool against policy classes, role, PDPL, and budget. - فحص الأداة مقابل فئات السياسة والدور والموافقة والميزانية. - """ - user_role = user_context.get("role", "viewer") - session_id = user_context.get("session_id", "unknown") - has_consent = user_context.get("has_consent", False) - budget_limit = user_context.get("budget_limit", self.DEFAULT_BUDGET_LIMIT) - - # Class C — absolute block + def evaluate(self, tool_name: str, params: dict[str, Any], user_context: dict[str, Any]) -> PolicyDecision: + role = user_context.get("role", "viewer") + sid = user_context.get("session_id", "unknown") + limit = user_context.get("budget_limit", self.BUDGET_LIMIT) if tool_name in self.BLOCK_TOOLS: - logger.warning( - "محظور: أداة %s محظورة بالكامل (المستخدم: %s)", - tool_name, user_context.get("user_id", "unknown"), - ) - return PolicyDecision( - decision=PolicyDecisionType.BLOCK, - reason=f"Tool '{tool_name}' is in the BLOCK list. This action is forbidden.", - reason_ar=f"الأداة '{tool_name}' محظورة. هذا الإجراء ممنوع.", - tool_name=tool_name, - ) - - # Role check - allowed_tools = self.ROLE_PERMISSIONS.get(user_role, self.SAFE_TOOLS) - if tool_name not in allowed_tools and tool_name not in self.SAFE_TOOLS: - return PolicyDecision( - decision=PolicyDecisionType.BLOCK, - reason=f"Role '{user_role}' lacks permission for tool '{tool_name}'.", - reason_ar=f"الدور '{user_role}' لا يملك صلاحية استخدام الأداة '{tool_name}'.", - tool_name=tool_name, - ) - - # PDPL consent check for messaging - if tool_name in self.MESSAGING_TOOLS and not has_consent: - return PolicyDecision( - decision=PolicyDecisionType.BLOCK, - reason="PDPL consent required before sending messages.", - reason_ar="مطلوب موافقة نظام حماية البيانات قبل إرسال الرسائل.", - tool_name=tool_name, - pdpl_consent_required=True, - ) - - # Budget check - estimated_cost = self._estimate_cost(tool_name, params) - current_spent = self._session_costs[session_id] - if current_spent + estimated_cost > budget_limit: - return PolicyDecision( - decision=PolicyDecisionType.HOLD, - reason=f"Budget limit would be exceeded. Spent: {current_spent:.2f}, " - f"estimated: {estimated_cost:.2f}, limit: {budget_limit:.2f} SAR.", - reason_ar=f"سيتم تجاوز حد الميزانية. المصروف: {current_spent:.2f}، " - f"التقدير: {estimated_cost:.2f}، الحد: {budget_limit:.2f} ريال.", - tool_name=tool_name, - budget_remaining=budget_limit - current_spent, - ) - - # Hold tools need approval + logger.warning("محظور: %s (المستخدم: %s)", tool_name, user_context.get("user_id", "?")) + return PolicyDecision(decision=PolicyDecisionType.BLOCK, tool_name=tool_name, + reason=f"Tool '{tool_name}' is forbidden.", reason_ar=f"الأداة '{tool_name}' محظورة.") + allowed = self.ROLE_PERMS.get(role, self.SAFE_TOOLS) + if tool_name not in allowed and tool_name not in self.SAFE_TOOLS: + return PolicyDecision(decision=PolicyDecisionType.BLOCK, tool_name=tool_name, + reason=f"Role '{role}' lacks permission for '{tool_name}'.", + reason_ar=f"الدور '{role}' لا يملك صلاحية '{tool_name}'.") + if tool_name in self.MSG_TOOLS and not user_context.get("has_consent", False): + return PolicyDecision(decision=PolicyDecisionType.BLOCK, tool_name=tool_name, + reason="PDPL consent required.", reason_ar="مطلوب موافقة حماية البيانات.", + pdpl_consent_required=True) + est = self.COST_MAP.get(tool_name, 0.01) * params.get("count", 1) + if self._costs[sid] + est > limit: + return PolicyDecision(decision=PolicyDecisionType.HOLD, tool_name=tool_name, + reason=f"Budget exceeded ({self._costs[sid]:.2f}+{est:.2f} > {limit:.2f} SAR).", + reason_ar=f"تجاوز الميزانية.", budget_remaining=limit - self._costs[sid]) if tool_name in self.HOLD_TOOLS: - approver = "manager" if user_role == "sales_rep" else "admin" - return PolicyDecision( - decision=PolicyDecisionType.HOLD, - reason=f"Tool '{tool_name}' requires approval before execution.", - reason_ar=f"الأداة '{tool_name}' تتطلب موافقة قبل التنفيذ.", - tool_name=tool_name, - requires_approval_from=approver, - ) - - # Safe tools — allow - return PolicyDecision( - decision=PolicyDecisionType.ALLOW, - reason=f"Tool '{tool_name}' is safe for execution.", - reason_ar=f"الأداة '{tool_name}' آمنة للتنفيذ.", - tool_name=tool_name, - ) + approver = "manager" if role == "sales_rep" else "admin" + return PolicyDecision(decision=PolicyDecisionType.HOLD, tool_name=tool_name, + reason=f"'{tool_name}' requires approval.", reason_ar=f"'{tool_name}' تتطلب موافقة.", + requires_approval_from=approver) + return PolicyDecision(decision=PolicyDecisionType.ALLOW, tool_name=tool_name, + reason=f"'{tool_name}' is safe.", reason_ar=f"'{tool_name}' آمنة.") def record_cost(self, session_id: str, cost: float) -> None: - """Record actual cost for budget tracking.""" - self._session_costs[session_id] += cost + self._costs[session_id] += cost - def _estimate_cost(self, tool_name: str, params: dict) -> float: - """Estimate cost in SAR for a tool call.""" - cost_map = { - "send_whatsapp": 0.15, - "send_sms": 0.08, - "send_email": 0.02, - "send_message": 0.10, - "create_proposal": 0.50, - "query_db_readonly": 0.001, - "search": 0.001, - } - base = cost_map.get(tool_name, 0.01) - # Bulk operations cost more - if "count" in params or "bulk" in tool_name: - base *= params.get("count", 1) - return base - - -# --------------------------------------------------------------------------- -# Receipt Store — مخزن الإيصالات -# --------------------------------------------------------------------------- class ReceiptStore: - """In-memory receipt storage with query capabilities.""" - + """مخزن الإيصالات في الذاكرة""" def __init__(self, max_size: int = 50000): - self._receipts: list[ToolReceipt] = [] - self._max_size = max_size + self._receipts: list[ToolReceipt] = []; self._max = max_size def store(self, receipt: ToolReceipt) -> str: - """Store a receipt and return its ID.""" - receipt.compute_hash() - receipt.normalize_result() + receipt.compute_hash(); receipt.normalize_result() self._receipts.append(receipt) - if len(self._receipts) > self._max_size: - self._receipts = self._receipts[-self._max_size:] - logger.info( - "إيصال محفوظ: %s أداة=%s حكم=%s", - receipt.receipt_id, receipt.tool_name, receipt.verification_verdict.value, - ) + if len(self._receipts) > self._max: self._receipts = self._receipts[-self._max:] + logger.info("إيصال: %s أداة=%s حكم=%s", receipt.receipt_id, receipt.tool_name, receipt.verification_verdict.value) return receipt.receipt_id def get(self, receipt_id: str) -> Optional[ToolReceipt]: - for r in self._receipts: - if r.receipt_id == receipt_id: - return r - return None + return next((r for r in self._receipts if r.receipt_id == receipt_id), None) - def query( - self, - agent_id: str = None, - tool_name: str = None, - verdict: VerificationVerdict = None, - since: datetime = None, - limit: int = 100, - ) -> list[ToolReceipt]: - results = self._receipts - if agent_id: - results = [r for r in results if r.agent_id == agent_id] - if tool_name: - results = [r for r in results if r.tool_name == tool_name] - if verdict: - results = [r for r in results if r.verification_verdict == verdict] - if since: - results = [r for r in results if r.timestamp >= since] - return results[-limit:] + def query(self, agent_id: str = None, tool_name: str = None, + verdict: VerificationVerdict = None, since: datetime = None, limit: int = 100) -> list[ToolReceipt]: + r = self._receipts + if agent_id: r = [x for x in r if x.agent_id == agent_id] + if tool_name: r = [x for x in r if x.tool_name == tool_name] + if verdict: r = [x for x in r if x.verification_verdict == verdict] + if since: r = [x for x in r if x.timestamp >= since] + return r[-limit:] -# --------------------------------------------------------------------------- -# Trust Analytics — تحليلات الثقة -# --------------------------------------------------------------------------- - class TrustAnalytics: - """ - Track trust metrics across agent workflows. - تتبع مقاييس الثقة عبر سير عمل الوكلاء. - """ + """تتبع مقاييس الثقة عبر سير عمل الوكلاء""" + WEIGHTS = {VerificationVerdict.VERIFIED: 1.0, VerificationVerdict.PARTIALLY_VERIFIED: 0.6, + VerificationVerdict.UNVERIFIED: 0.3, VerificationVerdict.CONTRADICTED: 0.0, + VerificationVerdict.BLOCKED: 0.2} def __init__(self, store: ReceiptStore): self._store = store def get_trust_score(self, agent_id: str) -> float: - """ - Trust score 0-1 for an agent based on verification history. - درجة الثقة 0-1 للوكيل بناءً على سجل التحقق. - """ - receipts = self._store.query(agent_id=agent_id, limit=500) - if not receipts: - return 0.5 # Neutral for unknown agents - - weights = { - VerificationVerdict.VERIFIED: 1.0, - VerificationVerdict.PARTIALLY_VERIFIED: 0.6, - VerificationVerdict.UNVERIFIED: 0.3, - VerificationVerdict.CONTRADICTED: 0.0, - VerificationVerdict.BLOCKED: 0.2, - } - total_weight = sum(weights.get(r.verification_verdict, 0.3) for r in receipts) - return round(total_weight / len(receipts), 4) + recs = self._store.query(agent_id=agent_id, limit=500) + if not recs: return 0.5 + return round(sum(self.WEIGHTS.get(r.verification_verdict, 0.3) for r in recs) / len(recs), 4) def get_contradiction_rate(self, agent_id: str) -> float: - """ - Contradiction rate for an agent. - معدل التناقض للوكيل. - """ - receipts = self._store.query(agent_id=agent_id, limit=500) - if not receipts: - return 0.0 - contradictions = sum( - 1 for r in receipts - if r.verification_verdict == VerificationVerdict.CONTRADICTED - ) - return round(contradictions / len(receipts), 4) + recs = self._store.query(agent_id=agent_id, limit=500) + if not recs: return 0.0 + return round(sum(1 for r in recs if r.verification_verdict == VerificationVerdict.CONTRADICTED) / len(recs), 4) def get_cost_by_agent(self, period_days: int = 30) -> dict[str, float]: - """ - Total cost per agent in period. - إجمالي التكلفة لكل وكيل خلال الفترة. - """ since = datetime.now(timezone.utc) - timedelta(days=period_days) - receipts = self._store.query(since=since, limit=50000) costs: dict[str, float] = defaultdict(float) - for r in receipts: - costs[r.agent_id] += r.cost_estimate + for r in self._store.query(since=since, limit=50000): costs[r.agent_id] += r.cost_estimate return {k: round(v, 4) for k, v in costs.items()} def get_blocked_attempts(self, period_days: int = 30) -> list[ToolReceipt]: - """ - All blocked tool attempts in period. - جميع محاولات الأدوات المحظورة خلال الفترة. - """ - since = datetime.now(timezone.utc) - timedelta(days=period_days) - return self._store.query( - verdict=VerificationVerdict.BLOCKED, since=since, limit=1000 - ) + return self._store.query(verdict=VerificationVerdict.BLOCKED, + since=datetime.now(timezone.utc) - timedelta(days=period_days), limit=1000) def get_hallucination_suspects(self) -> list[ToolReceipt]: - """ - Claims without matching receipts — possible hallucinations. - ادعاءات بدون إيصالات مطابقة — هلوسات محتملة. - """ - all_receipts = self._store.query(limit=5000) - suspects = [] - for r in all_receipts: - if r.verification_verdict == VerificationVerdict.CONTRADICTED: - suspects.append(r) - elif ( - r.verification_verdict == VerificationVerdict.UNVERIFIED - and r.execution_result == "" - and r.tool_name not in PreExecutionPolicy.SAFE_TOOLS - ): - suspects.append(r) - return suspects + return [r for r in self._store.query(limit=5000) + if r.verification_verdict == VerificationVerdict.CONTRADICTED + or (r.verification_verdict == VerificationVerdict.UNVERIFIED + and not r.execution_result and r.tool_name not in PreExecutionPolicy.SAFE_TOOLS)] def get_summary(self, agent_id: str = None) -> dict[str, Any]: - """ - Overall trust summary. - ملخص الثقة العام. - """ - receipts = self._store.query(agent_id=agent_id, limit=10000) - total = len(receipts) - if total == 0: - return {"total": 0, "trust_score": 0.5, "message_ar": "لا توجد بيانات"} + recs = self._store.query(agent_id=agent_id, limit=10000) + if not recs: return {"total": 0, "trust_score": 0.5, "message_ar": "لا توجد بيانات"} + by_v: dict[str, int] = defaultdict(int) + cost = 0.0 + for r in recs: by_v[r.verification_verdict.value] += 1; cost += r.cost_estimate + ts = self.get_trust_score(agent_id) if agent_id else 0.5 + return {"total": len(recs), "by_verdict": dict(by_v), "trust_score": ts, + "total_cost_sar": round(cost, 2), + "contradiction_rate": round(by_v.get("contradicted", 0) / len(recs) * 100, 2), + "message_ar": f"عمليات: {len(recs)}، ثقة: {ts:.2f}"} - by_verdict: dict[str, int] = defaultdict(int) - total_cost = 0.0 - for r in receipts: - by_verdict[r.verification_verdict.value] += 1 - total_cost += r.cost_estimate - - trust = self.get_trust_score(agent_id) if agent_id else 0.5 - return { - "total": total, - "by_verdict": dict(by_verdict), - "trust_score": trust, - "total_cost_sar": round(total_cost, 2), - "contradiction_rate": round( - by_verdict.get("contradicted", 0) / total * 100, 2 - ), - "message_ar": f"إجمالي العمليات: {total}، درجة الثقة: {trust:.2f}", - } - - -# --------------------------------------------------------------------------- -# Global singletons -# --------------------------------------------------------------------------- pre_execution_policy = PreExecutionPolicy() receipt_store = ReceiptStore()