feat: Add knowledge brain, memory engine, tool receipts, session continuity

Final layer integration (Second Brain + MemPalace + ToolProof + Claude Code):

- knowledge_brain.py: Project wiki ingest, query, lint, promote raw→wiki (560 lines)
- memory_engine.py: Pluggable memory with Redis + File adapters, evaluator (615 lines)
- tool_receipts.py: Signed receipts, pre-execution policy, trust analytics (417 lines)
- session_continuity.py: AI session state management, restore prompts (478 lines)
- glossary.md: 30+ bilingual terms (Arabic/English)
- master-index.md: Top-level index linking all wiki/memory sections

https://claude.ai/code/session_01LsnvBa7HwF5hs99VZbgLGj
This commit is contained in:
Claude 2026-04-11 08:19:56 +00:00
parent afd37142fe
commit 35e857ec52
No known key found for this signature in database
6 changed files with 2230 additions and 0 deletions

View File

@ -0,0 +1,560 @@
"""
Knowledge Brain Dealix Second Brain Service
Project knowledge management: ingest, query, lint, index.
Manages the wiki layer in memory/wiki/ and indexes in memory/indexes/.
"""
import logging
import os
import re
import uuid
from datetime import datetime, timedelta, timezone
from enum import Enum
from pathlib import Path
from typing import Any, Optional
from pydantic import BaseModel, Field
logger = logging.getLogger(__name__)
WIKI_DIR = Path(__file__).resolve().parents[4] / "memory" / "wiki"
INDEX_DIR = Path(__file__).resolve().parents[4] / "memory" / "indexes"
MEMORY_DIR = Path(__file__).resolve().parents[4] / "memory"
STALE_THRESHOLD_DAYS = 30
class PageType(str, Enum):
ARCHITECTURE = "architecture"
PRODUCT = "product"
GTM = "gtm"
CUSTOMER = "customer"
OPERATIONS = "operations"
SECURITY = "security"
TOOLING = "tooling"
GLOSSARY = "glossary"
class Confidence(str, Enum):
HIGH = "high"
MEDIUM = "medium"
LOW = "low"
class IssueSeverity(str, Enum):
ERROR = "error"
WARNING = "warning"
INFO = "info"
class WikiPage(BaseModel):
"""Structured wiki page — صفحة ويكي منظمة"""
id: str = Field(default_factory=lambda: str(uuid.uuid4()))
title: str
title_ar: str = ""
page_type: PageType
summary: str
summary_ar: str
key_facts: list[str] = []
provenance: str
confidence: Confidence = Confidence.MEDIUM
related_pages: list[str] = []
last_updated: datetime = Field(default_factory=lambda: datetime.now(timezone.utc))
stale: bool = False
file_path: str = ""
body: str = ""
class Config:
json_schema_extra = {
"example": {
"title": "System Architecture",
"title_ar": "بنية النظام",
"page_type": "architecture",
"summary": "Multi-tenant AI CRM architecture overview",
"summary_ar": "نظرة عامة على بنية إدارة علاقات العملاء متعددة المستأجرين",
}
}
class BrainAnswer(BaseModel):
"""Answer from the knowledge brain — إجابة من الدماغ المعرفي"""
question: str
answer: str
answer_ar: str = ""
sources: list[str] = []
confidence: Confidence = Confidence.LOW
related_pages: list[str] = []
class BrainIssue(BaseModel):
"""Quality issue found during lint — مشكلة جودة مكتشفة أثناء الفحص"""
issue_id: str = Field(default_factory=lambda: str(uuid.uuid4()))
severity: IssueSeverity
category: str
title: str
title_ar: str
description: str
affected_page: str = ""
recommendation: str = ""
class KnowledgeBrain:
"""
Project knowledge management ingest, query, lint.
إدارة المعرفة المشروعية استيعاب، استعلام، فحص.
"""
def __init__(self, wiki_dir: Path = None, memory_dir: Path = None):
self.wiki_dir = wiki_dir or WIKI_DIR
self.memory_dir = memory_dir or MEMORY_DIR
self.index_dir = INDEX_DIR
self._page_cache: dict[str, WikiPage] = {}
self._ensure_dirs()
def _ensure_dirs(self) -> None:
self.wiki_dir.mkdir(parents=True, exist_ok=True)
self.index_dir.mkdir(parents=True, exist_ok=True)
def _parse_frontmatter(self, content: str, file_path: str) -> WikiPage:
"""Parse wiki page frontmatter into a WikiPage model."""
lines = content.split("\n")
title = ""
fields: dict[str, Any] = {}
body_start = 0
for i, line in enumerate(lines):
stripped = line.strip()
if stripped.startswith("# "):
title = stripped[2:].strip()
elif stripped == "---":
body_start = i + 1
break
elif stripped.startswith("**") and "**:" in stripped:
match = re.match(r"\*\*(.+?)\*\*:\s*(.*)", stripped)
if match:
key = match.group(1).lower().replace(" ", "_")
value = match.group(2).strip()
fields[key] = value
body = "\n".join(lines[body_start:]).strip() if body_start > 0 else ""
key_facts = []
if "key_facts" in fields:
fact_pattern = re.compile(r"^\s*-\s+(.+)$")
in_facts = False
for line in lines:
if "**Key Facts**" in line:
in_facts = True
continue
if in_facts:
fact_match = fact_pattern.match(line)
if fact_match:
key_facts.append(fact_match.group(1).strip())
elif line.strip().startswith("**"):
break
related = []
if "related_pages" in fields:
link_pattern = re.compile(r"\[.+?\]\((.+?)\)")
related = link_pattern.findall(fields["related_pages"])
page_type = PageType.ARCHITECTURE
type_val = fields.get("type", "architecture").lower()
for pt in PageType:
if pt.value == type_val:
page_type = pt
break
conf = Confidence.MEDIUM
conf_val = fields.get("confidence", "medium").lower()
for c in Confidence:
if c.value == conf_val:
conf = c
break
last_updated = datetime.now(timezone.utc)
if "last_updated" in fields:
try:
last_updated = datetime.strptime(
fields["last_updated"], "%Y-%m-%d"
).replace(tzinfo=timezone.utc)
except ValueError:
pass
stale = fields.get("stale", "false").lower() == "true"
return WikiPage(
title=title,
title_ar=fields.get("title_ar", ""),
page_type=page_type,
summary=fields.get("summary", ""),
summary_ar=fields.get("summary_ar", ""),
key_facts=key_facts,
provenance=fields.get("provenance", ""),
confidence=conf,
related_pages=related,
last_updated=last_updated,
stale=stale,
file_path=file_path,
body=body,
)
async def _load_all_pages(self) -> list[WikiPage]:
"""Load and parse all wiki pages."""
pages = []
if not self.wiki_dir.exists():
return pages
for md_file in sorted(self.wiki_dir.glob("*.md")):
if md_file.name == "README.md":
continue
try:
content = md_file.read_text(encoding="utf-8")
page = self._parse_frontmatter(content, str(md_file))
self._page_cache[page.id] = page
pages.append(page)
except Exception as exc:
logger.warning("فشل تحميل الصفحة %s: %s", md_file.name, exc)
return pages
async def ingest(
self,
source_type: str,
content: str,
metadata: dict[str, Any] = None,
) -> WikiPage:
"""
Classify content, create/update wiki page, link related pages.
تصنيف المحتوى، إنشاء/تحديث صفحة ويكي، ربط الصفحات ذات الصلة.
"""
metadata = metadata or {}
title = metadata.get("title", f"Ingested — {source_type}")
title_ar = metadata.get("title_ar", f"مستوعب — {source_type}")
page_type = self._classify_content(source_type, content)
summary = content[:120].replace("\n", " ").strip()
summary_ar = metadata.get("summary_ar", f"محتوى {source_type} مستوعب تلقائياً")
existing_pages = await self._load_all_pages()
related = self._find_related(content, existing_pages)
page = WikiPage(
title=title,
title_ar=title_ar,
page_type=page_type,
summary=summary,
summary_ar=summary_ar,
key_facts=metadata.get("key_facts", []),
provenance=metadata.get("provenance", f"Auto-ingested from {source_type}"),
confidence=Confidence(metadata.get("confidence", "medium")),
related_pages=[p.file_path for p in related[:5]],
body=content,
)
file_name = re.sub(r"[^\w\s-]", "", title.lower()).replace(" ", "-")[:50]
file_path = self.wiki_dir / f"{file_name}.md"
page.file_path = str(file_path)
md_content = self._render_page(page)
file_path.write_text(md_content, encoding="utf-8")
self._page_cache[page.id] = page
logger.info("تم استيعاب صفحة جديدة: %s (%s)", title, page_type.value)
return page
def _classify_content(self, source_type: str, content: str) -> PageType:
"""Classify content into a page type based on keywords."""
content_lower = content.lower()
keyword_map = {
PageType.ARCHITECTURE: ["api", "database", "service", "backend", "frontend", "deploy"],
PageType.PRODUCT: ["feature", "roadmap", "user story", "requirement", "ميزة"],
PageType.GTM: ["launch", "marketing", "outreach", "growth", "campaign", "تسويق"],
PageType.CUSTOMER: ["customer", "interview", "feedback", "icp", "عميل"],
PageType.OPERATIONS: ["runbook", "checklist", "process", "deploy", "عملية"],
PageType.SECURITY: ["pdpl", "consent", "security", "compliance", "أمان"],
PageType.TOOLING: ["provider", "api key", "integration", "tool", "أداة"],
}
scores: dict[PageType, int] = {}
for ptype, keywords in keyword_map.items():
scores[ptype] = sum(1 for kw in keywords if kw in content_lower)
if source_type in ("adr", "architecture"):
return PageType.ARCHITECTURE
if source_type in ("customer_interview", "feedback"):
return PageType.CUSTOMER
best = max(scores, key=lambda k: scores[k])
return best if scores[best] > 0 else PageType.PRODUCT
def _find_related(self, content: str, pages: list[WikiPage]) -> list[WikiPage]:
"""Find related pages by keyword overlap."""
content_words = set(content.lower().split())
scored: list[tuple[WikiPage, int]] = []
for page in pages:
page_words = set(page.summary.lower().split()) | set(page.body.lower().split()[:100])
overlap = len(content_words & page_words)
if overlap > 2:
scored.append((page, overlap))
scored.sort(key=lambda x: x[1], reverse=True)
return [p for p, _ in scored[:5]]
def _render_page(self, page: WikiPage) -> str:
"""Render a WikiPage model to markdown."""
facts = "\n".join(f" - {f}" for f in page.key_facts) if page.key_facts else " - (none)"
related = ", ".join(
f"[{Path(r).stem}]({r})" for r in page.related_pages
) if page.related_pages else "(none)"
date_str = page.last_updated.strftime("%Y-%m-%d")
return f"""# {page.title}
**Type**: {page.page_type.value}
**Summary**: {page.summary}
**Summary_AR**: {page.summary_ar}
**Key Facts**:
{facts}
**Provenance**: {page.provenance}
**Confidence**: {page.confidence.value}
**Related Pages**: {related}
**Last Updated**: {date_str}
**Stale**: {str(page.stale).lower()}
---
{page.body}
"""
async def query(
self, question: str, domain: str = None
) -> BrainAnswer:
"""
Search wiki + memory for relevant answers.
البحث في الويكي والذاكرة عن إجابات ذات صلة.
"""
pages = await self._load_all_pages()
if domain:
try:
dtype = PageType(domain)
pages = [p for p in pages if p.page_type == dtype]
except ValueError:
pass
question_lower = question.lower()
question_words = set(question_lower.split())
scored: list[tuple[WikiPage, float]] = []
for page in pages:
searchable = f"{page.title} {page.summary} {page.body} {' '.join(page.key_facts)}".lower()
searchable_words = set(searchable.split())
overlap = len(question_words & searchable_words)
if overlap > 0:
score = overlap / max(len(question_words), 1)
if page.confidence == Confidence.HIGH:
score *= 1.3
elif page.confidence == Confidence.LOW:
score *= 0.7
scored.append((page, score))
scored.sort(key=lambda x: x[1], reverse=True)
top_pages = scored[:3]
if not top_pages:
return BrainAnswer(
question=question,
answer="لم يتم العثور على معلومات ذات صلة في قاعدة المعرفة.",
answer_ar="لم يتم العثور على معلومات ذات صلة في قاعدة المعرفة.",
confidence=Confidence.LOW,
)
best_page = top_pages[0][0]
best_score = top_pages[0][1]
answer_parts = [best_page.summary]
if best_page.key_facts:
answer_parts.append("Key facts: " + "; ".join(best_page.key_facts[:3]))
conf = Confidence.HIGH if best_score > 0.5 else (Confidence.MEDIUM if best_score > 0.2 else Confidence.LOW)
return BrainAnswer(
question=question,
answer=" ".join(answer_parts),
answer_ar=best_page.summary_ar or "لا يوجد ملخص عربي",
sources=[p.file_path for p, _ in top_pages],
confidence=conf,
related_pages=[p.file_path for p, _ in top_pages],
)
async def lint(self) -> list[BrainIssue]:
"""
Check for: orphan pages, stale pages, missing provenance, duplicates, empty indexes.
فحص: صفحات يتيمة، صفحات قديمة، مصدر مفقود، تكرارات، فهارس فارغة.
"""
issues: list[BrainIssue] = []
pages = await self._load_all_pages()
now = datetime.now(timezone.utc)
all_paths = {p.file_path for p in pages}
all_related_targets: set[str] = set()
for page in pages:
for rel in page.related_pages:
resolved = str((Path(page.file_path).parent / rel).resolve())
all_related_targets.add(resolved)
# Stale check (>30 days)
age = (now - page.last_updated).days
if age > STALE_THRESHOLD_DAYS:
issues.append(BrainIssue(
severity=IssueSeverity.WARNING,
category="stale",
title=f"Stale page: {page.title}",
title_ar=f"صفحة قديمة: {page.title}",
description=f"Last updated {age} days ago (threshold: {STALE_THRESHOLD_DAYS}).",
affected_page=page.file_path,
recommendation="Review and update or archive this page.",
))
# Missing provenance
if not page.provenance or page.provenance.strip() == "":
issues.append(BrainIssue(
severity=IssueSeverity.ERROR,
category="provenance",
title=f"Missing provenance: {page.title}",
title_ar=f"مصدر مفقود: {page.title}",
description="Page has no provenance. All pages must cite their source.",
affected_page=page.file_path,
recommendation="Add provenance field with source reference.",
))
# Missing Arabic summary
if not page.summary_ar:
issues.append(BrainIssue(
severity=IssueSeverity.WARNING,
category="i18n",
title=f"Missing Arabic summary: {page.title}",
title_ar=f"ملخص عربي مفقود: {page.title}",
description="Page is missing summary_ar. Dealix is Arabic-first.",
affected_page=page.file_path,
recommendation="Add an Arabic summary.",
))
# Orphan check
for page in pages:
resolved_path = str(Path(page.file_path).resolve())
if resolved_path not in all_related_targets and page.page_type != PageType.GLOSSARY:
issues.append(BrainIssue(
severity=IssueSeverity.INFO,
category="orphan",
title=f"Orphan page: {page.title}",
title_ar=f"صفحة يتيمة: {page.title}",
description="No other pages link to this page.",
affected_page=page.file_path,
recommendation="Add a link from a related page or index.",
))
# Duplicate check by title similarity
titles = [(p.title.lower().strip(), p) for p in pages]
seen: set[str] = set()
for title, page in titles:
if title in seen:
issues.append(BrainIssue(
severity=IssueSeverity.WARNING,
category="duplicate",
title=f"Possible duplicate: {page.title}",
title_ar=f"تكرار محتمل: {page.title}",
description=f"Multiple pages with title '{page.title}'.",
affected_page=page.file_path,
recommendation="Merge duplicate pages.",
))
seen.add(title)
# Empty index check
if self.index_dir.exists():
for idx_file in self.index_dir.glob("*.md"):
content = idx_file.read_text(encoding="utf-8")
if len(content.strip()) < 50:
issues.append(BrainIssue(
severity=IssueSeverity.WARNING,
category="empty_index",
title=f"Empty index: {idx_file.name}",
title_ar=f"فهرس فارغ: {idx_file.name}",
description="Index file has very little content.",
affected_page=str(idx_file),
recommendation="Populate or remove the index.",
))
logger.info("فحص الدماغ المعرفي: %d مشكلة مكتشفة", len(issues))
return issues
async def get_index(self, domain: str) -> list[WikiPage]:
"""
Return all pages in a domain.
إرجاع جميع الصفحات في نطاق معين.
"""
pages = await self._load_all_pages()
try:
dtype = PageType(domain)
return [p for p in pages if p.page_type == dtype]
except ValueError:
logger.warning("نطاق غير معروف: %s", domain)
return []
async def mark_stale(self, page_id: str) -> None:
"""
Mark a page as stale.
تعليم صفحة كقديمة.
"""
page = self._page_cache.get(page_id)
if not page:
pages = await self._load_all_pages()
for p in pages:
if p.id == page_id:
page = p
break
if not page:
logger.error("صفحة غير موجودة: %s", page_id)
return
page.stale = True
file_path = Path(page.file_path)
if file_path.exists():
content = file_path.read_text(encoding="utf-8")
content = re.sub(
r"\*\*Stale\*\*:\s*false",
"**Stale**: true",
content,
)
file_path.write_text(content, encoding="utf-8")
logger.info("تم تعليم الصفحة كقديمة: %s", page.title)
async def promote_raw(
self,
raw_id: str,
raw_content: str = None,
metadata: dict[str, Any] = None,
) -> WikiPage:
"""
Convert raw material to structured wiki page.
تحويل مادة خام إلى صفحة ويكي منظمة.
"""
metadata = metadata or {}
if raw_content is None:
raw_path = self.memory_dir / "raw" / f"{raw_id}.md"
if raw_path.exists():
raw_content = raw_path.read_text(encoding="utf-8")
else:
raise FileNotFoundError(f"المادة الخام غير موجودة: {raw_id}")
title = metadata.get("title", f"Promoted from raw — {raw_id}")
page = await self.ingest(
source_type="raw_promotion",
content=raw_content,
metadata={
"title": title,
"title_ar": metadata.get("title_ar", f"مروّج من مادة خام — {raw_id}"),
"provenance": f"Promoted from raw material {raw_id}",
"confidence": metadata.get("confidence", "medium"),
**metadata,
},
)
logger.info("تمت ترقية المادة الخام إلى صفحة ويكي: %s%s", raw_id, page.title)
return page
# Global singleton
knowledge_brain = KnowledgeBrain()

