Now that Airflow 3.3 will introduce ResumableJobMixin to make synchronous submit-and-poll operators crash-safe, I wanted to start a discussion about an operator that the mixin does not cover.
I came across https://github.com/apache/airflow/pull/68936, which brings crash recovery/durable execution to TriggerDagRunOperator, but this is a case that cannot use the mixin. On Airflow 3, the operator's execute() raises *DagRunTriggerException*; the actual trigger and the wait loop run in the task runner (_handle_trigger_dag_run). So the PR reimplements the mixin's three-state contract (succeeded / reconnect / resubmit) and persist-before-poll by hand in the runner. This means we will now have two copies of the same contract that can drift apart. The operator cannot use the mixin's contract because it only offloads its execution to the task runner and does not own it.

For more context, the poll primitive already exists as a user-callable accessor (ti.get_dagrun_state()). The only missing primitive is triggering a dag run.

I propose that we revisit this portion by introducing an Execution API accessor in the Task SDK for triggering dag runs, *ti.trigger_dag_run()*, as the counterpart to the existing ti.get_dagrun_state(). It routes through the same execution endpoint the runner already uses, so no new authz surface is introduced.

This proposal does not expand what task code can do; it just gives a first-class way to do something that is already possible. A task JWT can already trigger dag runs through the Execution API today: that is exactly what DagRunTriggerException does under the hood. The proposed accessor routes through the same endpoint with the same scoped token, so the boundary is identical, just reached through a clean, supported API instead of an exception side channel.

Happy to hear thoughts from folks.

Thanks & Regards,
Amogh Desai
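
P.S. To make the contract concrete, here is a rough, self-contained sketch of how I read the three states (succeeded / reconnect / resubmit) together with persist-before-poll, written against a trigger primitive and a poll primitive. None of this is Airflow code: FakeExecutionApi, recover_action, start_or_resume, and the dict used as durable storage are hypothetical stand-ins, and the state names are assumptions made for illustration.

```python
from dataclasses import dataclass, field
from typing import Dict, Optional, Tuple


@dataclass
class FakeExecutionApi:
    """Hypothetical in-memory stand-in for the execution endpoint (not an Airflow class)."""

    runs: Dict[str, str] = field(default_factory=dict)  # run_id -> state
    triggered: int = 0

    def trigger_dag_run(self, dag_id: str, run_id: str) -> None:
        # Stand-in for the proposed trigger primitive.
        self.triggered += 1
        self.runs[run_id] = "running"

    def get_dagrun_state(self, dag_id: str, run_id: str) -> Optional[str]:
        # Stand-in for the existing poll primitive; None means "unknown run".
        return self.runs.get(run_id)


def recover_action(api: FakeExecutionApi, store: dict, dag_id: str) -> str:
    """Decide what a (re)started task attempt should do."""
    run_id = store.get("run_id")
    if run_id is None:
        return "resubmit"  # nothing persisted yet: treat as a fresh submit
    state = api.get_dagrun_state(dag_id, run_id)
    if state == "success":
        return "succeeded"  # the earlier run finished; nothing left to do
    if state in ("queued", "running"):
        return "reconnect"  # the earlier run is alive; resume polling it
    return "resubmit"  # failed or unknown: trigger a new run


def start_or_resume(
    api: FakeExecutionApi, store: dict, dag_id: str, new_run_id: str
) -> Tuple[str, str]:
    """Apply the decision; `store` models state that survives a worker crash."""
    action = recover_action(api, store, dag_id)
    if action == "resubmit":
        api.trigger_dag_run(dag_id, new_run_id)
        store["run_id"] = new_run_id  # persist before any polling starts
    return action, store["run_id"]
```

With both primitives reachable from task code, logic of this shape could live in one place instead of being duplicated in the runner.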
