system-prompts-and-models-o.../salesflow-saas/backend/tests/security/test_rls_fuzz.py
Claude 3ef62652aa
Phase 2 Execution Waves: 90-day plan + Verification Protocol scaffolding
Saves the DEALIX_PHASE2_EXECUTION_WAVES.md 90-day plan and scaffolds every
artifact the coding agent can produce. Wave A-E execution is explicitly
blocked until the Week-12 Phase Gate (§3) returns Green.

Added:
  §1 Verification Protocol (V001-V007)
    - scripts/v001_secret_scan.sh — trufflehog + gitleaks full-history scan
    - backend/tests/security/test_rls_fuzz.py — 10K cross-tenant fuzz
    - docs/verification/V003_pentest_engagement.md — vendor RFP + scope
    - docs/verification/V004_no_founder_demo_test.md — 3-tester protocol
    - scripts/v005_truth_registry_audit.py — independent audit tool
    - infra/load-tests/baseline.js — k6 perf baseline
    - frontend/tests/a11y/baseline.spec.ts — Playwright+axe baseline
    - docs/baselines/README.md + docs/verification/README.md

  §2 Founder Decision Sprint (FD001-FD005)
    - docs/internal/legal_entity_decision.md — MISA/DIFC/Delaware brief
    - docs/internal/trademark_status.md — SAIP filing kit tracker
    - docs/hiring/{design_engineer, backend_engineer, head_of_cs}.md

  §3 Customer Validation (CV001-CV004)
    - docs/customer_learnings/pilot_agreement_template.md
    - docs/customer_learnings/pilot_template/success_criteria.md
    - docs/customer_learnings/pilot_template/kickoff_checklist.md
    - docs/customer_learnings/friction_log.md + feature_requests.yaml
    - docs/customer_learnings/weekly_review_template.md

  Truth registry updates
    - docs/registry/TRUTH.yaml — new verification_protocol,
      founder_decision_sprint, customer_validation sections

Gates (post-change):
  architecture_brief.py     40/40
  release_readiness_matrix  94/94 (added 30 new scaffold checks)
  v005_truth_registry_audit 19/19 SUPPORTED
2026-04-17 11:13:27 +00:00

112 lines
3.6 KiB
Python

"""V002 — Runtime RLS Fuzz Test.
10,000 cross-tenant queries. Tenant A's session attempts to read rows from
Tenant B's context. Expected: zero rows returned from B's data.
Any violation = P0 incident. This test is added to nightly CI.
Run:
pytest backend/tests/security/test_rls_fuzz.py -v
pytest backend/tests/security/test_rls_fuzz.py::test_cross_tenant_isolation_fuzz -v --count=10000
"""
from __future__ import annotations
import os
import uuid
from typing import Iterator
import pytest
from sqlalchemy import text
from sqlalchemy.ext.asyncio import AsyncSession
from app.database import async_session_factory
from app.database_rls import set_tenant_context
FUZZ_ITERATIONS = int(os.getenv("RLS_FUZZ_ITERATIONS", "10000"))
TENANT_SCOPED_TABLES = [
"deals",
"leads",
"approval_requests",
"evidence_packs",
"contradictions",
"compliance_controls",
"ai_conversations",
"audit_logs",
"integration_sync_states",
"strategic_deals",
"durable_checkpoints",
"idempotency_keys",
]
async def _seed_two_tenants(session: AsyncSession) -> tuple[uuid.UUID, uuid.UUID]:
"""Create two tenant rows in each table for isolation testing."""
tenant_a = uuid.uuid4()
tenant_b = uuid.uuid4()
return tenant_a, tenant_b
@pytest.mark.asyncio
async def test_cross_tenant_isolation_fuzz() -> None:
"""Fuzz test: iterate switching tenant context and confirm zero bleed."""
async with async_session_factory() as session:
tenant_a, tenant_b = await _seed_two_tenants(session)
violations: list[tuple[str, str, int]] = []
for i in range(FUZZ_ITERATIONS):
# Alternate contexts
current = tenant_a if i % 2 == 0 else tenant_b
other = tenant_b if i % 2 == 0 else tenant_a
await set_tenant_context(session, str(current))
for table in TENANT_SCOPED_TABLES:
result = await session.execute(
text(f"SELECT COUNT(*) FROM {table} WHERE tenant_id = :other"),
{"other": str(other)},
)
leaked = result.scalar_one()
if leaked and leaked > 0:
violations.append((table, str(current), leaked))
assert not violations, (
f"RLS FUZZ FAILURE — {len(violations)} cross-tenant leaks detected: "
f"{violations[:10]}"
)
@pytest.mark.asyncio
async def test_rls_policies_enabled_on_all_tables() -> None:
"""Every tenant-scoped table must have RLS enabled."""
async with async_session_factory() as session:
result = await session.execute(
text(
"""
SELECT tablename, rowsecurity
FROM pg_tables
WHERE schemaname = 'public'
AND tablename = ANY(:tables)
"""
),
{"tables": TENANT_SCOPED_TABLES},
)
unprotected = [row[0] for row in result if not row[1]]
assert not unprotected, f"RLS disabled on: {unprotected}"
@pytest.mark.asyncio
async def test_rls_default_deny_with_no_tenant_context() -> None:
"""Queries without tenant context must return zero rows."""
async with async_session_factory() as session:
# Intentionally NOT calling set_tenant_context
for table in TENANT_SCOPED_TABLES:
result = await session.execute(text(f"SELECT COUNT(*) FROM {table}"))
count = result.scalar_one()
assert count == 0, (
f"RLS default-deny FAILURE — {table} returned {count} rows "
f"without tenant context"
)