Coverage for test\test_form_parser.py: 99%
230 statements
« prev ^ index » next coverage.py v7.8.0, created at 2025-05-21 20:31 +0100
1import pytest
2import asyncio
3from typing import AsyncGenerator, Optional
4from nexios import get_application, NexiosApp
5from nexios.http import Request, Response
6from nexios.structs import FormData, Headers, UploadedFile
7from nexios.http.formparsers import FormParser, MultiPartParser, MultiPartException
8from nexios.testing import Client
# Set default limits for MultiPartParser if they don't exist.
# Each entry is (attribute name, fallback value); an attribute that is
# already present on the class is left untouched.
for _limit_name, _fallback in (
    ("max_file_size", 1024 * 1024),  # 1MB
    ("max_fields", 1000),
    ("max_files", 1000),
):
    if not hasattr(MultiPartParser, _limit_name):
        setattr(MultiPartParser, _limit_name, _fallback)
del _limit_name, _fallback  # do not leave loop variables in the module namespace

# Create an application instance for testing
app = get_application()
22# Define test endpoints for form submission
@app.post("/form")
async def handle_form(request: Request, response: Response):
    """Echo the submitted form back as JSON under the ``"fields"`` key."""
    submitted = await request.form_data
    payload = {"fields": dict(submitted.items())}
    return response.json(payload)
@app.post("/upload")
async def handle_upload(request: Request, response: Response):
    """Return the submitted form as JSON, split into ``"files"`` and ``"fields"``.

    Every ``UploadedFile`` entry is read and reported with its filename,
    decoded content, size and content type; every other entry is passed
    through unchanged as a plain field.
    """
    submitted = await request.form_data
    uploads = {}
    plain_fields = {}

    for name, entry in submitted.items():
        if not isinstance(entry, UploadedFile):
            plain_fields[name] = entry
            continue

        raw = await entry.read()
        uploads[name] = dict(
            filename=entry.filename,
            # Undecodable bytes become U+FFFD so the reply is always valid JSON text.
            content=raw.decode("utf-8", errors="replace"),
            size=entry.size,
            content_type=entry.headers.get("content-type", ""),
        )

    return response.json({"files": uploads, "fields": plain_fields})
50# Helper function to create a stream from bytes for testing parsers
async def create_form_stream(data: bytes) -> AsyncGenerator[bytes, None]:
    """Expose *data* as an async byte stream that yields it in one chunk."""
    for chunk in (data,):
        yield chunk
55# Pytest fixture for client
@pytest.fixture
async def client():
    """Yield a test ``Client`` bound to ``app``, open for the duration of the test."""
    async with Client(app) as test_client:
        yield test_client
62# Tests for basic form data parsing
async def test_basic_form_parsing():
    """A URL-encoded body parses into FormData with ``%40`` decoded to ``@``."""
    body = b"name=John&age=30&email=john%40example.com"
    content_length = str(len(body)).encode()
    request_headers = Headers(
        [
            (b"content-type", b"application/x-www-form-urlencoded"),
            (b"content-length", content_length),
        ]
    )

    parsed = await FormParser(request_headers, create_form_stream(body)).parse()

    assert isinstance(parsed, FormData)
    expected = {"name": "John", "age": "30", "email": "john@example.com"}
    assert dict(parsed.items()) == expected
async def test_empty_form():
    """An empty URL-encoded body parses into an empty FormData."""
    body = b""
    request_headers = Headers(
        [
            (b"content-type", b"application/x-www-form-urlencoded"),
            (b"content-length", b"0"),
        ]
    )

    parsed = await FormParser(request_headers, create_form_stream(body)).parse()

    assert isinstance(parsed, FormData)
    assert len(parsed) == 0
async def test_form_with_special_characters():
    """``+`` and percent-escapes in a URL-encoded body are decoded to their characters."""
    body = b"message=Hello+World%21&symbols=%24%26%2B%2C%3A%3B%3D%3F%40"
    content_length = str(len(body)).encode()
    request_headers = Headers(
        [
            (b"content-type", b"application/x-www-form-urlencoded"),
            (b"content-length", content_length),
        ]
    )

    parsed = await FormParser(request_headers, create_form_stream(body)).parse()

    expected = {"message": "Hello World!", "symbols": "$&+,:;=?@"}
    assert dict(parsed.items()) == expected
