Coverage for test\test_form_parser.py: 99%

230 statements  

« prev     ^ index     » next       coverage.py v7.8.0, created at 2025-05-21 20:31 +0100

1import pytest 

2import asyncio 

3from typing import AsyncGenerator, Optional 

4from nexios import get_application, NexiosApp 

5from nexios.http import Request, Response 

6from nexios.structs import FormData, Headers, UploadedFile 

7from nexios.http.formparsers import FormParser, MultiPartParser, MultiPartException 

8from nexios.testing import Client 

9 

10# Set default limits for MultiPartParser if they don't exist 

11if not hasattr(MultiPartParser, "max_file_size"): 

12 MultiPartParser.max_file_size = 1024 * 1024 # 1MB 

13if not hasattr(MultiPartParser, "max_fields"): 

14 MultiPartParser.max_fields = 1000 

15if not hasattr(MultiPartParser, "max_files"): 

16 MultiPartParser.max_files = 1000 

17 

18# Create an application instance for testing 

19app = get_application() 

20 

21 

22# Define test endpoints for form submission 

23@app.post("/form") 

24async def handle_form(request: Request, response: Response): 

25 form = await request.form_data 

26 return response.json({"fields": dict(form.items())}) 

27 

28 

29@app.post("/upload") 

30async def handle_upload(request: Request, response: Response): 

31 form = await request.form_data 

32 files = {} 

33 fields = {} 

34 

35 for key, value in form.items(): 

36 if isinstance(value, UploadedFile): 

37 content = await value.read() 

38 files[key] = { 

39 "filename": value.filename, 

40 "content": content.decode("utf-8", errors="replace"), 

41 "size": value.size, 

42 "content_type": value.headers.get("content-type", ""), 

43 } 

44 else: 

45 fields[key] = value 

46 

47 return response.json({"files": files, "fields": fields}) 

48 

49 

50# Helper function to create a stream from bytes for testing parsers 

51async def create_form_stream(data: bytes) -> AsyncGenerator[bytes, None]: 

52 yield data 

53 

54 

55# Pytest fixture for client 

56@pytest.fixture 

57async def client(): 

58 async with Client(app) as client: 

59 yield client 

60 

61 

62# Tests for basic form data parsing 

63async def test_basic_form_parsing(): 

64 # Create simple form data 

65 form_data = b"name=John&age=30&email=john%40example.com" 

66 headers = Headers( 

67 [ 

68 (b"content-type", b"application/x-www-form-urlencoded"), 

69 (b"content-length", str(len(form_data)).encode()), 

70 ] 

71 ) 

72 

73 # Parse the form data 

74 parser = FormParser(headers, create_form_stream(form_data)) 

75 form = await parser.parse() 

76 

77 # Verify parsed data 

78 assert isinstance(form, FormData) 

79 assert dict(form.items()) == { 

80 "name": "John", 

81 "age": "30", 

82 "email": "john@example.com", 

83 } 

84 

85 

86async def test_empty_form(): 

87 form_data = b"" 

88 headers = Headers( 

89 [ 

90 (b"content-type", b"application/x-www-form-urlencoded"), 

91 (b"content-length", b"0"), 

92 ] 

93 ) 

94 

95 parser = FormParser(headers, create_form_stream(form_data)) 

96 form = await parser.parse() 

97 

98 assert isinstance(form, FormData) 

99 assert len(form) == 0 

100 

101 

102async def test_form_with_special_characters(): 

103 # Test URL-encoded special characters 

104 form_data = b"message=Hello+World%21&symbols=%24%26%2B%2C%3A%3B%3D%3F%40" 

105 headers = Headers( 

106 [ 

107 (b"content-type", b"application/x-www-form-urlencoded"), 

108 (b"content-length", str(len(form_data)).encode()), 

109 ] 

110 ) 

111 

112 parser = FormParser(headers, create_form_stream(form_data)) 

113 form = await parser.parse() 

114 

115 assert dict(form.items()) == {"message": "Hello World!", "symbols": "$&+,:;=?@"} 

116 

117 

118# Tests for multipart form data 

119async def test_multipart_text_fields(): 

120 boundary = b"boundary123" 

121 form_data = ( 

122 b"--" + boundary + b"\r\n" 

123 b'Content-Disposition: form-data; name="field1"\r\n\r\n' 

124 b"value1\r\n" 

125 b"--" + boundary + b"\r\n" 

126 b'Content-Disposition: form-data; name="field2"\r\n\r\n' 

127 b"value2\r\n" 

128 b"--" + boundary + b"--\r\n" 

129 ) 

130 

131 headers = Headers( 

132 [ 

133 (b"content-type", b"multipart/form-data; boundary=" + boundary), 

134 (b"content-length", str(len(form_data)).encode()), 

135 ] 

136 ) 

137 

