fix: Update knowledge brain and tool receipts with final implementations

https://claude.ai/code/session_01LsnvBa7HwF5hs99VZbgLGj
This commit is contained in:
Claude 2026-04-11 08:22:12 +00:00
parent 6cef426bc7
commit c67164ffea
No known key found for this signature in database
2 changed files with 287 additions and 817 deletions

View File

@ -1,560 +1,258 @@
""" """
Knowledge Brain Dealix Second Brain Service Knowledge Brain Dealix Second Brain Service
Project knowledge management: ingest, query, lint, index. Project knowledge management: ingest, query, lint, index.
Manages the wiki layer in memory/wiki/ and indexes in memory/indexes/.
""" """
import logging import logging, re, uuid
import os from datetime import datetime, timezone
import re
import uuid
from datetime import datetime, timedelta, timezone
from enum import Enum from enum import Enum
from pathlib import Path from pathlib import Path
from typing import Any, Optional from typing import Any, Optional
from pydantic import BaseModel, Field from pydantic import BaseModel, Field
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
WIKI_DIR = Path(__file__).resolve().parents[4] / "memory" / "wiki" WIKI_DIR = Path(__file__).resolve().parents[4] / "memory" / "wiki"
INDEX_DIR = Path(__file__).resolve().parents[4] / "memory" / "indexes" INDEX_DIR = Path(__file__).resolve().parents[4] / "memory" / "indexes"
MEMORY_DIR = Path(__file__).resolve().parents[4] / "memory" MEMORY_DIR = Path(__file__).resolve().parents[4] / "memory"
STALE_THRESHOLD_DAYS = 30 STALE_DAYS = 30
class PageType(str, Enum): class PageType(str, Enum):
ARCHITECTURE = "architecture" ARCHITECTURE = "architecture"; PRODUCT = "product"; GTM = "gtm"
PRODUCT = "product" CUSTOMER = "customer"; OPERATIONS = "operations"; SECURITY = "security"
GTM = "gtm" TOOLING = "tooling"; GLOSSARY = "glossary"
CUSTOMER = "customer"
OPERATIONS = "operations"
SECURITY = "security"
TOOLING = "tooling"
GLOSSARY = "glossary"
class Confidence(str, Enum): class Confidence(str, Enum):
HIGH = "high" HIGH = "high"; MEDIUM = "medium"; LOW = "low"
MEDIUM = "medium"
LOW = "low"
class IssueSeverity(str, Enum): class IssueSeverity(str, Enum):
ERROR = "error" ERROR = "error"; WARNING = "warning"; INFO = "info"
WARNING = "warning"
INFO = "info"
class WikiPage(BaseModel): class WikiPage(BaseModel):
"""Structured wiki page — صفحة ويكي منظمة""" """صفحة ويكي منظمة — Structured wiki page"""
id: str = Field(default_factory=lambda: str(uuid.uuid4())) id: str = Field(default_factory=lambda: str(uuid.uuid4()))
title: str title: str; title_ar: str = ""
title_ar: str = "" page_type: PageType; summary: str; summary_ar: str
page_type: PageType key_facts: list[str] = []; provenance: str
summary: str
summary_ar: str
key_facts: list[str] = []
provenance: str
confidence: Confidence = Confidence.MEDIUM confidence: Confidence = Confidence.MEDIUM
related_pages: list[str] = [] related_pages: list[str] = []
last_updated: datetime = Field(default_factory=lambda: datetime.now(timezone.utc)) last_updated: datetime = Field(default_factory=lambda: datetime.now(timezone.utc))
stale: bool = False stale: bool = False; file_path: str = ""; body: str = ""
file_path: str = ""
body: str = ""
class Config:
json_schema_extra = {
"example": {
"title": "System Architecture",
"title_ar": "بنية النظام",
"page_type": "architecture",
"summary": "Multi-tenant AI CRM architecture overview",
"summary_ar": "نظرة عامة على بنية إدارة علاقات العملاء متعددة المستأجرين",
}
}
class BrainAnswer(BaseModel): class BrainAnswer(BaseModel):
"""Answer from the knowledge brain — إجابة من الدماغ المعرفي""" """إجابة من الدماغ المعرفي"""
question: str question: str; answer: str; answer_ar: str = ""
answer: str sources: list[str] = []; confidence: Confidence = Confidence.LOW
answer_ar: str = ""
sources: list[str] = []
confidence: Confidence = Confidence.LOW
related_pages: list[str] = [] related_pages: list[str] = []
class BrainIssue(BaseModel): class BrainIssue(BaseModel):
"""Quality issue found during lint — مشكلة جودة مكتشفة أثناء الفحص""" """مشكلة جودة مكتشفة أثناء الفحص"""
issue_id: str = Field(default_factory=lambda: str(uuid.uuid4())) issue_id: str = Field(default_factory=lambda: str(uuid.uuid4()))
severity: IssueSeverity severity: IssueSeverity; category: str
category: str title: str; title_ar: str; description: str
title: str affected_page: str = ""; recommendation: str = ""
title_ar: str
description: str
affected_page: str = ""
recommendation: str = ""
class KnowledgeBrain: class KnowledgeBrain:
""" """إدارة المعرفة — استيعاب، استعلام، فحص"""
Project knowledge management ingest, query, lint.
إدارة المعرفة المشروعية استيعاب، استعلام، فحص.
"""
def __init__(self, wiki_dir: Path = None, memory_dir: Path = None): def __init__(self, wiki_dir: Path = None, memory_dir: Path = None):
self.wiki_dir = wiki_dir or WIKI_DIR self.wiki_dir = wiki_dir or WIKI_DIR
self.memory_dir = memory_dir or MEMORY_DIR self.memory_dir = memory_dir or MEMORY_DIR
self.index_dir = INDEX_DIR self.index_dir = INDEX_DIR
self._page_cache: dict[str, WikiPage] = {} self._cache: dict[str, WikiPage] = {}
self._ensure_dirs()
def _ensure_dirs(self) -> None:
self.wiki_dir.mkdir(parents=True, exist_ok=True) self.wiki_dir.mkdir(parents=True, exist_ok=True)
self.index_dir.mkdir(parents=True, exist_ok=True) self.index_dir.mkdir(parents=True, exist_ok=True)
def _parse_frontmatter(self, content: str, file_path: str) -> WikiPage: def _parse_frontmatter(self, content: str, fpath: str) -> WikiPage:
"""Parse wiki page frontmatter into a WikiPage model.""" lines, title, fields, body_start = content.split("\n"), "", {}, 0
lines = content.split("\n") for i, ln in enumerate(lines):
title = "" s = ln.strip()
fields: dict[str, Any] = {} if s.startswith("# "): title = s[2:].strip()
body_start = 0 elif s == "---": body_start = i + 1; break
elif s.startswith("**") and "**:" in s:
for i, line in enumerate(lines): m = re.match(r"\*\*(.+?)\*\*:\s*(.*)", s)
stripped = line.strip() if m: fields[m.group(1).lower().replace(" ", "_")] = m.group(2).strip()
if stripped.startswith("# "): body = "\n".join(lines[body_start:]).strip() if body_start else ""
title = stripped[2:].strip() key_facts, in_f = [], False
elif stripped == "---": for ln in lines:
body_start = i + 1 if "**Key Facts**" in ln: in_f = True; continue
break if in_f:
elif stripped.startswith("**") and "**:" in stripped: fm = re.match(r"^\s*-\s+(.+)$", ln)
match = re.match(r"\*\*(.+?)\*\*:\s*(.*)", stripped) if fm: key_facts.append(fm.group(1).strip())
if match: elif ln.strip().startswith("**"): break
key = match.group(1).lower().replace(" ", "_") related = re.findall(r"\[.+?\]\((.+?)\)", fields.get("related_pages", ""))
value = match.group(2).strip() pt = next((p for p in PageType if p.value == fields.get("type", "").lower()), PageType.ARCHITECTURE)
fields[key] = value cf = next((c for c in Confidence if c.value == fields.get("confidence", "").lower()), Confidence.MEDIUM)
try: lu = datetime.strptime(fields.get("last_updated", ""), "%Y-%m-%d").replace(tzinfo=timezone.utc)
body = "\n".join(lines[body_start:]).strip() if body_start > 0 else "" except ValueError: lu = datetime.now(timezone.utc)
key_facts = [] return WikiPage(title=title, page_type=pt, summary=fields.get("summary", ""),
if "key_facts" in fields: summary_ar=fields.get("summary_ar", ""), key_facts=key_facts,
fact_pattern = re.compile(r"^\s*-\s+(.+)$") provenance=fields.get("provenance", ""), confidence=cf, related_pages=related,
in_facts = False last_updated=lu, stale=fields.get("stale", "false").lower() == "true",
for line in lines: file_path=fpath, body=body)
if "**Key Facts**" in line:
in_facts = True
continue
if in_facts:
fact_match = fact_pattern.match(line)
if fact_match:
key_facts.append(fact_match.group(1).strip())
elif line.strip().startswith("**"):
break
related = []
if "related_pages" in fields:
link_pattern = re.compile(r"\[.+?\]\((.+?)\)")
related = link_pattern.findall(fields["related_pages"])
page_type = PageType.ARCHITECTURE
type_val = fields.get("type", "architecture").lower()
for pt in PageType:
if pt.value == type_val:
page_type = pt
break
conf = Confidence.MEDIUM
conf_val = fields.get("confidence", "medium").lower()
for c in Confidence:
if c.value == conf_val:
conf = c
break
last_updated = datetime.now(timezone.utc)
if "last_updated" in fields:
try:
last_updated = datetime.strptime(
fields["last_updated"], "%Y-%m-%d"
).replace(tzinfo=timezone.utc)
except ValueError:
pass
stale = fields.get("stale", "false").lower() == "true"
return WikiPage(
title=title,
title_ar=fields.get("title_ar", ""),
page_type=page_type,
summary=fields.get("summary", ""),
summary_ar=fields.get("summary_ar", ""),
key_facts=key_facts,
provenance=fields.get("provenance", ""),
confidence=conf,
related_pages=related,
last_updated=last_updated,
stale=stale,
file_path=file_path,
body=body,
)
async def _load_all_pages(self) -> list[WikiPage]: async def _load_all_pages(self) -> list[WikiPage]:
"""Load and parse all wiki pages."""
pages = [] pages = []
if not self.wiki_dir.exists(): if not self.wiki_dir.exists(): return pages
return pages for f in sorted(self.wiki_dir.glob("*.md")):
for md_file in sorted(self.wiki_dir.glob("*.md")): if f.name == "README.md": continue
if md_file.name == "README.md":
continue
try: try:
content = md_file.read_text(encoding="utf-8") p = self._parse_frontmatter(f.read_text(encoding="utf-8"), str(f))
page = self._parse_frontmatter(content, str(md_file)) self._cache[p.id] = p; pages.append(p)
self._page_cache[page.id] = page except Exception as e: logger.warning("فشل تحميل %s: %s", f.name, e)
pages.append(page)
except Exception as exc:
logger.warning("فشل تحميل الصفحة %s: %s", md_file.name, exc)
return pages return pages
async def ingest( def _classify(self, source_type: str, content: str) -> PageType:
self, if source_type in ("adr", "architecture"): return PageType.ARCHITECTURE
source_type: str, if source_type in ("customer_interview", "feedback"): return PageType.CUSTOMER
content: str, cl = content.lower()
metadata: dict[str, Any] = None, kw = {PageType.ARCHITECTURE: ["api","database","service","backend"],
) -> WikiPage: PageType.GTM: ["launch","marketing","outreach","growth","تسويق"],
""" PageType.CUSTOMER: ["customer","interview","feedback","عميل"],
Classify content, create/update wiki page, link related pages. PageType.SECURITY: ["pdpl","consent","security","أمان"],
تصنيف المحتوى، إنشاء/تحديث صفحة ويكي، ربط الصفحات ذات الصلة. PageType.TOOLING: ["provider","integration","tool","أداة"],
""" PageType.OPERATIONS: ["runbook","checklist","process","عملية"]}
metadata = metadata or {} scores = {t: sum(1 for w in ws if w in cl) for t, ws in kw.items()}
title = metadata.get("title", f"Ingested — {source_type}") best = max(scores, key=scores.get)
title_ar = metadata.get("title_ar", f"مستوعب — {source_type}")
page_type = self._classify_content(source_type, content)
summary = content[:120].replace("\n", " ").strip()
summary_ar = metadata.get("summary_ar", f"محتوى {source_type} مستوعب تلقائياً")
existing_pages = await self._load_all_pages()
related = self._find_related(content, existing_pages)
page = WikiPage(
title=title,
title_ar=title_ar,
page_type=page_type,
summary=summary,
summary_ar=summary_ar,
key_facts=metadata.get("key_facts", []),
provenance=metadata.get("provenance", f"Auto-ingested from {source_type}"),
confidence=Confidence(metadata.get("confidence", "medium")),
related_pages=[p.file_path for p in related[:5]],
body=content,
)
file_name = re.sub(r"[^\w\s-]", "", title.lower()).replace(" ", "-")[:50]
file_path = self.wiki_dir / f"{file_name}.md"
page.file_path = str(file_path)
md_content = self._render_page(page)
file_path.write_text(md_content, encoding="utf-8")
self._page_cache[page.id] = page
logger.info("تم استيعاب صفحة جديدة: %s (%s)", title, page_type.value)
return page
def _classify_content(self, source_type: str, content: str) -> PageType:
"""Classify content into a page type based on keywords."""
content_lower = content.lower()
keyword_map = {
PageType.ARCHITECTURE: ["api", "database", "service", "backend", "frontend", "deploy"],
PageType.PRODUCT: ["feature", "roadmap", "user story", "requirement", "ميزة"],
PageType.GTM: ["launch", "marketing", "outreach", "growth", "campaign", "تسويق"],
PageType.CUSTOMER: ["customer", "interview", "feedback", "icp", "عميل"],
PageType.OPERATIONS: ["runbook", "checklist", "process", "deploy", "عملية"],
PageType.SECURITY: ["pdpl", "consent", "security", "compliance", "أمان"],
PageType.TOOLING: ["provider", "api key", "integration", "tool", "أداة"],
}
scores: dict[PageType, int] = {}
for ptype, keywords in keyword_map.items():
scores[ptype] = sum(1 for kw in keywords if kw in content_lower)
if source_type in ("adr", "architecture"):
return PageType.ARCHITECTURE
if source_type in ("customer_interview", "feedback"):
return PageType.CUSTOMER
best = max(scores, key=lambda k: scores[k])
return best if scores[best] > 0 else PageType.PRODUCT return best if scores[best] > 0 else PageType.PRODUCT
def _find_related(self, content: str, pages: list[WikiPage]) -> list[WikiPage]: def _find_related(self, content: str, pages: list[WikiPage]) -> list[WikiPage]:
"""Find related pages by keyword overlap.""" cw = set(content.lower().split())
content_words = set(content.lower().split()) scored = [(p, len(cw & set(p.summary.lower().split()))) for p in pages]
scored: list[tuple[WikiPage, int]] = [] return [p for p, s in sorted(scored, key=lambda x: -x[1]) if s > 2][:5]
for page in pages:
page_words = set(page.summary.lower().split()) | set(page.body.lower().split()[:100])
overlap = len(content_words & page_words)
if overlap > 2:
scored.append((page, overlap))
scored.sort(key=lambda x: x[1], reverse=True)
return [p for p, _ in scored[:5]]
def _render_page(self, page: WikiPage) -> str: def _render(self, p: WikiPage) -> str:
"""Render a WikiPage model to markdown.""" facts = "\n".join(f" - {f}" for f in p.key_facts) or " - (none)"
facts = "\n".join(f" - {f}" for f in page.key_facts) if page.key_facts else " - (none)" rels = ", ".join(f"[{Path(r).stem}]({r})" for r in p.related_pages) or "(none)"
related = ", ".join( return (f"# {p.title}\n\n**Type**: {p.page_type.value}\n**Summary**: {p.summary}\n"
f"[{Path(r).stem}]({r})" for r in page.related_pages f"**Summary_AR**: {p.summary_ar}\n**Key Facts**:\n{facts}\n"
) if page.related_pages else "(none)" f"**Provenance**: {p.provenance}\n**Confidence**: {p.confidence.value}\n"
date_str = page.last_updated.strftime("%Y-%m-%d") f"**Related Pages**: {rels}\n**Last Updated**: {p.last_updated:%Y-%m-%d}\n"
f"**Stale**: {str(p.stale).lower()}\n\n---\n\n{p.body}\n")
return f"""# {page.title} async def ingest(self, source_type: str, content: str, metadata: dict[str, Any] = None) -> WikiPage:
"""تصنيف المحتوى، إنشاء صفحة ويكي، ربط الصفحات ذات الصلة."""
md = metadata or {}
existing = await self._load_all_pages()
related = self._find_related(content, existing)
page = WikiPage(
title=md.get("title", f"Ingested — {source_type}"),
title_ar=md.get("title_ar", f"مستوعب — {source_type}"),
page_type=self._classify(source_type, content),
summary=content[:120].replace("\n", " ").strip(),
summary_ar=md.get("summary_ar", f"محتوى {source_type} مستوعب تلقائياً"),
key_facts=md.get("key_facts", []),
provenance=md.get("provenance", f"Auto-ingested from {source_type}"),
confidence=Confidence(md.get("confidence", "medium")),
related_pages=[p.file_path for p in related[:5]], body=content)
fname = re.sub(r"[^\w\s-]", "", page.title.lower()).replace(" ", "-")[:50]
fp = self.wiki_dir / f"{fname}.md"; page.file_path = str(fp)
fp.write_text(self._render(page), encoding="utf-8")
self._cache[page.id] = page
logger.info("تم استيعاب صفحة: %s (%s)", page.title, page.page_type.value)
return page
**Type**: {page.page_type.value} async def query(self, question: str, domain: str = None) -> BrainAnswer:
**Summary**: {page.summary} """البحث في الويكي والذاكرة عن إجابات ذات صلة."""
**Summary_AR**: {page.summary_ar}
**Key Facts**:
{facts}
**Provenance**: {page.provenance}
**Confidence**: {page.confidence.value}
**Related Pages**: {related}
**Last Updated**: {date_str}
**Stale**: {str(page.stale).lower()}
---
{page.body}
"""
async def query(
self, question: str, domain: str = None
) -> BrainAnswer:
"""
Search wiki + memory for relevant answers.
البحث في الويكي والذاكرة عن إجابات ذات صلة.
"""
pages = await self._load_all_pages() pages = await self._load_all_pages()
if domain: if domain:
try: try: pages = [p for p in pages if p.page_type == PageType(domain)]
dtype = PageType(domain) except ValueError: pass
pages = [p for p in pages if p.page_type == dtype] qw = set(question.lower().split())
except ValueError: scored = []
pass for p in pages:
sw = set(f"{p.title} {p.summary} {' '.join(p.key_facts)}".lower().split())
question_lower = question.lower() ov = len(qw & sw)
question_words = set(question_lower.split()) if ov > 0:
s = (ov / max(len(qw), 1)) * (1.3 if p.confidence == Confidence.HIGH else 0.7 if p.confidence == Confidence.LOW else 1.0)
scored: list[tuple[WikiPage, float]] = [] scored.append((p, s))
for page in pages: scored.sort(key=lambda x: -x[1])
searchable = f"{page.title} {page.summary} {page.body} {' '.join(page.key_facts)}".lower() if not scored:
searchable_words = set(searchable.split()) return BrainAnswer(question=question, answer="لم يتم العثور على معلومات ذات صلة.",
overlap = len(question_words & searchable_words) answer_ar="لم يتم العثور على معلومات ذات صلة.", confidence=Confidence.LOW)
if overlap > 0: bp, bs = scored[0]
score = overlap / max(len(question_words), 1) ans = bp.summary + (" Key facts: " + "; ".join(bp.key_facts[:3]) if bp.key_facts else "")
if page.confidence == Confidence.HIGH: conf = Confidence.HIGH if bs > 0.5 else Confidence.MEDIUM if bs > 0.2 else Confidence.LOW
score *= 1.3 return BrainAnswer(question=question, answer=ans, answer_ar=bp.summary_ar or "لا يوجد ملخص عربي",
elif page.confidence == Confidence.LOW: sources=[p.file_path for p, _ in scored[:3]], confidence=conf,
score *= 0.7 related_pages=[p.file_path for p, _ in scored[:3]])
scored.append((page, score))
scored.sort(key=lambda x: x[1], reverse=True)
top_pages = scored[:3]
if not top_pages:
return BrainAnswer(
question=question,
answer="لم يتم العثور على معلومات ذات صلة في قاعدة المعرفة.",
answer_ar="لم يتم العثور على معلومات ذات صلة في قاعدة المعرفة.",
confidence=Confidence.LOW,
)
best_page = top_pages[0][0]
best_score = top_pages[0][1]
answer_parts = [best_page.summary]
if best_page.key_facts:
answer_parts.append("Key facts: " + "; ".join(best_page.key_facts[:3]))
conf = Confidence.HIGH if best_score > 0.5 else (Confidence.MEDIUM if best_score > 0.2 else Confidence.LOW)
return BrainAnswer(
question=question,
answer=" ".join(answer_parts),
answer_ar=best_page.summary_ar or "لا يوجد ملخص عربي",
sources=[p.file_path for p, _ in top_pages],
confidence=conf,
related_pages=[p.file_path for p, _ in top_pages],
)
async def lint(self) -> list[BrainIssue]: async def lint(self) -> list[BrainIssue]:
""" """فحص: صفحات يتيمة، قديمة، مصدر مفقود، تكرارات، فهارس فارغة."""
Check for: orphan pages, stale pages, missing provenance, duplicates, empty indexes. issues, pages, now = [], await self._load_all_pages(), datetime.now(timezone.utc)
فحص: صفحات يتيمة، صفحات قديمة، مصدر مفقود، تكرارات، فهارس فارغة. targets: set[str] = set()
""" for p in pages:
issues: list[BrainIssue] = [] for r in p.related_pages: targets.add(str((Path(p.file_path).parent / r).resolve()))
pages = await self._load_all_pages() age = (now - p.last_updated).days
now = datetime.now(timezone.utc) if age > STALE_DAYS:
all_paths = {p.file_path for p in pages} issues.append(BrainIssue(severity=IssueSeverity.WARNING, category="stale",
all_related_targets: set[str] = set() title=f"Stale: {p.title}", title_ar=f"قديمة: {p.title}",
description=f"Updated {age}d ago", affected_page=p.file_path,
for page in pages: recommendation="Review and update or archive."))
for rel in page.related_pages: if not p.provenance:
resolved = str((Path(page.file_path).parent / rel).resolve()) issues.append(BrainIssue(severity=IssueSeverity.ERROR, category="provenance",
all_related_targets.add(resolved) title=f"No provenance: {p.title}", title_ar=f"مصدر مفقود: {p.title}",
description="Missing source.", affected_page=p.file_path,
# Stale check (>30 days) recommendation="Add provenance."))
age = (now - page.last_updated).days if not p.summary_ar:
if age > STALE_THRESHOLD_DAYS: issues.append(BrainIssue(severity=IssueSeverity.WARNING, category="i18n",
issues.append(BrainIssue( title=f"No Arabic summary: {p.title}", title_ar=f"ملخص عربي مفقود: {p.title}",
severity=IssueSeverity.WARNING, description="Arabic-first.", affected_page=p.file_path, recommendation="Add summary_ar."))
category="stale", for p in pages:
title=f"Stale page: {page.title}", if str(Path(p.file_path).resolve()) not in targets and p.page_type != PageType.GLOSSARY:
title_ar=f"صفحة قديمة: {page.title}", issues.append(BrainIssue(severity=IssueSeverity.INFO, category="orphan",
description=f"Last updated {age} days ago (threshold: {STALE_THRESHOLD_DAYS}).", title=f"Orphan: {p.title}", title_ar=f"يتيمة: {p.title}",
affected_page=page.file_path, description="No inbound links.", affected_page=p.file_path,
recommendation="Review and update or archive this page.", recommendation="Link from another page."))
))
# Missing provenance
if not page.provenance or page.provenance.strip() == "":
issues.append(BrainIssue(
severity=IssueSeverity.ERROR,
category="provenance",
title=f"Missing provenance: {page.title}",
title_ar=f"مصدر مفقود: {page.title}",
description="Page has no provenance. All pages must cite their source.",
affected_page=page.file_path,
recommendation="Add provenance field with source reference.",
))
# Missing Arabic summary
if not page.summary_ar:
issues.append(BrainIssue(
severity=IssueSeverity.WARNING,
category="i18n",
title=f"Missing Arabic summary: {page.title}",
title_ar=f"ملخص عربي مفقود: {page.title}",
description="Page is missing summary_ar. Dealix is Arabic-first.",
affected_page=page.file_path,
recommendation="Add an Arabic summary.",
))
# Orphan check
for page in pages:
resolved_path = str(Path(page.file_path).resolve())
if resolved_path not in all_related_targets and page.page_type != PageType.GLOSSARY:
issues.append(BrainIssue(
severity=IssueSeverity.INFO,
category="orphan",
title=f"Orphan page: {page.title}",
title_ar=f"صفحة يتيمة: {page.title}",
description="No other pages link to this page.",
affected_page=page.file_path,
recommendation="Add a link from a related page or index.",
))
# Duplicate check by title similarity
titles = [(p.title.lower().strip(), p) for p in pages]
seen: set[str] = set() seen: set[str] = set()
for title, page in titles: for p in pages:
if title in seen: t = p.title.lower().strip()
issues.append(BrainIssue( if t in seen:
severity=IssueSeverity.WARNING, issues.append(BrainIssue(severity=IssueSeverity.WARNING, category="duplicate",
category="duplicate", title=f"Duplicate: {p.title}", title_ar=f"تكرار: {p.title}",
title=f"Possible duplicate: {page.title}", description="Duplicate title.", affected_page=p.file_path, recommendation="Merge."))
title_ar=f"تكرار محتمل: {page.title}", seen.add(t)
description=f"Multiple pages with title '{page.title}'.",
affected_page=page.file_path,
recommendation="Merge duplicate pages.",
))
seen.add(title)
# Empty index check
if self.index_dir.exists(): if self.index_dir.exists():
for idx_file in self.index_dir.glob("*.md"): for f in self.index_dir.glob("*.md"):
content = idx_file.read_text(encoding="utf-8") if len(f.read_text(encoding="utf-8").strip()) < 50:
if len(content.strip()) < 50: issues.append(BrainIssue(severity=IssueSeverity.WARNING, category="empty_index",
issues.append(BrainIssue( title=f"Empty: {f.name}", title_ar=f"فارغ: {f.name}",
severity=IssueSeverity.WARNING, description="Sparse index.", affected_page=str(f), recommendation="Populate."))
category="empty_index", logger.info("فحص الدماغ: %d مشكلة", len(issues))
title=f"Empty index: {idx_file.name}",
title_ar=f"فهرس فارغ: {idx_file.name}",
description="Index file has very little content.",
affected_page=str(idx_file),
recommendation="Populate or remove the index.",
))
logger.info("فحص الدماغ المعرفي: %d مشكلة مكتشفة", len(issues))
return issues return issues
async def get_index(self, domain: str) -> list[WikiPage]: async def get_index(self, domain: str) -> list[WikiPage]:
""" """إرجاع جميع الصفحات في نطاق معين."""
Return all pages in a domain.
إرجاع جميع الصفحات في نطاق معين.
"""
pages = await self._load_all_pages() pages = await self._load_all_pages()
try: try: return [p for p in pages if p.page_type == PageType(domain)]
dtype = PageType(domain) except ValueError: return []
return [p for p in pages if p.page_type == dtype]
except ValueError:
logger.warning("نطاق غير معروف: %s", domain)
return []
async def mark_stale(self, page_id: str) -> None: async def mark_stale(self, page_id: str) -> None:
""" """تعليم صفحة كقديمة."""
Mark a page as stale. page = self._cache.get(page_id)
تعليم صفحة كقديمة.
"""
page = self._page_cache.get(page_id)
if not page: if not page:
pages = await self._load_all_pages() for p in await self._load_all_pages():
for p in pages: if p.id == page_id: page = p; break
if p.id == page_id: if not page: logger.error("صفحة غير موجودة: %s", page_id); return
page = p
break
if not page:
logger.error("صفحة غير موجودة: %s", page_id)
return
page.stale = True page.stale = True
file_path = Path(page.file_path) fp = Path(page.file_path)
if file_path.exists(): if fp.exists():
content = file_path.read_text(encoding="utf-8") fp.write_text(re.sub(r"\*\*Stale\*\*:\s*false", "**Stale**: true",
content = re.sub( fp.read_text(encoding="utf-8")), encoding="utf-8")
r"\*\*Stale\*\*:\s*false",
"**Stale**: true",
content,
)
file_path.write_text(content, encoding="utf-8")
logger.info("تم تعليم الصفحة كقديمة: %s", page.title)
async def promote_raw( async def promote_raw(self, raw_id: str, raw_content: str = None, metadata: dict[str, Any] = None) -> WikiPage:
self, """تحويل مادة خام إلى صفحة ويكي منظمة."""
raw_id: str, md = metadata or {}
raw_content: str = None,
metadata: dict[str, Any] = None,
) -> WikiPage:
"""
Convert raw material to structured wiki page.
تحويل مادة خام إلى صفحة ويكي منظمة.
"""
metadata = metadata or {}
if raw_content is None: if raw_content is None:
raw_path = self.memory_dir / "raw" / f"{raw_id}.md" rp = self.memory_dir / "raw" / f"{raw_id}.md"
if raw_path.exists(): if rp.exists(): raw_content = rp.read_text(encoding="utf-8")
raw_content = raw_path.read_text(encoding="utf-8") else: raise FileNotFoundError(f"المادة الخام غير موجودة: {raw_id}")
else: return await self.ingest("raw_promotion", raw_content, {
raise FileNotFoundError(f"المادة الخام غير موجودة: {raw_id}") "title": md.get("title", f"Promoted — {raw_id}"),
"title_ar": md.get("title_ar", f"مروّج — {raw_id}"),
title = metadata.get("title", f"Promoted from raw — {raw_id}") "provenance": f"Promoted from raw {raw_id}",
page = await self.ingest( "confidence": md.get("confidence", "medium"), **md})
source_type="raw_promotion",
content=raw_content,
metadata={
"title": title,
"title_ar": metadata.get("title_ar", f"مروّج من مادة خام — {raw_id}"),
"provenance": f"Promoted from raw material {raw_id}",
"confidence": metadata.get("confidence", "medium"),
**metadata,
},
)
logger.info("تمت ترقية المادة الخام إلى صفحة ويكي: %s%s", raw_id, page.title)
return page
# Global singleton
knowledge_brain = KnowledgeBrain() knowledge_brain = KnowledgeBrain()

View File

@ -3,414 +3,186 @@ Tool Receipts — Dealix ToolProof Enhancement
Signed execution receipts, pre-execution policy, and trust analytics. Signed execution receipts, pre-execution policy, and trust analytics.
Extends tool_verification.py with cryptographic receipts and policy enforcement. Extends tool_verification.py with cryptographic receipts and policy enforcement.
""" """
import hashlib import hashlib, logging, uuid
import logging
import uuid
from collections import defaultdict from collections import defaultdict
from datetime import datetime, timedelta, timezone from datetime import datetime, timedelta, timezone
from enum import Enum from enum import Enum
from typing import Any, Optional from typing import Any, Optional
from pydantic import BaseModel, Field from pydantic import BaseModel, Field
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
# ---------------------------------------------------------------------------
# Enums
# ---------------------------------------------------------------------------
class PolicyDecisionType(str, Enum): class PolicyDecisionType(str, Enum):
ALLOW = "allow" ALLOW = "allow"; BLOCK = "block"; HOLD = "hold"
BLOCK = "block"
HOLD = "hold"
class VerificationVerdict(str, Enum): class VerificationVerdict(str, Enum):
VERIFIED = "verified" VERIFIED = "verified"; PARTIALLY_VERIFIED = "partially_verified"
PARTIALLY_VERIFIED = "partially_verified" UNVERIFIED = "unverified"; CONTRADICTED = "contradicted"; BLOCKED = "blocked"
UNVERIFIED = "unverified"
CONTRADICTED = "contradicted"
BLOCKED = "blocked"
# ---------------------------------------------------------------------------
# Models — نماذج البيانات
# ---------------------------------------------------------------------------
class PolicyDecision(BaseModel): class PolicyDecision(BaseModel):
"""Pre-execution policy decision — قرار السياسة قبل التنفيذ""" """قرار السياسة قبل التنفيذ"""
decision: PolicyDecisionType decision: PolicyDecisionType; reason: str; reason_ar: str; tool_name: str
reason: str
reason_ar: str
tool_name: str
requires_approval_from: Optional[str] = None requires_approval_from: Optional[str] = None
pdpl_consent_required: bool = False pdpl_consent_required: bool = False
budget_remaining: Optional[float] = None budget_remaining: Optional[float] = None
class ToolReceipt(BaseModel): class ToolReceipt(BaseModel):
"""Signed execution receipt — إيصال تنفيذ موقّع""" """إيصال تنفيذ موقّع"""
receipt_id: str = Field(default_factory=lambda: str(uuid.uuid4())) receipt_id: str = Field(default_factory=lambda: str(uuid.uuid4()))
run_id: str = "" run_id: str = ""; session_id: str = ""; agent_id: str = ""
session_id: str = "" tool_name: str; parameters: dict[str, Any] = {}
agent_id: str = ""
tool_name: str
parameters: dict[str, Any] = {}
timestamp: datetime = Field(default_factory=lambda: datetime.now(timezone.utc)) timestamp: datetime = Field(default_factory=lambda: datetime.now(timezone.utc))
execution_result: str = "" execution_result: str = ""; normalized_result: str = ""
normalized_result: str = ""
hash_signature: str = "" hash_signature: str = ""
policy_decision: PolicyDecisionType = PolicyDecisionType.ALLOW policy_decision: PolicyDecisionType = PolicyDecisionType.ALLOW
side_effects: list[str] = [] side_effects: list[str] = []
verification_verdict: VerificationVerdict = VerificationVerdict.UNVERIFIED verification_verdict: VerificationVerdict = VerificationVerdict.UNVERIFIED
cost_estimate: float = 0.0 cost_estimate: float = 0.0; tenant_id: str = ""
tenant_id: str = ""
def compute_hash(self) -> str: def compute_hash(self) -> str:
"""Generate SHA-256 hash of (tool_name + params + result + timestamp).""" payload = f"{self.tool_name}|{'|'.join(f'{k}={v}' for k,v in sorted(self.parameters.items()))}|{self.execution_result}|{self.timestamp.isoformat()}"
payload = ( self.hash_signature = hashlib.sha256(payload.encode()).hexdigest()
f"{self.tool_name}|"
f"{_stable_dict_str(self.parameters)}|"
f"{self.execution_result}|"
f"{self.timestamp.isoformat()}"
)
self.hash_signature = hashlib.sha256(payload.encode("utf-8")).hexdigest()
return self.hash_signature return self.hash_signature
def normalize_result(self) -> str: def normalize_result(self) -> str:
"""Normalize execution result for comparison."""
raw = self.execution_result.lower().strip() raw = self.execution_result.lower().strip()
for noise in ["ok", "success", "done", "completed", "تم", "نجح"]: for w in ["ok","success","done","completed","تم","نجح"]: raw = raw.replace(w, "SUCCESS")
raw = raw.replace(noise, "SUCCESS") for w in ["error","fail","exception","خطأ","فشل"]: raw = raw.replace(w, "ERROR")
for err in ["error", "fail", "exception", "خطأ", "فشل"]: self.normalized_result = raw; return raw
raw = raw.replace(err, "ERROR")
self.normalized_result = raw
return raw
def _stable_dict_str(d: dict) -> str:
"""Deterministic string representation of a dict for hashing."""
return "|".join(f"{k}={v}" for k, v in sorted(d.items()))
# ---------------------------------------------------------------------------
# Pre-Execution Policy — سياسة ما قبل التنفيذ
# ---------------------------------------------------------------------------
class PreExecutionPolicy: class PreExecutionPolicy:
""" """تحديد السماح/الحظر/التعليق قبل تنفيذ الأداة."""
Decide allow/block/hold before tool execution. SAFE_TOOLS = {"read_file","search","query_db_readonly","get_status","list_leads",
تحديد السماح/الحظر/التعليق قبل تنفيذ الأداة. "get_deal","get_pipeline","view_analytics","check_consent","get_sequence_status"}
""" HOLD_TOOLS = {"send_message","send_whatsapp","send_email","send_sms","update_deal",
"assign_lead","create_proposal","change_stage","update_score","create_sequence",
SAFE_TOOLS: set[str] = { "schedule_meeting","update_territory"}
"read_file", "search", "query_db_readonly", "get_status", BLOCK_TOOLS = {"delete_tenant","drop_table","bulk_delete","export_all_data",
"list_leads", "get_deal", "get_pipeline", "view_analytics", "reset_database","delete_all_leads","purge_audit_log","disable_pdpl",
"check_consent", "get_sequence_status", "bypass_consent","modify_permissions_bulk"}
} MSG_TOOLS = {"send_message","send_whatsapp","send_email","send_sms"}
ROLE_PERMS: dict[str, set[str]] = {
HOLD_TOOLS: set[str] = { "owner": SAFE_TOOLS | HOLD_TOOLS, "admin": SAFE_TOOLS | HOLD_TOOLS,
"send_message", "send_whatsapp", "send_email", "send_sms", "manager": SAFE_TOOLS | {"update_deal","assign_lead","create_proposal","change_stage"},
"update_deal", "assign_lead", "create_proposal", "sales_rep": SAFE_TOOLS | {"send_message","send_whatsapp","send_email","update_deal"},
"change_stage", "update_score", "create_sequence", "viewer": SAFE_TOOLS}
"schedule_meeting", "update_territory", BUDGET_LIMIT = 100.0 # SAR per session
} COST_MAP = {"send_whatsapp": 0.15, "send_sms": 0.08, "send_email": 0.02,
"send_message": 0.10, "create_proposal": 0.50, "query_db_readonly": 0.001}
BLOCK_TOOLS: set[str] = {
"delete_tenant", "drop_table", "bulk_delete", "export_all_data",
"reset_database", "delete_all_leads", "purge_audit_log",
"disable_pdpl", "bypass_consent", "modify_permissions_bulk",
}
MESSAGING_TOOLS: set[str] = {
"send_message", "send_whatsapp", "send_email", "send_sms",
}
ROLE_PERMISSIONS: dict[str, set[str]] = {
"owner": SAFE_TOOLS | HOLD_TOOLS,
"admin": SAFE_TOOLS | HOLD_TOOLS,
"manager": SAFE_TOOLS | {"update_deal", "assign_lead", "create_proposal", "change_stage"},
"sales_rep": SAFE_TOOLS | {"send_message", "send_whatsapp", "send_email", "update_deal"},
"viewer": SAFE_TOOLS,
}
DEFAULT_BUDGET_LIMIT: float = 100.0 # SAR per session
def __init__(self): def __init__(self):
self._session_costs: dict[str, float] = defaultdict(float) self._costs: dict[str, float] = defaultdict(float)
def evaluate( def evaluate(self, tool_name: str, params: dict[str, Any], user_context: dict[str, Any]) -> PolicyDecision:
self, role = user_context.get("role", "viewer")
tool_name: str, sid = user_context.get("session_id", "unknown")
params: dict[str, Any], limit = user_context.get("budget_limit", self.BUDGET_LIMIT)
user_context: dict[str, Any],
) -> PolicyDecision:
"""
Check tool against policy classes, role, PDPL, and budget.
فحص الأداة مقابل فئات السياسة والدور والموافقة والميزانية.
"""
user_role = user_context.get("role", "viewer")
session_id = user_context.get("session_id", "unknown")
has_consent = user_context.get("has_consent", False)
budget_limit = user_context.get("budget_limit", self.DEFAULT_BUDGET_LIMIT)
# Class C — absolute block
if tool_name in self.BLOCK_TOOLS: if tool_name in self.BLOCK_TOOLS:
logger.warning( logger.warning("محظور: %s (المستخدم: %s)", tool_name, user_context.get("user_id", "?"))
"محظور: أداة %s محظورة بالكامل (المستخدم: %s)", return PolicyDecision(decision=PolicyDecisionType.BLOCK, tool_name=tool_name,
tool_name, user_context.get("user_id", "unknown"), reason=f"Tool '{tool_name}' is forbidden.", reason_ar=f"الأداة '{tool_name}' محظورة.")
) allowed = self.ROLE_PERMS.get(role, self.SAFE_TOOLS)
return PolicyDecision( if tool_name not in allowed and tool_name not in self.SAFE_TOOLS:
decision=PolicyDecisionType.BLOCK, return PolicyDecision(decision=PolicyDecisionType.BLOCK, tool_name=tool_name,
reason=f"Tool '{tool_name}' is in the BLOCK list. This action is forbidden.", reason=f"Role '{role}' lacks permission for '{tool_name}'.",
reason_ar=f"الأداة '{tool_name}' محظورة. هذا الإجراء ممنوع.", reason_ar=f"الدور '{role}' لا يملك صلاحية '{tool_name}'.")
tool_name=tool_name, if tool_name in self.MSG_TOOLS and not user_context.get("has_consent", False):
) return PolicyDecision(decision=PolicyDecisionType.BLOCK, tool_name=tool_name,
reason="PDPL consent required.", reason_ar="مطلوب موافقة حماية البيانات.",
# Role check pdpl_consent_required=True)
allowed_tools = self.ROLE_PERMISSIONS.get(user_role, self.SAFE_TOOLS) est = self.COST_MAP.get(tool_name, 0.01) * params.get("count", 1)
if tool_name not in allowed_tools and tool_name not in self.SAFE_TOOLS: if self._costs[sid] + est > limit:
return PolicyDecision( return PolicyDecision(decision=PolicyDecisionType.HOLD, tool_name=tool_name,
decision=PolicyDecisionType.BLOCK, reason=f"Budget exceeded ({self._costs[sid]:.2f}+{est:.2f} > {limit:.2f} SAR).",
reason=f"Role '{user_role}' lacks permission for tool '{tool_name}'.", reason_ar=f"تجاوز الميزانية.", budget_remaining=limit - self._costs[sid])
reason_ar=f"الدور '{user_role}' لا يملك صلاحية استخدام الأداة '{tool_name}'.",
tool_name=tool_name,
)
# PDPL consent check for messaging
if tool_name in self.MESSAGING_TOOLS and not has_consent:
return PolicyDecision(
decision=PolicyDecisionType.BLOCK,
reason="PDPL consent required before sending messages.",
reason_ar="مطلوب موافقة نظام حماية البيانات قبل إرسال الرسائل.",
tool_name=tool_name,
pdpl_consent_required=True,
)
# Budget check
estimated_cost = self._estimate_cost(tool_name, params)
current_spent = self._session_costs[session_id]
if current_spent + estimated_cost > budget_limit:
return PolicyDecision(
decision=PolicyDecisionType.HOLD,
reason=f"Budget limit would be exceeded. Spent: {current_spent:.2f}, "
f"estimated: {estimated_cost:.2f}, limit: {budget_limit:.2f} SAR.",
reason_ar=f"سيتم تجاوز حد الميزانية. المصروف: {current_spent:.2f}، "
f"التقدير: {estimated_cost:.2f}، الحد: {budget_limit:.2f} ريال.",
tool_name=tool_name,
budget_remaining=budget_limit - current_spent,
)
# Hold tools need approval
if tool_name in self.HOLD_TOOLS: if tool_name in self.HOLD_TOOLS:
approver = "manager" if user_role == "sales_rep" else "admin" approver = "manager" if role == "sales_rep" else "admin"
return PolicyDecision( return PolicyDecision(decision=PolicyDecisionType.HOLD, tool_name=tool_name,
decision=PolicyDecisionType.HOLD, reason=f"'{tool_name}' requires approval.", reason_ar=f"'{tool_name}' تتطلب موافقة.",
reason=f"Tool '{tool_name}' requires approval before execution.", requires_approval_from=approver)
reason_ar=f"الأداة '{tool_name}' تتطلب موافقة قبل التنفيذ.", return PolicyDecision(decision=PolicyDecisionType.ALLOW, tool_name=tool_name,
tool_name=tool_name, reason=f"'{tool_name}' is safe.", reason_ar=f"'{tool_name}' آمنة.")
requires_approval_from=approver,
)
# Safe tools — allow
return PolicyDecision(
decision=PolicyDecisionType.ALLOW,
reason=f"Tool '{tool_name}' is safe for execution.",
reason_ar=f"الأداة '{tool_name}' آمنة للتنفيذ.",
tool_name=tool_name,
)
def record_cost(self, session_id: str, cost: float) -> None: def record_cost(self, session_id: str, cost: float) -> None:
"""Record actual cost for budget tracking.""" self._costs[session_id] += cost
self._session_costs[session_id] += cost
def _estimate_cost(self, tool_name: str, params: dict) -> float:
"""Estimate cost in SAR for a tool call."""
cost_map = {
"send_whatsapp": 0.15,
"send_sms": 0.08,
"send_email": 0.02,
"send_message": 0.10,
"create_proposal": 0.50,
"query_db_readonly": 0.001,
"search": 0.001,
}
base = cost_map.get(tool_name, 0.01)
# Bulk operations cost more
if "count" in params or "bulk" in tool_name:
base *= params.get("count", 1)
return base
# ---------------------------------------------------------------------------
# Receipt Store — مخزن الإيصالات
# ---------------------------------------------------------------------------
class ReceiptStore: class ReceiptStore:
"""In-memory receipt storage with query capabilities.""" """مخزن الإيصالات في الذاكرة"""
def __init__(self, max_size: int = 50000): def __init__(self, max_size: int = 50000):
self._receipts: list[ToolReceipt] = [] self._receipts: list[ToolReceipt] = []; self._max = max_size
self._max_size = max_size
def store(self, receipt: ToolReceipt) -> str: def store(self, receipt: ToolReceipt) -> str:
"""Store a receipt and return its ID.""" receipt.compute_hash(); receipt.normalize_result()
receipt.compute_hash()
receipt.normalize_result()
self._receipts.append(receipt) self._receipts.append(receipt)
if len(self._receipts) > self._max_size: if len(self._receipts) > self._max: self._receipts = self._receipts[-self._max:]
self._receipts = self._receipts[-self._max_size:] logger.info("إيصال: %s أداة=%s حكم=%s", receipt.receipt_id, receipt.tool_name, receipt.verification_verdict.value)
logger.info(
"إيصال محفوظ: %s أداة=%s حكم=%s",
receipt.receipt_id, receipt.tool_name, receipt.verification_verdict.value,
)
return receipt.receipt_id return receipt.receipt_id
def get(self, receipt_id: str) -> Optional[ToolReceipt]: def get(self, receipt_id: str) -> Optional[ToolReceipt]:
for r in self._receipts: return next((r for r in self._receipts if r.receipt_id == receipt_id), None)
if r.receipt_id == receipt_id:
return r
return None
def query( def query(self, agent_id: str = None, tool_name: str = None,
self, verdict: VerificationVerdict = None, since: datetime = None, limit: int = 100) -> list[ToolReceipt]:
agent_id: str = None, r = self._receipts
tool_name: str = None, if agent_id: r = [x for x in r if x.agent_id == agent_id]
verdict: VerificationVerdict = None, if tool_name: r = [x for x in r if x.tool_name == tool_name]
since: datetime = None, if verdict: r = [x for x in r if x.verification_verdict == verdict]
limit: int = 100, if since: r = [x for x in r if x.timestamp >= since]
) -> list[ToolReceipt]: return r[-limit:]
results = self._receipts
if agent_id:
results = [r for r in results if r.agent_id == agent_id]
if tool_name:
results = [r for r in results if r.tool_name == tool_name]
if verdict:
results = [r for r in results if r.verification_verdict == verdict]
if since:
results = [r for r in results if r.timestamp >= since]
return results[-limit:]
# ---------------------------------------------------------------------------
# Trust Analytics — تحليلات الثقة
# ---------------------------------------------------------------------------
class TrustAnalytics: class TrustAnalytics:
""" """تتبع مقاييس الثقة عبر سير عمل الوكلاء"""
Track trust metrics across agent workflows. WEIGHTS = {VerificationVerdict.VERIFIED: 1.0, VerificationVerdict.PARTIALLY_VERIFIED: 0.6,
تتبع مقاييس الثقة عبر سير عمل الوكلاء. VerificationVerdict.UNVERIFIED: 0.3, VerificationVerdict.CONTRADICTED: 0.0,
""" VerificationVerdict.BLOCKED: 0.2}
def __init__(self, store: ReceiptStore): def __init__(self, store: ReceiptStore):
self._store = store self._store = store
def get_trust_score(self, agent_id: str) -> float: def get_trust_score(self, agent_id: str) -> float:
""" recs = self._store.query(agent_id=agent_id, limit=500)
Trust score 0-1 for an agent based on verification history. if not recs: return 0.5
درجة الثقة 0-1 للوكيل بناءً على سجل التحقق. return round(sum(self.WEIGHTS.get(r.verification_verdict, 0.3) for r in recs) / len(recs), 4)
"""
receipts = self._store.query(agent_id=agent_id, limit=500)
if not receipts:
return 0.5 # Neutral for unknown agents
weights = {
VerificationVerdict.VERIFIED: 1.0,
VerificationVerdict.PARTIALLY_VERIFIED: 0.6,
VerificationVerdict.UNVERIFIED: 0.3,
VerificationVerdict.CONTRADICTED: 0.0,
VerificationVerdict.BLOCKED: 0.2,
}
total_weight = sum(weights.get(r.verification_verdict, 0.3) for r in receipts)
return round(total_weight / len(receipts), 4)
def get_contradiction_rate(self, agent_id: str) -> float: def get_contradiction_rate(self, agent_id: str) -> float:
""" recs = self._store.query(agent_id=agent_id, limit=500)
Contradiction rate for an agent. if not recs: return 0.0
معدل التناقض للوكيل. return round(sum(1 for r in recs if r.verification_verdict == VerificationVerdict.CONTRADICTED) / len(recs), 4)
"""
receipts = self._store.query(agent_id=agent_id, limit=500)
if not receipts:
return 0.0
contradictions = sum(
1 for r in receipts
if r.verification_verdict == VerificationVerdict.CONTRADICTED
)
return round(contradictions / len(receipts), 4)
def get_cost_by_agent(self, period_days: int = 30) -> dict[str, float]: def get_cost_by_agent(self, period_days: int = 30) -> dict[str, float]:
"""
Total cost per agent in period.
إجمالي التكلفة لكل وكيل خلال الفترة.
"""
since = datetime.now(timezone.utc) - timedelta(days=period_days) since = datetime.now(timezone.utc) - timedelta(days=period_days)
receipts = self._store.query(since=since, limit=50000)
costs: dict[str, float] = defaultdict(float) costs: dict[str, float] = defaultdict(float)
for r in receipts: for r in self._store.query(since=since, limit=50000): costs[r.agent_id] += r.cost_estimate
costs[r.agent_id] += r.cost_estimate
return {k: round(v, 4) for k, v in costs.items()} return {k: round(v, 4) for k, v in costs.items()}
def get_blocked_attempts(self, period_days: int = 30) -> list[ToolReceipt]: def get_blocked_attempts(self, period_days: int = 30) -> list[ToolReceipt]:
""" return self._store.query(verdict=VerificationVerdict.BLOCKED,
All blocked tool attempts in period. since=datetime.now(timezone.utc) - timedelta(days=period_days), limit=1000)
جميع محاولات الأدوات المحظورة خلال الفترة.
"""
since = datetime.now(timezone.utc) - timedelta(days=period_days)
return self._store.query(
verdict=VerificationVerdict.BLOCKED, since=since, limit=1000
)
def get_hallucination_suspects(self) -> list[ToolReceipt]: def get_hallucination_suspects(self) -> list[ToolReceipt]:
""" return [r for r in self._store.query(limit=5000)
Claims without matching receipts possible hallucinations. if r.verification_verdict == VerificationVerdict.CONTRADICTED
ادعاءات بدون إيصالات مطابقة هلوسات محتملة. or (r.verification_verdict == VerificationVerdict.UNVERIFIED
""" and not r.execution_result and r.tool_name not in PreExecutionPolicy.SAFE_TOOLS)]
all_receipts = self._store.query(limit=5000)
suspects = []
for r in all_receipts:
if r.verification_verdict == VerificationVerdict.CONTRADICTED:
suspects.append(r)
elif (
r.verification_verdict == VerificationVerdict.UNVERIFIED
and r.execution_result == ""
and r.tool_name not in PreExecutionPolicy.SAFE_TOOLS
):
suspects.append(r)
return suspects
def get_summary(self, agent_id: str = None) -> dict[str, Any]: def get_summary(self, agent_id: str = None) -> dict[str, Any]:
""" recs = self._store.query(agent_id=agent_id, limit=10000)
Overall trust summary. if not recs: return {"total": 0, "trust_score": 0.5, "message_ar": "لا توجد بيانات"}
ملخص الثقة العام. by_v: dict[str, int] = defaultdict(int)
""" cost = 0.0
receipts = self._store.query(agent_id=agent_id, limit=10000) for r in recs: by_v[r.verification_verdict.value] += 1; cost += r.cost_estimate
total = len(receipts) ts = self.get_trust_score(agent_id) if agent_id else 0.5
if total == 0: return {"total": len(recs), "by_verdict": dict(by_v), "trust_score": ts,
return {"total": 0, "trust_score": 0.5, "message_ar": "لا توجد بيانات"} "total_cost_sar": round(cost, 2),
"contradiction_rate": round(by_v.get("contradicted", 0) / len(recs) * 100, 2),
"message_ar": f"عمليات: {len(recs)}، ثقة: {ts:.2f}"}
by_verdict: dict[str, int] = defaultdict(int)
total_cost = 0.0
for r in receipts:
by_verdict[r.verification_verdict.value] += 1
total_cost += r.cost_estimate
trust = self.get_trust_score(agent_id) if agent_id else 0.5
return {
"total": total,
"by_verdict": dict(by_verdict),
"trust_score": trust,
"total_cost_sar": round(total_cost, 2),
"contradiction_rate": round(
by_verdict.get("contradicted", 0) / total * 100, 2
),
"message_ar": f"إجمالي العمليات: {total}، درجة الثقة: {trust:.2f}",
}
# ---------------------------------------------------------------------------
# Global singletons
# ---------------------------------------------------------------------------
pre_execution_policy = PreExecutionPolicy() pre_execution_policy = PreExecutionPolicy()
receipt_store = ReceiptStore() receipt_store = ReceiptStore()