This is an automated email from the ASF dual-hosted git repository.

fanng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/gravitino.git


The following commit(s) were added to refs/heads/main by this push:
     new e626faaa59 [#6216] flink-connector: Add integration Tests for GravitinoPaimonCatalog with Catalog-backend as HiveMetaStore (#6805)
e626faaa59 is described below

commit e626faaa59527ab9ee087f4c38c500dba7fd25e1
Author: yangyang zhong <35210666+hdyg...@users.noreply.github.com>
AuthorDate: Mon Apr 7 10:12:21 2025 +0800

    [#6216] flink-connector: Add integration Tests for GravitinoPaimonCatalog with Catalog-backend as HiveMetaStore (#6805)
    
    ### Why are the changes needed?
    
    Fix: #6216
    
    ### How was this patch tested?
    
    FlinkPaimonHiveBackendIT
---
 .../test/paimon/FlinkPaimonCatalogIT.java          | 76 +++++++++-------------
 .../test/paimon/FlinkPaimonHiveBackendIT.java      | 59 +++++++++++++++++
 .../FlinkPaimonLocalFileSystemBackendIT.java       | 63 ++++++++++++++++++
 3 files changed, 152 insertions(+), 46 deletions(-)

diff --git a/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/paimon/FlinkPaimonCatalogIT.java b/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/paimon/FlinkPaimonCatalogIT.java
index 948e02dd81..caa480c756 100644
--- a/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/paimon/FlinkPaimonCatalogIT.java
+++ b/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/paimon/FlinkPaimonCatalogIT.java
@@ -19,28 +19,17 @@
 package org.apache.gravitino.flink.connector.integration.test.paimon;
 
 import com.google.common.base.Preconditions;
-import com.google.common.collect.ImmutableMap;
-import java.nio.file.Path;
 import java.util.Map;
 import org.apache.gravitino.Catalog;
-import org.apache.gravitino.catalog.lakehouse.paimon.PaimonConstants;
 import org.apache.gravitino.flink.connector.integration.test.FlinkCommonIT;
 import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.BeforeAll;
-import org.junit.jupiter.api.Tag;
 import org.junit.jupiter.api.Test;
-import org.junit.jupiter.api.io.TempDir;
 
-@Tag("gravitino-docker-test")
-public class FlinkPaimonCatalogIT extends FlinkCommonIT {
+public abstract class FlinkPaimonCatalogIT extends FlinkCommonIT {
 
-  @TempDir private static Path warehouseDir;
-
-  private static final String DEFAULT_PAIMON_CATALOG =
-      "test_flink_paimon_filesystem_schema_catalog";
-
-  private static org.apache.gravitino.Catalog catalog;
+  protected org.apache.gravitino.Catalog catalog;
 
   @Override
   protected boolean supportSchemaOperationWithCommentAndOptions() {
@@ -61,56 +50,51 @@ public class FlinkPaimonCatalogIT extends FlinkCommonIT {
     return catalog;
   }
 
-  @BeforeAll
-  void setup() {
-    initPaimonCatalog();
-  }
-
-  @AfterAll
-  void stop() {
-    Preconditions.checkNotNull(metalake);
-    metalake.dropCatalog(DEFAULT_PAIMON_CATALOG, true);
-  }
-
   private void initPaimonCatalog() {
     Preconditions.checkNotNull(metalake);
     catalog =
         metalake.createCatalog(
-            DEFAULT_PAIMON_CATALOG,
+            getPaimonCatalogName(),
             org.apache.gravitino.Catalog.Type.RELATIONAL,
             getProvider(),
             null,
-            ImmutableMap.of(
-                PaimonConstants.CATALOG_BACKEND,
-                "filesystem",
-                "warehouse",
-                warehouseDir.toString()));
+            getPaimonCatalogOptions());
   }
 
+  protected abstract void createGravitinoCatalogByFlinkSql(String catalogName);
+
+  protected abstract String getPaimonCatalogName();
+
+  protected abstract Map<String, String> getPaimonCatalogOptions();
+
+  @BeforeAll
+  void paimonSetup() {
+    initPaimonCatalog();
+  }
+
+  @AfterAll
+  void paimonStop() {
+    Preconditions.checkNotNull(metalake);
+    metalake.dropCatalog(getPaimonCatalogName(), true);
+  }
+
+  protected abstract String getWarehouse();
+
   @Test
   public void testCreateGravitinoPaimonCatalogUsingSQL() {
     tableEnv.useCatalog(DEFAULT_CATALOG);
     int numCatalogs = tableEnv.listCatalogs().length;
-    String catalogName = "gravitino_hive_sql";
-    String warehouse = warehouseDir.toString();
-    tableEnv.executeSql(
-        String.format(
-            "create catalog %s with ("
-                + "'type'='gravitino-paimon', "
-                + "'warehouse'='%s',"
-                + "'metastore'='filesystem'"
-                + ")",
-            catalogName, warehouse));
+    String catalogName = "gravitino_paimon_catalog";
+    createGravitinoCatalogByFlinkSql(catalogName);
     String[] catalogs = tableEnv.listCatalogs();
     Assertions.assertEquals(numCatalogs + 1, catalogs.length, "Should create a new catalog");
     Assertions.assertTrue(metalake.catalogExists(catalogName));
     org.apache.gravitino.Catalog gravitinoCatalog = metalake.loadCatalog(catalogName);
     Map<String, String> properties = gravitinoCatalog.properties();
-    Assertions.assertEquals(warehouse, properties.get("warehouse"));
-  }
-
-  @Override
-  protected Map<String, String> getCreateSchemaProps(String schemaName) {
-    return null;
+    Assertions.assertEquals(getWarehouse(), properties.get("warehouse"));
+    tableEnv.executeSql("drop catalog " + catalogName);
+    Assertions.assertFalse(metalake.catalogExists(catalogName));
+    Assertions.assertEquals(
+        numCatalogs, tableEnv.listCatalogs().length, "The created catalog should be dropped.");
   }
 }
diff --git a/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/paimon/FlinkPaimonHiveBackendIT.java b/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/paimon/FlinkPaimonHiveBackendIT.java
new file mode 100644
index 0000000000..9025baeb5f
--- /dev/null
+++ b/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/paimon/FlinkPaimonHiveBackendIT.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.flink.connector.integration.test.paimon;
+
+import com.google.common.collect.ImmutableMap;
+import java.util.Map;
+import org.apache.gravitino.catalog.lakehouse.paimon.PaimonConstants;
+import org.junit.jupiter.api.Tag;
+
+@Tag("gravitino-docker-test")
+public class FlinkPaimonHiveBackendIT extends FlinkPaimonCatalogIT {
+
+  private static final String DEFAULT_PAIMON_CATALOG = "test_flink_paimon_hive_catalog";
+
+  @Override
+  protected void createGravitinoCatalogByFlinkSql(String catalogName) {
+    tableEnv.executeSql(
+        String.format(
+            "create catalog %s with ("
+                + "'type'='gravitino-paimon', "
+                + "'warehouse'='%s',"
+                + "'metastore'='hive',"
+                + "'uri'='%s'"
+                + ")",
+            catalogName, warehouse, hiveMetastoreUri));
+  }
+
+  @Override
+  protected String getPaimonCatalogName() {
+    return DEFAULT_PAIMON_CATALOG;
+  }
+
+  @Override
+  protected Map<String, String> getPaimonCatalogOptions() {
+    return ImmutableMap.of(
+        PaimonConstants.CATALOG_BACKEND, "hive", "warehouse", warehouse, "uri", hiveMetastoreUri);
+  }
+
+  @Override
+  protected String getWarehouse() {
+    return warehouse;
+  }
+}
diff --git a/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/paimon/FlinkPaimonLocalFileSystemBackendIT.java b/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/paimon/FlinkPaimonLocalFileSystemBackendIT.java
new file mode 100644
index 0000000000..a84a240336
--- /dev/null
+++ b/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/paimon/FlinkPaimonLocalFileSystemBackendIT.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.flink.connector.integration.test.paimon;
+
+import com.google.common.collect.ImmutableMap;
+import java.nio.file.Path;
+import java.util.Map;
+import org.apache.gravitino.catalog.lakehouse.paimon.PaimonConstants;
+import org.junit.jupiter.api.Tag;
+import org.junit.jupiter.api.io.TempDir;
+
+@Tag("gravitino-docker-test")
+public class FlinkPaimonLocalFileSystemBackendIT extends FlinkPaimonCatalogIT {
+
+  @TempDir private static Path warehouseDir;
+
+  private static final String DEFAULT_PAIMON_CATALOG =
+      "test_flink_paimon_filesystem_schema_catalog";
+
+  @Override
+  protected void createGravitinoCatalogByFlinkSql(String catalogName) {
+    tableEnv.executeSql(
+        String.format(
+            "create catalog %s with ("
+                + "'type'='gravitino-paimon', "
+                + "'warehouse'='%s',"
+                + "'metastore'='filesystem'"
+                + ")",
+            catalogName, warehouseDir));
+  }
+
+  @Override
+  protected String getPaimonCatalogName() {
+    return DEFAULT_PAIMON_CATALOG;
+  }
+
+  @Override
+  protected Map<String, String> getPaimonCatalogOptions() {
+    return ImmutableMap.of(
+        PaimonConstants.CATALOG_BACKEND, "filesystem", "warehouse", warehouseDir.toString());
+  }
+
+  @Override
+  protected String getWarehouse() {
+    return warehouseDir.toString();
+  }
+}

Reply via email to