This is an automated email from the ASF dual-hosted git repository. diqiu50 pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/gravitino.git
The following commit(s) were added to refs/heads/main by this push: new eded01901 [#4757]feat(trino-connector): Support more partition and sort order features of the Iceberg catalog (#4925) eded01901 is described below commit eded019014bcb85763bef6bd33ed17fb0fd235e1 Author: Yuhui <h...@datastrato.com> AuthorDate: Tue Oct 22 17:03:45 2024 +0800 [#4757]feat(trino-connector): Support more partition and sort order features of the Iceberg catalog (#4925) ### What changes were proposed in this pull request? Support Iceberg partition expressions like: year(x), month(x), day(x), hour(x), bucket(x, n), truncate(x,n) Support Iceberg sort order expressions like: field DESC, field ASC, field DESC NULLS FIRST, field ASC NULLS LAST ### Why are the changes needed? Fix: #4757 ### Does this PR introduce _any_ user-facing change? NO ### How was this patch tested? New UTs and ITs --- trino-connector/integration-test/build.gradle.kts | 22 +- .../integration/test/CloseableGroupTest.java | 57 ---- .../integration/test/TrinoQueryTestTool.java | 2 +- .../00005_partition_sort_order.sql | 63 +++++ .../00005_partition_sort_order.txt | 71 +++++ .../trino-test-tools/trino_test.sh | 4 +- .../trino/connector/GravitinoErrorCode.java | 2 +- .../connector/catalog/iceberg/ExpressionUtil.java | 308 ++++++++++++++++++++ .../catalog/iceberg/IcebergMetadataAdapter.java | 35 +-- .../catalog/iceberg/TestExpressionUtil.java | 311 +++++++++++++++++++++ 10 files changed, 776 insertions(+), 99 deletions(-) diff --git a/trino-connector/integration-test/build.gradle.kts b/trino-connector/integration-test/build.gradle.kts index d4733d136..a238cd60e 100644 --- a/trino-connector/integration-test/build.gradle.kts +++ b/trino-connector/integration-test/build.gradle.kts @@ -70,17 +70,24 @@ dependencies { testRuntimeOnly(libs.junit.jupiter.engine) } +tasks.register("setupDependencies") { + dependsOn(":trino-connector:trino-connector:jar") + dependsOn(":catalogs:catalog-lakehouse-iceberg:jar", 
":catalogs:catalog-lakehouse-iceberg:runtimeJars") + dependsOn(":catalogs:catalog-jdbc-mysql:jar", ":catalogs:catalog-jdbc-mysql:runtimeJars") + dependsOn(":catalogs:catalog-jdbc-postgresql:jar", ":catalogs:catalog-jdbc-postgresql:runtimeJars") + dependsOn(":catalogs:catalog-hive:jar", ":catalogs:catalog-hive:runtimeJars") +} + +tasks.build { + dependsOn("setupDependencies") +} + tasks.test { val skipITs = project.hasProperty("skipITs") if (skipITs) { exclude("**/integration/test/**") } else { - dependsOn(":trino-connector:trino-connector:jar") - dependsOn(":catalogs:catalog-lakehouse-iceberg:jar", ":catalogs:catalog-lakehouse-iceberg:runtimeJars") - dependsOn(":catalogs:catalog-jdbc-mysql:jar", ":catalogs:catalog-jdbc-mysql:runtimeJars") - dependsOn(":catalogs:catalog-jdbc-postgresql:jar", ":catalogs:catalog-jdbc-postgresql:runtimeJars") - dependsOn(":catalogs:catalog-hive:jar", ":catalogs:catalog-hive:runtimeJars") - + dependsOn("setupDependencies") doFirst { copy { from("${project.rootDir}/dev/docker/trino/conf") @@ -113,8 +120,9 @@ tasks.test { } tasks.register<JavaExec>("TrinoTest") { + dependsOn("build") classpath = sourceSets["test"].runtimeClasspath - mainClass.set("org.apache.gravitino.integration.test.trino.TrinoQueryTestTool") + mainClass.set("org.apache.gravitino.trino.connector.integration.test.TrinoQueryTestTool") if (JavaVersion.current() > JavaVersion.VERSION_1_8) { jvmArgs = listOf( diff --git a/trino-connector/integration-test/src/test/java/org/apache/gravitino/trino/connector/integration/test/CloseableGroupTest.java b/trino-connector/integration-test/src/test/java/org/apache/gravitino/trino/connector/integration/test/CloseableGroupTest.java deleted file mode 100644 index 75402108b..000000000 --- a/trino-connector/integration-test/src/test/java/org/apache/gravitino/trino/connector/integration/test/CloseableGroupTest.java +++ /dev/null @@ -1,57 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor 
license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.gravitino.trino.connector.integration.test; - -import java.io.Closeable; -import org.apache.gravitino.integration.test.util.CloseableGroup; -import org.junit.jupiter.api.Test; -import org.mockito.Mockito; - -public class CloseableGroupTest { - @Test - public void callCloseableTest() throws Exception { - Closeable closeable1 = Mockito.mock(Closeable.class); - Closeable closeable2 = Mockito.mock(Closeable.class); - Closeable closeable3 = Mockito.mock(Closeable.class); - - CloseableGroup closeableGroup = CloseableGroup.create(); - closeableGroup.register(closeable1); - closeableGroup.register(closeable2); - closeableGroup.register(closeable3); - - closeableGroup.close(); - Mockito.verify(closeable1).close(); - Mockito.verify(closeable2).close(); - Mockito.verify(closeable3).close(); - } - - @Test - public void callAutoCloseableTest() throws Exception { - Closeable closeable1 = Mockito.mock(Closeable.class); - AutoCloseable closeable2 = Mockito.mock(AutoCloseable.class); - - CloseableGroup closeableGroup = CloseableGroup.create(); - closeableGroup.register(closeable1); - closeableGroup.register(closeable2); - - closeableGroup.close(); - Mockito.verify(closeable1).close(); - Mockito.verify(closeable2).close(); - } -} diff --git 
a/trino-connector/integration-test/src/test/java/org/apache/gravitino/trino/connector/integration/test/TrinoQueryTestTool.java b/trino-connector/integration-test/src/test/java/org/apache/gravitino/trino/connector/integration/test/TrinoQueryTestTool.java index dee82d6a3..eeb56c0d4 100644 --- a/trino-connector/integration-test/src/test/java/org/apache/gravitino/trino/connector/integration/test/TrinoQueryTestTool.java +++ b/trino-connector/integration-test/src/test/java/org/apache/gravitino/trino/connector/integration/test/TrinoQueryTestTool.java @@ -185,7 +185,7 @@ public class TrinoQueryTestTool { if (testSetsDir.isEmpty()) { testSetsDir = TrinoQueryIT.class.getClassLoader().getResource("trino-ci-testset").getPath(); - testSetsDir = ITUtils.joinPath(testSetsDir, "trino-ci-testset/testsets"); + testSetsDir = ITUtils.joinPath(testSetsDir, "testsets"); } else { TrinoQueryIT.testsetsDir = testSetsDir; } diff --git a/trino-connector/integration-test/src/test/resources/trino-ci-testset/testsets/lakehouse-iceberg/00005_partition_sort_order.sql b/trino-connector/integration-test/src/test/resources/trino-ci-testset/testsets/lakehouse-iceberg/00005_partition_sort_order.sql new file mode 100644 index 000000000..e3a353f09 --- /dev/null +++ b/trino-connector/integration-test/src/test/resources/trino-ci-testset/testsets/lakehouse-iceberg/00005_partition_sort_order.sql @@ -0,0 +1,63 @@ +CREATE SCHEMA gt_db2; + +USE gt_db2; + +CREATE TABLE lineitem( + orderkey bigint, + partkey bigint, + suppkey bigint, + linenumber integer, + quantity decimal(12, 2), + extendedprice decimal(12, 2), + discount decimal(12, 2), + tax decimal(12, 2), + returnflag varchar, + linestatus varchar, + shipdate date, + commitdate date, + receiptdate date, + shipinstruct varchar, + shipmode varchar, + comment varchar +) +WITH ( + partitioning = ARRAY['year(commitdate)'], + sorted_by = ARRAY['partkey', 'extendedprice desc'] +); + +show create table lineitem; + +insert into lineitem select * from 
tpch.tiny.lineitem; + +select * from lineitem order by orderkey, partkey limit 5; + +CREATE TABLE tb01( + orderkey bigint, + partkey bigint, + suppkey bigint, + linenumber integer, + quantity decimal(12, 2), + extendedprice decimal(12, 2), + discount decimal(12, 2), + tax decimal(12, 2), + returnflag varchar, + linestatus varchar, + shipdate date, + commitdate date, + receiptdate date, + shipinstruct varchar, + shipmode varchar, + comment varchar +) +WITH ( + partitioning = ARRAY['day(commitdate)', 'month(shipdate)', 'bucket(partkey, 2)', 'truncate(shipinstruct, 2)'], + sorted_by = ARRAY['partkey asc nulls last', 'extendedprice DESC NULLS FIRST'] +); + +show create table tb01; + +drop table tb01; + +drop table lineitem; + +drop schema gt_db2; diff --git a/trino-connector/integration-test/src/test/resources/trino-ci-testset/testsets/lakehouse-iceberg/00005_partition_sort_order.txt b/trino-connector/integration-test/src/test/resources/trino-ci-testset/testsets/lakehouse-iceberg/00005_partition_sort_order.txt new file mode 100644 index 000000000..429ae871c --- /dev/null +++ b/trino-connector/integration-test/src/test/resources/trino-ci-testset/testsets/lakehouse-iceberg/00005_partition_sort_order.txt @@ -0,0 +1,71 @@ +CREATE SCHEMA + +USE + +CREATE TABLE + +"CREATE TABLE %.gt_db2.lineitem ( + orderkey bigint, + partkey bigint, + suppkey bigint, + linenumber integer, + quantity decimal(12, 2), + extendedprice decimal(12, 2), + discount decimal(12, 2), + tax decimal(12, 2), + returnflag varchar, + linestatus varchar, + shipdate date, + commitdate date, + receiptdate date, + shipinstruct varchar, + shipmode varchar, + comment varchar +) +COMMENT '' +WITH ( + location = 'hdfs://%/user/iceberg/warehouse/TrinoQueryIT/gt_db2%/lineitem', + partitioning = ARRAY['year(commitdate)'], + sorted_by = ARRAY['partkey','extendedprice DESC'] +)" + +INSERT: 60175 rows + 
+"1","22","48","4","28.00","25816.56","0.09","0.06","N","O","1996-04-21","1996-03-30","1996-05-16","NONE","AIR","lites. fluffily even de" +"1","157","10","6","32.00","33828.80","0.07","0.02","N","O","1996-01-30","1996-02-07","1996-02-03","DELIVER IN PERSON","MAIL","arefully slyly ex" +"1","241","23","5","24.00","27389.76","0.10","0.04","N","O","1996-03-30","1996-03-14","1996-04-01","NONE","FOB"," pending foxes. slyly re" +"1","637","38","3","8.00","12301.04","0.10","0.02","N","O","1996-01-29","1996-03-05","1996-01-31","TAKE BACK RETURN","REG AIR","riously. regular, express dep" +"1","674","75","2","36.00","56688.12","0.09","0.06","N","O","1996-04-12","1996-02-28","1996-04-20","TAKE BACK RETURN","MAIL","ly final dependencies: slyly bold " + +CREATE TABLE + +"CREATE TABLE %.gt_db2.tb01 ( + orderkey bigint, + partkey bigint, + suppkey bigint, + linenumber integer, + quantity decimal(12, 2), + extendedprice decimal(12, 2), + discount decimal(12, 2), + tax decimal(12, 2), + returnflag varchar, + linestatus varchar, + shipdate date, + commitdate date, + receiptdate date, + shipinstruct varchar, + shipmode varchar, + comment varchar +) +COMMENT '' +WITH ( + location = 'hdfs://%/user/iceberg/warehouse/TrinoQueryIT/gt_db2%/tb01', + partitioning = ARRAY['day(commitdate)','month(shipdate)','bucket(partkey, 2)','truncate(shipinstruct, 2)'], + sorted_by = ARRAY['partkey ASC NULLS LAST','extendedprice DESC NULLS FIRST'] +)" + +DROP TABLE + +DROP TABLE + +DROP SCHEMA \ No newline at end of file diff --git a/trino-connector/integration-test/trino-test-tools/trino_test.sh b/trino-connector/integration-test/trino-test-tools/trino_test.sh index 012adc127..d74e45ac3 100755 --- a/trino-connector/integration-test/trino-test-tools/trino_test.sh +++ b/trino-connector/integration-test/trino-test-tools/trino_test.sh @@ -19,11 +19,11 @@ set -e -DIR=$(cd "$(dirname "$0")" && pwd)/../.. +DIR=$(cd "$(dirname "$0")" && pwd)/../../.. 
export GRAVITINO_ROOT_DIR=$(cd "$DIR" && pwd) export GRAVITINO_HOME=$GRAVITINO_ROOT_DIR export GRAVITINO_TEST=true -export HADOOP_USER_NAME=root +export HADOOP_USER_NAME=anonymous echo $GRAVITINO_ROOT_DIR cd $GRAVITINO_ROOT_DIR diff --git a/trino-connector/trino-connector/src/main/java/org/apache/gravitino/trino/connector/GravitinoErrorCode.java b/trino-connector/trino-connector/src/main/java/org/apache/gravitino/trino/connector/GravitinoErrorCode.java index b79145e95..5741e4427 100644 --- a/trino-connector/trino-connector/src/main/java/org/apache/gravitino/trino/connector/GravitinoErrorCode.java +++ b/trino-connector/trino-connector/src/main/java/org/apache/gravitino/trino/connector/GravitinoErrorCode.java @@ -50,7 +50,7 @@ public enum GravitinoErrorCode implements ErrorCodeSupplier { GRAVITINO_OPERATION_FAILED(22, EXTERNAL), GRAVITINO_RUNTIME_ERROR(23, EXTERNAL), GRAVITINO_DUPLICATED_CATALOGS(24, EXTERNAL), - ; + GRAVITINO_EXPRESSION_ERROR(25, EXTERNAL); // suppress ImmutableEnumChecker because ErrorCode is outside the project. @SuppressWarnings("ImmutableEnumChecker") diff --git a/trino-connector/trino-connector/src/main/java/org/apache/gravitino/trino/connector/catalog/iceberg/ExpressionUtil.java b/trino-connector/trino-connector/src/main/java/org/apache/gravitino/trino/connector/catalog/iceberg/ExpressionUtil.java new file mode 100644 index 000000000..2ee253e27 --- /dev/null +++ b/trino-connector/trino-connector/src/main/java/org/apache/gravitino/trino/connector/catalog/iceberg/ExpressionUtil.java @@ -0,0 +1,308 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.gravitino.trino.connector.catalog.iceberg; + +import static org.apache.gravitino.trino.connector.GravitinoErrorCode.GRAVITINO_EXPRESSION_ERROR; + +import io.trino.spi.TrinoException; +import java.util.ArrayList; +import java.util.List; +import java.util.regex.MatchResult; +import java.util.regex.Matcher; +import java.util.regex.Pattern; +import org.apache.gravitino.rel.expressions.Expression; +import org.apache.gravitino.rel.expressions.NamedReference; +import org.apache.gravitino.rel.expressions.literals.Literal; +import org.apache.gravitino.rel.expressions.sorts.NullOrdering; +import org.apache.gravitino.rel.expressions.sorts.SortDirection; +import org.apache.gravitino.rel.expressions.sorts.SortOrder; +import org.apache.gravitino.rel.expressions.sorts.SortOrders; +import org.apache.gravitino.rel.expressions.transforms.Transform; +import org.apache.gravitino.rel.expressions.transforms.Transforms; + +/** This class is used to convert expression of bucket, sort_by, partition object to string */ +public class ExpressionUtil { + private static final String IDENTIFIER = "[a-zA-Z_][a-zA-Z0-9_]*"; + private static final String FUNCTION_ARG_INT = "(\\d+)"; + private static final String FUNCTION_ARG_IDENTIFIER = "(" + IDENTIFIER + ")"; + private static final Pattern YEAR_FUNCTION_PATTERN = + Pattern.compile("year\\(" + FUNCTION_ARG_IDENTIFIER + "\\)", Pattern.CASE_INSENSITIVE); + private static final Pattern MONTH_FUNCTION_PATTERN = + Pattern.compile("month\\(" + FUNCTION_ARG_IDENTIFIER + "\\)", Pattern.CASE_INSENSITIVE); + private 
static final Pattern DAY_FUNCTION_PATTERN = + Pattern.compile("day\\(" + FUNCTION_ARG_IDENTIFIER + "\\)", Pattern.CASE_INSENSITIVE); + private static final Pattern HOUR_FUNCTION_PATTERN = + Pattern.compile("hour\\(" + FUNCTION_ARG_IDENTIFIER + "\\)", Pattern.CASE_INSENSITIVE); + private static final Pattern BUCKET_FUNCTION_PATTERN = + Pattern.compile( + "bucket\\(" + FUNCTION_ARG_IDENTIFIER + ",\\s*" + FUNCTION_ARG_INT + "\\)", + Pattern.CASE_INSENSITIVE); + private static final Pattern TRUNCATE_FUNCTION_PATTERN = + Pattern.compile( + "truncate\\(" + FUNCTION_ARG_IDENTIFIER + ",\\s*" + FUNCTION_ARG_INT + "\\)", + Pattern.CASE_INSENSITIVE); + private static final Pattern IDENTIFILER_PATTERN = + Pattern.compile(IDENTIFIER, Pattern.CASE_INSENSITIVE); + + private static final String SORT_DIRECTION_ASC = "ASC"; + private static final String SORT_DIRECTION_DESC = "DESC"; + private static final String NULL_ORDERING_FIRST = "NULLS FIRST"; + private static final String NULL_ORDERING_LAST = "NULLS LAST"; + private static final String SORT_DIRECTION = + "(" + SORT_DIRECTION_ASC + "|" + SORT_DIRECTION_DESC + ")"; + private static final String NULL_ORDERING = + "(" + NULL_ORDERING_FIRST + "|" + NULL_ORDERING_LAST + ")"; + private static final Pattern SROT_ORDER_PATTERN = + Pattern.compile(IDENTIFIER, Pattern.CASE_INSENSITIVE); + private static final Pattern SORT_ORDER_WITH_SORT_DIRECTION_PATTERN = + Pattern.compile("(" + IDENTIFIER + ")\\s+" + SORT_DIRECTION, Pattern.CASE_INSENSITIVE); + private static final Pattern SORT_ORDER_WITH_SORT_DIRECTION_AND_NULL_ORDERING_PATTERN = + Pattern.compile( + "(" + IDENTIFIER + ")\\s+" + SORT_DIRECTION + "\\s+" + NULL_ORDERING, + Pattern.CASE_INSENSITIVE); + + public static List<String> expressionToPartitionFiled(Transform[] transforms) { + try { + List<String> partitionFields = new ArrayList<>(); + for (Transform transform : transforms) { + partitionFields.add(transFormToString(transform)); + } + return partitionFields; + } catch 
(IllegalArgumentException e) { + throw new TrinoException( + GRAVITINO_EXPRESSION_ERROR, + "Error to handle transform Expressions :" + e.getMessage(), + e); + } + } + + public static Transform[] partitionFiledToExpression(List<String> partitions) { + try { + List<Transform> partitionTransforms = new ArrayList<>(); + for (String partition : partitions) { + parseTransform(partitionTransforms, partition); + } + return partitionTransforms.toArray(new Transform[0]); + } catch (IllegalArgumentException e) { + throw new TrinoException( + GRAVITINO_EXPRESSION_ERROR, "Error parsing the partition field: " + e.getMessage(), e); + } + } + + public static List<String> expressionToSortOrderFiled(SortOrder[] orders) { + try { + List<String> orderFields = new ArrayList<>(); + for (SortOrder order : orders) { + orderFields.add(sortOrderToString(order)); + } + return orderFields; + } catch (IllegalArgumentException e) { + throw new TrinoException( + GRAVITINO_EXPRESSION_ERROR, + "Error to handle the sort order expressions : " + e.getMessage(), + e); + } + } + + public static SortOrder[] sortOrderFiledToExpression(List<String> orderFields) { + try { + List<SortOrder> sortOrders = new ArrayList<>(); + for (String orderField : orderFields) { + parseSortOrder(sortOrders, orderField); + } + return sortOrders.toArray(new SortOrder[0]); + } catch (IllegalArgumentException e) { + throw new TrinoException( + GRAVITINO_EXPRESSION_ERROR, "Error parsing the sort order field: " + e.getMessage(), e); + } + } + + private static void parseTransform(List<Transform> transforms, String value) { + boolean match = + false + || tryMatch( + value, + IDENTIFILER_PATTERN, + (m) -> { + transforms.add(Transforms.identity(m.group(0))); + }) + || tryMatch( + value, + YEAR_FUNCTION_PATTERN, + (m) -> { + transforms.add(Transforms.year(m.group(1))); + }) + || tryMatch( + value, + MONTH_FUNCTION_PATTERN, + (m) -> { + transforms.add(Transforms.month(m.group(1))); + }) + || tryMatch( + value, + DAY_FUNCTION_PATTERN, 
+ (m) -> { + transforms.add(Transforms.day(m.group(1))); + }) + || tryMatch( + value, + HOUR_FUNCTION_PATTERN, + (m) -> { + transforms.add(Transforms.hour(m.group(1))); + }) + || tryMatch( + value, + BUCKET_FUNCTION_PATTERN, + (m) -> { + transforms.add( + Transforms.bucket(Integer.parseInt(m.group(2)), new String[] {m.group(1)})); + }) + || tryMatch( + value, + TRUNCATE_FUNCTION_PATTERN, + (m) -> { + transforms.add( + Transforms.truncate(Integer.parseInt(m.group(2)), new String[] {m.group(1)})); + }); + if (!match) { + throw new IllegalArgumentException("Unparsed expression: " + value); + } + } + + private static boolean tryMatch(String value, Pattern pattern, MatchHandler handler) { + Matcher matcher = pattern.matcher(value); + if (matcher.matches()) { + handler.invoke(matcher.toMatchResult()); + return true; + } + return false; + } + + private static String transFormToString(Transform transform) { + if (transform instanceof Transforms.IdentityTransform) { + return ((Transforms.IdentityTransform) transform).fieldName()[0]; + + } else if (transform instanceof Transforms.YearTransform + || transform instanceof Transforms.MonthTransform + || transform instanceof Transforms.DayTransform + || transform instanceof Transforms.HourTransform) { + return String.format( + "%s(%s)", transform.name(), expressionToString(transform.arguments()[0])); + + } else if (transform instanceof Transforms.BucketTransform) { + Transforms.BucketTransform bucketTransform = (Transforms.BucketTransform) transform; + return String.format( + "%s(%s, %s)", + bucketTransform.name(), bucketTransform.fieldNames()[0][0], bucketTransform.numBuckets()); + + } else if (transform instanceof Transforms.TruncateTransform) { + Transforms.TruncateTransform truncateTransform = (Transforms.TruncateTransform) transform; + return String.format( + "%s(%s, %s)", + truncateTransform.name(), truncateTransform.fieldName()[0], truncateTransform.width()); + } + + throw new IllegalArgumentException( + String.format( + 
"Unsupported transform %s with %d parameters: ", + transform, transform.arguments().length)); + } + + private static String expressionToString(Expression expression) { + if (expression instanceof NamedReference) { + return ((NamedReference) expression).fieldName()[0]; + } else if (expression instanceof Literal<?>) { + return ((Literal<?>) expression).value().toString(); + } + throw new IllegalArgumentException("Unsupported expression: " + expression); + } + + private static String sortOrderToString(SortOrder order) { + Expression orderExpression = order.expression(); + if (!(orderExpression instanceof NamedReference)) { + throw new IllegalArgumentException( + "Only supported sort expression of NamedReference, the expression: " + orderExpression); + } + + String columnName = ((NamedReference) orderExpression).fieldName()[0]; + if (order.direction() == SortDirection.ASCENDING) { + if (order.nullOrdering() == NullOrdering.NULLS_LAST) { + return String.format("%s ASC NULLS LAST", columnName); + } else { + return columnName; + } + } else if (order.direction() == SortDirection.DESCENDING) { + if (order.nullOrdering() == NullOrdering.NULLS_FIRST) { + return String.format("%s DESC NULLS FIRST", columnName); + } else { + return columnName + " DESC"; + } + } + throw new IllegalArgumentException("Unsupported sort order: " + order); + } + + private static void parseSortOrder(List<SortOrder> sortOrders, String value) { + boolean match = + false + || tryMatch( + value, + SROT_ORDER_PATTERN, + (m) -> { + NamedReference.FieldReference sortField = NamedReference.field(m.group(0)); + sortOrders.add(SortOrders.ascending(sortField)); + }) + || tryMatch( + value, + SORT_ORDER_WITH_SORT_DIRECTION_PATTERN, + (m) -> { + NamedReference.FieldReference sortField = NamedReference.field(m.group(1)); + SortDirection sortDirection = + m.group(1).toUpperCase().equals(SORT_DIRECTION_ASC) + ? 
SortDirection.ASCENDING + : SortDirection.DESCENDING; + NullOrdering nullOrdering = + sortDirection.equals(SortDirection.ASCENDING) + ? NullOrdering.NULLS_FIRST + : NullOrdering.NULLS_LAST; + sortOrders.add(SortOrders.of(sortField, sortDirection, nullOrdering)); + }) + || tryMatch( + value, + SORT_ORDER_WITH_SORT_DIRECTION_AND_NULL_ORDERING_PATTERN, + (m) -> { + NamedReference.FieldReference sortField = NamedReference.field(m.group(1)); + SortDirection sortDirection = + m.group(2).toUpperCase().equals(SORT_DIRECTION_ASC) + ? SortDirection.ASCENDING + : SortDirection.DESCENDING; + NullOrdering nullOrdering = + m.group(3).toUpperCase().equals(NULL_ORDERING_FIRST) + ? NullOrdering.NULLS_FIRST + : NullOrdering.NULLS_LAST; + sortOrders.add(SortOrders.of(sortField, sortDirection, nullOrdering)); + }); + if (!match) { + throw new IllegalArgumentException("Unparsed expression: " + value); + } + } + + interface MatchHandler { + void invoke(MatchResult matchResult); + } +} diff --git a/trino-connector/trino-connector/src/main/java/org/apache/gravitino/trino/connector/catalog/iceberg/IcebergMetadataAdapter.java b/trino-connector/trino-connector/src/main/java/org/apache/gravitino/trino/connector/catalog/iceberg/IcebergMetadataAdapter.java index 321bc3bff..27ab016d0 100644 --- a/trino-connector/trino-connector/src/main/java/org/apache/gravitino/trino/connector/catalog/iceberg/IcebergMetadataAdapter.java +++ b/trino-connector/trino-connector/src/main/java/org/apache/gravitino/trino/connector/catalog/iceberg/IcebergMetadataAdapter.java @@ -24,21 +24,15 @@ import io.trino.spi.connector.ConnectorTableMetadata; import io.trino.spi.connector.SchemaTableName; import io.trino.spi.session.PropertyMetadata; import java.util.ArrayList; -import java.util.Arrays; import java.util.Collections; import java.util.List; import java.util.Map; import java.util.Optional; import java.util.Set; -import java.util.stream.Collectors; import org.apache.commons.lang3.ArrayUtils; import 
org.apache.gravitino.catalog.property.PropertyConverter; -import org.apache.gravitino.rel.expressions.Expression; -import org.apache.gravitino.rel.expressions.NamedReference; import org.apache.gravitino.rel.expressions.sorts.SortOrder; -import org.apache.gravitino.rel.expressions.sorts.SortOrders; import org.apache.gravitino.rel.expressions.transforms.Transform; -import org.apache.gravitino.rel.expressions.transforms.Transforms; import org.apache.gravitino.trino.connector.catalog.CatalogConnectorMetadataAdapter; import org.apache.gravitino.trino.connector.metadata.GravitinoColumn; import org.apache.gravitino.trino.connector.metadata.GravitinoTable; @@ -126,20 +120,12 @@ public class IcebergMetadataAdapter extends CatalogConnectorMetadataAdapter { new GravitinoTable(schemaName, tableName, columns, comment, properties); if (!partitionColumns.isEmpty()) { - Transform[] partitioning = - partitionColumns.stream().map(Transforms::identity).toArray(Transform[]::new); + Transform[] partitioning = ExpressionUtil.partitionFiledToExpression(partitionColumns); gravitinoTable.setPartitioning(partitioning); } if (!sortColumns.isEmpty()) { - SortOrder[] sorting = - sortColumns.stream() - .map( - sortingColumn -> { - Expression expression = NamedReference.field(sortingColumn); - return SortOrders.ascending(expression); - }) - .toArray(SortOrder[]::new); + SortOrder[] sorting = ExpressionUtil.sortOrderFiledToExpression(sortColumns); gravitinoTable.setSortOrders(sorting); } @@ -158,28 +144,15 @@ public class IcebergMetadataAdapter extends CatalogConnectorMetadataAdapter { Map<String, Object> properties = toTrinoTableProperties(gravitinoTable.getProperties()); if (ArrayUtils.isNotEmpty(gravitinoTable.getPartitioning())) { - // Only support simple partition now like partition by a, b, c. - // Format like partition like partition by year(a), b, c is NOT supported now. properties.put( IcebergPropertyMeta.ICEBERG_PARTITIONING_PROPERTY, - gravitinoTable.getPartitioning().length > 0 - ? 
Arrays.stream(gravitinoTable.getPartitioning()) - .map(ts -> ((Transform.SingleFieldTransform) ts).fieldName()[0]) - .collect(Collectors.toList()) - : Collections.emptyList()); + ExpressionUtil.expressionToPartitionFiled(gravitinoTable.getPartitioning())); } if (ArrayUtils.isNotEmpty(gravitinoTable.getSortOrders())) { - // Only support the simple format properties.put( IcebergPropertyMeta.ICEBERG_SORTED_BY_PROPERTY, - Arrays.stream(gravitinoTable.getSortOrders()) - .map( - sortOrder -> { - Expression expression = sortOrder.expression(); - return ((NamedReference) expression).fieldName()[0]; - }) - .collect(Collectors.toList())); + ExpressionUtil.expressionToSortOrderFiled(gravitinoTable.getSortOrders())); } return new ConnectorTableMetadata( diff --git a/trino-connector/trino-connector/src/test/java/org/apache/gravitino/trino/connector/catalog/iceberg/TestExpressionUtil.java b/trino-connector/trino-connector/src/test/java/org/apache/gravitino/trino/connector/catalog/iceberg/TestExpressionUtil.java new file mode 100644 index 000000000..086429f21 --- /dev/null +++ b/trino-connector/trino-connector/src/test/java/org/apache/gravitino/trino/connector/catalog/iceberg/TestExpressionUtil.java @@ -0,0 +1,311 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.gravitino.trino.connector.catalog.iceberg;

import io.trino.spi.TrinoException;
import java.util.List;
import org.apache.gravitino.rel.expressions.NamedReference;
import org.apache.gravitino.rel.expressions.sorts.NullOrdering;
import org.apache.gravitino.rel.expressions.sorts.SortDirection;
import org.apache.gravitino.rel.expressions.sorts.SortOrder;
import org.apache.gravitino.rel.expressions.sorts.SortOrders;
import org.apache.gravitino.rel.expressions.transforms.Transform;
import org.apache.gravitino.rel.expressions.transforms.Transforms;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

/**
 * Unit tests for {@link ExpressionUtil}: conversions between the string form of Iceberg partition
 * fields / sort order fields and Gravitino {@link Transform} / {@link SortOrder} expressions, in
 * both directions, plus rejection of malformed input.
 */
public class TestExpressionUtil {

  @Test
  void testPartitionFiledToExpression() {
    assertPartitionFieldParsesTo("f1", Transforms.identity(new String[] {"f1"}));
    assertPartitionFieldParsesTo("year(f1)", Transforms.year("f1"));
    // Upper-case function names are expected to be accepted as well.
    assertPartitionFieldParsesTo("MONTH(f2)", Transforms.month("f2"));
    assertPartitionFieldParsesTo("day(f3)", Transforms.day("f3"));
    assertPartitionFieldParsesTo("hour(f4)", Transforms.hour("f4"));
    assertPartitionFieldParsesTo("bucket(f2,10)", Transforms.bucket(10, new String[] {"f2"}));
    assertPartitionFieldParsesTo("TRUNCATE(f1, 3)", Transforms.truncate(3, new String[] {"f1"}));
    assertPartitionFieldParsesTo("truncate(f1, 3)", Transforms.truncate(3, new String[] {"f1"}));

    // Several fields are converted element by element, preserving order.
    Transform[] transforms =
        ExpressionUtil.partitionFiledToExpression(
            List.of("month(order_date)", "BUCKET(account_number, 10)", "country"));
    Assertions.assertArrayEquals(
        new Transform[] {
          Transforms.month("order_date"),
          Transforms.bucket(10, new String[] {"account_number"}),
          Transforms.identity(new String[] {"country"})
        },
        transforms);
  }

  @Test
  void testErrorOfPartitionFiledToExpression() {
    // A bare field name must be a valid identifier.
    assertPartitionFieldRejected("12");
    // Unsupported partition function.
    assertPartitionFieldRejected("abs(f1)");
    // Wrong number of function arguments.
    assertPartitionFieldRejected("year(f1, f2)");
    // Function argument that is not a field identifier.
    assertPartitionFieldRejected("year(12)");
    // Misspelled function name with a non-integer second argument.
    assertPartitionFieldRejected("buket(f1, f2)");
  }

  @Test
  void testExpressionToPartitionFiled() {
    assertTransformFormatsTo(Transforms.identity(new String[] {"f1"}), "f1");
    assertTransformFormatsTo(Transforms.year("f1"), "year(f1)");
    assertTransformFormatsTo(Transforms.month("f2"), "month(f2)");
    assertTransformFormatsTo(Transforms.day("f3"), "day(f3)");
    assertTransformFormatsTo(Transforms.hour("f4"), "hour(f4)");
    assertTransformFormatsTo(Transforms.bucket(10, new String[] {"f2"}), "bucket(f2, 10)");
    assertTransformFormatsTo(Transforms.truncate(3, new String[] {"f1"}), "truncate(f1, 3)");

    // Comparing whole lists checks the output size and order, not just the input array length.
    List<String> partitionFields =
        ExpressionUtil.expressionToPartitionFiled(
            new Transform[] {
              Transforms.month("order_date"),
              Transforms.bucket(10, new String[] {"account_number"}),
              Transforms.identity(new String[] {"country"})
            });
    Assertions.assertEquals(
        List.of("month(order_date)", "bucket(account_number, 10)", "country"), partitionFields);
  }

  @Test
  void testExpressionToSortOrderFiled() {
    // Ascending order is rendered as the bare field name.
    assertSortOrderFormatsTo(SortOrders.ascending(NamedReference.field("f1")), "f1");
    assertSortOrderFormatsTo(SortOrders.descending(NamedReference.field("f2")), "f2 DESC");
    assertSortOrderFormatsTo(
        SortOrders.of(NamedReference.field("f1"), SortDirection.ASCENDING, NullOrdering.NULLS_LAST),
        "f1 ASC NULLS LAST");
    assertSortOrderFormatsTo(
        SortOrders.of(
            NamedReference.field("f2"), SortDirection.DESCENDING, NullOrdering.NULLS_FIRST),
        "f2 DESC NULLS FIRST");

    List<String> sortOrderFields =
        ExpressionUtil.expressionToSortOrderFiled(
            new SortOrder[] {
              SortOrders.ascending(NamedReference.field("f1")),
              SortOrders.descending(NamedReference.field("f2")),
              SortOrders.of(
                  NamedReference.field("f3"), SortDirection.ASCENDING, NullOrdering.NULLS_LAST),
              SortOrders.of(
                  NamedReference.field("f4"), SortDirection.DESCENDING, NullOrdering.NULLS_FIRST)
            });
    Assertions.assertEquals(
        List.of("f1", "f2 DESC", "f3 ASC NULLS LAST", "f4 DESC NULLS FIRST"), sortOrderFields);
  }

  @Test
  void testSortOrderFiledToExpression() {
    assertSortOrderFieldParsesTo("f1", SortOrders.ascending(NamedReference.field("f1")));
    // Keywords may be lower-case; the field name keeps its original case.
    assertSortOrderFieldParsesTo("F2 desc", SortOrders.descending(NamedReference.field("F2")));
    assertSortOrderFieldParsesTo(
        "f1 ASC NULLS LAST",
        SortOrders.of(
            NamedReference.field("f1"), SortDirection.ASCENDING, NullOrdering.NULLS_LAST));
    assertSortOrderFieldParsesTo(
        "f2 desc nulls first",
        SortOrders.of(
            NamedReference.field("f2"), SortDirection.DESCENDING, NullOrdering.NULLS_FIRST));

    SortOrder[] sortOrders =
        ExpressionUtil.sortOrderFiledToExpression(
            List.of("f1", "f2 DESC", "f3 ASC NULLS LAST", "F4 DESC NULLS FIRST"));
    Assertions.assertArrayEquals(
        new SortOrder[] {
          SortOrders.ascending(NamedReference.field("f1")),
          SortOrders.descending(NamedReference.field("f2")),
          SortOrders.of(
              NamedReference.field("f3"), SortDirection.ASCENDING, NullOrdering.NULLS_LAST),
          SortOrders.of(
              NamedReference.field("F4"), SortDirection.DESCENDING, NullOrdering.NULLS_FIRST)
        },
        sortOrders);
  }

  @Test
  void testErrorOfSortOrderFiledToExpression() {
    // A field name must be a valid identifier.
    assertSortOrderFieldsRejected(List.of("12"));
    // One invalid entry rejects the whole list.
    assertSortOrderFieldsRejected(List.of("f12", "1"));
    // Unknown sort direction keyword.
    assertSortOrderFieldsRejected(List.of("f12 dxxx"));
    // Unknown null-ordering keyword.
    assertSortOrderFieldsRejected(List.of("f12 asc nulls all"));
  }

  /** Asserts that a single partition field string parses to exactly one, equal transform. */
  private static void assertPartitionFieldParsesTo(String partitionField, Transform expected) {
    Transform[] transforms = ExpressionUtil.partitionFiledToExpression(List.of(partitionField));
    Assertions.assertArrayEquals(new Transform[] {expected}, transforms, partitionField);
  }

  /** Asserts that parsing a single partition field string throws a {@link TrinoException}. */
  private static void assertPartitionFieldRejected(String partitionField) {
    Assertions.assertThrows(
        TrinoException.class,
        () -> ExpressionUtil.partitionFiledToExpression(List.of(partitionField)),
        "Expected partition field to be rejected: " + partitionField);
  }

  /** Asserts that a single transform is rendered as exactly one partition field string. */
  private static void assertTransformFormatsTo(Transform transform, String expected) {
    Assertions.assertEquals(
        List.of(expected), ExpressionUtil.expressionToPartitionFiled(new Transform[] {transform}));
  }

  /** Asserts that a single sort order is rendered as exactly one sort order field string. */
  private static void assertSortOrderFormatsTo(SortOrder sortOrder, String expected) {
    Assertions.assertEquals(
        List.of(expected), ExpressionUtil.expressionToSortOrderFiled(new SortOrder[] {sortOrder}));
  }

  /** Asserts that a single sort order field string parses to exactly one, equal sort order. */
  private static void assertSortOrderFieldParsesTo(String sortOrderField, SortOrder expected) {
    SortOrder[] sortOrders = ExpressionUtil.sortOrderFiledToExpression(List.of(sortOrderField));
    Assertions.assertArrayEquals(new SortOrder[] {expected}, sortOrders, sortOrderField);
  }

  /** Asserts that parsing the given sort order field strings throws a {@link TrinoException}. */
  private static void assertSortOrderFieldsRejected(List<String> sortOrderFields) {
    Assertions.assertThrows(
        TrinoException.class,
        () -> ExpressionUtil.sortOrderFiledToExpression(sortOrderFields),
        "Expected sort order fields to be rejected: " + sortOrderFields);
  }
}