fix: Update memory engine and session continuity implementations

https://claude.ai/code/session_01LsnvBa7HwF5hs99VZbgLGj
This commit is contained in:
Claude 2026-04-11 08:24:02 +00:00
parent c67164ffea
commit 30f134a5fa
No known key found for this signature in database
2 changed files with 332 additions and 858 deletions

View File

@ -1,615 +1,307 @@
"""
Memory Engine Dealix MemPalace Pattern
Pluggable memory adapter with evaluation and quality checks.
Supports Redis (production) and file-based (local/offline) backends.
"""
import json
import logging
import os
import re
import uuid
import json, logging, os, uuid
from abc import ABC, abstractmethod
from collections import defaultdict
from datetime import datetime, timedelta, timezone
from difflib import SequenceMatcher
from pathlib import Path
from typing import Any, Optional
from pydantic import BaseModel, Field
logger = logging.getLogger(__name__)
MEMORY_BASE_DIR = Path(__file__).resolve().parents[4] / "memory"
STALENESS_DAYS = 30
# ---------------------------------------------------------------------------
# Models — نماذج البيانات
# ---------------------------------------------------------------------------
class MemoryDomain(str):
"""Domain tag for memory items."""
pass
MEMORY_BASE = Path(__file__).resolve().parents[4] / "memory"
STALE_DAYS = 30
class MemoryItem(BaseModel):
"""A single memory item — عنصر ذاكرة واحد"""
"""عنصر ذاكرة واحد"""
id: str = Field(default_factory=lambda: str(uuid.uuid4()))
domain: str = "project" # project, customer, deal, competitor, prompt
content: str
metadata: dict[str, Any] = {}
source: str = ""
content: str; metadata: dict[str, Any] = {}; source: str = ""
confidence: float = Field(default=0.7, ge=0.0, le=1.0)
created_at: datetime = Field(default_factory=lambda: datetime.now(timezone.utc))
updated_at: datetime = Field(default_factory=lambda: datetime.now(timezone.utc))
access_count: int = 0
is_canonical: bool = True # True = business data, False = derived/AI-generated
is_canonical: bool = True # True = business data, False = derived/AI
retention_days: int = 0 # 0 = permanent
tenant_id: str = ""
tags: list[str] = []
class Config:
json_schema_extra = {
"example": {
"domain": "customer",
"content": "Acme Corp prefers WhatsApp for all communication",
"source": "customer_interview_2026-04-10",
"confidence": 0.9,
"is_canonical": True,
}
}
tenant_id: str = ""; tags: list[str] = []
class EvalResult(BaseModel):
"""Evaluation result for memory quality — نتيجة تقييم جودة الذاكرة"""
total_queries: int = 0
correct_retrievals: int = 0
precision: float = 0.0
recall: float = 0.0
avg_rank: float = 0.0
"""نتيجة تقييم جودة الذاكرة"""
total_queries: int = 0; correct_retrievals: int = 0
precision: float = 0.0; recall: float = 0.0; avg_rank: float = 0.0
message_ar: str = ""
class MemoryStats(BaseModel):
"""Memory store statistics — إحصائيات مخزن الذاكرة"""
total_items: int = 0
by_domain: dict[str, int] = {}
canonical_count: int = 0
derived_count: int = 0
avg_confidence: float = 0.0
oldest_item: Optional[datetime] = None
newest_item: Optional[datetime] = None
"""إحصائيات مخزن الذاكرة"""
total_items: int = 0; by_domain: dict[str, int] = {}
canonical_count: int = 0; derived_count: int = 0; avg_confidence: float = 0.0
oldest_item: Optional[datetime] = None; newest_item: Optional[datetime] = None
message_ar: str = ""
# ---------------------------------------------------------------------------
# Abstract Adapter — المحول المجرد
# ---------------------------------------------------------------------------
class MemoryAdapter(ABC):
"""Abstract adapter — swap backends without rewriting app."""
@abstractmethod
async def store(self, item: MemoryItem) -> str:
"""Store item, return ID — تخزين عنصر وإرجاع المعرف"""
async def store(self, item: MemoryItem) -> str: ...
@abstractmethod
async def retrieve(
self, query: str, domain: str = None, limit: int = 5
) -> list[MemoryItem]:
"""Retrieve matching items — استرجاع العناصر المطابقة"""
async def retrieve(self, query: str, domain: str = None, limit: int = 5) -> list[MemoryItem]: ...
@abstractmethod
async def update(self, item_id: str, content: str) -> bool:
"""Update item content — تحديث محتوى العنصر"""
async def update(self, item_id: str, content: str) -> bool: ...
@abstractmethod
async def delete(self, item_id: str) -> bool:
"""Delete item — حذف العنصر"""
async def delete(self, item_id: str) -> bool: ...
@abstractmethod
async def search_by_entity(
self, entity_type: str, entity_id: str
) -> list[MemoryItem]:
"""Search by entity reference — البحث بمرجع الكيان"""
async def search_by_entity(self, entity_type: str, entity_id: str) -> list[MemoryItem]: ...
@abstractmethod
async def get_stats(self) -> MemoryStats:
"""Return store statistics — إرجاع إحصائيات المخزن"""
async def get_stats(self) -> MemoryStats: ...
@abstractmethod
async def list_all(self, domain: str = None) -> list[MemoryItem]:
"""List all items optionally filtered by domain."""
async def list_all(self, domain: str = None) -> list[MemoryItem]: ...
# ---------------------------------------------------------------------------
# Redis Adapter — محول ريدس
# ---------------------------------------------------------------------------
def _compute_stats(items: list[MemoryItem]) -> MemoryStats:
if not items: return MemoryStats(message_ar="لا توجد عناصر")
by_d: dict[str, int] = defaultdict(int)
can, tc = 0, 0.0
old = new = items[0].created_at
for i in items:
by_d[i.domain] += 1; can += i.is_canonical; tc += i.confidence
if i.created_at < old: old = i.created_at
if i.created_at > new: new = i.created_at
return MemoryStats(total_items=len(items), by_domain=dict(by_d), canonical_count=can,
derived_count=len(items)-can, avg_confidence=round(tc/len(items), 4),
oldest_item=old, newest_item=new,
message_ar=f"عناصر: {len(items)}، معتمدة: {can}، مشتقة: {len(items)-can}")
def _parse_dt(v: Any) -> datetime:
return datetime.fromisoformat(v) if isinstance(v, str) else v
class RedisMemoryAdapter(MemoryAdapter):
"""
Redis-backed memory for fast retrieval.
Uses Redis Search if available, falls back to key scanning.
ذاكرة مدعومة بريدس للاسترجاع السريع.
"""
KEY_PREFIX = "dealix:memory:"
"""ذاكرة مدعومة بريدس للاسترجاع السريع."""
PFX = "dealix:memory:"
def __init__(self, redis_client: Any = None, redis_url: str = None):
self._redis = redis_client
self._redis_url = redis_url or os.getenv("REDIS_URL", "redis://localhost:6379/0")
self._connected = False
self._url = redis_url or os.getenv("REDIS_URL", "redis://localhost:6379/0")
self._ok = False
async def _ensure_connection(self) -> None:
if self._redis is not None and self._connected:
return
try:
import redis.asyncio as aioredis
self._redis = aioredis.from_url(
self._redis_url, decode_responses=True
)
await self._redis.ping()
self._connected = True
logger.info("تم الاتصال بريدس: %s", self._redis_url)
except Exception as exc:
logger.warning("فشل الاتصال بريدس: %s — سيتم استخدام الذاكرة المحلية", exc)
self._connected = False
raise
async def _conn(self):
if self._redis and self._ok: return
import redis.asyncio as aioredis
self._redis = aioredis.from_url(self._url, decode_responses=True)
await self._redis.ping(); self._ok = True
def _key(self, item_id: str) -> str:
return f"{self.KEY_PREFIX}{item_id}"
def _domain_key(self, domain: str) -> str:
return f"{self.KEY_PREFIX}domain:{domain}"
def _entity_key(self, entity_type: str, entity_id: str) -> str:
return f"{self.KEY_PREFIX}entity:{entity_type}:{entity_id}"
def _k(self, id: str) -> str: return f"{self.PFX}{id}"
def _dk(self, d: str) -> str: return f"{self.PFX}domain:{d}"
def _ek(self, et: str, eid: str) -> str: return f"{self.PFX}entity:{et}:{eid}"
async def store(self, item: MemoryItem) -> str:
await self._ensure_connection()
await self._conn()
data = item.model_dump(mode="json")
data["created_at"] = item.created_at.isoformat()
data["updated_at"] = item.updated_at.isoformat()
data["created_at"] = item.created_at.isoformat(); data["updated_at"] = item.updated_at.isoformat()
pipe = self._redis.pipeline()
pipe.set(self._key(item.id), json.dumps(data, ensure_ascii=False))
pipe.sadd(self._domain_key(item.domain), item.id)
if item.retention_days > 0:
pipe.expire(self._key(item.id), item.retention_days * 86400)
# Index by entity if metadata has entity references
for etype in ("lead_id", "deal_id", "company_id", "tenant_id"):
if etype in item.metadata:
pipe.sadd(self._entity_key(etype, str(item.metadata[etype])), item.id)
await pipe.execute()
logger.info("تم تخزين عنصر ذاكرة في ريدس: %s (%s)", item.id, item.domain)
return item.id
pipe.set(self._k(item.id), json.dumps(data, ensure_ascii=False))
pipe.sadd(self._dk(item.domain), item.id)
if item.retention_days > 0: pipe.expire(self._k(item.id), item.retention_days * 86400)
for et in ("lead_id","deal_id","company_id","tenant_id"):
if et in item.metadata: pipe.sadd(self._ek(et, str(item.metadata[et])), item.id)
await pipe.execute(); return item.id
async def retrieve(
self, query: str, domain: str = None, limit: int = 5
) -> list[MemoryItem]:
await self._ensure_connection()
if domain:
ids = await self._redis.smembers(self._domain_key(domain))
else:
all_keys = []
async for key in self._redis.scan_iter(f"{self.KEY_PREFIX}[0-9a-f]*"):
all_keys.append(key.replace(self.KEY_PREFIX, ""))
ids = all_keys
items: list[MemoryItem] = []
query_lower = query.lower()
query_words = set(query_lower.split())
for item_id in ids:
raw = await self._redis.get(self._key(item_id))
if not raw:
continue
data = json.loads(raw)
content_lower = data.get("content", "").lower()
content_words = set(content_lower.split())
overlap = len(query_words & content_words)
if overlap > 0 or query_lower in content_lower:
data["created_at"] = datetime.fromisoformat(data["created_at"])
data["updated_at"] = datetime.fromisoformat(data["updated_at"])
item = MemoryItem(**data)
item.access_count += 1
items.append(item)
items.sort(key=lambda x: x.confidence, reverse=True)
return items[:limit]
async def retrieve(self, query: str, domain: str = None, limit: int = 5) -> list[MemoryItem]:
await self._conn()
ids = await self._redis.smembers(self._dk(domain)) if domain else [
k.replace(self.PFX, "") async for k in self._redis.scan_iter(f"{self.PFX}[0-9a-f]*")]
qw, items = set(query.lower().split()), []
for iid in ids:
raw = await self._redis.get(self._k(iid))
if not raw: continue
d = json.loads(raw); cl = d.get("content", "").lower()
if qw & set(cl.split()) or query.lower() in cl:
d["created_at"] = _parse_dt(d["created_at"]); d["updated_at"] = _parse_dt(d["updated_at"])
items.append(MemoryItem(**d))
items.sort(key=lambda x: -x.confidence); return items[:limit]
async def update(self, item_id: str, content: str) -> bool:
await self._ensure_connection()
raw = await self._redis.get(self._key(item_id))
if not raw:
return False
data = json.loads(raw)
data["content"] = content
data["updated_at"] = datetime.now(timezone.utc).isoformat()
await self._redis.set(self._key(item_id), json.dumps(data, ensure_ascii=False))
return True
await self._conn()
raw = await self._redis.get(self._k(item_id))
if not raw: return False
d = json.loads(raw); d["content"] = content; d["updated_at"] = datetime.now(timezone.utc).isoformat()
await self._redis.set(self._k(item_id), json.dumps(d, ensure_ascii=False)); return True
async def delete(self, item_id: str) -> bool:
await self._ensure_connection()
raw = await self._redis.get(self._key(item_id))
if not raw:
return False
data = json.loads(raw)
domain = data.get("domain", "project")
pipe = self._redis.pipeline()
pipe.delete(self._key(item_id))
pipe.srem(self._domain_key(domain), item_id)
await pipe.execute()
return True
await self._conn()
raw = await self._redis.get(self._k(item_id))
if not raw: return False
d = json.loads(raw)
pipe = self._redis.pipeline(); pipe.delete(self._k(item_id))
pipe.srem(self._dk(d.get("domain", "project")), item_id); await pipe.execute(); return True
async def search_by_entity(
self, entity_type: str, entity_id: str
) -> list[MemoryItem]:
await self._ensure_connection()
ids = await self._redis.smembers(self._entity_key(entity_type, entity_id))
items: list[MemoryItem] = []
for item_id in ids:
raw = await self._redis.get(self._key(item_id))
async def search_by_entity(self, entity_type: str, entity_id: str) -> list[MemoryItem]:
await self._conn()
items = []
for iid in await self._redis.smembers(self._ek(entity_type, entity_id)):
raw = await self._redis.get(self._k(iid))
if raw:
data = json.loads(raw)
data["created_at"] = datetime.fromisoformat(data["created_at"])
data["updated_at"] = datetime.fromisoformat(data["updated_at"])
items.append(MemoryItem(**data))
d = json.loads(raw); d["created_at"] = _parse_dt(d["created_at"]); d["updated_at"] = _parse_dt(d["updated_at"])
items.append(MemoryItem(**d))
return items
async def get_stats(self) -> MemoryStats:
await self._ensure_connection()
items = await self.list_all()
return _compute_stats(items)
async def get_stats(self) -> MemoryStats: return _compute_stats(await self.list_all())
async def list_all(self, domain: str = None) -> list[MemoryItem]:
await self._ensure_connection()
if domain:
ids = await self._redis.smembers(self._domain_key(domain))
else:
ids = set()
async for key in self._redis.scan_iter(f"{self.KEY_PREFIX}[0-9a-f]*"):
ids.add(key.replace(self.KEY_PREFIX, ""))
await self._conn()
ids = await self._redis.smembers(self._dk(domain)) if domain else {
k.replace(self.PFX, "") async for k in self._redis.scan_iter(f"{self.PFX}[0-9a-f]*")}
items = []
for item_id in ids:
raw = await self._redis.get(self._key(item_id))
for iid in ids:
raw = await self._redis.get(self._k(iid))
if raw:
data = json.loads(raw)
data["created_at"] = datetime.fromisoformat(data["created_at"])
data["updated_at"] = datetime.fromisoformat(data["updated_at"])
items.append(MemoryItem(**data))
d = json.loads(raw); d["created_at"] = _parse_dt(d["created_at"]); d["updated_at"] = _parse_dt(d["updated_at"])
items.append(MemoryItem(**d))
return items
# ---------------------------------------------------------------------------
# File Adapter — محول الملفات
# ---------------------------------------------------------------------------
class FileMemoryAdapter(MemoryAdapter):
"""
File-based memory for local/offline use.
Stores as JSON files in memory/ directory.
ذاكرة مبنية على الملفات للاستخدام المحلي/غير المتصل.
"""
"""ذاكرة مبنية على الملفات للاستخدام المحلي."""
def __init__(self, base_dir: Path = None):
self.base_dir = base_dir or MEMORY_BASE_DIR / "_store"
self.base_dir.mkdir(parents=True, exist_ok=True)
self.base = base_dir or MEMORY_BASE / "_store"; self.base.mkdir(parents=True, exist_ok=True)
def _item_path(self, item_id: str) -> Path:
return self.base_dir / f"{item_id}.json"
def _dd(self, domain: str) -> Path:
d = self.base / domain; d.mkdir(parents=True, exist_ok=True); return d
def _domain_dir(self, domain: str) -> Path:
d = self.base_dir / domain
d.mkdir(parents=True, exist_ok=True)
return d
def _ser(self, item: MemoryItem) -> str:
d = item.model_dump(mode="json")
d["created_at"] = item.created_at.isoformat(); d["updated_at"] = item.updated_at.isoformat()
return json.dumps(d, ensure_ascii=False, indent=2)
def _de(self, path: Path) -> MemoryItem:
d = json.loads(path.read_text(encoding="utf-8"))
d["created_at"] = _parse_dt(d["created_at"]); d["updated_at"] = _parse_dt(d["updated_at"])
return MemoryItem(**d)
async def store(self, item: MemoryItem) -> str:
file_path = self._domain_dir(item.domain) / f"{item.id}.json"
data = item.model_dump(mode="json")
data["created_at"] = item.created_at.isoformat()
data["updated_at"] = item.updated_at.isoformat()
file_path.write_text(json.dumps(data, ensure_ascii=False, indent=2), encoding="utf-8")
logger.info("تم تخزين عنصر ذاكرة في ملف: %s (%s)", item.id, item.domain)
return item.id
(self._dd(item.domain) / f"{item.id}.json").write_text(self._ser(item), encoding="utf-8")
logger.info("ذاكرة ملف: %s (%s)", item.id, item.domain); return item.id
async def retrieve(
self, query: str, domain: str = None, limit: int = 5
) -> list[MemoryItem]:
items = await self.list_all(domain)
query_lower = query.lower()
query_words = set(query_lower.split())
scored: list[tuple[MemoryItem, float]] = []
for item in items:
content_lower = item.content.lower()
content_words = set(content_lower.split())
overlap = len(query_words & content_words)
if overlap > 0 or query_lower in content_lower:
score = (overlap / max(len(query_words), 1)) * item.confidence
scored.append((item, score))
scored.sort(key=lambda x: x[1], reverse=True)
results = [item for item, _ in scored[:limit]]
for item in results:
item.access_count += 1
await self._write_item(item)
return results
async def retrieve(self, query: str, domain: str = None, limit: int = 5) -> list[MemoryItem]:
items = await self.list_all(domain); qw = set(query.lower().split())
scored = []
for it in items:
cw = set(it.content.lower().split()); ov = len(qw & cw)
if ov > 0 or query.lower() in it.content.lower():
scored.append((it, (ov / max(len(qw), 1)) * it.confidence))
scored.sort(key=lambda x: -x[1])
for it, _ in scored[:limit]: it.access_count += 1; await self._write(it)
return [it for it, _ in scored[:limit]]
async def update(self, item_id: str, content: str) -> bool:
item = await self._find_item(item_id)
if not item:
return False
item.content = content
item.updated_at = datetime.now(timezone.utc)
await self._write_item(item)
return True
it = await self._find(item_id)
if not it: return False
it.content = content; it.updated_at = datetime.now(timezone.utc); await self._write(it); return True
async def delete(self, item_id: str) -> bool:
for domain_dir in self.base_dir.iterdir():
if not domain_dir.is_dir():
continue
path = domain_dir / f"{item_id}.json"
if path.exists():
path.unlink()
logger.info("تم حذف عنصر ذاكرة: %s", item_id)
return True
for dd in self.base.iterdir():
if not dd.is_dir(): continue
p = dd / f"{item_id}.json"
if p.exists(): p.unlink(); return True
return False
async def search_by_entity(
self, entity_type: str, entity_id: str
) -> list[MemoryItem]:
all_items = await self.list_all()
return [
item for item in all_items
if str(item.metadata.get(entity_type, "")) == str(entity_id)
]
async def search_by_entity(self, entity_type: str, entity_id: str) -> list[MemoryItem]:
return [i for i in await self.list_all() if str(i.metadata.get(entity_type, "")) == str(entity_id)]
async def get_stats(self) -> MemoryStats:
items = await self.list_all()
return _compute_stats(items)
async def get_stats(self) -> MemoryStats: return _compute_stats(await self.list_all())
async def list_all(self, domain: str = None) -> list[MemoryItem]:
items: list[MemoryItem] = []
search_dirs = (
[self._domain_dir(domain)] if domain
else [d for d in self.base_dir.iterdir() if d.is_dir()]
)
for dir_path in search_dirs:
for json_file in dir_path.glob("*.json"):
try:
data = json.loads(json_file.read_text(encoding="utf-8"))
if "created_at" in data and isinstance(data["created_at"], str):
data["created_at"] = datetime.fromisoformat(data["created_at"])
if "updated_at" in data and isinstance(data["updated_at"], str):
data["updated_at"] = datetime.fromisoformat(data["updated_at"])
items.append(MemoryItem(**data))
except Exception as exc:
logger.warning("فشل تحميل عنصر ذاكرة %s: %s", json_file.name, exc)
dirs = [self._dd(domain)] if domain else [d for d in self.base.iterdir() if d.is_dir()]
items = []
for dd in dirs:
for f in dd.glob("*.json"):
try: items.append(self._de(f))
except Exception as e: logger.warning("فشل تحميل %s: %s", f.name, e)
return items
async def _find_item(self, item_id: str) -> Optional[MemoryItem]:
for domain_dir in self.base_dir.iterdir():
if not domain_dir.is_dir():
continue
path = domain_dir / f"{item_id}.json"
if path.exists():
data = json.loads(path.read_text(encoding="utf-8"))
if isinstance(data.get("created_at"), str):
data["created_at"] = datetime.fromisoformat(data["created_at"])
if isinstance(data.get("updated_at"), str):
data["updated_at"] = datetime.fromisoformat(data["updated_at"])
return MemoryItem(**data)
async def _find(self, item_id: str) -> Optional[MemoryItem]:
for dd in self.base.iterdir():
if not dd.is_dir(): continue
p = dd / f"{item_id}.json"
if p.exists(): return self._de(p)
return None
async def _write_item(self, item: MemoryItem) -> None:
file_path = self._domain_dir(item.domain) / f"{item.id}.json"
data = item.model_dump(mode="json")
data["created_at"] = item.created_at.isoformat()
data["updated_at"] = item.updated_at.isoformat()
file_path.write_text(json.dumps(data, ensure_ascii=False, indent=2), encoding="utf-8")
async def _write(self, item: MemoryItem) -> None:
(self._dd(item.domain) / f"{item.id}.json").write_text(self._ser(item), encoding="utf-8")
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def _compute_stats(items: list[MemoryItem]) -> MemoryStats:
if not items:
return MemoryStats(message_ar="لا توجد عناصر في الذاكرة")
by_domain: dict[str, int] = defaultdict(int)
canonical = 0
total_conf = 0.0
oldest = items[0].created_at
newest = items[0].created_at
for item in items:
by_domain[item.domain] += 1
if item.is_canonical:
canonical += 1
total_conf += item.confidence
if item.created_at < oldest:
oldest = item.created_at
if item.created_at > newest:
newest = item.created_at
return MemoryStats(
total_items=len(items),
by_domain=dict(by_domain),
canonical_count=canonical,
derived_count=len(items) - canonical,
avg_confidence=round(total_conf / len(items), 4),
oldest_item=oldest,
newest_item=newest,
message_ar=f"إجمالي العناصر: {len(items)}، المعتمدة: {canonical}، المشتقة: {len(items) - canonical}",
)
# ---------------------------------------------------------------------------
# Memory Evaluator — مقيّم الذاكرة
# ---------------------------------------------------------------------------
class MemoryEvaluator:
"""
Evaluate memory quality before trusting it.
تقييم جودة الذاكرة قبل الوثوق بها.
"""
"""تقييم جودة الذاكرة قبل الوثوق بها."""
def __init__(self, adapter: MemoryAdapter):
self._adapter = adapter
self._a = adapter
async def benchmark_retrieval(
self,
test_queries: list[str],
expected_results: list[list[str]],
) -> EvalResult:
"""
Run retrieval benchmark against known query/result pairs.
تشغيل اختبار الاسترجاع مقابل أزواج استعلام/نتيجة معروفة.
"""
async def benchmark_retrieval(self, test_queries: list[str], expected_results: list[list[str]]) -> EvalResult:
if len(test_queries) != len(expected_results):
raise ValueError("test_queries and expected_results must have same length")
total = len(test_queries)
correct = 0
total_recall = 0.0
total_rank = 0.0
for query, expected in zip(test_queries, expected_results):
results = await self._adapter.retrieve(query, limit=10)
result_contents = [r.content.lower().strip() for r in results]
expected_lower = [e.lower().strip() for e in expected]
found_any = False
best_rank = len(results) + 1
matched = 0
for exp in expected_lower:
for rank, res in enumerate(result_contents):
if exp in res or SequenceMatcher(None, exp, res).ratio() > 0.7:
found_any = True
matched += 1
best_rank = min(best_rank, rank + 1)
break
if found_any:
correct += 1
if expected_lower:
total_recall += matched / len(expected_lower)
total_rank += best_rank if found_any else len(results) + 1
precision = correct / total if total else 0.0
recall = total_recall / total if total else 0.0
avg_rank = total_rank / total if total else 0.0
return EvalResult(
total_queries=total,
correct_retrievals=correct,
precision=round(precision, 4),
recall=round(recall, 4),
avg_rank=round(avg_rank, 2),
message_ar=f"الدقة: {precision:.2%}، الاستدعاء: {recall:.2%}، متوسط الترتيب: {avg_rank:.1f}",
)
raise ValueError("Mismatched lengths")
total, correct, t_recall, t_rank = len(test_queries), 0, 0.0, 0.0
for q, exp in zip(test_queries, expected_results):
res = [r.content.lower().strip() for r in await self._a.retrieve(q, limit=10)]
el = [e.lower().strip() for e in exp]; found, best, matched = False, len(res)+1, 0
for e in el:
for rank, r in enumerate(res):
if e in r or SequenceMatcher(None, e, r).ratio() > 0.7:
found = True; matched += 1; best = min(best, rank+1); break
if found: correct += 1
if el: t_recall += matched / len(el)
t_rank += best if found else len(res)+1
p, r = (correct/total if total else 0), (t_recall/total if total else 0)
ar = t_rank/total if total else 0
return EvalResult(total_queries=total, correct_retrievals=correct,
precision=round(p, 4), recall=round(r, 4), avg_rank=round(ar, 2),
message_ar=f"الدقة: {p:.2%}، الاستدعاء: {r:.2%}")
async def check_staleness(self, domain: str = None) -> list[MemoryItem]:
"""
Items not accessed in 30+ days.
العناصر التي لم يتم الوصول إليها منذ 30 يومًا أو أكثر.
"""
items = await self._adapter.list_all(domain)
cutoff = datetime.now(timezone.utc) - timedelta(days=STALENESS_DAYS)
return [item for item in items if item.updated_at < cutoff]
cutoff = datetime.now(timezone.utc) - timedelta(days=STALE_DAYS)
return [i for i in await self._a.list_all(domain) if i.updated_at < cutoff]
async def check_duplicates(self, domain: str = None) -> list[tuple[MemoryItem, MemoryItem]]:
"""
Find similar items that may be duplicates.
البحث عن عناصر متشابهة قد تكون مكررة.
"""
items = await self._adapter.list_all(domain)
duplicates: list[tuple[MemoryItem, MemoryItem]] = []
seen: set[str] = set()
items, dups, seen = await self._a.list_all(domain), [], set()
for i, a in enumerate(items):
for b in items[i + 1:]:
pair_key = f"{a.id}:{b.id}"
if pair_key in seen:
continue
ratio = SequenceMatcher(None, a.content.lower(), b.content.lower()).ratio()
if ratio > 0.8:
duplicates.append((a, b))
seen.add(pair_key)
return duplicates
for b in items[i+1:]:
k = f"{a.id}:{b.id}"
if k not in seen and SequenceMatcher(None, a.content.lower(), b.content.lower()).ratio() > 0.8:
dups.append((a, b)); seen.add(k)
return dups
async def check_contradictions(self, domain: str = None) -> list[tuple[MemoryItem, MemoryItem]]:
"""
Find items in the same domain with conflicting content.
البحث عن عناصر في نفس النطاق بمحتوى متناقض.
"""
items = await self._adapter.list_all(domain)
contradictions: list[tuple[MemoryItem, MemoryItem]] = []
negation_markers = {"not", "no", "never", "cannot", "لا", "ليس", "لن", "لم", "غير"}
items, contras = await self._a.list_all(domain), []
negs = {"not","no","never","cannot","لا","ليس","لن","لم","غير"}
for i, a in enumerate(items):
a_words = set(a.content.lower().split())
for b in items[i + 1:]:
if a.domain != b.domain:
continue
b_words = set(b.content.lower().split())
shared = a_words & b_words
a_negations = a_words & negation_markers
b_negations = b_words & negation_markers
# If they share many words but differ in negation, flag as contradiction
if len(shared) > 3 and a_negations != b_negations:
contradictions.append((a, b))
return contradictions
aw = set(a.content.lower().split())
for b in items[i+1:]:
if a.domain != b.domain: continue
bw = set(b.content.lower().split())
if len(aw & bw) > 3 and (aw & negs) != (bw & negs): contras.append((a, b))
return contras
async def get_health_report(self) -> dict[str, Any]:
"""
Overall memory health metrics.
مقاييس صحة الذاكرة العامة.
"""
stats = await self._adapter.get_stats()
stale = await self.check_staleness()
duplicates = await self.check_duplicates()
contradictions = await self.check_contradictions()
health_score = 1.0
if stats.total_items > 0:
stale_ratio = len(stale) / stats.total_items
dup_ratio = len(duplicates) / stats.total_items
contra_ratio = len(contradictions) / stats.total_items
health_score = max(0.0, 1.0 - stale_ratio * 0.3 - dup_ratio * 0.4 - contra_ratio * 0.5)
return {
"health_score": round(health_score, 4),
"total_items": stats.total_items,
"stale_items": len(stale),
"duplicate_pairs": len(duplicates),
"contradiction_pairs": len(contradictions),
"avg_confidence": stats.avg_confidence,
stats = await self._a.get_stats()
stale = await self.check_staleness(); dups = await self.check_duplicates()
contras = await self.check_contradictions()
hs = max(0.0, 1.0 - (len(stale)/max(stats.total_items,1))*0.3
- (len(dups)/max(stats.total_items,1))*0.4
- (len(contras)/max(stats.total_items,1))*0.5) if stats.total_items else 1.0
return {"health_score": round(hs, 4), "total_items": stats.total_items,
"stale_items": len(stale), "duplicate_pairs": len(dups),
"contradiction_pairs": len(contras), "avg_confidence": stats.avg_confidence,
"by_domain": stats.by_domain,
"message_ar": (
f"درجة الصحة: {health_score:.2%}، "
f"عناصر قديمة: {len(stale)}، "
f"تكرارات: {len(duplicates)}، "
f"تناقضات: {len(contradictions)}"
),
}
"message_ar": f"صحة: {hs:.2%}، قديمة: {len(stale)}، تكرار: {len(dups)}، تناقض: {len(contras)}"}
# ---------------------------------------------------------------------------
# Factory — مصنع المحولات
# ---------------------------------------------------------------------------
def create_memory_adapter(backend: str = None) -> MemoryAdapter:
"""
Create the appropriate memory adapter based on config.
إنشاء محول الذاكرة المناسب بناءً على التكوين.
"""
backend = backend or os.getenv("MEMORY_BACKEND", "file")
if backend == "redis":
return RedisMemoryAdapter()
return FileMemoryAdapter()
return RedisMemoryAdapter() if backend == "redis" else FileMemoryAdapter()
# Global instances
memory_adapter = create_memory_adapter()
memory_evaluator = MemoryEvaluator(memory_adapter)

