feat: Add Strategic Deals Engine — autonomous B2B deal-making system

Revolutionary AI system for autonomous B2B partnerships, negotiations, and deals:

Models (strategic_deal.py - 238 lines):
- CompanyProfile: Rich Saudi company profiles with CR, capabilities, needs
- StrategicDeal: Full deal lifecycle (discovery → negotiation → close)
- DealMatch: AI-generated company matches with scoring

Services (4 files, ~2,060 lines):
- company_profiler.py: Profile creation, AI enrichment, needs/capability analysis
- deal_matcher.py: 6-dimension scoring, semantic matching, barter chain discovery
- deal_negotiator.py: Multi-round Arabic negotiation with cultural awareness
- deal_agent.py: Autonomous outreach via WhatsApp/LinkedIn/Email

API (strategic_deals.py - 681 lines, 16 endpoints):
- Profile management + AI enrichment
- Match discovery + approval
- Deal lifecycle (create → negotiate → proposal → term sheet → close)
- Barter chain scanning
- Analytics dashboard

Deal types: partnership, distribution, franchise, JV, referral, acquisition, barter
Channels: WhatsApp (primary), LinkedIn, Email
Languages: Arabic (Saudi dialect) + English
Cultural: Saudi negotiation norms, relationship-first, face-saving

https://claude.ai/code/session_01LsnvBa7HwF5hs99VZbgLGj
This commit is contained in:
Claude 2026-04-11 09:15:29 +00:00
parent 06a4e5c79c
commit d7a5af9156
No known key found for this signature in database
10 changed files with 3011 additions and 0 deletions

View File

@ -17,6 +17,7 @@ from app.api.v1 import pipeline as pipeline_router
from app.api.v1 import agent_system as agent_system_router
from app.api.v1 import autonomous_foundation as autonomous_foundation_router
from app.api.v1 import hermes as hermes_router
from app.api.v1 import strategic_deals as strategic_deals_router
from app.api.v1 import marketing_hub as marketing_hub_router
from app.api.v1 import strategy_summary as strategy_summary_router
from app.api.v1 import value_proposition as value_proposition_router
@ -90,3 +91,6 @@ api_router.include_router(autonomous_foundation_router.router)
# ── Hermes Fusion — Orchestration Layer ──────────────────────
api_router.include_router(hermes_router.router)
# ── Strategic Deals — B2B Deal Discovery & Negotiation ───────
api_router.include_router(strategic_deals_router.router)

View File

