twalthr commented on code in PR #25834: URL: https://github.com/apache/flink/pull/25834#discussion_r1973700721
########## flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlAlterModelResetConverter.java: ########## @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.planner.operations.converters; + +import org.apache.flink.sql.parser.ddl.SqlAlterModelReset; +import org.apache.flink.table.api.ValidationException; +import org.apache.flink.table.catalog.ModelChange; +import org.apache.flink.table.catalog.ResolvedCatalogModel; +import org.apache.flink.table.catalog.UnresolvedIdentifier; +import org.apache.flink.table.operations.Operation; +import org.apache.flink.table.operations.ddl.AlterModelChangeOperation; + +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Set; +import java.util.stream.Collectors; + +/** A converter for {@link org.apache.flink.sql.parser.ddl.SqlAlterModelReset}. */ +public class SqlAlterModelResetConverter + extends AbstractSqlAlterModelConverter<SqlAlterModelReset> { + + @Override + public Operation convertSqlNode(SqlAlterModelReset sqlAlterModelReset, ConvertContext context) { + ResolvedCatalogModel existingModel = + getExistingModel( + context, + sqlAlterModelReset.fullModelName(), + sqlAlterModelReset.ifModelExists()); + + Set<String> lowercaseResetKeys = + sqlAlterModelReset.getResetKeys().stream() + .map(String::toLowerCase) Review Comment: ```suggestion .map(String::toLowerCase) ``` A `toLowerCase` or `toUpperCase` is always a red flag. There is no need to modify the user's input. ########## flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/DropModelOperation.java: ########## @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.operations.ddl; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.table.api.internal.TableResultImpl; +import org.apache.flink.table.api.internal.TableResultInternal; +import org.apache.flink.table.catalog.ObjectIdentifier; +import org.apache.flink.table.operations.Operation; +import org.apache.flink.table.operations.OperationUtils; + +import java.util.Collections; +import java.util.LinkedHashMap; +import java.util.Map; + +/** Operation to describe a DROP MODEL statement. */ +@Internal +public class DropModelOperation implements DropOperation { + private final ObjectIdentifier modelIdentifier; + private final boolean ifExists; + private final boolean isTemporary; + + public DropModelOperation( + ObjectIdentifier modelIdentifier, boolean ifExists, boolean isTemporary) { + this.modelIdentifier = modelIdentifier; + this.ifExists = ifExists; + this.isTemporary = isTemporary; + } + + public ObjectIdentifier getModelIdentifier() { + return this.modelIdentifier; + } + + public boolean isIfExists() { + return this.ifExists; + } + + public boolean isTemporary() { + return isTemporary; + } + + @Override + public String asSummaryString() { + Map<String, Object> params = new LinkedHashMap<>(); + params.put("identifier", modelIdentifier); + params.put("IfExists", ifExists); Review Comment: ```suggestion params.put("ifExists", ifExists); ``` please also update the table version ########## flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlAlterModelResetConverter.java: ########## @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.planner.operations.converters; + +import org.apache.flink.sql.parser.ddl.SqlAlterModelReset; +import org.apache.flink.table.api.ValidationException; +import org.apache.flink.table.catalog.ModelChange; +import org.apache.flink.table.catalog.ResolvedCatalogModel; +import org.apache.flink.table.catalog.UnresolvedIdentifier; +import org.apache.flink.table.operations.Operation; +import org.apache.flink.table.operations.ddl.AlterModelChangeOperation; + +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Set; +import java.util.stream.Collectors; + +/** A converter for {@link org.apache.flink.sql.parser.ddl.SqlAlterModelReset}. */ +public class SqlAlterModelResetConverter + extends AbstractSqlAlterModelConverter<SqlAlterModelReset> { + + @Override + public Operation convertSqlNode(SqlAlterModelReset sqlAlterModelReset, ConvertContext context) { + ResolvedCatalogModel existingModel = + getExistingModel( + context, + sqlAlterModelReset.fullModelName(), + sqlAlterModelReset.ifModelExists()); + + Set<String> lowercaseResetKeys = + sqlAlterModelReset.getResetKeys().stream() + .map(String::toLowerCase) + .collect(Collectors.toSet()); + if (lowercaseResetKeys.isEmpty()) { Review Comment: For tables we also check for immutable `connector` option. Do we want to do the same for models? ########## flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlAlterModelSetConverter.java: ########## @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.planner.operations.converters; + +import org.apache.flink.sql.parser.ddl.SqlAlterModelSet; +import org.apache.flink.table.api.ValidationException; +import org.apache.flink.table.catalog.ModelChange; +import org.apache.flink.table.catalog.ResolvedCatalogModel; +import org.apache.flink.table.catalog.UnresolvedIdentifier; +import org.apache.flink.table.operations.Operation; +import org.apache.flink.table.operations.ddl.AlterModelChangeOperation; +import org.apache.flink.table.planner.utils.OperationConverterUtils; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.stream.Collectors; + +/** A converter for {@link org.apache.flink.sql.parser.ddl.SqlAlterModelSet}. */ +public class SqlAlterModelSetConverter extends AbstractSqlAlterModelConverter<SqlAlterModelSet> { + + @Override + public Operation convertSqlNode(SqlAlterModelSet sqlAlterModelSet, ConvertContext context) { + ResolvedCatalogModel existingModel = + getExistingModel( + context, + sqlAlterModelSet.fullModelName(), + sqlAlterModelSet.ifModelExists()); + + Map<String, String> changeModelOptions = + OperationConverterUtils.extractProperties(sqlAlterModelSet.getOptionList()); + if (changeModelOptions.isEmpty()) { + throw new ValidationException("ALTER MODEL SET does not support empty option"); Review Comment: Same question as above: For tables we also check for immutable connector option. Do we want to do the same for models? ########## flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/TableEnvironmentTest.scala: ########## @@ -2573,6 +2574,454 @@ class TableEnvironmentTest { checkData(expectedResult1.iterator(), tableResult2.collect()) } + @Test + def testAlterModelOptions(): Unit = { + val sourceDDL = + """ + |CREATE MODEL M1 + | INPUT(f0 char(10), f1 varchar(10)) + | OUTPUT(f2 string) + |with ( + | 'task' = 'clustering', + | 'provider' = 'openai', + | 'openai.endpoint' = 'some-endpoint' + |) + """.stripMargin + tableEnv.executeSql(sourceDDL) + + val alterDDL = + """ + |ALTER MODEL M1 + |SET( + | 'openai.endpoint' = 'openai-endpoint', + | 'task' = 'embedding' + |) + |""".stripMargin + tableEnv.executeSql(alterDDL) + + assertEquals( + Map( + "provider" -> "openai", + "task" -> "embedding", + "openai.endpoint" -> "openai-endpoint").asJava, + tableEnv + .getCatalog(tableEnv.getCurrentCatalog) + .get() + .getModel(ObjectPath.fromString(s"${tableEnv.getCurrentDatabase}.M1")) + .getOptions + ) + } + + @Test + def testAlterModelEmptyOptions(): Unit = { + val sourceDDL = + """ + |CREATE MODEL M1 + | INPUT(f0 char(10), f1 varchar(10)) + | OUTPUT(f2 string) + |with ( + | 'task' = 'clustering', + | 'provider' = 'openai', + | 'openai.endpoint' = 'some-endpoint' + |) + """.stripMargin + tableEnv.executeSql(sourceDDL) + + assertThatThrownBy( + () => + tableEnv + .executeSql("ALTER MODEL M1 SET ()")) + .isInstanceOf(classOf[ValidationException]) + .hasMessageContaining("ALTER MODEL SET does not support empty option"); + } + + @Test + def testAlterNonExistModel(): Unit = { + val alterDDL = + """ + |ALTER MODEL M1 + |SET( + | 'provider' = 'azureml', + | 'azueml.endpoint' = 'azure-endpoint', + | 'task' = 'clustering' + |) + |""".stripMargin + assertThatThrownBy(() => tableEnv.executeSql(alterDDL)) + .isInstanceOf(classOf[ValidationException]) + .hasMessageContaining("Model `default_catalog`.`default_database`.`M1` doesn't exist.") + } + + @Test + def testAlterNonExistModelWithIfExist(): Unit = { + val alterDDL = + """ + |ALTER MODEL IF EXISTS M1 + |SET( + | 'provider' = 'azureml', + | 'azueml.endpoint' = 'azure-endpoint', + | 'task' = 'clustering' + |) + |""".stripMargin + tableEnv.executeSql(alterDDL) + } + + @Test + def testAlterModelRename(): Unit = { + val sourceDDL = + """ + |CREATE MODEL M1 + | INPUT(f0 char(10), f1 varchar(10)) + | OUTPUT(f2 string) + |with ( + | 'task' = 'clustering', + | 'provider' = 'openai', + | 'openai.endpoint' = 'some-endpoint' + |) + """.stripMargin + tableEnv.executeSql(sourceDDL) + + val alterDDL = + """ + |ALTER MODEL M1 RENAME TO M2 + |""".stripMargin + tableEnv.executeSql(alterDDL) + } + + @Test + def testAlterModelRenameNonExist(): Unit = { + val alterDDL = + """ + |ALTER MODEL M1 RENAME TO M2 + |""".stripMargin + assertThatThrownBy(() => tableEnv.executeSql(alterDDL)) + .isInstanceOf(classOf[ValidationException]) + .hasMessageContaining("Model `default_catalog`.`default_database`.`M1` doesn't exist.") + } + + @Test + def testAlterModelRenameWithIfExists(): Unit = { + val alterDDL = + """ + |ALTER MODEL IF EXISTS M1 RENAME TO M2 + |""".stripMargin + tableEnv.executeSql(alterDDL) + } + + @Test + def testAlterModelReset(): Unit = { + val sourceDDL = + """ + |CREATE MODEL M1 + | INPUT(f0 char(10), f1 varchar(10)) + | OUTPUT(f2 string) + |with ( + | 'task' = 'clustering', + | 'provider' = 'openai', + | 'openai.endpoint' = 'some-endpoint' + |) + """.stripMargin + tableEnv.executeSql(sourceDDL) + + tableEnv.executeSql("ALTER MODEL M1 RESET ('task')"); + + assertEquals( + Map("provider" -> "openai", "openai.endpoint" -> "some-endpoint").asJava, + tableEnv + .getCatalog(tableEnv.getCurrentCatalog) + .get() + .getModel(ObjectPath.fromString(s"${tableEnv.getCurrentDatabase}.M1")) + .getOptions + ) + } + + @Test + def testAlterModelResetNonExist(): Unit = { + assertThatThrownBy(() => tableEnv.executeSql("ALTER MODEL M1 RESET ('task')")) + .isInstanceOf(classOf[ValidationException]) + .hasMessageContaining("Model `default_catalog`.`default_database`.`M1` doesn't exist.") + } + + @Test + def testAlterModelResetWithIfExists(): Unit = { + tableEnv.executeSql("ALTER MODEL IF EXISTS M1 RESET ('task')"); + } + + @Test + def testAlterModelRestEmptyOptionKey(): Unit = { + val sourceDDL = + """ + |CREATE MODEL M1 + | INPUT(f0 char(10), f1 varchar(10)) + | OUTPUT(f2 string) + |with ( + | 'task' = 'clustering', + | 'provider' = 'openai', + | 'openai.endpoint' = 'some-endpoint' + |) + """.stripMargin + tableEnv.executeSql(sourceDDL) + + assertThatThrownBy( + () => + tableEnv + .executeSql("ALTER MODEL M1 RESET ()")) + .isInstanceOf(classOf[ValidationException]) + .hasMessageContaining("ALTER MODEL RESET does not support empty key"); Review Comment: Make sure all exceptions end with a dot. ########## flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogManager.java: ########## @@ -1512,18 +1513,52 @@ public void createTemporaryModel( /** * Alters a model in a given fully qualified path. * - * @param modelChange The model containing only changes + * @param newModel The new model containing changes. It could be null if the model to alter + * doesn't exist + * @param modelChanges The changes to apply to the model. * @param objectIdentifier The fully qualified path where to alter the model. * @param ignoreIfNotExists If false exception will be thrown if the model to be altered does * not exist. */ public void alterModel( - CatalogModel modelChange, + @Nullable CatalogModel newModel, + List<ModelChange> modelChanges, ObjectIdentifier objectIdentifier, boolean ignoreIfNotExists) { execute( (catalog, path) -> { - ResolvedCatalogModel resolvedModel = resolveCatalogModel(modelChange); + ResolvedCatalogModel resolvedModel = + newModel == null ? null : resolveCatalogModel(newModel); + catalog.alterModel(path, resolvedModel, modelChanges, ignoreIfNotExists); Review Comment: We cannot simply pass nulls if this is not documented in the Catalog JavaDoc. Why should we allow alter model if it doesn't exist? The catalog manager or higher layers can deal with the problem. ########## flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlAlterModelSetConverter.java: ########## @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.planner.operations.converters; + +import org.apache.flink.sql.parser.ddl.SqlAlterModelSet; +import org.apache.flink.table.api.ValidationException; +import org.apache.flink.table.catalog.ModelChange; +import org.apache.flink.table.catalog.ResolvedCatalogModel; +import org.apache.flink.table.catalog.UnresolvedIdentifier; +import org.apache.flink.table.operations.Operation; +import org.apache.flink.table.operations.ddl.AlterModelChangeOperation; +import org.apache.flink.table.planner.utils.OperationConverterUtils; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.stream.Collectors; + +/** A converter for {@link org.apache.flink.sql.parser.ddl.SqlAlterModelSet}. */ +public class SqlAlterModelSetConverter extends AbstractSqlAlterModelConverter<SqlAlterModelSet> { + + @Override + public Operation convertSqlNode(SqlAlterModelSet sqlAlterModelSet, ConvertContext context) { + ResolvedCatalogModel existingModel = + getExistingModel( + context, + sqlAlterModelSet.fullModelName(), + sqlAlterModelSet.ifModelExists()); + + Map<String, String> changeModelOptions = + OperationConverterUtils.extractProperties(sqlAlterModelSet.getOptionList()); + if (changeModelOptions.isEmpty()) { + throw new ValidationException("ALTER MODEL SET does not support empty option"); + } + List<ModelChange> modelChanges = new ArrayList<>(); + changeModelOptions.forEach((key, value) -> modelChanges.add(ModelChange.set(key, value))); + + if (existingModel == null) { + return new AlterModelChangeOperation( + context.getCatalogManager() + .qualifyIdentifier( + UnresolvedIdentifier.of(sqlAlterModelSet.fullModelName())), + modelChanges, + null, + sqlAlterModelSet.ifModelExists()); + } + + Map<String, String> newOptions = + existingModel.getOptions().entrySet().stream() + .collect( + Collectors.toMap( + entry -> entry.getKey().toLowerCase(), Entry::getValue)); Review Comment: ```suggestion entry -> entry.getKey(), Entry::getValue)); ``` ########## flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlAlterModelResetConverter.java: ########## @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.planner.operations.converters; + +import org.apache.flink.sql.parser.ddl.SqlAlterModelReset; +import org.apache.flink.table.api.ValidationException; +import org.apache.flink.table.catalog.ModelChange; +import org.apache.flink.table.catalog.ResolvedCatalogModel; +import org.apache.flink.table.catalog.UnresolvedIdentifier; +import org.apache.flink.table.operations.Operation; +import org.apache.flink.table.operations.ddl.AlterModelChangeOperation; + +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Set; +import java.util.stream.Collectors; + +/** A converter for {@link org.apache.flink.sql.parser.ddl.SqlAlterModelReset}. */ +public class SqlAlterModelResetConverter + extends AbstractSqlAlterModelConverter<SqlAlterModelReset> { + + @Override + public Operation convertSqlNode(SqlAlterModelReset sqlAlterModelReset, ConvertContext context) { + ResolvedCatalogModel existingModel = + getExistingModel( + context, + sqlAlterModelReset.fullModelName(), + sqlAlterModelReset.ifModelExists()); + + Set<String> lowercaseResetKeys = + sqlAlterModelReset.getResetKeys().stream() + .map(String::toLowerCase) + .collect(Collectors.toSet()); + if (lowercaseResetKeys.isEmpty()) { + throw new ValidationException("ALTER MODEL RESET does not support empty key"); + } + List<ModelChange> modelChanges = + lowercaseResetKeys.stream().map(ModelChange::reset).collect(Collectors.toList()); + + if (existingModel == null) { + return new AlterModelChangeOperation( + context.getCatalogManager() + .qualifyIdentifier( + UnresolvedIdentifier.of(sqlAlterModelReset.fullModelName())), + modelChanges, + null, + sqlAlterModelReset.ifModelExists()); + } + + Map<String, String> newOptions = + existingModel.getOptions().entrySet().stream() + .collect( + Collectors.toMap( + entry -> entry.getKey().toLowerCase(), Entry::getValue)); Review Comment: ```suggestion entry -> entry.getKey(), Entry::getValue)); ``` ########## flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlAlterModelSetConverter.java: ########## @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.planner.operations.converters; + +import org.apache.flink.sql.parser.ddl.SqlAlterModelSet; +import org.apache.flink.table.api.ValidationException; +import org.apache.flink.table.catalog.ModelChange; +import org.apache.flink.table.catalog.ResolvedCatalogModel; +import org.apache.flink.table.catalog.UnresolvedIdentifier; +import org.apache.flink.table.operations.Operation; +import org.apache.flink.table.operations.ddl.AlterModelChangeOperation; +import org.apache.flink.table.planner.utils.OperationConverterUtils; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.stream.Collectors; + +/** A converter for {@link org.apache.flink.sql.parser.ddl.SqlAlterModelSet}. */ +public class SqlAlterModelSetConverter extends AbstractSqlAlterModelConverter<SqlAlterModelSet> { + + @Override + public Operation convertSqlNode(SqlAlterModelSet sqlAlterModelSet, ConvertContext context) { + ResolvedCatalogModel existingModel = + getExistingModel( + context, + sqlAlterModelSet.fullModelName(), + sqlAlterModelSet.ifModelExists()); + + Map<String, String> changeModelOptions = + OperationConverterUtils.extractProperties(sqlAlterModelSet.getOptionList()); + if (changeModelOptions.isEmpty()) { + throw new ValidationException("ALTER MODEL SET does not support empty option"); + } + List<ModelChange> modelChanges = new ArrayList<>(); + changeModelOptions.forEach((key, value) -> modelChanges.add(ModelChange.set(key, value))); + + if (existingModel == null) { + return new AlterModelChangeOperation( + context.getCatalogManager() + .qualifyIdentifier( + UnresolvedIdentifier.of(sqlAlterModelSet.fullModelName())), + modelChanges, + null, + sqlAlterModelSet.ifModelExists()); + } + + Map<String, String> newOptions = + existingModel.getOptions().entrySet().stream() + .collect( + Collectors.toMap( + entry -> entry.getKey().toLowerCase(), Entry::getValue)); + Map<String, String> lowercaseChangeModelOptions = + changeModelOptions.entrySet().stream() + .collect( + Collectors.toMap( + entry -> entry.getKey().toLowerCase(), Entry::getValue)); Review Comment: ```suggestion entry -> entry.getKey(), Entry::getValue)); ``` ########## tools/maven/suppressions.xml: ########## @@ -40,6 +40,7 @@ under the License. <suppress files="WindowOperatorContractTest.java" checks="FileLength"/> <suppress files="NFAITCase.java" checks="FileLength"/> <suppress files="HyperLogLogPlusPlus.java" checks="FileLength"/> + <suppress files="SqlDdlToOperationConverterTest.java" checks="FileLength"/> Review Comment: The checks exist for a reason. It's time to split this class. Create a `SqlModelOperationConverterTest` ########## flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlCreateModelConverter.java: ########## @@ -0,0 +1,112 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.planner.operations.converters; + +import org.apache.flink.sql.parser.ddl.SqlCreateModel; +import org.apache.flink.sql.parser.ddl.SqlTableColumn.SqlRegularColumn; +import org.apache.flink.sql.parser.ddl.SqlTableOption; +import org.apache.flink.table.api.Schema; +import org.apache.flink.table.api.Schema.UnresolvedColumn; +import org.apache.flink.table.api.TableException; +import org.apache.flink.table.catalog.CatalogModel; +import org.apache.flink.table.catalog.ObjectIdentifier; +import org.apache.flink.table.catalog.UnresolvedIdentifier; +import org.apache.flink.table.operations.Operation; +import org.apache.flink.table.operations.ddl.CreateModelOperation; +import org.apache.flink.table.planner.calcite.FlinkTypeFactory; +import org.apache.flink.table.planner.utils.OperationConverterUtils; +import org.apache.flink.table.types.DataType; + +import org.apache.calcite.rel.type.RelDataType; +import org.apache.calcite.sql.SqlDataTypeSpec; +import org.apache.calcite.sql.SqlLiteral; +import org.apache.calcite.sql.SqlNode; +import org.apache.calcite.sql.SqlNodeList; +import org.apache.calcite.sql.validate.SqlValidator; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; + +import static org.apache.flink.table.types.utils.TypeConversions.fromLogicalToDataType; + +/** A converter for {@link org.apache.flink.sql.parser.ddl.SqlCreateModel}. */ +public class SqlCreateModelConverter implements SqlNodeConverter<SqlCreateModel> { + + @Override + public Operation convertSqlNode(SqlCreateModel sqlCreateModel, ConvertContext context) { + UnresolvedIdentifier unresolvedIdentifier = + UnresolvedIdentifier.of(sqlCreateModel.fullModelName()); + ObjectIdentifier identifier = + context.getCatalogManager().qualifyIdentifier(unresolvedIdentifier); + Map<String, String> modelOptions = getModelOptions(sqlCreateModel); + CatalogModel catalogModel = + CatalogModel.of( + getSchema(sqlCreateModel.getInputColumnList(), context.getSqlValidator()), + getSchema(sqlCreateModel.getOutputColumnList(), context.getSqlValidator()), + modelOptions, + sqlCreateModel.getComment().map(SqlLiteral::toValue).orElse(null)); + + return new CreateModelOperation( + identifier, + context.getCatalogManager().resolveCatalogModel(catalogModel), + sqlCreateModel.isIfNotExists(), + sqlCreateModel.isTemporary()); + } + + private Schema getSchema(SqlNodeList nodeList, SqlValidator sqlValidator) { + final List<UnresolvedColumn> columnList = new ArrayList<>(); + for (SqlNode column : nodeList) { + if (column instanceof SqlRegularColumn) { + SqlRegularColumn regularColumn = (SqlRegularColumn) column; + SqlDataTypeSpec type = regularColumn.getType(); + boolean nullable = type.getNullable() == null || type.getNullable(); + RelDataType relType = type.deriveType(sqlValidator, nullable); + DataType dataType = fromLogicalToDataType(FlinkTypeFactory.toLogicalType(relType)); + columnList.add( + new Schema.UnresolvedPhysicalColumn( + regularColumn.getName().getSimple(), + dataType, + OperationConverterUtils.getComment(regularColumn))); + } else { + throw new TableException("Column " + column + " can only be SqlRegularColumn"); + } + } + final Schema.Builder builder = Schema.newBuilder(); + builder.fromColumns(columnList); + return builder.build(); + } + + private Map<String, String> getModelOptions(SqlCreateModel sqlCreateModel) { + Map<String, String> options = new HashMap<>(); + sqlCreateModel + .getPropertyList() + .getList() + .forEach( + p -> + options.put( + ((SqlTableOption) Objects.requireNonNull(p)) + .getKeyString() + .toUpperCase(), Review Comment: ```suggestion , ``` ########## flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlCreateModelConverter.java: ########## @@ -0,0 +1,112 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.planner.operations.converters; + +import org.apache.flink.sql.parser.ddl.SqlCreateModel; +import org.apache.flink.sql.parser.ddl.SqlTableColumn.SqlRegularColumn; +import org.apache.flink.sql.parser.ddl.SqlTableOption; +import org.apache.flink.table.api.Schema; +import org.apache.flink.table.api.Schema.UnresolvedColumn; +import org.apache.flink.table.api.TableException; +import org.apache.flink.table.catalog.CatalogModel; +import org.apache.flink.table.catalog.ObjectIdentifier; +import org.apache.flink.table.catalog.UnresolvedIdentifier; +import org.apache.flink.table.operations.Operation; +import org.apache.flink.table.operations.ddl.CreateModelOperation; +import org.apache.flink.table.planner.calcite.FlinkTypeFactory; +import org.apache.flink.table.planner.utils.OperationConverterUtils; +import org.apache.flink.table.types.DataType; + +import org.apache.calcite.rel.type.RelDataType; +import org.apache.calcite.sql.SqlDataTypeSpec; +import org.apache.calcite.sql.SqlLiteral; +import org.apache.calcite.sql.SqlNode; +import org.apache.calcite.sql.SqlNodeList; +import org.apache.calcite.sql.validate.SqlValidator; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; + +import static org.apache.flink.table.types.utils.TypeConversions.fromLogicalToDataType; + +/** A converter for {@link org.apache.flink.sql.parser.ddl.SqlCreateModel}. */ +public class SqlCreateModelConverter implements SqlNodeConverter<SqlCreateModel> { + + @Override + public Operation convertSqlNode(SqlCreateModel sqlCreateModel, ConvertContext context) { + UnresolvedIdentifier unresolvedIdentifier = + UnresolvedIdentifier.of(sqlCreateModel.fullModelName()); + ObjectIdentifier identifier = + context.getCatalogManager().qualifyIdentifier(unresolvedIdentifier); + Map<String, String> modelOptions = getModelOptions(sqlCreateModel); + CatalogModel catalogModel = + CatalogModel.of( + getSchema(sqlCreateModel.getInputColumnList(), context.getSqlValidator()), + getSchema(sqlCreateModel.getOutputColumnList(), context.getSqlValidator()), + modelOptions, + sqlCreateModel.getComment().map(SqlLiteral::toValue).orElse(null)); + + return new CreateModelOperation( + identifier, + context.getCatalogManager().resolveCatalogModel(catalogModel), + sqlCreateModel.isIfNotExists(), + sqlCreateModel.isTemporary()); + } + + private Schema getSchema(SqlNodeList nodeList, SqlValidator sqlValidator) { + final List<UnresolvedColumn> columnList = new ArrayList<>(); + for (SqlNode column : nodeList) { + if (column instanceof SqlRegularColumn) { + SqlRegularColumn regularColumn = (SqlRegularColumn) column; + SqlDataTypeSpec type = regularColumn.getType(); + boolean nullable = type.getNullable() == null || type.getNullable(); + RelDataType relType = type.deriveType(sqlValidator, nullable); + DataType dataType = fromLogicalToDataType(FlinkTypeFactory.toLogicalType(relType)); + columnList.add( + new Schema.UnresolvedPhysicalColumn( + regularColumn.getName().getSimple(), + dataType, + OperationConverterUtils.getComment(regularColumn))); + } else { + throw new TableException("Column " + column + " can only be SqlRegularColumn"); Review Comment: ```suggestion throw new TableException("Column " + column + " can only be a physical column."); ``` This error is otherwise not helpful for users. ########## flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterModelReset.java: ########## @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.sql.parser.ddl; + +import org.apache.flink.sql.parser.SqlUnparseUtils; + +import org.apache.calcite.sql.SqlIdentifier; +import org.apache.calcite.sql.SqlLiteral; +import org.apache.calcite.sql.SqlNode; +import org.apache.calcite.sql.SqlNodeList; +import org.apache.calcite.sql.SqlWriter; +import org.apache.calcite.sql.parser.SqlParserPos; +import org.apache.calcite.util.ImmutableNullableList; +import org.apache.calcite.util.NlsString; + +import java.util.List; +import java.util.Set; +import java.util.stream.Collectors; + +import static java.util.Objects.requireNonNull; + +/** + * ALTER MODEL [IF EXISTS] [[catalogName.] dataBasesName.]modelName RESET ( 'key1' [, 'key2']...). + */ +public class SqlAlterModelReset extends SqlAlterModel { + private final SqlNodeList optionKeyList; + + public SqlAlterModelReset( + SqlParserPos pos, + SqlIdentifier modelName, + boolean ifModelExists, + SqlNodeList optionKeyList) { + super(pos, modelName, ifModelExists); + this.optionKeyList = requireNonNull(optionKeyList, "optionKeyList should not be null"); + } + + @Override + public List<SqlNode> getOperandList() { + return ImmutableNullableList.of(modelName, optionKeyList); Review Comment: We can simply use `List.of()` for all lists going forward. Recent Java versions don't require any lib for this. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org