Hide keyboard shortcuts

Hot-keys on this page

r m x p   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

62

63

64

65

66

67

68

69

70

71

72

73

74

75

76

77

78

79

80

81

82

83

84

85

86

87

88

89

90

91

92

93

94

95

96

97

98

99

100

101

102

103

104

105

106

107

108

109

110

# -*- coding: utf-8 -*- 

# 

# Licensed under the Apache License, Version 2.0 (the "License"); 

# you may not use this file except in compliance with the License. 

# You may obtain a copy of the License at 

# 

# http://www.apache.org/licenses/LICENSE-2.0 

# 

# Unless required by applicable law or agreed to in writing, software 

# distributed under the License is distributed on an "AS IS" BASIS, 

# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

# See the License for the specific language governing permissions and 

# limitations under the License. 

 

from builtins import str 

import logging 

 

import requests 

 

from airflow.hooks.base_hook import BaseHook 

from airflow.exceptions import AirflowException 

 

 

class HttpHook(BaseHook):
    """
    Interact with HTTP servers.

    :param method: HTTP method used for every request sent by this hook
    :type method: str
    :param http_conn_id: id of the connection that supplies the host and,
        optionally, the port, login and password
    :type http_conn_id: str
    """

    def __init__(self, method='POST', http_conn_id='http_default'):
        self.http_conn_id = http_conn_id
        self.method = method

    def get_conn(self, headers=None):
        """
        Returns http session for use with requests

        As a side effect ``self.base_url`` is recomputed from the
        connection's host and port on every call.

        :param headers: extra headers to set on the session (optional)
        :type headers: dict
        :return: a ``requests.Session`` carrying the connection's
            credentials (when a login is set) and the given headers
        """
        conn = self.get_connection(self.http_conn_id)
        session = requests.Session()

        # Treat a missing host as empty so the scheme check below does not
        # fail on None.
        self.base_url = conn.host or ''

        # Only an explicit scheme separator means the host is already a
        # full URL. Checking for a leading "http" alone would skip bare
        # host names that merely begin with those letters, such as
        # "httpbin.org".
        if '://' not in self.base_url:
            self.base_url = 'http://' + self.base_url

        if conn.port:
            self.base_url = self.base_url + ":" + str(conn.port) + "/"
        if conn.login:
            session.auth = (conn.login, conn.password)
        if headers:
            session.headers.update(headers)

        return session

    @staticmethod
    def _join_url(base_url, endpoint):
        """
        Joins base url and endpoint with exactly one slash between them

        ``base_url`` ends with a slash only when a port was configured, so
        plain concatenation would either drop the separator or double it
        depending on the connection and on how the endpoint is written.

        :param base_url: scheme, host and optional port
        :type base_url: str
        :param endpoint: path (and optional query) to append; may be
            empty or None, in which case ``base_url`` is returned as is
        :type endpoint: str
        :return: the combined url
        """
        if not endpoint:
            return base_url

        base_has_slash = base_url.endswith('/')
        endpoint_has_slash = endpoint.startswith('/')

        if base_has_slash and endpoint_has_slash:
            return base_url + endpoint[1:]
        if not base_has_slash and not endpoint_has_slash:
            return base_url + '/' + endpoint
        return base_url + endpoint

    def run(self, endpoint, data=None, headers=None, extra_options=None):
        """
        Performs the request

        :param endpoint: path to call, relative to the connection's host
        :type endpoint: str
        :param data: sent as query parameters for GET and as the request
            body for every other method
        :param headers: extra headers for this request
        :type headers: dict
        :param extra_options: options forwarded to ``run_and_check``
            (stream, verify, proxies, cert, timeout, allow_redirects)
        :type extra_options: dict
        :return: the response returned by ``run_and_check``
        """
        extra_options = extra_options or {}

        # get_conn also sets self.base_url, so it must run before the url
        # is assembled.
        session = self.get_conn(headers)
        url = self._join_url(self.base_url, endpoint)

        if self.method == 'GET':
            # GET uses params
            req = requests.Request(self.method,
                                   url,
                                   params=data,
                                   headers=headers)
        else:
            # Others use data
            req = requests.Request(self.method,
                                   url,
                                   data=data,
                                   headers=headers)

        prepped_request = session.prepare_request(req)
        logging.info("Sending '%s' to url: %s", self.method, url)
        return self.run_and_check(session, prepped_request, extra_options)

    def run_and_check(self, session, prepped_request, extra_options):
        """
        Grabs extra options like timeout and actually runs the request,
        checking for the result

        :param session: session used to send the request
        :param prepped_request: prepared request to send
        :param extra_options: may contain ``stream``, ``verify``,
            ``proxies``, ``cert``, ``timeout`` and ``allow_redirects``
        :type extra_options: dict
        :return: the response, when its status does not signal an error
        :raises AirflowException: when ``raise_for_status`` reports an
            HTTP error; the message is ``"<status code>:<reason>"``
        """
        extra_options = extra_options or {}

        # NOTE(review): certificate verification is off unless the caller
        # passes verify explicitly. Kept as is because existing callers
        # may rely on it, but it is worth revisiting.
        response = session.send(
            prepped_request,
            stream=extra_options.get("stream", False),
            verify=extra_options.get("verify", False),
            proxies=extra_options.get("proxies", {}),
            cert=extra_options.get("cert"),
            timeout=extra_options.get("timeout"),
            allow_redirects=extra_options.get("allow_redirects", True))

        try:
            response.raise_for_status()
        except requests.exceptions.HTTPError:
            # Tried rewrapping, but not supported. This way, it's possible
            # to get reason and code for failure by checking first 3 chars
            # for the code, or do a split on ':'
            logging.error("HTTP error: %s", response.reason)
            if self.method != 'GET':
                # The sensor uses GET, so this prevents filling up the log
                # with the body every time the GET 'misses'.
                # That's ok to do, because GETs should be repeatable and
                # all data should be visible in the log (no post data)
                logging.error(response.text)
            raise AirflowException(
                str(response.status_code) + ":" + response.reason)
        return response