@ -0,0 +1,681 @@
"""
Strategic Deals API B2B deal discovery, matching, negotiation, and outreach.
واجهة الصفقات الاستراتيجية: اكتشاف وتوفيق وتفاوض وتواصل الشراكات
"""
from datetime import datetime, timezone
from decimal import Decimal
from typing import Optional
from uuid import UUID
from fastapi import APIRouter, Depends, HTTPException, Query
from pydantic import BaseModel as Schema, Field
from sqlalchemy import select, func
from sqlalchemy.ext.asyncio import AsyncSession
from app.database import get_db
from app.api.deps import get_current_user
from app.models.user import User
from app.models.strategic_deal import (
CompanyProfile, StrategicDeal, DealMatch,
DealStatus, DealType, DealChannel, MatchStatus,
)
from app.services.strategic_deals.company_profiler import CompanyProfiler
from app.services.strategic_deals.deal_matcher import DealMatcher
from app.services.strategic_deals.deal_negotiator import DealNegotiator, NegotiationStrategy
from app.services.strategic_deals.deal_agent import DealAgent
router = APIRouter(prefix="/strategic-deals", tags=["Strategic Deals"])
# ── Pydantic Schemas ─────────────────────────────────────────────────────────
class ProfileCreate(Schema):
company_name: str
company_name_ar: Optional[str] = None
industry: Optional[str] = None
sub_industry: Optional[str] = None
cr_number: Optional[str] = None
city: Optional[str] = None
region: Optional[str] = None
employee_count: Optional[int] = None
annual_revenue_sar: Optional[float] = None
capabilities: list[str] = []
needs: list[str] = []
deal_preferences: dict = {}
website: Optional[str] = None
linkedin_url: Optional[str] = None
whatsapp_number: Optional[str] = None
class ProfileResponse(Schema):
id: UUID
tenant_id: UUID
company_name: str
company_name_ar: Optional[str] = None
industry: Optional[str] = None
sub_industry: Optional[str] = None
cr_number: Optional[str] = None
city: Optional[str] = None
region: Optional[str] = None
employee_count: Optional[float] = None
annual_revenue_sar: Optional[float] = None
capabilities: list = []
needs: list = []
deal_preferences: dict = {}
website: Optional[str] = None
linkedin_url: Optional[str] = None
whatsapp_number: Optional[str] = None
trust_score: float = 0.0
is_verified: bool = False
created_at: datetime
updated_at: Optional[datetime] = None
model_config = {"from_attributes": True}
class NeedsAnalysisRequest(Schema):
description: str = Field(..., description="وصف الاحتياجات بالعربي أو الإنجليزي")
class DealCreate(Schema):
initiator_profile_id: UUID
target_profile_id: Optional[UUID] = None
target_company_name: Optional[str] = None
target_contact_phone: Optional[str] = None
target_contact_email: Optional[str] = None
deal_type: str = "partnership"
deal_title: str
deal_title_ar: Optional[str] = None
our_offer: Optional[str] = None
our_need: Optional[str] = None
proposed_terms: dict = {}
estimated_value_sar: Optional[float] = None
channel: str = "whatsapp"
class DealResponse(Schema):
id: UUID
tenant_id: UUID
initiator_profile_id: UUID
target_profile_id: Optional[UUID] = None
target_company_name: Optional[str] = None
target_contact_phone: Optional[str] = None
target_contact_email: Optional[str] = None
deal_type: str
deal_title: str
deal_title_ar: Optional[str] = None
our_offer: Optional[str] = None
our_need: Optional[str] = None
proposed_terms: dict = {}
agreed_terms: dict = {}
estimated_value_sar: Optional[float] = None
status: str
channel: str
ai_confidence: float = 0.0
negotiation_history: list = []
notes: Optional[str] = None
notes_ar: Optional[str] = None
created_at: datetime
updated_at: Optional[datetime] = None
closed_at: Optional[datetime] = None
model_config = {"from_attributes": True}
class MatchResponse(Schema):
id: UUID
tenant_id: UUID
company_a_id: UUID
company_b_id: Optional[UUID] = None
company_b_name: Optional[str] = None
company_b_data: dict = {}
match_score: float = 0.0
match_reasons: list = []
deal_type_suggested: Optional[str] = None
terms_suggested: dict = {}
status: str
created_at: datetime
model_config = {"from_attributes": True}
class NegotiateRequest(Schema):
their_terms: Optional[dict] = None
message: Optional[str] = None
strategy: Optional[NegotiationStrategy] = None
class OutreachRequest(Schema):
channel: str = "whatsapp"
style: str = "as_company"
class DiscoveryScanRequest(Schema):
profile_id: UUID
deal_type: Optional[str] = None
class BarterScanRequest(Schema):
profile_id: UUID
# ── Profile Endpoints ────────────────────────────────────────────────────────
@router.post("/profiles", response_model=ProfileResponse, status_code=201)
async def create_profile(
data: ProfileCreate,
current_user: User = Depends(get_current_user),
db: AsyncSession = Depends(get_db),
):
"""Create a company profile for B2B matching. | إنشاء ملف شركة للمطابقة"""
profiler = CompanyProfiler()
profile = await profiler.create_profile(
company_data=data.model_dump(),
tenant_id=current_user.tenant_id,
db=db,
)
return ProfileResponse.model_validate(profile)
@router.put("/profiles/{profile_id}/enrich", response_model=ProfileResponse)
async def enrich_profile(
profile_id: UUID,
current_user: User = Depends(get_current_user),
db: AsyncSession = Depends(get_db),
):
"""AI-enrich a company profile. | إثراء ملف الشركة بالذكاء الاصطناعي"""
result = await db.execute(
select(CompanyProfile).where(
CompanyProfile.id == profile_id,
CompanyProfile.tenant_id == current_user.tenant_id,
)
)
if not result.scalar_one_or_none():
raise HTTPException(status_code=404, detail="الملف غير موجود | Profile not found")
profiler = CompanyProfiler()
profile = await profiler.enrich_profile(profile_id, db)
return ProfileResponse.model_validate(profile)
@router.post("/profiles/{profile_id}/analyze-needs")
async def analyze_needs(
profile_id: UUID,
data: NeedsAnalysisRequest,
current_user: User = Depends(get_current_user),
db: AsyncSession = Depends(get_db),
):
"""Analyze what a company needs (Arabic input). | تحليل احتياجات الشركة"""
result = await db.execute(
select(CompanyProfile).where(
CompanyProfile.id == profile_id,
CompanyProfile.tenant_id == current_user.tenant_id,
)
)
if not result.scalar_one_or_none():
raise HTTPException(status_code=404, detail="الملف غير موجود | Profile not found")
profiler = CompanyProfiler()
analysis = await profiler.analyze_needs(profile_id, data.description, db)
return {"status": "ok", "analysis": analysis}
# ── Match Endpoints ──────────────────────────────────────────────────────────
@router.get("/matches", response_model=list[MatchResponse])
async def list_matches(
profile_id: UUID = Query(None, description="Filter by company profile"),
status: str = Query(None, description="Filter by match status"),
min_score: float = Query(None, ge=0, le=1, description="Minimum match score"),
page: int = Query(1, ge=1),
per_page: int = Query(20, ge=1, le=100),
current_user: User = Depends(get_current_user),
db: AsyncSession = Depends(get_db),
):
"""Get AI-suggested matches. | عرض المطابقات المقترحة بالذكاء الاصطناعي"""
query = select(DealMatch).where(DealMatch.tenant_id == current_user.tenant_id)
if profile_id:
query = query.where(
(DealMatch.company_a_id == profile_id) | (DealMatch.company_b_id == profile_id)
)
if status:
query = query.where(DealMatch.status == status)
if min_score is not None:
query = query.where(DealMatch.match_score >= min_score)
query = query.order_by(DealMatch.match_score.desc()).offset((page - 1) * per_page).limit(per_page)
result = await db.execute(query)
return [MatchResponse.model_validate(m) for m in result.scalars().all()]
@router.post("/matches/{match_id}/approve", response_model=MatchResponse)
async def approve_match(
match_id: UUID,
current_user: User = Depends(get_current_user),
db: AsyncSession = Depends(get_db),
):
"""Approve a match for outreach. | الموافقة على مطابقة للتواصل"""
result = await db.execute(
select(DealMatch).where(
DealMatch.id == match_id,
DealMatch.tenant_id == current_user.tenant_id,
)
)
match = result.scalar_one_or_none()
if not match:
raise HTTPException(status_code=404, detail="المطابقة غير موجودة | Match not found")
if match.status != MatchStatus.SUGGESTED.value:
raise HTTPException(status_code=400, detail="المطابقة تمت الموافقة عليها مسبقاً | Match already processed")
match.status = MatchStatus.APPROVED.value
await db.flush()
await db.refresh(match)
return MatchResponse.model_validate(match)
# ── Discovery Scan ───────────────────────────────────────────────────────────
@router.post("/scan", response_model=list[MatchResponse])
async def run_discovery_scan(
data: DiscoveryScanRequest,
current_user: User = Depends(get_current_user),
db: AsyncSession = Depends(get_db),
):
"""Run a full AI discovery scan for partners. | تشغيل فحص اكتشاف شامل"""
result = await db.execute(
select(CompanyProfile).where(
CompanyProfile.id == data.profile_id,
CompanyProfile.tenant_id == current_user.tenant_id,
)
)
if not result.scalar_one_or_none():
raise HTTPException(status_code=404, detail="الملف غير موجود | Profile not found")
agent = DealAgent()
matches = await agent.run_discovery_scan(data.profile_id, data.deal_type, db)
return [MatchResponse.model_validate(m) for m in matches]
# ── Deal CRUD ────────────────────────────────────────────────────────────────
@router.post("", response_model=DealResponse, status_code=201)
async def create_deal(
data: DealCreate,
current_user: User = Depends(get_current_user),
db: AsyncSession = Depends(get_db),
):
"""Create a strategic deal. | إنشاء صفقة استراتيجية"""
# Verify initiator profile belongs to tenant
init_result = await db.execute(
select(CompanyProfile).where(
CompanyProfile.id == data.initiator_profile_id,
CompanyProfile.tenant_id == current_user.tenant_id,
)
)
if not init_result.scalar_one_or_none():
raise HTTPException(status_code=404, detail="ملف المبادر غير موجود | Initiator profile not found")
deal = StrategicDeal(
tenant_id=current_user.tenant_id,
initiator_profile_id=data.initiator_profile_id,
target_profile_id=data.target_profile_id,
target_company_name=data.target_company_name,
target_contact_phone=data.target_contact_phone,
target_contact_email=data.target_contact_email,
deal_type=data.deal_type,
deal_title=data.deal_title,
deal_title_ar=data.deal_title_ar,
our_offer=data.our_offer,
our_need=data.our_need,
proposed_terms=data.proposed_terms,
estimated_value_sar=Decimal(str(data.estimated_value_sar)) if data.estimated_value_sar else None,
status=DealStatus.DISCOVERY.value,
channel=data.channel,
ai_confidence=0.0,
negotiation_history=[],
)
db.add(deal)
await db.flush()
await db.refresh(deal)
return DealResponse.model_validate(deal)
@router.get("", response_model=list[DealResponse])
async def list_deals(
status: str = Query(None),
deal_type: str = Query(None),
profile_id: UUID = Query(None, description="Filter by initiator or target profile"),
page: int = Query(1, ge=1),
per_page: int = Query(20, ge=1, le=100),
current_user: User = Depends(get_current_user),
db: AsyncSession = Depends(get_db),
):
"""List strategic deals with filters. | عرض الصفقات الاستراتيجية"""
query = select(StrategicDeal).where(StrategicDeal.tenant_id == current_user.tenant_id)
if status:
query = query.where(StrategicDeal.status == status)
if deal_type:
query = query.where(StrategicDeal.deal_type == deal_type)
if profile_id:
query = query.where(
(StrategicDeal.initiator_profile_id == profile_id)
| (StrategicDeal.target_profile_id == profile_id)
)
query = query.order_by(StrategicDeal.created_at.desc()).offset((page - 1) * per_page).limit(per_page)
result = await db.execute(query)
return [DealResponse.model_validate(d) for d in result.scalars().all()]
@router.get("/{deal_id}", response_model=DealResponse)
async def get_deal(
deal_id: UUID,
current_user: User = Depends(get_current_user),
db: AsyncSession = Depends(get_db),
):
"""Get deal details with negotiation history. | تفاصيل الصفقة مع سجل التفاوض"""
result = await db.execute(
select(StrategicDeal).where(
StrategicDeal.id == deal_id,
StrategicDeal.tenant_id == current_user.tenant_id,
)
)
deal = result.scalar_one_or_none()
if not deal:
raise HTTPException(status_code=404, detail="الصفقة غير موجودة | Deal not found")
return DealResponse.model_validate(deal)
# ── Negotiation ──────────────────────────────────────────────────────────────
@router.put("/{deal_id}/negotiate")
async def negotiate_deal(
deal_id: UUID,
data: NegotiateRequest,
current_user: User = Depends(get_current_user),
db: AsyncSession = Depends(get_db),
):
"""Submit terms, counter-offer, or free-text message. | تقديم شروط أو عرض مضاد"""
result = await db.execute(
select(StrategicDeal).where(
StrategicDeal.id == deal_id,
StrategicDeal.tenant_id == current_user.tenant_id,
)
)
deal = result.scalar_one_or_none()
if not deal:
raise HTTPException(status_code=404, detail="الصفقة غير موجودة | Deal not found")
negotiator = DealNegotiator()
# Check if we should escalate
should_escalate = await negotiator.should_escalate(deal_id, db)
if should_escalate:
return {
"status": "escalation_required",
"message_ar": "هذه الصفقة تحتاج تدخل بشري. يرجى التواصل مع مدير الحساب.",
"message_en": "This deal requires human intervention. Please contact the account manager.",
}
# Start new negotiation
if data.strategy and not deal.negotiation_history:
round_data = await negotiator.start_negotiation(deal_id, data.strategy, db)
return {
"status": "negotiation_started",
"round": round_data.round_number,
"action": round_data.action,
"our_terms": round_data.our_terms,
"message_ar": round_data.message_ar,
"message_en": round_data.message_en,
"confidence": round_data.confidence,
}
# Handle counter-offer
if data.their_terms:
round_data = await negotiator.handle_counter_offer(deal_id, data.their_terms, db)
return {
"status": "counter_processed",
"round": round_data.round_number,
"action": round_data.action,
"our_terms": round_data.our_terms,
"message_ar": round_data.message_ar,
"message_en": round_data.message_en,
"within_range": round_data.within_range,
"confidence": round_data.confidence,
}
# Handle free-text message
if data.message:
response = await negotiator.generate_response(deal_id, data.message, db)
return {
"status": "response_generated",
"response": response,
}
raise HTTPException(
status_code=400,
detail="يرجى تقديم شروط أو رسالة أو استراتيجية | Provide terms, message, or strategy",
)
# ── Outreach ─────────────────────────────────────────────────────────────────
@router.post("/{deal_id}/outreach")
async def send_outreach(
deal_id: UUID,
data: OutreachRequest,
current_user: User = Depends(get_current_user),
db: AsyncSession = Depends(get_db),
):
"""Send outreach via channel. | إرسال تواصل عبر قناة"""
# Find the deal and verify ownership
deal_result = await db.execute(
select(StrategicDeal).where(
StrategicDeal.id == deal_id,
StrategicDeal.tenant_id == current_user.tenant_id,
)
)
deal = deal_result.scalar_one_or_none()
if not deal:
raise HTTPException(status_code=404, detail="الصفقة غير موجودة | Deal not found")
# Find or create a match for this deal to run outreach
match_result = await db.execute(
select(DealMatch).where(
DealMatch.company_a_id == deal.initiator_profile_id,
DealMatch.tenant_id == current_user.tenant_id,
).order_by(DealMatch.match_score.desc()).limit(1)
)
match = match_result.scalar_one_or_none()
if not match:
# Create a placeholder match for outreach
match = DealMatch(
tenant_id=current_user.tenant_id,
company_a_id=deal.initiator_profile_id,
company_b_id=deal.target_profile_id,
company_b_name=deal.target_company_name,
match_score=deal.ai_confidence or 0.5,
match_reasons=["تواصل مباشر من المستخدم"],
deal_type_suggested=deal.deal_type,
status=MatchStatus.APPROVED.value,
)
db.add(match)
await db.flush()
agent = DealAgent()
result = await agent.run_outreach_campaign(match.id, data.channel, db)
return {
"status": "sent" if result.success else "failed",
"channel": result.channel,
"message_sent": result.message_sent,
"next_action_ar": result.next_action_ar,
"error": result.error,
}
# ── Proposal & Term Sheet ───────────────────────────────────────────────────
@router.post("/{deal_id}/proposal")
async def generate_proposal(
deal_id: UUID,
current_user: User = Depends(get_current_user),
db: AsyncSession = Depends(get_db),
):
"""Generate an Arabic business proposal. | إنشاء مقترح أعمال بالعربي"""
result = await db.execute(
select(StrategicDeal).where(
StrategicDeal.id == deal_id,
StrategicDeal.tenant_id == current_user.tenant_id,
)
)
if not result.scalar_one_or_none():
raise HTTPException(status_code=404, detail="الصفقة غير موجودة | Deal not found")
agent = DealAgent()
proposal = await agent.generate_proposal(deal_id, db)
return {"status": "ok", "proposal": proposal}
@router.post("/{deal_id}/term-sheet")
async def generate_term_sheet(
deal_id: UUID,
current_user: User = Depends(get_current_user),
db: AsyncSession = Depends(get_db),
):
"""Generate an Arabic term sheet. | إنشاء ورقة شروط بالعربي"""
result = await db.execute(
select(StrategicDeal).where(
StrategicDeal.id == deal_id,
StrategicDeal.tenant_id == current_user.tenant_id,
)
)
if not result.scalar_one_or_none():
raise HTTPException(status_code=404, detail="الصفقة غير موجودة | Deal not found")
negotiator = DealNegotiator()
term_sheet = await negotiator.generate_term_sheet(deal_id, db)
return {"status": "ok", "term_sheet": term_sheet}
# ── Analytics ────────────────────────────────────────────────────────────────
@router.get("/analytics/overview")
async def deal_analytics(
current_user: User = Depends(get_current_user),
db: AsyncSession = Depends(get_db),
):
"""Deal flow analytics: match rate, close rate, avg deal value. | تحليلات الصفقات"""
tenant_id = current_user.tenant_id
# Total deals
total_q = select(func.count()).select_from(StrategicDeal).where(
StrategicDeal.tenant_id == tenant_id,
)
total_deals = (await db.execute(total_q)).scalar() or 0
# By status
status_q = select(
StrategicDeal.status, func.count()
).where(
StrategicDeal.tenant_id == tenant_id,
).group_by(StrategicDeal.status)
status_rows = (await db.execute(status_q)).all()
by_status = {row[0]: row[1] for row in status_rows}
won = by_status.get(DealStatus.CLOSED_WON.value, 0)
lost = by_status.get(DealStatus.CLOSED_LOST.value, 0)
closed = won + lost
close_rate = (won / closed * 100) if closed > 0 else 0.0
# Average deal value (closed won)
avg_val_q = select(func.avg(StrategicDeal.estimated_value_sar)).where(
StrategicDeal.tenant_id == tenant_id,
StrategicDeal.status == DealStatus.CLOSED_WON.value,
)
avg_value = (await db.execute(avg_val_q)).scalar()
avg_value_float = float(avg_value) if avg_value else 0.0
# Total matches and conversion
total_matches_q = select(func.count()).select_from(DealMatch).where(
DealMatch.tenant_id == tenant_id,
)
total_matches = (await db.execute(total_matches_q)).scalar() or 0
converted_q = select(func.count()).select_from(DealMatch).where(
DealMatch.tenant_id == tenant_id,
DealMatch.status == MatchStatus.CONVERTED.value,
)
converted_matches = (await db.execute(converted_q)).scalar() or 0
match_rate = (converted_matches / total_matches * 100) if total_matches > 0 else 0.0
# By deal type
type_q = select(
StrategicDeal.deal_type, func.count()
).where(
StrategicDeal.tenant_id == tenant_id,
).group_by(StrategicDeal.deal_type)
type_rows = (await db.execute(type_q)).all()
by_type = {row[0]: row[1] for row in type_rows}
return {
"total_deals": total_deals,
"by_status": by_status,
"close_rate_percent": round(close_rate, 1),
"avg_deal_value_sar": round(avg_value_float, 2),
"total_matches": total_matches,
"converted_matches": converted_matches,
"match_conversion_rate_percent": round(match_rate, 1),
"by_deal_type": by_type,
"labels_ar": {
"total_deals": "إجمالي الصفقات",
"close_rate": "نسبة الإغلاق",
"avg_value": "متوسط قيمة الصفقة",
"match_rate": "نسبة تحول المطابقات",
},
}
# ── Barter Scan ──────────────────────────────────────────────────────────────
@router.post("/barter-scan")
async def barter_scan(
data: BarterScanRequest,
current_user: User = Depends(get_current_user),
db: AsyncSession = Depends(get_db),
):
"""Find multi-party barter opportunities. | اكتشاف فرص المقايضة المتعددة"""
result = await db.execute(
select(CompanyProfile).where(
CompanyProfile.id == data.profile_id,
CompanyProfile.tenant_id == current_user.tenant_id,
)
)
if not result.scalar_one_or_none():
raise HTTPException(status_code=404, detail="الملف غير موجود | Profile not found")
matcher = DealMatcher()
chains = await matcher.find_barter_chains(data.profile_id, db)
return {
"status": "ok",
"chains_found": len(chains),
"chains": chains,
"summary_ar": (
f"تم العثور على {len(chains)} سلسلة مقايضة محتملة"
if chains
else "لم يتم العثور على فرص مقايضة. حاول إضافة المزيد من القدرات والاحتياجات في ملفك."
),
}

View File

