sain.futures
Abstractions for threading / asynchronous programming.
1# BSD 3-Clause License 2# 3# Copyright (c) 2022-Present, nxtlo 4# All rights reserved. 5# 6# Redistribution and use in source and binary forms, with or without 7# modification, are permitted provided that the following conditions are met: 8# 9# * Redistributions of source code must retain the above copyright notice, this 10# list of conditions and the following disclaimer. 11# 12# * Redistributions in binary form must reproduce the above copyright notice, 13# this list of conditions and the following disclaimer in the documentation 14# and/or other materials provided with the distribution. 15# 16# * Neither the name of the copyright holder nor the names of its 17# contributors may be used to endorse or promote products derived from 18# this software without specific prior written permission. 19# 20# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" 21# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 22# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE 23# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE 24# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL 25# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR 26# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER 27# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, 28# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE 29# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 30"""Abstractions for threading / asynchronous programming.""" 31 32from __future__ import annotations 33 34__all__ = ("join", "loop") 35 36import asyncio 37import enum 38import typing 39 40from . import result as _result 41 42if typing.TYPE_CHECKING: 43 import collections.abc as collections 44 45 T_co = typing.TypeVar("T_co", covariant=True) 46 T = typing.TypeVar("T", bound=collections.Callable[..., typing.Any]) 47 48 49class JoinError(enum.Enum): 50 EMPTY = 0 51 """No awaitables were passed.""" 52 CANCELED = 1 53 """The future gatherer were canceled.""" 54 TIMEOUT = 2 55 """The future gatherer timed-out.""" 56 57 58async def join( 59 *aws: collections.Awaitable[T_co], 60 timeout: float | None = None, 61) -> _result.Result[collections.Sequence[T_co], JoinError]: 62 """Polls multiple awaitables concurrently, returning a sequence of their results once complete. 63 64 Example 65 ------- 66 ```py 67 async def one() -> int: 68 return 1 69 70 async def two() -> int: 71 return 2 72 73 x, y = (await join(one(), two())).unwrap() 74 ``` 75 76 Parameters 77 ---------- 78 *aws : `collections.Awaitable[T]` 79 The awaitables to gather. 80 timeout : `float | None` 81 An optional timeout. 82 83 Returns 84 ------- 85 `sain.Result[T, JoinError]`: 86 The result of the gathered awaitables. 87 """ 88 89 if not aws: 90 return _result.Err(JoinError.EMPTY) 91 92 tasks: list[asyncio.Task[T_co]] = [] 93 94 tasks.extend(asyncio.ensure_future(coro) for coro in aws) 95 gatherer = asyncio.gather(*tasks) 96 try: 97 return _result.Ok(await asyncio.wait_for(gatherer, timeout=timeout)) 98 99 except asyncio.CancelledError: 100 return _result.Err(JoinError.CANCELED) 101 except asyncio.TimeoutError: 102 return _result.Err(JoinError.TIMEOUT) 103 104 finally: 105 for task in tasks: 106 if not task.done() and not task.cancelled(): 107 task.cancel() 108 gatherer.cancel() 109 110 111# source: hikari-py/aio.py 112def loop() -> asyncio.AbstractEventLoop: 113 """Get the current usable event loop or create a new one. 114 115 Returns 116 ------- 117 `asyncio.AbstractEventLoop` 118 """ 119 try: 120 loop = asyncio.get_event_loop_policy().get_event_loop() 121 122 if not loop.is_closed(): 123 return loop 124 125 except RuntimeError: 126 pass 127 128 loop = asyncio.new_event_loop() 129 asyncio.set_event_loop(loop) 130 return loop
async def
join( *aws: Awaitable[+T_co], timeout: float | None = None) -> '_result.Result[collections.Sequence[T_co], JoinError]':
59async def join( 60 *aws: collections.Awaitable[T_co], 61 timeout: float | None = None, 62) -> _result.Result[collections.Sequence[T_co], JoinError]: 63 """Polls multiple awaitables concurrently, returning a sequence of their results once complete. 64 65 Example 66 ------- 67 ```py 68 async def one() -> int: 69 return 1 70 71 async def two() -> int: 72 return 2 73 74 x, y = (await join(one(), two())).unwrap() 75 ``` 76 77 Parameters 78 ---------- 79 *aws : `collections.Awaitable[T]` 80 The awaitables to gather. 81 timeout : `float | None` 82 An optional timeout. 83 84 Returns 85 ------- 86 `sain.Result[T, JoinError]`: 87 The result of the gathered awaitables. 88 """ 89 90 if not aws: 91 return _result.Err(JoinError.EMPTY) 92 93 tasks: list[asyncio.Task[T_co]] = [] 94 95 tasks.extend(asyncio.ensure_future(coro) for coro in aws) 96 gatherer = asyncio.gather(*tasks) 97 try: 98 return _result.Ok(await asyncio.wait_for(gatherer, timeout=timeout)) 99 100 except asyncio.CancelledError: 101 return _result.Err(JoinError.CANCELED) 102 except asyncio.TimeoutError: 103 return _result.Err(JoinError.TIMEOUT) 104 105 finally: 106 for task in tasks: 107 if not task.done() and not task.cancelled(): 108 task.cancel() 109 gatherer.cancel()
Polls multiple awaitables concurrently, returning a sequence of their results once complete.
Example
async def one() -> int:
return 1
async def two() -> int:
return 2
x, y = (await join(one(), two())).unwrap()
Parameters
- *aws (
collections.Awaitable[T]): The awaitables to gather. - timeout (
float | None): An optional timeout.
Returns
sain.Result[T, JoinError]:: The result of the gathered awaitables.
def
loop() -> asyncio.events.AbstractEventLoop:
113def loop() -> asyncio.AbstractEventLoop: 114 """Get the current usable event loop or create a new one. 115 116 Returns 117 ------- 118 `asyncio.AbstractEventLoop` 119 """ 120 try: 121 loop = asyncio.get_event_loop_policy().get_event_loop() 122 123 if not loop.is_closed(): 124 return loop 125 126 except RuntimeError: 127 pass 128 129 loop = asyncio.new_event_loop() 130 asyncio.set_event_loop(loop) 131 return loop
Get the current usable event loop or create a new one.
Returns
asyncio.AbstractEventLoop