XComp commented on code in PR #24471:
URL: https://github.com/apache/flink/pull/24471#discussion_r1541160078


##########
flink-end-to-end-tests/flink-batch-sql-test/src/test/java/org/apache/flink/sql/tests/BatchSQLTest.java:
##########
@@ -34,66 +35,105 @@
 import org.apache.flink.table.sinks.CsvTableSink;
 import org.apache.flink.table.sources.InputFormatTableSource;
 import org.apache.flink.table.types.DataType;
+import org.apache.flink.test.junit5.MiniClusterExtension;
+import org.apache.flink.test.resources.ResourceTestUtils;
 import org.apache.flink.types.Row;
+import org.apache.flink.util.TestLoggerExtension;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.api.extension.RegisterExtension;
+import org.junit.jupiter.api.io.TempDir;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.EnumSource;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.Serializable;
+import java.nio.file.Files;
+import java.nio.file.Path;
 import java.time.Instant;
 import java.time.LocalDateTime;
 import java.time.ZoneOffset;
 import java.util.Iterator;
 import java.util.NoSuchElementException;
+import java.util.Optional;
+import java.util.UUID;
 
-/**
- * End-to-end test for batch SQL queries.
- *
- * <p>The sources are generated and bounded. The result is always constant.
- *
- * <p>Parameters: -outputPath output file path for CsvTableSink; -sqlStatement 
SQL statement that
- * will be executed as executeSql
- */
-public class BatchSQLTestProgram {
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+/** End-to-End tests for Batch SQL tests. */
+@ExtendWith(TestLoggerExtension.class)
+public class BatchSQLTest {

Review Comment:
   ```suggestion
   class BatchSQLTest {
   ```
   JUnit5 allows test classes to be package-private. This will enable you 
to remove the JavaDoc and still comply with checkstyle. The JavaDoc itself 
doesn't add much value.



##########
flink-end-to-end-tests/flink-batch-sql-test/src/test/resources/log4j2-test.properties:
##########
@@ -0,0 +1,31 @@
+#
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#       http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+#
+# Set root logger level to OFF to not flood build logs
+# set manually to INFO for debugging purposes
+rootLogger.level=OFF
+rootLogger.appenderRef.test.ref=TestLogger
+appender.testlogger.name=TestLogger
+appender.testlogger.type=CONSOLE
+appender.testlogger.target=SYSTEM_ERR
+appender.testlogger.layout.type=PatternLayout
+appender.testlogger.layout.pattern=%-4r [%t] %-5p %c %x - %m%n
+# Uncomment to enable codegen logging
+#loggers = testlogger
+#logger.testlogger.name =org.apache.flink.table.planner.codegen
+#logger.testlogger.level = TRACE
+#logger.testlogger.appenderRefs = TestLogger

Review Comment:
   ```suggestion
   
   # Set root logger level to OFF to not flood build logs
   # set manually to INFO for debugging purposes
   rootLogger.level=OFF
   rootLogger.appenderRef.test.ref=TestLogger
   
   appender.testlogger.name=TestLogger
   appender.testlogger.type=CONSOLE
   appender.testlogger.target=SYSTEM_ERR
   appender.testlogger.layout.type=PatternLayout
   appender.testlogger.layout.pattern=%-4r [%t] %-5p %c %x - %m%n
   
   # Uncomment to enable codegen logging
   #loggers = testlogger
   #logger.testlogger.name =org.apache.flink.table.planner.codegen
   #logger.testlogger.level = TRACE
   #logger.testlogger.appenderRefs = TestLogger
   ```
   Then let's add some empty lines in between



##########
flink-end-to-end-tests/flink-batch-sql-test/src/test/java/org/apache/flink/sql/tests/BatchSQLTest.java:
##########
@@ -34,66 +35,105 @@
 import org.apache.flink.table.sinks.CsvTableSink;
 import org.apache.flink.table.sources.InputFormatTableSource;
 import org.apache.flink.table.types.DataType;
+import org.apache.flink.test.junit5.MiniClusterExtension;
+import org.apache.flink.test.resources.ResourceTestUtils;
 import org.apache.flink.types.Row;
+import org.apache.flink.util.TestLoggerExtension;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.api.extension.RegisterExtension;
+import org.junit.jupiter.api.io.TempDir;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.EnumSource;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.Serializable;
+import java.nio.file.Files;
+import java.nio.file.Path;
 import java.time.Instant;
 import java.time.LocalDateTime;
 import java.time.ZoneOffset;
 import java.util.Iterator;
 import java.util.NoSuchElementException;
+import java.util.Optional;
+import java.util.UUID;
 
-/**
- * End-to-end test for batch SQL queries.
- *
- * <p>The sources are generated and bounded. The result is always constant.
- *
- * <p>Parameters: -outputPath output file path for CsvTableSink; -sqlStatement 
SQL statement that
- * will be executed as executeSql
- */
-public class BatchSQLTestProgram {
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+/** End-to-End tests for Batch SQL tests. */
+@ExtendWith(TestLoggerExtension.class)
+public class BatchSQLTest {
+    private static final Logger LOG = 
LoggerFactory.getLogger(BatchSQLTest.class);
+
+    private static final Path sqlPath =
+            ResourceTestUtils.getResource("resources/sql-job-query.sql");
+
+    @TempDir private Path tmp;
+
+    @RegisterExtension
+    private static final MiniClusterExtension MINI_CLUSTER =
+            new MiniClusterExtension(
+                    new MiniClusterResourceConfiguration.Builder()
+                            .setNumberTaskManagers(2)
+                            .setNumberSlotsPerTaskManager(1)
+                            .build());
+
+    private Path result;
+
+    @BeforeEach
+    public void before() {
+        this.result = tmp.resolve(String.format("result-%s", 
UUID.randomUUID()));
+        LOG.info("Results for this test will be stored at: {}", this.result);
+    }
+
+    @ParameterizedTest
+    @EnumSource(
+            value = BatchShuffleMode.class,
+            names = {
+                "ALL_EXCHANGES_BLOCKING",
+                "ALL_EXCHANGES_HYBRID_FULL",
+                "ALL_EXCHANGES_HYBRID_SELECTIVE"
+            })
+    public void testBatchSQL(BatchShuffleMode shuffleMode) throws Exception {
+        String sqlStatement = new String(Files.readAllBytes(sqlPath));
 
-    public static void main(String[] args) throws Exception {
-        ParameterTool params = ParameterTool.fromArgs(args);
-        String outputPath = params.getRequired("outputPath");
-        String sqlStatement = params.getRequired("sqlStatement");
-        String shuffleType = params.getRequired("shuffleType");
         TableEnvironment tEnv = 
TableEnvironment.create(EnvironmentSettings.inBatchMode());
-        BatchShuffleMode shuffleMode = checkAndGetShuffleMode(shuffleType);
         tEnv.getConfig().set(ExecutionOptions.BATCH_SHUFFLE_MODE, shuffleMode);
+
         ((TableEnvironmentInternal) tEnv)
                 .registerTableSourceInternal("table1", new 
GeneratorTableSource(10, 100, 60, 0));
         ((TableEnvironmentInternal) tEnv)
                 .registerTableSourceInternal("table2", new 
GeneratorTableSource(5, 0.2f, 60, 5));
         ((TableEnvironmentInternal) tEnv)
                 .registerTableSinkInternal(
                         "sinkTable",
-                        new CsvTableSink(outputPath)
+                        new CsvTableSink(this.result.toString())

Review Comment:
   Would it be easy to switch from `CsvTableSink` to `FileSink` here? 
:thinking: ...to remove the deprecation warning. Same applies to the 
`InputFormatTableSource` further down in the test code.



##########
flink-end-to-end-tests/flink-batch-sql-test/src/test/java/org/apache/flink/sql/tests/BatchSQLTest.java:
##########
@@ -34,66 +35,105 @@
 import org.apache.flink.table.sinks.CsvTableSink;
 import org.apache.flink.table.sources.InputFormatTableSource;
 import org.apache.flink.table.types.DataType;
+import org.apache.flink.test.junit5.MiniClusterExtension;
+import org.apache.flink.test.resources.ResourceTestUtils;
 import org.apache.flink.types.Row;
+import org.apache.flink.util.TestLoggerExtension;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.api.extension.RegisterExtension;
+import org.junit.jupiter.api.io.TempDir;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.EnumSource;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.Serializable;
+import java.nio.file.Files;
+import java.nio.file.Path;
 import java.time.Instant;
 import java.time.LocalDateTime;
 import java.time.ZoneOffset;
 import java.util.Iterator;
 import java.util.NoSuchElementException;
+import java.util.Optional;
+import java.util.UUID;
 
-/**
- * End-to-end test for batch SQL queries.
- *
- * <p>The sources are generated and bounded. The result is always constant.
- *
- * <p>Parameters: -outputPath output file path for CsvTableSink; -sqlStatement 
SQL statement that
- * will be executed as executeSql
- */
-public class BatchSQLTestProgram {
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+/** End-to-End tests for Batch SQL tests. */
+@ExtendWith(TestLoggerExtension.class)
+public class BatchSQLTest {
+    private static final Logger LOG = 
LoggerFactory.getLogger(BatchSQLTest.class);
+
+    private static final Path sqlPath =
+            ResourceTestUtils.getResource("resources/sql-job-query.sql");
+
+    @TempDir private Path tmp;
+
+    @RegisterExtension
+    private static final MiniClusterExtension MINI_CLUSTER =
+            new MiniClusterExtension(
+                    new MiniClusterResourceConfiguration.Builder()
+                            .setNumberTaskManagers(2)
+                            .setNumberSlotsPerTaskManager(1)
+                            .build());
+
+    private Path result;
+
+    @BeforeEach
+    public void before() {
+        this.result = tmp.resolve(String.format("result-%s", 
UUID.randomUUID()));
+        LOG.info("Results for this test will be stored at: {}", this.result);
+    }
+
+    @ParameterizedTest
+    @EnumSource(
+            value = BatchShuffleMode.class,
+            names = {
+                "ALL_EXCHANGES_BLOCKING",
+                "ALL_EXCHANGES_HYBRID_FULL",
+                "ALL_EXCHANGES_HYBRID_SELECTIVE"
+            })
+    public void testBatchSQL(BatchShuffleMode shuffleMode) throws Exception {

Review Comment:
   ```suggestion
       public void testBatchSQL(BatchShuffleMode shuffleMode, @TempDir Path 
tmpDir) throws Exception {
           final Path outputFolder = tmpDir.resolve(String.format("result-%s", 
UUID.randomUUID()));
   ```
   nit: We could also add the tmpDir as a parameter here. No need to have the 
two instance fields `result` and `tmp` since we only have a single test method.



##########
flink-end-to-end-tests/flink-batch-sql-test/src/test/java/org/apache/flink/sql/tests/BatchSQLTest.java:
##########
@@ -34,66 +35,105 @@
 import org.apache.flink.table.sinks.CsvTableSink;
 import org.apache.flink.table.sources.InputFormatTableSource;
 import org.apache.flink.table.types.DataType;
+import org.apache.flink.test.junit5.MiniClusterExtension;
+import org.apache.flink.test.resources.ResourceTestUtils;
 import org.apache.flink.types.Row;
+import org.apache.flink.util.TestLoggerExtension;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.api.extension.RegisterExtension;
+import org.junit.jupiter.api.io.TempDir;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.EnumSource;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.Serializable;
+import java.nio.file.Files;
+import java.nio.file.Path;
 import java.time.Instant;
 import java.time.LocalDateTime;
 import java.time.ZoneOffset;
 import java.util.Iterator;
 import java.util.NoSuchElementException;
+import java.util.Optional;
+import java.util.UUID;
 
-/**
- * End-to-end test for batch SQL queries.
- *
- * <p>The sources are generated and bounded. The result is always constant.
- *
- * <p>Parameters: -outputPath output file path for CsvTableSink; -sqlStatement 
SQL statement that
- * will be executed as executeSql
- */
-public class BatchSQLTestProgram {
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+/** End-to-End tests for Batch SQL tests. */
+@ExtendWith(TestLoggerExtension.class)
+public class BatchSQLTest {
+    private static final Logger LOG = 
LoggerFactory.getLogger(BatchSQLTest.class);
+
+    private static final Path sqlPath =
+            ResourceTestUtils.getResource("resources/sql-job-query.sql");
+
+    @TempDir private Path tmp;
+
+    @RegisterExtension
+    private static final MiniClusterExtension MINI_CLUSTER =
+            new MiniClusterExtension(
+                    new MiniClusterResourceConfiguration.Builder()
+                            .setNumberTaskManagers(2)
+                            .setNumberSlotsPerTaskManager(1)
+                            .build());
+
+    private Path result;
+
+    @BeforeEach
+    public void before() {
+        this.result = tmp.resolve(String.format("result-%s", 
UUID.randomUUID()));
+        LOG.info("Results for this test will be stored at: {}", this.result);
+    }
+
+    @ParameterizedTest
+    @EnumSource(
+            value = BatchShuffleMode.class,
+            names = {
+                "ALL_EXCHANGES_BLOCKING",
+                "ALL_EXCHANGES_HYBRID_FULL",
+                "ALL_EXCHANGES_HYBRID_SELECTIVE"
+            })
+    public void testBatchSQL(BatchShuffleMode shuffleMode) throws Exception {
+        String sqlStatement = new String(Files.readAllBytes(sqlPath));
 
-    public static void main(String[] args) throws Exception {
-        ParameterTool params = ParameterTool.fromArgs(args);
-        String outputPath = params.getRequired("outputPath");
-        String sqlStatement = params.getRequired("sqlStatement");
-        String shuffleType = params.getRequired("shuffleType");
         TableEnvironment tEnv = 
TableEnvironment.create(EnvironmentSettings.inBatchMode());
-        BatchShuffleMode shuffleMode = checkAndGetShuffleMode(shuffleType);
         tEnv.getConfig().set(ExecutionOptions.BATCH_SHUFFLE_MODE, shuffleMode);
+
         ((TableEnvironmentInternal) tEnv)
                 .registerTableSourceInternal("table1", new 
GeneratorTableSource(10, 100, 60, 0));
         ((TableEnvironmentInternal) tEnv)
                 .registerTableSourceInternal("table2", new 
GeneratorTableSource(5, 0.2f, 60, 5));
         ((TableEnvironmentInternal) tEnv)
                 .registerTableSinkInternal(
                         "sinkTable",
-                        new CsvTableSink(outputPath)
+                        new CsvTableSink(this.result.toString())

Review Comment:
   Switching away from the deprecated `CsvTableSink` should happen in a separate commit, though.



##########
flink-end-to-end-tests/flink-batch-sql-test/src/test/java/org/apache/flink/sql/tests/BatchSQLTest.java:
##########
@@ -34,66 +35,105 @@
 import org.apache.flink.table.sinks.CsvTableSink;
 import org.apache.flink.table.sources.InputFormatTableSource;
 import org.apache.flink.table.types.DataType;
+import org.apache.flink.test.junit5.MiniClusterExtension;
+import org.apache.flink.test.resources.ResourceTestUtils;
 import org.apache.flink.types.Row;
+import org.apache.flink.util.TestLoggerExtension;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.api.extension.RegisterExtension;
+import org.junit.jupiter.api.io.TempDir;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.EnumSource;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.Serializable;
+import java.nio.file.Files;
+import java.nio.file.Path;
 import java.time.Instant;
 import java.time.LocalDateTime;
 import java.time.ZoneOffset;
 import java.util.Iterator;
 import java.util.NoSuchElementException;
+import java.util.Optional;
+import java.util.UUID;
 
-/**
- * End-to-end test for batch SQL queries.
- *
- * <p>The sources are generated and bounded. The result is always constant.
- *
- * <p>Parameters: -outputPath output file path for CsvTableSink; -sqlStatement 
SQL statement that
- * will be executed as executeSql
- */
-public class BatchSQLTestProgram {
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+/** End-to-End tests for Batch SQL tests. */
+@ExtendWith(TestLoggerExtension.class)
+public class BatchSQLTest {
+    private static final Logger LOG = 
LoggerFactory.getLogger(BatchSQLTest.class);
+
+    private static final Path sqlPath =
+            ResourceTestUtils.getResource("resources/sql-job-query.sql");
+
+    @TempDir private Path tmp;
+
+    @RegisterExtension
+    private static final MiniClusterExtension MINI_CLUSTER =
+            new MiniClusterExtension(
+                    new MiniClusterResourceConfiguration.Builder()
+                            .setNumberTaskManagers(2)
+                            .setNumberSlotsPerTaskManager(1)
+                            .build());
+
+    private Path result;
+
+    @BeforeEach
+    public void before() {

Review Comment:
   ```suggestion
       void before() {
   ```
   nit: Test methods can be package-private in JUnit5, but that's more of an 
FYI because it doesn't really matter here.



##########
flink-end-to-end-tests/flink-batch-sql-test/src/test/java/org/apache/flink/sql/tests/BatchSQLTest.java:
##########
@@ -34,66 +35,105 @@
 import org.apache.flink.table.sinks.CsvTableSink;
 import org.apache.flink.table.sources.InputFormatTableSource;
 import org.apache.flink.table.types.DataType;
+import org.apache.flink.test.junit5.MiniClusterExtension;
+import org.apache.flink.test.resources.ResourceTestUtils;
 import org.apache.flink.types.Row;
+import org.apache.flink.util.TestLoggerExtension;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.api.extension.RegisterExtension;
+import org.junit.jupiter.api.io.TempDir;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.EnumSource;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.Serializable;
+import java.nio.file.Files;
+import java.nio.file.Path;
 import java.time.Instant;
 import java.time.LocalDateTime;
 import java.time.ZoneOffset;
 import java.util.Iterator;
 import java.util.NoSuchElementException;
+import java.util.Optional;
+import java.util.UUID;
 
-/**
- * End-to-end test for batch SQL queries.
- *
- * <p>The sources are generated and bounded. The result is always constant.
- *
- * <p>Parameters: -outputPath output file path for CsvTableSink; -sqlStatement 
SQL statement that
- * will be executed as executeSql
- */
-public class BatchSQLTestProgram {
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+/** End-to-End tests for Batch SQL tests. */
+@ExtendWith(TestLoggerExtension.class)
+public class BatchSQLTest {
+    private static final Logger LOG = 
LoggerFactory.getLogger(BatchSQLTest.class);
+
+    private static final Path sqlPath =
+            ResourceTestUtils.getResource("resources/sql-job-query.sql");
+
+    @TempDir private Path tmp;
+
+    @RegisterExtension
+    private static final MiniClusterExtension MINI_CLUSTER =
+            new MiniClusterExtension(
+                    new MiniClusterResourceConfiguration.Builder()
+                            .setNumberTaskManagers(2)
+                            .setNumberSlotsPerTaskManager(1)
+                            .build());
+
+    private Path result;
+
+    @BeforeEach
+    public void before() {
+        this.result = tmp.resolve(String.format("result-%s", 
UUID.randomUUID()));
+        LOG.info("Results for this test will be stored at: {}", this.result);
+    }
+
+    @ParameterizedTest
+    @EnumSource(
+            value = BatchShuffleMode.class,
+            names = {
+                "ALL_EXCHANGES_BLOCKING",
+                "ALL_EXCHANGES_HYBRID_FULL",
+                "ALL_EXCHANGES_HYBRID_SELECTIVE"
+            })
+    public void testBatchSQL(BatchShuffleMode shuffleMode) throws Exception {
+        String sqlStatement = new String(Files.readAllBytes(sqlPath));
 
-    public static void main(String[] args) throws Exception {
-        ParameterTool params = ParameterTool.fromArgs(args);
-        String outputPath = params.getRequired("outputPath");
-        String sqlStatement = params.getRequired("sqlStatement");
-        String shuffleType = params.getRequired("shuffleType");
         TableEnvironment tEnv = 
TableEnvironment.create(EnvironmentSettings.inBatchMode());
-        BatchShuffleMode shuffleMode = checkAndGetShuffleMode(shuffleType);
         tEnv.getConfig().set(ExecutionOptions.BATCH_SHUFFLE_MODE, shuffleMode);
+
         ((TableEnvironmentInternal) tEnv)
                 .registerTableSourceInternal("table1", new 
GeneratorTableSource(10, 100, 60, 0));
         ((TableEnvironmentInternal) tEnv)
                 .registerTableSourceInternal("table2", new 
GeneratorTableSource(5, 0.2f, 60, 5));
         ((TableEnvironmentInternal) tEnv)
                 .registerTableSinkInternal(
                         "sinkTable",
-                        new CsvTableSink(outputPath)
+                        new CsvTableSink(this.result.toString())
                                 .configure(
                                         new String[] {"f0", "f1"},
                                         new TypeInformation[] {Types.INT, 
Types.SQL_TIMESTAMP}));
 
+        LOG.info("Submitting job");
         TableResult result = tEnv.executeSql(sqlStatement);
-        // wait job finish
-        result.getJobClient().get().getJobExecutionResult().get();
-    }
 
-    private static BatchShuffleMode checkAndGetShuffleMode(String shuffleType) 
{
-        BatchShuffleMode shuffleMode;
-        switch (shuffleType.toLowerCase()) {
-            case "blocking":
-                shuffleMode = BatchShuffleMode.ALL_EXCHANGES_BLOCKING;
-                break;
-            case "hybrid_full":
-                shuffleMode = BatchShuffleMode.ALL_EXCHANGES_HYBRID_FULL;
-                break;
-            case "hybrid_selective":
-                shuffleMode = BatchShuffleMode.ALL_EXCHANGES_HYBRID_SELECTIVE;
-                break;
-            default:
-                throw new IllegalArgumentException("unsupported shuffle type : 
" + shuffleType);
+        // Wait for the job to finish.
+        Optional<JobClient> jobClient = result.getJobClient();
+        if (!jobClient.isPresent()) {
+            throw new IllegalStateException("Job client is not present");
         }
-        return shuffleMode;
+        jobClient.get().getJobExecutionResult().get();

Review Comment:
   ```java
           JobClient jobClient =
                   result.getJobClient()
                           .orElseThrow(() -> new IllegalStateException("Job 
client is not present"));
           jobClient.getJobExecutionResult().get();
   ```
   nit: utilize the `Optional` API



##########
flink-end-to-end-tests/flink-batch-sql-test/src/test/java/org/apache/flink/sql/tests/BatchSQLTest.java:
##########
@@ -34,66 +35,105 @@
 import org.apache.flink.table.sinks.CsvTableSink;
 import org.apache.flink.table.sources.InputFormatTableSource;
 import org.apache.flink.table.types.DataType;
+import org.apache.flink.test.junit5.MiniClusterExtension;
+import org.apache.flink.test.resources.ResourceTestUtils;
 import org.apache.flink.types.Row;
+import org.apache.flink.util.TestLoggerExtension;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.api.extension.RegisterExtension;
+import org.junit.jupiter.api.io.TempDir;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.EnumSource;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.Serializable;
+import java.nio.file.Files;
+import java.nio.file.Path;
 import java.time.Instant;
 import java.time.LocalDateTime;
 import java.time.ZoneOffset;
 import java.util.Iterator;
 import java.util.NoSuchElementException;
+import java.util.Optional;
+import java.util.UUID;
 
-/**
- * End-to-end test for batch SQL queries.
- *
- * <p>The sources are generated and bounded. The result is always constant.
- *
- * <p>Parameters: -outputPath output file path for CsvTableSink; -sqlStatement 
SQL statement that
- * will be executed as executeSql
- */
-public class BatchSQLTestProgram {
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+/** End-to-End tests for Batch SQL tests. */
+@ExtendWith(TestLoggerExtension.class)
+public class BatchSQLTest {
+    private static final Logger LOG = 
LoggerFactory.getLogger(BatchSQLTest.class);
+
+    private static final Path sqlPath =
+            ResourceTestUtils.getResource("resources/sql-job-query.sql");
+
+    @TempDir private Path tmp;
+
+    @RegisterExtension
+    private static final MiniClusterExtension MINI_CLUSTER =
+            new MiniClusterExtension(
+                    new MiniClusterResourceConfiguration.Builder()
+                            .setNumberTaskManagers(2)
+                            .setNumberSlotsPerTaskManager(1)
+                            .build());
+
+    private Path result;
+
+    @BeforeEach
+    public void before() {
+        this.result = tmp.resolve(String.format("result-%s", 
UUID.randomUUID()));
+        LOG.info("Results for this test will be stored at: {}", this.result);
+    }
+
+    @ParameterizedTest
+    @EnumSource(
+            value = BatchShuffleMode.class,
+            names = {
+                "ALL_EXCHANGES_BLOCKING",

Review Comment:
   ```suggestion
                   "ALL_EXCHANGES_PIPELINED",
                   "ALL_EXCHANGES_BLOCKING",
   ```
   Does it make sense to add the pipelined mode as well? (just thinking out 
loud, I don't have much knowledge of this part of the code).



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to