I have been looking into bottlenecks at the Airflow metadata database level. In my analysis so far, I observed that a change of the DAG hash at run time, for any reason, has a significant negative impact on the database: it blocks DAG run updates for the last scheduling decision, resulting in higher lock waits and, in many instances, lock wait timeouts. I recently opened issue #53957 showing one instance where the DAG hash changes just because the template field order is different, and I suggested a fix in PR #54041. Troubleshooting the lock waits further, I have come across a scenario that is rare but results in an unnecessary DAG hash change. This, in my opinion, needs community experts' attention and review. The details are below.
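The template-field-order case from #53957 can be modeled with a small sketch. The `dag_hash` helper below is hypothetical and not Airflow's actual serializer; it only illustrates the principle that serializing the same set of fields in a different order yields a different payload and therefore a different hash, even though nothing about the DAG meaningfully changed:

```python
import hashlib
import json

def dag_hash(serialized: dict) -> str:
    # Illustrative stand-in for a hash computed over the serialized DAG
    # payload, as stored alongside the data in the serialized_dag table.
    return hashlib.md5(json.dumps(serialized).encode()).hexdigest()

# Two serializations of the "same" task: identical fields, different order.
a = {"task_id": "t1", "template_fields": ["bash_command", "env"]}
b = {"task_id": "t1", "template_fields": ["env", "bash_command"]}

# The order difference alone produces a different payload and hash,
# which the scheduler treats as a DAG change.
assert dag_hash(a) != dag_hash(b)
```

Any such spurious hash change triggers the extra DAG run update statements described below.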
Airflow version: 2.x (also 3.x, based on the code)

Airflow config:
- Executor: Kubernetes
- AIRFLOW__SCHEDULER__SCHEDULE_AFTER_TASK_EXECUTION: False
- AIRFLOW__CORE__MAX_NUM_RENDERED_TI_FIELDS_PER_TASK: 0
- AIRFLOW__CORE__PARALLELISM: 250

DAG: any DAG with DAG Params and multiple retries for tasks, with a retry callback
Steps:
1. Trigger the DAG, overriding the Param default value.
2. Create a zombie task in the run, e.g. remove the executor pod while the task is running.
3. Observe the scheduler log (enable debug if possible) and the serialized_dag table: the DAG hash is updated with a new value. If you compare against the old serialized value in the data column, you will see that the new serialized value now has the Param values from the run that had the zombie task failure.
4. This results in an additional DAG run update statement alongside the last scheduling update statement, which takes longer to execute when you have multiple tasks executing simultaneously. This multiplies further if a DAG has multiple zombie task failures at the same time from different runs with different Param values.

Code analysis: (I have looked at the code
for tag 2.10.5, because I am using that version in production, but the latest code appears to be similar in logic.)

Based on the code analysis, I see that the DAG processor in the scheduler executes callbacks before serialization of the DAG: the process_file function in processor.py calls the handle_failure function in taskinstance.py, which ends up calling get_template_context, whose process_params call updates the Param values to the values from the DAG run conf. This causes the Param default values to change in the serialized DAG, and with them the DAG hash value.

It appears that handle_failure is called in other scenarios where updating Param values to the ones from the DAG run conf may be required, but in this scenario it does not seem to be required. So far I have been unable to find a way to resolve this problem. I hope this information helps to understand the problem.
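The mechanism above can be sketched in isolation. This is a simplified model, not Airflow's real serializer or process_params implementation; it only shows how merging a run's conf over the serialized Param defaults produces a different payload, and therefore a new DAG hash, on the next serialization:

```python
import hashlib
import json

def dag_hash(serialized: dict) -> str:
    # Illustrative hash over the serialized DAG payload (a stand-in for
    # the dag_hash stored next to the data column in serialized_dag).
    return hashlib.md5(json.dumps(serialized, sort_keys=True).encode()).hexdigest()

# Serialized DAG as written by the DAG processor: params carry defaults.
serialized = {"dag_id": "example", "params": {"env": "dev"}}
original_hash = dag_hash(serialized)

# Model of the problematic path: handle_failure -> get_template_context
# -> process_params merges the triggering run's conf over the defaults.
run_conf = {"env": "prod"}
serialized["params"].update(run_conf)

# The next serialization now differs, so the hash changes even though the
# DAG file itself did not, forcing the extra DAG run update statements
# and the lock waits described above.
assert dag_hash(serialized) != original_hash
```

In the zombie-task scenario, this merge happens purely as a side effect of building the template context for the failure callback, which is why the hash change is unnecessary there.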