Coverage for airflow/executors/local_executor.py : 35%
Hot-keys on this page
r m x p toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
|
# -*- coding: utf-8 -*- # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License.
multiprocessing.Process.__init__(self) self.task_queue = task_queue self.result_queue = result_queue self.daemon = True
while True: key, command = self.task_queue.get() if key is None: # Received poison pill, no more tasks to run self.task_queue.task_done() break self.logger.info("{} running {}".format( self.__class__.__name__, command)) command = "exec bash -c '{0}'".format(command) try: subprocess.check_call(command, shell=True) state = State.SUCCESS except subprocess.CalledProcessError as e: state = State.FAILED self.logger.error("failed to execute task {}:".format(str(e))) # raise e self.result_queue.put((key, state)) self.task_queue.task_done() time.sleep(1)
""" LocalExecutor executes tasks locally in parallel. It uses the multiprocessing Python library and queues to parallelize the execution of tasks. """
self.queue = multiprocessing.JoinableQueue() self.result_queue = multiprocessing.Queue() self.workers = [ LocalWorker(self.queue, self.result_queue) for i in range(self.parallelism) ]
for w in self.workers: w.start()
self.queue.put((key, command))
while not self.result_queue.empty(): results = self.result_queue.get() self.change_state(*results)
# Sending poison pill to all worker [self.queue.put((None, None)) for w in self.workers] # Wait for commands to finish self.queue.join() self.sync() |