lihaosky commented on code in PR #25211: URL: https://github.com/apache/flink/pull/25211#discussion_r1800214336
########## flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogManager.java: ########## @@ -1341,6 +1352,293 @@ private void dropTableInternal( } } + /** + * Retrieves a fully qualified model. If the path is not yet fully qualified use {@link + * #qualifyIdentifier(UnresolvedIdentifier)} first. + * + * @param objectIdentifier full path of the model to retrieve + * @return model that the path points to. + */ + public Optional<ContextResolvedModel> getModel(ObjectIdentifier objectIdentifier) { + CatalogModel temporaryModel = temporaryModels.get(objectIdentifier); + if (temporaryModel != null) { + final ResolvedCatalogModel resolvedModel = resolveCatalogModel(temporaryModel); + return Optional.of(ContextResolvedModel.temporary(objectIdentifier, resolvedModel)); + } + Optional<Catalog> catalogOptional = getCatalog(objectIdentifier.getCatalogName()); + ObjectPath objectPath = objectIdentifier.toObjectPath(); + if (catalogOptional.isPresent()) { + Catalog currentCatalog = catalogOptional.get(); + try { + final CatalogModel model = currentCatalog.getModel(objectPath); + if (model != null) { + final ResolvedCatalogModel resolvedModel = resolveCatalogModel(model); + return Optional.of( + ContextResolvedModel.permanent( + objectIdentifier, currentCatalog, resolvedModel)); + } + } catch (ModelNotExistException e) { + // Ignore. + } catch (UnsupportedOperationException e) { + // Ignore for catalogs that don't support models. Review Comment: Based on @twalthr's previous comment, we return an empty Optional here instead of throwing an unsupported-operation exception. ########## flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogManager.java: ########## @@ -1341,6 +1352,293 @@ private void dropTableInternal( } } + /** + * Retrieves a fully qualified model. If the path is not yet fully qualified use {@link + * #qualifyIdentifier(UnresolvedIdentifier)} first. 
+ * + * @param objectIdentifier full path of the model to retrieve + * @return model that the path points to. + */ + public Optional<ContextResolvedModel> getModel(ObjectIdentifier objectIdentifier) { + CatalogModel temporaryModel = temporaryModels.get(objectIdentifier); + if (temporaryModel != null) { + final ResolvedCatalogModel resolvedModel = resolveCatalogModel(temporaryModel); + return Optional.of(ContextResolvedModel.temporary(objectIdentifier, resolvedModel)); + } + Optional<Catalog> catalogOptional = getCatalog(objectIdentifier.getCatalogName()); + ObjectPath objectPath = objectIdentifier.toObjectPath(); + if (catalogOptional.isPresent()) { + Catalog currentCatalog = catalogOptional.get(); + try { + final CatalogModel model = currentCatalog.getModel(objectPath); + if (model != null) { + final ResolvedCatalogModel resolvedModel = resolveCatalogModel(model); + return Optional.of( + ContextResolvedModel.permanent( + objectIdentifier, currentCatalog, resolvedModel)); + } + } catch (ModelNotExistException e) { + // Ignore. + } catch (UnsupportedOperationException e) { + // Ignore for catalogs that don't support models. + } + } + return Optional.empty(); + } + + /** + * Like {@link #getModel(ObjectIdentifier)}, but throws an error when the model is not available + * in any of the catalogs. + */ + public ContextResolvedModel getModelOrError(ObjectIdentifier objectIdentifier) { + return getModel(objectIdentifier) + .orElseThrow( + () -> + new TableException( + String.format( + "Cannot find model '%s' in any of the catalogs %s.", + objectIdentifier, listCatalogs()))); + } + + /** + * Return whether the model with a fully qualified table path is temporary or not. + * + * @param objectIdentifier full path of the table + * @return the model is temporary or not. 
+ */ + public boolean isTemporaryModel(ObjectIdentifier objectIdentifier) { + return temporaryModels.containsKey(objectIdentifier); + } + + /** + * Returns an array of names of all models registered in the namespace of the current catalog + * and database. + * + * @return names of all registered models + */ + public Set<String> listModels() { + return listModels(getCurrentCatalog(), getCurrentDatabase()); + } + + /** + * Returns an array of names of all models registered in the namespace of the given catalog and + * database. + * + * @return names of all registered models + */ + public Set<String> listModels(String catalogName, String databaseName) { + Catalog catalog = getCatalogOrThrowException(catalogName); + if (catalog == null) { + throw new ValidationException(String.format("Catalog %s does not exist", catalogName)); + } + try { + return new HashSet<>(catalog.listModels(databaseName)); + } catch (DatabaseNotExistException e) { + throw new ValidationException( + String.format("Database %s does not exist", databaseName), e); + } + } + + /** + * Creates a model in a given fully qualified path. + * + * @param model The resolved model to put in the given path. + * @param objectIdentifier The fully qualified path where to put the model. + * @param ignoreIfExists If false exception will be thrown if a model exists in the given path. 
+ */ + public void createModel( + CatalogModel model, ObjectIdentifier objectIdentifier, boolean ignoreIfExists) { + execute( + (catalog, path) -> { + final ResolvedCatalogModel resolvedModel = resolveCatalogModel(model); + catalog.createModel(path, resolvedModel, ignoreIfExists); + catalogModificationListeners.forEach( + listener -> + listener.onEvent( + CreateModelEvent.createEvent( + CatalogContext.createContext( + objectIdentifier.getCatalogName(), + catalog), + objectIdentifier, + resolvedModel, + ignoreIfExists, + false))); + }, + objectIdentifier, + false, + "CreateModel"); + } + + /** + * Creates a temporary model in a given fully qualified path. + * + * @param model The resolved model to put in the given path. + * @param objectIdentifier The fully qualified path where to put the model. + * @param ignoreIfExists if false exception will be thrown if a model exists in the given path. + */ + public void createTemporaryModel( + CatalogModel model, ObjectIdentifier objectIdentifier, boolean ignoreIfExists) { + Optional<TemporaryOperationListener> listener = + getTemporaryOperationListener(objectIdentifier); + temporaryModels.compute( + objectIdentifier, + (k, v) -> { + if (v != null) { + if (!ignoreIfExists) { + throw new ValidationException( + String.format( + "Temporary model '%s' already exists", + objectIdentifier)); + } + return v; + } else { + ResolvedCatalogModel resolvedModel = resolveCatalogModel(model); + Catalog catalog = + getCatalog(objectIdentifier.getCatalogName()).orElse(null); + if (listener.isPresent()) { + return listener.get() + .onCreateTemporaryModel( + objectIdentifier.toObjectPath(), resolvedModel); + } + catalogModificationListeners.forEach( + l -> + l.onEvent( + CreateModelEvent.createEvent( + CatalogContext.createContext( + objectIdentifier.getCatalogName(), + catalog), + objectIdentifier, + resolvedModel, + ignoreIfExists, + true))); + return resolvedModel; + } + }); + } + + /** + * Alters a model in a given fully qualified path. 
+ * + * @param modelChange The model containing only changes + * @param objectIdentifier The fully qualified path where to alter the model. + * @param ignoreIfNotExists If false exception will be thrown if the model to be altered does + * not exist. + */ + public void alterModel( + CatalogModel modelChange, + ObjectIdentifier objectIdentifier, + boolean ignoreIfNotExists) { + execute( + (catalog, path) -> { + ResolvedCatalogModel resolvedModel = resolveCatalogModel(modelChange); + catalog.alterModel(path, resolvedModel, ignoreIfNotExists); + catalogModificationListeners.forEach( + listener -> + listener.onEvent( + AlterModelEvent.createEvent( + CatalogContext.createContext( + objectIdentifier.getCatalogName(), + catalog), + objectIdentifier, + resolvedModel, + ignoreIfNotExists))); + }, + objectIdentifier, + ignoreIfNotExists, + "AlterModel"); + } + + /** + * Drops a model in a given fully qualified path. + * + * @param objectIdentifier The fully qualified path of the model to drop. + * @param ignoreIfNotExists If false exception will be thrown if the model to drop does not + * exist. + */ + public void dropModel(ObjectIdentifier objectIdentifier, boolean ignoreIfNotExists) { + execute( + (catalog, path) -> { + Optional<ContextResolvedModel> resultOpt = getModel(objectIdentifier); + if (resultOpt.isPresent()) { + ResolvedCatalogModel resolvedModel = resultOpt.get().getResolvedModel(); + catalog.dropModel(path, ignoreIfNotExists); + catalogModificationListeners.forEach( + listener -> + listener.onEvent( + DropModelEvent.createEvent( + CatalogContext.createContext( + objectIdentifier.getCatalogName(), + catalog), + objectIdentifier, + resolvedModel, + ignoreIfNotExists, + false))); + } else if (!ignoreIfNotExists) { + throw new ModelNotExistException( + objectIdentifier.getCatalogName(), objectIdentifier.toObjectPath()); + } + }, + objectIdentifier, + ignoreIfNotExists, + "DropModel"); + } + + /** + * Drop a temporary model in a given fully qualified path. 
+ * + * @param objectIdentifier The fully qualified path of the model to drop. + * @param ignoreIfNotExists If false exception will be thrown if the model to be dropped does + * not exist. + */ + public void dropTemporaryModel(ObjectIdentifier objectIdentifier, boolean ignoreIfNotExists) { + CatalogModel model = temporaryModels.get(objectIdentifier); + if (model != null) { + getTemporaryOperationListener(objectIdentifier) + .ifPresent(l -> l.onDropTemporaryModel(objectIdentifier.toObjectPath())); + + Catalog catalog = getCatalog(objectIdentifier.getCatalogName()).orElse(null); + ResolvedCatalogModel resolvedModel = resolveCatalogModel(model); + temporaryModels.remove(objectIdentifier); + catalogModificationListeners.forEach( + listener -> + listener.onEvent( + DropModelEvent.createEvent( + CatalogContext.createContext( + objectIdentifier.getCatalogName(), catalog), + objectIdentifier, + resolvedModel, + ignoreIfNotExists, + true))); + } else if (!ignoreIfNotExists) { + throw new ValidationException( + String.format( + "Temporary model with identifier '%s' does not exist.", + objectIdentifier.asSummaryString())); + } + } + + public ResolvedCatalogModel resolveCatalogModel(CatalogModel model) { + Preconditions.checkNotNull(schemaResolver, "Schema resolver is not initialized."); + if (model instanceof ResolvedCatalogModel) { + return (ResolvedCatalogModel) model; + } + // If input schema / output schema does not exist, get the schema from select as statement. + final ResolvedSchema resolvedInputSchema; + if (model.getInputSchema() == null) { Review Comment: Yes. It should be initialized as empty list from parser. So will not be null. 
Will remove this ########## flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/CatalogManagerTest.java: ########## @@ -460,10 +617,16 @@ void testCatalogStore() throws Exception { Collections.emptyMap()), ObjectIdentifier.of("exist_cat", "cat_db", "test_table"), false); + catalogManager.createModel( + CatalogModel.of(Schema.derived(), Schema.derived(), Collections.emptyMap(), null), Review Comment: Removed null check ########## flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogManager.java: ########## @@ -1341,6 +1352,293 @@ private void dropTableInternal( } } + /** + * Retrieves a fully qualified model. If the path is not yet fully qualified use {@link + * #qualifyIdentifier(UnresolvedIdentifier)} first. + * + * @param objectIdentifier full path of the model to retrieve + * @return model that the path points to. + */ + public Optional<ContextResolvedModel> getModel(ObjectIdentifier objectIdentifier) { + CatalogModel temporaryModel = temporaryModels.get(objectIdentifier); + if (temporaryModel != null) { + final ResolvedCatalogModel resolvedModel = resolveCatalogModel(temporaryModel); + return Optional.of(ContextResolvedModel.temporary(objectIdentifier, resolvedModel)); + } + Optional<Catalog> catalogOptional = getCatalog(objectIdentifier.getCatalogName()); + ObjectPath objectPath = objectIdentifier.toObjectPath(); + if (catalogOptional.isPresent()) { + Catalog currentCatalog = catalogOptional.get(); + try { + final CatalogModel model = currentCatalog.getModel(objectPath); + if (model != null) { + final ResolvedCatalogModel resolvedModel = resolveCatalogModel(model); + return Optional.of( + ContextResolvedModel.permanent( + objectIdentifier, currentCatalog, resolvedModel)); + } + } catch (ModelNotExistException e) { + // Ignore. + } catch (UnsupportedOperationException e) { + // Ignore for catalogs that don't support models. 
+ } + } + return Optional.empty(); + } + + /** + * Like {@link #getModel(ObjectIdentifier)}, but throws an error when the model is not available + * in any of the catalogs. + */ + public ContextResolvedModel getModelOrError(ObjectIdentifier objectIdentifier) { + return getModel(objectIdentifier) + .orElseThrow( + () -> + new TableException( + String.format( + "Cannot find model '%s' in any of the catalogs %s.", + objectIdentifier, listCatalogs()))); + } + + /** + * Return whether the model with a fully qualified table path is temporary or not. + * + * @param objectIdentifier full path of the table + * @return the model is temporary or not. + */ + public boolean isTemporaryModel(ObjectIdentifier objectIdentifier) { + return temporaryModels.containsKey(objectIdentifier); + } + + /** + * Returns an array of names of all models registered in the namespace of the current catalog + * and database. + * + * @return names of all registered models + */ + public Set<String> listModels() { + return listModels(getCurrentCatalog(), getCurrentDatabase()); + } + + /** + * Returns an array of names of all models registered in the namespace of the given catalog and + * database. + * + * @return names of all registered models + */ + public Set<String> listModels(String catalogName, String databaseName) { + Catalog catalog = getCatalogOrThrowException(catalogName); + if (catalog == null) { + throw new ValidationException(String.format("Catalog %s does not exist", catalogName)); + } + try { + return new HashSet<>(catalog.listModels(databaseName)); + } catch (DatabaseNotExistException e) { + throw new ValidationException( + String.format("Database %s does not exist", databaseName), e); + } + } + + /** + * Creates a model in a given fully qualified path. + * + * @param model The resolved model to put in the given path. + * @param objectIdentifier The fully qualified path where to put the model. 
+ * @param ignoreIfExists If false exception will be thrown if a model exists in the given path. + */ + public void createModel( + CatalogModel model, ObjectIdentifier objectIdentifier, boolean ignoreIfExists) { + execute( + (catalog, path) -> { + final ResolvedCatalogModel resolvedModel = resolveCatalogModel(model); + catalog.createModel(path, resolvedModel, ignoreIfExists); + catalogModificationListeners.forEach( + listener -> + listener.onEvent( + CreateModelEvent.createEvent( + CatalogContext.createContext( + objectIdentifier.getCatalogName(), + catalog), + objectIdentifier, + resolvedModel, + ignoreIfExists, + false))); + }, + objectIdentifier, + false, + "CreateModel"); + } + + /** + * Creates a temporary model in a given fully qualified path. + * + * @param model The resolved model to put in the given path. + * @param objectIdentifier The fully qualified path where to put the model. + * @param ignoreIfExists if false exception will be thrown if a model exists in the given path. + */ + public void createTemporaryModel( + CatalogModel model, ObjectIdentifier objectIdentifier, boolean ignoreIfExists) { + Optional<TemporaryOperationListener> listener = + getTemporaryOperationListener(objectIdentifier); + temporaryModels.compute( + objectIdentifier, + (k, v) -> { + if (v != null) { + if (!ignoreIfExists) { + throw new ValidationException( + String.format( + "Temporary model '%s' already exists", + objectIdentifier)); + } + return v; Review Comment: I think checking just the name is fine; `createTemporaryTable` does the same thing. ########## flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogManager.java: ########## @@ -1341,6 +1352,293 @@ private void dropTableInternal( } } + /** + * Retrieves a fully qualified model. If the path is not yet fully qualified use {@link + * #qualifyIdentifier(UnresolvedIdentifier)} first. + * + * @param objectIdentifier full path of the model to retrieve + * @return model that the path points to. 
+ */ + public Optional<ContextResolvedModel> getModel(ObjectIdentifier objectIdentifier) { + CatalogModel temporaryModel = temporaryModels.get(objectIdentifier); + if (temporaryModel != null) { + final ResolvedCatalogModel resolvedModel = resolveCatalogModel(temporaryModel); + return Optional.of(ContextResolvedModel.temporary(objectIdentifier, resolvedModel)); + } + Optional<Catalog> catalogOptional = getCatalog(objectIdentifier.getCatalogName()); + ObjectPath objectPath = objectIdentifier.toObjectPath(); + if (catalogOptional.isPresent()) { + Catalog currentCatalog = catalogOptional.get(); + try { + final CatalogModel model = currentCatalog.getModel(objectPath); + if (model != null) { + final ResolvedCatalogModel resolvedModel = resolveCatalogModel(model); + return Optional.of( + ContextResolvedModel.permanent( + objectIdentifier, currentCatalog, resolvedModel)); + } + } catch (ModelNotExistException e) { + // Ignore. + } catch (UnsupportedOperationException e) { + // Ignore for catalogs that don't support models. + } + } + return Optional.empty(); + } + + /** + * Like {@link #getModel(ObjectIdentifier)}, but throws an error when the model is not available + * in any of the catalogs. + */ + public ContextResolvedModel getModelOrError(ObjectIdentifier objectIdentifier) { + return getModel(objectIdentifier) + .orElseThrow( + () -> + new TableException( + String.format( + "Cannot find model '%s' in any of the catalogs %s.", + objectIdentifier, listCatalogs()))); + } + + /** + * Return whether the model with a fully qualified table path is temporary or not. + * + * @param objectIdentifier full path of the table + * @return the model is temporary or not. + */ + public boolean isTemporaryModel(ObjectIdentifier objectIdentifier) { + return temporaryModels.containsKey(objectIdentifier); + } + + /** + * Returns an array of names of all models registered in the namespace of the current catalog + * and database. 
+ * + * @return names of all registered models + */ + public Set<String> listModels() { + return listModels(getCurrentCatalog(), getCurrentDatabase()); + } + + /** + * Returns an array of names of all models registered in the namespace of the given catalog and + * database. + * + * @return names of all registered models + */ + public Set<String> listModels(String catalogName, String databaseName) { + Catalog catalog = getCatalogOrThrowException(catalogName); + if (catalog == null) { + throw new ValidationException(String.format("Catalog %s does not exist", catalogName)); + } + try { + return new HashSet<>(catalog.listModels(databaseName)); + } catch (DatabaseNotExistException e) { + throw new ValidationException( + String.format("Database %s does not exist", databaseName), e); + } + } + + /** + * Creates a model in a given fully qualified path. + * + * @param model The resolved model to put in the given path. + * @param objectIdentifier The fully qualified path where to put the model. + * @param ignoreIfExists If false exception will be thrown if a model exists in the given path. + */ + public void createModel( + CatalogModel model, ObjectIdentifier objectIdentifier, boolean ignoreIfExists) { + execute( + (catalog, path) -> { + final ResolvedCatalogModel resolvedModel = resolveCatalogModel(model); + catalog.createModel(path, resolvedModel, ignoreIfExists); + catalogModificationListeners.forEach( + listener -> + listener.onEvent( + CreateModelEvent.createEvent( + CatalogContext.createContext( + objectIdentifier.getCatalogName(), + catalog), + objectIdentifier, + resolvedModel, + ignoreIfExists, + false))); + }, + objectIdentifier, + false, + "CreateModel"); + } + + /** + * Creates a temporary model in a given fully qualified path. + * + * @param model The resolved model to put in the given path. + * @param objectIdentifier The fully qualified path where to put the model. + * @param ignoreIfExists if false exception will be thrown if a model exists in the given path. 
+ */ + public void createTemporaryModel( + CatalogModel model, ObjectIdentifier objectIdentifier, boolean ignoreIfExists) { + Optional<TemporaryOperationListener> listener = + getTemporaryOperationListener(objectIdentifier); + temporaryModels.compute( + objectIdentifier, + (k, v) -> { + if (v != null) { + if (!ignoreIfExists) { + throw new ValidationException( + String.format( + "Temporary model '%s' already exists", + objectIdentifier)); + } + return v; + } else { + ResolvedCatalogModel resolvedModel = resolveCatalogModel(model); + Catalog catalog = + getCatalog(objectIdentifier.getCatalogName()).orElse(null); + if (listener.isPresent()) { + return listener.get() + .onCreateTemporaryModel( + objectIdentifier.toObjectPath(), resolvedModel); + } + catalogModificationListeners.forEach( + l -> + l.onEvent( + CreateModelEvent.createEvent( + CatalogContext.createContext( + objectIdentifier.getCatalogName(), + catalog), + objectIdentifier, + resolvedModel, + ignoreIfExists, + true))); + return resolvedModel; + } + }); + } + + /** + * Alters a model in a given fully qualified path. + * + * @param modelChange The model containing only changes + * @param objectIdentifier The fully qualified path where to alter the model. + * @param ignoreIfNotExists If false exception will be thrown if the model to be altered does + * not exist. + */ + public void alterModel( Review Comment: Yes. I don't see an alter operation for temporary tables either. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org