This is an automated email from the ASF dual-hosted git repository.

fanng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/gravitino.git
The following commit(s) were added to refs/heads/main by this push:
     new fd26b565ae [#3515] feat(flink-connector): Support flink iceberg catalog (#5914)
fd26b565ae is described below

commit fd26b565aef1527fcd126b89c4e110180fa00a83
Author: Xiaojian Sun <sunxiaojian...@163.com>
AuthorDate: Thu Jan 16 15:56:55 2025 +0800

    [#3515] feat(flink-connector): Support flink iceberg catalog (#5914)

    ### What changes were proposed in this pull request?

    Support the Flink Iceberg catalog.

    ### Why are the changes needed?

    Fix: [#3515](https://github.com/apache/gravitino/issues/3515)

    ### Does this PR introduce _any_ user-facing change?

    No.

    ### How was this patch tested?

    FlinkIcebergCatalogIT
    FlinkIcebergHiveCatalogIT
---
 docs/flink-connector/flink-catalog-iceberg.md      |  78 +++++
 flink-connector/flink/build.gradle.kts             |   6 +
 .../connector/catalog/GravitinoCatalogManager.java |  11 -
 .../connector/iceberg/GravitinoIcebergCatalog.java |  67 +++++
 .../iceberg/GravitinoIcebergCatalogFactory.java    |  95 ++++++
 .../GravitinoIcebergCatalogFactoryOptions.java     |  33 +++
 .../iceberg/IcebergPropertiesConstants.java        |  49 ++++
 .../iceberg/IcebergPropertiesConverter.java        |  84 ++++++
 .../org.apache.flink.table.factories.Factory       |   3 +-
 .../iceberg/TestIcebergPropertiesConverter.java    |  82 ++++++
 .../connector/integration/test/FlinkCommonIT.java  |  62 ++--
 .../connector/integration/test/FlinkEnvIT.java     |  37 ++-
 .../integration/test/hive/FlinkHiveCatalogIT.java  |  42 +--
 .../FlinkIcebergCatalogIT.java}                    | 325 +++++++--------------
 .../test/iceberg/FlinkIcebergHiveCatalogIT.java    |  46 +++
 .../test/paimon/FlinkPaimonCatalogIT.java          |  21 +-
 .../integration/test/utils/TestUtils.java          |   3 +-
 17 files changed, 757 insertions(+), 287 deletions(-)

diff --git a/docs/flink-connector/flink-catalog-iceberg.md b/docs/flink-connector/flink-catalog-iceberg.md
new file mode 100644
index 0000000000..54d7c0879f
--- /dev/null
+++ b/docs/flink-connector/flink-catalog-iceberg.md
@@ -0,0 +1,78 @@
+---
+title: "Flink connector Iceberg catalog"
+slug: /flink-connector/flink-catalog-iceberg
+keyword: flink connector iceberg catalog
+license: "This software is licensed under the Apache License version 2."
+---
+
+The Apache Gravitino Flink connector can be used to read and write Iceberg tables, with the metadata managed by the Gravitino server.
+To enable the Flink connector, you must download the Iceberg Flink runtime JAR and place it in the Flink classpath.
+
+## Capabilities
+
+#### Supported DML and DDL operations:
+
+- `CREATE CATALOG`
+- `CREATE DATABASE`
+- `CREATE TABLE`
+- `DROP TABLE`
+- `ALTER TABLE`
+- `INSERT INTO & OVERWRITE`
+- `SELECT`
+
+#### Operations not supported:
+
+- Partition operations
+- View operations
+- Metadata tables, like:
+  - `{iceberg_catalog}.{iceberg_database}.{iceberg_table}&snapshots`
+- Querying UDFs
+- `UPDATE` clause
+- `DELETE` clause
+- `CREATE TABLE LIKE` clause
+
+## SQL example
+
+```sql
+-- Suppose iceberg_a is the Iceberg catalog name managed by Gravitino
+USE iceberg_a;
+
+CREATE DATABASE IF NOT EXISTS mydatabase;
+USE mydatabase;
+
+CREATE TABLE sample (
+    id BIGINT COMMENT 'unique id',
+    data STRING NOT NULL
+) PARTITIONED BY (data)
+WITH ('format-version'='2');
+
+INSERT INTO sample
+VALUES (1, 'A'), (2, 'B');
+
+SELECT * FROM sample WHERE data = 'B';
+```
+
+## Catalog properties
+
+The Gravitino Flink connector transforms the following catalog properties into Flink Iceberg connector configuration.
+
+| Gravitino catalog property name | Flink Iceberg connector configuration | Description | Since Version |
+|---------------------------------|---------------------------------------|-------------|---------------|
+| `catalog-backend` | `catalog-type` | Catalog backend type. Currently, only the `Hive` catalog is supported; `JDBC` and `REST` are under continuous validation. | 0.8.0-incubating |
+| `uri` | `uri` | Catalog backend URI | 0.8.0-incubating |
+| `warehouse` | `warehouse` | Catalog backend warehouse | 0.8.0-incubating |
+| `io-impl` | `io-impl` | The IO implementation for `FileIO` in Iceberg. | 0.8.0-incubating |
+| `oss-endpoint` | `oss.endpoint` | The endpoint of the Aliyun OSS service. | 0.8.0-incubating |
+| `oss-access-key-id` | `client.access-key-id` | The static access key ID used to access OSS data. | 0.8.0-incubating |
+| `oss-secret-access-key` | `client.access-key-secret` | The static secret access key used to access OSS data. | 0.8.0-incubating |
+
+Gravitino catalog property names with the prefix `flink.bypass.` are passed straight through to the Flink Iceberg connector. For example, use `flink.bypass.clients` to pass the `clients` option to the Flink Iceberg connector.
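To make the mapping above concrete, here is a minimal sketch of the conversion this commit implements in `IcebergPropertiesConverter`: `flink.bypass.`-prefixed keys are unwrapped, `catalog-backend` is renamed to `catalog-type`, and the Flink `type` option is set to the `gravitino-iceberg` factory identifier. The hard-coded key strings and the `convert`/`PropertyMappingSketch` names are illustrative stand-ins; the committed class resolves them from `IcebergConstants`, `CommonCatalogOptions`, and `IcebergPropertiesUtils` instead.

```java
import java.util.HashMap;
import java.util.Map;

public class PropertyMappingSketch {

  private static final String FLINK_PROPERTY_PREFIX = "flink.bypass.";

  /** Simplified stand-in for IcebergPropertiesConverter#toFlinkCatalogProperties. */
  static Map<String, String> convert(Map<String, String> gravitinoProps) {
    Map<String, String> flinkProps = new HashMap<>();
    // Unwrap "flink.bypass." keys so they reach the Flink Iceberg connector unchanged.
    gravitinoProps.forEach(
        (key, value) -> {
          if (key.startsWith(FLINK_PROPERTY_PREFIX)) {
            flinkProps.put(key.substring(FLINK_PROPERTY_PREFIX.length()), value);
          }
        });
    // Rename "catalog-backend" to the Flink Iceberg option "catalog-type";
    // "uri", "warehouse", and "io-impl" keep their names.
    if (gravitinoProps.containsKey("catalog-backend")) {
      flinkProps.put("catalog-type", gravitinoProps.get("catalog-backend"));
    }
    for (String key : new String[] {"uri", "warehouse", "io-impl"}) {
      if (gravitinoProps.containsKey(key)) {
        flinkProps.put(key, gravitinoProps.get(key));
      }
    }
    // Route catalog creation through the Gravitino factory.
    flinkProps.put("type", "gravitino-iceberg");
    return flinkProps;
  }

  public static void main(String[] args) {
    Map<String, String> props = new HashMap<>();
    props.put("catalog-backend", "hive");
    props.put("uri", "thrift://hive-metastore:9083"); // placeholder address
    props.put("flink.bypass.clients", "5");
    // Expected entries (order unspecified):
    // clients=5, catalog-type=hive, uri=thrift://..., type=gravitino-iceberg
    System.out.println(convert(props));
  }
}
```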
+
+## Storage
+
+### OSS
+
+Additionally, you need to download the [Aliyun OSS SDK](https://gosspublic.alicdn.com/sdks/java/aliyun_java_sdk_3.10.2.zip), and copy `aliyun-sdk-oss-3.10.2.jar`, `hamcrest-core-1.1.jar`, and `jdom2-2.0.6.jar` to the Flink classpath.
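The SQL example in the docs above registers the catalog through DDL; the integration tests added by this commit (see `testCreateGravitinoIcebergCatalog` further down) do the same through the Flink Table API with a `CatalogDescriptor`. Below is a condensed sketch of that path; the catalog name `iceberg_a`, the metastore URI, and the warehouse location are placeholders.

```java
import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.catalog.CatalogDescriptor;

public class RegisterIcebergCatalogSketch {

  public static void main(String[] args) {
    TableEnvironment tableEnv =
        TableEnvironment.create(EnvironmentSettings.newInstance().inBatchMode().build());

    // The same options the docs describe; URI and warehouse values are placeholders.
    Configuration configuration = new Configuration();
    configuration.setString("type", "gravitino-iceberg");
    configuration.setString("catalog-backend", "hive");
    configuration.setString("uri", "thrift://hive-metastore:9083");
    configuration.setString("warehouse", "hdfs://namenode:8020/user/iceberg/warehouse");

    // Register the catalog in the session; with the Gravitino catalog store
    // configured, the test asserts the catalog is created in Gravitino as well.
    tableEnv.createCatalog("iceberg_a", CatalogDescriptor.of("iceberg_a", configuration));
    tableEnv.useCatalog("iceberg_a");
    tableEnv.executeSql("SHOW DATABASES").print();
  }
}
```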
diff --git a/flink-connector/flink/build.gradle.kts b/flink-connector/flink/build.gradle.kts
index f137a3eae1..4c9bd036ae 100644
--- a/flink-connector/flink/build.gradle.kts
+++ b/flink-connector/flink/build.gradle.kts
@@ -30,6 +30,8 @@ var paimonVersion: String = libs.versions.paimon.get()
 val flinkVersion: String = libs.versions.flink.get()
 val flinkMajorVersion: String = flinkVersion.substringBeforeLast(".")
 
+val icebergVersion: String = libs.versions.iceberg.get()
+
 // The Flink only support scala 2.12, and all scala api will be removed in a future version.
 // You can find more detail at the following issues:
 // https://issues.apache.org/jira/browse/FLINK-23986,
@@ -44,6 +46,8 @@ dependencies {
   implementation(libs.guava)
 
   compileOnly(project(":clients:client-java-runtime", configuration = "shadow"))
+
+  compileOnly("org.apache.iceberg:iceberg-flink-runtime-$flinkMajorVersion:$icebergVersion")
   compileOnly("org.apache.flink:flink-connector-hive_$scalaVersion:$flinkVersion")
   compileOnly("org.apache.flink:flink-table-common:$flinkVersion")
   compileOnly("org.apache.flink:flink-table-api-java:$flinkVersion")
@@ -88,7 +92,9 @@ dependencies {
   testImplementation(libs.testcontainers)
   testImplementation(libs.testcontainers.junit.jupiter)
   testImplementation(libs.testcontainers.mysql)
+  testImplementation(libs.metrics.core)
+  testImplementation("org.apache.iceberg:iceberg-flink-runtime-$flinkMajorVersion:$icebergVersion")
   testImplementation("org.apache.flink:flink-connector-hive_$scalaVersion:$flinkVersion")
   testImplementation("org.apache.flink:flink-table-common:$flinkVersion")
   testImplementation("org.apache.flink:flink-table-api-java:$flinkVersion")
diff --git a/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/catalog/GravitinoCatalogManager.java b/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/catalog/GravitinoCatalogManager.java
index 7693e5d4c9..0b0b89f3a5 100644
--- a/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/catalog/GravitinoCatalogManager.java
+++ b/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/catalog/GravitinoCatalogManager.java
@@ -35,12 +35,10 @@ public class GravitinoCatalogManager {
   private static GravitinoCatalogManager gravitinoCatalogManager;
   private volatile boolean isClosed = false;
 
-  private final String metalakeName;
   private final GravitinoMetalake metalake;
   private final GravitinoAdminClient gravitinoClient;
 
   private GravitinoCatalogManager(String gravitinoUri, String metalakeName) {
-    this.metalakeName = metalakeName;
     this.gravitinoClient = GravitinoAdminClient.builder(gravitinoUri).build();
     this.metalake = gravitinoClient.loadMetalake(metalakeName);
   }
@@ -99,15 +97,6 @@ public class GravitinoCatalogManager {
     return catalog;
   }
 
-  /**
-   * Get the metalake.
-   *
-   * @return the metalake name.
-   */
-  public String getMetalakeName() {
-    return metalakeName;
-  }
-
   /**
    * Create catalog in Gravitino.
    *
diff --git a/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/iceberg/GravitinoIcebergCatalog.java b/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/iceberg/GravitinoIcebergCatalog.java
new file mode 100644
index 0000000000..30fac96bbc
--- /dev/null
+++ b/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/iceberg/GravitinoIcebergCatalog.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.
See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.gravitino.flink.connector.iceberg; + +import java.util.Map; +import java.util.Optional; +import org.apache.flink.table.catalog.AbstractCatalog; +import org.apache.flink.table.catalog.exceptions.CatalogException; +import org.apache.flink.table.factories.Factory; +import org.apache.gravitino.flink.connector.PartitionConverter; +import org.apache.gravitino.flink.connector.PropertiesConverter; +import org.apache.gravitino.flink.connector.catalog.BaseCatalog; +import org.apache.iceberg.flink.FlinkCatalog; +import org.apache.iceberg.flink.FlinkCatalogFactory; + +/** Gravitino Iceberg Catalog. */ +public class GravitinoIcebergCatalog extends BaseCatalog { + + private final FlinkCatalog icebergCatalog; + + protected GravitinoIcebergCatalog( + String catalogName, + String defaultDatabase, + PropertiesConverter propertiesConverter, + PartitionConverter partitionConverter, + Map<String, String> properties) { + super(catalogName, defaultDatabase, propertiesConverter, partitionConverter); + FlinkCatalogFactory flinkCatalogFactory = new FlinkCatalogFactory(); + this.icebergCatalog = (FlinkCatalog) flinkCatalogFactory.createCatalog(catalogName, properties); + } + + @Override + public void open() throws CatalogException { + icebergCatalog.open(); + } + + @Override + public void close() throws CatalogException { + icebergCatalog.close(); + } + + @Override + public Optional<Factory> getFactory() { + return icebergCatalog.getFactory(); + } + + @Override + protected AbstractCatalog realCatalog() { + return icebergCatalog; + } +} diff --git a/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/iceberg/GravitinoIcebergCatalogFactory.java b/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/iceberg/GravitinoIcebergCatalogFactory.java new file mode 100644 index 0000000000..ad0363d986 --- /dev/null +++ b/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/iceberg/GravitinoIcebergCatalogFactory.java @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.gravitino.flink.connector.iceberg; + +import java.util.Collections; +import java.util.Set; +import org.apache.flink.configuration.ConfigOption; +import org.apache.flink.table.catalog.Catalog; +import org.apache.flink.table.factories.FactoryUtil; +import org.apache.gravitino.flink.connector.DefaultPartitionConverter; +import org.apache.gravitino.flink.connector.PartitionConverter; +import org.apache.gravitino.flink.connector.PropertiesConverter; +import org.apache.gravitino.flink.connector.catalog.BaseCatalogFactory; +import org.apache.gravitino.flink.connector.utils.FactoryUtils; + +public class GravitinoIcebergCatalogFactory implements BaseCatalogFactory { + + @Override + public Catalog createCatalog(Context context) { + final FactoryUtil.CatalogFactoryHelper helper = + FactoryUtils.createCatalogFactoryHelper(this, context); + return new GravitinoIcebergCatalog( + context.getName(), + helper.getOptions().get(GravitinoIcebergCatalogFactoryOptions.DEFAULT_DATABASE), + propertiesConverter(), + partitionConverter(), + context.getOptions()); + } + + @Override + public String factoryIdentifier() { + return GravitinoIcebergCatalogFactoryOptions.IDENTIFIER; + } + + @Override + public Set<ConfigOption<?>> requiredOptions() { + return Collections.emptySet(); + } + + @Override + public Set<ConfigOption<?>> optionalOptions() { + return Collections.emptySet(); + } + + /** + * Define gravitino catalog provider. + * + * @return + */ + @Override + public String gravitinoCatalogProvider() { + return "lakehouse-iceberg"; + } + + /** + * Define gravitino catalog type. + * + * @return + */ + @Override + public org.apache.gravitino.Catalog.Type gravitinoCatalogType() { + return org.apache.gravitino.Catalog.Type.RELATIONAL; + } + + /** + * Define properties converter. + * + * @return + */ + @Override + public PropertiesConverter propertiesConverter() { + return IcebergPropertiesConverter.INSTANCE; + } + + @Override + public PartitionConverter partitionConverter() { + return DefaultPartitionConverter.INSTANCE; + } +} diff --git a/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/iceberg/GravitinoIcebergCatalogFactoryOptions.java b/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/iceberg/GravitinoIcebergCatalogFactoryOptions.java new file mode 100644 index 0000000000..95e1a21de8 --- /dev/null +++ b/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/iceberg/GravitinoIcebergCatalogFactoryOptions.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.gravitino.flink.connector.iceberg; + +import org.apache.flink.configuration.ConfigOption; +import org.apache.flink.configuration.ConfigOptions; +import org.apache.iceberg.flink.FlinkCatalogFactory; + +public class GravitinoIcebergCatalogFactoryOptions { + + public static final String IDENTIFIER = "gravitino-iceberg"; + public static final ConfigOption<String> DEFAULT_DATABASE = + ConfigOptions.key(FlinkCatalogFactory.DEFAULT_DATABASE) + .stringType() + .defaultValue(FlinkCatalogFactory.DEFAULT_DATABASE_NAME); +} diff --git a/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/iceberg/IcebergPropertiesConstants.java b/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/iceberg/IcebergPropertiesConstants.java new file mode 100644 index 0000000000..163cfac882 --- /dev/null +++ b/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/iceberg/IcebergPropertiesConstants.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.gravitino.flink.connector.iceberg; + +import com.google.common.annotations.VisibleForTesting; +import org.apache.gravitino.catalog.lakehouse.iceberg.IcebergConstants; +import org.apache.iceberg.CatalogProperties; +import org.apache.iceberg.CatalogUtil; +import org.apache.iceberg.flink.FlinkCatalogFactory; + +public class IcebergPropertiesConstants { + @VisibleForTesting + public static String GRAVITINO_ICEBERG_CATALOG_BACKEND = IcebergConstants.CATALOG_BACKEND; + + public static final String ICEBERG_CATALOG_TYPE = FlinkCatalogFactory.ICEBERG_CATALOG_TYPE; + + public static final String GRAVITINO_ICEBERG_CATALOG_WAREHOUSE = IcebergConstants.WAREHOUSE; + + public static final String ICEBERG_CATALOG_WAREHOUSE = CatalogProperties.WAREHOUSE_LOCATION; + + public static final String GRAVITINO_ICEBERG_CATALOG_URI = IcebergConstants.URI; + + public static final String ICEBERG_CATALOG_URI = CatalogProperties.URI; + + @VisibleForTesting + public static String ICEBERG_CATALOG_BACKEND_HIVE = CatalogUtil.ICEBERG_CATALOG_TYPE_HIVE; + + public static final String GRAVITINO_ICEBERG_CATALOG_BACKEND_HIVE = "hive"; + + @VisibleForTesting + public static final String ICEBERG_CATALOG_BACKEND_REST = CatalogUtil.ICEBERG_CATALOG_TYPE_REST; +} diff --git a/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/iceberg/IcebergPropertiesConverter.java b/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/iceberg/IcebergPropertiesConverter.java new file mode 100644 index 0000000000..7684d3eadb --- /dev/null +++ b/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/iceberg/IcebergPropertiesConverter.java @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.gravitino.flink.connector.iceberg; + +import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableMap; +import java.util.HashMap; +import java.util.Map; +import org.apache.flink.table.catalog.CommonCatalogOptions; +import org.apache.gravitino.catalog.lakehouse.iceberg.IcebergConstants; +import org.apache.gravitino.catalog.lakehouse.iceberg.IcebergPropertiesUtils; +import org.apache.gravitino.flink.connector.PropertiesConverter; + +public class IcebergPropertiesConverter implements PropertiesConverter { + public static IcebergPropertiesConverter INSTANCE = new IcebergPropertiesConverter(); + + private IcebergPropertiesConverter() {} + + private static final Map<String, String> GRAVITINO_CONFIG_TO_FLINK_ICEBERG = + ImmutableMap.of( + IcebergConstants.CATALOG_BACKEND, IcebergPropertiesConstants.ICEBERG_CATALOG_TYPE); + + @Override + public Map<String, String> toFlinkCatalogProperties(Map<String, String> gravitinoProperties) { + Preconditions.checkArgument( + gravitinoProperties != null, "Iceberg Catalog properties should not be null."); + + Map<String, String> all = new HashMap<>(); + if (gravitinoProperties != null) { + gravitinoProperties.forEach( + (k, v) -> { + if (k.startsWith(FLINK_PROPERTY_PREFIX)) { + String newKey = k.substring(FLINK_PROPERTY_PREFIX.length()); + all.put(newKey, v); + } + }); + } + Map<String, String> transformedProperties = + IcebergPropertiesUtils.toIcebergCatalogProperties(gravitinoProperties); + + if (transformedProperties != null) { + all.putAll(transformedProperties); + } + all.put( + CommonCatalogOptions.CATALOG_TYPE.key(), GravitinoIcebergCatalogFactoryOptions.IDENTIFIER); + // Map "catalog-backend" to "catalog-type". + // TODO If catalog backend is CUSTOM, we need special compatibility logic. 
+ GRAVITINO_CONFIG_TO_FLINK_ICEBERG.forEach( + (key, value) -> { + if (all.containsKey(key)) { + String config = all.remove(key); + all.put(value, config); + } + }); + return all; + } + + @Override + public Map<String, String> toGravitinoTableProperties(Map<String, String> properties) { + return new HashMap<>(properties); + } + + @Override + public Map<String, String> toFlinkTableProperties(Map<String, String> properties) { + return new HashMap<>(properties); + } +} diff --git a/flink-connector/flink/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory b/flink-connector/flink/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory index a535afb6dc..45ff2512e7 100644 --- a/flink-connector/flink/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory +++ b/flink-connector/flink/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory @@ -19,4 +19,5 @@ org.apache.gravitino.flink.connector.store.GravitinoCatalogStoreFactory org.apache.gravitino.flink.connector.hive.GravitinoHiveCatalogFactory -org.apache.gravitino.flink.connector.paimon.GravitinoPaimonCatalogFactory \ No newline at end of file +org.apache.gravitino.flink.connector.paimon.GravitinoPaimonCatalogFactory +org.apache.gravitino.flink.connector.iceberg.GravitinoIcebergCatalogFactory diff --git a/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/iceberg/TestIcebergPropertiesConverter.java b/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/iceberg/TestIcebergPropertiesConverter.java new file mode 100644 index 0000000000..d6de522f39 --- /dev/null +++ b/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/iceberg/TestIcebergPropertiesConverter.java @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.gravitino.flink.connector.iceberg; + +import com.google.common.collect.ImmutableMap; +import java.util.Map; +import org.apache.flink.table.catalog.CommonCatalogOptions; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +public class TestIcebergPropertiesConverter { + private static final IcebergPropertiesConverter CONVERTER = IcebergPropertiesConverter.INSTANCE; + + @Test + void testCatalogPropertiesWithHiveBackend() { + Map<String, String> properties = + CONVERTER.toFlinkCatalogProperties( + ImmutableMap.of( + IcebergPropertiesConstants.GRAVITINO_ICEBERG_CATALOG_BACKEND, + IcebergPropertiesConstants.GRAVITINO_ICEBERG_CATALOG_BACKEND_HIVE, + IcebergPropertiesConstants.GRAVITINO_ICEBERG_CATALOG_URI, + "hive-uri", + IcebergPropertiesConstants.GRAVITINO_ICEBERG_CATALOG_WAREHOUSE, + "hive-warehouse", + "key1", + "value1")); + Assertions.assertEquals( + ImmutableMap.of( + CommonCatalogOptions.CATALOG_TYPE.key(), + GravitinoIcebergCatalogFactoryOptions.IDENTIFIER, + IcebergPropertiesConstants.ICEBERG_CATALOG_TYPE, + IcebergPropertiesConstants.ICEBERG_CATALOG_BACKEND_HIVE, + IcebergPropertiesConstants.ICEBERG_CATALOG_URI, + "hive-uri", + IcebergPropertiesConstants.ICEBERG_CATALOG_WAREHOUSE, + "hive-warehouse"), + properties); + } + + @Test + void testCatalogPropertiesWithRestBackend() { + Map<String, String> properties = + CONVERTER.toFlinkCatalogProperties( + ImmutableMap.of( + IcebergPropertiesConstants.GRAVITINO_ICEBERG_CATALOG_BACKEND, + IcebergPropertiesConstants.ICEBERG_CATALOG_BACKEND_REST, + IcebergPropertiesConstants.GRAVITINO_ICEBERG_CATALOG_URI, + "rest-uri", + IcebergPropertiesConstants.GRAVITINO_ICEBERG_CATALOG_WAREHOUSE, + "rest-warehouse", + "key1", + "value1")); + Assertions.assertEquals( + ImmutableMap.of( + CommonCatalogOptions.CATALOG_TYPE.key(), + GravitinoIcebergCatalogFactoryOptions.IDENTIFIER, + IcebergPropertiesConstants.ICEBERG_CATALOG_TYPE, + IcebergPropertiesConstants.ICEBERG_CATALOG_BACKEND_REST, + IcebergPropertiesConstants.ICEBERG_CATALOG_URI, + "rest-uri", + IcebergPropertiesConstants.ICEBERG_CATALOG_WAREHOUSE, + "rest-warehouse"), + properties); + } +} diff --git a/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/FlinkCommonIT.java b/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/FlinkCommonIT.java index 5a363e4e51..b45e5f46ec 100644 --- a/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/FlinkCommonIT.java +++ b/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/FlinkCommonIT.java @@ -72,6 +72,14 @@ public abstract class FlinkCommonIT extends FlinkEnvIT { return true; } + protected boolean supportGetSchemaWithoutCommentAndOption() { + return true; + } + + protected abstract String getProvider(); + + protected abstract boolean supportDropCascade(); + @Test public void testCreateSchema() { doWithCatalog( @@ -83,13 +91,14 @@ public abstract class FlinkCommonIT extends FlinkEnvIT { TestUtils.assertTableResult(tableResult, ResultKind.SUCCESS); catalog.asSchemas().schemaExists(schema); } finally { - catalog.asSchemas().dropSchema(schema, true); + catalog.asSchemas().dropSchema(schema, supportDropCascade()); Assertions.assertFalse(catalog.asSchemas().schemaExists(schema)); } }); } @Test + @EnabledIf("supportGetSchemaWithoutCommentAndOption") public void testGetSchemaWithoutCommentAndOption() { doWithCatalog( currentCatalog(), @@ -134,12 +143,11 @@ public 
abstract class FlinkCommonIT extends FlinkEnvIT { Schema loadedSchema = catalog.asSchemas().loadSchema(schema); Assertions.assertEquals(schema, loadedSchema.name()); Assertions.assertEquals(comment, loadedSchema.comment()); - Assertions.assertEquals(2, loadedSchema.properties().size()); Assertions.assertEquals(propertyValue, loadedSchema.properties().get(propertyKey)); Assertions.assertEquals( location, loadedSchema.properties().get(HiveConstants.LOCATION)); } finally { - catalog.asSchemas().dropSchema(schema, true); + catalog.asSchemas().dropSchema(schema, supportDropCascade()); Assertions.assertFalse(catalog.asSchemas().schemaExists(schema)); } }); @@ -177,9 +185,9 @@ public abstract class FlinkCommonIT extends FlinkEnvIT { Assertions.assertEquals(schema2, schemas[2]); Assertions.assertEquals(schema3, schemas[3]); } finally { - catalog.asSchemas().dropSchema(schema, true); - catalog.asSchemas().dropSchema(schema2, true); - catalog.asSchemas().dropSchema(schema3, true); + catalog.asSchemas().dropSchema(schema, supportDropCascade()); + catalog.asSchemas().dropSchema(schema2, supportDropCascade()); + catalog.asSchemas().dropSchema(schema3, supportDropCascade()); Assertions.assertEquals(1, catalog.asSchemas().listSchemas().length); } }); @@ -204,7 +212,6 @@ public abstract class FlinkCommonIT extends FlinkEnvIT { Schema loadedSchema = catalog.asSchemas().loadSchema(schema); Assertions.assertEquals(schema, loadedSchema.name()); Assertions.assertEquals("test comment", loadedSchema.comment()); - Assertions.assertEquals(3, loadedSchema.properties().size()); Assertions.assertEquals("value1", loadedSchema.properties().get("key1")); Assertions.assertEquals("value2", loadedSchema.properties().get("key2")); Assertions.assertNotNull(loadedSchema.properties().get("location")); @@ -215,11 +222,10 @@ public abstract class FlinkCommonIT extends FlinkEnvIT { Schema reloadedSchema = catalog.asSchemas().loadSchema(schema); Assertions.assertEquals(schema, reloadedSchema.name()); Assertions.assertEquals("test comment", reloadedSchema.comment()); - Assertions.assertEquals(4, reloadedSchema.properties().size()); Assertions.assertEquals("new-value", reloadedSchema.properties().get("key1")); Assertions.assertEquals("value3", reloadedSchema.properties().get("key3")); } finally { - catalog.asSchemas().dropSchema(schema, true); + catalog.asSchemas().dropSchema(schema, supportDropCascade()); } }); } @@ -270,7 +276,8 @@ public abstract class FlinkCommonIT extends FlinkEnvIT { Row.of("A", 1.0), Row.of("B", 2.0)); }, - true); + true, + supportDropCascade()); } @Test @@ -303,7 +310,8 @@ public abstract class FlinkCommonIT extends FlinkEnvIT { Row.of("test_table1"), Row.of("test_table2")); }, - true); + true, + supportDropCascade()); } @Test @@ -320,12 +328,11 @@ public abstract class FlinkCommonIT extends FlinkEnvIT { NameIdentifier identifier = NameIdentifier.of(databaseName, tableName); catalog.asTableCatalog().createTable(identifier, columns, "comment1", ImmutableMap.of()); Assertions.assertTrue(catalog.asTableCatalog().tableExists(identifier)); - - TableResult result = sql("DROP TABLE %s", tableName); - TestUtils.assertTableResult(result, ResultKind.SUCCESS); + sql("DROP TABLE IF EXISTS %s", tableName); Assertions.assertFalse(catalog.asTableCatalog().tableExists(identifier)); }, - true); + true, + supportDropCascade()); } @Test @@ -379,7 +386,8 @@ public abstract class FlinkCommonIT extends FlinkEnvIT { fail(e); } }, - true); + true, + supportDropCascade()); } @Test @@ -415,7 +423,8 @@ public abstract class 
FlinkCommonIT extends FlinkEnvIT { }; assertColumns(expected, actual); }, - true); + true, + supportDropCascade()); } @Test @@ -466,6 +475,7 @@ public abstract class FlinkCommonIT extends FlinkEnvIT { .asTableCatalog() .loadTable(NameIdentifier.of(databaseName, tableName)); Assertions.assertEquals(newComment, gravitinoTable.comment()); + } catch (DatabaseNotExistException | TableAlreadyExistException | TableNotExistException e) { @@ -475,7 +485,8 @@ public abstract class FlinkCommonIT extends FlinkEnvIT { fail("Catalog doesn't exist"); } }, - true); + true, + supportDropCascade()); } @Test @@ -511,7 +522,8 @@ public abstract class FlinkCommonIT extends FlinkEnvIT { }; assertColumns(expected, actual); }, - true); + true, + supportDropCascade()); } @Test @@ -542,7 +554,8 @@ public abstract class FlinkCommonIT extends FlinkEnvIT { new Column[] {Column.of("order_amount", Types.IntegerType.get(), "ORDER_AMOUNT")}; assertColumns(expected, actual); }, - true); + true, + supportDropCascade()); } @Test @@ -584,7 +597,8 @@ public abstract class FlinkCommonIT extends FlinkEnvIT { }; assertColumns(expected, actual); }, - true); + true, + supportDropCascade()); } @Test @@ -612,7 +626,8 @@ public abstract class FlinkCommonIT extends FlinkEnvIT { Assertions.assertTrue( catalog.asTableCatalog().tableExists(NameIdentifier.of(databaseName, newTableName))); }, - true); + true, + supportDropCascade()); } @Test @@ -655,6 +670,7 @@ public abstract class FlinkCommonIT extends FlinkEnvIT { Assertions.assertEquals("value1", properties.get("key")); Assertions.assertNull(properties.get("key2")); }, - true); + true, + supportDropCascade()); } } diff --git a/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/FlinkEnvIT.java b/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/FlinkEnvIT.java index f56b5297e1..959123f336 100644 --- a/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/FlinkEnvIT.java +++ b/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/FlinkEnvIT.java @@ -19,19 +19,25 @@ package org.apache.gravitino.flink.connector.integration.test; import com.google.common.base.Preconditions; +import com.google.common.collect.Lists; import com.google.errorprone.annotations.FormatMethod; import com.google.errorprone.annotations.FormatString; import java.io.IOException; import java.util.Collections; +import java.util.List; +import java.util.Map; import java.util.function.Consumer; import org.apache.flink.configuration.Configuration; import org.apache.flink.table.api.EnvironmentSettings; +import org.apache.flink.table.api.ResultKind; import org.apache.flink.table.api.TableEnvironment; import org.apache.flink.table.api.TableResult; import org.apache.flink.table.api.internal.TableEnvironmentImpl; +import org.apache.flink.types.Row; import org.apache.gravitino.Catalog; import org.apache.gravitino.client.GravitinoMetalake; import org.apache.gravitino.flink.connector.PropertiesConverter; +import org.apache.gravitino.flink.connector.integration.test.utils.TestUtils; import org.apache.gravitino.flink.connector.store.GravitinoCatalogStoreFactoryOptions; import org.apache.gravitino.integration.test.container.ContainerSuite; import org.apache.gravitino.integration.test.container.HiveContainer; @@ -154,31 +160,56 @@ public abstract class FlinkEnvIT extends BaseIT { } @FormatMethod - protected TableResult sql(@FormatString String sql, Object... 
args) { + protected static TableResult sql(@FormatString String sql, Object... args) { return tableEnv.executeSql(String.format(sql, args)); } protected void doWithSchema( Catalog catalog, String schemaName, Consumer<Catalog> action, boolean dropSchema) { + doWithSchema(catalog, schemaName, action, dropSchema, true); + } + + protected void doWithSchema( + Catalog catalog, + String schemaName, + Consumer<Catalog> action, + boolean dropSchema, + boolean cascade) { Preconditions.checkNotNull(catalog); Preconditions.checkNotNull(schemaName); try { tableEnv.useCatalog(catalog.name()); if (!catalog.asSchemas().schemaExists(schemaName)) { - catalog.asSchemas().createSchema(schemaName, null, null); + catalog.asSchemas().createSchema(schemaName, null, getCreateSchemaProps(schemaName)); } tableEnv.useDatabase(schemaName); action.accept(catalog); } finally { if (dropSchema) { - catalog.asSchemas().dropSchema(schemaName, true); + clearTableInSchema(); + catalog.asSchemas().dropSchema(schemaName, cascade); } } } + protected Map<String, String> getCreateSchemaProps(String schemaName) { + return null; + } + protected static void doWithCatalog(Catalog catalog, Consumer<Catalog> action) { Preconditions.checkNotNull(catalog); tableEnv.useCatalog(catalog.name()); action.accept(catalog); } + + /** Iceberg requires deleting the table first, then deleting the schema. */ + protected static void clearTableInSchema() { + TableResult result = sql("SHOW TABLES"); + List<Row> rows = Lists.newArrayList(result.collect()); + for (Row row : rows) { + String tableName = row.getField(0).toString(); + TableResult deleteResult = sql("DROP TABLE IF EXISTS %s", tableName); + TestUtils.assertTableResult(deleteResult, ResultKind.SUCCESS); + } + } } diff --git a/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/hive/FlinkHiveCatalogIT.java b/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/hive/FlinkHiveCatalogIT.java index bb7b25f6b2..7792068e24 100644 --- a/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/hive/FlinkHiveCatalogIT.java +++ b/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/hive/FlinkHiveCatalogIT.java @@ -29,7 +29,6 @@ import java.io.IOException; import java.util.Arrays; import java.util.Map; import java.util.Optional; -import java.util.function.Consumer; import java.util.stream.Collectors; import org.apache.flink.configuration.Configuration; import org.apache.flink.table.api.DataTypes; @@ -73,7 +72,7 @@ public class FlinkHiveCatalogIT extends FlinkCommonIT { private static org.apache.gravitino.Catalog hiveCatalog; @BeforeAll - static void hiveStartUp() { + void hiveStartUp() { initDefaultHiveCatalog(); } @@ -83,13 +82,13 @@ public class FlinkHiveCatalogIT extends FlinkCommonIT { metalake.dropCatalog(DEFAULT_HIVE_CATALOG, true); } - protected static void initDefaultHiveCatalog() { + protected void initDefaultHiveCatalog() { Preconditions.checkNotNull(metalake); hiveCatalog = metalake.createCatalog( DEFAULT_HIVE_CATALOG, org.apache.gravitino.Catalog.Type.RELATIONAL, - "hive", + getProvider(), null, ImmutableMap.of("metastore.uris", hiveMetastoreUri)); } @@ -583,32 +582,23 @@ public class FlinkHiveCatalogIT extends FlinkCommonIT { true); } + @Override + protected Map<String, String> getCreateSchemaProps(String schemaName) { + return ImmutableMap.of("location", warehouse + "/" + schemaName); + } + @Override protected org.apache.gravitino.Catalog 
currentCatalog() { return hiveCatalog; } - protected void doWithSchema( - org.apache.gravitino.Catalog catalog, - String schemaName, - Consumer<org.apache.gravitino.Catalog> action, - boolean dropSchema) { - Preconditions.checkNotNull(catalog); - Preconditions.checkNotNull(schemaName); - try { - tableEnv.useCatalog(catalog.name()); - if (!catalog.asSchemas().schemaExists(schemaName)) { - catalog - .asSchemas() - .createSchema( - schemaName, null, ImmutableMap.of("location", warehouse + "/" + schemaName)); - } - tableEnv.useDatabase(schemaName); - action.accept(catalog); - } finally { - if (dropSchema) { - catalog.asSchemas().dropSchema(schemaName, true); - } - } + @Override + protected String getProvider() { + return "hive"; + } + + @Override + protected boolean supportDropCascade() { + return true; } } diff --git a/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/hive/FlinkHiveCatalogIT.java b/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/iceberg/FlinkIcebergCatalogIT.java similarity index 61% copy from flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/hive/FlinkHiveCatalogIT.java copy to flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/iceberg/FlinkIcebergCatalogIT.java index bb7b25f6b2..0834def90b 100644 --- a/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/hive/FlinkHiveCatalogIT.java +++ b/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/iceberg/FlinkIcebergCatalogIT.java @@ -16,7 +16,8 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.gravitino.flink.connector.integration.test.hive; + +package org.apache.gravitino.flink.connector.integration.test.iceberg; import static org.apache.gravitino.flink.connector.integration.test.utils.TestUtils.assertColumns; import static org.apache.gravitino.flink.connector.integration.test.utils.TestUtils.toFlinkPhysicalColumn; @@ -25,18 +26,13 @@ import static org.apache.gravitino.rel.expressions.transforms.Transforms.EMPTY_T import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Lists; -import java.io.IOException; import java.util.Arrays; import java.util.Map; import java.util.Optional; -import java.util.function.Consumer; -import java.util.stream.Collectors; import org.apache.flink.configuration.Configuration; import org.apache.flink.table.api.DataTypes; import org.apache.flink.table.api.ResultKind; import org.apache.flink.table.api.TableResult; -import org.apache.flink.table.api.ValidationException; -import org.apache.flink.table.catalog.Catalog; import org.apache.flink.table.catalog.CatalogBaseTable; import org.apache.flink.table.catalog.CatalogDescriptor; import org.apache.flink.table.catalog.CatalogTable; @@ -44,13 +40,12 @@ import org.apache.flink.table.catalog.CommonCatalogOptions; import org.apache.flink.table.catalog.DefaultCatalogTable; import org.apache.flink.table.catalog.ObjectPath; import org.apache.flink.table.catalog.exceptions.TableNotExistException; -import org.apache.flink.table.catalog.hive.factories.HiveCatalogFactoryOptions; import org.apache.flink.types.Row; +import org.apache.gravitino.Catalog; import org.apache.gravitino.NameIdentifier; -import org.apache.gravitino.catalog.hive.HiveConstants; -import org.apache.gravitino.flink.connector.PropertiesConverter; 
-import org.apache.gravitino.flink.connector.hive.GravitinoHiveCatalog; -import org.apache.gravitino.flink.connector.hive.GravitinoHiveCatalogFactoryOptions; +import org.apache.gravitino.catalog.lakehouse.iceberg.IcebergConstants; +import org.apache.gravitino.flink.connector.iceberg.GravitinoIcebergCatalog; +import org.apache.gravitino.flink.connector.iceberg.GravitinoIcebergCatalogFactoryOptions; import org.apache.gravitino.flink.connector.integration.test.FlinkCommonIT; import org.apache.gravitino.flink.connector.integration.test.utils.TestUtils; import org.apache.gravitino.rel.Column; @@ -58,54 +53,41 @@ import org.apache.gravitino.rel.Table; import org.apache.gravitino.rel.expressions.transforms.Transform; import org.apache.gravitino.rel.expressions.transforms.Transforms; import org.apache.gravitino.rel.types.Types; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hive.conf.HiveConf; -import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeAll; -import org.junit.jupiter.api.Tag; import org.junit.jupiter.api.Test; -@Tag("gravitino-docker-test") -public class FlinkHiveCatalogIT extends FlinkCommonIT { - private static final String DEFAULT_HIVE_CATALOG = "test_flink_hive_schema_catalog"; +public abstract class FlinkIcebergCatalogIT extends FlinkCommonIT { - private static org.apache.gravitino.Catalog hiveCatalog; + private static final String DEFAULT_ICEBERG_CATALOG = "flink_iceberg_catalog"; - @BeforeAll - static void hiveStartUp() { - initDefaultHiveCatalog(); - } - - @AfterAll - static void hiveStop() { - Preconditions.checkNotNull(metalake); - metalake.dropCatalog(DEFAULT_HIVE_CATALOG, true); - } + private static org.apache.gravitino.Catalog icebergCatalog; - protected static void initDefaultHiveCatalog() { + @BeforeAll + public void before() { Preconditions.checkNotNull(metalake); - hiveCatalog = + icebergCatalog = metalake.createCatalog( - DEFAULT_HIVE_CATALOG, + DEFAULT_ICEBERG_CATALOG, org.apache.gravitino.Catalog.Type.RELATIONAL, - "hive", + getProvider(), null, - ImmutableMap.of("metastore.uris", hiveMetastoreUri)); + getCatalogConfigs()); } + protected abstract Map<String, String> getCatalogConfigs(); + @Test - public void testCreateGravitinoHiveCatalog() { + public void testCreateGravitinoIcebergCatalog() { tableEnv.useCatalog(DEFAULT_CATALOG); int numCatalogs = tableEnv.listCatalogs().length; // Create a new catalog. - String catalogName = "gravitino_hive"; - Configuration configuration = new Configuration(); + String catalogName = "gravitino_iceberg_catalog"; + Configuration configuration = Configuration.fromMap(getCatalogConfigs()); configuration.set( - CommonCatalogOptions.CATALOG_TYPE, GravitinoHiveCatalogFactoryOptions.IDENTIFIER); - configuration.set(HiveCatalogFactoryOptions.HIVE_CONF_DIR, "src/test/resources/flink-tests"); - configuration.set(GravitinoHiveCatalogFactoryOptions.HIVE_METASTORE_URIS, hiveMetastoreUri); + CommonCatalogOptions.CATALOG_TYPE, GravitinoIcebergCatalogFactoryOptions.IDENTIFIER); + CatalogDescriptor catalogDescriptor = CatalogDescriptor.of(catalogName, configuration); tableEnv.createCatalog(catalogName, catalogDescriptor); Assertions.assertTrue(metalake.catalogExists(catalogName)); @@ -113,23 +95,12 @@ public class FlinkHiveCatalogIT extends FlinkCommonIT { // Check the catalog properties. 
org.apache.gravitino.Catalog gravitinoCatalog = metalake.loadCatalog(catalogName); Map<String, String> properties = gravitinoCatalog.properties(); - Assertions.assertEquals(hiveMetastoreUri, properties.get(HiveConstants.METASTORE_URIS)); - Map<String, String> flinkProperties = - gravitinoCatalog.properties().entrySet().stream() - .filter(e -> e.getKey().startsWith(PropertiesConverter.FLINK_PROPERTY_PREFIX)) - .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); - Assertions.assertEquals(2, flinkProperties.size()); - Assertions.assertEquals( - "src/test/resources/flink-tests", - flinkProperties.get(flinkByPass(HiveCatalogFactoryOptions.HIVE_CONF_DIR.key()))); - Assertions.assertEquals( - GravitinoHiveCatalogFactoryOptions.IDENTIFIER, - flinkProperties.get(flinkByPass(CommonCatalogOptions.CATALOG_TYPE.key()))); + Assertions.assertEquals(hiveMetastoreUri, properties.get(IcebergConstants.URI)); // Get the created catalog. Optional<org.apache.flink.table.catalog.Catalog> catalog = tableEnv.getCatalog(catalogName); Assertions.assertTrue(catalog.isPresent()); - Assertions.assertInstanceOf(GravitinoHiveCatalog.class, catalog.get()); + Assertions.assertInstanceOf(GravitinoIcebergCatalog.class, catalog.get()); // List catalogs. String[] catalogs = tableEnv.listCatalogs(); @@ -154,52 +125,46 @@ public class FlinkHiveCatalogIT extends FlinkCommonIT { tableEnv.executeSql("drop catalog " + catalogName); Assertions.assertFalse(metalake.catalogExists(catalogName)); - Optional<Catalog> droppedCatalog = tableEnv.getCatalog(catalogName); + Optional<org.apache.flink.table.catalog.Catalog> droppedCatalog = + tableEnv.getCatalog(catalogName); Assertions.assertFalse(droppedCatalog.isPresent(), "Catalog should be dropped"); } @Test - public void testCreateGravitinoHiveCatalogUsingSQL() { + public void testCreateGravitinoIcebergUsingSQL() { tableEnv.useCatalog(DEFAULT_CATALOG); int numCatalogs = tableEnv.listCatalogs().length; // Create a new catalog. - String catalogName = "gravitino_hive_sql"; + String catalogName = "gravitino_iceberg_using_sql"; tableEnv.executeSql( String.format( "create catalog %s with (" - + "'type'='gravitino-hive', " - + "'hive-conf-dir'='src/test/resources/flink-tests'," - + "'hive.metastore.uris'='%s'," - + "'unknown.key'='unknown.value'" + + "'type'='%s', " + + "'catalog-backend'='%s'," + + "'uri'='%s'," + + "'warehouse'='%s'" + ")", - catalogName, hiveMetastoreUri)); + catalogName, + GravitinoIcebergCatalogFactoryOptions.IDENTIFIER, + getCatalogBackend(), + hiveMetastoreUri, + warehouse)); Assertions.assertTrue(metalake.catalogExists(catalogName)); // Check the properties of the created catalog. 
org.apache.gravitino.Catalog gravitinoCatalog = metalake.loadCatalog(catalogName); Map<String, String> properties = gravitinoCatalog.properties(); - Assertions.assertEquals(hiveMetastoreUri, properties.get(HiveConstants.METASTORE_URIS)); - Map<String, String> flinkProperties = - properties.entrySet().stream() - .filter(e -> e.getKey().startsWith(PropertiesConverter.FLINK_PROPERTY_PREFIX)) - .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); - Assertions.assertEquals(3, flinkProperties.size()); - Assertions.assertEquals( - "src/test/resources/flink-tests", - flinkProperties.get(flinkByPass(HiveCatalogFactoryOptions.HIVE_CONF_DIR.key()))); - Assertions.assertEquals( - GravitinoHiveCatalogFactoryOptions.IDENTIFIER, - flinkProperties.get(flinkByPass(CommonCatalogOptions.CATALOG_TYPE.key()))); + Assertions.assertEquals(hiveMetastoreUri, properties.get(IcebergConstants.URI)); + Assertions.assertEquals( - "unknown.value", - flinkProperties.get(flinkByPass("unknown.key")), - "The unknown.key will not cause failure and will be saved in Gravitino."); + GravitinoIcebergCatalogFactoryOptions.IDENTIFIER, + properties.get(CommonCatalogOptions.CATALOG_TYPE.key())); // Get the created catalog. Optional<org.apache.flink.table.catalog.Catalog> catalog = tableEnv.getCatalog(catalogName); Assertions.assertTrue(catalog.isPresent()); - Assertions.assertInstanceOf(GravitinoHiveCatalog.class, catalog.get()); + Assertions.assertInstanceOf(GravitinoIcebergCatalog.class, catalog.get()); // List catalogs. String[] catalogs = tableEnv.listCatalogs(); @@ -229,52 +194,25 @@ public class FlinkHiveCatalogIT extends FlinkCommonIT { tableEnv.executeSql("drop catalog " + catalogName); Assertions.assertFalse(metalake.catalogExists(catalogName)); - Optional<Catalog> droppedCatalog = tableEnv.getCatalog(catalogName); + Optional<org.apache.flink.table.catalog.Catalog> droppedCatalog = + tableEnv.getCatalog(catalogName); Assertions.assertFalse(droppedCatalog.isPresent(), "Catalog should be dropped"); } - @Test - public void testCreateGravitinoHiveCatalogRequireOptions() { - tableEnv.useCatalog(DEFAULT_CATALOG); - - // Failed to create the catalog for missing the required options. - String catalogName = "gravitino_hive_sql2"; - Assertions.assertThrows( - ValidationException.class, - () -> { - tableEnv.executeSql( - String.format( - "create catalog %s with (" - + "'type'='gravitino-hive', " - + "'hive-conf-dir'='src/test/resources/flink-tests'" - + ")", - catalogName)); - }, - "The hive.metastore.uris is required."); - - Assertions.assertFalse(metalake.catalogExists(catalogName)); - } - @Test public void testGetCatalogFromGravitino() { // list catalogs. int numCatalogs = tableEnv.listCatalogs().length; // create a new catalog. 
- String catalogName = "hive_catalog_in_gravitino"; + String catalogName = "iceberg_catalog_in_gravitino"; org.apache.gravitino.Catalog gravitinoCatalog = metalake.createCatalog( catalogName, org.apache.gravitino.Catalog.Type.RELATIONAL, - "hive", + getProvider(), null, - ImmutableMap.of( - "flink.bypass.hive-conf-dir", - "src/test/resources/flink-tests", - "flink.bypass.hive.test", - "hive.config", - "metastore.uris", - hiveMetastoreUri)); + getCatalogConfigs()); Assertions.assertNotNull(gravitinoCatalog); Assertions.assertEquals(catalogName, gravitinoCatalog.name()); Assertions.assertTrue(metalake.catalogExists(catalogName)); @@ -282,15 +220,10 @@ public class FlinkHiveCatalogIT extends FlinkCommonIT { numCatalogs + 1, tableEnv.listCatalogs().length, "Should create a new catalog"); // get the catalog from Gravitino. - Optional<Catalog> flinkHiveCatalog = tableEnv.getCatalog(catalogName); - Assertions.assertTrue(flinkHiveCatalog.isPresent()); - Assertions.assertInstanceOf(GravitinoHiveCatalog.class, flinkHiveCatalog.get()); - GravitinoHiveCatalog gravitinoHiveCatalog = (GravitinoHiveCatalog) flinkHiveCatalog.get(); - HiveConf hiveConf = gravitinoHiveCatalog.getHiveConf(); - Assertions.assertTrue(hiveConf.size() > 0, "Should have hive conf"); - Assertions.assertEquals("hive.config", hiveConf.get("hive.test")); - Assertions.assertEquals( - hiveMetastoreUri, hiveConf.get(HiveConf.ConfVars.METASTOREURIS.varname)); + Optional<org.apache.flink.table.catalog.Catalog> flinkIcebergCatalog = + tableEnv.getCatalog(catalogName); + Assertions.assertTrue(flinkIcebergCatalog.isPresent()); + Assertions.assertInstanceOf(GravitinoIcebergCatalog.class, flinkIcebergCatalog.get()); // drop the catalog. tableEnv.useCatalog(DEFAULT_CATALOG); @@ -301,123 +234,99 @@ public class FlinkHiveCatalogIT extends FlinkCommonIT { } @Test - public void testHivePartitionTable() { - String databaseName = "test_create_hive_partition_table_db"; - String tableName = "test_create_hive_partition_table"; - String comment = "test comment"; + public void testIcebergTableWithPartition() { + String databaseName = "test_iceberg_table_partition"; + String tableName = "iceberg_table_with_partition"; String key = "test key"; String value = "test value"; doWithSchema( - currentCatalog(), + icebergCatalog, databaseName, catalog -> { TableResult result = sql( - "CREATE TABLE %s " - + "(string_type STRING COMMENT 'string_type', " - + " double_type DOUBLE COMMENT 'double_type')" - + " COMMENT '%s' " - + " PARTITIONED BY (string_type, double_type)" + "CREATE TABLE %s (" + + " id BIGINT COMMENT 'unique id'," + + " data STRING NOT NULL" + + " ) PARTITIONED BY (data)" + " WITH (" + "'%s' = '%s')", - tableName, comment, key, value); + tableName, key, value); TestUtils.assertTableResult(result, ResultKind.SUCCESS); Table table = catalog.asTableCatalog().loadTable(NameIdentifier.of(databaseName, tableName)); Assertions.assertNotNull(table); - Assertions.assertEquals(comment, table.comment()); Assertions.assertEquals(value, table.properties().get(key)); Column[] columns = new Column[] { - Column.of("string_type", Types.StringType.get(), "string_type", true, false, null), - Column.of("double_type", Types.DoubleType.get(), "double_type") + Column.of("id", Types.LongType.get(), "unique id", true, false, null), + Column.of("data", Types.StringType.get(), null, false, false, null) }; assertColumns(columns, table.columns()); - Transform[] partitions = - new Transform[] { - Transforms.identity("string_type"), Transforms.identity("double_type") - }; + 
Transform[] partitions = new Transform[] {Transforms.identity("data")}; Assertions.assertArrayEquals(partitions, table.partitioning()); // load flink catalog try { - Catalog flinkCatalog = tableEnv.getCatalog(currentCatalog().name()).get(); + org.apache.flink.table.catalog.Catalog flinkCatalog = + tableEnv.getCatalog(currentCatalog().name()).get(); CatalogBaseTable flinkTable = flinkCatalog.getTable(ObjectPath.fromString(databaseName + "." + tableName)); DefaultCatalogTable catalogTable = (DefaultCatalogTable) flinkTable; Assertions.assertTrue(catalogTable.isPartitioned()); Assertions.assertArrayEquals( - new String[] {"string_type", "double_type"}, - catalogTable.getPartitionKeys().toArray()); + new String[] {"data"}, catalogTable.getPartitionKeys().toArray()); } catch (Exception e) { Assertions.fail("Table should be exist", e); } // write and read. TestUtils.assertTableResult( - sql("INSERT INTO %s VALUES ('A', 1.0), ('B', 2.0)", tableName), + sql("INSERT INTO %s VALUES (1, 'A'), (2, 'B')", tableName), ResultKind.SUCCESS_WITH_CONTENT, Row.of(-1L)); TestUtils.assertTableResult( - sql("SELECT * FROM %s ORDER BY double_type", tableName), + sql("SELECT * FROM %s ORDER BY data", tableName), ResultKind.SUCCESS_WITH_CONTENT, - Row.of("A", 1.0), - Row.of("B", 2.0)); - try { - Assertions.assertTrue( - hdfs.exists( - new Path( - table.properties().get("location") + "/string_type=A/double_type=1.0"))); - Assertions.assertTrue( - hdfs.exists( - new Path( - table.properties().get("location") + "/string_type=B/double_type=2.0"))); - } catch (IOException e) { - Assertions.fail("The partition directory should be exist.", e); - } + Row.of(1, "A"), + Row.of(2, "B")); }, - true); + true, + supportDropCascade()); } @Test - public void testCreateHiveTable() { - String databaseName = "test_create_hive_table_db"; - String tableName = "test_create_hive_table"; - String comment = "test comment"; + public void testCreateIcebergTable() { + String databaseName = "test_create_iceberg_table"; + String tableName = "iceberg_table"; + String comment = "test table comment"; String key = "test key"; String value = "test value"; - // 1. The NOT NULL constraint for column is only supported since Hive 3.0, - // but the current Gravitino Hive catalog only supports Hive 2.x. - // 2. Hive doesn't support Time and Timestamp with timezone type. - // 3. Flink SQL only support to create Interval Month and Second(3). 
doWithSchema( - metalake.loadCatalog(DEFAULT_HIVE_CATALOG), + metalake.loadCatalog(DEFAULT_ICEBERG_CATALOG), databaseName, catalog -> { TableResult result = sql( - "CREATE TABLE %s " - + "(string_type STRING COMMENT 'string_type', " + "CREATE TABLE %s (" + + " string_type STRING COMMENT 'string_type', " + " double_type DOUBLE COMMENT 'double_type'," + " int_type INT COMMENT 'int_type'," + " varchar_type VARCHAR COMMENT 'varchar_type'," - + " char_type CHAR COMMENT 'char_type'," + " boolean_type BOOLEAN COMMENT 'boolean_type'," - + " byte_type TINYINT COMMENT 'byte_type'," + " binary_type VARBINARY(10) COMMENT 'binary_type'," + " decimal_type DECIMAL(10, 2) COMMENT 'decimal_type'," + " bigint_type BIGINT COMMENT 'bigint_type'," + " float_type FLOAT COMMENT 'float_type'," + " date_type DATE COMMENT 'date_type'," + " timestamp_type TIMESTAMP COMMENT 'timestamp_type'," - + " smallint_type SMALLINT COMMENT 'smallint_type'," + " array_type ARRAY<INT> COMMENT 'array_type'," + " map_type MAP<INT, STRING> COMMENT 'map_type'," - + " struct_type ROW<k1 INT, k2 String>)" - + " COMMENT '%s' WITH (" + + " struct_type ROW<k1 INT, k2 String>" + + " ) COMMENT '%s' WITH (" + "'%s' = '%s')", tableName, comment, key, value); TestUtils.assertTableResult(result, ResultKind.SUCCESS); @@ -433,9 +342,7 @@ public class FlinkHiveCatalogIT extends FlinkCommonIT { Column.of("double_type", Types.DoubleType.get(), "double_type"), Column.of("int_type", Types.IntegerType.get(), "int_type"), Column.of("varchar_type", Types.StringType.get(), "varchar_type"), - Column.of("char_type", Types.FixedCharType.of(1), "char_type"), Column.of("boolean_type", Types.BooleanType.get(), "boolean_type"), - Column.of("byte_type", Types.ByteType.get(), "byte_type"), Column.of("binary_type", Types.BinaryType.get(), "binary_type"), Column.of("decimal_type", Types.DecimalType.of(10, 2), "decimal_type"), Column.of("bigint_type", Types.LongType.get(), "bigint_type"), @@ -443,7 +350,6 @@ public class FlinkHiveCatalogIT extends FlinkCommonIT { Column.of("date_type", Types.DateType.get(), "date_type"), Column.of( "timestamp_type", Types.TimestampType.withoutTimeZone(), "timestamp_type"), - Column.of("smallint_type", Types.ShortType.get(), "smallint_type"), Column.of( "array_type", Types.ListType.of(Types.IntegerType.get(), true), "array_type"), Column.of( @@ -460,28 +366,25 @@ public class FlinkHiveCatalogIT extends FlinkCommonIT { assertColumns(columns, table.columns()); Assertions.assertArrayEquals(EMPTY_TRANSFORM, table.partitioning()); }, - true); + true, + supportDropCascade()); } @Test - public void testGetHiveTable() { + public void testGetIcebergTable() { Column[] columns = new Column[] { Column.of("string_type", Types.StringType.get(), "string_type", true, false, null), Column.of("double_type", Types.DoubleType.get(), "double_type"), Column.of("int_type", Types.IntegerType.get(), "int_type"), Column.of("varchar_type", Types.StringType.get(), "varchar_type"), - Column.of("char_type", Types.FixedCharType.of(1), "char_type"), Column.of("boolean_type", Types.BooleanType.get(), "boolean_type"), - Column.of("byte_type", Types.ByteType.get(), "byte_type"), Column.of("binary_type", Types.BinaryType.get(), "binary_type"), Column.of("decimal_type", Types.DecimalType.of(10, 2), "decimal_type"), Column.of("bigint_type", Types.LongType.get(), "bigint_type"), Column.of("float_type", Types.FloatType.get(), "float_type"), Column.of("date_type", Types.DateType.get(), "date_type"), Column.of("timestamp_type", Types.TimestampType.withoutTimeZone(), 
"timestamp_type"), - Column.of("smallint_type", Types.ShortType.get(), "smallint_type"), - Column.of("fixed_char_type", Types.FixedCharType.of(10), "fixed_char_type"), Column.of("array_type", Types.ListType.of(Types.IntegerType.get(), true), "array_type"), Column.of( "map_type", @@ -495,9 +398,9 @@ public class FlinkHiveCatalogIT extends FlinkCommonIT { null) }; - String databaseName = "test_get_hive_table_db"; + String databaseName = "test_get_iceberg_table"; doWithSchema( - metalake.loadCatalog(DEFAULT_HIVE_CATALOG), + metalake.loadCatalog(DEFAULT_ICEBERG_CATALOG), databaseName, catalog -> { String tableName = "test_desc_table"; @@ -511,7 +414,7 @@ public class FlinkHiveCatalogIT extends FlinkCommonIT { ImmutableMap.of("k1", "v1")); Optional<org.apache.flink.table.catalog.Catalog> flinkCatalog = - tableEnv.getCatalog(DEFAULT_HIVE_CATALOG); + tableEnv.getCatalog(DEFAULT_ICEBERG_CATALOG); Assertions.assertTrue(flinkCatalog.isPresent()); try { CatalogBaseTable table = @@ -531,13 +434,9 @@ public class FlinkHiveCatalogIT extends FlinkCommonIT { org.apache.flink.table.catalog.Column.physical( "varchar_type", DataTypes.VARCHAR(Integer.MAX_VALUE)) .withComment("varchar_type"), - org.apache.flink.table.catalog.Column.physical("char_type", DataTypes.CHAR(1)) - .withComment("char_type"), org.apache.flink.table.catalog.Column.physical( "boolean_type", DataTypes.BOOLEAN()) .withComment("boolean_type"), - org.apache.flink.table.catalog.Column.physical("byte_type", DataTypes.TINYINT()) - .withComment("byte_type"), org.apache.flink.table.catalog.Column.physical("binary_type", DataTypes.BYTES()) .withComment("binary_type"), org.apache.flink.table.catalog.Column.physical( @@ -552,12 +451,6 @@ public class FlinkHiveCatalogIT extends FlinkCommonIT { org.apache.flink.table.catalog.Column.physical( "timestamp_type", DataTypes.TIMESTAMP()) .withComment("timestamp_type"), - org.apache.flink.table.catalog.Column.physical( - "smallint_type", DataTypes.SMALLINT()) - .withComment("smallint_type"), - org.apache.flink.table.catalog.Column.physical( - "fixed_char_type", DataTypes.CHAR(10)) - .withComment("fixed_char_type"), org.apache.flink.table.catalog.Column.physical( "array_type", DataTypes.ARRAY(DataTypes.INT())) .withComment("array_type"), @@ -580,35 +473,29 @@ public class FlinkHiveCatalogIT extends FlinkCommonIT { Assertions.fail(e); } }, - true); + true, + supportDropCascade()); + } + + @Override + protected Catalog currentCatalog() { + return icebergCatalog; } @Override - protected org.apache.gravitino.Catalog currentCatalog() { - return hiveCatalog; + protected String getProvider() { + return "lakehouse-iceberg"; } - protected void doWithSchema( - org.apache.gravitino.Catalog catalog, - String schemaName, - Consumer<org.apache.gravitino.Catalog> action, - boolean dropSchema) { - Preconditions.checkNotNull(catalog); - Preconditions.checkNotNull(schemaName); - try { - tableEnv.useCatalog(catalog.name()); - if (!catalog.asSchemas().schemaExists(schemaName)) { - catalog - .asSchemas() - .createSchema( - schemaName, null, ImmutableMap.of("location", warehouse + "/" + schemaName)); - } - tableEnv.useDatabase(schemaName); - action.accept(catalog); - } finally { - if (dropSchema) { - catalog.asSchemas().dropSchema(schemaName, true); - } - } + @Override + protected boolean supportGetSchemaWithoutCommentAndOption() { + return false; } + + @Override + protected boolean supportDropCascade() { + return false; + } + + protected abstract String getCatalogBackend(); } diff --git 
diff --git a/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/iceberg/FlinkIcebergHiveCatalogIT.java b/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/iceberg/FlinkIcebergHiveCatalogIT.java
new file mode 100644
index 0000000000..fc21ce2c24
--- /dev/null
+++ b/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/iceberg/FlinkIcebergHiveCatalogIT.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.flink.connector.integration.test.iceberg;
+
+import com.google.common.collect.Maps;
+import java.util.Map;
+import org.apache.gravitino.flink.connector.iceberg.IcebergPropertiesConstants;
+import org.junit.jupiter.api.Tag;
+
+@Tag("gravitino-docker-test")
+public class FlinkIcebergHiveCatalogIT extends FlinkIcebergCatalogIT {
+
+  @Override
+  protected Map<String, String> getCatalogConfigs() {
+    Map<String, String> catalogProperties = Maps.newHashMap();
+    catalogProperties.put(
+        IcebergPropertiesConstants.GRAVITINO_ICEBERG_CATALOG_BACKEND,
+        IcebergPropertiesConstants.ICEBERG_CATALOG_BACKEND_HIVE);
+    catalogProperties.put(
+        IcebergPropertiesConstants.GRAVITINO_ICEBERG_CATALOG_WAREHOUSE, warehouse);
+    catalogProperties.put(
+        IcebergPropertiesConstants.GRAVITINO_ICEBERG_CATALOG_URI, hiveMetastoreUri);
+    return catalogProperties;
+  }
+
+  protected String getCatalogBackend() {
+    return "hive";
+  }
+}
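The Hive-backed IT above is the template-method pattern in action: the abstract `FlinkIcebergCatalogIT` owns the test bodies, and each backend supplies only its catalog properties. A hypothetical sketch of how another Iceberg backend could plug in; the `rest` backend name and the endpoint below are assumptions, not part of this patch:

```java
import com.google.common.collect.Maps;
import java.util.Map;
import org.apache.gravitino.flink.connector.iceberg.IcebergPropertiesConstants;

// Hypothetical REST-backed variant, mirroring the Hive subclass above.
public class FlinkIcebergRestCatalogIT extends FlinkIcebergCatalogIT {

  @Override
  protected Map<String, String> getCatalogConfigs() {
    Map<String, String> catalogProperties = Maps.newHashMap();
    catalogProperties.put(
        IcebergPropertiesConstants.GRAVITINO_ICEBERG_CATALOG_BACKEND, "rest");
    // Hypothetical local REST catalog endpoint.
    catalogProperties.put(
        IcebergPropertiesConstants.GRAVITINO_ICEBERG_CATALOG_URI, "http://127.0.0.1:8181");
    return catalogProperties;
  }

  @Override
  protected String getCatalogBackend() {
    return "rest";
  }
}
```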
"lakehouse-paimon", + getProvider(), null, ImmutableMap.of( PaimonConstants.CATALOG_BACKEND, @@ -98,4 +108,9 @@ public class FlinkPaimonCatalogIT extends FlinkCommonIT { Map<String, String> properties = gravitinoCatalog.properties(); Assertions.assertEquals(warehouse, properties.get("warehouse")); } + + @Override + protected Map<String, String> getCreateSchemaProps(String schemaName) { + return null; + } } diff --git a/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/utils/TestUtils.java b/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/utils/TestUtils.java index ba16a9c07b..02710bcfb3 100644 --- a/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/utils/TestUtils.java +++ b/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/utils/TestUtils.java @@ -42,7 +42,8 @@ public class TestUtils { Row expectedRow = expected[i]; Row actualRow = actualRows.get(i); Assertions.assertEquals(expectedRow.getKind(), actualRow.getKind()); - Assertions.assertEquals(expectedRow, actualRow); + // Only compare string value. + Assertions.assertEquals(expectedRow.toString(), actualRow.toString()); } } }