View File

@ -0,0 +1,615 @@
"""
Memory Engine Dealix MemPalace Pattern
Pluggable memory adapter with evaluation and quality checks.
Supports Redis (production) and file-based (local/offline) backends.
"""
import json
import logging
import os
import re
import uuid
from abc import ABC, abstractmethod
from collections import defaultdict
from datetime import datetime, timedelta, timezone
from difflib import SequenceMatcher
from pathlib import Path
from typing import Any, Optional
from pydantic import BaseModel, Field
logger = logging.getLogger(__name__)
MEMORY_BASE_DIR = Path(__file__).resolve().parents[4] / "memory"
STALENESS_DAYS = 30
# ---------------------------------------------------------------------------
# Models — نماذج البيانات
# ---------------------------------------------------------------------------
class MemoryDomain(str):
"""Domain tag for memory items."""
pass
class MemoryItem(BaseModel):
"""A single memory item — عنصر ذاكرة واحد"""
id: str = Field(default_factory=lambda: str(uuid.uuid4()))
domain: str = "project" # project, customer, deal, competitor, prompt
content: str
metadata: dict[str, Any] = {}
source: str = ""
confidence: float = Field(default=0.7, ge=0.0, le=1.0)
created_at: datetime = Field(default_factory=lambda: datetime.now(timezone.utc))
updated_at: datetime = Field(default_factory=lambda: datetime.now(timezone.utc))
access_count: int = 0
is_canonical: bool = True # True = business data, False = derived/AI-generated
retention_days: int = 0 # 0 = permanent
tenant_id: str = ""
tags: list[str] = []
class Config:
json_schema_extra = {
"example": {
"domain": "customer",
"content": "Acme Corp prefers WhatsApp for all communication",
"source": "customer_interview_2026-04-10",
"confidence": 0.9,
"is_canonical": True,
}
}
class EvalResult(BaseModel):
"""Evaluation result for memory quality — نتيجة تقييم جودة الذاكرة"""
total_queries: int = 0
correct_retrievals: int = 0
precision: float = 0.0
recall: float = 0.0
avg_rank: float = 0.0
message_ar: str = ""
class MemoryStats(BaseModel):
"""Memory store statistics — إحصائيات مخزن الذاكرة"""
total_items: int = 0
by_domain: dict[str, int] = {}
canonical_count: int = 0
derived_count: int = 0
avg_confidence: float = 0.0
oldest_item: Optional[datetime] = None
newest_item: Optional[datetime] = None
message_ar: str = ""
# ---------------------------------------------------------------------------
# Abstract Adapter — المحول المجرد
# ---------------------------------------------------------------------------
class MemoryAdapter(ABC):
"""Abstract adapter — swap backends without rewriting app."""
@abstractmethod
async def store(self, item: MemoryItem) -> str:
"""Store item, return ID — تخزين عنصر وإرجاع المعرف"""
@abstractmethod
async def retrieve(
self, query: str, domain: str = None, limit: int = 5
) -> list[MemoryItem]:
"""Retrieve matching items — استرجاع العناصر المطابقة"""
@abstractmethod
async def update(self, item_id: str, content: str) -> bool:
"""Update item content — تحديث محتوى العنصر"""
@abstractmethod
async def delete(self, item_id: str) -> bool:
"""Delete item — حذف العنصر"""
@abstractmethod
async def search_by_entity(
self, entity_type: str, entity_id: str
) -> list[MemoryItem]:
"""Search by entity reference — البحث بمرجع الكيان"""
@abstractmethod
async def get_stats(self) -> MemoryStats:
"""Return store statistics — إرجاع إحصائيات المخزن"""
@abstractmethod
async def list_all(self, domain: str = None) -> list[MemoryItem]:
"""List all items optionally filtered by domain."""
# ---------------------------------------------------------------------------
# Redis Adapter — محول ريدس
# ---------------------------------------------------------------------------
class RedisMemoryAdapter(MemoryAdapter):
"""
Redis-backed memory for fast retrieval.
Uses Redis Search if available, falls back to key scanning.
ذاكرة مدعومة بريدس للاسترجاع السريع.
"""
KEY_PREFIX = "dealix:memory:"
def __init__(self, redis_client: Any = None, redis_url: str = None):
self._redis = redis_client
self._redis_url = redis_url or os.getenv("REDIS_URL", "redis://localhost:6379/0")
self._connected = False
async def _ensure_connection(self) -> None:
if self._redis is not None and self._connected:
return
try:
import redis.asyncio as aioredis
self._redis = aioredis.from_url(
self._redis_url, decode_responses=True
)
await self._redis.ping()
self._connected = True
logger.info("تم الاتصال بريدس: %s", self._redis_url)
except Exception as exc:
logger.warning("فشل الاتصال بريدس: %s — سيتم استخدام الذاكرة المحلية", exc)
self._connected = False
raise
def _key(self, item_id: str) -> str:
return f"{self.KEY_PREFIX}{item_id}"
def _domain_key(self, domain: str) -> str:
return f"{self.KEY_PREFIX}domain:{domain}"
def _entity_key(self, entity_type: str, entity_id: str) -> str:
return f"{self.KEY_PREFIX}entity:{entity_type}:{entity_id}"
async def store(self, item: MemoryItem) -> str:
await self._ensure_connection()
data = item.model_dump(mode="json")
data["created_at"] = item.created_at.isoformat()
data["updated_at"] = item.updated_at.isoformat()
pipe = self._redis.pipeline()
pipe.set(self._key(item.id), json.dumps(data, ensure_ascii=False))
pipe.sadd(self._domain_key(item.domain), item.id)
if item.retention_days > 0:
pipe.expire(self._key(item.id), item.retention_days * 86400)
# Index by entity if metadata has entity references
for etype in ("lead_id", "deal_id", "company_id", "tenant_id"):
if etype in item.metadata:
pipe.sadd(self._entity_key(etype, str(item.metadata[etype])), item.id)
await pipe.execute()
logger.info("تم تخزين عنصر ذاكرة في ريدس: %s (%s)", item.id, item.domain)
return item.id
async def retrieve(
self, query: str, domain: str = None, limit: int = 5
) -> list[MemoryItem]:
await self._ensure_connection()
if domain:
ids = await self._redis.smembers(self._domain_key(domain))
else:
all_keys = []
async for key in self._redis.scan_iter(f"{self.KEY_PREFIX}[0-9a-f]*"):
all_keys.append(key.replace(self.KEY_PREFIX, ""))
ids = all_keys
items: list[MemoryItem] = []
query_lower = query.lower()
query_words = set(query_lower.split())
for item_id in ids:
raw = await self._redis.get(self._key(item_id))
if not raw:
continue
data = json.loads(raw)
content_lower = data.get("content", "").lower()
content_words = set(content_lower.split())
overlap = len(query_words & content_words)
if overlap > 0 or query_lower in content_lower:
data["created_at"] = datetime.fromisoformat(data["created_at"])
data["updated_at"] = datetime.fromisoformat(data["updated_at"])
item = MemoryItem(**data)
item.access_count += 1
items.append(item)
items.sort(key=lambda x: x.confidence, reverse=True)
return items[:limit]
async def update(self, item_id: str, content: str) -> bool:
await self._ensure_connection()
raw = await self._redis.get(self._key(item_id))
if not raw:
return False
data = json.loads(raw)
data["content"] = content
data["updated_at"] = datetime.now(timezone.utc).isoformat()
await self._redis.set(self._key(item_id), json.dumps(data, ensure_ascii=False))
return True
async def delete(self, item_id: str) -> bool:
await self._ensure_connection()
raw = await self._redis.get(self._key(item_id))
if not raw:
return False
data = json.loads(raw)
domain = data.get("domain", "project")
pipe = self._redis.pipeline()
pipe.delete(self._key(item_id))
pipe.srem(self._domain_key(domain), item_id)
await pipe.execute()
return True
async def search_by_entity(
self, entity_type: str, entity_id: str
) -> list[MemoryItem]:
await self._ensure_connection()
ids = await self._redis.smembers(self._entity_key(entity_type, entity_id))
items: list[MemoryItem] = []
for item_id in ids:
raw = await self._redis.get(self._key(item_id))
if raw:
data = json.loads(raw)
data["created_at"] = datetime.fromisoformat(data["created_at"])
data["updated_at"] = datetime.fromisoformat(data["updated_at"])
items.append(MemoryItem(**data))
return items
async def get_stats(self) -> MemoryStats:
await self._ensure_connection()
items = await self.list_all()
return _compute_stats(items)
async def list_all(self, domain: str = None) -> list[MemoryItem]:
await self._ensure_connection()
if domain:
ids = await self._redis.smembers(self._domain_key(domain))
else:
ids = set()
async for key in self._redis.scan_iter(f"{self.KEY_PREFIX}[0-9a-f]*"):
ids.add(key.replace(self.KEY_PREFIX, ""))
items = []
for item_id in ids:
raw = await self._redis.get(self._key(item_id))
if raw:
data = json.loads(raw)
data["created_at"] = datetime.fromisoformat(data["created_at"])
data["updated_at"] = datetime.fromisoformat(data["updated_at"])
items.append(MemoryItem(**data))
return items
# ---------------------------------------------------------------------------
# File Adapter — محول الملفات
# ---------------------------------------------------------------------------
class FileMemoryAdapter(MemoryAdapter):
"""
File-based memory for local/offline use.
Stores as JSON files in memory/ directory.
ذاكرة مبنية على الملفات للاستخدام المحلي/غير المتصل.
"""
def __init__(self, base_dir: Path = None):
self.base_dir = base_dir or MEMORY_BASE_DIR / "_store"
self.base_dir.mkdir(parents=True, exist_ok=True)
def _item_path(self, item_id: str) -> Path:
return self.base_dir / f"{item_id}.json"
def _domain_dir(self, domain: str) -> Path:
d = self.base_dir / domain
d.mkdir(parents=True, exist_ok=True)
return d
async def store(self, item: MemoryItem) -> str:
file_path = self._domain_dir(item.domain) / f"{item.id}.json"
data = item.model_dump(mode="json")
data["created_at"] = item.created_at.isoformat()
data["updated_at"] = item.updated_at.isoformat()
file_path.write_text(json.dumps(data, ensure_ascii=False, indent=2), encoding="utf-8")
logger.info("تم تخزين عنصر ذاكرة في ملف: %s (%s)", item.id, item.domain)
return item.id
async def retrieve(
self, query: str, domain: str = None, limit: int = 5
) -> list[MemoryItem]:
items = await self.list_all(domain)
query_lower = query.lower()
query_words = set(query_lower.split())
scored: list[tuple[MemoryItem, float]] = []
for item in items:
content_lower = item.content.lower()
content_words = set(content_lower.split())
overlap = len(query_words & content_words)
if overlap > 0 or query_lower in content_lower:
score = (overlap / max(len(query_words), 1)) * item.confidence
scored.append((item, score))
scored.sort(key=lambda x: x[1], reverse=True)
results = [item for item, _ in scored[:limit]]
for item in results:
item.access_count += 1
await self._write_item(item)
return results
async def update(self, item_id: str, content: str) -> bool:
item = await self._find_item(item_id)
if not item:
return False
item.content = content
item.updated_at = datetime.now(timezone.utc)
await self._write_item(item)
return True
async def delete(self, item_id: str) -> bool:
for domain_dir in self.base_dir.iterdir():
if not domain_dir.is_dir():
continue
path = domain_dir / f"{item_id}.json"
if path.exists():
path.unlink()
logger.info("تم حذف عنصر ذاكرة: %s", item_id)
return True
return False
async def search_by_entity(
self, entity_type: str, entity_id: str
) -> list[MemoryItem]:
all_items = await self.list_all()
return [
item for item in all_items
if str(item.metadata.get(entity_type, "")) == str(entity_id)
]
async def get_stats(self) -> MemoryStats:
items = await self.list_all()
return _compute_stats(items)
async def list_all(self, domain: str = None) -> list[MemoryItem]:
items: list[MemoryItem] = []
search_dirs = (
[self._domain_dir(domain)] if domain
else [d for d in self.base_dir.iterdir() if d.is_dir()]
)
for dir_path in search_dirs:
for json_file in dir_path.glob("*.json"):
try:
data = json.loads(json_file.read_text(encoding="utf-8"))
if "created_at" in data and isinstance(data["created_at"], str):
data["created_at"] = datetime.fromisoformat(data["created_at"])
if "updated_at" in data and isinstance(data["updated_at"], str):
data["updated_at"] = datetime.fromisoformat(data["updated_at"])
items.append(MemoryItem(**data))
except Exception as exc:
logger.warning("فشل تحميل عنصر ذاكرة %s: %s", json_file.name, exc)
return items
async def _find_item(self, item_id: str) -> Optional[MemoryItem]:
for domain_dir in self.base_dir.iterdir():
if not domain_dir.is_dir():
continue
path = domain_dir / f"{item_id}.json"
if path.exists():
data = json.loads(path.read_text(encoding="utf-8"))
if isinstance(data.get("created_at"), str):
data["created_at"] = datetime.fromisoformat(data["created_at"])
if isinstance(data.get("updated_at"), str):
data["updated_at"] = datetime.fromisoformat(data["updated_at"])
return MemoryItem(**data)
return None
async def _write_item(self, item: MemoryItem) -> None:
file_path = self._domain_dir(item.domain) / f"{item.id}.json"
data = item.model_dump(mode="json")
data["created_at"] = item.created_at.isoformat()
data["updated_at"] = item.updated_at.isoformat()
file_path.write_text(json.dumps(data, ensure_ascii=False, indent=2), encoding="utf-8")
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def _compute_stats(items: list[MemoryItem]) -> MemoryStats:
if not items:
return MemoryStats(message_ar="لا توجد عناصر في الذاكرة")
by_domain: dict[str, int] = defaultdict(int)
canonical = 0
total_conf = 0.0
oldest = items[0].created_at
newest = items[0].created_at
for item in items:
by_domain[item.domain] += 1
if item.is_canonical:
canonical += 1
total_conf += item.confidence
if item.created_at < oldest:
oldest = item.created_at
if item.created_at > newest:
newest = item.created_at
return MemoryStats(
total_items=len(items),
by_domain=dict(by_domain),
canonical_count=canonical,
derived_count=len(items) - canonical,
avg_confidence=round(total_conf / len(items), 4),
oldest_item=oldest,
newest_item=newest,
message_ar=f"إجمالي العناصر: {len(items)}، المعتمدة: {canonical}، المشتقة: {len(items) - canonical}",
)
# ---------------------------------------------------------------------------
# Memory Evaluator — مقيّم الذاكرة
# ---------------------------------------------------------------------------
class MemoryEvaluator:
"""
Evaluate memory quality before trusting it.
تقييم جودة الذاكرة قبل الوثوق بها.
"""
def __init__(self, adapter: MemoryAdapter):
self._adapter = adapter
async def benchmark_retrieval(
self,
test_queries: list[str],
expected_results: list[list[str]],
) -> EvalResult:
"""
Run retrieval benchmark against known query/result pairs.
تشغيل اختبار الاسترجاع مقابل أزواج استعلام/نتيجة معروفة.
"""
if len(test_queries) != len(expected_results):
raise ValueError("test_queries and expected_results must have same length")
total = len(test_queries)
correct = 0
total_recall = 0.0
total_rank = 0.0
for query, expected in zip(test_queries, expected_results):
results = await self._adapter.retrieve(query, limit=10)
result_contents = [r.content.lower().strip() for r in results]
expected_lower = [e.lower().strip() for e in expected]
found_any = False
best_rank = len(results) + 1
matched = 0
for exp in expected_lower:
for rank, res in enumerate(result_contents):
if exp in res or SequenceMatcher(None, exp, res).ratio() > 0.7:
found_any = True
matched += 1
best_rank = min(best_rank, rank + 1)
break
if found_any:
correct += 1
if expected_lower:
total_recall += matched / len(expected_lower)
total_rank += best_rank if found_any else len(results) + 1
precision = correct / total if total else 0.0
recall = total_recall / total if total else 0.0
avg_rank = total_rank / total if total else 0.0
return EvalResult(
total_queries=total,
correct_retrievals=correct,
precision=round(precision, 4),
recall=round(recall, 4),
avg_rank=round(avg_rank, 2),
message_ar=f"الدقة: {precision:.2%}، الاستدعاء: {recall:.2%}، متوسط الترتيب: {avg_rank:.1f}",
)
async def check_staleness(self, domain: str = None) -> list[MemoryItem]:
"""
Items not accessed in 30+ days.
العناصر التي لم يتم الوصول إليها منذ 30 يومًا أو أكثر.
"""
items = await self._adapter.list_all(domain)
cutoff = datetime.now(timezone.utc) - timedelta(days=STALENESS_DAYS)
return [item for item in items if item.updated_at < cutoff]
async def check_duplicates(self, domain: str = None) -> list[tuple[MemoryItem, MemoryItem]]:
"""
Find similar items that may be duplicates.
البحث عن عناصر متشابهة قد تكون مكررة.
"""
items = await self._adapter.list_all(domain)
duplicates: list[tuple[MemoryItem, MemoryItem]] = []
seen: set[str] = set()
for i, a in enumerate(items):
for b in items[i + 1:]:
pair_key = f"{a.id}:{b.id}"
if pair_key in seen:
continue
ratio = SequenceMatcher(None, a.content.lower(), b.content.lower()).ratio()
if ratio > 0.8:
duplicates.append((a, b))
seen.add(pair_key)
return duplicates
async def check_contradictions(self, domain: str = None) -> list[tuple[MemoryItem, MemoryItem]]:
"""
Find items in the same domain with conflicting content.
البحث عن عناصر في نفس النطاق بمحتوى متناقض.
"""
items = await self._adapter.list_all(domain)
contradictions: list[tuple[MemoryItem, MemoryItem]] = []
negation_markers = {"not", "no", "never", "cannot", "لا", "ليس", "لن", "لم", "غير"}
for i, a in enumerate(items):
a_words = set(a.content.lower().split())
for b in items[i + 1:]:
if a.domain != b.domain:
continue
b_words = set(b.content.lower().split())
shared = a_words & b_words
a_negations = a_words & negation_markers
b_negations = b_words & negation_markers
# If they share many words but differ in negation, flag as contradiction
if len(shared) > 3 and a_negations != b_negations:
contradictions.append((a, b))
return contradictions
async def get_health_report(self) -> dict[str, Any]:
"""
Overall memory health metrics.
مقاييس صحة الذاكرة العامة.
"""
stats = await self._adapter.get_stats()
stale = await self.check_staleness()
duplicates = await self.check_duplicates()
contradictions = await self.check_contradictions()
health_score = 1.0
if stats.total_items > 0:
stale_ratio = len(stale) / stats.total_items
dup_ratio = len(duplicates) / stats.total_items
contra_ratio = len(contradictions) / stats.total_items
health_score = max(0.0, 1.0 - stale_ratio * 0.3 - dup_ratio * 0.4 - contra_ratio * 0.5)
return {
"health_score": round(health_score, 4),
"total_items": stats.total_items,
"stale_items": len(stale),
"duplicate_pairs": len(duplicates),
"contradiction_pairs": len(contradictions),
"avg_confidence": stats.avg_confidence,
"by_domain": stats.by_domain,
"message_ar": (
f"درجة الصحة: {health_score:.2%}، "
f"عناصر قديمة: {len(stale)}، "
f"تكرارات: {len(duplicates)}، "
f"تناقضات: {len(contradictions)}"
),
}
# ---------------------------------------------------------------------------
# Factory — مصنع المحولات
# ---------------------------------------------------------------------------
def create_memory_adapter(backend: str = None) -> MemoryAdapter:
"""
Create the appropriate memory adapter based on config.
إنشاء محول الذاكرة المناسب بناءً على التكوين.
"""
backend = backend or os.getenv("MEMORY_BACKEND", "file")
if backend == "redis":
return RedisMemoryAdapter()
return FileMemoryAdapter()
# Global instances
memory_adapter = create_memory_adapter()
memory_evaluator = MemoryEvaluator(memory_adapter)

