Yeah, dynamic task mapping is a good case where you could easily end up with 
thousands of tasks in a dag. 

As I like to say, Airflow is a broad church, and if we can reasonably support 
diverse workloads without impacting others (either the workloads or our 
ability to support and maintain etc.) then I’m all for it. 

In addition to your two items I’d like to add

3. That it doesn’t increase the db’s CPU usage disproportionately to the 
increased task throughput 

> On 5 Aug 2025, at 19:14, asquator <asqua...@proton.me.invalid> wrote:
> 
> I'm glad this issue finally got enough attention and we can move it forward.
> I took a look at @Christos's patch and it makes sense overall, it's fine for 
> the specific problem they experienced with max_active_tasks limit.
> For those unfamiliar with the core problem, the bug has plenty of 
> variations where starvation happens due to different concurrency limitations 
> being nearly saturated, which creates the opportunity for the scheduler to 
> pull many tasks and schedule none of them.
> To reproduce this bug, you need two conditions:
> 1. Many tasks (>> max_tis) belonging to one "pool", where "pool" is some 
> concurrency limitation of Airflow. Note that originally the bug was 
> discovered in context of task pools (see 
> https://github.com/apache/airflow/issues/45636).
> 2. The tasks are short enough (or the parallelism is large enough) for the 
> tasks from the nearly starved pool to free some slots in every scheduler's 
> iteration.
> 
> When we discovered a bug that starved our less prioritized pool, even when 
> the most prioritized pool was almost full (thanks to @nevcohen), we wanted to 
> implement a patch similar to the one @Christos suggested above, but for pools. 
> But then we realized this issue can arise due to limits other than task pools, 
> including:
> max_active_tasks
> max_active_tis_per_dag
> max_active_tis_per_dagrun
> 
> So we were able to predict the forthcoming bug reports for different kinds of 
> starvation, and we started working on the most general solution, which is the 
> topic of this discussion.
> 
> I want to also answer @potiuk regarding "why you need such large DAGs", but I 
> will be brief.
> Airflow is an advanced tool for scheduling large data operations, and over 
> the years it has pushed to production many features that lead to 
> organizations writing DAGs that contain thousands of tasks. Most prominent 
> one is dynamic task mapping. This feature made us realize we can implement a 
> batching work queue pattern and create a task for every unit we have to 
> process, say it's a file in a specific folder, a path in the filesystem, a 
> pointer to some data stored in object storage, etc. We like to think in terms 
> of splitting the work into many tasks. Is it good? I don't know, but Airflow 
> has already stepped onto this path, and we have to make it technologically 
> possible (if we can).
> 
> Nevertheless, even if such DAGs are considered too big and splitting them is 
> a good idea (though you still have nothing to do with mapped tasks - we 
> create tens of thousands of them sometimes and expect them to be processed in 
> parallel), this issue does not only address the described case, but many 
> others, including prioritized pools, mapped tasks or max_active_runs 
> starvation on large backfills.
> 
> The only part that's missing now is measuring query time (static benchmarks) 
> and measuring overall scheduling metrics in production workloads (dynamic 
> benchmarks).
> We're working hard on this crucial part now.
> 
> We'd be happy to have any assistance from the community with regard to the 
> dynamic benchmarks, because every workload is different and it's pretty 
> difficult to simulate the general case in such a hard-to-reproduce issue. We 
> have to make sure that:
> 
> 1. In a busy workload, the new logic boosts the scheduler's throughput.
> 2. In a light workload, the nested windowing doesn't significantly slow down 
> the computation.
> 
> 
>> On Monday, August 4th, 2025 at 9:00 PM, Christos Bisias 
>> <christos...@gmail.com> wrote:
>> 
>> I created a draft PR for anyone interested to take a look at the code
>> 
>> https://github.com/apache/airflow/pull/54103
>> 
>> I was able to demonstrate the issue in the unit test with far fewer tasks.
>> All we need is the tasks brought back by the db query to belong to the same
>> dag_run or dag. This can happen when the first SCHEDULED tasks in line to
>> be examined are at least as many as the number of the tis per query.
>> 
>> On Mon, Aug 4, 2025 at 8:37 PM Daniel Standish
>> daniel.stand...@astronomer.io.invalid wrote:
>> 
>>>> The configurability was my recommendation for
>>>> https://github.com/apache/airflow/pull/53492
>>>> Given the fact that this change is at the heart of Airflow I think the
>>>> changes should be experimental where users can switch between different
>>>> strategies/modes of the scheduler.
>>>> If and when we have enough data to support that specific option is always
>>>> better we can make decisions accordingly.
>>> 
>>> Yeah I guess looking at #53492
>>> https://github.com/apache/airflow/pull/53492 it does seem too risky to
>>> just change the behavior in airflow without releasing it first as
>>> experimental.
>>> 
>>> I doubt we can get sufficient real world testing without doing that.
>>> 
>>> So if this is introduced, I think it should just be introduced as an
>>> experimental optimization. And the intention would be that ultimately
>>> there will only be one scheduling mode, and this is just a way to test this
>>> out more widely. Not that we are intending to have two scheduling code
>>> paths on a permanent basis.
>>> 
>>> WDYT
>>> 
>>> On Mon, Aug 4, 2025 at 12:50 AM Christos Bisias christos...@gmail.com
>>> wrote:
>>> 
>>>>> So my question to you is: is it impossible, or just demanding or
>>>>> difficult
>>>>> to split your Dags into smaller dags connected with asset aware
>>>>> scheduling?
>>>> 
>>>> Jarek, I'm going to discuss this with the team and I will get you an
>>>> answer
>>>> on that.
>>>> 
>>>> I've shared this again on the thread
>>> 
>>> https://github.com/xBis7/airflow/compare/69ab304ffa3d9b847b7dd0ee90ee6ef100223d66..scheduler-perf-patch
>>> 
>>>> I haven't created a PR because this is just a POC and it's also setting a
>>>> limit per dag. I would like to get feedback on whether it's better to
>>>> make
>>>> it per dag or per dag_run.
>>>> I can create a draft PR if that's helpful and makes it easier to add
>>>> comments.
>>>> 
>>>> Let me try to explain the issue better. From a high level overview, the
>>>> scheduler
>>>> 
>>>> 1. moves tasks to SCHEDULED
>>>> 2. runs a query to fetch SCHEDULED tasks from the db
>>>> 3. examines the tasks
>>>> 4. moves tasks to QUEUED
>>>> 
>>>> I'm focusing on step 2 and afterwards. The current code doesn't take into
>>>> account the max_active_tasks_per_dag. When it runs the query it fetches
>>>> up to max_tis, which is determined here
>>>> <https://github.com/apache/airflow/blob/main/airflow-core/src/airflow/jobs/scheduler_job_runner.py#L697-L705>.
>>>> 
>>>> For example,
>>>> 
>>>> - if the query number is 32
>>>> - all 32 tasks in line belong to the same dag, dag1
>>>> - we are not concerned how the scheduler picks them
>>>> - dag1 has max_active_tasks set to 5
>>>> 
>>>> The current code will
>>>> 
>>>> - get 32 tasks from dag1
>>>> - start examining them one by one
>>>> - once 5 are moved to QUEUED, it won't stop, it will keep examining
>>>> the other 27 but won't be able to queue them because it has reached
>>>> the
>>>> limit
>>>> 
>>>> In the next loop, although we have reached the maximum number of tasks
>>>> for
>>>> dag1, the query will fetch again 32 tasks from dag1 to examine them
>>>> and
>>>> try to queue them.
>>>> 
>>>> The issue is that it gets more tasks than it can queue from the db and
>>>> then
>>>> examines them all.
>>>> 
>>>> This all leads to unnecessary processing that builds up and the more load
>>>> there is on the system, the more the throughput drops for the scheduler
>>>> and
>>>> the workers.
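
A minimal sketch of the loop described above, in plain Python rather than the actual scheduler code. The names `run_loop`, `scheduled` and `active` are hypothetical, and the model assumes strict FIFO fetching and that queued tasks stay active between loops:

```python
from collections import Counter


def run_loop(scheduled, active, max_tis, max_active_tasks):
    """Simulate one fetch-and-examine pass; return (examined, queued)."""
    batch = scheduled[:max_tis]  # step 2: fetch up to max_tis tasks, FIFO
    queued = []
    for task in batch:  # step 3: every fetched task is examined
        _, dag_id = task
        if active[dag_id] < max_active_tasks:
            active[dag_id] += 1  # step 4: the task moves to QUEUED
            queued.append(task)
    for task in queued:  # queued tasks are no longer SCHEDULED
        scheduled.remove(task)
    return len(batch), len(queued)


# Hypothetical setup: 100 SCHEDULED tasks, all belonging to dag1.
scheduled = [(i, "dag1") for i in range(100)]
active = Counter()  # active (queued or running) tasks per dag

first = run_loop(scheduled, active, max_tis=32, max_active_tasks=5)
second = run_loop(scheduled, active, max_tis=32, max_active_tasks=5)
print(first, second)
```

Under these assumptions the first pass examines 32 tasks to queue 5, and the second pass examines another 32 and queues none.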
>>>> 
>>>> What I'm proposing is to adjust the query in step 2, to check the
>>>> max_active_tasks_per_dag
>>>> 
>>>>> run a query to fetch SCHEDULED tasks from the db
>>>> 
>>>> If a dag has already reached the maximum number of tasks in active
>>>> states,
>>>> it will be skipped by the query.
>>>> 
>>>>> Don't we already stop examining at that point? I guess there's two
>>>>> things you might be referring to. One is, which TIs come out of the db
>>>>> and into python, and the other is, what we do in python. Just might be
>>>>> helpful to be clear about the specific enhancements & changes you are
>>>>> making.
>>>> 
>>>> I think that if we adjust the query and fetch the right number of tasks,
>>>> then we won't have to make changes to what is done in python.
>>>> 
>>>> On Mon, Aug 4, 2025 at 8:01 AM Daniel Standish
>>>> daniel.stand...@astronomer.io.invalid wrote:
>>>> 
>>>>> @Christos Bisias
>>>>> 
>>>>>> If you have a very large dag, and its tasks have been scheduled, then
>>>>>> the scheduler will keep examining the tasks for queueing, even if it
>>>>>> has reached the maximum number of active tasks for that particular
>>>>>> dag. Once that fails, then it will move on to examine the scheduled
>>>>>> tasks of the next dag or dag_run in line.
>>>>> 
>>>>> Can you make this a little more precise? There's some protection
>>>>> against
>>>>> "starvation" i.e. dag runs recently considered should go to the back of
>>>>> the
>>>>> line next time.
>>>>> 
>>>>> Maybe you could clarify why / how that's not working / not optimal /
>>>>> how
>>>>> to
>>>>> improve.
>>>>> 
>>>>>> If there are available slots in the pool and the max parallelism
>>>>>> hasn't been reached yet, then the scheduler should stop processing a
>>>>>> dag that has already reached its max capacity of active tasks.
>>>>> 
>>>>> If a dag run (or dag) is already at max capacity, it doesn't really
>>>>> matter
>>>>> if there are slots available or parallelism isn't reached -- shouldn't
>>>>> it
>>>>> stop anyway?
>>>>> 
>>>>>> In addition, the number of scheduled tasks picked for examining should
>>>>>> be capped at the number of max active tasks if that's lower than the
>>>>>> query limit. If the active limit is 10 and we already have 5 running,
>>>>>> then we can queue at most 5 tasks. In that case, we shouldn't examine
>>>>>> more than that.
>>>>> 
>>>>> Don't we already stop examining at that point? I guess there's two
>>>>> things
>>>>> you might be referring to. One is, which TIs come out of the db and
>>>>> into
>>>>> python, and the other is, what we do in python. Just might be helpful
>>>>> to
>>>>> be clear about the specific enhancements & changes you are making.
>>>>> 
>>>>>> There is already a patch with the changes mentioned above. IMO, these
>>>>>> changes should be enabled/disabled with a config flag and not by
>>>>>> default because not everyone has the same needs as us. In our testing,
>>>>>> adding a limit on the tasks retrieved from the db requires more
>>>>>> processing on the query which actually makes things worse when you have
>>>>>> multiple small dags.
>>>>> 
>>>>> I would like to see a stronger case made for configurability. Why make
>>>>> it
>>>>> configurable? If the performance is always better, it should not be
>>>>> made
>>>>> configurable. Unless it's merely released as an opt-in experimental
>>>>> feature. If it is worse in some profiles, let's be clear about that.
>>>>> 
>>>>> I did not read anything after `Here is a simple test case that makes the
>>>>> benefits of the improvements noticeable` because it seemed like rather
>>>>> long-winded detail about a test case. A higher level summary might be
>>>>> helpful to your audience. Is there a PR with your optimization? You wrote
>>>>> "there is a patch" but did not, unless I missed something, share it. I
>>>>> would take a look if you share it though.
>>>>> 
>>>>> Thanks
>>>>> 
>>>>> On Sun, Aug 3, 2025 at 5:08 PM Daniel Standish <
>>>>> daniel.stand...@astronomer.io> wrote:
>>>>> 
>>>>>> Yes, the UI is another part of this.
>>>>>> 
>>>>>> At some point the grid and graph views completely stop making sense
>>>>>> for
>>>>>> that volume, and another type of view would be required both for
>>>>>> usability
>>>>>> and performance
>>>>>> 
>>>>>> On Sun, Aug 3, 2025 at 11:04 AM Jens Scheffler
>>>>>> j_scheff...@gmx.de.invalid
>>>>>> wrote:
>>>>>> 
>>>>>>> Hi,
>>>>>>> 
>>>>>>> We also have a current demand to have a workflow to execute 10k to
>>>>>>> 100k
>>>>>>> tasks. Together with @AutomationDev85 we are working on a local
>>>>>>> solution
>>>>>>> because we also saw problems in the Scheduler that are not linearly
>>>>>>> scaling. And for sure not easy to be fixed. But from our
>>>>>>> investigation
>>>>>>> also there are other problems to be considered like UI will also
>>>>>>> potentially have problems.
>>>>>>> 
>>>>>>> I am a bit sceptical that PR 49160 completely fixes the problems
>>>>>>> mentioned here and made some comments. I do not want to stop enthusiasm
>>>>>>> to fix and improve things but the Scheduler is quite complex and
>>>>>>> changes need to be made with care.
>>>>>>> 
>>>>>>> Actually I like the patch
>>> 
>>> https://github.com/xBis7/airflow/compare/69ab304ffa3d9b847b7dd0ee90ee6ef100223d66..scheduler-perf-patch
>>> 
>>>>>>> as it just adds some limit preventing scheduler to focus on only one
>>>>>>> run. But complexity is a bit big for a "patch" :-D
>>>>>>> 
>>>>>>> I'd also propose atm the way that Jarek described and split-up the
>>>>>>> Dag
>>>>>>> into multiple parts (divide and conquer) for the moment.
>>>>>>> 
>>>>>>> Otherwise if there is a concrete demand on such large Dags... we
>>>>>>> maybe
>>>>>>> need rather a broader initiative if we want to ensure 10k, 100k, 1M?
>>>>>>> tasks are supported per Dag. Because depending on the magnitude we
>>>>>>> strive for different approaches are needed.
>>>>>>> 
>>>>>>> Jens
>>>>>>> 
>>>>>>> On 03.08.25 16:33, Daniel Standish wrote:
>>>>>>> 
>>>>>>>> Definitely an area of the scheduler with some opportunity for
>>>>>>>> performance
>>>>>>>> improvement.
>>>>>>>> 
>>>>>>>> I would just mention that, you should also attempt to include some
>>>>>>>> performance testing at load / scale because, window functions are
>>>>>>>> going
>>>>>>>> to
>>>>>>>> be more expensive.
>>>>>>>> 
>>>>>>>> What happens when you have many dags, many historical dag runs &
>>>>>>>> TIs,
>>>>>>>> lots
>>>>>>>> of stuff running concurrently. You need to be mindful of the
>>>>>>>> overall
>>>>>>>> impact of such a change, and not look only at the time spent on
>>>>>>>> scheduling
>>>>>>>> this particular dag.
>>>>>>>> 
>>>>>>>> I did not look at the PRs yet, maybe you've covered this, but,
>>>>>>>> it's
>>>>>>>> important.
>>>>>>>> 
>>>>>>>> On Sun, Aug 3, 2025 at 5:57 AM Christos Bisias<
>>>>>>>> christos...@gmail.com>
>>>>>>>> wrote:
>>>>>>>> 
>>>>>>>>> I'm going to review the PR code and test it more thoroughly
>>>>>>>>> before
>>>>>>>>> leaving
>>>>>>>>> a comment.
>>>>>>>>> 
>>>>>>>>> This is my code for reference
>>> 
>>> https://github.com/xBis7/airflow/compare/69ab304ffa3d9b847b7dd0ee90ee6ef100223d66..scheduler-perf-patch
>>> 
>>>>>>>>> The current version is setting a limit per dag, across all
>>>>>>>>> dag_runs.
>>>>>>>>> 
>>>>>>>>> Please correct me if I'm wrong, but the PR looks like it's
>>>>>>>>> changing
>>>>>>>>> the way
>>>>>>>>> that tasks are prioritized to avoid starvation. If that's the
>>>>>>>>> case,
>>>>>>>>> I'm not
>>>>>>>>> sure that this is the same issue. My proposal is that, if we have
>>>>>>>>> reached
>>>>>>>>> the max resources assigned to a dag, then stop processing its
>>>>>>>>> tasks
>>>>>>>>> and
>>>>>>>>> move on to the next one. I'm not changing how or which tasks are
>>>>>>>>> picked.
>>>>>>>>> 
>>>>>>>>> On Sun, Aug 3, 2025 at 3:23 PM asquator<asqua...@proton.me
>>>>>>>>> .invalid>
>>>>>>>>> wrote:
>>>>>>>>> 
>>>>>>>>>> Thank you for the feedback.
>>>>>>>>>> Please describe the case with failing limit checks in the PR
>>>>>>>>>> (DAG's parameters and its tasks' parameters and what fails to be
>>>>>>>>>> checked) and we'll try to fix it ASAP so you can test it again.
>>>>>>>>>> Let's continue the PR-related discussion in the PR itself.
>>>>>>>>>> 
>>>>>>>>>> On Sunday, August 3rd, 2025 at 2:21 PM, Christos Bisias <
>>>>>>>>>> christos...@gmail.com> wrote:
>>>>>>>>>> 
>>>>>>>>>>> Thank you for bringing this PR to my attention.
>>>>>>>>>>> 
>>>>>>>>>>> I haven't studied the code but I ran a quick test on the branch
>>>>>>>>>>> and
>>>>>>>>>>> this
>>>>>>>>>>> completely ignores the limit on scheduled tasks per dag or
>>>>>>>>>>> dag_run.
>>>>>>>>>>> It
>>>>>>>>>>> grabbed 70 tasks from the first dag and then moved all 70 to
>>>>>>>>>>> QUEUED
>>>>>>>>>>> without
>>>>>>>>>>> any further checks.
>>>>>>>>>>> 
>>>>>>>>>>> This is how I tested it
>>> 
>>> https://github.com/Asquator/airflow/compare/feature/pessimistic-task-fetching-with-window-function...xBis7:airflow:scheduler-window-function-testing?expand=1
>>> 
>>>>>>>>>>> On Sun, Aug 3, 2025 at 1:44 PM asquatorasqua...@proton.me
>>>>>>>>>>> .invalid
>>>>>>>>>>> wrote:
>>>>>>>>>>> 
>>>>>>>>>>>> Hello,
>>>>>>>>>>>> 
>>>>>>>>>>>> This is a known issue stemming from the optimistic scheduling
>>>>>>>>>>>> strategy
>>>>>>>>>>>> used in Airflow. We do address this in the above-mentioned
>>>>>>>>>>>> PR. I
>>>>>>>>>>>> want
>>>>>>>>>>>> to
>>>>>>>>>>>> note that there are many cases where this problem may
>>>>>>>>>>>> appear—it
>>>>>>>>>>>> was
>>>>>>>>>>>> originally detected with pools, but we are striving to fix it
>>>>>>>>>>>> in
>>>>>>>>>>>> all
>>>>>>>>>>>> cases,
>>>>>>>>>>>> such as the one described here with max_active_tis_per_dag, by
>>>>>>>>>>>> switching to
>>>>>>>>>>>> pessimistic scheduling with SQL window functions. While the
>>>>>>>>>>>> current
>>>>>>>>>>>> strategy simply pulls the max_tis tasks and drops the ones
>>>>>>>>>>>> that
>>>>>>>>>>>> do
>>>>>>>>>>>> not
>>>>>>>>>>>> meet
>>>>>>>>>>>> the constraints, the new strategy will pull only the tasks
>>>>>>>>>>>> that
>>>>>>>>>>>> are
>>>>>>>>>>>> actually ready to be scheduled and that comply with all
>>>>>>>>>>>> concurrency
>>>>>>>>>>>> limits.
>>>>>>>>>>>> It would be very helpful for pushing this change to production
>>>>>>>>>>>> if
>>>>>>>>>>>> you
>>>>>>>>>>>> could assist us in alpha-testing it.
>>>>>>>>>>>> 
>>>>>>>>>>>> See also:
>>>>>>>>>>>> https://github.com/apache/airflow/discussions/49160
>>>>>>>>>>>> 
>>>>>>>>>>>> Sent with Proton Mail secure email.
>>>>>>>>>>>> 
>>>>>>>>>>>> On Sunday, August 3rd, 2025 at 12:59 PM, Elad Kalif
>>>>>>>>>>>> elad...@apache.org
>>>>>>>>>>>> wrote:
>>>>>>>>>>>> 
>>>>>>>>>>>>> I think most of your issues will be addressed by
>>>>>>>>>>>>> https://github.com/apache/airflow/pull/53492
>>>>>>>>>>>>> The PR code can be tested with Breeze, so you can set it up and
>>>>>>>>>>>>> see if it solves the problem. This will also help with confirming
>>>>>>>>>>>>> it's the right fix.
>>>>>>>>>>>>> 
>>>>>>>>>>>>> On Sun, Aug 3, 2025 at 10:46 AM Christos Bisias
>>>>>>>>>>>>> christos...@gmail.com
>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>> 
>>>>>>>>>>>>>> Hello,
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> The scheduler is very efficient when running a large amount
>>>>>>>>>>>>>> of
>>>>>>>>>>>>>> dags
>>>>>>>>>>>>>> with up
>>>>>>>>>>>>>> to 1000 tasks each. But in our case, we have dags with as
>>>>>>>>>>>>>> many
>>>>>>>>>>>>>> as
>>>>>>>>>>>>>> 10.000
>>>>>>>>>>>>>> tasks. And in that scenario the scheduler and worker
>>>>>>>>>>>>>> throughput
>>>>>>>>>>>>>> drops
>>>>>>>>>>>>>> significantly. Even if you have 1 such large dag with
>>>>>>>>>>>>>> scheduled
>>>>>>>>>>>>>> tasks,
>>>>>>>>>>>>>> the
>>>>>>>>>>>>>> performance hit becomes noticeable.
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> We did some digging and we found that the issue comes from the
>>>>>>>>>>>>>> scheduler's _executable_task_instances_to_queued
>>>>>>>>>>>>>> <https://github.com/apache/airflow/blob/main/airflow-core/src/airflow/jobs/scheduler_job_runner.py#L293C9-L647>
>>>>>>>>>>>>>> method. In particular with the db query here
>>>>>>>>>>>>>> <https://github.com/apache/airflow/blob/main/airflow-core/src/airflow/jobs/scheduler_job_runner.py#L364-L375>
>>>>>>>>>>>>>> and examining the results here
>>>>>>>>>>>>>> <https://github.com/apache/airflow/blob/main/airflow-core/src/airflow/jobs/scheduler_job_runner.py#L425>.
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> If you have a very large dag, and its tasks have been
>>>>>>>>>>>>>> scheduled,
>>>>>>>>>>>>>> then
>>>>>>>>>>>>>> the
>>>>>>>>>>>>>> scheduler will keep examining the tasks for queueing, even
>>>>>>>>>>>>>> if
>>>>>>>>>>>>>> it
>>>>>>>>>>>>>> has
>>>>>>>>>>>>>> reached the maximum number of active tasks for that
>>>>>>>>>>>>>> particular
>>>>>>>>>>>>>> dag.
>>>>>>>>>>>>>> Once
>>>>>>>>>>>>>> that fails, then it will move on to examine the scheduled
>>>>>>>>>>>>>> tasks
>>>>>>>>>>>>>> of
>>>>>>>>>>>>>> the
>>>>>>>>>>>>>> next
>>>>>>>>>>>>>> dag or dag_run in line.
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> This is inefficient and causes the throughput of the
>>>>>>>>>>>>>> scheduler
>>>>>>>>>>>>>> and
>>>>>>>>>>>>>> the
>>>>>>>>>>>>>> workers to drop significantly. If there are available slots
>>>>>>>>>>>>>> in
>>>>>>>>>>>>>> the
>>>>>>>>>>>>>> pool and
>>>>>>>>>>>>>> the max parallelism hasn't been reached yet, then the
>>>>>>>>>>>>>> scheduler
>>>>>>>>>>>>>> should
>>>>>>>>>>>>>> stop
>>>>>>>>>>>>>> processing a dag that has already reached its max capacity
>>>>>>>>>>>>>> of
>>>>>>>>>>>>>> active
>>>>>>>>>>>>>> tasks.
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> In addition, the number of scheduled tasks picked for
>>>>>>>>>>>>>> examining,
>>>>>>>>>>>>>> should be
>>>>>>>>>>>>>> capped at the number of max active tasks if that's lower
>>>>>>>>>>>>>> than
>>>>>>>>>>>>>> the
>>>>>>>>>>>>>> query
>>>>>>>>>>>>>> limit. If the active limit is 10 and we already have 5
>>>>>>>>>>>>>> running,
>>>>>>>>>>>>>> then
>>>>>>>>>>>>>> we can
>>>>>>>>>>>>>> queue at most 5 tasks. In that case, we shouldn't examine
>>>>>>>>>>>>>> more
>>>>>>>>>>>>>> than
>>>>>>>>>>>>>> that.
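
The cap described above is simple arithmetic. A small sketch, where `examine_budget` is a hypothetical helper name and not something in the Airflow codebase:

```python
def examine_budget(max_tis, max_active_tasks, active):
    """How many scheduled tasks of one dag are worth examining in one pass."""
    # Slots still free under the dag's active-task limit (never negative).
    remaining = max(0, max_active_tasks - active)
    # Never examine more than the query limit allows either.
    return min(max_tis, remaining)


# Active limit 10, 5 already running, query limit 32.
print(examine_budget(max_tis=32, max_active_tasks=10, active=5))
```

With the numbers from the paragraph above this gives 5, and it drops to 0 once the dag is at its limit.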
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> There is already a patch with the changes mentioned above.
>>>>>>>>>>>>>> IMO,
>>>>>>>>>>>>>> these
>>>>>>>>>>>>>> changes should be enabled/disabled with a config flag and
>>>>>>>>>>>>>> not
>>>>>>>>>>>>>> by
>>>>>>>>>>>>>> default
>>>>>>>>>>>>>> because not everyone has the same needs as us. In our
>>>>>>>>>>>>>> testing,
>>>>>>>>>>>>>> adding a
>>>>>>>>>>>>>> limit on the tasks retrieved from the db requires more
>>>>>>>>>>>>>> processing
>>>>>>>>>>>>>> on
>>>>>>>>>>>>>> the
>>>>>>>>>>>>>> query which actually makes things worse when you have
>>>>>>>>>>>>>> multiple
>>>>>>>>>>>>>> small
>>>>>>>>>>>>>> dags.
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> Here is a simple test case that makes the benefits of the
>>>>>>>>>>>>>> improvements
>>>>>>>>>>>>>> noticeable
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> - we have 3 dags with thousands of tasks each
>>>>>>>>>>>>>> - for simplicity let's have 1 dag_run per dag
>>>>>>>>>>>>>> - triggering them takes some time and due to that, the FIFO
>>>>>>>>>>>>>>   order of the tasks is very clear
>>>>>>>>>>>>>>   - e.g. 1000 tasks from dag1 were scheduled first and then 200
>>>>>>>>>>>>>>     tasks from dag2 etc.
>>>>>>>>>>>>>> - the executor has parallelism=100 and slots_available=100, which
>>>>>>>>>>>>>>   means that it can run up to 100 tasks concurrently
>>>>>>>>>>>>>> - max_active_tasks_per_dag is 4, which means that we can have up
>>>>>>>>>>>>>>   to 4 tasks running per dag
>>>>>>>>>>>>>>   - for 3 dags, it means that we can run up to 12 tasks at the
>>>>>>>>>>>>>>     same time (4 tasks from each dag)
>>>>>>>>>>>>>> - max tis per query are set to 32, meaning that we can examine up
>>>>>>>>>>>>>>   to 32 scheduled tasks if there are available pool slots
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> If we were to run the scheduler loop repeatedly until it
>>>>>>>>>>>>>> queues
>>>>>>>>>>>>>> 12
>>>>>>>>>>>>>> tasks
>>>>>>>>>>>>>> and test the part that examines the scheduled tasks and
>>>>>>>>>>>>>> queues
>>>>>>>>>>>>>> them,
>>>>>>>>>>>>>> then
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> - with the query limit
>>>>>>>>>>>>>>   - 1 iteration, total time 0.05
>>>>>>>>>>>>>>   - During the iteration
>>>>>>>>>>>>>>     - we have parallelism 100, available slots 100 and query
>>>>>>>>>>>>>>       limit 32, which means that it will examine up to 32
>>>>>>>>>>>>>>       scheduled tasks
>>>>>>>>>>>>>>     - it can queue up to 100 tasks
>>>>>>>>>>>>>>     - examines 12 tasks (instead of 32)
>>>>>>>>>>>>>>       - 4 tasks from dag1, reached max for the dag
>>>>>>>>>>>>>>       - 4 tasks from dag2, reached max for the dag
>>>>>>>>>>>>>>       - and 4 tasks from dag3, reached max for the dag
>>>>>>>>>>>>>>     - queues 4 from dag1, reaches max for the dag and moves on
>>>>>>>>>>>>>>     - queues 4 from dag2, reaches max for the dag and moves on
>>>>>>>>>>>>>>     - queues 4 from dag3, reaches max for the dag and moves on
>>>>>>>>>>>>>>     - stops queueing because we have reached the maximum per
>>>>>>>>>>>>>>       dag, although there are slots for more tasks
>>>>>>>>>>>>>>     - iteration finishes
>>>>>>>>>>>>>> - without
>>>>>>>>>>>>>>   - 3 iterations, total time 0.29
>>>>>>>>>>>>>>   - During iteration 1
>>>>>>>>>>>>>>     - examines 32 tasks, all from dag1 (due to FIFO)
>>>>>>>>>>>>>>     - queues 4 from dag1 and tries to queue the other 28 but
>>>>>>>>>>>>>>       fails
>>>>>>>>>>>>>>   - During iteration 2
>>>>>>>>>>>>>>     - examines the next 32 tasks from dag1
>>>>>>>>>>>>>>       - it can't queue any of them because it has reached the
>>>>>>>>>>>>>>         max for dag1, since the previous 4 are still running
>>>>>>>>>>>>>>     - examines 32 tasks from dag2
>>>>>>>>>>>>>>     - queues 4 from dag2 and tries to queue the other 28 but
>>>>>>>>>>>>>>       fails
>>>>>>>>>>>>>>   - During iteration 3
>>>>>>>>>>>>>>     - examines the next 32 tasks from dag1, same tasks that
>>>>>>>>>>>>>>       were examined in iteration 2
>>>>>>>>>>>>>>       - it can't queue any of them because it has reached the
>>>>>>>>>>>>>>         max for dag1 and the first 4 are still running
>>>>>>>>>>>>>>     - examines 32 tasks from dag2, can't queue any of them
>>>>>>>>>>>>>>       because it has reached max for dag2 as well
>>>>>>>>>>>>>>     - examines 32 tasks from dag3
>>>>>>>>>>>>>>     - queues 4 from dag3 and tries to queue the other 28 but
>>>>>>>>>>>>>>       fails
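
To make the "with the query limit" case concrete, here is a rough sketch of a per-dag cap applied inside the fetch query, using an in-memory SQLite database and a `ROW_NUMBER()` window function. It is not the query from the patch or from any Airflow PR: the two-table schema, the names `build_db`, `fetch`, `FIFO_QUERY` and `CAPPED_QUERY`, and the 300 tasks for dag3 are all assumptions made for illustration, and it needs SQLite 3.25 or newer for window functions.

```python
import sqlite3
from collections import Counter

# Plain FIFO fetch: the first max_tis SCHEDULED tasks, whatever dag they are in.
FIFO_QUERY = """
SELECT id, dag_id FROM task_instance
WHERE state = 'scheduled'
ORDER BY id
LIMIT ?
"""

# Capped fetch: rank SCHEDULED tasks within each dag and keep only as many
# as the dag still has room for under max_active_tasks.
CAPPED_QUERY = """
WITH active AS (
    SELECT dag_id, COUNT(*) AS n_active
    FROM task_instance
    WHERE state IN ('queued', 'running')
    GROUP BY dag_id
),
ranked AS (
    SELECT id, dag_id,
           ROW_NUMBER() OVER (PARTITION BY dag_id ORDER BY id) AS rn
    FROM task_instance
    WHERE state = 'scheduled'
)
SELECT r.id, r.dag_id
FROM ranked AS r
JOIN dag AS d ON d.dag_id = r.dag_id
LEFT JOIN active AS a ON a.dag_id = r.dag_id
WHERE r.rn <= d.max_active_tasks - COALESCE(a.n_active, 0)
ORDER BY r.id
LIMIT ?
"""


def build_db(tasks_per_dag, max_active_tasks):
    """Create a toy schema and insert SCHEDULED tasks dag by dag (FIFO by id)."""
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE dag (dag_id TEXT PRIMARY KEY, max_active_tasks INTEGER)")
    conn.execute("CREATE TABLE task_instance (id INTEGER PRIMARY KEY, dag_id TEXT, state TEXT)")
    for dag_id, count in tasks_per_dag.items():
        conn.execute("INSERT INTO dag VALUES (?, ?)", (dag_id, max_active_tasks))
        conn.executemany(
            "INSERT INTO task_instance (dag_id, state) VALUES (?, 'scheduled')",
            [(dag_id,)] * count,
        )
    return conn


def fetch(conn, query, max_tis):
    """Run one of the fetch queries with the given max_tis limit."""
    return conn.execute(query, (max_tis,)).fetchall()


conn = build_db({"dag1": 1000, "dag2": 200, "dag3": 300}, max_active_tasks=4)
fifo = fetch(conn, FIFO_QUERY, 32)
capped = fetch(conn, CAPPED_QUERY, 32)
print(Counter(dag for _, dag in fifo))    # tasks per dag, plain FIFO fetch
print(Counter(dag for _, dag in capped))  # tasks per dag, capped fetch
```

In this toy setup the FIFO fetch returns 32 rows that all belong to dag1, while the capped fetch returns 12 rows, 4 per dag. The extra ranking step is also where the additional query cost mentioned earlier would come from.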
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> I used very low values for all the configs so that I can
>>>>>>>>>>>>>> make
>>>>>>>>>>>>>> the
>>>>>>>>>>>>>> point
>>>>>>>>>>>>>> clear and easy to understand. If we increase them, then this
>>>>>>>>>>>>>> patch
>>>>>>>>>>>>>> also
>>>>>>>>>>>>>> makes the task selection more fair and the resource
>>>>>>>>>>>>>> distribution
>>>>>>>>>>>>>> more
>>>>>>>>>>>>>> even.
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> I would appreciate it if anyone familiar with the
>>>>>>>>>>>>>> scheduler's
>>>>>>>>>>>>>> code
>>>>>>>>>>>>>> can
>>>>>>>>>>>>>> confirm this and also provide any feedback.
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> Additionally, I have one question regarding the query limit.
>>>>>>>>>>>>>> Should it
>>>>>>>>>>>>>> be
>>>>>>>>>>>>>> per dag_run or per dag? I've noticed that
>>>>>>>>>>>>>> max_active_tasks_per_dag
>>>>>>>>>>>>>> has
>>>>>>>>>>>>>> been changed to provide a value per dag_run but the docs
>>>>>>>>>>>>>> haven't
>>>>>>>>>>>>>> been
>>>>>>>>>>>>>> updated.
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> Thank you!
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> Regards,
>>>>>>>>>>>>>> Christos Bisias
>>> 
>>> ---------------------------------------------------------------------
>>> 
>>>>>>>>>>>> To unsubscribe, e-mail:dev-unsubscr...@airflow.apache.org
>>>>>>>>>>>> For additional commands, e-mail:dev-h...@airflow.apache.org
>>>>> 
> 
> 

