dawidwys closed pull request #6704: [FLINK-10354] Revert "[FLINK-6328] [chkPts] Don't add savepoints to CompletedCheckpointStore" URL: https://github.com/apache/flink/pull/6704
This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/docs/ops/state/savepoints.md b/docs/ops/state/savepoints.md index 6dd5154c5e6..d31063ee2c5 100644 --- a/docs/ops/state/savepoints.md +++ b/docs/ops/state/savepoints.md @@ -106,6 +106,11 @@ Please follow <a href="https://issues.apache.org/jira/browse/FLINK-5778">FLINK-5 Note that if you use the `MemoryStateBackend`, metadata *and* savepoint state will be stored in the `_metadata` file. Since it is self-contained, you may move the file and restore from any location. +<div class="alert alert-warning"> + <strong>Attention:</strong> It is discouraged to move or delete the last savepoint of a running job, because this might interfere with failure-recovery. Savepoints have side-effects on exactly-once sinks, therefore + to ensure exactly-once semantics, if there is no checkpoint after the last savepoint, the savepoint will be used for recovery. 
+</div> + #### Trigger a Savepoint {% highlight shell %} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java index e936b246222..57337b6286f 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java @@ -839,28 +839,22 @@ private void completePendingCheckpoint(PendingCheckpoint pendingCheckpoint) thro // the pending checkpoint must be discarded after the finalization Preconditions.checkState(pendingCheckpoint.isDiscarded() && completedCheckpoint != null); - // TODO: add savepoints to completed checkpoint store once FLINK-4815 has been completed - if (!completedCheckpoint.getProperties().isSavepoint()) { - try { - completedCheckpointStore.addCheckpoint(completedCheckpoint); - } catch (Exception exception) { - // we failed to store the completed checkpoint. Let's clean up - executor.execute(new Runnable() { - @Override - public void run() { - try { - completedCheckpoint.discardOnFailedStoring(); - } catch (Throwable t) { - LOG.warn("Could not properly discard completed checkpoint {} of job {}.", completedCheckpoint.getCheckpointID(), job, t); - } + try { + completedCheckpointStore.addCheckpoint(completedCheckpoint); + } catch (Exception exception) { + // we failed to store the completed checkpoint. 
Let's clean up + executor.execute(new Runnable() { + @Override + public void run() { + try { + completedCheckpoint.discardOnFailedStoring(); + } catch (Throwable t) { + LOG.warn("Could not properly discard completed checkpoint {}.", completedCheckpoint.getCheckpointID(), t); } - }); - - throw new CheckpointException("Could not complete the pending checkpoint " + checkpointId + '.', exception); - } + } + }); - // drop those pending checkpoints that are at prior to the completed one - dropSubsumedCheckpoints(checkpointId); + throw new CheckpointException("Could not complete the pending checkpoint " + checkpointId + '.', exception); } } finally { pendingCheckpoints.remove(checkpointId); @@ -870,6 +864,9 @@ public void run() { rememberRecentCheckpointId(checkpointId); + // drop those pending checkpoints that are at prior to the completed one + dropSubsumedCheckpoints(checkpointId); + // record the time when this was completed, to calculate // the 'min delay between checkpoints' lastCheckpointCompletionNanos = System.nanoTime(); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java index b113e12ef69..3650f43066d 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java @@ -1494,8 +1494,8 @@ public void testTriggerAndConfirmSimpleSavepoint() throws Exception { assertTrue(pending.isDiscarded()); assertTrue(savepointFuture.isDone()); - // the now the savepoint should be completed but not added to the completed checkpoint store - assertEquals(0, coord.getNumberOfRetainedSuccessfulCheckpoints()); + // the now we should have a completed checkpoint + assertEquals(1, coord.getNumberOfRetainedSuccessfulCheckpoints()); assertEquals(0, coord.getNumberOfPendingCheckpoints()); // 
validate that the relevant tasks got a confirmation message @@ -1510,7 +1510,7 @@ public void testTriggerAndConfirmSimpleSavepoint() throws Exception { verify(subtaskState2, times(1)).registerSharedStates(any(SharedStateRegistry.class)); } - CompletedCheckpoint success = savepointFuture.get(); + CompletedCheckpoint success = coord.getSuccessfulCheckpoints().get(0); assertEquals(jid, success.getJobId()); assertEquals(timestamp, success.getTimestamp()); assertEquals(pending.getCheckpointId(), success.getCheckpointID()); @@ -1528,9 +1528,9 @@ public void testTriggerAndConfirmSimpleSavepoint() throws Exception { coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID2, checkpointIdNew)); assertEquals(0, coord.getNumberOfPendingCheckpoints()); - assertEquals(0, coord.getNumberOfRetainedSuccessfulCheckpoints()); + assertEquals(1, coord.getNumberOfRetainedSuccessfulCheckpoints()); - CompletedCheckpoint successNew = savepointFuture.get(); + CompletedCheckpoint successNew = coord.getSuccessfulCheckpoints().get(0); assertEquals(jid, successNew.getJobId()); assertEquals(timestampNew, successNew.getTimestamp()); assertEquals(checkpointIdNew, successNew.getCheckpointID()); @@ -1557,7 +1557,7 @@ public void testTriggerAndConfirmSimpleSavepoint() throws Exception { * Triggers a savepoint and two checkpoints. The second checkpoint completes * and subsumes the first checkpoint, but not the first savepoint. Then we * trigger another checkpoint and savepoint. The 2nd savepoint completes and - * does neither subsume the last checkpoint nor the first savepoint. + * subsumes the last checkpoint, but not the first savepoint. 
*/ @Test public void testSavepointsAreNotSubsumed() throws Exception { @@ -1614,19 +1614,18 @@ public void testSavepointsAreNotSubsumed() throws Exception { assertFalse(savepointFuture1.isDone()); assertTrue(coord.triggerCheckpoint(timestamp + 3, false)); - long checkpointId3 = counter.getLast(); assertEquals(2, coord.getNumberOfPendingCheckpoints()); CompletableFuture<CompletedCheckpoint> savepointFuture2 = coord.triggerSavepoint(timestamp + 4, savepointDir); long savepointId2 = counter.getLast(); assertEquals(3, coord.getNumberOfPendingCheckpoints()); - // 2nd savepoint should not subsume the last checkpoint and the 1st savepoint + // 2nd savepoint should subsume the last checkpoint, but not the 1st savepoint coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID1, savepointId2)); coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID2, savepointId2)); - assertEquals(2, coord.getNumberOfPendingCheckpoints()); - assertEquals(1, coord.getNumberOfRetainedSuccessfulCheckpoints()); + assertEquals(1, coord.getNumberOfPendingCheckpoints()); + assertEquals(2, coord.getNumberOfRetainedSuccessfulCheckpoints()); assertFalse(coord.getPendingCheckpoints().get(savepointId1).isDiscarded()); assertFalse(savepointFuture1.isDone()); @@ -1636,15 +1635,9 @@ public void testSavepointsAreNotSubsumed() throws Exception { coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID1, savepointId1)); coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID2, savepointId1)); - assertEquals(1, coord.getNumberOfPendingCheckpoints()); - assertEquals(1, coord.getNumberOfRetainedSuccessfulCheckpoints()); - assertTrue(savepointFuture1.isDone()); - - coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID1, checkpointId3)); - coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID2, checkpointId3)); - assertEquals(0, coord.getNumberOfPendingCheckpoints()); - assertEquals(2, 
coord.getNumberOfRetainedSuccessfulCheckpoints()); + assertEquals(3, coord.getNumberOfRetainedSuccessfulCheckpoints()); + assertTrue(savepointFuture1.isDone()); } private void testMaxConcurrentAttempts(int maxConcurrentAttempts) { @@ -3467,92 +3460,6 @@ public void testCheckpointStatsTrackerRestoreCallback() throws Exception { .reportRestoredCheckpoint(any(RestoredCheckpointStats.class)); } - /** - * FLINK-6328 - * - * Tests that savepoints are not added to the {@link CompletedCheckpointStore} and, - * thus, are not subject to job recovery. The reason that we don't want that (until - * FLINK-4815 has been finished) is that the lifecycle of savepoints is not controlled - * by the {@link CheckpointCoordinator}. - */ - @Test - public void testSavepointsAreNotAddedToCompletedCheckpointStore() throws Exception { - final JobID jobId = new JobID(); - final ExecutionAttemptID executionAttemptId = new ExecutionAttemptID(); - final ExecutionVertex vertex1 = mockExecutionVertex(executionAttemptId); - final CompletedCheckpointStore completedCheckpointStore = new StandaloneCompletedCheckpointStore(1); - final long checkpointTimestamp1 = 1L; - final long savepointTimestamp = 2L; - final long checkpointTimestamp2 = 3L; - final String savepointDir = tmpFolder.newFolder().getAbsolutePath(); - - final StandaloneCheckpointIDCounter checkpointIDCounter = new StandaloneCheckpointIDCounter(); - - CheckpointCoordinator checkpointCoordinator = new CheckpointCoordinator( - jobId, - 600000L, - 600000L, - 0L, - Integer.MAX_VALUE, - CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION, - new ExecutionVertex[]{vertex1}, - new ExecutionVertex[]{vertex1}, - new ExecutionVertex[]{vertex1}, - checkpointIDCounter, - completedCheckpointStore, - new MemoryStateBackend(), - Executors.directExecutor(), - SharedStateRegistry.DEFAULT_FACTORY); - - // trigger a first checkpoint - assertTrue( - "Triggering of a checkpoint should work.", - checkpointCoordinator.triggerCheckpoint(checkpointTimestamp1, 
false)); - - assertTrue(0 == completedCheckpointStore.getNumberOfRetainedCheckpoints()); - - // complete the 1st checkpoint - checkpointCoordinator.receiveAcknowledgeMessage( - new AcknowledgeCheckpoint( - jobId, - executionAttemptId, - checkpointIDCounter.getLast())); - - // check that the checkpoint has been completed - assertTrue(1 == completedCheckpointStore.getNumberOfRetainedCheckpoints()); - - // trigger a savepoint --> this should not have any effect on the CompletedCheckpointStore - CompletableFuture<CompletedCheckpoint> savepointFuture = checkpointCoordinator.triggerSavepoint(savepointTimestamp, savepointDir); - - checkpointCoordinator.receiveAcknowledgeMessage( - new AcknowledgeCheckpoint( - jobId, - executionAttemptId, - checkpointIDCounter.getLast())); - - // check that no errors occurred - final CompletedCheckpoint savepoint = savepointFuture.get(); - - assertFalse( - "The savepoint should not have been added to the completed checkpoint store", - savepoint.getCheckpointID() == completedCheckpointStore.getLatestCheckpoint().getCheckpointID()); - - assertTrue( - "Triggering of a checkpoint should work.", - checkpointCoordinator.triggerCheckpoint(checkpointTimestamp2, false)); - - // complete the 2nd checkpoint - checkpointCoordinator.receiveAcknowledgeMessage( - new AcknowledgeCheckpoint( - jobId, - executionAttemptId, - checkpointIDCounter.getLast())); - - assertTrue( - "The latest completed (proper) checkpoint should have been added to the completed checkpoint store.", - completedCheckpointStore.getLatestCheckpoint().getCheckpointID() == checkpointIDCounter.getLast()); - } - @Test public void testSharedStateRegistrationOnRestore() throws Exception { ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services