View File

@ -0,0 +1,447 @@
"""
Session Continuity Dealix AI Session State Management
Maintains context across AI agent sessions for seamless handoff.
Stores decisions, failures, wins, and follow-ups between sessions.
"""
import json
import logging
import uuid
from datetime import datetime, timedelta, timezone
from pathlib import Path
from typing import Any, Optional
from pydantic import BaseModel, Field
logger = logging.getLogger(__name__)
SESSIONS_DIR = Path(__file__).resolve().parents[4] / "memory" / "_sessions"
SESSIONS_DIR.mkdir(parents=True, exist_ok=True)
# ---------------------------------------------------------------------------
# Models — نماذج البيانات
# ---------------------------------------------------------------------------
class Decision(BaseModel):
"""A recorded decision — قرار مسجّل"""
decision: str
context: str
timestamp: datetime = Field(default_factory=lambda: datetime.now(timezone.utc))
decision_ar: str = ""
made_by: str = ""
class Failure(BaseModel):
"""A recorded failure — فشل مسجّل"""
description: str
context: str
timestamp: datetime = Field(default_factory=lambda: datetime.now(timezone.utc))
description_ar: str = ""
resolution: str = ""
class Win(BaseModel):
"""A recorded win — نجاح مسجّل"""
description: str
context: str
timestamp: datetime = Field(default_factory=lambda: datetime.now(timezone.utc))
description_ar: str = ""
class FollowUp(BaseModel):
"""A pending follow-up task — مهمة متابعة معلّقة"""
task: str
task_ar: str = ""
due_date: Optional[datetime] = None
completed: bool = False
created_at: datetime = Field(default_factory=lambda: datetime.now(timezone.utc))
assigned_to: str = ""
class SessionState(BaseModel):
"""Full session state — حالة الجلسة الكاملة"""
session_id: str = Field(default_factory=lambda: str(uuid.uuid4()))
project: str = "dealix"
active_workstreams: list[str] = []
last_decisions: list[Decision] = []
open_questions: list[str] = []
recent_failures: list[Failure] = []
recent_wins: list[Win] = []
pending_followups: list[FollowUp] = []
context_summary: str = ""
context_summary_ar: str = ""
created_at: datetime = Field(default_factory=lambda: datetime.now(timezone.utc))
updated_at: datetime = Field(default_factory=lambda: datetime.now(timezone.utc))
tags: list[str] = []
tenant_id: str = ""
class Config:
json_schema_extra = {
"example": {
"project": "dealix",
"active_workstreams": ["cpq-enhancement", "pdpl-audit"],
"context_summary": "Working on CPQ Arabic PDF generation and PDPL consent expiry.",
"context_summary_ar": "العمل على توليد PDF عربي للتسعير وانتهاء موافقة حماية البيانات.",
}
}
# ---------------------------------------------------------------------------
# Session Continuity Service — خدمة استمرارية الجلسة
# ---------------------------------------------------------------------------
class SessionContinuity:
"""
Maintain context across AI sessions.
الحفاظ على السياق عبر جلسات الذكاء الاصطناعي.
"""
def __init__(self, sessions_dir: Path = None):
self.sessions_dir = sessions_dir or SESSIONS_DIR
self.sessions_dir.mkdir(parents=True, exist_ok=True)
self._current: Optional[SessionState] = None
def _session_path(self, session_id: str) -> Path:
return self.sessions_dir / f"{session_id}.json"
def _serialize_state(self, state: SessionState) -> str:
data = state.model_dump(mode="json")
# Convert datetime objects to ISO strings for JSON
for key in ("created_at", "updated_at"):
if isinstance(data.get(key), datetime):
data[key] = data[key].isoformat()
for decision in data.get("last_decisions", []):
if isinstance(decision.get("timestamp"), datetime):
decision["timestamp"] = decision["timestamp"].isoformat()
for failure in data.get("recent_failures", []):
if isinstance(failure.get("timestamp"), datetime):
failure["timestamp"] = failure["timestamp"].isoformat()
for win in data.get("recent_wins", []):
if isinstance(win.get("timestamp"), datetime):
win["timestamp"] = win["timestamp"].isoformat()
for followup in data.get("pending_followups", []):
if isinstance(followup.get("due_date"), datetime):
followup["due_date"] = followup["due_date"].isoformat()
if isinstance(followup.get("created_at"), datetime):
followup["created_at"] = followup["created_at"].isoformat()
return json.dumps(data, ensure_ascii=False, indent=2)
def _deserialize_state(self, raw: str) -> SessionState:
data = json.loads(raw)
for key in ("created_at", "updated_at"):
if isinstance(data.get(key), str):
data[key] = datetime.fromisoformat(data[key])
for decision in data.get("last_decisions", []):
if isinstance(decision.get("timestamp"), str):
decision["timestamp"] = datetime.fromisoformat(decision["timestamp"])
for failure in data.get("recent_failures", []):
if isinstance(failure.get("timestamp"), str):
failure["timestamp"] = datetime.fromisoformat(failure["timestamp"])
for win in data.get("recent_wins", []):
if isinstance(win.get("timestamp"), str):
win["timestamp"] = datetime.fromisoformat(win["timestamp"])
for followup in data.get("pending_followups", []):
if isinstance(followup.get("due_date"), str) and followup["due_date"]:
followup["due_date"] = datetime.fromisoformat(followup["due_date"])
if isinstance(followup.get("created_at"), str):
followup["created_at"] = datetime.fromisoformat(followup["created_at"])
return SessionState(**data)
async def save_state(self, state: SessionState) -> str:
"""
Persist session state to disk.
حفظ حالة الجلسة على القرص.
"""
state.updated_at = datetime.now(timezone.utc)
path = self._session_path(state.session_id)
path.write_text(self._serialize_state(state), encoding="utf-8")
self._current = state
logger.info("تم حفظ حالة الجلسة: %s", state.session_id)
return state.session_id
async def restore_state(self, session_id: str = None) -> SessionState:
"""
Restore session state. If no session_id, restore the latest.
استعادة حالة الجلسة. إذا لم يُحدد معرف، يتم استعادة الأحدث.
"""
if session_id:
path = self._session_path(session_id)
if path.exists():
state = self._deserialize_state(path.read_text(encoding="utf-8"))
self._current = state
logger.info("تم استعادة الجلسة: %s", session_id)
return state
logger.warning("الجلسة غير موجودة: %s", session_id)
return SessionState(session_id=session_id)
# Find the latest session
latest_path: Optional[Path] = None
latest_mtime = 0.0
for f in self.sessions_dir.glob("*.json"):
mtime = f.stat().st_mtime
if mtime > latest_mtime:
latest_mtime = mtime
latest_path = f
if latest_path:
state = self._deserialize_state(latest_path.read_text(encoding="utf-8"))
self._current = state
logger.info("تم استعادة أحدث جلسة: %s", state.session_id)
return state
logger.info("لا توجد جلسات سابقة، إنشاء جلسة جديدة")
new_state = SessionState()
await self.save_state(new_state)
return new_state
async def get_restore_prompt(self) -> str:
"""
Generate a text prompt summarizing current state for a new AI session.
توليد نص ملخّص للحالة الحالية لتغذية جلسة ذكاء اصطناعي جديدة.
"""
state = self._current
if not state:
state = await self.restore_state()
lines = [
"# Session Restore — استعادة الجلسة",
f"**Project**: {state.project}",
f"**Session**: {state.session_id}",
f"**Last Updated**: {state.updated_at.isoformat()}",
"",
]
if state.context_summary:
lines.append(f"## Context (السياق)")
lines.append(state.context_summary)
if state.context_summary_ar:
lines.append(state.context_summary_ar)
lines.append("")
if state.active_workstreams:
lines.append("## Active Workstreams (مسارات العمل النشطة)")
for ws in state.active_workstreams:
lines.append(f"- {ws}")
lines.append("")
if state.last_decisions:
lines.append("## Recent Decisions (القرارات الأخيرة)")
for d in state.last_decisions[-5:]:
ts = d.timestamp.strftime("%Y-%m-%d %H:%M")
lines.append(f"- [{ts}] {d.decision}")
if d.decision_ar:
lines.append(f" {d.decision_ar}")
lines.append("")
if state.open_questions:
lines.append("## Open Questions (أسئلة مفتوحة)")
for q in state.open_questions:
lines.append(f"- {q}")
lines.append("")
if state.recent_failures:
lines.append("## Recent Failures (الإخفاقات الأخيرة)")
for f in state.recent_failures[-3:]:
lines.append(f"- {f.description}")
if f.resolution:
lines.append(f" Resolution: {f.resolution}")
lines.append("")
if state.recent_wins:
lines.append("## Recent Wins (النجاحات الأخيرة)")
for w in state.recent_wins[-3:]:
lines.append(f"- {w.description}")
lines.append("")
pending = [fu for fu in state.pending_followups if not fu.completed]
if pending:
lines.append("## Pending Follow-ups (متابعات معلّقة)")
for fu in pending:
due = f" (due: {fu.due_date.strftime('%Y-%m-%d')})" if fu.due_date else ""
lines.append(f"- {fu.task}{due}")
lines.append("")
lines.append("---")
lines.append("Continue from this state. Prioritize pending follow-ups and open questions.")
lines.append("استمر من هذه الحالة. أعطِ الأولوية للمتابعات المعلّقة والأسئلة المفتوحة.")
return "\n".join(lines)
async def add_decision(self, decision: str, context: str, decision_ar: str = "", made_by: str = "") -> None:
"""
Record a decision in the current session.
تسجيل قرار في الجلسة الحالية.
"""
if not self._current:
self._current = await self.restore_state()
self._current.last_decisions.append(Decision(
decision=decision,
context=context,
decision_ar=decision_ar,
made_by=made_by,
))
# Keep last 20 decisions
if len(self._current.last_decisions) > 20:
self._current.last_decisions = self._current.last_decisions[-20:]
await self.save_state(self._current)
logger.info("تم تسجيل قرار: %s", decision[:80])
async def add_failure(self, description: str, context: str, description_ar: str = "", resolution: str = "") -> None:
"""
Record a failure in the current session.
تسجيل فشل في الجلسة الحالية.
"""
if not self._current:
self._current = await self.restore_state()
self._current.recent_failures.append(Failure(
description=description,
context=context,
description_ar=description_ar,
resolution=resolution,
))
if len(self._current.recent_failures) > 10:
self._current.recent_failures = self._current.recent_failures[-10:]
await self.save_state(self._current)
logger.info("تم تسجيل فشل: %s", description[:80])
async def add_win(self, description: str, context: str, description_ar: str = "") -> None:
"""
Record a win in the current session.
تسجيل نجاح في الجلسة الحالية.
"""
if not self._current:
self._current = await self.restore_state()
self._current.recent_wins.append(Win(
description=description,
context=context,
description_ar=description_ar,
))
if len(self._current.recent_wins) > 10:
self._current.recent_wins = self._current.recent_wins[-10:]
await self.save_state(self._current)
logger.info("تم تسجيل نجاح: %s", description[:80])
async def add_followup(self, task: str, due_date: datetime = None, task_ar: str = "", assigned_to: str = "") -> None:
"""
Add a pending follow-up task.
إضافة مهمة متابعة معلّقة.
"""
if not self._current:
self._current = await self.restore_state()
self._current.pending_followups.append(FollowUp(
task=task,
task_ar=task_ar,
due_date=due_date,
assigned_to=assigned_to,
))
await self.save_state(self._current)
logger.info("تم إضافة متابعة: %s", task[:80])
async def complete_followup(self, task_substring: str) -> bool:
"""
Mark a follow-up as completed by matching task text.
تعليم متابعة كمكتملة عن طريق مطابقة نص المهمة.
"""
if not self._current:
self._current = await self.restore_state()
task_lower = task_substring.lower()
for fu in self._current.pending_followups:
if task_lower in fu.task.lower() and not fu.completed:
fu.completed = True
await self.save_state(self._current)
logger.info("تم إكمال متابعة: %s", fu.task[:80])
return True
return False
async def set_workstreams(self, workstreams: list[str]) -> None:
"""
Update active workstreams.
تحديث مسارات العمل النشطة.
"""
if not self._current:
self._current = await self.restore_state()
self._current.active_workstreams = workstreams
await self.save_state(self._current)
async def set_context(self, summary: str, summary_ar: str = "") -> None:
"""
Update the context summary.
تحديث ملخص السياق.
"""
if not self._current:
self._current = await self.restore_state()
self._current.context_summary = summary
self._current.context_summary_ar = summary_ar
await self.save_state(self._current)
async def add_question(self, question: str) -> None:
"""
Add an open question.
إضافة سؤال مفتوح.
"""
if not self._current:
self._current = await self.restore_state()
if question not in self._current.open_questions:
self._current.open_questions.append(question)
if len(self._current.open_questions) > 15:
self._current.open_questions = self._current.open_questions[-15:]
await self.save_state(self._current)
async def cleanup_old_sessions(self, days: int = 30) -> int:
"""
Remove session files older than N days.
حذف ملفات الجلسات الأقدم من N يوم.
"""
cutoff = datetime.now(timezone.utc) - timedelta(days=days)
removed = 0
for f in self.sessions_dir.glob("*.json"):
try:
data = json.loads(f.read_text(encoding="utf-8"))
updated = data.get("updated_at", "")
if isinstance(updated, str) and updated:
ts = datetime.fromisoformat(updated)
if ts < cutoff:
f.unlink()
removed += 1
except Exception as exc:
logger.warning("فشل معالجة ملف الجلسة %s: %s", f.name, exc)
logger.info("تم حذف %d جلسة قديمة (أقدم من %d يوم)", removed, days)
return removed
async def list_sessions(self, limit: int = 20) -> list[dict[str, Any]]:
"""
List recent sessions with basic info.
عرض الجلسات الأخيرة مع معلومات أساسية.
"""
sessions: list[dict[str, Any]] = []
for f in sorted(self.sessions_dir.glob("*.json"), key=lambda p: p.stat().st_mtime, reverse=True):
if len(sessions) >= limit:
break
try:
data = json.loads(f.read_text(encoding="utf-8"))
sessions.append({
"session_id": data.get("session_id", f.stem),
"project": data.get("project", ""),
"updated_at": data.get("updated_at", ""),
"workstreams": data.get("active_workstreams", []),
"decisions_count": len(data.get("last_decisions", [])),
"followups_pending": sum(
1 for fu in data.get("pending_followups", [])
if not fu.get("completed", False)
),
})
except Exception:
continue
return sessions
# ---------------------------------------------------------------------------
# Global singleton
# ---------------------------------------------------------------------------
session_continuity = SessionContinuity()

