Coverage for airflow.executors.celery_executor : 46%
Hot-keys on this page
r m x p toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
|
''' To start the celery worker, run the command: airflow worker '''
configuration.get('celery', 'CELERY_APP_NAME'), config_source=CeleryConfig)
def execute_command(command):
    """Run ``command`` through the shell and raise if it exits non-zero.

    :param command: the command to execute; handed to the shell unchanged.
    :raises AirflowException: when the command's return code is non-zero.
    """
    # Lazy %-formatting renders the same message for a str command, but does
    # not raise TypeError when the command is not a str (e.g. a list).
    logging.info("Executing command in Celery %s", command)
    # NOTE(review): shell=True means the command is interpreted by the shell;
    # confirm that callers only ever pass trusted, internally built commands.
    rc = subprocess.call(command, shell=True)
    if rc:
        # Say what the number is, so the log line is useful on its own.
        logging.error("Celery command exited with return code %s", rc)
        raise AirflowException('Celery command failed')
""" CeleryExecutor is recommended for production use of Airflow. It allows distributing the execution of task instances to multiple worker nodes.
Celery is a simple, flexible and reliable distributed system to process vast amounts of messages, while providing operations with the tools required to maintain such a system. """
# Start from empty bookkeeping: both per-key maps are reset together.
self.last_state = {}
self.tasks = {}
# Announce the hand-off, naming the key and the target queue explicitly.
message = (
    "[celery] queuing {key} through celery, "
    "queue={queue}"
).format(key=key, queue=queue)
logging.info(message)

# Submit the command to the requested queue and keep the returned handle
# under the same key so its state can be looked up later.
async_result = execute_command.apply_async(args=[command], queue=queue)
self.tasks[key] = async_result

# A freshly queued task is recorded as PENDING.
self.last_state[key] = celery_states.PENDING
logging.debug("Inquiring about %s celery task(s)", len(self.tasks))

# Iterate over a snapshot because finished entries are deleted in the loop.
# The loop variable is named ``task``: ``async`` is a reserved keyword from
# Python 3.7 onwards and cannot be used as an identifier.
for key, task in list(self.tasks.items()):
    # Read the state once so the comparison, the log line and the stored
    # value all refer to the same observation.
    state = task.state
    if self.last_state[key] == state:
        continue

    if state == celery_states.SUCCESS:
        self.success(key)
        del self.tasks[key]
        del self.last_state[key]
    elif state in (celery_states.FAILURE, celery_states.REVOKED):
        # Failed and revoked tasks are both reported as failures.
        self.fail(key)
        del self.tasks[key]
        del self.last_state[key]
    else:
        logging.info("Unexpected state: %s", state)
        # Remember the new state only for tasks that are still tracked, so
        # a key that was just removed is never re-inserted.
        self.last_state[key] = state
if synchronous:
    # Block until every tracked task has reached one of the READY_STATES,
    # polling every five seconds. The loop variable is named ``task``
    # because ``async`` is a reserved keyword from Python 3.7 onwards.
    while any(
            task.state not in celery_states.READY_STATES
            for task in self.tasks.values()):
        time.sleep(5)