wuchong commented on a change in pull request #15400: URL: https://github.com/apache/flink/pull/15400#discussion_r603002125
########## File path: flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliClient.java ########## @@ -299,15 +307,23 @@ private void callOperation(Operation operation) { } else if (operation instanceof HelpOperation) { // HELP callHelp(); + } else if (operation instanceof BeginStatementSetOperation) { + // BEGIN STATEMENT SET + callBeginStatementSet(); + } else if (operation instanceof EndStatementSetOperation) { + // END + callEndStatementSet(); + } else if (operation instanceof CatalogSinkModifyOperation) { + // INSERT INTO/OVERWRITE + callInsert((CatalogSinkModifyOperation) operation); + } else if (isStatementSetMode) { Review comment: This looks really hacky, and it makes it hard to maintain which statements are not allowed in a statement set. I suggest checking the statement at the beginning of this method. ```java if (isStatementSetMode) { // check the current operation is allowed in STATEMENT SET if (!(operation instanceof CatalogSinkModifyOperation || operation instanceof EndStatementSetOperation)) { printError(MESSAGE_STATEMENT_SET_SQL_EXECUTION_ERROR); return; } } ``` ########## File path: flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliClient.java ########## @@ -412,27 +428,65 @@ private void callSelect(QueryOperation operation) { } private boolean callInsert(CatalogSinkModifyOperation operation) { - printInfo(CliStrings.MESSAGE_SUBMITTING_STATEMENT); - - try { - TableResult result = executor.executeOperation(sessionId, operation); - checkState(result.getJobClient().isPresent()); - terminal.writer() - .println( - CliStrings.messageInfo(CliStrings.MESSAGE_STATEMENT_SUBMITTED) - .toAnsi()); - // keep compatibility with before - terminal.writer() - .println( - String.format( - "Job ID: %s\n", - result.getJobClient().get().getJobID().toString())); - terminal.flush(); + if (isStatementSetMode) { + statementSetOperations.add(operation); + printInfo(CliStrings.MESSAGE_ADD_STATEMENT_TO_STATEMENT_SET); return true; - } catch 
(SqlExecutionException e) { - printExecutionException(e); + } else { + printInfo(CliStrings.MESSAGE_SUBMITTING_STATEMENT); + + try { + TableResult result = executor.executeOperation(sessionId, operation); + checkState(result.getJobClient().isPresent()); + terminal.writer() + .println( + CliStrings.messageInfo(CliStrings.MESSAGE_STATEMENT_SUBMITTED) + .toAnsi()); + // keep compatibility with before + terminal.writer() + .println( + String.format( + "Job ID: %s\n", + result.getJobClient().get().getJobID().toString())); + terminal.flush(); + return true; + } catch (SqlExecutionException e) { + printExecutionException(e); + } + return false; + } + } + + private void callBeginStatementSet() { + if (isStatementSetMode) { + printStatementSetExecutionException(); + } else { + isStatementSetMode = true; + statementSetOperations = new ArrayList<>(); + printInfo(CliStrings.MESSAGE_BEGIN_STATEMENT_SET); + } + } + + private void callEndStatementSet() { + if (isStatementSetMode) { + isStatementSetMode = false; + printInfo(CliStrings.MESSAGE_SUBMITTING_STATEMENT_SET); + + try { + TableResult result = executor.executeOperation(sessionId, statementSetOperations); + checkState(result.getJobClient().isPresent()); + terminal.writer() + .println( + CliStrings.messageInfo(CliStrings.MESSAGE_STATEMENT_SET_SUBMITTED) + .toAnsi()); + terminal.flush(); Review comment: This logic should be shared with `callInsert`, and we should also consider the dml-sync option (could you rebase the branch?). 
########## File path: flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliStrings.java ########## @@ -196,6 +204,17 @@ private CliStrings() { public static final String MESSAGE_STATEMENT_SUBMITTED = "Table update statement has been successfully submitted to the cluster:"; + public static final String MESSAGE_BEGIN_STATEMENT_SET = "Begin the statement set."; + + public static final String MESSAGE_ADD_STATEMENT_TO_STATEMENT_SET = + "Add SQL update statement to the statement set."; + + public static final String MESSAGE_SUBMITTING_STATEMENT_SET = + "Submitting SQL statement set to the cluster..."; Review comment: ```suggestion "Submitting SQL update statement set to the cluster..."; ``` ########## File path: flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliStrings.java ########## @@ -174,6 +179,9 @@ private CliStrings() { public static final String MESSAGE_SQL_EXECUTION_ERROR = "Could not execute SQL statement."; + public static final String MESSAGE_STATEMENT_SET_SQL_EXECUTION_ERROR = + "Could not execute SQL statement in statement set."; Review comment: "Only INSERT statement is allowed in Statement Set." 
########## File path: flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliStrings.java ########## @@ -196,6 +204,17 @@ private CliStrings() { public static final String MESSAGE_STATEMENT_SUBMITTED = "Table update statement has been successfully submitted to the cluster:"; + public static final String MESSAGE_BEGIN_STATEMENT_SET = "Begin the statement set."; + + public static final String MESSAGE_ADD_STATEMENT_TO_STATEMENT_SET = + "Add SQL update statement to the statement set."; + + public static final String MESSAGE_SUBMITTING_STATEMENT_SET = + "Submitting SQL statement set to the cluster..."; + + public static final String MESSAGE_STATEMENT_SET_SUBMITTED = + "Table statement set has been successfully submitted to the cluster."; Review comment: ```suggestion "Table update statement set has been successfully submitted to the cluster."; ``` ########## File path: flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/Executor.java ########## @@ -108,6 +109,10 @@ void setSessionProperty(String sessionId, String key, String value) TableResult executeOperation(String sessionId, Operation operation) throws SqlExecutionException; + /** Executes modify operations, and return {@link TableResult} as execution result. 
*/ + TableResult executeOperation(String sessionId, List<ModifyOperation> operations) Review comment: ==> `executeModifyOperations` ########## File path: flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliStrings.java ########## @@ -196,6 +204,17 @@ private CliStrings() { public static final String MESSAGE_STATEMENT_SUBMITTED = "Table update statement has been successfully submitted to the cluster:"; + public static final String MESSAGE_BEGIN_STATEMENT_SET = "Begin the statement set."; Review comment: ```suggestion public static final String MESSAGE_BEGIN_STATEMENT_SET = "Begin a statement set."; ``` ########## File path: flink-table/flink-sql-client/src/test/resources/sql/statement-set.q ########## @@ -0,0 +1,74 @@ +# statement-set.q - BEGIN STATEMENT SET, END +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to you under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# set default streaming mode and tableau result mode + +SET execution.runtime-mode = streaming; +[INFO] Session property has been set. +!info + +SET sql-client.execution.result-mode = tableau; +[INFO] Session property has been set. +!info + +create table src ( + id int, + str string +) with ( + 'connector' = 'values' +); +[INFO] Execute statement succeed. 
+!info + +# ========================================================================== +# test statement set +# ========================================================================== + +BEGIN STATEMENT SET; +[INFO] Begin the statement set. +!info + +BEGIN STATEMENT SET; +[ERROR] Could not execute SQL statement in statement set. +!error + +create table test ( +id int, +str string +) with ( +'connector' = 'values' +); +[ERROR] Could not execute SQL statement in statement set. +!error + +SELECT id, str FROM (VALUES (1, 'Hello World'), (2, 'Hi'), (2, 'Hi')) as T(id, str); +[ERROR] Could not execute SQL statement in statement set. +!error + +INSERT INTO src SELECT id, str +FROM (VALUES (1, 'Hello World'), (2, 'Hi'), (2, 'Hi')) as T(id, str); +[INFO] Add SQL update statement to the statement set. +!info + +END; +[INFO] Submitting SQL statement set to the cluster... +[INFO] Table statement set has been successfully submitted to the cluster. Review comment: I suggest testing this feature in dml-sync mode. Could you rebase the branch and take the dml-sync tests as an example? ########## File path: flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/CliClientITCase.java ########## @@ -83,15 +83,20 @@ @Parameterized.Parameters(name = "{0}") public static Object[] parameters() throws Exception { - String first = "sql/table.q"; - URL url = CliClientITCase.class.getResource("/" + first); - File firstFile = Paths.get(url.toURI()).toFile(); - final int commonPrefixLength = firstFile.getAbsolutePath().length() - first.length(); - File dir = firstFile.getParentFile(); + String sqlDir = "sql"; Review comment: Why change this? I don't think we need to change this; all new SQL scripts can still be discovered. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org