Coverage for nexios\http\response.py: 70%

445 statements  

« prev     ^ index     » next       coverage.py v7.8.0, created at 2025-05-21 20:31 +0100

1from datetime import datetime, timedelta 

2from email.utils import formatdate 

3from typing import Any, Dict, List, Optional, Tuple, Union, Generator 

4from pathlib import Path 

5import json 

6from base64 import b64encode 

7from hashlib import sha1 

8import mimetypes 

9import typing 

10import os 

11import anyio 

12from typing import AsyncIterator 

13from anyio import AsyncFile 

14import http.cookies 

15from email.utils import format_datetime, formatdate 

16from datetime import datetime, timezone 

17from urllib.parse import quote 

18import hashlib 

19import anyio.to_thread 

20from nexios.pagination import ( 

21 AsyncListDataHandler, 

22 AsyncPaginator, 

23 BasePaginationStrategy, 

24 CursorPagination, 

25 LimitOffsetPagination, 

26 SyncListDataHandler, 

27 PageNumberPagination, 

28 SyncPaginator, 

29) 

30from nexios.structs import MutableHeaders 

31from nexios.http.request import ClientDisconnect, Request 

32import stat 

33from functools import partial 

34 

# ASGI scope mapping handed to a response when it is called.
Scope = typing.MutableMapping[str, Any]
# A single ASGI event dictionary.
Message = typing.MutableMapping[str, Any]

# ``receive`` awaits the next inbound message; ``send`` emits an outbound one.
Receive = typing.Callable[[], typing.Awaitable[Message]]
Send = typing.Callable[[Message], typing.Awaitable[None]]

# Value shapes accepted as JSON-like response content.
JSONType = Union[str, int, float, bool, None, Dict[str, Any], List[Any]]

42 

43 

class MalformedRangeHeader(Exception):
    """Exception carrying a human-readable message about a malformed range header.

    Attributes:
        content: The message text (also available through ``str(exc)``).
    """

    def __init__(self, content: str = "Malformed range header.") -> None:
        # Forward the message to ``Exception`` so ``str(exc)`` and ``exc.args``
        # carry it even when the default is used or it is passed by keyword.
        super().__init__(content)
        self.content = content

47 

48 

class RangeNotSatisfiable(Exception):
    """Exception recording the size limit a requested range failed to satisfy.

    Attributes:
        max_size: The size value supplied by the raiser.
    """

    def __init__(self, max_size: int) -> None:
        # Populate ``args`` explicitly so the value survives keyword
        # construction (``RangeNotSatisfiable(max_size=...)``), ``str()`` and
        # pickling, all of which rely on ``args``.
        super().__init__(max_size)
        self.max_size = max_size

52 

53 

