Metadata-Version: 2.1
Name: storey
Version: 1.7.3
Summary: Async flows
Home-page: https://github.com/mlrun/storey
Author: Iguazio
Author-email: yaronh@iguazio.com
License: Apache
Classifier: Development Status :: 4 - Beta
Classifier: Intended Audience :: Developers
Classifier: License :: OSI Approved :: Apache Software License
Classifier: Operating System :: POSIX :: Linux
Classifier: Operating System :: Microsoft :: Windows
Classifier: Operating System :: MacOS
Classifier: Programming Language :: Python :: 3
Classifier: Programming Language :: Python :: 3.6
Classifier: Programming Language :: Python :: 3.7
Classifier: Programming Language :: Python :: 3.8
Classifier: Programming Language :: Python
Classifier: Topic :: Software Development :: Libraries :: Python Modules
Classifier: Topic :: Software Development :: Libraries
Description-Content-Type: text/markdown
License-File: LICENSE
Requires-Dist: aiohttp~=3.8
Requires-Dist: v3io~=0.6.2
Requires-Dist: pandas!=1.5.*,<2.2,>=1
Requires-Dist: numpy<1.27,>=1.16.5
Requires-Dist: pyarrow<15,>=1
Requires-Dist: v3io-frames~=0.10.9
Requires-Dist: fsspec>=0.6.2
Requires-Dist: v3iofs~=0.1.17
Requires-Dist: xxhash>=1
Requires-Dist: nuclio-sdk>=0.5.3
Provides-Extra: kafka
Requires-Dist: kafka-python~=2.0; extra == "kafka"
Provides-Extra: redis
Requires-Dist: redis~=4.3; extra == "redis"
Provides-Extra: sqlalchemy
Requires-Dist: sqlalchemy~=1.3; extra == "sqlalchemy"

# Storey

[![CI](https://github.com/mlrun/storey/workflows/CI/badge.svg)](https://github.com/mlrun/storey/actions?query=workflow%3ACI)

Storey is an asynchronous streaming library for real-time event processing and feature extraction.

#### In This Document

- [API Walkthrough](#api-walkthrough)
- [Usage Examples](#examples)

&#x25B6; For more information, see the [Storey Python package documentation](https://storey.readthedocs.io).

<a id="api-walkthrough"></a>
## API Walkthrough
A Storey flow consists of steps linked together by the `build_flow` function, each doing its designated work.
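
To make the idea of linked steps concrete, here is a conceptual sketch in plain Python. It is not Storey's API or implementation: the helpers `map_step`, `filter_step`, `flat_map_step` and `run_flow` are hypothetical names invented for illustration, and they assume that `Map`, `Filter` and `FlatMap` follow the conventional stream-processing meaning of those names. Storey itself runs its steps asynchronously, which this sketch does not model.

```python
from typing import Any, Callable, Iterable, List

# A step takes a stream of events and returns a new stream of events.
Step = Callable[[Iterable[Any]], Iterable[Any]]


def map_step(fn: Callable[[Any], Any]) -> Step:
    """Hypothetical stand-in for a Map-like step: transform every event."""
    return lambda events: (fn(event) for event in events)


def filter_step(predicate: Callable[[Any], bool]) -> Step:
    """Hypothetical stand-in for a Filter-like step: keep matching events only."""
    return lambda events: (event for event in events if predicate(event))


def flat_map_step(fn: Callable[[Any], Iterable[Any]]) -> Step:
    """Hypothetical stand-in for a FlatMap-like step: one event in, many out."""
    return lambda events: (out for event in events for out in fn(event))


def run_flow(source: Iterable[Any], steps: List[Step]) -> Iterable[Any]:
    """Link the steps in order, feeding each step's output to the next one."""
    stream = source
    for step in steps:
        stream = step(stream)
    return stream


result = list(run_flow(range(5), [
    map_step(lambda x: x + 1),          # 0..4 becomes 1..5
    filter_step(lambda x: x % 2 == 1),  # keep the odd values
    flat_map_step(lambda x: [x, x * 10]),
]))
print(result)
```

Each step only sees the output of the step before it, which is the property the ordered list passed to `build_flow` expresses.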

### Supported Steps
#### Input Steps
* `SyncEmitSource` 
* `AsyncEmitSource` 
* `CSVSource`  
* `ParquetSource` 
* `DataframeSource`  

#### Processing Steps
* `Filter` 
* `Map` 
* `FlatMap`
* `MapWithState`
* `Batch(max_events, timeout)` - Batches events. This step emits a batch every `max_events` events, or when `timeout` seconds have passed since the first event in the batch was received.
* `Choice`  
* `JoinWithV3IOTable` 
* `SendToHttp` 
* `AggregateByKey(aggregations, cache, key=None, emit_policy=EmitEveryEvent(), augmentation_fn=None)` - This step aggregates the data into the cache object provided for later persistence, and outputs an event enriched with the requested aggregation features.
* `QueryByKey(features, cache, key=None, augmentation_fn=None, aliases=None)` - Similar to `AggregateByKey`, but this step is for serving only and does not aggregate the event.
* `NoSqlTarget(table)` - Persists the data in `table` to its associated storage by key.
* `Extend`
* `JoinWithTable`  
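
The emission rule described for `Batch` in the list above can be sketched in plain Python. This is only an illustration of the rule as stated there, not Storey's implementation: the class `BatchSketch` and its `add`, `poll` and `flush` methods are hypothetical names, and the timeout is checked by explicit polling with an injectable clock, whereas a real asynchronous step would presumably rely on a timer.

```python
import time
from typing import Any, Callable, List, Optional


class BatchSketch:
    """Hypothetical helper: emit a list when it is full or when it is too old."""

    def __init__(self, max_events: int, timeout: float,
                 emit: Callable[[List[Any]], None],
                 clock: Callable[[], float] = time.monotonic):
        self.max_events = max_events
        self.timeout = timeout
        self.emit = emit
        self.clock = clock
        self._batch: List[Any] = []
        self._first_event_time: Optional[float] = None

    def add(self, event: Any) -> None:
        # Emit a batch that has already timed out before accepting the new event.
        self.poll()
        if not self._batch:
            # The timeout is measured from the first event in the batch.
            self._first_event_time = self.clock()
        self._batch.append(event)
        if len(self._batch) >= self.max_events:
            self.flush()

    def poll(self) -> None:
        # Emit the pending batch if `timeout` seconds passed since its first event.
        if self._batch and self.clock() - self._first_event_time >= self.timeout:
            self.flush()

    def flush(self) -> None:
        if self._batch:
            self.emit(self._batch)
            self._batch = []
            self._first_event_time = None


# Drive the sketch with a fake clock so the behavior is deterministic.
now = [0.0]
batches: List[List[Any]] = []
batcher = BatchSketch(max_events=3, timeout=5.0, emit=batches.append, clock=lambda: now[0])
for value in [1, 2, 3, 4]:
    batcher.add(value)  # the third event fills the first batch
now[0] = 6.0            # more than `timeout` seconds after event 4 arrived
batcher.poll()
print(batches)
```

The first batch is emitted because it reached `max_events`; the second, holding a single event, is emitted only because its timeout expired.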

#### Output Steps
* `Complete`  
* `Reduce` 
* `StreamTarget` 
* `CSVTarget`
* `ReduceToDataFrame`
* `TSDBTarget`
* `ParquetTarget`     


<a id="examples"></a>
## Usage Examples

### Using Aggregates
The following example reads user data, creates features using Storey's aggregates, persists the data to V3IO and emits events containing the features to a V3IO Stream for further processing.

```python
from storey import (build_flow, SyncEmitSource, Table, V3ioDriver, MapWithState, AggregateByKey,
                    FieldAggregator, NoSqlTarget, StreamTarget)
from storey.dtypes import SlidingWindows

v3io_web_api = "https://webapi.change-me.com"
v3io_access_key = "1284ne83-i262-46m6-9a23-810n41f169ea"
table_object = Table("/projects/my_features", V3ioDriver(v3io_web_api, v3io_access_key))

# Per-key state function: records the first and last activity times and a running activity count.
def enrich(event, state):
    if "first_activity" not in state:
        state["first_activity"] = event.time
    # total_seconds() covers the whole interval; timedelta.seconds would drop whole days.
    event.body["time_since_activity"] = (event.body["time"] - state["first_activity"]).total_seconds()
    state["last_event"] = event.time
    event.body["total_activities"] = state["total_activities"] = state.get("total_activities", 0) + 1
    return event, state

controller = build_flow([
    SyncEmitSource(),
    MapWithState(table_object, enrich, group_by_key=True, full_event=True),
    AggregateByKey([FieldAggregator("number_of_clicks", "click", ["count"],
                                    SlidingWindows(["1h","2h", "24h"], "10m")),
                    FieldAggregator("purchases", "purchase_amount", ["avg", "min", "max"],
                                    SlidingWindows(["1h","2h", "24h"], "10m")),
                    # Count only the events whose activity_status is "fail".
                    FieldAggregator("failed_activities", "activity", ["count"],
                                    SlidingWindows(["1h"], "10m"),
                                    aggr_filter=lambda element: element["activity_status"] == "fail"))],
                   table_object,
                   time_field="time"),
    # Persist the aggregated state, then forward the enriched events to a stream.
    NoSqlTarget(table_object),
    StreamTarget(V3ioDriver(v3io_web_api, v3io_access_key), "features_stream")
]).run()
```
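
To give a feel for what the sliding-window aggregations above compute, the following plain-Python sketch recomputes them from a list of raw events. It is a simplification under stated assumptions, not how Storey works internally: the helper `sliding_aggregates` is hypothetical, the `{name}_{aggregate}_{window}` feature naming is an assumption, the period argument (`"10m"` above, assumed to set the bucket granularity) is ignored, and returning `None` for an empty window is a choice made for this sketch only.

```python
from datetime import datetime, timedelta

# Window labels used in the example above, mapped to their lengths.
WINDOW_LENGTHS = {"1h": timedelta(hours=1), "2h": timedelta(hours=2), "24h": timedelta(hours=24)}


def sliding_aggregates(events, name, column, aggregates, windows, now, aggr_filter=None):
    """Hypothetical helper: recompute each aggregate over the trailing window ending at `now`."""
    features = {}
    for window in windows:
        window_start = now - WINDOW_LENGTHS[window]
        # Keep values of `column` from events inside the window that pass the optional filter.
        values = [
            event[column]
            for event in events
            if window_start < event["time"] <= now
            and column in event
            and (aggr_filter is None or aggr_filter(event))
        ]
        for aggregate in aggregates:
            feature = f"{name}_{aggregate}_{window}"  # assumed naming scheme
            if aggregate == "count":
                features[feature] = len(values)
            elif not values:
                features[feature] = None  # sketch choice for an empty window
            elif aggregate == "avg":
                features[feature] = sum(values) / len(values)
            elif aggregate == "min":
                features[feature] = min(values)
            elif aggregate == "max":
                features[feature] = max(values)
            else:
                raise ValueError(f"unsupported aggregate: {aggregate}")
    return features


now = datetime(2024, 1, 1, 12, 0)
events = [
    {"time": now - timedelta(minutes=30), "purchase_amount": 20.0,
     "activity": "checkout", "activity_status": "ok"},
    {"time": now - timedelta(minutes=90), "purchase_amount": 40.0,
     "activity": "checkout", "activity_status": "fail"},
    {"time": now - timedelta(hours=5), "purchase_amount": 60.0,
     "activity": "checkout", "activity_status": "ok"},
]

purchases = sliding_aggregates(events, "purchases", "purchase_amount",
                               ["avg", "min", "max"], ["1h", "2h", "24h"], now)
failed = sliding_aggregates(events, "failed_activities", "activity", ["count"], ["1h", "2h"], now,
                            aggr_filter=lambda element: element["activity_status"] == "fail")
print(purchases)
print(failed)
```

Each combination of aggregate and window yields one feature, so three aggregates over three windows produce nine `purchases` features, and the filter restricts `failed_activities` to events whose status is `"fail"`.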

We can also create a serving function whose sole purpose is to read data from the feature store and emit it further:

```python
controller = build_flow([
    SyncEmitSource(),
    QueryAggregationByKey([FieldAggregator("number_of_clicks", "click", ["count"],
                                           SlidingWindows(["1h","2h", "24h"], "10m")),
                           FieldAggregator("purchases", "purchase_amount", ["avg", "min", "max"],
                                           SlidingWindows(["1h","2h", "24h"], "10m")),
                           FieldAggregator("failed_activities", "activity", ["count"],
                                           SlidingWindows(["1h"], "10m"),
                                           aggr_filter=lambda element: element["activity_status"] == "fail"))],
                           table_object,
                           time_field="time")
]).run()
```
