Hot-keys on this page

r m x p   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

62

63

64

65

66

67

68

69

70

71

72

73

74

75

76

77

78

79

80

81

82

83

84

85

 

from builtins import bytes 

import logging 

from subprocess import Popen, STDOUT, PIPE 

from tempfile import gettempdir, NamedTemporaryFile 

 

from airflow.utils import AirflowException 

from airflow.models import BaseOperator 

from airflow.utils import apply_defaults, TemporaryDirectory 

 

 

class BashOperator(BaseOperator):
    """
    Execute a Bash script, command or set of commands.

    :param bash_command: The command, set of commands or reference to a
        bash script (must be '.sh') to be executed.
    :type bash_command: string
    :param xcom_push: If True, the last line written to stdout is returned
        from ``execute`` so that it can be pushed to an XCom when the bash
        command completes.
    :type xcom_push: bool
    :param env: If env is not None, it must be a mapping that defines the
        environment variables for the new process; these are used instead
        of inheriting the current process environment, which is the default
        behavior.
    :type env: dict
    """
    template_fields = ('bash_command',)
    template_ext = ('.sh', '.bash',)
    ui_color = '#f0ede4'

    @apply_defaults
    def __init__(
            self,
            bash_command,
            xcom_push=False,
            env=None,
            *args, **kwargs):
        """
        Store the command, its environment and the XCom flag.

        If xcom_push is True, the last line written to stdout will also
        be pushed to an XCom when the bash command completes.
        """
        super(BashOperator, self).__init__(*args, **kwargs)
        self.bash_command = bash_command
        self.env = env
        self.xcom_push = xcom_push

    def execute(self, context):
        """
        Execute the bash command in a temporary directory
        which will be cleaned afterwards.

        The command is written to a temporary script file which is run with
        ``bash``; stderr is merged into stdout and every output line is
        logged.

        :param context: task context supplied by the caller (not used here).
        :return: the last line of output (decoded, stripped) when
            ``xcom_push`` is set, otherwise None.
        :raises AirflowException: if the subprocess exits with a non-zero
            return code.
        """
        bash_command = self.bash_command
        logging.info("tmp dir root location: \n" + gettempdir())

        # Last decoded output line; stays '' when the command prints nothing.
        line = ''
        with TemporaryDirectory(prefix='airflowtmp') as tmp_dir:
            with NamedTemporaryFile(dir=tmp_dir, prefix=self.task_id) as f:

                f.write(bytes(bash_command, 'utf_8'))
                # Make sure the script is on disk before bash reads it.
                f.flush()
                # f.name is already the full path inside tmp_dir, so it must
                # not be joined onto tmp_dir a second time.
                script_location = f.name
                logging.info("Temporary script "
                             "location :{0}".format(script_location))
                logging.info("Running command: " + bash_command)
                sp = Popen(
                    ['bash', script_location],
                    stdout=PIPE, stderr=STDOUT,
                    cwd=tmp_dir, env=self.env)

                # Kept on the instance so on_kill can terminate the process.
                self.sp = sp

                logging.info("Output:")
                for raw_line in iter(sp.stdout.readline, b''):
                    # The pipe yields bytes; decode so that the log and the
                    # XCom value hold text rather than a b'...' repr.
                    line = raw_line.decode('utf_8', 'replace').strip()
                    logging.info(line)
                sp.wait()
                logging.info("Command exited with "
                             "return code {0}".format(sp.returncode))

                if sp.returncode:
                    raise AirflowException("Bash command failed")

        if self.xcom_push:
            return line

    def on_kill(self):
        """
        Terminate the running bash subprocess, if one was started.
        """
        # self.sp only exists once execute() has launched the subprocess.
        sp = getattr(self, 'sp', None)
        if sp is None:
            logging.info('No bash subprocess to terminate')
            return
        logging.info('Sending SIGTERM signal to bash subprocess')
        sp.terminate()