asfgit closed pull request #7064: [FLINK-10753] Improve propagation and logging of snapshot exceptions
URL: https://github.com/apache/flink/pull/7064
This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java index 57337b6286f..02b6fa4a2bb 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java @@ -1249,11 +1249,14 @@ private void discardCheckpoint(PendingCheckpoint pendingCheckpoint, @Nullable Th final long checkpointId = pendingCheckpoint.getCheckpointId(); - final String reason = (cause != null) ? cause.getMessage() : ""; + LOG.info("Discarding checkpoint {} of job {}.", checkpointId, job, cause); - LOG.info("Discarding checkpoint {} of job {} because: {}", checkpointId, job, reason); + if (cause != null) { + pendingCheckpoint.abortError(cause); + } else { + pendingCheckpoint.abortDeclined(); + } - pendingCheckpoint.abortDeclined(); rememberRecentCheckpointId(checkpointId); // we don't have to schedule another "dissolving" checkpoint any more because the diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java index 1b51ac4bf8d..1bc6b0e4baa 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java @@ -34,6 +34,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import javax.annotation.Nonnull; import javax.annotation.Nullable; import java.io.IOException; @@ -433,25 +434,23 @@ public void 
abortSubsumed() { } } + public void abortDeclined() { - try { - Exception cause = new Exception("Checkpoint was declined (tasks not ready)"); - onCompletionPromise.completeExceptionally(cause); - reportFailedCheckpoint(cause); - } finally { - dispose(true); - } + abortWithCause(new Exception("Checkpoint was declined (tasks not ready)")); } /** * Aborts the pending checkpoint due to an error. * @param cause The error's exception. */ - public void abortError(Throwable cause) { + public void abortError(@Nonnull Throwable cause) { + abortWithCause(new Exception("Checkpoint failed: " + cause.getMessage(), cause)); + } + + private void abortWithCause(@Nonnull Exception cause) { try { - Exception failure = new Exception("Checkpoint failed: " + cause.getMessage(), cause); - onCompletionPromise.completeExceptionally(failure); - reportFailedCheckpoint(failure); + onCompletionPromise.completeExceptionally(cause); + reportFailedCheckpoint(cause); } finally { dispose(true); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/rpc/RpcCheckpointResponder.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/rpc/RpcCheckpointResponder.java index aba8bda1918..918fa50483d 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/rpc/RpcCheckpointResponder.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/rpc/RpcCheckpointResponder.java @@ -26,8 +26,13 @@ import org.apache.flink.runtime.taskmanager.CheckpointResponder; import org.apache.flink.util.Preconditions; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + public class RpcCheckpointResponder implements CheckpointResponder { + private static final Logger LOG = LoggerFactory.getLogger(RpcCheckpointResponder.class); + private final CheckpointCoordinatorGateway checkpointCoordinatorGateway; public RpcCheckpointResponder(CheckpointCoordinatorGateway checkpointCoordinatorGateway) { @@ -57,6 +62,7 @@ public void declineCheckpoint( 
long checkpointId, Throwable cause) { + LOG.info("Declining checkpoint {} of job {}.", checkpointId, jobID, cause); checkpointCoordinatorGateway.declineCheckpoint(jobID, executionAttemptID, checkpointId, cause); } } diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java index a63a7971679..4967cb9ead9 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java @@ -413,8 +413,10 @@ public final OperatorSnapshotFutures snapshotState(long checkpointId, long times snapshotException.addSuppressed(e); } - throw new Exception("Could not complete snapshot " + checkpointId + " for operator " + - getOperatorName() + '.', snapshotException); + String snapshotFailMessage = "Could not complete snapshot " + checkpointId + " for operator " + + getOperatorName() + "."; + + throw new Exception(snapshotFailMessage, snapshotException); } return snapshotInProgress; diff --git a/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/AbstractOperatorRestoreTestBase.java b/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/AbstractOperatorRestoreTestBase.java index 3db0f62f829..097616feb96 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/AbstractOperatorRestoreTestBase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/AbstractOperatorRestoreTestBase.java @@ -142,6 +142,7 @@ private String migrateJob(ClassLoader classLoader, ClusterClient<?> clusterClien } catch (Exception e) { String exceptionString = ExceptionUtils.stringifyException(e); if (!(exceptionString.matches("(.*\n)*.*savepoint for the job .* failed(.*\n)*") // legacy + || 
exceptionString.matches("(.*\n)*.*was not running(.*\n)*")
     || exceptionString.matches("(.*\n)*.*Not all required tasks are currently running(.*\n)*") // new
     || exceptionString.matches("(.*\n)*.*Checkpoint was declined \\(tasks not ready\\)(.*\n)*"))) { // new
     throw e;

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

With regards,
Apache Git Services