Coverage for nexios\_utils\async_helpers.py: 66%

53 statements  

« prev     ^ index     » next       coverage.py v7.8.0, created at 2025-05-21 20:31 +0100

1from __future__ import annotations 

2 

3import asyncio 

4import functools 

5import sys 

6import typing 

7from contextlib import contextmanager 

8 

9if sys.version_info >= (3, 10): # pragma: no cover 

10 from typing import TypeGuard 

11else: # pragma: no cover 

12 from typing_extensions import TypeGuard 

13 

14has_exceptiongroups = True 

15if sys.version_info < (3, 11): # pragma: no cover 

16 try: 

17 from exceptiongroup import BaseExceptionGroup 

18 except ImportError: 

19 has_exceptiongroups = False 

20 

# Generic placeholder for the value produced once an awaitable completes.
T = typing.TypeVar("T")

# Alias for a callable taking arbitrary arguments and returning an
# awaitable that resolves to ``T``.
AwaitableCallable = typing.Callable[..., typing.Awaitable[T]]

23 

24 

25@typing.overload 

26def is_async_callable(obj: AwaitableCallable[T]) -> TypeGuard[AwaitableCallable[T]]: ... 

27 

28 

29@typing.overload 

30def is_async_callable(obj: typing.Any) -> TypeGuard[AwaitableCallable[typing.Any]]: ... 

31 

32 

33def is_async_callable(obj: typing.Any) -> typing.Any: 

34 while isinstance(obj, functools.partial): 

35 obj = obj.func 

36 

37 return asyncio.iscoroutinefunction(obj) or ( 

38 callable(obj) and asyncio.iscoroutinefunction(obj.__call__) 

39 ) # type:ignore 

40 

41 

# Covariant placeholder for the value an awaitable / async context manager
# hands back.
T_co = typing.TypeVar("T_co", covariant=True)


class AwaitableOrContextManager(
    typing.Awaitable[T_co],
    typing.AsyncContextManager[T_co],
    typing.Protocol[T_co],
):
    """Structural type for objects usable with both ``await`` and ``async with``.

    The protocol adds no members of its own; it only combines the awaitable
    and asynchronous-context-manager interfaces over the same result type.
    """

48 

49 

class SupportsAsyncClose(typing.Protocol):
    """Structural type for any object exposing an awaitable ``close()``."""

    async def close(self) -> None: ...  # pragma: no cover


# Invariant type variable restricted to objects that satisfy
# ``SupportsAsyncClose``.
SupportsAsyncCloseType = typing.TypeVar(
    "SupportsAsyncCloseType",
    bound=SupportsAsyncClose,
    covariant=False,
)

57 

58 

class AwaitableOrContextManagerWrapper(typing.Generic[SupportsAsyncCloseType]):
    """Wrap an awaitable so it can be awaited or used with ``async with``.

    Awaiting the wrapper delegates straight to the wrapped awaitable. Used as
    an async context manager, the wrapper awaits the wrapped object on entry,
    remembers the result, and calls that result's ``close()`` on exit.
    """

    __slots__ = ("aw", "entered")

    def __init__(self, aw: typing.Awaitable[SupportsAsyncCloseType]) -> None:
        # ``entered`` is intentionally left unset until ``__aenter__`` runs.
        self.aw = aw

    def __await__(self) -> typing.Generator[typing.Any, None, SupportsAsyncCloseType]:
        return self.aw.__await__()

    async def __aenter__(self) -> SupportsAsyncCloseType:
        resource = await self.aw
        self.entered = resource
        return resource

    async def __aexit__(self, *exc_info: typing.Any) -> typing.Optional[bool]:
        # Close the resource obtained on entry; returning None lets any
        # in-flight exception keep propagating.
        await self.entered.close()
        return None

75 

76 

@contextmanager
def collapse_excgroups() -> typing.Generator[None, None, None]:
    """Re-raise exceptions from the managed block, unwrapping lone groups.

    Any ``BaseException`` escaping the ``with`` body is caught and raised
    again. When exception-group support is available, a group that holds
    exactly one exception is replaced by that inner exception, repeatedly,
    until the result is not a single-member group.
    """
    try:
        yield
    except BaseException as error:
        if has_exceptiongroups:
            while True:
                if not isinstance(error, BaseExceptionGroup):
                    break
                if len(error.exceptions) != 1:
                    break
                error = error.exceptions[0]  # pragma: no cover

        raise error

87 

88 

def get_route_path(scope) -> str:
    """Return the request path with the scope's ``root_path`` prefix removed.

    The prefix is stripped only when it ends on a path-segment boundary:
    either the path equals the root path exactly (the result is ``""``) or
    the character right after the prefix is ``"/"``. In every other case,
    including a missing or empty ``root_path``, the path is returned
    unchanged.
    """
    full_path: str = scope["path"]
    prefix = scope.get("root_path", "")

    if not prefix or not full_path.startswith(prefix):
        return full_path

    remainder = full_path[len(prefix) :]
    # An empty remainder means the path is exactly the root path; a leading
    # slash means the prefix ended on a segment boundary.
    if remainder == "" or remainder.startswith("/"):
        return remainder

    # The prefix matched only part of a segment (e.g. "/api" vs "/apix").
    return full_path

103 

104 return path