We also have a dag with dynamic task mapping that can grow immensely.
I've been looking at https://github.com/apache/airflow/pull/53492.
My main issue, and the topic of this thread, has been that the scheduler
does unnecessary work that leads to decreased throughput. My solution has
been to limit the results of the query to the per-dag cap on active tasks
that the user has defined.
The patch is more focused on the available pool slots. I get the idea that
if we can only examine and queue as many tasks as available slots, then we
will be efficiently utilizing the available slots to the max, the
throughput will increase and my issue will be solved as well.
IMO, the approach on the patch isn't easily maintainable. Most of the
calculations are performed by SQL in a huge query.
It would be my preference to have many smaller queries and do part of the
calculations in Python. This will be easier to understand, maintain and
debug in the future. Also, it will be easier to unit test.
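
To make that shape concrete, here is a rough, purely illustrative sketch of
the direction I have in mind (the names are made up for illustration, not
the actual scheduler code): fetch the per-dag active counts and the per-dag
caps with small queries, then do the capping itself in Python.

    # Illustrative only - not the actual scheduler code.
    from collections import Counter

    def cap_candidates_per_dag(candidate_tis, active_per_dag, max_active_tasks):
        """Keep at most (cap - already active) candidates per dag, in priority order."""
        taken = Counter(active_per_dag)        # dag_id -> currently queued/running
        kept = []
        for ti in candidate_tis:               # candidates already ordered by priority
            cap = max_active_tasks.get(ti.dag_id)
            if cap is not None and taken[ti.dag_id] >= cap:
                continue                       # dag at capacity - skip cheaply in Python
            taken[ti.dag_id] += 1
            kept.append(ti)
        return kept

Something of this shape is trivial to unit test in isolation, which is the main point I'm trying to make.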
On Tue, Aug 5, 2025 at 10:20 PM Jarek Potiuk <ja...@potiuk.com> wrote:
Just a comment here - I am also not opposed if the optimizations can be
implemented without impacting the more "regular" cases. And - important -
without adding huge complexity.

The SQL queries I saw in recent PRs and discussions look both "smart" and
"scary" at the same time. Optimizations like that tend to lead to obfuscated
code that is difficult to understand and reason about, and to "smart"
solutions - sometimes "too smart". And when it ends up with only one or two
people being able to debug and fix problems connected with those, things
become a little hairy. So whatever we do there, it **must** be not only
"smart" but also easy to read and well tested - so that anyone can run the
tests easily and reproduce potential failure cases.

And yes, I know I am writing this as someone who for years was the only one
to understand our complex CI setup. But I think over the last two years we
have definitely been moving to a simpler, easier-to-understand setup, we
have more people on board who know how to deal with it, and I think that is
a very good direction we are taking :). And I am sure that when I go for my
planned 3 weeks of holidays before the summit, everything will work as
smoothly as when I am here - at least.

Also, I think there is quite a difference (when it comes to scheduling)
between mapped tasks and "regular" tasks. I think Airflow even currently
behaves rather differently in those two cases, and it also has a
well-thought-out and optimized UI experience to handle thousands of them.
Also, the work of David Blain on Lazy Expandable Task Mapping will push the
boundaries of what is possible there as well:
https://github.com/apache/airflow/pull/51391. Even if we solve the
scheduling optimization - the UI and the ability to monitor such huge Dags
is still likely not something our UI was designed for.

And I am fully on board with "splitting into even smaller pieces" and
"modularizing" things - "modularizing and splitting big Dags into smaller
Dags" feels like precisely what should be done. And I think it would be a
nice idea to try it and see whether you can achieve the same results
without adding complexity.
J.
On Tue, Aug 5, 2025 at 8:47 PM Ash Berlin-Taylor <a...@apache.org> wrote:
Yeah, dynamic task mapping is a good case where you could easily end up
with thousands of tasks in a dag.

As I like to say, Airflow is a broad church, and if we can reasonably
support diverse workloads without impacting others (either the other
workloads or our ability to support and maintain them, etc.) then I'm all
for it.

In addition to your two items I'd like to add:

3. That it doesn't increase the db's CPU disproportionately to the
increased task throughput
On 5 Aug 2025, at 19:14, asquator <asqua...@proton.me.invalid> wrote:
I'm glad this issue finally got enough attention and we can move it forward.

I took a look at @Christos's patch and it makes sense overall; it's fine
for the specific problem they experienced with the max_active_tasks limit.
For those unfamiliar with the core problem, the bug has plenty of
variations where starvation happens due to different concurrency
limitations being nearly saturated, which creates the opportunity for the
scheduler to pull many tasks and schedule none of them.
To reproduce this bug, you need two conditions (a minimal Dag that tends to
reproduce it is sketched below):

1. Many tasks (>> max_tis) belonging to one "pool", where "pool" is some
concurrency limitation of Airflow. Note that originally the bug was
discovered in the context of task pools (see
https://github.com/apache/airflow/issues/45636).

2. The tasks are short enough (or the parallelism is large enough) for the
tasks from the nearly starved pool to free some slots in every scheduler
iteration.
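
As a rough illustration (a made-up example, not one of our production
Dags), a Dag along these lines tends to hit both conditions: it expands
into thousands of near-instant mapped tasks that all share one concurrency
limit, so a few slots free up on every scheduler iteration while most of
the fetched tasks cannot be queued.

    # Illustrative reproduction sketch - parameter values are arbitrary.
    import pendulum
    from airflow.decorators import dag, task

    @dag(
        start_date=pendulum.datetime(2025, 8, 1),
        schedule=None,
        max_active_tasks=4,   # low per-dag cap, the limit that gets nearly saturated
    )
    def huge_mapped_dag():
        @task
        def do_one(i: int) -> int:
            return i          # near-instant task, so slots free up every scheduler loop

        do_one.expand(i=list(range(10_000)))   # >> max_tis mapped tasks

    huge_mapped_dag()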
When we discovered a bug that starved our less prioritized pool even when
the most prioritized pool was almost full (thanks to @nevcohen), we wanted
to implement a patch similar to the one @Christos suggested above, but for
pools. But then we realized this issue can arise due to limits other than
task pools, including:

max_active_tasks
max_active_tis_per_dag
max_active_tis_per_dagrun

So we were able to predict the forthcoming bug reports for different kinds
of starvation, and we started working on the most general solution, which
is the topic of this discussion.
I also want to answer @potiuk regarding "why you need such large DAGs", but
I will be brief.

Airflow is an advanced tool for scheduling large data operations, and over
the years it has pushed to production many features that lead organizations
to write DAGs that contain thousands of tasks. The most prominent one is
dynamic task mapping. This feature made us realize we can implement a
batching work-queue pattern and create a task for every unit we have to
process, say a file in a specific folder, a path in the filesystem, a
pointer to some data stored in object storage, etc. We like to think in
terms of splitting the work into many tasks. Is it good? I don't know, but
Airflow has already stepped onto this path, and we have to make it
technologically possible (if we can).
Nevertheless, even if such DAGs are considered too big and splitting them
is a good idea (though that still does nothing for mapped tasks - we
sometimes create tens of thousands of them and expect them to be processed
in parallel), this issue does not only address the described case, but many
others, including prioritized pools, mapped tasks and max_active_runs
starvation on large backfills.

The only part that's missing now is measuring query time (static
benchmarks) and measuring overall scheduling metrics in production
workloads (dynamic benchmarks). We're working hard on this crucial part now.
We'd be happy to have any assistance from the community with regard to the
dynamic benchmarks, because every workload is different and it's pretty
difficult to simulate the general case in such a hard-to-reproduce issue.
We have to make sure that:

1. In a busy workload, the new logic boosts the scheduler's throughput.
2. In a light workload, the nested windowing doesn't significantly slow
down the computation.
On Monday, August 4th, 2025 at 9:00 PM, Christos Bisias <
christos...@gmail.com> wrote:
I created a draft PR for anyone interested to take a look at the code:
https://github.com/apache/airflow/pull/54103

I was able to demonstrate the issue in the unit test with far fewer tasks.
All we need is for the tasks brought back by the db query to belong to the
same dag_run or dag. This can happen when the first SCHEDULED tasks in line
to be examined are at least as many as the number of tis per query.
On Mon, Aug 4, 2025 at 8:37 PM Daniel Standish
daniel.stand...@astronomer.io.invalid wrote:
The configurability was my recommendation for
https://github.com/apache/airflow/pull/53492
Given the fact that this change is at the heart of Airflow, I think the
changes should be experimental, where users can switch between different
strategies/modes of the scheduler. If and when we have enough data to
support that a specific option is always better, we can make decisions
accordingly.
Yeah, I guess looking at #53492
https://github.com/apache/airflow/pull/53492 it does seem too risky to just
change the behavior in Airflow without releasing it first as experimental.
I doubt we can get sufficient real-world testing without doing that.

So if this is introduced, I think it should just be introduced as an
experimental optimization. And the intention would be that ultimately there
will only be one scheduling mode, and this is just a way to test this out
more widely. Not that we are intending to have two scheduling code paths on
a permanent basis.

WDYT
On Mon, Aug 4, 2025 at 12:50 AM Christos Bisias
christos...@gmail.com
wrote:
So my question to you is: is it impossible, or just demanding or difficult,
to split your Dags into smaller dags connected with asset-aware scheduling?

Jarek, I'm going to discuss this with the team and I will get you an answer
on that.
I've shared this again on the thread
https://github.com/xBis7/airflow/compare/69ab304ffa3d9b847b7dd0ee90ee6ef100223d66..scheduler-perf-patch

I haven't created a PR because this is just a POC and it's also setting a
limit per dag. I would like to get feedback on whether it's better to make
it per dag or per dag_run. I can create a draft PR if that's helpful and
makes it easier to add comments.
Let me try to explain the issue better. From a high-level overview, the
scheduler

1. moves tasks to SCHEDULED
2. runs a query to fetch SCHEDULED tasks from the db
3. examines the tasks
4. moves tasks to QUEUED

I'm focusing on step 2 and afterwards. The current code doesn't take
max_active_tasks_per_dag into account. When it runs the query, it fetches
up to max_tis, which is determined here
<https://github.com/apache/airflow/blob/main/airflow-core/src/airflow/jobs/scheduler_job_runner.py#L697-L705>.

For example,
- the query limit (max tis per query) is 32
- all 32 tasks in line belong to the same dag, dag1
- we are not concerned with how the scheduler picks them
- dag1 has max_active_tasks set to 5

The current code will
- get 32 tasks from dag1
- start examining them one by one
- once 5 are moved to QUEUED, it won't stop; it will keep examining the
other 27 but won't be able to queue them because it has reached the limit

In the next loop, although we have reached the maximum number of tasks for
dag1, the query will again fetch 32 tasks from dag1, examine them and try
to queue them.

The issue is that it fetches more tasks from the db than it can queue and
then examines them all. This all leads to unnecessary processing that
builds up, and the more load there is on the system, the more the
throughput drops for the scheduler and the workers.
What I'm proposing is to adjust the query in step 2 ("runs a query to fetch
SCHEDULED tasks from the db") so that it checks max_active_tasks_per_dag.
If a dag has already reached the maximum number of tasks in active states,
it will be skipped by the query.
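
For illustration only, a rough sketch of what that extra filter could look
like (simplified stand-in tables and column names, not the actual scheduler
query):

    # Hypothetical sketch: exclude TIs whose dag already has max_active_tasks
    # task instances in active states, so they never reach the Python examine loop.
    from sqlalchemy import Column, Integer, MetaData, String, Table, func, select

    metadata = MetaData()

    # Minimal stand-ins for the real task_instance / dag tables (illustrative only).
    task_instance = Table(
        "task_instance", metadata,
        Column("id", Integer, primary_key=True),
        Column("dag_id", String),
        Column("state", String),
    )
    dag = Table(
        "dag", metadata,
        Column("dag_id", String, primary_key=True),
        Column("max_active_tasks", Integer),
    )
    max_tis = 32  # the existing per-query limit

    # How many task instances each dag currently has in active states.
    active_counts = (
        select(task_instance.c.dag_id, func.count().label("active"))
        .where(task_instance.c.state.in_(["queued", "running"]))
        .group_by(task_instance.c.dag_id)
        .subquery()
    )

    # Fetch SCHEDULED task instances, but skip any dag already at its cap.
    query = (
        select(task_instance)
        .join(dag, dag.c.dag_id == task_instance.c.dag_id)
        .outerjoin(active_counts, active_counts.c.dag_id == task_instance.c.dag_id)
        .where(task_instance.c.state == "scheduled")
        .where(func.coalesce(active_counts.c.active, 0) < dag.c.max_active_tasks)
        .limit(max_tis)
    )

This only skips dags that are already at their cap; it doesn't by itself
cap how many candidates per dag are fetched.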
Don't we already stop examining at that point? I guess there's two things
you might be referring to. One is, which TIs come out of the db and into
python, and the other is, what we do in python. Just might be helpful to be
clear about the specific enhancements & changes you are making.
I think that if we adjust the query and fetch the right number of tasks,
then we won't have to make changes to what is done in Python.
On Mon, Aug 4, 2025 at 8:01 AM Daniel Standish
daniel.stand...@astronomer.io.invalid wrote:
@Christos Bisias
If you have a very large dag, and its tasks have been scheduled, then the
scheduler will keep examining the tasks for queueing, even if it has
reached the maximum number of active tasks for that particular dag. Once
that fails, then it will move on to examine the scheduled tasks of the next
dag or dag_run in line.
Can you make this a little more precise? There's some protection against
"starvation", i.e. dag runs recently considered should go to the back of
the line next time. Maybe you could clarify why / how that's not working /
not optimal / how to improve.
If there are available slots in the pool and the max parallelism hasn't
been reached yet, then the scheduler should stop processing a dag that has
already reached its max capacity of active tasks.

If a dag run (or dag) is already at max capacity, it doesn't really matter
if there are slots available or parallelism isn't reached -- shouldn't it
stop anyway?
In addition, the number of scheduled tasks picked for examining should be
capped at the number of max active tasks if that's lower than the query
limit. If the active limit is 10 and we already have 5 running, then we can
queue at most 5 tasks. In that case, we shouldn't examine more than that.

Don't we already stop examining at that point? I guess there's two things
you might be referring to. One is, which TIs come out of the db and into
python, and the other is, what we do in python. Just might be helpful to be
clear about the specific enhancements & changes you are making.
There is already a patch with the changes mentioned above. IMO, these
changes should be enabled/disabled with a config flag and not by default
because not everyone has the same needs as us. In our testing, adding a
limit on the tasks retrieved from the db requires more processing on the
query, which actually makes things worse when you have multiple small dags.

I would like to see a stronger case made for configurability. Why make it
configurable? If the performance is always better, it should not be made
configurable. Unless it's merely released as an opt-in experimental
feature. If it is worse in some profiles, let's be clear about that.
I did not read anything after `Here is a simple test case that makes the
benefits of the improvements noticeable` because it seemed rather
long-winded detail about a test case. A higher-level summary might be
helpful to your audience. Is there a PR with your optimization? You wrote
"there is a patch" but did not, unless I missed something, share it. I
would take a look if you share it though.

Thanks
On Sun, Aug 3, 2025 at 5:08 PM Daniel Standish <
daniel.stand...@astronomer.io> wrote:
Yes, UI is another part of this.

At some point the grid and graph views completely stop making sense for
that volume, and another type of view would be required, both for usability
and performance.
On Sun, Aug 3, 2025 at 11:04 AM Jens Scheffler
j_scheff...@gmx.de.invalid
wrote:
Hi,

We also have a current demand for a workflow that executes 10k to 100k
tasks. Together with @AutomationDev85 we are working on a local solution
because we also saw problems in the Scheduler that do not scale linearly.
And for sure they are not easy to fix. But from our investigation there are
also other problems to be considered, like the UI, which will potentially
have problems as well.

I am a bit sceptical that PR 49160 completely fixes the problems mentioned
here and made some comments. I do not want to stop enthusiasm to fix and
improve things, but the Scheduler is quite complex and changes need to be
made with care.
Actually I like the patch
https://github.com/xBis7/airflow/compare/69ab304ffa3d9b847b7dd0ee90ee6ef100223d66..scheduler-perf-patch
as it just adds some limit preventing the scheduler from focusing on only
one run. But the complexity is a bit big for a "patch" :-D

I'd also propose, for the moment, the way that Jarek described: split up
the Dag into multiple parts (divide and conquer).

Otherwise, if there is a concrete demand for such large Dags... we maybe
rather need a broader initiative if we want to ensure 10k, 100k, 1M? tasks
are supported per Dag. Because depending on the magnitude we strive for,
different approaches are needed.
Jens
On 03.08.25 16:33, Daniel Standish wrote:
Definitely an area of the scheduler with some opportunity for performance
improvement.

I would just mention that you should also attempt to include some
performance testing at load / scale because window functions are going to
be more expensive.

What happens when you have many dags, many historical dag runs & TIs, lots
of stuff running concurrently? You need to be mindful of the overall impact
of such a change, and not look only at the time spent on scheduling this
particular dag.

I did not look at the PRs yet, maybe you've covered this, but it's
important.
On Sun, Aug 3, 2025 at 5:57 AM Christos Bisias <christos...@gmail.com> wrote:
I'm going to review the PR code and test it more thoroughly before leaving
a comment.

This is my code for reference
https://github.com/xBis7/airflow/compare/69ab304ffa3d9b847b7dd0ee90ee6ef100223d66..scheduler-perf-patch
The current version is setting a limit per dag, across all dag_runs.

Please correct me if I'm wrong, but the PR looks like it's changing the way
that tasks are prioritized to avoid starvation. If that's the case, I'm not
sure that this is the same issue. My proposal is that, if we have reached
the max resources assigned to a dag, then stop processing its tasks and
move on to the next one. I'm not changing how or which tasks are picked.
On Sun, Aug 3, 2025 at 3:23 PM asquator <asqua...@proton.me.invalid> wrote:
Thank you for the feedback.

Please describe the case with the failing limit checks in the PR (the DAG's
parameters, its tasks' parameters and what fails to be checked) and we'll
try to fix it ASAP before you can test it again. Let's continue the
PR-related discussion in the PR itself.
On Sunday, August 3rd, 2025 at 2:21 PM, Christos Bisias <
christos...@gmail.com> wrote:
Thank you for bringing this PR to my attention.

I haven't studied the code but I ran a quick test on the branch, and it
completely ignores the limit on scheduled tasks per dag or dag_run. It
grabbed 70 tasks from the first dag and then moved all 70 to QUEUED without
any further checks.

This is how I tested it
https://github.com/Asquator/airflow/compare/feature/pessimistic-task-fetching-with-window-function...xBis7:airflow:scheduler-window-function-testing?expand=1
On Sun, Aug 3, 2025 at 1:44 PM asquator <asqua...@proton.me.invalid> wrote:
Hello,

This is a known issue stemming from the optimistic scheduling strategy used
in Airflow. We do address this in the above-mentioned PR. I want to note
that there are many cases where this problem may appear - it was originally
detected with pools, but we are striving to fix it in all cases, such as
the one described here with max_active_tis_per_dag, by switching to
pessimistic scheduling with SQL window functions. While the current
strategy simply pulls the max_tis tasks and drops the ones that do not meet
the constraints, the new strategy will pull only the tasks that are
actually ready to be scheduled and that comply with all concurrency limits.
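
As a very rough illustration of the window-function idea (simplified
stand-in tables, not the code from the PR itself):

    # Hypothetical sketch: rank SCHEDULED task instances inside each dag and
    # fetch only as many per dag as that dag's remaining capacity allows.
    from sqlalchemy import Column, Integer, MetaData, String, Table, func, select

    metadata = MetaData()

    # Minimal stand-ins for the real task_instance / dag tables (illustrative only).
    task_instance = Table(
        "task_instance", metadata,
        Column("id", Integer, primary_key=True),
        Column("dag_id", String),
        Column("state", String),
        Column("priority_weight", Integer),
    )
    dag = Table(
        "dag", metadata,
        Column("dag_id", String, primary_key=True),
        Column("max_active_tasks", Integer),
    )
    max_tis = 32  # per-query limit (illustrative)

    # Currently active task instances per dag.
    active = (
        select(task_instance.c.dag_id, func.count().label("n_active"))
        .where(task_instance.c.state.in_(["queued", "running"]))
        .group_by(task_instance.c.dag_id)
        .subquery()
    )

    # SCHEDULED task instances ranked within their dag by priority.
    ranked = (
        select(
            task_instance,
            func.row_number()
            .over(
                partition_by=task_instance.c.dag_id,
                order_by=task_instance.c.priority_weight.desc(),
            )
            .label("rn"),
        )
        .where(task_instance.c.state == "scheduled")
        .subquery()
    )

    # Pull only as many task instances per dag as that dag can still run.
    query = (
        select(ranked)
        .join(dag, dag.c.dag_id == ranked.c.dag_id)
        .outerjoin(active, active.c.dag_id == ranked.c.dag_id)
        .where(ranked.c.rn <= dag.c.max_active_tasks - func.coalesce(active.c.n_active, 0))
        .limit(max_tis)
    )

The real strategy also has to account for pools, parallelism and the
per-dag/per-dagrun limits, which is what makes the full query considerably
more involved.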
It would be very helpful for pushing this change to production if you could
assist us in alpha-testing it.

See also:
https://github.com/apache/airflow/discussions/49160
Sent with Proton Mail secure email.
On Sunday, August 3rd, 2025 at 12:59 PM, Elad Kalif
elad...@apache.org
wrote:
I think most of your issues will be addressed by
https://github.com/apache/airflow/pull/53492

The PR code can be tested with Breeze, so you can set it up and see if it
solves the problem. This will also help with confirming it's the right fix.
On Sun, Aug 3, 2025 at 10:46 AM Christos Bisias
christos...@gmail.com
wrote:
Hello,

The scheduler is very efficient when running a large number of dags with up
to 1000 tasks each. But in our case, we have dags with as many as 10,000
tasks. And in that scenario the scheduler and worker throughput drops
significantly. Even if you have 1 such large dag with scheduled tasks, the
performance hit becomes noticeable.
We did some digging and we found that the issue comes from the scheduler's
_executable_task_instances_to_queued
<https://github.com/apache/airflow/blob/main/airflow-core/src/airflow/jobs/scheduler_job_runner.py#L293C9-L647>
method. In particular with the db query here
<https://github.com/apache/airflow/blob/main/airflow-core/src/airflow/jobs/scheduler_job_runner.py#L364-L375>
and examining the results here
<https://github.com/apache/airflow/blob/main/airflow-core/src/airflow/jobs/scheduler_job_runner.py#L425>.
If you have a very large dag, and its tasks have been scheduled, then the
scheduler will keep examining the tasks for queueing, even if it has
reached the maximum number of active tasks for that particular dag. Once
that fails, then it will move on to examine the scheduled tasks of the next
dag or dag_run in line.
This is inefficient and causes the throughput of the scheduler and the
workers to drop significantly. If there are available slots in the pool and
the max parallelism hasn't been reached yet, then the scheduler should stop
processing a dag that has already reached its max capacity of active tasks.
In addition, the number of scheduled tasks picked for examining should be
capped at the number of max active tasks if that's lower than the query
limit. If the active limit is 10 and we already have 5 running, then we can
queue at most 5 tasks. In that case, we shouldn't examine more than that.
There is already a patch with the changes mentioned above. IMO, these
changes should be enabled/disabled with a config flag and not by default
because not everyone has the same needs as us. In our testing, adding a
limit on the tasks retrieved from the db requires more processing on the
query, which actually makes things worse when you have multiple small dags.
Here is a simple test case that makes the benefits of the improvements
noticeable:

- we have 3 dags with thousands of tasks each
- for simplicity let's have 1 dag_run per dag
- triggering them takes some time and due to that, the FIFO order of the
tasks is very clear
  - e.g. 1000 tasks from dag1 were scheduled first and then 200 tasks from
dag2 etc.
- the executor has parallelism=100 and slots_available=100, which means
that it can run up to 100 tasks concurrently
- max_active_tasks_per_dag is 4, which means that we can have up to 4 tasks
running per dag
  - for 3 dags, it means that we can run up to 12 tasks at the same time
(4 tasks from each dag)
- max tis per query is set to 32, meaning that we can examine up to 32
scheduled tasks if there are available pool slots
If we were to run the scheduler loop repeatedly until it queues 12 tasks
and test the part that examines the scheduled tasks and queues them, then

- with the query limit
  - 1 iteration, total time 0.05
  - During the iteration
    - we have parallelism 100, available slots 100 and query limit 32,
which means that it will examine up to 32 scheduled tasks
    - it can queue up to 100 tasks
    - examines 12 tasks (instead of 32)
      - 4 tasks from dag1, reached max for the dag
      - 4 tasks from dag2, reached max for the dag
      - and 4 tasks from dag3, reached max for the dag
    - queues 4 from dag1, reaches max for the dag and moves on
    - queues 4 from dag2, reaches max for the dag and moves on
    - queues 4 from dag3, reaches max for the dag and moves on
    - stops queueing because we have reached the maximum per dag, although
there are slots for more tasks
    - iteration finishes
- without
  - 3 iterations, total time 0.29
  - During iteration 1
    - examines 32 tasks, all from dag1 (due to FIFO)
    - queues 4 from dag1 and tries to queue the other 28 but fails
  - During iteration 2
    - examines the next 32 tasks from dag1
    - it can't queue any of them because it has reached the max for dag1,
since the previous 4 are still running
    - examines 32 tasks from dag2
    - queues 4 from dag2 and tries to queue the other 28 but fails
  - During iteration 3
    - examines the next 32 tasks from dag1, the same tasks that were
examined in iteration 2
    - it can't queue any of them because it has reached the max for dag1
and the first 4 are still running
    - examines 32 tasks from dag2, can't queue any of them because it has
reached max for dag2 as well
    - examines 32 tasks from dag3
    - queues 4 from dag3 and tries to queue the other 28 but fails
I used very low values for all the configs so that I can make the point
clear and easy to understand. If we increase them, then this patch also
makes the task selection more fair and the resource distribution more even.
I would appreciate it if anyone familiar with the scheduler's code can
confirm this and also provide any feedback.
Additionally, I have one question regarding the query limit. Should it be
per dag_run or per dag? I've noticed that max_active_tasks_per_dag has been
changed to provide a value per dag_run, but the docs haven't been updated.
Thank you!
Regards,
Christos Bisias
---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscr...@airflow.apache.org
For additional commands, e-mail: dev-h...@airflow.apache.org