This is an automated email from the ASF dual-hosted git repository.
jonvex pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new eb474bce82f [HUDI-8187] Hudi 1.0 reader should be able to read both
1.0 and 0.x tables with custom key generator (#11926)
eb474bce82f is described below
commit eb474bce82fbf36c23641bb5661f6390497300db
Author: Lokesh Jain <[email protected]>
AuthorDate: Wed Sep 25 00:55:51 2024 +0530
[HUDI-8187] Hudi 1.0 reader should be able to read both 1.0 and 0.x tables
with custom key generator (#11926)
[HUDI-8187] Hudi 1.0 reader should be able to read both 1.0 and 0.x tables
with custom key generator
---
.../org/apache/hudi/cli/commands/SparkMain.java | 3 +-
.../apache/hudi/keygen/CustomAvroKeyGenerator.java | 61 ++++++++++++++++------
.../org/apache/hudi/keygen/CustomKeyGenerator.java | 4 +-
.../datasources/SparkParsePartitionUtil.scala | 14 +++--
.../sink/clustering/HoodieFlinkClusteringJob.java | 3 +-
.../scala/org/apache/hudi/BucketIndexSupport.scala | 10 ++--
.../scala/org/apache/hudi/DataSourceOptions.scala | 4 +-
.../scala/org/apache/hudi/HoodieFileIndex.scala | 5 +-
.../hudi/HoodieHadoopFsRelationFactory.scala | 9 +++-
.../apache/hudi/keygen/TestCustomKeyGenerator.java | 32 +++++++++---
.../org/apache/hudi/TestHoodieFileIndex.scala | 6 +++
.../TestSparkSqlWithCustomKeyGenerator.scala | 29 ++++++++--
12 files changed, 134 insertions(+), 46 deletions(-)
diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/SparkMain.java
b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/SparkMain.java
index 497470ab36f..ec4dbd20bbb 100644
--- a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/SparkMain.java
+++ b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/SparkMain.java
@@ -28,6 +28,7 @@ import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.WriteOperationType;
+import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.HoodieTableVersion;
import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion;
@@ -471,7 +472,7 @@ public class SparkMain {
metaClient.getTableConfig().getProps().forEach((k, v) ->
propsMap.put(k.toString(), v.toString()));
propsMap.put(HoodieWriteConfig.SKIP_DEFAULT_PARTITION_VALIDATION.key(),
"true");
propsMap.put(DataSourceWriteOptions.RECORDKEY_FIELD().key(),
metaClient.getTableConfig().getRecordKeyFieldProp());
- propsMap.put(DataSourceWriteOptions.PARTITIONPATH_FIELD().key(),
metaClient.getTableConfig().getPartitionFieldProp());
+ propsMap.put(DataSourceWriteOptions.PARTITIONPATH_FIELD().key(),
HoodieTableConfig.getPartitionFieldPropForKeyGenerator(metaClient.getTableConfig()).orElse(""));
propsMap.put(DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME().key(),
metaClient.getTableConfig().getKeyGeneratorClassName());
return propsMap;
}
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/CustomAvroKeyGenerator.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/CustomAvroKeyGenerator.java
index 1aaee836e7e..beeb4947acd 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/CustomAvroKeyGenerator.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/CustomAvroKeyGenerator.java
@@ -19,6 +19,7 @@
package org.apache.hudi.keygen;
import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.exception.HoodieKeyException;
@@ -76,8 +77,11 @@ public class CustomAvroKeyGenerator extends BaseKeyGenerator
{
return Collections.emptyList(); // Corresponds to no partition case
} else {
return partitionPathFields.stream().map(field -> {
- Pair<String, CustomAvroKeyGenerator.PartitionKeyType> partitionAndType
= getPartitionFieldAndKeyType(field);
- CustomAvroKeyGenerator.PartitionKeyType keyType =
partitionAndType.getRight();
+ Pair<String, Option<CustomAvroKeyGenerator.PartitionKeyType>>
partitionAndType = getPartitionFieldAndKeyType(field);
+ if (partitionAndType.getRight().isEmpty()) {
+ throw getPartitionPathFormatException();
+ }
+ CustomAvroKeyGenerator.PartitionKeyType keyType =
partitionAndType.getRight().get();
String partitionPathField = partitionAndType.getLeft();
switch (keyType) {
case SIMPLE:
@@ -95,35 +99,53 @@ public class CustomAvroKeyGenerator extends
BaseKeyGenerator {
}
}
- public static List<PartitionKeyType> getPartitionTypes(List<String>
partitionPathFields) {
+ /**
+ * Returns list of partition types configured in the partition fields for
custom key generator.
+ *
+ * @param tableConfig Table config where partition fields are configured
+ */
+ public static List<PartitionKeyType> getPartitionTypes(HoodieTableConfig
tableConfig) {
+ List<String> partitionPathFields =
HoodieTableConfig.getPartitionFieldsForKeyGenerator(tableConfig).orElse(Collections.emptyList());
if (partitionPathFields.size() == 1 &&
partitionPathFields.get(0).isEmpty()) {
return Collections.emptyList(); // Corresponds to no partition case
} else {
- return partitionPathFields.stream().map(field -> {
- Pair<String, CustomAvroKeyGenerator.PartitionKeyType> partitionAndType
= getPartitionFieldAndKeyType(field);
- return partitionAndType.getRight();
- }).collect(Collectors.toList());
+ return partitionPathFields.stream().map(field ->
getPartitionFieldAndKeyType(field).getRight())
+ .filter(Option::isPresent)
+ .map(Option::get)
+ .collect(Collectors.toList());
}
}
- public static List<String> getTimestampFields(List<String>
partitionPathFields) {
- if (partitionPathFields.size() == 1 &&
partitionPathFields.get(0).isEmpty()) {
- return Collections.emptyList(); // Corresponds to no partition case
+ /**
+ * Returns the partition fields with timestamp partition type.
+ *
+ * @param tableConfig Table config where partition fields are configured
+ * @return Optional list of partition fields with timestamp partition type
+ */
+ public static Option<List<String>> getTimestampFields(HoodieTableConfig
tableConfig) {
+ List<String> partitionPathFields =
HoodieTableConfig.getPartitionFieldsForKeyGenerator(tableConfig).orElse(Collections.emptyList());
+ if (partitionPathFields.isEmpty() || (partitionPathFields.size() == 1 &&
partitionPathFields.get(0).isEmpty())) {
+ return Option.of(Collections.emptyList()); // Corresponds to no
partition case
+ } else if
(getPartitionFieldAndKeyType(partitionPathFields.get(0)).getRight().isEmpty()) {
+ // Partition type is not configured for the partition fields, therefore the
timestamp partition fields
+ // cannot be determined
+ return Option.empty();
} else {
- return partitionPathFields.stream()
+ return Option.of(partitionPathFields.stream()
.map(CustomAvroKeyGenerator::getPartitionFieldAndKeyType)
- .filter(fieldAndKeyType ->
fieldAndKeyType.getRight().equals(PartitionKeyType.TIMESTAMP))
+ .filter(fieldAndKeyType -> fieldAndKeyType.getRight().isPresent() &&
fieldAndKeyType.getRight().get().equals(PartitionKeyType.TIMESTAMP))
.map(Pair::getLeft)
- .collect(Collectors.toList());
+ .collect(Collectors.toList()));
}
}
- public static Pair<String, PartitionKeyType>
getPartitionFieldAndKeyType(String field) {
+ public static Pair<String, Option<PartitionKeyType>>
getPartitionFieldAndKeyType(String field) {
String[] fieldWithType =
field.split(BaseKeyGenerator.CUSTOM_KEY_GENERATOR_SPLIT_REGEX);
- if (fieldWithType.length != 2) {
- throw new HoodieKeyException("Unable to find field names for partition
path in proper format");
+ if (fieldWithType.length == 2) {
+ return Pair.of(fieldWithType[0],
Option.of(PartitionKeyType.valueOf(fieldWithType[1].toUpperCase())));
+ } else {
+ return Pair.of(fieldWithType[0], Option.empty());
}
- return Pair.of(fieldWithType[0],
PartitionKeyType.valueOf(fieldWithType[1].toUpperCase()));
}
@Override
@@ -158,6 +180,11 @@ public class CustomAvroKeyGenerator extends
BaseKeyGenerator {
}
}
+ static HoodieKeyGeneratorException getPartitionPathFormatException() {
+ return new HoodieKeyGeneratorException("Unable to find field names for
partition path in proper format. "
+ + "Please specify the partition field names in format
`field1:type1,field2:type2`. Example: `city:simple,ts:timestamp`");
+ }
+
public String getDefaultPartitionPathSeparator() {
return DEFAULT_PARTITION_PATH_SEPARATOR;
}
diff --git
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/CustomKeyGenerator.java
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/CustomKeyGenerator.java
index f5d891f04a9..43a30079f65 100644
---
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/CustomKeyGenerator.java
+++
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/CustomKeyGenerator.java
@@ -37,6 +37,8 @@ import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;
+import static
org.apache.hudi.keygen.CustomAvroKeyGenerator.getPartitionPathFormatException;
+
/**
* This is a generic implementation of KeyGenerator where users can configure
record key as a single field or a combination of fields. Similarly partition
path can be configured to have multiple
* fields or only one field. This class expects value for prop
"hoodie.datasource.write.partitionpath.field" in a specific format. For example:
@@ -85,7 +87,7 @@ public class CustomKeyGenerator extends BuiltinKeyGenerator {
return partitionPathFields.stream().map(field -> {
String[] fieldWithType = field.split(CUSTOM_KEY_GENERATOR_SPLIT_REGEX);
if (fieldWithType.length != 2) {
- throw new HoodieKeyGeneratorException("Unable to find field names
for partition path in proper format");
+ throw getPartitionPathFormatException();
}
String partitionPathField = fieldWithType[0];
CustomAvroKeyGenerator.PartitionKeyType keyType =
CustomAvroKeyGenerator.PartitionKeyType.valueOf(fieldWithType[1].toUpperCase());
diff --git
a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/execution/datasources/SparkParsePartitionUtil.scala
b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/execution/datasources/SparkParsePartitionUtil.scala
index 304c2cfd9a5..26cdd5eb05c 100644
---
a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/execution/datasources/SparkParsePartitionUtil.scala
+++
b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/execution/datasources/SparkParsePartitionUtil.scala
@@ -22,6 +22,7 @@ import org.apache.hudi.common.table.HoodieTableConfig
import org.apache.hudi.common.util
import org.apache.hudi.keygen.CustomAvroKeyGenerator.PartitionKeyType
import org.apache.hudi.keygen.{BaseKeyGenerator, CustomAvroKeyGenerator,
CustomKeyGenerator, TimestampBasedAvroKeyGenerator, TimestampBasedKeyGenerator}
+import org.apache.hudi.util.JFunction
import org.apache.spark.internal.Logging
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.types.{DataType, StringType, StructField,
StructType}
@@ -67,17 +68,20 @@ trait SparkParsePartitionUtil extends Serializable with
Logging {
def getPartitionStructFields(keyGeneratorPartitionFieldsOpt:
util.Option[String], keyGeneratorClassName: String) = {
val partitionFields: Array[StructField] = if
(keyGeneratorPartitionFieldsOpt.isPresent
+ &&
keyGeneratorPartitionFieldsOpt.get().contains(BaseKeyGenerator.CUSTOM_KEY_GENERATOR_SPLIT_REGEX)
&&
(classOf[CustomKeyGenerator].getName.equalsIgnoreCase(keyGeneratorClassName)
||
classOf[CustomAvroKeyGenerator].getName.equalsIgnoreCase(keyGeneratorClassName)))
{
val keyGeneratorPartitionFields =
keyGeneratorPartitionFieldsOpt.get().split(BaseKeyGenerator.FIELD_SEPARATOR)
keyGeneratorPartitionFields.map(field =>
CustomAvroKeyGenerator.getPartitionFieldAndKeyType(field))
.map(pair => {
val partitionField = pair.getLeft
- val partitionKeyType = pair.getRight
- partitionKeyType match {
- case PartitionKeyType.SIMPLE =>
nameFieldMap.getOrElse(partitionField, null)
- case PartitionKeyType.TIMESTAMP => if
(shouldUseStringTypeForTimestampPartitionKeyType) StructField(partitionField,
StringType) else nameFieldMap.getOrElse(partitionField, null)
- }
+ val partitionKeyTypeOpt = pair.getRight
+ partitionKeyTypeOpt.map[StructField] {
+ JFunction.toJavaFunction {
+ case PartitionKeyType.SIMPLE =>
nameFieldMap.getOrElse(partitionField, null)
+ case PartitionKeyType.TIMESTAMP => if
(shouldUseStringTypeForTimestampPartitionKeyType) StructField(partitionField,
StringType) else nameFieldMap.getOrElse(partitionField, null)
+ }
+ }.orElse(nameFieldMap.getOrElse(partitionField, null))
})
.filter(structField => structField != null)
.array
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/HoodieFlinkClusteringJob.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/HoodieFlinkClusteringJob.java
index e0828a0431e..f6fc148c39d 100644
---
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/HoodieFlinkClusteringJob.java
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/HoodieFlinkClusteringJob.java
@@ -21,6 +21,7 @@ package org.apache.hudi.sink.clustering;
import org.apache.hudi.async.HoodieAsyncTableService;
import org.apache.hudi.avro.model.HoodieClusteringPlan;
import org.apache.hudi.client.HoodieFlinkWriteClient;
+import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.util.ClusteringUtils;
@@ -185,7 +186,7 @@ public class HoodieFlinkClusteringJob {
conf.setString(FlinkOptions.RECORD_KEY_FIELD,
metaClient.getTableConfig().getRecordKeyFieldProp());
// set partition field
- conf.setString(FlinkOptions.PARTITION_PATH_FIELD,
metaClient.getTableConfig().getPartitionFieldProp());
+ conf.setString(FlinkOptions.PARTITION_PATH_FIELD,
HoodieTableConfig.getPartitionFieldPropForKeyGenerator(metaClient.getTableConfig()).orElse(""));
// set table schema
CompactionUtil.setAvroSchema(conf, metaClient);
diff --git
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/BucketIndexSupport.scala
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/BucketIndexSupport.scala
index 0dd75cf2325..1f535e70cf1 100644
---
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/BucketIndexSupport.scala
+++
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/BucketIndexSupport.scala
@@ -17,7 +17,7 @@
package org.apache.hudi
-import org.apache.hudi.common.config.HoodieMetadataConfig
+import org.apache.hudi.common.config.{HoodieMetadataConfig, TypedProperties}
import org.apache.hudi.common.fs.FSUtils
import org.apache.hudi.common.model.FileSlice
import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient,
TableSchemaResolver}
@@ -28,7 +28,6 @@ import org.apache.hudi.index.HoodieIndex.IndexType
import org.apache.hudi.index.bucket.BucketIdentifier
import org.apache.hudi.keygen.KeyGenerator
import org.apache.hudi.keygen.factory.HoodieSparkKeyGeneratorFactory
-
import org.apache.avro.generic.GenericData
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.expressions
@@ -46,8 +45,11 @@ class BucketIndexSupport(spark: SparkSession,
private val log = LoggerFactory.getLogger(getClass)
- private val keyGenerator =
- HoodieSparkKeyGeneratorFactory.createKeyGenerator(metadataConfig.getProps)
+ private val keyGenerator = {
+ val props = new TypedProperties(metadataConfig.getProps())
+ TypedProperties.putAll(props, metaClient.getTableConfig.getProps)
+ HoodieSparkKeyGeneratorFactory.createKeyGenerator(props)
+ }
private lazy val avroSchema = new
TableSchemaResolver(metaClient).getTableAvroSchema(false)
diff --git
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
index ef9ccc9aebc..5239ec9d6b9 100644
---
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
+++
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
@@ -1011,8 +1011,8 @@ object DataSourceOptionsHelper {
if (!params.contains(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key()) &&
tableConfig.getRawRecordKeyFieldProp != null) {
missingWriteConfigs ++=
Map(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key() ->
tableConfig.getRawRecordKeyFieldProp)
}
- if (!params.contains(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key())
&& tableConfig.getPartitionFieldProp != null) {
- missingWriteConfigs ++=
Map(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key() ->
tableConfig.getPartitionFieldProp)
+ if (!params.contains(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key())
&&
HoodieTableConfig.getPartitionFieldPropForKeyGenerator(tableConfig).isPresent) {
+ missingWriteConfigs ++=
Map(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key() ->
HoodieTableConfig.getPartitionFieldPropForKeyGenerator(tableConfig).get())
}
if (!params.contains(DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME.key())
&& tableConfig.getKeyGeneratorClassName != null) {
missingWriteConfigs ++=
Map(DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME.key() ->
tableConfig.getKeyGeneratorClassName)
diff --git
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala
index ccca13256f9..5ea4d460714 100644
---
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala
+++
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala
@@ -526,7 +526,7 @@ object HoodieFileIndex extends Logging {
if (tableConfig != null) {
properties.setProperty(RECORDKEY_FIELD.key,
tableConfig.getRecordKeyFields.orElse(Array.empty).mkString(","))
properties.setProperty(PRECOMBINE_FIELD.key,
Option(tableConfig.getPreCombineField).getOrElse(""))
- properties.setProperty(PARTITIONPATH_FIELD.key,
tableConfig.getPartitionFields.orElse(Array.apply("")).mkString(","))
+ properties.setProperty(PARTITIONPATH_FIELD.key,
HoodieTableConfig.getPartitionFieldPropForKeyGenerator(tableConfig).orElse(""))
}
properties
@@ -625,8 +625,7 @@ object HoodieFileIndex extends Logging {
Set(0)
} else if
(keyGeneratorClassName.equals(KeyGeneratorType.CUSTOM.getClassName)
||
keyGeneratorClassName.equals(KeyGeneratorType.CUSTOM_AVRO.getClassName)) {
- val partitionFields =
HoodieTableConfig.getPartitionFieldsForKeyGenerator(tableConfig).orElse(java.util.Collections.emptyList[String]())
- val partitionTypes =
CustomAvroKeyGenerator.getPartitionTypes(partitionFields)
+ val partitionTypes =
CustomAvroKeyGenerator.getPartitionTypes(tableConfig)
var partitionIndexes: Set[Int] = Set.empty
for (i <- 0 until partitionTypes.size()) {
if (partitionTypes.get(i).equals(PartitionKeyType.TIMESTAMP)) {
diff --git
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieHadoopFsRelationFactory.scala
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieHadoopFsRelationFactory.scala
index e6197856b41..65b9dbf0d09 100644
---
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieHadoopFsRelationFactory.scala
+++
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieHadoopFsRelationFactory.scala
@@ -94,7 +94,14 @@ abstract class HoodieBaseHadoopFsRelationFactory(val
sqlContext: SQLContext,
tableConfig.getPartitionFields.orElse(Array.empty).toSeq
} else {
//it's custom keygen
-
CustomAvroKeyGenerator.getTimestampFields(HoodieTableConfig.getPartitionFieldsForKeyGenerator(tableConfig).orElse(java.util.Collections.emptyList[String]())).asScala.toSeq
+ val timestampFieldsOpt =
CustomAvroKeyGenerator.getTimestampFields(tableConfig)
+ if (timestampFieldsOpt.isPresent) {
+ timestampFieldsOpt.get().asScala.toSeq
+ } else {
+ // Timestamp fields above are determined using the partition type.
+ // For older tables the partition type may not be available, so fall
back to partition fields in those cases.
+ tableConfig.getPartitionFields.orElse(Array.empty).toSeq
+ }
}
protected lazy val (tableAvroSchema: Schema, internalSchemaOpt:
Option[InternalSchema]) = {
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/keygen/TestCustomKeyGenerator.java
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/keygen/TestCustomKeyGenerator.java
index 52eb36bca33..bdf8911695f 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/keygen/TestCustomKeyGenerator.java
+++
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/keygen/TestCustomKeyGenerator.java
@@ -20,6 +20,8 @@ package org.apache.hudi.keygen;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.table.HoodieTableConfig;
+import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
@@ -37,8 +39,6 @@ import org.junit.jupiter.api.Test;
import java.io.IOException;
import java.io.PrintWriter;
import java.io.StringWriter;
-import java.util.Arrays;
-import java.util.List;
import static org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -175,11 +175,31 @@ public class TestCustomKeyGenerator extends
KeyGeneratorTestUtilities {
@Test
public void testCustomKeyGeneratorPartitionType() {
- List<String> partitionFields = Arrays.asList("random:simple",
"ts_ms:timestamp");
- Object[] partitionTypes =
CustomAvroKeyGenerator.getPartitionTypes(partitionFields).toArray();
+ HoodieTableConfig tableConfig = new HoodieTableConfig();
+ tableConfig.setValue(HoodieTableConfig.PARTITION_FIELDS.key(),
"random:simple,ts_ms:timestamp");
+ Object[] partitionTypes =
CustomAvroKeyGenerator.getPartitionTypes(tableConfig).toArray();
assertArrayEquals(new CustomAvroKeyGenerator.PartitionKeyType[]
{CustomAvroKeyGenerator.PartitionKeyType.SIMPLE,
CustomAvroKeyGenerator.PartitionKeyType.TIMESTAMP}, partitionTypes);
- Pair<String, CustomAvroKeyGenerator.PartitionKeyType>
partitionFieldAndType =
CustomAvroKeyGenerator.getPartitionFieldAndKeyType("random:simple");
- assertEquals(Pair.of("random",
CustomAvroKeyGenerator.PartitionKeyType.SIMPLE), partitionFieldAndType);
+ Pair<String, Option<CustomAvroKeyGenerator.PartitionKeyType>>
partitionFieldAndType =
CustomAvroKeyGenerator.getPartitionFieldAndKeyType("random:simple");
+ assertEquals(Pair.of("random",
Option.of(CustomAvroKeyGenerator.PartitionKeyType.SIMPLE)),
partitionFieldAndType);
+ }
+
+ @Test
+ public void testCustomKeyGeneratorTimestampFieldsAPI() {
+ HoodieTableConfig tableConfig = new HoodieTableConfig();
+ tableConfig.setValue(HoodieTableConfig.PARTITION_FIELDS.key(),
"simple1:simple,ts1:timestamp,ts2:timestamp");
+ Object[] timestampFields =
CustomAvroKeyGenerator.getTimestampFields(tableConfig).get().toArray();
+ // Only two timestamp fields are returned
+ assertArrayEquals(new String[] {"ts1", "ts2"}, timestampFields);
+
+ tableConfig.setValue(HoodieTableConfig.PARTITION_FIELDS.key(),
"simple1,ts1,ts2");
+ Option<?> timestampFieldsOpt =
CustomAvroKeyGenerator.getTimestampFields(tableConfig);
+ // Empty option is returned since no partition type is available
+ assertTrue(timestampFieldsOpt.isEmpty());
+
+ tableConfig.setValue(HoodieTableConfig.PARTITION_FIELDS.key(),
"simple1:simple,simple2:simple,simple3:simple");
+ timestampFields =
CustomAvroKeyGenerator.getTimestampFields(tableConfig).get().toArray();
+ // No timestamp partitions
+ assertEquals(0, timestampFields.length);
}
public void testTimestampBasedKeyGenerator(TypedProperties props) throws
IOException {
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieFileIndex.scala
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieFileIndex.scala
index fc52ff685f2..ccc322b79ff 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieFileIndex.scala
+++
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieFileIndex.scala
@@ -841,6 +841,12 @@ class TestHoodieFileIndex extends
HoodieSparkClientTestBase with ScalaAssertionS
assertEquals(expectedPartitionSchema,
SparkAdapterSupport.sparkAdapter.getSparkParsePartitionUtil.getPartitionSchema(tableConfig,
schema, shouldUseStringTypeForTimestampPartitionKeyType = false))
+ tableConfig.setValue(HoodieTableConfig.PARTITION_FIELDS, "f1,f2")
+ expectedPartitionSchema = StructType.apply(List(fields(0), fields(1)))
+ // With custom key generator handling, timestamp partition field f2 would
have input schema type with old partition format
+ assertEquals(expectedPartitionSchema,
SparkAdapterSupport.sparkAdapter.getSparkParsePartitionUtil.getPartitionSchema(tableConfig,
+ schema, shouldUseStringTypeForTimestampPartitionKeyType = true))
+
tableConfig.setValue(HoodieTableConfig.KEY_GENERATOR_TYPE,
KeyGeneratorType.COMPLEX.name())
tableConfig.setValue(HoodieTableConfig.PARTITION_FIELDS, "f1,f2")
// With other key generators, timestamp partition field f2 would have
input schema type
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSparkSqlWithCustomKeyGenerator.scala
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSparkSqlWithCustomKeyGenerator.scala
index ab9ae27c3fd..46da43264fc 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSparkSqlWithCustomKeyGenerator.scala
+++
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSparkSqlWithCustomKeyGenerator.scala
@@ -23,7 +23,7 @@ import org.apache.avro.Schema
import
org.apache.hudi.DataSourceReadOptions.EXTRACT_PARTITION_VALUES_FROM_PARTITION_PATH
import org.apache.hudi.HoodieSparkUtils
import org.apache.hudi.common.config.TypedProperties
-import org.apache.hudi.common.table.TableSchemaResolver
+import org.apache.hudi.common.table.{HoodieTableConfig, TableSchemaResolver}
import org.apache.hudi.common.util.StringUtils
import org.apache.hudi.exception.HoodieException
import org.apache.hudi.functional.TestSparkSqlWithCustomKeyGenerator._
@@ -52,22 +52,27 @@ class TestSparkSqlWithCustomKeyGenerator extends
HoodieSparkSqlTestBase {
"(ts=202401, segment='cat2')", "202401/cat2",
Seq("202312/cat2", "202312/cat4", "202401/cat1", "202401/cat3",
"202402/cat1", "202402/cat3", "202402/cat5"),
TS_FORMATTER_FUNC,
- (ts: Integer, segment: String) => TS_FORMATTER_FUNC.apply(ts) +
"/" + segment),
+ (ts: Integer, segment: String) => TS_FORMATTER_FUNC.apply(ts) +
"/" + segment, false),
Seq("MERGE_ON_READ", "segment:simple",
"(segment='cat3')", "cat3",
Seq("cat1", "cat2", "cat4", "cat5"),
TS_TO_STRING_FUNC,
- (_: Integer, segment: String) => segment),
+ (_: Integer, segment: String) => segment, false),
Seq("MERGE_ON_READ", "ts:timestamp",
"(ts=202312)", "202312",
Seq("202401", "202402"),
TS_FORMATTER_FUNC,
- (ts: Integer, _: String) => TS_FORMATTER_FUNC.apply(ts)),
+ (ts: Integer, _: String) => TS_FORMATTER_FUNC.apply(ts), false),
Seq("MERGE_ON_READ", "ts:timestamp,segment:simple",
"(ts=202401, segment='cat2')", "202401/cat2",
Seq("202312/cat2", "202312/cat4", "202401/cat1", "202401/cat3",
"202402/cat1", "202402/cat3", "202402/cat5"),
TS_FORMATTER_FUNC,
- (ts: Integer, segment: String) => TS_FORMATTER_FUNC.apply(ts) +
"/" + segment)
+ (ts: Integer, segment: String) => TS_FORMATTER_FUNC.apply(ts) +
"/" + segment, false),
+ Seq("MERGE_ON_READ", "ts:timestamp,segment:simple",
+ "(ts=202401, segment='cat2')", "202401/cat2",
+ Seq("202312/cat2", "202312/cat4", "202401/cat1", "202401/cat3",
"202402/cat1", "202402/cat3", "202402/cat5"),
+ TS_FORMATTER_FUNC,
+ (ts: Integer, segment: String) => TS_FORMATTER_FUNC.apply(ts) +
"/" + segment, true)
).foreach { testParams =>
withTable(generateTableName) { tableName =>
LOG.warn("Testing with parameters: " + testParams)
@@ -89,11 +94,21 @@ class TestSparkSqlWithCustomKeyGenerator extends
HoodieSparkSqlTestBase {
} else {
""
}
+ val useOlderPartitionFieldFormat =
testParams(7).asInstanceOf[Boolean]
prepareTableWithKeyGenerator(
tableName, tablePath, tableType,
CUSTOM_KEY_GEN_CLASS_NAME, writePartitionFields,
timestampKeyGeneratorConfig)
+ if (useOlderPartitionFieldFormat) {
+ var metaClient = createMetaClient(spark, tablePath)
+ val props = new TypedProperties()
+ props.put(HoodieTableConfig.PARTITION_FIELDS.key(),
metaClient.getTableConfig.getPartitionFieldProp)
+ HoodieTableConfig.update(metaClient.getStorage,
metaClient.getMetaPath, props)
+ metaClient = createMetaClient(spark, tablePath)
+ assertEquals(metaClient.getTableConfig.getPartitionFieldProp,
HoodieTableConfig.getPartitionFieldPropForKeyGenerator(metaClient.getTableConfig).orElse(""))
+ }
+
// SQL CTAS with table properties containing key generator write
configs
createTableWithSql(tableName, tablePath,
s"hoodie.datasource.write.partitionpath.field =
'$writePartitionFields'" + timestampKeyGenProps)
@@ -244,6 +259,10 @@ class TestSparkSqlWithCustomKeyGenerator extends
HoodieSparkSqlTestBase {
// Validate ts field is still of type int in the table
validateTsFieldSchema(tablePath, "ts", Schema.Type.INT)
+ if (useOlderPartitionFieldFormat) {
+ val metaClient = createMetaClient(spark, tablePath)
+ assertEquals(metaClient.getTableConfig.getPartitionFieldProp,
HoodieTableConfig.getPartitionFieldPropForKeyGenerator(metaClient.getTableConfig).orElse(""))
+ }
}
}
}