Hello, This is a known issue stemming from the optimistic scheduling strategy used in Airflow. We do address this in the above-mentioned PR. I want to note that there are many cases where this problem may appear—it was originally detected with pools, but we are striving to fix it in all cases, such as the one described here with max_active_tis_per_dag, by switching to pessimistic scheduling with SQL window functions. While the current strategy simply pulls the max_tis tasks and drops the ones that do not meet the constraints, the new strategy will pull only the tasks that are actually ready to be scheduled and that comply with all concurrency limits.
It would be very helpful for pushing this change to production if you could assist us in alpha-testing it. See also: https://github.com/apache/airflow/discussions/49160 Sent with Proton Mail secure email. On Sunday, August 3rd, 2025 at 12:59 PM, Elad Kalif <elad...@apache.org> wrote: > i think most of your issues will be addressed by > https://github.com/apache/airflow/pull/53492 > The PR code can be tested with Breeze so you can set it up and see if it > solves the problem this will also help with confirming it's the right fix. > > On Sun, Aug 3, 2025 at 10:46 AM Christos Bisias christos...@gmail.com > > wrote: > > > Hello, > > > > The scheduler is very efficient when running a large amount of dags with up > > to 1000 tasks each. But in our case, we have dags with as many as 10.000 > > tasks. And in that scenario the scheduler and worker throughput drops > > significantly. Even if you have 1 such large dag with scheduled tasks, the > > performance hit becomes noticeable. > > > > We did some digging and we found that the issue comes from the scheduler's > > _executable_task_instances_to_queued > > < > > https://github.com/apache/airflow/blob/main/airflow-core/src/airflow/jobs/scheduler_job_runner.py#L293C9-L647 > > > > method. > > In particular with the db query here > > < > > https://github.com/apache/airflow/blob/main/airflow-core/src/airflow/jobs/scheduler_job_runner.py#L364-L375 > > > > and > > examining the results here > > < > > https://github.com/apache/airflow/blob/main/airflow-core/src/airflow/jobs/scheduler_job_runner.py#L425 > > > > . > > > > If you have a very large dag, and its tasks have been scheduled, then the > > scheduler will keep examining the tasks for queueing, even if it has > > reached the maximum number of active tasks for that particular dag. Once > > that fails, then it will move on to examine the scheduled tasks of the next > > dag or dag_run in line. > > > > This is inefficient and causes the throughput of the scheduler and the > > workers to drop significantly. If there are available slots in the pool and > > the max parallelism hasn't been reached yet, then the scheduler should stop > > processing a dag that has already reached its max capacity of active tasks. > > > > In addition, the number of scheduled tasks picked for examining, should be > > capped at the number of max active tasks if that's lower than the query > > limit. If the active limit is 10 and we already have 5 running, then we can > > queue at most 5 tasks. In that case, we shouldn't examine more than that. > > > > There is already a patch with the changes mentioned above. IMO, these > > changes should be enabled/disabled with a config flag and not by default > > because not everyone has the same needs as us. In our testing, adding a > > limit on the tasks retrieved from the db requires more processing on the > > query which actually makes things worse when you have multiple small dags. > > > > Here is a simple test case that makes the benefits of the improvements > > noticeable > > > > - we have 3 dags with thousands of tasks each > > - for simplicity let's have 1 dag_run per dag > > - triggering them takes some time and due to that, the FIFO order of the > > tasks is very clear > > - e.g. 1000 tasks from dag1 were scheduled first and then 200 tasks > > from dag2 etc. > > - the executor has parallelism=100 and slots_available=100 which means > > that it can run up to 100 tasks concurrently > > - max_active_tasks_per_dag is 4 which means that we can have up to 4 > > tasks running per dag. > > - For 3 dags, it means that we can run up to 12 tasks at the same > > time (4 tasks from each dag) > > - max tis per query are set to 32, meaning that we can examine up to 32 > > scheduled tasks if there are available pool slots > > > > If we were to run the scheduler loop repeatedly until it queues 12 tasks > > and test the part that examines the scheduled tasks and queues them, then > > > > - with the query limit > > - 1 iteration, total time 0.05 > > - During the iteration > > - we have parallelism 100, available slots 100 and query limit 32 > > which means that it will examine up to 32 scheduled tasks > > - it can queue up to 100 tasks > > - examines 12 tasks (instead of 32) > > - 4 tasks from dag1, reached max for the dag > > - 4 tasks from dag2, reached max for the dag > > - and 4 tasks from dag3, reached max for the dag > > - queues 4 from dag1, reaches max for the dag and moves on > > - queues 4 from dag2, reaches max for the dag and moves on > > - queues 4 from dag3, reaches max for the dag and moves on > > - stops queueing because we have reached the maximum per dag, > > although there are slots for more tasks > > - iteration finishes > > - without > > - 3 iterations, total time 0.29 > > - During iteration 1 > > - Examines 32 tasks, all from dag1 (due to FIFO) > > - queues 4 from dag1 and tries to queue the other 28 but fails > > - During iteration 2 > > - examines the next 32 tasks from dag1 > > - it can't queue any of them because it has reached the max for > > dag1, since the previous 4 are still running > > - examines 32 tasks from dag2 > > - queues 4 from dag2 and tries to queue the other 28 but fails > > - During iteration 3 > > - examines the next 32 tasks from dag1, same tasks that were > > examined in iteration 2 > > - it can't queue any of them because it has reached the max for > > dag1 and the first 4 are still running > > - examines 32 tasks from dag2 , can't queue any of them because > > it has reached max for dag2 as well > > - examines 32 tasks from dag3 > > - queues 4 from dag3 and tries to queue the other 28 but fails > > > > I used very low values for all the configs so that I can make the point > > clear and easy to understand. If we increase them, then this patch also > > makes the task selection more fair and the resource distribution more even. > > > > I would appreciate it if anyone familiar with the scheduler's code can > > confirm this and also provide any feedback. > > > > Additionally, I have one question regarding the query limit. Should it be > > per dag_run or per dag? I've noticed that max_active_tasks_per_dag has > > been changed to provide a value per dag_run but the docs haven't been > > updated. > > > > Thank you! > > > > Regards, > > Christos Bisias --------------------------------------------------------------------- To unsubscribe, e-mail: dev-unsubscr...@airflow.apache.org For additional commands, e-mail: dev-h...@airflow.apache.org