Coverage for nexios\structs.py: 55%

460 statements  

« prev     ^ index     » next       coverage.py v7.8.0, created at 2025-05-21 20:31 +0100

1from __future__ import annotations 

2import typing 

3from urllib.parse import SplitResult, parse_qsl, urlencode, urlsplit 

4from nexios._utils.cuncurrency import run_in_threadpool 

5 

6Scope = typing.MutableMapping[str, typing.Any] 

7Message = typing.MutableMapping[str, typing.Any] 

8 

9Receive = typing.Callable[[], typing.Awaitable[Message]] 

10Send = typing.Callable[[Message], typing.Awaitable[None]] 

11 

12 

13class Address(typing.NamedTuple): 

14 host: str 

15 port: int 

16 

17 

18_KeyType = typing.TypeVar("_KeyType") 

19# you can only read them 

20# that is, you can't do `Mapping[str, Animal]()["fido"] = Dog()` 

21_CovariantValueType = typing.TypeVar("_CovariantValueType", covariant=True) 

22 

23 

24class URL: 

25 def __init__( 

26 self, 

27 url: str = "", 

28 scope: typing.Optional[Scope] = None, 

29 **components: typing.Any, 

30 ) -> None: 

31 if scope is not None: 

32 assert not url, 'Cannot set both "url" and "scope".' 

33 assert not components, 'Cannot set both "scope" and "**components".' 

34 scheme = scope.get("scheme", "http") 

35 server = scope.get("server", None) 

36 path = scope.get("root_path", "") + scope["path"] 

37 query_string = scope.get("query_string", b"") 

38 

39 host_header = None 

40 for key, value in scope["headers"]: 

41 if key == b"host": 

42 host_header = value.decode("latin-1") 

43 break 

44 

45 if host_header is not None: 

46 url = f"{scheme}://{host_header}{path}" 

47 elif server is None: 

48 url = path 

49 else: 

50 host, port = server 

51 default_port = {"http": 80, "https": 443, "ws": 80, "wss": 443}[scheme] 

52 if port == default_port: 

53 url = f"{scheme}://{host}{path}" 

54 else: 

55 url = f"{scheme}://{host}:{port}{path}" 

56 

57 if query_string: 

58 url += "?" + query_string.decode() 

59 elif components: 

60 assert not url, 'Cannot set both "url" and "**components".' 

61 url = URL("").replace(**components).components.geturl() 

62 

63 self._url = url 

64 

65 @property 

66 def components(self) -> SplitResult: 

67 if not hasattr(self, "_components"): 

68 self._components = urlsplit(self._url) 

69 return self._components 

70 

71 @property 

72 def scheme(self) -> str: 

73 return self.components.scheme 

74 

75 @property 

76 def netloc(self) -> str: 

77 return self.components.netloc 

78 

79 @property 

80 def path(self) -> str: 

81 return self.components.path 

82 

83 @property 

84 def query(self) -> str: 

85 return self.components.query 

86 

87 @property 

88 def fragment(self) -> str: 

89 return self.components.fragment 

90 

91 @property 

92 def username(self) -> typing.Union[None, str]: 

93 return self.components.username 

94 

95 @property 

96 def password(self) -> typing.Union[None, str]: 

97 return self.components.password 

98 

99 @property 

100 def hostname(self) -> typing.Union[None, str]: 

101 return self.components.hostname 

102 

103 @property 

104 def port(self) -> typing.Optional[int]: 

105 return self.components.port 

106 

107 @property 

108 def is_secure(self) -> bool: 

109 return self.scheme in ("https", "wss") 

110 

111 @property 

112 def params(self): 

113 return 

114 

115 @params.setter 

116 def params(self, value: str): 

117 return value 

118 

119 def replace(self, **kwargs: typing.Any) -> "URL": 

120 if ( 

121 "username" in kwargs 

122 or "password" in kwargs 

123 or "hostname" in kwargs 

124 or "port" in kwargs 

125 ): 

126 hostname = kwargs.pop("hostname", None) 