118# Tests for multipart form data
async def test_multipart_text_fields():
    """Two plain text parts are parsed into two string fields."""
    boundary = b"boundary123"
    delimiter = b"--" + boundary
    body = b"".join(
        [
            delimiter + b"\r\n",
            b'Content-Disposition: form-data; name="field1"\r\n\r\n',
            b"value1\r\n",
            delimiter + b"\r\n",
            b'Content-Disposition: form-data; name="field2"\r\n\r\n',
            b"value2\r\n",
            delimiter + b"--\r\n",  # closing delimiter
        ]
    )

    request_headers = Headers(
        [
            (b"content-type", b"multipart/form-data; boundary=" + boundary),
            (b"content-length", str(len(body)).encode()),
        ]
    )

    parsed = await MultiPartParser(request_headers, create_form_stream(body)).parse()

    assert dict(parsed.items()) == {"field1": "value1", "field2": "value2"}
async def test_multipart_file_upload():
    """A single file part is parsed into an UploadedFile with its name and bytes intact."""
    boundary = b"boundary123"
    file_content = b"Hello, this is a test file content!"
    delimiter = b"--" + boundary
    body = b"".join(
        [
            delimiter + b"\r\n",
            b'Content-Disposition: form-data; name="file1"; filename="test.txt"\r\n',
            b"Content-Type: text/plain\r\n\r\n",
            file_content,
            b"\r\n",
            delimiter + b"--\r\n",  # closing delimiter
        ]
    )

    request_headers = Headers(
        [
            (b"content-type", b"multipart/form-data; boundary=" + boundary),
            (b"content-length", str(len(body)).encode()),
        ]
    )

    parsed = await MultiPartParser(request_headers, create_form_stream(body)).parse()

    assert len(parsed) == 1
    uploaded = parsed.get("file1")
    assert isinstance(uploaded, UploadedFile)
    assert uploaded.filename == "test.txt"
    received = await uploaded.read()
    assert received == file_content
async def test_multipart_mixed_content():
    """A body with one text part and one file part yields a string and an UploadedFile."""
    boundary = b"boundary123"
    file_content = b"Test file content"
    delimiter = b"--" + boundary
    body = b"".join(
        [
            delimiter + b"\r\n",
            b'Content-Disposition: form-data; name="field1"\r\n\r\n',
            b"text_value\r\n",
            delimiter + b"\r\n",
            b'Content-Disposition: form-data; name="file1"; filename="test.txt"\r\n',
            b"Content-Type: text/plain\r\n\r\n",
            file_content,
            b"\r\n",
            delimiter + b"--\r\n",  # closing delimiter
        ]
    )

    request_headers = Headers(
        [
            (b"content-type", b"multipart/form-data; boundary=" + boundary),
            (b"content-length", str(len(body)).encode()),
        ]
    )

    parsed = await MultiPartParser(request_headers, create_form_stream(body)).parse()

    assert len(parsed) == 2
    assert parsed.get("field1") == "text_value"
    uploaded = parsed.get("file1")
    assert isinstance(uploaded, UploadedFile)
    received = await uploaded.read()
    assert received == file_content
async def test_multipart_multiple_files():
    """Two file parts are parsed into two independent UploadedFile entries."""
    boundary = b"boundary123"
    file1_content = b"Content of file 1"
    file2_content = b"Content of file 2"
    delimiter = b"--" + boundary
    body = b"".join(
        [
            delimiter + b"\r\n",
            b'Content-Disposition: form-data; name="file1"; filename="file1.txt"\r\n',
            b"Content-Type: text/plain\r\n\r\n",
            file1_content,
            b"\r\n",
            delimiter + b"\r\n",
            b'Content-Disposition: form-data; name="file2"; filename="file2.txt"\r\n',
            b"Content-Type: text/plain\r\n\r\n",
            file2_content,
            b"\r\n",
            delimiter + b"--\r\n",  # closing delimiter
        ]
    )

    request_headers = Headers(
        [
            (b"content-type", b"multipart/form-data; boundary=" + boundary),
            (b"content-length", str(len(body)).encode()),
        ]
    )

    parsed = await MultiPartParser(request_headers, create_form_stream(body)).parse()

    assert len(parsed) == 2
    first = parsed.get("file1")
    second = parsed.get("file2")

    assert isinstance(first, UploadedFile)
    assert isinstance(second, UploadedFile)

    assert first.filename == "file1.txt"
    assert second.filename == "file2.txt"

    first_bytes = await first.read()
    second_bytes = await second.read()

    assert first_bytes == file1_content
    assert second_bytes == file2_content
