This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new cb14ca727583 refactor(core): Unify record key/index key splitting and extraction (#18842)
cb14ca727583 is described below

commit cb14ca7275833e9e76f1c3116ba96cf31af6f8e9
Author: Shuo Cheng <[email protected]>
AuthorDate: Thu Jun 4 17:03:18 2026 +0800

    refactor(core): Unify record key/index key splitting and extraction (#18842)
---
 .../org/apache/hudi/config/HoodieIndexConfig.java  | 11 +++++----
 .../apache/hudi/index/bucket/BucketIdentifier.java |  2 +-
 .../hudi/index/bucket/HoodieBucketIndex.java       |  4 ++--
 .../apache/hudi/keygen/CustomAvroKeyGenerator.java |  6 +----
 .../java/org/apache/hudi/keygen/KeyGenUtils.java   | 26 +++++++++++++---------
 .../table/BucketIndexBulkInsertPartitioner.java    |  4 ++--
 .../hudi/client/TestBaseHoodieWriteClient.java     |  5 +++--
 .../apache/hudi/config/TestHoodieWriteConfig.java  | 20 +++++++++++++++++
 .../org/apache/hudi/keygen/TestKeyGenUtils.java    | 21 +++++++++++++++++
 ...arkConsistentBucketDuplicateUpdateStrategy.java |  4 ++--
 .../org/apache/hudi/keygen/CustomKeyGenerator.java |  8 +------
 .../apache/hudi/configuration/OptionsResolver.java | 13 ++++-------
 .../bucket/ConsistentBucketAssignFunction.java     |  2 +-
 .../ConsistentBucketStreamWriteFunction.java       |  4 ++--
 .../org/apache/hudi/table/HoodieTableSource.java   |  2 +-
 .../hudi/configuration/TestOptionsResolver.java    |  4 ++--
 .../scala/org/apache/hudi/BucketIndexSupport.scala |  4 ++--
 .../hudi/command/DeleteHoodieTableCommand.scala    |  5 ++++-
 .../hudi/command/DeleteHoodieTableCommand.scala    |  5 ++++-
 19 files changed, 94 insertions(+), 56 deletions(-)

diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieIndexConfig.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieIndexConfig.java
index 9cedac1be74d..1d2c6bb2b426 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieIndexConfig.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieIndexConfig.java
@@ -29,6 +29,7 @@ import org.apache.hudi.exception.HoodieIndexException;
 import org.apache.hudi.exception.HoodieNotSupportedException;
 import org.apache.hudi.index.HoodieIndex;
 import org.apache.hudi.index.bucket.partition.PartitionBucketIndexRule;
+import org.apache.hudi.keygen.KeyGenUtils;
 import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
 
 import lombok.Getter;
@@ -39,9 +40,8 @@ import javax.annotation.concurrent.Immutable;
 import java.io.File;
 import java.io.FileReader;
 import java.io.IOException;
-import java.util.Arrays;
+import java.util.List;
 import java.util.Properties;
-import java.util.stream.Collectors;
 
 import static 
org.apache.hudi.common.config.HoodieStorageConfig.BLOOM_FILTER_DYNAMIC_MAX_ENTRIES;
 import static 
org.apache.hudi.common.config.HoodieStorageConfig.BLOOM_FILTER_FPP_VALUE;
@@ -777,10 +777,9 @@ public class HoodieIndexConfig extends HoodieConfig {
           hoodieIndexConfig.setValue(BUCKET_INDEX_HASH_FIELD,
               
hoodieIndexConfig.getString(KeyGeneratorOptions.RECORDKEY_FIELD_NAME));
         } else {
-          boolean valid = Arrays
-              
.stream(hoodieIndexConfig.getString(KeyGeneratorOptions.RECORDKEY_FIELD_NAME).split(","))
-              .collect(Collectors.toSet())
-              
.containsAll(Arrays.asList(hoodieIndexConfig.getString(BUCKET_INDEX_HASH_FIELD).split(",")));
+          List<String> recordKeyFields = 
KeyGenUtils.getRecordKeyFields(hoodieIndexConfig.getString(KeyGeneratorOptions.RECORDKEY_FIELD_NAME));
+          List<String> indexKeyFields = 
KeyGenUtils.getIndexKeyFields(hoodieIndexConfig.getString(BUCKET_INDEX_HASH_FIELD));
+          boolean valid = recordKeyFields.containsAll(indexKeyFields);
           if (!valid) {
             throw new HoodieIndexException("Bucket index key (if configured) 
must be subset of record key.");
           }
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/BucketIdentifier.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/BucketIdentifier.java
index eed3ab39599c..2bde3aec815b 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/BucketIdentifier.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/BucketIdentifier.java
@@ -42,7 +42,7 @@ public class BucketIdentifier implements Serializable {
   }
 
   protected static List<String> getHashKeys(String recordKey, String 
indexKeyFields) {
-    return getHashKeysUsingIndexFields(recordKey, 
Arrays.asList(indexKeyFields.split(",")));
+    return getHashKeysUsingIndexFields(recordKey, 
KeyGenUtils.getIndexKeyFields(indexKeyFields));
   }
 
   protected static List<String> getHashKeys(String recordKey, List<String> 
indexKeyFields) {
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/HoodieBucketIndex.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/HoodieBucketIndex.java
index 38c7cb5319a3..7b9c28b46216 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/HoodieBucketIndex.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/HoodieBucketIndex.java
@@ -29,13 +29,13 @@ import org.apache.hudi.common.util.Option;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieIndexException;
 import org.apache.hudi.index.HoodieIndex;
+import org.apache.hudi.keygen.KeyGenUtils;
 import org.apache.hudi.table.HoodieTable;
 
 import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
 
 import java.io.Serializable;
-import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -57,7 +57,7 @@ public abstract class HoodieBucketIndex extends 
HoodieIndex<Object, Object> {
     super(config);
 
     this.numBuckets = config.getBucketIndexNumBuckets();
-    this.indexKeyFields = 
Arrays.asList(config.getBucketIndexHashField().split(","));
+    this.indexKeyFields = 
KeyGenUtils.getIndexKeyFields(config.getBucketIndexHashField());
     log.info("Use bucket index, numBuckets = " + numBuckets + ", indexFields: 
" + indexKeyFields);
   }
 
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/CustomAvroKeyGenerator.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/CustomAvroKeyGenerator.java
index beeb4947acd4..6ab6378367ae 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/CustomAvroKeyGenerator.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/CustomAvroKeyGenerator.java
@@ -62,11 +62,7 @@ public class CustomAvroKeyGenerator extends BaseKeyGenerator 
{
 
   public CustomAvroKeyGenerator(TypedProperties props) {
     super(props);
-    this.recordKeyFields = 
Option.ofNullable(props.getString(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key(),
 null))
-        .map(recordKeyConfigValue ->
-            Arrays.stream(recordKeyConfigValue.split(FIELD_SEPARATOR))
-                .map(String::trim).collect(Collectors.toList())
-        ).orElse(Collections.emptyList());
+    this.recordKeyFields = KeyGenUtils.getRecordKeyFields(props);
     this.partitionPathFields = 
Arrays.stream(props.getString(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key()).split(FIELD_SEPARATOR)).map(String::trim).collect(Collectors.toList());
     this.recordKeyGenerator = getRecordKeyFieldNames().size() == 1 ? new 
SimpleAvroKeyGenerator(config) : new ComplexAvroKeyGenerator(config);
     this.partitionKeyGenerators = 
getPartitionKeyGenerators(this.partitionPathFields, config);
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/KeyGenUtils.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/KeyGenUtils.java
index 292f261c2223..81faa0cdb2ba 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/KeyGenUtils.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/KeyGenUtils.java
@@ -39,11 +39,9 @@ import org.apache.avro.generic.GenericRecord;
 
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 import java.util.function.BiFunction;
-import java.util.stream.Collectors;
 
 import static 
org.apache.hudi.config.HoodieWriteConfig.COMPLEX_KEYGEN_NEW_ENCODING;
 import static org.apache.hudi.config.HoodieWriteConfig.WRITE_TABLE_VERSION;
@@ -72,7 +70,7 @@ public class KeyGenUtils {
    */
   public static KeyGeneratorType inferKeyGeneratorType(
       Option<String> recordsKeyFields, String partitionFields) {
-    int numRecordKeyFields = recordsKeyFields.map(fields -> 
fields.split(",").length).orElse(0);
+    int numRecordKeyFields = recordsKeyFields.map(keyStr -> 
getRecordKeyFields(keyStr).size()).orElse(0);
     KeyGeneratorType partitionKeyGeneratorType = 
inferKeyGeneratorTypeFromPartitionFields(partitionFields);
     if (numRecordKeyFields <= 1) {
       return partitionKeyGeneratorType;
@@ -331,13 +329,21 @@ public class KeyGenUtils {
   }
 
   public static List<String> getRecordKeyFields(TypedProperties props) {
-    return 
Option.ofNullable(props.getString(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key(),
 null))
-        .map(recordKeyConfigValue ->
-            Arrays.stream(recordKeyConfigValue.split(","))
-                .map(String::trim)
-                .filter(s -> !s.isEmpty())
-                .collect(Collectors.toList())
-        ).orElse(Collections.emptyList());
+    return 
getRecordKeyFields(props.getString(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key(),
 null));
+  }
+
+  public static List<String> getRecordKeyFields(String recordKeys) {
+    return getKeyFields(recordKeys);
+  }
+
+  public static List<String> getIndexKeyFields(String indexKeys) {
+    return getKeyFields(indexKeys);
+  }
+
+  private static List<String> getKeyFields(String keys) {
+    return Option.ofNullable(keys)
+        .map(value -> StringUtils.split(value, ","))
+        .orElse(Collections.emptyList());
   }
 
   /**
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/BucketIndexBulkInsertPartitioner.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/BucketIndexBulkInsertPartitioner.java
index 7d8d7c6900ed..5fd3909f0562 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/BucketIndexBulkInsertPartitioner.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/BucketIndexBulkInsertPartitioner.java
@@ -25,12 +25,12 @@ import 
org.apache.hudi.exception.HoodieNotSupportedException;
 import org.apache.hudi.io.AppendHandleFactory;
 import org.apache.hudi.io.SingleFileHandleCreateFactory;
 import org.apache.hudi.io.WriteHandleFactory;
+import org.apache.hudi.keygen.KeyGenUtils;
 
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.List;
 
 /**
@@ -50,7 +50,7 @@ public abstract class BucketIndexBulkInsertPartitioner<T> 
extends BucketSortBulk
 
   public BucketIndexBulkInsertPartitioner(HoodieTable table, String 
sortString, boolean preserveHoodieMetadata) {
     super(table, sortString);
-    this.indexKeyFields = 
Arrays.asList(table.getConfig().getBucketIndexHashField().split(","));
+    this.indexKeyFields = 
KeyGenUtils.getIndexKeyFields(table.getConfig().getBucketIndexHashField());
     this.consistentLogicalTimestampEnabled = 
table.getConfig().isConsistentLogicalTimestampEnabled();
     this.preserveHoodieMetadata = preserveHoodieMetadata;
     // Multiple bulk inserts into COW using `BucketIndexBulkInsertPartitioner` 
is restricted, otherwise AppendHandleFactory will produce MOR log files
diff --git 
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/TestBaseHoodieWriteClient.java
 
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/TestBaseHoodieWriteClient.java
index 0bbe624eafa5..983abbae3038 100644
--- 
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/TestBaseHoodieWriteClient.java
+++ 
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/TestBaseHoodieWriteClient.java
@@ -46,6 +46,7 @@ import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.index.HoodieIndex;
 import org.apache.hudi.index.simple.HoodieSimpleIndex;
 import org.apache.hudi.keygen.ComplexAvroKeyGenerator;
+import org.apache.hudi.keygen.KeyGenUtils;
 import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
 import org.apache.hudi.table.BulkInsertPartitioner;
 import org.apache.hudi.table.HoodieTable;
@@ -242,7 +243,7 @@ class TestBaseHoodieWriteClient extends 
HoodieCommonTestHarness {
     if (tableVersion <= 8 && enableComplexKeyGeneratorValidation
         && 
(ComplexAvroKeyGenerator.class.getCanonicalName().equals(keyGeneratorClass)
         || 
"org.apache.hudi.keygen.ComplexKeyGenerator".equals(keyGeneratorClass))
-        && recordKeyFields.split(",").length == 1) {
+        && KeyGenUtils.getRecordKeyFields(recordKeyFields).size() == 1) {
       assertComplexKeyGeneratorValidationThrows(() -> 
writeClient.initTable(WriteOperationType.INSERT, Option.empty()), "ingestion");
     } else {
       writeClient.initTable(WriteOperationType.INSERT, Option.empty());
@@ -399,4 +400,4 @@ class TestBaseHoodieWriteClient extends 
HoodieCommonTestHarness {
 
     }
   }
-}
\ No newline at end of file
+}
diff --git 
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/config/TestHoodieWriteConfig.java
 
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/config/TestHoodieWriteConfig.java
index 6c1d1418b190..044e41ed89d2 100644
--- 
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/config/TestHoodieWriteConfig.java
+++ 
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/config/TestHoodieWriteConfig.java
@@ -37,6 +37,7 @@ import org.apache.hudi.common.table.marker.MarkerType;
 import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
 import org.apache.hudi.common.util.CollectionUtils;
 import org.apache.hudi.config.HoodieWriteConfig.Builder;
+import org.apache.hudi.exception.HoodieIndexException;
 import org.apache.hudi.index.HoodieIndex;
 import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
 
@@ -619,6 +620,25 @@ public class TestHoodieWriteConfig {
     assertEquals("org.apache.hudi.table.action.commit.UpsertPartitioner", 
overwritePartitioner.getString(HoodieLayoutConfig.LAYOUT_PARTITIONER_CLASS_NAME));
   }
 
+  @Test
+  public void testBucketIndexKeyFieldValidationTrimsFields() {
+    Properties props = new Properties();
+    props.setProperty(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key(), "uuid, 
name");
+
+    HoodieIndexConfig indexConfig = HoodieIndexConfig.newBuilder()
+        .fromProperties(props)
+        .withIndexType(HoodieIndex.IndexType.BUCKET)
+        .withIndexKeyField(" name")
+        .build();
+    assertEquals(" name", 
indexConfig.getString(HoodieIndexConfig.BUCKET_INDEX_HASH_FIELD));
+
+    assertThrows(HoodieIndexException.class, () -> 
HoodieIndexConfig.newBuilder()
+        .fromProperties(props)
+        .withIndexType(HoodieIndex.IndexType.BUCKET)
+        .withIndexKeyField("missing")
+        .build());
+  }
+
   @Test
   void testBloomIndexFileIdKeySortingConfig() {
     Properties props = new Properties();
diff --git 
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/keygen/TestKeyGenUtils.java
 
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/keygen/TestKeyGenUtils.java
index 777c4792901f..cd837c0608e8 100644
--- 
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/keygen/TestKeyGenUtils.java
+++ 
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/keygen/TestKeyGenUtils.java
@@ -18,9 +18,11 @@
 
 package org.apache.hudi.keygen;
 
+import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.table.HoodieTableConfig;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.exception.HoodieKeyException;
+import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
 import org.apache.hudi.keygen.constant.KeyGeneratorType;
 
 import org.apache.avro.Schema;
@@ -31,6 +33,7 @@ import org.junit.jupiter.api.Test;
 
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.List;
 
 import static 
org.apache.hudi.common.table.HoodieTableConfig.KEY_GENERATOR_TYPE;
@@ -121,6 +124,24 @@ public class TestKeyGenUtils {
         KeyGenUtils.inferKeyGeneratorTypeFromPartitionFields(null));
   }
 
+  @Test
+  public void testGetRecordKeyFields() {
+    assertEquals(Collections.emptyList(), 
KeyGenUtils.getRecordKeyFields((String) null));
+    assertEquals(Collections.emptyList(), KeyGenUtils.getRecordKeyFields(""));
+    assertEquals(Arrays.asList("id", "ts", "name"), 
KeyGenUtils.getRecordKeyFields(" id,ts, name,, "));
+
+    TypedProperties props = new TypedProperties();
+    props.setProperty(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key(), " id,ts 
");
+    assertEquals(Arrays.asList("id", "ts"), 
KeyGenUtils.getRecordKeyFields(props));
+  }
+
+  @Test
+  public void testGetIndexKeyFields() {
+    assertEquals(Collections.emptyList(), KeyGenUtils.getIndexKeyFields(null));
+    assertEquals(Collections.emptyList(), KeyGenUtils.getIndexKeyFields(""));
+    assertEquals(Arrays.asList("id", "ts", "name"), 
KeyGenUtils.getIndexKeyFields(" id,ts, name,, "));
+  }
+
   @Test
   public void testExtractRecordKeys() {
     // if for recordKey one column only is used, then there is no added column 
name before value
diff --git 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/update/strategy/SparkConsistentBucketDuplicateUpdateStrategy.java
 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/update/strategy/SparkConsistentBucketDuplicateUpdateStrategy.java
index 2d8247de92e7..eb4b308a668a 100644
--- 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/update/strategy/SparkConsistentBucketDuplicateUpdateStrategy.java
+++ 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/update/strategy/SparkConsistentBucketDuplicateUpdateStrategy.java
@@ -28,11 +28,11 @@ import org.apache.hudi.common.model.HoodieRecordLocation;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.index.bucket.ConsistentBucketIdentifier;
+import org.apache.hudi.keygen.KeyGenUtils;
 import org.apache.hudi.table.HoodieTable;
 import org.apache.hudi.table.action.cluster.strategy.UpdateStrategy;
 import 
org.apache.hudi.table.action.cluster.util.ConsistentHashingUpdateStrategyUtils;
 
-import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
@@ -74,7 +74,7 @@ public class SparkConsistentBucketDuplicateUpdateStrategy<T> 
extends UpdateStrat
         
ConsistentHashingUpdateStrategyUtils.constructPartitionToIdentifier(partitions, 
table);
 
     // Produce records tagged with new record location
-    List<String> indexKeyFields = 
Arrays.asList(table.getConfig().getBucketIndexHashField().split(","));
+    List<String> indexKeyFields = 
KeyGenUtils.getIndexKeyFields(table.getConfig().getBucketIndexHashField());
     HoodieData<HoodieRecord<T>> redirectedRecordsRDD = 
filteredRecordsRDD.map(r -> {
       Pair<String, ConsistentBucketIdentifier> identifierPair = 
partitionToIdentifier.get(r.getPartitionPath());
       ConsistentHashingNode node = 
identifierPair.getValue().getBucket(r.getKey(), indexKeyFields);
diff --git 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/CustomKeyGenerator.java
 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/CustomKeyGenerator.java
index 954af2000c1b..45f77ff9ce28 100644
--- 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/CustomKeyGenerator.java
+++ 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/CustomKeyGenerator.java
@@ -63,12 +63,7 @@ public class CustomKeyGenerator extends BuiltinKeyGenerator {
     // NOTE: We have to strip partition-path configuration, since it could 
only be interpreted by
     //       this key-gen
     super(stripPartitionPathConfig(props));
-    this.recordKeyFields = 
Option.ofNullable(props.getString(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key(),
 null))
-        .map(recordKeyConfigValue ->
-            Arrays.stream(recordKeyConfigValue.split(","))
-                .map(String::trim)
-                .collect(Collectors.toList())
-        ).orElse(Collections.emptyList());
+    this.recordKeyFields = KeyGenUtils.getRecordKeyFields(props);
     String partitionPathFields = 
props.getString(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key());
     this.partitionPathFields = partitionPathFields == null
         ? Collections.emptyList()
@@ -167,4 +162,3 @@ public class CustomKeyGenerator extends BuiltinKeyGenerator 
{
     return filtered;
   }
 }
-
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/OptionsResolver.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/OptionsResolver.java
index 74c5257cf44f..cc32351e62cf 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/OptionsResolver.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/OptionsResolver.java
@@ -40,6 +40,7 @@ import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.index.HoodieIndex;
 import org.apache.hudi.index.bucket.partition.PartitionBucketIndexUtils;
+import org.apache.hudi.keygen.KeyGenUtils;
 import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
 import org.apache.hudi.sink.buffer.BufferMemoryType;
 import org.apache.hudi.sink.overwrite.PartitionOverwriteMode;
@@ -168,21 +169,15 @@ public class OptionsResolver {
    */
   public static String[] getRecordKeys(Configuration conf) {
     final String recordKeyStr = conf.get(FlinkOptions.RECORD_KEY_FIELD);
-    if (StringUtils.isNullOrEmpty(recordKeyStr)) {
-      return new String[]{};
-    }
-    return recordKeyStr.split(",");
+    return KeyGenUtils.getRecordKeyFields(recordKeyStr).toArray(new String[0]);
   }
 
   /**
    * Return the bucket index keys as an array.
    */
   public static String[] getBucketIndexKeys(Configuration conf) {
-    final String indexKeyStr = conf.get(FlinkOptions.INDEX_KEY_FIELD);
-    if (StringUtils.isNullOrEmpty(indexKeyStr)) {
-      return new String[]{};
-    }
-    return indexKeyStr.split(",");
+    final String indexKeyStr = getIndexKeyField(conf);
+    return KeyGenUtils.getIndexKeyFields(indexKeyStr).toArray(new String[0]);
   }
 
   /**
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bucket/ConsistentBucketAssignFunction.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bucket/ConsistentBucketAssignFunction.java
index ec2fa104c58e..089224ac985d 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bucket/ConsistentBucketAssignFunction.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bucket/ConsistentBucketAssignFunction.java
@@ -68,7 +68,7 @@ public class ConsistentBucketAssignFunction extends 
ProcessFunctionAdapter<Hoodi
 
   public ConsistentBucketAssignFunction(Configuration conf) {
     this.config = conf;
-    this.indexKeyFields = 
Arrays.asList(OptionsResolver.getIndexKeyField(conf).split(","));
+    this.indexKeyFields = 
Arrays.asList(OptionsResolver.getBucketIndexKeys(conf));
     this.bucketNum = conf.get(FlinkOptions.BUCKET_INDEX_NUM_BUCKETS);
   }
 
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bucket/ConsistentBucketStreamWriteFunction.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bucket/ConsistentBucketStreamWriteFunction.java
index 40fd32f2e9c6..91f1fd91e61a 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bucket/ConsistentBucketStreamWriteFunction.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bucket/ConsistentBucketStreamWriteFunction.java
@@ -23,7 +23,7 @@ import org.apache.hudi.common.model.HoodieFileGroupId;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.util.collection.MappingIterator;
 import org.apache.hudi.common.util.collection.Pair;
-import org.apache.hudi.configuration.FlinkOptions;
+import org.apache.hudi.configuration.OptionsResolver;
 import org.apache.hudi.sink.StreamWriteFunction;
 import org.apache.hudi.sink.buffer.RowDataBucket;
 import 
org.apache.hudi.sink.clustering.update.strategy.ConsistentBucketUpdateStrategy;
@@ -65,7 +65,7 @@ public class ConsistentBucketStreamWriteFunction extends 
StreamWriteFunction {
   @Override
   public void open(Configuration parameters) throws IOException {
     super.open(parameters);
-    List<String> indexKeyFields = 
Arrays.asList(config.get(FlinkOptions.INDEX_KEY_FIELD).split(","));
+    List<String> indexKeyFields = 
Arrays.asList(OptionsResolver.getBucketIndexKeys(config));
     this.updateStrategy = new ConsistentBucketUpdateStrategy(this.writeClient, 
indexKeyFields);
     log.info("Create update strategy with index key fields: {}", 
indexKeyFields);
   }
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
index 0c180404fcec..6aa845920855 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
@@ -497,7 +497,7 @@ public class HoodieTableSource extends FileIndexReader 
implements
     if (!OptionsResolver.isBucketIndexType(conf) || dataFilters.isEmpty()) {
       return Option.empty();
     }
-    Set<String> indexKeyFields = 
Arrays.stream(OptionsResolver.getIndexKeyField(conf).split(",")).collect(Collectors.toSet());
+    Set<String> indexKeyFields = 
Arrays.stream(OptionsResolver.getBucketIndexKeys(conf)).collect(Collectors.toSet());
     List<ResolvedExpression> indexKeyFilters = 
dataFilters.stream().filter(expr -> ExpressionUtils.isEqualsLitExpr(expr, 
indexKeyFields)).collect(Collectors.toList());
     if (!ExpressionUtils.isFilteringByAllFields(indexKeyFilters, 
indexKeyFields)) {
       return Option.empty();
diff --git 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/configuration/TestOptionsResolver.java
 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/configuration/TestOptionsResolver.java
index f7266891e1de..c64018c3b79f 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/configuration/TestOptionsResolver.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/configuration/TestOptionsResolver.java
@@ -84,7 +84,7 @@ public class TestOptionsResolver {
     assertArrayEquals(new String[]{}, OptionsResolver.getRecordKeys(conf));
 
     conf.set(FlinkOptions.RECORD_KEY_FIELD, "uuid, name");
-    assertArrayEquals(new String[]{"uuid", " name"}, 
OptionsResolver.getRecordKeys(conf));
+    assertArrayEquals(new String[]{"uuid", "name"}, 
OptionsResolver.getRecordKeys(conf));
   }
 
   @Test
@@ -96,7 +96,7 @@ public class TestOptionsResolver {
     assertArrayEquals(new String[]{}, 
OptionsResolver.getBucketIndexKeys(conf));
 
     conf.set(FlinkOptions.INDEX_KEY_FIELD, "uuid, name");
-    assertArrayEquals(new String[]{"uuid", " name"}, 
OptionsResolver.getBucketIndexKeys(conf));
+    assertArrayEquals(new String[]{"uuid", "name"}, 
OptionsResolver.getBucketIndexKeys(conf));
   }
 
   @Test
diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/BucketIndexSupport.scala
 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/BucketIndexSupport.scala
index 71c0ac4d379a..e9aee3ab78b4 100644
--- 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/BucketIndexSupport.scala
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/BucketIndexSupport.scala
@@ -27,6 +27,7 @@ import org.apache.hudi.index.HoodieIndex
 import org.apache.hudi.index.HoodieIndex.IndexType
 import org.apache.hudi.index.bucket.BucketIdentifier
 import org.apache.hudi.keygen.KeyGenerator
+import org.apache.hudi.keygen.KeyGenUtils
 import org.apache.hudi.keygen.factory.HoodieSparkKeyGeneratorFactory
 
 import org.apache.avro.generic.GenericData
@@ -212,7 +213,7 @@ class BucketIndexSupport(spark: SparkSession,
     if (bucketHashFields == null || bucketHashFields.isEmpty) {
       Option.apply(null)
     } else {
-      
Option.apply(JavaConverters.seqAsJavaListConverter(bucketHashFields.split(",")).asJava)
+      Option.apply(KeyGenUtils.getIndexKeyFields(bucketHashFields))
     }
   }
 
@@ -224,4 +225,3 @@ class BucketIndexSupport(spark: SparkSession,
 object BucketIndexSupport {
   val INDEX_NAME = "BUCKET"
 }
-
diff --git 
a/hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/hudi/command/DeleteHoodieTableCommand.scala
 
b/hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/hudi/command/DeleteHoodieTableCommand.scala
index b03b6fec3b8f..bd4ca0c751e7 100644
--- 
a/hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/hudi/command/DeleteHoodieTableCommand.scala
+++ 
b/hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/hudi/command/DeleteHoodieTableCommand.scala
@@ -20,6 +20,7 @@ package org.apache.spark.sql.hudi.command
 import org.apache.hudi.{HoodieSparkSqlWriter, SparkAdapterSupport}
 import org.apache.hudi.DataSourceWriteOptions.{SPARK_SQL_OPTIMIZED_WRITES, 
SPARK_SQL_WRITES_PREPPED_KEY}
 import org.apache.hudi.common.table.HoodieTableConfig
+import org.apache.hudi.keygen.KeyGenUtils
 
 import org.apache.spark.sql._
 import org.apache.spark.sql.SparkSession
@@ -34,6 +35,8 @@ import org.apache.spark.sql.hudi.ProvidesHoodieConfig
 import 
org.apache.spark.sql.hudi.command.HoodieCommandMetrics.updateCommitMetrics
 import 
org.apache.spark.sql.hudi.command.HoodieLeafRunnableCommand.stripMetaFieldAttributes
 
+import scala.collection.JavaConverters._
+
 case class DeleteHoodieTableCommand(catalogTable: HoodieCatalogTable, query: 
LogicalPlan, config: Map[String, String]) extends DataWritingCommand
   with SparkAdapterSupport
   with ProvidesHoodieConfig {
@@ -76,7 +79,7 @@ object DeleteHoodieTableCommand extends SparkAdapterSupport 
with ProvidesHoodieC
     }
 
     val recordKeysStr = 
config.getOrElse(HoodieTableConfig.RECORDKEY_FIELDS.key(), "")
-    val recordKeys = recordKeysStr.split(",").filter(_.nonEmpty)
+    val recordKeys = 
KeyGenUtils.getRecordKeyFields(recordKeysStr).asScala.toSeq
 
     // get all columns which are used in condition
     val conditionColumns = if (condition == null) {
diff --git 
a/hudi-spark-datasource/hudi-spark4-common/src/main/scala/org/apache/spark/sql/hudi/command/DeleteHoodieTableCommand.scala
 
b/hudi-spark-datasource/hudi-spark4-common/src/main/scala/org/apache/spark/sql/hudi/command/DeleteHoodieTableCommand.scala
index e40cd2e4840b..b05d826fbb1b 100644
--- 
a/hudi-spark-datasource/hudi-spark4-common/src/main/scala/org/apache/spark/sql/hudi/command/DeleteHoodieTableCommand.scala
+++ 
b/hudi-spark-datasource/hudi-spark4-common/src/main/scala/org/apache/spark/sql/hudi/command/DeleteHoodieTableCommand.scala
@@ -20,6 +20,7 @@ package org.apache.spark.sql.hudi.command
 import org.apache.hudi.{HoodieSparkSqlWriter, SparkAdapterSupport}
 import org.apache.hudi.DataSourceWriteOptions.{SPARK_SQL_OPTIMIZED_WRITES, 
SPARK_SQL_WRITES_PREPPED_KEY}
 import org.apache.hudi.common.table.HoodieTableConfig
+import org.apache.hudi.keygen.KeyGenUtils
 
 import org.apache.spark.sql
 import org.apache.spark.sql._
@@ -35,6 +36,8 @@ import org.apache.spark.sql.hudi.ProvidesHoodieConfig
 import 
org.apache.spark.sql.hudi.command.HoodieCommandMetrics.updateCommitMetrics
 import 
org.apache.spark.sql.hudi.command.HoodieLeafRunnableCommand.stripMetaFieldAttributes
 
+import scala.collection.JavaConverters._
+
 case class DeleteHoodieTableCommand(catalogTable: HoodieCatalogTable, query: 
LogicalPlan, config: Map[String, String]) extends DataWritingCommand
   with SparkAdapterSupport
   with ProvidesHoodieConfig {
@@ -81,7 +84,7 @@ object DeleteHoodieTableCommand extends SparkAdapterSupport 
with ProvidesHoodieC
     }
 
     val recordKeysStr = 
config.getOrElse(HoodieTableConfig.RECORDKEY_FIELDS.key(), "")
-    val recordKeys = recordKeysStr.split(",").filter(_.nonEmpty)
+    val recordKeys = 
KeyGenUtils.getRecordKeyFields(recordKeysStr).asScala.toSeq
 
     // get all columns which are used in condition
     val conditionColumns = if (condition == null) {

Reply via email to