Coverage for nexios\http\response.py: 70%
445 statements
« prev ^ index » next coverage.py v7.8.0, created at 2025-05-21 20:31 +0100
« prev ^ index » next coverage.py v7.8.0, created at 2025-05-21 20:31 +0100
1from datetime import datetime, timedelta
2from email.utils import formatdate
3from typing import Any, Dict, List, Optional, Tuple, Union, Generator
4from pathlib import Path
5import json
6from base64 import b64encode
7from hashlib import sha1
8import mimetypes
9import typing
10import os
11import anyio
12from typing import AsyncIterator
13from anyio import AsyncFile
14import http.cookies
15from email.utils import format_datetime, formatdate
16from datetime import datetime, timezone
17from urllib.parse import quote
18import hashlib
19import anyio.to_thread
20from nexios.pagination import (
21 AsyncListDataHandler,
22 AsyncPaginator,
23 BasePaginationStrategy,
24 CursorPagination,
25 LimitOffsetPagination,
26 SyncListDataHandler,
27 PageNumberPagination,
28 SyncPaginator,
29)
30from nexios.structs import MutableHeaders
31from nexios.http.request import ClientDisconnect, Request
32import stat
33from functools import partial
# ASGI connection scope passed as the first argument to a response's __call__.
Scope = typing.MutableMapping[str, typing.Any]
# A single ASGI event, e.g. {"type": "http.response.start", ...}.
Message = typing.MutableMapping[str, typing.Any]
# Awaitable callable that returns the next incoming ASGI message.
Receive = typing.Callable[[], typing.Awaitable[Message]]
# Awaitable callable that delivers one outgoing ASGI message.
Send = typing.Callable[[Message], typing.Awaitable[None]]
# Loose alias for JSON-like payload values; nested contents are not constrained.
JSONType = Union[str, int, float, bool, None, Dict[str, Any], List[Any]]
class MalformedRangeHeader(Exception):
    """Exception carrying a human-readable description of a malformed Range header.

    Attributes:
        content: The description text; also used as the exception message.
    """

    def __init__(self, content: str = "Malformed range header.") -> None:
        # Forward the message to Exception so str(exc), repr(exc) and
        # exc.args are populated instead of being empty.
        super().__init__(content)
        self.content = content
class RangeNotSatisfiable(Exception):
    """Exception carrying the maximum size against which a range was checked.

    Attributes:
        max_size: The size limit supplied by the raiser.
    """

    def __init__(self, max_size: int) -> None:
        # Forward the value to Exception so exc.args / repr(exc) are not empty.
        super().__init__(max_size)
        self.max_size = max_size
