lasdf1234 commented on code in PR #10560: URL: https://github.com/apache/gravitino/pull/10560#discussion_r3014079308
########## flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/session/FlinkSupportsSessionCatalogIT.java: ########## @@ -0,0 +1,337 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.gravitino.flink.connector.integration.test.session; + +import com.google.common.base.Preconditions; +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.Arrays; +import java.util.Optional; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.table.api.EnvironmentSettings; +import org.apache.flink.table.api.TableEnvironment; +import org.apache.flink.table.catalog.Catalog; +import org.apache.flink.table.catalog.CatalogDescriptor; +import org.apache.flink.table.catalog.CommonCatalogOptions; +import org.apache.flink.table.catalog.hive.factories.HiveCatalogFactoryOptions; +import org.apache.gravitino.catalog.hive.HiveConstants; +import org.apache.gravitino.flink.connector.hive.GravitinoHiveCatalogFactoryOptions; +import org.apache.gravitino.flink.connector.integration.test.FlinkEnvIT; +import org.apache.gravitino.flink.connector.store.GravitinoCatalogStoreFactoryOptions; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Tag; +import org.junit.jupiter.api.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Integration tests verifying that the {@code gravitino.supportsSessionCatalog} option correctly + * routes catalog operations to the appropriate store: + * + * <ul> + * <li>Gravitino-managed catalog types (e.g. {@code gravitino-hive}) are persisted to the + * Gravitino server via {@link + * org.apache.gravitino.flink.connector.store.GravitinoCatalogStore}. + * <li>Session-scoped catalog types (e.g. {@code generic_in_memory}) are held in memory only and + * never reach the Gravitino server. + * <li>Both stores contribute to the catalog list returned by {@code SHOW CATALOGS}. + * </ul> + * + * <p>Requires a running Hive Docker container for the Gravitino-managed catalog tests. + */ +@Tag("gravitino-docker-test") +public class FlinkSupportsSessionCatalogIT extends FlinkEnvIT { + + private static final Logger LOG = LoggerFactory.getLogger(FlinkSupportsSessionCatalogIT.class); + + private static String hiveConfDir; + + @Override + protected String getProvider() { + return "hive"; + } + + /** + * Overrides the default Flink environment to enable {@code + * gravitino.supportsSessionCatalog=true}, which wires a {@link + * org.apache.gravitino.flink.connector.store.GravitinoSessionCatalogStore} as the catalog store. + */ + @Override + protected void initFlinkEnv() { + initHiveConfDir(); + final Configuration configuration = new Configuration(); + configuration.setString( + "table.catalog-store.kind", GravitinoCatalogStoreFactoryOptions.GRAVITINO); + configuration.setString("table.catalog-store.gravitino.gravitino.metalake", GRAVITINO_METALAKE); + configuration.setString("table.catalog-store.gravitino.gravitino.uri", gravitinoUri); + configuration.setBoolean( + "table.catalog-store.gravitino.gravitino.supportsSessionCatalog", true); + EnvironmentSettings settings = + EnvironmentSettings.newInstance().withConfiguration(configuration).inBatchMode().build(); + tableEnv = TableEnvironment.create(settings); + LOG.info( + "Flink env with supportsSessionCatalog=true initialized, Gravitino uri: {}.", gravitinoUri); + } + + @BeforeAll + void sessionCatalogStartUp() { + Preconditions.checkArgument(metalake != null, "metalake should not be null"); + LOG.info("FlinkSupportsSessionCatalogIT startup complete."); + } + + @AfterAll + static void sessionCatalogStop() { + Preconditions.checkArgument(metalake != null, "metalake should not be null"); + LOG.info("FlinkSupportsSessionCatalogIT teardown complete."); + } + + /** + * A Gravitino-managed catalog type ({@code gravitino-hive}) created via {@link + * TableEnvironment#createCatalog} must be persisted to the Gravitino server. + */ + @Test + public void testCreateGravitinoHiveCatalog() { + tableEnv.useCatalog(DEFAULT_CATALOG); + int numCatalogs = tableEnv.listCatalogs().length; + String catalogName = "session_it_gravitino_hive"; + + Configuration conf = new Configuration(); + conf.set(CommonCatalogOptions.CATALOG_TYPE, GravitinoHiveCatalogFactoryOptions.IDENTIFIER); + conf.set(HiveCatalogFactoryOptions.HIVE_CONF_DIR, hiveConfDir); + conf.set(GravitinoHiveCatalogFactoryOptions.HIVE_METASTORE_URIS, hiveMetastoreUri); + CatalogDescriptor descriptor = CatalogDescriptor.of(catalogName, conf); + tableEnv.createCatalog(catalogName, descriptor); + + try { + Assertions.assertTrue( + metalake.catalogExists(catalogName), + "Gravitino-managed catalog must be persisted to the Gravitino server"); + String[] catalogs = tableEnv.listCatalogs(); + Assertions.assertEquals(numCatalogs + 1, catalogs.length, "Should create a new catalog"); + Assertions.assertTrue( + Arrays.asList(catalogs).contains(catalogName), "Should contain the created catalog"); + org.apache.gravitino.Catalog gravitinoCatalog = metalake.loadCatalog(catalogName); + Assertions.assertEquals( + hiveMetastoreUri, gravitinoCatalog.properties().get(HiveConstants.METASTORE_URIS)); + } finally { + tableEnv.useCatalog(DEFAULT_CATALOG); + tableEnv.executeSql("DROP CATALOG " + catalogName); + Assertions.assertFalse(metalake.catalogExists(catalogName)); + } + } + + /** + * A Gravitino-managed catalog type ({@code gravitino-hive}) created via Flink SQL {@code CREATE + * CATALOG} must be persisted to the Gravitino server. + */ + @Test + public void testCreateGravitinoHiveCatalogUsingSQL() { + tableEnv.useCatalog(DEFAULT_CATALOG); + int numCatalogs = tableEnv.listCatalogs().length; + String catalogName = "session_it_gravitino_hive_sql"; + + tableEnv.executeSql( + String.format( + "CREATE CATALOG %s WITH (" + + "'type'='%s'," + + "'hive-conf-dir'='%s'," + + "'hive.metastore.uris'='%s'" + + ")", + catalogName, + GravitinoHiveCatalogFactoryOptions.IDENTIFIER, + hiveConfDir, + hiveMetastoreUri)); + + try { + Assertions.assertTrue( + metalake.catalogExists(catalogName), + "Gravitino-managed catalog must be persisted to the Gravitino server"); + String[] catalogs = tableEnv.listCatalogs(); + Assertions.assertEquals(numCatalogs + 1, catalogs.length, "Should create a new catalog"); + Assertions.assertTrue( + Arrays.asList(catalogs).contains(catalogName), "Should contain the created catalog"); + } finally { + tableEnv.useCatalog(DEFAULT_CATALOG); + tableEnv.executeSql("DROP CATALOG " + catalogName); + Assertions.assertFalse(metalake.catalogExists(catalogName)); + } + } + + /** + * A session-scoped catalog type ({@code generic_in_memory}) created via Flink SQL must be + * accessible in Flink but must NOT be persisted to the Gravitino server. + */ + @Test + public void testCreateSessionScopedCatalog() { + tableEnv.useCatalog(DEFAULT_CATALOG); + int numCatalogs = tableEnv.listCatalogs().length; + String catalogName = "session_it_memory_catalog"; + + tableEnv.executeSql( + String.format("CREATE CATALOG %s WITH ('type'='generic_in_memory')", catalogName)); + + try { + Assertions.assertFalse( + metalake.catalogExists(catalogName), + "Session-scoped catalog must NOT be persisted to the Gravitino server"); + String[] catalogs = tableEnv.listCatalogs(); + Assertions.assertEquals(numCatalogs + 1, catalogs.length, "Should create a new catalog"); + Assertions.assertTrue( + Arrays.asList(catalogs).contains(catalogName), "Should contain the created catalog"); + } finally { + tableEnv.useCatalog(DEFAULT_CATALOG); + tableEnv.executeSql("DROP CATALOG " + catalogName); + } + } + + /** + * {@code SHOW CATALOGS} must return catalogs from both the Gravitino-backed store and the + * in-memory store when {@code supportsSessionCatalog=true}. + */ + @Test + public void testListCatalogsReturnsBothStores() { + tableEnv.useCatalog(DEFAULT_CATALOG); + int numCatalogs = tableEnv.listCatalogs().length; + String gravitinoCatalogName = "session_it_list_gravitino_hive"; + String sessionCatalogName = "session_it_list_memory_catalog"; + + tableEnv.executeSql( + String.format( + "CREATE CATALOG %s WITH (" + + "'type'='%s'," + + "'hive-conf-dir'='%s'," + + "'hive.metastore.uris'='%s'" + + ")", + gravitinoCatalogName, + GravitinoHiveCatalogFactoryOptions.IDENTIFIER, + hiveConfDir, + hiveMetastoreUri)); + tableEnv.executeSql( + String.format("CREATE CATALOG %s WITH ('type'='generic_in_memory')", sessionCatalogName)); + + try { + String[] catalogs = tableEnv.listCatalogs(); + Assertions.assertEquals(numCatalogs + 2, catalogs.length, "Should have two more catalogs"); + Assertions.assertTrue( + Arrays.asList(catalogs).contains(gravitinoCatalogName), + "Gravitino-managed catalog must appear in SHOW CATALOGS"); + Assertions.assertTrue( + Arrays.asList(catalogs).contains(sessionCatalogName), + "Session-scoped catalog must appear in SHOW CATALOGS"); + } finally { + tableEnv.useCatalog(DEFAULT_CATALOG); + tableEnv.executeSql("DROP CATALOG " + gravitinoCatalogName); + tableEnv.executeSql("DROP CATALOG " + sessionCatalogName); + Assertions.assertFalse(metalake.catalogExists(gravitinoCatalogName)); + } + } + + /** + * Dropping a Gravitino-managed catalog via Flink SQL must remove it from the Gravitino server and + * from the Flink catalog list. + */ + @Test + public void testDropGravitinoHiveCatalog() { + tableEnv.useCatalog(DEFAULT_CATALOG); + String catalogName = "session_it_drop_gravitino_hive"; + + tableEnv.executeSql( + String.format( + "CREATE CATALOG %s WITH (" + + "'type'='%s'," + + "'hive-conf-dir'='%s'," + + "'hive.metastore.uris'='%s'" + + ")", + catalogName, + GravitinoHiveCatalogFactoryOptions.IDENTIFIER, + hiveConfDir, + hiveMetastoreUri)); + Assertions.assertTrue(metalake.catalogExists(catalogName)); + + tableEnv.executeSql("DROP CATALOG " + catalogName); + + Assertions.assertFalse( + metalake.catalogExists(catalogName), + "Dropped Gravitino-managed catalog must be removed from the Gravitino server"); + Optional<Catalog> droppedCatalog = tableEnv.getCatalog(catalogName); + Assertions.assertFalse(droppedCatalog.isPresent(), "Catalog should be dropped"); + } + + /** + * Dropping a session-scoped catalog via Flink SQL must remove it from the in-memory store without + * error, since it was never registered in Gravitino. + */ + @Test + public void testDropSessionScopedCatalog() { + tableEnv.useCatalog(DEFAULT_CATALOG); + String catalogName = "session_it_drop_memory_catalog"; + + tableEnv.executeSql( + String.format("CREATE CATALOG %s WITH ('type'='generic_in_memory')", catalogName)); + Assertions.assertFalse( + metalake.catalogExists(catalogName), + "Session-scoped catalog must NOT be in Gravitino before drop"); + + tableEnv.executeSql("DROP CATALOG " + catalogName); + + Optional<Catalog> droppedCatalog = tableEnv.getCatalog(catalogName); + Assertions.assertFalse(droppedCatalog.isPresent(), "Catalog should be dropped"); + } + + private static void initHiveConfDir() { + if (hiveConfDir != null) { + return; + } + try { + java.nio.file.Path dir = java.nio.file.Files.createTempDirectory("flink-session-hive-conf"); + java.nio.file.Path hiveSite = dir.resolve("hive-site.xml"); + String hiveSiteXml = + "<?xml version=\"1.0\" encoding=\"UTF-8\"?>\n" Review Comment: Got resolved it. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
