[ 
https://issues.apache.org/jira/browse/FLINK-19701?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17223767#comment-17223767
 ] 

Roman Khachatryan edited comment on FLINK-19701 at 10/30/20, 7:43 PM:
----------------------------------------------------------------------

-Another related issue occurs in the following scenario:- - not an issue (there 
is a check in getInflightBuffers)
 # taskA and taskB receive AlignedCheckpoint barrier
 # taskB cancels it
 # JM sends (next) UnalignedCheckpoint barrier
 # now taskA has the AC barrier in its queue AND will receive the UC barrier, so 
RemoteInputChannel.receivedBuffers.getNumUnprioritizedElements() will include 
this barrier, which is currently illegal (ChannelStateWriter will throw an 
exception)

It is probably easier to solve this together with the current issue. If not, 
please create a separate ticket.



> Unaligned Checkpoint might misuse the number of buffers to persist from the 
> previous barrier
> --------------------------------------------------------------------------------------------
>
>                 Key: FLINK-19701
>                 URL: https://issues.apache.org/jira/browse/FLINK-19701
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / Checkpointing
>    Affects Versions: 1.12.0
>            Reporter: Yun Gao
>            Priority: Major
>
> Currently, CheckpointUnaligner interacts with RemoteInputChannel to persist 
> the input buffers. However, based on the current implementation, the 
> following case seems possible:
> {code:java}
> 1. There are 3 input channels.
> 2. Input channel 0 receives barrier 1 and processes it, which starts 
> checkpoint 1.
> 3. Input channel 1 receives barrier 1 and processes it. The state of its 
> input channel persister becomes BARRIER_RECEIVED and 
> numBuffersOvertaken(channel 1) = n_1.
> 4. Input channel 2 receives nothing, checkpoint 1 expires, and a new 
> checkpoint is triggered.
> 5. Input channel 0 receives barrier 2; checkpoint 1 is abandoned and 
> checkpoint 2 is started. However, the state of the input channels is not 
> cleared in this case, so channel 1 is still BARRIER_RECEIVED with 
> numBuffersOvertaken(channel 1) = n_1. Channel 1 would then persist only n_1 
> buffers for the new checkpoint 2.
> {code}
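
The scenario in the description can be illustrated with a small, self-contained 
model. This is only a sketch under stated assumptions: ChannelPersister, 
buffersToPersist, reset and simulate are hypothetical names and do not mirror 
Flink's actual classes, and the reset step is one possible remedy, not 
necessarily the fix chosen for this ticket. The values n_1 = 2 and 5 queued 
buffers are made up for the example.

```java
/** Hypothetical, simplified model of the stale per-channel state; not Flink code. */
class StaleOvertakenSketch {

    enum State { IDLE, BARRIER_RECEIVED }

    /** Tracks, for one input channel, how many queued buffers a barrier overtook. */
    static final class ChannelPersister {
        private State state = State.IDLE;
        private int numBuffersOvertaken = 0;

        /** Step 3 of the scenario: a barrier arrives and overtakes some buffers. */
        void barrierReceived(int overtaken) {
            state = State.BARRIER_RECEIVED;
            numBuffersOvertaken = overtaken;
        }

        /** Assumed remedy: clear the per-channel state when a new checkpoint starts. */
        void reset() {
            state = State.IDLE;
            numBuffersOvertaken = 0;
        }

        /** How many of the currently queued buffers this channel would persist. */
        int buffersToPersist(int queuedBuffers) {
            if (state == State.BARRIER_RECEIVED) {
                // State left over from an earlier barrier caps the count.
                return Math.min(numBuffersOvertaken, queuedBuffers);
            }
            return queuedBuffers;
        }
    }

    /** Replays steps 3 to 5 for channel 1 and returns what it persists for checkpoint 2. */
    static int simulate(boolean resetOnNewCheckpoint) {
        ChannelPersister channel1 = new ChannelPersister();
        channel1.barrierReceived(2); // barrier 1 overtook n_1 = 2 buffers

        // Checkpoint 1 expires; barrier 2 on channel 0 starts checkpoint 2.
        if (resetOnNewCheckpoint) {
            channel1.reset();
        }
        // Channel 1 now holds 5 buffers that belong to checkpoint 2.
        return channel1.buffersToPersist(5);
    }

    public static void main(String[] args) {
        System.out.println("without reset: " + simulate(false)); // prints 2
        System.out.println("with reset:    " + simulate(true));  // prints 5
    }
}
```

Without the reset, channel 1 reuses the count recorded for barrier 1 and persists 
only 2 of the 5 buffers; with the reset, all 5 are persisted.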



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