127 port = kwargs.pop("port", self.port) 

128 username = kwargs.pop("username", self.username) 

129 password = kwargs.pop("password", self.password) 

130 

131 if hostname is None: 

132 netloc = self.netloc 

133 _, _, hostname = netloc.rpartition("@") 

134 

135 if hostname[-1] != "]": 

136 hostname = hostname.rsplit(":", 1)[0] 

137 

138 netloc = hostname 

139 if port is not None: 

140 netloc += f":{port}" 

141 if username is not None: 

142 userpass = username 

143 if password is not None: 

144 userpass += f":{password}" 

145 netloc = f"{userpass}@{netloc}" 

146 

147 kwargs["netloc"] = netloc 

148 

149 components = self.components._replace(**kwargs) 

150 return self.__class__(components.geturl()) 

151 

152 def include_query_params(self, **kwargs: typing.Any) -> "URL": 

153 params = MultiDict(parse_qsl(self.query, keep_blank_values=True)) 

154 params.update({str(key): str(value) for key, value in kwargs.items()}) 

155 query = urlencode(params.multi_items()) 

156 return self.replace(query=query) 

157 

158 def replace_query_params(self, **kwargs: typing.Any) -> "URL": 

159 query = urlencode([(str(key), str(value)) for key, value in kwargs.items()]) 

160 return self.replace(query=query) 

161 

162 def remove_query_params( 

163 self, keys: typing.Union[str, typing.Sequence[str]] 

164 ) -> "URL": 

165 if isinstance(keys, str): 

166 keys = [keys] 

167 params = MultiDict(parse_qsl(self.query, keep_blank_values=True)) 

168 for key in keys: 

169 params.pop(key, None) 

170 query = urlencode(params.multi_items()) 

171 return self.replace(query=query) 

172 

173 def __eq__(self, other: typing.Any) -> bool: 

174 return str(self) == str(other) 

175 

176 def __str__(self) -> str: 

177 return self._url 

178 

179 def __repr__(self) -> str: 

180 url = str(self) 

181 if self.password: 

182 url = str(self.replace(password="********")) 

183 return f"{self.__class__.__name__}({repr(url)})" 

184 

185 

186class URLPath(str): 

187 """ 

188 A URL path string that may also hold an associated protocol and/or host. 

189 Used by the routing to return `url_path_for` matches. 

190 """ 

191 

192 def __new__(cls, path: str, protocol: str = "", host: str = "") -> "URLPath": 

193 assert protocol in ("http", "websocket", "") 

194 return str.__new__(cls, path) 

195 

196 def __init__(self, path: str, protocol: str = "", host: str = "") -> None: 

197 self.protocol = protocol 

198 self.host = host 

199 

200 def make_absolute_url(self, base_url: typing.Union[str, URL]) -> URL: 

201 if isinstance(base_url, str): 

202 base_url = URL(base_url) 

203 if self.protocol: 

204 scheme = { 

205 "http": {True: "https", False: "http"}, 

206 "websocket": {True: "wss", False: "ws"}, 

207 }[self.protocol][base_url.is_secure] 

208 else: 

209 scheme = base_url.scheme 

210 

211 netloc = self.host or base_url.netloc 

212 path = base_url.path.rstrip("/") + str(self) 

213 return URL(scheme=scheme, netloc=netloc, path=path) 

214 

215 

216class Secret: 

217 """ 

218 Holds a string value that should not be revealed in tracebacks etc. 

219 You should cast the value to `str` at the point it is required. 

220 """ 

221 

222 def __init__(self, value: str): 

223 self._value = value 

224 

225 def __repr__(self) -> str: 

226 class_name = self.__class__.__name__ 

227 return f"{class_name}('**********')" 

228 

229 def __str__(self) -> str: 

230 return self._value 

231 

232 def __bool__(self) -> bool: 

233 return bool(self._value) 

234 

235 

236class ImmutableMultiDict(typing.Mapping[_KeyType, _CovariantValueType]): 

237 _dict: typing.Dict[_KeyType, _CovariantValueType] 

