Hi Hongshun,

I think the convention for such optional features in Source is via mix-in interfaces. So instead of adding a method to the SourceReader, maybe we should introduce an interface SupportSplitReassignmentOnRecovery with this method. If a Source implementation implements that interface, then the SourceOperator will check the desired behavior and act accordingly.
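A minimal sketch of what this could look like, assuming the shouldReassignSplitsOnRecovery() method proposed below in the thread (the interface name and its placement are tentative, not a settled API):

    // Hypothetical mix-in interface; not part of the current Source API.
    public interface SupportSplitReassignmentOnRecovery {
        // Opt in to reporting the restored splits back to the enumerator
        // for reassignment, instead of adding them to the reader directly.
        default boolean shouldReassignSplitsOnRecovery() {
            return true;
        }
    }

    // Sketch of the check the SourceOperator could perform on startup:
    static boolean wantsReassignment(Object source) {
        return source instanceof SupportSplitReassignmentOnRecovery
                && ((SupportSplitReassignmentOnRecovery) source)
                        .shouldReassignSplitsOnRecovery();
    }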
Thanks,

Jiangjie (Becket) Qin

On Wed, Aug 20, 2025 at 8:52 PM Hongshun Wang <loserwang1...@gmail.com> wrote:

> Hi devs,
>
> Would anyone like to discuss this FLIP? I'd appreciate your feedback and suggestions.
>
> Best,
> Hongshun
>
> On Aug 13, 2025, at 14:23, Hongshun Wang <loserwang1...@gmail.com> wrote:
>
> Hi Becket,
>
> Thank you for your detailed feedback. The new contract makes good sense to me and effectively addresses the issues I encountered at the beginning of the design.
>
> That said, I recommend not reporting splits by default, primarily for compatibility and practical reasons:
>
> > For these reasons, we do not expect the Split objects to be huge, and we are not trying to design for huge Split objects either as they will have problems even today.
>
> 1. Not all existing connectors match this rule.
> For example, in the MySQL CDC connector, a binlog split may contain hundreds (or even more) snapshot split completion records. This state is large and is currently transmitted incrementally through multiple BinlogSplitMetaEvent messages. Since the binlog reader operates with single parallelism, reporting the full split state on recovery could be inefficient or even infeasible. For such sources, it would be better to provide a mechanism to skip split reporting during restart until they redesign and reduce the split size.
>
> 2. Not all enumerators maintain unassigned splits in state.
> Some SplitEnumerator implementations (such as the Kafka connector) do not track or persistently manage unassigned splits. Requiring them to handle re-registration would add unnecessary complexity. Even if we implement this in the Kafka connector, the connector is currently decoupled from the Flink version, so we would also need to keep older versions compatible.
>
> ------------------------------
>
> To address these concerns, I propose introducing a new method: boolean SourceReader#shouldReassignSplitsOnRecovery(), with a default implementation returning false. This allows source readers to opt in to split reassignment only when necessary. Since the new contract already places the responsibility for split assignment on the enumerator, not reporting splits by default is a safe and clean default behavior.
>
> ------------------------------
>
> I've updated the implementation and the FLIP accordingly [1]. It's quite a big change. In particular, for the Kafka connector, we can now use a pluggable SplitPartitioner to support different split assignment strategies (e.g., default, round-robin).
>
> Could you please review it when you have a chance?
>
> Best,
>
> Hongshun
>
> [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-537%3A+Enumerator+with+Global+Split+Assignment+Distribution+for+Balanced+Split+assignment
>
> On Sat, Aug 9, 2025 at 3:03 AM Becket Qin <becket....@gmail.com> wrote:
>
>> Hi Hongshun,
>>
>> I am not too concerned about the transmission cost, because the full split transmission already has to happen in the initial assignment phase. And in the future, we probably want to also introduce some kind of workload balance across source readers, e.g. based on the per-split throughput or the per-source-reader workload in heterogeneous clusters. For these reasons, we do not expect the Split objects to be huge, and we are not trying to design for huge Split objects either, as they will have problems even today.
>>
>> Good point on the potential split loss, please see the reply below:
>>
>> Scenario 2:
>>
>>> 1. Reader A reports splits (1 and 2), and Reader B reports (3 and 4) upon restart.
>>> 2. Before the enumerator receives all reports and performs reassignment, a checkpoint is triggered.
>>> 3. Since no splits have been reassigned yet, both readers have empty states.
>>> 4. When restarting from this checkpoint, all four splits are lost.
>>
>> The reader registration happens in SourceOperator.open(), which means the task is still in the initializing state; therefore the checkpoint should not be triggered until the enumerator receives all the split reports.
>>
>> There is a nuance here. Today, the RPC call from the TM to the JM is async, so it is possible that SourceOperator.open() has returned but the enumerator has not received the split reports. However, because the task status update RPC call goes through the same channel as the split report call, the task status RPC call will happen after the split report call on the JM side. Therefore, on the JM side, the SourceCoordinator will always first receive the split reports, then receive the checkpoint request. This "happens before" relationship is important to guarantee a consistent state between the enumerator and the readers.
>>
>> Scenario 1:
>>
>>> 1. Upon restart, Reader A reports assigned splits (1 and 2), and Reader B reports (3 and 4).
>>> 2. The enumerator receives these reports but only reassigns splits 1 and 2 — not 3 and 4.
>>> 3. A checkpoint or savepoint is then triggered. Only splits 1 and 2 are recorded in the reader states; splits 3 and 4 are not persisted.
>>> 4. If the job is later restarted from this checkpoint, splits 3 and 4 will be permanently lost.
>>
>> This scenario is possible. One solution is to let the enumerator implementation handle this. That means if the enumerator relies on the initial split reports from the source readers, it should maintain these reports by itself. In the above example, the enumerator will need to remember that 3 and 4 are not assigned and put them into its own state.
>>
>> The current contract is that anything assigned to the SourceReaders is completely owned by the SourceReaders. Enumerators can remember the assignments but cannot change them, even when the source reader recovers / restarts. With this FLIP, the contract becomes that the source readers return the ownership of the splits to the enumerator. So the enumerator is responsible for maintaining these splits until they are assigned to a source reader again.
>>
>> There are other cases where there may be conflicting information between reader and enumerator. For example, consider the following sequence:
>> 1. Reader A reports splits (1 and 2) upon restart.
>> 2. The enumerator receives the report and assigns both 1 and 2 to reader B.
>> 3. Reader A fails before checkpointing. This is a partial failure, so only reader A restarts.
>> 4. When reader A recovers, it will again report splits (1 and 2) to the enumerator.
>> 5. The enumerator should ignore this report, because it has already assigned splits (1 and 2) to reader B.
>>
>> So with the new contract, the enumerator should be the source of truth for split ownership.
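>>
>> A rough sketch of what this source-of-truth bookkeeping might look like on the enumerator side (the class below is made up for illustration; only SourceSplit#splitId() is existing API):
>>
>>     import java.util.HashMap;
>>     import java.util.List;
>>     import java.util.Map;
>>     import org.apache.flink.api.connector.source.SourceSplit;
>>
>>     // Hypothetical enumerator-side bookkeeping, not actual Flink code.
>>     public class SplitOwnershipTracker<SplitT extends SourceSplit> {
>>         // Current owner (subtask id) of each assigned split, by split id.
>>         private final Map<String, Integer> ownerBySplitId = new HashMap<>();
>>         // Splits returned by readers and not yet reassigned. These must go
>>         // into the enumerator's checkpoint state, or Scenario 1 loses them.
>>         private final Map<String, SplitT> unassigned = new HashMap<>();
>>
>>         // Called when a reader reports its restored splits on registration.
>>         public void onSplitsReported(int subtaskId, List<SplitT> splits) {
>>             for (SplitT split : splits) {
>>                 Integer owner = ownerBySplitId.get(split.splitId());
>>                 if (owner != null && owner != subtaskId) {
>>                     // Stale report after a partial failover: the split was
>>                     // already reassigned to another reader, so ignore it.
>>                     continue;
>>                 }
>>                 ownerBySplitId.remove(split.splitId());
>>                 unassigned.put(split.splitId(), split);
>>             }
>>         }
>>
>>         // Called when the enumerator assigns a split to a reader.
>>         public void onSplitAssigned(SplitT split, int subtaskId) {
>>             unassigned.remove(split.splitId());
>>             ownerBySplitId.put(split.splitId(), subtaskId);
>>         }
>>     }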
>>
>> Thanks,
>>
>> Jiangjie (Becket) Qin
>>
>> On Fri, Aug 8, 2025 at 12:58 AM Hongshun Wang <loserwang1...@gmail.com> wrote:
>>
>>> Hi Becket,
>>>
>>> I did consider this approach at the beginning (and it was also mentioned in this FLIP), since it would allow more flexibility in reassigning all splits. However, there are a few potential issues.
>>>
>>> 1. High Transmission Cost
>>> If we pass the full split objects (rather than just split IDs), the data size could be significant, leading to high overhead during transmission — especially when many splits are involved.
>>>
>>> 2. Risk of Split Loss
>>> The risk of split loss exists unless we have a mechanism to make sure a checkpoint can only be taken after all the splits are reassigned. There are scenarios where splits could be lost due to inconsistent state handling during recovery:
>>>
>>> Scenario 1:
>>>
>>> 1. Upon restart, Reader A reports assigned splits (1 and 2), and Reader B reports (3 and 4).
>>> 2. The enumerator receives these reports but only reassigns splits 1 and 2 — not 3 and 4.
>>> 3. A checkpoint or savepoint is then triggered. Only splits 1 and 2 are recorded in the reader states; splits 3 and 4 are not persisted.
>>> 4. If the job is later restarted from this checkpoint, splits 3 and 4 will be permanently lost.
>>>
>>> Scenario 2:
>>>
>>> 1. Reader A reports splits (1 and 2), and Reader B reports (3 and 4) upon restart.
>>> 2. Before the enumerator receives all reports and performs reassignment, a checkpoint is triggered.
>>> 3. Since no splits have been reassigned yet, both readers have empty states.
>>> 4. When restarting from this checkpoint, all four splits are lost.
>>>
>>> Let me know if you have thoughts on how we might mitigate these risks!
>>>
>>> Best
>>> Hongshun
>>>
>>> On Fri, Aug 8, 2025 at 1:46 AM Becket Qin <becket....@gmail.com> wrote:
>>>
>>>> Hi Hongshun,
>>>>
>>>> The steps sound reasonable to me in general. In terms of the updated FLIP wiki, it would be good to see if we can keep the protocol simple. One alternative way to achieve this behavior is the following:
>>>>
>>>> 1. Upon SourceOperator startup, the SourceOperator sends a ReaderRegistrationEvent with the currently assigned splits to the enumerator. It does not add these splits to the SourceReader.
>>>> 2. The enumerator will always use SplitEnumeratorContext.assignSplits() to assign the splits. (Not via the response to the ReaderRegistrationEvent; this allows async split assignment in case the enumerator wants to wait until all the readers are registered.)
>>>> 3. The SourceOperator will only call SourceReader.addSplits() when it receives the AddSplitEvent from the enumerator.
>>>>
>>>> This protocol has a few benefits:
>>>> 1. It basically allows arbitrary split reassignment upon restart.
>>>> 2. Simplicity: there is only one way to assign splits.
>>>>
>>>> So we only need one interface change:
>>>> - Add the initially assigned splits to ReaderInfo so the Enumerator can access them.
>>>> And one behavior change:
>>>> - The SourceOperator should stop assigning the splits restored from state on its own, and only do that when it receives the AddSplitEvent from the enumerator.
>>>>
>>>> The enumerator story is also simple:
>>>> 1. Receive some kind of notification (new partition, new reader, etc.)
>>>> 2. Look at the reader information (in the enumerator context or self-maintained state)
>>>> 3. Assign splits via the enumerator context.
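>>>>
>>>> To make the flow concrete, a rough pseudo-Java sketch (ReaderRegistrationEvent, AddSplitEvent, SplitsAssignment, and SplitEnumeratorContext exist in Flink today, but the event payload, helper methods, and wiring below are simplified assumptions):
>>>>
>>>>     // Step 1: on startup, the SourceOperator reports the restored splits
>>>>     // instead of adding them to the SourceReader. Per this proposal, the
>>>>     // event carries the splits; today's constructor takes (subtaskId, location).
>>>>     void open() {
>>>>         operatorEventGateway.sendEventToCoordinator(
>>>>                 new ReaderRegistrationEvent(subtaskId, restoredSplits));
>>>>         // Note: no SourceReader#addSplits() call here.
>>>>     }
>>>>
>>>>     // Step 2: the enumerator assigns splits only via the context, possibly
>>>>     // after all readers have registered. The initial assignments could be
>>>>     // exposed through ReaderInfo, as suggested above.
>>>>     void maybeReassign(SplitEnumeratorContext<MySplit> context) {
>>>>         Map<Integer, List<MySplit>> assignment =
>>>>                 rebalance(context.registeredReaders()); // hypothetical policy
>>>>         context.assignSplits(new SplitsAssignment<>(assignment));
>>>>     }
>>>>
>>>>     // Step 3: the SourceOperator adds splits to the reader only when the
>>>>     // AddSplitEvent arrives. deserializeSplits(...) stands in for the
>>>>     // serializer-based AddSplitEvent#splits(...) accessor.
>>>>     void handleOperatorEvent(OperatorEvent event) {
>>>>         if (event instanceof AddSplitEvent) {
>>>>             sourceReader.addSplits(
>>>>                     deserializeSplits((AddSplitEvent<MySplit>) event));
>>>>         }
>>>>     }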
>>>>
>>>> Thanks,
>>>>
>>>> Jiangjie (Becket) Qin
>>>>
>>>> On Thu, Aug 7, 2025 at 1:31 AM Hongshun Wang <loserwang1...@gmail.com> wrote:
>>>>
>>>>> Hi Becket,
>>>>>
>>>>> Thanks for your advice — I've quickly learned a lot about the reader's design principles. It's really interesting!
>>>>>
>>>>> > One principle we want to follow is that the enumerator should be the brain doing the split assignment, while the source readers read from the assigned splits. So we want to avoid the case where the SourceReader ignores the split assignment.
>>>>>
>>>>> It appears that MySQL CDC currently bypasses this principle by proactively removing unused splits directly in the SourceReader. This may be due to the lack of built-in framework support for such cleanup, forcing connectors to handle it manually. However, this responsibility ideally belongs in the framework.
>>>>>
>>>>> With this FLIP, we propose a redesigned mechanism that centralizes split cleanup logic in the SplitEnumerator, allowing connectors like MySQL CDC to eventually adopt it (@Leonard, CC).
>>>>>
>>>>> To achieve this, we must carefully manage state consistency during startup and recovery. The proposed approach is as follows:
>>>>>
>>>>> 1. Reader Registration with Deferred Assignment
>>>>> When a reader starts (SourceOperator#open), it sends a ReaderRegistrationEvent to the SplitEnumerator, including its previously assigned splits (restored from state). However, these splits are not yet assigned to the reader. The SourceOperator is placed in a PENDING state.
>>>>> 2. Prevent State Pollution During Registration
>>>>> While in the PENDING state, SourceOperator#snapshotState will not update the operator state. This prevents empty or outdated reader state (e.g., with removed splits) from polluting the checkpoint.
>>>>> 3. Enumerator Performs Split Cleanup and Acknowledges
>>>>> Upon receiving the ReaderRegistrationEvent, the SplitEnumerator removes any splits that are no longer valid (e.g., due to removed topics or tables) and returns the list of remaining valid split IDs to the reader via a ReaderRegistrationACKEvent. For backward compatibility, the default behavior is to return all split IDs (i.e., no filtering).
>>>>> 4. Finalize Registration and Resume Normal Operation
>>>>> When the SourceOperator receives the ReaderRegistrationACKEvent, it assigns the confirmed splits to the reader and transitions its state to REGISTERED. From this point onward, SourceOperator#snapshotState can safely update the operator state.
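>>>>>
>>>>> A condensed sketch of this PENDING/REGISTERED handshake on the SourceOperator side (ReaderRegistrationACKEvent and the state names follow the steps above; the fields and the validSplitIds() accessor are illustrative assumptions):
>>>>>
>>>>>     // Hypothetical SourceOperator-side state machine for the handshake.
>>>>>     enum RegistrationState { PENDING, REGISTERED }
>>>>>
>>>>>     private RegistrationState state = RegistrationState.PENDING;
>>>>>
>>>>>     void handleReaderRegistrationAck(ReaderRegistrationACKEvent ack) {
>>>>>         // Keep only the splits the enumerator confirmed as still valid.
>>>>>         List<MySplit> confirmed = new ArrayList<>();
>>>>>         for (MySplit split : restoredSplits) {
>>>>>             if (ack.validSplitIds().contains(split.splitId())) {
>>>>>                 confirmed.add(split);
>>>>>             }
>>>>>         }
>>>>>         sourceReader.addSplits(confirmed);
>>>>>         state = RegistrationState.REGISTERED;
>>>>>     }
>>>>>
>>>>>     void snapshotState(long checkpointId) {
>>>>>         // While PENDING, skip updating the operator state so an empty or
>>>>>         // outdated reader state cannot pollute the checkpoint (step 2).
>>>>>         if (state == RegistrationState.REGISTERED) {
>>>>>             readerState.update(sourceReader.snapshotState(checkpointId));
>>>>>         }
>>>>>     }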
>>>>>
>>>>> Best,
>>>>> Hongshun
>>>>>
>>>>> On Thu, Aug 7, 2025 at 1:57 AM Becket Qin <becket....@gmail.com> wrote:
>>>>>
>>>>>>> SourceCoordinator doesn't store splits that have already been assigned to readers, and SplitAssignmentTracker stores the splits only for this checkpoint, which will be removed after the checkpoint. Maybe you mean SourceOperator?
>>>>>>
>>>>>> Yes, I meant SourceOperator.
>>>>>>
>>>>>>> At the beginning, I also thought about using it. However, there are two situations:
>>>>>>> 1. During restart, if the source options remove a topic or table: sometimes connectors like MySQL CDC will remove unused splits after restart in MySqlSourceReader#addSplits [1]. Kafka lacks this, so if the configured topics change, the removed topics' splits are still read. I also want to do the same thing in Kafka.
>>>>>>> 2. In Kafka or MySQL CDC, some bounded splits, if finished, can be removed after restart.
>>>>>>> In these cases, I have to get the assigned splits after SourceReader#addSplits, rather than getting them from the SourceOperator directly.
>>>>>>
>>>>>> One principle we want to follow is that the enumerator should be the brain doing the split assignment, while the source readers read from the assigned splits. So we want to avoid the case where the SourceReader ignores the split assignment. Given this principle:
>>>>>> For case 1: if there is a subscription change, it might be better to hold back calling SourceReader.addSplits() until an assignment is confirmed by the Enumerator. In fact, this might be a good default behavior regardless of whether there is a subscription change.
>>>>>> For case 2: if a bounded split is finished, SourceReader.snapshotState() will not contain that split. So upon restoration, those splits should not appear, right?
>>>>>>
>>>>>> Thanks,
>>>>>>
>>>>>> Jiangjie (Becket) Qin
>>>>>>
>>>>>> On Wed, Aug 6, 2025 at 5:19 AM Hongshun Wang <loserwang1...@gmail.com> wrote:
>>>>>>
>>>>>>> Hi Becket,
>>>>>>>
>>>>>>> Thank you for your advice; it helped me a lot.
>>>>>>>
>>>>>>> > It seems that we don't need the method `SourceReader.getAssignedSplits()`. The assigned splits are available in the SourceCoordinator upon state restoration.
>>>>>>>
>>>>>>> SourceCoordinator doesn't store splits that have already been assigned to readers, and SplitAssignmentTracker stores the splits only for this checkpoint, which will be removed after the checkpoint. Maybe you mean SourceOperator?
>>>>>>>
>>>>>>> At the beginning, I also thought about using it. However, there are two situations:
>>>>>>> 1. During restart, if the source options remove a topic or table: sometimes connectors like MySQL CDC will remove unused splits after restart in MySqlSourceReader#addSplits [1]. Kafka lacks this, so if the configured topics change, the removed topics' splits are still read. I also want to do the same thing in Kafka.
>>>>>>> 2. In Kafka or MySQL CDC, some bounded splits, if finished, can be removed after restart.
>>>>>>> In these cases, I have to get the assigned splits after SourceReader#addSplits, rather than getting them from the SourceOperator directly.
>>>>>>>
>>>>>>> > By design, the SplitEnumerator can get the reader information any time from the `SplitEnumeratorContext.registeredReaders()`.
>>>>>>>
>>>>>>> It looks good.
>>>>>>>
>>>>>>> Thanks again.
>>>>>>>
>>>>>>> Best,
>>>>>>> Hongshun
>>>>>>>
>>>>>>> [1] https://github.com/apache/flink-cdc/blob/42f91a864e329c00959828fe0ca4f1e9e8e1de75/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlSourceReader.java#L238
>>>>>>>
>>>>>>> On Tue, Aug 5, 2025 at 2:35 PM Becket Qin <becket....@gmail.com> wrote:
>>>>>>>
>>>>>>>> Hi Hongshun,
>>>>>>>>
>>>>>>>> Thanks for the proposal. The current Kafka split assignment algorithm does seem to have issues. (I cannot recall why it was implemented this way at that time...)
>>>>>>>>
>>>>>>>> Two quick comments:
>>>>>>>> 1. It seems that we don't need the method `SourceReader.getAssignedSplits()`. The assigned splits are available in the SourceCoordinator upon state restoration and can be put into the ReaderRegistrationEvent.
>>>>>>>> 2. Instead of adding the method `SplitEnumerator.addReader(int subtaskId, List<SplitT> assignedSplits)`, add a new field of `InitialSplitAssignment` to the ReaderInfo. By design, the SplitEnumerator can get the reader information any time from `SplitEnumeratorContext.registeredReaders()`. This also avoids requiring the Enumerator implementation to remember the initially assigned splits if it wants to wait until all the readers are registered, and it allows future additions of reader information.
>>>>>>>>
>>>>>>>> Thanks,
>>>>>>>>
>>>>>>>> Jiangjie (Becket) Qin
>>>>>>>>
>>>>>>>> On Mon, Aug 4, 2025 at 8:39 PM Hongshun Wang <loserwang1...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> Could anyone familiar with the Kafka connector help review this FLIP? I am looking forward to your reply.
>>>>>>>>>
>>>>>>>>> Best
>>>>>>>>> Hongshun
>>>>>>>>>
>>>>>>>>> On Thu, Jul 24, 2025 at 8:13 PM Leonard Xu <xbjt...@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> Thanks Hongshun for driving this work.
>>>>>>>>>>
>>>>>>>>>> We are also suffering from this issue in production Kafka restoration usage. The current design is a nice tradeoff and has considered the new Source implementation details, so +1 from my side.
>>>>>>>>>>
>>>>>>>>>> Best,
>>>>>>>>>> Leonard
>>>>>>>>>>
>>>>>>>>>> > On Jul 19, 2025, at 18:59, Hongshun Wang <loserwang1...@gmail.com> wrote:
>>>>>>>>>> >
>>>>>>>>>> > Hi devs,
>>>>>>>>>> >
>>>>>>>>>> > I'd like to initiate a discussion about [FLIP-537: Enumerator with Global Split Assignment Distribution for Balanced Split Assignment] [1], which addresses critical limitations in our current Kafka connector split distribution mechanism.
>>>>>>>>>> >
>>>>>>>>>> > As documented in [FLINK-31762] [2], several scenarios currently lead to uneven Kafka split distribution, causing reader delays and performance bottlenecks. The core issue stems from the enumerator's lack of visibility into post-assignment split distribution.
>>>>>>>>>> >
>>>>>>>>>> > This FLIP does two things:
>>>>>>>>>> > 1. ReaderRegistrationEvent enhancement: the SourceOperator should send a ReaderRegistrationEvent with the assigned splits metadata after startup to ensure state consistency.
>>>>>>>>>> > 2. An implementation in the Kafka connector to resolve imbalanced splits and provide state awareness during recovery (the enumerator will always choose the least-assigned subtask; the reasoning is also described in the FLIP).
>>>>>>>>>> >
>>>>>>>>>> > Any additional questions regarding this FLIP? Looking forward to hearing from you.
>>>>>>>>>> >
>>>>>>>>>> > Best
>>>>>>>>>> > Hongshun
>>>>>>>>>> >
>>>>>>>>>> > [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-537%3A+Enumerator+with+Global+Split+Assignment+Distribution+for+Balanced+Split+assignment
>>>>>>>>>> > [2] https://issues.apache.org/jira/browse/FLINK-31762