twalthr commented on code in PR #25036: URL: https://github.com/apache/flink/pull/25036#discussion_r1716774191
########## flink-python/pyflink/table/catalog.py: ########## @@ -449,6 +449,101 @@ def drop_function(self, function_path: 'ObjectPath', ignore_if_not_exists: bool) """ self._j_catalog.dropFunction(function_path._j_object_path, ignore_if_not_exists) + def list_models(self, database_name: str) -> List[str]: + """ + List the names of all models in the given database. An empty list is returned if none is + registered. + + :param database_name: Name of the database. + :return: A list of the names of the models in this database. + :raise: CatalogException in case of any runtime exception. + DatabaseNotExistException if the database does not exist. + """ + return list(self._j_catalog.listModels(database_name)) + + def get_model(self, model_path: 'ObjectPath') -> 'CatalogModel': + """ + Get the model. + + :param model_path: Path :class:`ObjectPath` of the model. + :return: The requested :class:`CatalogModel`. + :raise: CatalogException in case of any runtime exception. + ModelNotExistException if the model does not exist in the catalog. + """ + return CatalogModel._get(self._j_catalog.getModel(model_path._j_object_path)) + + def model_exists(self, model_path: 'ObjectPath') -> bool: + """ + Check whether a model exists or not. + + :param model_path: Path :class:`ObjectPath` of the model. + :return: true if the model exists in the catalog false otherwise. + :raise: CatalogException in case of any runtime exception. + """ + return self._j_catalog.modelExists(model_path._j_object_path) + + def drop_model(self, model_path: 'ObjectPath', ignore_if_not_exists: bool): + """ + Drop a model. + + :param model_path: Path :class:`ObjectPath` of the function to be dropped. Review Comment: ```suggestion :param model_path: Path :class:`ObjectPath` of the model to be dropped. ``` ########## flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/ResolvedCatalogModel.java: ########## @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.catalog; + +import org.apache.flink.annotation.PublicEvolving; + +import java.util.Map; + +/** * A validated {@link CatalogModel} that is backed by the original metadata coming from the {@link Review Comment: invalid JavaDoc ########## flink-python/pyflink/table/catalog.py: ########## @@ -1028,6 +1123,63 @@ def get_function_language(self): return self._j_catalog_function.getFunctionLanguage() +class CatalogModel(object): + """ + Interface for a model in a catalog. + """ + + def __init__(self, j_catalog_model): + self._j_catalog_model = j_catalog_model + + @staticmethod + def create_model( + properties: Dict[str, str] = {}, Review Comment: Aren't input and output schema missing? ########## flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogManager.java: ########## @@ -1341,6 +1347,193 @@ private void dropTableInternal( } } + /** + * Retrieves a fully qualified model. If the path is not yet fully qualified use {@link + * #qualifyIdentifier(UnresolvedIdentifier)} first. + * + * @param objectIdentifier full path of the model to retrieve + * @return model that the path points to. + */ + public Optional<ResolvedCatalogModel> getModel(ObjectIdentifier objectIdentifier) { Review Comment: This comment is still not addressed. ########## flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogManager.java: ########## @@ -1341,6 +1347,193 @@ private void dropTableInternal( } } + /** + * Retrieves a fully qualified model. If the path is not yet fully qualified use {@link + * #qualifyIdentifier(UnresolvedIdentifier)} first. + * + * @param objectIdentifier full path of the model to retrieve + * @return model that the path points to. + */ + public Optional<ResolvedCatalogModel> getModel(ObjectIdentifier objectIdentifier) { + Optional<Catalog> catalogOptional = getCatalog(objectIdentifier.getCatalogName()); + ObjectPath objectPath = objectIdentifier.toObjectPath(); + if (catalogOptional.isPresent()) { + Catalog currentCatalog = catalogOptional.get(); + try { + final CatalogModel model = currentCatalog.getModel(objectPath); + if (model != null) { + final ResolvedCatalogModel resolvedModel = resolveCatalogModel(model); + return Optional.of(resolvedModel); + } + } catch (ModelNotExistException e) { + // Ignore. + } catch (UnsupportedOperationException e) { + // Ignore for catalogs that don't support models. + } + } + return Optional.empty(); + } + + /** + * Like {@link #getModel(ObjectIdentifier)}, but throws an error when the model is not available + * in any of the catalogs. + */ + public ResolvedCatalogModel getModelOrError(ObjectIdentifier objectIdentifier) { + return getModel(objectIdentifier) + .orElseThrow( + () -> + new ModelException( Review Comment: This comment is still not addressed. ########## flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/GenericInMemoryCatalog.java: ########## @@ -381,6 +385,112 @@ private void ensureTableExists(ObjectPath tablePath) throws TableNotExistExcepti } } + // ------ models ------ + + @Override + public void createModel(ObjectPath modelPath, CatalogModel model, boolean ignoreIfExists) + throws ModelAlreadyExistException, DatabaseNotExistException { + checkNotNull(modelPath); + checkNotNull(model); + if (!databaseExists(modelPath.getDatabaseName())) { + throw new DatabaseNotExistException(getName(), modelPath.getDatabaseName()); + } + if (modelExists(modelPath)) { + if (!ignoreIfExists) { + throw new ModelAlreadyExistException(getName(), modelPath); + } + } else { + models.put(modelPath, model.copy()); + } + } + + @Override + public void alterModel( + ObjectPath modelPath, CatalogModel modelChange, boolean ignoreIfNotExists) + throws ModelNotExistException { + checkNotNull(modelPath); + checkNotNull(modelChange); + + CatalogModel existingModel = models.get(modelPath); + if (existingModel == null) { + if (ignoreIfNotExists) { + return; + } + throw new ModelNotExistException(getName(), modelPath); + } + + Map<String, String> newOptions = new HashMap<>(existingModel.getOptions()); + newOptions.putAll(modelChange.getOptions()); + models.put(modelPath, existingModel.copy(newOptions)); + } + + @Override + public void dropModel(ObjectPath modelPath, boolean ignoreIfNotExists) + throws ModelNotExistException { + checkNotNull(modelPath); + if (modelExists(modelPath)) { + models.remove(modelPath); + } else if (!ignoreIfNotExists) { + throw new ModelNotExistException(getName(), modelPath); + } + } + + @Override + public void renameModel(ObjectPath modelPath, String newModelName, boolean ignoreIfNotExists) + throws ModelNotExistException, ModelAlreadyExistException { + checkNotNull(modelPath); + checkArgument(!StringUtils.isNullOrWhitespaceOnly(newModelName)); + + if (modelExists(modelPath)) { + ObjectPath newPath = new ObjectPath(modelPath.getDatabaseName(), newModelName); + + if (modelExists(newPath)) { + throw new ModelAlreadyExistException(getName(), newPath); + } else { + models.put(newPath, models.remove(modelPath)); + } + } else if (!ignoreIfNotExists) { + throw new ModelNotExistException(getName(), modelPath); + } + } + + @Override + public List<String> listModels(String databaseName) throws DatabaseNotExistException { + checkArgument( + !StringUtils.isNullOrWhitespaceOnly(databaseName), + "databaseName cannot be null or empty"); + + if (!databaseExists(databaseName)) { + throw new DatabaseNotExistException(getName(), databaseName); + } + + return models.keySet().stream() + .filter(k -> k.getDatabaseName().equals(databaseName)) + .map(k -> k.getObjectName()) + .collect(Collectors.toList()); + } + + @Override + public CatalogModel getModel(ObjectPath modelPath) throws ModelNotExistException { + checkNotNull(modelPath); + + if (!modelExists(modelPath)) { + throw new ModelNotExistException(getName(), modelPath); + } else { + return models.get(modelPath).copy(); + } + } + + @Override + public boolean modelExists(ObjectPath modelPath) { + checkNotNull(modelPath); + return databaseExists(modelPath.getDatabaseName()) && models.containsKey(modelPath); + } + + public void dropAllModels() { Review Comment: remove this method? we also don't support it for tables or other catalog objects. ########## flink-python/pyflink/table/catalog.py: ########## @@ -1028,6 +1123,63 @@ def get_function_language(self): return self._j_catalog_function.getFunctionLanguage() +class CatalogModel(object): + """ + Interface for a model in a catalog. + """ + + def __init__(self, j_catalog_model): + self._j_catalog_model = j_catalog_model + + @staticmethod + def create_model( + properties: Dict[str, str] = {}, + comment: str = None + ) -> "CatalogModel": + """ + Create an instance of CatalogModel for the catalog model. + + :param properties: the properties of the catalog model + :param comment: the comment of the catalog model + """ + assert properties is not None + + gateway = get_gateway() + return CatalogModel( + gateway.jvm.org.apache.flink.table.catalog.CatalogModel.of( Review Comment: Ensure that CatalogModel.of does not accept null for the schema yet. ########## flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/Catalog.java: ########## @@ -790,4 +792,126 @@ void alterPartitionColumnStatistics( CatalogColumnStatistics columnStatistics, boolean ignoreIfNotExists) throws PartitionNotExistException, CatalogException; + + // ------ models ------ + + /** + * Get names of all tables models under this database. An empty list is returned if none exists. + * + * @return a list of the names of all models in this database + * @throws DatabaseNotExistException if the database does not exist + * @throws CatalogException in case of any runtime exception + */ + default List<String> listModels(String databaseName) + throws DatabaseNotExistException, CatalogException { + throw new UnsupportedOperationException( + String.format("listModel(String) is not implemented for %s.", this.getClass())); + } + + /** + * Returns a {@link CatalogModel} identified by the given {@link ObjectPath}. + * + * @param modelPath Path of the model + * @return The requested model + * @throws ModelNotExistException if the target does not exist + * @throws CatalogException in case of any runtime exception + */ + default CatalogModel getModel(ObjectPath modelPath) + throws ModelNotExistException, CatalogException { + throw new UnsupportedOperationException( + String.format("getModel(ObjectPath) is not implemented for %s.", this.getClass())); + } + + /** + * Check if a model exists in this catalog. + * + * @param modelPath Path of the model + * @return true if the given model exists in the catalog false otherwise + * @throws CatalogException in case of any runtime exception + */ + default boolean modelExists(ObjectPath modelPath) throws CatalogException { + throw new UnsupportedOperationException( + String.format( + "modelExists(ObjectPath) is not implemented for %s.", this.getClass())); + } + + /** + * Drop a model. + * + * @param modelPath Path of the model to be dropped + * @param ignoreIfNotExists Flag to specify behavior when the model does not exist: if set to + * false, throw an exception, if set to true, do nothing. + * @throws ModelNotExistException if the model does not exist + * @throws CatalogException in case of any runtime exception + */ + default void dropModel(ObjectPath modelPath, boolean ignoreIfNotExists) + throws ModelNotExistException, CatalogException { + throw new UnsupportedOperationException( + String.format( + "dropModel(ObjectPath, boolean) is not implemented for %s.", + this.getClass())); + } + + /** + * Rename an existing model. + * + * @param modelPath Path of the model to be renamed + * @param newModelName the new name of the model + * @param ignoreIfNotExists Flag to specify behavior when the model does not exist: if set to + * false, throw an exception, if set to true, do nothing. + * @throws ModelNotExistException if the model does not exist + * @throws CatalogException in case of any runtime exception + */ + default void renameModel(ObjectPath modelPath, String newModelName, boolean ignoreIfNotExists) + throws ModelNotExistException, ModelAlreadyExistException, CatalogException { + throw new UnsupportedOperationException( + String.format( + "renameModel(ObjectPath, String, boolean) is not implemented for %s.", + this.getClass())); + } + + /** + * Creates a new model. + * + * @param modelPath path of the model to be created + * @param model the CatalogModel definition + * @param ignoreIfExists flag to specify behavior when a model already exists at the given path: + * if set to false, it throws a ModelAlreadyExistException, if set to true, do nothing. + * @throws ModelAlreadyExistException if model already exists and ignoreIfExists is false + * @throws DatabaseNotExistException if the database in tablePath doesn't exist + * @throws CatalogException in case of any runtime exception + */ + default void createModel(ObjectPath modelPath, CatalogModel model, boolean ignoreIfExists) + throws ModelAlreadyExistException, DatabaseNotExistException, CatalogException { + throw new UnsupportedOperationException( + String.format( + "createModel(ObjectPath, CatalogModel, boolean) is not implemented for %s.", + this.getClass())); + } + + /** + * Modifies an existing model. Note that the new and old {@link CatalogModel} must be of the + * same kind. For example, this doesn't allow altering a remote model to import model or native + * model, and vice versa. Note that the newModel contains only changes. Current alter model + * syntax supports alter properties and rename. This call is for altering model properties. + * Therefore, the newModel's properties are only changed properties. It's up to catalog + * implementation to apply the changes. The reason for this behavior is that this doesn't + * dictate how catalog implementation should handle model alteration. For example, it can do + * Read-modify-write or merge in place etc. + * + * @param modelPath path of the model to be modified + * @param modelChange the CatalogModel containing only changes Review Comment: > return NopOperation if model doesn't exist We are about to change this behavior for ALTER TABLE. Because NopOperation doesn't represent what the statement is about. Yes, let's fix this in the next PR. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org