This is an automated email from the ASF dual-hosted git repository.
fanng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/gravitino.git
The following commit(s) were added to refs/heads/main by this push:
new 5191e26130 [#6368] improvement(flink-connector): Add tests for REST
Catalog to the Flink connector for Iceberg (#6622)
5191e26130 is described below
commit 5191e261302fbcffcb0291799be8d27bf252b110
Author: Xiaojian Sun <[email protected]>
AuthorDate: Thu Mar 13 08:47:04 2025 +0800
[#6368] improvement(flink-connector): Add tests for REST Catalog to the
Flink connector for Iceberg (#6622)
### What changes were proposed in this pull request?
Add tests for REST Catalog to the Flink connector for Iceberg
### Why are the changes needed?
Fix: #6368
### Does this PR introduce _any_ user-facing change?
N/A
### How was this patch tested?
N/A
---
flink-connector/flink/build.gradle.kts | 2 +
.../connector/integration/test/FlinkCommonIT.java | 3 -
.../connector/integration/test/FlinkEnvIT.java | 58 ++++++++++++-
.../test/iceberg/FlinkIcebergCatalogIT.java | 8 +-
.../test/iceberg/FlinkIcebergHiveCatalogIT.java | 6 ++
.../test/iceberg/FlinkIcebergRestCatalogIT.java | 98 ++++++++++++++++++++++
6 files changed, 166 insertions(+), 9 deletions(-)
diff --git a/flink-connector/flink/build.gradle.kts
b/flink-connector/flink/build.gradle.kts
index 6cbfbfa53b..d3f09ffa8f 100644
--- a/flink-connector/flink/build.gradle.kts
+++ b/flink-connector/flink/build.gradle.kts
@@ -98,6 +98,8 @@ dependencies {
testImplementation(libs.testcontainers.mysql)
testImplementation(libs.metrics.core)
+ testImplementation("org.apache.iceberg:iceberg-core:$icebergVersion")
+
testImplementation("org.apache.iceberg:iceberg-hive-metastore:$icebergVersion")
testImplementation("org.apache.iceberg:iceberg-flink-runtime-$flinkMajorVersion:$icebergVersion")
testImplementation("org.apache.flink:flink-connector-hive_$scalaVersion:$flinkVersion")
testImplementation("org.apache.flink:flink-table-common:$flinkVersion")
diff --git
a/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/FlinkCommonIT.java
b/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/FlinkCommonIT.java
index 8ff6f8db7a..d6ba039968 100644
---
a/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/FlinkCommonIT.java
+++
b/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/FlinkCommonIT.java
@@ -77,8 +77,6 @@ public abstract class FlinkCommonIT extends FlinkEnvIT {
return true;
}
- protected abstract String getProvider();
-
protected abstract boolean supportDropCascade();
protected boolean supportsPrimaryKey() {
@@ -219,7 +217,6 @@ public abstract class FlinkCommonIT extends FlinkEnvIT {
Assertions.assertEquals("test comment", loadedSchema.comment());
Assertions.assertEquals("value1",
loadedSchema.properties().get("key1"));
Assertions.assertEquals("value2",
loadedSchema.properties().get("key2"));
-
Assertions.assertNotNull(loadedSchema.properties().get("location"));
TestUtils.assertTableResult(
sql("ALTER DATABASE %s SET ('key1'='new-value',
'key3'='value3')", schema),
diff --git
a/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/FlinkEnvIT.java
b/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/FlinkEnvIT.java
index 959123f336..d054fed6c3 100644
---
a/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/FlinkEnvIT.java
+++
b/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/FlinkEnvIT.java
@@ -24,6 +24,7 @@ import com.google.errorprone.annotations.FormatMethod;
import com.google.errorprone.annotations.FormatString;
import java.io.IOException;
import java.util.Collections;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;
@@ -37,11 +38,13 @@ import org.apache.flink.types.Row;
import org.apache.gravitino.Catalog;
import org.apache.gravitino.client.GravitinoMetalake;
import org.apache.gravitino.flink.connector.PropertiesConverter;
+import org.apache.gravitino.flink.connector.iceberg.IcebergPropertiesConstants;
import org.apache.gravitino.flink.connector.integration.test.utils.TestUtils;
import
org.apache.gravitino.flink.connector.store.GravitinoCatalogStoreFactoryOptions;
import org.apache.gravitino.integration.test.container.ContainerSuite;
import org.apache.gravitino.integration.test.container.HiveContainer;
import org.apache.gravitino.integration.test.util.BaseIT;
+import org.apache.gravitino.server.web.JettyServerConfig;
import org.apache.hadoop.fs.FileSystem;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
@@ -51,6 +54,9 @@ import org.slf4j.LoggerFactory;
public abstract class FlinkEnvIT extends BaseIT {
private static final Logger LOG = LoggerFactory.getLogger(FlinkEnvIT.class);
private static final ContainerSuite CONTAINER_SUITE =
ContainerSuite.getInstance();
+
+ protected static final String icebergRestServiceName = "iceberg-rest";
+
protected static final String GRAVITINO_METALAKE = "flink";
protected static final String DEFAULT_CATALOG = "default_catalog";
@@ -65,21 +71,30 @@ public abstract class FlinkEnvIT extends BaseIT {
private static String gravitinoUri = "http://127.0.0.1:8090";
+ private final String lakeHouseIcebergProvider = "lakehouse-iceberg";
+
+ protected String icebergRestServiceUri;
+
@BeforeAll
- void startUp() {
+ void startUp() throws Exception {
+ initHiveEnv();
+ if (lakeHouseIcebergProvider.equalsIgnoreCase(getProvider())) {
+ initIcebergRestServiceEnv();
+ }
// Start Gravitino server
+ super.startIntegrationTest();
initGravitinoEnv();
initMetalake();
- initHiveEnv();
initHdfsEnv();
initFlinkEnv();
LOG.info("Startup Flink env successfully, Gravitino uri: {}.",
gravitinoUri);
}
@AfterAll
- static void stop() {
+ void stop() throws IOException, InterruptedException {
stopFlinkEnv();
stopHdfsEnv();
+ super.stopIntegrationTest();
LOG.info("Stop Flink env successfully.");
}
@@ -87,10 +102,39 @@ public abstract class FlinkEnvIT extends BaseIT {
return PropertiesConverter.FLINK_PROPERTY_PREFIX + key;
}
+ protected abstract String getProvider();
+
+ private void initIcebergRestServiceEnv() {
+ ignoreIcebergRestService = false;
+ Map<String, String> icebergRestServiceConfigs = new HashMap<>();
+ icebergRestServiceConfigs.put(
+ "gravitino."
+ + icebergRestServiceName
+ + "."
+ + IcebergPropertiesConstants.GRAVITINO_ICEBERG_CATALOG_BACKEND,
+ IcebergPropertiesConstants.ICEBERG_CATALOG_BACKEND_HIVE);
+ icebergRestServiceConfigs.put(
+ "gravitino."
+ + icebergRestServiceName
+ + "."
+ + IcebergPropertiesConstants.GRAVITINO_ICEBERG_CATALOG_URI,
+ hiveMetastoreUri);
+ icebergRestServiceConfigs.put(
+ "gravitino."
+ + icebergRestServiceName
+ + "."
+ + IcebergPropertiesConstants.GRAVITINO_ICEBERG_CATALOG_WAREHOUSE,
+ warehouse);
+ registerCustomConfigs(icebergRestServiceConfigs);
+ }
+
private void initGravitinoEnv() {
// Gravitino server is already started by AbstractIT, just construct
gravitinoUrl
int gravitinoPort = getGravitinoServerPort();
gravitinoUri = String.format("http://127.0.0.1:%d", gravitinoPort);
+ if (lakeHouseIcebergProvider.equalsIgnoreCase(getProvider())) {
+ this.icebergRestServiceUri = getIcebergRestServiceUri();
+ }
}
private void initMetalake() {
@@ -212,4 +256,12 @@ public abstract class FlinkEnvIT extends BaseIT {
TestUtils.assertTableResult(deleteResult, ResultKind.SUCCESS);
}
}
+
+ private String getIcebergRestServiceUri() {
+ JettyServerConfig jettyServerConfig =
+ JettyServerConfig.fromConfig(
+ serverConfig, String.format("gravitino.%s.",
icebergRestServiceName));
+ return String.format(
+ "http://%s:%d/iceberg/", jettyServerConfig.getHost(),
jettyServerConfig.getHttpPort());
+ }
}
diff --git
a/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/iceberg/FlinkIcebergCatalogIT.java
b/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/iceberg/FlinkIcebergCatalogIT.java
index f8a3cdf2e1..8f41126ae6 100644
---
a/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/iceberg/FlinkIcebergCatalogIT.java
+++
b/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/iceberg/FlinkIcebergCatalogIT.java
@@ -100,7 +100,7 @@ public abstract class FlinkIcebergCatalogIT extends
FlinkCommonIT {
// Check the catalog properties.
org.apache.gravitino.Catalog gravitinoCatalog =
metalake.loadCatalog(catalogName);
Map<String, String> properties = gravitinoCatalog.properties();
- Assertions.assertEquals(hiveMetastoreUri,
properties.get(IcebergConstants.URI));
+ Assertions.assertEquals(getUri(), properties.get(IcebergConstants.URI));
// Get the created catalog.
Optional<org.apache.flink.table.catalog.Catalog> catalog =
tableEnv.getCatalog(catalogName);
@@ -153,14 +153,14 @@ public abstract class FlinkIcebergCatalogIT extends
FlinkCommonIT {
catalogName,
GravitinoIcebergCatalogFactoryOptions.IDENTIFIER,
getCatalogBackend(),
- hiveMetastoreUri,
+ getUri(),
warehouse));
Assertions.assertTrue(metalake.catalogExists(catalogName));
// Check the properties of the created catalog.
org.apache.gravitino.Catalog gravitinoCatalog =
metalake.loadCatalog(catalogName);
Map<String, String> properties = gravitinoCatalog.properties();
- Assertions.assertEquals(hiveMetastoreUri,
properties.get(IcebergConstants.URI));
+ Assertions.assertEquals(getUri(), properties.get(IcebergConstants.URI));
// Get the created catalog.
Optional<org.apache.flink.table.catalog.Catalog> catalog =
tableEnv.getCatalog(catalogName);
@@ -499,4 +499,6 @@ public abstract class FlinkIcebergCatalogIT extends
FlinkCommonIT {
}
protected abstract String getCatalogBackend();
+
+ protected abstract String getUri();
}
diff --git
a/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/iceberg/FlinkIcebergHiveCatalogIT.java
b/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/iceberg/FlinkIcebergHiveCatalogIT.java
index fc21ce2c24..014fd48d51 100644
---
a/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/iceberg/FlinkIcebergHiveCatalogIT.java
+++
b/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/iceberg/FlinkIcebergHiveCatalogIT.java
@@ -40,7 +40,13 @@ public class FlinkIcebergHiveCatalogIT extends
FlinkIcebergCatalogIT {
return catalogProperties;
}
+ @Override
protected String getCatalogBackend() {
return "hive";
}
+
+ @Override
+ protected String getUri() {
+ return hiveMetastoreUri;
+ }
}
diff --git
a/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/iceberg/FlinkIcebergRestCatalogIT.java
b/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/iceberg/FlinkIcebergRestCatalogIT.java
new file mode 100644
index 0000000000..e8a5b4eb06
--- /dev/null
+++
b/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/iceberg/FlinkIcebergRestCatalogIT.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.flink.connector.integration.test.iceberg;
+
+import com.google.common.collect.Maps;
+import java.util.Arrays;
+import java.util.Map;
+import org.apache.flink.table.api.ResultKind;
+import org.apache.flink.types.Row;
+import org.apache.gravitino.flink.connector.iceberg.IcebergPropertiesConstants;
+import org.apache.gravitino.flink.connector.integration.test.utils.TestUtils;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Tag;
+
+@Tag("gravitino-docker-test")
+public class FlinkIcebergRestCatalogIT extends FlinkIcebergCatalogIT {
+
+ @Override
+ protected Map<String, String> getCatalogConfigs() {
+ Map<String, String> catalogProperties = Maps.newHashMap();
+ catalogProperties.put(
+ IcebergPropertiesConstants.GRAVITINO_ICEBERG_CATALOG_BACKEND,
+ IcebergPropertiesConstants.ICEBERG_CATALOG_BACKEND_REST);
+ catalogProperties.put(
+ IcebergPropertiesConstants.GRAVITINO_ICEBERG_CATALOG_WAREHOUSE,
warehouse);
+ catalogProperties.put(
+ IcebergPropertiesConstants.GRAVITINO_ICEBERG_CATALOG_URI,
icebergRestServiceUri);
+ return catalogProperties;
+ }
+
+ @Override
+ public void testListSchema() {
+ doWithCatalog(
+ currentCatalog(),
+ catalog -> {
+ String schema = "test_list_schema";
+ String schema2 = "test_list_schema2";
+ String schema3 = "test_list_schema3";
+
+ try {
+ TestUtils.assertTableResult(
+ sql("CREATE DATABASE IF NOT EXISTS %s", schema),
ResultKind.SUCCESS);
+ TestUtils.assertTableResult(
+ sql("CREATE DATABASE IF NOT EXISTS %s", schema2),
ResultKind.SUCCESS);
+ TestUtils.assertTableResult(
+ sql("CREATE DATABASE IF NOT EXISTS %s", schema3),
ResultKind.SUCCESS);
+ TestUtils.assertTableResult(
+ sql("SHOW DATABASES"),
+ ResultKind.SUCCESS_WITH_CONTENT,
+ Row.of("default"),
+ Row.of(schema),
+ Row.of(schema2),
+ Row.of(schema3));
+
+ String[] schemas = catalog.asSchemas().listSchemas();
+ Arrays.sort(schemas);
+ Assertions.assertEquals(4, schemas.length);
+ Assertions.assertEquals("default", schemas[0]);
+ Assertions.assertEquals(schema, schemas[1]);
+ Assertions.assertEquals(schema2, schemas[2]);
+ Assertions.assertEquals(schema3, schemas[3]);
+ } finally {
+ catalog.asSchemas().dropSchema(schema, supportDropCascade());
+ catalog.asSchemas().dropSchema(schema2, supportDropCascade());
+ catalog.asSchemas().dropSchema(schema3, supportDropCascade());
+ // TODO: The check cannot pass in CI, but it can be successful
locally.
+ // Assertions.assertEquals(1,
catalog.asSchemas().listSchemas().length);
+ }
+ });
+ }
+
+ @Override
+ protected String getCatalogBackend() {
+ return "rest";
+ }
+
+ @Override
+ protected String getUri() {
+ return icebergRestServiceUri;
+ }
+}