[ https://issues.apache.org/jira/browse/FLINK-19701?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17223767#comment-17223767 ]
Roman Khachatryan edited comment on FLINK-19701 at 10/30/20, 7:43 PM:
----------------------------------------------------------------------

-Another related issue occurs in the following scenario:- - not an issue (there is a check in getInflightBuffers)
 # taskA and taskB receive AlignedCheckpoint barrier
 # taskB cancels it
 # JM sends (next) UnalignedCheckpoint barrier
 # now taskA will have AC barrier in queue AND will receive UC barrier - so RemoteInputChannel.receivedBuffers.getNumUnprioritizedElements() will include this barrier, which is currently illegal (ChannelStateWriter will throw an exception)

Probably, it's easier to solve it together. If not, please create a separate ticket.


was (Author: roman_khachatryan):
    Another related issue occurs in the following scenario:
 # taskA and taskB receive AlignedCheckpoint barrier
 # taskB cancels it
 # JM sends (next) UnalignedCheckpoint barrier
 # now taskA will have AC barrier in queue AND will receive UC barrier - so RemoteInputChannel.receivedBuffers.getNumUnprioritizedElements() will include this barrier, which is currently illegal (ChannelStateWriter will throw an exception)

Probably, it's easier to solve it together. If not, please create a separate ticket.

> Unaligned Checkpoint might misuse the number of buffers to persist from the
> previous barrier
> --------------------------------------------------------------------------------------------
>
>                 Key: FLINK-19701
>                 URL: https://issues.apache.org/jira/browse/FLINK-19701
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / Checkpointing
>    Affects Versions: 1.12.0
>            Reporter: Yun Gao
>            Priority: Major
>
> The current CheckpointUnaligner interacts with RemoteInputChannel to persist
> the input buffers. However, based on the current implementation, a problem
> seems to arise in the following case:
> {code:java}
> 1. There are 3 input channels.
> 2. Input channel 0 received barrier 1, and processed barrier 1 to start
> checkpoint 1.
> 3. Input channel 1 received barrier 1, and processed barrier 1. Now the state
> of the input channel persister becomes BARRIER_RECEIVED and
> numBuffersOvertaken(channel 1) = n_1.
> 4. However, input channel 2 received nothing and the checkpoint expired, so a
> new checkpoint is triggered.
> 5. Input channel 0 received barrier 2, checkpoint 1 is abandoned and
> checkpoint 2 is started. However, in this case the state of the input
> channels is not cleared. Thus channel 1 is still BARRIER_RECEIVED and
> numBuffersOvertaken(channel 1) = n_1. Then channel 1 would only persist n_1
> buffers in the channel for the new checkpoint 2.
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
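
The five-step scenario in the issue description can be replayed with a small, self-contained model. This is only a sketch: ChannelPersister, startCheckpoint, barrierReceived, buffersToPersist and the resetOnNewCheckpoint flag are hypothetical names invented for illustration, not Flink's actual classes or the fix that was eventually applied. It assumes a channel that has not yet seen its barrier persists everything it has queued, while a channel in BARRIER_RECEIVED persists only numBuffersOvertaken buffers.

```java
final class StaleBarrierStateSketch {

    enum State { WAITING_FOR_BARRIER, BARRIER_RECEIVED }

    /** Hypothetical per-channel bookkeeping; NOT Flink's actual implementation. */
    static final class ChannelPersister {
        private final boolean resetOnNewCheckpoint;
        private State state = State.WAITING_FOR_BARRIER;
        private int numBuffersOvertaken = 0;

        ChannelPersister(boolean resetOnNewCheckpoint) {
            this.resetOnNewCheckpoint = resetOnNewCheckpoint;
        }

        /** Called on every channel when a new checkpoint starts on any channel. */
        void startCheckpoint() {
            if (resetOnNewCheckpoint) {
                // Assumed remedy: forget what was recorded for the previous checkpoint.
                state = State.WAITING_FOR_BARRIER;
                numBuffersOvertaken = 0;
            }
        }

        /** Called when this channel's own barrier arrives. */
        void barrierReceived(int buffersOvertaken) {
            state = State.BARRIER_RECEIVED;
            numBuffersOvertaken = buffersOvertaken;
        }

        /** Number of currently queued buffers that would be persisted. */
        int buffersToPersist(int queuedBuffers) {
            return state == State.BARRIER_RECEIVED
                    ? Math.min(numBuffersOvertaken, queuedBuffers)
                    : queuedBuffers;
        }
    }

    /** Replays steps 2-5 for channel 1; returns the buffers persisted for checkpoint 2. */
    static int replay(boolean resetOnNewCheckpoint, int n1, int queuedAtCheckpoint2) {
        ChannelPersister channel1 = new ChannelPersister(resetOnNewCheckpoint);
        channel1.startCheckpoint();    // step 2: barrier 1 on channel 0 starts checkpoint 1
        channel1.barrierReceived(n1);  // step 3: channel 1 sees barrier 1, records n_1
        // step 4: checkpoint 1 expires; nothing is cleared on channel 1
        channel1.startCheckpoint();    // step 5: barrier 2 on channel 0 starts checkpoint 2
        return channel1.buffersToPersist(queuedAtCheckpoint2);
    }

    public static void main(String[] args) {
        System.out.println("without reset: " + replay(false, 2, 5));
        System.out.println("with reset:    " + replay(true, 2, 5));
    }
}
```

With n_1 = 2 and five buffers queued when checkpoint 2 starts, the model without a reset persists only two buffers for checkpoint 2, which is the misuse described in step 5. With the reset, all five are persisted.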