zhijiangW commented on a change in pull request #12460:
URL: https://github.com/apache/flink/pull/12460#discussion_r435201417
##########
File path:
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierUnaligner.java
##########
@@ -154,46 +154,51 @@ public void processBarrier(CheckpointBarrier
receivedBarrier, int channelIndex)
@Override
public void processCancellationBarrier(CancelCheckpointMarker
cancelBarrier) throws Exception {
long cancelledId = cancelBarrier.getCheckpointId();
- if (LOG.isDebugEnabled()) {
- LOG.debug("{}: Checkpoint {} canceled, aborting
alignment.", taskName, cancelledId);
- }
-
- if (currentConsumedCheckpointId >= cancelledId &&
!isCheckpointPending()) {
- return;
- }
+ // tag whether we should abort checkpoint from task thread view
+ boolean shouldAbort1 = false;
- if (isCheckpointPending()) {
+ if (cancelledId > currentConsumedCheckpointId) {
+ currentConsumedCheckpointId = cancelledId;
+ shouldAbort1 = true;
+ } else if (cancelledId == currentConsumedCheckpointId &&
isCheckpointPending()) {
LOG.warn("{}: Received cancellation barrier for
checkpoint {} before completing current checkpoint {}. " +
"Skipping current checkpoint.",
taskName,
cancelledId,
currentConsumedCheckpointId);
resetConsumedBarriers();
+ shouldAbort1 = true;
+ }
+
+ // tag whether we should abort checkpoint from
threadSafeUnaligner view
+ boolean shouldAbort2 =
threadSafeUnaligner.setCancelledCheckpointId(cancelledId);
Review comment:
The key problem here is that we actually maintain two suits of states in
`CheckpointBarrierUnaligner` and `ThreadSafeUnaligner` separately, and we are
not sure which states are up to date in race condition case. Therefore we need
to judge both of them to give a final decision.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]