fsk119 commented on code in PR #20609: URL: https://github.com/apache/flink/pull/20609#discussion_r948612670
########## flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/FlinkDistribution.java: ########## @@ -182,6 +189,7 @@ public void startFlinkCluster() throws IOException { } } } catch (IOException ioe) { + LOG.info(Arrays.toString(ioe.getStackTrace())); Review Comment: Do we need to modify this? ########## flink-end-to-end-tests/flink-sql-gateway-test/src/test/java/org/apache/flink/table/gateway/SqlGatewayE2ECase.java: ########## @@ -82,6 +83,7 @@ public class SqlGatewayE2ECase extends TestLogger { @Rule public final FlinkResource flinkResource = buildFlinkResource(); private static NetUtils.Port port; Review Comment: rename to hiveserver2Port? ########## flink-end-to-end-tests/flink-sql-gateway-test/src/test/java/org/apache/flink/table/gateway/SqlGatewayE2ECase.java: ########## @@ -117,8 +119,13 @@ public void testExecuteStatement() throws Exception { .setClientMode(SQLJobSubmission.ClientMode.HIVE_JDBC) .build(), Duration.ofSeconds(60)); + gateway.submitSQLJob( + new SQLJobSubmission.SQLJobSubmissionBuilder(lines) + .setClientMode(SQLJobSubmission.ClientMode.REST) + .build(), + Duration.ofSeconds(60)); } Review Comment: I think we should add a new test rather than reuse the same test case. 
########## flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/FlinkDistribution.java: ########## @@ -301,7 +309,105 @@ public void submitSQLJob(SQLJobSubmission job, Duration timeout) throws Exceptio }); new Thread(future).start(); future.get(timeout.toMillis(), TimeUnit.MILLISECONDS); + } else if (job.getClientMode() == SQLJobSubmission.ClientMode.REST) { + FutureTaskWithException<Void> future = + new FutureTaskWithException<>( + () -> { + Map<String, String> configMap = + GlobalConfiguration.loadConfiguration( + conf.toAbsolutePath().toString()) + .toMap(); + String host = + configMap.getOrDefault( + "sql-gateway.endpoint.rest.address", + InetAddress.getByName("localhost") + .getHostAddress()); + String port = + configMap.getOrDefault( + "sql-gateway.endpoint.rest.port", "8083"); + // Open a session + String sessionHandle = openSession(host, port); + List<String> sqlLines = job.getSqlLines(); + for (String sql : sqlLines) { + // Execute statement + String operationHandle = + executeStatement(sessionHandle, sql, host, port); + int count = 5; + while (count > 0) { + TimeUnit.SECONDS.sleep(2); + // Fetch results + String results = + fetchResults( + sessionHandle, operationHandle, host, port); + if (results.contains("PAYLOAD")) { + break; + } + --count; + } + } + }); + new Thread(future).start(); + future.get(timeout.toMillis(), TimeUnit.MILLISECONDS); Review Comment: Can we add a method like the following to reuse the logic shared by the HIVE_JDBC and REST modes? ``` private void submitSQL(RunnableWithException command, Duration timeout) { FutureTaskWithException<Void> future = new FutureTaskWithException<>(command); new Thread(future).start(); future.get(timeout.toMillis(), TimeUnit.MILLISECONDS); } ``` ########## flink-end-to-end-tests/flink-end-to-end-tests-common/pom.xml: ########## @@ -127,6 +127,11 @@ under the License.
</exclusion> </exclusions> </dependency> + <dependency> + <groupId>org.json</groupId> + <artifactId>json</artifactId> + <version>20201115</version> + </dependency> Review Comment: Why not use the json format dependency? ########## flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/FlinkDistribution.java: ########## @@ -301,7 +309,105 @@ public void submitSQLJob(SQLJobSubmission job, Duration timeout) throws Exceptio }); new Thread(future).start(); future.get(timeout.toMillis(), TimeUnit.MILLISECONDS); + } else if (job.getClientMode() == SQLJobSubmission.ClientMode.REST) { + FutureTaskWithException<Void> future = + new FutureTaskWithException<>( + () -> { + Map<String, String> configMap = + GlobalConfiguration.loadConfiguration( + conf.toAbsolutePath().toString()) + .toMap(); + String host = + configMap.getOrDefault( + "sql-gateway.endpoint.rest.address", + InetAddress.getByName("localhost") + .getHostAddress()); + String port = + configMap.getOrDefault( + "sql-gateway.endpoint.rest.port", "8083"); + // Open a session + String sessionHandle = openSession(host, port); Review Comment: The ADD JAR logic is missing here. Please refer to the corresponding logic in the HIVE_JDBC mode. ########## flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/SqlGatewayRestEndpointFactory.java: ########## @@ -94,6 +92,10 @@ public Set<ConfigOption<?>> requiredOptions() { @Override public Set<ConfigOption<?>> optionalOptions() { - return Collections.emptySet(); + Set<ConfigOption<?>> options = new HashSet<>(); + options.add(BIND_ADDRESS); + options.add(PORT); + options.add(BIND_PORT); + return options; Review Comment: Why add this? If ADDRESS and PORT are used by the client, we should not add them to the server options.
########## flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/FlinkDistribution.java: ########## @@ -301,7 +309,105 @@ public void submitSQLJob(SQLJobSubmission job, Duration timeout) throws Exceptio }); new Thread(future).start(); future.get(timeout.toMillis(), TimeUnit.MILLISECONDS); + } else if (job.getClientMode() == SQLJobSubmission.ClientMode.REST) { + FutureTaskWithException<Void> future = + new FutureTaskWithException<>( + () -> { + Map<String, String> configMap = + GlobalConfiguration.loadConfiguration( + conf.toAbsolutePath().toString()) + .toMap(); + String host = + configMap.getOrDefault( + "sql-gateway.endpoint.rest.address", + InetAddress.getByName("localhost") + .getHostAddress()); + String port = + configMap.getOrDefault( + "sql-gateway.endpoint.rest.port", "8083"); + // Open a session + String sessionHandle = openSession(host, port); + List<String> sqlLines = job.getSqlLines(); + for (String sql : sqlLines) { + // Execute statement + String operationHandle = + executeStatement(sessionHandle, sql, host, port); + int count = 5; + while (count > 0) { + TimeUnit.SECONDS.sleep(2); Review Comment: Why do we need `count`? We should fetch results until `EOF`. BTW, here we only submit the SQL. Why do we fetch results? Why not fetch the OperationStatus until it is finished or in error? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org