This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new cb14ca727583 refactor(core): Unify record key/index key splitting and extraction (#18842)
cb14ca727583 is described below
commit cb14ca7275833e9e76f1c3116ba96cf31af6f8e9
Author: Shuo Cheng <[email protected]>
AuthorDate: Thu Jun 4 17:03:18 2026 +0800
refactor(core): Unify record key/index key splitting and extraction (#18842)
---
.../org/apache/hudi/config/HoodieIndexConfig.java | 11 +++++----
.../apache/hudi/index/bucket/BucketIdentifier.java | 2 +-
.../hudi/index/bucket/HoodieBucketIndex.java | 4 ++--
.../apache/hudi/keygen/CustomAvroKeyGenerator.java | 6 +----
.../java/org/apache/hudi/keygen/KeyGenUtils.java | 26 +++++++++++++---------
.../table/BucketIndexBulkInsertPartitioner.java | 4 ++--
.../hudi/client/TestBaseHoodieWriteClient.java | 5 +++--
.../apache/hudi/config/TestHoodieWriteConfig.java | 20 +++++++++++++++++
.../org/apache/hudi/keygen/TestKeyGenUtils.java | 21 +++++++++++++++++
...arkConsistentBucketDuplicateUpdateStrategy.java | 4 ++--
.../org/apache/hudi/keygen/CustomKeyGenerator.java | 8 +------
.../apache/hudi/configuration/OptionsResolver.java | 13 ++++-------
.../bucket/ConsistentBucketAssignFunction.java | 2 +-
.../ConsistentBucketStreamWriteFunction.java | 4 ++--
.../org/apache/hudi/table/HoodieTableSource.java | 2 +-
.../hudi/configuration/TestOptionsResolver.java | 4 ++--
.../scala/org/apache/hudi/BucketIndexSupport.scala | 4 ++--
.../hudi/command/DeleteHoodieTableCommand.scala | 5 ++++-
.../hudi/command/DeleteHoodieTableCommand.scala | 5 ++++-
19 files changed, 94 insertions(+), 56 deletions(-)
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieIndexConfig.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieIndexConfig.java
index 9cedac1be74d..1d2c6bb2b426 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieIndexConfig.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieIndexConfig.java
@@ -29,6 +29,7 @@ import org.apache.hudi.exception.HoodieIndexException;
import org.apache.hudi.exception.HoodieNotSupportedException;
import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.index.bucket.partition.PartitionBucketIndexRule;
+import org.apache.hudi.keygen.KeyGenUtils;
import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
import lombok.Getter;
@@ -39,9 +40,8 @@ import javax.annotation.concurrent.Immutable;
import java.io.File;
import java.io.FileReader;
import java.io.IOException;
-import java.util.Arrays;
+import java.util.List;
import java.util.Properties;
-import java.util.stream.Collectors;
import static
org.apache.hudi.common.config.HoodieStorageConfig.BLOOM_FILTER_DYNAMIC_MAX_ENTRIES;
import static
org.apache.hudi.common.config.HoodieStorageConfig.BLOOM_FILTER_FPP_VALUE;
@@ -777,10 +777,9 @@ public class HoodieIndexConfig extends HoodieConfig {
hoodieIndexConfig.setValue(BUCKET_INDEX_HASH_FIELD,
hoodieIndexConfig.getString(KeyGeneratorOptions.RECORDKEY_FIELD_NAME));
} else {
- boolean valid = Arrays
-
.stream(hoodieIndexConfig.getString(KeyGeneratorOptions.RECORDKEY_FIELD_NAME).split(","))
- .collect(Collectors.toSet())
-
.containsAll(Arrays.asList(hoodieIndexConfig.getString(BUCKET_INDEX_HASH_FIELD).split(",")));
+ List<String> recordKeyFields =
KeyGenUtils.getRecordKeyFields(hoodieIndexConfig.getString(KeyGeneratorOptions.RECORDKEY_FIELD_NAME));
+ List<String> indexKeyFields =
KeyGenUtils.getIndexKeyFields(hoodieIndexConfig.getString(BUCKET_INDEX_HASH_FIELD));
+ boolean valid = recordKeyFields.containsAll(indexKeyFields);
if (!valid) {
throw new HoodieIndexException("Bucket index key (if configured)
must be subset of record key.");
}
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/BucketIdentifier.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/BucketIdentifier.java
index eed3ab39599c..2bde3aec815b 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/BucketIdentifier.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/BucketIdentifier.java
@@ -42,7 +42,7 @@ public class BucketIdentifier implements Serializable {
}
protected static List<String> getHashKeys(String recordKey, String
indexKeyFields) {
- return getHashKeysUsingIndexFields(recordKey,
Arrays.asList(indexKeyFields.split(",")));
+ return getHashKeysUsingIndexFields(recordKey,
KeyGenUtils.getIndexKeyFields(indexKeyFields));
}
protected static List<String> getHashKeys(String recordKey, List<String>
indexKeyFields) {
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/HoodieBucketIndex.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/HoodieBucketIndex.java
index 38c7cb5319a3..7b9c28b46216 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/HoodieBucketIndex.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/HoodieBucketIndex.java
@@ -29,13 +29,13 @@ import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieIndexException;
import org.apache.hudi.index.HoodieIndex;
+import org.apache.hudi.keygen.KeyGenUtils;
import org.apache.hudi.table.HoodieTable;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import java.io.Serializable;
-import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -57,7 +57,7 @@ public abstract class HoodieBucketIndex extends
HoodieIndex<Object, Object> {
super(config);
this.numBuckets = config.getBucketIndexNumBuckets();
- this.indexKeyFields =
Arrays.asList(config.getBucketIndexHashField().split(","));
+ this.indexKeyFields =
KeyGenUtils.getIndexKeyFields(config.getBucketIndexHashField());
log.info("Use bucket index, numBuckets = " + numBuckets + ", indexFields:
" + indexKeyFields);
}
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/CustomAvroKeyGenerator.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/CustomAvroKeyGenerator.java
index beeb4947acd4..6ab6378367ae 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/CustomAvroKeyGenerator.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/CustomAvroKeyGenerator.java
@@ -62,11 +62,7 @@ public class CustomAvroKeyGenerator extends BaseKeyGenerator
{
public CustomAvroKeyGenerator(TypedProperties props) {
super(props);
- this.recordKeyFields =
Option.ofNullable(props.getString(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key(),
null))
- .map(recordKeyConfigValue ->
- Arrays.stream(recordKeyConfigValue.split(FIELD_SEPARATOR))
- .map(String::trim).collect(Collectors.toList())
- ).orElse(Collections.emptyList());
+ this.recordKeyFields = KeyGenUtils.getRecordKeyFields(props);
this.partitionPathFields =
Arrays.stream(props.getString(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key()).split(FIELD_SEPARATOR)).map(String::trim).collect(Collectors.toList());
this.recordKeyGenerator = getRecordKeyFieldNames().size() == 1 ? new
SimpleAvroKeyGenerator(config) : new ComplexAvroKeyGenerator(config);
this.partitionKeyGenerators =
getPartitionKeyGenerators(this.partitionPathFields, config);
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/KeyGenUtils.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/KeyGenUtils.java
index 292f261c2223..81faa0cdb2ba 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/KeyGenUtils.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/KeyGenUtils.java
@@ -39,11 +39,9 @@ import org.apache.avro.generic.GenericRecord;
import java.io.IOException;
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.function.BiFunction;
-import java.util.stream.Collectors;
import static
org.apache.hudi.config.HoodieWriteConfig.COMPLEX_KEYGEN_NEW_ENCODING;
import static org.apache.hudi.config.HoodieWriteConfig.WRITE_TABLE_VERSION;
@@ -72,7 +70,7 @@ public class KeyGenUtils {
*/
public static KeyGeneratorType inferKeyGeneratorType(
Option<String> recordsKeyFields, String partitionFields) {
- int numRecordKeyFields = recordsKeyFields.map(fields ->
fields.split(",").length).orElse(0);
+ int numRecordKeyFields = recordsKeyFields.map(keyStr ->
getRecordKeyFields(keyStr).size()).orElse(0);
KeyGeneratorType partitionKeyGeneratorType =
inferKeyGeneratorTypeFromPartitionFields(partitionFields);
if (numRecordKeyFields <= 1) {
return partitionKeyGeneratorType;
@@ -331,13 +329,21 @@ public class KeyGenUtils {
}
public static List<String> getRecordKeyFields(TypedProperties props) {
- return
Option.ofNullable(props.getString(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key(),
null))
- .map(recordKeyConfigValue ->
- Arrays.stream(recordKeyConfigValue.split(","))
- .map(String::trim)
- .filter(s -> !s.isEmpty())
- .collect(Collectors.toList())
- ).orElse(Collections.emptyList());
+ return
getRecordKeyFields(props.getString(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key(),
null));
+ }
+
+ public static List<String> getRecordKeyFields(String recordKeys) {
+ return getKeyFields(recordKeys);
+ }
+
+ public static List<String> getIndexKeyFields(String indexKeys) {
+ return getKeyFields(indexKeys);
+ }
+
+ private static List<String> getKeyFields(String keys) {
+ return Option.ofNullable(keys)
+ .map(value -> StringUtils.split(value, ","))
+ .orElse(Collections.emptyList());
}
/**
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/BucketIndexBulkInsertPartitioner.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/BucketIndexBulkInsertPartitioner.java
index 7d8d7c6900ed..5fd3909f0562 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/BucketIndexBulkInsertPartitioner.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/BucketIndexBulkInsertPartitioner.java
@@ -25,12 +25,12 @@ import
org.apache.hudi.exception.HoodieNotSupportedException;
import org.apache.hudi.io.AppendHandleFactory;
import org.apache.hudi.io.SingleFileHandleCreateFactory;
import org.apache.hudi.io.WriteHandleFactory;
+import org.apache.hudi.keygen.KeyGenUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.List;
/**
@@ -50,7 +50,7 @@ public abstract class BucketIndexBulkInsertPartitioner<T>
extends BucketSortBulk
public BucketIndexBulkInsertPartitioner(HoodieTable table, String
sortString, boolean preserveHoodieMetadata) {
super(table, sortString);
- this.indexKeyFields =
Arrays.asList(table.getConfig().getBucketIndexHashField().split(","));
+ this.indexKeyFields =
KeyGenUtils.getIndexKeyFields(table.getConfig().getBucketIndexHashField());
this.consistentLogicalTimestampEnabled =
table.getConfig().isConsistentLogicalTimestampEnabled();
this.preserveHoodieMetadata = preserveHoodieMetadata;
// Multiple bulk inserts into COW using `BucketIndexBulkInsertPartitioner`
is restricted, otherwise AppendHandleFactory will produce MOR log files
diff --git
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/TestBaseHoodieWriteClient.java
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/TestBaseHoodieWriteClient.java
index 0bbe624eafa5..983abbae3038 100644
---
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/TestBaseHoodieWriteClient.java
+++
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/TestBaseHoodieWriteClient.java
@@ -46,6 +46,7 @@ import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.index.simple.HoodieSimpleIndex;
import org.apache.hudi.keygen.ComplexAvroKeyGenerator;
+import org.apache.hudi.keygen.KeyGenUtils;
import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
import org.apache.hudi.table.BulkInsertPartitioner;
import org.apache.hudi.table.HoodieTable;
@@ -242,7 +243,7 @@ class TestBaseHoodieWriteClient extends
HoodieCommonTestHarness {
if (tableVersion <= 8 && enableComplexKeyGeneratorValidation
&&
(ComplexAvroKeyGenerator.class.getCanonicalName().equals(keyGeneratorClass)
||
"org.apache.hudi.keygen.ComplexKeyGenerator".equals(keyGeneratorClass))
- && recordKeyFields.split(",").length == 1) {
+ && KeyGenUtils.getRecordKeyFields(recordKeyFields).size() == 1) {
assertComplexKeyGeneratorValidationThrows(() ->
writeClient.initTable(WriteOperationType.INSERT, Option.empty()), "ingestion");
} else {
writeClient.initTable(WriteOperationType.INSERT, Option.empty());
@@ -399,4 +400,4 @@ class TestBaseHoodieWriteClient extends
HoodieCommonTestHarness {
}
}
-}
\ No newline at end of file
+}
diff --git
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/config/TestHoodieWriteConfig.java
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/config/TestHoodieWriteConfig.java
index 6c1d1418b190..044e41ed89d2 100644
---
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/config/TestHoodieWriteConfig.java
+++
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/config/TestHoodieWriteConfig.java
@@ -37,6 +37,7 @@ import org.apache.hudi.common.table.marker.MarkerType;
import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
import org.apache.hudi.common.util.CollectionUtils;
import org.apache.hudi.config.HoodieWriteConfig.Builder;
+import org.apache.hudi.exception.HoodieIndexException;
import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
@@ -619,6 +620,25 @@ public class TestHoodieWriteConfig {
assertEquals("org.apache.hudi.table.action.commit.UpsertPartitioner",
overwritePartitioner.getString(HoodieLayoutConfig.LAYOUT_PARTITIONER_CLASS_NAME));
}
+ @Test
+ public void testBucketIndexKeyFieldValidationTrimsFields() {
+ Properties props = new Properties();
+ props.setProperty(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key(), "uuid,
name");
+
+ HoodieIndexConfig indexConfig = HoodieIndexConfig.newBuilder()
+ .fromProperties(props)
+ .withIndexType(HoodieIndex.IndexType.BUCKET)
+ .withIndexKeyField(" name")
+ .build();
+ assertEquals(" name",
indexConfig.getString(HoodieIndexConfig.BUCKET_INDEX_HASH_FIELD));
+
+ assertThrows(HoodieIndexException.class, () ->
HoodieIndexConfig.newBuilder()
+ .fromProperties(props)
+ .withIndexType(HoodieIndex.IndexType.BUCKET)
+ .withIndexKeyField("missing")
+ .build());
+ }
+
@Test
void testBloomIndexFileIdKeySortingConfig() {
Properties props = new Properties();
diff --git
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/keygen/TestKeyGenUtils.java
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/keygen/TestKeyGenUtils.java
index 777c4792901f..cd837c0608e8 100644
---
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/keygen/TestKeyGenUtils.java
+++
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/keygen/TestKeyGenUtils.java
@@ -18,9 +18,11 @@
package org.apache.hudi.keygen;
+import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.exception.HoodieKeyException;
+import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
import org.apache.hudi.keygen.constant.KeyGeneratorType;
import org.apache.avro.Schema;
@@ -31,6 +33,7 @@ import org.junit.jupiter.api.Test;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collections;
import java.util.List;
import static
org.apache.hudi.common.table.HoodieTableConfig.KEY_GENERATOR_TYPE;
@@ -121,6 +124,24 @@ public class TestKeyGenUtils {
KeyGenUtils.inferKeyGeneratorTypeFromPartitionFields(null));
}
+ @Test
+ public void testGetRecordKeyFields() {
+ assertEquals(Collections.emptyList(),
KeyGenUtils.getRecordKeyFields((String) null));
+ assertEquals(Collections.emptyList(), KeyGenUtils.getRecordKeyFields(""));
+ assertEquals(Arrays.asList("id", "ts", "name"),
KeyGenUtils.getRecordKeyFields(" id,ts, name,, "));
+
+ TypedProperties props = new TypedProperties();
+ props.setProperty(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key(), " id,ts
");
+ assertEquals(Arrays.asList("id", "ts"),
KeyGenUtils.getRecordKeyFields(props));
+ }
+
+ @Test
+ public void testGetIndexKeyFields() {
+ assertEquals(Collections.emptyList(), KeyGenUtils.getIndexKeyFields(null));
+ assertEquals(Collections.emptyList(), KeyGenUtils.getIndexKeyFields(""));
+ assertEquals(Arrays.asList("id", "ts", "name"),
KeyGenUtils.getIndexKeyFields(" id,ts, name,, "));
+ }
+
@Test
public void testExtractRecordKeys() {
// if for recordKey one column only is used, then there is no added column
name before value
diff --git
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/update/strategy/SparkConsistentBucketDuplicateUpdateStrategy.java
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/update/strategy/SparkConsistentBucketDuplicateUpdateStrategy.java
index 2d8247de92e7..eb4b308a668a 100644
---
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/update/strategy/SparkConsistentBucketDuplicateUpdateStrategy.java
+++
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/update/strategy/SparkConsistentBucketDuplicateUpdateStrategy.java
@@ -28,11 +28,11 @@ import org.apache.hudi.common.model.HoodieRecordLocation;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.index.bucket.ConsistentBucketIdentifier;
+import org.apache.hudi.keygen.KeyGenUtils;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.action.cluster.strategy.UpdateStrategy;
import
org.apache.hudi.table.action.cluster.util.ConsistentHashingUpdateStrategyUtils;
-import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
@@ -74,7 +74,7 @@ public class SparkConsistentBucketDuplicateUpdateStrategy<T>
extends UpdateStrat
ConsistentHashingUpdateStrategyUtils.constructPartitionToIdentifier(partitions,
table);
// Produce records tagged with new record location
- List<String> indexKeyFields =
Arrays.asList(table.getConfig().getBucketIndexHashField().split(","));
+ List<String> indexKeyFields =
KeyGenUtils.getIndexKeyFields(table.getConfig().getBucketIndexHashField());
HoodieData<HoodieRecord<T>> redirectedRecordsRDD =
filteredRecordsRDD.map(r -> {
Pair<String, ConsistentBucketIdentifier> identifierPair =
partitionToIdentifier.get(r.getPartitionPath());
ConsistentHashingNode node =
identifierPair.getValue().getBucket(r.getKey(), indexKeyFields);
diff --git
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/CustomKeyGenerator.java
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/CustomKeyGenerator.java
index 954af2000c1b..45f77ff9ce28 100644
---
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/CustomKeyGenerator.java
+++
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/CustomKeyGenerator.java
@@ -63,12 +63,7 @@ public class CustomKeyGenerator extends BuiltinKeyGenerator {
// NOTE: We have to strip partition-path configuration, since it could
only be interpreted by
// this key-gen
super(stripPartitionPathConfig(props));
- this.recordKeyFields =
Option.ofNullable(props.getString(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key(),
null))
- .map(recordKeyConfigValue ->
- Arrays.stream(recordKeyConfigValue.split(","))
- .map(String::trim)
- .collect(Collectors.toList())
- ).orElse(Collections.emptyList());
+ this.recordKeyFields = KeyGenUtils.getRecordKeyFields(props);
String partitionPathFields =
props.getString(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key());
this.partitionPathFields = partitionPathFields == null
? Collections.emptyList()
@@ -167,4 +162,3 @@ public class CustomKeyGenerator extends BuiltinKeyGenerator
{
return filtered;
}
}
-
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/OptionsResolver.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/OptionsResolver.java
index 74c5257cf44f..cc32351e62cf 100644
---
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/OptionsResolver.java
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/OptionsResolver.java
@@ -40,6 +40,7 @@ import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.index.bucket.partition.PartitionBucketIndexUtils;
+import org.apache.hudi.keygen.KeyGenUtils;
import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
import org.apache.hudi.sink.buffer.BufferMemoryType;
import org.apache.hudi.sink.overwrite.PartitionOverwriteMode;
@@ -168,21 +169,15 @@ public class OptionsResolver {
*/
public static String[] getRecordKeys(Configuration conf) {
final String recordKeyStr = conf.get(FlinkOptions.RECORD_KEY_FIELD);
- if (StringUtils.isNullOrEmpty(recordKeyStr)) {
- return new String[]{};
- }
- return recordKeyStr.split(",");
+ return KeyGenUtils.getRecordKeyFields(recordKeyStr).toArray(new String[0]);
}
/**
* Return the bucket index keys as an array.
*/
public static String[] getBucketIndexKeys(Configuration conf) {
- final String indexKeyStr = conf.get(FlinkOptions.INDEX_KEY_FIELD);
- if (StringUtils.isNullOrEmpty(indexKeyStr)) {
- return new String[]{};
- }
- return indexKeyStr.split(",");
+ final String indexKeyStr = getIndexKeyField(conf);
+ return KeyGenUtils.getIndexKeyFields(indexKeyStr).toArray(new String[0]);
}
/**
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bucket/ConsistentBucketAssignFunction.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bucket/ConsistentBucketAssignFunction.java
index ec2fa104c58e..089224ac985d 100644
---
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bucket/ConsistentBucketAssignFunction.java
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bucket/ConsistentBucketAssignFunction.java
@@ -68,7 +68,7 @@ public class ConsistentBucketAssignFunction extends
ProcessFunctionAdapter<Hoodi
public ConsistentBucketAssignFunction(Configuration conf) {
this.config = conf;
- this.indexKeyFields =
Arrays.asList(OptionsResolver.getIndexKeyField(conf).split(","));
+ this.indexKeyFields =
Arrays.asList(OptionsResolver.getBucketIndexKeys(conf));
this.bucketNum = conf.get(FlinkOptions.BUCKET_INDEX_NUM_BUCKETS);
}
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bucket/ConsistentBucketStreamWriteFunction.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bucket/ConsistentBucketStreamWriteFunction.java
index 40fd32f2e9c6..91f1fd91e61a 100644
---
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bucket/ConsistentBucketStreamWriteFunction.java
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bucket/ConsistentBucketStreamWriteFunction.java
@@ -23,7 +23,7 @@ import org.apache.hudi.common.model.HoodieFileGroupId;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.util.collection.MappingIterator;
import org.apache.hudi.common.util.collection.Pair;
-import org.apache.hudi.configuration.FlinkOptions;
+import org.apache.hudi.configuration.OptionsResolver;
import org.apache.hudi.sink.StreamWriteFunction;
import org.apache.hudi.sink.buffer.RowDataBucket;
import
org.apache.hudi.sink.clustering.update.strategy.ConsistentBucketUpdateStrategy;
@@ -65,7 +65,7 @@ public class ConsistentBucketStreamWriteFunction extends
StreamWriteFunction {
@Override
public void open(Configuration parameters) throws IOException {
super.open(parameters);
- List<String> indexKeyFields =
Arrays.asList(config.get(FlinkOptions.INDEX_KEY_FIELD).split(","));
+ List<String> indexKeyFields =
Arrays.asList(OptionsResolver.getBucketIndexKeys(config));
this.updateStrategy = new ConsistentBucketUpdateStrategy(this.writeClient,
indexKeyFields);
log.info("Create update strategy with index key fields: {}",
indexKeyFields);
}
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
index 0c180404fcec..6aa845920855 100644
---
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
@@ -497,7 +497,7 @@ public class HoodieTableSource extends FileIndexReader
implements
if (!OptionsResolver.isBucketIndexType(conf) || dataFilters.isEmpty()) {
return Option.empty();
}
- Set<String> indexKeyFields =
Arrays.stream(OptionsResolver.getIndexKeyField(conf).split(",")).collect(Collectors.toSet());
+ Set<String> indexKeyFields =
Arrays.stream(OptionsResolver.getBucketIndexKeys(conf)).collect(Collectors.toSet());
List<ResolvedExpression> indexKeyFilters =
dataFilters.stream().filter(expr -> ExpressionUtils.isEqualsLitExpr(expr,
indexKeyFields)).collect(Collectors.toList());
if (!ExpressionUtils.isFilteringByAllFields(indexKeyFilters,
indexKeyFields)) {
return Option.empty();
diff --git
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/configuration/TestOptionsResolver.java
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/configuration/TestOptionsResolver.java
index f7266891e1de..c64018c3b79f 100644
---
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/configuration/TestOptionsResolver.java
+++
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/configuration/TestOptionsResolver.java
@@ -84,7 +84,7 @@ public class TestOptionsResolver {
assertArrayEquals(new String[]{}, OptionsResolver.getRecordKeys(conf));
conf.set(FlinkOptions.RECORD_KEY_FIELD, "uuid, name");
- assertArrayEquals(new String[]{"uuid", " name"},
OptionsResolver.getRecordKeys(conf));
+ assertArrayEquals(new String[]{"uuid", "name"},
OptionsResolver.getRecordKeys(conf));
}
@Test
@@ -96,7 +96,7 @@ public class TestOptionsResolver {
assertArrayEquals(new String[]{},
OptionsResolver.getBucketIndexKeys(conf));
conf.set(FlinkOptions.INDEX_KEY_FIELD, "uuid, name");
- assertArrayEquals(new String[]{"uuid", " name"},
OptionsResolver.getBucketIndexKeys(conf));
+ assertArrayEquals(new String[]{"uuid", "name"},
OptionsResolver.getBucketIndexKeys(conf));
}
@Test
diff --git
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/BucketIndexSupport.scala
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/BucketIndexSupport.scala
index 71c0ac4d379a..e9aee3ab78b4 100644
---
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/BucketIndexSupport.scala
+++
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/BucketIndexSupport.scala
@@ -27,6 +27,7 @@ import org.apache.hudi.index.HoodieIndex
import org.apache.hudi.index.HoodieIndex.IndexType
import org.apache.hudi.index.bucket.BucketIdentifier
import org.apache.hudi.keygen.KeyGenerator
+import org.apache.hudi.keygen.KeyGenUtils
import org.apache.hudi.keygen.factory.HoodieSparkKeyGeneratorFactory
import org.apache.avro.generic.GenericData
@@ -212,7 +213,7 @@ class BucketIndexSupport(spark: SparkSession,
if (bucketHashFields == null || bucketHashFields.isEmpty) {
Option.apply(null)
} else {
-
Option.apply(JavaConverters.seqAsJavaListConverter(bucketHashFields.split(",")).asJava)
+ Option.apply(KeyGenUtils.getIndexKeyFields(bucketHashFields))
}
}
@@ -224,4 +225,3 @@ class BucketIndexSupport(spark: SparkSession,
object BucketIndexSupport {
val INDEX_NAME = "BUCKET"
}
-
diff --git
a/hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/hudi/command/DeleteHoodieTableCommand.scala
b/hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/hudi/command/DeleteHoodieTableCommand.scala
index b03b6fec3b8f..bd4ca0c751e7 100644
---
a/hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/hudi/command/DeleteHoodieTableCommand.scala
+++
b/hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/hudi/command/DeleteHoodieTableCommand.scala
@@ -20,6 +20,7 @@ package org.apache.spark.sql.hudi.command
import org.apache.hudi.{HoodieSparkSqlWriter, SparkAdapterSupport}
import org.apache.hudi.DataSourceWriteOptions.{SPARK_SQL_OPTIMIZED_WRITES,
SPARK_SQL_WRITES_PREPPED_KEY}
import org.apache.hudi.common.table.HoodieTableConfig
+import org.apache.hudi.keygen.KeyGenUtils
import org.apache.spark.sql._
import org.apache.spark.sql.SparkSession
@@ -34,6 +35,8 @@ import org.apache.spark.sql.hudi.ProvidesHoodieConfig
import
org.apache.spark.sql.hudi.command.HoodieCommandMetrics.updateCommitMetrics
import
org.apache.spark.sql.hudi.command.HoodieLeafRunnableCommand.stripMetaFieldAttributes
+import scala.collection.JavaConverters._
+
case class DeleteHoodieTableCommand(catalogTable: HoodieCatalogTable, query:
LogicalPlan, config: Map[String, String]) extends DataWritingCommand
with SparkAdapterSupport
with ProvidesHoodieConfig {
@@ -76,7 +79,7 @@ object DeleteHoodieTableCommand extends SparkAdapterSupport
with ProvidesHoodieC
}
val recordKeysStr =
config.getOrElse(HoodieTableConfig.RECORDKEY_FIELDS.key(), "")
- val recordKeys = recordKeysStr.split(",").filter(_.nonEmpty)
+ val recordKeys =
KeyGenUtils.getRecordKeyFields(recordKeysStr).asScala.toSeq
// get all columns which are used in condition
val conditionColumns = if (condition == null) {
diff --git
a/hudi-spark-datasource/hudi-spark4-common/src/main/scala/org/apache/spark/sql/hudi/command/DeleteHoodieTableCommand.scala
b/hudi-spark-datasource/hudi-spark4-common/src/main/scala/org/apache/spark/sql/hudi/command/DeleteHoodieTableCommand.scala
index e40cd2e4840b..b05d826fbb1b 100644
---
a/hudi-spark-datasource/hudi-spark4-common/src/main/scala/org/apache/spark/sql/hudi/command/DeleteHoodieTableCommand.scala
+++
b/hudi-spark-datasource/hudi-spark4-common/src/main/scala/org/apache/spark/sql/hudi/command/DeleteHoodieTableCommand.scala
@@ -20,6 +20,7 @@ package org.apache.spark.sql.hudi.command
import org.apache.hudi.{HoodieSparkSqlWriter, SparkAdapterSupport}
import org.apache.hudi.DataSourceWriteOptions.{SPARK_SQL_OPTIMIZED_WRITES,
SPARK_SQL_WRITES_PREPPED_KEY}
import org.apache.hudi.common.table.HoodieTableConfig
+import org.apache.hudi.keygen.KeyGenUtils
import org.apache.spark.sql
import org.apache.spark.sql._
@@ -35,6 +36,8 @@ import org.apache.spark.sql.hudi.ProvidesHoodieConfig
import
org.apache.spark.sql.hudi.command.HoodieCommandMetrics.updateCommitMetrics
import
org.apache.spark.sql.hudi.command.HoodieLeafRunnableCommand.stripMetaFieldAttributes
+import scala.collection.JavaConverters._
+
case class DeleteHoodieTableCommand(catalogTable: HoodieCatalogTable, query:
LogicalPlan, config: Map[String, String]) extends DataWritingCommand
with SparkAdapterSupport
with ProvidesHoodieConfig {
@@ -81,7 +84,7 @@ object DeleteHoodieTableCommand extends SparkAdapterSupport
with ProvidesHoodieC
}
val recordKeysStr =
config.getOrElse(HoodieTableConfig.RECORDKEY_FIELDS.key(), "")
- val recordKeys = recordKeysStr.split(",").filter(_.nonEmpty)
+ val recordKeys =
KeyGenUtils.getRecordKeyFields(recordKeysStr).asScala.toSeq
// get all columns which are used in condition
val conditionColumns = if (condition == null) {