Hi Sebastian,

I think this is the right direction, especially if we can make the migration 
backwards compatible.

One small thing: task callbacks already run on workers in some paths today, so 
this is really more about Dag-level callbacks and the remaining failure/retry 
task callback paths.

I also think we should avoid running these callbacks on the Triggerer, since 
a callback that blocks could stall its event loop.
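
To make the event-loop concern concrete, here is a minimal sketch in plain 
asyncio. It is not Airflow's Triggerer code; blocking_callback, heartbeat and 
max_gap are made-up names, and the heartbeat only stands in for whatever else 
shares the loop. It measures the longest pause the heartbeat sees when a 
synchronous callback runs inline versus in a separate thread.

```python
import asyncio
import time


def blocking_callback() -> None:
    # Hypothetical user callback that does synchronous work.
    time.sleep(0.3)


async def heartbeat(ticks: list[float], stop: asyncio.Event) -> None:
    # Stand-in for other coroutines sharing the same event loop.
    while not stop.is_set():
        ticks.append(time.monotonic())
        await asyncio.sleep(0.02)


async def max_gap(run_in_thread: bool) -> float:
    """Return the longest pause between heartbeat ticks, in seconds."""
    ticks: list[float] = []
    stop = asyncio.Event()
    task = asyncio.create_task(heartbeat(ticks, stop))
    await asyncio.sleep(0.1)  # let the heartbeat tick a few times
    if run_in_thread:
        # The loop keeps running while the callback sleeps in a thread.
        await asyncio.to_thread(blocking_callback)
    else:
        # Called inline: nothing else on the loop can run until it returns.
        blocking_callback()
    await asyncio.sleep(0.1)
    stop.set()
    await task
    return max(b - a for a, b in zip(ticks, ticks[1:]))


if __name__ == "__main__":
    print("inline  :", round(asyncio.run(max_gap(False)), 3))
    print("threaded:", round(asyncio.run(max_gap(True)), 3))
```

Run inline, the heartbeat pauses for at least as long as the callback takes. 
Offloading to a thread avoids that here, but we cannot assume every user 
callback is safe to offload, which is part of why I would lean towards workers.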

Thanks for starting the discussion.

On 2026/03/20 13:40:36 Sebastian Daum wrote:
> Hello community,
> 
> I'd like to discuss moving Dag-level and task-level callbacks from the Dag
> Processor to either the Worker or Triggerer.
> 
> 
> Background
> 
> Airflow's new ExecutorCallback framework, introduced with Deadline Alerts
> [1], provides a flexible approach for running callbacks. Deadline Callbacks
> can now run within the Triggerer (for async code) or on a Worker as
> scheduled callbacks (for synchronous code).
> 
> This new Callback Framework opens up the possibility of moving all Dag
> callbacks and task callbacks from the Dag Processor to the Worker and
> Triggerer. This change would give callbacks the same level of isolation and
> support as standard task workloads while freeing the Dag Processor from
> callback execution responsibilities.
> 
> This topic has appeared in previous devlist discussions, and there's an
> open issue covering at least Dag-level callbacks [2]. AIP-43 (Dag Processor
> separation) explored running task callbacks in a Worker [3], though it also
> highlighted potential downsides, particularly for the KubernetesExecutor
> where spinning up a new pod for callback execution creates overhead
> compared to using the existing Dag Processor.
> 
> 
> Current Implementation
> 
> Currently, both the Scheduler and Triggerer create CallbackRequests
> (DagCallbackRequest | TaskCallbackRequest | EmailRequest) and send them to
> the database using DbCallbackRequest/DagProcessorCallback.
> 
> The DagFileProcessorManager fetches stored DbCallbackRequests, deserializes
> them, and sends them to the file_queue along with the callback request. A
> DagFileProcessorProcess then picks them up and executes them.
> 
> 
> Proposed Changes
> 
> Here are the key steps needed to implement this change:
> 
> 1. Add new Airflow configuration: Add a new configuration option (e.g.,
> use_worker_callbacks or similar) that allows users to opt into the new
> callback execution behavior while maintaining the existing Dag Processor
> approach as the default.
> 
> 2. Callback Wrapping (Task-SDK): Wrap Dag and Task Callables in
> SyncCallable or AsyncCallable if not already provided with the new type.
> This maintains backwards compatibility and preserves the current Dag
> authoring experience.
> 
> 3. Serialization Updates: Adjust Dag serialization to include Dag/Task
> on_*_callback references. Like Deadline Callbacks, we would serialize only
> the reference to the callable, not the callable itself.
> 
> 4. Callback Consolidation: Combine the different DagProcessorCallback
> implementations with ExecutorCallbacks and TriggererCallbacks.
> 
> 5. Improved Typing: Enhance typing and class separation within the Callback
> data structure. Currently, we differentiate between CallbackType:
> Triggerer, Executor, and DAG Processor. It makes sense to implement better
> type discrimination for DagCallback, TaskCallback, and potentially more
> specific types like DagSuccessCallback, DagErrorCallback,
> TaskSuccessCallback, EmailCallback, etc. This might warrant adding a new
> table column and reconsidering the CallbackType field structure.
> 
> 6. Component Updates: Modify the Scheduler and Triggerer to send
> ExecutorCallbacks or TriggererCallbacks and store them in the database
> instead of sending CallbackRequest via DatabaseSink.
> 
> 7. Integration: Leverage the existing logic for running Deadline Callbacks.
> However, if we decide to run callbacks on workers, we'll need to determine
> how to handle and prioritize callbacks versus standard task workloads, as
> well as different callback types (Deadline Alerts, Task Level Callbacks vs.
> Dag Level Callbacks) in the scheduler. This connects to ongoing discussions
> about 'Deadline Callback queuing and priority' [4].
> 
> 8. Deprecation: Deprecate the existing
> DagCallbackRequest/TaskCallbackRequest/CallbackSink process.
> 
> I'd appreciate your thoughts on this proposal, particularly around:
> 
> - Any concerns about the migration path
> 
> - The prioritization strategy for callbacks vs. standard tasks
> 
> 
> Thanks for your consideration!
> 
> Sebastian
> 
> 
> P.S. Many thanks to ferruzzi and shivaam for the discussions and feedback.
> 
> 
> --------------------------------------------------------
> 
> [1] ExecutorCallback Framework
> 
> Executor Synchronous callback workload PR #61153
> <https://github.com/apache/airflow/pull/61153>
> 
> [2] Move dag-level callbacks to worker
> 
> Issue #44354 <https://github.com/apache/airflow/issues/44354>
> 
> [3] Dag Processor separation
> 
> https://cwiki.apache.org/confluence/display/AIRFLOW/AIP-43+DAG+Processor+separation
> 
> [4] AIP-86 Deadline Callback queuing and priority
> 
> https://lists.apache.org/thread/85zydrb5sc61gmgktm991jmjqvb78x7w
> 
