This is an automated email from the ASF dual-hosted git repository.
fanng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/gravitino.git
The following commit(s) were added to refs/heads/main by this push:
new 74e7dd4786 [#1570] feat(spark-connector): Support partition management
(#7067)
74e7dd4786 is described below
commit 74e7dd47862aecab2d1884cdcf86e7f05083722c
Author: tian bao <[email protected]>
AuthorDate: Thu May 15 20:18:34 2025 +0800
[#1570] feat(spark-connector): Support partition management (#7067)
### What changes were proposed in this pull request?
Supports partition operations (add, list, drop) on the following field types:
- STRING, CHAR, VARCHAR
- INT, TINYINT, SMALLINT, BIGINT, FLOAT, DOUBLE, DECIMAL
- DATE
- BOOLEAN
Operations not yet supported:
TIMESTAMP and BINARY (these are rarely used as partition column types).
Partition load is not supported because Spark DataSource V2 does not support it.
### Why are the changes needed?
Support partition management.
https://github.com/apache/gravitino/issues/1570
### Does this PR introduce _any_ user-facing change?
no
### How was this patch tested?
1. UT for partition convert.
2. IT test for partition sql.
3. Spark sql tests.
---
.../spark/connector/hive/SparkHiveTable.java | 75 ++++++++-
.../utils/HiveGravitinoOperationOperator.java | 178 +++++++++++++++++++++
.../spark/connector/utils/SparkPartitionUtils.java | 161 +++++++++++++++++++
.../integration/test/hive/SparkHiveCatalogIT.java | 51 ++++++
.../integration/test/util/SparkUtilIT.java | 4 +
.../connector/utils/TestSparkPartitionUtils.java | 153 ++++++++++++++++++
6 files changed, 621 insertions(+), 1 deletion(-)
diff --git
a/spark-connector/spark-common/src/main/java/org/apache/gravitino/spark/connector/hive/SparkHiveTable.java
b/spark-connector/spark-common/src/main/java/org/apache/gravitino/spark/connector/hive/SparkHiveTable.java
index 784d493a5c..fbe684aefc 100644
---
a/spark-connector/spark-common/src/main/java/org/apache/gravitino/spark/connector/hive/SparkHiveTable.java
+++
b/spark-connector/spark-common/src/main/java/org/apache/gravitino/spark/connector/hive/SparkHiveTable.java
@@ -19,23 +19,30 @@
package org.apache.gravitino.spark.connector.hive;
+import com.google.common.base.Preconditions;
import java.util.Map;
import org.apache.gravitino.rel.Table;
import org.apache.gravitino.spark.connector.PropertiesConverter;
import org.apache.gravitino.spark.connector.SparkTransformConverter;
import org.apache.gravitino.spark.connector.SparkTypeConverter;
import org.apache.gravitino.spark.connector.utils.GravitinoTableInfoHelper;
+import
org.apache.gravitino.spark.connector.utils.HiveGravitinoOperationOperator;
import org.apache.kyuubi.spark.connector.hive.HiveTable;
import org.apache.kyuubi.spark.connector.hive.HiveTableCatalog;
import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.catalyst.analysis.NoSuchPartitionException;
+import org.apache.spark.sql.catalyst.analysis.PartitionAlreadyExistsException;
import org.apache.spark.sql.connector.catalog.Identifier;
+import org.apache.spark.sql.connector.catalog.SupportsPartitionManagement;
import org.apache.spark.sql.connector.expressions.Transform;
import org.apache.spark.sql.types.StructType;
/** Keep consistent behavior with the SparkIcebergTable */
-public class SparkHiveTable extends HiveTable {
+public class SparkHiveTable extends HiveTable implements
SupportsPartitionManagement {
private GravitinoTableInfoHelper gravitinoTableInfoHelper;
+ private HiveGravitinoOperationOperator hiveGravitinoOperationOperator;
public SparkHiveTable(
Identifier identifier,
@@ -54,6 +61,7 @@ public class SparkHiveTable extends HiveTable {
propertiesConverter,
sparkTransformConverter,
sparkTypeConverter);
+ this.hiveGravitinoOperationOperator = new
HiveGravitinoOperationOperator(gravitinoTable);
}
@Override
@@ -76,4 +84,69 @@ public class SparkHiveTable extends HiveTable {
public Transform[] partitioning() {
return gravitinoTableInfoHelper.partitioning();
}
+
+ @Override
+ public void createPartition(InternalRow ident, Map<String, String>
properties)
+ throws PartitionAlreadyExistsException, UnsupportedOperationException {
+ hiveGravitinoOperationOperator.createPartition(ident, properties,
partitionSchema());
+ }
+
+ @Override
+ public boolean dropPartition(InternalRow ident) {
+ return hiveGravitinoOperationOperator.dropPartition(ident,
partitionSchema());
+ }
+
+ @Override
+ public void replacePartitionMetadata(InternalRow ident, Map<String, String>
properties)
+ throws NoSuchPartitionException, UnsupportedOperationException {
+ throw new UnsupportedOperationException("Replace partition is not
supported");
+ }
+
+ @Override
+ public Map<String, String> loadPartitionMetadata(InternalRow ident)
+ throws UnsupportedOperationException {
+ return hiveGravitinoOperationOperator.loadPartitionMetadata(ident,
partitionSchema());
+ }
+
+ @Override
+ public InternalRow[] listPartitionIdentifiers(String[] names, InternalRow
ident) {
+ return hiveGravitinoOperationOperator.listPartitionIdentifiers(names,
ident, partitionSchema());
+ }
+
+ @Override
+ public boolean partitionExists(InternalRow ident) {
+ String[] partitionNames = partitionSchema().names();
+ Preconditions.checkArgument(
+ ident.numFields() == partitionNames.length,
+ String.format(
+ "The number of fields (%d) in the partition identifier is not
equal to "
+ + "the partition schema length (%d). "
+ + "The identifier might not refer to one partition.",
+ ident.numFields(), partitionNames.length));
+
+ return hiveGravitinoOperationOperator.partitionExists(partitionNames,
ident, partitionSchema());
+ }
+
+ @Override
+ public Object productElement(int n) {
+ if (n == 0) {
+ return gravitinoTableInfoHelper;
+ }
+
+ if (n == 1) {
+ return hiveGravitinoOperationOperator;
+ }
+
+ throw new IndexOutOfBoundsException("Invalid index: " + n);
+ }
+
+ @Override
+ public int productArity() {
+ return 2;
+ }
+
+ @Override
+ public boolean canEqual(Object that) {
+ return that instanceof SparkHiveTable;
+ }
}
diff --git
a/spark-connector/spark-common/src/main/java/org/apache/gravitino/spark/connector/utils/HiveGravitinoOperationOperator.java
b/spark-connector/spark-common/src/main/java/org/apache/gravitino/spark/connector/utils/HiveGravitinoOperationOperator.java
new file mode 100644
index 0000000000..a88cdfbf70
--- /dev/null
+++
b/spark-connector/spark-common/src/main/java/org/apache/gravitino/spark/connector/utils/HiveGravitinoOperationOperator.java
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.spark.connector.utils;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import org.apache.gravitino.exceptions.NoSuchPartitionException;
+import org.apache.gravitino.rel.expressions.literals.Literal;
+import org.apache.gravitino.rel.partitions.Partition;
+import org.apache.gravitino.rel.partitions.Partitions;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.catalyst.analysis.PartitionAlreadyExistsException;
+import org.apache.spark.sql.catalyst.expressions.GenericInternalRow;
+import org.apache.spark.sql.types.DataType;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
+import org.jetbrains.annotations.NotNull;
+
+public class HiveGravitinoOperationOperator {
+
+ private org.apache.gravitino.rel.Table gravitinoTable;
+ private static final String PARTITION_NAME_DELIMITER = "/";
+ private static final String PARTITION_VALUE_DELIMITER = "=";
+
+ public HiveGravitinoOperationOperator(org.apache.gravitino.rel.Table
gravitinoTable) {
+ this.gravitinoTable = gravitinoTable;
+ }
+
+ public void createPartition(
+ InternalRow ident, Map<String, String> properties, StructType
partitionSchema)
+ throws PartitionAlreadyExistsException {
+ List<String[]> fields = new ArrayList<>();
+ List<Literal<?>> values = new ArrayList<>();
+
+ int numFields = ident.numFields();
+ for (int i = 0; i < numFields; i++) {
+ StructField structField = partitionSchema.apply(i);
+ DataType dataType = structField.dataType();
+ fields.add(new String[] {structField.name()});
+ values.add(SparkPartitionUtils.toGravitinoLiteral(ident, i, dataType));
+ }
+
+ Partition partition =
+ Partitions.identity(
+ null, fields.toArray(new String[0][0]), values.toArray(new
Literal[0]), properties);
+
+ try {
+ gravitinoTable.supportPartitions().addPartition(partition);
+ } catch (org.apache.gravitino.exceptions.PartitionAlreadyExistsException
e) {
+ throw new
org.apache.spark.sql.catalyst.analysis.PartitionAlreadyExistsException(
+ e.getMessage());
+ }
+ }
+
+ public boolean dropPartition(InternalRow ident, StructType partitionSchema) {
+ String partitionName = getHivePartitionName(ident, partitionSchema);
+ return gravitinoTable.supportPartitions().dropPartition(partitionName);
+ }
+
+ public InternalRow[] listPartitionIdentifiers(
+ String[] names, InternalRow ident, StructType partitionSchema) {
+ // Get all partitions
+ String[] allPartitions =
gravitinoTable.supportPartitions().listPartitionNames();
+
+ boolean isNeedAll = names != null && names.length == 0 ? true : false;
+
+ String[] partitionNames =
+ getHivePartitionName(names, ident,
partitionSchema).split(PARTITION_NAME_DELIMITER);
+ String[] partitionNamesWithDelimiter =
+ Arrays.stream(partitionNames)
+ .map(name -> name + PARTITION_NAME_DELIMITER)
+ .toArray(String[]::new);
+
+ return Arrays.stream(allPartitions)
+ .map(e -> e + PARTITION_NAME_DELIMITER)
+ .filter(
+ e -> {
+ if (isNeedAll) {
+ return true;
+ }
+ for (String name : partitionNamesWithDelimiter) {
+ // exactly match
+ if (!e.contains(name)) {
+ return false;
+ }
+ }
+ return true;
+ })
+ .map(e -> e.substring(0, e.length() - 1))
+ .map(e -> toSparkPartition(e, partitionSchema))
+ .toArray(GenericInternalRow[]::new);
+ }
+
+ public Map<String, String> loadPartitionMetadata(InternalRow ident,
StructType partitionSchema) {
+ String partitionName = getHivePartitionName(ident, partitionSchema);
+ Partition partition =
gravitinoTable.supportPartitions().getPartition(partitionName);
+ return partition == null ? Collections.emptyMap() : partition.properties();
+ }
+
+ public boolean partitionExists(String[] names, InternalRow ident, StructType
partitionSchema) {
+ // Get all partitions
+ if (names != null && names.length == 0) {
+ return gravitinoTable.supportPartitions().listPartitionNames().length >
0;
+ }
+
+ String partitionName = getHivePartitionName(names, ident, partitionSchema);
+ try {
+ return gravitinoTable.supportPartitions().partitionExists(partitionName);
+ } catch (NoSuchPartitionException noSuchPartitionException) {
+ return false;
+ }
+ }
+
+ private InternalRow toSparkPartition(String partitionName, StructType
partitionSchema) {
+ String[] splits = partitionName.split(PARTITION_NAME_DELIMITER);
+ Object[] values = new Object[splits.length];
+ for (int i = 0; i < splits.length; i++) {
+ values[i] =
+ SparkPartitionUtils.getSparkPartitionValue(
+ splits[i].split(PARTITION_VALUE_DELIMITER)[1],
partitionSchema.apply(i).dataType());
+ }
+ return new GenericInternalRow(values);
+ }
+
+ private @NotNull String getHivePartitionName(
+ String[] names, InternalRow ident, StructType partitionSchema) {
+ StringBuilder partitionName = new StringBuilder();
+ for (int i = 0; i < names.length; i++) {
+ StructField structField = partitionSchema.apply(i);
+ DataType dataType = structField.dataType();
+ partitionName.append(
+ names[i]
+ + PARTITION_VALUE_DELIMITER
+ + SparkPartitionUtils.getPartitionValueAsString(ident, i,
dataType));
+ if (i < names.length - 1) {
+ partitionName.append(PARTITION_NAME_DELIMITER);
+ }
+ }
+ return partitionName.toString();
+ }
+
+ private @NotNull String getHivePartitionName(InternalRow ident, StructType
partitionSchema) {
+ StringBuilder partitionName = new StringBuilder();
+ int numFields = ident.numFields();
+ for (int i = 0; i < numFields; i++) {
+ StructField structField = partitionSchema.apply(i);
+ DataType dataType = structField.dataType();
+ partitionName.append(
+ structField.name()
+ + PARTITION_VALUE_DELIMITER
+ + SparkPartitionUtils.getPartitionValueAsString(ident, i,
dataType));
+ if (i < numFields - 1) {
+ partitionName.append(PARTITION_NAME_DELIMITER);
+ }
+ }
+ return partitionName.toString();
+ }
+}
diff --git
a/spark-connector/spark-common/src/main/java/org/apache/gravitino/spark/connector/utils/SparkPartitionUtils.java
b/spark-connector/spark-common/src/main/java/org/apache/gravitino/spark/connector/utils/SparkPartitionUtils.java
new file mode 100644
index 0000000000..a8e88b6b18
--- /dev/null
+++
b/spark-connector/spark-common/src/main/java/org/apache/gravitino/spark/connector/utils/SparkPartitionUtils.java
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.spark.connector.utils;
+
+import java.time.LocalDate;
+import java.time.format.DateTimeFormatter;
+import org.apache.gravitino.rel.expressions.literals.Literal;
+import org.apache.gravitino.rel.expressions.literals.Literals;
+import org.apache.gravitino.rel.types.Types;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.types.BooleanType;
+import org.apache.spark.sql.types.ByteType;
+import org.apache.spark.sql.types.CharType;
+import org.apache.spark.sql.types.DataType;
+import org.apache.spark.sql.types.DateType;
+import org.apache.spark.sql.types.Decimal;
+import org.apache.spark.sql.types.DecimalType;
+import org.apache.spark.sql.types.DoubleType;
+import org.apache.spark.sql.types.FloatType;
+import org.apache.spark.sql.types.IntegerType;
+import org.apache.spark.sql.types.LongType;
+import org.apache.spark.sql.types.ShortType;
+import org.apache.spark.sql.types.StringType;
+import org.apache.spark.sql.types.VarcharType;
+import org.apache.spark.unsafe.types.UTF8String;
+
+public class SparkPartitionUtils {
+
+ private SparkPartitionUtils() {}
+
+ public static Literal<?> toGravitinoLiteral(InternalRow ident, int ordinal,
DataType sparkType) {
+ if (sparkType instanceof ByteType) {
+ return Literals.byteLiteral(ident.getByte(ordinal));
+ } else if (sparkType instanceof ShortType) {
+ return Literals.shortLiteral(ident.getShort(ordinal));
+ } else if (sparkType instanceof IntegerType) {
+ return Literals.integerLiteral(ident.getInt(ordinal));
+ } else if (sparkType instanceof LongType) {
+ return Literals.longLiteral(ident.getLong(ordinal));
+ } else if (sparkType instanceof FloatType) {
+ return Literals.floatLiteral(ident.getFloat(ordinal));
+ } else if (sparkType instanceof DoubleType) {
+ return Literals.doubleLiteral(ident.getDouble(ordinal));
+ } else if (sparkType instanceof DecimalType) {
+ DecimalType decimalType = (DecimalType) sparkType;
+ org.apache.spark.sql.types.Decimal decimal =
+ ident.getDecimal(ordinal, decimalType.precision(),
decimalType.scale());
+ return Literals.decimalLiteral(
+
org.apache.gravitino.rel.types.Decimal.of(decimal.toJavaBigDecimal()));
+ } else if (sparkType instanceof StringType) {
+ return Literals.stringLiteral(ident.getString(ordinal));
+ } else if (sparkType instanceof VarcharType) {
+ VarcharType varcharType = (VarcharType) sparkType;
+ return Literals.varcharLiteral(varcharType.length(),
ident.getString(ordinal));
+ } else if (sparkType instanceof CharType) {
+ CharType charType = (CharType) sparkType;
+ return Literals.of(ident.get(ordinal, sparkType),
Types.FixedCharType.of(charType.length()));
+ } else if (sparkType instanceof BooleanType) {
+ return Literals.booleanLiteral(ident.getBoolean(ordinal));
+ } else if (sparkType instanceof DateType) {
+ LocalDate localDate = LocalDate.ofEpochDay(ident.getInt(ordinal));
+ return Literals.dateLiteral(localDate);
+ }
+ throw new UnsupportedOperationException("Not support " +
sparkType.toString());
+ }
+
+ public static String getPartitionValueAsString(
+ InternalRow ident, int ordinal, DataType dataType) {
+ if (ident.isNullAt(ordinal)) {
+ return null;
+ }
+ if (dataType instanceof ByteType) {
+ return String.valueOf(ident.getByte(ordinal));
+ } else if (dataType instanceof ShortType) {
+ return String.valueOf(ident.getShort(ordinal));
+ } else if (dataType instanceof IntegerType) {
+ return String.valueOf(ident.getInt(ordinal));
+ } else if (dataType instanceof StringType) {
+ return ident.getUTF8String(ordinal).toString();
+ } else if (dataType instanceof VarcharType) {
+ return ident.get(ordinal, dataType).toString();
+ } else if (dataType instanceof CharType) {
+ return ident.get(ordinal, dataType).toString();
+ } else if (dataType instanceof DateType) {
+ // DateType spark use int store.
+ LocalDate localDate = LocalDate.ofEpochDay(ident.getInt(ordinal));
+ return localDate.format(DateTimeFormatter.ISO_LOCAL_DATE);
+ } else if (dataType instanceof BooleanType) {
+ return String.valueOf(ident.getBoolean(ordinal));
+ } else if (dataType instanceof LongType) {
+ return String.valueOf(ident.getLong(ordinal));
+ } else if (dataType instanceof DoubleType) {
+ return String.valueOf(ident.getDouble(ordinal));
+ } else if (dataType instanceof FloatType) {
+ return String.valueOf(ident.getFloat(ordinal));
+ } else if (dataType instanceof DecimalType) {
+ return ident
+ .getDecimal(
+ ordinal, ((DecimalType) dataType).precision(), ((DecimalType)
dataType).scale())
+ .toString();
+ } else {
+ throw new UnsupportedOperationException(
+ String.format("Unsupported partition column type: %s", dataType));
+ }
+ }
+
+ public static Object getSparkPartitionValue(String hivePartitionValue,
DataType dataType) {
+ if (hivePartitionValue == null) {
+ return null;
+ }
+ try {
+ if (dataType instanceof ByteType) {
+ return Byte.valueOf(hivePartitionValue);
+ } else if (dataType instanceof ShortType) {
+ return Short.valueOf(hivePartitionValue);
+ } else if (dataType instanceof IntegerType) {
+ return Integer.parseInt(hivePartitionValue);
+ } else if (dataType instanceof LongType) {
+ return Long.parseLong(hivePartitionValue);
+ } else if (dataType instanceof StringType) {
+ return UTF8String.fromString(hivePartitionValue);
+ } else if (dataType instanceof DateType) {
+ LocalDate localDate = LocalDate.parse(hivePartitionValue);
+ // DateType spark use int store.
+ return (int) localDate.toEpochDay();
+ } else if (dataType instanceof BooleanType) {
+ return Boolean.parseBoolean(hivePartitionValue);
+ } else if (dataType instanceof DoubleType) {
+ return Double.parseDouble(hivePartitionValue);
+ } else if (dataType instanceof FloatType) {
+ return Float.parseFloat(hivePartitionValue);
+ } else if (dataType instanceof DecimalType) {
+ return Decimal.apply(hivePartitionValue);
+ } else {
+ throw new UnsupportedOperationException("Unsupported partition type: "
+ dataType);
+ }
+ } catch (Exception e) {
+ throw new UnsupportedOperationException(
+ String.format(
+ "Failed to convert partition value '%s' to type %s",
hivePartitionValue, dataType),
+ e);
+ }
+ }
+}
diff --git
a/spark-connector/spark-common/src/test/java/org/apache/gravitino/spark/connector/integration/test/hive/SparkHiveCatalogIT.java
b/spark-connector/spark-common/src/test/java/org/apache/gravitino/spark/connector/integration/test/hive/SparkHiveCatalogIT.java
index 64c286da39..09d6f902ec 100644
---
a/spark-connector/spark-common/src/test/java/org/apache/gravitino/spark/connector/integration/test/hive/SparkHiveCatalogIT.java
+++
b/spark-connector/spark-common/src/test/java/org/apache/gravitino/spark/connector/integration/test/hive/SparkHiveCatalogIT.java
@@ -129,6 +129,57 @@ public abstract class SparkHiveCatalogIT extends
SparkCommonIT {
checkPartitionDirExists(tableInfo);
}
+ @Test
+ void testManagePartitionTable() {
+ String tableName = "hive_partition_ops_table";
+
+ dropTableIfExists(tableName);
+ String createTableSQL = getCreateSimpleTableString(tableName);
+ createTableSQL = createTableSQL + "PARTITIONED BY (age_p1 INT, age_p2
STRING)";
+ sql(createTableSQL);
+
+ List<Object[]> partitionInfo = getTablePartitions(tableName);
+ Assertions.assertEquals(0, partitionInfo.size());
+
+ sql("ALTER TABLE " + tableName + " ADD PARTITION (age_p1=20,
age_p2='twenty')");
+ sql("ALTER TABLE " + tableName + " ADD PARTITION (age_p1=21,
age_p2='twenty one')");
+ partitionInfo = getTablePartitions(tableName);
+ Assertions.assertEquals(2, partitionInfo.size());
+ Assertions.assertEquals("age_p1=20/age_p2=twenty",
partitionInfo.get(0)[0]);
+ Assertions.assertEquals("age_p1=21/age_p2=twenty one",
partitionInfo.get(1)[0]);
+
+ sql("ALTER TABLE " + tableName + " DROP PARTITION (age_p1=20,
age_p2='twenty')");
+ partitionInfo = getTablePartitions(tableName);
+ Assertions.assertEquals(1, partitionInfo.size());
+ Assertions.assertEquals("age_p1=21/age_p2=twenty one",
partitionInfo.get(0)[0]);
+
+ sql(
+ "ALTER TABLE "
+ + tableName
+ + " ADD PARTITION (age_p1=22, age_p2='twenty two') "
+ + "LOCATION
'/user/hive/warehouse/hive_partition_ops_table/age_p1=22/age_p2=twentytwo' ");
+ partitionInfo = getTablePartitions(tableName);
+ Assertions.assertEquals(2, partitionInfo.size());
+ Assertions.assertEquals("age_p1=21/age_p2=twenty one",
partitionInfo.get(0)[0]);
+ Assertions.assertEquals("age_p1=22/age_p2=twenty two",
partitionInfo.get(1)[0]);
+
+ partitionInfo = sql("SHOW PARTITIONS " + tableName + " PARTITION
(age_p1=21)");
+ Assertions.assertEquals(1, partitionInfo.size());
+ Assertions.assertEquals("age_p1=21/age_p2=twenty one",
partitionInfo.get(0)[0]);
+
+ // test exactly match
+ partitionInfo = sql("SHOW PARTITIONS " + tableName + " PARTITION
(age_p1=2)");
+ Assertions.assertEquals(0, partitionInfo.size());
+
+ Exception exception =
+ Assertions.assertThrows(
+ Exception.class,
+ () -> {
+ sql("ALTER TABLE " + tableName + " ADD PARTITION (age_p1=21,
age_p2='twenty one')");
+ });
+ Assertions.assertTrue(exception.getMessage().contains("Partition already
exists"));
+ }
+
@ParameterizedTest
@ValueSource(booleans = {true, false})
void testWriteHiveDynamicPartition(boolean isInsertOverWrite) {
diff --git
a/spark-connector/spark-common/src/test/java/org/apache/gravitino/spark/connector/integration/test/util/SparkUtilIT.java
b/spark-connector/spark-common/src/test/java/org/apache/gravitino/spark/connector/integration/test/util/SparkUtilIT.java
index 5c188f5800..f5f480ff28 100644
---
a/spark-connector/spark-common/src/test/java/org/apache/gravitino/spark/connector/integration/test/util/SparkUtilIT.java
+++
b/spark-connector/spark-common/src/test/java/org/apache/gravitino/spark/connector/integration/test/util/SparkUtilIT.java
@@ -138,6 +138,10 @@ public abstract class SparkUtilIT extends BaseIT {
return SparkTableInfo.create(table.table());
}
+ protected List<Object[]> getTablePartitions(String tableName) {
+ return sql("SHOW PARTITIONS " + tableName);
+ }
+
protected void dropTableIfExists(String tableName) {
sql("DROP TABLE IF EXISTS " + tableName);
}
diff --git
a/spark-connector/spark-common/src/test/java/org/apache/gravitino/spark/connector/utils/TestSparkPartitionUtils.java
b/spark-connector/spark-common/src/test/java/org/apache/gravitino/spark/connector/utils/TestSparkPartitionUtils.java
new file mode 100644
index 0000000000..1b9737b844
--- /dev/null
+++
b/spark-connector/spark-common/src/test/java/org/apache/gravitino/spark/connector/utils/TestSparkPartitionUtils.java
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.spark.connector.utils;
+
+import java.time.LocalDate;
+import org.apache.gravitino.rel.expressions.literals.Literal;
+import org.apache.gravitino.rel.expressions.literals.Literals;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.catalyst.expressions.GenericInternalRow;
+import org.apache.spark.sql.types.DataType;
+import org.apache.spark.sql.types.DataTypes;
+import org.apache.spark.sql.types.Metadata;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
+import org.apache.spark.unsafe.types.UTF8String;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInstance;
+import org.junit.jupiter.api.TestInstance.Lifecycle;
+
+@TestInstance(Lifecycle.PER_CLASS)
+public class TestSparkPartitionUtils {
+
+ boolean boolValue = true;
+ byte byteValue = 100;
+ short shortValue = 1000;
+ int intValue = 100000;
+ long longValue = 10000000000L;
+ float floatValue = 3.14f;
+ double doubleValue = 3.1415926535;
+ UTF8String stringValue = UTF8String.fromString("Hello World");
+ int date = 0;
+ private InternalRow internalRow =
+ new GenericInternalRow(
+ new Object[] {
+ boolValue,
+ byteValue,
+ shortValue,
+ intValue,
+ longValue,
+ floatValue,
+ doubleValue,
+ stringValue,
+ date
+ });
+ private Literal[] literals =
+ new Literals.LiteralImpl[] {
+ Literals.booleanLiteral(boolValue),
+ Literals.byteLiteral(byteValue),
+ Literals.shortLiteral(shortValue),
+ Literals.integerLiteral(intValue),
+ Literals.longLiteral(longValue),
+ Literals.floatLiteral(floatValue),
+ Literals.doubleLiteral(doubleValue),
+ Literals.stringLiteral(stringValue.toString()),
+ Literals.dateLiteral(LocalDate.of(1970, 1, 1)),
+ };
+ private String[] hivePartitionValues = {
+ "true",
+ "100",
+ "1000",
+ "100000",
+ "10000000000",
+ "3.14",
+ "3.1415926535",
+ "Hello World",
+ "1970-01-01"
+ };
+ StructType schema =
+ new StructType(
+ new StructField[] {
+ new StructField("boolean", DataTypes.BooleanType, false,
Metadata.empty()),
+ new StructField("byte", DataTypes.ByteType, false,
Metadata.empty()),
+ new StructField("short", DataTypes.ShortType, false,
Metadata.empty()),
+ new StructField("int", DataTypes.IntegerType, false,
Metadata.empty()),
+ new StructField("long", DataTypes.LongType, false,
Metadata.empty()),
+ new StructField("float", DataTypes.FloatType, false,
Metadata.empty()),
+ new StructField("double", DataTypes.DoubleType, false,
Metadata.empty()),
+ new StructField("string", DataTypes.StringType, false,
Metadata.empty()),
+ new StructField("date", DataTypes.DateType, false,
Metadata.empty())
+ });
+
+ @Test
+ void testToGravitinoLiteral() {
+ int numFields = internalRow.numFields();
+ for (int i = 0; i < numFields; i++) {
+ DataType dataType = schema.apply(i).dataType();
+ Assertions.assertEquals(
+ literals[i], SparkPartitionUtils.toGravitinoLiteral(internalRow, i,
dataType));
+ }
+
+ Assertions.assertThrowsExactly(
+ UnsupportedOperationException.class,
+ () ->
+ SparkPartitionUtils.toGravitinoLiteral(
+ new GenericInternalRow(new Object[] {"1970-01-01 00:00:00"}),
+ 0,
+ DataTypes.TimestampType));
+ }
+
+ @Test
+ void testGetPartitionValueAsString() {
+ int numFields = internalRow.numFields();
+ for (int i = 0; i < numFields; i++) {
+ DataType dataType = schema.apply(i).dataType();
+ Assertions.assertEquals(
+ hivePartitionValues[i],
+ SparkPartitionUtils.getPartitionValueAsString(internalRow, i,
dataType));
+ }
+
+ Assertions.assertThrowsExactly(
+ UnsupportedOperationException.class,
+ () ->
+ SparkPartitionUtils.getPartitionValueAsString(
+ new GenericInternalRow(new Object[] {"1970-01-01 00:00:00"}),
+ 0,
+ DataTypes.TimestampType));
+ }
+
+ @Test
+ void testGetSparkPartitionValue() {
+ int numFields = internalRow.numFields();
+ for (int i = 0; i < numFields; i++) {
+ DataType dataType = schema.apply(i).dataType();
+ Assertions.assertEquals(
+ internalRow.get(i, dataType),
+ SparkPartitionUtils.getSparkPartitionValue(hivePartitionValues[i],
dataType));
+ }
+
+ Assertions.assertThrowsExactly(
+ UnsupportedOperationException.class,
+ () ->
+ SparkPartitionUtils.getSparkPartitionValue(
+ "1970-01-01 00:00:00", DataTypes.TimestampType));
+ }
+}