Hi Becket,

I think that's a great idea! I have added the SupportSplitReassignmentOnRecovery interface in this FLIP. If a Source implements this interface, it indicates that the source operator needs to report its splits to the enumerator and receive a reassignment. [1]
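For readers following along, here is a minimal sketch of how I picture the mix-in contract (the event payload follows the FLIP; the surrounding variable and method names are illustrative only):

    // Marker-style mix-in: a Source implements this to opt in to reporting
    // its restored splits back to the enumerator on recovery.
    public interface SupportSplitReassignmentOnRecovery {
    }

    // Sketch of the framework-side check during SourceOperator#open():
    private void registerReader(List<SplitT> restoredSplits) {
        if (source instanceof SupportSplitReassignmentOnRecovery) {
            // Report restored splits and wait for the enumerator's
            // reassignment, instead of adding them to the reader directly.
            operatorEventGateway.sendEventToCoordinator(
                    new ReaderRegistrationEvent(subtaskId, restoredSplits));
        } else {
            sourceReader.addSplits(restoredSplits); // existing behavior
        }
    }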
Best,
Hongshun

[1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-537%3A+Enumerator+with+Global+Split+Assignment+Distribution+for+Balanced+Split+assignment

On Thu, Aug 21, 2025 at 12:09 PM Becket Qin <[email protected]> wrote:

> Hi Hongshun,
>
> I think the convention for such optional features in Source is via mix-in
> interfaces. So instead of adding a method to the SourceReader, maybe we
> should introduce an interface SupportSplitReassignmentOnRecovery with this
> method. If a Source implementation implements that interface, then the
> SourceOperator will check the desired behavior and act accordingly.
>
> Thanks,
>
> Jiangjie (Becket) Qin
>
> On Wed, Aug 20, 2025 at 8:52 PM Hongshun Wang <[email protected]>
> wrote:
>
>> Hi devs,
>>
>> Would anyone like to discuss this FLIP? I'd appreciate your feedback and
>> suggestions.
>>
>> Best,
>> Hongshun
>>
>> On Aug 13, 2025, at 14:23, Hongshun Wang <[email protected]> wrote:
>>
>> Hi Becket,
>>
>> Thank you for your detailed feedback. The new contract makes good sense
>> to me and effectively addresses the issues I encountered at the beginning
>> of the design.
>>
>> That said, I recommend not reporting splits by default, primarily for
>> compatibility and practical reasons:
>>
>> > For these reasons, we do not expect the Split objects to be huge, and
>> > we are not trying to design for huge Split objects either as they will
>> > have problems even today.
>>
>> 1. Not all existing connectors match this rule.
>> For example, in the MySQL CDC connector, a binlog split may contain
>> hundreds (or even more) snapshot split completion records. This state is
>> large and is currently transmitted incrementally through multiple
>> BinlogSplitMetaEvent messages. Since the binlog reader operates with
>> single parallelism, reporting the full split state on recovery could be
>> inefficient or even infeasible. For such sources, it would be better to
>> provide a mechanism to skip split reporting during restart until they
>> redesign and reduce the split size.
>>
>> 2. Not all enumerators maintain unassigned splits in state.
>> Some SplitEnumerator implementations (such as the Kafka connector's) do
>> not track or persistently manage unassigned splits. Requiring them to
>> handle re-registration would add unnecessary complexity. Even if we
>> implement this in the Kafka connector, the connector is currently
>> decoupled from the Flink version, so we also need to make sure older
>> versions remain compatible.
>>
>> ------------------------------
>>
>> To address these concerns, I propose introducing a new method: boolean
>> SourceReader#shouldReassignSplitsOnRecovery(), with a default
>> implementation returning false. This allows source readers to opt in to
>> split reassignment only when necessary. Since the new contract already
>> places the responsibility for split assignment on the enumerator, not
>> reporting splits by default is a safe and clean default behavior.
>>
>> ------------------------------
>>
>> I've updated the implementation and the FLIP accordingly [1]. It's quite
>> a big change. In particular, for the Kafka connector, we can now use a
>> pluggable SplitPartitioner to support different split assignment
>> strategies (e.g., default, round-robin).
>>
>> Could you please review it when you have a chance?
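>>
>> For illustration, the partitioner plug-in could look roughly like this
>> (the interface and class names here are a sketch, not the final API in
>> the FLIP):
>>
>> import java.util.Map;
>> import org.apache.flink.connector.kafka.source.split.KafkaPartitionSplit;
>>
>> public interface SplitPartitioner {
>>     // Picks the owning subtask for a split, given the per-subtask split
>>     // counts that the enumerator now tracks globally.
>>     int assign(KafkaPartitionSplit split,
>>                Map<Integer, Integer> splitCountPerSubtask);
>> }
>>
>> // Example strategy: always pick the subtask with the fewest splits.
>> class LeastAssignedSplitPartitioner implements SplitPartitioner {
>>     @Override
>>     public int assign(KafkaPartitionSplit split,
>>                       Map<Integer, Integer> splitCountPerSubtask) {
>>         return splitCountPerSubtask.entrySet().stream()
>>                 .min(Map.Entry.comparingByValue())
>>                 .map(Map.Entry::getKey)
>>                 .orElse(0);
>>     }
>> }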
>>
>> Best,
>>
>> Hongshun
>>
>> [1]
>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-537%3A+Enumerator+with+Global+Split+Assignment+Distribution+for+Balanced+Split+assignment
>>
>> On Sat, Aug 9, 2025 at 3:03 AM Becket Qin <[email protected]> wrote:
>>
>>> Hi Hongshun,
>>>
>>> I am not too concerned about the transmission cost, because the full
>>> split transmission already has to happen in the initial assignment
>>> phase. And in the future, we probably want to also introduce some kind
>>> of workload balance across source readers, e.g. based on the per-split
>>> throughput or the per-source-reader workload in heterogeneous clusters.
>>> For these reasons, we do not expect the Split objects to be huge, and we
>>> are not trying to design for huge Split objects either, as they will
>>> have problems even today.
>>>
>>> Good point on the potential split loss, please see the reply below:
>>>
>>> Scenario 2:
>>>
>>>> 1. Reader A reports splits (1 and 2), and Reader B reports (3 and 4)
>>>> upon restart.
>>>> 2. Before the enumerator receives all reports and performs
>>>> reassignment, a checkpoint is triggered.
>>>> 3. Since no splits have been reassigned yet, both readers have empty
>>>> states.
>>>> 4. When restarting from this checkpoint, all four splits are lost.
>>>
>>> The reader registration happens in SourceOperator.open(), which means
>>> the task is still in the initializing state; therefore the checkpoint
>>> should not be triggered until the enumerator receives all the split
>>> reports.
>>>
>>> There is a nuance here. Today, the RPC call from the TM to the JM is
>>> async, so it is possible that SourceOperator.open() has returned but the
>>> enumerator has not yet received the split reports. However, because the
>>> task status update RPC call goes to the same channel as the split report
>>> call, the task status RPC call will happen after the split report call
>>> on the JM side. Therefore, on the JM side, the SourceCoordinator will
>>> always first receive the split reports, then receive the checkpoint
>>> request. This happens-before relationship is important to guarantee
>>> consistent state between the enumerator and the readers.
>>>
>>> Scenario 1:
>>>
>>>> 1. Upon restart, Reader A reports assigned splits (1 and 2), and Reader
>>>> B reports (3 and 4).
>>>> 2. The enumerator receives these reports but only reassigns splits 1
>>>> and 2, not 3 and 4.
>>>> 3. A checkpoint or savepoint is then triggered. Only splits 1 and 2 are
>>>> recorded in the reader states; splits 3 and 4 are not persisted.
>>>> 4. If the job is later restarted from this checkpoint, splits 3 and 4
>>>> will be permanently lost.
>>>
>>> This scenario is possible. One solution is to let the enumerator
>>> implementation handle this. That means if the enumerator relies on the
>>> initial split reports from the source readers, it should maintain these
>>> reports by itself. In the above example, the enumerator will need to
>>> remember that 3 and 4 are not assigned and put them into its own state.
>>> The current contract is that anything assigned to the SourceReaders is
>>> completely owned by the SourceReaders. Enumerators can remember the
>>> assignments but cannot change them, even when the source reader
>>> recovers / restarts.
>>> With this FLIP, the contract becomes that the source readers return the
>>> ownership of the splits to the enumerator. So the enumerator is
>>> responsible for maintaining these splits until they are assigned to a
>>> source reader again.
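>>>
>>> To make the new contract concrete, a minimal sketch of the bookkeeping
>>> inside a SplitEnumerator implementation (all names here are
>>> hypothetical, just to illustrate the idea):
>>>
>>> // Splits reported back by readers but not yet reassigned. This set
>>> // must be included in the enumerator's snapshotState() so that
>>> // reported-but-unassigned splits survive a checkpoint (scenario 1).
>>> private final Set<SplitT> pendingSplits = new HashSet<>();
>>>
>>> // splitId -> subtask currently owning the split.
>>> private final Map<String, Integer> splitOwners = new HashMap<>();
>>>
>>> void handleSplitReport(int subtaskId, List<SplitT> reportedSplits) {
>>>     for (SplitT split : reportedSplits) {
>>>         // The enumerator is the source of truth: a split that has
>>>         // already been assigned to another reader is not taken back.
>>>         if (!splitOwners.containsKey(split.splitId())) {
>>>             pendingSplits.add(split);
>>>         }
>>>     }
>>> }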
>>>
>>> There are other cases where there may be conflicting information between
>>> reader and enumerator. For example, consider the following sequence:
>>> 1. Reader A reports splits (1 and 2) upon restart.
>>> 2. The enumerator receives the report and assigns both 1 and 2 to
>>> reader B.
>>> 3. Reader A fails before checkpointing. This is a partial failure, so
>>> only reader A restarts.
>>> 4. When reader A recovers, it will again report splits (1 and 2) to the
>>> enumerator.
>>> 5. The enumerator should ignore this report because it has already
>>> assigned splits (1 and 2) to reader B.
>>>
>>> So with the new contract, the enumerator should be the source of truth
>>> for split ownership.
>>>
>>> Thanks,
>>>
>>> Jiangjie (Becket) Qin
>>>
>>> On Fri, Aug 8, 2025 at 12:58 AM Hongshun Wang <[email protected]>
>>> wrote:
>>>
>>>> Hi Becket,
>>>>
>>>> I did consider this approach at the beginning (and it was also
>>>> mentioned in this FLIP), since it would allow more flexibility in
>>>> reassigning all splits. However, there are a few potential issues.
>>>>
>>>> 1. High Transmission Cost
>>>> If we pass the full split objects (rather than just split IDs), the
>>>> data size could be significant, leading to high overhead during
>>>> transmission, especially when many splits are involved.
>>>>
>>>> 2. Risk of Split Loss
>>>> Splits can be lost unless we have a mechanism to make sure a checkpoint
>>>> can only be taken after all the splits are reassigned. There are
>>>> scenarios where splits could be lost due to inconsistent state handling
>>>> during recovery:
>>>>
>>>> Scenario 1:
>>>>
>>>> 1. Upon restart, Reader A reports assigned splits (1 and 2), and
>>>> Reader B reports (3 and 4).
>>>> 2. The enumerator receives these reports but only reassigns splits
>>>> 1 and 2, not 3 and 4.
>>>> 3. A checkpoint or savepoint is then triggered. Only splits 1 and 2
>>>> are recorded in the reader states; splits 3 and 4 are not persisted.
>>>> 4. If the job is later restarted from this checkpoint, splits 3 and
>>>> 4 will be permanently lost.
>>>>
>>>> Scenario 2:
>>>>
>>>> 1. Reader A reports splits (1 and 2), and Reader B reports (3 and
>>>> 4) upon restart.
>>>> 2. Before the enumerator receives all reports and performs
>>>> reassignment, a checkpoint is triggered.
>>>> 3. Since no splits have been reassigned yet, both readers have
>>>> empty states.
>>>> 4. When restarting from this checkpoint, all four splits are lost.
>>>>
>>>> Let me know if you have thoughts on how we might mitigate these risks!
>>>>
>>>> Best,
>>>> Hongshun
>>>>
>>>> On Fri, Aug 8, 2025 at 1:46 AM Becket Qin <[email protected]> wrote:
>>>>
>>>>> Hi Hongshun,
>>>>>
>>>>> The steps sound reasonable to me in general. In terms of the updated
>>>>> FLIP wiki, it would be good to see if we can keep the protocol simple.
>>>>> One alternative way to achieve this behavior is the following:
>>>>>
>>>>> 1. Upon SourceOperator startup, the SourceOperator sends a
>>>>> ReaderRegistrationEvent with the currently assigned splits to the
>>>>> enumerator. It does not add these splits to the SourceReader.
>>>>> 2. The enumerator will always use
>>>>> SplitEnumeratorContext.assignSplits() to assign the splits (not via
>>>>> the response to the ReaderRegistrationEvent; this allows async split
>>>>> assignment in case the enumerator wants to wait until all the readers
>>>>> are registered).
>>>>> 3. The SourceOperator will only call SourceReader.addSplits() when it
>>>>> receives the AddSplitEvent from the enumerator.
>>>>>
>>>>> This protocol has a few benefits:
>>>>> 1. it basically allows arbitrary split reassignment upon restart
>>>>> 2. simplicity: there is only one way to assign splits
>>>>>
>>>>> So we only need one interface change:
>>>>> - add the initially assigned splits to ReaderInfo so the Enumerator
>>>>> can access them.
>>>>> and one behavior change:
>>>>> - The SourceOperator should stop assigning splits from state
>>>>> restoration directly, and only do so when it receives the
>>>>> AddSplitEvent from the enumerator.
>>>>>
>>>>> The enumerator story is also simple:
>>>>> 1. Receive some kind of notification (new partition, new reader, etc.)
>>>>> 2. Look at the reader information (in the enumerator context or
>>>>> self-maintained state)
>>>>> 3. Assign splits via the enumerator context.
>>>>>
>>>>> Thanks,
>>>>>
>>>>> Jiangjie (Becket) Qin
>>>>>
>>>>> On Thu, Aug 7, 2025 at 1:31 AM Hongshun Wang <[email protected]>
>>>>> wrote:
>>>>>
>>>>>> Hi Becket,
>>>>>> Thanks for your advice; I've quickly learned a lot about the
>>>>>> reader's design principle. It's really interesting!
>>>>>>
>>>>>> > One principle we want to follow is that the enumerator should be
>>>>>> > the brain doing the splits assignment, while the source readers
>>>>>> > read from the assigned splits. So we want to avoid the case where
>>>>>> > the SourceReader ignores the split assignment.
>>>>>>
>>>>>> It appears that MySQL CDC currently bypasses this principle by
>>>>>> proactively removing unused splits directly in the SourceReader. This
>>>>>> may be due to the lack of built-in framework support for such
>>>>>> cleanup, forcing connectors to handle it manually. However, this
>>>>>> responsibility ideally belongs in the framework.
>>>>>>
>>>>>> With this FLIP, we propose a redesigned mechanism that centralizes
>>>>>> split cleanup logic in the SplitEnumerator, allowing connectors like
>>>>>> MySQL CDC to eventually adopt it (@Leonard, CC).
>>>>>>
>>>>>> To achieve this, we must carefully manage state consistency during
>>>>>> startup and recovery. The proposed approach is as follows:
>>>>>>
>>>>>> 1. Reader Registration with Deferred Assignment
>>>>>> When a reader starts (SourceOperator#open), it sends a
>>>>>> ReaderRegistrationEvent to the SplitEnumerator, including its
>>>>>> previously assigned splits (restored from state). However, these
>>>>>> splits are not yet assigned to the reader. The SourceOperator is
>>>>>> placed in a PENDING state.
>>>>>> 2. Prevent State Pollution During Registration
>>>>>> While in the PENDING state, SourceOperator#snapshotState will not
>>>>>> update the operator state. This prevents empty or outdated reader
>>>>>> state (e.g., with removed splits) from polluting the checkpoint (see
>>>>>> the sketch after this list).
>>>>>> 3. Enumerator Performs Split Cleanup and Acknowledges
>>>>>> Upon receiving the ReaderRegistrationEvent, the SplitEnumerator
>>>>>> removes any splits that are no longer valid (e.g., due to removed
>>>>>> topics or tables) and returns the list of remaining valid split IDs
>>>>>> to the reader via a ReaderRegistrationACKEvent.
>>>>>> For backward compatibility, the default behavior is to return all
>>>>>> split IDs (i.e., no filtering).
>>>>>> 4. Finalize Registration and Resume Normal Operation
>>>>>> When the SourceOperator receives the ReaderRegistrationACKEvent,
>>>>>> it assigns the confirmed splits to the reader and transitions its
>>>>>> state to REGISTERED. From this point onward,
>>>>>> SourceOperator#snapshotState can safely update the operator state.
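>>>>>>
>>>>>> As promised above, a minimal sketch of the snapshot guard in the
>>>>>> SourceOperator (the enum, field names, and types are illustrative
>>>>>> only):
>>>>>>
>>>>>> private enum RegistrationState { PENDING, REGISTERED }
>>>>>> private RegistrationState registrationState = RegistrationState.PENDING;
>>>>>> private ListState<SplitT> readerState; // holds the restored splits
>>>>>>
>>>>>> @Override
>>>>>> public void snapshotState(StateSnapshotContext context) throws Exception {
>>>>>>     if (registrationState == RegistrationState.PENDING) {
>>>>>>         // Keep the restored state untouched: the reader holds no
>>>>>>         // splits yet, so writing its (empty) split set here would
>>>>>>         // pollute the checkpoint.
>>>>>>         return;
>>>>>>     }
>>>>>>     readerState.update(
>>>>>>             sourceReader.snapshotState(context.getCheckpointId()));
>>>>>> }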
>>>>>>
>>>>>> Best,
>>>>>> Hongshun
>>>>>>
>>>>>> On Thu, Aug 7, 2025 at 1:57 AM Becket Qin <[email protected]>
>>>>>> wrote:
>>>>>>
>>>>>>>> SourceCoordinator doesn't store splits that have already been
>>>>>>>> assigned to readers, and SplitAssignmentTracker stores the splits
>>>>>>>> only for this checkpoint, and they are removed after the checkpoint
>>>>>>>> completes. Maybe you mean SourceOperator?
>>>>>>>
>>>>>>> Yes, I meant SourceOperator.
>>>>>>>
>>>>>>>> At the beginning, I also thought about using it. However, there are
>>>>>>>> two situations:
>>>>>>>> 1. During restart, if the source options remove a topic or table:
>>>>>>>> connectors like MySQL CDC will sometimes remove unused splits after
>>>>>>>> restart in MySqlSourceReader#addSplits [1]. Kafka lacks this, so if
>>>>>>>> the configured topics change, the removed topic's splits are still
>>>>>>>> read. I also want to do the same thing in Kafka.
>>>>>>>> 2. In Kafka or MySQL CDC, some bounded splits, if finished, can be
>>>>>>>> removed after restart.
>>>>>>>> In these cases, I have to get the assigned splits after
>>>>>>>> SourceReader#addSplits, rather than get them from the
>>>>>>>> SourceOperator directly.
>>>>>>>
>>>>>>> One principle we want to follow is that the enumerator should be the
>>>>>>> brain doing the splits assignment, while the source readers read
>>>>>>> from the assigned splits. So we want to avoid the case where the
>>>>>>> SourceReader ignores the split assignment. Given this principle:
>>>>>>> For case 1, if there is a subscription change, it might be better to
>>>>>>> hold back calling SourceReader.addSplits() until an assignment is
>>>>>>> confirmed by the Enumerator. In fact, this might be a good default
>>>>>>> behavior regardless of whether there is a subscription change.
>>>>>>> For case 2: if a bounded split is finished,
>>>>>>> SourceReader.snapshotState() will not contain that split. So upon
>>>>>>> restoration, those splits should not appear, right?
>>>>>>>
>>>>>>> Thanks,
>>>>>>>
>>>>>>> Jiangjie (Becket) Qin
>>>>>>>
>>>>>>> On Wed, Aug 6, 2025 at 5:19 AM Hongshun Wang <
>>>>>>> [email protected]> wrote:
>>>>>>>
>>>>>>>> Hi Becket,
>>>>>>>>
>>>>>>>> Thanks a lot for your advice, which helped me a lot.
>>>>>>>>
>>>>>>>> > It seems that we don't need the method
>>>>>>>> > `SourceReader.getAssignedSplits()`. The assigned splits are
>>>>>>>> > available in the SourceCoordinator upon state restoration.
>>>>>>>>
>>>>>>>> SourceCoordinator doesn't store splits that have already been
>>>>>>>> assigned to readers, and SplitAssignmentTracker stores the splits
>>>>>>>> only for this checkpoint, and they are removed after the checkpoint
>>>>>>>> completes. Maybe you mean SourceOperator?
>>>>>>>>
>>>>>>>> At the beginning, I also thought about using it. However, there are
>>>>>>>> two situations:
>>>>>>>> 1. During restart, if the source options remove a topic or table:
>>>>>>>> connectors like MySQL CDC will sometimes remove unused splits after
>>>>>>>> restart in MySqlSourceReader#addSplits [1]. Kafka lacks this, so if
>>>>>>>> the configured topics change, the removed topic's splits are still
>>>>>>>> read. I also want to do the same thing in Kafka.
>>>>>>>> 2. In Kafka or MySQL CDC, some bounded splits, if finished, can be
>>>>>>>> removed after restart.
>>>>>>>> In these cases, I have to get the assigned splits after
>>>>>>>> SourceReader#addSplits, rather than get them from the
>>>>>>>> SourceOperator directly.
>>>>>>>>
>>>>>>>> > By design, the SplitEnumerator can get the reader information any
>>>>>>>> > time from the `SplitEnumeratorContext.registeredReaders()`.
>>>>>>>>
>>>>>>>> It looks good.
>>>>>>>>
>>>>>>>> Thanks again.
>>>>>>>>
>>>>>>>> Best,
>>>>>>>> Hongshun
>>>>>>>>
>>>>>>>> [1]
>>>>>>>> https://github.com/apache/flink-cdc/blob/42f91a864e329c00959828fe0ca4f1e9e8e1de75/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlSourceReader.java#L238
>>>>>>>>
>>>>>>>> On Tue, Aug 5, 2025 at 2:35 PM Becket Qin <[email protected]>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Hi Hongshun,
>>>>>>>>>
>>>>>>>>> Thanks for the proposal. The current Kafka split assignment
>>>>>>>>> algorithm does seem to have issues. (I cannot recall why it was
>>>>>>>>> implemented this way at the time...)
>>>>>>>>>
>>>>>>>>> Two quick comments:
>>>>>>>>> 1. It seems that we don't need the method
>>>>>>>>> `SourceReader.getAssignedSplits()`. The assigned splits are
>>>>>>>>> available in the SourceCoordinator upon state restoration and can
>>>>>>>>> be put into the ReaderRegistrationEvent.
>>>>>>>>> 2. Instead of adding the method `SplitEnumerator.addReader(int
>>>>>>>>> subtaskId, List<SplitT> assignedSplits)`, add a new field
>>>>>>>>> `InitialSplitAssignment` to the ReaderInfo. By design, the
>>>>>>>>> SplitEnumerator can get the reader information at any time from
>>>>>>>>> `SplitEnumeratorContext.registeredReaders()`. This also avoids the
>>>>>>>>> Enumerator implementation having to remember the initially
>>>>>>>>> assigned splits if it wants to wait until all the readers are
>>>>>>>>> registered, and it allows future additions to the reader
>>>>>>>>> information.
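>>>>>>>>>
>>>>>>>>> For example, the enumerator side could then look roughly like this
>>>>>>>>> (getInitialSplitAssignment() is a sketch of the proposed new
>>>>>>>>> ReaderInfo field, not an existing API):
>>>>>>>>>
>>>>>>>>> // Once all expected readers are registered, compute a global
>>>>>>>>> // assignment from the splits they restored from state.
>>>>>>>>> Map<Integer, ReaderInfo> readers = context.registeredReaders();
>>>>>>>>> if (readers.size() == context.currentParallelism()) {
>>>>>>>>>     for (Map.Entry<Integer, ReaderInfo> entry : readers.entrySet()) {
>>>>>>>>>         List<KafkaPartitionSplit> restored =
>>>>>>>>>                 entry.getValue().getInitialSplitAssignment();
>>>>>>>>>         // feed the restored splits into the global assignment logic
>>>>>>>>>     }
>>>>>>>>> }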
>>>>>>>>>
>>>>>>>>> Thanks,
>>>>>>>>>
>>>>>>>>> Jiangjie (Becket) Qin
>>>>>>>>>
>>>>>>>>> On Mon, Aug 4, 2025 at 8:39 PM Hongshun Wang <
>>>>>>>>> [email protected]> wrote:
>>>>>>>>>
>>>>>>>>>> Could anyone familiar with the Kafka connector help review this
>>>>>>>>>> FLIP? I am looking forward to your reply.
>>>>>>>>>>
>>>>>>>>>> Best,
>>>>>>>>>> Hongshun
>>>>>>>>>>
>>>>>>>>>> On Thu, Jul 24, 2025 at 8:13 PM Leonard Xu <[email protected]>
>>>>>>>>>> wrote:
>>>>>>>>>>
>>>>>>>>>>> Thanks Hongshun for driving this work.
>>>>>>>>>>>
>>>>>>>>>>> We are also suffering from this issue in production Kafka
>>>>>>>>>>> restoration usage. The current design is a nice tradeoff and has
>>>>>>>>>>> considered the new Source implementation details, +1 from my
>>>>>>>>>>> side.
>>>>>>>>>>>
>>>>>>>>>>> Best,
>>>>>>>>>>> Leonard
>>>>>>>>>>>
>>>>>>>>>>> > On Jul 19, 2025, at 18:59, Hongshun Wang
>>>>>>>>>>> > <[email protected]> wrote:
>>>>>>>>>>> >
>>>>>>>>>>> > Hi devs,
>>>>>>>>>>> >
>>>>>>>>>>> > I'd like to initiate a discussion about [FLIP-537: Enumerator
>>>>>>>>>>> > with Global Split Assignment Distribution for Balanced Split
>>>>>>>>>>> > Assignment] [1], which addresses critical limitations in our
>>>>>>>>>>> > current Kafka connector split distribution mechanism.
>>>>>>>>>>> >
>>>>>>>>>>> > As documented in [FLINK-31762] [2], several scenarios
>>>>>>>>>>> > currently lead to uneven Kafka split distribution, causing
>>>>>>>>>>> > reader delays and performance bottlenecks. The core issue
>>>>>>>>>>> > stems from the enumerator's lack of visibility into
>>>>>>>>>>> > post-assignment split distribution.
>>>>>>>>>>> >
>>>>>>>>>>> > This FLIP does two things:
>>>>>>>>>>> > 1. ReaderRegistrationEvent enhancement: the SourceOperator
>>>>>>>>>>> > should send a ReaderRegistrationEvent with assigned-splits
>>>>>>>>>>> > metadata after startup to ensure state consistency.
>>>>>>>>>>> > 2. An implementation in the Kafka connector to resolve
>>>>>>>>>>> > imbalanced splits and state awareness during recovery (the
>>>>>>>>>>> > enumerator will always choose the least-assigned subtask, for
>>>>>>>>>>> > the reasons given in the FLIP).
>>>>>>>>>>> >
>>>>>>>>>>> > Any additional questions regarding this FLIP? Looking forward
>>>>>>>>>>> > to hearing from you.
>>>>>>>>>>> >
>>>>>>>>>>> > Best,
>>>>>>>>>>> > Hongshun
>>>>>>>>>>> >
>>>>>>>>>>> > [1]
>>>>>>>>>>> > https://cwiki.apache.org/confluence/display/FLINK/FLIP-537%3A+Enumerator+with+Global+Split+Assignment+Distribution+for+Balanced+Split+assignment
>>>>>>>>>>> > [2] https://issues.apache.org/jira/browse/FLINK-31762
