Coverage for netrun / rbac / tests / test_tenant_isolation.py: 0%
319 statements
« prev ^ index » next coverage.py v7.12.0, created at 2025-12-18 22:06 +0000
"""
Contract Tests for Tenant Isolation.

These tests MUST pass before any release. They prove that:
1. Tenant A cannot read Tenant B's data
2. Tenant A cannot write to Tenant B's data
3. Background tasks maintain tenant context
4. Raw SQL is blocked or filtered
5. Pagination queries include tenant filters
6. Aggregations are properly scoped

Security Level: CRITICAL
Compliance: SOC2 CC6.1, ISO27001 A.9.4, NIST AC-4

Usage:
    # Run all tenant isolation tests
    pytest -m tenant_isolation

    # Run with verbose output
    pytest netrun/rbac/tests/test_tenant_isolation.py -v

    # Fail fast on first error (recommended for CI)
    pytest netrun/rbac/tests/test_tenant_isolation.py -x

CI/CD Integration:
    These tests should be run on every PR and before any deployment.
    Configure your CI to fail the build if any test in this file fails.
"""
30import asyncio
31import re
32from typing import Any, AsyncGenerator, List, Optional
33from unittest.mock import AsyncMock, MagicMock, patch
34from uuid import uuid4
36import pytest
37from sqlalchemy import Column, MetaData, String, Table, create_engine, select, text
38from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker, create_async_engine
39from sqlalchemy.orm import declarative_base
41from netrun.rbac.exceptions import TenantIsolationError
42from netrun.rbac.testing import (
43 BackgroundTaskTenantContext,
44 EscapePathFinding,
45 EscapePathSeverity,
46 TenantEscapePathScanner,
47 TenantTestContext,
48 assert_tenant_isolation,
49 assert_tenant_isolation_sync,
50 ci_fail_on_findings,
51 get_compliance_documentation,
52 preserve_tenant_context,
53 tenant_isolation_test,
54 tenant_test_context,
55)
57# =============================================================================
58# Test Fixtures
59# =============================================================================
# Declarative base shared by the mock ORM models in this module.
Base = declarative_base()


class MockItem(Base):
    """Stand-in ``items`` model carrying a ``tenant_id`` column."""

    __tablename__ = "items"

    # String primary key.
    id = Column(String, primary_key=True)
    name = Column(String, nullable=False)
    # Owning tenant; the query-isolation tests in this module filter on it.
    tenant_id = Column(String, nullable=False)
    # Defaults to "active" when no value is supplied.
    status = Column(String, default="active")
class MockUser(Base):
    """Stand-in ``users`` model carrying a ``tenant_id`` column."""

    __tablename__ = "users"

    # String primary key.
    id = Column(String, primary_key=True)
    email = Column(String, nullable=False)
    # Owning tenant of the user row.
    tenant_id = Column(String, nullable=False)
@pytest.fixture
def mock_session() -> AsyncMock:
    """Build an ``AsyncSession`` mock that emulates tenant-context statements.

    The mock's ``execute`` keeps one piece of state,
    ``session._tenant_context``, and reacts to the statement text:

    * ``SET LOCAL`` mentioning ``app.current_tenant_id`` together with a
      ``tenant_id`` entry in ``params`` stores that value as the tenant.
    * ``RESET app.current_tenant_id`` clears the stored tenant.
    * A statement mentioning both ``current_setting`` and
      ``app.current_tenant_id`` returns a result whose ``scalar()`` yields
      the stored tenant.

    Any other statement returns an empty result: ``scalars().all()`` and
    ``fetchall()`` give ``[]`` and ``rowcount`` is ``0``.

    Returns:
        The configured ``AsyncMock`` (spec'd on ``AsyncSession``).
    """
    session = AsyncMock(spec=AsyncSession)

    # Track RLS context
    session._tenant_context = None

    async def mock_execute(statement: Any, params: Optional[dict] = None) -> MagicMock:
        # str() is defined for every object, so no hasattr() guard is needed.
        stmt_str = str(statement)
        touches_tenant_setting = "app.current_tenant_id" in stmt_str

        # Capture SET LOCAL commands
        if (
            "SET LOCAL" in stmt_str
            and touches_tenant_setting
            and params
            and "tenant_id" in params
        ):
            session._tenant_context = params["tenant_id"]

        # Capture RESET commands
        if "RESET app.current_tenant_id" in stmt_str:
            session._tenant_context = None

        # Mock current_setting queries
        if "current_setting" in stmt_str and touches_tenant_setting:
            setting_result = MagicMock()
            setting_result.scalar.return_value = session._tenant_context
            return setting_result

        # Everything else looks like a statement that matched no rows.
        result = MagicMock()
        result.scalars.return_value.all.return_value = []
        result.fetchall.return_value = []
        result.rowcount = 0
        return result

    session.execute = AsyncMock(side_effect=mock_execute)
    return session
