Hello,

The scheduler is very efficient when running a large number of dags with up
to 1,000 tasks each. But in our case, we have dags with as many as 10,000
tasks, and in that scenario the scheduler and worker throughput drops
significantly. Even a single such large dag with scheduled tasks causes a
noticeable performance hit.

We did some digging and found that the issue comes from the scheduler's
_executable_task_instances_to_queued
<https://github.com/apache/airflow/blob/main/airflow-core/src/airflow/jobs/scheduler_job_runner.py#L293C9-L647>
method, in particular the db query here
<https://github.com/apache/airflow/blob/main/airflow-core/src/airflow/jobs/scheduler_job_runner.py#L364-L375>
and the examination of the results here
<https://github.com/apache/airflow/blob/main/airflow-core/src/airflow/jobs/scheduler_job_runner.py#L425>.

If you have a very large dag whose tasks have been scheduled, the scheduler
will keep examining those tasks for queueing even after it has reached the
maximum number of active tasks for that particular dag. Only once those
attempts fail does it move on to examine the scheduled tasks of the next
dag or dag_run in line.

This is inefficient and causes the throughput of the scheduler and the
workers to drop significantly. Even if there are available slots in the pool
and max parallelism hasn't been reached yet, the scheduler should stop
processing a dag that has already reached its maximum number of active tasks.
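
To make the idea concrete, here is a very simplified sketch of a selection
loop with the proposed per-dag early exit. This is not the actual
_executable_task_instances_to_queued code; all names below are made up for
illustration:

    # Simplified sketch, NOT the real scheduler code.
    # `scheduled_tis` is assumed to be a FIFO-ordered list of objects
    # with a `dag_id` attribute (e.g. the result of the db query).
    def select_tis_to_queue(scheduled_tis, running_per_dag,
                            max_active_per_dag, slots_available):
        queued = []
        saturated = set()  # dags that already hit their active-task cap
        for ti in scheduled_tis:
            if slots_available <= 0:
                break  # no pool/parallelism capacity left
            if ti.dag_id in saturated:
                continue  # stop examining a dag that is already at its cap
            if running_per_dag.get(ti.dag_id, 0) >= max_active_per_dag:
                saturated.add(ti.dag_id)
                continue
            queued.append(ti)
            running_per_dag[ti.dag_id] = running_per_dag.get(ti.dag_id, 0) + 1
            slots_available -= 1
        return queued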

In addition, the number of scheduled tasks picked for examination should be
capped at the dag's max active tasks if that's lower than the query limit.
If the active limit is 10 and 5 tasks are already running, then we can queue
at most 5 more, so we shouldn't examine more than that.
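
In rough pseudocode terms (variable names are made up for illustration):

    # If a dag allows 10 active tasks and 5 are already running, at most
    # 5 more can be queued, so there is no point examining more than that.
    remaining_for_dag = max_active_tasks - currently_running   # 10 - 5 = 5
    examine_limit = min(query_limit, remaining_for_dag)        # min(32, 5) = 5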

There is already a patch with the changes mentioned above. IMO, these
changes should be enabled/disabled with a config flag rather than on by
default, because not everyone has the same needs as us. In our testing,
adding a limit to the tasks retrieved from the db requires more processing
in the query, which actually makes things worse when you have many small dags.
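
For example, something along these lines in airflow.cfg (the option name
below is hypothetical, just to illustrate the opt-in idea):

    [scheduler]
    # Hypothetical option, not an existing Airflow setting.
    limit_examined_tis_per_dag_run = True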

Here is a simple test case that makes the benefits of the improvements
noticeable (the settings are also summarized in a short config sketch after
the list):

   - we have 3 dags with thousands of tasks each
      - for simplicity let's have 1 dag_run per dag
   - triggering them takes some time and due to that, the FIFO order of the
   tasks is very clear
      - e.g. 1000 tasks from dag1 were scheduled first and then 200 tasks
      from dag2 etc.
   - the executor has parallelism=100 and slots_available=100 which means
   that it can run up to 100 tasks concurrently
   - max_active_tasks_per_dag is 4 which means that we can have up to 4
   tasks running per dag.
      - For 3 dags, it means that we can run up to 12 tasks at the same
      time (4 tasks from each dag)
   - max_tis_per_query is set to 32, meaning that we can examine up to 32
   scheduled tasks if there are available pool slots
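
For reference, the setup above roughly corresponds to settings like the
following (option names as in recent Airflow versions; the executor's
slots_available follows from parallelism when nothing is running yet):

    [core]
    parallelism = 100
    max_active_tasks_per_dag = 4

    [scheduler]
    max_tis_per_query = 32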

If we run the scheduler loop repeatedly until it queues 12 tasks, exercising
only the part that examines the scheduled tasks and queues them, we get the
following (also summarized in a short arithmetic sketch after the breakdown):

   - with the query limit
      - 1 iteration, total time 0.05
      - During the iteration
         - we have parallelism 100, available slots 100 and query limit 32
         which means that it will examine up to 32 scheduled tasks
         - it can queue up to 100 tasks
         - examines 12 tasks (instead of 32)
            - 4 tasks from *dag1*, reached max for the dag
            - 4 tasks from *dag2*, reached max for the dag
            - and 4 tasks from *dag3*, reached max for the dag
         - queues 4 from *dag1*, reaches max for the dag and moves on
         - queues 4 from *dag2*, reaches max for the dag and moves on
         - queues 4 from *dag3*, reaches max for the dag and moves on
         - stops queueing because we have reached the maximum per dag,
         although there are slots for more tasks
         - iteration finishes
   - without the query limit
      - 3 iterations, total time 0.29
      - During iteration 1
         - Examines 32 tasks, all from *dag1*  (due to FIFO)
         - queues 4 from *dag1*  and tries to queue the other 28 but fails
      - During iteration 2
         - examines the next 32 tasks from *dag1*
         - it can't queue any of them because it has reached the max for
         *dag1*, since the previous 4 are still running
         - examines 32 tasks from *dag2*
         - queues 4 from *dag2*  and tries to queue the other 28 but fails
      - During iteration 3
         - examines the next 32 tasks from *dag1*, same tasks that were
         examined in iteration 2
         - it can't queue any of them because it has reached the max for
         *dag1* and the first 4 are still running
         - examines 32 tasks from *dag2*, can't queue any of them because
         it has reached max for *dag2* as well
         - examines 32 tasks from *dag3*
         - queues 4 from *dag3* and tries to queue the other 28 but fails
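
The difference boils down to simple arithmetic (a back-of-the-envelope
summary of the two runs above):

    # With the per-dag cap: each of the 3 dags contributes at most 4
    # candidates, so one loop examines 12 tasks and queues all 12.
    examined_with_cap = 3 * min(4, 32)                        # = 12

    # Without the cap: each loop examines up to 32 FIFO tasks per dag that
    # has scheduled tasks, so queueing the same 12 tasks costs roughly
    # 32 + (32 + 32) + (32 + 32 + 32) examinations across 3 iterations.
    examined_without_cap = 32 + (32 + 32) + (32 + 32 + 32)    # = 192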


I used very low values for all the configs so that the point is clear and
easy to understand. If we increase them, this patch also makes the task
selection fairer and the resource distribution more even.

I would appreciate it if anyone familiar with the scheduler's code could
confirm this and provide any feedback.

Additionally, I have one question regarding the query limit: should it be
per dag_run or per dag? I've noticed that *max_active_tasks_per_dag* has
been changed to provide a value per dag_run, but the docs haven't been
updated.

Thank you!

Regards,
Christos Bisias
