system-prompts-and-models-o.../salesflow-saas/backend/app/services/idempotency_service.py
Claude 38e9d02075
feat(dealix): close ALL 4 Tier-1 runtime gaps (Programs E, F, G, K, J)
Program F — Multi-Tenancy RLS (Row-Level Security):
  alembic 20260417_0002_add_rls.py: Enables RLS on 23 tenant-scoped tables.
  database_rls.py: set_tenant_context() helpers for SET LOCAL app.tenant_id.
  middleware/tenant_rls.py: Extracts tenant_id from JWT on every request.
  Default-deny when no context. PostgreSQL only (CI safe on SQLite).
  Result: OWASP A01:2025 — access control enforced at DB layer.

Program G — Idempotency Standard:
  models/idempotency_key.py: IdempotencyKey table with TTL + SHA256 hash.
  services/idempotency_service.py: get_existing/store with request fingerprint.
  middleware/idempotency.py: HTTP middleware on POST/PUT/PATCH.
  Result: Duplicate side effects prevented on retry.

Program E — Persistent Durable Execution:
  models/durable_checkpoint.py: DurableCheckpoint with sequence_num + status.
  services/durable_runtime.py: start_run/checkpoint/complete/resume/list_incomplete.
  Result: Workflows survive crashes — resume from last persisted checkpoint.

Program K — OpenTelemetry:
  observability/otel.py: init/span/inject_correlation_id with graceful
    degradation when OTel packages absent.
  openclaw/gateway.py: Wraps execute() in span, binds correlation_id to
    trace_id. Bridge between business correlation and production observability.

Program J — Release Gate Hardening:
  docs/governance/release-gates.md: Documents 3 mandatory gates.
  .github/workflows/dealix-ci.yml: Adds release_readiness_matrix as CI step.
  release_readiness_matrix.py: Updated to check 41/41 components.

Verification:
  architecture_brief.py:     40/40 PASS
  release_readiness_matrix.py: 41/41 PASS

https://claude.ai/code/session_01W1rJthWDkasijTdXCfxVHs
2026-04-17 10:12:04 +00:00


"""Idempotency Service — prevents duplicate side effects across retries.
Used by both HTTP middleware and service-level callers (approval_bridge,
evidence_pack_service, golden_path).
"""
from __future__ import annotations

import hashlib
import json
from datetime import datetime, timedelta, timezone
from typing import Any, Dict, Optional

from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession


def hash_request(body: Any) -> str:
    """Compute SHA256 of request body for fingerprinting."""
    payload = json.dumps(body, sort_keys=True, default=str) if body is not None else ""
    return hashlib.sha256(payload.encode()).hexdigest()


class IdempotencyService:
    """Manages idempotency key lifecycle."""

    DEFAULT_TTL_HOURS = 24

    async def get_existing(
        self, db: AsyncSession, *, key: str, tenant_id: str
    ) -> Optional[Dict[str, Any]]:
        """Return cached response for key if exists and not expired."""
        from app.models.idempotency_key import IdempotencyKey

        stmt = select(IdempotencyKey).where(
            IdempotencyKey.key == key,
            IdempotencyKey.tenant_id == tenant_id,
        )
        result = await db.execute(stmt)
        row = result.scalar_one_or_none()
        if not row:
            return None
        # Expiry check
        if row.expires_at and row.expires_at < datetime.now(timezone.utc):
            return None
        return {
            "cached": True,
            "key": row.key,
            "endpoint": row.endpoint,
            "request_hash": row.request_hash,
            "response": row.response,
            "status_code": row.status_code,
        }

    async def store(
        self,
        db: AsyncSession,
        *,
        key: str,
        tenant_id: str,
        endpoint: str,
        request_body: Any,
        response: Any,
        status_code: int = 200,
        ttl_hours: int = DEFAULT_TTL_HOURS,
    ) -> None:
        """Store response keyed by idempotency key."""
        from app.models.idempotency_key import IdempotencyKey

        record = IdempotencyKey(
            tenant_id=tenant_id,
            key=key,
            endpoint=endpoint,
            request_hash=hash_request(request_body),
            response=response if isinstance(response, dict) else {"value": response},
            status_code=str(status_code),
            expires_at=datetime.now(timezone.utc) + timedelta(hours=ttl_hours),
        )
        db.add(record)
        await db.commit()


idempotency_service = IdempotencyService()
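
The check-then-store flow that this service supports can be sketched without a database. The example below is a minimal illustration, not the project's middleware: `InMemoryIdempotencyStore`, `run_once`, and `create_invoice` are hypothetical names, the store is a plain dict standing in for the IdempotencyKey table, and rejecting a reused key whose request body differs is an assumed policy rather than something the module above enforces. `hash_request` is copied from the module so the block runs on its own.

```python
import hashlib
import json
from datetime import datetime, timedelta, timezone
from typing import Any, Callable, Dict, Optional, Tuple


def hash_request(body: Any) -> str:
    """Same fingerprint as the module: SHA256 over sorted-key JSON."""
    payload = json.dumps(body, sort_keys=True, default=str) if body is not None else ""
    return hashlib.sha256(payload.encode()).hexdigest()


class InMemoryIdempotencyStore:
    """Hypothetical stand-in for the IdempotencyKey table (illustration only)."""

    def __init__(self) -> None:
        self._rows: Dict[Tuple[str, str], Dict[str, Any]] = {}

    def get_existing(self, *, key: str, tenant_id: str) -> Optional[Dict[str, Any]]:
        row = self._rows.get((tenant_id, key))
        # Treat a missing or expired row the same way: nothing to replay.
        if row is None or row["expires_at"] < datetime.now(timezone.utc):
            return None
        return row

    def store(self, *, key: str, tenant_id: str, request_body: Any,
              response: Any, ttl_hours: int = 24) -> None:
        self._rows[(tenant_id, key)] = {
            "request_hash": hash_request(request_body),
            "response": response,
            "expires_at": datetime.now(timezone.utc) + timedelta(hours=ttl_hours),
        }


def run_once(store: InMemoryIdempotencyStore, *, key: str, tenant_id: str,
             body: Any, handler: Callable[[Any], Any]) -> Any:
    """Run handler at most once per (tenant_id, key); replay the cached response on retry."""
    cached = store.get_existing(key=key, tenant_id=tenant_id)
    if cached is not None:
        if cached["request_hash"] != hash_request(body):
            # Assumed policy: the same key with a different payload is an error.
            raise ValueError("idempotency key reused with a different request body")
        return cached["response"]
    response = handler(body)
    store.store(key=key, tenant_id=tenant_id, request_body=body, response=response)
    return response


calls = []  # records every time the side effect actually runs


def create_invoice(body: Dict[str, Any]) -> Dict[str, Any]:
    calls.append(body)
    return {"invoice_id": len(calls), "amount": body["amount"]}


store = InMemoryIdempotencyStore()
first = run_once(store, key="k1", tenant_id="t1",
                 body={"amount": 10, "currency": "SAR"}, handler=create_invoice)
# The retry sends the same fields in a different order; sort_keys makes the hash match.
retry = run_once(store, key="k1", tenant_id="t1",
                 body={"currency": "SAR", "amount": 10}, handler=create_invoice)
```

Here the retry returns the stored response and `create_invoice` runs only once. Because the lookup and the write are separate steps, two concurrent requests with the same key could both miss the cache; a database-backed version would typically rely on a uniqueness constraint or a lock to close that window.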