diff --git a/salesflow-saas/backend/app/services/knowledge_brain.py b/salesflow-saas/backend/app/services/knowledge_brain.py new file mode 100644 index 00000000..610a9cb6 --- /dev/null +++ b/salesflow-saas/backend/app/services/knowledge_brain.py @@ -0,0 +1,560 @@ +""" +Knowledge Brain — Dealix Second Brain Service +Project knowledge management: ingest, query, lint, index. +Manages the wiki layer in memory/wiki/ and indexes in memory/indexes/. +""" +import logging +import os +import re +import uuid +from datetime import datetime, timedelta, timezone +from enum import Enum +from pathlib import Path +from typing import Any, Optional + +from pydantic import BaseModel, Field + +logger = logging.getLogger(__name__) + +WIKI_DIR = Path(__file__).resolve().parents[4] / "memory" / "wiki" +INDEX_DIR = Path(__file__).resolve().parents[4] / "memory" / "indexes" +MEMORY_DIR = Path(__file__).resolve().parents[4] / "memory" +STALE_THRESHOLD_DAYS = 30 + + +class PageType(str, Enum): + ARCHITECTURE = "architecture" + PRODUCT = "product" + GTM = "gtm" + CUSTOMER = "customer" + OPERATIONS = "operations" + SECURITY = "security" + TOOLING = "tooling" + GLOSSARY = "glossary" + + +class Confidence(str, Enum): + HIGH = "high" + MEDIUM = "medium" + LOW = "low" + + +class IssueSeverity(str, Enum): + ERROR = "error" + WARNING = "warning" + INFO = "info" + + +class WikiPage(BaseModel): + """Structured wiki page — صفحة ويكي منظمة""" + id: str = Field(default_factory=lambda: str(uuid.uuid4())) + title: str + title_ar: str = "" + page_type: PageType + summary: str + summary_ar: str + key_facts: list[str] = [] + provenance: str + confidence: Confidence = Confidence.MEDIUM + related_pages: list[str] = [] + last_updated: datetime = Field(default_factory=lambda: datetime.now(timezone.utc)) + stale: bool = False + file_path: str = "" + body: str = "" + + class Config: + json_schema_extra = { + "example": { + "title": "System Architecture", + "title_ar": "بنية النظام", + "page_type": "architecture", + 
"summary": "Multi-tenant AI CRM architecture overview", + "summary_ar": "نظرة عامة على بنية إدارة علاقات العملاء متعددة المستأجرين", + } + } + + +class BrainAnswer(BaseModel): + """Answer from the knowledge brain — إجابة من الدماغ المعرفي""" + question: str + answer: str + answer_ar: str = "" + sources: list[str] = [] + confidence: Confidence = Confidence.LOW + related_pages: list[str] = [] + + +class BrainIssue(BaseModel): + """Quality issue found during lint — مشكلة جودة مكتشفة أثناء الفحص""" + issue_id: str = Field(default_factory=lambda: str(uuid.uuid4())) + severity: IssueSeverity + category: str + title: str + title_ar: str + description: str + affected_page: str = "" + recommendation: str = "" + + +class KnowledgeBrain: + """ + Project knowledge management — ingest, query, lint. + إدارة المعرفة المشروعية — استيعاب، استعلام، فحص. + """ + + def __init__(self, wiki_dir: Path = None, memory_dir: Path = None): + self.wiki_dir = wiki_dir or WIKI_DIR + self.memory_dir = memory_dir or MEMORY_DIR + self.index_dir = INDEX_DIR + self._page_cache: dict[str, WikiPage] = {} + self._ensure_dirs() + + def _ensure_dirs(self) -> None: + self.wiki_dir.mkdir(parents=True, exist_ok=True) + self.index_dir.mkdir(parents=True, exist_ok=True) + + def _parse_frontmatter(self, content: str, file_path: str) -> WikiPage: + """Parse wiki page frontmatter into a WikiPage model.""" + lines = content.split("\n") + title = "" + fields: dict[str, Any] = {} + body_start = 0 + + for i, line in enumerate(lines): + stripped = line.strip() + if stripped.startswith("# "): + title = stripped[2:].strip() + elif stripped == "---": + body_start = i + 1 + break + elif stripped.startswith("**") and "**:" in stripped: + match = re.match(r"\*\*(.+?)\*\*:\s*(.*)", stripped) + if match: + key = match.group(1).lower().replace(" ", "_") + value = match.group(2).strip() + fields[key] = value + + body = "\n".join(lines[body_start:]).strip() if body_start > 0 else "" + key_facts = [] + if "key_facts" in fields: + 
fact_pattern = re.compile(r"^\s*-\s+(.+)$") + in_facts = False + for line in lines: + if "**Key Facts**" in line: + in_facts = True + continue + if in_facts: + fact_match = fact_pattern.match(line) + if fact_match: + key_facts.append(fact_match.group(1).strip()) + elif line.strip().startswith("**"): + break + + related = [] + if "related_pages" in fields: + link_pattern = re.compile(r"\[.+?\]\((.+?)\)") + related = link_pattern.findall(fields["related_pages"]) + + page_type = PageType.ARCHITECTURE + type_val = fields.get("type", "architecture").lower() + for pt in PageType: + if pt.value == type_val: + page_type = pt + break + + conf = Confidence.MEDIUM + conf_val = fields.get("confidence", "medium").lower() + for c in Confidence: + if c.value == conf_val: + conf = c + break + + last_updated = datetime.now(timezone.utc) + if "last_updated" in fields: + try: + last_updated = datetime.strptime( + fields["last_updated"], "%Y-%m-%d" + ).replace(tzinfo=timezone.utc) + except ValueError: + pass + + stale = fields.get("stale", "false").lower() == "true" + + return WikiPage( + title=title, + title_ar=fields.get("title_ar", ""), + page_type=page_type, + summary=fields.get("summary", ""), + summary_ar=fields.get("summary_ar", ""), + key_facts=key_facts, + provenance=fields.get("provenance", ""), + confidence=conf, + related_pages=related, + last_updated=last_updated, + stale=stale, + file_path=file_path, + body=body, + ) + + async def _load_all_pages(self) -> list[WikiPage]: + """Load and parse all wiki pages.""" + pages = [] + if not self.wiki_dir.exists(): + return pages + for md_file in sorted(self.wiki_dir.glob("*.md")): + if md_file.name == "README.md": + continue + try: + content = md_file.read_text(encoding="utf-8") + page = self._parse_frontmatter(content, str(md_file)) + self._page_cache[page.id] = page + pages.append(page) + except Exception as exc: + logger.warning("فشل تحميل الصفحة %s: %s", md_file.name, exc) + return pages + + async def ingest( + self, + 
source_type: str, + content: str, + metadata: dict[str, Any] = None, + ) -> WikiPage: + """ + Classify content, create/update wiki page, link related pages. + تصنيف المحتوى، إنشاء/تحديث صفحة ويكي، ربط الصفحات ذات الصلة. + """ + metadata = metadata or {} + title = metadata.get("title", f"Ingested — {source_type}") + title_ar = metadata.get("title_ar", f"مستوعب — {source_type}") + + page_type = self._classify_content(source_type, content) + summary = content[:120].replace("\n", " ").strip() + summary_ar = metadata.get("summary_ar", f"محتوى {source_type} مستوعب تلقائياً") + + existing_pages = await self._load_all_pages() + related = self._find_related(content, existing_pages) + + page = WikiPage( + title=title, + title_ar=title_ar, + page_type=page_type, + summary=summary, + summary_ar=summary_ar, + key_facts=metadata.get("key_facts", []), + provenance=metadata.get("provenance", f"Auto-ingested from {source_type}"), + confidence=Confidence(metadata.get("confidence", "medium")), + related_pages=[p.file_path for p in related[:5]], + body=content, + ) + + file_name = re.sub(r"[^\w\s-]", "", title.lower()).replace(" ", "-")[:50] + file_path = self.wiki_dir / f"{file_name}.md" + page.file_path = str(file_path) + + md_content = self._render_page(page) + file_path.write_text(md_content, encoding="utf-8") + self._page_cache[page.id] = page + + logger.info("تم استيعاب صفحة جديدة: %s (%s)", title, page_type.value) + return page + + def _classify_content(self, source_type: str, content: str) -> PageType: + """Classify content into a page type based on keywords.""" + content_lower = content.lower() + keyword_map = { + PageType.ARCHITECTURE: ["api", "database", "service", "backend", "frontend", "deploy"], + PageType.PRODUCT: ["feature", "roadmap", "user story", "requirement", "ميزة"], + PageType.GTM: ["launch", "marketing", "outreach", "growth", "campaign", "تسويق"], + PageType.CUSTOMER: ["customer", "interview", "feedback", "icp", "عميل"], + PageType.OPERATIONS: ["runbook", 
"checklist", "process", "deploy", "عملية"], + PageType.SECURITY: ["pdpl", "consent", "security", "compliance", "أمان"], + PageType.TOOLING: ["provider", "api key", "integration", "tool", "أداة"], + } + scores: dict[PageType, int] = {} + for ptype, keywords in keyword_map.items(): + scores[ptype] = sum(1 for kw in keywords if kw in content_lower) + + if source_type in ("adr", "architecture"): + return PageType.ARCHITECTURE + if source_type in ("customer_interview", "feedback"): + return PageType.CUSTOMER + + best = max(scores, key=lambda k: scores[k]) + return best if scores[best] > 0 else PageType.PRODUCT + + def _find_related(self, content: str, pages: list[WikiPage]) -> list[WikiPage]: + """Find related pages by keyword overlap.""" + content_words = set(content.lower().split()) + scored: list[tuple[WikiPage, int]] = [] + for page in pages: + page_words = set(page.summary.lower().split()) | set(page.body.lower().split()[:100]) + overlap = len(content_words & page_words) + if overlap > 2: + scored.append((page, overlap)) + scored.sort(key=lambda x: x[1], reverse=True) + return [p for p, _ in scored[:5]] + + def _render_page(self, page: WikiPage) -> str: + """Render a WikiPage model to markdown.""" + facts = "\n".join(f" - {f}" for f in page.key_facts) if page.key_facts else " - (none)" + related = ", ".join( + f"[{Path(r).stem}]({r})" for r in page.related_pages + ) if page.related_pages else "(none)" + date_str = page.last_updated.strftime("%Y-%m-%d") + + return f"""# {page.title} + +**Type**: {page.page_type.value} +**Summary**: {page.summary} +**Summary_AR**: {page.summary_ar} +**Key Facts**: +{facts} +**Provenance**: {page.provenance} +**Confidence**: {page.confidence.value} +**Related Pages**: {related} +**Last Updated**: {date_str} +**Stale**: {str(page.stale).lower()} + +--- + +{page.body} +""" + + async def query( + self, question: str, domain: str = None + ) -> BrainAnswer: + """ + Search wiki + memory for relevant answers. 
+ البحث في الويكي والذاكرة عن إجابات ذات صلة. + """ + pages = await self._load_all_pages() + if domain: + try: + dtype = PageType(domain) + pages = [p for p in pages if p.page_type == dtype] + except ValueError: + pass + + question_lower = question.lower() + question_words = set(question_lower.split()) + + scored: list[tuple[WikiPage, float]] = [] + for page in pages: + searchable = f"{page.title} {page.summary} {page.body} {' '.join(page.key_facts)}".lower() + searchable_words = set(searchable.split()) + overlap = len(question_words & searchable_words) + if overlap > 0: + score = overlap / max(len(question_words), 1) + if page.confidence == Confidence.HIGH: + score *= 1.3 + elif page.confidence == Confidence.LOW: + score *= 0.7 + scored.append((page, score)) + + scored.sort(key=lambda x: x[1], reverse=True) + top_pages = scored[:3] + + if not top_pages: + return BrainAnswer( + question=question, + answer="لم يتم العثور على معلومات ذات صلة في قاعدة المعرفة.", + answer_ar="لم يتم العثور على معلومات ذات صلة في قاعدة المعرفة.", + confidence=Confidence.LOW, + ) + + best_page = top_pages[0][0] + best_score = top_pages[0][1] + + answer_parts = [best_page.summary] + if best_page.key_facts: + answer_parts.append("Key facts: " + "; ".join(best_page.key_facts[:3])) + + conf = Confidence.HIGH if best_score > 0.5 else (Confidence.MEDIUM if best_score > 0.2 else Confidence.LOW) + + return BrainAnswer( + question=question, + answer=" ".join(answer_parts), + answer_ar=best_page.summary_ar or "لا يوجد ملخص عربي", + sources=[p.file_path for p, _ in top_pages], + confidence=conf, + related_pages=[p.file_path for p, _ in top_pages], + ) + + async def lint(self) -> list[BrainIssue]: + """ + Check for: orphan pages, stale pages, missing provenance, duplicates, empty indexes. + فحص: صفحات يتيمة، صفحات قديمة، مصدر مفقود، تكرارات، فهارس فارغة. 
+ """ + issues: list[BrainIssue] = [] + pages = await self._load_all_pages() + now = datetime.now(timezone.utc) + all_paths = {p.file_path for p in pages} + all_related_targets: set[str] = set() + + for page in pages: + for rel in page.related_pages: + resolved = str((Path(page.file_path).parent / rel).resolve()) + all_related_targets.add(resolved) + + # Stale check (>30 days) + age = (now - page.last_updated).days + if age > STALE_THRESHOLD_DAYS: + issues.append(BrainIssue( + severity=IssueSeverity.WARNING, + category="stale", + title=f"Stale page: {page.title}", + title_ar=f"صفحة قديمة: {page.title}", + description=f"Last updated {age} days ago (threshold: {STALE_THRESHOLD_DAYS}).", + affected_page=page.file_path, + recommendation="Review and update or archive this page.", + )) + + # Missing provenance + if not page.provenance or page.provenance.strip() == "": + issues.append(BrainIssue( + severity=IssueSeverity.ERROR, + category="provenance", + title=f"Missing provenance: {page.title}", + title_ar=f"مصدر مفقود: {page.title}", + description="Page has no provenance. All pages must cite their source.", + affected_page=page.file_path, + recommendation="Add provenance field with source reference.", + )) + + # Missing Arabic summary + if not page.summary_ar: + issues.append(BrainIssue( + severity=IssueSeverity.WARNING, + category="i18n", + title=f"Missing Arabic summary: {page.title}", + title_ar=f"ملخص عربي مفقود: {page.title}", + description="Page is missing summary_ar. 
Dealix is Arabic-first.", + affected_page=page.file_path, + recommendation="Add an Arabic summary.", + )) + + # Orphan check + for page in pages: + resolved_path = str(Path(page.file_path).resolve()) + if resolved_path not in all_related_targets and page.page_type != PageType.GLOSSARY: + issues.append(BrainIssue( + severity=IssueSeverity.INFO, + category="orphan", + title=f"Orphan page: {page.title}", + title_ar=f"صفحة يتيمة: {page.title}", + description="No other pages link to this page.", + affected_page=page.file_path, + recommendation="Add a link from a related page or index.", + )) + + # Duplicate check by title similarity + titles = [(p.title.lower().strip(), p) for p in pages] + seen: set[str] = set() + for title, page in titles: + if title in seen: + issues.append(BrainIssue( + severity=IssueSeverity.WARNING, + category="duplicate", + title=f"Possible duplicate: {page.title}", + title_ar=f"تكرار محتمل: {page.title}", + description=f"Multiple pages with title '{page.title}'.", + affected_page=page.file_path, + recommendation="Merge duplicate pages.", + )) + seen.add(title) + + # Empty index check + if self.index_dir.exists(): + for idx_file in self.index_dir.glob("*.md"): + content = idx_file.read_text(encoding="utf-8") + if len(content.strip()) < 50: + issues.append(BrainIssue( + severity=IssueSeverity.WARNING, + category="empty_index", + title=f"Empty index: {idx_file.name}", + title_ar=f"فهرس فارغ: {idx_file.name}", + description="Index file has very little content.", + affected_page=str(idx_file), + recommendation="Populate or remove the index.", + )) + + logger.info("فحص الدماغ المعرفي: %d مشكلة مكتشفة", len(issues)) + return issues + + async def get_index(self, domain: str) -> list[WikiPage]: + """ + Return all pages in a domain. + إرجاع جميع الصفحات في نطاق معين. 
+ """ + pages = await self._load_all_pages() + try: + dtype = PageType(domain) + return [p for p in pages if p.page_type == dtype] + except ValueError: + logger.warning("نطاق غير معروف: %s", domain) + return [] + + async def mark_stale(self, page_id: str) -> None: + """ + Mark a page as stale. + تعليم صفحة كقديمة. + """ + page = self._page_cache.get(page_id) + if not page: + pages = await self._load_all_pages() + for p in pages: + if p.id == page_id: + page = p + break + if not page: + logger.error("صفحة غير موجودة: %s", page_id) + return + + page.stale = True + file_path = Path(page.file_path) + if file_path.exists(): + content = file_path.read_text(encoding="utf-8") + content = re.sub( + r"\*\*Stale\*\*:\s*false", + "**Stale**: true", + content, + ) + file_path.write_text(content, encoding="utf-8") + logger.info("تم تعليم الصفحة كقديمة: %s", page.title) + + async def promote_raw( + self, + raw_id: str, + raw_content: str = None, + metadata: dict[str, Any] = None, + ) -> WikiPage: + """ + Convert raw material to structured wiki page. + تحويل مادة خام إلى صفحة ويكي منظمة. 
+ """ + metadata = metadata or {} + if raw_content is None: + raw_path = self.memory_dir / "raw" / f"{raw_id}.md" + if raw_path.exists(): + raw_content = raw_path.read_text(encoding="utf-8") + else: + raise FileNotFoundError(f"المادة الخام غير موجودة: {raw_id}") + + title = metadata.get("title", f"Promoted from raw — {raw_id}") + page = await self.ingest( + source_type="raw_promotion", + content=raw_content, + metadata={ + "title": title, + "title_ar": metadata.get("title_ar", f"مروّج من مادة خام — {raw_id}"), + "provenance": f"Promoted from raw material {raw_id}", + "confidence": metadata.get("confidence", "medium"), + **metadata, + }, + ) + logger.info("تمت ترقية المادة الخام إلى صفحة ويكي: %s → %s", raw_id, page.title) + return page + + +# Global singleton +knowledge_brain = KnowledgeBrain() diff --git a/salesflow-saas/backend/app/services/memory_engine.py b/salesflow-saas/backend/app/services/memory_engine.py new file mode 100644 index 00000000..5755c814 --- /dev/null +++ b/salesflow-saas/backend/app/services/memory_engine.py @@ -0,0 +1,615 @@ +""" +Memory Engine — Dealix MemPalace Pattern +Pluggable memory adapter with evaluation and quality checks. +Supports Redis (production) and file-based (local/offline) backends. 
+""" +import json +import logging +import os +import re +import uuid +from abc import ABC, abstractmethod +from collections import defaultdict +from datetime import datetime, timedelta, timezone +from difflib import SequenceMatcher +from pathlib import Path +from typing import Any, Optional + +from pydantic import BaseModel, Field + +logger = logging.getLogger(__name__) + +MEMORY_BASE_DIR = Path(__file__).resolve().parents[4] / "memory" +STALENESS_DAYS = 30 + + +# --------------------------------------------------------------------------- +# Models — نماذج البيانات +# --------------------------------------------------------------------------- + +class MemoryDomain(str): + """Domain tag for memory items.""" + pass + + +class MemoryItem(BaseModel): + """A single memory item — عنصر ذاكرة واحد""" + id: str = Field(default_factory=lambda: str(uuid.uuid4())) + domain: str = "project" # project, customer, deal, competitor, prompt + content: str + metadata: dict[str, Any] = {} + source: str = "" + confidence: float = Field(default=0.7, ge=0.0, le=1.0) + created_at: datetime = Field(default_factory=lambda: datetime.now(timezone.utc)) + updated_at: datetime = Field(default_factory=lambda: datetime.now(timezone.utc)) + access_count: int = 0 + is_canonical: bool = True # True = business data, False = derived/AI-generated + retention_days: int = 0 # 0 = permanent + tenant_id: str = "" + tags: list[str] = [] + + class Config: + json_schema_extra = { + "example": { + "domain": "customer", + "content": "Acme Corp prefers WhatsApp for all communication", + "source": "customer_interview_2026-04-10", + "confidence": 0.9, + "is_canonical": True, + } + } + + +class EvalResult(BaseModel): + """Evaluation result for memory quality — نتيجة تقييم جودة الذاكرة""" + total_queries: int = 0 + correct_retrievals: int = 0 + precision: float = 0.0 + recall: float = 0.0 + avg_rank: float = 0.0 + message_ar: str = "" + + +class MemoryStats(BaseModel): + """Memory store statistics — إحصائيات مخزن 
class MemoryAdapter(ABC):
    """Storage interface for memory items.

    Concrete backends implement every method below, so callers can switch
    backends without changing their own code.
    """

    @abstractmethod
    async def store(self, item: MemoryItem) -> str:
        """Persist ``item`` and return its id."""

    @abstractmethod
    async def retrieve(
        self, query: str, domain: str = None, limit: int = 5
    ) -> list[MemoryItem]:
        """Return up to ``limit`` items matching ``query``.

        ``domain`` optionally restricts the search to one domain; ``None``
        searches all domains.
        """

    @abstractmethod
    async def update(self, item_id: str, content: str) -> bool:
        """Replace the content of an item.

        Returns a bool success flag.
        NOTE(review): presumably False means the item was not found — confirm
        against the concrete adapters.
        """

    @abstractmethod
    async def delete(self, item_id: str) -> bool:
        """Delete an item.

        Returns a bool success flag.
        NOTE(review): presumably False means the item was not found — confirm
        against the concrete adapters.
        """

    @abstractmethod
    async def search_by_entity(
        self, entity_type: str, entity_id: str
    ) -> list[MemoryItem]:
        """Return the items that reference the given entity.

        NOTE(review): ``entity_type`` looks like a metadata key name (for
        example ``lead_id``) and ``entity_id`` its value — confirm against the
        concrete adapters.
        """

    @abstractmethod
    async def get_stats(self) -> MemoryStats:
        """Return aggregate statistics for the store."""

    @abstractmethod
    async def list_all(self, domain: str = None) -> list[MemoryItem]:
        """Return every stored item, optionally restricted to one domain."""