@ -25,6 +25,7 @@ from app.models.knowledge import KnowledgeArticle, SectorAsset
from app.models.advanced import TrustScore, Prospect, Scorecard, AIRehearsal
from app.models.consent import PDPLConsent, PDPLConsentAudit, DataRequest
from app.models.sequence import Sequence, SequenceStep, SequenceEnrollment, SequenceEvent
from app.models.strategic_deal import CompanyProfile, StrategicDeal, DealMatch
__all__ = [
"BaseModel", "TenantModel", "Tenant", "User", "Lead", "Customer",
@ -39,4 +40,5 @@ __all__ = [
"TrustScore", "Prospect", "Scorecard", "AIRehearsal",
"PDPLConsent", "PDPLConsentAudit", "DataRequest",
"Sequence", "SequenceStep", "SequenceEnrollment", "SequenceEvent",
"CompanyProfile", "StrategicDeal", "DealMatch",
]

View File

@ -0,0 +1,238 @@
"""
Strategic Deal Models B2B deal discovery, matching, and negotiation.
نماذج الصفقات الاستراتيجية: اكتشاف وتوفيق وتفاوض الشراكات بين الشركات
"""
import enum
from datetime import datetime, timezone
from sqlalchemy import Column, String, Text, DateTime, Boolean, Float, Numeric, ForeignKey, Index
from sqlalchemy.orm import relationship
from app.models.base import TenantModel
from app.models.compat import UUID, JSONB, default_uuid
# ── Enums ────────────────────────────────────────────────────────────────────
class DealType(str, enum.Enum):
PARTNERSHIP = "partnership"
DISTRIBUTION = "distribution"
FRANCHISE = "franchise"
JOINT_VENTURE = "jv"
REFERRAL = "referral"
ACQUISITION = "acquisition"
BARTER = "barter"
class DealStatus(str, enum.Enum):
DISCOVERY = "discovery"
OUTREACH = "outreach"
NEGOTIATING = "negotiating"
TERM_SHEET = "term_sheet"
DUE_DILIGENCE = "due_diligence"
CLOSED_WON = "closed_won"
CLOSED_LOST = "closed_lost"
class DealChannel(str, enum.Enum):
WHATSAPP = "whatsapp"
LINKEDIN = "linkedin"
EMAIL = "email"
IN_PERSON = "in_person"
class MatchStatus(str, enum.Enum):
SUGGESTED = "suggested"
APPROVED = "approved"
OUTREACH_SENT = "outreach_sent"
IN_PROGRESS = "in_progress"
CONVERTED = "converted"
REJECTED = "rejected"
# ── Company Profile ──────────────────────────────────────────────────────────
class CompanyProfile(TenantModel):
"""
Rich company profile for B2B matching.
ملف الشركة الغني للمطابقة بين الشركات
"""
__tablename__ = "company_profiles"
__table_args__ = (
Index("ix_company_profiles_industry", "industry"),
Index("ix_company_profiles_region", "region"),
Index("ix_company_profiles_verified", "is_verified"),
)
company_name = Column(String(255), nullable=False, index=True)
company_name_ar = Column(String(255), nullable=True)
# Industry classification (ISIC codes)
industry = Column(String(100), nullable=True)
sub_industry = Column(String(100), nullable=True)
# Saudi Commercial Registration
cr_number = Column(String(20), nullable=True, unique=True)
# Location
city = Column(String(100), nullable=True)
region = Column(String(100), nullable=True) # Saudi administrative regions
# Size indicators
employee_count = Column(Numeric(10, 0), nullable=True)
annual_revenue_sar = Column(Numeric(15, 2), nullable=True)
# AI-enriched capability/need vectors (JSONB arrays)
capabilities = Column(JSONB, default=list) # What this company can offer
needs = Column(JSONB, default=list) # What this company needs
# Deal preferences: partnership, acquisition, distribution, referral, barter weights
deal_preferences = Column(JSONB, default=dict)
# Contact & web
website = Column(String(500), nullable=True)
linkedin_url = Column(String(500), nullable=True)
whatsapp_number = Column(String(20), nullable=True)
# Trust & verification
trust_score = Column(Float, default=0.0) # 0-1 from KYB verification
is_verified = Column(Boolean, default=False)
# Relationships
initiated_deals = relationship(
"StrategicDeal",
back_populates="initiator_profile",
foreign_keys="StrategicDeal.initiator_profile_id",
)
targeted_deals = relationship(
"StrategicDeal",
back_populates="target_profile",
foreign_keys="StrategicDeal.target_profile_id",
)
matches_as_a = relationship(
"DealMatch",
back_populates="company_a",
foreign_keys="DealMatch.company_a_id",
)
matches_as_b = relationship(
"DealMatch",
back_populates="company_b",
foreign_keys="DealMatch.company_b_id",
)
# ── Strategic Deal ───────────────────────────────────────────────────────────
class StrategicDeal(TenantModel):
"""
A B2B deal between two companies.
صفقة بين شركتين
"""
__tablename__ = "strategic_deals"
__table_args__ = (
Index("ix_strategic_deals_status", "status"),
Index("ix_strategic_deals_type", "deal_type"),
)
# Parties
initiator_profile_id = Column(
UUID(as_uuid=True), ForeignKey("company_profiles.id"), nullable=False, index=True,
)
target_profile_id = Column(
UUID(as_uuid=True), ForeignKey("company_profiles.id"), nullable=True, index=True,
)
# Target info (when profile doesn't exist yet)
target_company_name = Column(String(255), nullable=True)
target_contact_phone = Column(String(20), nullable=True)
target_contact_email = Column(String(255), nullable=True)
# Deal classification
deal_type = Column(String(30), default=DealType.PARTNERSHIP.value)
deal_title = Column(String(500), nullable=False)
deal_title_ar = Column(String(500), nullable=True)
# Value proposition
our_offer = Column(Text, nullable=True) # What we're offering
our_need = Column(Text, nullable=True) # What we need from them
# Terms
proposed_terms = Column(JSONB, default=dict) # equity_split, revenue_share, territory, exclusivity
agreed_terms = Column(JSONB, default=dict) # Final agreed terms
estimated_value_sar = Column(Numeric(15, 2), nullable=True)
# Status & channel
status = Column(String(30), default=DealStatus.DISCOVERY.value)
channel = Column(String(20), default=DealChannel.WHATSAPP.value)
# AI signals
ai_confidence = Column(Float, default=0.0) # 0-1
# Negotiation audit trail
negotiation_history = Column(JSONB, default=list) # list of round dicts
# Notes
notes = Column(Text, nullable=True)
notes_ar = Column(Text, nullable=True)
closed_at = Column(DateTime(timezone=True), nullable=True)
# Relationships
initiator_profile = relationship(
"CompanyProfile", back_populates="initiated_deals",
foreign_keys=[initiator_profile_id],
)
target_profile = relationship(
"CompanyProfile", back_populates="targeted_deals",
foreign_keys=[target_profile_id],
)
# ── Deal Match ───────────────────────────────────────────────────────────────
class DealMatch(TenantModel):
"""
AI-generated match between two companies.
مطابقة بالذكاء الاصطناعي بين شركتين
"""
__tablename__ = "deal_matches"
__table_args__ = (
Index("ix_deal_matches_score", "match_score"),
Index("ix_deal_matches_status", "status"),
)
company_a_id = Column(
UUID(as_uuid=True), ForeignKey("company_profiles.id"), nullable=False, index=True,
)
company_b_id = Column(
UUID(as_uuid=True), ForeignKey("company_profiles.id"), nullable=True, index=True,
)
# External company data (when company_b has no profile)
company_b_name = Column(String(255), nullable=True)
company_b_data = Column(JSONB, default=dict)
# Scoring
match_score = Column(Float, default=0.0) # 0-1
match_reasons = Column(JSONB, default=list) # Arabic explanations
# AI suggestions
deal_type_suggested = Column(String(30), nullable=True)
terms_suggested = Column(JSONB, default=dict)
# Status
status = Column(String(30), default=MatchStatus.SUGGESTED.value)
# Relationships
company_a = relationship(
"CompanyProfile", back_populates="matches_as_a", foreign_keys=[company_a_id],
)
company_b = relationship(
"CompanyProfile", back_populates="matches_as_b", foreign_keys=[company_b_id],
)

View File

@ -29,6 +29,9 @@ from app.services.memory_engine import (
create_memory_adapter,
)
from app.services.session_continuity import SessionContinuity, session_continuity
from app.services.strategic_deals import (
CompanyProfiler, DealMatcher, DealNegotiator, NegotiationStrategy, DealAgent,
)
__all__ = [
"AuthService",
@ -65,4 +68,9 @@ __all__ = [
"create_memory_adapter",
"SessionContinuity",
"session_continuity",
"CompanyProfiler",
"DealMatcher",
"DealNegotiator",
"NegotiationStrategy",
"DealAgent",
]

View File

@ -0,0 +1,17 @@
"""
Dealix Strategic Deals Engine
محرك الصفقات الاستراتيجية اكتشاف وتفاوض وإغلاق شراكات B2B بالذكاء الاصطناعي
"""
from app.services.strategic_deals.company_profiler import CompanyProfiler
from app.services.strategic_deals.deal_matcher import DealMatcher
from app.services.strategic_deals.deal_negotiator import DealNegotiator, NegotiationStrategy
from app.services.strategic_deals.deal_agent import DealAgent
__all__ = [
"CompanyProfiler",
"DealMatcher",
"DealNegotiator",
"NegotiationStrategy",
"DealAgent",
]

View File

@ -0,0 +1,414 @@
"""
Company Profiler Builds rich company profiles for B2B matching.
محلل الشركات: يبني ملفات شركات غنية للمطابقة بين الشركات
"""
import json
import logging
from typing import Optional
from datetime import datetime, timezone
from sqlalchemy import select, func
from sqlalchemy.ext.asyncio import AsyncSession
from app.models.strategic_deal import CompanyProfile
from app.services.llm.provider import get_llm
logger = logging.getLogger("dealix.strategic_deals.profiler")
# ── ISIC industry mapping (common Saudi sectors) ─────────────────────────────
SAUDI_INDUSTRIES = {
"construction": {"ar": "مقاولات وبناء", "isic": "F"},
"real_estate": {"ar": "عقارات", "isic": "L"},
"retail": {"ar": "تجارة تجزئة", "isic": "G"},
"wholesale": {"ar": "تجارة جملة", "isic": "G"},
"technology": {"ar": "تقنية معلومات", "isic": "J"},
"manufacturing": {"ar": "صناعة", "isic": "C"},
"healthcare": {"ar": "رعاية صحية", "isic": "Q"},
"education": {"ar": "تعليم وتدريب", "isic": "P"},
"food_beverage": {"ar": "أغذية ومشروبات", "isic": "I"},
"logistics": {"ar": "نقل ولوجستيات", "isic": "H"},
"finance": {"ar": "خدمات مالية", "isic": "K"},
"energy": {"ar": "طاقة", "isic": "D"},
"tourism": {"ar": "سياحة وضيافة", "isic": "I"},
"consulting": {"ar": "استشارات", "isic": "M"},
"marketing": {"ar": "تسويق وإعلان", "isic": "M"},
"agriculture": {"ar": "زراعة", "isic": "A"},
"telecom": {"ar": "اتصالات", "isic": "J"},
"media": {"ar": "إعلام وترفيه", "isic": "R"},
"automotive": {"ar": "سيارات", "isic": "G"},
"government": {"ar": "قطاع حكومي", "isic": "O"},
}
SAUDI_REGIONS = [
"الرياض", "مكة المكرمة", "المنطقة الشرقية", "المدينة المنورة",
"القصيم", "عسير", "تبوك", "حائل", "الحدود الشمالية",
"جازان", "نجران", "الباحة", "الجوف",
]
class CompanyProfiler:
"""
Builds, enriches, and scores company profiles for strategic B2B matching.
يبني ملفات الشركات ويثريها ويقيمها للمطابقة الاستراتيجية
"""
def __init__(self):
self.llm = get_llm()
# ── Create Profile ───────────────────────────────────────────────────────
async def create_profile(
self,
company_data: dict,
tenant_id,
db: AsyncSession,
) -> CompanyProfile:
"""
Create a company profile from user input.
إنشاء ملف شركة من بيانات المستخدم
"""
profile = CompanyProfile(
tenant_id=tenant_id,
company_name=company_data["company_name"],
company_name_ar=company_data.get("company_name_ar"),
industry=company_data.get("industry"),
sub_industry=company_data.get("sub_industry"),
cr_number=company_data.get("cr_number"),
city=company_data.get("city"),
region=company_data.get("region"),
employee_count=company_data.get("employee_count"),
annual_revenue_sar=company_data.get("annual_revenue_sar"),
capabilities=company_data.get("capabilities", []),
needs=company_data.get("needs", []),
deal_preferences=company_data.get("deal_preferences", {}),
website=company_data.get("website"),
linkedin_url=company_data.get("linkedin_url"),
whatsapp_number=company_data.get("whatsapp_number"),
trust_score=0.0,
is_verified=False,
)
db.add(profile)
await db.flush()
await db.refresh(profile)
logger.info("Created company profile %s for tenant %s", profile.id, tenant_id)
return profile
# ── Enrich Profile with AI ───────────────────────────────────────────────
async def enrich_profile(
self,
profile_id,
db: AsyncSession,
) -> CompanyProfile:
"""
Use LLM to enrich a company profile: analyze website, detect industry,
extract capabilities, identify needs, estimate company size.
إثراء ملف الشركة بالذكاء الاصطناعي
"""
result = await db.execute(select(CompanyProfile).where(CompanyProfile.id == profile_id))
profile = result.scalar_one_or_none()
if not profile:
raise ValueError(f"Profile {profile_id} not found")
# Build context from available data
context_parts = [
f"Company: {profile.company_name}",
]
if profile.company_name_ar:
context_parts.append(f"Arabic name: {profile.company_name_ar}")
if profile.website:
context_parts.append(f"Website: {profile.website}")
if profile.industry:
context_parts.append(f"Industry: {profile.industry}")
if profile.city:
context_parts.append(f"City: {profile.city}")
if profile.cr_number:
context_parts.append(f"CR Number: {profile.cr_number}")
if profile.capabilities:
context_parts.append(f"Known capabilities: {', '.join(profile.capabilities)}")
company_context = "\n".join(context_parts)
system_prompt = """أنت محلل شركات سعودي متخصص. حلل بيانات الشركة التالية وأعد تقريراً مفصلاً بصيغة JSON.
You are a Saudi company analyst. Analyze the following company data and return a detailed JSON report.
Return JSON with these fields:
{
"industry": "industry code from the list",
"sub_industry": "specific sub-industry",
"capabilities": ["list of what this company can offer to partners"],
"needs": ["list of what this company likely needs from partners"],
"estimated_employee_range": "micro/small/medium/large",
"deal_preferences": {
"partnership": 0.0-1.0,
"distribution": 0.0-1.0,
"franchise": 0.0-1.0,
"jv": 0.0-1.0,
"referral": 0.0-1.0,
"acquisition": 0.0-1.0,
"barter": 0.0-1.0
},
"enrichment_notes_ar": "ملاحظات الإثراء بالعربي"
}
Available industries: """ + ", ".join(SAUDI_INDUSTRIES.keys())
llm_response = await self.llm.complete(
system_prompt=system_prompt,
user_message=company_context,
json_mode=True,
temperature=0.3,
)
enrichment = llm_response.parse_json()
if enrichment:
if enrichment.get("industry") and not profile.industry:
profile.industry = enrichment["industry"]
if enrichment.get("sub_industry") and not profile.sub_industry:
profile.sub_industry = enrichment["sub_industry"]
if enrichment.get("capabilities"):
existing_caps = set(profile.capabilities or [])
new_caps = [c for c in enrichment["capabilities"] if c not in existing_caps]
profile.capabilities = list(existing_caps) + new_caps
if enrichment.get("needs"):
existing_needs = set(profile.needs or [])
new_needs = [n for n in enrichment["needs"] if n not in existing_needs]
profile.needs = list(existing_needs) + new_needs
if enrichment.get("deal_preferences") and not profile.deal_preferences:
profile.deal_preferences = enrichment["deal_preferences"]
await db.flush()
await db.refresh(profile)
logger.info("Enriched profile %s with AI analysis", profile_id)
return profile
# ── Analyze Needs (Arabic Input) ─────────────────────────────────────────
async def analyze_needs(
self,
profile_id,
user_description: str,
db: AsyncSession,
) -> dict:
"""
User describes needs in Arabic free-text. AI extracts structured needs.
المستخدم يصف احتياجاته بالعربي والذكاء الاصطناعي يستخرج البيانات المهيكلة
"""
result = await db.execute(select(CompanyProfile).where(CompanyProfile.id == profile_id))
profile = result.scalar_one_or_none()
if not profile:
raise ValueError(f"Profile {profile_id} not found")
system_prompt = """أنت مستشار أعمال سعودي. المستخدم يصف احتياجاته بالعربي.
استخرج المعلومات التالية وأعدها بصيغة JSON:
{
"deal_type": "partnership/distribution/franchise/jv/referral/acquisition/barter",
"specific_needs": ["قائمة الاحتياجات المحددة"],
"budget_range_sar": {"min": 0, "max": 0},
"timeline": "فوري/1-3 أشهر/3-6 أشهر/6-12 شهر/أكثر من سنة",
"priorities": ["الأولوية الأولى", "الأولوية الثانية"],
"ideal_partner_profile": "وصف الشريك المثالي",
"deal_breakers": ["الأمور التي لا يمكن التنازل عنها"],
"summary_ar": "ملخص الاحتياجات بالعربي"
}
Company context:
- Name: """ + profile.company_name + """
- Industry: """ + (profile.industry or "unknown") + """
- City: """ + (profile.city or "unknown")
llm_response = await self.llm.complete(
system_prompt=system_prompt,
user_message=user_description,
json_mode=True,
temperature=0.2,
)
analysis = llm_response.parse_json() or {}
# Persist extracted needs onto the profile
if analysis.get("specific_needs"):
existing = set(profile.needs or [])
for need in analysis["specific_needs"]:
existing.add(need)
profile.needs = list(existing)
if analysis.get("deal_type") and not profile.deal_preferences:
profile.deal_preferences = {analysis["deal_type"]: 1.0}
await db.flush()
logger.info("Analyzed needs for profile %s: %s", profile_id, analysis.get("summary_ar", ""))
return analysis
# ── Analyze Capabilities ─────────────────────────────────────────────────
async def analyze_capabilities(
self,
profile_id,
db: AsyncSession,
) -> dict:
"""
Analyze what the company can offer to partners.
تحليل ما يمكن للشركة تقديمه للشركاء
"""
result = await db.execute(select(CompanyProfile).where(CompanyProfile.id == profile_id))
profile = result.scalar_one_or_none()
if not profile:
raise ValueError(f"Profile {profile_id} not found")
context_parts = [
f"Company: {profile.company_name}",
f"Industry: {profile.industry or 'unknown'}",
f"Sub-industry: {profile.sub_industry or 'unknown'}",
f"City: {profile.city or 'unknown'}",
f"Employees: {profile.employee_count or 'unknown'}",
f"Revenue (SAR): {profile.annual_revenue_sar or 'unknown'}",
]
if profile.capabilities:
context_parts.append(f"Known capabilities: {', '.join(profile.capabilities)}")
system_prompt = """أنت محلل قدرات شركات سعودي. حلل الشركة وحدد قدراتها التي يمكن تقديمها لشركاء.
You are a Saudi company capabilities analyst. Analyze the company and identify what it can offer to partners.
Return JSON:
{
"core_capabilities": ["القدرات الأساسية"],
"secondary_capabilities": ["القدرات الثانوية"],
"unique_advantages": ["المزايا التنافسية الفريدة"],
"capacity_utilization": "low/medium/high",
"partnership_value_ar": "وصف القيمة التي يمكن تقديمها للشركاء بالعربي",
"recommended_deal_types": ["أنواع الصفقات المقترحة"],
"target_industries": ["القطاعات المستهدفة للشراكة"]
}"""
llm_response = await self.llm.complete(
system_prompt=system_prompt,
user_message="\n".join(context_parts),
json_mode=True,
temperature=0.3,
)
analysis = llm_response.parse_json() or {}
# Merge discovered capabilities into the profile
if analysis.get("core_capabilities"):
existing = set(profile.capabilities or [])
for cap in analysis["core_capabilities"]:
existing.add(cap)
if analysis.get("secondary_capabilities"):
for cap in analysis["secondary_capabilities"]:
existing.add(cap)
profile.capabilities = list(existing)
await db.flush()
logger.info("Analyzed capabilities for profile %s", profile_id)
return analysis
# ── Deal Readiness Score ─────────────────────────────────────────────────
async def get_deal_readiness(
self,
profile_id,
db: AsyncSession,
) -> dict:
"""
Score the company's readiness to engage in strategic deals.
تقييم جاهزية الشركة للصفقات الاستراتيجية
"""
from app.models.strategic_deal import StrategicDeal, DealStatus
result = await db.execute(select(CompanyProfile).where(CompanyProfile.id == profile_id))
profile = result.scalar_one_or_none()
if not profile:
raise ValueError(f"Profile {profile_id} not found")
score = 0.0
breakdown = {}
recommendations_ar = []
# 1. Profile completeness (0-25)
completeness = 0
if profile.company_name:
completeness += 3
if profile.company_name_ar:
completeness += 2
if profile.industry:
completeness += 3
if profile.cr_number:
completeness += 4
if profile.city and profile.region:
completeness += 2
if profile.website:
completeness += 2
if profile.whatsapp_number:
completeness += 2
if profile.capabilities and len(profile.capabilities) >= 3:
completeness += 4
else:
recommendations_ar.append("أضف 3 قدرات على الأقل لتحسين المطابقة")
if profile.needs and len(profile.needs) >= 2:
completeness += 3
else:
recommendations_ar.append("حدد احتياجاتك لنجد لك الشريك المناسب")
breakdown["profile_completeness"] = completeness
score += completeness
# 2. Verification status (0-25)
verification = 0
if profile.is_verified:
verification = 20
if profile.cr_number:
verification += 5
else:
recommendations_ar.append("أضف رقم السجل التجاري للتحقق من شركتك")
breakdown["verification"] = verification
score += verification
# 3. Trust score (0-25)
trust = int(profile.trust_score * 25)
breakdown["trust"] = trust
score += trust
if profile.trust_score < 0.5:
recommendations_ar.append("أكمل عملية التحقق لرفع درجة الثقة")
# 4. Deal history (0-25)
deal_count_q = select(func.count()).select_from(StrategicDeal).where(
StrategicDeal.initiator_profile_id == profile_id,
)
total_deals = (await db.execute(deal_count_q)).scalar() or 0
won_count_q = select(func.count()).select_from(StrategicDeal).where(
StrategicDeal.initiator_profile_id == profile_id,
StrategicDeal.status == DealStatus.CLOSED_WON.value,
)
won_deals = (await db.execute(won_count_q)).scalar() or 0
history = min(25, total_deals * 3 + won_deals * 5)
breakdown["deal_history"] = history
score += history
if total_deals == 0:
recommendations_ar.append("ابدأ أول صفقة لبناء سجل تاريخي")
readiness_level = "low"
if score >= 70:
readiness_level = "high"
elif score >= 40:
readiness_level = "medium"
return {
"score": round(score, 1),
"max_score": 100,
"readiness_level": readiness_level,
"breakdown": breakdown,
"recommendations_ar": recommendations_ar,
"total_deals": total_deals,
"won_deals": won_deals,
"readiness_label_ar": {
"high": "جاهز للصفقات",
"medium": "يحتاج تحسين",
"low": "يحتاج اهتمام",
}[readiness_level],
}

View File

@ -0,0 +1,596 @@
"""
Deal Agent Autonomous outreach agent for B2B deal discovery and engagement.
وكيل الصفقات: وكيل ذكي مستقل للتواصل واكتشاف الشراكات
"""
import json
import logging
from dataclasses import dataclass, field
from datetime import datetime, timezone
from typing import Optional
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from app.models.strategic_deal import (
CompanyProfile, StrategicDeal, DealMatch,
DealStatus, DealChannel, MatchStatus, DealType,
)
from app.services.llm.provider import get_llm
logger = logging.getLogger("dealix.strategic_deals.agent")
# ── WhatsApp outreach templates (Arabic) ─────────────────────────────────────
TEMPLATES = {
"introduction_collaborative": (
"السلام عليكم {contact_name}\n\n"
"أتمنى تكون بخير وعافية. أنا {sender_name} من {company_name}.\n"
"اطلعت على أعمالكم في مجال {target_industry} وعجبني اللي تقدمونه.\n\n"
"عندنا خبرة في {our_capability} ونشوف فرصة تعاون مثمرة بيننا "
"خصوصاً في مجال {collaboration_area}.\n\n"
"هل ممكن نحدد وقت مناسب نتكلم فيه عن إمكانية التعاون؟\n\n"
"تحياتي"
),
"introduction_as_ai": (
"السلام عليكم {contact_name}\n\n"
"أنا مساعد ذكي من شركة {company_name}. فريقنا مهتم بالتعاون معكم "
"بناءً على تحليل التكامل بين خدماتنا.\n\n"
"شركة {company_name} تقدم {our_capability} وشفنا إن عندكم احتياج في "
"هذا المجال.\n\n"
"هل تحبون نرسل لكم تفاصيل أكثر عن فرصة التعاون؟\n\n"
"شكراً لوقتكم"
),
"follow_up_1": (
"مرحباً {contact_name}\n\n"
"تابعت معكم بخصوص موضوع التعاون اللي ذكرناه.\n"
"أبشركم جهزنا مقترح مبدئي يوضح كيف ممكن نستفيد من بعض.\n\n"
"هل تفضلون نرسله عبر الإيميل أو نحدد اجتماع قصير؟\n\n"
"تحياتي"
),
"proposal_summary": (
"حبيت أشاركك ملخص المقترح:\n\n"
"- نوع التعاون: {deal_type_ar}\n"
"- القيمة المتوقعة: {estimated_value}\n"
"- المدة: {duration}\n"
"- المنافع المشتركة: {mutual_benefits}\n\n"
"المقترح الكامل بالمرفق. ننتظر ملاحظاتكم.\n\n"
"شاكرين لكم"
),
"negotiation_counter": (
"شاكرين لكم على الرد والاهتمام {contact_name}.\n\n"
"نقدر وجهة نظركم. بعد دراسة مقترحكم، حبينا نقدم لكم عرض معدل:\n\n"
"{counter_details}\n\n"
"نتمنى يكون العرض مناسب ونتطلع لشراكة ناجحة.\n\n"
"تحياتي"
),
}
DEAL_TYPE_AR = {
"partnership": "شراكة استراتيجية",
"distribution": "توزيع",
"franchise": "امتياز تجاري",
"jv": "مشروع مشترك",
"referral": "إحالة",
"acquisition": "استحواذ",
"barter": "مقايضة",
}
@dataclass
class OutreachResult:
"""Result of an outreach attempt."""
success: bool = False
channel: str = ""
message_sent: str = ""
response_received: Optional[str] = None
interest_level: Optional[str] = None # high, medium, low, none
next_action: str = ""
next_action_ar: str = ""
error: Optional[str] = None
class DealAgent:
"""
Autonomous outreach agent that discovers, contacts, and qualifies B2B partners.
وكيل ذكي مستقل يكتشف ويتواصل ويؤهل الشركاء
"""
def __init__(self):
self.llm = get_llm()
# ── Outreach Campaign ────────────────────────────────────────────────────
async def run_outreach_campaign(
self,
deal_match_id,
channel: str,
db: AsyncSession,
) -> OutreachResult:
"""
Execute multi-step outreach: research, craft intro, send, handle response.
تنفيذ حملة تواصل متعددة الخطوات
"""
# Load match and related profiles
match_result = await db.execute(select(DealMatch).where(DealMatch.id == deal_match_id))
match = match_result.scalar_one_or_none()
if not match:
return OutreachResult(success=False, error="Match not found")
a_result = await db.execute(select(CompanyProfile).where(CompanyProfile.id == match.company_a_id))
company_a = a_result.scalar_one_or_none()
if not company_a:
return OutreachResult(success=False, error="Initiator profile not found")
target_name = match.company_b_name or "الشركة المستهدفة"
target_industry = ""
target_contact = ""
company_b = None
if match.company_b_id:
b_result = await db.execute(select(CompanyProfile).where(CompanyProfile.id == match.company_b_id))
company_b = b_result.scalar_one_or_none()
if company_b:
target_name = company_b.company_name
target_industry = company_b.industry or ""
target_contact = company_b.whatsapp_number or ""
# Step 1: Research the target
research = await self._research_target(company_a, company_b, match)
# Step 2: Craft personalized introduction
style = "as_company" # Default to speaking as the company
intro_message = await self.craft_introduction(
match=match,
channel=channel,
style=style,
db=db,
)
# Step 3: Prepare outreach result (actual sending delegated to channel adapters)
# In production, this would call WhatsApp/LinkedIn/Email service
outreach = OutreachResult(
success=True,
channel=channel,
message_sent=intro_message,
next_action="await_response",
next_action_ar="انتظار الرد من الطرف الآخر",
)
# Step 4: Update match status
match.status = MatchStatus.OUTREACH_SENT.value
await db.flush()
# Step 5: Create a strategic deal from this outreach
deal = StrategicDeal(
tenant_id=company_a.tenant_id,
initiator_profile_id=company_a.id,
target_profile_id=match.company_b_id,
target_company_name=target_name,
deal_type=match.deal_type_suggested or DealType.PARTNERSHIP.value,
deal_title=f"شراكة مع {target_name}",
deal_title_ar=f"شراكة مع {target_name}",
our_offer=research.get("our_value_proposition", ""),
our_need=research.get("what_we_need_from_them", ""),
proposed_terms=match.terms_suggested or {},
status=DealStatus.OUTREACH.value,
channel=channel,
ai_confidence=match.match_score,
negotiation_history=[{
"round": 0,
"action": "outreach",
"channel": channel,
"message": intro_message[:500],
"timestamp": datetime.now(timezone.utc).isoformat(),
}],
)
db.add(deal)
await db.flush()
logger.info(
"Outreach campaign executed for match %s via %s (deal %s created)",
deal_match_id, channel, deal.id,
)
return outreach
# ── Craft Introduction ───────────────────────────────────────────────────
async def craft_introduction(
self,
match: DealMatch,
channel: str,
style: str,
db: AsyncSession,
) -> str:
"""
Generate a personalized Arabic introduction message.
إنشاء رسالة تعريفية عربية مخصصة
"""
a_result = await db.execute(select(CompanyProfile).where(CompanyProfile.id == match.company_a_id))
company_a = a_result.scalar_one_or_none()
target_name = match.company_b_name or "الشركة المستهدفة"
target_industry = ""
target_caps = []
target_needs = []
if match.company_b_id:
b_result = await db.execute(select(CompanyProfile).where(CompanyProfile.id == match.company_b_id))
company_b = b_result.scalar_one_or_none()
if company_b:
target_name = company_b.company_name
target_industry = company_b.industry or ""
target_caps = company_b.capabilities or []
target_needs = company_b.needs or []
elif match.company_b_data:
target_industry = match.company_b_data.get("industry", "")
target_caps = match.company_b_data.get("capabilities", [])
target_needs = match.company_b_data.get("needs", [])
# Channel-specific length guidance
length_guidance = {
"whatsapp": "اكتب رسالة قصيرة ومباشرة (3-5 أسطر). لا تكتب أكثر من 300 حرف.",
"email": "اكتب رسالة مفصلة ومهنية (8-12 سطر) مع سطر موضوع.",
"linkedin": "اكتب رسالة قصيرة ومهنية (4-6 أسطر).",
"in_person": "جهز نقاط حديث مختصرة (5-7 نقاط).",
}
style_guidance = {
"as_company": "تكلم باسم الشركة مباشرة (نحن في شركة X...)",
"as_ai": "كن شفافاً أنك مساعد ذكي (أنا مساعد ذكي من شركة X...)",
}
context = f"""Our company: {company_a.company_name if company_a else 'unknown'}
Our capabilities: {', '.join((company_a.capabilities or [])[:5]) if company_a else 'unknown'}
Target company: {target_name}
Target industry: {target_industry}
Target capabilities: {', '.join(target_caps[:5])}
Target needs: {', '.join(target_needs[:5])}
Match reasons: {', '.join(match.match_reasons or [])}
Match score: {match.match_score}
Suggested deal type: {match.deal_type_suggested}"""
system_prompt = f"""أنت كاتب رسائل أعمال سعودي محترف.
اكتب رسالة تعريفية للتواصل مع شركة محتملة للتعاون.
Channel: {channel}
{length_guidance.get(channel, length_guidance['whatsapp'])}
Style: {style}
{style_guidance.get(style, style_guidance['as_company'])}
قواعد مهمة:
- ابدأ بالسلام
- اذكر سبب التواصل بوضوح
- أبرز نقطة التكامل بين الشركتين
- اختم بطلب واضح (اجتماع، مكالمة، تفاصيل أكثر)
- لا تبالغ في المدح
- كن مهنياً وودوداً
Return the message directly as text (not JSON)."""
llm_response = await self.llm.complete(
system_prompt=system_prompt,
user_message=context,
temperature=0.6,
)
message = llm_response.content.strip()
logger.info("Crafted introduction for match %s via %s", match.id, channel)
return message
# ── Handle Response ──────────────────────────────────────────────────────
async def handle_response(
self,
deal_id,
message: str,
channel: str,
db: AsyncSession,
) -> str:
"""
Analyze incoming response and generate appropriate follow-up.
تحليل الرد الوارد وتوليد متابعة مناسبة
"""
deal_result = await db.execute(select(StrategicDeal).where(StrategicDeal.id == deal_id))
deal = deal_result.scalar_one_or_none()
if not deal:
raise ValueError(f"Deal {deal_id} not found")
# Load initiator profile
initiator = None
if deal.initiator_profile_id:
init_result = await db.execute(
select(CompanyProfile).where(CompanyProfile.id == deal.initiator_profile_id)
)
initiator = init_result.scalar_one_or_none()
history_summary = ""
for h in (deal.negotiation_history or [])[-3:]:
history_summary += f"- {h.get('action', '?')}: {h.get('message', '')[:100]}\n"
context = f"""Deal: {deal.deal_title}
Our company: {initiator.company_name if initiator else 'unknown'}
Target: {deal.target_company_name or 'unknown'}
Channel: {channel}
Current status: {deal.status}
Recent conversation:
{history_summary}
Incoming message: {message}"""
system_prompt = """أنت مساعد أعمال سعودي. حلل الرسالة الواردة وحدد نوعها وقدم رداً مناسباً.
أولاً حلل الرسالة:
- اهتمام (interest): الطرف الآخر مهتم
- اعتراض (objection): عنده تحفظات
- سؤال (question): يحتاج معلومات إضافية
- رفض (rejection): غير مهتم
- طلب معلومات (request_for_info): يريد تفاصيل أكثر
ثم اكتب رداً مناسباً:
- إذا مهتم: حدد الخطوة التالية (اجتماع، مقترح)
- إذا متحفظ: عالج التحفظ بلطف
- إذا عنده سؤال: أجب بوضوح
- إذا رافض: اشكره واترك الباب مفتوحاً
- إذا يبي تفاصيل: وعده بإرسالها
Return JSON:
{
"response_type": "interest/objection/question/rejection/request_for_info",
"interest_level": "high/medium/low/none",
"response_message": "الرد بالعربي",
"next_action": "schedule_meeting/send_proposal/send_info/follow_up_later/close",
"next_action_ar": "الخطوة التالية بالعربي"
}"""
llm_response = await self.llm.complete(
system_prompt=system_prompt,
user_message=context,
json_mode=True,
temperature=0.4,
)
result = llm_response.parse_json() or {}
response_msg = result.get("response_message", "شكراً لردكم، سنتواصل معكم قريباً.")
interest = result.get("interest_level", "medium")
next_action = result.get("next_action", "follow_up_later")
# Update deal based on response
if interest == "high" or next_action == "schedule_meeting":
deal.status = DealStatus.NEGOTIATING.value
elif interest == "none" or next_action == "close":
deal.status = DealStatus.CLOSED_LOST.value
deal.closed_at = datetime.now(timezone.utc)
# Log in negotiation history
history = list(deal.negotiation_history or [])
history.append({
"round": len(history) + 1,
"action": "response_handling",
"their_message": message[:500],
"our_response": response_msg[:500],
"response_type": result.get("response_type", "unknown"),
"interest_level": interest,
"next_action": next_action,
"channel": channel,
"timestamp": datetime.now(timezone.utc).isoformat(),
})
deal.negotiation_history = history
await db.flush()
logger.info(
"Handled response for deal %s: type=%s, interest=%s",
deal_id, result.get("response_type"), interest,
)
return response_msg
# ── Generate Proposal ────────────────────────────────────────────────────
async def generate_proposal(
self,
deal_id,
db: AsyncSession,
) -> dict:
"""
Generate a full Arabic business proposal document.
إنشاء مقترح أعمال عربي كامل
"""
deal_result = await db.execute(select(StrategicDeal).where(StrategicDeal.id == deal_id))
deal = deal_result.scalar_one_or_none()
if not deal:
raise ValueError(f"Deal {deal_id} not found")
initiator = None
if deal.initiator_profile_id:
init_result = await db.execute(
select(CompanyProfile).where(CompanyProfile.id == deal.initiator_profile_id)
)
initiator = init_result.scalar_one_or_none()
target_name = deal.target_company_name or "الطرف الآخر"
target = None
if deal.target_profile_id:
t_result = await db.execute(
select(CompanyProfile).where(CompanyProfile.id == deal.target_profile_id)
)
target = t_result.scalar_one_or_none()
if target:
target_name = target.company_name
context = f"""Our company: {initiator.company_name if initiator else 'unknown'}
Our industry: {initiator.industry if initiator else 'unknown'}
Our capabilities: {', '.join((initiator.capabilities or [])[:8]) if initiator else 'unknown'}
Target company: {target_name}
Target industry: {target.industry if target else 'unknown'}
Target capabilities: {', '.join((target.capabilities or [])[:8]) if target else 'unknown'}
Target needs: {', '.join((target.needs or [])[:8]) if target else 'unknown'}
Deal: {deal.deal_title}
Deal type: {deal.deal_type}
Our offer: {deal.our_offer or 'TBD'}
Our need: {deal.our_need or 'TBD'}
Proposed terms: {json.dumps(deal.proposed_terms or {}, ensure_ascii=False)}
Estimated value: {deal.estimated_value_sar or 'TBD'} SAR"""
system_prompt = """أنت كاتب مقترحات أعمال سعودي محترف.
أنشئ مقترح أعمال شامل ومهني باللغة العربية.
Return JSON:
{
"title_ar": "عنوان المقترح",
"executive_summary_ar": "الملخص التنفيذي (3-5 جمل)",
"about_us_ar": "نبذة عن شركتنا",
"understanding_your_needs_ar": "فهمنا لاحتياجاتكم",
"proposed_solution_ar": "الحل المقترح",
"our_capabilities_ar": ["قدرة 1", "قدرة 2"],
"mutual_benefits_ar": ["منفعة مشتركة 1", "منفعة مشتركة 2"],
"deal_structure_ar": "هيكل الصفقة",
"financial_overview_ar": "النظرة المالية",
"timeline_ar": [
{"phase_ar": "المرحلة", "duration_ar": "المدة", "deliverables_ar": "المخرجات"}
],
"success_metrics_ar": ["مؤشر نجاح 1", "مؤشر نجاح 2"],
"risks_and_mitigations_ar": [
{"risk_ar": "المخاطرة", "mitigation_ar": "التخفيف"}
],
"next_steps_ar": ["خطوة 1", "خطوة 2"],
"closing_statement_ar": "الخاتمة"
}"""
llm_response = await self.llm.complete(
system_prompt=system_prompt,
user_message=context,
json_mode=True,
temperature=0.4,
)
proposal = llm_response.parse_json() or {}
logger.info("Generated proposal for deal %s", deal_id)
return proposal
# ── Discovery Scan ───────────────────────────────────────────────────────
async def run_discovery_scan(
self,
profile_id,
deal_type: Optional[str],
db: AsyncSession,
) -> list[DealMatch]:
"""
Full autonomous scan: analyze profile, find matches, generate outreach plan.
فحص مستقل كامل: تحليل الملف، إيجاد مطابقات، تجهيز خطة تواصل
"""
from app.services.strategic_deals.company_profiler import CompanyProfiler
from app.services.strategic_deals.deal_matcher import DealMatcher
profiler = CompanyProfiler()
matcher = DealMatcher()
# Step 1: Enrich profile if needed
prof_result = await db.execute(select(CompanyProfile).where(CompanyProfile.id == profile_id))
profile = prof_result.scalar_one_or_none()
if not profile:
raise ValueError(f"Profile {profile_id} not found")
if not profile.capabilities or len(profile.capabilities) < 2:
logger.info("Discovery scan: enriching profile %s first", profile_id)
await profiler.enrich_profile(profile_id, db)
# Step 2: Analyze capabilities if thin
if not profile.capabilities or len(profile.capabilities) < 3:
await profiler.analyze_capabilities(profile_id, db)
# Step 3: Find matches
matches = await matcher.find_matches(
profile_id=profile_id,
deal_type=deal_type,
db=db,
limit=10,
)
# Step 4: Generate deal structure suggestions for top matches
for match in matches[:3]:
try:
await matcher.suggest_deal_structure(match.id, db)
except Exception as e:
logger.warning("Could not suggest structure for match %s: %s", match.id, e)
# Step 5: Generate Arabic summary
if matches:
summary_parts = [f"تم العثور على {len(matches)} فرصة شراكة محتملة:"]
for i, m in enumerate(matches[:5], 1):
target_name = m.company_b_name or "شركة"
if m.company_b_id:
b_res = await db.execute(
select(CompanyProfile).where(CompanyProfile.id == m.company_b_id)
)
b_prof = b_res.scalar_one_or_none()
if b_prof:
target_name = b_prof.company_name
reasons = ", ".join((m.match_reasons or [])[:2])
summary_parts.append(
f"{i}. {target_name} (نسبة التوافق: {m.match_score:.0%}) — {reasons}"
)
summary = "\n".join(summary_parts)
logger.info("Discovery scan summary:\n%s", summary)
logger.info("Discovery scan complete for profile %s: %d matches", profile_id, len(matches))
return matches
# ── Private Helpers ──────────────────────────────────────────────────────
async def _research_target(
self,
company_a: CompanyProfile,
company_b: Optional[CompanyProfile],
match: DealMatch,
) -> dict:
"""Research the target company to personalize outreach."""
target_name = company_b.company_name if company_b else (match.company_b_name or "unknown")
target_industry = company_b.industry if company_b else ""
target_caps = ", ".join((company_b.capabilities or [])[:5]) if company_b else ""
target_needs = ", ".join((company_b.needs or [])[:5]) if company_b else ""
context = f"""Our company: {company_a.company_name}
Our capabilities: {', '.join((company_a.capabilities or [])[:5])}
Our needs: {', '.join((company_a.needs or [])[:5])}
Target: {target_name}
Industry: {target_industry}
Capabilities: {target_caps}
Needs: {target_needs}
Match score: {match.match_score}
Match reasons: {', '.join(match.match_reasons or [])}"""
system_prompt = """أنت باحث أعمال. حلل الشركة المستهدفة وجهز نقاط للتواصل.
Return JSON:
{
"our_value_proposition": "ما نقدمه لهم بجملة واحدة",
"what_we_need_from_them": "ما نحتاجه منهم بجملة واحدة",
"key_talking_points_ar": ["نقطة حوار 1", "نقطة حوار 2"],
"potential_objections_ar": ["اعتراض محتمل 1"],
"recommended_approach_ar": "النهج الموصى به"
}"""
try:
llm_response = await self.llm.complete(
system_prompt=system_prompt,
user_message=context,
json_mode=True,
temperature=0.3,
fast=True,
)
return llm_response.parse_json() or {}
except Exception as e:
logger.warning("Target research failed: %s", e)
return {
"our_value_proposition": "",
"what_we_need_from_them": "",
"key_talking_points_ar": [],
"potential_objections_ar": [],
"recommended_approach_ar": "",
}

View File

@ -0,0 +1,572 @@
"""
Deal Matcher AI-powered B2B matching engine.
محرك المطابقة: يجد الشركاء المثاليين باستخدام الذكاء الاصطناعي
"""
import json
import logging
from dataclasses import dataclass, field
from typing import Optional
from sqlalchemy import select, and_, or_, func
from sqlalchemy.ext.asyncio import AsyncSession
from app.models.strategic_deal import (
CompanyProfile, DealMatch, MatchStatus, DealType,
)
from app.services.llm.provider import get_llm
logger = logging.getLogger("dealix.strategic_deals.matcher")
# ── Matching weights ─────────────────────────────────────────────────────────
MATCH_WEIGHTS = {
"capability_complementarity": 0.30,
"need_alignment": 0.25,
"industry_fit": 0.15,
"geographic_fit": 0.10,
"size_compatibility": 0.10,
"trust_score": 0.10,
}
# Industry adjacency: industries that typically partner together
INDUSTRY_ADJACENCY = {
"technology": ["consulting", "finance", "healthcare", "education", "retail"],
"construction": ["real_estate", "manufacturing", "logistics", "energy"],
"real_estate": ["construction", "finance", "marketing"],
"retail": ["wholesale", "logistics", "marketing", "technology"],
"wholesale": ["retail", "manufacturing", "logistics"],
"healthcare": ["technology", "consulting", "manufacturing"],
"education": ["technology", "consulting", "media"],
"food_beverage": ["logistics", "retail", "agriculture", "tourism"],
"logistics": ["retail", "wholesale", "manufacturing", "food_beverage"],
"finance": ["technology", "real_estate", "consulting"],
"energy": ["construction", "manufacturing", "technology"],
"tourism": ["food_beverage", "marketing", "media"],
"consulting": ["technology", "finance", "healthcare", "education"],
"marketing": ["technology", "media", "retail", "tourism"],
"agriculture": ["food_beverage", "logistics", "manufacturing"],
"telecom": ["technology", "media", "consulting"],
"media": ["marketing", "technology", "telecom", "tourism"],
"automotive": ["manufacturing", "logistics", "finance"],
"manufacturing": ["construction", "wholesale", "logistics", "energy", "automotive"],
"government": ["technology", "consulting", "construction"],
}
# Regions that commonly trade together
REGION_PROXIMITY = {
"الرياض": ["القصيم", "المنطقة الشرقية"],
"مكة المكرمة": ["المدينة المنورة", "الباحة", "عسير"],
"المنطقة الشرقية": ["الرياض", "الحدود الشمالية"],
"المدينة المنورة": ["مكة المكرمة", "تبوك"],
"القصيم": ["الرياض", "حائل"],
"عسير": ["مكة المكرمة", "جازان", "نجران", "الباحة"],
"تبوك": ["المدينة المنورة", "الجوف"],
"حائل": ["القصيم", "الحدود الشمالية", "الجوف"],
"الحدود الشمالية": ["حائل", "الجوف", "المنطقة الشرقية"],
"جازان": ["عسير", "نجران"],
"نجران": ["عسير", "جازان"],
"الباحة": ["عسير", "مكة المكرمة"],
"الجوف": ["تبوك", "الحدود الشمالية", "حائل"],
}
@dataclass
class MatchScore:
"""Detailed breakdown of a match score."""
total: float = 0.0
capability_complementarity: float = 0.0
need_alignment: float = 0.0
industry_fit: float = 0.0
geographic_fit: float = 0.0
size_compatibility: float = 0.0
trust_score: float = 0.0
reasons_ar: list[str] = field(default_factory=list)
reasons_en: list[str] = field(default_factory=list)
class DealMatcher:
"""
AI-powered B2B matching engine that finds optimal partners.
محرك مطابقة بالذكاء الاصطناعي يجد الشركاء المثاليين
"""
def __init__(self):
self.llm = get_llm()
# ── Find Matches ─────────────────────────────────────────────────────────
async def find_matches(
self,
profile_id,
deal_type: Optional[str],
db: AsyncSession,
limit: int = 10,
) -> list[DealMatch]:
"""
Score and rank potential matches for a company profile.
تقييم وترتيب المطابقات المحتملة لملف شركة
"""
result = await db.execute(select(CompanyProfile).where(CompanyProfile.id == profile_id))
source = result.scalar_one_or_none()
if not source:
raise ValueError(f"Profile {profile_id} not found")
# Fetch candidate profiles from the same tenant, excluding self
candidates_q = select(CompanyProfile).where(
CompanyProfile.tenant_id == source.tenant_id,
CompanyProfile.id != profile_id,
CompanyProfile.is_verified == True, # noqa: E712
)
candidates_result = await db.execute(candidates_q)
candidates = candidates_result.scalars().all()
if not candidates:
logger.info("No verified candidate profiles found for tenant %s", source.tenant_id)
return []
# Score each candidate
scored: list[tuple[CompanyProfile, MatchScore]] = []
for candidate in candidates:
match_score = await self.score_match(
company_a=source,
company_b=candidate,
deal_type=deal_type,
)
if match_score.total >= 0.2: # Minimum threshold
scored.append((candidate, match_score))
# Sort by score descending, take top N
scored.sort(key=lambda x: x[1].total, reverse=True)
scored = scored[:limit]
# Persist matches
matches = []
for candidate, ms in scored:
match = DealMatch(
tenant_id=source.tenant_id,
company_a_id=source.id,
company_b_id=candidate.id,
match_score=round(ms.total, 4),
match_reasons=ms.reasons_ar,
deal_type_suggested=deal_type or self._suggest_deal_type(source, candidate),
terms_suggested={},
status=MatchStatus.SUGGESTED.value,
)
db.add(match)
matches.append(match)
await db.flush()
for m in matches:
await db.refresh(m)
logger.info(
"Found %d matches for profile %s (from %d candidates)",
len(matches), profile_id, len(candidates),
)
return matches
# ── Detailed Scoring ─────────────────────────────────────────────────────
async def score_match(
self,
company_a: CompanyProfile,
company_b: CompanyProfile,
deal_type: Optional[str] = None,
) -> MatchScore:
"""
Compute detailed match score between two companies.
حساب درجة المطابقة التفصيلية بين شركتين
"""
ms = MatchScore()
# 1. Capability complementarity (0.30): A offers what B needs
cap_score = self._score_overlap(
company_a.capabilities or [],
company_b.needs or [],
)
ms.capability_complementarity = cap_score
if cap_score > 0.5:
ms.reasons_ar.append(
f"شركة {company_a.company_name} تقدم خدمات تحتاجها شركة {company_b.company_name}"
)
ms.reasons_en.append(
f"{company_a.company_name} offers what {company_b.company_name} needs"
)
# 2. Need alignment (0.25): B offers what A needs
need_score = self._score_overlap(
company_b.capabilities or [],
company_a.needs or [],
)
ms.need_alignment = need_score
if need_score > 0.5:
ms.reasons_ar.append(
f"شركة {company_b.company_name} تقدم خدمات تحتاجها شركة {company_a.company_name}"
)
ms.reasons_en.append(
f"{company_b.company_name} offers what {company_a.company_name} needs"
)
# 3. Industry fit (0.15): same value chain or adjacent
ind_score = self._score_industry_fit(company_a.industry, company_b.industry)
ms.industry_fit = ind_score
if ind_score > 0.5:
ms.reasons_ar.append("القطاعان متكاملان في سلسلة القيمة")
# 4. Geographic fit (0.10): same region or complementary
geo_score = self._score_geographic_fit(company_a.region, company_b.region)
ms.geographic_fit = geo_score
if geo_score >= 1.0:
ms.reasons_ar.append(f"الشركتان في نفس المنطقة: {company_a.region}")
elif geo_score >= 0.7:
ms.reasons_ar.append("الشركتان في مناطق متقاربة")
# 5. Size compatibility (0.10): appropriate size ratio
size_score = self._score_size_compatibility(company_a, company_b)
ms.size_compatibility = size_score
if size_score > 0.7:
ms.reasons_ar.append("حجم الشركتين متناسب للشراكة")
# 6. Trust score (0.10): verification and history
trust_a = company_a.trust_score or 0.0
trust_b = company_b.trust_score or 0.0
ms.trust_score = (trust_a + trust_b) / 2.0
if ms.trust_score > 0.7:
ms.reasons_ar.append("كلا الشركتين حاصلتان على درجة ثقة عالية")
# If keyword overlap is low, use LLM for semantic matching
if cap_score < 0.3 and need_score < 0.3:
semantic = await self._semantic_match(company_a, company_b, deal_type)
ms.capability_complementarity = max(ms.capability_complementarity, semantic.get("cap", 0))
ms.need_alignment = max(ms.need_alignment, semantic.get("need", 0))
if semantic.get("reason_ar"):
ms.reasons_ar.append(semantic["reason_ar"])
# Weighted total
ms.total = (
ms.capability_complementarity * MATCH_WEIGHTS["capability_complementarity"]
+ ms.need_alignment * MATCH_WEIGHTS["need_alignment"]
+ ms.industry_fit * MATCH_WEIGHTS["industry_fit"]
+ ms.geographic_fit * MATCH_WEIGHTS["geographic_fit"]
+ ms.size_compatibility * MATCH_WEIGHTS["size_compatibility"]
+ ms.trust_score * MATCH_WEIGHTS["trust_score"]
)
ms.total = round(min(1.0, ms.total), 4)
return ms
# ── Suggest Deal Structure ───────────────────────────────────────────────
async def suggest_deal_structure(
self,
match_id,
db: AsyncSession,
) -> dict:
"""
AI suggests deal type, key terms, pricing, timeline.
الذكاء الاصطناعي يقترح هيكل الصفقة
"""
result = await db.execute(select(DealMatch).where(DealMatch.id == match_id))
match = result.scalar_one_or_none()
if not match:
raise ValueError(f"Match {match_id} not found")
# Load both company profiles
a_result = await db.execute(select(CompanyProfile).where(CompanyProfile.id == match.company_a_id))
company_a = a_result.scalar_one_or_none()
company_b = None
if match.company_b_id:
b_result = await db.execute(select(CompanyProfile).where(CompanyProfile.id == match.company_b_id))
company_b = b_result.scalar_one_or_none()
company_b_name = company_b.company_name if company_b else (match.company_b_name or "شركة خارجية")
company_b_caps = company_b.capabilities if company_b else []
company_b_needs = company_b.needs if company_b else []
context = f"""Company A: {company_a.company_name}
Industry: {company_a.industry or 'unknown'}
Capabilities: {', '.join(company_a.capabilities or [])}
Needs: {', '.join(company_a.needs or [])}
Revenue SAR: {company_a.annual_revenue_sar or 'unknown'}
Company B: {company_b_name}
Industry: {(company_b.industry if company_b else 'unknown')}
Capabilities: {', '.join(company_b_caps)}
Needs: {', '.join(company_b_needs)}
Match score: {match.match_score}
Match reasons: {', '.join(match.match_reasons or [])}
Suggested deal type: {match.deal_type_suggested}"""
system_prompt = """أنت مستشار صفقات استراتيجية سعودي خبير. اقترح هيكل صفقة مفصل بين الشركتين.
Return JSON:
{
"deal_type": "partnership/distribution/franchise/jv/referral/acquisition/barter",
"deal_title_ar": "عنوان الصفقة بالعربي",
"deal_title_en": "Deal title in English",
"proposed_terms": {
"equity_split": "50/50 or other",
"revenue_share": "percentage or fixed",
"territory": "geographic scope",
"exclusivity": true/false,
"duration_months": 12,
"payment_terms": "description",
"kpis": ["key performance indicators"]
},
"estimated_value_sar": 0,
"timeline": {
"negotiation_weeks": 2,
"due_diligence_weeks": 4,
"launch_weeks": 8
},
"mutual_benefits_ar": ["المنفعة المشتركة 1", "المنفعة المشتركة 2"],
"risks_ar": ["المخاطر المحتملة"],
"proposal_summary_ar": "ملخص المقترح بالعربي",
"next_steps_ar": ["الخطوة التالية 1", "الخطوة التالية 2"]
}"""
llm_response = await self.llm.complete(
system_prompt=system_prompt,
user_message=context,
json_mode=True,
temperature=0.4,
)
structure = llm_response.parse_json() or {}
# Persist suggested terms on the match
if structure.get("proposed_terms"):
match.terms_suggested = structure
await db.flush()
logger.info("Suggested deal structure for match %s", match_id)
return structure
# ── Barter Chain Discovery ───────────────────────────────────────────────
async def find_barter_chains(
self,
profile_id,
db: AsyncSession,
) -> list[list[dict]]:
"""
Discover multi-party barter opportunities: A->B->C->A circular trades.
اكتشاف فرص المقايضة المتعددة الأطراف: سلاسل تبادل دائرية
شركتك عندها تسويق، الشركة ب تحتاج تسويق وعندها مساحات،
والشركة ج تحتاج مساحات وعندها تطوير برمجي اللي أنت تحتاجه
"""
result = await db.execute(select(CompanyProfile).where(CompanyProfile.id == profile_id))
source = result.scalar_one_or_none()
if not source:
raise ValueError(f"Profile {profile_id} not found")
source_caps = set(c.lower() for c in (source.capabilities or []))
source_needs = set(n.lower() for n in (source.needs or []))
if not source_caps or not source_needs:
return []
# Fetch all profiles in tenant
all_q = select(CompanyProfile).where(
CompanyProfile.tenant_id == source.tenant_id,
CompanyProfile.id != profile_id,
)
all_result = await db.execute(all_q)
all_profiles = all_result.scalars().all()
# Build capability/need index
profiles_by_cap: dict[str, list[CompanyProfile]] = {}
profiles_by_need: dict[str, list[CompanyProfile]] = {}
for p in all_profiles:
for cap in (p.capabilities or []):
profiles_by_cap.setdefault(cap.lower(), []).append(p)
for need in (p.needs or []):
profiles_by_need.setdefault(need.lower(), []).append(p)
chains: list[list[dict]] = []
# Find 3-party chains: source->B->C->source
for source_cap in source_caps:
# B needs what source offers
for b_profile in profiles_by_need.get(source_cap, []):
b_caps = set(c.lower() for c in (b_profile.capabilities or []))
for b_cap in b_caps:
# C needs what B offers and C has what source needs
for c_profile in profiles_by_need.get(b_cap, []):
if c_profile.id == source.id or c_profile.id == b_profile.id:
continue
c_caps = set(c.lower() for c in (c_profile.capabilities or []))
# Does C offer what source needs?
overlap = c_caps & source_needs
if overlap:
chain = [
{
"company_id": str(source.id),
"company_name": source.company_name,
"offers": source_cap,
"receives": list(overlap)[0],
},
{
"company_id": str(b_profile.id),
"company_name": b_profile.company_name,
"offers": b_cap,
"receives": source_cap,
},
{
"company_id": str(c_profile.id),
"company_name": c_profile.company_name,
"offers": list(overlap)[0],
"receives": b_cap,
},
]
chains.append(chain)
if len(chains) >= 10:
break
if len(chains) >= 10:
break
if len(chains) >= 10:
break
if len(chains) >= 10:
break
# Deduplicate chains by company set
seen_sets: set[frozenset] = set()
unique_chains = []
for chain in chains:
company_set = frozenset(link["company_id"] for link in chain)
if company_set not in seen_sets:
seen_sets.add(company_set)
unique_chains.append(chain)
logger.info(
"Found %d barter chains for profile %s", len(unique_chains), profile_id,
)
return unique_chains
# ── Private Helpers ──────────────────────────────────────────────────────
def _score_overlap(self, offers: list[str], needs: list[str]) -> float:
"""Score overlap between what one company offers and another needs."""
if not offers or not needs:
return 0.0
offers_lower = {o.lower().strip() for o in offers}
needs_lower = {n.lower().strip() for n in needs}
if not needs_lower:
return 0.0
overlap = offers_lower & needs_lower
# Also check partial substring matches
partial = 0
for o in offers_lower:
for n in needs_lower:
if n not in overlap and o not in overlap:
if o in n or n in o:
partial += 0.5
total_matches = len(overlap) + partial
return min(1.0, total_matches / len(needs_lower))
def _score_industry_fit(self, ind_a: Optional[str], ind_b: Optional[str]) -> float:
"""Score industry compatibility."""
if not ind_a or not ind_b:
return 0.3 # Unknown = neutral
if ind_a == ind_b:
return 1.0
adjacent = INDUSTRY_ADJACENCY.get(ind_a, [])
if ind_b in adjacent:
return 0.7
# Check reverse
adjacent_b = INDUSTRY_ADJACENCY.get(ind_b, [])
if ind_a in adjacent_b:
return 0.7
return 0.2
def _score_geographic_fit(self, region_a: Optional[str], region_b: Optional[str]) -> float:
"""Score geographic proximity."""
if not region_a or not region_b:
return 0.5 # Unknown = neutral
if region_a == region_b:
return 1.0
nearby = REGION_PROXIMITY.get(region_a, [])
if region_b in nearby:
return 0.7
return 0.3
def _score_size_compatibility(
self, a: CompanyProfile, b: CompanyProfile,
) -> float:
"""Score size compatibility for deals. Very large + very small = low fit."""
emp_a = float(a.employee_count or 0)
emp_b = float(b.employee_count or 0)
if emp_a == 0 or emp_b == 0:
return 0.5 # Unknown = neutral
ratio = min(emp_a, emp_b) / max(emp_a, emp_b)
# Ratios > 0.1 are generally workable
if ratio >= 0.3:
return 1.0
elif ratio >= 0.1:
return 0.7
elif ratio >= 0.01:
return 0.4
return 0.2
def _suggest_deal_type(self, a: CompanyProfile, b: CompanyProfile) -> str:
"""Heuristic deal-type suggestion based on profiles."""
prefs_a = a.deal_preferences or {}
prefs_b = b.deal_preferences or {}
# Find mutually preferred deal type
all_types = set(list(prefs_a.keys()) + list(prefs_b.keys()))
if all_types:
best_type = max(
all_types,
key=lambda t: (prefs_a.get(t, 0) + prefs_b.get(t, 0)),
)
return best_type
# Default based on industry relationship
if a.industry == b.industry:
return DealType.REFERRAL.value
return DealType.PARTNERSHIP.value
async def _semantic_match(
self,
company_a: CompanyProfile,
company_b: CompanyProfile,
deal_type: Optional[str],
) -> dict:
"""Use LLM for semantic matching when keyword overlap is low."""
context = f"""Company A: {company_a.company_name}
Capabilities: {', '.join(company_a.capabilities or ['unknown'])}
Needs: {', '.join(company_a.needs or ['unknown'])}
Industry: {company_a.industry or 'unknown'}
Company B: {company_b.company_name}
Capabilities: {', '.join(company_b.capabilities or ['unknown'])}
Needs: {', '.join(company_b.needs or ['unknown'])}
Industry: {company_b.industry or 'unknown'}
Deal type: {deal_type or 'any'}"""
system_prompt = """أنت محلل مطابقة بين الشركات. قيم مدى تكامل هاتين الشركتين.
Return JSON:
{
"cap": 0.0 to 1.0 (capability complementarity),
"need": 0.0 to 1.0 (need alignment),
"reason_ar": "سبب التكامل بالعربي أو null إذا لا يوجد تكامل"
}"""
try:
llm_response = await self.llm.complete(
system_prompt=system_prompt,
user_message=context,
json_mode=True,
temperature=0.2,
fast=True,
)
result = llm_response.parse_json()
return result if result else {"cap": 0, "need": 0, "reason_ar": None}
except Exception as e:
logger.warning("Semantic match failed: %s", e)
return {"cap": 0, "need": 0, "reason_ar": None}

View File

@ -0,0 +1,479 @@
"""
Deal Negotiator Autonomous AI negotiator for B2B deals.
المفاوض الذكي: مفاوض آلي بالذكاء الاصطناعي للصفقات بين الشركات
"""
import json
import logging
from dataclasses import dataclass, field
from datetime import datetime, timezone
from typing import Optional
from pydantic import BaseModel
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from app.models.strategic_deal import (
StrategicDeal, CompanyProfile, DealStatus, DealType,
)
from app.services.llm.provider import get_llm
logger = logging.getLogger("dealix.strategic_deals.negotiator")
# ── Models ───────────────────────────────────────────────────────────────────
class NegotiationStrategy(BaseModel):
"""Strategy configuration for autonomous negotiation."""
target_terms: dict = {} # Ideal outcome
acceptable_range: dict = {} # Min/max for each variable
walk_away_point: dict = {} # Absolute limits / deal breakers
priorities: list[str] = [] # Ordered from most to least important
style: str = "collaborative" # collaborative, competitive, accommodating
@dataclass
class NegotiationRound:
"""Result of a single negotiation round."""
round_number: int = 0
action: str = "" # opening_offer, counter_offer, acceptance, rejection, escalation
our_terms: dict = field(default_factory=dict)
their_terms: dict = field(default_factory=dict)
message_ar: str = ""
message_en: str = ""
concessions_made: list[str] = field(default_factory=list)
concessions_gained: list[str] = field(default_factory=list)
within_range: bool = True
confidence: float = 0.0
timestamp: str = ""
# ── Escalation thresholds ────────────────────────────────────────────────────
ESCALATION_VALUE_SAR = 500_000 # Deals above this need human oversight
MAX_AUTO_ROUNDS = 5 # After this many rounds, escalate
STALL_THRESHOLD = 3 # Same terms repeated this many times = stall
class DealNegotiator:
"""
Autonomous AI negotiator that handles B2B deal negotiations.
Respects Saudi business culture: relationship-first, patience, mutual respect.
مفاوض ذكي يحترم ثقافة الأعمال السعودية
"""
def __init__(self):
self.llm = get_llm()
# ── Start Negotiation ────────────────────────────────────────────────────
async def start_negotiation(
self,
deal_id,
strategy: NegotiationStrategy,
db: AsyncSession,
) -> NegotiationRound:
"""
Generate opening offer based on strategy and Saudi negotiation culture.
إنشاء العرض الأولي بناءً على الاستراتيجية وثقافة التفاوض السعودية
"""
deal = await self._load_deal(deal_id, db)
initiator = await self._load_profile(deal.initiator_profile_id, db)
target_name = deal.target_company_name or "الطرف الآخر"
if deal.target_profile_id:
target = await self._load_profile(deal.target_profile_id, db)
target_name = target.company_name if target else target_name
context = f"""Deal: {deal.deal_title}
Deal type: {deal.deal_type}
Our company: {initiator.company_name}
Target company: {target_name}
Our offer: {deal.our_offer or 'not specified'}
Our need: {deal.our_need or 'not specified'}
Strategy style: {strategy.style}
Target terms: {json.dumps(strategy.target_terms, ensure_ascii=False)}
Priorities: {', '.join(strategy.priorities)}"""
style_guidance = {
"collaborative": "ابدأ بعرض عادل ومتوازن يظهر الرغبة في شراكة طويلة المدى",
"competitive": "ابدأ بعرض طموح لكن معقول مع ترك مساحة للتفاوض",
"accommodating": "ابدأ بعرض سخي يظهر حسن النية والرغبة في بناء علاقة",
}
system_prompt = f"""أنت مفاوض أعمال سعودي محترف. أنشئ عرضاً أولياً للصفقة.
التوجيه: {style_guidance.get(strategy.style, style_guidance['collaborative'])}
Important Saudi negotiation culture:
- Start with relationship building (سلامات واستفسار عن الأحوال)
- Show respect for the other party
- Be patient, don't rush to numbers
- Present win-win framing
Return JSON:
{{
"opening_terms": {{"key": "value for each negotiable item"}},
"message_ar": "رسالة العرض الأولي بالعربي (تبدأ بالسلام والتحية)",
"message_en": "Opening message in English",
"rationale_ar": "مبررات العرض",
"confidence": 0.0 to 1.0
}}"""
llm_response = await self.llm.complete(
system_prompt=system_prompt,
user_message=context,
json_mode=True,
temperature=0.4,
)
result = llm_response.parse_json() or {}
now_str = datetime.now(timezone.utc).isoformat()
round_data = NegotiationRound(
round_number=1,
action="opening_offer",
our_terms=result.get("opening_terms", strategy.target_terms),
their_terms={},
message_ar=result.get("message_ar", ""),
message_en=result.get("message_en", ""),
concessions_made=[],
concessions_gained=[],
within_range=True,
confidence=result.get("confidence", 0.5),
timestamp=now_str,
)
# Update deal
deal.proposed_terms = round_data.our_terms
deal.status = DealStatus.NEGOTIATING.value
deal.ai_confidence = round_data.confidence
history = list(deal.negotiation_history or [])
history.append({
"round": round_data.round_number,
"action": round_data.action,
"our_terms": round_data.our_terms,
"their_terms": round_data.their_terms,
"message_ar": round_data.message_ar,
"timestamp": now_str,
})
deal.negotiation_history = history
await db.flush()
logger.info("Started negotiation for deal %s (round 1)", deal_id)
return round_data
# ── Handle Counter-Offer ─────────────────────────────────────────────────
async def handle_counter_offer(
self,
deal_id,
their_terms: dict,
db: AsyncSession,
) -> NegotiationRound:
"""
Analyze a counter-offer and generate a response.
تحليل عرض مضاد وتوليد رد مناسب
"""
deal = await self._load_deal(deal_id, db)
history = list(deal.negotiation_history or [])
round_num = len(history) + 1
# Get the latest strategy from proposed terms
our_latest = deal.proposed_terms or {}
context = f"""Deal: {deal.deal_title}
Deal type: {deal.deal_type}
Our latest terms: {json.dumps(our_latest, ensure_ascii=False)}
Their counter-offer: {json.dumps(their_terms, ensure_ascii=False)}
Negotiation history (rounds): {len(history)}
Estimated value SAR: {deal.estimated_value_sar or 'unknown'}"""
system_prompt = """أنت مفاوض أعمال سعودي محترف. الطرف الآخر قدم عرضاً مضاداً.
حلل العرض وقرر:
1. هل العرض مقبول؟
2. هل نحتاج عرض مضاد؟
3. هل يجب رفع الموضوع لإنسان؟
Saudi culture: never be aggressive. Show appreciation for their offer before countering.
Handle common responses: "غالي" (too expensive), "نبي نفكر" (need to think), "عندنا عرض ثاني" (we have another offer)
Return JSON:
{
"action": "accept/counter/reject/escalate",
"counter_terms": {"key": "value"},
"message_ar": "الرد بالعربي",
"message_en": "Response in English",
"concessions_made": ["what we gave up"],
"concessions_gained": ["what we got"],
"within_acceptable_range": true/false,
"confidence": 0.0 to 1.0,
"analysis_ar": "تحليل العرض المضاد"
}"""
llm_response = await self.llm.complete(
system_prompt=system_prompt,
user_message=context,
json_mode=True,
temperature=0.3,
)
result = llm_response.parse_json() or {}
action = result.get("action", "counter")
now_str = datetime.now(timezone.utc).isoformat()
round_data = NegotiationRound(
round_number=round_num,
action=action,
our_terms=result.get("counter_terms", our_latest),
their_terms=their_terms,
message_ar=result.get("message_ar", ""),
message_en=result.get("message_en", ""),
concessions_made=result.get("concessions_made", []),
concessions_gained=result.get("concessions_gained", []),
within_range=result.get("within_acceptable_range", True),
confidence=result.get("confidence", 0.5),
timestamp=now_str,
)
# Update deal state
if action == "accept":
deal.agreed_terms = their_terms
deal.status = DealStatus.TERM_SHEET.value
elif action == "reject":
deal.status = DealStatus.CLOSED_LOST.value
deal.closed_at = datetime.now(timezone.utc)
else:
deal.proposed_terms = round_data.our_terms
deal.ai_confidence = round_data.confidence
history.append({
"round": round_data.round_number,
"action": action,
"our_terms": round_data.our_terms,
"their_terms": their_terms,
"message_ar": round_data.message_ar,
"timestamp": now_str,
})
deal.negotiation_history = history
await db.flush()
logger.info("Handled counter-offer for deal %s (round %d, action=%s)", deal_id, round_num, action)
return round_data
# ── Generate Negotiation Response ────────────────────────────────────────
async def generate_response(
self,
deal_id,
message: str,
db: AsyncSession,
) -> str:
"""
Generate a culturally appropriate Arabic/English negotiation response.
توليد رد تفاوضي مناسب ثقافياً بالعربي أو الإنجليزي
"""
deal = await self._load_deal(deal_id, db)
history = deal.negotiation_history or []
# Summarize negotiation context
history_summary = ""
for h in history[-3:]: # Last 3 rounds
history_summary += f"Round {h.get('round', '?')}: {h.get('action', '?')} - {h.get('message_ar', '')[:100]}\n"
context = f"""Deal: {deal.deal_title}
Deal type: {deal.deal_type}
Current status: {deal.status}
Our proposed terms: {json.dumps(deal.proposed_terms or {}, ensure_ascii=False)}
Recent history:
{history_summary}
Incoming message from counter-party: {message}"""
system_prompt = """أنت مفاوض أعمال سعودي محترف. رد على رسالة الطرف الآخر بشكل مناسب.
Rules:
- إذا الرسالة بالعربي، رد بالعربي
- إذا الرسالة بالإنجليزي، رد بالإنجليزي
- كن محترماً وودوداً دائماً
- لا تكن عدوانياً أبداً
- حافظ على العلاقة حتى لو الصفقة لم تنجح
Handle:
- "غالي" أظهر المرونة واعرض بدائل
- "نبي نفكر" أعطهم وقت مع اقتراح موعد متابعة
- "عندنا عرض ثاني" أبرز المميزات الفريدة بدون تقليل المنافسين
- "ما يناسبنا" اسأل عن التفاصيل واعرض تعديلات
Return the response message directly as text (not JSON)."""
llm_response = await self.llm.complete(
system_prompt=system_prompt,
user_message=context,
temperature=0.5,
)
response_text = llm_response.content.strip()
# Log the exchange in negotiation history
history = list(deal.negotiation_history or [])
history.append({
"round": len(history) + 1,
"action": "response",
"their_message": message[:500],
"our_response": response_text[:500],
"timestamp": datetime.now(timezone.utc).isoformat(),
})
deal.negotiation_history = history
await db.flush()
logger.info("Generated negotiation response for deal %s", deal_id)
return response_text
# ── Should Escalate? ─────────────────────────────────────────────────────
async def should_escalate(
self,
deal_id,
db: AsyncSession,
) -> bool:
"""
Determine if a human should take over the negotiation.
تحديد ما إذا كان يجب تصعيد التفاوض لإنسان
"""
deal = await self._load_deal(deal_id, db)
history = deal.negotiation_history or []
round_count = len(history)
# Rule 1: High-value deals
value = float(deal.estimated_value_sar or 0)
if value > ESCALATION_VALUE_SAR:
logger.info("Escalation: deal %s value (%.0f SAR) exceeds threshold", deal_id, value)
return True
# Rule 2: Too many rounds without resolution
if round_count >= MAX_AUTO_ROUNDS:
logger.info("Escalation: deal %s reached %d rounds", deal_id, round_count)
return True
# Rule 3: Stalled negotiation (same terms repeating)
if round_count >= STALL_THRESHOLD:
recent_terms = [
json.dumps(h.get("our_terms", {}), sort_keys=True)
for h in history[-STALL_THRESHOLD:]
]
if len(set(recent_terms)) == 1:
logger.info("Escalation: deal %s stalled for %d rounds", deal_id, STALL_THRESHOLD)
return True
# Rule 4: Low confidence
if deal.ai_confidence is not None and deal.ai_confidence < 0.3:
logger.info("Escalation: deal %s AI confidence too low (%.2f)", deal_id, deal.ai_confidence)
return True
# Rule 5: Counter-party explicitly requested human contact
if history:
last_msg = (history[-1].get("their_message", "") or "").lower()
human_keywords = [
"أبي أكلم شخص", "أبي أكلم المدير", "ابي اتكلم مع انسان",
"speak to someone", "talk to a person", "human", "manager",
"مدير", "مسؤول",
]
for kw in human_keywords:
if kw in last_msg:
logger.info("Escalation: deal %s counter-party requested human", deal_id)
return True
return False
# ── Generate Term Sheet ──────────────────────────────────────────────────
async def generate_term_sheet(
self,
deal_id,
db: AsyncSession,
) -> dict:
"""
Generate a formal Arabic term sheet from agreed terms.
إنشاء ورقة شروط رسمية بالعربي من الشروط المتفق عليها
"""
deal = await self._load_deal(deal_id, db)
initiator = await self._load_profile(deal.initiator_profile_id, db)
target_name = deal.target_company_name or "الطرف الثاني"
target_cr = ""
if deal.target_profile_id:
target = await self._load_profile(deal.target_profile_id, db)
if target:
target_name = target.company_name
target_cr = target.cr_number or ""
terms = deal.agreed_terms or deal.proposed_terms or {}
context = f"""Parties:
- Party A: {initiator.company_name} (CR: {initiator.cr_number or 'N/A'})
- Party B: {target_name} (CR: {target_cr or 'N/A'})
Deal: {deal.deal_title}
Deal type: {deal.deal_type}
Agreed terms: {json.dumps(terms, ensure_ascii=False)}
Estimated value: {deal.estimated_value_sar or 'TBD'} SAR
Our offer: {deal.our_offer or 'N/A'}
Our need: {deal.our_need or 'N/A'}"""
system_prompt = """أنت مستشار قانوني سعودي متخصص في صياغة أوراق الشروط.
أنشئ ورقة شروط رسمية باللغة العربية.
Return JSON:
{
"title_ar": "عنوان ورقة الشروط",
"date": "التاريخ",
"parties": [
{"name": "اسم الطرف", "role": "الطرف الأول/الطرف الثاني", "cr": "رقم السجل التجاري"}
],
"preamble_ar": "مقدمة ورقة الشروط",
"scope_ar": "نطاق الاتفاقية",
"terms": [
{"title_ar": "عنوان البند", "description_ar": "تفاصيل البند"}
],
"obligations_party_a_ar": ["التزامات الطرف الأول"],
"obligations_party_b_ar": ["التزامات الطرف الثاني"],
"financial_terms_ar": "الشروط المالية",
"duration_ar": "مدة الاتفاقية",
"termination_ar": "شروط الإنهاء",
"confidentiality_ar": "شرط السرية",
"dispute_resolution_ar": "حل النزاعات",
"governing_law_ar": "القانون الحاكم: أنظمة المملكة العربية السعودية",
"next_steps_ar": ["الخطوات التالية"]
}"""
llm_response = await self.llm.complete(
system_prompt=system_prompt,
user_message=context,
json_mode=True,
temperature=0.2,
)
term_sheet = llm_response.parse_json() or {}
# Update deal status
if deal.status == DealStatus.NEGOTIATING.value:
deal.status = DealStatus.TERM_SHEET.value
await db.flush()
logger.info("Generated term sheet for deal %s", deal_id)
return term_sheet
# ── Helpers ──────────────────────────────────────────────────────────────
async def _load_deal(self, deal_id, db: AsyncSession) -> StrategicDeal:
result = await db.execute(select(StrategicDeal).where(StrategicDeal.id == deal_id))
deal = result.scalar_one_or_none()
if not deal:
raise ValueError(f"Deal {deal_id} not found")
return deal
async def _load_profile(self, profile_id, db: AsyncSession) -> Optional[CompanyProfile]:
if not profile_id:
return None
result = await db.execute(select(CompanyProfile).where(CompanyProfile.id == profile_id))
return result.scalar_one_or_none()