Coverage for nexios\structs.py: 55%
460 statements
« prev ^ index » next coverage.py v7.8.0, created at 2025-05-21 20:31 +0100
« prev ^ index » next coverage.py v7.8.0, created at 2025-05-21 20:31 +0100
1from __future__ import annotations
2import typing
3from urllib.parse import SplitResult, parse_qsl, urlencode, urlsplit
4from nexios._utils.cuncurrency import run_in_threadpool
# Connection scope: a mutable string-keyed mapping. URL.__init__ below reads
# keys such as "scheme", "server", "root_path", "path", "query_string" and
# "headers" from it.
Scope = typing.MutableMapping[str, typing.Any]

# A single message; structurally the same kind of mapping as a scope.
Message = typing.MutableMapping[str, typing.Any]

# Zero-argument callable whose awaited result is a Message.
Receive = typing.Callable[[], typing.Awaitable[Message]]

# Callable taking one Message whose awaited result is None.
Send = typing.Callable[[Message], typing.Awaitable[None]]
class Address(typing.NamedTuple):
    """A ``(host, port)`` pair, accessible by index or by field name."""

    # Host part of the address.
    host: str
    # Numeric port part of the address.
    port: int
# Type variable for mapping keys.
_KeyType = typing.TypeVar("_KeyType")
# Values are declared covariant because a Mapping only lets you read them;
# that is, you can't do `Mapping[str, Animal]()["fido"] = Dog()`.
_CovariantValueType = typing.TypeVar("_CovariantValueType", covariant=True)
class URL:
    """
    A URL string with accessors for the parts produced by ``urlsplit``.

    An instance can be built in exactly one of three ways:

    * from a URL string (``url``),
    * from a connection ``scope`` mapping, or
    * from keyword ``components`` (anything ``replace()`` accepts).

    Instances are treated as immutable: the ``replace*``/``*_query_params``
    helpers return new objects instead of modifying ``self``.
    """

    def __init__(
        self,
        url: str = "",
        scope: typing.Optional[Scope] = None,
        **components: typing.Any,
    ) -> None:
        """
        Args:
            url: The full URL string.
            scope: Mapping providing "path" and "headers", and optionally
                "scheme", "server", "root_path" and "query_string".
            **components: Parts forwarded to ``replace()`` on an empty URL.

        Raises:
            AssertionError: If more than one of the three sources is given.
        """
        if scope is not None:
            assert not url, 'Cannot set both "url" and "scope".'
            assert not components, 'Cannot set both "scope" and "**components".'
            scheme = scope.get("scheme", "http")
            server = scope.get("server", None)
            path = scope.get("root_path", "") + scope["path"]
            query_string = scope.get("query_string", b"")

            # The first "host" header, when present, wins over scope["server"].
            host_header = None
            for key, value in scope["headers"]:
                if key == b"host":
                    host_header = value.decode("latin-1")
                    break

            if host_header is not None:
                url = f"{scheme}://{host_header}{path}"
            elif server is None:
                url = path
            else:
                host, port = server
                # Unknown schemes have no default port, so the port is always
                # written out for them instead of failing with a KeyError.
                default_port = {"http": 80, "https": 443, "ws": 80, "wss": 443}.get(
                    scheme
                )
                if port == default_port:
                    url = f"{scheme}://{host}{path}"
                else:
                    url = f"{scheme}://{host}:{port}{path}"

            if query_string:
                url += "?" + query_string.decode()
        elif components:
            assert not url, 'Cannot set both "url" and "**components".'
            url = URL("").replace(**components).components.geturl()

        self._url = url

    @property
    def components(self) -> SplitResult:
        """The ``urlsplit`` result for this URL, computed once and cached."""
        if not hasattr(self, "_components"):
            self._components = urlsplit(self._url)
        return self._components

    @property
    def scheme(self) -> str:
        return self.components.scheme

    @property
    def netloc(self) -> str:
        return self.components.netloc

    @property
    def path(self) -> str:
        return self.components.path

    @property
    def query(self) -> str:
        return self.components.query

    @property
    def fragment(self) -> str:
        return self.components.fragment

    @property
    def username(self) -> typing.Union[None, str]:
        return self.components.username

    @property
    def password(self) -> typing.Union[None, str]:
        return self.components.password

    @property
    def hostname(self) -> typing.Union[None, str]:
        return self.components.hostname

    @property
    def port(self) -> typing.Optional[int]:
        return self.components.port

    @property
    def is_secure(self) -> bool:
        """True when the scheme is "https" or "wss"."""
        return self.scheme in ("https", "wss")

    @property
    def params(self):
        # Placeholder: always evaluates to None.
        return

    @params.setter
    def params(self, value: str):
        # Placeholder: the assigned value is not stored anywhere.
        return value

    def replace(self, **kwargs: typing.Any) -> "URL":
        """
        Return a new URL with the given parts replaced.

        Accepts the ``SplitResult`` field names (scheme, netloc, path, query,
        fragment) and additionally ``username``, ``password``, ``hostname``
        and ``port``, which are folded into a rebuilt ``netloc``.
        """
        if (
            "username" in kwargs
            or "password" in kwargs
            or "hostname" in kwargs
            or "port" in kwargs
        ):
            hostname = kwargs.pop("hostname", None)
            port = kwargs.pop("port", self.port)
            username = kwargs.pop("username", self.username)
            password = kwargs.pop("password", self.password)

            if hostname is None:
                # Recover the bare host from the current netloc: drop any
                # "user:pass@" prefix, then any ":port" suffix.
                netloc = self.netloc
                _, _, hostname = netloc.rpartition("@")

                # A trailing "]" means a bracketed IPv6 literal without a
                # port. endswith() also copes with an empty netloc, where
                # indexing hostname[-1] would raise IndexError.
                if not hostname.endswith("]"):
                    hostname = hostname.rsplit(":", 1)[0]

            netloc = hostname
            if port is not None:
                netloc += f":{port}"
            if username is not None:
                userpass = username
                if password is not None:
                    userpass += f":{password}"
                netloc = f"{userpass}@{netloc}"

            kwargs["netloc"] = netloc

        components = self.components._replace(**kwargs)
        return self.__class__(components.geturl())

    def include_query_params(self, **kwargs: typing.Any) -> "URL":
        """Return a new URL with ``kwargs`` set in the query string (existing keys are overwritten)."""
        params = MultiDict(parse_qsl(self.query, keep_blank_values=True))
        params.update({str(key): str(value) for key, value in kwargs.items()})
        query = urlencode(params.multi_items())
        return self.replace(query=query)

    def replace_query_params(self, **kwargs: typing.Any) -> "URL":
        """Return a new URL whose query string consists only of ``kwargs``."""
        query = urlencode([(str(key), str(value)) for key, value in kwargs.items()])
        return self.replace(query=query)

    def remove_query_params(
        self, keys: typing.Union[str, typing.Sequence[str]]
    ) -> "URL":
        """Return a new URL with the given query key(s) removed."""
        if isinstance(keys, str):
            keys = [keys]
        params = MultiDict(parse_qsl(self.query, keep_blank_values=True))
        for key in keys:
            params.pop(key, None)
        query = urlencode(params.multi_items())
        return self.replace(query=query)

    def __eq__(self, other: typing.Any) -> bool:
        # Comparison is by string form, so a URL also equals a matching str.
        return str(self) == str(other)

    def __str__(self) -> str:
        return self._url

    def __repr__(self) -> str:
        url = str(self)
        if self.password:
            # Never show the real password in the repr.
            url = str(self.replace(password="********"))
        return f"{self.__class__.__name__}({repr(url)})"