@pytest.fixture
def tenant_a_id() -> str:
    """Return a tenant A identifier ending in 8 random hex characters."""
    random_suffix = uuid4().hex[:8]
    return f"tenant-a-{random_suffix}"
@pytest.fixture
def tenant_b_id() -> str:
    """Return a tenant B identifier ending in 8 random hex characters."""
    random_suffix = uuid4().hex[:8]
    return f"tenant-b-{random_suffix}"
134# =============================================================================
135# Core Tenant Isolation Tests
136# =============================================================================
class TestTenantIsolation:
    """
    pgTAP-style contract tests for the core multi-tenant guarantees.

    Each test seeds ``item-1`` as Tenant A, switches the session to
    Tenant B, and asserts that Tenant B's statement touches nothing.

    NOTE(review): the ``mock_session`` fixture answers every statement it
    does not recognise with no rows and ``rowcount`` 0, so these assertions
    hold independently of any RLS policy; confirm that real enforcement is
    covered by a database-backed run.
    """

    _INSERT_ITEM_SQL = "INSERT INTO items (id, name, tenant_id) VALUES (:id, :name, :tid)"

    @staticmethod
    async def _seed_item_as_tenant_a(session: AsyncMock, ctx: Any, name: str) -> None:
        """Insert ``item-1`` with Tenant A's id through *session*, then commit."""
        await session.execute(
            text(TestTenantIsolation._INSERT_ITEM_SQL),
            {"id": "item-1", "name": name, "tid": ctx.tenant_a_id},
        )
        await session.commit()

    @pytest.mark.asyncio
    @pytest.mark.tenant_isolation
    @tenant_isolation_test
    async def test_cross_tenant_read_impossible(self, mock_session: AsyncMock) -> None:
        """
        Tenant B MUST NOT see Tenant A's data.

        This is the baseline guarantee; a failure here means a CRITICAL
        security vulnerability.
        """
        async with TenantTestContext(mock_session) as ctx:
            # The context starts as Tenant A, so the seed row belongs to A.
            await self._seed_item_as_tenant_a(mock_session, ctx, "Secret Item")

            await ctx.switch_to_tenant_b()

            # Make sure the switch actually took effect before querying.
            current = await ctx.get_current_tenant()
            assert current == ctx.tenant_b_id, "Failed to switch tenant context"

            # Under RLS an unfiltered SELECT as Tenant B must come back empty.
            result = await mock_session.execute(text("SELECT * FROM items"))
            items = result.fetchall()

            assert len(items) == 0, (
                "CRITICAL SECURITY FAILURE: Tenant B can see Tenant A's data! "
                f"Found {len(items)} items that should be hidden by RLS."
            )

    @pytest.mark.asyncio
    @pytest.mark.tenant_isolation
    @tenant_isolation_test
    async def test_cross_tenant_write_impossible(self, mock_session: AsyncMock) -> None:
        """
        Tenant B MUST NOT be able to modify Tenant A's data.

        Knowing the row id must not be enough to update another tenant's row.
        """
        async with TenantTestContext(mock_session) as ctx:
            await self._seed_item_as_tenant_a(mock_session, ctx, "Original")

            await ctx.switch_to_tenant_b()

            update_stmt = text("UPDATE items SET name = 'Hacked' WHERE id = 'item-1'")
            result = await mock_session.execute(update_stmt)

            # Zero affected rows means the UPDATE could not reach A's row.
            assert result.rowcount == 0, (
                "CRITICAL SECURITY FAILURE: Tenant B modified Tenant A's data! "
                f"UPDATE affected {result.rowcount} rows."
            )

    @pytest.mark.asyncio
    @pytest.mark.tenant_isolation
    @tenant_isolation_test
    async def test_cross_tenant_delete_impossible(self, mock_session: AsyncMock) -> None:
        """
        Tenant B MUST NOT be able to delete Tenant A's data.
        """
        async with TenantTestContext(mock_session) as ctx:
            await self._seed_item_as_tenant_a(mock_session, ctx, "Protected")

            await ctx.switch_to_tenant_b()

            delete_stmt = text("DELETE FROM items WHERE id = 'item-1'")
            result = await mock_session.execute(delete_stmt)

            # Zero affected rows means the DELETE could not reach A's row.
            assert result.rowcount == 0, (
                "CRITICAL SECURITY FAILURE: Tenant B deleted Tenant A's data! "
                f"DELETE affected {result.rowcount} rows."
            )
