Hot-keys on this page

r m x p   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

62

63

64

65

66

67

68

69

70

71

72

73

74

75

76

77

78

79

80

81

82

83

84

85

86

87

88

89

90

91

92

93

from slackclient import SlackClient 

from airflow.models import BaseOperator 

from airflow.utils import apply_defaults 

 

 

class SlackAPIOperator(BaseOperator):
    """
    Base Slack Operator

    The SlackAPIPostOperator is derived from this operator.
    In the future additional Slack API Operators will be derived from this
    class as well.

    :param token: Slack API token (https://api.slack.com/web)
    :type token: string
    :param method: The Slack API Method to Call (https://api.slack.com/methods)
    :type method: string
    :param params: API Method call parameters (https://api.slack.com/methods).
        When left empty, ``construct_api_call_params`` is invoked at execute
        time to build them.
    :type params: dict
    """

    @apply_defaults
    def __init__(self,
                 token='unset',
                 method='unset',
                 params=None,
                 *args, **kwargs):
        super(SlackAPIOperator, self).__init__(*args, **kwargs)
        self.token = token
        self.method = method
        # NOTE(review): this is assigned after the parent initialiser, so it
        # replaces any ``params`` attribute the parent may have set -- confirm
        # that this does not clash with BaseOperator's own use of ``params``.
        self.params = params

    def construct_api_call_params(self):
        """
        Hook used by ``execute`` to build the API call parameters lazily,
        which allows templating on the source fields before construction.

        Override in child classes. Each SlackAPIOperator child class is
        responsible for having a construct_api_call_params function which
        sets ``self.params`` to a dict of API call parameters
        (https://api.slack.com/methods).

        The base implementation does nothing and leaves ``self.params``
        unchanged.
        """

    def execute(self, **kwargs):
        """
        Send the configured API call to Slack.

        If ``self.params`` is empty, ``construct_api_call_params`` is called
        first so that subclasses can fill it in. The value returned by the
        Slack client is not inspected, so an unsuccessful API response does
        not by itself fail the task; exceptions raised by the client are not
        caught here.

        :param kwargs: execution context supplied by the caller; unused here
        """
        if not self.params:
            self.construct_api_call_params()
        # The base hook may leave ``self.params`` as None; unpacking None
        # with ``**`` raises TypeError, so fall back to "no parameters".
        call_params = self.params or {}
        sc = SlackClient(self.token)
        sc.api_call(self.method, **call_params)

 

 

class SlackAPIPostOperator(SlackAPIOperator):
    """
    Operator that posts a message to a Slack channel using the
    ``chat.postMessage`` API method.

    :param channel: channel in which to post message on slack name (#general)
        or ID (C12318391)
    :type channel: string
    :param username: Username that airflow will be posting to Slack as
    :type username: string
    :param text: message to send to slack
    :type text: string
    :param icon_url: url to icon used for this message
    :type icon_url: string
    """

    template_fields = ('username', 'text')
    ui_color = '#FFBA40'

    @apply_defaults
    def __init__(self,
                 channel='#general',
                 username='Airflow',
                 text='No message has been set.\n'
                      'Here is a cat video instead\n'
                      'https://www.youtube.com/watch?v=J---aiyznGQ',
                 icon_url='https://raw.githubusercontent.com/airbnb/airflow/master/airflow/www/static/pin_100.png',
                 *args, **kwargs):
        # The message fields are stored before the parent initialiser runs;
        # the parent then records the same API method name again.
        self.method = 'chat.postMessage'
        self.channel, self.username = channel, username
        self.text, self.icon_url = text, icon_url
        parent = super(SlackAPIPostOperator, self)
        parent.__init__(*args, method=self.method, **kwargs)

    def construct_api_call_params(self):
        """
        Build the ``chat.postMessage`` parameters from the message fields
        and store them in ``self.params``.
        """
        self.params = dict(
            channel=self.channel,
            username=self.username,
            text=self.text,
            icon_url=self.icon_url,
        )