View File

@ -1,447 +1,229 @@
"""
Session Continuity Dealix AI Session State Management
Maintains context across AI agent sessions for seamless handoff.
Stores decisions, failures, wins, and follow-ups between sessions.
"""
import json
import logging
import uuid
import json, logging, uuid
from datetime import datetime, timedelta, timezone
from pathlib import Path
from typing import Any, Optional
from pydantic import BaseModel, Field
logger = logging.getLogger(__name__)
SESSIONS_DIR = Path(__file__).resolve().parents[4] / "memory" / "_sessions"
SESSIONS_DIR.mkdir(parents=True, exist_ok=True)
# ---------------------------------------------------------------------------
# Models — نماذج البيانات
# ---------------------------------------------------------------------------
class Decision(BaseModel):
"""A recorded decision — قرار مسجّل"""
decision: str
context: str
"""قرار مسجّل"""
decision: str; context: str; decision_ar: str = ""; made_by: str = ""
timestamp: datetime = Field(default_factory=lambda: datetime.now(timezone.utc))
decision_ar: str = ""
made_by: str = ""
class Failure(BaseModel):
"""A recorded failure — فشل مسجّل"""
description: str
context: str
"""فشل مسجّل"""
description: str; context: str; description_ar: str = ""; resolution: str = ""
timestamp: datetime = Field(default_factory=lambda: datetime.now(timezone.utc))
description_ar: str = ""
resolution: str = ""
class Win(BaseModel):
"""A recorded win — نجاح مسجّل"""
description: str
context: str
"""نجاح مسجّل"""
description: str; context: str; description_ar: str = ""
timestamp: datetime = Field(default_factory=lambda: datetime.now(timezone.utc))
description_ar: str = ""
class FollowUp(BaseModel):
"""A pending follow-up task — مهمة متابعة معلّقة"""
task: str
task_ar: str = ""
due_date: Optional[datetime] = None
completed: bool = False
"""مهمة متابعة معلّقة"""
task: str; task_ar: str = ""; due_date: Optional[datetime] = None
completed: bool = False; assigned_to: str = ""
created_at: datetime = Field(default_factory=lambda: datetime.now(timezone.utc))
assigned_to: str = ""
class SessionState(BaseModel):
"""Full session state — حالة الجلسة الكاملة"""
"""حالة الجلسة الكاملة"""
session_id: str = Field(default_factory=lambda: str(uuid.uuid4()))
project: str = "dealix"
active_workstreams: list[str] = []
last_decisions: list[Decision] = []
open_questions: list[str] = []
recent_failures: list[Failure] = []
recent_wins: list[Win] = []
project: str = "dealix"; active_workstreams: list[str] = []
last_decisions: list[Decision] = []; open_questions: list[str] = []
recent_failures: list[Failure] = []; recent_wins: list[Win] = []
pending_followups: list[FollowUp] = []
context_summary: str = ""
context_summary_ar: str = ""
context_summary: str = ""; context_summary_ar: str = ""
created_at: datetime = Field(default_factory=lambda: datetime.now(timezone.utc))
updated_at: datetime = Field(default_factory=lambda: datetime.now(timezone.utc))
tags: list[str] = []
tenant_id: str = ""
class Config:
json_schema_extra = {
"example": {
"project": "dealix",
"active_workstreams": ["cpq-enhancement", "pdpl-audit"],
"context_summary": "Working on CPQ Arabic PDF generation and PDPL consent expiry.",
"context_summary_ar": "العمل على توليد PDF عربي للتسعير وانتهاء موافقة حماية البيانات.",
}
}
tags: list[str] = []; tenant_id: str = ""
# ---------------------------------------------------------------------------
# Session Continuity Service — خدمة استمرارية الجلسة
# ---------------------------------------------------------------------------
def _dt_hook(obj: Any) -> Any:
"""Convert datetime strings in nested dicts."""
if isinstance(obj, dict):
for k in ("timestamp", "created_at", "updated_at", "due_date"):
if k in obj and isinstance(obj[k], str) and obj[k]:
obj[k] = datetime.fromisoformat(obj[k])
return obj
return obj
class SessionContinuity:
"""
Maintain context across AI sessions.
الحفاظ على السياق عبر جلسات الذكاء الاصطناعي.
"""
"""الحفاظ على السياق عبر جلسات الذكاء الاصطناعي"""
def __init__(self, sessions_dir: Path = None):
self.sessions_dir = sessions_dir or SESSIONS_DIR
self.sessions_dir.mkdir(parents=True, exist_ok=True)
self.dir = sessions_dir or SESSIONS_DIR
self.dir.mkdir(parents=True, exist_ok=True)
self._current: Optional[SessionState] = None
def _session_path(self, session_id: str) -> Path:
return self.sessions_dir / f"{session_id}.json"
def _path(self, sid: str) -> Path: return self.dir / f"{sid}.json"
def _serialize_state(self, state: SessionState) -> str:
def _save_json(self, state: SessionState) -> None:
data = state.model_dump(mode="json")
# Convert datetime objects to ISO strings for JSON
# Ensure all datetimes are ISO strings
for key in ("created_at", "updated_at"):
if isinstance(data.get(key), datetime):
data[key] = data[key].isoformat()
for decision in data.get("last_decisions", []):
if isinstance(decision.get("timestamp"), datetime):
decision["timestamp"] = decision["timestamp"].isoformat()
for failure in data.get("recent_failures", []):
if isinstance(failure.get("timestamp"), datetime):
failure["timestamp"] = failure["timestamp"].isoformat()
for win in data.get("recent_wins", []):
if isinstance(win.get("timestamp"), datetime):
win["timestamp"] = win["timestamp"].isoformat()
for followup in data.get("pending_followups", []):
if isinstance(followup.get("due_date"), datetime):
followup["due_date"] = followup["due_date"].isoformat()
if isinstance(followup.get("created_at"), datetime):
followup["created_at"] = followup["created_at"].isoformat()
return json.dumps(data, ensure_ascii=False, indent=2)
if isinstance(data.get(key), datetime): data[key] = data[key].isoformat()
for lst in ("last_decisions", "recent_failures", "recent_wins", "pending_followups"):
for item in data.get(lst, []):
for dk in ("timestamp", "created_at", "updated_at", "due_date"):
if dk in item and isinstance(item[dk], datetime): item[dk] = item[dk].isoformat()
self._path(state.session_id).write_text(json.dumps(data, ensure_ascii=False, indent=2), encoding="utf-8")
def _deserialize_state(self, raw: str) -> SessionState:
data = json.loads(raw)
for key in ("created_at", "updated_at"):
if isinstance(data.get(key), str):
data[key] = datetime.fromisoformat(data[key])
for decision in data.get("last_decisions", []):
if isinstance(decision.get("timestamp"), str):
decision["timestamp"] = datetime.fromisoformat(decision["timestamp"])
for failure in data.get("recent_failures", []):
if isinstance(failure.get("timestamp"), str):
failure["timestamp"] = datetime.fromisoformat(failure["timestamp"])
for win in data.get("recent_wins", []):
if isinstance(win.get("timestamp"), str):
win["timestamp"] = datetime.fromisoformat(win["timestamp"])
for followup in data.get("pending_followups", []):
if isinstance(followup.get("due_date"), str) and followup["due_date"]:
followup["due_date"] = datetime.fromisoformat(followup["due_date"])
if isinstance(followup.get("created_at"), str):
followup["created_at"] = datetime.fromisoformat(followup["created_at"])
def _load_json(self, path: Path) -> SessionState:
data = json.loads(path.read_text(encoding="utf-8"))
for key in ("created_at", "updated_at"): _dt_hook(data) if key in data else None
for lst in ("last_decisions", "recent_failures", "recent_wins", "pending_followups"):
for item in data.get(lst, []): _dt_hook(item)
# Parse top-level dates
for k in ("created_at", "updated_at"):
if isinstance(data.get(k), str): data[k] = datetime.fromisoformat(data[k])
return SessionState(**data)
async def save_state(self, state: SessionState) -> str:
"""
Persist session state to disk.
حفظ حالة الجلسة على القرص.
"""
"""حفظ حالة الجلسة."""
state.updated_at = datetime.now(timezone.utc)
path = self._session_path(state.session_id)
path.write_text(self._serialize_state(state), encoding="utf-8")
self._current = state
logger.info("تم حفظ حالة الجلسة: %s", state.session_id)
return state.session_id
self._save_json(state); self._current = state
logger.info("حفظ جلسة: %s", state.session_id); return state.session_id
async def restore_state(self, session_id: str = None) -> SessionState:
"""
Restore session state. If no session_id, restore the latest.
استعادة حالة الجلسة. إذا لم يُحدد معرف، يتم استعادة الأحدث.
"""
"""استعادة حالة الجلسة — الأحدث إذا لم يُحدد معرف."""
if session_id:
path = self._session_path(session_id)
if path.exists():
state = self._deserialize_state(path.read_text(encoding="utf-8"))
self._current = state
logger.info("تم استعادة الجلسة: %s", session_id)
return state
logger.warning("الجلسة غير موجودة: %s", session_id)
p = self._path(session_id)
if p.exists():
s = self._load_json(p); self._current = s; return s
return SessionState(session_id=session_id)
# Find the latest session
latest_path: Optional[Path] = None
latest_mtime = 0.0
for f in self.sessions_dir.glob("*.json"):
mtime = f.stat().st_mtime
if mtime > latest_mtime:
latest_mtime = mtime
latest_path = f
if latest_path:
state = self._deserialize_state(latest_path.read_text(encoding="utf-8"))
self._current = state
logger.info("تم استعادة أحدث جلسة: %s", state.session_id)
return state
logger.info("لا توجد جلسات سابقة، إنشاء جلسة جديدة")
new_state = SessionState()
await self.save_state(new_state)
return new_state
# Find latest
latest = max(self.dir.glob("*.json"), key=lambda f: f.stat().st_mtime, default=None)
if latest:
s = self._load_json(latest); self._current = s; return s
s = SessionState(); await self.save_state(s); return s
async def get_restore_prompt(self) -> str:
"""
Generate a text prompt summarizing current state for a new AI session.
توليد نص ملخّص للحالة الحالية لتغذية جلسة ذكاء اصطناعي جديدة.
"""
state = self._current
if not state:
state = await self.restore_state()
lines = [
"# Session Restore — استعادة الجلسة",
f"**Project**: {state.project}",
f"**Session**: {state.session_id}",
f"**Last Updated**: {state.updated_at.isoformat()}",
"",
]
if state.context_summary:
lines.append(f"## Context (السياق)")
lines.append(state.context_summary)
if state.context_summary_ar:
lines.append(state.context_summary_ar)
"""توليد نص ملخّص للحالة الحالية لتغذية جلسة جديدة."""
s = self._current or await self.restore_state()
lines = [f"# Session Restore — استعادة الجلسة",
f"**Project**: {s.project} | **Session**: {s.session_id}",
f"**Updated**: {s.updated_at.isoformat()}", ""]
if s.context_summary:
lines += ["## Context (السياق)", s.context_summary]
if s.context_summary_ar: lines.append(s.context_summary_ar)
lines.append("")
if state.active_workstreams:
lines.append("## Active Workstreams (مسارات العمل النشطة)")
for ws in state.active_workstreams:
lines.append(f"- {ws}")
if s.active_workstreams:
lines += ["## Workstreams (مسارات العمل)"] + [f"- {w}" for w in s.active_workstreams] + [""]
if s.last_decisions:
lines.append("## Decisions (القرارات)")
for d in s.last_decisions[-5:]:
lines.append(f"- [{d.timestamp:%Y-%m-%d %H:%M}] {d.decision}")
if d.decision_ar: lines.append(f" {d.decision_ar}")
lines.append("")
if state.last_decisions:
lines.append("## Recent Decisions (القرارات الأخيرة)")
for d in state.last_decisions[-5:]:
ts = d.timestamp.strftime("%Y-%m-%d %H:%M")
lines.append(f"- [{ts}] {d.decision}")
if d.decision_ar:
lines.append(f" {d.decision_ar}")
lines.append("")
if state.open_questions:
lines.append("## Open Questions (أسئلة مفتوحة)")
for q in state.open_questions:
lines.append(f"- {q}")
lines.append("")
if state.recent_failures:
lines.append("## Recent Failures (الإخفاقات الأخيرة)")
for f in state.recent_failures[-3:]:
if s.open_questions:
lines += ["## Questions (أسئلة)"] + [f"- {q}" for q in s.open_questions] + [""]
if s.recent_failures:
lines.append("## Failures (إخفاقات)")
for f in s.recent_failures[-3:]:
lines.append(f"- {f.description}")
if f.resolution:
lines.append(f" Resolution: {f.resolution}")
if f.resolution: lines.append(f" Fix: {f.resolution}")
lines.append("")
if state.recent_wins:
lines.append("## Recent Wins (النجاحات الأخيرة)")
for w in state.recent_wins[-3:]:
lines.append(f"- {w.description}")
lines.append("")
pending = [fu for fu in state.pending_followups if not fu.completed]
if s.recent_wins:
lines += ["## Wins (نجاحات)"] + [f"- {w.description}" for w in s.recent_wins[-3:]] + [""]
pending = [fu for fu in s.pending_followups if not fu.completed]
if pending:
lines.append("## Pending Follow-ups (متابعات معلّقة)")
lines.append("## Follow-ups (متابعات)")
for fu in pending:
due = f" (due: {fu.due_date.strftime('%Y-%m-%d')})" if fu.due_date else ""
due = f" (due: {fu.due_date:%Y-%m-%d})" if fu.due_date else ""
lines.append(f"- {fu.task}{due}")
lines.append("")
lines.append("---")
lines.append("Continue from this state. Prioritize pending follow-ups and open questions.")
lines.append("استمر من هذه الحالة. أعطِ الأولوية للمتابعات المعلّقة والأسئلة المفتوحة.")
lines += ["---", "Continue from this state. Prioritize pending follow-ups.",
"استمر من هذه الحالة. أعطِ الأولوية للمتابعات المعلّقة."]
return "\n".join(lines)
async def add_decision(self, decision: str, context: str, decision_ar: str = "", made_by: str = "") -> None:
"""
Record a decision in the current session.
تسجيل قرار في الجلسة الحالية.
"""
if not self._current:
self._current = await self.restore_state()
async def _ensure_current(self) -> SessionState:
if not self._current: self._current = await self.restore_state()
return self._current
self._current.last_decisions.append(Decision(
decision=decision,
context=context,
decision_ar=decision_ar,
made_by=made_by,
))
# Keep last 20 decisions
if len(self._current.last_decisions) > 20:
self._current.last_decisions = self._current.last_decisions[-20:]
await self.save_state(self._current)
logger.info("تم تسجيل قرار: %s", decision[:80])
async def add_decision(self, decision: str, context: str, decision_ar: str = "", made_by: str = "") -> None:
"""تسجيل قرار."""
s = await self._ensure_current()
s.last_decisions.append(Decision(decision=decision, context=context, decision_ar=decision_ar, made_by=made_by))
if len(s.last_decisions) > 20: s.last_decisions = s.last_decisions[-20:]
await self.save_state(s)
async def add_failure(self, description: str, context: str, description_ar: str = "", resolution: str = "") -> None:
"""
Record a failure in the current session.
تسجيل فشل في الجلسة الحالية.
"""
if not self._current:
self._current = await self.restore_state()
self._current.recent_failures.append(Failure(
description=description,
context=context,
description_ar=description_ar,
resolution=resolution,
))
if len(self._current.recent_failures) > 10:
self._current.recent_failures = self._current.recent_failures[-10:]
await self.save_state(self._current)
logger.info("تم تسجيل فشل: %s", description[:80])
"""تسجيل فشل."""
s = await self._ensure_current()
s.recent_failures.append(Failure(description=description, context=context, description_ar=description_ar, resolution=resolution))
if len(s.recent_failures) > 10: s.recent_failures = s.recent_failures[-10:]
await self.save_state(s)
async def add_win(self, description: str, context: str, description_ar: str = "") -> None:
"""
Record a win in the current session.
تسجيل نجاح في الجلسة الحالية.
"""
if not self._current:
self._current = await self.restore_state()
self._current.recent_wins.append(Win(
description=description,
context=context,
description_ar=description_ar,
))
if len(self._current.recent_wins) > 10:
self._current.recent_wins = self._current.recent_wins[-10:]
await self.save_state(self._current)
logger.info("تم تسجيل نجاح: %s", description[:80])
"""تسجيل نجاح."""
s = await self._ensure_current()
s.recent_wins.append(Win(description=description, context=context, description_ar=description_ar))
if len(s.recent_wins) > 10: s.recent_wins = s.recent_wins[-10:]
await self.save_state(s)
async def add_followup(self, task: str, due_date: datetime = None, task_ar: str = "", assigned_to: str = "") -> None:
"""
Add a pending follow-up task.
إضافة مهمة متابعة معلّقة.
"""
if not self._current:
self._current = await self.restore_state()
self._current.pending_followups.append(FollowUp(
task=task,
task_ar=task_ar,
due_date=due_date,
assigned_to=assigned_to,
))
await self.save_state(self._current)
logger.info("تم إضافة متابعة: %s", task[:80])
"""إضافة مهمة متابعة."""
s = await self._ensure_current()
s.pending_followups.append(FollowUp(task=task, task_ar=task_ar, due_date=due_date, assigned_to=assigned_to))
await self.save_state(s)
async def complete_followup(self, task_substring: str) -> bool:
"""
Mark a follow-up as completed by matching task text.
تعليم متابعة كمكتملة عن طريق مطابقة نص المهمة.
"""
if not self._current:
self._current = await self.restore_state()
task_lower = task_substring.lower()
for fu in self._current.pending_followups:
if task_lower in fu.task.lower() and not fu.completed:
fu.completed = True
await self.save_state(self._current)
logger.info("تم إكمال متابعة: %s", fu.task[:80])
return True
"""تعليم متابعة كمكتملة."""
s = await self._ensure_current()
tl = task_substring.lower()
for fu in s.pending_followups:
if tl in fu.task.lower() and not fu.completed:
fu.completed = True; await self.save_state(s); return True
return False
async def set_workstreams(self, workstreams: list[str]) -> None:
"""
Update active workstreams.
تحديث مسارات العمل النشطة.
"""
if not self._current:
self._current = await self.restore_state()
self._current.active_workstreams = workstreams
await self.save_state(self._current)
s = await self._ensure_current(); s.active_workstreams = workstreams; await self.save_state(s)
async def set_context(self, summary: str, summary_ar: str = "") -> None:
"""
Update the context summary.
تحديث ملخص السياق.
"""
if not self._current:
self._current = await self.restore_state()
self._current.context_summary = summary
self._current.context_summary_ar = summary_ar
await self.save_state(self._current)
s = await self._ensure_current()
s.context_summary = summary; s.context_summary_ar = summary_ar; await self.save_state(s)
async def add_question(self, question: str) -> None:
"""
Add an open question.
إضافة سؤال مفتوح.
"""
if not self._current:
self._current = await self.restore_state()
if question not in self._current.open_questions:
self._current.open_questions.append(question)
if len(self._current.open_questions) > 15:
self._current.open_questions = self._current.open_questions[-15:]
await self.save_state(self._current)
s = await self._ensure_current()
if question not in s.open_questions:
s.open_questions.append(question)
if len(s.open_questions) > 15: s.open_questions = s.open_questions[-15:]
await self.save_state(s)
async def cleanup_old_sessions(self, days: int = 30) -> int:
"""
Remove session files older than N days.
حذف ملفات الجلسات الأقدم من N يوم.
"""
cutoff = datetime.now(timezone.utc) - timedelta(days=days)
removed = 0
for f in self.sessions_dir.glob("*.json"):
"""حذف جلسات أقدم من N يوم."""
cutoff, removed = datetime.now(timezone.utc) - timedelta(days=days), 0
for f in self.dir.glob("*.json"):
try:
data = json.loads(f.read_text(encoding="utf-8"))
updated = data.get("updated_at", "")
if isinstance(updated, str) and updated:
ts = datetime.fromisoformat(updated)
if ts < cutoff:
f.unlink()
removed += 1
except Exception as exc:
logger.warning("فشل معالجة ملف الجلسة %s: %s", f.name, exc)
logger.info("تم حذف %d جلسة قديمة (أقدم من %d يوم)", removed, days)
return removed
d = json.loads(f.read_text(encoding="utf-8"))
u = d.get("updated_at", "")
if isinstance(u, str) and u and datetime.fromisoformat(u) < cutoff:
f.unlink(); removed += 1
except Exception: pass
logger.info("حذف %d جلسة قديمة", removed); return removed
async def list_sessions(self, limit: int = 20) -> list[dict[str, Any]]:
"""
List recent sessions with basic info.
عرض الجلسات الأخيرة مع معلومات أساسية.
"""
sessions: list[dict[str, Any]] = []
for f in sorted(self.sessions_dir.glob("*.json"), key=lambda p: p.stat().st_mtime, reverse=True):
if len(sessions) >= limit:
break
"""عرض الجلسات الأخيرة."""
sessions = []
for f in sorted(self.dir.glob("*.json"), key=lambda p: p.stat().st_mtime, reverse=True)[:limit]:
try:
data = json.loads(f.read_text(encoding="utf-8"))
sessions.append({
"session_id": data.get("session_id", f.stem),
"project": data.get("project", ""),
"updated_at": data.get("updated_at", ""),
"workstreams": data.get("active_workstreams", []),
"decisions_count": len(data.get("last_decisions", [])),
"followups_pending": sum(
1 for fu in data.get("pending_followups", [])
if not fu.get("completed", False)
),
})
except Exception:
continue
d = json.loads(f.read_text(encoding="utf-8"))
sessions.append({"session_id": d.get("session_id", f.stem), "project": d.get("project", ""),
"updated_at": d.get("updated_at", ""), "workstreams": d.get("active_workstreams", []),
"decisions": len(d.get("last_decisions", [])),
"pending": sum(1 for fu in d.get("pending_followups", []) if not fu.get("completed"))})
except Exception: continue
return sessions
# ---------------------------------------------------------------------------
# Global singleton
# ---------------------------------------------------------------------------
session_continuity = SessionContinuity()