class URLPath(str):
    """
    A path string that can additionally carry a protocol and a host.

    The string value is the path itself; ``protocol`` and ``host`` are plain
    attributes consulted by ``make_absolute_url``.
    """

    def __new__(cls, path: str, protocol: str = "", host: str = "") -> "URLPath":
        # Only these protocol names (or none at all) are accepted.
        assert protocol in ("http", "websocket", "")
        return str.__new__(cls, path)

    def __init__(self, path: str, protocol: str = "", host: str = "") -> None:
        self.protocol = protocol
        self.host = host

    def make_absolute_url(self, base_url: typing.Union[str, URL]) -> URL:
        """Combine this path with ``base_url`` and return the resulting URL."""
        base = URL(base_url) if isinstance(base_url, str) else base_url

        if not self.protocol:
            chosen_scheme = base.scheme
        else:
            # (insecure, secure) scheme per protocol, indexed by is_secure.
            scheme_pairs = {
                "http": ("http", "https"),
                "websocket": ("ws", "wss"),
            }
            chosen_scheme = scheme_pairs[self.protocol][base.is_secure]

        joined_path = base.path.rstrip("/") + str(self)
        return URL(
            scheme=chosen_scheme,
            netloc=self.host or base.netloc,
            path=joined_path,
        )
class Secret:
    """
    Wraps a string so that its ``repr`` never shows the real value.

    Convert to ``str`` explicitly at the point where the value is needed.
    """

    def __init__(self, value: str):
        self._value = value

    def __repr__(self) -> str:
        # The real value is deliberately replaced by a fixed mask.
        return "{}('**********')".format(type(self).__name__)

    def __str__(self) -> str:
        return self._value

    def __bool__(self) -> bool:
        # Truthiness follows the wrapped value.
        return bool(self._value)
