Please please please: Give us DB benchmark figures. Almost nothing else matters if this hits DBs too hard.
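Even a crude static benchmark would go a long way here. Something along the lines of the sketch below, run against a scratch database seeded with a realistic number of dag runs and task instances, would do for a first pass (the DSN and the query file are placeholders, and the EXPLAIN step makes it Postgres-only):

    import time
    from sqlalchemy import create_engine, text

    # Placeholders: a scratch DB seeded with representative data, and the
    # candidate TI-fetch query pasted into a file.
    DSN = "postgresql+psycopg2://airflow:airflow@localhost:5432/airflow"
    candidate_sql = open("candidate_ti_query.sql").read()

    engine = create_engine(DSN)
    with engine.connect() as conn:
        # One run with the full plan and buffer statistics (Postgres-only).
        for row in conn.execute(text("EXPLAIN (ANALYZE, BUFFERS) " + candidate_sql)):
            print(row[0])

        # Crude wall-clock average over repeated executions.
        runs = 50
        start = time.perf_counter()
        for _ in range(runs):
            conn.execute(text(candidate_sql)).fetchall()
        print(f"avg: {(time.perf_counter() - start) / runs * 1000:.1f} ms/query")

Running the same harness on the old and the new query, and on MySQL with the EXPLAIN step dropped, would already tell us whether the two are in the same ballpark.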
What about Daniel’s idea in the slack about adding a “last_scheduling_decision” or similar to TaskInstance too, and order by that so that we don’t just repeatedly hit the same TI? On the surface, that seems like a much more maintainable solution with 90-99% of the same net effect (that TI's that can’t get queued don’t starve out everything else) without a complex query and possible DB load, not to mention possible big differences between mysql and Postgres. > On 8 Aug 2025, at 11:34, asquator <asqua...@proton.me.INVALID> wrote: > > #53492 status update: > > We optimized the query significantly by abandoning the nesting of window > functions and joining on TI.id in the outer query. It's still a heavier query > than we had before for fetching tasks (with lighter python work), so > benchmarking the DB is required, but it's somewhat difficult because there > don't seem to be unified, agreed upon workloads for benchmarking the > scheduler. Running dynamic benchmarks with multiple deployments as suggested > by @ashb is challenging too due to high resource requirements. I remember > there was an AIP-59 for similar cases, but I'm not sure if it's fit here. > We're open for any suggestions as to how to advance. > > Regarding #54103, I think it's difficult to maintain and extend just because > it solves just one out of four possible starvation cases. If it's merged, the > TI per DAG issue is solved forever, but TI per pool starvation (issue #45636) > will still be present. What this PR does is computing a lateral join on DAG > runs and ensuring the query never fetches more TIs for a DAG run than it can > run. It's roughly equivalent to one of the window functions in #53492. If we > want to also solve pool starvation, we'll have to add another lateral join. > It's all the same except performance, but let's be creative - we can reduce > the performance overhead of #53492 to that of #54103 by simply computing > only one window per scheduler iteration, and it can be done in a round-robin > fashion (for windows), such that every #concurrency_limits = 4 iterations we > break starvation for at least one of the limits. This way we have the same > performance but solve all the cases, at least once in 4 iterations. If we see > that windows don't cause a big overhead, we can run two of them in every > iteration. This can be another configuration called scheduler.optimism_level > that defines how many window functions we include, while handling other > limits optimistically. This requires lots of coding and testing, but the idea > is clear. > > I say we should handle all the starvation issues in a comprehensive, > encompassing logic change for the scheduler. > > > On Thursday, August 7th, 2025 at 4:43 AM, Christos Bisias > <christos...@gmail.com> wrote: > >>> You're talking about https://github.com/apache/airflow/pull/53492/ right? >> >> >> Yes. >> >>> Where is the PR from @Christos? >> >> >> https://github.com/apache/airflow/pull/54103 >> >> >> >> On Wed, Aug 6, 2025 at 23:51 Daniel Standish >> daniel.stand...@astronomer.io.invalid wrote: >> >>>> IMO, the approach on the patch isn't easily maintainable. Most of the >>>> calculations are performed by SQL in a huge query. >>>> It would be my preference to have many smaller queries and do part of the >>>> calculations in python. This will be easier to understand, maintain and >>>> debug in the future. Also, it will be easier to unit test. >>> >>> You're talking about https://github.com/apache/airflow/pull/53492/ right? >>> I agree. 
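(Coming back to the last_scheduling_decision idea at the top of this mail: the ordering change could be roughly as small as the sketch below. The column is hypothetical, it does not exist on TaskInstance today, and the query is a stripped-down stand-in for the real critical-section query; session, max_tis and examined_ids are assumed from the surrounding scheduler code.)

    from sqlalchemy import func, select, update
    from airflow.models.taskinstance import TaskInstance as TI
    from airflow.utils.state import TaskInstanceState

    candidates = (
        select(TI)
        .where(TI.state == TaskInstanceState.SCHEDULED)
        # Least recently considered first, so TIs that could not be queued last
        # time drift to the back of the line instead of being re-examined forever.
        # NULL ordering differs between Postgres and MySQL, so never-considered
        # TIs need an explicit nulls_first() or a non-NULL default.
        .order_by(TI.last_scheduling_decision.asc(), TI.priority_weight.desc())
        .limit(max_tis)
        .with_for_update(skip_locked=True)
    )

    # After a batch has been examined, stamp it so the ordering rotates.
    session.execute(
        update(TI)
        .where(TI.id.in_(examined_ids))
        .values(last_scheduling_decision=func.now())
    )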
I share the skepticism that it must be one big ugly query. At a >>> minimum it needs a lot more work and refinement. Not something that should >>> be merged in the current state, even as experimental. >>> >>> Where is the PR from @Christos? >>> >>> On Wed, Aug 6, 2025 at 12:54 PM Jens Scheffler <j_scheff...@gmx.de.invalid >>> >>> wrote: >>> >>>> Hi, >>>> >>>> I was (until now) not be able to re-read all the Slack discussion and >>>> like to make this latest at the weekend. I also like Jarek fear that the >>>> optimization makes the Scheduler rather hard to maintain. We also had >>>> some points where we_thought_ we can contribute some optimizations >>>> especially for Mapped Tasks and then considered the complexity of Mapped >>>> Task Groups where the Depth-First Strategy would defeat all our drafted >>>> optimizations. So also in our current apporach we are cutting down the >>>> Dags in manageable pieces. >>>> >>>> So far (I believ, but anybody correct me if I am wrong) the scaling was >>>> always documented only with options, no real upper boundary (other than >>>> soft limtis) existing in the code. So the delivered product never >>>> confirmed fixed upper limits. It might be good also to consider that we >>>> document where we know there are natural or structural boundaries. So >>>> hope that I can read more details the next days. >>>> >>>> Jens >>>> >>>> On 06.08.25 10:31, Jarek Potiuk wrote: >>>> >>>>>> My main issue and the topic of this thread, has been that the >>>>>> scheduler >>>>>> does unnecessary work that leads to decreased throughput. My solution >>>>>> has >>>>>> been to limit the results of the query to the dag cap of active tasks >>>>>> that >>>>>> the user has defined. >>>>> >>>>> Yes. I understand that. There are situations that cause this >>>>> "unnecessary >>>>> work" to be excessive and lead to lower performance and more memory >>>>> usage. >>>>> This is quite "normal". No system in the world is optimized for all >>>>> kinds >>>>> of scenarios and sometimes you need to make trade-offs - for example >>>>> lower >>>>> performance and maintainability (and support for MySQL and Postgres as >>>>> Ash >>>>> pointed out in some other threads) which we have to make. There are >>>>> various >>>>> optimisation goals we can chase: optimal performance and no wasted >>>>> resources in certain situations and configurations is one of (many) >>>>> goals >>>>> we might have. Other goals might include: easier maintainability, >>>>> better >>>>> community collaboration, simplicity, less code to maintain, >>>>> testability, >>>>> also (what I mentioned before) sometimes deliberate not handling >>>>> certain >>>>> scenarios and introducing friction might be deliberate decision we >>>>> can >>>>> take in order to push our users in the direction we want them to go. >>>>> Yes. >>>>> As community and maintainers we do not have to always "follow" our >>>>> users >>>>> behaviour - we can (and we often do) educate our users and show them >>>>> better >>>>> ways of doing things. >>>>> >>>>> For example we had a LONG discussion whether to introduce caching of >>>>> Variable values during Dag parsing - because we knew our users are >>>>> often >>>>> using Variables in top-level code of their Dags and this leads to a lot >>>>> of >>>>> waste and high CPU and I/O usage by Dag processor. 
We finally >>>>> implemented >>>>> it as an experimental feature, but it was not at all certain we will - >>>>> we >>>>> had to carefully consider what we are trading in exchange for that >>>>> performance - and whether it's worth it. >>>>> >>>>> Same here - I understand: there are some cases (arguably rather niche - >>>>> with very large Dags) where scheduler does unnecessary processing and >>>>> performance could be improved. Now - we need to understand what >>>>> trade-offs >>>>> we need to make as maintainers and community (including our users) if >>>>> we >>>>> want to address it. We need to know what complexity is involved, >>>>> whether >>>>> it >>>>> will work with Postgres/MySQL and SQlite, whether we will be able to >>>>> continue debugging and testing it. And whether we want to drive away >>>>> our >>>>> user from the modularisation strategy (smaller Dags) that we think >>>>> makes >>>>> more sense than bigger Dags. We have to think about what happens next. >>>>> If >>>>> we make "huge Dags" first-class-citizens, will it mean that we will >>>>> have >>>>> to >>>>> redesign our UI to support them? What should we do when someone opens >>>>> up >>>>> an >>>>> issue "I have this 1000000 task Dag and I cannot open Airflow UI - it >>>>> crashes hard and makes my Airflow instance unusable - please fix it >>>>> ASAP". >>>>> I certainly would like to avoid such a situation to stress our friend >>>>> maintainers who work on UI - so also they should have a say on how >>>>> feasible >>>>> it is to make it "easy" to have "huge Dags" for them. >>>>> >>>>> All those factors should be taken into account when you make a >>>>> "product" >>>>> decision. Performance gains for particular cases is just one of many >>>>> factors to consider - and often not the most important ones. >>>>> >>>>> J. >>>>> >>>>> On Wed, Aug 6, 2025 at 7:34 AM Christos Bisias christos...@gmail.com >>>>> wrote: >>>>> >>>>>> We also have a dag with dynamic task mapping that can grow immensely. >>>>>> >>>>>> I've been looking at https://github.com/apache/airflow/pull/53492. >>>>>> >>>>>> My main issue and the topic of this thread, has been that the >>>>>> scheduler >>>>>> does unnecessary work that leads to decreased throughput. My solution >>>>>> has >>>>>> been to limit the results of the query to the dag cap of active tasks >>>>>> that >>>>>> the user has defined. >>>>>> >>>>>> The patch is more focused on the available pool slots. I get the idea >>>>>> that >>>>>> if we can only examine and queue as many tasks as available slots, >>>>>> then >>>>>> we >>>>>> will be efficiently utilizing the available slots to the max, the >>>>>> throughput will increase and my issue will be solved as well. >>>>>> >>>>>> IMO, the approach on the patch isn't easily maintainable. Most of the >>>>>> calculations are performed by SQL in a huge query. >>>>>> >>>>>> It would be my preference to have many smaller queries and do part of >>>>>> the >>>>>> calculations in python. This will be easier to understand, maintain >>>>>> and >>>>>> debug in the future. Also, it will be easier to unit test. >>>>>> >>>>>> On Tue, Aug 5, 2025 at 10:20 PM Jarek Potiuk ja...@potiuk.com >>>>>> wrote: >>>>>> >>>>>>> Just a comment here - I am also not opposed as well if optimizations >>>>>>> will >>>>>>> be implemented without impacting the more "regular"cases. And - >>>>>>> important - >>>>>>> without adding huge complexity. >>>>>>> >>>>>>> The SQL queries I saw in recent PRs and discussions look both "smart" >>>>>>> and >>>>>>> "scary" at the same time. 
Optimizations like that tend to lead to >>>>>>> obfuscated, difficult to understand and reason code and "smart" >>>>>>> solutions - >>>>>>> sometimes "too smart". And when it ends up with one or two people >>>>>>> only >>>>>>> being able to debug and fix problems connected with those, things >>>>>>> become >>>>>>> a >>>>>>> little hairy. So whatever we do there, it must be not only >>>>>>> "smart" >>>>>>> but >>>>>>> also easy to read and well tested - so that anyone can run the tests >>>>>>> easily >>>>>>> and reproduce potential failure cases. >>>>>>> >>>>>>> And yes I know I am writing this as someone who - for years was the >>>>>>> only >>>>>>> one to understand our complex CI setup. But I think over the last two >>>>>>> years >>>>>>> we are definitely going into, simpler, easier to understand setup and >>>>>>> we >>>>>>> have more people on board who know how to deal with it and I think >>>>>>> that >>>>>>> is >>>>>>> a very good direction we are taking :). And I am sure that when I go >>>>>>> for >>>>>>> my >>>>>>> planned 3 weeks holidays before the summit, everything will work as >>>>>>> smoothly as when I am here - at least. >>>>>>> >>>>>>> Also I think there is quite a difference (when it comes to >>>>>>> scheduling) >>>>>>> when >>>>>>> you have mapped tasks versus "regular tasks". I think Airflow even >>>>>>> currently behaves rather differently in those two different cases, >>>>>>> and >>>>>>> also >>>>>>> it has a well-thought and optimized UI experience to handle thousands >>>>>>> of >>>>>>> them. Also the work of David Blain on Lazy Expandable Task Mapping >>>>>>> will >>>>>>> push the boundaries of what is possible there as well: >>>>>>> https://github.com/apache/airflow/pull/51391. Even if we solve >>>>>>> scheduling >>>>>>> optimization - the UI and ability to monitor such huge Dags is still >>>>>>> likely >>>>>>> not something our UI was designed for. >>>>>>> >>>>>>> And I am fully on board with "splitting to even smaller pieces" and >>>>>>> "modularizing" things - and "modularizing and splitting big Dags into >>>>>>> smaller Dags" feels like precisely what should be done. And I think >>>>>>> it >>>>>>> would be a nice idea to try it and follow and see if you can't >>>>>>> achieve >>>>>>> the >>>>>>> same results without adding complexity. >>>>>>> >>>>>>> J. >>>>>>> >>>>>>> On Tue, Aug 5, 2025 at 8:47 PM Ash Berlin-Taylor a...@apache.org >>>>>>> wrote: >>>>>>> >>>>>>>> Yeah dynamic task mapping is a good case where you could easily end >>>>>>>> up >>>>>>>> with thousands of tasksof in a dag. >>>>>>>> >>>>>>>> As I like to say, Airflow is a broad church and if we’re can >>>>>>>> reasonably >>>>>>>> support diverse workloads without impacting others (either the >>>>>>>> workloads >>>>>>>> out our available to support and maintain etc) then I’m all for it. >>>>>>>> >>>>>>>> In addition to your two items I’d like to add >>>>>>>> >>>>>>>> 3. That it doesn’t increase the db’s CPU disproportionally to the >>>>>>>> increased task throughput >>>>>>>> >>>>>>>>> On 5 Aug 2025, at 19:14, asquator asqua...@proton.me.invalid >>>>>>>>> wrote: >>>>>>>>> I'm glad this issue finally got enough attention and we can move >>>>>>>>> it >>>>>>>>> forward. >>>>>>>>> I took a look at @Christos's patch and it makes sense overall, it's >>>>>>>>> fine >>>>>>>>> for the specific problem they experienced with max_active_tasks >>>>>>>>> limit. 
>>>>>>>>> For those unfamiliar with the core problem, the bug has a plenty of >>>>>>>>> variations where starvation happens due to different concurrency >>>>>>>>> limitations being nearly satiated, which creates the opportunity for >>>>>>>>> the >>>>>>>>> scheduler to pull many tasks and schedule none of them. >>>>>>>>> To reproduce this bug, you need two conditions: >>>>>>>>> 1. Many tasks (>> max_tis) belonging to one "pool", where "pool" is >>>>>>>>> some >>>>>>>>> concurrency limitation of Airflow. Note that originally the bug was >>>>>>>>> discovered in context of task pools (see >>>>>>>>> https://github.com/apache/airflow/issues/45636). >>>>>>>>> 2. The tasks are short enough (or the parallelism is large enough) >>>>>>>>> for >>>>>>>>> the tasks from the nearly starved pool to free some slots in every >>>>>>>>> scheduler's iteration. >>>>>>>>> When we discovered a bug that starved our less prioritized pool, >>>>>>>>> even >>>>>>>>> when the most prioritized pool was almost full (thanks to >>>>>>>>> @nevcohen), >>>>>>>>> we >>>>>>>>> wanted to implement a similar patch @Christos suggested above, but >>>>>>>>> for >>>>>>>>> pools. But then we realized this issue can arise due to limits >>>>>>>>> different >>>>>>>>> from task pools, including: >>>>>>>>> max_active_tasks >>>>>>>>> max_active_tis_per_dag >>>>>>>>> max_active_tis_per_dagrun >>>>>>>>> >>>>>>>>> So we were able to predict the forecoming bug reports for different >>>>>>>>> kinds of starvation, and we started working on the most general >>>>>>>>> solution >>>>>>>>> which is the topic of this discussion. >>>>>>>>> I want to also answer @potiuk regarding "why you need such large >>>>>>>>> DAGs", >>>>>>>>> but I will be brief. >>>>>>>>> Airflow is an advanced tool for scheduling large data operations, >>>>>>>>> and >>>>>>>>> over the years it has pushed to production many features that lead >>>>>>>>> to >>>>>>>>> organizations writing DAGs that contain thousands of tasks. Most >>>>>>>>> prominent >>>>>>>>> one is dynamic task mapping. This feature made us realize we can >>>>>>>>> implement >>>>>>>>> a batching work queue pattern and create a task for every unit we >>>>>>>>> have >>>>>>>>> to >>>>>>>>> process, say it's a file in a specific folder, a path in the >>>>>>>>> filesystem, >>>>>>>>> a >>>>>>>>> pointer to some data stored in object storage, etc. We like to think >>>>>>>>> in >>>>>>>>> terms of splitting the work into many tasks. Is it good? I don't >>>>>>>>> know, >>>>>>>>> but >>>>>>>>> Airflow has already stepped onto this path, and we have to make it >>>>>>>>> technologically possible (if we can). >>>>>>>>> Nevertheless, even if such DAGs are considered too big and >>>>>>>>> splitting >>>>>>>>> them is a good idea (though you still have nothing to do with mapped >>>>>>>>> tasks >>>>>>>>> - we create tens of thousands of them sometimes and expect them to >>>>>>>>> be >>>>>>>>> processed in parallel), this issue does not only address the >>>>>>>>> described >>>>>>>>> case, but many others, including prioritized pools, mapped tasks or >>>>>>>>> max_active_runs starvation on large backfills. >>>>>>>>> The only part that's missing now is measuring query time (static >>>>>>>>> benchmarks) and measuring overall scheduling metrics in production >>>>>>>>> workloads (dynamic benchmarks). >>>>>>>>> We're working hard on this crucial part now. 
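(For anyone less familiar with the batching work-queue pattern mentioned above: in practice it is usually plain dynamic task mapping over a listing, roughly like the toy DAG below, with a made-up path. The expansion can easily reach tens of thousands of task instances, which is exactly the scale being discussed here.)

    import os
    import pendulum
    from airflow.decorators import dag, task

    @dag(schedule=None, start_date=pendulum.datetime(2025, 1, 1), catchup=False)
    def file_work_queue():
        @task
        def list_files() -> list[str]:
            root = "/data/incoming"  # made-up location
            return [os.path.join(root, name) for name in os.listdir(root)]

        @task
        def process(path: str) -> None:
            ...  # the actual unit of work

        # One mapped task instance per file.
        process.expand(path=list_files())

    file_work_queue()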
>>>>>>>>> >>>>>>>>> We'd be happy to have any assistance from the community as regard >>>>>>>>> to >>>>>>>>> the >>>>>>>>> dynamic benchmarks, because every workload is different and it's >>>>>>>>> pretty >>>>>>>>> difficult to simulate the general case in such a hard-to-reproduce >>>>>>>>> issue. >>>>>>>>> We have to make sure that: >>>>>>>>> 1. In a busy workload, the new logic boosts the scheduler's >>>>>>>>> throughput. >>>>>>>>> 2. In a light workload, the nested windowing doesn't significantly >>>>>>>>> slow >>>>>>>>> down the computation. >>>>>>>>> >>>>>>>>>> On Monday, August 4th, 2025 at 9:00 PM, Christos Bisias < >>>>>>>>>> christos...@gmail.com> wrote: >>>>>>>>>> I created a draft PR for anyone interested to take a look at the >>>>>>>>>> code >>>>>>>>>> https://github.com/apache/airflow/pull/54103 >>>>>>>>>> >>>>>>>>>> I was able to demonstrate the issue in the unit test with much >>>>>>>>>> fewer >>>>>>>>>> tasks. >>>>>>>>>> All we need is the tasks brought back by the db query to belong to >>>>>>>>>> the >>>>>>>>>> same >>>>>>>>>> dag_run or dag. This can happen when the first SCHEDULED tasks in >>>>>>>>>> line >>>>>>>>>> to >>>>>>>>>> be examined are at least as many as the number of the tis per >>>>>>>>>> query. >>>>>>>>>> >>>>>>>>>> On Mon, Aug 4, 2025 at 8:37 PM Daniel Standish >>>>>>>>>> daniel.stand...@astronomer.io.invalid wrote: >>>>>>>>>> >>>>>>>>>>>> The configurability was my recommendation for >>>>>>>>>>>> https://github.com/apache/airflow/pull/53492 >>>>>>>>>>>> Given the fact that this change is at the heart of Airflow I >>>>>>>>>>>> think >>>>>>>>>>>> the >>>>>>>>>>>> changes should be experimental where users can switch between >>>>>>>>>>>> different >>>>>>>>>>>> strategies/modes of the scheduler. >>>>>>>>>>>> If and when we have enough data to support that specific option >>>>>>>>>>>> is >>>>>>>>>>>> always >>>>>>>>>>>> better we can make decisions accordingly. >>>>>>>>>>>> Yeah I guess looking at #53492 >>>>>>>>>>>> https://github.com/apache/airflow/pull/53492 it does seem too >>>>>>>>>>>> risky >>>>>>>>>>>> to >>>>>>>>>>>> just change the behavior in airflow without releasing it first as >>>>>>>>>>>> experimental. >>>>>>>>>>> >>>>>>>>>>> I doubt we can get sufficient real world testing without doing >>>>>>>>>>> that. >>>>>>>>>>> So if this is introduced, I think it should just be introduced as >>>>>>>>>>> experimental optimization. And the intention would be that >>>>>>>>>>> ultimately >>>>>>>>>>> there will only be one scheduling mode, and this is just a way to >>>>>>>>>>> test >>>>>>>>>>> this >>>>>>>>>>> out more widely. Not that we are intending to have two scheduling >>>>>>>>>>> code >>>>>>>>>>> paths on a permanent basis. >>>>>>>>>>> >>>>>>>>>>> WDYT >>>>>>>>>>> >>>>>>>>>>> On Mon, Aug 4, 2025 at 12:50 AM Christos Bisias >>>>>>>>>>> christos...@gmail.com >>>>>>>>>>> wrote: >>>>>>>>>>> >>>>>>>>>>>>> So my question to you is: is it impossible, or just demanding >>>>>>>>>>>>> or >>>>>>>>>>>>> difficult >>>>>>>>>>>>> to split your Dags into smaller dags connected with asset aware >>>>>>>>>>>>> scheduling? >>>>>>>>>>>>> Jarek, I'm going to discuss this with the team and I will get >>>>>>>>>>>>> you >>>>>>>>>>>>> an >>>>>>>>>>>>> answer >>>>>>>>>>>>> on that. 
>>>>>>>>>>>> >>>>>>>>>>>> I've shared this again on the thread >>> >>> https://github.com/xBis7/airflow/compare/69ab304ffa3d9b847b7dd0ee90ee6ef100223d66..scheduler-perf-patch >>> >>>>>>>>>>>> I haven't created a PR because this is just a POC and it's also >>>>>>>>>>>> setting a >>>>>>>>>>>> limit per dag. I would like to get feedback on whether it's >>>>>>>>>>>> better >>>>>>>>>>>> to >>>>>>>>>>>> make >>>>>>>>>>>> it per dag or per dag_run. >>>>>>>>>>>> I can create a draft PR if that's helpful and makes it easier to >>>>>>>>>>>> add >>>>>>>>>>>> comments. >>>>>>>>>>>> >>>>>>>>>>>> Let me try to explain the issue better. From a high level >>>>>>>>>>>> overview, >>>>>>>>>>>> the >>>>>>>>>>>> scheduler >>>>>>>>>>>> >>>>>>>>>>>> 1. moves tasks to SCHEDULED >>>>>>>>>>>> 2. runs a query to fetch SCHEDULED tasks from the db >>>>>>>>>>>> 3. examines the tasks >>>>>>>>>>>> 4. moves tasks to QUEUED >>>>>>>>>>>> >>>>>>>>>>>> I'm focusing on step 2 and afterwards. The current code doesn't >>>>>>>>>>>> take >>>>>>>>>>>> into >>>>>>>>>>>> account the max_active_tasks_per_dag. When it runs the query it >>>>>>>>>>>> fetches >>>>>>>>>>>> up to max_tis which is determined here >>>>>>>>>>>> < >>> >>> https://github.com/apache/airflow/blob/main/airflow-core/src/airflow/jobs/scheduler_job_runner.py#L697-L705 >>> >>>>>>>>>>>> . >>>>>>>>>>>> >>>>>>>>>>>> For example, >>>>>>>>>>>> >>>>>>>>>>>> - if the query number is 32 >>>>>>>>>>>> - all 32 tasks in line belong to the same dag, dag1 >>>>>>>>>>>> - we are not concerned how the scheduler picks them >>>>>>>>>>>> - dag1 has max_active_tasks set to 5 >>>>>>>>>>>> >>>>>>>>>>>> The current code will >>>>>>>>>>>> >>>>>>>>>>>> - get 32 tasks from dag1 >>>>>>>>>>>> - start examining them one by one >>>>>>>>>>>> - once 5 are moved to QUEUED, it won't stop, it will keep >>>>>>>>>>>> examining >>>>>>>>>>>> the other 27 but won't be able to queue them because it has >>>>>>>>>>>> reached >>>>>>>>>>>> the >>>>>>>>>>>> limit >>>>>>>>>>>> >>>>>>>>>>>> In the next loop, although we have reached the maximum number of >>>>>>>>>>>> tasks >>>>>>>>>>>> for >>>>>>>>>>>> dag1, the query will fetch again 32 tasks from dag1 to examine >>>>>>>>>>>> them >>>>>>>>>>>> and >>>>>>>>>>>> try to queue them. >>>>>>>>>>>> >>>>>>>>>>>> The issue is that it gets more tasks than it can queue from the >>>>>>>>>>>> db >>>>>>>>>>>> and >>>>>>>>>>>> then >>>>>>>>>>>> examines them all. >>>>>>>>>>>> >>>>>>>>>>>> This all leads to unnecessary processing that builds up and the >>>>>>>>>>>> more >>>>>>>>>>>> load >>>>>>>>>>>> there is on the system, the more the throughput drops for the >>>>>>>>>>>> scheduler >>>>>>>>>>>> and >>>>>>>>>>>> the workers. >>>>>>>>>>>> >>>>>>>>>>>> What I'm proposing is to adjust the query in step 2, to check >>>>>>>>>>>> the >>>>>>>>>>>> max_active_tasks_per_dag >>>>>>>>>>>> >>>>>>>>>>>>> run a query to fetch SCHEDULED tasks from the db >>>>>>>>>>>>> If a dag has already reached the maximum number of tasks in >>>>>>>>>>>>> active >>>>>>>>>>>>> states, >>>>>>>>>>>>> it will be skipped by the query. >>>>>>>>>>>> >>>>>>>>>>>> Don't we already stop examining at that point? I guess there's >>>>>>>>>>>> two >>>>>>>>>>>> things >>>>>>>>>>>> >>>>>>>>>>>>> you might be referring to. One is, which TIs come out of the db >>>>>>>>>>>>> and >>>>>>>>>>>>> into >>>>>>>>>>>>> python, and the other is, what we do in python. Just might be >>>>>>>>>>>>> helpful >>>>>>>>>>>>> to >>>>>>>>>>>>> be clear about the specific enhancements & changes you are >>>>>>>>>>>>> making. 
>>>>>>>>>>>>> I think that if we adjust the query and fetch the right number >>>>>>>>>>>>> of >>>>>>>>>>>>> tasks, >>>>>>>>>>>>> then we won't have to make changes to what is done in python. >>>>>>>>>>>> >>>>>>>>>>>> On Mon, Aug 4, 2025 at 8:01 AM Daniel Standish >>>>>>>>>>>> daniel.stand...@astronomer.io.invalid wrote: >>>>>>>>>>>> >>>>>>>>>>>>> @Christos Bisias >>>>>>>>>>>>> >>>>>>>>>>>>> If you have a very large dag, and its tasks have been >>>>>>>>>>>>> scheduled, >>>>>>>>>>>>> then >>>>>>>>>>>>> the >>>>>>>>>>>>> >>>>>>>>>>>>>> scheduler will keep examining the tasks for queueing, even if >>>>>>>>>>>>>> it >>>>>>>>>>>>>> has >>>>>>>>>>>>>> reached the maximum number of active tasks for that particular >>>>>>>>>>>>>> dag. >>>>>>>>>>>>>> Once >>>>>>>>>>>>>> that fails, then it will move on to examine the scheduled >>>>>>>>>>>>>> tasks >>>>>>>>>>>>>> of >>>>>>>>>>>>>> the >>>>>>>>>>>>>> next >>>>>>>>>>>>>> dag or dag_run in line. >>>>>>>>>>>>>> Can you make this a little more precise? There's some >>>>>>>>>>>>>> protection >>>>>>>>>>>>>> against >>>>>>>>>>>>>> "starvation" i.e. dag runs recently considered should go to the >>>>>>>>>>>>>> back >>>>>>>>>>>>>> of >>>>>>>>>>>>>> the >>>>>>>>>>>>>> line next time. >>>>>>>>>>>>> >>>>>>>>>>>>> Maybe you could clarify why / how that's not working / not >>>>>>>>>>>>> optimal >>>>>>>>>>>>> / >>>>>>>>>>>>> how >>>>>>>>>>>>> to >>>>>>>>>>>>> improve. >>>>>>>>>>>>> >>>>>>>>>>>>> If there are available slots in the pool and >>>>>>>>>>>>> >>>>>>>>>>>>>> the max parallelism hasn't been reached yet, then the >>>>>>>>>>>>>> scheduler >>>>>>>>>>>>>> should >>>>>>>>>>>>>> stop >>>>>>>>>>>>>> processing a dag that has already reached its max capacity of >>>>>>>>>>>>>> active >>>>>>>>>>>>>> tasks. >>>>>>>>>>>>>> If a dag run (or dag) is already at max capacity, it doesn't >>>>>>>>>>>>>> really >>>>>>>>>>>>>> matter >>>>>>>>>>>>>> if there are slots available or parallelism isn't reached -- >>>>>>>>>>>>>> shouldn't >>>>>>>>>>>>>> it >>>>>>>>>>>>>> stop anyway? >>>>>>>>>>>>> >>>>>>>>>>>>> In addition, the number of scheduled tasks picked for >>>>>>>>>>>>> examining, >>>>>>>>>>>>> should >>>>>>>>>>>>> be >>>>>>>>>>>>> >>>>>>>>>>>>>> capped at the number of max active tasks if that's lower than >>>>>>>>>>>>>> the >>>>>>>>>>>>>> query >>>>>>>>>>>>>> limit. If the active limit is 10 and we already have 5 >>>>>>>>>>>>>> running, >>>>>>>>>>>>>> then >>>>>>>>>>>>>> we >>>>>>>>>>>>>> can >>>>>>>>>>>>>> queue at most 5 tasks. In that case, we shouldn't examine more >>>>>>>>>>>>>> than >>>>>>>>>>>>>> that. >>>>>>>>>>>>>> Don't we already stop examining at that point? I guess there's >>>>>>>>>>>>>> two >>>>>>>>>>>>>> things >>>>>>>>>>>>>> you might be referring to. One is, which TIs come out of the db >>>>>>>>>>>>>> and >>>>>>>>>>>>>> into >>>>>>>>>>>>>> python, and the other is, what we do in python. Just might be >>>>>>>>>>>>>> helpful >>>>>>>>>>>>>> to >>>>>>>>>>>>>> be clear about the specific enhancements & changes you are >>>>>>>>>>>>>> making. >>>>>>>>>>>>>> There is already a patch with the changes mentioned above. IMO, >>>>>>>>>>>>>> these >>>>>>>>>>>>>> changes should be enabled/disabled with a config flag and not >>>>>>>>>>>>>> by >>>>>>>>>>>>>> default >>>>>>>>>>>>>> because not everyone has the same needs as us. 
In our testing, >>>>>>>>>>>>>> adding a >>>>>>>>>>>>>> limit on the tasks retrieved from the db requires more >>>>>>>>>>>>>> processing >>>>>>>>>>>>>> on >>>>>>>>>>>>>> the >>>>>>>>>>>>>> query which actually makes things worse when you have multiple >>>>>>>>>>>>>> small >>>>>>>>>>>>>> dags. >>>>>>>>>>>>>> I would like to see a stronger case made for configurability. >>>>>>>>>>>>>> Why >>>>>>>>>>>>>> make >>>>>>>>>>>>>> it >>>>>>>>>>>>>> configurable? If the performance is always better, it should >>>>>>>>>>>>>> not >>>>>>>>>>>>>> be >>>>>>>>>>>>>> made >>>>>>>>>>>>>> configurable. Unless it's merely released as an opt-in >>>>>>>>>>>>>> experimental >>>>>>>>>>>>>> feature. If it is worse in some profiles, let's be clear about >>>>>>>>>>>>>> that. >>>>>>>>>>>>>> I did not read anything after `Here is a simple test case that >>>>>>>>>>>>>> makes the benefits of the improvements noticeable` because, it >>>>>>>>>>>>>> seemed >>>>>>>>>>>>>> rather >>>>>>>>>>>>>> long >>>>>>>>>>>>>> winded detail about a test >>>>>>>>>>>>>> case. A higher level summary might be helpful to your audience. >>>>>>>>>>>>>> Is >>>>>>>>>>>>>> there >>>>>>>>>>>>>> a PR with your optimization. You wrote "there is a patch" but >>>>>>>>>>>>>> did >>>>>>>>>>>>>> not, >>>>>>>>>>>>>> unless I miss something, share it. I would take a look if you >>>>>>>>>>>>>> share >>>>>>>>>>>>>> it >>>>>>>>>>>>>> though. >>>>>>>>>>>>> >>>>>>>>>>>>> Thanks >>>>>>>>>>>>> >>>>>>>>>>>>> On Sun, Aug 3, 2025 at 5:08 PM Daniel Standish < >>>>>>>>>>>>> daniel.stand...@astronomer.io> wrote: >>>>>>>>>>>>> >>>>>>>>>>>>>> Yes Ui is another part of this. >>>>>>>>>>>>>> >>>>>>>>>>>>>> At some point the grid and graph views completely stop making >>>>>>>>>>>>>> sense >>>>>>>>>>>>>> for >>>>>>>>>>>>>> that volume, and another type of view would be required both >>>>>>>>>>>>>> for >>>>>>>>>>>>>> usability >>>>>>>>>>>>>> and performance >>>>>>>>>>>>>> >>>>>>>>>>>>>> On Sun, Aug 3, 2025 at 11:04 AM Jens Scheffler >>>>>>>>>>>>>> j_scheff...@gmx.de.invalid >>>>>>>>>>>>>> wrote: >>>>>>>>>>>>>> >>>>>>>>>>>>>>> Hi, >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> We also have a current demand to have a workflow to execute >>>>>>>>>>>>>>> 10k >>>>>>>>>>>>>>> to >>>>>>>>>>>>>>> 100k >>>>>>>>>>>>>>> tasks. Together with @AutomationDev85 we are working on a >>>>>>>>>>>>>>> local >>>>>>>>>>>>>>> solution >>>>>>>>>>>>>>> because we also saw problems in the Scheduler that are not >>>>>>>>>>>>>>> linearly >>>>>>>>>>>>>>> scaling. And for sure not easy to be fixed. But from our >>>>>>>>>>>>>>> investigation >>>>>>>>>>>>>>> also there are other problems to be considered like UI will >>>>>>>>>>>>>>> also >>>>>>>>>>>>>>> potentially have problems. >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> I am a bit sceptic that PR 49160 completely fixes the >>>>>>>>>>>>>>> problems >>>>>>>>>>>>>>> mentioned >>>>>>>>>>>>>>> here and made some comments. I do not want to stop enthusiasm >>>>>>>>>>>>>>> to >>>>>>>>>>>>>>> fix >>>>>>>>>>>>>>> and >>>>>>>>>>>>>>> improve things but the Scheduler is quite complex and changed >>>>>>>>>>>>>>> need >>>>>>>>>>>>>>> to >>>>>>>>>>>>>>> be >>>>>>>>>>>>>>> made with care. >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> Actually I like the patch >>> >>> https://github.com/xBis7/airflow/compare/69ab304ffa3d9b847b7dd0ee90ee6ef100223d66..scheduler-perf-patch >>> >>>>>>>>>>>>>>> as it just adds some limit preventing scheduler to focus on >>>>>>>>>>>>>>> only >>>>>>>>>>>>>>> one >>>>>>>>>>>>>>> run. 
But complexity is a bit big for a "patch" :-D >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> I'd also propose atm the way that Jarek described and >>>>>>>>>>>>>>> split-up >>>>>>>>>>>>>>> the >>>>>>>>>>>>>>> Dag >>>>>>>>>>>>>>> into multiple parts (divide and conquer) for the moment. >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> Otherwise if there is a concrete demand on such large Dags... >>>>>>>>>>>>>>> we >>>>>>>>>>>>>>> maybe >>>>>>>>>>>>>>> need rather a broader initiative if we want to ensure 10k, >>>>>>>>>>>>>>> 100k, >>>>>>>>>>>>>>> 1M? >>>>>>>>>>>>>>> tasks are supported per Dag. Because depending on the >>>>>>>>>>>>>>> magnitude >>>>>>>>>>>>>>> we >>>>>>>>>>>>>>> strive for different approaches are needed. >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> Jens >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> On 03.08.25 16:33, Daniel Standish wrote: >>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> Definitely an area of the scheduler with some opportunity >>>>>>>>>>>>>>>> for >>>>>>>>>>>>>>>> performance >>>>>>>>>>>>>>>> improvement. >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> I would just mention that, you should also attempt to >>>>>>>>>>>>>>>> include >>>>>>>>>>>>>>>> some >>>>>>>>>>>>>>>> performance testing at load / scale because, window >>>>>>>>>>>>>>>> functions >>>>>>>>>>>>>>>> are >>>>>>>>>>>>>>>> going >>>>>>>>>>>>>>>> to >>>>>>>>>>>>>>>> be more expensive. >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> What happens when you have many dags, many historical dag >>>>>>>>>>>>>>>> runs & >>>>>>>>>>>>>>>> TIs, >>>>>>>>>>>>>>>> lots >>>>>>>>>>>>>>>> of stuff running concurrently. You need to be mindful of the >>>>>>>>>>>>>>>> overall >>>>>>>>>>>>>>>> impact of such a change, and not look only at the time spent >>>>>>>>>>>>>>>> on >>>>>>>>>>>>>>>> scheduling >>>>>>>>>>>>>>>> this particular dag. >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> I did not look at the PRs yet, maybe you've covered this, >>>>>>>>>>>>>>>> but, >>>>>>>>>>>>>>>> it's >>>>>>>>>>>>>>>> important. >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> On Sun, Aug 3, 2025 at 5:57 AM Christos Bisias< >>>>>>>>>>>>>>>> christos...@gmail.com> >>>>>>>>>>>>>>>> wrote: >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> I'm going to review the PR code and test it more thoroughly >>>>>>>>>>>>>>>>> before >>>>>>>>>>>>>>>>> leaving >>>>>>>>>>>>>>>>> a comment. >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> This is my code for reference >>> >>> https://github.com/xBis7/airflow/compare/69ab304ffa3d9b847b7dd0ee90ee6ef100223d66..scheduler-perf-patch >>> >>>>>>>>>>>>>>>>> The current version is setting a limit per dag, across all >>>>>>>>>>>>>>>>> dag_runs. >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> Please correct me if I'm wrong, but the PR looks like it's >>>>>>>>>>>>>>>>> changing >>>>>>>>>>>>>>>>> the way >>>>>>>>>>>>>>>>> that tasks are prioritized to avoid starvation. If that's >>>>>>>>>>>>>>>>> the >>>>>>>>>>>>>>>>> case, >>>>>>>>>>>>>>>>> I'm not >>>>>>>>>>>>>>>>> sure that this is the same issue. My proposal is that, if >>>>>>>>>>>>>>>>> we >>>>>>>>>>>>>>>>> have >>>>>>>>>>>>>>>>> reached >>>>>>>>>>>>>>>>> the max resources assigned to a dag, then stop processing >>>>>>>>>>>>>>>>> its >>>>>>>>>>>>>>>>> tasks >>>>>>>>>>>>>>>>> and >>>>>>>>>>>>>>>>> move on to the next one. I'm not changing how or which >>>>>>>>>>>>>>>>> tasks >>>>>>>>>>>>>>>>> are >>>>>>>>>>>>>>>>> picked. >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> On Sun, Aug 3, 2025 at 3:23 PM asquator<asqua...@proton.me >>>>>>>>>>>>>>>>> .invalid> >>>>>>>>>>>>>>>>> wrote: >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> Thank you for the feedback. 
>>>>>>>>>>>>>>>>>> Please, describe the case with failing limit checks in the >>>>>>>>>>>>>>>>>> PR >>>>>>>>>>>>>>>>>> (DAG's >>>>>>>>>>>>>>>>>> parameters and it's tasks' parameters and what fails to be >>>>>>>>>>>>>>>>>> checked) >>>>>>>>>>>>>>>>>> and >>>>>>>>>>>>>>>>>> we'll try to fix it ASAP before you can test it again. >>>>>>>>>>>>>>>>>> Let's >>>>>>>>>>>>>>>>>> continue >>>>>>>>>>>>>>>>>> the >>>>>>>>>>>>>>>>>> PR-related discussion in the PR itself. >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> On Sunday, August 3rd, 2025 at 2:21 PM, Christos Bisias < >>>>>>>>>>>>>>>>>> christos...@gmail.com> wrote: >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> Thank you for bringing this PR to my attention. >>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> I haven't studied the code but I ran a quick test on the >>>>>>>>>>>>>>>>>>> branch >>>>>>>>>>>>>>>>>>> and >>>>>>>>>>>>>>>>>>> this >>>>>>>>>>>>>>>>>>> completely ignores the limit on scheduled tasks per dag >>>>>>>>>>>>>>>>>>> or >>>>>>>>>>>>>>>>>>> dag_run. >>>>>>>>>>>>>>>>>>> It >>>>>>>>>>>>>>>>>>> grabbed 70 tasks from the first dag and then moved all 70 >>>>>>>>>>>>>>>>>>> to >>>>>>>>>>>>>>>>>>> QUEUED >>>>>>>>>>>>>>>>>>> without >>>>>>>>>>>>>>>>>>> any further checks. >>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> This is how I tested it >>> >>> https://github.com/Asquator/airflow/compare/feature/pessimistic-task-fetching-with-window-function...xBis7:airflow:scheduler-window-function-testing?expand=1 >>> >>>>>>>>>>>>>>>>>>> On Sun, Aug 3, 2025 at 1:44 PM >>>>>>>>>>>>>>>>>>> asquatorasqua...@proton.me >>>>>>>>>>>>>>>>>>> .invalid >>>>>>>>>>>>>>>>>>> wrote: >>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>> Hello, >>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>> This is a known issue stemming from the optimistic >>>>>>>>>>>>>>>>>>>> scheduling >>>>>>>>>>>>>>>>>>>> strategy >>>>>>>>>>>>>>>>>>>> used in Airflow. We do address this in the >>>>>>>>>>>>>>>>>>>> above-mentioned >>>>>>>>>>>>>>>>>>>> PR. I >>>>>>>>>>>>>>>>>>>> want >>>>>>>>>>>>>>>>>>>> to >>>>>>>>>>>>>>>>>>>> note that there are many cases where this problem may >>>>>>>>>>>>>>>>>>>> appear—it >>>>>>>>>>>>>>>>>>>> was >>>>>>>>>>>>>>>>>>>> originally detected with pools, but we are striving to >>>>>>>>>>>>>>>>>>>> fix >>>>>>>>>>>>>>>>>>>> it >>>>>>>>>>>>>>>>>>>> in >>>>>>>>>>>>>>>>>>>> all >>>>>>>>>>>>>>>>>>>> cases, >>>>>>>>>>>>>>>>>>>> such as the one described here with >>>>>>>>>>>>>>>>>>>> max_active_tis_per_dag, >>>>>>>>>>>>>>>>>>>> by >>>>>>>>>>>>>>>>>>>> switching to >>>>>>>>>>>>>>>>>>>> pessimistic scheduling with SQL window functions. While >>>>>>>>>>>>>>>>>>>> the >>>>>>>>>>>>>>>>>>>> current >>>>>>>>>>>>>>>>>>>> strategy simply pulls the max_tis tasks and drops the >>>>>>>>>>>>>>>>>>>> ones >>>>>>>>>>>>>>>>>>>> that >>>>>>>>>>>>>>>>>>>> do >>>>>>>>>>>>>>>>>>>> not >>>>>>>>>>>>>>>>>>>> meet >>>>>>>>>>>>>>>>>>>> the constraints, the new strategy will pull only the >>>>>>>>>>>>>>>>>>>> tasks >>>>>>>>>>>>>>>>>>>> that >>>>>>>>>>>>>>>>>>>> are >>>>>>>>>>>>>>>>>>>> actually ready to be scheduled and that comply with all >>>>>>>>>>>>>>>>>>>> concurrency >>>>>>>>>>>>>>>>>>>> limits. >>>>>>>>>>>>>>>>>>>> It would be very helpful for pushing this change to >>>>>>>>>>>>>>>>>>>> production >>>>>>>>>>>>>>>>>>>> if >>>>>>>>>>>>>>>>>>>> you >>>>>>>>>>>>>>>>>>>> could assist us in alpha-testing it. >>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>> See also: >>>>>>>>>>>>>>>>>>>> https://github.com/apache/airflow/discussions/49160 >>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>> Sent with Proton Mail secure email. 
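(To give a feel for what pessimistic fetching with a window function means in practice, here is a shape-only sketch in SQLAlchemy terms. It is far simpler than the query in #53492: it ranks SCHEDULED TIs per dag run and joins back on TI.id, but it does not subtract already running or queued TIs and it only covers the max_active_tasks limit, not pools; max_tis is assumed from the surrounding scheduler code.)

    from sqlalchemy import func, select
    from airflow.models.dag import DagModel
    from airflow.models.taskinstance import TaskInstance as TI
    from airflow.utils.state import TaskInstanceState

    # Rank SCHEDULED TIs within each dag run.
    rn = (
        func.row_number()
        .over(
            partition_by=(TI.dag_id, TI.run_id),
            order_by=(TI.priority_weight.desc(), TI.map_index),
        )
        .label("rn")
    )

    ranked = (
        select(TI.id, rn)
        .where(TI.state == TaskInstanceState.SCHEDULED)
        .subquery()
    )

    # Only hand the scheduler rows that still fit under the per-dag cap.
    query = (
        select(TI)
        .join(ranked, ranked.c.id == TI.id)            # join back on TI.id
        .join(DagModel, DagModel.dag_id == TI.dag_id)
        .where(ranked.c.rn <= DagModel.max_active_tasks)
        .limit(max_tis)
    )

Extending it to pools or per-task limits means another window per limit, which is where the maintainability and DB-load questions above come from.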
>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>> On Sunday, August 3rd, 2025 at 12:59 PM, Elad Kalif >>>>>>>>>>>>>>>>>>>> elad...@apache.org >>>>>>>>>>>>>>>>>>>> wrote: >>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>> i think most of your issues will be addressed by >>>>>>>>>>>>>>>>>>>>> https://github.com/apache/airflow/pull/53492 >>>>>>>>>>>>>>>>>>>>> The PR code can be tested with Breeze so you can set it >>>>>>>>>>>>>>>>>>>>> up >>>>>>>>>>>>>>>>>>>>> and >>>>>>>>>>>>>>>>>>>>> see >>>>>>>>>>>>>>>>>>>>> if it >>>>>>>>>>>>>>>>>>>>> solves the problem this will also help with confirming >>>>>>>>>>>>>>>>>>>>> it's >>>>>>>>>>>>>>>>>>>>> the >>>>>>>>>>>>>>>>>>>>> right >>>>>>>>>>>>>>>>>>>>> fix. >>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>> On Sun, Aug 3, 2025 at 10:46 AM Christos Bisias >>>>>>>>>>>>>>>>>>>>> christos...@gmail.com >>>>>>>>>>>>>>>>>>>>> wrote: >>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>> Hello, >>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>> The scheduler is very efficient when running a large >>>>>>>>>>>>>>>>>>>>>> amount >>>>>>>>>>>>>>>>>>>>>> of >>>>>>>>>>>>>>>>>>>>>> dags >>>>>>>>>>>>>>>>>>>>>> with up >>>>>>>>>>>>>>>>>>>>>> to 1000 tasks each. But in our case, we have dags with >>>>>>>>>>>>>>>>>>>>>> as >>>>>>>>>>>>>>>>>>>>>> many >>>>>>>>>>>>>>>>>>>>>> as >>>>>>>>>>>>>>>>>>>>>> 10.000 >>>>>>>>>>>>>>>>>>>>>> tasks. And in that scenario the scheduler and worker >>>>>>>>>>>>>>>>>>>>>> throughput >>>>>>>>>>>>>>>>>>>>>> drops >>>>>>>>>>>>>>>>>>>>>> significantly. Even if you have 1 such large dag with >>>>>>>>>>>>>>>>>>>>>> scheduled >>>>>>>>>>>>>>>>>>>>>> tasks, >>>>>>>>>>>>>>>>>>>>>> the >>>>>>>>>>>>>>>>>>>>>> performance hit becomes noticeable. >>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>> We did some digging and we found that the issue comes >>>>>>>>>>>>>>>>>>>>>> from >>>>>>>>>>>>>>>>>>>>>> the >>>>>>>>>>>>>>>>>>>>>> scheduler's >>>>>>>>>>>>>>>>>>>>>> _executable_task_instances_to_queued >>>>>>>>>>>>>>>>>>>>>> < >>> >>> https://github.com/apache/airflow/blob/main/airflow-core/src/airflow/jobs/scheduler_job_runner.py#L293C9-L647 >>> >>>>>>>>>>>>>>>>>>>>>> method. >>>>>>>>>>>>>>>>>>>>>> In particular with the db query here >>>>>>>>>>>>>>>>>>>>>> < >>> >>> https://github.com/apache/airflow/blob/main/airflow-core/src/airflow/jobs/scheduler_job_runner.py#L364-L375 >>> >>>>>>>>>>>>>>>>>>>>>> and >>>>>>>>>>>>>>>>>>>>>> examining the results here >>>>>>>>>>>>>>>>>>>>>> < >>> >>> https://github.com/apache/airflow/blob/main/airflow-core/src/airflow/jobs/scheduler_job_runner.py#L425 >>> >>>>>>>>>>>>>>>>>>>>>> . >>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>> If you have a very large dag, and its tasks have been >>>>>>>>>>>>>>>>>>>>>> scheduled, >>>>>>>>>>>>>>>>>>>>>> then >>>>>>>>>>>>>>>>>>>>>> the >>>>>>>>>>>>>>>>>>>>>> scheduler will keep examining the tasks for queueing, >>>>>>>>>>>>>>>>>>>>>> even >>>>>>>>>>>>>>>>>>>>>> if >>>>>>>>>>>>>>>>>>>>>> it >>>>>>>>>>>>>>>>>>>>>> has >>>>>>>>>>>>>>>>>>>>>> reached the maximum number of active tasks for that >>>>>>>>>>>>>>>>>>>>>> particular >>>>>>>>>>>>>>>>>>>>>> dag. >>>>>>>>>>>>>>>>>>>>>> Once >>>>>>>>>>>>>>>>>>>>>> that fails, then it will move on to examine the >>>>>>>>>>>>>>>>>>>>>> scheduled >>>>>>>>>>>>>>>>>>>>>> tasks >>>>>>>>>>>>>>>>>>>>>> of >>>>>>>>>>>>>>>>>>>>>> the >>>>>>>>>>>>>>>>>>>>>> next >>>>>>>>>>>>>>>>>>>>>> dag or dag_run in line. 
>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>> This is inefficient and causes the throughput of the >>>>>>>>>>>>>>>>>>>>>> scheduler >>>>>>>>>>>>>>>>>>>>>> and >>>>>>>>>>>>>>>>>>>>>> the >>>>>>>>>>>>>>>>>>>>>> workers to drop significantly. If there are available >>>>>>>>>>>>>>>>>>>>>> slots >>>>>>>>>>>>>>>>>>>>>> in >>>>>>>>>>>>>>>>>>>>>> the >>>>>>>>>>>>>>>>>>>>>> pool and >>>>>>>>>>>>>>>>>>>>>> the max parallelism hasn't been reached yet, then the >>>>>>>>>>>>>>>>>>>>>> scheduler >>>>>>>>>>>>>>>>>>>>>> should >>>>>>>>>>>>>>>>>>>>>> stop >>>>>>>>>>>>>>>>>>>>>> processing a dag that has already reached its max >>>>>>>>>>>>>>>>>>>>>> capacity >>>>>>>>>>>>>>>>>>>>>> of >>>>>>>>>>>>>>>>>>>>>> active >>>>>>>>>>>>>>>>>>>>>> tasks. >>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>> In addition, the number of scheduled tasks picked for >>>>>>>>>>>>>>>>>>>>>> examining, >>>>>>>>>>>>>>>>>>>>>> should be >>>>>>>>>>>>>>>>>>>>>> capped at the number of max active tasks if that's >>>>>>>>>>>>>>>>>>>>>> lower >>>>>>>>>>>>>>>>>>>>>> than >>>>>>>>>>>>>>>>>>>>>> the >>>>>>>>>>>>>>>>>>>>>> query >>>>>>>>>>>>>>>>>>>>>> limit. If the active limit is 10 and we already have 5 >>>>>>>>>>>>>>>>>>>>>> running, >>>>>>>>>>>>>>>>>>>>>> then >>>>>>>>>>>>>>>>>>>>>> we can >>>>>>>>>>>>>>>>>>>>>> queue at most 5 tasks. In that case, we shouldn't >>>>>>>>>>>>>>>>>>>>>> examine >>>>>>>>>>>>>>>>>>>>>> more >>>>>>>>>>>>>>>>>>>>>> than >>>>>>>>>>>>>>>>>>>>>> that. >>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>> There is already a patch with the changes mentioned >>>>>>>>>>>>>>>>>>>>>> above. >>>>>>>>>>>>>>>>>>>>>> IMO, >>>>>>>>>>>>>>>>>>>>>> these >>>>>>>>>>>>>>>>>>>>>> changes should be enabled/disabled with a config flag >>>>>>>>>>>>>>>>>>>>>> and >>>>>>>>>>>>>>>>>>>>>> not >>>>>>>>>>>>>>>>>>>>>> by >>>>>>>>>>>>>>>>>>>>>> default >>>>>>>>>>>>>>>>>>>>>> because not everyone has the same needs as us. In our >>>>>>>>>>>>>>>>>>>>>> testing, >>>>>>>>>>>>>>>>>>>>>> adding a >>>>>>>>>>>>>>>>>>>>>> limit on the tasks retrieved from the db requires more >>>>>>>>>>>>>>>>>>>>>> processing >>>>>>>>>>>>>>>>>>>>>> on >>>>>>>>>>>>>>>>>>>>>> the >>>>>>>>>>>>>>>>>>>>>> query which actually makes things worse when you have >>>>>>>>>>>>>>>>>>>>>> multiple >>>>>>>>>>>>>>>>>>>>>> small >>>>>>>>>>>>>>>>>>>>>> dags. >>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>> Here is a simple test case that makes the benefits of >>>>>>>>>>>>>>>>>>>>>> the >>>>>>>>>>>>>>>>>>>>>> improvements >>>>>>>>>>>>>>>>>>>>>> noticeable >>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>> - we have 3 dags with thousands of tasks each >>>>>>>>>>>>>>>>>>>>>> - for simplicity let's have 1 dag_run per dag >>>>>>>>>>>>>>>>>>>>>> - triggering them takes some time and due to that, the >>>>>>>>>>>>>>>>>>>>>> FIFO >>>>>>>>>>>>>>>>>>>>>> order >>>>>>>>>>>>>>>>>>>>>> of >>>>>>>>>>>>>>>>>>>>>> the >>>>>>>>>>>>>>>>>>>>>> tasks is very clear >>>>>>>>>>>>>>>>>>>>>> - e.g. 1000 tasks from dag1 were scheduled first and >>>>>>>>>>>>>>>>>>>>>> then >>>>>>>>>>>>>>>>>>>>>> 200 >>>>>>>>>>>>>>>>>>>>>> tasks >>>>>>>>>>>>>>>>>>>>>> from dag2 etc. 
>>>>>>>>>>>>>>>>>>>>>> - the executor has parallelism=100 and >>>>>>>>>>>>>>>>>>>>>> slots_available=100 >>>>>>>>>>>>>>>>>>>>>> which >>>>>>>>>>>>>>>>>>>>>> means >>>>>>>>>>>>>>>>>>>>>> that it can run up to 100 tasks concurrently >>>>>>>>>>>>>>>>>>>>>> - max_active_tasks_per_dag is 4 which means that we >>>>>>>>>>>>>>>>>>>>>> can >>>>>>>>>>>>>>>>>>>>>> have >>>>>>>>>>>>>>>>>>>>>> up >>>>>>>>>>>>>>>>>>>>>> to >>>>>>>>>>>>>>>>>>>>>> 4 >>>>>>>>>>>>>>>>>>>>>> tasks running per dag. >>>>>>>>>>>>>>>>>>>>>> - For 3 dags, it means that we can run up to 12 tasks >>>>>>>>>>>>>>>>>>>>>> at >>>>>>>>>>>>>>>>>>>>>> the >>>>>>>>>>>>>>>>>>>>>> same >>>>>>>>>>>>>>>>>>>>>> time (4 tasks from each dag) >>>>>>>>>>>>>>>>>>>>>> - max tis per query are set to 32, meaning that we can >>>>>>>>>>>>>>>>>>>>>> examine >>>>>>>>>>>>>>>>>>>>>> up >>>>>>>>>>>>>>>>>>>>>> to 32 >>>>>>>>>>>>>>>>>>>>>> scheduled tasks if there are available pool slots >>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>> If we were to run the scheduler loop repeatedly until >>>>>>>>>>>>>>>>>>>>>> it >>>>>>>>>>>>>>>>>>>>>> queues >>>>>>>>>>>>>>>>>>>>>> 12 >>>>>>>>>>>>>>>>>>>>>> tasks >>>>>>>>>>>>>>>>>>>>>> and test the part that examines the scheduled tasks >>>>>>>>>>>>>>>>>>>>>> and >>>>>>>>>>>>>>>>>>>>>> queues >>>>>>>>>>>>>>>>>>>>>> them, >>>>>>>>>>>>>>>>>>>>>> then >>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>> - with the query limit >>>>>>>>>>>>>>>>>>>>>> - 1 iteration, total time 0.05 >>>>>>>>>>>>>>>>>>>>>> - During the iteration >>>>>>>>>>>>>>>>>>>>>> - we have parallelism 100, available slots 100 and >>>>>>>>>>>>>>>>>>>>>> query >>>>>>>>>>>>>>>>>>>>>> limit >>>>>>>>>>>>>>>>>>>>>> 32 >>>>>>>>>>>>>>>>>>>>>> which means that it will examine up to 32 scheduled >>>>>>>>>>>>>>>>>>>>>> tasks >>>>>>>>>>>>>>>>>>>>>> - it can queue up to 100 tasks >>>>>>>>>>>>>>>>>>>>>> - examines 12 tasks (instead of 32) >>>>>>>>>>>>>>>>>>>>>> - 4 tasks from dag1, reached max for the dag >>>>>>>>>>>>>>>>>>>>>> - 4 tasks from dag2, reached max for the dag >>>>>>>>>>>>>>>>>>>>>> - and 4 tasks from dag3, reached max for the dag >>>>>>>>>>>>>>>>>>>>>> - queues 4 from dag1, reaches max for the dag and >>>>>>>>>>>>>>>>>>>>>> moves >>>>>>>>>>>>>>>>>>>>>> on >>>>>>>>>>>>>>>>>>>>>> - queues 4 from dag2, reaches max for the dag and >>>>>>>>>>>>>>>>>>>>>> moves >>>>>>>>>>>>>>>>>>>>>> on >>>>>>>>>>>>>>>>>>>>>> - queues 4 from dag3, reaches max for the dag and >>>>>>>>>>>>>>>>>>>>>> moves >>>>>>>>>>>>>>>>>>>>>> on >>>>>>>>>>>>>>>>>>>>>> - stops queueing because we have reached the maximum >>>>>>>>>>>>>>>>>>>>>> per >>>>>>>>>>>>>>>>>>>>>> dag, >>>>>>>>>>>>>>>>>>>>>> although there are slots for more tasks >>>>>>>>>>>>>>>>>>>>>> - iteration finishes >>>>>>>>>>>>>>>>>>>>>> - without >>>>>>>>>>>>>>>>>>>>>> - 3 iterations, total time 0.29 >>>>>>>>>>>>>>>>>>>>>> - During iteration 1 >>>>>>>>>>>>>>>>>>>>>> - Examines 32 tasks, all from dag1 (due to FIFO) >>>>>>>>>>>>>>>>>>>>>> - queues 4 from dag1 and tries to queue the other 28 >>>>>>>>>>>>>>>>>>>>>> but >>>>>>>>>>>>>>>>>>>>>> fails >>>>>>>>>>>>>>>>>>>>>> - During iteration 2 >>>>>>>>>>>>>>>>>>>>>> - examines the next 32 tasks from dag1 >>>>>>>>>>>>>>>>>>>>>> - it can't queue any of them because it has reached >>>>>>>>>>>>>>>>>>>>>> the >>>>>>>>>>>>>>>>>>>>>> max >>>>>>>>>>>>>>>>>>>>>> for >>>>>>>>>>>>>>>>>>>>>> dag1, since the previous 4 are still running >>>>>>>>>>>>>>>>>>>>>> - examines 32 tasks from dag2 >>>>>>>>>>>>>>>>>>>>>> - queues 4 from dag2 and tries to queue the other 28 >>>>>>>>>>>>>>>>>>>>>> but >>>>>>>>>>>>>>>>>>>>>> 
fails >>>>>>>>>>>>>>>>>>>>>> - During iteration 3 >>>>>>>>>>>>>>>>>>>>>> - examines the next 32 tasks from dag1, same tasks >>>>>>>>>>>>>>>>>>>>>> that >>>>>>>>>>>>>>>>>>>>>> were >>>>>>>>>>>>>>>>>>>>>> examined in iteration 2 >>>>>>>>>>>>>>>>>>>>>> - it can't queue any of them because it has reached >>>>>>>>>>>>>>>>>>>>>> the >>>>>>>>>>>>>>>>>>>>>> max >>>>>>>>>>>>>>>>>>>>>> for >>>>>>>>>>>>>>>>>>>>>> dag1 and the first 4 are still running >>>>>>>>>>>>>>>>>>>>>> - examines 32 tasks from dag2 , can't queue any of >>>>>>>>>>>>>>>>>>>>>> them >>>>>>>>>>>>>>>>>>>>>> because >>>>>>>>>>>>>>>>>>>>>> it has reached max for dag2 as well >>>>>>>>>>>>>>>>>>>>>> - examines 32 tasks from dag3 >>>>>>>>>>>>>>>>>>>>>> - queues 4 from dag3 and tries to queue the other 28 >>>>>>>>>>>>>>>>>>>>>> but >>>>>>>>>>>>>>>>>>>>>> fails >>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>> I used very low values for all the configs so that I >>>>>>>>>>>>>>>>>>>>>> can >>>>>>>>>>>>>>>>>>>>>> make >>>>>>>>>>>>>>>>>>>>>> the >>>>>>>>>>>>>>>>>>>>>> point >>>>>>>>>>>>>>>>>>>>>> clear and easy to understand. If we increase them, >>>>>>>>>>>>>>>>>>>>>> then >>>>>>>>>>>>>>>>>>>>>> this >>>>>>>>>>>>>>>>>>>>>> patch >>>>>>>>>>>>>>>>>>>>>> also >>>>>>>>>>>>>>>>>>>>>> makes the task selection more fair and the resource >>>>>>>>>>>>>>>>>>>>>> distribution >>>>>>>>>>>>>>>>>>>>>> more >>>>>>>>>>>>>>>>>>>>>> even. >>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>> I would appreciate it if anyone familiar with the >>>>>>>>>>>>>>>>>>>>>> scheduler's >>>>>>>>>>>>>>>>>>>>>> code >>>>>>>>>>>>>>>>>>>>>> can >>>>>>>>>>>>>>>>>>>>>> confirm this and also provide any feedback. >>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>> Additionally, I have one question regarding the query >>>>>>>>>>>>>>>>>>>>>> limit. >>>>>>>>>>>>>>>>>>>>>> Should it >>>>>>>>>>>>>>>>>>>>>> be >>>>>>>>>>>>>>>>>>>>>> per dag_run or per dag? I've noticed that >>>>>>>>>>>>>>>>>>>>>> max_active_tasks_per_dag >>>>>>>>>>>>>>>>>>>>>> has >>>>>>>>>>>>>>>>>>>>>> been changed to provide a value per dag_run but the >>>>>>>>>>>>>>>>>>>>>> docs >>>>>>>>>>>>>>>>>>>>>> haven't >>>>>>>>>>>>>>>>>>>>>> been >>>>>>>>>>>>>>>>>>>>>> updated. >>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>> Thank you! 
>>>>>>>>>>>>>>>>>>>>>> Regards,
>>>>>>>>>>>>>>>>>>>>>> Christos Bisias

---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscr...@airflow.apache.org
For additional commands, e-mail: dev-h...@airflow.apache.org