[ https://issues.apache.org/jira/browse/FLINK-39645?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
ASF GitHub Bot updated FLINK-39645:
-----------------------------------
Labels: pull-request-available (was: )
> HybridSourceReader.snapshotState() loses recovered splits when currentReader is null
> ------------------------------------------------------------------------------------
>
> Key: FLINK-39645
> URL: https://issues.apache.org/jira/browse/FLINK-39645
> Project: Flink
> Issue Type: Bug
> Components: Connectors / HybridSource
> Reporter: Chen Zhang
> Priority: Minor
> Labels: pull-request-available
>
> h3. Summary
> HybridSourceReader.snapshotState() can return an empty split list during
> recovery, permanently losing splits stored in the restoredSplits field. This
> leads to silent data loss under repeated failover scenarios.
> h3. Reproduction Scenario
> 1. A Flink job using HybridSource takes a checkpoint successfully.
> 2. The job fails and restores from the checkpoint.
> 3. During recovery, addSplits() is called. Since currentSourceIndex == -1,
> splits are buffered into restoredSplits (not forwarded to any reader).
> 4. start() sends SourceReaderFinishedEvent to the coordinator, requesting a
> SwitchSourceEvent to activate the appropriate source reader.
> 5. Before the SwitchSourceEvent arrives, a checkpoint is triggered.
> 6. snapshotState() finds currentReader == null and returns
> Collections.emptyList(), ignoring restoredSplits entirely.
> 7. If the job fails again and restores from this new checkpoint, the buffered
> splits are gone forever.
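The reproduction steps can be replayed against a small, hypothetical model of the reader. BuggyReaderModel and ReproTimeline are illustrative names, not Flink classes. Splits are plain strings, the underlying reader is reduced to a list of assigned splits, and switchSource() is a simplified stand-in for SwitchSourceEvent handling that hands over every buffered split regardless of source index. The sketch only mirrors the control flow of steps 2 to 7.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical, simplified model of HybridSourceReader's checkpoint path (not Flink code).
class BuggyReaderModel {
    // Stands in for currentReader; stays null until a source switch happens.
    List<String> currentReader = null;
    // Splits restored from a checkpoint but not yet handed to any reader.
    final List<String> restoredSplits = new ArrayList<>();
    int currentSourceIndex = -1;

    // Step 3: while no source is active, restored splits are only buffered.
    void addSplits(List<String> splits) {
        if (currentSourceIndex == -1) {
            restoredSplits.addAll(splits);
        } else {
            currentReader.addAll(splits);
        }
    }

    // Simplified SwitchSourceEvent handling: create the reader and give it every buffered split.
    void switchSource(int sourceIndex) {
        currentSourceIndex = sourceIndex;
        currentReader = new ArrayList<>(restoredSplits);
        restoredSplits.clear();
    }

    // Step 6: mirrors the null branch of snapshotState(); restoredSplits is never consulted.
    List<String> snapshotState() {
        return currentReader != null
                ? new ArrayList<>(currentReader)
                : Collections.emptyList();
    }
}

class ReproTimeline {
    // Replays steps 2-7 and returns the splits visible after the second restore.
    static List<String> run(List<String> firstCheckpoint) {
        // Steps 2-3: restore from the first checkpoint; splits are only buffered.
        BuggyReaderModel recovered = new BuggyReaderModel();
        recovered.addSplits(firstCheckpoint);
        // Steps 5-6: a checkpoint is taken before any source switch.
        List<String> secondCheckpoint = recovered.snapshotState();
        // Step 7: fail again and restore from the second checkpoint.
        BuggyReaderModel restoredAgain = new BuggyReaderModel();
        restoredAgain.addSplits(secondCheckpoint);
        return restoredAgain.restoredSplits;
    }

    public static void main(String[] args) {
        List<String> result = run(List.of("split-0", "split-1"));
        // prints: splits after second restore: []
        System.out.println("splits after second restore: " + result);
    }
}
```

Calling switchSource(0) before snapshotState() in the model keeps the splits, which matches the report: the loss only occurs when the checkpoint lands inside the window between recovery and the source switch.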
> h3. Root Cause
> In HybridSourceReader.java, lines 109-114:
> {code:java}
> public List<HybridSourceSplit> snapshotState(long checkpointId) {
>     List<? extends SourceSplit> state =
>             currentReader != null
>                     ? currentReader.snapshotState(checkpointId)
>                     : Collections.emptyList();
>     return HybridSourceSplit.wrapSplits(state, currentSourceIndex, switchedSources);
> }
> {code}
> When currentReader is null (the normal state between recovery and the source
> switch), the method wraps and returns an empty list. The restoredSplits
> field, which holds splits recovered from the previous checkpoint that have
> not yet been assigned to a reader, is left out of the snapshot entirely.
> h3. Impact
> - Silent data loss: splits are dropped without any error or warning
> - Most likely to surface in unstable environments with frequent restarts,
> where a checkpoint completes inside the window between recovery and source
> switching and a subsequent failure restores from that checkpoint
> - Can affect any HybridSource job that restores from a checkpoint
> h3. Suggested Fix
> Throw an IllegalStateException when snapshotState() is called while
> currentReader is null, instead of silently snapshotting an empty list.
> {code:java}
> @Override
> public List<HybridSourceSplit> snapshotState(long checkpointId) {
>     if (currentReader == null) {
>         // Fail the snapshot instead of persisting an empty split list, which
>         // would silently drop the splits buffered in restoredSplits.
>         throw new IllegalStateException(
>                 "currentReader can't be null when snapshotting state");
>     }
>     List<? extends SourceSplit> state = currentReader.snapshotState(checkpointId);
>     return HybridSourceSplit.wrapSplits(state, currentSourceIndex, switchedSources);
> }
> {code}
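The effect of the suggested fix can be checked with a similar hypothetical model. GuardedReaderModel and GuardedDemo are illustrative names, not Flink classes, and switchSource() again simplifies SwitchSourceEvent handling by handing the new reader every buffered split. A snapshot attempted while currentReader is null raises IllegalStateException; once a reader is active, the snapshot contains the splits.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified model of the suggested fix (not Flink code).
class GuardedReaderModel {
    // Stands in for currentReader; null until the source switch.
    List<String> currentReader = null;
    // Splits restored from a checkpoint but not yet handed to any reader.
    final List<String> restoredSplits = new ArrayList<>();

    void addSplits(List<String> splits) {
        if (currentReader == null) {
            restoredSplits.addAll(splits);
        } else {
            currentReader.addAll(splits);
        }
    }

    // Simplified SwitchSourceEvent handling: the real reader matches splits by source index.
    void switchSource() {
        currentReader = new ArrayList<>(restoredSplits);
        restoredSplits.clear();
    }

    // Guarded snapshot: refuse to produce state while no reader is active.
    // checkpointId is kept only for parity with the real signature.
    List<String> snapshotState(long checkpointId) {
        if (currentReader == null) {
            throw new IllegalStateException(
                    "currentReader can't be null when snapshotting state");
        }
        return new ArrayList<>(currentReader);
    }
}

class GuardedDemo {
    // Returns true if the snapshot attempt was rejected with IllegalStateException.
    static boolean snapshotRejected(GuardedReaderModel reader, long checkpointId) {
        try {
            reader.snapshotState(checkpointId);
            return false;
        } catch (IllegalStateException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        GuardedReaderModel reader = new GuardedReaderModel();
        reader.addSplits(List.of("split-0", "split-1"));
        // prints: rejected before switch: true
        System.out.println("rejected before switch: " + snapshotRejected(reader, 2L));
        reader.switchSource();
        // prints: snapshot after switch: [split-0, split-1]
        System.out.println("snapshot after switch: " + reader.snapshotState(3L));
    }
}
```

The design choice is to make the problem visible: in the model, the checkpoint taken inside the window fails rather than recording an empty split list. Assuming a failed snapshot keeps that checkpoint from completing, the previous completed checkpoint, which still holds the splits, stays the restore point. An alternative not proposed in this issue would be to include restoredSplits in the snapshot while currentReader is null.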
--
This message was sent by Atlassian Jira
(v8.20.10#820010)