I think there are two options:

1) Rely on Breeze as the execution environment for system tests and add a flag that starts the triggerer together with the system tests.
2) Add a pytest fixture that starts it automatically when system tests are run via pytest (this could be done in tests/system/conftest.py).

I believe most of the system tests out there are run with Breeze. Breeze gives a stable, reproducible execution environment for various tests. It was designed from the ground up to provide a "reproducible test environment"; that has been the main "function" of Breeze from the start. So we could add a flag to run the triggerer when starting tests with Breeze: `--run-triggerer=True`. We could also document that you can run the deferrable system tests in a regular "local" venv, as long as you start the triggerer manually.

But it does not have to be that way. We could also simply have a pytest fixture that does the job (based on what Daniel proposed). This way, whenever we run the system tests as "pytest" tests, the triggerer would be brought up before the tests and torn down afterwards. It would not support running tests via the Airflow UI or by regular Python execution of the test files, but it might be a good option because it does not rely on Docker/Breeze.

J.

On Sat, Jun 3, 2023 at 8:45 AM Daniel Standish <daniel.stand...@astronomer.io.invalid> wrote:

> I just took a look and it turns out that DebugExecutor works fine with
> triggerer you just need to have one running.
>
> You could run one in a subprocess. I experimented with refactoring the
> subprocess hook for this purpose (so you can start the subprocess
> asynchronously) and then ran this dag with debug executor and it worked.
>
> from __future__ import annotations
>
> import os
>
> os.environ["AIRFLOW__CORE__EXECUTOR"] = "DebugExecutor"
>
> from datetime import timedelta
>
> import pendulum
>
> from airflow.models import DAG
> from airflow.sensors.time_delta import TimeDeltaSensorAsync
>
> with DAG(
>     dag_id="example_sensors",
>     schedule="@once",
>     start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
> ) as dag:
>     t0a = TimeDeltaSensorAsync(
>         task_id="wait_some_seconds_async", delta=timedelta(seconds=2)
>     )
>
> from airflow.hooks.subprocess import SubprocessHook
>
> # let's get triggerer running in a subprocess
> hook = SubprocessHook()
> triggerer_process = hook.start_process(
>     command=["airflow", "triggerer"], cwd="/tmp"
> )
>
> # now let's run the dag
> dag.clear()
> dag.run()
>
> # now the dag has completed
>
> # kill triggerer
> hook.send_sigterm(triggerer_process)
> result = hook.process_output(subprocess=triggerer_process)
> assert result.exit_code == 0
>
> # now triggerer has exited and our system test is done
>
>
> So, something like this could be done.
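
To make the fixture option a bit more concrete, here is a rough sketch of what could go into tests/system/conftest.py. It follows Daniel's subprocess idea but uses plain subprocess.Popen instead of the refactored hook, since that refactoring is not merged. The names `background_process`, `triggerer` and `TRIGGERER_COMMAND` are made up for illustration, and it assumes the `airflow` CLI is on PATH and configured against the same metadata database as the tests.

```python
from __future__ import annotations

import contextlib
import subprocess
import sys
from collections.abc import Iterator, Sequence

import pytest

# Assumption: "airflow" is on PATH and points at the same metadata DB as the tests.
TRIGGERER_COMMAND = ("airflow", "triggerer")


@contextlib.contextmanager
def background_process(
    command: Sequence[str], stop_timeout: float = 30.0
) -> Iterator[subprocess.Popen]:
    """Start ``command`` in the background and stop it when the block exits."""
    process = subprocess.Popen(list(command))
    try:
        yield process
    finally:
        # Only signal the process if it has not already exited on its own.
        if process.poll() is None:
            process.terminate()  # SIGTERM on POSIX
            try:
                process.wait(timeout=stop_timeout)
            except subprocess.TimeoutExpired:
                # Graceful shutdown took too long, so force it.
                process.kill()
                process.wait()


@pytest.fixture(scope="session")
def triggerer() -> Iterator[subprocess.Popen]:
    """Hypothetical fixture: keep a triggerer running for the test session."""
    with background_process(TRIGGERER_COMMAND) as process:
        yield process
```

A deferrable system test would then just request `triggerer` as an argument. Marking the fixture `autouse=True` would start it for every pytest run of the system tests, at the cost of starting a triggerer even for tests that do not defer.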