Hey all,

Airflow 3.2 introduces ExecutorCallbacks, allowing executors to run synchronous 
callbacks (starting with Deadline Alerts). LocalExecutor and CeleryExecutor 
support is complete and tested, and I'm looking for volunteers to help 
implement the remaining executors.  There's also an opportunity to help migrate 
Airflow's existing DAG/task callbacks onto this same infrastructure.


***********************************************
WHAT ARE EXECUTOR CALLBACKS?
***********************************************

Deadline Alerts allow users to set a deadline on a DAG: "if this DAG isn't done 
by this time, run this callback."  If the callback is asynchronous code, it 
goes to the Triggerer for execution.  If it's synchronous code, then as of 
PR #61153 [1] it is sent to the executor and run on a worker, just like a DAG 
task.  The scheduler queues it, the executor picks it up, and a supervised 
subprocess imports and runs the callback function (that last supervisor piece 
is still in review [2]).


******************
WHAT'S DONE
******************

  - The core framework is done in PR #61153 [1].  There is now an 
ExecutorWorkload which encompasses both ExecuteTask and ExecuteCallback. Some 
work is still in progress, but callbacks have almost all the same features and 
"rights" as an executor task.
  - LocalExecutor is fully supported and tested; it will launch in 3.2.  
Callbacks are sent to worker processes alongside tasks, with supervised 
subprocess execution coming in [2].
  - CeleryExecutor is fully supported and tested and will also launch in 3.2.  
Callbacks are sent to Celery workers as workloads and executed via 
supervise_callback.

Those implementations serve as reference examples and should give a pretty good 
idea of how to handle the remaining work.


******************************
WHERE YOU CAN HELP
******************************


****************************
FIVE SMALLISH TASKS
****************************

Each of the five remaining executors needs:
  1. Set supports_callbacks = True on the executor class.

  2. Handle ExecuteCallback workloads in _process_workloads().  Right now these 
executors raise either RuntimeError or NotImplementedError when they see a 
non-task workload. Each needs to send the callback for execution using 
supervise_callback() from airflow.sdk.execution_time.callback_supervisor.

  3. Add tests covering the callback execution path.
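
Put together, steps 1 and 2 look roughly like the sketch below. This is a toy 
illustration only: ExecuteTask, ExecuteCallback, and supervise_callback here 
are stand-ins I wrote for this email, not the real Airflow types (those live in 
the PRs above), but the dispatch shape is the pattern each executor needs.

```python
# Toy sketch: stand-ins for the real Airflow workload types and supervisor.
from dataclasses import dataclass, field


@dataclass
class ExecuteTask:
    """Stand-in for the existing ExecuteTask workload."""
    ti_key: str


@dataclass
class ExecuteCallback:
    """Stand-in for the ExecuteCallback workload introduced in PR #61153."""
    id: str
    callback_path: str
    callback_kwargs: dict = field(default_factory=dict)


def supervise_callback(id, callback_path, callback_kwargs, log_path=None):
    # Stand-in for airflow.sdk.execution_time.callback_supervisor's
    # supervise_callback, which forks a supervised subprocess and returns
    # its exit code.
    return 0


class MyExecutor:
    supports_callbacks = True  # step 1: advertise callback support

    def _process_workloads(self, workloads):
        exit_codes = []
        for workload in workloads:
            if isinstance(workload, ExecuteCallback):
                # step 2: route callbacks through supervise_callback instead
                # of raising RuntimeError/NotImplementedError
                exit_codes.append(
                    supervise_callback(
                        workload.id,
                        workload.callback_path,
                        workload.callback_kwargs,
                    )
                )
            elif isinstance(workload, ExecuteTask):
                exit_codes.append(self._execute_task(workload))
            else:
                raise ValueError(f"Unhandled workload: {type(workload)}")
        return exit_codes

    def _execute_task(self, workload):
        return 0  # placeholder for the executor's existing task path
```

The interesting part per executor is what replaces the supervise_callback 
stand-in: for remote executors it has to end up running on the worker side.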

The specific executors that need work:

  - Kubernetes Executor (CNCF provider): Needs a way to run supervise_callback 
in a pod. May need a lightweight callback pod template, or may be able to reuse 
the existing worker pod mechanism.  I don't know enough about K8s to judge how 
complicated this will be, so I could really use someone with K8s experience to 
take this one on.  I can definitely help with understanding the existing code, 
but I won't be much help on the actual implementation.

  - ECS Executor (Amazon provider): Similar pattern; needs to send the callback 
to an ECS task.

  - Batch Executor (Amazon provider): Send the callback to a Batch job.

  - Lambda Executor (Amazon provider): Lambda is a simpler execution model, but 
callback import/execution in the Lambda runtime may be a wrinkle.

  - EdgeExecutor (Edge3 provider): Send the callback to an Edge worker.  Like 
K8s, this is another one I don't have real experience with, so I can help with 
onboarding but may not be much use on the actual implementation.

Note: The hybrid executors (CeleryKubernetesExecutor, LocalKubernetesExecutor) 
are deprecated and do not need to be updated.


*************************
ONE LARGER TASK
*************************

In addition to those smaller tasks, there is one bigger one on my "Some Day..." 
list which is tangentially related.  One side-goal of the ExecutorCallbacks 
infrastructure is to unify Airflow's other callbacks onto the same framework.  
This isn't strictly related to Deadline Alerts, and I still have a pile of work 
to do on that feature, so perhaps I can entice someone to take this on?

Airflow's DAG-level and task-level callbacks (on_success_callback, 
on_failure_callback, etc.) currently flow ((current.... flow?  I'll see myself 
out...)) through a completely separate path.  They are packaged as 
DagCallbackRequest/TaskCallbackRequest and executed in the Dag Processor, which 
has several downsides.  Migrating them to the new (synchronous) 
ExecutorCallback and/or AsyncCallback framework would let us run them in the 
worker or triggerer, and give them the same level of isolation and support as a 
task, which the Deadline callbacks now get.

The groundwork is already laid with the new ExecutorCallback framework and the 
CallbackFetchMethod enum already has a DAG_ATTRIBUTE variant stubbed out for 
exactly this purpose. The migration would involve:

  1. When the scheduler/DagRun detects a callback is needed (e.g., DAG 
success/failure, task retry/failure), instead of creating a DagCallbackRequest 
and sending it to the Dag Processor, create a Callback model record and let the 
scheduler queue it as an ExecuteCallback workload.

  2. Extend the CallbackSubprocess (or possibly supervise_callback) to support 
the DAG_ATTRIBUTE fetch method and resolve the callback from the DAG object's 
attributes (e.g., dag.on_failure_callback) rather than from an import path.

  3. Migrate the callback execution out of dag_processing/processor.py (the 
_execute_callbacks and related code paths) and into the executor path.

  4. Update or deprecate the old 
DagCallbackRequest/TaskCallbackRequest/CallbackSink infrastructure once the new 
path is proven.
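
To make step 2 concrete, here is a toy resolver showing how a DAG_ATTRIBUTE 
fetch method might locate the callable from the DAG object instead of an import 
path.  Everything here (resolve_callback, the enum values' string forms, the 
parameter names) is hypothetical illustration, not the real Airflow API; only 
CallbackFetchMethod and its DAG_ATTRIBUTE variant are mentioned above.

```python
# Hypothetical sketch of callback resolution for the two fetch methods.
import importlib
from enum import Enum


class CallbackFetchMethod(Enum):
    IMPORT_PATH = "import_path"      # today's behavior: dotted import path
    DAG_ATTRIBUTE = "dag_attribute"  # stubbed variant for DAG/task callbacks


def resolve_callback(method, *, import_path=None, dag=None, attribute=None):
    """Toy resolver: find the callable the supervised subprocess should run."""
    if method is CallbackFetchMethod.IMPORT_PATH:
        # e.g. "my_pkg.callbacks.notify" -> import my_pkg.callbacks, get notify
        module_name, _, func_name = import_path.rpartition(".")
        return getattr(importlib.import_module(module_name), func_name)
    if method is CallbackFetchMethod.DAG_ATTRIBUTE:
        # e.g. attribute="on_failure_callback" -> dag.on_failure_callback
        return getattr(dag, attribute)
    raise ValueError(f"Unknown fetch method: {method}")
```

The wrinkle the real implementation has to solve is that the subprocess must 
first reconstruct the DAG object before it can read the attribute, which is 
what makes this path different from a plain import.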

This is a larger piece of work that I haven't planned too deeply, so it's maybe 
not a "good first task" kind of job, but it would make all callback execution 
consistent, observable, and logged regardless of which executor you use and it 
would remove callback execution load from the Dag Processor. If this interests 
you, I'd love to collaborate on the design and help however I can.


*****************************
HOW TO GET STARTED
*****************************

  1. Look at the LocalExecutor implementation in 
airflow-core/src/airflow/executors/local_executor.py, specifically the 
_execute_workload function and the ExecuteCallback branch. This is the simplest 
reference.

  2. Look at the CeleryExecutor implementation in providers/celery/ for an 
example of a remote/distributed executor handling callbacks.

  3. The key function you'll call is supervise_callback() from 
airflow.sdk.execution_time.callback_supervisor.  It takes a callback id, a 
callback_path, callback_kwargs, and an optional log_path; it forks a supervised 
subprocess and returns its exit code.

  4. For remote executors (K8s, ECS, Batch, Lambda), the challenge is making 
sure supervise_callback runs on the remote worker side, not in the scheduler. 
The pattern will be similar to how ExecuteTask workloads are handled, but will 
be slightly different for each executor.
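
For intuition, the fork-and-supervise mechanism described in item 3 can be 
sketched in miniature.  This is a generic illustration of the pattern (Unix 
only, since it uses os.fork), not the actual callback_supervisor code:

```python
# Miniature fork-and-supervise: run a callable in a child process and
# report its exit code to the parent, the way a callback supervisor does.
import os


def run_supervised(fn, *args):
    """Fork a child, run fn in it, and return the child's exit code."""
    pid = os.fork()
    if pid == 0:
        # Child process: run the callback; exit 0 on success, 1 on failure.
        try:
            fn(*args)
            os._exit(0)
        except BaseException:
            os._exit(1)
    # Parent process: wait for the child and translate its wait status.
    _, status = os.waitpid(pid, 0)
    return os.waitstatus_to_exitcode(status)
```

The real supervisor does considerably more (importing the callback from its 
path, log handling), but the parent/child split and the exit-code contract are 
the core of it.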


***************************
HOW TO VOLUNTEER
***************************

Reply to this thread (or the corresponding GitHub issue) with which executor or 
work item you'd like to take on. One executor per volunteer is ideal so we can 
parallelize the work. I'm happy to review PRs and answer questions.  I'll be 
working on related features (Task-level Deadlines, Asset-based and Event-based 
Deadlines) in parallel.

Even if you're not an executor expert, this is a pretty well-scoped 
contribution with clear examples to follow and an active guide (that's me!), 
and it shouldn't be terribly complicated.  It would be wonderful if these could 
all be done in time for the next wave of provider package releases, so the rest 
of the executors can follow quickly behind the first two, if not land at the 
same time.

Thanks!

[1] https://github.com/apache/airflow/pull/61153
[2] https://github.com/apache/airflow/pull/62645