class BaseResponse:
    """
    Base ASGI-compatible Response class with support for cookies, caching, and custom headers.

    Headers are kept in two places. ``self.headers`` is a plain ``str -> str``
    mapping seeded from the constructor and written by the caching helpers.
    ``self._headers`` is the raw ``(bytes, bytes)`` list that is actually sent;
    ``set_header`` and ``set_cookie`` append to it directly, and
    ``_init_headers`` folds the mapping into it right before sending.
    """

    STATUS_CODES = {
        200: "OK",
        201: "Created",
        204: "No Content",
        301: "Moved Permanently",
        302: "Found",
        304: "Not Modified",
        400: "Bad Request",
        401: "Unauthorized",
        403: "Forbidden",
        404: "Not Found",
        500: "Internal Server Error",
    }

    def __init__(
        self,
        body: "Union[JSONType, Any]" = "",
        status_code: int = 200,
        headers: Optional[Dict[str, str]] = None,
        content_type: Optional[str] = None,
    ):
        """
        Args:
            body: Response content; ``None``, ``bytes``/``memoryview`` or an
                object with ``encode`` (see :meth:`render`).
            status_code: HTTP status code to send.
            headers: Initial header mapping. It is copied, so the caller's
                dict is never modified by this response.
            content_type: Media type used for the ``content-type`` header when
                none has been set explicitly.
        """
        self.charset = "utf-8"
        self.status_code: int = status_code
        self._headers: List[Tuple[bytes, bytes]] = []
        self._body = self.render(body)
        # Copy: ``enable_caching``/``disable_caching`` write into this mapping
        # and must not leak those writes into a dict owned by the caller.
        self.headers: Dict[str, str] = dict(headers) if headers else {}

        self.content_type: typing.Optional[str] = content_type

    def render(self, content: typing.Any) -> typing.Union[bytes, memoryview]:
        """Return ``content`` as bytes: ``None`` -> ``b""``, bytes-like
        unchanged, anything else via ``content.encode(self.charset)``."""
        if content is None:
            return b""
        if isinstance(content, (bytes, memoryview)):
            return content  # type: ignore
        return content.encode(self.charset)  # type: ignore

    def _init_headers(self):
        """Fold ``self.headers`` and the computed defaults into the raw list.

        ``content-length`` is derived from the body unless the mapping already
        provides one or the status forbids a body. ``content-type`` is filled
        in from ``self.content_type`` only when no ``content-type`` exists in
        either header store. Calling this more than once does not duplicate
        headers.
        """
        mapped_headers = [
            (name.lower().encode("latin-1"), value.encode("latin-1"))
            for name, value in self.headers.items()
        ]
        mapped_names = {name for name, _ in mapped_headers}

        body = getattr(self, "_body", None)
        # 1xx, 204 and 304 are excluded from automatic content-length.
        bodyless_status = self.status_code < 200 or self.status_code in (204, 304)
        if (
            body is not None
            and b"content-length" not in mapped_names
            and not bodyless_status
        ):
            self.set_header("content-length", str(len(body)), overide=True)

        content_type: typing.Optional[str] = self.content_type
        # A content-type set through ``set_header`` counts as explicit too;
        # otherwise two conflicting content-type headers would be emitted.
        has_content_type = b"content-type" in mapped_names or any(
            name == b"content-type" for name, _ in self._headers
        )
        if content_type is not None and not has_content_type:
            if (
                content_type.startswith("text/")
                and "charset=" not in content_type.lower()
            ):
                content_type += "; charset=" + self.charset
            self._headers.append((b"content-type", content_type.encode("latin-1")))

        # Skip pairs that are already present so a repeated call is a no-op.
        for header in mapped_headers:
            if header not in self._headers:
                self._headers.append(header)

    def set_cookie(
        self,
        key: str,
        value: str = "",
        max_age: typing.Optional[int] = None,
        expires: typing.Union[datetime, str, int, None] = None,
        path: typing.Optional[str] = "/",
        domain: typing.Optional[str] = None,
        secure: typing.Optional[bool] = False,
        httponly: typing.Optional[bool] = False,
        samesite: typing.Optional[typing.Literal["lax", "strict", "none"]] = "lax",
    ) -> Any:
        """Append a ``set-cookie`` header and return the built cookie object.

        A ``datetime`` passed as ``expires`` is formatted as an HTTP date;
        other values are handed to the cookie unchanged.

        Raises:
            AssertionError: If ``samesite`` is not 'strict', 'lax' or 'none'.
        """
        cookie: http.cookies.BaseCookie[str] = http.cookies.SimpleCookie()
        cookie[key] = value
        morsel = cookie[key]
        if max_age is not None:
            morsel["max-age"] = max_age
        if expires is not None:
            if isinstance(expires, datetime):
                morsel["expires"] = format_datetime(expires, usegmt=True)
            else:
                morsel["expires"] = expires
        if path is not None:
            morsel["path"] = path
        if domain is not None:
            morsel["domain"] = domain
        if secure:
            morsel["secure"] = True
        if httponly:
            morsel["httponly"] = True
        if samesite is not None:
            assert samesite.lower() in [
                "strict",
                "lax",
                "none",
            ], "samesite must be either 'strict', 'lax' or 'none'"
            morsel["samesite"] = samesite
        cookie_val = cookie.output(header="").strip()
        self.set_header("set-cookie", cookie_val)

        return cookie

    def delete_cookie(
        self, key: str, path: str = "/", domain: Optional[str] = None
    ) -> Any:
        """Delete a cookie by setting its expiry to the past."""
        return self.set_cookie(
            key=key, value="", max_age=0, expires=0, path=path, domain=domain
        )

    def enable_caching(self, max_age: int = 3600, private: bool = True) -> None:
        """Enable caching with the specified max age (in seconds).

        Writes ``cache-control``, ``etag`` and ``expires`` into
        ``self.headers``.
        """
        visibility = "private" if private else "public"
        self.headers["cache-control"] = ", ".join([visibility, f"max-age={max_age}"])

        self.headers["etag"] = self._generate_etag()

        # Use an aware UTC datetime: ``timestamp()`` on a naive datetime
        # interprets it as local time, which skews Expires by the UTC offset.
        expires_at = datetime.now(timezone.utc) + timedelta(seconds=max_age)
        self.headers["expires"] = formatdate(expires_at.timestamp(), usegmt=True)

    def disable_caching(self) -> None:
        """Disable caching for this response."""
        self.headers["cache-control"] = "no-store, no-cache, must-revalidate, max-age=0"
        self.headers["pragma"] = "no-cache"
        self.headers["expires"] = "0"

    async def __call__(self, scope: "Scope", receive: "Receive", send: "Send") -> None:
        """Make the response callable as an ASGI application."""
        self._init_headers()

        await send(
            {
                "type": "http.response.start",
                "status": self.status_code,
                "headers": self._headers,
            }
        )

        await send(
            {
                "type": "http.response.body",
                "body": self._body,
            }
        )

    @property
    def body(self):
        """The rendered response body."""
        return self._body

    @property
    def raw_headers(self):
        """The raw ``(name, value)`` byte pairs collected so far."""
        return self._headers

    def _generate_etag(self) -> str:
        """Generate an ETag for the response content."""
        content_hash = sha1()
        content_hash.update(self._body)  # type:ignore
        return f'W/"{b64encode(content_hash.digest()).decode("utf-8")}"'

    def set_header(self, key: str, value: str, overide: bool = False) -> "BaseResponse":
        """
        Set a response header. If `overide` is True, replace the existing header.

        Returns:
            This response, to allow chaining.
        """
        # Header names are lower-cased so comparisons are case-insensitive.
        key_bytes = key.lower().encode("latin-1")
        value_bytes = value.encode("latin-1")

        if overide:
            self._headers = [(k, v) for k, v in self._headers if k != key_bytes]

        self._headers.append((key_bytes, value_bytes))
        return self

