XComp commented on code in PR #21137:
URL: https://github.com/apache/flink/pull/21137#discussion_r1011489421


##########
flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterServiceLeadershipRunnerTest.java:
##########
@@ -46,45 +49,36 @@
 import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.FlinkRuntimeException;
 import org.apache.flink.util.Preconditions;
-import org.apache.flink.util.TestLogger;
+import org.apache.flink.util.TestLoggerExtension;
+import org.apache.flink.util.function.ThrowingRunnable;
 
-import org.junit.After;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.ClassRule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
 
 import javax.annotation.Nonnull;
 
-import java.time.Duration;
 import java.util.ArrayDeque;
 import java.util.Arrays;
 import java.util.Queue;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.function.Consumer;
 
-import static org.apache.flink.core.testutils.FlinkMatchers.containsCause;
-import static org.apache.flink.core.testutils.FlinkMatchers.containsMessage;
-import static org.apache.flink.core.testutils.FlinkMatchers.willNotComplete;
-import static org.hamcrest.Matchers.is;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertThat;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
 /** Tests for the {@link JobMasterServiceLeadershipRunner}. */
-public class JobMasterServiceLeadershipRunnerTest extends TestLogger {
+@ExtendWith(TestLoggerExtension.class)

Review Comment:
   Usually, we would create a file `org.junit.jupiter.api.extension.Extension` 
in `src/test/resources/META-INF/services` containing 
`org.apache.flink.util.TestLoggerExtension`. This way, the extension applies to 
all JUnit5 tests in the module (e.g. see `flink-clients` module).



##########
flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/TestingJobMasterServiceProcess.java:
##########
@@ -20,124 +20,114 @@
 
 import org.apache.flink.runtime.jobmaster.utils.TestingJobMasterGatewayBuilder;
 
-import javax.annotation.Nullable;
-
 import java.util.concurrent.CompletableFuture;
+import java.util.function.Supplier;
 
 /** Testing implementation of {@link JobMasterServiceProcess}. */
 public class TestingJobMasterServiceProcess implements JobMasterServiceProcess 
{
 
-    private final CompletableFuture<JobMasterGateway> jobMasterGatewayFuture;
-
-    private final CompletableFuture<JobManagerRunnerResult> 
jobManagerRunnerResultFuture;
-
-    private final CompletableFuture<String> leaderAddressFuture;
-
-    private final boolean isInitialized;
-
-    private final CompletableFuture<Void> terminationFuture;
-
-    private final boolean manualTerminationFutureCompletion;
-
-    private TestingJobMasterServiceProcess(
-            CompletableFuture<JobMasterGateway> jobMasterGatewayFuture,
-            CompletableFuture<JobManagerRunnerResult> 
jobManagerRunnerResultFuture,
-            CompletableFuture<String> leaderAddressFuture,
-            boolean isInitialized,
-            CompletableFuture<Void> terminationFuture,
-            boolean manualTerminationFutureCompletion) {
-        this.jobMasterGatewayFuture = jobMasterGatewayFuture;
-        this.jobManagerRunnerResultFuture = jobManagerRunnerResultFuture;
-        this.leaderAddressFuture = leaderAddressFuture;
-        this.isInitialized = isInitialized;
-        this.terminationFuture = terminationFuture;
-        this.manualTerminationFutureCompletion = 
manualTerminationFutureCompletion;
+    private final Supplier<CompletableFuture<Void>> closeAsyncSupplier;
+    private final Supplier<Boolean> isInitializedAndRunningSupplier;
+    private final Supplier<CompletableFuture<JobMasterGateway>> 
getJobMasterGatewayFutureSupplier;
+    private final Supplier<CompletableFuture<JobManagerRunnerResult>> 
getResultFutureSupplier;
+    private final Supplier<CompletableFuture<String>> 
getLeaderAddressFutureSupplier;
+
+    public TestingJobMasterServiceProcess(
+            Supplier<CompletableFuture<Void>> closeAsyncSupplier,
+            Supplier<Boolean> isInitializedAndRunningSupplier,
+            Supplier<CompletableFuture<JobMasterGateway>> 
getJobMasterGatewayFutureSupplier,
+            Supplier<CompletableFuture<JobManagerRunnerResult>> 
getResultFutureSupplier,
+            Supplier<CompletableFuture<String>> 
getLeaderAddressFutureSupplier) {
+        this.closeAsyncSupplier = closeAsyncSupplier;
+        this.isInitializedAndRunningSupplier = isInitializedAndRunningSupplier;
+        this.getJobMasterGatewayFutureSupplier = 
getJobMasterGatewayFutureSupplier;
+        this.getResultFutureSupplier = getResultFutureSupplier;
+        this.getLeaderAddressFutureSupplier = getLeaderAddressFutureSupplier;
     }
 
     @Override
     public CompletableFuture<Void> closeAsync() {
-        if (!manualTerminationFutureCompletion) {
-            terminationFuture.complete(null);
-        }
-
-        return terminationFuture;
+        return closeAsyncSupplier.get();
     }
 
     @Override
     public boolean isInitializedAndRunning() {
-        return isInitialized && !terminationFuture.isDone();
+        return isInitializedAndRunningSupplier.get();
     }
 
     @Override
     public CompletableFuture<JobMasterGateway> getJobMasterGatewayFuture() {
-        return jobMasterGatewayFuture;
+        return getJobMasterGatewayFutureSupplier.get();
     }
 
     @Override
     public CompletableFuture<JobManagerRunnerResult> getResultFuture() {
-        return jobManagerRunnerResultFuture;
+        return getResultFutureSupplier.get();
     }
 
     @Override
     public CompletableFuture<String> getLeaderAddressFuture() {
-        return leaderAddressFuture;
+        return getLeaderAddressFutureSupplier.get();
     }
 
     public static Builder newBuilder() {
         return new Builder();
     }
 
     public static final class Builder {
-        private CompletableFuture<JobMasterGateway> jobMasterGatewayFuture =
-                CompletableFuture.completedFuture(new 
TestingJobMasterGatewayBuilder().build());
-        private CompletableFuture<JobManagerRunnerResult> 
jobManagerRunnerResultFuture =
-                new CompletableFuture<>();
-        private CompletableFuture<String> leaderAddressFuture =
-                CompletableFuture.completedFuture("foobar");
-        private boolean isInitialized = true;
-        @Nullable private CompletableFuture<Void> terminationFuture = new 
CompletableFuture<>();
-        private boolean manualTerminationFutureCompletion = false;
-
-        public Builder setJobMasterGatewayFuture(
-                CompletableFuture<JobMasterGateway> jobMasterGatewayFuture) {
-            this.jobMasterGatewayFuture = jobMasterGatewayFuture;
-            return this;
+        private Supplier<CompletableFuture<Void>> closeAsyncSupplier = 
unsupportedOperation();
+        private Supplier<Boolean> isInitializedAndRunningSupplier = 
unsupportedOperation();
+        private Supplier<CompletableFuture<JobMasterGateway>> 
getJobMasterGatewayFutureSupplier =
+                () ->
+                        CompletableFuture.completedFuture(
+                                new TestingJobMasterGatewayBuilder().build());;

Review Comment:
   ```suggestion
                                   new 
TestingJobMasterGatewayBuilder().build());
   ```
   Checkstyle complaining here again



##########
flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterServiceLeadershipRunnerTest.java:
##########
@@ -683,21 +686,148 @@ public void testJobAlreadyDone() throws Exception {
                     jobManagerRunner.getResultFuture();
 
             JobManagerRunnerResult result = resultFuture.get();
-            assertEquals(
-                    JobStatus.FAILED,
-                    
result.getExecutionGraphInfo().getArchivedExecutionGraph().getState());
+            
assertThat(result.getExecutionGraphInfo().getArchivedExecutionGraph().getState())
+                    .isEqualTo(JobStatus.FAILED);
         }
     }
 
+    @Test
+    void 
testJobMasterServiceLeadershipRunnerCloseWhenElectionServiceGrantLeaderShip()
+            throws Exception {
+        final TestingLeaderElectionDriver.TestingLeaderElectionDriverFactory
+                testingLeaderElectionDriverFactory =
+                        new 
TestingLeaderElectionDriver.TestingLeaderElectionDriverFactory();
+
+        // we need to use DefaultLeaderElectionService here because 
JobMasterServiceLeadershipRunner
+        // in connection with the DefaultLeaderElectionService generates the 
nested locking
+        final LeaderElectionService defaultLeaderElectionService =
+                new 
DefaultLeaderElectionService(testingLeaderElectionDriverFactory);
+
+        // latch to detect when we reached the first synchronized section 
having a lock on the
+        // JobMasterServiceProcess#stop side
+        final OneShotLatch closeAsyncCalledTrigger = new OneShotLatch();
+        // latch to halt the JobMasterServiceProcess#stop before calling stop 
on the
+        // DefaultLeaderElectionService instance (and entering the 
LeaderElectionService's
+        // synchronized block)
+        final OneShotLatch triggerClassLoaderLeaseRelease = new OneShotLatch();
+
+        final JobMasterServiceProcess jobMasterServiceProcess =
+                TestingJobMasterServiceProcess.newBuilder()
+                        
.setGetJobMasterGatewayFutureSupplier(CompletableFuture::new)
+                        .setGetResultFutureSupplier(CompletableFuture::new)
+                        .setGetLeaderAddressFutureSupplier(
+                                () -> 
CompletableFuture.completedFuture("unused address"))
+                        .setCloseAsyncSupplier(
+                                () -> {
+                                    closeAsyncCalledTrigger.trigger();
+                                    // we have to return a completed future 
because we need the
+                                    // follow-up task to run in the calling 
thread to make the
+                                    // follow-up logic block be executed in 
the synchronized block

Review Comment:
   ```suggestion
                                       // follow-up code block being executed 
in the synchronized block
   ```



##########
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterServiceLeadershipRunner.java:
##########
@@ -151,10 +155,12 @@ public CompletableFuture<Void> closeAsync() {
                 FutureUtils.forward(serviceTerminationFuture, 
terminationFuture);
 
                 terminationFuture.whenComplete(
-                        (unused, throwable) ->
-                                LOG.debug(
-                                        "Leadership runner for job {} has been 
terminated.",
-                                        getJobID()));
+                        (unused, throwable) -> {
+                            LOG.debug(
+                                    "Leadership runner for job {} has been 
terminated.",
+                                    getJobID());
+                            handleLeaderEventExecutor.shutdown();
+                        });

Review Comment:
   Could we make the resulting `CompletableFuture` of this call being returned 
by `closeAsync`. Shutting down the executor is part of the closing procedure 
and, therefore, should be reflected in the method's result.



##########
flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/TestingJobMasterServiceProcess.java:
##########
@@ -20,124 +20,114 @@
 
 import org.apache.flink.runtime.jobmaster.utils.TestingJobMasterGatewayBuilder;
 
-import javax.annotation.Nullable;
-
 import java.util.concurrent.CompletableFuture;
+import java.util.function.Supplier;
 
 /** Testing implementation of {@link JobMasterServiceProcess}. */
 public class TestingJobMasterServiceProcess implements JobMasterServiceProcess 
{
 
-    private final CompletableFuture<JobMasterGateway> jobMasterGatewayFuture;
-
-    private final CompletableFuture<JobManagerRunnerResult> 
jobManagerRunnerResultFuture;
-
-    private final CompletableFuture<String> leaderAddressFuture;
-
-    private final boolean isInitialized;
-
-    private final CompletableFuture<Void> terminationFuture;
-
-    private final boolean manualTerminationFutureCompletion;
-
-    private TestingJobMasterServiceProcess(
-            CompletableFuture<JobMasterGateway> jobMasterGatewayFuture,
-            CompletableFuture<JobManagerRunnerResult> 
jobManagerRunnerResultFuture,
-            CompletableFuture<String> leaderAddressFuture,
-            boolean isInitialized,
-            CompletableFuture<Void> terminationFuture,
-            boolean manualTerminationFutureCompletion) {
-        this.jobMasterGatewayFuture = jobMasterGatewayFuture;
-        this.jobManagerRunnerResultFuture = jobManagerRunnerResultFuture;
-        this.leaderAddressFuture = leaderAddressFuture;
-        this.isInitialized = isInitialized;
-        this.terminationFuture = terminationFuture;
-        this.manualTerminationFutureCompletion = 
manualTerminationFutureCompletion;
+    private final Supplier<CompletableFuture<Void>> closeAsyncSupplier;
+    private final Supplier<Boolean> isInitializedAndRunningSupplier;
+    private final Supplier<CompletableFuture<JobMasterGateway>> 
getJobMasterGatewayFutureSupplier;
+    private final Supplier<CompletableFuture<JobManagerRunnerResult>> 
getResultFutureSupplier;
+    private final Supplier<CompletableFuture<String>> 
getLeaderAddressFutureSupplier;
+
+    public TestingJobMasterServiceProcess(

Review Comment:
   ```suggestion
       private TestingJobMasterServiceProcess(
   ```
   Intellij rightfully claimed that `private` is good enough here.



##########
flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterServiceLeadershipRunnerTest.java:
##########
@@ -93,48 +87,47 @@ public class JobMasterServiceLeadershipRunnerTest extends 
TestLogger {
 
     private JobResultStore jobResultStore;
 
-    @BeforeClass
-    public static void setupClass() {
+    @BeforeAll
+    static void setupClass() {
 
         final JobVertex jobVertex = new JobVertex("Test vertex");
         jobVertex.setInvokableClass(NoOpInvokable.class);
         jobGraph = JobGraphTestUtils.streamingJobGraph(jobVertex);
     }
 
-    @Before
-    public void setup() {
+    @BeforeEach
+    void setup() {
         leaderElectionService = new TestingLeaderElectionService();
         jobResultStore = new EmbeddedJobResultStore();
         fatalErrorHandler = new TestingFatalErrorHandler();
     }
 
-    @After
-    public void tearDown() throws Exception {
+    @AfterEach
+    void tearDown() throws Exception {
         fatalErrorHandler.rethrowError();
     }
 
     @Test
-    public void testShutDownSignalsJobAsNotFinished() throws Exception {
+    void testShutDownSignalsJobAsNotFinished() throws Exception {
         try (JobManagerRunner jobManagerRunner =
                 newJobMasterServiceLeadershipRunnerBuilder().build()) {
             jobManagerRunner.start();
 
             final CompletableFuture<JobManagerRunnerResult> resultFuture =
                     jobManagerRunner.getResultFuture();
 
-            assertThat(resultFuture.isDone(), is(false));
+            assertThat(resultFuture).isNotDone();
 
             jobManagerRunner.closeAsync();
 
             assertJobNotFinished(resultFuture);
-            assertThat(
-                    jobManagerRunner.getJobMasterGateway(),
-                    
FlinkMatchers.futureWillCompleteExceptionally(Duration.ofMillis(5L)));
+            assertThat(jobManagerRunner.getJobMasterGateway())
+                    .failsWithin(5L, TimeUnit.MILLISECONDS);

Review Comment:
   We agreed on not relying on timeouts in JUnit tests anymore to enable the 
thread dump and prevent test instabilities (see [Flink's Common Code 
Guidelines](https://flink.apache.org/contributing/code-style-and-quality-common.html#avoid-timeouts-in-junit-tests)).
 This also applies to the other test implementations if you want to migrate 
this test class as part of this PR.
   It would be also reasonable to not touch the JUnit5 migration here. I didn't 
check whether you actually did it to use some JUnit5 feature in that test. That 
might help focusing on the actual topic of the PR. ...considering that it's not 
that straight-forward.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to