system-prompts-and-models-o.../dealix/api/routers/data.py
2026-05-01 14:03:52 +03:00

886 lines
36 KiB
Python

"""
Data Lake + Lead Graph ingestion router.
Endpoints:
POST /api/v1/data/import — register a dataset (JSON rows)
POST /api/v1/data/import/{id}/normalize — normalize raw rows
POST /api/v1/data/import/{id}/dedupe — match + merge into accounts
POST /api/v1/data/import/{id}/enrich — run enrichment for new accounts
GET /api/v1/data/import/{id}/report — totals + per-row counts
POST /api/v1/data/suppression — add opt-out email/phone/domain
GET /api/v1/data/suppression — list suppression rows
GET /api/v1/data/imports — list all imports
GET /api/v1/data/accounts — list accounts (paginated)
GET /api/v1/data/accounts/{id} — single account + signals
POST /api/v1/data/accounts/{id}/score — recompute score from current data
Ingestion is *append-only*. Raw rows are kept; normalization writes
new account/contact/signal records but never deletes raw_lead_rows.
PDPL compliance:
- Every import declares allowed_use, source_type, consent_status, risk_level.
- Suppression list is checked at outreach time, not at ingest time.
"""
from __future__ import annotations
import logging
import uuid
from datetime import datetime, timezone
from typing import Any
from fastapi import APIRouter, Body, HTTPException
from sqlalchemy import select
from auto_client_acquisition.pipelines.dedupe import build_index, find_match
from auto_client_acquisition.pipelines.enrichment import enrich_account
from auto_client_acquisition.pipelines.normalize import (
fuzzy_company_key,
is_acceptable,
normalize_row,
)
from auto_client_acquisition.pipelines.scoring import (
compute_data_quality,
compute_lead_score,
)
from db.models import (
AccountRecord,
ContactRecord,
LeadScoreRecord,
RawLeadImport,
RawLeadRow,
SignalRecord,
SuppressionRecord,
)
from db.session import async_session_factory
router = APIRouter(prefix="/api/v1/data", tags=["data"])
log = logging.getLogger(__name__)
# ── Data Source Catalog (compliance-graded) ──────────────────────
SAUDI_DATA_SOURCE_CATALOG: list[dict[str, Any]] = [
{
"key": "riyadh_chamber",
"name_ar": "غرفة الرياض — دليل الأعضاء",
"name_en": "Riyadh Chamber of Commerce Member Directory",
"url": "https://chamber.org.sa",
"rating": "green",
"access_method": "public_web",
"coverage_city": ["riyadh"],
"coverage_sector": "all",
"ingest_strategy": "crawl_with_requests_bs4_provider",
},
{
"key": "jeddah_chamber",
"name_ar": "غرفة جدة — دليل الأعضاء",
"name_en": "Jeddah Chamber of Commerce Member Directory",
"url": "https://jcci.org.sa",
"rating": "green",
"access_method": "public_web",
"coverage_city": ["jeddah"],
"coverage_sector": "all",
},
{
"key": "eastern_chamber",
"name_ar": "غرفة الشرقية",
"name_en": "Asharqia Chamber",
"url": "https://chamber.org.sa/eastern",
"rating": "green",
"access_method": "public_web",
"coverage_city": ["dammam", "khobar", "jubail"],
"coverage_sector": "all",
},
{
"key": "data_gov_sa",
"name_ar": "بوابة البيانات المفتوحة (سدايا)",
"name_en": "SDAIA Open Data Portal",
"url": "https://data.gov.sa",
"rating": "green",
"access_method": "public_dataset_download",
"coverage_city": "all",
"coverage_sector": "all",
},
{
"key": "google_places",
"name_ar": "Google Places (Maps API)",
"name_en": "Google Places via MapsProvider chain",
"url": "internal:auto_client_acquisition.providers.maps",
"rating": "green",
"access_method": "api_with_key",
"coverage_city": "all",
"coverage_sector": "all",
"ingest_strategy": "store_place_id_only_per_terms",
},
{
"key": "saudi_contractors_authority",
"name_ar": "هيئة المقاولين السعودية",
"name_en": "Saudi Contractors Authority Registry",
"url": "https://sca.org.sa",
"rating": "green",
"access_method": "public_web",
"coverage_sector": ["construction"],
},
{
"key": "saudi_tourism_authority",
"name_ar": "هيئة السياحة السعودية",
"name_en": "Saudi Tourism Authority Registry",
"url": "https://scth.gov.sa",
"rating": "green",
"access_method": "public_web",
"coverage_sector": ["hospitality_events"],
},
{
"key": "linkedin",
"name_ar": "LinkedIn",
"name_en": "LinkedIn",
"url": "https://www.linkedin.com",
"rating": "red",
"access_method": "scraping_prohibited",
"ingest_strategy": "manual_research_only_no_bulk_ingest",
"note": "Dealix uses LinkedIn for human research + human send only — never for data ingestion.",
},
{
"key": "linkedin_chamber_other_yellow",
"name_ar": "أدلة تجارية مدفوعة",
"name_en": "Paid B2B Data Vendors (general)",
"url": "various",
"rating": "yellow",
"access_method": "purchase_with_documentation",
"ingest_strategy": "audit_lead_file_first_then_import",
"note": "Demand source documentation, allowed_use, last_updated, sample of 100 rows before paying.",
},
]
def _new_id(prefix: str = "") -> str:
suffix = uuid.uuid4().hex[:24]
return f"{prefix}{suffix}" if prefix else suffix
def _utcnow() -> datetime:
return datetime.now(timezone.utc).replace(tzinfo=None)
async def _safe_commit(session, *objs: Any) -> bool:
try:
for o in objs:
session.add(o)
await session.commit()
return True
except Exception as exc: # noqa: BLE001
log.warning("data_router_commit_failed err=%s", exc)
try:
await session.rollback()
except Exception:
pass
return False
# ── Source catalog ────────────────────────────────────────────────
@router.get("/sources/catalog")
async def list_data_sources() -> dict[str, Any]:
"""Compliance-graded Saudi business data source catalog."""
return {
"count": len(SAUDI_DATA_SOURCE_CATALOG),
"rating_legend": {
"green": "public + clearly permissive — direct ingest",
"yellow": "public but ToS-sensitive — lookup-only, manual approval",
"red": "scraping forbidden / paywalled-without-allowed-use — DO NOT INGEST",
},
"sources": SAUDI_DATA_SOURCE_CATALOG,
"doc": "See docs/ops/SAUDI_DATA_SOURCE_CATALOG.md for ingestion strategy per source.",
}
# ── Import: register a dataset ────────────────────────────────────
@router.post("/import")
async def create_import(body: dict[str, Any] = Body(...)) -> dict[str, Any]:
"""
Register a dataset. Body:
source_name (required, str)
source_type (required, one of: owned/public/paid/partner/google_maps/google_search/manual)
allowed_use (optional, str — defaults to "business_contact_research_only")
consent_status (optional)
risk_level (optional, low/medium/high)
rows (required, list[dict] — raw records, can be loose schema)
file_name (optional)
imported_by (optional)
notes (optional)
"""
source_name = str(body.get("source_name") or "").strip()
source_type = str(body.get("source_type") or "").strip()
rows = body.get("rows")
if not source_name:
raise HTTPException(400, "source_name_required")
if source_type not in {
"owned", "public", "paid", "partner",
"google_maps", "google_search", "manual",
}:
raise HTTPException(400, "source_type_invalid")
if not isinstance(rows, list) or not rows:
raise HTTPException(400, "rows_required: provide a non-empty list of dicts")
if len(rows) > 10000:
raise HTTPException(400, "too_many_rows: max 10000 per import; split into batches")
import_id = _new_id("imp_")
rec = RawLeadImport(
id=import_id,
source_name=source_name,
source_type=source_type,
file_name=body.get("file_name"),
imported_by=body.get("imported_by"),
allowed_use=str(body.get("allowed_use") or "business_contact_research_only"),
consent_status=str(body.get("consent_status") or "unknown"),
risk_level=str(body.get("risk_level") or "medium"),
rows_total=len(rows),
notes=body.get("notes"),
status="raw",
)
raw_rows = [
RawLeadRow(
id=_new_id("rr_"),
import_id=import_id,
raw_json=r if isinstance(r, dict) else {"value": r},
normalized_status="pending",
)
for r in rows
]
async with async_session_factory() as session:
ok = await _safe_commit(session, rec, *raw_rows)
if not ok:
return {
"import_id": import_id,
"status": "skipped_db_unreachable",
"rows_total": len(rows),
}
return {
"import_id": import_id,
"status": "raw",
"rows_total": len(rows),
"next_action": f"POST /api/v1/data/import/{import_id}/normalize",
}
# ── Normalize ─────────────────────────────────────────────────────
@router.post("/import/{import_id}/normalize")
async def normalize_import(import_id: str) -> dict[str, Any]:
async with async_session_factory() as session:
try:
imp_rec = (await session.execute(
select(RawLeadImport).where(RawLeadImport.id == import_id)
)).scalar_one_or_none()
except Exception as exc: # noqa: BLE001
return {"status": "skipped_db_unreachable", "error": str(exc)}
if not imp_rec:
raise HTTPException(404, "import_not_found")
rows = (await session.execute(
select(RawLeadRow).where(RawLeadRow.import_id == import_id)
)).scalars().all()
normalized_count = 0
rejected_count = 0
accounts_created: list[str] = []
for row in rows:
if row.normalized_status != "pending":
continue
try:
normalized = normalize_row(row.raw_json or {})
except Exception as exc: # noqa: BLE001
row.normalized_status = "rejected"
row.error = f"normalize_error: {exc}"
rejected_count += 1
continue
ok, reason = is_acceptable(normalized)
if not ok:
row.normalized_status = "rejected"
row.error = reason or "unacceptable"
rejected_count += 1
continue
# Create AccountRecord stub (dedupe runs in next step)
acc_id = _new_id("acc_")
acc = AccountRecord(
id=acc_id,
company_name=normalized["company_name"][:255],
normalized_name=normalized["normalized_name"][:255],
domain=normalized["domain"],
website=normalized["website"][:500] if normalized["website"] else None,
city=normalized["city"][:128] if normalized["city"] else None,
country=normalized["country"][:64] if normalized["country"] else "SA",
sector=normalized["sector"][:64] if normalized["sector"] else None,
google_place_id=normalized["google_place_id"][:128]
if normalized["google_place_id"] else None,
source_count=1,
best_source=imp_rec.source_type,
risk_level=imp_rec.risk_level,
status="new",
extra={
"import_id": import_id,
"source_url": normalized["source_url"],
"raw_keys": normalized["raw_keys"],
"allowed_use": imp_rec.allowed_use,
"consent_status": imp_rec.consent_status,
},
)
session.add(acc)
accounts_created.append(acc_id)
# Optional contact
if normalized["email"] or normalized["phone"] or normalized["contact_name"]:
session.add(ContactRecord(
id=_new_id("ct_"),
account_id=acc_id,
name=normalized["contact_name"][:255] if normalized["contact_name"] else None,
role=normalized["role"][:128] if normalized["role"] else None,
email=normalized["email"][:255] if normalized["email"] else None,
phone=normalized["phone"][:32] if normalized["phone"] else None,
source=imp_rec.source_type,
consent_status=imp_rec.consent_status,
opt_out=False,
risk_level=imp_rec.risk_level,
))
row.normalized_status = "ok"
row.account_id = acc_id
normalized_count += 1
imp_rec.rows_normalized = normalized_count
imp_rec.rows_rejected = rejected_count
imp_rec.status = "normalized"
imp_rec.updated_at = _utcnow()
try:
await session.commit()
except Exception as exc: # noqa: BLE001
await session.rollback()
return {"status": "commit_failed", "error": str(exc)}
return {
"import_id": import_id,
"status": "normalized",
"rows_normalized": normalized_count,
"rows_rejected": rejected_count,
"accounts_created": len(accounts_created),
"next_action": f"POST /api/v1/data/import/{import_id}/dedupe",
}
# ── Dedupe ────────────────────────────────────────────────────────
@router.post("/import/{import_id}/dedupe")
async def dedupe_import(import_id: str) -> dict[str, Any]:
"""Match accounts created by this import against the existing graph."""
async with async_session_factory() as session:
try:
imp_rec = (await session.execute(
select(RawLeadImport).where(RawLeadImport.id == import_id)
)).scalar_one_or_none()
if not imp_rec:
raise HTTPException(404, "import_not_found")
all_accounts = (await session.execute(select(AccountRecord))).scalars().all()
except HTTPException:
raise
except Exception as exc: # noqa: BLE001
return {"status": "skipped_db_unreachable", "error": str(exc)}
# Split into already-existing (from prior imports) vs this-import's new ones
new_for_import = [a for a in all_accounts if (a.extra or {}).get("import_id") == import_id]
existing = [a for a in all_accounts if a.id not in {n.id for n in new_for_import}]
existing_dicts = [
{
"id": a.id, "company_name": a.company_name,
"normalized_name": a.normalized_name, "domain": a.domain,
"website": a.website, "city": a.city,
"phone": None, "email": None, # not on AccountRecord directly
"google_place_id": a.google_place_id,
}
for a in existing
]
idx = build_index(existing_dicts)
merged_count = 0
kept_count = 0
for acc in new_for_import:
normalized = {
"company_name": acc.company_name,
"normalized_name": acc.normalized_name,
"domain": acc.domain,
"phone": None,
"email": None,
"google_place_id": acc.google_place_id,
"city": acc.city,
}
match_id, match_kind = find_match(normalized, idx)
if match_id:
# Merge: increment source_count on the canonical, mark this one as duplicate
target = next((a for a in existing if a.id == match_id), None)
if target is not None:
target.source_count = (target.source_count or 1) + 1
extra = dict(target.extra or {})
sources = list(extra.get("sources", []))
if imp_rec.source_type not in sources:
sources.append(imp_rec.source_type)
extra["sources"] = sources
target.extra = extra
acc.status = "merged_into"
acc.extra = {**(acc.extra or {}), "merged_into": match_id, "match_kind": match_kind}
merged_count += 1
else:
kept_count += 1
imp_rec.rows_duplicate = merged_count
imp_rec.status = "deduped"
imp_rec.updated_at = _utcnow()
try:
await session.commit()
except Exception as exc: # noqa: BLE001
await session.rollback()
return {"status": "commit_failed", "error": str(exc)}
return {
"import_id": import_id,
"status": "deduped",
"merged": merged_count,
"new_accounts": kept_count,
"next_action": f"POST /api/v1/data/import/{import_id}/enrich",
}
# ── Enrich ────────────────────────────────────────────────────────
@router.post("/import/{import_id}/enrich")
async def enrich_import(import_id: str, body: dict[str, Any] = Body(default={})) -> dict[str, Any]:
"""
Run the enrichment pipeline against new accounts created by this import.
Body:
enrichment_level: basic / standard / deep (default: standard)
max_accounts: int (default 25 — cap to avoid runaway API calls)
"""
level = str(body.get("enrichment_level") or "standard")
max_accounts = int(body.get("max_accounts") or 25)
if max_accounts < 1 or max_accounts > 200:
raise HTTPException(400, "max_accounts_out_of_range: 1..200")
async with async_session_factory() as session:
try:
new_accounts = (await session.execute(
select(AccountRecord).where(
AccountRecord.status == "new"
).limit(max_accounts)
)).scalars().all()
except Exception as exc: # noqa: BLE001
return {"status": "skipped_db_unreachable", "error": str(exc)}
# Filter to accounts from this import
from_this_import = [
a for a in new_accounts
if (a.extra or {}).get("import_id") == import_id
]
enriched = 0
for acc in from_this_import:
account_dict = {
"id": acc.id,
"company_name": acc.company_name,
"domain": acc.domain,
"website": acc.website,
"city": acc.city,
"country": acc.country,
"sector": acc.sector,
"google_place_id": acc.google_place_id,
"best_source": acc.best_source,
"source_type": acc.best_source,
"allowed_use": (acc.extra or {}).get("allowed_use"),
"risk_level": acc.risk_level,
}
try:
result = await enrich_account(account_dict, enrichment_level=level)
except Exception as exc: # noqa: BLE001
log.warning("enrich_failed acc=%s err=%s", acc.id, exc)
continue
# Persist signals
for s in result.get("signals", []):
session.add(SignalRecord(
id=_new_id("sig_"),
account_id=acc.id,
signal_type=str(s.get("signal_type") or "tech")[:64],
signal_value=str(s.get("signal_value") or "")[:500],
source_url=s.get("source_url"),
confidence=float(s.get("confidence") or 0.5),
))
# Persist score
sc = result.get("score") or {}
session.add(LeadScoreRecord(
id=_new_id("ls_"),
account_id=acc.id,
fit_score=float(sc.get("fit") or 0),
intent_score=float(sc.get("intent") or 0),
urgency_score=float(sc.get("urgency") or 0),
risk_score=float(sc.get("risk") or 0),
total_score=float(sc.get("total") or 0),
priority=str(sc.get("priority") or "P3")[:8],
recommended_channel=sc.get("recommended_channel"),
reason=sc.get("reason"),
))
# Update account with crawled domain + DQ score
if result.get("domain") and not acc.domain:
acc.domain = result["domain"]
acc.website = f"https://{result['domain']}"
acc.data_quality_score = float(result.get("data_quality", {}).get("score", 0))
acc.status = "enriched"
acc.updated_at = _utcnow()
# Persist new contacts (avoid dup by email/phone)
for c in result.get("contacts", []):
if c.get("type") == "email":
session.add(ContactRecord(
id=_new_id("ct_"),
account_id=acc.id,
name=c.get("name"),
role=c.get("role"),
email=c.get("value"),
source=c.get("source") or "enrichment",
consent_status="legitimate_interest",
opt_out=False,
risk_level=acc.risk_level,
))
elif c.get("type") in ("phone", "whatsapp"):
session.add(ContactRecord(
id=_new_id("ct_"),
account_id=acc.id,
name=None,
role=None,
phone=c.get("value"),
source=c.get("source") or "enrichment",
consent_status="legitimate_interest",
opt_out=False,
risk_level=acc.risk_level,
))
enriched += 1
try:
await session.commit()
except Exception as exc: # noqa: BLE001
await session.rollback()
return {"status": "commit_failed", "error": str(exc)}
return {
"import_id": import_id,
"status": "enriched",
"accounts_enriched": enriched,
"level": level,
"next_action": f"GET /api/v1/data/import/{import_id}/report",
}
# ── Report ────────────────────────────────────────────────────────
@router.get("/import/{import_id}/report")
async def import_report(import_id: str) -> dict[str, Any]:
async with async_session_factory() as session:
try:
imp = (await session.execute(
select(RawLeadImport).where(RawLeadImport.id == import_id)
)).scalar_one_or_none()
if not imp:
raise HTTPException(404, "import_not_found")
accounts = (await session.execute(select(AccountRecord))).scalars().all()
except HTTPException:
raise
except Exception as exc: # noqa: BLE001
return {"status": "skipped_db_unreachable", "error": str(exc)}
related = [a for a in accounts if (a.extra or {}).get("import_id") == import_id]
priority_counts: dict[str, int] = {}
# Collect latest scores per account
try:
scores = (await session.execute(
select(LeadScoreRecord).where(
LeadScoreRecord.account_id.in_([a.id for a in related])
)
)).scalars().all()
for s in scores:
priority_counts[s.priority] = priority_counts.get(s.priority, 0) + 1
except Exception:
scores = []
return {
"import_id": import_id,
"source_name": imp.source_name,
"source_type": imp.source_type,
"status": imp.status,
"rows_total": imp.rows_total,
"rows_normalized": imp.rows_normalized,
"rows_rejected": imp.rows_rejected,
"rows_duplicate": imp.rows_duplicate,
"accounts_in_graph_from_this_import": len(related),
"scored_accounts": len(scores),
"priority_distribution": priority_counts,
"allowed_use": imp.allowed_use,
"consent_status": imp.consent_status,
"risk_level": imp.risk_level,
}
# ── Suppression list ──────────────────────────────────────────────
@router.post("/suppression")
async def add_suppression(body: dict[str, Any] = Body(...)) -> dict[str, Any]:
"""
Add a suppression entry. At least one of email/phone/domain required.
Body: {email?, phone?, domain?, reason?}
"""
email = body.get("email")
phone = body.get("phone")
domain = body.get("domain")
if not (email or phone or domain):
raise HTTPException(400, "at_least_one_of_email_phone_domain_required")
rec = SuppressionRecord(
id=_new_id("sup_"),
email=str(email).strip().lower() if email else None,
phone=str(phone).strip() if phone else None,
domain=str(domain).strip().lower() if domain else None,
reason=str(body.get("reason") or "opt_out")[:128],
)
async with async_session_factory() as session:
ok = await _safe_commit(session, rec)
return {
"id": rec.id,
"email": rec.email, "phone": rec.phone, "domain": rec.domain,
"reason": rec.reason,
"status": "ok" if ok else "skipped_db_unreachable",
}
@router.get("/suppression")
async def list_suppression(limit: int = 200) -> dict[str, Any]:
async with async_session_factory() as session:
try:
rows = (await session.execute(
select(SuppressionRecord).limit(min(1000, limit))
)).scalars().all()
except Exception as exc: # noqa: BLE001
return {"status": "skipped_db_unreachable", "error": str(exc), "items": []}
return {
"status": "ok",
"count": len(rows),
"items": [
{
"id": r.id, "email": r.email, "phone": r.phone, "domain": r.domain,
"reason": r.reason, "created_at": r.created_at.isoformat(),
}
for r in rows
],
}
# ── Listings ──────────────────────────────────────────────────────
@router.get("/imports")
async def list_imports(limit: int = 50) -> dict[str, Any]:
async with async_session_factory() as session:
try:
rows = (await session.execute(
select(RawLeadImport).order_by(RawLeadImport.created_at.desc()).limit(min(500, limit))
)).scalars().all()
except Exception as exc: # noqa: BLE001
return {"status": "skipped_db_unreachable", "error": str(exc), "items": []}
return {
"count": len(rows),
"items": [
{
"id": r.id, "source_name": r.source_name, "source_type": r.source_type,
"status": r.status, "rows_total": r.rows_total,
"rows_normalized": r.rows_normalized, "rows_rejected": r.rows_rejected,
"rows_duplicate": r.rows_duplicate,
"risk_level": r.risk_level, "created_at": r.created_at.isoformat(),
}
for r in rows
],
}
@router.get("/accounts")
async def list_accounts(
limit: int = 50,
sector: str | None = None,
city: str | None = None,
status: str | None = None,
priority: str | None = None,
) -> dict[str, Any]:
async with async_session_factory() as session:
try:
q = select(AccountRecord)
if sector:
q = q.where(AccountRecord.sector == sector)
if city:
q = q.where(AccountRecord.city == city)
if status:
q = q.where(AccountRecord.status == status)
q = q.order_by(AccountRecord.data_quality_score.desc()).limit(min(500, limit))
rows = (await session.execute(q)).scalars().all()
score_map: dict[str, LeadScoreRecord] = {}
if rows:
ids = [r.id for r in rows]
scores = (await session.execute(
select(LeadScoreRecord).where(LeadScoreRecord.account_id.in_(ids))
)).scalars().all()
for s in scores:
if s.account_id not in score_map or s.created_at > score_map[s.account_id].created_at:
score_map[s.account_id] = s
except Exception as exc: # noqa: BLE001
return {"status": "skipped_db_unreachable", "error": str(exc), "items": []}
items = []
for a in rows:
s = score_map.get(a.id)
if priority and (not s or s.priority != priority):
continue
items.append({
"id": a.id, "company_name": a.company_name, "domain": a.domain,
"website": a.website, "city": a.city, "sector": a.sector,
"google_place_id": a.google_place_id, "source_count": a.source_count,
"best_source": a.best_source, "status": a.status,
"data_quality_score": a.data_quality_score, "risk_level": a.risk_level,
"score": {
"fit": s.fit_score, "intent": s.intent_score,
"total": s.total_score, "priority": s.priority,
"recommended_channel": s.recommended_channel,
} if s else None,
})
return {"count": len(items), "items": items}
@router.get("/accounts/{account_id}")
async def get_account(account_id: str) -> dict[str, Any]:
async with async_session_factory() as session:
try:
acc = (await session.execute(
select(AccountRecord).where(AccountRecord.id == account_id)
)).scalar_one_or_none()
if not acc:
raise HTTPException(404, "account_not_found")
contacts = (await session.execute(
select(ContactRecord).where(ContactRecord.account_id == account_id)
)).scalars().all()
signals = (await session.execute(
select(SignalRecord).where(SignalRecord.account_id == account_id)
)).scalars().all()
scores = (await session.execute(
select(LeadScoreRecord).where(
LeadScoreRecord.account_id == account_id
).order_by(LeadScoreRecord.created_at.desc()).limit(1)
)).scalars().all()
except HTTPException:
raise
except Exception as exc: # noqa: BLE001
return {"status": "skipped_db_unreachable", "error": str(exc)}
latest = scores[0] if scores else None
return {
"account": {
"id": acc.id, "company_name": acc.company_name,
"domain": acc.domain, "website": acc.website,
"city": acc.city, "country": acc.country, "sector": acc.sector,
"google_place_id": acc.google_place_id,
"source_count": acc.source_count, "best_source": acc.best_source,
"status": acc.status, "data_quality_score": acc.data_quality_score,
"risk_level": acc.risk_level, "extra": acc.extra,
"created_at": acc.created_at.isoformat(),
"updated_at": acc.updated_at.isoformat(),
},
"contacts": [
{"id": c.id, "name": c.name, "role": c.role, "email": c.email,
"phone": c.phone, "source": c.source, "consent_status": c.consent_status,
"opt_out": c.opt_out, "risk_level": c.risk_level}
for c in contacts
],
"signals": [
{"id": s.id, "type": s.signal_type, "value": s.signal_value,
"source_url": s.source_url, "confidence": s.confidence,
"detected_at": s.detected_at.isoformat()}
for s in signals
],
"score": {
"fit": latest.fit_score, "intent": latest.intent_score,
"urgency": latest.urgency_score, "risk": latest.risk_score,
"total": latest.total_score, "priority": latest.priority,
"recommended_channel": latest.recommended_channel, "reason": latest.reason,
} if latest else None,
}
@router.post("/accounts/{account_id}/score")
async def score_account(account_id: str) -> dict[str, Any]:
"""Recompute score from current data in the graph."""
async with async_session_factory() as session:
try:
acc = (await session.execute(
select(AccountRecord).where(AccountRecord.id == account_id)
)).scalar_one_or_none()
if not acc:
raise HTTPException(404, "account_not_found")
contacts = (await session.execute(
select(ContactRecord).where(ContactRecord.account_id == account_id)
)).scalars().all()
signals = (await session.execute(
select(SignalRecord).where(SignalRecord.account_id == account_id)
)).scalars().all()
except HTTPException:
raise
except Exception as exc: # noqa: BLE001
return {"status": "skipped_db_unreachable", "error": str(exc)}
first_email = next((c.email for c in contacts if c.email), None)
first_phone = next((c.phone for c in contacts if c.phone), None)
account_dict = {
"id": acc.id, "company_name": acc.company_name, "domain": acc.domain,
"website": acc.website, "city": acc.city, "country": acc.country,
"sector": acc.sector, "google_place_id": acc.google_place_id,
"best_source": acc.best_source, "source_count": acc.source_count,
"risk_level": acc.risk_level, "email": first_email, "phone": first_phone,
"allowed_use": (acc.extra or {}).get("allowed_use"),
"opt_out": any(c.opt_out for c in contacts),
"signals": signals,
}
sig_dicts = [
{"signal_type": s.signal_type, "signal_value": s.signal_value,
"confidence": s.confidence}
for s in signals
]
sb = compute_lead_score(account_dict, signals=sig_dicts, technologies=[])
dq, _reasons = compute_data_quality(account_dict)
rec = LeadScoreRecord(
id=_new_id("ls_"),
account_id=account_id,
fit_score=sb.fit, intent_score=sb.intent, urgency_score=sb.urgency,
risk_score=sb.risk, total_score=sb.total, priority=sb.priority,
recommended_channel=sb.recommended_channel, reason=sb.reason,
)
acc.data_quality_score = dq
acc.updated_at = _utcnow()
ok = await _safe_commit(session, rec)
if not ok:
return {"status": "commit_failed"}
return {
"account_id": account_id,
"score": {
"fit": sb.fit, "intent": sb.intent, "urgency": sb.urgency,
"risk": sb.risk, "total": sb.total, "priority": sb.priority,
"recommended_channel": sb.recommended_channel, "reason": sb.reason,
},
"data_quality_score": dq,
}