class ImmutableMultiDict(typing.Mapping[_KeyType, _CovariantValueType]):
    """
    Read-only mapping that remembers every ``(key, value)`` pair it was given.

    ``_list`` keeps all pairs in order; ``_dict`` keeps the last value seen
    for each key and backs the regular mapping interface.
    """

    _dict: typing.Dict[_KeyType, _CovariantValueType]

    def __init__(
        self,
        *args: typing.Union[
            "ImmutableMultiDict[_KeyType, _CovariantValueType]",
            typing.Mapping[_KeyType, _CovariantValueType],
            typing.Iterable[typing.Tuple[_KeyType, _CovariantValueType]],
        ],
        **kwargs: typing.Any,
    ) -> None:
        """
        Accept at most one positional source (multidict, mapping, or iterable
        of pairs); keyword arguments are appended after it as extra pairs.
        """
        assert len(args) < 2, "Too many arguments."

        source: typing.Any = args[0] if args else []
        if kwargs:
            # Normalise both halves to pair lists and concatenate them.
            source = (
                ImmutableMultiDict(source).multi_items()
                + ImmutableMultiDict(kwargs).multi_items()  # type: ignore[operator]
            )

        pairs: typing.List[typing.Tuple[typing.Any, typing.Any]]
        if not source:
            pairs = []
        elif hasattr(source, "multi_items"):
            pairs = list(source.multi_items())
        elif hasattr(source, "items"):
            pairs = list(source.items())
        else:
            pairs = list(source)

        self._dict = {pair_key: pair_value for pair_key, pair_value in pairs}
        self._list = pairs

    def getlist(self, key: typing.Any) -> typing.List[_CovariantValueType]:
        """Return every value stored under ``key``, in insertion order."""
        matches = []
        for stored_key, stored_value in self._list:
            if stored_key == key:
                matches.append(stored_value)
        return matches

    def keys(self) -> typing.KeysView[_KeyType]:
        return self._dict.keys()

    def values(self) -> typing.ValuesView[_CovariantValueType]:
        return self._dict.values()

    def items(self) -> typing.ItemsView[_KeyType, _CovariantValueType]:
        return self._dict.items()

    def multi_items(self) -> typing.List[typing.Tuple[_KeyType, _CovariantValueType]]:
        """Return a copy of all ``(key, value)`` pairs, duplicates included."""
        return list(self._list)

    def __getitem__(self, key: _KeyType) -> _CovariantValueType:
        return self._dict[key]

    def __contains__(self, key: typing.Any) -> bool:
        return key in self._dict

    def __iter__(self) -> typing.Iterator[_KeyType]:
        return iter(self.keys())

    def __len__(self) -> int:
        return len(self._dict)

    def __eq__(self, other: typing.Any) -> bool:
        # Equal only to instances of the same class (or a subclass) holding
        # the same pairs, regardless of order.
        if isinstance(other, self.__class__):
            return sorted(self._list) == sorted(other._list)
        return False

    def __repr__(self) -> str:
        return f"{self.__class__.__name__}({self.multi_items()!r})"