238 

239 def __init__( 

240 self, 

241 *args: typing.Union[ 

242 "ImmutableMultiDict[_KeyType, _CovariantValueType]", 

243 typing.Mapping[_KeyType, _CovariantValueType], 

244 typing.Iterable[typing.Tuple[_KeyType, _CovariantValueType]], 

245 ], 

246 **kwargs: typing.Any, 

247 ) -> None: 

248 assert len(args) < 2, "Too many arguments." 

249 

250 value: typing.Any = args[0] if args else [] 

251 if kwargs: 

252 value = ( 

253 ImmutableMultiDict(value).multi_items() 

254 + ImmutableMultiDict(kwargs).multi_items() # type: ignore[operator] 

255 ) 

256 

257 if not value: 

258 _items: typing.List[typing.Tuple[typing.Any, typing.Any]] = [] 

259 elif hasattr(value, "multi_items"): 

260 value = typing.cast( 

261 ImmutableMultiDict[_KeyType, _CovariantValueType], value 

262 ) 

263 _items = list(value.multi_items()) 

264 elif hasattr(value, "items"): 

265 value = typing.cast(typing.Mapping[_KeyType, _CovariantValueType], value) 

266 _items = list(value.items()) 

267 else: 

268 value = typing.cast( 

269 typing.List[typing.Tuple[typing.Any, typing.Any]], value 

270 ) 

271 _items = list(value) 

272 

273 self._dict = {k: v for k, v in _items} 

274 self._list = _items 

275 

276 def getlist(self, key: typing.Any) -> typing.List[_CovariantValueType]: 

277 return [item_value for item_key, item_value in self._list if item_key == key] 

278 

279 def keys(self) -> typing.KeysView[_KeyType]: 

280 return self._dict.keys() 

281 

282 def values(self) -> typing.ValuesView[_CovariantValueType]: 

283 return self._dict.values() 

284 

285 def items(self) -> typing.ItemsView[_KeyType, _CovariantValueType]: 

286 return self._dict.items() 

287 

288 def multi_items(self) -> typing.List[typing.Tuple[_KeyType, _CovariantValueType]]: 

289 return list(self._list) 

290 

291 def __getitem__(self, key: _KeyType) -> _CovariantValueType: 

292 return self._dict[key] 

293 

294 def __contains__(self, key: typing.Any) -> bool: 

295 return key in self._dict 

296 

297 def __iter__(self) -> typing.Iterator[_KeyType]: 

298 return iter(self.keys()) 

299 

300 def __len__(self) -> int: 

301 return len(self._dict) 

302 

303 def __eq__(self, other: typing.Any) -> bool: 

304 if not isinstance(other, self.__class__): 

305 return False 

306 return sorted(self._list) == sorted(other._list) 

307 

308 def __repr__(self) -> str: 

309 class_name = self.__class__.__name__ 

310 items = self.multi_items() 

311 return f"{class_name}({items!r})" 

312 

313 

314class MultiDict(ImmutableMultiDict[typing.Any, typing.Any]): 

315 def __setitem__(self, key: typing.Any, value: typing.Any) -> None: 

316 self.setlist(key, [value]) 

317 

318 def __delitem__(self, key: typing.Any) -> None: 

319 self._list = [(k, v) for k, v in self._list if k != key] 

320 del self._dict[key] 

321 

322 def pop(self, key: typing.Any, default: typing.Any = None) -> typing.Any: 

323 self._list = [(k, v) for k, v in self._list if k != key] 

324 return self._dict.pop(key, default) 

325 

326 def popitem(self) -> typing.Tuple[typing.Any, typing.Any]: 

327 key, value = self._dict.popitem() 

328 self._list = [(k, v) for k, v in self._list if k != key] 

329 return key, value 

330 

331 def poplist(self, key: typing.Any) -> typing.List[typing.Any]: 

332 values = [v for k, v in self._list if k == key] 

333 self.pop(key) 

334 return values 

335 

336 def clear(self) -> None: 

337 self._dict.clear() 

338 self._list.clear() 

