Hi, Matthias.
Thank you very much for the comments.


> I was thinking about the case where a rescale was initiated by
> calling updateJobResourcesRequirements but it's not executed, yet, before
> the error occurs. In that case, a restart will be triggered that implicitly
> executes the rescale based on the available resources. If the available
> resources match the desired resource requirements of the
> updateJobResourcesRequirements, we execute the rescale through the job
> restart which was triggered by the error handling. That would result in a
> rescale that was actually "successful/completed".

IIUC, you are suggesting some adjustments for such cases to reduce the number
of ignored rescale records triggered by updateJobResourcesRequirements in the
case mentioned above?
If so, I think that is reasonable, and I will handle it accordingly during the
implementation.

> The State implementations handle AdaptiveScheduler State-specific
> functionality but the corresponding Context interfaces provide the methods
> to actually modify the AdaptiveScheduler instance itself. The
> RescaleTimeline is part of the AdaptiveScheduler instance and would be
> exposed through that instance (analogously to the exception history in
> AdaptiveScheduler#requestJob).

In the current design, AdaptiveScheduler has a method #getRescaleTimeline,
so we are already able to access the RescaleTimeline object within the 
AdaptiveScheduler instance and use it to handle rescale-related logic.
Sorry, I may not have fully understood your point.
Did you mean that we still need changes like the following?
- Remove AdaptiveScheduler#getRescaleTimeline
- Remove StateWithExecutionGraph#getRescaleTimeline
- Remove StateWithoutExecutionGraph#getRescaleTimeline
- Add interface methods in State.Context that operate on the rescale in
RescaleTimeline directly, so as to avoid an extra #getRescaleTimeline call.
I would appreciate any corrections or suggestions.
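
To make the last bullet concrete, here is a minimal sketch of how I read the suggestion. Only AdaptiveScheduler, State.Context and RescaleTimeline come from the FLIP; the method names (startRescale, terminateCurrentRescale), the simplified types and the String-based entries are hypothetical placeholders, not the actual Flink API.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for State.Context: states only see rescale operations. */
interface RescaleContext {
    void startRescale(String trigger);

    void terminateCurrentRescale(String reason);
}

/** Simplified stand-in for the RescaleTimeline owned by the scheduler. */
final class RescaleTimelineSketch {
    private final List<String> history = new ArrayList<>();
    private String current; // null while no rescale is in progress

    void start(String trigger) {
        current = trigger;
    }

    void terminate(String reason) {
        if (current != null) {
            history.add(current + " -> " + reason);
            current = null;
        }
    }

    List<String> history() {
        return List.copyOf(history);
    }
}

/** Simplified stand-in for AdaptiveScheduler: implements the context, keeps the timeline private. */
final class SchedulerSketch implements RescaleContext {
    private final RescaleTimelineSketch timeline = new RescaleTimelineSketch();

    @Override
    public void startRescale(String trigger) {
        timeline.start(trigger);
    }

    @Override
    public void terminateCurrentRescale(String reason) {
        timeline.terminate(reason);
    }

    /** Read-only view, analogous to how the exception history is exposed. */
    List<String> rescaleHistory() {
        return timeline.history();
    }
}
```

With this shape, no state would need a #getRescaleTimeline accessor; the states would call the context methods and the timeline would stay an internal detail of the scheduler.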

For the other notes:

> - nit (implementation detail): I'm not sure whether we need to keep the
> latestRescales as part of the RescalesStatsSnapshot. That could be quickly
> derived from RescalesStatsSnapshot#rescaleHistory (to reduce redundancy in
> the code) assuming that this list doesn't contain millions of entries. But
> that's an implementation detail. Not sure whether that even needs to be
> mentioned in the FLIP.
> - nit (implementation detail): VertexParallelismRescale#pre/postParallelism
> should be named pre/postRescaleParallelism. Same applies
> to SlotSharingGroupRescale#pre/postSlot
> - nit (implementation detail): The name of the SlotSharingGroup does not
> need to be stored in the VertexParallelismRescale as far as I can see. The
> SlotSharingGroup which holds the name is referenced through the ID.

Reasonable! Updated it.

> - Do we still need RescaleStatus in the Rescale structure? That seems to be
> a transient field that should be derived via the AdaptiveScheduler's state.

Thank you very much for the reminder.
This is exactly the point I have been reconsidering.
After introducing TerminalState and TerminatedReason, this field has lost its
meaning.
If users need the AdaptiveScheduler state information, they can obtain it from
schedulerStates.
Therefore, removing this field is reasonable to me, too.
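
As a rough illustration of the structure without RescaleStatus, assuming a record layout (Java 16+): only the TerminalState values (IGNORED, FAILED, COMPLETED) come from this discussion; the TerminatedReason constants and the field and method names are hypothetical.

```java
import java.util.Optional;

enum TerminalState { IGNORED, FAILED, COMPLETED }

/** Hypothetical reasons; the FLIP defines the real constants. */
enum TerminatedReason { SUCCEEDED, EXCEPTION_OCCURRED, RESOURCE_REQUIREMENTS_UPDATED }

/** A rescale without a RescaleStatus field: it is either in progress or terminated. */
record RescaleSketch(long startTimestamp, TerminalState terminalState, TerminatedReason terminatedReason) {

    static RescaleSketch inProgress(long startTimestamp) {
        return new RescaleSketch(startTimestamp, null, null);
    }

    RescaleSketch terminate(TerminalState state, TerminatedReason reason) {
        if (isTerminated()) {
            throw new IllegalStateException("Rescale already terminated as " + terminalState);
        }
        return new RescaleSketch(startTimestamp, state, reason);
    }

    boolean isTerminated() {
        return terminalState != null;
    }

    /** Empty while in progress; callers then consult the scheduler states instead. */
    Optional<TerminalState> terminalStateIfAny() {
        return Optional.ofNullable(terminalState);
    }
}
```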



Looking forward to your comments!


Best regards,
Yuepeng Pan
---- Replied Message ----
| From | Matthias Pohl<map...@apache.org> |
| Date | 08/25/2025 15:14 |
| To | <dev@flink.apache.org> |
| Cc | Roc Marshal<flin...@126.com> |
| Subject | Re: [DISCUSS] FLIP-495: Support AdaptiveScheduler record and query the rescale history |
Hi Yuepeng,
thanks for the update. Please see my responses below:

> If, during failure handling and job recovery (i.e., when there is an
> ongoing rescale), a new resource requirement change or new available
> resource change occurs (provided that such a change can trigger a new
> rescale), the ongoing rescale will be forcibly terminated and saved.


I was thinking about the case where a rescale was initiated by
calling updateJobResourcesRequirements but it's not executed, yet, before
the error occurs. In that case, a restart will be triggered that implicitly
executes the rescale based on the available resources. If the available
resources match the desired resource requirements of the
updateJobResourcesRequirements, we execute the rescale through the job
restart which was triggered by the error handling. That would result in a
rescale that was actually "successful/completed".

>> - Why does StateTransitions need to be extended with
>> getRescaleTimeline()?
>
> - This is because we want to quickly obtain a reference to the
> RescaleTimeline, and maintain rescale information through it.
> - It was introduced into the StateTransitions class so that all states can
> call #getRescaleTimeline via this interface.
> - After re-examining the scope affected by rescale maintenance operations,
> it might be more precise to introduce this method into
> StateWithExecutionGraph.Context / StateWithoutExecutionGraph.Context [1]
> instead.


The State implementations handle AdaptiveScheduler State-specific
functionality but the corresponding Context interfaces provide the methods
to actually modify the AdaptiveScheduler instance itself. The
RescaleTimeline is part of the AdaptiveScheduler instance and would be
exposed through that instance (analogously to the exception history in
AdaptiveScheduler#requestJob).

Other notes:
- nit (implementation detail): I'm not sure whether we need to keep the
latestRescales as part of the RescalesStatsSnapshot. That could be quickly
derived from RescalesStatsSnapshot#rescaleHistory (to reduce redundancy in
the code) assuming that this list doesn't contain millions of entries. But
that's an implementation detail. Not sure whether that even needs to be
mentioned in the FLIP.

- nit (implementation detail): VertexParallelismRescale#pre/postParallelism
should be named pre/postRescaleParallelism. Same applies
to SlotSharingGroupRescale#pre/postSlot

- nit (implementation detail): The name of the SlotSharingGroup does not
need to be stored in the VertexParallelismRescale as far as I can see. The
SlotSharingGroup which holds the name is referenced through the ID.

- Do we still need RescaleStatus in the Rescale structure? That seems to be
a transient field that should be derived via the AdaptiveScheduler's state.
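
To illustrate the first nit: a minimal sketch of deriving the latest rescales from the history on demand instead of storing them. It assumes that latestRescales means the most recent entry per terminal state and that the history is ordered oldest to newest; the types below are simplified stand-ins, not the FLIP's classes.

```java
import java.util.EnumMap;
import java.util.List;
import java.util.Map;

enum TerminalState { IGNORED, FAILED, COMPLETED }

/** Simplified stand-in for a terminated rescale entry. */
record RescaleEntry(long id, TerminalState terminalState) {}

final class RescaleHistoryView {
    private RescaleHistoryView() {}

    /** Single pass over the history; later entries overwrite earlier ones per state. */
    static Map<TerminalState, RescaleEntry> latestPerTerminalState(List<RescaleEntry> history) {
        Map<TerminalState, RescaleEntry> latest = new EnumMap<>(TerminalState.class);
        for (RescaleEntry entry : history) {
            latest.put(entry.terminalState(), entry);
        }
        return latest;
    }
}
```

The pass is linear in the history size, which should be fine as long as the history stays small.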

Thanks,
Matthias

On Fri, Aug 22, 2025 at 1:19 PM Roc Marshal <flin...@126.com> wrote:

Hi, Matthias.
Thank you very much for the comments.

> - Why do we mark the rescale operation as IGNORED if an (repeatable)
> exception causes a restart?

Here we mainly discuss two scenarios:
- If a recoverable exception occurs while there is an ongoing rescale, the
current rescale will be forcibly terminated and saved. Then, a new rescale
will be created to record the subsequent rescale information.
- If there is no ongoing rescale, a new rescale will be created directly
to record the information.

> That's because the failure will create a new rescale instance that will
> be saved if the resources changed as part of the failure handling/job
> restart process?

If, during failure handling and job recovery (i.e., when there is an
ongoing rescale), a new resource requirement change or new available
resource change occurs (provided that such a change can trigger a new
rescale), the ongoing rescale will be forcibly terminated and saved.
Then, the system will create a new rescale to continue recording the
subsequent process.

> - Thinking about the GlobalRescaleId: Should we still keep it
> considering that we're not preserving the value on JM failovers (because of
> the missing HA functionality) in this FLIP? We could remove it and add it
> as part of a HA-support follow-up. Providing it despite the missing HA
> support might be misleading in case a JM failover actually happens. WDYT?

Nice proposal! That makes sense to me~

> - For the job vertices, you introduced a 5th resource type (previous,
> acquired, required, min bound, max bound). What's the difference? I guess,
> min bound corresponds to "sufficient resources" and max bound corresponds
> to "desired resources". What are required resources here in that case? It
> might be good to stick to the naming that was introduced in FLIP-472 [1].

I’ve already made the latest changes.
BTW, the original intention here was to record information as similar as
possible to the one from #updateJobResourcesRequirements requests. Now I
believe it’s quite necessary to remove the previous design and use a
unified description instead, since this makes things more consistent and
easier to understand. Many thanks for your suggestions.

> - "Job Vertices Rescaled Information" and "Slots / Resources Rescaled
> Information" seem to have redundant information now: "parallelism of job
> vertex" and "slot number of the slot sharing group" are more or less the
> same. The parallelism is defined per job vertex (along the
> SlotSharingGroupId). The SlotSharingGroup holds the ResourceProfile (but no
> parallelism).

Please let me explain the reasoning:
- The parallelism of job vertices is kept so that users can clearly see
the parallelism of each job vertex.
- Adding information about the number of slots in a SlotSharingGroup is
intended to give users a more intuitive understanding of how many resources
this type of group consumes. Of course, even if we remove the slot number
in the SlotSharingGroup, users could still calculate it by themselves based
on job vertices' parallelisms, but that would require them to be familiar
with the AdaptiveScheduler’s logic for computing slot numbers.
- In addition, if we only keep the information about the number of slots
in the SlotSharingGroup and drop the parallelism information at the
job-vertex level, then users would lose the fine-grained parallelism
details of job vertices during a rescale. They would only be able to see
SlotSharingGroup-level information, which is somewhat like parallelism but
less precise.
- If necessary, I’m willing to remove the slot-number-related field from
the SlotSharingGroup description.
Please let me know your thoughts.
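
For reference, the calculation users would otherwise have to do themselves looks roughly like this. It is a sketch that assumes the usual slot-sharing rule (a group needs as many slots as the highest parallelism among its vertices); the record and method names are hypothetical, and the real AdaptiveScheduler logic may take more into account.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Simplified per-vertex data: the slot sharing group is referenced by ID only. */
record VertexParallelism(String jobVertexId, String slotSharingGroupId, int parallelism) {}

final class SlotCountSketch {
    private SlotCountSketch() {}

    /** Slots per group = max parallelism over the vertices that share the group. */
    static Map<String, Integer> slotsPerGroup(List<VertexParallelism> vertices) {
        Map<String, Integer> slots = new HashMap<>();
        for (VertexParallelism vertex : vertices) {
            slots.merge(vertex.slotSharingGroupId(), vertex.parallelism(), Math::max);
        }
        return slots;
    }
}
```

Keeping the slot number in the record spares users from having to reproduce this step, at the cost of some redundancy.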

> - "Other information" section contains a comment message for the reason
> why a rescale event terminates. Reason might be a better term than comment
> (especially because we're relying on an enum here rather than plain text).

Good idea — I’ve already made the corresponding changes.

> - Based on the current proposal of the FLIP, the intermediate states of
> the rescale event actually match the AdaptiveScheduler's states. Only the
> final states are exclusive to the scale event? Would it make sense to rely
> on the AdaptiveScheduler state entirely for the intermediate state and a
> dedicated field "terminalState"?

Yes, your understanding is correct. This approach works well for me, and
I’ve already made the corresponding changes in the wiki document.

> - Why does StateTransitions need to be extended with
> getRescaleTimeline()?

- This is because we want to quickly obtain a reference to the
RescaleTimeline, and maintain rescale information through it.
- It was introduced into the StateTransitions class so that all states can
call #getRescaleTimeline via this interface.
- After re-examining the scope affected by rescale maintenance operations,
it might be more precise to introduce this method into
StateWithExecutionGraph.Context / StateWithoutExecutionGraph.Context [1]
instead.

> Shouldn't the RescaleTimeline be located in the AdaptiveScheduler?

Yes. Whether #getRescaleTimeline is introduced in StateTransitions or in
StateWithExecutionGraph.Context / StateWithoutExecutionGraph.Context, the
RescaleTimeline should still be part of the AdaptiveScheduler.

> Instead, the State.Context (essentially, the AdaptiveScheduler) should
> provide methods for updating the current rescale process object that lives
> in the AdaptiveScheduler. WDYT?

Sounds good to me. Once #getRescaleTimeline is introduced in
State.Context, it can directly obtain a reference to the RescaleTimeline,
allowing quick operations on the current rescale.
What do you think?


> - Section "How to process rescales storage ?": ExecutionGraphInfo could
> be modified to include the Rescale history (along the exception history).
> The data can then be saved in the ExecutionGraphInfoStore. All you have to
> do is to store the rescale history in AdaptiveScheduler and an object that
> holds the infos of the current rescale process. If this current rescale
> process terminates, an immutable rescale snapshot of this event is created
> that is saved in the rescale history.

Sorry, my previous wording was ambiguous and inconsistent with the
documentation. Thanks a lot for pointing it out. This is actually what I
intended to do, and I’ve already corrected the corresponding description in
the document.
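
In other words, the intended flow is roughly the following. This is a sketch with hypothetical names (CurrentRescale, RescaleSnapshot, RescaleHistory and the size bound are all my placeholders); it only illustrates "current rescale object, immutable snapshot on termination, history kept alongside the exception history" and is not the actual ExecutionGraphInfo change.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.List;

/** Immutable view of a terminated rescale (hypothetical fields). */
record RescaleSnapshot(long startTimestamp, long endTimestamp, String terminalState) {}

/** Holder for the rescale that is currently in progress. */
final class CurrentRescale {
    private final long startTimestamp;

    CurrentRescale(long startTimestamp) {
        this.startTimestamp = startTimestamp;
    }

    /** Terminating produces an immutable snapshot; the holder itself is then discarded. */
    RescaleSnapshot terminate(long endTimestamp, String terminalState) {
        return new RescaleSnapshot(startTimestamp, endTimestamp, terminalState);
    }
}

/** Bounded history that only ever stores immutable snapshots. */
final class RescaleHistory {
    private final Deque<RescaleSnapshot> entries = new ArrayDeque<>();
    private final int maxSize; // assumed bound, not specified in the FLIP discussion

    RescaleHistory(int maxSize) {
        if (maxSize <= 0) {
            throw new IllegalArgumentException("maxSize must be positive");
        }
        this.maxSize = maxSize;
    }

    void add(RescaleSnapshot snapshot) {
        if (entries.size() == maxSize) {
            entries.removeFirst(); // drop the oldest entry
        }
        entries.addLast(snapshot);
    }

    /** Copy handed out to readers, e.g. when assembling the job's archived information. */
    List<RescaleSnapshot> snapshot() {
        return List.copyOf(entries);
    }
}
```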

[1]
https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=334760525#FLIP495:SupportAdaptiveSchedulerrecordandquerytherescalehistory-Howtohold/maintaintherescaleshistory?

Best,
Yuepeng Pan



Roc Marshal
flin...@126.com

---- Replied Message ----
| From | Matthias Pohl<map...@apache.org> |
| Date | 08/21/2025 18:54 |
| To | <dev@flink.apache.org> |
| Cc | Yuepeng Pan<panyuep...@apache.org> |
| Subject | Re: [DISCUSS] FLIP-495: Support AdaptiveScheduler record and query the rescale history |
Thanks for the update, Yuepeng. Here are my remarks:

- Why do we mark the rescale operation as IGNORED if an (repeatable)
exception causes a restart? That's because the failure will create a new
rescale instance that will be saved if the resources changed as part of the
failure handling/job restart process?

- Thinking about the GlobalRescaleId: Should we still keep it considering
that we're not preserving the value on JM failovers (because of the missing
HA functionality) in this FLIP? We could remove it and add it as part of a
HA-support follow-up. Providing it despite the missing HA support might be
misleading in case a JM failover actually happens. WDYT?

- For the job vertices, you introduced a 5th resource type (previous,
acquired, required, min bound, max bound). What's the difference? I guess,
min bound corresponds to "sufficient resources" and max bound corresponds
to "desired resources". What are required resources here in that case? It
might be good to stick to the naming that was introduced in FLIP-472 [1].

- "Job Vertices Rescaled Information" and "Slots / Resources Rescaled
Information" seem to have redundant information now: "parallelism of job
vertex" and "slot number of the slot sharing group" are more or less the
same. The parallelism is defined per job vertex (along the
SlotSharingGroupId). The SlotSharingGroup holds the ResourceProfile (but no
parallelism).

- "Other information" section contains a comment message for the reason why
a rescale event terminates. Reason might be a better term than comment
(especially because we're relying on an enum here rather than plain text).

- Based on the current proposal of the FLIP, the intermediate states of the
rescale event actually match the AdaptiveScheduler's states. Only the final
states are exclusive to the scale event? Would it make sense to rely on the
AdaptiveScheduler state entirely for the intermediate state and a dedicated
field "terminalState"?

- Why does StateTransitions need to be extended with getRescaleTimeline()?
Shouldn't the RescaleTimeline be located in the AdaptiveScheduler? Instead,
the State.Context (essentially, the AdaptiveScheduler) should provide
methods for updating the current rescale process object that lives in the
AdaptiveScheduler. WDYT?

- Section "How to process rescales storage ?": ExecutionGraphInfo could be
modified to include the Rescale history (along the exception history). The
data can then be saved in the ExecutionGraphInfoStore. All you have to do
is to store the rescale history in AdaptiveScheduler and an object that
holds the infos of the current rescale process. If this current rescale
process terminates, an immutable rescale snapshot of this event is created
that is saved in the rescale history.

Matthias

[1]

https://cwiki.apache.org/confluence/display/FLINK/FLIP-472%3A+Aligning+timeout+logic+in+the+AdaptiveScheduler%27s+WaitingForResources+and+Executing+states

On Tue, Aug 19, 2025 at 11:14 AM Yuepeng Pan <panyuep...@apache.org>
wrote:

Bumping this thread kindly. Thanks!

Best,
Yuepeng Pan


At 2025-08-13 14:52:26, "Yuepeng Pan" <panyuep...@apache.org> wrote:

Hi, Matthias,
Thank you very much for your comments!
I have carefully read your reply and made some changes in the hope of
making improvements.
Please help take a look.

For your comments:

> 1. You mention a few options for when it comes to storing the data which is
> good. The FLIP doesn't point out, though, what option you're going to go
> for as part of this FLIP (as far as I can see). It would be good to only
> outline the option to go for in the FLIP and list the other options as
> rejected alternatives (with the pros and cons). I think it makes sense to
> go for option 3 (i.e. following what's done for the ExecutionGraphInfoStore
> for now). The other options can be considered as a follow-up.


This is very meaningful. Based on this comment, I have kept option 3 in
its original place and moved the other candidate options to [1].

> 2. About the terminal states of a rescaling (i.e. IGNORED, FAILED,
> COMPLETED): Can we clarify in the FLIP under what conditions the
> rescaling transitions into each of the three terminal states?


Yes, this is a reasonable request for understanding and explaining the
logic of transitions to terminated states.
A new subsection [2] has been added to address this.

> 3. The section "The information to record in a rescale event" could be
> restructured in four sections (to remove redundancy):
> a) The IDs (Rescale ID, resourceRequirementsEpochID,
> subRescaleIdOfResourceRequirementsEpochID). What about making these names
> easier to read (GlobalRescaleID, RescaleUUID, RescaleAttemptId)?
> b) Per-vertex data which includes: JobVertexID, JobVertexName,
> SlotSharingGroupId, the different parallelisms (pre-rescale, sufficient,
> desired, post-rescale)
> c) The SlotSharingGroup information: SlotSharingGroupId, name,
> ResourceProfile
> d) Other information: Timestamps of state transitions, etc. as laid out in
> the FLIP already


That makes sense to me. Please check [3] for the latest updates in this
part.
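
As a rough picture of the restructured data (not the final classes), the four parts could map to something like the sketch below (Java 16+ records). The field names follow the wording of your comment; the exact types, the String IDs and the String placeholder for ResourceProfile are assumptions for illustration only.

```java
import java.util.List;

/** a) The IDs, using the suggested names. */
record RescaleIds(long globalRescaleId, String rescaleUuid, int rescaleAttemptId) {}

/** b) Per-vertex data; the slot sharing group is referenced by its ID. */
record VertexParallelismRescale(
        String jobVertexId,
        String jobVertexName,
        String slotSharingGroupId,
        int preRescaleParallelism,
        int sufficientParallelism,
        int desiredParallelism,
        int postRescaleParallelism) {}

/** c) Slot sharing group data; resourceProfile is a placeholder for ResourceProfile. */
record SlotSharingGroupRescale(String slotSharingGroupId, String name, String resourceProfile) {}

/** d) Timestamps, plus the container that ties the parts together. */
record RescaleEvent(
        RescaleIds ids,
        List<VertexParallelismRescale> vertices,
        List<SlotSharingGroupRescale> slotSharingGroups,
        long startTimestamp,
        long endTimestamp) {

    RescaleEvent {
        // Defensive copies keep the event immutable once it has been created.
        vertices = List.copyOf(vertices);
        slotSharingGroups = List.copyOf(slotSharingGroups);
    }
}
```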

> 4. The FLIP doesn't explain how the data is passed through the
> AdaptiveScheduler states. We should be handling some kind of
> RescaleSnapshot that is passed through the different states and updated and
> its final state is stored somewhere within AdaptiveScheduler in the end, I
> guess. Can we clarify that in the FLIP?


Indeed — this was missing in the original FLIP. To address this, I have
added [4], which focuses on describing how a Rescale is represented,
and how we can quickly pass and maintain the Rescale history.

> 5. You mention the config parameters for the cache in the public interface
> section. But there's no mention of any caching and how that is used
> within the FLIP.


Sorry for the rough description in the previous version.
Since this part belongs to the REST API acceleration mechanism for
rescaling, and Option 6 seems reasonable to me,
I plan to add it to FLIP-487 once the design of FLIP-495 has reached
consensus.
Of course, if needed, I'd be happy to clarify the usage and purpose of
this parameter in the current email thread.

> 6. The REST endpoint is probably better suited in FLIP-487. FLIP-495 should
> be about the actual implementation details and how the data is stored
> internally whereas FLIP-487 is about exposing the information to the
> outside through the REST API and the Flink UI. That would be a way to
> decrease the scope of FLIP-495. WDYT?


That sounds nice to me. Therefore, I have moved all REST API–related
changes to FLIP-487.
BTW, to avoid repetitive changes in FLIP-487, I'll start organizing
FLIP-487 after FLIP-495 has been finalized.

Looking forward to your next review!

[1]
https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=334760525#FLIP495:SupportAdaptiveSchedulerrecordandquerytherescalehistory-Aboutrescaleeventsstorage.1
[2]
https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=334760525#FLIP495:SupportAdaptiveSchedulerrecordandquerytherescalehistory-ThemainscenarioswhereRescalestatusswitchestoterminated
[3]
https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=334760525#FLIP495:SupportAdaptiveSchedulerrecordandquerytherescalehistory-Theinformationtorecordinarescaleevent
[4]
https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=334760525#FLIP495:SupportAdaptiveSchedulerrecordandquerytherescalehistory-InternalInterfaces


Best regards,
Yuepeng Pan





At 2025-08-10 23:54:37, "Matthias Pohl" <map...@apache.org> wrote:

Hi Yuepeng,
thanks for reminding me of this FLIP. I went over it and have a few items
which we might need to address before we can actually finalize the vote:

1. You mention a few options for when it comes to storing the data which is
good. The FLIP doesn't point out, though, what option you're going to go
for as part of this FLIP (as far as I can see). It would be good to only
outline the option to go for in the FLIP and list the other options as
rejected alternatives (with the pros and cons). I think it makes sense to
go for option 3 (i.e. following what's done for the ExecutionGraphInfoStore
for now). The other options can be considered as a follow-up.
2. About the terminal states of a rescaling (i.e. IGNORED, FAILED,
COMPLETED): Can we clarify in the FLIP under what conditions the
rescaling transitions into each of the three terminal states?
3. The section "The information to record in a rescale event" could be
restructured in four sections (to remove redundancy):
a) The IDs (Rescale ID, resourceRequirementsEpochID,
subRescaleIdOfResourceRequirementsEpochID). What about making these names
easier to read (GlobalRescaleID, RescaleUUID, RescaleAttemptId)?
b) Per-vertex data which includes: JobVertexID, JobVertexName,
SlotSharingGroupId, the different parallelisms (pre-rescale, sufficient,
desired, post-rescale)
c) The SlotSharingGroup information: SlotSharingGroupId, name,
ResourceProfile
d) Other information: Timestamps of state transitions, etc. as laid out in
the FLIP already
4. The FLIP doesn't explain how the data is passed through the
AdaptiveScheduler states. We should be handling some kind of
RescaleSnapshot that is passed through the different states and updated and
its final state is stored somewhere within AdaptiveScheduler in the end, I
guess. Can we clarify that in the FLIP?
5. You mention the config parameters for the cache in the public interface
section. But there's no mention of any caching and how that is used
within the FLIP.
6. The REST endpoint is probably better suited in FLIP-487. FLIP-495 should
be about the actual implementation details and how the data is stored
internally whereas FLIP-487 is about exposing the information to the
outside through the REST API and the Flink UI. That would be a way to
decrease the scope of FLIP-495. WDYT?

Best,
Matthias


On Mon, Mar 24, 2025 at 11:37 AM Yuepeng Pan <panyuep...@apache.org>
wrote:

Hi, Community,

There haven’t been any further responses to this email over the past few
days.
I'd like to initiate a vote on the current proposal[1] in the next few
days.
Please rest assured that I’m proceeding cautiously and not rushing the
process.
If there are any concerns about this FLIP-495[1],
I will gladly pause and make the adjustments.

Best regards,
Yuepeng Pan

[1]

https://cwiki.apache.org/confluence/display/FLINK/FLIP-495%3A+Support+AdaptiveScheduler+record+and+query+the+rescale+history


On 2024/12/17 15:18:45 Yuepeng Pan wrote:

Hi community,

We discussed several aspects of FLIP-487[1] 'Show history of rescales in
Web UI for AdaptiveScheduler' and received a lot of valuable feedback.
Based on the suggestions from the email thread[2], we plan to split the
original proposal for FLIP-487[1].

The current email thread and the FLIP-495[3] wiki will be used to discuss
'Support AdaptiveScheduler in recording and querying the rescale history',
while FLIP-487[1] will primarily focus on display-related design content.

Looking forward to any feedback and opinions on FLIP-495[3].

[1]
https://cwiki.apache.org/confluence/display/FLINK/%5BWIP%5D+FLIP-487%3A+Show+history+of+rescales+in+Web+UI+for+AdaptiveScheduler
[2] https://lists.apache.org/thread/f4md4btkf006mxcxf66bng1kfz0rsn8c
[3]
https://cwiki.apache.org/confluence/display/FLINK/%5BWIP%5D+FLIP-495%3A+Support+AdaptiveScheduler+record+and+query+the+rescale+history

Thank you very much.

Best regards,
Yuepeng Pan




