Hi Roman and Raorao,

Regarding the union operator state, I'd like to share some thoughts.

In my understanding, union operator state is not a separate state type, but
a different recovery semantic of OperatorState. The underlying state is the
same; the only difference is that regular operator state redistributes
state partitions, while union operator state restores the full state list
to every subtask.
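A minimal Java sketch of the two restore semantics. It models the documented behavior, not Flink's actual repartitioner. The class and method names are hypothetical, and the contiguous even split is an assumption chosen to match the P=4 → P=2 walkthrough further down this thread. Both paths start from the same concatenated list; only the assignment to subtasks differs.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/** Hypothetical model of the two OperatorState restore semantics; not Flink's real classes. */
final class OperatorStateRedistribution {

    private OperatorStateRedistribution() {}

    /** Both semantics start from the same logical state: the concatenation of all lists. */
    static <T> List<T> concat(List<List<T>> perSubtask) {
        List<T> all = new ArrayList<>();
        for (List<T> part : perSubtask) {
            all.addAll(part);
        }
        return all;
    }

    /** Even-split (assumed contiguous): each new subtask gets one slice, possibly empty. */
    static <T> List<List<T>> evenSplit(List<List<T>> perSubtask, int newParallelism) {
        List<T> all = concat(perSubtask);
        int base = all.size() / newParallelism;
        int remainder = all.size() % newParallelism;
        List<List<T>> result = new ArrayList<>();
        int start = 0;
        for (int i = 0; i < newParallelism; i++) {
            // The first `remainder` subtasks receive one extra element.
            int size = base + (i < remainder ? 1 : 0);
            result.add(new ArrayList<>(all.subList(start, start + size)));
            start += size;
        }
        return result;
    }

    /** Union: every new subtask gets the complete concatenated list. */
    static <T> List<List<T>> union(List<List<T>> perSubtask, int newParallelism) {
        List<T> all = Collections.unmodifiableList(concat(perSubtask));
        return Collections.nCopies(newParallelism, all);
    }
}
```

With ckp100's four lists [A0, A1], [B0, B1], [C0, C1], [D0, D1], `evenSplit(state, 2)` gives [A0, A1, B0, B1] and [C0, C1, D0, D1]. By contrast, `union(state, 2)` gives all eight elements to both subtasks.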

From this perspective, I think combining newer snapshots from healthy
regions with an older snapshot from a failed region is still valid. The
snapshots of different regions are independent, so the newer snapshots are
unaffected by whether the failed region contributes its latest or previous
snapshot.

Another way to see this is to assume the failed subtask simply stopped
making progress after the previous checkpoint instead of failing. The
resulting checkpoint would naturally contain the latest snapshots from
healthy subtasks and the previous snapshot from that subtask, which is
exactly the same composition as the proposed regional checkpoint.
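A rough Java sketch of that composition. It assumes snapshots are opaque values and that a fallback entry records the checkpoint it was borrowed from, following the refCheckpointId idea discussed further down. `RegionalComposition` and `Entry` are hypothetical names. Each subtask's entry is chosen independently. If the failed subtask had instead stalled and re-reported its old snapshot, the resulting contents would be the same; only the bookkeeping field would differ.

```java
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

/** Hypothetical sketch of assembling a regional checkpoint; not an existing Flink API. */
final class RegionalComposition {

    private RegionalComposition() {}

    /** A subtask's snapshot plus the checkpoint it is borrowed from (null if taken now). */
    static final class Entry {
        final String snapshot;
        final Long refCheckpointId;

        Entry(String snapshot, Long refCheckpointId) {
            this.snapshot = snapshot;
            this.refCheckpointId = refCheckpointId;
        }
    }

    /**
     * Healthy subtasks contribute their latest snapshot; subtasks in failed regions fall
     * back to their snapshot from the previous completed checkpoint.
     */
    static Map<Integer, Entry> compose(
            Map<Integer, String> latest,
            Map<Integer, String> previous,
            long previousCheckpointId,
            Set<Integer> failedSubtasks) {
        Map<Integer, Entry> result = new TreeMap<>();
        for (Map.Entry<Integer, String> e : previous.entrySet()) {
            Integer subtask = e.getKey();
            if (failedSubtasks.contains(subtask)) {
                // Reuse the older snapshot and remember which checkpoint owns it.
                result.put(subtask, new Entry(e.getValue(), previousCheckpointId));
            } else {
                result.put(subtask, new Entry(latest.get(subtask), null));
            }
        }
        return result;
    }
}
```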

What do you think?


Best,

Gen

On Mon, Jun 29, 2026 at 4:45 AM Roman Khachatryan <[email protected]> wrote:

> Hi Raorao,
>
> Thanks for the walk-through.
> That partially resolves my concern about re-assigning state: it doesn't
> have to be recomputed on every partial checkpoint — but the restored
> (redistributed) assignment has to be retained until the first checkpoint
> completes at the new parallelism.
>
> My other concern still stands: the isolation argument doesn't obviously
> cover union operator state, which every subtask sees in full across all
> regions — so a partial checkpoint stores a temporal mix of elements, and
> rescaling reshuffles them across new region boundaries.
>
> > I'll cover all this in the FLIP document.
>
> Yes — I think it's better to capture these edge cases in a Rescaling
> section of the FLIP and discuss in more detail there. Looking forward to
> it.
>
> Regards,
> Roman
>
>
> On Fri, Jun 26, 2026 at 10:06 AM 熊饶饶 <[email protected]> wrote:
>
> > Hi Roman, thanks for the detailed follow-up.
> >
> > Rescaling and state redistribution
> >
> > StateAssignmentOperation runs only once per restore; it doesn’t run per
> > referenced checkpoint. The subtask states entering StateAssignmentOperation
> > have already been pre-merged during the Regional Checkpoint completion
> > phase. Each OperatorSubtaskState already physically contains the state
> > handles from whichever checkpoint it originated from. The refCheckpointId
> > is just metadata for the cleaner.
> >
> > Example: P=4 → P=2 → P=1, two rounds of downscaling
> > Initial state (P=4, even-split operator state):
> > ckp100 (T1, GLOBAL): all 4 regions ack
> >   subtask 0: [A0, A1]    region 0
> >   subtask 1: [B0, B1]    region 1
> >   subtask 2: [C0, C1]    region 2
> >   subtask 3: [D0, D1]    region 3
> >
> > First downscale: P=4 → P=2 (recover from ckp100 after failover)
> > Even-split redistribution:
> > concatenate: [A0,A1, B0,B1, C0,C1, D0,D1]  → 8 elements
> > split into 2:
> >   new subtask 0: [A0, A1, B0, B1]
> >   new subtask 1: [C0, C1, D0, D1]
> > Now running at P=2:
> > ckp101 (T2, REGIONAL):
> >   new-subtask 0 ack: [E0, E1, E2, E3]
> >   new-subtask 1 decline, fallback to ckp100 → [C0,C1,D0,D1]  (ref=100)
> >
> > ckp101 metadata (pre-merged):
> >   new-subtask 0: stateHandles=[E0,E1,E2,E3], refCheckpointId=null
> >   new-subtask 1: stateHandles=[C0,C1,D0,D1], refCheckpointId=100
> >
> > Second downscale: P=2 → P=1 (recover from ckp101)
> > Even-split redistribution:
> > concatenate: [E0,E1,E2,E3, C0,C1,D0,D1]  → 8 elements
> > split into 1:
> >   new subtask 0: [E0,E1,E2,E3, C0,C1,D0,D1]
> >
> > At this point, the single subtask’s state contains elements from two time
> > points; this is safe per the partition isolation argument above. Now the
> > job continues running at P=1:
> > ckp102 (T3, REGIONAL):
> >   new subtask 0 decline, fallback to ckp101's redistributed state
> >   → ref=101
> >
> > ckp102 metadata (pre-merged):
> >   subtask 0: stateHandles=[E0,E1,E2,E3, C0,C1,D0,D1], refCheckpointId=101
> >
> > ref chain: ckp102(ref=101) → ckp101(ref=100) → ckp100
> > The ref chain is tracked entirely in the CompletedCheckpoint store
> > metadata. Rescaling is a restore operation: it redistributes state handles
> > to TMs but doesn't create new checkpoint metadata. The next Regional
> > Checkpoint (ckp102) references the previous CompletedCheckpoint (ckp101)
> > from the store, which still has its original ref=100 annotation in the
> > metadata.
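A minimal Java sketch of how a cleaner could follow these references. It assumes the store can report, for each completed checkpoint, the distinct refCheckpointId values in its metadata. `RefChain` and `mustKeepAlive` are hypothetical names. Retaining ckp102 keeps ckp101 alive and, transitively, ckp100.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/** Hypothetical cleaner helper; the metadata shape is assumed, not Flink's actual format. */
final class RefChain {

    private RefChain() {}

    /**
     * Returns the retained checkpoint plus every checkpoint reachable through
     * refCheckpointId links, i.e. all checkpoints whose files must not be deleted yet.
     */
    static Set<Long> mustKeepAlive(long retained, Map<Long, Set<Long>> refsByCheckpoint) {
        Set<Long> alive = new HashSet<>();
        Deque<Long> toVisit = new ArrayDeque<>();
        toVisit.push(retained);
        while (!toVisit.isEmpty()) {
            long id = toVisit.pop();
            // Skip checkpoints already reached through another reference.
            if (alive.add(id)) {
                for (long ref : refsByCheckpoint.getOrDefault(id, Set.of())) {
                    toVisit.push(ref);
                }
            }
        }
        return alive;
    }
}
```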
> >
> > Union state, same scenario:
> > ckp100 (T1, GLOBAL): P=4, union operator state
> >   subtask 0: [A0, A1]
> >   subtask 1: [B0, B1]
> >   subtask 2: [C0, C1]
> >   subtask 3: [D0, D1]
> >
> > P=4 → P=2 (union):
> >   new subtask 0 gets FULL concatenated: [A0,A1, B0,B1, C0,C1, D0,D1]
> >   new subtask 1 gets FULL concatenated: same
> >
> > ckp101 (T2, REGIONAL):
> >   new-subtask 0 ack: [E0, ..., E7]
> >   new-subtask 1 decline, fallback → [A0,A1,B0,B1,C0,C1,D0,D1]  (ref=100)
> >
> > P=2 → P=1 (union):
> >   new subtask 0 gets: [E0,...,E7, A0,...,D1]  ← complete set, safe
> > Union is even simpler: each subtask always gets the complete state, so
> > mixing doesn’t add new complexity.
> >
> > refCheckpointId chain bounded by maxConsecutiveFailures
> >
> > ckp100 (global)
> >   → ckp101 (regional, ref=100)  consecutive=1
> >   → ckp102 (regional, ref=101)  consecutive=2
> >   → ckp103: consecutive >= maxConsecutiveFailures(2)
> >      → FORCED GLOBAL → if success, chain resets
> > The reference chain can only grow as deep as maxConsecutiveFailures. Once
> > a global checkpoint succeeds, it’s a fresh start. No inflation beyond the
> > configured limit.
> >
> > Channel state redistribution
> >
> > Each channel state handle carries InputChannelInfo /
> > ResultSubpartitionInfo. TaskStateAssignment maps handles by their channel
> > metadata; time of origin is irrelevant. Per-subtask channel state and
> > operator state come from the same checkpoint snapshot, ensuring internal
> > consistency regardless of which checkpoint other subtasks reference.
> >
> > Summary
> >
> > 1.  One StateAssignmentOperation per restore — states are pre-merged
> > 2.  POINTWISE partition isolation makes mixing states across time points
> > safe for both even-split and union
> > 3.  maxConsecutiveFailures bounds refCheckpointId chain depth
> > 4.  Channel state redistribution is time-agnostic
> >
> > I’ll cover all this in the FLIP document.
> >
> > Thanks again for the detailed replies, looking forward to your feedback.
> >
> > Regards,
> > Raorao
> >
> > > On Jun 24, 2026, at 18:38, Roman Khachatryan <[email protected]> wrote:
> > >
> > > Hi Raorao, thanks for your answers
> > >
> > >>> 2. Rescaling for OperatorSubtaskState
> > >
> > >> `refCheckpointId` is checkpoint-level metadata stored in the completed
> > >> checkpoint; it doesn't participate in state redistribution during
> > >> rescaling. The `TaskStateAssignment` redistributes only the state
> > >> handles to new subtasks, while the `CompletedCheckpointStore` retains
> > >> the original metadata (including `refCheckpointId`) for the cleaner to
> > >> trace reference chains. So rescaling is inherently compatible.
> > >
> > > Do you mean that we'll run StateAssignmentOperation for every referenced
> > > checkpoint on every "partial" checkpoint completion?
> > > (it can be quite heavy)
> > >
> > > I don't think that the "redistributes only the state handles to new
> > > subtasks" part is true - on rescaling, the distribution changes for
> > > (potentially) every sub-task.
> > >
> > > From the docs [1]:
> > > - Even-split redistribution: Each operator returns a List of state
> > > elements. The whole state is logically a concatenation of all lists. On
> > > restore/redistribution, the list is evenly divided into as many sublists
> > > as there are parallel operators. Each operator gets a sublist, which can
> > > be empty, or contain one or more elements.
> > > - Union redistribution: Each operator returns a List of state elements.
> > > The whole state is logically a concatenation of all lists. On
> > > restore/redistribution, each operator gets the complete list of state
> > > elements. ...
> > > - (we said earlier that keyed state is not supported)
> > >
> > > So I don't see how any of the above can work, because we might mix old
> > > and new states in a sub-task.
> > >
> > > For example:
> > > - a job is running with parallelism 2 and has even-split state distribution
> > > - 1st checkpoint is completed by 2/2 sub-tasks
> > > - 2nd checkpoint is completed by 1/2 sub-tasks: sub-task 2 failed the
> > > checkpoint; its state in checkpoint 2 refers to "subtask 2 state in
> > > checkpoint 1"
> > > - job is rescaled to 1
> > > - sub-task 1 now has state from checkpoint 1 AND 2?
> > >
> > > Furthermore, with multiple rounds of downscaling, there can be a single
> > > sub-task referring to multiple historical checkpoints.
> > >
> > > For Union, it's even more problematic.
> > >
> > > Also, what about channel state (Unaligned checkpoint) re-distribution?
> > >
> > > Probably it's better to have a FLIP document with the corresponding
> > > section first and then discuss it.
> > >
> > > [1]
> > > https://nightlies.apache.org/flink/flink-docs-stable/docs/dev/datastream/fault-tolerance/state/#operator-state
> > >
> > > Regards,
> > > Roman
> > >
> > >
> > > On Wed, Jun 24, 2026 at 5:21 AM 熊饶饶 <[email protected]> wrote:
> > >
> > >> Hi Zakelly,
> > >>
> > >> Thanks for the feedback! I completely agree: forcing region-level merging
> > >> would significantly reduce the effectiveness of file merging, and that's
> > >> not the right trade-off.
> > >>
> > >> I think TM-level merging can work correctly with Regional Checkpoint if
> > >> we ensure the FileMergingSnapshotManager properly handles the new
> > >> checkpoint notification semantics. Specifically:
> > >>
> > >> - The FileMergingSnapshotManager already uses a notification-based
> > >> lifecycle (checkpoint complete/abort/subsumed) to manage physical file
> > >> deletion.
> > >> - For Regional Checkpoint, we need to introduce the new notification type
> > >> we discussed earlier (notify partially-completed) so that the merging
> > >> manager knows: this checkpoint is complete but some segments belong to
> > >> failed regions, so keep the physical files that contain those segments
> > >> alive until the referencing checkpoints are subsumed.
> > >> - This aligns with the existing design and requires only minor
> > >> adjustments to the merging manager's notification handling.
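One way to picture this keep-alive rule is reference counting on physical files. The Java sketch below assumes that a "partially completed" checkpoint takes its own reference on the physical files holding the failed regions' older segments. `PhysicalFileRefCounter` and its methods are hypothetical and do not reflect the actual FileMergingSnapshotManager API.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/** Hypothetical model of physical-file keep-alive; not the FileMergingSnapshotManager API. */
final class PhysicalFileRefCounter {

    /** Physical file -> ids of the checkpoints whose metadata still points into it. */
    private final Map<String, Set<Long>> referencingCheckpoints = new HashMap<>();

    /** Records that a checkpoint stores (some of) its segments in the given physical file. */
    void addReference(long checkpointId, String physicalFile) {
        referencingCheckpoints.computeIfAbsent(physicalFile, f -> new HashSet<>()).add(checkpointId);
    }

    /**
     * "Partially completed" notification: the checkpoint also reuses the failed regions'
     * older segments, so it references the files holding them as well.
     */
    void onPartiallyCompleted(long checkpointId, Set<String> filesWithReusedSegments) {
        for (String file : filesWithReusedSegments) {
            addReference(checkpointId, file);
        }
    }

    /** Drops the subsumed checkpoint's references and returns files nobody uses anymore. */
    Set<String> onSubsumed(long checkpointId) {
        Set<String> deletable = new HashSet<>();
        referencingCheckpoints.entrySet().removeIf(entry -> {
            entry.getValue().remove(checkpointId);
            boolean unused = entry.getValue().isEmpty();
            if (unused) {
                deletable.add(entry.getKey());
            }
            return unused;
        });
        return deletable;
    }
}
```

Here a file shared by ckp100 and a later regional checkpoint survives ckp100's subsumption and is only released once the later checkpoint is subsumed as well.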
> > >>
> > >> The previous suggestion of region-level merging was overly
> > >> conservative; I'm glad you pointed out the better approach.
> > >>
> > >> Also good to know about the plan to cover channel state in FLIP-306.
> > >> That will simplify the compatibility matrix significantly.
> > >>
> > >> Best Regards,
> > >> Raorao
> > >>
> > >>> On Jun 23, 2026, at 23:33, Zakelly Lan <[email protected]> wrote:
> > >>>
> > >>> Hi Raorao,
> > >>>
> > >>> Good to see this proposal and +1 for the direction. Sorry for joining
> > >>> the discussion late. I have seen many constructive suggestions that
> > >>> largely align with my thoughts, but I still have one concern:
> > >>>
> > >>> I'm one of the authors of FLIP-306 and I'm not in favor of region-level
> > >>> merging. IIUC, region-level merge files severely limit the effectiveness
> > >>> of merging, as merging cannot happen between subtasks. I think it should
> > >>> still be possible to perform TM-level merge. The only thing we should do
> > >>> is to keep previous checkpoint files alive when a region checkpoint
> > >>> occurs. This does not conflict with the current design. It is only
> > >>> necessary to ensure that the new behavior of checkpoint notifications is
> > >>> compatible with FLIP-306, or to make some minor adjustments to the
> > >>> merging manager.
> > >>>
> > >>> And BTW to @Rui, we still need more work to let FLIP-306 cover channel
> > >>> state and deprecate FLINK-26803, and I will look into this soon.
> > >>>
> > >>>
> > >>> Best,
> > >>> Zakelly
> > >>>
> > >>> On Tue, Jun 23, 2026 at 2:40 PM 熊饶饶 <[email protected]> wrote:
> > >>>
> > >>>> Hi Roman, thanks for the follow-up.
> > >>>>
> > >>>> 1. FLIP page
> > >>>>
> > >>>> Thanks for the reminder; I'll create the proper FLIP page before
> > >>>> starting the [VOTE] thread.
> > >>>>
> > >>>> 2. Rescaling for OperatorSubtaskState
> > >>>>
> > >>>> `refCheckpointId` is checkpoint-level metadata stored in the completed
> > >>>> checkpoint; it doesn't participate in state redistribution during
> > >>>> rescaling. The `TaskStateAssignment` redistributes only the state
> > >>>> handles to new subtasks, while the `CompletedCheckpointStore` retains
> > >>>> the original metadata (including `refCheckpointId`) for the cleaner to
> > >>>> trace reference chains. So rescaling is inherently compatible.
> > >>>>
> > >>>> 3. Finished operators
> > >>>>
> > >>>> You're right, my previous answer missed the real issue. For bounded
> > >>>> source jobs (FLIP-147), if the final checkpoint is Regional and a
> > >>>> failed Region's subtask misses `notifyCheckpointComplete`, its side
> > >>>> effects (e.g., Kafka transactions) are never committed, which is data
> > >>>> loss.
> > >>>>
> > >>>> Solution: force a global checkpoint when the job is about to terminate.
> > >>>> When all sources are exhausted, the JM will mark the next checkpoint as
> > >>>> mandatory global, requiring all regions to ack. If it fails, the job
> > >>>> retries rather than terminating with a partial snapshot.
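A minimal Java sketch of this termination rule. It assumes the JM tracks whether all sources are exhausted and whether each completed checkpoint was acked by every region. `FinalCheckpointGate` and its methods are hypothetical. Declined attempts are simply aborted and never reach `onCheckpointCompleted`, so the gate stays closed and the next checkpoint retries.

```java
/** Hypothetical sketch of the termination rule for bounded jobs; not an existing Flink class. */
final class FinalCheckpointGate {

    private boolean sourcesExhausted;
    private boolean finalGlobalCompleted;

    /** Called once every source has finished emitting records. */
    void onAllSourcesExhausted() {
        sourcesExhausted = true;
    }

    /** After sources are exhausted, every checkpoint must be acked by all regions. */
    boolean nextCheckpointMustBeGlobal() {
        return sourcesExhausted;
    }

    /** Records a completed checkpoint; `global` is true if every region acknowledged it. */
    void onCheckpointCompleted(boolean global) {
        if (sourcesExhausted && !global) {
            throw new IllegalStateException("the final checkpoint must not complete regionally");
        }
        if (sourcesExhausted) {
            finalGlobalCompleted = true;
        }
    }

    /** True only once a global checkpoint has completed after the sources were exhausted. */
    boolean mayTerminate() {
        return finalGlobalCompleted;
    }
}
```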
> > >>>>
> > >>>> 4. Limitations section
> > >>>>
> > >>>> I agree with you, and I'll add a section covering all limitations:
> > >>>>
> > >>>> BLOCKING/HYBRID edges between Regions -> Auto-disable at runtime
> > >>>> FLINK-26803 / FLIP-306 -> Warn (future work: region-level merging)
> > >>>> NO_CLAIM restore mode -> Warn (require global checkpoint before snapshot deletion)
> > >>>> Changelog state backend -> Reject job submission
> > >>>> Finished operators (FLIP-147) -> Force global checkpoint when sources are exhausted
> > >>>>
> > >>>>
> > >>>> Thanks again for the review, looking forward to your feedback.
> > >>>>
> > >>>> Regards,
> > >>>> Raorao
> > >>>>
> > >>>>
> > >>>>> On Jun 19, 2026, at 22:51, Roman Khachatryan <[email protected]> wrote:
> > >>>>>
> > >>>>> Hi, thanks for your replies and sorry for the delay.
> > >>>>>
> > >>>>> Most of my questions were answered, but I still have some concerns.
> > >>>>>
> > >>>>>> If there are no further concerns by next Monday (June 22), I'll go
> > >>>>>> ahead and start the [VOTE] thread for this FLIP.
> > >>>>>
> > >>>>> Isn't the actual FLIP still missing? I only saw the Google Doc. Would
> > >>>>> you mind creating a page according to [1]?
> > >>>>>
> > >>>>> ----------------------------------------
> > >>>>>
> > >>>>>> 3. Checkpoint metadata layout
> > >>>>>> Regional Checkpoint recombines state from different checkpoint IDs. To
> > >>>>>> track this, we add a refCheckpointId field to OperatorSubtaskState in
> > >>>>>> the metadata, indicating which historical checkpoint a subtask’s state
> > >>>>>> references.
> > >>>>>
> > >>>>> Could you explain how we find the right OperatorSubtaskState,
> > >>>>> especially in the case of rescaling?
> > >>>>> Does the proposal support rescaling?
> > >>>>>
> > >>>>>> 9. Finished operators
> > >>>>>> The concern is: a finished operator’s final commit notification gets
> > >>>>>> skipped by Regional Checkpoint, and if this checkpoint is the last
> > >>>>>> one, the operator never receives it. Could this cause data loss?
> > >>>>>> In practice, the impact is limited:
> > >>>>>> ● Failed Region tasks are already gone: By the time the Regional
> > >>>>>> Checkpoint completes, tasks in the failed Region have already been
> > >>>>>> restarted (decline) or cancelled (timeout). There is no task left to
> > >>>>>> receive the notification anyway.
> > >>>>> Checkpoint failure doesn't necessarily cause a restart (especially if
> > >>>>> this is limited to one region). The tasks should still be up and
> > >>>>> running.
> > >>>>>
> > >>>>>> ● maxConsecutiveFailures guarantees a global checkpoint: After
> > >>>>>> reaching the limit, the next checkpoint is forced to be global,
> > >>>>>> ensuring all tasks eventually receive notifyCheckpointComplete. We
> > >>>>>> can’t skip the same Region forever.
> > >>>>> maxConsecutiveFailures might not be reached for the final checkpoint.
> > >>>>>
> > >>>>>> ● stop-with-savepoint bypasses Regional Checkpoint: When the user
> > >>>>>> stops the job gracefully, it triggers a full global snapshot, not a
> > >>>>>> Regional Checkpoint. So the final checkpoint is always complete.
> > >>>>> stop-with-savepoint should be fine, yes.
> > >>>>>
> > >>>>> To clarify, my concern is about jobs with bounded sources. In such
> > >>>>> cases, some subtasks might finish processing but still participate in
> > >>>>> checkpoints.
> > >>>>> After a successful checkpoint, they are guaranteed to get a checkpoint
> > >>>>> completion notification, so that they can make side effects visible in
> > >>>>> external systems (commit Kafka transactions).
> > >>>>> See FLIP-147 [2]
> > >>>>>
> > >>>>> However, with the current proposal, the job might complete with some
> > >>>>> subtasks/regions failing the final checkpoint, unless I'm missing
> > >>>>> something.
> > >>>>> This is essentially data loss.
> > >>>>> To prevent this, the final checkpoint must always be acked by all
> > >>>>> subtasks/regions.
> > >>>>>
> > >>>>> ----------------------------------------
> > >>>>>
> > >>>>> There are quite a few limitations in this proposal.
> > >>>>> Could you add a section describing how each of them is handled?
> > >>>>> 1. Reject job submission
> > >>>>> 2. Force all-region checkpoint
> > >>>>> 3. Warn in documentation
> > >>>>>
> > >>>>>> 1. Region independence: BLOCKING/HYBRID edges
> > >>>>>> You’re right. Our current scope is limited to embarrassingly parallel
> > >>>>>> regions. In typical ETL scenarios, each parallel instance maps to an
> > >>>>>> independent Region with no edges connecting them.
> > >>>>>
> > >>>>>> 5. SharedStateRegistry: how are old states kept alive?
> > >>>>>> Good question. In the current design, since we only target
> > >>>>>> embarrassingly parallel regions, there is typically no keyed state and
> > >>>>>> no incremental state. As a result, the SharedStateRegistry is
> > >>>>>> generally empty (setting aside File Merging and Changelog State for
> > >>>>>> now, discussed in 8.), so keep-alive of files under the shared
> > >>>>>> directory is not a concern.
> > >>>>>
> > >>>>>> 8. FLINK-26803 and FLIP-306 compatibility
> > >>>>>> This is a very important point. Both features essentially merge small
> > >>>>>> files at the job level. As Rui Fan pointed out, if the merging
> > >>>>>> granularity is reduced to the Region level, compatibility with
> > >>>>>> Regional Checkpoint should be achievable in theory. I think this can
> > >>>>>> be deferred to future work: once FLINK-26803 is consolidated into
> > >>>>>> FLIP-306, we can revisit and enable support.
> > >>>>>
> > >>>>>> 10. NO_CLAIM mode warning
> > >>>>>> You’re absolutely right, this is an important reminder. After
> > >>>>>> restoring from a Regional Checkpoint, only a successful global
> > >>>>>> checkpoint guarantees independence from the old state. We’ll add a
> > >>>>>> clear user warning in the documentation.
> > >>>>>
> > >>>>>> 11. Changelog state backend: not supported
> > >>>>>> As mentioned earlier, our primary target is embarrassingly parallel
> > >>>>>> regions, which typically have no keyed state and therefore no slow
> > >>>>>> incremental state flush issues. I don’t think we need to support the
> > >>>>>> Changelog state backend for now.
> > >>>>>
> > >>>>> ----------------------------------------
> > >>>>>
> > >>>>>> 2. max-consecutive-failures exceeded: what exactly happens?
> > >>>>>> The current design says “force a global checkpoint.” To clarify the
> > >>>>>> two-tier behavior:
> > >>>>>> ● Tier 1: When consecutiveRegionalCount >= maxConsecutiveFailures,
> > >>>>>> the next checkpoint is forced to be global.
> > >>>>>> ● Tier 2: If that forced global checkpoint also fails (any task
> > >>>>>> declines), the checkpoint is aborted normally (not a job failure).
> > >>>>>> The counter is then reset since a global checkpoint was attempted,
> > >>>>>> and the next checkpoint cycle can try again.
> > >>>>>> This avoids cascading into job failure while ensuring we don’t drift
> > >>>>>> indefinitely on historical state.
> > >>>>>
> > >>>>> My assumption was that we would not allow this particular failed
> > >>>>> region to fail the checkpoint again.
> > >>>>> But forcing a global checkpoint works as well.
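The two-tier rule quoted above could be sketched in Java as follows. The sketch assumes a single counter of consecutive regional completions and implements the Tier 2 reset on abort exactly as stated. `ForcedGlobalPolicy` and its methods are hypothetical names.

```java
/** Hypothetical sketch of the two-tier rule; not an existing Flink class. */
final class ForcedGlobalPolicy {

    private final int maxConsecutiveFailures;
    private int consecutiveRegionalCount;

    ForcedGlobalPolicy(int maxConsecutiveFailures) {
        this.maxConsecutiveFailures = maxConsecutiveFailures;
    }

    /** Tier 1: once the limit is reached, every region must ack the next checkpoint. */
    boolean nextMustBeGlobal() {
        return consecutiveRegionalCount >= maxConsecutiveFailures;
    }

    /** A checkpoint completed; `global` is true if every region acknowledged it. */
    void onCompleted(boolean global) {
        if (global) {
            consecutiveRegionalCount = 0;
            return;
        }
        if (nextMustBeGlobal()) {
            throw new IllegalStateException("a forced global checkpoint cannot complete regionally");
        }
        consecutiveRegionalCount++;
    }

    /** Tier 2: the forced global checkpoint was declined; abort it (no job failure) and reset. */
    void onForcedGlobalAborted() {
        consecutiveRegionalCount = 0;
    }

    int consecutiveRegionalCount() {
        return consecutiveRegionalCount;
    }
}
```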
> > >>>>>
> > >>>>>> 6. Checkpoint abort notifications & Local Recovery cleanup: new
> > >>>>>> notification type
> > >>>>>> This is a very insightful point. Zihao and Gen also raised this in
> > >>>>>> earlier discussions. The current design doesn’t address state cleanup
> > >>>>>> for tasks in failed regions. I agree it’s necessary to introduce a new
> > >>>>>> notification type. For tasks in failed regions, local state cleanup
> > >>>>>> can be deferred until the next checkpoint trigger.
> > >>>>> Ok, this can be some future work.
> > >>>>>
> > >>>>>> 7. Task that neither acknowledges nor declines: per-region timeouts
> > >>>>>> This was discussed in the previous thread. Network issues may cause a
> > >>>>>> task to neither ack nor decline in time. In such cases, we treat it as
> > >>>>>> a checkpoint timeout: the affected tasks’ region is marked as failed,
> > >>>>>> and the process ultimately falls through to the normal Regional
> > >>>>>> Checkpoint processing logic.
> > >>>>> Ok, this can be some future work.
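A small Java sketch of the per-region timeout rule in item 7. It assumes the coordinator knows each task's region and which tasks acknowledged before the timeout. `RegionTimeouts.failedRegions` is a hypothetical helper. A decline and a silent task are treated the same way: the whole region falls back, and the remaining regions continue through the normal Regional Checkpoint path.

```java
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

/** Hypothetical helper; task ids and the region mapping are assumed inputs. */
final class RegionTimeouts {

    private RegionTimeouts() {}

    /**
     * Returns the regions that count as failed when the checkpoint times out: any region
     * with a task that did not acknowledge in time, whether it declined or stayed silent.
     */
    static Set<Integer> failedRegions(Map<String, Integer> regionOfTask, Set<String> ackedInTime) {
        Set<Integer> failed = new TreeSet<>();
        for (Map.Entry<String, Integer> e : regionOfTask.entrySet()) {
            if (!ackedInTime.contains(e.getKey())) {
                failed.add(e.getValue());
            }
        }
        return failed;
    }
}
```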
> > >>>>>
> > >>>>> ----------------------------------------
> > >>>>>
> > >>>>> [1]
> > >>>>> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=65145551#FlinkImprovementProposals-CreateyourOwnFLIP
> > >>>>>
> > >>>>> [2]
> > >>>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-147%3A+Support+Checkpoints+After+Tasks+Finished
> > >>>>>
> > >>>>> Regards,
> > >>>>> Roman
> > >>>>>
> > >>>>>
> > >>>>>
> > >>>>> On Wed, Jun 17, 2026 at 9:56 AM 熊饶饶 <[email protected]> wrote:
> > >>>>>
> > >>>>>> Hi all,
> > >>>>>>
> > >>>>>> Thanks everyone for the valuable feedback. I believe all the points
> > >>>>>> raised above have been addressed (@Roman @Rui Fan). If there are no
> > >>>>>> further concerns by next Monday (June 22), I'll go ahead and start the
> > >>>>>> [VOTE] thread for this FLIP.
> > >>>>>>
> > >>>>>> For reference, the earlier related discussion can be found here:
> > >>>>>> https://lists.apache.org/thread/qpztk0jdpcmhomszjx63l53xv26xnmwf
> > >>>>>>
> > >>>>>>
> > >>>>>> Please feel free to share any additional feedback before then.
> > >>>>>>
> > >>>>>> Best Regards,
> > >>>>>> Raorao
> > >>>>>>
> > >>>>>> On May 27, 2026, at 16:31, 熊饶饶 <[email protected]> wrote:
> > >>>>>>
> > >>>>>> Hi devs,
> > >>>>>>
> > >>>>>> I would like to start a discussion on FLIP-XXX: Independent Checkpoint
> > >>>>>> Based On Pipeline Region.
> > >>>>>>
> > >>>>>> In high-parallelism streaming jobs, a single Task's checkpoint failure
> > >>>>>> causes the entire global Checkpoint to abort, leading to degraded
> > >>>>>> checkpoint success rates and wasted compute resources (especially for
> > >>>>>> GPU operators).
> > >>>>>>
> > >>>>>> We propose Regional Checkpoint: when some Regions fail to checkpoint,
> > >>>>>> the framework combines the historical state of the failed Regions with
> > >>>>>> the current state of the healthy Regions to produce a logically
> > >>>>>> complete Completed Checkpoint, while preserving state consistency. The
> > >>>>>> key changes are:
> > >>>>>>
> > >>>>>> 1. Snapshot Collection: Allow partial region failures; combine the last
> > >>>>>> successful state of failed Regions with the current state of normal
> > >>>>>> Regions.
> > >>>>>>
> > >>>>>> 2. State Correction: New checkpointCoordinatorForRegionFallback
> > >>>>>> interface for OperatorCoordinators to produce consistent snapshots
> > >>>>>> against the mixed view.
> > >>>>>>
> > >>>>>> 3. Checkpoint Store: Track ref_checkpoint_id in metadata to prevent
> > >>>>>> premature cleanup of referenced historical checkpoints.
> > >>>>>>
> > >>>>>> The detailed design is described in the FLIP document:
> > >>>>>>
> > >>>>>> https://docs.google.com/document/d/153r9NjHN9xgFUBdZ8sNX6YjUWTREtDMv5i-JaMdE6NU/edit?usp=sharing
> > >>>>>>
> > >>>>>> Looking forward to your feedback!
> > >>>>>>
> > >>>>>> Best regards,
> > >>>>>>
> > >>>>>> Raorao Xiong
> > >>>>>>
> > >>>>>>
> > >>>>>>
> > >>>>
> > >>>>
> > >>
> > >>
> >
> >
>
