system-prompts-and-models-o.../personal-brand-engine/agents/linkedin/agent.py
VoXc2 4bb2442313
Add Personal Brand Engine - 7 AI Agents Automation System
Complete AI-powered personal brand automation for Sami Assiri.\n\n7 agents: LinkedIn, Email, Social Media, WhatsApp, CV Optimizer, Content Strategist, Opportunity Scout.\nInfra: FastAPI + APScheduler + Docker + Ollama/Groq LLM + GitHub Pages landing page.\n83 files, ~10K lines. Cost: $0-5/month.
2026-03-30 11:45:48 +03:00

220 lines
7.5 KiB
Python

"""LinkedIn agent -- creates posts, engages the network, and optimises the profile."""
from __future__ import annotations
import asyncio
import logging
import time
from datetime import datetime, timezone
from typing import Any
from linkedin_api import Linkedin
from sqlalchemy.orm import Session
from agents.base_agent import BaseAgent
from agents.linkedin.content_generator import generate_post
from agents.linkedin.engagement import engage_with_feed
from agents.linkedin.profile_optimizer import optimize_profile
from storage.models import Post
logger = logging.getLogger(__name__)

# Process-local rate-limit state: action name -> time.time() value recorded
# when that action was last marked as executed.
_RATE_LIMIT_WINDOW: dict[str, float] = {}