+ """ + + KEY_PREFIX = "dealix:memory:" + + def __init__(self, redis_client: Any = None, redis_url: str = None): + self._redis = redis_client + self._redis_url = redis_url or os.getenv("REDIS_URL", "redis://localhost:6379/0") + self._connected = False + + async def _ensure_connection(self) -> None: + if self._redis is not None and self._connected: + return + try: + import redis.asyncio as aioredis + self._redis = aioredis.from_url( + self._redis_url, decode_responses=True + ) + await self._redis.ping() + self._connected = True + logger.info("تم الاتصال بريدس: %s", self._redis_url) + except Exception as exc: + logger.warning("فشل الاتصال بريدس: %s — سيتم استخدام الذاكرة المحلية", exc) + self._connected = False + raise + + def _key(self, item_id: str) -> str: + return f"{self.KEY_PREFIX}{item_id}" + + def _domain_key(self, domain: str) -> str: + return f"{self.KEY_PREFIX}domain:{domain}" + + def _entity_key(self, entity_type: str, entity_id: str) -> str: + return f"{self.KEY_PREFIX}entity:{entity_type}:{entity_id}" + + async def store(self, item: MemoryItem) -> str: + await self._ensure_connection() + data = item.model_dump(mode="json") + data["created_at"] = item.created_at.isoformat() + data["updated_at"] = item.updated_at.isoformat() + pipe = self._redis.pipeline() + pipe.set(self._key(item.id), json.dumps(data, ensure_ascii=False)) + pipe.sadd(self._domain_key(item.domain), item.id) + if item.retention_days > 0: + pipe.expire(self._key(item.id), item.retention_days * 86400) + # Index by entity if metadata has entity references + for etype in ("lead_id", "deal_id", "company_id", "tenant_id"): + if etype in item.metadata: + pipe.sadd(self._entity_key(etype, str(item.metadata[etype])), item.id) + await pipe.execute() + logger.info("تم تخزين عنصر ذاكرة في ريدس: %s (%s)", item.id, item.domain) + return item.id + + async def retrieve( + self, query: str, domain: str = None, limit: int = 5 + ) -> list[MemoryItem]: + await self._ensure_connection() + if domain: + ids = 
await self._redis.smembers(self._domain_key(domain)) + else: + all_keys = [] + async for key in self._redis.scan_iter(f"{self.KEY_PREFIX}[0-9a-f]*"): + all_keys.append(key.replace(self.KEY_PREFIX, "")) + ids = all_keys + + items: list[MemoryItem] = [] + query_lower = query.lower() + query_words = set(query_lower.split()) + + for item_id in ids: + raw = await self._redis.get(self._key(item_id)) + if not raw: + continue + data = json.loads(raw) + content_lower = data.get("content", "").lower() + content_words = set(content_lower.split()) + overlap = len(query_words & content_words) + if overlap > 0 or query_lower in content_lower: + data["created_at"] = datetime.fromisoformat(data["created_at"]) + data["updated_at"] = datetime.fromisoformat(data["updated_at"]) + item = MemoryItem(**data) + item.access_count += 1 + items.append(item) + + items.sort(key=lambda x: x.confidence, reverse=True) + return items[:limit] + + async def update(self, item_id: str, content: str) -> bool: + await self._ensure_connection() + raw = await self._redis.get(self._key(item_id)) + if not raw: + return False + data = json.loads(raw) + data["content"] = content + data["updated_at"] = datetime.now(timezone.utc).isoformat() + await self._redis.set(self._key(item_id), json.dumps(data, ensure_ascii=False)) + return True + + async def delete(self, item_id: str) -> bool: + await self._ensure_connection() + raw = await self._redis.get(self._key(item_id)) + if not raw: + return False + data = json.loads(raw) + domain = data.get("domain", "project") + pipe = self._redis.pipeline() + pipe.delete(self._key(item_id)) + pipe.srem(self._domain_key(domain), item_id) + await pipe.execute() + return True + + async def search_by_entity( + self, entity_type: str, entity_id: str + ) -> list[MemoryItem]: + await self._ensure_connection() + ids = await self._redis.smembers(self._entity_key(entity_type, entity_id)) + items: list[MemoryItem] = [] + for item_id in ids: + raw = await 
self._redis.get(self._key(item_id)) + if raw: + data = json.loads(raw) + data["created_at"] = datetime.fromisoformat(data["created_at"]) + data["updated_at"] = datetime.fromisoformat(data["updated_at"]) + items.append(MemoryItem(**data)) + return items + + async def get_stats(self) -> MemoryStats: + await self._ensure_connection() + items = await self.list_all() + return _compute_stats(items) + + async def list_all(self, domain: str = None) -> list[MemoryItem]: + await self._ensure_connection() + if domain: + ids = await self._redis.smembers(self._domain_key(domain)) + else: + ids = set() + async for key in self._redis.scan_iter(f"{self.KEY_PREFIX}[0-9a-f]*"): + ids.add(key.replace(self.KEY_PREFIX, "")) + items = [] + for item_id in ids: + raw = await self._redis.get(self._key(item_id)) + if raw: + data = json.loads(raw) + data["created_at"] = datetime.fromisoformat(data["created_at"]) + data["updated_at"] = datetime.fromisoformat(data["updated_at"]) + items.append(MemoryItem(**data)) + return items + + +# --------------------------------------------------------------------------- +# File Adapter — محول الملفات +# --------------------------------------------------------------------------- + +class FileMemoryAdapter(MemoryAdapter): + """ + File-based memory for local/offline use. + Stores as JSON files in memory/ directory. + ذاكرة مبنية على الملفات للاستخدام المحلي/غير المتصل. 
+ """ + + def __init__(self, base_dir: Path = None): + self.base_dir = base_dir or MEMORY_BASE_DIR / "_store" + self.base_dir.mkdir(parents=True, exist_ok=True) + + def _item_path(self, item_id: str) -> Path: + return self.base_dir / f"{item_id}.json" + + def _domain_dir(self, domain: str) -> Path: + d = self.base_dir / domain + d.mkdir(parents=True, exist_ok=True) + return d + + async def store(self, item: MemoryItem) -> str: + file_path = self._domain_dir(item.domain) / f"{item.id}.json" + data = item.model_dump(mode="json") + data["created_at"] = item.created_at.isoformat() + data["updated_at"] = item.updated_at.isoformat() + file_path.write_text(json.dumps(data, ensure_ascii=False, indent=2), encoding="utf-8") + logger.info("تم تخزين عنصر ذاكرة في ملف: %s (%s)", item.id, item.domain) + return item.id + + async def retrieve( + self, query: str, domain: str = None, limit: int = 5 + ) -> list[MemoryItem]: + items = await self.list_all(domain) + query_lower = query.lower() + query_words = set(query_lower.split()) + scored: list[tuple[MemoryItem, float]] = [] + + for item in items: + content_lower = item.content.lower() + content_words = set(content_lower.split()) + overlap = len(query_words & content_words) + if overlap > 0 or query_lower in content_lower: + score = (overlap / max(len(query_words), 1)) * item.confidence + scored.append((item, score)) + + scored.sort(key=lambda x: x[1], reverse=True) + results = [item for item, _ in scored[:limit]] + for item in results: + item.access_count += 1 + await self._write_item(item) + return results + + async def update(self, item_id: str, content: str) -> bool: + item = await self._find_item(item_id) + if not item: + return False + item.content = content + item.updated_at = datetime.now(timezone.utc) + await self._write_item(item) + return True + + async def delete(self, item_id: str) -> bool: + for domain_dir in self.base_dir.iterdir(): + if not domain_dir.is_dir(): + continue + path = domain_dir / f"{item_id}.json" + if 
path.exists(): + path.unlink() + logger.info("تم حذف عنصر ذاكرة: %s", item_id) + return True + return False + + async def search_by_entity( + self, entity_type: str, entity_id: str + ) -> list[MemoryItem]: + all_items = await self.list_all() + return [ + item for item in all_items + if str(item.metadata.get(entity_type, "")) == str(entity_id) + ] + + async def get_stats(self) -> MemoryStats: + items = await self.list_all() + return _compute_stats(items) + + async def list_all(self, domain: str = None) -> list[MemoryItem]: + items: list[MemoryItem] = [] + search_dirs = ( + [self._domain_dir(domain)] if domain + else [d for d in self.base_dir.iterdir() if d.is_dir()] + ) + for dir_path in search_dirs: + for json_file in dir_path.glob("*.json"): + try: + data = json.loads(json_file.read_text(encoding="utf-8")) + if "created_at" in data and isinstance(data["created_at"], str): + data["created_at"] = datetime.fromisoformat(data["created_at"]) + if "updated_at" in data and isinstance(data["updated_at"], str): + data["updated_at"] = datetime.fromisoformat(data["updated_at"]) + items.append(MemoryItem(**data)) + except Exception as exc: + logger.warning("فشل تحميل عنصر ذاكرة %s: %s", json_file.name, exc) + return items + + async def _find_item(self, item_id: str) -> Optional[MemoryItem]: + for domain_dir in self.base_dir.iterdir(): + if not domain_dir.is_dir(): + continue + path = domain_dir / f"{item_id}.json" + if path.exists(): + data = json.loads(path.read_text(encoding="utf-8")) + if isinstance(data.get("created_at"), str): + data["created_at"] = datetime.fromisoformat(data["created_at"]) + if isinstance(data.get("updated_at"), str): + data["updated_at"] = datetime.fromisoformat(data["updated_at"]) + return MemoryItem(**data) + return None + + async def _write_item(self, item: MemoryItem) -> None: + file_path = self._domain_dir(item.domain) / f"{item.id}.json" + data = item.model_dump(mode="json") + data["created_at"] = item.created_at.isoformat() + data["updated_at"] 
def _compute_stats(items: list[MemoryItem]) -> MemoryStats:
    """Aggregate summary statistics over a list of memory items.

    Returns a ``MemoryStats`` holding the item count, per-domain counts,
    canonical and derived counts, the mean confidence rounded to four places,
    and the earliest and latest ``created_at``. An empty list yields a
    default ``MemoryStats`` carrying only the "no items" message.
    """
    if not items:
        return MemoryStats(message_ar="لا توجد عناصر في الذاكرة")

    total = len(items)
    domain_counts: dict[str, int] = {}
    confidence_sum = 0.0
    for entry in items:
        domain_counts[entry.domain] = domain_counts.get(entry.domain, 0) + 1
        confidence_sum += entry.confidence

    canonical_total = sum(1 for entry in items if entry.is_canonical)
    derived_total = total - canonical_total
    created_stamps = [entry.created_at for entry in items]

    return MemoryStats(
        total_items=total,
        by_domain=domain_counts,
        canonical_count=canonical_total,
        derived_count=derived_total,
        avg_confidence=round(confidence_sum / total, 4),
        oldest_item=min(created_stamps),
        newest_item=max(created_stamps),
        message_ar=f"إجمالي العناصر: {total}، المعتمدة: {canonical_total}، المشتقة: {derived_total}",
    )
async def check_staleness(self, domain: str = None, days: Optional[int] = None) -> list[MemoryItem]:
    """
    Return the items whose ``updated_at`` is older than the staleness window.

    Staleness is judged on the last *update* time, not on the last read:
    this method only ever inspects ``item.updated_at``.

    Args:
        domain: Passed through to the adapter's ``list_all``.
        days: Window length in days. ``None`` (the default) uses the
            module-level ``STALENESS_DAYS``.

    Returns:
        The stale items, in the order the adapter returned them.
    """
    window_days = STALENESS_DAYS if days is None else days
    items = await self._adapter.list_all(domain)
    cutoff = datetime.now(timezone.utc) - timedelta(days=window_days)

    def _as_utc(moment: datetime) -> datetime:
        # Comparing a naive datetime with the aware cutoff raises TypeError.
        # A timestamp stored without an offset is assumed to be UTC.
        if moment.tzinfo is None:
            return moment.replace(tzinfo=timezone.utc)
        return moment

    return [item for item in items if _as_utc(item.updated_at) < cutoff]
async def check_contradictions(self, domain: str = None) -> list[tuple[MemoryItem, MemoryItem]]:
    """
    Flag pairs of same-domain items that look like they contradict each other.

    Heuristic: two items are reported when their lower-cased,
    whitespace-split word sets share more than three words but contain
    different sets of negation markers (English and Arabic).

    Returns:
        ``(earlier, later)`` pairs in list order.
    """
    negation_markers = {"not", "no", "never", "cannot", "لا", "ليس", "لن", "لم", "غير"}
    entries = await self._adapter.list_all(domain)
    # Tokenise each item once instead of once per pair.
    profiles = [(entry, set(entry.content.lower().split())) for entry in entries]

    flagged: list[tuple[MemoryItem, MemoryItem]] = []
    for position, (first, first_words) in enumerate(profiles):
        first_negations = first_words & negation_markers
        for second, second_words in profiles[position + 1:]:
            if first.domain != second.domain:
                continue
            overlap = first_words & second_words
            if len(overlap) <= 3:
                continue
            # Much shared vocabulary but different negations: report the pair.
            if first_negations != (second_words & negation_markers):
                flagged.append((first, second))
    return flagged
def create_memory_adapter(backend: str = None) -> MemoryAdapter:
    """
    Create the memory adapter selected by configuration.

    Args:
        backend: Backend name. When falsy, the ``MEMORY_BACKEND``
            environment variable is used, defaulting to ``"file"``.

    Returns:
        A ``RedisMemoryAdapter`` for ``"redis"``, otherwise a
        ``FileMemoryAdapter``.

    The name is compared case-insensitively and without surrounding
    whitespace, so ``MEMORY_BACKEND=Redis`` selects Redis rather than
    silently falling back to files. Any other unrecognised value still
    falls back to the file adapter, but is now logged as a warning.
    """
    chosen = (backend or os.getenv("MEMORY_BACKEND") or "file").strip().lower()
    if chosen == "redis":
        return RedisMemoryAdapter()
    if chosen != "file":
        logger.warning(
            "Unknown memory backend %r; falling back to the file adapter", chosen
        )
    return FileMemoryAdapter()
class FollowUp(BaseModel):
    """A pending follow-up task — مهمة متابعة معلّقة"""
    # Text of the task; complete_followup() matches it by case-insensitive substring.
    task: str
    # Optional Arabic wording of the same task.
    task_ar: str = ""
    # Optional deadline; the restore prompt prints it as YYYY-MM-DD when set.
    due_date: Optional[datetime] = None
    # True once the task is done; completed tasks are left out of the restore prompt.
    completed: bool = False
    # Creation time, defaulting to the current time in UTC.
    created_at: datetime = Field(default_factory=lambda: datetime.now(timezone.utc))
    # Free-form assignee name; empty when unassigned.
    assigned_to: str = ""
str = "" + context_summary_ar: str = "" + created_at: datetime = Field(default_factory=lambda: datetime.now(timezone.utc)) + updated_at: datetime = Field(default_factory=lambda: datetime.now(timezone.utc)) + tags: list[str] = [] + tenant_id: str = "" + + class Config: + json_schema_extra = { + "example": { + "project": "dealix", + "active_workstreams": ["cpq-enhancement", "pdpl-audit"], + "context_summary": "Working on CPQ Arabic PDF generation and PDPL consent expiry.", + "context_summary_ar": "العمل على توليد PDF عربي للتسعير وانتهاء موافقة حماية البيانات.", + } + } + + +# --------------------------------------------------------------------------- +# Session Continuity Service — خدمة استمرارية الجلسة +# --------------------------------------------------------------------------- + +class SessionContinuity: + """ + Maintain context across AI sessions. + الحفاظ على السياق عبر جلسات الذكاء الاصطناعي. + """ + + def __init__(self, sessions_dir: Path = None): + self.sessions_dir = sessions_dir or SESSIONS_DIR + self.sessions_dir.mkdir(parents=True, exist_ok=True) + self._current: Optional[SessionState] = None + + def _session_path(self, session_id: str) -> Path: + return self.sessions_dir / f"{session_id}.json" + + def _serialize_state(self, state: SessionState) -> str: + data = state.model_dump(mode="json") + # Convert datetime objects to ISO strings for JSON + for key in ("created_at", "updated_at"): + if isinstance(data.get(key), datetime): + data[key] = data[key].isoformat() + for decision in data.get("last_decisions", []): + if isinstance(decision.get("timestamp"), datetime): + decision["timestamp"] = decision["timestamp"].isoformat() + for failure in data.get("recent_failures", []): + if isinstance(failure.get("timestamp"), datetime): + failure["timestamp"] = failure["timestamp"].isoformat() + for win in data.get("recent_wins", []): + if isinstance(win.get("timestamp"), datetime): + win["timestamp"] = win["timestamp"].isoformat() + for followup in 
data.get("pending_followups", []): + if isinstance(followup.get("due_date"), datetime): + followup["due_date"] = followup["due_date"].isoformat() + if isinstance(followup.get("created_at"), datetime): + followup["created_at"] = followup["created_at"].isoformat() + return json.dumps(data, ensure_ascii=False, indent=2) + + def _deserialize_state(self, raw: str) -> SessionState: + data = json.loads(raw) + for key in ("created_at", "updated_at"): + if isinstance(data.get(key), str): + data[key] = datetime.fromisoformat(data[key]) + for decision in data.get("last_decisions", []): + if isinstance(decision.get("timestamp"), str): + decision["timestamp"] = datetime.fromisoformat(decision["timestamp"]) + for failure in data.get("recent_failures", []): + if isinstance(failure.get("timestamp"), str): + failure["timestamp"] = datetime.fromisoformat(failure["timestamp"]) + for win in data.get("recent_wins", []): + if isinstance(win.get("timestamp"), str): + win["timestamp"] = datetime.fromisoformat(win["timestamp"]) + for followup in data.get("pending_followups", []): + if isinstance(followup.get("due_date"), str) and followup["due_date"]: + followup["due_date"] = datetime.fromisoformat(followup["due_date"]) + if isinstance(followup.get("created_at"), str): + followup["created_at"] = datetime.fromisoformat(followup["created_at"]) + return SessionState(**data) + + async def save_state(self, state: SessionState) -> str: + """ + Persist session state to disk. + حفظ حالة الجلسة على القرص. + """ + state.updated_at = datetime.now(timezone.utc) + path = self._session_path(state.session_id) + path.write_text(self._serialize_state(state), encoding="utf-8") + self._current = state + logger.info("تم حفظ حالة الجلسة: %s", state.session_id) + return state.session_id + + async def restore_state(self, session_id: str = None) -> SessionState: + """ + Restore session state. If no session_id, restore the latest. + استعادة حالة الجلسة. إذا لم يُحدد معرف، يتم استعادة الأحدث. 
async def get_restore_prompt(self) -> str:
    """
    Build a Markdown prompt that summarises the session for a new AI session.

    Uses the in-memory current state, or restores the latest session when
    none is loaded. Sections with no content are left out. Decisions are
    limited to the last 5, failures and wins to the last 3, and only
    follow-ups that are not completed are listed.

    Returns:
        The prompt as a single newline-joined string.
    """
    state = self._current
    if not state:
        state = await self.restore_state()

    lines = [
        "# Session Restore — استعادة الجلسة",
        f"**Project**: {state.project}",
        f"**Session**: {state.session_id}",
        f"**Last Updated**: {state.updated_at.isoformat()}",
        "",
    ]

    # Show the context section when either language variant is present.
    # Previously an Arabic-only summary was dropped.
    if state.context_summary or state.context_summary_ar:
        lines.append("## Context (السياق)")
        if state.context_summary:
            lines.append(state.context_summary)
        if state.context_summary_ar:
            lines.append(state.context_summary_ar)
        lines.append("")

    if state.active_workstreams:
        lines.append("## Active Workstreams (مسارات العمل النشطة)")
        lines.extend(f"- {ws}" for ws in state.active_workstreams)
        lines.append("")

    if state.last_decisions:
        lines.append("## Recent Decisions (القرارات الأخيرة)")
        for decision in state.last_decisions[-5:]:
            ts = decision.timestamp.strftime("%Y-%m-%d %H:%M")
            lines.append(f"- [{ts}] {decision.decision}")
            if decision.decision_ar:
                lines.append(f" {decision.decision_ar}")
        lines.append("")

    if state.open_questions:
        lines.append("## Open Questions (أسئلة مفتوحة)")
        lines.extend(f"- {question}" for question in state.open_questions)
        lines.append("")

    if state.recent_failures:
        lines.append("## Recent Failures (الإخفاقات الأخيرة)")
        for failure in state.recent_failures[-3:]:
            lines.append(f"- {failure.description}")
            if failure.resolution:
                lines.append(f" Resolution: {failure.resolution}")
        lines.append("")

    if state.recent_wins:
        lines.append("## Recent Wins (النجاحات الأخيرة)")
        lines.extend(f"- {win.description}" for win in state.recent_wins[-3:])
        lines.append("")

    pending = [fu for fu in state.pending_followups if not fu.completed]
    if pending:
        lines.append("## Pending Follow-ups (متابعات معلّقة)")
        for fu in pending:
            due = f" (due: {fu.due_date.strftime('%Y-%m-%d')})" if fu.due_date else ""
            lines.append(f"- {fu.task}{due}")
        lines.append("")

    lines.append("---")
    lines.append("Continue from this state. Prioritize pending follow-ups and open questions.")
    lines.append("استمر من هذه الحالة. أعطِ الأولوية للمتابعات المعلّقة والأسئلة المفتوحة.")

    return "\n".join(lines)
