Hide keyboard shortcuts

Hot-keys on this page

r m x p   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

62

63

64

65

66

67

68

69

70

71

72

73

74

75

76

77

78

79

80

81

82

83

84

85

86

87

88

89

90

91

92

93

94

95

96

97

98

99

100

101

102

103

104

# -*- coding: utf-8 -*- 

# 

# Licensed under the Apache License, Version 2.0 (the "License"); 

# you may not use this file except in compliance with the License. 

# You may obtain a copy of the License at 

# 

# http://www.apache.org/licenses/LICENSE-2.0 

# 

# Unless required by applicable law or agreed to in writing, software 

# distributed under the License is distributed on an "AS IS" BASIS, 

# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

# See the License for the specific language governing permissions and 

# limitations under the License. 

 

 

from builtins import bytes 

import logging 

from subprocess import Popen, STDOUT, PIPE 

from tempfile import gettempdir, NamedTemporaryFile 

 

from airflow.exceptions import AirflowException 

from airflow.models import BaseOperator 

from airflow.utils.decorators import apply_defaults 

from airflow.utils.file import TemporaryDirectory 

 

 

class BashOperator(BaseOperator):
    """
    Execute a Bash script, command or set of commands.

    :param bash_command: The command, set of commands or reference to a
        bash script (must be '.sh') to be executed. (templated)
    :type bash_command: string
    :param xcom_push: If xcom_push is True, the last line written to stdout
        will also be pushed to an XCom when the bash command completes.
    :type xcom_push: bool
    :param env: If env is not None, it must be a mapping that defines the
        environment variables for the new process; these are used instead
        of inheriting the current process environment, which is the default
        behavior. (templated)
    :type env: dict
    :param output_encoding: output encoding of bash command
    :type output_encoding: string
    """
    template_fields = ('bash_command', 'env')
    template_ext = ('.sh', '.bash',)
    ui_color = '#f0ede4'

    @apply_defaults
    def __init__(
            self,
            bash_command,
            xcom_push=False,
            env=None,
            output_encoding='utf-8',
            *args, **kwargs):
        """
        If xcom_push is True, the last line written to stdout will also
        be pushed to an XCom when the bash command completes.
        """
        super(BashOperator, self).__init__(*args, **kwargs)
        self.bash_command = bash_command
        self.env = env
        self.xcom_push_flag = xcom_push
        self.output_encoding = output_encoding
        # Handle of the running bash subprocess; set by execute() and read by
        # on_kill(). Initialised here so on_kill() is safe before execute().
        self.sp = None

    def execute(self, context):
        """
        Execute the bash command in a temporary directory
        which will be cleaned afterwards

        The command text is written to a temporary script file which is run
        with ``bash``; stderr is merged into stdout and every output line is
        logged as it is read.

        :param context: execution context (not used by this method)
        :return: the last line of output (decoded and stripped) when
            xcom_push is enabled, otherwise None
        :raises AirflowException: if the command exits with a non-zero
            return code
        """
        bash_command = self.bash_command
        logging.info("tmp dir root location: \n%s", gettempdir())
        with TemporaryDirectory(prefix='airflowtmp') as tmp_dir:
            with NamedTemporaryFile(dir=tmp_dir, prefix=self.task_id) as f:

                f.write(bytes(bash_command, 'utf_8'))
                # Make sure the script is on disk before bash opens it.
                f.flush()
                fname = f.name
                # f.name already contains tmp_dir (the file was created with
                # dir=tmp_dir), so it must not be prefixed with tmp_dir again.
                script_location = fname
                logging.info("Temporary script location :%s", script_location)
                logging.info("Running command: %s", bash_command)
                sp = Popen(
                    ['bash', fname],
                    stdout=PIPE, stderr=STDOUT,
                    cwd=tmp_dir, env=self.env)

                # Keep a handle so on_kill() can terminate the process.
                self.sp = sp

                logging.info("Output:")
                # Stays '' when the command prints nothing, so the XCom value
                # is an empty string in that case.
                line = ''
                for line in iter(sp.stdout.readline, b''):
                    line = line.decode(self.output_encoding).strip()
                    logging.info(line)
                sp.wait()
                logging.info(
                    "Command exited with return code %s", sp.returncode)

                if sp.returncode:
                    raise AirflowException("Bash command failed")

        if self.xcom_push_flag:
            return line

    def on_kill(self):
        """
        Terminate the bash subprocess started by execute(), if any.

        Does nothing when no subprocess has been started yet.
        """
        sp = getattr(self, 'sp', None)
        if sp is None:
            return
        logging.info('Sending SIGTERM signal to bash subprocess')
        sp.terminate()