Coverage for airflow.bin.cli : 48%
Hot-keys on this page
r m x p toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
|
#!/usr/bin/env python
# Common help text across subcommands
subdir = subdir.replace("DAGS_FOLDER", dags_folder) raise AirflowException( "subdir has to be part of your DAGS_FOLDER as defined in your " "airflow.cfg")
log = logging.getLogger() log.setLevel(settings.LOGGING_LEVEL) logformat = logging.Formatter( "%(asctime)s - %(name)s - %(levelname)s - %(message)s") ch = logging.StreamHandler(sys.stdout) ch.setFormatter(logformat) log.addHandler(ch)
level=settings.LOGGING_LEVEL, format=settings.SIMPLE_LOG_FORMAT) raise AirflowException('dag_id could not be found')
args.end_date = dateutil.parser.parse(args.end_date)
# If only one date is passed, using same as start and end
dag = dag.sub_dag( task_regex=args.task_regex, include_upstream=not args.ignore_dependencies)
print("Dry run of DAG {0} on {1}".format(args.dag_id, args.start_date)) for task in dag.tasks: print("Task {0}".format(task.task_id)) ti = TaskInstance(task, args.start_date) ti.dry_run() else: start_date=args.start_date, end_date=args.end_date, mark_success=args.mark_success, include_adhoc=args.include_adhoc, local=args.local, donot_pickle=(args.donot_pickle or configuration.getboolean('core', 'donot_pickle')), ignore_dependencies=args.ignore_dependencies, pool=args.pool)
log_to_stdout() session = settings.Session() # TODO: verify dag_id execution_date = datetime.now() dr = session.query(DagRun).filter( DagRun.dag_id==args.dag_id, DagRun.run_id==args.run_id).first() if dr: logging.error("This run_id already exists") else: trigger = DagRun( dag_id=args.dag_id, run_id=args.run_id, execution_date=execution_date, state=State.RUNNING, external_trigger=True) session.add(trigger) logging.info("Created {}".format(trigger)) session.commit()
utils.pessimistic_connection_handling() # Setting up logging log = os.path.expanduser(configuration.get('core', 'BASE_LOG_FOLDER')) directory = log + "/{args.dag_id}/{args.task_id}".format(args=args) if not os.path.exists(directory): os.makedirs(directory) args.execution_date = dateutil.parser.parse(args.execution_date) iso = args.execution_date.isoformat() filename = "{directory}/{iso}".format(**locals())
# store old log (to help with S3 appends) if os.path.exists(filename): with open(filename, 'r') as logfile: old_log = logfile.read() else: old_log = None
subdir = process_subdir(args.subdir) logging.basicConfig( filename=filename, level=settings.LOGGING_LEVEL, format=settings.LOG_FORMAT) if not args.pickle: dagbag = DagBag(subdir) if args.dag_id not in dagbag.dags: msg = 'DAG [{0}] could not be found'.format(args.dag_id) logging.error(msg) raise AirflowException(msg) dag = dagbag.dags[args.dag_id] task = dag.get_task(task_id=args.task_id) else: session = settings.Session() logging.info('Loading pickle id {args.pickle}'.format(**locals())) dag_pickle = session.query( DagPickle).filter(DagPickle.id == args.pickle).first() if not dag_pickle: raise AirflowException("Who hid the pickle!? [missing pickle]") dag = dag_pickle.pickle task = dag.get_task(task_id=args.task_id)
task_start_date = None if args.task_start_date: task_start_date = dateutil.parser.parse(args.task_start_date) task.start_date = task_start_date ti = TaskInstance(task, args.execution_date)
if args.local: print("Logging into: " + filename) run_job = jobs.LocalTaskJob( task_instance=ti, mark_success=args.mark_success, force=args.force, pickle_id=args.pickle, task_start_date=task_start_date, ignore_dependencies=args.ignore_dependencies, pool=args.pool) run_job.run() elif args.raw: ti.run( mark_success=args.mark_success, force=args.force, ignore_dependencies=args.ignore_dependencies, job_id=args.job_id, pool=args.pool, ) else: pickle_id = None if args.ship_dag: try: # Running remotely, so pickling the DAG session = settings.Session() pickle = DagPickle(dag) session.add(pickle) session.commit() pickle_id = pickle.id print(( 'Pickled dag {dag} ' 'as pickle_id:{pickle_id}').format(**locals())) except Exception as e: print('Could not pickle the DAG') print(e) raise e
executor = DEFAULT_EXECUTOR executor.start() print("Sending to executor.") executor.queue_task_instance( ti, mark_success=args.mark_success, pickle_id=pickle_id, ignore_dependencies=args.ignore_dependencies, force=args.force) executor.heartbeat() executor.end()
if configuration.get('core', 'S3_LOG_FOLDER').startswith('s3:'): import boto s3_log = filename.replace(log, configuration.get('core', 'S3_LOG_FOLDER')) bucket, key = s3_log.lstrip('s3:/').split('/', 1) if os.path.exists(filename):
# get logs with open(filename, 'r') as logfile: new_log = logfile.read()
# remove old logs (since they are already in S3) if old_log: new_log.replace(old_log, '')
try: s3 = boto.connect_s3() s3_key = boto.s3.key.Key(s3.get_bucket(bucket), key)
# append new logs to old S3 logs, if available if s3_key.exists(): old_s3_log = s3_key.get_contents_as_string().decode() new_log = old_s3_log + '\n' + new_log
# send log to S3 s3_key.set_contents_from_string(new_log) except: print('Could not send logs to S3.')
""" Returns the state of a TaskInstance at the command line.
>>> airflow task_state tutorial sleep 2015-01-01 success """ args.execution_date = dateutil.parser.parse(args.execution_date) dagbag = DagBag(process_subdir(args.subdir)) if args.dag_id not in dagbag.dags: raise AirflowException('dag_id could not be found') dag = dagbag.dags[args.dag_id] task = dag.get_task(task_id=args.task_id) ti = TaskInstance(task, args.execution_date) print(ti.current_state())
dagbag = DagBag(process_subdir(args.subdir)) print("\n".join(sorted(dagbag.dags)))
dagbag = DagBag(process_subdir(args.subdir)) if args.dag_id not in dagbag.dags: raise AirflowException('dag_id could not be found') dag = dagbag.dags[args.dag_id] if args.tree: dag.tree_view() else: tasks = sorted([t.task_id for t in dag.tasks]) print("\n".join(sorted(tasks)))
log_to_stdout() args.execution_date = dateutil.parser.parse(args.execution_date) dagbag = DagBag(process_subdir(args.subdir)) if args.dag_id not in dagbag.dags: raise AirflowException('dag_id could not be found') dag = dagbag.dags[args.dag_id] task = dag.get_task(task_id=args.task_id) ti = TaskInstance(task, args.execution_date)
if args.dry_run: ti.dry_run() else: ti.run(force=True, ignore_dependencies=True, test_mode=True)
logging.basicConfig( level=settings.LOGGING_LEVEL, format=settings.SIMPLE_LOG_FORMAT) dagbag = DagBag(process_subdir(args.subdir))
if args.dag_id not in dagbag.dags: raise AirflowException('dag_id could not be found') dag = dagbag.dags[args.dag_id]
if args.start_date: args.start_date = dateutil.parser.parse(args.start_date) if args.end_date: args.end_date = dateutil.parser.parse(args.end_date)
if args.task_regex: dag = dag.sub_dag( task_regex=args.task_regex, include_downstream=args.downstream, include_upstream=args.upstream, ) dag.clear( start_date=args.start_date, end_date=args.end_date, only_failed=args.only_failed, only_running=args.only_running, confirm_prompt=not args.no_confirm)
print(settings.HEADER) log_to_stdout() from airflow.www.app import cached_app app = cached_app(configuration) threads = args.threads or configuration.get('webserver', 'threads') if args.debug: print( "Starting the web server on port {0} and host {1}.".format( args.port, args.hostname)) app.run(debug=True, port=args.port, host=args.hostname) else: print( 'Running the Gunicorn server with {threads}' 'on host {args.hostname} and port ' '{args.port}...'.format(**locals())) sp = subprocess.Popen([ 'gunicorn', '-w', str(args.threads), '-t', '120', '-b', args.hostname + ':' + str(args.port), 'airflow.www.app:cached_app()']) sp.wait()
print(settings.HEADER) log_to_stdout() job = jobs.SchedulerJob( dag_id=args.dag_id, subdir=process_subdir(args.subdir), num_runs=args.num_runs, do_pickle=args.do_pickle) job.run()
print("Starting flask") import flask flask_app = flask.Flask(__name__)
@flask_app.route('/log/<path:filename>') def serve_logs(filename): log = os.path.expanduser(configuration.get('core', 'BASE_LOG_FOLDER')) return flask.send_from_directory( log, filename, mimetype="application/json", as_attachment=False) WORKER_LOG_SERVER_PORT = \ int(configuration.get('celery', 'WORKER_LOG_SERVER_PORT')) flask_app.run( host='0.0.0.0', port=WORKER_LOG_SERVER_PORT)
# Worker to serve static log files through this simple flask app env = os.environ.copy() env['AIRFLOW_HOME'] = settings.AIRFLOW_HOME sp = subprocess.Popen(['airflow', 'serve_logs'], env=env)
# Celery worker from airflow.executors.celery_executor import app as celery_app from celery.bin import worker
worker = worker.worker(app=celery_app) options = { 'optimization': 'fair', 'O': 'fair', 'queues': args.queues, 'concurrency': args.concurrency, } worker.run(**options) sp.kill()
print("DB: " + configuration.get('core', 'SQL_ALCHEMY_CONN')) utils.initdb() print("Done.")
print("DB: " + configuration.get('core', 'SQL_ALCHEMY_CONN')) if input( "This will drop existing tables if they exist. " "Proceed? (y/n)").upper() == "Y": logging.basicConfig(level=settings.LOGGING_LEVEL, format=settings.SIMPLE_LOG_FORMAT) utils.resetdb() else: print("Bail.")
print("DB: " + configuration.get('core', 'SQL_ALCHEMY_CONN')) utils.upgradedb()
print(settings.HEADER + " v" + airflow.__version__)
broka = configuration.get('celery', 'BROKER_URL') args.port = args.port or configuration.get('celery', 'FLOWER_PORT') port = '--port=' + args.port api = '' if args.broker_api: api = '--broker_api=' + args.broker_api sp = subprocess.Popen(['flower', '-b', broka, port, api]) sp.wait()
print(settings.HEADER) log_to_stdout() import airflow.security.kerberos airflow.security.kerberos.run()
"-t", "--task_regex", help="The regex to filter specific task_ids to backfill (optional)") "-s", "--start_date", help="Override start_date YYYY-MM-DD") "-e", "--end_date", help="Override end_date YYYY-MM-DD") "-m", "--mark_success", help=mark_success_help, action="store_true") "-l", "--local", help="Run the task using the LocalExecutor", action="store_true") "-x", "--donot_pickle", help=( "Do not attempt to pickle the DAG object to send over " "to the workers, just tell the workers to run their version " "of the code."), action="store_true") "-a", "--include_adhoc", help="Include dags with the adhoc parameter.", action="store_true") "-i", "--ignore_dependencies", help=( "Skip upstream tasks, run only the tasks " "matching the regexp. Only works in conjunction with task_regex"), action="store_true") "-sd", "--subdir", help=subdir_help, default=DAGS_FOLDER) "-p", "--pool", help="Pool to use to run the backfill") "-dr", "--dry_run", help="Perform a dry run", action="store_true")
"-t", "--task_regex", help="The regex to filter specific task_ids to clear (optional)") "-s", "--start_date", help="Override start_date YYYY-MM-DD") "-e", "--end_date", help="Override end_date YYYY-MM-DD") "-u", "--upstream", help=ht, action="store_true") "-f", "--only_failed", help=ht, action="store_true") "-r", "--only_running", help=ht, action="store_true") "-d", "--downstream", help=ht, action="store_true") "-sd", "--subdir", help=subdir_help, default=DAGS_FOLDER) "-c", "--no_confirm", help=ht, action="store_true")
"-r", "--run_id", help="Helps to indentify this run")
"execution_date", help="The execution date to run") "-sd", "--subdir", help=subdir_help, default=DAGS_FOLDER) "-s", "--task_start_date", help="Override the tasks's start_date (used internally)",) "-m", "--mark_success", help=mark_success_help, action="store_true") "-f", "--force", help="Force a run regardless or previous success", action="store_true") "-l", "--local", help="Runs the task locally, don't use the executor", action="store_true") "-r", "--raw", help=argparse.SUPPRESS, action="store_true") "--pool", help="Pool to use to run the task instance") "-i", "--ignore_dependencies", help="Ignore upstream and depends_on_past dependencies", action="store_true") "--ship_dag", help="Pickles (serializes) the DAG and ships it to the worker", action="store_true") "-p", "--pickle", help="Serialized pickle object of the entire dag (used internally)") "-j", "--job_id", help=argparse.SUPPRESS)
"Test a task instance. This will run a task without checking for " "dependencies or recording it's state in the database." ) "execution_date", help="The execution date to run") "-sd", "--subdir", help=subdir_help, default=DAGS_FOLDER) "-dr", "--dry_run", help="Perform a dry run", action="store_true")
"execution_date", help="The execution date to check") "-sd", "--subdir", help=subdir_help, default=DAGS_FOLDER)
"-p", "--port", default=configuration.get('webserver', 'WEB_SERVER_PORT'), type=int, help="Set the port on which to run the web server") "-w", "--threads", default=configuration.get('webserver', 'THREADS'), type=int, help="Number of threads to run the webserver on") "-hn", "--hostname", default=configuration.get('webserver', 'WEB_SERVER_HOST'), help="Set the hostname on which to run the web server") "-d", "--debug", help=ht, action="store_true")
"-d", "--dag_id", help="The id of the dag to run") "-sd", "--subdir", help=subdir_help, default=DAGS_FOLDER) "-n", "--num_runs", default=None, type=int, help="Set the number of runs to execute before exiting") "-p", "--do_pickle", default=False, help=( "Attempt to pickle the DAG object to send over " "to the workers, instead of letting workers run their version " "of the code."), action="store_true")
"-sd", "--subdir", help=subdir_help, default=DAGS_FOLDER)
"-t", "--tree", help="Tree view", action="store_true") "dag_id", help="The id of the dag") "-sd", "--subdir", help=subdir_help, default=DAGS_FOLDER)
"-q", "--queues", help="Comma delimited list of queues to serve", default=configuration.get('celery', 'DEFAULT_QUEUE')) "-c", "--concurrency", type=int, help="The number of worker processes", default=configuration.get('celery', 'celeryd_concurrency'))
"-p", "--port", help="The port") "-a", "--broker_api", help="Broker api")
"-kt", "--keytab", help="keytab", nargs='?', default=configuration.get('kerberos', 'keytab')) "principal", help="kerberos principal", nargs='?', default=configuration.get('kerberos', 'principal'))
|