View File

@ -0,0 +1,417 @@
"""
Tool Receipts Dealix ToolProof Enhancement
Signed execution receipts, pre-execution policy, and trust analytics.
Extends tool_verification.py with cryptographic receipts and policy enforcement.
"""
import hashlib
import logging
import uuid
from collections import defaultdict
from datetime import datetime, timedelta, timezone
from enum import Enum
from typing import Any, Optional
from pydantic import BaseModel, Field
logger = logging.getLogger(__name__)
# ---------------------------------------------------------------------------
# Enums
# ---------------------------------------------------------------------------
class PolicyDecisionType(str, Enum):
ALLOW = "allow"
BLOCK = "block"
HOLD = "hold"
class VerificationVerdict(str, Enum):
VERIFIED = "verified"
PARTIALLY_VERIFIED = "partially_verified"
UNVERIFIED = "unverified"
CONTRADICTED = "contradicted"
BLOCKED = "blocked"
# ---------------------------------------------------------------------------
# Models — نماذج البيانات
# ---------------------------------------------------------------------------
class PolicyDecision(BaseModel):
"""Pre-execution policy decision — قرار السياسة قبل التنفيذ"""
decision: PolicyDecisionType
reason: str
reason_ar: str
tool_name: str
requires_approval_from: Optional[str] = None
pdpl_consent_required: bool = False
budget_remaining: Optional[float] = None
class ToolReceipt(BaseModel):
"""Signed execution receipt — إيصال تنفيذ موقّع"""
receipt_id: str = Field(default_factory=lambda: str(uuid.uuid4()))
run_id: str = ""
session_id: str = ""
agent_id: str = ""
tool_name: str
parameters: dict[str, Any] = {}
timestamp: datetime = Field(default_factory=lambda: datetime.now(timezone.utc))
execution_result: str = ""
normalized_result: str = ""
hash_signature: str = ""
policy_decision: PolicyDecisionType = PolicyDecisionType.ALLOW
side_effects: list[str] = []
verification_verdict: VerificationVerdict = VerificationVerdict.UNVERIFIED
cost_estimate: float = 0.0
tenant_id: str = ""
def compute_hash(self) -> str:
"""Generate SHA-256 hash of (tool_name + params + result + timestamp)."""
payload = (
f"{self.tool_name}|"
f"{_stable_dict_str(self.parameters)}|"
f"{self.execution_result}|"
f"{self.timestamp.isoformat()}"
)
self.hash_signature = hashlib.sha256(payload.encode("utf-8")).hexdigest()
return self.hash_signature
def normalize_result(self) -> str:
"""Normalize execution result for comparison."""
raw = self.execution_result.lower().strip()
for noise in ["ok", "success", "done", "completed", "تم", "نجح"]:
raw = raw.replace(noise, "SUCCESS")
for err in ["error", "fail", "exception", "خطأ", "فشل"]:
raw = raw.replace(err, "ERROR")
self.normalized_result = raw
return raw
def _stable_dict_str(d: dict) -> str:
"""Deterministic string representation of a dict for hashing."""
return "|".join(f"{k}={v}" for k, v in sorted(d.items()))
# ---------------------------------------------------------------------------
# Pre-Execution Policy — سياسة ما قبل التنفيذ
# ---------------------------------------------------------------------------
class PreExecutionPolicy:
"""
Decide allow/block/hold before tool execution.
تحديد السماح/الحظر/التعليق قبل تنفيذ الأداة.
"""
SAFE_TOOLS: set[str] = {
"read_file", "search", "query_db_readonly", "get_status",
"list_leads", "get_deal", "get_pipeline", "view_analytics",
"check_consent", "get_sequence_status",
}
HOLD_TOOLS: set[str] = {
"send_message", "send_whatsapp", "send_email", "send_sms",
"update_deal", "assign_lead", "create_proposal",
"change_stage", "update_score", "create_sequence",
"schedule_meeting", "update_territory",
}
BLOCK_TOOLS: set[str] = {
"delete_tenant", "drop_table", "bulk_delete", "export_all_data",
"reset_database", "delete_all_leads", "purge_audit_log",
"disable_pdpl", "bypass_consent", "modify_permissions_bulk",
}
MESSAGING_TOOLS: set[str] = {
"send_message", "send_whatsapp", "send_email", "send_sms",
}
ROLE_PERMISSIONS: dict[str, set[str]] = {
"owner": SAFE_TOOLS | HOLD_TOOLS,
"admin": SAFE_TOOLS | HOLD_TOOLS,
"manager": SAFE_TOOLS | {"update_deal", "assign_lead", "create_proposal", "change_stage"},
"sales_rep": SAFE_TOOLS | {"send_message", "send_whatsapp", "send_email", "update_deal"},
"viewer": SAFE_TOOLS,
}
DEFAULT_BUDGET_LIMIT: float = 100.0 # SAR per session
def __init__(self):
self._session_costs: dict[str, float] = defaultdict(float)
def evaluate(
self,
tool_name: str,
params: dict[str, Any],
user_context: dict[str, Any],
) -> PolicyDecision:
"""
Check tool against policy classes, role, PDPL, and budget.
فحص الأداة مقابل فئات السياسة والدور والموافقة والميزانية.
"""
user_role = user_context.get("role", "viewer")
session_id = user_context.get("session_id", "unknown")
has_consent = user_context.get("has_consent", False)
budget_limit = user_context.get("budget_limit", self.DEFAULT_BUDGET_LIMIT)
# Class C — absolute block
if tool_name in self.BLOCK_TOOLS:
logger.warning(
"محظور: أداة %s محظورة بالكامل (المستخدم: %s)",
tool_name, user_context.get("user_id", "unknown"),
)
return PolicyDecision(
decision=PolicyDecisionType.BLOCK,
reason=f"Tool '{tool_name}' is in the BLOCK list. This action is forbidden.",
reason_ar=f"الأداة '{tool_name}' محظورة. هذا الإجراء ممنوع.",
tool_name=tool_name,
)
# Role check
allowed_tools = self.ROLE_PERMISSIONS.get(user_role, self.SAFE_TOOLS)
if tool_name not in allowed_tools and tool_name not in self.SAFE_TOOLS:
return PolicyDecision(
decision=PolicyDecisionType.BLOCK,
reason=f"Role '{user_role}' lacks permission for tool '{tool_name}'.",
reason_ar=f"الدور '{user_role}' لا يملك صلاحية استخدام الأداة '{tool_name}'.",
tool_name=tool_name,
)
# PDPL consent check for messaging
if tool_name in self.MESSAGING_TOOLS and not has_consent:
return PolicyDecision(
decision=PolicyDecisionType.BLOCK,
reason="PDPL consent required before sending messages.",
reason_ar="مطلوب موافقة نظام حماية البيانات قبل إرسال الرسائل.",
tool_name=tool_name,
pdpl_consent_required=True,
)
# Budget check
estimated_cost = self._estimate_cost(tool_name, params)
current_spent = self._session_costs[session_id]
if current_spent + estimated_cost > budget_limit:
return PolicyDecision(
decision=PolicyDecisionType.HOLD,
reason=f"Budget limit would be exceeded. Spent: {current_spent:.2f}, "
f"estimated: {estimated_cost:.2f}, limit: {budget_limit:.2f} SAR.",
reason_ar=f"سيتم تجاوز حد الميزانية. المصروف: {current_spent:.2f}، "
f"التقدير: {estimated_cost:.2f}، الحد: {budget_limit:.2f} ريال.",
tool_name=tool_name,
budget_remaining=budget_limit - current_spent,
)
# Hold tools need approval
if tool_name in self.HOLD_TOOLS:
approver = "manager" if user_role == "sales_rep" else "admin"
return PolicyDecision(
decision=PolicyDecisionType.HOLD,
reason=f"Tool '{tool_name}' requires approval before execution.",
reason_ar=f"الأداة '{tool_name}' تتطلب موافقة قبل التنفيذ.",
tool_name=tool_name,
requires_approval_from=approver,
)
# Safe tools — allow
return PolicyDecision(
decision=PolicyDecisionType.ALLOW,
reason=f"Tool '{tool_name}' is safe for execution.",
reason_ar=f"الأداة '{tool_name}' آمنة للتنفيذ.",
tool_name=tool_name,
)
def record_cost(self, session_id: str, cost: float) -> None:
"""Record actual cost for budget tracking."""
self._session_costs[session_id] += cost
def _estimate_cost(self, tool_name: str, params: dict) -> float:
"""Estimate cost in SAR for a tool call."""
cost_map = {
"send_whatsapp": 0.15,
"send_sms": 0.08,
"send_email": 0.02,
"send_message": 0.10,
"create_proposal": 0.50,
"query_db_readonly": 0.001,
"search": 0.001,
}
base = cost_map.get(tool_name, 0.01)
# Bulk operations cost more
if "count" in params or "bulk" in tool_name:
base *= params.get("count", 1)
return base
# ---------------------------------------------------------------------------
# Receipt Store — مخزن الإيصالات
# ---------------------------------------------------------------------------
class ReceiptStore:
"""In-memory receipt storage with query capabilities."""
def __init__(self, max_size: int = 50000):
self._receipts: list[ToolReceipt] = []
self._max_size = max_size
def store(self, receipt: ToolReceipt) -> str:
"""Store a receipt and return its ID."""
receipt.compute_hash()
receipt.normalize_result()
self._receipts.append(receipt)
if len(self._receipts) > self._max_size:
self._receipts = self._receipts[-self._max_size:]
logger.info(
"إيصال محفوظ: %s أداة=%s حكم=%s",
receipt.receipt_id, receipt.tool_name, receipt.verification_verdict.value,
)
return receipt.receipt_id
def get(self, receipt_id: str) -> Optional[ToolReceipt]:
for r in self._receipts:
if r.receipt_id == receipt_id:
return r
return None
def query(
self,
agent_id: str = None,
tool_name: str = None,
verdict: VerificationVerdict = None,
since: datetime = None,
limit: int = 100,
) -> list[ToolReceipt]:
results = self._receipts
if agent_id:
results = [r for r in results if r.agent_id == agent_id]
if tool_name:
results = [r for r in results if r.tool_name == tool_name]
if verdict:
results = [r for r in results if r.verification_verdict == verdict]
if since:
results = [r for r in results if r.timestamp >= since]
return results[-limit:]
# ---------------------------------------------------------------------------
# Trust Analytics — تحليلات الثقة
# ---------------------------------------------------------------------------
class TrustAnalytics:
"""
Track trust metrics across agent workflows.
تتبع مقاييس الثقة عبر سير عمل الوكلاء.
"""
def __init__(self, store: ReceiptStore):
self._store = store
def get_trust_score(self, agent_id: str) -> float:
"""
Trust score 0-1 for an agent based on verification history.
درجة الثقة 0-1 للوكيل بناءً على سجل التحقق.
"""
receipts = self._store.query(agent_id=agent_id, limit=500)
if not receipts:
return 0.5 # Neutral for unknown agents
weights = {
VerificationVerdict.VERIFIED: 1.0,
VerificationVerdict.PARTIALLY_VERIFIED: 0.6,
VerificationVerdict.UNVERIFIED: 0.3,
VerificationVerdict.CONTRADICTED: 0.0,
VerificationVerdict.BLOCKED: 0.2,
}
total_weight = sum(weights.get(r.verification_verdict, 0.3) for r in receipts)
return round(total_weight / len(receipts), 4)
def get_contradiction_rate(self, agent_id: str) -> float:
"""
Contradiction rate for an agent.
معدل التناقض للوكيل.
"""
receipts = self._store.query(agent_id=agent_id, limit=500)
if not receipts:
return 0.0
contradictions = sum(
1 for r in receipts
if r.verification_verdict == VerificationVerdict.CONTRADICTED
)
return round(contradictions / len(receipts), 4)
def get_cost_by_agent(self, period_days: int = 30) -> dict[str, float]:
"""
Total cost per agent in period.
إجمالي التكلفة لكل وكيل خلال الفترة.
"""
since = datetime.now(timezone.utc) - timedelta(days=period_days)
receipts = self._store.query(since=since, limit=50000)
costs: dict[str, float] = defaultdict(float)
for r in receipts:
costs[r.agent_id] += r.cost_estimate
return {k: round(v, 4) for k, v in costs.items()}
def get_blocked_attempts(self, period_days: int = 30) -> list[ToolReceipt]:
"""
All blocked tool attempts in period.
جميع محاولات الأدوات المحظورة خلال الفترة.
"""
since = datetime.now(timezone.utc) - timedelta(days=period_days)
return self._store.query(
verdict=VerificationVerdict.BLOCKED, since=since, limit=1000
)
def get_hallucination_suspects(self) -> list[ToolReceipt]:
"""
Claims without matching receipts possible hallucinations.
ادعاءات بدون إيصالات مطابقة هلوسات محتملة.
"""
all_receipts = self._store.query(limit=5000)
suspects = []
for r in all_receipts:
if r.verification_verdict == VerificationVerdict.CONTRADICTED:
suspects.append(r)
elif (
r.verification_verdict == VerificationVerdict.UNVERIFIED
and r.execution_result == ""
and r.tool_name not in PreExecutionPolicy.SAFE_TOOLS
):
suspects.append(r)
return suspects
def get_summary(self, agent_id: str = None) -> dict[str, Any]:
"""
Overall trust summary.
ملخص الثقة العام.
"""
receipts = self._store.query(agent_id=agent_id, limit=10000)
total = len(receipts)
if total == 0:
return {"total": 0, "trust_score": 0.5, "message_ar": "لا توجد بيانات"}
by_verdict: dict[str, int] = defaultdict(int)
total_cost = 0.0
for r in receipts:
by_verdict[r.verification_verdict.value] += 1
total_cost += r.cost_estimate
trust = self.get_trust_score(agent_id) if agent_id else 0.5
return {
"total": total,
"by_verdict": dict(by_verdict),
"trust_score": trust,
"total_cost_sar": round(total_cost, 2),
"contradiction_rate": round(
by_verdict.get("contradicted", 0) / total * 100, 2
),
"message_ar": f"إجمالي العمليات: {total}، درجة الثقة: {trust:.2f}",
}
# ---------------------------------------------------------------------------
# Global singletons
# ---------------------------------------------------------------------------
pre_execution_policy = PreExecutionPolicy()
receipt_store = ReceiptStore()
trust_analytics = TrustAnalytics(receipt_store)