339 

340 def setdefault(self, key: typing.Any, default: typing.Any = None) -> typing.Any: 

341 if key not in self: 

342 self._dict[key] = default 

343 self._list.append((key, default)) 

344 

345 return self[key] 

346 

347 def setlist(self, key: typing.Any, values: typing.List[typing.Any]) -> None: 

348 if not values: 

349 self.pop(key, None) 

350 else: 

351 existing_items = [(k, v) for (k, v) in self._list if k != key] 

352 self._list = existing_items + [(key, value) for value in values] 

353 self._dict[key] = values[-1] 

354 

355 def append(self, key: typing.Any, value: typing.Any) -> None: 

356 self._list.append((key, value)) 

357 self._dict[key] = value 

358 

359 def update( 

360 self, 

361 *args: typing.Union[ 

362 "MultiDict", 

363 typing.Mapping[str, typing.Any], 

364 typing.List[typing.Tuple[typing.Any, typing.Any]], 

365 ], 

366 **kwargs: typing.Any, 

367 ) -> None: 

368 value = MultiDict(*args, **kwargs) 

369 existing_items = [(k, v) for (k, v) in self._list if k not in value.keys()] 

370 self._list = existing_items + value.multi_items() 

371 self._dict.update(value) 

372 

373 

374class QueryParams(ImmutableMultiDict[str, str]): 

375 """ 

376 An immutable multidict. 

377 """ 

378 

379 def __init__( 

380 self, 

381 *args: typing.Union[ 

382 "ImmutableMultiDict[str,typing.Any]", 

383 typing.Mapping[str, typing.Any], 

384 typing.List[typing.Tuple[typing.Any, typing.Any]], 

385 str, 

386 bytes, 

387 ], 

388 **kwargs: typing.Any, 

389 ) -> None: 

390 assert len(args) < 2, "Too many arguments." 

391 

392 value = args[0] if args else [] 

393 

394 if isinstance(value, str): 

395 super().__init__(parse_qsl(value, keep_blank_values=True), **kwargs) 

396 elif isinstance(value, bytes): 

397 super().__init__( 

398 parse_qsl(value.decode("latin-1"), keep_blank_values=True), **kwargs 

399 ) 

400 else: 

401 super().__init__(*args, **kwargs) # type: ignore[arg-type] 

402 self._list = [(str(k), str(v)) for k, v in self._list] 

403 self._dict = {str(k): str(v) for k, v in self._dict.items()} 

404 

405 def __str__(self) -> str: 

406 return urlencode(self._list) 

407 

408 def __repr__(self) -> str: 

409 class_name = self.__class__.__name__ 

410 query_string = str(self) 

411 return f"{class_name}({query_string!r})" 

412 

413 def __call__(self, *args: Any, **kwds: Any) -> Dict[str, Any]: 

414 return self._dict 

415 

416 

417class Headers(typing.Mapping[str, str]): 

418 """ 

419 An immutable, case-insensitive multidict. 

420 """ 

421 

422 def __init__( 

423 self, 

424 headers: typing.Optional[typing.Mapping[str, str]] = None, 

425 raw: typing.Optional[typing.List[typing.Tuple[bytes, bytes]]] = None, 

426 scope: typing.Optional[typing.MutableMapping[str, typing.Any]] = None, 

427 ) -> None: 

428 self._list: typing.List[typing.Tuple[bytes, bytes]] = [] 

429 if headers is not None: 

430 assert raw is None, 'Cannot set both "headers" and "raw".' 

431 assert scope is None, 'Cannot set both "headers" and "scope".' 

432 if isinstance(headers, typing.Mapping): # type: ignore 

433 self._list = [ 

434 (key.lower().encode("latin-1"), value.encode("latin-1")) 

435 for key, value in headers.items() 

436 ] 

437 else: 

438 # Assume it's a list of (bytes, bytes) tuples or something convertible 

