mirror of
https://github.com/x1xhlol/system-prompts-and-models-of-ai-tools.git
synced 2026-06-18 07:19:35 +00:00
190 lines
10 KiB
Python
190 lines
10 KiB
Python
"""
|
|
Tool Receipts — Dealix ToolProof Enhancement
|
|
Signed execution receipts, pre-execution policy, and trust analytics.
|
|
Extends tool_verification.py with cryptographic receipts and policy enforcement.
|
|
"""
|
|
import hashlib, logging, uuid
|
|
from collections import defaultdict
|
|
from datetime import datetime, timedelta, timezone
|
|
from enum import Enum
|
|
from typing import Any, Optional
|
|
from pydantic import BaseModel, Field
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class PolicyDecisionType(str, Enum):
|
|
ALLOW = "allow"; BLOCK = "block"; HOLD = "hold"
|
|
|
|
class VerificationVerdict(str, Enum):
|
|
VERIFIED = "verified"; PARTIALLY_VERIFIED = "partially_verified"
|
|
UNVERIFIED = "unverified"; CONTRADICTED = "contradicted"; BLOCKED = "blocked"
|
|
|
|
|
|
class PolicyDecision(BaseModel):
|
|
"""قرار السياسة قبل التنفيذ"""
|
|
decision: PolicyDecisionType; reason: str; reason_ar: str; tool_name: str
|
|
requires_approval_from: Optional[str] = None
|
|
pdpl_consent_required: bool = False
|
|
budget_remaining: Optional[float] = None
|
|
|
|
|
|
class ToolReceipt(BaseModel):
|
|
"""إيصال تنفيذ موقّع"""
|
|
receipt_id: str = Field(default_factory=lambda: str(uuid.uuid4()))
|
|
run_id: str = ""; session_id: str = ""; agent_id: str = ""
|
|
tool_name: str; parameters: dict[str, Any] = {}
|
|
timestamp: datetime = Field(default_factory=lambda: datetime.now(timezone.utc))
|
|
execution_result: str = ""; normalized_result: str = ""
|
|
hash_signature: str = ""
|
|
policy_decision: PolicyDecisionType = PolicyDecisionType.ALLOW
|
|
side_effects: list[str] = []
|
|
verification_verdict: VerificationVerdict = VerificationVerdict.UNVERIFIED
|
|
cost_estimate: float = 0.0; tenant_id: str = ""
|
|
|
|
def compute_hash(self) -> str:
|
|
payload = f"{self.tool_name}|{'|'.join(f'{k}={v}' for k,v in sorted(self.parameters.items()))}|{self.execution_result}|{self.timestamp.isoformat()}"
|
|
self.hash_signature = hashlib.sha256(payload.encode()).hexdigest()
|
|
return self.hash_signature
|
|
|
|
def normalize_result(self) -> str:
|
|
raw = self.execution_result.lower().strip()
|
|
for w in ["ok","success","done","completed","تم","نجح"]: raw = raw.replace(w, "SUCCESS")
|
|
for w in ["error","fail","exception","خطأ","فشل"]: raw = raw.replace(w, "ERROR")
|
|
self.normalized_result = raw; return raw
|
|
|
|
|
|
class PreExecutionPolicy:
|
|
"""تحديد السماح/الحظر/التعليق قبل تنفيذ الأداة."""
|
|
SAFE_TOOLS = {"read_file","search","query_db_readonly","get_status","list_leads",
|
|
"get_deal","get_pipeline","view_analytics","check_consent","get_sequence_status"}
|
|
HOLD_TOOLS = {"send_message","send_whatsapp","send_email","send_sms","update_deal",
|
|
"assign_lead","create_proposal","change_stage","update_score","create_sequence",
|
|
"schedule_meeting","update_territory"}
|
|
BLOCK_TOOLS = {"delete_tenant","drop_table","bulk_delete","export_all_data",
|
|
"reset_database","delete_all_leads","purge_audit_log","disable_pdpl",
|
|
"bypass_consent","modify_permissions_bulk"}
|
|
MSG_TOOLS = {"send_message","send_whatsapp","send_email","send_sms"}
|
|
ROLE_PERMS: dict[str, set[str]] = {
|
|
"owner": SAFE_TOOLS | HOLD_TOOLS, "admin": SAFE_TOOLS | HOLD_TOOLS,
|
|
"manager": SAFE_TOOLS | {"update_deal","assign_lead","create_proposal","change_stage"},
|
|
"sales_rep": SAFE_TOOLS | {"send_message","send_whatsapp","send_email","update_deal"},
|
|
"viewer": SAFE_TOOLS}
|
|
BUDGET_LIMIT = 100.0 # SAR per session
|
|
COST_MAP = {"send_whatsapp": 0.15, "send_sms": 0.08, "send_email": 0.02,
|
|
"send_message": 0.10, "create_proposal": 0.50, "query_db_readonly": 0.001}
|
|
|
|
def __init__(self):
|
|
self._costs: dict[str, float] = defaultdict(float)
|
|
|
|
def evaluate(self, tool_name: str, params: dict[str, Any], user_context: dict[str, Any]) -> PolicyDecision:
|
|
role = user_context.get("role", "viewer")
|
|
sid = user_context.get("session_id", "unknown")
|
|
limit = user_context.get("budget_limit", self.BUDGET_LIMIT)
|
|
if tool_name in self.BLOCK_TOOLS:
|
|
logger.warning("محظور: %s (المستخدم: %s)", tool_name, user_context.get("user_id", "?"))
|
|
return PolicyDecision(decision=PolicyDecisionType.BLOCK, tool_name=tool_name,
|
|
reason=f"Tool '{tool_name}' is forbidden.", reason_ar=f"الأداة '{tool_name}' محظورة.")
|
|
allowed = self.ROLE_PERMS.get(role, self.SAFE_TOOLS)
|
|
if tool_name not in allowed and tool_name not in self.SAFE_TOOLS:
|
|
return PolicyDecision(decision=PolicyDecisionType.BLOCK, tool_name=tool_name,
|
|
reason=f"Role '{role}' lacks permission for '{tool_name}'.",
|
|
reason_ar=f"الدور '{role}' لا يملك صلاحية '{tool_name}'.")
|
|
if tool_name in self.MSG_TOOLS and not user_context.get("has_consent", False):
|
|
return PolicyDecision(decision=PolicyDecisionType.BLOCK, tool_name=tool_name,
|
|
reason="PDPL consent required.", reason_ar="مطلوب موافقة حماية البيانات.",
|
|
pdpl_consent_required=True)
|
|
est = self.COST_MAP.get(tool_name, 0.01) * params.get("count", 1)
|
|
if self._costs[sid] + est > limit:
|
|
return PolicyDecision(decision=PolicyDecisionType.HOLD, tool_name=tool_name,
|
|
reason=f"Budget exceeded ({self._costs[sid]:.2f}+{est:.2f} > {limit:.2f} SAR).",
|
|
reason_ar=f"تجاوز الميزانية.", budget_remaining=limit - self._costs[sid])
|
|
if tool_name in self.HOLD_TOOLS:
|
|
approver = "manager" if role == "sales_rep" else "admin"
|
|
return PolicyDecision(decision=PolicyDecisionType.HOLD, tool_name=tool_name,
|
|
reason=f"'{tool_name}' requires approval.", reason_ar=f"'{tool_name}' تتطلب موافقة.",
|
|
requires_approval_from=approver)
|
|
return PolicyDecision(decision=PolicyDecisionType.ALLOW, tool_name=tool_name,
|
|
reason=f"'{tool_name}' is safe.", reason_ar=f"'{tool_name}' آمنة.")
|
|
|
|
def record_cost(self, session_id: str, cost: float) -> None:
|
|
self._costs[session_id] += cost
|
|
|
|
|
|
class ReceiptStore:
|
|
"""مخزن الإيصالات في الذاكرة"""
|
|
def __init__(self, max_size: int = 50000):
|
|
self._receipts: list[ToolReceipt] = []; self._max = max_size
|
|
|
|
def store(self, receipt: ToolReceipt) -> str:
|
|
receipt.compute_hash(); receipt.normalize_result()
|
|
self._receipts.append(receipt)
|
|
if len(self._receipts) > self._max: self._receipts = self._receipts[-self._max:]
|
|
logger.info("إيصال: %s أداة=%s حكم=%s", receipt.receipt_id, receipt.tool_name, receipt.verification_verdict.value)
|
|
return receipt.receipt_id
|
|
|
|
def get(self, receipt_id: str) -> Optional[ToolReceipt]:
|
|
return next((r for r in self._receipts if r.receipt_id == receipt_id), None)
|
|
|
|
def query(self, agent_id: str = None, tool_name: str = None,
|
|
verdict: VerificationVerdict = None, since: datetime = None, limit: int = 100) -> list[ToolReceipt]:
|
|
r = self._receipts
|
|
if agent_id: r = [x for x in r if x.agent_id == agent_id]
|
|
if tool_name: r = [x for x in r if x.tool_name == tool_name]
|
|
if verdict: r = [x for x in r if x.verification_verdict == verdict]
|
|
if since: r = [x for x in r if x.timestamp >= since]
|
|
return r[-limit:]
|
|
|
|
|
|
class TrustAnalytics:
|
|
"""تتبع مقاييس الثقة عبر سير عمل الوكلاء"""
|
|
WEIGHTS = {VerificationVerdict.VERIFIED: 1.0, VerificationVerdict.PARTIALLY_VERIFIED: 0.6,
|
|
VerificationVerdict.UNVERIFIED: 0.3, VerificationVerdict.CONTRADICTED: 0.0,
|
|
VerificationVerdict.BLOCKED: 0.2}
|
|
|
|
def __init__(self, store: ReceiptStore):
|
|
self._store = store
|
|
|
|
def get_trust_score(self, agent_id: str) -> float:
|
|
recs = self._store.query(agent_id=agent_id, limit=500)
|
|
if not recs: return 0.5
|
|
return round(sum(self.WEIGHTS.get(r.verification_verdict, 0.3) for r in recs) / len(recs), 4)
|
|
|
|
def get_contradiction_rate(self, agent_id: str) -> float:
|
|
recs = self._store.query(agent_id=agent_id, limit=500)
|
|
if not recs: return 0.0
|
|
return round(sum(1 for r in recs if r.verification_verdict == VerificationVerdict.CONTRADICTED) / len(recs), 4)
|
|
|
|
def get_cost_by_agent(self, period_days: int = 30) -> dict[str, float]:
|
|
since = datetime.now(timezone.utc) - timedelta(days=period_days)
|
|
costs: dict[str, float] = defaultdict(float)
|
|
for r in self._store.query(since=since, limit=50000): costs[r.agent_id] += r.cost_estimate
|
|
return {k: round(v, 4) for k, v in costs.items()}
|
|
|
|
def get_blocked_attempts(self, period_days: int = 30) -> list[ToolReceipt]:
|
|
return self._store.query(verdict=VerificationVerdict.BLOCKED,
|
|
since=datetime.now(timezone.utc) - timedelta(days=period_days), limit=1000)
|
|
|
|
def get_hallucination_suspects(self) -> list[ToolReceipt]:
|
|
return [r for r in self._store.query(limit=5000)
|
|
if r.verification_verdict == VerificationVerdict.CONTRADICTED
|
|
or (r.verification_verdict == VerificationVerdict.UNVERIFIED
|
|
and not r.execution_result and r.tool_name not in PreExecutionPolicy.SAFE_TOOLS)]
|
|
|
|
def get_summary(self, agent_id: str = None) -> dict[str, Any]:
|
|
recs = self._store.query(agent_id=agent_id, limit=10000)
|
|
if not recs: return {"total": 0, "trust_score": 0.5, "message_ar": "لا توجد بيانات"}
|
|
by_v: dict[str, int] = defaultdict(int)
|
|
cost = 0.0
|
|
for r in recs: by_v[r.verification_verdict.value] += 1; cost += r.cost_estimate
|
|
ts = self.get_trust_score(agent_id) if agent_id else 0.5
|
|
return {"total": len(recs), "by_verdict": dict(by_v), "trust_score": ts,
|
|
"total_cost_sar": round(cost, 2),
|
|
"contradiction_rate": round(by_v.get("contradicted", 0) / len(recs) * 100, 2),
|
|
"message_ar": f"عمليات: {len(recs)}، ثقة: {ts:.2f}"}
|
|
|
|
|
|
pre_execution_policy = PreExecutionPolicy()
|
|
receipt_store = ReceiptStore()
|
|
trust_analytics = TrustAnalytics(receipt_store)
|