[ 
https://issues.apache.org/jira/browse/FLINK-37605?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17940559#comment-17940559
 ] 

Arvid Heise commented on FLINK-37605:
-------------------------------------

On a second look, there is a deeper issue buried here (that's why I added the 
assertion in the first place): what is the interaction between committables 
generated during EOI and those that follow afterwards? Specifically, how does 
it interact with the checkpoint subsumption contract?

Picking up the example from the issue description: let's assume that subtasks 
4 and 5 emit data before sending EOI for some reason (unbalanced splits?). We 
are running exactly-once, so each checkpoint has a unique transaction per 
subtask, which is forwarded to the committer.

Then on checkpoint 1, we simply have 3 transactions being forwarded to the 
committer, one for each subtask. These need to be remembered by checkpoint id, 
so for committer subtask X, we have the committer state 
committables={1=[transaction-X]}.

On upscaling during recovery, we recommit all of those, so the committer 
states will be {}.
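
The bookkeeping described so far can be illustrated with a minimal sketch (hypothetical class and names, not Flink's actual Committer implementation): the committer remembers committables per checkpoint id, and on recovery everything is recommitted first, leaving the state empty.

```python
# Hypothetical sketch of the committer state described above;
# not Flink's actual classes.

EOI = "EOI"  # sentinel checkpoint id for end-of-input committables

class CommitterState:
    def __init__(self):
        self.committables = {}  # checkpoint id -> list of transactions

    def add(self, checkpoint_id, transaction):
        self.committables.setdefault(checkpoint_id, []).append(transaction)

    def recommit_all(self):
        # On recovery, every pending transaction is recommitted first,
        # leaving the state empty before new committables are accepted.
        recommitted = [t for txs in self.committables.values() for t in txs]
        self.committables.clear()
        return recommitted

# Checkpoint 1: committer subtask X remembers committables={1: [transaction-X]}
state = CommitterState()
state.add(1, "transaction-X")
assert state.committables == {1: ["transaction-X"]}

# Upscaling recovery: everything is recommitted, so the state becomes {}
assert state.recommit_all() == ["transaction-X"]
assert state.committables == {}
```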

Now at EOI, we are already forwarding the committables of subtasks 4 and 5, 
such that the committer state is committables={EOI=[transaction-X]} for 
subtasks 4 and 5.

On checkpoint 2, subtasks 1, 2, 3 result in committables={2=[transaction-X]}.

On downscaling during recovery, states are folded such that subtask 2 will 
have committables={2=[transaction-3], EOI=[transaction-4]}. We are 
recommitting everything, resulting in committer state {}.
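
A small sketch of this downscale fold (hypothetical names; the merge-by-checkpoint-id behavior is what the text describes, not a copy of Flink's code): new subtask 2 inherits the states of old subtasks 3 (checkpoint 2) and 4 (EOI), merges them, and then recommits everything.

```python
# Hypothetical sketch of state folding on downscale: states of several old
# subtasks are merged per checkpoint id onto one new subtask.

def fold_states(states):
    merged = {}
    for state in states:
        for checkpoint_id, txs in state.items():
            merged.setdefault(checkpoint_id, []).extend(txs)
    return merged

old_subtask_3 = {2: ["transaction-3"]}       # checkpoint-2 committable
old_subtask_4 = {"EOI": ["transaction-4"]}   # EOI committable

folded = fold_states([old_subtask_3, old_subtask_4])
assert folded == {2: ["transaction-3"], "EOI": ["transaction-4"]}

# Recommitting everything empties the state again.
recommitted = [t for txs in folded.values() for t in txs]
folded.clear()
assert sorted(recommitted) == ["transaction-3", "transaction-4"]
assert folded == {}
```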

For checkpoint 3, subtask 2 would have the new state 
committables={3=[transaction-2]}.

 

So I guess it kinda works because we are sure to recommit everything before 
accepting new committables during recovery. So we will never have the 
situation where a committable of checkpoint Y comes after an EOI committable.
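
That ordering argument can be made concrete with a hypothetical timeline sketch (names are illustrative): recovery drains the restored state, including any EOI committables, before the committer accepts committables from new checkpoints.

```python
# Hypothetical sketch of the recovery ordering: restored committables
# (including EOI ones) are recommitted before any fresh committable is
# accepted, so a restored EOI committable never follows a later checkpoint.

def recover_then_run(restored_state, new_committables):
    order = []
    # Step 1: recommit everything restored from state.
    for checkpoint_id, txs in restored_state.items():
        order.extend((checkpoint_id, t) for t in txs)
    restored_state.clear()
    # Step 2: only afterwards are fresh committables accepted.
    order.extend(new_committables)
    return order

order = recover_then_run(
    {2: ["transaction-3"], "EOI": ["transaction-4"]},  # restored (downscale fold)
    [(3, "transaction-2")],                            # checkpoint 3 after recovery
)
# The restored EOI committable is committed before any checkpoint-3 committable.
assert order.index(("EOI", "transaction-4")) < order.index((3, "transaction-2"))
```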

 

> SinkWriter may incorrectly infer end of input during rescale
> ------------------------------------------------------------
>
>                 Key: FLINK-37605
>                 URL: https://issues.apache.org/jira/browse/FLINK-37605
>             Project: Flink
>          Issue Type: Bug
>          Components: API / Core
>    Affects Versions: 2.0.0, 1.19.2, 1.20.1
>            Reporter: Arvid Heise
>            Assignee: Arvid Heise
>            Priority: Major
>
> FLINK-25920 introduced an EOI check that uses state to ensure that no data is 
> lost after failover during the final checkpoint.
> However, the check is too strict and can also trigger on other occasions:
>  * Consider a simple pipeline DataGeneratorSource -> Sink
>  * Start run with parallelism 3, the source generates 3 splits
>  * Checkpoint 1
>  * Upscale to 5; the source still only has 3 splits, so subtasks 4, 5 finish
>  * EOI arrives at sink subtasks 4, 5
>  * Checkpoint 2 includes EOI for those subtasks
>  * Downscale back to 3
>  * All source subtasks have active splits
>  * Sink subtasks get the following EOI states: 1=[false, false], 2=[false, 
> true], 3=[true]
>  * So sink subtask 3 assumes that it won't receive any more input and fails 
> the assertion
> The assertion is not salvageable and we need to get rid of it entirely. The 
> sink needs to deal with "duplicate" EOIs:
>  * The writer will simply emit duplicate EOI committables/summary
>  * The committer needs to merge them. It already does since FLINK-25920.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
