twalthr commented on code in PR #25036:
URL: https://github.com/apache/flink/pull/25036#discussion_r1716774191


##########
flink-python/pyflink/table/catalog.py:
##########
@@ -449,6 +449,101 @@ def drop_function(self, function_path: 'ObjectPath', 
ignore_if_not_exists: bool)
         """
         self._j_catalog.dropFunction(function_path._j_object_path, 
ignore_if_not_exists)
 
+    def list_models(self, database_name: str) -> List[str]:
+        """
+        List the names of all models in the given database. An empty list is 
returned if none is
+        registered.
+
+        :param database_name: Name of the database.
+        :return: A list of the names of the models in this database.
+        :raise: CatalogException in case of any runtime exception.
+                DatabaseNotExistException if the database does not exist.
+        """
+        return list(self._j_catalog.listModels(database_name))
+
+    def get_model(self, model_path: 'ObjectPath') -> 'CatalogModel':
+        """
+        Get the model.
+
+        :param model_path: Path :class:`ObjectPath` of the model.
+        :return: The requested :class:`CatalogModel`.
+        :raise: CatalogException in case of any runtime exception.
+                ModelNotExistException if the model does not exist in the 
catalog.
+        """
+        return 
CatalogModel._get(self._j_catalog.getModel(model_path._j_object_path))
+
+    def model_exists(self, model_path: 'ObjectPath') -> bool:
+        """
+        Check whether a model exists or not.
+
+        :param model_path: Path :class:`ObjectPath` of the model.
+        :return: true if the model exists in the catalog false otherwise.
+        :raise: CatalogException in case of any runtime exception.
+        """
+        return self._j_catalog.modelExists(model_path._j_object_path)
+
+    def drop_model(self, model_path: 'ObjectPath', ignore_if_not_exists: bool):
+        """
+        Drop a model.
+
+        :param model_path: Path :class:`ObjectPath` of the function to be 
dropped.

Review Comment:
   ```suggestion
           :param model_path: Path :class:`ObjectPath` of the model to be 
dropped.
   ```



##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/ResolvedCatalogModel.java:
##########
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+import java.util.Map;
+
+/**  * A validated {@link CatalogModel} that is backed by the original 
metadata coming from the {@link

Review Comment:
   invalid JavaDoc



##########
flink-python/pyflink/table/catalog.py:
##########
@@ -1028,6 +1123,63 @@ def get_function_language(self):
         return self._j_catalog_function.getFunctionLanguage()
 
 
+class CatalogModel(object):
+    """
+    Interface for a model in a catalog.
+    """
+
+    def __init__(self, j_catalog_model):
+        self._j_catalog_model = j_catalog_model
+
+    @staticmethod
+    def create_model(
+        properties: Dict[str, str] = {},

Review Comment:
   Aren't input and output schema missing?



##########
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogManager.java:
##########
@@ -1341,6 +1347,193 @@ private void dropTableInternal(
         }
     }
 
+    /**
+     * Retrieves a fully qualified model. If the path is not yet fully 
qualified use {@link
+     * #qualifyIdentifier(UnresolvedIdentifier)} first.
+     *
+     * @param objectIdentifier full path of the model to retrieve
+     * @return model that the path points to.
+     */
+    public Optional<ResolvedCatalogModel> getModel(ObjectIdentifier 
objectIdentifier) {

Review Comment:
   This comment is still not addressed.



##########
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogManager.java:
##########
@@ -1341,6 +1347,193 @@ private void dropTableInternal(
         }
     }
 
