Hello,

The scheduler is very efficient when running a large number of dags with up to 1,000 tasks each. In our case, however, we have dags with as many as 10,000 tasks, and in that scenario scheduler and worker throughput drops significantly. Even a single such large dag with scheduled tasks causes a noticeable performance hit.
We did some digging and found that the issue comes from the scheduler's _executable_task_instances_to_queued <https://github.com/apache/airflow/blob/main/airflow-core/src/airflow/jobs/scheduler_job_runner.py#L293C9-L647> method, in particular the db query here <https://github.com/apache/airflow/blob/main/airflow-core/src/airflow/jobs/scheduler_job_runner.py#L364-L375> and the examination of its results here <https://github.com/apache/airflow/blob/main/airflow-core/src/airflow/jobs/scheduler_job_runner.py#L425>. If you have a very large dag whose tasks have been scheduled, the scheduler keeps examining its tasks for queueing even after the dag has reached its maximum number of active tasks. Only once those attempts fail does it move on to the scheduled tasks of the next dag or dag_run in line. This is inefficient and causes scheduler and worker throughput to drop significantly.

We propose two changes. First, even if there are available pool slots and max parallelism hasn't been reached, the scheduler should stop processing a dag that has already reached its maximum number of active tasks. Second, the number of scheduled tasks picked for examination should be capped at the dag's remaining active-task capacity whenever that is lower than the query limit. For example, if the active limit is 10 and we already have 5 tasks running, we can queue at most 5 more, so we shouldn't examine more than that.

There is already a patch with the changes mentioned above. IMO, these changes should be enabled/disabled with a config flag and not turned on by default, because not everyone has the same needs as us. In our testing, adding a limit on the tasks retrieved from the db requires more processing in the query, which actually makes things worse when you have many small dags.
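To make the idea concrete, here is a rough, self-contained Python sketch of the selection logic. This is not the actual patch (which works inside _executable_task_instances_to_queued against the db query); the names TI, select_candidates, running_per_run, etc. are invented for illustration and are not Airflow internals. It skips a dag_run once its active-task budget is used up and stops once no more tasks can be queued in this loop:

from collections import namedtuple

# Hypothetical stand-in for a scheduled TaskInstance row; not Airflow's model.
TI = namedtuple("TI", ["dag_id", "run_id", "task_id"])


def select_candidates(scheduled_tis, running_per_run, max_active_tasks,
                      free_slots, query_limit):
    """Pick only the scheduled task instances worth examining for queueing.

    scheduled_tis    -- TIs in FIFO (scheduling) order
    running_per_run  -- {(dag_id, run_id): tis already running or queued}
    max_active_tasks -- per-dag_run cap (max_active_tasks_per_dag)
    free_slots       -- open executor/pool slots
    query_limit      -- max_tis_per_query
    """
    budget = {}   # how many more TIs each dag_run may still start
    picked = []

    for ti in scheduled_tis:
        if len(picked) >= min(query_limit, free_slots):
            break                     # nothing more can be queued this loop
        key = (ti.dag_id, ti.run_id)
        if key not in budget:
            budget[key] = max_active_tasks - running_per_run.get(key, 0)
        if budget[key] <= 0:
            continue                  # dag_run already at its max, skip it
        picked.append(ti)
        budget[key] -= 1

    return picked


# Tiny demo mirroring the test case below: 3 dags in FIFO order, cap of 4 per dag.
tis = ([TI("dag1", "r1", f"t{i}") for i in range(1000)]
       + [TI("dag2", "r1", f"t{i}") for i in range(200)]
       + [TI("dag3", "r1", f"t{i}") for i in range(200)])
print(len(select_candidates(tis, {}, max_active_tasks=4,
                            free_slots=100, query_limit=32)))  # -> 12

With this selection in place, only 12 tasks are examined instead of the full query limit of 32, which is what the test case below measures.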
Here is a simple test case that makes the benefits of the improvements noticeable:

- we have 3 dags with thousands of tasks each
- for simplicity, there is 1 dag_run per dag
- triggering them takes some time, so the FIFO order of the tasks is very clear - e.g. 1000 tasks from dag1 were scheduled first, then 200 tasks from dag2, etc.
- the executor has parallelism=100 and slots_available=100, which means it can run up to 100 tasks concurrently
- max_active_tasks_per_dag is 4, which means we can have up to 4 tasks running per dag; for 3 dags that means up to 12 tasks at the same time (4 tasks from each dag)
- max tis per query is set to 32, meaning we can examine up to 32 scheduled tasks if there are available pool slots

If we run the scheduler loop repeatedly until it queues 12 tasks, and test only the part that examines the scheduled tasks and queues them, then:

With the query limit: 1 iteration, total time 0.05
- we have parallelism 100, available slots 100 and a query limit of 32, which means it will examine up to 32 scheduled tasks and can queue up to 100 tasks
- examines 12 tasks (instead of 32):
  - 4 tasks from *dag1*, reaching the max for the dag
  - 4 tasks from *dag2*, reaching the max for the dag
  - 4 tasks from *dag3*, reaching the max for the dag
- queues 4 from *dag1*, reaches the max for the dag and moves on
- queues 4 from *dag2*, reaches the max for the dag and moves on
- queues 4 from *dag3*, reaches the max for the dag and moves on
- stops queueing because every dag has reached its maximum, although there are slots for more tasks
- the iteration finishes

Without the query limit: 3 iterations, total time 0.29
- Iteration 1
  - examines 32 tasks, all from *dag1* (due to FIFO)
  - queues 4 from *dag1* and tries to queue the other 28 but fails
- Iteration 2
  - examines the next 32 tasks from *dag1*; it can't queue any of them because *dag1* is at its max, since the previous 4 are still running
  - examines 32 tasks from *dag2*; queues 4 from *dag2* and tries to queue the other 28 but fails
- Iteration 3
  - examines the next 32 tasks from *dag1* (the same tasks examined in iteration 2); it can't queue any of them because *dag1* is still at its max and the first 4 are still running
  - examines 32 tasks from *dag2*; can't queue any of them because *dag2* has reached its max as well
  - examines 32 tasks from *dag3*; queues 4 from *dag3* and tries to queue the other 28 but fails

I used very low values for all the configs so that the point is clear and easy to understand. If we increase them, the patch also makes the task selection fairer and the resource distribution more even.

I would appreciate it if anyone familiar with the scheduler's code could confirm this and provide feedback. Additionally, I have one question regarding the query limit: should it be per dag_run or per dag? I've noticed that *max_active_tasks_per_dag* has been changed to provide a value per dag_run, but the docs haven't been updated.

Thank you!

Regards,
Christos Bisias