This is an automated email from the ASF dual-hosted git repository.

fanng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/gravitino.git


The following commit(s) were added to refs/heads/main by this push:
     new 9ca88e0b06 [#5194] feat(flink): Support basic table DDL Operation for 
paimon-catalog (#6224)
9ca88e0b06 is described below

commit 9ca88e0b06a75366c680610397f136519e8890f4
Author: yangyang zhong <35210666+hdyg...@users.noreply.github.com>
AuthorDate: Wed Jan 15 18:20:25 2025 +0800

    [#5194] feat(flink): Support basic table DDL Operation for paimon-catalog 
(#6224)
    
    ### What changes were proposed in this pull request?
    
    Support basic table DDL Operation for paimon-catalog
    
    ### Why are the changes needed?
    
    Fix: #5194
    
    ### Does this PR introduce _any_ user-facing change?
    
    None.
    
    ### How was this patch tested?
    
    
    
org.apache.gravitino.flink.connector.integration.test.paimon.FlinkPaimonCatalogIT
---
 .../flink/connector/catalog/BaseCatalog.java       |  4 ++--
 .../connector/paimon/GravitinoPaimonCatalog.java   | 24 +++++++++++++++++++++
 .../connector/integration/test/FlinkEnvIT.java     |  8 ++-----
 .../integration/test/hive/FlinkHiveCatalogIT.java  | 25 ++++++++++++++++++++++
 .../test/paimon/FlinkPaimonCatalogIT.java          | 10 ---------
 5 files changed, 53 insertions(+), 18 deletions(-)

diff --git 
a/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/catalog/BaseCatalog.java
 
b/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/catalog/BaseCatalog.java
index 1496742177..fd8e118ee4 100644
--- 
a/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/catalog/BaseCatalog.java
+++ 
b/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/catalog/BaseCatalog.java
@@ -656,11 +656,11 @@ public abstract class BaseCatalog extends AbstractCatalog 
{
     return schemaChanges.toArray(new SchemaChange[0]);
   }
 
-  private Catalog catalog() {
+  protected Catalog catalog() {
     return GravitinoCatalogManager.get().getGravitinoCatalogInfo(getName());
   }
 
-  private String catalogName() {
+  protected String catalogName() {
     return getName();
   }
 }
diff --git 
a/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/paimon/GravitinoPaimonCatalog.java
 
b/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/paimon/GravitinoPaimonCatalog.java
index 017ac6e708..c22e00fa12 100644
--- 
a/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/paimon/GravitinoPaimonCatalog.java
+++ 
b/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/paimon/GravitinoPaimonCatalog.java
@@ -19,10 +19,17 @@
 
 package org.apache.gravitino.flink.connector.paimon;
 
+import java.util.Optional;
 import org.apache.flink.table.catalog.AbstractCatalog;
+import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.catalog.exceptions.CatalogException;
+import org.apache.flink.table.catalog.exceptions.TableNotExistException;
+import org.apache.flink.table.factories.Factory;
+import org.apache.gravitino.NameIdentifier;
 import org.apache.gravitino.flink.connector.PartitionConverter;
 import org.apache.gravitino.flink.connector.PropertiesConverter;
 import org.apache.gravitino.flink.connector.catalog.BaseCatalog;
+import org.apache.paimon.flink.FlinkTableFactory;
 
 /**
  * The GravitinoPaimonCatalog class is an implementation of the BaseCatalog 
class that is used to
@@ -45,4 +52,21 @@ public class GravitinoPaimonCatalog extends BaseCatalog {
   protected AbstractCatalog realCatalog() {
     return paimonCatalog;
   }
+
+  @Override
+  public void dropTable(ObjectPath tablePath, boolean ignoreIfNotExists)
+      throws TableNotExistException, CatalogException {
+    boolean dropped =
+        catalog()
+            .asTableCatalog()
+            .purgeTable(NameIdentifier.of(tablePath.getDatabaseName(), 
tablePath.getObjectName()));
+    if (!dropped && !ignoreIfNotExists) {
+      throw new TableNotExistException(catalogName(), tablePath);
+    }
+  }
+
+  @Override
+  public Optional<Factory> getFactory() {
+    return Optional.of(new FlinkTableFactory());
+  }
 }
diff --git 
a/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/FlinkEnvIT.java
 
b/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/FlinkEnvIT.java
index 5ae8847c6c..f56b5297e1 100644
--- 
a/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/FlinkEnvIT.java
+++ 
b/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/FlinkEnvIT.java
@@ -19,7 +19,6 @@
 package org.apache.gravitino.flink.connector.integration.test;
 
 import com.google.common.base.Preconditions;
-import com.google.common.collect.ImmutableMap;
 import com.google.errorprone.annotations.FormatMethod;
 import com.google.errorprone.annotations.FormatString;
 import java.io.IOException;
@@ -159,17 +158,14 @@ public abstract class FlinkEnvIT extends BaseIT {
     return tableEnv.executeSql(String.format(sql, args));
   }
 
-  protected static void doWithSchema(
+  protected void doWithSchema(
       Catalog catalog, String schemaName, Consumer<Catalog> action, boolean 
dropSchema) {
     Preconditions.checkNotNull(catalog);
     Preconditions.checkNotNull(schemaName);
     try {
       tableEnv.useCatalog(catalog.name());
       if (!catalog.asSchemas().schemaExists(schemaName)) {
-        catalog
-            .asSchemas()
-            .createSchema(
-                schemaName, null, ImmutableMap.of("location", warehouse + "/" 
+ schemaName));
+        catalog.asSchemas().createSchema(schemaName, null, null);
       }
       tableEnv.useDatabase(schemaName);
       action.accept(catalog);
diff --git 
a/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/hive/FlinkHiveCatalogIT.java
 
b/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/hive/FlinkHiveCatalogIT.java
index 333aa83f0b..bb7b25f6b2 100644
--- 
a/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/hive/FlinkHiveCatalogIT.java
+++ 
b/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/hive/FlinkHiveCatalogIT.java
@@ -29,6 +29,7 @@ import java.io.IOException;
 import java.util.Arrays;
 import java.util.Map;
 import java.util.Optional;
+import java.util.function.Consumer;
 import java.util.stream.Collectors;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.table.api.DataTypes;
@@ -586,4 +587,28 @@ public class FlinkHiveCatalogIT extends FlinkCommonIT {
   protected org.apache.gravitino.Catalog currentCatalog() {
     return hiveCatalog;
   }
+
+  protected void doWithSchema(
+      org.apache.gravitino.Catalog catalog,
+      String schemaName,
+      Consumer<org.apache.gravitino.Catalog> action,
+      boolean dropSchema) {
+    Preconditions.checkNotNull(catalog);
+    Preconditions.checkNotNull(schemaName);
+    try {
+      tableEnv.useCatalog(catalog.name());
+      if (!catalog.asSchemas().schemaExists(schemaName)) {
+        catalog
+            .asSchemas()
+            .createSchema(
+                schemaName, null, ImmutableMap.of("location", warehouse + "/" 
+ schemaName));
+      }
+      tableEnv.useDatabase(schemaName);
+      action.accept(catalog);
+    } finally {
+      if (dropSchema) {
+        catalog.asSchemas().dropSchema(schemaName, true);
+      }
+    }
+  }
 }
diff --git 
a/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/paimon/FlinkPaimonCatalogIT.java
 
b/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/paimon/FlinkPaimonCatalogIT.java
index 10fab3567a..57a17c2a11 100644
--- 
a/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/paimon/FlinkPaimonCatalogIT.java
+++ 
b/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/paimon/FlinkPaimonCatalogIT.java
@@ -42,16 +42,6 @@ public class FlinkPaimonCatalogIT extends FlinkCommonIT {
 
   private static org.apache.gravitino.Catalog catalog;
 
-  @Override
-  protected boolean supportColumnOperation() {
-    return false;
-  }
-
-  @Override
-  protected boolean supportTableOperation() {
-    return false;
-  }
-
   @Override
   protected boolean supportSchemaOperationWithCommentAndOptions() {
     return false;

Reply via email to