twalthr commented on code in PR #25211:
URL: https://github.com/apache/flink/pull/25211#discussion_r1799549652


##########
flink-python/pyflink/table/catalog.py:
##########
@@ -1028,6 +1122,69 @@ def get_function_language(self):
         return self._j_catalog_function.getFunctionLanguage()
 
 
+class CatalogModel(object):
+    """
+    Interface for a model in a catalog.
+    """
+
+    def __init__(self, j_catalog_model):
+        self._j_catalog_model = j_catalog_model
+
+    @staticmethod
+    def create_model(
+        input_schema: Schema,
+        output_schema: Schema,
+        properties: Dict[str, str] = {},

Review Comment:
   Rename the `properties` parameter to `options`.



##########
flink-python/pyflink/table/tests/test_catalog.py:
##########
@@ -103,6 +116,22 @@ def get_streaming_table_properties():
     def create_partition_keys():
         return ["second", "third"]
 
+    @staticmethod
+    def create_model():
+        return CatalogModel.create_model(
+            CatalogTestBase.create_model_schema(),
+            CatalogTestBase.create_model_schema(),
+            properties={},
+            comment="some comment")
+
+    @staticmethod
+    def create_another_model():
+        return CatalogModel.create_model(
+            CatalogTestBase.create_model_schema(),
+            CatalogTestBase.create_model_schema(),
+            properties={"key": "value"},

Review Comment:
   ```suggestion
               options={"key": "value"},
   ```



##########
flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/GenericInMemoryCatalogTest.java:
##########
@@ -52,6 +60,197 @@ static void init() {
         catalog.open();
     }
 
+    // TODO (FLINK-35020) : remove after implementing dropModel in catalog
+    @AfterEach
+    void cleanup() throws Exception {
+        if (catalog.modelExists(modelPath1)) {
+            ((GenericInMemoryCatalog) catalog).dropModel(modelPath1, true);
+        }
+        if (catalog.modelExists(modelPath2)) {
+            ((GenericInMemoryCatalog) catalog).dropModel(modelPath2, true);
+        }
+        super.cleanup();
+    }
+
+    // These are put here instead of CatalogTest class since model operations 
are not implemented

Review Comment:
   Rather, introduce a `supportsModels(): boolean` method in `CatalogTest` and
   move those tests to the test base. Set this flag to false for the Hive
   catalog so that the tests are ignored there.



##########
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/ContextResolvedModel.java:
##########
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nullable;
+
+import java.util.Objects;
+import java.util.Optional;
+
+/**
+ * This class contains information about a model and its relationship with a 
{@link Catalog}, if
+ * any.
+ *
+ * <p>There can be 2 kinds of {@link ContextResolvedModel}:
+ *
+ * <ul>
+ *   <li>A permanent model: a model which is stored in a {@link Catalog} and 
has an associated
+ *       unique {@link ObjectIdentifier}.
+ *   <li>A temporary model: a model which is stored in the {@link 
CatalogManager}, has an associated
+ *       unique {@link ObjectIdentifier} and is flagged as temporary.
+ * </ul>
+ *
+ * <p>The different handling of temporary and permanent model is {@link 
Catalog} and {@link
+ * CatalogManager} instance specific, hence for these two kind of models, an 
instance of this object
+ * represents the relationship between the specific {@link 
ResolvedCatalogModel} instance and the
+ * specific {@link Catalog}/{@link CatalogManager} instances. For example, the 
same {@link
+ * ResolvedCatalogModel} can be temporary for one catalog, but permanent for 
another one.
+ */
+@Internal
+public final class ContextResolvedModel {
+
+    private final ObjectIdentifier objectIdentifier;
+    private final @Nullable Catalog catalog;
+    private final ResolvedCatalogModel resolvedModel;
+
+    public static ContextResolvedModel permanent(
+            ObjectIdentifier identifier, Catalog catalog, ResolvedCatalogModel 
resolvedModel) {
+        return new ContextResolvedModel(
+                identifier, Preconditions.checkNotNull(catalog), 
resolvedModel);
+    }
+
+    public static ContextResolvedModel temporary(
+            ObjectIdentifier identifier, ResolvedCatalogModel resolvedModel) {
+        return new ContextResolvedModel(identifier, null, resolvedModel);
+    }
+
+    private ContextResolvedModel(
+            ObjectIdentifier objectIdentifier,
+            @Nullable Catalog catalog,
+            ResolvedCatalogModel resolvedModel) {
+        this.objectIdentifier = Preconditions.checkNotNull(objectIdentifier);
+        this.catalog = catalog;
+        this.resolvedModel = Preconditions.checkNotNull(resolvedModel);
+    }
+
+    /** @return true if the table is temporary. An anonymous table is always 
temporary. */

Review Comment:
   ```suggestion
       /** @return true if the model is temporary. */
   ```



##########
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogManager.java:
##########
@@ -1341,6 +1352,293 @@ private void dropTableInternal(
         }
     }
 
+    /**
+     * Retrieves a fully qualified model. If the path is not yet fully 
qualified use {@link
+     * #qualifyIdentifier(UnresolvedIdentifier)} first.
+     *
+     * @param objectIdentifier full path of the model to retrieve
+     * @return model that the path points to.
+     */
+    public Optional<ContextResolvedModel> getModel(ObjectIdentifier 
objectIdentifier) {
+        CatalogModel temporaryModel = temporaryModels.get(objectIdentifier);
+        if (temporaryModel != null) {
+            final ResolvedCatalogModel resolvedModel = 
resolveCatalogModel(temporaryModel);
+            return 
Optional.of(ContextResolvedModel.temporary(objectIdentifier, resolvedModel));
+        }
+        Optional<Catalog> catalogOptional = 
getCatalog(objectIdentifier.getCatalogName());
+        ObjectPath objectPath = objectIdentifier.toObjectPath();
+        if (catalogOptional.isPresent()) {
+            Catalog currentCatalog = catalogOptional.get();
+            try {
+                final CatalogModel model = currentCatalog.getModel(objectPath);
+                if (model != null) {
+                    final ResolvedCatalogModel resolvedModel = 
resolveCatalogModel(model);
+                    return Optional.of(
+                            ContextResolvedModel.permanent(
+                                    objectIdentifier, currentCatalog, 
resolvedModel));
+                }
+            } catch (ModelNotExistException e) {
+                // Ignore.
+            } catch (UnsupportedOperationException e) {
+                // Ignore for catalogs that don't support models.
+            }
+        }
+        return Optional.empty();
+    }
+
+    /**
+     * Like {@link #getModel(ObjectIdentifier)}, but throws an error when the 
model is not available
+     * in any of the catalogs.
+     */
+    public ContextResolvedModel getModelOrError(ObjectIdentifier 
objectIdentifier) {
+        return getModel(objectIdentifier)
+                .orElseThrow(
+                        () ->
+                                new TableException(
+                                        String.format(
+                                                "Cannot find model '%s' in any 
of the catalogs %s.",
+                                                objectIdentifier, 
listCatalogs())));
+    }
+
+    /**
+     * Return whether the model with a fully qualified table path is temporary 
or not.
+     *
+     * @param objectIdentifier full path of the table
+     * @return the model is temporary or not.
+     */
+    public boolean isTemporaryModel(ObjectIdentifier objectIdentifier) {
+        return temporaryModels.containsKey(objectIdentifier);
+    }
+
+    /**
+     * Returns an array of names of all models registered in the namespace of 
the current catalog
+     * and database.
+     *
+     * @return names of all registered models
+     */
+    public Set<String> listModels() {
+        return listModels(getCurrentCatalog(), getCurrentDatabase());
+    }
+
+    /**
+     * Returns an array of names of all models registered in the namespace of 
the given catalog and
+     * database.
+     *
+     * @return names of all registered models
+     */
+    public Set<String> listModels(String catalogName, String databaseName) {
+        Catalog catalog = getCatalogOrThrowException(catalogName);
+        if (catalog == null) {
+            throw new ValidationException(String.format("Catalog %s does not 
exist", catalogName));
+        }
+        try {
+            return new HashSet<>(catalog.listModels(databaseName));
+        } catch (DatabaseNotExistException e) {
+            throw new ValidationException(
+                    String.format("Database %s does not exist", databaseName), 
e);
+        }
+    }
+
+    /**
+     * Creates a model in a given fully qualified path.
+     *
+     * @param model The resolved model to put in the given path.
+     * @param objectIdentifier The fully qualified path where to put the model.
+     * @param ignoreIfExists If false exception will be thrown if a model 
exists in the given path.
+     */
+    public void createModel(
+            CatalogModel model, ObjectIdentifier objectIdentifier, boolean 
ignoreIfExists) {
+        execute(
+                (catalog, path) -> {
+                    final ResolvedCatalogModel resolvedModel = 
resolveCatalogModel(model);
+                    catalog.createModel(path, resolvedModel, ignoreIfExists);
+                    catalogModificationListeners.forEach(
+                            listener ->
+                                    listener.onEvent(
+                                            CreateModelEvent.createEvent(
+                                                    
CatalogContext.createContext(
+                                                            
objectIdentifier.getCatalogName(),
+                                                            catalog),
+                                                    objectIdentifier,
+                                                    resolvedModel,
+                                                    ignoreIfExists,
+                                                    false)));
+                },
+                objectIdentifier,
+                false,
+                "CreateModel");
+    }
+
+    /**
+     * Creates a temporary model in a given fully qualified path.
+     *
+     * @param model The resolved model to put in the given path.
+     * @param objectIdentifier The fully qualified path where to put the model.
+     * @param ignoreIfExists if false exception will be thrown if a model 
exists in the given path.
+     */
+    public void createTemporaryModel(
+            CatalogModel model, ObjectIdentifier objectIdentifier, boolean 
ignoreIfExists) {
+        Optional<TemporaryOperationListener> listener =
+                getTemporaryOperationListener(objectIdentifier);
+        temporaryModels.compute(
+                objectIdentifier,
+                (k, v) -> {
+                    if (v != null) {
+                        if (!ignoreIfExists) {
+                            throw new ValidationException(
+                                    String.format(
+                                            "Temporary model '%s' already 
exists",
+                                            objectIdentifier));
+                        }
+                        return v;
+                    } else {
+                        ResolvedCatalogModel resolvedModel = 
resolveCatalogModel(model);
+                        Catalog catalog =
+                                
getCatalog(objectIdentifier.getCatalogName()).orElse(null);
+                        if (listener.isPresent()) {
+                            return listener.get()
+                                    .onCreateTemporaryModel(
+                                            objectIdentifier.toObjectPath(), 
resolvedModel);
+                        }
+                        catalogModificationListeners.forEach(
+                                l ->
+                                        l.onEvent(
+                                                CreateModelEvent.createEvent(
+                                                        
CatalogContext.createContext(
+                                                                
objectIdentifier.getCatalogName(),
+                                                                catalog),
+                                                        objectIdentifier,
+                                                        resolvedModel,
+                                                        ignoreIfExists,
+                                                        true)));
+                        return resolvedModel;
+                    }
+                });
+    }
+
+    /**
+     * Alters a model in a given fully qualified path.
+     *
+     * @param modelChange The model containing only changes
+     * @param objectIdentifier The fully qualified path where to alter the 
model.
+     * @param ignoreIfNotExists If false exception will be thrown if the model 
to be altered does
+     *     not exist.
+     */
+    public void alterModel(
+            CatalogModel modelChange,
+            ObjectIdentifier objectIdentifier,
+            boolean ignoreIfNotExists) {
+        execute(
+                (catalog, path) -> {
+                    ResolvedCatalogModel resolvedModel = 
resolveCatalogModel(modelChange);
+                    catalog.alterModel(path, resolvedModel, ignoreIfNotExists);
+                    catalogModificationListeners.forEach(
+                            listener ->
+                                    listener.onEvent(
+                                            AlterModelEvent.createEvent(
+                                                    
CatalogContext.createContext(
+                                                            
objectIdentifier.getCatalogName(),
+                                                            catalog),
+                                                    objectIdentifier,
+                                                    resolvedModel,
+                                                    ignoreIfNotExists)));
+                },
+                objectIdentifier,
+                ignoreIfNotExists,
+                "AlterModel");
+    }
+
+    /**
+     * Drops a model in a given fully qualified path.
+     *
+     * @param objectIdentifier The fully qualified path of the model to drop.
+     * @param ignoreIfNotExists If false exception will be thrown if the model 
to drop does not
+     *     exist.
+     */
+    public void dropModel(ObjectIdentifier objectIdentifier, boolean 
ignoreIfNotExists) {
+        execute(
+                (catalog, path) -> {
+                    Optional<ContextResolvedModel> resultOpt = 
getModel(objectIdentifier);
+                    if (resultOpt.isPresent()) {
+                        ResolvedCatalogModel resolvedModel = 
resultOpt.get().getResolvedModel();
+                        catalog.dropModel(path, ignoreIfNotExists);
+                        catalogModificationListeners.forEach(
+                                listener ->
+                                        listener.onEvent(
+                                                DropModelEvent.createEvent(
+                                                        
CatalogContext.createContext(
+                                                                
objectIdentifier.getCatalogName(),
+                                                                catalog),
+                                                        objectIdentifier,
+                                                        resolvedModel,
+                                                        ignoreIfNotExists,
+                                                        false)));
+                    } else if (!ignoreIfNotExists) {
+                        throw new ModelNotExistException(
+                                objectIdentifier.getCatalogName(), 
objectIdentifier.toObjectPath());
+                    }
+                },
+                objectIdentifier,
+                ignoreIfNotExists,
+                "DropModel");
+    }
+
+    /**
+     * Drop a temporary model in a given fully qualified path.
+     *
+     * @param objectIdentifier The fully qualified path of the model to drop.
+     * @param ignoreIfNotExists If false exception will be thrown if the model 
to be dropped does
+     *     not exist.
+     */
+    public void dropTemporaryModel(ObjectIdentifier objectIdentifier, boolean 
ignoreIfNotExists) {
+        CatalogModel model = temporaryModels.get(objectIdentifier);
+        if (model != null) {
+            getTemporaryOperationListener(objectIdentifier)
+                    .ifPresent(l -> 
l.onDropTemporaryModel(objectIdentifier.toObjectPath()));
+
+            Catalog catalog = 
getCatalog(objectIdentifier.getCatalogName()).orElse(null);
+            ResolvedCatalogModel resolvedModel = resolveCatalogModel(model);
+            temporaryModels.remove(objectIdentifier);
+            catalogModificationListeners.forEach(
+                    listener ->
+                            listener.onEvent(
+                                    DropModelEvent.createEvent(
+                                            CatalogContext.createContext(
+                                                    
objectIdentifier.getCatalogName(), catalog),
+                                            objectIdentifier,
+                                            resolvedModel,
+                                            ignoreIfNotExists,
+                                            true)));
+        } else if (!ignoreIfNotExists) {
+            throw new ValidationException(
+                    String.format(
+                            "Temporary model with identifier '%s' does not 
exist.",
+                            objectIdentifier.asSummaryString()));
+        }
+    }
+
+    public ResolvedCatalogModel resolveCatalogModel(CatalogModel model) {
+        Preconditions.checkNotNull(schemaResolver, "Schema resolver is not 
initialized.");
+        if (model instanceof ResolvedCatalogModel) {
+            return (ResolvedCatalogModel) model;
+        }
+        // If input schema / output schema does not exist, get the schema from 
select as statement.
+        final ResolvedSchema resolvedInputSchema;
+        if (model.getInputSchema() == null) {

Review Comment:
   Could you provide more information about this case? It looks like this is at
   the wrong layer. For tables, a catalog object always has a correct output type.
   Take a look at CREATE TABLE AS, which has similar behavior.



##########
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogManager.java:
##########
@@ -1341,6 +1352,293 @@ private void dropTableInternal(
         }
     }
 
+    /**
+     * Retrieves a fully qualified model. If the path is not yet fully 
qualified use {@link
+     * #qualifyIdentifier(UnresolvedIdentifier)} first.
+     *
+     * @param objectIdentifier full path of the model to retrieve
+     * @return model that the path points to.
+     */
+    public Optional<ContextResolvedModel> getModel(ObjectIdentifier 
objectIdentifier) {
+        CatalogModel temporaryModel = temporaryModels.get(objectIdentifier);
+        if (temporaryModel != null) {
+            final ResolvedCatalogModel resolvedModel = 
resolveCatalogModel(temporaryModel);
+            return 
Optional.of(ContextResolvedModel.temporary(objectIdentifier, resolvedModel));
+        }
+        Optional<Catalog> catalogOptional = 
getCatalog(objectIdentifier.getCatalogName());
+        ObjectPath objectPath = objectIdentifier.toObjectPath();
+        if (catalogOptional.isPresent()) {
+            Catalog currentCatalog = catalogOptional.get();
+            try {
+                final CatalogModel model = currentCatalog.getModel(objectPath);
+                if (model != null) {
+                    final ResolvedCatalogModel resolvedModel = 
resolveCatalogModel(model);
+                    return Optional.of(
+                            ContextResolvedModel.permanent(
+                                    objectIdentifier, currentCatalog, 
resolvedModel));
+                }
+            } catch (ModelNotExistException e) {
+                // Ignore.
+            } catch (UnsupportedOperationException e) {
+                // Ignore for catalogs that don't support models.
+            }
+        }
+        return Optional.empty();
+    }
+
+    /**
+     * Like {@link #getModel(ObjectIdentifier)}, but throws an error when the 
model is not available
+     * in any of the catalogs.
+     */
+    public ContextResolvedModel getModelOrError(ObjectIdentifier 
objectIdentifier) {
+        return getModel(objectIdentifier)
+                .orElseThrow(
+                        () ->
+                                new TableException(
+                                        String.format(
+                                                "Cannot find model '%s' in any 
of the catalogs %s.",
+                                                objectIdentifier, 
listCatalogs())));
+    }
+
+    /**
+     * Return whether the model with a fully qualified table path is temporary 
or not.
+     *
+     * @param objectIdentifier full path of the table
+     * @return the model is temporary or not.
+     */
+    public boolean isTemporaryModel(ObjectIdentifier objectIdentifier) {
+        return temporaryModels.containsKey(objectIdentifier);
+    }
+
+    /**
+     * Returns an array of names of all models registered in the namespace of 
the current catalog
+     * and database.
+     *
+     * @return names of all registered models
+     */
+    public Set<String> listModels() {
+        return listModels(getCurrentCatalog(), getCurrentDatabase());
+    }
+
+    /**
+     * Returns an array of names of all models registered in the namespace of 
the given catalog and
+     * database.
+     *
+     * @return names of all registered models
+     */
+    public Set<String> listModels(String catalogName, String databaseName) {
+        Catalog catalog = getCatalogOrThrowException(catalogName);
+        if (catalog == null) {
+            throw new ValidationException(String.format("Catalog %s does not 
exist", catalogName));
+        }
+        try {
+            return new HashSet<>(catalog.listModels(databaseName));
+        } catch (DatabaseNotExistException e) {
+            throw new ValidationException(
+                    String.format("Database %s does not exist", databaseName), 
e);
+        }
+    }
+
+    /**
+     * Creates a model in a given fully qualified path.
+     *
+     * @param model The resolved model to put in the given path.
+     * @param objectIdentifier The fully qualified path where to put the model.
+     * @param ignoreIfExists If false exception will be thrown if a model 
exists in the given path.
+     */
+    public void createModel(
+            CatalogModel model, ObjectIdentifier objectIdentifier, boolean 
ignoreIfExists) {
+        execute(
+                (catalog, path) -> {
+                    final ResolvedCatalogModel resolvedModel = 
resolveCatalogModel(model);
+                    catalog.createModel(path, resolvedModel, ignoreIfExists);
+                    catalogModificationListeners.forEach(
+                            listener ->
+                                    listener.onEvent(
+                                            CreateModelEvent.createEvent(
+                                                    
CatalogContext.createContext(
+                                                            
objectIdentifier.getCatalogName(),
+                                                            catalog),
+                                                    objectIdentifier,
+                                                    resolvedModel,
+                                                    ignoreIfExists,
+                                                    false)));
+                },
+                objectIdentifier,
+                false,
+                "CreateModel");
+    }
+
+    /**
+     * Creates a temporary model in a given fully qualified path.
+     *
+     * @param model The resolved model to put in the given path.
+     * @param objectIdentifier The fully qualified path where to put the model.
+     * @param ignoreIfExists if false exception will be thrown if a model 
exists in the given path.
+     */
+    public void createTemporaryModel(
+            CatalogModel model, ObjectIdentifier objectIdentifier, boolean 
ignoreIfExists) {
+        Optional<TemporaryOperationListener> listener =
+                getTemporaryOperationListener(objectIdentifier);
+        temporaryModels.compute(
+                objectIdentifier,
+                (k, v) -> {
+                    if (v != null) {
+                        if (!ignoreIfExists) {
+                            throw new ValidationException(
+                                    String.format(
+                                            "Temporary model '%s' already 
exists",
+                                            objectIdentifier));
+                        }
+                        return v;
+                    } else {
+                        ResolvedCatalogModel resolvedModel = 
resolveCatalogModel(model);
+                        Catalog catalog =
+                                
getCatalog(objectIdentifier.getCatalogName()).orElse(null);
+                        if (listener.isPresent()) {
+                            return listener.get()
+                                    .onCreateTemporaryModel(
+                                            objectIdentifier.toObjectPath(), 
resolvedModel);
+                        }
+                        catalogModificationListeners.forEach(
+                                l ->
+                                        l.onEvent(
+                                                CreateModelEvent.createEvent(
+                                                        
CatalogContext.createContext(
+                                                                
objectIdentifier.getCatalogName(),
+                                                                catalog),
+                                                        objectIdentifier,
+                                                        resolvedModel,
+                                                        ignoreIfExists,
+                                                        true)));
+                        return resolvedModel;
+                    }
+                });
+    }
+
+    /**
+     * Alters a model in a given fully qualified path.
+     *
+     * @param modelChange The model containing only changes
+     * @param objectIdentifier The fully qualified path where to alter the 
model.
+     * @param ignoreIfNotExists If false exception will be thrown if the model 
to be altered does
+     *     not exist.
+     */
+    public void alterModel(

Review Comment:
   We should simply copy what tables do.



##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/DefaultCatalogModel.java:
##########
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.api.Schema;
+
+import javax.annotation.Nullable;
+
+import java.util.Map;
+import java.util.Objects;
+
+/** A catalog model implementation. */
+@Internal
+public class DefaultCatalogModel implements CatalogModel {
+    private final Schema inputSchema;
+    private final Schema outputSchema;
+    private final Map<String, String> modelOptions;
+    private final @Nullable String comment;
+
+    protected DefaultCatalogModel(
+            Schema inputSchema,
+            Schema outputSchema,
+            Map<String, String> modelOptions,
+            @Nullable String comment) {
+        this.inputSchema = inputSchema;

Review Comment:
   Add null checks for the arguments to avoid nulls travelling through the stack.



##########
flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/CatalogManagerTest.java:
##########
@@ -256,6 +262,114 @@ public void testDropCurrentDatabase() throws Exception {
                 .hasMessage("Cannot drop a database which is currently in 
use.");
     }
 
+    @Test
+    public void testModelModificationListener() throws Exception {
+        CompletableFuture<CreateModelEvent> createFuture = new 
CompletableFuture<>();
+        CompletableFuture<CreateModelEvent> createTemporaryFuture = new 
CompletableFuture<>();
+        CompletableFuture<AlterModelEvent> alterFuture = new 
CompletableFuture<>();
+        CompletableFuture<DropModelEvent> dropFuture = new 
CompletableFuture<>();
+        CompletableFuture<DropModelEvent> dropTemporaryFuture = new 
CompletableFuture<>();
+        CatalogManager catalogManager =
+                CatalogManager.newBuilder()

Review Comment:
   use `org.apache.flink.table.utils.CatalogManagerMocks#preparedCatalogManager`



##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/CatalogModel.java:
##########
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.api.Schema;
+
+import javax.annotation.Nullable;
+
+import java.util.Map;
+
+/** Interface for a model in a catalog. */
+@PublicEvolving
+public interface CatalogModel {
+    /** Returns a map of string-based model options. */
+    Map<String, String> getOptions();
+
+    /**
+     * Get the unresolved input schema of the model.
+     *
+     * @return unresolved input schema of the model.
+     */
+    Schema getInputSchema();

Review Comment:
   So can this be null or not?



##########
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogManager.java:
##########
@@ -1341,6 +1352,293 @@ private void dropTableInternal(
         }
     }
 
+    /**
+     * Retrieves a fully qualified model. If the path is not yet fully 
qualified use {@link
+     * #qualifyIdentifier(UnresolvedIdentifier)} first.
+     *
+     * @param objectIdentifier full path of the model to retrieve
+     * @return model that the path points to.
+     */
+    public Optional<ContextResolvedModel> getModel(ObjectIdentifier 
objectIdentifier) {
+        CatalogModel temporaryModel = temporaryModels.get(objectIdentifier);
+        if (temporaryModel != null) {
+            final ResolvedCatalogModel resolvedModel = 
resolveCatalogModel(temporaryModel);
+            return 
Optional.of(ContextResolvedModel.temporary(objectIdentifier, resolvedModel));
+        }
+        Optional<Catalog> catalogOptional = 
getCatalog(objectIdentifier.getCatalogName());
+        ObjectPath objectPath = objectIdentifier.toObjectPath();
+        if (catalogOptional.isPresent()) {
+            Catalog currentCatalog = catalogOptional.get();
+            try {
+                final CatalogModel model = currentCatalog.getModel(objectPath);
+                if (model != null) {
+                    final ResolvedCatalogModel resolvedModel = 
resolveCatalogModel(model);
+                    return Optional.of(
+                            ContextResolvedModel.permanent(
+                                    objectIdentifier, currentCatalog, 
resolvedModel));
+                }
+            } catch (ModelNotExistException e) {
+                // Ignore.
+            } catch (UnsupportedOperationException e) {
+                // Ignore for catalogs that don't support models.
+            }
+        }
+        return Optional.empty();
+    }
+
+    /**
+     * Like {@link #getModel(ObjectIdentifier)}, but throws an error when the 
model is not available
+     * in any of the catalogs.
+     */
+    public ContextResolvedModel getModelOrError(ObjectIdentifier 
objectIdentifier) {
+        return getModel(objectIdentifier)
+                .orElseThrow(
+                        () ->
+                                new TableException(
+                                        String.format(
+                                                "Cannot find model '%s' in any 
of the catalogs %s.",
+                                                objectIdentifier, 
listCatalogs())));
+    }
+
+    /**
+     * Return whether the model with a fully qualified table path is temporary 
or not.
+     *
+     * @param objectIdentifier full path of the table
+     * @return the model is temporary or not.
+     */
+    public boolean isTemporaryModel(ObjectIdentifier objectIdentifier) {
+        return temporaryModels.containsKey(objectIdentifier);
+    }
+
+    /**
+     * Returns an array of names of all models registered in the namespace of 
the current catalog
+     * and database.
+     *
+     * @return names of all registered models
+     */
+    public Set<String> listModels() {
+        return listModels(getCurrentCatalog(), getCurrentDatabase());
+    }
+
+    /**
+     * Returns an array of names of all models registered in the namespace of 
the given catalog and
+     * database.
+     *
+     * @return names of all registered models
+     */
+    public Set<String> listModels(String catalogName, String databaseName) {
+        Catalog catalog = getCatalogOrThrowException(catalogName);
+        if (catalog == null) {
+            throw new ValidationException(String.format("Catalog %s does not 
exist", catalogName));
+        }
+        try {
+            return new HashSet<>(catalog.listModels(databaseName));
+        } catch (DatabaseNotExistException e) {
+            throw new ValidationException(
+                    String.format("Database %s does not exist", databaseName), 
e);
+        }
+    }
+
+    /**
+     * Creates a model in a given fully qualified path.
+     *
+     * @param model The resolved model to put in the given path.
+     * @param objectIdentifier The fully qualified path where to put the model.
+     * @param ignoreIfExists If false exception will be thrown if a model 
exists in the given path.
+     */
+    public void createModel(
+            CatalogModel model, ObjectIdentifier objectIdentifier, boolean 
ignoreIfExists) {
+        execute(
+                (catalog, path) -> {
+                    final ResolvedCatalogModel resolvedModel = 
resolveCatalogModel(model);
+                    catalog.createModel(path, resolvedModel, ignoreIfExists);
+                    catalogModificationListeners.forEach(
+                            listener ->
+                                    listener.onEvent(
+                                            CreateModelEvent.createEvent(
+                                                    
CatalogContext.createContext(
+                                                            
objectIdentifier.getCatalogName(),
+                                                            catalog),
+                                                    objectIdentifier,
+                                                    resolvedModel,
+                                                    ignoreIfExists,
+                                                    false)));
+                },
+                objectIdentifier,
+                false,
+                "CreateModel");
+    }
+
+    /**
+     * Creates a temporary model in a given fully qualified path.
+     *
+     * @param model The resolved model to put in the given path.
+     * @param objectIdentifier The fully qualified path where to put the model.
+     * @param ignoreIfExists if false exception will be thrown if a model 
exists in the given path.
+     */
+    public void createTemporaryModel(
+            CatalogModel model, ObjectIdentifier objectIdentifier, boolean 
ignoreIfExists) {
+        Optional<TemporaryOperationListener> listener =
+                getTemporaryOperationListener(objectIdentifier);
+        temporaryModels.compute(
+                objectIdentifier,
+                (k, v) -> {
+                    if (v != null) {
+                        if (!ignoreIfExists) {
+                            throw new ValidationException(
+                                    String.format(
+                                            "Temporary model '%s' already 
exists",
+                                            objectIdentifier));
+                        }
+                        return v;
+                    } else {
+                        ResolvedCatalogModel resolvedModel = 
resolveCatalogModel(model);
+                        Catalog catalog =
+                                
getCatalog(objectIdentifier.getCatalogName()).orElse(null);
+                        if (listener.isPresent()) {
+                            return listener.get()
+                                    .onCreateTemporaryModel(
+                                            objectIdentifier.toObjectPath(), 
resolvedModel);
+                        }
+                        catalogModificationListeners.forEach(
+                                l ->
+                                        l.onEvent(
+                                                CreateModelEvent.createEvent(
+                                                        
CatalogContext.createContext(
+                                                                
objectIdentifier.getCatalogName(),
+                                                                catalog),
+                                                        objectIdentifier,
+                                                        resolvedModel,
+                                                        ignoreIfExists,
+                                                        true)));
+                        return resolvedModel;
+                    }
+                });
+    }
+
+    /**
+     * Alters a model in a given fully qualified path.
+     *
+     * @param modelChange The model containing only changes
+     * @param objectIdentifier The fully qualified path where to alter the 
model.
+     * @param ignoreIfNotExists If false exception will be thrown if the model 
to be altered does
+     *     not exist.
+     */
+    public void alterModel(

Review Comment:
   And as far as I can see altering temporary objects is not supported.



##########
flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/CatalogManagerTest.java:
##########
@@ -23,16 +23,21 @@
 import org.apache.flink.table.api.ValidationException;
 import org.apache.flink.table.catalog.exceptions.CatalogException;
 import org.apache.flink.table.catalog.listener.AlterDatabaseEvent;
+import org.apache.flink.table.catalog.listener.AlterModelEvent;
 import org.apache.flink.table.catalog.listener.AlterTableEvent;
 import org.apache.flink.table.catalog.listener.CatalogModificationEvent;
 import org.apache.flink.table.catalog.listener.CatalogModificationListener;
 import org.apache.flink.table.catalog.listener.CreateDatabaseEvent;
+import org.apache.flink.table.catalog.listener.CreateModelEvent;
 import org.apache.flink.table.catalog.listener.CreateTableEvent;
 import org.apache.flink.table.catalog.listener.DropDatabaseEvent;
+import org.apache.flink.table.catalog.listener.DropModelEvent;
 import org.apache.flink.table.catalog.listener.DropTableEvent;
 import org.apache.flink.table.utils.CatalogManagerMocks;
 import org.apache.flink.table.utils.ExpressionResolverMocks;
 
+import org.apache.flink.shaded.guava32.com.google.common.collect.ImmutableMap;

Review Comment:
   The use of Guava should be avoided; use a regular HashMap instead.



##########
flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/CatalogManagerTest.java:
##########
@@ -460,10 +617,16 @@ void testCatalogStore() throws Exception {
                         Collections.emptyMap()),
                 ObjectIdentifier.of("exist_cat", "cat_db", "test_table"),
                 false);
+        catalogManager.createModel(
+                CatalogModel.of(Schema.derived(), Schema.derived(), 
Collections.emptyMap(), null),

Review Comment:
   I'm fine with using Schema.derived here, but then you should not perform null 
checks in CatalogManager, because Schema.derived is not null.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to