slinkydeveloper commented on a change in pull request #18088:
URL: https://github.com/apache/flink/pull/18088#discussion_r769371971



##########
File path: flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/Catalog.java
##########
@@ -311,6 +311,15 @@ void createTable(ObjectPath tablePath, CatalogBaseTable table, boolean ignoreIfE
     void alterTable(ObjectPath tablePath, CatalogBaseTable newTable, boolean ignoreIfNotExists)
             throws TableNotExistException, CatalogException;
 
+    /**
+     * If true, tables which do not specify a connector will be translated to managed tables.
+     *
+     * @see CatalogBaseTable.TableKind#MANAGED
+     */
+    default boolean supportsManagedTable() {

Review comment:
       I might be missing a point in the original design, but why does the 
catalog need to support managed tables? Isn't that support already provided by 
CatalogManager itself?
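
For context, a minimal sketch of the opt-in pattern this hunk appears to introduce: a `default` method on the interface, so existing catalog implementations keep compiling and only opt in by overriding it. The hunk is cut off before the method body, so the `false` default is an assumption, and `SimpleCatalog`, `LegacyCatalog`, and `ManagedCatalog` are hypothetical stand-ins rather than Flink classes.

```java
// Hypothetical, simplified stand-in for the Catalog interface in the diff.
interface SimpleCatalog {
    String name();

    // Assumption: the default is false, so existing implementations are unaffected.
    default boolean supportsManagedTable() {
        return false;
    }
}

// An existing implementation that was written before the new method existed.
final class LegacyCatalog implements SimpleCatalog {
    @Override
    public String name() {
        return "legacy";
    }
}

// An implementation that opts in by overriding the default.
final class ManagedCatalog implements SimpleCatalog {
    @Override
    public String name() {
        return "managed";
    }

    @Override
    public boolean supportsManagedTable() {
        return true;
    }
}

public class DefaultMethodSketch {
    public static void main(String[] args) {
        SimpleCatalog[] catalogs = {new LegacyCatalog(), new ManagedCatalog()};
        for (SimpleCatalog catalog : catalogs) {
            System.out.println(catalog.name() + " -> " + catalog.supportsManagedTable());
        }
    }
}
```

Whether this flag belongs on the catalog or should be decided centrally is exactly the open design question above; the sketch only shows the mechanics of the default method.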

##########
File path: flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/CatalogBaseTableResolutionTest.java
##########
@@ -158,6 +158,23 @@ public void testPropertyDeSerialization() {
         assertThat(resolvedTable.getResolvedSchema(), equalTo(RESOLVED_TABLE_SCHEMA));
     }
 
+    @Test
+    public void testManagedPropertyDeSerialization() {
+        Map<String, String> properties = catalogTableAsProperties();
+        properties.put("table-kind", CatalogBaseTable.TableKind.MANAGED.toString());
+
+        final CatalogTable table = CatalogTable.fromProperties(properties);
+
+        final ResolvedCatalogTable resolvedTable =
+                resolveCatalogBaseTable(ResolvedCatalogTable.class, table);
+
+        assertThat(resolvedTable.getTableKind(), equalTo(CatalogBaseTable.TableKind.MANAGED));
+
+        assertThat(resolvedTable.toProperties(), equalTo(properties));
+
+        assertThat(resolvedTable.getResolvedSchema(), equalTo(RESOLVED_TABLE_SCHEMA));

Review comment:
       Please use only AssertJ assertions, and convert the existing tests 
here to AssertJ as well.
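
A minimal sketch of what the conversion could look like, assuming AssertJ (`org.assertj.core.api.Assertions`) is on the test classpath. The `roundTrip` helper and the plain map are hypothetical stand-ins for the `CatalogTable.fromProperties` / `toProperties` round trip in the diff, not Flink APIs.

```java
import static org.assertj.core.api.Assertions.assertThat;

import java.util.HashMap;
import java.util.Map;

public class AssertJEqualitySketch {

    // Hypothetical stand-in for serializing a table to properties and back.
    static Map<String, String> roundTrip(Map<String, String> properties) {
        return new HashMap<>(properties);
    }

    public static void main(String[] args) {
        Map<String, String> properties = new HashMap<>();
        properties.put("table-kind", "MANAGED");

        Map<String, String> actual = roundTrip(properties);

        // Hamcrest style: assertThat(actual, equalTo(properties));
        // AssertJ style: the actual value comes first, the expectation is chained.
        assertThat(actual).isEqualTo(properties);
        assertThat(actual).containsEntry("table-kind", "MANAGED");
    }
}
```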

##########
File path: flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/api/TableEnvironmentTest.java
##########
@@ -110,4 +121,102 @@ public void testTableFromDescriptor() {
 
         assertEquals("fake", lookupResult.get().getTable().getOptions().get("connector"));
     }
+
+    @Test
+    public void testManagedTable() {
+        innerTestManagedTableFromDescriptor(false, false);
+    }
+
+    @Test
+    public void testManagedTableWithIgnoreExists() {
+        innerTestManagedTableFromDescriptor(true, false);
+    }
+
+    @Test
+    public void testTemporaryManagedTableWithIgnoreExists() {
+        innerTestManagedTableFromDescriptor(true, true);
+    }
+
+    @Test
+    public void testTemporaryManagedTable() {
+        innerTestManagedTableFromDescriptor(false, true);
+    }
+
+    private void innerTestManagedTableFromDescriptor(boolean ignoreIfExists, boolean isTemporary) {
+        final TableEnvironmentMock tEnv = TableEnvironmentMock.getStreamingInstance();
+        final String catalog = tEnv.getCurrentCatalog();
+        final String database = tEnv.getCurrentDatabase();
+
+        final Schema schema = Schema.newBuilder().column("f0", DataTypes.INT()).build();
+        final String tableName = UUID.randomUUID().toString();
+        ObjectIdentifier identifier = ObjectIdentifier.of(catalog, database, tableName);
+
+        // create table
+        MANAGED_TABLES.put(identifier, new AtomicReference<>());
+        CreateTableOperation createOperation =
+                new CreateTableOperation(
+                        identifier,
+                        TableDescriptor.forManaged()
+                                .schema(schema)
+                                .option("a", "Test")
+                                .build()
+                                .toCatalogTable(),
+                        ignoreIfExists,
+                        isTemporary);
+
+        tEnv.executeInternal(createOperation);
+
+        // test ignore: create again
+        if (ignoreIfExists) {
+            tEnv.executeInternal(createOperation);
+        } else {
+            Assertions.assertThrows(

Review comment:
       Please use AssertJ for the assertions in this test.
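
As a rough sketch of the AssertJ replacement for `Assertions.assertThrows`, assuming AssertJ is on the test classpath: `assertThatThrownBy` captures the exception and lets the type and message checks be chained. The `createTableTwice` method, the exception type, and the message are hypothetical placeholders, since the diff is cut off before the expected exception is shown.

```java
import static org.assertj.core.api.Assertions.assertThatThrownBy;

public class AssertJExceptionSketch {

    // Hypothetical stand-in for executing the create operation a second time.
    static void createTableTwice() {
        throw new IllegalStateException("Table already exists");
    }

    public static void main(String[] args) {
        // JUnit 5 style: Assertions.assertThrows(IllegalStateException.class, () -> ...);
        // AssertJ style: capture the throwable, then chain checks on it.
        assertThatThrownBy(AssertJExceptionSketch::createTableTwice)
                .isInstanceOf(IllegalStateException.class)
                .hasMessageContaining("already exists");
    }
}
```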



