> My main issue and the topic of this thread, has been that the scheduler
> does unnecessary work that leads to decreased throughput. My solution has
> been to limit the results of the query to the dag cap of active tasks that
> the user has defined.

Yes. I understand that. There are situations that cause this "unnecessary
work" to be excessive and lead to lower performance and more memory usage.
This is quite "normal". No system in the world is optimized for all kinds
of scenarios and sometimes you need to make trade-offs - for example
between performance and maintainability (and support for MySQL and
Postgres, as Ash pointed out in some other threads) - which we have to
make. There are various optimisation goals we can chase: optimal
performance and no wasted resources in certain situations and
configurations is one of (many) goals we might have. Other goals might
include: easier maintainability, better community collaboration,
simplicity, less code to maintain, testability. Also (as I mentioned
before) deliberately not handling certain scenarios and introducing
friction **might** be a deliberate decision we take in order to push our
users in the direction we want them to go. Yes. As community and
maintainers we do not always have to "follow" our users' behaviour - we can
(and we often do) educate our users and show them better ways of doing
things.

For example we had a LONG discussion whether to introduce caching of
Variable values during Dag parsing - because we knew our users are often
using Variables in top-level code of their Dags and this leads to a lot of
waste and high CPU and I/O usage by the Dag processor. We finally
implemented it as an experimental feature, but it was not at all certain
that we would - we had to carefully consider what we were trading in
exchange for that performance - and whether it was worth it.

Same here - I understand: there are some cases (arguably rather niche -
with very large Dags) where the scheduler does unnecessary processing and
performance could be improved. Now - we need to understand what trade-offs
we need to make as maintainers and community (including our users) if we
want to address it. We need to know what complexity is involved, whether it
will work with Postgres, MySQL and SQLite, and whether we will be able to
continue debugging and testing it. And whether we want to drive our users
away from the modularisation strategy (smaller Dags) that we think makes
more sense than bigger Dags. We have to think about what happens next. If
we make "huge Dags" first-class citizens, will it mean that we will have to
redesign our UI to support them? What should we do when someone opens an
issue "I have this 1000000 task Dag and I cannot open Airflow UI - it
crashes hard and makes my Airflow instance unusable - please fix it ASAP"?
I certainly would like to avoid such a situation stressing our fellow
maintainers who work on the UI - so they should also have a say on how
feasible it is for them to make it "easy" to have "huge Dags".

All those factors should be taken into account when you make a "product"
decision. Performance gains for particular cases are just one of many
factors to consider - and often not the most important one.

J.


On Wed, Aug 6, 2025 at 7:34 AM Christos Bisias <christos...@gmail.com>
wrote:

> We also have a dag with dynamic task mapping that can grow immensely.
>
> I've been looking at https://github.com/apache/airflow/pull/53492.
>
> My main issue and the topic of this thread, has been that the scheduler
> does unnecessary work that leads to decreased throughput. My solution has
> been to limit the results of the query to the dag cap of active tasks that
> the user has defined.
>
> The patch is more focused on the available pool slots. I get the idea that
> if we can only examine and queue as many tasks as available slots, then we
> will be efficiently utilizing the available slots to the max, the
> throughput will increase and my issue will be solved as well.
>
> IMO, the approach on the patch isn't easily maintainable. Most of the
> calculations are performed by SQL in a huge query.
>
> It would be my preference to have many smaller queries and do part of the
> calculations in python. This will be easier to understand, maintain and
> debug in the future. Also, it will be easier to unit test.
>
> On Tue, Aug 5, 2025 at 10:20 PM Jarek Potiuk <ja...@potiuk.com> wrote:
>
> > Just a comment here - I am also not opposed if optimizations are
> > implemented without impacting the more "regular" cases. And - important -
> > without adding huge complexity.
> >
> > The SQL queries I saw in recent PRs and discussions look both "smart" and
> > "scary" at the same time. Optimizations like that tend to lead to
> > obfuscated code that is difficult to understand and reason about, and to
> > "smart" solutions - sometimes "too smart". And when it ends up with only
> > one or two people being able to debug and fix problems connected with
> > those, things become a little hairy. So whatever we do there, it **must**
> > be not only "smart" but also easy to read and well tested - so that anyone
> > can run the tests easily and reproduce potential failure cases.
> >
> > And yes, I know I am writing this as someone who for years was the only
> > one to understand our complex CI setup. But I think over the last two
> > years we have definitely been moving towards a simpler, easier to
> > understand setup, we have more people on board who know how to deal with
> > it, and I think that is a very good direction we are taking :). And I am
> > sure that when I go for my planned 3 weeks of holidays before the summit,
> > everything will work as smoothly as when I am here - at least.
> >
> > Also I think there is quite a difference (when it comes to scheduling)
> > when you have mapped tasks versus "regular tasks". I think Airflow even
> > currently behaves rather differently in those two cases, and it also has
> > a well-thought-out and optimized UI experience to handle thousands of
> > them. Also the work of David Blain on Lazy Expandable Task Mapping will
> > push the boundaries of what is possible there as well:
> > https://github.com/apache/airflow/pull/51391. Even if we solve
> > scheduling optimization - the UI and the ability to monitor such huge
> > Dags is still likely not something our UI was designed for.
> >
> > And I am fully on board with "splitting to even smaller pieces" and
> > "modularizing" things - and "modularizing and splitting big Dags into
> > smaller Dags" feels like precisely what should be done. And I think it
> > would be a nice idea to try it and see if you can't achieve the same
> > results without adding complexity.
> >
> > J.
> >
> >
> > On Tue, Aug 5, 2025 at 8:47 PM Ash Berlin-Taylor <a...@apache.org> wrote:
> >
> > > Yeah dynamic task mapping is a good case where you could easily end up
> > > with thousands of tasks in a dag.
> > >
> > > As I like to say, Airflow is a broad church and if we can reasonably
> > > support diverse workloads without impacting others (either the
> > > workloads or our ability to support and maintain etc) then I’m all for
> > > it.
> > >
> > > In addition to your two items I’d like to add
> > >
> > > 3. That it doesn’t increase the db’s CPU disproportionally to the
> > > increased task throughput
> > >
> > > > > On 5 Aug 2025, at 19:14, asquator <asqua...@proton.me.invalid> wrote:
> > > >
> > > > > I'm glad this issue finally got enough attention and we can move it
> > > > > forward.
> > > > > I took a look at @Christos's patch and it makes sense overall, it's
> > > > > fine for the specific problem they experienced with the
> > > > > max_active_tasks limit.
> > > > > For those unfamiliar with the core problem, the bug has plenty of
> > > > > variations where starvation happens due to different concurrency
> > > > > limitations being nearly saturated, which creates the opportunity
> > > > > for the scheduler to pull many tasks and schedule none of them.
> > > > > To reproduce this bug, you need two conditions:
> > > > > 1. Many tasks (>> max_tis) belonging to one "pool", where "pool" is
> > > > > some concurrency limitation of Airflow. Note that originally the bug
> > > > > was discovered in the context of task pools (see
> > > > > https://github.com/apache/airflow/issues/45636).
> > > > > 2. The tasks are short enough (or the parallelism is large enough)
> > > > > for the tasks from the nearly starved pool to free some slots in
> > > > > every scheduler iteration.
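The two conditions above can be illustrated with a small toy model. This is only a sketch under stated assumptions, not Airflow code: `run_iterations`, the pool names "a" and "b", the slot counts and the value of `MAX_TIS` are all made up for illustration, and the real scheduler checks many more limits.

```python
from collections import Counter

MAX_TIS = 32  # assumed fetch limit per scheduler iteration


def run_iterations(iterations, freed_per_iteration):
    """Toy model of an optimistic fetch (hypothetical, not an Airflow API)."""
    # Scheduled tasks, highest priority first: pool "a" outranks pool "b".
    scheduled = ["a"] * 1000 + ["b"] * 50
    # Pool "a" starts full, pool "b" has idle slots the whole time.
    free_slots = {"a": 0, "b": 10}
    queued = Counter()
    for _ in range(iterations):
        # Condition 2: short tasks finish, so pool "a" frees a few slots.
        free_slots["a"] += freed_per_iteration
        # Condition 1: the first MAX_TIS tasks all belong to pool "a".
        batch = scheduled[:MAX_TIS]
        for pool in batch:
            if free_slots[pool] > 0:
                free_slots[pool] -= 1
                queued[pool] += 1
                scheduled.remove(pool)
    return queued


result = run_iterations(iterations=10, freed_per_iteration=2)
print(result["a"], result["b"])
```

In this model pool "b" never gets a task queued even though it has free slots, because every fetched batch is filled with tasks from pool "a".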
> > > >
> > > > > When we discovered a bug that starved our less prioritized pool, even
> > > > > when the most prioritized pool was almost full (thanks to @nevcohen),
> > > > > we wanted to implement a patch similar to the one @Christos suggested
> > > > > above, but for pools. But then we realized this issue can arise due
> > > > > to limits other than task pools, including:
> > > > > max_active_tasks
> > > > > max_active_tis_per_dag
> > > > > max_active_tis_per_dagrun
> > > > >
> > > > > So we were able to predict the forthcoming bug reports for different
> > > > > kinds of starvation, and we started working on the most general
> > > > > solution, which is the topic of this discussion.
> > > >
> > > > > I also want to answer @potiuk regarding "why you need such large
> > > > > DAGs", but I will be brief.
> > > > > Airflow is an advanced tool for scheduling large data operations, and
> > > > > over the years it has pushed to production many features that lead
> > > > > to organizations writing DAGs that contain thousands of tasks. The
> > > > > most prominent one is dynamic task mapping. This feature made us
> > > > > realize we can implement a batching work queue pattern and create a
> > > > > task for every unit we have to process, say a file in a specific
> > > > > folder, a path in the filesystem, a pointer to some data stored in
> > > > > object storage, etc. We like to think in terms of splitting the work
> > > > > into many tasks. Is it good? I don't know, but Airflow has already
> > > > > stepped onto this path, and we have to make it technologically
> > > > > possible (if we can).
> > > >
> > > > > Nevertheless, even if such DAGs are considered too big and splitting
> > > > > them is a good idea (though there is still nothing you can do about
> > > > > mapped tasks - we sometimes create tens of thousands of them and
> > > > > expect them to be processed in parallel), this issue addresses not
> > > > > only the described case but many others, including prioritized
> > > > > pools, mapped tasks or max_active_runs starvation on large backfills.
> > > >
> > > > > The only part that's missing now is measuring query time (static
> > > > > benchmarks) and measuring overall scheduling metrics in production
> > > > > workloads (dynamic benchmarks).
> > > > > We're working hard on this crucial part now.
> > > > >
> > > > > We'd be happy to have any assistance from the community with regard
> > > > > to the dynamic benchmarks, because every workload is different and
> > > > > it's pretty difficult to simulate the general case in such a
> > > > > hard-to-reproduce issue. We have to make sure that:
> > > > >
> > > > > 1. In a busy workload, the new logic boosts the scheduler's
> > > > > throughput.
> > > > > 2. In a light workload, the nested windowing doesn't significantly
> > > > > slow down the computation.
> > > >
> > > >
> > > > >> On Monday, August 4th, 2025 at 9:00 PM, Christos Bisias <christos...@gmail.com> wrote:
> > > > >>
> > > > >> I created a draft PR for anyone interested to take a look at the
> > > > >> code
> > > > >>
> > > > >> https://github.com/apache/airflow/pull/54103
> > > > >>
> > > > >> I was able to demonstrate the issue in the unit test with far fewer
> > > > >> tasks. All we need is for the tasks brought back by the db query to
> > > > >> belong to the same dag_run or dag. This can happen when the first
> > > > >> SCHEDULED tasks in line to be examined are at least as many as the
> > > > >> number of tis per query.
> > > >>
> > > >> On Mon, Aug 4, 2025 at 8:37 PM Daniel Standish
> > > >> daniel.stand...@astronomer.io.invalid wrote:
> > > >>
> > > > >>>> The configurability was my recommendation for
> > > > >>>> https://github.com/apache/airflow/pull/53492
> > > > >>>> Given the fact that this change is at the heart of Airflow I think
> > > > >>>> the changes should be experimental where users can switch between
> > > > >>>> different strategies/modes of the scheduler.
> > > > >>>> If and when we have enough data to support that a specific option
> > > > >>>> is always better we can make decisions accordingly.
> > > >>>
> > > > >>> Yeah I guess looking at #53492
> > > > >>> https://github.com/apache/airflow/pull/53492 it does seem too risky
> > > > >>> to just change the behavior in airflow without releasing it first as
> > > > >>> experimental.
> > > > >>>
> > > > >>> I doubt we can get sufficient real world testing without doing that.
> > > > >>>
> > > > >>> So if this is introduced, I think it should just be introduced as an
> > > > >>> experimental optimization. And the intention would be that
> > > > >>> ultimately there will only be one scheduling mode, and this is just
> > > > >>> a way to test this out more widely. Not that we are intending to
> > > > >>> have two scheduling code paths on a permanent basis.
> > > >>>
> > > >>> WDYT
> > > >>>
> > > >>> On Mon, Aug 4, 2025 at 12:50 AM Christos Bisias
> > christos...@gmail.com
> > > >>> wrote:
> > > >>>
> > > >>>>> So my question to you is: is it impossible, or just demanding or
> > > >>>>> difficult
> > > >>>>> to split your Dags into smaller dags connected with asset aware
> > > >>>>> scheduling?
> > > >>>>
> > > >>>> Jarek, I'm going to discuss this with the team and I will get you
> an
> > > >>>> answer
> > > >>>> on that.
> > > >>>>
> > > > >>>> I've shared this again on the thread
> > > > >>>>
> > > > >>>> https://github.com/xBis7/airflow/compare/69ab304ffa3d9b847b7dd0ee90ee6ef100223d66..scheduler-perf-patch
> > > > >>>>
> > > > >>>> I haven't created a PR because this is just a POC and it's also
> > > > >>>> setting a limit per dag. I would like to get feedback on whether
> > > > >>>> it's better to make it per dag or per dag_run.
> > > > >>>> I can create a draft PR if that's helpful and makes it easier to add
> > > > >>>> comments.
> > > >>>>
> > > > >>>> Let me try to explain the issue better. From a high level overview,
> > > > >>>> the scheduler
> > > > >>>>
> > > > >>>> 1. moves tasks to SCHEDULED
> > > > >>>> 2. runs a query to fetch SCHEDULED tasks from the db
> > > > >>>> 3. examines the tasks
> > > > >>>> 4. moves tasks to QUEUED
> > > > >>>>
> > > > >>>> I'm focusing on step 2 and afterwards. The current code doesn't take
> > > > >>>> into account the max_active_tasks_per_dag. When it runs the query it
> > > > >>>> fetches up to max_tis, which is determined here
> > > > >>>> <https://github.com/apache/airflow/blob/main/airflow-core/src/airflow/jobs/scheduler_job_runner.py#L697-L705>.
> > > >>>>
> > > >>>> For example,
> > > >>>>
> > > >>>> - if the query number is 32
> > > >>>> - all 32 tasks in line belong to the same dag, dag1
> > > >>>> - we are not concerned how the scheduler picks them
> > > >>>> - dag1 has max_active_tasks set to 5
> > > >>>>
> > > >>>> The current code will
> > > >>>>
> > > >>>> - get 32 tasks from dag1
> > > >>>> - start examining them one by one
> > > > >>>> - once 5 are moved to QUEUED, it won't stop, it will keep examining
> > > > >>>> the other 27 but won't be able to queue them because it has reached
> > > > >>>> the limit
> > > > >>>>
> > > > >>>> In the next loop, although we have reached the maximum number of
> > > > >>>> tasks for dag1, the query will again fetch 32 tasks from dag1 to
> > > > >>>> examine them and try to queue them.
> > > > >>>>
> > > > >>>> The issue is that it gets more tasks from the db than it can queue
> > > > >>>> and then examines them all.
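The example above can be turned into a small toy model. This is only a sketch, not Airflow's scheduler code: `examine_batch` and its parameters are hypothetical names, and the real scheduler checks many more limits (pools, parallelism, per-task limits). It only counts how many fetched task instances are examined versus queued.

```python
def examine_batch(fetched, max_active_tasks, already_active=0):
    """Toy model: examine every fetched TI, queue until the dag cap is hit.

    Returns (examined, queued). Hypothetical helper, not an Airflow API.
    """
    examined = 0
    queued = 0
    for _ti in range(fetched):
        examined += 1
        if already_active + queued < max_active_tasks:
            queued += 1  # a slot is still available under the dag cap
        # otherwise this TI was examined for nothing
    return examined, queued


QUERY_LIMIT = 32  # the "query number" (max_tis) from the example
DAG_CAP = 5       # max_active_tasks of dag1

# Behaviour described in the example: fetch 32, queue 5, 27 examined in vain.
current = examine_batch(QUERY_LIMIT, DAG_CAP)

# Proposed behaviour: never fetch more than the dag can still take.
proposed = examine_batch(min(QUERY_LIMIT, DAG_CAP), DAG_CAP)

# Next loop while dag1 is still full: 32 examined again, none queued.
next_loop = examine_batch(QUERY_LIMIT, DAG_CAP, already_active=DAG_CAP)

print(current, proposed, next_loop)
```

In this model the capped fetch queues the same 5 tasks while examining 5 instead of 32. It says nothing about the extra cost of computing the cap inside the query, which is the open question in this thread.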
> > > >>>>
> > > > >>>> This all leads to unnecessary processing that builds up, and the
> > > > >>>> more load there is on the system, the more the throughput drops for
> > > > >>>> the scheduler and the workers.
> > > > >>>>
> > > > >>>> What I'm proposing is to adjust the query in step 2, to check the
> > > > >>>> max_active_tasks_per_dag
> > > > >>>>
> > > > >>>>> run a query to fetch SCHEDULED tasks from the db
> > > > >>>>
> > > > >>>> If a dag has already reached the maximum number of tasks in active
> > > > >>>> states, it will be skipped by the query.
> > > >>>>
> > > > >>>>> Don't we already stop examining at that point? I guess there's two
> > > > >>>>> things you might be referring to. One is, which TIs come out of the
> > > > >>>>> db and into python, and the other is, what we do in python. Just
> > > > >>>>> might be helpful to be clear about the specific enhancements &
> > > > >>>>> changes you are making.
> > > > >>>>
> > > > >>>> I think that if we adjust the query and fetch the right number of
> > > > >>>> tasks, then we won't have to make changes to what is done in python.
> > > >>>>
> > > >>>> On Mon, Aug 4, 2025 at 8:01 AM Daniel Standish
> > > >>>> daniel.stand...@astronomer.io.invalid wrote:
> > > >>>>
> > > > >>>>> @Christos Bisias
> > > > >>>>>
> > > > >>>>>> If you have a very large dag, and its tasks have been scheduled,
> > > > >>>>>> then the scheduler will keep examining the tasks for queueing, even
> > > > >>>>>> if it has reached the maximum number of active tasks for that
> > > > >>>>>> particular dag. Once that fails, then it will move on to examine
> > > > >>>>>> the scheduled tasks of the next dag or dag_run in line.
> > > > >>>>>
> > > > >>>>> Can you make this a little more precise? There's some protection
> > > > >>>>> against "starvation" i.e. dag runs recently considered should go to
> > > > >>>>> the back of the line next time.
> > > > >>>>>
> > > > >>>>> Maybe you could clarify why / how that's not working / not optimal /
> > > > >>>>> how to improve.
> > > >>>>>
> > > > >>>>>> If there are available slots in the pool and the max parallelism
> > > > >>>>>> hasn't been reached yet, then the scheduler should stop processing
> > > > >>>>>> a dag that has already reached its max capacity of active tasks.
> > > > >>>>>
> > > > >>>>> If a dag run (or dag) is already at max capacity, it doesn't really
> > > > >>>>> matter if there are slots available or parallelism isn't reached --
> > > > >>>>> shouldn't it stop anyway?
> > > >>>>>
> > > > >>>>>> In addition, the number of scheduled tasks picked for examining,
> > > > >>>>>> should be capped at the number of max active tasks if that's lower
> > > > >>>>>> than the query limit. If the active limit is 10 and we already have
> > > > >>>>>> 5 running, then we can queue at most 5 tasks. In that case, we
> > > > >>>>>> shouldn't examine more than that.
> > > > >>>>>
> > > > >>>>> Don't we already stop examining at that point? I guess there's two
> > > > >>>>> things you might be referring to. One is, which TIs come out of the
> > > > >>>>> db and into python, and the other is, what we do in python. Just
> > > > >>>>> might be helpful to be clear about the specific enhancements &
> > > > >>>>> changes you are making.
> > > >>>>>
> > > > >>>>>> There is already a patch with the changes mentioned above. IMO,
> > > > >>>>>> these changes should be enabled/disabled with a config flag and not
> > > > >>>>>> by default because not everyone has the same needs as us. In our
> > > > >>>>>> testing, adding a limit on the tasks retrieved from the db requires
> > > > >>>>>> more processing on the query which actually makes things worse when
> > > > >>>>>> you have multiple small dags.
> > > > >>>>>
> > > > >>>>> I would like to see a stronger case made for configurability. Why
> > > > >>>>> make it configurable? If the performance is always better, it should
> > > > >>>>> not be made configurable. Unless it's merely released as an opt-in
> > > > >>>>> experimental feature. If it is worse in some profiles, let's be
> > > > >>>>> clear about that.
> > > >>>>>
> > > > >>>>> I did not read anything after `Here is a simple test case that
> > > > >>>>> makes the benefits of the improvements noticeable` because it seemed
> > > > >>>>> a rather long-winded detail about a test case. A higher level
> > > > >>>>> summary might be helpful to your audience. Is there a PR with your
> > > > >>>>> optimization? You wrote "there is a patch" but did not, unless I
> > > > >>>>> missed something, share it. I would take a look if you share it
> > > > >>>>> though.
> > > >>>>>
> > > >>>>> Thanks
> > > >>>>>
> > > >>>>> On Sun, Aug 3, 2025 at 5:08 PM Daniel Standish <
> > > >>>>> daniel.stand...@astronomer.io> wrote:
> > > >>>>>
> > > > >>>>>> Yes, the UI is another part of this.
> > > > >>>>>>
> > > > >>>>>> At some point the grid and graph views completely stop making sense
> > > > >>>>>> for that volume, and another type of view would be required both
> > > > >>>>>> for usability and performance
> > > >>>>>>
> > > >>>>>> On Sun, Aug 3, 2025 at 11:04 AM Jens Scheffler
> > > >>>>>> j_scheff...@gmx.de.invalid
> > > >>>>>> wrote:
> > > >>>>>>
> > > >>>>>>> Hi,
> > > >>>>>>>
> > > > >>>>>>> We also have a current demand to have a workflow to execute 10k to
> > > > >>>>>>> 100k tasks. Together with @AutomationDev85 we are working on a
> > > > >>>>>>> local solution because we also saw problems in the Scheduler that
> > > > >>>>>>> do not scale linearly. And they are for sure not easy to fix. But
> > > > >>>>>>> from our investigation there are also other problems to be
> > > > >>>>>>> considered, e.g. the UI will potentially have problems too.
> > > > >>>>>>>
> > > > >>>>>>> I am a bit sceptical that PR 49160 completely fixes the problems
> > > > >>>>>>> mentioned here and made some comments. I do not want to stop the
> > > > >>>>>>> enthusiasm to fix and improve things but the Scheduler is quite
> > > > >>>>>>> complex and changes need to be made with care.
> > > >>>>>>>
> > > > >>>>>>> Actually I like the patch
> > > > >>>>>>>
> > > > >>>>>>> https://github.com/xBis7/airflow/compare/69ab304ffa3d9b847b7dd0ee90ee6ef100223d66..scheduler-perf-patch
> > > > >>>>>>>
> > > > >>>>>>> as it just adds some limit preventing the scheduler from focusing
> > > > >>>>>>> on only one run. But the complexity is a bit big for a "patch" :-D
> > > >>>>>>>
> > > > >>>>>>> I'd also propose, for the moment, the way that Jarek described:
> > > > >>>>>>> split up the Dag into multiple parts (divide and conquer).
> > > > >>>>>>>
> > > > >>>>>>> Otherwise if there is a concrete demand for such large Dags... we
> > > > >>>>>>> maybe need a rather broader initiative if we want to ensure 10k,
> > > > >>>>>>> 100k, 1M? tasks are supported per Dag. Because depending on the
> > > > >>>>>>> magnitude we strive for, different approaches are needed.
> > > >>>>>>>
> > > >>>>>>> Jens
> > > >>>>>>>
> > > >>>>>>> On 03.08.25 16:33, Daniel Standish wrote:
> > > >>>>>>>
> > > > >>>>>>>> Definitely an area of the scheduler with some opportunity for
> > > > >>>>>>>> performance improvement.
> > > > >>>>>>>>
> > > > >>>>>>>> I would just mention that you should also attempt to include some
> > > > >>>>>>>> performance testing at load / scale because window functions are
> > > > >>>>>>>> going to be more expensive.
> > > > >>>>>>>>
> > > > >>>>>>>> What happens when you have many dags, many historical dag runs &
> > > > >>>>>>>> TIs, lots of stuff running concurrently? You need to be mindful of
> > > > >>>>>>>> the overall impact of such a change, and not look only at the time
> > > > >>>>>>>> spent on scheduling this particular dag.
> > > > >>>>>>>>
> > > > >>>>>>>> I did not look at the PRs yet, maybe you've covered this, but it's
> > > > >>>>>>>> important.
> > > >>>>>>>>
> > > >>>>>>>> On Sun, Aug 3, 2025 at 5:57 AM Christos Bisias<
> > > >>>>>>>> christos...@gmail.com>
> > > >>>>>>>> wrote:
> > > >>>>>>>>
> > > > >>>>>>>>> I'm going to review the PR code and test it more thoroughly before
> > > > >>>>>>>>> leaving a comment.
> > > > >>>>>>>>>
> > > > >>>>>>>>> This is my code for reference
> > > > >>>>>>>>>
> > > > >>>>>>>>> https://github.com/xBis7/airflow/compare/69ab304ffa3d9b847b7dd0ee90ee6ef100223d66..scheduler-perf-patch
> > > > >>>>>>>>>
> > > > >>>>>>>>> The current version is setting a limit per dag, across all
> > > > >>>>>>>>> dag_runs.
> > > > >>>>>>>>>
> > > > >>>>>>>>> Please correct me if I'm wrong, but the PR looks like it's
> > > > >>>>>>>>> changing the way that tasks are prioritized to avoid starvation.
> > > > >>>>>>>>> If that's the case, I'm not sure that this is the same issue. My
> > > > >>>>>>>>> proposal is that, if we have reached the max resources assigned to
> > > > >>>>>>>>> a dag, then stop processing its tasks and move on to the next one.
> > > > >>>>>>>>> I'm not changing how or which tasks are picked.
> > > >>>>>>>>>
> > > >>>>>>>>> On Sun, Aug 3, 2025 at 3:23 PM asquator<asqua...@proton.me
> > > >>>>>>>>> .invalid>
> > > >>>>>>>>> wrote:
> > > >>>>>>>>>
> > > > >>>>>>>>>> Thank you for the feedback.
> > > > >>>>>>>>>> Please describe the case with failing limit checks in the PR (the
> > > > >>>>>>>>>> DAG's parameters, its tasks' parameters and what fails to be
> > > > >>>>>>>>>> checked) and we'll try to fix it ASAP so that you can test it
> > > > >>>>>>>>>> again. Let's continue the PR-related discussion in the PR itself.
> > > >>>>>>>>>>
> > > >>>>>>>>>> On Sunday, August 3rd, 2025 at 2:21 PM, Christos Bisias <
> > > >>>>>>>>>> christos...@gmail.com> wrote:
> > > >>>>>>>>>>
> > > > >>>>>>>>>>> Thank you for bringing this PR to my attention.
> > > > >>>>>>>>>>>
> > > > >>>>>>>>>>> I haven't studied the code but I ran a quick test on the branch
> > > > >>>>>>>>>>> and this completely ignores the limit on scheduled tasks per dag
> > > > >>>>>>>>>>> or dag_run. It grabbed 70 tasks from the first dag and then
> > > > >>>>>>>>>>> moved all 70 to QUEUED without any further checks.
> > > > >>>>>>>>>>>
> > > > >>>>>>>>>>> This is how I tested it
> > > > >>>>>>>>>>>
> > > > >>>>>>>>>>> https://github.com/Asquator/airflow/compare/feature/pessimistic-task-fetching-with-window-function...xBis7:airflow:scheduler-window-function-testing?expand=1
> > > >>>
> > > >>>>>>>>>>> On Sun, Aug 3, 2025 at 1:44 PM asquatorasqua...@proton.me
> > > >>>>>>>>>>> .invalid
> > > >>>>>>>>>>> wrote:
> > > >>>>>>>>>>>
> > > >>>>>>>>>>>> Hello,
> > > >>>>>>>>>>>>
> > > > >>>>>>>>>>>> This is a known issue stemming from the optimistic scheduling
> > > > >>>>>>>>>>>> strategy used in Airflow. We do address this in the
> > > > >>>>>>>>>>>> above-mentioned PR. I want to note that there are many cases
> > > > >>>>>>>>>>>> where this problem may appear. It was originally detected with
> > > > >>>>>>>>>>>> pools, but we are striving to fix it in all cases, such as the
> > > > >>>>>>>>>>>> one described here with max_active_tis_per_dag, by switching to
> > > > >>>>>>>>>>>> pessimistic scheduling with SQL window functions. While the
> > > > >>>>>>>>>>>> current strategy simply pulls the max_tis tasks and drops the
> > > > >>>>>>>>>>>> ones that do not meet the constraints, the new strategy will
> > > > >>>>>>>>>>>> pull only the tasks that are actually ready to be scheduled and
> > > > >>>>>>>>>>>> that comply with all concurrency limits.
> > > > >>>>>>>>>>>> It would be very helpful for pushing this change to production
> > > > >>>>>>>>>>>> if you could assist us in alpha-testing it.
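As a rough illustration of what "pessimistic" fetching with a window function can look like, here is a minimal sketch using an in-memory SQLite database (window functions need SQLite 3.25 or newer). It is not the query from the PR: the `ti` and `dag` tables, their columns and the sample data are invented for this example, only a dag-level cap is modelled, and Airflow's real schema and limits differ.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
# Hypothetical, heavily simplified tables (not Airflow's schema).
conn.executescript("""
CREATE TABLE dag (dag_id TEXT PRIMARY KEY, max_active_tasks INTEGER);
CREATE TABLE ti (
    id INTEGER PRIMARY KEY, dag_id TEXT, state TEXT, priority INTEGER
);
""")
conn.executemany("INSERT INTO dag VALUES (?, ?)", [("dag1", 5), ("dag2", 3)])
rows = (
    [("dag1", "running", 1)] * 3       # dag1 already uses 3 of its 5 slots
    + [("dag1", "scheduled", 1)] * 32  # and has 32 more tasks waiting
    + [("dag2", "scheduled", 1)] * 2   # dag2 is idle with 2 tasks waiting
)
conn.executemany(
    "INSERT INTO ti (dag_id, state, priority) VALUES (?, ?, ?)", rows
)

QUERY = """
WITH active AS (
    SELECT dag_id, COUNT(*) AS n
    FROM ti
    WHERE state IN ('queued', 'running')
    GROUP BY dag_id
),
ranked AS (
    SELECT ti.id, ti.dag_id,
           ROW_NUMBER() OVER (
               PARTITION BY ti.dag_id ORDER BY ti.priority DESC, ti.id
           ) AS rn
    FROM ti
    WHERE ti.state = 'scheduled'
)
SELECT ranked.id, ranked.dag_id
FROM ranked
JOIN dag ON dag.dag_id = ranked.dag_id
LEFT JOIN active ON active.dag_id = ranked.dag_id
-- keep only as many rows per dag as the dag can still run
WHERE ranked.rn <= dag.max_active_tasks - COALESCE(active.n, 0)
ORDER BY ranked.id
LIMIT :max_tis
"""

picked = conn.execute(QUERY, {"max_tis": 32}).fetchall()
per_dag = {}
for _ti_id, dag_id in picked:
    per_dag[dag_id] = per_dag.get(dag_id, 0) + 1
print(per_dag)
```

With this data only the 2 remaining slots of dag1 are filled, so the 2 tasks of dag2 also fit into the same batch. The extra ranking work done by the database is exactly the cost that the benchmarks discussed in this thread would need to measure.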
> > > >>>>>>>>>>>>
> > > >>>>>>>>>>>> See also:
> > > >>>>>>>>>>>> https://github.com/apache/airflow/discussions/49160
> > > >>>>>>>>>>>>
> > > >>>>>>>>>>>> Sent with Proton Mail secure email.
> > > >>>>>>>>>>>>
> > > >>>>>>>>>>>> On Sunday, August 3rd, 2025 at 12:59 PM, Elad Kalif
> > > >>>>>>>>>>>> elad...@apache.org
> > > >>>>>>>>>>>> wrote:
> > > >>>>>>>>>>>>
> > > > >>>>>>>>>>>>> I think most of your issues will be addressed by
> > > > >>>>>>>>>>>>> https://github.com/apache/airflow/pull/53492
> > > > >>>>>>>>>>>>> The PR code can be tested with Breeze, so you can set it up
> > > > >>>>>>>>>>>>> and see if it solves the problem. This will also help with
> > > > >>>>>>>>>>>>> confirming it's the right fix.
> > > >>>>>>>>>>>>>
> > > >>>>>>>>>>>>> On Sun, Aug 3, 2025 at 10:46 AM Christos Bisias
> > > >>>>>>>>>>>>> christos...@gmail.com
> > > >>>>>>>>>>>>> wrote:
> > > >>>>>>>>>>>>>
> > > >>>>>>>>>>>>>> Hello,
> > > >>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>> The scheduler is very efficient when running a large number
> > > > >>>>>>>>>>>>>> of dags with up to 1000 tasks each. But in our case, we have
> > > > >>>>>>>>>>>>>> dags with as many as 10,000 tasks. And in that scenario the
> > > > >>>>>>>>>>>>>> scheduler and worker throughput drops significantly. Even if
> > > > >>>>>>>>>>>>>> you have 1 such large dag with scheduled tasks, the
> > > > >>>>>>>>>>>>>> performance hit becomes noticeable.
> > > >>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>> We did some digging and we found that the issue comes from
> > > > >>>>>>>>>>>>>> the scheduler's _executable_task_instances_to_queued
> > > > >>>>>>>>>>>>>> <https://github.com/apache/airflow/blob/main/airflow-core/src/airflow/jobs/scheduler_job_runner.py#L293C9-L647>
> > > > >>>>>>>>>>>>>> method. In particular with the db query here
> > > > >>>>>>>>>>>>>> <https://github.com/apache/airflow/blob/main/airflow-core/src/airflow/jobs/scheduler_job_runner.py#L364-L375>
> > > > >>>>>>>>>>>>>> and examining the results here
> > > > >>>>>>>>>>>>>> <https://github.com/apache/airflow/blob/main/airflow-core/src/airflow/jobs/scheduler_job_runner.py#L425>.
> > > >>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>> If you have a very large dag, and its tasks have been
> > > > >>>>>>>>>>>>>> scheduled, then the scheduler will keep examining the tasks
> > > > >>>>>>>>>>>>>> for queueing, even if it has reached the maximum number of
> > > > >>>>>>>>>>>>>> active tasks for that particular dag. Once that fails, then
> > > > >>>>>>>>>>>>>> it will move on to examine the scheduled tasks of the next
> > > > >>>>>>>>>>>>>> dag or dag_run in line.
> > > >>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>> This is inefficient and causes the throughput of the
> > > > >>>>>>>>>>>>>> scheduler and the workers to drop significantly. If there
> > > > >>>>>>>>>>>>>> are available slots in the pool and the max parallelism
> > > > >>>>>>>>>>>>>> hasn't been reached yet, then the scheduler should stop
> > > > >>>>>>>>>>>>>> processing a dag that has already reached its max capacity
> > > > >>>>>>>>>>>>>> of active tasks.
> > > >>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>> In addition, the number of scheduled tasks picked for examining
> > > >>>>>>>>>>>>>> should be capped at the number of max active tasks if that's
> > > >>>>>>>>>>>>>> lower than the query limit. If the active limit is 10 and we
> > > >>>>>>>>>>>>>> already have 5 running, then we can queue at most 5 tasks. In
> > > >>>>>>>>>>>>>> that case, we shouldn't examine more than that.
> > > >>>>>>>>>>>>>>
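
The cap described above boils down to one line of arithmetic. A minimal sketch, assuming a hypothetical helper name and parameter names that do not come from the Airflow code base:

```python
def examine_cap(query_limit: int, max_active_tasks: int, running: int) -> int:
    """Upper bound on the scheduled tasks worth examining for one dag.

    Hypothetical helper for illustration only; not an Airflow function.
    """
    # Slots still free under the dag's active-task limit (never negative).
    free = max(max_active_tasks - running, 0)
    # Never examine more than the query limit, nor more than can be queued.
    return min(query_limit, free)


# The example from the message: limit 10, 5 already running.
print(examine_cap(query_limit=32, max_active_tasks=10, running=5))  # 5
```
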
> > > >>>>>>>>>>>>>> There is already a patch with the changes mentioned above. IMO,
> > > >>>>>>>>>>>>>> these changes should be enabled/disabled with a config flag and
> > > >>>>>>>>>>>>>> not by default because not everyone has the same needs as us. In
> > > >>>>>>>>>>>>>> our testing, adding a limit on the tasks retrieved from the db
> > > >>>>>>>>>>>>>> requires more processing on the query which actually makes
> > > >>>>>>>>>>>>>> things worse when you have multiple small dags.
> > > >>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>> Here is a simple test case that makes the benefits of the
> > > >>>>>>>>>>>>>> improvements noticeable
> > > >>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>> - we have 3 dags with thousands of tasks each
> > > >>>>>>>>>>>>>> - for simplicity let's have 1 dag_run per dag
> > > >>>>>>>>>>>>>> - triggering them takes some time and due to that, the FIFO
> > > >>>>>>>>>>>>>>   order of the tasks is very clear
> > > >>>>>>>>>>>>>> - e.g. 1000 tasks from dag1 were scheduled first and then 200
> > > >>>>>>>>>>>>>>   tasks from dag2 etc.
> > > >>>>>>>>>>>>>> - the executor has parallelism=100 and slots_available=100
> > > >>>>>>>>>>>>>>   which means that it can run up to 100 tasks concurrently
> > > >>>>>>>>>>>>>> - max_active_tasks_per_dag is 4 which means that we can have up
> > > >>>>>>>>>>>>>>   to 4 tasks running per dag.
> > > >>>>>>>>>>>>>> - For 3 dags, it means that we can run up to 12 tasks at the
> > > >>>>>>>>>>>>>>   same time (4 tasks from each dag)
> > > >>>>>>>>>>>>>> - max tis per query are set to 32, meaning that we can examine
> > > >>>>>>>>>>>>>>   up to 32 scheduled tasks if there are available pool slots
> > > >>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>> If we were to run the scheduler loop repeatedly until it queues
> > > >>>>>>>>>>>>>> 12 tasks and test the part that examines the scheduled tasks and
> > > >>>>>>>>>>>>>> queues them, then
> > > >>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>> - with the query limit
> > > >>>>>>>>>>>>>>   - 1 iteration, total time 0.05
> > > >>>>>>>>>>>>>>   - During the iteration
> > > >>>>>>>>>>>>>>     - we have parallelism 100, available slots 100 and query
> > > >>>>>>>>>>>>>>       limit 32 which means that it will examine up to 32
> > > >>>>>>>>>>>>>>       scheduled tasks
> > > >>>>>>>>>>>>>>     - it can queue up to 100 tasks
> > > >>>>>>>>>>>>>>     - examines 12 tasks (instead of 32)
> > > >>>>>>>>>>>>>>       - 4 tasks from dag1, reached max for the dag
> > > >>>>>>>>>>>>>>       - 4 tasks from dag2, reached max for the dag
> > > >>>>>>>>>>>>>>       - and 4 tasks from dag3, reached max for the dag
> > > >>>>>>>>>>>>>>     - queues 4 from dag1, reaches max for the dag and moves on
> > > >>>>>>>>>>>>>>     - queues 4 from dag2, reaches max for the dag and moves on
> > > >>>>>>>>>>>>>>     - queues 4 from dag3, reaches max for the dag and moves on
> > > >>>>>>>>>>>>>>     - stops queueing because we have reached the maximum per
> > > >>>>>>>>>>>>>>       dag, although there are slots for more tasks
> > > >>>>>>>>>>>>>>     - iteration finishes
> > > >>>>>>>>>>>>>> - without
> > > >>>>>>>>>>>>>>   - 3 iterations, total time 0.29
> > > >>>>>>>>>>>>>>   - During iteration 1
> > > >>>>>>>>>>>>>>     - Examines 32 tasks, all from dag1 (due to FIFO)
> > > >>>>>>>>>>>>>>     - queues 4 from dag1 and tries to queue the other 28 but
> > > >>>>>>>>>>>>>>       fails
> > > >>>>>>>>>>>>>>   - During iteration 2
> > > >>>>>>>>>>>>>>     - examines the next 32 tasks from dag1
> > > >>>>>>>>>>>>>>     - it can't queue any of them because it has reached the max
> > > >>>>>>>>>>>>>>       for dag1, since the previous 4 are still running
> > > >>>>>>>>>>>>>>     - examines 32 tasks from dag2
> > > >>>>>>>>>>>>>>     - queues 4 from dag2 and tries to queue the other 28 but
> > > >>>>>>>>>>>>>>       fails
> > > >>>>>>>>>>>>>>   - During iteration 3
> > > >>>>>>>>>>>>>>     - examines the next 32 tasks from dag1, same tasks that
> > > >>>>>>>>>>>>>>       were examined in iteration 2
> > > >>>>>>>>>>>>>>     - it can't queue any of them because it has reached the max
> > > >>>>>>>>>>>>>>       for dag1 and the first 4 are still running
> > > >>>>>>>>>>>>>>     - examines 32 tasks from dag2, can't queue any of them
> > > >>>>>>>>>>>>>>       because it has reached max for dag2 as well
> > > >>>>>>>>>>>>>>     - examines 32 tasks from dag3
> > > >>>>>>>>>>>>>>     - queues 4 from dag3 and tries to queue the other 28 but
> > > >>>>>>>>>>>>>>       fails
> > > >>>>>>>>>>>>>>
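
The walk-through above can be reproduced with a small toy model. This is only a sketch under stated assumptions, not the Airflow scheduler: all names (`fetch`, `scheduler_iteration`, `run`) are hypothetical, running tasks never finish, and the "re-query while skipping dags that hit their limit" behaviour is inferred from the iterations described in the message. It counts examined tasks rather than measuring time.

```python
from collections import Counter

MAX_TIS_PER_QUERY = 32   # query limit from the test case
MAX_ACTIVE_PER_DAG = 4   # max_active_tasks_per_dag from the test case


def fetch(scheduled, active, starved, per_dag_limit):
    """Toy stand-in for the db query: FIFO order, at most 32 rows."""
    picked, taken = [], Counter()
    for task in scheduled:
        dag = task[0]
        if dag in starved:
            continue
        # Proposed change: never return more rows than the dag can still run.
        if per_dag_limit and taken[dag] >= MAX_ACTIVE_PER_DAG - active[dag]:
            continue
        picked.append(task)
        taken[dag] += 1
        if len(picked) == MAX_TIS_PER_QUERY:
            break
    return picked


def scheduler_iteration(scheduled, active, per_dag_limit):
    """One loop: re-query, skipping starved dags, until something is queued."""
    starved, examined = set(), 0
    while True:
        batch = fetch(scheduled, active, starved, per_dag_limit)
        examined += len(batch)
        queued = 0
        for task in batch:
            dag = task[0]
            if active[dag] < MAX_ACTIVE_PER_DAG:
                active[dag] += 1
                scheduled.remove(task)
                queued += 1
            else:
                starved.add(dag)  # dag is at its limit; skip it on re-query
        if queued or not batch:
            return examined, queued


def run(per_dag_limit, target=12):
    """Loop until `target` tasks are queued; returns (iterations, examined, queued)."""
    scheduled = [(d, i) for d in ("dag1", "dag2", "dag3") for i in range(1000)]
    active = Counter()
    iterations = examined = queued = 0
    while queued < target:
        e, q = scheduler_iteration(scheduled, active, per_dag_limit)
        iterations += 1
        examined += e
        queued += q
        if q == 0:  # nothing left that can be queued; avoid spinning forever
            break
    return iterations, examined, queued


print(run(per_dag_limit=False))  # (3, 192, 12)
print(run(per_dag_limit=True))   # (1, 12, 12)
```

In this model the unlimited query examines 32 + 64 + 96 = 192 tasks over 3 iterations to queue 12, while the limited query examines exactly the 12 it queues in a single iteration.
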
> > > >>>>>>>>>>>>>> I used very low values for all the configs so that I can make
> > > >>>>>>>>>>>>>> the point clear and easy to understand. If we increase them,
> > > >>>>>>>>>>>>>> then this patch also makes the task selection more fair and the
> > > >>>>>>>>>>>>>> resource distribution more even.
> > > >>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>> I would appreciate it if anyone familiar with the scheduler's
> > > >>>>>>>>>>>>>> code can confirm this and also provide any feedback.
> > > >>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>> Additionally, I have one question regarding the query limit.
> > > >>>>>>>>>>>>>> Should it be per dag_run or per dag? I've noticed that
> > > >>>>>>>>>>>>>> max_active_tasks_per_dag has been changed to provide a value per
> > > >>>>>>>>>>>>>> dag_run but the docs haven't been updated.
> > > >>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>> Thank you!
> > > >>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>> Regards,
> > > >>>>>>>>>>>>>> Christos Bisias
> > > >>>
> > > >>>
> ---------------------------------------------------------------------
> > > >>>
> > > >>>>>>>>>>>> To unsubscribe, e-mail:dev-unsubscr...@airflow.apache.org
> > > >>>>>>>>>>>> For additional commands,
> e-mail:dev-h...@airflow.apache.org
