FANNG1 commented on code in PR #3795: URL: https://github.com/apache/gravitino/pull/3795#discussion_r1663345010
########## flink-connector/src/main/java/com/datastrato/gravitino/flink/connector/catalog/BaseCatalog.java: ########## @@ -145,32 +164,87 @@ public List<String> listViews(String s) throws DatabaseNotExistException, Catalo } @Override - public CatalogBaseTable getTable(ObjectPath objectPath) + public CatalogBaseTable getTable(ObjectPath tablePath) throws TableNotExistException, CatalogException { - throw new UnsupportedOperationException(); + try { + Table table = + catalog() + .asTableCatalog() + .loadTable(NameIdentifier.of(tablePath.getDatabaseName(), tablePath.getObjectName())); + return toFlinkTable(table); + } catch (NoSuchCatalogException e) { + throw new CatalogException(e); + } } @Override - public boolean tableExists(ObjectPath objectPath) throws CatalogException { - throw new UnsupportedOperationException(); + public boolean tableExists(ObjectPath tablePath) throws CatalogException { + return catalog() + .asTableCatalog() + .tableExists(NameIdentifier.of(tablePath.getDatabaseName(), tablePath.getObjectName())); } @Override - public void dropTable(ObjectPath objectPath, boolean b) + public void dropTable(ObjectPath tablePath, boolean ignoreIfNotExists) throws TableNotExistException, CatalogException { - throw new UnsupportedOperationException(); + boolean dropped = + catalog() + .asTableCatalog() + .dropTable(NameIdentifier.of(tablePath.getDatabaseName(), tablePath.getObjectName())); + if (!dropped && !ignoreIfNotExists) { + throw new TableNotExistException(catalogName(), tablePath); + } } @Override - public void renameTable(ObjectPath objectPath, String s, boolean b) + public void renameTable(ObjectPath tablePath, String newTableName, boolean ignoreIfNotExists) throws TableNotExistException, TableAlreadyExistException, CatalogException { - throw new UnsupportedOperationException(); + NameIdentifier identifier = + NameIdentifier.of(Namespace.of(tablePath.getDatabaseName()), newTableName); + + if (catalog().asTableCatalog().tableExists(identifier)) { + throw new TableAlreadyExistException( + catalogName(), ObjectPath.fromString(tablePath.getDatabaseName() + newTableName)); + } + + try { + catalog() + .asTableCatalog() + .alterTable( + NameIdentifier.of(tablePath.getDatabaseName(), tablePath.getObjectName()), + TableChange.rename(newTableName)); + } catch (NoSuchCatalogException e) { Review Comment: NoSuchTableException? ########## flink-connector/src/main/java/com/datastrato/gravitino/flink/connector/catalog/BaseCatalog.java: ########## @@ -145,32 +164,87 @@ public List<String> listViews(String s) throws DatabaseNotExistException, Catalo } @Override - public CatalogBaseTable getTable(ObjectPath objectPath) + public CatalogBaseTable getTable(ObjectPath tablePath) throws TableNotExistException, CatalogException { - throw new UnsupportedOperationException(); + try { + Table table = + catalog() + .asTableCatalog() + .loadTable(NameIdentifier.of(tablePath.getDatabaseName(), tablePath.getObjectName())); + return toFlinkTable(table); + } catch (NoSuchCatalogException e) { Review Comment: loadTable throws `NoSuchTableException` not `NoSuchCatalogException`? ########## flink-connector/src/test/java/com/datastrato/gravitino/flink/connector/integration/test/hive/FlinkHiveCatalogIT.java: ########## @@ -258,6 +278,154 @@ public void testGetCatalogFromGravitino() { numCatalogs, tableEnv.listCatalogs().length, "The created catalog should be dropped."); } + @Test + public void testCreateNoPartitionTable() { Review Comment: some table test could place in FlinkCommonIT? ########## flink-connector/src/main/java/com/datastrato/gravitino/flink/connector/catalog/BaseCatalog.java: ########## @@ -337,6 +411,29 @@ public void alterPartitionColumnStatistics( protected abstract PropertiesConverter getPropertiesConverter(); + protected CatalogBaseTable toFlinkTable(Table table) { + + org.apache.flink.table.api.Schema.Builder builder = + org.apache.flink.table.api.Schema.newBuilder(); + for (Column column : table.columns()) { + builder + .column(column.name(), TypeUtils.toFlinkType(column.dataType())) Review Comment: handle nullable? ########## flink-connector/src/main/java/com/datastrato/gravitino/flink/connector/catalog/BaseCatalog.java: ########## @@ -337,6 +411,29 @@ public void alterPartitionColumnStatistics( protected abstract PropertiesConverter getPropertiesConverter(); + protected CatalogBaseTable toFlinkTable(Table table) { + Review Comment: please remove blank line ########## flink-connector/src/test/java/com/datastrato/gravitino/flink/connector/integration/test/hive/FlinkHiveCatalogIT.java: ########## @@ -258,6 +278,154 @@ public void testGetCatalogFromGravitino() { numCatalogs, tableEnv.listCatalogs().length, "The created catalog should be dropped."); } + @Test + public void testCreateNoPartitionTable() { Review Comment: could you add write&read data after creating table? ########## flink-connector/src/main/java/com/datastrato/gravitino/flink/connector/utils/TypeUtils.java: ########## @@ -0,0 +1,42 @@ +/* + * Copyright 2024 Datastrato Pvt Ltd. + * This software is licensed under the Apache License version 2. + */ + +package com.datastrato.gravitino.flink.connector.utils; + +import com.datastrato.gravitino.rel.types.Type; +import com.datastrato.gravitino.rel.types.Types; +import org.apache.flink.table.api.DataTypes; +import org.apache.flink.table.types.DataType; +import org.apache.flink.table.types.logical.LogicalType; + +public class TypeUtils { Review Comment: could you add UT for this class? ########## flink-connector/src/main/java/com/datastrato/gravitino/flink/connector/catalog/BaseCatalog.java: ########## @@ -337,6 +411,29 @@ public void alterPartitionColumnStatistics( protected abstract PropertiesConverter getPropertiesConverter(); + protected CatalogBaseTable toFlinkTable(Table table) { + + org.apache.flink.table.api.Schema.Builder builder = + org.apache.flink.table.api.Schema.newBuilder(); + for (Column column : table.columns()) { + builder + .column(column.name(), TypeUtils.toFlinkType(column.dataType())) + .withComment(column.comment()); + } + return CatalogTable.of( Review Comment: miss table properties? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@gravitino.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org