439 self._list = [ 

440 ( 

441 ( 

442 k.lower() 

443 if isinstance(k, bytes) 

444 else k.lower().encode("latin-1") 

445 ), 

446 v if isinstance(v, bytes) else v.encode("latin-1"), 

447 ) 

448 for k, v in headers 

449 ] 

450 elif raw is not None: 

451 assert scope is None, 'Cannot set both "raw" and "scope".' 

452 self._list = raw 

453 elif scope is not None: 

454 # scope["headers"] isn't necessarily a list 

455 # it might be a tuple or other iterable 

456 self._list = list(scope["headers"]) 

457 

458 @property 

459 def raw(self) -> typing.List[typing.Tuple[bytes, bytes]]: 

460 return list(self._list) 

461 

462 def keys(self) -> typing.List[str]: # type: ignore[override] 

463 return [key.decode("latin-1") for key, _ in self._list] 

464 

465 def values(self) -> typing.List[str]: # type: ignore[override] 

466 return [value.decode("latin-1") for _, value in self._list] 

467 

468 def items(self) -> typing.List[typing.Tuple[str, str]]: # type: ignore 

469 return [ 

470 (key.decode("latin-1"), value.decode("latin-1")) 

471 for key, value in self._list 

472 ] 

473 

474 def getlist(self, key: str) -> typing.List[str]: 

475 get_header_key = key.lower().encode("latin-1") 

476 return [ 

477 item_value.decode("latin-1") 

478 for item_key, item_value in self._list 

479 if item_key == get_header_key 

480 ] 

481 

482 def mutablecopy(self) -> "MutableHeaders": 

483 return MutableHeaders(raw=self._list[:]) 

484 

485 def __getitem__(self, key: str): # type: ignore[override] 

486 get_header_key = key.lower().encode("latin-1") 

487 for header_key, header_value in self._list: 

488 if header_key == get_header_key: 

489 return header_value.decode("latin-1") 

490 return None 

491 

492 def __contains__(self, key: typing.Any) -> bool: 

493 get_header_key = key.lower().encode("latin-1") 

494 for header_key, _ in self._list: 

495 if header_key == get_header_key: 

496 return True 

497 return False 

498 

499 def __iter__(self) -> typing.Iterator[typing.Any]: 

500 return iter(self.keys()) 

501 

502 def __len__(self) -> int: 

503 return len(self._list) 

504 

505 def __eq__(self, other: typing.Any) -> bool: 

506 if not isinstance(other, Headers): 

507 return False 

508 return sorted(self._list) == sorted(other._list) 

509 

510 def __repr__(self) -> str: 

511 class_name = self.__class__.__name__ 

512 as_dict = dict(self.items()) 

513 if len(as_dict) == len(self): 

514 return f"{class_name}({as_dict!r})" 

515 return f"{class_name}(raw={self.raw!r})" 

516 

517 

518class MutableHeaders(Headers): 

519 def __setitem__(self, key: str, value: str) -> None: 

520 """ 

521 Set the header `key` to `value`, removing any duplicate entries. 

522 Retains insertion order. 

523 """ 

524 set_key = key.lower().encode("latin-1") 

525 set_value = value.encode("latin-1") 

526 

527 found_indexes: "typing.List[int]" = [] 

528 for idx, (item_key, _) in enumerate(self._list): 

529 if item_key == set_key: 

530 found_indexes.append(idx) 

531 

532 for idx in reversed(found_indexes[1:]): 

533 del self._list[idx] 

534 

535 if found_indexes: 

536 idx = found_indexes[0] 

537 self._list[idx] = (set_key, set_value) 

538 else: 

539 self._list.append((set_key, set_value)) 

540 

541 def __delitem__(self, key: str) -> None: 

542 """ 

543 Remove the header `key`. 

544 """ 

545 del_key = key.lower().encode("latin-1") 

546 

547 pop_indexes: "typing.List[int]" = [] 

548 for idx, (item_key, _) in enumerate(self._list): 

549 if item_key == del_key: 

550 pop_indexes.append(idx) 

551 

552 for idx in reversed(pop_indexes): 

553 del self._list[idx] 

554 

