Hide keyboard shortcuts

Hot-keys on this page

r m x p   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

62

63

64

65

66

67

68

69

70

71

72

73

74

75

76

77

78

79

80

81

# -*- coding: utf-8 -*- 

# 

# Licensed under the Apache License, Version 2.0 (the "License"); 

# you may not use this file except in compliance with the License. 

# You may obtain a copy of the License at 

# 

# http://www.apache.org/licenses/LICENSE-2.0 

# 

# Unless required by applicable law or agreed to in writing, software 

# distributed under the License is distributed on an "AS IS" BASIS, 

# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

# See the License for the specific language governing permissions and 

# limitations under the License. 

 

import MySQLdb 

import MySQLdb.cursors 

 

from airflow.hooks.dbapi_hook import DbApiHook 

 

class MySqlHook(DbApiHook): 

''' 

Interact with MySQL. 

 

You can specify charset in the extra field of your connection 

as ``{"charset": "utf8"}``. Also you can choose cursor as 

``{"cursor": "SSCursor"}``. Refer to the MySQLdb.cursors for more details. 

''' 

 

conn_name_attr = 'mysql_conn_id' 

default_conn_name = 'mysql_default' 

supports_autocommit = True 

 

def get_conn(self): 

""" 

Returns a mysql connection object 

""" 

conn = self.get_connection(self.mysql_conn_id) 

conn_config = { 

"user": conn.login, 

"passwd": conn.password or '' 

} 

 

conn_config["host"] = conn.host or 'localhost' 

if not conn.port: 

conn_config["port"] = 3306 

else: 

conn_config["port"] = int(conn.port) 

 

conn_config["db"] = conn.schema or '' 

 

if conn.extra_dejson.get('charset', False): 

conn_config["charset"] = conn.extra_dejson["charset"] 

if (conn_config["charset"]).lower() == 'utf8' or\ 

(conn_config["charset"]).lower() == 'utf-8': 

conn_config["use_unicode"] = True 

if conn.extra_dejson.get('cursor', False): 

if (conn.extra_dejson["cursor"]).lower() == 'sscursor': 

conn_config["cursorclass"] = MySQLdb.cursors.SSCursor 

elif (conn.extra_dejson["cursor"]).lower() == 'dictcursor': 

conn_config["cursorclass"] = MySQLdb.cursors.DictCursor 

elif (conn.extra_dejson["cursor"]).lower() == 'ssdictcursor': 

conn_config["cursorclass"] = MySQLdb.cursors.SSDictCursor 

local_infile = conn.extra_dejson.get('local_infile',False) 

if conn.extra_dejson.get('ssl', False): 

conn_config['ssl'] = conn.extra_dejson['ssl'] 

if local_infile: 

conn_config["local_infile"] = 1 

conn = MySQLdb.connect(**conn_config) 

return conn 

 

def bulk_load(self, table, tmp_file): 

""" 

Loads a tab-delimited file into a database table 

""" 

conn = self.get_conn() 

cur = conn.cursor() 

cur.execute(""" 

LOAD DATA LOCAL INFILE '{tmp_file}' 

INTO TABLE {table} 

""".format(**locals())) 

conn.commit()