class BaseResponse:
    """
    Base ASGI-compatible Response class with support for cookies, caching, and custom headers.

    Headers live in two places:

    * ``self.headers`` - a plain ``dict`` of string headers (constructor
      argument plus the caching helpers).
    * ``self._headers`` - a list of ``(bytes, bytes)`` pairs filled by
      ``set_header`` / ``set_cookie``.

    Both are combined by ``_init_headers`` right before the response is sent.
    """

    STATUS_CODES = {
        200: "OK",
        201: "Created",
        204: "No Content",
        301: "Moved Permanently",
        302: "Found",
        304: "Not Modified",
        400: "Bad Request",
        401: "Unauthorized",
        403: "Forbidden",
        404: "Not Found",
        500: "Internal Server Error",
    }

    def __init__(
        self,
        body: "Union[JSONType, Any]" = "",
        status_code: int = 200,
        headers: Optional[Dict[str, str]] = None,
        content_type: Optional[str] = None,
    ):
        """
        Args:
            body: Response payload; ``str``, ``bytes``, ``memoryview`` or ``None``.
            status_code: HTTP status code.
            headers: Optional mapping of header names to values.
            content_type: Optional media type emitted as ``content-type``.
        """
        self.charset = "utf-8"
        self.status_code: int = status_code
        self._headers: List[Tuple[bytes, bytes]] = []
        self._body = self.render(body)
        # Copy the mapping: the caching helpers write into ``self.headers`` and
        # must not leak those writes into a dict owned by the caller.
        self.headers: Dict[str, str] = dict(headers) if headers else {}
        self.content_type: Optional[str] = content_type

    def render(self, content: Any) -> Union[bytes, memoryview]:
        """Convert ``content`` to bytes; ``None`` becomes an empty body."""
        if content is None:
            return b""
        if isinstance(content, (bytes, memoryview)):
            return content
        return content.encode(self.charset)

    def _init_headers(self):
        """Merge dict headers and computed headers into ``self._headers``."""
        raw_headers = [
            (k.lower().encode("latin-1"), v.encode("latin-1"))
            for k, v in self.headers.items()
        ]
        dict_keys = {name for name, _ in raw_headers}
        # Headers already set through set_header(); an explicit content-type
        # there must not be duplicated by the automatic one below.
        explicit_keys = {name for name, _ in self._headers}

        body = getattr(self, "_body", None)
        # 1xx, 204 and 304 responses do not get a computed content-length.
        bodyless_status = self.status_code < 200 or self.status_code in (204, 304)
        if (
            body is not None
            and b"content-length" not in dict_keys
            and not bodyless_status
        ):
            self.set_header("content-length", str(len(body)), overide=True)

        content_type: Optional[str] = self.content_type
        if (
            content_type is not None
            and b"content-type" not in dict_keys
            and b"content-type" not in explicit_keys
        ):
            if (
                content_type.startswith("text/")
                and "charset=" not in content_type.lower()
            ):
                content_type += "; charset=" + self.charset
            self._headers.append((b"content-type", content_type.encode("latin-1")))

        self._headers.extend(raw_headers)

    def set_cookie(
        self,
        key: str,
        value: str = "",
        max_age: Optional[int] = None,
        expires: Union[datetime, str, int, None] = None,
        path: Optional[str] = "/",
        domain: Optional[str] = None,
        secure: Optional[bool] = False,
        httponly: Optional[bool] = False,
        samesite: Optional[typing.Literal["lax", "strict", "none"]] = "lax",
    ) -> Any:
        """Append a ``set-cookie`` header and return the cookie object built for it."""
        cookie: http.cookies.BaseCookie[str] = http.cookies.SimpleCookie()
        cookie[key] = value
        morsel = cookie[key]
        if max_age is not None:
            morsel["max-age"] = max_age
        if expires is not None:
            if isinstance(expires, datetime):
                morsel["expires"] = format_datetime(expires, usegmt=True)
            else:
                morsel["expires"] = expires
        if path is not None:
            morsel["path"] = path
        if domain is not None:
            morsel["domain"] = domain
        if secure:
            morsel["secure"] = True
        if httponly:
            morsel["httponly"] = True
        if samesite is not None:
            # Kept as an assert so existing callers still see AssertionError.
            assert samesite.lower() in [
                "strict",
                "lax",
                "none",
            ], "samesite must be either 'strict', 'lax' or 'none'"
            morsel["samesite"] = samesite
        cookie_val = cookie.output(header="").strip()
        self.set_header("set-cookie", cookie_val)
        return cookie

    def delete_cookie(
        self, key: str, path: str = "/", domain: Optional[str] = None
    ) -> Any:
        """Delete a cookie by setting its expiry to the past."""
        return self.set_cookie(
            key=key, value="", max_age=0, expires=0, path=path, domain=domain
        )

    def enable_caching(self, max_age: int = 3600, private: bool = True) -> None:
        """Enable caching with the specified max age (in seconds).

        Sets ``cache-control``, ``etag`` and ``expires`` in ``self.headers``.
        """
        visibility = "private" if private else "public"
        self.headers["cache-control"] = f"{visibility}, max-age={max_age}"
        self.headers["etag"] = self._generate_etag()

        # Use an aware UTC datetime. Calling .timestamp() on a naive
        # utcnow() value interprets it as local time and shifts the
        # header by the machine's UTC offset.
        expires = datetime.now(timezone.utc) + timedelta(seconds=max_age)
        self.headers["expires"] = format_datetime(expires, usegmt=True)

    def disable_caching(self) -> None:
        """Disable caching for this response."""
        self.headers["cache-control"] = "no-store, no-cache, must-revalidate, max-age=0"
        self.headers["pragma"] = "no-cache"
        self.headers["expires"] = "0"

    async def __call__(self, scope: "Scope", receive: "Receive", send: "Send") -> None:
        """Make the response callable as an ASGI application."""
        self._init_headers()
        await send(
            {
                "type": "http.response.start",
                "status": self.status_code,
                "headers": self._headers,
            }
        )
        await send(
            {
                "type": "http.response.body",
                "body": self._body,
            }
        )

    @property
    def body(self):
        """The rendered response body."""
        return self._body

    @property
    def raw_headers(self):
        """The list of ``(name, value)`` byte pairs set so far."""
        return self._headers

    def _generate_etag(self) -> str:
        """Generate an ETag for the response content."""
        digest = sha1(self._body).digest()
        return f'W/"{b64encode(digest).decode("utf-8")}"'

    def set_header(self, key: str, value: str, overide: bool = False) -> "BaseResponse":
        """
        Set a response header. If `overide` is True, replace the existing header.
        """
        # Normalize key to lowercase for case-insensitive comparison
        key_bytes = key.lower().encode("latin-1")
        value_bytes = value.encode("latin-1")

        if overide:
            self._headers = [(k, v) for k, v in self._headers if k != key_bytes]

        self._headers.append((key_bytes, value_bytes))
        return self
