# Mirror of https://github.com/x1xhlol/system-prompts-and-models-of-ai-tools.git
# Synced 2026-06-18 23:39:34 +00:00 - 77 lines, 2.6 KiB, Python
import os
|
|
from typing import Dict, List, Any, Optional
|
|
import json
|
|
|
|
try:
    from mem0 import Memory
except ImportError:
    # Fallback mock for testing environments where mem0ai isn't available yet

    class Memory:
        """Minimal in-process stand-in used when ``mem0`` cannot be imported.

        It keeps added entries in ``self.store`` and answers every search
        with one fixed record; nothing is persisted or actually searched.
        """

        def __init__(self, config=None):
            # ``config`` is accepted only so the call site matches the real
            # class; the mock never reads it.
            self.store = []

        def search(self, query: str, user_id: str, **kwargs):
            """Return a single canned record regardless of the arguments."""
            canned_record = {"text": "Mocked memory context."}
            return [canned_record]

        def add(self, text: str, user_id: str, metadata: dict = None, **kwargs):
            """Append one entry (text, user id, metadata) to ``self.store``."""
            entry = {"text": text, "user_id": user_id, "metadata": metadata}
            self.store.append(entry)
|
|
|
|
class SelfHealingMemory:
    """Layer 3: Centralized Self-Healing Memory using Mem0 AI.

    Provides episodic long-term memory for CrewAI Agents across the Dealix OS.
    Every record is written and searched under one Mem0 ``user_id``, the
    ``namespace`` given at construction time.
    """

    def __init__(self, namespace: str = "dealix_org"):
        """Build the memory backend.

        Args:
            namespace: Value passed as ``user_id`` on every search/add call.
        """
        self.namespace = namespace
        # Use simple configuration. Can be enhanced with Qdrant later.
        self.config = {
            "llm": {
                "provider": "anthropic",
                "config": {
                    "model": "claude-3-haiku-20240307",
                    "temperature": 0.1,
                },
            },
        }
        # Mem0 documents ``Memory.from_config(<dict>)`` as the way to build an
        # instance from a plain dict; its constructor expects a config object.
        # A backend without ``from_config`` (such as a simple test stand-in)
        # still gets the dict through the constructor, as before.
        from_config = getattr(Memory, "from_config", None)
        if callable(from_config):
            self.memory = from_config(self.config)
        else:
            self.memory = Memory(config=self.config)

    @staticmethod
    def _record_text(rec) -> str:
        """Extract the display text of one search hit, whatever its shape."""
        if isinstance(rec, dict):
            # Mem0 records structure depends on version, usually contains
            # 'memory' or 'text'; fall back to the whole record otherwise.
            return str(rec.get("text") or rec.get("memory") or rec)
        return str(rec)

    def get_context(self, company_name: str, context_type: str = "general") -> str:
        """Retrieve context about a company to inject into agent prompts.

        Args:
            company_name: Company the query is about.
            context_type: Kind of information requested; interpolated into
                the search query.

        Returns:
            A ``"Memory Context:"`` header followed by one ``"- ..."`` line per
            hit (newline-terminated), or a fixed "no prior context" sentence
            when the search yields nothing.
        """
        query = f"Provide all known {context_type} information regarding the company: {company_name}"
        records = self.memory.search(query=query, user_id=self.namespace)

        # Newer Mem0 versions wrap hits as {"results": [...]}; older ones
        # return the list directly. Iterating the wrapper dict would yield
        # its keys, so unwrap it first.
        if isinstance(records, dict):
            records = records.get("results") or []

        if not records:
            return "No prior context discovered in episodic memory."

        lines = ["Memory Context:"]
        lines.extend(f"- {self._record_text(rec)}" for rec in records)
        return "\n".join(lines) + "\n"

    def add_insight(self, company_name: str, insight: str, tag: str = "discovery"):
        """Store an episodic agent finding.

        Args:
            company_name: Company the insight relates to (saved in metadata).
            insight: Text of the finding.
            tag: Free-form label saved in metadata.
        """
        metadata = {
            "company": company_name,
            "tag": tag,
            "source": "autonomous_os",
        }
        # The content is passed positionally: the first parameter of
        # ``add`` is not named ``text`` in Mem0, so a ``text=`` keyword
        # raises TypeError there, while a positional argument works for
        # any backend whose first parameter is the content.
        self.memory.add(insight, user_id=self.namespace, metadata=metadata)

    def consolidate(self, company_name: str):
        """Placeholder: currently does nothing.

        To be implemented by the Knowledge Distiller Node:
        Cleans up redundant memories.
        """
        pass
|
|
|
|
# Singleton instance for agents: one shared, default-namespace memory object.
# It is created when this module is imported, so importing the module
# constructs the memory backend as a side effect.
empire_memory: SelfHealingMemory = SelfHealingMemory()
|