138 parser = MultiPartParser(headers, create_form_stream(form_data)) 

139 form = await parser.parse() 

140 

141 assert dict(form.items()) == {"field1": "value1", "field2": "value2"} 

142 

143 

144async def test_multipart_file_upload(): 

145 boundary = b"boundary123" 

146 file_content = b"Hello, this is a test file content!" 

147 form_data = ( 

148 b"--" + boundary + b"\r\n" 

149 b'Content-Disposition: form-data; name="file1"; filename="test.txt"\r\n' 

150 b"Content-Type: text/plain\r\n\r\n" + file_content + b"\r\n" 

151 b"--" + boundary + b"--\r\n" 

152 ) 

153 

154 headers = Headers( 

155 [ 

156 (b"content-type", b"multipart/form-data; boundary=" + boundary), 

157 (b"content-length", str(len(form_data)).encode()), 

158 ] 

159 ) 

160 

161 parser = MultiPartParser(headers, create_form_stream(form_data)) 

162 form = await parser.parse() 

163 

164 assert len(form) == 1 

165 file = form.get("file1") 

166 assert isinstance(file, UploadedFile) 

167 assert file.filename == "test.txt" 

168 content = await file.read() 

169 assert content == file_content 

170 

171 

172async def test_multipart_mixed_content(): 

173 boundary = b"boundary123" 

174 file_content = b"Test file content" 

175 form_data = ( 

176 b"--" + boundary + b"\r\n" 

177 b'Content-Disposition: form-data; name="field1"\r\n\r\n' 

178 b"text_value\r\n" 

179 b"--" + boundary + b"\r\n" 

180 b'Content-Disposition: form-data; name="file1"; filename="test.txt"\r\n' 

181 b"Content-Type: text/plain\r\n\r\n" + file_content + b"\r\n" 

182 b"--" + boundary + b"--\r\n" 

183 ) 

184 

185 headers = Headers( 

186 [ 

187 (b"content-type", b"multipart/form-data; boundary=" + boundary), 

188 (b"content-length", str(len(form_data)).encode()), 

189 ] 

190 ) 

191 

192 parser = MultiPartParser(headers, create_form_stream(form_data)) 

193 form = await parser.parse() 

194 

195 assert len(form) == 2 

196 assert form.get("field1") == "text_value" 

197 file = form.get("file1") 

198 assert isinstance(file, UploadedFile) 

199 content = await file.read() 

200 assert content == file_content 

201 

202 

203async def test_multipart_multiple_files(): 

204 boundary = b"boundary123" 

205 file1_content = b"Content of file 1" 

206 file2_content = b"Content of file 2" 

207 form_data = ( 

208 b"--" + boundary + b"\r\n" 

209 b'Content-Disposition: form-data; name="file1"; filename="file1.txt"\r\n' 

210 b"Content-Type: text/plain\r\n\r\n" + file1_content + b"\r\n" 

211 b"--" + boundary + b"\r\n" 

212 b'Content-Disposition: form-data; name="file2"; filename="file2.txt"\r\n' 

213 b"Content-Type: text/plain\r\n\r\n" + file2_content + b"\r\n" 

214 b"--" + boundary + b"--\r\n" 

215 ) 

216 

217 headers = Headers( 

218 [ 

219 (b"content-type", b"multipart/form-data; boundary=" + boundary), 

220 (b"content-length", str(len(form_data)).encode()), 

221 ] 

222 ) 

223 

224 parser = MultiPartParser(headers, create_form_stream(form_data)) 

225 form = await parser.parse() 

226 

227 assert len(form) == 2 

228 file1 = form.get("file1") 

229 file2 = form.get("file2") 

230 

231 assert isinstance(file1, UploadedFile) 

232 assert isinstance(file2, UploadedFile) 

233 

234 assert file1.filename == "file1.txt" 

235 assert file2.filename == "file2.txt" 

236 

237 content1 = await file1.read() 

238 content2 = await file2.read() 

239 

240 assert content1 == file1_content 

241 assert content2 == file2_content 

242 

243 

244# Edge cases tests 

245async def test_max_file_size_limit(): 

246 boundary = b"boundary123" 

247 # Make file content slightly larger than the max file size 

248 # This assumes MultiPartParser.max_file_size is accessible and not too large for testing 

249 file_size = 1024 * 10 # Use a smaller size for testing 

250 original_max_size = MultiPartParser.max_file_size 

251 MultiPartParser.max_file_size = file_size 

252 

253 try: 

254 file_content = b"x" * (file_size + 100) # Exceed max file size 

255 form_data = ( 

256 b"--" + boundary + b"\r\n" 

257 b'Content-Disposition: form-data; name="file"; filename="large.txt"\r\n' 

258 b"Content-Type: text/plain\r\n\r\n" + file_content + b"\r\n" 

259 b"--" + boundary + b"--\r\n" 

260 ) 