async def complete_followup(self, task_substring: str) -> bool:
    """
    Mark the first open follow-up whose task text contains the substring.

    Matching is case-insensitive. The state is saved as soon as a
    follow-up is marked completed.

    Args:
        task_substring: Text to look for inside ``FollowUp.task``.

    Returns:
        True when a follow-up was completed. False when nothing matched,
        or when ``task_substring`` is empty or whitespace only.
    """
    # An empty string is a substring of every task, so it used to close
    # whichever follow-up happened to be first. Treat it as "no match".
    if not task_substring.strip():
        return False

    if not self._current:
        self._current = await self.restore_state()

    task_lower = task_substring.lower()
    for fu in self._current.pending_followups:
        if task_lower in fu.task.lower() and not fu.completed:
            fu.completed = True
            await self.save_state(self._current)
            logger.info("تم إكمال متابعة: %s", fu.task[:80])
            return True
    return False
async def cleanup_old_sessions(self, days: int = 30) -> int:
    """
    Delete session files whose ``updated_at`` is older than ``days`` days.

    Files that cannot be read or parsed, or that carry no ``updated_at``
    string, are left in place; failures are logged as warnings.

    Args:
        days: Age threshold in days.

    Returns:
        The number of files removed.
    """
    cutoff = datetime.now(timezone.utc) - timedelta(days=days)
    removed = 0
    for f in self.sessions_dir.glob("*.json"):
        try:
            data = json.loads(f.read_text(encoding="utf-8"))
            updated = data.get("updated_at", "")
            if not (isinstance(updated, str) and updated):
                continue
            # A trailing "Z" (UTC) is only accepted by fromisoformat from
            # Python 3.11 on, so rewrite it to an explicit offset.
            if updated[-1] in "Zz":
                updated = updated[:-1] + "+00:00"
            ts = datetime.fromisoformat(updated)
            # A naive timestamp cannot be compared with the aware cutoff;
            # it is assumed to be UTC.
            if ts.tzinfo is None:
                ts = ts.replace(tzinfo=timezone.utc)
            if ts < cutoff:
                f.unlink()
                removed += 1
        except (OSError, ValueError, TypeError, AttributeError) as exc:
            logger.warning("فشل معالجة ملف الجلسة %s: %s", f.name, exc)
    logger.info("تم حذف %d جلسة قديمة (أقدم من %d يوم)", removed, days)
    return removed