247 

248 

class PlainTextResponse(BaseResponse):
    """Response whose content type defaults to ``text/plain``."""

    def __init__(
        self,
        body: JSONType = "",
        status_code: int = 200,
        headers: Optional[Dict[str, str]] = None,
        content_type: str = "text/plain",
    ):
        # Pure pass-through: only the default content type differs from the
        # base class.
        super().__init__(
            body=body,
            status_code=status_code,
            headers=headers,
            content_type=content_type,
        )

258 

259 

class JSONResponse(BaseResponse):
    """
    Response subclass for JSON content.

    The content is serialized with ``json.dumps``. Objects the encoder does
    not know are converted with ``str`` (``default=str``), and NaN/Infinity
    are rejected (``allow_nan=False``).
    """

    def __init__(
        self,
        content: Any,
        status_code: int = 200,
        headers: Optional[Dict[str, str]] = None,
        indent: Optional[int] = None,
        ensure_ascii: bool = True,
    ):
        """
        Args:
            content: Object to serialize.
            status_code: HTTP status code.
            headers: Optional header mapping.
            indent: Indentation passed to ``json.dumps``.
            ensure_ascii: Passed to ``json.dumps``.

        Raises:
            ValueError: If ``content`` cannot be serialized; the original
                error is attached as ``__cause__``.
        """
        try:
            body = json.dumps(
                content,
                indent=indent,
                ensure_ascii=ensure_ascii,
                allow_nan=False,
                default=str,
            )
        except (TypeError, ValueError) as exc:
            # Chain explicitly so the traceback shows the encoder failure as
            # the direct cause rather than an incidental context.
            raise ValueError(f"Content is not JSON serializable: {str(exc)}") from exc

        super().__init__(
            body=body,
            status_code=status_code,
            headers=headers,
            content_type="application/json",
        )

290 

291 

class HTMLResponse(BaseResponse):
    """
    Response subclass for HTML content.

    Always uses the ``text/html; charset=utf-8`` content type.
    """

    def __init__(
        self,
        content: Union[str, JSONType],
        status_code: int = 200,
        headers: Optional[Dict[str, str]] = None,
    ):
        html_content_type = "text/html; charset=utf-8"
        super().__init__(content, status_code, headers, html_content_type)

309 

310 

