slinkydeveloper commented on a change in pull request #18088: URL: https://github.com/apache/flink/pull/18088#discussion_r769371971
########## File path: flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/Catalog.java ########## @@ -311,6 +311,15 @@ void createTable(ObjectPath tablePath, CatalogBaseTable table, boolean ignoreIfE void alterTable(ObjectPath tablePath, CatalogBaseTable newTable, boolean ignoreIfNotExists) throws TableNotExistException, CatalogException; + /** + * If true, tables which do not specify a connector will be translated to managed tables. + * + * @see CatalogBaseTable.TableKind#MANAGED + */ + default boolean supportsManagedTable() { Review comment: I might miss a point in the original design, but why the catalog needs to support managed tables? Isn't the support already provided by CatalogManager itself? ########## File path: flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/CatalogBaseTableResolutionTest.java ########## @@ -158,6 +158,23 @@ public void testPropertyDeSerialization() { assertThat(resolvedTable.getResolvedSchema(), equalTo(RESOLVED_TABLE_SCHEMA)); } + @Test + public void testManagedPropertyDeSerialization() { + Map<String, String> properties = catalogTableAsProperties(); + properties.put("table-kind", CatalogBaseTable.TableKind.MANAGED.toString()); + + final CatalogTable table = CatalogTable.fromProperties(properties); + + final ResolvedCatalogTable resolvedTable = + resolveCatalogBaseTable(ResolvedCatalogTable.class, table); + + assertThat(resolvedTable.getTableKind(), equalTo(CatalogBaseTable.TableKind.MANAGED)); + + assertThat(resolvedTable.toProperties(), equalTo(properties)); + + assertThat(resolvedTable.getResolvedSchema(), equalTo(RESOLVED_TABLE_SCHEMA)); Review comment: Please use only assertj assertions, and convert the tests here to use assertj ########## File path: flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/api/TableEnvironmentTest.java ########## @@ -110,4 +121,102 @@ public void testTableFromDescriptor() { assertEquals("fake", lookupResult.get().getTable().getOptions().get("connector")); } + + @Test + public void testManagedTable() { + innerTestManagedTableFromDescriptor(false, false); + } + + @Test + public void testManagedTableWithIgnoreExists() { + innerTestManagedTableFromDescriptor(true, false); + } + + @Test + public void testTemporaryManagedTableWithIgnoreExists() { + innerTestManagedTableFromDescriptor(true, true); + } + + @Test + public void testTemporaryManagedTable() { + innerTestManagedTableFromDescriptor(true, true); + } + + private void innerTestManagedTableFromDescriptor(boolean ignoreIfExists, boolean isTemporary) { + final TableEnvironmentMock tEnv = TableEnvironmentMock.getStreamingInstance(); + final String catalog = tEnv.getCurrentCatalog(); + final String database = tEnv.getCurrentDatabase(); + + final Schema schema = Schema.newBuilder().column("f0", DataTypes.INT()).build(); + final String tableName = UUID.randomUUID().toString(); + ObjectIdentifier identifier = ObjectIdentifier.of(catalog, database, tableName); + + // create table + MANAGED_TABLES.put(identifier, new AtomicReference<>()); + CreateTableOperation createOperation = + new CreateTableOperation( + identifier, + TableDescriptor.forManaged() + .schema(schema) + .option("a", "Test") + .build() + .toCatalogTable(), + ignoreIfExists, + isTemporary); + + tEnv.executeInternal(createOperation); + + // test ignore: create again + if (ignoreIfExists) { + tEnv.executeInternal(createOperation); + } else { + Assertions.assertThrows( Review comment: Please use assertj for testing -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org