[ https://issues.apache.org/jira/browse/FLINK-37605?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17940559#comment-17940559 ]
Arvid Heise commented on FLINK-37605:
-------------------------------------

On a second look, there is a deeper issue buried here (that's why I added the assertion in the first place): what is the interaction between committables generated during EOI and those that follow afterwards? Specifically, how does it interact with the checkpoint subsumption contract?

Picking up the example from above: let's assume that subtasks 4 and 5 emit data before sending EOI for some reason (unbalanced splits?). We are running exactly-once, so each checkpoint has a unique transaction per subtask, which is forwarded to the committer. Then:
* On checkpoint 1, we simply have 3 transactions being forwarded to the committer, one for each subtask. These need to be remembered by checkpoint id, so for committer subtask X we have the committer state committables=\{1=[transaction-X]}.
* On upscaling during recovery, we recommit all of those, so the committer states become {}.
* On EOI, we are already forwarding the committables of subtasks 4 and 5, so the committer state is committables=\{EOI=[transaction-X]} for subtasks 4 and 5.
* On checkpoint 2, subtasks 1, 2, 3 result in committables=\{2=[transaction-X]}.
* On downscaling during recovery, states are folded such that subtask 2 ends up with committables=\{2=[transaction-3], EOI=[transaction-4]}. We recommit everything, leaving the committer state {}.
* For checkpoint 3, subtask 2 then has a new state committables=\{3=[transaction-2]}.

So I guess it kind of works because we are sure to recommit everything before accepting new committables during recovery. We will therefore never have the situation where a committable of checkpoint Y comes after an EOI committable.
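The state folding walked through above can be sketched as follows. This is a minimal model, not Flink's actual committer state classes: I represent the committer state as a map from checkpoint id to transaction ids, with a hypothetical sentinel key standing in for EOI, and folding on downscale as a per-key list merge.

```java
import java.util.*;

public class CommitterStateFold {
    // Hypothetical sentinel: EOI committables sort after any real checkpoint id.
    static final long EOI = Long.MAX_VALUE;

    // Fold two committer states on downscaling: concatenate the transaction
    // lists of matching checkpoint ids into one map.
    static Map<Long, List<String>> fold(Map<Long, List<String>> a, Map<Long, List<String>> b) {
        Map<Long, List<String>> merged = new TreeMap<>();
        for (var state : List.of(a, b)) {
            state.forEach((cp, txns) ->
                    merged.computeIfAbsent(cp, k -> new ArrayList<>()).addAll(txns));
        }
        return merged;
    }

    public static void main(String[] args) {
        // Subtask 2 after downscaling: its own checkpoint-2 transaction plus the
        // inherited EOI transaction from former subtask 4.
        Map<Long, List<String>> own = Map.of(2L, List.of("transaction-3"));
        Map<Long, List<String>> inherited = Map.of(EOI, List.of("transaction-4"));
        System.out.println(fold(own, inherited));
        // Recovery recommits everything before accepting new committables, so the
        // folded state drains to {} and checkpoint 3 starts from a clean slate --
        // which is why a checkpoint-Y committable never trails an EOI committable.
    }
}
```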
> SinkWriter may incorrectly infer end of input during rescale
> ------------------------------------------------------------
>
> Key: FLINK-37605
> URL: https://issues.apache.org/jira/browse/FLINK-37605
> Project: Flink
> Issue Type: Bug
> Components: API / Core
> Affects Versions: 2.0.0, 1.19.2, 1.20.1
> Reporter: Arvid Heise
> Assignee: Arvid Heise
> Priority: Major
>
> FLINK-25920 introduced an EOI check that uses state to ensure that no data is
> lost after failover during the final checkpoint.
> However, the check is too strict and can trigger on other occasions:
> * Consider a simple pipeline DataGeneratorSource -> Sink
> * Start the run with parallelism 3; the source generates 3 splits
> * Checkpoint 1
> * Upscale to 5; the source still only has 3 splits, so subtasks 4 and 5 finish
> * EOI arrives at sink subtasks 4 and 5
> * Checkpoint 2 includes EOI for those subtasks
> * Downscale back to 3
> * All source subtasks have active splits
> * Sink subtasks get the following EOI states: 1=[false, false], 2=[false,
> true], 3=[true]
> * So sink 3 assumes that it won't receive any more input and fails the
> assertion
> The assertion is not salvageable, and we need to get rid of it entirely. The
> sink needs to deal with "duplicate" EOIs:
> * The writer will simply emit duplicate EOI committables/summary
> * The committer needs to merge them. It already does since FLINK-25920.

-- This message was sent by Atlassian Jira (v8.20.10#820010)
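The faulty inference in the scenario above can be illustrated with a small sketch. The boolean-flag representation and the `infersEndOfInput` helper are my own simplification of the removed assertion, not Flink's actual implementation: each recovered sink subtask inherits the EOI flags of the prior writer states assigned to it, and the check concluded "end of input" when every inherited flag was true.

```java
import java.util.*;

public class EoiStateFold {
    // Hypothetical model of the (removed) assertion: a recovered subtask infers
    // end of input if all writer states it inherited had already seen EOI.
    static boolean infersEndOfInput(List<Boolean> inheritedEoiFlags) {
        return !inheritedEoiFlags.isEmpty()
                && inheritedEoiFlags.stream().allMatch(Boolean::booleanValue);
    }

    public static void main(String[] args) {
        // Downscale 5 -> 3 from the scenario: only old subtasks 4 and 5 saw EOI,
        // so the folded per-subtask flag lists are 1=[false, false],
        // 2=[false, true], 3=[true].
        Map<Integer, List<Boolean>> folded = Map.of(
                1, List.of(false, false),
                2, List.of(false, true),
                3, List.of(true));
        folded.forEach((subtask, flags) ->
                System.out.println("sink " + subtask + " infers EOI: " + infersEndOfInput(flags)));
        // Sink 3 inherited only a "true" flag and wrongly concludes it will get
        // no further input, even though every source split is still active.
    }
}
```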