Coverage for netrun / rbac / policies.py: 87%

63 statements  

« prev     ^ index     » next       coverage.py v7.12.0, created at 2025-12-18 22:20 +0000

1""" 

2PostgreSQL Row-Level Security (RLS) Policy Generator 

3 

4Extracted from: Intirkast migrations/versions/001_initial_schema_creation.py 

5Generates SQL statements for creating RLS policies on multi-tenant tables 

6 

7RLS Security Model: 

8- ENABLE ROW LEVEL SECURITY: Activates RLS on table 

9- tenant_isolation_policy: Enforces tenant isolation using session variable 

10- bypass_rls_policy (optional): Allows superuser/admin bypass for migrations 

11 

12PostgreSQL Session Variables: 

13- app.current_tenant_id: Set by middleware for each request 

14- app.current_user_id: Set for audit logging 

15 

16Example RLS Policy: 

17 CREATE POLICY tenant_isolation_policy ON users 

18 FOR ALL 

19 USING (tenant_id = NULLIF(current_setting('app.current_tenant_id', true), '')::UUID); 

20""" 

21 

22import logging 

23from typing import List, Optional 

24 

25logger = logging.getLogger(__name__) 

26 

27 

class RLSPolicyGenerator:
    """
    Generator for PostgreSQL Row-Level Security (RLS) SQL and Alembic snippets.

    Extracted from: Intirkast migration patterns (20+ tables with RLS)

    Every method is a pure string builder; nothing is executed against a
    database. Table, column, policy and session-variable names are
    interpolated into the SQL verbatim (no quoting, escaping or validation),
    so they must come from trusted, developer-controlled input such as
    migration code.

    NOTE(review): per the PostgreSQL documentation, table owners are not
    subject to RLS unless FORCE ROW LEVEL SECURITY is also set. This class
    only emits ENABLE / DISABLE -- confirm the application role does not own
    the protected tables.
    """

    @staticmethod
    def _tenant_policy(
        table_name: str,
        policy_name: str,
        command: str,
        clause: str,
        tenant_column: str,
        session_variable: str,
    ) -> str:
        """Build one CREATE POLICY statement comparing a column to the session tenant.

        Args:
            table_name: Table the policy is attached to
            policy_name: Name of the policy
            command: Command the policy applies to ("ALL", "SELECT", "INSERT")
            clause: Predicate keyword ("USING" or "WITH CHECK")
            tenant_column: Column holding the tenant id
            session_variable: Session variable holding the current tenant id

        Returns:
            Three-line SQL CREATE POLICY statement terminated by ";"
        """
        return (
            f"CREATE POLICY {policy_name} ON {table_name}\n"
            f"    FOR {command}\n"
            f"    {clause} ({tenant_column} = "
            f"NULLIF(current_setting('{session_variable}', true), '')::UUID);"
        )

    @staticmethod
    def _op_execute_block(sql: str) -> List[str]:
        """Return generated source lines wrapping *sql* in a triple-quoted op.execute call."""
        # The SQL itself is emitted at column 0: it sits inside a string
        # literal, so its indentation does not affect the generated function.
        return ['    op.execute("""', sql, '    """)']

    @staticmethod
    def enable_rls(table_name: str) -> str:
        """
        Generate SQL to enable RLS on a table

        Args:
            table_name: Table name to enable RLS

        Returns:
            SQL statement

        Example:
            ALTER TABLE users ENABLE ROW LEVEL SECURITY;
        """
        return f"ALTER TABLE {table_name} ENABLE ROW LEVEL SECURITY;"

    @staticmethod
    def disable_rls(table_name: str) -> str:
        """
        Generate SQL to disable RLS on a table

        Args:
            table_name: Table name to disable RLS

        Returns:
            SQL statement

        Example:
            ALTER TABLE users DISABLE ROW LEVEL SECURITY;
        """
        return f"ALTER TABLE {table_name} DISABLE ROW LEVEL SECURITY;"

    @staticmethod
    def create_tenant_isolation_policy(
        table_name: str,
        tenant_column: str = "tenant_id",
        session_variable: str = "app.current_tenant_id",
        policy_name: str = "tenant_isolation_policy",
    ) -> str:
        """
        Generate SQL for tenant isolation policy (FOR ALL operations)

        Extracted from: Intirkast migration pattern (used on 19 tables)

        Args:
            table_name: Table name to apply policy
            tenant_column: Column containing tenant_id (default: tenant_id)
            session_variable: PostgreSQL session variable (default: app.current_tenant_id)
            policy_name: Policy name (default: tenant_isolation_policy)

        Returns:
            SQL CREATE POLICY statement

        Example Output:
            CREATE POLICY tenant_isolation_policy ON users
                FOR ALL
                USING (tenant_id = NULLIF(current_setting('app.current_tenant_id', true), '')::UUID);

        Policy Breakdown:
            - FOR ALL: Applies to SELECT, INSERT, UPDATE, DELETE
            - USING: Filter condition for SELECT/UPDATE/DELETE
            - WITH CHECK: Not emitted here; PostgreSQL reuses the USING
              expression for INSERT/UPDATE validation when it is omitted
            - NULLIF(..., ''): Converts empty string to NULL (handles missing variable)
            - current_setting('...', true): true = no error if variable missing
            - ::UUID: Casts string to UUID type
        """
        return RLSPolicyGenerator._tenant_policy(
            table_name, policy_name, "ALL", "USING", tenant_column, session_variable
        )

    @staticmethod
    def create_read_only_policy(
        table_name: str,
        tenant_column: str = "tenant_id",
        session_variable: str = "app.current_tenant_id",
        policy_name: str = "tenant_read_policy",
    ) -> str:
        """
        Generate SQL for read-only tenant policy (FOR SELECT only)

        Extracted from: Intirkast audit_logs pattern (read-only tenant access)

        Args:
            table_name: Table name to apply policy
            tenant_column: Column containing tenant_id
            session_variable: PostgreSQL session variable
            policy_name: Policy name

        Returns:
            SQL CREATE POLICY statement

        Example Output:
            CREATE POLICY tenant_read_policy ON audit_logs
                FOR SELECT
                USING (tenant_id = NULLIF(current_setting('app.current_tenant_id', true), '')::UUID);

        Use Case:
            - Audit logs: Tenants can read their logs but cannot modify/delete
            - Immutable data: Prevent UPDATE/DELETE while allowing SELECT
        """
        return RLSPolicyGenerator._tenant_policy(
            table_name, policy_name, "SELECT", "USING", tenant_column, session_variable
        )

    @staticmethod
    def create_insert_only_policy(
        table_name: str,
        tenant_column: str = "tenant_id",
        session_variable: str = "app.current_tenant_id",
        policy_name: str = "tenant_insert_policy",
    ) -> str:
        """
        Generate SQL for insert-only tenant policy

        Args:
            table_name: Table name to apply policy
            tenant_column: Column containing tenant_id
            session_variable: PostgreSQL session variable
            policy_name: Policy name

        Returns:
            SQL CREATE POLICY statement

        Example Output:
            CREATE POLICY tenant_insert_policy ON audit_logs
                FOR INSERT
                WITH CHECK (tenant_id = NULLIF(current_setting('app.current_tenant_id', true), '')::UUID);

        Use Case:
            - Audit logs: Allow INSERT but prevent modification
            - Append-only tables: Prevent UPDATE/DELETE
        """
        return RLSPolicyGenerator._tenant_policy(
            table_name, policy_name, "INSERT", "WITH CHECK", tenant_column, session_variable
        )

    @staticmethod
    def drop_policy(table_name: str, policy_name: str) -> str:
        """
        Generate SQL to drop a policy

        Args:
            table_name: Table name
            policy_name: Policy name to drop

        Returns:
            SQL DROP POLICY statement

        Example:
            DROP POLICY IF EXISTS tenant_isolation_policy ON users;
        """
        return f"DROP POLICY IF EXISTS {policy_name} ON {table_name};"

    @staticmethod
    def generate_rls_for_table(
        table_name: str,
        tenant_column: str = "tenant_id",
        session_variable: str = "app.current_tenant_id",
        read_only: bool = False,
    ) -> List[str]:
        """
        Generate complete RLS setup for a table

        Args:
            table_name: Table name to protect
            tenant_column: Column containing tenant_id
            session_variable: PostgreSQL session variable
            read_only: If True, create a SELECT policy and an INSERT policy
                instead of the FOR ALL policy, so no policy covers
                UPDATE/DELETE (for audit tables)

        Returns:
            List of SQL statements to execute; the ENABLE statement is always
            first, followed by the policy statement(s)

        Example:
            statements = RLSPolicyGenerator.generate_rls_for_table("users")
            for stmt in statements:
                await session.execute(text(stmt))

        Output (for normal table):
            [
                "ALTER TABLE users ENABLE ROW LEVEL SECURITY;",
                "CREATE POLICY tenant_isolation_policy ON users FOR ALL USING (...);"
            ]

        Output (for read-only table):
            [
                "ALTER TABLE audit_logs ENABLE ROW LEVEL SECURITY;",
                "CREATE POLICY tenant_read_policy ON audit_logs FOR SELECT USING (...);",
                "CREATE POLICY tenant_insert_policy ON audit_logs FOR INSERT WITH CHECK (...);"
            ]
        """
        gen = RLSPolicyGenerator
        if read_only:
            # Read-only: Allow SELECT and INSERT, prevent UPDATE/DELETE
            policy_builders = (gen.create_read_only_policy, gen.create_insert_only_policy)
        else:
            # Normal: Allow all operations (SELECT, INSERT, UPDATE, DELETE)
            policy_builders = (gen.create_tenant_isolation_policy,)

        statements = [gen.enable_rls(table_name)]
        statements.extend(
            build(table_name, tenant_column, session_variable)
            for build in policy_builders
        )
        return statements

    @staticmethod
    def generate_migration_up(
        tables: List[str],
        tenant_column: str = "tenant_id",
        session_variable: str = "app.current_tenant_id",
        read_only_tables: Optional[List[str]] = None,
    ) -> str:
        """
        Generate complete Alembic migration upgrade function

        Args:
            tables: List of table names to protect with RLS
            tenant_column: Column containing tenant_id
            session_variable: PostgreSQL session variable
            read_only_tables: Tables from `tables` that should get the
                read-only policy pair (e.g., audit_logs). Only `tables` is
                iterated, so a name listed here but missing from `tables`
                produces no output.

        Returns:
            Source code of an `upgrade()` function as a string. When `tables`
            is empty the body is a single `pass` so the result is still
            valid Python.

        Example:
            migration = RLSPolicyGenerator.generate_migration_up(
                tables=["users", "posts", "comments", "audit_logs"],
                read_only_tables=["audit_logs"]
            )
            print(migration)

        Output:
            def upgrade() -> None:
                # Enable RLS on users
                op.execute("ALTER TABLE users ENABLE ROW LEVEL SECURITY;")
                op.execute(\"\"\"
            CREATE POLICY tenant_isolation_policy ON users
                FOR ALL
                USING (tenant_id = NULLIF(current_setting('app.current_tenant_id', true), '')::UUID);
                \"\"\")
                ...
        """
        read_only_tables = read_only_tables or []
        gen = RLSPolicyGenerator

        migration_code = ["def upgrade() -> None:"]
        if not tables:
            # A header with no body is a SyntaxError once pasted into a migration.
            migration_code.append("    pass")

        for table in tables:
            # Reuse the per-table builder so both entry points always agree
            # on which policies a table receives.
            enable_sql, *policy_sqls = gen.generate_rls_for_table(
                table,
                tenant_column,
                session_variable,
                read_only=table in read_only_tables,
            )

            migration_code.append(f"    # Enable RLS on {table}")
            migration_code.append(f'    op.execute("{enable_sql}")')
            for policy_sql in policy_sqls:
                migration_code.extend(gen._op_execute_block(policy_sql))
            migration_code.append("")  # Blank line between tables

        return "\n".join(migration_code)

    @staticmethod
    def generate_migration_down(
        tables: List[str],
        read_only_tables: Optional[List[str]] = None,
    ) -> str:
        """
        Generate complete Alembic migration downgrade function

        Policies are dropped by their default names (tenant_read_policy,
        tenant_insert_policy, tenant_isolation_policy), i.e. the names the
        create_* methods use when no `policy_name` is given.

        Args:
            tables: List of table names
            read_only_tables: Tables from `tables` that carry the read-only
                policy pair. Only `tables` is iterated, so a name listed here
                but missing from `tables` produces no output.

        Returns:
            Source code of a `downgrade()` function as a string. When
            `tables` is empty the body is a single `pass` so the result is
            still valid Python.

        Example:
            migration = RLSPolicyGenerator.generate_migration_down(
                tables=["users", "posts", "audit_logs"],
                read_only_tables=["audit_logs"]
            )
        """
        read_only_tables = read_only_tables or []
        gen = RLSPolicyGenerator

        migration_code = ["def downgrade() -> None:"]
        if not tables:
            # A header with no body is a SyntaxError once pasted into a migration.
            migration_code.append("    pass")

        for table in tables:
            # Drop policies
            if table in read_only_tables:
                policy_names = ("tenant_read_policy", "tenant_insert_policy")
            else:
                policy_names = ("tenant_isolation_policy",)
            migration_code.extend(
                f'    op.execute("{gen.drop_policy(table, name)}")'
                for name in policy_names
            )

            # Disable RLS
            migration_code.append(f'    op.execute("{gen.disable_rls(table)}")')
            migration_code.append("")  # Blank line

        return "\n".join(migration_code)