Hi Hongshun,

I think the convention for such optional features in Source is via mix-in
interfaces. So instead of adding a method to the SourceReader, maybe we
should introduce an interface SupportSplitReassignmentOnRecovery with this
method. If a Source implementation implements that interface, then the
SourceOperator will check the desired behavior and act accordingly.
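
A minimal sketch of what I mean, not the actual Flink code. The Source
stand-in, RecoveryPlan, planRecovery and the two example sources are
hypothetical names made up for illustration; only the mix-in name and the
shouldReassignSplitsOnRecovery() method come from this thread. Splits are
plain strings here to keep the example self-contained.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

final class SplitRecoverySketch {

    /** Hypothetical stand-in for Flink's Source; the real interface has more methods. */
    interface Source {}

    /** Mix-in discussed in this thread; the method name comes from Hongshun's proposal. */
    interface SupportSplitReassignmentOnRecovery {
        boolean shouldReassignSplitsOnRecovery();
    }

    /** A source that keeps today's behavior simply by not implementing the mix-in. */
    static final class LegacySource implements Source {}

    /** A source that opts in to handing restored splits back to the enumerator. */
    static final class RebalancingSource implements Source, SupportSplitReassignmentOnRecovery {
        @Override
        public boolean shouldReassignSplitsOnRecovery() {
            return true;
        }
    }

    /** Hypothetical result type: where the restored splits go on recovery. */
    static final class RecoveryPlan {
        final List<String> addToReader;
        final List<String> reportToEnumerator;

        RecoveryPlan(List<String> addToReader, List<String> reportToEnumerator) {
            this.addToReader = addToReader;
            this.reportToEnumerator = reportToEnumerator;
        }
    }

    /** The check an operator could run once, when it restores its state. */
    static RecoveryPlan planRecovery(Source source, List<String> restoredSplits) {
        boolean reassign =
                source instanceof SupportSplitReassignmentOnRecovery
                        && ((SupportSplitReassignmentOnRecovery) source)
                                .shouldReassignSplitsOnRecovery();
        return reassign
                // Opted in: give the splits back to the enumerator, add nothing locally.
                ? new RecoveryPlan(Collections.<String>emptyList(), restoredSplits)
                // Not opted in: keep the current behavior and add the splits to the reader.
                : new RecoveryPlan(restoredSplits, Collections.<String>emptyList());
    }

    public static void main(String[] args) {
        List<String> restored = Arrays.asList("split-1", "split-2");
        RecoveryPlan legacy = planRecovery(new LegacySource(), restored);
        RecoveryPlan rebalancing = planRecovery(new RebalancingSource(), restored);
        System.out.println("legacy adds to reader: " + legacy.addToReader);
        System.out.println("rebalancing reports to enumerator: " + rebalancing.reportToEnumerator);
    }
}
```

With this shape, existing sources compile and behave as before without any
change, and the SourceReader interface itself stays untouched.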

Thanks,

Jiangjie (Becket) Qin

On Wed, Aug 20, 2025 at 8:52 PM Hongshun Wang <loserwang1...@gmail.com>
wrote:

> Hi devs,
>
> Would anyone like to discuss this FLIP? I'd appreciate your feedback and
> suggestions.
>
> Best,
> Hongshun
>
>
> On Aug 13, 2025, at 14:23, Hongshun Wang <loserwang1...@gmail.com> wrote:
>
> Hi Becket,
>
> Thank you for your detailed feedback. The new contract makes good sense to
> me and effectively addresses the issues I encountered at the beginning of
> the design.
>
> That said, I recommend not reporting splits by default, primarily for
> compatibility and practical reasons:
>
> >  For these reasons, we do not expect the Split objects to be huge, and
> we are not trying to design for huge Split objects either as they will have
> problems even today.
>
>    1. Not all existing connectors match this rule.
>    For example, in the MySQL CDC connector, a binlog split may contain
>    hundreds (or even more) snapshot split completion records. This state is
>    large and is currently transmitted incrementally through multiple
>    BinlogSplitMetaEvent messages. Since the binlog reader operates with single
>    parallelism, reporting the full split state on recovery could be
>    inefficient or even infeasible.
>    For such sources, it would be better to provide a mechanism to skip
>    split reporting during restart until they are redesigned to reduce the
>    split size.
>    2. Not all enumerators maintain unassigned splits in state.
>    Some SplitEnumerator implementations (such as the Kafka connector's) do
>    not track or persistently manage unassigned splits. Requiring them to
>    handle re-registration would add unnecessary complexity. Even if we
>    implement this in the Kafka connector, the connector is currently
>    decoupled from the Flink version, so we also need to make sure older
>    versions remain compatible.
>
> ------------------------------
>
> To address these concerns, I propose introducing a new method: boolean
> SourceReader#shouldReassignSplitsOnRecovery() with a default
> implementation returning false. This allows source readers to opt in to
> split reassignment only when necessary. Since the new contract already
> places the responsibility for split assignment on the enumerator, not
> reporting splits by default is a safe and clean default behavior.
>
>
> ------------------------------
>
>
> I’ve updated the implementation and the FLIP accordingly [1]. It is quite a big
> change. In particular, for the Kafka connector, we can now use a pluggable
> SplitPartitioner to support different split assignment strategies (e.g.,
> default, round-robin).
>
>
> Could you please review it when you have a chance?
>
>
> Best,
>
> Hongshun
>
>
> [1]
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-537%3A+Enumerator+with+Global+Split+Assignment+Distribution+for+Balanced+Split+assignment
>
> On Sat, Aug 9, 2025 at 3:03 AM Becket Qin <becket....@gmail.com> wrote:
>
>> Hi Hongshun,
>>
>> I am not too concerned about the transmission cost. Because the full
>> split transmission has to happen in the initial assignment phase already.
>> And in the future, we probably want to also introduce some kind of workload
>> balance across source readers, e.g. based on the per-split throughput or
>> the per-source-reader workload in heterogeneous clusters. For these
>> reasons, we do not expect the Split objects to be huge, and we are not
>> trying to design for huge Split objects either as they will have problems
>> even today.
>>
>> Good point on the potential split loss, please see the reply below:
>>
>> Scenario 2:
>>
>>
>>> 1. Reader A reports splits (1 and 2), and Reader B reports (3 and 4)
>>> upon restart.
>>> 2. Before the enumerator receives all reports and performs reassignment,
>>> a checkpoint is triggered.
>>> 3. Since no splits have been reassigned yet, both readers have empty
>>> states.
>>> 4. When restarting from this checkpoint, all four splits are lost.
>>
>> The reader registration happens in the SourceOperator.open(), which means
>> the task is still in the initializing state, therefore the checkpoint
>> should not be triggered until the enumerator receives all the split reports.
>>
>> There is a nuance here. Today, the RPC call from the TM to the JM is
>> async. So it is possible that SourceOperator.open() has returned, but
>> the enumerator has not yet received the split reports. However, the task
>> status update RPC call goes through the same channel as the split reports
>> call, so the task status RPC call will happen after the split reports call
>> on the JM side. Therefore, on the JM side, the SourceCoordinator will
>> always receive the split reports first, and only then the checkpoint request.
>> This "happens-before" relationship is important for guaranteeing
>> consistent state between the enumerator and the readers.
>>
>> Scenario 1:
>>
>>
>>> 1. Upon restart, Reader A reports assigned splits (1 and 2), and Reader
>>> B reports (3 and 4).
>>> 2. The enumerator receives these reports but only reassigns splits 1 and
>>> 2 — not 3 and 4.
>>> 3. A checkpoint or savepoint is then triggered. Only splits 1 and 2 are
>>> recorded in the reader states; splits 3 and 4 are not persisted.
>>> 4. If the job is later restarted from this checkpoint, splits 3 and 4
>>> will be permanently lost.
>>
>> This scenario is possible. One solution is to let the enumerator
>> implementation handle this. That means if the enumerator relies on the
>> initial split reports from the source readers, it should maintain these
>> reports by itself. In the above example, the enumerator will need to
>> remember that 3 and 4 are not assigned and put them into its own state.
>> The current contract is that anything assigned to the SourceReaders
>> is completely owned by the SourceReaders. Enumerators can remember the
>> assignments but cannot change them, even when the source reader recovers /
>> restarts.
>> With this FLIP, the contract becomes that the source readers will return
>> the ownership of the splits to the enumerator. So the enumerator is
>> responsible for maintaining these splits, until they are assigned to a
>> source reader again.
>>
>> There are other cases where there may be conflicting information between
>> reader and enumerator. For example, consider the following sequence:
>> 1. reader A reports splits (1 and 2) upon restart.
>> 2. enumerator receives the report and assigns both 1 and 2 to reader B.
>> 3. reader A fails before checkpointing. And this is a partial failure,
>> so only reader A restarts.
>> 4. When reader A recovers, it will again report splits (1 and 2) to the
>> enumerator.
>> 5. The enumerator should ignore this report because it has
>> assigned splits (1 and 2) to reader B.
>>
>> So with the new contract, the enumerator should be the source of truth
>> for split ownership.
>>
>> Thanks,
>>
>> Jiangjie (Becket) Qin
>>
>> On Fri, Aug 8, 2025 at 12:58 AM Hongshun Wang <loserwang1...@gmail.com>
>> wrote:
>>
>>> Hi Becket,
>>>
>>>
>>> I did consider this approach at the beginning (and it was also mentioned
>>> in this FLIP), since it would allow more flexibility in reassigning all
>>> splits. However, there are a few potential issues.
>>>
>>> 1. High Transmission Cost
>>> If we pass the full split objects (rather than just split IDs), the data
>>> size could be significant, leading to high overhead during transmission —
>>> especially when many splits are involved.
>>>
>>> 2. Risk of Split Loss
>>>
>>> Risk of split loss exists unless we have a mechanism to make sure that a
>>> checkpoint can only be taken after all the splits are reassigned.
>>> There are scenarios where splits could be lost due to inconsistent state
>>> handling during recovery:
>>>
>>>
>>> Scenario 1:
>>>
>>>
>>>    1. Upon restart, Reader A reports assigned splits (1 and 2), and
>>>    Reader B reports (3 and 4).
>>>    2. The enumerator receives these reports but only reassigns splits 1
>>>    and 2 — not 3 and 4.
>>>    3. A checkpoint or savepoint is then triggered. Only splits 1 and 2
>>>    are recorded in the reader states; splits 3 and 4 are not persisted.
>>>    4. If the job is later restarted from this checkpoint, splits 3 and
>>>    4 will be permanently lost.
>>>
>>>
>>> Scenario 2:
>>>
>>>    1. Reader A reports splits (1 and 2), and Reader B reports (3 and 4)
>>>    upon restart.
>>>    2. Before the enumerator receives all reports and performs
>>>    reassignment, a checkpoint is triggered.
>>>    3. Since no splits have been reassigned yet, both readers have empty
>>>    states.
>>>    4. When restarting from this checkpoint, all four splits are lost.
>>>
>>>
>>> Let me know if you have thoughts on how we might mitigate these risks!
>>>
>>> Best
>>> Hongshun
>>>
>>> On Fri, Aug 8, 2025 at 1:46 AM Becket Qin <becket....@gmail.com> wrote:
>>>
>>>> Hi Hongshun,
>>>>
>>>> The steps sound reasonable to me in general. In terms of the updated
>>>> FLIP wiki, it would be good to see if we can keep the protocol simple. One
>>>> alternative way to achieve this behavior is the following:
>>>>
>>>> 1. Upon SourceOperator startup, the SourceOperator sends
>>>> ReaderRegistrationEvent with the currently assigned splits to the
>>>> enumerator. It does not add these splits to the SourceReader.
>>>> 2. The enumerator will always use
>>>> SplitEnumeratorContext.assignSplits() to assign the splits (not via the
>>>> response to the ReaderRegistrationEvent; this allows async split assignment
>>>> in case the enumerator wants to wait until all the readers are registered).
>>>> 3. The SourceOperator will only call SourceReader.addSplits() when it
>>>> receives the AddSplitEvent from the enumerator.
>>>>
>>>> This protocol has a few benefits:
>>>> 1. it basically allows arbitrary split reassignment upon restart
>>>> 2. simplicity: there is only one way to assign splits.
>>>>
>>>> So we only need one interface change:
>>>> - add the initially assigned splits to ReaderInfo so the Enumerator can
>>>> access it.
>>>> and one behavior change:
>>>> - The SourceOperator should stop assigning splits to the SourceReader from
>>>> state restoration, and only do so when it receives an AddSplitEvent from
>>>> the enumerator.
>>>>
>>>> The enumerator story is also simple:
>>>> 1. Receive some kind of notification (new partition, new reader, etc)
>>>> 2. look at the reader information (in the enumerator context or
>>>> self-maintained state)
>>>> 3. assign splits via the enumerator context.
>>>>
>>>> Thanks,
>>>>
>>>> Jiangjie (Becket) Qin
>>>>
>>>> On Thu, Aug 7, 2025 at 1:31 AM Hongshun Wang <loserwang1...@gmail.com>
>>>> wrote:
>>>>
>>>>> Hi Becket,
>>>>> Thanks for your advice — I’ve quickly learned a lot about the reader’s
>>>>> design principle. It’s really interesting!
>>>>>
>>>>> > One principle we want to follow is that the enumerator should be the
>>>>> brain doing the splits assignment, while the source readers read from the
>>>>> assigned splits. So we want to avoid the case where the SourceReader
>>>>> ignores the split assignment.
>>>>>
>>>>> It appears that MySQL CDC currently bypasses this principle by
>>>>> proactively removing unused splits directly in the SourceReader. This may
>>>>> be due to the lack of built-in framework support for such cleanup, forcing
>>>>> connectors to handle it manually. However, this responsibility ideally
>>>>> belongs in the framework.
>>>>>
>>>>> With this FLIP, we propose a redesigned mechanism that centralizes
>>>>> split cleanup logic in the SplitEnumerator, allowing connectors like MySQL
>>>>> CDC to eventually adopt it (@Leonard, CC).
>>>>>
>>>>> To achieve this, we must carefully manage state consistency during
>>>>> startup and recovery. The proposed approach is as follows:
>>>>>
>>>>>    1. Reader Registration with Deferred Assignment
>>>>>    When a reader starts (SourceOperator#open), it sends a
>>>>>    ReaderRegistrationEvent to the SplitEnumerator, including its
>>>>>    previously assigned splits (restored from state). However, these splits
>>>>>    are not yet assigned to the reader. The SourceOperator is placed
>>>>>    in a PENDING state.
>>>>>    2. Prevent State Pollution During Registration
>>>>>    While in the PENDING state, SourceOperator#snapshotState will not
>>>>>    update the operator state. This prevents empty or outdated reader state
>>>>>    (e.g., with removed splits) from polluting the checkpoint.
>>>>>    3. Enumerator Performs Split Cleanup and Acknowledges
>>>>>    Upon receiving the ReaderRegistrationEvent, the SplitEnumerator removes
>>>>>    any splits that are no longer valid (e.g., due to removed topics or
>>>>>    tables) and returns the list of remaining valid split IDs to the
>>>>>    reader via a ReaderRegistrationACKEvent.
>>>>>    For backward compatibility, the default behavior is to return all
>>>>>    split IDs (i.e., no filtering).
>>>>>    4. Finalize Registration and Resume Normal Operation
>>>>>    When the SourceOperator receives the ReaderRegistrationACKEvent,
>>>>>    it assigns the confirmed splits to the reader and transitions its
>>>>>    state to REGISTERED. From this point onward,
>>>>>    SourceOperator#snapshotState can safely update the operator state.
>>>>>
>>>>>
>>>>> Best,
>>>>> Hongshun
>>>>>
>>>>> On Thu, Aug 7, 2025 at 1:57 AM Becket Qin <becket....@gmail.com>
>>>>> wrote:
>>>>>
>>>>>>> SourceCoordinator doesn't store splits that have already been
>>>>>>> assigned to readers, and SplitAssignmentTracker stores the splits only
>>>>>>> for this checkpoint, which will be removed after checkpoint. Maybe you
>>>>>>> mean SourceOperator?
>>>>>>
>>>>>> Yes, I meant SourceOperator.
>>>>>>
>>>>>>> At the beginning, I also thought about using it. However, there are
>>>>>>> two situations:
>>>>>>> 1. During restart, if source options remove a topic or table:
>>>>>>> sometimes connectors like MySQL CDC will remove unused splits after
>>>>>>> restart in MySqlSourceReader#addSplits [1]. Kafka lacks this, so if the
>>>>>>> configured topics change, removed topic's splits are still read. I also
>>>>>>> want to do the same thing in Kafka.
>>>>>>> 2. In Kafka or MySQL CDC, some bounded splits, if finished, can be
>>>>>>> removed after restart.
>>>>>>> In these cases, I have to get the assigned splits after
>>>>>>> SourceReader#addSplits, rather than get them from SourceOperator
>>>>>>> directly.
>>>>>>
>>>>>>
>>>>>> One principle we want to follow is that the enumerator should be the
>>>>>> brain doing the splits assignment, while the source readers read from the
>>>>>> assigned splits. So we want to avoid the case where the SourceReader
>>>>>> ignores the split assignment. Given this principle,
>>>>>> For case 1, if there is a subscription change, it might be better to
>>>>>> hold back calling SourceReader.addSplits() until an assignment is
>>>>>> confirmed by the Enumerator. In fact, this might be a good default
>>>>>> behavior regardless of whether there is a subscription change.
>>>>>> For case 2: if a bounded split is finished, the
>>>>>> SourceReader.snapshotState() will not contain that split. So upon
>>>>>> restoration, those splits should not appear, right?
>>>>>>
>>>>>> Thanks,
>>>>>>
>>>>>> Jiangjie (Becket) Qin
>>>>>>
>>>>>> On Wed, Aug 6, 2025 at 5:19 AM Hongshun Wang <loserwang1...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Hi Becket,
>>>>>>>
>>>>>>> Thank you a lot for your advice, which helped me a lot.
>>>>>>> >  It seems that we don't need the method `SourceReader.
>>>>>>> getAssignedSplits()`. The assigned splits are available in the
>>>>>>> SourceCoordinator upon state restoration.
>>>>>>>
>>>>>>> SourceCoordinator doesn't store splits that have already been
>>>>>>> assigned to readers, and SplitAssignmentTracker stores the splits only
>>>>>>> for this checkpoint, which will be removed after checkpoint. Maybe you
>>>>>>> mean SourceOperator?
>>>>>>>
>>>>>>> At the beginning, I also thought about using it. However, there are
>>>>>>> two situations:
>>>>>>> 1. During restart, if source options remove a topic or table:
>>>>>>> sometimes connectors like MySQL CDC will remove unused splits after
>>>>>>> restart in MySqlSourceReader#addSplits [1]. Kafka lacks this, so if the
>>>>>>> configured topics change, removed topic's splits are still read. I also
>>>>>>> want to do the same thing in Kafka.
>>>>>>> 2. In Kafka or MySQL CDC, some bounded splits, if finished, can be
>>>>>>> removed after restart.
>>>>>>> In these cases, I have to get the assigned splits after
>>>>>>> SourceReader#addSplits, rather than get them from SourceOperator
>>>>>>> directly.
>>>>>>>
>>>>>>> >  By design, the SplitEnumerator can get the reader information any
>>>>>>> time from the `SplitEnumeratorContext.registeredReaders()`.
>>>>>>> It looks good.
>>>>>>>
>>>>>>> Thanks again.
>>>>>>>
>>>>>>> Best,
>>>>>>> Hongshun
>>>>>>>
>>>>>>>
>>>>>>> [1]
>>>>>>> https://github.com/apache/flink-cdc/blob/42f91a864e329c00959828fe0ca4f1e9e8e1de75/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlSourceReader.java#L238
>>>>>>>
>>>>>>>
>>>>>>> On Tue, Aug 5, 2025 at 2:35 PM Becket Qin <becket....@gmail.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Hi Hongshun,
>>>>>>>>
>>>>>>>> Thanks for the proposal. The current Kafka split assignment
>>>>>>>> algorithm does seem to have issues. (I cannot recall why it was
>>>>>>>> implemented this way at that time...).
>>>>>>>>
>>>>>>>> Two quick comments:
>>>>>>>> 1. It seems that we don't need the method `SourceReader.
>>>>>>>> getAssignedSplits()`. The assigned splits are available in the
>>>>>>>> SourceCoordinator upon state restoration and can be put into the
>>>>>>>> ReaderRegistrationEvent.
>>>>>>>> 2. Instead of adding the method `SplitEnumerator.addReader(int
>>>>>>>> subtaskId, List<SplitT> assignedSplits)`, add a new field of
>>>>>>>> `InitialSplitAssignment` to the ReaderInfo. By design, the
>>>>>>>> SplitEnumerator can get the reader information any time from the
>>>>>>>> `SplitEnumeratorContext.registeredReaders()`. This also saves the
>>>>>>>> Enumerator implementation from having to remember the initially
>>>>>>>> assigned splits if it wants to wait until all the readers are
>>>>>>>> registered. It also allows future addition of reader information.
>>>>>>>>
>>>>>>>> Thanks,
>>>>>>>>
>>>>>>>> Jiangjie (Becket) Qin
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> On Mon, Aug 4, 2025 at 8:39 PM Hongshun Wang <
>>>>>>>> loserwang1...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> Could anyone familiar with the Kafka connector help review this FLIP?
>>>>>>>>> I am looking forward to your reply.
>>>>>>>>>
>>>>>>>>> Best
>>>>>>>>> Hongshun
>>>>>>>>>
>>>>>>>>> On Thu, Jul 24, 2025 at 8:13 PM Leonard Xu <xbjt...@gmail.com>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> Thanks Hongshun for driving this work.
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> We are also suffering from this issue when restoring Kafka jobs in
>>>>>>>>>> production. The current design is a nice tradeoff and has considered
>>>>>>>>>> the new Source implementation details, +1 from my side.
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> Best,
>>>>>>>>>> Leonard
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> > On Jul 19, 2025, at 18:59, Hongshun Wang <loserwang1...@gmail.com> wrote:
>>>>>>>>>> >
>>>>>>>>>> > Hi devs,
>>>>>>>>>> >
>>>>>>>>>> > I'd like to initiate a discussion about [FLIP-537: Enumerator with
>>>>>>>>>> > Global Split Assignment Distribution for Balanced Split Assignment]
>>>>>>>>>> > [1], which addresses critical limitations in our current Kafka
>>>>>>>>>> > connector split distribution mechanism.
>>>>>>>>>> >
>>>>>>>>>> > As documented in [FLINK-31762] [2], several scenarios currently lead
>>>>>>>>>> > to uneven Kafka split distribution, causing reader delays and
>>>>>>>>>> > performance bottlenecks. The core issue stems from the enumerator's
>>>>>>>>>> > lack of visibility into post-assignment split distribution.
>>>>>>>>>> >
>>>>>>>>>> > This FLIP does two things:
>>>>>>>>>> > 1. ReaderRegistrationEvent Enhancement: SourceOperator should send
>>>>>>>>>> > ReaderRegistrationEvent with assigned splits metadata after startup
>>>>>>>>>> > to ensure state consistency.
>>>>>>>>>> > 2. Implementation in the Kafka connector to resolve imbalanced splits
>>>>>>>>>> > and state awareness during recovery (the enumerator will always
>>>>>>>>>> > choose the least assigned subtask, and the reason is also as follows)
>>>>>>>>>> >
>>>>>>>>>> > Any additional questions regarding this FLIP? Looking forward to
>>>>>>>>>> > hearing from you.
>>>>>>>>>> >
>>>>>>>>>> > Best
>>>>>>>>>> > Hongshun
>>>>>>>>>> >
>>>>>>>>>> >
>>>>>>>>>> > [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-537%3A+Enumerator+with+Global+Split+Assignment+Distribution+for+Balanced+Split+assignment
>>>>>>>>>> > [2] https://issues.apache.org/jira/browse/FLINK-31762
>>>>>>>>>>
>>>>>>>>>>
>