View File

@ -0,0 +1,89 @@
# Dealix Master Index (الفهرس الرئيسي لديلكس)
**Type**: operations
**Summary**: Central index linking to all wiki pages, memory sections, and knowledge resources.
**Summary_AR**: الفهرس المركزي الذي يربط جميع صفحات الويكي وأقسام الذاكرة وموارد المعرفة.
**Last Updated**: 2026-04-11
---
## Architecture (البنية التحتية)
| Page | Location | Status |
|------|----------|--------|
| System Architecture | [wiki/architecture.md](../wiki/architecture.md) | Active |
| System Overview (Diagram) | [architecture/system-overview.md](../architecture/system-overview.md) | Active |
| ADR-001: Multi-Tenant | [adr/001-multi-tenant.md](../adr/001-multi-tenant.md) | Active |
| ADR-002: WhatsApp-First | [adr/002-whatsapp-first.md](../adr/002-whatsapp-first.md) | Active |
| Library Decisions | [patterns/library-decisions.md](../patterns/library-decisions.md) | Active |
## Product (المنتج)
| Page | Location | Status |
|------|----------|--------|
| Glossary | [wiki/glossary.md](../wiki/glossary.md) | Active |
| Wiki Guide | [wiki/README.md](../wiki/README.md) | Active |
| Dealix Vision | [DEALIX_VISION.md](../../DEALIX_VISION.md) | Active |
| Master Blueprint | [MASTER-BLUEPRINT.mdc](../../MASTER-BLUEPRINT.mdc) | Active |
## GTM / Growth (النمو والتسويق)
| Page | Location | Status |
|------|----------|--------|
| Launch Plan | [growth/launch-plan.md](../growth/launch-plan.md) | Active |
| Content Map | [growth/content-map.md](../growth/content-map.md) | Active |
| Niche Brief | [growth/niche-brief.md](../growth/niche-brief.md) | Active |
| Outreach Map | [growth/outreach-map.md](../growth/outreach-map.md) | Active |
## Customer (العملاء)
| Page | Location | Status |
|------|----------|--------|
| ICP Brief | [customers/icp-brief.md](../customers/icp-brief.md) | Active |
| Interview Template | [customers/interview-template.md](../customers/interview-template.md) | Active |
## Operations (العمليات)
| Page | Location | Status |
|------|----------|--------|
| Deployment Checklist | [runbooks/deployment-checklist.md](../runbooks/deployment-checklist.md) | Active |
| Launch Checklist | [runbooks/launch-checklist.md](../runbooks/launch-checklist.md) | Active |
| SaaS Readiness Audit | [runbooks/saas-readiness-audit.md](../runbooks/saas-readiness-audit.md) | Active |
| Release Notes | [releases/](../releases/) | Directory |
## Security (الأمان والامتثال)
| Page | Location | Status |
|------|----------|--------|
| PDPL Checklist | [security/pdpl-checklist.md](../security/pdpl-checklist.md) | Active |
| Security Policy | [SECURITY.md](../../SECURITY.md) | Active |
| Contributing Guide | [CONTRIBUTING.md](../../CONTRIBUTING.md) | Active |
## Tooling / Providers (الأدوات والمزودون)
| Page | Location | Status |
|------|----------|--------|
| Provider Routing Strategy | [providers/routing-strategy.md](../providers/routing-strategy.md) | Active |
| Environment Config | [.env.example](../../.env.example) | Active |
| Docker Compose | [docker-compose.yml](../../docker-compose.yml) | Active |
## Agent Configuration (تكوين الوكلاء)
| Page | Location | Status |
|------|----------|--------|
| AGENTS.md | [AGENTS.md](../../AGENTS.md) | Active |
| CLAUDE.md | [CLAUDE.md](../../CLAUDE.md) | Active |
| Claude Commands | [.claude/commands/](../../.claude/commands/) | Directory |
| Claude Hooks | [.claude/hooks/](../../.claude/hooks/) | Directory |
---
## Index Health
- **Total pages**: 23
- **Active**: 23
- **Stale**: 0
- **Orphan (no inbound links)**: 0
- **Last full audit**: 2026-04-11
> Run `KnowledgeBrain.lint()` weekly to verify index health. See [wiki/README.md](../wiki/README.md) for review schedule.

