This is an automated email from the ASF dual-hosted git repository.

jonvex pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new eb474bce82f [HUDI-8187] Hudi 1.0 reader should be able to read both 1.0 and 0.x tables with custom key generator (#11926)
eb474bce82f is described below

commit eb474bce82fbf36c23641bb5661f6390497300db
Author: Lokesh Jain <[email protected]>
AuthorDate: Wed Sep 25 00:55:51 2024 +0530

    [HUDI-8187] Hudi 1.0 reader should be able to read both 1.0 and 0.x tables with custom key generator (#11926)
    
    [HUDI-8187] Hudi 1.0 reader should be able to read both 1.0 and 0.x tables with custom key generator
---
 .../org/apache/hudi/cli/commands/SparkMain.java    |  3 +-
 .../apache/hudi/keygen/CustomAvroKeyGenerator.java | 61 ++++++++++++++++------
 .../org/apache/hudi/keygen/CustomKeyGenerator.java |  4 +-
 .../datasources/SparkParsePartitionUtil.scala      | 14 +++--
 .../sink/clustering/HoodieFlinkClusteringJob.java  |  3 +-
 .../scala/org/apache/hudi/BucketIndexSupport.scala | 10 ++--
 .../scala/org/apache/hudi/DataSourceOptions.scala  |  4 +-
 .../scala/org/apache/hudi/HoodieFileIndex.scala    |  5 +-
 .../hudi/HoodieHadoopFsRelationFactory.scala       |  9 +++-
 .../apache/hudi/keygen/TestCustomKeyGenerator.java | 32 +++++++++---
 .../org/apache/hudi/TestHoodieFileIndex.scala      |  6 +++
 .../TestSparkSqlWithCustomKeyGenerator.scala       | 29 ++++++++--
 12 files changed, 134 insertions(+), 46 deletions(-)

diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/SparkMain.java 
b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/SparkMain.java
index 497470ab36f..ec4dbd20bbb 100644
--- a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/SparkMain.java
+++ b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/SparkMain.java
@@ -28,6 +28,7 @@ import org.apache.hudi.common.engine.HoodieEngineContext;
 import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.WriteOperationType;
+import org.apache.hudi.common.table.HoodieTableConfig;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.HoodieTableVersion;
 import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion;
@@ -471,7 +472,7 @@ public class SparkMain {
     metaClient.getTableConfig().getProps().forEach((k, v) -> 
propsMap.put(k.toString(), v.toString()));
     propsMap.put(HoodieWriteConfig.SKIP_DEFAULT_PARTITION_VALIDATION.key(), 
"true");
     propsMap.put(DataSourceWriteOptions.RECORDKEY_FIELD().key(), 
metaClient.getTableConfig().getRecordKeyFieldProp());
-    propsMap.put(DataSourceWriteOptions.PARTITIONPATH_FIELD().key(), 
metaClient.getTableConfig().getPartitionFieldProp());
+    propsMap.put(DataSourceWriteOptions.PARTITIONPATH_FIELD().key(), 
HoodieTableConfig.getPartitionFieldPropForKeyGenerator(metaClient.getTableConfig()).orElse(""));
     propsMap.put(DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME().key(), 
metaClient.getTableConfig().getKeyGeneratorClassName());
     return propsMap;
   }
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/CustomAvroKeyGenerator.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/CustomAvroKeyGenerator.java
index 1aaee836e7e..beeb4947acd 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/CustomAvroKeyGenerator.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/CustomAvroKeyGenerator.java
@@ -19,6 +19,7 @@
 package org.apache.hudi.keygen;
 
 import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.table.HoodieTableConfig;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.exception.HoodieKeyException;
@@ -76,8 +77,11 @@ public class CustomAvroKeyGenerator extends BaseKeyGenerator 
{
       return Collections.emptyList(); // Corresponds to no partition case
     } else {
       return partitionPathFields.stream().map(field -> {
-        Pair<String, CustomAvroKeyGenerator.PartitionKeyType> partitionAndType 
= getPartitionFieldAndKeyType(field);
-        CustomAvroKeyGenerator.PartitionKeyType keyType = 
partitionAndType.getRight();
+        Pair<String, Option<CustomAvroKeyGenerator.PartitionKeyType>> 
partitionAndType = getPartitionFieldAndKeyType(field);
+        if (partitionAndType.getRight().isEmpty()) {
+          throw getPartitionPathFormatException();
+        }
+        CustomAvroKeyGenerator.PartitionKeyType keyType = 
partitionAndType.getRight().get();
         String partitionPathField = partitionAndType.getLeft();
         switch (keyType) {
           case SIMPLE:
@@ -95,35 +99,53 @@ public class CustomAvroKeyGenerator extends 
BaseKeyGenerator {
     }
   }
 
-  public static List<PartitionKeyType> getPartitionTypes(List<String> 
partitionPathFields) {
+  /**
+   * Returns list of partition types configured in the partition fields for 
custom key generator.
+   *
+   * @param tableConfig Table config where partition fields are configured
+   */
+  public static List<PartitionKeyType> getPartitionTypes(HoodieTableConfig 
tableConfig) {
+    List<String> partitionPathFields = 
HoodieTableConfig.getPartitionFieldsForKeyGenerator(tableConfig).orElse(Collections.emptyList());
     if (partitionPathFields.size() == 1 && 
partitionPathFields.get(0).isEmpty()) {
       return Collections.emptyList(); // Corresponds to no partition case
     } else {
-      return partitionPathFields.stream().map(field -> {
-        Pair<String, CustomAvroKeyGenerator.PartitionKeyType> partitionAndType 
= getPartitionFieldAndKeyType(field);
-        return partitionAndType.getRight();
-      }).collect(Collectors.toList());
+      return partitionPathFields.stream().map(field -> 
getPartitionFieldAndKeyType(field).getRight())
+          .filter(Option::isPresent)
+          .map(Option::get)
+          .collect(Collectors.toList());
     }
   }
 
-  public static List<String> getTimestampFields(List<String> 
partitionPathFields) {
-    if (partitionPathFields.size() == 1 && 
partitionPathFields.get(0).isEmpty()) {
-      return Collections.emptyList(); // Corresponds to no partition case
+  /**
+   * Returns the partition fields with timestamp partition type.
+   *
+   * @param tableConfig Table config where partition fields are configured
+   * @return Optional list of partition fields with timestamp partition type
+   */
+  public static Option<List<String>> getTimestampFields(HoodieTableConfig 
tableConfig) {
+    List<String> partitionPathFields = 
HoodieTableConfig.getPartitionFieldsForKeyGenerator(tableConfig).orElse(Collections.emptyList());
+    if (partitionPathFields.isEmpty() || (partitionPathFields.size() == 1 && 
partitionPathFields.get(0).isEmpty())) {
+      return Option.of(Collections.emptyList()); // Corresponds to no 
partition case
+    } else if 
(getPartitionFieldAndKeyType(partitionPathFields.get(0)).getRight().isEmpty()) {
+      // Partition type is not configured for the partition fields therefore 
timestamp partition fields
+      // can not be determined
+      return Option.empty();
     } else {
-      return partitionPathFields.stream()
+      return Option.of(partitionPathFields.stream()
           .map(CustomAvroKeyGenerator::getPartitionFieldAndKeyType)
-          .filter(fieldAndKeyType -> 
fieldAndKeyType.getRight().equals(PartitionKeyType.TIMESTAMP))
+          .filter(fieldAndKeyType -> fieldAndKeyType.getRight().isPresent() && 
fieldAndKeyType.getRight().get().equals(PartitionKeyType.TIMESTAMP))
           .map(Pair::getLeft)
-          .collect(Collectors.toList());
+          .collect(Collectors.toList()));
     }
   }
 
-  public static Pair<String, PartitionKeyType> 
getPartitionFieldAndKeyType(String field) {
+  public static Pair<String, Option<PartitionKeyType>> 
getPartitionFieldAndKeyType(String field) {
     String[] fieldWithType = 
field.split(BaseKeyGenerator.CUSTOM_KEY_GENERATOR_SPLIT_REGEX);
-    if (fieldWithType.length != 2) {
-      throw new HoodieKeyException("Unable to find field names for partition 
path in proper format");
+    if (fieldWithType.length == 2) {
+      return Pair.of(fieldWithType[0], 
Option.of(PartitionKeyType.valueOf(fieldWithType[1].toUpperCase())));
+    } else {
+      return Pair.of(fieldWithType[0], Option.empty());
     }
-    return Pair.of(fieldWithType[0], 
PartitionKeyType.valueOf(fieldWithType[1].toUpperCase()));
   }
 
   @Override
@@ -158,6 +180,11 @@ public class CustomAvroKeyGenerator extends 
BaseKeyGenerator {
     }
   }
 
+  static HoodieKeyGeneratorException getPartitionPathFormatException() {
+    return new HoodieKeyGeneratorException("Unable to find field names for 
partition path in proper format. "
+        + "Please specify the partition field names in format 
`field1:type1,field2:type2`. Example: `city:simple,ts:timestamp`");
+  }
+
   public String getDefaultPartitionPathSeparator() {
     return DEFAULT_PARTITION_PATH_SEPARATOR;
   }
diff --git 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/CustomKeyGenerator.java
 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/CustomKeyGenerator.java
index f5d891f04a9..43a30079f65 100644
--- 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/CustomKeyGenerator.java
+++ 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/CustomKeyGenerator.java
@@ -37,6 +37,8 @@ import java.util.Collections;
 import java.util.List;
 import java.util.stream.Collectors;
 
+import static 
org.apache.hudi.keygen.CustomAvroKeyGenerator.getPartitionPathFormatException;
+
 /**
  * This is a generic implementation of KeyGenerator where users can configure 
record key as a single field or a combination of fields. Similarly partition 
path can be configured to have multiple
  * fields or only one field. This class expects value for prop 
"hoodie.datasource.write.partitionpath.field" in a specific format. For example:
@@ -85,7 +87,7 @@ public class CustomKeyGenerator extends BuiltinKeyGenerator {
       return partitionPathFields.stream().map(field -> {
         String[] fieldWithType = field.split(CUSTOM_KEY_GENERATOR_SPLIT_REGEX);
         if (fieldWithType.length != 2) {
-          throw new HoodieKeyGeneratorException("Unable to find field names 
for partition path in proper format");
+          throw getPartitionPathFormatException();
         }
         String partitionPathField = fieldWithType[0];
         CustomAvroKeyGenerator.PartitionKeyType keyType = 
CustomAvroKeyGenerator.PartitionKeyType.valueOf(fieldWithType[1].toUpperCase());
diff --git 
a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/execution/datasources/SparkParsePartitionUtil.scala
 
b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/execution/datasources/SparkParsePartitionUtil.scala
index 304c2cfd9a5..26cdd5eb05c 100644
--- 
a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/execution/datasources/SparkParsePartitionUtil.scala
+++ 
b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/execution/datasources/SparkParsePartitionUtil.scala
@@ -22,6 +22,7 @@ import org.apache.hudi.common.table.HoodieTableConfig
 import org.apache.hudi.common.util
 import org.apache.hudi.keygen.CustomAvroKeyGenerator.PartitionKeyType
 import org.apache.hudi.keygen.{BaseKeyGenerator, CustomAvroKeyGenerator, 
CustomKeyGenerator, TimestampBasedAvroKeyGenerator, TimestampBasedKeyGenerator}
+import org.apache.hudi.util.JFunction
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.types.{DataType, StringType, StructField, 
StructType}
@@ -67,17 +68,20 @@ trait SparkParsePartitionUtil extends Serializable with 
Logging {
 
     def getPartitionStructFields(keyGeneratorPartitionFieldsOpt: 
util.Option[String], keyGeneratorClassName: String) = {
       val partitionFields: Array[StructField] = if 
(keyGeneratorPartitionFieldsOpt.isPresent
+        && 
keyGeneratorPartitionFieldsOpt.get().contains(BaseKeyGenerator.CUSTOM_KEY_GENERATOR_SPLIT_REGEX)
         && 
(classOf[CustomKeyGenerator].getName.equalsIgnoreCase(keyGeneratorClassName)
         || 
classOf[CustomAvroKeyGenerator].getName.equalsIgnoreCase(keyGeneratorClassName)))
 {
         val keyGeneratorPartitionFields = 
keyGeneratorPartitionFieldsOpt.get().split(BaseKeyGenerator.FIELD_SEPARATOR)
         keyGeneratorPartitionFields.map(field => 
CustomAvroKeyGenerator.getPartitionFieldAndKeyType(field))
           .map(pair => {
             val partitionField = pair.getLeft
-            val partitionKeyType = pair.getRight
-            partitionKeyType match {
-              case PartitionKeyType.SIMPLE => 
nameFieldMap.getOrElse(partitionField, null)
-              case PartitionKeyType.TIMESTAMP => if 
(shouldUseStringTypeForTimestampPartitionKeyType) StructField(partitionField, 
StringType) else nameFieldMap.getOrElse(partitionField, null)
-            }
+            val partitionKeyTypeOpt = pair.getRight
+            partitionKeyTypeOpt.map[StructField] {
+              JFunction.toJavaFunction {
+                case PartitionKeyType.SIMPLE => 
nameFieldMap.getOrElse(partitionField, null)
+                case PartitionKeyType.TIMESTAMP => if 
(shouldUseStringTypeForTimestampPartitionKeyType) StructField(partitionField, 
StringType) else nameFieldMap.getOrElse(partitionField, null)
+              }
+            }.orElse(nameFieldMap.getOrElse(partitionField, null))
           })
           .filter(structField => structField != null)
           .array
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/HoodieFlinkClusteringJob.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/HoodieFlinkClusteringJob.java
index e0828a0431e..f6fc148c39d 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/HoodieFlinkClusteringJob.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/HoodieFlinkClusteringJob.java
@@ -21,6 +21,7 @@ package org.apache.hudi.sink.clustering;
 import org.apache.hudi.async.HoodieAsyncTableService;
 import org.apache.hudi.avro.model.HoodieClusteringPlan;
 import org.apache.hudi.client.HoodieFlinkWriteClient;
+import org.apache.hudi.common.table.HoodieTableConfig;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.util.ClusteringUtils;
@@ -185,7 +186,7 @@ public class HoodieFlinkClusteringJob {
       conf.setString(FlinkOptions.RECORD_KEY_FIELD, 
metaClient.getTableConfig().getRecordKeyFieldProp());
 
       // set partition field
-      conf.setString(FlinkOptions.PARTITION_PATH_FIELD, 
metaClient.getTableConfig().getPartitionFieldProp());
+      conf.setString(FlinkOptions.PARTITION_PATH_FIELD, 
HoodieTableConfig.getPartitionFieldPropForKeyGenerator(metaClient.getTableConfig()).orElse(""));
 
       // set table schema
       CompactionUtil.setAvroSchema(conf, metaClient);
diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/BucketIndexSupport.scala
 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/BucketIndexSupport.scala
index 0dd75cf2325..1f535e70cf1 100644
--- 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/BucketIndexSupport.scala
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/BucketIndexSupport.scala
@@ -17,7 +17,7 @@
 
 package org.apache.hudi
 
-import org.apache.hudi.common.config.HoodieMetadataConfig
+import org.apache.hudi.common.config.{HoodieMetadataConfig, TypedProperties}
 import org.apache.hudi.common.fs.FSUtils
 import org.apache.hudi.common.model.FileSlice
 import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient, 
TableSchemaResolver}
@@ -28,7 +28,6 @@ import org.apache.hudi.index.HoodieIndex.IndexType
 import org.apache.hudi.index.bucket.BucketIdentifier
 import org.apache.hudi.keygen.KeyGenerator
 import org.apache.hudi.keygen.factory.HoodieSparkKeyGeneratorFactory
-
 import org.apache.avro.generic.GenericData
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.expressions
@@ -46,8 +45,11 @@ class BucketIndexSupport(spark: SparkSession,
 
   private val log = LoggerFactory.getLogger(getClass)
 
-  private val keyGenerator =
-    HoodieSparkKeyGeneratorFactory.createKeyGenerator(metadataConfig.getProps)
+  private val keyGenerator = {
+    val props = new TypedProperties(metadataConfig.getProps())
+    TypedProperties.putAll(props, metaClient.getTableConfig.getProps)
+    HoodieSparkKeyGeneratorFactory.createKeyGenerator(props)
+  }
 
   private lazy val avroSchema = new 
TableSchemaResolver(metaClient).getTableAvroSchema(false)
 
diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
index ef9ccc9aebc..5239ec9d6b9 100644
--- 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
@@ -1011,8 +1011,8 @@ object DataSourceOptionsHelper {
     if (!params.contains(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key()) && 
tableConfig.getRawRecordKeyFieldProp != null) {
       missingWriteConfigs ++= 
Map(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key() -> 
tableConfig.getRawRecordKeyFieldProp)
     }
-    if (!params.contains(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key()) 
&& tableConfig.getPartitionFieldProp != null) {
-      missingWriteConfigs ++= 
Map(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key() -> 
tableConfig.getPartitionFieldProp)
+    if (!params.contains(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key()) 
&& 
HoodieTableConfig.getPartitionFieldPropForKeyGenerator(tableConfig).isPresent) {
+      missingWriteConfigs ++= 
Map(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key() -> 
HoodieTableConfig.getPartitionFieldPropForKeyGenerator(tableConfig).get())
     }
     if (!params.contains(DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME.key()) 
&& tableConfig.getKeyGeneratorClassName != null) {
       missingWriteConfigs ++= 
Map(DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME.key() -> 
tableConfig.getKeyGeneratorClassName)
diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala
 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala
index ccca13256f9..5ea4d460714 100644
--- 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala
@@ -526,7 +526,7 @@ object HoodieFileIndex extends Logging {
     if (tableConfig != null) {
       properties.setProperty(RECORDKEY_FIELD.key, 
tableConfig.getRecordKeyFields.orElse(Array.empty).mkString(","))
       properties.setProperty(PRECOMBINE_FIELD.key, 
Option(tableConfig.getPreCombineField).getOrElse(""))
-      properties.setProperty(PARTITIONPATH_FIELD.key, 
tableConfig.getPartitionFields.orElse(Array.apply("")).mkString(","))
+      properties.setProperty(PARTITIONPATH_FIELD.key, 
HoodieTableConfig.getPartitionFieldPropForKeyGenerator(tableConfig).orElse(""))
     }
 
     properties
@@ -625,8 +625,7 @@ object HoodieFileIndex extends Logging {
       Set(0)
     } else if 
(keyGeneratorClassName.equals(KeyGeneratorType.CUSTOM.getClassName)
       || 
keyGeneratorClassName.equals(KeyGeneratorType.CUSTOM_AVRO.getClassName)) {
-      val partitionFields = 
HoodieTableConfig.getPartitionFieldsForKeyGenerator(tableConfig).orElse(java.util.Collections.emptyList[String]())
-      val partitionTypes = 
CustomAvroKeyGenerator.getPartitionTypes(partitionFields)
+      val partitionTypes = 
CustomAvroKeyGenerator.getPartitionTypes(tableConfig)
       var partitionIndexes: Set[Int] = Set.empty
       for (i <- 0 until partitionTypes.size()) {
         if (partitionTypes.get(i).equals(PartitionKeyType.TIMESTAMP)) {
diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieHadoopFsRelationFactory.scala
 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieHadoopFsRelationFactory.scala
index e6197856b41..65b9dbf0d09 100644
--- 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieHadoopFsRelationFactory.scala
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieHadoopFsRelationFactory.scala
@@ -94,7 +94,14 @@ abstract class HoodieBaseHadoopFsRelationFactory(val 
sqlContext: SQLContext,
     tableConfig.getPartitionFields.orElse(Array.empty).toSeq
   } else {
     //it's custom keygen
-    
CustomAvroKeyGenerator.getTimestampFields(HoodieTableConfig.getPartitionFieldsForKeyGenerator(tableConfig).orElse(java.util.Collections.emptyList[String]())).asScala.toSeq
+    val timestampFieldsOpt = 
CustomAvroKeyGenerator.getTimestampFields(tableConfig)
+    if (timestampFieldsOpt.isPresent) {
+      timestampFieldsOpt.get().asScala.toSeq
+    } else {
+      // timestamp fields above are determined using partition type
+      // For older tables the partition type may not be available so falling 
back to partition fields in those cases
+      tableConfig.getPartitionFields.orElse(Array.empty).toSeq
+    }
   }
 
   protected lazy val (tableAvroSchema: Schema, internalSchemaOpt: 
Option[InternalSchema]) = {
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/keygen/TestCustomKeyGenerator.java
 
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/keygen/TestCustomKeyGenerator.java
index 52eb36bca33..bdf8911695f 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/keygen/TestCustomKeyGenerator.java
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/keygen/TestCustomKeyGenerator.java
@@ -20,6 +20,8 @@ package org.apache.hudi.keygen;
 
 import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.table.HoodieTableConfig;
+import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
@@ -37,8 +39,6 @@ import org.junit.jupiter.api.Test;
 import java.io.IOException;
 import java.io.PrintWriter;
 import java.io.StringWriter;
-import java.util.Arrays;
-import java.util.List;
 
 import static org.junit.jupiter.api.Assertions.assertArrayEquals;
 import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -175,11 +175,31 @@ public class TestCustomKeyGenerator extends 
KeyGeneratorTestUtilities {
 
   @Test
   public void testCustomKeyGeneratorPartitionType() {
-    List<String> partitionFields = Arrays.asList("random:simple", 
"ts_ms:timestamp");
-    Object[] partitionTypes = 
CustomAvroKeyGenerator.getPartitionTypes(partitionFields).toArray();
+    HoodieTableConfig tableConfig = new HoodieTableConfig();
+    tableConfig.setValue(HoodieTableConfig.PARTITION_FIELDS.key(), 
"random:simple,ts_ms:timestamp");
+    Object[] partitionTypes = 
CustomAvroKeyGenerator.getPartitionTypes(tableConfig).toArray();
     assertArrayEquals(new CustomAvroKeyGenerator.PartitionKeyType[] 
{CustomAvroKeyGenerator.PartitionKeyType.SIMPLE, 
CustomAvroKeyGenerator.PartitionKeyType.TIMESTAMP}, partitionTypes);
-    Pair<String, CustomAvroKeyGenerator.PartitionKeyType> 
partitionFieldAndType = 
CustomAvroKeyGenerator.getPartitionFieldAndKeyType("random:simple");
-    assertEquals(Pair.of("random", 
CustomAvroKeyGenerator.PartitionKeyType.SIMPLE), partitionFieldAndType);
+    Pair<String, Option<CustomAvroKeyGenerator.PartitionKeyType>> 
partitionFieldAndType = 
CustomAvroKeyGenerator.getPartitionFieldAndKeyType("random:simple");
+    assertEquals(Pair.of("random", 
Option.of(CustomAvroKeyGenerator.PartitionKeyType.SIMPLE)), 
partitionFieldAndType);
+  }
+
+  @Test
+  public void testCustomKeyGeneratorTimestampFieldsAPI() {
+    HoodieTableConfig tableConfig = new HoodieTableConfig();
+    tableConfig.setValue(HoodieTableConfig.PARTITION_FIELDS.key(), 
"simple1:simple,ts1:timestamp,ts2:timestamp");
+    Object[] timestampFields = 
CustomAvroKeyGenerator.getTimestampFields(tableConfig).get().toArray();
+    // Only two timestamp fields are returned
+    assertArrayEquals(new String[] {"ts1", "ts2"}, timestampFields);
+
+    tableConfig.setValue(HoodieTableConfig.PARTITION_FIELDS.key(), 
"simple1,ts1,ts2");
+    Option<?> timestampFieldsOpt = 
CustomAvroKeyGenerator.getTimestampFields(tableConfig);
+    // Empty option is returned since no partition type is available
+    assertTrue(timestampFieldsOpt.isEmpty());
+
+    tableConfig.setValue(HoodieTableConfig.PARTITION_FIELDS.key(), 
"simple1:simple,simple2:simple,simple3:simple");
+    timestampFields = 
CustomAvroKeyGenerator.getTimestampFields(tableConfig).get().toArray();
+    // No timestamp partitions
+    assertEquals(0, timestampFields.length);
   }
 
   public void testTimestampBasedKeyGenerator(TypedProperties props) throws 
IOException {
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieFileIndex.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieFileIndex.scala
index fc52ff685f2..ccc322b79ff 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieFileIndex.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieFileIndex.scala
@@ -841,6 +841,12 @@ class TestHoodieFileIndex extends 
HoodieSparkClientTestBase with ScalaAssertionS
     assertEquals(expectedPartitionSchema, 
SparkAdapterSupport.sparkAdapter.getSparkParsePartitionUtil.getPartitionSchema(tableConfig,
       schema, shouldUseStringTypeForTimestampPartitionKeyType = false))
 
+    tableConfig.setValue(HoodieTableConfig.PARTITION_FIELDS, "f1,f2")
+    expectedPartitionSchema = StructType.apply(List(fields(0), fields(1)))
+    // With custom key generator handling, timestamp partition field f2 would 
have input schema type with old partition format
+    assertEquals(expectedPartitionSchema, 
SparkAdapterSupport.sparkAdapter.getSparkParsePartitionUtil.getPartitionSchema(tableConfig,
+      schema, shouldUseStringTypeForTimestampPartitionKeyType = true))
+
     tableConfig.setValue(HoodieTableConfig.KEY_GENERATOR_TYPE, 
KeyGeneratorType.COMPLEX.name())
     tableConfig.setValue(HoodieTableConfig.PARTITION_FIELDS, "f1,f2")
     // With other key generators, timestamp partition field f2 would have 
input schema type
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSparkSqlWithCustomKeyGenerator.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSparkSqlWithCustomKeyGenerator.scala
index ab9ae27c3fd..46da43264fc 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSparkSqlWithCustomKeyGenerator.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSparkSqlWithCustomKeyGenerator.scala
@@ -23,7 +23,7 @@ import org.apache.avro.Schema
 import 
org.apache.hudi.DataSourceReadOptions.EXTRACT_PARTITION_VALUES_FROM_PARTITION_PATH
 import org.apache.hudi.HoodieSparkUtils
 import org.apache.hudi.common.config.TypedProperties
-import org.apache.hudi.common.table.TableSchemaResolver
+import org.apache.hudi.common.table.{HoodieTableConfig, TableSchemaResolver}
 import org.apache.hudi.common.util.StringUtils
 import org.apache.hudi.exception.HoodieException
 import org.apache.hudi.functional.TestSparkSqlWithCustomKeyGenerator._
@@ -52,22 +52,27 @@ class TestSparkSqlWithCustomKeyGenerator extends 
HoodieSparkSqlTestBase {
               "(ts=202401, segment='cat2')", "202401/cat2",
               Seq("202312/cat2", "202312/cat4", "202401/cat1", "202401/cat3", 
"202402/cat1", "202402/cat3", "202402/cat5"),
               TS_FORMATTER_FUNC,
-              (ts: Integer, segment: String) => TS_FORMATTER_FUNC.apply(ts) + 
"/" + segment),
+              (ts: Integer, segment: String) => TS_FORMATTER_FUNC.apply(ts) + 
"/" + segment, false),
             Seq("MERGE_ON_READ", "segment:simple",
               "(segment='cat3')", "cat3",
               Seq("cat1", "cat2", "cat4", "cat5"),
               TS_TO_STRING_FUNC,
-              (_: Integer, segment: String) => segment),
+              (_: Integer, segment: String) => segment, false),
             Seq("MERGE_ON_READ", "ts:timestamp",
               "(ts=202312)", "202312",
               Seq("202401", "202402"),
               TS_FORMATTER_FUNC,
-              (ts: Integer, _: String) => TS_FORMATTER_FUNC.apply(ts)),
+              (ts: Integer, _: String) => TS_FORMATTER_FUNC.apply(ts), false),
             Seq("MERGE_ON_READ", "ts:timestamp,segment:simple",
               "(ts=202401, segment='cat2')", "202401/cat2",
               Seq("202312/cat2", "202312/cat4", "202401/cat1", "202401/cat3", 
"202402/cat1", "202402/cat3", "202402/cat5"),
               TS_FORMATTER_FUNC,
-              (ts: Integer, segment: String) => TS_FORMATTER_FUNC.apply(ts) + 
"/" + segment)
+              (ts: Integer, segment: String) => TS_FORMATTER_FUNC.apply(ts) + 
"/" + segment, false),
+            Seq("MERGE_ON_READ", "ts:timestamp,segment:simple",
+              "(ts=202401, segment='cat2')", "202401/cat2",
+              Seq("202312/cat2", "202312/cat4", "202401/cat1", "202401/cat3", 
"202402/cat1", "202402/cat3", "202402/cat5"),
+              TS_FORMATTER_FUNC,
+              (ts: Integer, segment: String) => TS_FORMATTER_FUNC.apply(ts) + 
"/" + segment, true)
           ).foreach { testParams =>
             withTable(generateTableName) { tableName =>
               LOG.warn("Testing with parameters: " + testParams)
@@ -89,11 +94,21 @@ class TestSparkSqlWithCustomKeyGenerator extends 
HoodieSparkSqlTestBase {
               } else {
                 ""
               }
+              val useOlderPartitionFieldFormat = 
testParams(7).asInstanceOf[Boolean]
 
               prepareTableWithKeyGenerator(
                 tableName, tablePath, tableType,
                 CUSTOM_KEY_GEN_CLASS_NAME, writePartitionFields, 
timestampKeyGeneratorConfig)
 
+              if (useOlderPartitionFieldFormat) {
+                var metaClient = createMetaClient(spark, tablePath)
+                val props = new TypedProperties()
+                props.put(HoodieTableConfig.PARTITION_FIELDS.key(), 
metaClient.getTableConfig.getPartitionFieldProp)
+                HoodieTableConfig.update(metaClient.getStorage, 
metaClient.getMetaPath, props)
+                metaClient = createMetaClient(spark, tablePath)
+                assertEquals(metaClient.getTableConfig.getPartitionFieldProp, 
HoodieTableConfig.getPartitionFieldPropForKeyGenerator(metaClient.getTableConfig).orElse(""))
+              }
+
               // SQL CTAS with table properties containing key generator write 
configs
               createTableWithSql(tableName, tablePath,
                 s"hoodie.datasource.write.partitionpath.field = 
'$writePartitionFields'" + timestampKeyGenProps)
@@ -244,6 +259,10 @@ class TestSparkSqlWithCustomKeyGenerator extends 
HoodieSparkSqlTestBase {
 
               // Validate ts field is still of type int in the table
               validateTsFieldSchema(tablePath, "ts", Schema.Type.INT)
+              if (useOlderPartitionFieldFormat) {
+                val metaClient = createMetaClient(spark, tablePath)
+                assertEquals(metaClient.getTableConfig.getPartitionFieldProp, 
HoodieTableConfig.getPartitionFieldPropForKeyGenerator(metaClient.getTableConfig).orElse(""))
+              }
             }
           }
         }

Reply via email to