Hi Maximilian,

Thank you very much for your reply and suggestions.

That makes sense to me.




> Do you think we could condition the DefaultSlotAssigner based
> on whether the cluster is a session or an application cluster? We
> would use the new slot assignment for application clusters. We could
> do this via an internal configuration option, but I would advise not
> to add a public one, as we have too many already.




In my limited reading, perhaps we could use the 'execution.target'
configuration of the running cluster to make this determination.




The value of 'execution.target' in each of the following cases:




- 0). ${flink deployment mode} -> ${the value of 'execution.target'}
- 1). yarn-application         -> embedded
- 2). local application mode   -> embedded
- 3). k8s-application          -> embedded
- 4). yarn-per-job             -> yarn-per-job
- 5). k8s-session              -> kubernetes-session
- 6). yarn-session             -> yarn-session
- 7). standalone session       -> local
- 8). local-minicluster        -> local




For items 1), 2), 3), and 4), using the new slot prioritization strategy
mentioned previously may be a good option.
If I'm wrong, please feel free to correct me.
I would also greatly appreciate any further information you could provide.
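
To illustrate the idea, here is a minimal, self-contained sketch of such a
check. It is not existing Flink code: the class name, the method name, and
the set of target values are my own assumptions derived from the list above,
and it works on the plain string value of 'execution.target' rather than on
any Flink API.

```java
import java.util.Set;

/** Hypothetical sketch, not part of Flink: picks the slot assignment behavior. */
final class SlotAssignmentModeSketch {

    // Values of 'execution.target' listed for items 1) to 4) above.
    // Assumption: these targets always denote a cluster dedicated to one job.
    private static final Set<String> SINGLE_JOB_TARGETS =
            Set.of("embedded", "yarn-per-job");

    private SlotAssignmentModeSketch() {}

    /**
     * Returns true if the new strategy (prefer the minimum number of
     * TaskManagers) should be used; false keeps the current behavior,
     * for example in session clusters.
     */
    static boolean preferMinimalTaskManagers(String executionTarget) {
        // A missing value falls back to the current behavior.
        return executionTarget != null
                && SINGLE_JOB_TARGETS.contains(executionTarget);
    }

    public static void main(String[] args) {
        String[] targets = {
            "embedded", "yarn-per-job", "yarn-session", "kubernetes-session", "local"
        };
        for (String target : targets) {
            System.out.println(target + " -> " + preferMinimalTaskManagers(target));
        }
    }
}
```

With a check like this, the targets of items 5) to 8) would keep the current
spread-out assignment, and nothing would need to be exposed as a public
configuration option.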




Looking forward to your reply.




Best,

Yuepeng Pan

















At 2025-01-10 17:12:21, "Maximilian Michels" <m...@apache.org> wrote:
># Recap
>
>The current slot assignment strategy via DefaultSlotAssigner is to
>pseudo-randomly assign the available TM slots. That works fine in the
>following scenarios:
>
>1. The number of TMs remains constant
>2. There is only a single slot per TaskManager
>
>As soon as we dynamically modify the job resource requirements via the
>AdaptiveScheduler, the current slot assignment strategy makes it near
>impossible to have TaskManagers without used slots, which makes
>scaling down the number of TaskManagers very unpredictable and in many
>cases impossible.
>
>The solution in https://github.com/apache/flink/pull/25218/files sorts
>the TaskManagers by least available slots. There were concerns raised
>that in session clusters, this would result in more clogged clusters,
>due to tasks being less spread-out. I agree that we probably don't
>want to change this behavior in 1.X for session clusters.
>
># Proposal
>
>@Yuepeng Do you think we could condition the DefaultSlotAssigner based
>on whether the cluster is a session or an application cluster? We
>would use the new slot assignment for application clusters. We could
>do this via an internal configuration option, but I would advise not
>to add a public one, as we have too many already.
>
>-Max
>
>
>
>On Tue, Jan 7, 2025 at 8:22 AM Yuepeng Pan <panyuep...@apache.org> wrote:
>>
>> Thanks Max and Rui for the reply and clarification.
>>
>>
>>
>> IIUC, would setting the slot assignment strategy of
>> DefaultSlotAssigner to prioritize using the minimum number of
>> TaskManagers by default solve the problem?
>>
>>
>>
>> I'd appreciate your confirmation.
>>
>>
>>
>>
>> Best,
>>
>> Yuepeng Pan
>>
>>
>>
>>
>>
>>
>>
>>
>> At 2025-01-07 10:16:07, "Rui Fan" <1996fan...@gmail.com> wrote:
>> >Happy new year! And thanks Matthias, Yuepeng and Max for your comments!
>> >
>> >For the reference to FLIP-138[1] from Matthias:
>> >
>> >As FLIP-138 mentioned:
>> >
>> >> In a future version, we might think about letting the ResourceManager
>> >> balance resources across jobs.
>> >
>> >I agree with this, balancing resources might be needed only
>> >when a flink cluster has multiple jobs (in session mode).
>> >
>> >For Yuepeng's summary:
>> >
>> >> Please let me make a brief summary based on the historical comments:
>> >> - It's agreed to optimize/fix this issue in the 1.x LTS versions.
>> >> - The primary goal of this optimization/fix is to minimize the number of
>> >> TaskManagers used in application mode.
>> >> - The optimized logic should be controlled via a parameter.
>> >
>> >IIUC, the second and third points are in conflict. The second point
>> >means the goal is to fix it in application mode, but the third point
>> >might be needed only in session mode. If we introduce a new option
>> >to balance resources in the future, it's better to only take effect
>> >in the session mode. And the new option could be ignored in
>> >the application mode.
>> >
>> >So I'm not sure whether we will fix this issue in flink 1.x for both
>> >application mode and session mode?
>> >
>> >Generally, I'm +1 for Max's suggestion of application mode.
>> >
>> >Please correct me if I misunderstand anything.
>> >
>> >[1]
>> >https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=158873338#FLIP138:DeclarativeResourcemanagement-Howtodistributeslotsacrossdifferentjobs
>> >
>> >Best,
>> >Rui
>> >
>> >On Tue, Jan 7, 2025 at 1:52 AM Maximilian Michels <m...@apache.org> wrote:
>> >
>> >> Thanks Yuepeng for your work on this issue!
>> >>
>> >> I would advise not to add yet another config option to Flink. In
>> >> application mode, the scheduler should default to using the least amount of
>> >> resources required. From my perspective, the current behavior is a bug and
>> >> it doesn't help that we can come up with scenarios where the current
>> >> behavior may be more optimal (e.g. local state recovery turned on).
>> >> Ultimately, it's not what users expect and we don't need another
>> >> configuration option that users can set. We need sane defaults and I would
>> >> strongly suggest that we fix the current default, especially because there
>> >> aren't any drawbacks for existing users.
>> >>
>> >> -Max
>> >>
>> >> On Mon, Jan 6, 2025 at 7:56 AM Yuepeng Pan <panyuep...@apache.org> wrote:
>> >>
>> >> > Thank you Matthias and all for the feedback and suggestions.
>> >> >
>> >> > Please let me make a brief summary based on the historical comments:
>> >> > - It's agreed to optimize/fix this issue in the 1.x LTS versions.
>> >> > - The primary goal of this optimization/fix is to minimize the number of
>> >> > TaskManagers used in application mode.
>> >> > - The optimized logic should be controlled via a parameter.
>> >> >
>> >> > I'd like to introduce the following parameter to control whether the
>> >> > optimized logic should be enabled:
>> >> > - Name:
>> >> > jobmanager.adaptive-scheduler.resource.minimal-taskmanagers-preferred
>> >> > - Type: boolean
>> >> > - Default value: false
>> >> > - Description: This parameter defines whether the adaptive scheduler
>> >> > prioritizes using the minimum number of TaskManagers when scheduling tasks.
>> >> > Note: This parameter is currently suitable for cases where
>> >> > execution.state-recovery.from-local is disabled.
>> >> >
>> >> > BTW, I'm uncertain whether the introduction of a parameter for this
>> >> > specific fix necessitates documentation via a FLIP.
>> >> > If so, I'm willing to initiate a FLIP to aid in subsequent tasks.
>> >> > If not, I will add a link to this email thread to the corresponding Jira
>> >> > ticket's comments for tracking and start the work on the MR.
>> >> >
>> >> > Any suggestion would be appreciated!
>> >> >
>> >> > Thank you!
>> >> >
>> >> > Best,
>> >> > Yuepeng Pan
>> >> >
>> >> > On 2025/01/05 18:41:11 Matthias Pohl wrote:
>> >> > > Hi everyone and sorry for the late reply. I was mostly off in November
>> >> > > and forgot about that topic in December last year.
>> >> > >
>> >> > > Thanks for summarizing and bringing up user feedback. I see the problem
>> >> > > and agree with your view that it's a topic that we might want to address
>> >> > > in the 1.x LTS version. I see how this can be labeled as a bug or a
>> >> > > feature depending on the perspective. I think adding this behavior while
>> >> > > being guarded by a feature flag/configuration parameter in the 1.x LTS
>> >> > > version is reasonable.
>> >> > >
>> >> > > Best,
>> >> > > Matthias
>> >> > >
>> >> > > [1]
>> >> > > https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=158873338#FLIP138:DeclarativeResourcemanagement-Howtodistributeslotsacrossdifferentjobs
>> >> > >
>> >> > > On Wed, Nov 6, 2024 at 9:21 AM Rui Fan <1996fan...@gmail.com> wrote:
>> >> > >
>> >> > > > Thanks Yuepeng for the PR and starting this discussion!
>> >> > > >
>> >> > > > And thanks Gyula and Yuanfeng for the input!
>> >> > > >
>> >> > > > I also agree to fix this behaviour in the 1.x line.
>> >> > > >
>> >> > > > The adaptive scheduler and rescaling API provide powerful capabilities
>> >> > > > to increase or decrease parallelism.
>> >> > > >
>> >> > > > The main benefit I understand of decreasing parallelism is saving
>> >> > > > resources.
>> >> > > > If decreasing parallelism can't save resources, why do users decrease it?
>> >> > > > This is why I think releasing TM resources when decreasing parallelism
>> >> > > > is a basic capability that the Adaptive Scheduler should have.
>> >> > > >
>> >> > > > Please correct me if I miss anything, thanks~
>> >> > > >
>> >> > > > Also, I believe it does not work as the user expects, because this
>> >> > > > behaviour was reported multiple times in the Flink community, such as
>> >> > > > FLINK-33977[1], FLINK-35594[2], FLINK-35903[3] and the Slack channel[4].
>> >> > > > And 1.20.x is an LTS version, so I agree to fix it in the 1.x line.
>> >> > > >
>> >> > > > [1] https://issues.apache.org/jira/browse/FLINK-33977
>> >> > > > [2] https://issues.apache.org/jira/browse/FLINK-35594
>> >> > > > [3] https://issues.apache.org/jira/browse/FLINK-35903
>> >> > > > [4] https://apache-flink.slack.com/archives/C03G7LJTS2G/p1729167222445569
>> >> > > >
>> >> > > > Best,
>> >> > > > Rui
>> >> > > >
>> >> > > > On Wed, Nov 6, 2024 at 4:15 PM yuanfeng hu <yuanf...@apache.org> wrote:
>> >> > > >
>> >> > > >> > Is it considered an error if the adaptive scheduler fails to release
>> >> > > >> > the task manager during scaling?
>> >> > > >>
>> >> > > >> +1. When we enable adaptive mode and perform scaling operations on tasks,
>> >> > > >> a significant part of the goal is to reduce resource usage for the tasks.
>> >> > > >> However, due to some logic in the adaptive scheduler's scheduling process,
>> >> > > >> the task manager cannot be released, and the ultimate goal cannot be
>> >> > > >> achieved. Therefore, I consider this to be a mistake.
>> >> > > >>
>> >> > > >> Additionally, many tasks are currently running in this mode and will
>> >> > > >> continue to run for quite a long time (many users are in this situation).
>> >> > > >> So whether or not it is considered a bug, I believe we need to fix it in
>> >> > > >> the 1.x version.
>> >> > > >>
>> >> > > >> On Wed, Nov 6, 2024 at 2:32 PM Yuepeng Pan <panyuep...@apache.org> wrote:
>> >> > > >>
>> >> > > >> > Hi, community.
>> >> > > >> >
>> >> > > >> >
>> >> > > >> >
>> >> > > >> >
>> >> > > >> > When working on ticket[1] we have received some lively discussions and
>> >> > > >> > valuable feedback[2] (thanks to Matthias, Rui, Gyula, Maximilian, Tison,
>> >> > > >> > etc.). The main issues are:
>> >> > > >> >
>> >> > > >> > When the job runs in an application cluster, could the default behavior
>> >> > > >> > of AdaptiveScheduler not actively releasing TaskManager resources
>> >> > > >> > during downscaling be considered a bug?
>> >> > > >> >
>> >> > > >> > If so, should we fix it in Flink 1.x?
>> >> > > >> >
>> >> > > >> >
>> >> > > >> >
>> >> > > >> > I’d like to start a discussion to hear more comments about it to define
>> >> > > >> > the next step, and I have sorted out some information in the doc[3]
>> >> > > >> > regarding this discussion for you.
>> >> > > >> >
>> >> > > >> >
>> >> > > >> >
>> >> > > >> > Looking forward to your comments and attention.
>> >> > > >> >
>> >> > > >> > Thank you.
>> >> > > >> >
>> >> > > >> > Best,
>> >> > > >> > Yuepeng Pan
>> >> > > >> >
>> >> > > >> >
>> >> > > >> >
>> >> > > >> >
>> >> > > >> > [1] https://issues.apache.org/jira/browse/FLINK-33977
>> >> > > >> >
>> >> > > >> > [2] https://github.com/apache/flink/pull/25218#issuecomment-2401913141
>> >> > > >> >
>> >> > > >> > [3]
>> >> > > >> > https://docs.google.com/document/d/1Rwwl2aGVz9g5kUJFMP5GMlJwzEO_a-eo4gPf7gITpdw/edit?tab=t.0#heading=h.s4i4hehbbli5
>> >> > > >> >
>> >> > > >> >
>> >> > > >> >
>> >> > > >>
>> >> > > >> --
>> >> > > >> Best,
>> >> > > >> Yuanfeng
>> >> > > >>
>> >> > > >
>> >> > >
>> >> >
>> >>