class FileResponse(BaseResponse):
    """
    Enhanced FileResponse class with AnyIO for asynchronous file streaming,
    support for range requests, and multipart responses.

    Range handling:
      * ``bytes=a-b``, ``bytes=a-`` and suffix ranges ``bytes=-n`` are accepted;
        an end position past the end of the file is clamped to the last byte.
      * One range yields ``206`` with ``content-range``; several ranges yield a
        ``206 multipart/byteranges`` response.
      * A range header that cannot be parsed or satisfied yields ``416`` with
        an empty body.
    """

    chunk_size = 64 * 1024  # 64KB chunks

    def __init__(
        self,
        path: Union[str, Path],
        filename: Optional[str] = None,
        status_code: int = 200,
        headers: Optional[Dict[str, str]] = None,
        content_disposition_type: str = "inline",
    ):
        """
        Args:
            path: File to serve.
            filename: Name advertised in ``content-disposition``; defaults to
                the final component of ``path``.
            status_code: Status used when no range is requested.
            headers: Extra headers; copied, never modified in place.
            content_disposition_type: ``"inline"`` or ``"attachment"``.
        """
        super().__init__(headers=headers)
        self.path = Path(path)
        self.filename = filename or self.path.name
        self.content_disposition_type = content_disposition_type
        self.status_code = status_code

        # Private copy: ``set_stat_headers`` writes into this mapping and must
        # not alter a dict owned by the caller.
        self.headers = dict(headers) if headers else {}
        guessed_type, _ = mimetypes.guess_type(str(self.path))
        # Remembered separately because the ``content-type`` header is
        # replaced by ``multipart/byteranges`` for multi-range responses while
        # each part still needs the file's own media type.
        self._media_type: str = guessed_type or "application/octet-stream"
        self.set_header("content-type", self._media_type)
        self.set_header("content-disposition", self._content_disposition())
        self.set_header("accept-ranges", "bytes")

        self._ranges: List[Tuple[int, int]] = []
        self._multipart_boundary: Optional[str] = None
        self._file_size: Optional[int] = None

    def _content_disposition(self) -> str:
        """Build the ``content-disposition`` value for ``self.filename``."""
        name = self.filename
        plain_ok = all(
            32 <= ord(ch) < 256 and ord(ch) != 127 and ch not in '"\\'
            for ch in name
        )
        if plain_ok:
            return f'{self.content_disposition_type}; filename="{name}"'
        # Names that cannot be carried in a latin-1 quoted-string (or would
        # break out of it) use the percent-encoded ``filename*`` form.
        return f"{self.content_disposition_type}; filename*=utf-8''{quote(name)}"

    def set_stat_headers(self, stat_result: os.stat_result) -> None:
        """Derive ``content-length``, ``last-modified`` and ``etag`` from a stat result.

        ``last-modified`` and ``etag`` are only defaults: values already
        present in ``self.headers`` are kept.
        """
        content_length = str(stat_result.st_size)
        last_modified = formatdate(stat_result.st_mtime, usegmt=True)
        etag_base = str(stat_result.st_mtime) + "-" + str(stat_result.st_size)
        etag = f'"{hashlib.md5(etag_base.encode(), usedforsecurity=False).hexdigest()}"'

        self.set_header("content-length", content_length, overide=True)
        self.headers.setdefault("last-modified", last_modified)
        self.headers.setdefault("etag", etag)

    def _merged_raw_headers(self) -> List[Tuple[bytes, bytes]]:
        """Return the raw headers plus every ``self.headers`` entry whose name
        is not already present, so mapping headers actually get sent."""
        merged = list(self._headers)
        present = {name for name, _ in merged}
        for key, value in self.headers.items():
            name = key.lower().encode("latin-1")
            if name not in present:
                present.add(name)
                merged.append((name, value.encode("latin-1")))
        return merged

    async def __call__(self, scope: Scope, receive: Receive, send: Send) -> None:
        """Handle the ASGI response, including range requests.

        Raises:
            RuntimeError: If the path does not exist or is not a regular file.
        """
        try:
            stat_result = await anyio.to_thread.run_sync(os.stat, self.path)
        except FileNotFoundError as exc:
            raise RuntimeError(f"File at path {self.path} does not exist.") from exc
        if not stat.S_ISREG(stat_result.st_mode):
            raise RuntimeError(f"File at path {self.path} is not a file.")

        self._file_size = stat_result.st_size
        self.set_stat_headers(stat_result)

        range_header = MutableHeaders(scope=scope).get("Range")
        if range_header:
            self._handle_range_header(range_header)

        await self._send_response(scope, receive, send)

    @staticmethod
    def _parse_byte_ranges(range_header: str, file_size: int) -> List[Tuple[int, int]]:
        """Parse a ``Range`` header into inclusive ``(start, end)`` pairs.

        Raises:
            ValueError: If the unit is not ``bytes`` or any range is malformed
                or cannot be satisfied for ``file_size``.
        """
        unit, separator, spec = range_header.strip().partition("=")
        if not separator or unit.strip().lower() != "bytes":
            raise ValueError("Only byte ranges are supported")

        def to_int(text: str) -> int:
            if not (text.isascii() and text.isdigit()):
                raise ValueError("Invalid range")
            return int(text)

        last_byte = file_size - 1
        ranges: List[Tuple[int, int]] = []
        for item in spec.split(","):
            first, dash, last = item.strip().partition("-")
            first, last = first.strip(), last.strip()
            if not dash:
                raise ValueError("Invalid range")
            if first:
                start = to_int(first)
                # An explicit end of 0 is a real position ("bytes=0-0" is the
                # first byte); only a missing end means "to end of file".
                end = min(to_int(last), last_byte) if last else last_byte
            else:
                # Suffix form "-n": the final n bytes of the file.
                suffix_length = to_int(last)
                if suffix_length == 0:
                    raise ValueError("Invalid range")
                start = max(file_size - suffix_length, 0)
                end = last_byte
            # Also rejects a start beyond the file and any range on an empty file.
            if start > end:
                raise ValueError("Invalid range")
            ranges.append((start, end))
        return ranges

    def _handle_range_header(self, range_header: str) -> None:
        """Parse and validate the Range header, updating status and headers."""
        file_size = self._file_size
        if file_size is None:
            file_size = self._file_size = self.path.stat().st_size

        self._ranges = []
        self._multipart_boundary = None
        try:
            ranges = self._parse_byte_ranges(range_header, file_size)
        except ValueError:
            self.set_header("content-range", f"bytes */{file_size}", overide=True)
            # The 416 body is empty, so the advertised length must be zero.
            self.set_header("content-length", "0", overide=True)
            self.status_code = 416
            return

        self._ranges = ranges
        self.status_code = 206
        if len(ranges) == 1:
            start, end = ranges[0]
            self.set_header(
                "content-range", f"bytes {start}-{end}/{file_size}", overide=True
            )
            self.set_header("content-length", str(end - start + 1), overide=True)
            return

        self._multipart_boundary = self._generate_multipart_boundary()
        # Replace (not add to) the file's own content-type header.
        self.set_header(
            "content-type",
            f"multipart/byteranges; boundary={self._multipart_boundary}",
            overide=True,
        )
        # Each part is: part header + data + CRLF; then the closing delimiter.
        total_length = len(self._multipart_closing()) + sum(
            len(self._multipart_part_header(start, end)) + (end - start + 1) + 2
            for start, end in ranges
        )
        self.set_header("content-length", str(total_length), overide=True)

    def _multipart_part_header(self, start: int, end: int) -> bytes:
        """Return the delimiter and headers that precede one multipart part."""
        return (
            f"--{self._multipart_boundary}\r\n"
            f"Content-Type: {self._media_type}\r\n"
            f"Content-Range: bytes {start}-{end}/{self._file_size}\r\n\r\n"
        ).encode("latin-1")

    def _multipart_closing(self) -> bytes:
        """Return the closing delimiter of the multipart body."""
        return f"--{self._multipart_boundary}--\r\n".encode("utf-8")

    async def _send_response(self, scope: Scope, receive: Receive, send: Send) -> None:
        """Send the file response, handling range requests and multipart responses."""
        await send(
            {
                "type": "http.response.start",
                "status": self.status_code,
                "headers": self._merged_raw_headers(),
            }
        )

        if self.status_code == 416:
            await send(
                {
                    "type": "http.response.body",
                    "body": b"",
                }
            )
            return

        async with await anyio.open_file(self.path, "rb") as file:
            if self._multipart_boundary:
                for start, end in self._ranges:
                    await self._send_multipart_chunk(file, start, end, send)
                await send(
                    {
                        "type": "http.response.body",
                        "body": self._multipart_closing(),
                        "more_body": False,
                    }
                )
            elif self._ranges:
                start, end = self._ranges[0]
                await self._send_range(file, start, end, send)  # type:ignore
            else:
                await self._send_full_file(file, send)  # type:ignore

    async def _send_full_file(self, file: AsyncFile[bytes], send: Send) -> None:
        """Send the entire file in chunks using AnyIO."""
        while True:
            chunk = await file.read(self.chunk_size)
            if not chunk:
                break
            await send(
                {
                    "type": "http.response.body",
                    "body": chunk,
                    "more_body": True,
                }
            )
        await send(
            {
                "type": "http.response.body",
                "body": b"",
                "more_body": False,
            }
        )

    async def _send_range(
        self, file: AsyncFile[bytes], start: int, end: int, send: Send
    ) -> None:
        """Send a single range of the file using AnyIO."""
        await file.seek(start)
        remaining = end - start + 1

        while remaining > 0:
            chunk = await file.read(min(self.chunk_size, remaining))
            if not chunk:
                # The file is shorter than expected; stop instead of spinning.
                break
            await send(
                {
                    "type": "http.response.body",
                    "body": chunk,
                    "more_body": True,
                }
            )
            remaining -= len(chunk)
        await send(
            {
                "type": "http.response.body",
                "body": b"",
                "more_body": False,
            }
        )

    async def _send_multipart_chunk(
        self, file: AsyncFile[bytes], start: int, end: int, send: Send
    ) -> None:
        """Send a multipart chunk for a range using AnyIO."""
        await file.seek(start)
        remaining = end - start + 1

        await send(
            {
                "type": "http.response.body",
                "body": self._multipart_part_header(start, end),
                "more_body": True,
            }
        )

        while remaining > 0:
            chunk = await file.read(min(self.chunk_size, remaining))
            if not chunk:
                break
            await send(
                {
                    "type": "http.response.body",
                    "body": chunk,
                    "more_body": True,
                }
            )
            remaining -= len(chunk)

        # The CRLF that terminates the part data belongs before the next
        # delimiter line.
        await send(
            {
                "type": "http.response.body",
                "body": b"\r\n",
                "more_body": True,
            }
        )

    def _generate_multipart_boundary(self) -> str:
        """Generate a unique multipart boundary string."""
        return f"boundary_{os.urandom(16).hex()}"

