This is an automated email from the ASF dual-hosted git repository.

fanng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/gravitino.git


The following commit(s) were added to refs/heads/main by this push:
     new fd26b565ae [#3515] feat(flink-connector): Support flink iceberg catalog (#5914)
fd26b565ae is described below

commit fd26b565aef1527fcd126b89c4e110180fa00a83
Author: Xiaojian Sun <sunxiaojian...@163.com>
AuthorDate: Thu Jan 16 15:56:55 2025 +0800

    [#3515] feat(flink-connector): Support flink iceberg catalog (#5914)
    
    ### What changes were proposed in this pull request?
    
    Support flink iceberg catalog
    
    ### Why are the changes needed?
    
    Fix: [#3515](https://github.com/apache/gravitino/issues/3515)
    
    ### Does this PR introduce _any_ user-facing change?
    no
    
    ### How was this patch tested?
    FlinkIcebergCatalogIT
    FlinkIcebergHiveCatalogIT
---
 docs/flink-connector/flink-catalog-iceberg.md      |  78 +++++
 flink-connector/flink/build.gradle.kts             |   6 +
 .../connector/catalog/GravitinoCatalogManager.java |  11 -
 .../connector/iceberg/GravitinoIcebergCatalog.java |  67 +++++
 .../iceberg/GravitinoIcebergCatalogFactory.java    |  95 ++++++
 .../GravitinoIcebergCatalogFactoryOptions.java     |  33 +++
 .../iceberg/IcebergPropertiesConstants.java        |  49 ++++
 .../iceberg/IcebergPropertiesConverter.java        |  84 ++++++
 .../org.apache.flink.table.factories.Factory       |   3 +-
 .../iceberg/TestIcebergPropertiesConverter.java    |  82 ++++++
 .../connector/integration/test/FlinkCommonIT.java  |  62 ++--
 .../connector/integration/test/FlinkEnvIT.java     |  37 ++-
 .../integration/test/hive/FlinkHiveCatalogIT.java  |  42 +--
 .../FlinkIcebergCatalogIT.java}                    | 325 +++++++--------------
 .../test/iceberg/FlinkIcebergHiveCatalogIT.java    |  46 +++
 .../test/paimon/FlinkPaimonCatalogIT.java          |  21 +-
 .../integration/test/utils/TestUtils.java          |   3 +-
 17 files changed, 757 insertions(+), 287 deletions(-)

diff --git a/docs/flink-connector/flink-catalog-iceberg.md 
b/docs/flink-connector/flink-catalog-iceberg.md
new file mode 100644
index 0000000000..54d7c0879f
--- /dev/null
+++ b/docs/flink-connector/flink-catalog-iceberg.md
@@ -0,0 +1,78 @@
+---
+title: "Flink connector Iceberg catalog"
+slug: /flink-connector/flink-catalog-iceberg
+keyword: flink connector iceberg catalog
+license: "This software is licensed under the Apache License version 2."
+---
+
+The Apache Gravitino Flink connector can be used to read and write Iceberg tables, with the metadata managed by the Gravitino server.
+To enable the Flink connector, you must download the Iceberg Flink runtime JAR and place it in the Flink classpath.
+
+## Capabilities
+
+#### Supported DML and DDL operations:
+
+- `CREATE CATALOG`
+- `CREATE DATABASE`
+- `CREATE TABLE`
+- `DROP TABLE`
+- `ALTER TABLE`
+- `INSERT INTO & OVERWRITE`
+- `SELECT`
+
+#### Operations not supported:
+
+- Partition operations
+- View operations
+- Metadata tables, like:
+  - `{iceberg_catalog}.{iceberg_database}.{iceberg_table}$snapshots`
+- Querying UDFs
+- `UPDATE` clause
+- `DELETE` clause
+- `CREATE TABLE LIKE` clause
+
+## SQL example
+
+```sql
+-- Suppose iceberg_a is the Iceberg catalog name managed by Gravitino
+
+USE iceberg_a;
+
+CREATE DATABASE IF NOT EXISTS mydatabase;
+USE mydatabase;
+
+CREATE TABLE sample (
+    id BIGINT COMMENT 'unique id',
+    data STRING NOT NULL
+) PARTITIONED BY (data) 
+WITH ('format-version'='2');
+
+INSERT INTO sample
+VALUES (1, 'A'), (2, 'B');
+
+SELECT * FROM sample WHERE data = 'B';
+
+```
+
+## Catalog properties
+
+The Gravitino Flink connector transforms the following Gravitino catalog properties into Flink Iceberg connector configuration.
+
+| Gravitino catalog property name | Flink Iceberg connector configuration | Description                                                                                                                      | Since Version    |
+|---------------------------------|---------------------------------------|----------------------------------------------------------------------------------------------------------------------------------|------------------|
+| `catalog-backend`               | `catalog-type`                        | Catalog backend type. Currently only the `Hive` backend is supported; the `JDBC` and `REST` backends are still being validated. | 0.8.0-incubating |
+| `uri`                           | `uri`                                 | Catalog backend URI.                                                                                                             | 0.8.0-incubating |
+| `warehouse`                     | `warehouse`                           | Catalog backend warehouse.                                                                                                       | 0.8.0-incubating |
+| `io-impl`                       | `io-impl`                             | The IO implementation for `FileIO` in Iceberg.                                                                                   | 0.8.0-incubating |
+| `oss-endpoint`                  | `oss.endpoint`                        | The endpoint of the Aliyun OSS service.                                                                                          | 0.8.0-incubating |
+| `oss-access-key-id`             | `client.access-key-id`                | The static access key ID used to access OSS data.                                                                                | 0.8.0-incubating |
+| `oss-secret-access-key`         | `client.access-key-secret`            | The static secret access key used to access OSS data.                                                                            | 0.8.0-incubating |
+
+Gravitino catalog property names with the prefix `flink.bypass.` are passed directly to the Flink Iceberg connector. For example, use `flink.bypass.clients` to pass the `clients` property to the Flink Iceberg connector.
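+
+As a minimal sketch (assuming the bypassed property is stored on the Gravitino catalog as-is; the URI and warehouse values below are placeholders), a catalog created in Flink SQL could forward Iceberg's `clients` pool-size property like this:
+
+```sql
+CREATE CATALOG iceberg_catalog WITH (
+    'type'='gravitino-iceberg',
+    'catalog-backend'='hive',
+    'uri'='thrift://hive-metastore:9083',
+    'warehouse'='hdfs://namenode:8020/user/iceberg/warehouse',
+    -- forwarded to the Flink Iceberg connector as 'clients'
+    'flink.bypass.clients'='2'
+);
+```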
+
+## Storage
+
+### OSS
+
+Additionally, you need to download the [Aliyun OSS SDK](https://gosspublic.alicdn.com/sdks/java/aliyun_java_sdk_3.10.2.zip), and copy `aliyun-sdk-oss-3.10.2.jar`, `hamcrest-core-1.1.jar`, and `jdom2-2.0.6.jar` to the Flink classpath.
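+
+A minimal sketch of an OSS-backed catalog (assuming `org.apache.iceberg.aliyun.oss.OSSFileIO` as the `FileIO` implementation; every endpoint, key, bucket, and URI value below is a placeholder):
+
+```sql
+CREATE CATALOG iceberg_oss_catalog WITH (
+    'type'='gravitino-iceberg',
+    'catalog-backend'='hive',
+    'uri'='thrift://hive-metastore:9083',
+    'warehouse'='oss://my-bucket/iceberg/warehouse',
+    'io-impl'='org.apache.iceberg.aliyun.oss.OSSFileIO',
+    'oss-endpoint'='https://oss-cn-hangzhou.aliyuncs.com',
+    'oss-access-key-id'='<access-key-id>',
+    'oss-secret-access-key'='<secret-access-key>'
+);
+```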
diff --git a/flink-connector/flink/build.gradle.kts 
b/flink-connector/flink/build.gradle.kts
index f137a3eae1..4c9bd036ae 100644
--- a/flink-connector/flink/build.gradle.kts
+++ b/flink-connector/flink/build.gradle.kts
@@ -30,6 +30,8 @@ var paimonVersion: String = libs.versions.paimon.get()
 val flinkVersion: String = libs.versions.flink.get()
 val flinkMajorVersion: String = flinkVersion.substringBeforeLast(".")
 
+val icebergVersion: String = libs.versions.iceberg.get()
+
 // The Flink only support scala 2.12, and all scala api will be removed in a 
future version.
 // You can find more detail at the following issues:
 // https://issues.apache.org/jira/browse/FLINK-23986,
@@ -44,6 +46,8 @@ dependencies {
   implementation(libs.guava)
 
   compileOnly(project(":clients:client-java-runtime", configuration = 
"shadow"))
+
+  
compileOnly("org.apache.iceberg:iceberg-flink-runtime-$flinkMajorVersion:$icebergVersion")
   
compileOnly("org.apache.flink:flink-connector-hive_$scalaVersion:$flinkVersion")
   compileOnly("org.apache.flink:flink-table-common:$flinkVersion")
   compileOnly("org.apache.flink:flink-table-api-java:$flinkVersion")
@@ -88,7 +92,9 @@ dependencies {
   testImplementation(libs.testcontainers)
   testImplementation(libs.testcontainers.junit.jupiter)
   testImplementation(libs.testcontainers.mysql)
+  testImplementation(libs.metrics.core)
 
+  
testImplementation("org.apache.iceberg:iceberg-flink-runtime-$flinkMajorVersion:$icebergVersion")
   
testImplementation("org.apache.flink:flink-connector-hive_$scalaVersion:$flinkVersion")
   testImplementation("org.apache.flink:flink-table-common:$flinkVersion")
   testImplementation("org.apache.flink:flink-table-api-java:$flinkVersion")
diff --git 
a/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/catalog/GravitinoCatalogManager.java
 
b/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/catalog/GravitinoCatalogManager.java
index 7693e5d4c9..0b0b89f3a5 100644
--- 
a/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/catalog/GravitinoCatalogManager.java
+++ 
b/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/catalog/GravitinoCatalogManager.java
@@ -35,12 +35,10 @@ public class GravitinoCatalogManager {
   private static GravitinoCatalogManager gravitinoCatalogManager;
 
   private volatile boolean isClosed = false;
-  private final String metalakeName;
   private final GravitinoMetalake metalake;
   private final GravitinoAdminClient gravitinoClient;
 
   private GravitinoCatalogManager(String gravitinoUri, String metalakeName) {
-    this.metalakeName = metalakeName;
     this.gravitinoClient = GravitinoAdminClient.builder(gravitinoUri).build();
     this.metalake = gravitinoClient.loadMetalake(metalakeName);
   }
@@ -99,15 +97,6 @@ public class GravitinoCatalogManager {
     return catalog;
   }
 
-  /**
-   * Get the metalake.
-   *
-   * @return the metalake name.
-   */
-  public String getMetalakeName() {
-    return metalakeName;
-  }
-
   /**
    * Create catalog in Gravitino.
    *
diff --git 
a/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/iceberg/GravitinoIcebergCatalog.java
 
b/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/iceberg/GravitinoIcebergCatalog.java
new file mode 100644
index 0000000000..30fac96bbc
--- /dev/null
+++ 
b/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/iceberg/GravitinoIcebergCatalog.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.flink.connector.iceberg;
+
+import java.util.Map;
+import java.util.Optional;
+import org.apache.flink.table.catalog.AbstractCatalog;
+import org.apache.flink.table.catalog.exceptions.CatalogException;
+import org.apache.flink.table.factories.Factory;
+import org.apache.gravitino.flink.connector.PartitionConverter;
+import org.apache.gravitino.flink.connector.PropertiesConverter;
+import org.apache.gravitino.flink.connector.catalog.BaseCatalog;
+import org.apache.iceberg.flink.FlinkCatalog;
+import org.apache.iceberg.flink.FlinkCatalogFactory;
+
+/** A Gravitino Flink catalog that delegates all Iceberg operations to Iceberg's {@link FlinkCatalog}. */
+public class GravitinoIcebergCatalog extends BaseCatalog {
+
+  private final FlinkCatalog icebergCatalog;
+
+  protected GravitinoIcebergCatalog(
+      String catalogName,
+      String defaultDatabase,
+      PropertiesConverter propertiesConverter,
+      PartitionConverter partitionConverter,
+      Map<String, String> properties) {
+    super(catalogName, defaultDatabase, propertiesConverter, 
partitionConverter);
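+    // Delegate to Iceberg's own FlinkCatalog, built from the already-translated Flink properties.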
+    FlinkCatalogFactory flinkCatalogFactory = new FlinkCatalogFactory();
+    this.icebergCatalog = (FlinkCatalog) 
flinkCatalogFactory.createCatalog(catalogName, properties);
+  }
+
+  @Override
+  public void open() throws CatalogException {
+    icebergCatalog.open();
+  }
+
+  @Override
+  public void close() throws CatalogException {
+    icebergCatalog.close();
+  }
+
+  @Override
+  public Optional<Factory> getFactory() {
+    return icebergCatalog.getFactory();
+  }
+
+  @Override
+  protected AbstractCatalog realCatalog() {
+    return icebergCatalog;
+  }
+}
diff --git 
a/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/iceberg/GravitinoIcebergCatalogFactory.java
 
b/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/iceberg/GravitinoIcebergCatalogFactory.java
new file mode 100644
index 0000000000..ad0363d986
--- /dev/null
+++ 
b/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/iceberg/GravitinoIcebergCatalogFactory.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.flink.connector.iceberg;
+
+import java.util.Collections;
+import java.util.Set;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.table.catalog.Catalog;
+import org.apache.flink.table.factories.FactoryUtil;
+import org.apache.gravitino.flink.connector.DefaultPartitionConverter;
+import org.apache.gravitino.flink.connector.PartitionConverter;
+import org.apache.gravitino.flink.connector.PropertiesConverter;
+import org.apache.gravitino.flink.connector.catalog.BaseCatalogFactory;
+import org.apache.gravitino.flink.connector.utils.FactoryUtils;
+
+public class GravitinoIcebergCatalogFactory implements BaseCatalogFactory {
+
+  @Override
+  public Catalog createCatalog(Context context) {
+    final FactoryUtil.CatalogFactoryHelper helper =
+        FactoryUtils.createCatalogFactoryHelper(this, context);
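+    // Read the default database from the helper-validated options; all context options are passed through to the Iceberg catalog.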
+    return new GravitinoIcebergCatalog(
+        context.getName(),
+        
helper.getOptions().get(GravitinoIcebergCatalogFactoryOptions.DEFAULT_DATABASE),
+        propertiesConverter(),
+        partitionConverter(),
+        context.getOptions());
+  }
+
+  @Override
+  public String factoryIdentifier() {
+    return GravitinoIcebergCatalogFactoryOptions.IDENTIFIER;
+  }
+
+  @Override
+  public Set<ConfigOption<?>> requiredOptions() {
+    return Collections.emptySet();
+  }
+
+  @Override
+  public Set<ConfigOption<?>> optionalOptions() {
+    return Collections.emptySet();
+  }
+
+  /**
+   * Define the Gravitino catalog provider.
+   *
+   * @return the Gravitino catalog provider name.
+   */
+  @Override
+  public String gravitinoCatalogProvider() {
+    return "lakehouse-iceberg";
+  }
+
+  /**
+   * Define the Gravitino catalog type.
+   *
+   * @return the Gravitino catalog type.
+   */
+  @Override
+  public org.apache.gravitino.Catalog.Type gravitinoCatalogType() {
+    return org.apache.gravitino.Catalog.Type.RELATIONAL;
+  }
+
+  /**
+   * Define the properties converter.
+   *
+   * @return the converter used to translate Gravitino catalog properties to Flink properties.
+   */
+  @Override
+  public PropertiesConverter propertiesConverter() {
+    return IcebergPropertiesConverter.INSTANCE;
+  }
+
+  @Override
+  public PartitionConverter partitionConverter() {
+    return DefaultPartitionConverter.INSTANCE;
+  }
+}
diff --git 
a/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/iceberg/GravitinoIcebergCatalogFactoryOptions.java
 
b/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/iceberg/GravitinoIcebergCatalogFactoryOptions.java
new file mode 100644
index 0000000000..95e1a21de8
--- /dev/null
+++ 
b/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/iceberg/GravitinoIcebergCatalogFactoryOptions.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.flink.connector.iceberg;
+
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.iceberg.flink.FlinkCatalogFactory;
+
+public class GravitinoIcebergCatalogFactoryOptions {
+
+  public static final String IDENTIFIER = "gravitino-iceberg";
+  public static final ConfigOption<String> DEFAULT_DATABASE =
+      ConfigOptions.key(FlinkCatalogFactory.DEFAULT_DATABASE)
+          .stringType()
+          .defaultValue(FlinkCatalogFactory.DEFAULT_DATABASE_NAME);
+}
diff --git 
a/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/iceberg/IcebergPropertiesConstants.java
 
b/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/iceberg/IcebergPropertiesConstants.java
new file mode 100644
index 0000000000..163cfac882
--- /dev/null
+++ 
b/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/iceberg/IcebergPropertiesConstants.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.flink.connector.iceberg;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.gravitino.catalog.lakehouse.iceberg.IcebergConstants;
+import org.apache.iceberg.CatalogProperties;
+import org.apache.iceberg.CatalogUtil;
+import org.apache.iceberg.flink.FlinkCatalogFactory;
+
+public class IcebergPropertiesConstants {
+  @VisibleForTesting
+  public static String GRAVITINO_ICEBERG_CATALOG_BACKEND = 
IcebergConstants.CATALOG_BACKEND;
+
+  public static final String ICEBERG_CATALOG_TYPE = 
FlinkCatalogFactory.ICEBERG_CATALOG_TYPE;
+
+  public static final String GRAVITINO_ICEBERG_CATALOG_WAREHOUSE = 
IcebergConstants.WAREHOUSE;
+
+  public static final String ICEBERG_CATALOG_WAREHOUSE = 
CatalogProperties.WAREHOUSE_LOCATION;
+
+  public static final String GRAVITINO_ICEBERG_CATALOG_URI = 
IcebergConstants.URI;
+
+  public static final String ICEBERG_CATALOG_URI = CatalogProperties.URI;
+
+  @VisibleForTesting
+  public static String ICEBERG_CATALOG_BACKEND_HIVE = 
CatalogUtil.ICEBERG_CATALOG_TYPE_HIVE;
+
+  public static final String GRAVITINO_ICEBERG_CATALOG_BACKEND_HIVE = "hive";
+
+  @VisibleForTesting
+  public static final String ICEBERG_CATALOG_BACKEND_REST = 
CatalogUtil.ICEBERG_CATALOG_TYPE_REST;
+}
diff --git 
a/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/iceberg/IcebergPropertiesConverter.java
 
b/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/iceberg/IcebergPropertiesConverter.java
new file mode 100644
index 0000000000..7684d3eadb
--- /dev/null
+++ 
b/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/iceberg/IcebergPropertiesConverter.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.flink.connector.iceberg;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableMap;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.flink.table.catalog.CommonCatalogOptions;
+import org.apache.gravitino.catalog.lakehouse.iceberg.IcebergConstants;
+import org.apache.gravitino.catalog.lakehouse.iceberg.IcebergPropertiesUtils;
+import org.apache.gravitino.flink.connector.PropertiesConverter;
+
+public class IcebergPropertiesConverter implements PropertiesConverter {
+  public static final IcebergPropertiesConverter INSTANCE = new IcebergPropertiesConverter();
+
+  private IcebergPropertiesConverter() {}
+
+  private static final Map<String, String> GRAVITINO_CONFIG_TO_FLINK_ICEBERG =
+      ImmutableMap.of(
+          IcebergConstants.CATALOG_BACKEND, 
IcebergPropertiesConstants.ICEBERG_CATALOG_TYPE);
+
+  @Override
+  public Map<String, String> toFlinkCatalogProperties(Map<String, String> 
gravitinoProperties) {
+    Preconditions.checkArgument(
+        gravitinoProperties != null, "Iceberg Catalog properties should not be 
null.");
+
+    Map<String, String> all = new HashMap<>();
+    if (gravitinoProperties != null) {
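+      // Properties prefixed with "flink.bypass." are forwarded to the Flink Iceberg connector with the prefix stripped.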
+      gravitinoProperties.forEach(
+          (k, v) -> {
+            if (k.startsWith(FLINK_PROPERTY_PREFIX)) {
+              String newKey = k.substring(FLINK_PROPERTY_PREFIX.length());
+              all.put(newKey, v);
+            }
+          });
+    }
+    Map<String, String> transformedProperties =
+        IcebergPropertiesUtils.toIcebergCatalogProperties(gravitinoProperties);
+
+    if (transformedProperties != null) {
+      all.putAll(transformedProperties);
+    }
+    all.put(
+        CommonCatalogOptions.CATALOG_TYPE.key(), 
GravitinoIcebergCatalogFactoryOptions.IDENTIFIER);
+    // Map "catalog-backend" to "catalog-type".
+    // TODO If catalog backend is CUSTOM, we need special compatibility logic.
+    GRAVITINO_CONFIG_TO_FLINK_ICEBERG.forEach(
+        (key, value) -> {
+          if (all.containsKey(key)) {
+            String config = all.remove(key);
+            all.put(value, config);
+          }
+        });
+    return all;
+  }
+
+  @Override
+  public Map<String, String> toGravitinoTableProperties(Map<String, String> 
properties) {
+    return new HashMap<>(properties);
+  }
+
+  @Override
+  public Map<String, String> toFlinkTableProperties(Map<String, String> 
properties) {
+    return new HashMap<>(properties);
+  }
+}
diff --git 
a/flink-connector/flink/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
 
b/flink-connector/flink/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
index a535afb6dc..45ff2512e7 100644
--- 
a/flink-connector/flink/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
+++ 
b/flink-connector/flink/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
@@ -19,4 +19,5 @@
 
 org.apache.gravitino.flink.connector.store.GravitinoCatalogStoreFactory
 org.apache.gravitino.flink.connector.hive.GravitinoHiveCatalogFactory
-org.apache.gravitino.flink.connector.paimon.GravitinoPaimonCatalogFactory
\ No newline at end of file
+org.apache.gravitino.flink.connector.paimon.GravitinoPaimonCatalogFactory
+org.apache.gravitino.flink.connector.iceberg.GravitinoIcebergCatalogFactory
diff --git 
a/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/iceberg/TestIcebergPropertiesConverter.java
 
b/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/iceberg/TestIcebergPropertiesConverter.java
new file mode 100644
index 0000000000..d6de522f39
--- /dev/null
+++ 
b/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/iceberg/TestIcebergPropertiesConverter.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.flink.connector.iceberg;
+
+import com.google.common.collect.ImmutableMap;
+import java.util.Map;
+import org.apache.flink.table.catalog.CommonCatalogOptions;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+public class TestIcebergPropertiesConverter {
+  private static final IcebergPropertiesConverter CONVERTER = 
IcebergPropertiesConverter.INSTANCE;
+
+  @Test
+  void testCatalogPropertiesWithHiveBackend() {
+    Map<String, String> properties =
+        CONVERTER.toFlinkCatalogProperties(
+            ImmutableMap.of(
+                IcebergPropertiesConstants.GRAVITINO_ICEBERG_CATALOG_BACKEND,
+                
IcebergPropertiesConstants.GRAVITINO_ICEBERG_CATALOG_BACKEND_HIVE,
+                IcebergPropertiesConstants.GRAVITINO_ICEBERG_CATALOG_URI,
+                "hive-uri",
+                IcebergPropertiesConstants.GRAVITINO_ICEBERG_CATALOG_WAREHOUSE,
+                "hive-warehouse",
+                "key1",
+                "value1"));
+    Assertions.assertEquals(
+        ImmutableMap.of(
+            CommonCatalogOptions.CATALOG_TYPE.key(),
+            GravitinoIcebergCatalogFactoryOptions.IDENTIFIER,
+            IcebergPropertiesConstants.ICEBERG_CATALOG_TYPE,
+            IcebergPropertiesConstants.ICEBERG_CATALOG_BACKEND_HIVE,
+            IcebergPropertiesConstants.ICEBERG_CATALOG_URI,
+            "hive-uri",
+            IcebergPropertiesConstants.ICEBERG_CATALOG_WAREHOUSE,
+            "hive-warehouse"),
+        properties);
+  }
+
+  @Test
+  void testCatalogPropertiesWithRestBackend() {
+    Map<String, String> properties =
+        CONVERTER.toFlinkCatalogProperties(
+            ImmutableMap.of(
+                IcebergPropertiesConstants.GRAVITINO_ICEBERG_CATALOG_BACKEND,
+                IcebergPropertiesConstants.ICEBERG_CATALOG_BACKEND_REST,
+                IcebergPropertiesConstants.GRAVITINO_ICEBERG_CATALOG_URI,
+                "rest-uri",
+                IcebergPropertiesConstants.GRAVITINO_ICEBERG_CATALOG_WAREHOUSE,
+                "rest-warehouse",
+                "key1",
+                "value1"));
+    Assertions.assertEquals(
+        ImmutableMap.of(
+            CommonCatalogOptions.CATALOG_TYPE.key(),
+            GravitinoIcebergCatalogFactoryOptions.IDENTIFIER,
+            IcebergPropertiesConstants.ICEBERG_CATALOG_TYPE,
+            IcebergPropertiesConstants.ICEBERG_CATALOG_BACKEND_REST,
+            IcebergPropertiesConstants.ICEBERG_CATALOG_URI,
+            "rest-uri",
+            IcebergPropertiesConstants.ICEBERG_CATALOG_WAREHOUSE,
+            "rest-warehouse"),
+        properties);
+  }
+}
diff --git 
a/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/FlinkCommonIT.java
 
b/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/FlinkCommonIT.java
index 5a363e4e51..b45e5f46ec 100644
--- 
a/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/FlinkCommonIT.java
+++ 
b/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/FlinkCommonIT.java
@@ -72,6 +72,14 @@ public abstract class FlinkCommonIT extends FlinkEnvIT {
     return true;
   }
 
+  protected boolean supportGetSchemaWithoutCommentAndOption() {
+    return true;
+  }
+
+  protected abstract String getProvider();
+
+  protected abstract boolean supportDropCascade();
+
   @Test
   public void testCreateSchema() {
     doWithCatalog(
@@ -83,13 +91,14 @@ public abstract class FlinkCommonIT extends FlinkEnvIT {
             TestUtils.assertTableResult(tableResult, ResultKind.SUCCESS);
             catalog.asSchemas().schemaExists(schema);
           } finally {
-            catalog.asSchemas().dropSchema(schema, true);
+            catalog.asSchemas().dropSchema(schema, supportDropCascade());
             Assertions.assertFalse(catalog.asSchemas().schemaExists(schema));
           }
         });
   }
 
   @Test
+  @EnabledIf("supportGetSchemaWithoutCommentAndOption")
   public void testGetSchemaWithoutCommentAndOption() {
     doWithCatalog(
         currentCatalog(),
@@ -134,12 +143,11 @@ public abstract class FlinkCommonIT extends FlinkEnvIT {
             Schema loadedSchema = catalog.asSchemas().loadSchema(schema);
             Assertions.assertEquals(schema, loadedSchema.name());
             Assertions.assertEquals(comment, loadedSchema.comment());
-            Assertions.assertEquals(2, loadedSchema.properties().size());
             Assertions.assertEquals(propertyValue, 
loadedSchema.properties().get(propertyKey));
             Assertions.assertEquals(
                 location, 
loadedSchema.properties().get(HiveConstants.LOCATION));
           } finally {
-            catalog.asSchemas().dropSchema(schema, true);
+            catalog.asSchemas().dropSchema(schema, supportDropCascade());
             Assertions.assertFalse(catalog.asSchemas().schemaExists(schema));
           }
         });
@@ -177,9 +185,9 @@ public abstract class FlinkCommonIT extends FlinkEnvIT {
             Assertions.assertEquals(schema2, schemas[2]);
             Assertions.assertEquals(schema3, schemas[3]);
           } finally {
-            catalog.asSchemas().dropSchema(schema, true);
-            catalog.asSchemas().dropSchema(schema2, true);
-            catalog.asSchemas().dropSchema(schema3, true);
+            catalog.asSchemas().dropSchema(schema, supportDropCascade());
+            catalog.asSchemas().dropSchema(schema2, supportDropCascade());
+            catalog.asSchemas().dropSchema(schema3, supportDropCascade());
             Assertions.assertEquals(1, 
catalog.asSchemas().listSchemas().length);
           }
         });
@@ -204,7 +212,6 @@ public abstract class FlinkCommonIT extends FlinkEnvIT {
             Schema loadedSchema = catalog.asSchemas().loadSchema(schema);
             Assertions.assertEquals(schema, loadedSchema.name());
             Assertions.assertEquals("test comment", loadedSchema.comment());
-            Assertions.assertEquals(3, loadedSchema.properties().size());
             Assertions.assertEquals("value1", 
loadedSchema.properties().get("key1"));
             Assertions.assertEquals("value2", 
loadedSchema.properties().get("key2"));
             
Assertions.assertNotNull(loadedSchema.properties().get("location"));
@@ -215,11 +222,10 @@ public abstract class FlinkCommonIT extends FlinkEnvIT {
             Schema reloadedSchema = catalog.asSchemas().loadSchema(schema);
             Assertions.assertEquals(schema, reloadedSchema.name());
             Assertions.assertEquals("test comment", reloadedSchema.comment());
-            Assertions.assertEquals(4, reloadedSchema.properties().size());
             Assertions.assertEquals("new-value", 
reloadedSchema.properties().get("key1"));
             Assertions.assertEquals("value3", 
reloadedSchema.properties().get("key3"));
           } finally {
-            catalog.asSchemas().dropSchema(schema, true);
+            catalog.asSchemas().dropSchema(schema, supportDropCascade());
           }
         });
   }
@@ -270,7 +276,8 @@ public abstract class FlinkCommonIT extends FlinkEnvIT {
               Row.of("A", 1.0),
               Row.of("B", 2.0));
         },
-        true);
+        true,
+        supportDropCascade());
   }
 
   @Test
@@ -303,7 +310,8 @@ public abstract class FlinkCommonIT extends FlinkEnvIT {
               Row.of("test_table1"),
               Row.of("test_table2"));
         },
-        true);
+        true,
+        supportDropCascade());
   }
 
   @Test
@@ -320,12 +328,11 @@ public abstract class FlinkCommonIT extends FlinkEnvIT {
           NameIdentifier identifier = NameIdentifier.of(databaseName, 
tableName);
           catalog.asTableCatalog().createTable(identifier, columns, 
"comment1", ImmutableMap.of());
           
Assertions.assertTrue(catalog.asTableCatalog().tableExists(identifier));
-
-          TableResult result = sql("DROP TABLE %s", tableName);
-          TestUtils.assertTableResult(result, ResultKind.SUCCESS);
+          sql("DROP TABLE IF EXISTS %s", tableName);
           
Assertions.assertFalse(catalog.asTableCatalog().tableExists(identifier));
         },
-        true);
+        true,
+        supportDropCascade());
   }
 
   @Test
@@ -379,7 +386,8 @@ public abstract class FlinkCommonIT extends FlinkEnvIT {
             fail(e);
           }
         },
-        true);
+        true,
+        supportDropCascade());
   }
 
   @Test
@@ -415,7 +423,8 @@ public abstract class FlinkCommonIT extends FlinkEnvIT {
               };
           assertColumns(expected, actual);
         },
-        true);
+        true,
+        supportDropCascade());
   }
 
   @Test
@@ -466,6 +475,7 @@ public abstract class FlinkCommonIT extends FlinkEnvIT {
                       .asTableCatalog()
                       .loadTable(NameIdentifier.of(databaseName, tableName));
               Assertions.assertEquals(newComment, gravitinoTable.comment());
+
             } catch (DatabaseNotExistException
                 | TableAlreadyExistException
                 | TableNotExistException e) {
@@ -475,7 +485,8 @@ public abstract class FlinkCommonIT extends FlinkEnvIT {
             fail("Catalog doesn't exist");
           }
         },
-        true);
+        true,
+        supportDropCascade());
   }
 
   @Test
@@ -511,7 +522,8 @@ public abstract class FlinkCommonIT extends FlinkEnvIT {
               };
           assertColumns(expected, actual);
         },
-        true);
+        true,
+        supportDropCascade());
   }
 
   @Test
@@ -542,7 +554,8 @@ public abstract class FlinkCommonIT extends FlinkEnvIT {
               new Column[] {Column.of("order_amount", Types.IntegerType.get(), 
"ORDER_AMOUNT")};
           assertColumns(expected, actual);
         },
-        true);
+        true,
+        supportDropCascade());
   }
 
   @Test
@@ -584,7 +597,8 @@ public abstract class FlinkCommonIT extends FlinkEnvIT {
               };
           assertColumns(expected, actual);
         },
-        true);
+        true,
+        supportDropCascade());
   }
 
   @Test
@@ -612,7 +626,8 @@ public abstract class FlinkCommonIT extends FlinkEnvIT {
           Assertions.assertTrue(
               
catalog.asTableCatalog().tableExists(NameIdentifier.of(databaseName, 
newTableName)));
         },
-        true);
+        true,
+        supportDropCascade());
   }
 
   @Test
@@ -655,6 +670,7 @@ public abstract class FlinkCommonIT extends FlinkEnvIT {
           Assertions.assertEquals("value1", properties.get("key"));
           Assertions.assertNull(properties.get("key2"));
         },
-        true);
+        true,
+        supportDropCascade());
   }
 }
diff --git 
a/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/FlinkEnvIT.java
 
b/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/FlinkEnvIT.java
index f56b5297e1..959123f336 100644
--- 
a/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/FlinkEnvIT.java
+++ 
b/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/FlinkEnvIT.java
@@ -19,19 +19,25 @@
 package org.apache.gravitino.flink.connector.integration.test;
 
 import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
 import com.google.errorprone.annotations.FormatMethod;
 import com.google.errorprone.annotations.FormatString;
 import java.io.IOException;
 import java.util.Collections;
+import java.util.List;
+import java.util.Map;
 import java.util.function.Consumer;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.ResultKind;
 import org.apache.flink.table.api.TableEnvironment;
 import org.apache.flink.table.api.TableResult;
 import org.apache.flink.table.api.internal.TableEnvironmentImpl;
+import org.apache.flink.types.Row;
 import org.apache.gravitino.Catalog;
 import org.apache.gravitino.client.GravitinoMetalake;
 import org.apache.gravitino.flink.connector.PropertiesConverter;
+import org.apache.gravitino.flink.connector.integration.test.utils.TestUtils;
 import 
org.apache.gravitino.flink.connector.store.GravitinoCatalogStoreFactoryOptions;
 import org.apache.gravitino.integration.test.container.ContainerSuite;
 import org.apache.gravitino.integration.test.container.HiveContainer;
@@ -154,31 +160,56 @@ public abstract class FlinkEnvIT extends BaseIT {
   }
 
   @FormatMethod
-  protected TableResult sql(@FormatString String sql, Object... args) {
+  protected static TableResult sql(@FormatString String sql, Object... args) {
     return tableEnv.executeSql(String.format(sql, args));
   }
 
   protected void doWithSchema(
       Catalog catalog, String schemaName, Consumer<Catalog> action, boolean 
dropSchema) {
+    doWithSchema(catalog, schemaName, action, dropSchema, true);
+  }
+
+  protected void doWithSchema(
+      Catalog catalog,
+      String schemaName,
+      Consumer<Catalog> action,
+      boolean dropSchema,
+      boolean cascade) {
     Preconditions.checkNotNull(catalog);
     Preconditions.checkNotNull(schemaName);
     try {
       tableEnv.useCatalog(catalog.name());
       if (!catalog.asSchemas().schemaExists(schemaName)) {
-        catalog.asSchemas().createSchema(schemaName, null, null);
+        catalog.asSchemas().createSchema(schemaName, null, 
getCreateSchemaProps(schemaName));
       }
       tableEnv.useDatabase(schemaName);
       action.accept(catalog);
     } finally {
       if (dropSchema) {
-        catalog.asSchemas().dropSchema(schemaName, true);
+        clearTableInSchema();
+        catalog.asSchemas().dropSchema(schemaName, cascade);
       }
     }
   }
 
+  protected Map<String, String> getCreateSchemaProps(String schemaName) {
+    return null;
+  }
+
   protected static void doWithCatalog(Catalog catalog, Consumer<Catalog> 
action) {
     Preconditions.checkNotNull(catalog);
     tableEnv.useCatalog(catalog.name());
     action.accept(catalog);
   }
+
+  /** Iceberg requires dropping all tables in a schema before the schema itself can be dropped. */
+  protected static void clearTableInSchema() {
+    TableResult result = sql("SHOW TABLES");
+    List<Row> rows = Lists.newArrayList(result.collect());
+    for (Row row : rows) {
+      String tableName = row.getField(0).toString();
+      TableResult deleteResult = sql("DROP TABLE IF EXISTS %s", tableName);
+      TestUtils.assertTableResult(deleteResult, ResultKind.SUCCESS);
+    }
+  }
 }
diff --git 
a/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/hive/FlinkHiveCatalogIT.java
 
b/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/hive/FlinkHiveCatalogIT.java
index bb7b25f6b2..7792068e24 100644
--- 
a/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/hive/FlinkHiveCatalogIT.java
+++ 
b/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/hive/FlinkHiveCatalogIT.java
@@ -29,7 +29,6 @@ import java.io.IOException;
 import java.util.Arrays;
 import java.util.Map;
 import java.util.Optional;
-import java.util.function.Consumer;
 import java.util.stream.Collectors;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.table.api.DataTypes;
@@ -73,7 +72,7 @@ public class FlinkHiveCatalogIT extends FlinkCommonIT {
   private static org.apache.gravitino.Catalog hiveCatalog;
 
   @BeforeAll
-  static void hiveStartUp() {
+  void hiveStartUp() {
     initDefaultHiveCatalog();
   }
 
@@ -83,13 +82,13 @@ public class FlinkHiveCatalogIT extends FlinkCommonIT {
     metalake.dropCatalog(DEFAULT_HIVE_CATALOG, true);
   }
 
-  protected static void initDefaultHiveCatalog() {
+  protected void initDefaultHiveCatalog() {
     Preconditions.checkNotNull(metalake);
     hiveCatalog =
         metalake.createCatalog(
             DEFAULT_HIVE_CATALOG,
             org.apache.gravitino.Catalog.Type.RELATIONAL,
-            "hive",
+            getProvider(),
             null,
             ImmutableMap.of("metastore.uris", hiveMetastoreUri));
   }
@@ -583,32 +582,23 @@ public class FlinkHiveCatalogIT extends FlinkCommonIT {
         true);
   }
 
+  @Override
+  protected Map<String, String> getCreateSchemaProps(String schemaName) {
+    return ImmutableMap.of("location", warehouse + "/" + schemaName);
+  }
+
   @Override
   protected org.apache.gravitino.Catalog currentCatalog() {
     return hiveCatalog;
   }
 
-  protected void doWithSchema(
-      org.apache.gravitino.Catalog catalog,
-      String schemaName,
-      Consumer<org.apache.gravitino.Catalog> action,
-      boolean dropSchema) {
-    Preconditions.checkNotNull(catalog);
-    Preconditions.checkNotNull(schemaName);
-    try {
-      tableEnv.useCatalog(catalog.name());
-      if (!catalog.asSchemas().schemaExists(schemaName)) {
-        catalog
-            .asSchemas()
-            .createSchema(
-                schemaName, null, ImmutableMap.of("location", warehouse + "/" 
+ schemaName));
-      }
-      tableEnv.useDatabase(schemaName);
-      action.accept(catalog);
-    } finally {
-      if (dropSchema) {
-        catalog.asSchemas().dropSchema(schemaName, true);
-      }
-    }
+  @Override
+  protected String getProvider() {
+    return "hive";
+  }
+
+  @Override
+  protected boolean supportDropCascade() {
+    return true;
   }
 }
diff --git 
a/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/hive/FlinkHiveCatalogIT.java
 
b/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/iceberg/FlinkIcebergCatalogIT.java
similarity index 61%
copy from 
flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/hive/FlinkHiveCatalogIT.java
copy to 
flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/iceberg/FlinkIcebergCatalogIT.java
index bb7b25f6b2..0834def90b 100644
--- 
a/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/hive/FlinkHiveCatalogIT.java
+++ 
b/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/iceberg/FlinkIcebergCatalogIT.java
@@ -16,7 +16,8 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.gravitino.flink.connector.integration.test.hive;
+
+package org.apache.gravitino.flink.connector.integration.test.iceberg;
 
 import static 
org.apache.gravitino.flink.connector.integration.test.utils.TestUtils.assertColumns;
 import static 
org.apache.gravitino.flink.connector.integration.test.utils.TestUtils.toFlinkPhysicalColumn;
@@ -25,18 +26,13 @@ import static 
org.apache.gravitino.rel.expressions.transforms.Transforms.EMPTY_T
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Lists;
-import java.io.IOException;
 import java.util.Arrays;
 import java.util.Map;
 import java.util.Optional;
-import java.util.function.Consumer;
-import java.util.stream.Collectors;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.table.api.DataTypes;
 import org.apache.flink.table.api.ResultKind;
 import org.apache.flink.table.api.TableResult;
-import org.apache.flink.table.api.ValidationException;
-import org.apache.flink.table.catalog.Catalog;
 import org.apache.flink.table.catalog.CatalogBaseTable;
 import org.apache.flink.table.catalog.CatalogDescriptor;
 import org.apache.flink.table.catalog.CatalogTable;
@@ -44,13 +40,12 @@ import org.apache.flink.table.catalog.CommonCatalogOptions;
 import org.apache.flink.table.catalog.DefaultCatalogTable;
 import org.apache.flink.table.catalog.ObjectPath;
 import org.apache.flink.table.catalog.exceptions.TableNotExistException;
-import org.apache.flink.table.catalog.hive.factories.HiveCatalogFactoryOptions;
 import org.apache.flink.types.Row;
+import org.apache.gravitino.Catalog;
 import org.apache.gravitino.NameIdentifier;
-import org.apache.gravitino.catalog.hive.HiveConstants;
-import org.apache.gravitino.flink.connector.PropertiesConverter;
-import org.apache.gravitino.flink.connector.hive.GravitinoHiveCatalog;
-import 
org.apache.gravitino.flink.connector.hive.GravitinoHiveCatalogFactoryOptions;
+import org.apache.gravitino.catalog.lakehouse.iceberg.IcebergConstants;
+import org.apache.gravitino.flink.connector.iceberg.GravitinoIcebergCatalog;
+import 
org.apache.gravitino.flink.connector.iceberg.GravitinoIcebergCatalogFactoryOptions;
 import org.apache.gravitino.flink.connector.integration.test.FlinkCommonIT;
 import org.apache.gravitino.flink.connector.integration.test.utils.TestUtils;
 import org.apache.gravitino.rel.Column;
@@ -58,54 +53,41 @@ import org.apache.gravitino.rel.Table;
 import org.apache.gravitino.rel.expressions.transforms.Transform;
 import org.apache.gravitino.rel.expressions.transforms.Transforms;
 import org.apache.gravitino.rel.types.Types;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.conf.HiveConf;
-import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.BeforeAll;
-import org.junit.jupiter.api.Tag;
 import org.junit.jupiter.api.Test;
 
-@Tag("gravitino-docker-test")
-public class FlinkHiveCatalogIT extends FlinkCommonIT {
-  private static final String DEFAULT_HIVE_CATALOG = 
"test_flink_hive_schema_catalog";
+public abstract class FlinkIcebergCatalogIT extends FlinkCommonIT {
 
-  private static org.apache.gravitino.Catalog hiveCatalog;
+  private static final String DEFAULT_ICEBERG_CATALOG = 
"flink_iceberg_catalog";
 
-  @BeforeAll
-  static void hiveStartUp() {
-    initDefaultHiveCatalog();
-  }
-
-  @AfterAll
-  static void hiveStop() {
-    Preconditions.checkNotNull(metalake);
-    metalake.dropCatalog(DEFAULT_HIVE_CATALOG, true);
-  }
+  private static org.apache.gravitino.Catalog icebergCatalog;
 
-  protected static void initDefaultHiveCatalog() {
+  @BeforeAll
+  public void before() {
     Preconditions.checkNotNull(metalake);
-    hiveCatalog =
+    icebergCatalog =
         metalake.createCatalog(
-            DEFAULT_HIVE_CATALOG,
+            DEFAULT_ICEBERG_CATALOG,
             org.apache.gravitino.Catalog.Type.RELATIONAL,
-            "hive",
+            getProvider(),
             null,
-            ImmutableMap.of("metastore.uris", hiveMetastoreUri));
+            getCatalogConfigs());
   }
 
+  protected abstract Map<String, String> getCatalogConfigs();
+
   @Test
-  public void testCreateGravitinoHiveCatalog() {
+  public void testCreateGravitinoIcebergCatalog() {
     tableEnv.useCatalog(DEFAULT_CATALOG);
     int numCatalogs = tableEnv.listCatalogs().length;
 
     // Create a new catalog.
-    String catalogName = "gravitino_hive";
-    Configuration configuration = new Configuration();
+    String catalogName = "gravitino_iceberg_catalog";
+    Configuration configuration = Configuration.fromMap(getCatalogConfigs());
     configuration.set(
-        CommonCatalogOptions.CATALOG_TYPE, 
GravitinoHiveCatalogFactoryOptions.IDENTIFIER);
-    configuration.set(HiveCatalogFactoryOptions.HIVE_CONF_DIR, 
"src/test/resources/flink-tests");
-    configuration.set(GravitinoHiveCatalogFactoryOptions.HIVE_METASTORE_URIS, 
hiveMetastoreUri);
+        CommonCatalogOptions.CATALOG_TYPE, 
GravitinoIcebergCatalogFactoryOptions.IDENTIFIER);
+
     CatalogDescriptor catalogDescriptor = CatalogDescriptor.of(catalogName, 
configuration);
     tableEnv.createCatalog(catalogName, catalogDescriptor);
     Assertions.assertTrue(metalake.catalogExists(catalogName));
@@ -113,23 +95,12 @@ public class FlinkHiveCatalogIT extends FlinkCommonIT {
     // Check the catalog properties.
     org.apache.gravitino.Catalog gravitinoCatalog = 
metalake.loadCatalog(catalogName);
     Map<String, String> properties = gravitinoCatalog.properties();
-    Assertions.assertEquals(hiveMetastoreUri, 
properties.get(HiveConstants.METASTORE_URIS));
-    Map<String, String> flinkProperties =
-        gravitinoCatalog.properties().entrySet().stream()
-            .filter(e -> 
e.getKey().startsWith(PropertiesConverter.FLINK_PROPERTY_PREFIX))
-            .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
-    Assertions.assertEquals(2, flinkProperties.size());
-    Assertions.assertEquals(
-        "src/test/resources/flink-tests",
-        
flinkProperties.get(flinkByPass(HiveCatalogFactoryOptions.HIVE_CONF_DIR.key())));
-    Assertions.assertEquals(
-        GravitinoHiveCatalogFactoryOptions.IDENTIFIER,
-        
flinkProperties.get(flinkByPass(CommonCatalogOptions.CATALOG_TYPE.key())));
+    Assertions.assertEquals(hiveMetastoreUri, 
properties.get(IcebergConstants.URI));
 
     // Get the created catalog.
     Optional<org.apache.flink.table.catalog.Catalog> catalog = 
tableEnv.getCatalog(catalogName);
     Assertions.assertTrue(catalog.isPresent());
-    Assertions.assertInstanceOf(GravitinoHiveCatalog.class, catalog.get());
+    Assertions.assertInstanceOf(GravitinoIcebergCatalog.class, catalog.get());
 
     // List catalogs.
     String[] catalogs = tableEnv.listCatalogs();
@@ -154,52 +125,46 @@ public class FlinkHiveCatalogIT extends FlinkCommonIT {
     tableEnv.executeSql("drop catalog " + catalogName);
     Assertions.assertFalse(metalake.catalogExists(catalogName));
 
-    Optional<Catalog> droppedCatalog = tableEnv.getCatalog(catalogName);
+    Optional<org.apache.flink.table.catalog.Catalog> droppedCatalog =
+        tableEnv.getCatalog(catalogName);
     Assertions.assertFalse(droppedCatalog.isPresent(), "Catalog should be 
dropped");
   }
 
   @Test
-  public void testCreateGravitinoHiveCatalogUsingSQL() {
+  public void testCreateGravitinoIcebergUsingSQL() {
     tableEnv.useCatalog(DEFAULT_CATALOG);
     int numCatalogs = tableEnv.listCatalogs().length;
 
     // Create a new catalog.
-    String catalogName = "gravitino_hive_sql";
+    String catalogName = "gravitino_iceberg_using_sql";
     tableEnv.executeSql(
         String.format(
             "create catalog %s with ("
-                + "'type'='gravitino-hive', "
-                + "'hive-conf-dir'='src/test/resources/flink-tests',"
-                + "'hive.metastore.uris'='%s',"
-                + "'unknown.key'='unknown.value'"
+                + "'type'='%s', "
+                + "'catalog-backend'='%s',"
+                + "'uri'='%s',"
+                + "'warehouse'='%s'"
                 + ")",
-            catalogName, hiveMetastoreUri));
+            catalogName,
+            GravitinoIcebergCatalogFactoryOptions.IDENTIFIER,
+            getCatalogBackend(),
+            hiveMetastoreUri,
+            warehouse));
     Assertions.assertTrue(metalake.catalogExists(catalogName));
 
     // Check the properties of the created catalog.
     org.apache.gravitino.Catalog gravitinoCatalog = 
metalake.loadCatalog(catalogName);
     Map<String, String> properties = gravitinoCatalog.properties();
-    Assertions.assertEquals(hiveMetastoreUri, 
properties.get(HiveConstants.METASTORE_URIS));
-    Map<String, String> flinkProperties =
-        properties.entrySet().stream()
-            .filter(e -> 
e.getKey().startsWith(PropertiesConverter.FLINK_PROPERTY_PREFIX))
-            .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
-    Assertions.assertEquals(3, flinkProperties.size());
-    Assertions.assertEquals(
-        "src/test/resources/flink-tests",
-        
flinkProperties.get(flinkByPass(HiveCatalogFactoryOptions.HIVE_CONF_DIR.key())));
-    Assertions.assertEquals(
-        GravitinoHiveCatalogFactoryOptions.IDENTIFIER,
-        
flinkProperties.get(flinkByPass(CommonCatalogOptions.CATALOG_TYPE.key())));
+    Assertions.assertEquals(hiveMetastoreUri, 
properties.get(IcebergConstants.URI));
+
     Assertions.assertEquals(
-        "unknown.value",
-        flinkProperties.get(flinkByPass("unknown.key")),
-        "The unknown.key will not cause failure and will be saved in 
Gravitino.");
+        GravitinoIcebergCatalogFactoryOptions.IDENTIFIER,
+        properties.get(CommonCatalogOptions.CATALOG_TYPE.key()));
 
     // Get the created catalog.
     Optional<org.apache.flink.table.catalog.Catalog> catalog = 
tableEnv.getCatalog(catalogName);
     Assertions.assertTrue(catalog.isPresent());
-    Assertions.assertInstanceOf(GravitinoHiveCatalog.class, catalog.get());
+    Assertions.assertInstanceOf(GravitinoIcebergCatalog.class, catalog.get());
 
     // List catalogs.
     String[] catalogs = tableEnv.listCatalogs();
@@ -229,52 +194,25 @@ public class FlinkHiveCatalogIT extends FlinkCommonIT {
     tableEnv.executeSql("drop catalog " + catalogName);
     Assertions.assertFalse(metalake.catalogExists(catalogName));
 
-    Optional<Catalog> droppedCatalog = tableEnv.getCatalog(catalogName);
+    Optional<org.apache.flink.table.catalog.Catalog> droppedCatalog =
+        tableEnv.getCatalog(catalogName);
     Assertions.assertFalse(droppedCatalog.isPresent(), "Catalog should be 
dropped");
   }
 
-  @Test
-  public void testCreateGravitinoHiveCatalogRequireOptions() {
-    tableEnv.useCatalog(DEFAULT_CATALOG);
-
-    // Failed to create the catalog for missing the required options.
-    String catalogName = "gravitino_hive_sql2";
-    Assertions.assertThrows(
-        ValidationException.class,
-        () -> {
-          tableEnv.executeSql(
-              String.format(
-                  "create catalog %s with ("
-                      + "'type'='gravitino-hive', "
-                      + "'hive-conf-dir'='src/test/resources/flink-tests'"
-                      + ")",
-                  catalogName));
-        },
-        "The hive.metastore.uris is required.");
-
-    Assertions.assertFalse(metalake.catalogExists(catalogName));
-  }
-
   @Test
   public void testGetCatalogFromGravitino() {
     // list catalogs.
     int numCatalogs = tableEnv.listCatalogs().length;
 
     // create a new catalog.
-    String catalogName = "hive_catalog_in_gravitino";
+    String catalogName = "iceberg_catalog_in_gravitino";
     org.apache.gravitino.Catalog gravitinoCatalog =
         metalake.createCatalog(
             catalogName,
             org.apache.gravitino.Catalog.Type.RELATIONAL,
-            "hive",
+            getProvider(),
             null,
-            ImmutableMap.of(
-                "flink.bypass.hive-conf-dir",
-                "src/test/resources/flink-tests",
-                "flink.bypass.hive.test",
-                "hive.config",
-                "metastore.uris",
-                hiveMetastoreUri));
+            getCatalogConfigs());
     Assertions.assertNotNull(gravitinoCatalog);
     Assertions.assertEquals(catalogName, gravitinoCatalog.name());
     Assertions.assertTrue(metalake.catalogExists(catalogName));
@@ -282,15 +220,10 @@ public class FlinkHiveCatalogIT extends FlinkCommonIT {
        numCatalogs + 1, tableEnv.listCatalogs().length, "Should create a new catalog");
 
     // get the catalog from Gravitino.
-    Optional<Catalog> flinkHiveCatalog = tableEnv.getCatalog(catalogName);
-    Assertions.assertTrue(flinkHiveCatalog.isPresent());
-    Assertions.assertInstanceOf(GravitinoHiveCatalog.class, flinkHiveCatalog.get());
-    GravitinoHiveCatalog gravitinoHiveCatalog = (GravitinoHiveCatalog) flinkHiveCatalog.get();
-    HiveConf hiveConf = gravitinoHiveCatalog.getHiveConf();
-    Assertions.assertTrue(hiveConf.size() > 0, "Should have hive conf");
-    Assertions.assertEquals("hive.config", hiveConf.get("hive.test"));
-    Assertions.assertEquals(
-        hiveMetastoreUri, hiveConf.get(HiveConf.ConfVars.METASTOREURIS.varname));
+    Optional<org.apache.flink.table.catalog.Catalog> flinkIcebergCatalog =
+        tableEnv.getCatalog(catalogName);
+    Assertions.assertTrue(flinkIcebergCatalog.isPresent());
+    Assertions.assertInstanceOf(GravitinoIcebergCatalog.class, flinkIcebergCatalog.get());
 
     // drop the catalog.
     tableEnv.useCatalog(DEFAULT_CATALOG);
@@ -301,123 +234,99 @@ public class FlinkHiveCatalogIT extends FlinkCommonIT {
   }
 
   @Test
-  public void testHivePartitionTable() {
-    String databaseName = "test_create_hive_partition_table_db";
-    String tableName = "test_create_hive_partition_table";
-    String comment = "test comment";
+  public void testIcebergTableWithPartition() {
+    String databaseName = "test_iceberg_table_partition";
+    String tableName = "iceberg_table_with_partition";
     String key = "test key";
     String value = "test value";
 
     doWithSchema(
-        currentCatalog(),
+        icebergCatalog,
         databaseName,
         catalog -> {
           TableResult result =
               sql(
-                  "CREATE TABLE %s "
-                      + "(string_type STRING COMMENT 'string_type', "
-                      + " double_type DOUBLE COMMENT 'double_type')"
-                      + " COMMENT '%s' "
-                      + " PARTITIONED BY (string_type, double_type)"
+                  "CREATE TABLE %s ("
+                      + " id BIGINT COMMENT 'unique id',"
+                      + " data STRING NOT NULL"
+                      + " ) PARTITIONED BY (data)"
                       + " WITH ("
                       + "'%s' = '%s')",
-                  tableName, comment, key, value);
+                  tableName, key, value);
           TestUtils.assertTableResult(result, ResultKind.SUCCESS);
 
          Table table =
              catalog.asTableCatalog().loadTable(NameIdentifier.of(databaseName, tableName));
           Assertions.assertNotNull(table);
-          Assertions.assertEquals(comment, table.comment());
           Assertions.assertEquals(value, table.properties().get(key));
           Column[] columns =
               new Column[] {
-                Column.of("string_type", Types.StringType.get(), 
"string_type", true, false, null),
-                Column.of("double_type", Types.DoubleType.get(), "double_type")
+                Column.of("id", Types.LongType.get(), "unique id", true, 
false, null),
+                Column.of("data", Types.StringType.get(), null, false, false, 
null)
               };
           assertColumns(columns, table.columns());
-          Transform[] partitions =
-              new Transform[] {
-                Transforms.identity("string_type"), 
Transforms.identity("double_type")
-              };
+          Transform[] partitions = new Transform[] 
{Transforms.identity("data")};
           Assertions.assertArrayEquals(partitions, table.partitioning());
 
           // load flink catalog
           try {
-            Catalog flinkCatalog = tableEnv.getCatalog(currentCatalog().name()).get();
+            org.apache.flink.table.catalog.Catalog flinkCatalog =
+                tableEnv.getCatalog(currentCatalog().name()).get();
             CatalogBaseTable flinkTable =
                flinkCatalog.getTable(ObjectPath.fromString(databaseName + "." + tableName));
            DefaultCatalogTable catalogTable = (DefaultCatalogTable) flinkTable;
             Assertions.assertTrue(catalogTable.isPartitioned());
             Assertions.assertArrayEquals(
-                new String[] {"string_type", "double_type"},
-                catalogTable.getPartitionKeys().toArray());
+                new String[] {"data"}, 
catalogTable.getPartitionKeys().toArray());
           } catch (Exception e) {
             Assertions.fail("Table should be exist", e);
           }
 
           // write and read.
           TestUtils.assertTableResult(
-              sql("INSERT INTO %s VALUES ('A', 1.0), ('B', 2.0)", tableName),
+              sql("INSERT INTO %s VALUES (1, 'A'), (2, 'B')", tableName),
               ResultKind.SUCCESS_WITH_CONTENT,
               Row.of(-1L));
           TestUtils.assertTableResult(
-              sql("SELECT * FROM %s ORDER BY double_type", tableName),
+              sql("SELECT * FROM %s ORDER BY data", tableName),
               ResultKind.SUCCESS_WITH_CONTENT,
-              Row.of("A", 1.0),
-              Row.of("B", 2.0));
-          try {
-            Assertions.assertTrue(
-                hdfs.exists(
-                    new Path(
-                        table.properties().get("location") + 
"/string_type=A/double_type=1.0")));
-            Assertions.assertTrue(
-                hdfs.exists(
-                    new Path(
-                        table.properties().get("location") + 
"/string_type=B/double_type=2.0")));
-          } catch (IOException e) {
-            Assertions.fail("The partition directory should be exist.", e);
-          }
+              Row.of(1, "A"),
+              Row.of(2, "B"));
         },
-        true);
+        true,
+        supportDropCascade());
   }
 
   @Test
-  public void testCreateHiveTable() {
-    String databaseName = "test_create_hive_table_db";
-    String tableName = "test_create_hive_table";
-    String comment = "test comment";
+  public void testCreateIcebergTable() {
+    String databaseName = "test_create_iceberg_table";
+    String tableName = "iceberg_table";
+    String comment = "test table comment";
     String key = "test key";
     String value = "test value";
 
-    // 1. The NOT NULL constraint for column is only supported since Hive 3.0,
-    // but the current Gravitino Hive catalog only supports Hive 2.x.
-    // 2. Hive doesn't support Time and Timestamp with timezone type.
-    // 3. Flink SQL only support to create Interval Month and Second(3).
     doWithSchema(
-        metalake.loadCatalog(DEFAULT_HIVE_CATALOG),
+        metalake.loadCatalog(DEFAULT_ICEBERG_CATALOG),
         databaseName,
         catalog -> {
           TableResult result =
               sql(
-                  "CREATE TABLE %s "
-                      + "(string_type STRING COMMENT 'string_type', "
+                  "CREATE TABLE %s ("
+                      + " string_type STRING COMMENT 'string_type', "
                       + " double_type DOUBLE COMMENT 'double_type',"
                       + " int_type INT COMMENT 'int_type',"
                       + " varchar_type VARCHAR COMMENT 'varchar_type',"
-                      + " char_type CHAR COMMENT 'char_type',"
                       + " boolean_type BOOLEAN COMMENT 'boolean_type',"
-                      + " byte_type TINYINT COMMENT 'byte_type',"
                       + " binary_type VARBINARY(10) COMMENT 'binary_type',"
                       + " decimal_type DECIMAL(10, 2) COMMENT 'decimal_type',"
                       + " bigint_type BIGINT COMMENT 'bigint_type',"
                       + " float_type FLOAT COMMENT 'float_type',"
                       + " date_type DATE COMMENT 'date_type',"
                       + " timestamp_type TIMESTAMP COMMENT 'timestamp_type',"
-                      + " smallint_type SMALLINT COMMENT 'smallint_type',"
                       + " array_type ARRAY<INT> COMMENT 'array_type',"
                       + " map_type MAP<INT, STRING> COMMENT 'map_type',"
-                      + " struct_type ROW<k1 INT, k2 String>)"
-                      + " COMMENT '%s' WITH ("
+                      + " struct_type ROW<k1 INT, k2 String>"
+                      + " ) COMMENT '%s' WITH ("
                       + "'%s' = '%s')",
                   tableName, comment, key, value);
           TestUtils.assertTableResult(result, ResultKind.SUCCESS);
@@ -433,9 +342,7 @@ public class FlinkHiveCatalogIT extends FlinkCommonIT {
                 Column.of("double_type", Types.DoubleType.get(), 
"double_type"),
                 Column.of("int_type", Types.IntegerType.get(), "int_type"),
                 Column.of("varchar_type", Types.StringType.get(), 
"varchar_type"),
-                Column.of("char_type", Types.FixedCharType.of(1), "char_type"),
                 Column.of("boolean_type", Types.BooleanType.get(), 
"boolean_type"),
-                Column.of("byte_type", Types.ByteType.get(), "byte_type"),
                 Column.of("binary_type", Types.BinaryType.get(), 
"binary_type"),
                 Column.of("decimal_type", Types.DecimalType.of(10, 2), 
"decimal_type"),
                 Column.of("bigint_type", Types.LongType.get(), "bigint_type"),
@@ -443,7 +350,6 @@ public class FlinkHiveCatalogIT extends FlinkCommonIT {
                 Column.of("date_type", Types.DateType.get(), "date_type"),
                 Column.of(
                     "timestamp_type", Types.TimestampType.withoutTimeZone(), 
"timestamp_type"),
-                Column.of("smallint_type", Types.ShortType.get(), 
"smallint_type"),
                 Column.of(
                     "array_type", Types.ListType.of(Types.IntegerType.get(), 
true), "array_type"),
                 Column.of(
@@ -460,28 +366,25 @@ public class FlinkHiveCatalogIT extends FlinkCommonIT {
           assertColumns(columns, table.columns());
           Assertions.assertArrayEquals(EMPTY_TRANSFORM, table.partitioning());
         },
-        true);
+        true,
+        supportDropCascade());
   }
 
   @Test
-  public void testGetHiveTable() {
+  public void testGetIcebergTable() {
     Column[] columns =
         new Column[] {
           Column.of("string_type", Types.StringType.get(), "string_type", 
true, false, null),
           Column.of("double_type", Types.DoubleType.get(), "double_type"),
           Column.of("int_type", Types.IntegerType.get(), "int_type"),
           Column.of("varchar_type", Types.StringType.get(), "varchar_type"),
-          Column.of("char_type", Types.FixedCharType.of(1), "char_type"),
           Column.of("boolean_type", Types.BooleanType.get(), "boolean_type"),
-          Column.of("byte_type", Types.ByteType.get(), "byte_type"),
           Column.of("binary_type", Types.BinaryType.get(), "binary_type"),
           Column.of("decimal_type", Types.DecimalType.of(10, 2), 
"decimal_type"),
           Column.of("bigint_type", Types.LongType.get(), "bigint_type"),
           Column.of("float_type", Types.FloatType.get(), "float_type"),
           Column.of("date_type", Types.DateType.get(), "date_type"),
           Column.of("timestamp_type", Types.TimestampType.withoutTimeZone(), 
"timestamp_type"),
-          Column.of("smallint_type", Types.ShortType.get(), "smallint_type"),
-          Column.of("fixed_char_type", Types.FixedCharType.of(10), 
"fixed_char_type"),
           Column.of("array_type", Types.ListType.of(Types.IntegerType.get(), 
true), "array_type"),
           Column.of(
               "map_type",
@@ -495,9 +398,9 @@ public class FlinkHiveCatalogIT extends FlinkCommonIT {
               null)
         };
 
-    String databaseName = "test_get_hive_table_db";
+    String databaseName = "test_get_iceberg_table";
     doWithSchema(
-        metalake.loadCatalog(DEFAULT_HIVE_CATALOG),
+        metalake.loadCatalog(DEFAULT_ICEBERG_CATALOG),
         databaseName,
         catalog -> {
           String tableName = "test_desc_table";
@@ -511,7 +414,7 @@ public class FlinkHiveCatalogIT extends FlinkCommonIT {
                   ImmutableMap.of("k1", "v1"));
 
           Optional<org.apache.flink.table.catalog.Catalog> flinkCatalog =
-              tableEnv.getCatalog(DEFAULT_HIVE_CATALOG);
+              tableEnv.getCatalog(DEFAULT_ICEBERG_CATALOG);
           Assertions.assertTrue(flinkCatalog.isPresent());
           try {
             CatalogBaseTable table =
@@ -531,13 +434,9 @@ public class FlinkHiveCatalogIT extends FlinkCommonIT {
                   org.apache.flink.table.catalog.Column.physical(
                           "varchar_type", DataTypes.VARCHAR(Integer.MAX_VALUE))
                       .withComment("varchar_type"),
-                  org.apache.flink.table.catalog.Column.physical("char_type", 
DataTypes.CHAR(1))
-                      .withComment("char_type"),
                   org.apache.flink.table.catalog.Column.physical(
                           "boolean_type", DataTypes.BOOLEAN())
                       .withComment("boolean_type"),
-                  org.apache.flink.table.catalog.Column.physical("byte_type", 
DataTypes.TINYINT())
-                      .withComment("byte_type"),
                   
org.apache.flink.table.catalog.Column.physical("binary_type", DataTypes.BYTES())
                       .withComment("binary_type"),
                   org.apache.flink.table.catalog.Column.physical(
@@ -552,12 +451,6 @@ public class FlinkHiveCatalogIT extends FlinkCommonIT {
                   org.apache.flink.table.catalog.Column.physical(
                           "timestamp_type", DataTypes.TIMESTAMP())
                       .withComment("timestamp_type"),
-                  org.apache.flink.table.catalog.Column.physical(
-                          "smallint_type", DataTypes.SMALLINT())
-                      .withComment("smallint_type"),
-                  org.apache.flink.table.catalog.Column.physical(
-                          "fixed_char_type", DataTypes.CHAR(10))
-                      .withComment("fixed_char_type"),
                   org.apache.flink.table.catalog.Column.physical(
                           "array_type", DataTypes.ARRAY(DataTypes.INT()))
                       .withComment("array_type"),
@@ -580,35 +473,29 @@ public class FlinkHiveCatalogIT extends FlinkCommonIT {
             Assertions.fail(e);
           }
         },
-        true);
+        true,
+        supportDropCascade());
+  }
+
+  @Override
+  protected Catalog currentCatalog() {
+    return icebergCatalog;
   }
 
   @Override
-  protected org.apache.gravitino.Catalog currentCatalog() {
-    return hiveCatalog;
+  protected String getProvider() {
+    return "lakehouse-iceberg";
   }
 
-  protected void doWithSchema(
-      org.apache.gravitino.Catalog catalog,
-      String schemaName,
-      Consumer<org.apache.gravitino.Catalog> action,
-      boolean dropSchema) {
-    Preconditions.checkNotNull(catalog);
-    Preconditions.checkNotNull(schemaName);
-    try {
-      tableEnv.useCatalog(catalog.name());
-      if (!catalog.asSchemas().schemaExists(schemaName)) {
-        catalog
-            .asSchemas()
-            .createSchema(
-                schemaName, null, ImmutableMap.of("location", warehouse + "/" 
+ schemaName));
-      }
-      tableEnv.useDatabase(schemaName);
-      action.accept(catalog);
-    } finally {
-      if (dropSchema) {
-        catalog.asSchemas().dropSchema(schemaName, true);
-      }
-    }
+  @Override
+  protected boolean supportGetSchemaWithoutCommentAndOption() {
+    return false;
   }
+
+  @Override
+  protected boolean supportDropCascade() {
+    return false;
+  }
+
+  protected abstract String getCatalogBackend();
 }
diff --git a/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/iceberg/FlinkIcebergHiveCatalogIT.java b/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/iceberg/FlinkIcebergHiveCatalogIT.java
new file mode 100644
index 0000000000..fc21ce2c24
--- /dev/null
+++ b/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/iceberg/FlinkIcebergHiveCatalogIT.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.flink.connector.integration.test.iceberg;
+
+import com.google.common.collect.Maps;
+import java.util.Map;
+import org.apache.gravitino.flink.connector.iceberg.IcebergPropertiesConstants;
+import org.junit.jupiter.api.Tag;
+
+@Tag("gravitino-docker-test")
+public class FlinkIcebergHiveCatalogIT extends FlinkIcebergCatalogIT {
+
+  @Override
+  protected Map<String, String> getCatalogConfigs() {
+    Map<String, String> catalogProperties = Maps.newHashMap();
+    catalogProperties.put(
+        IcebergPropertiesConstants.GRAVITINO_ICEBERG_CATALOG_BACKEND,
+        IcebergPropertiesConstants.ICEBERG_CATALOG_BACKEND_HIVE);
+    catalogProperties.put(
+        IcebergPropertiesConstants.GRAVITINO_ICEBERG_CATALOG_WAREHOUSE, warehouse);
+    catalogProperties.put(
+        IcebergPropertiesConstants.GRAVITINO_ICEBERG_CATALOG_URI, hiveMetastoreUri);
+    return catalogProperties;
+  }
+
+  protected String getCatalogBackend() {
+    return "hive";
+  }
+}
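
For context, the configuration this subclass supplies should resolve to a Flink SQL `CREATE CATALOG` statement along the following lines. This is a sketch only: the URI and warehouse values are placeholders, and the `gravitino-iceberg` type identifier is inferred by analogy with the Hive connector's `gravitino-hive`; the actual value of `GravitinoIcebergCatalogFactoryOptions.IDENTIFIER` is not shown in this diff.

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class CreateIcebergCatalogSketch {
  public static void main(String[] args) {
    TableEnvironment tableEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());
    // Placeholder endpoints; 'gravitino-iceberg' is assumed to be the value of
    // GravitinoIcebergCatalogFactoryOptions.IDENTIFIER, mirroring 'gravitino-hive'.
    tableEnv.executeSql(
        "CREATE CATALOG iceberg_hive_catalog WITH ("
            + "'type'='gravitino-iceberg', "
            + "'catalog-backend'='hive', "
            + "'uri'='thrift://127.0.0.1:9083', "
            + "'warehouse'='hdfs://127.0.0.1:9000/user/hive/warehouse'"
            + ")");
  }
}
```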
diff --git a/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/paimon/FlinkPaimonCatalogIT.java b/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/paimon/FlinkPaimonCatalogIT.java
index 57a17c2a11..a03b4a198e 100644
--- a/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/paimon/FlinkPaimonCatalogIT.java
+++ b/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/paimon/FlinkPaimonCatalogIT.java
@@ -47,12 +47,22 @@ public class FlinkPaimonCatalogIT extends FlinkCommonIT {
     return false;
   }
 
+  @Override
+  protected String getProvider() {
+    return "lakehouse-paimon";
+  }
+
+  @Override
+  protected boolean supportDropCascade() {
+    return true;
+  }
+
   protected Catalog currentCatalog() {
     return catalog;
   }
 
   @BeforeAll
-  static void setup() {
+  void setup() {
     initPaimonCatalog();
   }
 
@@ -62,13 +72,13 @@ public class FlinkPaimonCatalogIT extends FlinkCommonIT {
     metalake.dropCatalog(DEFAULT_PAIMON_CATALOG, true);
   }
 
-  private static void initPaimonCatalog() {
+  private void initPaimonCatalog() {
     Preconditions.checkNotNull(metalake);
     catalog =
         metalake.createCatalog(
             DEFAULT_PAIMON_CATALOG,
             org.apache.gravitino.Catalog.Type.RELATIONAL,
-            "lakehouse-paimon",
+            getProvider(),
             null,
             ImmutableMap.of(
                 PaimonConstants.CATALOG_BACKEND,
@@ -98,4 +108,9 @@ public class FlinkPaimonCatalogIT extends FlinkCommonIT {
     Map<String, String> properties = gravitinoCatalog.properties();
     Assertions.assertEquals(warehouse, properties.get("warehouse"));
   }
+
+  @Override
+  protected Map<String, String> getCreateSchemaProps(String schemaName) {
+    return null;
+  }
 }
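
Note that `setup()` dropping `static` here is only legal under JUnit 5's per-class test lifecycle, which the shared base class presumably enables (an assumption; the annotation is not visible in this diff). A minimal sketch of the pattern:

```java
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.TestInstance;

// Minimal sketch: a non-static @BeforeAll requires the per-class lifecycle,
// which is what lets setup() become an instance method and call overridable
// hooks such as getProvider() during initialization.
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
class PerClassLifecycleSketch {
  @BeforeAll
  void setup() {
    // runs once per test class, with instance state available
  }
}
```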
diff --git a/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/utils/TestUtils.java b/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/utils/TestUtils.java
index ba16a9c07b..02710bcfb3 100644
--- a/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/utils/TestUtils.java
+++ b/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/utils/TestUtils.java
@@ -42,7 +42,8 @@ public class TestUtils {
         Row expectedRow = expected[i];
         Row actualRow = actualRows.get(i);
         Assertions.assertEquals(expectedRow.getKind(), actualRow.getKind());
-        Assertions.assertEquals(expectedRow, actualRow);
+        // Only compare string value.
+        Assertions.assertEquals(expectedRow.toString(), actualRow.toString());
       }
     }
   }
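
The switch to `toString()` comparison makes the assertion lenient about boxed numeric types: `Row.equals` compares the field objects themselves, so an expected `Integer` does not equal a `Long` read back from a BIGINT column, while both render identically as strings. A small sketch (hypothetical values) of the behavior it works around:

```java
import org.apache.flink.types.Row;

public class RowComparisonSketch {
  public static void main(String[] args) {
    Row expected = Row.of(1, "A");  // the literal 1 boxes to Integer
    Row actual = Row.of(1L, "A");   // a BIGINT column reads back as Long
    System.out.println(expected.equals(actual));                       // false: Integer 1 vs Long 1L
    System.out.println(expected.toString().equals(actual.toString())); // true: both print as "+I[1, A]"
  }
}
```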
