Metadata-Version: 2.1
Name: dagster
Version: 0.1.2
Summary: Dagster is an opinionated pipeline runner.
Home-page: https://github.com/dagster-io/dagster
Author: Elementl
Author-email: schrockn@elementl.com
License: Apache-2.0
Platform: UNKNOWN
Classifier: Programming Language :: Python :: 3.7
Classifier: License :: OSI Approved :: Apache Software License
Classifier: Operating System :: OS Independent
Requires-Dist: enum34 (>=1.1.6)
Requires-Dist: future (>=0.16.0)
Requires-Dist: click (>=6.7)
Requires-Dist: coloredlogs (>=10.0)
Requires-Dist: graphviz (>=0.8.3)
Requires-Dist: pyyaml (>=3.12)
Requires-Dist: six (>=1.11.0)
Requires-Dist: toposort (>=1.5)
Requires-Dist: pandas (>=0.22.0)
Requires-Dist: pyarrow (>=0.8.0)
Requires-Dist: sqlalchemy (>=1.2.7)
Requires-Dist: jinja2 (>=2.10)

Dagster
=======

.. docs-include

Dagster is a library that helps you organize, run and test your data processing pipelines. Instead of writing ad-hoc data processing scripts, Dagster encourages you to separate data input, data processing and data output, and to split your processing into several steps.


The core abstraction in Dagster is a **Solid**, a logical unit of computation in a data pipeline. At its core, a Solid is a coarse-grained pure function: it takes one or more inputs, performs a computation, and produces an output. Inputs can come either from the outputs of upstream solids in the pipeline or from external sources (e.g. files). Likewise, the output of the transform can either flow to downstream solids or be materialized (e.g. to a file in an object store) so that it can be inspected or accessed by an external system.

A solid formally separates the notions of inputs, outputs, and the core transform. The goal is to cleanly separate the domain logic of each transform from its operational environment. This allows solids, or pipelines of solids, to be executed in a number of environments: unit tests, local development, integration tests, CI/CD, staging, production, and so forth.
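
The separation can be illustrated without dagster itself. In the minimal sketch below, the transform knows nothing about where its rows come from, so a unit test can pass rows directly while a "local" run parses them from CSV text standing in for a file. The ``Solid`` class and the ``read_csv_rows`` helper are hypothetical illustrations, not dagster's API.

.. code-block:: python

  import csv
  import io
  from dataclasses import dataclass
  from typing import Callable, Dict, List

  Row = Dict[str, int]


  @dataclass
  class Solid:
      """Hypothetical stand-in for a solid: a name, input names and a pure transform."""
      name: str
      input_names: List[str]
      transform: Callable[..., List[Row]]


  def add_sum(num: List[Row]) -> List[Row]:
      # Pure domain logic: no file paths, no I/O, no environment lookups.
      return [{**row, "sum": row["num1"] + row["num2"]} for row in num]


  sum_solid = Solid(name="sum", input_names=["num"], transform=add_sum)


  def read_csv_rows(text: str) -> List[Row]:
      # Hypothetical "source": turns CSV text into rows of integers.
      return [{k: int(v) for k, v in row.items()}
              for row in csv.DictReader(io.StringIO(text))]


  # Environment 1: a unit test passes in-memory rows directly.
  test_result = sum_solid.transform(num=[{"num1": 1, "num2": 2}])

  # Environment 2: a local run obtains the same input from CSV data.
  csv_text = "num1,num2\n1,2\n3,4\n"
  local_result = sum_solid.transform(num=read_csv_rows(csv_text))

Only the way the input is obtained changes between the two runs; ``add_sum`` is identical in both.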

Alongside the core abstraction, Dagster provides helpers to create Solids that operate on Pandas dataframes and SQL databases.

Example
-------



.. code-block:: python


  import pandas as pd
  import dagster.core
  from dagster.core.definitions import (
      InputDefinition,
      OutputDefinition
  )
  from dagster.core.decorators import (
      solid,
      source,
      materialization,
  )

  # Sources define *how* to get data. Let's define a source that
  # reads a CSV file and returns a pandas dataframe
  @source(
      # Name of the source
      name="CSV",
      # What arguments it should get from the environment
      argument_def_dict={'path': dagster.core.types.PATH},
  )
  def csv_to_dataframe_source(path):
      return pd.read_csv(path)

  # Materializations define how to convert a transform result
  # into an artifact, for example a file. Let's define one that
  # outputs a CSV file. Note that materializations defined for
  # solids are optional and are only triggered if the
  # environment requests them
  @materialization(
      # Name of the materialization
      name="CSV",
      # What arguments it should get from the environment
      argument_def_dict={'path': dagster.core.types.PATH},
  )
  def dataframe_to_csv_materialization(data, path):
      data.to_csv(path)

  # Solids can be created by decorating a transform function
  @solid(
      # Input definitions describe the arguments passed to the
      # transform function
      inputs=[
          InputDefinition(
              name='num',
              # Inputs can have sources. While inputs define
              # *what* the transform should receive, sources define
              # *how* this data should be retrieved
              sources=[csv_to_dataframe_source]
          )
      ],
      # The output definition describes what the solid returns
      output=OutputDefinition(materializations=[
          dataframe_to_csv_materialization,
      ])
  )
  def sum_solid(num):
      sum_df = num.copy()
      # Add a new column to the dataframe holding the sum of the
      # num1 and num2 columns
      sum_df['sum'] = sum_df['num1'] + sum_df['num2']
      return sum_df


  @solid(
      inputs=[
          # This input is a *dependency*: it receives the result
          # of the previous solid
          InputDefinition(name="sum", depends_on=sum_solid)
      ],
      output=OutputDefinition(materializations=[
          dataframe_to_csv_materialization,
      ])
  )
  def sum_sq_solid(sum):
      sum_sq = sum.copy()
      sum_sq['sum_sq'] = sum['sum']**2
      return sum_sq


  # After defining the solids, we group them into a pipeline
  pipeline = dagster.core.pipeline(
      name='pandas_hello_world',
      solids=[
          sum_solid,
          sum_sq_solid,
      ],
  )

You might notice that no actual CSV file is specified for the inputs. Such parameters are passed in through the environment, which lets you customize them at runtime. To run the pipeline, we pass that environment to the execution function.

.. code-block:: python

  import dagster
  from dagster import config

  environment = config.Environment(
      sources={
          'num': config.Source(name='CSV', args={'path': 'path/to/num.csv'})
      }
  )

  pipeline_result = dagster.execute_pipeline(
      dagster.context(),
      pipeline,
      environment
  )


We can simplify the above example by using built-in dagster pandas inputs and outputs.

.. code-block:: python

  import dagster.core
  from dagster import config
  from dagster.core.decorators import solid
  import dagster.pandas_kernel as dagster_pd

  @solid(
      inputs=[
          # We are using a pre-made input that should be a dataframe
          dagster_pd.dataframe_input(
              'num',
              sources=[
                  # A built-in pandas csv-dataframe source reads
                  # a CSV file and produces a pandas dataframe
                  dagster_pd.csv_dataframe_source()
              ]
          )
      ],
      # This built-in dataframe knows how to materialize dataframes
      # out of the box
      output=dagster_pd.dataframe_output()
  )
  def sum_solid(num):
      sum_df = num.copy()
      # Add a new column to the dataframe holding the sum of num1 and num2
      sum_df['sum'] = sum_df['num1'] + sum_df['num2']
      return sum_df


  @solid(
      inputs=[
          # This input checks that the upstream solid outputs a
          # dataframe
          dagster_pd.dataframe_dependency(name="sum", solid=sum_solid)
      ],
      output=dagster_pd.dataframe_output()
  )
  def sum_sq_solid(sum):
      sum_sq = sum.copy()
      sum_sq['sum_sq'] = sum['sum']**2
      return sum_sq

  # After defining the solids, we group them into a pipeline
  pipeline = dagster.core.pipeline(
      name='pandas_hello_world',
      solids=[
          sum_solid,
          sum_sq_solid,
      ],
  )

We can also specify materializations in order to get artifacts for the results. The output of any solid can be materialized, which is useful for checking whether intermediate results make sense.

.. code-block:: python

  environment = config.Environment(
      sources={
          'num': config.Source(name='CSV', args={'path': 'path/to/num.csv'})
      }
  )

  # Write each solid's output to its own file so that one
  # materialization does not overwrite the other
  materializations = [
      config.Materialization(
          solid='sum',
          materialization_type='CSV',
          args={'path': 'path/to/sum.csv'},
      ),
      config.Materialization(
          solid='sum_sq',
          materialization_type='CSV',
          args={'path': 'path/to/sum_sq.csv'},
      )
  ]

  dagster.materialize_pipeline(
      dagster.context(),
      pipeline,
      environment,
      materializations,
  )
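
Conceptually, a materialization is applied to a solid's in-memory result after the transform has run, and only for the solids the caller names. The sketch below models that step with the standard ``csv`` module; ``materialize_csv``, the ``results`` dict and its values are hypothetical illustrations, not dagster code, and each solid is written to a separate file.

.. code-block:: python

  import csv
  import os
  import tempfile
  from typing import Dict, List

  Row = Dict[str, int]


  def materialize_csv(rows: List[Row], path: str) -> None:
      # Hypothetical CSV materialization: header from the first row's keys,
      # so this sketch assumes at least one row.
      with open(path, "w", newline="") as f:
          writer = csv.DictWriter(f, fieldnames=list(rows[0].keys()))
          writer.writeheader()
          writer.writerows(rows)


  # In-memory results of an execution, keyed by solid name (illustrative values).
  results = {
      "sum": [{"num1": 1, "num2": 2, "sum": 3}],
      "sum_sq": [{"num1": 1, "num2": 2, "sum": 3, "sum_sq": 9}],
  }

  # Only the solids listed here are written out; the rest stay in memory.
  requested = {"sum": "sum.csv", "sum_sq": "sum_sq.csv"}

  out_dir = tempfile.mkdtemp()
  for solid_name, filename in requested.items():
      materialize_csv(results[solid_name], os.path.join(out_dir, filename))

Dropping an entry from ``requested`` skips that artifact without changing any transform.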


Dagster CLI
===========

In addition to the programmatic API, you can also use the dagster CLI to run pipelines. In that case, the environment is specified through YAML configuration files.

The folder structure should be as follows.

.. code-block:: text

  pipeline_project_name/
    pipelines.yml
    pipeline_module_1/
      env.yml
    pipeline_module_2/
      env.yml

``pipelines.yml`` specifies the pipelines that are present in the current project, and each ``env.yml`` specifies the environment for a particular pipeline.

.. code-block:: yaml

  pipelines:
    - module: pipeline_project_name.pipeline_module_1.pipeline
      fn: define_pipeline
    - module: pipeline_project_name.pipeline_module_2.pipeline
      fn: define_pipeline


.. code-block:: yaml

  environment:
    inputs:
      - input_name: num
        source: CSV
        args:
          path: "input/num.csv"

  materializations:
    - solid: sum
      type: CSV
      args:
        path: 'sum.csv'
    - solid: sum_sq
      type: CSV
      args:
        path: 'sum_sq.csv'
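
Each ``pipelines.yml`` entry pairs an importable module with the name of a function that builds the pipeline. A loader for such entries could look roughly like the sketch below. ``load_pipelines`` is a hypothetical helper, not the dagster CLI's actual code, and the config is written as the Python structure a YAML parser would return, so the example needs only the standard library.

.. code-block:: python

  import importlib
  import sys
  import types
  from typing import Any, Dict, List


  def load_pipelines(project_config: Dict[str, List[Dict[str, str]]]) -> List[Any]:
      """Import each module listed under 'pipelines' and call its 'fn'."""
      pipelines = []
      for entry in project_config["pipelines"]:
          module = importlib.import_module(entry["module"])
          define_fn = getattr(module, entry["fn"])
          pipelines.append(define_fn())
      return pipelines


  # Hypothetical stand-in for a project module, registered in memory so the
  # example runs without creating files. A real project would import from disk.
  demo_module = types.ModuleType("demo_pipeline_module")
  demo_module.define_pipeline = lambda: "demo pipeline"
  sys.modules["demo_pipeline_module"] = demo_module

  # The structure a YAML parser would produce for a one-entry pipelines.yml.
  project_config = {
      "pipelines": [{"module": "demo_pipeline_module", "fn": "define_pipeline"}]
  }
  loaded = load_pipelines(project_config)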


.. code-block:: sh

    pip install dagster
    # List pipelines
    dagster pipeline list
    # Print info about pipeline solids
    dagster pipeline print pipeline1
    # Execute pipeline
    dagster pipeline execute pipeline1
    # Execute pipeline from intermediate step
    dagster pipeline execute pipeline1 --from-solid SOLID_NAME


Concepts
========

Transform
---------

This is the core, user-defined transform that performs the logical data
computation. In the examples in this document, the transform is a
function such as ``sum_transform``, which is passed to the solid as its
``transform_fn`` parameter. It takes one or more inputs and produces an
output. All other concepts in a solid are the metadata and structure
that surround this core computation.

Inputs
---------

For each argument to the transform function, there is one
``InputDefinition`` object. Its name must match the name of the
corresponding parameter of the transform function. An input definition
specifies a name, an optional dependency (the upstream solid that
produces its value, see below) and any number of sources. Every input
definition must specify a dependency, at least one source, or both.
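
Both rules can be checked mechanically. The sketch below does so with ``inspect.signature`` from the standard library; ``SimpleInput`` and ``check_inputs`` are hypothetical names used for illustration, not dagster classes or functions.

.. code-block:: python

  import inspect
  from dataclasses import dataclass, field
  from typing import Callable, List, Optional


  @dataclass
  class SimpleInput:
      """Hypothetical input definition: a name, an optional dependency, sources."""
      name: str
      depends_on: Optional[str] = None
      sources: List[str] = field(default_factory=list)


  def check_inputs(transform_fn: Callable, inputs: List[SimpleInput]) -> None:
      # Input names must match the transform's parameter names exactly.
      params = list(inspect.signature(transform_fn).parameters)
      names = [inp.name for inp in inputs]
      if sorted(params) != sorted(names):
          raise ValueError(f"inputs {names} do not match parameters {params}")
      # Each input needs somewhere to get its value from.
      for inp in inputs:
          if inp.depends_on is None and not inp.sources:
              raise ValueError(f"input {inp.name!r} needs a dependency or a source")


  def sum_transform(num_df):
      return num_df


  check_inputs(sum_transform, [SimpleInput("num_df", sources=["CSV"])])  # passes

A misspelled input name, or an input with neither a dependency nor a source, raises ``ValueError`` before anything runs.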

Sources
^^^^^^^

Sources are the way to create an input to a transform from external
resources. A source is a function that takes a set of arguments and
produces a value that is passed to the transform for processing. In
this example, a CSV file is the only source type. One can imagine adding
other source types that create pandas dataframes from JSON, Parquet, and
so forth. End users will typically rely on pre-existing libraries to
specify sources.

Sources also declare what arguments they expect. These declarations are
inspectable and could be used to render information for web or command
line tools, to verify the validity of config files, and in other tooling
contexts. When solids or pipelines of solids are executed, the framework
verifies that the arguments abide by the declaration. The arguments are
then sent to the source function in the ``arg_dict`` parameter.
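
Declaring arguments up front lets a framework reject a bad configuration before the source function runs. The sketch below shows such a check, assuming a declaration that simply maps each argument name to a Python type (dagster's own type objects, such as ``types.PATH``, are richer than this). ``call_source`` and ``describe_path`` are hypothetical.

.. code-block:: python

  from typing import Any, Callable, Dict


  def call_source(
      argument_def_dict: Dict[str, type],
      source_fn: Callable[[Dict[str, Any]], Any],
      arg_dict: Dict[str, Any],
  ) -> Any:
      """Validate arg_dict against the declaration, then invoke the source."""
      missing = set(argument_def_dict) - set(arg_dict)
      extra = set(arg_dict) - set(argument_def_dict)
      if missing or extra:
          raise ValueError(
              f"missing args {sorted(missing)}, unexpected args {sorted(extra)}")
      for name, expected_type in argument_def_dict.items():
          if not isinstance(arg_dict[name], expected_type):
              raise TypeError(f"argument {name!r} must be {expected_type.__name__}")
      # The arguments reach the source function as a single dict.
      return source_fn(arg_dict)


  def describe_path(arg_dict: Dict[str, Any]) -> str:
      # Hypothetical source: reports the path it would read instead of touching disk.
      return f"would read {arg_dict['path']}"


  message = call_source({"path": str}, describe_path, {"path": "path/to/num.csv"})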

Output
---------

The ``OutputDefinition`` represents the output of the transform
function.

Materializations
^^^^^^^^^^^^^^^^

Materializations are the counterpart of sources: they specify how the
output of a transform can be materialized. In this example, which uses
pandas dataframes, the sources and materializations are symmetrical.
In the above example we specified a single materialization, a CSV. One
could expand this to include JSON, Parquet, or other materializations as
appropriate.

However, in other contexts this symmetry might not hold. Take solids
that operate on SQL-based data warehouses or similar systems. The core
transform would be the SQL command, but the materialization would
specify a ``CREATE TABLE AS``, a table append, or a partition creation,
as examples.
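
To make the asymmetry concrete: the transform can be a single ``SELECT``, while the materialization decides how its rows are persisted. The sketch below uses the standard library's ``sqlite3`` module as a stand-in warehouse and covers the first two strategies; the table names and the ``materialize`` helper are hypothetical.

.. code-block:: python

    import sqlite3

    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE num (num1 INTEGER, num2 INTEGER)")
    conn.executemany("INSERT INTO num VALUES (?, ?)", [(1, 2), (3, 4)])

    # The core transform is just a query.
    TRANSFORM_SQL = "SELECT num1, num2, num1 + num2 AS num_sum FROM num"


    def materialize(conn, table: str, mode: str) -> None:
        """Persist the transform's rows: 'create' builds a table, 'append' inserts.

        The table name is interpolated directly, so it must come from trusted config.
        """
        if mode == "create":
            conn.execute(f"CREATE TABLE {table} AS {TRANSFORM_SQL}")
        elif mode == "append":
            conn.execute(f"INSERT INTO {table} {TRANSFORM_SQL}")
        else:
            raise ValueError(f"unknown materialization mode: {mode}")


    materialize(conn, "sum_table", "create")  # CREATE TABLE AS
    materialize(conn, "sum_table", "append")  # table append
    row_count = conn.execute("SELECT COUNT(*) FROM sum_table").fetchone()[0]

The ``SELECT`` never changes; only the statement wrapped around it does.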

Higher-level APIs
------------------

These definitions will typically be composed with higher-level APIs. For
example, the above solid could be expressed using APIs provided by the
pandas kernel. (Note: the "kernel" terminology is not settled.)

.. code-block:: python

    import dagster
    import dagster.pandas_kernel as dagster_pd
    from dagster.core.definitions import SolidDefinition

    def sum_transform(num_df):
        num_df['sum'] = num_df['num1'] + num_df['num2']
        return num_df

    sum_solid = SolidDefinition(
        name='sum',
        description='This computes the sum of two numbers.',
        inputs=[dagster_pd.dataframe_csv_input(name='num_df')],
        transform_fn=sum_transform,
        output=dagster_pd.dataframe_output(),
    )

Execution
---------

These definitions are of little use unless you can execute them. To
execute a solid, you need to package it into a pipeline.

.. code-block:: python

    pipeline = dagster.pipeline(name='hello_world', solids=[sum_solid])

Then you can execute it by providing an environment. You must provide
enough source data to create all the inputs necessary for the pipeline.

.. code-block:: python

    from dagster import config

    environment = config.Environment(
        sources={
            'num_df': config.Source(name='CSV', args={'path': 'path/to/input.csv'})
        }
    )

    pipeline_result = dagster.execute_pipeline(
        dagster.context(),
        pipeline,
        environment
    )

    print(pipeline_result.result_named('sum').transformed_value)

``execute_pipeline`` performs a purely in-memory transform and
materializes nothing. This is useful in testing and CI/CD contexts.
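
The requirement that the environment supply every input that no upstream solid provides can be pictured as a lookup from input name to a named source plus its arguments. The sketch below is a hypothetical, dependency-free model of that resolution step: ``SOURCES``, ``resolve_inputs`` and the tuple-based environment are illustrative assumptions, not dagster's implementation, and a list-returning source replaces the CSV source so nothing touches disk.

.. code-block:: python

    from typing import Any, Callable, Dict, List, Tuple

    # Hypothetical registry of source functions, keyed by source name.
    # Each source receives its arguments as a single dict.
    SOURCES: Dict[str, Callable[[Dict[str, Any]], Any]] = {
        "LIST": lambda arg_dict: list(arg_dict["values"]),
    }

    # Hypothetical environment: input name -> (source name, source arguments).
    Environment = Dict[str, Tuple[str, Dict[str, Any]]]


    def resolve_inputs(required_inputs: List[str], environment: Environment) -> Dict[str, Any]:
        """Build every required input from the environment, failing on gaps."""
        missing = [name for name in required_inputs if name not in environment]
        if missing:
            raise ValueError(f"environment does not provide inputs: {missing}")
        values = {}
        for name in required_inputs:
            source_name, args = environment[name]
            values[name] = SOURCES[source_name](args)
        return values


    env = {"num_df": ("LIST", {"values": [1, 2, 3]})}
    inputs = resolve_inputs(["num_df"], env)

In-memory execution then amounts to calling each transform with these values; nothing is written out unless materializations are requested separately.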

Materialization
----------------

In order to produce outputs that are available to external systems, you
must materialize them. In this case, that means producing files. In
addition to your environment, you must specify your materializations.

.. code-block:: python

    materializations = [
        config.Materialization(
            solid='sum',
            materialization_type='CSV',
            args={'path': 'path/to/output.csv'},
        )
    ]

    dagster.materialize_pipeline(
        dagster.context(),
        pipeline,
        environment,
        materializations,
    )

Dependencies
------------

So far we have demonstrated a single-stage pipeline, which is obviously
of limited value.

Imagine we wanted to add another stage that takes the sum we produced
and squares that value. (Fancy!)

.. code-block:: python

    import pandas as pd
    import dagster.pandas_kernel as dagster_pd
    from dagster.core import types
    from dagster.core.definitions import (
        InputDefinition,
        SolidDefinition,
        SourceDefinition,
    )

    def sum_sq_transform(sum_df):
        sum_df['sum_sq'] = sum_df['sum'] * sum_df['sum']
        return sum_df

    # Fully expanded InputDefinition. In practice this would be wrapped
    # in a higher-level API; it is spelled out here for explanation.
    sum_sq_input = InputDefinition(
        name='sum_df',
        sources=[
            SourceDefinition(
                source_type='CSV',
                argument_def_dict={'path': types.PATH},
                source_fn=lambda arg_dict: pd.read_csv(arg_dict['path']),
            ),
        ],
        depends_on=sum_solid,
    )

    sum_sq_solid = SolidDefinition(
        name='sum_sq',
        inputs=[sum_sq_input],
        transform_fn=sum_sq_transform,
        output=dagster_pd.dataframe_output(),
    )

Note that the input specifies a dependency. This means that the value
passed to the transform can be generated by an upstream dependency OR by
an external source. This allows the solid to be executed in isolation or
in the context of a pipeline.

.. code-block:: python

    pipeline = dagster.pipeline(solids=[sum_solid, sum_sq_solid])

    environment = config.Environment(
        sources={
            'num_df' : config.Source(name='CSV', args={'path': 'path/to/num.csv'})
        }
    )

    pipeline_result = dagster.execute_pipeline(
        dagster.context(),
        pipeline,
        environment
    )

The above executed both solids, even though only one input was
provided. The input into ``sum_sq_solid`` was provided by the upstream
result from the output of ``sum_solid``.
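
That behaviour, where each input is filled from an upstream result when that solid has run and otherwise from an external source, can be modelled with a topological sort. The sketch below uses ``graphlib`` from the Python 3.9+ standard library (dagster itself lists the ``toposort`` package among its requirements). ``run_pipeline`` and the dict-based solid description are hypothetical.

.. code-block:: python

    from graphlib import TopologicalSorter
    from typing import Any, Dict, Optional


    def run_pipeline(
        solids: Dict[str, Dict[str, Any]],
        external_inputs: Optional[Dict[str, Any]] = None,
    ) -> Dict[str, Any]:
        """Run solids in dependency order and return each solid's result.

        Each solid is {"fn": callable, "inputs": {input_name: upstream_name_or_None}}.
        An input comes from the upstream result if that solid ran, else from
        external_inputs.
        """
        external_inputs = external_inputs or {}
        # Only dependencies on solids that are part of this run become graph edges.
        graph = {
            name: {dep for dep in spec["inputs"].values() if dep in solids}
            for name, spec in solids.items()
        }
        results: Dict[str, Any] = {}
        for name in TopologicalSorter(graph).static_order():
            spec = solids[name]
            kwargs = {}
            for input_name, upstream in spec["inputs"].items():
                if upstream in results:
                    kwargs[input_name] = results[upstream]
                else:
                    kwargs[input_name] = external_inputs[input_name]
            results[name] = spec["fn"](**kwargs)
        return results


    solids = {
        "sum": {"fn": lambda num_df: [a + b for a, b in num_df],
                "inputs": {"num_df": None}},
        "sum_sq": {"fn": lambda sum_df: [s * s for s in sum_df],
                   "inputs": {"sum_df": "sum"}},
    }

    # Whole pipeline: only the first input comes from outside.
    full = run_pipeline(solids, {"num_df": [(1, 2), (3, 4)]})
    # The second solid alone, fed from an external source instead of "sum".
    alone = run_pipeline({"sum_sq": solids["sum_sq"]}, {"sum_df": [5]})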

You can also execute subsets of the pipeline. Given the above pipeline,
you could specify that you only want to execute the first solid:

.. code-block:: python

    environment = config.Environment(
        sources={
            'num_df' : config.Source(name='CSV', args={'path': 'path/to/num.csv'})
        }
    )

    pipeline_result = dagster.execute_pipeline(
        dagster.context(),
        pipeline,
        environment,
        through=['sum'],
    )

Or you could execute just the second solid. In that case the environment
has to change: the ``sum_df`` input must now come from an external
source, because the upstream ``sum`` solid does not run.

.. code-block:: python

    environment = config.Environment(
        sources={
            'sum_df' : config.Source(name='CSV', args={'path': 'path/to/sum.csv'})
        }
    )

    pipeline_result = dagster.execute_pipeline(
        dagster.context(),
        pipeline,
        environment,
        from=['sum'],
        through=['sum_sq'],
    )
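
Selecting a subset such as ``through=['sum']`` can be thought of as keeping the named solids plus everything upstream of them. The sketch below computes that selection over a plain dependency map. It assumes ``through`` is inclusive, as the first subset example suggests; dagster's exact semantics for ``from`` and ``through`` may differ, and ``solids_through`` is a hypothetical helper.

.. code-block:: python

    from typing import Dict, Iterable, List, Set


    def solids_through(deps: Dict[str, List[str]], through: Iterable[str]) -> Set[str]:
        """Return the named solids and all of their transitive upstream dependencies."""
        selected: Set[str] = set()
        stack = list(through)
        while stack:
            name = stack.pop()
            if name not in selected:
                selected.add(name)
                stack.extend(deps.get(name, []))
        return selected


    # Dependency map: solid name -> names of the solids it depends on.
    deps = {"sum": [], "sum_sq": ["sum"]}

    first_only = solids_through(deps, ["sum"])     # {"sum"}
    everything = solids_through(deps, ["sum_sq"])  # {"sum", "sum_sq"}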

Expectations
------------

Expectations are another reason to introduce logical seams between data
computations. They are a way to perform data quality tests or
statistical process control on data pipelines.
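
Since the expectation API is not described here yet, the following is only a generic sketch of the idea: a named predicate evaluated against a solid's output, with failures reported instead of being passed silently downstream. ``Expectation`` and ``check_expectations`` are hypothetical, and the second row of the sample data is deliberately wrong.

.. code-block:: python

    from dataclasses import dataclass
    from typing import Any, Callable, List


    @dataclass
    class Expectation:
        """Hypothetical data-quality check: a name and a predicate over the output."""
        name: str
        predicate: Callable[[Any], bool]


    def check_expectations(output: Any, expectations: List[Expectation]) -> List[str]:
        """Return the names of the expectations that the output fails."""
        return [e.name for e in expectations if not e.predicate(output)]


    # 3 + 4 is not 8, so the second row violates the sum expectation.
    sum_rows = [{"num1": 1, "num2": 2, "sum": 3}, {"num1": 3, "num2": 4, "sum": 8}]
    expectations = [
        Expectation("non_empty", lambda rows: len(rows) > 0),
        Expectation("sum_is_correct",
                    lambda rows: all(r["sum"] == r["num1"] + r["num2"] for r in rows)),
    ]
    failures = check_expectations(sum_rows, expectations)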

TODO: Complete this section when the APIs and functionality are more
fleshed out.


