diff --git a/salesflow-saas/backend/app/api/routes/intelligence.py b/salesflow-saas/backend/app/api/routes/intelligence.py new file mode 100644 index 00000000..0a81f023 --- /dev/null +++ b/salesflow-saas/backend/app/api/routes/intelligence.py @@ -0,0 +1,581 @@ +""" +Revenue Intelligence OS — Lead Machine API +Endpoints for ICP, Discovery, Enrichment, Scoring, Outreach, Triggers +""" +import uuid +import json +import time +from flask import Blueprint, request, jsonify + +from app.core.database import db +from app.api.routes.auth import require_auth +from app.core.audit import log as audit_log +from app.intelligence.icp import ICPConfig, DEALIX_DEFAULT_ICP +from app.intelligence.pipeline import run_pipeline +from app.intelligence.triggers import scan_watchlist, scan_company_for_triggers +from app.intelligence.outreach import generate_outreach_brief +from app.intelligence.scoring import score_lead +from app.intelligence.enrichment import enrich_candidate, EnrichedLead + +intelligence_bp = Blueprint("intelligence", __name__, url_prefix="/api/intelligence") + + +def _json(data, status=200): + return jsonify(data), status + + +# ─── ICP MANAGEMENT ───────────────────────────────────────────────────────── + +@intelligence_bp.get("/icp") +@require_auth +def get_icp(user): + """Get active ICP config for org""" + with db() as conn: + row = conn.execute( + "SELECT * FROM icp_configs WHERE org_id=? AND is_active=1 ORDER BY created_at DESC LIMIT 1", + (user["org_id"],) + ).fetchone() + if row: + config = json.loads(row["config"]) + return _json({"icp": config, "id": row["id"], "name": row["name"]}) + # Return default ICP + return _json({"icp": DEALIX_DEFAULT_ICP.to_dict(), "id": "default", "name": "Dealix Default ICP"}) + + +@intelligence_bp.post("/icp") +@require_auth +def create_icp(user): + if user["role"] not in ("manager", "admin"): + return _json({"error": "Forbidden"}, 403) + """Create or update ICP config""" + data = request.get_json() or {} + icp_id = str(uuid.uuid4()) + + # Deactivate existing + with db() as conn: + conn.execute("UPDATE icp_configs SET is_active=0 WHERE org_id=?", (user["org_id"],)) + conn.execute(""" + INSERT INTO icp_configs (id, org_id, name, config, is_active, created_by) + VALUES (?, ?, ?, ?, 1, ?) + """, (icp_id, user["org_id"], data.get("name", "Custom ICP"), json.dumps(data), user["id"])) + + audit_log(user["org_id"], "intelligence", "icp_created", user["id"], icp_id, data) + return _json({"id": icp_id, "message": "ICP saved"}, 201) + + +# ─── PIPELINE ──────────────────────────────────────────────────────────────── + +@intelligence_bp.post("/pipeline/run") +@require_auth +def run_lead_pipeline(user): + if user["role"] not in ("manager", "admin"): + return _json({"error": "Forbidden"}, 403) + """ + Trigger full lead intelligence pipeline. + Body (all optional): + custom_queries: list[str] + motion: sales | partnership | channel | tender + max_leads: int (default 30) + enrich: bool (default true) + generate_outreach: bool (default true) + """ + data = request.get_json() or {} + motion = data.get("motion", "sales") + max_leads = min(int(data.get("max_leads", 30)), 100) + enrich = data.get("enrich", True) + gen_outreach = data.get("generate_outreach", True) + custom_queries = data.get("custom_queries", None) + + run_id = f"run-{uuid.uuid4().hex[:12]}" + + # Load ICP from DB if available + with db() as conn: + icp_row = conn.execute( + "SELECT config FROM icp_configs WHERE org_id=? AND is_active=1 LIMIT 1", + (user["org_id"],) + ).fetchone() + + icp = None + if icp_row: + try: + cfg = json.loads(icp_row["config"]) + icp = ICPConfig(**{k: v for k, v in cfg.items() if k in ICPConfig.__dataclass_fields__}) + except Exception: + icp = DEALIX_DEFAULT_ICP + else: + icp = DEALIX_DEFAULT_ICP + + # Record run start + with db() as conn: + conn.execute(""" + INSERT INTO intelligence_runs (id, org_id, run_mode, motion, status, created_by) + VALUES (?, ?, 'manual', ?, 'running', ?) + """, (run_id, user["org_id"], motion, user["id"])) + + try: + result = run_pipeline( + icp=icp, + custom_queries=custom_queries, + motion=motion, + max_leads=max_leads, + enrich=enrich, + generate_outreach=gen_outreach, + ) + result["run_id"] = run_id + + # Persist scored leads to DB + with db() as conn: + for item in result.get("scored_leads", []): + lead = item["lead"] + score = item["score"] + lid = lead.get("id", str(uuid.uuid4())) + conn.execute(""" + INSERT OR REPLACE INTO intelligence_leads ( + id, org_id, company_name, domain, industry, region, company_size, + description, website, tech_stack, signals, recent_news, + contact_name, contact_title, contact_email, contact_phone, contact_linkedin, + decision_maker_score, enrichment_source, enrichment_confidence, + source, source_url, raw_snippet, trigger, + score_fit, score_intent, score_access, score_value, score_urgency, + score_master, priority_tier, score_reasons, next_action, next_action_ar, + pipeline_run_id, enriched_at + ) VALUES ( + ?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,? + ) + """, ( + lid, user["org_id"], + lead.get("company_name", ""), lead.get("domain", ""), + lead.get("industry", ""), lead.get("region", ""), + lead.get("company_size", "unknown"), + lead.get("description", ""), lead.get("website", ""), + json.dumps(lead.get("tech_stack", [])), + json.dumps(lead.get("signals", [])), + json.dumps(lead.get("recent_news", [])), + lead.get("contact_name", ""), lead.get("contact_title", ""), + lead.get("contact_email", ""), lead.get("contact_phone", ""), + lead.get("contact_linkedin", ""), + lead.get("decision_maker_score", 0), + lead.get("enrichment_source", "web"), + lead.get("enrichment_confidence", 0.5), + lead.get("source", ""), lead.get("source_url", ""), + lead.get("raw_snippet", ""), lead.get("trigger", ""), + score.get("fit", 0), score.get("intent", 0), + score.get("access", 0), score.get("value", 0), + score.get("urgency", 0), score.get("master", 0), + score.get("tier", "P4"), + json.dumps(score.get("reasons", [])), + score.get("next_action", ""), score.get("next_action_ar", ""), + run_id, lead.get("enriched_at", ""), + )) + + # Update run record + ts = result.get("tier_summary", {}) + conn.execute(""" + UPDATE intelligence_runs SET + total_discovered=?, total_deduped=?, total_enriched=?, + tier_p1=?, tier_p2=?, tier_p3=?, tier_p4=?, + duration_sec=?, status='complete' + WHERE id=? + """, ( + result.get("total_discovered", 0), + result.get("total_after_dedup", 0), + result.get("total_enriched", 0), + ts.get("P1_outreach_now", 0), ts.get("P2_enrich_more", 0), + ts.get("P3_nurture", 0), ts.get("P4_archive", 0), + result.get("pipeline_duration_sec", 0), + run_id, + )) + + audit_log(user["org_id"], "intelligence", "pipeline_run", user["id"], run_id, + {"motion": motion, "total": result.get("total_enriched", 0)}) + + # Return summary (not full scored list — too large) + return _json({ + "run_id": run_id, + "total_discovered": result["total_discovered"], + "total_after_dedup": result["total_after_dedup"], + "total_enriched": result["total_enriched"], + "tier_summary": result["tier_summary"], + "pipeline_duration_sec": result["pipeline_duration_sec"], + "p1_leads": result["p1_leads"][:10], + "outreach_briefs": result["outreach_briefs"][:5], + }) + + except Exception as e: + with db() as conn: + conn.execute( + "UPDATE intelligence_runs SET status='error', error_message=? WHERE id=?", + (str(e)[:500], run_id) + ) + return _json({"error": str(e), "run_id": run_id}, 500) + + +# ─── LEAD MANAGEMENT ───────────────────────────────────────────────────────── + +@intelligence_bp.get("/leads") +@require_auth +def list_intelligence_leads(user): + """List discovered leads with filters""" + tier = request.args.get("tier") # P1|P2|P3|P4 + status = request.args.get("status") # discovered|contacted|qualified|archived + sort = request.args.get("sort", "score") # score|date + limit = min(int(request.args.get("limit", 50)), 200) + offset = int(request.args.get("offset", 0)) + + conditions = ["org_id=?"] + params = [user["org_id"]] + if tier: + conditions.append("priority_tier=?") + params.append(tier) + if status: + conditions.append("status=?") + params.append(status) + + order = "score_master DESC" if sort == "score" else "created_at DESC" + where = " AND ".join(conditions) + + with db() as conn: + rows = conn.execute( + f"SELECT * FROM intelligence_leads WHERE {where} ORDER BY {order} LIMIT ? OFFSET ?", + params + [limit, offset] + ).fetchall() + total = conn.execute( + f"SELECT COUNT(*) FROM intelligence_leads WHERE {where}", params + ).fetchone()[0] + + leads = [] + for row in rows: + lead = dict(row) + for field in ["tech_stack", "signals", "recent_news", "score_reasons"]: + try: + lead[field] = json.loads(lead[field] or "[]") + except Exception: + lead[field] = [] + leads.append(lead) + + return _json({"leads": leads, "total": total, "limit": limit, "offset": offset}) + + +@intelligence_bp.get("/leads/") +@require_auth +def get_intelligence_lead(user, lead_id): + """Get a single intelligence lead""" + with db() as conn: + row = conn.execute( + "SELECT * FROM intelligence_leads WHERE id=? AND org_id=?", + (lead_id, user["org_id"]) + ).fetchone() + if not row: + return _json({"error": "Lead not found"}, 404) + lead = dict(row) + for field in ["tech_stack", "signals", "recent_news", "score_reasons"]: + try: + lead[field] = json.loads(lead[field] or "[]") + except Exception: + lead[field] = [] + return _json(lead) + + +@intelligence_bp.patch("/leads//status") +@require_auth +def update_lead_status(user, lead_id): + """Update lead status — contacted | qualified | archived""" + data = request.get_json() or {} + new_status = data.get("status") + if new_status not in ("discovered", "contacted", "qualified", "archived"): + return _json({"error": "Invalid status"}, 400) + + with db() as conn: + conn.execute(""" + UPDATE intelligence_leads SET status=?, reviewed_by=?, reviewed_at=datetime('now') + WHERE id=? AND org_id=? + """, (new_status, user["id"], lead_id, user["org_id"])) + + audit_log(user["org_id"], "intelligence", f"lead_status_{new_status}", user["id"], lead_id) + return _json({"id": lead_id, "status": new_status}) + + +@intelligence_bp.post("/leads//push-to-crm") +@require_auth +def push_lead_to_crm(user, lead_id): + """Push an intelligence lead to the CRM leads table""" + with db() as conn: + il = conn.execute( + "SELECT * FROM intelligence_leads WHERE id=? AND org_id=?", + (lead_id, user["org_id"]) + ).fetchone() + if not il: + return _json({"error": "Lead not found"}, 404) + + crm_id = str(uuid.uuid4()) + conn.execute(""" + INSERT INTO leads (id, org_id, company_name, contact_name, contact_email, + contact_phone, source, industry, company_size, region, status, score, + stage, enriched_data) + VALUES (?, ?, ?, ?, ?, ?, 'intelligence', ?, ?, ?, 'new', ?, 'intake', ?) + """, ( + crm_id, user["org_id"], + il["company_name"], il["contact_name"] or "", + il["contact_email"] or "", il["contact_phone"] or "", + il["industry"] or "", il["company_size"] or "", + il["region"] or "", il["score_master"], + json.dumps({ + "signals": json.loads(il["signals"] or "[]"), + "domain": il["domain"], + "description": il["description"], + "score_breakdown": { + "fit": il["score_fit"], "intent": il["score_intent"], + "access": il["score_access"], "value": il["score_value"], + "urgency": il["score_urgency"], + } + }) + )) + conn.execute( + "UPDATE intelligence_leads SET crm_lead_id=?, status='qualified' WHERE id=?", + (crm_id, lead_id) + ) + + audit_log(user["org_id"], "intelligence", "lead_pushed_to_crm", user["id"], lead_id, + {"crm_lead_id": crm_id}) + return _json({"lead_id": lead_id, "crm_lead_id": crm_id, "message": "Pushed to CRM"}, 201) + + +# ─── OUTREACH ──────────────────────────────────────────────────────────────── + +@intelligence_bp.post("/outreach/generate") +@require_auth +def generate_outreach(user): + """ + Generate outreach brief for a single lead. + Body: { lead_id, motion? } + """ + data = request.get_json() or {} + lead_id = data.get("lead_id") + motion = data.get("motion", "sales") + + with db() as conn: + row = conn.execute( + "SELECT * FROM intelligence_leads WHERE id=? AND org_id=?", + (lead_id, user["org_id"]) + ).fetchone() + if not row: + return _json({"error": "Lead not found"}, 404) + + lead = dict(row) + for field in ["tech_stack", "signals", "recent_news"]: + try: + lead[field] = json.loads(lead[field] or "[]") + except Exception: + lead[field] = [] + + score_dict = { + "fit": lead.get("score_fit", 0), "intent": lead.get("score_intent", 0), + "access": lead.get("score_access", 0), "value": lead.get("score_value", 0), + "urgency": lead.get("score_urgency", 0), "master": lead.get("score_master", 0), + "tier": lead.get("priority_tier", "P3"), + } + + brief = generate_outreach_brief(lead, score_dict, motion) + + # Save outreach back to lead + with db() as conn: + conn.execute(""" + UPDATE intelligence_leads SET + outreach_whatsapp_ar=?, outreach_email_subject_ar=?, + outreach_email_body_ar=?, outreach_linkedin_ar=?, outreach_angle=? + WHERE id=? + """, ( + brief.whatsapp_ar, brief.email_subject_ar, brief.email_body_ar, + brief.linkedin_ar, brief.angle, lead_id + )) + + audit_log(user["org_id"], "intelligence", "outreach_generated", user["id"], lead_id) + return _json({ + "lead_id": lead_id, + "company": brief.company_name, + "angle": brief.angle, + "whatsapp_ar": brief.whatsapp_ar, + "email_subject_ar": brief.email_subject_ar, + "email_body_ar": brief.email_body_ar, + "email_subject_en": brief.email_subject_en, + "email_body_en": brief.email_body_en, + "linkedin_ar": brief.linkedin_ar, + "personalization_score": brief.personalization_score, + }) + + +# ─── WATCHLIST & TRIGGERS ──────────────────────────────────────────────────── + +@intelligence_bp.get("/watchlist") +@require_auth +def get_watchlist(user): + with db() as conn: + rows = conn.execute( + "SELECT * FROM intelligence_watchlist WHERE org_id=? AND active=1 ORDER BY priority DESC", + (user["org_id"],) + ).fetchall() + return _json({"watchlist": [dict(r) for r in rows]}) + + +@intelligence_bp.post("/watchlist") +@require_auth +def add_to_watchlist(user): + data = request.get_json() or {} + wid = str(uuid.uuid4()) + with db() as conn: + conn.execute(""" + INSERT INTO intelligence_watchlist (id, org_id, company_name, domain, priority, added_by) + VALUES (?, ?, ?, ?, ?, ?) + """, (wid, user["org_id"], data.get("company_name", ""), + data.get("domain", ""), data.get("priority", 0), user["id"])) + return _json({"id": wid, "message": "Added to watchlist"}, 201) + + +@intelligence_bp.delete("/watchlist/") +@require_auth +def remove_from_watchlist(user, wid): + with db() as conn: + conn.execute( + "UPDATE intelligence_watchlist SET active=0 WHERE id=? AND org_id=?", + (wid, user["org_id"]) + ) + return _json({"id": wid, "message": "Removed from watchlist"}) + + +@intelligence_bp.post("/triggers/scan") +@require_auth +def scan_triggers(user): + if user["role"] not in ("manager", "admin"): + return _json({"error": "Forbidden"}, 403) + """ + Scan watchlist companies for trigger events. + Body: { company_names?: list[str] } + """ + data = request.get_json() or {} + company_names = data.get("company_names") + + if not company_names: + with db() as conn: + rows = conn.execute( + "SELECT company_name FROM intelligence_watchlist WHERE org_id=? AND active=1", + (user["org_id"],) + ).fetchall() + company_names = [r["company_name"] for r in rows] + + if not company_names: + return _json({"message": "No companies to scan", "triggers": {}}) + + # Limit to 5 companies per manual scan + company_names = company_names[:5] + trigger_results = scan_watchlist(company_names) + + # Persist triggers + now = time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()) + with db() as conn: + for company, events in trigger_results.items(): + for event in events: + tid = str(uuid.uuid4()) + conn.execute(""" + INSERT INTO intelligence_triggers ( + id, org_id, company_name, trigger_type, trigger_label_ar, + signal_strength, evidence, source_url, + recommended_action_ar, recommended_action_en + ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?) + """, ( + tid, user["org_id"], company, + event["type"], event["label_ar"], + event["strength"], event["evidence"][:500], + event["url"][:300], + event["action_ar"], event["action_en"], + )) + + audit_log(user["org_id"], "intelligence", "triggers_scanned", user["id"], + f"watchlist-{len(company_names)}", {"companies": company_names}) + return _json({ + "companies_scanned": len(company_names), + "triggers_found": sum(len(v) for v in trigger_results.values()), + "results": trigger_results, + }) + + +@intelligence_bp.get("/triggers") +@require_auth +def list_triggers(user): + with db() as conn: + rows = conn.execute( + """SELECT * FROM intelligence_triggers WHERE org_id=? + ORDER BY signal_strength DESC, detected_at DESC LIMIT 50""", + (user["org_id"],) + ).fetchall() + return _json({"triggers": [dict(r) for r in rows]}) + + +# ─── RUNS HISTORY ──────────────────────────────────────────────────────────── + +@intelligence_bp.get("/runs") +@require_auth +def list_runs(user): + with db() as conn: + rows = conn.execute( + "SELECT * FROM intelligence_runs WHERE org_id=? ORDER BY created_at DESC LIMIT 20", + (user["org_id"],) + ).fetchall() + return _json({"runs": [dict(r) for r in rows]}) + + +# ─── DASHBOARD SUMMARY ─────────────────────────────────────────────────────── + +@intelligence_bp.get("/dashboard") +@require_auth +def intelligence_dashboard(user): + """Intelligence OS overview — stats for the frontend dashboard""" + with db() as conn: + total = conn.execute( + "SELECT COUNT(*) FROM intelligence_leads WHERE org_id=?", (user["org_id"],) + ).fetchone()[0] + tiers = conn.execute( + """SELECT priority_tier, COUNT(*) as cnt FROM intelligence_leads + WHERE org_id=? GROUP BY priority_tier""", + (user["org_id"],) + ).fetchall() + top_leads = conn.execute( + """SELECT company_name, score_master, priority_tier, signals, + contact_email, next_action_ar, outreach_angle, status + FROM intelligence_leads WHERE org_id=? + ORDER BY score_master DESC LIMIT 10""", + (user["org_id"],) + ).fetchall() + trigger_count = conn.execute( + "SELECT COUNT(*) FROM intelligence_triggers WHERE org_id=? AND is_actioned=0", + (user["org_id"],) + ).fetchone()[0] + runs = conn.execute( + """SELECT COUNT(*) as total, MAX(created_at) as last_run + FROM intelligence_runs WHERE org_id=?""", + (user["org_id"],) + ).fetchone() + + tier_breakdown = {r["priority_tier"]: r["cnt"] for r in tiers} + + top = [] + for row in top_leads: + lead = dict(row) + try: + lead["signals"] = json.loads(lead["signals"] or "[]") + except Exception: + lead["signals"] = [] + top.append(lead) + + return _json({ + "total_leads": total, + "tier_breakdown": { + "P1_outreach_now": tier_breakdown.get("P1", 0), + "P2_enrich_more": tier_breakdown.get("P2", 0), + "P3_nurture": tier_breakdown.get("P3", 0), + "P4_archive": tier_breakdown.get("P4", 0), + }, + "unactioned_triggers": trigger_count, + "pipeline_runs": runs["total"] if runs else 0, + "last_run": runs["last_run"] if runs else None, + "top_leads": top, + }) diff --git a/salesflow-saas/backend/app/core/database.py b/salesflow-saas/backend/app/core/database.py new file mode 100644 index 00000000..2a51063f --- /dev/null +++ b/salesflow-saas/backend/app/core/database.py @@ -0,0 +1,561 @@ +"""Dealix Database Core — SQLite with full schema for 9 OS modules""" +import sqlite3 +import hashlib +import json +import time +from contextlib import contextmanager +from pathlib import Path + +DB_PATH = Path(__file__).parent.parent.parent / "dealix.db" + +def get_connection(): + conn = sqlite3.connect(str(DB_PATH), check_same_thread=False) + conn.row_factory = sqlite3.Row + conn.execute("PRAGMA journal_mode=DELETE") + conn.execute("PRAGMA foreign_keys=ON") + return conn + +@contextmanager +def db(): + conn = get_connection() + try: + yield conn + conn.commit() + except Exception: + conn.rollback() + raise + finally: + conn.close() + +def init_db(): + with db() as conn: + conn.executescript(""" + -- Users & Auth + CREATE TABLE IF NOT EXISTS users ( + id TEXT PRIMARY KEY, + email TEXT UNIQUE NOT NULL, + name TEXT NOT NULL, + role TEXT NOT NULL DEFAULT 'sales', + org_id TEXT NOT NULL DEFAULT 'dealix', + password_hash TEXT NOT NULL, + created_at TEXT DEFAULT (datetime('now')), + updated_at TEXT DEFAULT (datetime('now')) + ); + + -- ============================================================ + -- 1. REVENUE OS + -- ============================================================ + CREATE TABLE IF NOT EXISTS leads ( + id TEXT PRIMARY KEY, + org_id TEXT NOT NULL, + company_name TEXT NOT NULL, + contact_name TEXT, + contact_email TEXT, + contact_phone TEXT, + source TEXT DEFAULT 'website', + industry TEXT, + company_size TEXT, + annual_revenue TEXT, + region TEXT, + status TEXT DEFAULT 'new', + score INTEGER DEFAULT 0, + stage TEXT DEFAULT 'intake', + assigned_to TEXT, + notes TEXT, + enriched_data TEXT, + created_at TEXT DEFAULT (datetime('now')), + updated_at TEXT DEFAULT (datetime('now')) + ); + + CREATE TABLE IF NOT EXISTS deals ( + id TEXT PRIMARY KEY, + org_id TEXT NOT NULL, + lead_id TEXT, + title TEXT NOT NULL, + value REAL DEFAULT 0, + currency TEXT DEFAULT 'SAR', + stage TEXT DEFAULT 'discovery', + probability INTEGER DEFAULT 0, + close_date TEXT, + owner_id TEXT, + account_id TEXT, + notes TEXT, + created_at TEXT DEFAULT (datetime('now')), + updated_at TEXT DEFAULT (datetime('now')) + ); + + CREATE TABLE IF NOT EXISTS accounts ( + id TEXT PRIMARY KEY, + org_id TEXT NOT NULL, + company_name TEXT NOT NULL, + industry TEXT, + tier TEXT DEFAULT 'standard', + arr REAL DEFAULT 0, + health_score INTEGER DEFAULT 75, + csm_id TEXT, + created_at TEXT DEFAULT (datetime('now')) + ); + + -- ============================================================ + -- 2. PRICING & MARGIN CONTROL OS + -- ============================================================ + CREATE TABLE IF NOT EXISTS quotes ( + id TEXT PRIMARY KEY, + org_id TEXT NOT NULL, + deal_id TEXT, + account_id TEXT, + line_items TEXT, + subtotal REAL DEFAULT 0, + discount_pct REAL DEFAULT 0, + discount_reason TEXT, + final_price REAL DEFAULT 0, + margin_pct REAL DEFAULT 0, + approval_status TEXT DEFAULT 'pending', + approved_by TEXT, + approved_at TEXT, + valid_until TEXT, + created_by TEXT, + created_at TEXT DEFAULT (datetime('now')) + ); + + CREATE TABLE IF NOT EXISTS discount_policies ( + id TEXT PRIMARY KEY, + org_id TEXT NOT NULL, + max_discount_pct REAL NOT NULL, + approver_role TEXT NOT NULL, + deal_value_min REAL DEFAULT 0, + deal_value_max REAL, + active INTEGER DEFAULT 1 + ); + + -- ============================================================ + -- 3. PARTNERSHIP & ALLIANCE OS + -- ============================================================ + CREATE TABLE IF NOT EXISTS partners ( + id TEXT PRIMARY KEY, + org_id TEXT NOT NULL, + company_name TEXT NOT NULL, + partner_type TEXT DEFAULT 'reseller', + status TEXT DEFAULT 'prospect', + fit_score INTEGER DEFAULT 0, + revenue_contribution REAL DEFAULT 0, + health_score INTEGER DEFAULT 75, + contact_name TEXT, + contact_email TEXT, + notes TEXT, + created_at TEXT DEFAULT (datetime('now')), + updated_at TEXT DEFAULT (datetime('now')) + ); + + CREATE TABLE IF NOT EXISTS alliance_workflows ( + id TEXT PRIMARY KEY, + org_id TEXT NOT NULL, + partner_id TEXT NOT NULL, + stage TEXT DEFAULT 'scouting', + economics_model TEXT, + term_sheet TEXT, + approval_status TEXT DEFAULT 'pending', + approved_by TEXT, + activation_date TEXT, + created_at TEXT DEFAULT (datetime('now')) + ); + + -- ============================================================ + -- 4. PROCUREMENT / VENDOR OS + -- ============================================================ + CREATE TABLE IF NOT EXISTS vendors ( + id TEXT PRIMARY KEY, + org_id TEXT NOT NULL, + vendor_name TEXT NOT NULL, + category TEXT, + risk_level TEXT DEFAULT 'medium', + spend REAL DEFAULT 0, + health_score INTEGER DEFAULT 75, + contract_expiry TEXT, + created_at TEXT DEFAULT (datetime('now')) + ); + + CREATE TABLE IF NOT EXISTS procurement_requests ( + id TEXT PRIMARY KEY, + org_id TEXT NOT NULL, + vendor_id TEXT, + title TEXT NOT NULL, + amount REAL NOT NULL, + justification TEXT, + status TEXT DEFAULT 'draft', + approval_status TEXT DEFAULT 'pending', + approved_by TEXT, + approved_at TEXT, + created_by TEXT, + created_at TEXT DEFAULT (datetime('now')) + ); + + -- ============================================================ + -- 5. RENEWAL & EXPANSION OS + -- ============================================================ + CREATE TABLE IF NOT EXISTS renewals ( + id TEXT PRIMARY KEY, + org_id TEXT NOT NULL, + account_id TEXT NOT NULL, + current_arr REAL DEFAULT 0, + renewal_date TEXT, + churn_risk_score INTEGER DEFAULT 0, + expansion_score INTEGER DEFAULT 0, + status TEXT DEFAULT 'upcoming', + rescue_play_active INTEGER DEFAULT 0, + assigned_to TEXT, + notes TEXT, + created_at TEXT DEFAULT (datetime('now')) + ); + + -- ============================================================ + -- 6. EXPANSION / MARKET ENTRY OS + -- ============================================================ + CREATE TABLE IF NOT EXISTS market_entries ( + id TEXT PRIMARY KEY, + org_id TEXT NOT NULL, + market_name TEXT NOT NULL, + segment TEXT, + readiness_score INTEGER DEFAULT 0, + status TEXT DEFAULT 'scanning', + gtm_plan TEXT, + launch_date TEXT, + stop_loss_triggered INTEGER DEFAULT 0, + actual_vs_forecast TEXT, + created_at TEXT DEFAULT (datetime('now')) + ); + + -- ============================================================ + -- 7. M&A / CORPORATE DEVELOPMENT OS + -- ============================================================ + CREATE TABLE IF NOT EXISTS ma_targets ( + id TEXT PRIMARY KEY, + org_id TEXT NOT NULL, + target_name TEXT NOT NULL, + industry TEXT, + estimated_value REAL, + fit_score INTEGER DEFAULT 0, + stage TEXT DEFAULT 'screening', + dd_findings TEXT, + valuation_memo TEXT, + synergy_model TEXT, + ic_pack_status TEXT DEFAULT 'pending', + board_pack_ready INTEGER DEFAULT 0, + close_date TEXT, + created_at TEXT DEFAULT (datetime('now')) + ); + + -- ============================================================ + -- 8. PMI / STRATEGIC PMO OS + -- ============================================================ + CREATE TABLE IF NOT EXISTS pmo_projects ( + id TEXT PRIMARY KEY, + org_id TEXT NOT NULL, + title TEXT NOT NULL, + type TEXT DEFAULT 'pmi', + status TEXT DEFAULT 'active', + day1_readiness INTEGER DEFAULT 0, + plan_30_60_90 TEXT, + synergy_target REAL DEFAULT 0, + synergy_realized REAL DEFAULT 0, + blockers TEXT, + health TEXT DEFAULT 'green', + created_at TEXT DEFAULT (datetime('now')) + ); + + -- ============================================================ + -- 9. EXECUTIVE / BOARD OS + -- ============================================================ + CREATE TABLE IF NOT EXISTS approvals ( + id TEXT PRIMARY KEY, + org_id TEXT NOT NULL, + module TEXT NOT NULL, + reference_id TEXT NOT NULL, + title TEXT NOT NULL, + amount REAL, + risk_level TEXT DEFAULT 'medium', + status TEXT DEFAULT 'pending', + requested_by TEXT, + approved_by TEXT, + decision_at TEXT, + evidence_pack TEXT, + created_at TEXT DEFAULT (datetime('now')) + ); + + CREATE TABLE IF NOT EXISTS executive_packs ( + id TEXT PRIMARY KEY, + org_id TEXT NOT NULL, + week_label TEXT, + actual_revenue REAL DEFAULT 0, + forecast_revenue REAL DEFAULT 0, + open_approvals INTEGER DEFAULT 0, + blockers TEXT, + next_best_actions TEXT, + risk_heatmap TEXT, + generated_at TEXT DEFAULT (datetime('now')) + ); + + -- ============================================================ + -- AUDIT CHAIN (cross-module) + -- ============================================================ + -- ============================================= + -- Revenue Intelligence OS — Lead Machine Tables + -- ============================================= + + -- ICP configs per org + CREATE TABLE IF NOT EXISTS icp_configs ( + id TEXT PRIMARY KEY, + org_id TEXT NOT NULL, + name TEXT NOT NULL, + config TEXT NOT NULL, -- JSON ICPConfig + is_active INTEGER DEFAULT 1, + created_by TEXT, + created_at TEXT DEFAULT (datetime('now')) + ); + + -- Discovered leads (raw, before enrichment) + CREATE TABLE IF NOT EXISTS intelligence_leads ( + id TEXT PRIMARY KEY, + org_id TEXT NOT NULL, + company_name TEXT NOT NULL, + domain TEXT, + industry TEXT, + region TEXT, + company_size TEXT, + description TEXT, + website TEXT, + tech_stack TEXT, -- JSON list + signals TEXT, -- JSON list + recent_news TEXT, -- JSON list + contact_name TEXT, + contact_title TEXT, + contact_email TEXT, + contact_phone TEXT, + contact_linkedin TEXT, + decision_maker_score INTEGER DEFAULT 0, + enrichment_source TEXT DEFAULT 'web', + enrichment_confidence REAL DEFAULT 0.5, + source TEXT, + source_url TEXT, + raw_snippet TEXT, + trigger TEXT, + -- Scores + score_fit INTEGER DEFAULT 0, + score_intent INTEGER DEFAULT 0, + score_access INTEGER DEFAULT 0, + score_value INTEGER DEFAULT 0, + score_urgency INTEGER DEFAULT 0, + score_master INTEGER DEFAULT 0, + priority_tier TEXT DEFAULT 'P4', + score_reasons TEXT, -- JSON list + next_action TEXT, + next_action_ar TEXT, + -- Outreach + outreach_whatsapp_ar TEXT, + outreach_email_subject_ar TEXT, + outreach_email_body_ar TEXT, + outreach_linkedin_ar TEXT, + outreach_angle TEXT, + -- Pipeline tracking + pipeline_run_id TEXT, + crm_lead_id TEXT, -- linked to leads table + status TEXT DEFAULT 'discovered', -- discovered | contacted | qualified | archived + reviewed_by TEXT, + reviewed_at TEXT, + enriched_at TEXT, + discovered_at TEXT DEFAULT (datetime('now')), + created_at TEXT DEFAULT (datetime('now')) + ); + + -- Pipeline run history + CREATE TABLE IF NOT EXISTS intelligence_runs ( + id TEXT PRIMARY KEY, + org_id TEXT NOT NULL, + icp_id TEXT, + run_mode TEXT DEFAULT 'auto', -- auto | manual | triggered + motion TEXT DEFAULT 'sales', + total_discovered INTEGER DEFAULT 0, + total_deduped INTEGER DEFAULT 0, + total_enriched INTEGER DEFAULT 0, + tier_p1 INTEGER DEFAULT 0, + tier_p2 INTEGER DEFAULT 0, + tier_p3 INTEGER DEFAULT 0, + tier_p4 INTEGER DEFAULT 0, + duration_sec REAL DEFAULT 0, + status TEXT DEFAULT 'running', -- running | complete | error + error_message TEXT, + created_by TEXT, + created_at TEXT DEFAULT (datetime('now')) + ); + + -- Watchlist for trigger alerts + CREATE TABLE IF NOT EXISTS intelligence_watchlist ( + id TEXT PRIMARY KEY, + org_id TEXT NOT NULL, + company_name TEXT NOT NULL, + domain TEXT, + priority INTEGER DEFAULT 0, + last_scanned TEXT, + active INTEGER DEFAULT 1, + added_by TEXT, + created_at TEXT DEFAULT (datetime('now')) + ); + + -- Trigger events detected + CREATE TABLE IF NOT EXISTS intelligence_triggers ( + id TEXT PRIMARY KEY, + org_id TEXT NOT NULL, + company_name TEXT NOT NULL, + trigger_type TEXT NOT NULL, + trigger_label_ar TEXT, + signal_strength INTEGER DEFAULT 0, + evidence TEXT, + source_url TEXT, + recommended_action_ar TEXT, + recommended_action_en TEXT, + is_actioned INTEGER DEFAULT 0, + actioned_by TEXT, + detected_at TEXT DEFAULT (datetime('now')) + ); + + -- Entity registry (deduplication) + CREATE TABLE IF NOT EXISTS intelligence_entities ( + id TEXT PRIMARY KEY, + canonical_name TEXT NOT NULL, + normalized_name TEXT, + domain TEXT, + aliases TEXT, -- JSON list + created_at TEXT DEFAULT (datetime('now')) + ); + + CREATE TABLE IF NOT EXISTS audit_log ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + org_id TEXT NOT NULL, + module TEXT NOT NULL, + action TEXT NOT NULL, + actor_id TEXT, + resource_id TEXT, + payload TEXT, + prev_hash TEXT, + entry_hash TEXT, + ts TEXT DEFAULT (datetime('now')) + ); + """) + + # Seed admin users + import hashlib, uuid + users = [ + ("admin-001", "admin@dealix.io", "Admin", "admin"), + ("mgr-001", "manager@dealix.io", "Manager", "manager"), + ("sales-001", "sales@dealix.io", "Sales Rep", "sales"), + ] + passwords = {"admin": "Admin1234!", "manager": "Manager1234!", "sales": "Sales1234!"} + for uid, email, name, role in users: + pw = hashlib.sha256(passwords[role].encode()).hexdigest() + conn.execute(""" + INSERT OR IGNORE INTO users (id, email, name, role, password_hash) + VALUES (?, ?, ?, ?, ?) + """, (uid, email, name, role, pw)) + + # Seed discount policies + conn.execute(""" + INSERT OR IGNORE INTO discount_policies (id, org_id, max_discount_pct, approver_role, deal_value_min, deal_value_max) + VALUES ('dp-1','dealix',10,'sales',0,50000), + ('dp-2','dealix',20,'manager',50000,200000), + ('dp-3','dealix',35,'admin',200000,NULL) + """) + + # Seed sample data for dashboard + _seed_sample_data(conn) + +def _seed_sample_data(conn): + import uuid + # Sample leads + leads = [ + ("lead-001","dealix","البنك الأهلي","محمد الغامدي","m@anb.com","0500000001","referral","banking","enterprise","500M+","Riyadh","qualified",88,"proposal","sales-001"), + ("lead-002","dealix","stc","فيصل الحربي","f@stc.com","0500000002","website","telecom","enterprise","1B+","Riyadh","qualified",91,"negotiation","sales-001"), + ("lead-003","dealix","أرامكو","خالد المالكي","k@aramco.com","0500000003","partner","energy","enterprise","10B+","Dhahran","new",72,"intake","sales-001"), + ("lead-004","dealix","مجموعة العثيم","سارة القحطاني","s@othaim.com","0500000004","website","retail","large","100M+","Riyadh","contacted",65,"discovery","sales-001"), + ("lead-005","dealix","مستشفى الملك فيصل","أحمد الزهراني","a@kfsh.com","0500000005","referral","healthcare","enterprise","200M+","Riyadh","qualified",79,"proposal","sales-001"), + ] + for l in leads: + conn.execute("""INSERT OR IGNORE INTO leads + (id,org_id,company_name,contact_name,contact_email,contact_phone,source,industry,company_size,annual_revenue,region,status,score,stage,assigned_to) + VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)""", l) + + # Sample deals + deals = [ + ("deal-001","dealix","lead-001","صفقة البنك الأهلي — Revenue OS",850000,"SAR","proposal",75,"2026-06-30","sales-001","acc-001"), + ("deal-002","dealix","lead-002","stc — Enterprise Suite",1200000,"SAR","negotiation",85,"2026-05-31","sales-001","acc-002"), + ("deal-003","dealix","lead-003","أرامكو — Executive OS",2500000,"SAR","discovery",40,"2026-09-30","sales-001","acc-003"), + ("deal-004","dealix","lead-005","KFSH — Procurement OS",420000,"SAR","proposal",65,"2026-07-15","sales-001","acc-004"), + ] + for d in deals: + conn.execute("""INSERT OR IGNORE INTO deals + (id,org_id,lead_id,title,value,currency,stage,probability,close_date,owner_id,account_id) + VALUES (?,?,?,?,?,?,?,?,?,?,?)""", d) + + # Sample accounts + accounts = [ + ("acc-001","dealix","البنك الأهلي","banking","enterprise",850000,82,"sales-001"), + ("acc-002","dealix","stc","telecom","strategic",1200000,91,"sales-001"), + ("acc-003","dealix","أرامكو","energy","strategic",0,88,"sales-001"), + ("acc-004","dealix","KFSH","healthcare","enterprise",420000,74,"sales-001"), + ] + for a in accounts: + conn.execute("""INSERT OR IGNORE INTO accounts + (id,org_id,company_name,industry,tier,arr,health_score,csm_id) + VALUES (?,?,?,?,?,?,?,?)""", a) + + # Sample partners + partners = [ + ("part-001","dealix","Oracle Arabia","technology","active",87,320000,82,"علي الدوسري","a@oracle.com"), + ("part-002","dealix","SAP KSA","technology","active",91,580000,88,"نورة العتيبي","n@sap.com"), + ("part-003","dealix","Deloitte KSA","consulting","prospect",73,0,0,"طارق المحمد","t@deloitte.com"), + ] + for p in partners: + conn.execute("""INSERT OR IGNORE INTO partners + (id,org_id,company_name,partner_type,status,fit_score,revenue_contribution,health_score,contact_name,contact_email) + VALUES (?,?,?,?,?,?,?,?,?,?)""", p) + + # Sample M&A targets + conn.execute("""INSERT OR IGNORE INTO ma_targets + (id,org_id,target_name,industry,estimated_value,fit_score,stage) + VALUES ('ma-001','dealix','Salesbook KSA','SaaS',8500000,84,'due_diligence')""") + + # Sample renewals + conn.execute("""INSERT OR IGNORE INTO renewals + (id,org_id,account_id,current_arr,renewal_date,churn_risk_score,expansion_score,status) + VALUES ('ren-001','dealix','acc-001',850000,'2026-12-31',22,67,'upcoming'), + ('ren-002','dealix','acc-002',1200000,'2026-10-31',8,88,'upcoming')""") + + # Sample approvals + conn.execute("""INSERT OR IGNORE INTO approvals + (id,org_id,module,reference_id,title,amount,risk_level,status,requested_by) + VALUES + ('appr-001','dealix','pricing','deal-001','خصم 25% — البنك الأهلي',212500,'high','pending','sales-001'), + ('appr-002','dealix','procurement','pr-001','تجديد عقد Oracle',145000,'medium','pending','mgr-001'), + ('appr-003','dealix','partnership','part-003','تفعيل شراكة Deloitte',0,'low','pending','mgr-001')""") + + # Executive pack + conn.execute("""INSERT OR IGNORE INTO executive_packs + (id,org_id,week_label,actual_revenue,forecast_revenue,open_approvals,blockers,next_best_actions) + VALUES ('ep-001','dealix','الأسبوع 16 — 2026',3850000,4200000,3, + '["صفقة أرامكو: تأخر RFP","تجديد Oracle: انتهاء العقد في 30 يوم"]', + '["أغلق خصم البنك الأهلي — 25%","ادفع تجديد Oracle قبل الانتهاء","جدول kickoff مع Deloitte"]')""") + + # Audit chain seed + prev = "GENESIS" + entries = [ + ("dealix","revenue","lead_created","admin-001","lead-001","{}"), + ("dealix","pricing","quote_created","sales-001","deal-001","{}"), + ("dealix","partnership","partner_added","mgr-001","part-001","{}"), + ("dealix","executive","pack_generated","admin-001","ep-001","{}"), + ] + for org, module, action, actor, resource, payload in entries: + import hashlib, time + content = f"{org}:{module}:{action}:{actor}:{resource}:{time.time()}" + entry_hash = hashlib.sha256(f"{prev}:{content}".encode()).hexdigest() + conn.execute("""INSERT INTO audit_log (org_id,module,action,actor_id,resource_id,payload,prev_hash,entry_hash) + VALUES (?,?,?,?,?,?,?,?)""", (org, module, action, actor, resource, payload, prev, entry_hash)) + prev = entry_hash diff --git a/salesflow-saas/backend/app/intelligence/__init__.py b/salesflow-saas/backend/app/intelligence/__init__.py new file mode 100644 index 00000000..591252b8 --- /dev/null +++ b/salesflow-saas/backend/app/intelligence/__init__.py @@ -0,0 +1 @@ +# Dealix Revenue Intelligence OS — Lead Machine Layer diff --git a/salesflow-saas/backend/app/intelligence/discovery.py b/salesflow-saas/backend/app/intelligence/discovery.py new file mode 100644 index 00000000..bf5bfdf9 --- /dev/null +++ b/salesflow-saas/backend/app/intelligence/discovery.py @@ -0,0 +1,378 @@ +""" +Lead Discovery Engine — Multi-source, Arabic/English +Searches web, news, job boards, and directories to find lead targets. +Returns structured LeadCandidate objects ready for enrichment. +""" +import re +import uuid +import hashlib +import unicodedata +from dataclasses import dataclass, field +from typing import List, Dict, Any, Optional +import urllib.request +import urllib.parse +import json +import time + + +@dataclass +class LeadCandidate: + """Raw discovered lead — before enrichment and scoring""" + id: str = field(default_factory=lambda: str(uuid.uuid4())) + company_name: str = "" + company_name_ar: str = "" + domain: str = "" + industry: str = "" + region: str = "" + source: str = "" # web_search | news | job_board | directory + source_url: str = "" + raw_snippet: str = "" + contact_name: str = "" + contact_title: str = "" + contact_email: str = "" + contact_linkedin: str = "" + phone: str = "" + signals: List[str] = field(default_factory=list) # ["hiring", "expansion", ...] + trigger: str = "" # what triggered discovery + confidence: float = 0.5 # 0-1 source confidence + discovered_at: str = "" + + +def normalize_company_name(name: str) -> str: + """Normalize company name for deduplication — Arabic + English""" + name = name.strip().lower() + # Remove Arabic definite article + name = re.sub(r'^(ال|شركة\s+|مجموعة\s+)', '', name) + # Remove common suffixes + suffixes = [ + r'\s+(llc|ltd|co\.|inc\.|corp\.?|group|holding|sa|كو|ليميتد|للتقنية|للخدمات|السعودية)$' + ] + for s in suffixes: + name = re.sub(s, '', name, flags=re.IGNORECASE) + # Normalize unicode + name = unicodedata.normalize('NFKC', name) + return name.strip() + + +def extract_domain_from_url(url: str) -> str: + try: + parsed = urllib.parse.urlparse(url) + domain = parsed.netloc.replace('www.', '') + return domain + except Exception: + return "" + + +def extract_emails_from_text(text: str) -> List[str]: + pattern = r'\b[A-Za-z0-9._%+\-]+@[A-Za-z0-9.\-]+\.[A-Z|a-z]{2,}\b' + return list(set(re.findall(pattern, text))) + + +def extract_phones_from_text(text: str) -> List[str]: + pattern = r'(\+966|00966|05\d)[\s\-]?(\d[\s\-]?){8,9}' + return list(set(re.findall(pattern, text) or [])) + + +def extract_linkedin_profiles(text: str) -> List[str]: + pattern = r'linkedin\.com/in/[\w\-]+' + return list(set(re.findall(pattern, text, re.IGNORECASE))) + + +def detect_signals(text: str) -> List[str]: + """Detect intent/trigger signals in text""" + signals = [] + text_lower = text.lower() + signal_map = { + "hiring": ["hiring", "we're hiring", "join our team", "نحن نوظف", "فرص عمل", "وظائف"], + "expansion": ["expansion", "new office", "توسع", "افتتاح فرع", "نطاق جديد"], + "funding": ["funding", "raised", "investment", "تمويل", "استثمار", "سلسلة", "series"], + "partnership": ["partnership", "collaboration", "شراكة", "تعاون"], + "digital_transformation": ["digital transformation", "تحول رقمي", "رقمنة"], + "new_product": ["launch", "new product", "إطلاق", "منتج جديد"], + "pain_point_crm": ["crm", "sales management", "إدارة المبيعات", "عملاء"], + "pain_point_outreach": ["outreach", "leads", "عملاء محتملين", "مبيعات"], + "regulation": ["zatca", "pdpl", "vat", "ضريبة", "ضريبة القيمة المضافة", "حوكمة"], + "ipo": ["ipo", "طرح عام", "اكتتاب"], + } + for signal, keywords in signal_map.items(): + if any(kw in text_lower for kw in keywords): + signals.append(signal) + return signals + + +# ─── Curated Saudi B2B Lead Database (fallback when web search is rate-limited) ─── +SAUDI_B2B_SEED_LEADS = [ + # Tech / SaaS + {"company_name": "Elm", "domain": "elm.sa", "industry": "technology", "region": "Riyadh", "company_size": "1000+", "signals": ["digital_transformation", "hiring"]}, + {"company_name": "Unifonic", "domain": "unifonic.com", "industry": "tech", "region": "Riyadh", "company_size": "200-1000", "signals": ["expansion", "funding"]}, + {"company_name": "Foodics", "domain": "foodics.com", "industry": "saas", "region": "Riyadh", "company_size": "200-1000", "signals": ["funding", "expansion"]}, + {"company_name": "Salla", "domain": "salla.sa", "industry": "technology", "region": "Jeddah", "company_size": "200-1000", "signals": ["expansion", "hiring"]}, + {"company_name": "Zid", "domain": "zid.sa", "industry": "saas", "region": "Riyadh", "company_size": "50-200", "signals": ["digital_transformation"]}, + {"company_name": "Lean Technologies", "domain": "leantech.me", "industry": "fintech", "region": "Riyadh", "company_size": "50-200", "signals": ["funding"]}, + {"company_name": "Tamara", "domain": "tamara.co", "industry": "fintech", "region": "Riyadh", "company_size": "200-1000", "signals": ["funding", "expansion"]}, + {"company_name": "Mozn", "domain": "mozn.sa", "industry": "technology", "region": "Riyadh", "company_size": "50-200", "signals": ["digital_transformation", "hiring"]}, + {"company_name": "Rewaa", "domain": "rewaaapp.com", "industry": "saas", "region": "Riyadh", "company_size": "50-200", "signals": ["pain_point_crm"]}, + {"company_name": "Tamatem", "domain": "tamatem.co", "industry": "technology", "region": "Riyadh", "company_size": "50-200", "signals": ["expansion"]}, + # Healthcare + {"company_name": "مجموعة دله للرعاية الصحية", "domain": "dallah-hospital.com", "industry": "healthcare", "region": "Riyadh", "company_size": "1000+", "signals": ["digital_transformation"]}, + {"company_name": "مستشفى الحمادي", "domain": "hammadi.com", "industry": "healthcare", "region": "Riyadh", "company_size": "200-1000", "signals": ["hiring"]}, + {"company_name": "Aster DM Healthcare Saudi", "domain": "asterhospitals.sa", "industry": "healthcare", "region": "Riyadh", "company_size": "200-1000", "signals": ["expansion"]}, + # Finance / Banking + {"company_name": "Riyad Bank", "domain": "riyadbank.com", "industry": "banking", "region": "Riyadh", "company_size": "1000+", "signals": ["digital_transformation", "hiring"]}, + {"company_name": "SABB", "domain": "sabb.com", "industry": "banking", "region": "Jeddah", "company_size": "1000+", "signals": ["digital_transformation"]}, + {"company_name": "Alinma Bank", "domain": "alinma.com", "industry": "banking", "region": "Riyadh", "company_size": "1000+", "signals": ["digital_transformation", "expansion"]}, + {"company_name": "STC Pay", "domain": "stcpay.com.sa", "industry": "fintech", "region": "Riyadh", "company_size": "200-1000", "signals": ["expansion", "hiring"]}, + # Retail / E-commerce + {"company_name": "نون", "domain": "noon.com", "industry": "retail", "region": "Riyadh", "company_size": "1000+", "signals": ["expansion", "hiring", "digital_transformation"]}, + {"company_name": "Jarir Bookstore", "domain": "jarir.com", "industry": "retail", "region": "Riyadh", "company_size": "1000+", "signals": ["digital_transformation"]}, + {"company_name": "Extra", "domain": "extra.com", "industry": "retail", "region": "Riyadh", "company_size": "200-1000", "signals": ["pain_point_crm"]}, + # Logistics + {"company_name": "NAQEL Express", "domain": "naqel.com.sa", "industry": "logistics", "region": "Riyadh", "company_size": "1000+", "signals": ["digital_transformation", "expansion"]}, + {"company_name": "Aramex Saudi Arabia", "domain": "aramex.com", "industry": "logistics", "region": "Riyadh", "company_size": "1000+", "signals": ["expansion"]}, + {"company_name": "Fetchr", "domain": "fetchr.us", "industry": "logistics", "region": "Riyadh", "company_size": "50-200", "signals": ["digital_transformation"]}, + # Real Estate + {"company_name": "Bayut Saudi Arabia", "domain": "bayut.sa", "industry": "real estate", "region": "Riyadh", "company_size": "50-200", "signals": ["digital_transformation"]}, + {"company_name": "مدار للعقارات", "domain": "madar.com.sa", "industry": "real estate", "region": "Riyadh", "company_size": "50-200", "signals": ["expansion"]}, + # Manufacturing / Industrial + {"company_name": "SABIC", "domain": "sabic.com", "industry": "manufacturing", "region": "Riyadh", "company_size": "1000+", "signals": ["digital_transformation", "ipo"]}, + {"company_name": "Saudi Cement", "domain": "saudicement.com.sa", "industry": "manufacturing", "region": "Riyadh", "company_size": "1000+", "signals": ["hiring"]}, + # Consulting / Professional Services + {"company_name": "Deloitte Saudi Arabia", "domain": "deloitte.com/sa", "industry": "consulting", "region": "Riyadh", "company_size": "200-1000", "signals": ["hiring", "expansion"]}, + {"company_name": "McKinsey Riyadh", "domain": "mckinsey.com", "industry": "consulting", "region": "Riyadh", "company_size": "50-200", "signals": ["pain_point_outreach"]}, + {"company_name": "PwC Saudi Arabia", "domain": "pwc.com/m1", "industry": "consulting", "region": "Riyadh", "company_size": "200-1000", "signals": ["regulation", "hiring"]}, + # Media / Education + {"company_name": "MBC Group", "domain": "mbc.net", "industry": "media", "region": "Riyadh", "company_size": "1000+", "signals": ["digital_transformation"]}, + {"company_name": "Edraak", "domain": "edraak.org", "industry": "education", "region": "Amman", "company_size": "50-200", "signals": ["digital_transformation"]}, + # Energy / Government + {"company_name": "Saudi Electricity Company", "domain": "se.com.sa", "industry": "energy", "region": "Riyadh", "company_size": "1000+", "signals": ["digital_transformation", "regulation"]}, + {"company_name": "Maaden", "domain": "maaden.com.sa", "industry": "manufacturing", "region": "Riyadh", "company_size": "1000+", "signals": ["expansion", "ipo"]}, +] + + +class LeadDiscoveryEngine: + """ + Multi-source lead discovery engine. + Searches web sources and extracts structured lead candidates. + """ + + def __init__(self, icp=None): + self.icp = icp + + def search_web_simple(self, query: str, max_results: int = 10) -> List[Dict]: + """ + Lightweight web search via DuckDuckGo HTML (no API key required). + Returns list of {title, url, snippet} dicts. + """ + results = [] + try: + encoded = urllib.parse.quote(query) + url = f"https://html.duckduckgo.com/html/?q={encoded}" + req = urllib.request.Request( + url, + headers={ + "User-Agent": "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 Chrome/120.0", + "Accept": "text/html,application/xhtml+xml", + "Accept-Language": "ar,en;q=0.9", + } + ) + with urllib.request.urlopen(req, timeout=8) as resp: + html = resp.read().decode('utf-8', errors='ignore') + + # DDG HTML uses redirect URLs: //duckduckgo.com/l/?uddg= + # Extract all anchor text + href pairs + link_pattern = re.compile( + r']+href="(//duckduckgo\.com/l/\?uddg=[^"]+)"[^>]*>(.*?)', + re.DOTALL | re.IGNORECASE + ) + # Extract snippets from result__snippet divs + snippet_pattern = re.compile( + r'class="result__snippet"[^>]*>(.*?)', + re.DOTALL | re.IGNORECASE + ) + snippets_raw = snippet_pattern.findall(html) + + # Extract URL domain displays (result__url spans) + domain_pattern = re.compile( + r']*>\s*([^<]+)\s*', + re.IGNORECASE + ) + domains_raw = domain_pattern.findall(html) + + # Extract title links + title_pattern = re.compile( + r'class="result__a"[^>]*>(.*?)', + re.DOTALL | re.IGNORECASE + ) + titles_raw = title_pattern.findall(html) + + # Extract real URLs from DDG redirect + uddg_pattern = re.compile( + r'uddg=([A-Za-z0-9%+_.-]+)', + re.IGNORECASE + ) + + all_hrefs = re.findall( + r'class="result__a"[^>]*href="([^"]+)"', + html, re.IGNORECASE + ) + + for i in range(min(max_results, len(titles_raw))): + title = re.sub(r'<[^>]+>', '', titles_raw[i]).strip() + if not title or len(title) < 4: + continue + + # Decode real URL + real_url = "" + if i < len(all_hrefs): + href = all_hrefs[i] + m = uddg_pattern.search(href) + if m: + try: + real_url = urllib.parse.unquote(m.group(1)) + except Exception: + real_url = "" + + snippet = "" + if i < len(snippets_raw): + snippet = re.sub(r'<[^>]+>', '', snippets_raw[i]).strip() + + results.append({ + "title": title[:200], + "url": real_url[:500], + "snippet": snippet[:800], + }) + + except Exception as e: + pass # Silent fail — don't break the pipeline + + return results + + def candidate_from_search_result( + self, result: Dict, query: str, source: str = "web_search" + ) -> Optional[LeadCandidate]: + """Convert a raw search result into a LeadCandidate""" + title = result.get("title", "") + snippet = result.get("snippet", "") + url = result.get("url", "") + + if not title or len(title) < 3: + return None + + text = f"{title} {snippet}" + signals = detect_signals(text) + emails = extract_emails_from_text(text) + phones = extract_phones_from_text(text) + linkedin_profiles = extract_linkedin_profiles(text) + + candidate = LeadCandidate( + company_name=title[:100], + domain=extract_domain_from_url(url), + source=source, + source_url=url[:500], + raw_snippet=snippet[:1000], + signals=signals, + trigger=query[:200], + contact_email=emails[0] if emails else "", + phone=str(phones[0]) if phones else "", + contact_linkedin=linkedin_profiles[0] if linkedin_profiles else "", + confidence=0.6 if signals else 0.4, + discovered_at=time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()), + ) + return candidate + + def _seed_candidates_from_db(self, icp=None) -> List[LeadCandidate]: + """Generate candidates from curated Saudi B2B seed database""" + candidates = [] + now = time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()) + for entry in SAUDI_B2B_SEED_LEADS: + # ICP filter by industry if ICP is provided + if icp and icp.industries: + industry = entry.get("industry", "").lower() + if not any(ind.lower() in industry or industry in ind.lower() for ind in icp.industries): + continue + c = LeadCandidate( + company_name=entry["company_name"], + domain=entry.get("domain", ""), + industry=entry.get("industry", ""), + region=entry.get("region", ""), + source="seed_database", + source_url=f"https://{entry.get('domain','')}", + # Embed region in raw_snippet so scoring picks it up + raw_snippet=f"{entry.get('company_name','')} | {entry.get('industry','')} | {entry.get('region','')} Saudi Arabia KSA", + signals=entry.get("signals", []), + trigger="ICP match — seed database", + confidence=0.7, + discovered_at=now, + ) + candidates.append(c) + return candidates + + def discover(self, queries: List[str], max_per_query: int = 8) -> List[LeadCandidate]: + """ + Run discovery across all queries, return deduplicated LeadCandidates. + Falls back to seed database if web search returns 0 results. + """ + all_candidates = [] + seen_domains = set() + seen_names = set() + web_found = 0 + + for query in queries: + time.sleep(0.3) # rate limiting + results = self.search_web_simple(query, max_results=max_per_query) + web_found += len(results) + for result in results: + candidate = self.candidate_from_search_result(result, query) + if candidate is None: + continue + # Dedup by domain + if candidate.domain and candidate.domain in seen_domains: + continue + # Dedup by normalized name + norm_name = normalize_company_name(candidate.company_name) + if norm_name and norm_name in seen_names: + continue + if candidate.domain: + seen_domains.add(candidate.domain) + if norm_name: + seen_names.add(norm_name) + all_candidates.append(candidate) + + # Fallback: if web search returned nothing (rate-limited), use seed DB + if web_found == 0: + seed_candidates = self._seed_candidates_from_db(self.icp) + for candidate in seed_candidates: + if candidate.domain and candidate.domain in seen_domains: + continue + norm_name = normalize_company_name(candidate.company_name) + if norm_name and norm_name in seen_names: + continue + if candidate.domain: + seen_domains.add(candidate.domain) + if norm_name: + seen_names.add(norm_name) + all_candidates.append(candidate) + else: + # Also enrich with seed DB entries not already found + seed_candidates = self._seed_candidates_from_db(self.icp) + for candidate in seed_candidates: + if candidate.domain and candidate.domain in seen_domains: + continue + norm_name = normalize_company_name(candidate.company_name) + if norm_name and norm_name in seen_names: + continue + if candidate.domain: + seen_domains.add(candidate.domain) + if norm_name: + seen_names.add(norm_name) + all_candidates.append(candidate) + + return all_candidates + + def discover_from_icp(self, icp=None, max_per_query: int = 6) -> List[LeadCandidate]: + """Run discovery using ICP-generated queries""" + icp = icp or self.icp + if icp is None: + return [] + queries = icp.build_search_queries() + return self.discover(queries, max_per_query=max_per_query) diff --git a/salesflow-saas/backend/app/intelligence/enrichment.py b/salesflow-saas/backend/app/intelligence/enrichment.py new file mode 100644 index 00000000..a3ac728c --- /dev/null +++ b/salesflow-saas/backend/app/intelligence/enrichment.py @@ -0,0 +1,257 @@ +""" +Enrichment Layer — Company + Person + Intent signals +Enriches LeadCandidates with additional data from multiple sources. +Designed to plug in Apollo/PDL/Clay APIs via env vars when available. +""" +import os +import re +import json +import time +import urllib.request +import urllib.parse +from typing import Dict, Any, Optional, List +from dataclasses import dataclass, field, asdict + +from app.intelligence.discovery import LeadCandidate, extract_emails_from_text, detect_signals + + +@dataclass +class EnrichedLead: + """Fully enriched lead — ready for scoring""" + # Identity + id: str = "" + company_name: str = "" + company_name_ar: str = "" + domain: str = "" + website: str = "" + + # Company facts + industry: str = "" + industry_ar: str = "" + company_size: str = "" + employee_count: int = 0 + founded_year: int = 0 + annual_revenue_sar: float = 0.0 + headquarters: str = "" + region: str = "" + description: str = "" + description_ar: str = "" + + # Technology stack (signals for fit) + tech_stack: List[str] = field(default_factory=list) + uses_crm: bool = False + uses_erp: bool = False + + # Contact + contact_name: str = "" + contact_title: str = "" + contact_title_ar: str = "" + contact_email: str = "" + contact_phone: str = "" + contact_linkedin: str = "" + decision_maker_score: int = 0 # 0-100: how likely this person makes the buy decision + + # Intent signals + signals: List[str] = field(default_factory=list) + intent_keywords: List[str] = field(default_factory=list) + recent_news: List[str] = field(default_factory=list) + open_jobs_count: int = 0 + open_jobs_relevant: List[str] = field(default_factory=list) + + # Enrichment metadata + enrichment_source: str = "web" # web | apollo | pdl | clay + enrichment_confidence: float = 0.5 + enriched_at: str = "" + + # Original discovery data + source: str = "" + source_url: str = "" + raw_snippet: str = "" + trigger: str = "" + + def to_dict(self) -> Dict[str, Any]: + return asdict(self) + + +# Title → Seniority mapping (Arabic + English) +TITLE_SENIORITY = { + "ceo": 100, "chief executive": 100, "الرئيس التنفيذي": 100, "المدير العام": 100, + "coo": 95, "chief operating": 95, "المدير التشغيلي": 95, + "cro": 95, "chief revenue": 95, + "cfo": 90, "chief financial": 90, + "vp": 85, "vice president": 85, "نائب الرئيس": 85, + "head of": 80, "رئيس قسم": 80, + "director": 75, "مدير": 70, + "manager": 55, "مشرف": 40, + "executive": 65, "تنفيذي": 65, +} + +TECH_KEYWORDS = [ + "salesforce", "sap", "oracle", "hubspot", "zoho", "dynamics", "pipedrive", + "نت سويت", "odoo", "quickbooks", "workday", "servicenow", + "jira", "slack", "teams", "whatsapp business", +] + +CRM_KEYWORDS = ["salesforce", "hubspot", "zoho crm", "dynamics crm", "pipedrive", "crm"] +ERP_KEYWORDS = ["sap", "oracle", "odoo", "netsuite", "dynamics erp", "erp"] + + +def infer_seniority_score(title: str) -> int: + title_lower = title.lower() + for kw, score in TITLE_SENIORITY.items(): + if kw in title_lower: + return score + return 30 + + +def infer_tech_stack(text: str) -> List[str]: + text_lower = text.lower() + return [tech for tech in TECH_KEYWORDS if tech in text_lower] + + +def estimate_company_size(text: str) -> str: + """Try to extract company size from text""" + patterns = [ + (r'(\d{1,5})\s*\+?\s*(employees|موظف|staff)', lambda m: int(m.group(1))), + (r'(small|صغير)', lambda m: 0), + (r'(medium|متوسط)', lambda m: 150), + (r'(large|كبير|enterprise)', lambda m: 1000), + ] + for pattern, extractor in patterns: + match = re.search(pattern, text, re.IGNORECASE) + if match: + try: + count = extractor(match) + if count < 50: return "1-50" + elif count < 200: return "50-200" + elif count < 1000: return "200-1000" + else: return "1000+" + except Exception: + pass + return "unknown" + + +def fetch_company_website_data(domain: str) -> Dict[str, Any]: + """Try to fetch company website and extract key signals""" + if not domain: + return {} + try: + url = f"https://{domain}" + req = urllib.request.Request( + url, + headers={"User-Agent": "Mozilla/5.0 (compatible; DealixBot/1.0)"} + ) + with urllib.request.urlopen(req, timeout=6) as resp: + html = resp.read().decode('utf-8', errors='ignore')[:15000] + + emails = extract_emails_from_text(html) + tech_stack = infer_tech_stack(html) + signals = detect_signals(html) + size = estimate_company_size(html) + + # Extract title/description + title_match = re.search(r']*>(.*?)', html, re.IGNORECASE | re.DOTALL) + desc_match = re.search( + r']*content=["\']([^"\']+)["\']', + html, re.IGNORECASE + ) + + return { + "page_title": re.sub(r'<[^>]+>', '', title_match.group(1)).strip() if title_match else "", + "description": desc_match.group(1).strip() if desc_match else "", + "emails": emails[:3], + "tech_stack": tech_stack, + "signals": signals, + "company_size": size, + } + except Exception: + return {} + + +def search_company_news(company_name: str) -> List[str]: + """Quick news search for a company name""" + try: + query = urllib.parse.quote(f"{company_name} news 2025 2026") + url = f"https://html.duckduckgo.com/html/?q={query}" + req = urllib.request.Request( + url, headers={"User-Agent": "Mozilla/5.0 (compatible; DealixBot/1.0)"} + ) + with urllib.request.urlopen(req, timeout=5) as resp: + html = resp.read().decode('utf-8', errors='ignore') + snippets = re.findall(r']*>(.*?)', html) + return [re.sub(r'<[^>]+>', '', s).strip() for s in snippets[:4]] + except Exception: + return [] + + +def enrich_candidate(candidate: LeadCandidate) -> EnrichedLead: + """ + Enrich a LeadCandidate with website data, news, and inferred signals. + Falls back gracefully when data unavailable. + """ + enriched = EnrichedLead( + id=candidate.id, + company_name=candidate.company_name, + domain=candidate.domain, + website=f"https://{candidate.domain}" if candidate.domain else "", + source=candidate.source, + source_url=candidate.source_url, + raw_snippet=candidate.raw_snippet, + trigger=candidate.trigger, + signals=candidate.signals.copy(), + contact_email=candidate.contact_email, + contact_phone=candidate.contact_phone, + contact_linkedin=candidate.contact_linkedin, + enriched_at=time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()), + ) + + # Fetch website data + if candidate.domain: + site_data = fetch_company_website_data(candidate.domain) + enriched.description = site_data.get("description", "") + enriched.tech_stack = site_data.get("tech_stack", []) + enriched.uses_crm = any(t in site_data.get("tech_stack", []) for t in CRM_KEYWORDS) + enriched.uses_erp = any(t in site_data.get("tech_stack", []) for t in ERP_KEYWORDS) + enriched.company_size = site_data.get("company_size", "unknown") + # Merge signals + for sig in site_data.get("signals", []): + if sig not in enriched.signals: + enriched.signals.append(sig) + # Extract emails if not already present + if not enriched.contact_email and site_data.get("emails"): + enriched.contact_email = site_data["emails"][0] + + # Fetch news + if candidate.company_name: + enriched.recent_news = search_company_news(candidate.company_name) + # Detect signals in news + for news_item in enriched.recent_news: + for sig in detect_signals(news_item): + if sig not in enriched.signals: + enriched.signals.append(sig) + + # Infer decision maker score + enriched.decision_maker_score = infer_seniority_score(candidate.contact_title) + + # Confidence based on available data + data_points = sum([ + bool(enriched.domain), + bool(enriched.contact_email), + bool(enriched.description), + bool(enriched.signals), + bool(enriched.recent_news), + ]) + enriched.enrichment_confidence = min(1.0, 0.3 + (data_points * 0.14)) + enriched.enrichment_source = "web" + + return enriched + + +def enrich_batch(candidates: List[LeadCandidate], delay: float = 0.5) -> List[EnrichedLead]: + """Enrich a list of candidates with rate limiting""" + enriched_leads = [] + for candidate in candidates: + enriched = enrich_candidate(candidate) + enriched_leads.append(enriched) + time.sleep(delay) + return enriched_leads diff --git a/salesflow-saas/backend/app/intelligence/entity_resolution.py b/salesflow-saas/backend/app/intelligence/entity_resolution.py new file mode 100644 index 00000000..a80803a4 --- /dev/null +++ b/salesflow-saas/backend/app/intelligence/entity_resolution.py @@ -0,0 +1,210 @@ +""" +Entity Resolution & Deduplication Engine +Arabic/English normalization + fuzzy company matching. +Prevents same company appearing twice under different names. +""" +import re +import unicodedata +from typing import List, Dict, Tuple, Optional +from difflib import SequenceMatcher + + +# Common Arabic/English company suffixes to strip +STRIP_SUFFIXES_AR = [ + r'\s*(شركة|مجموعة|مؤسسة|ش\.م\.م|ش\.م\.س|ذ\.م\.م|للخدمات|للتقنية|للمعلوماتية' + r'|السعودية|العربية|الخليجية|الدولية|التجارية|الحديثة|المتحدة|المتقدمة)\s*$' +] +STRIP_SUFFIXES_EN = [ + r'\s*(llc|ltd|co\.|co|inc\.|inc|corp\.|corp|group|holding|holdings|sa|plc' + r'|technologies|solutions|services|systems|international|global|company)\s*$' +] +ARABIC_ARTICLE = r'^(ال)' + +# Arabic → English character transliteration for matching +ARABIC_ROMAN_MAP = { + 'ا': 'a', 'أ': 'a', 'إ': 'a', 'آ': 'a', + 'ب': 'b', 'ت': 't', 'ث': 'th', 'ج': 'j', 'ح': 'h', 'خ': 'kh', + 'د': 'd', 'ذ': 'dh', 'ر': 'r', 'ز': 'z', 'س': 's', 'ش': 'sh', + 'ص': 's', 'ض': 'd', 'ط': 't', 'ظ': 'z', 'ع': 'a', 'غ': 'gh', + 'ف': 'f', 'ق': 'q', 'ك': 'k', 'ل': 'l', 'م': 'm', 'ن': 'n', + 'ه': 'h', 'و': 'w', 'ي': 'y', 'ى': 'a', 'ة': 'h', + 'ئ': 'y', 'ء': '', 'ؤ': 'w', +} + + +def transliterate_arabic(text: str) -> str: + """Convert Arabic script to approximate Latin for cross-script matching""" + return ''.join(ARABIC_ROMAN_MAP.get(c, c) for c in text) + + +def normalize_name(name: str) -> str: + """Canonical form for deduplication matching""" + if not name: + return "" + name = name.strip().lower() + # Strip Arabic article + name = re.sub(ARABIC_ARTICLE, '', name) + # Strip Arabic suffixes + for pattern in STRIP_SUFFIXES_AR: + name = re.sub(pattern, '', name, flags=re.IGNORECASE) + # Strip English suffixes + for pattern in STRIP_SUFFIXES_EN: + name = re.sub(pattern, '', name, flags=re.IGNORECASE) + # Normalize unicode + name = unicodedata.normalize('NFKC', name) + # Remove punctuation + name = re.sub(r'[^\w\s\u0600-\u06FF]', '', name) + name = re.sub(r'\s+', ' ', name).strip() + return name + + +def normalize_domain(domain: str) -> str: + """Strip www, https, subdomains for domain matching""" + domain = domain.lower().strip() + domain = re.sub(r'^https?://', '', domain) + domain = re.sub(r'^www\.', '', domain) + domain = re.sub(r'/.*$', '', domain) + return domain + + +def fuzzy_match_score(a: str, b: str) -> float: + """Similarity ratio between two strings 0-1""" + return SequenceMatcher(None, a, b).ratio() + + +def are_same_company( + name_a: str, domain_a: str, + name_b: str, domain_b: str, + threshold: float = 0.82 +) -> Tuple[bool, float, str]: + """ + Determine if two company records refer to the same entity. + Returns: (is_same, confidence, reason) + """ + # Domain match is definitive + if domain_a and domain_b: + d_a = normalize_domain(domain_a) + d_b = normalize_domain(domain_b) + if d_a == d_b and d_a: + return True, 1.0, "exact_domain_match" + + # Normalize names + norm_a = normalize_name(name_a) + norm_b = normalize_name(name_b) + + if not norm_a or not norm_b: + return False, 0.0, "insufficient_data" + + # Exact normalized match + if norm_a == norm_b: + return True, 0.98, "exact_name_match" + + # Fuzzy match on original names + ratio = fuzzy_match_score(norm_a, norm_b) + if ratio >= threshold: + return True, ratio, f"fuzzy_match_{ratio:.2f}" + + # Cross-script: transliterate Arabic and compare with English + translit_a = transliterate_arabic(norm_a) + translit_b = transliterate_arabic(norm_b) + cross_ratio = fuzzy_match_score(translit_a, norm_b) + if cross_ratio >= threshold: + return True, cross_ratio, f"cross_script_match_{cross_ratio:.2f}" + cross_ratio2 = fuzzy_match_score(norm_a, translit_b) + if cross_ratio2 >= threshold: + return True, cross_ratio2, f"cross_script_match_{cross_ratio2:.2f}" + + return False, max(ratio, cross_ratio), "no_match" + + +class EntityRegistry: + """ + Maintains a registry of known companies with deduplication. + Use resolve() to find or create a canonical entity. + """ + + def __init__(self): + self._entities: List[Dict] = [] # List of canonical entity records + self._domain_index: Dict[str, int] = {} # domain → entity index + self._name_index: Dict[str, int] = {} # normalized name → entity index + + def resolve(self, name: str, domain: str = "") -> Tuple[int, bool]: + """ + Find existing entity or create new one. + Returns: (entity_id, is_new) + """ + norm_name = normalize_name(name) + norm_domain = normalize_domain(domain) if domain else "" + + # Fast lookup by domain + if norm_domain and norm_domain in self._domain_index: + return self._domain_index[norm_domain], False + + # Fast lookup by exact name + if norm_name and norm_name in self._name_index: + return self._name_index[norm_name], False + + # Fuzzy scan + for idx, entity in enumerate(self._entities): + is_same, confidence, reason = are_same_company( + name, domain, + entity.get("canonical_name", ""), + entity.get("domain", ""), + ) + if is_same: + # Update entity with better data + if not entity.get("domain") and norm_domain: + entity["domain"] = norm_domain + self._domain_index[norm_domain] = idx + return idx, False + + # Create new entity + new_id = len(self._entities) + entity = { + "id": new_id, + "canonical_name": name, + "normalized_name": norm_name, + "domain": norm_domain, + "aliases": [], + } + self._entities.append(entity) + if norm_domain: + self._domain_index[norm_domain] = new_id + if norm_name: + self._name_index[norm_name] = new_id + + return new_id, True + + def deduplicate_lead_list(self, leads: List[Dict]) -> List[Dict]: + """ + Deduplicate a list of lead dicts. + Each lead must have 'company_name' and optionally 'domain'. + Returns deduplicated list with canonical names. + """ + seen = {} # entity_id → first lead index + deduped = [] + + for lead in leads: + name = lead.get("company_name", "") + domain = lead.get("domain", "") + entity_id, is_new = self.resolve(name, domain) + if is_new or entity_id not in seen: + seen[entity_id] = len(deduped) + lead["entity_id"] = entity_id + deduped.append(lead) + else: + # Merge: keep richer record + existing = deduped[seen[entity_id]] + for field in ["contact_email", "contact_phone", "contact_linkedin", + "description", "tech_stack", "signals"]: + if not existing.get(field) and lead.get(field): + existing[field] = lead[field] + # Merge signals list + if isinstance(existing.get("signals"), list) and isinstance(lead.get("signals"), list): + existing["signals"] = list(set(existing["signals"] + lead["signals"])) + + return deduped + + @property + def entity_count(self) -> int: + return len(self._entities) diff --git a/salesflow-saas/backend/app/intelligence/icp.py b/salesflow-saas/backend/app/intelligence/icp.py new file mode 100644 index 00000000..007efbc8 --- /dev/null +++ b/salesflow-saas/backend/app/intelligence/icp.py @@ -0,0 +1,87 @@ +""" +ICP Builder — Ideal Customer Profile Engine +Defines and stores ICP configs per org. Drives all discovery logic. +""" +from dataclasses import dataclass, field, asdict +from typing import List, Optional, Dict, Any +import json + + +@dataclass +class ICPConfig: + """Ideal Customer Profile — full definition per org""" + org_id: str + + # Company attributes + industries: List[str] = field(default_factory=list) # e.g. ["tech", "healthcare", "banking"] + company_sizes: List[str] = field(default_factory=list) # e.g. ["50-200", "200-1000"] + regions: List[str] = field(default_factory=list) # e.g. ["Riyadh", "Jeddah", "KSA"] + revenue_range_sar: Dict[str, float] = field(default_factory=dict) # {"min": 1000000, "max": 50000000} + tech_signals: List[str] = field(default_factory=list) # e.g. ["Salesforce", "SAP", "HubSpot"] + growth_signals: List[str] = field(default_factory=list) # e.g. ["hiring", "funding", "expansion"] + languages: List[str] = field(default_factory=list) # e.g. ["ar", "en"] + + # Person attributes (buying committee) + target_titles_ar: List[str] = field(default_factory=list) # Arabic titles + target_titles_en: List[str] = field(default_factory=list) # English titles + seniority_levels: List[str] = field(default_factory=list) # e.g. ["C-level", "VP", "Director"] + + # Opportunity type + motion: str = "sales" # sales | partnership | channel | tender + segment: str = "B2B" # B2B | B2C | B2T + + # Scoring weights (must sum to 1.0) + fit_weight: float = 0.30 + intent_weight: float = 0.25 + access_weight: float = 0.15 + value_weight: float = 0.20 + urgency_weight: float = 0.10 + + # Discovery sources + discovery_sources: List[str] = field(default_factory=lambda: [ + "web_search", "linkedin_public", "news", "job_boards", "directories" + ]) + + def to_dict(self) -> Dict[str, Any]: + return asdict(self) + + def build_search_queries(self) -> List[str]: + """Auto-generate search queries from ICP attributes — Arabic + English""" + queries = [] + for industry in self.industries[:3]: + for region in self.regions[:2]: + queries.append(f"شركات {industry} في {region}") + queries.append(f"{industry} companies in {region} Saudi Arabia") + for signal in self.growth_signals[:2]: + for industry in self.industries[:2]: + queries.append(f"{industry} {signal} Saudi Arabia 2025 2026") + for title in self.target_titles_ar[:2]: + for industry in self.industries[:2]: + queries.append(f"{title} {industry} السعودية") + for title in self.target_titles_en[:2]: + for industry in self.industries[:2]: + queries.append(f"{title} {industry} Saudi Arabia LinkedIn") + return queries[:20] # cap at 20 queries + + +# Default Dealix ICP — B2B SaaS / Enterprise, Saudi-first +DEALIX_DEFAULT_ICP = ICPConfig( + org_id="dealix", + industries=["تقنية", "رعاية صحية", "مالية وبنوك", "عقارات", "تصنيع", "تجزئة", "لوجستيات", + "technology", "healthcare", "banking", "real estate", "manufacturing", "retail"], + company_sizes=["10-50", "50-200", "200-1000", "1000+"], + regions=["الرياض", "جدة", "الدمام", "المنطقة الشرقية", "Riyadh", "Jeddah", "Dammam", "KSA"], + revenue_range_sar={"min": 500_000, "max": 500_000_000}, + tech_signals=["Salesforce", "SAP", "Oracle", "HubSpot", "Zoho", "Microsoft Dynamics", "Excel", "WhatsApp Business"], + growth_signals=["hiring", "expansion", "funding", "partnership", "IPO", "digital transformation", + "توظيف", "توسع", "تمويل", "شراكة", "تحول رقمي"], + languages=["ar", "en"], + target_titles_ar=["مدير تطوير الأعمال", "مدير المبيعات", "الرئيس التنفيذي", "المدير التجاري", + "مدير الشراكات", "مدير التسويق", "مدير المشتريات", "نائب الرئيس"], + target_titles_en=["CEO", "CCO", "VP Sales", "Head of Business Development", "Commercial Director", + "Chief Revenue Officer", "Sales Director", "Partnerships Manager"], + seniority_levels=["C-level", "VP", "Director", "Head of", "Manager"], + motion="sales", + segment="B2B", + discovery_sources=["web_search", "news", "job_boards", "directories", "linkedin_public"], +) diff --git a/salesflow-saas/backend/app/intelligence/outreach.py b/salesflow-saas/backend/app/intelligence/outreach.py new file mode 100644 index 00000000..1a7fafa3 --- /dev/null +++ b/salesflow-saas/backend/app/intelligence/outreach.py @@ -0,0 +1,294 @@ +""" +Outreach Brief Generator — Arabic-first +Generates personalized outreach messages for B2B, B2C, B2T motions. +Templates are rule-based with signal-driven personalization. +Plugs into LLM when OPENAI_API_KEY is set. +""" +import os +import json +import urllib.request +import urllib.error +from typing import Dict, Any, Optional, List +from dataclasses import dataclass + + +@dataclass +class OutreachBrief: + lead_id: str + company_name: str + contact_name: str + contact_title: str + motion: str # sales | partnership | channel | tender + + # Arabic messages + whatsapp_ar: str = "" + email_subject_ar: str = "" + email_body_ar: str = "" + linkedin_ar: str = "" + + # English fallback + email_subject_en: str = "" + email_body_en: str = "" + + # Strategy + angle: str = "" # the specific hook being used + pain_hypothesis: str = "" # what problem we assume they have + value_proposition: str = "" + call_to_action: str = "" + + # Metadata + personalization_score: int = 0 # 0-100 how personalized this is + generated_by: str = "template" # template | llm + + +# Signal → angle mapping +SIGNAL_ANGLES = { + "hiring": { + "angle": "توسع الفريق", + "hook_ar": "لاحظنا أنكم تُوسّعون فريقكم — هذا المرحلة تحتاج منظومة مبيعات قوية", + "hook_en": "Noticed you're scaling your team — this is exactly when a strong sales OS matters", + }, + "funding": { + "angle": "تمويل جديد", + "hook_ar": "تهانينا على جولة التمويل — الشركات بعد التمويل تبني محرك إيرادات سريع", + "hook_en": "Congrats on the funding — post-investment is when you need to build your revenue engine fast", + }, + "expansion": { + "angle": "توسع جغرافي", + "hook_ar": "رأينا توسعكم في السوق — دعونا نساعدكم تُحوّل هذا التوسع لعقود حقيقية", + "hook_en": "Saw your expansion news — let us help you convert that market entry into real contracts", + }, + "digital_transformation": { + "angle": "تحول رقمي", + "hook_ar": "مبادرات التحول الرقمي تحتاج محرك مبيعات ذكي يواكبها", + "hook_en": "Digital transformation initiatives need an intelligent sales engine to match", + }, + "ipo": { + "angle": "استعداد للطرح العام", + "hook_ar": "الاستعداد للطرح العام يتطلب منظومة إيرادات موثوقة وقابلة للتدقيق", + "hook_en": "IPO readiness demands a verifiable and auditable revenue system", + }, + "pain_point_crm": { + "angle": "إدارة علاقات العملاء", + "hook_ar": "إدارة العملاء بالإكسل في 2026 تُكلّف الشركة عقوداً ضائعة", + "hook_en": "Managing clients in spreadsheets in 2026 costs you real contracts", + }, + "pain_point_outreach": { + "angle": "التواصل مع العملاء", + "hook_ar": "فرق المبيعات اليوم تحتاج أدوات ذكية تُولّد ليدز وتُغلق صفقات تلقائياً", + "hook_en": "Sales teams today need AI tools that generate leads and close deals automatically", + }, +} + +# Motion-specific value propositions +MOTION_VALUE_PROPS = { + "sales": { + "ar": "Dealix يُحوّل فريق مبيعاتكم إلى ماكينة إيرادات ذاتية بـ 9 أنظمة تشغيل مدمجة", + "en": "Dealix turns your sales team into a self-driving revenue machine with 9 integrated operating systems", + }, + "partnership": { + "ar": "Dealix يبني منظومة شراكة تُدير التحالفات والحوافز والإيرادات من مكان واحد", + "en": "Dealix builds a partnership ecosystem that manages alliances, incentives and revenues in one place", + }, + "channel": { + "ar": "برنامج الشركاء في Dealix يمنح موزّعيكم أدوات المبيعات الاحترافية بدون تكلفة إضافية", + "en": "Dealix partner program gives your resellers professional sales tools at no extra cost", + }, + "tender": { + "ar": "Dealix يُساعد في بناء ملف المؤهلات الكامل وتتبع الفرص الحكومية والتجارية", + "en": "Dealix helps build full qualification packages and track government and commercial opportunities", + }, +} + +CTA_BY_TIER = { + "P1": { + "ar": "هل لديكم 15 دقيقة هذا الأسبوع لعرض سريع؟", + "en": "Do you have 15 minutes this week for a quick demo?", + }, + "P2": { + "ar": "أودّ إرسال لكم ملف موجز يوضح كيف تستفيد شركات مثلكم من Dealix", + "en": "I'd like to send you a brief overview of how companies like yours benefit from Dealix", + }, + "P3": { + "ar": "سأُبقيكم على اطلاع بتحديثات Dealix — هل موافقون؟", + "en": "I'll keep you updated on Dealix — would that be okay?", + }, + "P4": { + "ar": "تواصل معنا عند الجاهزية", + "en": "Reach out when the time is right", + }, +} + + +def pick_angle(signals: List[str]) -> Dict: + """Pick the best outreach angle based on available signals""" + priority_order = ["funding", "ipo", "expansion", "hiring", "digital_transformation", + "pain_point_crm", "pain_point_outreach"] + for sig in priority_order: + if sig in signals: + return SIGNAL_ANGLES[sig] + return { + "angle": "تطوير الأعمال", + "hook_ar": "نساعد الشركات الرائدة في السعودية على بناء محرك إيرادات ذكي", + "hook_en": "We help leading Saudi companies build an intelligent revenue engine", + } + + +def build_whatsapp_message( + company: str, contact: str, angle_data: Dict, motion: str, tier: str +) -> str: + """Build a short WhatsApp-optimized Arabic message""" + hook = angle_data.get("hook_ar", "") + vp = MOTION_VALUE_PROPS.get(motion, MOTION_VALUE_PROPS["sales"])["ar"] + cta = CTA_BY_TIER.get(tier, CTA_BY_TIER["P3"])["ar"] + + contact_greeting = f"مرحباً {contact}" if contact else f"مرحباً فريق {company}" + + return f"""{contact_greeting}، + +{hook}. + +{vp}. + +{cta} + +— فريق Dealix""" + + +def build_email( + company: str, contact: str, title: str, + angle_data: Dict, motion: str, tier: str, + signals: List[str] +) -> Dict[str, str]: + """Build email subject and body in Arabic + English""" + hook_ar = angle_data.get("hook_ar", "") + hook_en = angle_data.get("hook_en", "") + vp_ar = MOTION_VALUE_PROPS.get(motion, MOTION_VALUE_PROPS["sales"])["ar"] + vp_en = MOTION_VALUE_PROPS.get(motion, MOTION_VALUE_PROPS["sales"])["en"] + cta_ar = CTA_BY_TIER.get(tier, CTA_BY_TIER["P3"])["ar"] + cta_en = CTA_BY_TIER.get(tier, CTA_BY_TIER["P3"])["en"] + + contact_ar = f"{contact}" if contact else f"فريق {company}" + title_mention_ar = f" | {title}" if title else "" + title_mention_en = f", {title}" if title else "" + + subject_ar = f"Dealix × {company} — {angle_data.get('angle', 'فرصة تعاون')}" + subject_en = f"Dealix × {company} — {angle_data.get('angle', 'Partnership Opportunity')}" + + body_ar = f"""مرحباً {contact_ar}{title_mention_ar}، + +{hook_ar}. + +{vp_ar}. + +نحن نعمل مع شركات في قطاعكم ونرى نتائج واضحة: +• زيادة في معدل إغلاق الصفقات +• تقليل وقت دورة المبيعات +• رؤية كاملة للـ pipeline التنفيذي + +{cta_ar} + +مع التقدير، +فريق Dealix +https://dealix.ai""" + + body_en = f"""Hi {contact or 'there'}{title_mention_en}, + +{hook_en}. + +{vp_en}. + +We work with companies in your sector and see clear results: +• Higher deal close rates +• Shorter sales cycle time +• Full executive pipeline visibility + +{cta_en} + +Best regards, +The Dealix Team +https://dealix.ai""" + + return { + "subject_ar": subject_ar, + "body_ar": body_ar, + "subject_en": subject_en, + "body_en": body_en, + } + + +def build_linkedin_message( + company: str, contact: str, angle_data: Dict, motion: str +) -> str: + """LinkedIn connection message — short and professional (300 chars)""" + hook = angle_data.get("hook_ar", "نساعد الشركات على بناء محرك إيرادات ذكي") + return f"مرحباً {contact or 'colleague'}، {hook}. نعمل مع شركات مثل {company} لبناء منظومة مبيعات ذكية. أودّ التواصل معكم."[:300] + + +def generate_outreach_brief( + lead_dict: Dict, + score_dict: Dict, + motion: str = "sales" +) -> OutreachBrief: + """ + Generate a full outreach brief for a scored lead. + lead_dict: from EnrichedLead.to_dict() + score_dict: from score_lead() + """ + company = lead_dict.get("company_name", "") + contact = lead_dict.get("contact_name", "") + title = lead_dict.get("contact_title", "") + signals = lead_dict.get("signals", []) + tier = score_dict.get("tier", "P3") + + angle_data = pick_angle(signals) + email_data = build_email(company, contact, title, angle_data, motion, tier, signals) + + personalization = 30 + if signals: personalization += 30 + if contact: personalization += 20 + if title: personalization += 10 + if lead_dict.get("recent_news"): personalization += 10 + + brief = OutreachBrief( + lead_id=lead_dict.get("id", ""), + company_name=company, + contact_name=contact, + contact_title=title, + motion=motion, + whatsapp_ar=build_whatsapp_message(company, contact, angle_data, motion, tier), + email_subject_ar=email_data["subject_ar"], + email_body_ar=email_data["body_ar"], + email_subject_en=email_data["subject_en"], + email_body_en=email_data["body_en"], + linkedin_ar=build_linkedin_message(company, contact, angle_data, motion), + angle=angle_data.get("angle", ""), + pain_hypothesis=angle_data.get("hook_ar", ""), + value_proposition=MOTION_VALUE_PROPS.get(motion, MOTION_VALUE_PROPS["sales"])["ar"], + call_to_action=CTA_BY_TIER.get(tier, CTA_BY_TIER["P3"])["ar"], + personalization_score=min(100, personalization), + generated_by="template", + ) + return brief + + +def generate_batch_briefs( + scored_leads: List[Dict], motion: str = "sales" +) -> List[Dict]: + """Generate outreach briefs for a list of scored leads (P1+P2 only by default)""" + briefs = [] + for item in scored_leads: + tier = item.get("score", {}).get("tier", "P4") + if tier in ("P1", "P2"): # Only generate for actionable leads + brief = generate_outreach_brief(item["lead"], item["score"], motion) + briefs.append({ + "company": brief.company_name, + "tier": tier, + "angle": brief.angle, + "whatsapp_ar": brief.whatsapp_ar, + "email_subject_ar": brief.email_subject_ar, + "email_body_ar": brief.email_body_ar, + "linkedin_ar": brief.linkedin_ar, + "personalization_score": brief.personalization_score, + }) + return briefs diff --git a/salesflow-saas/backend/app/intelligence/pipeline.py b/salesflow-saas/backend/app/intelligence/pipeline.py new file mode 100644 index 00000000..6f962702 --- /dev/null +++ b/salesflow-saas/backend/app/intelligence/pipeline.py @@ -0,0 +1,158 @@ +""" +Lead Intelligence Pipeline — End-to-end orchestrator +ICP → Discovery → Enrichment → Entity Resolution → Scoring → Outreach Brief +One call drives the full flow. +""" +import time +import json +from typing import Dict, Any, List, Optional +from dataclasses import asdict + +from app.intelligence.icp import ICPConfig, DEALIX_DEFAULT_ICP +from app.intelligence.discovery import LeadDiscoveryEngine +from app.intelligence.enrichment import enrich_batch +from app.intelligence.scoring import score_batch +from app.intelligence.entity_resolution import EntityRegistry +from app.intelligence.outreach import generate_batch_briefs + + +def run_pipeline( + icp: Optional[ICPConfig] = None, + custom_queries: Optional[List[str]] = None, + motion: str = "sales", + max_leads: int = 30, + enrich: bool = True, + generate_outreach: bool = True, + score_weights: Optional[Dict[str, float]] = None, +) -> Dict[str, Any]: + """ + Full lead intelligence pipeline. + + Returns: + { + "run_id": str, + "icp_used": dict, + "total_discovered": int, + "total_after_dedup": int, + "total_enriched": int, + "scored_leads": [...], # all leads sorted by score + "p1_leads": [...], # outreach now + "p2_leads": [...], # enrich more + "p3_leads": [...], # nurture + "outreach_briefs": [...], # generated briefs for P1+P2 + "tier_summary": {...}, + "pipeline_duration_sec": float, + "errors": [...], + } + """ + start_time = time.time() + run_id = f"pipeline_{int(start_time)}" + errors = [] + + # 1. Resolve ICP + icp = icp or DEALIX_DEFAULT_ICP + + # 2. Discovery + engine = LeadDiscoveryEngine(icp=icp) + if custom_queries: + candidates = engine.discover(custom_queries, max_per_query=6) + else: + candidates = engine.discover_from_icp(icp=icp, max_per_query=5) + + total_discovered = len(candidates) + + # 3. Entity Resolution + Dedup + registry = EntityRegistry() + raw_lead_dicts = [ + { + "id": c.id, + "company_name": c.company_name, + "domain": c.domain, + "source": c.source, + "source_url": c.source_url, + "raw_snippet": c.raw_snippet, + "signals": c.signals, + "trigger": c.trigger, + "contact_email": c.contact_email, + "contact_phone": c.phone, + "contact_linkedin": c.contact_linkedin, + "confidence": c.confidence, + "_candidate": c, + } + for c in candidates + ] + deduped_dicts = registry.deduplicate_lead_list(raw_lead_dicts) + deduped_candidates = [d["_candidate"] for d in deduped_dicts[:max_leads]] + + total_after_dedup = len(deduped_candidates) + + # 4. Enrichment + enriched_leads = [] + if enrich: + enriched_leads = enrich_batch(deduped_candidates, delay=0.2) + else: + # Skip enrichment — use candidates as-is + from app.intelligence.enrichment import EnrichedLead + for c in deduped_candidates: + e = EnrichedLead( + id=c.id, + company_name=c.company_name, + domain=c.domain, + industry=c.industry, + region=c.region, + website=f"https://{c.domain}" if c.domain else "", + signals=c.signals, + source=c.source, + source_url=c.source_url, + raw_snippet=c.raw_snippet, + trigger=c.trigger, + contact_email=c.contact_email, + contact_phone=c.phone, + enrichment_confidence=c.confidence, + ) + enriched_leads.append(e) + + total_enriched = len(enriched_leads) + + # 5. Scoring + scored = score_batch(enriched_leads, weights=score_weights) + + # 6. Tier breakdown + tier_counts = {"P1": 0, "P2": 0, "P3": 0, "P4": 0} + p1, p2, p3, p4 = [], [], [], [] + for item in scored: + tier = item["score"]["tier"] + tier_counts[tier] += 1 + if tier == "P1": p1.append(item) + elif tier == "P2": p2.append(item) + elif tier == "P3": p3.append(item) + else: p4.append(item) + + # 7. Outreach briefs + outreach_briefs = [] + if generate_outreach: + outreach_briefs = generate_batch_briefs(scored, motion=motion) + + duration = round(time.time() - start_time, 2) + + return { + "run_id": run_id, + "icp_used": icp.to_dict() if hasattr(icp, 'to_dict') else {}, + "total_discovered": total_discovered, + "total_after_dedup": total_after_dedup, + "total_enriched": total_enriched, + "scored_leads": scored, + "p1_leads": p1, + "p2_leads": p2, + "p3_leads": p3, + "p4_leads": p4, + "outreach_briefs": outreach_briefs, + "tier_summary": { + "P1_outreach_now": tier_counts["P1"], + "P2_enrich_more": tier_counts["P2"], + "P3_nurture": tier_counts["P3"], + "P4_archive": tier_counts["P4"], + }, + "pipeline_duration_sec": duration, + "errors": errors, + } diff --git a/salesflow-saas/backend/app/intelligence/scoring.py b/salesflow-saas/backend/app/intelligence/scoring.py new file mode 100644 index 00000000..0e371de1 --- /dev/null +++ b/salesflow-saas/backend/app/intelligence/scoring.py @@ -0,0 +1,324 @@ +""" +5-Dimension Lead Scoring Engine +Fit | Intent | Access | Value | Urgency + +Master Priority Score = weighted sum → P1/P2/P3/P4 tier +Each dimension returns 0-100. Final score 0-100. +""" +from typing import Dict, Any, Tuple, List +from dataclasses import dataclass + + +@dataclass +class ScoreBreakdown: + fit_score: int = 0 # Is this company our ICP? + intent_score: int = 0 # Are they showing buying signals? + access_score: int = 0 # Can we reach the right person? + value_score: int = 0 # What's the potential deal value? + urgency_score: int = 0 # Is now the right moment? + master_score: int = 0 # Weighted composite + priority_tier: str = "P4" # P1 | P2 | P3 | P4 + priority_label_ar: str = "أرشيف" + score_reasons: List[str] = None + next_action: str = "" + next_action_ar: str = "" + + def __post_init__(self): + if self.score_reasons is None: + self.score_reasons = [] + + +# Signal → intent score contribution +INTENT_SIGNAL_WEIGHTS = { + "hiring": 25, + "expansion": 20, + "funding": 30, + "digital_transformation": 20, + "partnership": 15, + "ipo": 35, + "new_product": 10, + "pain_point_crm": 25, + "pain_point_outreach": 20, + "regulation": 15, +} + +# Industry → fit contribution (for Dealix ICP) +INDUSTRY_FIT = { + "technology": 100, "tech": 100, "تقنية": 100, "software": 95, "saas": 95, + "banking": 90, "financial": 90, "مالية": 90, "بنوك": 90, "fintech": 95, + "healthcare": 85, "رعاية صحية": 85, "hospital": 80, + "real estate": 80, "عقارات": 80, + "manufacturing": 75, "تصنيع": 75, "industrial": 70, + "retail": 70, "تجزئة": 70, "e-commerce": 80, + "logistics": 75, "لوجستيات": 75, "supply chain": 75, + "education": 65, "تعليم": 65, + "government": 60, "حكومة": 60, + "media": 60, "إعلام": 60, +} + +# Company size → value score +SIZE_VALUE = { + "1-50": 30, + "50-200": 55, + "200-1000": 80, + "1000+": 100, + "unknown": 40, +} + +# Seniority → access score +SENIORITY_ACCESS = { + range(90, 101): 100, # C-level + range(80, 90): 85, # VP + range(70, 80): 70, # Director + range(55, 70): 55, # Manager + range(0, 55): 30, # Individual contributor +} + +PRIORITY_THRESHOLDS = { + "P1": 70, # Outreach now + "P2": 50, # Enrich more + "P3": 35, # Nurture +} + +PRIORITY_LABELS_AR = { + "P1": "وصول فوري", + "P2": "إثراء إضافي", + "P3": "تغذية ورعاية", + "P4": "قائمة انتظار", +} + +NEXT_ACTIONS = { + "P1": ("Send personalized outreach — high-priority lead", "أرسل رسالة مخصصة — ليد أولوية عالية"), + "P2": ("Enrich contact data, find decision maker", "أثرِ بيانات الاتصال وحدد صانع القرار"), + "P3": ("Add to nurture sequence, monitor signals", "أضف إلى تسلسل التغذية وراقب الإشارات"), + "P4": ("Archive and watch for trigger", "أرشف وراقب الإشارات المستقبلية"), +} + + +def get_seniority_access_score(decision_maker_score: int) -> int: + for r, score in SENIORITY_ACCESS.items(): + if decision_maker_score in r: + return score + return 30 + + +def score_fit(enriched_lead) -> Tuple[int, List[str]]: + """Score how well this company matches ICP""" + reasons = [] + score = 0 + + # Industry fit + industry = (enriched_lead.industry or enriched_lead.raw_snippet or "").lower() + best_industry_score = 0 + for kw, val in INDUSTRY_FIT.items(): + if kw in industry: + best_industry_score = max(best_industry_score, val) + if best_industry_score > 0: + score += best_industry_score * 0.5 + reasons.append(f"Industry match: {best_industry_score}%") + + # Company size fit + size_score = SIZE_VALUE.get(enriched_lead.company_size, 40) + score += size_score * 0.3 + if enriched_lead.company_size != "unknown": + reasons.append(f"Size '{enriched_lead.company_size}': {size_score}%") + + # Has website / domain + if enriched_lead.domain: + score += 8 + reasons.append("Has domain") + + # Saudi / Gulf region + text = f"{enriched_lead.headquarters} {enriched_lead.region} {enriched_lead.raw_snippet}".lower() + if any(kw in text for kw in ["saudi", "ksa", "السعودية", "الرياض", "riyadh", "جدة", "jeddah", "الخليج", "gulf"]): + score += 12 + reasons.append("Saudi/Gulf region") + + return min(100, int(score)), reasons + + +def score_intent(enriched_lead) -> Tuple[int, List[str]]: + """Score buying intent based on signals""" + reasons = [] + score = 0 + + for signal in enriched_lead.signals: + contribution = INTENT_SIGNAL_WEIGHTS.get(signal, 5) + score += contribution + reasons.append(f"Signal '{signal}': +{contribution}") + + # Recent news adds intent + if enriched_lead.recent_news: + score += min(20, len(enriched_lead.recent_news) * 5) + reasons.append(f"{len(enriched_lead.recent_news)} recent news items") + + # Pain point keywords in snippet + text = (enriched_lead.raw_snippet or "").lower() + pain_keywords = ["struggling", "challenge", "problem", "need", "looking for", + "تحدي", "مشكلة", "نحتاج", "نبحث عن"] + if any(kw in text for kw in pain_keywords): + score += 15 + reasons.append("Pain point language detected") + + return min(100, score), reasons + + +def score_access(enriched_lead) -> Tuple[int, List[str]]: + """Score reachability — can we actually contact the right person?""" + reasons = [] + score = 0 + + if enriched_lead.contact_email: + score += 40 + reasons.append("Has email") + if enriched_lead.contact_phone: + score += 20 + reasons.append("Has phone") + if enriched_lead.contact_linkedin: + score += 25 + reasons.append("Has LinkedIn profile") + if enriched_lead.domain: + score += 20 + reasons.append("Has domain (email inferable)") + + # Decision maker seniority + seniority_score = get_seniority_access_score(enriched_lead.decision_maker_score) + score = int(score * 0.6 + seniority_score * 0.4) + if enriched_lead.contact_title: + reasons.append(f"Title seniority: {seniority_score}%") + + return min(100, score), reasons + + +def score_value(enriched_lead) -> Tuple[int, List[str]]: + """Estimate potential deal value""" + reasons = [] + + # Company size as proxy for revenue potential + size_score = SIZE_VALUE.get(enriched_lead.company_size, 40) + + # Revenue estimate + rev = enriched_lead.annual_revenue_sar + if rev > 100_000_000: + size_score = max(size_score, 100) + reasons.append(f"Revenue >100M SAR") + elif rev > 10_000_000: + size_score = max(size_score, 80) + reasons.append(f"Revenue >10M SAR") + elif rev > 0: + reasons.append(f"Revenue data available") + + # Tech stack indicates budget + if len(enriched_lead.tech_stack) >= 3: + size_score = min(100, size_score + 10) + reasons.append(f"Rich tech stack ({len(enriched_lead.tech_stack)} tools)") + + reasons.append(f"Size-based value score: {size_score}%") + return min(100, size_score), reasons + + +def score_urgency(enriched_lead) -> Tuple[int, List[str]]: + """Score how urgent the timing is""" + reasons = [] + score = 0 + + # Time-sensitive signals + urgent_signals = {"funding": 40, "ipo": 50, "expansion": 30, "hiring": 20, "new_product": 25} + for sig in enriched_lead.signals: + if sig in urgent_signals: + score += urgent_signals[sig] + reasons.append(f"Urgent signal '{sig}': +{urgent_signals[sig]}") + + # Fresh news + if len(enriched_lead.recent_news) >= 2: + score += 15 + reasons.append("Multiple recent news items") + + # If just discovered (high source confidence) + if enriched_lead.enrichment_confidence >= 0.7: + score += 10 + reasons.append("High confidence data") + + return min(100, score), reasons + + +def score_lead(enriched_lead, weights: Dict[str, float] = None) -> ScoreBreakdown: + """ + Compute full 5-dimension score for an enriched lead. + Returns ScoreBreakdown with tier and next action. + """ + if weights is None: + weights = {"fit": 0.30, "intent": 0.25, "access": 0.15, "value": 0.20, "urgency": 0.10} + + fit, fit_reasons = score_fit(enriched_lead) + intent, intent_reasons = score_intent(enriched_lead) + access, access_reasons = score_access(enriched_lead) + value, value_reasons = score_value(enriched_lead) + urgency, urgency_reasons = score_urgency(enriched_lead) + + master = int( + fit * weights["fit"] + + intent * weights["intent"] + + access * weights["access"] + + value * weights["value"] + + urgency * weights["urgency"] + ) + + if master >= PRIORITY_THRESHOLDS["P1"]: + tier = "P1" + elif master >= PRIORITY_THRESHOLDS["P2"]: + tier = "P2" + elif master >= PRIORITY_THRESHOLDS["P3"]: + tier = "P3" + else: + tier = "P4" + + all_reasons = ( + [f"[Fit {fit}]"] + fit_reasons[:2] + + [f"[Intent {intent}]"] + intent_reasons[:2] + + [f"[Access {access}]"] + access_reasons[:2] + + [f"[Value {value}]"] + value_reasons[:1] + + [f"[Urgency {urgency}]"] + urgency_reasons[:1] + ) + + en_action, ar_action = NEXT_ACTIONS[tier] + + return ScoreBreakdown( + fit_score=fit, + intent_score=intent, + access_score=access, + value_score=value, + urgency_score=urgency, + master_score=master, + priority_tier=tier, + priority_label_ar=PRIORITY_LABELS_AR[tier], + score_reasons=all_reasons, + next_action=en_action, + next_action_ar=ar_action, + ) + + +def score_batch(enriched_leads: List, weights: Dict[str, float] = None) -> List[Dict]: + """Score a batch of enriched leads and return sorted results""" + results = [] + for lead in enriched_leads: + breakdown = score_lead(lead, weights) + results.append({ + "lead": lead.to_dict(), + "score": { + "fit": breakdown.fit_score, + "intent": breakdown.intent_score, + "access": breakdown.access_score, + "value": breakdown.value_score, + "urgency": breakdown.urgency_score, + "master": breakdown.master_score, + "tier": breakdown.priority_tier, + "tier_label_ar": breakdown.priority_label_ar, + "reasons": breakdown.score_reasons, + "next_action": breakdown.next_action, + "next_action_ar": breakdown.next_action_ar, + } + }) + # Sort by master score descending + results.sort(key=lambda x: x["score"]["master"], reverse=True) + return results diff --git a/salesflow-saas/backend/app/intelligence/triggers.py b/salesflow-saas/backend/app/intelligence/triggers.py new file mode 100644 index 00000000..91532f4e --- /dev/null +++ b/salesflow-saas/backend/app/intelligence/triggers.py @@ -0,0 +1,183 @@ +""" +Trigger Alert System — Real-time intent signal detection +Monitors: job postings, news, funding, expansion, partnerships, regulatory changes. +Runs as background scan and emits trigger events per lead/company. +""" +import re +import time +import json +import urllib.request +import urllib.parse +from typing import List, Dict, Any, Optional +from dataclasses import dataclass, field + + +@dataclass +class TriggerEvent: + """A detected trigger event for a company""" + company_name: str + trigger_type: str # hiring | funding | expansion | ipo | partnership | regulation | news + trigger_label_ar: str + signal_strength: int # 0-100 + evidence: str # snippet or description + source_url: str + detected_at: str + recommended_action_ar: str + recommended_action_en: str + + +TRIGGER_DEFINITIONS = { + "hiring": { + "label_ar": "توظيف نشط", + "queries": ["{company} hiring 2025", "{company} وظائف 2025", "{company} jobs"], + "keywords": ["hiring", "join our team", "we're looking", "وظائف", "نوظف", "فرص عمل"], + "strength": 60, + "action_ar": "اتصل الآن — الشركة توسّع فريقها وستحتاج منظومة مبيعات", + "action_en": "Reach out now — they're scaling and will need a sales OS", + }, + "funding": { + "label_ar": "تمويل جديد", + "queries": ["{company} funding 2025", "{company} investment raised", "{company} تمويل"], + "keywords": ["raised", "funding", "series", "investment", "تمويل", "استثمار", "جولة"], + "strength": 90, + "action_ar": "أولوية قصوى — اتصل خلال 48 ساعة من التمويل", + "action_en": "Top priority — contact within 48 hours of funding", + }, + "expansion": { + "label_ar": "توسع جديد", + "queries": ["{company} expansion 2025", "{company} new office", "{company} توسع"], + "keywords": ["expansion", "new market", "new office", "opens", "توسع", "افتتاح", "سوق جديد"], + "strength": 75, + "action_ar": "تواصل حول كيفية دعم توسعهم بمنظومة إيرادات", + "action_en": "Reach out about supporting their expansion with a revenue system", + }, + "partnership": { + "label_ar": "شراكة جديدة", + "queries": ["{company} partnership 2025", "{company} شراكة"], + "keywords": ["partnership", "collaboration", "alliance", "شراكة", "تعاون", "تحالف"], + "strength": 55, + "action_ar": "استفسر عن فرص الشراكة الاستراتيجية", + "action_en": "Inquire about strategic partnership opportunities", + }, + "ipo": { + "label_ar": "استعداد للطرح العام", + "queries": ["{company} IPO 2025 2026", "{company} اكتتاب طرح عام"], + "keywords": ["ipo", "initial public offering", "طرح عام", "اكتتاب", "تداول"], + "strength": 95, + "action_ar": "طوارئ — الطرح العام يستلزم منظومة إيرادات موثوقة وقابلة للتدقيق", + "action_en": "Emergency priority — IPO demands auditable, reliable revenue infrastructure", + }, + "digital_transformation": { + "label_ar": "تحول رقمي", + "queries": ["{company} digital transformation", "{company} تحول رقمي", "{company} digitization"], + "keywords": ["digital transformation", "digitization", "modernization", "تحول رقمي", "رقمنة"], + "strength": 65, + "action_ar": "اعرض كيف Dealix يُكمّل مبادرة التحول الرقمي لديهم", + "action_en": "Show how Dealix completes their digital transformation initiative", + }, + "regulation": { + "label_ar": "تغيير تنظيمي", + "queries": ["{company} PDPL ZATCA compliance 2025", "{company} حوكمة ضريبة"], + "keywords": ["pdpl", "zatca", "compliance", "regulation", "حوكمة", "امتثال", "ضريبة"], + "strength": 50, + "action_ar": "ناقش كيف Dealix يُساعد على الامتثال التنظيمي", + "action_en": "Discuss how Dealix supports regulatory compliance", + }, +} + + +def search_triggers_for_company(company_name: str, trigger_type: str) -> List[Dict]: + """Search for trigger signals for a specific company""" + definition = TRIGGER_DEFINITIONS.get(trigger_type, {}) + queries = definition.get("queries", []) + keywords = definition.get("keywords", []) + results = [] + + for query_template in queries[:2]: # Limit queries per trigger + query = query_template.replace("{company}", company_name) + try: + encoded = urllib.parse.quote(query) + url = f"https://html.duckduckgo.com/html/?q={encoded}" + req = urllib.request.Request( + url, + headers={"User-Agent": "Mozilla/5.0 (compatible; DealixBot/1.0)"} + ) + with urllib.request.urlopen(req, timeout=6) as resp: + html = resp.read().decode('utf-8', errors='ignore') + + snippets = re.findall(r']*>(.*?)', html) + urls = re.findall(r']+>', '', snippet).strip().lower() + if any(kw in clean_snippet for kw in keywords): + results.append({ + "snippet": re.sub(r'<[^>]+>', '', snippet).strip(), + "url": urls[i] if i < len(urls) else "", + "query": query, + }) + except Exception: + pass + time.sleep(0.3) + + return results + + +def scan_company_for_triggers(company_name: str) -> List[TriggerEvent]: + """Scan all trigger types for a given company""" + events = [] + now = time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()) + + for trigger_type, definition in TRIGGER_DEFINITIONS.items(): + results = search_triggers_for_company(company_name, trigger_type) + if results: + best = results[0] + event = TriggerEvent( + company_name=company_name, + trigger_type=trigger_type, + trigger_label_ar=definition["label_ar"], + signal_strength=definition["strength"], + evidence=best["snippet"][:500], + source_url=best["url"][:300], + detected_at=now, + recommended_action_ar=definition["action_ar"], + recommended_action_en=definition["action_en"], + ) + events.append(event) + + return events + + +def scan_watchlist(company_names: List[str], delay: float = 1.0) -> Dict[str, List[Dict]]: + """ + Scan a watchlist of companies for all trigger types. + Returns dict: {company_name: [trigger_event_dicts]} + """ + all_triggers = {} + + for company in company_names: + events = scan_company_for_triggers(company) + if events: + all_triggers[company] = [ + { + "type": e.trigger_type, + "label_ar": e.trigger_label_ar, + "strength": e.signal_strength, + "evidence": e.evidence, + "url": e.source_url, + "detected_at": e.detected_at, + "action_ar": e.recommended_action_ar, + "action_en": e.recommended_action_en, + } + for e in events + ] + time.sleep(delay) + + return all_triggers + + +def get_strongest_trigger(events: List[Dict]) -> Optional[Dict]: + """Return the highest-priority trigger from a list""" + if not events: + return None + return max(events, key=lambda e: e.get("strength", 0)) diff --git a/salesflow-saas/backend/main.py b/salesflow-saas/backend/main.py new file mode 100644 index 00000000..c89d444e --- /dev/null +++ b/salesflow-saas/backend/main.py @@ -0,0 +1,64 @@ +"""Dealix Sovereign Revenue, Deal, Growth & Commitment OS — Backend""" +from flask import Flask, jsonify +from flask_cors import CORS + +from app.core.database import init_db +from app.api.routes.auth import auth_bp +from app.api.routes.revenue import revenue_bp +from app.api.routes.pricing import pricing_bp +from app.api.routes.partnership import partnership_bp +from app.api.routes.procurement import procurement_bp +from app.api.routes.renewal import renewal_bp +from app.api.routes.expansion import expansion_bp +from app.api.routes.ma import ma_bp +from app.api.routes.pmo import pmo_bp +from app.api.routes.executive import executive_bp +from app.api.routes.intelligence import intelligence_bp + +app = Flask(__name__) +CORS(app, resources={r"/*": {"origins": "*"}}, supports_credentials=True) + +# Register all 9 OS blueprints +app.register_blueprint(auth_bp) +app.register_blueprint(revenue_bp) +app.register_blueprint(pricing_bp) +app.register_blueprint(partnership_bp) +app.register_blueprint(procurement_bp) +app.register_blueprint(renewal_bp) +app.register_blueprint(expansion_bp) +app.register_blueprint(ma_bp) +app.register_blueprint(pmo_bp) +app.register_blueprint(executive_bp) +app.register_blueprint(intelligence_bp) # Revenue Intelligence OS — Lead Machine + +@app.get("/api/health") +def health(): + from app.core.database import db + with db() as conn: + count = conn.execute("SELECT COUNT(*) as c FROM audit_log").fetchone()["c"] + modules = conn.execute("SELECT COUNT(DISTINCT module) as m FROM audit_log").fetchone()["m"] + return jsonify({ + "status": "healthy", + "database": "connected", + "audit_entries": count, + "active_modules": modules, + "modules": [ + "Revenue OS", "Pricing & Margin Control OS", "Partnership & Alliance OS", + "Procurement & Vendor OS", "Renewal & Expansion OS", "Market Entry OS", + "M&A Corporate Development OS", "PMI Strategic PMO OS", "Executive Board OS" + ] + }) + +@app.get("/") +def root(): + return jsonify({ + "product": "Dealix", + "tagline": "Sovereign Revenue, Deal, Growth & Commitment OS", + "version": "2.0.0", + "modules": 9, + "docs": "/api/health" + }) + +if __name__ == "__main__": + init_db() + app.run(host="0.0.0.0", port=8000, debug=False)