class PlainTextResponse(BaseResponse):
    """Response whose content type defaults to ``text/plain``."""

    def __init__(
        self,
        body: JSONType = "",
        status_code: int = 200,
        headers: typing.Optional[Dict[str, str]] = None,
        content_type: str = "text/plain",
    ):
        # Everything is delegated to BaseResponse; only the default
        # content type differs.
        super().__init__(
            body=body,
            status_code=status_code,
            headers=headers,
            content_type=content_type,
        )
class JSONResponse(BaseResponse):
    """
    Response subclass for JSON content.
    """

    def __init__(
        self,
        content: Any,
        status_code: int = 200,
        headers: Optional[Dict[str, str]] = None,
        indent: Optional[int] = None,
        ensure_ascii: bool = True,
    ):
        """
        Args:
            content: Object to serialize with ``json.dumps``.
            status_code: HTTP status code.
            headers: Optional mapping of header names to values.
            indent: Passed through to ``json.dumps``.
            ensure_ascii: Passed through to ``json.dumps``.

        Raises:
            ValueError: If ``content`` cannot be serialized (including NaN or
                infinity, since ``allow_nan=False``).
        """
        try:
            # NOTE: default=str stringifies any object json does not know,
            # so unknown types are serialized as their str() form.
            body = json.dumps(
                content,
                indent=indent,
                ensure_ascii=ensure_ascii,
                allow_nan=False,
                default=str,
            )
        except (TypeError, ValueError) as e:
            # Chain the original error so the traceback shows the real cause.
            raise ValueError(f"Content is not JSON serializable: {str(e)}") from e

        super().__init__(
            body=body,
            status_code=status_code,
            headers=headers,
            content_type="application/json",
        )
class HTMLResponse(BaseResponse):
    """
    Response subclass for HTML content.
    """

    def __init__(
        self,
        content: Union[str, JSONType],
        status_code: int = 200,
        headers: Optional[Dict[str, str]] = None,
    ):
        # The content type is fixed; the remaining arguments go straight to
        # BaseResponse in its positional order (body, status, headers, type).
        super().__init__(content, status_code, headers, "text/html; charset=utf-8")
