Hot-keys on this page

r m x p   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

62

63

64

65

66

67

68

69

70

71

72

73

74

75

76

77

78

79

80

81

82

83

84

85

86

87

88

89

90

91

92

93

94

95

96

97

98

99

100

101

102

103

104

105

106

107

108

109

110

111

112

113

114

115

116

117

118

119

120

121

122

123

124

125

126

127

128

129

130

131

132

133

134

135

136

137

138

139

140

141

142

143

144

145

146

147

148

149

150

151

152

153

154

155

156

157

158

159

160

161

162

163

164

165

166

167

168

169

170

171

172

173

174

175

176

177

178

179

180

181

182

183

184

185

186

187

188

189

190

191

192

193

194

195

196

197

import datetime 

import ftplib 

import logging 

import os.path 

from airflow.hooks.base_hook import BaseHook 

from past.builtins import basestring 

 

 

def mlsd(conn, path="", facts=()):
    """
    BACKPORT FROM PYTHON3 FTPLIB

    List a directory in a standardized format by using the MLSD
    command (RFC-3659). If path is omitted the current directory
    is assumed. "facts" is a sequence of strings representing the type
    of information desired (e.g. ["type", "size", "perm"]).

    Return a generator object yielding a tuple of two elements
    for every file found in path.
    First element is the file name, the second one is a dictionary
    including a variable number of "facts" depending on the server
    and whether "facts" argument has been provided. Fact names are
    lower-cased; fact values are returned as the raw strings sent by
    the server.

    Because this is a generator, no command is sent on ``conn`` until
    the result is iterated for the first time.

    :param conn: object exposing ``sendcmd(cmd)`` and
        ``retrlines(cmd, callback)`` (e.g. an ``ftplib.FTP`` instance)
    :param path: remote directory to list; empty for the current one
    :type path: str
    :param facts: names of the facts to request from the server
    :type facts: sequence of str
    """
    if facts:
        # Ask the server to restrict/extend the facts it reports.
        conn.sendcmd("OPTS MLST " + ";".join(facts) + ";")

    cmd = "MLSD %s" % path if path else "MLSD"

    # Collect the complete listing first; the data transfer has to finish
    # before entries are handed out to the caller.
    lines = []
    conn.retrlines(cmd, lines.append)

    for line in lines:
        # Each line looks like "fact1=val1;fact2=val2; file name".
        facts_found, _, name = line.rstrip(ftplib.CRLF).partition(' ')
        entry = {}
        # Drop the trailing ";" before splitting into individual facts.
        for fact in facts_found[:-1].split(";"):
            key, _, value = fact.partition("=")
            entry[key.lower()] = value
        yield (name, entry)

 

 

