Hey Yuepeng, I think that would work.

Thanks,
Max

On Sun, Jan 12, 2025 at 3:42 PM Yuepeng Pan <panyuep...@apache.org> wrote:
>
> Hi Maximilian,
>
> Thank you very much for your reply and suggestions. That makes sense to me.
>
> > Do you think we could condition the DefaultSlotAssigner based
> > on whether the cluster is a session or an application cluster? We
> > would use the new slot assignment for application clusters. We could
> > do this via an internal configuration option, but I would advise not
> > to add a public one, as we have too many already.
>
> In my limited reading, perhaps we could use the 'execution.target'
> configuration in the running cluster to make such a determination.
>
> The value of 'execution.target' in the following cases:
>
> - 0). ${flink deployment mode} -> ${the value of 'execution.target'}
> - 1). yarn-application -> embedded
> - 2). local application mode -> embedded
> - 3). k8s-application -> embedded
> - 4). yarn-per-job -> yarn-per-job
> - 5). k8s-session -> kubernetes-session
> - 6). yarn-session -> yarn-session
> - 7). standalone session -> local
> - 8). local-minicluster -> local
>
> For items 1), 2), 3), and 4), using the new slot prioritization strategy
> mentioned previously may be a good option.
> If I'm wrong, please feel free to correct me.
> And I would greatly appreciate it if you could provide more information.
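>
> To make the idea a bit more concrete, a very rough sketch, only for
> illustration: the class and method below are placeholders rather than
> existing scheduler code, and only 'execution.target' /
> DeploymentOptions.TARGET is a real option.
>
> import org.apache.flink.configuration.Configuration;
> import org.apache.flink.configuration.DeploymentOptions;
>
> /** Illustrative sketch only, not actual AdaptiveScheduler code. */
> public final class MinimalTaskManagersCheck {
>
>     /**
>      * Returns true for the application-style targets listed above
>      * (items 1) - 4)), where the new slot prioritization could apply.
>      */
>     static boolean preferMinimalTaskManagers(Configuration conf) {
>         // 'execution.target' as set in the running cluster's configuration.
>         final String target = conf.get(DeploymentOptions.TARGET);
>         return "embedded".equals(target) || "yarn-per-job".equals(target);
>     }
> }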
>
> Looking forward to your reply.
>
> Best,
> Yuepeng Pan
>
>
> At 2025-01-10 17:12:21, "Maximilian Michels" <m...@apache.org> wrote:
> >
> > # Recap
> >
> > The current slot assignment strategy via DefaultSlotAssigner is to
> > pseudo-randomly assign the available TM slots. That works fine in the
> > following scenarios:
> >
> > 1. The number of TMs remains constant
> > 2. There is only a single slot per TaskManager
> >
> > As soon as we dynamically modify the job resource requirements via the
> > AdaptiveScheduler, the current slot assignment strategy makes it near
> > impossible to have TaskManagers without used slots, which makes
> > scaling down the number of TaskManagers very unpredictable and in many
> > cases impossible.
> >
> > The solution in https://github.com/apache/flink/pull/25218/files sorts
> > the TaskManagers by least available slots. There were concerns raised
> > that in session clusters, this would result in more clogged clusters,
> > due to tasks being less spread out. I agree that we probably don't
> > want to change this behavior in 1.x for session clusters.
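> >
> > Roughly, the sorting idea amounts to something like the following. This
> > is a simplified sketch with placeholder types, not the actual PR code:
> >
> > import java.util.Comparator;
> > import java.util.List;
> >
> > /** Simplified illustration of "fill the most-used TaskManagers first". */
> > final class LeastAvailableSlotsFirst {
> >
> >     /** Placeholder type; the real code works on the scheduler's internal structures. */
> >     record TaskManagerInfo(String resourceId, int availableSlots) {}
> >
> >     static void sortForAssignment(List<TaskManagerInfo> taskManagers) {
> >         // TaskManagers with the fewest free slots come first, so slots are
> >         // packed onto already-used TaskManagers during assignment.
> >         taskManagers.sort(Comparator.comparingInt(TaskManagerInfo::availableSlots));
> >     }
> > }
> >
> > With that ordering, the emptiest TaskManagers tend to stay completely
> > unused and can eventually be released when scaling down.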
> >
> > # Proposal
> >
> > @Yuepeng Do you think we could condition the DefaultSlotAssigner based
> > on whether the cluster is a session or an application cluster? We
> > would use the new slot assignment for application clusters. We could
> > do this via an internal configuration option, but I would advise not
> > to add a public one, as we have too many already.
> >
> > -Max
> >
> > On Tue, Jan 7, 2025 at 8:22 AM Yuepeng Pan <panyuep...@apache.org> wrote:
> > >
> > > Thanks Max and Rui for the reply and clarification.
> > >
> > > IIUC, would setting the slot assignment strategy of
> > > DefaultSlotAssigner to prioritize using the minimum number of
> > > TaskManagers by default solve the problem?
> > >
> > > I'd appreciate your confirmation.
> > >
> > > Best,
> > > Yuepeng Pan
> > >
> > > At 2025-01-07 10:16:07, "Rui Fan" <1996fan...@gmail.com> wrote:
> > > > Happy new year! And thanks Matthias, Yuepeng and Max for your comments!
> > > >
> > > > For the reference to FLIP-138[1] from Matthias:
> > > >
> > > > As FLIP-138 mentioned:
> > > >
> > > > > In a future version, we might think about letting the ResourceManager
> > > > > balance resources across jobs.
> > > >
> > > > I agree with this, balancing resources might be needed only
> > > > when a Flink cluster has multiple jobs (in session mode).
> > > >
> > > > For Yuepeng's summary:
> > > >
> > > > > Please let me make a brief summary based on the historical comments:
> > > > > - It's agreed to optimize/fix this issue in the 1.x LTS versions.
> > > > > - The primary goal of this optimization/fix is to minimize the number
> > > > >   of TaskManagers used in application mode.
> > > > > - The optimized logic should be controlled via a parameter.
> > > >
> > > > IIUC, the second and third points are in conflict. The second point
> > > > means the goal is to fix it in application mode, but the third point
> > > > might be needed only in session mode. If we introduce a new option
> > > > to balance resources in the future, it's better to only take effect
> > > > in session mode. And the new option could be ignored in
> > > > application mode.
> > > >
> > > > So I'm not sure whether we will fix this issue in Flink 1.x for both
> > > > application mode and session mode?
> > > >
> > > > Generally, I'm +1 for Max's suggestion of application mode.
> > > >
> > > > Please correct me if I misunderstand anything.
> > > >
> > > > [1]
> > > > https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=158873338#FLIP138:DeclarativeResourcemanagement-Howtodistributeslotsacrossdifferentjobs
> > > >
> > > > Best,
> > > > Rui
> > > >
> > > > On Tue, Jan 7, 2025 at 1:52 AM Maximilian Michels <m...@apache.org> wrote:
> > > >
> > > > > Thanks Yuepeng for your work on this issue!
> > > > >
> > > > > I would advise not to add yet another config option to Flink. In
> > > > > application mode, the scheduler should default to using the least
> > > > > amount of resources required. From my perspective, the current
> > > > > behavior is a bug and it doesn't help that we can come up with
> > > > > scenarios where the current behavior may be more optimal (e.g.
> > > > > local state recovery turned on). Ultimately, it's not what users
> > > > > expect and we don't need another configuration option that users
> > > > > can set. We need sane defaults and I would strongly suggest that we
> > > > > fix the current default, especially because there aren't any
> > > > > drawbacks for existing users.
> > > > >
> > > > > -Max
> > > > >
> > > > > On Mon, Jan 6, 2025 at 7:56 AM Yuepeng Pan <panyuep...@apache.org> wrote:
> > > > >
> > > > > > Thank you Matthias and all for the feedback and suggestions.
> > > > > >
> > > > > > Please let me make a brief summary based on the historical comments:
> > > > > > - It's agreed to optimize/fix this issue in the 1.x LTS versions.
> > > > > > - The primary goal of this optimization/fix is to minimize the
> > > > > >   number of TaskManagers used in application mode.
> > > > > > - The optimized logic should be controlled via a parameter.
> > > > > >
> > > > > > I'd like to introduce the following parameter to control whether the
> > > > > > optimized logic should be enabled:
> > > > > > - Name: jobmanager.adaptive-scheduler.resource.minimal-taskmanagers-preferred
> > > > > > - Type: boolean
> > > > > > - Default value: false
> > > > > > - Description: This parameter defines whether the adaptive scheduler
> > > > > >   prioritizes using the minimum number of TaskManagers when
> > > > > >   scheduling tasks.
> > > > > >   Note: This parameter is currently suitable for cases where
> > > > > >   execution.state-recovery.from-local is disabled.
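> > > > > >
> > > > > > For illustration only, the option could be declared roughly like
> > > > > > this with the ConfigOptions builder; the containing class is a
> > > > > > placeholder and the description wording is open:
> > > > > >
> > > > > > import org.apache.flink.configuration.ConfigOption;
> > > > > > import org.apache.flink.configuration.ConfigOptions;
> > > > > >
> > > > > > /** Placeholder holder class for the sketched option. */
> > > > > > public class AdaptiveSchedulerOptionsSketch {
> > > > > >
> > > > > >     public static final ConfigOption<Boolean> MINIMAL_TASKMANAGERS_PREFERRED =
> > > > > >         ConfigOptions.key(
> > > > > >                 "jobmanager.adaptive-scheduler.resource.minimal-taskmanagers-preferred")
> > > > > >             .booleanType()
> > > > > >             .defaultValue(false)
> > > > > >             .withDescription(
> > > > > >                 "Whether the adaptive scheduler prioritizes using the minimum"
> > > > > >                     + " number of TaskManagers when scheduling tasks.");
> > > > > > }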
> > > > > >
> > > > > > BTW, I'm uncertain whether the introduction of a parameter for this
> > > > > > specific fix necessitates documentation via a FLIP.
> > > > > > If so, I'm willing to initiate a FLIP to aid in subsequent tasks.
> > > > > > If not, I will add a link to this email thread to the corresponding
> > > > > > Jira ticket's comments for tracking and start the work on the MR.
> > > > > >
> > > > > > Any suggestion would be appreciated!
> > > > > >
> > > > > > Thank you!
> > > > > >
> > > > > > Best,
> > > > > > Yuepeng Pan
> > > > > >
> > > > > > On 2025/01/05 18:41:11 Matthias Pohl wrote:
> > > > > > > Hi everyone and sorry for the late reply. I was mostly off in
> > > > > > > November and forgot about that topic in December last year.
> > > > > > >
> > > > > > > Thanks for summarizing and bringing up user feedback. I see the
> > > > > > > problem and agree with your view that it's a topic that we might
> > > > > > > want to address in the 1.x LTS version. I see how this can be
> > > > > > > labeled as a bug or a feature depending on the perspective. I
> > > > > > > think adding this behavior while being guarded by a feature
> > > > > > > flag/configuration parameter in the 1.x LTS version is reasonable.
> > > > > > >
> > > > > > > Best,
> > > > > > > Matthias
> > > > > > >
> > > > > > > [1]
> > > > > > > https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=158873338#FLIP138:DeclarativeResourcemanagement-Howtodistributeslotsacrossdifferentjobs
> > > > > > >
> > > > > > > On Wed, Nov 6, 2024 at 9:21 AM Rui Fan <1996fan...@gmail.com> wrote:
> > > > > > >
> > > > > > > > Thanks Yuepeng for the PR and starting this discussion!
> > > > > > > >
> > > > > > > > And thanks Gyula and Yuanfeng for the input!
> > > > > > > >
> > > > > > > > I also agree to fix this behaviour in the 1.x line.
> > > > > > > >
> > > > > > > > The adaptive scheduler and rescaling API provide powerful
> > > > > > > > capabilities to increase or decrease parallelism.
> > > > > > > >
> > > > > > > > The main benefit I understand of decreasing parallelism is
> > > > > > > > saving resources. If decreasing parallelism can't save
> > > > > > > > resources, why do users decrease it? This is why I think
> > > > > > > > releasing TM resources when decreasing parallelism is a basic
> > > > > > > > capability that the Adaptive Scheduler should have.
> > > > > > > >
> > > > > > > > Please correct me if I miss anything, thanks~
> > > > > > > >
> > > > > > > > Also, I believe it does not work as users expect, because this
> > > > > > > > behaviour was reported multiple times in the Flink community,
> > > > > > > > such as FLINK-33977[1], FLINK-35594[2], FLINK-35903[3] and the
> > > > > > > > Slack channel[4].
> > > > > > > > And 1.20.x is an LTS version, so I agree to fix it in the 1.x line.
> > > > > > > >
> > > > > > > > [1] https://issues.apache.org/jira/browse/FLINK-33977
> > > > > > > > [2] https://issues.apache.org/jira/browse/FLINK-35594
> > > > > > > > [3] https://issues.apache.org/jira/browse/FLINK-35903
> > > > > > > > [4] https://apache-flink.slack.com/archives/C03G7LJTS2G/p1729167222445569
> > > > > > > >
> > > > > > > > Best,
> > > > > > > > Rui
> > > > > > > >
> > > > > > > > On Wed, Nov 6, 2024 at 4:15 PM yuanfeng hu <yuanf...@apache.org> wrote:
> > > > > > > >
> > > > > > > > > > Is it considered an error if the adaptive scheduler fails to
> > > > > > > > > > release the task manager during scaling?
> > > > > > > > >
> > > > > > > > > +1. When we enable adaptive mode and perform scaling operations
> > > > > > > > > on tasks, a significant part of the goal is to reduce resource
> > > > > > > > > usage for the tasks. However, due to some logic in the adaptive
> > > > > > > > > scheduler's scheduling process, the task manager cannot be
> > > > > > > > > released, and the ultimate goal cannot be achieved. Therefore,
> > > > > > > > > I consider this to be a mistake.
> > > > > > > > >
> > > > > > > > > Additionally, many tasks are currently running in this mode and
> > > > > > > > > will continue to run for quite a long time (many users are in
> > > > > > > > > this situation). So whether or not it is considered a bug, I
> > > > > > > > > believe we need to fix it in the 1.x version.
> > > > > > > > >
> > > > > > > > > On Wed, Nov 6, 2024 at 14:32, Yuepeng Pan <panyuep...@apache.org> wrote:
> > > > > > > > >
> > > > > > > > > > Hi, community.
> > > > > > > > > >
> > > > > > > > > > When working on ticket[1] we have received some lively
> > > > > > > > > > discussions and valuable feedback[2] (thanks to Matthias,
> > > > > > > > > > Rui, Gyula, Maximilian, Tison, etc.). The main issues are:
> > > > > > > > > >
> > > > > > > > > > When the job runs in an application cluster, could the
> > > > > > > > > > default behavior of the AdaptiveScheduler not actively
> > > > > > > > > > releasing TaskManagers' resources during downscaling be
> > > > > > > > > > considered a bug?
> > > > > > > > > >
> > > > > > > > > > If so, should we fix it in Flink 1.x?
> > > > > > > > > >
> > > > > > > > > > I'd like to start a discussion to hear more comments about it
> > > > > > > > > > to define the next step, and I have sorted out some
> > > > > > > > > > information in the doc[3] regarding this discussion for you.
> > > > > > > > > >
> > > > > > > > > > Looking forward to your comments and attention.
> > > > > > > > > >
> > > > > > > > > > Thank you.
> > > > > > > > > >
> > > > > > > > > > Best,
> > > > > > > > > > Yuepeng Pan
> > > > > > > > > >
> > > > > > > > > > [1] https://issues.apache.org/jira/browse/FLINK-33977
> > > > > > > > > > [2] https://github.com/apache/flink/pull/25218#issuecomment-2401913141
> > > > > > > > > > [3] https://docs.google.com/document/d/1Rwwl2aGVz9g5kUJFMP5GMlJwzEO_a-eo4gPf7gITpdw/edit?tab=t.0#heading=h.s4i4hehbbli5
> > > > > > > > >
> > > > > > > > > --
> > > > > > > > > Best,
> > > > > > > > > Yuanfeng