Metadata-Version: 2.1
Name: cs-naysync
Version: 20250103
Summary: An attempt at comingling async-code and nonasync-code in an ergonomic way.
Author-email: Cameron Simpson <cs@cskk.id.au>
License: GNU General Public License v3 or later (GPLv3+)
Project-URL: Monorepo Hg/Mercurial Mirror, https://hg.sr.ht/~cameron-simpson/css
Project-URL: Monorepo Git Mirror, https://github.com/cameron-simpson/css
Project-URL: MonoRepo Commits, https://bitbucket.org/cameron_simpson/css/commits/branch/main
Project-URL: Source, https://github.com/cameron-simpson/css/blob/main/lib/python/cs/naysync.py
Keywords: python3
Classifier: Programming Language :: Python
Classifier: Programming Language :: Python :: 3
Classifier: Development Status :: 4 - Beta
Classifier: Intended Audience :: Developers
Classifier: Operating System :: OS Independent
Classifier: Topic :: Software Development :: Libraries :: Python Modules
Classifier: License :: OSI Approved :: GNU General Public License v3 or later (GPLv3+)
Description-Content-Type: text/markdown
Requires-Dist: cs.deco>=20250103
Requires-Dist: cs.semantics>=20250103

An attempt at comingling async-code and nonasync-code in an ergonomic way.

*Latest release 20250103*:
* @afunc now accepts an async function and returns it unchanged.
* @agen now accepts an async generator and returns it unchanged.
* async_iter: now accepts an async iterable, return aiter(it) of it directly.
* New AnyIterable = Union[Iterable, AsyncIterable] type alias, to allow both sync and async iterators.
* async_iter: new optional fast=False parameter, if true then iterate the iterator directly instead of via asyncio.to_thread.
* async_iter: make missing `fast=` be True for list/tuple/set and False otherwise.
* @afunc: new optional fast=False parameter - if true then do not divert through asyncio.to_thread.
* New AsyncPipeLine, an asynchronous iterable with a `put` method to provide input for processing.
* New StageMode class with a STREAM enum for streaming stages, implement in AsyncPipeLine.run_stage.
* New aqget(Queue), an async interface to queue.Queue.get.
* New aqiter(Queue[,sentinel]), an async generator yielding from a queue.Queue.

One of the difficulties in adapting non-async code for use in
an async world is that anything asynchronous needs to be turtles
all the way down: a single blocking synchronous call anywhere
in the call stack blocks the async event loop.
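
To make the problem concrete, here is a minimal sketch using only the standard library (it does not use this module, and `heartbeat_ticks`, `blocking_directly` and `blocking_in_thread` are hypothetical names). It counts how often a background task gets to run while a blocking `time.sleep` executes, first called directly and then via `asyncio.to_thread`.

```python
import asyncio
import time

async def heartbeat_ticks(do_work):
    """Count how often a background task runs while `do_work()` executes."""
    ticks = 0

    async def heartbeat():
        nonlocal ticks
        while True:
            await asyncio.sleep(0.01)
            ticks += 1

    task = asyncio.create_task(heartbeat())
    await asyncio.sleep(0)  # give the heartbeat task a chance to start
    await do_work()
    seen = ticks
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        pass
    return seen

async def blocking_directly():
    time.sleep(0.2)  # blocks the whole event loop

async def blocking_in_thread():
    await asyncio.to_thread(time.sleep, 0.2)  # the event loop keeps running

print(asyncio.run(heartbeat_ticks(blocking_directly)))   # 0: the heartbeat never ran
print(asyncio.run(heartbeat_ticks(blocking_in_thread)))  # a positive count
```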

This module presently provides:
- `@afunc`: a decorator to make a synchronous function asynchronous
- `@agen`: a decorator to make a synchronous generator asynchronous
- `amap(func,iterable)`: asynchronous mapping of `func` over an iterable
- `aqget(q)`: asynchronous function to get an item from a `queue.Queue` or similar
- `aqiter(q)`: asynchronous generator to yield items from a `queue.Queue` or similar
- `async_iter(iterable)`: return an asynchronous iterator of an iterable
- `IterableAsyncQueue`: an iterable flavour of `asyncio.Queue` with no `get` methods
- `AsyncPipeLine`: a pipeline of functions connected together with `IterableAsyncQueue`s

## <a name="afunc"></a>`afunc(*da, **dkw)`

A decorator for a synchronous function which turns it into
an asynchronous function.
If `func` is already an asynchronous function it is returned unchanged.
If `fast` is true (default `False`) then `func` is presumed to consume
negligible time and it is simply wrapped in an asynchronous function.
Otherwise it is wrapped in `asyncio.to_thread`.

Example:

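    import math
    import time

    from cs.naysync import afunc

    @afunc
    def func(count):
        # blocking call: dispatched via asyncio.to_thread
        time.sleep(count)
        return count

    # inside an async function:
    slept = await func(5)

    @afunc(fast=True)
    def asqrt(n):
        # negligible cost: not diverted through a thread
        return math.sqrt(n)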
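
The behaviour described above can be approximated with the standard library alone. The following is a sketch under assumptions, not this module's implementation: `to_async` is a hypothetical name, and the real decorator may differ in detail.

```python
import asyncio
import functools
import inspect

def to_async(func=None, *, fast=False):
    """Hypothetical stand-in for an @afunc-style decorator."""
    if func is None:
        # used with parameters, e.g. @to_async(fast=True)
        return functools.partial(to_async, fast=fast)
    if inspect.iscoroutinefunction(func):
        return func  # already asynchronous: return it unchanged
    if fast:
        @functools.wraps(func)
        async def wrapper(*a, **kw):
            return func(*a, **kw)  # presumed negligible: call inline
    else:
        @functools.wraps(func)
        async def wrapper(*a, **kw):
            # may block: run in a worker thread
            return await asyncio.to_thread(func, *a, **kw)
    return wrapper

@to_async
def slow_square(n):
    return n * n

@to_async(fast=True)
def fast_square(n):
    return n * n

async def already_async(n):
    return n
```

The `fast` path trades safety for speed: it avoids the thread hand-off, so it is only appropriate when the wrapped function cannot block.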

## <a name="agen"></a>`agen(*da, **dkw)`

A decorator for a synchronous generator which turns it into
an asynchronous generator.
If `genfunc` is already an asynchronous generator it is returned unchanged.
Exceptions in the synchronous generator are reraised in the asynchronous
generator.

Example:

    import time

    from cs.naysync import agen

    @agen
    def gen(count):
        for i in range(count):
            yield i
            time.sleep(1.0)

    # inside an async function:
    async for item in gen(5):
        print(item)
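
To illustrate how exceptions from a synchronous generator can surface in its asynchronous counterpart, here is a standard-library sketch. It is an assumption about the general technique, not this module's code: `to_async_gen`, `countdown` and `collect` are hypothetical names.

```python
import asyncio

# Marks exhaustion: StopIteration does not travel cleanly through a future.
_DONE = object()

def to_async_gen(genfunc):
    """Hypothetical stand-in for an @agen-style decorator."""
    async def wrapper(*a, **kw):
        it = genfunc(*a, **kw)
        while True:
            # Each next() runs in a worker thread. An exception raised by
            # the generator propagates through the await to the consumer.
            item = await asyncio.to_thread(next, it, _DONE)
            if item is _DONE:
                return
            yield item
    return wrapper

@to_async_gen
def countdown(n):
    while n > 0:
        yield n
        n -= 1
    raise ValueError("liftoff")

async def collect():
    seen = []
    try:
        async for item in countdown(3):
            seen.append(item)
    except ValueError as e:
        seen.append(str(e))
    return seen

print(asyncio.run(collect()))  # [3, 2, 1, 'liftoff']
```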

## <a name="amap"></a>`amap(func: Callable[[Any], Any], it: Union[Iterable, AsyncIterable], concurrent=False, unordered=False, indexed=False)`

An asynchronous generator yielding the results of `func(item)`
for each `item` in the iterable `it`.

`it` may be a synchronous or asynchronous iterable.

`func` may be a synchronous or asynchronous callable.

If `concurrent` is `False` (the default), run each `func(item)`
call in series.

If `concurrent` is true, run the function calls concurrently
as `asyncio` tasks.
If `unordered` is true (default `False`), yield results as
they complete. Otherwise yield results in the order of the items
in `it`, each as soon as it becomes available; the tasks still
run concurrently if `concurrent` is true.

If `indexed` is true (default `False`) yield 2-tuples of
`(i,result)` instead of just `result`, where `i` is the index
of each item from `it` counting from `0`.

Example of an async function to fetch URLs in parallel:

    from typing import List

    import requests

    from cs.naysync import amap

    async def get_urls(urls: List[str]):
        """ Fetch `urls` in parallel.
            Yield `(url,response)` 2-tuples.
        """
        async for i, response in amap(
            requests.get, urls,
            concurrent=True, unordered=True, indexed=True,
        ):
            yield urls[i], response
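
The difference between ordered and unordered concurrent results can be sketched with plain `asyncio` tasks. This is an illustration under assumptions, not how `amap` is implemented: `map_concurrently`, `slow_echo` and `demo` are hypothetical names, and the helper only accepts an async `func` and a sync iterable.

```python
import asyncio

async def map_concurrently(func, items, unordered=False):
    """Yield (index, result) pairs; hypothetical helper, not amap itself."""
    async def call(i, item):
        return i, await func(item)

    # All tasks start immediately and run concurrently.
    tasks = [asyncio.create_task(call(i, item)) for i, item in enumerate(items)]
    if unordered:
        # Completion order.
        for next_done in asyncio.as_completed(tasks):
            yield await next_done
    else:
        # Input order: wait for each task in turn.
        for task in tasks:
            yield await task

async def slow_echo(delay):
    await asyncio.sleep(delay)
    return delay

async def demo(unordered):
    delays = [0.2, 0.0, 0.1]
    return [
        pair
        async for pair in map_concurrently(slow_echo, delays, unordered=unordered)
    ]
```

With `unordered=False` the pairs come back with indices `0, 1, 2`; with `unordered=True` the shortest delay is expected first.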

## <a name="aqget"></a>`aqget(q: queue.Queue)`

An asynchronous function to get an item from a `queue.Queue`-like object `q`.
`q` must support the `.get()` and `.get_nowait()` methods.
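
The requirement for both methods suggests a fast path and a blocking path. The sketch below shows one way such a helper could be written with the standard library; this is an assumption, not the module's implementation, and `get_async` and `demo` are hypothetical names.

```python
import asyncio
import queue

async def get_async(q):
    """Hypothetical stand-in for an aqget-style helper."""
    try:
        # Fast path: an item is already available.
        return q.get_nowait()
    except queue.Empty:
        # Slow path: block in a worker thread, not in the event loop.
        return await asyncio.to_thread(q.get)

async def demo():
    q = queue.Queue()
    q.put("ready")
    first = await get_async(q)
    # The queue is now empty, so this call waits in a worker thread
    # while the event loop stays free to put the next item.
    waiter = asyncio.create_task(get_async(q))
    await asyncio.sleep(0.05)
    q.put("later")
    return first, await waiter

print(asyncio.run(demo()))  # ('ready', 'later')
```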

## <a name="aqiter"></a>`aqiter(q: queue.Queue, sentinel=<object>)`

An asynchronous generator to yield items from a `queue.Queue`-like object `q`.
`q` must support the `.get()` and `.get_nowait()` methods.

An optional `sentinel` object may be supplied, which ends iteration
if encountered. If a sentinel is specified then this must be the only
consumer of the queue because the sentinel is consumed.
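
As an illustration of sentinel-terminated iteration, here is a standard-library sketch in which a producer thread feeds a `queue.Queue` and an async generator drains it. The names `iter_queue`, `producer` and `demo` are hypothetical, and the real `aqiter` may work differently.

```python
import asyncio
import queue
import threading

async def iter_queue(q, sentinel):
    """Hypothetical stand-in for an aqiter-style generator."""
    while True:
        item = await asyncio.to_thread(q.get)
        if item is sentinel:
            # The sentinel is consumed here, so no other consumer sees it.
            return
        yield item

def producer(q, sentinel):
    for n in range(3):
        q.put(n)
    q.put(sentinel)  # signal the end of the items

async def demo():
    q = queue.Queue()
    end = object()
    threading.Thread(target=producer, args=(q, end)).start()
    return [item async for item in iter_queue(q, end)]

print(asyncio.run(demo()))  # [0, 1, 2]
```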

## <a name="async_iter"></a>`async_iter(it: Union[Iterable, AsyncIterable], fast=None)`

Return an asynchronous iterator yielding items from the iterable `it`.
If `it` is already an asynchronous iterable, `aiter(it)` is returned directly.

If `fast` is true then `it` is iterated directly instead of
via a distinct async generator. If not specified, `fast` is
set to `True` if `it` is a `list` or `tuple` or `set`. A true
value for this parameter indicates that fetching the next
item from `it` is always effectively instant and never blocks.
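
The three cases (async iterable, fast sync iterable, possibly blocking sync iterable) can be sketched as follows. This follows the release note's description of `fast` as bypassing `asyncio.to_thread`; it is an assumption rather than the module's code, and `as_async_iter` is a hypothetical name.

```python
import asyncio

_DONE = object()  # marks exhaustion of the sync iterator

def as_async_iter(it, fast=None):
    """Hypothetical stand-in for an async_iter-style function."""
    if hasattr(it, "__aiter__"):
        return it.__aiter__()  # already asynchronous
    if fast is None:
        # In-memory containers never block on the next item.
        fast = isinstance(it, (list, tuple, set))
    sync_it = iter(it)

    async def fast_gen():
        for item in sync_it:
            yield item

    async def threaded_gen():
        while True:
            # Fetch each item in a worker thread in case it blocks.
            item = await asyncio.to_thread(next, sync_it, _DONE)
            if item is _DONE:
                return
            yield item

    return fast_gen() if fast else threaded_gen()

async def collect(it, fast=None):
    return [item async for item in as_async_iter(it, fast=fast)]

async def agen_123():
    for n in (1, 2, 3):
        yield n
```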

## <a name="AsyncPipeLine"></a>Class `AsyncPipeLine`

An `AsyncPipeLine` is an asynchronous iterable with a `put` method
to provide input for processing.

A new pipeline is usually constructed via the factory method
`AsyncPipeLine.from_stages(stage_func,...)`.

It has the same methods as an `IterableAsyncQueue`:
- `async put(item)` to queue an item for processing
- `async close()` to close the input, indicating end of the input items
- iteration to consume the processed results

It also has the following methods:
- `async submit(AnyIterable)` to submit multiple items for processing
- `async __call__(AnyIterable)` to submit the iterable for
  processing and consume the results by iteration


Example:

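    from cs.naysync import AsyncPipeLine

    def double(item):
        # a sync generator stage: emits each input item twice
        yield item
        yield item

    pipeline = AsyncPipeLine.from_stages(
        double,
        double,
    )

    # inside an async function:
    async for result in pipeline([1,2,3,4]):
        print(result)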
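
For readers who want a mental model of stages connected by queues, here is a simplified standard-library sketch. It is not this module's implementation: `run_stage_sketch`, `run_pipeline` and the `_END` marker are hypothetical, stages are limited to sync generators, and plain `asyncio.Queue`s stand in for `IterableAsyncQueue`.

```python
import asyncio

_END = object()  # hypothetical end-of-input marker passed down the pipeline

async def run_stage_sketch(inq, stage_func, outq):
    """Feed each item from `inq` to `stage_func`, put its results on `outq`."""
    while True:
        item = await inq.get()
        if item is _END:
            break
        for result in stage_func(item):
            await outq.put(result)
    await outq.put(_END)  # tell the next stage there is no more input

async def run_pipeline(stage_funcs, items):
    # One queue before the first stage, one after each stage.
    queues = [asyncio.Queue() for _ in range(len(stage_funcs) + 1)]
    tasks = [
        asyncio.create_task(run_stage_sketch(inq, func, outq))
        for func, inq, outq in zip(stage_funcs, queues, queues[1:])
    ]
    for item in items:
        await queues[0].put(item)
    await queues[0].put(_END)
    results = []
    while (item := await queues[-1].get()) is not _END:
        results.append(item)
    await asyncio.gather(*tasks)
    return results

def double(item):
    yield item
    yield item

print(asyncio.run(run_pipeline([double, double], [1, 2])))
# [1, 1, 1, 1, 2, 2, 2, 2]
```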

*`AsyncPipeLine.__call__(self, it: Union[Iterable, AsyncIterable], fast=None)`*:
Call the pipeline with an iterable.

*`AsyncPipeLine.close(self)`*:
Close the input queue.

*`AsyncPipeLine.from_stages(*stage_specs, maxsize=0) -> Tuple[cs.naysync.IterableAsyncQueue, cs.naysync.IterableAsyncQueue]`*:
Prepare an `AsyncPipeLine` from stage specifications.
Return an `(inq,tasks,outq)` 3-tuple: an input `IterableAsyncQueue`
to receive items to process, a list of `asyncio.Task`s, one per
stage specification, and an output `IterableAsyncQueue` to
produce results. If there are no `stage_specs` the 2 queues
are the same queue.

Each stage specification is either:
- a stage function suitable for `run_stage`
- a 2-tuple of `(stage_func,batchsize)`

In the latter case:
- `stage_func` is a stage function suitable for `run_stage`
- `batchsize` is an `int`, where `0` means to gather all the
  items from `inq` and supply them as a single batch to
  `stage_func` and where a value `>0` collects items up to a limit
  of `batchsize` and supplies each batch to `stage_func`

If the `batchsize` is `0` the `stage_func` is called exactly
once with all the input items, even if there are no input
items.

*`AsyncPipeLine.put(self, item)`*:
Put `item` onto the input queue.

*`AsyncPipeLine.run_stage(inq: cs.naysync.IterableAsyncQueue, stage_func, outq: cs.naysync.IterableAsyncQueue, batchsize: Union[int, NoneType, cs.naysync.StageMode] = None)`*:
Run a pipeline stage, copying items from `inq` to the `stage_func`
and putting results onto `outq`. After processing, `outq` is
closed.

`stage_func` is a callable which may be:
- a sync or async generator which yields results to place onto `outq`
- a sync or async function which returns a single result

If `batchsize` is `None`, the default, each input item is
passed to `stage_func(item)`, which yields the results from the
single item.

If `batchsize` is an `int`, items from `inq` are collected
into batches up to a limit of `batchsize` (no limit if
`batchsize` is `0`) and passed to `stage_func(batch)`, which
yields the results from the batch of items.
If the `batchsize` is `0` the `stage_func` is called exactly
once with all the input items, even if there are no input
items.
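
The batching rules above can be illustrated with a small standalone helper. This is a sketch of the described semantics, not the module's code: `batched_items`, `numbers` and `demo` are hypothetical names, and the behaviour for empty input with a positive `batchsize` (no batches at all) is an assumption.

```python
import asyncio

async def batched_items(source, batchsize):
    """Group items from an async iterable into lists; hypothetical helper."""
    batch = []
    async for item in source:
        batch.append(item)
        if batchsize > 0 and len(batch) >= batchsize:
            yield batch
            batch = []
    # batchsize == 0: exactly one batch, even when it is empty.
    # batchsize > 0: flush a final partial batch, if any.
    if batch or batchsize == 0:
        yield batch

async def numbers(n):
    for i in range(n):
        yield i

async def demo(n, batchsize):
    return [batch async for batch in batched_items(numbers(n), batchsize)]

print(asyncio.run(demo(5, 2)))  # [[0, 1], [2, 3], [4]]
print(asyncio.run(demo(5, 0)))  # [[0, 1, 2, 3, 4]]
print(asyncio.run(demo(0, 0)))  # [[]]
```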

*`AsyncPipeLine.submit(self, it: Union[Iterable, AsyncIterable], fast=None)`*:
Submit the items from `it` to the pipeline.

## <a name="IterableAsyncQueue"></a>Class `IterableAsyncQueue(asyncio.queues.Queue)`

An iterable subclass of `asyncio.Queue`.

This modifies `asyncio.Queue` by:
- adding a `.close()` async method
- making the queue iterable, with each iteration consuming an item via `.get()`
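
A closable, iterable queue of this kind can be sketched on top of `asyncio.Queue`. The class below is a hypothetical illustration (`ClosableQueue` and its internal marker are not part of this module) and makes its own assumptions, such as signalling closure with a marker item.

```python
import asyncio

class ClosableQueue(asyncio.Queue):
    """Hypothetical sketch of an iterable asyncio.Queue with close()."""

    _CLOSED = object()  # marker item placed on the queue by close()

    def __init__(self, maxsize=0):
        super().__init__(maxsize)
        self._is_closed = False

    async def close(self):
        # Closing more than once is harmless.
        if not self._is_closed:
            self._is_closed = True
            await super().put(self._CLOSED)

    async def get(self):
        raise NotImplementedError("iterate the queue instead of calling get()")

    def __aiter__(self):
        return self

    async def __anext__(self):
        item = await super().get()
        if item is self._CLOSED:
            # Leave the marker in place so later iteration also stops.
            super().put_nowait(item)
            raise StopAsyncIteration
        return item

async def demo():
    q = ClosableQueue()
    await q.put(1)
    await q.put(2)
    await q.close()
    await q.close()  # not an error
    return [item async for item in q]

print(asyncio.run(demo()))  # [1, 2]
```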

*`IterableAsyncQueue.__anext__(self)`*:
Fetch the next item from the queue.

*`IterableAsyncQueue.close(self)`*:
Close the queue.
It is not an error to close the queue more than once.

*`IterableAsyncQueue.get(self)`*:
We do not allow `.get()`.

*`IterableAsyncQueue.get_nowat(self)`*:
We do not allow `.get_nowait()`.

*`IterableAsyncQueue.put(self, item)`*:
Put `item` onto the queue.

*`IterableAsyncQueue.put_nowait(self, item)`*:
Put an item onto the queue without blocking.

## <a name="StageMode"></a>Class `StageMode(enum.StrEnum)`

Special modes for `AsyncPipeLine` pipeline stages.

*`StageMode.__format__`*

*`StageMode.__str__`*

# Release Log

*Release 20250103*:
* @afunc now accepts an async function and returns it unchanged.
* @agen now accepts an async generator and returns it unchanged.
* async_iter: now accepts an async iterable, return aiter(it) of it directly.
* New AnyIterable = Union[Iterable, AsyncIterable] type alias, to allow both sync and async iterators.
* async_iter: new optional fast=False parameter, if true then iterate the iterator directly instead of via asyncio.to_thread.
* async_iter: make missing `fast=` be True for list/tuple/set and False otherwise.
* @afunc: new optional fast=False parameter - if true then do not divert through asyncio.to_thread.
* New AsyncPipeLine, an asynchronous iterable with a `put` method to provide input for processing.
* New StageMode class with a STREAM enum for streaming stages, implement in AsyncPipeLine.run_stage.
* New aqget(Queue), an async interface to queue.Queue.get.
* New aqiter(Queue[,sentinel]), an async generator yielding from a queue.Queue.

*Release 20241221.1*:
Doc fix for amap().

*Release 20241221*:
* Simpler implementation of @afunc.
* Simplify implementation of @agen by using async_iter.
* Docstring improvements.

*Release 20241220*:
* New async_iter(Iterable) returning an asynchronous iterator of a synchronous iterable.
* New amap(func,iterable) asynchronously mapping a function over an iterable.

*Release 20241215*:
* @afunc: now uses asyncio.to_thread() instead of wrapping @agen, drop the decorator parameters since no queue or polling are now used.
* @agen: nonpolling implementation - now uses asyncio.to_thread() for the next(genfunc) step, drop the decorator parameters since no queue or polling are now used.

*Release 20241214.1*:
Doc update.

*Release 20241214*:
Initial release with @agen and @afunc decorators.
