fsk119 commented on code in PR #25834:
URL: https://github.com/apache/flink/pull/25834#discussion_r2050107163


##########
flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java:
##########
@@ -3214,6 +3214,20 @@ void testAlterModelRenameIfExists() {
         sql(sql).ok(expected);
     }
 
+    @Test
+    void testAlterModelReset() {

Review Comment:
   nit: it's better to rename the test `testAlterModel` at line 3182 to `testAlterModelSet`.



##########
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/listener/AlterModelEvent.java:
##########
@@ -22,6 +22,8 @@
 import org.apache.flink.table.catalog.CatalogModel;
 import org.apache.flink.table.catalog.ObjectIdentifier;
 
+import javax.annotation.Nullable;
+
 /** When a model is altered, a {@link AlterModelEvent} event will be created and fired. */
 @PublicEvolving
 public interface AlterModelEvent extends CatalogModificationEvent {

Review Comment:
   The class structure is not aligned with the Database/Table events. You can see:
   ```
   CatalogModificationEvent
   ├── DatabaseModificationEvent
   │   ├── Alter
   │   ├── Create
   │   └── Drop
   ├── TableModificationEvent
   │   ├── Alter
   │   ├── Create
   │   └── Drop
   ```
   
   Hope we can have the same structure here.
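   A minimal sketch of the suggested hierarchy, mirroring the Database/Table events; the `ModelModificationEvent` name and these stubbed interfaces are assumptions for illustration, not Flink's actual API:

   ```java
   // Stub standing in for Flink's CatalogModificationEvent base interface.
   interface CatalogModificationEvent {}

   // Suggested intermediate interface, parallel to DatabaseModificationEvent
   // and TableModificationEvent.
   interface ModelModificationEvent extends CatalogModificationEvent {}

   // Alter/Create/Drop each extend the model-specific intermediate interface.
   interface AlterModelEvent extends ModelModificationEvent {}
   interface CreateModelEvent extends ModelModificationEvent {}
   interface DropModelEvent extends ModelModificationEvent {}
   ```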



##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlAlterModelResetConverter.java:
##########
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.operations.converters;
+
+import org.apache.flink.sql.parser.ddl.SqlAlterModelReset;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.catalog.ModelChange;
+import org.apache.flink.table.catalog.ResolvedCatalogModel;
+import org.apache.flink.table.catalog.UnresolvedIdentifier;
+import org.apache.flink.table.operations.Operation;
+import org.apache.flink.table.operations.ddl.AlterModelChangeOperation;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/** A converter for {@link org.apache.flink.sql.parser.ddl.SqlAlterModelReset}. */
+public class SqlAlterModelResetConverter
+        extends AbstractSqlAlterModelConverter<SqlAlterModelReset> {
+
+    @Override
+    public Operation convertSqlNode(SqlAlterModelReset sqlAlterModelReset, ConvertContext context) {
+        ResolvedCatalogModel existingModel =
+                getExistingModel(
+                        context,
+                        sqlAlterModelReset.fullModelName(),
+                        sqlAlterModelReset.ifModelExists());
+
+        Set<String> resetKeys = sqlAlterModelReset.getResetKeys();
+        if (resetKeys.isEmpty()) {
+            throw new ValidationException("ALTER MODEL RESET does not support empty key.");
+        }
+        List<ModelChange> modelChanges =
+                resetKeys.stream().map(ModelChange::reset).collect(Collectors.toList());
+
+        if (existingModel == null) {

Review Comment:
   It's better to have the same behaviour as `AlterTableReset`: if the existing model is null and `ignoreIfNotExists` is true, just return a `NopOperation`.
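   A hedged sketch of that `AlterTableReset`-style behaviour, with stub types standing in for Flink's `ResolvedCatalogModel`, `Operation`, `NopOperation`, and `AlterModelChangeOperation`:

   ```java
   interface Operation {}
   final class NopOperation implements Operation {}              // stub for Flink's NopOperation
   final class ResolvedCatalogModel {}                           // stub
   final class AlterModelChangeOperation implements Operation {} // stub

   class SqlAlterModelResetSketch {
       // If the model is missing and IF EXISTS was specified, degrade to a
       // no-op instead of failing; otherwise throw (IllegalStateException here
       // stands in for Flink's ValidationException).
       static Operation convert(ResolvedCatalogModel existingModel, boolean ifExists) {
           if (existingModel == null) {
               if (ifExists) {
                   return new NopOperation();
               }
               throw new IllegalStateException("Model does not exist.");
           }
           return new AlterModelChangeOperation();
       }
   }
   ```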



##########
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/CreateModelOperation.java:
##########
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.operations.ddl;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.api.internal.TableResultImpl;
+import org.apache.flink.table.api.internal.TableResultInternal;
+import org.apache.flink.table.catalog.CatalogModel;
+import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.table.operations.Operation;
+import org.apache.flink.table.operations.OperationUtils;
+
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.Map;
+
+/** Operation to describe a CREATE MODEL statement. */
+@Internal
+public class CreateModelOperation implements CreateOperation {
+    private final ObjectIdentifier modelIdentifier;
+    private final CatalogModel catalogModel;
+    private final boolean ignoreIfExists;
+    private final boolean isTemporary;
+
+    public CreateModelOperation(
+            ObjectIdentifier modelIdentifier,
+            CatalogModel catalogModel,
+            boolean ignoreIfExists,
+            boolean isTemporary) {
+        this.modelIdentifier = modelIdentifier;
+        this.catalogModel = catalogModel;
+        this.ignoreIfExists = ignoreIfExists;
+        this.isTemporary = isTemporary;
+    }
+
+    public CatalogModel getCatalogModel() {
+        return catalogModel;
+    }
+
+    public ObjectIdentifier getModelIdentifier() {
+        return modelIdentifier;
+    }
+
+    public boolean isIgnoreIfExists() {
+        return ignoreIfExists;
+    }
+
+    public boolean isTemporary() {
+        return isTemporary;
+    }
+
+    public String asSummaryString() {

Review Comment:
   Add an `@Override` annotation.



##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlCreateModelConverter.java:
##########
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.operations.converters;
+
+import org.apache.flink.sql.parser.ddl.SqlCreateModel;
+import org.apache.flink.sql.parser.ddl.SqlTableColumn.SqlRegularColumn;
+import org.apache.flink.sql.parser.ddl.SqlTableOption;
+import org.apache.flink.table.api.Schema;
+import org.apache.flink.table.api.Schema.UnresolvedColumn;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.catalog.CatalogModel;
+import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.table.catalog.UnresolvedIdentifier;
+import org.apache.flink.table.operations.Operation;
+import org.apache.flink.table.operations.ddl.CreateModelOperation;
+import org.apache.flink.table.planner.calcite.FlinkTypeFactory;
+import org.apache.flink.table.planner.utils.OperationConverterUtils;
+import org.apache.flink.table.types.DataType;
+
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.sql.SqlDataTypeSpec;
+import org.apache.calcite.sql.SqlLiteral;
+import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.sql.SqlNodeList;
+import org.apache.calcite.sql.validate.SqlValidator;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+
+import static org.apache.flink.table.types.utils.TypeConversions.fromLogicalToDataType;
+
+/** A converter for {@link org.apache.flink.sql.parser.ddl.SqlCreateModel}. */
+public class SqlCreateModelConverter implements SqlNodeConverter<SqlCreateModel> {
+
+    @Override
+    public Operation convertSqlNode(SqlCreateModel sqlCreateModel, ConvertContext context) {
+        UnresolvedIdentifier unresolvedIdentifier =
+                UnresolvedIdentifier.of(sqlCreateModel.fullModelName());
+        ObjectIdentifier identifier =
+                context.getCatalogManager().qualifyIdentifier(unresolvedIdentifier);
+        Map<String, String> modelOptions = getModelOptions(sqlCreateModel);
+        CatalogModel catalogModel =
+                CatalogModel.of(
+                        getSchema(sqlCreateModel.getInputColumnList(), context.getSqlValidator()),
+                        getSchema(sqlCreateModel.getOutputColumnList(), context.getSqlValidator()),
+                        modelOptions,
+                        sqlCreateModel.getComment().map(SqlLiteral::toValue).orElse(null));
+
+        return new CreateModelOperation(
+                identifier,
+                context.getCatalogManager().resolveCatalogModel(catalogModel),
+                sqlCreateModel.isIfNotExists(),
+                sqlCreateModel.isTemporary());
+    }
+
+    private Schema getSchema(SqlNodeList nodeList, SqlValidator sqlValidator) {
+        final List<UnresolvedColumn> columnList = new ArrayList<>();
+        for (SqlNode column : nodeList) {
+            if (column instanceof SqlRegularColumn) {

Review Comment:
   I think we can reuse `SchemaBuilderUtil` to build the schema. How about adding an inner class named `SchemaBuilder` that extends `SchemaBuilderUtil`?
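   A rough sketch of the suggested reuse; `SchemaBuilderUtil` is stubbed here with a minimal column list, since the real Flink utility has a much richer API:

   ```java
   import java.util.ArrayList;
   import java.util.List;

   // Minimal stand-in for Flink's SchemaBuilderUtil; the real class also
   // handles data types, comments, and validation.
   class SchemaBuilderUtil {
       protected final List<String> columnNames = new ArrayList<>();

       protected void addColumn(String name) {
           columnNames.add(name);
       }
   }

   class SqlCreateModelConverterSketch {
       // Suggested shape: a private builder inheriting the shared column
       // handling instead of re-implementing it in getSchema().
       static class SchemaBuilder extends SchemaBuilderUtil {
           List<String> columns() {
               return columnNames;
           }
       }
   }
   ```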



##########
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/AlterModelRenameOperation.java:
##########
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.operations.ddl;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.api.internal.TableResultImpl;
+import org.apache.flink.table.api.internal.TableResultInternal;
+import org.apache.flink.table.catalog.Catalog;
+import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.table.catalog.ResolvedCatalogModel;
+import org.apache.flink.table.catalog.exceptions.ModelAlreadyExistException;
+import org.apache.flink.table.catalog.exceptions.ModelNotExistException;
+
+import javax.annotation.Nullable;
+
+/** Operation to describe a ALTER MODEL .. RENAME to .. statement. */
+@Internal
+public class AlterModelRenameOperation implements AlterOperation {
+
+    private final ResolvedCatalogModel model;
+    private final ObjectIdentifier modelIdentifier;
+    private final ObjectIdentifier newModelIdentifier;
+    private final boolean ignoreIfNotExists;
+
+    /**
+     * Creates a AlterModelRenameOperation.
+     *
+     * @param model The resolved model to rename. If null, the model does not exist.
+     * @param modelIdentifier The identifier of the model to rename.
+     * @param newModelIdentifier The new identifier of the model.
+     * @param ignoreIfNotExists A flag that indicates if the operation should throw an exception if
+     *     model does not exist.
+     */
+    public AlterModelRenameOperation(
+            @Nullable ResolvedCatalogModel model,

Review Comment:
   It's better to throw an exception early if the model doesn't exist and `ignoreIfNotExists` is false.
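   A sketch of that fail-fast validation in the constructor; the model type is stubbed and `IllegalArgumentException` stands in for Flink's `ValidationException`:

   ```java
   final class ResolvedModel {}  // stub for ResolvedCatalogModel

   class AlterModelRenameOperationSketch {
       private final ResolvedModel model;
       private final boolean ignoreIfNotExists;

       AlterModelRenameOperationSketch(ResolvedModel model, boolean ignoreIfNotExists) {
           // Fail fast: a missing model without IF EXISTS is invalid input,
           // so reject it at construction time rather than at execution time.
           if (model == null && !ignoreIfNotExists) {
               throw new IllegalArgumentException("Model does not exist.");
           }
           this.model = model;
           this.ignoreIfNotExists = ignoreIfNotExists;
       }
   }
   ```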



##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/Catalog.java:
##########
@@ -880,14 +880,37 @@ default void createModel(ObjectPath modelPath, CatalogModel model, boolean ignor
      * @param newModel the new model definition
      * @param ignoreIfNotExists flag to specify behavior when the model does not exist: if set to
      *     false, throw an exception, if set to true, do nothing.
-     * @throws ModelNotExistException if the model does not exist
      * @throws CatalogException in case of any runtime exception
      */
     default void alterModel(ObjectPath modelPath, CatalogModel newModel, boolean ignoreIfNotExists)
-            throws ModelNotExistException, CatalogException {
+            throws CatalogException {

Review Comment:
   Why remove the exception? I think we can have a similar method signature as `alterTable`.
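   A sketch of keeping the checked exception, mirroring the `alterTable` pattern; the surrounding types are stubbed for illustration:

   ```java
   class ObjectPath {}
   class CatalogModel {}
   class ModelNotExistException extends Exception {}

   interface CatalogSketch {
       // Same pattern as alterTable: keep the checked ModelNotExistException
       // in the signature so catalog implementations can signal a missing
       // model, while the default body stays unsupported.
       default void alterModel(ObjectPath modelPath, CatalogModel newModel, boolean ignoreIfNotExists)
               throws ModelNotExistException {
           throw new UnsupportedOperationException(
                   "alterModel(ObjectPath, CatalogModel, boolean) is not implemented.");
       }
   }
   ```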



##########
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/CreateModelOperation.java:
##########
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.operations.ddl;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.api.internal.TableResultImpl;
+import org.apache.flink.table.api.internal.TableResultInternal;
+import org.apache.flink.table.catalog.CatalogModel;
+import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.table.operations.Operation;
+import org.apache.flink.table.operations.OperationUtils;
+
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.Map;
+
+/** Operation to describe a CREATE MODEL statement. */
+@Internal
+public class CreateModelOperation implements CreateOperation {
+    private final ObjectIdentifier modelIdentifier;
+    private final CatalogModel catalogModel;
+    private final boolean ignoreIfExists;
+    private final boolean isTemporary;
+
+    public CreateModelOperation(
+            ObjectIdentifier modelIdentifier,
+            CatalogModel catalogModel,
+            boolean ignoreIfExists,
+            boolean isTemporary) {
+        this.modelIdentifier = modelIdentifier;
+        this.catalogModel = catalogModel;
+        this.ignoreIfExists = ignoreIfExists;
+        this.isTemporary = isTemporary;
+    }
+
+    public CatalogModel getCatalogModel() {
+        return catalogModel;
+    }
+
+    public ObjectIdentifier getModelIdentifier() {
+        return modelIdentifier;
+    }
+
+    public boolean isIgnoreIfExists() {
+        return ignoreIfExists;
+    }
+
+    public boolean isTemporary() {
+        return isTemporary;
+    }
+
+    public String asSummaryString() {
+        Map<String, Object> params = new LinkedHashMap<>();
+        params.put("catalogModel", catalogModel.getOptions());

Review Comment:
   catalogModel.toProperties()



##########
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/CreateModelOperation.java:
##########
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.operations.ddl;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.api.internal.TableResultImpl;
+import org.apache.flink.table.api.internal.TableResultInternal;
+import org.apache.flink.table.catalog.CatalogModel;
+import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.table.operations.Operation;
+import org.apache.flink.table.operations.OperationUtils;
+
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.Map;
+
+/** Operation to describe a CREATE MODEL statement. */
+@Internal
+public class CreateModelOperation implements CreateOperation {
+    private final ObjectIdentifier modelIdentifier;

Review Comment:
   Add a blank line after the class declaration.



##########
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/listener/AlterModelEvent.java:
##########
@@ -39,6 +41,7 @@ static AlterModelEvent createEvent(
             final CatalogModel newModel,
             final boolean ignoreIfNotExists) {
         return new AlterModelEvent() {
+            @Nullable

Review Comment:
   Why do we need to add the `@Nullable` annotation here?



##########
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/AlterModelRenameOperation.java:
##########
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.operations.ddl;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.api.internal.TableResultImpl;
+import org.apache.flink.table.api.internal.TableResultInternal;
+import org.apache.flink.table.catalog.Catalog;
+import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.table.catalog.ResolvedCatalogModel;
+import org.apache.flink.table.catalog.exceptions.ModelAlreadyExistException;
+import org.apache.flink.table.catalog.exceptions.ModelNotExistException;
+
+import javax.annotation.Nullable;
+
+/** Operation to describe a ALTER MODEL .. RENAME to .. statement. */

Review Comment:
   nit: RENAME to -> RENAME TO



##########
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/AlterModelChangeOperation.java:
##########
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.operations.ddl;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.api.internal.TableResultImpl;
+import org.apache.flink.table.api.internal.TableResultInternal;
+import org.apache.flink.table.catalog.CatalogModel;
+import org.apache.flink.table.catalog.ModelChange;
+import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.table.operations.OperationUtils;
+
+import javax.annotation.Nullable;
+
+import java.util.List;
+import java.util.stream.Collectors;
+
+/** Operation to describe a ALTER MODEL .. SET .. statement. */
+@Internal
+public class AlterModelChangeOperation implements AlterOperation {
+    private final ObjectIdentifier modelIdentifier;

Review Comment:
   nit: add a blank line before the fields declaration.



##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/Catalog.java:
##########
@@ -880,14 +880,37 @@ default void createModel(ObjectPath modelPath, CatalogModel model, boolean ignor
      * @param newModel the new model definition
      * @param ignoreIfNotExists flag to specify behavior when the model does not exist: if set to
      *     false, throw an exception, if set to true, do nothing.
-     * @throws ModelNotExistException if the model does not exist
      * @throws CatalogException in case of any runtime exception
      */
     default void alterModel(ObjectPath modelPath, CatalogModel newModel, boolean ignoreIfNotExists)
-            throws ModelNotExistException, CatalogException {
+            throws CatalogException {
         throw new UnsupportedOperationException(
                 String.format(
                         "alterModel(ObjectPath, CatalogModel, boolean) is not implemented for %s.",
                         this.getClass()));
     }
+
+    /**
+     * Modifies an existing model.
+     *
+     * <p>The framework will make sure to call this method with fully validated {@link
+     * ResolvedCatalogModel}. Those instances are easy to serialize for a durable catalog
+     * implementation.
+     *
+     * @param modelPath path of the model to be modified
+     * @param newModel the new model definition
+     * @param modelChanges changes to describe the modification between the newModel and the
+     *     original model
+     * @param ignoreIfNotExists flag to specify behavior when the model does not exist: if set to
+     *     false, throw an exception, if set to true, do nothing.
+     * @throws CatalogException in case of any runtime exception
+     */
+    default void alterModel(

Review Comment:
   ditto



##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlAlterModelSetConverter.java:
##########
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.operations.converters;
+
+import org.apache.flink.sql.parser.ddl.SqlAlterModelSet;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.catalog.ModelChange;
+import org.apache.flink.table.catalog.ResolvedCatalogModel;
+import org.apache.flink.table.catalog.UnresolvedIdentifier;
+import org.apache.flink.table.operations.Operation;
+import org.apache.flink.table.operations.ddl.AlterModelChangeOperation;
+import org.apache.flink.table.planner.utils.OperationConverterUtils;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/** A converter for {@link org.apache.flink.sql.parser.ddl.SqlAlterModelSet}. */
+public class SqlAlterModelSetConverter extends AbstractSqlAlterModelConverter<SqlAlterModelSet> {
+
+    @Override
+    public Operation convertSqlNode(SqlAlterModelSet sqlAlterModelSet, ConvertContext context) {
+        ResolvedCatalogModel existingModel =
+                getExistingModel(
+                        context,
+                        sqlAlterModelSet.fullModelName(),
+                        sqlAlterModelSet.ifModelExists());
+
+        Map<String, String> changeModelOptions =
+                OperationConverterUtils.extractProperties(sqlAlterModelSet.getOptionList());
+        if (changeModelOptions.isEmpty()) {
+            throw new ValidationException("ALTER MODEL SET does not support empty option.");
+        }
+        List<ModelChange> modelChanges = new ArrayList<>();
+        changeModelOptions.forEach((key, value) -> modelChanges.add(ModelChange.set(key, value)));
+
+        if (existingModel == null) {

Review Comment:
   ditto



##########
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/DescribeModelOperation.java:
##########
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.operations;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.api.internal.TableResultInternal;
+import org.apache.flink.table.catalog.Column;
+import org.apache.flink.table.catalog.ContextResolvedModel;
+import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.LogicalType;
+
+import org.apache.flink.shaded.guava32.com.google.common.collect.Streams;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.Optional;
+
+import static org.apache.flink.table.api.internal.TableResultUtils.buildTableResult;
+
+/**
+ * Operation to describe a DESCRIBE MODEL [EXTENDED] [[catalogName.] dataBasesName.]sqlIdentifier
+ * statement.
+ */
+@Internal
+public class DescribeModelOperation implements Operation, ExecutableOperation {
+
+    private final ObjectIdentifier sqlIdentifier;
+    private final boolean isExtended;
+
+    public DescribeModelOperation(ObjectIdentifier sqlIdentifier, boolean isExtended) {
+        this.sqlIdentifier = sqlIdentifier;
+        this.isExtended = isExtended;
+    }
+
+    public ObjectIdentifier getSqlIdentifier() {
+        return sqlIdentifier;
+    }
+
+    public boolean isExtended() {
+        return isExtended;
+    }
+
+    @Override
+    public String asSummaryString() {
+        Map<String, Object> params = new LinkedHashMap<>();
+        params.put("identifier", sqlIdentifier);
+        params.put("isExtended", isExtended);
+        return OperationUtils.formatWithChildren(
+                "DESCRIBE MODEL", params, Collections.emptyList(), Operation::asSummaryString);
+    }
+
+    @Override
+    public TableResultInternal execute(Context ctx) {
+        // DESCRIBE MODEL <model> shows input/output schema if any.
+        Optional<ContextResolvedModel> result = ctx.getCatalogManager().getModel(sqlIdentifier);
+        if (result.isEmpty()) {
+            throw new ValidationException(
+                    String.format(
+                            "Model with the identifier '%s' doesn't exist.",
+                            sqlIdentifier.asSummaryString()));
+        }
+
+        ResolvedSchema inputSchema = result.get().getResolvedModel().getResolvedInputSchema();
+        ResolvedSchema outputSchema = result.get().getResolvedModel().getResolvedOutputSchema();
+        Object[][] rows = generateModelColumnsRows(inputSchema, outputSchema);
+
+        boolean nonComments = isSchemaNonColumnComments(inputSchema, outputSchema);
+        return buildTableResult(
+                generateTableColumnsNames(nonComments),
+                generateTableColumnsDataTypes(nonComments),
+                rows);
+    }
+
+    private Object[][] generateModelColumnsRows(
+            ResolvedSchema inputSchema, ResolvedSchema outputSchema) {
+        boolean nonComments = isSchemaNonColumnComments(inputSchema, outputSchema);
+
+        return Streams.concat(
+                        inputSchema.getColumns().stream()
+                                .map((c) -> buildSingleRow(c, nonComments, true)),
+                        outputSchema.getColumns().stream()
+                                .map((c) -> buildSingleRow(c, nonComments, false)))
+                .toArray(Object[][]::new);
+    }
+
+    private Object[] buildSingleRow(Column c, boolean nonComments, boolean isInput) {
+        final LogicalType logicalType = c.getDataType().getLogicalType();
+        final ArrayList<Object> result =
+                new ArrayList<>(
+                        Arrays.asList(
+                                c.getName(),
+                                logicalType.copy(true).asSummaryString(),
+                                logicalType.isNullable(),
+                                isInput));
+        if (!nonComments) {
+            result.add(c.getComment().orElse(null));
+        }
+        return result.toArray();
+    }
+
+    private boolean isSchemaNonColumnComments(
+            ResolvedSchema inputSchema, ResolvedSchema outputSchema) {
+        return inputSchema.getColumns().stream()
+                        .map(Column::getComment)
+                        .noneMatch(Optional::isPresent)
+                && outputSchema.getColumns().stream()
+                        .map(Column::getComment)
+                        .noneMatch(Optional::isPresent);
+    }
+
+    private String[] generateTableColumnsNames(boolean nonComments) {
+        final ArrayList<String> result =
+                new ArrayList<>(Arrays.asList("name", "type", "null", "isInput"));

Review Comment:
   I think Flink doesn't use camel case for column names. You can refer to
   DescribeCatalogOperation or ShowCurrentDatabaseOperation. How about using
   `is input` as the column name?
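
   The row-building logic in the diff above can be sketched standalone. This is a
   hedged illustration, not Flink code: the `Col` record is a hypothetical
   stand-in for `org.apache.flink.table.catalog.Column`, and the method names
   (`nonComments`, `row`, `rows`) are invented for the sketch.

   ```java
   import java.util.ArrayList;
   import java.util.Arrays;
   import java.util.List;
   import java.util.Optional;
   import java.util.stream.Stream;

   public class DescribeModelSketch {
       // Hypothetical stand-in for Flink's Column (name, type summary, nullability, comment).
       record Col(String name, String type, boolean nullable, String comment) {
           Optional<String> getComment() { return Optional.ofNullable(comment); }
       }

       // True when no column in either schema carries a comment,
       // mirroring isSchemaNonColumnComments in the diff.
       static boolean nonComments(List<Col> in, List<Col> out) {
           return Stream.concat(in.stream(), out.stream())
                   .map(Col::getComment)
                   .noneMatch(Optional::isPresent);
       }

       // One result row per column: name, type, nullability, input/output flag,
       // plus a comment cell only when at least one column has a comment.
       static Object[] row(Col c, boolean nonComments, boolean isInput) {
           List<Object> r = new ArrayList<>(
                   Arrays.asList(c.name(), c.type(), c.nullable(), isInput));
           if (!nonComments) {
               r.add(c.getComment().orElse(null));
           }
           return r.toArray();
       }

       // Input rows first, then output rows, as in generateModelColumnsRows.
       static Object[][] rows(List<Col> in, List<Col> out) {
           boolean nc = nonComments(in, out);
           return Stream.concat(
                           in.stream().map(c -> row(c, nc, true)),
                           out.stream().map(c -> row(c, nc, false)))
                   .toArray(Object[][]::new);
       }

       public static void main(String[] args) {
           List<Col> in = List.of(new Col("f0", "DOUBLE", false, null));
           List<Col> out = List.of(new Col("label", "STRING", true, null));
           Object[][] rows = rows(in, out);
           System.out.println(rows.length);    // 2
           System.out.println(rows[0].length); // 4 (no comment column when no comments exist)
       }
   }
   ```

   Note the width of every row depends on whether any comment exists anywhere in
   the two schemas, so the comment column appears either for all rows or none.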



##########
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ShowModelsOperation.java:
##########
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.operations;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.catalog.Catalog;
+import org.apache.flink.table.catalog.CatalogManager;
+import org.apache.flink.table.operations.utils.ShowLikeOperator;
+
+import javax.annotation.Nullable;
+
+import java.util.Set;
+
+/**
+ * Operation to describe a SHOW MODELS statement. The full syntax for SHOW MODELS is as follows:
+ *
+ * <pre>{@code
+ * SHOW MODELS [ ( FROM | IN ) [catalog_name.]database_name ] [[NOT] LIKE
+ * &lt;sql_like_pattern&gt;] statement
+ * }</pre>
+ */
+@Internal
+public class ShowModelsOperation extends AbstractShowOperation {
+
+    private final @Nullable String databaseName;
+
+    public ShowModelsOperation(
+            @Nullable String catalogName,
+            @Nullable String databaseName,
+            @Nullable String preposition,
+            @Nullable ShowLikeOperator likeOp) {
+        super(catalogName, preposition, likeOp);
+        this.databaseName = databaseName;
+    }
+
+    public ShowModelsOperation(
+            @Nullable String catalogName,
+            @Nullable String databaseName,
+            @Nullable ShowLikeOperator likeOp) {
+        this(catalogName, databaseName, null, likeOp);
+    }
+
+    @Nullable
+    public String getDatabaseName() {
+        return databaseName;
+    }
+
+    @Override
+    protected Set<String> retrieveDataForTableResult(Context ctx) {
+        final CatalogManager catalogManager = ctx.getCatalogManager();
+        if (preposition == null) {
+            return catalogManager.listModels();
+        } else {
+            Catalog catalog = catalogManager.getCatalogOrThrowException(catalogName);
+            if (catalog.databaseExists(databaseName)) {
+                return catalogManager.listModels(catalogName, databaseName);

Review Comment:
   How about
   ```
               return catalogManager.listModels(catalogName, databaseName);
   ```
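
   The branch under review can be sketched standalone. This is a hedged
   illustration only: the in-memory `MODELS` map stands in for Flink's
   `CatalogManager`, and the method and field names are invented for the sketch.

   ```java
   import java.util.Map;
   import java.util.Set;

   public class ShowModelsSketch {
       // Hypothetical in-memory stand-in for CatalogManager: database -> model names.
       static final Map<String, Set<String>> MODELS =
               Map.of("db1", Set.of("m1", "m2"), "db2", Set.of("m3"));

       // Mirrors retrieveDataForTableResult: with no FROM/IN preposition, list
       // models of the current database; otherwise verify the named database exists.
       static Set<String> listModels(String preposition, String currentDb, String db) {
           if (preposition == null) {
               return MODELS.getOrDefault(currentDb, Set.of());
           }
           Set<String> models = MODELS.get(db);
           if (models == null) {
               // Stands in for the ValidationException thrown in the real converter.
               throw new IllegalArgumentException("Database " + db + " does not exist.");
           }
           return models;
       }

       public static void main(String[] args) {
           System.out.println(listModels(null, "db1", null));   // models of the current database
           System.out.println(listModels("FROM", "db1", "db2")); // models of the named database
       }
   }
   ```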



##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlAlterModelRenameConverter.java:
##########
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.operations.converters;
+
+import org.apache.flink.sql.parser.ddl.SqlAlterModelRename;
+import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.table.catalog.ResolvedCatalogModel;
+import org.apache.flink.table.catalog.UnresolvedIdentifier;
+import org.apache.flink.table.operations.Operation;
+import org.apache.flink.table.operations.ddl.AlterModelRenameOperation;
+
+/** A converter for {@link org.apache.flink.sql.parser.ddl.SqlAlterModelRename}. */
+public class SqlAlterModelRenameConverter
+        extends AbstractSqlAlterModelConverter<SqlAlterModelRename> {
+
+    @Override
+    public Operation convertSqlNode(
+            SqlAlterModelRename sqlAlterModelRename, ConvertContext context) {
+        ResolvedCatalogModel existingModel =
+                getExistingModel(
+                        context,
+                        sqlAlterModelRename.fullModelName(),
+                        sqlAlterModelRename.ifModelExists());
+
+        UnresolvedIdentifier newUnresolvedIdentifier =
+                UnresolvedIdentifier.of(sqlAlterModelRename.fullNewModelName());
+        ObjectIdentifier newModelIdentifier =

Review Comment:
   It would be better to forbid the following statement:
   ```
   ALTER MODEL `default_cat`.`default`.`m1` RENAME TO `other_cat`.`default`.`m1`
   ```
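
   A minimal sketch of such a guard, in plain Java: the `Id` record is a
   hypothetical stand-in for Flink's `ObjectIdentifier`, and the method name
   `checkSameCatalogAndDatabase` is invented here, not part of the PR.

   ```java
   public class RenameGuardSketch {
       // Hypothetical stand-in for ObjectIdentifier (catalog, database, object name).
       record Id(String catalog, String database, String name) {}

       // Reject renames that would move the model to a different catalog or
       // database, as suggested in the review comment.
       static void checkSameCatalogAndDatabase(Id oldId, Id newId) {
           if (!oldId.catalog().equals(newId.catalog())
                   || !oldId.database().equals(newId.database())) {
               throw new IllegalArgumentException(
                       "ALTER MODEL RENAME may not move a model across catalogs or databases.");
           }
       }

       public static void main(String[] args) {
           Id oldId = new Id("default_cat", "default", "m1");
           // Same catalog and database: accepted.
           checkSameCatalogAndDatabase(oldId, new Id("default_cat", "default", "m2"));
           // Different catalog: rejected.
           try {
               checkSameCatalogAndDatabase(oldId, new Id("other_cat", "default", "m1"));
           } catch (IllegalArgumentException e) {
               System.out.println("rejected: " + e.getMessage());
           }
       }
   }
   ```

   In the real converter this check would run before resolving the new
   identifier, so the error surfaces as a validation failure rather than a
   catalog lookup error.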



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

