Hot-keys on this page

r m x p   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

from airflow.hooks.base_hook import BaseHook 

from airflow.configuration import conf 

 

try: 

    snakebite_imported = True 

    from snakebite.client import Client, HAClient, Namenode 

except ImportError: 

    snakebite_imported = False 

 

from airflow.utils import AirflowException 

 

 

class HDFSHookException(AirflowException): 

    pass 

 

 

class HDFSHook(BaseHook): 

    ''' 

    Interact with HDFS. This class is a wrapper around the snakebite library. 

    ''' 

    def __init__(self, hdfs_conn_id='hdfs_default'): 

        if not snakebite_imported: 

            raise ImportError( 

                'This HDFSHook implementation requires snakebite, but ' 

                'snakebite is not compatible with Python 3 ' 

                '(as of August 2015). Please use Python 2 if you require ' 

                'this hook  -- or help by submitting a PR!') 

        self.hdfs_conn_id = hdfs_conn_id 

 

    def get_conn(self): 

        ''' 

        Returns a snakebite HDFSClient object. 

        ''' 

        use_sasl = False 

        if conf.get('core','security') == 'kerberos': 

            use_sasl = True 

 

        connections = self.get_connections(self.hdfs_conn_id) 

        client = None 

        if len(connections) == 1: 

            client = Client(connections[0].host, connections[0].port,use_sasl=use_sasl) 

        elif len(connections) > 1: 

            nn = [Namenode(conn.host, conn.port) for conn in connections] 

            client = HAClient(nn, use_sasl=use_sasl) 

        else: 

            raise HDFSHookException("conn_id doesn't exist in the repository") 

        return client