Metadata-Version: 2.1
Name: coroflow
Version: 4.0.1
Summary: Asynchronous pipeline builder
Home-page: https://github.com/dewaldabrie/coroflow/
Keywords: asyncio,pipelines,fast,threads,airflow,coroutines
Author: Dewald Abrie
Author-email: dewaldabrie@gmail.com
Requires-Python: >=3.6,<4.0
Classifier: Programming Language :: Python :: 3
Classifier: Programming Language :: Python :: 3.10
Classifier: Programming Language :: Python :: 3.6
Classifier: Programming Language :: Python :: 3.7
Classifier: Programming Language :: Python :: 3.8
Classifier: Programming Language :: Python :: 3.9
Requires-Dist: anytree (>=2.8.0,<3.0.0)
Project-URL: Documentation, https://coroflow.readthedocs.io/en/latest/
Project-URL: Issues, https://github.com/dewaldabrie/coroflow/issues
Project-URL: Repository, https://github.com/dewaldabrie/coroflow/
Description-Content-Type: text/markdown

# Coroflow: Easy and Fast Pipelines

Coroflow makes it easy to run pipelines with coroutines and also support mixing
in blocking functions and generators.

Coroflow does a lot of heavy-lifting for you:

* Manage all tasks in the pipelinen concurently in one thread using coroutines
* Pass data between tasks with queues
* Easily specify concurrency limits
* Connect stages of the pipeline with fan-out/fan-in patterns or load-balancer patterns
* Define tasks as coroutines, normal (blocking) functions, async generators or normal generators; coroflow will run it appropriately 
  in either the event-loop, a thread pool, or optionally in a processes pool
* Provides an apache-ariflow-like api for connecting tasks



## Getting Started


Coroflow makes it easy to run pipelines with coroutines and also support mixing
in blocking functions and generators

```python
    from coroflow import Node, Pipeline
    import asyncio
    import time


    class GenNode(Node):
        async def execute():
            """
            The execute method of the first/root Node has to be a generator,
            either async or synchronous.
            """
            for url in ['img_url_1', 'img_url_2', 'img_url_3']:
                print(f"Yielding {url}")
                await asyncio.sleep(1)
                yield url
            print("Generator is exhausted")
            return


    class DoSomething(Node):
        async def execute(inpt, param=None):
            """
            The execute method of all non-root Nodes should be a async
            or synchronous method.
            """
            # do your async pipelined work
            await asyncio.sleep(1)  # simulated IO delay
            outp = inpt
            print(f"func1: T1 sending {inpt}")
            return outp


    p = Pipeline()
    t0 = GenNode('gen', p)
    t1 = DoSomething('func1', p, kwargs={'param': 'param_t1'})
    t2 = DoSomething('func2', p, kwargs={'param': 'param_t2'})
    t0.set_downstream(t1)
    t1.set_downstream(t2)


    start_time = time.time()
    p.run()
    print(f"Asynchronous duration: {time.time() - start_time}s.")
```

# Tests

Run like so:

$ pytest
