From 30f134a5faf357051bab91fa3029fae5d3023803 Mon Sep 17 00:00:00 2001 From: Claude Date: Sat, 11 Apr 2026 08:24:02 +0000 Subject: [PATCH] fix: Update memory engine and session continuity implementations https://claude.ai/code/session_01LsnvBa7HwF5hs99VZbgLGj --- .../backend/app/services/memory_engine.py | 692 +++++------------- .../app/services/session_continuity.py | 498 ++++--------- 2 files changed, 332 insertions(+), 858 deletions(-) diff --git a/salesflow-saas/backend/app/services/memory_engine.py b/salesflow-saas/backend/app/services/memory_engine.py index 5755c814..067f5e28 100644 --- a/salesflow-saas/backend/app/services/memory_engine.py +++ b/salesflow-saas/backend/app/services/memory_engine.py @@ -1,615 +1,307 @@ """ Memory Engine — Dealix MemPalace Pattern Pluggable memory adapter with evaluation and quality checks. -Supports Redis (production) and file-based (local/offline) backends. """ -import json -import logging -import os -import re -import uuid +import json, logging, os, uuid from abc import ABC, abstractmethod from collections import defaultdict from datetime import datetime, timedelta, timezone from difflib import SequenceMatcher from pathlib import Path from typing import Any, Optional - from pydantic import BaseModel, Field logger = logging.getLogger(__name__) - -MEMORY_BASE_DIR = Path(__file__).resolve().parents[4] / "memory" -STALENESS_DAYS = 30 - - -# --------------------------------------------------------------------------- -# Models — نماذج البيانات -# --------------------------------------------------------------------------- - -class MemoryDomain(str): - """Domain tag for memory items.""" - pass +MEMORY_BASE = Path(__file__).resolve().parents[4] / "memory" +STALE_DAYS = 30 class MemoryItem(BaseModel): - """A single memory item — عنصر ذاكرة واحد""" + """عنصر ذاكرة واحد""" id: str = Field(default_factory=lambda: str(uuid.uuid4())) domain: str = "project" # project, customer, deal, competitor, prompt - content: str - metadata: dict[str, Any] = {} - source: str = "" + content: str; metadata: dict[str, Any] = {}; source: str = "" confidence: float = Field(default=0.7, ge=0.0, le=1.0) created_at: datetime = Field(default_factory=lambda: datetime.now(timezone.utc)) updated_at: datetime = Field(default_factory=lambda: datetime.now(timezone.utc)) access_count: int = 0 - is_canonical: bool = True # True = business data, False = derived/AI-generated + is_canonical: bool = True # True = business data, False = derived/AI retention_days: int = 0 # 0 = permanent - tenant_id: str = "" - tags: list[str] = [] - - class Config: - json_schema_extra = { - "example": { - "domain": "customer", - "content": "Acme Corp prefers WhatsApp for all communication", - "source": "customer_interview_2026-04-10", - "confidence": 0.9, - "is_canonical": True, - } - } - + tenant_id: str = ""; tags: list[str] = [] class EvalResult(BaseModel): - """Evaluation result for memory quality — نتيجة تقييم جودة الذاكرة""" - total_queries: int = 0 - correct_retrievals: int = 0 - precision: float = 0.0 - recall: float = 0.0 - avg_rank: float = 0.0 + """نتيجة تقييم جودة الذاكرة""" + total_queries: int = 0; correct_retrievals: int = 0 + precision: float = 0.0; recall: float = 0.0; avg_rank: float = 0.0 message_ar: str = "" - class MemoryStats(BaseModel): - """Memory store statistics — إحصائيات مخزن الذاكرة""" - total_items: int = 0 - by_domain: dict[str, int] = {} - canonical_count: int = 0 - derived_count: int = 0 - avg_confidence: float = 0.0 - oldest_item: Optional[datetime] = None - newest_item: Optional[datetime] = None + """إحصائيات مخزن الذاكرة""" + total_items: int = 0; by_domain: dict[str, int] = {} + canonical_count: int = 0; derived_count: int = 0; avg_confidence: float = 0.0 + oldest_item: Optional[datetime] = None; newest_item: Optional[datetime] = None message_ar: str = "" -# --------------------------------------------------------------------------- -# Abstract Adapter — المحول المجرد -# --------------------------------------------------------------------------- - class MemoryAdapter(ABC): """Abstract adapter — swap backends without rewriting app.""" - @abstractmethod - async def store(self, item: MemoryItem) -> str: - """Store item, return ID — تخزين عنصر وإرجاع المعرف""" - + async def store(self, item: MemoryItem) -> str: ... @abstractmethod - async def retrieve( - self, query: str, domain: str = None, limit: int = 5 - ) -> list[MemoryItem]: - """Retrieve matching items — استرجاع العناصر المطابقة""" - + async def retrieve(self, query: str, domain: str = None, limit: int = 5) -> list[MemoryItem]: ... @abstractmethod - async def update(self, item_id: str, content: str) -> bool: - """Update item content — تحديث محتوى العنصر""" - + async def update(self, item_id: str, content: str) -> bool: ... @abstractmethod - async def delete(self, item_id: str) -> bool: - """Delete item — حذف العنصر""" - + async def delete(self, item_id: str) -> bool: ... @abstractmethod - async def search_by_entity( - self, entity_type: str, entity_id: str - ) -> list[MemoryItem]: - """Search by entity reference — البحث بمرجع الكيان""" - + async def search_by_entity(self, entity_type: str, entity_id: str) -> list[MemoryItem]: ... @abstractmethod - async def get_stats(self) -> MemoryStats: - """Return store statistics — إرجاع إحصائيات المخزن""" - + async def get_stats(self) -> MemoryStats: ... @abstractmethod - async def list_all(self, domain: str = None) -> list[MemoryItem]: - """List all items optionally filtered by domain.""" + async def list_all(self, domain: str = None) -> list[MemoryItem]: ... -# --------------------------------------------------------------------------- -# Redis Adapter — محول ريدس -# --------------------------------------------------------------------------- +def _compute_stats(items: list[MemoryItem]) -> MemoryStats: + if not items: return MemoryStats(message_ar="لا توجد عناصر") + by_d: dict[str, int] = defaultdict(int) + can, tc = 0, 0.0 + old = new = items[0].created_at + for i in items: + by_d[i.domain] += 1; can += i.is_canonical; tc += i.confidence + if i.created_at < old: old = i.created_at + if i.created_at > new: new = i.created_at + return MemoryStats(total_items=len(items), by_domain=dict(by_d), canonical_count=can, + derived_count=len(items)-can, avg_confidence=round(tc/len(items), 4), + oldest_item=old, newest_item=new, + message_ar=f"عناصر: {len(items)}، معتمدة: {can}، مشتقة: {len(items)-can}") + +def _parse_dt(v: Any) -> datetime: + return datetime.fromisoformat(v) if isinstance(v, str) else v + class RedisMemoryAdapter(MemoryAdapter): - """ - Redis-backed memory for fast retrieval. - Uses Redis Search if available, falls back to key scanning. - ذاكرة مدعومة بريدس للاسترجاع السريع. - """ - - KEY_PREFIX = "dealix:memory:" + """ذاكرة مدعومة بريدس للاسترجاع السريع.""" + PFX = "dealix:memory:" def __init__(self, redis_client: Any = None, redis_url: str = None): self._redis = redis_client - self._redis_url = redis_url or os.getenv("REDIS_URL", "redis://localhost:6379/0") - self._connected = False + self._url = redis_url or os.getenv("REDIS_URL", "redis://localhost:6379/0") + self._ok = False - async def _ensure_connection(self) -> None: - if self._redis is not None and self._connected: - return - try: - import redis.asyncio as aioredis - self._redis = aioredis.from_url( - self._redis_url, decode_responses=True - ) - await self._redis.ping() - self._connected = True - logger.info("تم الاتصال بريدس: %s", self._redis_url) - except Exception as exc: - logger.warning("فشل الاتصال بريدس: %s — سيتم استخدام الذاكرة المحلية", exc) - self._connected = False - raise + async def _conn(self): + if self._redis and self._ok: return + import redis.asyncio as aioredis + self._redis = aioredis.from_url(self._url, decode_responses=True) + await self._redis.ping(); self._ok = True - def _key(self, item_id: str) -> str: - return f"{self.KEY_PREFIX}{item_id}" - - def _domain_key(self, domain: str) -> str: - return f"{self.KEY_PREFIX}domain:{domain}" - - def _entity_key(self, entity_type: str, entity_id: str) -> str: - return f"{self.KEY_PREFIX}entity:{entity_type}:{entity_id}" + def _k(self, id: str) -> str: return f"{self.PFX}{id}" + def _dk(self, d: str) -> str: return f"{self.PFX}domain:{d}" + def _ek(self, et: str, eid: str) -> str: return f"{self.PFX}entity:{et}:{eid}" async def store(self, item: MemoryItem) -> str: - await self._ensure_connection() + await self._conn() data = item.model_dump(mode="json") - data["created_at"] = item.created_at.isoformat() - data["updated_at"] = item.updated_at.isoformat() + data["created_at"] = item.created_at.isoformat(); data["updated_at"] = item.updated_at.isoformat() pipe = self._redis.pipeline() - pipe.set(self._key(item.id), json.dumps(data, ensure_ascii=False)) - pipe.sadd(self._domain_key(item.domain), item.id) - if item.retention_days > 0: - pipe.expire(self._key(item.id), item.retention_days * 86400) - # Index by entity if metadata has entity references - for etype in ("lead_id", "deal_id", "company_id", "tenant_id"): - if etype in item.metadata: - pipe.sadd(self._entity_key(etype, str(item.metadata[etype])), item.id) - await pipe.execute() - logger.info("تم تخزين عنصر ذاكرة في ريدس: %s (%s)", item.id, item.domain) - return item.id + pipe.set(self._k(item.id), json.dumps(data, ensure_ascii=False)) + pipe.sadd(self._dk(item.domain), item.id) + if item.retention_days > 0: pipe.expire(self._k(item.id), item.retention_days * 86400) + for et in ("lead_id","deal_id","company_id","tenant_id"): + if et in item.metadata: pipe.sadd(self._ek(et, str(item.metadata[et])), item.id) + await pipe.execute(); return item.id - async def retrieve( - self, query: str, domain: str = None, limit: int = 5 - ) -> list[MemoryItem]: - await self._ensure_connection() - if domain: - ids = await self._redis.smembers(self._domain_key(domain)) - else: - all_keys = [] - async for key in self._redis.scan_iter(f"{self.KEY_PREFIX}[0-9a-f]*"): - all_keys.append(key.replace(self.KEY_PREFIX, "")) - ids = all_keys - - items: list[MemoryItem] = [] - query_lower = query.lower() - query_words = set(query_lower.split()) - - for item_id in ids: - raw = await self._redis.get(self._key(item_id)) - if not raw: - continue - data = json.loads(raw) - content_lower = data.get("content", "").lower() - content_words = set(content_lower.split()) - overlap = len(query_words & content_words) - if overlap > 0 or query_lower in content_lower: - data["created_at"] = datetime.fromisoformat(data["created_at"]) - data["updated_at"] = datetime.fromisoformat(data["updated_at"]) - item = MemoryItem(**data) - item.access_count += 1 - items.append(item) - - items.sort(key=lambda x: x.confidence, reverse=True) - return items[:limit] + async def retrieve(self, query: str, domain: str = None, limit: int = 5) -> list[MemoryItem]: + await self._conn() + ids = await self._redis.smembers(self._dk(domain)) if domain else [ + k.replace(self.PFX, "") async for k in self._redis.scan_iter(f"{self.PFX}[0-9a-f]*")] + qw, items = set(query.lower().split()), [] + for iid in ids: + raw = await self._redis.get(self._k(iid)) + if not raw: continue + d = json.loads(raw); cl = d.get("content", "").lower() + if qw & set(cl.split()) or query.lower() in cl: + d["created_at"] = _parse_dt(d["created_at"]); d["updated_at"] = _parse_dt(d["updated_at"]) + items.append(MemoryItem(**d)) + items.sort(key=lambda x: -x.confidence); return items[:limit] async def update(self, item_id: str, content: str) -> bool: - await self._ensure_connection() - raw = await self._redis.get(self._key(item_id)) - if not raw: - return False - data = json.loads(raw) - data["content"] = content - data["updated_at"] = datetime.now(timezone.utc).isoformat() - await self._redis.set(self._key(item_id), json.dumps(data, ensure_ascii=False)) - return True + await self._conn() + raw = await self._redis.get(self._k(item_id)) + if not raw: return False + d = json.loads(raw); d["content"] = content; d["updated_at"] = datetime.now(timezone.utc).isoformat() + await self._redis.set(self._k(item_id), json.dumps(d, ensure_ascii=False)); return True async def delete(self, item_id: str) -> bool: - await self._ensure_connection() - raw = await self._redis.get(self._key(item_id)) - if not raw: - return False - data = json.loads(raw) - domain = data.get("domain", "project") - pipe = self._redis.pipeline() - pipe.delete(self._key(item_id)) - pipe.srem(self._domain_key(domain), item_id) - await pipe.execute() - return True + await self._conn() + raw = await self._redis.get(self._k(item_id)) + if not raw: return False + d = json.loads(raw) + pipe = self._redis.pipeline(); pipe.delete(self._k(item_id)) + pipe.srem(self._dk(d.get("domain", "project")), item_id); await pipe.execute(); return True - async def search_by_entity( - self, entity_type: str, entity_id: str - ) -> list[MemoryItem]: - await self._ensure_connection() - ids = await self._redis.smembers(self._entity_key(entity_type, entity_id)) - items: list[MemoryItem] = [] - for item_id in ids: - raw = await self._redis.get(self._key(item_id)) + async def search_by_entity(self, entity_type: str, entity_id: str) -> list[MemoryItem]: + await self._conn() + items = [] + for iid in await self._redis.smembers(self._ek(entity_type, entity_id)): + raw = await self._redis.get(self._k(iid)) if raw: - data = json.loads(raw) - data["created_at"] = datetime.fromisoformat(data["created_at"]) - data["updated_at"] = datetime.fromisoformat(data["updated_at"]) - items.append(MemoryItem(**data)) + d = json.loads(raw); d["created_at"] = _parse_dt(d["created_at"]); d["updated_at"] = _parse_dt(d["updated_at"]) + items.append(MemoryItem(**d)) return items - async def get_stats(self) -> MemoryStats: - await self._ensure_connection() - items = await self.list_all() - return _compute_stats(items) + async def get_stats(self) -> MemoryStats: return _compute_stats(await self.list_all()) async def list_all(self, domain: str = None) -> list[MemoryItem]: - await self._ensure_connection() - if domain: - ids = await self._redis.smembers(self._domain_key(domain)) - else: - ids = set() - async for key in self._redis.scan_iter(f"{self.KEY_PREFIX}[0-9a-f]*"): - ids.add(key.replace(self.KEY_PREFIX, "")) + await self._conn() + ids = await self._redis.smembers(self._dk(domain)) if domain else { + k.replace(self.PFX, "") async for k in self._redis.scan_iter(f"{self.PFX}[0-9a-f]*")} items = [] - for item_id in ids: - raw = await self._redis.get(self._key(item_id)) + for iid in ids: + raw = await self._redis.get(self._k(iid)) if raw: - data = json.loads(raw) - data["created_at"] = datetime.fromisoformat(data["created_at"]) - data["updated_at"] = datetime.fromisoformat(data["updated_at"]) - items.append(MemoryItem(**data)) + d = json.loads(raw); d["created_at"] = _parse_dt(d["created_at"]); d["updated_at"] = _parse_dt(d["updated_at"]) + items.append(MemoryItem(**d)) return items -# --------------------------------------------------------------------------- -# File Adapter — محول الملفات -# --------------------------------------------------------------------------- - class FileMemoryAdapter(MemoryAdapter): - """ - File-based memory for local/offline use. - Stores as JSON files in memory/ directory. - ذاكرة مبنية على الملفات للاستخدام المحلي/غير المتصل. - """ + """ذاكرة مبنية على الملفات للاستخدام المحلي.""" def __init__(self, base_dir: Path = None): - self.base_dir = base_dir or MEMORY_BASE_DIR / "_store" - self.base_dir.mkdir(parents=True, exist_ok=True) + self.base = base_dir or MEMORY_BASE / "_store"; self.base.mkdir(parents=True, exist_ok=True) - def _item_path(self, item_id: str) -> Path: - return self.base_dir / f"{item_id}.json" + def _dd(self, domain: str) -> Path: + d = self.base / domain; d.mkdir(parents=True, exist_ok=True); return d - def _domain_dir(self, domain: str) -> Path: - d = self.base_dir / domain - d.mkdir(parents=True, exist_ok=True) - return d + def _ser(self, item: MemoryItem) -> str: + d = item.model_dump(mode="json") + d["created_at"] = item.created_at.isoformat(); d["updated_at"] = item.updated_at.isoformat() + return json.dumps(d, ensure_ascii=False, indent=2) + + def _de(self, path: Path) -> MemoryItem: + d = json.loads(path.read_text(encoding="utf-8")) + d["created_at"] = _parse_dt(d["created_at"]); d["updated_at"] = _parse_dt(d["updated_at"]) + return MemoryItem(**d) async def store(self, item: MemoryItem) -> str: - file_path = self._domain_dir(item.domain) / f"{item.id}.json" - data = item.model_dump(mode="json") - data["created_at"] = item.created_at.isoformat() - data["updated_at"] = item.updated_at.isoformat() - file_path.write_text(json.dumps(data, ensure_ascii=False, indent=2), encoding="utf-8") - logger.info("تم تخزين عنصر ذاكرة في ملف: %s (%s)", item.id, item.domain) - return item.id + (self._dd(item.domain) / f"{item.id}.json").write_text(self._ser(item), encoding="utf-8") + logger.info("ذاكرة ملف: %s (%s)", item.id, item.domain); return item.id - async def retrieve( - self, query: str, domain: str = None, limit: int = 5 - ) -> list[MemoryItem]: - items = await self.list_all(domain) - query_lower = query.lower() - query_words = set(query_lower.split()) - scored: list[tuple[MemoryItem, float]] = [] - - for item in items: - content_lower = item.content.lower() - content_words = set(content_lower.split()) - overlap = len(query_words & content_words) - if overlap > 0 or query_lower in content_lower: - score = (overlap / max(len(query_words), 1)) * item.confidence - scored.append((item, score)) - - scored.sort(key=lambda x: x[1], reverse=True) - results = [item for item, _ in scored[:limit]] - for item in results: - item.access_count += 1 - await self._write_item(item) - return results + async def retrieve(self, query: str, domain: str = None, limit: int = 5) -> list[MemoryItem]: + items = await self.list_all(domain); qw = set(query.lower().split()) + scored = [] + for it in items: + cw = set(it.content.lower().split()); ov = len(qw & cw) + if ov > 0 or query.lower() in it.content.lower(): + scored.append((it, (ov / max(len(qw), 1)) * it.confidence)) + scored.sort(key=lambda x: -x[1]) + for it, _ in scored[:limit]: it.access_count += 1; await self._write(it) + return [it for it, _ in scored[:limit]] async def update(self, item_id: str, content: str) -> bool: - item = await self._find_item(item_id) - if not item: - return False - item.content = content - item.updated_at = datetime.now(timezone.utc) - await self._write_item(item) - return True + it = await self._find(item_id) + if not it: return False + it.content = content; it.updated_at = datetime.now(timezone.utc); await self._write(it); return True async def delete(self, item_id: str) -> bool: - for domain_dir in self.base_dir.iterdir(): - if not domain_dir.is_dir(): - continue - path = domain_dir / f"{item_id}.json" - if path.exists(): - path.unlink() - logger.info("تم حذف عنصر ذاكرة: %s", item_id) - return True + for dd in self.base.iterdir(): + if not dd.is_dir(): continue + p = dd / f"{item_id}.json" + if p.exists(): p.unlink(); return True return False - async def search_by_entity( - self, entity_type: str, entity_id: str - ) -> list[MemoryItem]: - all_items = await self.list_all() - return [ - item for item in all_items - if str(item.metadata.get(entity_type, "")) == str(entity_id) - ] + async def search_by_entity(self, entity_type: str, entity_id: str) -> list[MemoryItem]: + return [i for i in await self.list_all() if str(i.metadata.get(entity_type, "")) == str(entity_id)] - async def get_stats(self) -> MemoryStats: - items = await self.list_all() - return _compute_stats(items) + async def get_stats(self) -> MemoryStats: return _compute_stats(await self.list_all()) async def list_all(self, domain: str = None) -> list[MemoryItem]: - items: list[MemoryItem] = [] - search_dirs = ( - [self._domain_dir(domain)] if domain - else [d for d in self.base_dir.iterdir() if d.is_dir()] - ) - for dir_path in search_dirs: - for json_file in dir_path.glob("*.json"): - try: - data = json.loads(json_file.read_text(encoding="utf-8")) - if "created_at" in data and isinstance(data["created_at"], str): - data["created_at"] = datetime.fromisoformat(data["created_at"]) - if "updated_at" in data and isinstance(data["updated_at"], str): - data["updated_at"] = datetime.fromisoformat(data["updated_at"]) - items.append(MemoryItem(**data)) - except Exception as exc: - logger.warning("فشل تحميل عنصر ذاكرة %s: %s", json_file.name, exc) + dirs = [self._dd(domain)] if domain else [d for d in self.base.iterdir() if d.is_dir()] + items = [] + for dd in dirs: + for f in dd.glob("*.json"): + try: items.append(self._de(f)) + except Exception as e: logger.warning("فشل تحميل %s: %s", f.name, e) return items - async def _find_item(self, item_id: str) -> Optional[MemoryItem]: - for domain_dir in self.base_dir.iterdir(): - if not domain_dir.is_dir(): - continue - path = domain_dir / f"{item_id}.json" - if path.exists(): - data = json.loads(path.read_text(encoding="utf-8")) - if isinstance(data.get("created_at"), str): - data["created_at"] = datetime.fromisoformat(data["created_at"]) - if isinstance(data.get("updated_at"), str): - data["updated_at"] = datetime.fromisoformat(data["updated_at"]) - return MemoryItem(**data) + async def _find(self, item_id: str) -> Optional[MemoryItem]: + for dd in self.base.iterdir(): + if not dd.is_dir(): continue + p = dd / f"{item_id}.json" + if p.exists(): return self._de(p) return None - async def _write_item(self, item: MemoryItem) -> None: - file_path = self._domain_dir(item.domain) / f"{item.id}.json" - data = item.model_dump(mode="json") - data["created_at"] = item.created_at.isoformat() - data["updated_at"] = item.updated_at.isoformat() - file_path.write_text(json.dumps(data, ensure_ascii=False, indent=2), encoding="utf-8") + async def _write(self, item: MemoryItem) -> None: + (self._dd(item.domain) / f"{item.id}.json").write_text(self._ser(item), encoding="utf-8") -# --------------------------------------------------------------------------- -# Helpers -# --------------------------------------------------------------------------- - -def _compute_stats(items: list[MemoryItem]) -> MemoryStats: - if not items: - return MemoryStats(message_ar="لا توجد عناصر في الذاكرة") - by_domain: dict[str, int] = defaultdict(int) - canonical = 0 - total_conf = 0.0 - oldest = items[0].created_at - newest = items[0].created_at - for item in items: - by_domain[item.domain] += 1 - if item.is_canonical: - canonical += 1 - total_conf += item.confidence - if item.created_at < oldest: - oldest = item.created_at - if item.created_at > newest: - newest = item.created_at - return MemoryStats( - total_items=len(items), - by_domain=dict(by_domain), - canonical_count=canonical, - derived_count=len(items) - canonical, - avg_confidence=round(total_conf / len(items), 4), - oldest_item=oldest, - newest_item=newest, - message_ar=f"إجمالي العناصر: {len(items)}، المعتمدة: {canonical}، المشتقة: {len(items) - canonical}", - ) - - -# --------------------------------------------------------------------------- -# Memory Evaluator — مقيّم الذاكرة -# --------------------------------------------------------------------------- - class MemoryEvaluator: - """ - Evaluate memory quality before trusting it. - تقييم جودة الذاكرة قبل الوثوق بها. - """ + """تقييم جودة الذاكرة قبل الوثوق بها.""" def __init__(self, adapter: MemoryAdapter): - self._adapter = adapter + self._a = adapter - async def benchmark_retrieval( - self, - test_queries: list[str], - expected_results: list[list[str]], - ) -> EvalResult: - """ - Run retrieval benchmark against known query/result pairs. - تشغيل اختبار الاسترجاع مقابل أزواج استعلام/نتيجة معروفة. - """ + async def benchmark_retrieval(self, test_queries: list[str], expected_results: list[list[str]]) -> EvalResult: if len(test_queries) != len(expected_results): - raise ValueError("test_queries and expected_results must have same length") - - total = len(test_queries) - correct = 0 - total_recall = 0.0 - total_rank = 0.0 - - for query, expected in zip(test_queries, expected_results): - results = await self._adapter.retrieve(query, limit=10) - result_contents = [r.content.lower().strip() for r in results] - expected_lower = [e.lower().strip() for e in expected] - - found_any = False - best_rank = len(results) + 1 - matched = 0 - for exp in expected_lower: - for rank, res in enumerate(result_contents): - if exp in res or SequenceMatcher(None, exp, res).ratio() > 0.7: - found_any = True - matched += 1 - best_rank = min(best_rank, rank + 1) - break - - if found_any: - correct += 1 - if expected_lower: - total_recall += matched / len(expected_lower) - total_rank += best_rank if found_any else len(results) + 1 - - precision = correct / total if total else 0.0 - recall = total_recall / total if total else 0.0 - avg_rank = total_rank / total if total else 0.0 - - return EvalResult( - total_queries=total, - correct_retrievals=correct, - precision=round(precision, 4), - recall=round(recall, 4), - avg_rank=round(avg_rank, 2), - message_ar=f"الدقة: {precision:.2%}، الاستدعاء: {recall:.2%}، متوسط الترتيب: {avg_rank:.1f}", - ) + raise ValueError("Mismatched lengths") + total, correct, t_recall, t_rank = len(test_queries), 0, 0.0, 0.0 + for q, exp in zip(test_queries, expected_results): + res = [r.content.lower().strip() for r in await self._a.retrieve(q, limit=10)] + el = [e.lower().strip() for e in exp]; found, best, matched = False, len(res)+1, 0 + for e in el: + for rank, r in enumerate(res): + if e in r or SequenceMatcher(None, e, r).ratio() > 0.7: + found = True; matched += 1; best = min(best, rank+1); break + if found: correct += 1 + if el: t_recall += matched / len(el) + t_rank += best if found else len(res)+1 + p, r = (correct/total if total else 0), (t_recall/total if total else 0) + ar = t_rank/total if total else 0 + return EvalResult(total_queries=total, correct_retrievals=correct, + precision=round(p, 4), recall=round(r, 4), avg_rank=round(ar, 2), + message_ar=f"الدقة: {p:.2%}، الاستدعاء: {r:.2%}") async def check_staleness(self, domain: str = None) -> list[MemoryItem]: - """ - Items not accessed in 30+ days. - العناصر التي لم يتم الوصول إليها منذ 30 يومًا أو أكثر. - """ - items = await self._adapter.list_all(domain) - cutoff = datetime.now(timezone.utc) - timedelta(days=STALENESS_DAYS) - return [item for item in items if item.updated_at < cutoff] + cutoff = datetime.now(timezone.utc) - timedelta(days=STALE_DAYS) + return [i for i in await self._a.list_all(domain) if i.updated_at < cutoff] async def check_duplicates(self, domain: str = None) -> list[tuple[MemoryItem, MemoryItem]]: - """ - Find similar items that may be duplicates. - البحث عن عناصر متشابهة قد تكون مكررة. - """ - items = await self._adapter.list_all(domain) - duplicates: list[tuple[MemoryItem, MemoryItem]] = [] - seen: set[str] = set() - + items, dups, seen = await self._a.list_all(domain), [], set() for i, a in enumerate(items): - for b in items[i + 1:]: - pair_key = f"{a.id}:{b.id}" - if pair_key in seen: - continue - ratio = SequenceMatcher(None, a.content.lower(), b.content.lower()).ratio() - if ratio > 0.8: - duplicates.append((a, b)) - seen.add(pair_key) - - return duplicates + for b in items[i+1:]: + k = f"{a.id}:{b.id}" + if k not in seen and SequenceMatcher(None, a.content.lower(), b.content.lower()).ratio() > 0.8: + dups.append((a, b)); seen.add(k) + return dups async def check_contradictions(self, domain: str = None) -> list[tuple[MemoryItem, MemoryItem]]: - """ - Find items in the same domain with conflicting content. - البحث عن عناصر في نفس النطاق بمحتوى متناقض. - """ - items = await self._adapter.list_all(domain) - contradictions: list[tuple[MemoryItem, MemoryItem]] = [] - negation_markers = {"not", "no", "never", "cannot", "لا", "ليس", "لن", "لم", "غير"} - + items, contras = await self._a.list_all(domain), [] + negs = {"not","no","never","cannot","لا","ليس","لن","لم","غير"} for i, a in enumerate(items): - a_words = set(a.content.lower().split()) - for b in items[i + 1:]: - if a.domain != b.domain: - continue - b_words = set(b.content.lower().split()) - shared = a_words & b_words - a_negations = a_words & negation_markers - b_negations = b_words & negation_markers - # If they share many words but differ in negation, flag as contradiction - if len(shared) > 3 and a_negations != b_negations: - contradictions.append((a, b)) - - return contradictions + aw = set(a.content.lower().split()) + for b in items[i+1:]: + if a.domain != b.domain: continue + bw = set(b.content.lower().split()) + if len(aw & bw) > 3 and (aw & negs) != (bw & negs): contras.append((a, b)) + return contras async def get_health_report(self) -> dict[str, Any]: - """ - Overall memory health metrics. - مقاييس صحة الذاكرة العامة. - """ - stats = await self._adapter.get_stats() - stale = await self.check_staleness() - duplicates = await self.check_duplicates() - contradictions = await self.check_contradictions() - - health_score = 1.0 - if stats.total_items > 0: - stale_ratio = len(stale) / stats.total_items - dup_ratio = len(duplicates) / stats.total_items - contra_ratio = len(contradictions) / stats.total_items - health_score = max(0.0, 1.0 - stale_ratio * 0.3 - dup_ratio * 0.4 - contra_ratio * 0.5) - - return { - "health_score": round(health_score, 4), - "total_items": stats.total_items, - "stale_items": len(stale), - "duplicate_pairs": len(duplicates), - "contradiction_pairs": len(contradictions), - "avg_confidence": stats.avg_confidence, + stats = await self._a.get_stats() + stale = await self.check_staleness(); dups = await self.check_duplicates() + contras = await self.check_contradictions() + hs = max(0.0, 1.0 - (len(stale)/max(stats.total_items,1))*0.3 + - (len(dups)/max(stats.total_items,1))*0.4 + - (len(contras)/max(stats.total_items,1))*0.5) if stats.total_items else 1.0 + return {"health_score": round(hs, 4), "total_items": stats.total_items, + "stale_items": len(stale), "duplicate_pairs": len(dups), + "contradiction_pairs": len(contras), "avg_confidence": stats.avg_confidence, "by_domain": stats.by_domain, - "message_ar": ( - f"درجة الصحة: {health_score:.2%}، " - f"عناصر قديمة: {len(stale)}، " - f"تكرارات: {len(duplicates)}، " - f"تناقضات: {len(contradictions)}" - ), - } + "message_ar": f"صحة: {hs:.2%}، قديمة: {len(stale)}، تكرار: {len(dups)}، تناقض: {len(contras)}"} -# --------------------------------------------------------------------------- -# Factory — مصنع المحولات -# --------------------------------------------------------------------------- - def create_memory_adapter(backend: str = None) -> MemoryAdapter: - """ - Create the appropriate memory adapter based on config. - إنشاء محول الذاكرة المناسب بناءً على التكوين. - """ backend = backend or os.getenv("MEMORY_BACKEND", "file") - if backend == "redis": - return RedisMemoryAdapter() - return FileMemoryAdapter() + return RedisMemoryAdapter() if backend == "redis" else FileMemoryAdapter() - -# Global instances memory_adapter = create_memory_adapter() memory_evaluator = MemoryEvaluator(memory_adapter) diff --git a/salesflow-saas/backend/app/services/session_continuity.py b/salesflow-saas/backend/app/services/session_continuity.py index 231d2165..1e905643 100644 --- a/salesflow-saas/backend/app/services/session_continuity.py +++ b/salesflow-saas/backend/app/services/session_continuity.py @@ -1,447 +1,229 @@ """ Session Continuity — Dealix AI Session State Management Maintains context across AI agent sessions for seamless handoff. -Stores decisions, failures, wins, and follow-ups between sessions. """ -import json -import logging -import uuid +import json, logging, uuid from datetime import datetime, timedelta, timezone from pathlib import Path from typing import Any, Optional - from pydantic import BaseModel, Field logger = logging.getLogger(__name__) - SESSIONS_DIR = Path(__file__).resolve().parents[4] / "memory" / "_sessions" SESSIONS_DIR.mkdir(parents=True, exist_ok=True) -# --------------------------------------------------------------------------- -# Models — نماذج البيانات -# --------------------------------------------------------------------------- - class Decision(BaseModel): - """A recorded decision — قرار مسجّل""" - decision: str - context: str + """قرار مسجّل""" + decision: str; context: str; decision_ar: str = ""; made_by: str = "" timestamp: datetime = Field(default_factory=lambda: datetime.now(timezone.utc)) - decision_ar: str = "" - made_by: str = "" - class Failure(BaseModel): - """A recorded failure — فشل مسجّل""" - description: str - context: str + """فشل مسجّل""" + description: str; context: str; description_ar: str = ""; resolution: str = "" timestamp: datetime = Field(default_factory=lambda: datetime.now(timezone.utc)) - description_ar: str = "" - resolution: str = "" - class Win(BaseModel): - """A recorded win — نجاح مسجّل""" - description: str - context: str + """نجاح مسجّل""" + description: str; context: str; description_ar: str = "" timestamp: datetime = Field(default_factory=lambda: datetime.now(timezone.utc)) - description_ar: str = "" - class FollowUp(BaseModel): - """A pending follow-up task — مهمة متابعة معلّقة""" - task: str - task_ar: str = "" - due_date: Optional[datetime] = None - completed: bool = False + """مهمة متابعة معلّقة""" + task: str; task_ar: str = ""; due_date: Optional[datetime] = None + completed: bool = False; assigned_to: str = "" created_at: datetime = Field(default_factory=lambda: datetime.now(timezone.utc)) - assigned_to: str = "" - class SessionState(BaseModel): - """Full session state — حالة الجلسة الكاملة""" + """حالة الجلسة الكاملة""" session_id: str = Field(default_factory=lambda: str(uuid.uuid4())) - project: str = "dealix" - active_workstreams: list[str] = [] - last_decisions: list[Decision] = [] - open_questions: list[str] = [] - recent_failures: list[Failure] = [] - recent_wins: list[Win] = [] + project: str = "dealix"; active_workstreams: list[str] = [] + last_decisions: list[Decision] = []; open_questions: list[str] = [] + recent_failures: list[Failure] = []; recent_wins: list[Win] = [] pending_followups: list[FollowUp] = [] - context_summary: str = "" - context_summary_ar: str = "" + context_summary: str = ""; context_summary_ar: str = "" created_at: datetime = Field(default_factory=lambda: datetime.now(timezone.utc)) updated_at: datetime = Field(default_factory=lambda: datetime.now(timezone.utc)) - tags: list[str] = [] - tenant_id: str = "" - - class Config: - json_schema_extra = { - "example": { - "project": "dealix", - "active_workstreams": ["cpq-enhancement", "pdpl-audit"], - "context_summary": "Working on CPQ Arabic PDF generation and PDPL consent expiry.", - "context_summary_ar": "العمل على توليد PDF عربي للتسعير وانتهاء موافقة حماية البيانات.", - } - } + tags: list[str] = []; tenant_id: str = "" -# --------------------------------------------------------------------------- -# Session Continuity Service — خدمة استمرارية الجلسة -# --------------------------------------------------------------------------- +def _dt_hook(obj: Any) -> Any: + """Convert datetime strings in nested dicts.""" + if isinstance(obj, dict): + for k in ("timestamp", "created_at", "updated_at", "due_date"): + if k in obj and isinstance(obj[k], str) and obj[k]: + obj[k] = datetime.fromisoformat(obj[k]) + return obj + return obj + class SessionContinuity: - """ - Maintain context across AI sessions. - الحفاظ على السياق عبر جلسات الذكاء الاصطناعي. - """ + """الحفاظ على السياق عبر جلسات الذكاء الاصطناعي""" def __init__(self, sessions_dir: Path = None): - self.sessions_dir = sessions_dir or SESSIONS_DIR - self.sessions_dir.mkdir(parents=True, exist_ok=True) + self.dir = sessions_dir or SESSIONS_DIR + self.dir.mkdir(parents=True, exist_ok=True) self._current: Optional[SessionState] = None - def _session_path(self, session_id: str) -> Path: - return self.sessions_dir / f"{session_id}.json" + def _path(self, sid: str) -> Path: return self.dir / f"{sid}.json" - def _serialize_state(self, state: SessionState) -> str: + def _save_json(self, state: SessionState) -> None: data = state.model_dump(mode="json") - # Convert datetime objects to ISO strings for JSON + # Ensure all datetimes are ISO strings for key in ("created_at", "updated_at"): - if isinstance(data.get(key), datetime): - data[key] = data[key].isoformat() - for decision in data.get("last_decisions", []): - if isinstance(decision.get("timestamp"), datetime): - decision["timestamp"] = decision["timestamp"].isoformat() - for failure in data.get("recent_failures", []): - if isinstance(failure.get("timestamp"), datetime): - failure["timestamp"] = failure["timestamp"].isoformat() - for win in data.get("recent_wins", []): - if isinstance(win.get("timestamp"), datetime): - win["timestamp"] = win["timestamp"].isoformat() - for followup in data.get("pending_followups", []): - if isinstance(followup.get("due_date"), datetime): - followup["due_date"] = followup["due_date"].isoformat() - if isinstance(followup.get("created_at"), datetime): - followup["created_at"] = followup["created_at"].isoformat() - return json.dumps(data, ensure_ascii=False, indent=2) + if isinstance(data.get(key), datetime): data[key] = data[key].isoformat() + for lst in ("last_decisions", "recent_failures", "recent_wins", "pending_followups"): + for item in data.get(lst, []): + for dk in ("timestamp", "created_at", "updated_at", "due_date"): + if dk in item and isinstance(item[dk], datetime): item[dk] = item[dk].isoformat() + self._path(state.session_id).write_text(json.dumps(data, ensure_ascii=False, indent=2), encoding="utf-8") - def _deserialize_state(self, raw: str) -> SessionState: - data = json.loads(raw) - for key in ("created_at", "updated_at"): - if isinstance(data.get(key), str): - data[key] = datetime.fromisoformat(data[key]) - for decision in data.get("last_decisions", []): - if isinstance(decision.get("timestamp"), str): - decision["timestamp"] = datetime.fromisoformat(decision["timestamp"]) - for failure in data.get("recent_failures", []): - if isinstance(failure.get("timestamp"), str): - failure["timestamp"] = datetime.fromisoformat(failure["timestamp"]) - for win in data.get("recent_wins", []): - if isinstance(win.get("timestamp"), str): - win["timestamp"] = datetime.fromisoformat(win["timestamp"]) - for followup in data.get("pending_followups", []): - if isinstance(followup.get("due_date"), str) and followup["due_date"]: - followup["due_date"] = datetime.fromisoformat(followup["due_date"]) - if isinstance(followup.get("created_at"), str): - followup["created_at"] = datetime.fromisoformat(followup["created_at"]) + def _load_json(self, path: Path) -> SessionState: + data = json.loads(path.read_text(encoding="utf-8")) + for key in ("created_at", "updated_at"): _dt_hook(data) if key in data else None + for lst in ("last_decisions", "recent_failures", "recent_wins", "pending_followups"): + for item in data.get(lst, []): _dt_hook(item) + # Parse top-level dates + for k in ("created_at", "updated_at"): + if isinstance(data.get(k), str): data[k] = datetime.fromisoformat(data[k]) return SessionState(**data) async def save_state(self, state: SessionState) -> str: - """ - Persist session state to disk. - حفظ حالة الجلسة على القرص. - """ + """حفظ حالة الجلسة.""" state.updated_at = datetime.now(timezone.utc) - path = self._session_path(state.session_id) - path.write_text(self._serialize_state(state), encoding="utf-8") - self._current = state - logger.info("تم حفظ حالة الجلسة: %s", state.session_id) - return state.session_id + self._save_json(state); self._current = state + logger.info("حفظ جلسة: %s", state.session_id); return state.session_id async def restore_state(self, session_id: str = None) -> SessionState: - """ - Restore session state. If no session_id, restore the latest. - استعادة حالة الجلسة. إذا لم يُحدد معرف، يتم استعادة الأحدث. - """ + """استعادة حالة الجلسة — الأحدث إذا لم يُحدد معرف.""" if session_id: - path = self._session_path(session_id) - if path.exists(): - state = self._deserialize_state(path.read_text(encoding="utf-8")) - self._current = state - logger.info("تم استعادة الجلسة: %s", session_id) - return state - logger.warning("الجلسة غير موجودة: %s", session_id) + p = self._path(session_id) + if p.exists(): + s = self._load_json(p); self._current = s; return s return SessionState(session_id=session_id) - - # Find the latest session - latest_path: Optional[Path] = None - latest_mtime = 0.0 - for f in self.sessions_dir.glob("*.json"): - mtime = f.stat().st_mtime - if mtime > latest_mtime: - latest_mtime = mtime - latest_path = f - - if latest_path: - state = self._deserialize_state(latest_path.read_text(encoding="utf-8")) - self._current = state - logger.info("تم استعادة أحدث جلسة: %s", state.session_id) - return state - - logger.info("لا توجد جلسات سابقة، إنشاء جلسة جديدة") - new_state = SessionState() - await self.save_state(new_state) - return new_state + # Find latest + latest = max(self.dir.glob("*.json"), key=lambda f: f.stat().st_mtime, default=None) + if latest: + s = self._load_json(latest); self._current = s; return s + s = SessionState(); await self.save_state(s); return s async def get_restore_prompt(self) -> str: - """ - Generate a text prompt summarizing current state for a new AI session. - توليد نص ملخّص للحالة الحالية لتغذية جلسة ذكاء اصطناعي جديدة. - """ - state = self._current - if not state: - state = await self.restore_state() - - lines = [ - "# Session Restore — استعادة الجلسة", - f"**Project**: {state.project}", - f"**Session**: {state.session_id}", - f"**Last Updated**: {state.updated_at.isoformat()}", - "", - ] - - if state.context_summary: - lines.append(f"## Context (السياق)") - lines.append(state.context_summary) - if state.context_summary_ar: - lines.append(state.context_summary_ar) + """توليد نص ملخّص للحالة الحالية لتغذية جلسة جديدة.""" + s = self._current or await self.restore_state() + lines = [f"# Session Restore — استعادة الجلسة", + f"**Project**: {s.project} | **Session**: {s.session_id}", + f"**Updated**: {s.updated_at.isoformat()}", ""] + if s.context_summary: + lines += ["## Context (السياق)", s.context_summary] + if s.context_summary_ar: lines.append(s.context_summary_ar) lines.append("") - - if state.active_workstreams: - lines.append("## Active Workstreams (مسارات العمل النشطة)") - for ws in state.active_workstreams: - lines.append(f"- {ws}") + if s.active_workstreams: + lines += ["## Workstreams (مسارات العمل)"] + [f"- {w}" for w in s.active_workstreams] + [""] + if s.last_decisions: + lines.append("## Decisions (القرارات)") + for d in s.last_decisions[-5:]: + lines.append(f"- [{d.timestamp:%Y-%m-%d %H:%M}] {d.decision}") + if d.decision_ar: lines.append(f" {d.decision_ar}") lines.append("") - - if state.last_decisions: - lines.append("## Recent Decisions (القرارات الأخيرة)") - for d in state.last_decisions[-5:]: - ts = d.timestamp.strftime("%Y-%m-%d %H:%M") - lines.append(f"- [{ts}] {d.decision}") - if d.decision_ar: - lines.append(f" {d.decision_ar}") - lines.append("") - - if state.open_questions: - lines.append("## Open Questions (أسئلة مفتوحة)") - for q in state.open_questions: - lines.append(f"- {q}") - lines.append("") - - if state.recent_failures: - lines.append("## Recent Failures (الإخفاقات الأخيرة)") - for f in state.recent_failures[-3:]: + if s.open_questions: + lines += ["## Questions (أسئلة)"] + [f"- {q}" for q in s.open_questions] + [""] + if s.recent_failures: + lines.append("## Failures (إخفاقات)") + for f in s.recent_failures[-3:]: lines.append(f"- {f.description}") - if f.resolution: - lines.append(f" Resolution: {f.resolution}") + if f.resolution: lines.append(f" Fix: {f.resolution}") lines.append("") - - if state.recent_wins: - lines.append("## Recent Wins (النجاحات الأخيرة)") - for w in state.recent_wins[-3:]: - lines.append(f"- {w.description}") - lines.append("") - - pending = [fu for fu in state.pending_followups if not fu.completed] + if s.recent_wins: + lines += ["## Wins (نجاحات)"] + [f"- {w.description}" for w in s.recent_wins[-3:]] + [""] + pending = [fu for fu in s.pending_followups if not fu.completed] if pending: - lines.append("## Pending Follow-ups (متابعات معلّقة)") + lines.append("## Follow-ups (متابعات)") for fu in pending: - due = f" (due: {fu.due_date.strftime('%Y-%m-%d')})" if fu.due_date else "" + due = f" (due: {fu.due_date:%Y-%m-%d})" if fu.due_date else "" lines.append(f"- {fu.task}{due}") lines.append("") - - lines.append("---") - lines.append("Continue from this state. Prioritize pending follow-ups and open questions.") - lines.append("استمر من هذه الحالة. أعطِ الأولوية للمتابعات المعلّقة والأسئلة المفتوحة.") - + lines += ["---", "Continue from this state. Prioritize pending follow-ups.", + "استمر من هذه الحالة. أعطِ الأولوية للمتابعات المعلّقة."] return "\n".join(lines) - async def add_decision(self, decision: str, context: str, decision_ar: str = "", made_by: str = "") -> None: - """ - Record a decision in the current session. - تسجيل قرار في الجلسة الحالية. - """ - if not self._current: - self._current = await self.restore_state() + async def _ensure_current(self) -> SessionState: + if not self._current: self._current = await self.restore_state() + return self._current - self._current.last_decisions.append(Decision( - decision=decision, - context=context, - decision_ar=decision_ar, - made_by=made_by, - )) - # Keep last 20 decisions - if len(self._current.last_decisions) > 20: - self._current.last_decisions = self._current.last_decisions[-20:] - await self.save_state(self._current) - logger.info("تم تسجيل قرار: %s", decision[:80]) + async def add_decision(self, decision: str, context: str, decision_ar: str = "", made_by: str = "") -> None: + """تسجيل قرار.""" + s = await self._ensure_current() + s.last_decisions.append(Decision(decision=decision, context=context, decision_ar=decision_ar, made_by=made_by)) + if len(s.last_decisions) > 20: s.last_decisions = s.last_decisions[-20:] + await self.save_state(s) async def add_failure(self, description: str, context: str, description_ar: str = "", resolution: str = "") -> None: - """ - Record a failure in the current session. - تسجيل فشل في الجلسة الحالية. - """ - if not self._current: - self._current = await self.restore_state() - - self._current.recent_failures.append(Failure( - description=description, - context=context, - description_ar=description_ar, - resolution=resolution, - )) - if len(self._current.recent_failures) > 10: - self._current.recent_failures = self._current.recent_failures[-10:] - await self.save_state(self._current) - logger.info("تم تسجيل فشل: %s", description[:80]) + """تسجيل فشل.""" + s = await self._ensure_current() + s.recent_failures.append(Failure(description=description, context=context, description_ar=description_ar, resolution=resolution)) + if len(s.recent_failures) > 10: s.recent_failures = s.recent_failures[-10:] + await self.save_state(s) async def add_win(self, description: str, context: str, description_ar: str = "") -> None: - """ - Record a win in the current session. - تسجيل نجاح في الجلسة الحالية. - """ - if not self._current: - self._current = await self.restore_state() - - self._current.recent_wins.append(Win( - description=description, - context=context, - description_ar=description_ar, - )) - if len(self._current.recent_wins) > 10: - self._current.recent_wins = self._current.recent_wins[-10:] - await self.save_state(self._current) - logger.info("تم تسجيل نجاح: %s", description[:80]) + """تسجيل نجاح.""" + s = await self._ensure_current() + s.recent_wins.append(Win(description=description, context=context, description_ar=description_ar)) + if len(s.recent_wins) > 10: s.recent_wins = s.recent_wins[-10:] + await self.save_state(s) async def add_followup(self, task: str, due_date: datetime = None, task_ar: str = "", assigned_to: str = "") -> None: - """ - Add a pending follow-up task. - إضافة مهمة متابعة معلّقة. - """ - if not self._current: - self._current = await self.restore_state() - - self._current.pending_followups.append(FollowUp( - task=task, - task_ar=task_ar, - due_date=due_date, - assigned_to=assigned_to, - )) - await self.save_state(self._current) - logger.info("تم إضافة متابعة: %s", task[:80]) + """إضافة مهمة متابعة.""" + s = await self._ensure_current() + s.pending_followups.append(FollowUp(task=task, task_ar=task_ar, due_date=due_date, assigned_to=assigned_to)) + await self.save_state(s) async def complete_followup(self, task_substring: str) -> bool: - """ - Mark a follow-up as completed by matching task text. - تعليم متابعة كمكتملة عن طريق مطابقة نص المهمة. - """ - if not self._current: - self._current = await self.restore_state() - - task_lower = task_substring.lower() - for fu in self._current.pending_followups: - if task_lower in fu.task.lower() and not fu.completed: - fu.completed = True - await self.save_state(self._current) - logger.info("تم إكمال متابعة: %s", fu.task[:80]) - return True + """تعليم متابعة كمكتملة.""" + s = await self._ensure_current() + tl = task_substring.lower() + for fu in s.pending_followups: + if tl in fu.task.lower() and not fu.completed: + fu.completed = True; await self.save_state(s); return True return False async def set_workstreams(self, workstreams: list[str]) -> None: - """ - Update active workstreams. - تحديث مسارات العمل النشطة. - """ - if not self._current: - self._current = await self.restore_state() - self._current.active_workstreams = workstreams - await self.save_state(self._current) + s = await self._ensure_current(); s.active_workstreams = workstreams; await self.save_state(s) async def set_context(self, summary: str, summary_ar: str = "") -> None: - """ - Update the context summary. - تحديث ملخص السياق. - """ - if not self._current: - self._current = await self.restore_state() - self._current.context_summary = summary - self._current.context_summary_ar = summary_ar - await self.save_state(self._current) + s = await self._ensure_current() + s.context_summary = summary; s.context_summary_ar = summary_ar; await self.save_state(s) async def add_question(self, question: str) -> None: - """ - Add an open question. - إضافة سؤال مفتوح. - """ - if not self._current: - self._current = await self.restore_state() - if question not in self._current.open_questions: - self._current.open_questions.append(question) - if len(self._current.open_questions) > 15: - self._current.open_questions = self._current.open_questions[-15:] - await self.save_state(self._current) + s = await self._ensure_current() + if question not in s.open_questions: + s.open_questions.append(question) + if len(s.open_questions) > 15: s.open_questions = s.open_questions[-15:] + await self.save_state(s) async def cleanup_old_sessions(self, days: int = 30) -> int: - """ - Remove session files older than N days. - حذف ملفات الجلسات الأقدم من N يوم. - """ - cutoff = datetime.now(timezone.utc) - timedelta(days=days) - removed = 0 - for f in self.sessions_dir.glob("*.json"): + """حذف جلسات أقدم من N يوم.""" + cutoff, removed = datetime.now(timezone.utc) - timedelta(days=days), 0 + for f in self.dir.glob("*.json"): try: - data = json.loads(f.read_text(encoding="utf-8")) - updated = data.get("updated_at", "") - if isinstance(updated, str) and updated: - ts = datetime.fromisoformat(updated) - if ts < cutoff: - f.unlink() - removed += 1 - except Exception as exc: - logger.warning("فشل معالجة ملف الجلسة %s: %s", f.name, exc) - logger.info("تم حذف %d جلسة قديمة (أقدم من %d يوم)", removed, days) - return removed + d = json.loads(f.read_text(encoding="utf-8")) + u = d.get("updated_at", "") + if isinstance(u, str) and u and datetime.fromisoformat(u) < cutoff: + f.unlink(); removed += 1 + except Exception: pass + logger.info("حذف %d جلسة قديمة", removed); return removed async def list_sessions(self, limit: int = 20) -> list[dict[str, Any]]: - """ - List recent sessions with basic info. - عرض الجلسات الأخيرة مع معلومات أساسية. - """ - sessions: list[dict[str, Any]] = [] - for f in sorted(self.sessions_dir.glob("*.json"), key=lambda p: p.stat().st_mtime, reverse=True): - if len(sessions) >= limit: - break + """عرض الجلسات الأخيرة.""" + sessions = [] + for f in sorted(self.dir.glob("*.json"), key=lambda p: p.stat().st_mtime, reverse=True)[:limit]: try: - data = json.loads(f.read_text(encoding="utf-8")) - sessions.append({ - "session_id": data.get("session_id", f.stem), - "project": data.get("project", ""), - "updated_at": data.get("updated_at", ""), - "workstreams": data.get("active_workstreams", []), - "decisions_count": len(data.get("last_decisions", [])), - "followups_pending": sum( - 1 for fu in data.get("pending_followups", []) - if not fu.get("completed", False) - ), - }) - except Exception: - continue + d = json.loads(f.read_text(encoding="utf-8")) + sessions.append({"session_id": d.get("session_id", f.stem), "project": d.get("project", ""), + "updated_at": d.get("updated_at", ""), "workstreams": d.get("active_workstreams", []), + "decisions": len(d.get("last_decisions", [])), + "pending": sum(1 for fu in d.get("pending_followups", []) if not fu.get("completed"))}) + except Exception: continue return sessions -# --------------------------------------------------------------------------- -# Global singleton -# --------------------------------------------------------------------------- - session_continuity = SessionContinuity()