+ """ + sessions: list[dict[str, Any]] = [] + for f in sorted(self.sessions_dir.glob("*.json"), key=lambda p: p.stat().st_mtime, reverse=True): + if len(sessions) >= limit: + break + try: + data = json.loads(f.read_text(encoding="utf-8")) + sessions.append({ + "session_id": data.get("session_id", f.stem), + "project": data.get("project", ""), + "updated_at": data.get("updated_at", ""), + "workstreams": data.get("active_workstreams", []), + "decisions_count": len(data.get("last_decisions", [])), + "followups_pending": sum( + 1 for fu in data.get("pending_followups", []) + if not fu.get("completed", False) + ), + }) + except Exception: + continue + return sessions + + +# --------------------------------------------------------------------------- +# Global singleton +# --------------------------------------------------------------------------- + +session_continuity = SessionContinuity() diff --git a/salesflow-saas/backend/app/services/tool_receipts.py b/salesflow-saas/backend/app/services/tool_receipts.py new file mode 100644 index 00000000..937d170c --- /dev/null +++ b/salesflow-saas/backend/app/services/tool_receipts.py @@ -0,0 +1,417 @@ +""" +Tool Receipts — Dealix ToolProof Enhancement +Signed execution receipts, pre-execution policy, and trust analytics. +Extends tool_verification.py with cryptographic receipts and policy enforcement. 
class PolicyDecision(BaseModel):
    """Pre-execution policy decision — قرار السياسة قبل التنفيذ"""
    # Outcome of the policy check: allow, block or hold.
    decision: PolicyDecisionType
    # Human-readable explanation in English.
    reason: str
    # The same explanation in Arabic.
    reason_ar: str
    # Name of the tool the decision applies to.
    tool_name: str
    # Role that must approve a held tool ("manager" or "admin"); None otherwise.
    requires_approval_from: Optional[str] = None
    # True when a messaging tool was blocked because consent was missing.
    pdpl_consent_required: bool = False
    # Budget limit minus the amount already spent, set only on a budget hold.
    budget_remaining: Optional[float] = None