241# =============================================================================
242# Query Isolation Tests
243# =============================================================================
class TestQueryIsolation:
    """Checks that the tenant-isolation assertions accept and reject the right queries."""

    @pytest.mark.asyncio
    async def test_query_without_tenant_filter_fails(self) -> None:
        """
        A SELECT that filters only on status must raise TenantIsolationError.

        The error text is also checked so that it mentions ``tenant_id`` and
        carries a ``REMEDIATION`` section.
        """
        unscoped = select(MockItem).where(MockItem.status == "active")

        with pytest.raises(TenantIsolationError) as exc_info:
            await assert_tenant_isolation(unscoped)

        message = str(exc_info.value)
        assert "tenant_id" in message.lower()
        assert "REMEDIATION" in message

    @pytest.mark.asyncio
    async def test_query_with_tenant_filter_passes(self) -> None:
        """
        A SELECT that filters on tenant_id must be accepted.
        """
        tenant_id = "test-tenant-123"
        conditions = (
            MockItem.tenant_id == tenant_id,
            MockItem.status == "active",
        )
        scoped = select(MockItem).where(*conditions)

        # No exception expected.
        await assert_tenant_isolation(scoped)

    @pytest.mark.asyncio
    async def test_pagination_without_tenant_filter_fails(self) -> None:
        """
        Pagination queries MUST include tenant filter.

        OFFSET/LIMIT on their own do not scope a query to a tenant.
        """
        base_query = select(MockItem)
        unscoped_page = base_query.offset(0).limit(100)

        with pytest.raises(TenantIsolationError):
            await assert_tenant_isolation(unscoped_page)

    @pytest.mark.asyncio
    async def test_pagination_with_tenant_filter_passes(self) -> None:
        """
        A paginated SELECT that filters on tenant_id must be accepted.
        """
        tenant_id = "test-tenant-123"
        scoped = select(MockItem).where(MockItem.tenant_id == tenant_id)
        scoped_page = scoped.offset(0).limit(100)

        # No exception expected.
        await assert_tenant_isolation(scoped_page)

    def test_sync_assertion_works(self) -> None:
        """
        The synchronous assertion rejects an unscoped query as well.
        """
        unscoped = select(MockItem).where(MockItem.status == "active")

        with pytest.raises(TenantIsolationError):
            assert_tenant_isolation_sync(unscoped)

    @pytest.mark.asyncio
    async def test_allowed_patterns_bypass_check(self) -> None:
        """
        A query matching an allowed pattern is accepted without a tenant filter.

        Intended for system tables, public lookup tables, and similar.
        """
        allowed = [
            re.compile(pattern)
            for pattern in (r"system_config", r"lookup_")
        ]

        # Targets an allow-listed table and carries no tenant_id.
        system_query = "SELECT * FROM system_config WHERE key = 'version'"

        # No exception expected because of the allow-list.
        await assert_tenant_isolation(system_query, allowed_patterns=allowed)

    @pytest.mark.asyncio
    async def test_raw_sql_string_validation(self) -> None:
        """
        Plain SQL strings go through the same validation.
        """
        unscoped_sql = "SELECT * FROM items WHERE status = 'active'"
        with pytest.raises(TenantIsolationError):
            await assert_tenant_isolation(unscoped_sql)

        scoped_sql = "SELECT * FROM items WHERE tenant_id = 'abc' AND status = 'active'"
        # No exception expected.
        await assert_tenant_isolation(scoped_sql)