261 

262 headers = Headers( 

263 [ 

264 (b"content-type", b"multipart/form-data; boundary=" + boundary), 

265 (b"content-length", str(len(form_data)).encode()), 

266 ] 

267 ) 

268 

269 parser = MultiPartParser(headers, create_form_stream(form_data)) 

270 

271 # This should raise an exception due to file size 

272 with pytest.raises(MultiPartException): 

273 await parser.parse() 

274 finally: 

275 # Restore original max file size 

276 MultiPartParser.max_file_size = original_max_size 

277 

278 

279async def test_max_field_count_limit(): 

280 # Test with more fields than max_fields 

281 max_fields = 5 

282 # Store original value 

283 original_max_fields = MultiPartParser.max_fields 

284 # Temporarily set max_fields to test value 

285 MultiPartParser.max_fields = max_fields 

286 

287 try: 

288 # Create a multipart form with more than max_fields 

289 boundary = b"boundary123" 

290 parts = [] 

291 for i in range(max_fields + 2): 

292 parts.append( 

293 b"--" 

294 + boundary 

295 + b"\r\n" 

296 + f'Content-Disposition: form-data; name="field{i}"\r\n\r\n'.encode() 

297 + f"value{i}\r\n".encode() 

298 ) 

299 parts.append(b"--" + boundary + b"--\r\n") 

300 

301 form_data = b"".join(parts) 

302 

303 headers = Headers( 

304 [ 

305 (b"content-type", b"multipart/form-data; boundary=" + boundary), 

306 (b"content-length", str(len(form_data)).encode()), 

307 ] 

308 ) 

309 

310 parser = MultiPartParser( 

311 headers, create_form_stream(form_data), max_fields=max_fields 

312 ) 

313 

314 # This should raise an exception due to too many fields 

315 with pytest.raises(MultiPartException): 

316 await parser.parse() 

317 finally: 

318 # Restore original max fields 

319 MultiPartParser.max_fields = original_max_fields 

320 

321 

322async def test_character_encoding_handling(): 

323 # Test with non-ASCII characters 

324 boundary = b"boundary123" 

325 unicode_text = "Hello 世界 こんにちは" # Unicode text 

326 form_data = ( 

327 b"--" + boundary + b"\r\n" 

328 b'Content-Disposition: form-data; name="text"\r\n' 

329 b"Content-Type: text/plain; charset=utf-8\r\n\r\n" 

330 + unicode_text.encode("utf-8") 

331 + b"\r\n" 

332 b"--" + boundary + b"--\r\n" 

333 ) 

334 

335 headers = Headers( 

336 [ 

337 ( 

338 b"content-type", 

339 b"multipart/form-data; boundary=" + boundary + b"; charset=utf-8", 

340 ), 

341 (b"content-length", str(len(form_data)).encode()), 

342 ] 

343 ) 

344 

345 parser = MultiPartParser(headers, create_form_stream(form_data)) 

346 form = await parser.parse() 

347 

348 assert form.get("text") == unicode_text 

349 

350 

351# Error handling tests 

352async def test_malformed_headers(): 

353 # Test with malformed content-disposition header 

354 boundary = b"boundary123" 

355 form_data = ( 

356 b"--" + boundary + b"\r\n" 

357 b"Invalid-Header: value\r\n\r\n" # Missing Content-Disposition 

358 b"some content\r\n" 

359 b"--" + boundary + b"--\r\n" 

360 ) 

361 

362 headers = Headers( 

363 [ 

364 (b"content-type", b"multipart/form-data; boundary=" + boundary), 

365 (b"content-length", str(len(form_data)).encode()), 

366 ] 

367 ) 

368 

369 parser = MultiPartParser(headers, create_form_stream(form_data)) 

370 

371 # Should raise an exception due to missing Content-Disposition header 

372 with pytest.raises(Exception): 

373 await parser.parse() 

374 

375 

376async def test_invalid_boundary(): 

377 # Test with incorrect boundary in form data 

378 boundary = b"correct_boundary" 

379 wrong_boundary = b"wrong_boundary" 

380 

381 form_data = ( 

382 b"--" + wrong_boundary + b"\r\n" # Using wrong boundary 

383 b'Content-Disposition: form-data; name="field1"\r\n\r\n' 

384 b"value1\r\n" 

385 b"--" + wrong_boundary + b"--\r\n" 

386 ) 

387 

388 headers = Headers( 

389 raw=[ 

390 ( 

391 b"content-type", 

392 b"multipart/form-data; boundary=" + boundary, 

393 ), # Different boundary 

394 (b"content-length", str(len(form_data)).encode()), 

395 ] 

396 ) 

397 

398 parser = MultiPartParser(headers, create_form_stream(form_data)) 

399 

400 # Should fail to parse properly 

401 with pytest.raises(Exception): 

