XComp commented on a change in pull request #18189:
URL: https://github.com/apache/flink/pull/18189#discussion_r792392290



##########
File path: flink-clients/src/test/java/org/apache/flink/client/deployment/application/ApplicationDispatcherBootstrapITCase.java
##########
@@ -147,6 +157,67 @@ public void testDispatcherRecoversAfterLosingAndRegainingLeadership() throws Exc
         }
     }
 
+    @Test
+    public void testDirtyJobResultRecoveryInApplicationMode() throws Exception {
+        final Deadline deadline = Deadline.fromNow(TIMEOUT);
+        final Configuration configuration = new Configuration();
+        configuration.set(HighAvailabilityOptions.HA_MODE, HighAvailabilityMode.ZOOKEEPER.name());
+        configuration.set(DeploymentOptions.TARGET, EmbeddedExecutor.NAME);
+        configuration.set(ClientOptions.CLIENT_RETRY_PERIOD, Duration.ofMillis(100));
+        final TestingMiniClusterConfiguration clusterConfiguration =
+                TestingMiniClusterConfiguration.newBuilder()
+                        .setConfiguration(configuration)
+                        .build();
+
+        // having a dirty entry in the JobResultStore should make the ApplicationDispatcherBootstrap
+        // implementation fail to submit the job
+        final JobResultStore jobResultStore = new EmbeddedJobResultStore();
+        jobResultStore.createDirtyResult(
+                new JobResultEntry(
+                        new JobResult.Builder()
+                                .jobId(ApplicationDispatcherBootstrap.ZERO_JOB_ID)
+                                .applicationStatus(ApplicationStatus.SUCCEEDED)
+                                .netRuntime(1)
+                                .build()));
+        final EmbeddedHaServicesWithLeadershipControl haServices =
+                new EmbeddedHaServicesWithLeadershipControl(TestingUtils.defaultExecutor()) {
+
+                    @Override
+                    public JobResultStore getJobResultStore() {
+                        return jobResultStore;
+                    }
+                };
+
+        final TestingMiniCluster.Builder clusterBuilder =
+                TestingMiniCluster.newBuilder(clusterConfiguration)
+                        .setHighAvailabilityServicesSupplier(() -> haServices)
+                        .setDispatcherResourceManagerComponentFactorySupplier(
+                                createApplicationModeDispatcherResourceManagerComponentFactorySupplier(
+                                        clusterConfiguration.getConfiguration(),
+                                        ErrorHandlingSubmissionJob.createPackagedProgram()));
+        try (final MiniCluster cluster = clusterBuilder.build()) {
+            // start mini cluster and submit the job
+            cluster.start();
+
+            // the cluster should shut down automatically once the application completes
+            awaitClusterStopped(cluster, deadline);
+        }
+
+        FlinkAssertions.assertThatChainOfCauses(ErrorHandlingSubmissionJob.getSubmissionException())
+                .as(
+                        "The job's main method shouldn't have been succeeded due to a DuplicateJobSubmissionException.")
+                .hasAtLeastOneElementOfType(DuplicateJobSubmissionException.class);
+
+        assertThat(
+                        jobResultStore.hasDirtyJobResultEntry(
+                                ApplicationDispatcherBootstrap.ZERO_JOB_ID))
+                .isTrue();
+        assertThat(
+                        jobResultStore.hasCleanJobResultEntry(
+                                ApplicationDispatcherBootstrap.ZERO_JOB_ID))
+                .isFalse();

Review comment:
       This test checks that the job is not re-submitted when a dirty job result already exists in the `JobResultStore`. The cleanup itself is integrated in [FLINK-25432](https://issues.apache.org/jira/browse/FLINK-25432); that's where we would have to adjust the test accordingly.
   
   We won't miss that adjustment in `FLINK-25432`: once the cleanup is integrated, the asserts on `hasDirtyJobResultEntry` and `hasCleanJobResultEntry` will make this test fail.
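
   To illustrate why those two asserts act as a tripwire, here is a minimal, self-contained sketch. `ToyJobResultStore` and `DirtyResultSketch` are hypothetical in-memory stand-ins written for this comment, not Flink's `JobResultStore`, and the sketch assumes that the future cleanup moves a dirty entry to the clean state.

```java
import java.util.HashSet;
import java.util.Set;

public class DirtyResultSketch {

    /** Hypothetical in-memory stand-in for illustration; not Flink's JobResultStore. */
    static final class ToyJobResultStore {
        private final Set<String> dirty = new HashSet<>();
        private final Set<String> clean = new HashSet<>();

        void createDirtyResult(String jobId) {
            dirty.add(jobId);
        }

        // Assumption: cleanup turns a dirty entry into a clean one.
        void markResultAsClean(String jobId) {
            if (dirty.remove(jobId)) {
                clean.add(jobId);
            }
        }

        boolean hasDirtyJobResultEntry(String jobId) {
            return dirty.contains(jobId);
        }

        boolean hasCleanJobResultEntry(String jobId) {
            return clean.contains(jobId);
        }
    }

    /** Mirrors the two asserts at the end of the test: dirty entry present, clean entry absent. */
    static boolean currentAssertionsHold(ToyJobResultStore store, String jobId) {
        return store.hasDirtyJobResultEntry(jobId) && !store.hasCleanJobResultEntry(jobId);
    }

    public static void main(String[] args) {
        String jobId = "zero-job-id"; // placeholder id, not the real ZERO_JOB_ID
        ToyJobResultStore store = new ToyJobResultStore();
        store.createDirtyResult(jobId);

        // Today: no cleanup runs, so the entry stays dirty and the asserts pass.
        System.out.println("without cleanup: " + currentAssertionsHold(store, jobId));

        // After cleanup is integrated (assumed behaviour), the same asserts no longer hold.
        store.markResultAsClean(jobId);
        System.out.println("with cleanup: " + currentAssertionsHold(store, jobId));
    }
}
```

   Under that assumption the first check holds and the second does not, which is the failure that would force us to revisit this test.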



