luoyuxia commented on code in PR #22934:
URL: https://github.com/apache/flink/pull/22934#discussion_r1250852891


##########
flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlReplaceTableAs.java:
##########
@@ -0,0 +1,278 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.sql.parser.ddl;
+
+import org.apache.flink.sql.parser.ExtendedSqlNode;
+import org.apache.flink.sql.parser.SqlConstraintValidator;
+import org.apache.flink.sql.parser.SqlUnparseUtils;
+import org.apache.flink.sql.parser.ddl.constraint.SqlTableConstraint;
+import org.apache.flink.sql.parser.error.SqlValidateException;
+
+import org.apache.calcite.sql.SqlCharStringLiteral;
+import org.apache.calcite.sql.SqlCreate;
+import org.apache.calcite.sql.SqlIdentifier;
+import org.apache.calcite.sql.SqlKind;
+import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.sql.SqlNodeList;
+import org.apache.calcite.sql.SqlOperator;
+import org.apache.calcite.sql.SqlSpecialOperator;
+import org.apache.calcite.sql.SqlWriter;
+import org.apache.calcite.sql.parser.SqlParserPos;
+import org.apache.calcite.util.ImmutableNullableList;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import java.util.List;
+import java.util.Optional;
+
+import static java.util.Objects.requireNonNull;
+
+/** Replace table as statement. */
+public class SqlReplaceTableAs extends SqlCreate implements ExtendedSqlNode {
+
+    public static final SqlSpecialOperator OPERATOR =
+            new SqlSpecialOperator("REPLACE TABLE AS", SqlKind.OTHER_DDL);
+
+    private final SqlIdentifier tableName;
+
+    private final SqlNodeList columnList;
+
+    private final SqlNodeList propertyList;
+
+    private final List<SqlTableConstraint> tableConstraints;
+
+    private final SqlNodeList partitionKeyList;
+
+    private final SqlWatermark watermark;
+
+    private final SqlCharStringLiteral comment;
+
+    private final boolean isTemporary;
+
+    private final boolean isCreateOrReplace;
+
+    private final SqlNode asQuery;
+
+    public SqlReplaceTableAs(
+            SqlParserPos pos,
+            SqlIdentifier tableName,
+            SqlNodeList columnList,
+            List<SqlTableConstraint> tableConstraints,
+            SqlNodeList propertyList,
+            SqlNodeList partitionKeyList,
+            @Nullable SqlWatermark watermark,
+            @Nullable SqlCharStringLiteral comment,
+            SqlNode asQuery,
+            boolean isTemporary,
+            boolean ifNotExists,
+            boolean isCreateOrReplace) {
+        this(
+                OPERATOR,
+                pos,
+                tableName,
+                columnList,
+                tableConstraints,
+                propertyList,
+                partitionKeyList,
+                watermark,
+                comment,
+                asQuery,
+                isTemporary,
+                ifNotExists,
+                isCreateOrReplace);
+    }
+
+    public SqlReplaceTableAs(
+            SqlSpecialOperator operator,
+            SqlParserPos pos,
+            SqlIdentifier tableName,
+            SqlNodeList columnList,
+            List<SqlTableConstraint> tableConstraints,
+            SqlNodeList propertyList,
+            SqlNodeList partitionKeyList,
+            @Nullable SqlWatermark watermark,
+            @Nullable SqlCharStringLiteral comment,
+            SqlNode asQuery,
+            boolean isTemporary,
+            boolean ifNotExists,
+            boolean isCreateOrReplace) {
+        super(operator, pos, true, ifNotExists);
+
+        this.tableName = requireNonNull(tableName, "tableName should not be 
null");
+        this.columnList = requireNonNull(columnList, "columnList should not be 
null");
+        this.tableConstraints =
+                requireNonNull(tableConstraints, "table constraints should not 
be null");
+        this.propertyList = requireNonNull(propertyList, "propertyList should 
not be null");
+        this.partitionKeyList =
+                requireNonNull(partitionKeyList, "partitionKeyList should not 
be null");
+        this.watermark = watermark;
+        this.comment = comment;
+        this.isTemporary = isTemporary;
+
+        this.asQuery = asQuery;
+        this.isCreateOrReplace = isCreateOrReplace;
+    }
+
+    @Override
+    public @Nonnull SqlOperator getOperator() {

Review Comment:
   I'm wondering if `isCreateOrReplace` is true, can we return 
`SqlSpecialOperator("CREATE OR REPLACE TABLE AS", SqlKind.OTHER_DDL)`



##########
flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlReplaceTableAs.java:
##########
@@ -0,0 +1,278 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.sql.parser.ddl;
+
+import org.apache.flink.sql.parser.ExtendedSqlNode;
+import org.apache.flink.sql.parser.SqlConstraintValidator;
+import org.apache.flink.sql.parser.SqlUnparseUtils;
+import org.apache.flink.sql.parser.ddl.constraint.SqlTableConstraint;
+import org.apache.flink.sql.parser.error.SqlValidateException;
+
+import org.apache.calcite.sql.SqlCharStringLiteral;
+import org.apache.calcite.sql.SqlCreate;
+import org.apache.calcite.sql.SqlIdentifier;
+import org.apache.calcite.sql.SqlKind;
+import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.sql.SqlNodeList;
+import org.apache.calcite.sql.SqlOperator;
+import org.apache.calcite.sql.SqlSpecialOperator;
+import org.apache.calcite.sql.SqlWriter;
+import org.apache.calcite.sql.parser.SqlParserPos;
+import org.apache.calcite.util.ImmutableNullableList;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import java.util.List;
+import java.util.Optional;
+
+import static java.util.Objects.requireNonNull;
+
+/** Replace table as statement. */
+public class SqlReplaceTableAs extends SqlCreate implements ExtendedSqlNode {
+
+    public static final SqlSpecialOperator OPERATOR =
+            new SqlSpecialOperator("REPLACE TABLE AS", SqlKind.OTHER_DDL);
+
+    private final SqlIdentifier tableName;
+
+    private final SqlNodeList columnList;
+
+    private final SqlNodeList propertyList;
+
+    private final List<SqlTableConstraint> tableConstraints;
+
+    private final SqlNodeList partitionKeyList;
+
+    private final SqlWatermark watermark;
+
+    private final SqlCharStringLiteral comment;
+
+    private final boolean isTemporary;
+
+    private final boolean isCreateOrReplace;
+
+    private final SqlNode asQuery;
+
+    public SqlReplaceTableAs(
+            SqlParserPos pos,
+            SqlIdentifier tableName,
+            SqlNodeList columnList,
+            List<SqlTableConstraint> tableConstraints,
+            SqlNodeList propertyList,
+            SqlNodeList partitionKeyList,
+            @Nullable SqlWatermark watermark,
+            @Nullable SqlCharStringLiteral comment,
+            SqlNode asQuery,
+            boolean isTemporary,
+            boolean ifNotExists,
+            boolean isCreateOrReplace) {
+        this(
+                OPERATOR,
+                pos,
+                tableName,
+                columnList,
+                tableConstraints,
+                propertyList,
+                partitionKeyList,
+                watermark,
+                comment,
+                asQuery,
+                isTemporary,
+                ifNotExists,
+                isCreateOrReplace);
+    }
+
+    public SqlReplaceTableAs(

Review Comment:
   Do we really need this construct method? Can't we use the construct method 
above.



##########
flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlReplaceTableAs.java:
##########
@@ -0,0 +1,278 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.sql.parser.ddl;
+
+import org.apache.flink.sql.parser.ExtendedSqlNode;
+import org.apache.flink.sql.parser.SqlConstraintValidator;
+import org.apache.flink.sql.parser.SqlUnparseUtils;
+import org.apache.flink.sql.parser.ddl.constraint.SqlTableConstraint;
+import org.apache.flink.sql.parser.error.SqlValidateException;
+
+import org.apache.calcite.sql.SqlCharStringLiteral;
+import org.apache.calcite.sql.SqlCreate;
+import org.apache.calcite.sql.SqlIdentifier;
+import org.apache.calcite.sql.SqlKind;
+import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.sql.SqlNodeList;
+import org.apache.calcite.sql.SqlOperator;
+import org.apache.calcite.sql.SqlSpecialOperator;
+import org.apache.calcite.sql.SqlWriter;
+import org.apache.calcite.sql.parser.SqlParserPos;
+import org.apache.calcite.util.ImmutableNullableList;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import java.util.List;
+import java.util.Optional;
+
+import static java.util.Objects.requireNonNull;
+
+/** Replace table as statement. */
+public class SqlReplaceTableAs extends SqlCreate implements ExtendedSqlNode {
+
+    public static final SqlSpecialOperator OPERATOR =
+            new SqlSpecialOperator("REPLACE TABLE AS", SqlKind.OTHER_DDL);
+
+    private final SqlIdentifier tableName;
+
+    private final SqlNodeList columnList;
+
+    private final SqlNodeList propertyList;
+
+    private final List<SqlTableConstraint> tableConstraints;
+
+    private final SqlNodeList partitionKeyList;
+
+    private final SqlWatermark watermark;
+
+    private final SqlCharStringLiteral comment;
+
+    private final boolean isTemporary;
+
+    private final boolean isCreateOrReplace;
+
+    private final SqlNode asQuery;
+
+    public SqlReplaceTableAs(
+            SqlParserPos pos,
+            SqlIdentifier tableName,
+            SqlNodeList columnList,
+            List<SqlTableConstraint> tableConstraints,
+            SqlNodeList propertyList,
+            SqlNodeList partitionKeyList,
+            @Nullable SqlWatermark watermark,
+            @Nullable SqlCharStringLiteral comment,
+            SqlNode asQuery,
+            boolean isTemporary,
+            boolean ifNotExists,
+            boolean isCreateOrReplace) {
+        this(
+                OPERATOR,
+                pos,
+                tableName,
+                columnList,
+                tableConstraints,
+                propertyList,
+                partitionKeyList,
+                watermark,
+                comment,
+                asQuery,
+                isTemporary,
+                ifNotExists,
+                isCreateOrReplace);
+    }
+
+    public SqlReplaceTableAs(
+            SqlSpecialOperator operator,
+            SqlParserPos pos,
+            SqlIdentifier tableName,
+            SqlNodeList columnList,
+            List<SqlTableConstraint> tableConstraints,
+            SqlNodeList propertyList,
+            SqlNodeList partitionKeyList,
+            @Nullable SqlWatermark watermark,
+            @Nullable SqlCharStringLiteral comment,
+            SqlNode asQuery,
+            boolean isTemporary,
+            boolean ifNotExists,
+            boolean isCreateOrReplace) {
+        super(operator, pos, true, ifNotExists);
+
+        this.tableName = requireNonNull(tableName, "tableName should not be 
null");
+        this.columnList = requireNonNull(columnList, "columnList should not be 
null");
+        this.tableConstraints =
+                requireNonNull(tableConstraints, "table constraints should not 
be null");
+        this.propertyList = requireNonNull(propertyList, "propertyList should 
not be null");
+        this.partitionKeyList =
+                requireNonNull(partitionKeyList, "partitionKeyList should not 
be null");
+        this.watermark = watermark;
+        this.comment = comment;
+        this.isTemporary = isTemporary;
+
+        this.asQuery = asQuery;
+        this.isCreateOrReplace = isCreateOrReplace;
+    }
+
+    @Override
+    public @Nonnull SqlOperator getOperator() {
+        return OPERATOR;
+    }
+
+    @Override
+    public @Nonnull List<SqlNode> getOperandList() {
+        return ImmutableNullableList.of(
+                tableName,
+                columnList,
+                new SqlNodeList(tableConstraints, SqlParserPos.ZERO),
+                propertyList,
+                partitionKeyList,
+                watermark,
+                comment,
+                asQuery);
+    }
+
+    @Override
+    public void validate() throws SqlValidateException {
+        
SqlConstraintValidator.validateAndChangeColumnNullability(tableConstraints, 
columnList);
+        // The following features are not currently supported by RTAS, but may 
be supported in the
+        // future
+        String errorMsg =
+                isCreateOrReplace ? "CREATE OR REPLACE TABLE AS SELECT" : 
"REPLACE TABLE AS SELECT";
+
+        if (isIfNotExists()) {
+            throw new SqlValidateException(
+                    getParserPosition(),
+                    errorMsg + " syntax does not support IF NOT EXISTS 
statements yet.");
+        }
+
+        if (isTemporary()) {
+            throw new SqlValidateException(
+                    getParserPosition(),
+                    errorMsg + " syntax does not support to create temporary 
table yet.");
+        }
+
+        if (getColumnList().size() > 0) {
+            throw new SqlValidateException(
+                    getParserPosition(),
+                    errorMsg + " syntax does not support to specify explicit 
columns yet.");
+        }
+
+        if (getWatermark().isPresent()) {
+            throw new SqlValidateException(
+                    getParserPosition(),
+                    errorMsg + " syntax does not support to specify explicit 
watermark yet.");
+        }
+        if (getPartitionKeyList().size() > 0) {
+            throw new SqlValidateException(
+                    getParserPosition(),
+                    errorMsg + " syntax does not support to create partitioned 
table yet.");
+        }
+        if 
(getFullConstraints().stream().anyMatch(SqlTableConstraint::isPrimaryKey)) {
+            throw new SqlValidateException(
+                    getParserPosition(),
+                    errorMsg + " syntax does not support primary key 
constraints yet.");
+        }
+    }
+
+    public SqlNode getAsQuery() {
+        return asQuery;
+    }
+
+    public boolean isCreateOrReplace() {
+        return isCreateOrReplace;
+    }
+
+    public SqlIdentifier getTableName() {
+        return tableName;
+    }
+
+    public SqlNodeList getColumnList() {
+        return columnList;
+    }
+
+    public SqlNodeList getPropertyList() {
+        return propertyList;
+    }
+
+    public SqlNodeList getPartitionKeyList() {
+        return partitionKeyList;
+    }
+
+    public List<SqlTableConstraint> getTableConstraints() {
+        return tableConstraints;
+    }
+
+    public Optional<SqlWatermark> getWatermark() {
+        return Optional.ofNullable(watermark);
+    }
+
+    public Optional<SqlCharStringLiteral> getComment() {
+        return Optional.ofNullable(comment);
+    }
+
+    public boolean isIfNotExists() {
+        return ifNotExists;
+    }
+
+    public boolean isTemporary() {
+        return isTemporary;
+    }
+
+    /** Returns the column constraints plus the table constraints. */
+    public List<SqlTableConstraint> getFullConstraints() {
+        return SqlConstraintValidator.getFullConstraints(tableConstraints, 
columnList);
+    }
+
+    @Override
+    public void unparse(SqlWriter writer, int leftPrec, int rightPrec) {
+        if (isCreateOrReplace) {
+            writer.keyword("CREATE");
+            writer.keyword("OR");
+        }
+        writer.keyword("REPLACE");
+        if (isTemporary()) {

Review Comment:
   since `isTemporary` is not supported, we can remove it since we also remove 
the partition & columns in here.



##########
flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java:
##########
@@ -2469,6 +2469,142 @@ void testCreateTableAsSelectWithPartitionKey() {
                                         "CREATE TABLE AS SELECT syntax does 
not support to create partitioned table yet."));
     }
 
+    @Test
+    void testReplaceTableAsSelectWithoutOptions() {

Review Comment:
   I'd like to combine these test methods into two methods.
   One is for ReplaceTableAs, and the other one if for CREATE OR REPLACE TABLE.
   
   But I'll add comment in this combined method for which case to test.



##########
flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlReplaceTableAs.java:
##########
@@ -0,0 +1,278 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.sql.parser.ddl;
+
+import org.apache.flink.sql.parser.ExtendedSqlNode;
+import org.apache.flink.sql.parser.SqlConstraintValidator;
+import org.apache.flink.sql.parser.SqlUnparseUtils;
+import org.apache.flink.sql.parser.ddl.constraint.SqlTableConstraint;
+import org.apache.flink.sql.parser.error.SqlValidateException;
+
+import org.apache.calcite.sql.SqlCharStringLiteral;
+import org.apache.calcite.sql.SqlCreate;
+import org.apache.calcite.sql.SqlIdentifier;
+import org.apache.calcite.sql.SqlKind;
+import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.sql.SqlNodeList;
+import org.apache.calcite.sql.SqlOperator;
+import org.apache.calcite.sql.SqlSpecialOperator;
+import org.apache.calcite.sql.SqlWriter;
+import org.apache.calcite.sql.parser.SqlParserPos;
+import org.apache.calcite.util.ImmutableNullableList;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import java.util.List;
+import java.util.Optional;
+
+import static java.util.Objects.requireNonNull;
+
+/** Replace table as statement. */
+public class SqlReplaceTableAs extends SqlCreate implements ExtendedSqlNode {
+
+    public static final SqlSpecialOperator OPERATOR =
+            new SqlSpecialOperator("REPLACE TABLE AS", SqlKind.OTHER_DDL);
+
+    private final SqlIdentifier tableName;
+
+    private final SqlNodeList columnList;
+
+    private final SqlNodeList propertyList;
+
+    private final List<SqlTableConstraint> tableConstraints;
+
+    private final SqlNodeList partitionKeyList;
+
+    private final SqlWatermark watermark;
+
+    private final SqlCharStringLiteral comment;
+
+    private final boolean isTemporary;
+
+    private final boolean isCreateOrReplace;
+
+    private final SqlNode asQuery;
+
+    public SqlReplaceTableAs(
+            SqlParserPos pos,
+            SqlIdentifier tableName,
+            SqlNodeList columnList,
+            List<SqlTableConstraint> tableConstraints,
+            SqlNodeList propertyList,
+            SqlNodeList partitionKeyList,
+            @Nullable SqlWatermark watermark,
+            @Nullable SqlCharStringLiteral comment,
+            SqlNode asQuery,
+            boolean isTemporary,
+            boolean ifNotExists,
+            boolean isCreateOrReplace) {
+        this(
+                OPERATOR,
+                pos,
+                tableName,
+                columnList,
+                tableConstraints,
+                propertyList,
+                partitionKeyList,
+                watermark,
+                comment,
+                asQuery,
+                isTemporary,
+                ifNotExists,
+                isCreateOrReplace);
+    }
+
+    public SqlReplaceTableAs(
+            SqlSpecialOperator operator,
+            SqlParserPos pos,
+            SqlIdentifier tableName,
+            SqlNodeList columnList,
+            List<SqlTableConstraint> tableConstraints,
+            SqlNodeList propertyList,
+            SqlNodeList partitionKeyList,
+            @Nullable SqlWatermark watermark,
+            @Nullable SqlCharStringLiteral comment,
+            SqlNode asQuery,
+            boolean isTemporary,
+            boolean ifNotExists,
+            boolean isCreateOrReplace) {
+        super(operator, pos, true, ifNotExists);
+
+        this.tableName = requireNonNull(tableName, "tableName should not be 
null");
+        this.columnList = requireNonNull(columnList, "columnList should not be 
null");
+        this.tableConstraints =
+                requireNonNull(tableConstraints, "table constraints should not 
be null");
+        this.propertyList = requireNonNull(propertyList, "propertyList should 
not be null");
+        this.partitionKeyList =
+                requireNonNull(partitionKeyList, "partitionKeyList should not 
be null");
+        this.watermark = watermark;
+        this.comment = comment;
+        this.isTemporary = isTemporary;
+
+        this.asQuery = asQuery;
+        this.isCreateOrReplace = isCreateOrReplace;
+    }
+
+    @Override
+    public @Nonnull SqlOperator getOperator() {
+        return OPERATOR;
+    }
+
+    @Override
+    public @Nonnull List<SqlNode> getOperandList() {
+        return ImmutableNullableList.of(
+                tableName,
+                columnList,
+                new SqlNodeList(tableConstraints, SqlParserPos.ZERO),
+                propertyList,
+                partitionKeyList,
+                watermark,
+                comment,
+                asQuery);
+    }
+
+    @Override
+    public void validate() throws SqlValidateException {
+        
SqlConstraintValidator.validateAndChangeColumnNullability(tableConstraints, 
columnList);
+        // The following features are not currently supported by RTAS, but may 
be supported in the
+        // future
+        String errorMsg =
+                isCreateOrReplace ? "CREATE OR REPLACE TABLE AS SELECT" : 
"REPLACE TABLE AS SELECT";
+
+        if (isIfNotExists()) {
+            throw new SqlValidateException(
+                    getParserPosition(),
+                    errorMsg + " syntax does not support IF NOT EXISTS 
statements yet.");
+        }
+
+        if (isTemporary()) {
+            throw new SqlValidateException(
+                    getParserPosition(),
+                    errorMsg + " syntax does not support to create temporary 
table yet.");

Review Comment:
   ```suggestion
                       errorMsg + " syntax does not support  temporary table 
yet.");
   ```



##########
flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlReplaceTableAs.java:
##########
@@ -0,0 +1,278 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.sql.parser.ddl;
+
+import org.apache.flink.sql.parser.ExtendedSqlNode;
+import org.apache.flink.sql.parser.SqlConstraintValidator;
+import org.apache.flink.sql.parser.SqlUnparseUtils;
+import org.apache.flink.sql.parser.ddl.constraint.SqlTableConstraint;
+import org.apache.flink.sql.parser.error.SqlValidateException;
+
+import org.apache.calcite.sql.SqlCharStringLiteral;
+import org.apache.calcite.sql.SqlCreate;
+import org.apache.calcite.sql.SqlIdentifier;
+import org.apache.calcite.sql.SqlKind;
+import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.sql.SqlNodeList;
+import org.apache.calcite.sql.SqlOperator;
+import org.apache.calcite.sql.SqlSpecialOperator;
+import org.apache.calcite.sql.SqlWriter;
+import org.apache.calcite.sql.parser.SqlParserPos;
+import org.apache.calcite.util.ImmutableNullableList;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import java.util.List;
+import java.util.Optional;
+
+import static java.util.Objects.requireNonNull;
+
+/** Replace table as statement. */

Review Comment:
   Please follow the java doc in `SqlCreateTableAs`. 
   Also, it's for create or replace table as statement



##########
flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlReplaceTableAs.java:
##########
@@ -0,0 +1,278 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.sql.parser.ddl;
+
+import org.apache.flink.sql.parser.ExtendedSqlNode;
+import org.apache.flink.sql.parser.SqlConstraintValidator;
+import org.apache.flink.sql.parser.SqlUnparseUtils;
+import org.apache.flink.sql.parser.ddl.constraint.SqlTableConstraint;
+import org.apache.flink.sql.parser.error.SqlValidateException;
+
+import org.apache.calcite.sql.SqlCharStringLiteral;
+import org.apache.calcite.sql.SqlCreate;
+import org.apache.calcite.sql.SqlIdentifier;
+import org.apache.calcite.sql.SqlKind;
+import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.sql.SqlNodeList;
+import org.apache.calcite.sql.SqlOperator;
+import org.apache.calcite.sql.SqlSpecialOperator;
+import org.apache.calcite.sql.SqlWriter;
+import org.apache.calcite.sql.parser.SqlParserPos;
+import org.apache.calcite.util.ImmutableNullableList;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import java.util.List;
+import java.util.Optional;
+
+import static java.util.Objects.requireNonNull;
+
+/** Replace table as statement. */
+public class SqlReplaceTableAs extends SqlCreate implements ExtendedSqlNode {
+
+    public static final SqlSpecialOperator OPERATOR =
+            new SqlSpecialOperator("REPLACE TABLE AS", SqlKind.OTHER_DDL);
+
+    private final SqlIdentifier tableName;
+
+    private final SqlNodeList columnList;
+
+    private final SqlNodeList propertyList;
+
+    private final List<SqlTableConstraint> tableConstraints;
+
+    private final SqlNodeList partitionKeyList;
+
+    private final SqlWatermark watermark;
+
+    private final SqlCharStringLiteral comment;
+
+    private final boolean isTemporary;
+
+    private final boolean isCreateOrReplace;
+
+    private final SqlNode asQuery;
+
+    public SqlReplaceTableAs(
+            SqlParserPos pos,
+            SqlIdentifier tableName,
+            SqlNodeList columnList,
+            List<SqlTableConstraint> tableConstraints,
+            SqlNodeList propertyList,
+            SqlNodeList partitionKeyList,
+            @Nullable SqlWatermark watermark,
+            @Nullable SqlCharStringLiteral comment,
+            SqlNode asQuery,
+            boolean isTemporary,
+            boolean ifNotExists,
+            boolean isCreateOrReplace) {
+        this(
+                OPERATOR,
+                pos,
+                tableName,
+                columnList,
+                tableConstraints,
+                propertyList,
+                partitionKeyList,
+                watermark,
+                comment,
+                asQuery,
+                isTemporary,
+                ifNotExists,
+                isCreateOrReplace);
+    }
+
+    public SqlReplaceTableAs(
+            SqlSpecialOperator operator,
+            SqlParserPos pos,
+            SqlIdentifier tableName,
+            SqlNodeList columnList,
+            List<SqlTableConstraint> tableConstraints,
+            SqlNodeList propertyList,
+            SqlNodeList partitionKeyList,
+            @Nullable SqlWatermark watermark,
+            @Nullable SqlCharStringLiteral comment,
+            SqlNode asQuery,
+            boolean isTemporary,
+            boolean ifNotExists,
+            boolean isCreateOrReplace) {
+        super(operator, pos, true, ifNotExists);
+
+        this.tableName = requireNonNull(tableName, "tableName should not be 
null");
+        this.columnList = requireNonNull(columnList, "columnList should not be 
null");
+        this.tableConstraints =
+                requireNonNull(tableConstraints, "table constraints should not 
be null");
+        this.propertyList = requireNonNull(propertyList, "propertyList should 
not be null");
+        this.partitionKeyList =
+                requireNonNull(partitionKeyList, "partitionKeyList should not 
be null");
+        this.watermark = watermark;
+        this.comment = comment;
+        this.isTemporary = isTemporary;
+
+        this.asQuery = asQuery;
+        this.isCreateOrReplace = isCreateOrReplace;
+    }
+
+    @Override
+    public @Nonnull SqlOperator getOperator() {
+        return OPERATOR;
+    }
+
+    @Override
+    public @Nonnull List<SqlNode> getOperandList() {
+        return ImmutableNullableList.of(
+                tableName,
+                columnList,
+                new SqlNodeList(tableConstraints, SqlParserPos.ZERO),
+                propertyList,
+                partitionKeyList,
+                watermark,
+                comment,
+                asQuery);
+    }
+
+    @Override
+    public void validate() throws SqlValidateException {
+        
SqlConstraintValidator.validateAndChangeColumnNullability(tableConstraints, 
columnList);
+        // The following features are not currently supported by RTAS, but may 
be supported in the
+        // future
+        String errorMsg =
+                isCreateOrReplace ? "CREATE OR REPLACE TABLE AS SELECT" : 
"REPLACE TABLE AS SELECT";
+
+        if (isIfNotExists()) {
+            throw new SqlValidateException(
+                    getParserPosition(),
+                    errorMsg + " syntax does not support IF NOT EXISTS 
statements yet.");
+        }
+
+        if (isTemporary()) {
+            throw new SqlValidateException(
+                    getParserPosition(),
+                    errorMsg + " syntax does not support to create temporary 
table yet.");
+        }
+
+        if (getColumnList().size() > 0) {
+            throw new SqlValidateException(
+                    getParserPosition(),
+                    errorMsg + " syntax does not support to specify explicit 
columns yet.");
+        }
+
+        if (getWatermark().isPresent()) {
+            throw new SqlValidateException(
+                    getParserPosition(),
+                    errorMsg + " syntax does not support to specify explicit 
watermark yet.");
+        }
+        if (getPartitionKeyList().size() > 0) {
+            throw new SqlValidateException(
+                    getParserPosition(),
+                    errorMsg + " syntax does not support to create partitioned 
table yet.");
+        }
+        if 
(getFullConstraints().stream().anyMatch(SqlTableConstraint::isPrimaryKey)) {
+            throw new SqlValidateException(
+                    getParserPosition(),
+                    errorMsg + " syntax does not support primary key 
constraints yet.");
+        }
+    }
+
+    public SqlNode getAsQuery() {
+        return asQuery;
+    }
+
+    public boolean isCreateOrReplace() {
+        return isCreateOrReplace;
+    }
+
+    public SqlIdentifier getTableName() {
+        return tableName;
+    }
+
+    public SqlNodeList getColumnList() {
+        return columnList;
+    }
+
+    public SqlNodeList getPropertyList() {
+        return propertyList;
+    }
+
+    public SqlNodeList getPartitionKeyList() {
+        return partitionKeyList;
+    }
+
+    public List<SqlTableConstraint> getTableConstraints() {
+        return tableConstraints;
+    }
+
+    public Optional<SqlWatermark> getWatermark() {
+        return Optional.ofNullable(watermark);
+    }
+
+    public Optional<SqlCharStringLiteral> getComment() {
+        return Optional.ofNullable(comment);
+    }
+
+    public boolean isIfNotExists() {
+        return ifNotExists;
+    }
+
+    public boolean isTemporary() {
+        return isTemporary;
+    }
+
+    /** Returns the column constraints plus the table constraints. */
+    public List<SqlTableConstraint> getFullConstraints() {
+        return SqlConstraintValidator.getFullConstraints(tableConstraints, 
columnList);
+    }
+
+    @Override
+    public void unparse(SqlWriter writer, int leftPrec, int rightPrec) {
+        if (isCreateOrReplace) {
+            writer.keyword("CREATE");

Review Comment:
   ```suggestion
               writer.keyword("CREATE OR");
   ```
   ?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to