class FileResponse(BaseResponse):
    """
    Enhanced FileResponse class with AnyIO for asynchronous file streaming,
    support for range requests, and multipart responses.

    Range handling:

    * ``bytes=a-b``, ``bytes=a-`` and suffix ranges ``bytes=-n`` are accepted.
    * An end position past the end of the file is clamped to the last byte.
    * Anything unparsable or unsatisfiable yields ``416`` with an empty body.
    * Several ranges produce a ``multipart/byteranges`` body.
    """

    chunk_size = 64 * 1024  # 64KB chunks

    def __init__(
        self,
        path: Union[str, Path],
        filename: Optional[str] = None,
        status_code: int = 200,
        headers: Optional[Dict[str, str]] = None,
        content_disposition_type: str = "inline",
    ):
        """
        Args:
            path: Location of the file to send.
            filename: Name advertised in ``content-disposition``; defaults to
                the last component of ``path``.
            status_code: Status used when no Range header applies.
            headers: Optional extra headers.
            content_disposition_type: ``"inline"`` or ``"attachment"``.
        """
        super().__init__(headers=headers)
        self.path = Path(path)
        self.filename = filename or self.path.name
        self.content_disposition_type = content_disposition_type
        self.status_code = status_code

        # Own copy: set_stat_headers() writes into this mapping.
        self.headers = dict(headers) if headers else {}
        guessed_type, _ = mimetypes.guess_type(str(self.path))
        # Remembered separately because multipart parts must advertise the
        # file's own type after the top-level content-type has been replaced.
        self._media_type = guessed_type or "application/octet-stream"
        self.set_header("content-type", self._media_type)
        self.set_header("content-disposition", self._content_disposition())
        self.set_header("accept-ranges", "bytes")

        self._ranges: List[Tuple[int, int]] = []
        self._multipart_boundary: Optional[str] = None
        self._file_size: Optional[int] = None

    def _content_disposition(self) -> str:
        """Build the content-disposition value, percent-encoding unsafe names."""
        encoded = quote(self.filename)
        if encoded != self.filename:
            # Non-ASCII characters, quotes or spaces: use the extended
            # ``filename*`` form so the header stays latin-1 encodable and a
            # quote inside the name cannot break out of the parameter.
            return f"{self.content_disposition_type}; filename*=utf-8''{encoded}"
        return f'{self.content_disposition_type}; filename="{self.filename}"'

    def set_stat_headers(self, stat_result: os.stat_result) -> None:
        """Derive content-length, last-modified and etag from ``stat_result``."""
        self._file_size = stat_result.st_size
        content_length = str(stat_result.st_size)
        last_modified = formatdate(stat_result.st_mtime, usegmt=True)
        etag_base = str(stat_result.st_mtime) + "-" + str(stat_result.st_size)
        etag = f'"{hashlib.md5(etag_base.encode(), usedforsecurity=False).hexdigest()}"'

        self.set_header("content-length", content_length, overide=True)
        self.headers.setdefault("last-modified", last_modified)
        self.headers.setdefault("etag", etag)

    def _merge_dict_headers(self) -> None:
        """Copy ``self.headers`` entries into ``_headers`` unless already set.

        Only ``_headers`` is put on the wire, so without this step the
        caller-supplied headers, ``etag`` and ``last-modified`` would never
        be sent.
        """
        present = {name for name, _ in self._headers}
        for name, value in self.headers.items():
            key = name.lower().encode("latin-1")
            if key in present:
                continue
            self._headers.append((key, str(value).encode("latin-1")))
            present.add(key)

    async def __call__(self, scope: Scope, receive: Receive, send: Send) -> None:
        """Handle the ASGI response, including range requests.

        Raises:
            RuntimeError: If the path does not exist or is not a regular file.
        """
        try:
            stat_result = await anyio.to_thread.run_sync(os.stat, self.path)
        except FileNotFoundError as exc:
            raise RuntimeError(f"File at path {self.path} does not exist.") from exc
        if not stat.S_ISREG(stat_result.st_mode):
            raise RuntimeError(f"File at path {self.path} is not a file.")
        self.set_stat_headers(stat_result)

        # Start from a clean slate so state from an earlier call cannot leak.
        self._ranges = []
        self._multipart_boundary = None

        range_header = MutableHeaders(scope=scope).get("Range")
        if range_header:
            self._handle_range_header(range_header)

        self._merge_dict_headers()
        await self._send_response(scope, receive, send)

    @staticmethod
    def _parse_byte_ranges(range_header: str, file_size: int) -> List[Tuple[int, int]]:
        """Parse a Range header value into inclusive ``(start, end)`` pairs.

        Raises:
            ValueError: If the header is malformed or a range cannot be
                satisfied for a file of ``file_size`` bytes.
        """
        unit, sep, spec = range_header.strip().partition("=")
        if not sep or unit.strip().lower() != "bytes":
            raise ValueError("Only byte ranges are supported")

        parsed: List[Tuple[int, int]] = []
        for item in spec.split(","):
            item = item.strip()
            if not item:
                continue
            first, dash, last = item.partition("-")
            first, last = first.strip(), last.strip()
            if not dash or (not first and not last):
                raise ValueError("Invalid range")

            if not first:
                # Suffix range: the final ``last`` bytes of the file.
                suffix_length = int(last)
                if suffix_length <= 0:
                    raise ValueError("Invalid range")
                start = max(file_size - suffix_length, 0)
                end = file_size - 1
            else:
                start = int(first)
                # An explicit "0" end is a real position, not "open ended".
                end = int(last) if last else file_size - 1
                if last and start > end:
                    raise ValueError("Invalid range")
                # An end beyond EOF is clamped rather than rejected.
                end = min(end, file_size - 1)

            if start < 0 or start >= file_size:
                raise ValueError("Invalid range")
            parsed.append((start, end))

        if not parsed:
            raise ValueError("Invalid range")
        return parsed

    def _handle_range_header(self, range_header: str) -> None:
        """Parse and validate the Range header, updating status and headers."""
        file_size = (
            self._file_size if self._file_size is not None else self.path.stat().st_size
        )
        self._file_size = file_size

        try:
            self._ranges = self._parse_byte_ranges(range_header, file_size)
        except ValueError:
            self._ranges = []
            self._multipart_boundary = None
            self.set_header("content-range", f"bytes */{file_size}", overide=True)
            # The 416 body is empty, so the advertised length must be zero.
            self.set_header("content-length", "0", overide=True)
            self.status_code = 416
            return

        self.status_code = 206
        if len(self._ranges) == 1:
            start, end = self._ranges[0]
            self.set_header(
                "content-range", f"bytes {start}-{end}/{file_size}", overide=True
            )
            self.set_header("content-length", str(end - start + 1), overide=True)
            return

        self._multipart_boundary = self._generate_multipart_boundary()
        # Replace (not append) so only one content-type header is sent.
        self.set_header(
            "content-type",
            f"multipart/byteranges; boundary={self._multipart_boundary}",
            overide=True,
        )
        # Length of the multipart body: per part header + payload + CRLF,
        # followed by the closing delimiter.
        body_length = len(self._multipart_closing())
        for start, end in self._ranges:
            part_header = self._multipart_part_header(start, end, file_size)
            body_length += len(part_header) + (end - start + 1) + 2
        self.set_header("content-length", str(body_length), overide=True)

    def _multipart_part_header(self, start: int, end: int, file_size: int) -> bytes:
        """Return the delimiter and headers that precede one multipart part."""
        return (
            f"--{self._multipart_boundary}\r\n"
            f"Content-Type: {self._media_type}\r\n"
            f"Content-Range: bytes {start}-{end}/{file_size}\r\n\r\n"
        ).encode("latin-1")

    def _multipart_closing(self) -> bytes:
        """Return the closing delimiter of the multipart body."""
        return f"--{self._multipart_boundary}--\r\n".encode("utf-8")

    async def _send_response(self, scope: Scope, receive: Receive, send: Send) -> None:
        """Send the file response, handling range requests and multipart responses."""
        await send(
            {
                "type": "http.response.start",
                "status": self.status_code,
                "headers": self._headers,
            }
        )

        if self.status_code == 416:
            await send({"type": "http.response.body", "body": b""})
            return

        async with await anyio.open_file(self.path, "rb") as file:
            if self._multipart_boundary:
                for start, end in self._ranges:
                    await self._send_multipart_chunk(file, start, end, send)
                await send(
                    {
                        "type": "http.response.body",
                        "body": self._multipart_closing(),
                        "more_body": False,
                    }
                )
            elif self._ranges:
                start, end = self._ranges[0]
                await self._send_range(file, start, end, send)
            else:
                await self._send_full_file(file, send)

    async def _send_bytes(self, file: AsyncFile[bytes], remaining: int, send: Send) -> None:
        """Stream up to ``remaining`` bytes from the current file position."""
        while remaining > 0:
            chunk = await file.read(min(self.chunk_size, remaining))
            if not chunk:
                break
            await send(
                {
                    "type": "http.response.body",
                    "body": chunk,
                    "more_body": True,
                }
            )
            remaining -= len(chunk)

    async def _send_full_file(self, file: AsyncFile[bytes], send: Send) -> None:
        """Send the entire file in chunks using AnyIO."""
        while True:
            chunk = await file.read(self.chunk_size)
            if not chunk:
                break
            await send(
                {
                    "type": "http.response.body",
                    "body": chunk,
                    "more_body": True,
                }
            )
        await send({"type": "http.response.body", "body": b"", "more_body": False})

    async def _send_range(
        self, file: AsyncFile[bytes], start: int, end: int, send: Send
    ) -> None:
        """Send a single range of the file using AnyIO."""
        # Headers were already sent with the response start, so nothing is
        # set here.
        await file.seek(start)
        await self._send_bytes(file, end - start + 1, send)
        await send({"type": "http.response.body", "body": b"", "more_body": False})

    async def _send_multipart_chunk(
        self, file: AsyncFile[bytes], start: int, end: int, send: Send
    ) -> None:
        """Send a multipart chunk for a range using AnyIO."""
        file_size = (
            self._file_size if self._file_size is not None else self.path.stat().st_size
        )
        await file.seek(start)
        await send(
            {
                "type": "http.response.body",
                "body": self._multipart_part_header(start, end, file_size),
                "more_body": True,
            }
        )
        await self._send_bytes(file, end - start + 1, send)
        # Each part's payload is terminated by CRLF before the next delimiter.
        await send({"type": "http.response.body", "body": b"\r\n", "more_body": True})

    def _generate_multipart_boundary(self) -> str:
        """Generate a unique multipart boundary string."""
        return f"boundary_{os.urandom(16).hex()}"
