{
   "cells":[
      {
         "cell_type":"markdown",
         "source":[
            "# Preprocessing\n\n### Notebook with the preprocessing tasks for the project: project_name\n\n##### - Model input: model_input_path\n\n##### - Model output: model_output_path\n\n##### - Model artifacts: model_artifacts_path"
         ],
         "metadata":{
            
         }
      },
      {
         "cell_type":"markdown",
         "source":[
            "## 1 Discovering Butterfree - Feature Set Basics\n\nWelcome to the **Discovering Butterfree** tutorial series!\n\nThis first tutorial covers some basics of the Butterfree library and shows you how to create your first feature set :rocket: :rocket:\n\nBefore diving into the tutorial, make sure you have a basic understanding of these main data concepts: **features**, **feature sets** and the **\"Feature Store Architecture\"**. You can read more about them [here]().\n\n## Library Basics:\n\nButterfree's main objective is to make feature engineering easy. The library provides a high-level API for declarative feature definitions. Behind these abstractions, Butterfree is essentially an **ETL (Extract - Transform - Load)** framework, and the organization of the project reflects this.\n\n### Extract\n\n`from butterfree.extract import ...`\n\nModule with the entities responsible for extracting data into the pipeline. The module provides the following tools:\n\n* `readers`: data connectors. Currently Butterfree provides readers for files, tables registered in the Spark Hive metastore, and Kafka topics.\n\n\n* `pre_processing`: a utility tool for transforming or rearranging the structure of the reader's input data before the feature engineering.\n\n\n* `source`: a composition of `readers`. The entity responsible for merging datasets coming from the defined readers into a single dataframe input for the `Transform` stage.\n\n### Transform\n\n`from butterfree.transform import ...`\n\nThe main module of the library, responsible for feature engineering, in other words, all the transformations on the data. The module provides the following main tools:\n\n* `features`: the entity that defines what a feature is. Holds a transformation and metadata about the feature.\n\n\n* `transformations`: provides a set of components for transforming the data, with the possibility to use Spark native functions, aggregations, SQL expressions and others.\n\n\n* `feature_set`: an entity that defines a feature set. Holds features and the metadata around them.\n\n\n### Load\n\n`from butterfree.load import ...`\n\nThe module is responsible for saving the data in some data storage. The module provides the following tools:\n\n* `writers`: provide connections to data sources to write data. Currently Butterfree provides ways to save data on S3, registered as tables in the Spark Hive metastore, and to Cassandra DB.\n\n\n* `sink`: a composition of writers. The entity responsible for triggering the writing jobs on a set of defined writers.\n\n### Pipelines\n\nPipelines are responsible for integrating all other modules (`extract`, `transform`, `load`) in order to define complete ETL jobs from source data to data storage destination.\n\n`from butterfree.pipelines import ...`\n\n* `feature_set_pipeline`: defines an ETL pipeline for creating feature sets."
         ],
         "metadata":{
            
         }
      },
      {
         "cell_type":"markdown",
         "source":[
            "## Install Butterfree:"
         ],
         "metadata":{
            
         }
      },
      {
         "cell_type":"code",
         "source":[
            "dbutils.library.installPyPI(\"butterfree\")"
         ],
         "metadata":{
            
         },
         "outputs":[
            {
               "metadata":{
                  
               },
               "output_type":"display_data",
               "data":{
                  "text/html":[
                     "<style scoped>\n  .ansiout {\n    display: block;\n    unicode-bidi: embed;\n    white-space: pre-wrap;\n    word-wrap: break-word;\n    word-break: break-all;\n    font-family: \"Source Code Pro\", \"Menlo\", monospace;;\n    font-size: 13px;\n    color: #555;\n    margin-left: 4px;\n    line-height: 19px;\n  }\n</style>\n<div class=\"ansiout\">Out[1]: True</div>"
                  ]
               }
            }
         ],
         "execution_count":4
      },
      {
         "cell_type":"markdown",
         "source":[
            "## Example:\nSimulating the following scenario:\n\n- We want to create a feature set with features about houses for rent (listings).\n\n- We are interested in houses only for the **Kanto** region.\n\nWe have two sets of data:\n\n- Table: `listing_events`. Table with data about events of house listings.\n- File: `region.json`. Static file with data about the cities and regions.\n\nWe want a result dataset with the following schema:\n\n| id | timestamp | rent | rent_over_area | bedrooms | bathrooms | area | bedrooms_over_area | bathrooms_over_area | latitude | longitude | h3 | city | region |\n| - | - | - | - | - | - | - | - | - | - | - | - | - | - |\n| bigint | timestamp | float | float | int | int | float | float | float | double | double | string | string | string |\n\nFor more information about H3 geohash click [here]()\n\nThe following code blocks show how to generate this feature set using the Butterfree library:"
         ],
         "metadata":{
            
         }
      },
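      {
         "cell_type":"code",
         "source":[
            "# setup spark\nfrom pyspark import SparkContext, SparkConf\nfrom pyspark.sql import session\nimport pyspark.sql.functions as F\n\n# butterfree spark client\nfrom butterfree.clients import SparkClient\n\nspark_client = SparkClient()\n\n# NOTE: the cells below also use `spark` (the session predefined by the notebook\n# runtime) and `path` (the base directory containing examples/data); both must\n# be available before running them.\n"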
         ],
         "metadata":{
            
         },
         "outputs":[
            
         ],
         "execution_count":6
      },
      {
         "cell_type":"markdown",
         "source":[
            "### Showing test data"
         ],
         "metadata":{
            
         }
      },
      {
         "cell_type":"code",
         "source":[
            "listing_events_df = spark.read.json(f\"{path}/examples/data/listing_events.json\")\nlisting_events_df.createOrReplaceTempView(\"listing_events\")  # creating listing_events table\n\nprint(\">>> listing_events table:\")\nlisting_events_df.toPandas()"
         ],
         "metadata":{
            
         },
         "outputs":[
            
         ],
         "execution_count":8
      },
      {
         "cell_type":"code",
         "source":[
            "print(\">>> region.json file:\")\nspark.read.json(f\"{path}/examples/data/region.json\").toPandas()"
         ],
         "metadata":{
            
         },
         "outputs":[
            
         ],
         "execution_count":9
      },
      {
         "cell_type":"markdown",
         "source":[
            "### Extract\n\n- For the extract part, we need the `Source` entity and the `FileReader` and `TableReader` for the data we have.\n- We also need to declare a query with the rule for joining the results of the readers.\n- As proposed in the problem, we can filter the region dataset to keep only the **Kanto** region."
         ],
         "metadata":{
            
         }
      },
      {
         "cell_type":"code",
         "source":[
            "from butterfree.extract import Source\nfrom butterfree.extract.readers import FileReader, TableReader\nfrom butterfree.extract.pre_processing import filter\n\nreaders = [\n    TableReader(id=\"listing_events\", table=\"listing_events\",),\n    FileReader(id=\"region\", path=f\"{path}/examples/data/region.json\", format=\"json\",).with_(\n        transformer=filter, condition=\"region == 'Kanto'\"\n    ),\n]\n\nquery = \"\"\"\nselect\n    listing_events.*,\n    region.city,\n    region.lat,\n    region.lng,\n    region.region\nfrom\n    listing_events\n    join region\n      on listing_events.region_id = region.id\n\"\"\"\n\nsource = Source(readers=readers, query=query)"
         ],
         "metadata":{
            
         },
         "outputs":[
            
         ],
         "execution_count":11
      },
      {
         "cell_type":"code",
         "source":[
            "# showing source result\n\nsource_df = source.construct(spark_client)\nsource_df.toPandas()"
         ],
         "metadata":{
            
         },
         "outputs":[
            
         ],
         "execution_count":12
      },
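      {
         "cell_type":"markdown",
         "source":[
            "To make the extract step more concrete, here is a minimal plain-Python sketch of what the `filter` pre-processing and the join query express: keep only the regions whose `region` is `Kanto`, then inner-join the listings on `region_id = id`. The sample rows and the helper `join_listings_with_regions` are hypothetical, made up for illustration; this is not how Butterfree or Spark implement the step."
         ],
         "metadata":{
            
         }
      },
      {
         "cell_type":"code",
         "source":[
            "# Hypothetical sample rows; the real data lives in listing_events and region.json.\n",
            "sample_events = [\n",
            "    {'id': 1, 'rent': 1300.0, 'region_id': 1},\n",
            "    {'id': 2, 'rent': 2000.0, 'region_id': 2},\n",
            "    {'id': 3, 'rent': 900.0, 'region_id': 3},\n",
            "]\n",
            "sample_regions = [\n",
            "    {'id': 1, 'city': 'city_a', 'region': 'Kanto'},\n",
            "    {'id': 2, 'city': 'city_b', 'region': 'Kanto'},\n",
            "    {'id': 3, 'city': 'city_c', 'region': 'Johto'},\n",
            "]\n",
            "\n",
            "\n",
            "def join_listings_with_regions(events, regions, wanted_region):\n",
            "    # Step 1: filter the regions (the role of the pre-processing filter).\n",
            "    kept = {r['id']: r for r in regions if r['region'] == wanted_region}\n",
            "    # Step 2: inner join on events.region_id = regions.id.\n",
            "    return [\n",
            "        {**e, 'city': kept[e['region_id']]['city'], 'region': wanted_region}\n",
            "        for e in events\n",
            "        if e['region_id'] in kept\n",
            "    ]\n",
            "\n",
            "\n",
            "kanto_listings = join_listings_with_regions(sample_events, sample_regions, 'Kanto')\n",
            "print([row['id'] for row in kanto_listings])"
         ],
         "metadata":{
            
         },
         "outputs":[
            
         ],
         "execution_count":null
      },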
      {
         "cell_type":"markdown",
         "source":[
            "### Transform\n- In the transform part, a set of `Feature` objects is declared.\n- An instance of `FeatureSet` is used to hold the features.\n- A `FeatureSet` can only be created when it is possible to define a unique tuple formed by key columns and a time reference. This is an **architectural requirement** for the data, so at least one `KeyFeature` and one `TimestampFeature` are needed.\n- Every `Feature` needs a unique name, a description, and a data-type definition."
         ],
         "metadata":{
            
         }
      },
      {
         "cell_type":"code",
         "source":[
            "from butterfree.transform import FeatureSet\nfrom butterfree.transform.features import Feature, KeyFeature, TimestampFeature\nfrom butterfree.transform.transformations import SQLExpressionTransform\nfrom butterfree.transform.transformations.h3_transform import H3HashTransform\nfrom butterfree.constants import DataType\n\nkeys = [\n    KeyFeature(\n        name=\"id\",\n        description=\"Unique identifier code for houses.\",\n        dtype=DataType.BIGINT,\n    )\n]\n\n# from_ms=True because the source data is in milliseconds, not in a Timestamp format.\nts_feature = TimestampFeature(from_ms=True)\n\nfeatures = [\n    Feature(\n        name=\"rent\",\n        description=\"Rent value by month described in the listing.\",\n        dtype=DataType.FLOAT,\n    ),\n    Feature(\n        name=\"rent_over_area\",\n        description=\"Rent value by month divided by the area of the house.\",\n        transformation=SQLExpressionTransform(\"rent / area\"),\n        dtype=DataType.FLOAT,\n    ),\n    Feature(\n        name=\"bedrooms\",\n        description=\"Number of bedrooms of the house.\",\n        dtype=DataType.INTEGER,\n    ),\n    Feature(\n        name=\"bathrooms\",\n        description=\"Number of bathrooms of the house.\",\n        dtype=DataType.INTEGER,\n    ),\n    Feature(\n        name=\"area\",\n        description=\"Area of the house, in square meters.\",\n        dtype=DataType.FLOAT,\n    ),\n    Feature(\n        name=\"bedrooms_over_area\",\n        description=\"Number of bedrooms divided by the area.\",\n        transformation=SQLExpressionTransform(\"bedrooms / area\"),\n        dtype=DataType.FLOAT,\n    ),\n    Feature(\n        name=\"bathrooms_over_area\",\n        description=\"Number of bathrooms divided by the area.\",\n        transformation=SQLExpressionTransform(\"bathrooms / area\"),\n        dtype=DataType.FLOAT,\n    ),\n    Feature(\n        name=\"latitude\",\n        description=\"House location latitude.\",\n        from_column=\"lat\",  # from_column is needed when the feature name differs from the source column\n        dtype=DataType.DOUBLE,\n    ),\n    Feature(\n        name=\"longitude\",\n        description=\"House location longitude.\",\n        from_column=\"lng\",\n        dtype=DataType.DOUBLE,\n    ),\n    Feature(\n        name=\"h3\",\n        description=\"H3 geohash.\",\n        transformation=H3HashTransform(\n            h3_resolutions=[10], lat_column=\"latitude\", lng_column=\"longitude\",\n        ),\n        dtype=DataType.STRING,\n    ),\n    Feature(name=\"city\", description=\"House location city.\", dtype=DataType.STRING,),\n    Feature(\n        name=\"region\",\n        description=\"House location region.\",\n        dtype=DataType.STRING,\n    ),\n]\n\nfeature_set = FeatureSet(\n    name=\"house_listings\",\n    entity=\"house\",  # entity: to which \"business context\" this feature set belongs\n    description=\"Features describing a house listing.\",\n    keys=keys,\n    timestamp=ts_feature,\n    features=features,\n)"
         ],
         "metadata":{
            
         },
         "outputs":[
            
         ],
         "execution_count":14
      },
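      {
         "cell_type":"markdown",
         "source":[
            "The `from_ms=True` flag above relates to the unit of the source timestamp. As a rough illustration, assuming the source column holds epoch milliseconds (as the flag name suggests), the conversion to a proper timestamp looks like the plain-Python sketch below. The helper `ms_to_timestamp` and the sample value are hypothetical and not part of Butterfree."
         ],
         "metadata":{
            
         }
      },
      {
         "cell_type":"code",
         "source":[
            "from datetime import datetime, timezone\n",
            "\n",
            "\n",
            "def ms_to_timestamp(epoch_ms):\n",
            "    # datetime expects seconds, so divide the millisecond value by 1000.\n",
            "    return datetime.fromtimestamp(epoch_ms / 1000, tz=timezone.utc)\n",
            "\n",
            "\n",
            "example_ms = 1588291200000  # hypothetical value, not taken from the example data\n",
            "example_ts = ms_to_timestamp(example_ms)\n",
            "print(example_ts.isoformat())"
         ],
         "metadata":{
            
         },
         "outputs":[
            
         ],
         "execution_count":null
      },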
      {
         "cell_type":"code",
         "source":[
            "# showing feature set result\n\nfeature_set_df = feature_set.construct(source_df, spark_client)\nfeature_set_df.toPandas()"
         ],
         "metadata":{
            
         },
         "outputs":[
            
         ],
         "execution_count":15
      },
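      {
         "cell_type":"markdown",
         "source":[
            "The architectural requirement mentioned in the transform notes, that the key columns plus the time reference identify each row uniquely, can be checked with a few lines of plain Python. The sketch below runs over hypothetical rows; `find_duplicate_keys` is a made-up helper, not a Butterfree function. It also shows the row-wise arithmetic that an expression such as `rent / area` stands for."
         ],
         "metadata":{
            
         }
      },
      {
         "cell_type":"code",
         "source":[
            "from collections import Counter\n",
            "\n",
            "# Hypothetical rows, shaped like the feature set input.\n",
            "sample_rows = [\n",
            "    {'id': 1, 'timestamp': 1588291200000, 'rent': 1300.0, 'area': 50.0},\n",
            "    {'id': 1, 'timestamp': 1588377600000, 'rent': 1350.0, 'area': 50.0},\n",
            "    {'id': 2, 'timestamp': 1588291200000, 'rent': 2000.0, 'area': 80.0},\n",
            "]\n",
            "\n",
            "\n",
            "def find_duplicate_keys(rows, key_columns, timestamp_column):\n",
            "    # Count every (key..., timestamp) tuple and report those seen more than once.\n",
            "    counts = Counter(\n",
            "        tuple(row[c] for c in key_columns) + (row[timestamp_column],)\n",
            "        for row in rows\n",
            "    )\n",
            "    return [key for key, n in counts.items() if n > 1]\n",
            "\n",
            "\n",
            "duplicates = find_duplicate_keys(sample_rows, ['id'], 'timestamp')\n",
            "print('duplicated (key, timestamp) tuples:', duplicates)\n",
            "\n",
            "# Row-wise equivalent of the SQL expression 'rent / area'.\n",
            "rent_over_area = [row['rent'] / row['area'] for row in sample_rows]\n",
            "print(rent_over_area)"
         ],
         "metadata":{
            
         },
         "outputs":[
            
         ],
         "execution_count":null
      },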
      {
         "cell_type":"markdown",
         "source":[
            "### Load\n\n- For the load part we need `Writer` instances and a `Sink`.\n- Writers define where to load the data.\n- The `Sink` gets the transformed data (feature set) and triggers the load to all the defined writers.\n- `debug_mode` will create a temporary view instead of trying to write in a real data store."
         ],
         "metadata":{
            
         }
      },
      {
         "cell_type":"code",
         "source":[
            "from butterfree.load.writers import (\n    HistoricalFeatureStoreWriter,\n    OnlineFeatureStoreWriter,\n)\nfrom butterfree.load import Sink\n\nwriters = [HistoricalFeatureStoreWriter(debug_mode=True), OnlineFeatureStoreWriter(debug_mode=True)]\nsink = Sink(writers=writers)"
         ],
         "metadata":{
            
         },
         "outputs":[
            
         ],
         "execution_count":17
      },
      {
         "cell_type":"markdown",
         "source":[
            "### Pipeline\n\n- The `Pipeline` entity wraps all the other defined elements.\n- The `run` command triggers the execution of the pipeline, end-to-end."
         ],
         "metadata":{
            
         }
      },
      {
         "cell_type":"code",
         "source":[
            "from butterfree.pipelines import FeatureSetPipeline\n\npipeline = FeatureSetPipeline(source=source, feature_set=feature_set, sink=sink)"
         ],
         "metadata":{
            
         },
         "outputs":[
            
         ],
         "execution_count":19
      },
      {
         "cell_type":"code",
         "source":[
            "result_df = pipeline.run()"
         ],
         "metadata":{
            
         },
         "outputs":[
            
         ],
         "execution_count":20
      },
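      {
         "cell_type":"markdown",
         "source":[
            "Conceptually, running the pipeline chains the three stages: the source builds the input data, the feature set transforms it, and the sink hands the result to every writer. The sketch below mimics that flow with plain Python callables. `run_pipeline` and the toy stages are hypothetical stand-ins chosen for illustration, not Butterfree's implementation."
         ],
         "metadata":{
            
         }
      },
      {
         "cell_type":"code",
         "source":[
            "def run_pipeline(extract, transform, writers):\n",
            "    # Extract -> Transform -> Load, in that order.\n",
            "    data = extract()\n",
            "    transformed = transform(data)\n",
            "    for write in writers:  # a sink fans the same data out to each writer\n",
            "        write(transformed)\n",
            "    return transformed\n",
            "\n",
            "\n",
            "# Two in-memory lists stand in for the historical and online stores.\n",
            "toy_historical_store = []\n",
            "toy_online_store = []\n",
            "\n",
            "toy_result = run_pipeline(\n",
            "    extract=lambda: [{'id': 1, 'rent': 1300.0, 'area': 50.0}],\n",
            "    transform=lambda rows: [\n",
            "        {**row, 'rent_over_area': row['rent'] / row['area']} for row in rows\n",
            "    ],\n",
            "    writers=[toy_historical_store.extend, toy_online_store.extend],\n",
            ")\n",
            "print(toy_result)"
         ],
         "metadata":{
            
         },
         "outputs":[
            
         ],
         "execution_count":null
      },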
      {
         "cell_type":"markdown",
         "source":[
            "### Showing the results"
         ],
         "metadata":{
            
         }
      },
      {
         "cell_type":"code",
         "source":[
            "print(\">>> Historical Feature house_listings feature set table:\")\nspark.table(\"historical_feature_store__house_listings\").orderBy(\n    \"id\", \"timestamp\"\n).toPandas()"
         ],
         "metadata":{
            
         },
         "outputs":[
            
         ],
         "execution_count":22
      },
      {
         "cell_type":"code",
         "source":[
            "print(\">>> Online Feature house_listings feature set table:\")\nspark.table(\"online_feature_store__house_listings\").orderBy(\"id\", \"timestamp\").toPandas()"
         ],
         "metadata":{
            
         },
         "outputs":[
            
         ],
         "execution_count":23
      },
      {
         "cell_type":"markdown",
         "source":[
            "- We can see that we were able to create all the desired features in an easy way\n- The **historical feature set** holds all the data, and we can see that it is partitioned by year, month and day (columns added in the `HistoricalFeatureStoreWriter`)\n- In the **online feature set** there is only the latest data for each id"
         ],
         "metadata":{
            
         }
      }
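      ,
      {
         "cell_type":"markdown",
         "source":[
            "The difference between the two stores can be sketched in plain Python: the historical store keeps every row and carries `year`, `month` and `day` partition columns, while the online store keeps only the latest row per `id`. The rows and helper names below are hypothetical, and deriving the partition columns from the row timestamp is an assumption made for illustration rather than a description of the writers' internals."
         ],
         "metadata":{
            
         }
      },
      {
         "cell_type":"code",
         "source":[
            "from datetime import datetime, timezone\n",
            "\n",
            "# Hypothetical feature set rows.\n",
            "feature_rows = [\n",
            "    {'id': 1, 'timestamp': datetime(2020, 5, 1, tzinfo=timezone.utc), 'rent': 1300.0},\n",
            "    {'id': 1, 'timestamp': datetime(2020, 5, 2, tzinfo=timezone.utc), 'rent': 1350.0},\n",
            "    {'id': 2, 'timestamp': datetime(2020, 5, 1, tzinfo=timezone.utc), 'rent': 2000.0},\n",
            "]\n",
            "\n",
            "\n",
            "def to_historical(rows):\n",
            "    # Keep every row and add partition columns (assumed to come from the timestamp).\n",
            "    return [\n",
            "        {\n",
            "            **r,\n",
            "            'year': r['timestamp'].year,\n",
            "            'month': r['timestamp'].month,\n",
            "            'day': r['timestamp'].day,\n",
            "        }\n",
            "        for r in rows\n",
            "    ]\n",
            "\n",
            "\n",
            "def to_online(rows):\n",
            "    # Keep only the most recent row for each id.\n",
            "    latest = {}\n",
            "    for r in rows:\n",
            "        current = latest.get(r['id'])\n",
            "        if current is None or r['timestamp'] > current['timestamp']:\n",
            "            latest[r['id']] = r\n",
            "    return list(latest.values())\n",
            "\n",
            "\n",
            "historical_rows = to_historical(feature_rows)\n",
            "online_rows = to_online(feature_rows)\n",
            "print(len(historical_rows), len(online_rows))"
         ],
         "metadata":{
            
         },
         "outputs":[
            
         ],
         "execution_count":null
      }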
   ],
   "metadata":{
      "kernelspec":{
         "display_name":"Python 3",
         "language":"python",
         "name":"python3"
      },
      "language_info":{
         "mimetype":"text/x-python",
         "name":"python",
         "pygments_lexer":"ipython3",
         "codemirror_mode":{
            "name":"ipython",
            "version":3
         },
         "version":"3.6.8",
         "nbconvert_exporter":"python",
         "file_extension":".py"
      },
      "name":"preprocessing",
      "notebookId":1762639776163919
   },
   "nbformat":4,
   "nbformat_minor":0
}