afedulov commented on code in PR #637: URL: https://github.com/apache/flink-kubernetes-operator/pull/637#discussion_r1273647391
########## flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/sessionjob/SessionJobReconcilerTest.java: ########## @@ -409,24 +417,143 @@ public void testTriggerSavepoint() throws Exception { sp1SessionJob, JobState.RUNNING, RECONCILING.name(), null, flinkService.listJobs()); sp1SessionJob.getStatus().getJobStatus().getSavepointInfo().resetTrigger(); - ReconciliationUtils.updateLastReconciledSavepointTriggerNonce( - sp1SessionJob.getStatus().getJobStatus().getSavepointInfo(), sp1SessionJob); + ReconciliationUtils.updateLastReconciledSnapshotTriggerNonce( + sp1SessionJob.getStatus().getJobStatus().getSavepointInfo(), + sp1SessionJob, + SAVEPOINT); // trigger when new nonce is defined sp1SessionJob.getSpec().getJob().setSavepointTriggerNonce(4L); reconciler.reconcile(sp1SessionJob, readyContext); assertEquals( - "trigger_1", + "savepoint_trigger_1", sp1SessionJob.getStatus().getJobStatus().getSavepointInfo().getTriggerId()); sp1SessionJob.getStatus().getJobStatus().getSavepointInfo().resetTrigger(); - ReconciliationUtils.updateLastReconciledSavepointTriggerNonce( - sp1SessionJob.getStatus().getJobStatus().getSavepointInfo(), sp1SessionJob); + ReconciliationUtils.updateLastReconciledSnapshotTriggerNonce( + sp1SessionJob.getStatus().getJobStatus().getSavepointInfo(), + sp1SessionJob, + SAVEPOINT); // don't trigger when nonce is cleared sp1SessionJob.getSpec().getJob().setSavepointTriggerNonce(null); reconciler.reconcile(sp1SessionJob, readyContext); - assertFalse(SavepointUtils.savepointInProgress(sp1SessionJob.getStatus().getJobStatus())); + assertFalse(SnapshotUtils.savepointInProgress(sp1SessionJob.getStatus().getJobStatus())); + } + + @Test + public void testTriggerCheckpoint() throws Exception { + FlinkSessionJob sessionJob = TestUtils.buildSessionJob(); + assertFalse(SnapshotUtils.checkpointInProgress(getJobStatus(sessionJob))); + + var readyContext = TestUtils.createContextWithReadyFlinkDeployment(); + reconciler.reconcile(sessionJob, readyContext); + verifyAndSetRunningJobsToStatus( + sessionJob, JobState.RUNNING, RECONCILING.name(), null, flinkService.listJobs()); + + assertFalse(SnapshotUtils.checkpointInProgress(getJobStatus(sessionJob))); + + // trigger checkpoint + var sp1SessionJob = ReconciliationUtils.clone(sessionJob); + + // do not trigger checkpoint if nonce is null + reconciler.reconcile(sp1SessionJob, readyContext); + assertFalse(SnapshotUtils.checkpointInProgress(getJobStatus(sp1SessionJob))); + + getJobSpec(sp1SessionJob).setCheckpointTriggerNonce(2L); + getJobStatus(sp1SessionJob).setState(CREATED.name()); + reconciler.reconcile(sp1SessionJob, readyContext); + // do not trigger checkpoint if job is not running + assertFalse(SnapshotUtils.checkpointInProgress(getJobStatus(sp1SessionJob))); + + getJobStatus(sp1SessionJob).setState(RUNNING.name()); + + reconciler.reconcile(sp1SessionJob, readyContext); + assertTrue(SnapshotUtils.checkpointInProgress(getJobStatus(sp1SessionJob))); + + // the last reconcile nonce updated + assertNull(getReconciledJobSpec(sp1SessionJob).getCheckpointTriggerNonce()); + + // don't trigger new checkpoint when checkpoint is in progress + getJobSpec(sp1SessionJob).setCheckpointTriggerNonce(3L); + reconciler.reconcile(sp1SessionJob, readyContext); + assertEquals("checkpoint_trigger_0", getCheckpointInfo(sp1SessionJob).getTriggerId()); + /* + TODO: this section needs to be reintroduced in case the LAST_STATE optimization gets + added Review Comment: I mostly left it for the review discussion. @gyfora what do you think, is is a worthy optimization in your opinion? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org