XComp commented on a change in pull request #18189:
URL: https://github.com/apache/flink/pull/18189#discussion_r792392290
##########
File path: flink-clients/src/test/java/org/apache/flink/client/deployment/application/ApplicationDispatcherBootstrapITCase.java
##########
@@ -147,6 +157,67 @@ public void testDispatcherRecoversAfterLosingAndRegainingLeadership() throws Exc
         }
     }
 
+    @Test
+    public void testDirtyJobResultRecoveryInApplicationMode() throws Exception {
+        final Deadline deadline = Deadline.fromNow(TIMEOUT);
+        final Configuration configuration = new Configuration();
+        configuration.set(HighAvailabilityOptions.HA_MODE, HighAvailabilityMode.ZOOKEEPER.name());
+        configuration.set(DeploymentOptions.TARGET, EmbeddedExecutor.NAME);
+        configuration.set(ClientOptions.CLIENT_RETRY_PERIOD, Duration.ofMillis(100));
+        final TestingMiniClusterConfiguration clusterConfiguration =
+                TestingMiniClusterConfiguration.newBuilder()
+                        .setConfiguration(configuration)
+                        .build();
+
+        // having a dirty entry in the JobResultStore should make the ApplicationDispatcherBootstrap
+        // implementation fail to submit the job
+        final JobResultStore jobResultStore = new EmbeddedJobResultStore();
+        jobResultStore.createDirtyResult(
+                new JobResultEntry(
+                        new JobResult.Builder()
+                                .jobId(ApplicationDispatcherBootstrap.ZERO_JOB_ID)
+                                .applicationStatus(ApplicationStatus.SUCCEEDED)
+                                .netRuntime(1)
+                                .build()));
+        final EmbeddedHaServicesWithLeadershipControl haServices =
+                new EmbeddedHaServicesWithLeadershipControl(TestingUtils.defaultExecutor()) {
+
+                    @Override
+                    public JobResultStore getJobResultStore() {
+                        return jobResultStore;
+                    }
+                };
+
+        final TestingMiniCluster.Builder clusterBuilder =
+                TestingMiniCluster.newBuilder(clusterConfiguration)
+                        .setHighAvailabilityServicesSupplier(() -> haServices)
+                        .setDispatcherResourceManagerComponentFactorySupplier(
+                                createApplicationModeDispatcherResourceManagerComponentFactorySupplier(
+                                        clusterConfiguration.getConfiguration(),
+                                        ErrorHandlingSubmissionJob.createPackagedProgram()));
+        try (final MiniCluster cluster = clusterBuilder.build()) {
+            // start mini cluster and submit the job
+            cluster.start();
+
+            // the cluster should shut down automatically once the application completes
+            awaitClusterStopped(cluster, deadline);
+        }
+
+        FlinkAssertions.assertThatChainOfCauses(ErrorHandlingSubmissionJob.getSubmissionException())
+                .as(
+                        "The job's main method shouldn't have been succeeded due to a DuplicateJobSubmissionException.")
+                .hasAtLeastOneElementOfType(DuplicateJobSubmissionException.class);
+
+        assertThat(
+                        jobResultStore.hasDirtyJobResultEntry(
+                                ApplicationDispatcherBootstrap.ZERO_JOB_ID))
+                .isTrue();
+        assertThat(
+                        jobResultStore.hasCleanJobResultEntry(
+                                ApplicationDispatcherBootstrap.ZERO_JOB_ID))
+                .isFalse();

Review comment:
This test checks that no job is re-submitted when a dirty job result already exists in the `JobResultStore`. The cleanup itself will be integrated in [FLINK-25432](https://issues.apache.org/jira/browse/FLINK-25432), and that is where we will have to adjust the test accordingly. We won't miss that change in `FLINK-25432`: once the cleanup is integrated, the asserts on `hasDirtyJobResultEntry` and `hasCleanJobResultEntry` will fail and flag the test.
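To make the "the test will fail once cleanup lands" argument concrete, here is a minimal, self-contained Java sketch of the dirty/clean lifecycle. It is a hypothetical model, not Flink's API: `InMemoryJobResultStore`, `DuplicateSubmissionException`, `submit`, `recover`, `currentAssertionsPass`, and the `cleanupEnabled` flag are illustrative names. Only `createDirtyResult`, `hasDirtyJobResultEntry`, and `hasCleanJobResultEntry` loosely mirror the `JobResultStore` methods used in the test, and `markResultAsClean` is an assumed operation. The `cleanupEnabled` flag stands in for the cleanup that FLINK-25432 is expected to add.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified model of the dirty/clean lifecycle discussed in the review.
// This is NOT Flink's JobResultStore; method names only loosely mirror it.
class DirtyJobResultSketch {

    enum State { DIRTY, CLEAN }

    // In-memory stand-in for a job result store, keyed by a plain String job ID.
    static final class InMemoryJobResultStore {
        private final Map<String, State> entries = new HashMap<>();

        void createDirtyResult(String jobId) {
            if (entries.containsKey(jobId)) {
                throw new IllegalStateException("An entry already exists for " + jobId);
            }
            entries.put(jobId, State.DIRTY);
        }

        // Assumed operation: flips a dirty entry to clean once cleanup has finished.
        void markResultAsClean(String jobId) {
            if (entries.get(jobId) != State.DIRTY) {
                throw new IllegalStateException("No dirty entry for " + jobId);
            }
            entries.put(jobId, State.CLEAN);
        }

        boolean hasJobResultEntry(String jobId) {
            return entries.containsKey(jobId);
        }

        boolean hasDirtyJobResultEntry(String jobId) {
            return entries.get(jobId) == State.DIRTY;
        }

        boolean hasCleanJobResultEntry(String jobId) {
            return entries.get(jobId) == State.CLEAN;
        }
    }

    // Hypothetical stand-in for DuplicateJobSubmissionException.
    static final class DuplicateSubmissionException extends RuntimeException {
        DuplicateSubmissionException(String jobId) {
            super("A result for job " + jobId + " already exists");
        }
    }

    // Refuses to submit a job whose result (dirty or clean) is already recorded.
    static void submit(InMemoryJobResultStore store, String jobId) {
        if (store.hasJobResultEntry(jobId)) {
            throw new DuplicateSubmissionException(jobId);
        }
        // A real implementation would run the job here.
    }

    // cleanupEnabled models the behavior expected once the cleanup is integrated.
    static void recover(InMemoryJobResultStore store, String jobId, boolean cleanupEnabled) {
        if (cleanupEnabled && store.hasDirtyJobResultEntry(jobId)) {
            store.markResultAsClean(jobId);
        }
    }

    // Mirrors the two assertions at the end of the current test.
    static boolean currentAssertionsPass(InMemoryJobResultStore store, String jobId) {
        return store.hasDirtyJobResultEntry(jobId) && !store.hasCleanJobResultEntry(jobId);
    }

    public static void main(String[] args) {
        String jobId = "zero-job-id"; // stand-in for ApplicationDispatcherBootstrap.ZERO_JOB_ID
        for (boolean cleanupEnabled : new boolean[] {false, true}) {
            InMemoryJobResultStore store = new InMemoryJobResultStore();
            store.createDirtyResult(jobId);
            boolean rejected = false;
            try {
                submit(store, jobId);
            } catch (DuplicateSubmissionException e) {
                rejected = true;
            }
            recover(store, jobId, cleanupEnabled);
            System.out.printf(
                    "cleanup=%b rejected=%b currentAssertionsPass=%b%n",
                    cleanupEnabled, rejected, currentAssertionsPass(store, jobId));
        }
    }
}
```

In both runs the submission is rejected because an entry already exists. Without cleanup the entry stays dirty, so the current assertions hold; with cleanup enabled the entry becomes clean and `currentAssertionsPass` returns `false`, which is the kind of failure the comment relies on to flag the test for adjustment.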