244# Edge cases tests
async def test_max_file_size_limit():
    """A file part larger than ``MultiPartParser.max_file_size`` raises MultiPartException.

    The class-level limit is lowered for the duration of the test and
    restored afterwards, whether or not the assertion passes.
    """
    boundary = b"boundary123"
    # A small limit keeps the oversized payload cheap to build.
    size_limit = 1024 * 10
    saved_limit = MultiPartParser.max_file_size
    MultiPartParser.max_file_size = size_limit

    try:
        oversized = b"x" * (size_limit + 100)  # 100 bytes over the limit
        delimiter = b"--" + boundary
        body = b"".join(
            [
                delimiter + b"\r\n",
                b'Content-Disposition: form-data; name="file"; filename="large.txt"\r\n',
                b"Content-Type: text/plain\r\n\r\n",
                oversized,
                b"\r\n",
                delimiter + b"--\r\n",  # closing delimiter
            ]
        )

        request_headers = Headers(
            [
                (b"content-type", b"multipart/form-data; boundary=" + boundary),
                (b"content-length", str(len(body)).encode()),
            ]
        )

        parser = MultiPartParser(request_headers, create_form_stream(body))

        # Parsing must fail because the file exceeds the lowered limit.
        with pytest.raises(MultiPartException):
            await parser.parse()
    finally:
        # Put the class-level limit back so other tests are unaffected.
        MultiPartParser.max_file_size = saved_limit
async def test_max_field_count_limit():
    """Submitting more parts than ``max_fields`` raises MultiPartException.

    The limit is applied both on the class and through the constructor
    argument; the class attribute is restored afterwards.
    """
    max_fields = 5
    saved_max_fields = MultiPartParser.max_fields
    MultiPartParser.max_fields = max_fields

    try:
        boundary = b"boundary123"
        # Two more text parts than the limit allows, then the closing delimiter.
        parts = [
            b"--"
            + boundary
            + b"\r\n"
            + f'Content-Disposition: form-data; name="field{index}"\r\n\r\n'.encode()
            + f"value{index}\r\n".encode()
            for index in range(max_fields + 2)
        ]
        parts.append(b"--" + boundary + b"--\r\n")

        body = b"".join(parts)

        request_headers = Headers(
            [
                (b"content-type", b"multipart/form-data; boundary=" + boundary),
                (b"content-length", str(len(body)).encode()),
            ]
        )

        parser = MultiPartParser(
            request_headers, create_form_stream(body), max_fields=max_fields
        )

        # Parsing must fail because there are too many fields.
        with pytest.raises(MultiPartException):
            await parser.parse()
    finally:
        # Put the class-level limit back so other tests are unaffected.
        MultiPartParser.max_fields = saved_max_fields
async def test_character_encoding_handling():
    """A UTF-8 text part containing non-ASCII characters round-trips unchanged."""
    boundary = b"boundary123"
    unicode_text = "Hello 世界 こんにちは"  # Unicode text
    delimiter = b"--" + boundary
    body = b"".join(
        [
            delimiter + b"\r\n",
            b'Content-Disposition: form-data; name="text"\r\n',
            b"Content-Type: text/plain; charset=utf-8\r\n\r\n",
            unicode_text.encode("utf-8"),
            b"\r\n",
            delimiter + b"--\r\n",  # closing delimiter
        ]
    )

    content_type = b"multipart/form-data; boundary=" + boundary + b"; charset=utf-8"
    request_headers = Headers(
        [
            (b"content-type", content_type),
            (b"content-length", str(len(body)).encode()),
        ]
    )

    parsed = await MultiPartParser(request_headers, create_form_stream(body)).parse()

    assert parsed.get("text") == unicode_text
351# Error handling tests
async def test_malformed_headers():
    """A part that has no Content-Disposition header makes parsing raise."""
    boundary = b"boundary123"
    delimiter = b"--" + boundary
    body = b"".join(
        [
            delimiter + b"\r\n",
            b"Invalid-Header: value\r\n\r\n",  # Missing Content-Disposition
            b"some content\r\n",
            delimiter + b"--\r\n",  # closing delimiter
        ]
    )

    request_headers = Headers(
        [
            (b"content-type", b"multipart/form-data; boundary=" + boundary),
            (b"content-length", str(len(body)).encode()),
        ]
    )

    parser = MultiPartParser(request_headers, create_form_stream(body))

    # NOTE(review): any exception type is accepted here; the exact type the
    # parser raises for this input is not pinned down by this test.
    with pytest.raises(Exception):
        await parser.parse()
