Source code for airflow.contrib.operators.dataflow_operator

# -*- coding: utf-8 -*-
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import copy
import re

from airflow.contrib.hooks.gcp_dataflow_hook import DataFlowHook
from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults


[docs]class DataFlowJavaOperator(BaseOperator): """ Start a Java Cloud DataFlow batch job. The parameters of the operation will be passed to the job. It's a good practice to define dataflow_* parameters in the default_args of the dag like the project, zone and staging location. ``` default_args = { 'dataflow_default_options': { 'project': 'my-gcp-project', 'zone': 'europe-west1-d', 'stagingLocation': 'gs://my-staging-bucket/staging/' } } ``` You need to pass the path to your dataflow as a file reference with the ``jar`` parameter, the jar needs to be a self executing jar. Use ``options`` to pass on options to your job. ``` t1 = DataFlowOperation( task_id='datapflow_example', jar='{{var.value.gcp_dataflow_base}}pipeline/build/libs/pipeline-example-1.0.jar', options={ 'autoscalingAlgorithm': 'BASIC', 'maxNumWorkers': '50', 'start': '{{ds}}', 'partitionType': 'DAY' }, dag=my-dag) ``` Both ``jar`` and ``options`` are templated so you can use variables in them. """ template_fields = ['options', 'jar'] ui_color = '#0273d4' @apply_defaults def __init__( self, jar, dataflow_default_options=None, options=None, gcp_conn_id='google_cloud_default', delegate_to=None, *args, **kwargs): """ Create a new DataFlowJavaOperator. Note that both dataflow_default_options and options will be merged to specify pipeline execution parameter, and dataflow_default_options is expected to save high-level options, for instances, project and zone information, which apply to all dataflow operators in the DAG. For more detail on job submission have a look at the reference: https://cloud.google.com/dataflow/pipelines/specifying-exec-params :param jar: The reference to a self executing DataFlow jar. :type jar: string :param dataflow_default_options: Map of default job options. :type dataflow_default_options: dict :param options: Map of job specific options. :type options: dict :param gcp_conn_id: The connection ID to use connecting to Google Cloud Platform. :type gcp_conn_id: string :param delegate_to: The account to impersonate, if any. For this to work, the service account making the request must have domain-wide delegation enabled. :type delegate_to: string """ super(DataFlowJavaOperator, self).__init__(*args, **kwargs) dataflow_default_options = dataflow_default_options or {} options = options or {} self.gcp_conn_id = gcp_conn_id self.delegate_to = delegate_to self.jar = jar self.dataflow_default_options = dataflow_default_options self.options = options def execute(self, context): hook = DataFlowHook(gcp_conn_id=self.gcp_conn_id, delegate_to=self.delegate_to) dataflow_options = copy.copy(self.dataflow_default_options) dataflow_options.update(self.options) hook.start_java_dataflow(self.task_id, dataflow_options, self.jar)
class DataFlowPythonOperator(BaseOperator): @apply_defaults def __init__( self, py_file, py_options=None, dataflow_default_options=None, options=None, gcp_conn_id='google_cloud_default', delegate_to=None, *args, **kwargs): """ Create a new DataFlowPythonOperator. Note that both dataflow_default_options and options will be merged to specify pipeline execution parameter, and dataflow_default_options is expected to save high-level options, for instances, project and zone information, which apply to all dataflow operators in the DAG. For more detail on job submission have a look at the reference: https://cloud.google.com/dataflow/pipelines/specifying-exec-params :param py_file: Reference to the python dataflow pipleline file, e.g., /some/local/file/path/to/your/python/pipeline/file.py. :type py_file: string :param py_options: Additional python options. :type pyt_options: list of strings, e.g., ["-m", "-v"]. :param dataflow_default_options: Map of default job options. :type dataflow_default_options: dict :param options: Map of job specific options. :type options: dict :param gcp_conn_id: The connection ID to use connecting to Google Cloud Platform. :type gcp_conn_id: string :param delegate_to: The account to impersonate, if any. For this to work, the service account making the request must have domain-wide delegation enabled. :type delegate_to: string """ super(DataFlowPythonOperator, self).__init__(*args, **kwargs) self.py_file = py_file self.py_options = py_options or [] self.dataflow_default_options = dataflow_default_options or {} self.options = options or {} self.gcp_conn_id = gcp_conn_id self.delegate_to = delegate_to def execute(self, context): """Execute the python dataflow job.""" hook = DataFlowHook(gcp_conn_id=self.gcp_conn_id, delegate_to=self.delegate_to) dataflow_options = self.dataflow_default_options.copy() dataflow_options.update(self.options) # Convert argument names from lowerCamelCase to snake case. camel_to_snake = lambda name: re.sub( r'[A-Z]', lambda x: '_' + x.group(0).lower(), name) formatted_options = {camel_to_snake(key): dataflow_options[key] for key in dataflow_options} hook.start_python_dataflow( self.task_id, formatted_options, self.py_file, self.py_options)