XComp commented on code in PR #24471: URL: https://github.com/apache/flink/pull/24471#discussion_r1541160078
########## flink-end-to-end-tests/flink-batch-sql-test/src/test/java/org/apache/flink/sql/tests/BatchSQLTest.java: ########## @@ -34,66 +35,105 @@ import org.apache.flink.table.sinks.CsvTableSink; import org.apache.flink.table.sources.InputFormatTableSource; import org.apache.flink.table.types.DataType; +import org.apache.flink.test.junit5.MiniClusterExtension; +import org.apache.flink.test.resources.ResourceTestUtils; import org.apache.flink.types.Row; +import org.apache.flink.util.TestLoggerExtension; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.extension.ExtendWith; +import org.junit.jupiter.api.extension.RegisterExtension; +import org.junit.jupiter.api.io.TempDir; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.EnumSource; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.io.Serializable; +import java.nio.file.Files; +import java.nio.file.Path; import java.time.Instant; import java.time.LocalDateTime; import java.time.ZoneOffset; import java.util.Iterator; import java.util.NoSuchElementException; +import java.util.Optional; +import java.util.UUID; -/** - * End-to-end test for batch SQL queries. - * - * <p>The sources are generated and bounded. The result is always constant. - * - * <p>Parameters: -outputPath output file path for CsvTableSink; -sqlStatement SQL statement that - * will be executed as executeSql - */ -public class BatchSQLTestProgram { +import static org.junit.jupiter.api.Assertions.assertEquals; + +/** End-to-End tests for Batch SQL tests. */ +@ExtendWith(TestLoggerExtension.class) +public class BatchSQLTest { Review Comment: ```suggestion class BatchSQLTest { ``` JUnit5 allows test classes to be package-private. This will enable you to remove the JavaDoc and still comply with checkstyle. The JavaDoc itself doesn't add much value. 
########## flink-end-to-end-tests/flink-batch-sql-test/src/test/resources/log4j2-test.properties: ########## @@ -0,0 +1,31 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# Set root logger level to OFF to not flood build logs +# set manually to INFO for debugging purposes +rootLogger.level=OFF +rootLogger.appenderRef.test.ref=TestLogger +appender.testlogger.name=TestLogger +appender.testlogger.type=CONSOLE +appender.testlogger.target=SYSTEM_ERR +appender.testlogger.layout.type=PatternLayout +appender.testlogger.layout.pattern=%-4r [%t] %-5p %c %x - %m%n +# Uncomment to enable codegen logging +#loggers = testlogger +#logger.testlogger.name =org.apache.flink.table.planner.codegen +#logger.testlogger.level = TRACE +#logger.testlogger.appenderRefs = TestLogger Review Comment: ```suggestion # Set root logger level to OFF to not flood build logs # set manually to INFO for debugging purposes rootLogger.level=OFF rootLogger.appenderRef.test.ref=TestLogger appender.testlogger.name=TestLogger appender.testlogger.type=CONSOLE appender.testlogger.target=SYSTEM_ERR appender.testlogger.layout.type=PatternLayout appender.testlogger.layout.pattern=%-4r [%t] %-5p %c %x - %m%n # Uncomment to enable codegen logging #loggers = testlogger 
#logger.testlogger.name =org.apache.flink.table.planner.codegen #logger.testlogger.level = TRACE #logger.testlogger.appenderRefs = TestLogger ``` Then let's add some empty lines in between ########## flink-end-to-end-tests/flink-batch-sql-test/src/test/java/org/apache/flink/sql/tests/BatchSQLTest.java: ########## @@ -34,66 +35,105 @@ import org.apache.flink.table.sinks.CsvTableSink; import org.apache.flink.table.sources.InputFormatTableSource; import org.apache.flink.table.types.DataType; +import org.apache.flink.test.junit5.MiniClusterExtension; +import org.apache.flink.test.resources.ResourceTestUtils; import org.apache.flink.types.Row; +import org.apache.flink.util.TestLoggerExtension; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.extension.ExtendWith; +import org.junit.jupiter.api.extension.RegisterExtension; +import org.junit.jupiter.api.io.TempDir; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.EnumSource; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.io.Serializable; +import java.nio.file.Files; +import java.nio.file.Path; import java.time.Instant; import java.time.LocalDateTime; import java.time.ZoneOffset; import java.util.Iterator; import java.util.NoSuchElementException; +import java.util.Optional; +import java.util.UUID; -/** - * End-to-end test for batch SQL queries. - * - * <p>The sources are generated and bounded. The result is always constant. - * - * <p>Parameters: -outputPath output file path for CsvTableSink; -sqlStatement SQL statement that - * will be executed as executeSql - */ -public class BatchSQLTestProgram { +import static org.junit.jupiter.api.Assertions.assertEquals; + +/** End-to-End tests for Batch SQL tests. 
*/ +@ExtendWith(TestLoggerExtension.class) +public class BatchSQLTest { + private static final Logger LOG = LoggerFactory.getLogger(BatchSQLTest.class); + + private static final Path sqlPath = + ResourceTestUtils.getResource("resources/sql-job-query.sql"); + + @TempDir private Path tmp; + + @RegisterExtension + private static final MiniClusterExtension MINI_CLUSTER = + new MiniClusterExtension( + new MiniClusterResourceConfiguration.Builder() + .setNumberTaskManagers(2) + .setNumberSlotsPerTaskManager(1) + .build()); + + private Path result; + + @BeforeEach + public void before() { + this.result = tmp.resolve(String.format("result-%s", UUID.randomUUID())); + LOG.info("Results for this test will be stored at: {}", this.result); + } + + @ParameterizedTest + @EnumSource( + value = BatchShuffleMode.class, + names = { + "ALL_EXCHANGES_BLOCKING", + "ALL_EXCHANGES_HYBRID_FULL", + "ALL_EXCHANGES_HYBRID_SELECTIVE" + }) + public void testBatchSQL(BatchShuffleMode shuffleMode) throws Exception { + String sqlStatement = new String(Files.readAllBytes(sqlPath)); - public static void main(String[] args) throws Exception { - ParameterTool params = ParameterTool.fromArgs(args); - String outputPath = params.getRequired("outputPath"); - String sqlStatement = params.getRequired("sqlStatement"); - String shuffleType = params.getRequired("shuffleType"); TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inBatchMode()); - BatchShuffleMode shuffleMode = checkAndGetShuffleMode(shuffleType); tEnv.getConfig().set(ExecutionOptions.BATCH_SHUFFLE_MODE, shuffleMode); + ((TableEnvironmentInternal) tEnv) .registerTableSourceInternal("table1", new GeneratorTableSource(10, 100, 60, 0)); ((TableEnvironmentInternal) tEnv) .registerTableSourceInternal("table2", new GeneratorTableSource(5, 0.2f, 60, 5)); ((TableEnvironmentInternal) tEnv) .registerTableSinkInternal( "sinkTable", - new CsvTableSink(outputPath) + new CsvTableSink(this.result.toString()) Review Comment: Would it be easy to 
switch from `CsvTableSink` to `FileSink` here? :thinking: ...to remove the deprecation warning. Same applies to the `InputFormatTableSource` further down in the test code. ########## flink-end-to-end-tests/flink-batch-sql-test/src/test/java/org/apache/flink/sql/tests/BatchSQLTest.java: ########## @@ -34,66 +35,105 @@ import org.apache.flink.table.sinks.CsvTableSink; import org.apache.flink.table.sources.InputFormatTableSource; import org.apache.flink.table.types.DataType; +import org.apache.flink.test.junit5.MiniClusterExtension; +import org.apache.flink.test.resources.ResourceTestUtils; import org.apache.flink.types.Row; +import org.apache.flink.util.TestLoggerExtension; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.extension.ExtendWith; +import org.junit.jupiter.api.extension.RegisterExtension; +import org.junit.jupiter.api.io.TempDir; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.EnumSource; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.io.Serializable; +import java.nio.file.Files; +import java.nio.file.Path; import java.time.Instant; import java.time.LocalDateTime; import java.time.ZoneOffset; import java.util.Iterator; import java.util.NoSuchElementException; +import java.util.Optional; +import java.util.UUID; -/** - * End-to-end test for batch SQL queries. - * - * <p>The sources are generated and bounded. The result is always constant. - * - * <p>Parameters: -outputPath output file path for CsvTableSink; -sqlStatement SQL statement that - * will be executed as executeSql - */ -public class BatchSQLTestProgram { +import static org.junit.jupiter.api.Assertions.assertEquals; + +/** End-to-End tests for Batch SQL tests. 
*/ +@ExtendWith(TestLoggerExtension.class) +public class BatchSQLTest { + private static final Logger LOG = LoggerFactory.getLogger(BatchSQLTest.class); + + private static final Path sqlPath = + ResourceTestUtils.getResource("resources/sql-job-query.sql"); + + @TempDir private Path tmp; + + @RegisterExtension + private static final MiniClusterExtension MINI_CLUSTER = + new MiniClusterExtension( + new MiniClusterResourceConfiguration.Builder() + .setNumberTaskManagers(2) + .setNumberSlotsPerTaskManager(1) + .build()); + + private Path result; + + @BeforeEach + public void before() { + this.result = tmp.resolve(String.format("result-%s", UUID.randomUUID())); + LOG.info("Results for this test will be stored at: {}", this.result); + } + + @ParameterizedTest + @EnumSource( + value = BatchShuffleMode.class, + names = { + "ALL_EXCHANGES_BLOCKING", + "ALL_EXCHANGES_HYBRID_FULL", + "ALL_EXCHANGES_HYBRID_SELECTIVE" + }) + public void testBatchSQL(BatchShuffleMode shuffleMode) throws Exception { Review Comment: ```suggestion public void testBatchSQL(BatchShuffleMode shuffleMode, @TempDir Path tmpDir) throws Exception { final Path outputFolder = tmpDir.resolve(String.format("result-%s", UUID.randomUUID())); ``` nit: We could also add the tmpDir as a parameter here. No need to have the two instance fields `result` and `tmp` since we only have a single test method. 
########## flink-end-to-end-tests/flink-batch-sql-test/src/test/java/org/apache/flink/sql/tests/BatchSQLTest.java: ########## @@ -34,66 +35,105 @@ import org.apache.flink.table.sinks.CsvTableSink; import org.apache.flink.table.sources.InputFormatTableSource; import org.apache.flink.table.types.DataType; +import org.apache.flink.test.junit5.MiniClusterExtension; +import org.apache.flink.test.resources.ResourceTestUtils; import org.apache.flink.types.Row; +import org.apache.flink.util.TestLoggerExtension; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.extension.ExtendWith; +import org.junit.jupiter.api.extension.RegisterExtension; +import org.junit.jupiter.api.io.TempDir; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.EnumSource; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.io.Serializable; +import java.nio.file.Files; +import java.nio.file.Path; import java.time.Instant; import java.time.LocalDateTime; import java.time.ZoneOffset; import java.util.Iterator; import java.util.NoSuchElementException; +import java.util.Optional; +import java.util.UUID; -/** - * End-to-end test for batch SQL queries. - * - * <p>The sources are generated and bounded. The result is always constant. - * - * <p>Parameters: -outputPath output file path for CsvTableSink; -sqlStatement SQL statement that - * will be executed as executeSql - */ -public class BatchSQLTestProgram { +import static org.junit.jupiter.api.Assertions.assertEquals; + +/** End-to-End tests for Batch SQL tests. 
*/ +@ExtendWith(TestLoggerExtension.class) +public class BatchSQLTest { + private static final Logger LOG = LoggerFactory.getLogger(BatchSQLTest.class); + + private static final Path sqlPath = + ResourceTestUtils.getResource("resources/sql-job-query.sql"); + + @TempDir private Path tmp; + + @RegisterExtension + private static final MiniClusterExtension MINI_CLUSTER = + new MiniClusterExtension( + new MiniClusterResourceConfiguration.Builder() + .setNumberTaskManagers(2) + .setNumberSlotsPerTaskManager(1) + .build()); + + private Path result; + + @BeforeEach + public void before() { + this.result = tmp.resolve(String.format("result-%s", UUID.randomUUID())); + LOG.info("Results for this test will be stored at: {}", this.result); + } + + @ParameterizedTest + @EnumSource( + value = BatchShuffleMode.class, + names = { + "ALL_EXCHANGES_BLOCKING", + "ALL_EXCHANGES_HYBRID_FULL", + "ALL_EXCHANGES_HYBRID_SELECTIVE" + }) + public void testBatchSQL(BatchShuffleMode shuffleMode) throws Exception { + String sqlStatement = new String(Files.readAllBytes(sqlPath)); - public static void main(String[] args) throws Exception { - ParameterTool params = ParameterTool.fromArgs(args); - String outputPath = params.getRequired("outputPath"); - String sqlStatement = params.getRequired("sqlStatement"); - String shuffleType = params.getRequired("shuffleType"); TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inBatchMode()); - BatchShuffleMode shuffleMode = checkAndGetShuffleMode(shuffleType); tEnv.getConfig().set(ExecutionOptions.BATCH_SHUFFLE_MODE, shuffleMode); + ((TableEnvironmentInternal) tEnv) .registerTableSourceInternal("table1", new GeneratorTableSource(10, 100, 60, 0)); ((TableEnvironmentInternal) tEnv) .registerTableSourceInternal("table2", new GeneratorTableSource(5, 0.2f, 60, 5)); ((TableEnvironmentInternal) tEnv) .registerTableSinkInternal( "sinkTable", - new CsvTableSink(outputPath) + new CsvTableSink(this.result.toString()) Review Comment: this should happen 
in a separate commit, though. ########## flink-end-to-end-tests/flink-batch-sql-test/src/test/java/org/apache/flink/sql/tests/BatchSQLTest.java: ########## @@ -34,66 +35,105 @@ import org.apache.flink.table.sinks.CsvTableSink; import org.apache.flink.table.sources.InputFormatTableSource; import org.apache.flink.table.types.DataType; +import org.apache.flink.test.junit5.MiniClusterExtension; +import org.apache.flink.test.resources.ResourceTestUtils; import org.apache.flink.types.Row; +import org.apache.flink.util.TestLoggerExtension; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.extension.ExtendWith; +import org.junit.jupiter.api.extension.RegisterExtension; +import org.junit.jupiter.api.io.TempDir; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.EnumSource; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.io.Serializable; +import java.nio.file.Files; +import java.nio.file.Path; import java.time.Instant; import java.time.LocalDateTime; import java.time.ZoneOffset; import java.util.Iterator; import java.util.NoSuchElementException; +import java.util.Optional; +import java.util.UUID; -/** - * End-to-end test for batch SQL queries. - * - * <p>The sources are generated and bounded. The result is always constant. - * - * <p>Parameters: -outputPath output file path for CsvTableSink; -sqlStatement SQL statement that - * will be executed as executeSql - */ -public class BatchSQLTestProgram { +import static org.junit.jupiter.api.Assertions.assertEquals; + +/** End-to-End tests for Batch SQL tests. 
*/ +@ExtendWith(TestLoggerExtension.class) +public class BatchSQLTest { + private static final Logger LOG = LoggerFactory.getLogger(BatchSQLTest.class); + + private static final Path sqlPath = + ResourceTestUtils.getResource("resources/sql-job-query.sql"); + + @TempDir private Path tmp; + + @RegisterExtension + private static final MiniClusterExtension MINI_CLUSTER = + new MiniClusterExtension( + new MiniClusterResourceConfiguration.Builder() + .setNumberTaskManagers(2) + .setNumberSlotsPerTaskManager(1) + .build()); + + private Path result; + + @BeforeEach + public void before() { Review Comment: ```suggestion void before() { ``` nit: test methods can be package-private in JUnit5, but that's more of an FYI because it doesn't really matter here. ########## flink-end-to-end-tests/flink-batch-sql-test/src/test/java/org/apache/flink/sql/tests/BatchSQLTest.java: ########## @@ -34,66 +35,105 @@ import org.apache.flink.table.sinks.CsvTableSink; import org.apache.flink.table.sources.InputFormatTableSource; import org.apache.flink.table.types.DataType; +import org.apache.flink.test.junit5.MiniClusterExtension; +import org.apache.flink.test.resources.ResourceTestUtils; import org.apache.flink.types.Row; +import org.apache.flink.util.TestLoggerExtension; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.extension.ExtendWith; +import org.junit.jupiter.api.extension.RegisterExtension; +import org.junit.jupiter.api.io.TempDir; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.EnumSource; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.io.Serializable; +import java.nio.file.Files; +import java.nio.file.Path; import java.time.Instant; import java.time.LocalDateTime; import java.time.ZoneOffset; import java.util.Iterator; import java.util.NoSuchElementException; +import java.util.Optional; +import java.util.UUID; -/** - * End-to-end test for batch SQL queries. 
- * - * <p>The sources are generated and bounded. The result is always constant. - * - * <p>Parameters: -outputPath output file path for CsvTableSink; -sqlStatement SQL statement that - * will be executed as executeSql - */ -public class BatchSQLTestProgram { +import static org.junit.jupiter.api.Assertions.assertEquals; + +/** End-to-End tests for Batch SQL tests. */ +@ExtendWith(TestLoggerExtension.class) +public class BatchSQLTest { + private static final Logger LOG = LoggerFactory.getLogger(BatchSQLTest.class); + + private static final Path sqlPath = + ResourceTestUtils.getResource("resources/sql-job-query.sql"); + + @TempDir private Path tmp; + + @RegisterExtension + private static final MiniClusterExtension MINI_CLUSTER = + new MiniClusterExtension( + new MiniClusterResourceConfiguration.Builder() + .setNumberTaskManagers(2) + .setNumberSlotsPerTaskManager(1) + .build()); + + private Path result; + + @BeforeEach + public void before() { + this.result = tmp.resolve(String.format("result-%s", UUID.randomUUID())); + LOG.info("Results for this test will be stored at: {}", this.result); + } + + @ParameterizedTest + @EnumSource( + value = BatchShuffleMode.class, + names = { + "ALL_EXCHANGES_BLOCKING", + "ALL_EXCHANGES_HYBRID_FULL", + "ALL_EXCHANGES_HYBRID_SELECTIVE" + }) + public void testBatchSQL(BatchShuffleMode shuffleMode) throws Exception { + String sqlStatement = new String(Files.readAllBytes(sqlPath)); - public static void main(String[] args) throws Exception { - ParameterTool params = ParameterTool.fromArgs(args); - String outputPath = params.getRequired("outputPath"); - String sqlStatement = params.getRequired("sqlStatement"); - String shuffleType = params.getRequired("shuffleType"); TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inBatchMode()); - BatchShuffleMode shuffleMode = checkAndGetShuffleMode(shuffleType); tEnv.getConfig().set(ExecutionOptions.BATCH_SHUFFLE_MODE, shuffleMode); + ((TableEnvironmentInternal) tEnv) 
.registerTableSourceInternal("table1", new GeneratorTableSource(10, 100, 60, 0)); ((TableEnvironmentInternal) tEnv) .registerTableSourceInternal("table2", new GeneratorTableSource(5, 0.2f, 60, 5)); ((TableEnvironmentInternal) tEnv) .registerTableSinkInternal( "sinkTable", - new CsvTableSink(outputPath) + new CsvTableSink(this.result.toString()) .configure( new String[] {"f0", "f1"}, new TypeInformation[] {Types.INT, Types.SQL_TIMESTAMP})); + LOG.info("Submitting job"); TableResult result = tEnv.executeSql(sqlStatement); - // wait job finish - result.getJobClient().get().getJobExecutionResult().get(); - } - private static BatchShuffleMode checkAndGetShuffleMode(String shuffleType) { - BatchShuffleMode shuffleMode; - switch (shuffleType.toLowerCase()) { - case "blocking": - shuffleMode = BatchShuffleMode.ALL_EXCHANGES_BLOCKING; - break; - case "hybrid_full": - shuffleMode = BatchShuffleMode.ALL_EXCHANGES_HYBRID_FULL; - break; - case "hybrid_selective": - shuffleMode = BatchShuffleMode.ALL_EXCHANGES_HYBRID_SELECTIVE; - break; - default: - throw new IllegalArgumentException("unsupported shuffle type : " + shuffleType); + // Wait for the job to finish. 
+ Optional<JobClient> jobClient = result.getJobClient(); + if (!jobClient.isPresent()) { + throw new IllegalStateException("Job client is not present"); } - return shuffleMode; + jobClient.get().getJobExecutionResult().get(); Review Comment: ```java JobClient jobClient = result.getJobClient() .orElseThrow(() -> new IllegalStateException("Job client is not present")); jobClient.getJobExecutionResult().get(); ``` nit: utilize the Optional's API ########## flink-end-to-end-tests/flink-batch-sql-test/src/test/java/org/apache/flink/sql/tests/BatchSQLTest.java: ########## @@ -34,66 +35,105 @@ import org.apache.flink.table.sinks.CsvTableSink; import org.apache.flink.table.sources.InputFormatTableSource; import org.apache.flink.table.types.DataType; +import org.apache.flink.test.junit5.MiniClusterExtension; +import org.apache.flink.test.resources.ResourceTestUtils; import org.apache.flink.types.Row; +import org.apache.flink.util.TestLoggerExtension; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.extension.ExtendWith; +import org.junit.jupiter.api.extension.RegisterExtension; +import org.junit.jupiter.api.io.TempDir; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.EnumSource; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.io.Serializable; +import java.nio.file.Files; +import java.nio.file.Path; import java.time.Instant; import java.time.LocalDateTime; import java.time.ZoneOffset; import java.util.Iterator; import java.util.NoSuchElementException; +import java.util.Optional; +import java.util.UUID; -/** - * End-to-end test for batch SQL queries. - * - * <p>The sources are generated and bounded. The result is always constant. 
- * - * <p>Parameters: -outputPath output file path for CsvTableSink; -sqlStatement SQL statement that - * will be executed as executeSql - */ -public class BatchSQLTestProgram { +import static org.junit.jupiter.api.Assertions.assertEquals; + +/** End-to-End tests for Batch SQL tests. */ +@ExtendWith(TestLoggerExtension.class) +public class BatchSQLTest { + private static final Logger LOG = LoggerFactory.getLogger(BatchSQLTest.class); + + private static final Path sqlPath = + ResourceTestUtils.getResource("resources/sql-job-query.sql"); + + @TempDir private Path tmp; + + @RegisterExtension + private static final MiniClusterExtension MINI_CLUSTER = + new MiniClusterExtension( + new MiniClusterResourceConfiguration.Builder() + .setNumberTaskManagers(2) + .setNumberSlotsPerTaskManager(1) + .build()); + + private Path result; + + @BeforeEach + public void before() { + this.result = tmp.resolve(String.format("result-%s", UUID.randomUUID())); + LOG.info("Results for this test will be stored at: {}", this.result); + } + + @ParameterizedTest + @EnumSource( + value = BatchShuffleMode.class, + names = { + "ALL_EXCHANGES_BLOCKING", Review Comment: ```suggestion "ALL_EXCHANGES_PIPELINED", "ALL_EXCHANGES_BLOCKING", ``` Does it make sense to add the pipelined mode as well? (just thinking out loud, I don't have much knowledge of this part of the code). -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org