XComp commented on code in PR #20990:
URL: https://github.com/apache/flink/pull/20990#discussion_r1296124024


##########
flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/JavaProgramTestBaseJUnit5.java:
##########
@@ -0,0 +1,210 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.util;
+
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.java.ExecutionEnvironment;
+
+import org.junit.jupiter.api.Test;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.AssertionsForClassTypes.fail;
+
+/**
+ * Base class for unit tests that run a single test with object reuse 
enabled/disabled and against
+ * collection environments.
+ *
+ * <p>To write a unit test against this test base, simply extend it and 
implement the {@link
+ * #testProgram()} method.
+ *
+ * <p>To skip the execution against collection environments you have to 
override {@link
+ * #skipCollectionExecution()}.
+ */
+public abstract class JavaProgramTestBaseJUnit5 extends AbstractTestBaseJUnit5 
{
+
+    private JobExecutionResult latestExecutionResult;
+
+    /**
+     * The number of times a test should be repeated.
+     *
+     * <p>This is useful for runtime changes, which affect resource 
management. Running certain
+     * tests repeatedly might help to discover resource leaks, race conditions 
etc.
+     */
+    private int numberOfTestRepetitions = 1;
+
+    private boolean isCollectionExecution;
+
+    public void setNumberOfTestRepetitions(int numberOfTestRepetitions) {
+        this.numberOfTestRepetitions = numberOfTestRepetitions;
+    }
+
+    public int getParallelism() {
+        return isCollectionExecution ? 1 : 
MINI_CLUSTER_EXTENSION.getNumberSlots();
+    }
+
+    public JobExecutionResult getLatestExecutionResult() {
+        return this.latestExecutionResult;
+    }
+
+    public boolean isCollectionExecution() {
+        return isCollectionExecution;
+    }
+
+    // 
--------------------------------------------------------------------------------------------
+    //  Methods to create the test program and for pre- and post- test work
+    // 
--------------------------------------------------------------------------------------------
+
+    protected abstract void testProgram() throws Exception;
+
+    protected void preSubmit() throws Exception {}
+
+    protected void postSubmit() throws Exception {}
+
+    protected boolean skipCollectionExecution() {
+        return false;
+    }
+
+    // 
--------------------------------------------------------------------------------------------
+    //  Test entry point
+    // 
--------------------------------------------------------------------------------------------
+
+    @Test
+    public void testJobWithObjectReuse() {
+        isCollectionExecution = false;
+
+        // pre-submit
+        try {
+            preSubmit();
+        } catch (Exception e) {
+            System.err.println(e.getMessage());
+            e.printStackTrace();
+            fail("Pre-submit work caused an error: " + e.getMessage());
+        }
+
+        // This only works because the underlying ExecutionEnvironment is a 
TestEnvironment
+        // We should fix that we are able to get access to the latest 
execution result from a
+        // different
+        // execution environment and how the object reuse mode is enabled
+        TestEnvironment env = MINI_CLUSTER_EXTENSION.getTestEnvironment();
+        env.getConfig().enableObjectReuse();
+
+        // Possibly run the test multiple times
+        executeProgramMultipleTimes(env);
+    }
+
+    private void executeProgramMultipleTimes(ExecutionEnvironment env) {
+        for (int i = 0; i < numberOfTestRepetitions; i++) {
+            // call the test program
+            try {
+                testProgram();
+                this.latestExecutionResult = env.getLastJobExecutionResult();
+            } catch (Exception e) {
+                System.err.println(e.getMessage());
+                e.printStackTrace();
+                fail("Error while calling the test program: " + 
e.getMessage());
+            }
+
+            assertThat(this.latestExecutionResult)
+                    .as("The test program never triggered an execution.")
+                    .isNotNull();
+        }
+
+        // post-submit
+        try {
+            postSubmit();
+        } catch (Exception e) {
+            System.err.println(e.getMessage());
+            e.printStackTrace();
+            fail("Post-submit work caused an error: " + e.getMessage());
+        }

Review Comment:
   ```suggestion
         postSubmit();
   ```
   Same here: The try/catch is obsolete.



##########
flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/JavaProgramTestBaseJUnit5.java:
##########
@@ -0,0 +1,210 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.util;
+
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.java.ExecutionEnvironment;
+
+import org.junit.jupiter.api.Test;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.AssertionsForClassTypes.fail;
+
+/**
+ * Base class for unit tests that run a single test with object reuse 
enabled/disabled and against
+ * collection environments.
+ *
+ * <p>To write a unit test against this test base, simply extend it and 
implement the {@link
+ * #testProgram()} method.
+ *
+ * <p>To skip the execution against collection environments you have to 
override {@link
+ * #skipCollectionExecution()}.
+ */
+public abstract class JavaProgramTestBaseJUnit5 extends AbstractTestBaseJUnit5 
{
+
+    private JobExecutionResult latestExecutionResult;
+
+    /**
+     * The number of times a test should be repeated.
+     *
+     * <p>This is useful for runtime changes, which affect resource 
management. Running certain
+     * tests repeatedly might help to discover resource leaks, race conditions 
etc.
+     */
+    private int numberOfTestRepetitions = 1;
+
+    private boolean isCollectionExecution;
+
+    public void setNumberOfTestRepetitions(int numberOfTestRepetitions) {
+        this.numberOfTestRepetitions = numberOfTestRepetitions;
+    }
+
+    public int getParallelism() {
+        return isCollectionExecution ? 1 : 
MINI_CLUSTER_EXTENSION.getNumberSlots();
+    }
+
+    public JobExecutionResult getLatestExecutionResult() {
+        return this.latestExecutionResult;
+    }
+
+    public boolean isCollectionExecution() {
+        return isCollectionExecution;
+    }
+
+    // 
--------------------------------------------------------------------------------------------
+    //  Methods to create the test program and for pre- and post- test work
+    // 
--------------------------------------------------------------------------------------------
+
+    protected abstract void testProgram() throws Exception;
+
+    protected void preSubmit() throws Exception {}
+
+    protected void postSubmit() throws Exception {}
+
+    protected boolean skipCollectionExecution() {
+        return false;
+    }
+
+    // 
--------------------------------------------------------------------------------------------
+    //  Test entry point
+    // 
--------------------------------------------------------------------------------------------
+
+    @Test
+    public void testJobWithObjectReuse() {
+        isCollectionExecution = false;
+
+        // pre-submit
+        try {
+            preSubmit();
+        } catch (Exception e) {
+            System.err.println(e.getMessage());
+            e.printStackTrace();
+            fail("Pre-submit work caused an error: " + e.getMessage());

Review Comment:
   What's the purpose of printing the stacktrace and the message to stderr? 
Shouldn't it be enough to remove the try/catch block. The error will be 
forwarded to JUnit if it happens and the test will fail, anyway.



##########
flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/JavaProgramTestBaseJUnit5.java:
##########
@@ -0,0 +1,210 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.util;
+
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.java.ExecutionEnvironment;
+
+import org.junit.jupiter.api.Test;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.AssertionsForClassTypes.fail;
+
+/**
+ * Base class for unit tests that run a single test with object reuse 
enabled/disabled and against
+ * collection environments.
+ *
+ * <p>To write a unit test against this test base, simply extend it and 
implement the {@link
+ * #testProgram()} method.
+ *
+ * <p>To skip the execution against collection environments you have to 
override {@link
+ * #skipCollectionExecution()}.
+ */
+public abstract class JavaProgramTestBaseJUnit5 extends AbstractTestBaseJUnit5 
{
+
+    private JobExecutionResult latestExecutionResult;
+
+    /**
+     * The number of times a test should be repeated.
+     *
+     * <p>This is useful for runtime changes, which affect resource 
management. Running certain
+     * tests repeatedly might help to discover resource leaks, race conditions 
etc.
+     */
+    private int numberOfTestRepetitions = 1;
+
+    private boolean isCollectionExecution;
+
+    public void setNumberOfTestRepetitions(int numberOfTestRepetitions) {
+        this.numberOfTestRepetitions = numberOfTestRepetitions;
+    }
+
+    public int getParallelism() {
+        return isCollectionExecution ? 1 : 
MINI_CLUSTER_EXTENSION.getNumberSlots();
+    }
+
+    public JobExecutionResult getLatestExecutionResult() {
+        return this.latestExecutionResult;
+    }
+
+    public boolean isCollectionExecution() {
+        return isCollectionExecution;
+    }
+
+    // 
--------------------------------------------------------------------------------------------
+    //  Methods to create the test program and for pre- and post- test work
+    // 
--------------------------------------------------------------------------------------------
+
+    protected abstract void testProgram() throws Exception;
+
+    protected void preSubmit() throws Exception {}
+
+    protected void postSubmit() throws Exception {}
+
+    protected boolean skipCollectionExecution() {
+        return false;
+    }

Review Comment:
   What's the purpose of this method? It's never used as far as I can see. 
Looks like it's coming from the old implementation where it is overwritten in 
two deriving subclasses. :thinking:
   
   I'm not sure whether we can get rid of this class entirely. But it feels 
like it's an overly complex setup for parameterized tests. Could we look into 
whether we can do it more the JUnit5 way?



##########
flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/AbstractTestBaseJUnit5.java:
##########
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.util;
+
+import org.apache.flink.client.program.MiniClusterClient;
+import org.apache.flink.runtime.client.JobStatusMessage;
+import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
+import org.apache.flink.test.junit5.InjectClusterClient;
+import org.apache.flink.test.junit5.MiniClusterExtension;
+import org.apache.flink.util.FileUtils;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.extension.RegisterExtension;
+import org.junit.jupiter.api.io.TempDir;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+
+/**
+ * Base class for unit tests that run multiple tests and want to reuse the 
same Flink cluster. This
+ * saves a significant amount of time, since the startup and shutdown of the 
Flink clusters
+ * (including actor systems, etc) usually dominates the execution of the 
actual tests.
+ *
+ * <p>To write a unit test against this test base, simply extend it and add 
one or more regular test
+ * methods and retrieve the StreamExecutionEnvironment from the context:
+ *
+ * <pre>
+ *   {@literal @}Test
+ *   public void someTest() {
+ *       ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+ *       // test code
+ *       env.execute();
+ *   }
+ *
+ *   {@literal @}Test
+ *   public void anotherTest() {
+ *       StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+ *       // test code
+ *       env.execute();
+ *   }
+ *
+ * </pre>
+ */
+public abstract class AbstractTestBaseJUnit5 {

Review Comment:
   I guess, @1996fanrui proposal sounds reasonable. But looking at the newly 
added classes:
   * `AbstractTestBase(Junit5)`: This class contains a `@AfterEach` method for 
cancelling any still running jobs. This should be done in the 
`MiniClusterExtension`, instead. We don't need an extra class for it. 
Additionally, it provides helper methods for temporary folders. JUnit5 has the 
[@TempDir](https://www.baeldung.com/junit-5-temporary-directory) support. You 
can inject a temporary directory through the test methods parameters or as a 
class member. That should make `AbstractTestBaseJUnit5` obsolete. WDYT?
   * `MultipleProgramsTestBaseJUnit5` looks like it can be transformed into an 
extension as well.
   
   I added a separate comment for `JavaProgramTestBaseJUnit5` further in the 
class itself.
   
   I didn't do a thorough review because two @1996fanrui and @Samrat002 
volunteered already.



##########
flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/JavaProgramTestBaseJUnit5.java:
##########
@@ -0,0 +1,210 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.util;
+
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.java.ExecutionEnvironment;
+
+import org.junit.jupiter.api.Test;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.AssertionsForClassTypes.fail;
+
+/**
+ * Base class for unit tests that run a single test with object reuse 
enabled/disabled and against
+ * collection environments.
+ *
+ * <p>To write a unit test against this test base, simply extend it and 
implement the {@link
+ * #testProgram()} method.
+ *
+ * <p>To skip the execution against collection environments you have to 
override {@link
+ * #skipCollectionExecution()}.
+ */
+public abstract class JavaProgramTestBaseJUnit5 extends AbstractTestBaseJUnit5 
{
+
+    private JobExecutionResult latestExecutionResult;
+
+    /**
+     * The number of times a test should be repeated.
+     *
+     * <p>This is useful for runtime changes, which affect resource 
management. Running certain
+     * tests repeatedly might help to discover resource leaks, race conditions 
etc.
+     */
+    private int numberOfTestRepetitions = 1;
+
+    private boolean isCollectionExecution;
+
+    public void setNumberOfTestRepetitions(int numberOfTestRepetitions) {
+        this.numberOfTestRepetitions = numberOfTestRepetitions;
+    }
+
+    public int getParallelism() {
+        return isCollectionExecution ? 1 : 
MINI_CLUSTER_EXTENSION.getNumberSlots();
+    }
+
+    public JobExecutionResult getLatestExecutionResult() {
+        return this.latestExecutionResult;
+    }
+
+    public boolean isCollectionExecution() {
+        return isCollectionExecution;
+    }
+
+    // 
--------------------------------------------------------------------------------------------
+    //  Methods to create the test program and for pre- and post- test work
+    // 
--------------------------------------------------------------------------------------------
+
+    protected abstract void testProgram() throws Exception;
+
+    protected void preSubmit() throws Exception {}
+
+    protected void postSubmit() throws Exception {}
+
+    protected boolean skipCollectionExecution() {
+        return false;
+    }
+
+    // 
--------------------------------------------------------------------------------------------
+    //  Test entry point
+    // 
--------------------------------------------------------------------------------------------
+
+    @Test
+    public void testJobWithObjectReuse() {
+        isCollectionExecution = false;
+
+        // pre-submit
+        try {
+            preSubmit();
+        } catch (Exception e) {
+            System.err.println(e.getMessage());
+            e.printStackTrace();
+            fail("Pre-submit work caused an error: " + e.getMessage());
+        }
+
+        // This only works because the underlying ExecutionEnvironment is a 
TestEnvironment
+        // We should fix that we are able to get access to the latest 
execution result from a
+        // different
+        // execution environment and how the object reuse mode is enabled
+        TestEnvironment env = MINI_CLUSTER_EXTENSION.getTestEnvironment();
+        env.getConfig().enableObjectReuse();
+
+        // Possibly run the test multiple times
+        executeProgramMultipleTimes(env);
+    }
+
+    private void executeProgramMultipleTimes(ExecutionEnvironment env) {
+        for (int i = 0; i < numberOfTestRepetitions; i++) {
+            // call the test program
+            try {
+                testProgram();
+                this.latestExecutionResult = env.getLastJobExecutionResult();
+            } catch (Exception e) {
+                System.err.println(e.getMessage());
+                e.printStackTrace();
+                fail("Error while calling the test program: " + 
e.getMessage());
+            }
+
+            assertThat(this.latestExecutionResult)
+                    .as("The test program never triggered an execution.")
+                    .isNotNull();
+        }
+
+        // post-submit
+        try {
+            postSubmit();
+        } catch (Exception e) {
+            System.err.println(e.getMessage());
+            e.printStackTrace();
+            fail("Post-submit work caused an error: " + e.getMessage());
+        }

Review Comment:
   This applies to other test implementations as well. I won't mark each 
occurrence...



##########
flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/hadoop/mapred/wrapper/HadoopInputSplitTest.java:
##########
@@ -62,45 +63,45 @@ private void testInner(
         serializeSizeChecker.accept(bytes.length);
 
         split = InstantiationUtil.deserializeObject(bytes, 
split.getClass().getClassLoader());
-        Assert.assertEquals(5, split.getSplitNumber());
-        Assert.assertArrayEquals(new String[] {"host0"}, split.getHostnames());
+        assertThat(split.getSplitNumber()).isEqualTo(5);
+        assertThat(split.getHostnames()).containsExactly("host0");
         splitChecker.accept(split.getHadoopInputSplit());
     }
 
     @Test
-    public void testFileSplit() throws IOException, ClassNotFoundException {
+    void testFileSplit() throws IOException, ClassNotFoundException {
         FileSplit fileSplit = new FileSplit(new Path("/test"), 0, 100, new 
String[] {"host0"});
         testInner(
                 fileSplit,
-                i -> Assert.assertTrue(i < 10000),
-                split -> Assert.assertEquals(fileSplit, split));
+                i -> assertThat(i < 10000).isTrue(),

Review Comment:
   ```suggestion
                   i -> assertThat(i).isLessThan(10000);
   ```
   nit: just a fyi. your's is fine as well



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to