+    /**
+     * Retrieves a fully qualified model. If the path is not yet fully 
qualified use {@link
+     * #qualifyIdentifier(UnresolvedIdentifier)} first.
+     *
+     * @param objectIdentifier full path of the model to retrieve
+     * @return model that the path points to.
+     */
+    public Optional<ResolvedCatalogModel> getModel(ObjectIdentifier 
objectIdentifier) {
+        Optional<Catalog> catalogOptional = 
getCatalog(objectIdentifier.getCatalogName());
+        ObjectPath objectPath = objectIdentifier.toObjectPath();
+        if (catalogOptional.isPresent()) {
+            Catalog currentCatalog = catalogOptional.get();
+            try {
+                final CatalogModel model = currentCatalog.getModel(objectPath);
+                if (model != null) {
+                    final ResolvedCatalogModel resolvedModel = 
resolveCatalogModel(model);
+                    return Optional.of(resolvedModel);
+                }
+            } catch (ModelNotExistException e) {
+                // Ignore.
+            } catch (UnsupportedOperationException e) {
+                // Ignore for catalogs that don't support models.
+            }
+        }
+        return Optional.empty();
+    }
+
+    /**
+     * Like {@link #getModel(ObjectIdentifier)}, but throws an error when the 
model is not available
+     * in any of the catalogs.
+     */
+    public ResolvedCatalogModel getModelOrError(ObjectIdentifier 
objectIdentifier) {
+        return getModel(objectIdentifier)
+                .orElseThrow(
+                        () ->
+                                new ModelException(

Review Comment:
   This comment is still not addressed.



##########
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/GenericInMemoryCatalog.java:
##########
@@ -381,6 +385,112 @@ private void ensureTableExists(ObjectPath tablePath) 
throws TableNotExistExcepti
         }
     }
 
+    // ------ models ------
+
+    @Override
+    public void createModel(ObjectPath modelPath, CatalogModel model, boolean 
ignoreIfExists)
+            throws ModelAlreadyExistException, DatabaseNotExistException {
+        checkNotNull(modelPath);
+        checkNotNull(model);
+        if (!databaseExists(modelPath.getDatabaseName())) {
+            throw new DatabaseNotExistException(getName(), 
modelPath.getDatabaseName());
+        }
+        if (modelExists(modelPath)) {
+            if (!ignoreIfExists) {
+                throw new ModelAlreadyExistException(getName(), modelPath);
+            }
+        } else {
+            models.put(modelPath, model.copy());
+        }
+    }
+
+    @Override
+    public void alterModel(
+            ObjectPath modelPath, CatalogModel modelChange, boolean 
ignoreIfNotExists)
+            throws ModelNotExistException {
+        checkNotNull(modelPath);
+        checkNotNull(modelChange);
+
+        CatalogModel existingModel = models.get(modelPath);
+        if (existingModel == null) {
+            if (ignoreIfNotExists) {
+                return;
+            }
+            throw new ModelNotExistException(getName(), modelPath);
+        }
+
+        Map<String, String> newOptions = new 
HashMap<>(existingModel.getOptions());
+        newOptions.putAll(modelChange.getOptions());
+        models.put(modelPath, existingModel.copy(newOptions));
+    }
+
+    @Override
+    public void dropModel(ObjectPath modelPath, boolean ignoreIfNotExists)
+            throws ModelNotExistException {
+        checkNotNull(modelPath);
+        if (modelExists(modelPath)) {
+            models.remove(modelPath);
+        } else if (!ignoreIfNotExists) {
+            throw new ModelNotExistException(getName(), modelPath);
+        }
+    }
+
+    @Override
+    public void renameModel(ObjectPath modelPath, String newModelName, boolean 
ignoreIfNotExists)
+            throws ModelNotExistException, ModelAlreadyExistException {
+        checkNotNull(modelPath);
+        checkArgument(!StringUtils.isNullOrWhitespaceOnly(newModelName));
+
+        if (modelExists(modelPath)) {
+            ObjectPath newPath = new ObjectPath(modelPath.getDatabaseName(), 
newModelName);
+
+            if (modelExists(newPath)) {
+                throw new ModelAlreadyExistException(getName(), newPath);
+            } else {
+                models.put(newPath, models.remove(modelPath));
+            }
+        } else if (!ignoreIfNotExists) {
+            throw new ModelNotExistException(getName(), modelPath);
+        }
+    }
+
+    @Override
+    public List<String> listModels(String databaseName) throws 
DatabaseNotExistException {
+        checkArgument(
+                !StringUtils.isNullOrWhitespaceOnly(databaseName),
+                "databaseName cannot be null or empty");
+
+        if (!databaseExists(databaseName)) {
+            throw new DatabaseNotExistException(getName(), databaseName);
+        }
+
+        return models.keySet().stream()
+                .filter(k -> k.getDatabaseName().equals(databaseName))
+                .map(k -> k.getObjectName())
+                .collect(Collectors.toList());
+    }
+
+    @Override
+    public CatalogModel getModel(ObjectPath modelPath) throws 
ModelNotExistException {
+        checkNotNull(modelPath);
+
+        if (!modelExists(modelPath)) {
+            throw new ModelNotExistException(getName(), modelPath);
+        } else {
+            return models.get(modelPath).copy();
+        }
+    }
+
+    @Override
+    public boolean modelExists(ObjectPath modelPath) {
+        checkNotNull(modelPath);
+        return databaseExists(modelPath.getDatabaseName()) && 
models.containsKey(modelPath);
+    }
+
+    public void dropAllModels() {

Review Comment:
   remove this method? we also don't support it for tables or other catalog 
objects.



##########
flink-python/pyflink/table/catalog.py:
##########
@@ -1028,6 +1123,63 @@ def get_function_language(self):
         return self._j_catalog_function.getFunctionLanguage()
 
 
+class CatalogModel(object):
+    """
+    Interface for a model in a catalog.
+    """
+
+    def __init__(self, j_catalog_model):
+        self._j_catalog_model = j_catalog_model
+
+    @staticmethod
+    def create_model(
+        properties: Dict[str, str] = {},
+        comment: str = None
+    ) -> "CatalogModel":
+        """
+        Create an instance of CatalogModel for the catalog model.
+
+        :param properties: the properties of the catalog model
+        :param comment: the comment of the catalog model
+        """
+        assert properties is not None
+
+        gateway = get_gateway()
+        return CatalogModel(
+            gateway.jvm.org.apache.flink.table.catalog.CatalogModel.of(

Review Comment:
   Ensure that CatalogModel.of does not accept null for the schema yet.



##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/Catalog.java:
##########
@@ -790,4 +792,126 @@ void alterPartitionColumnStatistics(
             CatalogColumnStatistics columnStatistics,
             boolean ignoreIfNotExists)
             throws PartitionNotExistException, CatalogException;
+
+    // ------ models  ------
+
+    /**
+     * Get names of all tables models under this database. An empty list is 
returned if none exists.
+     *
+     * @return a list of the names of all models in this database
+     * @throws DatabaseNotExistException if the database does not exist
+     * @throws CatalogException in case of any runtime exception
+     */
+    default List<String> listModels(String databaseName)
+            throws DatabaseNotExistException, CatalogException {
+        throw new UnsupportedOperationException(
+                String.format("listModel(String) is not implemented for %s.", 
this.getClass()));
+    }
+
+    /**
+     * Returns a {@link CatalogModel} identified by the given {@link 
ObjectPath}.
+     *
+     * @param modelPath Path of the model
+     * @return The requested model
+     * @throws ModelNotExistException if the target does not exist
+     * @throws CatalogException in case of any runtime exception
+     */
+    default CatalogModel getModel(ObjectPath modelPath)
+            throws ModelNotExistException, CatalogException {
+        throw new UnsupportedOperationException(
+                String.format("getModel(ObjectPath) is not implemented for 
%s.", this.getClass()));
+    }
+
+    /**
+     * Check if a model exists in this catalog.
+     *
+     * @param modelPath Path of the model
+     * @return true if the given model exists in the catalog false otherwise
+     * @throws CatalogException in case of any runtime exception
+     */
+    default boolean modelExists(ObjectPath modelPath) throws CatalogException {
+        throw new UnsupportedOperationException(
+                String.format(
+                        "modelExists(ObjectPath) is not implemented for %s.", 
this.getClass()));
+    }
+
+    /**
+     * Drop a model.
+     *
+     * @param modelPath Path of the model to be dropped
+     * @param ignoreIfNotExists Flag to specify behavior when the model does 
not exist: if set to
+     *     false, throw an exception, if set to true, do nothing.
+     * @throws ModelNotExistException if the model does not exist
+     * @throws CatalogException in case of any runtime exception
+     */
+    default void dropModel(ObjectPath modelPath, boolean ignoreIfNotExists)
+            throws ModelNotExistException, CatalogException {
+        throw new UnsupportedOperationException(
+                String.format(
+                        "dropModel(ObjectPath, boolean) is not implemented for 
%s.",
+                        this.getClass()));
+    }
+
+    /**
+     * Rename an existing model.
+     *
+     * @param modelPath Path of the model to be renamed
+     * @param newModelName the new name of the model
+     * @param ignoreIfNotExists Flag to specify behavior when the model does 
not exist: if set to
+     *     false, throw an exception, if set to true, do nothing.
+     * @throws ModelNotExistException if the model does not exist
+     * @throws CatalogException in case of any runtime exception
+     */
+    default void renameModel(ObjectPath modelPath, String newModelName, 
boolean ignoreIfNotExists)
+            throws ModelNotExistException, ModelAlreadyExistException, 
CatalogException {
+        throw new UnsupportedOperationException(
+                String.format(
+                        "renameModel(ObjectPath, String, boolean) is not 
implemented for %s.",
+                        this.getClass()));
+    }
+
+    /**
+     * Creates a new model.
+     *
+     * @param modelPath path of the model to be created
+     * @param model the CatalogModel definition
+     * @param ignoreIfExists flag to specify behavior when a model already 
exists at the given path:
+     *     if set to false, it throws a ModelAlreadyExistException, if set to 
true, do nothing.
+     * @throws ModelAlreadyExistException if model already exists and 
ignoreIfExists is false
+     * @throws DatabaseNotExistException if the database in tablePath doesn't 
exist
+     * @throws CatalogException in case of any runtime exception
+     */
+    default void createModel(ObjectPath modelPath, CatalogModel model, boolean 
ignoreIfExists)
+            throws ModelAlreadyExistException, DatabaseNotExistException, 
CatalogException {
+        throw new UnsupportedOperationException(
+                String.format(
+                        "createModel(ObjectPath, CatalogModel, boolean) is not 
implemented for %s.",
+                        this.getClass()));
+    }
+
+    /**
+     * Modifies an existing model. Note that the new and old {@link 
CatalogModel} must be of the
+     * same kind. For example, this doesn't allow altering a remote model to 
import model or native
+     * model, and vice versa. Note that the newModel contains only changes. 
Current alter model
+     * syntax supports alter properties and rename. This call is for altering 
model properties.
+     * Therefore, the newModel's properties are only changed properties. It's 
up to catalog
+     * implementation to apply the changes. The reason for this behavior is 
that this doesn't
+     * dictate how catalog implementation should handle model alteration. For 
example, it can do
+     * Read-modify-write or merge in place etc.
+     *
+     * @param modelPath path of the model to be modified
+     * @param modelChange the CatalogModel containing only changes

Review Comment:
   > return NopOperation if model doesn't exist
   
   We are about to change this behavior for ALTER TABLE. Because NopOperation 
doesn't represent what the statement is about. Yes, let's fix this in the next 
PR.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to