This is an automated email from the ASF dual-hosted git repository. fanng pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/gravitino.git
The following commit(s) were added to refs/heads/main by this push: new 9ca88e0b06 [#5194] feat(flink): Support basic table DDL Operation for paimon-catalog (#6224) 9ca88e0b06 is described below commit 9ca88e0b06a75366c680610397f136519e8890f4 Author: yangyang zhong <35210666+hdyg...@users.noreply.github.com> AuthorDate: Wed Jan 15 18:20:25 2025 +0800 [#5194] feat(flink): Support basic table DDL Operation for paimon-catalog (#6224) ### What changes were proposed in this pull request? Support basic table DDL Operation for paimon-catalog ### Why are the changes needed? Fix: #5194 ### Does this PR introduce _any_ user-facing change? None. ### How was this patch tested? org.apache.gravitino.flink.connector.integration.test.paimon.FlinkPaimonCatalogIT --- .../flink/connector/catalog/BaseCatalog.java | 4 ++-- .../connector/paimon/GravitinoPaimonCatalog.java | 24 +++++++++++++++++++++ .../connector/integration/test/FlinkEnvIT.java | 8 ++----- .../integration/test/hive/FlinkHiveCatalogIT.java | 25 ++++++++++++++++++++++ .../test/paimon/FlinkPaimonCatalogIT.java | 10 --------- 5 files changed, 53 insertions(+), 18 deletions(-) diff --git a/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/catalog/BaseCatalog.java b/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/catalog/BaseCatalog.java index 1496742177..fd8e118ee4 100644 --- a/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/catalog/BaseCatalog.java +++ b/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/catalog/BaseCatalog.java @@ -656,11 +656,11 @@ public abstract class BaseCatalog extends AbstractCatalog { return schemaChanges.toArray(new SchemaChange[0]); } - private Catalog catalog() { + protected Catalog catalog() { return GravitinoCatalogManager.get().getGravitinoCatalogInfo(getName()); } - private String catalogName() { + protected String catalogName() { return getName(); } } diff --git a/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/paimon/GravitinoPaimonCatalog.java b/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/paimon/GravitinoPaimonCatalog.java index 017ac6e708..c22e00fa12 100644 --- a/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/paimon/GravitinoPaimonCatalog.java +++ b/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/paimon/GravitinoPaimonCatalog.java @@ -19,10 +19,17 @@ package org.apache.gravitino.flink.connector.paimon; +import java.util.Optional; import org.apache.flink.table.catalog.AbstractCatalog; +import org.apache.flink.table.catalog.ObjectPath; +import org.apache.flink.table.catalog.exceptions.CatalogException; +import org.apache.flink.table.catalog.exceptions.TableNotExistException; +import org.apache.flink.table.factories.Factory; +import org.apache.gravitino.NameIdentifier; import org.apache.gravitino.flink.connector.PartitionConverter; import org.apache.gravitino.flink.connector.PropertiesConverter; import org.apache.gravitino.flink.connector.catalog.BaseCatalog; +import org.apache.paimon.flink.FlinkTableFactory; /** * The GravitinoPaimonCatalog class is an implementation of the BaseCatalog class that is used to @@ -45,4 +52,21 @@ public class GravitinoPaimonCatalog extends BaseCatalog { protected AbstractCatalog realCatalog() { return paimonCatalog; } + + @Override + public void dropTable(ObjectPath tablePath, boolean ignoreIfNotExists) + throws TableNotExistException, CatalogException { + boolean dropped = + catalog() + .asTableCatalog() + .purgeTable(NameIdentifier.of(tablePath.getDatabaseName(), tablePath.getObjectName())); + if (!dropped && !ignoreIfNotExists) { + throw new TableNotExistException(catalogName(), tablePath); + } + } + + @Override + public Optional<Factory> getFactory() { + return Optional.of(new FlinkTableFactory()); + } } diff --git a/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/FlinkEnvIT.java b/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/FlinkEnvIT.java index 5ae8847c6c..f56b5297e1 100644 --- a/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/FlinkEnvIT.java +++ b/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/FlinkEnvIT.java @@ -19,7 +19,6 @@ package org.apache.gravitino.flink.connector.integration.test; import com.google.common.base.Preconditions; -import com.google.common.collect.ImmutableMap; import com.google.errorprone.annotations.FormatMethod; import com.google.errorprone.annotations.FormatString; import java.io.IOException; @@ -159,17 +158,14 @@ public abstract class FlinkEnvIT extends BaseIT { return tableEnv.executeSql(String.format(sql, args)); } - protected static void doWithSchema( + protected void doWithSchema( Catalog catalog, String schemaName, Consumer<Catalog> action, boolean dropSchema) { Preconditions.checkNotNull(catalog); Preconditions.checkNotNull(schemaName); try { tableEnv.useCatalog(catalog.name()); if (!catalog.asSchemas().schemaExists(schemaName)) { - catalog - .asSchemas() - .createSchema( - schemaName, null, ImmutableMap.of("location", warehouse + "/" + schemaName)); + catalog.asSchemas().createSchema(schemaName, null, null); } tableEnv.useDatabase(schemaName); action.accept(catalog); diff --git a/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/hive/FlinkHiveCatalogIT.java b/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/hive/FlinkHiveCatalogIT.java index 333aa83f0b..bb7b25f6b2 100644 --- a/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/hive/FlinkHiveCatalogIT.java +++ b/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/hive/FlinkHiveCatalogIT.java @@ -29,6 +29,7 @@ import java.io.IOException; import java.util.Arrays; import java.util.Map; import java.util.Optional; +import java.util.function.Consumer; import java.util.stream.Collectors; import org.apache.flink.configuration.Configuration; import org.apache.flink.table.api.DataTypes; @@ -586,4 +587,28 @@ public class FlinkHiveCatalogIT extends FlinkCommonIT { protected org.apache.gravitino.Catalog currentCatalog() { return hiveCatalog; } + + protected void doWithSchema( + org.apache.gravitino.Catalog catalog, + String schemaName, + Consumer<org.apache.gravitino.Catalog> action, + boolean dropSchema) { + Preconditions.checkNotNull(catalog); + Preconditions.checkNotNull(schemaName); + try { + tableEnv.useCatalog(catalog.name()); + if (!catalog.asSchemas().schemaExists(schemaName)) { + catalog + .asSchemas() + .createSchema( + schemaName, null, ImmutableMap.of("location", warehouse + "/" + schemaName)); + } + tableEnv.useDatabase(schemaName); + action.accept(catalog); + } finally { + if (dropSchema) { + catalog.asSchemas().dropSchema(schemaName, true); + } + } + } } diff --git a/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/paimon/FlinkPaimonCatalogIT.java b/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/paimon/FlinkPaimonCatalogIT.java index 10fab3567a..57a17c2a11 100644 --- a/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/paimon/FlinkPaimonCatalogIT.java +++ b/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/paimon/FlinkPaimonCatalogIT.java @@ -42,16 +42,6 @@ public class FlinkPaimonCatalogIT extends FlinkCommonIT { private static org.apache.gravitino.Catalog catalog; - @Override - protected boolean supportColumnOperation() { - return false; - } - - @Override - protected boolean supportTableOperation() { - return false; - } - @Override protected boolean supportSchemaOperationWithCommentAndOptions() { return false;