Feng Jiajie created FLINK-33360:
-----------------------------------

             Summary: HybridSource fails to clear the previous round's state when switching sources, leading to data loss
                 Key: FLINK-33360
                 URL: https://issues.apache.org/jira/browse/FLINK-33360
             Project: Flink
          Issue Type: Bug
          Components: Connectors / HybridSource
    Affects Versions: 1.17.1, 1.16.2
            Reporter: Feng Jiajie
             Fix For: 1.7.3

org.apache.flink.connector.base.source.hybrid.HybridSourceSplitEnumerator:
{code:java}
// track readers that have finished processing for current enumerator
finishedReaders.add(subtaskId);
if (finishedReaders.size() == context.currentParallelism()) {
    LOG.debug("All readers finished, ready to switch enumerator!");
    if (currentSourceIndex + 1 < sources.size()) {
        switchEnumerator();
        // switch all readers prior to sending split assignments
        for (int i = 0; i < context.currentParallelism(); i++) {
            sendSwitchSourceEvent(i, currentSourceIndex);
        }
    }
}
{code}
I think that *finishedReaders* is used to keep track of the IDs of all subtasks that have finished reading the current source. Therefore, *finishedReaders* should be cleared in the *switchEnumerator* function.

If it is not cleared, it still contains every subtask ID from the previous source. While reading the next source, as soon as any single SourceReader reports a *SourceReaderFinishedEvent* (while the other SourceReaders may still be processing in parallel), the condition *finishedReaders.size() == context.currentParallelism()* is satisfied again. This triggers {*}sendSwitchSourceEvent{*}(i, currentSourceIndex), which sends a *SwitchSourceEvent* to all SourceReaders.

If a SourceReader receives a *SwitchSourceEvent* before it has finished reading the previous source, it executes {*}currentReader.close(){*}, so some data may never be read, resulting in partial data loss in the source.

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
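
The failure mode described in this issue can be reproduced with a toy model of the bookkeeping alone. The sketch below is not Flink code: the class {{FinishedReadersSketch}}, its {{clearOnSwitch}} toggle, and the method names are hypothetical, and the proposed fix (clearing the set on switch) is modeled as an assumption about what *switchEnumerator* should do. It only mirrors the *finishedReaders.size() == parallelism* check quoted above.

```java
import java.util.HashSet;
import java.util.Set;

/** Toy model of the enumerator's finished-reader bookkeeping; not Flink code. */
public class FinishedReadersSketch {
    private final Set<Integer> finishedReaders = new HashSet<>();
    private final int parallelism;
    private final int sourceCount;
    // Hypothetical toggle: true models the proposed fix, false the reported behavior.
    private final boolean clearOnSwitch;
    private int currentSourceIndex = 0;

    public FinishedReadersSketch(int parallelism, int sourceCount, boolean clearOnSwitch) {
        this.parallelism = parallelism;
        this.sourceCount = sourceCount;
        this.clearOnSwitch = clearOnSwitch;
    }

    /** Records a finished reader; returns true if this event triggers a source switch. */
    public boolean onReaderFinished(int subtaskId) {
        finishedReaders.add(subtaskId);
        if (finishedReaders.size() == parallelism && currentSourceIndex + 1 < sourceCount) {
            currentSourceIndex++; // stands in for switchEnumerator()
            if (clearOnSwitch) {
                finishedReaders.clear(); // forget the previous round's readers
            }
            return true;
        }
        return false;
    }

    public int currentSourceIndex() {
        return currentSourceIndex;
    }

    public static void main(String[] args) {
        for (boolean clear : new boolean[] {false, true}) {
            FinishedReadersSketch e = new FinishedReadersSketch(2, 3, clear);
            e.onReaderFinished(0);
            e.onReaderFinished(1); // both readers done: legitimate switch to source 1
            // Only subtask 0 has finished source 1; subtask 1 is still reading.
            boolean premature = e.onReaderFinished(0);
            System.out.println("clearOnSwitch=" + clear + " prematureSwitch=" + premature);
        }
    }
}
```

With two readers and three sources, the run without clearing switches again as soon as the first reader finishes the second source, while the run with clearing waits for both readers.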