Coverage for nexios\_utils\cuncurrency.py: 43%

37 statements  

« prev     ^ index     » next       coverage.py v7.8.0, created at 2025-05-21 20:31 +0100

1from __future__ import annotations 

2 

3import functools 

4import sys 

5import typing 

6import warnings 

7 

8import anyio.to_thread 

9 

10if sys.version_info >= (3, 10): 

11 from typing import ParamSpec 

12else: 

13 from typing_extensions import ParamSpec 

14 

15P = ParamSpec("P") 

16T = typing.TypeVar("T") 

17 

18 

19async def run_until_first_complete(*args: tuple[typing.Callable, dict]) -> None: # type: ignore[type-arg] 

20 warnings.warn( 

21 "run_until_first_complete is deprecated and will be removed in a future version.", 

22 DeprecationWarning, 

23 ) 

24 

25 async with anyio.create_task_group() as task_group: 

26 

27 async def run(func: typing.Callable[[], typing.Coroutine]) -> None: # type: ignore[type-arg] 

28 await func() 

29 task_group.cancel_scope.cancel() 

30 

31 for func, kwargs in args: 

32 task_group.start_soon(run, functools.partial(func, **kwargs)) 

33 

34 

35async def run_in_threadpool( 

36 func: typing.Callable[P, T], *args: P.args, **kwargs: P.kwargs 

37) -> T: 

38 if kwargs: 

39 func = functools.partial(func, **kwargs) 

40 return await anyio.to_thread.run_sync(func, *args) 

41 

42 

43class _StopIteration(Exception): 

44 pass 

45 

46 

47def _next(iterator: typing.Iterator[T]) -> T: 

48 # We can't raise `StopIteration` from within the threadpool iterator 

49 # and catch it outside that context, so we coerce them into a different 

50 # exception type. 

51 try: 

52 return next(iterator) 

53 except StopIteration: 

54 raise _StopIteration 

55 

56 

57async def iterate_in_threadpool( 

58 iterator: typing.Iterable[T], 

59) -> typing.AsyncIterator[T]: 

60 as_iterator = iter(iterator) 

61 while True: 

62 try: 

63 yield await anyio.to_thread.run_sync(_next, as_iterator) 

64 except _StopIteration: 

65 break