class MultiDict(ImmutableMultiDict[typing.Any, typing.Any]):
    """Mutable variant: every mutator keeps ``_list`` and ``_dict`` in step."""

    def __setitem__(self, key: typing.Any, value: typing.Any) -> None:
        self.setlist(key, [value])

    def __delitem__(self, key: typing.Any) -> None:
        self._list = [pair for pair in self._list if pair[0] != key]
        del self._dict[key]

    def pop(self, key: typing.Any, default: typing.Any = None) -> typing.Any:
        """Remove all pairs for ``key``; return its dict value or ``default``."""
        self._list = [pair for pair in self._list if pair[0] != key]
        return self._dict.pop(key, default)

    def popitem(self) -> typing.Tuple[typing.Any, typing.Any]:
        popped_key, popped_value = self._dict.popitem()
        self._list = [pair for pair in self._list if pair[0] != popped_key]
        return popped_key, popped_value

    def poplist(self, key: typing.Any) -> typing.List[typing.Any]:
        """Remove ``key`` and return every value that was stored under it."""
        removed = [v for k, v in self._list if k == key]
        self.pop(key)
        return removed

    def clear(self) -> None:
        self._dict.clear()
        self._list.clear()

    def setdefault(self, key: typing.Any, default: typing.Any = None) -> typing.Any:
        if key not in self:
            self._dict[key] = default
            self._list.append((key, default))
        return self[key]

    def setlist(self, key: typing.Any, values: typing.List[typing.Any]) -> None:
        """Replace all values for ``key``; an empty list removes the key."""
        if not values:
            self.pop(key, None)
            return
        kept = [(k, v) for (k, v) in self._list if k != key]
        self._list = kept + [(key, item) for item in values]
        # The plain-dict view exposes the last value.
        self._dict[key] = values[-1]

    def append(self, key: typing.Any, value: typing.Any) -> None:
        self._list.append((key, value))
        self._dict[key] = value

    def update(
        self,
        *args: typing.Union[
            "MultiDict",
            typing.Mapping[str, typing.Any],
            typing.List[typing.Tuple[typing.Any, typing.Any]],
        ],
        **kwargs: typing.Any,
    ) -> None:
        """Merge in new pairs; keys present in the update replace existing ones."""
        incoming = MultiDict(*args, **kwargs)
        kept = [(k, v) for (k, v) in self._list if k not in incoming.keys()]
        self._list = kept + incoming.multi_items()
        self._dict.update(incoming)
class QueryParams(ImmutableMultiDict[str, str]):
    """
    An immutable multidict of query parameters.

    Besides the sources the base class accepts, a raw query string may be
    given as ``str`` or ``bytes``. All keys and values are coerced to ``str``.
    """

    def __init__(
        self,
        *args: typing.Union[
            "ImmutableMultiDict[str,typing.Any]",
            typing.Mapping[str, typing.Any],
            typing.List[typing.Tuple[typing.Any, typing.Any]],
            str,
            bytes,
        ],
        **kwargs: typing.Any,
    ) -> None:
        assert len(args) < 2, "Too many arguments."

        source = args[0] if args else []

        if isinstance(source, (str, bytes)):
            # Raw query strings are parsed first; bytes are decoded as latin-1.
            text = source if isinstance(source, str) else source.decode("latin-1")
            super().__init__(parse_qsl(text, keep_blank_values=True), **kwargs)
        else:
            super().__init__(*args, **kwargs)  # type: ignore[arg-type]

        self._list = [(str(name), str(val)) for name, val in self._list]
        self._dict = {str(name): str(val) for name, val in self._dict.items()}

    def __str__(self) -> str:
        return urlencode(self._list)

    def __repr__(self) -> str:
        return f"{self.__class__.__name__}({str(self)!r})"

    def __call__(self, *args: Any, **kwds: Any) -> Dict[str, Any]:
        # Calling the instance hands back the underlying plain dict.
        return self._dict
