Hide keyboard shortcuts

Hot-keys on this page

r m x p   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

62

63

64

65

66

67

68

69

70

71

72

73

74

75

76

77

78

79

80

81

82

83

84

85

86

87

88

89

90

91

92

93

94

95

96

97

98

99

100

101

102

103

104

105

106

107

108

109

110

111

112

113

114

115

116

117

118

119

120

121

122

123

124

125

126

127

128

129

130

131

132

133

134

135

136

137

138

139

140

141

142

143

144

145

146

147

148

149

150

151

152

153

154

155

156

157

158

159

160

161

162

163

164

165

166

167

168

169

170

171

172

173

174

175

176

177

178

179

180

181

182

183

184

185

186

187

188

189

190

191

192

193

194

195

196

197

# -*- coding: utf-8 -*- 

# 

# Licensed under the Apache License, Version 2.0 (the "License"); 

# you may not use this file except in compliance with the License. 

# You may obtain a copy of the License at 

# 

# http://www.apache.org/licenses/LICENSE-2.0 

# 

# Unless required by applicable law or agreed to in writing, software 

# distributed under the License is distributed on an "AS IS" BASIS, 

# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

# See the License for the specific language governing permissions and 

# limitations under the License. 

 

import json 

import logging 

from airflow.exceptions import AirflowException 

from airflow.models import BaseOperator 

from airflow.utils.decorators import apply_defaults 

from airflow.utils.file import TemporaryDirectory 

from docker import Client, tls 

import ast 

 

 

