Hi Sebastian,

I think this is the right direction, especially if we can make the migration backwards compatible.

One small thing: task callbacks already run on workers in some paths today, so this is really more about Dag-level callbacks and the remaining failure/retry task callback paths. I also think we should avoid running these callbacks on the Triggerer, so that a blocking callback cannot stall its event loop.

Thanks for starting the discussion.

On 2026/03/20 13:40:36 Sebastian Daum wrote:
> Hello community,
>
> I'd like to discuss moving Dag-level and task-level callbacks from the Dag
> Processor to either the Worker or Triggerer.
>
>
> Background
>
> Airflow's new ExecutorCallback framework, introduced with Deadline Alerts
> [1], provides a flexible approach for running callbacks. Deadline Callbacks
> can now run within the Triggerer (for async code) or on a Worker as
> scheduled callbacks (for synchronous code).
>
> This new Callback Framework opens up the possibility of moving all Dag
> callbacks and task callbacks from the Dag Processor to the Worker and
> Triggerer. This change would give callbacks the same level of isolation and
> support as standard task workloads while freeing the Dag Processor from
> callback execution responsibilities.
>
> This topic has appeared in previous devlist discussions, and there's an
> open issue covering at least Dag-level callbacks [2]. AIP-43 (Dag Processor
> separation) explored running task callbacks in a Worker [3], though it also
> highlighted potential downsides, particularly for the KubernetesExecutor
> where spinning up a new pod for callback execution creates overhead
> compared to using the existing Dag Processor.
>
>
> Current Implementation
>
> Currently, both the Scheduler and Triggerer create CallbackRequests
> (DagCallbackRequest | TaskCallbackRequest | EmailRequest) and send them to
> the database using DbCallbackRequest/DagProcessorCallback.
>
> The DagFileProcessorManager fetches stored DbCallbackRequests, deserializes
> them, and sends them to the file_queue along with the callback request.
> A DagFileProcessorProcess then picks them up and executes them.
>
>
> Proposed Changes
>
> Here are the key steps needed to implement this change:
>
> 1. Add new Airflow configuration: Add a new configuration option (e.g.,
> use_worker_callbacks or similar) that allows users to opt into the new
> callback execution behavior while maintaining the existing Dag Processor
> approach as the default.
>
> 2. Callback Wrapping (Task-SDK): Wrap Dag and Task Callables in
> SyncCallable or AsyncCallable if not already provided with the new type.
> This maintains backwards compatibility and preserves the current Dag
> authoring experience.
>
> 3. Serialization Updates: Adjust Dag serialization to include Dag/Task
> on_*_callback references. Like Deadline Callbacks, we would serialize only
> the reference to the callable, not the callable itself.
>
> 4. Callback Consolidation: Combine the different DagProcessorCallback
> implementations with ExecutorCallbacks and TriggererCallbacks.
>
> 5. Improved Typing: Enhance typing and class separation within the Callback
> data structure. Currently, we differentiate between CallbackType:
> Triggerer, Executor, and DAG Processor. It makes sense to implement better
> type discrimination for DagCallback, TaskCallback, and potentially more
> specific types like DagSuccessCallback, DagErrorCallback,
> TaskSuccessCallback, EmailCallback, etc. This might warrant adding a new
> table column and reconsidering the CallbackType field structure.
>
> 6. Component Updates: Modify the Scheduler and Triggerer to send
> ExecutorCallbacks or TriggererCallbacks and store them in the database
> instead of sending CallbackRequest via DatabaseSink.
>
> 7. Integration: Leverage the existing logic for running Deadline Callbacks.
> However, if we decide to run callbacks on workers, we'll need to determine
> how to handle and prioritize callbacks versus standard task workloads, as
> well as different callback types (Deadline Alerts, Task Level Callbacks vs.
> Dag Level Callbacks) in the scheduler. This connects to ongoing discussions
> about 'Deadline Callback queuing and priority' [4].
>
> 8. Deprecation: Deprecate the existing
> DagCallbackRequest/TaskCallbackRequest/CallbackSink process.
>
> I'd appreciate your thoughts on this proposal, particularly around:
>
> - Any concerns about the migration path
>
> - The prioritization strategy for callbacks vs. standard tasks
>
>
> Thanks for your consideration!
>
> Sebastian
>
>
> P.S. Many thanks to ferruzzi and shivaam for the discussions and feedback.
>
>
> --------------------------------------------------------
>
> [1] ExecutorCallback Framework
>     Executor Synchronous callback workload PR #61153
>     <https://github.com/apache/airflow/pull/61153>
>
> [2] Move dag-level callbacks to worker
>     Issue #44354 <https://github.com/apache/airflow/issues/44354>
>
> [3] Dag Processor separation
>     <https://cwiki.apache.org/confluence/display/AIRFLOW/AIP-43+DAG+Processor+separation>
>
> [4] AIP-86 Deadline Callback queuing and priority
>     <https://lists.apache.org/thread/85zydrb5sc61gmgktm991jmjqvb78x7w>
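On the Triggerer point, here is a small illustration, assuming the Triggerer keeps running everything on one asyncio event loop as it does for triggers today. It is plain asyncio, not Airflow code: `blocking_callback` is a hypothetical stand-in for a user's synchronous callback, and a heartbeat coroutine records the longest stretch during which the loop could not run it.

```python
import asyncio
import time


def blocking_callback() -> None:
    """Hypothetical stand-in for a user's synchronous on_failure_callback."""
    time.sleep(0.2)  # e.g. a slow HTTP call made with a blocking client


async def heartbeat(stop: asyncio.Event, gaps: list[float]) -> None:
    """Tick every 10 ms and record how long each tick actually took."""
    last = time.monotonic()
    while not stop.is_set():
        await asyncio.sleep(0.01)
        now = time.monotonic()
        gaps.append(now - last)
        last = now


async def run(offload: bool) -> float:
    """Run the callback next to the heartbeat; return the worst tick gap in seconds."""
    stop = asyncio.Event()
    gaps: list[float] = []
    hb = asyncio.create_task(heartbeat(stop, gaps))
    await asyncio.sleep(0.05)  # let the heartbeat get going
    if offload:
        # Runs the sync callable in a worker thread; the loop stays responsive.
        await asyncio.to_thread(blocking_callback)
    else:
        # Calls the sync callable directly on the loop thread; everything else waits.
        blocking_callback()
    await asyncio.sleep(0.05)
    stop.set()
    await hb
    return max(gaps)


if __name__ == "__main__":
    print(f"called inline: worst gap {asyncio.run(run(offload=False)):.3f}s")
    print(f"to_thread:     worst gap {asyncio.run(run(offload=True)):.3f}s")
```

Called inline, the worst gap is at least the 0.2 s the callback sleeps; with `asyncio.to_thread` it stays close to the heartbeat interval under normal load. Offloading keeps the loop responsive, but the blocking work still runs inside the Triggerer process, which is one reason to route sync callbacks to workers instead.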

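Steps 2 and 3 of the proposal, wrapping plain callables and serializing only a reference to them, could look roughly like this. It is a minimal sketch under stated assumptions: `SyncRef`, `AsyncRef`, `wrap`, `serialize` and `resolve` are hypothetical names, not the Task SDK's SyncCallable/AsyncCallable or Airflow's Dag serializer, and a callback is assumed to be identifiable by its module plus `__qualname__` and importable wherever it runs.

```python
from __future__ import annotations

import importlib
import inspect
from dataclasses import dataclass
from typing import Any, Callable


@dataclass(frozen=True)
class SyncRef:
    """Hypothetical wrapper for a synchronous callback, stored as an import reference."""

    module: str
    qualname: str


@dataclass(frozen=True)
class AsyncRef:
    """Hypothetical wrapper for an async callback, stored as an import reference."""

    module: str
    qualname: str


def _reference(fn: Any) -> tuple[str, str]:
    """Return (module, qualname), refusing anything a worker could not re-import."""
    module = getattr(fn, "__module__", None)
    qualname = getattr(fn, "__qualname__", None)
    # Lambdas ('<lambda>'), nested functions ('outer.<locals>.inner') and
    # callable instances (no __qualname__) have no stable import path.
    if not module or not qualname or "<" in qualname:
        raise ValueError(f"{fn!r} has no importable module-level name")
    return module, qualname


def wrap(cb: Any) -> SyncRef | AsyncRef:
    """Pass already-wrapped callbacks through; wrap plain callables by sync/async kind."""
    if isinstance(cb, (SyncRef, AsyncRef)):
        return cb
    if not callable(cb):
        raise TypeError(f"callback must be callable, got {cb!r}")
    ref_cls = AsyncRef if inspect.iscoroutinefunction(cb) else SyncRef
    return ref_cls(*_reference(cb))


def serialize(ref: SyncRef | AsyncRef) -> dict[str, str]:
    """Store only the reference, never the function object itself."""
    kind = "async" if isinstance(ref, AsyncRef) else "sync"
    return {"kind": kind, "module": ref.module, "qualname": ref.qualname}


def resolve(data: dict[str, str]) -> Callable[..., Any]:
    """Re-import the callable wherever it will run (e.g. on a worker)."""
    obj: Any = importlib.import_module(data["module"])
    for part in data["qualname"].split("."):
        obj = getattr(obj, part)
    return obj


if __name__ == "__main__":
    import json

    data = serialize(wrap(json.dumps))
    print(data)  # {'kind': 'sync', 'module': 'json', 'qualname': 'dumps'}
    assert resolve(data) is json.dumps
```

Storing only a reference has a user-visible consequence worth covering in the migration path: lambdas and nested functions cannot be re-imported where the callback runs, so the sketch rejects them at wrap time rather than failing later when the callback fires.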