Hi Becket,

I think that's a great idea! I have added the
SupportSplitReassignmentOnRecovery interface in this FLIP. If a Source
implements this interface, it indicates that the source operator needs to
report its splits to the enumerator and receive reassignment.[1]
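
For reference, a rough sketch of the mix-in (illustrative only; the
authoritative definition is in the FLIP, and whether it stays a pure
marker or also carries a method is settled there):

    // Illustrative sketch, not the final API.
    // A Source implementing this mix-in tells the SourceOperator to report
    // its restored splits to the enumerator on recovery and to wait for an
    // explicit reassignment instead of resuming them directly.
    public interface SupportSplitReassignmentOnRecovery {
    }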

Best,
Hongshun

[1]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-537%3A+Enumerator+with+Global+Split+Assignment+Distribution+for+Balanced+Split+assignment

On Thu, Aug 21, 2025 at 12:09 PM Becket Qin <[email protected]> wrote:

> Hi Hongshun,
>
> I think the convention for such optional features in Source is via mix-in
> interfaces. So instead of adding a method to the SourceReader, maybe we
> should introduce an interface SupportSplitReassignmentOnRecovery with this
> method. If a Source implementation implements that interface, then the
> SourceOperator will check the desired behavior and act accordingly.
>
> Thanks,
>
> Jiangjie (Becket) Qin
>
> On Wed, Aug 20, 2025 at 8:52 PM Hongshun Wang <[email protected]>
> wrote:
>
>> Hi devs,
>>
>> Would anyone like to discuss this FLIP? I'd appreciate your feedback and
>> suggestions.
>>
>> Best,
>> Hongshun
>>
>>
>> On Aug 13, 2025, at 14:23, Hongshun Wang <[email protected]> wrote:
>>
>> Hi Becket,
>>
>> Thank you for your detailed feedback. The new contract makes good sense
>> to me and effectively addresses the issues I encountered at the beginning
>> of the design.
>>
>> That said, I recommend not reporting splits by default, primarily for
>> compatibility and practical reasons:
>>
>> >  For these reasons, we do not expect the Split objects to be huge, and
>> we are not trying to design for huge Split objects either as they will have
>> problems even today.
>>
>>    1. Not all existing connectors match this rule.
>>    For example, in the MySQL CDC connector, a binlog split may contain
>>    hundreds (or even more) snapshot split completion records. This state is
>>    large and is currently transmitted incrementally through multiple
>>    BinlogSplitMetaEvent messages. Since the binlog reader operates with
>>    single parallelism, reporting the full split state on recovery could be
>>    inefficient or even infeasible. For such sources, it would be better to
>>    provide a mechanism to skip split reporting during restart until they
>>    redesign and reduce the split size.
>>
>>    2. Not all enumerators maintain unassigned splits in state.
>>    Some SplitEnumerator implementations (such as the Kafka connector's) do
>>    not track or persistently manage unassigned splits. Requiring them to
>>    handle re-registration would add unnecessary complexity. Even if we
>>    implemented this in the Kafka connector, the connector is currently
>>    decoupled from the Flink version, so we would also need to keep older
>>    versions compatible.
>>
>> ------------------------------
>>
>> To address these concerns, I propose introducing a new method: boolean
>> SourceReader#shouldReassignSplitsOnRecovery() with a default
>> implementation returning false. This allows source readers to opt in to
>> split reassignment only when necessary. Since the new contract already
>> places the responsibility for split assignment on the enumerator, not
>> reporting splits by default is a safe and clean default behavior.
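>>
>> Roughly, the new method would look like this (a sketch only; the exact
>> definition is in the FLIP):
>>
>>     public interface SourceReader<T, SplitT extends SourceSplit> {
>>         // ... existing methods elided ...
>>
>>         // Opt-in hook: when this returns true, the source operator reports
>>         // the restored splits to the enumerator and waits for reassignment.
>>         default boolean shouldReassignSplitsOnRecovery() {
>>             return false; // safe default: keep today's behavior
>>         }
>>     }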
>>
>>
>> ------------------------------
>>
>>
>> I’ve updated the implementation and the FLIP accordingly[1]. It is quite
>> a big change. In particular, for the Kafka connector, we can now use a
>> pluggable SplitPartitioner to support different split assignment
>> strategies (e.g., default, round-robin).
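>>
>> A hypothetical shape for that partitioner (the name comes from the FLIP;
>> the signature here is only an assumption for illustration):
>>
>>     import java.util.Map;
>>
>>     @FunctionalInterface
>>     public interface SplitPartitioner<SplitT> {
>>
>>         // Choose the subtask that should own the given split, based on
>>         // the current number of splits assigned to each reader.
>>         int selectReader(SplitT split, Map<Integer, Integer> splitCountPerReader);
>>
>>         // Example strategy: always pick the least-loaded reader.
>>         static <S> SplitPartitioner<S> leastLoaded() {
>>             return (split, counts) -> counts.entrySet().stream()
>>                     .min(Map.Entry.comparingByValue())
>>                     .map(Map.Entry::getKey)
>>                     .orElse(0);
>>         }
>>     }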
>>
>>
>> Could you please review it when you have a chance?
>>
>>
>> Best,
>>
>> Hongshun
>>
>>
>> [1]
>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-537%3A+Enumerator+with+Global+Split+Assignment+Distribution+for+Balanced+Split+assignment
>>
>> On Sat, Aug 9, 2025 at 3:03 AM Becket Qin <[email protected]> wrote:
>>
>>> Hi Hongshun,
>>>
>>> I am not too concerned about the transmission cost, because the full
>>> split transmission has to happen in the initial assignment phase already.
>>> And in the future, we probably want to also introduce some kind of workload
>>> balance across source readers, e.g. based on the per-split throughput or
>>> the per-source-reader workload in heterogeneous clusters. For these
>>> reasons, we do not expect the Split objects to be huge, and we are not
>>> trying to design for huge Split objects either as they will have problems
>>> even today.
>>>
>>> Good point on the potential split loss, please see the reply below:
>>>
>>> Scenario 2:
>>>
>>>
>>>> 1. Reader A reports splits (1 and 2), and Reader B reports (3 and 4)
>>>> upon restart.
>>>> 2. Before the enumerator receives all reports and performs
>>>> reassignment, a checkpoint is triggered.
>>>> 3. Since no splits have been reassigned yet, both readers have empty
>>>> states.
>>>> 4. When restarting from this checkpoint, all four splits are lost.
>>>
>>> The reader registration happens in SourceOperator.open(), which
>>> means the task is still in the initializing state, so the checkpoint
>>> should not be triggered until the enumerator receives all the split reports.
>>>
>>> There is a nuance here. Today, the RPC call from the TM to the JM is
>>> async. So it is possible that SourceOperator.open() has returned, but
>>> the enumerator has not received the split reports. However, because the
>>> task status update RPC call goes through the same channel as the split
>>> report call, the task status RPC call will happen after the split report
>>> call on the JM side. Therefore, on the JM side, the SourceCoordinator will
>>> always first receive the split reports, then receive the checkpoint request.
>>> This "happens before" relationship is important to guarantee
>>> consistent state between the enumerator and the readers.
>>>
>>> Scenario 1:
>>>
>>>
>>>> 1. Upon restart, Reader A reports assigned splits (1 and 2), and Reader
>>>> B reports (3 and 4).
>>>> 2. The enumerator receives these reports but only reassigns splits 1
>>>> and 2 — not 3 and 4.
>>>> 3. A checkpoint or savepoint is then triggered. Only splits 1 and 2 are
>>>> recorded in the reader states; splits 3 and 4 are not persisted.
>>>> 4. If the job is later restarted from this checkpoint, splits 3 and 4
>>>> will be permanently lost.
>>>
>>> This scenario is possible. One solution is to let the enumerator
>>> implementation handle this. That means if the enumerator relies on the
>>> initial split reports from the source readers, it should maintain these
>>> reports by itself. In the above example, the enumerator will need to
>>> remember that 3 and 4 are not assigned and put them into its own state.
>>> The current contract is that anything assigned to the SourceReaders
>>> is completely owned by the SourceReaders. Enumerators can remember the
>>> assignments but cannot change them, even when the source reader recovers /
>>> restarts.
>>> With this FLIP, the contract becomes that the source readers will return
>>> the ownership of the splits to the enumerator. So the enumerator is
>>> responsible for maintaining these splits, until they are assigned to a
>>> source reader again.
>>>
>>> There are other cases where there may be conflicting information between
>>> reader and enumerator. For example, consider the following sequence:
>>> 1. reader A reports splits (1 and 2) upon restart.
>>> 2. enumerator receives the report and assigns both 1 and 2 to reader B.
>>> 3. reader A failed before checkpointing. And this is a partial failure,
>>> so only reader A restarts.
>>> 4. When reader A recovers, it will again report splits (1 and 2) to the
>>> enumerator.
>>> 5. The enumerator should ignore this report because it has
>>> assigned splits (1 and 2) to reader B.
>>>
>>> So with the new contract, the enumerator should be the source of truth
>>> for split ownership.
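>>>
>>> To make the new contract concrete, here is a sketch of the bookkeeping
>>> the enumerator would need (not actual Flink code; the names are made up
>>> for illustration):
>>>
>>>     import java.util.HashMap;
>>>     import java.util.List;
>>>     import java.util.Map;
>>>     import java.util.function.Function;
>>>
>>>     class SplitOwnershipTracker<SplitT> {
>>>         // splitId -> subtask currently owning it; absent means the
>>>         // enumerator owns the split until it is assigned again.
>>>         private final Map<String, Integer> owner = new HashMap<>();
>>>
>>>         void onSplitsReported(
>>>                 int subtaskId, List<SplitT> splits, Function<SplitT, String> idOf) {
>>>             for (SplitT split : splits) {
>>>                 String id = idOf.apply(split);
>>>                 Integer current = owner.get(id);
>>>                 if (current != null && current != subtaskId) {
>>>                     // Stale report from a restarted reader (the partial
>>>                     // failure case above): the split already belongs to
>>>                     // another reader, so the report is ignored.
>>>                     continue;
>>>                 }
>>>                 // Ownership returns to the enumerator until reassigned.
>>>                 owner.remove(id);
>>>             }
>>>         }
>>>
>>>         void onAssigned(String splitId, int subtaskId) {
>>>             owner.put(splitId, subtaskId);
>>>         }
>>>     }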
>>>
>>> Thanks,
>>>
>>> Jiangjie (Becket) Qin
>>>
>>> On Fri, Aug 8, 2025 at 12:58 AM Hongshun Wang <[email protected]>
>>> wrote:
>>>
>>>> Hi Becket,
>>>>
>>>>
>>>> I did consider this approach at the beginning (and it was also
>>>> mentioned in this FLIP), since it would allow more flexibility in
>>>> reassigning all splits. However, there are a few potential issues.
>>>>
>>>> 1. High Transmission Cost
>>>> If we pass the full split objects (rather than just split IDs), the
>>>> data size could be significant, leading to high overhead during
>>>> transmission — especially when many splits are involved.
>>>>
>>>> 2. Risk of Split Loss
>>>>
>>>> A risk of split loss exists unless we have a mechanism to ensure that a
>>>> checkpoint can only be taken after all the splits are reassigned.
>>>> There are scenarios where splits could be lost due to inconsistent
>>>> state handling during recovery:
>>>>
>>>>
>>>> Scenario 1:
>>>>
>>>>
>>>>    1. Upon restart, Reader A reports assigned splits (1 and 2), and
>>>>    Reader B reports (3 and 4).
>>>>    2. The enumerator receives these reports but only reassigns splits
>>>>    1 and 2 — not 3 and 4.
>>>>    3. A checkpoint or savepoint is then triggered. Only splits 1 and 2
>>>>    are recorded in the reader states; splits 3 and 4 are not persisted.
>>>>    4. If the job is later restarted from this checkpoint, splits 3 and
>>>>    4 will be permanently lost.
>>>>
>>>>
>>>> Scenario 2:
>>>>
>>>>    1. Reader A reports splits (1 and 2), and Reader B reports (3 and
>>>>    4) upon restart.
>>>>    2. Before the enumerator receives all reports and performs
>>>>    reassignment, a checkpoint is triggered.
>>>>    3. Since no splits have been reassigned yet, both readers have
>>>>    empty states.
>>>>    4. When restarting from this checkpoint, all four splits are lost.
>>>>
>>>>
>>>> Let me know if you have thoughts on how we might mitigate these risks!
>>>>
>>>> Best
>>>> Hongshun
>>>>
>>>> On Fri, Aug 8, 2025 at 1:46 AM Becket Qin <[email protected]> wrote:
>>>>
>>>>> Hi Hongshun,
>>>>>
>>>>> The steps sound reasonable to me in general. In terms of the updated
>>>>> FLIP wiki, it would be good to see if we can keep the protocol simple. One
>>>>> alternative way to achieve this behavior is the following:
>>>>>
>>>>> 1. Upon SourceOperator startup, the SourceOperator sends
>>>>> ReaderRegistrationEvent with the currently assigned splits to the
>>>>> enumerator. It does not add these splits to the SourceReader.
>>>>> 2. The enumerator will always use
>>>>> SplitEnumeratorContext.assignSplits() to assign the splits (not via the
>>>>> response to the ReaderRegistrationEvent; this allows async split
>>>>> assignment in case the enumerator wants to wait until all the readers
>>>>> are registered).
>>>>> 3. The SourceOperator will only call SourceReader.addSplits() when it
>>>>> receives the AddSplitEvent from the enumerator.
>>>>>
>>>>> This protocol has a few benefits:
>>>>> 1. it basically allows arbitrary split reassignment upon restart
>>>>> 2. simplicity: there is only one way to assign splits.
>>>>>
>>>>> So we only need one interface change:
>>>>> - add the initially assigned splits to ReaderInfo so the Enumerator
>>>>> can access it.
>>>>> and one behavior change:
>>>>> - The SourceOperator should stop assigning the splits restored from
>>>>> state on its own, and only do that when it receives the AddSplitEvent
>>>>> from the enumerator.
>>>>>
>>>>> The enumerator story is also simple:
>>>>> 1. Receive some kind of notification (new partition, new reader, etc)
>>>>> 2. look at the reader information (in the enumerator context or
>>>>> self-maintained state)
>>>>> 3. assign splits via the enumerator context.
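>>>>>
>>>>> For illustration, the operator-side change could look roughly like this
>>>>> (this is not the real SourceOperator; the helper types are stand-ins
>>>>> for the actual runtime classes):
>>>>>
>>>>>     import java.util.List;
>>>>>
>>>>>     class SourceOperatorSketch<SplitT> {
>>>>>
>>>>>         interface Reader<S> { void addSplits(List<S> splits); }
>>>>>
>>>>>         private final Reader<SplitT> reader;
>>>>>         private final int subtaskId;
>>>>>
>>>>>         SourceOperatorSketch(Reader<SplitT> reader, int subtaskId) {
>>>>>             this.reader = reader;
>>>>>             this.subtaskId = subtaskId;
>>>>>         }
>>>>>
>>>>>         // Step 1: report the restored splits to the enumerator; do NOT
>>>>>         // add them to the reader here.
>>>>>         void open(List<SplitT> restoredSplits) {
>>>>>             sendReaderRegistrationEvent(subtaskId, restoredSplits);
>>>>>         }
>>>>>
>>>>>         // Step 3: splits reach the reader only via an assignment from
>>>>>         // the enumerator (the AddSplitEvent).
>>>>>         void onAddSplitEvent(List<SplitT> assignedSplits) {
>>>>>             reader.addSplits(assignedSplits);
>>>>>         }
>>>>>
>>>>>         private void sendReaderRegistrationEvent(int subtask, List<SplitT> splits) {
>>>>>             // ... RPC to the SourceCoordinator ...
>>>>>         }
>>>>>     }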
>>>>>
>>>>> Thanks,
>>>>>
>>>>> Jiangjie (Becket) Qin
>>>>>
>>>>> On Thu, Aug 7, 2025 at 1:31 AM Hongshun Wang <[email protected]>
>>>>> wrote:
>>>>>
>>>>>> Hi Becket,
>>>>>> Thanks for your advice — I’ve quickly learned a lot about the
>>>>>> reader’s design principle. It’s really interesting!
>>>>>>
>>>>>> > One principle we want to follow is that the enumerator should be
>>>>>> the brain doing the splits assignment, while the source readers read from
>>>>>> the assigned splits. So we want to avoid the case where the SourceReader
>>>>>> ignores the split assignment.
>>>>>>
>>>>>> It appears that MySQL CDC currently bypasses this principle by
>>>>>> proactively removing unused splits directly in the SourceReader. This may
>>>>>> be due to the lack of built-in framework support for such cleanup, 
>>>>>> forcing
>>>>>> connectors to handle it manually. However, this responsibility ideally
>>>>>> belongs in the framework.
>>>>>>
>>>>>> With this FLIP, we propose a redesigned mechanism that centralizes
>>>>>> split cleanup logic in the SplitEnumerator, allowing connectors like 
>>>>>> MySQL
>>>>>> CDC to eventually adopt it (@leneord, CC).
>>>>>>
>>>>>> To achieve this, we must carefully manage state consistency during
>>>>>> startup and recovery. The proposed approach is as follows (a rough
>>>>>> sketch of the operator-side state machine follows the list):
>>>>>>
>>>>>>    1. Reader Registration with Deferred Assignment
>>>>>>    When a reader starts (SourceOperator#open), it sends a
>>>>>>    ReaderRegistrationEvent to the SplitEnumerator, including its
>>>>>>    previously assigned splits (restored from state). However, these 
>>>>>> splits
>>>>>>    are not yet assigned to the reader. The SourceOperator is placed
>>>>>>    in a PENDING state.
>>>>>>    2. Prevent State Pollution During Registration
>>>>>>    While in the PENDING state, SourceOperator#snapshotState will not
>>>>>>    update the operator state. This prevents empty or outdated reader 
>>>>>> state
>>>>>>    (e.g., with removed splits) from polluting the checkpoint.
>>>>>>    3. Enumerator Performs Split Cleanup and Acknowledges
>>>>>>    Upon receiving the ReaderRegistrationEvent, the SplitEnumerator 
>>>>>> removes
>>>>>>    any splits that are no longer valid (e.g., due to removed topics or 
>>>>>> tables)
>>>>>>    and returns the list of remaining valid split IDs to the reader via a
>>>>>>    ReaderRegistrationACKEvent.
>>>>>>    For backward compatibility, the default behavior is to return all
>>>>>>    split IDs (i.e., no filtering).
>>>>>>    4. Finalize Registration and Resume Normal Operation
>>>>>>    When the SourceOperator receives the ReaderRegistrationACKEvent,
>>>>>>    it assigns the confirmed splits to the reader and transitions its 
>>>>>> state to
>>>>>>    REGISTERED. From this point onward, SourceOperator#snapshotState can
>>>>>>    safely update the operator state.
>>>>>>
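>>>>>> A rough sketch of that operator-side state machine (illustrative only;
>>>>>> the names follow the steps above):
>>>>>>
>>>>>>     import java.util.List;
>>>>>>
>>>>>>     class RegistrationHandshakeSketch {
>>>>>>         enum State { PENDING, REGISTERED }
>>>>>>
>>>>>>         private State state = State.PENDING;
>>>>>>
>>>>>>         // Step 2: while PENDING, skip updating operator state so an
>>>>>>         // empty or outdated reader state cannot pollute a checkpoint.
>>>>>>         void snapshotState() {
>>>>>>             if (state == State.PENDING) {
>>>>>>                 return;
>>>>>>             }
>>>>>>             // ... write the reader's current splits into operator state ...
>>>>>>         }
>>>>>>
>>>>>>         // Step 4: on the ReaderRegistrationACKEvent, assign the
>>>>>>         // confirmed splits and resume normal snapshotting.
>>>>>>         void onRegistrationAck(List<String> validSplitIds) {
>>>>>>             // ... filter restored splits by validSplitIds, then call
>>>>>>             // reader.addSplits(...) ...
>>>>>>             state = State.REGISTERED;
>>>>>>         }
>>>>>>     }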
>>>>>>
>>>>>> Best,
>>>>>> Hongshun
>>>>>>
>>>>>> On Thu, Aug 7, 2025 at 1:57 AM Becket Qin <[email protected]>
>>>>>> wrote:
>>>>>>
>>>>>>> SourceCoordinator doesn't store splits that have already been
>>>>>>>> assigned to readers, and SplitAssignmentTracker stores the splits only 
>>>>>>>> for
>>>>>>>> this checkpoint, which will be removed after checkpoint. Maybe you mean
>>>>>>>> SourceOperator?
>>>>>>>
>>>>>>> Yes, I meant SourceOperator.
>>>>>>>
>>>>>>> At the beginning, I also thought about using it. However, there are
>>>>>>>> two situations:
>>>>>>>> 1. During restart, if source options remove a topic or table:
>>>>>>>> sometimes connectors like MySQL CDC will remove unused splits after 
>>>>>>>> restart
>>>>>>>> in MySqlSourceReader#addSplits [1]. Kafka lacks this, so if the 
>>>>>>>> configured
>>>>>>>> topics change, the removed topics' splits are still read. I also want to 
>>>>>>>> do the
>>>>>>>> same thing in Kafka.
>>>>>>>> 2. In Kafka or MySQL CDC, some bounded splits, if finished, can be
>>>>>>>> removed after restart.
>>>>>>>> In these cases, I have to get the assigned splits after
>>>>>>>> SourceReader#addSplits, rather than get them from SourceOperator
>>>>>>>> directly.
>>>>>>>
>>>>>>>
>>>>>>> One principle we want to follow is that the enumerator should be the
>>>>>>> brain doing the splits assignment, while the source readers read from 
>>>>>>> the
>>>>>>> assigned splits. So we want to avoid the case where the SourceReader
>>>>>>> ignores the split assignment. Given this principle,
>>>>>>> For case 1, if there is a subscription change, it might be better to
>>>>>>> hold back calling SourceReader.addSplits() until an assignment is 
>>>>>>> confirmed
>>>>>>> by the Enumerator. In fact, this might be a good default behavior
>>>>>>> regardless of whether there is a subscription change.
>>>>>>> For case 2: if a bounded split is finished, the
>>>>>>> SourceReader.snapshotState() will not contain that split. So upon
>>>>>>> restoration, those splits should not appear, right?
>>>>>>>
>>>>>>> Thanks,
>>>>>>>
>>>>>>> Jiangjie (Becket) Qin
>>>>>>>
>>>>>>> On Wed, Aug 6, 2025 at 5:19 AM Hongshun Wang <
>>>>>>> [email protected]> wrote:
>>>>>>>
>>>>>>>> Hi Becket,
>>>>>>>>
>>>>>>>> Thank you for your advice; it helped me a lot.
>>>>>>>> >  It seems that we don't need the method `SourceReader.
>>>>>>>> getAssignedSplits()`. The assigned splits are available in the
>>>>>>>> SourceCoordinator upon state restoration.
>>>>>>>>
>>>>>>>>  SourceCoordinator doesn't store splits that have already been
>>>>>>>> assigned to readers, and SplitAssignmentTracker stores the splits only 
>>>>>>>> for
>>>>>>>> this checkpoint, which will be removed after checkpoint. Maybe you mean
>>>>>>>> SourceOperator?
>>>>>>>>
>>>>>>>> At the beginning, I also thought about using it. However, there are
>>>>>>>> two situations:
>>>>>>>> 1. During restart, if source options remove a topic or table:
>>>>>>>> sometimes connectors like MySQL CDC will remove unused splits after 
>>>>>>>> restart
>>>>>>>> in MySqlSourceReader#addSplits [1]. Kafka lacks this, so if the 
>>>>>>>> configured
>>>>>>>> topics change, the removed topics' splits are still read. I also want to 
>>>>>>>> do the
>>>>>>>> same thing in Kafka.
>>>>>>>> 2. In Kafka or MySQL CDC, some bounded splits, if finished, can be
>>>>>>>> removed after restart.
>>>>>>>> In these cases, I have to get the assigned splits after
>>>>>>>> SourceReader#addSplits, rather than get them from SourceOperator
>>>>>>>> directly.
>>>>>>>>
>>>>>>>> >  By design, the SplitEnumerator can get the reader information
>>>>>>>> any time from the `SplitEnumeratorContext.registeredReaders()`.
>>>>>>>> It looks good.
>>>>>>>>
>>>>>>>> Thanks again.
>>>>>>>>
>>>>>>>> Best,
>>>>>>>> Hongshun
>>>>>>>>
>>>>>>>>
>>>>>>>> [1]
>>>>>>>> https://github.com/apache/flink-cdc/blob/42f91a864e329c00959828fe0ca4f1e9e8e1de75/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlSourceReader.java#L238
>>>>>>>>
>>>>>>>>
>>>>>>>> On Tue, Aug 5, 2025 at 2:35 PM Becket Qin <[email protected]>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Hi Hongshun,
>>>>>>>>>
>>>>>>>>> Thanks for the proposal. The current Kafka split assignment
>>>>>>>>> algorithm does seem to have issues. (I cannot recall why it was 
>>>>>>>>> implemented
>>>>>>>>> this way at that time...).
>>>>>>>>>
>>>>>>>>> Two quick comments:
>>>>>>>>> 1. It seems that we don't need the method `SourceReader.
>>>>>>>>> getAssignedSplits()`. The assigned splits are available in the
>>>>>>>>> SourceCoordinator upon state restoration and can be put into the
>>>>>>>>> ReaderRegistrationEvent.
>>>>>>>>> 2. Instead of adding the method `SplitEnumerator.addReader(int
>>>>>>>>> subtaskId, List<SplitT> assignedSplits)`, add a new field of
>>>>>>>>> `InitialSplitAssignment` to the ReaderInfo. By design, the 
>>>>>>>>> SplitEnumerator
>>>>>>>>> can get the reader information any time from the
>>>>>>>>> `SplitEnumeratorContext.registeredReaders()`. This also avoids the
>>>>>>>>> Enumerator implementation having to remember the initially assigned
>>>>>>>>> splits if it wants to wait until all the readers are registered, and
>>>>>>>>> it allows future additions of reader information.
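>>>>>>>>>
>>>>>>>>> For illustration, the ReaderInfo change might look roughly like this
>>>>>>>>> (the field name and type are assumptions, not the final API):
>>>>>>>>>
>>>>>>>>>     import java.util.Collections;
>>>>>>>>>     import java.util.List;
>>>>>>>>>
>>>>>>>>>     public final class ReaderInfo {
>>>>>>>>>         private final int subtaskId;
>>>>>>>>>         private final String location;
>>>>>>>>>         // Proposed new field: the splits the reader restored from
>>>>>>>>>         // its state, reported at registration time.
>>>>>>>>>         private final List<String> initialSplitAssignment;
>>>>>>>>>
>>>>>>>>>         public ReaderInfo(int subtaskId, String location, List<String> splitIds) {
>>>>>>>>>             this.subtaskId = subtaskId;
>>>>>>>>>             this.location = location;
>>>>>>>>>             this.initialSplitAssignment = Collections.unmodifiableList(splitIds);
>>>>>>>>>         }
>>>>>>>>>
>>>>>>>>>         public int getSubtaskId() {
>>>>>>>>>             return subtaskId;
>>>>>>>>>         }
>>>>>>>>>
>>>>>>>>>         public String getLocation() {
>>>>>>>>>             return location;
>>>>>>>>>         }
>>>>>>>>>
>>>>>>>>>         public List<String> getInitialSplitAssignment() {
>>>>>>>>>             return initialSplitAssignment;
>>>>>>>>>         }
>>>>>>>>>     }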
>>>>>>>>>
>>>>>>>>> Thanks,
>>>>>>>>>
>>>>>>>>> Jiangjie (Becket) Qin
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Mon, Aug 4, 2025 at 8:39 PM Hongshun Wang <
>>>>>>>>> [email protected]> wrote:
>>>>>>>>>
>>>>>>>>>> Could anyone familiar with the Kafka connector help review this
>>>>>>>>>> FLIP? I am looking forward to your reply.
>>>>>>>>>>
>>>>>>>>>> Best
>>>>>>>>>> Hongshun
>>>>>>>>>>
>>>>>>>>>> On Thu, Jul 24, 2025 at 8:13 PM Leonard Xu <[email protected]>
>>>>>>>>>> wrote:
>>>>>>>>>>
>>>>>>>>>>> Thanks Hongshun for driving this work.
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> We are also suffering from this issue in production Kafka restoration
>>>>>>>>>>> usage. The current design is a nice tradeoff and has considered the new
>>>>>>>>>>> Source implementation details. +1 from my side.
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> Best,
>>>>>>>>>>> Leonard
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> > On Jul 19, 2025 at 18:59, Hongshun Wang <[email protected]> wrote:
>>>>>>>>>>> >
>>>>>>>>>>> > Hi devs,
>>>>>>>>>>> >
>>>>>>>>>>> > I'd like to initiate a discussion about [FLIP-537: Enumerator
>>>>>>>>>>> with Global
>>>>>>>>>>> > Split Assignment Distribution for Balanced Split Assignment]
>>>>>>>>>>> [1], which
>>>>>>>>>>> > addresses critical limitations in our current Kafka connector
>>>>>>>>>>> split
>>>>>>>>>>> > distribution mechanism.
>>>>>>>>>>> >
>>>>>>>>>>> > As documented in [FLINK-31762] [2], several scenarios
>>>>>>>>>>> currently lead to
>>>>>>>>>>> > uneven Kafka split distribution, causing reader delays and
>>>>>>>>>>> performance
>>>>>>>>>>> > bottlenecks. The core issue stems from the enumerator's lack
>>>>>>>>>>> of visibility
>>>>>>>>>>> > into post-assignment split distribution.
>>>>>>>>>>> >
>>>>>>>>>>> > This FLIP does two things:
>>>>>>>>>>> > 1. ReaderRegistrationEvent Enhancement: SourceOperator should
>>>>>>>>>>> send
>>>>>>>>>>> > ReaderRegistrationEvent with assigned splits metadata after
>>>>>>>>>>> startup to
>>>>>>>>>>> > ensure state consistency.
>>>>>>>>>>> > 2. Implementation in the Kafka connector to resolve imbalanced
>>>>>>>>>>> splits and
>>>>>>>>>>> > state awareness during recovery (the enumerator will always
>>>>>>>>>>> choose the
>>>>>>>>>>> > least-assigned subtask, for the reasons given below)
>>>>>>>>>>> >
>>>>>>>>>>> > Any additional questions regarding this FLIP? Looking forward
>>>>>>>>>>> to hearing
>>>>>>>>>>> > from you.
>>>>>>>>>>> >
>>>>>>>>>>> > Best
>>>>>>>>>>> > Hongshun
>>>>>>>>>>> >
>>>>>>>>>>> >
>>>>>>>>>>> > [1]
>>>>>>>>>>> >
>>>>>>>>>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-537%3A+Enumerator+with+Global+Split+Assignment+Distribution+for+Balanced+Split+assignment
>>>>>>>>>>> > [2] https://issues.apache.org/jira/browse/FLINK-31762
>>>>>>>>>>>
>>>>>>>>>>>
>>
