diff --git a/salesflow-saas/backend/app/services/local_inference.py b/salesflow-saas/backend/app/services/local_inference.py new file mode 100644 index 00000000..90d515f1 --- /dev/null +++ b/salesflow-saas/backend/app/services/local_inference.py @@ -0,0 +1,229 @@ +""" +Local Inference Adapter — Dealix AI Revenue OS +Connects to local/private LLM providers (Ollama, LM Studio, Atomic Chat) +via OpenAI-compatible API. Privacy-first, cost-optimized, Arabic-tuned. +""" +import logging +from datetime import datetime, timezone +from typing import Optional + +from pydantic import BaseModel, Field + +logger = logging.getLogger(__name__) + + +class LocalProvider(BaseModel): + name: str + base_url: str # e.g., "http://localhost:11434/v1" for Ollama + model: str # e.g., "qwen2.5:7b", "llama3.1:8b" + is_healthy: bool = False + last_check: Optional[datetime] = None + avg_latency_ms: float = 0.0 + total_calls: int = 0 + total_failures: int = 0 + + +# Default local providers to check +DEFAULT_PROVIDERS = [ + LocalProvider( + name="ollama", + base_url="http://localhost:11434/v1", + model="qwen2.5:7b", + ), + LocalProvider( + name="lm-studio", + base_url="http://localhost:1234/v1", + model="local-model", + ), + LocalProvider( + name="atomic-chat", + base_url="http://localhost:8080/v1", + model="default", + ), +] + +# Tasks suitable for local inference +LOCAL_SUITABLE_TASKS = { + "arabic_summarization": "تلخيص نصوص عربية", + "text_classification": "تصنيف نصوص", + "entity_extraction": "استخراج كيانات", + "internal_drafting": "صياغة مسودات داخلية", + "sentiment_analysis": "تحليل المشاعر", + "translation": "ترجمة نصوص", + "data_cleaning": "تنظيف بيانات", + "code_review_simple": "مراجعة كود بسيطة", +} + +# Tasks that should NEVER use local inference +CLOUD_ONLY_TASKS = { + "proposal_generation", + "complex_reasoning", + "long_document_analysis", + "customer_facing_messages", +} + + +class LocalInferenceResult(BaseModel): + provider: str + model: str + response: str + latency_ms: int + tokens_used: int = 0 + cost_usd: float = 0.0 # Local = free + success: bool = True + error: Optional[str] = None + + +class LocalInferenceAdapter: + """ + Adapter for local/private LLM inference. + Tries providers in order, falls back gracefully to cloud. + """ + + def __init__(self): + self._providers = list(DEFAULT_PROVIDERS) + self._primary: Optional[LocalProvider] = None + + async def health_check(self, provider: LocalProvider = None) -> bool: + """Check if a local provider is available.""" + targets = [provider] if provider else self._providers + for p in targets: + try: + import httpx + async with httpx.AsyncClient(timeout=5.0) as client: + resp = await client.get(f"{p.base_url}/models") + if resp.status_code == 200: + p.is_healthy = True + p.last_check = datetime.now(timezone.utc) + if not self._primary: + self._primary = p + logger.info(f"Local provider {p.name} is healthy at {p.base_url}") + return True + except Exception: + p.is_healthy = False + p.last_check = datetime.now(timezone.utc) + continue + return False + + async def health_check_all(self) -> dict[str, bool]: + """Check all configured local providers.""" + results = {} + for p in self._providers: + results[p.name] = await self.health_check(p) + return results + + def is_suitable_for_local(self, task_type: str) -> bool: + """Check if a task should use local inference.""" + if task_type in CLOUD_ONLY_TASKS: + return False + return task_type in LOCAL_SUITABLE_TASKS + + async def complete( + self, + prompt: str, + system_prompt: str = "", + task_type: str = "general", + max_tokens: int = 1024, + temperature: float = 0.7, + ) -> LocalInferenceResult: + """Run inference on local provider. Falls back gracefully.""" + if not self._primary or not self._primary.is_healthy: + await self.health_check() + + if not self._primary: + return LocalInferenceResult( + provider="none", + model="none", + response="", + latency_ms=0, + success=False, + error="لا يوجد مزود محلي متاح — استخدم السحابة", + ) + + start = datetime.now(timezone.utc) + provider = self._primary + + try: + import httpx + messages = [] + if system_prompt: + messages.append({"role": "system", "content": system_prompt}) + messages.append({"role": "user", "content": prompt}) + + async with httpx.AsyncClient(timeout=60.0) as client: + resp = await client.post( + f"{provider.base_url}/chat/completions", + json={ + "model": provider.model, + "messages": messages, + "max_tokens": max_tokens, + "temperature": temperature, + }, + ) + resp.raise_for_status() + data = resp.json() + + latency = int((datetime.now(timezone.utc) - start).total_seconds() * 1000) + provider.total_calls += 1 + provider.avg_latency_ms = ( + (provider.avg_latency_ms * (provider.total_calls - 1) + latency) + / provider.total_calls + ) + + content = data.get("choices", [{}])[0].get("message", {}).get("content", "") + tokens = data.get("usage", {}).get("total_tokens", 0) + + return LocalInferenceResult( + provider=provider.name, + model=provider.model, + response=content, + latency_ms=latency, + tokens_used=tokens, + cost_usd=0.0, + ) + + except Exception as e: + provider.total_failures += 1 + provider.is_healthy = False + latency = int((datetime.now(timezone.utc) - start).total_seconds() * 1000) + logger.warning(f"Local inference failed on {provider.name}: {e}") + return LocalInferenceResult( + provider=provider.name, + model=provider.model, + response="", + latency_ms=latency, + success=False, + error=str(e), + ) + + def add_provider(self, name: str, base_url: str, model: str) -> None: + """Register a new local provider.""" + self._providers.append(LocalProvider( + name=name, base_url=base_url, model=model, + )) + + def get_providers(self) -> list[dict]: + """List all configured providers with health status.""" + return [ + { + "name": p.name, + "base_url": p.base_url, + "model": p.model, + "healthy": p.is_healthy, + "last_check": p.last_check.isoformat() if p.last_check else None, + "avg_latency_ms": round(p.avg_latency_ms, 1), + "total_calls": p.total_calls, + "failure_rate": round( + p.total_failures / p.total_calls * 100, 1 + ) if p.total_calls > 0 else 0, + "is_primary": p == self._primary, + } + for p in self._providers + ] + + def get_suitable_tasks(self) -> dict[str, str]: + """List tasks suitable for local inference.""" + return dict(LOCAL_SUITABLE_TASKS) + + +local_inference = LocalInferenceAdapter() diff --git a/salesflow-saas/memory/runbooks/operations-schedule.md b/salesflow-saas/memory/runbooks/operations-schedule.md new file mode 100644 index 00000000..0f3edd06 --- /dev/null +++ b/salesflow-saas/memory/runbooks/operations-schedule.md @@ -0,0 +1,115 @@ +# Operations Schedule — Dealix AI Revenue OS + +**Date**: 2026-04-11 | **Status**: active + +## Daily Operations (يومي) + +| الوقت | المهمة | المسؤول | +|-------|--------|---------| +| 08:00 | فحص صحة جميع الخدمات (Docker, DB, Redis, Celery) | ops | +| 08:15 | مراجعة أخطاء Sentry الجديدة | ops | +| 08:30 | فحص صحة مزودي الاستدلال المحلي | ops | +| 09:00 | مراجعة تقرير المبيعات اليومي التلقائي | founder | +| 12:00 | فحص Celery Beat tasks (sequences, follow-ups) | ops | +| 16:00 | مراجعة tool verification logs — أي contradictions؟ | ops | +| 17:00 | فحص memory sync وwiki health | knowledge | + +### أوامر الفحص اليومي: +```bash +# Health check +curl -f https://api.dealix.sa/api/v1/health + +# Celery workers +docker compose exec celery-worker celery -A app.workers inspect active + +# Sentry errors (last 24h) +# Check https://sentry.io/organizations/dealix/ + +# Tool verification contradictions +curl https://api.dealix.sa/api/v1/hermes/health +``` + +## Weekly Operations (أسبوعي — كل أحد) + +| المهمة | المسؤول | +|--------|---------| +| تشغيل فحص Shannon الأمني على staging | security | +| مراجعة مزودي LLM: تكلفة + أداء + استقرار | ops | +| مقارنة local vs cloud: أي المهام أنسب محلياً؟ | ops | +| مراجعة الـ runs الفاشلة ومعرفة السبب الجذري | ops | +| مراجعة الإجراءات المتناقضة (contradicted actions) | security | +| تنظيف الذاكرة: حذف duplicates + archive stale | knowledge | +| مراجعة التكلفة الأسبوعية (هدف: < $50) | founder | +| مراجعة drift الأوامر والمهارات | ops | + +### أوامر الفحص الأسبوعي: +```bash +# Shannon security scan +curl -X POST https://api.dealix.sa/api/v1/hermes/security/scan \ + -H "Content-Type: application/json" \ + -d '{"environment": "staging", "base_url": "https://staging.dealix.sa"}' + +# Cost report +curl https://api.dealix.sa/api/v1/hermes/cost?period=weekly + +# Self-improvement cycle +curl -X POST https://api.dealix.sa/api/v1/hermes/improvements/cycle + +# Executive summary +curl https://api.dealix.sa/api/v1/hermes/executive-summary?period=weekly +``` + +## Monthly Operations (شهري — أول أحد من كل شهر) + +| المهمة | المسؤول | +|--------|---------| +| مراجعة انحراف المعمارية (architecture drift) | ops | +| مراجعة عملية الإطلاق والتحسين | ops | +| تدريب rollback drill (استعادة من النسخة الاحتياطية) | ops | +| تدريب backup/restore drill | ops | +| إعادة benchmark لمزودي LLM | ops | +| مراجعة انحراف نظام التصميم | delivery | +| مراجعة وإعادة هيكلة سير العمل | founder | +| تحديث ICP وstrategy بناءً على بيانات الشهر | founder | +| مراجعة PDPL compliance checklist | security | +| تقرير أداء شهري للمستثمرين/المؤسسين | founder | + +### أوامر الفحص الشهري: +```bash +# Full health report +curl https://api.dealix.sa/api/v1/hermes/health + +# Knowledge brain lint +# Run via Hermes: identify stale/orphan/duplicate wiki pages + +# Database backup test +pg_dump dealix > /tmp/test_restore.sql +psql -d dealix_test < /tmp/test_restore.sql +rm /tmp/test_restore.sql + +# Provider benchmark rerun +# Compare Groq vs OpenAI vs local on 50 test queries +``` + +## Emergency Procedures + +### Production Down +1. Check Docker: `docker compose ps` +2. Check logs: `docker compose logs -f backend --since 5m` +3. Restart if needed: `docker compose restart backend` +4. If persistent: rollback to last known good commit +5. Notify team in communication channel + +### Data Breach Suspicion +1. Immediately notify security profile +2. Check audit logs for unauthorized access +3. Check PDPL consent logs for anomalies +4. Run Shannon emergency scan on affected area +5. Prepare SDAIA notification if confirmed (within 72 hours) + +### Cost Spike +1. Check observability: `GET /hermes/cost?period=hourly` +2. Identify expensive workflow +3. Pause autopilot if needed +4. Switch to local inference for non-critical tasks +5. Review and optimize the expensive workflow