This is an automated email from the ASF dual-hosted git repository. fanng pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/gravitino.git
The following commit(s) were added to refs/heads/main by this push: new e626faaa59 [#6216] flink-connector: Add integration Tests for GravitinoPaimonCatalog with Catalog-backend as HiveMetaStore (#6805) e626faaa59 is described below commit e626faaa59527ab9ee087f4c38c500dba7fd25e1 Author: yangyang zhong <35210666+hdyg...@users.noreply.github.com> AuthorDate: Mon Apr 7 10:12:21 2025 +0800 [#6216] flink-connector: Add integration Tests for GravitinoPaimonCatalog with Catalog-backend as HiveMetaStore (#6805) ### Why are the changes needed? Fix: #6216 ### How was this patch tested? FlinkPaimonHiveCatalogIT --- .../test/paimon/FlinkPaimonCatalogIT.java | 76 +++++++++------------- .../test/paimon/FlinkPaimonHiveBackendIT.java | 59 +++++++++++++++++ .../FlinkPaimonLocalFileSystemBackendIT.java | 63 ++++++++++++++++++ 3 files changed, 152 insertions(+), 46 deletions(-) diff --git a/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/paimon/FlinkPaimonCatalogIT.java b/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/paimon/FlinkPaimonCatalogIT.java index 948e02dd81..caa480c756 100644 --- a/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/paimon/FlinkPaimonCatalogIT.java +++ b/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/paimon/FlinkPaimonCatalogIT.java @@ -19,28 +19,17 @@ package org.apache.gravitino.flink.connector.integration.test.paimon; import com.google.common.base.Preconditions; -import com.google.common.collect.ImmutableMap; -import java.nio.file.Path; import java.util.Map; import org.apache.gravitino.Catalog; -import org.apache.gravitino.catalog.lakehouse.paimon.PaimonConstants; import org.apache.gravitino.flink.connector.integration.test.FlinkCommonIT; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeAll; -import org.junit.jupiter.api.Tag; import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.io.TempDir; -@Tag("gravitino-docker-test") -public class FlinkPaimonCatalogIT extends FlinkCommonIT { +public abstract class FlinkPaimonCatalogIT extends FlinkCommonIT { - @TempDir private static Path warehouseDir; - - private static final String DEFAULT_PAIMON_CATALOG = - "test_flink_paimon_filesystem_schema_catalog"; - - private static org.apache.gravitino.Catalog catalog; + protected org.apache.gravitino.Catalog catalog; @Override protected boolean supportSchemaOperationWithCommentAndOptions() { @@ -61,56 +50,51 @@ public class FlinkPaimonCatalogIT extends FlinkCommonIT { return catalog; } - @BeforeAll - void setup() { - initPaimonCatalog(); - } - - @AfterAll - void stop() { - Preconditions.checkNotNull(metalake); - metalake.dropCatalog(DEFAULT_PAIMON_CATALOG, true); - } - private void initPaimonCatalog() { Preconditions.checkNotNull(metalake); catalog = metalake.createCatalog( - DEFAULT_PAIMON_CATALOG, + getPaimonCatalogName(), org.apache.gravitino.Catalog.Type.RELATIONAL, getProvider(), null, - ImmutableMap.of( - PaimonConstants.CATALOG_BACKEND, - "filesystem", - "warehouse", - warehouseDir.toString())); + getPaimonCatalogOptions()); } + protected abstract void createGravitinoCatalogByFlinkSql(String catalogName); + + protected abstract String getPaimonCatalogName(); + + protected abstract Map<String, String> getPaimonCatalogOptions(); + + @BeforeAll + void paimonSetup() { + initPaimonCatalog(); + } + + @AfterAll + void paimonStop() { + Preconditions.checkNotNull(metalake); + metalake.dropCatalog(getPaimonCatalogName(), true); + } + + protected abstract String getWarehouse(); + @Test public void testCreateGravitinoPaimonCatalogUsingSQL() { tableEnv.useCatalog(DEFAULT_CATALOG); int numCatalogs = tableEnv.listCatalogs().length; - String catalogName = "gravitino_hive_sql"; - String warehouse = warehouseDir.toString(); - tableEnv.executeSql( - String.format( - "create catalog %s with (" - + "'type'='gravitino-paimon', " - + "'warehouse'='%s'," - + "'metastore'='filesystem'" - + ")", - catalogName, warehouse)); + String catalogName = "gravitino_paimon_catalog"; + createGravitinoCatalogByFlinkSql(catalogName); String[] catalogs = tableEnv.listCatalogs(); Assertions.assertEquals(numCatalogs + 1, catalogs.length, "Should create a new catalog"); Assertions.assertTrue(metalake.catalogExists(catalogName)); org.apache.gravitino.Catalog gravitinoCatalog = metalake.loadCatalog(catalogName); Map<String, String> properties = gravitinoCatalog.properties(); - Assertions.assertEquals(warehouse, properties.get("warehouse")); - } - - @Override - protected Map<String, String> getCreateSchemaProps(String schemaName) { - return null; + Assertions.assertEquals(getWarehouse(), properties.get("warehouse")); + tableEnv.executeSql("drop catalog " + catalogName); + Assertions.assertFalse(metalake.catalogExists(catalogName)); + Assertions.assertEquals( + numCatalogs, tableEnv.listCatalogs().length, "The created catalog should be dropped."); } } diff --git a/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/paimon/FlinkPaimonHiveBackendIT.java b/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/paimon/FlinkPaimonHiveBackendIT.java new file mode 100644 index 0000000000..9025baeb5f --- /dev/null +++ b/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/paimon/FlinkPaimonHiveBackendIT.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.gravitino.flink.connector.integration.test.paimon; + +import com.google.common.collect.ImmutableMap; +import java.util.Map; +import org.apache.gravitino.catalog.lakehouse.paimon.PaimonConstants; +import org.junit.jupiter.api.Tag; + +@Tag("gravitino-docker-test") +public class FlinkPaimonHiveBackendIT extends FlinkPaimonCatalogIT { + + private static final String DEFAULT_PAIMON_CATALOG = "test_flink_paimon_hive_catalog"; + + @Override + protected void createGravitinoCatalogByFlinkSql(String catalogName) { + tableEnv.executeSql( + String.format( + "create catalog %s with (" + + "'type'='gravitino-paimon', " + + "'warehouse'='%s'," + + "'metastore'='hive'," + + "'uri'='%s'" + + ")", + catalogName, warehouse, hiveMetastoreUri)); + } + + @Override + protected String getPaimonCatalogName() { + return DEFAULT_PAIMON_CATALOG; + } + + @Override + protected Map<String, String> getPaimonCatalogOptions() { + return ImmutableMap.of( + PaimonConstants.CATALOG_BACKEND, "hive", "warehouse", warehouse, "uri", hiveMetastoreUri); + } + + @Override + protected String getWarehouse() { + return warehouse; + } +} diff --git a/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/paimon/FlinkPaimonLocalFileSystemBackendIT.java b/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/paimon/FlinkPaimonLocalFileSystemBackendIT.java new file mode 100644 index 0000000000..a84a240336 --- /dev/null +++ b/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/paimon/FlinkPaimonLocalFileSystemBackendIT.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.gravitino.flink.connector.integration.test.paimon; + +import com.google.common.collect.ImmutableMap; +import java.nio.file.Path; +import java.util.Map; +import org.apache.gravitino.catalog.lakehouse.paimon.PaimonConstants; +import org.junit.jupiter.api.Tag; +import org.junit.jupiter.api.io.TempDir; + +@Tag("gravitino-docker-test") +public class FlinkPaimonLocalFileSystemBackendIT extends FlinkPaimonCatalogIT { + + @TempDir private static Path warehouseDir; + + private static final String DEFAULT_PAIMON_CATALOG = + "test_flink_paimon_filesystem_schema_catalog"; + + @Override + protected void createGravitinoCatalogByFlinkSql(String catalogName) { + tableEnv.executeSql( + String.format( + "create catalog %s with (" + + "'type'='gravitino-paimon', " + + "'warehouse'='%s'," + + "'metastore'='filesystem'" + + ")", + catalogName, warehouseDir)); + } + + @Override + protected String getPaimonCatalogName() { + return DEFAULT_PAIMON_CATALOG; + } + + @Override + protected Map<String, String> getPaimonCatalogOptions() { + return ImmutableMap.of( + PaimonConstants.CATALOG_BACKEND, "filesystem", "warehouse", warehouseDir.toString()); + } + + @Override + protected String getWarehouse() { + return warehouseDir.toString(); + } +}