class StreamingResponse(BaseResponse):
    """
    Response subclass for streaming content.

    The body comes from an async iterator whose ``str`` chunks are encoded
    with ``self.charset``. No ``content-length`` is emitted from the
    ``headers`` mapping because the total size is not known up front.
    """

    def __init__(
        self,
        content: AsyncIterator[Union[str, bytes]],
        status_code: int = 200,
        headers: Optional[Dict[str, str]] = None,
        content_type: str = "text/plain",
    ):
        """
        Args:
            content: Async iterator yielding the body chunks.
            status_code: HTTP status code.
            headers: Optional extra headers.
            content_type: Media type sent as ``content-type``.
        """
        super().__init__(headers=headers)

        self.content_iterator = content
        self.status_code = status_code
        self._cookies: List[Tuple[str, str, Dict[str, Any]]] = []

        self.content_type = content_type
        # Work on a private copy so the caller's dict is never modified, and
        # drop any casing of the two headers this class controls itself.
        self.headers = {
            name: value
            for name, value in (headers or {}).items()
            if name.lower() not in ("content-type", "content-length")
        }
        self.headers["content-type"] = self.content_type

    def _merge_dict_headers(self) -> None:
        """Copy ``self.headers`` into ``_headers`` unless already present.

        ``stream_response`` only sends ``raw_headers`` (``_headers``), so the
        mapping - including ``content-type`` - has to be folded in first.
        """
        present = {name for name, _ in self._headers}
        for name, value in self.headers.items():
            key = name.lower().encode("latin-1")
            if key == b"content-length" or key in present:
                continue
            self._headers.append((key, str(value).encode("latin-1")))
            present.add(key)

    async def listen_for_disconnect(self, receive: Receive) -> None:
        """Return once the client sends ``http.disconnect``."""
        while True:
            message = await receive()
            if message["type"] == "http.disconnect":
                break

    async def stream_response(self, send: Send) -> None:
        """Send the response start followed by every chunk of the iterator."""
        self._merge_dict_headers()
        await send(
            {
                "type": "http.response.start",
                "status": self.status_code,
                "headers": self.raw_headers,
            }
        )
        async for chunk in self.content_iterator:
            if not isinstance(chunk, (bytes, memoryview)):
                chunk = chunk.encode(self.charset)
            await send({"type": "http.response.body", "body": chunk, "more_body": True})

        await send({"type": "http.response.body", "body": b"", "more_body": False})

    async def __call__(self, scope: Scope, receive: Receive, send: Send) -> None:
        """Stream the response, stopping early if the client disconnects."""
        spec_version = tuple(
            map(int, scope.get("asgi", {}).get("spec_version", "2.0").split("."))
        )

        if spec_version >= (2, 4):
            # For spec >= 2.4 the code relies on send() raising OSError.
            try:
                await self.stream_response(send)
            except OSError:
                raise ClientDisconnect()
        else:
            # Older servers: race the stream against a disconnect listener
            # and cancel whichever is still running when the other finishes.
            async with anyio.create_task_group() as task_group:

                async def wrap(
                    func: typing.Callable[[], typing.Awaitable[None]],
                ) -> None:
                    await func()
                    task_group.cancel_scope.cancel()

                task_group.start_soon(wrap, partial(self.stream_response, send))
                await wrap(partial(self.listen_for_disconnect, receive))