355# =============================================================================
356# Background Task Context Tests
357# =============================================================================
class TestBackgroundTaskContext:
    """Checks that tenant context survives into background-task callables."""

    @pytest.mark.asyncio
    @pytest.mark.tenant_isolation
    async def test_background_task_preserves_tenant(self) -> None:
        """
        Background tasks MUST maintain tenant context.

        The task records the value of ``_current_tenant_id`` it observes
        while running; that value must equal the tenant the wrapper was
        built with.
        """
        captured_tenant_ids: List[Optional[str]] = []

        async def capture_tenant_context(tenant_id: str) -> None:
            """Stand-in background task: record the ambient tenant id."""
            from netrun.rbac.testing import _current_tenant_id

            observed = _current_tenant_id.get()
            captured_tenant_ids.append(observed)

        tenant_id = f"test-tenant-{uuid4().hex[:8]}"

        # Wrap the task with the tenant context, then run it once.
        task_context = BackgroundTaskTenantContext(tenant_id)
        runner = task_context.run(capture_tenant_context, tenant_id)
        await runner()

        assert len(captured_tenant_ids) == 1
        assert captured_tenant_ids[0] == tenant_id, (
            f"Background task lost tenant context! "
            f"Expected: {tenant_id}, Got: {captured_tenant_ids[0]}"
        )

    @pytest.mark.asyncio
    async def test_background_task_logs_correlation_id(self) -> None:
        """
        The correlation id passed to the context is exposed on it.
        """
        tenant_id = "test-tenant"
        correlation_id = "trace-123-456"

        task_context = BackgroundTaskTenantContext(
            tenant_id=tenant_id,
            correlation_id=correlation_id,
        )

        assert task_context.correlation_id == correlation_id

    @pytest.mark.asyncio
    async def test_preserve_tenant_context_decorator(self) -> None:
        """
        The decorator form makes the tenant visible inside the wrapped coroutine.
        """
        tenant_id = f"test-tenant-{uuid4().hex[:8]}"
        results: List[str] = []

        async def process_items(items: List[str]) -> None:
            from netrun.rbac.testing import _current_tenant_id

            seen = _current_tenant_id.get()
            # Record a sentinel when no tenant is set so the assert below fails clearly.
            results.append(seen or "NONE")

        @preserve_tenant_context(tenant_id)
        async def decorated_process(items: List[str]) -> None:
            await process_items(items)

        # In application code this callable would be handed to BackgroundTasks.
        await decorated_process(["item1", "item2"])

        assert len(results) == 1
        assert results[0] == tenant_id
437# =============================================================================
438# Tenant Test Context Tests
439# =============================================================================
class TestTenantTestContext:
    """Checks the behaviour of the ``TenantTestContext`` helper itself."""

    @pytest.mark.asyncio
    async def test_context_initializes_tenant_a(self, mock_session: AsyncMock) -> None:
        """
        On entry the active tenant is Tenant A.
        """
        async with TenantTestContext(mock_session) as ctx:
            active = await ctx.get_current_tenant()
            assert active == ctx.tenant_a_id

    @pytest.mark.asyncio
    async def test_context_switch_works(self, mock_session: AsyncMock) -> None:
        """
        Switching A -> B -> A is reflected by get_current_tenant each time.
        """
        async with TenantTestContext(mock_session) as ctx:
            # Initial state: A
            assert await ctx.get_current_tenant() == ctx.tenant_a_id

            # A -> B
            await ctx.switch_to_tenant_b()
            assert await ctx.get_current_tenant() == ctx.tenant_b_id

            # B -> A
            await ctx.switch_to_tenant_a()
            assert await ctx.get_current_tenant() == ctx.tenant_a_id

    @pytest.mark.asyncio
    async def test_context_custom_tenant(self, mock_session: AsyncMock) -> None:
        """
        An arbitrary tenant id can be made the active tenant.
        """
        custom_tenant = f"custom-{uuid4().hex[:8]}"

        async with TenantTestContext(mock_session) as ctx:
            await ctx.switch_to_tenant(custom_tenant)
            active = await ctx.get_current_tenant()
            assert active == custom_tenant

    @pytest.mark.asyncio
    async def test_context_history_tracking(self, mock_session: AsyncMock) -> None:
        """
        The context records its switches so they can be inspected later.
        """
        async with TenantTestContext(mock_session) as ctx:
            await ctx.switch_to_tenant_b()
            await ctx.switch_to_tenant_a()
            await ctx.switch_to_tenant_b()

            history = ctx.get_context_history()

            # Expected at least: enter, switch_b, switch_a, switch_b
            assert len(history) >= 4
            first_event, second_event = history[0], history[1]
            assert first_event[0] == "enter"
            assert second_event[0] == "switch_b"

    @pytest.mark.asyncio
    async def test_functional_context_manager(self, mock_session: AsyncMock) -> None:
        """
        ``tenant_test_context`` yields a context that starts as Tenant A.
        """
        async with tenant_test_context(mock_session) as ctx:
            assert ctx.tenant_a_id is not None
            assert ctx.tenant_b_id is not None
            active = await ctx.get_current_tenant()
            assert active == ctx.tenant_a_id
