Hi, Maximilian, Rui, Matthias:
Thanks for the response, which gives me a general understanding of your 
proposed approach and its implementation outline.

Hi, All:
Thank you all very much for the discussion and suggestions.

Based on the discussion so far,
we have reached a preliminary consensus:

- When the job runs in an application cluster, the default behavior 
of AdaptiveScheduler, which does not actively release TaskManager resources 
during downscaling, could be considered a bug (at least from certain 
perspectives).
- We should fix it in Flink 1.x.
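
To make the first point concrete, here is a rough sketch of the effect Max describes in his recap quoted below. It is not Flink code: the class and method names are made up for illustration, the free-slot map is a toy model, and round-robin is only a deterministic stand-in for the pseudo-random assignment. It compares how many TaskManagers end up holding slots under a spread-out assignment versus one that fills TaskManagers one after another, starting with those that have the fewest free slots.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical toy model, not part of Flink.
class SlotPackingSketch {

    /** Spread-out assignment: take one slot per TaskManager in turn (stand-in for pseudo-random). */
    static Set<String> spread(Map<String, Integer> freeSlots, int needed) {
        Map<String, Integer> remaining = new LinkedHashMap<>(freeSlots);
        Set<String> used = new TreeSet<>();
        while (needed > 0) {
            boolean assigned = false;
            for (Map.Entry<String, Integer> e : remaining.entrySet()) {
                if (needed == 0) {
                    break;
                }
                if (e.getValue() > 0) {
                    e.setValue(e.getValue() - 1);
                    used.add(e.getKey());
                    needed--;
                    assigned = true;
                }
            }
            if (!assigned) {
                throw new IllegalStateException("not enough free slots");
            }
        }
        return used;
    }

    /** Packed assignment: visit TaskManagers by fewest free slots first and fill each one up. */
    static Set<String> packed(Map<String, Integer> freeSlots, int needed) {
        List<Map.Entry<String, Integer>> order = new ArrayList<>(freeSlots.entrySet());
        order.sort(Map.Entry.comparingByValue()); // ascending, stable for ties
        Set<String> used = new TreeSet<>();
        for (Map.Entry<String, Integer> e : order) {
            if (needed <= 0) {
                break;
            }
            if (e.getValue() == 0) {
                continue;
            }
            used.add(e.getKey());
            needed -= Math.min(needed, e.getValue());
        }
        if (needed > 0) {
            throw new IllegalStateException("not enough free slots");
        }
        return used;
    }

    public static void main(String[] args) {
        Map<String, Integer> free = new LinkedHashMap<>();
        free.put("tm-1", 4);
        free.put("tm-2", 4);
        free.put("tm-3", 4);
        // After scaling down to 4 slots, compare which TaskManagers stay occupied.
        System.out.println("spread: " + spread(free, 4));
        System.out.println("packed: " + packed(free, 4));
    }
}
```

In this toy setup the spread-out variant keeps every TaskManager occupied, so none of them can be released, while the packed variant leaves whole TaskManagers idle.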

However, there is still no consensus on how to fix this issue 
under the following conditions:
- Flink 1.x series versions and application deployment mode (session 
cluster mode is out of scope).
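
For reference, one possible way to tell the two modes apart is the 'execution.target' check from my earlier mail quoted below. The following is only a sketch under that assumption: the class and method names are hypothetical, the configuration is modeled as a plain map rather than Flink's Configuration class, and the value "embedded" is taken from the mapping in that mail.

```java
import java.util.Map;
import java.util.Set;

// Hypothetical helper, not part of Flink.
class DeploymentModeSketch {

    static final String EXECUTION_TARGET = "execution.target";

    // Per the mapping quoted in this thread, application mode reports "embedded".
    // The earlier mail also lists yarn-per-job as a candidate; it is left out here
    // because this mail only covers application deployment mode.
    private static final Set<String> APPLICATION_TARGETS = Set.of("embedded");

    /** Returns true if the new slot prioritization should apply to this cluster. */
    static boolean useNewSlotPrioritization(Map<String, String> conf) {
        String target = conf.get(EXECUTION_TARGET);
        return target != null && APPLICATION_TARGETS.contains(target);
    }

    public static void main(String[] args) {
        System.out.println(useNewSlotPrioritization(Map.of(EXECUTION_TARGET, "embedded")));
        System.out.println(useNewSlotPrioritization(Map.of(EXECUTION_TARGET, "yarn-session")));
    }
}
```

A missing 'execution.target' value is treated as "not application mode" here, so session clusters keep the current behavior unless the check positively identifies an application cluster.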

Strategy list:

1). Add this behavior, guarded by a feature flag/configuration 
parameter, in the 1.x LTS version.
    (@Matthias If my understanding is incorrect, please correct me, thanks!)
    a. Users keep the option to revert to the original behavior,
       e.g. when they accept idle resource occupation and care only
       about the time spent waiting for resources during rescaling, 
       where the original behavior can have some positive impact.
    b. Introducing new parameters increases the complexity for users; 
       as Maximilian mentioned, we already have many parameters.

2). Make this behavior the default, without introducing new parameters, in the 
1.x LTS version.
    a. This avoids introducing new parameters and reduces complexity for users.
    b. Users lose the option to revert to the original behavior.

We need to weigh the trade-offs between the two options above in order 
to make a choice and reach a consensus.

Although Option 1) increases the complexity for users, I prefer 
Option 1), provided that the parameter defaults to the new behavior, 
for the following reasons:
    a. The new default matches the behavior that most application deployment 
       mode users expect, and they get it without configuring anything. 
       Therefore, it doesn't add configuration complexity for them.
    b. Users who want to keep the original behavior have to set the 
       parameter explicitly, 
       so they still have the flexibility to choose.
    c. Since this fix only targets the 1.x LTS version in application 
       deployment mode, the parameter does not need a forward-compatibility 
       plan, and a new parameter would therefore be acceptable to me.
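
A minimal sketch of what Option 1) with the new behavior as default could look like. Everything here is an assumption for illustration: the key is the name I proposed earlier in this thread and is not an existing Flink option, the enum and method names are made up, and the configuration is modeled as a plain map instead of Flink's Configuration class.

```java
import java.util.Map;

// Hypothetical sketch, not part of Flink.
class FeatureFlagSketch {

    // Key proposed earlier in this thread; it does not exist in Flink today.
    static final String KEY =
            "jobmanager.adaptive-scheduler.resource.minimal-taskmanagers-preferred";

    // Default suggested in this mail (the earlier proposal used false).
    static final boolean DEFAULT_VALUE = true;

    enum SlotPreference {
        MINIMAL_TASKMANAGERS, // new behavior: occupy as few TaskManagers as possible
        SPREAD_OUT            // original behavior
    }

    /** Resolves the slot preference; an unset key falls back to the new behavior. */
    static SlotPreference resolve(Map<String, String> conf) {
        String raw = conf.get(KEY);
        boolean enabled = raw == null ? DEFAULT_VALUE : Boolean.parseBoolean(raw.trim());
        return enabled ? SlotPreference.MINIMAL_TASKMANAGERS : SlotPreference.SPREAD_OUT;
    }

    public static void main(String[] args) {
        System.out.println(resolve(Map.of()));             // nothing configured
        System.out.println(resolve(Map.of(KEY, "false"))); // explicit opt-out
    }
}
```

With this shape, users who configure nothing get the new behavior, and only those who want the original behavior need to touch the parameter.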

I'd like to hear more ideas about it or your opinions on the options mentioned 
above to reach a final and reasonable consensus.

Thank you very much.

Best,
Yuepeng.


On 2025/01/15 08:54:23 Maximilian Michels wrote:
> Hey Yuepeng,
> 
> I think that would work.
> 
> Thanks,
> Max
> 
> On Sun, Jan 12, 2025 at 3:42 PM Yuepeng Pan <panyuep...@apache.org> wrote:
> >
> > Hi, Maximilian, Thank you very much for your reply and suggestions.
> >
> > That makes sense to me.
> >
> >
> >
> >
> > > Do you think we could condition the DefaultSlotAssigner based
> >
> > > on whether the cluster is a session or an application cluster? We
> >
> > > would use the new slot assignment for application clusters. We could
> >
> > > do this via an internal configuration option, but I would advise not
> >
> > > to add a public one, as we have too many already.
> >
> >
> >
> >
> > In my limited reading, perhaps we could use the 'execution.target' 
> > configuration
> >
> > in the running cluster to make such a determination.
> >
> >
> >
> >
> > The value of 'execution.target' in the following cases:
> >
> > - 0). ${flink deployment mode} -> ${the value of 'execution.target'}
> > - 1). yarn-application       -> embedded
> > - 2). local application mode -> embedded
> > - 3). k8s-application        -> embedded
> > - 4). yarn-per-job           -> yarn-per-job
> > - 5). k8s-session            -> kubernetes-session
> > - 6). yarn-session           -> yarn-session
> > - 7). standalone session     -> local
> > - 8). local-minicluster      -> local
> >
> > For items 1), 2), 3), and 4), using the new slot prioritization strategy 
> > mentioned previously may be a good option.
> > If I'm wrong, please feel free to correct me.
> > And I would greatly appreciate it if you could provide more information.
> >
> >
> >
> >
> > Looking forward to your reply.
> >
> >
> >
> >
> > Best,
> >
> > Yuepeng Pan
> >
> > At 2025-01-10 17:12:21, "Maximilian Michels" <m...@apache.org> wrote:
> > ># Recap
> > >
> > >The current slot assignment strategy via DefaultSlotAssigner is to
> > >pseudo-randomly assign the available TM slots. That works fine in the
> > >following scenarios:
> > >
> > >1. The number of TMs remains constant
> > >2. There is only a single slot per TaskManager
> > >
> > >As soon as we dynamically modify the job resource requirements via the
> > >AdaptiveScheduler, the current slot assignment strategy makes it near
> > >impossible to have TaskManagers without used slots, which makes
> > >scaling down the number of TaskManagers very unpredictable and in many
> > >cases impossible.
> > >
> > >The solution in https://github.com/apache/flink/pull/25218/files sorts
> > >the TaskManager by least available slots. There were concerns raised
> > >that in session clusters, this would result in more clogged clusters,
> > >due to tasks being less spread-out. I agree that we probably don't
> > >want to change this behavior in 1.X for session clusters.
> > >
> > ># Proposal
> > >
> > >@Yuepeng Do you think we could condition the DefaultSlotAssigner based
> > >on whether the cluster is a session or an application cluster? We
> > >would use the new slot assignment for application clusters. We could
> > >do this via an internal configuration option, but I would advise not
> > >to add a public one, as we have too many already.
> > >
> > >-Max
> > >
> > >
> > >
> > >On Tue, Jan 7, 2025 at 8:22 AM Yuepeng Pan <panyuep...@apache.org> wrote:
> > >>
> > >> Thanks Max and Rui for the reply and clarification.
> > >>
> > >>
> > >>
> > >> IIUC, Would setting the slot assignment strategy of
> > >> DefaultSlotAssigner to prioritize using the minimum number of
> > >> TaskManagers by default solve the problem?
> > >>
> > >>
> > >>
> > >> I'd appreciate your confirmation.
> > >>
> > >>
> > >>
> > >>
> > >> Best,
> > >>
> > >> Yuepeng Pan
> > >>
> > >>
> > >>
> > >>
> > >>
> > >>
> > >>
> > >>
> > >> At 2025-01-07 10:16:07, "Rui Fan" <1996fan...@gmail.com> wrote:
> > >> >Happy new year! And thanks Matthias, Yuepeng and Max for your comments!
> > >> >
> > >> >For the reference to FLIP-138[1] from Matthias:
> > >> >
> > >> >As FLIP-138 mentioned:
> > >> >
> > >> >> In a future version, we might think about letting the ResourceManager
> > >> >balance resources across jobs.
> > >> >
> > >> >I agree with this, balancing resources might be needed only
> > >> >when a flink cluster has multiple jobs (in session mode).
> > >> >
> > >> >For Yuepeng's summary:
> > >> >
> > >> >> Please let me make a brief summary based on the historical comments:
> > >> >> - It's agreed to optimize/fix this issue in the 1.x LTS versions.
> > >> >> - The primary goal of this optimization/fix is to minimize the number 
> > >> >> of
> > >> >TaskManagers used in application mode.
> > >> >> - The optimized logic should be controlled via a parameter.
> > >> >
> > >> >IIUC, the second and third points are in conflict. The second point
> > >> >means the goal is to fix it in application mode, but the third point
> > >> >might be needed only in session mode. If we introduce a new option
> > >> >to balance resources in the future, it's better to only take effect
> > >> >in the session mode. And the new option could be ignored in
> > >> >the application mode.
> > >> >
> > >> >So I'm not sure whether we will fix this issue in flink 1.x for both
> > >> >application mode and session mode?
> > >> >
> > >> >Generally, I'm +1 for Max's suggestion of application mode.
> > >> >
> > >> >Please correct me if I misunderstand anything.
> > >> >
> > >> >[1]
> > >> >https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=158873338#FLIP138:DeclarativeResourcemanagement-Howtodistributeslotsacrossdifferentjobs
> > >> >
> > >> >Best,
> > >> >Rui
> > >> >
> > >> >On Tue, Jan 7, 2025 at 1:52 AM Maximilian Michels <m...@apache.org> 
> > >> >wrote:
> > >> >
> > >> >> Thanks Yuepeng for your work on this issue!
> > >> >>
> > >> >> I would advise not to add yet another config option to Flink. In
> > >> >> application mode, the scheduler should default to using the least 
> > >> >> amount of
> > >> >> resources required. From my perspective, the current behavior is a 
> > >> >> bug and
> > >> >> it doesn't help that we can come up with scenarios where the current
> > >> >> behavior may be more optimal (e.g. local state recovery turned on).
> > >> >> Ultimately, it's not what users expect and we don't need another
> > >> >> configuration option that users can set. We need sane defaults and I 
> > >> >> would
> > >> >> strongly suggest that we fix the current default, especially because 
> > >> >> there
> > >> >> aren't any drawbacks for existing users.
> > >> >>
> > >> >> -Max
> > >> >>
> > >> >> On Mon, Jan 6, 2025 at 7:56 AM Yuepeng Pan <panyuep...@apache.org> 
> > >> >> wrote:
> > >> >>
> > >> >> > Thank you Matthias and all for the feedback and suggestions.
> > >> >> >
> > >> >> > Please let me make a brief summary based on the historical comments:
> > >> >> > - It's agreed to optimize/fix this issue in the 1.x LTS versions.
> > >> >> > - The primary goal of this optimization/fix is to minimize the 
> > >> >> > number of
> > >> >> > TaskManagers used in application mode.
> > >> >> > - The optimized logic should be controlled via a parameter.
> > >> >> >
> > >> >> > I'd like to introduce the following parameter to control whether the
> > >> >> > optimized logic should be enabled:
> > >> >> > - Name:
> > >> >> > jobmanager.adaptive-scheduler.resource.minimal-taskmanagers-preferred
> > >> >> > - Type: boolean
> > >> >> > - Default value: false
> > >> >> > - Description: This parameter defines whether the adaptive scheduler
> > >> >> > prioritizes
> > >> >> > using the minimum number of TaskManagers when scheduling tasks.
> > >> >> > Note: This parameter is currently suitable for cases where
> > >> >> > execution.state-recovery.from-local is disabled.
> > >> >> >
> > >> >> > BTW, I'm uncertain whether the introduction of a parameter for this
> > >> >> > specific fix necessitates documentation via a FLIP.
> > >> >> > If so, I'm willing to initiate a FLIP to aid in subsequent tasks.
> > >> >> > If not, I will add this email address to the corresponding jira 
> > >> >> > ticket's
> > >> >> > comments for tracking  and start the work on MR.
> > >> >> >
> > >> >> > Any suggestion would be appreciated!
> > >> >> >
> > >> >> > Thank you!
> > >> >> >
> > >> >> > Best,
> > >> >> > Yuepeng Pan
> > >> >> >
> > >> >> > On 2025/01/05 18:41:11 Matthias Pohl wrote:
> > >> >> > > Hi everyone and sorry for the late reply. I was mostly off in 
> > >> >> > > November
> > >> >> > and
> > >> >> > > forgot about that topic in December last year.
> > >> >> > >
> > >> >> > > Thanks for summarizing and bringing up user feedback. I see the 
> > >> >> > > problem
> > >> >> > and
> > >> >> > > agree with your view that it's a topic that we might want to 
> > >> >> > > address in
> > >> >> > the
> > >> >> > > 1.x LTS version. I see how this can be labeled as a bug or a 
> > >> >> > > feature
> > >> >> > > depending on the perspective. I think adding this behavior while 
> > >> >> > > being
> > >> >> > > guarded by a feature flag/configuration parameter in the 1.x LTS
> > >> >> version
> > >> >> > is
> > >> >> > > reasonable.
> > >> >> > >
> > >> >> > > Best,
> > >> >> > > Matthias
> > >> >> > >
> > >> >> > > [1]
> > >> >> > >
> > >> >> >
> > >> >> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=158873338#FLIP138:DeclarativeResourcemanagement-Howtodistributeslotsacrossdifferentjobs
> > >> >> > >
> > >> >> > > On Wed, Nov 6, 2024 at 9:21 AM Rui Fan <1996fan...@gmail.com> 
> > >> >> > > wrote:
> > >> >> > >
> > >> >> > > > Thanks Yuepeng for the PR and starting this discussion!
> > >> >> > > >
> > >> >> > > > And thanks Gyula and Yuanfeng for the input!
> > >> >> > > >
> > >> >> > > > I also agree to fix this behaviour in the 1.x line.
> > >> >> > > >
> > >> >> > > > The adaptive scheduler and rescaling API provide powerful
> > >> >> capabilities
> > >> >> > to
> > >> >> > > > increase or decrease parallelism.
> > >> >> > > >
> > >> >> > > > The main benefit I understand of decreasing parallelism is 
> > >> >> > > > saving
> > >> >> > > > resources.
> > >> >> > > > If decreasing parallelism can't save resources, why do users 
> > >> >> > > > decrease
> > >> >> > it?
> > >> >> > > > This is why I think releasing TM resources when decreasing
> > >> >> parallelism
> > >> >> > is
> > >> >> > > > a basic capability that the Adaptive Scheduler should have.
> > >> >> > > >
> > >> >> > > > Please correct me if I miss anything, thanks~
> > >> >> > > >
> > >> >> > > > Also, I believe it does not work as the user expects. Because 
> > >> >> > > > this
> > >> >> > > > behaviour
> > >> >> > > > was reported multiple times in the flink community, such as:
> > >> >> > > > FLINK-33977[1],
> > >> >> > > > FLINK-35594[2], FLINK-35903[3] and Slack channel[4].
> > >> >> > > > And 1.20.x is a LTS version, so I agree to fix it in the 1.x 
> > >> >> > > > line.
> > >> >> > > >
> > >> >> > > > [1] https://issues.apache.org/jira/browse/FLINK-33977
> > >> >> > > > [2] https://issues.apache.org/jira/browse/FLINK-35594
> > >> >> > > > [3] https://issues.apache.org/jira/browse/FLINK-35903
> > >> >> > > > [4]
> > >> >> > https://apache-flink.slack.com/archives/C03G7LJTS2G/p1729167222445569
> > >> >> > > >
> > >> >> > > > Best,
> > >> >> > > > Rui
> > >> >> > > >
> > >> >> > > > On Wed, Nov 6, 2024 at 4:15 PM yuanfeng hu <yuanf...@apache.org>
> > >> >> > wrote:
> > >> >> > > >
> > >> >> > > >> > Is it considered an error if the adaptive scheduler fails to
> > >> >> > release the
> > >> >> > > >> task manager during scaling?
> > >> >> > > >>
> > >> >> > > >> +1 . When we enable adaptive mode and perform scaling 
> > >> >> > > >> operations on
> > >> >> > tasks,
> > >> >> > > >> a significant part of the goal is to reduce resource usage for 
> > >> >> > > >> the
> > >> >> > tasks.
> > >> >> > > >> However, due to some logic in the adaptive scheduler's 
> > >> >> > > >> scheduling
> > >> >> > process,
> > >> >> > > >> the task manager cannot be released, and the ultimate goal 
> > >> >> > > >> cannot be
> > >> >> > > >> achieved. Therefore, I consider this to be a mistake.
> > >> >> > > >>
> > >> >> > > >> Additionally, many tasks are currently running in this mode 
> > >> >> > > >> and will
> > >> >> > > >> continue to run for quite a long time (many users are in this
> > >> >> > situation).
> > >> >> > > >> So whether or not it is considered a bug, I believe we need to 
> > >> >> > > >> fix
> > >> >> it
> > >> >> > in
> > >> >> > > >> the 1.x version.
> > >> >> > > >>
> > >> >> > > >> On Wed, Nov 6, 2024 at 14:32, Yuepeng Pan <panyuep...@apache.org> wrote:
> > >> >> > > >>
> > >> >> > > >> > Hi, community.
> > >> >> > > >> >
> > >> >> > > >> >
> > >> >> > > >> >
> > >> >> > > >> >
> > >> >> > > >> > When working on ticket[1] we have received some lively 
> > >> >> > > >> > discussions
> > >> >> > and
> > >> >> > > >> > valuable
> > >> >> > > >> > feedback[2](thanks for Matthias, Rui, Gyula, Maximilian, 
> > >> >> > > >> > Tison,
> > >> >> > etc.),
> > >> >> > > >> the
> > >> >> > > >> > main issues are that:
> > >> >> > > >> >
> > >> >> > > >> > When the job runs in an application cluster, could the 
> > >> >> > > >> > default
> > >> >> > behavior
> > >> >> > > >> of
> > >> >> > > >> > AdaptiveScheduler not actively releasing Taskmanagers 
> > >> >> > > >> > resources
> > >> >> > during
> > >> >> > > >> > downscaling be considered a bug?
> > >> >> > > >> >
> > >> >> > > >> > If so,should we fix it in flink 1.x?
> > >> >> > > >> >
> > >> >> > > >> >
> > >> >> > > >> >
> > >> >> > > >> > I’d like to start a discussion to hear more comments about 
> > >> >> > > >> > it to
> > >> >> > define
> > >> >> > > >> > the next step and I have sorted out some information in the 
> > >> >> > > >> > doc[3]
> > >> >> > > >> > regarding this discussion for you.
> > >> >> > > >> >
> > >> >> > > >> >
> > >> >> > > >> >
> > >> >> > > >> > Looking forward to your comments and attention.
> > >> >> > > >> >
> > >> >> > > >> > Thank you.
> > >> >> > > >> >
> > >> >> > > >> > Best,
> > >> >> > > >> > Yuepeng Pan
> > >> >> > > >> >
> > >> >> > > >> >
> > >> >> > > >> >
> > >> >> > > >> >
> > >> >> > > >> > [1] https://issues.apache.org/jira/browse/FLINK-33977
> > >> >> > > >> >
> > >> >> > > >> > [2]
> > >> >> > https://github.com/apache/flink/pull/25218#issuecomment-2401913141
> > >> >> > > >> >
> > >> >> > > >> > [3]
> > >> >> > > >> >
> > >> >> > > >>
> > >> >> >
> > >> >> https://docs.google.com/document/d/1Rwwl2aGVz9g5kUJFMP5GMlJwzEO_a-eo4gPf7gITpdw/edit?tab=t.0#heading=h.s4i4hehbbli5
> > >> >> > > >> >
> > >> >> > > >> >
> > >> >> > > >> >
> > >> >> > > >>
> > >> >> > > >> --
> > >> >> > > >> Best,
> > >> >> > > >> Yuanfeng
> > >> >> > > >>
> > >> >> > > >
> > >> >> > >
> > >> >> >
> > >> >>
> 
