Thanks Amogh and Jarek: +1, this makes sense and is a better approach to take. Letting the operator own its execution and just subclass ResumableJobMixin is cleaner than what https://github.com/apache/airflow/pull/68936 does today.
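To make "the operator owns its execution" a bit more concrete, here is a rough, self-contained sketch of the three-state contract (succeeded / reconnect / resubmit) with persist-before-poll. Everything in it is a stand-in: FakeTI, submit_or_resume, and the saved dict are hypothetical names, trigger_dag_run() is only the proposed accessor with an assumed signature, and get_dagrun_state() is simplified. It does not use the real ResumableJobMixin or the task SDK.

```python
from dataclasses import dataclass, field


@dataclass
class FakeTI:
    """Hypothetical stand-in, not the real task SDK task instance."""
    runs: dict = field(default_factory=dict)   # (dag_id, run_id) -> state
    saved: dict = field(default_factory=dict)  # stand-in for state that survives a crash

    def trigger_dag_run(self, dag_id, run_id):
        # The proposed accessor; name from the thread, signature assumed.
        self.runs[(dag_id, run_id)] = "running"

    def get_dagrun_state(self, dag_id, run_id):
        # Simplified version of the existing poll accessor.
        return self.runs.get((dag_id, run_id))


def submit_or_resume(ti, dag_id, run_id):
    """Return which branch of the three-state contract was taken."""
    saved_run_id = ti.saved.get("run_id")
    state = ti.get_dagrun_state(dag_id, saved_run_id) if saved_run_id else None
    if state == "success":
        return "succeeded"   # finished before the crash, nothing to do
    if state in ("queued", "running"):
        return "reconnect"   # still in flight, go back to polling it
    # Fresh attempt (or, as a simplification, any other state): persist
    # the run id first, then trigger, so a crash never loses track of it.
    ti.saved["run_id"] = run_id
    ti.trigger_dag_run(dag_id, run_id)
    return "resubmit"


ti = FakeTI()
first = submit_or_resume(ti, "child", "run_1")    # fresh attempt
second = submit_or_resume(ti, "child", "run_1")   # retry after a simulated crash
ti.runs[("child", "run_1")] = "success"
third = submit_or_resume(ti, "child", "run_1")    # retry after the run finished
print(first, second, third)
```

The point of the sketch is only that both primitives are called from operator code, so the contract lives in one place instead of being copied into the runner.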
Right now the contract is duplicated in the task runner, and the proposal gets rid of the special case instead of trying to share it.

The accessor is small. trigger_dag_run() would mirror the ti.get_dagrun_state() we already have, hitting the same execution API endpoint the runner uses today with the same token, so no new authz. It basically finishes the AIP-72 migration that added DagRunTriggerException as a stopgap (https://github.com/apache/airflow/pull/47882).

Happy to do the POC, rework #68936 onto the accessor, and then link it here.

Would the main thing to check be back-compat? execute() currently raises on every Airflow 3 run, not just the ones that wait, so in the POC do we want to prove that this behavior stays identical?

Best,
Stefan

> On Jun 28, 2026, at 10:38 PM, Jarek Potiuk <[email protected]> wrote:
>
> Sounds reasonable - maybe a quick POC would be good to show how it could
> look like and allowed to assess if there are some back-compat concerns.
>
> On Mon, Jun 29, 2026 at 7:27 AM Amogh Desai <[email protected]> wrote:
>
>> Now that Airflow 3.3 will introduce ResumableJobMixin to make synchronous
>> submit and poll operators crash-safe, I wanted to start a discussion.
>>
>> I came across https://github.com/apache/airflow/pull/68936, which brings
>> crash recovery/durable execution to TriggerDagRunOperator, but it's a case
>> which cannot use the mixin. On Airflow 3 the operator's execute() raises
>> *DagRunTriggerException*; the actual trigger and the wait loop run in the
>> task runner
>> (_handle_trigger_dag_run). So the PR reimplements the mixin's three state
>> contract (succeeded / reconnect / resubmit) and persist-before-poll by hand
>> in the
>> runner. This means that we will now have two copies of the same contract
>> that can drift.
>>
>> It cannot use the mixin's contract because it only offloads its execution
>> to task runner, and doesn't own it. For more context, the poll primitive
>> already exists as a
>> user-callable accessor (ti.get_dagrun_state()). The only missing primitive
>> is triggering a dag run.
>>
>> I propose that we revisit this portion. I propose we introduce an execution
>> API accessor in task sdk for triggering dagruns, which will be the
>> counterpart to the existing
>> ti.get_dagrun_state(). It routes through the same execution endpoint the
>> runner already uses, so no new authz surface is changed.
>>
>> This proposal does not expand what task code can do, it just gives a first
>> class way to do something already possible. A task JWT can already trigger
>> dag runs through the
>> Execution API today: that is exactly what DagRunTriggerException does under
>> the hood. The proposed *ti.trigger_dag_run()* accessor routes through the
>> same endpoint with
>> the same scoped token, so the boundary is identical, just reached through a
>> clean, supported API instead of an exception side channel.
>>
>> Happy to hear thoughts from folks.
>>
>>
>> Thanks & Regards,
>> Amogh Desai
>>