class DockerOperator(BaseOperator):
    """
    Execute a command inside a docker container.

    A temporary directory is created on the host and mounted into a container to allow storing files
    that together exceed the default disk size of 10GB in a container. The path to the mounted
    directory can be accessed via the environment variable ``AIRFLOW_TMP_DIR``.

    :param image: Docker image from which to create the container. If the image carries no tag
        (or digest), ``:latest`` is used.
    :type image: str
    :param api_version: Remote API version.
    :type api_version: str
    :param command: Command to be run in the container. A string that starts with ``[`` is
        parsed as a Python list literal.
    :type command: str or list
    :param cpus: Number of CPUs to assign to the container.
        This value gets multiplied with 1024. See
        https://docs.docker.com/engine/reference/run/#cpu-share-constraint
    :type cpus: float
    :param docker_url: URL of the host running the docker daemon.
    :type docker_url: str
    :param environment: Environment variables to set in the container.
    :type environment: dict
    :param force_pull: Pull the docker image on every run.
    :type force_pull: bool
    :param mem_limit: Maximum amount of memory the container can use. Either a float value, which
        represents the limit in bytes, or a string like ``128m`` or ``1g``.
    :type mem_limit: float or str
    :param network_mode: Network mode for the container.
    :type network_mode: str
    :param tls_ca_cert: Path to a PEM-encoded certificate authority to secure the docker connection.
    :type tls_ca_cert: str
    :param tls_client_cert: Path to the PEM-encoded certificate used to authenticate docker client.
    :type tls_client_cert: str
    :param tls_client_key: Path to the PEM-encoded key used to authenticate docker client.
    :type tls_client_key: str
    :param tls_hostname: Hostname to match against the docker server certificate or False to
        disable the check.
    :type tls_hostname: str or bool
    :param tls_ssl_version: Version of SSL to use when communicating with docker daemon.
    :type tls_ssl_version: str
    :param tmp_dir: Mount point inside the container to a temporary directory created on the host by
        the operator. The path is also made available via the environment variable
        ``AIRFLOW_TMP_DIR`` inside the container.
    :type tmp_dir: str
    :param user: Default user inside the docker container.
    :type user: int or str
    :param volumes: List of volumes to mount into the container, e.g.
        ``['/host/path:/container/path', '/host/path2:/container/path2:ro']``.
    :type volumes: list
    :param xcom_push: Whether the stdout will be pushed to the next step using XCom.
        The default is False.
    :type xcom_push: bool
    :param xcom_all: Push all the stdout or just the last line. The default is False (last line).
    :type xcom_all: bool
    """
    template_fields = ('command',)
    template_ext = ('.sh', '.bash',)

    @apply_defaults
    def __init__(
            self,
            image,
            api_version=None,
            command=None,
            cpus=1.0,
            docker_url='unix://var/run/docker.sock',
            environment=None,
            force_pull=False,
            mem_limit=None,
            network_mode=None,
            tls_ca_cert=None,
            tls_client_cert=None,
            tls_client_key=None,
            tls_hostname=None,
            tls_ssl_version=None,
            tmp_dir='/tmp/airflow',
            user=None,
            volumes=None,
            xcom_push=False,
            xcom_all=False,
            *args,
            **kwargs):

        super(DockerOperator, self).__init__(*args, **kwargs)
        self.api_version = api_version
        self.command = command
        self.cpus = cpus
        self.docker_url = docker_url
        self.environment = environment or {}
        self.force_pull = force_pull
        self.image = image
        self.mem_limit = mem_limit
        self.network_mode = network_mode
        self.tls_ca_cert = tls_ca_cert
        self.tls_client_cert = tls_client_cert
        self.tls_client_key = tls_client_key
        self.tls_hostname = tls_hostname
        self.tls_ssl_version = tls_ssl_version
        self.tmp_dir = tmp_dir
        self.user = user
        self.volumes = volumes or []
        self.xcom_push = xcom_push
        self.xcom_all = xcom_all

        # Set by execute(); on_kill() uses them to stop the running container.
        self.cli = None
        self.container = None

    def execute(self, context):
        """
        Pull the image if needed, run the command in a new container and wait for it to finish.

        :return: when ``xcom_push`` is set, the full container log (``xcom_all``) or the last
            stripped log line; otherwise None.
        :raises AirflowException: if pulling the image reports an error or the container exits
            with a non-zero status.
        """
        logging.info('Starting docker container from image %s', self.image)

        tls_config = None
        if self.tls_ca_cert and self.tls_client_cert and self.tls_client_key:
            tls_config = tls.TLSConfig(
                ca_cert=self.tls_ca_cert,
                client_cert=(self.tls_client_cert, self.tls_client_key),
                verify=True,
                ssl_version=self.tls_ssl_version,
                assert_hostname=self.tls_hostname
            )
            # With TLS enabled, a tcp:// daemon URL is rewritten to https://.
            self.docker_url = self.docker_url.replace('tcp://', 'https://')

        self.cli = Client(base_url=self.docker_url, version=self.api_version, tls=tls_config)

        image = self._with_default_tag(self.image)

        if self.force_pull or not self.cli.images(name=image):
            self._pull_image(image)

        cpu_shares = int(round(self.cpus * 1024))

        with TemporaryDirectory(prefix='airflowtmp') as host_tmp_dir:
            # Build per-run copies: self.environment / self.volumes may alias objects passed
            # in by the caller (and shared with other tasks), and appending to them directly
            # would also accumulate stale tmp-dir binds across repeated executions.
            environment = dict(self.environment)
            environment['AIRFLOW_TMP_DIR'] = self.tmp_dir
            volumes = list(self.volumes)
            volumes.append('{0}:{1}'.format(host_tmp_dir, self.tmp_dir))

            self.container = self.cli.create_container(
                command=self.get_command(),
                cpu_shares=cpu_shares,
                environment=environment,
                host_config=self.cli.create_host_config(binds=volumes,
                                                        network_mode=self.network_mode),
                image=image,
                mem_limit=self.mem_limit,
                user=self.user
            )
            self.cli.start(self.container['Id'])

            # Keep the last (stripped, decoded) line for the default XCom value.
            line = ''
            for raw_line in self.cli.logs(container=self.container['Id'], stream=True):
                line = self._to_text(raw_line.strip())
                logging.info('%s', line)

            exit_code = self.cli.wait(self.container['Id'])
            if exit_code != 0:
                raise AirflowException('docker container failed')

            if self.xcom_push:
                return self.cli.logs(container=self.container['Id']) if self.xcom_all else str(line)

    def _pull_image(self, image):
        """Pull ``image`` via ``self.cli``, logging progress and raising on a reported error."""
        logging.info('Pulling docker image %s', image)
        for chunk in self.cli.pull(image, stream=True):
            output = json.loads(self._to_text(chunk))
            # Error messages in the pull stream carry an 'error' key instead of 'status'.
            if 'error' in output:
                raise AirflowException(
                    'Failed to pull docker image {}: {}'.format(image, output['error']))
            if 'status' in output:
                logging.info('%s', output['status'])

    @staticmethod
    def _with_default_tag(image):
        """Return ``image`` with ``:latest`` appended when it carries no tag or digest."""
        # Only the last path component can hold a tag; a ':' before the last '/' belongs to
        # a registry host:port such as ``localhost:5000/repo``.
        if ':' not in image.rsplit('/', 1)[-1]:
            return image + ':latest'
        return image

    @staticmethod
    def _to_text(value):
        """Decode ``bytes`` (Python 3) to text with UTF-8; return native ``str`` unchanged."""
        # On Python 2 ``bytes is str``, so native strings pass through untouched there.
        if isinstance(value, bytes) and not isinstance(value, str):
            return value.decode('utf-8', 'replace')
        return value

    def get_command(self):
        """
        Return the command to pass to docker.

        A string whose stripped form starts with ``[`` (e.g. ``"['echo', 'hi']"``) is parsed
        with ``ast.literal_eval``; lists, tuples, None and any other string are returned as-is.
        """
        command = self.command
        if command is None or isinstance(command, (list, tuple)):
            return command
        stripped = command.strip()
        if stripped.startswith('['):
            # Evaluate the stripped text: literal_eval rejects leading whitespace before 3.9.
            return ast.literal_eval(stripped)
        return command

    def on_kill(self):
        """Stop the container created by ``execute``, if there is one."""
        # The client is created before the container, so both must be checked.
        if self.cli is not None and self.container is not None:
            logging.info('Stopping docker container')
            self.cli.stop(self.container['Id'])