twalthr commented on a change in pull request #18349: URL: https://github.com/apache/flink/pull/18349#discussion_r786804756
########## File path: flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/ContextResolvedTable.java ########## @@ -0,0 +1,190 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.catalog; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.table.api.ValidationException; +import org.apache.flink.table.factories.FactoryUtil; +import org.apache.flink.util.Preconditions; + +import javax.annotation.Nullable; + +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.atomic.AtomicInteger; + +/** + * This class contains information about a table, its {@link ResolvedSchema}, its options and its + * relationship with a {@link Catalog}, if any. + * + * <p>There can be 3 kinds of {@link ContextResolvedTable}: + * + * <ul> + * <li>A permanent table: a table which is stored in a {@link Catalog} and has an associated + * unique {@link ObjectIdentifier}. + * <li>A temporary table: a table which is stored in the {@link CatalogManager}, has an associated + * unique {@link ObjectIdentifier} and is flagged as temporary. 
+ * <li>An anonymous/inline table: a table which is not stored in a catalog and doesn't have an + * associated unique {@link ObjectIdentifier}. + * </ul> + * + * <p>The different handling of temporary and permanent tables is {@link Catalog} and {@link + * CatalogManager} instance specific, hence for these two kind of tables, an instance of this object + * represents the relationship between the specific {@link ResolvedCatalogBaseTable} instance and + * the specific {@link Catalog}/{@link CatalogManager} instances. For example, the same {@link + * ResolvedCatalogBaseTable} can be temporary for one catalog, but permanent for another one. + */ +@Internal +public class ContextResolvedTable { + + private static final AtomicInteger uniqueId = new AtomicInteger(0); + + private final ObjectIdentifier objectIdentifier; + private final @Nullable Catalog catalog; + private final ResolvedCatalogBaseTable<?> resolvedTable; + private final boolean anonymous; + + public static ContextResolvedTable permanent( + ObjectIdentifier identifier, + Catalog catalog, + ResolvedCatalogBaseTable<?> resolvedTable) { + return new ContextResolvedTable( + identifier, Preconditions.checkNotNull(catalog), resolvedTable, false); + } + + public static ContextResolvedTable temporary( + ObjectIdentifier identifier, ResolvedCatalogBaseTable<?> resolvedTable) { + return new ContextResolvedTable(identifier, null, resolvedTable, false); + } + + public static ContextResolvedTable anonymous(ResolvedCatalogBaseTable<?> resolvedTable) { + return new ContextResolvedTable( + ObjectIdentifier.ofAnonymous( + generateAnonymousStringIdentifier(null, resolvedTable)), + null, + resolvedTable, + true); + } + + public static ContextResolvedTable anonymous( + String hint, ResolvedCatalogBaseTable<?> resolvedTable) { + return new ContextResolvedTable( + ObjectIdentifier.ofAnonymous( + generateAnonymousStringIdentifier(hint, resolvedTable)), + null, + resolvedTable, + true); + } + + private ContextResolvedTable( + 
ObjectIdentifier objectIdentifier, + @Nullable Catalog catalog, + ResolvedCatalogBaseTable<?> resolvedTable, + boolean anonymous) { + this.objectIdentifier = Preconditions.checkNotNull(objectIdentifier); + this.catalog = catalog; + this.resolvedTable = Preconditions.checkNotNull(resolvedTable); + this.anonymous = anonymous; + } + + public boolean isAnonymous() { + return this.anonymous; + } + + /** @return true if the table is temporary. An anonymous table is always temporary. */ + public boolean isTemporary() { + return catalog == null; + } + + public boolean isPermanent() { + return !isTemporary(); + } + + public ObjectIdentifier getIdentifier() { + return objectIdentifier; + } + + /** Returns empty if {@link #isPermanent()} is false. */ + public Optional<Catalog> getCatalog() { + return Optional.ofNullable(catalog); + } + + /** Returns a fully resolved catalog object. */ + @SuppressWarnings("unchecked") + public <T extends ResolvedCatalogBaseTable<?>> T getResolvedTable() { + return (T) resolvedTable; + } + + public ResolvedSchema getResolvedSchema() { + return resolvedTable.getResolvedSchema(); + } + + /** Returns the original metadata object returned by the catalog. */ + @SuppressWarnings("unchecked") + public <T extends CatalogBaseTable> T getTable() { + return (T) resolvedTable.getOrigin(); + } + + /** + * Copy the {@link ContextResolvedTable}, replacing the underlying {@link CatalogTable} options. 
+ */ + public ContextResolvedTable copy(Map<String, String> newOptions) { + if (resolvedTable.getTableKind() == CatalogBaseTable.TableKind.VIEW) { + throw new ValidationException( + String.format("The view '%s' cannot be enriched with new options.", this)); + } + return new ContextResolvedTable( + objectIdentifier, + catalog, + ((ResolvedCatalogTable) resolvedTable).copy(newOptions), + false); + } + + @Override + public String toString() { + return objectIdentifier.asSummaryString(); + } + + /** + * This method tries to return the connector name of the table, trying to provide a bit more + * helpful toString for anonymous tables. It's only to help users to debug, and its return value + * should not be relied on. + */ + private static String generateAnonymousStringIdentifier( + @Nullable String hint, ResolvedCatalogBaseTable<?> resolvedTable) { + // This has been added after a very intensive debugging session, so don't remove it unless + // you really know what you're doing. + // Planner can do some fancy optimizations' logic squashing two sources together in the same + // operator. Because this logic is string based, anonymous tables still need some kind of + // unique string based identifier that can be used later by the planner. + if (hint == null) { + try { + hint = resolvedTable.getOptions().get(FactoryUtil.CONNECTOR.key()); + } catch (Exception ignored) { + } + } + + int id = uniqueId.incrementAndGet(); + if (hint == null) { + return "anonymous$" + id; + } + + return "anonymous_" + hint + "$" + id; Review comment: let's put `*...*` around it to indicate an anonymous object consistently. we do the same for functions and types. ########## File path: flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/ContextResolvedTable.java ########## @@ -0,0 +1,190 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.catalog; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.table.api.ValidationException; +import org.apache.flink.table.factories.FactoryUtil; +import org.apache.flink.util.Preconditions; + +import javax.annotation.Nullable; + +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.atomic.AtomicInteger; + +/** + * This class contains information about a table, its {@link ResolvedSchema}, its options and its + * relationship with a {@link Catalog}, if any. + * + * <p>There can be 3 kinds of {@link ContextResolvedTable}: + * + * <ul> + * <li>A permanent table: a table which is stored in a {@link Catalog} and has an associated + * unique {@link ObjectIdentifier}. + * <li>A temporary table: a table which is stored in the {@link CatalogManager}, has an associated + * unique {@link ObjectIdentifier} and is flagged as temporary. + * <li>An anonymous/inline table: a table which is not stored in a catalog and doesn't have an + * associated unique {@link ObjectIdentifier}. 
+ * </ul> + * + * <p>The different handling of temporary and permanent tables is {@link Catalog} and {@link + * CatalogManager} instance specific, hence for these two kind of tables, an instance of this object + * represents the relationship between the specific {@link ResolvedCatalogBaseTable} instance and + * the specific {@link Catalog}/{@link CatalogManager} instances. For example, the same {@link + * ResolvedCatalogBaseTable} can be temporary for one catalog, but permanent for another one. + */ +@Internal +public class ContextResolvedTable { + + private static final AtomicInteger uniqueId = new AtomicInteger(0); + + private final ObjectIdentifier objectIdentifier; + private final @Nullable Catalog catalog; + private final ResolvedCatalogBaseTable<?> resolvedTable; + private final boolean anonymous; + + public static ContextResolvedTable permanent( + ObjectIdentifier identifier, + Catalog catalog, + ResolvedCatalogBaseTable<?> resolvedTable) { + return new ContextResolvedTable( + identifier, Preconditions.checkNotNull(catalog), resolvedTable, false); + } + + public static ContextResolvedTable temporary( + ObjectIdentifier identifier, ResolvedCatalogBaseTable<?> resolvedTable) { + return new ContextResolvedTable(identifier, null, resolvedTable, false); + } + + public static ContextResolvedTable anonymous(ResolvedCatalogBaseTable<?> resolvedTable) { + return new ContextResolvedTable( + ObjectIdentifier.ofAnonymous( + generateAnonymousStringIdentifier(null, resolvedTable)), + null, + resolvedTable, + true); + } + + public static ContextResolvedTable anonymous( + String hint, ResolvedCatalogBaseTable<?> resolvedTable) { + return new ContextResolvedTable( + ObjectIdentifier.ofAnonymous( + generateAnonymousStringIdentifier(hint, resolvedTable)), + null, + resolvedTable, + true); + } + + private ContextResolvedTable( + ObjectIdentifier objectIdentifier, + @Nullable Catalog catalog, + ResolvedCatalogBaseTable<?> resolvedTable, + boolean anonymous) { + 
this.objectIdentifier = Preconditions.checkNotNull(objectIdentifier); + this.catalog = catalog; + this.resolvedTable = Preconditions.checkNotNull(resolvedTable); + this.anonymous = anonymous; + } + + public boolean isAnonymous() { + return this.anonymous; + } + + /** @return true if the table is temporary. An anonymous table is always temporary. */ + public boolean isTemporary() { + return catalog == null; + } + + public boolean isPermanent() { + return !isTemporary(); + } + + public ObjectIdentifier getIdentifier() { + return objectIdentifier; + } + + /** Returns empty if {@link #isPermanent()} is false. */ + public Optional<Catalog> getCatalog() { + return Optional.ofNullable(catalog); + } + + /** Returns a fully resolved catalog object. */ + @SuppressWarnings("unchecked") + public <T extends ResolvedCatalogBaseTable<?>> T getResolvedTable() { + return (T) resolvedTable; + } + + public ResolvedSchema getResolvedSchema() { + return resolvedTable.getResolvedSchema(); + } + + /** Returns the original metadata object returned by the catalog. */ + @SuppressWarnings("unchecked") + public <T extends CatalogBaseTable> T getTable() { + return (T) resolvedTable.getOrigin(); + } + + /** + * Copy the {@link ContextResolvedTable}, replacing the underlying {@link CatalogTable} options. + */ + public ContextResolvedTable copy(Map<String, String> newOptions) { + if (resolvedTable.getTableKind() == CatalogBaseTable.TableKind.VIEW) { + throw new ValidationException( + String.format("The view '%s' cannot be enriched with new options.", this)); + } + return new ContextResolvedTable( + objectIdentifier, + catalog, + ((ResolvedCatalogTable) resolvedTable).copy(newOptions), + false); + } + + @Override + public String toString() { + return objectIdentifier.asSummaryString(); + } + + /** + * This method tries to return the connector name of the table, trying to provide a bit more + * helpful toString for anonymous tables. 
It's only to help users to debug, and its return value + * should not be relied on. + */ + private static String generateAnonymousStringIdentifier( + @Nullable String hint, ResolvedCatalogBaseTable<?> resolvedTable) { + // This has been added after a very intensive debugging session, so don't remove it unless Review comment: Remove this and the following line; people should always know what they are doing in this code base. ########## File path: flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/FactoryUtil.java ########## @@ -157,7 +157,7 @@ public static DynamicTableSource createDynamicTableSource( } catch (Throwable t) { throw new ValidationException( String.format( - "Unable to create a source for reading table '%s'.\n\n" + "Unable to create a source for reading the table '%s'.\n\n" Review comment: Not a typo: this is similar to http://www.englishteachermelanie.com/grammar-when-not-to-use-the-definite-article/ point 5 ########## File path: flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/StatementSetImpl.java ########## @@ -107,8 +109,20 @@ public StatementSet addInsert( final TableDescriptor updatedDescriptor = targetDescriptor.toBuilder().schema(schemaTranslationResult.getSchema()).build(); - tableEnvironment.createTemporaryTable(path, updatedDescriptor); - return addInsert(path, table, overwrite); + final ResolvedCatalogBaseTable<?> resolvedCatalogBaseTable = + tableEnvironment + .getCatalogManager() + .resolveCatalogBaseTable(updatedDescriptor.toCatalogTable()); Review comment: can be `resolveCatalogTable` ########## File path: flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableImpl.java ########## @@ -593,8 +587,25 @@ public TableResult executeInsert(TableDescriptor descriptor, boolean overwrite) final TableDescriptor updatedDescriptor = descriptor.toBuilder().schema(schemaTranslationResult.getSchema()).build(); - tableEnvironment.createTemporaryTable(path, 
updatedDescriptor); - return executeInsert(path, overwrite); + final ResolvedCatalogBaseTable<?> resolvedCatalogBaseTable = + tableEnvironment + .getCatalogManager() + .resolveCatalogBaseTable(updatedDescriptor.toCatalogTable()); Review comment: can be `resolveCatalogTable ` ########## File path: flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java ########## @@ -540,9 +538,11 @@ public Table from(String path) { public Table from(TableDescriptor descriptor) { Preconditions.checkNotNull(descriptor, "Table descriptor must not be null."); - final String path = TableDescriptorUtil.getUniqueAnonymousPath(); - createTemporaryTableInternal(UnresolvedIdentifier.of(path), descriptor); - return from(path); + final ResolvedCatalogBaseTable<?> resolvedCatalogBaseTable = + catalogManager.resolveCatalogBaseTable(descriptor.toCatalogTable()); Review comment: can be `resolveCatalogTable ` ########## File path: flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/ContextResolvedTable.java ########## @@ -0,0 +1,190 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.catalog; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.table.api.ValidationException; +import org.apache.flink.table.factories.FactoryUtil; +import org.apache.flink.util.Preconditions; + +import javax.annotation.Nullable; + +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.atomic.AtomicInteger; + +/** + * This class contains information about a table, its {@link ResolvedSchema}, its options and its + * relationship with a {@link Catalog}, if any. + * + * <p>There can be 3 kinds of {@link ContextResolvedTable}: + * + * <ul> + * <li>A permanent table: a table which is stored in a {@link Catalog} and has an associated + * unique {@link ObjectIdentifier}. + * <li>A temporary table: a table which is stored in the {@link CatalogManager}, has an associated + * unique {@link ObjectIdentifier} and is flagged as temporary. + * <li>An anonymous/inline table: a table which is not stored in a catalog and doesn't have an + * associated unique {@link ObjectIdentifier}. + * </ul> + * + * <p>The different handling of temporary and permanent tables is {@link Catalog} and {@link + * CatalogManager} instance specific, hence for these two kind of tables, an instance of this object + * represents the relationship between the specific {@link ResolvedCatalogBaseTable} instance and + * the specific {@link Catalog}/{@link CatalogManager} instances. For example, the same {@link + * ResolvedCatalogBaseTable} can be temporary for one catalog, but permanent for another one. 
+ */ +@Internal +public class ContextResolvedTable { + + private static final AtomicInteger uniqueId = new AtomicInteger(0); + + private final ObjectIdentifier objectIdentifier; + private final @Nullable Catalog catalog; + private final ResolvedCatalogBaseTable<?> resolvedTable; + private final boolean anonymous; + + public static ContextResolvedTable permanent( + ObjectIdentifier identifier, + Catalog catalog, + ResolvedCatalogBaseTable<?> resolvedTable) { + return new ContextResolvedTable( + identifier, Preconditions.checkNotNull(catalog), resolvedTable, false); + } + + public static ContextResolvedTable temporary( + ObjectIdentifier identifier, ResolvedCatalogBaseTable<?> resolvedTable) { + return new ContextResolvedTable(identifier, null, resolvedTable, false); + } + + public static ContextResolvedTable anonymous(ResolvedCatalogBaseTable<?> resolvedTable) { + return new ContextResolvedTable( + ObjectIdentifier.ofAnonymous( + generateAnonymousStringIdentifier(null, resolvedTable)), + null, + resolvedTable, + true); + } + + public static ContextResolvedTable anonymous( + String hint, ResolvedCatalogBaseTable<?> resolvedTable) { + return new ContextResolvedTable( + ObjectIdentifier.ofAnonymous( + generateAnonymousStringIdentifier(hint, resolvedTable)), + null, + resolvedTable, + true); + } + + private ContextResolvedTable( + ObjectIdentifier objectIdentifier, + @Nullable Catalog catalog, + ResolvedCatalogBaseTable<?> resolvedTable, + boolean anonymous) { + this.objectIdentifier = Preconditions.checkNotNull(objectIdentifier); + this.catalog = catalog; + this.resolvedTable = Preconditions.checkNotNull(resolvedTable); + this.anonymous = anonymous; + } + + public boolean isAnonymous() { + return this.anonymous; + } + + /** @return true if the table is temporary. An anonymous table is always temporary. 
*/ + public boolean isTemporary() { + return catalog == null; + } + + public boolean isPermanent() { + return !isTemporary(); + } + + public ObjectIdentifier getIdentifier() { + return objectIdentifier; + } + + /** Returns empty if {@link #isPermanent()} is false. */ + public Optional<Catalog> getCatalog() { + return Optional.ofNullable(catalog); + } + + /** Returns a fully resolved catalog object. */ + @SuppressWarnings("unchecked") + public <T extends ResolvedCatalogBaseTable<?>> T getResolvedTable() { + return (T) resolvedTable; + } + + public ResolvedSchema getResolvedSchema() { + return resolvedTable.getResolvedSchema(); + } + + /** Returns the original metadata object returned by the catalog. */ + @SuppressWarnings("unchecked") + public <T extends CatalogBaseTable> T getTable() { + return (T) resolvedTable.getOrigin(); + } + + /** + * Copy the {@link ContextResolvedTable}, replacing the underlying {@link CatalogTable} options. + */ + public ContextResolvedTable copy(Map<String, String> newOptions) { + if (resolvedTable.getTableKind() == CatalogBaseTable.TableKind.VIEW) { + throw new ValidationException( + String.format("The view '%s' cannot be enriched with new options.", this)); Review comment: "View '%s' cannot be enriched with new options. Hints can only be applied to tables." ########## File path: flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/CreateTableASOperation.java ########## @@ -26,25 +26,32 @@ import java.util.Collections; import java.util.LinkedHashMap; import java.util.Map; +import java.util.function.Supplier; /** Operation to describe a CREATE TABLE AS statement. 
*/ @Internal public class CreateTableASOperation implements CreateOperation { private final CreateTableOperation createTableOperation; - private final SinkModifyOperation insertOperation; + private final Supplier<SinkModifyOperation> insertOperationFactory; + + private SinkModifyOperation insertOperation; Review comment: Then simplify the summary string? It does not really seem to be a summary anyway, judging by `getCatalogTable().toProperties()`. ########## File path: flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/connectors/DynamicSourceUtils.java ########## @@ -80,28 +79,28 @@ public static RelNode convertDataStreamToRel( boolean isBatchMode, ReadableConfig config, FlinkRelBuilder relBuilder, - ObjectIdentifier identifier, - ResolvedSchema schema, + ContextResolvedTable contextResolvedTable, DataStream<?> dataStream, DataType physicalDataType, boolean isTopLevelRecord, ChangelogMode changelogMode) { - final CatalogTable unresolvedTable = new InlineCatalogTable(schema); - final ResolvedCatalogTable catalogTable = new ResolvedCatalogTable(unresolvedTable, schema); final DynamicTableSource tableSource = new ExternalDynamicSource<>( - identifier, dataStream, physicalDataType, isTopLevelRecord, changelogMode); + dataStream, physicalDataType, isTopLevelRecord, changelogMode); final FlinkStatistic statistic = FlinkStatistic.builder() // this is a temporary solution, FLINK-15123 will resolve this Review comment: You added another one below. This comment exists 3 times in the code base now. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org