Coverage for netrun / rbac / policies.py: 87%
63 statements
« prev ^ index » next coverage.py v7.12.0, created at 2025-12-18 22:20 +0000
« prev ^ index » next coverage.py v7.12.0, created at 2025-12-18 22:20 +0000
1"""
2PostgreSQL Row-Level Security (RLS) Policy Generator
4Extracted from: Intirkast migrations/versions/001_initial_schema_creation.py
5Generates SQL statements for creating RLS policies on multi-tenant tables
7RLS Security Model:
8- ENABLE ROW LEVEL SECURITY: Activates RLS on table
9- tenant_isolation_policy: Enforces tenant isolation using session variable
10- bypass_rls_policy (optional): Allows superuser/admin bypass for migrations
12PostgreSQL Session Variables:
13- app.current_tenant_id: Set by middleware for each request
14- app.current_user_id: Set for audit logging
16Example RLS Policy:
17 CREATE POLICY tenant_isolation_policy ON users
18 FOR ALL
19 USING (tenant_id = NULLIF(current_setting('app.current_tenant_id', true), '')::UUID);
20"""
22import logging
23from typing import List, Optional
25logger = logging.getLogger(__name__)
28class RLSPolicyGenerator:
29 """
30 Generator for PostgreSQL Row-Level Security policies
32 Extracted from: Intirkast migration patterns (20+ tables with RLS)
33 """
35 @staticmethod
36 def enable_rls(table_name: str) -> str:
37 """
38 Generate SQL to enable RLS on a table
40 Args:
41 table_name: Table name to enable RLS
43 Returns:
44 SQL statement
46 Example:
47 ALTER TABLE users ENABLE ROW LEVEL SECURITY;
48 """
49 return f"ALTER TABLE {table_name} ENABLE ROW LEVEL SECURITY;"
51 @staticmethod
52 def disable_rls(table_name: str) -> str:
53 """
54 Generate SQL to disable RLS on a table
56 Args:
57 table_name: Table name to disable RLS
59 Returns:
60 SQL statement
62 Example:
63 ALTER TABLE users DISABLE ROW LEVEL SECURITY;
64 """
65 return f"ALTER TABLE {table_name} DISABLE ROW LEVEL SECURITY;"
67 @staticmethod
68 def create_tenant_isolation_policy(
69 table_name: str,
70 tenant_column: str = "tenant_id",
71 session_variable: str = "app.current_tenant_id",
72 policy_name: str = "tenant_isolation_policy",
73 ) -> str:
74 """
75 Generate SQL for tenant isolation policy (FOR ALL operations)
77 Extracted from: Intirkast migration pattern (used on 19 tables)
79 Args:
80 table_name: Table name to apply policy
81 tenant_column: Column containing tenant_id (default: tenant_id)
82 session_variable: PostgreSQL session variable (default: app.current_tenant_id)
83 policy_name: Policy name (default: tenant_isolation_policy)
85 Returns:
86 SQL CREATE POLICY statement
88 Example Output:
89 CREATE POLICY tenant_isolation_policy ON users
90 FOR ALL
91 USING (tenant_id = NULLIF(current_setting('app.current_tenant_id', true), '')::UUID);
93 Policy Breakdown:
94 - FOR ALL: Applies to SELECT, INSERT, UPDATE, DELETE
95 - USING: Filter condition for SELECT/UPDATE/DELETE
96 - WITH CHECK: Validation for INSERT/UPDATE (same as USING if not specified)
97 - NULLIF(..., ''): Converts empty string to NULL (handles missing variable)
98 - current_setting('...', true): true = no error if variable missing
99 - ::UUID: Casts string to UUID type
101 Security:
102 - Enforces tenant isolation at database level
103 - Cannot be bypassed by application code
104 - Prevents SQL injection attacks from crossing tenant boundaries
105 """
106 return f"""CREATE POLICY {policy_name} ON {table_name}
107 FOR ALL
108 USING ({tenant_column} = NULLIF(current_setting('{session_variable}', true), '')::UUID);"""
110 @staticmethod
111 def create_read_only_policy(
112 table_name: str,
113 tenant_column: str = "tenant_id",
114 session_variable: str = "app.current_tenant_id",
115 policy_name: str = "tenant_read_policy",
116 ) -> str:
117 """
118 Generate SQL for read-only tenant policy (FOR SELECT only)
120 Extracted from: Intirkast audit_logs pattern (read-only tenant access)
122 Args:
123 table_name: Table name to apply policy
124 tenant_column: Column containing tenant_id
125 session_variable: PostgreSQL session variable
126 policy_name: Policy name
128 Returns:
129 SQL CREATE POLICY statement
131 Example Output:
132 CREATE POLICY tenant_read_policy ON audit_logs
133 FOR SELECT
134 USING (tenant_id = NULLIF(current_setting('app.current_tenant_id', true), '')::UUID);
136 Use Case:
137 - Audit logs: Tenants can read their logs but cannot modify/delete
138 - Immutable data: Prevent UPDATE/DELETE while allowing SELECT
139 """
140 return f"""CREATE POLICY {policy_name} ON {table_name}
141 FOR SELECT
142 USING ({tenant_column} = NULLIF(current_setting('{session_variable}', true), '')::UUID);"""
144 @staticmethod
145 def create_insert_only_policy(
146 table_name: str,
147 tenant_column: str = "tenant_id",
148 session_variable: str = "app.current_tenant_id",
149 policy_name: str = "tenant_insert_policy",
150 ) -> str:
151 """
152 Generate SQL for insert-only tenant policy
154 Args:
155 table_name: Table name to apply policy
156 tenant_column: Column containing tenant_id
157 session_variable: PostgreSQL session variable
158 policy_name: Policy name
160 Returns:
161 SQL CREATE POLICY statement
163 Example Output:
164 CREATE POLICY tenant_insert_policy ON audit_logs
165 FOR INSERT
166 WITH CHECK (tenant_id = NULLIF(current_setting('app.current_tenant_id', true), '')::UUID);
168 Use Case:
169 - Audit logs: Allow INSERT but prevent modification
170 - Append-only tables: Prevent UPDATE/DELETE
171 """
172 return f"""CREATE POLICY {policy_name} ON {table_name}
173 FOR INSERT
174 WITH CHECK ({tenant_column} = NULLIF(current_setting('{session_variable}', true), '')::UUID);"""
176 @staticmethod
177 def drop_policy(table_name: str, policy_name: str) -> str:
178 """
179 Generate SQL to drop a policy
181 Args:
182 table_name: Table name
183 policy_name: Policy name to drop
185 Returns:
186 SQL DROP POLICY statement
188 Example:
189 DROP POLICY IF EXISTS tenant_isolation_policy ON users;
190 """
191 return f"DROP POLICY IF EXISTS {policy_name} ON {table_name};"
193 @staticmethod
194 def generate_rls_for_table(
195 table_name: str,
196 tenant_column: str = "tenant_id",
197 session_variable: str = "app.current_tenant_id",
198 read_only: bool = False,
199 ) -> List[str]:
200 """
201 Generate complete RLS setup for a table
203 Args:
204 table_name: Table name to protect
205 tenant_column: Column containing tenant_id
206 session_variable: PostgreSQL session variable
207 read_only: If True, only allow SELECT (for audit tables)
209 Returns:
210 List of SQL statements to execute
212 Example:
213 statements = RLSPolicyGenerator.generate_rls_for_table("users")
214 for stmt in statements:
215 await session.execute(text(stmt))
217 Output (for normal table):
218 [
219 "ALTER TABLE users ENABLE ROW LEVEL SECURITY;",
220 "CREATE POLICY tenant_isolation_policy ON users FOR ALL USING (...);"
221 ]
223 Output (for read-only table):
224 [
225 "ALTER TABLE audit_logs ENABLE ROW LEVEL SECURITY;",
226 "CREATE POLICY tenant_read_policy ON audit_logs FOR SELECT USING (...);",
227 "CREATE POLICY tenant_insert_policy ON audit_logs FOR INSERT WITH CHECK (...);"
228 ]
229 """
230 statements = [RLSPolicyGenerator.enable_rls(table_name)]
232 if read_only:
233 # Read-only: Allow SELECT and INSERT, prevent UPDATE/DELETE
234 statements.append(
235 RLSPolicyGenerator.create_read_only_policy(
236 table_name, tenant_column, session_variable
237 )
238 )
239 statements.append(
240 RLSPolicyGenerator.create_insert_only_policy(
241 table_name, tenant_column, session_variable
242 )
243 )
244 else:
245 # Normal: Allow all operations (SELECT, INSERT, UPDATE, DELETE)
246 statements.append(
247 RLSPolicyGenerator.create_tenant_isolation_policy(
248 table_name, tenant_column, session_variable
249 )
250 )
252 return statements
254 @staticmethod
255 def generate_migration_up(
256 tables: List[str],
257 tenant_column: str = "tenant_id",
258 session_variable: str = "app.current_tenant_id",
259 read_only_tables: Optional[List[str]] = None,
260 ) -> str:
261 """
262 Generate complete Alembic migration upgrade function
264 Args:
265 tables: List of table names to protect with RLS
266 tenant_column: Column containing tenant_id
267 session_variable: PostgreSQL session variable
268 read_only_tables: List of tables that should be read-only (e.g., audit_logs)
270 Returns:
271 Complete migration code
273 Example:
274 migration = RLSPolicyGenerator.generate_migration_up(
275 tables=["users", "posts", "comments"],
276 read_only_tables=["audit_logs"]
277 )
278 print(migration)
280 Output:
281 def upgrade() -> None:
282 # Enable RLS on users
283 op.execute("ALTER TABLE users ENABLE ROW LEVEL SECURITY")
284 op.execute(\"\"\"
285 CREATE POLICY tenant_isolation_policy ON users
286 FOR ALL
287 USING (tenant_id = NULLIF(current_setting('app.current_tenant_id', true), '')::UUID)
288 \"\"\")
289 ...
290 """
291 read_only_tables = read_only_tables or []
293 migration_code = ['def upgrade() -> None:']
295 for table in tables:
296 is_read_only = table in read_only_tables
298 # Enable RLS
299 migration_code.append(f' # Enable RLS on {table}')
300 migration_code.append(f' op.execute("{RLSPolicyGenerator.enable_rls(table)}")')
302 # Create policies
303 if is_read_only:
304 # Read-only table (e.g., audit logs)
305 migration_code.append(f' op.execute("""')
306 migration_code.append(
307 f'{RLSPolicyGenerator.create_read_only_policy(table, tenant_column, session_variable)}'
308 )
309 migration_code.append(f' """)')
310 migration_code.append(f' op.execute("""')
311 migration_code.append(
312 f'{RLSPolicyGenerator.create_insert_only_policy(table, tenant_column, session_variable)}'
313 )
314 migration_code.append(f' """)')
315 else:
316 # Normal table
317 migration_code.append(f' op.execute("""')
318 migration_code.append(
319 f'{RLSPolicyGenerator.create_tenant_isolation_policy(table, tenant_column, session_variable)}'
320 )
321 migration_code.append(f' """)')
323 migration_code.append('') # Blank line between tables
325 return '\n'.join(migration_code)
327 @staticmethod
328 def generate_migration_down(
329 tables: List[str],
330 read_only_tables: Optional[List[str]] = None,
331 ) -> str:
332 """
333 Generate complete Alembic migration downgrade function
335 Args:
336 tables: List of table names
337 read_only_tables: List of tables with read-only policies
339 Returns:
340 Complete downgrade migration code
342 Example:
343 migration = RLSPolicyGenerator.generate_migration_down(
344 tables=["users", "posts"],
345 read_only_tables=["audit_logs"]
346 )
347 """
348 read_only_tables = read_only_tables or []
350 migration_code = ['def downgrade() -> None:']
352 for table in tables:
353 is_read_only = table in read_only_tables
355 # Drop policies
356 if is_read_only:
357 migration_code.append(
358 f' op.execute("{RLSPolicyGenerator.drop_policy(table, "tenant_read_policy")}")'
359 )
360 migration_code.append(
361 f' op.execute("{RLSPolicyGenerator.drop_policy(table, "tenant_insert_policy")}")'
362 )
363 else:
364 migration_code.append(
365 f' op.execute("{RLSPolicyGenerator.drop_policy(table, "tenant_isolation_policy")}")'
366 )
368 # Disable RLS
369 migration_code.append(f' op.execute("{RLSPolicyGenerator.disable_rls(table)}")')
370 migration_code.append('') # Blank line
372 return '\n'.join(migration_code)