class RedirectResponse(BaseResponse):
    """
    Response subclass for HTTP redirects.
    """

    def __init__(
        self,
        url: str,
        status_code: int = 302,
        headers: Optional[Dict[str, str]] = None,
    ):
        """
        Args:
            url: Redirect target; percent-encoded into the ``location`` header.
            status_code: A 3xx status code.
            headers: Optional extra headers (not modified by this class).

        Raises:
            ValueError: If ``status_code`` is outside ``300..399``.
        """
        if not 300 <= status_code < 400:
            raise ValueError("Status code must be a valid redirect status")

        # Build a fresh dict: the previous mutable default ({}) was shared by
        # every instance, and a caller-supplied dict was modified in place.
        redirect_headers: Dict[str, str] = dict(headers) if headers else {}
        redirect_headers["location"] = quote(str(url), safe=":/%#?=@[]!$&'()*+,;")

        super().__init__(body="", status_code=status_code, headers=redirect_headers)
class NexiosResponse:
    """Fluent wrapper that builds and swaps the underlying ``BaseResponse``.

    Most methods return ``self`` so calls can be chained. Content methods
    (``text``, ``json``, ``html``, ``file`` ...) replace the wrapped response
    and carry over headers previously set through ``set_header`` and
    ``set_cookie``.

    Each construction yields an independent object. The earlier singleton
    ``__new__`` handed the same instance to every request while ``__init__``
    reset it each time, so concurrent requests overwrote each other's state.
    """

    def __init__(self, request: Request):
        self._response: BaseResponse = BaseResponse()
        self._cookies: List[Dict[str, Any]] = []
        self._status_code = self._response.status_code
        self._request = request

    @staticmethod
    def _copy_headers(headers: Optional[Dict[str, Any]]) -> Optional[Dict[str, Any]]:
        """Return a private copy of ``headers`` (or ``None`` when empty)."""
        return dict(headers) if headers else None

    @property
    def headers(self):
        """Headers set so far on the wrapped response, as ``MutableHeaders``."""
        return MutableHeaders(raw=self._response._headers)

    @property
    def cookies(self):
        return self._cookies

    @property
    def body(self):
        return self._response._body

    @property
    def content_type(self):
        return self._response.content_type

    @property
    def content_length(self):
        """The ``content-length`` header, or the body length as a string."""
        content_length = self.headers.get("content-length")
        if not content_length:
            return str(len(self.body))
        return content_length

    @property
    def status_code(self):
        return self._response.status_code

    def _preserve_headers_and_cookies(self, new_response: BaseResponse) -> BaseResponse:
        """Preserve headers and cookies when switching to a new response."""
        for key, value in self.headers.items():
            new_response.set_header(key, value)
        return new_response

    def has_header(self, key: str) -> bool:
        """Check if a header is present in the response."""
        return key.lower() in (k.lower() for k in self.headers.keys())

    def text(
        self,
        content: JSONType,
        status_code: int = 200,
        headers: Optional[Dict[str, Any]] = None,
    ):
        """Send plain text or HTML content."""
        new_response = PlainTextResponse(
            body=content, status_code=status_code, headers=self._copy_headers(headers)
        )
        self._response = self._preserve_headers_and_cookies(new_response)
        return self

    def json(
        self,
        data: Union[str, List[Any], Dict[str, Any]],
        status_code: int = 200,
        headers: Optional[Dict[str, Any]] = None,
        indent: Optional[int] = None,
        ensure_ascii: bool = True,
    ):
        """Send JSON response."""
        new_response = JSONResponse(
            content=data,
            status_code=status_code,
            headers=self._copy_headers(headers),
            indent=indent,
            ensure_ascii=ensure_ascii,
        )
        self._response = self._preserve_headers_and_cookies(new_response)
        return self

    def download(self, path: str, filename: Optional[str] = None) -> "NexiosResponse":
        """Set a response to force a file download."""
        return self.file(path, filename, content_disposition_type="attachment")

    def set_permanent_cookie(
        self, key: str, value: str, **kwargs: Any
    ) -> "NexiosResponse":
        """Set a permanent cookie with a far-future expiration date."""
        expires = datetime.now(timezone.utc) + timedelta(days=365 * 10)
        self.set_cookie(key, value, expires=expires, **kwargs)
        return self

    def empty(self, status_code: int = 200, headers: Optional[Dict[str, Any]] = None):
        """Send an empty response."""
        new_response = BaseResponse(
            status_code=status_code, headers=self._copy_headers(headers)
        )
        self._response = self._preserve_headers_and_cookies(new_response)
        return self

    def html(
        self,
        content: str,
        status_code: int = 200,
        headers: Optional[Dict[str, Any]] = None,
    ):
        """Send HTML response."""
        new_response = HTMLResponse(
            content=content, status_code=status_code, headers=self._copy_headers(headers)
        )
        self._response = self._preserve_headers_and_cookies(new_response)
        return self

    def file(
        self,
        path: str,
        filename: Optional[str] = None,
        content_disposition_type: str = "inline",
    ):
        """Send file response."""
        new_response = FileResponse(
            path=path,
            filename=filename,
            status_code=self._status_code,
            headers=dict(self._response.headers),
            content_disposition_type=content_disposition_type,
        )
        self._response = self._preserve_headers_and_cookies(new_response)
        return self

    def stream(
        self,
        iterator: Generator[Union[str, bytes], Any, Any],
        content_type: str = "text/plain",
        status_code: Optional[int] = None,
    ):
        """Send streaming response."""
        new_response = StreamingResponse(
            content=iterator,  # type: ignore
            status_code=status_code or self._status_code,
            headers=dict(self._response.headers),
            content_type=content_type,
        )
        self._response = self._preserve_headers_and_cookies(new_response)
        return self

    def redirect(self, url: str, status_code: int = 302):
        """Send redirect response."""
        new_response = RedirectResponse(
            url=url, status_code=status_code, headers=dict(self._response.headers)
        )
        self._response = self._preserve_headers_and_cookies(new_response)
        return self

    def status(self, status_code: int):
        """Set response status code."""
        self._response.status_code = status_code
        # Keep the cached value in sync; file() and stream() read it, so
        # ``.status(404).file(...)`` previously still answered with 200.
        self._status_code = status_code
        return self

    def set_header(self, key: str, value: str, overide: bool = False):
        """Set a response header."""
        self._response.set_header(key, value, overide=overide)
        return self

    def set_cookie(
        self,
        key: str,
        value: str,
        max_age: Optional[int] = None,
        expires: Optional[Union[str, datetime, int]] = None,
        path: str = "/",
        domain: Optional[str] = None,
        secure: bool = True,
        httponly: bool = False,
        samesite: typing.Optional[typing.Literal["lax", "strict", "none"]] = "lax",
    ):
        """Set a response cookie."""
        self._response.set_cookie(
            key=key,
            value=value,
            max_age=max_age,
            expires=expires,
            path=path,
            domain=domain,
            secure=secure,
            httponly=httponly,
            samesite=samesite,
        )
        return self

    def delete_cookie(
        self,
        key: str,
        path: str = "/",
        domain: Optional[str] = None,
    ):
        """Delete a response cookie."""
        self._response.delete_cookie(key=key, path=path, domain=domain)
        return self

    def cache(self, max_age: int = 3600, private: bool = True):
        """Enable response caching."""
        self._response.enable_caching(max_age, private)
        return self

    def no_cache(self):
        """Disable response caching."""
        self._response.disable_caching()
        return self

    def resp(
        self,
        body: Union[JSONType, Any] = "",
        status_code: int = 200,
        headers: Optional[Dict[str, str]] = None,
        content_type: str = "text/plain",
    ):
        """
        Provides access to the purest form of the response object.
        """
        new_response = BaseResponse(
            body=body,
            status_code=status_code,
            headers=headers,
            content_type=content_type,
        )
        self._response = self._preserve_headers_and_cookies(new_response)
        return self

    def set_cookies(self, cookies: List[Dict[str, Any]]):
        """Set multiple cookies at once."""
        for cookie in cookies:
            self.set_cookie(**cookie)
        return self

    def set_headers(self, headers: Dict[str, str], overide_all: bool = False):
        """Set multiple headers at once.

        With ``overide_all=True`` every header previously set through
        ``set_header`` is discarded and replaced by ``headers``.
        """
        if overide_all:
            # Build the list from the *argument*; it used to be rebuilt from
            # the existing headers, which made the call a no-op.
            self._response._headers = [
                (str(k).lower().encode("latin-1"), str(v).encode("latin-1"))
                for k, v in headers.items()
            ]
            return self
        for key, value in headers.items():
            self.set_header(key, value)
        return self

    def set_body(self, new_body: Any):
        """Replace the body of the wrapped response.

        ``str`` and ``None`` are encoded through the response's ``render``;
        any other value is stored unchanged.
        """
        if new_body is None or isinstance(new_body, str):
            new_body = self._response.render(new_body)
        self._response._body = new_body

    def get_response(self) -> BaseResponse:
        """Make the response ASGI-compatible."""
        return self._response

    def add_csp_header(self, policy: str) -> "NexiosResponse":
        """Add a Content Security Policy header."""
        self.set_header("Content-Security-Policy", policy)
        return self

    def make_response(self, response_class: BaseResponse) -> "NexiosResponse":
        """
        Use an already constructed response object as the wrapped response.

        Args:
            response_class (BaseResponse): The response *instance* to adopt
                (the parameter name is kept for backward compatibility).

        Returns:
            NexiosResponse: The current instance for method chaining.
        """
        self._response = self._preserve_headers_and_cookies(response_class)
        return self

    def remove_header(self, key: str):
        """Remove a header from the response."""
        target = key.lower()
        self._response._headers = [
            (k, v)
            for k, v in self._response._headers
            if k.decode("latin-1").lower() != target
        ]

    @staticmethod
    def _resolve_pagination_strategy(
        strategy: Union[str, BasePaginationStrategy], kwargs: Dict[str, Any]
    ) -> BasePaginationStrategy:
        """Turn a strategy name into a strategy instance; pass instances through."""
        if not isinstance(strategy, str):
            return strategy
        if strategy == "page_number":
            return PageNumberPagination(**kwargs)
        if strategy == "limit_offset":
            return LimitOffsetPagination(**kwargs)
        if strategy == "cursor":
            return CursorPagination(**kwargs)
        raise ValueError(f"Unknown pagination strategy: {strategy}")

    def paginate(
        self,
        items: List[Any],
        total_items: Optional[int] = None,
        strategy: Union[str, BasePaginationStrategy] = "page_number",
        data_handler: type[SyncListDataHandler] = SyncListDataHandler,
        **kwargs: Any,
    ) -> "NexiosResponse":
        """
        Paginate the response data.

        Args:
            items: List of items to paginate
            total_items: Accepted for compatibility; currently unused.
            strategy: Either a string ('page_number', 'limit_offset', 'cursor') or
                     a custom pagination strategy instance
            data_handler: Class wrapping ``items`` for the paginator.
            **kwargs: Additional arguments for the pagination strategy

        Raises:
            ValueError: If ``strategy`` is an unknown name.
        """
        resolved = self._resolve_pagination_strategy(strategy, kwargs)
        request = self._request

        paginator = SyncPaginator(
            data_handler=data_handler(items),
            pagination_strategy=resolved,
            base_url=str(request.url),
            request_params=dict(request.query_params),
        )
        return self.json(paginator.paginate())

    async def apaginate(
        self,
        items: List[Any],
        total_items: Optional[int] = None,
        strategy: Union[str, BasePaginationStrategy] = "page_number",
        data_handler: type[AsyncListDataHandler] = AsyncListDataHandler,
        **kwargs: Any,
    ) -> "NexiosResponse":
        """
        Paginate the response data using the asynchronous paginator.

        Args:
            items: List of items to paginate
            total_items: Accepted for compatibility; currently unused.
            strategy: Either a string ('page_number', 'limit_offset', 'cursor') or
                     a custom pagination strategy instance
            data_handler: Class wrapping ``items`` for the paginator.
            **kwargs: Additional arguments for the pagination strategy

        Raises:
            ValueError: If ``strategy`` is an unknown name.
        """
        resolved = self._resolve_pagination_strategy(strategy, kwargs)
        request = self._request

        paginator = AsyncPaginator(
            # Honor the caller's handler class; it used to be ignored in
            # favor of a hard-coded AsyncListDataHandler.
            data_handler=data_handler(items),
            pagination_strategy=resolved,
            base_url=str(request.url),
            request_params=dict(request.query_params),
        )
        result = await paginator.paginate()
        return self.json(result)

    def __str__(self):
        # Read the live status rather than the value cached at construction.
        return f"Response [{self.status_code} {self.body}]"
# Alias: `Response` names the same class as `BaseResponse`.
Response = BaseResponse