510# =============================================================================
511# Escape Path Scanner Tests
512# =============================================================================
class TestEscapePathScanner:
    """Checks ``TenantEscapePathScanner`` detection and report rendering."""

    def test_scanner_detects_raw_select(self) -> None:
        """
        A raw SELECT without a tenant filter produces a ``raw_sql`` finding.
        """
        snippet = 'execute("SELECT * FROM users WHERE status = active")'
        findings = TenantEscapePathScanner().scan_query(snippet)

        assert len(findings) > 0
        categories = [finding.category for finding in findings]
        assert "raw_sql" in categories

    def test_scanner_detects_raw_update(self) -> None:
        """
        A raw UPDATE produces at least one CRITICAL finding.
        """
        snippet = 'execute("UPDATE users SET status = inactive WHERE id = 1")'
        findings = TenantEscapePathScanner().scan_query(snippet)

        assert any(
            finding.severity == EscapePathSeverity.CRITICAL for finding in findings
        )

    def test_scanner_detects_raw_delete(self) -> None:
        """
        A raw DELETE produces at least one CRITICAL finding.
        """
        snippet = 'execute("DELETE FROM users WHERE id = 1")'
        findings = TenantEscapePathScanner().scan_query(snippet)

        assert any(
            finding.severity == EscapePathSeverity.CRITICAL for finding in findings
        )

    def test_scanner_allows_safe_queries(self) -> None:
        """
        Snippets that carry a tenant filter or set tenant context produce no findings.
        """
        scanner = TenantEscapePathScanner()

        safe_queries = (
            "query.filter(tenant_id == ctx.tenant_id)",
            "query.where(Item.tenant_id == tenant_id)",
            "await set_tenant_context(session, tenant_id)",
            "BackgroundTaskTenantContext(tenant_id)",
        )

        for query in safe_queries:
            flagged = scanner.scan_query(query)
            assert len(flagged) == 0, f"Safe query flagged: {query}"

    def test_scanner_report_formats(self) -> None:
        """
        The same finding renders as text, JSON and markdown.
        """
        scanner = TenantEscapePathScanner()
        sample = EscapePathFinding(
            severity=EscapePathSeverity.CRITICAL,
            category="raw_sql",
            description="Test finding",
            location="test.py:10",
            remediation="Add tenant filter",
            compliance_impact=["SOC2 CC6.1"],
        )
        findings = [sample]

        # Plain text
        text_report = scanner.generate_report(findings, format="text")
        assert "CRITICAL" in text_report
        assert "raw_sql" in text_report

        # JSON
        json_report = scanner.generate_report(findings, format="json")
        assert '"severity": "critical"' in json_report

        # Markdown
        md_report = scanner.generate_report(findings, format="markdown")
        assert "## CRITICAL" in md_report

    def test_custom_patterns(self) -> None:
        """
        A caller-supplied dangerous pattern is reported under its own category.
        """
        custom_rule = (
            re.compile(r"my_custom_unsafe_function"),
            EscapePathSeverity.HIGH,
            "custom",
            "Custom unsafe function detected",
        )
        scanner = TenantEscapePathScanner(custom_dangerous_patterns=[custom_rule])

        snippet = "result = my_custom_unsafe_function(data)"
        findings = scanner.scan_query(snippet)

        assert len(findings) > 0
        categories = [finding.category for finding in findings]
        assert "custom" in categories
