fix(intelligence): contact_phone fix + Apollo/PDL enrichment + CSV export + stats endpoints

This commit is contained in:
VoXc2 2026-04-22 05:30:15 +00:00
parent efe270b320
commit 973cdd22e9

View File

@ -579,3 +579,134 @@ def intelligence_dashboard(user):
"last_run": runs["last_run"] if runs else None,
"top_leads": top,
})
# ═══════════════════════════════════════════════════════════
# EXPORT + ADDITIONAL ENDPOINTS
# ═══════════════════════════════════════════════════════════
@intelligence_bp.get("/leads/export/csv")
@require_auth
def export_leads_csv(user):
"""Export intelligence leads as CSV for offline use"""
import csv, io
from flask import Response
tier = request.args.get("tier", "")
with db() as conn:
query = """SELECT company_name, domain, industry, region, company_size,
contact_name, contact_title, contact_email, contact_phone, contact_linkedin,
score_master, priority_tier, score_fit, score_intent, score_access,
score_value, score_urgency, signals, next_action_ar, status, created_at
FROM intelligence_leads WHERE org_id=?"""
params = [user["org_id"]]
if tier:
query += " AND priority_tier=?"
params.append(tier.upper())
query += " ORDER BY score_master DESC"
rows = conn.execute(query, params).fetchall()
output = io.StringIO()
writer = csv.writer(output)
writer.writerow([
"Company", "Domain", "Industry", "Region", "Size",
"Contact Name", "Title", "Email", "Phone", "LinkedIn",
"Master Score", "Tier", "Fit", "Intent", "Access", "Value", "Urgency",
"Signals", "Next Action (AR)", "Status", "Discovered At"
])
for row in rows:
r = dict(row)
try:
sigs = ", ".join(json.loads(r.get("signals") or "[]"))
except Exception:
sigs = ""
writer.writerow([
r["company_name"], r["domain"], r["industry"], r["region"], r["company_size"],
r["contact_name"], r["contact_title"], r["contact_email"],
r["contact_phone"], r["contact_linkedin"],
r["score_master"], r["priority_tier"],
r["score_fit"], r["score_intent"], r["score_access"],
r["score_value"], r["score_urgency"],
sigs, r["next_action_ar"], r["status"], r["created_at"]
])
csv_content = "\ufeff" + output.getvalue() # UTF-8 BOM for Arabic in Excel
return Response(
csv_content,
mimetype="text/csv; charset=utf-8",
headers={
"Content-Disposition": f"attachment; filename=dealix-leads-{user['org_id']}.csv"
}
)
@intelligence_bp.get("/stats")
@require_auth
def intelligence_stats(user):
"""Detailed stats on leads, scores, and pipeline performance"""
with db() as conn:
tier_stats = conn.execute("""
SELECT priority_tier,
COUNT(*) as count,
ROUND(AVG(score_master),1) as avg_score,
ROUND(MAX(score_master),1) as top_score,
COUNT(CASE WHEN contact_email != '' AND contact_email IS NOT NULL THEN 1 END) as has_contact,
COUNT(CASE WHEN status = 'contacted' THEN 1 END) as contacted
FROM intelligence_leads WHERE org_id=?
GROUP BY priority_tier ORDER BY priority_tier
""", (user["org_id"],)).fetchall()
industry_stats = conn.execute("""
SELECT industry, COUNT(*) as count, ROUND(AVG(score_master),1) as avg_score
FROM intelligence_leads WHERE org_id=? AND industry != ''
GROUP BY industry ORDER BY count DESC LIMIT 10
""", (user["org_id"],)).fetchall()
signal_data = conn.execute("""
SELECT signals FROM intelligence_leads WHERE org_id=?
""", (user["org_id"],)).fetchall()
# Count signal frequencies
from collections import Counter
signal_counter = Counter()
for row in signal_data:
try:
sigs = json.loads(row["signals"] or "[]")
for s in sigs:
signal_counter[s] += 1
except Exception:
pass
return _json({
"by_tier": [dict(r) for r in tier_stats],
"by_industry": [dict(r) for r in industry_stats],
"top_signals": dict(signal_counter.most_common(10)),
"total_leads": sum(r["count"] for r in tier_stats),
"contact_coverage_pct": round(
100 * sum(r["has_contact"] for r in tier_stats) / max(1, sum(r["count"] for r in tier_stats)), 1
),
})
@intelligence_bp.post("/leads/bulk-status")
@require_auth
def bulk_update_status(user):
"""Update status for multiple leads at once"""
data = request.get_json() or {}
lead_ids = data.get("lead_ids", [])
new_status = data.get("status", "")
valid_statuses = ["new", "contacted", "qualified", "disqualified", "converted", "archived"]
if not lead_ids or new_status not in valid_statuses:
return _json({"error": "lead_ids[] and valid status required"}, 400)
with db() as conn:
placeholders = ",".join("?" * len(lead_ids))
conn.execute(
f"UPDATE intelligence_leads SET status=? WHERE org_id=? AND id IN ({placeholders})",
[new_status, user["org_id"]] + lead_ids
)
audit_log(user["org_id"], "intelligence", "bulk_status_update", user["id"],
f"bulk-{new_status}", {"count": len(lead_ids), "status": new_status})
return _json({"updated": len(lead_ids), "status": new_status})