541 

542 

class StreamingResponse(BaseResponse):
    """
    Response subclass for streaming content.

    ``content`` may be an asynchronous iterable or a plain (synchronous)
    iterable of ``str``/``bytes`` chunks. ``str`` chunks are encoded with
    ``self.charset``. No ``content-length`` is taken from the header mapping.
    """

    def __init__(
        self,
        content: AsyncIterator[Union[str, bytes]],
        status_code: int = 200,
        headers: Optional[Dict[str, str]] = None,
        content_type: str = "text/plain",
    ):
        """
        Args:
            content: Iterable producing the body chunks.
            status_code: HTTP status code.
            headers: Extra headers; copied, never modified in place.
            content_type: Value stored under ``content-type`` in the mapping.
        """
        super().__init__(headers=headers)
        # Private copy so the writes below never touch the caller's dict.
        self.headers = dict(headers) if headers else {}

        self.content_iterator = content
        self.status_code = status_code
        self._cookies: List[Tuple[str, str, Dict[str, Any]]] = []

        self.content_type = content_type
        self.headers["content-type"] = self.content_type

        self.headers.pop("content-length", None)

    def _build_raw_headers(self) -> List[Tuple[bytes, bytes]]:
        """Return the raw headers plus the mapping headers not already present.

        Headers added through ``set_header``/``set_cookie`` take precedence;
        among mapping entries that differ only by case the first one wins. A
        ``content-length`` in the mapping is never emitted.
        """
        merged = list(self._headers)
        present = {name for name, _ in merged}
        for key, value in self.headers.items():
            name = key.lower().encode("latin-1")
            if name == b"content-length" or name in present:
                continue
            present.add(name)
            merged.append((name, value.encode("latin-1")))
        return merged

    async def _iterate_content(self):
        """Yield chunks from ``content_iterator``, whether it is sync or async."""
        source = self.content_iterator
        if hasattr(source, "__aiter__"):
            async for chunk in source:
                yield chunk
            return

        # Synchronous iterable: pull each item in a worker thread so a slow
        # producer does not block the event loop. A sentinel is used because
        # StopIteration must not propagate out of the worker call.
        iterator = iter(source)  # type: ignore[call-overload]
        exhausted = object()
        while True:
            chunk = await anyio.to_thread.run_sync(next, iterator, exhausted)
            if chunk is exhausted:
                return
            yield chunk

    async def listen_for_disconnect(self, receive: Receive) -> None:
        """Return once an ``http.disconnect`` message has been received."""
        while True:
            message = await receive()
            if message["type"] == "http.disconnect":
                break

    async def stream_response(self, send: Send) -> None:
        """Send the response start, every chunk, then the terminating empty body."""
        await send(
            {
                "type": "http.response.start",
                "status": self.status_code,
                "headers": self._build_raw_headers(),
            }
        )
        async for chunk in self._iterate_content():
            if not isinstance(chunk, (bytes, memoryview)):
                chunk = chunk.encode(self.charset)  # type:ignore
            await send({"type": "http.response.body", "body": chunk, "more_body": True})

        await send({"type": "http.response.body", "body": b"", "more_body": False})

    async def __call__(self, scope: Scope, receive: Receive, send: Send) -> None:
        """Stream the response.

        For ASGI spec versions >= 2.4 the body is streamed directly and an
        ``OSError`` while sending is re-raised as ``ClientDisconnect``.
        Otherwise streaming runs alongside a disconnect listener, and
        whichever finishes first cancels the other.
        """
        spec_version = tuple(
            map(int, scope.get("asgi", {}).get("spec_version", "2.0").split("."))
        )

        if spec_version >= (2, 4):
            try:
                await self.stream_response(send)
            except OSError as exc:
                raise ClientDisconnect() from exc
        else:
            async with anyio.create_task_group() as task_group:

                async def wrap(
                    func: typing.Callable[[], typing.Awaitable[None]],
                ) -> None:
                    await func()
                    task_group.cancel_scope.cancel()

                task_group.start_soon(wrap, partial(self.stream_response, send))
                await wrap(partial(self.listen_for_disconnect, receive))

