mirror of
https://github.com/x1xhlol/system-prompts-and-models-of-ai-tools.git
synced 2026-06-18 15:29:36 +00:00
441 lines
17 KiB
Python
441 lines
17 KiB
Python
"""
|
|
Email send router — Gmail OAuth send + status + replies sync.
|
|
|
|
Endpoints:
|
|
POST /api/v1/email/connect/gmail — return OAuth setup checklist
|
|
POST /api/v1/email/send-approved — send a single approved row
|
|
POST /api/v1/email/send-batch — send a batch of up to BATCH_SIZE rows
|
|
GET /api/v1/email/status — Gmail config + today counts
|
|
POST /api/v1/email/replies/sync — manual reply ingestion (until Pub/Sub)
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import logging
|
|
import os
|
|
import uuid
|
|
from datetime import datetime, timedelta, timezone
|
|
from typing import Any
|
|
|
|
from fastapi import APIRouter, Body, HTTPException
|
|
from sqlalchemy import func, select
|
|
|
|
from auto_client_acquisition.email.compliance import (
|
|
append_opt_out_line,
|
|
check_outreach,
|
|
get_batch_interval_seconds,
|
|
get_batch_size,
|
|
get_daily_limit,
|
|
)
|
|
from auto_client_acquisition.email.gmail_send import (
|
|
get_oauth_setup_instructions,
|
|
is_configured as gmail_is_configured,
|
|
send_email,
|
|
)
|
|
from auto_client_acquisition.email.reply_classifier import classify_reply
|
|
from db.models import (
|
|
AccountRecord,
|
|
ContactRecord,
|
|
EmailSendLog,
|
|
OutreachQueueRecord,
|
|
SuppressionRecord,
|
|
)
|
|
from db.session import async_session_factory
|
|
|
|
router = APIRouter(prefix="/api/v1/email", tags=["email"])
|
|
log = logging.getLogger(__name__)
|
|
|
|
|
|
def _new_id(prefix: str = "") -> str:
|
|
return f"{prefix}{uuid.uuid4().hex[:24]}" if prefix else uuid.uuid4().hex[:24]
|
|
|
|
|
|
def _utcnow() -> datetime:
|
|
return datetime.now(timezone.utc).replace(tzinfo=None)
|
|
|
|
|
|
@router.post("/connect/gmail")
|
|
async def connect_gmail() -> dict[str, Any]:
|
|
"""Returns the exact 8-step OAuth setup Sami runs once locally."""
|
|
if gmail_is_configured():
|
|
return {"status": "already_configured", "sender_email": os.getenv("GMAIL_SENDER_EMAIL", "")}
|
|
return {"status": "needs_setup", **get_oauth_setup_instructions()}
|
|
|
|
|
|
@router.get("/status")
|
|
async def email_status() -> dict[str, Any]:
|
|
today_start = _utcnow().replace(hour=0, minute=0, second=0, microsecond=0)
|
|
async with async_session_factory() as session:
|
|
try:
|
|
sent_today = int((await session.execute(
|
|
select(func.count()).select_from(EmailSendLog).where(
|
|
EmailSendLog.status == "sent",
|
|
EmailSendLog.sent_at >= today_start,
|
|
)
|
|
)).scalar() or 0)
|
|
queued = int((await session.execute(
|
|
select(func.count()).select_from(OutreachQueueRecord).where(
|
|
OutreachQueueRecord.status.in_(["queued", "approved"])
|
|
)
|
|
)).scalar() or 0)
|
|
except Exception as exc: # noqa: BLE001
|
|
return {"status": "skipped_db_unreachable", "error": str(exc)}
|
|
|
|
return {
|
|
"gmail_configured": gmail_is_configured(),
|
|
"sender_email": os.getenv("GMAIL_SENDER_EMAIL") or "(unset)",
|
|
"limits": {
|
|
"daily_email_limit": get_daily_limit(),
|
|
"batch_size": get_batch_size(),
|
|
"batch_interval_minutes": get_batch_interval_seconds() // 60,
|
|
},
|
|
"sent_today": sent_today,
|
|
"remaining_today": max(0, get_daily_limit() - sent_today),
|
|
"approval_queue_size": queued,
|
|
}
|
|
|
|
|
|
async def _gather_compliance_inputs(
|
|
*, to_email: str, account_id: str | None
|
|
) -> dict[str, Any]:
|
|
"""Pull suppression + contact + recent-send state needed for compliance gate."""
|
|
async with async_session_factory() as session:
|
|
sup_emails: set[str] = set()
|
|
sup_domains: set[str] = set()
|
|
contact_opt_out = False
|
|
bounced_before = False
|
|
risk_score = 0.0
|
|
allowed_use = "business_contact_research_only"
|
|
try:
|
|
sup_rows = (await session.execute(select(SuppressionRecord))).scalars().all()
|
|
for r in sup_rows:
|
|
if r.email: sup_emails.add(r.email.lower())
|
|
if r.domain: sup_domains.add(r.domain.lower())
|
|
|
|
if account_id:
|
|
acc = (await session.execute(
|
|
select(AccountRecord).where(AccountRecord.id == account_id)
|
|
)).scalar_one_or_none()
|
|
if acc:
|
|
allowed_use = (acc.extra or {}).get("allowed_use") or allowed_use
|
|
if (acc.risk_level or "").lower() == "high":
|
|
risk_score = 80.0
|
|
|
|
contacts = (await session.execute(
|
|
select(ContactRecord).where(ContactRecord.account_id == account_id)
|
|
)).scalars().all()
|
|
for c in contacts:
|
|
if c.email and c.email.lower() == to_email.lower() and c.opt_out:
|
|
contact_opt_out = True
|
|
|
|
bounce_log = (await session.execute(
|
|
select(EmailSendLog).where(
|
|
EmailSendLog.to_email == to_email,
|
|
EmailSendLog.status == "bounced",
|
|
).limit(1)
|
|
)).scalar_one_or_none()
|
|
if bounce_log:
|
|
bounced_before = True
|
|
|
|
today_start = _utcnow().replace(hour=0, minute=0, second=0, microsecond=0)
|
|
sent_today_count = int((await session.execute(
|
|
select(func.count()).select_from(EmailSendLog).where(
|
|
EmailSendLog.status == "sent",
|
|
EmailSendLog.sent_at >= today_start,
|
|
)
|
|
)).scalar() or 0)
|
|
|
|
# Last batch timestamp
|
|
last = (await session.execute(
|
|
select(EmailSendLog.sent_at).where(
|
|
EmailSendLog.status == "sent"
|
|
).order_by(EmailSendLog.sent_at.desc()).limit(1)
|
|
)).scalar_one_or_none()
|
|
seconds_since_last = None
|
|
if last is not None:
|
|
seconds_since_last = (_utcnow() - last).total_seconds()
|
|
except Exception as exc: # noqa: BLE001
|
|
log.warning("compliance_gather_failed err=%s", exc)
|
|
return {"db_error": str(exc)}
|
|
|
|
return {
|
|
"sup_emails": sup_emails,
|
|
"sup_domains": sup_domains,
|
|
"contact_opt_out": contact_opt_out,
|
|
"bounced_before": bounced_before,
|
|
"risk_score": risk_score,
|
|
"allowed_use": allowed_use,
|
|
"sent_today_count": sent_today_count,
|
|
"seconds_since_last_batch": seconds_since_last,
|
|
}
|
|
|
|
|
|
@router.post("/send-approved")
|
|
async def send_approved(body: dict[str, Any] = Body(...)) -> dict[str, Any]:
|
|
"""
|
|
Send a single approved row. Body:
|
|
to_email (required)
|
|
subject (required)
|
|
body_plain (required)
|
|
account_id (optional)
|
|
queue_id (optional — if from outreach_queue, marks as sent on success)
|
|
sequence_step (default 0)
|
|
force (default False — skips DB compliance, NEVER skips Gmail config check)
|
|
"""
|
|
to_email = str(body.get("to_email") or "").strip()
|
|
subject = str(body.get("subject") or "").strip()
|
|
body_plain = str(body.get("body_plain") or "").strip()
|
|
if not to_email or not subject or not body_plain:
|
|
raise HTTPException(400, "to_email/subject/body_plain required")
|
|
|
|
account_id = body.get("account_id")
|
|
queue_id = body.get("queue_id")
|
|
seq_step = int(body.get("sequence_step") or 0)
|
|
|
|
if not gmail_is_configured():
|
|
return {"status": "blocked_compliance",
|
|
"reasons": ["gmail_not_configured"],
|
|
"next_action": "POST /api/v1/email/connect/gmail"}
|
|
|
|
# Compliance gate
|
|
inputs = await _gather_compliance_inputs(to_email=to_email, account_id=account_id)
|
|
if inputs.get("db_error"):
|
|
return {"status": "skipped_db_unreachable", "error": inputs["db_error"]}
|
|
|
|
chk = check_outreach(
|
|
to_email=to_email,
|
|
contact_opt_out=inputs["contact_opt_out"],
|
|
risk_score=inputs["risk_score"],
|
|
allowed_use=inputs["allowed_use"],
|
|
suppression_emails=inputs["sup_emails"],
|
|
suppression_domains=inputs["sup_domains"],
|
|
bounced_before=inputs["bounced_before"],
|
|
sent_today_count=inputs["sent_today_count"],
|
|
sent_in_current_batch=0,
|
|
seconds_since_last_batch=inputs.get("seconds_since_last_batch"),
|
|
)
|
|
if not chk.allowed:
|
|
# Persist a compliance-blocked log row
|
|
async with async_session_factory() as session:
|
|
session.add(EmailSendLog(
|
|
id=_new_id("es_"),
|
|
account_id=account_id, queue_id=queue_id,
|
|
to_email=to_email, subject=subject[:500],
|
|
body_preview=body_plain[:500],
|
|
sender_email=os.getenv("GMAIL_SENDER_EMAIL", ""),
|
|
status="blocked_compliance",
|
|
sequence_step=seq_step,
|
|
compliance_check=chk.to_dict(),
|
|
))
|
|
try:
|
|
await session.commit()
|
|
except Exception:
|
|
await session.rollback()
|
|
return {"status": "blocked_compliance", "reasons": chk.blocked_reasons}
|
|
|
|
# Append opt-out line and send
|
|
final_body = append_opt_out_line(body_plain)
|
|
result = await send_email(
|
|
to_email=to_email,
|
|
subject=subject,
|
|
body_plain=final_body,
|
|
sender_name=body.get("sender_name") or "Sami | Dealix",
|
|
)
|
|
|
|
# Persist log
|
|
async with async_session_factory() as session:
|
|
log_row = EmailSendLog(
|
|
id=_new_id("es_"),
|
|
account_id=account_id, queue_id=queue_id,
|
|
to_email=to_email, subject=subject[:500],
|
|
body_preview=final_body[:500],
|
|
sender_email=os.getenv("GMAIL_SENDER_EMAIL", ""),
|
|
status="sent" if result.status == "ok" else "failed",
|
|
gmail_message_id=result.gmail_message_id,
|
|
sent_at=_utcnow() if result.status == "ok" else None,
|
|
sequence_step=seq_step,
|
|
compliance_check=chk.to_dict(),
|
|
bounce_reason=result.error if result.status != "ok" else None,
|
|
)
|
|
session.add(log_row)
|
|
if queue_id and result.status == "ok":
|
|
qr = (await session.execute(
|
|
select(OutreachQueueRecord).where(OutreachQueueRecord.id == queue_id)
|
|
)).scalar_one_or_none()
|
|
if qr:
|
|
qr.status = "sent"
|
|
qr.sent_at = _utcnow()
|
|
try:
|
|
await session.commit()
|
|
except Exception as exc: # noqa: BLE001
|
|
await session.rollback()
|
|
return {"status": "commit_failed", "send_status": result.status, "error": str(exc)}
|
|
|
|
return {
|
|
"status": result.status,
|
|
"gmail_message_id": result.gmail_message_id,
|
|
"send_log_id": log_row.id,
|
|
"error": result.error,
|
|
}
|
|
|
|
|
|
@router.post("/send-batch")
|
|
async def send_batch(body: dict[str, Any] = Body(default={})) -> dict[str, Any]:
|
|
"""
|
|
Send up to BATCH_SIZE approved rows from the outreach queue.
|
|
Body:
|
|
max: int (default = EMAIL_BATCH_SIZE)
|
|
only_status: 'approved' (default) — skips 'queued' which still need approval
|
|
"""
|
|
max_n = int(body.get("max") or get_batch_size())
|
|
only_status = str(body.get("only_status") or "approved")
|
|
if max_n < 1 or max_n > 50:
|
|
raise HTTPException(400, "max_out_of_range: 1..50")
|
|
|
|
if not gmail_is_configured():
|
|
return {"status": "blocked", "reason": "gmail_not_configured",
|
|
"next_action": "POST /api/v1/email/connect/gmail"}
|
|
|
|
sent: list[dict[str, Any]] = []
|
|
blocked: list[dict[str, Any]] = []
|
|
|
|
async with async_session_factory() as session:
|
|
try:
|
|
rows = (await session.execute(
|
|
select(OutreachQueueRecord).where(
|
|
OutreachQueueRecord.status == only_status,
|
|
OutreachQueueRecord.channel.in_(["email", "email_warm", "email_followup"]),
|
|
).limit(max_n)
|
|
)).scalars().all()
|
|
except Exception as exc: # noqa: BLE001
|
|
return {"status": "skipped_db_unreachable", "error": str(exc)}
|
|
|
|
for r in rows:
|
|
# Need to fetch contact email for the account
|
|
async with async_session_factory() as s2:
|
|
try:
|
|
contact = (await s2.execute(
|
|
select(ContactRecord).where(
|
|
ContactRecord.account_id == r.lead_id,
|
|
ContactRecord.email.is_not(None),
|
|
ContactRecord.opt_out == False, # noqa: E712
|
|
).limit(1)
|
|
)).scalar_one_or_none()
|
|
acc = (await s2.execute(
|
|
select(AccountRecord).where(AccountRecord.id == r.lead_id)
|
|
)).scalar_one_or_none()
|
|
except Exception as exc: # noqa: BLE001
|
|
blocked.append({"queue_id": r.id, "reason": f"db: {exc}"})
|
|
continue
|
|
|
|
if not contact or not contact.email:
|
|
blocked.append({"queue_id": r.id, "reason": "no_contact_email"})
|
|
continue
|
|
|
|
subject = f"Dealix — تجربة تأهيل عملاء لـ {(acc.company_name if acc else 'فريقكم')[:60]}"
|
|
|
|
send_result = await send_approved.__wrapped__({} if False else {
|
|
"to_email": contact.email,
|
|
"subject": subject,
|
|
"body_plain": r.message,
|
|
"account_id": r.lead_id,
|
|
"queue_id": r.id,
|
|
"sequence_step": 0,
|
|
}) if False else None # FastAPI doesn't expose __wrapped__; we re-call helper inline
|
|
|
|
# Inline send instead of recursive HTTP-style call
|
|
import asyncio as _asyncio
|
|
send_payload = {
|
|
"to_email": contact.email,
|
|
"subject": subject,
|
|
"body_plain": r.message,
|
|
"account_id": r.lead_id,
|
|
"queue_id": r.id,
|
|
"sequence_step": 0,
|
|
}
|
|
try:
|
|
send_result = await send_approved(send_payload)
|
|
except HTTPException as he:
|
|
blocked.append({"queue_id": r.id, "reason": f"http: {he.detail}"})
|
|
continue
|
|
|
|
if send_result.get("status") == "ok":
|
|
sent.append({"queue_id": r.id, "to": contact.email,
|
|
"gmail_message_id": send_result.get("gmail_message_id")})
|
|
else:
|
|
blocked.append({
|
|
"queue_id": r.id, "to": contact.email,
|
|
"status": send_result.get("status"),
|
|
"reasons": send_result.get("reasons") or [send_result.get("error")],
|
|
})
|
|
|
|
return {
|
|
"status": "ok",
|
|
"sent_count": len(sent),
|
|
"blocked_count": len(blocked),
|
|
"sent": sent,
|
|
"blocked": blocked,
|
|
"limits": {
|
|
"batch_size": get_batch_size(),
|
|
"daily_limit": get_daily_limit(),
|
|
},
|
|
}
|
|
|
|
|
|
@router.post("/replies/sync")
|
|
async def replies_sync(body: dict[str, Any] = Body(...)) -> dict[str, Any]:
|
|
"""
|
|
Manual reply ingestion endpoint (until Gmail Pub/Sub is wired).
|
|
Body:
|
|
to_email: original recipient (the prospect)
|
|
from_email: sender of the reply (their address)
|
|
subject: reply subject
|
|
text: reply body
|
|
original_send_log_id: optional
|
|
"""
|
|
text = str(body.get("text") or "").strip()
|
|
from_email = str(body.get("from_email") or "").strip().lower()
|
|
if not text or not from_email:
|
|
raise HTTPException(400, "from_email and text required")
|
|
|
|
classification = await classify_reply(text)
|
|
|
|
async with async_session_factory() as session:
|
|
try:
|
|
log_row = None
|
|
if body.get("original_send_log_id"):
|
|
log_row = (await session.execute(
|
|
select(EmailSendLog).where(EmailSendLog.id == body["original_send_log_id"])
|
|
)).scalar_one_or_none()
|
|
if log_row is None:
|
|
# Find by recipient = from_email of reply
|
|
log_row = (await session.execute(
|
|
select(EmailSendLog).where(
|
|
EmailSendLog.to_email == from_email,
|
|
EmailSendLog.status == "sent",
|
|
).order_by(EmailSendLog.sent_at.desc()).limit(1)
|
|
)).scalar_one_or_none()
|
|
if log_row:
|
|
log_row.status = "replied"
|
|
log_row.reply_classification = classification.category
|
|
log_row.reply_received_at = _utcnow()
|
|
|
|
# If unsubscribe → add to suppression
|
|
if classification.category == "unsubscribe":
|
|
session.add(SuppressionRecord(
|
|
id=_new_id("sup_"),
|
|
email=from_email, phone=None, domain=None,
|
|
reason="opt_out_via_reply",
|
|
))
|
|
await session.commit()
|
|
except Exception as exc: # noqa: BLE001
|
|
await session.rollback()
|
|
return {"status": "skipped_db_unreachable", "error": str(exc),
|
|
"classification": classification.to_dict()}
|
|
|
|
return {
|
|
"status": "ok",
|
|
"classification": classification.to_dict(),
|
|
"matched_send_log": getattr(log_row, "id", None) if log_row else None,
|
|
}
|