Feng Jiajie created FLINK-33360:
-----------------------------------

             Summary: HybridSource fails to clear the previous round's state 
when switching sources, leading to data loss
                 Key: FLINK-33360
                 URL: https://issues.apache.org/jira/browse/FLINK-33360
             Project: Flink
          Issue Type: Bug
          Components: Connectors / HybridSource
    Affects Versions: 1.17.1, 1.16.2
            Reporter: Feng Jiajie
             Fix For: 1.7.3


org.apache.flink.connector.base.source.hybrid.HybridSourceSplitEnumerator:
{code:java}
            // track readers that have finished processing for current enumerator
            finishedReaders.add(subtaskId);
            if (finishedReaders.size() == context.currentParallelism()) {
                LOG.debug("All readers finished, ready to switch enumerator!");
                if (currentSourceIndex + 1 < sources.size()) {
                    switchEnumerator();
                    // switch all readers prior to sending split assignments
                    for (int i = 0; i < context.currentParallelism(); i++) {
                        sendSwitchSourceEvent(i, currentSourceIndex);
                    }
                }
            } {code}
I think that *finishedReaders* is meant to track the subtask IDs that have 
finished reading the current source. It should therefore be cleared in the 
*switchEnumerator* function, before the next source starts.
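
To illustrate the proposed change, here is a minimal standalone sketch of the 
bookkeeping. It is not the Flink code: EnumeratorModel, its constructor 
arguments and the clearOnSwitch flag are hypothetical names used only to 
compare the two behaviours side by side.

```java
import java.util.HashSet;
import java.util.Set;

/** Hypothetical stand-in for the enumerator's bookkeeping; not Flink code. */
class EnumeratorModel {
    private final Set<Integer> finishedReaders = new HashSet<>();
    private final int parallelism;
    private final int sourceCount;
    private final boolean clearOnSwitch; // assumption: toggles the proposed fix
    private int currentSourceIndex = 0;

    EnumeratorModel(int parallelism, int sourceCount, boolean clearOnSwitch) {
        this.parallelism = parallelism;
        this.sourceCount = sourceCount;
        this.clearOnSwitch = clearOnSwitch;
    }

    /** Returns true if this finished-event caused a switch to the next source. */
    boolean onReaderFinished(int subtaskId) {
        finishedReaders.add(subtaskId);
        if (finishedReaders.size() == parallelism
                && currentSourceIndex + 1 < sourceCount) {
            switchEnumerator();
            return true;
        }
        return false;
    }

    private void switchEnumerator() {
        currentSourceIndex++;
        if (clearOnSwitch) {
            // Proposed change: forget the readers that finished the previous source.
            finishedReaders.clear();
        }
    }

    int currentSourceIndex() {
        return currentSourceIndex;
    }

    public static void main(String[] args) {
        EnumeratorModel stale = new EnumeratorModel(2, 3, false);
        stale.onReaderFinished(0);
        stale.onReaderFinished(1); // both readers done: switch to source 1
        // Only reader 0 has finished source 1, yet a switch is triggered.
        System.out.println(stale.onReaderFinished(0)); // prints "true"

        EnumeratorModel cleared = new EnumeratorModel(2, 3, true);
        cleared.onReaderFinished(0);
        cleared.onReaderFinished(1); // both readers done: switch to source 1
        // With the set cleared, the model waits for reader 1 as well.
        System.out.println(cleared.onReaderFinished(0)); // prints "false"
    }
}
```

With the set left uncleared, the first reader to finish the second source 
already moves the model on to the third one.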

If it is not cleared, the set still holds every subtask ID from the previous 
source. During the next source, as soon as any SourceReader reports a 
*SourceReaderFinishedEvent* (while the other SourceReaders may still be 
reading in parallel), the condition *finishedReaders.size() == 
context.currentParallelism()* is already satisfied. This triggers 
{*}sendSwitchSourceEvent{*}(i, currentSourceIndex), which sends a 
*SwitchSourceEvent* to all SourceReaders.
If a SourceReader receives a SwitchSourceEvent before it has finished reading 
the previous source, it executes {*}currentReader.close(){*}, so any data it 
has not yet read from that source is lost.
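
The reader-side effect can be sketched the same way. This is a simplified 
model under stated assumptions, not the HybridSourceReader implementation: 
ReaderModel, pollOnce and onSwitchSource are hypothetical names, and a queue of 
strings stands in for the records of the current underlying reader.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Queue;

/** Hypothetical model of a reader that is switched before it has drained its source. */
class ReaderModel {
    private Queue<String> pending;
    private final List<String> emitted = new ArrayList<>();

    ReaderModel(List<String> records) {
        this.pending = new ArrayDeque<>(records);
    }

    /** Emits at most one record from the current source. */
    void pollOnce() {
        String record = pending.poll();
        if (record != null) {
            emitted.add(record);
        }
    }

    /**
     * Models handling a switch event: the current reader is closed, its unread
     * records are discarded, and reading continues with the next source.
     * Returns the number of records that were dropped.
     */
    int onSwitchSource(List<String> nextRecords) {
        int dropped = pending.size();
        pending = new ArrayDeque<>(nextRecords);
        return dropped;
    }

    List<String> emitted() {
        return emitted;
    }

    public static void main(String[] args) {
        ReaderModel reader = new ReaderModel(Arrays.asList("a", "b", "c"));
        reader.pollOnce(); // emits "a"; "b" and "c" are still unread
        int dropped = reader.onSwitchSource(Arrays.asList("x"));
        reader.pollOnce(); // emits "x" from the next source
        System.out.println(dropped);          // prints "2"
        System.out.println(reader.emitted()); // prints "[a, x]"
    }
}
```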



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
