fsk119 commented on code in PR #25700:
URL: https://github.com/apache/flink/pull/25700#discussion_r1870567814


##########
flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/application/ScriptExecutor.java:
##########
@@ -0,0 +1,332 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.gateway.service.application;
+
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.SqlParserEOFException;
+import org.apache.flink.table.gateway.api.operation.OperationHandle;
+import org.apache.flink.table.gateway.api.utils.SqlGatewayException;
+import org.apache.flink.table.gateway.service.context.SessionContext;
+import org.apache.flink.table.gateway.service.operation.OperationExecutor;
+import org.apache.flink.table.gateway.service.result.ResultFetcher;
+import org.apache.flink.table.operations.Operation;
+import org.apache.flink.util.StringUtils;
+
+import javax.annotation.Nullable;
+
+import java.util.Iterator;
+import java.util.List;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/** Executor to execute the statements. */
+public class ScriptExecutor {
+
+    final SessionContext context;
+    final Printer printer;
+
+    private ScriptExecutor(SessionContext context) {
+        this(context, new Printer());
+    }
+
+    public ScriptExecutor(SessionContext context, Printer printer) {
+        this.context = context;
+        this.printer = printer;
+    }
+
+    public static ScriptExecutor of(SessionContext context) {
+        return new ScriptExecutor(context);
+    }
+
+    public void execute(String script) {
+        ResultIterator iterator = new ResultIterator(script);
+        try {
+            while (iterator.hasNext()) {
+                Result result = iterator.next();
+                printer.print(result.statement);
+                if (result.error != null) {
+                    throw result.error;
+                } else {
+                    printer.print(checkNotNull(result.fetcher));
+                }
+            }
+        } catch (Throwable t) {
+            printer.print(t);
+            throw new SqlGatewayException("Failed to execute the script.", t);
+        }
+    }
+
+    class ResultIterator implements Iterator<Result> {
+
+        // these 3 string builders is here to pad the split sql to its original line and column
+        // number
+        StringBuilder previousPaddingSqlBuilder = new StringBuilder();
+        StringBuilder currentPaddingSqlBuilder = new StringBuilder();
+        StringBuilder currentPaddingLineBuilder = new StringBuilder();
+
+        private final String script;
+        private int position;
+
+        private String statement;
+        private Throwable throwable;
+        private OperationExecutor executor;
+
+        public ResultIterator(String script) {
+            this.script = script;
+            this.position = 0;
+        }
+
+        @Override
+        public boolean hasNext() {
+            State state = State.NORMAL;
+            StringBuilder currentSqlBuilder = new StringBuilder();
+            char currentChar = 0;
+
+            boolean hasNext = false;
+            // rebuild the executor because statement may change planner.
+            executor =
+                    new OperationExecutor(
+                            context,
+                            (config, classloader) ->
+                                    
StreamExecutionEnvironment.getExecutionEnvironment(config));
+            for (int i = position; i < script.length(); i++) {
+                char lastChar = currentChar;
+                currentChar = script.charAt(i);
+
+                currentSqlBuilder.append(currentChar);
+                currentPaddingLineBuilder.append(" ");
+
+                switch (currentChar) {
+                    case '\'':
+                        if (state == State.SINGLE_QUOTE) {
+                            state = State.NORMAL;
+                        } else if (state == State.NORMAL) {
+                            state = State.SINGLE_QUOTE;
+                        }
+                        break;
+                    case '"':
+                        if (state == State.DOUBLE_QUOTE) {
+                            state = State.NORMAL;
+                        } else if (state == State.NORMAL) {
+                            state = State.DOUBLE_QUOTE;
+                        }
+                        break;
+                    case '`':
+                        if (state == State.BACK_QUOTE) {
+                            state = State.NORMAL;
+                        } else if (state == State.NORMAL) {
+                            state = State.BACK_QUOTE;
+                        }
+                        break;
+                    case '-':
+                        if (lastChar == '-' && state == State.NORMAL) {
+                            state = State.SINGLE_COMMENT;
+                        }
+                        break;
+                    case '\n':
+                        if (state == State.SINGLE_COMMENT) {
+                            state = State.NORMAL;
+                        }
+                        currentPaddingLineBuilder.setLength(0);
+                        currentPaddingSqlBuilder.append("\n");
+                        break;
+                    case '*':
+                        if (lastChar == '/' && state == State.NORMAL) {
+                            state = State.MULTI_LINE_COMMENT;
+                        }
+                        break;
+                    case '/':
+                        if (lastChar == '*' && state == 
State.MULTI_LINE_COMMENT) {
+                            state = State.NORMAL;
+                        }
+                        break;
+                    case ';':
+                        if (state == State.NORMAL) {
+                            i =
+                                    prefetch(
+                                            i + 1,
+                                            currentSqlBuilder,
+                                            currentPaddingSqlBuilder,
+                                            currentPaddingLineBuilder);
+                            statement = currentSqlBuilder.toString();
+                            try {
+                                position = i + 1;
+                                parse(previousPaddingSqlBuilder + statement);

Review Comment:
   It is used to detect whether the statement is complete. Because Flink SQL 
supports `EXECUTE STATEMENT SET BEGIN INSERT INTO ...;`, a semicolon is not 
necessarily the end of a statement (`END;` is the real end). So we need to 
validate whether the statement is complete or not.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to