sain.futures

Abstractions for threading / asynchronous programming.

  1# BSD 3-Clause License
  2#
  3# Copyright (c) 2022-Present, nxtlo
  4# All rights reserved.
  5#
  6# Redistribution and use in source and binary forms, with or without
  7# modification, are permitted provided that the following conditions are met:
  8#
  9# * Redistributions of source code must retain the above copyright notice, this
 10#   list of conditions and the following disclaimer.
 11#
 12# * Redistributions in binary form must reproduce the above copyright notice,
 13#   this list of conditions and the following disclaimer in the documentation
 14#   and/or other materials provided with the distribution.
 15#
 16# * Neither the name of the copyright holder nor the names of its
 17#   contributors may be used to endorse or promote products derived from
 18#   this software without specific prior written permission.
 19#
 20# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
 21# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
 22# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
 23# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
 24# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
 25# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
 26# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
 27# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
 28# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 29# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 30"""Abstractions for threading / asynchronous programming."""
 31
 32from __future__ import annotations
 33
 34__all__ = ("join", "loop")
 35
 36import asyncio
 37import enum
 38import typing
 39
 40from . import result as _result
 41
 42if typing.TYPE_CHECKING:
 43    import collections.abc as collections
 44
 45    T_co = typing.TypeVar("T_co", covariant=True)
 46    T = typing.TypeVar("T", bound=collections.Callable[..., typing.Any])
 47
 48
 49class JoinError(enum.Enum):
 50    EMPTY = 0
 51    """No awaitables were passed."""
 52    CANCELED = 1
 53    """The future gatherer were canceled."""
 54    TIMEOUT = 2
 55    """The future gatherer timed-out."""
 56
 57
 58async def join(
 59    *aws: collections.Awaitable[T_co],
 60    timeout: float | None = None,
 61) -> _result.Result[collections.Sequence[T_co], JoinError]:
 62    """Polls multiple awaitables concurrently, returning a sequence of their results once complete.
 63
 64    Example
 65    -------
 66    ```py
 67    async def one() -> int:
 68        return 1
 69
 70    async def two() -> int:
 71        return 2
 72
 73    x, y = (await join(one(), two())).unwrap()
 74    ```
 75
 76    Parameters
 77    ----------
 78    *aws : `collections.Awaitable[T]`
 79        The awaitables to gather.
 80    timeout : `float | None`
 81        An optional timeout.
 82
 83    Returns
 84    -------
 85    `sain.Result[T, JoinError]`:
 86        The result of the gathered awaitables.
 87    """
 88
 89    if not aws:
 90        return _result.Err(JoinError.EMPTY)
 91
 92    tasks: list[asyncio.Task[T_co]] = []
 93
 94    tasks.extend(asyncio.ensure_future(coro) for coro in aws)
 95    gatherer = asyncio.gather(*tasks)
 96    try:
 97        return _result.Ok(await asyncio.wait_for(gatherer, timeout=timeout))
 98
 99    except asyncio.CancelledError:
100        return _result.Err(JoinError.CANCELED)
101    except asyncio.TimeoutError:
102        return _result.Err(JoinError.TIMEOUT)
103
104    finally:
105        for task in tasks:
106            if not task.done() and not task.cancelled():
107                task.cancel()
108        gatherer.cancel()
109
110
111# source: hikari-py/aio.py
112def loop() -> asyncio.AbstractEventLoop:
113    """Get the current usable event loop or create a new one.
114
115    Returns
116    -------
117    `asyncio.AbstractEventLoop`
118    """
119    try:
120        loop = asyncio.get_event_loop_policy().get_event_loop()
121
122        if not loop.is_closed():
123            return loop
124
125    except RuntimeError:
126        pass
127
128    loop = asyncio.new_event_loop()
129    asyncio.set_event_loop(loop)
130    return loop
async def join( *aws: Awaitable[+T_co], timeout: float | None = None) -> '_result.Result[collections.Sequence[T_co], JoinError]':
 59async def join(
 60    *aws: collections.Awaitable[T_co],
 61    timeout: float | None = None,
 62) -> _result.Result[collections.Sequence[T_co], JoinError]:
 63    """Polls multiple awaitables concurrently, returning a sequence of their results once complete.
 64
 65    Example
 66    -------
 67    ```py
 68    async def one() -> int:
 69        return 1
 70
 71    async def two() -> int:
 72        return 2
 73
 74    x, y = (await join(one(), two())).unwrap()
 75    ```
 76
 77    Parameters
 78    ----------
 79    *aws : `collections.Awaitable[T]`
 80        The awaitables to gather.
 81    timeout : `float | None`
 82        An optional timeout.
 83
 84    Returns
 85    -------
 86    `sain.Result[T, JoinError]`:
 87        The result of the gathered awaitables.
 88    """
 89
 90    if not aws:
 91        return _result.Err(JoinError.EMPTY)
 92
 93    tasks: list[asyncio.Task[T_co]] = []
 94
 95    tasks.extend(asyncio.ensure_future(coro) for coro in aws)
 96    gatherer = asyncio.gather(*tasks)
 97    try:
 98        return _result.Ok(await asyncio.wait_for(gatherer, timeout=timeout))
 99
100    except asyncio.CancelledError:
101        return _result.Err(JoinError.CANCELED)
102    except asyncio.TimeoutError:
103        return _result.Err(JoinError.TIMEOUT)
104
105    finally:
106        for task in tasks:
107            if not task.done() and not task.cancelled():
108                task.cancel()
109        gatherer.cancel()

Polls multiple awaitables concurrently, returning a sequence of their results once complete.

Example
async def one() -> int:
    return 1

async def two() -> int:
    return 2

x, y = (await join(one(), two())).unwrap()
Parameters
  • *aws (collections.Awaitable[T]): The awaitables to gather.
  • timeout (float | None): An optional timeout.
Returns
  • sain.Result[T, JoinError]:: The result of the gathered awaitables.
def loop() -> asyncio.events.AbstractEventLoop:
113def loop() -> asyncio.AbstractEventLoop:
114    """Get the current usable event loop or create a new one.
115
116    Returns
117    -------
118    `asyncio.AbstractEventLoop`
119    """
120    try:
121        loop = asyncio.get_event_loop_policy().get_event_loop()
122
123        if not loop.is_closed():
124            return loop
125
126    except RuntimeError:
127        pass
128
129    loop = asyncio.new_event_loop()
130    asyncio.set_event_loop(loop)
131    return loop

Get the current usable event loop or create a new one.

Returns
  • asyncio.AbstractEventLoop