# Cool-down, in seconds, that must pass before the same action may run again.
RATE_LIMIT_SECONDS: dict[str, int] = {
    "post_content": 60 * 60,  # at most one post per hour
    "engage_network": 30 * 60,  # engagement rounds at least 30 minutes apart
    "optimize_profile": 24 * 60 * 60,  # profile review once per day
}
class LinkedInAgent(BaseAgent):
    """Autonomous LinkedIn agent for Sami Mohammed Assiri's personal brand.

    Tasks are started through :meth:`run`, which enforces a per-action
    cool-down (``RATE_LIMIT_SECONDS``), records the outcome via
    ``log_action`` and notifies the owner when a task fails.

    ``log_action``, ``timer``, ``notify_owner``, ``get_brand_profile``,
    ``get_content_strategy`` and the ``config`` / ``llm`` / ``db`` attributes
    are not defined in this class -- presumably they come from ``BaseAgent``.
    """

    agent_name: str = "linkedin"

    def __init__(
        self,
        config: Any,
        llm_client: Any,
        db_session: Session,
    ) -> None:
        """Forward the dependencies to ``BaseAgent`` and defer the LinkedIn login."""
        super().__init__(config, llm_client, db_session)
        # Created on first use by _get_api(); stays None until then.
        self._api: Linkedin | None = None

    # ------------------------------------------------------------------
    # LinkedIn API (lazy init)
    # ------------------------------------------------------------------

    def _get_api(self) -> Linkedin:
        """Return an authenticated ``linkedin_api.Linkedin`` instance.

        The credentials come from the application settings. The client is
        created once and reused for the lifetime of this agent instance.

        Raises:
            RuntimeError: if the e-mail or password setting is empty.
            Exception: whatever ``Linkedin(...)`` raises on a failed login
                (logged, then re-raised unchanged).
        """
        if self._api is None:
            email = self.config.linkedin_email
            password = self.config.linkedin_password
            if not email or not password:
                raise RuntimeError(
                    "LinkedIn credentials are not configured. "
                    "Set LINKEDIN_EMAIL and LINKEDIN_PASSWORD in .env."
                )
            try:
                self._api = Linkedin(email, password)
                logger.info("LinkedIn API authenticated for %s", email)
            except Exception as exc:
                logger.error("LinkedIn authentication failed: %s", exc)
                raise
        return self._api

    @staticmethod
    async def _run_blocking(func: Any, *args: Any) -> Any:
        """Await a synchronous callable executed in the loop's default executor.

        The LinkedIn client performs blocking network I/O; running it in a
        worker thread keeps the event loop free for other coroutines.
        """
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(None, func, *args)

    # ------------------------------------------------------------------
    # Rate limiting
    # ------------------------------------------------------------------

    @staticmethod
    def _is_rate_limited(action: str) -> bool:
        """Return True while *action* is still inside its cool-down window.

        An action that was never marked as executed is not limited, and an
        action without an entry in ``RATE_LIMIT_SECONDS`` has a window of 0.
        """
        last = _RATE_LIMIT_WINDOW.get(action)
        if last is None:
            return False
        window = RATE_LIMIT_SECONDS.get(action, 0)
        return (time.time() - last) < window

    @staticmethod
    def _mark_executed(action: str) -> None:
        """Record the current time as the last execution of *action*."""
        _RATE_LIMIT_WINDOW[action] = time.time()

    # ------------------------------------------------------------------
    # Task dispatcher
    # ------------------------------------------------------------------

    async def run(self, task: str, **kwargs: Any) -> dict:
        """Dispatch *task* to the appropriate handler.

        Supported tasks:

        - ``post_content`` -- generate and publish a LinkedIn post
        - ``engage_network`` -- like / comment on connections' recent posts
        - ``optimize_profile`` -- return profile improvement suggestions

        Args:
            task: name of the task to run.
            **kwargs: forwarded unchanged to the task handler.

        Returns:
            ``{"status": "success", "result": ...}`` when the handler
            completes, ``{"status": "skipped", "message": ...}`` when the
            task is rate-limited, or ``{"status": "error", "message": ...}``
            for an unknown task or a handler exception.
        """
        dispatch = {
            "post_content": self.post_content,
            "engage_network": self.engage_network,
            "optimize_profile": self.optimize_profile,
        }
        handler = dispatch.get(task)
        if handler is None:
            self.log_action(task, details=f"Unknown task: {task}", status="failed")
            return {"status": "error", "message": f"Unknown task: {task}"}

        if self._is_rate_limited(task):
            msg = f"Rate-limited: {task} was run too recently."
            logger.warning(msg)
            self.log_action(task, details=msg, status="skipped")
            return {"status": "skipped", "message": msg}

        with self.timer() as t:
            try:
                result = await handler(**kwargs)
                # Only successful runs start the cool-down, so a failed task
                # may be retried immediately.
                self._mark_executed(task)
                self.log_action(task, details=str(result), duration=t.elapsed)
                return {"status": "success", "result": result}
            except Exception as exc:
                logger.exception("Task %s failed", task)
                self.log_action(
                    task,
                    details=str(exc),
                    status="failed",
                    duration=t.elapsed,
                )
                await self.notify_owner(
                    f"[LinkedIn Agent] Task '{task}' failed: {exc}"
                )
                return {"status": "error", "message": str(exc)}

    # ------------------------------------------------------------------
    # post_content
    # ------------------------------------------------------------------

    async def post_content(self, *, pillar: str | None = None) -> dict:
        """Generate a LinkedIn post via LLM and publish it.

        The text is stored as a ``draft`` row first; the row then becomes
        ``published`` on success or ``failed`` when authentication or
        publishing raises.

        Args:
            pillar: optional value forwarded to ``generate_post``.

        Returns:
            A dict with ``post_id``, ``content_preview`` (first 120
            characters of the post) and ``published`` (always True here,
            because failures raise).

        Raises:
            RuntimeError: if obtaining the LinkedIn client or publishing the
                post fails; the original exception is chained as the cause.
        """
        brand_profile = self.get_brand_profile()
        content_strategy = self.get_content_strategy()

        # Generate the post text
        post_text = await generate_post(
            self.llm,
            brand_profile,
            content_strategy,
            pillar=pillar,
        )

        # Persist as draft first
        post_row = Post(
            platform="linkedin",
            content=post_text,
            status="draft",
        )
        self.db.add(post_row)
        self.db.flush()

        # Publish via LinkedIn API. Authentication happens inside the guarded
        # region so that a credential or login failure marks the flushed draft
        # as "failed" instead of leaving it pending in the session. Only the
        # LinkedIn calls are guarded: a database error on the success path
        # must not relabel an already-published post as failed.
        try:
            api = await self._run_blocking(self._get_api)
            await self._run_blocking(api.post, post_text)
        except Exception as exc:
            post_row.status = "failed"
            self.db.commit()
            raise RuntimeError(f"Failed to publish post: {exc}") from exc

        post_row.status = "published"
        post_row.published_at = datetime.now(timezone.utc)
        self.db.commit()
        logger.info("Published LinkedIn post id=%s", post_row.id)

        return {
            "post_id": post_row.id,
            "content_preview": post_text[:120],
            "published": True,
        }

    # ------------------------------------------------------------------
    # engage_network
    # ------------------------------------------------------------------

    async def engage_network(
        self,
        *,
        max_likes: int = 15,
        max_comments: int = 5,
    ) -> dict:
        """Like and comment on recent posts from connections.

        Args:
            max_likes: forwarded to ``engage_with_feed`` as ``max_likes``.
            max_comments: forwarded to ``engage_with_feed`` as ``max_comments``.

        Returns:
            Whatever ``engage_with_feed`` returns.
        """
        # The first call logs in over the network, so keep it off the loop.
        api = await self._run_blocking(self._get_api)
        brand_profile = self.get_brand_profile()
        result = await engage_with_feed(
            linkedin_api=api,
            llm_client=self.llm,
            brand_profile=brand_profile,
            max_likes=max_likes,
            max_comments=max_comments,
        )
        return result

    # ------------------------------------------------------------------
    # optimize_profile
    # ------------------------------------------------------------------

    async def optimize_profile(self) -> dict:
        """Return a dict of profile optimisation suggestions.

        Inside this method the name ``optimize_profile`` refers to the
        module-level function imported from
        ``agents.linkedin.profile_optimizer``, not to this method.
        """
        # The first call logs in over the network, so keep it off the loop.
        api = await self._run_blocking(self._get_api)
        brand_profile = self.get_brand_profile()
        suggestions = await optimize_profile(
            llm_client=self.llm,
            brand_profile=brand_profile,
            linkedin_api=api,
        )
        return suggestions