555 def __ior__(self, other: typing.Mapping[str, str]) -> "MutableHeaders": 

556 if not isinstance(other, typing.Mapping): # type: ignore 

557 raise TypeError(f"Expected a mapping but got {other.__class__.__name__}") 

558 self.update(other) 

559 return self 

560 

561 def __or__(self, other: typing.Mapping[str, str]) -> "MutableHeaders": 

562 if not isinstance(other, typing.Mapping): # type: ignore 

563 raise TypeError(f"Expected a mapping but got {other.__class__.__name__}") 

564 new = self.mutablecopy() 

565 new.update(other) 

566 return new 

567 

568 @property 

569 def raw(self) -> typing.List[typing.Tuple[bytes, bytes]]: 

570 return self._list 

571 

572 def setdefault(self, key: str, value: str) -> str: 

573 """ 

574 If the header `key` does not exist, then set it to `value`. 

575 Returns the header value. 

576 """ 

577 set_key = key.lower().encode("latin-1") 

578 set_value = value.encode("latin-1") 

579 

580 for _, (item_key, item_value) in enumerate(self._list): 

581 if item_key == set_key: 

582 return item_value.decode("latin-1") 

583 self._list.append((set_key, set_value)) 

584 return value 

585 

586 def update(self, other: typing.Mapping[str, str]) -> None: 

587 for key, val in other.items(): 

588 self[key] = val 

589 

590 def append(self, key: str, value: str) -> None: 

591 """ 

592 Append a header, preserving any duplicate entries. 

593 """ 

594 append_key = key.lower().encode("latin-1") 

595 append_value = value.encode("latin-1") 

596 self._list.append((append_key, append_value)) 

597 

598 def add_vary_header(self, vary: str) -> None: 

599 existing = self.get("vary") 

600 if existing is not None: 

601 vary = ", ".join([existing, vary]) 

602 self["vary"] = vary 

603 

604 

605class State: 

606 """ 

607 An object that can be used to store arbitrary state. 

608 

609 Used for `request.state` and `app.state`. 

610 """ 

611 

612 _state: typing.Dict[str, typing.Any] 

613 

614 def __init__(self, state: typing.Optional[typing.Dict[str, typing.Any]] = None): 

615 if state is None: 

616 state = {} 

617 super().__setattr__("_state", state) 

618 

619 def __setattr__(self, key: typing.Any, value: typing.Any) -> None: 

620 self._state[key] = value 

621 

622 def __getattr__(self, key: typing.Any) -> typing.Any: 

623 try: 

624 return self._state[key] 

625 except KeyError: 

626 return None 

627 

628 def __delattr__(self, key: typing.Any) -> None: 

629 del self._state[key] 

630 

631 

632from typing import Any, Dict, Iterator, ItemsView, KeysView, Sequence, ValuesView 

633 

634 

635class RouteParam: 

636 def __init__(self, data: Dict[str, Any]) -> None: 

637 """Initialize the RouteParam with a dictionary.""" 

638 self.data: Dict[str, Any] = data 

639 

640 def __iter__(self) -> Iterator[str]: 

641 """Return an iterator over the dictionary keys.""" 

642 return iter(self.data) 

643 

644 def __getitem__(self, name: str) -> Any: 

645 """Retrieve a value by key, returning None if the key does not exist.""" 

646 return self.data.get(name, None) 

647 

648 def __getattribute__(self, name: str) -> Any: 

649 """ 

650 Custom attribute access: 

651 - If the attribute exists in `data`, return its value. 

652 - Otherwise, fallback to the default attribute resolution. 

653 """ 

654 data = object.__getattribute__(self, "data") 

655 if name in data: 

656 return data[name] 

657 return object.__getattribute__(self, name) 

658 

659 def get_lists(self) -> ItemsView[str, Any]: 

660 """Return the dictionary's items (key-value pairs).""" 

661 return self.data.items() 

662 

663 def keys(self) -> KeysView[str]: 

664 """Return the dictionary's keys.""" 

665 return self.data.keys() 

666 

667 def values(self) -> ValuesView[Any]: 

