Coverage for airflow/hooks/http_hook.py : 23%
Hot-keys on this page
r m x p toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
|
# -*- coding: utf-8 -*- # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License.
""" Interact with HTTP servers. """
self.http_conn_id = http_conn_id self.method = method
# headers is required to make it required """ Returns http session for use with requests """ conn = self.get_connection(self.http_conn_id) session = requests.Session() self.base_url = conn.host if not self.base_url.startswith('http'): self.base_url = 'http://' + self.base_url
if conn.port: self.base_url = self.base_url + ":" + str(conn.port) + "/" if conn.login: session.auth = (conn.login, conn.password) if headers: session.headers.update(headers)
return session
""" Performs the request """ extra_options = extra_options or {}
session = self.get_conn(headers)
url = self.base_url + endpoint req = None if self.method == 'GET': # GET uses params req = requests.Request(self.method, url, params=data, headers=headers) else: # Others use data req = requests.Request(self.method, url, data=data, headers=headers)
prepped_request = session.prepare_request(req) logging.info("Sending '" + self.method + "' to url: " + url) return self.run_and_check(session, prepped_request, extra_options)
""" Grabs extra options like timeout and actually runs the request, checking for the result """ extra_options = extra_options or {}
response = session.send( prepped_request, stream=extra_options.get("stream", False), verify=extra_options.get("verify", False), proxies=extra_options.get("proxies", {}), cert=extra_options.get("cert"), timeout=extra_options.get("timeout"), allow_redirects=extra_options.get("allow_redirects", True))
try: response.raise_for_status() except requests.exceptions.HTTPError: # Tried rewrapping, but not supported. This way, it's possible # to get reason and code for failure by checking first 3 chars # for the code, or do a split on ':' logging.error("HTTP error: " + response.reason) if self.method != 'GET': # The sensor uses GET, so this prevents filling up the log # with the body every time the GET 'misses'. # That's ok to do, because GETs should be repeatable and # all data should be visible in the log (no post data) logging.error(response.text) raise AirflowException(str(response.status_code)+":"+response.reason) return response |