stale[bot] closed pull request #748: End-to-end DAG testing
URL: https://github.com/apache/incubator-airflow/pull/748
This pull request was submitted from a forked repository (a "foreign"
pull request). Because GitHub does not show the original diff of such a
pull request once it has been merged or closed, the diff is reproduced
below for the sake of provenance:
diff --git a/airflow/bin/cli.py b/airflow/bin/cli.py
index eeaa5b0531..8508b0cce7 100755
--- a/airflow/bin/cli.py
+++ b/airflow/bin/cli.py
@@ -467,7 +467,7 @@ def serve_logs(filename): # noqa
def worker(args):
env = os.environ.copy()
- env['AIRFLOW_HOME'] = settings.AIRFLOW_HOME
+ env['AIRFLOW_HOME'] = conf.get_airflow_home()
# Celery worker
from airflow.executors.celery_executor import app as celery_app
diff --git a/airflow/configuration.py b/airflow/configuration.py
index 173eddb740..8b2c640785 100644
--- a/airflow/configuration.py
+++ b/airflow/configuration.py
@@ -10,6 +10,7 @@
import subprocess
import warnings
+from airflow import settings
from future import standard_library
standard_library.install_aliases()
@@ -649,14 +650,6 @@ def parameterized_config(template):
logging.info("Reading the config from " + AIRFLOW_CONFIG)
-def test_mode():
- conf = ConfigParserWithDefaults(defaults)
- conf.read(TEST_CONFIG)
-
-conf = ConfigParserWithDefaults(defaults)
-conf.read(AIRFLOW_CONFIG)
-
-
def get(section, key, **kwargs):
return conf.get(section, key, **kwargs)
@@ -681,18 +674,78 @@ def remove_option(section, option):
return conf.remove_option(section, option)
+def set(section, option, value): # noqa
+ return conf.set(section, option, value)
+
+
+########################
+# Convenience method to access config entries.
+
+
+def get_dags_folder():
+ return os.path.expanduser(get('core', 'DAGS_FOLDER'))
+
+
+def get_airflow_home():
+ return os.path.expanduser(get('core', 'AIRFLOW_HOME'))
+
+
+def get_sql_alchemy_conn():
+ return get('core', 'SQL_ALCHEMY_CONN')
+
+
+################
+# global config init
+
+conf = None
+
+
+def test_mode():
+ conf = ConfigParserWithDefaults(defaults)
+ conf.read(TEST_CONFIG_FILE)
+
+
+def load_config():
+ """
+ loads the config and triggers the connection to the SQL-alchemy backend
+ """
+ global conf
+ conf = ConfigParserWithDefaults(defaults)
+ conf.read(AIRFLOW_CONFIG)
+ settings.connect(get_sql_alchemy_conn(),
+ conf.getint('core', 'SQL_ALCHEMY_POOL_SIZE'),
+ conf.getint('core', 'SQL_ALCHEMY_POOL_RECYCLE'))
+
+load_config()
+
+
def as_dict(display_source=False, display_sensitive=False):
return conf.as_dict(
display_source=display_source, display_sensitive=display_sensitive)
as_dict.__doc__ = conf.as_dict.__doc__
-def set(section, option, value): # noqa
- return conf.set(section, option, value)
+class DummyStatsLogger(object):
+ @classmethod
+ def incr(cls, stat, count=1, rate=1):
+ pass
-########################
-# convenience method to access config entries
+ @classmethod
+ def decr(cls, stat, count=1, rate=1):
+ pass
+ @classmethod
+ def gauge(cls, stat, value, rate=1, delta=False):
+ pass
-def get_dags_folder():
- return os.path.expanduser(get('core', 'DAGS_FOLDER'))
+Stats = DummyStatsLogger
+
+if conf.getboolean('scheduler', 'statsd_on'):
+ from statsd import StatsClient
+ statsd = StatsClient(
+ host=conf.get('scheduler', 'statsd_host'),
+ port=conf.getint('scheduler', 'statsd_port'),
+ prefix=conf.get('scheduler', 'statsd_prefix'))
+ Stats = statsd
+else:
+ Stats = DummyStatsLogger
diff --git a/airflow/executors/base_executor.py b/airflow/executors/base_executor.py
index 03075834ea..e205704fca 100644
--- a/airflow/executors/base_executor.py
+++ b/airflow/executors/base_executor.py
@@ -9,7 +9,7 @@
class BaseExecutor(LoggingMixin):
- def __init__(self, parallelism=PARALLELISM):
+ def __init__(self, parallelism=PARALLELISM, env=None):
"""
Class to derive in order to interface with executor-type systems
like Celery, Mesos, Yarn and the likes.
@@ -17,8 +17,13 @@ def __init__(self, parallelism=PARALLELISM):
:param parallelism: how many jobs should run at one time. Set to
``0`` for infinity
:type parallelism: int
+
+ :param env: environment variables to provide to the child process (
+ defaults to environment of current process)
"""
self.parallelism = parallelism
+ self.env = env
+
self.queued_tasks = {}
self.running = {}
self.event_buffer = {}
diff --git a/airflow/executors/local_executor.py b/airflow/executors/local_executor.py
index f13ee6d135..7ad50eeace 100644
--- a/airflow/executors/local_executor.py
+++ b/airflow/executors/local_executor.py
@@ -14,11 +14,12 @@
class LocalWorker(multiprocessing.Process, LoggingMixin):
- def __init__(self, task_queue, result_queue):
+ def __init__(self, task_queue, result_queue, env=None):
multiprocessing.Process.__init__(self)
self.task_queue = task_queue
self.result_queue = result_queue
self.daemon = True
+ self.env = env
def run(self):
while True:
@@ -31,7 +32,7 @@ def run(self):
self.__class__.__name__, command))
command = "exec bash -c '{0}'".format(command)
try:
- subprocess.check_call(command, shell=True)
+ subprocess.check_call(command, shell=True, env=self.env)
state = State.SUCCESS
except subprocess.CalledProcessError as e:
state = State.FAILED
@@ -49,11 +50,14 @@ class LocalExecutor(BaseExecutor):
of tasks.
"""
+ def __init__(self, **kwargs):
+ super(LocalExecutor, self).__init__(**kwargs)
+
def start(self):
self.queue = multiprocessing.JoinableQueue()
self.result_queue = multiprocessing.Queue()
self.workers = [
- LocalWorker(self.queue, self.result_queue)
+ LocalWorker(self.queue, self.result_queue, self.env)
for i in range(self.parallelism)
]
@@ -61,6 +65,7 @@ def start(self):
w.start()
def execute_async(self, key, command, queue=None):
+ self.logger.info("adding an async task {}: {}".format(key, command))
self.queue.put((key, command))
def sync(self):
diff --git a/airflow/executors/sequential_executor.py b/airflow/executors/sequential_executor.py
index 53d9f0a626..a22c7e4faf 100644
--- a/airflow/executors/sequential_executor.py
+++ b/airflow/executors/sequential_executor.py
@@ -14,8 +14,8 @@ class SequentialExecutor(BaseExecutor):
Since we want airflow to work out of the box, it defaults to this
SequentialExecutor alongside sqlite as you first install it.
"""
- def __init__(self):
- super(SequentialExecutor, self).__init__()
+ def __init__(self, **kwargs):
+ super(SequentialExecutor, self).__init__(**kwargs)
self.commands_to_run = []
def execute_async(self, key, command, queue=None):
@@ -26,7 +26,7 @@ def sync(self):
self.logger.info("Executing command: {}".format(command))
try:
- subprocess.check_call(command, shell=True)
+ subprocess.check_call(command, shell=True, env=self.env)
self.change_state(key, State.SUCCESS)
except subprocess.CalledProcessError as e:
self.change_state(key, State.FAILED)
diff --git a/airflow/jobs.py b/airflow/jobs.py
index 43314e7229..647700bc36 100644
--- a/airflow/jobs.py
+++ b/airflow/jobs.py
@@ -43,7 +43,7 @@
Base = models.Base
ID_LEN = models.ID_LEN
-Stats = settings.Stats
+Stats = conf.Stats
class BaseJob(Base, LoggingMixin):
@@ -704,7 +704,7 @@ def _execute(self):
self.logger.info("Starting the scheduler")
dagbag = models.DagBag(self.subdir, sync_to_db=True)
- executor = self.executor = dagbag.executor
+ executor = self.executor
executor.start()
self.runs = 0
while not self.num_runs or self.num_runs > self.runs:
diff --git a/airflow/models.py b/airflow/models.py
index 97a2fd8897..f32084fd78 100644
--- a/airflow/models.py
+++ b/airflow/models.py
@@ -69,8 +69,6 @@
Base = declarative_base()
ID_LEN = 250
-SQL_ALCHEMY_CONN = configuration.get('core', 'SQL_ALCHEMY_CONN')
-DAGS_FOLDER = os.path.expanduser(configuration.get('core', 'DAGS_FOLDER'))
XCOM_RETURN_KEY = 'return_value'
ENCRYPTION_ON = False
@@ -81,7 +79,7 @@
except:
pass
-if 'mysql' in SQL_ALCHEMY_CONN:
+if 'mysql' in configuration.get_sql_alchemy_conn():
LongText = LONGTEXT
else:
LongText = Text
@@ -145,17 +143,24 @@ def __init__(
self,
dag_folder=None,
executor=DEFAULT_EXECUTOR,
- include_examples=configuration.getboolean('core', 'LOAD_EXAMPLES'),
+ include_examples=None,
sync_to_db=False):
- dag_folder = dag_folder or DAGS_FOLDER
+ dag_folder = dag_folder or configuration.get_dags_folder()
self.logger.info("Filling up the DagBag from {}".format(dag_folder))
+
self.dag_folder = dag_folder
self.dags = {}
self.sync_to_db = sync_to_db
self.file_last_changed = {}
self.executor = executor
self.import_errors = {}
+
+ # reading the config here and not as constructor arg since config can
+ # now be reloaded
+ if include_examples is None:
+ include_examples = configuration.getboolean('core', 'LOAD_EXAMPLES')
+
if include_examples:
example_dag_folder = os.path.join(
os.path.dirname(__file__),
@@ -2504,7 +2509,7 @@ def filepath(self):
"""
File location of where the dag object is instantiated
"""
- fn = self.full_filepath.replace(DAGS_FOLDER + '/', '')
+ fn = self.full_filepath.replace(configuration.get_dags_folder() + '/', '')
fn = fn.replace(os.path.dirname(__file__) + '/', '')
return fn
diff --git a/airflow/operators/bash_operator.py b/airflow/operators/bash_operator.py
index 3e9c7ef97b..3da8b96b31 100644
--- a/airflow/operators/bash_operator.py
+++ b/airflow/operators/bash_operator.py
@@ -80,7 +80,8 @@ def execute(self, context):
"return code {0}".format(sp.returncode))
if sp.returncode:
- raise AirflowException("Bash command failed")
+ raise AirflowException("Bash command failed: {}".format(
+ bash_command))
if self.xcom_push_flag:
return line
diff --git a/airflow/settings.py b/airflow/settings.py
index ae56455649..61db426b06 100644
--- a/airflow/settings.py
+++ b/airflow/settings.py
@@ -18,40 +18,11 @@
from __future__ import unicode_literals
import logging
-import os
import sys
from sqlalchemy import create_engine
from sqlalchemy.orm import scoped_session, sessionmaker
-from airflow import configuration as conf
-
-
-class DummyStatsLogger(object):
- @classmethod
- def incr(cls, stat, count=1, rate=1):
- pass
- @classmethod
- def decr(cls, stat, count=1, rate=1):
- pass
- @classmethod
- def gauge(cls, stat, value, rate=1, delta=False):
- pass
-
-Stats = DummyStatsLogger
-
-if conf.getboolean('scheduler', 'statsd_on'):
- from statsd import StatsClient
- statsd = StatsClient(
- host=conf.get('scheduler', 'statsd_host'),
- port=conf.getint('scheduler', 'statsd_port'),
- prefix=conf.get('scheduler', 'statsd_prefix'))
- Stats = statsd
-else:
- Stats = DummyStatsLogger
-
-
-
HEADER = """\
____________ _____________
____ |__( )_________ __/__ /________ __
@@ -61,21 +32,7 @@ def gauge(cls, stat, value, rate=1, delta=False):
"""
BASE_LOG_URL = '/admin/airflow/log'
-AIRFLOW_HOME = os.path.expanduser(conf.get('core', 'AIRFLOW_HOME'))
-SQL_ALCHEMY_CONN = conf.get('core', 'SQL_ALCHEMY_CONN')
LOGGING_LEVEL = logging.INFO
-DAGS_FOLDER = os.path.expanduser(conf.get('core', 'DAGS_FOLDER'))
-
-engine_args = {}
-if 'sqlite' not in SQL_ALCHEMY_CONN:
- # Engine args not supported by sqlite
- engine_args['pool_size'] = conf.getint('core', 'SQL_ALCHEMY_POOL_SIZE')
- engine_args['pool_recycle'] = conf.getint('core',
- 'SQL_ALCHEMY_POOL_RECYCLE')
-
-engine = create_engine(SQL_ALCHEMY_CONN, **engine_args)
-Session = scoped_session(
- sessionmaker(autocommit=False, autoflush=False, bind=engine))
# can't move this to conf due to ConfigParser interpolation
LOG_FORMAT = (
@@ -83,6 +40,30 @@ def gauge(cls, stat, value, rate=1, delta=False):
SIMPLE_LOG_FORMAT = '%(asctime)s %(levelname)s - %(message)s'
+engine = None
+Session = None
+
+
+def connect(sql_alchemy_conn, pool_size, pool_recycle):
+ """
+ builds the Session object to connect to the SQL-alchemy backend.
+ """
+ global Session, engine
+
+ if Session:
+ Session.remove()
+
+ engine_args = {}
+ if 'sqlite' not in sql_alchemy_conn:
+ # Engine args not supported by sqlite
+ engine_args['pool_size'] = pool_size
+ engine_args['pool_recycle'] = pool_recycle
+
+ engine = create_engine(sql_alchemy_conn, **engine_args)
+ Session = scoped_session(sessionmaker(autocommit=False, autoflush=False,
+ bind=engine))
+
+
def policy(task_instance):
"""
This policy setting allows altering task instances right before they
diff --git a/run_unit_tests.sh b/run_unit_tests.sh
index 71df44cbeb..a0c8b02d33 100755
--- a/run_unit_tests.sh
+++ b/run_unit_tests.sh
@@ -34,4 +34,4 @@ echo "Starting the unit tests with the following nose arguments: "$nose_args
nosetests $nose_args
# To run individual tests:
-# nosetests tests.core:CoreTest.test_scheduler_job
+# ./run_unit_tests.sh tests.core:CoreTest.test_scheduler_job
diff --git a/tests_dags/__init__.py b/tests_dags/__init__.py
new file mode 100644
index 0000000000..e69de29bb2
diff --git a/tests_dags/dag_tester.py b/tests_dags/dag_tester.py
new file mode 100644
index 0000000000..bfbd093f66
--- /dev/null
+++ b/tests_dags/dag_tester.py
@@ -0,0 +1,310 @@
+"""
+General entry point for testing end to end dags
+"""
+import logging
+import shutil
+import tempfile
+import time
+import unittest
+
+import os
+import re
+from airflow import configuration
+from airflow import executors
+from airflow.configuration import TEST_CONFIG_FILE, get_sql_alchemy_conn
+from airflow.jobs import BackfillJob, SchedulerJob
+from airflow.models import DagBag, Variable, DagRun, TaskInstance
+from airflow.utils.logging import LoggingMixin
+from airflow.utils.db import provide_session
+
+
+def is_config_db_concurrent():
+ """
+ :return: true if the sqlalchemy connection is set up to use a DB that
+ supports concurrent access (i.e. not sqlite)
+ """
+ return "sqlite://" not in get_sql_alchemy_conn()
+
+
+class AbstractEndToEndTest(LoggingMixin):
+ """
+ Convenience super class with common abstract methods between
+ EndToEndBackfillJobTest and EndToEndSchedulerJobTest
+ """
+
+ def get_dag_file_names(self):
+ """
+ :return: a non empty list of python file names containing dag(s) to
+ be tested in the context of this test.
+ """
+
+ raise NotImplementedError()
+
+ def get_context(self):
+ """
+ :return: a dictionary of variables to be stored such that the
+ tested DAG can access them through a Variable.get("key") statement
+ """
+ return {}
+
+ def post_check(self, working_dir):
+ """
+ :param working_dir: the tmp file where the tested DAG has been
+ executed
+
+ Child classes should implement here any post-check and raise
+ exceptions via assertions to trigger a test failure.
+ """
+
+ raise NotImplementedError()
+
+
+class EndToEndBackfillJobTest(AbstractEndToEndTest):
+ """
+ Abstract class to implement in order to execute an end-to-end DAG test based
+ on a BackfillJob.
+ """
+
+ def get_backfill_params(self):
+ """
+ :return: dictionary **kwargs argument for building the BackfillJob
+ execution of this test.
+ """
+ raise NotImplementedError()
+
+ @unittest.skipIf(not is_config_db_concurrent(),
+ "DB Backend must support concurrent access")
+ def test_backfilljob(self):
+
+ with BackFillJobRunner(self.get_backfill_params(),
+ dag_file_names=self.get_dag_file_names(),
+ context=self.get_context()) as runner:
+
+ runner.run()
+ self.post_check(runner.working_dir)
+
+
+class EndToEndSchedulerJobTest(AbstractEndToEndTest):
+ """
+ Abstract class to implement in order to execute an end-to-end DAG test based
+ on a SchedulerJob.
+ """
+
+ def get_schedulerjob_params(self):
+ """
+ :return: dictionary **kwargs argument for building the BackfillJob
+ execution of this test.
+ """
+ raise NotImplementedError()
+
+ @unittest.skipIf(not is_config_db_concurrent(),
+ "DB Backend must support concurrent access")
+ def test_schedulerjob(self):
+
+ with SchedulerJobRunner(self.get_schedulerjob_params(),
+ dag_file_names=self.get_dag_file_names(),
+ context=self.get_context()) as runner:
+
+ runner.run()
+ self.post_check(runner.working_dir)
+
+
+class Runner(object):
+ """
+ Abstract Runner that prepares a working temp dir and all necessary context
+ variables in order to execute a job in its own isolated folder.
+ """
+
+ def __init__(self,
+ dag_file_names,
+ context=None):
+
+ self.dag_file_names = dag_file_names
+
+ # makes sure the default context is a different instance for each Runner
+ self.context = context if context else {}
+
+ # this is initialized in the constructor of the child class
+ self.tested_job = None
+
+ # preparing a folder where to execute the tests, with all the DAGs
+ all_dags_folder = os.path.join(os.path.dirname(__file__), "dags")
+ self.working_dir = tempfile.mkdtemp()
+ self.it_dag_folder = os.path.join(self.working_dir, "dags")
+ os.mkdir(self.it_dag_folder)
+ for file_name in self.dag_file_names:
+ src = os.path.join(all_dags_folder, file_name)
+ shutil.copy2(src, self.it_dag_folder)
+
+ # saving the context to Variable so the child test can access it
+ # while saving existing Variables if the test would be overwriting any
+ self.context['unit_test_tmp_dir'] = self.working_dir
+ self.saved_variables = {}
+ for key, val in self.context.items():
+ try:
+ old_value = Variable.get(key)
+ self.saved_variables[key] = old_value
+ except ValueError:
+ pass
+ Variable.set(key, val, serialize_json=True)
+
+ self.config_file = self._create_it_config_file(self.it_dag_folder)
+
+ # aligns current config with test config (this of course would fail
+ # if several dag tests are executed in parallel threads)
+ configuration.AIRFLOW_CONFIG = self.config_file
+ configuration.load_config()
+
+ self.dagbag = DagBag(self.it_dag_folder, include_examples=False)
+
+ # environment variables for the child processes launched by the test
+ self.test_env = os.environ.copy()
+ self.test_env.update({"AIRFLOW_CONFIG": self.config_file})
+
+ self._reset_dags()
+
+ def run(self):
+ """
+ Starts the execution of the tested job.
+ """
+ self.tested_job.run()
+
+ def cleanup(self):
+ """
+ Deletes all traces of execution of the tested job.
+ This is called automatically if the Runner is used inside a with
+ statement
+ """
+ logging.info("cleaning up {}".format(self.tested_job))
+ self._reset_dags()
+ os.system("rm -rf {}".format(self.working_dir))
+
+ # Restore Variables that were overwritten
+ for key, val in self.saved_variables.items():
+ Variable.set(key, val)
+
+ def dag_ids(self):
+ """
+ :return: the set of all dag_ids tested by the test
+ """
+ return self.dagbag.dags.keys()
+
+ ##########################
+ # private methods
+
+ @provide_session
+ def _reset_dags(self, session=None):
+ for dag_id in self.dag_ids():
+ session.query(TaskInstance).filter_by(dag_id=dag_id).delete()
+ session.query(DagRun).filter_by(dag_id=dag_id).delete()
+
+ def _create_it_config_file(self, dag_folder):
+ """
+ Creates a custom config file for integration tests in the specified
+ location, overriding the dag_folder and heartbeat_sec values.
+ """
+
+ it_file_location = os.path.join(self.working_dir, "airflow_IT.cfg")
+
+ with open(TEST_CONFIG_FILE) as test_config_file:
+ config = test_config_file.read()
+
+ config = re.sub("dags_folder =.*",
+ "dags_folder = {}".format(dag_folder), config)
+ config = re.sub("job_heartbeat_sec =.*",
+ "job_heartbeat_sec = 1", config)
+ config = re.sub("load_examples =.*",
+ "load_examples = False", config)
+
+ # this is the config file that will be used by the child process
+ with open(it_file_location, "w") as cfg_file:
+ cfg_file.write(config)
+
+ return it_file_location
+
+ ###########################
+ # loan pattern to make any runner easily usable inside a with statement
+
+ def __enter__(self):
+ return self
+
+ def __exit__(self, exc_type, exc_val, exc_tb):
+ self.cleanup()
+
+
+class BackFillJobRunner(Runner):
+ """
+ Executes a BackfillJob on the specified dagfiles, in its own tmp folder,
+ with all necessary Variables specified in context persisted so that the
+ child context has access to it.
+ """
+
+ def __init__(self, backfilljob_params, **kwargs):
+ super(BackFillJobRunner, self).__init__(**kwargs)
+
+ if self.dagbag.size() > 1:
+ self.cleanup()
+ assert False, "more than one dag found in BackfillJob test"
+
+ self.dag = list(self.dagbag.dags.values())[0]
+ self.tested_job = BackfillJob(dag=self.dag, **backfilljob_params)
+ self.tested_job.executor = executors.SequentialExecutor(
+ env=self.test_env)
+
+ self.tested_job.dag.clear()
+
+
+class SchedulerJobRunner(Runner):
+ """
+ Executes a SchedulerJob on the specified dagfiles, in its own tmp folder,
+ with all necessary Variables specified in context persisted so that the
+ child context has access to it.
+ """
+
+ def __init__(self, job_params, **kwargs):
+ super(SchedulerJobRunner, self).__init__(**kwargs)
+
+ self.tested_job = SchedulerJob(subdir=self.it_dag_folder, **job_params)
+ self.tested_job.executor = executors.LocalExecutor(env=self.test_env)
+
+
+#############
+# some useful post-check validation utils
+
+def get_existing_files_in_folder(folder):
+ return [f for f in os.listdir(folder)
+ if os.path.isfile(os.path.join(folder, f))]
+
+
+def validate_file_content(folder, filename, expected_content):
+ """
+ Raise an exception if the specified file does not have the expected
+ content, or returns silently otherwise
+ """
+ path = "{0}/{1}".format(folder, filename)
+ if not os.path.isfile(path):
+ folder_content = "\n".join(get_existing_files_in_folder(folder))
+ assert False, "File {path} does not exist. Here are the existing " \
+ "files :\n{folder_content}".format(**locals())
+ with open(path) as f:
+ content = f.read()
+ assert expected_content == content, \
+ "Unexpected content of {path}\n" \
+ " Expected content : {expected_content}\n" \
+ " Actual content : {content}".format(**locals())
+
+
+def validate_order(folder, early, late):
+ """
+ Raise an exception if the last modification of the early file happened
+ after the last modification of the late file
+ """
+ path_early = "{0}/{1}".format(folder, early)
+ path_late = "{0}/{1}".format(folder, late)
+
+ time_early = time.ctime(os.path.getmtime(path_early))
+ time_late = time.ctime(os.path.getmtime(path_late))
+ assert time_early < time_late, \
+ "The last modification time of {path_early} should be before the " \
+ "last modification time of {path_late} but it was not the case." \
+ "".format(**locals())
diff --git a/tests_dags/dags/bash_operator_ab.py b/tests_dags/dags/bash_operator_ab.py
new file mode 100644
index 0000000000..2f975086af
--- /dev/null
+++ b/tests_dags/dags/bash_operator_ab.py
@@ -0,0 +1,39 @@
+from datetime import datetime
+
+from airflow import DAG
+from airflow.models import Variable
+from airflow.operators.bash_operator import BashOperator
+
+default_args = {
+ 'owner': 'unittest',
+ 'email_on_failure': False,
+ 'email_on_retry': False
+ }
+
+dag = DAG("tests_dags__bash_operator_ab",
+ start_date=datetime(2015, 1, 1),
+ end_date=datetime(2015, 1, 10),
+ default_args=default_args)
+
+# no default value for those: it is a bug to try to load this DAG without
+# preparing a tmp folder for it
+tempDir = Variable.get("unit_test_tmp_dir")
+
+b = BashOperator(
+ task_id='echo_b',
+ bash_command='echo success_b > %s/out.b.{{ ds }}.txt' % tempDir,
+ dag=dag)
+
+a = BashOperator(
+ task_id='echo_a',
+ bash_command='echo success_a > %s/out.a.{{ ds }}.txt' % tempDir,
+ dag=dag)
+
+direction = Variable.get(key="dependency_direction",
+ deserialize_json=True,
+ default_var="downstream")
+
+if direction == "downstream":
+ a.set_downstream(b)
+elif direction == "upstream":
+ b.set_upstream(a)
diff --git a/tests_dags/dags/bash_operator_ab_retries.py b/tests_dags/dags/bash_operator_ab_retries.py
new file mode 100644
index 0000000000..91e92c19e3
--- /dev/null
+++ b/tests_dags/dags/bash_operator_ab_retries.py
@@ -0,0 +1,57 @@
+from datetime import datetime, timedelta
+
+from airflow import DAG
+from airflow.models import Variable
+from airflow.operators.bash_operator import BashOperator
+
+default_args = {
+ 'owner': 'unittest',
+ 'email_on_failure': False,
+ 'email_on_retry': False,
+ 'retries': 3,
+ 'retry_delay': timedelta(seconds=1)
+ }
+
+dag = DAG("tests_dags__bash_operator_ab_retries",
+ start_date=datetime(2015, 1, 1),
+ end_date=datetime(2015, 1, 10),
+ default_args=default_args)
+
+# no default value for those: it is a bug to try to load this DAG without
+# preparing a tmp folder for it
+tempDir = Variable.get("unit_test_tmp_dir")
+
+# retry a number of ( day % 3 ) times
+bash_command = """
+try={{ ti.try_number }}
+day=10#{{ macros.ds_format(ds, "%%Y-%%m-%%d", "%%d") }}
+if [ "$try" -ge $(( (($day-1)%%3)+1 )) ]
+ then
+ echo success_a > %s/out.a.{{ ds }}.txt
+ exit 0
+ else
+ exit 1
+fi
+""" % tempDir
+
+depends_on_past = Variable.get(key="depends_on_past",
+ deserialize_json=True,
+ default_var=False)
+
+wait_for_downstream = Variable.get(key="wait_for_downstream",
+ deserialize_json=True,
+ default_var=False)
+
+a = BashOperator(
+ task_id='echo_a',
+ bash_command=bash_command,
+ wait_for_downstream=wait_for_downstream,
+ dag=dag)
+
+b = BashOperator(
+ task_id='echo_b',
+ bash_command='echo success_b > %s/out.b.{{ ds }}.txt' % tempDir,
+ depends_on_past=depends_on_past,
+ dag=dag)
+
+a.set_downstream(b)
diff --git a/tests_dags/dags/bash_operator_once.py b/tests_dags/dags/bash_operator_once.py
new file mode 100644
index 0000000000..4f256a5f6b
--- /dev/null
+++ b/tests_dags/dags/bash_operator_once.py
@@ -0,0 +1,25 @@
+from datetime import datetime
+
+from airflow import DAG
+from airflow.models import Variable
+from airflow.operators.bash_operator import BashOperator
+
+default_args = {
+ 'owner': 'unittest',
+ 'email_on_failure': False,
+ 'email_on_retry': False
+ }
+
+dag = DAG("tests_dags__bash_operator_once",
+ start_date=datetime(2015, 1, 1),
+ schedule_interval="@once",
+ default_args=default_args)
+
+# no default value for those: it is a bug to try to load this DAG without
+# preparing a tmp folder for it
+tempDir = Variable.get(key="unit_test_tmp_dir")
+
+BashOperator(
+ task_id='echo',
+ bash_command='echo success > %s/out.{{ ds }}.txt' % tempDir,
+ dag=dag)
diff --git a/tests_dags/dags/bash_operator_single.py b/tests_dags/dags/bash_operator_single.py
new file mode 100644
index 0000000000..19898a7def
--- /dev/null
+++ b/tests_dags/dags/bash_operator_single.py
@@ -0,0 +1,24 @@
+from datetime import datetime
+
+from airflow import DAG
+from airflow.models import Variable
+from airflow.operators.bash_operator import BashOperator
+
+default_args = {
+ 'owner': 'unittest',
+ 'email_on_failure': False,
+ 'email_on_retry': False
+ }
+
+dag = DAG("tests_dags__bash_operator_single",
+ start_date=datetime(2015, 1, 1),
+ default_args=default_args)
+
+# no default value for those: it is a bug to try to load this DAG without
+# preparing a tmp folder for it
+tempDir = Variable.get(key="unit_test_tmp_dir")
+
+BashOperator(
+ task_id='echo',
+ bash_command='echo success > %s/out.{{ ds }}.txt' % tempDir,
+ dag=dag)
diff --git a/tests_dags/tests/__init__.py b/tests_dags/tests/__init__.py
new file mode 100644
index 0000000000..e69de29bb2
diff --git a/tests_dags/tests/bash_operator_dag_tests.py b/tests_dags/tests/bash_operator_dag_tests.py
new file mode 100644
index 0000000000..9d8aa4fb99
--- /dev/null
+++ b/tests_dags/tests/bash_operator_dag_tests.py
@@ -0,0 +1,234 @@
+"""
+ End to end tests of some simple DAGs composed of basic bash_operators
+"""
+import unittest
+from datetime import datetime
+
+from ..dag_tester import validate_file_content, validate_order, \
+ EndToEndBackfillJobTest, EndToEndSchedulerJobTest
+
+
+class BashOperatorSingleOneDay(EndToEndBackfillJobTest,
+ EndToEndSchedulerJobTest,
+ unittest.TestCase):
+ """
+ Tests that a bash operator executed over 1 day correctly produces 1 file.
+ This is validated with a BackfillJob as well as a SchedulerJob
+ """
+
+ def get_dag_file_names(self):
+ return ["bash_operator_single.py"]
+
+ def get_backfill_params(self):
+ return {"start_date": datetime(2015, 1, 1),
+ "end_date": datetime(2015, 1, 1)}
+
+ def get_schedulerjob_params(self):
+ return {"num_runs": 1}
+
+ def post_check(self, working_dir):
+ validate_file_content(working_dir, "out.2015-01-01.txt", "success\n")
+
+
+class BashOperatorOnce(EndToEndSchedulerJobTest, unittest.TestCase):
+ """
+ Tests that a bash operator executed over 1 day correctly produces 1 file.
+ This is validated with a SchedulerJob
+ """
+
+ def get_dag_file_names(self):
+ return ["bash_operator_once.py"]
+
+ # backfilljob does not support the schedule interval "once" (bug?)
+ # def get_backfill_params(self):
+ # return {"start_date": datetime(2015, 1, 1),
+ # "end_date": datetime(2015, 1, 1)}
+
+ def get_schedulerjob_params(self):
+ return {"num_runs": 1}
+
+ def post_check(self, working_dir):
+ formatted_date = datetime.now().strftime("%Y-%m-%d")
+ filename = "out.{}.txt".format(formatted_date)
+ validate_file_content(working_dir, filename, "success\n")
+
+
+class BashOperatorSingle3Days(EndToEndBackfillJobTest,
+ EndToEndSchedulerJobTest,
+ unittest.TestCase):
+ """
+ Tests that a bash operator executed over 3 days correctly produces 3 files.
+ """
+
+ dates = ["2015-01-01", "2015-01-02", "2015-01-03"]
+
+ def get_dag_file_names(self):
+ return ["bash_operator_single.py"]
+
+ def get_backfill_params(self):
+ return {"start_date": datetime(2015, 1, 1),
+ "end_date": datetime(2015, 1, 3)}
+
+ def get_schedulerjob_params(self):
+ return {"num_runs": 5}
+
+ def post_check(self, working_dir):
+ for date in self.dates:
+ out_file = "out.{date}.txt".format(**locals())
+ validate_file_content(working_dir, out_file, "success\n")
+
+
+class BashOperatorABDownStream(EndToEndBackfillJobTest,
+ EndToEndSchedulerJobTest,
+ unittest.TestCase):
+ """
+ Tests that two bash operators linked with .set_downstream that are executed
+ over 10 days each produce 10 files in a legal order.
+
+ * A and B
+ * B depends on A
+ """
+ dates = ["2015-01-01", "2015-01-02", "2015-01-03", "2015-01-04",
+ "2015-01-05", "2015-01-06", "2015-01-07", "2015-01-08",
+ "2015-01-09", "2015-01-10"]
+
+ file_a = "out.a.{date}.txt"
+ file_b = "out.b.{date}.txt"
+
+ def get_dag_file_names(self):
+ return ["bash_operator_ab.py"]
+
+ def get_backfill_params(self):
+ return {"start_date": datetime(2015, 1, 1),
+ "end_date": datetime(2015, 1, 10)}
+
+ def get_schedulerjob_params(self):
+ # number of necessary dag-runs
+ return {"num_runs": 30}
+
+ def get_context(self):
+ return {"dependency_direction": "downstream"}
+
+ def post_check(self, working_dir):
+ for date in self.dates:
+
+ file_a_date = self.file_a.format(**locals())
+ file_b_date = self.file_b.format(**locals())
+
+ validate_file_content(working_dir, file_a_date, "success_a\n")
+ validate_file_content(working_dir, file_b_date, "success_b\n")
+
+ validate_order(working_dir, file_a_date, file_b_date)
+
+
+class BashOperatorABUpstream(BashOperatorABDownStream, unittest.TestCase):
+ """
+ Tests that two bash operators linked with .set_upstream that are executed
+ over 10 days each produce 10 files in a legal order.
+
+ * A and B
+ * B depends on A
+ """
+
+ def get_context(self):
+ return {"dep_direction": "upstream"}
+
+
+class BashOperatorABRetries(BashOperatorABDownStream, unittest.TestCase):
+ """
+ Tests that two bash operators linked with .set_downstream that are executed
+ over 10 days each produce 10 files in a legal order. Failures and retries
+ introduce chaos.
+
+ * A and B
+ * B depends on A
+ * A has failures and retries
+ """
+ def get_dag_file_names(self):
+ return ["bash_operator_ab_retries.py"]
+
+ def get_context(self):
+ return {"depends_on_past": False,
+ "wait_for_downstream": False}
+
+
+class BashOperatorABDependsOnPast(BashOperatorABDownStream, unittest.TestCase):
+ """
+ Tests that two bash operators linked with .set_downstream and
+ depends_on_past that are executed over 10 days each produce 10 files in a
+ legal order. Failures and retries introduce chaos.
+
+ * A and B
+ * B depends on A
+ * A has failures and retries
+ * B depends on past
+ """
+ def get_dag_file_names(self):
+ return ["bash_operator_ab_retries.py"]
+
+ def get_context(self):
+ return {"depends_on_past": True,
+ "wait_for_downstream": False}
+
+ def get_schedulerjob_params(self):
+ return {"num_runs": 35}
+
+ def post_check(self, working_dir):
+
+ prev_file_b_date = None
+
+ for date in self.dates:
+ file_a_date = self.file_a.format(**locals())
+ file_b_date = self.file_b.format(**locals())
+
+ validate_file_content(working_dir, file_a_date, "success_a\n")
+ validate_file_content(working_dir, file_b_date, "success_b\n")
+
+ validate_order(working_dir, file_a_date, file_b_date)
+
+ if prev_file_b_date:
+ validate_order(working_dir, prev_file_b_date, file_b_date)
+
+ prev_file_b_date = file_b_date
+
+
+class BashOperatorABWaitForDownstream(BashOperatorABDownStream,
+ unittest.TestCase):
+ """
+ Tests that two bash operators linked with .set_downstream and
+ wait_for_downstream that are executed over 10 days each produce 10 files
+ in a legal order. Failures and retries introduce chaos.
+
+ * A and B
+ * B depends on A
+ * A has failures and retries
+ * A waits for downstream
+ """
+ def get_dag_file_names(self):
+ return ["bash_operator_ab_retries.py"]
+
+ def get_context(self):
+ return {"depends_on_past": False,
+ "wait_for_downstream": True}
+
+ def get_schedulerjob_params(self):
+ return {"num_runs": 70}
+
+ def post_check(self, working_dir):
+
+ prev_file_b_date = None
+
+ for date in self.dates:
+ file_a_date = self.file_a.format(**locals())
+ file_b_date = self.file_b.format(**locals())
+
+ validate_file_content(working_dir, file_a_date, "success_a\n")
+ validate_file_content(working_dir, file_b_date, "success_b\n")
+
+ validate_order(working_dir, file_a_date, file_b_date)
+
+ if prev_file_b_date:
+ validate_order(working_dir, prev_file_b_date, file_a_date)
+
+ prev_file_b_date = file_b_date
+
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
With regards,
Apache Git Services