Hi Maximilian,

Thank you very much for your reply and suggestions. That makes sense to me.
> Do you think we could condition the DefaultSlotAssigner based on whether the cluster is a session or an application cluster? We would use the new slot assignment for application clusters. We could do this via an internal configuration option, but I would advise not to add a public one, as we have too many already.

In my limited reading, perhaps we could use the 'execution.target' configuration of the running cluster to make such a determination. The value of 'execution.target' in each of the following cases is:

- 0). ${flink deployment mode} -> ${the value of 'execution.target'}
- 1). yarn-application -> embedded
- 2). local application mode -> embedded
- 3). k8s-application -> embedded
- 4). yarn-per-job -> yarn-per-job
- 5). k8s-session -> kubernetes-session
- 6). yarn-session -> yarn-session
- 7). standalone session -> local
- 8). local-minicluster -> local

For items 1), 2), 3), and 4), using the new slot prioritization strategy mentioned previously may be a good option (a few rough sketches of what this could look like are attached at the end of this mail, after the quoted thread). If I'm wrong, please feel free to correct me, and I would greatly appreciate it if you could provide more information.

Looking forward to your reply.

Best,
Yuepeng Pan

At 2025-01-10 17:12:21, "Maximilian Michels" <m...@apache.org> wrote:
># Recap
>
>The current slot assignment strategy via DefaultSlotAssigner is to pseudo-randomly assign the available TM slots. That works fine in the following scenarios:
>
>1. The number of TMs remains constant
>2. There is only a single slot per TaskManager
>
>As soon as we dynamically modify the job resource requirements via the AdaptiveScheduler, the current slot assignment strategy makes it nearly impossible to have TaskManagers without used slots, which makes scaling down the number of TaskManagers very unpredictable and in many cases impossible.
>
>The solution in https://github.com/apache/flink/pull/25218/files sorts the TaskManagers by least available slots. There were concerns raised that in session clusters this would result in more crowded clusters, due to tasks being less spread out. I agree that we probably don't want to change this behavior in 1.x for session clusters.
>
># Proposal
>
>@Yuepeng Do you think we could condition the DefaultSlotAssigner based on whether the cluster is a session or an application cluster? We would use the new slot assignment for application clusters. We could do this via an internal configuration option, but I would advise not to add a public one, as we have too many already.
>
>-Max
>
>On Tue, Jan 7, 2025 at 8:22 AM Yuepeng Pan <panyuep...@apache.org> wrote:
>>
>> Thanks Max and Rui for the reply and clarification.
>>
>> IIUC, would setting the slot assignment strategy of DefaultSlotAssigner to prioritize using the minimum number of TaskManagers by default solve the problem?
>>
>> I'd appreciate your confirmation.
>>
>> Best,
>> Yuepeng Pan
>>
>> At 2025-01-07 10:16:07, "Rui Fan" <1996fan...@gmail.com> wrote:
>> >Happy new year! And thanks Matthias, Yuepeng and Max for your comments!
>> >
>> >For the reference to FLIP-138[1] from Matthias:
>> >
>> >As FLIP-138 mentioned:
>> >
>> >> In a future version, we might think about letting the ResourceManager balance resources across jobs.
>> >
>> >I agree with this, balancing resources might be needed only when a flink cluster has multiple jobs (in session mode).
>> >
>> >For Yuepeng's summary:
>> >
>> >> Please let me make a brief summary based on the historical comments:
>> >> - It's agreed to optimize/fix this issue in the 1.x LTS versions.
>> >> - The primary goal of this optimization/fix is to minimize the number of TaskManagers used in application mode.
>> >> - The optimized logic should be controlled via a parameter.
>> >
>> >IIUC, the second and third points are in conflict. The second point means the goal is to fix it in application mode, but the third point might be needed only in session mode. If we introduce a new option to balance resources in the future, it's better to only take effect in the session mode. And the new option could be ignored in the application mode.
>> >
>> >So I'm not sure whether we will fix this issue in flink 1.x for both application mode and session mode?
>> >
>> >Generally, I'm +1 for Max's suggestion of application mode.
>> >
>> >Please correct me if I misunderstand anything.
>> >
>> >[1] https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=158873338#FLIP138:DeclarativeResourcemanagement-Howtodistributeslotsacrossdifferentjobs
>> >
>> >Best,
>> >Rui
>> >
>> >On Tue, Jan 7, 2025 at 1:52 AM Maximilian Michels <m...@apache.org> wrote:
>> >
>> >> Thanks Yuepeng for your work on this issue!
>> >>
>> >> I would advise not to add yet another config option to Flink. In application mode, the scheduler should default to using the least amount of resources required. From my perspective, the current behavior is a bug and it doesn't help that we can come up with scenarios where the current behavior may be more optimal (e.g. local state recovery turned on). Ultimately, it's not what users expect and we don't need another configuration option that users can set. We need sane defaults and I would strongly suggest that we fix the current default, especially because there aren't any drawbacks for existing users.
>> >>
>> >> -Max
>> >>
>> >> On Mon, Jan 6, 2025 at 7:56 AM Yuepeng Pan <panyuep...@apache.org> wrote:
>> >>
>> >> > Thank you Matthias and all for the feedback and suggestions.
>> >> >
>> >> > Please let me make a brief summary based on the historical comments:
>> >> > - It's agreed to optimize/fix this issue in the 1.x LTS versions.
>> >> > - The primary goal of this optimization/fix is to minimize the number of TaskManagers used in application mode.
>> >> > - The optimized logic should be controlled via a parameter.
>> >> >
>> >> > I'd like to introduce the following parameter to control whether the optimized logic should be enabled:
>> >> > - Name: jobmanager.adaptive-scheduler.resource.minimal-taskmanagers-preferred
>> >> > - Type: boolean
>> >> > - Default value: false
>> >> > - Description: This parameter defines whether the adaptive scheduler prioritizes using the minimum number of TaskManagers when scheduling tasks.
>> >> >   Note: This parameter is currently suitable only for cases where execution.state-recovery.from-local is disabled.
>> >> >
>> >> > BTW, I'm uncertain whether the introduction of a parameter for this specific fix necessitates documentation via a FLIP.
>> >> > If so, I'm willing to initiate a FLIP to aid in subsequent tasks.
>> >> > If not, I will add this email to the corresponding Jira ticket's comments for tracking and start the work on the MR.
>> >> >
>> >> > Any suggestion would be appreciated!
>> >> >
>> >> > Thank you!
>> >> >
>> >> > Best,
>> >> > Yuepeng Pan
>> >> >
>> >> > On 2025/01/05 18:41:11 Matthias Pohl wrote:
>> >> > > Hi everyone and sorry for the late reply. I was mostly off in November and forgot about that topic in December last year.
>> >> > >
>> >> > > Thanks for summarizing and bringing up user feedback. I see the problem and agree with your view that it's a topic that we might want to address in the 1.x LTS version. I see how this can be labeled as a bug or a feature depending on the perspective. I think adding this behavior while being guarded by a feature flag/configuration parameter in the 1.x LTS version is reasonable.
>> >> > >
>> >> > > Best,
>> >> > > Matthias
>> >> > >
>> >> > > [1] https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=158873338#FLIP138:DeclarativeResourcemanagement-Howtodistributeslotsacrossdifferentjobs
>> >> > >
>> >> > > On Wed, Nov 6, 2024 at 9:21 AM Rui Fan <1996fan...@gmail.com> wrote:
>> >> > >
>> >> > > > Thanks Yuepeng for the PR and starting this discussion!
>> >> > > >
>> >> > > > And thanks Gyula and Yuanfeng for the input!
>> >> > > >
>> >> > > > I also agree to fix this behaviour in the 1.x line.
>> >> > > >
>> >> > > > The adaptive scheduler and rescaling API provide powerful capabilities to increase or decrease parallelism.
>> >> > > >
>> >> > > > The main benefit I understand of decreasing parallelism is saving resources. If decreasing parallelism can't save resources, why do users decrease it? This is why I think releasing TM resources when decreasing parallelism is a basic capability that the Adaptive Scheduler should have.
>> >> > > >
>> >> > > > Please correct me if I missed anything, thanks~
>> >> > > >
>> >> > > > Also, I believe it does not work as the user expects, because this behaviour was reported multiple times in the Flink community, such as: FLINK-33977[1], FLINK-35594[2], FLINK-35903[3] and the Slack channel[4]. And 1.20.x is an LTS version, so I agree to fix it in the 1.x line.
>> >> > > >
>> >> > > > [1] https://issues.apache.org/jira/browse/FLINK-33977
>> >> > > > [2] https://issues.apache.org/jira/browse/FLINK-35594
>> >> > > > [3] https://issues.apache.org/jira/browse/FLINK-35903
>> >> > > > [4] https://apache-flink.slack.com/archives/C03G7LJTS2G/p1729167222445569
>> >> > > >
>> >> > > > Best,
>> >> > > > Rui
>> >> > > >
>> >> > > > On Wed, Nov 6, 2024 at 4:15 PM yuanfeng hu <yuanf...@apache.org> wrote:
>> >> > > >
>> >> > > >> > Is it considered an error if the adaptive scheduler fails to release the task manager during scaling?
>> >> > > >>
>> >> > > >> +1. When we enable adaptive mode and perform scaling operations on tasks, a significant part of the goal is to reduce resource usage for the tasks. However, due to some logic in the adaptive scheduler's scheduling process, the task manager cannot be released, and the ultimate goal cannot be achieved. Therefore, I consider this to be a mistake.
>> >> > > >>
>> >> > > >> Additionally, many tasks are currently running in this mode and will continue to run for quite a long time (many users are in this situation). So whether or not it is considered a bug, I believe we need to fix it in the 1.x version.
>> >> > > >>
>> >> > > >> Yuepeng Pan <panyuep...@apache.org> wrote on Wed, Nov 6, 2024 at 14:32:
>> >> > > >>
>> >> > > >> > Hi, community.
>> >> > > >> >
>> >> > > >> > When working on ticket[1] we have received some lively discussions and valuable feedback[2] (thanks to Matthias, Rui, Gyula, Maximilian, Tison, etc.). The main issues are:
>> >> > > >> >
>> >> > > >> > When the job runs in an application cluster, could the default behavior of the AdaptiveScheduler of not actively releasing TaskManager resources during downscaling be considered a bug?
>> >> > > >> >
>> >> > > >> > If so, should we fix it in Flink 1.x?
>> >> > > >> >
>> >> > > >> > I'd like to start a discussion to hear more comments about it to define the next step, and I have sorted out some information in the doc[3] regarding this discussion for you.
>> >> > > >> >
>> >> > > >> > Looking forward to your comments and attention.
>> >> > > >> >
>> >> > > >> > Thank you.
>> >> > > >> >
>> >> > > >> > Best,
>> >> > > >> > Yuepeng Pan
>> >> > > >> >
>> >> > > >> > [1] https://issues.apache.org/jira/browse/FLINK-33977
>> >> > > >> > [2] https://github.com/apache/flink/pull/25218#issuecomment-2401913141
>> >> > > >> > [3] https://docs.google.com/document/d/1Rwwl2aGVz9g5kUJFMP5GMlJwzEO_a-eo4gPf7gITpdw/edit?tab=t.0#heading=h.s4i4hehbbli5
>> >> > > >>
>> >> > > >> --
>> >> > > >> Best,
>> >> > > >> Yuanfeng
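
Sketches mentioned above (rough, untested illustrations only):

First, the 'execution.target' check. The option key and the 'embedded' / 'yarn-per-job' values come from the table earlier in this mail, but the helper class and how it would be wired into the DefaultSlotAssigner are only assumptions on my side, not actual Flink code.

import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.DeploymentOptions;

// Hypothetical helper, not part of Flink: decides whether the
// TaskManager-minimizing slot assignment should be used.
public final class SlotAssignerChoice {

    // 'execution.target' values that imply a dedicated application/per-job cluster,
    // i.e. items 1)-4) from the table above.
    private static boolean isApplicationLikeTarget(String target) {
        return "embedded".equals(target)          // yarn/k8s/local application mode
                || "yarn-per-job".equals(target); // legacy per-job mode
    }

    public static boolean preferMinimalTaskManagers(Configuration conf) {
        // DeploymentOptions.TARGET is the ConfigOption behind 'execution.target'.
        String target = conf.get(DeploymentOptions.TARGET);
        return isApplicationLikeTarget(target);
    }
}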
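
Second, for the "sort the TaskManagers by least available slots" behavior from Max's recap, the core idea could look roughly like the following. The types are simplified stand-ins I made up for illustration; this is not the code from PR 25218.

import java.util.Comparator;
import java.util.List;

// Simplified stand-in for a TaskManager and its number of free slots.
record TaskManagerFreeSlots(String taskManagerId, int freeSlots) {}

final class LeastFreeSlotsFirst {
    // Order TaskManagers so that the ones with the fewest free slots are filled first.
    // New slots then pile up on already-used TaskManagers, leaving the rest fully idle
    // so they can be released when the job scales down.
    static void orderForAssignment(List<TaskManagerFreeSlots> taskManagers) {
        taskManagers.sort(Comparator.comparingInt(TaskManagerFreeSlots::freeSlots));
    }
}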
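
Third, if we do end up guarding the behavior with the option proposed earlier in the thread, its declaration could look as follows. The key, type and default value are the ones from the proposal; the containing class is only a placeholder chosen for the sketch.

import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;

// Placeholder class; in Flink the option would live next to the other
// jobmanager.adaptive-scheduler.* options.
public final class AdaptiveSchedulerResourceOptions {

    public static final ConfigOption<Boolean> MINIMAL_TASKMANAGERS_PREFERRED =
            ConfigOptions
                    .key("jobmanager.adaptive-scheduler.resource.minimal-taskmanagers-preferred")
                    .booleanType()
                    .defaultValue(false)
                    .withDescription(
                            "Whether the adaptive scheduler prioritizes using the minimum number of "
                                    + "TaskManagers when scheduling tasks. Currently only suitable when "
                                    + "execution.state-recovery.from-local is disabled.");
}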