afedulov commented on code in PR #637:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/637#discussion_r1279737381


##########
flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/sessionjob/SessionJobReconcilerTest.java:
##########
@@ -409,24 +417,143 @@ public void testTriggerSavepoint() throws Exception {
                 sp1SessionJob, JobState.RUNNING, RECONCILING.name(), null, 
flinkService.listJobs());
 
         
sp1SessionJob.getStatus().getJobStatus().getSavepointInfo().resetTrigger();
-        ReconciliationUtils.updateLastReconciledSavepointTriggerNonce(
-                sp1SessionJob.getStatus().getJobStatus().getSavepointInfo(), 
sp1SessionJob);
+        ReconciliationUtils.updateLastReconciledSnapshotTriggerNonce(
+                sp1SessionJob.getStatus().getJobStatus().getSavepointInfo(),
+                sp1SessionJob,
+                SAVEPOINT);
 
         // trigger when new nonce is defined
         sp1SessionJob.getSpec().getJob().setSavepointTriggerNonce(4L);
         reconciler.reconcile(sp1SessionJob, readyContext);
         assertEquals(
-                "trigger_1",
+                "savepoint_trigger_1",
                 
sp1SessionJob.getStatus().getJobStatus().getSavepointInfo().getTriggerId());
 
         
sp1SessionJob.getStatus().getJobStatus().getSavepointInfo().resetTrigger();
-        ReconciliationUtils.updateLastReconciledSavepointTriggerNonce(
-                sp1SessionJob.getStatus().getJobStatus().getSavepointInfo(), 
sp1SessionJob);
+        ReconciliationUtils.updateLastReconciledSnapshotTriggerNonce(
+                sp1SessionJob.getStatus().getJobStatus().getSavepointInfo(),
+                sp1SessionJob,
+                SAVEPOINT);
 
         // don't trigger when nonce is cleared
         sp1SessionJob.getSpec().getJob().setSavepointTriggerNonce(null);
         reconciler.reconcile(sp1SessionJob, readyContext);
