# -*- coding: utf-8 -*-
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import print_function
import logging
import json
import time
from pydruid.client import PyDruid
import requests
from airflow.hooks.base_hook import BaseHook
from airflow.exceptions import AirflowException
LOAD_CHECK_INTERVAL = 5
DEFAULT_TARGET_PARTITION_SIZE = 5000000
class AirflowDruidLoadException(AirflowException):
pass
[docs]class DruidHook(BaseHook):
'''
Interact with druid.
'''
def __init__(
self,
druid_query_conn_id='druid_query_default',
druid_ingest_conn_id='druid_ingest_default'):
self.druid_query_conn_id = druid_query_conn_id
self.druid_ingest_conn_id = druid_ingest_conn_id
self.header = {'content-type': 'application/json'}
[docs] def get_conn(self):
"""
Returns a druid connection object for query
"""
conn = self.get_connection(self.druid_query_conn_id)
return PyDruid(
"http://{conn.host}:{conn.port}".format(**locals()),
conn.extra_dejson.get('endpoint', ''))
@property
def ingest_post_url(self):
conn = self.get_connection(self.druid_ingest_conn_id)
host = conn.host
port = conn.port
endpoint = conn.extra_dejson.get('endpoint', '')
return "http://{host}:{port}/{endpoint}".format(**locals())
def get_ingest_status_url(self, task_id):
post_url = self.ingest_post_url
return "{post_url}/{task_id}/status".format(**locals())
[docs] def construct_ingest_query(
self, datasource, static_path, ts_dim, columns, metric_spec,
intervals, num_shards, target_partition_size,
query_granularity="NONE", segment_granularity="DAY",
hadoop_dependency_coordinates=None):
"""
Builds an ingest query for an HDFS TSV load.
:param datasource: target datasource in druid
:param columns: list of all columns in the TSV, in the right order
"""
# backward compatibilty for num_shards, but target_partition_size is the default setting
# and overwrites the num_shards
if target_partition_size == -1:
if num_shards == -1:
target_partition_size = DEFAULT_TARGET_PARTITION_SIZE
else:
num_shards = -1
metric_names = [
m['fieldName'] for m in metric_spec if m['type'] != 'count']
dimensions = [c for c in columns if c not in metric_names and c != ts_dim]
ingest_query_dict = {
"type": "index_hadoop",
"spec": {
"dataSchema": {
"metricsSpec": metric_spec,
"granularitySpec": {
"queryGranularity": query_granularity,
"intervals": intervals,
"type": "uniform",
"segmentGranularity": segment_granularity,
},
"parser": {
"type": "string",
"parseSpec": {
"columns": columns,
"dimensionsSpec": {
"dimensionExclusions": [],
"dimensions": dimensions, # list of names
"spatialDimensions": []
},
"timestampSpec": {
"column": ts_dim,
"format": "auto"
},
"format": "tsv"
}
},
"dataSource": datasource
},
"tuningConfig": {
"type": "hadoop",
"jobProperties": {
"mapreduce.job.user.classpath.first": "false",
"mapreduce.map.output.compress": "false",
"mapreduce.output.fileoutputformat.compress": "false",
},
"partitionsSpec": {
"type": "hashed",
"targetPartitionSize": target_partition_size,
"numShards": num_shards,
},
},
"ioConfig": {
"inputSpec": {
"paths": static_path,
"type": "static"
},
"type": "hadoop"
}
}
}
if hadoop_dependency_coordinates:
ingest_query_dict[
'hadoopDependencyCoordinates'] = hadoop_dependency_coordinates
return json.dumps(ingest_query_dict, indent=4)
def send_ingest_query(
self, datasource, static_path, ts_dim, columns, metric_spec,
intervals, num_shards, target_partition_size, query_granularity, segment_granularity,
hadoop_dependency_coordinates=None):
query = self.construct_ingest_query(
datasource, static_path, ts_dim, columns,
metric_spec, intervals, num_shards, target_partition_size,
query_granularity, segment_granularity, hadoop_dependency_coordinates)
r = requests.post(
self.ingest_post_url, headers=self.header, data=query)
logging.info(self.ingest_post_url)
logging.info(query)
logging.info(r.text)
d = json.loads(r.text)
if "task" not in d:
raise AirflowDruidLoadException(
"[Error]: Ingesting data to druid failed.")
return d["task"]
[docs] def load_from_hdfs(
self, datasource, static_path, ts_dim, columns,
intervals, num_shards, target_partition_size, query_granularity, segment_granularity,
metric_spec=None, hadoop_dependency_coordinates=None):
"""
load data to druid from hdfs
:param ts_dim: The column name to use as a timestamp
:param metric_spec: A list of dictionaries
"""
task_id = self.send_ingest_query(
datasource, static_path, ts_dim, columns, metric_spec,
intervals, num_shards, target_partition_size, query_granularity, segment_granularity,
hadoop_dependency_coordinates)
status_url = self.get_ingest_status_url(task_id)
while True:
r = requests.get(status_url)
d = json.loads(r.text)
if d['status']['status'] == 'FAILED':
logging.error(d)
raise AirflowDruidLoadException(
"[Error]: Ingesting data to druid failed.")
elif d['status']['status'] == 'SUCCESS':
break
time.sleep(LOAD_CHECK_INTERVAL)