668 """Return the dictionary's values.""" 

669 return self.data.values() 

670 

671 def items(self) -> ItemsView[str, Any]: 

672 """Return the dictionary's items (key-value pairs).""" 

673 return self.data.items() 

674 

675 def __repr__(self) -> str: 

676 """Return a string representation of the RouteParam object.""" 

677 return f"<RouteParams {dict(self.data)}>" 

678 

679 def __len__(self) -> int: 

680 """Return the number of items in the dictionary.""" 

681 return len(self.data) 

682 

683 def __call__(self, *args: Any, **kwds: Any) -> Dict[str, Any]: 

684 return self.data 

685 

686 def get(self, key: str, default: Any = None) -> Any: 

687 """Return the value for the given key, or a default value if the key does not exist.""" 

688 return self.data.get(key, default) 

689 

690 def __dict__(self) -> Dict[str, Any]: # type:ignore 

691 return self.data 

692 

693 

694class UploadedFile: 

695 """ 

696 An uploaded file included as part of the request data. 

697 """ 

698 

699 def __init__( 

700 self, 

701 file: typing.BinaryIO, 

702 *, 

703 size: typing.Optional[int] = None, 

704 filename: typing.Optional[str] = None, 

705 headers: typing.Optional[Headers] = None, 

706 ) -> None: 

707 self.filename = filename 

708 self.file = file 

709 self.size = size 

710 self.headers = headers or Headers() 

711 

712 @property 

713 def content_type(self) -> typing.Union[str, None]: 

714 return self.headers.get("content-type", None) 

715 

716 @property 

717 def _in_memory(self) -> bool: 

718 # check for SpooledTemporaryFile._rolled 

719 rolled_to_disk = getattr(self.file, "_rolled", True) 

720 return not rolled_to_disk 

721 

722 async def write(self, data: bytes) -> None: 

723 if self.size is not None: 

724 self.size += len(data) 

725 

726 if self._in_memory: 

727 self.file.write(data) 

728 else: 

729 await run_in_threadpool(self.file.write, data) 

730 

731 async def read(self, size: int = -1) -> bytes: 

732 if self._in_memory: 

733 return self.file.read(size) 

734 return await run_in_threadpool(self.file.read, size) 

735 

736 async def seek(self, offset: int) -> None: 

737 if self._in_memory: 

738 self.file.seek(offset) 

739 else: 

740 await run_in_threadpool(self.file.seek, offset) 

741 

742 async def close(self) -> None: 

743 if self._in_memory: 

744 self.file.close() 

745 else: 

746 await run_in_threadpool(self.file.close) 

747 

748 def __repr__(self) -> str: 

749 return ( 

750 f"{self.__class__.__name__}(" 

751 f"filename={self.filename!r}, " 

752 f"size={self.size!r}, " 

753 f"headers={self.headers!r})" 

754 ) 

755 

756 

757class FormData( 

758 MultiDict[str, typing.Union[UploadedFile, str, Sequence[Any]]] # type:ignore 

759): # type:ignore 

760 

761 def __init__( 

762 self, 

763 *args: typing.Union[ 

764 FormData, 

765 typing.Mapping[str, typing.Union[str, UploadedFile]], 

766 list[tuple[str, typing.Union[str, UploadedFile]]], 

767 ], 

768 **kwargs: typing.Union[str, UploadedFile], 

769 ) -> None: 

770 

771 super().__init__(*args, **kwargs) 

772 

773 async def close(self) -> None: 

774 for _, value in self.multi_items(): 

775 if isinstance(value, UploadedFile): 

776 await value.close() 

777 

778 def get( 

779 self, key: str, default: typing.Any = None 

780 ) -> typing.Union[UploadedFile, str, None]: 

781 """ 

782 Get a value from the form data by key. 

783 

784 Args: 

785 key: The key to look up 

786 default: Value to return if key is not found 

787 

788 Returns: 

789 The value if found, or the default 

790 """ 

791 try: 

792 return self[key] 

793 except KeyError: 

794 return default