There are two options I think:

1) rely on Breeze as execution environment for System Tests and add a flag
to start triggerer with system tests
2) add a pytest fixture that will do it automatically when system tests are
run via pytests (that could be done via tests/system/conftest.py)

I believe most of the system tests out there are run with Breeze. Breeze
gives a stable, reproducible execution environment for various tests, it's
been designed from the ground up to provide a "reproducible test
environment" - that was the main "function" of Breeze from the start. So we
could add a change to run triggerer when starting tests with breeze:
`--run-triggerer=True`. Plus we could describe, that you can also run the
deferrable system tests in regular "local" venv as long as you run
triggerer manually to support it.

But it does not have to be like it. It's also possible that  we
could simply have a pytest fixture that will do the job (based on what
Daniel proposed).This way, whenever we run the system tests as "pytest"
test, we should get triggerrer up-and-running and  torn down after. It will
not support running tests via Airflow UI or regular python execution of the
test files, but maybe it's a good idea that does not rely on docker/breeze.


J.

On Sat, Jun 3, 2023 at 8:45 AM Daniel Standish
<daniel.stand...@astronomer.io.invalid> wrote:

> I just took a look and it turns out that DebugExecutor works fine with
> triggerer you just need to have one running.
>
> You could run one in a subprocess.  I experimented with refactoring the
> subprocess hook for this purpose (so you can start the subprocess
> asynchronously) and then ran this dag with debug executor and it worked.
>
> from __future__ import annotations
>
> import os
>
> os.environ["AIRFLOW__CORE__EXECUTOR"] = "DebugExecutor"
>
> from datetime import timedelta
>
> import pendulum
>
> from airflow.models import DAG
> from airflow.sensors.time_delta import TimeDeltaSensorAsync
>
> with DAG(
>     dag_id="example_sensors",
>     schedule="@once",
>     start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
> ) as dag:
>     t0a = TimeDeltaSensorAsync(task_id="wait_some_seconds_async",
> delta=timedelta(seconds=2))
>
> from airflow.hooks.subprocess import SubprocessHook
>
> # let's get triggerer running in a subprocess
> hook = SubprocessHook()
> triggerer_process = hook.start_process(command=["airflow",
> "triggerer"], cwd="/tmp")
>
> # now let's run the dag
> dag.clear()
> dag.run()
>
> # now the dag has completed
>
> # kill triggerer
> hook.send_sigterm(triggerer_process)
> result = hook.process_output(subprocess=triggerer_process)
> assert result.exit_code == 0
>
> # now triggerer has exited and our system test is done
>
>
> So, something like this could be done.
>

Reply via email to