Feng Jiajie created FLINK-33360:
-----------------------------------
Summary: HybridSource fails to clear the previous round's state
when switching sources, leading to data loss
Key: FLINK-33360
URL: https://issues.apache.org/jira/browse/FLINK-33360
Project: Flink
Issue Type: Bug
Components: Connectors / HybridSource
Affects Versions: 1.17.1, 1.16.2
Reporter: Feng Jiajie
Fix For: 1.17.3
org.apache.flink.connector.base.source.hybrid.HybridSourceSplitEnumerator:
{code:java}
// track readers that have finished processing for current enumerator
finishedReaders.add(subtaskId);
if (finishedReaders.size() == context.currentParallelism()) {
    LOG.debug("All readers finished, ready to switch enumerator!");
    if (currentSourceIndex + 1 < sources.size()) {
        switchEnumerator();
        // switch all readers prior to sending split assignments
        for (int i = 0; i < context.currentParallelism(); i++) {
            sendSwitchSourceEvent(i, currentSourceIndex);
        }
    }
}
{code}
I think that *finishedReaders* is used to keep track of the subtask IDs that
have finished reading the current source. Therefore, *finishedReaders* should
be cleared in the *switchEnumerator* function.
If it is not cleared, then while the next source is being read, as soon as any
SourceReader reports a *SourceReaderFinishedEvent* (even though other
SourceReaders may still be reading in parallel), the condition
*finishedReaders.size() == context.currentParallelism()* is satisfied again,
because the set still holds the subtask IDs from the previous source. This
triggers {*}sendSwitchSourceEvent{*}(i, currentSourceIndex), which sends a
*SwitchSourceEvent* to all SourceReaders.
If a SourceReader receives a *SwitchSourceEvent* before it has finished reading
the current source, it executes {*}currentReader.close(){*}, so some data may
never be read, resulting in partial data loss.
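The scenario described above can be reproduced with a small toy model. This is only a sketch, not the actual Flink class: *FinishedReadersSketch*, its constructor parameters and the *clearOnSwitch* flag are hypothetical names introduced for illustration, and the model keeps only the bookkeeping from the snippet quoted earlier (no real enumerator, readers or events).

```java
import java.util.HashSet;
import java.util.Set;

/** Toy model of the finished-reader bookkeeping; NOT the real HybridSourceSplitEnumerator. */
public class FinishedReadersSketch {
    private final Set<Integer> finishedReaders = new HashSet<>();
    private final int parallelism;       // stands in for context.currentParallelism()
    private final int sourceCount;       // stands in for sources.size()
    private final boolean clearOnSwitch; // hypothetical flag: true models the proposed fix
    private int currentSourceIndex = 0;

    public FinishedReadersSketch(int parallelism, int sourceCount, boolean clearOnSwitch) {
        this.parallelism = parallelism;
        this.sourceCount = sourceCount;
        this.clearOnSwitch = clearOnSwitch;
    }

    /** Mirrors the quoted check; returns true if a source switch was triggered. */
    public boolean onReaderFinished(int subtaskId) {
        finishedReaders.add(subtaskId);
        if (finishedReaders.size() == parallelism && currentSourceIndex + 1 < sourceCount) {
            switchEnumerator();
            return true;
        }
        return false;
    }

    private void switchEnumerator() {
        currentSourceIndex++;
        if (clearOnSwitch) {
            // Proposed fix: forget the readers that finished the previous source.
            finishedReaders.clear();
        }
    }

    public int currentSourceIndex() {
        return currentSourceIndex;
    }

    /** Runs: both readers finish source 0, then only reader 0 finishes source 1. */
    public static boolean switchesPrematurely(boolean clearOnSwitch) {
        FinishedReadersSketch e = new FinishedReadersSketch(2, 3, clearOnSwitch);
        e.onReaderFinished(0);
        e.onReaderFinished(1);        // legitimate switch from source 0 to source 1
        return e.onReaderFinished(0); // reader 1 is still reading source 1
    }

    public static void main(String[] args) {
        System.out.println("without clear: " + switchesPrematurely(false));
        System.out.println("with clear:    " + switchesPrematurely(true));
    }
}
```

With two readers and three sources, the stale set makes the first finish event for the second source trigger a switch while the other reader is still reading. With the set cleared on switch, the enumerator waits for both readers.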