Hot-keys on this page

r m x p   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

62

63

64

65

66

67

68

69

70

71

72

73

74

75

76

77

78

79

80

81

82

83

84

85

86

87

88

89

90

91

92

93

94

95

from builtins import object 

import logging 

import subprocess 

import time 

 

from celery import Celery 

from celery import states as celery_states 

 

from airflow.utils import AirflowException 

from airflow.executors.base_executor import BaseExecutor 

from airflow import configuration 

 

# Read from the [core] section of the Airflow configuration.
PARALLELISM = configuration.get('core', 'PARALLELISM')

# To start the celery worker, run the command:
#
#     airflow worker

# Read from the [celery] section; used in this module as the default queue
# for submitted tasks and as Celery's default queue setting.
DEFAULT_QUEUE = configuration.get('celery', 'DEFAULT_QUEUE')

 

 

class CeleryConfig(object):
    # Settings holder passed to the Celery application as its config source.

    # Connection settings, read from the [celery] section of the Airflow
    # configuration.
    BROKER_URL = configuration.get('celery', 'BROKER_URL')
    CELERY_RESULT_BACKEND = configuration.get(
        'celery', 'CELERY_RESULT_BACKEND')

    # Worker settings; concurrency is configurable, the others are fixed here.
    CELERYD_CONCURRENCY = configuration.getint(
        'celery', 'CELERYD_CONCURRENCY')
    CELERYD_PREFETCH_MULTIPLIER = 1
    CELERY_ACKS_LATE = True

    # Accepted content types and default routing.
    CELERY_ACCEPT_CONTENT = ['json', 'pickle']
    CELERY_DEFAULT_QUEUE = DEFAULT_QUEUE

 

# Module-level Celery application; its name comes from the [celery] section
# of the Airflow configuration and its settings from CeleryConfig.
app = Celery(
    main=configuration.get('celery', 'CELERY_APP_NAME'),
    config_source=CeleryConfig,
)

 

 

@app.task
def execute_command(command):
    """Run ``command`` through the shell and fail on a non-zero exit code.

    :param command: shell command line to execute
    :raises AirflowException: if the command's return code is non-zero
    """
    message = "Executing command in Celery " + command
    logging.info(message)

    # Block until the child process finishes and collect its return code.
    process = subprocess.Popen(command, shell=True)
    return_code = process.wait()
    if not return_code:
        return

    logging.error(return_code)
    raise AirflowException('Celery command failed')

 

 

class CeleryExecutor(BaseExecutor):
    """
    CeleryExecutor is recommended for production use of Airflow. It allows
    distributing the execution of task instances to multiple worker nodes.

    Celery is a simple, flexible and reliable distributed system to process
    vast amounts of messages, while providing operations with the tools
    required to maintain such a system.
    """

    def start(self):
        """Initialise the bookkeeping used to track submitted tasks."""
        # key -> result handle returned by ``execute_command.apply_async``
        self.tasks = {}
        # key -> last celery state observed for that task
        self.last_state = {}

    def execute_async(self, key, command, queue=DEFAULT_QUEUE):
        """Submit ``command`` to celery and start tracking it under ``key``.

        :param key: identifier under which the task is tracked
        :param command: command passed to the ``execute_command`` task
        :param queue: celery queue to send the task to
        """
        logging.info(
            "[celery] queuing {key} through celery, "
            "queue={queue}".format(key=key, queue=queue))
        self.tasks[key] = execute_command.apply_async(
            args=[command], queue=queue)
        self.last_state[key] = celery_states.PENDING

    def sync(self):
        """Poll every tracked task and report the ones that have finished.

        Tasks that reached SUCCESS are reported through ``self.success``;
        tasks that reached FAILURE or REVOKED are reported through
        ``self.fail``. Finished tasks are removed from both bookkeeping
        dicts. Any other state change is logged and remembered so it is
        only logged once.
        """
        logging.debug(
            "Inquiring about {} celery task(s)".format(len(self.tasks)))
        # Iterate over a copy because finished tasks are deleted in the loop.
        # The loop variable is deliberately not called ``async``: that name
        # is a reserved keyword from Python 3.7 onwards.
        for key, async_result in list(self.tasks.items()):
            # Read the state once so the comparison, the branch taken and
            # the stored value all refer to the same observation.
            state = async_result.state
            if self.last_state[key] == state:
                continue

            if state == celery_states.SUCCESS:
                self.success(key)
            elif state in (celery_states.FAILURE, celery_states.REVOKED):
                self.fail(key)
            else:
                logging.info("Unexpected state: " + state)
                # Task is still tracked: remember the new state.
                self.last_state[key] = state
                continue

            # Task finished: stop tracking it. The state must not be written
            # back afterwards, otherwise ``last_state`` keeps an entry for
            # every task ever run.
            del self.tasks[key]
            del self.last_state[key]

    def end(self, synchronous=False):
        """Optionally block until every tracked task is in a ready state.

        :param synchronous: when true, poll every 5 seconds until no
            tracked task has a state outside ``celery_states.READY_STATES``
        """
        if synchronous:
            while any(
                    async_result.state not in celery_states.READY_STATES
                    for async_result in self.tasks.values()):
                time.sleep(5)