608 

609 

class RedirectResponse(BaseResponse):
    """
    Response subclass for HTTP redirects.

    Sends an empty body with a ``location`` header set to the percent-encoded
    target URL.
    """

    def __init__(
        self,
        url: str,
        status_code: int = 302,
        headers: Optional[Dict[str, str]] = None,
    ):
        """
        Args:
            url: Redirect target.
            status_code: A 3xx status code.
            headers: Extra headers; copied, never modified in place.

        Raises:
            ValueError: If ``status_code`` is outside ``300 <= code < 400``.
        """
        if not 300 <= status_code < 400:
            raise ValueError("Status code must be a valid redirect status")

        # Work on a fresh dict: writing ``location`` into a shared default or
        # into the caller's mapping would leak one redirect's target into
        # unrelated responses.
        redirect_headers: Dict[str, str] = dict(headers) if headers else {}
        redirect_headers["location"] = quote(str(url), safe=":/%#?=@[]!$&'()*+,;")

        super().__init__(body="", status_code=status_code, headers=redirect_headers)

627 

628 

class NexiosResponse:
    """
    Fluent wrapper around a replaceable :class:`BaseResponse`.

    Content helpers (``text``, ``json``, ``html``, ``file``, ``stream``,
    ``redirect``, ``empty``, ``resp``) build a new underlying response and
    copy the raw headers collected so far onto it. Most methods return
    ``self`` so calls can be chained.

    NOTE(review): ``__new__`` returns one shared instance, and ``__init__``
    resets that instance's state on every ``NexiosResponse(request)`` call.
    If one object is built per request, concurrent requests would share and
    overwrite each other's state. Confirm this singleton is intended.
    """

    _instance = None

    def __new__(cls, *args, **kwargs):  # type:ignore
        if cls._instance is None:
            cls._instance = super(NexiosResponse, cls).__new__(cls)
            cls._instance._initialized = False  # type:ignore
        return cls._instance

    def __init__(self, request: Request):
        self._response: BaseResponse = BaseResponse()
        self._cookies: List[Dict[str, Any]] = []
        # Snapshot of the initial status only; the live value is
        # ``self._response.status_code`` (see the ``status_code`` property).
        self._status_code = self._response.status_code
        self._request = request

    @property
    def headers(self):
        """A ``MutableHeaders`` built from the wrapped response's raw headers."""
        return MutableHeaders(raw=self._response._headers)  # type:ignore

    @property
    def cookies(self):
        """The list stored in ``self._cookies``."""
        return self._cookies  # type:ignore

    @property
    def body(self):
        """The wrapped response's rendered body."""
        return self._response._body  # type:ignore

    @property
    def content_type(self):
        """The wrapped response's ``content_type`` attribute."""
        return self._response.content_type

    @property
    def content_length(self):
        """The ``content-length`` header if set, else the body length as a string."""
        content_length = self.headers.get("content-length")
        if not content_length:
            return str(len(self.body))

        return content_length

    @property
    def status_code(self):
        """The wrapped response's current status code."""
        return self._response.status_code

    def _preserve_headers_and_cookies(self, new_response: BaseResponse) -> BaseResponse:
        """Preserve headers and cookies when switching to a new response."""
        for key, value in self.headers.items():
            new_response.set_header(key, value)

        return new_response

    @staticmethod
    def _resolve_pagination_strategy(
        strategy: Union[str, BasePaginationStrategy], options: Dict[str, Any]
    ) -> BasePaginationStrategy:
        """Turn a strategy name into a strategy instance; pass instances through.

        Raises:
            ValueError: If ``strategy`` is an unknown name.
        """
        if not isinstance(strategy, str):
            return strategy
        factories = {
            "page_number": PageNumberPagination,
            "limit_offset": LimitOffsetPagination,
            "cursor": CursorPagination,
        }
        if strategy not in factories:
            raise ValueError(f"Unknown pagination strategy: {strategy}")
        return factories[strategy](**options)  # type:ignore

    def has_header(self, key: str) -> bool:
        """Check if a header is present in the response."""
        return key.lower() in (k.lower() for k in self.headers.keys())

    def text(
        self,
        content: JSONType,
        status_code: int = 200,
        headers: Optional[Dict[str, Any]] = None,
    ):
        """Send plain text or HTML content."""
        new_response = PlainTextResponse(
            body=content, status_code=status_code, headers=headers
        )
        self._response = self._preserve_headers_and_cookies(new_response)
        return self

    def json(
        self,
        data: Union[str, List[Any], Dict[str, Any]],
        status_code: int = 200,
        headers: Optional[Dict[str, Any]] = None,
        indent: Optional[int] = None,
        ensure_ascii: bool = True,
    ):
        """Send JSON response."""
        new_response = JSONResponse(
            content=data,
            status_code=status_code,
            headers=headers,
            indent=indent,
            ensure_ascii=ensure_ascii,
        )
        self._response = self._preserve_headers_and_cookies(new_response)
        return self

    def download(self, path: str, filename: Optional[str] = None) -> "NexiosResponse":
        """Set a response to force a file download."""
        return self.file(path, filename, content_disposition_type="attachment")

    def set_permanent_cookie(
        self, key: str, value: str, **kwargs: Any
    ) -> "NexiosResponse":
        """Set a permanent cookie with a far-future expiration date."""
        expires = datetime.now(timezone.utc) + timedelta(days=365 * 10)
        self.set_cookie(key, value, expires=expires, **kwargs)  # type:ignore
        return self

    def empty(self, status_code: int = 200, headers: Optional[Dict[str, Any]] = None):
        """Send an empty response."""
        new_response = BaseResponse(status_code=status_code, headers=headers)
        self._response = self._preserve_headers_and_cookies(new_response)
        return self

    def html(
        self,
        content: str,
        status_code: int = 200,
        headers: Optional[Dict[str, Any]] = None,
    ):
        """Send HTML response."""
        new_response = HTMLResponse(
            content=content, status_code=status_code, headers=headers
        )
        self._response = self._preserve_headers_and_cookies(new_response)
        return self

    def file(
        self,
        path: str,
        filename: Optional[str] = None,
        content_disposition_type: str = "inline",
    ):
        """Send file response, keeping the status code currently set."""
        new_response = FileResponse(
            path=path,
            filename=filename,
            # Read the live status so an earlier ``status(...)`` call is
            # honored instead of the value captured in ``__init__``.
            status_code=self._response.status_code,
            headers=self._response.headers,
            content_disposition_type=content_disposition_type,
        )
        self._response = self._preserve_headers_and_cookies(new_response)
        return self

    def stream(
        self,
        iterator: Generator[Union[str, bytes], Any, Any],
        content_type: str = "text/plain",
        status_code: Optional[int] = None,
    ):
        """Send streaming response.

        When ``status_code`` is omitted the status currently set on the
        response is kept.
        """
        new_response = StreamingResponse(
            content=iterator,  # type: ignore
            status_code=status_code or self._response.status_code,
            headers=self._response.headers,
            content_type=content_type,
        )
        self._response = self._preserve_headers_and_cookies(new_response)
        return self

    def redirect(self, url: str, status_code: int = 302):
        """Send redirect response."""
        new_response = RedirectResponse(
            url=url, status_code=status_code, headers=self._response.headers
        )
        self._response = self._preserve_headers_and_cookies(new_response)
        return self

    def status(self, status_code: int):
        """Set response status code."""
        self._response.status_code = status_code
        return self

    def set_header(self, key: str, value: str, overide: bool = False):
        """Set a response header."""
        self._response.set_header(key, value, overide=overide)
        return self

    def set_cookie(
        self,
        key: str,
        value: str,
        max_age: Optional[int] = None,
        expires: Optional[Union[str, datetime, int]] = None,
        path: str = "/",
        domain: Optional[str] = None,
        secure: bool = True,
        httponly: bool = False,
        samesite: typing.Optional[typing.Literal["lax", "strict", "none"]] = "lax",
    ):
        """Set a response cookie."""
        self._response.set_cookie(
            key=key,
            value=value,
            max_age=max_age,
            expires=expires,
            path=path,
            domain=domain,
            secure=secure,
            httponly=httponly,
            samesite=samesite,
        )
        return self

    def delete_cookie(
        self,
        key: str,
        path: str = "/",
        domain: Optional[str] = None,
    ):
        """Delete a response cookie."""
        self._response.delete_cookie(
            key=key,
            path=path,
            domain=domain,
        )

        return self

    def cache(self, max_age: int = 3600, private: bool = True):
        """Enable response caching."""
        self._response.enable_caching(max_age, private)
        return self

    def no_cache(self):
        """Disable response caching."""
        self._response.disable_caching()
        return self

    def resp(
        self,
        body: Union[JSONType, Any] = "",
        status_code: int = 200,
        headers: Optional[Dict[str, str]] = None,
        content_type: str = "text/plain",
    ):
        """
        Provides access to the purest form of the response object.
        """
        new_response = BaseResponse(
            body=body,
            status_code=status_code,
            headers=headers,
            content_type=content_type,
        )
        self._response = self._preserve_headers_and_cookies(new_response)
        return self

    def set_cookies(self, cookies: List[Dict[str, Any]]):
        """Set multiple cookies at once."""
        for cookie in cookies:
            self.set_cookie(**cookie)
        return self

    def set_headers(self, headers: Dict[str, str], overide_all: bool = False):
        """Set multiple headers at once.

        Args:
            headers: Header names and values to apply.
            overide_all: When True, discard every header collected so far and
                keep only ``headers``; otherwise append each one.

        Returns:
            This instance, for chaining.
        """
        if overide_all:
            # Build the replacement from the *argument* and use the same
            # lower-case/latin-1 normalization as ``set_header``.
            self._response._headers = [  # type:ignore
                (str(k).lower().encode("latin-1"), str(v).encode("latin-1"))
                for k, v in headers.items()
            ]
            return self
        for key, value in headers.items():
            self.set_header(key, value)
        return self

    def set_body(self, new_body: Any):
        """Replace the wrapped response's body as-is (no rendering is applied)."""
        self._response._body = new_body  # type:ignore

    def get_response(self) -> BaseResponse:
        """Make the response ASGI-compatible."""
        return self._response

    def add_csp_header(self, policy: str) -> "NexiosResponse":
        """Add a Content Security Policy header."""
        self.set_header("Content-Security-Policy", policy)
        return self

    def make_response(self, response_class: BaseResponse) -> "NexiosResponse":
        """
        Adopt an already-constructed response object.

        Args:
            response_class (BaseResponse): The response *instance* to use
                from now on (despite the parameter name, not a class). The
                headers collected so far are copied onto it.

        Returns:
            NexiosResponse: The current instance for method chaining.
        """
        self._response = self._preserve_headers_and_cookies(response_class)
        return self

    def remove_header(self, key: str):
        """Remove a header from the response."""
        target = key.lower()
        self._response._headers = [  # type:ignore
            (k, v)
            for k, v in self._response._headers  # type:ignore
            if k.decode("latin-1").lower() != target
        ]

    def paginate(
        self,
        items: List[Any],
        total_items: Optional[int] = None,
        strategy: Union[str, BasePaginationStrategy] = "page_number",
        data_handler: type[SyncListDataHandler] = SyncListDataHandler,
        **kwargs: Any,
    ) -> "NexiosResponse":
        """
        Paginate the response data.

        Args:
            items: List of items to paginate
            total_items: Accepted for API compatibility; not used by this method
            strategy: Either a string ('page_number', 'limit_offset', 'cursor') or
                     a custom pagination strategy instance
            data_handler: Class instantiated with ``items`` to feed the paginator
            **kwargs: Additional arguments for the pagination strategy

        Raises:
            ValueError: If ``strategy`` is an unknown name.
        """
        resolved_strategy = self._resolve_pagination_strategy(strategy, kwargs)
        request = self._request

        paginator = SyncPaginator(
            data_handler=data_handler(items),
            pagination_strategy=resolved_strategy,
            base_url=str(request.url),
            request_params=dict(request.query_params),
        )

        return self.json(paginator.paginate())

    async def apaginate(
        self,
        items: List[Any],
        total_items: Optional[int] = None,
        strategy: Union[str, BasePaginationStrategy] = "page_number",
        data_handler: type[AsyncListDataHandler] = AsyncListDataHandler,
        **kwargs: Any,
    ) -> "NexiosResponse":
        """
        Paginate the response data using the asynchronous paginator.

        Args:
            items: List of items to paginate
            total_items: Accepted for API compatibility; not used by this method
            strategy: Either a string ('page_number', 'limit_offset', 'cursor') or
                     a custom pagination strategy instance
            data_handler: Class instantiated with ``items`` to feed the paginator
            **kwargs: Additional arguments for the pagination strategy

        Raises:
            ValueError: If ``strategy`` is an unknown name.
        """
        resolved_strategy = self._resolve_pagination_strategy(strategy, kwargs)
        request = self._request

        paginator = AsyncPaginator(
            # Honor the caller-supplied handler class instead of always
            # constructing ``AsyncListDataHandler``.
            data_handler=data_handler(items),
            pagination_strategy=resolved_strategy,
            base_url=str(request.url),
            request_params=dict(request.query_params),
        )

        return self.json(await paginator.paginate())

    def __str__(self):
        # Report the live status rather than the value captured in __init__.
        return f"Response [{self.status_code} {self.body}]"

999 

1000 

# Public alias: ``Response`` is the same class object as ``BaseResponse``.
Response = BaseResponse