-        
assertFalse(SavepointUtils.savepointInProgress(sp1SessionJob.getStatus().getJobStatus()));
+        
assertFalse(SnapshotUtils.savepointInProgress(sp1SessionJob.getStatus().getJobStatus()));
+    }
+
+    @Test
+    public void testTriggerCheckpoint() throws Exception {
+        FlinkSessionJob sessionJob = TestUtils.buildSessionJob();
+        
assertFalse(SnapshotUtils.checkpointInProgress(getJobStatus(sessionJob)));
+
+        var readyContext = TestUtils.createContextWithReadyFlinkDeployment();
+        reconciler.reconcile(sessionJob, readyContext);
+        verifyAndSetRunningJobsToStatus(
+                sessionJob, JobState.RUNNING, RECONCILING.name(), null, 
flinkService.listJobs());
+
+        
assertFalse(SnapshotUtils.checkpointInProgress(getJobStatus(sessionJob)));
+
+        // trigger checkpoint
+        var sp1SessionJob = ReconciliationUtils.clone(sessionJob);
+
+        // do not trigger checkpoint if nonce is null
+        reconciler.reconcile(sp1SessionJob, readyContext);
+        
assertFalse(SnapshotUtils.checkpointInProgress(getJobStatus(sp1SessionJob)));
+
+        getJobSpec(sp1SessionJob).setCheckpointTriggerNonce(2L);
+        getJobStatus(sp1SessionJob).setState(CREATED.name());
+        reconciler.reconcile(sp1SessionJob, readyContext);
+        // do not trigger checkpoint if job is not running
+        
assertFalse(SnapshotUtils.checkpointInProgress(getJobStatus(sp1SessionJob)));
+
+        getJobStatus(sp1SessionJob).setState(RUNNING.name());
+
+        reconciler.reconcile(sp1SessionJob, readyContext);
+        
assertTrue(SnapshotUtils.checkpointInProgress(getJobStatus(sp1SessionJob)));
+
+        // the last reconcile nonce updated
+        
assertNull(getReconciledJobSpec(sp1SessionJob).getCheckpointTriggerNonce());
+
+        // don't trigger new checkpoint when checkpoint is in progress
+        getJobSpec(sp1SessionJob).setCheckpointTriggerNonce(3L);
+        reconciler.reconcile(sp1SessionJob, readyContext);
+        assertEquals("checkpoint_trigger_0", 
getCheckpointInfo(sp1SessionJob).getTriggerId());
+        /*
+            TODO: this section needs to be reintroduced in case the LAST_STATE optimization gets
+             added

Review Comment:
   👍 Removed the TODO block; the follow-up is tracked in
   https://issues.apache.org/jira/browse/FLINK-32719



##########
flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/sessionjob/SessionJobReconcilerTest.java:
##########
@@ -409,24 +417,143 @@ public void testTriggerSavepoint() throws Exception {
                 sp1SessionJob, JobState.RUNNING, RECONCILING.name(), null, 
flinkService.listJobs());
 
         
sp1SessionJob.getStatus().getJobStatus().getSavepointInfo().resetTrigger();
-        ReconciliationUtils.updateLastReconciledSavepointTriggerNonce(
-                sp1SessionJob.getStatus().getJobStatus().getSavepointInfo(), 
sp1SessionJob);
+        ReconciliationUtils.updateLastReconciledSnapshotTriggerNonce(
+                sp1SessionJob.getStatus().getJobStatus().getSavepointInfo(),
+                sp1SessionJob,
+                SAVEPOINT);
 
         // trigger when new nonce is defined
         sp1SessionJob.getSpec().getJob().setSavepointTriggerNonce(4L);
         reconciler.reconcile(sp1SessionJob, readyContext);
         assertEquals(
-                "trigger_1",
+                "savepoint_trigger_1",
                 
sp1SessionJob.getStatus().getJobStatus().getSavepointInfo().getTriggerId());
 
         
sp1SessionJob.getStatus().getJobStatus().getSavepointInfo().resetTrigger();
-        ReconciliationUtils.updateLastReconciledSavepointTriggerNonce(
-                sp1SessionJob.getStatus().getJobStatus().getSavepointInfo(), 
sp1SessionJob);
+        ReconciliationUtils.updateLastReconciledSnapshotTriggerNonce(
+                sp1SessionJob.getStatus().getJobStatus().getSavepointInfo(),
+                sp1SessionJob,
+                SAVEPOINT);
 
         // don't trigger when nonce is cleared
         sp1SessionJob.getSpec().getJob().setSavepointTriggerNonce(null);
         reconciler.reconcile(sp1SessionJob, readyContext);
-        
assertFalse(SavepointUtils.savepointInProgress(sp1SessionJob.getStatus().getJobStatus()));
+        
assertFalse(SnapshotUtils.savepointInProgress(sp1SessionJob.getStatus().getJobStatus()));
+    }
+
+    @Test
+    public void testTriggerCheckpoint() throws Exception {
+        FlinkSessionJob sessionJob = TestUtils.buildSessionJob();
+        
assertFalse(SnapshotUtils.checkpointInProgress(getJobStatus(sessionJob)));
+
+        var readyContext = TestUtils.createContextWithReadyFlinkDeployment();
+        reconciler.reconcile(sessionJob, readyContext);
+        verifyAndSetRunningJobsToStatus(
+                sessionJob, JobState.RUNNING, RECONCILING.name(), null, 
flinkService.listJobs());
+
+        
assertFalse(SnapshotUtils.checkpointInProgress(getJobStatus(sessionJob)));
+
+        // trigger checkpoint
+        var sp1SessionJob = ReconciliationUtils.clone(sessionJob);
+
+        // do not trigger checkpoint if nonce is null
+        reconciler.reconcile(sp1SessionJob, readyContext);
+        
assertFalse(SnapshotUtils.checkpointInProgress(getJobStatus(sp1SessionJob)));
+
+        getJobSpec(sp1SessionJob).setCheckpointTriggerNonce(2L);
+        getJobStatus(sp1SessionJob).setState(CREATED.name());
+        reconciler.reconcile(sp1SessionJob, readyContext);
+        // do not trigger checkpoint if job is not running
+        
assertFalse(SnapshotUtils.checkpointInProgress(getJobStatus(sp1SessionJob)));
+
+        getJobStatus(sp1SessionJob).setState(RUNNING.name());
+
+        reconciler.reconcile(sp1SessionJob, readyContext);
+        
assertTrue(SnapshotUtils.checkpointInProgress(getJobStatus(sp1SessionJob)));
+
+        // the last reconcile nonce updated
+        
assertNull(getReconciledJobSpec(sp1SessionJob).getCheckpointTriggerNonce());
+
+        // don't trigger new checkpoint when checkpoint is in progress
+        getJobSpec(sp1SessionJob).setCheckpointTriggerNonce(3L);
+        reconciler.reconcile(sp1SessionJob, readyContext);
+        assertEquals("checkpoint_trigger_0", 
getCheckpointInfo(sp1SessionJob).getTriggerId());
+        /*
+            TODO: this section needs to be reintroduced in case the LAST_STATE optimization gets
+             added

Review Comment:
   👍 Removed the TODO block; the follow-up is tracked in
   https://issues.apache.org/jira/browse/FLINK-32719



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to