timestamp).""" + payload = ( + f"{self.tool_name}|" + f"{_stable_dict_str(self.parameters)}|" + f"{self.execution_result}|" + f"{self.timestamp.isoformat()}" + ) + self.hash_signature = hashlib.sha256(payload.encode("utf-8")).hexdigest() + return self.hash_signature + + def normalize_result(self) -> str: + """Normalize execution result for comparison.""" + raw = self.execution_result.lower().strip() + for noise in ["ok", "success", "done", "completed", "تم", "نجح"]: + raw = raw.replace(noise, "SUCCESS") + for err in ["error", "fail", "exception", "خطأ", "فشل"]: + raw = raw.replace(err, "ERROR") + self.normalized_result = raw + return raw + + +def _stable_dict_str(d: dict) -> str: + """Deterministic string representation of a dict for hashing.""" + return "|".join(f"{k}={v}" for k, v in sorted(d.items())) + + +# --------------------------------------------------------------------------- +# Pre-Execution Policy — سياسة ما قبل التنفيذ +# --------------------------------------------------------------------------- + +class PreExecutionPolicy: + """ + Decide allow/block/hold before tool execution. + تحديد السماح/الحظر/التعليق قبل تنفيذ الأداة. 
+ """ + + SAFE_TOOLS: set[str] = { + "read_file", "search", "query_db_readonly", "get_status", + "list_leads", "get_deal", "get_pipeline", "view_analytics", + "check_consent", "get_sequence_status", + } + + HOLD_TOOLS: set[str] = { + "send_message", "send_whatsapp", "send_email", "send_sms", + "update_deal", "assign_lead", "create_proposal", + "change_stage", "update_score", "create_sequence", + "schedule_meeting", "update_territory", + } + + BLOCK_TOOLS: set[str] = { + "delete_tenant", "drop_table", "bulk_delete", "export_all_data", + "reset_database", "delete_all_leads", "purge_audit_log", + "disable_pdpl", "bypass_consent", "modify_permissions_bulk", + } + + MESSAGING_TOOLS: set[str] = { + "send_message", "send_whatsapp", "send_email", "send_sms", + } + + ROLE_PERMISSIONS: dict[str, set[str]] = { + "owner": SAFE_TOOLS | HOLD_TOOLS, + "admin": SAFE_TOOLS | HOLD_TOOLS, + "manager": SAFE_TOOLS | {"update_deal", "assign_lead", "create_proposal", "change_stage"}, + "sales_rep": SAFE_TOOLS | {"send_message", "send_whatsapp", "send_email", "update_deal"}, + "viewer": SAFE_TOOLS, + } + + DEFAULT_BUDGET_LIMIT: float = 100.0 # SAR per session + + def __init__(self): + self._session_costs: dict[str, float] = defaultdict(float) + + def evaluate( + self, + tool_name: str, + params: dict[str, Any], + user_context: dict[str, Any], + ) -> PolicyDecision: + """ + Check tool against policy classes, role, PDPL, and budget. + فحص الأداة مقابل فئات السياسة والدور والموافقة والميزانية. 
+ """ + user_role = user_context.get("role", "viewer") + session_id = user_context.get("session_id", "unknown") + has_consent = user_context.get("has_consent", False) + budget_limit = user_context.get("budget_limit", self.DEFAULT_BUDGET_LIMIT) + + # Class C — absolute block + if tool_name in self.BLOCK_TOOLS: + logger.warning( + "محظور: أداة %s محظورة بالكامل (المستخدم: %s)", + tool_name, user_context.get("user_id", "unknown"), + ) + return PolicyDecision( + decision=PolicyDecisionType.BLOCK, + reason=f"Tool '{tool_name}' is in the BLOCK list. This action is forbidden.", + reason_ar=f"الأداة '{tool_name}' محظورة. هذا الإجراء ممنوع.", + tool_name=tool_name, + ) + + # Role check + allowed_tools = self.ROLE_PERMISSIONS.get(user_role, self.SAFE_TOOLS) + if tool_name not in allowed_tools and tool_name not in self.SAFE_TOOLS: + return PolicyDecision( + decision=PolicyDecisionType.BLOCK, + reason=f"Role '{user_role}' lacks permission for tool '{tool_name}'.", + reason_ar=f"الدور '{user_role}' لا يملك صلاحية استخدام الأداة '{tool_name}'.", + tool_name=tool_name, + ) + + # PDPL consent check for messaging + if tool_name in self.MESSAGING_TOOLS and not has_consent: + return PolicyDecision( + decision=PolicyDecisionType.BLOCK, + reason="PDPL consent required before sending messages.", + reason_ar="مطلوب موافقة نظام حماية البيانات قبل إرسال الرسائل.", + tool_name=tool_name, + pdpl_consent_required=True, + ) + + # Budget check + estimated_cost = self._estimate_cost(tool_name, params) + current_spent = self._session_costs[session_id] + if current_spent + estimated_cost > budget_limit: + return PolicyDecision( + decision=PolicyDecisionType.HOLD, + reason=f"Budget limit would be exceeded. Spent: {current_spent:.2f}, " + f"estimated: {estimated_cost:.2f}, limit: {budget_limit:.2f} SAR.", + reason_ar=f"سيتم تجاوز حد الميزانية. 
المصروف: {current_spent:.2f}، " + f"التقدير: {estimated_cost:.2f}، الحد: {budget_limit:.2f} ريال.", + tool_name=tool_name, + budget_remaining=budget_limit - current_spent, + ) + + # Hold tools need approval + if tool_name in self.HOLD_TOOLS: + approver = "manager" if user_role == "sales_rep" else "admin" + return PolicyDecision( + decision=PolicyDecisionType.HOLD, + reason=f"Tool '{tool_name}' requires approval before execution.", + reason_ar=f"الأداة '{tool_name}' تتطلب موافقة قبل التنفيذ.", + tool_name=tool_name, + requires_approval_from=approver, + ) + + # Safe tools — allow + return PolicyDecision( + decision=PolicyDecisionType.ALLOW, + reason=f"Tool '{tool_name}' is safe for execution.", + reason_ar=f"الأداة '{tool_name}' آمنة للتنفيذ.", + tool_name=tool_name, + ) + + def record_cost(self, session_id: str, cost: float) -> None: + """Record actual cost for budget tracking.""" + self._session_costs[session_id] += cost + + def _estimate_cost(self, tool_name: str, params: dict) -> float: + """Estimate cost in SAR for a tool call.""" + cost_map = { + "send_whatsapp": 0.15, + "send_sms": 0.08, + "send_email": 0.02, + "send_message": 0.10, + "create_proposal": 0.50, + "query_db_readonly": 0.001, + "search": 0.001, + } + base = cost_map.get(tool_name, 0.01) + # Bulk operations cost more + if "count" in params or "bulk" in tool_name: + base *= params.get("count", 1) + return base + + +# --------------------------------------------------------------------------- +# Receipt Store — مخزن الإيصالات +# --------------------------------------------------------------------------- + +class ReceiptStore: + """In-memory receipt storage with query capabilities.""" + + def __init__(self, max_size: int = 50000): + self._receipts: list[ToolReceipt] = [] + self._max_size = max_size + + def store(self, receipt: ToolReceipt) -> str: + """Store a receipt and return its ID.""" + receipt.compute_hash() + receipt.normalize_result() + self._receipts.append(receipt) + if len(self._receipts) > 
def query(
    self,
    agent_id: str = None,
    tool_name: str = None,
    verdict: VerificationVerdict = None,
    since: datetime = None,
    limit: int = 100,
) -> list[ToolReceipt]:
    """
    Return the most recent receipts that match every given filter.

    Args:
        agent_id: Keep only receipts from this agent (falsy = no filter).
        tool_name: Keep only receipts for this tool (falsy = no filter).
        verdict: Keep only receipts with this verdict (falsy = no filter).
        since: Keep only receipts with ``timestamp >= since``.
        limit: Maximum number of receipts to return.

    Returns:
        Up to ``limit`` matching receipts, oldest first, taken from the
        end of the store. A ``limit`` of zero or less returns an empty
        list. Previously ``limit=0`` returned every match, because
        ``results[-0:]`` is the whole list.
    """
    if limit <= 0:
        return []
    matches = [
        r for r in self._receipts
        if (not agent_id or r.agent_id == agent_id)
        and (not tool_name or r.tool_name == tool_name)
        and (not verdict or r.verification_verdict == verdict)
        and (not since or r.timestamp >= since)
    ]
    return matches[-limit:]
def get_cost_by_agent(self, period_days: int = 30) -> dict[str, float]:
    """
    Sum the estimated cost per agent over the recent period.

    Args:
        period_days: How many days back from now (UTC) to include.

    Returns:
        A mapping of agent id to total ``cost_estimate``, rounded to
        4 decimals, covering at most the 50000 most recent receipts
        in the window.
    """
    window_start = datetime.now(timezone.utc) - timedelta(days=period_days)
    totals: dict[str, float] = defaultdict(float)
    for receipt in self._store.query(since=window_start, limit=50000):
        totals[receipt.agent_id] += receipt.cost_estimate
    return {agent: round(amount, 4) for agent, amount in totals.items()}
+ """ + all_receipts = self._store.query(limit=5000) + suspects = [] + for r in all_receipts: + if r.verification_verdict == VerificationVerdict.CONTRADICTED: + suspects.append(r) + elif ( + r.verification_verdict == VerificationVerdict.UNVERIFIED + and r.execution_result == "" + and r.tool_name not in PreExecutionPolicy.SAFE_TOOLS + ): + suspects.append(r) + return suspects + + def get_summary(self, agent_id: str = None) -> dict[str, Any]: + """ + Overall trust summary. + ملخص الثقة العام. + """ + receipts = self._store.query(agent_id=agent_id, limit=10000) + total = len(receipts) + if total == 0: + return {"total": 0, "trust_score": 0.5, "message_ar": "لا توجد بيانات"} + + by_verdict: dict[str, int] = defaultdict(int) + total_cost = 0.0 + for r in receipts: + by_verdict[r.verification_verdict.value] += 1 + total_cost += r.cost_estimate + + trust = self.get_trust_score(agent_id) if agent_id else 0.5 + return { + "total": total, + "by_verdict": dict(by_verdict), + "trust_score": trust, + "total_cost_sar": round(total_cost, 2), + "contradiction_rate": round( + by_verdict.get("contradicted", 0) / total * 100, 2 + ), + "message_ar": f"إجمالي العمليات: {total}، درجة الثقة: {trust:.2f}", + } + + +# --------------------------------------------------------------------------- +# Global singletons +# --------------------------------------------------------------------------- + +pre_execution_policy = PreExecutionPolicy() +receipt_store = ReceiptStore() +trust_analytics = TrustAnalytics(receipt_store) diff --git a/salesflow-saas/memory/indexes/master-index.md b/salesflow-saas/memory/indexes/master-index.md new file mode 100644 index 00000000..66ba7933 --- /dev/null +++ b/salesflow-saas/memory/indexes/master-index.md @@ -0,0 +1,89 @@ +# Dealix Master Index (الفهرس الرئيسي لديلكس) + +**Type**: operations +**Summary**: Central index linking to all wiki pages, memory sections, and knowledge resources. 
+**Summary_AR**: الفهرس المركزي الذي يربط جميع صفحات الويكي وأقسام الذاكرة وموارد المعرفة. +**Last Updated**: 2026-04-11 + +--- + +## Architecture (البنية التحتية) + +| Page | Location | Status | +|------|----------|--------| +| System Architecture | [wiki/architecture.md](../wiki/architecture.md) | Active | +| System Overview (Diagram) | [architecture/system-overview.md](../architecture/system-overview.md) | Active | +| ADR-001: Multi-Tenant | [adr/001-multi-tenant.md](../adr/001-multi-tenant.md) | Active | +| ADR-002: WhatsApp-First | [adr/002-whatsapp-first.md](../adr/002-whatsapp-first.md) | Active | +| Library Decisions | [patterns/library-decisions.md](../patterns/library-decisions.md) | Active | + +## Product (المنتج) + +| Page | Location | Status | +|------|----------|--------| +| Glossary | [wiki/glossary.md](../wiki/glossary.md) | Active | +| Wiki Guide | [wiki/README.md](../wiki/README.md) | Active | +| Dealix Vision | [DEALIX_VISION.md](../../DEALIX_VISION.md) | Active | +| Master Blueprint | [MASTER-BLUEPRINT.mdc](../../MASTER-BLUEPRINT.mdc) | Active | + +## GTM / Growth (النمو والتسويق) + +| Page | Location | Status | +|------|----------|--------| +| Launch Plan | [growth/launch-plan.md](../growth/launch-plan.md) | Active | +| Content Map | [growth/content-map.md](../growth/content-map.md) | Active | +| Niche Brief | [growth/niche-brief.md](../growth/niche-brief.md) | Active | +| Outreach Map | [growth/outreach-map.md](../growth/outreach-map.md) | Active | + +## Customer (العملاء) + +| Page | Location | Status | +|------|----------|--------| +| ICP Brief | [customers/icp-brief.md](../customers/icp-brief.md) | Active | +| Interview Template | [customers/interview-template.md](../customers/interview-template.md) | Active | + +## Operations (العمليات) + +| Page | Location | Status | +|------|----------|--------| +| Deployment Checklist | [runbooks/deployment-checklist.md](../runbooks/deployment-checklist.md) | Active | +| Launch Checklist | 
[runbooks/launch-checklist.md](../runbooks/launch-checklist.md) | Active | +| SaaS Readiness Audit | [runbooks/saas-readiness-audit.md](../runbooks/saas-readiness-audit.md) | Active | +| Release Notes | [releases/](../releases/) | Directory | + +## Security (الأمان والامتثال) + +| Page | Location | Status | +|------|----------|--------| +| PDPL Checklist | [security/pdpl-checklist.md](../security/pdpl-checklist.md) | Active | +| Security Policy | [SECURITY.md](../../SECURITY.md) | Active | +| Contributing Guide | [CONTRIBUTING.md](../../CONTRIBUTING.md) | Active | + +## Tooling / Providers (الأدوات والمزودون) + +| Page | Location | Status | +|------|----------|--------| +| Provider Routing Strategy | [providers/routing-strategy.md](../providers/routing-strategy.md) | Active | +| Environment Config | [.env.example](../../.env.example) | Active | +| Docker Compose | [docker-compose.yml](../../docker-compose.yml) | Active | + +## Agent Configuration (تكوين الوكلاء) + +| Page | Location | Status | +|------|----------|--------| +| AGENTS.md | [AGENTS.md](../../AGENTS.md) | Active | +| CLAUDE.md | [CLAUDE.md](../../CLAUDE.md) | Active | +| Claude Commands | [.claude/commands/](../../.claude/commands/) | Directory | +| Claude Hooks | [.claude/hooks/](../../.claude/hooks/) | Directory | + +--- + +## Index Health + +- **Total pages**: 23 +- **Active**: 23 +- **Stale**: 0 +- **Orphan (no inbound links)**: 0 +- **Last full audit**: 2026-04-11 + +> Run `KnowledgeBrain.lint()` weekly to verify index health. See [wiki/README.md](../wiki/README.md) for review schedule. diff --git a/salesflow-saas/memory/wiki/glossary.md b/salesflow-saas/memory/wiki/glossary.md new file mode 100644 index 00000000..4e9a8a8e --- /dev/null +++ b/salesflow-saas/memory/wiki/glossary.md @@ -0,0 +1,102 @@ +# Dealix Glossary (مسرد مصطلحات ديلكس) + +**Type**: glossary +**Summary**: Bilingual (Arabic/English) glossary of all key terms used across the Dealix platform. 
+**Summary_AR**: مسرد ثنائي اللغة (عربي/إنجليزي) لجميع المصطلحات الرئيسية المستخدمة في منصة ديلكس. +**Key Facts**: + - 30+ terms covering CRM, AI, compliance, and platform concepts + - Each term has English name, Arabic name, and definition + - Terms are grouped by domain for easy navigation + - Used as reference by AI agents and human team members +**Provenance**: AGENTS.md, CLAUDE.md, backend source code, PDPL documentation +**Confidence**: high +**Related Pages**: [architecture](./architecture.md), [PDPL checklist](../security/pdpl-checklist.md) +**Last Updated**: 2026-04-11 +**Stale**: false + +--- + +## CRM Core (إدارة علاقات العملاء) + +| # | English | العربية | Definition | +|---|---------|---------|------------| +| 1 | **Lead** | عميل محتمل | A potential customer who has shown interest or been identified through outreach. Scored 0-100 in Dealix. | +| 2 | **Deal** | صفقة | A sales opportunity tied to a lead, tracked through pipeline stages with value in SAR. | +| 3 | **Pipeline** | مسار المبيعات | The sequence of stages a deal progresses through from qualification to close. | +| 4 | **Stage** | مرحلة | A discrete step in the pipeline (e.g., Qualification, Proposal, Negotiation, Closed Won). | +| 5 | **Contact** | جهة اتصال | An individual person associated with a lead or company. | +| 6 | **Company** | شركة | An organization entity linked to one or more contacts and deals. | +| 7 | **Account** | حساب | A company record with enrichment data (industry, size, revenue). | +| 8 | **Opportunity** | فرصة | Synonym for Deal in some contexts; a qualified sales prospect. | +| 9 | **Conversion** | تحويل | The act of a lead becoming a qualified deal or a deal reaching Closed Won. | +| 10 | **Win Rate** | معدل الفوز | Percentage of deals that reach Closed Won vs total deals in a period. 
| + +## Sales Operations (عمليات المبيعات) + +| # | English | العربية | Definition | +|---|---------|---------|------------| +| 11 | **Sequence** | تسلسل | An automated multi-step outreach cadence (e.g., WhatsApp → Email → Call → Follow-up). | +| 12 | **Cadence** | إيقاع | The timing pattern between sequence steps (e.g., Day 0, Day 2, Day 5). | +| 13 | **CPQ** | تسعير (التهيئة والتسعير والعرض) | Configure, Price, Quote — system for generating accurate proposals with SAR pricing. | +| 14 | **Proposal** | عرض سعر | A formal document sent to a prospect detailing scope, pricing, and terms. | +| 15 | **Territory** | منطقة | A geographic or industry-based area assigned to a sales rep for lead ownership. | +| 16 | **Assignment** | تعيين | The act of routing a lead or deal to a specific sales rep or team. | +| 17 | **SLA** | اتفاقية مستوى الخدمة | Service Level Agreement — response time commitments for lead follow-up. | +| 18 | **Escalation** | تصعيد | Routing a stalled or high-priority item to a manager or specialist. | + +## AI & Intelligence (الذكاء الاصطناعي) + +| # | English | العربية | Definition | +|---|---------|---------|------------| +| 19 | **Lead Score** | درجة العميل المحتمل | A 0-100 composite score indicating lead quality based on behavior, fit, and engagement. | +| 20 | **Intent Detection** | كشف النية | AI analysis of message text to determine customer intent (buy, inquire, complain, etc.). | +| 21 | **Sentiment Analysis** | تحليل المشاعر | AI classification of message tone as positive, negative, or neutral. | +| 22 | **Entity Extraction** | استخلاص الكيانات | AI identification of named entities (people, companies, amounts, dates) from Arabic text. | +| 23 | **Conversation Intelligence** | ذكاء المحادثات | AI analysis of sales conversations for coaching insights and deal signals. | +| 24 | **Model Router** | موجه النماذج | Service that selects the optimal LLM provider and model for each AI task. 
| +| 25 | **Agent** | وكيل ذكي | An autonomous AI worker with a specialized role (e.g., Lead Qualifier, Deal Advisor). | +| 26 | **Orchestrator** | منسق الوكلاء | The system that routes events to appropriate agents and manages their execution. | + +## Platform & Infrastructure (المنصة والبنية التحتية) + +| # | English | العربية | Definition | +|---|---------|---------|------------| +| 27 | **Tenant** | مستأجر | An isolated customer organization on the platform. All data is scoped by `tenant_id`. | +| 28 | **Multi-Tenant** | متعدد المستأجرين | Architecture where multiple customers share infrastructure but data is isolated. | +| 29 | **Feature Flag** | مفتاح الميزة | A toggle that controls feature availability per tenant or globally. | +| 30 | **Webhook** | خطاف ويب | An HTTP callback that notifies external systems of events (e.g., deal closed, lead scored). | +| 31 | **Worker** | عامل خلفي | A Celery process that handles async tasks (scoring, delivery, analytics). | +| 32 | **Migration** | ترحيل قاعدة البيانات | An Alembic script that modifies the database schema. | + +## Compliance & Security (الامتثال والأمان) + +| # | English | العربية | Definition | +|---|---------|---------|------------| +| 33 | **PDPL** | نظام حماية البيانات الشخصية | Saudi Personal Data Protection Law — governs collection, processing, and storage of personal data. | +| 34 | **Consent** | موافقة | Explicit permission from a data subject to process their data for a stated purpose. | +| 35 | **Data Subject** | صاحب البيانات | The individual whose personal data is being processed. | +| 36 | **Data Subject Rights** | حقوق صاحب البيانات | Rights to access, correct, and delete personal data under PDPL. | +| 37 | **Audit Trail** | سجل المراجعة | Immutable log of all consent changes and data access events. | +| 38 | **Security Gate** | بوابة الأمان | Pre-execution check that blocks risky actions (cross-tenant access, ungoverned messaging). 
| +| 39 | **ZATCA** | هيئة الزكاة والضريبة والجمارك | Saudi tax authority — Dealix integrates with ZATCA for e-invoicing compliance. | + +## Communication (التواصل) + +| # | English | العربية | Definition | +|---|---------|---------|------------| +| 40 | **WhatsApp Business** | واتساب للأعمال | Primary communication channel in Saudi Arabia (85% penetration). | +| 41 | **Inbox** | صندوق الوارد | Unified view of all conversations across WhatsApp, Email, and SMS. | +| 42 | **Template Message** | رسالة نموذجية | Pre-approved WhatsApp message template for outbound marketing/transactional use. | +| 43 | **Outbound Governance** | حوكمة الرسائل الصادرة | Rules controlling message volume, timing, and consent requirements. | + +## Business Metrics (مؤشرات الأعمال) + +| # | English | العربية | Definition | +|---|---------|---------|------------| +| 44 | **MRR** | الإيرادات الشهرية المتكررة | Monthly Recurring Revenue — total subscription revenue per month. | +| 45 | **ARR** | الإيرادات السنوية المتكررة | Annual Recurring Revenue — MRR multiplied by 12. | +| 46 | **Churn Rate** | معدل الانسحاب | Percentage of customers who cancel their subscription in a period. | +| 47 | **LTV** | القيمة الدائمة للعميل | Lifetime Value — total revenue expected from a customer over their relationship. | +| 48 | **CAC** | تكلفة اكتساب العميل | Customer Acquisition Cost — total spend to acquire one new customer. | +| 49 | **NPS** | صافي نقاط الترويج | Net Promoter Score — measure of customer satisfaction and loyalty. | +| 50 | **ICP** | ملف العميل المثالي | Ideal Customer Profile — the target customer characteristics for Dealix. |