Source code for airflow.contrib.hooks.gcs_hook

# -*- coding: utf-8 -*-
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

import logging

from apiclient.discovery import build
from apiclient.http import MediaFileUpload
from googleapiclient import errors

from airflow.contrib.hooks.gcp_api_base_hook import GoogleCloudBaseHook

# Configure the named "google_cloud_storage" logger to emit INFO and above.
# Note: the hook below logs through the root ``logging`` functions, not
# through this named logger.
logging.getLogger(name="google_cloud_storage").setLevel(level=logging.INFO)


class GoogleCloudStorageHook(GoogleCloudBaseHook):
    """
    Interact with Google Cloud Storage. This hook uses the Google Cloud Platform
    connection.
    """

    def __init__(self,
                 google_cloud_storage_conn_id='google_cloud_storage_default',
                 delegate_to=None):
        """
        :param google_cloud_storage_conn_id: The connection id passed to
            ``GoogleCloudBaseHook`` for authentication.
        :type google_cloud_storage_conn_id: str
        :param delegate_to: Forwarded unchanged to ``GoogleCloudBaseHook``;
            see that class for its meaning.
        """
        super(GoogleCloudStorageHook, self).__init__(google_cloud_storage_conn_id,
                                                     delegate_to)

    def get_conn(self):
        """
        Returns a Google Cloud Storage service object.

        Builds a ``storage`` ``v1`` API client on top of the HTTP object returned
        by ``self._authorize()``. Nothing is cached: a new client is built on
        every call.
        """
        http_authorized = self._authorize()
        return build('storage', 'v1', http=http_authorized)

    # pylint:disable=redefined-builtin
    def download(self, bucket, object, filename=False):
        """
        Get a file from Google Cloud Storage.

        The whole object is read into memory; it is optionally also written to
        a local file.

        :param bucket: The bucket to fetch from.
        :type bucket: str
        :param object: The object to fetch.
        :type object: str
        :param filename: If set, a local file path where the file should be
            written to.
        :type filename: str
        :return: The object's contents as returned by the API client
            (``bytes`` or ``str``).
        """
        service = self.get_conn()
        downloaded_file_bytes = service \
            .objects() \
            .get_media(bucket=bucket, object=object) \
            .execute()

        # Write the file to local file path, if requested.
        if filename:
            # The client may hand back bytes or text; open the file in the
            # matching mode so the write does not fail.
            write_argument = 'wb' if isinstance(downloaded_file_bytes, bytes) else 'w'
            with open(filename, write_argument) as file_fd:
                file_fd.write(downloaded_file_bytes)

        return downloaded_file_bytes

    # pylint:disable=redefined-builtin
    def upload(self, bucket, object, filename,
               mime_type='application/octet-stream'):
        """
        Uploads a local file to Google Cloud Storage.

        :param bucket: The bucket to upload to.
        :type bucket: str
        :param object: The object name to set when uploading the local file.
        :type object: str
        :param filename: The local file path to the file to be uploaded.
        :type filename: str
        :param mime_type: The MIME type to set when uploading the file.
        :type mime_type: str
        """
        service = self.get_conn()
        media = MediaFileUpload(filename, mime_type)
        # The insert response was previously bound to an unused local; the
        # method has always returned None, so the result is simply discarded.
        service \
            .objects() \
            .insert(bucket=bucket, name=object, media_body=media) \
            .execute()

    # pylint:disable=redefined-builtin
    def exists(self, bucket, object):
        """
        Checks for the existence of a file in Google Cloud Storage.

        :param bucket: The Google cloud storage bucket where the object is.
        :type bucket: str
        :param object: The name of the object to check in the Google cloud
            storage bucket.
        :type object: str
        :return: True if the object's metadata can be fetched, False if the API
            answers with HTTP 404.
        :rtype: bool
        :raises errors.HttpError: For any HTTP error other than 404.
        """
        service = self.get_conn()
        try:
            service \
                .objects() \
                .get(bucket=bucket, object=object) \
                .execute()
        except errors.HttpError as ex:
            # 404 means "no such object"; every other HTTP failure propagates.
            if ex.resp['status'] == '404':
                return False
            raise
        return True

    # pylint:disable=redefined-builtin
    def is_updated_after(self, bucket, object, ts):
        """
        Checks if an object is updated in Google Cloud Storage.

        :param bucket: The Google cloud storage bucket where the object is.
        :type bucket: str
        :param object: The name of the object to check in the Google cloud
            storage bucket.
        :type object: str
        :param ts: The timestamp to check against. A naive timestamp is treated
            as UTC.
        :type ts: datetime
        :return: True if the object's ``updated`` metadata is strictly later than
            ``ts``. False if it is not, if the metadata has no ``updated`` field,
            or if the object does not exist (HTTP 404).
        :rtype: bool
        :raises errors.HttpError: For any HTTP error other than 404.
        """
        service = self.get_conn()
        try:
            response = (service
                        .objects()
                        .get(bucket=bucket, object=object)
                        .execute())
        except errors.HttpError as ex:
            # A missing object counts as "not updated"; other errors propagate.
            if ex.resp['status'] != '404':
                raise
            return False

        if 'updated' not in response:
            return False

        import dateutil.parser
        import dateutil.tz

        # Make ts timezone-aware so it can be compared with the parsed value.
        # NOTE(review): assumes the 'updated' string carries a UTC offset;
        # a naive parse result would make the comparison raise TypeError.
        if not ts.tzinfo:
            ts = ts.replace(tzinfo=dateutil.tz.tzutc())
        updated = dateutil.parser.parse(response['updated'])
        # Lazy %-style arguments: the message is only formatted if emitted.
        logging.info("Verify object date: %s > %s", updated, ts)
        return updated > ts