pnowojski commented on a change in pull request #12460:
URL: https://github.com/apache/flink/pull/12460#discussion_r434542740
##########
File path:
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierUnaligner.java
##########
@@ -409,5 +416,15 @@ synchronized long getCurrentCheckpointId() {
boolean isCheckpointPending() {
return numBarriersReceived > 0;
}
+
+ private void resetReceivedBarriers() {
+ Arrays.fill(storeNewBuffers, false);
+ numBarriersReceived = 0;
+ }
+
+ private void notifyAbort(CheckpointException exception) throws IOException {
+ long currentCheckpointId = currentReceivedCheckpointId;
+ handler.executeInTaskThread(() -> handler.notifyAbort(currentCheckpointId, exception), "notifyAbort");
+ }
Review comment:
I'm not sure if this is working properly and even if it is, I'm not sure
if it's a good way to solve the problem.
As far as I understand, it works like this:
1. we trigger the checkpoint from the netty thread and enqueue `notifyCheckpoint`
in the mailbox
2. we receive a channel closed/end of partition event before
`notifyCheckpoint` starts executing in the task thread from the mailbox. This
leads us to this method, which enqueues `notifyAbort` after `notifyCheckpoint`
in the mailbox
3. we still execute the `notifyCheckpoint` callback and trigger the
checkpoint, even though it was already (partially?) aborted (for example,
`resetReceivedBarriers()` has already been called)
4. `notifyAbort` cleans up the checkpoint started in step 3.
Besides being quite complicated and hard to reason about, I'm not sure if
it's correct and what could be the side effects of doing this in so many
stages. It also might be unnecessarily using resources for starting a
checkpoint that we already know will not happen.
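To make the ordering concern concrete, here is a minimal single-threaded sketch. It assumes the mailbox behaves like a plain FIFO queue; `MailboxOrderingSketch`, its `run()` method and the checkpoint id 7 are hypothetical stand-ins, not Flink's actual mailbox or handler classes.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Hypothetical stand-in for the task mailbox; NOT Flink's real mailbox API.
class MailboxOrderingSketch {

    static List<String> run() {
        Queue<Runnable> mailbox = new ArrayDeque<>();
        List<String> log = new ArrayList<>();
        long checkpointId = 7L; // arbitrary id chosen for illustration

        // Step 1: the first barrier arrives and the notification is enqueued.
        mailbox.add(() -> log.add("notifyCheckpoint(" + checkpointId + ")"));

        // Step 2: a channel closes before the task thread drains the mailbox,
        // so the abort is enqueued behind the notification.
        mailbox.add(() -> log.add("notifyAbort(" + checkpointId + ")"));

        // Steps 3 and 4: the task thread drains the mailbox in FIFO order.
        Runnable mail;
        while ((mail = mailbox.poll()) != null) {
            mail.run();
        }
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

Under that FIFO assumption the checkpoint is always started first and only then cleaned up, which is the multi-stage behaviour questioned above.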
Why can't it work like this instead:
1. as before
2. we receive a channel closed/end of partition event before
`notifyCheckpoint` starts executing in the task thread from the mailbox. We mark
the correct checkpoints as cancelled (by bumping the cancelled/current
checkpoint ids) in the `ThreadSafeUnaligner` and we abort the checkpoint
immediately (assuming we are in the task thread, but I think we always are).
3. if `notifyCheckpoint` starts executing, it should check whether the checkpoint
it is supposed to notify about was cancelled.
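A rough sketch of this alternative, under the assumption that everything runs on a single (task) thread. All names here (`CancelBeforeNotifySketch`, `cancelledUpTo`, `onFirstBarrier`, `drainMailbox`) are made up for illustration and do not exist in the PR; in the real code the cancelled id would presumably live in the `ThreadSafeUnaligner` and need synchronization.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Hypothetical model of "abort immediately, let the queued notification check".
class CancelBeforeNotifySketch {
    private final Queue<Runnable> mailbox = new ArrayDeque<>();
    final List<String> log = new ArrayList<>();

    // Highest checkpoint id known to be cancelled (assumed field).
    private long cancelledUpTo = -1L;

    // Step 1: the first barrier enqueues the notification.
    void onFirstBarrier(long checkpointId) {
        mailbox.add(() -> notifyCheckpoint(checkpointId));
    }

    // Step 2: mark the checkpoint as cancelled and abort right away,
    // instead of enqueuing a second mail behind the notification.
    void onChannelClosed(long pendingCheckpointId) {
        cancelledUpTo = Math.max(cancelledUpTo, pendingCheckpointId);
        log.add("abort " + pendingCheckpointId);
    }

    // Step 3: the queued notification checks for cancellation before triggering.
    private void notifyCheckpoint(long checkpointId) {
        if (checkpointId <= cancelledUpTo) {
            log.add("skip " + checkpointId);
            return;
        }
        log.add("trigger " + checkpointId);
    }

    void drainMailbox() {
        Runnable mail;
        while ((mail = mailbox.poll()) != null) {
            mail.run();
        }
    }

    public static void main(String[] args) {
        CancelBeforeNotifySketch sketch = new CancelBeforeNotifySketch();
        sketch.onFirstBarrier(7L);
        sketch.onChannelClosed(7L);
        sketch.onFirstBarrier(8L);
        sketch.drainMailbox();
        System.out.println(sketch.log);
    }
}
```

In this model the cancelled checkpoint is never started, so no cleanup mail is needed, while a later checkpoint still goes through.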
##########
File path:
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierUnaligner.java
##########
@@ -154,46 +154,51 @@ public void processBarrier(CheckpointBarrier receivedBarrier, int channelIndex)
@Override
public void processCancellationBarrier(CancelCheckpointMarker cancelBarrier) throws Exception {
long cancelledId = cancelBarrier.getCheckpointId();
- if (LOG.isDebugEnabled()) {
- LOG.debug("{}: Checkpoint {} canceled, aborting alignment.", taskName, cancelledId);
- }
Review comment:
Have we lost this log message?
##########
File path:
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierUnaligner.java
##########
@@ -117,17 +117,6 @@
threadSafeUnaligner = new ThreadSafeUnaligner(totalNumChannels, checkNotNull(channelStateWriter), this);
}
- @Override
- public void releaseBlocksAndResetBarriers() {
- if (isCheckpointPending()) {
- // make sure no additional data is persisted
- Arrays.fill(hasInflightBuffers, false);
- // the next barrier that comes must assume it is the first
- numBarrierConsumed = 0;
- }
-
- threadSafeUnaligner.resetReceivedBarriers(currentConsumedCheckpointId);
- }
-
Review comment:
Are you sure this is safe to delete? For example,
`AlternatingCheckpointBarrierHandler` uses this method to clean up the state of
the previous handler before switching to another.
##########
File path:
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierUnaligner.java
##########
@@ -154,46 +154,51 @@ public void processBarrier(CheckpointBarrier receivedBarrier, int channelIndex)
@Override
public void processCancellationBarrier(CancelCheckpointMarker cancelBarrier) throws Exception {
long cancelledId = cancelBarrier.getCheckpointId();
- if (LOG.isDebugEnabled()) {
- LOG.debug("{}: Checkpoint {} canceled, aborting alignment.", taskName, cancelledId);
- }
-
- if (currentConsumedCheckpointId >= cancelledId && !isCheckpointPending()) {
- return;
- }
+ // tag whether we should abort checkpoint from task thread view
+ boolean shouldAbort1 = false;
- if (isCheckpointPending()) {
+ if (cancelledId > currentConsumedCheckpointId) {
+ currentConsumedCheckpointId = cancelledId;
+ shouldAbort1 = true;
+ } else if (cancelledId == currentConsumedCheckpointId && isCheckpointPending()) {
LOG.warn("{}: Received cancellation barrier for checkpoint {} before completing current checkpoint {}. " +
"Skipping current checkpoint.",
taskName,
cancelledId,
currentConsumedCheckpointId);
resetConsumedBarriers();
+ shouldAbort1 = true;
+ }
+
+ // tag whether we should abort checkpoint from threadSafeUnaligner view
+ boolean shouldAbort2 = threadSafeUnaligner.setCancelledCheckpointId(cancelledId);
Review comment:
I don't fully understand this `boolean shouldAbort2`. What are the
conditions when it's set to true while `shouldAbort1` is false? Why do we have
those two sources of truth?
##########
File path:
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierUnaligner.java
##########
@@ -387,12 +378,28 @@ synchronized void resetReceivedBarriers(long checkpointId) {
return allBarriersReceivedFuture;
}
- synchronized void onChannelClosed() {
+ synchronized boolean onChannelClosed() throws IOException {
numOpenChannels--;
+
+ if (numBarriersReceived > 0) {
+ resetReceivedBarriers();
+ notifyAbort(new CheckpointException(CheckpointFailureReason.CHECKPOINT_DECLINED_INPUT_END_OF_STREAM));
+ return true;
+ }
+ return false;
}
- synchronized void setCurrentReceivedCheckpointId(long currentReceivedCheckpointId) {
- this.currentReceivedCheckpointId = Math.max(currentReceivedCheckpointId, this.currentReceivedCheckpointId);
+ synchronized boolean setCancelledCheckpointId(long canceledCheckpointId) {
+ boolean shouldAbort = false;
+ if (canceledCheckpointId > currentReceivedCheckpointId) {
+ currentReceivedCheckpointId = canceledCheckpointId;
+ shouldAbort = true;
+
+ } else if (canceledCheckpointId == currentReceivedCheckpointId && isCheckpointPending()) {
+ resetReceivedBarriers();
+ shouldAbort = true;
+ }
+ return shouldAbort;
Review comment:
I think something is missing in `notifyCheckpoint`:
```
private void notifyCheckpoint(CheckpointBarrier barrier) throws IOException {
    // ignore the previous triggered checkpoint by netty thread if it was already canceled or aborted before.
    if (barrier.getId() >= threadSafeUnaligner.getCurrentCheckpointId()) {
        super.notifyCheckpoint(barrier, 0);
    }
}
```
As it is now, after cancelling/closing a checkpoint, the above method would
still trigger the checkpoint, as `barrier.getId()` would be equal to
`currentReceivedCheckpointId`.
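To illustrate the concern, below is a simplified, single-threaded model of the id bookkeeping from the diff. `lastAbortedCheckpointId`, `guardAsQuoted` and `guardWithAbortedId` are hypothetical names added only for this sketch; one possible fix would be to track the last aborted id separately, but that is an assumption and not something the PR contains.

```java
// Simplified model of the id bookkeeping; not the actual ThreadSafeUnaligner.
class GuardSketch {
    long currentReceivedCheckpointId = -1L;
    int numBarriersReceived = 0;
    long lastAbortedCheckpointId = -1L; // hypothetical extra field

    // The first barrier of a new checkpoint arrives.
    void onFirstBarrier(long checkpointId) {
        currentReceivedCheckpointId = checkpointId;
        numBarriersReceived = 1;
    }

    // Mirrors the branches of setCancelledCheckpointId from the diff,
    // plus the hypothetical lastAbortedCheckpointId bookkeeping.
    boolean setCancelledCheckpointId(long cancelledId) {
        if (cancelledId > currentReceivedCheckpointId) {
            currentReceivedCheckpointId = cancelledId;
            lastAbortedCheckpointId = cancelledId;
            return true;
        } else if (cancelledId == currentReceivedCheckpointId && numBarriersReceived > 0) {
            numBarriersReceived = 0; // stands in for resetReceivedBarriers()
            lastAbortedCheckpointId = cancelledId;
            return true;
        }
        return false;
    }

    // The guard as quoted above: true means the checkpoint would be triggered.
    boolean guardAsQuoted(long barrierId) {
        return barrierId >= currentReceivedCheckpointId;
    }

    // Hypothetical stricter guard that also consults the aborted id.
    boolean guardWithAbortedId(long barrierId) {
        return barrierId >= currentReceivedCheckpointId && barrierId > lastAbortedCheckpointId;
    }

    public static void main(String[] args) {
        GuardSketch sketch = new GuardSketch();
        sketch.onFirstBarrier(7L);
        sketch.setCancelledCheckpointId(7L);
        System.out.println("quoted guard triggers 7: " + sketch.guardAsQuoted(7L));
        System.out.println("stricter guard triggers 7: " + sketch.guardWithAbortedId(7L));
    }
}
```

In this model, cancelling the pending checkpoint leaves `currentReceivedCheckpointId` unchanged, so the quoted `>=` comparison still lets the already cancelled checkpoint through, while the stricter variant does not.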