twalthr commented on code in PR #25211: URL: https://github.com/apache/flink/pull/25211#discussion_r1799549652
########## flink-python/pyflink/table/catalog.py: ########## @@ -1028,6 +1122,69 @@ def get_function_language(self): return self._j_catalog_function.getFunctionLanguage() +class CatalogModel(object): + """ + Interface for a model in a catalog. + """ + + def __init__(self, j_catalog_model): + self._j_catalog_model = j_catalog_model + + @staticmethod + def create_model( + input_schema: Schema, + output_schema: Schema, + properties: Dict[str, str] = {}, Review Comment: `options` ########## flink-python/pyflink/table/tests/test_catalog.py: ########## @@ -103,6 +116,22 @@ def get_streaming_table_properties(): def create_partition_keys(): return ["second", "third"] + @staticmethod + def create_model(): + return CatalogModel.create_model( + CatalogTestBase.create_model_schema(), + CatalogTestBase.create_model_schema(), + properties={}, + comment="some comment") + + @staticmethod + def create_another_model(): + return CatalogModel.create_model( + CatalogTestBase.create_model_schema(), + CatalogTestBase.create_model_schema(), + properties={"key": "value"}, Review Comment: ```suggestion options={"key": "value"}, ``` ########## flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/GenericInMemoryCatalogTest.java: ########## @@ -52,6 +60,197 @@ static void init() { catalog.open(); } + // TODO (FLINK-35020) : remove after implementing dropModel in catalog + @AfterEach + void cleanup() throws Exception { + if (catalog.modelExists(modelPath1)) { + ((GenericInMemoryCatalog) catalog).dropModel(modelPath1, true); + } + if (catalog.modelExists(modelPath2)) { + ((GenericInMemoryCatalog) catalog).dropModel(modelPath2, true); + } + super.cleanup(); + } + + // These are put here instead of CatalogTest class since model operations are not implemented Review Comment: rather introduce a `supportsModels(): boolean` method in CatalogTest and move those tests to the testbase. Set this flag to false and ignore them for Hive catalog. 
########## flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/ContextResolvedModel.java: ########## @@ -0,0 +1,126 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.catalog; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.util.Preconditions; + +import javax.annotation.Nullable; + +import java.util.Objects; +import java.util.Optional; + +/** + * This class contains information about a model and its relationship with a {@link Catalog}, if + * any. + * + * <p>There can be 2 kinds of {@link ContextResolvedModel}: + * + * <ul> + * <li>A permanent model: a model which is stored in a {@link Catalog} and has an associated + * unique {@link ObjectIdentifier}. + * <li>A temporary model: a model which is stored in the {@link CatalogManager}, has an associated + * unique {@link ObjectIdentifier} and is flagged as temporary. 
+ * </ul> + * + * <p>The different handling of temporary and permanent model is {@link Catalog} and {@link + * CatalogManager} instance specific, hence for these two kind of models, an instance of this object + * represents the relationship between the specific {@link ResolvedCatalogModel} instance and the + * specific {@link Catalog}/{@link CatalogManager} instances. For example, the same {@link + * ResolvedCatalogModel} can be temporary for one catalog, but permanent for another one. + */ +@Internal +public final class ContextResolvedModel { + + private final ObjectIdentifier objectIdentifier; + private final @Nullable Catalog catalog; + private final ResolvedCatalogModel resolvedModel; + + public static ContextResolvedModel permanent( + ObjectIdentifier identifier, Catalog catalog, ResolvedCatalogModel resolvedModel) { + return new ContextResolvedModel( + identifier, Preconditions.checkNotNull(catalog), resolvedModel); + } + + public static ContextResolvedModel temporary( + ObjectIdentifier identifier, ResolvedCatalogModel resolvedModel) { + return new ContextResolvedModel(identifier, null, resolvedModel); + } + + private ContextResolvedModel( + ObjectIdentifier objectIdentifier, + @Nullable Catalog catalog, + ResolvedCatalogModel resolvedModel) { + this.objectIdentifier = Preconditions.checkNotNull(objectIdentifier); + this.catalog = catalog; + this.resolvedModel = Preconditions.checkNotNull(resolvedModel); + } + + /** @return true if the table is temporary. An anonymous table is always temporary. */ Review Comment: ```suggestion /** @return true if the model is temporary. */ ``` ########## flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogManager.java: ########## @@ -1341,6 +1352,293 @@ private void dropTableInternal( } } + /** + * Retrieves a fully qualified model. If the path is not yet fully qualified use {@link + * #qualifyIdentifier(UnresolvedIdentifier)} first. 
+ * + * @param objectIdentifier full path of the model to retrieve + * @return model that the path points to. + */ + public Optional<ContextResolvedModel> getModel(ObjectIdentifier objectIdentifier) { + CatalogModel temporaryModel = temporaryModels.get(objectIdentifier); + if (temporaryModel != null) { + final ResolvedCatalogModel resolvedModel = resolveCatalogModel(temporaryModel); + return Optional.of(ContextResolvedModel.temporary(objectIdentifier, resolvedModel)); + } + Optional<Catalog> catalogOptional = getCatalog(objectIdentifier.getCatalogName()); + ObjectPath objectPath = objectIdentifier.toObjectPath(); + if (catalogOptional.isPresent()) { + Catalog currentCatalog = catalogOptional.get(); + try { + final CatalogModel model = currentCatalog.getModel(objectPath); + if (model != null) { + final ResolvedCatalogModel resolvedModel = resolveCatalogModel(model); + return Optional.of( + ContextResolvedModel.permanent( + objectIdentifier, currentCatalog, resolvedModel)); + } + } catch (ModelNotExistException e) { + // Ignore. + } catch (UnsupportedOperationException e) { + // Ignore for catalogs that don't support models. + } + } + return Optional.empty(); + } + + /** + * Like {@link #getModel(ObjectIdentifier)}, but throws an error when the model is not available + * in any of the catalogs. + */ + public ContextResolvedModel getModelOrError(ObjectIdentifier objectIdentifier) { + return getModel(objectIdentifier) + .orElseThrow( + () -> + new TableException( + String.format( + "Cannot find model '%s' in any of the catalogs %s.", + objectIdentifier, listCatalogs()))); + } + + /** + * Return whether the model with a fully qualified table path is temporary or not. + * + * @param objectIdentifier full path of the table + * @return the model is temporary or not. 
+ */ + public boolean isTemporaryModel(ObjectIdentifier objectIdentifier) { + return temporaryModels.containsKey(objectIdentifier); + } + + /** + * Returns an array of names of all models registered in the namespace of the current catalog + * and database. + * + * @return names of all registered models + */ + public Set<String> listModels() { + return listModels(getCurrentCatalog(), getCurrentDatabase()); + } + + /** + * Returns an array of names of all models registered in the namespace of the given catalog and + * database. + * + * @return names of all registered models + */ + public Set<String> listModels(String catalogName, String databaseName) { + Catalog catalog = getCatalogOrThrowException(catalogName); + if (catalog == null) { + throw new ValidationException(String.format("Catalog %s does not exist", catalogName)); + } + try { + return new HashSet<>(catalog.listModels(databaseName)); + } catch (DatabaseNotExistException e) { + throw new ValidationException( + String.format("Database %s does not exist", databaseName), e); + } + } + + /** + * Creates a model in a given fully qualified path. + * + * @param model The resolved model to put in the given path. + * @param objectIdentifier The fully qualified path where to put the model. + * @param ignoreIfExists If false exception will be thrown if a model exists in the given path. 
+ */ + public void createModel( + CatalogModel model, ObjectIdentifier objectIdentifier, boolean ignoreIfExists) { + execute( + (catalog, path) -> { + final ResolvedCatalogModel resolvedModel = resolveCatalogModel(model); + catalog.createModel(path, resolvedModel, ignoreIfExists); + catalogModificationListeners.forEach( + listener -> + listener.onEvent( + CreateModelEvent.createEvent( + CatalogContext.createContext( + objectIdentifier.getCatalogName(), + catalog), + objectIdentifier, + resolvedModel, + ignoreIfExists, + false))); + }, + objectIdentifier, + false, + "CreateModel"); + } + + /** + * Creates a temporary model in a given fully qualified path. + * + * @param model The resolved model to put in the given path. + * @param objectIdentifier The fully qualified path where to put the model. + * @param ignoreIfExists if false exception will be thrown if a model exists in the given path. + */ + public void createTemporaryModel( + CatalogModel model, ObjectIdentifier objectIdentifier, boolean ignoreIfExists) { + Optional<TemporaryOperationListener> listener = + getTemporaryOperationListener(objectIdentifier); + temporaryModels.compute( + objectIdentifier, + (k, v) -> { + if (v != null) { + if (!ignoreIfExists) { + throw new ValidationException( + String.format( + "Temporary model '%s' already exists", + objectIdentifier)); + } + return v; + } else { + ResolvedCatalogModel resolvedModel = resolveCatalogModel(model); + Catalog catalog = + getCatalog(objectIdentifier.getCatalogName()).orElse(null); + if (listener.isPresent()) { + return listener.get() + .onCreateTemporaryModel( + objectIdentifier.toObjectPath(), resolvedModel); + } + catalogModificationListeners.forEach( + l -> + l.onEvent( + CreateModelEvent.createEvent( + CatalogContext.createContext( + objectIdentifier.getCatalogName(), + catalog), + objectIdentifier, + resolvedModel, + ignoreIfExists, + true))); + return resolvedModel; + } + }); + } + + /** + * Alters a model in a given fully qualified path. 
+ * + * @param modelChange The model containing only changes + * @param objectIdentifier The fully qualified path where to alter the model. + * @param ignoreIfNotExists If false exception will be thrown if the model to be altered does + * not exist. + */ + public void alterModel( + CatalogModel modelChange, + ObjectIdentifier objectIdentifier, + boolean ignoreIfNotExists) { + execute( + (catalog, path) -> { + ResolvedCatalogModel resolvedModel = resolveCatalogModel(modelChange); + catalog.alterModel(path, resolvedModel, ignoreIfNotExists); + catalogModificationListeners.forEach( + listener -> + listener.onEvent( + AlterModelEvent.createEvent( + CatalogContext.createContext( + objectIdentifier.getCatalogName(), + catalog), + objectIdentifier, + resolvedModel, + ignoreIfNotExists))); + }, + objectIdentifier, + ignoreIfNotExists, + "AlterModel"); + } + + /** + * Drops a model in a given fully qualified path. + * + * @param objectIdentifier The fully qualified path of the model to drop. + * @param ignoreIfNotExists If false exception will be thrown if the model to drop does not + * exist. + */ + public void dropModel(ObjectIdentifier objectIdentifier, boolean ignoreIfNotExists) { + execute( + (catalog, path) -> { + Optional<ContextResolvedModel> resultOpt = getModel(objectIdentifier); + if (resultOpt.isPresent()) { + ResolvedCatalogModel resolvedModel = resultOpt.get().getResolvedModel(); + catalog.dropModel(path, ignoreIfNotExists); + catalogModificationListeners.forEach( + listener -> + listener.onEvent( + DropModelEvent.createEvent( + CatalogContext.createContext( + objectIdentifier.getCatalogName(), + catalog), + objectIdentifier, + resolvedModel, + ignoreIfNotExists, + false))); + } else if (!ignoreIfNotExists) { + throw new ModelNotExistException( + objectIdentifier.getCatalogName(), objectIdentifier.toObjectPath()); + } + }, + objectIdentifier, + ignoreIfNotExists, + "DropModel"); + } + + /** + * Drop a temporary model in a given fully qualified path. 
+ * + * @param objectIdentifier The fully qualified path of the model to drop. + * @param ignoreIfNotExists If false exception will be thrown if the model to be dropped does + * not exist. + */ + public void dropTemporaryModel(ObjectIdentifier objectIdentifier, boolean ignoreIfNotExists) { + CatalogModel model = temporaryModels.get(objectIdentifier); + if (model != null) { + getTemporaryOperationListener(objectIdentifier) + .ifPresent(l -> l.onDropTemporaryModel(objectIdentifier.toObjectPath())); + + Catalog catalog = getCatalog(objectIdentifier.getCatalogName()).orElse(null); + ResolvedCatalogModel resolvedModel = resolveCatalogModel(model); + temporaryModels.remove(objectIdentifier); + catalogModificationListeners.forEach( + listener -> + listener.onEvent( + DropModelEvent.createEvent( + CatalogContext.createContext( + objectIdentifier.getCatalogName(), catalog), + objectIdentifier, + resolvedModel, + ignoreIfNotExists, + true))); + } else if (!ignoreIfNotExists) { + throw new ValidationException( + String.format( + "Temporary model with identifier '%s' does not exist.", + objectIdentifier.asSummaryString())); + } + } + + public ResolvedCatalogModel resolveCatalogModel(CatalogModel model) { + Preconditions.checkNotNull(schemaResolver, "Schema resolver is not initialized."); + if (model instanceof ResolvedCatalogModel) { + return (ResolvedCatalogModel) model; + } + // If input schema / output schema does not exist, get the schema from select as statement. + final ResolvedSchema resolvedInputSchema; + if (model.getInputSchema() == null) { Review Comment: Could you provide more information about this case? It looks like this is at the wrong layer. For tables, a catalog object has always a correct output type. Take a look at CREATE TABLE AS which has similar behavior. 
########## flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogManager.java: ########## @@ -1341,6 +1352,293 @@ private void dropTableInternal( } } + /** + * Retrieves a fully qualified model. If the path is not yet fully qualified use {@link + * #qualifyIdentifier(UnresolvedIdentifier)} first. + * + * @param objectIdentifier full path of the model to retrieve + * @return model that the path points to. + */ + public Optional<ContextResolvedModel> getModel(ObjectIdentifier objectIdentifier) { + CatalogModel temporaryModel = temporaryModels.get(objectIdentifier); + if (temporaryModel != null) { + final ResolvedCatalogModel resolvedModel = resolveCatalogModel(temporaryModel); + return Optional.of(ContextResolvedModel.temporary(objectIdentifier, resolvedModel)); + } + Optional<Catalog> catalogOptional = getCatalog(objectIdentifier.getCatalogName()); + ObjectPath objectPath = objectIdentifier.toObjectPath(); + if (catalogOptional.isPresent()) { + Catalog currentCatalog = catalogOptional.get(); + try { + final CatalogModel model = currentCatalog.getModel(objectPath); + if (model != null) { + final ResolvedCatalogModel resolvedModel = resolveCatalogModel(model); + return Optional.of( + ContextResolvedModel.permanent( + objectIdentifier, currentCatalog, resolvedModel)); + } + } catch (ModelNotExistException e) { + // Ignore. + } catch (UnsupportedOperationException e) { + // Ignore for catalogs that don't support models. + } + } + return Optional.empty(); + } + + /** + * Like {@link #getModel(ObjectIdentifier)}, but throws an error when the model is not available + * in any of the catalogs. 
+ */ + public ContextResolvedModel getModelOrError(ObjectIdentifier objectIdentifier) { + return getModel(objectIdentifier) + .orElseThrow( + () -> + new TableException( + String.format( + "Cannot find model '%s' in any of the catalogs %s.", + objectIdentifier, listCatalogs()))); + } + + /** + * Return whether the model with a fully qualified table path is temporary or not. + * + * @param objectIdentifier full path of the table + * @return the model is temporary or not. + */ + public boolean isTemporaryModel(ObjectIdentifier objectIdentifier) { + return temporaryModels.containsKey(objectIdentifier); + } + + /** + * Returns an array of names of all models registered in the namespace of the current catalog + * and database. + * + * @return names of all registered models + */ + public Set<String> listModels() { + return listModels(getCurrentCatalog(), getCurrentDatabase()); + } + + /** + * Returns an array of names of all models registered in the namespace of the given catalog and + * database. + * + * @return names of all registered models + */ + public Set<String> listModels(String catalogName, String databaseName) { + Catalog catalog = getCatalogOrThrowException(catalogName); + if (catalog == null) { + throw new ValidationException(String.format("Catalog %s does not exist", catalogName)); + } + try { + return new HashSet<>(catalog.listModels(databaseName)); + } catch (DatabaseNotExistException e) { + throw new ValidationException( + String.format("Database %s does not exist", databaseName), e); + } + } + + /** + * Creates a model in a given fully qualified path. + * + * @param model The resolved model to put in the given path. + * @param objectIdentifier The fully qualified path where to put the model. + * @param ignoreIfExists If false exception will be thrown if a model exists in the given path. 
+ */ + public void createModel( + CatalogModel model, ObjectIdentifier objectIdentifier, boolean ignoreIfExists) { + execute( + (catalog, path) -> { + final ResolvedCatalogModel resolvedModel = resolveCatalogModel(model); + catalog.createModel(path, resolvedModel, ignoreIfExists); + catalogModificationListeners.forEach( + listener -> + listener.onEvent( + CreateModelEvent.createEvent( + CatalogContext.createContext( + objectIdentifier.getCatalogName(), + catalog), + objectIdentifier, + resolvedModel, + ignoreIfExists, + false))); + }, + objectIdentifier, + false, + "CreateModel"); + } + + /** + * Creates a temporary model in a given fully qualified path. + * + * @param model The resolved model to put in the given path. + * @param objectIdentifier The fully qualified path where to put the model. + * @param ignoreIfExists if false exception will be thrown if a model exists in the given path. + */ + public void createTemporaryModel( + CatalogModel model, ObjectIdentifier objectIdentifier, boolean ignoreIfExists) { + Optional<TemporaryOperationListener> listener = + getTemporaryOperationListener(objectIdentifier); + temporaryModels.compute( + objectIdentifier, + (k, v) -> { + if (v != null) { + if (!ignoreIfExists) { + throw new ValidationException( + String.format( + "Temporary model '%s' already exists", + objectIdentifier)); + } + return v; + } else { + ResolvedCatalogModel resolvedModel = resolveCatalogModel(model); + Catalog catalog = + getCatalog(objectIdentifier.getCatalogName()).orElse(null); + if (listener.isPresent()) { + return listener.get() + .onCreateTemporaryModel( + objectIdentifier.toObjectPath(), resolvedModel); + } + catalogModificationListeners.forEach( + l -> + l.onEvent( + CreateModelEvent.createEvent( + CatalogContext.createContext( + objectIdentifier.getCatalogName(), + catalog), + objectIdentifier, + resolvedModel, + ignoreIfExists, + true))); + return resolvedModel; + } + }); + } + + /** + * Alters a model in a given fully qualified path. 
+ * + * @param modelChange The model containing only changes + * @param objectIdentifier The fully qualified path where to alter the model. + * @param ignoreIfNotExists If false exception will be thrown if the model to be altered does + * not exist. + */ + public void alterModel( Review Comment: We should simply copy what tables do. ########## flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/DefaultCatalogModel.java: ########## @@ -0,0 +1,113 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.catalog; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.table.api.Schema; + +import javax.annotation.Nullable; + +import java.util.Map; +import java.util.Objects; + +/** A catalog model implementation. 
*/ +@Internal +public class DefaultCatalogModel implements CatalogModel { + private final Schema inputSchema; + private final Schema outputSchema; + private final Map<String, String> modelOptions; + private final @Nullable String comment; + + protected DefaultCatalogModel( + Schema inputSchema, + Schema outputSchema, + Map<String, String> modelOptions, + @Nullable String comment) { + this.inputSchema = inputSchema; Review Comment: add null checks for the args to avoid nulls travelling through the stack ########## flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/CatalogManagerTest.java: ########## @@ -256,6 +262,114 @@ public void testDropCurrentDatabase() throws Exception { .hasMessage("Cannot drop a database which is currently in use."); } + @Test + public void testModelModificationListener() throws Exception { + CompletableFuture<CreateModelEvent> createFuture = new CompletableFuture<>(); + CompletableFuture<CreateModelEvent> createTemporaryFuture = new CompletableFuture<>(); + CompletableFuture<AlterModelEvent> alterFuture = new CompletableFuture<>(); + CompletableFuture<DropModelEvent> dropFuture = new CompletableFuture<>(); + CompletableFuture<DropModelEvent> dropTemporaryFuture = new CompletableFuture<>(); + CatalogManager catalogManager = + CatalogManager.newBuilder() Review Comment: use `org.apache.flink.table.utils.CatalogManagerMocks#preparedCatalogManager` ########## flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/CatalogModel.java: ########## @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.catalog; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.table.api.Schema; + +import javax.annotation.Nullable; + +import java.util.Map; + +/** Interface for a model in a catalog. */ +@PublicEvolving +public interface CatalogModel { + /** Returns a map of string-based model options. */ + Map<String, String> getOptions(); + + /** + * Get the unresolved input schema of the model. + * + * @return unresolved input schema of the model. + */ + Schema getInputSchema(); Review Comment: So can this be null or not? ########## flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogManager.java: ########## @@ -1341,6 +1352,293 @@ private void dropTableInternal( } } + /** + * Retrieves a fully qualified model. If the path is not yet fully qualified use {@link + * #qualifyIdentifier(UnresolvedIdentifier)} first. + * + * @param objectIdentifier full path of the model to retrieve + * @return model that the path points to. 
+ */ + public Optional<ContextResolvedModel> getModel(ObjectIdentifier objectIdentifier) { + CatalogModel temporaryModel = temporaryModels.get(objectIdentifier); + if (temporaryModel != null) { + final ResolvedCatalogModel resolvedModel = resolveCatalogModel(temporaryModel); + return Optional.of(ContextResolvedModel.temporary(objectIdentifier, resolvedModel)); + } + Optional<Catalog> catalogOptional = getCatalog(objectIdentifier.getCatalogName()); + ObjectPath objectPath = objectIdentifier.toObjectPath(); + if (catalogOptional.isPresent()) { + Catalog currentCatalog = catalogOptional.get(); + try { + final CatalogModel model = currentCatalog.getModel(objectPath); + if (model != null) { + final ResolvedCatalogModel resolvedModel = resolveCatalogModel(model); + return Optional.of( + ContextResolvedModel.permanent( + objectIdentifier, currentCatalog, resolvedModel)); + } + } catch (ModelNotExistException e) { + // Ignore. + } catch (UnsupportedOperationException e) { + // Ignore for catalogs that don't support models. + } + } + return Optional.empty(); + } + + /** + * Like {@link #getModel(ObjectIdentifier)}, but throws an error when the model is not available + * in any of the catalogs. + */ + public ContextResolvedModel getModelOrError(ObjectIdentifier objectIdentifier) { + return getModel(objectIdentifier) + .orElseThrow( + () -> + new TableException( + String.format( + "Cannot find model '%s' in any of the catalogs %s.", + objectIdentifier, listCatalogs()))); + } + + /** + * Return whether the model with a fully qualified table path is temporary or not. + * + * @param objectIdentifier full path of the table + * @return the model is temporary or not. + */ + public boolean isTemporaryModel(ObjectIdentifier objectIdentifier) { + return temporaryModels.containsKey(objectIdentifier); + } + + /** + * Returns an array of names of all models registered in the namespace of the current catalog + * and database. 
+ * + * @return names of all registered models + */ + public Set<String> listModels() { + return listModels(getCurrentCatalog(), getCurrentDatabase()); + } + + /** + * Returns an array of names of all models registered in the namespace of the given catalog and + * database. + * + * @return names of all registered models + */ + public Set<String> listModels(String catalogName, String databaseName) { + Catalog catalog = getCatalogOrThrowException(catalogName); + if (catalog == null) { + throw new ValidationException(String.format("Catalog %s does not exist", catalogName)); + } + try { + return new HashSet<>(catalog.listModels(databaseName)); + } catch (DatabaseNotExistException e) { + throw new ValidationException( + String.format("Database %s does not exist", databaseName), e); + } + } + + /** + * Creates a model in a given fully qualified path. + * + * @param model The resolved model to put in the given path. + * @param objectIdentifier The fully qualified path where to put the model. + * @param ignoreIfExists If false exception will be thrown if a model exists in the given path. + */ + public void createModel( + CatalogModel model, ObjectIdentifier objectIdentifier, boolean ignoreIfExists) { + execute( + (catalog, path) -> { + final ResolvedCatalogModel resolvedModel = resolveCatalogModel(model); + catalog.createModel(path, resolvedModel, ignoreIfExists); + catalogModificationListeners.forEach( + listener -> + listener.onEvent( + CreateModelEvent.createEvent( + CatalogContext.createContext( + objectIdentifier.getCatalogName(), + catalog), + objectIdentifier, + resolvedModel, + ignoreIfExists, + false))); + }, + objectIdentifier, + false, + "CreateModel"); + } + + /** + * Creates a temporary model in a given fully qualified path. + * + * @param model The resolved model to put in the given path. + * @param objectIdentifier The fully qualified path where to put the model. + * @param ignoreIfExists if false exception will be thrown if a model exists in the given path. 
+ */ + public void createTemporaryModel( + CatalogModel model, ObjectIdentifier objectIdentifier, boolean ignoreIfExists) { + Optional<TemporaryOperationListener> listener = + getTemporaryOperationListener(objectIdentifier); + temporaryModels.compute( + objectIdentifier, + (k, v) -> { + if (v != null) { + if (!ignoreIfExists) { + throw new ValidationException( + String.format( + "Temporary model '%s' already exists", + objectIdentifier)); + } + return v; + } else { + ResolvedCatalogModel resolvedModel = resolveCatalogModel(model); + Catalog catalog = + getCatalog(objectIdentifier.getCatalogName()).orElse(null); + if (listener.isPresent()) { + return listener.get() + .onCreateTemporaryModel( + objectIdentifier.toObjectPath(), resolvedModel); + } + catalogModificationListeners.forEach( + l -> + l.onEvent( + CreateModelEvent.createEvent( + CatalogContext.createContext( + objectIdentifier.getCatalogName(), + catalog), + objectIdentifier, + resolvedModel, + ignoreIfExists, + true))); + return resolvedModel; + } + }); + } + + /** + * Alters a model in a given fully qualified path. + * + * @param modelChange The model containing only changes + * @param objectIdentifier The fully qualified path where to alter the model. + * @param ignoreIfNotExists If false exception will be thrown if the model to be altered does + * not exist. + */ + public void alterModel( Review Comment: And as far as I can see altering temporary objects is not supported. 
########## flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/CatalogManagerTest.java: ########## @@ -23,16 +23,21 @@ import org.apache.flink.table.api.ValidationException; import org.apache.flink.table.catalog.exceptions.CatalogException; import org.apache.flink.table.catalog.listener.AlterDatabaseEvent; +import org.apache.flink.table.catalog.listener.AlterModelEvent; import org.apache.flink.table.catalog.listener.AlterTableEvent; import org.apache.flink.table.catalog.listener.CatalogModificationEvent; import org.apache.flink.table.catalog.listener.CatalogModificationListener; import org.apache.flink.table.catalog.listener.CreateDatabaseEvent; +import org.apache.flink.table.catalog.listener.CreateModelEvent; import org.apache.flink.table.catalog.listener.CreateTableEvent; import org.apache.flink.table.catalog.listener.DropDatabaseEvent; +import org.apache.flink.table.catalog.listener.DropModelEvent; import org.apache.flink.table.catalog.listener.DropTableEvent; import org.apache.flink.table.utils.CatalogManagerMocks; import org.apache.flink.table.utils.ExpressionResolverMocks; +import org.apache.flink.shaded.guava32.com.google.common.collect.ImmutableMap; Review Comment: The use of Guava should be avoided; use a regular HashMap instead. ########## flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/CatalogManagerTest.java: ########## @@ -460,10 +617,16 @@ void testCatalogStore() throws Exception { Collections.emptyMap()), ObjectIdentifier.of("exist_cat", "cat_db", "test_table"), false); + catalogManager.createModel( + CatalogModel.of(Schema.derived(), Schema.derived(), Collections.emptyMap(), null), Review Comment: I'm fine with using Schema.derived here, but then you should not perform null checks in CatalogManager, because Schema.derived is not null. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org