fsk119 commented on code in PR #25834:
URL: https://github.com/apache/flink/pull/25834#discussion_r2053197415


##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlAlterModelSetConverter.java:
##########
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.operations.converters;
+
+import org.apache.flink.sql.parser.ddl.SqlAlterModelSet;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.catalog.ModelChange;
+import org.apache.flink.table.catalog.ResolvedCatalogModel;
+import org.apache.flink.table.catalog.UnresolvedIdentifier;
+import org.apache.flink.table.operations.Operation;
+import org.apache.flink.table.operations.ddl.AlterModelChangeOperation;
+import org.apache.flink.table.planner.utils.OperationConverterUtils;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/** A converter for {@link org.apache.flink.sql.parser.ddl.SqlAlterModelSet}. */
+public class SqlAlterModelSetConverter extends AbstractSqlAlterModelConverter<SqlAlterModelSet> {
+
+    @Override
+    public Operation convertSqlNode(SqlAlterModelSet sqlAlterModelSet, ConvertContext context) {
+        ResolvedCatalogModel existingModel =
+                getExistingModel(
+                        context,
+                        sqlAlterModelSet.fullModelName(),
+                        sqlAlterModelSet.ifModelExists());
+
+        Map<String, String> changeModelOptions =
+                OperationConverterUtils.extractProperties(sqlAlterModelSet.getOptionList());
+        if (changeModelOptions.isEmpty()) {
+            throw new ValidationException("ALTER MODEL SET does not support empty option.");
+        }
+        List<ModelChange> modelChanges = new ArrayList<>();
+        changeModelOptions.forEach((key, value) -> modelChanges.add(ModelChange.set(key, value)));
+
+        if (existingModel == null) {

Review Comment:
   ```
   if (existingModel == null) {
       return new NopOperation();
   }
   ```



##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlCreateModelConverter.java:
##########
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.operations.converters;
+
+import org.apache.flink.sql.parser.ddl.SqlCreateModel;
+import org.apache.flink.sql.parser.ddl.SqlTableColumn.SqlRegularColumn;
+import org.apache.flink.sql.parser.ddl.SqlTableOption;
+import org.apache.flink.table.api.Schema;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.catalog.CatalogModel;
+import org.apache.flink.table.catalog.DataTypeFactory;
+import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.table.catalog.UnresolvedIdentifier;
+import org.apache.flink.table.operations.Operation;
+import org.apache.flink.table.operations.ddl.CreateModelOperation;
+import org.apache.flink.table.planner.operations.SchemaBuilderUtil;
+
+import org.apache.calcite.sql.SqlLiteral;
+import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.sql.SqlNodeList;
+import org.apache.calcite.sql.validate.SqlValidator;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+import java.util.function.Function;
+
+/** A converter for {@link org.apache.flink.sql.parser.ddl.SqlCreateModel}. */
+public class SqlCreateModelConverter implements SqlNodeConverter<SqlCreateModel> {
+
+    @Override
+    public Operation convertSqlNode(SqlCreateModel sqlCreateModel, ConvertContext context) {
+        UnresolvedIdentifier unresolvedIdentifier =
+                UnresolvedIdentifier.of(sqlCreateModel.fullModelName());
+        ObjectIdentifier identifier =
+                context.getCatalogManager().qualifyIdentifier(unresolvedIdentifier);
+        Map<String, String> modelOptions = getModelOptions(sqlCreateModel);
+
+        ModelSchemaBuilderUtils schemaBuilderUtil =
+                new ModelSchemaBuilderUtils(
+                        context.getSqlValidator(),
+                        SqlNode::toString,
+                        context.getCatalogManager().getDataTypeFactory());
+
+        CatalogModel catalogModel =
+                CatalogModel.of(
+                        schemaBuilderUtil.getSchema(sqlCreateModel.getInputColumnList()),
+                        schemaBuilderUtil.getSchema(sqlCreateModel.getOutputColumnList()),
+                        modelOptions,
+                        sqlCreateModel.getComment().map(SqlLiteral::toValue).orElse(null));
+
+        return new CreateModelOperation(
+                identifier,
+                context.getCatalogManager().resolveCatalogModel(catalogModel),
+                sqlCreateModel.isIfNotExists(),
+                sqlCreateModel.isTemporary());
+    }
+
+    private Map<String, String> getModelOptions(SqlCreateModel sqlCreateModel) {
+        Map<String, String> options = new HashMap<>();
+        sqlCreateModel
+                .getPropertyList()
+                .getList()
+                .forEach(
+                        p ->
+                                options.put(
+                                        ((SqlTableOption) Objects.requireNonNull(p)).getKeyString(),
+                                        ((SqlTableOption) p).getValueString()));
+        return options;
+    }
+
+    /** Builder for {@link Schema} of a model. */
+    private static class ModelSchemaBuilderUtils extends SchemaBuilderUtil {
+        ModelSchemaBuilderUtils(
+                SqlValidator sqlValidator,
+                Function<SqlNode, String> escapeExpressions,
+                DataTypeFactory dataTypeFactory) {
+            super(sqlValidator, escapeExpressions, dataTypeFactory);
+        }
+
+        private Schema getSchema(SqlNodeList nodeList) {
+            columns.clear();
+            for (SqlNode column : nodeList) {
+                if (column instanceof SqlRegularColumn) {
+                    SqlRegularColumn regularColumn = (SqlRegularColumn) column;
+                    columns.put(

Review Comment:
   I think we should check whether the column is a duplicate. If the input schema contains columns with the same name, please throw an exception to notify users.
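   A rough sketch of the check I have in mind (illustrative only, assuming `columns` is the `Map` inherited from `SchemaBuilderUtil`):

   ```
   for (SqlNode column : nodeList) {
       if (column instanceof SqlRegularColumn) {
           SqlRegularColumn regularColumn = (SqlRegularColumn) column;
           String name = regularColumn.getName().getSimple();
           // Fail fast on duplicate column names instead of silently overwriting the earlier one.
           if (columns.containsKey(name)) {
               throw new ValidationException(
                       String.format("Duplicate column name '%s' in the model schema.", name));
           }
           columns.put(name, toUnresolvedPhysicalColumn(regularColumn));
       }
       // else-branch and the Schema construction stay as they are.
   }
   ```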



##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlAlterModelResetConverter.java:
##########
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.operations.converters;
+
+import org.apache.flink.sql.parser.ddl.SqlAlterModelReset;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.catalog.ModelChange;
+import org.apache.flink.table.catalog.ResolvedCatalogModel;
+import org.apache.flink.table.catalog.UnresolvedIdentifier;
+import org.apache.flink.table.operations.Operation;
+import org.apache.flink.table.operations.ddl.AlterModelChangeOperation;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/** A converter for {@link org.apache.flink.sql.parser.ddl.SqlAlterModelReset}. */
+public class SqlAlterModelResetConverter
+        extends AbstractSqlAlterModelConverter<SqlAlterModelReset> {
+
+    @Override
+    public Operation convertSqlNode(SqlAlterModelReset sqlAlterModelReset, ConvertContext context) {
+        ResolvedCatalogModel existingModel =
+                getExistingModel(
+                        context,
+                        sqlAlterModelReset.fullModelName(),
+                        sqlAlterModelReset.ifModelExists());
+
+        Set<String> resetKeys = sqlAlterModelReset.getResetKeys();
+        if (resetKeys.isEmpty()) {
+            throw new ValidationException("ALTER MODEL RESET does not support empty key.");
+        }
+        List<ModelChange> modelChanges =
+                resetKeys.stream().map(ModelChange::reset).collect(Collectors.toList());
+
+        if (existingModel == null) {
+            return new AlterModelChangeOperation(

Review Comment:
   return new NopOperation();



##########
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/AlterModelRenameOperation.java:
##########
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.operations.ddl;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.api.internal.TableResultImpl;
+import org.apache.flink.table.api.internal.TableResultInternal;
+import org.apache.flink.table.catalog.Catalog;
+import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.table.catalog.ResolvedCatalogModel;
+import org.apache.flink.table.catalog.exceptions.ModelAlreadyExistException;
+import org.apache.flink.table.catalog.exceptions.ModelNotExistException;
+
+import javax.annotation.Nullable;
+
+/** Operation to describe a ALTER MODEL .. RENAME to .. statement. */
+@Internal
+public class AlterModelRenameOperation implements AlterOperation {
+
+    private final ResolvedCatalogModel model;
+    private final ObjectIdentifier modelIdentifier;
+    private final ObjectIdentifier newModelIdentifier;
+    private final boolean ignoreIfNotExists;
+
+    /**
+     * Creates a AlterModelRenameOperation.
+     *
+     * @param model The resolved model to rename. If null, the model does not exist.
+     * @param modelIdentifier The identifier of the model to rename.
+     * @param newModelIdentifier The new identifier of the model.
+     * @param ignoreIfNotExists A flag that indicates if the operation should throw an exception if
+     *     model does not exist.
+     */
+    public AlterModelRenameOperation(
+            @Nullable ResolvedCatalogModel model,

Review Comment:
   But I think `AlterModelRenameOperation` doesn't need the `model` field. Please remove it.
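   Something along these lines (a sketch only; everything else would stay as it is):

   ```
   public class AlterModelRenameOperation implements AlterOperation {

       private final ObjectIdentifier modelIdentifier;
       private final ObjectIdentifier newModelIdentifier;
       private final boolean ignoreIfNotExists;

       public AlterModelRenameOperation(
               ObjectIdentifier modelIdentifier,
               ObjectIdentifier newModelIdentifier,
               boolean ignoreIfNotExists) {
           this.modelIdentifier = modelIdentifier;
           this.newModelIdentifier = newModelIdentifier;
           this.ignoreIfNotExists = ignoreIfNotExists;
       }

       // asSummaryString() / execute(...) unchanged, minus any use of `model`.
   }
   ```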



##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlCreateModelConverter.java:
##########
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.operations.converters;
+
+import org.apache.flink.sql.parser.ddl.SqlCreateModel;
+import org.apache.flink.sql.parser.ddl.SqlTableColumn.SqlRegularColumn;
+import org.apache.flink.sql.parser.ddl.SqlTableOption;
+import org.apache.flink.table.api.Schema;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.catalog.CatalogModel;
+import org.apache.flink.table.catalog.DataTypeFactory;
+import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.table.catalog.UnresolvedIdentifier;
+import org.apache.flink.table.operations.Operation;
+import org.apache.flink.table.operations.ddl.CreateModelOperation;
+import org.apache.flink.table.planner.operations.SchemaBuilderUtil;
+
+import org.apache.calcite.sql.SqlLiteral;
+import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.sql.SqlNodeList;
+import org.apache.calcite.sql.validate.SqlValidator;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+import java.util.function.Function;
+
+/** A converter for {@link org.apache.flink.sql.parser.ddl.SqlCreateModel}. */
+public class SqlCreateModelConverter implements SqlNodeConverter<SqlCreateModel> {
+
+    @Override
+    public Operation convertSqlNode(SqlCreateModel sqlCreateModel, ConvertContext context) {
+        UnresolvedIdentifier unresolvedIdentifier =
+                UnresolvedIdentifier.of(sqlCreateModel.fullModelName());
+        ObjectIdentifier identifier =
+                context.getCatalogManager().qualifyIdentifier(unresolvedIdentifier);
+        Map<String, String> modelOptions = getModelOptions(sqlCreateModel);
+
+        ModelSchemaBuilderUtils schemaBuilderUtil =
+                new ModelSchemaBuilderUtils(
+                        context.getSqlValidator(),
+                        SqlNode::toString,
+                        context.getCatalogManager().getDataTypeFactory());
+
+        CatalogModel catalogModel =
+                CatalogModel.of(
+                        schemaBuilderUtil.getSchema(sqlCreateModel.getInputColumnList()),
+                        schemaBuilderUtil.getSchema(sqlCreateModel.getOutputColumnList()),
+                        modelOptions,
+                        sqlCreateModel.getComment().map(SqlLiteral::toValue).orElse(null));
+
+        return new CreateModelOperation(
+                identifier,
+                context.getCatalogManager().resolveCatalogModel(catalogModel),
+                sqlCreateModel.isIfNotExists(),
+                sqlCreateModel.isTemporary());
+    }
+
+    private Map<String, String> getModelOptions(SqlCreateModel sqlCreateModel) {
+        Map<String, String> options = new HashMap<>();
+        sqlCreateModel
+                .getPropertyList()
+                .getList()
+                .forEach(
+                        p ->
+                                options.put(
+                                        ((SqlTableOption) Objects.requireNonNull(p)).getKeyString(),
+                                        ((SqlTableOption) p).getValueString()));
+        return options;
+    }
+
+    /** Builder for {@link Schema} of a model. */
+    private static class ModelSchemaBuilderUtils extends SchemaBuilderUtil {
+        ModelSchemaBuilderUtils(
+                SqlValidator sqlValidator,
+                Function<SqlNode, String> escapeExpressions,
+                DataTypeFactory dataTypeFactory) {
+            super(sqlValidator, escapeExpressions, dataTypeFactory);
+        }
+
+        private Schema getSchema(SqlNodeList nodeList) {
+            columns.clear();
+            for (SqlNode column : nodeList) {
+                if (column instanceof SqlRegularColumn) {
+                    SqlRegularColumn regularColumn = (SqlRegularColumn) column;
+                    columns.put(
+                            regularColumn.getName().getSimple(),
+                            toUnresolvedPhysicalColumn(regularColumn));
+                } else {
+                    throw new TableException(

Review Comment:
   I prefer to use `ValidationException` here. `TableException` indicates that an internal error occurred or that a feature is not supported yet; usually, it does not indicate a fault of the user.
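   I.e. something like this in the else-branch (the exact message is just an example):

   ```
   } else {
       throw new ValidationException(
               String.format(
                       "Column '%s' of a model schema must be a physical column.", column));
   }
   ```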



##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlAlterModelRenameConverter.java:
##########
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.operations.converters;
+
+import org.apache.flink.sql.parser.ddl.SqlAlterModelRename;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.table.catalog.ResolvedCatalogModel;
+import org.apache.flink.table.catalog.UnresolvedIdentifier;
+import org.apache.flink.table.operations.Operation;
+import org.apache.flink.table.operations.ddl.AlterModelRenameOperation;
+
+/** A converter for {@link org.apache.flink.sql.parser.ddl.SqlAlterModelRename}. */
+public class SqlAlterModelRenameConverter
+        extends AbstractSqlAlterModelConverter<SqlAlterModelRename> {
+
+    @Override
+    public Operation convertSqlNode(
+            SqlAlterModelRename sqlAlterModelRename, ConvertContext context) {
+        ResolvedCatalogModel existingModel =
+                getExistingModel(
+                        context,
+                        sqlAlterModelRename.fullModelName(),
+                        sqlAlterModelRename.ifModelExists());
+
+        UnresolvedIdentifier newUnresolvedIdentifier =
+                UnresolvedIdentifier.of(sqlAlterModelRename.fullNewModelName());
+        ObjectIdentifier newModelIdentifier =
+                context.getCatalogManager().qualifyIdentifier(newUnresolvedIdentifier);
+        ObjectIdentifier oldModelIdentifier =
+                context.getCatalogManager()
+                        .qualifyIdentifier(
+                                UnresolvedIdentifier.of(sqlAlterModelRename.fullModelName()));
+
+        if (!newModelIdentifier.getCatalogName().equals(oldModelIdentifier.getCatalogName())) {
+            throw new ValidationException(
+                    String.format(
+                            "The catalog name of the new model name '%s' must be the same as the old model name '%s'.",
+                            newModelIdentifier.asSummaryString(),
+                            oldModelIdentifier.asSummaryString()));
+        }
+

Review Comment:
           if (existingModel == null) {
               return new NopOperation();
           }
   



##########
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/AlterModelRenameOperation.java:
##########
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.operations.ddl;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.api.internal.TableResultImpl;
+import org.apache.flink.table.api.internal.TableResultInternal;
+import org.apache.flink.table.catalog.Catalog;
+import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.table.catalog.ResolvedCatalogModel;
+import org.apache.flink.table.catalog.exceptions.ModelAlreadyExistException;
+import org.apache.flink.table.catalog.exceptions.ModelNotExistException;
+
+import javax.annotation.Nullable;
+
+/** Operation to describe a ALTER MODEL .. RENAME to .. statement. */
+@Internal
+public class AlterModelRenameOperation implements AlterOperation {
+
+    private final ResolvedCatalogModel model;
+    private final ObjectIdentifier modelIdentifier;
+    private final ObjectIdentifier newModelIdentifier;
+    private final boolean ignoreIfNotExists;
+
+    /**
+     * Creates a AlterModelRenameOperation.
+     *
+     * @param model The resolved model to rename. If null, the model does not exist.
+     * @param modelIdentifier The identifier of the model to rename.
+     * @param newModelIdentifier The new identifier of the model.
+     * @param ignoreIfNotExists A flag that indicates if the operation should throw an exception if
+     *     model does not exist.
+     */
+    public AlterModelRenameOperation(
+            @Nullable ResolvedCatalogModel model,

Review Comment:
   After reading the code again, `AbstractSqlAlterModelConverter#getExistingModel` already checks that the model exists.


