Integration
AWS: Amazon Web Services
GCP: Google Cloud Platform
Airflow has extensive support for the Google Cloud Platform. Note, however, that most hooks and operators are in the contrib section, which means they have beta status and can have breaking changes between minor releases.
BigQuery
BigQuery Operators
- BigQueryCheckOperator : Performs checks against a SQL query that will return a single row with different values.
- BigQueryValueCheckOperator : Performs a simple value check using SQL code.
- BigQueryIntervalCheckOperator : Checks that the values of metrics given as SQL expressions are within a certain tolerance of the ones from days_back before.
- BigQueryOperator : Executes BigQuery SQL queries in a specific BigQuery database.
- BigQueryToBigQueryOperator : Copies a BigQuery table to another BigQuery table.
- BigQueryToCloudStorageOperator : Transfers a BigQuery table to a Google Cloud Storage bucket.
BigQueryCheckOperator
class airflow.contrib.operators.bigquery_check_operator.BigQueryCheckOperator(sql, bigquery_conn_id='bigquery_default', *args, **kwargs)

Performs checks against BigQuery. The BigQueryCheckOperator expects a SQL query that will return a single row. Each value on that first row is evaluated using Python bool casting. If any of the values returns False, the check fails and errors out.

Note that Python bool casting evaluates the following as False:
- False
- 0
- Empty string ("")
- Empty list ([])
- Empty dictionary or set ({})
Given a query like SELECT COUNT(*) FROM foo, it will fail only if the count == 0. You can craft much more complex queries that could, for instance, check that the table has the same number of rows as the source table upstream, that the count of today's partition is greater than yesterday's partition, or that a set of metrics are less than 3 standard deviations from the 7-day average.

This operator can be used as a data quality check in your pipeline. Depending on where you put it in your DAG, you can either stop the critical path, preventing dubious data from being published, or run it on the side and receive email alerts without stopping the progress of the DAG.
Parameters:
- sql (string) – the SQL to be executed
- bigquery_conn_id – reference to the BigQuery database
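The pass/fail rule above can be sketched in plain Python. This is not the operator's code: `check_first_row` is a hypothetical helper that receives the rows a query returned and applies bool casting to every value of the first row. Treating an empty result as a failure is an assumption of this sketch.

```python
# Sketch of the check rule: every value in the first result row must be
# truthy under Python bool casting. `check_first_row` is hypothetical and
# not part of Airflow's API.


def check_first_row(records):
    """Raise ValueError if there is no row or the first row has a falsy value."""
    if not records:
        # Assumption of this sketch: an empty result counts as a failure.
        raise ValueError("The query returned no rows")
    first_row = records[0]
    if not all(bool(value) for value in first_row):
        raise ValueError("Check failed, falsy value in row: {!r}".format(first_row))
    return True


# Simulated results of `SELECT COUNT(*) FROM foo`
print(check_first_row([(42,)]))  # a non-zero count passes
try:
    check_first_row([(0,)])  # a zero count fails the check
except ValueError as err:
    print(err)
```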
BigQueryValueCheckOperator
BigQueryIntervalCheckOperator
class airflow.contrib.operators.bigquery_check_operator.BigQueryIntervalCheckOperator(table, metrics_thresholds, date_filter_column='ds', days_back=-7, bigquery_conn_id='bigquery_default', *args, **kwargs)

Checks that the values of metrics given as SQL expressions are within a certain tolerance of the ones from days_back before.
This operator constructs a query like so:

    SELECT {metrics_threshold_dict_key} FROM {table}
    WHERE {date_filter_column}=<date>

Parameters:
- table (str) – the table name
- days_back (int) – number of days between ds and the ds we want to check against. Defaults to 7 days back (days_back=-7 in the signature).
- metrics_thresholds (dict) – a dictionary of ratios indexed by metrics; for example, {'COUNT(*)': 1.5} would require a difference of 50 percent or less between the current day and the day days_back days before.
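The interval comparison can be sketched as follows. `build_metrics_query` and `interval_check` are hypothetical helpers, not the operator's code. The ratio rule max(current, reference) / min(current, reference) <= threshold is an assumption chosen to match the "50 percent or less" wording for a threshold of 1.5, and failing a metric whose value is zero is a choice made only for this sketch.

```python
# Sketch of an interval check under an assumed ratio rule.
# `build_metrics_query` and `interval_check` are hypothetical helpers.
from datetime import date, timedelta


def build_metrics_query(table, metrics, date_filter_column, day):
    """Build a query in the shape shown above for one day."""
    return "SELECT {} FROM {} WHERE {}='{}'".format(
        ", ".join(metrics), table, date_filter_column, day.isoformat()
    )


def interval_check(current, reference, metrics_thresholds):
    """Return {metric: passed}, comparing current values to reference values."""
    results = {}
    for metric, threshold in metrics_thresholds.items():
        cur, ref = current[metric], reference[metric]
        if cur == 0 or ref == 0:
            results[metric] = False  # ratio undefined; this sketch fails it
        else:
            # Symmetric ratio: a rise or a drop of the same factor scores alike.
            results[metric] = max(cur, ref) / min(cur, ref) <= threshold
    return results


thresholds = {"COUNT(*)": 1.5}
ds = date(2017, 3, 8)
reference_day = ds + timedelta(days=-7)  # days_back=-7
print(build_metrics_query("foo", list(thresholds), "ds", reference_day))
print(interval_check({"COUNT(*)": 1400}, {"COUNT(*)": 1000}, thresholds))
print(interval_check({"COUNT(*)": 1600}, {"COUNT(*)": 1000}, thresholds))
```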
BigQueryOperator
class airflow.contrib.operators.bigquery_operator.BigQueryOperator(bql, destination_dataset_table=False, write_disposition='WRITE_EMPTY', allow_large_results=False, bigquery_conn_id='bigquery_default', delegate_to=None, udf_config=False, use_legacy_sql=True, *args, **kwargs)

Executes BigQuery SQL queries in a specific BigQuery database.
BigQueryToBigQueryOperator
class airflow.contrib.operators.bigquery_to_bigquery.BigQueryToBigQueryOperator(source_project_dataset_tables, destination_project_dataset_table, write_disposition='WRITE_EMPTY', create_disposition='CREATE_IF_NEEDED', bigquery_conn_id='bigquery_default', delegate_to=None, *args, **kwargs)

Copies a BigQuery table to another BigQuery table.
BigQueryToCloudStorageOperator
class airflow.contrib.operators.bigquery_to_gcs.BigQueryToCloudStorageOperator(source_project_dataset_table, destination_cloud_storage_uris, compression='NONE', export_format='CSV', field_delimiter=',', print_header=True, bigquery_conn_id='bigquery_default', delegate_to=None, *args, **kwargs)

Transfers a BigQuery table to a Google Cloud Storage bucket.
BigQueryHook
class airflow.contrib.hooks.bigquery_hook.BigQueryHook(bigquery_conn_id='bigquery_default', delegate_to=None)

Interact with BigQuery. This hook uses the Google Cloud Platform connection.

get_pandas_df(bql, parameters=None)

Returns a Pandas DataFrame for the results produced by a BigQuery query. The DbApiHook method must be overridden because Pandas doesn't support PEP 249 connections, except for SQLite. See:
https://github.com/pydata/pandas/blob/master/pandas/io/sql.py#L447
https://github.com/pydata/pandas/issues/6900

Parameters: bql (string) – The BigQuery SQL to execute.
insert_rows(table, rows, target_fields=None, commit_every=1000)

Insertion is currently unsupported. Theoretically, you could use BigQuery's streaming API to insert rows into a table, but this hasn't been implemented.
table_exists(project_id, dataset_id, table_id)

Checks for the existence of a table in Google BigQuery.

Parameters:
- project_id (string) – The Google Cloud project in which to look for the table. The connection supplied to the hook must provide access to the specified project.
- dataset_id – The name of the dataset in which to look for the table.
- table_id (string) – The name of the table to check the existence of.
Cloud DataFlow
DataFlow Operators
DataFlowJavaOperator
class airflow.contrib.operators.dataflow_operator.DataFlowJavaOperator(jar, dataflow_default_options=None, options=None, gcp_conn_id='google_cloud_default', delegate_to=None, *args, **kwargs)

Start a Java Cloud DataFlow batch job. The parameters of the operation will be passed to the job.
It's good practice to define dataflow_* parameters, such as the project, zone, and staging location, in the default_args of the DAG.
```python
default_args = {
    'dataflow_default_options': {
        'project': 'my-gcp-project',
        'zone': 'europe-west1-d',
        'stagingLocation': 'gs://my-staging-bucket/staging/'
    }
}
```

You need to pass the path to your Dataflow pipeline as a file reference with the jar parameter; the jar needs to be a self-executing jar. Use options to pass on options to your job.

```python
t1 = DataFlowJavaOperator(
    task_id='dataflow_example',
    jar='{{var.value.gcp_dataflow_base}}pipeline/build/libs/pipeline-example-1.0.jar',
    options={
        'autoscalingAlgorithm': 'BASIC',
        'maxNumWorkers': '50',
        'start': '{{ds}}',
        'partitionType': 'DAY'
    },
    dag=my_dag)
```
Both jar and options are templated, so you can use variables in them.
```python
from datetime import datetime, timedelta

from airflow import DAG
from airflow.contrib.operators.dataflow_operator import DataFlowJavaOperator

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2016, 8, 1),
    'email': ['alex@vanboxel.be'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=30),
    # Picked up by DataFlowJavaOperator's dataflow_default_options argument
    'dataflow_default_options': {
        'project': 'my-gcp-project',
        'zone': 'us-central1-f',
        'stagingLocation': 'gs://bucket/tmp/dataflow/staging/',
    }
}

dag = DAG('test-dag', default_args=default_args)

task = DataFlowJavaOperator(
    gcp_conn_id='gcp_default',
    task_id='normalize-cal',
    jar='{{var.value.gcp_dataflow_base}}pipeline-ingress-cal-normalize-1.0.jar',
    # Job-specific options; templated, so {{ds}} is rendered for each run
    options={
        'autoscalingAlgorithm': 'BASIC',
        'maxNumWorkers': '50',
        'start': '{{ds}}',
        'partitionType': 'DAY'
    },
    dag=dag)
```
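To illustrate how the two dictionaries relate, the sketch below merges dataflow_default_options with a task's options and renders them as `--key=value` pipeline arguments. `build_dataflow_args` is hypothetical; it assumes that task-level options override the defaults and that options reach the job as command-line flags. It sorts the flags only to make the output deterministic.

```python
# Sketch: merge default Dataflow options with per-task options, assuming the
# task's options win on conflicts. `build_dataflow_args` is hypothetical.


def build_dataflow_args(dataflow_default_options, options):
    merged = dict(dataflow_default_options or {})
    merged.update(options or {})  # task-level values override defaults
    # Sorted only so that the output order is stable
    return ["--{}={}".format(key, value) for key, value in sorted(merged.items())]


defaults = {
    "project": "my-gcp-project",
    "zone": "us-central1-f",
    "stagingLocation": "gs://bucket/tmp/dataflow/staging/",
}
task_options = {"autoscalingAlgorithm": "BASIC", "maxNumWorkers": "50", "zone": "europe-west1-d"}

for arg in build_dataflow_args(defaults, task_options):
    print(arg)
```

Keeping shared settings in default_args and only job-specific values in options keeps each task definition short, while still letting a single task override a shared value such as the zone.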
Cloud DataProc
DataProc Operators
- DataProcPigOperator : Start a Pig query Job on a Cloud DataProc cluster.
- DataProcHiveOperator : Start a Hive query Job on a Cloud DataProc cluster.
- DataProcSparkSqlOperator : Start a Spark SQL query Job on a Cloud DataProc cluster.
- DataProcSparkOperator : Start a Spark Job on a Cloud DataProc cluster.
- DataProcHadoopOperator : Start a Hadoop Job on a Cloud DataProc cluster.
- DataProcPySparkOperator : Start a PySpark Job on a Cloud DataProc cluster.
DataProcPigOperator
class airflow.contrib.operators.dataproc_operator.DataProcPigOperator(query=None, query_uri=None, variables=None, job_name='{{task.task_id}}_{{ds_nodash}}', dataproc_cluster='cluster-1', dataproc_pig_properties=None, dataproc_pig_jars=None, gcp_conn_id='google_cloud_default', delegate_to=None, *args, **kwargs)

Start a Pig query Job on a Cloud DataProc cluster. The parameters of the operation will be passed to the cluster.
It's good practice to define dataproc_* parameters, such as the cluster name and UDFs, in the default_args of the DAG.
```python
default_args = {
    'dataproc_cluster': 'cluster-1',
    'dataproc_pig_jars': [
        'gs://example/udf/jar/datafu/1.2.0/datafu.jar',
        'gs://example/udf/jar/gpig/1.2/gpig.jar'
    ]
}
```

You can pass a Pig script as a string or as a file reference. Use variables to pass on variables for the Pig script to be resolved on the cluster, or use the parameters to be resolved in the script as template parameters.

```python
t1 = DataProcPigOperator(
    task_id='dataproc_pig',
    query='a_pig_script.pig',
    variables={'out': 'gs://example/output/{{ds}}'},
)
```
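The two ways of parameterizing a Pig script described above can be contrasted in a small sketch. Python's `string.Template` stands in for Pig's `$name` parameter substitution on the cluster, and `render_jinja_like` is a hypothetical, much simplified stand-in for Airflow's Jinja templating; neither is the code Airflow or Dataproc actually runs.

```python
# Sketch contrasting cluster-side variables with Airflow-side templating.
# string.Template mimics Pig's $name substitution; `render_jinja_like` is a
# hypothetical stand-in that only handles exact {{name}} placeholders.
from string import Template


def render_jinja_like(text, context):
    """Replace {{name}} placeholders with values from context."""
    for name, value in context.items():
        text = text.replace("{{" + name + "}}", value)
    return text


# Option 1: the script keeps $out; Pig fills it in on the cluster
# from the `variables` passed with the job.
script_with_variable = "STORE result INTO '$out';"
variables = {"out": "gs://example/output/2017-03-01"}
on_cluster = Template(script_with_variable).substitute(variables)

# Option 2: the script contains a template placeholder that Airflow
# resolves before submission, so Pig never sees a placeholder.
script_with_template = "STORE result INTO 'gs://example/output/{{ds}}';"
before_submit = render_jinja_like(script_with_template, {"ds": "2017-03-01"})

print(on_cluster)
print(before_submit)
```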
DataProcHiveOperator
class airflow.contrib.operators.dataproc_operator.DataProcHiveOperator(query, variables=None, job_name='{{task.task_id}}_{{ds_nodash}}', dataproc_cluster='cluster-1', dataproc_hive_properties=None, dataproc_hive_jars=None, gcp_conn_id='google_cloud_default', delegate_to=None, *args, **kwargs)

Start a Hive query Job on a Cloud DataProc cluster.
DataProcSparkSqlOperator
class airflow.contrib.operators.dataproc_operator.DataProcSparkSqlOperator(query, variables=None, job_name='{{task.task_id}}_{{ds_nodash}}', dataproc_cluster='cluster-1', dataproc_spark_properties=None, dataproc_spark_jars=None, gcp_conn_id='google_cloud_default', delegate_to=None, *args, **kwargs)

Start a Spark SQL query Job on a Cloud DataProc cluster.
DataProcSparkOperator
class airflow.contrib.operators.dataproc_operator.DataProcSparkOperator(main_jar=None, main_class=None, arguments=None, archives=None, files=None, job_name='{{task.task_id}}_{{ds_nodash}}', dataproc_cluster='cluster-1', dataproc_spark_properties=None, dataproc_spark_jars=None, gcp_conn_id='google_cloud_default', delegate_to=None, *args, **kwargs)

Start a Spark Job on a Cloud DataProc cluster.
DataProcHadoopOperator
class airflow.contrib.operators.dataproc_operator.DataProcHadoopOperator(main_jar=None, main_class=None, arguments=None, archives=None, files=None, job_name='{{task.task_id}}_{{ds_nodash}}', dataproc_cluster='cluster-1', dataproc_hadoop_properties=None, dataproc_hadoop_jars=None, gcp_conn_id='google_cloud_default', delegate_to=None, *args, **kwargs)

Start a Hadoop Job on a Cloud DataProc cluster.
DataProcPySparkOperator
class airflow.contrib.operators.dataproc_operator.DataProcPySparkOperator(main, arguments=None, archives=None, pyfiles=None, files=None, job_name='{{task.task_id}}_{{ds_nodash}}', dataproc_cluster='cluster-1', dataproc_pyspark_properties=None, dataproc_pyspark_jars=None, gcp_conn_id='google_cloud_default', delegate_to=None, *args, **kwargs)

Start a PySpark Job on a Cloud DataProc cluster.
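All DataProc operators above share the default job_name template '{{task.task_id}}_{{ds_nodash}}'. The sketch below shows what that template resolves to, assuming the standard Airflow meaning of ds_nodash (the execution date as YYYYMMDD). `default_job_name` is a hypothetical helper that reproduces only the template rendering, not any naming the hook may apply when it submits the job.

```python
# Sketch of how '{{task.task_id}}_{{ds_nodash}}' renders for one run.
# `default_job_name` is a hypothetical helper, not part of Airflow.
from datetime import date


def default_job_name(task_id, execution_date):
    ds_nodash = execution_date.strftime("%Y%m%d")  # e.g. 2017-03-01 -> 20170301
    return "{}_{}".format(task_id, ds_nodash)


print(default_job_name("dataproc_pig", date(2017, 3, 1)))  # dataproc_pig_20170301
```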
Cloud Datastore
DatastoreHook
class airflow.contrib.hooks.datastore_hook.DatastoreHook(datastore_conn_id='google_cloud_datastore_default', delegate_to=None)

Interact with Google Cloud Datastore. This hook uses the Google Cloud Platform connection.

This object is not thread-safe. If you want to make multiple requests simultaneously, you will need to create a hook per thread.
allocate_ids(partialKeys)

Allocates IDs for incomplete keys. See https://cloud.google.com/datastore/docs/apis/v1beta2/datasets/allocateIds

Parameters: partialKeys – a list of partial keys
Returns: a list of full keys.

begin_transaction()

Gets a new transaction handle. See https://cloud.google.com/datastore/docs/apis/v1beta2/datasets/beginTransaction

Returns: a transaction handle

commit(body)

Commits a transaction, optionally creating, deleting or modifying some entities. See https://cloud.google.com/datastore/docs/apis/v1beta2/datasets/commit

Parameters: body – the body of the commit request
Returns: the response body of the commit request

lookup(keys, read_consistency=None, transaction=None)

Looks up some entities by key. See https://cloud.google.com/datastore/docs/apis/v1beta2/datasets/lookup

Parameters:
- keys – the keys to look up
- read_consistency – the read consistency to use: default, strong, or eventual. Cannot be used with a transaction.
- transaction – the transaction to use, if any.

Returns: the response body of the lookup request.

rollback(transaction)

Rolls back a transaction. See https://cloud.google.com/datastore/docs/apis/v1beta2/datasets/rollback

Parameters: transaction – the transaction to roll back

run_query(body)

Runs a query for entities. See https://cloud.google.com/datastore/docs/apis/v1beta2/datasets/runQuery

Parameters: body – the body of the query request
Returns: the batch of query results.
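Because the hook is not thread-safe, one way to follow the "one hook per thread" advice is to cache a hook in `threading.local`. In this sketch `make_hook` is a hypothetical factory that returns a placeholder object; in a DAG it would construct a DatastoreHook instead.

```python
# Sketch: give every thread its own hook instance via threading.local.
# `make_hook` is a hypothetical factory standing in for DatastoreHook(...).
import threading

_local = threading.local()


def make_hook():
    """Hypothetical stand-in for constructing a new, non-thread-safe hook."""
    return object()


def get_thread_hook():
    """Return this thread's hook, creating it on first use."""
    hook = getattr(_local, "hook", None)
    if hook is None:
        hook = make_hook()
        _local.hook = hook
    return hook


hooks_by_worker = {}


def worker(worker_id):
    # Each thread creates (and would then reuse) its own hook.
    hooks_by_worker[worker_id] = get_thread_hook()


threads = [threading.Thread(target=worker, args=(i,)) for i in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(len({id(h) for h in hooks_by_worker.values()}))  # 4 distinct hooks
```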
Cloud Storage
Storage Operators
- GoogleCloudStorageDownloadOperator : Downloads a file from Google Cloud Storage.
- GoogleCloudStorageToBigQueryOperator : Loads files from Google Cloud Storage into BigQuery.
GoogleCloudStorageDownloadOperator
GoogleCloudStorageToBigQueryOperator
class airflow.contrib.operators.gcs_to_bq.GoogleCloudStorageToBigQueryOperator(bucket, source_objects, destination_project_dataset_table, schema_fields=None, schema_object=None, source_format='CSV', create_disposition='CREATE_IF_NEEDED', skip_leading_rows=0, write_disposition='WRITE_EMPTY', field_delimiter=',', max_id_key=None, bigquery_conn_id='bigquery_default', google_cloud_storage_conn_id='google_cloud_storage_default', delegate_to=None, schema_update_options=(), *args, **kwargs)

Loads files from Google Cloud Storage into BigQuery.
GoogleCloudStorageHook
class airflow.contrib.hooks.gcs_hook.GoogleCloudStorageHook(google_cloud_storage_conn_id='google_cloud_storage_default', delegate_to=None)

Interact with Google Cloud Storage. This hook uses the Google Cloud Platform connection.
download(bucket, object, filename=False)

Gets a file from Google Cloud Storage.

Parameters:
- bucket (string) – The bucket to fetch from.
- object (string) – The object to fetch.
- filename (string) – If set, a local file path where the file should be written to.
exists(bucket, object)

Checks for the existence of a file in Google Cloud Storage.

Parameters:
- bucket (string) – The Google Cloud Storage bucket where the object is.
- object (string) – The name of the object to check in the Google Cloud Storage bucket.
is_updated_after(bucket, object, ts)

Checks whether an object in Google Cloud Storage was updated after the given timestamp.

Parameters:
- bucket (string) – The Google Cloud Storage bucket where the object is.
- object (string) – The name of the object to check in the Google Cloud Storage bucket.
- ts (datetime) – The timestamp to check against.
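The comparison performed by is_updated_after can be sketched against an already-fetched metadata dictionary. The sketch assumes that the object metadata carries an RFC 3339 `updated` timestamp, as the Cloud Storage JSON API object resource does, and treats a naive ts as UTC. `object_updated_after` is hypothetical and makes no network calls.

```python
# Sketch of the "updated after ts" comparison on object metadata.
# `object_updated_after` is hypothetical; it expects a metadata dict with an
# RFC 3339 "updated" value such as "2017-03-01T12:30:00.000Z".
from datetime import datetime, timezone


def object_updated_after(metadata, ts):
    updated = datetime.strptime(metadata["updated"], "%Y-%m-%dT%H:%M:%S.%fZ")
    updated = updated.replace(tzinfo=timezone.utc)  # trailing Z means UTC
    if ts.tzinfo is None:
        ts = ts.replace(tzinfo=timezone.utc)  # assumption: naive ts is UTC
    return updated > ts


meta = {"updated": "2017-03-01T12:30:00.000Z"}
print(object_updated_after(meta, datetime(2017, 3, 1, 12, 0)))  # True
print(object_updated_after(meta, datetime(2017, 3, 1, 13, 0)))  # False
```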
upload(bucket, object, filename, mime_type='application/octet-stream')

Uploads a local file to Google Cloud Storage.

Parameters:
- bucket (string) – The bucket to upload to.
- object (string) – The object name to set when uploading the local file.
- filename (string) – The local file path to the file to be uploaded.
- mime_type (string) – The MIME type to set when uploading the file.