Coverage for airflow.hooks.presto_hook : 38%
Hot-keys on this page
r m x p toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
|
""" Interact with Presto through PyHive!
>>> ph = PrestoHook() >>> sql = "SELECT count(1) AS num FROM airflow.static_babynames" >>> ph.get_records(sql) [[340698]] """
"""Returns a connection object""" db = self.get_connection(self.presto_conn_id) return presto.connect( host=db.host, port=db.port, username=db.login, catalog=db.extra_dejson.get('catalog', 'hive'), schema=db.schema)
def _strip_sql(sql): return sql.strip().rstrip(';')
""" Get a set of records from Presto """ try: return super(PrestoHook, self).get_records( self._strip_sql(hql), parameters) except DatabaseError as e: obj = eval(str(e)) raise PrestoException(obj['message'])
""" Returns only the first row, regardless of how many rows the query returns. """ try: return super(PrestoHook, self).get_first( self._strip_sql(hql), parameters) except DatabaseError as e: obj = eval(str(e)) raise PrestoException(obj['message'])
""" Get a pandas dataframe from a sql query. """ import pandas cursor = self.get_cursor() cursor.execute(self._strip_sql(hql), parameters) try: data = cursor.fetchall() except DatabaseError as e: obj = eval(str(e)) raise PrestoException(obj['message']) column_descriptions = cursor.description if data: df = pandas.DataFrame(data) df.columns = [c[0] for c in column_descriptions] else: df = pandas.DataFrame() return df
""" Execute the statement against Presto. Can be used to create views. """ return super(PrestoHook, self).run(self._strip_sql(hql), parameters)
raise NotImplemented() |