class FTPHook(BaseHook):
    """
    Interact with FTP.

    The underlying ``ftplib.FTP`` connection is created lazily on the first
    call to :meth:`get_conn` and then reused by every other method until
    :meth:`close_conn` is called.

    Errors that may occur throughout but should be handled
    downstream.
    """

    def __init__(self, ftp_conn_id='ftp_default'):
        """
        :param ftp_conn_id: id of the connection whose host, login and
            password are used to open the FTP session
        :type ftp_conn_id: str
        """
        self.ftp_conn_id = ftp_conn_id
        # Cached ftplib.FTP instance; None while no connection is open.
        self.conn = None

    def get_conn(self):
        """
        Returns a FTP connection object, opening it on first use.
        """
        if self.conn is None:
            params = self.get_connection(self.ftp_conn_id)
            self.conn = ftplib.FTP(params.host, params.login, params.password)

        return self.conn

    def close_conn(self):
        """
        Closes the connection. An error will occur if the
        connection wasn't ever opened.

        After a successful close the cached connection is dropped so that
        the next call to get_conn() opens a fresh one instead of handing
        back the closed object.
        """
        conn = self.conn
        conn.quit()
        self.conn = None

    def describe_directory(self, path):
        """
        Returns a dictionary of {filename: {attributes}} for all files
        on the remote system (where the MLSD command is supported).

        Note that this changes the connection's working directory to
        ``path``.

        :param path: full path to the remote directory
        :type path: str
        """
        conn = self.get_conn()
        conn.cwd(path)
        try:
            # only works in Python 3
            files = dict(conn.mlsd())
        except AttributeError:
            # Python 2's ftplib.FTP has no mlsd(); use the module backport.
            files = dict(mlsd(conn))
        return files

    def list_directory(self, path, nlst=False):
        """
        Returns a list of files on the remote system.

        Note that this changes the connection's working directory to
        ``path``.

        :param path: full path to the remote directory to list
        :type path: str
        :param nlst: currently unused; kept so existing callers that pass
            it keep working
        :type nlst: bool
        """
        conn = self.get_conn()
        conn.cwd(path)

        return conn.nlst()

    def create_directory(self, path):
        """
        Creates a directory on the remote system.

        :param path: full path to the remote directory to create
        :type path: str
        """
        conn = self.get_conn()
        conn.mkd(path)

    def delete_directory(self, path):
        """
        Deletes a directory on the remote system.

        :param path: full path to the remote directory to delete
        :type path: str
        """
        conn = self.get_conn()
        conn.rmd(path)

    def retrieve_file(self, remote_full_path, local_full_path_or_buffer):
        """
        Transfers the remote file to a local location.

        If local_full_path_or_buffer is a string path, the file will be put
        at that location; if it is a file-like buffer, the file will
        be written to the buffer but not closed.

        Note that this changes the connection's working directory to the
        directory part of ``remote_full_path``.

        :param remote_full_path: full path to the remote file
        :type remote_full_path: str
        :param local_full_path_or_buffer: full path to the local file or a
            file-like buffer
        :type local_full_path_or_buffer: str or file-like buffer
        """
        conn = self.get_conn()

        is_path = isinstance(local_full_path_or_buffer, basestring)

        if is_path:
            output_handle = open(local_full_path_or_buffer, 'wb')
        else:
            output_handle = local_full_path_or_buffer

        try:
            remote_path, remote_file_name = os.path.split(remote_full_path)
            conn.cwd(remote_path)
            logging.info('Retrieving file from FTP: %s', remote_full_path)
            conn.retrbinary('RETR %s' % remote_file_name, output_handle.write)
            logging.info('Finished retrieving file from FTP: %s',
                         remote_full_path)
        finally:
            # Only close handles opened here, and do so even when the
            # transfer fails so the local file descriptor is not leaked.
            if is_path:
                output_handle.close()

    def store_file(self, remote_full_path, local_full_path_or_buffer):
        """
        Transfers a local file to the remote location.

        If local_full_path_or_buffer is a string path, the file will be read
        from that location; if it is a file-like buffer, the file will
        be read from the buffer but not closed.

        Note that this changes the connection's working directory to the
        directory part of ``remote_full_path``.

        :param remote_full_path: full path to the remote file
        :type remote_full_path: str
        :param local_full_path_or_buffer: full path to the local file or a
            file-like buffer
        :type local_full_path_or_buffer: str or file-like buffer
        """
        conn = self.get_conn()

        is_path = isinstance(local_full_path_or_buffer, basestring)

        if is_path:
            input_handle = open(local_full_path_or_buffer, 'rb')
        else:
            input_handle = local_full_path_or_buffer

        try:
            remote_path, remote_file_name = os.path.split(remote_full_path)
            conn.cwd(remote_path)
            conn.storbinary('STOR %s' % remote_file_name, input_handle)
        finally:
            # Only close handles opened here, and do so even when the
            # upload fails so the local file descriptor is not leaked.
            if is_path:
                input_handle.close()

    def delete_file(self, path):
        """
        Removes a file on the FTP Server

        :param path: full path to the remote file
        :type path: str
        """
        conn = self.get_conn()
        conn.delete(path)

    def get_mod_time(self, path):
        """
        Returns the last-modified time of a remote file, as reported by
        the server's reply to the MDTM command.

        The result is a naive ``datetime.datetime`` (no tzinfo attached);
        RFC 3659 specifies that MDTM times are expressed in UTC.

        :param path: full path to the remote file
        :type path: str
        :raises ValueError: if the reply cannot be parsed as a timestamp
        """
        conn = self.get_conn()
        ftp_mdtm = conn.sendcmd('MDTM ' + path)
        # Skip the first four characters of the reply (the reply code and
        # the following space, e.g. "213 ").
        time_val = ftp_mdtm[4:]
        # Servers may append fractional seconds ("YYYYMMDDHHMMSS.sss").
        try:
            return datetime.datetime.strptime(time_val, '%Y%m%d%H%M%S.%f')
        except ValueError:
            return datetime.datetime.strptime(time_val, '%Y%m%d%H%M%S')