[ https://issues.apache.org/jira/browse/FLINK-17869?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Zhijiang updated FLINK-17869:
-----------------------------
    Description: 
On the ChannelStateWriter side, the lifecycle of a checkpoint should be as follows:

start -> in progress/abort -> stop.

We must guarantee that #abort is queued after #start; otherwise, in a race 
condition, an already aborted checkpoint might be started again later.
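A minimal sketch of why the ordering matters, assuming a single writer thread that executes queued requests in FIFO order. OrderedWriterModel and CheckpointState are hypothetical names used only for illustration; they are not the ChannelStateWriterImpl API. In this model an #abort that runs before its #start is a no-op, so the later #start leaves the checkpoint in progress.

```java
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.Map;
import java.util.Queue;

// Hypothetical states for the model below (not Flink classes).
enum CheckpointState { IN_PROGRESS, ABORTED }

// Hypothetical model: requests are queued and executed by one thread in FIFO order.
final class OrderedWriterModel {
    private final Queue<Runnable> queue = new ArrayDeque<>();
    private final Map<Long, CheckpointState> states = new HashMap<>();

    // #start registers the checkpoint as in progress.
    void start(long id) {
        queue.add(() -> states.put(id, CheckpointState.IN_PROGRESS));
    }

    // #abort only affects an already started checkpoint. If it executes before
    // the matching #start, it does nothing and the later #start "revives" it.
    void abort(long id) {
        queue.add(() -> states.computeIfPresent(id, (k, s) -> CheckpointState.ABORTED));
    }

    // #stop releases everything kept for the checkpoint.
    void stop(long id) {
        queue.add(() -> states.remove(id));
    }

    // Executes all queued requests in the order they were enqueued.
    void drain() {
        Runnable r;
        while ((r = queue.poll()) != null) {
            r.run();
        }
    }

    CheckpointState stateOf(long id) {
        return states.get(id);
    }
}
```

Enqueuing abort(1) before start(1) leaves checkpoint 1 IN_PROGRESS after draining, while start(2) followed by abort(2) leaves checkpoint 2 ABORTED until stop(2) removes it.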

There are two cases that might trigger aborting a checkpoint:
 * CheckpointBarrierUnaligner#processEndOfPartition
 * CheckpointBarrierUnaligner#processCancellationBarrier

In both cases, the current condition for aborting the checkpoint is based on 
#isCheckpointPending(), which does not cover all cases. The unaligned 
checkpoint might be triggered either by the task thread via 
CheckpointBarrierUnaligner#processBarrier or by the netty thread via 
ThreadSafeUnaligner#notifyBarrierReceived. Either way, we should maintain the 
currently triggered checkpoint id in order to handle both abort cases above 
properly.
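A minimal sketch of tracking the currently triggered checkpoint id when two threads can trigger, assuming an AtomicLong that only moves forward. TriggeredCheckpointTracker and its methods are hypothetical and not part of CheckpointBarrierUnaligner or ThreadSafeUnaligner; the sketch only shows how an abort path could check a concrete id instead of a generic pending flag.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical helper (not a Flink class): tracks the latest checkpoint id
// triggered by either the task thread or the netty thread.
final class TriggeredCheckpointTracker {
    // -1 means that no checkpoint has been triggered yet.
    private final AtomicLong latestTriggeredId = new AtomicLong(-1L);

    // Safe to call concurrently from both threads. Keeps the maximum id, so an
    // older or duplicate barrier cannot move the id backwards. Returns true only
    // for the call that actually advanced the id, i.e. the one that should trigger.
    boolean tryTrigger(long checkpointId) {
        long previous = latestTriggeredId.getAndAccumulate(checkpointId, Math::max);
        return checkpointId > previous;
    }

    long currentTriggeredId() {
        return latestTriggeredId.get();
    }

    // An abort path can check the concrete id that was triggered instead of
    // relying on "is any checkpoint pending".
    boolean isCurrentlyTriggered(long checkpointId) {
        return latestTriggeredId.get() == checkpointId;
    }
}
```

Because getAndAccumulate is atomic, if the task thread and the netty thread both see the barrier for the same id, exactly one of them gets true from tryTrigger.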

Another bug is that ChannelStateWriterImpl#abort should not remove the 
respective ChannelStateWriteResult. Otherwise, a later call to 
ChannelStateWriterImpl#getWriteResult during the checkpoint process throws 
IllegalArgumentException. ChannelStateWriteResult should be created in the 
#start method and only removed in the #stop method.
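A minimal sketch of that result lifecycle, assuming a single-threaded registry keyed by checkpoint id. WriteResultRegistry and WriteResultSketch are hypothetical stand-ins for ChannelStateWriterImpl and ChannelStateWriteResult; the CompletableFuture field and the exception message are illustrative assumptions, not Flink's code. #abort fails the result but keeps it registered, so #getWriteResult still succeeds until #stop removes it.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

// Hypothetical stand-in for ChannelStateWriteResult.
final class WriteResultSketch {
    final CompletableFuture<Void> done = new CompletableFuture<>();
}

// Hypothetical, single-threaded registry illustrating create-at-start / remove-at-stop.
final class WriteResultRegistry {
    private final Map<Long, WriteResultSketch> results = new HashMap<>();

    // The result is created when the checkpoint starts.
    void start(long id) {
        results.put(id, new WriteResultSketch());
    }

    // Abort fails the pending result but does NOT remove it.
    void abort(long id, Throwable cause) {
        WriteResultSketch r = results.get(id);
        if (r != null) {
            r.done.completeExceptionally(cause);
        }
    }

    // Throws if the result is missing, mirroring the failure described above.
    WriteResultSketch getWriteResult(long id) {
        WriteResultSketch r = results.get(id);
        if (r == null) {
            throw new IllegalArgumentException("no write result for checkpoint " + id);
        }
        return r;
    }

    // Only stop removes the result.
    void stop(long id) {
        results.remove(id);
    }
}
```

With this split, a caller that fetches the result after an abort sees a failed future instead of an IllegalArgumentException; the exception is only possible after #stop.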

  was:
On the ChannelStateWriter side, the lifecycle of a checkpoint should be as follows:

start -> in progress/abort -> stop.

We must guarantee that #abort is queued after #start; otherwise, in a race 
condition, an already aborted checkpoint might be started again later.

There are two cases that might trigger aborting a checkpoint:
 * One is CheckpointBarrierUnaligner#processEndOfPartition, which should abort 
all current and future checkpoints, with no need to check the condition 
`isCheckpointPending()` as the current code does.
 * The other is CheckpointBarrierUnaligner#processCancellationBarrier, which 
should only abort the respective checkpoint id if it was already triggered.
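A minimal sketch of these two abort rules, assuming all calls are serialized on one lock. AbortPolicySketch and its methods are hypothetical and only record which ids would be aborted; "already triggered" is interpreted here as "equal to the most recently triggered id", which is an assumption.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical policy holder (not a Flink class); records aborted ids instead of
// calling a real ChannelStateWriter.
final class AbortPolicySketch {
    private long currentTriggeredId = -1L; // -1: nothing triggered yet
    private boolean endOfPartitionReceived = false;
    final List<Long> abortedIds = new ArrayList<>();

    // Returns true if the triggered checkpoint may proceed. After end of partition,
    // every future checkpoint is started and immediately aborted, so the abort
    // is still ordered after the start.
    synchronized boolean onTrigger(long id) {
        currentTriggeredId = Math.max(currentTriggeredId, id);
        if (endOfPartitionReceived) {
            abortedIds.add(id);
            return false;
        }
        return true;
    }

    // End of partition: abort the current checkpoint (if any) and all future ones,
    // without consulting a "pending" flag.
    synchronized void onEndOfPartition() {
        endOfPartitionReceived = true;
        if (currentTriggeredId >= 0) {
            abortedIds.add(currentTriggeredId);
        }
    }

    // Cancellation barrier: abort only if this id was already triggered.
    synchronized void onCancellationBarrier(long id) {
        if (id == currentTriggeredId) {
            abortedIds.add(id);
        }
    }
}
```

A cancellation barrier for an id that was never triggered is ignored here, so no #abort is ever enqueued ahead of its #start.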

The unaligned checkpoint might be triggered either by the task thread or by 
the netty thread inside ThreadSafeUnaligner. Either way, we should know the 
currently triggered checkpoint id in order to handle both cases above properly.

Another bug is that ChannelStateWriterImpl#abort should not remove the 
respective ChannelStateWriteResult. Otherwise, a later call to 
ChannelStateWriterImpl#getWriteResult during the checkpoint process throws 
IllegalArgumentException. ChannelStateWriteResult should be created in the 
#start method and only removed in the #stop method.


> Fix the race condition of aborting unaligned checkpoint
> -------------------------------------------------------
>
>                 Key: FLINK-17869
>                 URL: https://issues.apache.org/jira/browse/FLINK-17869
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / Checkpointing
>            Reporter: Zhijiang
>            Assignee: Zhijiang
>            Priority: Blocker
>             Fix For: 1.11.0



--
This message was sent by Atlassian Jira
(v8.3.4#803005)