system-prompts-and-models-o.../dealix/scripts/smoke_inprocess.py
2026-05-01 14:03:52 +03:00

63 lines
1.7 KiB
Python

#!/usr/bin/env python3
"""HTTP smoke without a TCP server — uses ASGITransport (CI-friendly)."""
from __future__ import annotations
import asyncio
import json
import sys
from pathlib import Path
_REPO = Path(__file__).resolve().parents[1]
if str(_REPO) not in sys.path:
sys.path.insert(0, str(_REPO))
import httpx # noqa: E402
from api.main import create_app # noqa: E402
# Routes requested by the smoke run, in the order they are probed.
# The run is reported as failed if any of them answers with a status other
# than 200.
PATHS = [
    "/",
    "/health",
    "/api/v1/personal-operator/daily-brief",
    "/api/v1/personal-operator/launch-report",
    "/api/v1/v3/command-center/snapshot",
    "/api/v1/business/pricing",
    "/api/v1/innovation/growth-missions",
    "/api/v1/innovation/command-feed/demo",
]
async def main() -> int:
    """Probe every route in ``PATHS`` in-process and report the outcome.

    The application returned by ``create_app()`` is called through
    ``httpx.ASGITransport`` rather than over a listening socket.

    For each path a ``<status> <path>`` line is written to stdout. A 200
    response is followed by a snippet of its body (up to 400 characters of
    re-serialised JSON, or up to 200 characters of raw text when the body is
    not valid JSON). Any other status counts as a failure and the first 400
    characters of the body are written to stderr.

    A request that raises instead of producing a response is reported as
    ``ERR <path>`` with its traceback on stderr and also counts as a
    failure; the remaining paths are still probed so the summary covers the
    whole list.

    Returns:
        0 when every path answered 200, otherwise 1.
    """
    import traceback  # local import: only used on the exception path

    try:
        # Snippets are dumped with ensure_ascii=False, so make stdout able to
        # carry non-ASCII characters regardless of the console code page.
        sys.stdout.reconfigure(encoding="utf-8")
    except (AttributeError, OSError):
        # stdout is an object without reconfigure() or the stream rejects
        # the change; keep whatever encoding it already has.
        pass

    failed = 0
    app = create_app()
    transport = httpx.ASGITransport(app=app)
    async with httpx.AsyncClient(transport=transport, base_url="http://test") as client:
        for path in PATHS:
            try:
                r = await client.get(path)
            except Exception:
                # ASGITransport re-raises exceptions from the app by default.
                # Record the failure and keep going instead of aborting the
                # run on the first broken endpoint.
                failed += 1
                print(f"ERR {path}")
                traceback.print_exc()
                continue

            print(f"{r.status_code} {path}")
            if r.status_code != 200:
                failed += 1
                print(r.text[:400], file=sys.stderr)
                continue

            try:
                snippet = json.dumps(r.json(), ensure_ascii=False)[:400]
            except ValueError:
                # Body is not decodable JSON (JSONDecodeError and
                # UnicodeDecodeError are both ValueError subclasses).
                snippet = r.text[:200]
            # Round-trip through UTF-8 with "replace" so characters that
            # cannot be encoded (e.g. lone surrogates) do not break print().
            print(snippet.encode("utf-8", errors="replace").decode("utf-8", errors="replace"))

    if failed:
        print(f"SMOKE_FAIL {failed} endpoints", file=sys.stderr)
        return 1
    print("SMOKE_INPROCESS_OK")
    return 0
if __name__ == "__main__":
    # Run the async smoke routine to completion and hand its return value
    # (0 on success, 1 on failure) to the interpreter as the exit status.
    exit_status = asyncio.run(main())
    raise SystemExit(exit_status)