402 await parser.parse() 

403 

404 

405# Integration tests with client 

406async def test_simple_form_submission(client): 

407 # Test basic form submission 

408 data = { 

409 "name": "John Doe", 

410 "email": "john@example.com", 

411 "message": "This is a test message", 

412 } 

413 

414 response = await client.post("/form", data=data) 

415 assert response.status_code == 200 

416 

417 result = response.json() 

418 assert result["fields"] == data 

419 

420 

421async def test_form_with_special_chars_integration(client): 

422 # Test form with special characters 

423 data = { 

424 "name": "User Name", 

425 "message": "Special characters: !@#$%^&*()_+", 

426 "unicode": "Hello 世界 こんにちは", 

427 } 

428 

429 response = await client.post("/form", data=data) 

430 assert response.status_code == 200 

431 

432 result = response.json() 

433 assert result["fields"] == data 

434 

435 

436async def test_single_file_upload(client): 

437 # Test uploading a single file with metadata 

438 file_content = "This is a test file content." 

439 files = {"document": ("test.txt", file_content, "text/plain")} 

440 data = {"description": "Test file upload", "category": "documentation"} 

441 

442 response = await client.post("/upload", files=files, data=data) 

443 assert response.status_code == 200 

444 

445 result = response.json() 

446 assert "files" in result 

447 assert "fields" in result 

448 

449 # Check file data 

450 assert "document" in result["files"] 

451 uploaded_file = result["files"]["document"] 

452 assert uploaded_file["filename"] == "test.txt" 

453 assert uploaded_file["content"] == file_content 

454 assert uploaded_file["content_type"] == "text/plain" 

455 

456 # Check form fields 

457 assert result["fields"]["description"] == "Test file upload" 

458 assert result["fields"]["category"] == "documentation" 

459 

460 

461async def test_multiple_file_uploads(client): 

462 # Test uploading multiple files of different types 

463 files = { 

464 "text_file": ("document.txt", "Text file content", "text/plain"), 

465 "json_file": ("data.json", '{"key": "value"}', "application/json"), 

466 "csv_file": ("data.csv", "name,age\nJohn,30\nJane,25", "text/csv"), 

467 } 

468 data = {"description": "Multiple files upload test"} 

469 

470 response = await client.post("/upload", files=files, data=data) 

471 assert response.status_code == 200 

472 

473 result = response.json() 

474 assert len(result["files"]) == 3 

475 

476 # Check text file 

477 assert "text_file" in result["files"] 

478 text_file = result["files"]["text_file"] 

479 assert text_file["filename"] == "document.txt" 

480 assert text_file["content"] == "Text file content" 

481 assert text_file["content_type"] == "text/plain" 

482 

483 # Check JSON file 

484 assert "json_file" in result["files"] 

485 json_file = result["files"]["json_file"] 

486 assert json_file["filename"] == "data.json" 

487 assert json_file["content"] == '{"key": "value"}' 

488 assert json_file["content_type"] == "application/json" 

489 

490 # Check CSV file 

491 assert "csv_file" in result["files"] 

492 csv_file = result["files"]["csv_file"] 

493 assert csv_file["filename"] == "data.csv" 

494 assert csv_file["content"] == "name,age\nJohn,30\nJane,25" 

495 assert csv_file["content_type"] == "text/csv" 

496 

497 # Check form field 

498 assert result["fields"]["description"] == "Multiple files upload test" 

499 

500 

501async def test_unicode_filename_and_content(client): 

502 # Test handling of Unicode in filenames and content 

503 filename = "测试文件.txt" 

504 content = "Unicode content: こんにちは世界 - Hello World!" 

505 

506 files = {"unicode_file": (filename, content, "text/plain; charset=utf-8")} 

507 

508 response = await client.post("/upload", files=files) 

509 assert response.status_code == 200 

510 

511 result = response.json() 

512 assert "unicode_file" in result["files"] 

513 uploaded_file = result["files"]["unicode_file"] 

514 assert uploaded_file["filename"] == filename 

515 assert uploaded_file["content"] == content 

516 assert "text/plain" in uploaded_file["content_type"] 

517 

518 

519async def test_binary_file_upload(client): 

520 # Test binary file upload (using simplified binary content for test) 

521 # In a real scenario, this would be actual binary data 

522 binary_content = b"Binary content with \x00\x01\x02\x03 bytes" 

523 

524 files = {"binary_file": ("data.bin", binary_content, "application/octet-stream")} 

525 

526 response = await client.post("/upload", files=files) 

527 assert response.status_code == 200 

528 

529 result = response.json() 

530 assert "binary_file" in result["files"] 

531 # Note: The test expects the binary content to be decoded as utf-8 with replacement 

532 # characters for invalid sequences - this is just for the test's sake 

533 # In a real app, you might want to handle binary data differently