[ 
https://issues.apache.org/jira/browse/FLINK-37605?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Arvid Heise updated FLINK-37605:
--------------------------------
    Description: 
FLINK-25920 introduced an end-of-input (EOI) check that uses state to ensure
that no data is lost after failover during the final checkpoint. The respective
error message reads
{noformat}
Received element after endOfInput: ...{noformat}
However, the check is too strict and can also trigger in other situations:
 * Consider a simple pipeline DataGeneratorSource -> Sink
 * Start the job with parallelism 3; the source generates 3 splits
 * Checkpoint 1
 * Upscale to 5; the source still has only 3 splits, so source subtasks 4 and 5 
finish
 * EOI arrives at sink subtasks 4 and 5
 * Checkpoint 2 includes EOI for those subtasks
 * Downscale back to 3
 * All source subtasks have active splits
 * Sink subtasks get the following EOI states: 1=[false, false], 2=[false, 
true], 3=[true]
 * So sink 3 assumes that it won't receive any more input and fails the 
assertion

The assertion is not salvageable and we need to get rid of it entirely. The 
sink needs to deal with "duplicate" EOIs:
 * The writer will simply emit duplicate EOI committables/summary
 * The committer needs to merge them. It has already done so since FLINK-25920.

  was:
FLINK-25920 introduced an EOI check that uses state to ensure that no data is 
lost after fail over during final checkpoint.

However, the check is too strict and can trigger in other occasions:
 * Consider a simple pipeline DataGeneratorSource -> Sink
 * Start run with parallelism 3, the source generates 3 splits
 * Checkpoint 1
 * Upscale to 5, the source still only has 3 splits, subtask 4, 5 finish
 * EOI arrives sink subtask 4, 5
 * Checkpoint 2 includes EOI for those subtasks
 * Downscale back to 3
 * All source subtasks have active splits
 * Sink subtasks get the following EOI states 1=[false, false], 2=[false, 
true], 3=[true]
 * So sink 3 assumes that it doesn't receive any more input and fails the 
assertion

The assertion is not salvageable and we need to get rid of it entirely. The 
sink needs to deal with "duplicate" EOIs:
 * The writer will simply emit duplicate EOI committables/summary
 * The committer needs to merge them. It already does since FLINK-25920.


> SinkWriter may incorrectly infer end of input during rescale
> ------------------------------------------------------------
>
>                 Key: FLINK-37605
>                 URL: https://issues.apache.org/jira/browse/FLINK-37605
>             Project: Flink
>          Issue Type: Bug
>          Components: API / Core
>    Affects Versions: 2.0.0, 1.19.2, 1.20.1
>            Reporter: Arvid Heise
>            Assignee: Arvid Heise
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 2.1.0, 2.0.1
>
>
> FLINK-25920 introduced an end-of-input (EOI) check that uses state to ensure 
> that no data is lost after failover during the final checkpoint. The 
> respective error message reads
> {noformat}
> Received element after endOfInput: ...{noformat}
> However, the check is too strict and can also trigger in other situations:
>  * Consider a simple pipeline DataGeneratorSource -> Sink
>  * Start the job with parallelism 3; the source generates 3 splits
>  * Checkpoint 1
>  * Upscale to 5; the source still has only 3 splits, so source subtasks 4 and 
> 5 finish
>  * EOI arrives at sink subtasks 4 and 5
>  * Checkpoint 2 includes EOI for those subtasks
>  * Downscale back to 3
>  * All source subtasks have active splits
>  * Sink subtasks get the following EOI states: 1=[false, false], 2=[false, 
> true], 3=[true]
>  * So sink 3 assumes that it won't receive any more input and fails the 
> assertion
> The assertion is not salvageable and we need to get rid of it entirely. The 
> sink needs to deal with "duplicate" EOIs:
>  * The writer will simply emit duplicate EOI committables/summary
>  * The committer needs to merge them. It has already done so since 
> FLINK-25920.
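
The rescale scenario in the description can be reproduced with a small standalone simulation. This is only a sketch and does not use any Flink classes: the class `EoiRescaleSketch` and its methods are hypothetical, the contiguous even split in `redistribute` is an assumption chosen because it reproduces the EOI states listed in the issue, and `assumesEndOfInput` assumes that a subtask treats itself as finished only when every restored flag is true (which is consistent with only sink 3 failing).

```java
import java.util.ArrayList;
import java.util.List;

public class EoiRescaleSketch {

    // Hypothetical stand-in for list-state redistribution on rescale:
    // split the flat list of per-subtask EOI flags into contiguous,
    // evenly sized chunks, one chunk per new subtask.
    public static List<List<Boolean>> redistribute(List<Boolean> flags, int newParallelism) {
        List<List<Boolean>> result = new ArrayList<>();
        int base = flags.size() / newParallelism;
        int extra = flags.size() % newParallelism;
        int pos = 0;
        for (int i = 0; i < newParallelism; i++) {
            int size = base + (i < extra ? 1 : 0);
            result.add(new ArrayList<>(flags.subList(pos, pos + size)));
            pos += size;
        }
        return result;
    }

    // Assumed shape of the too-strict check: a subtask believes it has
    // already seen end of input if all of its restored flags are true.
    public static boolean assumesEndOfInput(List<Boolean> restoredFlags) {
        return !restoredFlags.isEmpty() && !restoredFlags.contains(false);
    }

    public static void main(String[] args) {
        // Checkpoint 2 at parallelism 5: sink subtasks 4 and 5 have seen EOI.
        List<Boolean> checkpoint2 = List.of(false, false, false, true, true);

        // Downscale back to 3.
        List<List<Boolean>> restored = redistribute(checkpoint2, 3);
        for (int i = 0; i < restored.size(); i++) {
            System.out.println("sink " + (i + 1)
                    + " restored=" + restored.get(i)
                    + " assumesEOI=" + assumesEndOfInput(restored.get(i)));
        }
    }
}
```

In this sketch only sink 3 ends up with an all-true state, so it is the only subtask that would reject the elements it still receives from its active source split.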



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
