Hi all,

This email hasn't received further responses over the past few days, so we still lack sufficient feedback to draw a definitive conclusion. I'd like to proceed by merging both approaches as far as possible to reach a final consensus. I'll move forward with the following plan to address the issue and update the PR[1] accordingly:
> - It's agreed to optimize/fix this issue in the 1.x LTS versions.
> - The primary goal of this optimization/fix is to minimize the number of
> TaskManagers used in application mode only.
> - The optimized logic will be set as the default logic, and the original
> logic will be retained when explicitly enabled through a parameter.

Please rest assured that I am not rushing ahead recklessly. If anything about this conclusion or the reasoning supporting it turns out to be inappropriate, I will promptly halt and revise the plan.

> a. The default behavior of this new parameter (the optimized logic) aligns
> with the expected behavior of most application deployment mode users,
> who would have this behavior enabled by default.
> Therefore, it doesn't add complexity in terms of configuration for them.
> b. However, this would require a small number of users who want to keep the
> original behavior to actively configure this setting.
> So, this still gives users the flexibility to choose.
> c. Since this issue is only fixed by adopting this solution in the 1.x LTS
> version with application deployment mode,
> this parameter doesn't have a plan for forward compatibility, and a new
> parameter would also be acceptable to me.

Thank you all very much for your attention and help.

Best,
Yuepeng Pan

[1] https://github.com/apache/flink/pull/25218

On 2025/01/20 01:56:36 Yuepeng Pan wrote:
> Hi Maximilian, Rui, Matthias:
> Thanks for the response, which gives me a general understanding of your
> proposed approach and its implementation outline.
>
> Hi all:
> Thank you all very much for the discussion and suggestions.
>
> Based on the discussions we have had so far,
> we have reached a preliminary consensus:
>
> - When the job runs in an application cluster, the default behavior
> of the AdaptiveScheduler not actively releasing TaskManager resources
> during downscaling could be considered a
> bug (at least from certain perspectives, this is the case).
> - We should fix it in Flink 1.x.
>
> However, there is still no consensus in the discussion on how to fix this
> issue under the following conditions:
> - Flink 1.x series versions and application deployment mode (it does not
> apply to session cluster mode).
>
> Strategy list:
>
> 1). Adding this behavior while guarding it behind a feature flag/configuration
> parameter in the 1.x LTS version.
> (@Matthias If my understanding is incorrect, please correct me, thanks!)
> a. This enables the option for users to revert to the original behavior,
> e.g. when ignoring idle resource occupation and focusing only
> on the resource waiting time during rescaling, this can have some
> positive impact.
> b. Introducing new parameters increases the complexity for users;
> as Maximilian mentioned, we already have many parameters.
>
> 2). Setting the behavior as the default without introducing new parameters
> in the 1.x LTS version.
> a. This avoids introducing new parameters and reduces complexity for users.
> b. This disables the option for users to revert to the original behavior.
>
> We have to make some trade-off between the two options above in order
> to make a choice and reach a consensus on the conclusion.
>
> Although Option-1) increases the complexity for users, I prefer
> Option-1) for the following reasons, provided we set the default behavior
> in Option-1) to the new behavior:
> a. This new parameter aligns with the expected behavior for most
> application deployment mode users,
> who would have this behavior enabled by default.
> Therefore, it doesn't add complexity in terms of configuration for > them. > b. However, this would require users who want to keep the original > behavior to actively configure this setting. > So, this still gives users the flexibility to choose. > c. Since this issue is only fixed by adopting this solution in version > 1.x LTS with application deployment mode, > this parameter doesn't have a plan for forward compatibility, and a > new parameter would also be acceptable to me. > > I'd like to hear more ideas about it or your opinions on the options > mentioned above to reach a final and reasonable consensus. > > Thank you very much. > > Best, > Yuepeng. > > > On 2025/01/15 08:54:23 Maximilian Michels wrote: > > Hey Yuepeng, > > > > I think that would work. > > > > Thanks, > > Max > > > > On Sun, Jan 12, 2025 at 3:42 PM Yuepeng Pan <panyuep...@apache.org> wrote: > > > > > > Hi, Maximilian, Thank you very much for your reply and suggestions. > > > > > > That makes sense to me. > > > > > > > > > > > > > > > > Do you think we could condition the DefaultSlotAssigner based > > > > > > > on whether the cluster is a session or an application cluster? We > > > > > > > would use the new slot assignment for application clusters. We could > > > > > > > do this via an internal configuration option, but I would advise not > > > > > > > to add a public one, as we have too many already. > > > > > > > > > > > > > > > In my limited reading, perhaps we could use the 'execution.target' > > > configuration > > > > > > in the running cluster to make such a determination. > > > > > > > > > > > > > > > The value of 'execution.target' on the following cases: > > > > > > > > > > > > > > > - 0). ${flink deployment mode} -> ${the value of 'execution.target'} > > > > > > > > > > > > > > > - 1). yarn-application -> embedded > > > > > > - 2). local application mode -> embedded > > > > > > - 3). k8s-application -> embedded > > > > > > - 4). yarn-per-job -> yarn-per-job > > > > > > - 5). k8s-session -> kubernetes-session > > > > > > - 6). yarn-session -> yarn-session > > > > > > - 7). standalone session -> local > > > > > > - 8). local-minicluster -> local > > > > > > > > > > > > > > > For items 1), 2), 3), and 4), using the new slot prioritization strategy > > > mentioned previous may be a good option. > > > If I'm wrong, please feel free to correct me. > > > And I would greatly appreciate it if you could provide more information. > > > > > > > > > > > > > > > Looking forward to your reply. > > > > > > > > > > > > > > > Best, > > > > > > Yuepeng Pan > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > At 2025-01-10 17:12:21, "Maximilian Michels" <m...@apache.org> wrote: > > > ># Recap > > > > > > > >The current slot assignment strategy via DefaultSlotAssigner is to > > > >pseudo-randomly assign the available TM slots. That works fine in the > > > >following scenarios: > > > > > > > >1. The number of TMs remains constant > > > >2. There is only a single slot per TaskManager > > > > > > > >As soon as we dynamically modify the job resource requirements via the > > > >AdaptiveScheduler, the current slot assignment strategy makes it near > > > >impossible to have TaskManagers without used slots, which makes > > > >scaling down the number of TaskManagers very unpredictable and in many > > > >cases impossible. > > > > > > > >The solution in https://github.com/apache/flink/pull/25218/files sorts > > > >the TaskManager by least available slots. 
There were concerns raised > > > >that in session clusters, this would result in more clocked clusters, > > > >due to tasks being less spread-out. I agree that we probably don't > > > >want to change this behavior in 1.X for session clusters. > > > > > > > ># Proposal > > > > > > > >@Yuepeng Do you think we could condition the DefaultSlotAssigner based > > > >on whether the cluster is a session or an application cluster? We > > > >would use the new slot assignment for application clusters. We could > > > >do this via an internal configuration option, but I would advise not > > > >to add a public one, as we have too many already. > > > > > > > >-Max > > > > > > > > > > > > > > > >On Tue, Jan 7, 2025 at 8:22 AM Yuepeng Pan <panyuep...@apache.org> wrote: > > > >> > > > >> Thanks Max and Rui for the reply and clarification. > > > >> > > > >> > > > >> > > > >> IIUC, Would setting the slot assignment strategy of > > > >> DefaultSlotAssigner to prioritize using the minimum number of > > > >> TaskManagers by default solve the problem? > > > >> > > > >> > > > >> > > > >> I'd be appreciated with your confirmation. > > > >> > > > >> > > > >> > > > >> > > > >> Best, > > > >> > > > >> Yuepeng Pan > > > >> > > > >> > > > >> > > > >> > > > >> > > > >> > > > >> > > > >> > > > >> At 2025-01-07 10:16:07, "Rui Fan" <1996fan...@gmail.com> wrote: > > > >> >Happy new year! And thanks Matthias, Yuepeng and Max for your > > > >> >comments! > > > >> > > > > >> >For the reference to FLIP-138[1] from Matthias: > > > >> > > > > >> >As FLIP-138 mentioned: > > > >> > > > > >> >> In a future version, we might think about letting the > > > >> >> ResourceManager > > > >> >balance resources across jobs. > > > >> > > > > >> >I agree with this, balancing resources might be needed only > > > >> >when a flink cluster has multiple jobs (in session mode). > > > >> > > > > >> >For Yuepeng's summary: > > > >> > > > > >> >> Please let me make a brief summary based on the historical comments: > > > >> >> - It's agreeded to optimize/fix this issue in the 1.x TLS versions. > > > >> >> - The primary goal of this optimization/fix is to minimize the > > > >> >> number of > > > >> >TaskManagers used in application mode. > > > >> >> - The optimized logic should be controlled via a parameter. > > > >> > > > > >> >IIUC, the second and third points are in conflict. The second point > > > >> >means the goal is to fix it in application mode, but the third point > > > >> >might be needed only in session mode. If we introduce a new option > > > >> >to balance resources in the future, it's better to only take effect > > > >> >in the session mode. And the new option could be ignored in > > > >> >the application mode. > > > >> > > > > >> >So I'm not sure whether we will fix this issue in flink 1.x for both > > > >> >application mode and session mode? > > > >> > > > > >> >Generally, I'm +1 for Max's suggestion of application mode. > > > >> > > > > >> >Please correct me if I misunderstand anything. > > > >> > > > > >> >[1] > > > >> >https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=158873338#FLIP138:DeclarativeResourcemanagement-Howtodistributeslotsacrossdifferentjobs > > > >> > > > > >> >Best, > > > >> >Rui > > > >> > > > > >> >On Tue, Jan 7, 2025 at 1:52 AM Maximilian Michels <m...@apache.org> > > > >> >wrote: > > > >> > > > > >> >> Thanks Yuepeng for your work on this issue! > > > >> >> > > > >> >> I would advise not to add yet another config option to Flink. 
In > > > >> >> application mode, the scheduler should default to using the least > > > >> >> amount of > > > >> >> resources required. From my perspective, the current behavior is a > > > >> >> bug and > > > >> >> it doesn't help that we can come up with scenarios where the current > > > >> >> behavior may be more optimal (e.g. local state recovery turned on). > > > >> >> Ultimately, it's not what users expect and we don't need another > > > >> >> configuration option that users can set. We need sane defaults and > > > >> >> I would > > > >> >> strongly suggest that we fix the current default, especially > > > >> >> because there > > > >> >> aren't any drawbacks for existing users. > > > >> >> > > > >> >> -Max > > > >> >> > > > >> >> On Mon, Jan 6, 2025 at 7:56 AM Yuepeng Pan <panyuep...@apache.org> > > > >> >> wrote: > > > >> >> > > > >> >> > Thank you Matthias and all for the feedback and suggestions. > > > >> >> > > > > >> >> > Please let me make a brief summary based on the historical > > > >> >> > comments: > > > >> >> > - It's agreeded to optimize/fix this issue in the 1.x TLS > > > >> >> > versions. > > > >> >> > - The primary goal of this optimization/fix is to minimize the > > > >> >> > number of > > > >> >> > TaskManagers used in application mode. > > > >> >> > - The optimized logic should be controlled via a parameter. > > > >> >> > > > > >> >> > I'd like to introduce the following parameter to control whether > > > >> >> > the > > > >> >> > optimized logic should be enabled: > > > >> >> > - Name: > > > >> >> > jobmanager.adaptive-scheduler.resource.minimal-taskmanagers-preferred > > > >> >> > - Type: boolean > > > >> >> > - Default value: false > > > >> >> > - Description: This parameter defines whether the adaptive > > > >> >> > scheduler > > > >> >> > prioritizes > > > >> >> > using the minimum number of TaskManagers when scheduling tasks. > > > >> >> > Note: This parameter is currently suitable for cases that > > > >> >> > execution.state-recovery.from-local is disabled.' > > > >> >> > > > > >> >> > BTW, I'm uncertain whether the introduction of a parameter for > > > >> >> > this > > > >> >> > specific fix necessitates documentation via a FLIP. > > > >> >> > If so, I'm willing to initiate a FLIP to aid in subsequent tasks. > > > >> >> > If not, I will add this email address to the corresponding jira > > > >> >> > ticket's > > > >> >> > comments for tracking and start the work on MR. > > > >> >> > > > > >> >> > Any suggestion would be appreciated! > > > >> >> > > > > >> >> > Thank you! > > > >> >> > > > > >> >> > Best, > > > >> >> > Yuepeng Pan > > > >> >> > > > > >> >> > On 2025/01/05 18:41:11 Matthias Pohl wrote: > > > >> >> > > Hi everyone and sorry for the late reply. I was mostly off in > > > >> >> > > November > > > >> >> > and > > > >> >> > > forgot about that topic in December last year. > > > >> >> > > > > > >> >> > > Thanks for summarizing and bringing up user feedback. I see the > > > >> >> > > problem > > > >> >> > and > > > >> >> > > agree with your view that it's a topic that we might want to > > > >> >> > > address in > > > >> >> > the > > > >> >> > > 1.x LTS version. I see how this can be labeled as a bug or a > > > >> >> > > feature > > > >> >> > > depending on the perspective. I think adding this behavior > > > >> >> > > while being > > > >> >> > > guarded by a feature flag/configuration parameter in the 1.x LTS > > > >> >> version > > > >> >> > is > > > >> >> > > reasonable. 
> > > >> >> > > > > > >> >> > > Best, > > > >> >> > > Matthias > > > >> >> > > > > > >> >> > > [1] > > > >> >> > > > > > >> >> > > > > >> >> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=158873338#FLIP138:DeclarativeResourcemanagement-Howtodistributeslotsacrossdifferentjobs > > > >> >> > > > > > >> >> > > On Wed, Nov 6, 2024 at 9:21 AM Rui Fan <1996fan...@gmail.com> > > > >> >> > > wrote: > > > >> >> > > > > > >> >> > > > Thanks Yuepeng for the PR and starting this discussion! > > > >> >> > > > > > > >> >> > > > And thanks Gyula and Yuanfeng for the input! > > > >> >> > > > > > > >> >> > > > I also agree to fix this behaviour in the 1.x line. > > > >> >> > > > > > > >> >> > > > The adaptive scheduler and rescaling API provide powerful > > > >> >> capabilities > > > >> >> > to > > > >> >> > > > increase or decrease parallelism. > > > >> >> > > > > > > >> >> > > > The main benefit I understand of decreasing parallelism is > > > >> >> > > > saving > > > >> >> > > > resources. > > > >> >> > > > If decreasing parallelism can't save resources, why do users > > > >> >> > > > decrease > > > >> >> > it? > > > >> >> > > > This is why I think releasing TM resources when decreasing > > > >> >> parallelism > > > >> >> > is > > > >> >> > > > a basic capability that the Adaptive Scheduler should have. > > > >> >> > > > > > > >> >> > > > Please correct me if I miss anything, thanks~ > > > >> >> > > > > > > >> >> > > > Also, I believe it does not work as the user expects. Because > > > >> >> > > > this > > > >> >> > > > behaviour > > > >> >> > > > was reported multiple times in the flink community, such as: > > > >> >> > > > FLINK-33977[1], > > > >> >> > > > FLINK-35594[2], FLINK-35903[3] and Slack channel[4]. > > > >> >> > > > And 1.20.x is a LTS version, so I agree to fix it in the 1.x > > > >> >> > > > line. > > > >> >> > > > > > > >> >> > > > [1] https://issues.apache.org/jira/browse/FLINK-33977 > > > >> >> > > > [2] https://issues.apache.org/jira/browse/FLINK-35594 > > > >> >> > > > [3] https://issues.apache.org/jira/browse/FLINK-35903 > > > >> >> > > > [4] > > > >> >> > https://apache-flink.slack.com/archives/C03G7LJTS2G/p1729167222445569 > > > >> >> > > > > > > >> >> > > > Best, > > > >> >> > > > Rui > > > >> >> > > > > > > >> >> > > > On Wed, Nov 6, 2024 at 4:15 PM yuanfeng hu > > > >> >> > > > <yuanf...@apache.org> > > > >> >> > wrote: > > > >> >> > > > > > > >> >> > > >> > Is it considered an error if the adaptive scheduler fails > > > >> >> > > >> > to > > > >> >> > release the > > > >> >> > > >> task manager during scaling? > > > >> >> > > >> > > > >> >> > > >> +1 . When we enable adaptive mode and perform scaling > > > >> >> > > >> operations on > > > >> >> > tasks, > > > >> >> > > >> a significant part of the goal is to reduce resource usage > > > >> >> > > >> for the > > > >> >> > tasks. > > > >> >> > > >> However, due to some logic in the adaptive scheduler's > > > >> >> > > >> scheduling > > > >> >> > process, > > > >> >> > > >> the task manager cannot be released, and the ultimate goal > > > >> >> > > >> cannot be > > > >> >> > > >> achieved. Therefore, I consider this to be a mistake. > > > >> >> > > >> > > > >> >> > > >> Additionally, many tasks are currently running in this mode > > > >> >> > > >> and will > > > >> >> > > >> continue to run for quite a long time (many users are in this > > > >> >> > situation). 
> > > >> >> > > >> So whether or not it is considered a bug, I believe we need > > > >> >> > > >> to fix > > > >> >> it > > > >> >> > in > > > >> >> > > >> the 1.x version. > > > >> >> > > >> > > > >> >> > > >> Yuepeng Pan <panyuep...@apache.org> 于2024年11月6日周三 14:32写道: > > > >> >> > > >> > > > >> >> > > >> > Hi, community. > > > >> >> > > >> > > > > >> >> > > >> > > > > >> >> > > >> > > > > >> >> > > >> > > > > >> >> > > >> > When working on ticket[1] we have received some lively > > > >> >> > > >> > discussions > > > >> >> > and > > > >> >> > > >> > valuable > > > >> >> > > >> > feedback[2](thanks for Matthias, Rui, Gyula, Maximilian, > > > >> >> > > >> > Tison, > > > >> >> > etc.), > > > >> >> > > >> the > > > >> >> > > >> > main issues are that: > > > >> >> > > >> > > > > >> >> > > >> > When the job runs in an application cluster, could the > > > >> >> > > >> > default > > > >> >> > behavior > > > >> >> > > >> of > > > >> >> > > >> > AdaptiveScheduler not actively releasing Taskmanagers > > > >> >> > > >> > resources > > > >> >> > during > > > >> >> > > >> > downscaling be considered a bug? > > > >> >> > > >> > > > > >> >> > > >> > If so,should we fix it in flink 1.x? > > > >> >> > > >> > > > > >> >> > > >> > > > > >> >> > > >> > > > > >> >> > > >> > I’d like to start a discussion to hear more comments about > > > >> >> > > >> > it to > > > >> >> > define > > > >> >> > > >> > the next step and I have sorted out some information in > > > >> >> > > >> > the doc[3] > > > >> >> > > >> > regarding this discussion for you. > > > >> >> > > >> > > > > >> >> > > >> > > > > >> >> > > >> > > > > >> >> > > >> > Looking forward to your comments and attention. > > > >> >> > > >> > > > > >> >> > > >> > Thank you. > > > >> >> > > >> > > > > >> >> > > >> > Best, > > > >> >> > > >> > Yuepeng Pan > > > >> >> > > >> > > > > >> >> > > >> > > > > >> >> > > >> > > > > >> >> > > >> > > > > >> >> > > >> > [1] https://issues.apache.org/jira/browse/FLINK-33977 > > > >> >> > > >> > > > > >> >> > > >> > [2] > > > >> >> > https://github.com/apache/flink/pull/25218#issuecomment-2401913141 > > > >> >> > > >> > > > > >> >> > > >> > [3] > > > >> >> > > >> > > > > >> >> > > >> > > > >> >> > > > > >> >> https://docs.google.com/document/d/1Rwwl2aGVz9g5kUJFMP5GMlJwzEO_a-eo4gPf7gITpdw/edit?tab=t.0#heading=h.s4i4hehbbli5 > > > >> >> > > >> > > > > >> >> > > >> > > > > >> >> > > >> > > > > >> >> > > >> > > > >> >> > > >> -- > > > >> >> > > >> Best, > > > >> >> > > >> Yuanfeng > > > >> >> > > >> > > > >> >> > > > > > > >> >> > > > > > >> >> > > > > >> >> > > >
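For concreteness, here is a minimal sketch of what the option discussed in this thread and the "fewest free slots first" ordering could look like. This is only an illustration under assumptions drawn from the thread: the option name is the one proposed earlier, the default of 'true' reflects the plan of enabling the optimized logic by default, and the TaskManagerCandidate type and orderForAssignment helper are hypothetical placeholders rather than existing Flink classes. The actual change is the DefaultSlotAssigner logic touched by PR 25218.

    import java.util.Comparator;
    import java.util.List;
    import java.util.stream.Collectors;

    import org.apache.flink.configuration.ConfigOption;
    import org.apache.flink.configuration.ConfigOptions;
    import org.apache.flink.configuration.Configuration;

    public class MinimalTaskManagersSketch {

        // Option name taken from the proposal in this thread; the default of 'true'
        // reflects the plan of making the optimized logic the default behavior.
        public static final ConfigOption<Boolean> MINIMAL_TASKMANAGERS_PREFERRED =
                ConfigOptions.key(
                                "jobmanager.adaptive-scheduler.resource.minimal-taskmanagers-preferred")
                        .booleanType()
                        .defaultValue(true)
                        .withDescription(
                                "Whether the adaptive scheduler prefers packing slots onto as few "
                                        + "TaskManagers as possible so idle TaskManagers can be released.");

        // Hypothetical view of a TaskManager, for illustration only.
        record TaskManagerCandidate(String id, int freeSlots) {}

        // When the optimized logic is enabled, orders candidate TaskManagers so the
        // most-utilized ones (fewest free slots) are filled first; otherwise keeps
        // the incoming (spread-out) order, i.e. the original behavior.
        static List<TaskManagerCandidate> orderForAssignment(
                Configuration conf, List<TaskManagerCandidate> candidates) {
            if (!conf.get(MINIMAL_TASKMANAGERS_PREFERRED)) {
                return candidates;
            }
            return candidates.stream()
                    .sorted(Comparator.comparingInt(TaskManagerCandidate::freeSlots))
                    .collect(Collectors.toList());
        }
    }

With such an ordering, slots requested after a downscale concentrate on already-used TaskManagers, leaving whole TaskManagers idle so they can be released; setting the flag to false would keep the current spread-out assignment for users who rely on it.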