@Christos Bisias

> If you have a very large dag, and its tasks have been scheduled, then the
> scheduler will keep examining the tasks for queueing, even if it has
> reached the maximum number of active tasks for that particular dag. Once
> that fails, then it will move on to examine the scheduled tasks of the next
> dag or dag_run in line.


Can you make this a little more precise? There is already some protection
against "starvation", i.e. dag runs that were recently considered should go
to the back of the line next time.

Could you clarify why or how that is not working or not optimal, and how to
improve it?

> If there are available slots in the pool and
> the max parallelism hasn't been reached yet, then the scheduler should stop
> processing a dag that has already reached its max capacity of active tasks.


If a dag run (or dag) is already at max capacity, it doesn't really matter
whether there are slots available or whether parallelism has been reached;
shouldn't it stop anyway?
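
To make the question concrete, here is a minimal sketch of the Python-side
behaviour being discussed. It is not the scheduler's actual code:
`pick_queueable`, its arguments and the (dag_id, task_id) tuples are
hypothetical names made up for illustration. The only point is that the
per-dag check does not depend on how many pool slots are free.

```python
from collections import Counter


def pick_queueable(scheduled, running_per_dag, max_active_tasks, open_slots):
    """Return the (dag_id, task_id) pairs that one pass would queue, in FIFO order."""
    active = Counter(running_per_dag)
    queued = []
    for dag_id, task_id in scheduled:
        if len(queued) >= open_slots:
            break  # global limit: no pool slots / parallelism left
        if active[dag_id] >= max_active_tasks:
            continue  # dag is full: skip it no matter how many slots are free
        queued.append((dag_id, task_id))
        active[dag_id] += 1
    return queued


# dag1 already has 3 running tasks and a limit of 4; dag2 is idle.
scheduled = [("dag1", f"t{i}") for i in range(10)] + [("dag2", "t0")]
result = pick_queueable(scheduled, {"dag1": 3}, max_active_tasks=4, open_slots=100)
print(result)
```

With 100 open slots, only one more task from dag1 is queued before the loop
moves on to dag2, so the free slots never change the per-dag decision.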

> In addition, the number of scheduled tasks picked for examining, should be
> capped at the number of max active tasks if that's lower than the query
> limit. If the active limit is 10 and we already have 5 running, then we can
> queue at most 5 tasks. In that case, we shouldn't examine more than that.


Don't we already stop examining at that point? I guess there are two things
you might be referring to. One is which TIs come out of the db and into
Python, and the other is what we do with them in Python. It would be helpful
to be clear about the specific enhancements and changes you are making.
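
To illustrate the first of the two (what comes out of the db), here is a rough
sketch using SQLite from the standard library. It is not Airflow's real query
or schema: the `task_instance` table, its columns and the two constants are
assumptions made up for the example, and it needs SQLite 3.25 or newer for
window functions. It reuses the numbers from the quoted paragraph, where an
active limit of 10 with 5 already running leaves room for 5.

```python
import sqlite3

MAX_ACTIVE_TASKS = 10  # hypothetical per-dag active limit
QUERY_LIMIT = 32       # hypothetical cap on rows fetched per query

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE task_instance (id INTEGER PRIMARY KEY, dag_id TEXT, state TEXT)"
)
rows = (
    [("big_dag", "running")] * 5
    + [("big_dag", "scheduled")] * 20
    + [("small_dag", "scheduled")] * 3
)
conn.executemany("INSERT INTO task_instance (dag_id, state) VALUES (?, ?)", rows)

# Rank scheduled TIs inside each dag and keep only as many as the dag has room for.
query = """
WITH running AS (
    SELECT dag_id, COUNT(*) AS n
    FROM task_instance
    WHERE state = 'running'
    GROUP BY dag_id
),
ranked AS (
    SELECT ti.id, ti.dag_id,
           ROW_NUMBER() OVER (PARTITION BY ti.dag_id ORDER BY ti.id) AS rn,
           COALESCE(r.n, 0) AS running_n
    FROM task_instance AS ti
    LEFT JOIN running AS r ON r.dag_id = ti.dag_id
    WHERE ti.state = 'scheduled'
)
SELECT id, dag_id
FROM ranked
WHERE rn <= :max_active - running_n
ORDER BY id
LIMIT :query_limit
"""
fetched = conn.execute(
    query, {"max_active": MAX_ACTIVE_TASKS, "query_limit": QUERY_LIMIT}
).fetchall()

per_dag = {}
for _id, dag_id in fetched:
    per_dag[dag_id] = per_dag.get(dag_id, 0) + 1
print(per_dag)
```

Here only 5 of the 20 scheduled TIs of `big_dag` ever reach Python, at the
price of a ranking step in the query. That extra work in the db is the cost
that would need to be measured under load.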

> There is already a patch with the changes mentioned above. IMO, these
> changes should be enabled/disabled with a config flag and not by default
> because not everyone has the same needs as us. In our testing, adding a
> limit on the tasks retrieved from the db requires more processing on the
> query which actually makes things worse when you have multiple small dags.


I would like to see a stronger case made for configurability. Why make it
configurable? If the performance is always better, it should not be made
configurable, unless it's merely released as an opt-in experimental
feature. If it is worse in some profiles, let's be clear about that.

I did not read anything after `Here is a simple test case that makes the
benefits of the improvements noticeable` because it seemed to be a rather
long-winded description of a test case. A higher-level summary might be more
helpful to your audience. Is there a PR with your optimization? You wrote
"there is a patch" but did not, unless I missed something, share it. I would
take a look if you share it.
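
As an example of what such a summary could look like, the test case can be
condensed into a toy simulation. This is only a sketch under stated
assumptions, not the scheduler's code and not the patch: `simulate` and its
parameters are hypothetical, queued tasks are assumed to keep running, dags
that are already at their limit are assumed to be left out of later fetches,
and the 200 tasks for dag3 are a guess since the message only gives counts for
dag1 and dag2.

```python
from collections import Counter


def simulate(scheduled, max_active_tasks, query_limit, per_dag_cap):
    """Toy model: return (passes, examined, queued) once nothing more can be queued.

    scheduled:   FIFO list of dag ids, one entry per scheduled task.
    per_dag_cap: if True, a fetch returns at most the remaining capacity per dag.
    """
    pending = list(scheduled)
    active = Counter()
    passes = examined = 0
    while True:
        batch = []         # positions in `pending` fetched by this pass
        taken = Counter()  # tasks fetched per dag in this pass
        for pos, dag_id in enumerate(pending):
            if len(batch) >= query_limit:
                break
            room = max_active_tasks - active[dag_id]
            if room <= 0:
                continue  # assumption: dags at their limit are not fetched again
            if per_dag_cap and taken[dag_id] >= room:
                continue  # proposed cap: do not fetch more than can be queued
            batch.append(pos)
            taken[dag_id] += 1
        if not batch:
            break
        passes += 1
        examined += len(batch)
        queued_positions = []
        for pos in batch:
            dag_id = pending[pos]
            if active[dag_id] < max_active_tasks:
                active[dag_id] += 1
                queued_positions.append(pos)
        # Remove queued tasks from the back so earlier positions stay valid.
        for pos in reversed(queued_positions):
            pending.pop(pos)
    return passes, examined, sum(active.values())


fifo = ["dag1"] * 1000 + ["dag2"] * 200 + ["dag3"] * 200
with_cap = simulate(fifo, max_active_tasks=4, query_limit=32, per_dag_cap=True)
without_cap = simulate(fifo, max_active_tasks=4, query_limit=32, per_dag_cap=False)
print(with_cap, without_cap)
```

In this model the capped fetch queues all 12 tasks in 1 pass after examining
12 TIs, while the uncapped fetch needs 3 passes and examines 96. The pass
counts line up with the 1 and 3 iterations in the message; the examined counts
are specific to the toy model and say nothing about query cost.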

Thanks







On Sun, Aug 3, 2025 at 5:08 PM Daniel Standish <
daniel.stand...@astronomer.io> wrote:

> Yes, the UI is another part of this.
>
> At some point the grid and graph views completely stop making sense for
> that volume, and another type of view would be required both for usability
> and performance.
>
>
>
> On Sun, Aug 3, 2025 at 11:04 AM Jens Scheffler <j_scheff...@gmx.de.invalid>
> wrote:
>
>> Hi,
>>
>> We also currently have a demand for a workflow that executes 10k to 100k
>> tasks. Together with @AutomationDev85 we are working on a local solution
>> because we also saw problems in the Scheduler that do not scale linearly
>> and for sure are not easy to fix. From our investigation there are also
>> other problems to consider, for example the UI will potentially have
>> problems as well.
>>
>> I am a bit sceptical that PR 49160 completely fixes the problems mentioned
>> here and made some comments. I do not want to dampen the enthusiasm to fix
>> and improve things, but the Scheduler is quite complex and changes need to
>> be made with care.
>>
>> Actually I like the patch
>>
>> https://github.com/xBis7/airflow/compare/69ab304ffa3d9b847b7dd0ee90ee6ef100223d66..scheduler-perf-patch
>> as it just adds a limit that prevents the scheduler from focusing on only
>> one run. But the complexity is a bit big for a "patch" :-D
>>
>> For the moment I'd also propose the approach that Jarek described: split
>> up the Dag into multiple parts (divide and conquer).
>>
>> Otherwise, if there is a concrete demand for such large Dags, we maybe
>> rather need a broader initiative if we want to ensure 10k, 100k, 1M?
>> tasks are supported per Dag, because depending on the magnitude we
>> strive for, different approaches are needed.
>>
>> Jens
>>
>> On 03.08.25 16:33, Daniel Standish wrote:
>> > Definitely an area of the scheduler with some opportunity for
>> > performance improvement.
>> >
>> > I would just mention that you should also attempt to include some
>> > performance testing at load / scale, because window functions are going
>> > to be more expensive.
>> >
>> > What happens when you have many dags, many historical dag runs & TIs,
>> > and lots of stuff running concurrently? You need to be mindful of the
>> > overall impact of such a change, and not look only at the time spent on
>> > scheduling this particular dag.
>> >
>> > I did not look at the PRs yet, maybe you've covered this, but it's
>> > important.
>> >
>> >
>> > On Sun, Aug 3, 2025 at 5:57 AM Christos Bisias <christos...@gmail.com>
>> > wrote:
>> >
>> >> I'm going to review the PR code and test it more thoroughly before
>> >> leaving a comment.
>> >>
>> >> This is my code for reference
>> >>
>> >> https://github.com/xBis7/airflow/compare/69ab304ffa3d9b847b7dd0ee90ee6ef100223d66..scheduler-perf-patch
>> >>
>> >> The current version is setting a limit per dag, across all dag_runs.
>> >>
>> >> Please correct me if I'm wrong, but the PR looks like it's changing the
>> >> way that tasks are prioritized to avoid starvation. If that's the case,
>> >> I'm not sure that this is the same issue. My proposal is that, if we
>> >> have reached the max resources assigned to a dag, then stop processing
>> >> its tasks and move on to the next one. I'm not changing how or which
>> >> tasks are picked.
>> >>
>> >> On Sun, Aug 3, 2025 at 3:23 PM asquator <asqua...@proton.me.invalid>
>> >> wrote:
>> >>
>> >>> Thank you for the feedback.
>> >>> Please describe the case with failing limit checks in the PR (the
>> >>> DAG's parameters, its tasks' parameters, and what fails to be
>> >>> checked) and we'll try to fix it ASAP so that you can test it again.
>> >>> Let's continue the PR-related discussion in the PR itself.
>> >>>
>> >>> On Sunday, August 3rd, 2025 at 2:21 PM, Christos Bisias <
>> >>> christos...@gmail.com> wrote:
>> >>>
>> >>>> Thank you for bringing this PR to my attention.
>> >>>>
>> >>>> I haven't studied the code but I ran a quick test on the branch and
>> >>>> this completely ignores the limit on scheduled tasks per dag or
>> >>>> dag_run. It grabbed 70 tasks from the first dag and then moved all 70
>> >>>> to QUEUED without any further checks.
>> >>>>
>> >>>> This is how I tested it
>> >>>>
>> >>>> https://github.com/Asquator/airflow/compare/feature/pessimistic-task-fetching-with-window-function...xBis7:airflow:scheduler-window-function-testing?expand=1
>> >>>>
>> >>>> On Sun, Aug 3, 2025 at 1:44 PM asquator <asqua...@proton.me.invalid>
>> >>>> wrote:
>> >>>>> Hello,
>> >>>>>
>> >>>>> This is a known issue stemming from the optimistic scheduling
>> >>>>> strategy used in Airflow. We do address this in the above-mentioned
>> >>>>> PR. I want to note that there are many cases where this problem may
>> >>>>> appear. It was originally detected with pools, but we are striving
>> >>>>> to fix it in all cases, such as the one described here with
>> >>>>> max_active_tis_per_dag, by switching to pessimistic scheduling with
>> >>>>> SQL window functions. While the current strategy simply pulls the
>> >>>>> max_tis tasks and drops the ones that do not meet the constraints,
>> >>>>> the new strategy will pull only the tasks that are actually ready to
>> >>>>> be scheduled and that comply with all concurrency limits.
>> >>>>> It would be very helpful for pushing this change to production if
>> >>>>> you could assist us in alpha-testing it.
>> >>>>>
>> >>>>> See also:
>> >>>>> https://github.com/apache/airflow/discussions/49160
>> >>>>>
>> >>>>> On Sunday, August 3rd, 2025 at 12:59 PM, Elad Kalif <elad...@apache.org>
>> >>>>> wrote:
>> >>>>>
>> >>>>>> I think most of your issues will be addressed by
>> >>>>>> https://github.com/apache/airflow/pull/53492
>> >>>>>> The PR code can be tested with Breeze, so you can set it up and see
>> >>>>>> if it solves the problem. This will also help with confirming it's
>> >>>>>> the right fix.
>> >>>>>>
>> >>>>>> On Sun, Aug 3, 2025 at 10:46 AM Christos Bisias <christos...@gmail.com>
>> >>>>>> wrote:
>> >>>>>>
>> >>>>>>> Hello,
>> >>>>>>>
>> >>>>>>> The scheduler is very efficient when running a large number of
>> >>>>>>> dags with up to 1000 tasks each. But in our case, we have dags
>> >>>>>>> with as many as 10,000 tasks, and in that scenario the scheduler
>> >>>>>>> and worker throughput drops significantly. Even if you have 1 such
>> >>>>>>> large dag with scheduled tasks, the performance hit becomes
>> >>>>>>> noticeable.
>> >>>>>>>
>> >>>>>>> We did some digging and we found that the issue comes from the
>> >>>>>>> scheduler's _executable_task_instances_to_queued
>> >>>>>>> <https://github.com/apache/airflow/blob/main/airflow-core/src/airflow/jobs/scheduler_job_runner.py#L293C9-L647>
>> >>>>>>> method. In particular with the db query here
>> >>>>>>> <https://github.com/apache/airflow/blob/main/airflow-core/src/airflow/jobs/scheduler_job_runner.py#L364-L375>
>> >>>>>>> and examining the results here
>> >>>>>>> <https://github.com/apache/airflow/blob/main/airflow-core/src/airflow/jobs/scheduler_job_runner.py#L425>.
>> >>>>>>>
>> >>>>>>> If you have a very large dag, and its tasks have been scheduled,
>> >>>>>>> then the scheduler will keep examining the tasks for queueing,
>> >>>>>>> even if it has reached the maximum number of active tasks for that
>> >>>>>>> particular dag. Once that fails, then it will move on to examine
>> >>>>>>> the scheduled tasks of the next dag or dag_run in line.
>> >>>>>>>
>> >>>>>>> This is inefficient and causes the throughput of the scheduler and
>> >>>>>>> the workers to drop significantly. If there are available slots in
>> >>>>>>> the pool and the max parallelism hasn't been reached yet, then the
>> >>>>>>> scheduler should stop processing a dag that has already reached
>> >>>>>>> its max capacity of active tasks.
>> >>>>>>>
>> >>>>>>> In addition, the number of scheduled tasks picked for examining,
>> >>>>>>> should be capped at the number of max active tasks if that's lower
>> >>>>>>> than the query limit. If the active limit is 10 and we already
>> >>>>>>> have 5 running, then we can queue at most 5 tasks. In that case,
>> >>>>>>> we shouldn't examine more than that.
>> >>>>>>>
>> >>>>>>> There is already a patch with the changes mentioned above. IMO,
>> >>>>>>> these changes should be enabled/disabled with a config flag and
>> >>>>>>> not by default because not everyone has the same needs as us. In
>> >>>>>>> our testing, adding a limit on the tasks retrieved from the db
>> >>>>>>> requires more processing on the query which actually makes things
>> >>>>>>> worse when you have multiple small dags.
>> >>>>>>>
>> >>>>>>> Here is a simple test case that makes the benefits of the
>> >>>>>>> improvements noticeable
>> >>>>>>>
>> >>>>>>> - we have 3 dags with thousands of tasks each
>> >>>>>>> - for simplicity let's have 1 dag_run per dag
>> >>>>>>> - triggering them takes some time and due to that, the FIFO order
>> >>>>>>>   of the tasks is very clear
>> >>>>>>>   - e.g. 1000 tasks from dag1 were scheduled first and then 200
>> >>>>>>>     tasks from dag2 etc.
>> >>>>>>> - the executor has parallelism=100 and slots_available=100 which
>> >>>>>>>   means that it can run up to 100 tasks concurrently
>> >>>>>>> - max_active_tasks_per_dag is 4 which means that we can have up to
>> >>>>>>>   4 tasks running per dag.
>> >>>>>>>   - For 3 dags, it means that we can run up to 12 tasks at the
>> >>>>>>>     same time (4 tasks from each dag)
>> >>>>>>> - max tis per query are set to 32, meaning that we can examine up
>> >>>>>>>   to 32 scheduled tasks if there are available pool slots
>> >>>>>>>
>> >>>>>>> If we were to run the scheduler loop repeatedly until it queues 12
>> >>>>>>> tasks and test the part that examines the scheduled tasks and
>> >>>>>>> queues them, then
>> >>>>>>>
>> >>>>>>> - with the query limit
>> >>>>>>>   - 1 iteration, total time 0.05
>> >>>>>>>   - During the iteration
>> >>>>>>>     - we have parallelism 100, available slots 100 and query limit
>> >>>>>>>       32 which means that it will examine up to 32 scheduled tasks
>> >>>>>>>     - it can queue up to 100 tasks
>> >>>>>>>     - examines 12 tasks (instead of 32)
>> >>>>>>>       - 4 tasks from dag1, reached max for the dag
>> >>>>>>>       - 4 tasks from dag2, reached max for the dag
>> >>>>>>>       - and 4 tasks from dag3, reached max for the dag
>> >>>>>>>     - queues 4 from dag1, reaches max for the dag and moves on
>> >>>>>>>     - queues 4 from dag2, reaches max for the dag and moves on
>> >>>>>>>     - queues 4 from dag3, reaches max for the dag and moves on
>> >>>>>>>     - stops queueing because we have reached the maximum per dag,
>> >>>>>>>       although there are slots for more tasks
>> >>>>>>>     - iteration finishes
>> >>>>>>> - without
>> >>>>>>>   - 3 iterations, total time 0.29
>> >>>>>>>   - During iteration 1
>> >>>>>>>     - Examines 32 tasks, all from dag1 (due to FIFO)
>> >>>>>>>     - queues 4 from dag1 and tries to queue the other 28 but fails
>> >>>>>>>   - During iteration 2
>> >>>>>>>     - examines the next 32 tasks from dag1
>> >>>>>>>     - it can't queue any of them because it has reached the max
>> >>>>>>>       for dag1, since the previous 4 are still running
>> >>>>>>>     - examines 32 tasks from dag2
>> >>>>>>>     - queues 4 from dag2 and tries to queue the other 28 but fails
>> >>>>>>>   - During iteration 3
>> >>>>>>>     - examines the next 32 tasks from dag1, same tasks that were
>> >>>>>>>       examined in iteration 2
>> >>>>>>>     - it can't queue any of them because it has reached the max
>> >>>>>>>       for dag1 and the first 4 are still running
>> >>>>>>>     - examines 32 tasks from dag2, can't queue any of them because
>> >>>>>>>       it has reached max for dag2 as well
>> >>>>>>>     - examines 32 tasks from dag3
>> >>>>>>>     - queues 4 from dag3 and tries to queue the other 28 but fails
>> >>>>>>>
>> >>>>>>> I used very low values for all the configs so that I can make the
>> >>>>>>> point clear and easy to understand. If we increase them, then this
>> >>>>>>> patch also makes the task selection more fair and the resource
>> >>>>>>> distribution more even.
>> >>>>>>>
>> >>>>>>> I would appreciate it if anyone familiar with the scheduler's code
>> >>>>>>> can confirm this and also provide any feedback.
>> >>>>>>>
>> >>>>>>> Additionally, I have one question regarding the query limit.
>> >>>>>>> Should it be per dag_run or per dag? I've noticed that
>> >>>>>>> max_active_tasks_per_dag has been changed to provide a value per
>> >>>>>>> dag_run but the docs haven't been updated.
>> >>>>>>>
>> >>>>>>> Thank you!
>> >>>>>>>
>> >>>>>>> Regards,
>> >>>>>>> Christos Bisias
>> >>>>>
>> ---------------------------------------------------------------------
>> >>>>> To unsubscribe, e-mail:dev-unsubscr...@airflow.apache.org
>> >>>>> For additional commands, e-mail:dev-h...@airflow.apache.org
>>
>