class Headers(typing.Mapping[str, str]):
    """
    An immutable, case-insensitive multidict of headers.

    Headers are stored internally as a list of ``(bytes, bytes)`` pairs;
    names and values are encoded and decoded as latin-1. Lookups lower-case
    the requested name before comparing.
    """

    def __init__(
        self,
        headers: typing.Optional[typing.Mapping[str, str]] = None,
        raw: typing.Optional[typing.List[typing.Tuple[bytes, bytes]]] = None,
        scope: typing.Optional[typing.MutableMapping[str, typing.Any]] = None,
    ) -> None:
        """
        Args:
            headers: A mapping of ``str`` names to ``str`` values, or an
                iterable of pairs whose items are ``str`` or ``bytes``.
            raw: A ready-made list of ``(bytes, bytes)`` pairs, used as is.
            scope: A mapping whose "headers" entry is an iterable of pairs.

        Raises:
            AssertionError: If more than one of the three sources is given.
        """
        self._list: typing.List[typing.Tuple[bytes, bytes]] = []
        if headers is not None:
            assert raw is None, 'Cannot set both "headers" and "raw".'
            assert scope is None, 'Cannot set both "headers" and "scope".'
            if isinstance(headers, typing.Mapping):  # type: ignore
                self._list = [
                    (key.lower().encode("latin-1"), value.encode("latin-1"))
                    for key, value in headers.items()
                ]
            else:
                # Assume it's a list of (bytes, bytes) tuples or something convertible
                self._list = [
                    (
                        (
                            k.lower()
                            if isinstance(k, bytes)
                            else k.lower().encode("latin-1")
                        ),
                        v if isinstance(v, bytes) else v.encode("latin-1"),
                    )
                    for k, v in headers
                ]
        elif raw is not None:
            assert scope is None, 'Cannot set both "raw" and "scope".'
            self._list = raw
        elif scope is not None:
            # scope["headers"] isn't necessarily a list
            # it might be a tuple or other iterable
            self._list = list(scope["headers"])

    @property
    def raw(self) -> typing.List[typing.Tuple[bytes, bytes]]:
        """A copy of the underlying ``(bytes, bytes)`` pair list."""
        return list(self._list)

    def keys(self) -> typing.List[str]:  # type: ignore[override]
        return [key.decode("latin-1") for key, _ in self._list]

    def values(self) -> typing.List[str]:  # type: ignore[override]
        return [value.decode("latin-1") for _, value in self._list]

    def items(self) -> typing.List[typing.Tuple[str, str]]:  # type: ignore
        return [
            (key.decode("latin-1"), value.decode("latin-1"))
            for key, value in self._list
        ]

    def get(self, key: str, default: typing.Any = None) -> typing.Any:
        """
        Return the first value for ``key``, or ``default`` when it is absent.

        ``__getitem__`` reports a missing header by returning None rather
        than raising KeyError, so the ``get`` inherited from ``Mapping``
        would never fall back to ``default``; this override does.
        """
        try:
            found = self[key]
        except KeyError:
            # Tolerate subclasses whose __getitem__ raises instead.
            return default
        return default if found is None else found

    def getlist(self, key: str) -> typing.List[str]:
        """Return every value stored for ``key`` (case-insensitive)."""
        get_header_key = key.lower().encode("latin-1")
        return [
            item_value.decode("latin-1")
            for item_key, item_value in self._list
            if item_key == get_header_key
        ]

    def mutablecopy(self) -> "MutableHeaders":
        """Return a ``MutableHeaders`` built from a copy of the pair list."""
        return MutableHeaders(raw=self._list[:])

    def __getitem__(self, key: str):  # type: ignore[override]
        # Returns the first match, or None (not KeyError) when absent.
        get_header_key = key.lower().encode("latin-1")
        for header_key, header_value in self._list:
            if header_key == get_header_key:
                return header_value.decode("latin-1")
        return None

    def __contains__(self, key: typing.Any) -> bool:
        get_header_key = key.lower().encode("latin-1")
        return any(header_key == get_header_key for header_key, _ in self._list)

    def __iter__(self) -> typing.Iterator[typing.Any]:
        return iter(self.keys())

    def __len__(self) -> int:
        # Counts every pair, so repeated header names count more than once.
        return len(self._list)

    def __eq__(self, other: typing.Any) -> bool:
        if not isinstance(other, Headers):
            return False
        return sorted(self._list) == sorted(other._list)

    def __repr__(self) -> str:
        class_name = self.__class__.__name__
        as_dict = dict(self.items())
        # The dict form is only faithful when no header name repeats.
        if len(as_dict) == len(self):
            return f"{class_name}({as_dict!r})"
        return f"{class_name}(raw={self.raw!r})"