async def test_invalid_boundary():
    """A body delimited by a boundary other than the one declared makes parsing raise."""
    declared_boundary = b"correct_boundary"
    body_boundary = b"wrong_boundary"

    # The body is framed with the wrong boundary on purpose.
    delimiter = b"--" + body_boundary
    body = b"".join(
        [
            delimiter + b"\r\n",
            b'Content-Disposition: form-data; name="field1"\r\n\r\n',
            b"value1\r\n",
            delimiter + b"--\r\n",
        ]
    )

    # The Content-Type header announces a different boundary than the body uses.
    content_type = b"multipart/form-data; boundary=" + declared_boundary
    request_headers = Headers(
        raw=[
            (b"content-type", content_type),
            (b"content-length", str(len(body)).encode()),
        ]
    )

    parser = MultiPartParser(request_headers, create_form_stream(body))

    # NOTE(review): any exception type is accepted here; the exact type the
    # parser raises for this input is not pinned down by this test.
    with pytest.raises(Exception):
        await parser.parse()
405# Integration tests with client
async def test_simple_form_submission(client):
    """Posting plain fields to ``/form`` echoes them back unchanged."""
    submitted = {
        "name": "John Doe",
        "email": "john@example.com",
        "message": "This is a test message",
    }

    reply = await client.post("/form", data=submitted)
    assert reply.status_code == 200

    echoed = reply.json()
    assert echoed["fields"] == submitted
async def test_form_with_special_chars_integration(client):
    """Punctuation and non-ASCII text posted to ``/form`` are echoed back unchanged."""
    submitted = {
        "name": "User Name",
        "message": "Special characters: !@#$%^&*()_+",
        "unicode": "Hello 世界 こんにちは",
    }

    reply = await client.post("/form", data=submitted)
    assert reply.status_code == 200

    echoed = reply.json()
    assert echoed["fields"] == submitted
async def test_single_file_upload(client):
    """One file plus two text fields posted to ``/upload`` come back split correctly."""
    file_content = "This is a test file content."
    upload = {"document": ("test.txt", file_content, "text/plain")}
    extra_fields = {"description": "Test file upload", "category": "documentation"}

    reply = await client.post("/upload", files=upload, data=extra_fields)
    assert reply.status_code == 200

    result = reply.json()
    assert "files" in result
    assert "fields" in result

    # The file is reported with its name, content and content type.
    assert "document" in result["files"]
    document = result["files"]["document"]
    assert document["filename"] == "test.txt"
    assert document["content"] == file_content
    assert document["content_type"] == "text/plain"

    # The accompanying text fields are reported separately.
    returned_fields = result["fields"]
    assert returned_fields["description"] == "Test file upload"
    assert returned_fields["category"] == "documentation"
async def test_multiple_file_uploads(client):
    """Three files of different types posted to ``/upload`` are each reported intact."""
    # field name -> (filename, content, content type)
    files = {
        "text_file": ("document.txt", "Text file content", "text/plain"),
        "json_file": ("data.json", '{"key": "value"}', "application/json"),
        "csv_file": ("data.csv", "name,age\nJohn,30\nJane,25", "text/csv"),
    }
    extra_fields = {"description": "Multiple files upload test"}

    reply = await client.post("/upload", files=files, data=extra_fields)
    assert reply.status_code == 200

    result = reply.json()
    assert len(result["files"]) == 3

    # Each uploaded file must come back with the name, content and type sent.
    for field_name, (filename, content, content_type) in files.items():
        assert field_name in result["files"]
        reported = result["files"][field_name]
        assert reported["filename"] == filename
        assert reported["content"] == content
        assert reported["content_type"] == content_type

    # The accompanying text field is reported separately.
    assert result["fields"]["description"] == "Multiple files upload test"
async def test_unicode_filename_and_content(client):
    """A file with a non-ASCII filename and content is reported back unchanged."""
    filename = "测试文件.txt"
    content = "Unicode content: こんにちは世界 - Hello World!"
    upload = {"unicode_file": (filename, content, "text/plain; charset=utf-8")}

    reply = await client.post("/upload", files=upload)
    assert reply.status_code == 200

    result = reply.json()
    assert "unicode_file" in result["files"]
    reported = result["files"]["unicode_file"]
    assert reported["filename"] == filename
    assert reported["content"] == content
    # Only the media type is checked; parameters such as charset may vary.
    assert "text/plain" in reported["content_type"]
519async def test_binary_file_upload(client):
520 # Test binary file upload (using simplified binary content for test)
521 # In a real scenario, this would be actual binary data
522 binary_content = b"Binary content with \x00\x01\x02\x03 bytes"
524 files = {"binary_file": ("data.bin", binary_content, "application/octet-stream")}
526 response = await client.post("/upload", files=files)
527 assert response.status_code == 200
529 result = response.json()
530 assert "binary_file" in result["files"]
531 # Note: The test expects the binary content to be decoded as utf-8 with replacement
532 # characters for invalid sequences - this is just for the test's sake
533 # In a real app, you might want to handle binary data differently