Coverage for airflow/settings.py : 79%
# -*- coding: utf-8 -*-
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
pass
pass
pass
def timing(cls, stat, dt):
    pass
    from statsd import StatsClient
    statsd = StatsClient(
        host=conf.get('scheduler', 'statsd_host'),
        port=conf.getint('scheduler', 'statsd_port'),
        prefix=conf.get('scheduler', 'statsd_prefix'))
    Stats = statsd
else:
  ____________       _____________
 ____    |__( )_________  __/__  /________      __
____  /| |_  /__  ___/_  /_ __  /_  __ \_ | /| / /
___  ___ |  / _  /   _  __/ _  / / /_/ /_ |/ |/ /
 _/_/  |_/_/  /_/    /_/    /_/  \____/____/|__/
 """
# Engine args not supported by sqlite
engine_args['pool_size'] = conf.getint('core', 'SQL_ALCHEMY_POOL_SIZE')
engine_args['pool_recycle'] = conf.getint('core', 'SQL_ALCHEMY_POOL_RECYCLE')
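The pool settings above are described as unsupported by sqlite, so they presumably sit behind a check on the connection string. The guard itself is not visible in this report; the sketch below assumes a simple prefix test, and `build_engine_args` and its parameters are hypothetical names used only for illustration.

```python
def build_engine_args(sql_alchemy_conn, pool_size, pool_recycle):
    """Return extra engine keyword arguments for a connection string.

    Assumption: a connection string starting with 'sqlite' identifies the
    sqlite backend. The real condition is elided in the report.
    """
    engine_args = {}
    if not sql_alchemy_conn.startswith('sqlite'):
        # Pool tuning is only added for non-sqlite backends.
        engine_args['pool_size'] = pool_size
        engine_args['pool_recycle'] = pool_recycle
    return engine_args


sqlite_args = build_engine_args('sqlite:////tmp/airflow.db', 5, 3600)
mysql_args = build_engine_args('mysql://user@localhost/airflow', 5, 3600)
```

With this guard, the sqlite case yields an empty dictionary, so no pool arguments would be passed on to the engine.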
sessionmaker(autocommit=False, autoflush=False, bind=engine))
# can't move this to conf due to ConfigParser interpolation
'[%(asctime)s] {%(filename)s:%(lineno)d} %(levelname)s - %(message)s')
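The comment above refers to a clash between logging format strings and config-file interpolation: both use the `%(name)s` syntax. A minimal sketch with the Python 3 standard-library `configparser` shows the effect; the `[core]` section and `log_format` option are hypothetical names, and the behaviour of the parser Airflow actually used may differ in detail.

```python
import configparser

LOG_FORMAT = ('[%(asctime)s] {%(filename)s:%(lineno)d} '
              '%(levelname)s - %(message)s')


def read_log_format(raw):
    """Store LOG_FORMAT in an in-memory config and read it back."""
    parser = configparser.ConfigParser()
    # Hypothetical section and option names, used only for this sketch.
    parser.read_string('[core]\nlog_format = ' + LOG_FORMAT + '\n')
    return parser.get('core', 'log_format', raw=raw)


def interpolation_fails():
    """Return True if a normal (interpolating) read raises an error."""
    try:
        read_log_format(raw=False)
    except configparser.InterpolationError:
        return True
    return False
```

Reading with `raw=True` returns the string untouched, while the default read tries to resolve `%(asctime)s` as a config option and fails. Keeping the format as a constant in code avoids the problem entirely.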
""" This policy setting allows altering task instances right before they are executed. It allows administrators to rewire some task parameters.
Note that the ``TaskInstance`` object has an attribute ``task`` pointing to its related task object, which in turn has a reference to the DAG object. So you can use the attributes of all of these to define your policy.
To define a policy, add an ``airflow_local_settings`` module to your PYTHONPATH that defines this ``policy`` function. It receives a ``TaskInstance`` object and can alter it where needed.
Here are a few examples of how this can be useful:
* You could enforce a specific queue (say the ``spark`` queue) for tasks using the ``SparkOperator`` to make sure that these task instances get wired to the right workers
* You could force all task instances running on an ``execution_date`` more than a week old to run in a ``backfill`` pool.
* ...
"""
pass
format=LOG_FORMAT, stream=sys.stdout, level=LOGGING_LEVEL)
logging.info("Loaded airflow_local_settings.")
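The two examples in the ``policy`` docstring could be sketched as an ``airflow_local_settings`` module along these lines. This is an illustration under stated assumptions, not the project's implementation: the `queue`, `pool`, and `execution_date` attributes on the task instance, the optional `now` parameter, the use of naive datetimes, and the `SparkOperator` stand-in class are all assumed here so the sketch runs without Airflow installed.

```python
from datetime import datetime, timedelta
from types import SimpleNamespace


class SparkOperator:
    """Hypothetical stand-in for an operator class; not the real one."""


def policy(task_instance, now=None):
    """Alter a task instance in place right before it is executed.

    `now` is an assumed extra parameter that makes the age check testable;
    a caller passing only the task instance gets the current time.
    """
    now = now or datetime.now()
    task = task_instance.task
    # Route tasks built from SparkOperator to the ``spark`` queue.
    if type(task).__name__ == 'SparkOperator':
        task_instance.queue = 'spark'
    # Send runs more than a week old to the ``backfill`` pool.
    if now - task_instance.execution_date > timedelta(days=7):
        task_instance.pool = 'backfill'


# Usage with a stand-in object in place of a real TaskInstance.
demo_ti = SimpleNamespace(
    task=SparkOperator(),
    execution_date=datetime(2020, 1, 1),
    queue='default',
    pool=None,
)
policy(demo_ti, now=datetime(2020, 1, 15))
```

After the call, `demo_ti.queue` is `'spark'` and `demo_ti.pool` is `'backfill'`, since the stand-in task is a `SparkOperator` and its execution date is two weeks before `now`.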