class MutableHeaders(Headers):
    """Headers variant whose pair list can be modified in place."""

    def __setitem__(self, key: str, value: str) -> None:
        """
        Set the header `key` to `value`, removing any duplicate entries.
        Retains insertion order.
        """
        set_key = key.lower().encode("latin-1")
        set_value = value.encode("latin-1")

        positions: "typing.List[int]" = [
            pos for pos, (name, _) in enumerate(self._list) if name == set_key
        ]

        # Drop every occurrence after the first, back to front so the
        # remaining indexes stay valid.
        for pos in reversed(positions[1:]):
            del self._list[pos]

        if not positions:
            self._list.append((set_key, set_value))
        else:
            self._list[positions[0]] = (set_key, set_value)

    def __delitem__(self, key: str) -> None:
        """
        Remove the header `key`.
        """
        del_key = key.lower().encode("latin-1")

        positions: "typing.List[int]" = [
            pos for pos, (name, _) in enumerate(self._list) if name == del_key
        ]

        for pos in reversed(positions):
            del self._list[pos]

    def __ior__(self, other: typing.Mapping[str, str]) -> "MutableHeaders":
        if isinstance(other, typing.Mapping):  # type: ignore
            self.update(other)
            return self
        raise TypeError(f"Expected a mapping but got {other.__class__.__name__}")

    def __or__(self, other: typing.Mapping[str, str]) -> "MutableHeaders":
        if isinstance(other, typing.Mapping):  # type: ignore
            merged = self.mutablecopy()
            merged.update(other)
            return merged
        raise TypeError(f"Expected a mapping but got {other.__class__.__name__}")

    @property
    def raw(self) -> typing.List[typing.Tuple[bytes, bytes]]:
        # Unlike the base class, this returns the live list, not a copy.
        return self._list

    def setdefault(self, key: str, value: str) -> str:
        """
        If the header `key` does not exist, then set it to `value`.
        Returns the header value.
        """
        set_key = key.lower().encode("latin-1")
        set_value = value.encode("latin-1")

        for name, current in self._list:
            if name == set_key:
                return current.decode("latin-1")
        self._list.append((set_key, set_value))
        return value

    def update(self, other: typing.Mapping[str, str]) -> None:
        for name, val in other.items():
            self[name] = val

    def append(self, key: str, value: str) -> None:
        """
        Append a header, preserving any duplicate entries.
        """
        self._list.append((key.lower().encode("latin-1"), value.encode("latin-1")))

    def add_vary_header(self, vary: str) -> None:
        """Add ``vary`` to the "vary" header, comma-joining with any existing value."""
        existing = self.get("vary")
        if existing is not None:
            vary = ", ".join([existing, vary])
        self["vary"] = vary
class State:
    """
    An object that can be used to store arbitrary state.

    Used for `request.state` and `app.state`.

    Attribute writes, reads and deletes are redirected to an internal
    dictionary; reading a name that was never set yields None.
    """

    _state: typing.Dict[str, typing.Any]

    def __init__(self, state: typing.Optional[typing.Dict[str, typing.Any]] = None):
        backing = {} if state is None else state
        # Go around our own __setattr__, which would otherwise try to write
        # into a ``_state`` dict that does not exist yet.
        super().__setattr__("_state", backing)

    def __setattr__(self, key: typing.Any, value: typing.Any) -> None:
        self._state[key] = value

    def __getattr__(self, key: typing.Any) -> typing.Any:
        try:
            found = self._state[key]
        except KeyError:
            found = None
        return found

    def __delattr__(self, key: typing.Any) -> None:
        del self._state[key]
