> I think we should not deprecate it though, but find a more efficient
way of deleting the old keys. I think we could slightly denormalize
RenderedTaskInstance + DagRun tables, and add DAG_RUN_EXECUTION_DATE
to the RenderedTaskInstance table and that will be enough to optimise
it.

Yeah, I agree with that.
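
To illustrate the idea in the quoted proposal, here is a minimal, self-contained sketch of the two-step, index-friendly cleanup. It uses SQLite and illustrative table/column names (run_id_execution_date, idx_rtif_cleanup, a keep-limit of 2); Airflow's real schema, session handling, and default of 30 differ.

```python
import sqlite3

MAX_NUM_RENDERED_TI_FIELDS_PER_TASK = 2  # illustrative; Airflow's default is 30

conn = sqlite3.connect(":memory:")
conn.executescript(
    """
    -- Simplified stand-in for rendered_task_instance_fields, denormalized
    -- with the DagRun execution date copied in as run_id_execution_date.
    CREATE TABLE rendered_task_instance_fields (
        dag_id TEXT NOT NULL,
        task_id TEXT NOT NULL,
        run_id TEXT NOT NULL,
        run_id_execution_date TEXT NOT NULL,
        rendered_fields TEXT
    );
    -- Composite index so both steps below can be served from the index.
    CREATE INDEX idx_rtif_cleanup
        ON rendered_task_instance_fields (dag_id, task_id, run_id_execution_date);
    """
)
rows = [("d1", "t1", f"run_{i}", f"2023-01-{i:02d}", "{}") for i in range(1, 6)]
conn.executemany(
    "INSERT INTO rendered_task_instance_fields VALUES (?, ?, ?, ?, ?)", rows
)

# Step 1: find the execution date of the newest run that should be dropped
# (the one just past the keep-limit, counting from the most recent).
cutoff = conn.execute(
    """
    SELECT run_id_execution_date
    FROM rendered_task_instance_fields
    WHERE dag_id = ? AND task_id = ?
    GROUP BY run_id_execution_date
    ORDER BY run_id_execution_date DESC
    LIMIT 1 OFFSET ?
    """,
    ("d1", "t1", MAX_NUM_RENDERED_TI_FIELDS_PER_TASK),
).fetchone()

# Step 2: delete that run and everything older, via the index.
if cutoff:
    conn.execute(
        """
        DELETE FROM rendered_task_instance_fields
        WHERE dag_id = ? AND task_id = ? AND run_id_execution_date <= ?
        """,
        ("d1", "t1", cutoff[0]),
    )
conn.commit()

remaining = conn.execute(
    "SELECT run_id FROM rendered_task_instance_fields ORDER BY run_id_execution_date"
).fetchall()
print(remaining)  # only the 2 most recent runs survive
```

Note that step 1 returns no row when the table holds fewer than the keep-limit of distinct dates, so the DELETE is simply skipped in that case.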

On Mon, 30 Jan 2023 at 19:51, Jarek Potiuk <ja...@potiuk.com> wrote:

> I think there is a good reason to clean those up automatically.
> Rendered task instance fields are almost arbitrary in size. If we try
> to keep all historical values there by default, there are numerous
> cases where the table will grow very fast - far, far too quickly.
>
> And I am not worried at all about locks on this table if we do it the
> way I described and it uses the indexes. The contention this way
> might only be between the two deleting tasks, and with the query I
> proposed they will only last for a short time - the index will only
> be locked while the two DELETEs or the SELECT DISTINCT run, and both
> should be fast.
>
>
> On Mon, Jan 30, 2023 at 8:37 PM Andrey Anshin <andrey.ans...@taragol.is>
> wrote:
> >
> > I guess two things are involved in reducing the performance of this
> > query over time: Dynamic Task Mapping and run_id instead of execution date.
> >
> > I still personally think that changing the default value from 30 to 0
> > might improve the performance of multiple concurrent tasks, simply because
> > this query does not run and there are no locks on multiple records/pages.
> >
> > I do not have any proof (yet?) other than simple DAGs. I think there
> > is some crossover point at which letting this table grow becomes worse
> > than cleaning up on each TI run. But users have the ability to clean up
> > the table by executing airflow db clean, which should improve performance
> > again.
> >
> > And there is also interesting behavior with this query: if a user already
> > has more records than the value specified by
> > AIRFLOW__CORE__MAX_NUM_RENDERED_TI_FIELDS_PER_TASK and tries to run a
> > backfill, the rendered templates are not written to the table (or may be
> > inserted and then immediately deleted); the same is true for clearing old
> > tasks.
> >
> > ----
> > Best Wishes
> > Andrey Anshin
> >
> >
> >
> > On Sun, 29 Jan 2023 at 14:16, Jarek Potiuk <ja...@potiuk.com> wrote:
> >>
> >> Yep. Agree this is not an efficient query, and dynamic task mapping
> >> makes the effect much worse. Generally speaking, selecting "what
> >> should be left" and then deleting stuff where the key is "not in" is
> >> never an efficient way of running an SQL query. And the query not
> >> using an index at all makes it rather terrible.
> >>
> >> I think we should not deprecate it though, but find a more efficient
> >> way of deleting the old keys. I think we could slightly denormalize
> >> RenderedTaskInstance + DagRun tables, and add DAG_RUN_EXECUTION_DATE
> >> to the RenderedTaskInstance table and that will be enough to optimise
> >> it.
> >>
> >> Then we could have either:
> >>
> >> * a composite (non-unique) B-TREE index on DAG_ID, TASK_ID,
> >> RUN_ID_EXECUTION_DATE
> >> * or maybe even regular HASH index on DAG_ID, TASK_ID and separate
> >> B-TREE index (non-unique) on just RUN_ID_EXECUTION_DATE
> >>
> >> Probably the latter is better, as I am not sure what <, > comparisons
> >> look like for composite B-TREE indexes when char + date columns are
> >> mixed. Also, we could hit the infamous MySQL index key length
> >> limit.
> >>
> >> Then deletion process would look roughly like:
> >>
> >> 1) dag_run_execution_date = SELECT RUN_ID_EXECUTION_DATE FROM
> >> RENDERED_TASK_INSTANCE_FIELDS WHERE DAG_ID = <DAG_ID>
> >> AND TASK_ID = <TASK_ID> GROUP BY RUN_ID_EXECUTION_DATE
> >> ORDER BY RUN_ID_EXECUTION_DATE DESC LIMIT 1 OFFSET
> >> <MAX_NUM_RENDERED_TI_FIELDS_PER_TASK>
> >> 2) DELETE FROM RENDERED_TASK_INSTANCE_FIELDS WHERE DAG_ID = <DAG_ID>
> >> AND TASK_ID = <TASK_ID> AND RUN_ID_EXECUTION_DATE <= dag_run_execution_date
> >>
> >> I believe that would be fast, and it would make good use of the
> >> B-TREE index features (ordering support).
> >>
> >> J
> >>
> >> On Sun, Jan 29, 2023 at 2:09 AM Andrey Anshin <andrey.ans...@taragol.is>
> wrote:
> >> >
> >> > First of all, I want to highlight that this approach, I guess, worked
> >> > well until Dynamic Task Mapping was introduced.
> >> >
> >> > > The main reason for adding that cleanup was -- if you don't do
> that, you will have many rows, similar to the TaskInstance table
> >> >
> >> > The problem itself is not how big your table/indexes are, but rather
> >> > what kind of operations you run.
> >> >
> >> > > Do you have any data for locks or performance degradation?
> >> >
> >> > In this case, if we try to clean up the rendered_task_instance_fields
> >> > table when a new TI is created/cleared, we make almost two full/sequential
> >> > scans (note: need to check) against the table without any index usage,
> >> > so we pay a couple of times here:
> >> > 1. We scan without indexes - not all parts of the composite key are
> >> > included in the query, plus we need to filter out everything except 30
> >> > records, with ordering and DISTINCT
> >> > 2. After that we make another full scan to find 1 record or map_size
> >> > records
> >> >
> >> > And I guess the situation becomes worse if you have a lot of tasks;
> >> > even with a small table, we need to do inefficient operations.
> >> >
> >> > This is how the query plan looks (please note that without committing
> >> > the transaction, the DELETE operation doesn't have all the information):
> >> > https://gist.github.com/Taragolis/3ca7621c51b00f077aa1646401ddf31b
> >> >
> >> > If we do not clean up the table, we only use these operations:
> >> > 1. SELECT single record by index
> >> > 2. INSERT new record
> >> > 3. DELETE old record(s), which were found by index.
> >> >
> >> > I have not done any real tests yet, only synthetic DAGs (so we should
> >> > not treat any findings as the absolute truth):
> >> > https://gist.github.com/Taragolis/6eec9f81efdf360c4239fc6ea385a480
> >> > DAG with parallel tasks: degradation of up to 2-3 times
> >> > DAG with single mapped tasks: degradation of up to 7-10 times
> >> >
> >> > I plan more complex tests, closer to real use cases, against a database
> >> > whose network latency is not almost 0, as it is on my local machine.
> >> > But I would not object if someone also ran their tests with
> >> > AIRFLOW__CORE__MAX_NUM_RENDERED_TI_FIELDS_PER_TASK=0 vs the default value.
> >> >
> >> > About deadlocks: we know they exist at least in MySQL:
> >> > https://github.com/apache/airflow/pull/18616
> >> >
> >> > > And the larger tables create problems during database migrations.
> >> >
> >> > That is a very good point, so if we find that the problem is only
> >> > related to migrations, we could:
> >> > 1. Clean up this table in the migration
> >> > 2. Add a CLI command to airflow db which could clean up only rendered
> >> > fields, so it would be the user's choice whether to clean up before a
> >> > migration, and whether to do periodic maintenance
> >> >
> >> >
> >> > ----
> >> > Best Wishes
> >> > Andrey Anshin
> >> >
> >> >
> >> >
> >> > On Sat, 28 Jan 2023 at 23:41, Kaxil Naik <kaxiln...@gmail.com> wrote:
> >> >>>
> >> >>> Could it be a good idea to deprecate this option and recommend
> >> >>> that users set it to 0? WDYT? Maybe someone has already tried this
> >> >>> or investigated it?
> >> >>
> >> >>
> >> >> The main reason for adding that cleanup was -- if you don't do that,
> you will have many rows, similar to the TaskInstance table. And the
> RenderedTIFields were mainly added for checking rendered TI fields on the
> Webserver only because after DAG Serialization, the webserver won't have
> access to DAG files.
> >> >>
> >> >> And the larger tables create problems during database migrations.
> >> >>
> >> >> Do you have any data for locks or performance degradation?
> >> >>
> >> >>
> >> >>
> >> >> On Sat, 28 Jan 2023 at 13:06, Andrey Anshin <
> andrey.ans...@taragol.is> wrote:
> >> >>>
> >> >>> Greetings!
> >> >>>
> >> >>> While migrating our ORM syntax to be compatible with SQLAlchemy 2.0,
> >> >>> I probably found some skeletons in the closet.
> >> >>>
> >> >>> Let's start from the beginning: initially I got this warning:
> >> >>>
> >> >>> airflow/models/renderedtifields.py:245 RemovedIn20Warning('ORDER BY
> columns added implicitly due to DISTINCT is deprecated and will be removed
> in SQLAlchemy 2.0.  SELECT statements with DISTINCT should be written to
> explicitly include the appropriate columns in the columns clause
> (Background on SQLAlchemy 2.0 at: https://sqlalche.me/e/b8d9)')
> >> >>>
> >> >>> "OK let's fix it!", I thought at first and started to investigate
> RenderedTaskInstanceFields model
> >> >>>
> >> >>> Skeleton #1:
> >> >>>
> >> >>> When I first looked at the code and comments, it got me thinking
> >> >>> that the part which keeps only the latest N rendered task fields
> >> >>> could potentially lead to various kinds of performance degradation
> >> >>> (locks, deadlocks, data bloat): see the code at
> >> >>> https://github.com/apache/airflow/blob/6c479437b1aedf74d029463bda56b42950278287/airflow/models/renderedtifields.py#L185-L245
> >> >>>
> >> >>> Also, this historical part (dating from Airflow 1.10.10) generates
> >> >>> this SQL statement (Postgres backend):
> >> >>>
> >> >>> DELETE FROM rendered_task_instance_fields
> >> >>> WHERE rendered_task_instance_fields.dag_id = %(dag_id_1)s
> >> >>>   AND rendered_task_instance_fields.task_id = %(task_id_1)s
> >> >>>   AND (
> >> >>>     (
> >> >>>       rendered_task_instance_fields.dag_id,
> >> >>>       rendered_task_instance_fields.task_id,
> >> >>>       rendered_task_instance_fields.run_id
> >> >>>     ) NOT IN (
> >> >>>       SELECT
> >> >>>         anon_1.dag_id,
> >> >>>         anon_1.task_id,
> >> >>>         anon_1.run_id
> >> >>>       FROM
> >> >>>         (
> >> >>>           SELECT DISTINCT
> >> >>>             rendered_task_instance_fields.dag_id AS dag_id,
> >> >>>             rendered_task_instance_fields.task_id AS task_id,
> >> >>>             rendered_task_instance_fields.run_id AS run_id,
> >> >>>             dag_run.execution_date AS execution_date
> >> >>>           FROM rendered_task_instance_fields
> >> >>>             JOIN dag_run
> >> >>>               ON rendered_task_instance_fields.dag_id = dag_run.dag_id
> >> >>>               AND rendered_task_instance_fields.run_id = dag_run.run_id
> >> >>>           WHERE
> >> >>>             rendered_task_instance_fields.dag_id = %(dag_id_2)s
> >> >>>             AND rendered_task_instance_fields.task_id = %(task_id_2)s
> >> >>>           ORDER BY
> >> >>>             dag_run.execution_date DESC
> >> >>>           LIMIT %(param_1)s
> >> >>>         ) AS anon_1
> >> >>>     )
> >> >>>   )
> >> >>>
> >> >>> This is especially inefficient in PostgreSQL: an IN subquery can
> >> >>> easily be transformed internally into a SEMI JOIN (aka an EXISTS
> >> >>> clause), but that does not work for a NOT IN subquery, because it is
> >> >>> not transformed into an ANTI JOIN (aka a NOT EXISTS clause) even when
> >> >>> that would be possible, see: https://commitfest.postgresql.org/27/2023/
> >> >>>
> >> >>> I haven't done any performance benchmarks yet, but I guess that if
> >> >>> users set AIRFLOW__CORE__MAX_NUM_RENDERED_TI_FIELDS_PER_TASK to 0
> >> >>> rather than the default 30, it could improve performance and reduce
> >> >>> the number of deadlocks; however, the table size will increase. Then
> >> >>> again, I think we don't do any maintenance jobs for other tables
> >> >>> either.
> >> >>>
> >> >>> Could it be a good idea to deprecate this option and recommend
> >> >>> that users set it to 0? WDYT? Maybe someone has already tried this
> >> >>> or investigated it?
> >> >>>
> >> >>>
> >> >>> Skeleton #2:
> >> >>>
> >> >>> We have a k8s_pod_yaml field which is used exclusively by the K8S
> >> >>> executor.
> >> >>>
> >> >>> Should we also decouple this field as part of AIP-51?
> >> >>>
> >> >>> ----
> >> >>> Best Wishes
> >> >>> Andrey Anshin
> >> >>>
>
