diff --git a/salesflow-saas/backend/app/services/core_os/decision_memo.py b/salesflow-saas/backend/app/services/core_os/decision_memo.py new file mode 100644 index 00000000..51babc79 --- /dev/null +++ b/salesflow-saas/backend/app/services/core_os/decision_memo.py @@ -0,0 +1,61 @@ +from pydantic import BaseModel, ConfigDict, Field +from typing import List, Dict, Any, Optional +import uuid +from datetime import datetime + +class RiskRegisterItem(BaseModel): + risk: str + severity: str # "high", "medium", "low", "critical" + mitigation: str + +class FinancialImpact(BaseModel): + revenue_upside_sar: float = 0.0 + cost_downside_sar: float = 0.0 + capital_at_risk_sar: float = 0.0 + +class AuditMetadata(BaseModel): + verified: bool = False + tool_proof_id: Optional[str] = None + policy_check_passed: bool = False + agent_id: str + timestamp: str + +class DecisionMemo(BaseModel): + """ + The Universal Output Contract (Decision Memo) + All Sovereign Growth OS Agents MUST return this exact schema. + """ + model_config = ConfigDict(extra="allow") + + memo_id: str = Field(default_factory=lambda: f"memo_{uuid.uuid4().hex[:10]}") + objective: str + decision_context: str + inputs_used: List[str] + assumptions: List[str] + recommendation_ar: str + alternatives_considered: List[str] + expected_financial_impact: FinancialImpact + risk_register: List[RiskRegisterItem] + confidence_score: float = Field(ge=0, le=100) + required_approvals: List[str] + next_best_action: str + rollback_plan: str + evidence_links: List[str] + audit_metadata: AuditMetadata + + def to_json(self) -> Dict[str, Any]: + return self.model_dump() + + @classmethod + def create_memo(cls, agent_id: str, objective: str, recommendation: str, + confidence: float, **kwargs) -> 'DecisionMemo': + """Helper to safely instantiate memos with timestamps included.""" + audit = AuditMetadata( + agent_id=agent_id, + timestamp=datetime.utcnow().isoformat() + ) + kwargs["audit_metadata"] = audit + kwargs["objective"] = objective + kwargs["recommendation_ar"] = recommendation + kwargs["confidence_score"] = confidence + return cls(**kwargs) diff --git a/salesflow-saas/backend/app/services/strategic_deals/partnership_scout.py b/salesflow-saas/backend/app/services/strategic_deals/partnership_scout.py new file mode 100644 index 00000000..da7a134f --- /dev/null +++ b/salesflow-saas/backend/app/services/strategic_deals/partnership_scout.py @@ -0,0 +1,86 @@ +import uuid +from typing import Dict, Any, List +# import langgraph primitives when integrated, for now structure the state and logic +# from langgraph.graph import StateGraph, END + +class PartnershipScoutWorkflow: + """ + Partnership Scout Agent (LangGraph-based state machine). + Reads market signals and generates detailed partnership scorecards. + """ + + def __init__(self, memory_store, decision_engine): + self.memory_store = memory_store + self.decision_engine = decision_engine + + def _fetch_company_data(self, state: dict) -> dict: + signal = state.get("signal", {}) + target = signal.get("company_name") + # In a real environment, call clearbit/linkedin/web-scraper here + state["enriched_data"] = {"name": target, "industry": "SaaS", "revenue_tier": "mid-market"} + return state + + def _score_partnership_fit(self, state: dict) -> dict: + data = state.get("enriched_data", {}) + score = 85 if data.get("industry") == "SaaS" else 40 + state["fit_score"] = score + state["synergy_potential"] = ["Product reselling", "Data API sharing"] + return state + + def _generate_decision_memo(self, state: dict) -> dict: + # Generate the structured Decision Memo + score = state.get("fit_score", 0) + target = state.get("enriched_data", {}).get("name", "Unknown") + + memo_kwargs = { + "decision_context": f"Market signal detected interest for {target}", + "inputs_used": ["LinkedIn API", "Crunchbase Signal"], + "assumptions": ["Revenue over 10M SAR", "No strict exclusivity clauses"], + "alternatives_considered": ["Ignore signal", "Direct M&A (ruled out due to size)"], + "expected_financial_impact": { + "revenue_upside_sar": 250000.0, + "cost_downside_sar": 15000.0, + "capital_at_risk_sar": 0.0 + }, + "risk_register": [ + {"risk": "Brand overlap", "severity": "medium", "mitigation": "Co-branding guidelines"} + ], + "required_approvals": ["VP Partnerships"], + "next_best_action": "Send Partnership introductory email to C-level", + "rollback_plan": "Cease communications and mark as disqualified in CRM", + "evidence_links": ["https://crm.dealix.local/signals/1"] + } + + recommendation = f"Initiate Alliance Structuring with {target} (Score: {score})" if score > 70 else "Discard lead." + + memo = self.decision_engine.create_memo( + agent_id="partnership_scout", + objective="Evaluate partnership market fit", + recommendation=recommendation, + confidence=float(score), + **memo_kwargs + ) + + state["final_memo"] = memo.to_json() + return state + + def execute_flow(self, signal: Dict[str, Any]) -> Dict[str, Any]: + """ + Simulates the LangGraph execution flow: fetch -> score -> memo. + """ + state = {"signal": signal} + state = self._fetch_company_data(state) + state = self._score_partnership_fit(state) + state = self._generate_decision_memo(state) + + # Save to memory backbone + memo_id = self.memory_store.store_item( + domain="partners", + title=f"Partner Scout: {signal.get('company_name', 'Unknown')}", + memory_type="Partner Evaluation", + owner="VP Partnerships", + confidence=int(state.get("fit_score", 0)), + summary=state["final_memo"]["recommendation_ar"] + ) + + return {"status": "scouted", "memo_id": memo_id, "score": state["fit_score"]} diff --git a/salesflow-saas/backend/app/services/strategic_deals/strategic_pmo.py b/salesflow-saas/backend/app/services/strategic_deals/strategic_pmo.py new file mode 100644 index 00000000..2ff84a45 --- /dev/null +++ b/salesflow-saas/backend/app/services/strategic_deals/strategic_pmo.py @@ -0,0 +1,79 @@ +from typing import Dict, Any, List +from datetime import datetime, timedelta + +class StrategicPMOAgent: + """ + Strategic PMO Agent for Sovereign OS. + Translates long-term strategies (e.g., Post-Merger Integration, Global Expansion) + into tracked workstreams, RAG statuses (Red/Amber/Green), and escalates blockers. + """ + + def __init__(self, memory_store, decision_engine): + self.memory_store = memory_store + self.decision_engine = decision_engine + + def breakdown_initiative(self, initiative_title: str, goals: List[str], deadline_days: int) -> Dict[str, Any]: + """Takes an executive goal and breaks it down into tracks.""" + # Simulated LLM generation of tasks based on the M&A DD or Expansion Playbook + tasks = [ + {"task": "Legal Entity Setup", "owner": "Legal", "due_in": min(14, deadline_days), "status": "green"}, + {"task": "Financial Integration", "owner": "Finance", "due_in": min(30, deadline_days), "status": "amber"}, + {"task": "IT Systems Merge", "owner": "IT", "due_in": min(45, deadline_days), "status": "green"} + ] + + return { + "initiative": initiative_title, + "overall_status": "amber", + "deadline_date": (datetime.utcnow() + timedelta(days=deadline_days)).isoformat(), + "workstreams": tasks + } + + def check_health_and_escalate(self, initiative_data: Dict[str, Any]) -> Dict[str, Any]: + """ + Runs on CRON (`execution.milestone_due`). + Escalates if a task belongs in red territory. + """ + red_tasks = [t for t in initiative_data.get("workstreams", []) if t["status"] == "red"] + + if not red_tasks: + return {"status": "healthy", "escalation_needed": False} + + memo_kwargs = { + "decision_context": f"Critical path slippage detected in: {initiative_data['initiative']}", + "inputs_used": ["PMO Task Tracker", "Jira Integration"], + "assumptions": ["Delaying IT systems merge jeopardizes synergy realization"], + "alternatives_considered": ["Extend deadline", "Allocate emergency budget"], + "expected_financial_impact": { + "revenue_upside_sar": 0.0, + "cost_downside_sar": 50000.0, # Penalty for delay + "capital_at_risk_sar": 0.0 + }, + "risk_register": [ + {"risk": "Integration Failure", "severity": "critical", "mitigation": "CEO Intervention required"} + ], + "required_approvals": ["CEO", "Chief of Staff"], + "next_best_action": "Schedule emergency steering committee meeting", + "rollback_plan": "N/A - Execution pipeline blocker", + "evidence_links": ["https://jira.dealix.local/board/INT-1"] + } + + memo = self.decision_engine.create_memo( + agent_id="strategic_pmo", + objective="Escalate blocked strategic initiative", + recommendation=f"Intervene immediately in the following red tasks: {[t['task'] for t in red_tasks]}", + confidence=95.0, + **memo_kwargs + ) + + # Save escalation to memory + self.memory_store.store_item( + domain="runbooks", + title=f"ESCALATION: {initiative_data['initiative']}", + memory_type="Escalation Memo", + owner="Chief of Staff", + confidence=95, + summary=memo.recommendation_ar, + tags=["escalation", "red-flag"] + ) + + return {"status": "escalated", "memo": memo.to_json()}