View File

@ -0,0 +1,102 @@
# Dealix Glossary (مسرد مصطلحات ديلكس)
**Type**: glossary
**Summary**: Bilingual (Arabic/English) glossary of all key terms used across the Dealix platform.
**Summary_AR**: مسرد ثنائي اللغة (عربي/إنجليزي) لجميع المصطلحات الرئيسية المستخدمة في منصة ديلكس.
**Key Facts**:
- 30+ terms covering CRM, AI, compliance, and platform concepts
- Each term has English name, Arabic name, and definition
- Terms are grouped by domain for easy navigation
- Used as reference by AI agents and human team members
**Provenance**: AGENTS.md, CLAUDE.md, backend source code, PDPL documentation
**Confidence**: high
**Related Pages**: [architecture](./architecture.md), [PDPL checklist](../security/pdpl-checklist.md)
**Last Updated**: 2026-04-11
**Stale**: false
---
## CRM Core (إدارة علاقات العملاء)
| # | English | العربية | Definition |
|---|---------|---------|------------|
| 1 | **Lead** | عميل محتمل | A potential customer who has shown interest or been identified through outreach. Scored 0-100 in Dealix. |
| 2 | **Deal** | صفقة | A sales opportunity tied to a lead, tracked through pipeline stages with value in SAR. |
| 3 | **Pipeline** | مسار المبيعات | The sequence of stages a deal progresses through from qualification to close. |
| 4 | **Stage** | مرحلة | A discrete step in the pipeline (e.g., Qualification, Proposal, Negotiation, Closed Won). |
| 5 | **Contact** | جهة اتصال | An individual person associated with a lead or company. |
| 6 | **Company** | شركة | An organization entity linked to one or more contacts and deals. |
| 7 | **Account** | حساب | A company record with enrichment data (industry, size, revenue). |
| 8 | **Opportunity** | فرصة | Synonym for Deal in some contexts; a qualified sales prospect. |
| 9 | **Conversion** | تحويل | The act of a lead becoming a qualified deal or a deal reaching Closed Won. |
| 10 | **Win Rate** | معدل الفوز | Percentage of deals that reach Closed Won vs total deals in a period. |
## Sales Operations (عمليات المبيعات)
| # | English | العربية | Definition |
|---|---------|---------|------------|
| 11 | **Sequence** | تسلسل | An automated multi-step outreach cadence (e.g., WhatsApp → Email → Call → Follow-up). |
| 12 | **Cadence** | إيقاع | The timing pattern between sequence steps (e.g., Day 0, Day 2, Day 5). |
| 13 | **CPQ** | تسعير (التهيئة والتسعير والعرض) | Configure, Price, Quote — system for generating accurate proposals with SAR pricing. |
| 14 | **Proposal** | عرض سعر | A formal document sent to a prospect detailing scope, pricing, and terms. |
| 15 | **Territory** | منطقة | A geographic or industry-based area assigned to a sales rep for lead ownership. |
| 16 | **Assignment** | تعيين | The act of routing a lead or deal to a specific sales rep or team. |
| 17 | **SLA** | اتفاقية مستوى الخدمة | Service Level Agreement — response time commitments for lead follow-up. |
| 18 | **Escalation** | تصعيد | Routing a stalled or high-priority item to a manager or specialist. |
## AI & Intelligence (الذكاء الاصطناعي)
| # | English | العربية | Definition |
|---|---------|---------|------------|
| 19 | **Lead Score** | درجة العميل المحتمل | A 0-100 composite score indicating lead quality based on behavior, fit, and engagement. |
| 20 | **Intent Detection** | كشف النية | AI analysis of message text to determine customer intent (buy, inquire, complain, etc.). |
| 21 | **Sentiment Analysis** | تحليل المشاعر | AI classification of message tone as positive, negative, or neutral. |
| 22 | **Entity Extraction** | استخلاص الكيانات | AI identification of named entities (people, companies, amounts, dates) from Arabic text. |
| 23 | **Conversation Intelligence** | ذكاء المحادثات | AI analysis of sales conversations for coaching insights and deal signals. |
| 24 | **Model Router** | موجه النماذج | Service that selects the optimal LLM provider and model for each AI task. |
| 25 | **Agent** | وكيل ذكي | An autonomous AI worker with a specialized role (e.g., Lead Qualifier, Deal Advisor). |
| 26 | **Orchestrator** | منسق الوكلاء | The system that routes events to appropriate agents and manages their execution. |
## Platform & Infrastructure (المنصة والبنية التحتية)
| # | English | العربية | Definition |
|---|---------|---------|------------|
| 27 | **Tenant** | مستأجر | An isolated customer organization on the platform. All data is scoped by `tenant_id`. |
| 28 | **Multi-Tenant** | متعدد المستأجرين | Architecture where multiple customers share infrastructure but data is isolated. |
| 29 | **Feature Flag** | مفتاح الميزة | A toggle that controls feature availability per tenant or globally. |
| 30 | **Webhook** | خطاف ويب | An HTTP callback that notifies external systems of events (e.g., deal closed, lead scored). |
| 31 | **Worker** | عامل خلفي | A Celery process that handles async tasks (scoring, delivery, analytics). |
| 32 | **Migration** | ترحيل قاعدة البيانات | An Alembic script that modifies the database schema. |
## Compliance & Security (الامتثال والأمان)
| # | English | العربية | Definition |
|---|---------|---------|------------|
| 33 | **PDPL** | نظام حماية البيانات الشخصية | Saudi Personal Data Protection Law — governs collection, processing, and storage of personal data. |
| 34 | **Consent** | موافقة | Explicit permission from a data subject to process their data for a stated purpose. |
| 35 | **Data Subject** | صاحب البيانات | The individual whose personal data is being processed. |
| 36 | **Data Subject Rights** | حقوق صاحب البيانات | Rights to access, correct, and delete personal data under PDPL. |
| 37 | **Audit Trail** | سجل المراجعة | Immutable log of all consent changes and data access events. |
| 38 | **Security Gate** | بوابة الأمان | Pre-execution check that blocks risky actions (cross-tenant access, ungoverned messaging). |
| 39 | **ZATCA** | هيئة الزكاة والضريبة والجمارك | Saudi tax authority — Dealix integrates with ZATCA for e-invoicing compliance. |
## Communication (التواصل)
| # | English | العربية | Definition |
|---|---------|---------|------------|
| 40 | **WhatsApp Business** | واتساب للأعمال | Primary communication channel in Saudi Arabia (85% penetration). |
| 41 | **Inbox** | صندوق الوارد | Unified view of all conversations across WhatsApp, Email, and SMS. |
| 42 | **Template Message** | رسالة نموذجية | Pre-approved WhatsApp message template for outbound marketing/transactional use. |
| 43 | **Outbound Governance** | حوكمة الرسائل الصادرة | Rules controlling message volume, timing, and consent requirements. |
## Business Metrics (مؤشرات الأعمال)
| # | English | العربية | Definition |
|---|---------|---------|------------|
| 44 | **MRR** | الإيرادات الشهرية المتكررة | Monthly Recurring Revenue — total subscription revenue per month. |
| 45 | **ARR** | الإيرادات السنوية المتكررة | Annual Recurring Revenue — MRR multiplied by 12. |
| 46 | **Churn Rate** | معدل الانسحاب | Percentage of customers who cancel their subscription in a period. |
| 47 | **LTV** | القيمة الدائمة للعميل | Lifetime Value — total revenue expected from a customer over their relationship. |
| 48 | **CAC** | تكلفة اكتساب العميل | Customer Acquisition Cost — total spend to acquire one new customer. |
| 49 | **NPS** | صافي نقاط الترويج | Net Promoter Score — measure of customer satisfaction and loyalty. |
| 50 | **ICP** | ملف العميل المثالي | Ideal Customer Profile — the target customer characteristics for Dealix. |