622# =============================================================================
623# CI/CD Integration Tests
624# =============================================================================
class TestCIIntegration:
    """Checks the exit code returned by ``ci_fail_on_findings``."""

    @staticmethod
    def _single_finding(
        severity: Any,
        category: str,
        description: str,
        remediation: str,
    ) -> List[Any]:
        """Return a one-element findings list located at ``test.py:1``."""
        return [
            EscapePathFinding(
                severity=severity,
                category=category,
                description=description,
                location="test.py:1",
                remediation=remediation,
            )
        ]

    def test_ci_passes_on_no_findings(self) -> None:
        """
        An empty findings list gives exit code 0.
        """
        assert ci_fail_on_findings([]) == 0

    def test_ci_fails_on_critical_findings(self) -> None:
        """
        A CRITICAL finding gives exit code 1.
        """
        findings = self._single_finding(
            EscapePathSeverity.CRITICAL, "raw_sql", "Critical issue", "Fix it"
        )

        assert ci_fail_on_findings(findings) == 1

    def test_ci_fails_on_high_findings(self) -> None:
        """
        A HIGH finding gives exit code 1 with the default settings.
        """
        findings = self._single_finding(
            EscapePathSeverity.HIGH, "pagination", "High issue", "Fix it"
        )

        assert ci_fail_on_findings(findings) == 1

    def test_ci_passes_on_medium_findings(self) -> None:
        """
        A MEDIUM finding gives exit code 0 with the default settings.
        """
        findings = self._single_finding(
            EscapePathSeverity.MEDIUM, "aggregation", "Medium issue", "Consider fixing"
        )

        assert ci_fail_on_findings(findings) == 0

    def test_ci_custom_fail_levels(self) -> None:
        """
        Passing ``fail_on`` changes which severities fail the build.
        """
        findings = self._single_finding(
            EscapePathSeverity.MEDIUM, "aggregation", "Medium issue", "Fix it"
        )

        # Defaults: MEDIUM does not fail.
        default_code = ci_fail_on_findings(findings)
        assert default_code == 0

        # Explicit fail_on including MEDIUM: now it fails.
        strict_code = ci_fail_on_findings(
            findings,
            fail_on={EscapePathSeverity.MEDIUM},
        )
        assert strict_code == 1
712# =============================================================================
713# Compliance Documentation Tests
714# =============================================================================
class TestComplianceDocumentation:
    """Checks the compliance documentation helpers."""

    def test_compliance_documentation_exists(self) -> None:
        """
        The generated documentation mentions each framework and key control.
        """
        docs = get_compliance_documentation()

        for expected in ("SOC2", "ISO27001", "NIST", "CC6.1", "AC-4"):
            assert expected in docs

    def test_compliance_mapping_complete(self) -> None:
        """
        ``COMPLIANCE_MAPPING`` has every framework and its key control.
        """
        from netrun.rbac.testing import COMPLIANCE_MAPPING

        key_controls = {
            "SOC2": "CC6.1",
            "ISO27001": "A.9.4",
            "NIST": "AC-4",
        }

        # All frameworks must be present before any control is looked up.
        for framework in key_controls:
            assert framework in COMPLIANCE_MAPPING

        for framework, control in key_controls.items():
            assert control in COMPLIANCE_MAPPING[framework]
