Greetings!

While migrating our ORM syntax to be compatible with SQLAlchemy 2.0, I
seem to have found a couple of skeletons in the closet.

Let's start from the beginning. Initially I got this warning:

airflow/models/renderedtifields.py:245 RemovedIn20Warning('ORDER BY columns
added implicitly due to DISTINCT is deprecated and will be removed in
SQLAlchemy 2.0.  SELECT statements with DISTINCT should be written to
explicitly include the appropriate columns in the columns clause
(Background on SQLAlchemy 2.0 at: https://sqlalche.me/e/b8d9)')

"OK, let's fix it!", I thought at first, and started to investigate the
RenderedTaskInstanceFields model.

*Skeleton #1:*

When I first looked at the code and comments, it struck me that the part
which keeps only the latest N rendered task fields could potentially lead
to several kinds of performance degradation (locks, deadlocks, data bloat).
See the code:
https://github.com/apache/airflow/blob/6c479437b1aedf74d029463bda56b42950278287/airflow/models/renderedtifields.py#L185-L245

Also, this historical part (from Airflow 1.10.10) generates the following
SQL statement (PostgreSQL backend):

DELETE FROM rendered_task_instance_fields
WHERE rendered_task_instance_fields.dag_id = %(dag_id_1)s
  AND rendered_task_instance_fields.task_id = %(task_id_1)s
  AND (
    (
      rendered_task_instance_fields.dag_id,
      rendered_task_instance_fields.task_id,
      rendered_task_instance_fields.run_id
    ) NOT IN (
      SELECT
        anon_1.dag_id,
        anon_1.task_id,
        anon_1.run_id
      FROM
        (
          SELECT DISTINCT
            rendered_task_instance_fields.dag_id AS dag_id,
            rendered_task_instance_fields.task_id AS task_id,
            rendered_task_instance_fields.run_id AS run_id,
            dag_run.execution_date AS execution_date
          FROM rendered_task_instance_fields
            JOIN dag_run ON rendered_task_instance_fields.dag_id = dag_run.dag_id
            AND rendered_task_instance_fields.run_id = dag_run.run_id
          WHERE
            rendered_task_instance_fields.dag_id = %(dag_id_2)s
            AND rendered_task_instance_fields.task_id = %(task_id_2)s
          ORDER BY
            dag_run.execution_date DESC
          LIMIT %(param_1)s
        ) AS anon_1
    )
  )

This is especially inefficient in PostgreSQL. An IN (subquery) can easily be
transformed internally into a semi-join (aka EXISTS clause), but this does
not work for NOT IN (subquery): it is not transformed into an anti-join
(aka NOT EXISTS clause) even when that would be possible, see:
https://commitfest.postgresql.org/27/2023/
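
To make the NOT IN vs. NOT EXISTS point concrete, here is a rough sketch of
the two forms side by side. It is not the Airflow code: the tables are reduced
to the columns used in the statement above, the sample data is made up, and it
runs on SQLite (stdlib sqlite3) purely so it is self-contained. It says nothing
about the PostgreSQL planner or about performance; it only checks that both
predicates select the same rows for deletion.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
# Simplified, hypothetical schema: only the columns from the statement above.
conn.executescript(
    """
    CREATE TABLE dag_run (
        dag_id TEXT NOT NULL, run_id TEXT NOT NULL, execution_date TEXT NOT NULL
    );
    CREATE TABLE rendered_task_instance_fields (
        dag_id TEXT NOT NULL, task_id TEXT NOT NULL, run_id TEXT NOT NULL
    );
    """
)
# Five runs of dag "d"; task "t" is being cleaned up, task "other" is not.
for day in range(1, 6):
    run_id = f"r{day}"
    conn.execute(
        "INSERT INTO dag_run VALUES ('d', ?, ?)", (run_id, f"2023-01-{day:02d}")
    )
    for task_id in ("t", "other"):
        conn.execute(
            "INSERT INTO rendered_task_instance_fields VALUES ('d', ?, ?)",
            (task_id, run_id),
        )

# The "latest N rows to keep" subquery, shared by both variants.
KEEP = """
    SELECT DISTINCT r.dag_id, r.task_id, r.run_id, d.execution_date
    FROM rendered_task_instance_fields r
    JOIN dag_run d ON r.dag_id = d.dag_id AND r.run_id = d.run_id
    WHERE r.dag_id = :dag_id AND r.task_id = :task_id
    ORDER BY d.execution_date DESC
    LIMIT :keep
"""

# Same shape as the current statement: row-value NOT IN (subquery).
NOT_IN = f"""
    SELECT t.run_id FROM rendered_task_instance_fields t
    WHERE t.dag_id = :dag_id AND t.task_id = :task_id
      AND (t.dag_id, t.task_id, t.run_id) NOT IN (
        SELECT k.dag_id, k.task_id, k.run_id FROM ({KEEP}) AS k
      )
    ORDER BY t.run_id
"""

# Anti-join form: correlated NOT EXISTS against the same subquery.
NOT_EXISTS = f"""
    SELECT t.run_id FROM rendered_task_instance_fields t
    WHERE t.dag_id = :dag_id AND t.task_id = :task_id
      AND NOT EXISTS (
        SELECT 1 FROM ({KEEP}) AS k
        WHERE k.dag_id = t.dag_id
          AND k.task_id = t.task_id
          AND k.run_id = t.run_id
      )
    ORDER BY t.run_id
"""

params = {"dag_id": "d", "task_id": "t", "keep": 2}
not_in_rows = [row[0] for row in conn.execute(NOT_IN, params)]
not_exists_rows = [row[0] for row in conn.execute(NOT_EXISTS, params)]
print(not_in_rows, not_exists_rows)
```

Note that the two forms are only interchangeable because none of the compared
columns can be NULL. With nullable columns NOT IN and NOT EXISTS give different
results, which is the reason the database cannot blindly rewrite one into the
other.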

I haven't run any performance benchmarks yet, but I guess that if users set
AIRFLOW__CORE__MAX_NUM_RENDERED_TI_FIELDS_PER_TASK to 0 rather than the
default 30, it could improve performance and reduce the number of deadlocks.
The table will grow as a result, but as far as I know we don't run any
maintenance jobs for other tables either.

Would it be a good idea to deprecate this option and recommend that users
set it to 0? WDYT? Maybe someone has already tried or investigated this?


*Skeleton #2:*

We have a k8s_pod_yaml field which is used exclusively by the K8S executors.

Should we also decouple this field as part of AIP-51?

----
Best Wishes
*Andrey Anshin*
