XComp commented on code in PR #20990: URL: https://github.com/apache/flink/pull/20990#discussion_r1296124024
########## flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/JavaProgramTestBaseJUnit5.java: ########## @@ -0,0 +1,210 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.test.util; + +import org.apache.flink.api.common.JobExecutionResult; +import org.apache.flink.api.java.ExecutionEnvironment; + +import org.junit.jupiter.api.Test; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.AssertionsForClassTypes.fail; + +/** + * Base class for unit tests that run a single test with object reuse enabled/disabled and against + * collection environments. + * + * <p>To write a unit test against this test base, simply extend it and implement the {@link + * #testProgram()} method. + * + * <p>To skip the execution against collection environments you have to override {@link + * #skipCollectionExecution()}. + */ +public abstract class JavaProgramTestBaseJUnit5 extends AbstractTestBaseJUnit5 { + + private JobExecutionResult latestExecutionResult; + + /** + * The number of times a test should be repeated. + * + * <p>This is useful for runtime changes, which affect resource management. 
Running certain + * tests repeatedly might help to discover resource leaks, race conditions etc. + */ + private int numberOfTestRepetitions = 1; + + private boolean isCollectionExecution; + + public void setNumberOfTestRepetitions(int numberOfTestRepetitions) { + this.numberOfTestRepetitions = numberOfTestRepetitions; + } + + public int getParallelism() { + return isCollectionExecution ? 1 : MINI_CLUSTER_EXTENSION.getNumberSlots(); + } + + public JobExecutionResult getLatestExecutionResult() { + return this.latestExecutionResult; + } + + public boolean isCollectionExecution() { + return isCollectionExecution; + } + + // -------------------------------------------------------------------------------------------- + // Methods to create the test program and for pre- and post- test work + // -------------------------------------------------------------------------------------------- + + protected abstract void testProgram() throws Exception; + + protected void preSubmit() throws Exception {} + + protected void postSubmit() throws Exception {} + + protected boolean skipCollectionExecution() { + return false; + } + + // -------------------------------------------------------------------------------------------- + // Test entry point + // -------------------------------------------------------------------------------------------- + + @Test + public void testJobWithObjectReuse() { + isCollectionExecution = false; + + // pre-submit + try { + preSubmit(); + } catch (Exception e) { + System.err.println(e.getMessage()); + e.printStackTrace(); + fail("Pre-submit work caused an error: " + e.getMessage()); + } + + // This only works because the underlying ExecutionEnvironment is a TestEnvironment + // We should fix that we are able to get access to the latest execution result from a + // different + // execution environment and how the object reuse mode is enabled + TestEnvironment env = MINI_CLUSTER_EXTENSION.getTestEnvironment(); + env.getConfig().enableObjectReuse(); + + // 
Possibly run the test multiple times + executeProgramMultipleTimes(env); + } + + private void executeProgramMultipleTimes(ExecutionEnvironment env) { + for (int i = 0; i < numberOfTestRepetitions; i++) { + // call the test program + try { + testProgram(); + this.latestExecutionResult = env.getLastJobExecutionResult(); + } catch (Exception e) { + System.err.println(e.getMessage()); + e.printStackTrace(); + fail("Error while calling the test program: " + e.getMessage()); + } + + assertThat(this.latestExecutionResult) + .as("The test program never triggered an execution.") + .isNotNull(); + } + + // post-submit + try { + postSubmit(); + } catch (Exception e) { + System.err.println(e.getMessage()); + e.printStackTrace(); + fail("Post-submit work caused an error: " + e.getMessage()); + } Review Comment: ```suggestion postSubmit(); ``` Same here: The try/catch is obsolete. ########## flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/JavaProgramTestBaseJUnit5.java: ########## @@ -0,0 +1,210 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.test.util; + +import org.apache.flink.api.common.JobExecutionResult; +import org.apache.flink.api.java.ExecutionEnvironment; + +import org.junit.jupiter.api.Test; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.AssertionsForClassTypes.fail; + +/** + * Base class for unit tests that run a single test with object reuse enabled/disabled and against + * collection environments. + * + * <p>To write a unit test against this test base, simply extend it and implement the {@link + * #testProgram()} method. + * + * <p>To skip the execution against collection environments you have to override {@link + * #skipCollectionExecution()}. + */ +public abstract class JavaProgramTestBaseJUnit5 extends AbstractTestBaseJUnit5 { + + private JobExecutionResult latestExecutionResult; + + /** + * The number of times a test should be repeated. + * + * <p>This is useful for runtime changes, which affect resource management. Running certain + * tests repeatedly might help to discover resource leaks, race conditions etc. + */ + private int numberOfTestRepetitions = 1; + + private boolean isCollectionExecution; + + public void setNumberOfTestRepetitions(int numberOfTestRepetitions) { + this.numberOfTestRepetitions = numberOfTestRepetitions; + } + + public int getParallelism() { + return isCollectionExecution ? 
1 : MINI_CLUSTER_EXTENSION.getNumberSlots(); + } + + public JobExecutionResult getLatestExecutionResult() { + return this.latestExecutionResult; + } + + public boolean isCollectionExecution() { + return isCollectionExecution; + } + + // -------------------------------------------------------------------------------------------- + // Methods to create the test program and for pre- and post- test work + // -------------------------------------------------------------------------------------------- + + protected abstract void testProgram() throws Exception; + + protected void preSubmit() throws Exception {} + + protected void postSubmit() throws Exception {} + + protected boolean skipCollectionExecution() { + return false; + } + + // -------------------------------------------------------------------------------------------- + // Test entry point + // -------------------------------------------------------------------------------------------- + + @Test + public void testJobWithObjectReuse() { + isCollectionExecution = false; + + // pre-submit + try { + preSubmit(); + } catch (Exception e) { + System.err.println(e.getMessage()); + e.printStackTrace(); + fail("Pre-submit work caused an error: " + e.getMessage()); Review Comment: What's the purpose of printing the stacktrace and the message to stderr? Shouldn't it be enough to remove the try/catch block. The error will be forwarded to JUnit if it happens and the test will fail, anyway. ########## flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/JavaProgramTestBaseJUnit5.java: ########## @@ -0,0 +1,210 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.test.util; + +import org.apache.flink.api.common.JobExecutionResult; +import org.apache.flink.api.java.ExecutionEnvironment; + +import org.junit.jupiter.api.Test; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.AssertionsForClassTypes.fail; + +/** + * Base class for unit tests that run a single test with object reuse enabled/disabled and against + * collection environments. + * + * <p>To write a unit test against this test base, simply extend it and implement the {@link + * #testProgram()} method. + * + * <p>To skip the execution against collection environments you have to override {@link + * #skipCollectionExecution()}. + */ +public abstract class JavaProgramTestBaseJUnit5 extends AbstractTestBaseJUnit5 { + + private JobExecutionResult latestExecutionResult; + + /** + * The number of times a test should be repeated. + * + * <p>This is useful for runtime changes, which affect resource management. Running certain + * tests repeatedly might help to discover resource leaks, race conditions etc. + */ + private int numberOfTestRepetitions = 1; + + private boolean isCollectionExecution; + + public void setNumberOfTestRepetitions(int numberOfTestRepetitions) { + this.numberOfTestRepetitions = numberOfTestRepetitions; + } + + public int getParallelism() { + return isCollectionExecution ? 
1 : MINI_CLUSTER_EXTENSION.getNumberSlots(); + } + + public JobExecutionResult getLatestExecutionResult() { + return this.latestExecutionResult; + } + + public boolean isCollectionExecution() { + return isCollectionExecution; + } + + // -------------------------------------------------------------------------------------------- + // Methods to create the test program and for pre- and post- test work + // -------------------------------------------------------------------------------------------- + + protected abstract void testProgram() throws Exception; + + protected void preSubmit() throws Exception {} + + protected void postSubmit() throws Exception {} + + protected boolean skipCollectionExecution() { + return false; + } Review Comment: What's the purpose of this method? It's never used as far as I can see. Looks like it's coming from the old implementation where it is overwritten in two deriving subclasses. :thinking: I'm not sure whether we can get rid of this class entirely. But it feels like it's an overly complex setup for parameterized tests. Could we look into whether we can do it more the JUnit5 way? ########## flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/AbstractTestBaseJUnit5.java: ########## @@ -0,0 +1,125 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.test.util; + +import org.apache.flink.client.program.MiniClusterClient; +import org.apache.flink.runtime.client.JobStatusMessage; +import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration; +import org.apache.flink.test.junit5.InjectClusterClient; +import org.apache.flink.test.junit5.MiniClusterExtension; +import org.apache.flink.util.FileUtils; + +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.extension.RegisterExtension; +import org.junit.jupiter.api.io.TempDir; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.IOException; + +/** + * Base class for unit tests that run multiple tests and want to reuse the same Flink cluster. This + * saves a significant amount of time, since the startup and shutdown of the Flink clusters + * (including actor systems, etc) usually dominates the execution of the actual tests. + * + * <p>To write a unit test against this test base, simply extend it and add one or more regular test + * methods and retrieve the StreamExecutionEnvironment from the context: + * + * <pre> + * {@literal @}Test + * public void someTest() { + * ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + * // test code + * env.execute(); + * } + * + * {@literal @}Test + * public void anotherTest() { + * StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + * // test code + * env.execute(); + * } + * + * </pre> + */ +public abstract class AbstractTestBaseJUnit5 { Review Comment: I guess, @1996fanrui proposal sounds reasonable. But looking at the newly added classes: * `AbstractTestBase(Junit5)`: This class contains a `@AfterEach` method for cancelling any still running jobs. This should be done in the `MiniClusterExtension`, instead. We don't need an extra class for it. 
Additionally, it provides helper methods for temporary folders. JUnit5 has [@TempDir](https://www.baeldung.com/junit-5-temporary-directory) support. You can inject a temporary directory through a test method's parameters or as a class member. That should make `AbstractTestBaseJUnit5` obsolete. WDYT? * `MultipleProgramsTestBaseJUnit5` looks like it can be transformed into an extension as well. I added a separate comment for `JavaProgramTestBaseJUnit5` further down, in the class itself. I didn't do a thorough review because two reviewers, @1996fanrui and @Samrat002, have already volunteered. ########## flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/JavaProgramTestBaseJUnit5.java: ########## @@ -0,0 +1,210 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.flink.test.util; + +import org.apache.flink.api.common.JobExecutionResult; +import org.apache.flink.api.java.ExecutionEnvironment; + +import org.junit.jupiter.api.Test; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.AssertionsForClassTypes.fail; + +/** + * Base class for unit tests that run a single test with object reuse enabled/disabled and against + * collection environments. + * + * <p>To write a unit test against this test base, simply extend it and implement the {@link + * #testProgram()} method. + * + * <p>To skip the execution against collection environments you have to override {@link + * #skipCollectionExecution()}. + */ +public abstract class JavaProgramTestBaseJUnit5 extends AbstractTestBaseJUnit5 { + + private JobExecutionResult latestExecutionResult; + + /** + * The number of times a test should be repeated. + * + * <p>This is useful for runtime changes, which affect resource management. Running certain + * tests repeatedly might help to discover resource leaks, race conditions etc. + */ + private int numberOfTestRepetitions = 1; + + private boolean isCollectionExecution; + + public void setNumberOfTestRepetitions(int numberOfTestRepetitions) { + this.numberOfTestRepetitions = numberOfTestRepetitions; + } + + public int getParallelism() { + return isCollectionExecution ? 
1 : MINI_CLUSTER_EXTENSION.getNumberSlots(); + } + + public JobExecutionResult getLatestExecutionResult() { + return this.latestExecutionResult; + } + + public boolean isCollectionExecution() { + return isCollectionExecution; + } + + // -------------------------------------------------------------------------------------------- + // Methods to create the test program and for pre- and post- test work + // -------------------------------------------------------------------------------------------- + + protected abstract void testProgram() throws Exception; + + protected void preSubmit() throws Exception {} + + protected void postSubmit() throws Exception {} + + protected boolean skipCollectionExecution() { + return false; + } + + // -------------------------------------------------------------------------------------------- + // Test entry point + // -------------------------------------------------------------------------------------------- + + @Test + public void testJobWithObjectReuse() { + isCollectionExecution = false; + + // pre-submit + try { + preSubmit(); + } catch (Exception e) { + System.err.println(e.getMessage()); + e.printStackTrace(); + fail("Pre-submit work caused an error: " + e.getMessage()); + } + + // This only works because the underlying ExecutionEnvironment is a TestEnvironment + // We should fix that we are able to get access to the latest execution result from a + // different + // execution environment and how the object reuse mode is enabled + TestEnvironment env = MINI_CLUSTER_EXTENSION.getTestEnvironment(); + env.getConfig().enableObjectReuse(); + + // Possibly run the test multiple times + executeProgramMultipleTimes(env); + } + + private void executeProgramMultipleTimes(ExecutionEnvironment env) { + for (int i = 0; i < numberOfTestRepetitions; i++) { + // call the test program + try { + testProgram(); + this.latestExecutionResult = env.getLastJobExecutionResult(); + } catch (Exception e) { + System.err.println(e.getMessage()); + 
e.printStackTrace(); + fail("Error while calling the test program: " + e.getMessage()); + } + + assertThat(this.latestExecutionResult) + .as("The test program never triggered an execution.") + .isNotNull(); + } + + // post-submit + try { + postSubmit(); + } catch (Exception e) { + System.err.println(e.getMessage()); + e.printStackTrace(); + fail("Post-submit work caused an error: " + e.getMessage()); + } Review Comment: This applies to other test implementations as well. I won't mark each occurrence... ########## flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/hadoop/mapred/wrapper/HadoopInputSplitTest.java: ########## @@ -62,45 +63,45 @@ private void testInner( serializeSizeChecker.accept(bytes.length); split = InstantiationUtil.deserializeObject(bytes, split.getClass().getClassLoader()); - Assert.assertEquals(5, split.getSplitNumber()); - Assert.assertArrayEquals(new String[] {"host0"}, split.getHostnames()); + assertThat(split.getSplitNumber()).isEqualTo(5); + assertThat(split.getHostnames()).containsExactly("host0"); splitChecker.accept(split.getHadoopInputSplit()); } @Test - public void testFileSplit() throws IOException, ClassNotFoundException { + void testFileSplit() throws IOException, ClassNotFoundException { FileSplit fileSplit = new FileSplit(new Path("/test"), 0, 100, new String[] {"host0"}); testInner( fileSplit, - i -> Assert.assertTrue(i < 10000), - split -> Assert.assertEquals(fileSplit, split)); + i -> assertThat(i < 10000).isTrue(), Review Comment: ```suggestion i -> assertThat(i).isLessThan(10000), ``` nit: just an FYI. Yours is fine as well. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org