632from typing import Any, Dict, Iterator, ItemsView, KeysView, Sequence, ValuesView
class RouteParam:
    """
    Thin wrapper around a dict, readable by key, by attribute, or by call.

    Attribute lookup consults the dict first, so a key with the same name as
    a method or attribute takes precedence over it.
    """

    def __init__(self, data: Dict[str, Any]) -> None:
        """Store the given dictionary as ``data``."""
        self.data: Dict[str, Any] = data

    def __iter__(self) -> Iterator[str]:
        """Iterate over the keys."""
        return iter(self.data)

    def __getitem__(self, name: str) -> Any:
        """Look up ``name``; a missing key gives None instead of raising."""
        return self.data.get(name)

    def __getattribute__(self, name: str) -> Any:
        """
        Resolve ``name`` against the stored dict before anything else.

        Falls back to normal attribute resolution when the dict has no such
        key.
        """
        # Use object's implementation to avoid recursing into this method.
        stored = object.__getattribute__(self, "data")
        if name not in stored:
            return object.__getattribute__(self, name)
        return stored[name]

    def get_lists(self) -> ItemsView[str, Any]:
        """Return the key/value pairs view (same result as ``items``)."""
        return self.data.items()

    def keys(self) -> KeysView[str]:
        """Return the keys view."""
        return self.data.keys()

    def values(self) -> ValuesView[Any]:
        """Return the values view."""
        return self.data.values()

    def items(self) -> ItemsView[str, Any]:
        """Return the key/value pairs view."""
        return self.data.items()

    def __repr__(self) -> str:
        """Return a debug representation showing the stored pairs."""
        return f"<RouteParams {dict(self.data)}>"

    def __len__(self) -> int:
        """Return how many keys are stored."""
        return len(self.data)

    def __call__(self, *args: Any, **kwds: Any) -> Dict[str, Any]:
        # Arguments are accepted but ignored; the stored dict is returned.
        return self.data

    def get(self, key: str, default: Any = None) -> Any:
        """Return the value for ``key``, or ``default`` when it is absent."""
        return self.data.get(key, default)

    def __dict__(self) -> Dict[str, Any]:  # type:ignore
        # Defined as a method, so it must be called to obtain the dict.
        return self.data
class UploadedFile:
    """
    An uploaded file included as part of the request data.

    Wraps a binary file object. File operations run inline when the file
    reports ``_rolled`` as false, and through ``run_in_threadpool`` otherwise.
    """

    def __init__(
        self,
        file: typing.BinaryIO,
        *,
        size: typing.Optional[int] = None,
        filename: typing.Optional[str] = None,
        headers: typing.Optional[Headers] = None,
    ) -> None:
        self.filename = filename
        self.file = file
        self.size = size
        # A missing (or falsy) headers argument is replaced by empty Headers.
        self.headers = headers or Headers()

    @property
    def content_type(self) -> typing.Union[str, None]:
        """The "content-type" header value, or None."""
        return self.headers.get("content-type", None)

    @property
    def _in_memory(self) -> bool:
        # check for SpooledTemporaryFile._rolled; objects without that
        # attribute are treated as already rolled to disk.
        return not getattr(self.file, "_rolled", True)

    async def write(self, data: bytes) -> None:
        """Write ``data`` to the file, growing ``size`` when it is tracked."""
        if self.size is not None:
            self.size += len(data)

        if not self._in_memory:
            await run_in_threadpool(self.file.write, data)
        else:
            self.file.write(data)

    async def read(self, size: int = -1) -> bytes:
        """Read up to ``size`` bytes from the file."""
        if not self._in_memory:
            return await run_in_threadpool(self.file.read, size)
        return self.file.read(size)

    async def seek(self, offset: int) -> None:
        """Move the file position to ``offset``."""
        if not self._in_memory:
            await run_in_threadpool(self.file.seek, offset)
        else:
            self.file.seek(offset)

    async def close(self) -> None:
        """Close the underlying file."""
        if not self._in_memory:
            await run_in_threadpool(self.file.close)
        else:
            self.file.close()

    def __repr__(self) -> str:
        cls_name = self.__class__.__name__
        return (
            f"{cls_name}(filename={self.filename!r}, "
            f"size={self.size!r}, headers={self.headers!r})"
        )
class FormData(
    MultiDict[str, typing.Union[UploadedFile, str, Sequence[Any]]]  # type:ignore
):  # type:ignore
    """A multidict of form fields whose values may be strings or uploaded files."""

    def __init__(
        self,
        *args: typing.Union[
            FormData,
            typing.Mapping[str, typing.Union[str, UploadedFile]],
            list[tuple[str, typing.Union[str, UploadedFile]]],
        ],
        **kwargs: typing.Union[str, UploadedFile],
    ) -> None:
        super().__init__(*args, **kwargs)

    async def close(self) -> None:
        """Close every ``UploadedFile`` held in this form, duplicates included."""
        for _name, item in self.multi_items():
            if not isinstance(item, UploadedFile):
                continue
            await item.close()

    def get(
        self, key: str, default: typing.Any = None
    ) -> typing.Union[UploadedFile, str, None]:
        """
        Get a value from the form data by key.

        Args:
            key: The key to look up
            default: Value to return if key is not found

        Returns:
            The value if found, or the default
        """
        try:
            found = self[key]
        except KeyError:
            return default
        else:
            return found