748# =============================================================================
749# Edge Case Tests
750# =============================================================================
class TestEdgeCases:
    """Boundary and unusual-input scenarios."""

    @pytest.mark.asyncio
    async def test_empty_tenant_context_handling(self, mock_session: AsyncMock) -> None:
        """
        After clearing the context, the current tenant reads back as None.
        """
        async with TenantTestContext(mock_session) as ctx:
            await ctx.clear_tenant_context()

            active = await ctx.get_current_tenant()
            assert active is None

    @pytest.mark.asyncio
    async def test_unicode_tenant_ids(self, mock_session: AsyncMock) -> None:
        """
        A tenant id containing non-ASCII characters is stored unchanged.
        """
        unicode_tenant = "tenant-\u00e9\u00e8\u00ea"

        context = TenantTestContext(mock_session, tenant_a_id=unicode_tenant)
        async with context as ctx:
            assert ctx.tenant_a_id == unicode_tenant

    @pytest.mark.asyncio
    async def test_very_long_tenant_ids(self, mock_session: AsyncMock) -> None:
        """
        A 207-character tenant id is stored without truncation.
        """
        long_tenant = "tenant-" + "a" * 200

        context = TenantTestContext(mock_session, tenant_a_id=long_tenant)
        async with context as ctx:
            # Must come back at full length.
            assert ctx.tenant_a_id == long_tenant

    def test_query_with_subquery(self) -> None:
        """
        SQL whose only tenant filter sits in a subquery is accepted.
        """
        subquery_sql = """
            SELECT * FROM items WHERE id IN (
                SELECT item_id FROM orders WHERE tenant_id = 'abc'
            )
        """

        # Accepted because tenant_id appears in the statement text.
        assert_tenant_isolation_sync(subquery_sql)

    def test_query_with_join(self) -> None:
        """
        A JOIN query with a tenant filter in its WHERE clause is accepted.
        """
        join_sql = """
            SELECT i.*, u.name
            FROM items i
            JOIN users u ON i.user_id = u.id
            WHERE i.tenant_id = 'abc'
        """

        # No exception expected.
        assert_tenant_isolation_sync(join_sql)
822# =============================================================================
823# Integration Test Placeholder
824# =============================================================================
@pytest.mark.skip(reason="Requires actual PostgreSQL database with RLS")
class TestPostgreSQLIntegration:
    """
    Integration tests against real PostgreSQL with RLS.

    These tests require a PostgreSQL database with RLS policies configured.
    The whole class is disabled by the ``skip`` marker above; remove it once
    a database is available.

    Setup:
        1. Create test database with RLS-enabled tables
        2. Configure the TEST_DATABASE_URL environment variable
        3. Run: pytest -m integration --run-integration
    """

    @pytest.fixture
    async def db_session(self) -> AsyncGenerator[AsyncSession, None]:
        """
        Yield a session bound to a real database.

        The URL comes from ``TEST_DATABASE_URL`` and falls back to a
        placeholder local URL. The engine is disposed when the fixture
        finishes, so pooled connections do not outlive the test.
        """
        import os

        database_url = os.getenv(
            "TEST_DATABASE_URL",
            "postgresql+asyncpg://[USERNAME]:[PASSWORD]@localhost/test_db",
        )

        engine = create_async_engine(database_url)
        try:
            session_factory = async_sessionmaker(engine, class_=AsyncSession)
            async with session_factory() as session:
                yield session
        finally:
            # Release the connection pool even if the test body raised.
            await engine.dispose()

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_rls_blocks_cross_tenant_read(self, db_session: AsyncSession) -> None:
        """
        Test that actual PostgreSQL RLS blocks cross-tenant reads.
        """
        # Placeholder: this would use a real database with RLS policies.
        pass

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_rls_blocks_cross_tenant_write(self, db_session: AsyncSession) -> None:
        """
        Test that actual PostgreSQL RLS blocks cross-tenant writes.
        """
        # Placeholder: this would use a real database with RLS policies.
        pass
876# =============================================================================
877# Test Runner Configuration
878# =============================================================================
def pytest_collection_modifyitems(config: Any, items: List[Any]) -> None:
    """
    Attach extra markers to collected tests.

    Adds:
    - ``critical`` to every item that carries the ``tenant_isolation`` keyword
    - ``security`` to every item whose file path contains
      ``test_tenant_isolation``

    NOTE(review): pytest looks for hook implementations in ``conftest.py``
    files and registered plugins; confirm this module is loaded as one,
    otherwise this function is never invoked.

    Args:
        config: The pytest config object (unused here).
        items: Collected test items; marked in place.
    """
    for item in items:
        if "tenant_isolation" in item.keywords:
            item.add_marker(pytest.mark.critical)

        # Prefer ``path`` (pytest >= 7); fall back to the legacy ``fspath``.
        location = getattr(item, "path", None) or getattr(item, "fspath", None)
        if location and "test_tenant_isolation" in str(location):
            item.add_marker(pytest.mark.security)
if __name__ == "__main__":
    # Allow running tests directly. Exit with pytest's status code so that
    # a failing run is reported as a failure to the calling shell or CI job.
    raise SystemExit(pytest.main([__file__, "-v", "-x"]))